OpenClaw 系统架构深度解析 | 极客日志(标签:Python / AI / 算法)
OpenClaw 系统架构深度解析
OpenClaw 系统采用四层架构设计,涵盖应用层、编排层、核心层及基础设施层。核心层包含感知、规划、执行、记忆四大引擎,实现 UI 识别、任务分解、操作执行与知识管理。编排层集成工作流引擎与服务网格,支持复杂任务调度。系统提供分布式状态管理、多级安全防御、可观测性监控及插件化扩展能力,具备高可用性与扩展性,适用于构建工业级 GUI 自动化智能体。
作者:云朵棉花糖 · 17 次浏览 · OpenClaw 系统架构深度解析
🏗️ 一、架构概览与设计哲学
1.1 核心设计原则
- 模块化设计:高内聚低耦合,分层架构,清晰的责任边界。
- 插件化扩展:热插拔组件。
- 事件驱动:异步非阻塞。
- 容错设计:故障隔离与恢复。
- 可观测性:监控 + 日志 + 追踪。
1.2 整体架构图
OpenClaw 四层架构体系 ====================================
┌─────────────────────────────────────────────────────────┐
│ 应用层 (Application Layer) │
├─────────────────────────────────────────────────────────┤
│ • Web Dashboard • API Gateway • CLI Interface │
│ • Mobile App • Chatbot • IDE Plugin │
└─────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────┐
│ 编排层 (Orchestration Layer) │
├─────────────────────────────────────────────────────────┤
│ • Workflow Engine • Task Scheduler • State Manager │
│ • Resource Manager • Load Balancer • Service Mesh │
└─────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────┐
│ 核心层 (Core Layer) │
├─────────────────────────────────────────────────────────┤
│ 感知引擎 │ 规划引擎 │ 执行引擎 │ 记忆引擎 │
│ Perception │ Planning │ Execution │ Memory │
├───────────────┼───────────────┼───────────────┼──────────┤
│ • 视觉识别 │ • LLM 推理 │ • 驱动适配 │ • 向量存储 │
│ • OCR 提取 │ • 任务分解 │ • 操作执行 │ • 知识库 │
│ • 元素检测 │ • 路径规划 │ • 错误处理 │ • 上下文 │
└─────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────┐
│ 基础设施层 (Infrastructure Layer) │
├─────────────────────────────────────────────────────────┤
│ • 消息队列 • 数据库集群 • 对象存储 • 缓存系统 │
│ • 服务发现 • 配置中心 • 监控告警 • 安全认证 │
└─────────────────────────────────────────────────────────┘
🔧 二、核心层深度剖析
2.1 感知引擎架构
""" 感知引擎架构解析
输入:屏幕图像/UI 描述/事件流
输出:结构化 UI 元素 + 语义理解
"""
class PerceptionEngineArchitecture:
"""感知引擎架构"""
def __init__(self):
.input_processors = {
: VisualProcessor(),
: TextualProcessor(),
: EventProcessor(),
: AXProcessor()
}
.feature_extractors = {
: LowLevelFeatureExtractor(),
: MidLevelFeatureExtractor(),
: HighLevelFeatureExtractor()
}
.recognition_pipeline = [
ElementDetector(),
TextRecognizer(),
IconClassifier(),
LayoutAnalyzer(),
SemanticParser()
]
.fusion_engine = MultiModalFusionEngine()
.post_processor = PostProcessor()
() -> PerceptionResult:
processed_inputs = ._preprocess_inputs(input_data)
features = ._extract_features_parallel(processed_inputs)
recognition_results = ._pipeline_recognition(features)
fused_result = .fusion_engine.fuse(recognition_results)
final_result = .post_processor.process(fused_result)
final_result
:
() -> VisualFeatures:
steps = [
._preprocess_image(screenshot),
._build_feature_pyramid(),
._apply_attention(),
._model_spatial_relations()
]
._aggregate_features(steps)
():
{
: ._extract_at_scale(),
: ._extract_at_scale(),
: ._extract_at_scale(),
: ._compute_attention()
}
:
():
.detectors = {
: TemplateMatcher(),
: MLDetector(),
: DeepLearningDetector(),
: HeuristicDetector()
}
.strategy_router = StrategyRouter()
() -> [UIElement]:
strategy = .strategy_router.choose_strategy(image)
detector_tasks = []
detector_name strategy[]:
detector = .detectors[detector_name]
task = asyncio.create_task(detector.detect(image))
detector_tasks.append((detector_name, task))
all_results = {}
name, task detector_tasks:
all_results[name] = task
fused_elements = ._fuse_detections(all_results)
final_elements = ._postprocess(fused_elements)
final_elements
() -> [UIElement]:
elements = defaultdict(: {: [], : []})
detector_name, results all_results.items():
weight = ._get_detector_weight(detector_name)
element results:
element_id = ._generate_element_id(element)
elements[element_id][].append(weight)
elements[element_id][].append(element.bbox)
fused = []
element_id, data elements.items():
(data[]) >= :
avg_score = np.mean(data[])
fused_box = ._weighted_box_fusion(data[], data[])
element = UIElement(
bbox=fused_box,
confidence=avg_score,
source=
)
fused.append(element)
fused
self
'visual'
'textual'
'event'
'accessibility'
self
'low_level'
'mid_level'
'high_level'
self
self
self
async
def
perceive
self, input_data: Dict
"""完整感知流程"""
await
self
await
self
await
self
await
self
await
self
return
class
VisualProcessor
"""视觉处理器架构"""
def
process
self, screenshot: Image
""" 视觉处理流程:
1. 图像预处理 (去噪、增强、标准化)
2. 多尺度特征金字塔构建
3. 注意力机制引导的特征提取
4. 空间关系建模
"""
self
self
self
self
return
self
def
_build_feature_pyramid
self
"""构建特征金字塔 - 多尺度感知"""
return
'scale_1x'
self
1.0
'scale_0.5x'
self
0.5
'scale_0.25x'
self
0.25
'attention_maps'
self
class
ElementDetector
"""元素检测器 - 混合检测策略"""
def
__init__
self
self
'template'
'ml'
'dl'
'heuristic'
self
async
def
detect
self, image: Image
List
"""混合检测流程"""
self
for
in
'detectors'
self
for
in
await
await
self
await
self
return
def
_fuse_detections
self, all_results: Dict
List
"""检测结果融合算法"""
lambda
'scores'
'boxes'
for
in
self
for
in
self
'scores'
'boxes'
for
in
if
len
'scores'
2
'scores'
self
'boxes'
'scores'
'fused'
return
2.2 规划引擎架构
""" 规划引擎架构解析
输入:用户意图 + 环境状态
输出:可执行的操作序列
"""
class PlanningEngineArchitecture:
    """Hierarchical planning engine.

    Refines a goal through three planner tiers (strategic -> tactical ->
    operational), then optimizes and validates the resulting plan.
    """
    def __init__(self):
        self.strategic_planner = StrategicPlanner()
        self.tactical_planner = TacticalPlanner()
        self.operational_planner = OperationalPlanner()
        self.knowledge_base = PlanningKnowledgeBase()
        # Optimizers keyed by the objective they pursue.
        self.optimizers = {
            'efficiency': EfficiencyOptimizer(),
            'robustness': RobustnessOptimizer(),
            'usability': UsabilityOptimizer()
        }
    async def plan(self, goal: Goal, context: Context) -> Plan:
        """Run the three planning tiers in order, then optimize and validate."""
        plan = await self.strategic_planner.plan(goal, context)
        plan = await self.tactical_planner.plan(plan, context)
        plan = await self.operational_planner.plan(plan, context)
        plan = await self._optimize_plan(plan)
        return await self._validate_plan(plan)
