深度确定性策略梯度算法 (DDPG) 详解与实现
深度确定性策略梯度算法 DDPG 适用于连续动作空间控制问题,结合 DQN 与策略梯度方法。本文介绍其核心原理包括目标网络、经验回放及双网络架构,推导了 Q 值函数与策略更新公式。提供基于 PyTorch 和 Gym 环境的完整 Python 代码实现,涵盖 Actor-Critic 网络定义、训练循环及可视化,并分析算法优势与应用场景如机器人控制。

深度确定性策略梯度算法 DDPG 适用于连续动作空间控制问题,结合 DQN 与策略梯度方法。本文介绍其核心原理包括目标网络、经验回放及双网络架构,推导了 Q 值函数与策略更新公式。提供基于 PyTorch 和 Gym 环境的完整 Python 代码实现,涵盖 Actor-Critic 网络定义、训练循环及可视化,并分析算法优势与应用场景如机器人控制。

深度确定性策略梯度(Deep Deterministic Policy Gradient、DDPG)算法是一种基于深度强化学习的算法,适用于解决连续动作空间的问题,比如机器人控制中的连续运动。它结合了确定性策略和深度神经网络,是一种模型无关的强化学习算法,属于 Actor-Critic 框架,并且同时利用了 DQN 和 PG(Policy Gradient)的优点。

