跳到主要内容Python 多线程日志错乱:logging.Handler 的并发问题 | 极客日志Python算法
Python 多线程日志错乱:logging.Handler 的并发问题
Python 多线程环境下 logging 模块常出现日志错乱,根源在于 Handler 级别的线程安全机制在高并发及自定义场景下的竞态条件。本文剖析了 Handler.emit()、Formatter.format() 及底层 I/O 操作的原子性问题,并通过源码揭示了锁竞争与交错写入的风险。解决方案包括使用 QueueHandler 实现异步队列处理、自定义同步锁机制、批量处理以及异步日志处理器。生产环境建议采用分层架构并结合监控诊断工具,确保日志系统的稳定性与性能。
赛博行者1 浏览 Python 多线程日志错乱:logging.Handler 的并发问题
1. 问题现象与复现
1.1 典型的日志错乱场景
在多线程环境中,最常见的日志错乱表现为以下几种形式:
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Root logger writes to both a file and the console so interleaved output
# from concurrent writers is visible in either sink.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(threadName)s] %(levelname)s: %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)


def worker_task(task_id):
    """Simulate a worker that emits a burst of log records.

    Each iteration logs a long payload (easy to spot when writes interleave),
    sleeps briefly to encourage thread switching, then logs a completion line.
    """
    for i in range(100):
        message = f"Task {task_id} processing item {i} with data: " + "x" * 50
        logger.info(message)
        time.sleep(0.001)
        logger.info(f"Task {task_id} completed item {i} successfully")


def reproduce_log_corruption():
    """Run several logging-heavy tasks concurrently to reproduce corruption."""
    print("开始重现多线程日志错乱问题...")
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(worker_task, i) for i in range(5)]
        for future in futures:
            # Propagate any exception raised inside a worker.
            future.result()
    print("任务执行完成,请检查 app.log 文件中的日志错乱情况")


if __name__ == "__main__":
    reproduce_log_corruption()
运行上述代码后,你可能会在日志文件中看到类似这样的错乱输出:
2024-01-15 10:30:15,123 [ThreadPoolExecutor-0_0] INFO: Task 0 processing item 5 with data: xxxxxxxxxx2024-01-15 10:30:15,124 [ThreadPoolExecutor-0_1] INFO: Task 1 processing item 3 with data: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxxxxxxxxxxxxxxxxxxx 2024-01-15 10:30:15,125 [ThreadPoolExecutor-0_2] INFO: Task 2 completed item 2 successfully
2. logging 模块的线程安全机制分析
2.1 Handler 级别的线程安全
Python 的 logging 模块在 Handler 级别提供了基本的线程安全保护:
import logging
import threading
import inspect
class ThreadSafeAnalyzer:
    """Inspect the locking machinery that logging attaches to a Handler."""

    def __init__(self):
        self.logger = logging.getLogger('analyzer')
        self.handler = logging.StreamHandler()
        self.logger.addHandler(self.handler)

    def analyze_handler_locks(self):
        """Print the type of the per-handler lock and the size of emit()."""
        print("=== Handler 锁机制分析 ===")
        # logging.Handler.createLock() installs a lock on every handler;
        # its presence is what serializes emit() calls per handler.
        if hasattr(self.handler, 'lock'):
            print(f"Handler 锁类型:{type(self.handler.lock)}")
            print(f"锁对象:{self.handler.lock}")
        else:
            print("Handler 没有锁机制")
        # Rough size metric: number of source lines in the emit implementation.
        emit_source = inspect.getsource(self.handler.emit)
        print(f"emit 方法长度:{len(emit_source.split(chr(10)))} 行")
def analyze_logger_locks(self):
"""分析 Logger 的锁机制"""
print("\n=== Logger 锁机制分析 ===")
(logging, ):
()
thread_safe_methods = [, , ]
method thread_safe_methods:
(.logger, method):
()
():
(logging.StreamHandler):
():
().__init__(stream)
.emit_count =
.lock_wait_time =
():
time
start_time = time.time()
.acquire()
:
lock_acquired_time = time.time()
.lock_wait_time += (lock_acquired_time - start_time)
.emit_count +=
.stream:
msg = .(record)
enhanced_msg =
.stream.write(enhanced_msg + )
.flush()
:
.release()
():
{
: .emit_count,
: .lock_wait_time,
: .lock_wait_time / (, .emit_count)
}
DetailedLockingHandler()
__name__ == :
analyzer = ThreadSafeAnalyzer()
analyzer.analyze_handler_locks()
analyzer.analyze_logger_locks()
2.2 锁竞争的性能影响分析