class StrategicPlanner:
    """Strategic planner: LLM-backed intent understanding and goal decomposition."""
    def __init__(self):
        self.llm_engine = LLMEngine()
        self.intent_classifier = IntentClassifier()
        self.goal_decomposer = GoalDecomposer()
    async def plan(self, goal: Goal, context: Context) -> StrategicPlan:
        """Classify intent, split the goal into subgoals, and order them."""
        intent = await self.intent_classifier.classify(goal.description)
        subgoals = await self.goal_decomposer.decompose(goal, intent)
        dependencies = await self._analyze_dependencies(subgoals)
        ordered_subgoals = await self._prioritize_subgoals(subgoals, context)
        return StrategicPlan(
            intent=intent,
            subgoals=ordered_subgoals,
            dependencies=dependencies,
            constraints=self._extract_constraints(goal)
        )
class TacticalPlanner:
    """Tactical planner: pattern matching plus strategy selection."""
    def __init__(self):
        self.pattern_library = PatternLibrary()
        self.strategy_selector = StrategySelector()
        self.constraint_solver = ConstraintSolver()
    async def plan(self, strategic_plan: StrategicPlan, context: Context) -> TacticalPlan:
        """Match known patterns, pick the best strategy, and solve its constraints."""
        matched_patterns = await self.pattern_library.match(
            strategic_plan.subgoals, context
        )
        candidate_strategies = [
            await self._generate_strategy(pattern, context)
            for pattern in matched_patterns
        ]
        chosen = await self.strategy_selector.select(candidate_strategies, context)
        solved_constraints = await self.constraint_solver.solve(
            chosen, strategic_plan.constraints
        )
        return TacticalPlan(
            strategy=chosen,
            constraints=solved_constraints,
            alternatives=self._generate_alternatives(candidate_strategies)
        )
class OperationalPlanner:
    """Operational planner: turns a tactical strategy into a concrete action sequence."""
    def __init__(self):
        self.action_generator = ActionGenerator()
        self.sequence_optimizer = SequenceOptimizer()
        self.error_handler = ErrorHandler()
    async def plan(self, tactical_plan: TacticalPlan, context: Context) -> OperationalPlan:
        """Instantiate action templates, order them, add checkpoints, then optimize."""
        concrete_actions = [
            await self.action_generator.instantiate(template, context)
            for template in tactical_plan.strategy.action_templates
        ]
        ordered = await self._sequence_actions(
            concrete_actions, tactical_plan.constraints
        )
        guarded = await self.error_handler.add_checkpoints(ordered)
        final_sequence = await self.sequence_optimizer.optimize(guarded, context)
        return OperationalPlan(
            actions=final_sequence,
            preconditions=self._extract_preconditions(final_sequence),
            expected_outcomes=self._predict_outcomes(final_sequence)
        )
2.3 执行引擎架构
""" 执行引擎架构解析
输入:操作序列 + 环境状态
输出:执行结果 + 状态更新
"""
class ExecutionEngineArchitecture:
    """Distributed execution engine: decompose, schedule, run in parallel, aggregate."""
    def __init__(self):
        self.executor_pool = ExecutorPool()
        self.scheduler = TaskScheduler()
        self.monitor = ExecutionMonitor()
        self.coordinator = ExecutionCoordinator()
        self.recovery_manager = RecoveryManager()
    async def execute(self, plan: OperationalPlan, context: Context) -> ExecutionResult:
        """Split the plan into tasks, place them on executors, run, and aggregate."""
        subtasks = await self._decompose_plan(plan)
        placement = await self.scheduler.schedule(subtasks, self.executor_pool)
        raw_results = await self._execute_parallel(placement)
        outcome = await self._aggregate_results(raw_results)
        await self._sync_state(outcome)
        return outcome
class ExecutorPool:
    """Executor pool managing platform-, capability- and fallback-specific executors."""
    def __init__(self):
        # Keys double as platform/capability identifiers.
        self.executors = {
            'windows': WindowsExecutor(),
            'macos': MacOSExecutor(),
            'linux': LinuxExecutor(),
            'web': WebExecutor(),
            'native': NativeExecutor(),
            'accessibility': AXExecutor(),
            'computer_vision': CVExecutor(),
            'api': APIExecutor(),
            'composite': CompositeExecutor(),
            'fallback': FallbackExecutor()
        }
        self.load_balancer = LoadBalancer()
        self.health_checker = HealthChecker()
    async def get_executor(self, action: Action) -> Executor:
        """Pick a healthy, compatible executor and prepare it for the action."""
        candidates = self._filter_compatible_executors(action)
        healthy = await self.health_checker.filter_healthy(candidates)
        chosen = await self.load_balancer.select(healthy)
        await chosen.prepare(action)
        return chosen
    def _filter_compatible_executors(self, action: Action) -> List[Executor]:
        """Collect executors matching the current platform and the action's needs."""
        matches = []
        host = platform.system().lower()
        if host in self.executors:
            matches.append(self.executors[host])
        if action.requires_native_api:
            matches.append(self.executors['native'])
        if action.requires_vision:
            matches.append(self.executors['computer_vision'])
        if action.is_fallback_allowed:
            matches.append(self.executors['fallback'])
        return matches
class TaskScheduler:
    """Task scheduler with pluggable, adaptively chosen scheduling algorithms."""
    def __init__(self):
        self.scheduling_algorithms = {
            'fifo': FIFOScheduler(),
            'priority': PriorityScheduler(),
            'deadline': DeadlineScheduler(),
            'dynamic': DynamicScheduler()
        }
        self.resource_manager = ResourceManager()
        self.dependency_resolver = DependencyResolver()
    async def schedule(self, tasks: List[Task], executor_pool: ExecutorPool) -> Dict[Task, Executor]:
        """Analyze dependencies and resources, pick an algorithm, produce a placement."""
        dep_graph = await self.dependency_resolver.analyze(tasks)
        requirements = await self._assess_resource_requirements(tasks)
        # NOTE(review): capabilities are evaluated but not consumed below —
        # presumably kept for side effects or future use; confirm.
        capabilities = await self._evaluate_executor_capabilities(executor_pool)
        chosen_algorithm = self._select_scheduling_algorithm(dep_graph, requirements)
        return await chosen_algorithm.schedule(tasks, executor_pool, dep_graph)
    def _select_scheduling_algorithm(self, dependency_graph, resource_requirements):
        """Heuristic choice: dense graphs -> dynamic; deadlines -> deadline;
        high priority -> priority; otherwise FIFO."""
        reqs = resource_requirements.values()
        if len(dependency_graph.edges) > len(dependency_graph.nodes) * 0.5:
            return self.scheduling_algorithms['dynamic']
        if any(req['deadline'] for req in reqs):
            return self.scheduling_algorithms['deadline']
        if any(req['priority'] > 5 for req in reqs):
            return self.scheduling_algorithms['priority']
        return self.scheduling_algorithms['fifo']
