大部分聊天软件的机器人自动回复消息流程

sequenceDiagram
	participant 用户
    participant 聊天软件后台
    participant 机器人后台
	聊天软件后台 ->> 机器人后台: 向回调地址发送验证消息
	机器人后台 ->> 聊天软件后台: 回复
    用户 ->> 聊天软件后台: 发送消息
    聊天软件后台 ->> 机器人后台: 向回调地址推送消息
    机器人后台 ->> 聊天软件后台: 确认收到
    机器人后台 ->> 聊天软件后台: 请求 Access Token
	聊天软件后台 ->> 机器人后台: Access Token
    机器人后台 ->> 聊天软件后台: 调用发送消息的接口
    聊天软件后台 ->> 用户: 发送消息

QQ 机器人

文档:QQ 机器人 - 简介

控制台:QQ 开放平台

申请流程

  1. 在 QQ 开放平台注册账号,可以选“个人主体入驻”
  2. 创建应用 -> 创建机器人
  3. 开发设置 -> 记录 APP ID、APP Secret
  4. 沙箱配置 -> 将测试频道添加到沙箱环境

部署后台

使用 python SDK,Github - botpy

安装

pip install qq-botpy

示例脚本如下

import botpy
from botpy.message import Message

class MyClient(botpy.Client):

	# 接受频道所有消息
	async def on_message_create(self, message: Message):
	    logging.info(f'QQ received message from {message.author.username}: {message.content}')
        reply = 'Hello, ' + message.content
        await message.reply(content=reply)

	# 接收 @机器人 的消息
    async def on_at_message_create(self, message: Message):
        at_pattern = r"^<@!?(\d+)>[\s]*"
        content = re.sub(at_pattern, '', message.content)
        logging.info(f'QQ received at message from {message.author.username}: {content}')
        reply = 'Hello, ' + message.content
        await message.reply(content=reply)
        
    # 接收私信的消息
    async def on_direct_message_create(self, message: Message):
        at_pattern = r"^<@!?(\d+)>[\s]*"
        content = re.sub(at_pattern, '', message.content)
        logging.info(f'QQ received at message from {message.author.username}: {content}')
        reply = 'Hello, ' + message.content
        await message.reply(content=reply)


intents = botpy.Intents(
    guild_messages=True,
    public_guild_messages=True,
    direct_message=True
)
client = MyClient(intents=intents)
client.run(appid=YOUR_APP_ID, app_secret=YOUR_APP_SECRET)

其中 intents 表示监听事件,监听事件后通过实现 clienton_XX 方法可以获取并响应对应事件

  • guild_messages:频道消息(只有私域机器人可以监听频道所有消息)
    • on_message_create:接收频道所有消息
  • direct_message:私信消息
    • on_direct_message_create:接收私信给机器人的消息
  • public_guild_messages:公域消息(公域机器人只能监听被 @ 的消息)
    • on_at_message_create:接收 @机器人 的消息

所有监听事件见文档

Discord 机器人

申请流程,也可以参考文档 Getting Started

  1. 开发后台申请创建一个 Application:Developer Portal
  2. General Information -> 记录 Application ID、Public Key(使用 SDK 似乎不需要用到)
  3. Bot -> 设置 Token 并记录下来
  4. 配置权限:
    1. Bot -> 勾选 Message Content Intent
    2. OAuth2 -> OAuth2 URL Generator -> bot -> Administrator

部署后台

使用 python SDK,discord.py - Quickstart

安装

pip install -U discord.py

示例脚本如下

import discord

intents = discord.Intents.default()
intents.message_content = True

client = discord.Client(intents=intents)

@client.event
async def on_ready():
    print(f'We have logged in as {client.user}')

@client.event
async def on_message(message):
    if message.author == client.user:
        return

    if message.content.startswith('$hello'):
        await message.channel.send('Hello!')

client.run('your token here')

修改 token 后运行该脚本即可,机器人客户端会自动请求 Discord 服务器并保持心跳,无需配置回调地址或者放开服务端口。

注意:机器人后台最好部署在海外服务器,不然可能请求不通 Discord 的服务器

