spreadsheetToken = "Pxxxxxxxxxxxxh"
sheetId = "lxxxxV"
data_raw = {
"valueRange": {
"range": f"{sheetId}!A1:E3",
"values": [
["2023/12/25", "收入", "微信", "100", "帐号 老表 max"],
["2023/12/25", "支出", "支付宝", "10", "买东西 老表 max"],
["2023/12/26", "支出", "支付宝", "19.9", "买东西 老表 max"]
]
}
}
url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{spreadsheetToken}/values_append"
r2 = requests.post(url, data=json.dumps(data_raw), headers=header)
print(r2.json()["msg"])
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/func', methods=['POST'])
def all_func():
try:
data = request.json
print(data)
if data:
return jsonify({"status": "success", "message": "Received"})
else:
return "Missing data", 400
except Exception as e:
return str(e), 500
if __name__ == '__main__':
app.run(debug=True, port=8003, host="0.0.0.0")
import requests
import json
import re
from flask import Flask, request, jsonify
APP_ID = "cli_xxxxxxxx"
APP_SECRET = "8xxxxxxxxxxxxxxxxxxk"
SPREADSHEET_TOKEN = "Pxxxxxxxxxxxxh"
SHEET_ID = "lxxxxV"
TRIGGER_KEYWORD = "/fs"
app = Flask(__name__)
def get_access_token():
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
post_data = {"app_id": APP_ID, "app_secret": APP_SECRET}
r = requests.post(url, data=post_data)
return r.json()["tenant_access_token"]
def write_to_sheet(amount, type_, channel, remark):
token = get_access_token()
header = {"Content-Type": "application/json; charset=utf-8", "Authorization": f"Bearer {token}"}
date = "2023-12-25"
data_raw = {
"valueRange": {
"range": f"{SHEET_ID}!A1:A1",
"values": [[date, type_, channel, amount, remark]]
}
}
url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values_append"
r = requests.post(url, data=json.dumps(data_raw), headers=header)
return r.json()
def send_msg(user_id, text):
token = get_access_token()
header = {"Content-Type": "application/json; charset=utf-8", "Authorization": f"Bearer {token}"}
url = f"https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=user_id"
post_data = {
"receive_id": user_id,
"msg_type": "text",
"content": json.dumps({"text": text})
}
r = requests.post(url, headers=header, data=json.dumps(post_data))
return r.json()
@app.route('/func', methods=['POST'])
def webhook_handler():
data = request.json
if not data:
return "No Data", 400
event = data.get("event", {})
content = event.get("message", {}).get("content", "")
sender_id = event.get("sender", {}).get("user_id", "")
if TRIGGER_KEYWORD in content:
parts = content.split()
if len(parts) >= 5:
_, type_, channel, amount, remark = parts[0], parts[1], parts[2], parts[3], parts[4]
result = write_to_sheet(amount, type_, channel, remark)
if result.get("code") == 0:
msg_text = f"记账成功!\n类型:{type_}\n金额:{amount}\n详情:{remark}"
else:
msg_text = f"记账失败:{result.get('msg')}"
else:
msg_text = "格式错误,请使用 /fs 类型 渠道 金额 备注"
send_msg(sender_id, msg_text)
return jsonify({"status": "ok"})
if __name__ == '__main__':
app.run(host="0.0.0.0", port=8003)