【MADRL】多智能体近端策略优化(MAPPO)算法

【MADRL】多智能体近端策略优化(MAPPO)算法
        本篇文章是博主强化学习RL领域学习时,用于个人学习、研究或者欣赏使用,并基于博主对相关等领域的一些理解而记录的学习摘录和笔记,若有不当和侵权之处,指出后将会立即改正,还望谅解。文章分类在强化学习专栏:

       强化学习(8)---《【MADRL】多智能体近端策略优化(MAPPO)算法》

【MADRL】多智能体近端策略优化(MAPPO)算法

目录

0.前言

1.背景与动机

2.算法结构

3.具体公式

4.算法流程

5.公式总结

6.优势与应用场景

7.结论

 [Python] MAPPO实现(可移植)


0.前言

       多智能体近端策略优化算法 MAPPO(Multi-Agent Proximal Policy Optimization)是PPO(Proximal Policy Optimization)在多智能体环境中的一种扩展,它通过在多智能体系统中引入PPO的策略优化机制,实现了在协作和竞争环境中更加高效的策略学习。MAPPO是一种基于策略梯度的多智能体强化学习算法,特别适用于混合协作和竞争的多智能体场景。

论文:The Surprising Effectiveness of PPO in Cooperative, Multi-Agent Games

代码:MADRL多智能体近端策略优化(MAPPO)算法
 


1.背景与动机

        PPO 是近年来最流行的强化学习算法之一,它通过引入裁剪的策略更新,解决了传统策略梯度方法(如TRPO)中策略更新步长过大导致训练不稳定的问题。在多智能体环境中,多个智能体同时学习策略,每个智能体的行为会影响其他智能体的决策,因此需要一个鲁棒且稳定的策略优化方法。MAPPO通过中心化的Critic和去中心化的Actor来实现多智能体的协同训练,并采用PPO的优势来提高多智能体环境下的学习效率和稳定性。


2.算法结构

        MAPPO继承了PPO的核心思想,并结合了多智能体系统的特点,采用了集中式训练,分布式执行的架构:

  • 集中式训练:在训练阶段,所有智能体的Critic网络都能够访问全局的状态和其他智能体的动作信息,以便于学习到更准确的价值函数。
  • 分布式执行:在执行阶段,每个智能体只使用自己观测到的局部状态和策略进行动作选择,保证了系统的分布式控制。

3.具体公式

        MAPPO算法主要分为两部分:策略更新价值函数估计。我们将分别介绍这两部分的公式。

优势函数的计算:优势函数

( \hat{A}t^i )

用于衡量某个动作

( a_t )

相对于当前策略下平均动作的优劣程度。优势函数可以通过以下公式估计:

[ \hat{A}t^i = \delta_t + (\gamma \lambda) \delta{t+1} + ... + (\gamma \lambda)^{T-t+1} \delta{T-1} ]


其中

( \delta_t )

是时间差分误差,定义为:

[ \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) ]


这里

( \lambda )

是GAE(Generalized Advantage Estimation)中的权重参数,用于平衡偏差和方差。

价值函数估计:每个智能体

( i )

的价值函数

( V_i(s) )

由一个中心化的Critic网络估计。Critic网络使用全局状态

( s )

和所有智能体的动作

( a_1, a_2, ..., a_N )

来估计全局的价值函数。MAPPO通过使用中心化Critic保证每个智能体在训练过程中可以考虑到其他智能体的策略,从而学到更有效的策略。Critic的目标是最小化均方误差(MSE)损失函数:

[ L(\phi_i) = \mathbb{E}{s_t, r_t, s{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] ]


其中,

( R_t )

是从当前时刻

( t )

到未来的累计回报,通常通过时间差分法(TD目标)进行估计:

[ R_t = r_t + \gamma V_i(s_{t+1}; \phi'_i) ]


其中,

( \gamma )

是折扣因子,

( \phi'_i )

是目标网络的参数。

( \epsilon )

是裁剪的阈值,用于控制策略更新的幅度。

( \hat{A}_t )

是优势函数的估计值,用于衡量动作

( a_t )

在状态

( s_t )

下的优势;

( r_t(\theta) = \frac{\pi_{\theta}(a_t | s_t)}{\pi_{\theta_{\text{old}}}(a_t | s_t)} )

是当前策略与旧策略的比率;

策略更新:        在PPO中,策略更新的目标是通过最大化策略的期望回报

( \mathbb{E}_{\pi} [R] )

来更新策略。然而,直接使用策略梯度更新容易导致大的策略变化。因此,PPO引入了一个裁剪目标函数来限制每次更新的策略变化幅度。MAPPO也遵循相同的原则,但应用在每个智能体 ( i ) 的策略上。PPO的目标函数为:

[ L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min \left( r_t(\theta) \hat{A}_t, \text{clip}(r_t(\theta), 1 - \epsilon, 1 + \epsilon) \hat{A}_t \right) \right] ]


其中:        MAPPO中的每个智能体

( i )

采用类似的目标函数进行策略更新,但每个智能体的策略仅依赖于自己的观测

( o_i )

,即:

[ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i, \text{clip}(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon) \hat{A}_t^i \right) \right] ]


其中

( r_t(\theta_i) )

是智能体

( i )

的策略更新比率,

( \hat{A}_t^i )

是智能体

( i )

的优势估计值。


4.算法流程

  1. 交互与经验收集:每个智能体根据当前策略与环境交互,并存储每个时间步的状态、动作、奖励、下一状态、价值等信息。
  2. 更新Critic网络:根据Critic损失函数更新每个智能体的Critic网络参数,最小化均方误差。
  3. 更新Actor网络:根据裁剪的PPO目标函数,使用策略梯度法更新每个智能体的策略网络参数。
  4. 软更新目标网络:使用软更新机制逐步更新目标网络的参数。
  5. 重复:循环进行以上步骤,直到智能体策略达到收敛。

计算回报和优势:通过时间差分法计算每个智能体的回报

( R_t )

和优势函数

( \hat{A}_t^i )

初始化:为每个智能体初始化策略网络

( \pi_i )

和Critic网络

( V_i )

,并初始化对应的目标网络。


5.公式总结

优势函数估计

[ \hat{A}t^i = \sum{l=0}^{T-t} (\gamma \lambda)^l \delta_{t+l} ]

Critic网络损失函数

[ L(\phi_i) = \mathbb{E}{s_t, r_t, s{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right] ]

策略更新目标

[ L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i, \text{clip}(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon) \hat{A}_t^i \right) \right] ]

6.优势与应用场景

  • 鲁棒性与稳定性:PPO算法引入的裁剪更新机制使策略梯度更新更加稳定,避免了更新幅度过大的问题。MAPPO继承了这一优势,能够在多智能体环境中提供更稳定的策略更新。
  • 集中式训练与分布式执行:通过中心化的Critic结构,智能体可以利用全局信息进行策略训练,而去中心化的Actor使得智能体在执行过程中只依赖局部观测,提高了算法的灵活性和扩展性。
  • 适应复杂的多智能体环境:MAPPO可以处理多智能体环境中的协作、竞争或混合型任务,非常适用于复杂的多智能体系统,如机器人集群、自动驾驶车队、多人游戏等。

7.结论

        MAPPO是对PPO算法的多智能体扩展,采用了中心化的Critic和去中心化的Actor结构,能够在多智能体环境中提供稳定、高效的策略优化。通过PPO的裁剪更新机制,MAPPO在策略更新过程中保持了良好的收敛性和鲁棒性,是当前研究和应用中广泛使用的算法之一。


 [Python] MAPPO实现(可移植)

        若是下面代码复现困难或者有问题,欢迎评论区留言;需要以整个项目形式的代码,请在评论区留下您的邮箱,以便于及时分享给您(私信难以及时回复)。

主文件:MAPPO_MPE_main