飞书机器人

申请流程

  1. 飞书开放平台后台创建一个应用
  2. 根据文档申请权限
    1. 添加应用能力 -> 机器人
    2. 权限管理 -> API 权限 -> 获取与发送单聊、群组消息
  3. 凭证与信息管理 -> 记录 APP_ID、APP_SECRET,用于申请 Access Token
  4. 事件与回调
    1. 事件与回调 -> 记录 Encrypt Key、Verification Token
    2. 事件订阅 -> 根据上述 Encrypt Key、Verification Token 启动验证后台服务,配置回调地址
    3. 添加监听事件 -> 接收消息(Message received),并开通对应权限
  5. 配置 IP 白名单:安全设置 -> IP 白名单
  6. 开通发送消息权限:发送消息
  7. 应用发布:版本管理与发布

部署后台

验证 URL 与接受消息

from fastapi import FastAPI
from pydantic import BaseModel

class LarkRequest(BaseModel):
    encrypt: str

app = FastAPI()
    
@app.post('/bot/lark')
def bot_lark(req: LarkRequest):
    cipher = AESCipher(YOUR_ENCRYPT_KEY)
    decrypted = bot.cipher.decrypt_string(encrypt)
    data = json.loads(decrypted)
    logging.info(data)
    
    if data.get('type') == 'url_verification':
        return {
            "challenge": data.get('challenge', 'challenge_failed')
        }
    
    message = data.get('event', {}).get('message', {})
    msg_content = json.loads(message.get('content', '{}'))
    content = msg_content.get('text', '')
    sender = data.get('event', {}).get('sender', {})
    user_id = sender.get('sender_id', {}).get('user_id', '')
    logging.info(f"received message `{content}` from user `{user_id}`")

其中解密模块如下

#!pip install pycryptodome

from Crypto.Cipher import AES
import base64
import hashlib

class  AESCipher(object):
    def __init__(self, key):
        self.bs = AES.block_size
        self.key=hashlib.sha256(AESCipher.str_to_bytes(key)).digest()
    @staticmethod
    def str_to_bytes(data):
        u_type = type(b"".decode('utf8'))
        if isinstance(data, u_type):
            return data.encode('utf8')
        return data
    @staticmethod
    def _unpad(s):
        return s[:-ord(s[len(s) - 1:])]
    def decrypt(self, enc):
        iv = enc[:AES.block_size]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return  self._unpad(cipher.decrypt(enc[AES.block_size:]))
    def decrypt_string(self, enc):
        enc = base64.b64decode(enc)
        return  self.decrypt(enc).decode('utf8')
    

发送消息

获取 access token

import httpx
import logging

url = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal'
headers = {
    'Content-Type': 'application/json; charset=utf-8'
}
data = {
    'app_id': YOUR_APP_ID,
    'app_secret': YOUR_APP_SECRET,
}

with httpx.Client() as client:
    r = client.post(url, headers=headers, json=data)
    try:
        token = r.json()['tenant_access_token']
        logging.info(f'token updated: {token}')
    except Exception as e:
        logging.error(f'update token failed: {e}')
        logging.error(r.text)

发送消息

url = 'https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id'
headers = {
    'Authorization': 'Bearer ' + YOUR_ACCESS_TOKEN,
    'Content-Type': 'application/json; charset=utf-8',
}
content = {
    "text": 'YOUR_REPLY',
}
data = {
    'receive_id': 'CHAT_ID_FROM_EVENT',
    'msg_type': 'text',
    'content': json.dumps(content)
}

with httpx.Client() as client:
    r = client.post(url, headers=headers, json=data)
    logging.info(r.text)

Lark 机器人

开发流程与接口与飞书类似,有以下几个区别点

  1. 控制台地址为 Lark Developer,文档地址为 Quick Starts
  2. API 域名不同,例如发送消息的 API 飞书为 https://open.feishu.cn/open-apis/im/v1/messages, Lark 为 https://open.larksuite.com/open-apis/im/v1/messages ,获取 Access Token 的 API 飞书为 https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal,Lark 为 https://open.larksuite.com/open-apis/auth/v3/tenant_access_token/internal
  3. 机器人后台最好部署在海外服务器,不然可能请求不通 Lark 的服务器