class ExecutionMonitor:
    """Execution monitor: collects metrics, detects anomalies, applies interventions."""
    def __init__(self):
        self.metrics = {
            'performance': PerformanceMetrics(),
            'accuracy': AccuracyMetrics(),
            'reliability': ReliabilityMetrics(),
            'resource': ResourceMetrics()
        }
        self.anomaly_detectors = {
            'statistical': StatisticalAnomalyDetector(),
            'ml': MLAnomalyDetector(),
            'rule_based': RuleBasedAnomalyDetector()
        }
        self.intervention_strategies = {
            'retry': RetryStrategy(),
            'fallback': FallbackStrategy(),
            'escalate': EscalationStrategy(),
            'abort': AbortStrategy()
        }
    async def monitor(self, execution: Execution) -> MonitoringResult:
        """Collect metrics, check for anomalies and, when needed, intervene."""
        snapshot = await self._collect_metrics(execution)
        anomalies = await self._detect_anomalies(snapshot)
        if anomalies:
            causes = await self._analyze_root_causes(anomalies)
            chosen = await self._decide_intervention(causes)
            if chosen:
                await self._apply_intervention(chosen, execution)
        return MonitoringResult(
            metrics=snapshot,
            anomalies=anomalies or [],
            # NOTE(review): reports True whenever anomalies exist, even if no
            # intervention was actually applied — mirrors original behavior.
            interventions_applied=bool(anomalies)
        )
2.4 记忆引擎架构
""" 记忆引擎架构解析
功能:知识存储、检索、推理、学习
"""
class MemoryEngineArchitecture:
    """Layered memory system (sensory -> working -> episodic/semantic + vector store)."""
    def __init__(self):
        self.sensory_memory = SensoryMemory()
        self.working_memory = WorkingMemory()
        self.episodic_memory = EpisodicMemory()
        self.semantic_memory = SemanticMemory()
        self.consolidator = MemoryConsolidator()
        self.retriever = MemoryRetriever()
        self.forgetter = AdaptiveForgetter()
        self.vector_store = VectorStore()
    async def store(self, experience: Experience) -> MemoryIndex:
        """Store an experience across memory layers.

        Sensory/working memory and the vector store always receive the
        experience; episodic and semantic layers only when the working-memory
        item clears the importance threshold. Returns a MemoryIndex whose
        episodic/semantic fields are None when those layers were skipped.
        """
        sensory_trace = await self.sensory_memory.store(experience.raw_data)
        encoded = await self._encode_experience(experience)
        working_memory_item = await self.working_memory.process(encoded)
        # Explicit None defaults instead of probing locals(), which was
        # fragile and breaks silently under refactoring.
        episodic_index = None
        semantic_index = None
        if working_memory_item.importance > THRESHOLD:
            episodic_index = await self.episodic_memory.store(working_memory_item)
            patterns = await self._extract_patterns(working_memory_item)
            semantic_index = await self.semantic_memory.store(patterns)
            await self._link_memories(episodic_index, semantic_index)
        vector_id = await self.vector_store.add(encoded.vector)
        return MemoryIndex(
            sensory=sensory_trace.id,
            working=working_memory_item.id,
            episodic=episodic_index,
            semantic=semantic_index,
            vector=vector_id
        )
    async def retrieve(self, query: Query, context: Context) -> List[Memory]:
        """Fan retrieval out across stores, then fuse, filter and format the hits."""
        retrieval_tasks = [
            self.vector_store.search(query.embedding, top_k=10),
            self.episodic_memory.search_by_time(context.timestamp, window='1h'),
            self.semantic_memory.search(query.keywords),
            self._search_similar_tasks(query.task_similarity)
        ]
        results = await asyncio.gather(*retrieval_tasks)
        fused_results = await self._fuse_retrieval_results(results)
        filtered = await self._filter_relevant(fused_results, query.relevance_threshold)
        return await self._format_as_memories(filtered)
class SensoryMemory:
    """Sensory memory: a bounded buffer of raw, feature-tagged traces."""
    def __init__(self):
        self.buffer = CircularBuffer(max_size=1000)
        self.temporal_index = TemporalIndex()
        self.feature_extractors = {
            'visual': VisualFeatureExtractor(),
            'textual': TextualFeatureExtractor(),
            'temporal': TemporalFeatureExtractor()
        }
    async def store(self, raw_data: RawData) -> SensoryTrace:
        """Extract all feature families concurrently and buffer the trace."""
        trace = SensoryTrace(id=uuid.uuid4(), data=raw_data, timestamp=time.time(), features={})
        pending = [
            (name, asyncio.create_task(extractor.extract(raw_data)))
            for name, extractor in self.feature_extractors.items()
        ]
        for name, task in pending:
            trace.features[name] = await task
        self.buffer.push(trace)
        self.temporal_index.add(trace)
        return trace
    def get_recent(self, n: int = 10) -> List[SensoryTrace]:
        """Return the n most recent traces."""
        return self.buffer.get_last(n)
class EpisodicMemory:
    """Episodic memory: stores concrete experiences in time order."""
    def __init__(self):
        self.timeseries_db = TimeseriesDB()
        self.event_graph = EventGraph()
        self.emotion_tagger = EmotionTagger()
        self.importance_evaluator = ImportanceEvaluator()
    async def store(self, memory_item: WorkingMemoryItem) -> EpisodicIndex:
        """Score and tag the item, persist it, and link it to the previous episode."""
        importance = await self.importance_evaluator.evaluate(memory_item)
        emotion = await self.emotion_tagger.tag(memory_item)
        episode = Episode(
            id=uuid.uuid4(),
            content=memory_item.content,
            timestamp=memory_item.timestamp,
            importance=importance,
            emotion=emotion,
            context=memory_item.context
        )
        await self.timeseries_db.insert(episode)
        await self.event_graph.add_node(episode)
        # Chain episodes chronologically when a predecessor exists.
        previous = await self._get_last_episode()
        if previous:
            await self.event_graph.add_edge(previous, episode, relation='temporal_next')
        return EpisodicIndex(
            episode_id=episode.id,
            timestamp=episode.timestamp,
            importance=episode.importance
        )
class SemanticMemory:
    """Semantic memory: abstract knowledge in a graph, with inference."""
    def __init__(self):
        self.knowledge_graph = KnowledgeGraph()
        self.pattern_extractor = PatternExtractor()
        self.inference_engine = InferenceEngine()
        self.belief_updater = BeliefUpdater()
    async def store(self, patterns: List[Pattern]) -> SemanticIndex:
        """Merge extracted concepts into the graph and record inferred knowledge."""
        concepts = await self._extract_concepts(patterns)
        for concept in concepts:
            known = await self.knowledge_graph.find_concept(concept.name)
            if known:
                # Concept already present: reconcile beliefs instead of duplicating.
                await self.belief_updater.update(known, concept)
            else:
                await self.knowledge_graph.add_concept(concept)
                for relation in concept.relations:
                    await self.knowledge_graph.add_relation(concept, relation)
        inferred_knowledge = await self.inference_engine.infer(concepts)
        for item in inferred_knowledge:
            await self.knowledge_graph.add_inferred(item)
        return SemanticIndex(
            concepts=[concept.name for concept in concepts],
            # NOTE(review): relation count is an estimate (2 per concept), not
            # the number of edges actually written — mirrors original behavior.
            relations=len(concepts) * 2,
            timestamp=time.time()
        )
