Python 3.12 logging - 20 - handlers - QueueHandler

Python 3.12 logging - 20 - handlers - QueueHandler

Python 3.12 logging handler - QueueHandler详解

在 Python 的 logging 模块中,QueueHandlerQueueListener 是一对黄金搭档,用于实现异步日志记录。它们通过一个线程安全的队列将日志产生与日志处理分离,从而避免 I/O 操作阻塞业务线程,特别适合高并发、多线程环境。本文将详细介绍它们的工作原理,并通过两个线程安全的 Demo 展示其用法。


一、为什么需要 QueueHandler + QueueListener?

在多线程程序中,如果多个线程直接向同一个文件写入日志,会面临锁竞争和 I/O 阻塞问题:

  • 每个日志写入都需要获取文件锁,导致线程排队等待。
  • 磁盘 I/O 可能耗时较长,阻塞业务逻辑。

QueueHandler 将日志记录放入内存队列后立即返回,业务线程无需等待日志真正写入。QueueListener 在单独的线程中从队列取出记录,并交由实际的处理器(如 FileHandler)进行处理。这样既保证了日志的线程安全,又大幅提升了程序性能。


二、核心组件

1. QueueHandler

  • 继承自 logging.Handler
  • 将日志记录放入指定的队列(通常为 queue.Queue)。
  • 由于队列本身是线程安全的,多个线程可以安全地向其放入记录。

2. QueueListener

  • 位于 logging.handlers 模块。
  • 在一个独立线程中运行,不断从队列中取出日志记录。
  • 将取出的记录传递给一个或多个目标处理器(如 FileHandlerStreamHandler)进行处理。
  • 可以通过 respect_handler_level 参数控制是否尊重目标处理器的级别过滤。

三、Demo 1:基础多线程异步日志

本示例演示如何在多线程环境中使用 QueueHandlerQueueListener,确保日志有序写入文件而不阻塞工作线程。

代码

import logging import logging.handlers import queue import threading import time import random defworker(logger, thread_id):"""模拟工作线程,随机产生不同级别的日志"""for i inrange(5): level = random.choice([logging.INFO, logging.WARNING, logging.ERROR]) logger.log(level,f"Thread-{thread_id} message {i}") time.sleep(random.uniform(0.1,0.3))defmain():# 1. 创建一个线程安全的队列 log_queue = queue.Queue()# 2. 创建 QueueHandler,并设置级别(可选) queue_handler = logging.handlers.QueueHandler(log_queue) queue_handler.setLevel(logging.DEBUG)# 3. 创建目标处理器(真正执行输出的处理器) file_handler = logging.FileHandler('async.log', encoding='utf-8') file_handler.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'))# 4. 创建 QueueListener,将队列中的日志交给 file_handler 处理 listener = logging.handlers.QueueListener(log_queue, file_handler, respect_handler_level=True)# 5. 配置根日志器,添加 QueueHandler root_logger = logging.getLogger() root_logger.setLevel(logging.DEBUG) root_logger.addHandler(queue_handler)# 6. 启动 QueueListener listener.start()# 7. 启动多个工作线程 threads =[]for i inrange(3): t = threading.Thread(target=worker, args=(root_logger, i), name=f"Worker-{i}") t.start() threads.append(t)# 8. 等待所有工作线程结束for t in threads: t.join()# 9. 停止监听器(确保所有日志被处理完) listener.stop()print("所有日志已写入 async.log,请查看文件。")if __name__ =='__main__': main()

逐行解析

代码说明
log_queue = queue.Queue()创建一个线程安全的队列,用于在 QueueHandlerQueueListener 之间传递日志记录。
queue_handler = logging.handlers.QueueHandler(log_queue)创建 QueueHandler,并关联到队列。该处理器会将日志记录放入队列。
queue_handler.setLevel(logging.DEBUG)设置处理器级别,所有级别日志都会放入队列(因为根日志器级别也是 DEBUG)。
file_handler = logging.FileHandler('async.log', encoding='utf-8')创建实际写入文件的处理器,并设置格式。
listener = logging.handlers.QueueListener(log_queue, file_handler, respect_handler_level=True)创建 QueueListener,指定队列和要调用的处理器。respect_handler_level=True 表示在将记录传递给处理器时,会检查处理器自身的级别。
root_logger.addHandler(queue_handler)QueueHandler 添加到根日志器。此后所有通过根日志器发出的日志都会被放入队列。
listener.start()启动监听器线程,开始从队列取记录并处理。
worker 函数模拟工作线程,随机产生日志,并模拟一些耗时操作。
listener.stop()停止监听器,它会等待队列中的剩余记录被处理完毕后才返回。

