跳到主要内容
Python 观察者模式:构建解耦的事件系统 | 极客日志
Python SaaS 算法
Python 观察者模式:构建解耦的事件系统 Python 观察者模式,解决组件耦合问题。通过构建同步与异步 EventBus,实现优先级调度、错误隔离及一次性监听等功能。涵盖电商场景实战、Django/Flask 信号集成、内存泄漏预防及单元测试策略,并延伸至 CQRS 与事件溯源架构,提供生产级事件驱动系统设计方案。
指针猎手 发布于 2026/3/26 更新于 2026/5/21 25K 浏览观察者模式全解析:用 Python 构建优雅的事件系统,让组件彻底解耦
一、引言:当组件之间的'耦合'成为噩梦
想象这样一个场景:你在开发一个电商系统,用户成功下单之后需要做很多事——发送短信通知、更新库存、记录日志、推送积分、触发物流系统……
初级写法是这样的:
def place_order (order ):
save_order_to_db(order)
send_sms_notification(order)
update_inventory(order)
write_log(order)
add_points(order.user_id)
trigger_logistics(order)
push_to_dashboard(order)
这段代码有几个致命问题:place_order 函数职责爆炸,承担了所有后续处理;每次新增需求都要打开核心函数修改,稍有不慎就引入 Bug;各模块之间高度耦合,完全无法独立测试。
这就是紧耦合 的代价。
观察者模式(Observer Pattern) 是解决这类问题的经典方案。它建立了一种「一对多」的依赖关系:当一个对象(被观察者/主题)状态变化时,所有依赖它的对象(观察者)都会自动收到通知并响应,而被观察者完全不需要知道观察者是谁、有多少个、做了什么。
这正是现代事件驱动架构的哲学核心:「我只管发出信号,谁响应、怎么响应,与我无关。」
本文将带你从零构建一个完整的 Python 事件系统,覆盖同步事件、异步事件、优先级调度、错误隔离等生产级特性,并结合真实项目案例展示其威力。
二、观察者模式基础:概念与最小实现
2.1 三个核心角色
Subject(主题/被观察者) :维护观察者列表,提供注册/注销接口,状态变化时通知所有观察者。
Observer(观察者) :定义接收通知的接口,具体逻辑由子类实现。
ConcreteObserver(具体观察者) :实现观察者接口,对特定事件做出响应。
2.2 最小骨架实现
from abc import ABC, abstractmethod
class Observer (ABC ):
@abstractmethod
def update (self, event: str , data: dict ) -> None :
pass
class Subject (ABC ):
( ):
._observers: [Observer] = []
( ) -> :
._observers.append(observer)
( ) -> :
._observers.remove(observer)
( ) -> :
observer ._observers:
observer.update(event, data)
( ):
( ) -> :
( )
.notify( , order)
( ):
( ) -> :
event == :
( )
( ):
( ) -> :
event == :
( )
order_system = OrderSystem()
order_system.attach(SMSObserver())
order_system.attach(InventoryObserver())
order_system.place_order({ : , : , : [ , ]})
def
__init__
self
self
list
def
attach
self, observer: Observer
None
self
def
detach
self, observer: Observer
None
self
def
notify
self, event: str , data: dict
None
for
in
self
class
OrderSystem
Subject
def
place_order
self, order: dict
None
print
f"[订单系统] 订单 {order['id' ]} 创建成功"
self
'order_placed'
class
SMSObserver
Observer
def
update
self, event: str , data: dict
None
if
'order_placed'
print
f"[短信服务] 发送短信给用户 {data['user_id' ]} "
class
InventoryObserver
Observer
def
update
self, event: str , data: dict
None
if
'order_placed'
print
f"[库存系统] 扣减商品库存:{data['items' ]} "
'id'
'ORD001'
'user_id'
'U100'
'items'
'商品 A'
'商品 B'
[订单系统] 订单 ORD001 创建成功
[短信服务] 发送短信给用户 U100
[库存系统] 扣减商品库存:['商品 A' , '商品 B' ]
核心业务代码与后续处理完全分离,新增观察者零改动主题。这就是观察者模式的魅力所在。
三、进阶实战:构建生产级事件总线 基础实现过于简单,真实项目需要更强大的能力。我们来构建一个功能完整的 EventBus(事件总线) ,支持:事件类型过滤、优先级调度、一次性监听、错误隔离、装饰器注册。
3.1 事件对象设计 from dataclasses import dataclass, field
from typing import Any
from datetime import datetime
import uuid
@dataclass
class Event :
"""事件基类"""
name: str
data: Any = None
source: str = ''
event_id: str = field(default_factory=lambda : str (uuid.uuid4())[:8 ])
timestamp: datetime = field(default_factory=datetime.now)
def __repr__ (self ):
return f"Event(name={self.name!r} , id={self.event_id} , source={self.source!r} )"
@dataclass
class OrderPlacedEvent (Event ):
name: str = 'order.placed'
@dataclass
class UserRegisteredEvent (Event ):
name: str = 'user.registered'
@dataclass
class PaymentSuccessEvent (Event ):
name: str = 'payment.success'
3.2 功能完整的 EventBus from typing import Callable , Optional
import logging
from collections import defaultdict
logger = logging.getLogger(__name__)
EventHandler = Callable [[Event], None ]
class HandlerEntry :
"""封装处理器元数据"""
def __init__ (self, handler: EventHandler, priority: int = 0 , once: bool = False , name: str = '' ):
self .handler = handler
self .priority = priority
self .once = once
self .name = name or handler.__name__
self .call_count = 0
def __repr__ (self ):
return f"HandlerEntry(name={self.name!r} , priority={self.priority} , once={self.once} )"
class EventBus :
"""
生产级事件总线
- 支持事件类型订阅
- 支持通配符订阅('*' 订阅所有事件)
- 支持优先级调度
- 支持一次性监听
- 支持错误隔离(单个处理器异常不影响其他处理器)
"""
def __init__ (self, error_handling: str = 'log' ):
"""
:param error_handling: 'log'=记录日志继续 | 'raise'=抛出异常停止
"""
self ._handlers: dict [str , list [HandlerEntry]] = defaultdict(list )
self ._error_handling = error_handling
self ._event_history: list [Event] = []
self ._max_history = 100
def on (self, event_name: str , priority: int = 0 , once: bool = False ) -> Callable :
"""装饰器:注册事件处理器"""
def decorator (func: EventHandler ) -> EventHandler:
self .subscribe(event_name, func, priority=priority, once=once)
return func
return decorator
def subscribe (self, event_name: str , handler: EventHandler, priority: int = 0 , once: bool = False ) -> None :
"""注册事件处理器"""
entry = HandlerEntry(handler, priority=priority, once=once)
self ._handlers[event_name].append(entry)
self ._handlers[event_name].sort(key=lambda e: e.priority, reverse=True )
logger.debug(f"订阅事件 '{event_name} ': {entry.name} (priority={priority} )" )
def unsubscribe (self, event_name: str , handler: EventHandler ) -> bool :
"""注销事件处理器,返回是否成功"""
entries = self ._handlers.get(event_name, [])
before = len (entries)
self ._handlers[event_name] = [e for e in entries if e.handler != handler]
return len (self ._handlers[event_name]) < before
def emit (self, event: Event ) -> int :
"""
发布事件
:return: 触发的处理器数量
"""
self ._event_history.append(event)
if len (self ._event_history) > self ._max_history:
self ._event_history.pop(0 )
entries = (self ._handlers.get(event.name, []) + self ._handlers.get('*' , []))
seen = set ()
unique_entries = []
for e in entries:
if id (e.handler) not in seen:
seen.add(id (e.handler))
unique_entries.append(e)
triggered = 0
to_remove = []
for entry in unique_entries:
try :
entry.handler(event)
entry.call_count += 1
triggered += 1
if entry.once:
to_remove.append((event.name, entry))
except Exception as exc:
if self ._error_handling == 'raise' :
raise
logger.error(f"处理器 '{entry.name} ' 处理事件 '{event.name} ' 时出错:{exc} " )
for evt_name, entry in to_remove:
self ._handlers[evt_name] = [e for e in self ._handlers[evt_name] if e is not entry]
return triggered
def emit_by_name (self, event_name: str , data: Any = None , source: str = '' ) -> int :
"""便捷方法:直接用事件名发布"""
return self .emit(Event(name=event_name, data=data, source=source))
@property
def stats (self ) -> dict :
"""统计信息"""
return {
'subscribed_events' : list (self ._handlers.keys()),
'total_handlers' : sum (len (v) for v in self ._handlers.values()),
'history_count' : len (self ._event_history),
}
3.3 完整使用示例:电商下单流程
bus = EventBus(error_handling='log' )
@bus.on('order.placed' , priority=100 )
def handle_inventory (event: Event ):
order = event.data
print (f"[库存服务] 扣减库存:{[i['name' ] for i in order['items' ]]} " )
@bus.on('order.placed' , priority=80 )
def handle_sms (event: Event ):
order = event.data
print (f"[短信服务] 通知用户 {order['user_id' ]} : 您的订单 {order['id' ]} 已确认" )
@bus.on('order.placed' , priority=60 )
def handle_points (event: Event ):
order = event.data
points = int (order['total' ] * 0.1 )
print (f"[积分系统] 用户 {order['user_id' ]} 获得 {points} 积分" )
@bus.on('order.placed' , priority=40 )
def handle_logistics (event: Event ):
order = event.data
print (f"[物流系统] 创建发货任务:{order['address' ]} " )
@bus.on('order.placed' , priority=20 )
def handle_log (event: Event ):
print (f"[日志系统] 记录事件:{event} " )
@bus.on('*' , priority=0 )
def global_monitor (event: Event ):
print (f"[监控中心] 事件上报:{event.name} @ {event.timestamp.strftime('%H:%M:%S' )} " )
@bus.on('order.placed' , once=True )
def first_order_bonus (event: Event ):
print (f"[营销系统] 🎉 首单礼包已发放给用户 {event.data['user_id' ]} !" )
print ("=" *50 )
print ("【第一笔订单】" )
bus.emit(Event(
name='order.placed' ,
source='order_service' ,
data={'id' : 'ORD_20250001' , 'user_id' : 'U_888' , 'items' : [{'name' : '机械键盘' , 'qty' : 1 }, {'name' : '鼠标垫' , 'qty' : 2 }], 'total' : 399.0 , 'address' : '北京市朝阳区 xxx' }
))
print ("\n【第二笔订单(首单礼包不再触发)】" )
bus.emit(Event(
name='order.placed' ,
source='order_service' ,
data={'id' : 'ORD_20250002' , 'user_id' : 'U_999' , 'items' : [{'name' : '显示器' , 'qty' : 1 }], 'total' : 1299.0 , 'address' : '上海市静安区 xxx' }
))
print (f"\n事件总线统计:{bus.stats} " )
输出效果(按优先级有序执行,首单礼包只触发一次):
==================================================
【第一笔订单】
[库存服务] 扣减库存:['机械键盘' , '鼠标垫' ]
[短信服务] 通知用户 U_888 : 您的订单 ORD_20250001 已确认
[积分系统] 用户 U_888 获得 39 积分
[物流系统] 创建发货任务:北京市朝阳区 xxx
[日志系统] 记录事件:Event (name='order.placed' , ...)
[营销系统] 🎉 首单礼包已发放给用户 U_888 !
[监控中心] 事件上报:order .placed @ 10 :23 :45
【第二笔订单(首单礼包不再触发)】
[库存服务] 扣减库存:['显示器' ]
...(首单礼包处理器已自动注销)
四、异步事件系统:应对高并发场景 同步事件总线在高并发场景(如每秒数千次事件)下会成为瓶颈。用 asyncio 构建异步版本:
import asyncio
from typing import Callable , Coroutine , Union
AsyncHandler = Callable [[Event], Coroutine ]
class AsyncEventBus :
"""支持异步处理器的事件总线"""
def __init__ (self ):
self ._handlers: dict [str , list [tuple [int , AsyncHandler]]] = defaultdict(list )
def on (self, event_name: str , priority: int = 0 ):
def decorator (func: AsyncHandler ):
self ._handlers[event_name].append((priority, func))
self ._handlers[event_name].sort(key=lambda x: x[0 ], reverse=True )
return func
return decorator
async def emit (self, event: Event, concurrent: bool = False ) -> None :
"""
:param concurrent: True=所有处理器并发执行,False=按优先级顺序执行
"""
handlers = self ._handlers.get(event.name, [])
if not handlers:
return
if concurrent:
await asyncio.gather(*[handler(event) for _, handler in handlers], return_exceptions=True )
else :
for _, handler in handlers:
await handler(event)
async_bus = AsyncEventBus()
@async_bus.on('user.registered' , priority=100 )
async def send_welcome_email (event: Event ):
await asyncio.sleep(0.1 )
print (f"[邮件服务] 发送欢迎邮件给 {event.data['email' ]} " )
@async_bus.on('user.registered' , priority=80 )
async def init_user_profile (event: Event ):
await asyncio.sleep(0.05 )
print (f"[用户服务] 初始化用户档案:{event.data['username' ]} " )
@async_bus.on('user.registered' , priority=60 )
async def send_coupon (event: Event ):
await asyncio.sleep(0.02 )
print (f"[营销服务] 发放新人优惠券给 {event.data['username' ]} " )
async def main ():
event = Event(
name='user.registered' ,
data={'username' : 'alice' , 'email' : '[email protected] ' }
)
print ("--- 顺序执行(有优先级保证)---" )
await async_bus.emit(event, concurrent=False )
print ("\n--- 并发执行(无顺序保证,更快)---" )
import time
start = time.perf_counter()
await async_bus.emit(event, concurrent=True )
print (f"并发耗时:{time.perf_counter() - start:.3 f} s" )
asyncio.run(main())
五、实战变体:Django/Flask 中的信号机制 观察者模式在主流 Web 框架中无处不在。Django 内置的 Signals 就是观察者模式的官方实现:
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth.models import User
@receiver(post_save, sender=User )
def user_created_handler (sender, instance, created, **kwargs ):
if created:
send_welcome_email(instance.email)
create_user_profile(instance)
from django.dispatch import Signal
order_placed = Signal()
order_placed.send(sender=OrderView, order=order_instance)
@receiver(order_placed )
def on_order_placed (sender, order, **kwargs ):
update_inventory(order)
在 Flask 中,可以使用 blinker 库(Flask 依赖项)实现同样的效果:
from blinker import Namespace
my_signals = Namespace()
order_placed = my_signals.signal('order-placed' )
user_registered = my_signals.signal('user.registered' )
@order_placed.connect
def on_order (sender, order, **kwargs ):
print (f"Flask 信号触发:订单 {order['id' ]} " )
order_placed.send('order_service' , order={'id' : 'ORD001' })
六、最佳实践与常见陷阱
6.1 事件命名规范 建议采用「模块。动作」的命名方式,清晰表达事件语义:
'order.placed'
'order.cancelled'
'payment.success'
'payment.failed'
'user.registered'
'inventory.low_stock'
6.2 事件溯源与调试 为 EventBus 增加事件历史记录,方便调试和回放:
class DebuggableEventBus (EventBus ):
def emit (self, event: Event ) -> int :
count = super ().emit(event)
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"事件 '{event.name} ' 触发了 {count} 个处理器" )
return count
def replay (self, event_name: str = None ) -> None :
"""重放历史事件(用于调试/恢复场景)"""
history = self ._event_history
if event_name:
history = [e for e in history if e.name == event_name]
print (f"回放 {len (history)} 个历史事件..." )
for event in history:
self .emit(event)
6.3 防止内存泄漏 观察者模式最常见的坑:忘记注销观察者导致内存泄漏 。建议使用弱引用或上下文管理器:
import weakref
class WeakEventBus (EventBus ):
"""使用弱引用的事件总线,避免内存泄漏"""
def subscribe (self, event_name: str , handler, **kwargs ):
weak_handler = weakref.ref(handler)
def wrapper (event ):
h = weak_handler()
if h is not None :
h(event)
else :
self .unsubscribe(event_name, wrapper)
super ().subscribe(event_name, wrapper, **kwargs)
6.4 单元测试 import pytest
from unittest.mock import MagicMock, call
def test_order_placed_triggers_inventory_handler ():
bus = EventBus()
mock_handler = MagicMock()
bus.subscribe('order.placed' , mock_handler)
event = Event(name='order.placed' , data={'id' : 'ORD001' })
bus.emit(event)
mock_handler.assert_called_once_with(event)
def test_once_handler_only_fires_once ():
bus = EventBus()
counter = {'count' : 0 }
@bus.on('test.event' , once=True )
def one_time_handler (event ):
counter['count' ] += 1
bus.emit_by_name('test.event' )
bus.emit_by_name('test.event' )
bus.emit_by_name('test.event' )
assert counter['count' ] == 1
def test_error_in_handler_does_not_affect_others ():
bus = EventBus(error_handling='log' )
results = []
@bus.on('test.event' , priority=100 )
def bad_handler (event ):
raise RuntimeError("我崩了!" )
@bus.on('test.event' , priority=50 )
def good_handler (event ):
results.append('success' )
bus.emit_by_name('test.event' )
assert results == ['success' ]
七、前沿应用:CQRS 与事件溯源架构 观察者模式是更高级架构模式的基石。在现代微服务领域,CQRS(命令查询职责分离) 和 Event Sourcing(事件溯源) 都以事件系统为核心。
class EventStore :
"""事件存储:系统所有状态变更都以事件形式持久化"""
def __init__ (self ):
self ._store: list [Event] = []
self ._bus = EventBus()
def append (self, event: Event ) -> None :
self ._store.append(event)
self ._bus.emit(event)
def get_history (self, aggregate_id: str ) -> list [Event]:
return [e for e in self ._store if e.data.get('aggregate_id' ) == aggregate_id]
def rebuild_state (self, aggregate_id: str ) -> dict :
"""从事件历史重建实体状态"""
state = {}
for event in self .get_history(aggregate_id):
if event.name == 'account.created' :
state = {'id' : aggregate_id, 'balance' : 0 }
elif event.name == 'account.deposited' :
state['balance' ] += event.data['amount' ]
elif event.name == 'account.withdrawn' :
state['balance' ] -= event.data['amount' ]
return state
store = EventStore()
acc_id = 'ACC_001'
store.append(Event('account.created' , data={'aggregate_id' : acc_id, 'owner' : '张三' }))
store.append(Event('account.deposited' , data={'aggregate_id' : acc_id, 'amount' : 1000 }))
store.append(Event('account.withdrawn' , data={'aggregate_id' : acc_id, 'amount' : 200 }))
store.append(Event('account.deposited' , data={'aggregate_id' : acc_id, 'amount' : 500 }))
state = store.rebuild_state(acc_id)
print (f"账户当前状态:{state} " )
结合 FastAPI 和异步事件总线,可以快速搭建生产级事件驱动微服务,这也是当下云原生架构的主流选择。
八、总结 观察者模式以其「低耦合、高内聚」的特性,成为事件驱动架构的基石。本文从基础骨架出发,逐步构建了一个支持优先级、一次性监听、错误隔离、异步处理的完整事件总线,并覆盖了以下关键实践:
同步 EventBus :优先级调度、一次性监听、通配符订阅、错误隔离
异步 EventBus :基于 asyncio,支持顺序与并发两种执行模式
框架集成 :Django Signals、Flask blinker 的实战用法
最佳实践 :事件命名规范、弱引用防泄漏、单元测试策略
架构延伸 :事件溯源的初步实现
💡 一句话选用标准: 当你发现核心模块需要「通知」多个其他模块,但又不希望与它们直接耦合时——观察者模式就是你需要的那把钥匙。
参考资料 相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
Base64 字符串编码/解码 将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
Base64 文件转换器 将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
Markdown转HTML 将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online