import logging
import os
import time
import xml.etree.ElementTree as ET

import xmltodict
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse, Response

from WXBizMsgCrypt3 import WXBizMsgCrypt
# Log at INFO level so each callback request and its outcome is recorded.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# Credentials handed to WXBizMsgCrypt. Each one may be supplied through an
# environment variable so real secrets do not have to be committed to the
# source file; the original placeholder strings remain the defaults.
TOKEN = os.environ.get('WECOM_TOKEN', 'your_token')
ENCODING_AES_KEY = os.environ.get('WECOM_ENCODING_AES_KEY', 'your_encoding_aes_key')
CORP_ID = os.environ.get('WECOM_CORP_ID', 'your_corp_id')

# Build the shared crypto helper once at import time. A failure here is
# logged and re-raised so the application does not start half-configured.
try:
    wxcpt = WXBizMsgCrypt(TOKEN, ENCODING_AES_KEY, CORP_ID)
except Exception as e:
    logger.error(f"初始化 WXBizMsgCrypt 失败:{str(e)}")
    raise
@app.get("/callback")
async def verify_url(msg_signature: str, timestamp: str, nonce: str, echostr: str):
    """Verify the callback URL.

    Passes the query parameters to ``wxcpt.VerifyURL`` and, when it reports
    success (return code 0), answers with the echo string it produced.

    Args:
        msg_signature: Signature query parameter of the request.
        timestamp: Timestamp query parameter of the request.
        nonce: Nonce query parameter of the request.
        echostr: Echo string query parameter handed to ``VerifyURL``.

    Returns:
        A ``PlainTextResponse`` whose body is the string returned by
        ``VerifyURL``.

    Raises:
        HTTPException: 400 when ``VerifyURL`` returns a non-zero code,
            500 when an unexpected error occurs.
    """
    try:
        logger.info(f"收到验证请求:msg_signature={msg_signature}, timestamp={timestamp}, nonce={nonce}")
        ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
        if ret != 0:
            logger.error(f"URL 验证失败,错误码:{ret}")
            raise HTTPException(status_code=400, detail="验证失败")
        logger.info("URL 验证成功")
        return PlainTextResponse(content=sEchoStr)
    except HTTPException:
        # Deliberate HTTP errors (the 400 above) must reach the client as-is
        # instead of being rewritten to a 500 by the generic handler below.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"验证过程发生错误:{str(e)}")
        raise HTTPException(status_code=500, detail="服务器内部错误") from e
def _cdata_safe(value):
    """Return ``str(value)`` made safe for embedding inside a CDATA section.

    A literal ``]]>`` would terminate the section early, so it is split
    across two adjacent CDATA sections.
    """
    return str(value).replace("]]>", "]]]]><![CDATA[>")


def _make_reply_text(content):
    """Build the reply text: drop '吗' and turn question marks into exclamation marks."""
    return (
        content.replace('吗', '')
        .replace('?', '!')
        # Full-width question mark -> full-width exclamation mark. The
        # original repeated the ASCII replacement here, which was a no-op.
        .replace('\uff1f', '\uff01')
    )


def _build_reply_xml(to_user, from_user, created_at, text, msg_id, agent_id):
    """Assemble the plaintext reply XML that is later passed to ``EncryptMsg``."""
    return (
        "<xml>\n"
        f"<ToUserName><![CDATA[{_cdata_safe(to_user)}]]></ToUserName>\n"
        f"<FromUserName><![CDATA[{_cdata_safe(from_user)}]]></FromUserName>\n"
        f"<CreateTime>{created_at}</CreateTime>\n"
        "<MsgType><![CDATA[text]]></MsgType>\n"
        f"<Content><![CDATA[{_cdata_safe(text)}]]></Content>\n"
        f"<MsgId>{msg_id}</MsgId>\n"
        f"<AgentID>{agent_id}</AgentID>\n"
        "</xml>"
    )


@app.post("/callback")
async def receive_message(request: Request):
    """Receive a pushed message, decrypt it and answer with an encrypted reply.

    The request body is decrypted with ``wxcpt.DecryptMsg`` using the
    ``msg_signature``, ``timestamp`` and ``nonce`` query parameters. The
    ``Content`` of the decrypted XML is transformed into the reply text, the
    sender and receiver are swapped, and the resulting XML is encrypted with
    ``wxcpt.EncryptMsg``.

    Args:
        request: Incoming request carrying the encrypted body and the
            signature query parameters.

    Returns:
        A ``Response`` with media type ``application/xml`` holding the
        encrypted reply, or an empty 200 response when the decrypted message
        has no ``Content`` element to reply to.

    Raises:
        HTTPException: 400 when a query parameter is missing or decryption
            fails; 500 when encryption fails or an unexpected error occurs.
    """
    try:
        body = await request.body()
        query = request.query_params
        msg_signature = query.get("msg_signature")
        timestamp = query.get("timestamp")
        nonce = query.get("nonce")
        if not all([msg_signature, timestamp, nonce]):
            raise HTTPException(status_code=400, detail="缺少必要的参数")
        logger.info(f"收到消息推送:msg_signature={msg_signature}, timestamp={timestamp}, nonce={nonce}")

        ret, sMsg = wxcpt.DecryptMsg(body, msg_signature, timestamp, nonce)
        if ret != 0:
            logger.error(f"消息解密失败,错误码:{ret}")
            raise HTTPException(status_code=400, detail="消息解密失败")

        xml_dict = xmltodict.parse(sMsg)
        logger.info(f"解密后的消息内容:{xml_dict}")
        xml_content = xml_dict['xml']
        content = xml_content.get('Content')
        logger.info(f"收到消息:{content}")

        if content is None:
            # No Content element (or an empty one): there is no text to
            # transform, so acknowledge with an empty body instead of
            # crashing on ``None.replace``.
            logger.info("消息不含文本内容,已回复空响应")
            return Response(content="")

        current_time = str(int(time.time()))
        # Sender and receiver are swapped so the reply goes back to the user.
        reply_msg = _build_reply_xml(
            to_user=xml_content.get('FromUserName'),
            from_user=xml_content.get('ToUserName'),
            created_at=current_time,
            text=_make_reply_text(content),
            msg_id=xml_content.get('MsgId'),
            agent_id=xml_content.get('AgentID'),
        )

        ret, encrypted_msg = wxcpt.EncryptMsg(reply_msg, nonce, current_time)
        if ret != 0:
            logger.error(f"消息加密失败,错误码:{ret}")
            raise HTTPException(status_code=500, detail="消息加密失败")
        logger.info("成功构造并加密回复消息")
        return Response(content=encrypted_msg, media_type="application/xml")
    except HTTPException:
        # Keep the intended status codes (e.g. the 400s above) instead of
        # letting the generic handler below rewrite them to 500.
        raise
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception(f"处理消息时发生错误:{str(e)}")
        raise HTTPException(status_code=500, detail="服务器内部错误") from e
if __name__ == "__main__":
    # Executing this file directly serves the app with uvicorn, bound to
    # 127.0.0.1 on port 5000.
    import uvicorn

    server_options = {"host": "127.0.0.1", "port": 5000}
    uvicorn.run(app, **server_options)