线程安全分析

  • queue.Queue 内部使用锁保护,多个线程同时 put 是安全的,不会造成数据损坏。
  • QueueListener 在单个线程中依次处理记录,因此对 file_handler 的调用是串行的,不会发生多个线程竞争文件锁的情况。
  • 工作线程只需将日志放入队列,无需等待 I/O,因此可以并发执行,极大提高了吞吐量。

运行结果

2026-03-01 xx:xx:xx,879 - Worker-0 - INFO - Thread-0 message 0 2026-03-01 xx:xx:xx,880 - Worker-1 - ERROR - Thread-1 message 0 2026-03-01 xx:xx:xx,880 - Worker-2 - ERROR - Thread-2 message 0 2026-03-01 xx:xx:xx,990 - Worker-2 - WARNING - Thread-2 message 1 2026-03-01 xx:xx:xx,116 - Worker-0 - WARNING - Thread-0 message 1 2026-03-01 xx:xx:xx,168 - Worker-1 - INFO - Thread-1 message 1 2026-03-01 xx:xx:xx,174 - Worker-2 - ERROR - Thread-2 message 2 2026-03-01 xx:xx:xx,251 - Worker-0 - ERROR - Thread-0 message 2 2026-03-01 xx:xx:xx,302 - Worker-1 - ERROR - Thread-1 message 2 2026-03-01 xx:xx:xx,463 - Worker-2 - WARNING - Thread-2 message 3 2026-03-01 xx:xx:xx,492 - Worker-1 - INFO - Thread-1 message 3 2026-03-01 xx:xx:xx,548 - Worker-0 - ERROR - Thread-0 message 3 2026-03-01 xx:xx:xx,640 - Worker-1 - INFO - Thread-1 message 4 2026-03-01 xx:xx:xx,733 - Worker-2 - INFO - Thread-2 message 4 2026-03-01 xx:xx:xx,760 - Worker-0 - INFO - Thread-0 message 4 

四、Demo 2:模拟高并发写入与性能对比

本示例展示在高并发下,使用 QueueHandler 与直接使用 FileHandler 的性能差异,并验证日志记录的完整性。

代码

import logging import logging.handlers import queue import threading import time defdirect_logging(logger, count, thread_id):"""直接使用 FileHandler 写入日志(对比组)"""for i inrange(count): logger.info(f"Direct-{thread_id} msg {i}")defasync_logging(logger, count, thread_id):"""通过 QueueHandler 写入日志(测试组)"""for i inrange(count): logger.info(f"Async-{thread_id} msg {i}")defmeasure_time(func,*args,**kwargs): start = time.perf_counter() func(*args,**kwargs)return time.perf_counter()- start defmain():# ---------- 直接写入方式 ---------- direct_logger = logging.getLogger('direct') direct_logger.setLevel(logging.INFO) direct_handler = logging.FileHandler('direct.log', encoding='utf-8') direct_handler.setFormatter(logging.Formatter('%(message)s')) direct_logger.addHandler(direct_handler)# 启动多个线程直接写日志 threads =[]for i inrange(10): t = threading.Thread(target=direct_logging, args=(direct_logger,100, i)) t.start() threads.append(t)for t in threads: t.join()# ---------- 异步方式 ---------- log_queue = queue.Queue() queue_handler = logging.handlers.QueueHandler(log_queue) async_logger = logging.getLogger('async') async_logger.setLevel(logging.INFO) async_logger.addHandler(queue_handler) file_handler = logging.FileHandler('async.log', encoding='utf-8') file_handler.setFormatter(logging.Formatter('%(message)s')) listener = logging.handlers.QueueListener(log_queue, file_handler) listener.start() threads =[]for i inrange(10): t = threading.Thread(target=async_logging, args=(async_logger,100, i)) t.start() threads.append(t)for t in threads: t.join() listener.stop()print("日志写入完成,请比较 direct.log 和 async.log 的大小与内容完整性。")if __name__ =='__main__': main()

