import base64
import hashlib
import hmac
import json
import os
import time

import requests
from Crypto.Cipher import AES
from flask import Flask, request, jsonify
# Deployment settings. Each value can be overridden through an environment
# variable so that real credentials do not have to be committed to the
# repository; the fallbacks are the values that used to be hard-coded here.

# Corp id and application secret, sent to the gettoken endpoint.
CORP_ID = os.environ.get("WECOM_CORP_ID", "你的企业ID")
CORP_SECRET = os.environ.get("WECOM_CORP_SECRET", "你的应用 Secret")
# Application agent id; converted with int() when a message is sent, so it
# must be a numeric string.
AGENT_ID = os.environ.get("WECOM_AGENT_ID", "你的 AgentId")
# Token configured for the message callback.
TOKEN = os.environ.get("WECOM_TOKEN", "videoparse2025")
# Base64 text (without the trailing "=") from which the AES key is derived.
ENCODING_AES_KEY = os.environ.get("WECOM_ENCODING_AES_KEY", "你的 EncodingAESKey")
# Endpoint of the watermark-removal service; called as "<PARSE_API>?url=...".
PARSE_API = os.environ.get("PARSE_API", "https://your-parser-api.com/parse")
# Flask application object; the route decorator further down registers the
# callback view on it.
app = Flask(__name__)

# Module-level cache for the API access token.
#   "token":       the cached token string, or None when nothing is cached
#   "expire_time": Unix timestamp after which the cached token is refetched
access_token_cache = {"token": None, "expire_time": 0}
def get_access_token():
    """Return an access token for the WeCom API, using the module cache.

    A cached token is reused until its recorded expiry time. Otherwise a new
    token is requested with CORP_ID / CORP_SECRET and stored in
    ``access_token_cache``.

    Returns:
        The token string, or None when the request fails or the API reports
        a non-zero ``errcode`` (the failure is printed, never raised).
    """
    cached = access_token_cache["token"]
    if cached and time.time() < access_token_cache["expire_time"]:
        return cached

    url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
    # Let requests build the query string so the credentials are URL-encoded.
    query = {"corpid": CORP_ID, "corpsecret": CORP_SECRET}
    try:
        res = requests.get(url, params=query, timeout=10)
        res.raise_for_status()
        data = res.json()
        if data.get("errcode") == 0:
            access_token = data["access_token"]
            # Honour the lifetime reported by the server instead of assuming
            # a fixed one; fall back to 7200 s when the field is missing or
            # unusable. Refresh 100 s early so a token is never used right
            # at its expiry.
            lifetime = data.get("expires_in")
            if not isinstance(lifetime, (int, float)) or lifetime <= 0:
                lifetime = 7200
            access_token_cache["token"] = access_token
            access_token_cache["expire_time"] = time.time() + max(lifetime - 100, 0)
            return access_token
        print(f"获取 access_token 失败:{data}")
        return None
    except Exception as e:
        # Best-effort boundary: callers only check for None.
        print(f"获取 access_token 异常:{str(e)}")
        return None
