这里保留最近 20 轮对话,而不是无限累加,有助于控制上下文膨胀。
7.4 情景记忆的写入接口
from datetime import datetime
from typing import Dict, Any
class EpisodicMemoryRepository:
def __init__(self, db_client):
self.db = db_client
def record_event(
self,
patient_id: str,
event_type: str,
payload: Dict[str, Any],
model_version: str,
trace_id: str
) -> None:
row = {
"patient_id": patient_id,
"event_type": event_type,
"payload": payload,
"model_version": model_version,
"trace_id": trace_id,
"created_at": datetime.utcnow().isoformat()
}
self.db.insert(row)
工程上建议把写入接口设计成'结构化事件',而不是自由文本日志,这样后续分析、统计和训练样本构造都会更方便。
7.5 异步运行主循环的稳健版本
import asyncio
import aio_pika
import json
from contextlib import suppress
class AgentRuntime:
def __init__(self, amqp_url: str, request_queue: ):
.amqp_url = amqp_url
.request_queue = request_queue
._shutdown = asyncio.Event()
() -> :
{: , : payload}
():
message.process(requeue=):
payload = json.loads(message.body.decode())
response = asyncio.wait_for(.handle_payload(payload), timeout=)
(, response)
():
connection = aio_pika.connect_robust(.amqp_url)
channel = connection.channel()
channel.set_qos(prefetch_count=)
queue = channel.declare_queue(.request_queue, durable=)
queue.consume(.process_message)
:
._shutdown.wait()
:
suppress(Exception):
channel.close()


