观察者模式全解析:用 Python 构建优雅的事件系统,让组件彻底解耦

观察者模式全解析:用 Python 构建优雅的事件系统,让组件彻底解耦

观察者模式全解析:用 Python 构建优雅的事件系统,让组件彻底解耦


一、引言:当组件之间的"耦合"成为噩梦

想象这样一个场景:你在开发一个电商系统,用户成功下单之后需要做很多事——发送短信通知、更新库存、记录日志、推送积分、触发物流系统……

初级写法是这样的:

defplace_order(order):# 业务逻辑 save_order_to_db(order)# 下单后的一堆副作用 send_sms_notification(order) update_inventory(order) write_log(order) add_points(order.user_id) trigger_logistics(order)# 产品又加了个新需求... push_to_dashboard(order)

这段代码有几个致命问题:place_order 函数职责爆炸,承担了所有后续处理;每次新增需求都要打开核心函数修改,稍有不慎就引入 Bug;各模块之间高度耦合,完全无法独立测试。

这就是紧耦合的代价。

观察者模式(Observer Pattern) 是解决这类问题的经典方案。它建立了一种「一对多」的依赖关系:当一个对象(被观察者/主题)状态变化时,所有依赖它的对象(观察者)都会自动收到通知并响应,而被观察者完全不需要知道观察者是谁、有多少个、做了什么。

这正是现代事件驱动架构的哲学核心:「我只管发出信号,谁响应、怎么响应,与我无关。」

本文将带你从零构建一个完整的 Python 事件系统,覆盖同步事件、异步事件、优先级调度、错误隔离等生产级特性,并结合真实项目案例展示其威力。


二、观察者模式基础:概念与最小实现

2.1 三个核心角色

  • Subject(主题/被观察者):维护观察者列表,提供注册/注销接口,状态变化时通知所有观察者。
  • Observer(观察者):定义接收通知的接口,具体逻辑由子类实现。
  • ConcreteObserver(具体观察者):实现观察者接口,对特定事件做出响应。

2.2 最小骨架实现

from abc import ABC, abstractmethod # 抽象观察者classObserver(ABC):@abstractmethoddefupdate(self, event:str, data:dict)->None:pass# 抽象主题classSubject(ABC):def__init__(self): self._observers:list[Observer]=[]defattach(self, observer: Observer)->None: self._observers.append(observer)defdetach(self, observer: Observer)->None: self._observers.remove(observer)defnotify(self, event:str, data:dict)->None:for observer in self._observers: observer.update(event, data)# 具体主题classOrderSystem(Subject):defplace_order(self, order:dict)->None:print(f"[订单系统] 订单 {order['id']} 创建成功") self.notify('order_placed', order)# 具体观察者classSMSObserver(Observer):defupdate(self, event:str, data:dict)->None:if event =='order_placed':print(f"[短信服务] 发送短信给用户 {data['user_id']}")classInventoryObserver(Observer):defupdate(self, event:str, data:dict)->None:if event =='order_placed':print(f"[库存系统] 扣减商品库存: {data['items']}")# 使用 order_system = OrderSystem() order_system.attach(SMSObserver()) order_system.attach(InventoryObserver()) order_system.place_order({'id':'ORD001','user_id':'U100','items':['商品A','商品B']})

输出:

[订单系统] 订单 ORD001 创建成功 [短信服务] 发送短信给用户 U100 [库存系统] 扣减商品库存: ['商品A', '商品B'] 

核心业务代码与后续处理完全分离,新增观察者零改动主题。这就是观察者模式的魅力所在。


三、进阶实战:构建生产级事件总线

基础实现过于简单,真实项目需要更强大的能力。我们来构建一个功能完整的 EventBus(事件总线),支持:事件类型过滤、优先级调度、一次性监听、错误隔离、装饰器注册。

3.1 事件对象设计