def decrypt_msg(msg_signature, timestamp, nonce, encrypt_msg):
    """Verify the callback signature, then decrypt and parse the message.

    Args:
        msg_signature: Signature sent with the callback (hex SHA-1 digest).
        timestamp: Timestamp string sent with the callback.
        nonce: Nonce string sent with the callback.
        encrypt_msg: Base64-encoded AES-CBC ciphertext.

    Returns:
        The decrypted message parsed with ``json.loads``, or None when the
        signature does not match or any decoding/decryption step fails (the
        failure is printed, never raised).
    """
    try:
        # WeCom signs callbacks as SHA-1 over the lexicographically sorted
        # concatenation of token, timestamp, nonce and ciphertext. Reject
        # anything that does not match before touching the ciphertext.
        signed_parts = sorted([TOKEN, timestamp, nonce, encrypt_msg])
        expected = hashlib.sha1("".join(signed_parts).encode("utf-8")).hexdigest()
        # Constant-time comparison; bytes are used so a non-ASCII signature
        # is simply a mismatch rather than a TypeError.
        if not hmac.compare_digest(expected.encode("utf-8"), msg_signature.encode("utf-8")):
            print("解密消息失败:签名校验不通过")
            return None

        # The configured key is stored without its base64 padding character.
        aes_key = base64.b64decode(ENCODING_AES_KEY + "=")
        # The IV is the first 16 bytes of the key.
        cipher = AES.new(aes_key, AES.MODE_CBC, aes_key[:16])
        plaintext = cipher.decrypt(base64.b64decode(encrypt_msg))

        # Bytes 0-15 are skipped, bytes 16-19 hold the big-endian message
        # length, and the message itself starts at byte 20. Whatever follows
        # the message (including block padding) is ignored.
        msg_len = int.from_bytes(plaintext[16:20], byteorder="big")
        msg_end = 20 + msg_len
        if msg_end > len(plaintext):
            raise ValueError("declared message length exceeds decrypted payload")
        real_msg = plaintext[20:msg_end].decode("utf-8")
        return json.loads(real_msg)
    except Exception as e:
        # Best-effort boundary: missing parameters, bad base64, wrong key
        # size, invalid UTF-8 or invalid JSON all end up here.
        print(f"解密消息失败:{str(e)}")
        return None
def extract_weixin_video_url(video_id):
    """Look up the original link for a media id via the media/get endpoint.

    Args:
        video_id: Media id taken from the incoming message.

    Returns:
        The ``url`` field of the JSON response, or None when the id is
        missing, no access token is available, the request fails, or the
        response has no ``url`` (failures are printed, never raised).
    """
    if not video_id:
        # Without this guard the literal text "None" would be sent as the id.
        print("提取视频号链接失败:缺少素材 ID")
        return None

    access_token = get_access_token()
    if not access_token:
        return None

    url = "https://qyapi.weixin.qq.com/cgi-bin/media/get"
    # video_id comes from the user's message; pass it through ``params`` so
    # it is URL-encoded and cannot inject extra query parameters.
    query = {"access_token": access_token, "media_id": video_id}
    try:
        res = requests.get(url, params=query, timeout=15)
        res.raise_for_status()
        # NOTE(review): the WeCom documentation describes media/get as
        # returning the media bytes on success and JSON only on error, in
        # which case this JSON "url" branch would never succeed. Confirm
        # against the live API.
        data = res.json()
        if data.get("url"):
            print(f"提取到视频号链接:{data['url']}")
            return data["url"]
        print(f"提取视频号链接失败:{data}")
        return None
    except Exception as e:
        # Best-effort boundary: callers only check for None.
        print(f"获取临时素材异常:{str(e)}")
        return None
def parse_video_no_watermark(original_url):
    """Ask the parsing service for a watermark-free link.

    Args:
        original_url: Link to hand to the service; may be None or empty.

    Returns:
        Always a string: the watermark-free URL on success, otherwise a
        user-facing error message. Two response shapes are accepted:
        ``{"code": 0, "data": {"no_watermark_url": ...}}`` and
        ``{"success": <truthy>, "url": ...}``.
    """
    if not original_url:
        return "提取视频号链接失败,请重试"
    try:
        encoded_url = requests.utils.quote(original_url)
        parse_url = f"{PARSE_API}?url={encoded_url}"
        res = requests.get(parse_url, timeout=20)
        res.raise_for_status()
        data = res.json()

        # "data" may be null or a non-object; calling .get() on it would
        # raise and be misreported below as a timeout, so only read it when
        # it really is a dict.
        payload = data.get("data")
        if not isinstance(payload, dict):
            payload = {}

        if data.get("code") == 0 and payload.get("no_watermark_url"):
            return payload["no_watermark_url"]
        if data.get("success") and data.get("url"):
            return data["url"]

        print(f"解析无水印失败:{data}")
        return f"解析失败,错误信息:{data.get('msg', '未知错误')}"
    except Exception as e:
        # Every failure (network error, HTTP error status, invalid JSON) is
        # reported to the user with this one message.
        print(f"解析接口异常:{str(e)}")
        return "解析接口超时,请稍后再试"
