MAPPO 多智能体近端策略优化算法详解
多智能体近端策略优化(Multi-Agent Proximal Policy Optimization, MAPPO)是 PPO 在多智能体环境中的扩展。它继承了 PPO 稳定的策略更新机制,通过引入集中式训练与分布式执行(CTDE)的架构,有效解决了多智能体协作与竞争场景下的策略学习问题。
核心思想与动机
PPO 之所以流行,在于其裁剪策略更新机制解决了传统方法步长过大导致的不稳定问题。在多智能体系统中,每个智能体的行为都会影响全局状态和其他智能体的决策,因此需要一个鲁棒的优化方法。
MAPPO 的核心架构如下:
- 集中式训练:训练阶段,Critic 网络可以访问全局状态和其他智能体的动作信息,从而学习到更准确的价值函数。
- 分布式执行:执行阶段,每个智能体仅使用局部观测和自身策略进行动作选择,保证了系统的去中心化控制能力。
这种设计使得智能体在训练时能'看到'队友的动作以协同优化,而在部署时又能独立运行,兼顾了性能与灵活性。
关键公式解析
MAPPO 主要包含两部分:价值函数估计与策略更新。
优势函数计算
优势函数用于衡量某个动作相对于平均动作的优劣程度。通常采用广义优势估计(GAE):
$$\hat{A}t^i = \delta_t + (\gamma \lambda) \delta{t+1} + ... + (\gamma \lambda)^{T-t+1} \delta_{T-1}$$
其中 $\delta_t$ 是时间差分误差:
$$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$$
这里 $\lambda$ 是 GAE 权重参数,用于平衡偏差和方差。
价值函数估计
每个智能体 $i$ 的价值函数 $V_i(s)$ 由一个中心化的 Critic 网络估计。该网络利用全局状态 $s$ 和所有智能体的动作来预测全局价值。目标是最小化均方误差损失:
$$L(\phi_i) = \mathbb{E}{s_t, r_t, s{t+1}} \left[ \left( V_i(s_t; \phi_i) - R_t \right)^2 \right]$$
其中 $R_t$ 是累计回报,通常通过 TD 目标估计:
$$R_t = r_t + \gamma V_i(s_{t+1}; \phi'_i)$$
策略更新目标
为了限制策略变化幅度,MAPPO 沿用了 PPO 的裁剪目标函数。对于每个智能体 $i$,其策略更新目标为:
$$L^{CLIP}_i(\theta_i) = \mathbb{E}_t \left[ \min \left( r_t(\theta_i) \hat{A}_t^i, \text{clip}(r_t(\theta_i), 1 - \epsilon, 1 + \epsilon) \hat{A}_t^i \right) \right]$$
其中 $r_t(\theta_i)$ 是当前策略与旧策略的比率,$\epsilon$ 是裁剪阈值。这意味着当新策略偏离旧策略太远时,梯度会被截断,从而保证训练的稳定性。
算法流程
实际训练中,我们遵循以下循环步骤:
- 交互与收集:智能体与环境交互,存储状态、动作、奖励及下一状态等经验。
- Critic 更新:基于当前经验最小化价值函数损失,更新 Critic 参数。
- Actor 更新:利用裁剪后的 PPO 目标函数更新策略网络参数。
- 软更新:对目标网络进行软更新,保持训练平滑。
- 重复迭代:直到策略收敛或达到预设步数。
在此过程中,需特别注意回报 $R_t$ 和优势函数 $\hat{A}_t^i$ 的计算,这直接决定了梯度的方向和质量。
Python 实现参考
下面是一个基于 MPE(Multi-Agent Particle Environment)环境的 MAPPO 实现框架。代码结构清晰,便于移植到其他多智能体仿真环境中。
import torch
import numpy np
torch.utils.tensorboard SummaryWriter
argparse
normalization Normalization, RewardScaling
replay_buffer ReplayBuffer
mappo_mpe MAPPO_MPE
environment Env
:
():
.args = args
.env_name = env_name
.number = number
.seed = seed
np.random.seed(.seed)
torch.manual_seed(.seed)
.env = Env(env_name, discrete=)
.args.N = .env.n
.args.obs_dim_n = [.env.observation_space[i].shape[] i (.args.N)]
.args.action_dim_n = [.env.action_space[i].n i (.args.N)]
.args.obs_dim = .args.obs_dim_n[]
.args.action_dim = .args.action_dim_n[]
.args.state_dim = np.(.args.obs_dim_n)
()
()
.agent_n = MAPPO_MPE(.args)
.replay_buffer = ReplayBuffer(.args)
.writer = SummaryWriter(log_dir=)
.evaluate_rewards = []
.total_steps =
.args.use_reward_norm:
.reward_norm = Normalization(shape=.args.N)
.args.use_reward_scaling:
.reward_scaling = RewardScaling(shape=.args.N, gamma=.args.gamma)
():
evaluate_num = -
.total_steps < .args.max_train_steps:
.total_steps // .args.evaluate_freq > evaluate_num:
.evaluate_policy()
evaluate_num +=
_, episode_steps = .run_episode_mpe(evaluate=)
.total_steps += episode_steps
.replay_buffer.episode_num == .args.batch_size:
.agent_n.train(.replay_buffer, .total_steps)
.replay_buffer.reset_buffer()
.evaluate_policy()
.env.close()
():
evaluate_reward =
_ (.args.evaluate_times):
episode_reward, _ = .run_episode_mpe(evaluate=)
evaluate_reward += episode_reward
evaluate_reward /= .args.evaluate_times
.evaluate_rewards.append(evaluate_reward)
()
.writer.add_scalar(.(.env_name), evaluate_reward, global_step=.total_steps)
np.save(.(.env_name, .number, .seed), np.array(.evaluate_rewards))
.agent_n.save_model(.env_name, .number, .seed, .total_steps)
():
episode_reward =
obs_n = .env.reset()
.args.use_reward_scaling:
.reward_scaling.reset()
.args.use_rnn:
.agent_n.actor.rnn_hidden =
.agent_n.critic.rnn_hidden =
episode_step (.args.episode_limit):
a_n, a_logprob_n = .agent_n.choose_action(obs_n, evaluate=evaluate)
s = np.array(obs_n).flatten()
v_n = .agent_n.get_value(s)
obs_next_n, r_n, done_n, _ = .env.step(a_n)
episode_reward += r_n[]
evaluate:
.args.use_reward_norm:
r_n = .reward_norm(r_n)
.args.use_reward_scaling:
r_n = .reward_scaling(r_n)
.replay_buffer.store_transition(episode_step, obs_n, s, v_n, a_n, a_logprob_n, r_n, done_n)
obs_n = obs_next_n
(done_n):
evaluate:
s = np.array(obs_n).flatten()
v_n = .agent_n.get_value(s)
.replay_buffer.store_last_value(episode_step + , v_n)
episode_reward, episode_step +
__name__ == :
parser = argparse.ArgumentParser()
parser.add_argument(, =, default=(), =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
parser.add_argument(, =, default=, =)
args = parser.parse_args()
runner = Runner_MAPPO_MPE(args, env_name=, number=, seed=)
runner.run()


