【宇树机器人强化学习】(一):PPO算法的python实现与解析

【宇树机器人强化学习】(一):PPO算法的python实现与解析

前言

Unitree RL GYM 是一个开源的 基于 Unitree 机器人强化学习(Reinforcement Learning, RL)控制示例项目,用于训练、测试和部署四足机器人控制策略。该仓库支持多种 Unitree 机器人型号,包括 Go2、H1、H1_2 和 G1仓库地址

请添加图片描述

0 仓库安装

  • 关于仓库的安装和环境配置官方的文档已经非常清楚了,这里就不在赘述。
  • 通过下述指令可以快速获取仓库代码。

官方教程

请添加图片描述
git clone https://github.com/leggedrobotics/rsl_rl.git cd rsl_rl git checkout v1.0.2 

0-1 PPO公式回顾
  • 姑且这里回顾一下PPO的核心公式PPO 的目标函数是: L c l i p ( θ ) = E [ min ⁡ ( r ( θ ) A , c l i p ( r ( θ ) , 1 − ϵ , 1 + ϵ ) A ) ] L^{clip}(\theta)=\mathbb{E}[\min(r(\theta)A,\mathrm{clip}(r(\theta),1-\epsilon,1+\epsilon)A)] Lclip(θ)=E[min(r(θ)A,clip(r(θ),1−ϵ,1+ϵ)A)]其中:
    • r ( θ ) r(\theta) r(θ):新旧策略概率比
    • A A A:Advantage(优势函数)
    • ϵ \epsilon ϵ:裁剪范围,一般取 0.1~0.2

  • 概率比率(Probability Ratio) r ( θ ) = π θ ( a ∣ s ) π θ o l d ( a ∣ s ) r(\theta) = \frac{\pi_\theta(a|s)}{\pi_{\theta_{old}}(a|s)} r(θ)=πθold​​(a∣s)πθ​(a∣s)​它表示:​
    • 新策略和旧策略在某个动作上的概率比例。
    • 如果r ≈ 1,说明新旧策略 差不多
    • 如果r >> 1 或 r << 1,说明策略 变化太大
  • 通过上述公式,PPO 会限制 r ( θ ) r(\theta) r(θ)的取值范围 [ 1 − ϵ , 1 + ϵ ] [1-\epsilon, 1+\epsilon] [1−ϵ,1+ϵ]如果超过这个范围,梯度就会被裁剪,不再继续增大。

  • GAE(Generalized Advantage Estimation):
    • 它通过引入一个参数 λ(lambda),将 多步 TD 误差进行加权平均,从而得到更加稳定的 Advantage 估计。
  • 公式: A t G A E = ∑ l = 0 ∞ ( γ λ ) l δ t + l A_t^{GAE} = \sum_{l=0}^{\infty} (\gamma \lambda)^l \delta_{t+l} AtGAE​=l=0∑∞​(γλ)lδt+l​其中: δ t = r t + γ V ( s t + 1 ) − V ( s t ) \delta_t = r_t + \gamma V(s_{t+1}) - V(s_t) δt​=rt​+γV(st+1​)−V(st​)
  • 表示 TD 误差(Temporal Difference Error)
  • 参数 λ \lambda λ 控制了 偏差和方差之间的平衡
    • λ = 0 只使用 一步 TD 误差,方差小,偏差较大
    • λ = 1 接近 Monte Carlo 回报.偏差小.方差较大

  • 策略熵的公式为: H ( π ) = − ∑ π ( a ∣ s ) log ⁡ π ( a ∣ s ) H(\pi) = -\sum \pi(a|s)\log \pi(a|s) H(π)=−∑π(a∣s)logπ(a∣s)
    • 策略越随机,熵越大

1 仓库一览

  • 拉取完仓库以后,我们可以简单的使用tree指令看一下整个项目的结构
  • rsl_rl目录结构
rsl_rl/ ├── algorithms/ ├── env/ ├── modules/ ├── runners/ ├── storage/ └── utils/ 

1-1 algorithms/目录
algorithms/ ├── __init__.py ├── ppo.py 
  • 功能:存放 RL 算法实现,例如 ppo.py 实现了 PPO(Proximal Policy Optimization) 算法。
  • 特点
    • 可以扩展更多算法(如 DDPG、TD3、DPPO)。
    • 提供训练所需的核心算法逻辑(策略更新、损失函数计算等)。

1-2 env/目录
env/ ├── __init__.py └── vec_env.py 
  • 功能:封装环境接口。
    • vec_env.py 实现 Vectorized Environment,支持多环境并行训练。
  • 作用
    • 对接仿真环境(如 PyBullet / Mujoco)。
    • 提供标准接口给算法训练(step、reset、render 等)。