逐行解析

  • 本 Demo 没有直接测量时间,而是通过构造大量并发写入来观察系统负载。实际运行时,你可以感受到直接写入方式下程序可能明显卡顿,而异步方式则流畅得多。
  • 两个方式都启动了 10 个线程,每个线程写入 10 条日志,总共 100 条日志。
  • 直接写入方式中,多个线程竞争同一个文件句柄,内核频繁加锁,导致性能下降。
  • 异步方式中,日志记录被快速放入队列,工作线程几乎立即返回,实际写入由单个监听器线程处理,避免了锁竞争。

线程安全验证

  • 对比两个输出文件,会发现 direct.log 中的日志行可能出现交错(因为多个线程同时写入,文件锁虽然保证写入原子性,但顺序无法预测),而 async.log 中的日志顺序由监听器从队列取出的顺序决定,可能不完全与产生顺序一致(因为队列的 FIFO 特性保证顺序,但多个线程放入的顺序取决于调度),但不会出现数据损坏。
  • 可以通过统计行数验证完整性:两个文件都应该恰好包含 100 行。

运行结果

  • async.log
Async-0 msg 0 Async-0 msg 1 Async-0 msg 2 Async-0 msg 3 Async-0 msg 4 Async-0 msg 5 Async-0 msg 6 Async-0 msg 7 Async-0 msg 8 Async-0 msg 9 Async-1 msg 0 Async-1 msg 1 Async-1 msg 2 ... 
  • direct.log
Direct-0 msg 0 Direct-1 msg 0 Direct-0 msg 1 Direct-2 msg 0 Direct-3 msg 0 Direct-1 msg 1 Direct-4 msg 0 Direct-5 msg 0 Direct-0 msg 2 Direct-2 msg 1 Direct-3 msg 1 Direct-1 msg 2 Direct-6 msg 0 ... 

五、应用场景

场景说明
高并发 Web 服务每个请求可能产生日志,如果直接在请求线程中写入文件,会影响响应速度。使用 QueueHandler 可将日志异步处理。
多线程数据处理多个工作线程同时处理数据并记录日志,异步日志避免线程阻塞。
实时性要求不高的后台任务日志可以批量处理,减少 I/O 次数,提升整体吞吐量。
日志聚合系统可以将日志放入队列,由单独的进程或线程发送到远程日志服务器(如 Elasticsearch)。

六、注意事项

  1. 队列大小:默认队列无界,但内存有限。可设置 maxsize 限制队列大小,防止内存溢出。当队列满时,put() 会阻塞,可根据需要调整。
  2. 监听器异常QueueListener 默认会捕获处理器抛出的异常并打印到 sys.stderr。可以通过传入 handlerhandleError 方法自定义错误处理。
  3. 日志顺序:虽然队列是 FIFO,但多线程放入的顺序不可预测,因此最终日志时间顺序可能与产生顺序略有差异。如需严格顺序,可考虑使用锁或单线程产生日志。
  4. 停止监听器:调用 stop() 会等待队列中所有记录被处理,确保日志不丢失。
  5. respect_handler_level:如果设为 TrueQueueListener 在将记录传给目标处理器前会检查处理器级别;否则由 QueueHandler 级别控制,目标处理器级别被忽略。

七、总结

QueueHandlerQueueListener 是 Python logging 模块提供的强大工具,它们将日志记录的生产与消费解耦,既保证了线程安全,又显著提升了程序的并发性能。通过合理配置,你可以轻松构建出适合高并发场景的日志系统。

如果在学习过程中遇到问题,欢迎在评论区留言讨论!

Read more

解锁DeepSeek潜能:Docker+Ollama打造本地大模型部署新范式