import torch import numpy as np from torch.utils.tensorboard import SummaryWriter import argparse from normalization import Normalization, RewardScaling from replay_buffer import ReplayBuffer from mappo_mpe import MAPPO_MPE from environment import Env class Runner_MAPPO_MPE: def __init__(self, args, env_name, number, seed): self.args = args self.env_name = env_name self.number = number self.seed = seed # Set random seed np.random.seed(self.seed) torch.manual_seed(self.seed) # Create env self.env = Env(env_name, discrete=True) # Discrete action space self.args.N = self.env.n # The number of agents self.args.obs_dim_n = [self.env.observation_space[i].shape[0] for i in range(self.args.N)] # obs dimensions of N agents self.args.action_dim_n = [self.env.action_space[i].n for i in range(self.args.N)] # actions dimensions of N agents # Only for homogenous agents environments like Spread in MPE,all agents have the same dimension of observation space and action space self.args.obs_dim = self.args.obs_dim_n[0] # The dimensions of an agent's observation space self.args.action_dim = self.args.action_dim_n[0] # The dimensions of an agent's action space self.args.state_dim = np.sum(self.args.obs_dim_n) # The dimensions of global state space(Sum of the dimensions of the local observation space of all agents) print("observation_space=", self.env.observation_space) print("obs_dim_n={}".format(self.args.obs_dim_n)) print("action_space=", self.env.action_space) print("action_dim_n={}".format(self.args.action_dim_n)) # Create N agents self.agent_n = MAPPO_MPE(self.args) self.replay_buffer = ReplayBuffer(self.args) # Create a tensorboard self.writer = SummaryWriter(log_dir='runs/MAPPO/MAPPO_env_{}_number_{}_seed_{}'.format(self.env_name, self.number, self.seed)) self.evaluate_rewards = [] # Record the rewards during the evaluating self.total_steps = 0 if self.args.use_reward_norm: print("------use reward norm------") self.reward_norm = Normalization(shape=self.args.N) elif self.args.use_reward_scaling: print("------use reward scaling------") self.reward_scaling = RewardScaling(shape=self.args.N, gamma=self.args.gamma) def run(self, ): evaluate_num = -1 # Record the number of evaluations while self.total_steps < self.args.max_train_steps: if self.total_steps // self.args.evaluate_freq > evaluate_num: self.evaluate_policy() # Evaluate the policy every 'evaluate_freq' steps evaluate_num += 1 _, episode_steps = self.run_episode_mpe(evaluate=False) # Run an episode self.total_steps += episode_steps if self.replay_buffer.episode_num == self.args.batch_size: self.agent_n.train(self.replay_buffer, self.total_steps) # Training self.replay_buffer.reset_buffer() self.evaluate_policy() self.env.close() def evaluate_policy(self, ): evaluate_reward = 0 for _ in range(self.args.evaluate_times): episode_reward, _ = self.run_episode_mpe(evaluate=True) evaluate_reward += episode_reward evaluate_reward = evaluate_reward / self.args.evaluate_times self.evaluate_rewards.append(evaluate_reward) print("total_steps:{} \t evaluate_reward:{}".format(self.total_steps, evaluate_reward)) self.writer.add_scalar('evaluate_step_rewards_{}'.format(self.env_name), evaluate_reward, global_step=self.total_steps) # Save the rewards and models np.save('./data_train/MAPPO_env_{}_number_{}_seed_{}.npy'.format(self.env_name, self.number, self.seed), np.array(self.evaluate_rewards)) self.agent_n.save_model(self.env_name, self.number, self.seed, self.total_steps) def run_episode_mpe(self, evaluate=False): episode_reward = 0 obs_n = self.env.reset() if self.args.use_reward_scaling: self.reward_scaling.reset() if self.args.use_rnn: # If use RNN, before the beginning of each episode,reset the rnn_hidden of the Q network. self.agent_n.actor.rnn_hidden = None self.agent_n.critic.rnn_hidden = None for episode_step in range(self.args.episode_limit): a_n, a_logprob_n = self.agent_n.choose_action(obs_n, evaluate=evaluate) # Get actions and the corresponding log probabilities of N agents s = np.array(obs_n).flatten() # In MPE, global state is the concatenation of all agents' local obs. v_n = self.agent_n.get_value(s) # Get the state values (V(s)) of N agents obs_next_n, r_n, done_n, _ = self.env.step(a_n) episode_reward += r_n[0] if not evaluate: if self.args.use_reward_norm: r_n = self.reward_norm(r_n) elif args.use_reward_scaling: r_n = self.reward_scaling(r_n) # Store the transition self.replay_buffer.store_transition(episode_step, obs_n, s, v_n, a_n, a_logprob_n, r_n, done_n) obs_n = obs_next_n if all(done_n): break if not evaluate: # An episode is over, store v_n in the last step s = np.array(obs_n).flatten() v_n = self.agent_n.get_value(s) self.replay_buffer.store_last_value(episode_step + 1, v_n) return episode_reward, episode_step + 1 if __name__ == '__main__': parser = argparse.ArgumentParser("Hyperparameters Setting for MAPPO in MPE environment") parser.add_argument("--max_train_steps", type=int, default=int(3e6), help=" Maximum number of training steps") parser.add_argument("--episode_limit", type=int, default=25, help="Maximum number of steps per episode") parser.add_argument("--evaluate_freq", type=float, default=5000, help="Evaluate the policy every 'evaluate_freq' steps") parser.add_argument("--evaluate_times", type=float, default=3, help="Evaluate times") parser.add_argument("--batch_size", type=int, default=32, help="Batch size (the number of episodes)") parser.add_argument("--mini_batch_size", type=int, default=8, help="Minibatch size (the number of episodes)") parser.add_argument("--rnn_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the rnn") parser.add_argument("--mlp_hidden_dim", type=int, default=64, help="The number of neurons in hidden layers of the mlp") parser.add_argument("--lr", type=float, default=5e-4, help="Learning rate") parser.add_argument("--gamma", type=float, default=0.99, help="Discount factor") parser.add_argument("--lamda", type=float, default=0.95, help="GAE parameter") parser.add_argument("--epsilon", type=float, default=0.2, help="GAE parameter") parser.add_argument("--K_epochs", type=int, default=15, help="GAE parameter") parser.add_argument("--use_adv_norm", type=bool, default=True, help="Trick 1:advantage normalization") parser.add_argument("--use_reward_norm", type=bool, default=True, help="Trick 3:reward normalization") parser.add_argument("--use_reward_scaling", type=bool, default=False, help="Trick 4:reward scaling. Here, we do not use it.") parser.add_argument("--entropy_coef", type=float, default=0.01, help="Trick 5: policy entropy") parser.add_argument("--use_lr_decay", type=bool, default=True, help="Trick 6:learning rate Decay") parser.add_argument("--use_grad_clip", type=bool, default=True, help="Trick 7: Gradient clip") parser.add_argument("--use_orthogonal_init", type=bool, default=True, help="Trick 8: orthogonal initialization") parser.add_argument("--set_adam_eps", type=float, default=True, help="Trick 9: set Adam epsilon=1e-5") parser.add_argument("--use_relu", type=float, default=False, help="Whether to use relu, if False, we will use tanh") parser.add_argument("--use_rnn", type=bool, default=False, help="Whether to use RNN") parser.add_argument("--add_agent_id", type=float, default=False, help="Whether to add agent_id. Here, we do not use it.") parser.add_argument("--use_value_clip", type=float, default=False, help="Whether to use value clip.") args = parser.parse_args() runner = Runner_MAPPO_MPE(args, env_name="simple_spread", number=1, seed=0) runner.run() 