def send_msg_to_user(user_id, content):
    """Send a text message to one user through the message/send endpoint.

    Best effort: returns None in every case. When no access token is
    available nothing is sent; request failures and API errors are printed.

    Args:
        user_id: Recipient, placed in the ``touser`` field.
        content: Text of the message.
    """
    token = get_access_token()
    if not token:
        return

    endpoint = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}"
    payload = {
        "touser": user_id,
        # AGENT_ID is configured as a string; the request body carries an int.
        "agentid": int(AGENT_ID),
        "msgtype": "text",
        "text": {"content": content},
        "safe": 0,
        "enable_id_trans": 0,
        "enable_duplicate_check": 0,
    }

    try:
        response = requests.post(endpoint, json=payload, timeout=10)
        response.raise_for_status()
        outcome = response.json()
        if outcome.get("errcode") != 0:
            print(f"回复用户失败:{outcome}")
        else:
            print(f"回复用户{user_id}成功:{content}")
    except Exception as e:
        print(f"发送消息异常:{e!s}")
@app.route("/wechat/callback", methods=["POST"])
def wechat_callback():
    """Handle a message callback: decrypt it and reply to the sender.

    Reads ``msg_signature``, ``timestamp`` and ``nonce`` from the query
    string and the ciphertext from the request body. For a video message the
    original link is looked up, run through the parsing service, and the
    outcome is sent back to the sender; any other message gets a usage hint.

    Returns:
        JSON ``{"errcode": 0, "errmsg": "success"}``, or
        ``{"errcode": -1, "errmsg": "decrypt failed"}`` when the body cannot
        be decrypted into a JSON object.
    """
    msg_signature = request.args.get("msg_signature")
    timestamp = request.args.get("timestamp")
    nonce = request.args.get("nonce")
    # NOTE(review): the whole request body is treated as the ciphertext. If
    # the platform wraps it in an XML/JSON envelope, the encrypted field has
    # to be extracted first. Confirm against the configured callback format.
    encrypt_msg = request.data.decode("utf-8", errors="ignore")

    real_msg = decrypt_msg(msg_signature, timestamp, nonce, encrypt_msg)
    # Anything but a non-empty JSON object cannot be handled below.
    if not real_msg or not isinstance(real_msg, dict):
        return jsonify({"errcode": -1, "errmsg": "decrypt failed"})
    print(f"收到用户消息:{real_msg}")

    user_id = real_msg.get("FromUserName")
    video_msg = real_msg.get("VideoMsg")
    if real_msg.get("MsgType") == "video" and isinstance(video_msg, dict) and video_msg:
        video_id = video_msg.get("MsgId")
        # NOTE(review): the lookups below run synchronously with timeouts of
        # up to 15 s and 20 s; confirm this fits the platform's callback
        # response deadline.
        original_url = extract_weixin_video_url(video_id)
        result = parse_video_no_watermark(original_url)
        # parse_video_no_watermark returns either a link or an error text.
        # Only a real link may be announced as a success.
        if isinstance(result, str) and result.startswith(("http://", "https://")):
            reply_content = f"✅ 视频号无水印链接已生成:\n{result}\n\n💡 提示:链接有效期 24 小时,建议及时保存"
        else:
            reply_content = f"❌ {result}"
        send_msg_to_user(user_id, reply_content)
    else:
        send_msg_to_user(user_id, "❌ 请转发「微信视频号视频」给我,我会自动生成无水印链接~")
    return jsonify({"errcode": 0, "errmsg": "success"})
if __name__ == "__main__":
    # Script entry point: serve on all interfaces, port 8080.
    # Debug mode stays off here: with host 0.0.0.0 the Werkzeug interactive
    # debugger would be reachable from the network and allows arbitrary code
    # execution. Enable it only on a loopback-bound development server.
    app.run(host="0.0.0.0", port=8080, debug=False)