前言
设计模式不是银弹,但在合适的场景下能让代码更清晰、更易维护。本文不讲大道理,直接用 Python 代码展示常用设计模式的实际应用场景。
设计模式分类
创建型
- 单例模式
- 工厂模式
- 建造者模式
结构型
- 装饰器模式
- 适配器模式
- 代理模式
行为型
- 策略模式
- 观察者模式
- 责任链模式
展示单例、工厂、建造者、装饰器、适配器、策略、观察者及责任链等八种常用模式的实际应用场景。通过具体的 Python 代码实现,演示了各模式在数据库连接、支付处理、HTTP 请求构建、功能扩展、接口适配、折扣计算、事件通知及请求处理链中的用法。文末提供模式选择指南与总结,强调理解模式思想并在合适场景应用的重要性。
设计模式不是银弹,但在合适的场景下能让代码更清晰、更易维护。本文不讲大道理,直接用 Python 代码展示常用设计模式的实际应用场景。
场景:数据库连接池、配置管理、日志记录器
Singleton
- _instance: Singleton
- _lock: Lock
+ get_instance() : Singleton
+ some_method()
全局只有一个实例
实现方式一:装饰器
import threading
from functools import wraps
def singleton(cls):
"""单例装饰器"""
instances = {}
lock = threading.Lock()
@wraps(cls)
def get_instance(*args, **kwargs):
if cls not in instances:
with lock:
if cls not in instances:
# 双重检查
instances[cls] = cls(*args, **kwargs)
return instances[cls]
return get_instance
@singleton
class DatabaseConnection:
def __init__(self, host: str = 'localhost', port: int = 3306):
self.host = host
self.port = port
self.connected = False
print(f"创建数据库连接:{host}:{port}")
def connect(self):
if not self.connected:
print("连接数据库...")
self.connected = True
def query(self, sql: str):
return f"执行:{sql}"
# 测试
db1 = DatabaseConnection('192.168.1.1', 3306)
db2 = DatabaseConnection('192.168.1.2', 5432)
# 参数被忽略
print(f"db1 is db2: {db1 is db2}") # True
print(f"host: {db1.host}") # 192.168.1.1
实现方式二:元类
import threading
class SingletonMeta(type):
"""单例元类"""
_instances = {}
_lock = threading.Lock()
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
with cls._lock:
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
class AppConfig(metaclass=SingletonMeta):
def __init__(self):
self.settings = {}
print("加载配置...")
def get(self, key: str, default=None):
return self.settings.get(key, default)
def set(self, key: str, value):
self.settings[key] = value
# 测试
config1 = AppConfig()
config1.set('debug', True)
config2 = AppConfig()
print(config2.get('debug')) # True
print(config1 is config2) # True
场景:根据条件创建不同类型的对象
«interface» PaymentProcessor
+ process(amount) : bool
AlipayProcessor
+ process(amount) : bool
WechatProcessor
+ process(amount) : bool
CreditCardProcessor
+ process(amount) : bool
PaymentFactory
+ create(payment_type) : PaymentProcessor
from abc import ABC, abstractmethod
from typing import Dict, Type
from dataclasses import dataclass
# 抽象产品
class PaymentProcessor(ABC):
@abstractmethod
def process(self, amount: float) -> bool:
pass
@abstractmethod
def refund(self, transaction_id: str) -> bool:
pass
# 具体产品
class AlipayProcessor(PaymentProcessor):
def process(self, amount: float) -> bool:
print(f"支付宝支付:¥{amount}")
# 调用支付宝 API...
return True
def refund(self, transaction_id: str) -> bool:
print(f"支付宝退款:{transaction_id}")
return True
class WechatPayProcessor(PaymentProcessor):
def process(self, amount: float) -> bool:
print(f"微信支付:¥{amount}")
# 调用微信支付 API...
return True
def refund(self, transaction_id: str) -> bool:
print(f"微信退款:{transaction_id}")
return True
class CreditCardProcessor(PaymentProcessor):
def __init__(self, gateway: str = 'stripe'):
self.gateway = gateway
def process(self, amount: float) -> bool:
print(f"信用卡支付 ({self.gateway}): ¥{amount}")
return True
def refund(self, transaction_id: str) -> bool:
print(f"信用卡退款:{transaction_id}")
return True
# 工厂类
class PaymentFactory:
_processors: Dict[str, Type[PaymentProcessor]] = {
'alipay': AlipayProcessor,
'wechat': WechatPayProcessor,
'credit_card': CreditCardProcessor,
}
@classmethod
def register(cls, name: str, processor_class: Type[PaymentProcessor]):
"""注册新的支付处理器"""
cls._processors[name] = processor_class
@classmethod
def create(cls, payment_type: str, **kwargs) -> PaymentProcessor:
"""创建支付处理器"""
processor_class = cls._processors.get(payment_type)
if not processor_class:
raise ValueError(f"不支持的支付方式:{payment_type}")
return processor_class(**kwargs)
# 使用示例
@dataclass
class Order:
order_id: str
amount: float
payment_type: str
def process_order(order: Order) -> bool:
processor = PaymentFactory.create(order.payment_type)
return processor.process(order.amount)
# 测试
orders = [
Order('ORD001', 99.9, 'alipay'),
Order('ORD002', 199.0, 'wechat'),
Order('ORD003', 599.0, 'credit_card'),
]
for order in orders:
success = process_order(order)
print(f"订单 {order.order_id}: {'成功' if success else '失败'}\n")
场景:构建复杂对象,特别是有很多可选参数时
Builder
设置属性 1
设置属性 2
设置属性 3
build
Product
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
class HttpMethod(Enum):
GET = 'GET'
POST = 'POST'
PUT = 'PUT'
DELETE = 'DELETE'
@dataclass
class HttpRequest:
"""HTTP 请求对象"""
url: str
method: HttpMethod = HttpMethod.GET
headers: dict = field(default_factory=dict)
params: dict = field(default_factory=dict)
body: Optional[str] = None
timeout: int = 30
retries: int = 0
def __str__(self):
return f"{self.method.value}{self.url}"
class HttpRequestBuilder:
"""HTTP 请求建造者"""
def __init__(self, url: str):
self._url = url
self._method = HttpMethod.GET
self._headers = {}
self._params = {}
self._body = None
self._timeout = 30
self._retries = 0
def method(self, method: HttpMethod) -> 'HttpRequestBuilder':
self._method = method
return self
def header(self, key: str, value: str) -> 'HttpRequestBuilder':
self._headers[key] = value
return self
def headers(self, headers: dict) -> 'HttpRequestBuilder':
self._headers.update(headers)
return self
def param(self, key: str, value: str) -> 'HttpRequestBuilder':
self._params[key] = value
return self
def params(self, params: dict) -> 'HttpRequestBuilder':
self._params.update(params)
return self
def body(self, body: str) -> 'HttpRequestBuilder':
self._body = body
return self
def json(self, data: dict) -> 'HttpRequestBuilder':
import json
self._body = json.dumps(data)
self._headers['Content-Type'] = 'application/json'
return self
def timeout(self, seconds: int) -> 'HttpRequestBuilder':
self._timeout = seconds
return self
def retries(self, count: int) -> 'HttpRequestBuilder':
self._retries = count
return self
def auth(self, token: str) -> 'HttpRequestBuilder':
self._headers['Authorization'] = f'Bearer {token}'
return self
def build(self) -> HttpRequest:
return HttpRequest(
url=self._url,
method=self._method,
headers=self._headers,
params=self._params,
body=self._body,
timeout=self._timeout,
retries=self._retries
)
# 使用示例
request = (
HttpRequestBuilder('https://api.example.com/users')
.method(HttpMethod.POST)
.header('Accept', 'application/json')
.auth('my-secret-token')
.json({'name': '张三', 'email': '[email protected]'})
.timeout(10)
.retries(3)
.build()
)
print(f"请求:{request}")
print(f"Headers: {request.headers}")
print(f"Body: {request.body}")
场景:动态地给对象添加功能,不修改原有代码
原始函数
装饰器 1
装饰器 2
装饰器 3
最终函数
import time
import functools
import logging
from typing import Callable, Any
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 计时装饰器
def timer(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
logger.info(f"{func.__name__} 耗时:{elapsed:.4f}秒")
return result
return wrapper
# 重试装饰器
def retry(max_attempts: int = 3, delay: float = 1.0):
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
logger.warning(f"{func.__name__} 第{attempt + 1}次失败:{e}")
if attempt < max_attempts - 1:
time.sleep(delay)
raise last_exception
return wrapper
return decorator
# 缓存装饰器
def cache(ttl: int = 60):
"""带过期时间的缓存"""
def decorator(func: Callable) -> Callable:
cache_data = {}
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
key = str(args) + str(kwargs)
now = time.time()
if key in cache_data:
result, timestamp = cache_data[key]
if now - timestamp < ttl:
logger.info(f"{func.__name__} 命中缓存")
return result
result = func(*args, **kwargs)
cache_data[key] = (result, now)
return result
wrapper.clear_cache = lambda: cache_data.clear()
return wrapper
return decorator
# 参数验证装饰器
def validate_types(**type_hints):
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
# 验证关键字参数
for param, expected_type in type_hints.items():
if param in kwargs:
value = kwargs[param]
if not isinstance(value, expected_type):
raise TypeError(f"参数 {param} 应为 {expected_type.__name__}, "
f"实际为 {type(value).__name__}")
return func(*args, **kwargs)
return wrapper
return decorator
# 使用示例
@timer
@retry(max_attempts=3, delay=0.5)
@cache(ttl=300)
def fetch_user_data(user_id: int) -> dict:
"""模拟获取用户数据"""
import random
if random.random() < 0.3: # 30% 概率失败
raise ConnectionError("网络错误")
time.sleep(0.1) # 模拟网络延迟
return {'id': user_id, 'name': f'User_{user_id}'}
@validate_types(amount=float, currency=str)
def process_payment(amount: float, currency: str = 'CNY') -> str:
return f"支付 {amount}{currency}"
# 测试
print(fetch_user_data(1))
print(fetch_user_data(1)) # 第二次会命中缓存
print(process_payment(amount=99.9, currency='CNY'))
# process_payment(amount='100', currency='CNY') # 会抛出 TypeError
场景:让不兼容的接口能够协同工作
«interface» Target
+ request() : str
Adaptee
+ specific_request() : str
Adapter
- adaptee: Adaptee
+ request() : str
from abc import ABC, abstractmethod
from typing import Dict, Any
import json
import xml.etree.ElementTree as ET
# 目标接口:统一的数据解析器
class DataParser(ABC):
@abstractmethod
def parse(self, data: str) -> Dict[str, Any]:
pass
@abstractmethod
def serialize(self, data: Dict[str, Any]) -> str:
pass
# 已有的 JSON 解析器(符合目标接口)
class JsonParser(DataParser):
def parse(self, data: str) -> Dict[str, Any]:
return json.loads(data)
def serialize(self, data: Dict[str, Any]) -> str:
return json.dumps(data, ensure_ascii=False)
# 第三方 XML 库(不符合目标接口)
class LegacyXmlProcessor:
"""遗留的 XML 处理类,接口不兼容"""
def xml_to_dict(self, xml_string: str) -> Dict:
root = ET.fromstring(xml_string)
return self._element_to_dict(root)
def _element_to_dict(self, element) -> Dict:
result = {}
for child in element:
if len(child) == 0:
result[child.tag] = child.text
else:
result[child.tag] = self._element_to_dict(child)
return result
def dict_to_xml(self, data: Dict, root_name: str = 'root') -> str:
root = ET.Element(root_name)
self._dict_to_element(data, root)
return ET.tostring(root, encoding='unicode')
def _dict_to_element(self, data: Dict, parent):
for key, value in data.items():
child = ET.SubElement(parent, key)
if isinstance(value, dict):
self._dict_to_element(value, child)
else:
child.text = str(value)
# 适配器:让 LegacyXmlProcessor 符合 DataParser 接口
class XmlParserAdapter(DataParser):
def __init__(self):
self._xml_processor = LegacyXmlProcessor()
def parse(self, data: str) -> Dict[str, Any]:
return self._xml_processor.xml_to_dict(data)
def serialize(self, data: Dict[str, Any]) -> str:
return self._xml_processor.dict_to_xml(data)
# 统一的数据处理函数
def process_data(parser: DataParser, raw_data: str) -> Dict:
"""使用统一接口处理数据"""
data = parser.parse(raw_data)
# 处理数据...
data['processed'] = True
return data
# 使用示例
json_data = '{"name": "张三", "age": 30}'
xml_data = '<user><name>李四</name><age>25</age></user>'
json_parser = JsonParser()
xml_parser = XmlParserAdapter()
print("JSON 解析:", process_data(json_parser, json_data))
print("XML 解析:", process_data(xml_parser, xml_data))
# 序列化
data = {'name': '王五', 'age': 35}
print("JSON 序列化:", json_parser.serialize(data))
print("XML 序列化:", xml_parser.serialize(data))
场景:根据不同情况选择不同的算法或行为
Context
- strategy: Strategy
+ set_strategy(strategy)
+ execute_strategy()
«interface» Strategy
+ execute()
ConcreteStrategyA
+ execute()
ConcreteStrategyB
+ execute()
from abc import ABC, abstractmethod
from typing import List
from dataclasses import dataclass
@dataclass
class Product:
name: str
price: float
quantity: int
# 策略接口
class DiscountStrategy(ABC):
@abstractmethod
def calculate(self, products: List[Product]) -> float:
"""计算折扣后的总价"""
pass
@abstractmethod
def description(self) -> str:
pass
# 具体策略
class NoDiscount(DiscountStrategy):
def calculate(self, products: List[Product]) -> float:
return sum(p.price * p.quantity for p in products)
def description(self) -> str:
return "无折扣"
class PercentageDiscount(DiscountStrategy):
def __init__(self, percentage: float):
self.percentage = percentage
def calculate(self, products: List[Product]) -> float:
total = sum(p.price * p.quantity for p in products)
return total * (1 - self.percentage / 100)
def description(self) -> str:
return f"{self.percentage}% 折扣"
class FixedAmountDiscount(DiscountStrategy):
def __init__(self, amount: float, min_purchase: float = 0):
self.amount = amount
self.min_purchase = min_purchase
def calculate(self, products: List[Product]) -> float:
total = sum(p.price * p.quantity for p in products)
if total >= self.min_purchase:
return max(0, total - self.amount)
return total
def description(self) -> str:
return f"满{self.min_purchase}减{self.amount}"
class BuyNGetMFree(DiscountStrategy):
"""买 N 件送 M 件(取最便宜的)"""
def __init__(self, buy: int, free: int):
self.buy = buy
self.free = free
def calculate(self, products: List[Product]) -> float:
# 展开所有商品
all_items = []
for p in products:
all_items.extend([p.price] * p.quantity)
# 按价格排序
all_items.sort(reverse=True)
# 计算免费商品数量
total_items = len(all_items)
free_count = (total_items // (self.buy + self.free)) * self.free
# 最便宜的商品免费
paid_items = all_items[:total_items - free_count]
return sum(paid_items)
def description(self) -> str:
return f"买{self.buy}送{self.free}"
# 上下文
class ShoppingCart:
def __init__(self):
self.products: List[Product] = []
self._strategy: DiscountStrategy = NoDiscount()
def add_product(self, product: Product):
self.products.append(product)
def set_discount(self, strategy: DiscountStrategy):
self._strategy = strategy
def get_total(self) -> float:
return self._strategy.calculate(self.products)
def checkout(self):
original = sum(p.price * p.quantity for p in self.products)
final = self.get_total()
saved = original - final
print(f"{'='*40}")
print("购物车清单:")
for p in self.products:
print(f" {p.name} x{p.quantity}: ¥{p.price * p.quantity:.2f}")
print(f"{'='*40}")
print(f"原价:¥{original:.2f}")
print(f"优惠策略:{self._strategy.description()}")
print(f"节省:¥{saved:.2f}")
print(f"实付:¥{final:.2f}")
# 使用示例
cart = ShoppingCart()
cart.add_product(Product("Python 编程", 89.0, 2))
cart.add_product(Product("设计模式", 69.0, 1))
cart.add_product(Product("算法导论", 128.0, 1))
# 测试不同策略
strategies = [
NoDiscount(),
PercentageDiscount(20),
FixedAmountDiscount(50, min_purchase=200),
BuyNGetMFree(2, 1),
]
for strategy in strategies:
cart.set_discount(strategy)
cart.checkout()
print()
场景:当一个对象状态改变时,自动通知所有依赖它的对象
观察者 3 观察者 2 观察者 1
主题 (Subject)
状态改变
订阅
订阅
订阅
通知
通知
通知
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class EventType(Enum):
ORDER_CREATED = 'order_created'
ORDER_PAID = 'order_paid'
ORDER_SHIPPED = 'order_shipped'
ORDER_COMPLETED = 'order_completed'
@dataclass
class Event:
type: EventType
data: Dict[str, Any]
timestamp: datetime = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
# 观察者接口
class Observer(ABC):
@abstractmethod
def update(self, event: Event):
pass
# 主题(被观察者)
class EventBus:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._observers: Dict[EventType, List[Observer]] = {}
cls._instance._callbacks: Dict[EventType, List[Callable]] = {}
return cls._instance
def subscribe(self, event_type: EventType, observer: Observer):
"""订阅事件(观察者模式)"""
if event_type not in self._observers:
self._observers[event_type] = []
self._observers[event_type].append(observer)
def on(self, event_type: EventType, callback: Callable):
"""订阅事件(回调函数方式)"""
if event_type not in self._callbacks:
self._callbacks[event_type] = []
self._callbacks[event_type].append(callback)
def unsubscribe(self, event_type: EventType, observer: Observer):
"""取消订阅"""
if event_type in self._observers:
self._observers[event_type].remove(observer)
def publish(self, event: Event):
"""发布事件"""
# 通知观察者
if event.type in self._observers:
for observer in self._observers[event.type]:
observer.update(event)
# 调用回调函数
if event.type in self._callbacks:
for callback in self._callbacks[event.type]:
callback(event)
# 具体观察者
class EmailNotifier(Observer):
def update(self, event: Event):
order_id = event.data.get('order_id')
email = event.data.get('email')
if event.type == EventType.ORDER_CREATED:
print(f"[邮件] 发送订单确认邮件到 {email}, 订单号:{order_id}")
elif event.type == EventType.ORDER_SHIPPED:
print(f"[邮件] 发送发货通知到 {email}, 订单号:{order_id}")
class SMSNotifier(Observer):
def update(self, event: Event):
order_id = event.data.get('order_id')
phone = event.data.get('phone')
if event.type == EventType.ORDER_PAID:
print(f"[短信] 发送支付成功短信到 {phone}, 订单号:{order_id}")
elif event.type == EventType.ORDER_SHIPPED:
print(f"[短信] 发送物流短信到 {phone}, 订单号:{order_id}")
class InventoryManager(Observer):
def update(self, event: Event):
if event.type == EventType.ORDER_PAID:
items = event.data.get('items', [])
print(f"[库存] 扣减库存:{items}")
class AnalyticsTracker(Observer):
def update(self, event: Event):
print(f"[分析] 记录事件:{event.type.value}, 时间:{event.timestamp}")
# 订单服务
class OrderService:
def __init__(self):
self.event_bus = EventBus()
def create_order(self, order_data: Dict):
order_id = f"ORD-{datetime.now().strftime('%Y%m%d%H%M%S')}"
print(f"\n创建订单:{order_id}")
self.event_bus.publish(Event(type=EventType.ORDER_CREATED, data={**order_data, 'order_id': order_id}))
return order_id
def pay_order(self, order_id: str, order_data: Dict):
print(f"\n支付订单:{order_id}")
self.event_bus.publish(Event(type=EventType.ORDER_PAID, data={**order_data, 'order_id': order_id}))
def ship_order(self, order_id: str, order_data: Dict):
print(f"\n发货订单:{order_id}")
self.event_bus.publish(Event(type=EventType.ORDER_SHIPPED, data={**order_data, 'order_id': order_id}))
# 使用示例
event_bus = EventBus()
# 注册观察者
event_bus.subscribe(EventType.ORDER_CREATED, EmailNotifier())
event_bus.subscribe(EventType.ORDER_PAID, SMSNotifier())
event_bus.subscribe(EventType.ORDER_PAID, InventoryManager())
event_bus.subscribe(EventType.ORDER_SHIPPED, EmailNotifier())
event_bus.subscribe(EventType.ORDER_SHIPPED, SMSNotifier())
# 注册分析追踪(所有事件)
analytics = AnalyticsTracker()
for event_type in EventType:
event_bus.subscribe(event_type, analytics)
# 使用回调函数方式
event_bus.on(EventType.ORDER_COMPLETED, lambda e: print(f"[回调] 订单完成:{e.data['order_id']}"))
# 模拟订单流程
order_service = OrderService()
order_data = {
'email': '[email protected]',
'phone': '138****1234',
'items': [{'sku': 'BOOK001', 'qty': 2}]
}
order_id = order_service.create_order(order_data)
order_service.pay_order(order_id, order_data)
order_service.ship_order(order_id, order_data)
场景:请求沿着处理链传递,直到被处理
不处理
不处理
处理
处理
处理
请求
处理器 1
处理器 2
处理器 3
响应
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from dataclasses import dataclass
from enum import Enum
class LogLevel(Enum):
DEBUG = 1
INFO = 2
WARNING = 3
ERROR = 4
CRITICAL = 5
@dataclass
class Request:
"""请求对象"""
user_id: str
action: str
data: Dict[str, Any]
ip_address: str = ''
token: str = ''
@dataclass
class Response:
success: bool
message: str
data: Any = None
# 处理器抽象类
class Handler(ABC):
def __init__(self):
self._next_handler: Optional[Handler] = None
def set_next(self, handler: 'Handler') -> 'Handler':
self._next_handler = handler
return handler
def handle(self, request: Request) -> Response:
result = self._process(request)
# 如果当前处理器返回失败,直接返回
if not result.success:
return result
# 传递给下一个处理器
if self._next_handler:
return self._next_handler.handle(request)
return result
@abstractmethod
def _process(self, request: Request) -> Response:
pass
# 具体处理器
class RateLimitHandler(Handler):
"""限流处理器"""
def __init__(self, max_requests: int = 100):
super().__init__()
self.max_requests = max_requests
self._request_counts: Dict[str, int] = {}
def _process(self, request: Request) -> Response:
ip = request.ip_address
self._request_counts[ip] = self._request_counts.get(ip, 0) + 1
if self._request_counts[ip] > self.max_requests:
return Response(False, f"请求过于频繁,IP: {ip}")
print(f"[限流] 通过 - IP: {ip}, 请求数:{self._request_counts[ip]}")
return Response(True, "限流检查通过")
class AuthenticationHandler(Handler):
"""认证处理器"""
def __init__(self):
super().__init__()
self._valid_tokens = {'token123', 'token456', 'admin_token'}
def _process(self, request: Request) -> Response:
if not request.token:
return Response(False, "缺少认证令牌")
if request.token not in self._valid_tokens:
return Response(False, "无效的认证令牌")
print(f"[认证] 通过 - 用户:{request.user_id}")
return Response(True, "认证通过")
class AuthorizationHandler(Handler):
"""授权处理器"""
def __init__(self):
super().__init__()
self._permissions = {
'user1': ['read', 'write'],
'user2': ['read'],
'admin': ['read', 'write', 'delete', 'admin'],
}
def _process(self, request: Request) -> Response:
user_perms = self._permissions.get(request.user_id, [])
required_perm = request.action
if required_perm not in user_perms:
return Response(False, f"用户 {request.user_id} 无权执行 {required_perm}")
print(f"[授权] 通过 - 用户:{request.user_id}, 操作:{required_perm}")
return Response(True, "授权通过")
class ValidationHandler(Handler):
"""数据验证处理器"""
def _process(self, request: Request) -> Response:
data = request.data
# 简单验证示例
if not data:
return Response(False, "请求数据不能为空")
if 'id' in data and not isinstance(data['id'], int):
return Response(False, "id 必须是整数")
print(f"[验证] 通过 - 数据:{data}")
return Response(True, "数据验证通过")
class BusinessHandler(Handler):
"""业务处理器"""
def _process(self, request: Request) -> Response:
print(f"[业务] 处理请求 - 操作:{request.action}, 数据:{request.data}")
# 模拟业务处理
result = {'action': request.action, 'processed_data': request.data, 'status': 'completed'}
return Response(True, "业务处理成功", result)
class LoggingHandler(Handler):
"""日志处理器"""
def _process(self, request: Request) -> Response:
print(f"[日志] 记录请求 - 用户:{request.user_id}, "
f"操作:{request.action}, IP: {request.ip_address}")
return Response(True, "日志记录完成")
# 构建处理链
def build_handler_chain() -> Handler:
""" 处理链顺序:日志 -> 限流 -> 认证 -> 授权 -> 验证 -> 业务处理 """
logging_handler = LoggingHandler()
rate_limit = RateLimitHandler(max_requests=100)
auth = AuthenticationHandler()
authz = AuthorizationHandler()
validation = ValidationHandler()
business = BusinessHandler()
# 链接处理器
logging_handler.set_next(rate_limit).set_next(auth).set_next(authz).set_next(validation).set_next(business)
return logging_handler
# 使用示例
handler_chain = build_handler_chain()
# 测试请求
requests = [
Request(
user_id='user1',
action='write',
data={'id': 1, 'name': '测试'},
ip_address='192.168.1.1',
token='token123',
),
Request(
user_id='user2',
action='delete', # user2 没有 delete 权限
data={'id': 2},
ip_address='192.168.1.2',
token='token456',
),
Request(
user_id='admin',
action='admin',
data={'config': 'new_value'},
ip_address='192.168.1.3',
token='admin_token',
),
Request(
user_id='hacker',
action='read',
data={},
ip_address='10.0.0.1',
token='invalid_token', # 无效 token
),
]
for i, req in enumerate(requests, 1):
print(f"\n{'='*50}")
print(f"请求 {i}: 用户={req.user_id}, 操作={req.action}")
print(f"{'='*50}")
response = handler_chain.handle(req)
print(f"\n结果:{'✓ 成功' if response.success else '✗ 失败'}")
print(f"消息:{response.message}")
if response.data:
print(f"数据:{response.data}")
需要什么?
是 -> 创建对象?
是 -> 全局唯一?
是 -> 单例模式
否 -> 根据条件创建?
是 -> 工厂模式
否 -> 参数很多?
是 -> 建造者模式
否 -> 扩展功能?
是 -> 动态添加?
是 -> 装饰器模式
否 -> 接口不兼容?
是 -> 适配器模式
否 -> 行为变化?
是 -> 算法可替换?
是 -> 策略模式
否 -> 一对多通知?
是 -> 观察者模式
否 -> 链式处理?
是 -> 责任链模式
| 模式 | 适用场景 | Python 特色实现 |
|---|---|---|
| 单例 | 全局唯一实例 | 装饰器、元类、模块 |
| 工厂 | 根据条件创建对象 | 字典映射、类方法 |
| 建造者 | 复杂对象构建 | 链式调用、dataclass |
| 装饰器 | 动态添加功能 | @decorator 语法 |
| 适配器 | 接口转换 | 组合、继承 |
| 策略 | 算法可替换 | 函数作为一等公民 |
| 观察者 | 事件通知 | 回调函数、信号槽 |
| 责任链 | 链式处理 | 链表结构 |
设计模式不是教条,Python 的动态特性让很多模式可以用更简洁的方式实现。关键是理解模式背后的思想,在合适的场景选择合适的方案。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML转Markdown在线工具,online