跳到主要内容PPO算法在四足机器人上的实现:rsl_rl代码解读 | 极客日志PythonAI算法
PPO算法在四足机器人上的实现:rsl_rl代码解读
拆解rsl_rl仓库中PPO算法的Python实现,覆盖目录结构、初始化、采样、GAE计算和策略更新。重点介绍概率比率裁剪、价值损失裁剪、自适应学习率控制及训练循环,揭示PPO在四足机器人上的实际代码细节。
最近在看宇树机器人的强化学习控制,顺便把 rsl_rl 仓库里的 PPO 实现走了一遍。这篇文章就是走读过程的记录,主要梳理类结构、关键函数和训练循环,不涉及环境配置的细节(官方文档已经写得很清楚了)。
仓库地址:https://github.com/unitreerobotics/unitree_rl_gym.git ,官方教程:https://github.com/unitreerobotics/unitree_rl_gym/blob/main/doc/setup_zh.md
快速拿到代码:
git clone https://github.com/leggedrobotics/rsl_rl.git
cd rsl_rl
git checkout v1.0.2
项目整体结构用 tree 看一下:
rsl_rl/
├── algorithms/
├── env/
├── modules/
├── runners/
├── storage/
└── utils/
algorithms/:放 RL 算法,目前只有 ppo.py,后续可以扩展 DDPG 之类。
env/:向量化环境封装,对接 PyBullet/Mujoco。
modules/:Actor-Critic 网络定义,包含普通版和 RNN 版。
runners/:训练调度器 on_policy_runner.py,负责采数据、调 update、保存模型。
storage/:轨迹存储 rollout_storage.py,管理 mini-batch 生成和归一化。
utils/:日志、模型存取等工具。
重点在 algorithms/ppo.py。这个文件实现了完整的 PPO,代码大约 150 行(含版权头),下面分段拆解。
初始化与超参数
class PPO:
actor_critic: ActorCritic
def __init__(self, actor_critic, num_learning_epochs=1, num_mini_batches=1,
clip_param=0.2, gamma=0.998, lam=0.95, value_loss_coef=1.0,
entropy_coef=0.0, learning_rate=1e-3, max_grad_norm=1.0,
use_clipped_value_loss=True, schedule="fixed",
desired_kl=, device=):
.actor_critic = actor_critic
.actor_critic.to(.device)
.storage =
.optimizer = optim.Adam(.actor_critic.parameters(), lr=learning_rate)
.transition = RolloutStorage.Transition()
.clip_param = clip_param
.num_learning_epochs = num_learning_epochs
.num_mini_batches = num_mini_batches
.value_loss_coef = value_loss_coef
.entropy_coef = entropy_coef
.gamma = gamma
.lam = lam
.max_grad_norm = max_grad_norm
.use_clipped_value_loss = use_clipped_value_loss
0.01
'cpu'
self
self
self
self
None
self
self
self
self
self
self
self
self
self
self
self
self
传入的 actor_critic 是 Actor-Critic 双输出网络(定义在 modules/actor_critic.py)。Adam 优化器固定绑在参数上。有意思的是 self.transition,这是一个单步数据容器,每步采样完就填进去,之后整个塞进 storage。
clip_param=0.2:PPO 的裁剪范围 ε,控制策略更新幅度。
gamma=0.998:折扣因子,0.998 在连续控制里比较常见。
lam=0.95:GAE 的 λ,平衡偏差与方差。
value_loss_coef=1.0:价值损失的权重,调大可以让 Critic 学得更猛一点。
entropy_coef=0.0:熵正则系数,默认关掉了,需要探索时可以给个小值。
learning_rate=1e-3:初始学习率,配合自适应调度会动态变化。
max_grad_norm=1.0:梯度裁剪阈值,防爆炸。
use_clipped_value_loss=True:对 value loss 也做裁剪,防 Critic 更新过大。
schedule="fixed" 和 desired_kl=0.01:如果改成 adaptive,会根据新旧策略的 KL 散度自动调整学习率,目标 KL 约 0.01。
采样流程:act() 和 process_env_step()
act() 函数负责与环境交互,计算动作并记录必要数据:
def act(self, obs, critic_obs):
if self.actor_critic.is_recurrent:
self.transition.hidden_states = self.actor_critic.get_hidden_states()
self.transition.actions = self.actor_critic.act(obs).detach()
self.transition.values = self.actor_critic.evaluate(critic_obs).detach()
self.transition.actions_log_prob = self.actor_critic.get_actions_log_prob(
self.transition.actions).detach()
self.transition.action_mean = self.actor_critic.action_mean.detach()
self.transition.action_sigma = self.actor_critic.action_std.detach()
self.transition.observations = obs
self.transition.critic_observations = critic_obs
return self.transition.actions
obs 给 Actor,critic_obs 给 Critic(两者可能不同,比如 Critic 可以多看一些信息)。detach() 切断梯度,因为采样阶段不需要反向传播。动作的均值和对数概率要留下来,后面算比率和 KL 时会用。最后记得把原始观测也存在 transition 里,因为环境 step 之后状态就变了。
拿到动作后,环境给出奖励和 done 信号,交给 process_env_step():
def process_env_step(self, rewards, dones, infos):
self.transition.rewards = rewards.clone()
self.transition.dones = dones
if 'time_outs' in infos:
self.transition.rewards += self.gamma * torch.squeeze(
self.transition.values * infos['time_outs'].unsqueeze(1).to(self.device), 1)
self.storage.add_transitions(self.transition)
self.transition.clear()
self.actor_critic.reset(dones)
这里有个常见坑:因步数上限而被截断的 episode,不能简单把后续价值当零看,所以做了奖励修正 rewards + γ*V(s)。之后整个 transition 推入 storage,清空临时数据;如果策略网络是 RNN,还要按照 done 重置隐藏状态。
采样完一轮之后,调用 compute_returns() 计算回报和 GAE:
def compute_returns(self, last_critic_obs):
last_values = self.actor_critic.evaluate(last_critic_obs).detach()
self.storage.compute_returns(last_values, self.gamma, self.lam)
last_values 是最后状态的 V(s),用来做 bootstrapping。GAE 的计算在 storage.compute_returns() 里完成,大致是:
δ_t = r_t + γ V(s_{t+1}) - V(s_t)
A_t^{GAE} = δ_t + (γλ) δ_{t+1} + (γλ)^2 δ_{t+2} + ...
同时也会算好 target return R_t = r_t + γ R_{t+1},供 value loss 使用。
核心更新:update()
def update(self):
mean_value_loss = 0
mean_surrogate_loss = 0
if self.actor_critic.is_recurrent:
generator = self.storage.reccurent_mini_batch_generator(
self.num_mini_batches, self.num_learning_epochs)
else:
generator = self.storage.mini_batch_generator(
self.num_mini_batches, self.num_learning_epochs)
for ... in generator:
采样数据按 mini-batch 喂给网络,重复 num_learning_epochs 轮。每个 batch 里会做下面这些事。
self.actor_critic.act(obs_batch, masks=masks_batch, hidden_states=hid_states_batch[0])
actions_log_prob_batch = self.actor_critic.get_actions_log_prob(actions_batch)
value_batch = self.actor_critic.evaluate(critic_obs_batch, masks=masks_batch,
hidden_states=hid_states_batch[1])
mu_batch = self.actor_critic.action_mean
sigma_batch = self.actor_critic.action_std
entropy_batch = self.actor_critic.entropy
这里不是直接用之前存的动作分布,而是根据 batch 里的观测重新计算,得到当前的 μ、σ 和新策略下动作的对数概率。同时 Critic 输出当前价值估计。
如果 schedule='adaptive' 且 desired_kl 不为 None,会计算新旧高斯分布的 KL 散度:
KL(π_old || π_new) = log(σ/σ_old) + (σ_old² + (μ_old - μ)²)/(2σ²) - 1/2
kl = torch.sum(
torch.log(sigma_batch / old_sigma_batch + 1.e-5) +
(torch.square(old_sigma_batch) + torch.square(old_mu_batch - mu_batch))
/ (2.0 * torch.square(sigma_batch)) - 0.5,
axis=-1)
kl_mean = torch.mean(kl)
| KL 均值 | 处理 |
|---|
| > desired_kl × 2 | 更新太猛,学习率除以 1.5 |
| < desired_kl / 2 且>0 | 更新太慢,学习率乘以 1.5 |
调整后的学习率会覆写到优化器的所有参数组。这个机制实现的是 Trust Region 的近似,比无裁剪稳定不少。
ratio = torch.exp(actions_log_prob_batch - old_actions_log_prob_batch)
即 r(θ) = π_θ(a|s) / π_θ_old(a|s)。
surrogate = -advantages_batch * ratio
surrogate_clipped = -advantages_batch * torch.clamp(
ratio, 1.0 - self.clip_param, 1.0 + self.clip_param)
surrogate_loss = torch.max(surrogate, surrogate_clipped).mean()
形式化就是 L^{CLIP} = E[ min(r·A, clip(r, 1-ε, 1+ε)·A) ]。注意前面加了负号,因为 PyTorch 优化器默认最小化。如果优势函数 A 是正的,我们不希望比率变得过大(策略变得太 greedy)。裁剪限制了概率比的有效范围,梯度就不会再推着 ratio 走远。
价值损失也采用了类似的裁剪(如果 use_clipped_value_loss=True):
value_clipped = target_values_batch + (value_batch - target_values_batch).clamp(
-self.clip_param, self.clip_param)
value_losses = (value_batch - returns_batch).pow(2)
value_losses_clipped = (value_clipped - returns_batch).pow(2)
value_loss = torch.max(value_losses, value_losses_clipped).mean()
target_values_batch 是采样时存下的旧价值(V_old),value_batch 是当前价值。如果当前价值偏离旧价值超过 ε,裁剪会限制 loss,防止 Critic 一步迈太大。不用裁剪时,就是普通的 MSE:
value_loss = (returns_batch - value_batch).pow(2).mean()
loss = surrogate_loss + self.value_loss_coef * value_loss \
- self.entropy_coef * entropy_batch.mean()
熵项前面是减号,因为熵越大(策略越随机)意味着更多的探索,我们希望鼓励一点熵,所以最大化熵,等价于最小化负熵。
self.optimizer.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm)
self.optimizer.step()
一个 epoch 结束后对 loss 取平均,最后清空 storage,返回平均 value loss 和 surrogate loss,方便日志监控。
总结训练循环
- 用当前策略与环境交互,采样 rollout 数据(含动作、奖励、观测、对数概率、均值方差)。
- 用 GAE 计算优势函数和 target return。
- 将 rollout 分为 mini-batch,重复若干个 epoch:
- 重新计算当前策略分布和对数概率。
- (可选)根据 KL 散度自适应调整学习率。
- 计算裁剪后的策略损失和价值损失,加上熵正则。
- 反向传播、梯度裁剪、参数更新。
- 清空 storage,进入下一轮采样。
L = E[ min(r·A, clip(r, 1-ε, 1+ε)·A) ] + c₁ (V - R)² - c₂ H(π)
整个实现很紧凑,对理解 PPO 的工程细节帮助挺大。后面打算再读一下 modules/actor_critic.py 里网络的具体构造,看是怎么把观测映射到动作分布的。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online