from dataclasses import dataclass, field from typing import Any from datetime import datetime import uuid @dataclassclassEvent:"""事件基类""" name:str data: Any =None source:str='' event_id:str= field(default_factory=lambda:str(uuid.uuid4())[:8]) timestamp: datetime = field(default_factory=datetime.now)def__repr__(self):returnf"Event(name={self.name!r}, id={self.event_id}, source={self.source!r})"# 具体事件类型(可选,提供更强的类型约束)@dataclassclassOrderPlacedEvent(Event): name:str='order.placed'@dataclassclassUserRegisteredEvent(Event): name:str='user.registered'@dataclassclassPaymentSuccessEvent(Event): name:str='payment.success'

3.2 功能完整的 EventBus

from typing import Callable, Optional import logging from collections import defaultdict logger = logging.getLogger(__name__)# 监听器类型别名 EventHandler = Callable[[Event],None]classHandlerEntry:"""封装处理器元数据"""def__init__(self, handler: EventHandler, priority:int=0, once:bool=False, name:str=''): self.handler = handler self.priority = priority # 数值越大优先级越高 self.once = once # 是否只触发一次 self.name = name or handler.__name__ self.call_count =0def__repr__(self):returnf"HandlerEntry(name={self.name!r}, priority={self.priority}, once={self.once})"classEventBus:""" 生产级事件总线 - 支持事件类型订阅 - 支持通配符订阅('*' 订阅所有事件) - 支持优先级调度 - 支持一次性监听 - 支持错误隔离(单个处理器异常不影响其他处理器) """def__init__(self, error_handling:str='log'):""" :param error_handling: 'log'=记录日志继续 | 'raise'=抛出异常停止 """ self._handlers:dict[str,list[HandlerEntry]]= defaultdict(list) self._error_handling = error_handling self._event_history:list[Event]=[] self._max_history =100defon(self, event_name:str, priority:int=0, once:bool=False)-> Callable:"""装饰器:注册事件处理器"""defdecorator(func: EventHandler)-> EventHandler: self.subscribe(event_name, func, priority=priority, once=once)return func return decorator defsubscribe(self, event_name:str, handler: EventHandler, priority:int=0, once:bool=False)->None:"""注册事件处理器""" entry = HandlerEntry(handler, priority=priority, once=once) self._handlers[event_name].append(entry)# 按优先级降序排列 self._handlers[event_name].sort(key=lambda e: e.priority, reverse=True) logger.debug(f"订阅事件 '{event_name}': {entry.name} (priority={priority})")defunsubscribe(self, event_name:str, handler: EventHandler)->bool:"""注销事件处理器,返回是否成功""" entries = self._handlers.get(event_name,[]) before =len(entries) self._handlers[event_name]=[e for e in entries if e.handler != handler]returnlen(self._handlers[event_name])< before defemit(self, event: Event)->int:""" 发布事件 :return: 触发的处理器数量 """# 记录历史 self._event_history.append(event)iflen(self._event_history)> self._max_history: self._event_history.pop(0)# 收集处理器:精确匹配 + 通配符 entries =(self._handlers.get(event.name,[])+ self._handlers.get('*',[]))# 去重(同一处理器不重复调用) seen =set() unique_entries =[]for e in entries:ifid(e.handler)notin seen: seen.add(id(e.handler)) unique_entries.append(e)# 执行处理器 triggered =0 to_remove =[]for entry in unique_entries:try: entry.handler(event) entry.call_count +=1 triggered +=1if entry.once: to_remove.append((event.name, entry))except Exception as exc:if self._error_handling =='raise':raise logger.error(f"处理器 '{entry.name}' 处理事件 '{event.name}' 时出错: {exc}")# 清理一次性处理器for evt_name, entry in to_remove: self._handlers[evt_name]=[ e for e in self._handlers[evt_name]if e isnot entry ]return triggered defemit_by_name(self, event_name:str, data: Any =None, source:str='')->int:"""便捷方法:直接用事件名发布"""return self.emit(Event(name=event_name, data=data, source=source))@propertydefstats(self)->dict:"""统计信息"""return{'subscribed_events':list(self._handlers.keys()),'total_handlers':sum(len(v)for v in self._handlers.values()),'history_count':len(self._event_history),}