🌐 三、编排层架构
3.1 工作流引擎
""" 工作流引擎架构
功能:复杂任务编排、状态管理、错误恢复
"""
class WorkflowEngineArchitecture:
    """State-machine based workflow engine with checkpointing."""
    def __init__(self):
        self.workflow_definitions = WorkflowRegistry()
        self.state_manager = DistributedStateManager()
        self.event_bus = EventBus()
        self.checkpoint_service = CheckpointService()
        self.compensation_manager = CompensationManager()
    async def execute_workflow(self, workflow_id: str, input_data: Dict) -> WorkflowResult:
        """Drive a workflow instance through state transitions until completion."""
        instance = await self._initialize_instance(workflow_id, input_data)
        await self.state_manager.save_state(instance.state)
        while not instance.is_completed:
            state_before = instance.current_state
            target_state = await self._trigger_transition(state_before, instance.context)
            outcome = await self._execute_state_action(target_state, instance)
            await self._handle_execution_result(outcome, instance)
            # Persist after every transition so a crash can resume mid-workflow.
            await self.state_manager.save_state(instance.state)
            if instance.state.should_checkpoint():
                await self.checkpoint_service.create_checkpoint(instance)
        await self._cleanup(instance)
        return WorkflowResult(
            success=instance.is_successful,
            output=instance.output,
            metrics=instance.metrics
        )
class DistributedStateManager:
    """Distributed state manager: partitions state and fans writes out to backends."""
    def __init__(self):
        self.storage_backends = {
            'redis': RedisStorage(),
            'postgres': PostgresStorage(),
            'memory': MemoryStorage(),
            's3': S3Storage()
        }
        self.serializers = {
            'json': JSONSerializer(),
            'msgpack': MsgPackSerializer(),
            'protobuf': ProtobufSerializer()
        }
        self.partitioner = StatePartitioner()
        self.synchronizer = StateSynchronizer()
    async def save_state(self, state: WorkflowState) -> StateVersion:
        """Partition the state, store all partitions concurrently, then sync."""
        partitions = await self.partitioner.partition(state)
        writes = []
        for part in partitions:
            backend = self._select_storage_backend(part)
            serializer = self._select_serializer(part)
            writes.append(asyncio.create_task(
                self._store_partition(part, backend, serializer)
            ))
        await asyncio.gather(*writes)
        version = await self._generate_version(state)
        await self.synchronizer.sync(state, version)
        return version
    def _select_storage_backend(self, partition: StatePartition) -> StorageBackend:
        """Route by (rough, stringified) payload size: memory < redis < postgres < s3."""
        size = len(str(partition.data))
        if size < 10 * 1024:
            return self.storage_backends['memory']
        if size < 1 * 1024 * 1024:
            return self.storage_backends['redis']
        if size < 10 * 1024 * 1024:
            return self.storage_backends['postgres']
        return self.storage_backends['s3']
class CompensationManager:
    """Saga-style compensation manager: undo completed steps when a later one fails."""
    def __init__(self):
        self.compensation_actions = CompensationRegistry()
        self.transaction_log = TransactionLog()
        self.recovery_strategies = {
            'retry': RetryStrategy(),
            'compensate': CompensateStrategy(),
            'forward_recovery': ForwardRecoveryStrategy(),
            'manual': ManualInterventionStrategy()
        }
    async def execute_with_compensation(self, actions: List[Action]) -> bool:
        """Run actions in order; on any failure, compensate the completed ones.

        Returns True when every action succeeded, False after compensation.
        """
        completed = []
        try:
            for action in actions:
                outcome = await action.execute()
                await self.transaction_log.log_execution(action, outcome)
                compensation = action.get_compensation()
                if compensation:
                    await self.compensation_actions.register(
                        action_id=action.id,
                        compensation=compensation
                    )
                completed.append(action)
        except Exception:
            await self._compensate_executed(completed)
            return False
        return True
    async def _compensate_executed(self, executed_actions: List[Action]):
        """Run registered compensations in reverse order, logging each outcome."""
        for action in reversed(executed_actions):
            try:
                compensation = await self.compensation_actions.get(action.id)
                if compensation:
                    await compensation.execute()
                    await self.transaction_log.log_compensation(action, True)
            except Exception as e:
                await self.transaction_log.log_compensation(action, False, str(e))
3.2 服务网格与通信
""" 服务网格架构
功能:服务发现、负载均衡、熔断、限流
"""
class ServiceMeshArchitecture:
    """Service mesh: discovery, load balancing, circuit breaking, rate limiting, tracing."""
    def __init__(self):
        self.registry = ServiceRegistry()
        self.discovery = ServiceDiscovery(self.registry)
        self.load_balancers = {
            'round_robin': RoundRobinBalancer(),
            'least_connections': LeastConnectionsBalancer(),
            'consistent_hash': ConsistentHashBalancer(),
            'weighted': WeightedBalancer()
        }
        self.circuit_breakers = CircuitBreakerFactory()
        self.rate_limiters = RateLimiterFactory()
        self.tracer = DistributedTracer()
    async def call_service(self, service_name: str, request: Request) -> Response:
        """Discover, select and call a service instance with full resilience checks."""
        instances = await self.discovery.discover(service_name)
        if not instances:
            raise ServiceUnavailableError(f"Service {service_name} not found")
        balancer = self._select_balancer(service_name, request)
        target = await balancer.select(instances, request)
        # Resilience gates: refuse calls to tripped or throttled instances.
        if await self.circuit_breakers.is_open(target.id):
            raise CircuitBreakerOpenError(target.id)
        if not await self.rate_limiters.try_acquire(target.id):
            raise RateLimitExceededError(target.id)
        with self.tracer.start_span(f"call_{service_name}") as span:
            span.set_tag("instance", target.id)
            span.set_tag("service", service_name)
            started = time.time()
            try:
                response = await self._execute_call(target, request, span)
            except Exception as e:
                await self._record_failure(target.id, e)
                await self.circuit_breakers.record_failure(target.id)
                raise
            await self._record_success(target.id, time.time() - started)
            return response
class ServiceRegistry:
    """Service registry: lease-based registration with health gating."""
    def __init__(self):
        self.services = defaultdict(list)
        self.health_checker = HealthChecker()
        self.lease_manager = LeaseManager()
    async def register(self, service: ServiceInstance) -> bool:
        """Register a healthy instance under a lease; reject unhealthy ones."""
        if not await self.health_checker.check(service):
            return False
        lease = await self.lease_manager.grant_lease(service)
        self.services[service.name].append({
            'instance': service,
            'lease': lease,
            'metadata': service.metadata,
            'registered_at': time.time(),
            'last_heartbeat': time.time()
        })
        await self._notify_registration(service)
        return True
    async def deregister(self, service_id: str) -> bool:
        """Remove the instance with the given id, revoking its lease."""
        for service_name, instances in self.services.items():
            for position, record in enumerate(instances):
                if record['instance'].id == service_id:
                    await self.lease_manager.revoke_lease(record['lease'])
                    instances.pop(position)
                    await self._notify_deregistration(record['instance'])
                    return True
        return False