图 2:不同线程数下的日志性能对比图
3. 深入源码:竞态条件的根本原因
3.1 Handler.emit() 方法的竞态分析
让我们深入分析 logging 模块中最关键的 emit() 方法:
import logging
import threading
import time
from typing import List, Dict, Any
class RaceConditionDemo:
    """Collects a timestamped, per-thread event trace to expose race conditions."""

    def __init__(self):
        # Ordered trace of {'thread_id', 'timestamp', 'step'} events;
        # appended to under self.lock so the trace itself is race-free.
        self.race_conditions: List[Dict[str, Any]] = []
        self.lock = threading.Lock()
def simulate_emit_race_condition(self):
"""模拟 emit 方法中的竞态条件"""
class RacyHandler(logging.Handler):
def __init__(self, demo_instance):
super().__init__()
self.demo = demo_instance
self.step_counter = 0
def emit(self, record):
"""模拟有竞态条件的 emit 实现"""
thread_id = threading.current_thread().ident
self.demo.log_step(thread_id, "开始格式化消息")
formatted_msg = self.format(record)
time.sleep(0.001)
self.demo.log_step(thread_id, "准备写入文件")
.demo.log_step(thread_id, )
parts = [formatted_msg[i:i+] i (, (formatted_msg), )]
i, part (parts):
()
time.sleep()
.demo.log_step(thread_id, )
RacyHandler()
():
.lock:
.race_conditions.append({
: thread_id,
: time.time(),
: step
})
RacyHandler()
():
()
sorted_steps = (.race_conditions, key= x: x[])
thread_states = {}
step sorted_steps:
thread_id = step[]
thread_id thread_states:
thread_states[thread_id] = []
thread_states[thread_id].append(step[])
race_patterns = []
i ((sorted_steps) - ):
current = sorted_steps[i]
next_step = sorted_steps[i + ]
(current[] != next_step[] current[] next_step[]):
race_patterns.append({
: ,
: [current[], next_step[]],
: next_step[] - current[]
})
race_patterns
():
(logging.Formatter):
():
().__init__()
.counter =
.thread_info = {}
():
thread_id = threading.current_thread().ident
.counter +=
current_count = .counter
time.sleep()
.thread_info[thread_id] = {
: record.getMessage(),
: current_count
}
formatted =
formatted
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(StatefulFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
():
i ():
logger.info()
threads = []
i ():
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
t threads:
t.join()
__name__ == :
demo = RaceConditionDemo()
handler = demo.simulate_emit_race_condition()
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
():
i ():
logger.info()
threads = []
i ():
t = threading.Thread(target=test_worker, args=(i,))
threads.append(t)
t.start()
t threads:
t.join()
patterns = demo.analyze_race_conditions()
()
3.2 I/O 操作的原子性问题

图 3:多线程日志写入时序图
4. 解决方案详解
4.1 方案对比矩阵
解决方案 | 实现复杂度 | 性能影响 | 线程安全性 | 适用场景 | 推荐指数 |
QueueHandler | 中等 | 低 | 高 | 高并发应用 | ⭐⭐⭐⭐⭐ |
自定义锁机制 | 高 | 中等 | 高 | 定制化需求 | ⭐⭐⭐⭐ |
单线程日志 | 低 | 高 | 高 | 简单应用 | ⭐⭐⭐ |
进程级日志 | 高 | 低 | 高 | 分布式系统 | ⭐⭐⭐⭐ |
第三方库 | 低 | 低 | 高 | 快速解决 | ⭐⭐⭐⭐ |
4.2 QueueHandler 解决方案
import logging
import logging.handlers
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class ThreadSafeLoggingSystem:
    """Thread-safe logging: a QueueHandler front end feeding a QueueListener.

    Producer threads only enqueue records (cheap, lock-light); a single
    listener thread performs all formatting and I/O, so handler emit()
    calls never race with each other.
    """

    def __init__(self, log_file='safe_app.log', max_queue_size=1000):
        # Bounded queue decouples producers from the I/O thread and
        # provides back-pressure instead of unbounded memory growth.
        self.log_queue = queue.Queue(maxsize=max_queue_size)
        self.setup_logging(log_file)
        self.start_log_listener()

    def setup_logging(self, log_file):
        """Route root-logger records through the queue; real handlers live on the listener."""
        queue_handler = logging.handlers.QueueHandler(self.log_queue)
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)
        root_logger.addHandler(queue_handler)
        # Target handlers are serviced only by the listener thread.
        file_handler = logging.FileHandler(log_file)
        console_handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s [%(threadName)-12s] %(levelname)-8s: %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        self.queue_listener = logging.handlers.QueueListener(
            self.log_queue, file_handler, console_handler, respect_handler_level=True
        )

    def start_log_listener(self):
        """Start the background listener thread that drains the queue."""
        self.queue_listener.start()
        print("日志监听器已启动")