3.3 完整使用示例:电商下单流程

# 创建全局事件总线 bus = EventBus(error_handling='log')# ===== 各模块通过装饰器注册监听器 [email protected]('order.placed', priority=100)# 最高优先级——库存先扣defhandle_inventory(event: Event): order = event.data print(f"[库存服务] 扣减库存: {[i['name']for i in order['items']]}")@bus.on('order.placed', priority=80)defhandle_sms(event: Event): order = event.data print(f"[短信服务] 通知用户 {order['user_id']}: 您的订单 {order['id']} 已确认")@bus.on('order.placed', priority=60)defhandle_points(event: Event): order = event.data points =int(order['total']*0.1)print(f"[积分系统] 用户 {order['user_id']} 获得 {points} 积分")@bus.on('order.placed', priority=40)defhandle_logistics(event: Event): order = event.data print(f"[物流系统] 创建发货任务: {order['address']}")@bus.on('order.placed', priority=20)defhandle_log(event: Event):print(f"[日志系统] 记录事件: {event}")# 通配符监听——监控所有事件@bus.on('*', priority=0)defglobal_monitor(event: Event):print(f"[监控中心] 事件上报: {event.name} @ {event.timestamp.strftime('%H:%M:%S')}")# ===== 一次性监听:首单礼包 [email protected]('order.placed', once=True)deffirst_order_bonus(event: Event):print(f"[营销系统] 🎉 首单礼包已发放给用户 {event.data['user_id']}!")# ===== 触发事件 =====print("="*50)print("【第一笔订单】") bus.emit(Event( name='order.placed', source='order_service', data={'id':'ORD_20250001','user_id':'U_888','items':[{'name':'机械键盘','qty':1},{'name':'鼠标垫','qty':2}],'total':399.0,'address':'北京市朝阳区xxx'}))print("\n【第二笔订单(首单礼包不再触发)】") bus.emit(Event( name='order.placed', source='order_service', data={'id':'ORD_20250002','user_id':'U_999','items':[{'name':'显示器','qty':1}],'total':1299.0,'address':'上海市静安区xxx'}))print(f"\n事件总线统计: {bus.stats}")

输出效果(按优先级有序执行,首单礼包只触发一次):

================================================== 【第一笔订单】 [库存服务] 扣减库存: ['机械键盘', '鼠标垫'] [短信服务] 通知用户 U_888: 您的订单 ORD_20250001 已确认 [积分系统] 用户 U_888 获得 39 积分 [物流系统] 创建发货任务: 北京市朝阳区xxx [日志系统] 记录事件: Event(name='order.placed', ...) [营销系统] 🎉 首单礼包已发放给用户 U_888! [监控中心] 事件上报: order.placed @ 10:23:45 【第二笔订单(首单礼包不再触发)】 [库存服务] 扣减库存: ['显示器'] ...(首单礼包处理器已自动注销) 

四、异步事件系统:应对高并发场景

同步事件总线在高并发场景(如每秒数千次事件)下会成为瓶颈。用 asyncio 构建异步版本:

import asyncio from typing import Callable, Coroutine, Union AsyncHandler = Callable[[Event], Coroutine]classAsyncEventBus:"""支持异步处理器的事件总线"""def__init__(self): self._handlers:dict[str,list[tuple[int, AsyncHandler]]]= defaultdict(list)defon(self, event_name:str, priority:int=0):defdecorator(func: AsyncHandler): self._handlers[event_name].append((priority, func)) self._handlers[event_name].sort(key=lambda x: x[0], reverse=True)return func return decorator asyncdefemit(self, event: Event, concurrent:bool=False)->None:""" :param concurrent: True=所有处理器并发执行, False=按优先级顺序执行 """ handlers = self._handlers.get(event.name,[])ifnot handlers:returnif concurrent:# 并发执行——适合独立、无顺序依赖的处理器await asyncio.gather(*[handler(event)for _, handler in handlers], return_exceptions=True)else:# 顺序执行——适合有优先级依赖的处理器for _, handler in handlers:await handler(event)# 使用示例 async_bus = AsyncEventBus()@async_bus.on('user.registered', priority=100)asyncdefsend_welcome_email(event: Event):await asyncio.sleep(0.1)# 模拟邮件发送print(f"[邮件服务] 发送欢迎邮件给 {event.data['email']}")@async_bus.on('user.registered', priority=80)asyncdefinit_user_profile(event: Event):await asyncio.sleep(0.05)# 模拟数据库写入print(f"[用户服务] 初始化用户档案: {event.data['username']}")@async_bus.on('user.registered', priority=60)asyncdefsend_coupon(event: Event):await asyncio.sleep(0.02)print(f"[营销服务] 发放新人优惠券给 {event.data['username']}")asyncdefmain(): event = Event( name='user.registered', data={'username':'alice','email':'[email protected]'})print("--- 顺序执行(有优先级保证)---")await async_bus.emit(event, concurrent=False)print("\n--- 并发执行(无顺序保证,更快)---")import time start = time.perf_counter()await async_bus.emit(event, concurrent=True)print(f"并发耗时: {time.perf_counter()- start:.3f}s") asyncio.run(main())

五、实战变体:Django/Flask 中的信号机制

观察者模式在主流 Web 框架中无处不在。Django 内置的 Signals 就是观察者模式的官方实现:

# Django Signals 示例from django.db.models.signals import post_save from django.dispatch import receiver from django.contrib.auth.models import User @receiver(post_save, sender=User)defuser_created_handler(sender, instance, created,**kwargs):if created:# 新用户注册后自动触发 send_welcome_email(instance.email) create_user_profile(instance)# 自定义信号from django.dispatch import Signal order_placed = Signal()# 自定义信号# 发送信号 order_placed.send(sender=OrderView, order=order_instance)# 接收信号@receiver(order_placed)defon_order_placed(sender, order,**kwargs): update_inventory(order)

在 Flask 中,可以使用 blinker 库(Flask 依赖项)实现同样的效果:

from blinker import Namespace # 定义信号命名空间 my_signals = Namespace() order_placed = my_signals.signal('order-placed') user_registered = my_signals.signal('user-registered')# 订阅@order_placed.connectdefon_order(sender, order,**kwargs):print(f"Flask 信号触发: 订单 {order['id']}")# 发布 order_placed.send('order_service', order={'id':'ORD001'})

六、最佳实践与常见陷阱

6.1 事件命名规范

建议采用「模块.动作」的命名方式,清晰表达事件语义:

# 推荐命名风格'order.placed'# 订单已下单'order.cancelled'# 订单已取消'payment.success'# 支付成功'payment.failed'# 支付失败'user.registered'# 用户注册'inventory.low_stock'# 库存不足(状态预警)

6.2 事件溯源与调试

为 EventBus 增加事件历史记录,方便调试和回放:

classDebuggableEventBus(EventBus):defemit(self, event: Event)->int: count =super().emit(event)if logger.isEnabledFor(logging.DEBUG): logger.debug(f"事件 '{event.name}' 触发了 {count} 个处理器")return count defreplay(self, event_name:str=None)->None:"""重放历史事件(用于调试/恢复场景)""" history = self._event_history if event_name: history =[e for e in history if e.name == event_name]print(f"回放 {len(history)} 个历史事件...")for event in history: self.emit(event)

6.3 防止内存泄漏

观察者模式最常见的坑:忘记注销观察者导致内存泄漏。建议使用弱引用或上下文管理器:

import weakref classWeakEventBus(EventBus):"""使用弱引用的事件总线,避免内存泄漏"""defsubscribe(self, event_name:str, handler,**kwargs):# 弱引用:若处理器对象被 GC,自动从注册表移除 weak_handler = weakref.ref(handler)# 包装处理器defwrapper(event): h = weak_handler()if h isnotNone: h(event)else: self.unsubscribe(event_name, wrapper)super().subscribe(event_name, wrapper,**kwargs)

6.4 单元测试

观察者模式极易测试,每个处理器可以独立验证:

import pytest from unittest.mock import MagicMock, call deftest_order_placed_triggers_inventory_handler(): bus = EventBus() mock_handler = MagicMock() bus.subscribe('order.placed', mock_handler) event = Event(name='order.placed', data={'id':'ORD001'}) bus.emit(event) mock_handler.assert_called_once_with(event)deftest_once_handler_only_fires_once(): bus = EventBus() counter ={'count':0}@bus.on('test.event', once=True)defone_time_handler(event): counter['count']+=1 bus.emit_by_name('test.event') bus.emit_by_name('test.event') bus.emit_by_name('test.event')assert counter['count']==1deftest_error_in_handler_does_not_affect_others(): bus = EventBus(error_handling='log') results =[]@bus.on('test.event', priority=100)defbad_handler(event):raise RuntimeError("我崩了!")@bus.on('test.event', priority=50)defgood_handler(event): results.append('success') bus.emit_by_name('test.event')assert results ==['success']# 坏处理器不影响好处理器

七、前沿应用:CQRS 与事件溯源架构

观察者模式是更高级架构模式的基石。在现代微服务领域,CQRS(命令查询职责分离)Event Sourcing(事件溯源) 都以事件系统为核心。

# 简化版事件溯源示例classEventStore:"""事件存储:系统所有状态变更都以事件形式持久化"""def__init__(self): self._store:list[Event]=[] self._bus = EventBus()defappend(self, event: Event)->None: self._store.append(event) self._bus.emit(event)# 同时发布事件defget_history(self, aggregate_id:str)->list[Event]:return[e for e in self._store if e.data.get('aggregate_id')== aggregate_id]defrebuild_state(self, aggregate_id:str)->dict:"""从事件历史重建实体状态""" state ={}for event in self.get_history(aggregate_id):if event.name =='account.created': state ={'id': aggregate_id,'balance':0}elif event.name =='account.deposited': state['balance']+= event.data['amount']elif event.name =='account.withdrawn': state['balance']-= event.data['amount']return state # 使用事件溯源追踪账户变更 store = EventStore() acc_id ='ACC_001' store.append(Event('account.created', data={'aggregate_id': acc_id,'owner':'张三'})) store.append(Event('account.deposited', data={'aggregate_id': acc_id,'amount':1000})) store.append(Event('account.withdrawn', data={'aggregate_id': acc_id,'amount':200})) store.append(Event('account.deposited', data={'aggregate_id': acc_id,'amount':500})) state = store.rebuild_state(acc_id)print(f"账户当前状态: {state}")# {'id': 'ACC_001', 'balance': 1300}

结合 FastAPI 和异步事件总线,可以快速搭建生产级事件驱动微服务,这也是当下云原生架构的主流选择。


八、总结

观察者模式以其「低耦合、高内聚」的特性,成为事件驱动架构的基石。本文从基础骨架出发,逐步构建了一个支持优先级、一次性监听、错误隔离、异步处理的完整事件总线,并覆盖了以下关键实践:

  • 同步 EventBus:优先级调度、一次性监听、通配符订阅、错误隔离
  • 异步 EventBus:基于 asyncio,支持顺序与并发两种执行模式
  • 框架集成:Django Signals、Flask blinker 的实战用法
  • 最佳实践:事件命名规范、弱引用防泄漏、单元测试策略
  • 架构延伸:事件溯源的初步实现
💡 一句话选用标准: 当你发现核心模块需要「通知」多个其他模块,但又不希望与它们直接耦合时——观察者模式就是你需要的那把钥匙。

你在项目中构建过事件系统吗?是用自定义实现还是借助消息队列(如 Redis、Kafka)?欢迎在评论区分享你的架构思考和踩坑经验,让我们一起在技术的道路上走得更稳、更远。


参考资料

Read more

AI大模型实用(三)Java快速实现智能体整理(Springboot+LangChain4j)