适用于连续动作空间: DDPG 直接输出连续值动作,无需对动作进行离散化。
利用确定性策略: 与随机策略不同,DDPG 输出的是每个状态下一个确定的最优动作。
结合目标网络: 使用延迟更新的目标网络,稳定了训练过程,避免了过大的参数波动。
经验回放机制: 通过经验回放缓解数据相关性,提升样本利用率。
高效学习: 使用 Critic 网络评估动作的质量,使得策略优化过程更加高效。
1. 从 DQN 继承的目标网络: 避免 Q 值的估计震荡问题,提高算法的训练稳定性。
2. 从 PG 继承的策略梯度优化: 通过 Actor 网络直接优化策略,适应连续动作问题。
3. 经验回放(Replay Buffer): 将交互环境中的经验(状态、动作、奖励、下一状态)存储起来,训练时从中随机采样,减少数据相关性和样本浪费。
4. 双网络架构: Actor 网络负责生成动作;Critic 网络评估动作的质量。
DDPG 使用 Bellman 方程更新 Critic 网络的目标 Q 值:
$$y = r + \gamma Q'(s', \mu'(s'; \theta^{\mu'}); \theta^{Q'})$$
其中 $s'$ 是下一状态,$\mu'(s')$ 是目标动作。
$\gamma$ 是折扣因子。
$\mu'$ 是目标 Actor 网络。
$Q'$ 是目标 Critic 网络。
Critic 网络的优化目标是最小化以下损失函数:
$$L(\theta^Q) = \frac{1}{N} \sum_{i} \left( Q(s_i, a_i; \theta^Q) - y_i \right)^2$$
其中:
$y_i$ 是目标值。
$\theta^Q$ 是 Critic 网络的参数。
Actor 网络通过最大化 Critic 网络的 Q 值来优化策略,其目标函数为:
$$J(\theta^\mu) = \frac{1}{N} \sum_{i} Q(s_i, \mu(s_i; \theta^\mu); \theta^Q)$$
使用梯度上升法更新 Actor 网络:
$$\nabla_{\theta^\mu} J \approx \frac{1}{N} \sum_{i} \nabla_a Q(s, a; \theta^Q) \big|{a=\mu(s)} \nabla{\theta^\mu} \mu(s; \theta^\mu)$$
目标网络采用软更新方式,缓慢地向当前网络靠近:
$$\theta^{Q'} \leftarrow \tau \theta^Q + (1 - \tau) \theta^{Q'}$$ $$\theta^{\mu'} \leftarrow \tau \theta^\mu + (1 - \tau) \theta^{\mu'}$$
其中 $\tau \in (0, 1)$ 是软更新系数。
更新 Critic 网络: 使用采样数据和目标 Critic 网络计算目标值 $y_i$,最小化 Critic 的损失函数。
采样训练: 从经验池中随机采样小批量数据 $(s_i, a_i, r_i, s'_i)$。
存储经验: 将 $(s_t, a_t, r_t, s_{t+1})$ 存储到经验回放池。
交互环境: 在状态 $s_t$ 下,通过 Actor 网络生成动作 $a_t$。执行动作 $a_t$,获取奖励 $r_t$ 和下一状态 $s_{t+1}$。

下面给出了 DDPG(深度确定性策略梯度)算法的完整 Python 实现。该实现包括 Actor-Critic 架构、缓冲区和目标网络等。
import gym # 导入 Gym 库,用于创建和管理强化学习环境
import numpy as np # 导入 NumPy,用于处理数组和数学运算
import torch # 导入 PyTorch,用于构建和训练神经网络
import torch.nn as nn # 导入 PyTorch 的神经网络模块
import torch.optim as optim # 导入 PyTorch 的优化器模块
from collections import deque # 导入双端队列,用于实现经验回放池
import random # 导入随机模块,用于从经验池中采样
Pendulum-v1)。# 定义 Actor 网络(策略网络)
class Actor(nn.Module):
def __init__(self, state_dim, action_dim, max_action):
super(Actor, self).__init__()
self.layer1 = nn.Linear(state_dim, 256) # 输入层到隐藏层 1,大小为 256
self.layer2 = nn.Linear(256, 256) # 隐藏层 1 到隐藏层 2,大小为 256
self.layer3 = nn.Linear(256, action_dim) # 隐藏层 2 到输出层,输出动作维度
self.max_action = max_action # 动作的最大值,用于限制输出范围
def forward(self, state):
x = torch.relu(self.layer1(state)) # 使用 ReLU 激活函数处理隐藏层 1
x = torch.relu(self.layer2(x)) # 使用 ReLU 激活函数处理隐藏层 2
x = torch.tanh(self.layer3(x)) * self.max_action # 使用 Tanh 激活函数,并放大到动作范围
return x # 返回输出动作
解析:
state_dim(环境状态的维度)、action_dim(动作空间的维度)、max_action(动作的最大值)。tanh 激活函数(将动作限制在 [-1, 1]),再乘以 max_action 缩放到实际动作范围。# 定义 Critic 网络(价值网络)
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super(Critic, self).__init__()
self.layer1 = nn.Linear(state_dim + action_dim, 256) # 将状态和动作拼接后输入到隐藏层 1
self.layer2 = nn.Linear(256, 256) # 隐藏层 1 到隐藏层 2,大小为 256
self.layer3 = nn.Linear(256, 1) # 隐藏层 2 到输出层,输出 Q 值
def forward(self, state, action):
x = torch.cat([state, action], dim=1) # 将状态和动作拼接为单个输入
x = torch.relu(self.layer1(x)) # 使用 ReLU 激活函数处理隐藏层 1
x = torch.relu(self.layer2(x)) # 使用 ReLU 激活函数处理隐藏层 2
x = self.layer3(x) # 输出 Q 值
return x # 返回 Q 值
解析:
state(环境的当前状态)、action(给定状态下的动作)。# 定义经验回放池
class ReplayBuffer:
def __init__(self, max_size):
self.buffer = deque(maxlen=max_size) # 初始化一个双端队列,设置最大容量
def add(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done)) # 将经验存入队列
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size) # 随机采样一个小批量数据
states, actions, rewards, next_states, dones = zip(*batch) # 解压采样数据
return (np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)) # 返回 NumPy 数组格式的数据
def size(self):
return len(self.buffer) # 返回经验池中当前存储的样本数量
解析:
add(存入经验)、sample(随机采样)、size(返回数量)。# 定义 DDPG 智能体
class DDPGAgent:
def __init__(self, state_dim, action_dim, max_action, gamma=0.99, tau=0.005, buffer_size=100000, batch_size=64):
self.actor = Actor(state_dim, action_dim, max_action) # 初始化 Actor 网络
self.actor_target = Actor(state_dim, action_dim, max_action) # 初始化目标 Actor 网络
self.actor_target.load_state_dict(self.actor.state_dict()) # 将 Actor 网络的权重复制到目标网络
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4) # 定义 Actor 网络的优化器
self.critic = Critic(state_dim, action_dim) # 初始化 Critic 网络
self.critic_target = Critic(state_dim, action_dim) # 初始化目标 Critic 网络
self.critic_target.load_state_dict(self.critic.state_dict()) # 将 Critic 网络的权重复制到目标网络
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3) # 定义 Critic 网络的优化器
self.max_action = max_action # 动作的最大值
self.gamma = gamma # 折扣因子
self.tau = tau # 目标网络软更新参数
self.replay_buffer = ReplayBuffer(buffer_size) # 初始化经验回放池
self.batch_size = batch_size # 批量大小
解析:
gamma(折扣因子)、tau(目标网络更新的系数)、buffer_size(经验池容量)、batch_size(每次训练样本数量)。# 根据状态选择动作
def select_action(self, state):
state = torch.FloatTensor(state.reshape(1, -1)) # 将状态转换为 PyTorch 张量
action = self.actor(state).detach().cpu().numpy().flatten() # 使用 Actor 网络生成动作,并转为 NumPy 数组
return action # 返回动作
作用:根据当前状态 (s),生成一个连续动作 (a)。
# 训练方法
def train(self):
# 如果回放池中样本数量不足,直接返回
if self.replay_buffer.size() < self.batch_size:
return
# 从回放池中采样一批数据
states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
# 将采样的数据转换为张量
states = torch.FloatTensor(states)
actions = torch.FloatTensor(actions)
rewards = torch.FloatTensor(rewards).unsqueeze(1) # 添加一个维度以匹配 Q 值维度
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones).unsqueeze(1) # 添加一个维度以匹配 Q 值维度
# 计算 critic 的损失
with torch.no_grad():
# 关闭梯度计算
next_actions = self.actor_target(next_states) # 使用目标 actor 网络预测下一步动作
target_q = self.critic_target(next_states, next_actions) # 目标 Q 值
# 使用贝尔曼方程更新目标 Q 值
target_q = rewards + (1 - dones) * self.gamma * target_q
# 当前 Q 值
current_q = self.critic(states, actions)
# 均方误差损失
critic_loss = nn.MSELoss()(current_q, target_q)
# 优化 critic 网络
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# 计算 actor 的损失
actor_loss = -self.critic(states, self.actor(states)).mean() # 策略梯度目标为最大化 Q 值
# 优化 actor 网络
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# 更新目标网络参数(软更新)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
# 将样本添加到回放池中
def add_to_replay_buffer(self, state, action, reward, next_state, done):
self.replay_buffer.add(state, action, reward, next_state, done)
解析:
def train_ddpg(env_name, episodes=1000, max_steps=200):
# 创建环境
env = gym.make(env_name)
state_dim = env.observation_space.shape[0] # 状态空间维度
action_dim = env.action_space.shape[0] # 动作空间维度
max_action = float(env.action_space.high[0]) # 动作最大值
# 初始化 DDPG 智能体
agent = DDPGAgent(state_dim, action_dim, max_action)
rewards = [] # 用于存储每个 episode 的奖励
for episode in range(episodes):
state, _ = env.reset() # 重置环境,获取初始状态
episode_reward = 0 # 初始化每轮奖励为 0
for step in range(max_steps):
# 选择动作
action = agent.select_action(state)
# 执行动作,获取环境反馈
next_state, reward, done, _, _ = env.step(action)
# 将样本存入回放池
agent.add_to_replay_buffer(state, action, reward, next_state, done)
# 训练智能体
agent.train()
# 更新当前状态
state = next_state
# 累加奖励
episode_reward += reward
if done: # 如果完成(到达终止状态),结束本轮
break
# 记录每轮的累计奖励
rewards.append(episode_reward)
print(f"Episode: {episode + 1}, Reward: {episode_reward}")
解析:
import matplotlib.pyplot as plt
# 绘制学习曲线
plt.plot(rewards)
plt.title("Learning Curve")
plt.xlabel("Episodes")
plt.ylabel("Cumulative Reward")
plt.show()
"""《DDPG 算法的代码》
时间:2024.12
环境:gym
"""
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
# 定义 Actor 网络(策略网络)
class Actor(nn.Module):
def __init__(self, state_dim, action_dim, max_action):
super(Actor, self).__init__()
self.layer1 = nn.Linear(state_dim, 256)
self.layer2 = nn.Linear(256, 256)
self.layer3 = nn.Linear(256, action_dim)
self.max_action = max_action
def forward(self, state):
x = torch.relu(self.layer1(state))
x = torch.relu(self.layer2(x))
x = torch.tanh(self.layer3(x)) * self.max_action
return x
# 定义 Critic 网络(价值网络)
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super(Critic, self).__init__()
self.layer1 = nn.Linear(state_dim + action_dim, 256)
self.layer2 = nn.Linear(256, 256)
self.layer3 = nn.Linear(256, 1)
def forward(self, state, action):
x = torch.cat([state, action], dim=1)
x = torch.relu(self.layer1(x))
x = torch.relu(self.layer2(x))
x = self.layer3(x)
return x
# 定义经验回放池
class ReplayBuffer:
def __init__(self, max_size):
self.buffer = deque(maxlen=max_size)
def add(self, state, action, reward, next_state, done):
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
return (np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones))
def size(self):
return len(self.buffer)
# DDPG 智能体类定义
class DDPGAgent:
def __init__(self, state_dim, action_dim, max_action, gamma=0.99, tau=0.005, buffer_size=100000, batch_size=64):
self.actor = Actor(state_dim, action_dim, max_action)
self.actor_target = Actor(state_dim, action_dim, max_action)
self.actor_target.load_state_dict(self.actor.state_dict())
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
self.critic = Critic(state_dim, action_dim)
self.critic_target = Critic(state_dim, action_dim)
self.critic_target.load_state_dict(self.critic.state_dict())
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
self.max_action = max_action
self.gamma = gamma
self.tau = tau
self.replay_buffer = ReplayBuffer(buffer_size)
self.batch_size = batch_size
def select_action(self, state):
state = torch.FloatTensor(state.reshape(1, -1))
action = self.actor(state).detach().cpu().numpy().flatten()
return action
def train(self):
if self.replay_buffer.size() < self.batch_size:
return
states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
states = torch.FloatTensor(states)
actions = torch.FloatTensor(actions)
rewards = torch.FloatTensor(rewards).unsqueeze(1)
next_states = torch.FloatTensor(next_states)
dones = torch.FloatTensor(dones).unsqueeze(1)
with torch.no_grad():
next_actions = self.actor_target(next_states)
target_q = self.critic_target(next_states, next_actions)
target_q = rewards + (1 - dones) * self.gamma * target_q
current_q = self.critic(states, actions)
critic_loss = nn.MSELoss()(current_q, target_q)
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
actor_loss = -self.critic(states, self.actor(states)).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
def add_to_replay_buffer(self, state, action, reward, next_state, done):
self.replay_buffer.add(state, action, reward, next_state, done)
def train_ddpg(env_name, episodes=1000, max_steps=200):
env = gym.make(env_name)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
agent = DDPGAgent(state_dim, action_dim, max_action)
rewards = []
for episode in range(episodes):
state, _ = env.reset()
episode_reward = 0
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, done, _, _ = env.step(action)
agent.add_to_replay_buffer(state, action, reward, next_state, done)
agent.train()
state = next_state
episode_reward += reward
if done:
break
rewards.append(episode_reward)
print(f"Episode: {episode + 1}, Reward: {episode_reward}")
import matplotlib.pyplot as plt
plt.plot(rewards)
plt.title("Learning Curve")
plt.xlabel("Episodes")
plt.ylabel("Cumulative Reward")
plt.show()
env.close()
if __name__ == "__main__":
env_name = "Pendulum-v1"
episodes = 500
train_ddpg(env_name, episodes=episodes)

演员和评论家网络:
Replay Buffer:
训练:
目标网络:
环境:
# 环境配置
Python 3.11.5
torch 2.1.0
torchvision 0.16.0
gym 0.26.2
由于博文主要为了介绍相关算法的原理和应用的方法,缺乏对于实际效果的关注,算法可能在上述环境中的效果不佳或者无法运行,一是算法不适配上述环境,二是算法未调参和优化,三是没有呈现完整的代码,四是等等。上述代码用于了解和学习算法足够了,但若是想直接将上面代码应用于实际项目中,还需要进行修改。
可以把 DDPG 算法想象成一个赛车手(Actor)和他的教练(Critic):

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online