移植事项:

1.注意环境参数的设置格式

2.注意环境的返回值利用

3.注意主运行流程的runner.run()的相关设置,等

可借鉴:【MADRL】基于MADRL的单调价值函数分解(QMIX)算法​​​​​​ 中关于 QMIX算法移植的注意事项和代码注释。


     文章若有不当和不正确之处,还望理解与指出。由于部分文字、图片等来源于互联网,无法核实真实出处,如涉及相关争议,请联系博主删除。如有错误、疑问和侵权,欢迎评论留言联系作者,或者关注VX公众号:Rain21321,联系作者。

Read more

Python实现 MCP 客户端调用(高德地图 MCP 服务)查询天气示例

Python实现 MCP 客户端调用(高德地图 MCP 服务)查询天气示例

文章目录 * MCP 官网 * MCP 官方文档中文版 * 官方 MCP 服务示例 * Github * MCP 市场 * 简介 * 架构 * 高德地图 MCP 客户端示例 * python-sdk 客户端 * java-sdk 客户端 MCP 官网 * https://modelcontextprotocol.io/introduction MCP 官方文档中文版 * https://app.apifox.com/project/5991953 官方 MCP 服务示例 * https://github.com/modelcontextprotocol/servers Github * python-sdk:https://github.com/modelcontextprotocol/python-sdk * java-sdk:

By Ne0inhk
43-dify案例分享-MCP-Server让工作流秒变第三方可调用服务

43-dify案例分享-MCP-Server让工作流秒变第三方可调用服务

1.前言 之前我们为大家介绍过MCP SSE插件,它能够支持MCP-server在Dify平台上的调用,从而帮助Dify与第三方平台提供的MCP-server进行无缝对接。有些小伙伴提出了疑问:既然Dify可以通过MCP SSE插件调用其他平台的MCP-server,那么Dify的工作流或Chatflow是否也能发布为MCP-server,供其他支持MCP client的工具使用呢?今天,我们将为大家介绍一款Dify插件——mcp-server,它能够实现这一功能,即将Dify的工作流或Chatflow发布为MCP-server,供其他第三方工具调用。 插件名字叫做MCP-server,我们在dify插件市场可以找到这个工具 Mcp-server 是一个由 Dify 社区贡献的 Extension 类型插件。安装后,你可以把任何 Dify 应用转变成符合 MCP 标准的 Server Endpoint,供外部 MCP 客户端直接访问。它的主要功能包括: * **暴露为 MCP 工具:**将 Dify 应用抽象为单一 MCP 工具,供外部 MCP 客户端(如

By Ne0inhk
【MCP】详细了解MCP协议:和function call的区别何在?如何使用MCP?

【MCP】详细了解MCP协议:和function call的区别何在?如何使用MCP?

本文介绍了MCP大模型上下文协议的的概念,并对比了MCP协议和function call的区别,同时用python sdk为例介绍了mcp的使用方式。 1. 什么是MCP? 官网:https://modelcontextprotocol.io/introduction 2025年,Anthropic提出了MCP协议。MCP全称为Model Context Protocol,翻译过来是大模型上下文协议。这个协议的主要为AI大模型和外部工具(比如让AI去查询信息,或者让AI操作本地文件)之间的交互提供了一个统一的处理协议。我们常用的USB TypeC接口(USB-C)统一了USB接口的样式,MCP协议就好比AI大模型中的USB-C,统一了大模型与工具的对接方式。 MCP协议采用了C/S架构,也就是服务端、客户端架构,能支持在客户端设备上调用远程Server提供的服务,同时也支持stdio流式传输模式,也就是在客户端本地启动mcp服务端。只需要在配置文件中新增MCP服务端,就能用上这个MCP服务器提供的各种工具,大大提高了大模型使用外部工具的便捷性。 MCP是开源协议,能让所有A

By Ne0inhk
【大模型系列篇】大模型基建工程:基于 FastAPI 自动构建 SSE MCP 服务器

【大模型系列篇】大模型基建工程:基于 FastAPI 自动构建 SSE MCP 服务器

今天我们将使用FastAPI来构建 MCP 服务器,Anthropic 推出的这个MCP 协议,目的是让 AI 代理和你的应用程序之间的对话变得更顺畅、更清晰。FastAPI 基于 Starlette 和 Uvicorn,采用异步编程模型,可轻松处理高并发请求,尤其适合 MCP 场景下大模型与外部系统的实时交互需求,其性能接近 Node.js 和 Go,在数据库查询、文件操作等 I/O 密集型任务中表现卓越。 开始今天的正题前,我们来回顾下相关的知识内容: 《高性能Python Web服务部署架构解析》、《使用Python开发MCP Server及Inspector工具调试》、《构建智能体MCP客户端:完成大模型与MCP服务端能力集成与最小闭环验证》   FastAPI基础知识 安装依赖 pip install uvicorn, fastapi FastAPI服务代码示例  from fastapi import FastAPI app

By Ne0inhk