企业微信机器人

创建流程

  1. 在某个群聊 -> 右上角 … 号 -> 添加群机器人 -> 新创建机器人
  2. 创建机器人后点 “点击配置”
    1. 机器人名称下方有Webhook 地址,推送消息调用这个地址即可
    2. 需要接收消息点 “接收消息配置”,配置回调地址、Token、EncodingAESKey(目前只有腾讯内部开启了这个配置项)

验证消息

配置回调地址时会发送验证消息,需要将消息解密后返回才能通过验证

from fastapi.responses import PlainTextResponse
from fastapi import FastAPI
import logging
from urllib.parse import unquote

app = FastAPI()

@app.get('/bot', response_class=PlainTextResponse)
def handle_echo(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
    echostr = unquote(echostr)
    logging.info(f'msg_signature: {msg_signature}, timestamp: {timestamp}, nonce: {nonce}, echostr: {echostr}')
    ret, echostr = bot.wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
    logging.info(f'ret: {ret}, echostr: {echostr}')
    if ret != 0:
        logging.error("error, VerifyURL ret: " + str(ret))
        return ''
    return echostr.decode('utf-8')

其中解密模块见:Github - weworkapi_python - WXBizMsgCrypt3.py

需要安装 pycryptodome

pip install pycryptodome

接收消息

与验证消息的 handler 类似,但是是 post 请求

from fastapi import Body

@app.post('/bot', response_class=PlainTextResponse)
def handle_echo(msg_signature: str, timestamp: str, nonce: str, data: str = Body(..., media_type="text/plain")) -> str:
	ret, xml = bot.wxcpt.DecryptMsg(data, msg_signature, timestamp, nonce)
    if ret != 0:
        logging.error("error, DecryptMsg ret: " + str(ret))
        return ''

    msg = _xml.read_xml(xml)
    if msg is None:
        logging.error(f"parse xml error, xml: {xml}")
        return ''
    logging.info(f'receive msg: {msg}')
    return ''

消息解密后是 xml 格式,可以用以下代码解析

from typing import NamedTuple
import logging
import xml.etree.cElementTree as ET

class Message(NamedTuple):
    user_id: str
    user_name: str
    user_alias: str
    webhook_url: str
    chat_id: str
    chat_info_url: str
    msg_id: str
    msg_type: str
    chat_type: str
    content: str


def read_xml(xml: bytes) -> Message:
    try:
        xml_tree = ET.fromstring(xml.decode())
        sender = xml_tree.find('From')
        user_id = sender.find('UserId').text
        user_name = sender.find('Name').text
        user_alias = sender.find('Alias').text
        webhook_url = xml_tree.find('WebhookUrl').text
        chat_id = xml_tree.find('ChatId').text
        chat_info_url = xml_tree.find('GetChatInfoUrl').text
        msg_id = xml_tree.find('MsgId').text
        msg_type = xml_tree.find('MsgType').text
        chat_type = xml_tree.find('ChatType').text
        content = ''
        if msg_type == 'text':
            content = xml_tree.find('Text').find('Content').text

        return Message(
            user_id,
            user_name,
            user_alias,
            webhook_url,
            chat_id,
            chat_info_url,
            msg_id,
            msg_type,
            chat_type,
            content
        )
    except Exception as e:
        logging.error(f"read xml error: {str(e)}")
        return None

推送消息

发送文字示例,支持 markdown 格式

import httpx

body = {
    'msgtype': 'markdown',
    'markdown': {
        'content': 'YOUR_MESSAGE'
    }
}

if chat_id is not None:
    body['chatid'] = chat_id #  发送到指定群聊
if user_id is not None:
    content = '<@{user_id}>\n' + content # 加上 @用户

r = httpx.post(webhook_url, json=body)

发送图片示例

body = {
    'msgtype': 'image',
    'image': {
        'base64': image_base64,
        'md5': image_md5,
    }
}
r = httpx.post(self.webhook_url, json=body)