class CircuitBreakerFactory:
    """Creates and caches per-service circuit breakers; drives their transitions."""
    def __init__(self):
        self.breakers = {}
        self.config_manager = CircuitBreakerConfigManager()
        self.state_transitioner = CircuitBreakerStateTransitioner()
    async def is_open(self, service_id: str) -> bool:
        """True when the service's breaker is currently tripped."""
        cb = await self._get_or_create_breaker(service_id)
        return cb.state == 'OPEN'
    async def record_failure(self, service_id: str):
        """Count a failure and trip the breaker if its policy says so."""
        cb = await self._get_or_create_breaker(service_id)
        await cb.record_failure()
        if await cb.should_trip():
            await self.state_transitioner.trip(cb)
    async def record_success(self, service_id: str):
        """Count a success and reset the breaker if its policy says so."""
        cb = await self._get_or_create_breaker(service_id)
        await cb.record_success()
        if await cb.should_reset():
            await self.state_transitioner.reset(cb)
class CircuitBreaker:
    """Circuit breaker tracking call outcomes and deciding trip/reset transitions.

    States follow the classic pattern: 'CLOSED' (normal), 'OPEN' (tripped),
    'HALF_OPEN' (probing recovery).
    """
    def __init__(self, config):
        self.state = 'CLOSED'
        self.failure_count = 0
        self.success_count = 0
        # Failures since the last success — what "consecutive" actually means.
        # Previously the cumulative failure_count was compared against
        # consecutive_failure_threshold, so a long-lived breaker could trip on
        # old, non-consecutive failures that successes never cleared.
        self.consecutive_failures = 0
        self.last_failure_time = None
        self.config = config
        self.metrics = CircuitBreakerMetrics()
    async def record_failure(self):
        """Record one failed call."""
        self.failure_count += 1
        self.consecutive_failures += 1
        self.last_failure_time = time.time()
        await self.metrics.record_failure()
    async def record_success(self):
        """Record one successful call; a success breaks any failure streak."""
        self.success_count += 1
        self.consecutive_failures = 0
        await self.metrics.record_success()
    async def should_trip(self) -> bool:
        """Trip when the failure rate or the consecutive-failure streak is too high."""
        total = self.failure_count + self.success_count
        if total >= self.config.minimum_calls:
            failure_rate = self.failure_count / total
            if failure_rate > self.config.failure_rate_threshold:
                return True
        if self.consecutive_failures >= self.config.consecutive_failure_threshold:
            return True
        return False
    async def should_reset(self) -> bool:
        """Reset when OPEN long enough, or after enough HALF_OPEN successes."""
        if self.state == 'OPEN':
            # Guard: an OPEN breaker with no recorded failure time (e.g.
            # tripped externally) previously crashed on None arithmetic.
            if self.last_failure_time is None:
                return False
            if (time.time() - self.last_failure_time) > self.config.wait_duration:
                return True
        elif self.state == 'HALF_OPEN':
            if self.success_count >= self.config.success_threshold:
                return True
        return False
📊 四、数据流与状态管理
4.1 数据流架构
""" 数据流架构
基于事件驱动的数据管道
"""
class DataflowArchitecture:
    """Event-driven dataflow: sources -> processors -> sinks, with metrics."""
    def __init__(self):
        self.sources = {
            'perception': PerceptionDataSource(),
            'execution': ExecutionDataSource(),
            'monitoring': MonitoringDataSource(),
            'external': ExternalDataSource()
        }
        self.processors = {
            'filter': FilterProcessor(),
            'transform': TransformProcessor(),
            'enrich': EnrichProcessor(),
            'aggregate': AggregateProcessor()
        }
        self.sinks = {
            'storage': StorageSink(),
            'analytics': AnalyticsSink(),
            'alerting': AlertingSink(),
            'dashboard': DashboardSink()
        }
        self.stream_processor = StreamProcessor()
        self.batch_processor = BatchProcessor()
    async def process_dataflow(self, flow_id: str) -> DataflowResult:
        """Build the pipeline described by flow_id and run it to completion."""
        flow_def = await self._get_flow_definition(flow_id)
        # NOTE(review): the pipeline handle is built but never used below —
        # presumably _build_pipeline has side effects; confirm.
        pipeline = await self._build_pipeline(flow_def)
        # Open every configured source stream.
        source_streams = []
        for src_cfg in flow_def.sources:
            source_streams.append(await self.sources[src_cfg.type].start_stream(src_cfg))
        stream = await self._merge_streams(source_streams)
        # Thread the merged stream through each configured processor in order.
        for proc_cfg in flow_def.processors:
            stream = await self.processors[proc_cfg.type].process(stream, proc_cfg)
        # Fan the processed stream out to all sinks, collecting metrics alongside.
        sink_tasks = [
            asyncio.create_task(self.sinks[sink_cfg.type].receive(stream, sink_cfg))
            for sink_cfg in flow_def.sinks
        ]
        metrics_task = asyncio.create_task(self._collect_metrics(stream))
        await asyncio.gather(*sink_tasks, metrics_task)
        return DataflowResult(success=True, metrics=await metrics_task)
class StreamProcessor:
    """Streaming pipeline runner with windowing, state and late-data handling."""
    def __init__(self):
        self.window_manager = WindowManager()
        self.state_backend = StreamStateBackend()
        self.watermark_generator = WatermarkGenerator()
        self.late_data_handler = LateDataHandler()
    async def process(self, stream: DataStream, processors: List[Processor]) -> DataStream:
        """Apply each processor in order, windowing and managing state as configured."""
        current = stream
        for proc in processors:
            # Window the input first when the processor asks for it.
            if proc.window_config:
                staged = await self.window_manager.apply_window(current, proc.window_config)
            else:
                staged = current
            current = await proc.process(staged)
            if proc.stateful:
                await self.state_backend.manage_state(current, proc)
            if proc.handle_late_data:
                current = await self.late_data_handler.handle(current, proc)
        return current
class WindowManager:
    """Assigns stream events to windows (tumbling/sliding/session/global)."""
    async def apply_window(self, stream: DataStream, config: WindowConfig) -> WindowedStream:
        """Dispatch to the window implementation named by config.type.

        Raises ValueError for unknown window types.
        """
        window_type = config.type
        if window_type == 'tumbling':
            return await self._apply_tumbling_window(stream, config)
        elif window_type == 'sliding':
            return await self._apply_sliding_window(stream, config)
        elif window_type == 'session':
            return await self._apply_session_window(stream, config)
        elif window_type == 'global':
            return await self._apply_global_window(stream, config)
        else:
            raise ValueError(f"Unknown window type: {window_type}")
    async def _apply_tumbling_window(self, stream: DataStream, config: WindowConfig) -> WindowedStream:
        """Fixed-size, non-overlapping windows aligned to the first event.

        Assumes stream.events is ordered by timestamp — TODO confirm with callers.
        """
        # Guard: an empty stream previously crashed on stream.events[0].
        if not stream.events:
            return WindowedStream(windows=[])
        window_size = config.size
        windows = []
        current_window = Window(
            start=stream.events[0].timestamp,
            end=stream.events[0].timestamp + window_size
        )
        for event in stream.events:
            # Advance window boundaries until this event fits. Previously only
            # one step was taken, so an event far past the current window was
            # placed in a window whose [start, end) did not contain it.
            while event.timestamp >= current_window.end:
                if current_window.events:
                    windows.append(current_window)
                current_window = Window(
                    start=current_window.end,
                    end=current_window.end + window_size
                )
            current_window.add_event(event)
        if current_window.events:
            windows.append(current_window)
        return WindowedStream(windows=windows)
