首先,你需要申请一个企业微信。https://work.weixin.qq.com/
然后创建一个应用,获取其 agentid和secret,再加上自己的企业id,再利用企业微信的api 就可以实现发送对应的消息到某个联系人或者某个部门。其实什么语言都可以,为了方便我们使用Python写个简单的demo。
代码链接:python企业微信发送消息
为了方便配置管理,我们将所有的配置写入配置文件:
wechatbot:
appid: xxxxx
agentid: xxxxx
secret: xxxxxxx
bot_access_token_url: https://qyapi.weixin.qq.com/cgi-bin/gettoken
send_msg_url: https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=
整个获取工具如下,代码比较简单,需要注意的是 access_token 需要缓存,过期后再去请求微信的api获取,这一步我们简单将其缓存到文件,有需要的可以缓存到redis:
#!/usr/bin/env python3
# -*-coding : utf-8 -*-
# author magicdu
import os
import time
import requests
import json
import yaml
import re
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
def getIP():
ip= requests.get("https://api.ipify.org/?format=json")
ip=json.loads(ip.text)
return ip['ip']
# 加载yaml文件
def loadYmlConfig():
file = open('bootstrap.yml', 'r', encoding='utf-8')
stream = file.read()
config = yaml.load(stream, Loader=Loader)
return config
# 获取 yaml 文件的 某个节点的 配置
def getConfig(key):
keyConfig = loadYmlConfig()[key]
print(keyConfig)
return keyConfig
# 获取企业号的配置
def getBotConfig():
ymlconfig = getConfig('wechatbot')
botConfig = WechatBotConfig(
ymlconfig['appid'], ymlconfig['agentid'], ymlconfig['secret'], ymlconfig['bot_access_token_url'], ymlconfig['send_msg_url'])
return botConfig
# 获取 token
def get_access_token(config):
values = {'corpid': config.appid,
'corpsecret': config.secret,
}
req = requests.post(config.bot_access_token_url, params=values)
data = json.loads(req.text)
return data["access_token"]
# 获取 access_token 并缓存到本地
# token 过期时重新获取
def get_bot_access_token(config):
try:
with open('./tmp/access_token.conf', 'r') as f:
t, access_token = f.read().split()
except:
with open('./tmp/access_token.conf', 'w') as f:
access_token = get_access_token(config)
cur_time = time.time()
f.write('\t'.join([str(cur_time), access_token]))
return access_token
else:
cur_time = time.time()
if 0 < cur_time - float(t) < 7260:
return access_token
else:
with open('./tmp/access_token.conf', 'w') as f:
access_token = get_access_token(config)
f.write('\t'.join([str(cur_time), access_token]))
return access_token
## 发送消息
def send_data(config,userids,message):
send_url = config.send_msg_url + get_bot_access_token(config)
send_values = {
"touser": userids,
"msgtype": "text",
"agentid": config.agentid,
"text": {
"content": message
},
"safe": "0"
}
send_msges=(bytes(json.dumps(send_values), 'utf-8'))
respone = requests.post(send_url, send_msges)
respone = respone.json() #当返回的数据是json串的时候直接用.json即可将respone转换成字典
print(respone)
return respone["errmsg"]
# 企业号 配置
class WechatBotConfig(object):
def __init__(self, appid, agentid, secret, bot_access_token_url, send_msg_url):
self.appid = appid
self.agentid = agentid
self.secret = secret
self.bot_access_token_url = bot_access_token_url
self.send_msg_url=send_msg_url
测试函数,获取当前的公网ip,发送到所有的企业微信用户:
#!/usr/bin/env python3
# -*-coding : utf-8 -*-
# author magicdu
from botutils import getBotConfig,send_data,getIP
if __name__ == "__main__":
config=getBotConfig()
partyIds="@all"
ip=getIP()
print(ip)
send_data(config,partyIds,ip)