目录 1.1 简介 1.2 示例 步骤一: 添加pom 步骤二:配置 步骤三:流式输出 步骤四: 正常输出 步骤五: 【类似函数调用】AI Service接口 1.3 调试问题 问题1: ClassNotFoundException: dev.langchain4j.exception.IllegalConfigurationException 问题2: overriding is disabled 问题3 :dev.langchain4j.exception.IllegalConfigurationException 1.4  langchain4j与springAI对比 1.1 简介 一个基于 Java 的库,旨在简化自然语言处理(NLP)和大型语言模型(LLM)

爆锤OpenClaw,内存爆降 99%!仅需 5MB, ZeroClaw横空出世

爆锤OpenClaw,内存爆降 99%!仅需 5MB, ZeroClaw横空出世

作者按:就在所有人还在围着 OpenClaw 疯狂刷屏,捧着它近20万 Star 奉为“AI数字员工天花板”,却又在深夜痛骂它那动辄 1GB+ 的内存溢出时——ZeroClaw 横空出世了。今天,我们将从源码级剖析这个由哈佛、MIT 极客团队打造的纯 Rust 怪物,带你手把手在 几十块 的破旧设备上跑起属于你的 AI 特工! 一、 引言:天下苦 OpenClaw 久矣! 2025年到2026年,AI Agent 迎来了大爆发,OpenClaw 凭借其全能的特性火遍全网。但是,作为一名在生产环境中踩坑无数的架构师,我必须说句实话:OpenClaw 太重了,重到令人发指! 试想一下:你只想在自己吃灰的树莓派(Raspberry Pi)或者一台廉价的 512MB 内存云服务器上跑一个自动收发邮件、定时抓取数据的个人小助理。结果一跑

英伟达2025下一代AI基础设施的800伏直流架构白皮书(中文翻译版本第二部分)

英伟达2025下一代AI基础设施的800伏直流架构白皮书(中文翻译版本第二部分)

【接上一篇博客,跳转阅读连接】 英伟达2025下一代AI基础设施的800伏直流架构白皮书(中文翻译版本第一部分)https://blog.ZEEKLOG.net/qq_39648250/article/details/154962337 设施级直流配电方案评估         当前,领先的数据中心运营商和云服务提供商正在评估多种设施级数据中心架构: 800 VDC(NVIDIA MGX):直接支持Nvidia 800 VDC机架架构。得益于氮化镓(Gallium Nitride)和碳化硅(碳化硅)器件转换设备的日益成熟以及电动汽车的广泛采用。实现AI计算的无缝端到端集成。         750 VDC(ODCA / VDE -SPEC 90037):一种广泛采用的欧洲工业标准,旨在简化双线系统。与现有的1000 V级设备和电子设备具有短期兼容性。它在运输和工业设施中越来越受欢迎。         ±400 VDC(OCP 标准):技术上可行,基于以前的 400V DC 体系结构发展而来。

从0到1上手OpenClaw:本地安装 + 云部署全攻略,人人都能拥有专属 AI 执行助手

从0到1上手OpenClaw:本地安装 + 云部署全攻略,人人都能拥有专属 AI 执行助手

在上一篇深度解析中,我们见证了 OpenClaw 如何打破 AI “只会说不会做” 的桎梏,从对话式 AI 进化为能落地执行的数字助手。很多朋友留言表示,被 OpenClaw 的全场景能力打动,却卡在了 “安装部署” 这第一步,担心代码门槛太高无从下手,或是怕踩了环境配置的坑迟迟无法启动。 作为系列教程的开篇,我们就从最零门槛、零成本的本地安装讲起,全程附带可直接复制的命令、新手避坑提醒,哪怕你是第一次接触终端操作,跟着步骤走也能顺利完成安装,真正实现 “一句话指令,AI 全流程执行”。 1. 安装前的必备准备 在正式开始安装前,做好这几项基础准备,能帮你避开 90% 的前期踩坑,大幅提升部署成功率,所有需要用到的工具均为免费开源,可直接从官网下载。 (1)硬件适配 不用盲目追求高配,根据自己的使用场景满足基础要求即可: * a. 零基础新手尝鲜试玩:电脑满足 4 核 CPU、