():
.queue_listener.stop()
()
():
logging.getLogger(name)
(logging.handlers.QueueHandler):
():
().__init__(queue_obj)
.max_retries = max_retries
.retry_delay = retry_delay
.dropped_logs =
.total_logs =
():
.total_logs +=
attempt (.max_retries):
:
.enqueue(record)
queue.Full:
attempt < .max_retries - :
time.sleep(.retry_delay)
:
.dropped_logs +=
.handle_dropped_log(record)
Exception e:
attempt < .max_retries - :
time.sleep(.retry_delay)
:
.handleError(record)
():
emergency_msg =
()
():
{
: .total_logs,
: .dropped_logs,
: (.total_logs - .dropped_logs) / (, .total_logs)
}
():
log_system = ThreadSafeLoggingSystem()
logger = log_system.get_logger()
():
i (num_logs):
logger.info()
logger.debug()
i % == :
logger.warning()
time.sleep()
logger.info()
()
start_time = time.time()
ThreadPoolExecutor(max_workers=) executor:
futures = [
executor.submit(intensive_logging_task, i, ) i ()
]
future futures:
future.result()
end_time = time.time()
()
log_system.stop_log_listener()
log_system
__name__ == :
test_thread_safe_logging()
4.3 自定义同步机制
import logging
import threading
import time
import contextlib
from typing import Optional, Dict, Any
class SynchronizedHandler(logging.Handler):
    """Wrapper handler that serializes formatting and emission via explicit locks."""

    def __init__(self, target_handler: logging.Handler):
        super().__init__()
        self.target_handler = target_handler
        # RLocks: nested acquisition on the same thread cannot deadlock.
        self.emit_lock = threading.RLock()
        self.format_lock = threading.RLock()
        # Lock-contention statistics accumulated while emitting.
        self.stats = {
            'total_emits': 0,
            'lock_wait_time': 0.0,
            'max_wait_time': 0.0,
            'concurrent_attempts': 0
        }
def emit(self, record):
"""完全同步的 emit 实现"""
start_wait = time.time()
with self.emit_lock:
wait_time = time.time() - start_wait
self.stats['lock_wait_time'] += wait_time
self.stats['max_wait_time'] = max(self.stats['max_wait_time'], wait_time)
self.stats['total_emits'] += 1
try:
.format_lock:
.formatter:
record.message = record.getMessage()
formatted = .formatter.(record)
:
formatted = record.getMessage()
.target_handler.emit(record)
Exception e:
.handleError(record)
() -> [, ]:
total_emits = (, .stats[])
{
: .stats[],
: (.stats[] / total_emits) * ,
: .stats[] * ,
: .stats[]
}
(logging.Handler):
():
().__init__()
.target_handler = target_handler
.batch_size = batch_size
.flush_interval = flush_interval
.buffer = []
.buffer_lock = threading.Lock()
.last_flush = time.time()
.flush_thread = threading.Thread(target=._flush_worker, daemon=)
.flush_thread.start()
.shutdown_event = threading.Event()
():
.buffer_lock:
.buffer.append(record)
((.buffer) >= .batch_size time.time() - .last_flush >= .flush_interval):
._flush_buffer()
():
.buffer:
records_to_flush = .buffer.copy()
.buffer.clear()
.last_flush = time.time()
record records_to_flush:
:
.target_handler.emit(record)
Exception:
.handleError(record)
():
.shutdown_event.is_set():
time.sleep(.flush_interval)
.buffer_lock:
.buffer time.time() - .last_flush >= .flush_interval:
._flush_buffer()
():
.shutdown_event.()
.buffer_lock:
._flush_buffer()
().close()
():
start_time = time.time()
start_memory = threading.active_count()
()
:
:
end_time = time.time()
end_memory = threading.active_count()
()
()
()
():
base_handler = logging.FileHandler()
sync_handler = SynchronizedHandler(base_handler)
logger = logging.getLogger()
logger.addHandler(sync_handler)
logger.setLevel(logging.INFO)
():
i ():
logger.info()
time.sleep()
performance_monitor():
threads = []
i ():
t = threading.Thread(target=sync_worker, args=(i,))
threads.append(t)
t.start()
t threads:
t.join()
stats = sync_handler.get_performance_stats()
()
__name__ == :
test_synchronization_solutions()
4.4 异步日志队列的高级实现
import asyncio
import logging
import threading
import time
from typing import Optional, Callable, Any
from concurrent.futures import ThreadPoolExecutor
import json
class AsyncLogProcessor:
    """Asynchronous batch log processor built on an asyncio queue."""

    def __init__(self, batch_size: int = 50, flush_interval: float = 0.5):
        self.batch_size = batch_size          # records per batch
        self.flush_interval = flush_interval  # seconds between forced flushes
        self.log_queue = asyncio.Queue()
        self.handlers = []                    # target logging.Handler instances
        self.running = False
        # Simple counters for observability.
        self.stats = {
            'processed': 0,
            'batches': 0,
            'errors': 0
        }

    def add_handler(self, handler: logging.Handler):
        """Register a target handler that processed batches are written to."""
        self.handlers.append(handler)
