LangChain 提供了一个强大的回调系统,允许开发者在 LLM(大语言模型)应用程序的各个阶段挂接自定义逻辑。这对于日志记录、性能监控、流式传输输出以及其他调试任务非常有用。通过合理使用 Callbacks,可以显著提升应用的透明度和可维护性。
1. LangChain Callbacks 模块提供的 Callback 接口一览
LangChain 的核心回调类是 BaseCallbackHandler。开发者可以通过继承此类并重写特定方法来捕获不同阶段的执行事件。以下是主要接口的说明:
class BaseCallbackHandler:
"""Base callback handler that can be used to handle callbacks from langchain."""
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> Any:
"""Run when LLM starts running."""
def on_chat_model_start(
self, serialized: Dict[str, Any], messages: List[List[BaseMessage]], **kwargs: Any
) -> Any:
"""Run when Chat Model starts running."""
def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
"""Run on new LLM token. Only available when streaming is enabled."""
def on_llm_end(self, response: LLMResult, **kwargs: Any) -> Any:
"""Run when LLM ends running."""
def on_llm_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when LLM errors."""
def on_chain_start(
self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
) -> Any:
"""Run when chain starts running."""
def on_chain_end(self, outputs: Dict[str, Any], **kwargs: Any) -> Any:
"""Run when chain ends running."""
def on_chain_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when chain errors."""
def on_tool_start(
self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
) -> Any:
"""Run when tool starts running."""
def on_tool_end(self, output: str, **kwargs: Any) -> Any:
"""Run when tool ends running."""
def on_tool_error(
self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
) -> Any:
"""Run when tool errors."""
def on_text(self, text: str, **kwargs: Any) -> Any:
"""Run on arbitrary text."""
def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
"""Run on agent action."""
def on_agent_finish(self, finish: AgentFinish, **kwargs: Any) -> Any:
"""Run on agent end."""
2. 最常用的 Callback:StdOutCallbackHandler
StdOutCallbackHandler 将所有事件的日志作为标准输出,打印到终端中。这是开发过程中最直观的调试方式。
注意:当 verbose 参数设置为 true 时,StdOutCallbackHandler 会被默认启用。这意味着即使不显式声明 callbacks,运行过程的日志也会全部打印到终端窗口中。
以下示例展示了三种设置 StdOutCallbackHandler 的方式:
from langchain.callbacks import StdOutCallbackHandler
from langchain.chains import LLMChain
from langchain_openai import OpenAI
from langchain.prompts import PromptTemplate
handler = StdOutCallbackHandler()
llm = OpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler])
chain.invoke({"number": 2})
chain_verbose = LLMChain(llm=llm, prompt=prompt, verbose=True)
chain_verbose.invoke({"number": 2})
chain_default = LLMChain(llm=llm, prompt=prompt)
chain_default.invoke({"number": 2}, {"callbacks": [handler]})
从运行结果可以看出,这三种方式均能实现相同的日志输出效果。开发者可根据具体场景选择全局配置或局部注入。
3. 各种类型的 Callback 实践
3.1 通用 Callback:BaseCallbackHandler
如果需要自定义行为,可以实现一个自己的 Callback Handler,继承自 BaseCallbackHandler,然后重写自己需要的回调函数即可。
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import HumanMessage
from langchain_openai import ChatOpenAI
class MyCustomHandler(BaseCallbackHandler):
def on_llm_new_token(self, token: str, **kwargs) -> None:
print(f"My custom handler, token: {token}", end="", flush=True)
chat = ChatOpenAI(max_tokens=25, streaming=True, callbacks=[MyCustomHandler()])
chat([HumanMessage(content="Tell me a joke")])
3.2 异步 Callback:AsyncCallbackHandler
有时候在 Callback 内需要进行大量的数据处理(如写入数据库、发送网络请求),如果使用同步 Callback,会阻塞主线程运行,导致响应变慢。此时异步 Callback 就非常有用。
实现一个自己的 Callback Handler,继承自 AsyncCallbackHandler,然后重写自己需要的回调函数即可。注意异步方法通常以 async_ 开头或在签名中包含 await。
import asyncio
from langchain.callbacks.base import AsyncCallbackHandler
from langchain.schema import HumanMessage
from langchain_openai import ChatOpenAI
class MyCustomAsyncHandler(AsyncCallbackHandler):
"""Async callback handler that can be used to handle callbacks from langchain."""
async def on_llm_new_token(self, token: str, **kwargs) -> None:
await asyncio.sleep(0.1)
print(f"Async token received: {token}")
chat = ChatOpenAI(max_tokens=25, streaming=True, callbacks=[MyCustomAsyncHandler()])
chat([HumanMessage(content="Explain quantum computing")])
3.3 写日志 / 日志文件:FileCallbackHandler
开发项目过程中,写日志是重要的调试手段之一。正式的项目中,不能总是将日志输出到终端中,这样无法传递和保存。FileCallbackHandler 可以将日志写入本地文件。
from langchain.callbacks import FileCallbackHandler
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAI
logfile = "output.log"
handler = FileCallbackHandler(logfile)
llm = OpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler], verbose=True)
answer = chain.run(number=2)
日志解析提示:
有时生成的 log 文件包含 ANSI 转义字符导致乱码,可以使用 ansi2html 库进行解析展示:
pip install --upgrade ansi2html ipython
from ansi2html import Ansi2HTMLConverter
from IPython.display import HTML, display
with open("output.log", "r", encoding="utf-8") as f:
content = f.read()
conv = Ansi2HTMLConverter()
html = conv.convert(content, full=True)
display(HTML(html))
3.4 Token 计数:get_openai_callback
Token 消耗直接关系到成本,因此知道程序运行中使用了多少 Token 非常重要。通过 get_openai_callback 上下文管理器可以轻松获取 Token 消耗统计。
from langchain.callbacks import get_openai_callback
from langchain_openai import OpenAI
llm = OpenAI(temperature=0)
with get_openai_callback() as cb:
llm("What is the square root of 4?")
total_tokens = cb.total_tokens
print("total_tokens: ", total_tokens)
4. 最佳实践与注意事项
- 性能影响:频繁的回调处理可能会增加延迟。对于高并发场景,建议使用异步回调或减少不必要的日志级别。
- 错误处理:在
on_llm_error 或 on_chain_error 中应妥善记录异常堆栈信息,避免掩盖真实问题。
- 敏感数据:确保回调中不包含用户隐私或 API Key 等敏感信息的明文输出。
- 全局配置:可以在 LangChain 的初始化配置中设置默认的 handlers,但建议优先在具体的 Chain 或 Agent 实例中显式指定,以保证控制粒度。
5. 总结
本文详细讲解了 LangChain 的 Callbacks 模块,涵盖了从基础接口到高级异步处理的多种用法。通过实践 StdOutCallbackHandler、FileCallbackHandler 以及自定义 Handler,开发者可以实现日志记录、Token 计数、流式输出监控等功能。这些机制对于 Debug 程序和监控程序的各个阶段至关重要,有助于构建更健壮、可观测的大模型应用。