4.2 状态管理架构
""" 状态管理架构
分布式、持久化、一致性保证
"""
class StateManagementArchitecture:
    """Distributed state facade: partitioned storage, caching, replication."""
    def __init__(self):
        self.storage = DistributedStateStorage()
        self.synchronizer = StateSynchronizer()
        self.version_manager = VersionManager()
        self.partitioner = StatePartitioner()
        self.cache = StateCache()
    async def get_state(self, key: StateKey, options: GetOptions = None) -> StateValue:
        """Read a value, consulting the cache when the caller opts in."""
        use_cache = bool(options and options.use_cache)
        if use_cache:
            cached = await self.cache.get(key)
            if cached:
                return cached
        partition = await self.partitioner.get_partition(key)
        value = await self.storage.get(partition, key)
        if use_cache:
            await self.cache.set(key, value, ttl=options.cache_ttl)
        return value
    async def set_state(self, key: StateKey, value: StateValue, options: SetOptions = None) -> bool:
        """Validate, version and write a value; optionally replicate and cache it."""
        if not await self._validate_state(value):
            raise InvalidStateError(value)
        version = await self.version_manager.generate_version(key, value)
        partition = await self.partitioner.get_partition(key)
        stored = await self.storage.set(
            partition, key, value, version, options
        )
        if not stored:
            return False
        if options and options.replicate:
            await self.synchronizer.replicate(key, value, version)
        if options and options.update_cache:
            await self.cache.set(key, value, ttl=options.cache_ttl)
        return True
class DistributedStateStorage:
    """Tiered state storage: L0 memory -> L1 redis -> L2 database -> L3 object store."""
    def __init__(self):
        self.storage_layers = {
            'L0': InMemoryStorage(),
            'L1': RedisStorage(),
            'L2': DatabaseStorage(),
            'L3': ObjectStorage()
        }
        self.storage_policy = StoragePolicy()
        self.compressors = {
            'gzip': GzipCompressor(),
            'lz4': LZ4Compressor(),
            'zstd': ZstdCompressor()
        }
    async def get(self, partition: Partition, key: StateKey) -> StateValue:
        """Search tiers fastest-first; promote cold hits toward the fast tiers.

        Raises KeyNotFoundError when no tier holds the key.
        """
        for level in ['L0', 'L1', 'L2', 'L3']:
            storage = self.storage_layers[level]
            if await storage.contains(partition, key):
                value = await storage.get(partition, key)
                if level in ['L2', 'L3']:
                    # Cold hit: copy it up so the next read is fast.
                    await self._promote_to_higher_level(key, value)
                return value
        raise KeyNotFoundError(key)
    async def set(self, partition: Partition, key: StateKey, value: StateValue, version: Version, options: SetOptions) -> bool:
        """Compress and write the value to every tier the policy selects.

        Returns True only when every targeted tier accepted the write.
        """
        target_levels = self.storage_policy.get_target_levels(value, options)
        # Callers may legitimately pass options=None (set_state's default);
        # previously this raised AttributeError on options.compression.
        compression = options.compression if options else None
        compressed_value = await self._compress(value, compression)
        write_tasks = []
        for level in target_levels:
            storage = self.storage_layers[level]
            write_tasks.append(asyncio.create_task(
                storage.set(partition, key, compressed_value, version)
            ))
        results = await asyncio.gather(*write_tasks, return_exceptions=True)
        return all(r is True for r in results)
class StateSynchronizer:
    """CRDT-based state synchronizer across cluster nodes."""
    def __init__(self):
        self.crdt_types = {
            'counter': GCounter(),
            'set': GSet(),
            'map': ORMap(),
            'register': LWWRegister()
        }
        self.conflict_resolvers = {
            'last_write_wins': LastWriteWinsResolver(),
            'merge': MergeResolver(),
            'custom': CustomResolver()
        }
        self.sync_protocols = {
            'gossip': GossipProtocol(),
            'anti_entropy': AntiEntropyProtocol(),
            'state_transfer': StateTransferProtocol()
        }
    async def synchronize(self, node_id: str, state: Dict) -> SynchronizedState:
        """Exchange state with every neighbor, then merge and resolve conflicts."""
        protocol = self._select_protocol(state)
        neighbors = await self._get_neighbors(node_id)
        exchanges = []
        for peer in neighbors:
            exchanges.append(await protocol.sync(node_id, peer, state))
        combined = await self._merge_results(exchanges, state)
        return await self._resolve_conflicts(combined)
    async def _merge_results(self, results: List, local_state: Dict) -> Dict:
        """Fold every remote snapshot into a copy of the local state via CRDT merge."""
        merged = local_state.copy()
        for snapshot in results:
            for key, remote_value in snapshot.items():
                if key not in merged:
                    merged[key] = remote_value
                    continue
                crdt = self._get_crdt_type(key)
                merged[key] = await crdt.merge(merged[key], remote_value)
        return merged
🔐 五、安全架构
5.1 安全架构设计
""" 安全架构
多层次防御体系
"""
class SecurityArchitecture:
    """Defence-in-depth security pipeline.

    Wires together authentication, authorization, encryption,
    auditing, threat detection and vulnerability management, and runs
    every operation through the full chain of checks.
    """

    def __init__(self):
        self.authentication = MultiFactorAuthentication()
        self.authorization = AttributeBasedAuthorization()
        self.encryption = EndToEndEncryption()
        self.audit = ComprehensiveAudit()
        self.threat_detection = ThreatDetectionSystem()
        self.vulnerability_management = VulnerabilityManagement()

    async def secure_operation(self, operation: Operation, context: SecurityContext) -> SecurityResult:
        """Run *operation* through the full security chain.

        Order matters: input validation -> authentication ->
        authorization -> encrypted, sandboxed execution -> output
        validation -> audit + threat analysis.

        NOTE(review): _validate_input, _execute_in_sandbox and
        _validate_output are not defined in this snippet — confirm.
        """
        if not await self._validate_input(operation.input):
            raise SecurityValidationError("Invalid input")
        if not await self.authentication.authenticate(context.user):
            raise AuthenticationError("Authentication failed")
        if not await self.authorization.check_permission(context.user, operation):
            raise AuthorizationError("Permission denied")

        # Execute with the payload encrypted and the operation isolated.
        ciphertext = await self.encryption.encrypt(operation.data)
        outcome = await self._execute_in_sandbox(operation, ciphertext)

        if not await self._validate_output(outcome):
            raise SecurityValidationError("Invalid output")

        # Record first, then feed the threat-detection pipeline.
        await self.audit.log_operation(operation, context, outcome)
        await self.threat_detection.analyze(operation, outcome)

        return SecurityResult(
            data=outcome,
            security_level='high',
            audit_trail=await self.audit.get_trail(operation.id),
        )