async def start(self):
"""启动异步处理"""
self.running = True
await asyncio.gather(
self._batch_processor(),
._periodic_flush()
)
():
.running =
._flush_remaining()
():
.log_queue.put(record)
():
batch = []
.running:
:
(batch) < .batch_size .running:
:
record = asyncio.wait_for(
.log_queue.get(), timeout=
)
batch.append(record)
asyncio.TimeoutError:
batch:
._process_batch(batch)
batch.clear()
Exception e:
.stats[] +=
()
():
.stats[] +=
.stats[] += (batch)
loop = asyncio.get_event_loop()
ThreadPoolExecutor(max_workers=) executor:
tasks = []
handler .handlers:
task = loop.run_in_executor(
executor, ._write_batch_to_handler, handler, batch
)
tasks.append(task)
asyncio.gather(*tasks, return_exceptions=)
():
record batch:
:
handler.emit(record)
Exception e:
handler.handleError(record)
():
.running:
asyncio.sleep(.flush_interval)
handler .handlers:
(handler, ):
handler.flush()
():
remaining = []
.log_queue.empty():
:
record = .log_queue.get_nowait()
remaining.append(record)
asyncio.QueueEmpty:
remaining:
._process_batch(remaining)
(logging.Handler):
():
().__init__()
.async_processor = async_processor
.loop =
._setup_event_loop()
():
():
.loop = asyncio.new_event_loop()
asyncio.set_event_loop(.loop)
.loop.run_until_complete(.async_processor.start())
.async_thread = threading.Thread(target=run_async_processor, daemon=)
.async_thread.start()
time.sleep()
():
.loop .loop.is_closed():
future = asyncio.run_coroutine_threadsafe(
.async_processor.log_async(record), .loop
)
:
future.result(timeout=)
Exception e:
.handleError(record)
():
.loop .loop.is_closed():
asyncio.run_coroutine_threadsafe(
.async_processor.stop(), .loop
)
().close()
5. 性能优化与最佳实践
5.1 日志性能优化策略

图 4:日志解决方案性能与复杂度象限图
5.2 生产环境配置建议
import logging
import logging.config
import os
from pathlib import Path
def create_production_logging_config():
"""创建生产环境日志配置"""
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'detailed': {
'format': '%(asctime)s [%(process)d:%(thread)d] %(name)s %(levelname)s: %(message)s',
'datefmt': '%Y-%m-%d %H:%M:%S'
},
'simple': {
'format': '%(levelname)s: %(message)s'
},
'json': {
'format': '{"timestamp": "%(asctime)s", "level": "%(levelname)s", "logger": "%(name)s", "message": "%(message)s", "thread": "%(thread)d"}',
'datefmt': '%Y-%m-%dT%H:%M:%S'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
'formatter': 'simple',
'stream': 'ext://sys.stdout'
},
'file_info': {
'class': 'logging.handlers.RotatingFileHandler',
'level': ,
: ,
: (log_dir / ),
: ,
: ,
:
},
: {
: ,
: ,
: ,
: (log_dir / ),
: ,
: ,
:
},
: {
: ,
: {
: ,
:
}
}
},
: {
: {
: ,
: []
},
: {
: ,
: [, , ],
:
},
: {
: ,
: [],
:
}
}
}
config
:
():
.config = create_production_logging_config()
.setup_logging()
.setup_queue_listener()
():
logging.config.dictConfig(.config)
():
queue
logging.handlers
root_logger = logging.getLogger()
queue_handler =
handler root_logger.handlers:
(handler, logging.handlers.QueueHandler):
queue_handler = handler
queue_handler:
file_handler = logging.handlers.RotatingFileHandler(
, maxBytes=, backupCount=
)
file_handler.setFormatter(
logging.Formatter(
)
)
.queue_listener = logging.handlers.QueueListener(
queue_handler.queue, file_handler, respect_handler_level=
)
.queue_listener.start()
() -> logging.Logger:
logging.getLogger(name)
():
(, ):
.queue_listener.stop()
logging.shutdown()
():
log_manager = ProductionLoggingManager()
app_logger = log_manager.get_logger()
perf_logger = log_manager.get_logger()
():
app_logger.info()
i ():
app_logger.debug()
i % == :
perf_logger.info()
i == :
app_logger.warning()
i == :
:
ValueError()
ValueError e:
app_logger.error(, exc_info=)
app_logger.info()
threads = []
i ():
t = threading.Thread(target=simulate_application_work)
threads.append(t)
t.start()
t threads:
t.join()
log_manager.shutdown()
__name__ == :
demonstrate_production_logging()
6. 监控与诊断
6.1 日志系统健康监控

