跳到主要内容基于 MetaGPT 构建 LLM 订阅服务与智能 Agent | 极客日志PythonAI
基于 MetaGPT 构建 LLM 订阅服务与智能 Agent
本文介绍了基于 MetaGPT 框架构建 LLM 订阅服务的完整流程。通过自定义爬虫 Action 爬取 GitHub Trending 或 Huggingface 论文数据,结合 LLM 进行内容分析与总结,并利用 SubscriptionRunner 模块实现定时触发。最终通过 Discord 机器人或 QQ 邮箱回调函数将结果推送到用户端。文章涵盖了环境配置、代码实现细节及扩展方法,帮助开发者快速搭建自动化信息聚合 Agent。
FlinkHero2 浏览 基于 MetaGPT 构建 LLM 订阅服务与智能 Agent
一、介绍
在开源社区中,及时获取最新的技术发展信息和资源对于开发者至关重要。GitHub Trending 页面展示了当前最受欢迎的开源项目,是了解技术趋势的绝佳窗口。本文旨在利用 MetaGPT 框架构建一个 OSS(Open Source Software)订阅 Agent,实现自动爬取热门仓库信息、分析总结并通过指定渠道(如 Discord 或邮箱)推送的功能。
- 基于 Python 构建 GitHub Trending 爬虫 Action
- 基于 MetaGPT 构建仓库内容生成 Action
- 使用 SubscriptionRunner 模块实现定时触发
- 构建 Discord 机器人及邮件订阅回调
二、OSS 订阅 Agent 实现
1. 需求分析
MetaGPT 中的 SubscriptionRunner 模块提供了 Role 的定时运行方式。基于该类,我们可以定时触发运行一个 Role,并将执行输出通知给用户。核心要素包括:
- Role:智能体本身
- Trigger:触发器(决定何时运行)
- Callback:回调函数(处理运行结果)
- 构建一个爬虫
Action,从 GitHub Trending 爬取感兴趣的开源类目信息。
- 设置
Trigger,例如每天早上 9 点定时爬取。
- 设计
Callback,将爬取的信息根据 Prompt 模版整理为每日推送的文章,并发送到订阅平台(Discord、Telegram 或 QQ 邮箱)。
2. OSSAgent Role 实现
我们需要构建两个主要 Action:爬虫 Crawl Action 和 模版输出 Action。
(1) Crawl Action 实现
① 爬取方式确定
- 基于 API:最高效稳定,但 GitHub Trending 未提供公开 API。
- 基于自动化页面抓取:如 Selenium,稳定但开发复杂,需配置环境。
- 基于网页爬虫:使用
requests 或 aiohttp 配合 BeautifulSoup,简单高效。
鉴于 MetaGPT 框架对异步的支持,我们选择使用 aiohttp 库进行数据爬取。
注意:如果爬取目标为国外网页,但代码在国内本地运行,请修改 metagpt 项目文件夹下的 config/key.yaml 中配置代理服务器地址:
GLOBAL_PROXY: http://127.0.0.1:8118
② 确定爬取目标
我们需要爬取的数据包括:仓库名称、链接、简介、Star 数、Fork 数、Readme 文件等。
筛选条件建议:语言(Python/Node.js)、时间范围(This week)、关注目标(Repositories)。
③ 解析网页
进入目标 URL 页面,按 F12 打开开发者工具,定位元素。每个仓库项目信息存储在 class 为 Box-row 的元素中。
关键定位符:
- 单仓库所有信息元素类:
Box-row
- 仓库名称:
h2 a
- 仓库链接:
h2 a 的 href 属性
- Star/Fork:
a.Link--muted[0] / [1]
④ 仓库列表数据爬取
import aiohttp
import asyncio
from bs4 import BeautifulSoup
import json
import os
from github import Github
from dotenv import load_dotenv
load_dotenv()
class CrawlAction:
async def run(self, url: str = "https://github.com/trending/python?since=weekly"):
async with aiohttp.ClientSession() as client:
async with client.get(url) as response:
response.raise_for_status()
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
repositories = []
g = Github(os.getenv("GITHUB_ACCESS_TOKEN"))
for article in soup.select('article.Box-row'):
repo_info = {}
try:
repo_info['name'] = article.select_one('h2 a').text.strip().replace('\n', '').replace(' ', '')
repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()
repo = g.get_repo(repo_info['name'])
contents = repo.get_contents("README.md", ref="main")
repo_info['readme'] = contents.decoded_content.decode()
except Exception as e:
print(f"Error getting README.md file for {repo_info.get('name', 'unknown')}: {e}")
continue
description_element = article.select_one('p')
repo_info['description'] = description_element.text.strip() if description_element else None
language_element = article.select_one('span[itemprop="programmingLanguage"]')
repo_info['language'] = language_element.text.strip() if language_element else None
stars_element = article.select('a.Link--muted')[0]
forks_element = article.select('a.Link--muted')[1]
repo_info['stars'] = stars_element.text.strip()
repo_info['forks'] = forks_element.text.strip()
today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
repo_info['week_stars'] = today_stars_element.text.strip() if today_stars_element else None
if repo_info.get("readme"):
repositories.append(repo_info)
return json.dumps(repositories)
(2) AnalysisOSSRepository Action 实现
获取数据后,让 LLM 进行分析。我们将单个仓库信息列为 Prompt 模版字符串,设定 LLM 的处理逻辑和输出格式。
from typing import Any
from metagpt.actions.action import Action
import json
class AnalysisOSSRepository(Action):
def prompt_format(self, repo_info):
question = f"""# 需求
您是一名 GitHub 仓库分析师,旨在为用户提供有见地的、个性化的仓库分析。
关于仓库的标题
仓库分析:深入探索 {repo_info['name']} 项目的特点和优势!
---
格式示例
## 项目名称
### 项目地址
xxx
### 仓库介绍
xxx 是一个用于 xxx 的开源项目。它使用 xxx 技术栈实现。
### 特点和优势
- 特点 1
- 特点 2
### 部署和使用
可以通过 Docker 或本地部署的方式使用该项目。
---
当前已有信息如下:
项目名称:{repository_name}
项目地址:{repository_URL}
项目 Star:{repository_star}
项目 Fork:{repository_fork}
项目语言:{repository_language}
项目 readme:{repository_readme}
""".format(
repository_name=repo_info["name"],
repository_URL=repo_info["url"],
repository_star=repo_info["stars"],
repository_fork=repo_info["forks"],
repository_language=repo_info["language"],
repository_readme=repo_info["readme"]
)
return question
async def run(self, repo_info_list: Any):
repo_summary_list = []
for repo_info in json.loads(repo_info_list):
repository_info = self.prompt_format(repo_info)
summary = await self._aask(repository_info)
repo_summary_list.append(summary)
return repo_summary_list
(3) OSSWatchAgent 角色设计
from metagpt.roles import Role
from metagpt.logs import logger
from metagpt.schema import Message
class OssWatcher(Role):
def __init__(self):
super().__init__(
name="cheems",
profile="OssWatcher",
goal="根据我提供给你的资料生成一个有见地的 GitHub 仓库分析报告。",
constraints="仅基于提供的 GitHub 仓库数据进行分析。",
)
self.set_actions([CrawlAction(), AnalysisOSSRepository()])
self._set_react_mode(react_mode="by_order")
async def _act(self) -> Message:
logger.info(f"{self._setting}: ready to {self.rc.todo}")
todo = self.rc.todo
msg = self.get_memories(k=1)[0]
result = await todo.run(msg.content)
new_msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
self.rc.memory.add(new_msg)
return result
(4) 定时触发器(Trigger)的实现
from pytz import timezone
from aiocron import crontab
from typing import Optional
from pytz import BaseTzInfo
from metagpt.schema import Message
class GithubTrendingCronTrigger:
def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending"):
self.crontab = crontab(spec, tz=tz)
self.url = url
def __aiter__(self):
return self
async def __anext__(self):
await self.crontab.next()
return Message(content=self.url)
beijing_tz = timezone('Asia/Shanghai')
cron_trigger = GithubTrendingCronTrigger("0 9 * * *", tz=beijing_tz)
(5) Callback 设计
以 Discord 为例,实现 Agent 自动发布订阅信息。
import discord
import os
from metagpt.config import CONFIG
async def discord_callback(msg_list):
intents = discord.Intents.default()
intents.message_content = True
token = os.environ["DISCORD_TOKEN"]
channel_id = int(os.environ["DISCORD_CHANNEL_ID"])
client = discord.Client(intents=intents)
async with client:
await client.login(token)
channel = await client.fetch_channel(channel_id)
for repo in msg_list:
lines = []
for line in repo.splitlines():
if line.startswith(("# ", "## ", "### ")):
if lines:
await channel.send("\n".join(lines))
lines = []
lines.append(line)
if lines:
await channel.send("\n".join(lines))
3. 基于 QQ 邮箱实现 Huggingface 论文总结订阅 Agent
除了 GitHub,我们还可以爬取 Huggingface 论文页面内容,总结为文档并发送到 QQ 邮箱。
环境配置
GITHUB_ACCESS_TOKEN
QQ_EMAIL_USER
QQ_EMAIL_PASSWORD
QQ_EMAIL_TO
pip install aiohttp beautifulsoup4 pydantic python-dotenv discord.py aiocron smtplib fire
完整代码示例
import asyncio
import os
import smtplib
from typing import Any, AsyncGenerator, Awaitable, Callable, Dict, Optional, List
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from github import Github
import aiohttp
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo
import json
from metagpt.actions.action import Action
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message
from dotenv import load_dotenv
load_dotenv()
class SubscriptionRunner(BaseModel):
tasks: Dict[Role, asyncio.Task] = Field(default_factory=dict)
class Config:
arbitrary_types_allowed = True
async def subscribe(
self,
role: Role,
trigger: AsyncGenerator[Message, None],
callback: Callable[[Message], Awaitable[None]],
):
loop = asyncio.get_running_loop()
async def _start_role():
async for msg in trigger:
resp = await role.run(msg)
await callback(resp)
self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")
async def unsubscribe(self, role: Role):
task = self.tasks.pop(role)
task.cancel()
async def run(self, raise_exception: bool = True):
while True:
for role, task in self.tasks.items():
if task.done():
if task.exception():
if raise_exception:
raise task.exception()
logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
else:
logger.warning(f"Task {task.get_name()} has completed.")
self.tasks.pop(role)
break
else:
await asyncio.sleep(1)
class CrawlAction(Action):
async def run(self, url: str = "https://huggingface.co/papers"):
async with aiohttp.ClientSession() as client:
papers = await self._fetch_papers(url, client)
return json.dumps(papers[:5])
async def _fetch_papers(self, url: str, client: aiohttp.ClientSession):
async with client.get(url) as response:
response.raise_for_status()
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
papers = []
for article in soup.select('h3 > a[href^="/papers/"]')[:5]:
paper_info = {}
paper_info['title'] = article.text.strip()
paper_info['url'] = "https://huggingface.co" + article['href'].strip()
paper_html = await self._fetch_paper_detail(paper_info['url'], client)
paper_soup = BeautifulSoup(paper_html, 'html.parser')
paper_info['abstract'] = paper_soup.find("section").get_text(separator=' ', strip=True).strip()
papers.append(paper_info)
return papers
async def _fetch_paper_detail(self, url: str, client: aiohttp.ClientSession) -> str:
async with client.get(url) as response:
response.raise_for_status()
return await response.text()
class AnalysisPaper(Action):
def prompt_format(self, paper_info):
question = f"""# 需求
您是一名学术论文分析师,旨在为用户提供有见地的、个性化的论文分析。
论文分析:深入探索 {paper_info['title']} 论文的特点和贡献!
---
格式示例
# 论文标题
## 论文链接
## 论文摘要
## 研究背景
## 研究方法
## 实验结果
## 结论
---
当前已有信息如下:
论文标题:{paper_title}
论文链接:{paper_URL}
论文摘要:{paper_abstract}
""".format(
paper_title=paper_info["title"],
paper_URL=paper_info["url"],
paper_abstract=paper_info["abstract"]
)
return question
async def run(self, paper_info_list: Any):
paper_summary_list = []
for paper_info in json.loads(paper_info_list):
paper_info_str = self.prompt_format(paper_info)
summary = await self._aask(paper_info_str)
paper_summary_list.append(summary)
return paper_summary_list
class PaperWatcher(Role):
def __init__(self):
super().__init__(
name="cheems",
profile="PaperWatcher",
goal="根据我提供给你的资料生成一个有见地的学术论文分析报告。",
constraints="仅基于提供的论文数据进行分析。",
)
self.set_actions([CrawlAction(), AnalysisPaper()])
self._set_react_mode(react_mode="by_order")
async def _act(self) -> Message:
logger.info(f"{self._setting}: ready to {self.rc.todo}")
todo = self.rc.todo
msg = self.get_memories(k=1)[0]
result = await todo.run(msg.content)
new_msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
self.rc.memory.add(new_msg)
return result
class HuggingfacePapersCronTrigger:
def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://huggingface.co/papers"):
self.crontab = crontab(spec, tz=tz)
self.url = url
def __aiter__(self):
return self
async def __anext__(self):
await self.crontab.next()
return Message(content=self.url)
async def email_callback(paper_summary_list: List):
gmail_user = os.getenv("QQ_EMAIL_USER")
gmail_password = os.getenv("QQ_EMAIL_PASSWORD")
to = os.getenv("QQ_EMAIL_TO")
subject = "Huggingface Papers Weekly Digest"
body = "\n\n".join(paper_summary_list)
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = gmail_user
msg['To'] = to
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP('smtp.qq.com', 587)
server.starttls()
server.login(gmail_user, gmail_password)
server.send_message(msg)
server.quit()
print("send success!")
async def main(spec: str = "* * * * *", email: bool = True):
callbacks = []
if email:
callbacks.append(email_callback)
if not callbacks:
async def _print(msg: Message):
print(msg.content)
callbacks.append(_print)
async def callback(msg):
await asyncio.gather(*(call(msg) for call in callbacks))
runner = SubscriptionRunner()
await runner.subscribe(PaperWatcher(), HuggingfacePapersCronTrigger(spec), callback)
await runner.run()
if __name__ == "__main__":
import fire
fire.Fire(main)
四、总结
本文详细介绍了如何利用 MetaGPT 订阅模块构建智能 Agent。关键点如下:
- 修改订阅平台:只需修改
callback 回调函数。
- 修改爬取网站:只需修改
Crawl Action。
- 修改总结内容风格:只需修改
AnalysisOSSRepository 或 AnalysisPaper 模块。
通过灵活组合这些模块,可以实现各种定制化的信息订阅服务。
相关免费在线工具
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online