class MultiFactorAuthentication:
    """Risk-adaptive multi-factor authentication.

    The risk assessor picks a policy, the policy names the factor
    families to check, and all required factors are verified
    concurrently; every one of them must pass.
    """

    def __init__(self):
        # Available verification factors, keyed by factor family.
        self.factors = {
            'knowledge': KnowledgeFactor(),
            'possession': PossessionFactor(),
            'inherence': InherenceFactor(),
            'location': LocationFactor(),
            'behavior': BehaviorFactor(),
        }
        # Policy name -> ordered list of required factor families.
        self.policies = {
            'basic': ['knowledge'],
            'standard': ['knowledge', 'possession'],
            'high': ['knowledge', 'possession', 'inherence'],
            'critical': ['knowledge', 'possession', 'inherence', 'location'],
        }
        self.risk_assessor = RiskAssessor()

    async def authenticate(self, user: User, operation: Operation = None) -> bool:
        """Return True only when every factor required by the
        risk-selected policy verifies *user*.

        NOTE(review): _select_policy is not defined in this snippet;
        presumably it maps a risk level to a policy name — confirm.
        """
        risk = await self.risk_assessor.assess(user, operation)
        policy = self._select_policy(risk, operation)
        checks = [
            asyncio.create_task(self.factors[name].verify(user))
            for name in self.policies[policy]
        ]
        outcomes = await asyncio.gather(*checks)
        return all(outcomes)
class AttributeBasedAuthorization:
    """XACML-style attribute-based access control (ABAC)."""

    def __init__(self):
        self.pdp = PolicyDecisionPoint()        # decides
        self.pep = PolicyEnforcementPoint()     # enforces
        self.pap = PolicyAdministrationPoint()  # administers policies
        self.pip = PolicyInformationPoint()     # supplies env attributes
        self.attribute_store = AttributeStore()

    async def check_permission(self, user: User, operation: Operation) -> bool:
        """Evaluate subject/resource/environment attributes against
        policy and enforce the resulting decision.

        Returns True on permit (after enforcing obligations), False on
        deny (after enforcing deny reasons).
        """
        subject = await self.attribute_store.get_user_attributes(user.id)
        resource = await self.attribute_store.get_resource_attributes(operation.resource)
        environment = await self.pip.get_environment_attributes()

        decision = await self.pdp.evaluate(DecisionRequest(
            subject=subject,
            resource=resource,
            action=operation.action,
            environment=environment,
        ))

        if not decision.permit:
            await self.pep.enforce_deny(operation, decision.reasons)
            return False
        await self.pep.enforce_permit(operation, decision.obligations)
        return True
class ThreatDetectionSystem:
    """Multi-engine threat detection with correlation and automated
    response."""

    def __init__(self):
        # Independent detection engines, all consulted for each analysis.
        self.detection_engines = {
            'signature': SignatureBasedEngine(),
            'anomaly': AnomalyDetectionEngine(),
            'behavior': BehaviorAnalysisEngine(),
            'heuristic': HeuristicEngine(),
            'machine_learning': MLEngine(),
        }
        self.threat_intelligence = ThreatIntelligenceFeed()
        self.event_correlator = EventCorrelator()
        self.response_engine = ResponseEngine()

    async def analyze(self, operation: Operation, result: Any) -> ThreatAnalysis:
        """Fan the operation out to every engine, correlate findings
        with threat intelligence, and respond when risk crosses the
        threshold.

        NOTE(review): THRESHOLD is not defined in this snippet —
        confirm it is a module-level constant.
        """
        # Start every engine concurrently, then collect results in order.
        running = {
            name: asyncio.create_task(engine.analyze(operation, result))
            for name, engine in self.detection_engines.items()
        }
        detections = {name: await task for name, task in running.items()}

        ti_matches = await self.threat_intelligence.match(operation, result)
        correlated = await self.event_correlator.correlate(detections, ti_matches)
        risk = await self._assess_risk(correlated)

        responded = risk.level > THRESHOLD
        if responded:
            plan = await self.response_engine.decide_response(risk, correlated)
            await self.response_engine.execute(plan)

        return ThreatAnalysis(
            detections=detections,
            ti_matches=ti_matches,
            correlated_events=correlated,
            risk_assessment=risk,
            response_taken=responded,
        )
📈 六、可观测性架构
6.1 监控体系
""" 可观测性架构
监控、日志、追踪三位一体
"""
class ObservabilityArchitecture:
    """Unified observability: metrics, logs, traces, events and
    profiles around each operation."""

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.log_collector = LogCollector()
        self.tracer = DistributedTracer()
        self.event_collector = EventCollector()
        self.profiler = PerformanceProfiler()
        self.visualizer = MetricsVisualizer()
        self.alert_manager = AlertManager()

    async def instrument_operation(self, operation: Operation) -> Instrumentation:
        """Open a span, record start-of-operation metrics/logs and
        begin profiling; returns a handle for complete_operation()."""
        span = self.tracer.start_span(operation.name)
        await self.metrics_collector.record_start(operation)
        await self.log_collector.log_start(operation, span)
        profile_id = await self.profiler.start_profile(operation)
        return Instrumentation(
            span=span,
            metrics_start=time.time(),
            profile_id=profile_id,
            operation_id=operation.id,
        )

    async def complete_operation(self, instrumentation: Instrumentation, result: Any, error: Exception = None):
        """Close the span, flush end-of-operation telemetry and emit a
        summary OperationEvent (success == no error raised)."""
        instrumentation.span.finish()
        elapsed = time.time() - instrumentation.metrics_start

        await self.metrics_collector.record_end(
            instrumentation.operation_id, elapsed, error
        )
        await self.log_collector.log_end(
            instrumentation.operation_id, result, error, instrumentation.span
        )
        if instrumentation.profile_id:
            await self.profiler.stop_profile(instrumentation.profile_id)

        await self.event_collector.collect(OperationEvent(
            operation_id=instrumentation.operation_id,
            duration=elapsed,
            success=error is None,
            error=error,
            span_id=instrumentation.span.span_id,
            trace_id=instrumentation.span.trace_id,
        ))
class MetricsCollector:
    """Metric collection supporting several metric types, aggregators
    and storage backends."""

    def __init__(self):
        # Per-type metric processors.
        self.metric_types = {
            'counter': CounterMetric(),
            'gauge': GaugeMetric(),
            'histogram': HistogramMetric(),
            'summary': SummaryMetric(),
            'rate': RateMetric(),
        }
        # Aggregation strategies, keyed by the metric's aggregation field.
        self.aggregators = {
            'time': TimeBasedAggregator(),
            'space': SpaceBasedAggregator(),
            'cardinality': CardinalityAggregator(),
        }
        # Pluggable time-series storage backends.
        self.storage_backends = {
            'prometheus': PrometheusBackend(),
            'influxdb': InfluxDBBackend(),
            'timescale': TimescaleBackend(),
        }

    async def record_metric(self, metric: Metric) -> bool:
        """Validate, process, aggregate and persist one metric.

        Returns False when validation rejects the metric, otherwise
        the storage backend's success flag.
        """
        if not await self._validate_metric(metric):
            return False
        processed = await self.metric_types[metric.type].process(metric)
        aggregated = await self.aggregators[metric.aggregation].aggregate(processed)
        return await self.storage_backends[metric.storage_backend].store(aggregated)

    async def query_metrics(self, query: MetricQuery) -> MetricResult:
        """Fan a parsed query out to its backends concurrently, then
        merge and post-process the partial results."""
        plan = await self._parse_query(query)
        lookups = [
            asyncio.create_task(self.storage_backends[name].query(plan))
            for name in plan.backends
        ]
        partials = await asyncio.gather(*lookups)
        merged = await self._merge_results(partials, plan.merge_strategy)
        final = await self._postprocess(merged, plan.postprocessing)
        return MetricResult(
            data=final,
            query=query,
            metadata=self._generate_metadata(partials),
        )
