在构建长期运行的智能体时,上下文窗口管理至关重要。这里保留最近 20 轮对话,而不是无限累加,有助于控制上下文膨胀,避免 Token 成本失控。
7.4 情景记忆的写入接口
工程上建议把写入接口设计成'结构化事件',而不是自由文本日志,这样后续分析、统计和训练样本构造都会更方便。我们定义一个 EpisodicMemoryRepository 类来封装数据库操作:
from datetime import datetime
from typing import Dict, Any
class EpisodicMemoryRepository:
def __init__(self, db_client):
self.db = db_client
def record_event(self, patient_id: str, event_type: str, payload: Dict[str, Any], model_version: str, trace_id: str) -> None:
row = {
"patient_id": patient_id,
"event_type": event_type,
"payload": payload,
"model_version": model_version,
"trace_id": trace_id,
"created_at": datetime.utcnow().isoformat()
}
self.db.insert(row)
注意这里使用了 datetime.utcnow() 统一时间戳格式,方便后续查询排序。trace_id 的引入则保证了全链路追踪的能力,这在排查医疗场景下的推理异常时非常关键。
7.5 异步运行主循环的稳健版本
接下来看服务端的运行逻辑。使用 aio_pika 处理消息队列是 Python 生态中常见的选择,但要注意背压处理和优雅关闭。
import asyncio
import aio_pika
import json
from contextlib import suppress
:
():
.amqp_url = amqp_url
.request_queue = request_queue
._shutdown = asyncio.Event()
() -> :
{: , : payload}
():
message.process(requeue=):
payload = json.loads(message.body.decode())
response = asyncio.wait_for(.handle_payload(payload), timeout=)
(, response)
():
connection = aio_pika.connect_robust(.amqp_url)
channel = connection.channel()
channel.set_qos(prefetch_count=)
queue = channel.declare_queue(.request_queue, durable=)
queue.consume(.process_message)
:
._shutdown.wait()
:
suppress(Exception):
channel.close()
connection.close()


