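Proximal Policy Optimization (PPO) Explained
Proximal Policy Optimization (PPO) is a reinforcement learning algorithm designed to balance performance gains, training stability, and efficiency on complex tasks. Compared with vanilla policy gradient methods, PPO limits how far each update can move the policy, which guards against the collapse that an overly large parameter update can cause.
1. Core Idea and Background
PPO was proposed by OpenAI in 2017. Its main goal is to simplify training and avoid the computational complexity of TRPO (Trust Region Policy Optimization). In reinforcement learning, optimizing the policy directly often makes training unstable. PPO addresses this with a probability ratio and a clipping mechanism, so that each training step stays close to the current policy while still making efficient use of the sampled data.
1.1 Probability Ratio
PPO uses the probability ratio $r_t(\theta)$ to measure how much the new policy differs from the old one: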
$$r_t(\theta) = \frac{\pi_\theta(a_t | s_t)}{\pi_{\theta_{\text{old}}}(a_t | s_t)}$$
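Here $\pi_{\theta_{\text{old}}}$ is the old policy, which collected the data, and $\pi_\theta$ is the new policy being optimized. The ratio expresses how much more or less likely the new policy is to choose action $a_t$ in the same state $s_t$; $r_t(\theta) = 1$ means the probability is unchanged.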
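As a small illustration of the definition above, the ratio is usually computed from log-probabilities, since $r_t(\theta) = \exp(\log \pi_\theta - \log \pi_{\theta_{\text{old}}})$. The sketch below uses plain Python; the function name `probability_ratio` and the two probabilities are made up for the example.

```python
import math


def probability_ratio(new_logprob: float, old_logprob: float) -> float:
    """Return pi_theta(a|s) / pi_theta_old(a|s), computed from log-probabilities."""
    return math.exp(new_logprob - old_logprob)


# Hypothetical probabilities of the same action under the old and the new policy.
old_prob, new_prob = 0.25, 0.30
ratio = probability_ratio(math.log(new_prob), math.log(old_prob))
print(round(ratio, 4))  # 1.2
```

A ratio of 1.2 means the new policy is 20% more likely than the old one to take this action in this state.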
1.2 Advantage Function
To judge how good an action is relative to the alternatives in the same state, PPO uses the advantage function $A_t$:
$$A_t = Q(s_t, a_t) - V(s_t)$$
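In practice, $A_t$ is often approximated with generalized advantage estimation (GAE). The advantage steers the policy: a positive value makes the action more likely, and a negative value makes it less likely.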
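For reference, GAE builds the advantage from TD residuals $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ as $A_t = \sum_{l \ge 0} (\gamma\lambda)^l \delta_{t+l}$. The following is a minimal sketch in plain Python for a single trajectory with no early termination; the function name `gae_advantages` and the defaults `gamma=0.99` and `lam=0.95` are assumptions for illustration, not values taken from this article.

```python
def gae_advantages(rewards, values, last_value, gamma=0.99, lam=0.95):
    """Generalized advantage estimates for one trajectory.

    rewards[t] and values[t] belong to step t; last_value is the value of the
    state after the final step, used for bootstrapping (0.0 if it is terminal).
    """
    advantages = [0.0] * len(rewards)
    next_value, running = last_value, 0.0
    for t in reversed(range(len(rewards))):
        delta = rewards[t] + gamma * next_value - values[t]  # TD residual
        running = delta + gamma * lam * running              # discounted sum of residuals
        advantages[t] = running
        next_value = values[t]
    return advantages


# With gamma = lam = 1 the estimate reduces to (return-to-go - value).
print(gae_advantages([1, 1, 1], [0.5, 0.5, 0.5], 0.0, gamma=1.0, lam=1.0))
# [2.5, 1.5, 0.5]
```

Setting `lam=0` gives the one-step TD residual (low variance, more bias), while `lam=1` gives the Monte Carlo estimate (high variance, less bias).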
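2. Optimization Objective and Loss Functions
PPO aims to keep improving the policy while preventing it from changing too much in a single update. It does so by combining the following terms:
2.1 Clipped Policy Loss
To prevent overly large policy updates, PPO clips the probability ratio to the interval $[1-\epsilon, 1+\epsilon]$: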
$$L^{CLIP}(\theta) = \mathbb{E}_t \left[ \min \left( r_t(\theta) A_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon) A_t \right) \right]$$
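This mechanism sets a 'safety boundary' for policy updates. Because the minimum of the two terms is taken, the objective gains nothing from pushing the ratio outside the interval in the direction the advantage favors, so the policy can still improve but aggressive updates that could collapse performance are avoided.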
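The effect of the clipping is easiest to see on single samples. The sketch below evaluates the per-sample term inside the expectation for four hypothetical (ratio, advantage) pairs; the function name `clipped_surrogate` is made up here, and `eps=0.2` is an assumed, commonly used value.

```python
def clipped_surrogate(ratio: float, advantage: float, eps: float = 0.2) -> float:
    """Per-sample PPO term: min(r * A, clip(r, 1 - eps, 1 + eps) * A)."""
    clipped_ratio = max(1.0 - eps, min(ratio, 1.0 + eps))
    return min(ratio * advantage, clipped_ratio * advantage)


# (ratio, advantage) pairs covering both signs of the advantage.
for ratio, adv in [(1.5, 1.0), (0.5, 1.0), (1.5, -1.0), (0.5, -1.0)]:
    print(ratio, adv, clipped_surrogate(ratio, adv))
# 1.5 1.0 1.2    -> gain capped at (1 + eps) * A
# 0.5 1.0 0.5    -> not clipped: the unclipped term is already the smaller one
# 1.5 -1.0 -1.5  -> not clipped: the penalty for a bad action is kept in full
# 0.5 -1.0 -0.8  -> capped at (1 - eps) * A
```

The clipped rows are exactly the cases where the policy has already moved in the direction the advantage rewards, so the term becomes constant in the ratio and contributes no further gradient.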
2.2 Value Function Loss
The critic network estimates the state value $V(s_t)$ and is updated by minimizing the mean squared error:
$$L^{VF}(\theta) = \mathbb{E}_t \left[ \left( V(s_t; \theta) - R_t \right)^2 \right]$$
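Here $R_t$ is the discounted cumulative return from step $t$. Minimizing this error makes the critic's estimate of each state's value more accurate.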
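The target $R_t$ can be computed by walking the collected rewards backwards and resetting the running sum at every episode boundary. This is a minimal plain-Python sketch; the function name `discounted_returns` and the toy inputs are assumptions for illustration.

```python
def discounted_returns(rewards, dones, gamma=0.99):
    """Discounted return R_t for every step of a rollout.

    dones[t] is True when step t ended an episode, so rewards from a later
    episode never leak into the returns of an earlier one.
    """
    returns, running = [], 0.0
    for reward, done in zip(reversed(rewards), reversed(dones)):
        if done:
            running = 0.0  # start a fresh sum at the end of an episode
        running = reward + gamma * running
        returns.append(running)
    returns.reverse()
    return returns


# Two episodes in one buffer: steps 0-2 form the first, step 3 starts the second.
print(discounted_returns([1, 1, 1, 1], [False, False, True, False], gamma=0.5))
# [1.75, 1.5, 1.0, 1.0]
```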
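2.3 Entropy Regularization
To encourage exploration and keep the policy from converging prematurely to a local optimum, an entropy regularization term is added: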
$$L^{ENT}(\theta) = \mathbb{E}_t \left[ H(\pi_\theta(\cdot | s_t)) \right]$$
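For a discrete action space, the entropy is $H(\pi) = -\sum_a \pi(a|s) \log \pi(a|s)$. The sketch below, with the made-up helper name `categorical_entropy`, shows that a uniform policy has high entropy and a nearly deterministic one has low entropy, which is why rewarding entropy favors exploration.

```python
import math


def categorical_entropy(probs):
    """Entropy (in nats) of a discrete distribution; zero-probability terms contribute 0."""
    return -sum(p * math.log(p) for p in probs if p > 0.0)


uniform = categorical_entropy([0.5, 0.5])    # maximal for two actions: ln 2
peaked = categorical_entropy([0.99, 0.01])   # close to deterministic
print(round(uniform, 4))  # 0.6931
print(peaked < uniform)   # True
```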
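2.4 Total Loss Function
Combining the three terms gives the overall PPO objective. It is written as a quantity to maximize; an implementation based on gradient descent minimizes its negative: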
$$L(\theta) = \mathbb{E}_t \left[ L^{CLIP}(\theta) - c_1 L^{VF}(\theta) + c_2 L^{ENT}(\theta) \right]$$
Here $c_1$ and $c_2$ are weighting coefficients that balance policy optimization, value function fitting, and exploration.
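The sign convention is a common source of bugs, so here is a small sketch of how the three terms would be combined into a scalar that an optimizer minimizes. The function name `ppo_loss`, the coefficient defaults `c1=0.5` and `c2=0.01`, and the input numbers are all assumptions for illustration.

```python
def ppo_loss(clip_objective, value_loss, entropy, c1=0.5, c2=0.01):
    """Loss to minimize: the negative of L_CLIP - c1 * L_VF + c2 * L_ENT."""
    return -(clip_objective - c1 * value_loss + c2 * entropy)


# Hypothetical batch averages of the three terms.
loss = ppo_loss(clip_objective=0.1, value_loss=0.4, entropy=0.6)
print(round(loss, 3))  # 0.094
```

After the negation, the value loss enters with a plus sign (it is minimized) and the entropy with a minus sign (it is maximized).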
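3. PyTorch Implementation
The following is a complete PyTorch implementation of PPO, including the actor-critic network, experience storage, and the training loop. Each module is kept short so that its role is easy to follow.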
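import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical

# Use the GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE: the numeric literals, the environment name, and the log message did
# not survive in the source text. The values below are assumed placeholders.
HIDDEN_DIM = 64       # assumed width of the shared hidden layer
VALUE_COEF = 0.5      # assumed c1, weight of the value loss
ENTROPY_COEF = 0.01   # assumed c2, weight of the entropy bonus
NORM_EPS = 1e-5       # assumed constant that avoids division by zero


class ActorCritic(nn.Module):
    """Actor and critic heads on top of a shared feature layer."""

    def __init__(self, state_dim, action_dim):
        super(ActorCritic, self).__init__()
        self.shared_layer = nn.Sequential(
            nn.Linear(state_dim, HIDDEN_DIM),
            nn.ReLU()
        )
        # Actor head: probabilities over the discrete actions.
        self.actor = nn.Sequential(
            nn.Linear(HIDDEN_DIM, action_dim),
            nn.Softmax(dim=-1)
        )
        # Critic head: scalar state value V(s).
        self.critic = nn.Linear(HIDDEN_DIM, 1)

    def forward(self, state):
        shared = self.shared_layer(state)
        action_probs = self.actor(shared)
        state_value = self.critic(shared)
        return action_probs, state_value


class Memory:
    """Buffer for the on-policy data collected since the last update."""

    def __init__(self):
        self.states = []
        self.actions = []
        self.logprobs = []
        self.rewards = []
        self.is_terminals = []

    def clear(self):
        self.states = []
        self.actions = []
        self.logprobs = []
        self.rewards = []
        self.is_terminals = []


class PPO:
    def __init__(self, state_dim, action_dim, lr, gamma, eps_clip, K_epochs):
        self.policy = ActorCritic(state_dim, action_dim).to(device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        # Copy of the policy that collects data (pi_theta_old).
        self.policy_old = ActorCritic(state_dim, action_dim).to(device)
        self.policy_old.load_state_dict(self.policy.state_dict())
        self.MseLoss = nn.MSELoss()
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs

    def select_action(self, state, memory):
        state = torch.as_tensor(np.asarray(state), dtype=torch.float32).to(device)
        # Sampling needs no gradients; the old policy only collects data.
        with torch.no_grad():
            action_probs, _ = self.policy_old(state)
        dist = Categorical(action_probs)
        action = dist.sample()
        memory.states.append(state)
        memory.actions.append(action)
        memory.logprobs.append(dist.log_prob(action))
        return action.item()

    def update(self, memory):
        old_states = torch.stack(memory.states).to(device).detach()
        old_actions = torch.stack(memory.actions).to(device).detach()
        old_logprobs = torch.stack(memory.logprobs).to(device).detach()

        # Discounted returns, computed backwards and reset at episode ends.
        rewards = []
        discounted_reward = 0
        for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
            if is_terminal:
                discounted_reward = 0
            discounted_reward = reward + (self.gamma * discounted_reward)
            rewards.insert(0, discounted_reward)

        # Normalize the returns to stabilize training.
        rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
        rewards = (rewards - rewards.mean()) / (rewards.std() + NORM_EPS)

        # Several epochs of updates on the same batch of data.
        for _ in range(self.K_epochs):
            action_probs, state_values = self.policy(old_states)
            # squeeze(-1) keeps the batch dimension even for a batch of one.
            state_values = state_values.squeeze(-1)
            dist = Categorical(action_probs)
            new_logprobs = dist.log_prob(old_actions)
            entropy = dist.entropy()

            # r_t(theta) = exp(log pi_theta - log pi_theta_old)
            ratios = torch.exp(new_logprobs - old_logprobs)
            advantages = rewards - state_values.detach()

            # Clipped surrogate objective, negated because the optimizer minimizes.
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            loss_actor = -torch.min(surr1, surr2).mean()
            loss_critic = self.MseLoss(state_values, rewards)
            loss = loss_actor + VALUE_COEF * loss_critic - ENTROPY_COEF * entropy.mean()

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        # Sync the data-collecting policy with the updated weights.
        self.policy_old.load_state_dict(self.policy.state_dict())


if __name__ == "__main__":
    # Assumed environment: CartPole-v1, a common discrete-action benchmark.
    # This loop uses the classic Gym API (gym < 0.26): reset() returns the
    # observation and step() returns a 4-tuple. Newer gym and gymnasium
    # releases return (obs, info) and a 5-tuple instead.
    env = gym.make("CartPole-v1")
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n

    # Assumed hyperparameters; the original values were lost.
    ppo = PPO(state_dim, action_dim, lr=2e-3, gamma=0.99, eps_clip=0.2, K_epochs=4)
    memory = Memory()
    max_episodes = 1000   # assumed
    max_timesteps = 300   # assumed

    for episode in range(1, max_episodes + 1):
        state = env.reset()
        total_reward = 0
        for t in range(max_timesteps):
            action = ppo.select_action(state, memory)
            state, reward, done, _ = env.step(action)
            memory.rewards.append(reward)
            memory.is_terminals.append(done)
            total_reward += reward
            if done:
                break
        # One PPO update per episode, then discard the on-policy data.
        ppo.update(memory)
        memory.clear()
        # Assumed log format; the original message was lost.
        print(f"Episode {episode}\tTotal reward: {total_reward}")

    env.close()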