class DistributedTracer:
    """Distributed tracing with pluggable sampling, propagation
    formats, storage and analysis."""

    def __init__(self):
        self.sampling_strategies = {
            'probabilistic': ProbabilisticSampler(),
            'rate_limiting': RateLimitingSampler(),
            'adaptive': AdaptiveSampler(),
        }
        self.propagation_formats = {
            'jaeger': JaegerFormat(),
            'zipkin': ZipkinFormat(),
            'ot': OpenTelemetryFormat(),
            'b3': B3Format(),
        }
        self.trace_storage = TraceStorage()
        self.trace_analyzer = TraceAnalyzer()

    def start_span(self, name: str, parent: Span = None) -> Span:
        """Start a span under *parent* (or start a new trace when
        parent is None).

        Returns a NoOpSpan when the adaptive sampler declines, so
        callers never need to branch on the sampling decision.
        """
        decision = self.sampling_strategies['adaptive'].should_sample(name, parent)
        if not decision.sample:
            return NoOpSpan()

        new_span = Span(
            name=name,
            span_id=self._generate_span_id(),
            trace_id=parent.trace_id if parent else self._generate_trace_id(),
            parent_id=parent.span_id if parent else None,
            start_time=time.time(),
            sampling_rate=decision.rate,
            attributes={},
        )
        # Attach OpenTelemetry propagation data for downstream hops.
        new_span.propagation_data = self.propagation_formats['ot'].inject(new_span)
        return new_span

    async def export_trace(self, span: Span):
        """Persist the finished span (batched when appropriate) and
        feed it to the trace analyzer."""
        payload = await self._collect_trace_data(span)
        if self._should_batch(payload):
            await self._batch_trace_data(payload)
        else:
            await self.trace_storage.store(payload)
        await self.trace_analyzer.analyze(payload)
🚀 七、扩展性设计
7.1 插件架构
""" 插件架构
热插拔、动态加载、运行时扩展
"""
class PluginArchitecture:
    """Plugin system facade: discovery, dependency resolution,
    sandboxed loading, lifecycle management and hot reload."""

    def __init__(self):
        self.registry = PluginRegistry()
        self.loader = PluginLoader()
        self.manager = PluginManager()
        self.dependency_resolver = DependencyResolver()
        self.sandbox = PluginSandbox()
        self.hot_reload = HotReloadManager()

    async def load_plugin(self, plugin_path: str, config: PluginConfig = None) -> PluginHandle:
        """Discover, verify, sandbox-load, register, wire and start a
        plugin.

        Raises SecurityError when the plugin fails verification.
        NOTE(review): _discover_plugin, _security_verify and
        _connect_plugin are not defined in this snippet — confirm.
        """
        info = await self._discover_plugin(plugin_path)
        deps = await self.dependency_resolver.resolve(info.dependencies)
        if not await self._security_verify(info):
            raise SecurityError("Plugin failed security verification")

        instance = await self.sandbox.load_in_sandbox(plugin_path, config)
        await instance.initialize()

        handle = await self.registry.register(info, instance, deps)
        await self._connect_plugin(handle)
        await instance.start()
        return handle

    async def unload_plugin(self, plugin_id: str) -> bool:
        """Stop, disconnect and unregister a plugin.

        Instance cleanup and sandbox teardown run only when
        unregistration succeeds; returns the unregistration result.
        """
        entry = await self.registry.get(plugin_id)
        await entry.instance.stop()
        await self._disconnect_plugin(entry)
        removed = await self.registry.unregister(plugin_id)
        if removed:
            await entry.instance.cleanup()
            await self.sandbox.unload(plugin_id)
        return removed
class PluginRegistry:
    """Plugin registry with category/interface indexes, lifecycle and
    version management."""

    def __init__(self):
        self.plugins = {}                    # plugin id -> PluginEntry
        self.categories = defaultdict(list)  # category -> [plugin ids]
        self.interfaces = defaultdict(list)  # interface -> [plugin ids]
        self.lifecycle = PluginLifecycleManager()
        self.version_manager = PluginVersionManager()

    async def register(self, plugin_info: PluginInfo, instance: PluginInstance, dependencies: List[Dependency]) -> PluginHandle:
        """Register a plugin after duplicate and version-compatibility
        checks, indexing it by category and implemented interfaces.

        Raises PluginAlreadyRegisteredError / VersionCompatibilityError
        on the respective check failures.
        """
        if plugin_info.id in self.plugins:
            raise PluginAlreadyRegisteredError(plugin_info.id)
        if not await self.version_manager.check_compatibility(plugin_info):
            raise VersionCompatibilityError(plugin_info.version)

        record = PluginEntry(
            info=plugin_info,
            instance=instance,
            dependencies=dependencies,
            state='loading',
            registered_at=time.time(),
            last_heartbeat=time.time(),
        )
        self.plugins[plugin_info.id] = record
        self.categories[plugin_info.category].append(plugin_info.id)
        for iface in plugin_info.implements:
            self.interfaces[iface].append(plugin_info.id)

        await self._notify_plugin_registered(plugin_info)
        return PluginHandle(
            plugin_id=plugin_info.id,
            instance=instance,
            entry=record,
        )

    async def get_plugins_by_interface(self, interface: str) -> List[PluginHandle]:
        """Return handles for every *active* plugin implementing
        *interface*; unknown interfaces yield an empty list."""
        handles = []
        for pid in self.interfaces.get(interface, []):
            record = self.plugins.get(pid)
            if record is not None and record.state == 'active':
                handles.append(PluginHandle(
                    plugin_id=pid,
                    instance=record.instance,
                    entry=record,
                ))
        return handles
class PluginSandbox:
    """Security-isolated plugin execution.

    Supports process, container, VM and WebAssembly isolation; applies
    resource limits and permissions, then monitors plugin behavior.
    """

    def __init__(self):
        self.isolation_techniques = {
            'process': ProcessIsolation(),
            'container': ContainerIsolation(),
            'vm': VMIsolation(),
            'wasm': WebAssemblyIsolation(),
        }
        self.resource_limits = ResourceLimiter()
        self.permission_controller = PermissionController()
        self.behavior_monitor = BehaviorMonitor()

    async def load_in_sandbox(self, plugin_path: str, config: PluginConfig) -> PluginInstance:
        """Create an isolated environment, apply limits/permissions,
        execute the plugin code inside it and start behavior
        monitoring.

        NOTE(review): _select_isolation_technique and _load_plugin_code
        are not defined in this snippet — confirm.
        """
        technique = self._select_isolation_technique(config)
        env = await technique.create_environment(plugin_path, config)

        await self.resource_limits.apply_limits(env, config.resource_limits)
        await self.permission_controller.set_permissions(env, config.permissions)

        code = await self._load_plugin_code(plugin_path)
        instance = await technique.execute_in_sandbox(env, code, config)

        await self.behavior_monitor.start_monitoring(instance, env)
        return instance
该 OpenClaw 系统架构详解涵盖了从底层核心引擎到上层应用编排的完整设计:
- 核心层 - 四大引擎的详细架构
- 编排层 - 工作流、服务网格、数据流
- 状态管理 - 分布式状态、CRDT、一致性
- 安全架构 - 深度防御、多因素认证、威胁检测
- 可观测性 - 监控、日志、追踪一体化
- 扩展性 - 插件化架构、热插拔支持
每个组件都采用了工业级的设计模式,具备高可用、高扩展、高安全的特性。这个架构可以作为构建复杂 AI 自动化系统的蓝图。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online