解锁DeepSeek潜能:Docker+Ollama打造本地大模型部署新范式

🐇明明跟你说过:个人主页 🏅个人专栏:《深度探秘:AI界的007》 🏅 🔖行路有良友,便是天堂🔖 目录 一、引言 1、什么是Docker 2、什么是Ollama 二、准备工作 1、操作系统 2、镜像准备 三、安装 1、安装Docker 2、启动Ollama 3、拉取Deepseek大模型 4、启动Deepseek  一、引言 1、什么是Docker Docker:就像一个“打包好的App” 想象一下,你写了一个很棒的程序,在自己的电脑上运行得很好。但当你把它发给别人,可能会遇到各种问题: * “这个软件需要 Python 3.8,但我只有 Python 3.6!

By Ne0inhk
深挖 DeepSeek 隐藏玩法·智能炼金术2.0版本

深挖 DeepSeek 隐藏玩法·智能炼金术2.0版本

前引:屏幕前的你还在AI智能搜索框这样搜索吗?“这道题怎么写”“苹果为什么红”“怎么不被发现翘课” ,。看到此篇文章的小伙伴们!请准备好你的思维魔杖,开启【霍格沃茨模式】,看我如何更新秘密的【知识炼金术】,我们一起来解锁更加刺激的剧情!友情提醒:《《《前方高能》》》 目录 在哪使用DeepSeek 如何对提需求  隐藏玩法总结 几个高阶提示词 职场打工人 自媒体创作 电商实战 程序员开挂 非适用场地 “服务器繁忙”如何解决 (1)硅基流动平台 (2)Chatbox + API集成方案 (3)各大云平台 搭建个人知识库 前置准备 下载安装AnythingLLM 选择DeepSeek作为AI提供商 创作工作区 导入文档 编辑  编辑 小编寄语 ——————————————————————————————————————————— 在哪使用DeepSeek 我们解锁剧情前,肯定要知道在哪用DeepSeek!咯,为了照顾一些萌新朋友,它的下载方式我放在下面了,拿走不谢!  (1)

By Ne0inhk
【AI大模型】DeepSeek + 通义万相高效制作AI视频实战详解

【AI大模型】DeepSeek + 通义万相高效制作AI视频实战详解

目录 一、前言 二、AI视频概述 2.1 什么是AI视频 2.2 AI视频核心特点 2.3 AI视频应用场景 三、通义万相介绍 3.1 通义万相概述 3.1.1 什么是通义万相 3.2 通义万相核心特点 3.3 通义万相技术特点 3.4 通义万相应用场景 四、DeepSeek + 通义万相制作AI视频流程 4.1 DeepSeek + 通义万相制作视频优势 4.1.1 DeepSeek 优势 4.1.2 通义万相视频生成优势 4.2

By Ne0inhk
【DeepSeek微调实践】DeepSeek-R1大模型基于MS-Swift框架部署/推理/微调实践大全

【DeepSeek微调实践】DeepSeek-R1大模型基于MS-Swift框架部署/推理/微调实践大全

系列篇章💥 No.文章01【DeepSeek应用实践】DeepSeek接入Word、WPS方法详解:无需代码,轻松实现智能办公助手功能02【DeepSeek应用实践】通义灵码 + DeepSeek:AI 编程助手的实战指南03【DeepSeek应用实践】Cline集成DeepSeek:开源AI编程助手,终端与Web开发的超强助力04【DeepSeek开发入门】DeepSeek API 开发初体验05【DeepSeek开发入门】DeepSeek API高级开发指南(推理与多轮对话机器人实践)06【DeepSeek开发入门】Function Calling 函数功能应用实战指南07【DeepSeek部署实战】DeepSeek-R1-Distill-Qwen-7B:本地部署与API服务快速上手08【DeepSeek部署实战】DeepSeek-R1-Distill-Qwen-7B:Web聊天机器人部署指南09【DeepSeek部署实战】DeepSeek-R1-Distill-Qwen-7B:基于vLLM 搭建高性能推理服务器10【DeepSeek部署实战】基于Ollama快速部署Dee

By Ne0inhk