1-3 modules/目录
modules/ ├── actor_critic.py ├── actor_critic_recurrent.py 
  • 功能:定义策略网络结构。
    • actor_critic.py:普通 Actor-Critic 网络。
    • actor_critic_recurrent.py:RNN / LSTM 版本的 Actor-Critic 网络。
  • 作用
    • 提供策略和价值网络给 PPO 或其他算法调用。
    • 支持状态序列建模,适合处理时间相关的机器人动作控制。

1-4 runners/目录
runners/ └── on_policy_runner.py 
  • 功能:训练调度器。
    • on_policy_runner.py 负责 按策略采样数据并执行训练循环
  • 作用
    • 管理数据采样、训练步数、模型保存。
    • 将算法、环境、存储模块整合成完整的训练流程。

1-5 storage/目录
storage/ └── rollout_storage.py 
  • 功能:存储采样轨迹(rollouts)。
  • 作用
    • PPO 需要保存每一步的状态、动作、奖励等。
    • 提供 mini-batch 更新、归一化等功能。

1-6 utils/目录
utils/ └── utils.py 
  • 功能:工具函数。
    • 例如日志记录、模型保存/加载、张量操作等。
  • 作用
    • 提供训练和部署所需的通用工具函数,减轻主逻辑负担。

2 PPO算法的python实现

2-1 代码一览
  • 代码的路径在
algorithms/ ├── __init__.py ├── ppo.py 
  • 代码整体在这,别急我们一部分一部分进行分析
# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.# SPDX-License-Identifier: BSD-3-Clause# # Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions are met:## 1. Redistributions of source code must retain the above copyright notice, this# list of conditions and the following disclaimer.## 2. Redistributions in binary form must reproduce the above copyright notice,# this list of conditions and the following disclaimer in the documentation# and/or other materials provided with the distribution.## 3. Neither the name of the copyright holder nor the names of its# contributors may be used to endorse or promote products derived from# this software without specific prior written permission.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.## Copyright (c) 2021 ETH Zurich, Nikita Rudinimport torch import torch.nn as nn import torch.optim as optim from rsl_rl.modules import ActorCritic from rsl_rl.storage import RolloutStorage classPPO: actor_critic: ActorCritic def__init__(self, actor_critic, num_learning_epochs=1, num_mini_batches=1, clip_param=0.2, gamma=0.998, lam=0.95, value_loss_coef=1.0, entropy_coef=0.0, learning_rate=1e-3, max_grad_norm=1.0, use_clipped_value_loss=True, schedule="fixed", desired_kl=0.01, device='cpu',): self.device = device self.desired_kl = desired_kl self.schedule = schedule self.learning_rate = learning_rate # PPO components self.actor_critic = actor_critic self.actor_critic.to(self.device) self.storage =None# initialized later self.optimizer = optim.Adam(self.actor_critic.parameters(), lr=learning_rate) self.transition = RolloutStorage.Transition()# PPO parameters self.clip_param = clip_param self.num_learning_epochs = num_learning_epochs self.num_mini_batches = num_mini_batches self.value_loss_coef = value_loss_coef self.entropy_coef = entropy_coef self.gamma = gamma self.lam = lam self.max_grad_norm = max_grad_norm self.use_clipped_value_loss = use_clipped_value_loss definit_storage(self, num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape): self.storage = RolloutStorage(num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape, self.device)deftest_mode(self): self.actor_critic.test()deftrain_mode(self): self.actor_critic.train()defact(self, obs, critic_obs):if self.actor_critic.is_recurrent: self.transition.hidden_states = self.actor_critic.get_hidden_states()# Compute the actions and values self.transition.actions = self.actor_critic.act(obs).detach() self.transition.values = self.actor_critic.evaluate(critic_obs).detach() self.transition.actions_log_prob = self.actor_critic.get_actions_log_prob(self.transition.actions).detach() self.transition.action_mean = self.actor_critic.action_mean.detach() self.transition.action_sigma = self.actor_critic.action_std.detach()# need to record obs and critic_obs before env.step() self.transition.observations = obs self.transition.critic_observations = critic_obs return self.transition.actions defprocess_env_step(self, rewards, dones, infos): self.transition.rewards = rewards.clone() self.transition.dones = dones # Bootstrapping on time outsif'time_outs'in infos: self.transition.rewards += self.gamma * torch.squeeze(self.transition.values * infos['time_outs'].unsqueeze(1).to(self.device),1)# Record the transition self.storage.add_transitions(self.transition) self.transition.clear() self.actor_critic.reset(dones)defcompute_returns(self, last_critic_obs): last_values= self.actor_critic.evaluate(last_critic_obs).detach() self.storage.compute_returns(last_values, self.gamma, self.lam)defupdate(self): mean_value_loss =0 mean_surrogate_loss =0if self.actor_critic.is_recurrent: generator = self.storage.reccurent_mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)else: generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)for obs_batch, critic_obs_batch, actions_batch, target_values_batch, advantages_batch, returns_batch, old_actions_log_prob_batch, \ old_mu_batch, old_sigma_batch, hid_states_batch, masks_batch in generator: self.actor_critic.act(obs_batch, masks=masks_batch, hidden_states=hid_states_batch[0]) actions_log_prob_batch = self.actor_critic.get_actions_log_prob(actions_batch) value_batch = self.actor_critic.evaluate(critic_obs_batch, masks=masks_batch, hidden_states=hid_states_batch[1]) mu_batch = self.actor_critic.action_mean sigma_batch = self.actor_critic.action_std entropy_batch = self.actor_critic.entropy # KLif self.desired_kl !=Noneand self.schedule =='adaptive':with torch.inference_mode(): kl = torch.sum( torch.log(sigma_batch / old_sigma_batch +1.e-5)+(torch.square(old_sigma_batch)+ torch.square(old_mu_batch - mu_batch))/(2.0* torch.square(sigma_batch))-0.5, axis=-1) kl_mean = torch.mean(kl)if kl_mean > self.desired_kl *2.0: self.learning_rate =max(1e-5, self.learning_rate /1.5)elif kl_mean < self.desired_kl /2.0and kl_mean >0.0: self.learning_rate =min(1e-2, self.learning_rate *1.5)for param_group in self.optimizer.param_groups: param_group['lr']= self.learning_rate # Surrogate loss ratio = torch.exp(actions_log_prob_batch - torch.squeeze(old_actions_log_prob_batch)) surrogate =-torch.squeeze(advantages_batch)* ratio surrogate_clipped =-torch.squeeze(advantages_batch)* torch.clamp(ratio,1.0- self.clip_param,1.0+ self.clip_param) surrogate_loss = torch.max(surrogate, surrogate_clipped).mean()# Value function lossif self.use_clipped_value_loss: value_clipped = target_values_batch +(value_batch - target_values_batch).clamp(-self.clip_param, self.clip_param) value_losses =(value_batch - returns_batch).pow(2) value_losses_clipped =(value_clipped - returns_batch).pow(2) value_loss = torch.max(value_losses, value_losses_clipped).mean()else: value_loss =(returns_batch - value_batch).pow(2).mean() loss = surrogate_loss + self.value_loss_coef * value_loss - self.entropy_coef * entropy_batch.mean()# Gradient step self.optimizer.zero_grad() loss.backward() nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm) self.optimizer.step() mean_value_loss += value_loss.item() mean_surrogate_loss += surrogate_loss.item() num_updates = self.num_learning_epochs * self.num_mini_batches mean_value_loss /= num_updates mean_surrogate_loss /= num_updates self.storage.clear()return mean_value_loss, mean_surrogate_loss 

