import google.generativeai as genai
import asyncio
import datetime
import pytz
import pickle
import os
import random
API_KEY = "在这里粘贴你的 Google_API_Key"
DATA_FILE = "bot_memory.pkl"
genai.configure(api_key=API_KEY)
chat_model = genai.GenerativeModel('gemini-1.5-flash')
class GroupMemory:
def __init__(self, group_id):
self.group_id = group_id
self.summary = "暂无早期历史记录。"
self.buffer = []
self.lock = asyncio.Lock()
self.last_active_time = 0
self.last_msg_content = ""
self.repeat_count = 0
self.bot_config = {
"COOLDOWN_SECONDS": 0,
"TRIGGER_PROBABILITY": 0.05,
"REPEATER_THRESHOLD": 2,
"REPEATER_EXEC_PROB": 0.1,
"MEMORY_LIMIT_CHARS": 10000,
"MEMORY_KEEP_COUNT": 100
}
self.checkin_data = {}
def __getstate__(self):
state = self.__dict__.copy()
del state['lock']
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.lock = asyncio.Lock()
def checkin(self, user_id, user_name, content):
if user_id not in self.checkin_data:
self.checkin_data[user_id] = {"name": "", "total_stars": 0, "records": {}}
user_data = self.checkin_data[user_id]
user_data["name"] = user_name
if content not in user_data["records"]:
user_data["records"][content] = 0
user_data["records"][content] += 1
user_data["total_stars"] += 1
save_data()
def delete_checkin(self, user_id, content):
if user_id not in self.checkin_data:
return False, "你还没有任何打卡记录哦~"
user_data = self.checkin_data[user_id]
if content not in user_data["records"] or user_data["records"][content] == 0:
return False, f"你没有「{content}」的打卡记录~"
user_data["records"][content] -= 1
user_data["total_stars"] -= 1
if user_data["records"][content] == 0:
del user_data["records"][content]
if not user_data["records"]:
del self.checkin_data[user_id]
save_data()
return True, f"已删除「{content}」的 1 次打卡记录~"
def admin_checkin(self, target_user_id, target_user_name, content, count, operation):
if target_user_id not in self.checkin_data:
self.checkin_data[target_user_id] = {"name": target_user_name, "total_stars": 0, "records": {}}
user_data = self.checkin_data[target_user_id]
user_data["name"] = target_user_name
if content not in user_data["records"]:
user_data["records"][content] = 0
if operation == "add":
user_data["records"][content] += count
user_data["total_stars"] += count
save_data()
return True, f"已为用户【{target_user_name}】增加「{content}」打卡记录 {count} 次~"
elif operation == "delete":
if user_data["records"][content] < count:
return False, f"用户【{target_user_name}】的「{content}」打卡记录不足 {count} 次!"
user_data["records"][content] -= count
user_data["total_stars"] -= 1
if user_data["records"][content] == 0:
del user_data["records"][content]
if not user_data["records"]:
del self.checkin_data[target_user_id]
save_data()
return True, f"已为用户【{target_user_name}】删除「{content}」打卡记录 {count} 次~"
else:
return False, "操作类型错误!仅支持「增加」或「删除」"
def calculate_medals(self, stars):
crown = stars // (4*4*4)
stars %= (4*4*4)
sun = stars // (4*4)
stars %= (4*4)
moon = stars // 4
star = stars % 4
medals = ""
if crown > 0: medals += "👑" * crown
if sun > 0: medals += "☀️" * sun
if moon > 0: medals += "🌙" * moon
if star > 0 or (crown+sun+moon+star == 0): medals += "⭐" * star
return medals
def generate_checkin_ranking(self):
if not self.checkin_data:
return "当前还没有打卡记录哦~快叫群友一起打卡吧!"
sorted_users = sorted(self.checkin_data.values(), key=lambda x: x["total_stars"], reverse=True)
ranking = "🏆 打卡记录排行榜 🏆\n"
for idx, user in enumerate(sorted_users, 1):
total_medals = self.calculate_medals(user["total_stars"])
ranking += f"{idx}. {user['name']}{total_medals}\n"
for content, count in user["records"].items():
content_medals = self.calculate_medals(count)
ranking += f" · {content}{content_medals}\n"
return ranking
def add_message(self, name, msg, time_str):
entry = f"[{time_str}] 【{name}】: {msg}"
self.buffer.append(entry)
clean_msg = msg.strip()
if clean_msg == self.last_msg_content and clean_msg:
self.repeat_count += 1
else:
self.last_msg_content = clean_msg
self.repeat_count = 1
def estimate_length(self):
return sum(len(m) for m in self.buffer)
async def check_and_compress(self):
async with self.lock:
if self.estimate_length() > self.bot_config["MEMORY_LIMIT_CHARS"] and len(self.buffer) > self.bot_config["MEMORY_KEEP_COUNT"]:
to_compress = self.buffer[:-self.bot_config["MEMORY_KEEP_COUNT"]]
text_block = "\n".join(to_compress)
prompt = f""" 你是记忆整理员,更新长期记忆摘要(800 字内):
【长期记忆】:{self.summary}
【待归档对话】:{text_block}"""
try:
loop = asyncio.get_running_loop()
resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(prompt))
if resp.candidates[0].content.parts[0].text:
self.summary = resp.candidates[0].content.parts[0].text
self.buffer = self.buffer[-self.bot_config["MEMORY_KEEP_COUNT"]:]
save_data()
except Exception as e:
print(f"压缩记忆失败:{e}")
async def generate_reply(self, current_question, sender_name, is_active_interrupt=False):
tz = pytz.timezone('Asia/Shanghai')
context_str = "\n".join(self.buffer)
if is_active_interrupt:
task_prompt = "没人艾特你,自然加入讨论,简短、像群友,可复读/玩梗,直接说话不加称呼,绝对不要输出任何时间戳/日期/时间格式,不要添加自己的名字前缀(如【Gemini】)。"
else:
if not current_question.strip():
task_prompt = f"用户【{sender_name}】艾特了你,自然接话,绝对不要输出任何时间戳/日期/时间格式,不要添加自己的名字前缀(如【Gemini】)。"
else:
task_prompt = f"用户【{sender_name}】提问:{current_question},请回答,绝对不要输出任何时间戳/日期/时间格式,不要添加自己的名字前缀(如【Gemini】)。"
final_prompt = f""" 你是 QQ 群里的 AI 伙伴 Gemini。
【长期记忆】:{self.summary}
【近期对话】:{context_str}
【任务】:{task_prompt}
【规则】:
1. 问时间时只说当前北京时间(比如'现在是下午 3 点 20 分'),但不要输出任何括号/时间戳/数字格式的时间;
2. 回复时禁止添加自己的名字或任何标识前缀(如【Gemini】),直接输出回复内容。 """
try:
loop = asyncio.get_running_loop()
resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(final_prompt))
return resp.candidates[0].content.parts[0].text if resp.candidates[0].content.parts[0].text else "(暂时无法回复)"
except Exception as e:
return f"(回复失败:{str(e)})"
async def check_active_intervention(self, current_msg):
import time
now = time.time()
if now - self.last_active_time < self.bot_config["COOLDOWN_SECONDS"]:
return False, None
if (self.repeat_count >= self.bot_config["REPEATER_THRESHOLD"] and random.random() < self.bot_config["REPEATER_EXEC_PROB"] and current_msg.strip()):
try:
loop = asyncio.get_running_loop()
check_resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(f"判断'{current_msg}'是否是正常梗(非广告/脏话):是回 YES,否回 NO"))
if "YES" in check_resp.candidates[0].content.parts[0].text.strip().upper():
self.last_active_time = now
return True, current_msg
except:
pass
return False, None
if "gemini" not in current_msg.lower() and random.random() > self.bot_config["TRIGGER_PROBABILITY"]:
return False, None
try:
loop = asyncio.get_running_loop()
decision_resp = await loop.run_in_executor(None, lambda: chat_model.generate_content(f""" 最近消息:{chr(10).join(self.buffer[-15:])}
当前消息:{current_msg}
没被艾特,是否有必要说话?满足(客观问题无人答/讨论你/有趣的氛围/有趣的话题)回 YES,否则 NO """))
if "YES" in decision_resp.candidates[0].content.parts[0].text.strip().upper():
reply = await self.generate_reply("", "群友", is_active_interrupt=True)
self.last_active_time = now
return True, reply
except Exception as e:
print(f"主动介入失败:{e}")
return False, None
memories = {}
def save_data():
try:
with open(DATA_FILE, 'wb') as f:
pickle.dump(memories, f)
except Exception as e:
print(f"保存记忆失败:{e}")
def load_data():
global memories
if os.path.exists(DATA_FILE):
try:
with open(DATA_FILE, 'rb') as f:
memories = pickle.load(f)
except:
memories = {}
def get_memory(group_id):
if group_id not in memories:
memories[group_id] = GroupMemory(group_id)
return memories[group_id]
load_data()