大部分聊天软件的机器人自动回复消息流程
sequenceDiagram
participant 用户
participant 聊天软件后台
participant 机器人后台
聊天软件后台 ->> 机器人后台: 向回调地址发送验证消息
机器人后台 ->> 聊天软件后台: 回复
用户 ->> 聊天软件后台: 发送消息
聊天软件后台 ->> 机器人后台: 向回调地址推送消息
机器人后台 ->> 聊天软件后台: 确认收到
机器人后台 ->> 聊天软件后台: 请求 Access Token
聊天软件后台 ->> 机器人后台: Access Token
机器人后台 ->> 聊天软件后台: 调用发送消息的接口
聊天软件后台 ->> 用户: 发送消息
QQ 机器人
文档:QQ 机器人 - 简介
控制台:QQ 开放平台
申请流程
- 在 QQ 开放平台注册账号,可以选“个人主体入驻”
- 创建应用 -> 创建机器人
- 开发设置 -> 记录 APP ID、APP Secret
- 沙箱配置 -> 将测试频道添加到沙箱环境
部署后台
使用 python SDK,Github - botpy
安装
pip install qq-botpy
示例脚本如下
import botpy
from botpy.message import Message
class MyClient(botpy.Client):
# 接受频道所有消息
async def on_message_create(self, message: Message):
logging.info(f'QQ received message from {message.author.username}: {message.content}')
reply = 'Hello, ' + message.content
await message.reply(content=reply)
# 接收 @机器人 的消息
async def on_at_message_create(self, message: Message):
at_pattern = r"^<@!?(\d+)>[\s]*"
content = re.sub(at_pattern, '', message.content)
logging.info(f'QQ received at message from {message.author.username}: {content}')
reply = 'Hello, ' + message.content
await message.reply(content=reply)
# 接收私信的消息
async def on_direct_message_create(self, message: Message):
at_pattern = r"^<@!?(\d+)>[\s]*"
content = re.sub(at_pattern, '', message.content)
logging.info(f'QQ received at message from {message.author.username}: {content}')
reply = 'Hello, ' + message.content
await message.reply(content=reply)
intents = botpy.Intents(
guild_messages=True,
public_guild_messages=True,
direct_message=True
)
client = MyClient(intents=intents)
client.run(appid=YOUR_APP_ID, app_secret=YOUR_APP_SECRET)
其中 intents 表示监听事件,监听事件后通过实现 client
的 on_XX
方法可以获取并响应对应事件
- guild_messages:频道消息(只有私域机器人可以监听频道所有消息)
- on_message_create:接收频道所有消息
- direct_message:私信消息
- on_direct_message_create:接收私信给机器人的消息
- public_guild_messages:公域消息(公域机器人只能监听被 @ 的消息)
- on_at_message_create:接收 @机器人 的消息
所有监听事件见文档
Discord 机器人
申请流程,也可以参考文档 Getting Started
- 开发后台申请创建一个 Application:Developer Portal
- General Information -> 记录 Application ID、Public Key(使用 SDK 似乎不需要用到)
- Bot -> 设置 Token 并记录下来
- 配置权限:
- Bot -> 勾选 Message Content Intent
- OAuth2 -> OAuth2 URL Generator -> bot -> Administrator
部署后台
使用 python SDK,discord.py - Quickstart
安装
pip install -U discord.py
示例脚本如下
import discord
intents = discord.Intents.default()
intents.message_content = True
client = discord.Client(intents=intents)
@client.event
async def on_ready():
print(f'We have logged in as {client.user}')
@client.event
async def on_message(message):
if message.author == client.user:
return
if message.content.startswith('$hello'):
await message.channel.send('Hello!')
client.run('your token here')
修改 token 后运行该脚本即可,机器人客户端会自动请求 Discord 服务器并保持心跳,无需配置回调地址或者放开服务端口。
注意:机器人后台最好部署在海外服务器,不然可能请求不通 Discord 的服务器
飞书机器人
申请流程
- 飞书开放平台后台创建一个应用
- 根据文档申请权限
- 添加应用能力 -> 机器人
- 权限管理 -> API 权限 -> 获取与发送单聊、群组消息
- 凭证与信息管理 -> 记录 APP_ID、APP_SECRET,用于申请 Access Token
- 事件与回调
- 事件与回调 -> 记录 Encrypt Key、Verification Token
- 事件订阅 -> 根据上述 Encrypt Key、Verification Token 启动验证后台服务,配置回调地址
- 添加监听事件 -> 接收消息(Message received),并开通对应权限
- 配置 IP 白名单:安全设置 -> IP 白名单
- 开通发送消息权限:发送消息
- 应用发布:版本管理与发布
部署后台
验证 URL 与接受消息
from fastapi import FastAPI
from pydantic import BaseModel
class LarkRequest(BaseModel):
encrypt: str
app = FastAPI()
@app.post('/bot/lark')
def bot_lark(req: LarkRequest):
cipher = AESCipher(YOUR_ENCRYPT_KEY)
decrypted = bot.cipher.decrypt_string(encrypt)
data = json.loads(decrypted)
logging.info(data)
if data.get('type') == 'url_verification':
return {
"challenge": data.get('challenge', 'challenge_failed')
}
message = data.get('event', {}).get('message', {})
msg_content = json.loads(message.get('content', '{}'))
content = msg_content.get('text', '')
sender = data.get('event', {}).get('sender', {})
user_id = sender.get('sender_id', {}).get('user_id', '')
logging.info(f"received message `{content}` from user `{user_id}`")
其中解密模块如下
#!pip install pycryptodome
from Crypto.Cipher import AES
import base64
import hashlib
class AESCipher(object):
def __init__(self, key):
self.bs = AES.block_size
self.key=hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
@staticmethod
def str_to_bytes(data):
u_type = type(b"".decode('utf8'))
if isinstance(data, u_type):
return data.encode('utf8')
return data
@staticmethod
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
def decrypt(self, enc):
iv = enc[:AES.block_size]
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return self._unpad(cipher.decrypt(enc[AES.block_size:]))
def decrypt_string(self, enc):
enc = base64.b64decode(enc)
return self.decrypt(enc).decode('utf8')
发送消息
获取 access token
import httpx
import logging
url = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal'
headers = {
'Content-Type': 'application/json; charset=utf-8'
}
data = {
'app_id': YOUR_APP_ID,
'app_secret': YOUR_APP_SECRET,
}
with httpx.Client() as client:
r = client.post(url, headers=headers, json=data)
try:
token = r.json()['tenant_access_token']
logging.info(f'token updated: {token}')
except Exception as e:
logging.error(f'update token failed: {e}')
logging.error(r.text)
发送消息
url = 'https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id'
headers = {
'Authorization': 'Bearer ' + YOUR_ACCESS_TOKEN,
'Content-Type': 'application/json; charset=utf-8',
}
content = {
"text": 'YOUR_REPLY',
}
data = {
'receive_id': 'CHAT_ID_FROM_EVENT',
'msg_type': 'text',
'content': json.dumps(content)
}
with httpx.Client() as client:
r = client.post(url, headers=headers, json=data)
logging.info(r.text)
Lark 机器人
开发流程与接口与飞书类似,有以下几个区别点
- 控制台地址为 Lark Developer,文档地址为 Quick Starts
- API 域名不同,例如发送消息的 API 飞书为 https://open.feishu.cn/open-apis/im/v1/messages, Lark 为 https://open.larksuite.com/open-apis/im/v1/messages ,获取 Access Token 的 API 飞书为 https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal,Lark 为 https://open.larksuite.com/open-apis/auth/v3/tenant_access_token/internal
- 机器人后台最好部署在海外服务器,不然可能请求不通 Lark 的服务器
企业微信机器人
创建流程
- 在某个群聊 -> 右上角 … 号 -> 添加群机器人 -> 新创建机器人
- 创建机器人后点 “点击配置”
- 机器人名称下方有Webhook 地址,推送消息调用这个地址即可
- 需要接收消息点 “接收消息配置”,配置回调地址、Token、EncodingAESKey(目前只有腾讯内部开启了这个配置项)
验证消息
配置回调地址时会发送验证消息,需要将消息解密后返回才能通过验证
from fastapi.responses import PlainTextResponse
from fastapi import FastAPI
import logging
from urllib.parse import unquote
app = FastAPI()
@app.get('/bot', response_class=PlainTextResponse)
def handle_echo(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
echostr = unquote(echostr)
logging.info(f'msg_signature: {msg_signature}, timestamp: {timestamp}, nonce: {nonce}, echostr: {echostr}')
ret, echostr = bot.wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
logging.info(f'ret: {ret}, echostr: {echostr}')
if ret != 0:
logging.error("error, VerifyURL ret: " + str(ret))
return ''
return echostr.decode('utf-8')
其中解密模块见:Github - weworkapi_python - WXBizMsgCrypt3.py
需要安装 pycryptodome
pip install pycryptodome
接收消息
与验证消息的 handler 类似,但是是 post 请求
from fastapi import Body
@app.post('/bot', response_class=PlainTextResponse)
def handle_echo(msg_signature: str, timestamp: str, nonce: str, data: str = Body(..., media_type="text/plain")) -> str:
ret, xml = bot.wxcpt.DecryptMsg(data, msg_signature, timestamp, nonce)
if ret != 0:
logging.error("error, DecryptMsg ret: " + str(ret))
return ''
msg = _xml.read_xml(xml)
if msg is None:
logging.error(f"parse xml error, xml: {xml}")
return ''
logging.info(f'receive msg: {msg}')
return ''
消息解密后是 xml 格式,可以用以下代码解析
from typing import NamedTuple
import logging
import xml.etree.cElementTree as ET
class Message(NamedTuple):
user_id: str
user_name: str
user_alias: str
webhook_url: str
chat_id: str
chat_info_url: str
msg_id: str
msg_type: str
chat_type: str
content: str
def read_xml(xml: bytes) -> Message:
try:
xml_tree = ET.fromstring(xml.decode())
sender = xml_tree.find('From')
user_id = sender.find('UserId').text
user_name = sender.find('Name').text
user_alias = sender.find('Alias').text
webhook_url = xml_tree.find('WebhookUrl').text
chat_id = xml_tree.find('ChatId').text
chat_info_url = xml_tree.find('GetChatInfoUrl').text
msg_id = xml_tree.find('MsgId').text
msg_type = xml_tree.find('MsgType').text
chat_type = xml_tree.find('ChatType').text
content = ''
if msg_type == 'text':
content = xml_tree.find('Text').find('Content').text
return Message(
user_id,
user_name,
user_alias,
webhook_url,
chat_id,
chat_info_url,
msg_id,
msg_type,
chat_type,
content
)
except Exception as e:
logging.error(f"read xml error: {str(e)}")
return None
推送消息
发送文字示例,支持 markdown 格式
import httpx
body = {
'msgtype': 'markdown',
'markdown': {
'content': 'YOUR_MESSAGE'
}
}
if chat_id is not None:
body['chatid'] = chat_id # 发送到指定群聊
if user_id is not None:
content = '<@{user_id}>\n' + content # 加上 @用户
r = httpx.post(webhook_url, json=body)
发送图片示例
body = {
'msgtype': 'image',
'image': {
'base64': image_base64,
'md5': image_md5,
}
}
r = httpx.post(self.webhook_url, json=body)