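# conf.ini
[wechat]
# Enterprise (corp) ID of the WeChat Work account; required.
corpid = <corp_id>
[app]
# <app_name> = <app_agent_id>:<app_secret>
# <app_name> is the first command-line argument of sendchat.py.
it = <app_agent_id>:<app_secret>
[group]
# <app_name> = recipients of that app's messages; separate user IDs with "|".
# The key must be the same <app_name> used in [app].
it = usera|userb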
#!/usr/bin/env python3
"""
@author: zyh
@contact: [email protected]
@software: vscode
@file: sendchat.py
@time: 2020/02/05
"""
import sys, os, requests, pathlib, json, configparser
import logging
import logging.handlers
from datetime import datetime
class PySendchat():
    """Send a plain-text message through a WeChat Work (qyapi.weixin.qq.com) app.

    The access token returned by the API is cached next to this script in a
    file named ``wechat.<agentid>`` and reused until it is older than
    ``TOKEN_LIFETIME`` seconds. All activity is logged to ``sendchat.log`` in
    the same directory.

    Args:
        corpid: Enterprise ID sent as ``corpid`` when requesting a token.
        agentid: Agent ID of the application; also names the token cache file.
        secret: Application secret sent as ``corpsecret``.
        touser: Recipient string placed in the ``touser`` field of the message.
        content: Text of the message.
    """

    # Endpoints of the HTTP API used by this class.
    TOKEN_URL = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    SEND_URL = 'https://qyapi.weixin.qq.com/cgi-bin/message/send'
    # Seconds a cached token file is trusted before a new token is requested.
    TOKEN_LIFETIME = 7200
    # Seconds to wait for the HTTP API before giving up.
    REQUEST_TIMEOUT = 5
    # NOTE(review): presumed "invalid access_token" / "access_token expired"
    # codes of the WeChat Work API -- confirm against the global error-code list.
    TOKEN_REJECTED_ERRCODES = (40014, 42001)

    def __init__(self, corpid, agentid, secret, touser, content):
        self.corpid = corpid
        self.agentid = agentid
        self.secret = secret
        self.touser = touser
        self.content = content

        log_filename = os.path.join(self._base_dir(), 'sendchat.log')
        self.my_logger = logging.getLogger('SendChat')
        self.my_logger.setLevel(logging.INFO)
        # 'SendChat' is a process-wide logger: attach the file handler only
        # once, otherwise every extra instance duplicates each log line.
        already_attached = any(
            isinstance(h, logging.handlers.RotatingFileHandler)
            and h.baseFilename == os.path.abspath(log_filename)
            for h in self.my_logger.handlers
        )
        if not already_attached:
            handler = logging.handlers.RotatingFileHandler(
                log_filename, maxBytes=102400000, backupCount=5)
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.my_logger.addHandler(handler)

    @staticmethod
    def _base_dir():
        """Return the directory that contains this script."""
        return os.path.dirname(os.path.abspath(__file__))

    def _token_path(self):
        """Return the path of the token cache file for this agent."""
        return os.path.join(self._base_dir(), 'wechat.{0}'.format(self.agentid))

    @staticmethod
    def _mask_token(token):
        """Return a shortened form of *token* that is safe to write to a log."""
        if not token:
            return str(token)
        return '{0}...'.format(str(token)[:6])

    def _read_cached_token(self, tokenfile):
        """Return the cached access token, or None when it must be refreshed.

        The cache is treated as a miss when the file is absent, empty,
        unreadable, not valid JSON, lacks ``access_token``, or is older than
        ``TOKEN_LIFETIME`` seconds.
        """
        try:
            # Modification time is when the token was last written; creation /
            # inode-change time (getctime) differs between platforms.
            age = datetime.now().timestamp() - os.path.getmtime(tokenfile)
        except OSError as e:
            self.my_logger.info('Token file is not exist.Reason:{0}'.format(e))
            return None
        self.my_logger.info('{0} lived {1}s.'.format(tokenfile, age))

        if age >= self.TOKEN_LIFETIME:
            try:
                os.remove(tokenfile)
                self.my_logger.info('Token file {0}: delete success'.format(tokenfile))
            except OSError as e:
                self.my_logger.error('Token file:{0} delete error.Reason:{1}'.format(tokenfile, e))
            # Never fall through to reading an expired token, even when the
            # stale file could not be deleted.
            return None

        try:
            with open(tokenfile, 'rb') as fd:
                cached = json.loads(fd.read().decode(encoding='utf8'))
        except (OSError, ValueError) as e:
            # Covers an empty file, invalid UTF-8 and invalid JSON.
            self.my_logger.warning('Token file {0} unusable.Reason:{1}'.format(tokenfile, e))
            return None

        access_token = cached.get("access_token") if isinstance(cached, dict) else None
        if not access_token:
            return None
        self.my_logger.info('Get token from token file.')
        return access_token

    def _request_token(self):
        """Request a new token from the API.

        Returns:
            A ``(raw_response_bytes, access_token)`` tuple.

        Raises:
            SystemExit: the request failed, the response could not be parsed,
                or the API reported a non-zero ``errcode``.
        """
        try:
            self.my_logger.info('New Token Create.')
            # params= URL-encodes the credentials; timeout= prevents the
            # script from hanging forever on a dead connection.
            response = requests.get(
                self.TOKEN_URL,
                params={'corpid': self.corpid, 'corpsecret': self.secret},
                timeout=self.REQUEST_TIMEOUT)
            token = response.content
            self.my_logger.info('Get token from weixin api.')
            payload = json.loads(token.decode(encoding='utf8'))
            errcode = int(payload.get("errcode", 0))
        except Exception as e:
            self.my_logger.error('Get token error!Reason:{0}'.format(e))
            sys.exit(1)

        if errcode != 0:
            self.my_logger.error('Get token error!Reason:{0}'.format(payload.get("errmsg")))
            sys.exit(1)
        access_token = payload.get("access_token")
        if not access_token:
            self.my_logger.error('Get token error!Reason:{0}'.format('access_token missing in response'))
            sys.exit(1)
        return token, access_token

    def gettoken(self):
        """Return an access token, from the cache file when it is still fresh.

        When no usable cached token exists, a new one is requested and the raw
        API response is written to the cache file.

        Returns:
            The access token string.

        Raises:
            SystemExit: the token could not be obtained or the cache file
                could not be written.
        """
        self.my_logger.info('-----------------------------------------------------------------------------------------')
        tokenfile = self._token_path()

        access_token = self._read_cached_token(tokenfile)
        if access_token:
            return access_token

        token, access_token = self._request_token()
        try:
            self.my_logger.info('Write token to {0}.'.format(tokenfile))
            with open(tokenfile, 'wb') as fd:
                fd.write(token)
        except OSError as e:
            self.my_logger.error('Write {0} error!Reason:{1}'.format(tokenfile, e))
            sys.exit(1)
        return access_token

    def sendmsg(self):
        """Send ``self.content`` as a text message to ``self.touser``.

        Network failures and API-level errors are logged, not raised. When the
        API rejects the token, the cache file is removed so the next run
        requests a fresh one.

        Raises:
            SystemExit: propagated from :meth:`gettoken`.
        """
        access_token = self.gettoken()
        # The token is a credential: log only a prefix of it.
        self.my_logger.info('Token:{0}'.format(self._mask_token(access_token)))

        message = {
            "agentid": self.agentid,
            "touser": self.touser,
            "msgtype": "text",
            "safe": "0",
            "text": {"content": self.content},
        }
        body = json.dumps(message, ensure_ascii=False).encode(encoding="utf-8")

        try:
            response = requests.post(
                url=self.SEND_URL,
                params={'access_token': access_token},
                data=body,
                timeout=self.REQUEST_TIMEOUT)
        except Exception as e:
            self.my_logger.error('Send chat network error!Reason:{0}'.format(e))
            return
        self.my_logger.info(response.content)

        try:
            result = json.loads(response.content.decode(encoding='utf8'))
            errcode = int(result.get("errcode", 0))
        except (ValueError, TypeError, AttributeError):
            # Response is not the expected JSON object; it was logged above.
            return
        if errcode == 0:
            return

        self.my_logger.error('Send chat error!Reason:{0}'.format(result.get("errmsg")))
        if errcode in self.TOKEN_REJECTED_ERRCODES:
            # Drop the rejected token so it is not reused until it ages out.
            try:
                os.remove(self._token_path())
            except OSError as e:
                self.my_logger.error('Token file:{0} delete error.Reason:{1}'.format(self._token_path(), e))
def main(argv=None):
    """Command-line entry point: send one message using settings from conf.ini.

    Expected command line: ``sendchat.py <appname> <unused> <content>``.
    The second argument is accepted but ignored; the message text is the
    third argument. ``conf.ini`` is read from the directory of this script
    and must provide ``[wechat] corpid``, ``[app] <appname>`` in the form
    ``agentid:secret`` and ``[group] <appname>``.

    Args:
        argv: Argument list including the program name; defaults to
            ``sys.argv``.

    Returns:
        0 on success, 1 on a configuration or send failure, 2 on a usage error.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 4:
        # Previously a short command line crashed with IndexError.
        print('usage: python sendchat.py <appname> <unused> <content>', file=sys.stderr)
        return 2
    appname = argv[1]
    content = argv[3]

    # read conf.ini
    conf_path = os.path.dirname(os.path.abspath(__file__))
    conf_ini = os.path.join(conf_path, 'conf.ini')
    if not pathlib.Path(conf_ini).exists():
        print('conf.ini error')
        return 1

    conf = configparser.ConfigParser()
    try:
        conf.read(conf_ini)
        corpid = conf.get("wechat", "corpid")
        appinfo = conf.get("app", appname)
        groupname = conf.get("group", appname)
    except configparser.Error as e:
        # Missing section/option or a malformed file.
        print('conf.ini error: {0}'.format(e))
        return 1

    agentid, sep, secret = appinfo.partition(':')
    if not sep:
        print('conf.ini error: [app] {0} must be <app_agent_id>:<app_secret>'.format(appname))
        return 1
    touser = groupname.split(':')[0]

    chatobj = PySendchat(corpid, agentid, secret, touser, content)
    try:
        chatobj.sendmsg()
    except (Exception, SystemExit):
        # gettoken() aborts through exit(), which raises SystemExit; record
        # the traceback instead of hiding the reason. KeyboardInterrupt is
        # deliberately not caught.
        chatobj.my_logger.exception("Send chat failure!")
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())
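eg:
python sendchat.py it '' <预警内容>
说明:第一个参数 it 是 conf.ini 中 [app] 和 [group] 里的应用名;第二个参数为占位参数,脚本不会使用,传空字符串 '' 即可;第三个参数是要发送的预警内容。
使用前请先创建好 app,并将用户关联到该 app。
执行上述命令后,预警内容会通过 <app_agent_id> 对应的应用发送给用户 usera 和 userb。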