import google.generativeai as genai
import asyncio
import datetime
import pytz
import pickle
import os
import random
API_KEY = "在这里粘贴你的 Google_API_Key"
DATA_FILE = "bot_memory.pkl"
genai.configure(api_key=API_KEY)
chat_model = genai.GenerativeModel('gemini-1.5-flash')
class GroupMemory:
def __init__(self, group_id):
self.group_id = group_id
self.summary = "暂无早期历史记录。"
self.buffer = []
self.lock = asyncio.Lock()
self.last_active_time = 0
self.last_msg_content = ""
self.repeat_count = 0
self.bot_config = {
"COOLDOWN_SECONDS": 0,
"TRIGGER_PROBABILITY": 0.05,
"REPEATER_THRESHOLD": 2,
"REPEATER_EXEC_PROB": 0.1,
"MEMORY_LIMIT_CHARS": 10000,
"MEMORY_KEEP_COUNT": 100
}
self.checkin_data = {}
def __getstate__(self):
state = self.__dict__.copy()
del state['lock']
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.lock = asyncio.Lock()
def checkin(self, user_id, user_name, content):
"""执行打卡:增加记录 + 总星星"""
if user_id not in self.checkin_data:
self.checkin_data[user_id] = {"name": "", "total_stars": 0, "records": {}}
user_data = self.checkin_data[user_id]
user_data["name"] = user_name
if content not in user_data["records"]:
user_data["records"][content] = 0
user_data["records"][content] += 1
user_data["total_stars"] += 1
save_data()
def delete_checkin(self, user_id, content):
"""删除打卡:减少记录 + 总星星(返回:是否成功,提示语)"""
if user_id not in self.checkin_data:
return False, "你还没有任何打卡记录哦~"
user_data = self.checkin_data[user_id]
if content not in user_data["records"] or user_data["records"][content] == 0:
return False, f"你没有「{content}」的打卡记录~"
user_data["records"][content] -= 1
user_data["total_stars"] -= 1
if user_data["records"][content] == 0:
del user_data["records"][content]
if not user_data["records"]:
del self.checkin_data[user_id]
save_data()
return True, f"已删除「{content}」的 1 次打卡记录~"
def admin_checkin(self, target_user_id, target_user_name, content, count, operation):
""" 管理员操作他人打卡记录 operation: "add"(增加) / "delete"(删除)
返回:是否成功,提示语 """
if target_user_id not in self.checkin_data:
self.checkin_data[target_user_id] = {"name": target_user_name, "total_stars": 0, "records": {}}
user_data = self.checkin_data[target_user_id]
user_data["name"] = target_user_name
if content not in user_data["records"]:
user_data["records"][content] = 0
if operation == "add":
user_data["records"][content] += count
user_data["total_stars"] += count
save_data()
return True, f"已为用户【{target_user_name}】增加「{content}」打卡记录 {count} 次~"
elif operation == "delete":
if user_data["records"][content] < count:
return False, f"用户【{target_user_name}】的「{content}」打卡记录不足 {count} 次!"
user_data["records"][content] -= count
user_data["total_stars"] -= count
if user_data["records"][content] == 0:
del user_data["records"][content]
if not user_data["records"]:
del self.checkin_data[target_user_id]
save_data()
return True, f"已为用户【{target_user_name}】删除「{content}」打卡记录 {count} 次~"
else:
return False, "操作类型错误!仅支持「增加」或「删除」"
def calculate_medals(self, stars):
"""计算星星对应的勋章:4 星=1 月,4 月=1 日,4 日=1 皇冠"""
crown = stars // (4 * 4 * 4)
stars %= (4 * 4 * 4)
sun = stars // (4 * 4)
stars %= (4 * 4)
moon = stars // 4
star = stars % 4
medals = ""
if crown > 0: medals += "👑" * crown
if sun > 0: medals += "☀️" * sun
if moon > 0: medals += "🌙" * moon
if star > 0 or (crown + sun + moon + star == 0): medals += "⭐" * star
return medals
def generate_checkin_ranking(self):
"""生成打卡排行榜(按总星星降序,子项也显示勋章)"""
if not self.checkin_data:
return "当前还没有打卡记录哦~快叫群友一起打卡吧!"
sorted_users = sorted(
self.checkin_data.values(), key=lambda x: x["total_stars"], reverse=True
)
ranking = "🏆 打卡记录排行榜 🏆\n"
for idx, user in enumerate(sorted_users, 1):
total_medals = self.calculate_medals(user["total_stars"])
ranking += f"{idx}. {user['name']}{total_medals}\n"
for content, count in user["records"].items():
content_medals = self.calculate_medals(count)
ranking += f" · {content}{content_medals}\n"
return ranking
def add_message(self, name, msg, time_str):
entry = f"[{time_str}] 【{name}】: {msg}"
self.buffer.append(entry)
clean_msg = msg.strip()
if clean_msg == self.last_msg_content and clean_msg:
self.repeat_count += 1
else:
self.last_msg_content = clean_msg
self.repeat_count = 1
def estimate_length(self):
return sum(len(m) for m in self.buffer)
async def check_and_compress(self):
async with self.lock:
if self.estimate_length() > self.bot_config["MEMORY_LIMIT_CHARS"] and len(self.buffer) > self.bot_config["MEMORY_KEEP_COUNT"]:
to_compress = self.buffer[:-self.bot_config["MEMORY_KEEP_COUNT"]]
text_block = "\n".join(to_compress)
prompt = f"""
你是记忆整理员,更新长期记忆摘要(800 字内):
【长期记忆】:{self.summary}
【待归档对话】:{text_block}
"""
try:
loop = asyncio.get_running_loop()
resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(prompt))
if resp.candidates[0].content.parts[0].text:
self.summary = resp.candidates[0].content.parts[0].text
self.buffer = self.buffer[-self.bot_config["MEMORY_KEEP_COUNT"]:]
save_data()
except Exception as e:
print(f"压缩记忆失败:{e}")
pass
async def generate_reply(self, current_question, sender_name, is_active_interrupt=False):
tz = pytz.timezone('Asia/Shanghai')
context_str = "\n".join(self.buffer)
if is_active_interrupt:
task_prompt = "没人艾特你,自然加入讨论,简短、像群友,可复读/玩梗,直接说话不加称呼,绝对不要输出任何时间戳/日期/时间格式,不要添加自己的名字前缀(如【Gemini】)。"
else:
if not current_question.strip():
task_prompt = f"用户【{sender_name}】艾特了你,自然接话,绝对不要输出任何时间戳/日期/时间格式,不要添加自己的名字前缀(如【Gemini】)。"
else:
task_prompt = f"用户【{sender_name}】提问:{current_question},请回答,绝对不要输出任何时间戳/日期/时间格式,不要添加自己的名字前缀(如【Gemini】)。"
final_prompt = f"""
你是 QQ 群里的 AI 伙伴 Gemini。
【长期记忆】:{self.summary}
【近期对话】:{context_str}
【任务】:{task_prompt}
【规则】:
1. 问时间时只说当前北京时间(比如'现在是下午 3 点 20 分'),但不要输出任何括号/时间戳/数字格式的时间;
2. 问新闻/数据自动联网,风格像群友而非客服;
3. 回复时禁止添加自己的名字或任何标识前缀(如【Gemini】),直接输出回复内容。
"""
try:
loop = asyncio.get_running_loop()
resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(final_prompt))
return resp.candidates[0].content.parts[0].text if resp.candidates[0].content.parts[0].text else "(暂时无法回复)"
except Exception as e:
return f"(回复失败:{str(e)})"
async def check_active_intervention(self, current_msg):
import time
now = time.time()
if now - self.last_active_time < self.bot_config["COOLDOWN_SECONDS"]:
return False, None
if (self.repeat_count >= self.bot_config["REPEATER_THRESHOLD"] and random.random() < self.bot_config["REPEATER_EXEC_PROB"] and current_msg.strip()):
try:
loop = asyncio.get_running_loop()
check_resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(f"判断'{current_msg}'是否是正常梗(非广告/脏话):是回 YES,否回 NO"))
if "YES" in check_resp.candidates[0].content.parts[0].text.strip().upper():
self.last_active_time = now
return True, current_msg
except:
pass
return False, None
if "gemini" not in current_msg.lower() and random.random() > self.bot_config["TRIGGER_PROBABILITY"]:
return False, None
try:
loop = asyncio.get_running_loop()
decision_resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(f"""
最近消息:{chr(10).join(self.buffer[-15:])}
当前消息:{current_msg}
没被艾特,是否有必要说话?满足(客观问题无人答/讨论你/有趣的氛围/有趣的话题)回 YES,否则 NO
"""))
if "YES" in decision_resp.candidates[0].content.parts[0].text.strip().upper():
reply = await self.generate_reply("", "群友", is_active_interrupt=True)
self.last_active_time = now
return True, reply
except Exception as e:
print(f"主动介入失败:{e}")
pass
return False, None
memories = {}
def save_data():
try:
with open(DATA_FILE, 'wb') as f:
pickle.dump(memories, f)
except Exception as e:
print(f"保存记忆失败:{e}")
def load_data():
global memories
if os.path.exists(DATA_FILE):
try:
with open(DATA_FILE, 'rb') as f:
memories = pickle.load(f)
except:
memories = {}
def get_memory(group_id):
if group_id not in memories:
memories[group_id] = GroupMemory(group_id)
return memories[group_id]
load_data()
import nonebot
from nonebot import on_message
from nonebot.adapters.onebot.v11 import Adapter, Bot, GroupMessageEvent
from memory import get_memory, save_data
import atexit
import datetime
import pytz
ADMIN_QQ = 1234567
nonebot.init()
driver = nonebot.get_driver()
driver.register_adapter(Adapter)
atexit.register(save_data)
def unix_to_beijing(timestamp):
"""将时间戳转换为北京时间字符串"""
dt_utc = datetime.datetime.fromtimestamp(timestamp, pytz.utc)
dt_bj = dt_utc.astimezone(pytz.timezone('Asia/Shanghai'))
return dt_bj.strftime("%H:%M:%S")
def get_current_bj_time():
"""获取当前北京时间字符串"""
return datetime.datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%H:%M:%S")
def is_at_me(bot: Bot, event: GroupMessageEvent) -> bool:
"""判断是否@了机器人"""
if event.is_tome():
return True
for seg in event.message:
if seg.type == "at" and str(seg.data.get("qq")) == str(bot.self_id):
return True
return False
handler = on_message(priority=1, block=True)
@handler.handle()
async def _(bot: Bot, event: GroupMessageEvent):
user_id = event.user_id
group_id = event.group_id
msg = event.get_plaintext().strip()
mem = get_memory(group_id)
try:
info = await bot.get_group_member_info(group_id=group_id, user_id=user_id)
name = info.get('card') or info.get('nickname') or str(user_id)
except:
name = "群友"
msg_lower = msg.lower()
def get_cmd_content(prefix):
prefix_len = len(prefix)
if msg.startswith(prefix):
return msg[prefix_len:].strip()
elif msg_lower.startswith(prefix.lower()):
for i in range(len(msg)):
if msg[i:i+len(prefix)].lower() == prefix.lower():
return msg[i+len(prefix):].strip()
return ""
if msg_lower.startswith("gemini 设置 "):
cmd_content = get_cmd_content("gemini 设置 ")
mem.add_message(name, msg, get_current_bj_time())
save_data()
reply_msg = ""
if user_id != ADMIN_QQ:
reply_msg = "❌ 你没有权限执行此操作!仅管理员可用~"
else:
try:
cmd_parts = msg.split(" ", 3)
if len(cmd_parts) != 4:
reply_msg = "❌ 指令格式错误!\n正确格式:gemini 设置 参数名 值\n示例:gemini 设置 冷却时间 600"
else:
param_map = {
"冷却时间": "COOLDOWN_SECONDS",
"插嘴概率": "TRIGGER_PROBABILITY",
"复读阈值": "REPEATER_THRESHOLD",
"复读概率": "REPEATER_EXEC_PROB",
"缓存字数上限": "MEMORY_LIMIT_CHARS",
"保留消息数": "MEMORY_KEEP_COUNT"
}
param_cn = cmd_parts[2]
param_value = cmd_parts[3]
if param_cn not in param_map:
valid_params = "、".join(param_map.keys())
reply_msg = f"❌ 参数名错误!\n支持的参数:{valid_params}"
else:
param_en = param_map[param_cn]
if param_en in ["COOLDOWN_SECONDS", "REPEATER_THRESHOLD", "MEMORY_LIMIT_CHARS", "MEMORY_KEEP_COUNT"]:
new_value = int(param_value)
else:
new_value = float(param_value)
mem.bot_config[param_en] = new_value
save_data()
reply_msg = f"✅ 【{group_id}群】参数修改成功!\n{param_cn} = {new_value}"
except ValueError:
reply_msg = "❌ 参数值格式错误!\n整数填数字(如 600),概率填小数(如 0.02)"
except Exception as e:
reply_msg = f"❌ 修改失败:{str(e)}"
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
elif msg_lower.startswith("gemini 打卡 "):
content = get_cmd_content("gemini 打卡 ")
if not content:
reply_msg = "❌ 打卡内容不能为空哦~比如:「gemini 打卡 篮球」"
else:
mem.checkin(str(user_id), name, content)
ranking = mem.generate_checkin_ranking()
reply_msg = f"✅ {name} 打卡「{content}」成功!\n{ranking}"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
elif msg_lower.startswith("gemini 删除 "):
content = get_cmd_content("gemini 删除 ")
if not content:
reply_msg = "❌ 删除内容不能为空哦~比如:「gemini 删除 篮球」"
else:
success, result = mem.delete_checkin(str(user_id), content)
if success:
ranking = mem.generate_checkin_ranking()
reply_msg = f"{result}\n{ranking}"
else:
reply_msg = result
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
elif msg_lower.strip() == "gemini 排行榜":
ranking = mem.generate_checkin_ranking()
reply_msg = f"{ranking}"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
elif msg_lower.startswith("gemini 管理打卡 "):
if user_id != ADMIN_QQ:
reply_msg = "❌ 你没有权限执行此操作!仅管理员可用~"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
cmd_content = get_cmd_content("gemini 管理打卡 ")
if not cmd_content:
reply_msg = "❌ 指令格式错误!\n正确格式:\ngemini 管理打卡 增加 目标 QQ 号 内容 次数\n示例:gemini 管理打卡 增加 123456 跑步 3\n\ngemini 管理打卡 删除 目标 QQ 号 内容 次数\n示例:gemini 管理打卡 删除 123456 跑步 1"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
cmd_parts = cmd_content.split()
if len(cmd_parts) != 4:
reply_msg = "❌ 指令参数不足!\n正确格式:gemini 管理打卡 增加/删除 目标 QQ 号 内容 次数"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
operation = cmd_parts[0]
target_qq = cmd_parts[1]
content = cmd_parts[2]
count_str = cmd_parts[3]
if operation not in ["增加", "删除"]:
reply_msg = "❌ 操作类型错误!仅支持「增加」或「删除」"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
if not target_qq.isdigit():
reply_msg = "❌ 目标 QQ 号必须是纯数字!"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
if not count_str.isdigit() or int(count_str) <= 0:
reply_msg = "❌ 次数必须是正整数!"
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
count = int(count_str)
try:
target_info = await bot.get_group_member_info(group_id=group_id, user_id=target_qq)
target_name = target_info.get('card') or target_info.get('nickname') or target_qq
except:
target_name = target_qq
op_map = {"增加": "add", "删除": "delete"}
success, result = mem.admin_checkin(
target_user_id=str(target_qq),
target_user_name=target_name,
content=content,
count=count,
operation=op_map[operation]
)
if success:
ranking = mem.generate_checkin_ranking()
reply_msg = f"{result}\n{ranking}"
else:
reply_msg = result
mem.add_message(name, msg, get_current_bj_time())
mem.add_message("Gemini", reply_msg, get_current_bj_time())
save_data()
await handler.finish(reply_msg)
if not msg:
msg = " "
mem.add_message(name, msg, unix_to_beijing(event.time))
save_data()
await mem.check_and_compress()
final_reply = None
if is_at_me(bot, event):
final_reply = await mem.generate_reply(msg, name, is_active_interrupt=False)
else:
should, content = await mem.check_active_intervention(msg)
if should and content:
final_reply = content
if final_reply:
mem.add_message("Gemini", final_reply, get_current_bj_time())
save_data()
await handler.finish(final_reply)
if __name__ == "__main__":
nonebot.run(host="0.0.0.0", port=8080)