2-2 初始化函数
  • 我们来看看这个类初始化部分:
classPPO: actor_critic: ActorCritic def__init__(self, actor_critic, num_learning_epochs=1, num_mini_batches=1, clip_param=0.2, gamma=0.998, lam=0.95, value_loss_coef=1.0, entropy_coef=0.0, learning_rate=1e-3, max_grad_norm=1.0, use_clipped_value_loss=True, schedule="fixed", desired_kl=0.01, device='cpu',): self.device = device self.desired_kl = desired_kl self.schedule = schedule self.learning_rate = learning_rate # PPO components self.actor_critic = actor_critic self.actor_critic.to(self.device) self.storage =None# initialized later self.optimizer = optim.Adam(self.actor_critic.parameters(), lr=learning_rate) self.transition = RolloutStorage.Transition()# PPO parameters self.clip_param = clip_param self.num_learning_epochs = num_learning_epochs self.num_mini_batches = num_mini_batches self.value_loss_coef = value_loss_coef self.entropy_coef = entropy_coef self.gamma = gamma self.lam = lam self.max_grad_norm = max_grad_norm self.use_clipped_value_loss = use_clipped_value_loss 
  • 初始化传入了大量PPO的超参数:
    • actor_critic:这里传入的是PPO算法必须的Actor-Critic 网络(这个网络的定义在modules/actor_critic.py,这个我们后面几期会进行解析)
    • num_learning_epochs=1:每一批 rollout 数据 重复训练多少轮
    • num_mini_batches=1:把 rollout 数据分成多少 mini-batch(以提高样本利用率)
    • ==clip_param=0.2:这个是PPO的 ϵ \epsilon ϵ核心参数,用于对策略进行裁切
    • gamma=0.998:奖励折扣因子,用于控制控制 长期奖励权重
    • lam=0.95GAE的 λ 参数,用于在计算优势函数的时候降低方差
    • value_loss_coef=1.0:损失函数权重,越高越关注value网络
    • entropy_coef=0.0:策略熵, 鼓励策略保持一定随机性,用于设置额外探索奖励
    • learning_rate=1e-3:神经网络学习率
    • max_grad_norm=1.0:梯度裁剪,大于此值的梯度值会被裁切,防止梯度爆炸
    • use_clipped_value_loss=True:是否使用Value Clipping,防止 Critic更新过大
    • schedule="fixed":表示 训练过程中学习率保持固定,不根据 KL 或训练情况动态调整
    • desired_kl=0.01:目标 KL 散度,表示 期望新旧策略之间的 KL 距离大约为 0.01,用于在自适应学习率策略中控制策略更新幅度
    • `device=‘cpu’:运行设备
  • 同时还定义了一些变量
self.storage =None# initialized later self.optimizer = optim.Adam(self.actor_critic.parameters(), lr=learning_rate) self.transition = RolloutStorage.Transition()
  • self.storage:经验回放缓存(Rollout Buffer)占位符
  • self.optimizerAdam优化器 来更新 Actor-Critic 网络参数
  • self.transition临时数据结构(step buffer)

2-3 初始化经验回放缓存函数 init_storage()
definit_storage(self, num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape): self.storage = RolloutStorage( num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape, self.device )
  • 这个函数用于初始化经验回放缓存(Rollout Buffer)机制的数据缓存(Rollout Buffer)
  • 定义在storage/rollout_storage.py,我们之后也会解析

2-4 模式函数
deftest_mode(self): self.actor_critic.test()deftrain_mode(self): self.actor_critic.train()
  • 这两都是启用actor_critic的模式
  • 这个网络的定义在modules/actor_critic.py,我们之后也会解析

2-5 行动函数act()
  • 这个函数的作用:根据当前状态 ,计算动作并返回,同时并记录训练数据
defact(self, obs, critic_obs):if self.actor_critic.is_recurrent: self.transition.hidden_states = self.actor_critic.get_hidden_states()# Compute the actions and values self.transition.actions = self.actor_critic.act(obs).detach() self.transition.values = self.actor_critic.evaluate(critic_obs).detach() self.transition.actions_log_prob = self.actor_critic.get_actions_log_prob(self.transition.actions).detach() self.transition.action_mean = self.actor_critic.action_mean.detach() self.transition.action_sigma = self.actor_critic.action_std.detach()# need to record obs and critic_obs before env.step() self.transition.observations = obs self.transition.critic_observations = critic_obs return self.transition.actions 

  • 我们一步步看,首先我们来看函数的输入
defact(self, obs, critic_obs):
  • obs策略网络的输入
  • critic_obs价值网络输入

if self.actor_critic.is_recurrent: self.transition.hidden_states = self.actor_critic.get_hidden_states()
  • 这里判断是否需要使用 RNN / LSTM 网络,如果是,需要保存hidden_state,否则后面训练无法恢复序列状态。

self.transition.actions = self.actor_critic.act(obs).detach() self.transition.values = self.actor_critic.evaluate(critic_obs).detach()
  • Actor网络根据策略网络的输入来计算动作,.detach()表示不参与梯度计算,只是进行采样。
  • Critic网络 计算价值,.detach()表示不参与梯度计算,只是进行采样。

self.transition.actions_log_prob = self.actor_critic.get_actions_log_prob(self.transition.actions).detach()
  • 这里计算动作概率 l o g π θ ( a ∣ s ) log π_\theta(a|s) logπθ​(a∣s),用于后面计算概率比率(Probability Ratio)的时候使用

self.transition.action_mean = self.actor_critic.action_mean.detach() self.transition.action_sigma = self.actor_critic.action_std.detach()
  • 这里保存策略分布,用于 KL散度计算。其中动作通常来自 高斯分布: a   N ( μ , σ ) a ~ N(μ , σ) a N(μ,σ)

# need to record obs and critic_obs before env.step() self.transition.observations = obs self.transition.critic_observations = critic_obs return self.transition.actions 
  • 保存状态并返回动作,这里需要在env.step()之前保存,否则状态就改变了

2-6 处理环境反馈函数process_env_step()
  • 这个函数用于处理环境返回结果,并存储数据
defprocess_env_step(self, rewards, dones, infos): self.transition.rewards = rewards.clone() self.transition.dones = dones # Bootstrapping on time outsif'time_outs'in infos: self.transition.rewards += self.gamma * torch.squeeze(self.transition.values * infos['time_outs'].unsqueeze(1).to(self.device),1)# Record the transition self.storage.add_transitions(self.transition) self.transition.clear() self.actor_critic.reset(dones)

self.transition.rewards = rewards.clone() self.transition.dones = dones 
  • 保存奖励 r t r_t rt​,同时保存终止信号

# Bootstrapping on time outsif'time_outs'in infos: self.transition.rewards += self.gamma * torch.squeeze(self.transition.values * infos['time_outs'].unsqueeze(1).to(self.device),1)
  • 有些 episode 结束不是因为失败,而是达到最大步数,那就不能把未来价值 V ( s ) V(s) V(s)当成 0。
  • 这时候修正value的计算 r = r + γ V ( s ) r = r + γV(s) r=r+γV(s)

# Record the transition self.storage.add_transitions(self.transition) self.transition.clear() self.actor_critic.reset(dones)
  • 在经验池里头储存数据,每一步的数据包含 ( s , a , r , V , l o g p r o b ) (s,a,r,V,log_prob) (s,a,r,V,logp​rob)
  • 清空 transition并重置RNN

2-7 计算收获函数compute_returns
  • 计算 PPO 训练需要的奖励returns和 优势函数advantage
defcompute_returns(self, last_critic_obs): last_values= self.actor_critic.evaluate(last_critic_obs).detach() self.storage.compute_returns(last_values, self.gamma, self.lam)
  • 第一行会计算最后状态价值 V ( s T ) V(s_T) V(sT​)
  • 第二行就是GAE计算,将 多步 TD 误差进行加权平均,从而得到更加稳定的 Advantage 估计。
  • Return: R t = r t + γ R t + 1 R_t = r_t + γR_{t+1} Rt​=rt​+γRt+1​
  • 优势函数Advantage(GAE): δ t = r t + γ V ( s t + 1 ) − V ( s t ) δ_t = r_t + γV(s_{t+1}) - V(s_t) δt​=rt​+γV(st+1​)−V(st​) A t = δ t + γ λ δ t + 1 + ( γ λ ) 2 δ t + 2 + . . . A_t = δ_t + γλδ_{t+1} + (γλ)^2δ_{t+2}+... At​=δt​+γλδt+1​+(γλ)2δt+2​+...

3 核心函数update

3-1 完整实现
  • 前面的代码主要完成 数据采样与优势计算,而 PPO 的核心训练逻辑全部在 update() 函数中完成。 这个函数负责:
    • 重新计算策略概率
    • 计算 PPO loss
    • 反向传播更新网络
defupdate(self): mean_value_loss =0 mean_surrogate_loss =0if self.actor_critic.is_recurrent: generator = self.storage.reccurent_mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)else: generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)for obs_batch, critic_obs_batch, actions_batch, target_values_batch, advantages_batch, returns_batch, old_actions_log_prob_batch, \ old_mu_batch, old_sigma_batch, hid_states_batch, masks_batch in generator: self.actor_critic.act(obs_batch, masks=masks_batch, hidden_states=hid_states_batch[0]) actions_log_prob_batch = self.actor_critic.get_actions_log_prob(actions_batch) value_batch = self.actor_critic.evaluate(critic_obs_batch, masks=masks_batch, hidden_states=hid_states_batch[1]) mu_batch = self.actor_critic.action_mean sigma_batch = self.actor_critic.action_std entropy_batch = self.actor_critic.entropy # KLif self.desired_kl !=Noneand self.schedule =='adaptive':with torch.inference_mode(): kl = torch.sum( torch.log(sigma_batch / old_sigma_batch +1.e-5)+(torch.square(old_sigma_batch)+ torch.square(old_mu_batch - mu_batch))/(2.0* torch.square(sigma_batch))-0.5, axis=-1) kl_mean = torch.mean(kl)if kl_mean > self.desired_kl *2.0: self.learning_rate =max(1e-5, self.learning_rate /1.5)elif kl_mean < self.desired_kl /2.0and kl_mean >0.0: self.learning_rate =min(1e-2, self.learning_rate *1.5)for param_group in self.optimizer.param_groups: param_group['lr']= self.learning_rate # Surrogate loss ratio = torch.exp(actions_log_prob_batch - torch.squeeze(old_actions_log_prob_batch)) surrogate =-torch.squeeze(advantages_batch)* ratio surrogate_clipped =-torch.squeeze(advantages_batch)* torch.clamp(ratio,1.0- self.clip_param,1.0+ self.clip_param) surrogate_loss = torch.max(surrogate, surrogate_clipped).mean()# Value function lossif self.use_clipped_value_loss: value_clipped = target_values_batch +(value_batch - target_values_batch).clamp(-self.clip_param, self.clip_param) value_losses =(value_batch - returns_batch).pow(2) value_losses_clipped =(value_clipped - returns_batch).pow(2) value_loss = torch.max(value_losses, value_losses_clipped).mean()else: value_loss =(returns_batch - value_batch).pow(2).mean() loss = surrogate_loss + self.value_loss_coef * value_loss - self.entropy_coef * entropy_batch.mean()# Gradient step self.optimizer.zero_grad() loss.backward() nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm) self.optimizer.step() mean_value_loss += value_loss.item() mean_surrogate_loss += surrogate_loss.item() num_updates = self.num_learning_epochs * self.num_mini_batches mean_value_loss /= num_updates mean_surrogate_loss /= num_updates self.storage.clear()return mean_value_loss, mean_surrogate_loss 

3-2 参数定义
  • 我们一步步来看:
mean_value_loss =0 mean_surrogate_loss =0if self.actor_critic.is_recurrent: generator = self.storage.reccurent_mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)else: generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)
  • mean_value_lossmean_surrogate_loss:统计整个训练过程中的 平均 loss,用于日志打印。
  • 然后我们根据是否使用 RNN / LSTM 网络来构造 mini-batch 迭代器

3-3 每个batch
for obs_batch, critic_obs_batch, actions_batch, target_values_batch, advantages_batch, returns_batch, old_actions_log_prob_batch, \ old_mu_batch, old_sigma_batch, hid_states_batch, masks_batch in generator:
  • 然后我们在每个 batch 取出这些变量:
    • obs_batch:Actor网络输入
    • critic_obs_batch:Critic网络输入
    • actions_batch:采样动作
    • target_values_batch:旧价值函数V(s)
    • advantages_batch:GAE优势函数
    • critic_obs_batch:Critic输入
    • returns_batch:目标价值
    • old_actions_log_prob_batch:旧策略概率 l o g π θ ( a ∣ s ) log π_\theta(a|s) logπθ​(a∣s)
    • old_mu_batch:旧策略均值 μ μ μ
    • old_sigma_batch:旧策略方差 σ σ σ

self.actor_critic.act(obs_batch, masks=masks_batch, hidden_states=hid_states_batch[0])
  • 首先调用act函数进行Actor前向计算,重新计算 当前策略的动作分布 π θ ( a ∣ s ) = N ( μ θ ( s ) , σ θ ( s ) ) \pi_\theta(a|s) = \mathcal{N}(\mu_\theta(s), \sigma_\theta(s)) πθ​(a∣s)=N(μθ​(s),σθ​(s))

actions_log_prob_batch = self.actor_critic.get_actions_log_prob(actions_batch)
  • 然后获取当前动作概率log prob log ⁡ π θ ( a t ∣ s t ) \log \pi_\theta(a_t|s_t) logπθ​(at​∣st​),用于一会计算概率比率

value_batch = self.actor_critic.evaluate(critic_obs_batch)
  • Critic计算价值函数 value

mu_batch = self.actor_critic.action_mean sigma_batch = self.actor_critic.action_std entropy_batch = self.actor_critic.entropy 
  • mu_batch: μ \mu μ策略均值
  • sigma_batch: σ \sigma σ策略方差
  • entropy:策略熵

3-4 KL散度控制
  • KL散度控制就干一件事:如果 KL 太大,降低学习率。
  • 这是一种简单的 Trust Region 近似实现, 用于防止策略更新过大导致训练不稳定。
# KLif self.desired_kl !=Noneand self.schedule =='adaptive':with torch.inference_mode(): kl = torch.sum( torch.log(sigma_batch / old_sigma_batch +1.e-5)+(torch.square(old_sigma_batch)+ torch.square(old_mu_batch - mu_batch))/(2.0* torch.square(sigma_batch))-0.5, axis=-1) kl_mean = torch.mean(kl)if kl_mean > self.desired_kl *2.0: self.learning_rate =max(1e-5, self.learning_rate /1.5)elif kl_mean < self.desired_kl /2.0and kl_mean >0.0: self.learning_rate =min(1e-2, self.learning_rate *1.5)for param_group in self.optimizer.param_groups: param_group['lr']= self.learning_rate 
  • 其中的kl对应 高斯分布KL公式 K L ( π o l d ∣ ∣ π n e w ) = log ⁡ σ σ o l d + σ o l d 2 + ( μ o l d − μ ) 2 2 σ 2 − 1 2 KL(\pi_{old}||\pi_{new}) = \log\frac{\sigma}{\sigma_{old}} + \frac{\sigma_{old}^2 + (\mu_{old}-\mu)^2}{2\sigma^2} - \frac12 KL(πold​∣∣πnew​)=logσold​σ​+2σ2σold2​+(μold​−μ)2​−21​
if kl_mean > self.desired_kl *2.0: self.learning_rate =max(1e-5, self.learning_rate /1.5)elif kl_mean < self.desired_kl /2.0and kl_mean >0.0: self.learning_rate =min(1e-2, self.learning_rate *1.5)for param_group in self.optimizer.param_groups: param_group['lr']= self.learning_rate 
  • 自适应学习率并更新
KL说明
太大更新太猛
太小更新太慢

3-5 PPO核心:概率比率
ratio = torch.exp(actions_log_prob_batch - old_actions_log_prob_batch)
  • 这就是PPO的核心公式: r t ( θ ) = π θ ( a t ∣ s t ) π θ o l d ( a t ∣ s t ) r_t(\theta) = \frac{\pi_\theta(a_t|s_t)} {\pi_{\theta_{old}}(a_t|s_t)} rt​(θ)=πθold​​(at​∣st​)πθ​(at​∣st​)​
  • 它表示:​
    • 新策略和旧策略在某个动作上的概率比例。
    • 如果r ≈ 1,说明新旧策略 差不多
    • 如果r >> 1 或 r << 1,说明策略 变化太大

3-6 PPO又一核心 Clip裁切
surrogate =-torch.squeeze(advantages_batch)* ratio surrogate_clipped =-torch.squeeze(advantages_batch)* torch.clamp(ratio,1.0- self.clip_param,1.0+ self.clip_param) surrogate_loss = torch.max(surrogate, surrogate_clipped).mean()
  • 也就是对应的 L C L I P = E [ min ⁡ ( r t ( θ ) A t , c l i p ( r t ( θ ) , 1 − ϵ , 1 + ϵ ) A t ) ] L^{CLIP} = E[ \min( r_t(\theta)A_t, clip(r_t(\theta),1-\epsilon,1+\epsilon)A_t ) ] LCLIP=E[min(rt​(θ)At​,clip(rt​(θ),1−ϵ,1+ϵ)At​)]
  • 第一行是原始策略梯度,也就是公式中的 L P G = E [ r t ( θ ) A t ] L^{PG} = E[r_t(\theta)A_t] LPG=E[rt​(θ)At​]
  • 通过上述公式,PPO 会限制 r ( θ ) r(\theta) r(θ)的取值范围 [ 1 − ϵ , 1 + ϵ ] [1-\epsilon, 1+\epsilon] [1−ϵ,1+ϵ]如果超过这个范围,梯度就会被裁剪,不再继续增大。
  • 注意:这里加负号是因为 PyTorch默认最小化loss

3-7 损失函数
# Value function lossif self.use_clipped_value_loss: value_clipped = target_values_batch +(value_batch - target_values_batch).clamp(-self.clip_param, self.clip_param) value_losses =(value_batch - returns_batch).pow(2) value_losses_clipped =(value_clipped - returns_batch).pow(2) value_loss = torch.max(value_losses, value_losses_clipped).mean()else: value_loss =(returns_batch - value_batch).pow(2).mean() loss = surrogate_loss + self.value_loss_coef * value_loss - self.entropy_coef * entropy_batch.mean()
  • 这里计算完整的损失函数公式 L = L p o l i c y + c 1 L v a l u e − c 2 H ( π ) L = L_{policy} + c_1 L_{value} - c_2 H(\pi) L=Lpolicy​+c1​Lvalue​−c2​H(π)
  • 其中:
    • self.value_loss_coef * value_loss:价值网络损失
    • surrogate_loss:策略网络损失
    • self.entropy_coef * entropy_batch.mean():策略熵函数损失
  • 这里根据是否使用PPO value clip分为两种计算value_loss的方式
  1. 普通value loss L V = ( V θ ( s ) − R t ) 2 L_V = (V_\theta(s) - R_t)^2 LV​=(Vθ​(s)−Rt​)2
  2. PPO value clip V c l i p = V o l d + c l i p ( V θ − V o l d ) V^{clip} = V_{old} + clip(V_\theta - V_{old}) Vclip=Vold​+clip(Vθ​−Vold​) L V = m a x ( ( V − R ) 2 , ( V c l i p − R ) 2 ) L_V = max( (V - R)^2, (V^{clip}-R)^2 ) LV​=max((V−R)2,(Vclip−R)2)
  • 如果使用PPO value clip可以防止 critic更新过大

3-8 梯度推进
# Gradient step self.optimizer.zero_grad() loss.backward() nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm) self.optimizer.step() mean_value_loss += value_loss.item() mean_surrogate_loss += surrogate_loss.item()
  • 剩下的就是
    • 清空梯度
    • 反向传播
    • 梯度裁剪 ∣ ∣ g ∣ ∣ < m a x _ g r a d _ n o r m ∣∣g∣∣<max\_grad\_norm ∣∣g∣∣<max_grad_norm
    • 更新参数
    • 记录loss

3-9 外层循环收尾工作
  • 计算平均loss,清空rollout buffer
num_updates = self.num_learning_epochs * self.num_mini_batches mean_value_loss /= num_updates mean_surrogate_loss /= num_updates self.storage.clear()

3-10 PPO训练循环
  • PPO训练循环:
  1. 收集 rollout
  2. 计算 advantage (GAE)
  3. 多轮 mini-batch 更新
  4. clip policy
  5. clip value
  6. entropy regularization
  • 数学目标: L = E [ min ⁡ ( r t A t , c l i p ( r t , 1 − ϵ , 1 + ϵ ) A t ) ] + c 1 ( V − R ) 2 − c 2 H ( π ) L = E[ \min( r_t A_t, clip(r_t,1-\epsilon,1+\epsilon)A_t ) ] + c_1(V-R)^2 - c_2H(\pi) L=E[min(rt​At​,clip(rt​,1−ϵ,1+ϵ)At​)]+c1​(V−R)2−c2​H(π)

在这里插入图片描述

小结

  • 本期我们对 rsl_rl 仓库中 PPO 算法的 Python 实现进行了全面解析:从初始化超参数、经验回放缓存、动作采样、环境反馈处理,到优势函数计算与策略更新的完整流程。核心机制包括概率比率裁剪 (clip)、GAE 优势估计、价值函数裁剪、防止梯度爆炸、以及可选的自适应学习率和 KL 控制,最终通过组合策略损失、价值损失和策略熵形成完整优化目标,实现对四足机器人稳定且高效的强化学习训练。
  • 如有错误,欢迎指出!感谢观看

Read more

如何用腾讯云轻量应用服务器内置OpenClaw应用搭建OpenClaw并接入QQ、飞书机器人,下载skill,开启对话

如何用腾讯云轻量应用服务器内置OpenClaw应用搭建OpenClaw并接入QQ、飞书机器人,下载skill,开启对话

诸神缄默不语-个人技术博文与视频目录 如需OpenClaw下载安装、配置、部署服务可以联系:https://my.feishu.cn/share/base/form/shrcnqjFuoNiBPXjADvRhiUcB1B 我发现腾讯云买服务器可以用QQ钱包,这不得狠狠把我多年来抢的红包狠狠利用一下。 OpenClaw我之前玩了几天,现在把gateway关了,因为我感觉第一是感觉AI对于一些细微的执行逻辑还是绕不明白,而且API太慢了等得我着急,慢得我都不知道它是死了还是只是慢,不如我直接一个古法编程下去开发一个自己的工具。我本来是想拿OpenClaw当时间管理助手的,但是研究了一番感觉它作为整个人完整的时间/项目/文件系统/财务/生活管理助手的潜力还是很大的。但是,也就仅止于潜力了,跟OpenClaw绕记账怎么记实在是把我绕火大了……第二,正如网上一直宣传的那样,这玩意太耗token了,我的混元和Qwen免费额度几乎都秒爆,GLM也给我一下子烧了一大笔。我觉得这不是我的消费水平该玩的东西……主要我也确实没有什么用OpenClaw赚大钱的好idea。 但是我仍然觉得OpenClaw

2025年12 电子学会 机器人三级等级考试真题

2512 青少年等级考试机器人理论真题 单选题(20题,共80分) 第1题 下列选项中,关于传感器描述正确的是? A.将非电的物理量转化为数字信号的器件 B.将非电的物理量转化为模拟信号的器件 C.将非电的物理量转化为电信号的器件 D.将电信号转化为其他形式信号的器件 第2题 下列选项中,属于半导体材料的是? A.电阻 B.发光二极管 C.铜导线 D.纯净水 第3题 下列电路符号中,用于标识光敏电阻的是? A. B. C. D. 第4题 下列选项中,说法错误的是? A.电路搭设完毕,通电前要检查电路 B.电路搭设时,因为电阻没有极性,无需考虑方向 C.电路搭设时,需要注意LED引脚的极性 D.电路搭设时,可以带电插拔元器件

【Microi 吾码】基于 Microi 吾码低代码框架构建 Vue 高效应用之道

【Microi 吾码】基于 Microi 吾码低代码框架构建 Vue 高效应用之道

我的个人主页 文章专栏:Microi吾码 引言 在当今快速发展的软件开发领域,低代码开发平台正逐渐崭露头角,为开发者们提供了更高效的应用构建途径。Microi 吾码低代码框架结合 Vue的强大前端能力,更是为打造高效应用提供了绝佳的组合。在这里,我将深入探讨如何基于 Microi 吾码低代码框架构建 Vue 高效应用。 Microi吾码官网: https://microi.net GitEE开源地址: microi.net: 一:Microi吾码安装指南 1、系统要求 * 操作系统:支持Windows、Linux等主流操作系统。 * 数据库:需要安装并配置支持的数据库,如MySql5.5+、SqlServer2016+、Oracle11g+等。 * 其他软件:安装.NET 8 SDK、Redis,并且最好安装Git用于代码获取。对于一些高级功能,可能还需要安装Docker、MinIO、MongoDB、RabbitMQ、

无人机目标检测数据集介绍-14,751张图片 无人机检测 航拍图像

无人机目标检测数据集介绍-14,751张图片 无人机检测 航拍图像

🚁 无人机目标检测数据集介绍-14,751张图片 * 📦 已发布目标检测数据集合集(持续更新) * 🚁 无人机实例目标检测数据集介绍 * 📌 数据集概览 * 包含类别 * 🎯 应用场景 * 🖼 数据特性 * 🌟 项目功能 * 🔗 技术标签 * YOLOv8 训练实战 * 📦 1. 环境配置 * 安装 YOLOv8 官方库 ultralytics * 📁 2. 数据准备 * 2.1 数据标注格式(YOLO) * 2.2 文件结构示例 * 2.3 创建 data.yaml 配置文件 * 🚀 3. 模型训练 * 关键参数补充说明: * 📈 4. 模型验证与测试 * 4.1 验证模型性能 * 关键参数详解 * 常用可选参数 * 典型输出指标 * 4.2 推理测试图像 * 🧠 5. 自定义推理脚本(