图 5:日志系统监控与维护甘特图
6.2 诊断工具实现
import logging
import threading
import time
import psutil
import json
from typing import Dict, List, Any
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
@dataclass
class LoggingMetrics:
    """A point-in-time snapshot of logging-system health metrics."""
    timestamp: str                  # collection time (isoformat string)
    queue_size: int                 # records currently queued
    queue_capacity: int             # queue maxsize (0 when unknown)
    logs_per_second: float          # throughput since the previous sample
    error_rate: float               # errors / logs since the previous sample
    memory_usage_mb: float          # process RSS in MB
    thread_count: int               # live threads in the process
    handler_stats: Dict[str, Any]   # per-handler diagnostic info
class LoggingDiagnostics:
    """Background monitor that samples logging-system metrics on an interval."""

    def __init__(self, monitoring_interval: float = 1.0):
        self.monitoring_interval = monitoring_interval  # seconds between samples
        self.metrics_history: List[LoggingMetrics] = []
        self.is_monitoring = False
        # Counters incremented externally per log/error; reset on each sample.
        self.log_counter = 0
        self.error_counter = 0
        self.last_reset_time = time.time()
        self.monitor_thread = None  # created lazily when monitoring starts
():
.is_monitoring =
.monitor_thread = threading.Thread(target=._monitoring_loop, daemon=)
.monitor_thread.start()
()
():
.is_monitoring =
.monitor_thread:
.monitor_thread.join()
()
():
.is_monitoring:
:
metrics = ._collect_metrics()
.metrics_history.append(metrics)
(.metrics_history) > :
.metrics_history = .metrics_history[-:]
._check_alerts(metrics)
Exception e:
()
time.sleep(.monitoring_interval)
() -> LoggingMetrics:
current_time = time.time()
time_diff = current_time - .last_reset_time
logs_per_second = .log_counter / (time_diff, )
error_rate = .error_counter / (.log_counter, )
process = psutil.Process()
memory_usage = process.memory_info().rss / /
thread_count = threading.active_count()
queue_size, queue_capacity = ._get_queue_info()
handler_stats = ._get_handler_stats()
metrics = LoggingMetrics(
timestamp=datetime.now().isoformat(),
queue_size=queue_size,
queue_capacity=queue_capacity,
logs_per_second=logs_per_second,
error_rate=error_rate,
memory_usage_mb=memory_usage,
thread_count=thread_count,
handler_stats=handler_stats
)
.log_counter =
.error_counter =
.last_reset_time = current_time
metrics
() -> :
:
root_logger = logging.getLogger()
handler root_logger.handlers:
(handler, ):
queue = handler.queue
(queue, ) (queue, ):
queue.qsize(), queue.maxsize
,
:
,
() -> [, ]:
stats = {}
root_logger = logging.getLogger()
i, handler (root_logger.handlers):
handler_name =
handler_stats = {
: (handler).__name__,
: handler.level,
: (handler.formatter).__name__ handler.formatter
}
(handler, ):
handler_stats.update(handler.get_stats())
stats[handler_name] = handler_stats
stats
():
alerts = []
metrics.queue_capacity > :
queue_usage = metrics.queue_size / metrics.queue_capacity
queue_usage > :
alerts.append()
metrics.error_rate > :
alerts.append()
metrics.memory_usage_mb > :
alerts.append()
metrics.thread_count > :
alerts.append()
alerts:
()
():
.log_counter +=
():
.error_counter +=
() -> [LoggingMetrics]:
cutoff_time = datetime.now() - timedelta(minutes=minutes)
recent_metrics = []
metric (.metrics_history):
metric_time = datetime.fromisoformat(metric.timestamp)
metric_time >= cutoff_time:
recent_metrics.append(metric)
:
((recent_metrics))
() -> :
.metrics_history:
recent_metrics = .get_recent_metrics()
recent_metrics:
avg_logs_per_sec = (m.logs_per_second m recent_metrics) / (recent_metrics)
avg_error_rate = (m.error_rate m recent_metrics) / (recent_metrics)
avg_memory = (m.memory_usage_mb m recent_metrics) / (recent_metrics)
max_queue_size = (m.queue_size m recent_metrics)
report =
report
(logging.Handler):
():
().__init__()
.target_handler = target_handler
.diagnostics = diagnostics
():
:
.target_handler.emit(record)
.diagnostics.increment_log_count()
Exception e:
.diagnostics.increment_error_count()
.handleError(record)
():
diagnostics = LoggingDiagnostics(monitoring_interval=)
logger = logging.getLogger()
base_handler = logging.StreamHandler()
diagnostic_handler = DiagnosticHandler(base_handler, diagnostics)
logger.addHandler(diagnostic_handler)
logger.setLevel(logging.INFO)
diagnostics.start_monitoring()
:
():
i ():
logger.info()
time.sleep()
i % == :
:
ValueError()
ValueError:
logger.error(, exc_info=)
threads = []
i ():
t = threading.Thread(target=log_worker, args=(i,))
threads.append(t)
t.start()
time.sleep()
(diagnostics.generate_report())
t threads:
t.join()
()
(diagnostics.generate_report())
:
diagnostics.stop_monitoring()
__name__ == :
demonstrate_logging_diagnostics()
7. 总结与展望
经过深入的分析和实践,我们可以看到 Python 多线程日志错乱问题的复杂性远超表面现象。这个问题不仅涉及到 logging 模块的内部实现机制,还关联到操作系统的 I/O 调度、文件系统的原子性保证以及 Python GIL 的影响。
通过本文的探索,我发现解决多线程日志错乱的关键在于理解并发访问的本质。虽然 Python 的 logging 模块在 Handler 级别提供了基本的线程安全保护,但在高并发场景下,特别是涉及到复杂的格式化操作和频繁的 I/O 写入时,仍然存在竞态条件的风险。我们提供的多种解决方案各有优劣:QueueHandler 适合大多数生产环境,异步处理器适合高性能要求的场景,而自定义同步机制则适合有特殊需求的定制化应用。
在实际项目中,我建议采用分层的日志架构:应用层使用简单的日志接口,中间层负责缓冲和批处理,底层负责实际的 I/O 操作。这样不仅能够有效避免并发问题,还能提供更好的性能和可维护性。同时,完善的监控和诊断机制是保证日志系统稳定运行的重要保障。
随着 Python 生态系统的不断发展,我们也看到了更多优秀的第三方日志库,如 structlog、loguru 等,它们在设计之初就考虑了并发安全性和性能优化。未来的日志系统将更加注重云原生环境的适配、结构化日志的支持以及与可观测性平台的集成。作为开发者,我们需要持续关注这些技术发展,选择最适合自己项目需求的解决方案。
"在多线程的世界里,日志不仅是程序的记录者,更是并发安全的试金石。只有深入理解其内在机制,才能构建真正可靠的日志系统。"
参考链接
- Python 官方文档 - logging 模块
- Python Enhancement Proposal 282 - logging 配置
- Python 多线程编程指南
- logging.handlers 模块详解
- 高性能 Python 日志最佳实践
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown转HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
- HTML转Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online
if
hasattr
'_lock'
print
f"全局锁:{logging._lock}"
'_log'
'handle'
'callHandlers'
for
in
if
hasattr
self
print
f"线程安全方法:{method}"
def
custom_handler_with_detailed_locking
self
"""自定义 Handler 展示详细的锁机制"""
class
DetailedLockingHandler
def
__init__
self, stream=None
super
self
0
self
0
def
emit
self, record
"""重写 emit 方法,添加详细的锁分析"""
import
self
try
self
self
1
if
self
self
format
f"[EMIT#{self.emit_count}|WAIT:{(lock_acquired_time - start_time)*1000:.2f}ms] {msg}"
self
'\n'
self
finally
self
def
get_stats
self
"""获取锁统计信息"""
return
'total_emits'
self
'total_wait_time'
self
'avg_wait_time'
self
max
1
self
return
if
"__main__"
self
f"写入消息:{formatted_msg[:50]}..."
10
for
in
range
0
len
10
for
in
enumerate
print
f"[Thread-{thread_id}] Part {i}: {part}"
0.0001
self
"写入完成"
return
self
def
log_step
self, thread_id: int, step: str
"""记录执行步骤"""
with
self
self
'thread_id'
'timestamp'
'step'
return
self
def
analyze_race_conditions
self
"""分析竞态条件"""
print
"\n=== 竞态条件分析 ==="
sorted
self
lambda
'timestamp'
for
in
'thread_id'
if
not
in
'step'
for
in
range
len
1
1
if
'thread_id'
'thread_id'
and
'写入'
in
'step'
and
'写入'
in
'step'
'pattern'
'concurrent_write'
'threads'
'thread_id'
'thread_id'
'time_gap'
'timestamp'
'timestamp'
return
def
demonstrate_formatter_race_condition
"""演示 Formatter 中的竞态条件"""
class
StatefulFormatter
"""有状态的格式化器,容易产生竞态条件"""
def
__init__
self
super
self
0
self
def
format
self, record
"""非线程安全的格式化方法"""
self
1
self
0.001
self
'last_message'
'count'
f"[{current_count:04d}] {record.levelname}: {record.getMessage()}"
return
'race_test'
def
worker
worker_id
for
in
range
10
f"Worker {worker_id} message {i}"
for
in
range
5
for
in
if
"__main__"
'race_demo'
def
test_worker
worker_id
for
in
range
3
f"Worker {worker_id} executing task {i}"
for
in
range
3
for
in
print
f"检测到 {len(patterns)} 个竞态模式"
def
stop_log_listener
self
"""停止日志监听器"""
self
print
"日志监听器已停止"
def
get_logger
self, name
"""获取日志器"""
return
class
AdvancedQueueHandler
"""增强的队列处理器"""
def
__init__
self, queue_obj, max_retries=3, retry_delay=0.1
super
self
self
self
0
self
0
def
emit
self, record
"""重写 emit 方法,添加重试机制"""
self
1
for
in
range
self
try
self
return
except
if
self
1
self
continue
else
self
1
self
break
except
as
if
self
1
self
continue
else
self
break
def
handle_dropped_log
self, record
"""处理被丢弃的日志"""
f"DROPPED LOG: {record.getMessage()}"
print
f"WARNING: {emergency_msg}"
def
get_stats
self
"""获取统计信息"""
return
'total_logs'
self
'dropped_logs'
self
'success_rate'
self
self
max
1
self
def
test_thread_safe_logging
"""测试线程安全的日志系统"""
'test_app'
def
intensive_logging_task
task_id, num_logs=100
"""密集日志记录任务"""
for
in
range
f"Task {task_id} - Processing item {i}"
f"Task {task_id} - Debug info for item {i}"
if
10
0
f"Task {task_id} - Checkpoint at item {i}"
0.001
f"Task {task_id} completed successfully"
print
"开始线程安全日志测试..."
with
20
as
50
for
in
range
10
for
in
print
f"测试完成,耗时:{end_time - start_time:.2f} 秒"
return
if
"__main__"
with
self
if
self
self
format
else
self
except
as
self
def
get_performance_stats
self
Dict
str
Any
"""获取性能统计"""
max
1
self
'total_emits'
return
'total_emits'
self
'total_emits'
'avg_wait_time_ms'
self
'lock_wait_time'
1000
'max_wait_time_ms'
self
'max_wait_time'
1000
'total_wait_time_s'
self
'lock_wait_time'
class
BatchingHandler
"""批量处理日志的处理器"""
def
__init__
self, target_handler: logging.Handler, batch_size: int = 100, flush_interval: float = 1.0
super
self
self
self
self
self
self
self
self
True
self
self
def
emit
self, record
"""批量 emit 实现"""
with
self
self
if
len
self
self
or
self
self
self
def
_flush_buffer
self
"""刷新缓冲区"""
if
not
self
return
self
self
self
for
in
try
self
except
self
def
_flush_worker
self
"""后台刷新工作线程"""
while
not
self
self
with
self
if
self
and
self
self
self
def
close
self
"""关闭处理器"""
self
set
with
self
self
super
@contextlib.contextmanager
def
performance_monitor
name: str
"""性能监控上下文管理器"""
print
f"开始监控:{name}"
try
yield
finally
print
f"监控结束:{name}"
print
f"执行时间:{end_time - start_time:.3f}秒"
print
f"线程数变化:{start_memory} -> {end_memory}"
def
test_synchronization_solutions
"""测试各种同步解决方案"""
'sync_test.log'
'sync_test'
def
sync_worker
worker_id
for
in
range
50
f"Sync worker {worker_id} message {i}"
0.001
with
"同步处理器测试"
for
in
range
10
for
in
print
f"同步处理器统计:{stats}"
if
"__main__"
self
async
def
stop
self
"""停止异步处理"""
self
False
await
self
async
def
log_async
self, record: logging.LogRecord
"""异步记录日志"""
await
self
async
def
_batch_processor
self
"""批量处理器"""
while
self
try
while
len
self
and
self
try
await
self
0.1
except
break
if
await
self
except
as
self
'errors'
1
print
f"批量处理错误:{e}"
async
def
_process_batch
self, batch
"""处理一批日志记录"""
self
'batches'
1
self
'processed'
len
with
2
as
for
in
self
self
await
True
def
_write_batch_to_handler
self, handler: logging.Handler, batch
"""将批量记录写入处理器"""
for
in
try
except
as
async
def
_periodic_flush
self
"""定期刷新"""
while
self
await
self
for
in
self
if
hasattr
'flush'
async
def
_flush_remaining
self
"""刷新剩余日志"""
while
not
self
try
self
except
break
if
await
self
class
AsyncLogHandler
"""异步日志处理器适配器"""
def
__init__
self, async_processor: AsyncLogProcessor
super
self
self
None
self
def
_setup_event_loop
self
"""设置事件循环"""
def
run_async_processor
self
self
self
self
self
True
self
0.1
def
emit
self, record
"""发送日志记录到异步处理器"""
if
self
and
not
self
self
self
try
0.1
except
as
self
def
close
self
"""关闭处理器"""
if
self
and
not
self
self
self
super
'INFO'
'formatter'
'detailed'
'filename'
str
'app.log'
'maxBytes'
10485760
'backupCount'
5
'encoding'
'utf8'
'file_error'
'class'
'logging.handlers.RotatingFileHandler'
'level'
'ERROR'
'formatter'
'detailed'
'filename'
str
'error.log'
'maxBytes'
10485760
'backupCount'
10
'encoding'
'utf8'
'queue_handler'
'class'
'logging.handlers.QueueHandler'
'queue'
'()'
'queue.Queue'
'maxsize'
1000
'loggers'
''
'level'
'INFO'
'handlers'
'queue_handler'
'app'
'level'
'DEBUG'
'handlers'
'console'
'file_info'
'file_error'
'propagate'
False
'performance'
'level'
'INFO'
'handlers'
'file_info'
'propagate'
False
return
class
ProductionLoggingManager
"""生产环境日志管理器"""
def
__init__
self
self
self
self
def
setup_logging
self
"""设置日志配置"""
self
def
setup_queue_listener
self
"""设置队列监听器"""
import
import
None
for
in
if
isinstance
break
if
'logs/queue_app.log'
10485760
5
'%(asctime)s [%(process)d:%(thread)d] %(name)s %(levelname)s: %(message)s'
self
True
self
def
get_logger
self, name: str
"""获取日志器"""
return
def
shutdown
self
"""关闭日志系统"""
if
hasattr
self
'queue_listener'
self
def
demonstrate_production_logging
"""演示生产环境日志使用"""
'app.service'
'performance'
def
simulate_application_work
"""模拟应用程序工作"""
"应用程序启动"
for
in
range
100
f"处理任务 {i}"
if
20
0
f"性能检查点:已处理 {i} 个任务"
if
50
"达到中间检查点"
if
75
try
raise
"模拟业务错误"
except
as
f"业务错误:{e}"
True
"应用程序完成"
for
in
range
5
for
in
if
"__main__"
def
start_monitoring
self
"""开始监控"""
self
True
self
self
True
self
print
"日志系统监控已启动"
def
stop_monitoring
self
"""停止监控"""
self
False
if
self
self
print
"日志系统监控已停止"
def
_monitoring_loop
self
"""监控循环"""
while
self
try
self
self
if
len
self
1000
self
self
500
self
except
as
print
f"监控错误:{e}"
self
def
_collect_metrics
self
"""收集指标"""
self
self
max
1
self
max
self
1
1024
1024
self
self
self
0
self
0
self
return
def
_get_queue_info
self
tuple
"""获取队列信息"""
try
for
in
if
hasattr
'queue'
if
hasattr
'qsize'
and
hasattr
'maxsize'
return
return
0
0
except
return
0
0
def
_get_handler_stats
self
Dict
str
Any
"""获取处理器统计信息"""
for
in
enumerate
f"{type(handler).__name__}_{i}"
'type'
type
'level'
'formatter'
type
if
else
None
if
hasattr
'get_stats'
return
def
_check_alerts
self, metrics: LoggingMetrics
"""检查告警条件"""
if
0
if
0.8
f"队列使用率过高:{queue_usage:.1%}"
if
0.05
f"错误率过高:{metrics.error_rate:.1%}"
if
500
f"内存使用过高:{metrics.memory_usage_mb:.1f}MB"
if
50
f"线程数过多:{metrics.thread_count}"
if
print
f"[ALERT] {datetime.now()}: {'; '.join(alerts)}"
def
increment_log_count
self
"""增加日志计数"""
self
1
def
increment_error_count
self
"""增加错误计数"""
self
1
def
get_recent_metrics
self, minutes: int = 5
List
"""获取最近的指标"""
for
in
reversed
self
if
else
break
return
list
reversed
def
generate_report
self
str
"""生成诊断报告"""
if
not
self
return
"暂无监控数据"
self
10
if
not
return
"最近 10 分钟无监控数据"
sum
for
in
len
sum
for
in
len
sum
for
in
len
max
for
in
f""" === 日志系统诊断报告 ===
时间范围:最近 10 分钟
数据点数:{len(recent_metrics)}
性能指标:
- 平均日志速率:{avg_logs_per_sec:.2f} logs/sec
- 平均错误率:{avg_error_rate:.2%}
- 平均内存使用:{avg_memory:.1f} MB
- 最大队列长度:{max_queue_size}
当前状态:
- 线程数:{recent_metrics[-1].thread_count}
- 队列使用:{recent_metrics[-1].queue_size}/{recent_metrics[-1].queue_capacity}
- 内存使用:{recent_metrics[-1].memory_usage_mb:.1f} MB
处理器状态:{json.dumps(recent_metrics[-1].handler_stats, indent=2, ensure_ascii=False)}
"""
return
class
DiagnosticHandler
"""带诊断功能的处理器包装器"""
def
__init__
self, target_handler: logging.Handler, diagnostics: LoggingDiagnostics
super
self
self
def
emit
self, record
"""发送日志记录"""
try
self
self
except
as
self
self
def
demonstrate_logging_diagnostics
"""演示日志诊断功能"""
0.5
'diagnostic_test'
try
def
log_worker
worker_id
for
in
range
100
f"Worker {worker_id} message {i}"
0.01
if
30
0
try
raise
"测试错误"
except
"模拟错误"
True
for
in
range
3
5
print
for
in
print
"\n=== 最终报告 ==="
print
finally
if
"__main__"