跳到主要内容人形运控部署框架汇总:rsl_rl 与 unitree_rl_gym 解析 | 极客日志PythonAI算法
人形运控部署框架汇总:rsl_rl 与 unitree_rl_gym 解析
强化学习框架 RSL-RL 是人形机器人运动控制的核心组件之一,广泛应用于仿真训练与真机部署。该框架包含 Runner、Algorithm 和 Network 三大模块,支持 PPO 及蒸馏算法。核心实现涉及近端策略优化(PPO)算法细节,包括初始化、存储管理、动作选择、环境步处理、回报计算及参数更新。文章详细解析了 PPO 代码逻辑,涵盖价值函数损失、代理损失计算、KL 散度自适应调整等关键机制,并探讨了基于 Transformer 的 Actor-Critic 网络结构,为理解人形运控中的强化学习部署提供技术参考。
佛系玩家1 浏览 前言
人形机器人运动控制中,强化学习(RL)已成为核心部署方案。RSL-RL 作为经典的 RL 框架,在多个开源项目中被广泛采用,如 legged_gym 及宇树开源的 unitree_rl_gym。本文汇总梳理人形运控部署框架,重点解析 rsl_rl 架构及其在 PPO 算法中的具体实现。
第一部分 rsl_rl:经典 RL 框架封装
RSL-RL 是一个强化学习算法框架,其包含三大主要组件:Runners、Algorithms 和 Networks。
- 管理环境步进和智能体学习的 Runner
__init__.py
distillation_runner.py
on_policy_runner.py
- 供算法使用的 Network 结构
__init__.py
memory.py
mlp.py
normalization.py
定义学习智能体的 Algorithm——包含 ppo.py、distillation.py
即 RSL-RL 包含两种算法:
- PPO:一种无模型、基于策略的强化学习方法,能够从零开始学习复杂任务,无需先验知识。
- BC 算法:类似于 DAgger 的行为克隆算法,用于将专家策略的行为迁移到学生策略中。在使用 PPO 进行强化学习训练后,如果训练环境与硬件部署环境存在差异,该算法尤其有用。
此外,还有对环境的封装,以及模型组件:
- env:
__init__.py, vec_env.py
- modules:
__init__.py, actor_critic.py, actor_critic_recurrent.py, rnd.py, student_teacher.py 等。
具体而言,其代码结构为:
rsl_rl/
config/ # 配置文件
algorithms/ # 强化学习算法实现,如 PPO
env/ # 环境封装
modules/ # 模型组件
actor_critic.py # 基础 Actor-Critic 网络
actor_critic_depth_cnn.py # 带深度视觉的 Actor-Critic 网络
actor_critic_history.py # 带历史信息的 Actor-Critic 网络
actor_critic_recurrent.py # 循环神经网络版 Actor-Critic
depth_backbone.py # 深度视觉处理网络
runners/ # 训练运行器
storage/ # 数据存储
utils/ # 工具函数
这个模块定义了强化学习的核心组件,包括 actor-critic 网络架构(有普通版、CNN 版、历史记忆版和 RNN 版)、PPO 算法实现、训练运行器等。从代码中可以看出,它支持不同类型的输入数据 (如关节状态、深度图像等)。
1.1 rsl_rl/algorithms/ppo.py:近端策略优化 PPO 的实现
这段代码定义了一个名为 PPO 的类,它实现了近端策略优化 (Proximal Policy Optimization) 算法。
1.1.1 初始化 init
构造函数接收一个 actor_critic 网络(策略网络和价值网络的组合)以及学习超参数 (如学习周期 num_learning_epochs、小批量数量 num_mini_batches、裁剪参数 clip_param、折扣因子 、GAE lambda 等),以及优化器设置(学习率 、梯度裁剪范数 )。
gamma
lam
learning_rate
max_grad_norm
import torch
import torch.nn as nn
import torch.optim as optim
from rsl_rl.modules import ActorCriticTransformer
from rsl_rl.storage import RolloutStorage
class PPO:
actor_critic: ActorCriticTransformer
def __init__(self,
actor_critic,
num_learning_epochs=1,
num_mini_batches=1,
clip_param=0.2,
gamma=0.998,
lam=0.95,
value_loss_coef=1.0,
entropy_coef=0.0,
learning_rate=1e-3,
max_grad_norm=1.0,
use_clipped_value_loss=True,
schedule="fixed",
desired_kl=0.01,
device='cpu'):
device: 传入的设备参数。
desired_kl: 目标 KL 散度,用于自适应学习率调整。
schedule: 学习率调度策略。
optimizer: 使用 Adam 优化器来更新 actor_critic 网络的参数。
storage: 用于存储经验轨迹(transitions)的 RolloutStorage 对象。
1.1.2 存储初始化 init_storage
这个方法根据环境的数量、每个环境收集的转换(transitions)数量、观察空间形状和动作空间形状来创建 RolloutStorage 实例。
def init_storage(self, num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape):
self.storage = RolloutStorage(num_envs, num_transitions_per_env, actor_obs_shape, critic_obs_shape, action_shape, self.device)
1.1.3 模式切换 test_mode/train_mode
这些方法用于切换 actor_critic 网络到评估(测试)模式或训练模式。
def test_mode(self):
self.actor_critic.test()
def train_mode(self):
self.actor_critic.train()
1.1.4 动作选择 act
这是智能体与环境交互的核心。给定当前观察 obs 和 critic_obs,它执行以下操作:
def act(self, obs, critic_obs):
self.transition.action_mean = self.actor_critic.action_mean.detach()
self.transition.action_sigma = self.actor_critic.action_std.detach()
self.transition.actions = self.actor_critic.act(obs).detach()
self.transition.values = self.actor_critic.evaluate(critic_obs).detach()
self.transition.actions_log_prob = self.actor_critic.get_actions_log_prob(self.transition.actions).detach()
if self.actor_critic.is_recurrent:
self.transition.hidden_states = self.actor_critic.get_hidden_states()
return self.transition.actions
1.1.5 处理环境步骤 process_env_step
在环境执行动作后调用此方法。如果 episode 因为达到时间限制而不是因为失败状态而结束,它会将最后一步的估计价值加到奖励中,这称为'超时引导 (Bootstrapping on time outs)'。
def process_env_step(self, rewards, dones, infos):
self.transition.rewards = rewards.clone()
self.transition.dones = dones
self.actor_critic.reset(dones)
if 'time_outs' in infos:
self.transition.rewards += self.gamma * torch.squeeze(self.transition.values * infos['time_outs'].unsqueeze(1).to(self.device), 1)
self.storage.add_transitions(self.transition)
self.transition.clear()
1.1.6 计算回报 compute_returns
在收集了足够多的 transitions 后调用。通常使用广义优势估计 (Generalized Advantage Estimation, GAE) 来计算每个时间步的回报 (returns) 和优势 (advantages)。
def compute_returns(self, last_critic_obs):
last_values = self.actor_critic.evaluate(last_critic_obs).detach()
self.storage.compute_returns(last_values, self.gamma, self.lam)
1.1.7 update:策略和价值网络参数更新
def update(self):
mean_value_loss = 0
mean_surrogate_loss = 0
if self.actor_critic.is_recurrent:
generator = self.storage.reccurent_mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)
else:
generator = self.storage.mini_batch_generator(self.num_mini_batches, self.num_learning_epochs)
for (obs_batch, critic_obs_batch, actions_batch, target_values_batch, advantages_batch, returns_batch, old_actions_log_prob_batch, old_mu_batch, old_sigma_batch, hid_states_batch, masks_batch) in generator:
ratio = torch.exp(actions_log_prob_batch - torch.squeeze(old_actions_log_prob_batch))
surrogate = -torch.squeeze(advantages_batch) * ratio
surrogate_clipped = -torch.squeeze(advantages_batch) * torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param)
surrogate_loss = torch.max(surrogate, surrogate_clipped).mean()
if self.use_clipped_value_loss:
value_clipped = target_values_batch + (value_batch - target_values_batch).clamp(-self.clip_param, self.clip_param)
value_losses = (value_batch - returns_batch).pow(2)
value_losses_clipped = (value_clipped - returns_batch).pow(2)
value_loss = torch.max(value_losses, value_losses_clipped).mean()
else:
value_loss = (returns_batch - value_batch).pow(2).mean()
loss = surrogate_loss + self.value_loss_coef * value_loss - self.entropy_coef * entropy_batch.mean()
self.optimizer.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(self.actor_critic.parameters(), self.max_grad_norm)
self.optimizer.step()
mean_value_loss += value_loss.item()
mean_surrogate_loss += surrogate_loss.item()
if self.desired_kl is not None and self.schedule == "adaptive":
with torch.inference_mode():
kl = torch.sum(torch.log(sigma_batch / old_sigma_batch + 1.0e-5) + (torch.square(old_sigma_batch) + torch.square(old_mu_batch - mu_batch)) / (2.0 * torch.square(sigma_batch)) - 0.5, axis=-1)
kl_mean = torch.mean(kl)
if kl_mean > self.desired_kl * 2.0:
self.learning_rate = max(1e-5, self.learning_rate / 1.5)
elif kl_mean < self.desired_kl / 2.0 and kl_mean > 0.0:
self.learning_rate = min(1e-2, self.learning_rate * 1.5)
num_updates = self.num_learning_epochs * self.num_mini_batches
mean_value_loss /= num_updates
mean_surrogate_loss /= num_updates
self.storage.clear()
return mean_value_loss, mean_surrogate_loss
1.2 rsl_rl/env
1.3 rsl_rl/modules
1.3.0 modules/actor_critic_transformer.py
这段代码定义了一个基于 BERT 风格的 Transformer 模型,用于强化学习中的 Actor-Critic 方法。它由三个主要的类组成:Transformer_Block, Transformer、ActorCriticTransformer。
Transformer 类构建了一个完整的 Transformer 模型,接收输入数据并通过一系列的 Transformer_Block 进行处理。首先,输入数据通过一个线性层和位置嵌入 (Position Embedding) 进行处理,然后数据通过多个 Transformer_Block 进行处理,最后通过另一个线性层输出最终结果。
class Transformer(nn.Module):
def __init__(self, input_dim, output_dim, context_len, latent_dim=128, num_head=4, num_layer=4, dropout_rate=0.1) -> None:
super().__init__()
self.input_layer = nn.Sequential(
nn.Linear(input_dim, latent_dim),
nn.Dropout(dropout_rate),
)
self.weight_pos_embed = nn.Embedding(context_len, latent_dim)
self.attention_blocks = nn.Sequential(
*[Transformer_Block(latent_dim, num_head, dropout_rate) for _ in range(num_layer)],
)
self.output_layer = nn.Sequential(
nn.LayerNorm(latent_dim),
nn.Linear(latent_dim, output_dim),
)
def forward(self, x):
x = self.input_layer(x)
x = x + self.weight_pos_embed(torch.arange(x.shape[1], device=x.device))
x = self.attention_blocks(x)
x = x[:, -1, :]
x = self.output_layer(x)
return x
Transformer_Block 类是构建 Transformer 模型的基础块,包含了多头注意力机制 (Multihead Attention) 和前馈神经网络 (FeedForward Neural Network)。
class Transformer_Block(nn.Module):
def __init__(self, latent_dim, num_head, dropout_rate) -> None:
super().__init__()
self.ln_1 = nn.LayerNorm(latent_dim)
self.attn = nn.MultiheadAttention(latent_dim, num_head, dropout=dropout_rate, batch_first=True)
self.ln_2 = nn.LayerNorm(latent_dim)
self.mlp = nn.Sequential(
nn.Linear(latent_dim, 4 * latent_dim),
nn.GELU(),
nn.Linear(4 * latent_dim, latent_dim),
nn.Dropout(dropout_rate),
)
def forward(self, x):
x = self.ln_1(x)
x = x + self.attn(x, x, x, need_weights=False)[0]
x = self.ln_2(x)
x = x + self.mlp(x)
return x
1.3.1 modules/actor_critic_depth_cnn.py
1.3.2 modules/actor_critic_history.py
1.3.3 modules/actor_critic_recurrent.py
1.3.4 modules/actor_critic.py
这段代码定义了一个名为 ActorCritic 的类,它是一个用于强化学习的演员 - 评论家模型的实现。包含两个主要部分:一个用于决策的策略网络 (Actor) 和一个用于评估动作价值的价值网络 (Critic)。
class ActorCritic(nn.Module):
is_recurrent = False
def __init__(self, num_actor_obs, num_critic_obs, num_actions, actor_hidden_dims=[256, 256, 256], critic_hidden_dims=[256, 256, 256], activation='elu', init_noise_std=1.0, **kwargs):
super(ActorCritic, self).__init__()
activation = get_activation(activation)
mlp_input_dim_a = num_actor_obs
mlp_input_dim_c = num_critic_obs
actor_layers = []
actor_layers.append(nn.Linear(mlp_input_dim_a, actor_hidden_dims[0]))
actor_layers.append(activation)
for l in range(len(actor_hidden_dims)):
if l == len(actor_hidden_dims) - 1:
actor_layers.append(nn.Linear(actor_hidden_dims[l], num_actions))
else:
actor_layers.append(nn.Linear(actor_hidden_dims[l], actor_hidden_dims[l + 1]))
actor_layers.append(activation)
self.actor = nn.Sequential(*actor_layers)
critic_layers = []
critic_layers.append(nn.Linear(mlp_input_dim_c, critic_hidden_dims[0]))
critic_layers.append(activation)
for l in range(len(critic_hidden_dims)):
if l == len(critic_hidden_dims) - 1:
critic_layers.append(nn.Linear(critic_hidden_dims[l], 1))
else:
critic_layers.append(nn.Linear(critic_hidden_dims[l], critic_hidden_dims[l + 1]))
critic_layers.append(activation)
self.critic = nn.Sequential(*critic_layers)
1.3.5 modules/depth_backbone.py
1.3.6 modules/normalizer.py
1.4 rsl_rl/runners
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online