强化学习数学原理详解:从动态规划开始
第一部分:基础数学概念
1.1 马尔可夫决策过程(MDP)
一个马尔可夫决策过程由五元组构成:$\text{MDP} = (S, A, P, R, \gamma)$,其中:
- $S$:状态空间(有限或无限集合)
- $A$:动作空间(有限或无限集合)
- $P$:状态转移概率,$P(s' \mid s,a) = \text{Pr}(S_{t+1}=s' \mid S_t=s, A_t=a)$
- $R$:奖励函数,$R(s,a,s') = \mathbb{E}(R_{t+1} \mid S_t=s, A_t=a, S_{t+1}=s')$
- $\gamma$:折扣因子,$0 \le \gamma < 1$
这里的状态转移概率是一个比较反直觉的点。理论上来说从一个状态出发做出一个动作,下一时刻的状态似乎是确定的,然而在 MDP 中通常涉及概率分布。
1.2 策略(Policy)
策略 $\pi$ 是状态到动作概率分布的映射: $$\pi(a \mid s) = \text{Pr}(A_t=a \mid S_t=s)$$
第二部分:价值函数与贝尔曼方程
2.1 回报(Return)
从时间 $t$ 开始的累积折扣奖励: $$G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \cdots = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$$
2.2 状态价值函数(State-Value Function)
在策略 $\pi$ 下,状态 $s$ 的价值是期望回报: $$V^\pi(s) = \mathbb{E}_\pi(G_t \mid S_t = s)$$
2.3 动作价值函数(Action-Value Function)
在状态 $s$ 执行动作 $a$ 后遵循策略 $\pi$ 的期望回报: $$Q^\pi(s,a) = \mathbb{E}_\pi(G_t \mid S_t = s, A_t = a)$$
2.4 贝尔曼期望方程(Bellman Expectation Equation)
对于 $V^\pi$:
$$V^\pi(s) = \sum_{a \in A} \pi(a \mid s) \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V^\pi(s') \right]$$
推导: 从定义出发:$V^\pi(s) = \mathbb{E}\pi(G_t \mid S_t = s)$。 展开 $G_t = R{t+1} + \gamma G_{t+1}$,代入得: $$V^\pi(s) = \mathbb{E}\pi(R{t+1} + \gamma G_{t+1} \mid S_t = s)$$ 利用期望线性性拆分: $$V^\pi(s) = \mathbb{E}\pi(R{t+1} \mid S_t = s) + \gamma \mathbb{E}\pi(G{t+1} \mid S_t = s)$$ 第一项为期望即时奖励: $$\mathbb{E}\pi(R{t+1} \mid S_t = s) = \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s,a) R(s,a,s')$$ 第二项利用马尔可夫性转换为下一状态价值: $$\mathbb{E}\pi\left[G{t+1} \mid S_t = s\right] = \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s,a) V^\pi(s') = \mathbb{E}\left[V^\pi(S_{t+1}) \mid S_t = s\right]$$ 合并即得贝尔曼期望方程。
贝尔曼期望方程用一句话来讲那就是描述了不同状态的状态价值之间的关系。
这里对贝尔曼期望方程做一个直观理解:贝尔曼期望方程即状态价值函数是某个状态下的期望回报。第一项表达的是从这个状态出发能够拿到的即时回报,具体而言,从这个状态出发,在不同的策略下将会执行不同的动作,执行不同的动作会到达不同的状态(即使从相同的状态出发,执行相同的动作,到达的状态也可能会不同)。结合期望的定义,我们可以很容易理解即时回报就是考虑在这个状态下能够做出所有动作,然后考虑在这个动作下会到达的状态及其对应的奖励,相乘起来就得到了即时回报。 第二项则也同样好理解,从某个状态出发,考虑在这个状态下能够做出所有动作,然后考虑在这个动作下会到达的状态及其在这个状态下的状态价值函数,这个就是未来回报。
注:上面的贝尔曼期望方程,其实是贝尔曼方程的另一种表达,在学习 TD learning 时会看到这种表示形式,大家一般看见的贝尔曼方程的形式: $$V^\pi(s) = \sum_{a \in A} \pi(a \mid s) \left[ \sum_{r \in R} p(r \mid s,a) r + \gamma \sum_{s' \in S} P(s' \mid s,a) V^\pi(s') \right]$$ 二者其实是等价的。
对于 $Q^\pi$:
$$Q^\pi(s,a) = \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma \sum_{a' \in A} \pi(a' \mid s') Q^\pi(s',a') \right]$$
动作价值函数与状态价值函数的关系: $$V^\pi(s) = \sum_{a \in A} \pi(a \mid s) \cdot Q^\pi(s,a)$$
第三部分:动态规划算法
3.1 策略评估(Policy Evaluation)
问题:给定策略 $\pi$,计算其对应的状态价值函数 $V^\pi$。 方法:迭代应用贝尔曼期望方程。 算法步骤:
- 初始化 $V_0(s)$ 为任意值(通常为 0)。
- 对于 $k=0,1,2,\ldots$,执行迭代更新: $$V_{k+1}(s) = \sum_{a \in A} \pi(a \mid s) \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V_k(s') \right]$$
- 当 $\max_s |V_{k+1}(s) - V_k(s)| < \theta$($\theta$ 为收敛阈值)时停止。
收敛性:贝尔曼期望算子是 $\gamma$-收缩映射,必收敛到唯一不动点 $V^\pi$。
3.2 策略改进(Policy Improvement)
问题:给定价值函数 $V^\pi$,找到更优的策略 $\pi'$。 核心定理(策略改进定理): 对于任意两个确定性策略 $\pi$ 和 $\pi'$,若对所有状态 $s$ 满足: $$Q^\pi(s, \pi'(s)) \ge V^\pi(s)$$ 则对所有状态 $s$ 有: $$V^{\pi'}(s) \ge V^\pi(s)$$
3.3 策略迭代(Policy Iteration)
算法流程:
- 初始化随机策略 $\pi_0$。
- 对于 $i=0,1,2,\ldots$: a. 策略评估:计算 $V^{\pi_i}$。 b. 策略改进:通过贪心策略更新得到 $\pi_{i+1}$: $$\pi_{i+1}(s) = \arg\max_{a \in A} \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V^{\pi_i}(s') \right]$$ 即 $$\pi_{i+1}(s) = \arg\max_{a\in A} Q^{\pi_i}(s,a)$$ c. 若 $\pi_{i+1} = \pi_i$,停止迭代,得到最优策略 $\pi^*$。
即:对每个状态,选择能够最大化动作价值的动作。 算法流程:先定策略,再评估,再改进。 策略评估部分在数学上求解需要求解方程组(或者矩阵计算求逆),在工程上的大规模问题这种方式不方便求解,在工程上一般使用迭代算法来求解。
3.3.1 策略评估(求解当前策略的 $V^\pi$)
- 初始化:$V_0(s) = 0, \forall s \in S$
- 迭代更新:$V_{k+1}(s) = \sum_a \pi(a|s) \sum_{s'} P(s'|s,a) \left( R(s,a,s') + \gamma V_k(s') \right)$
- 收敛终止:$\max_s |V_{k+1}(s) - V_k(s)| < \theta \quad \Rightarrow \quad V^\pi = V_k$
3.3.2 策略改进(构造更优策略 $\pi'$)
- 计算动作价值:$Q^\pi(s,a) = \sum_{s'} P(s'|s,a) \left( R(s,a,s') + \gamma V^\pi(s') \right)$
- 贪心更新策略:$\pi'(s) = \arg\max_a Q^\pi(s,a)$
3.3.3 收敛判断
若 $\pi' = \pi$,则收敛到最优策略 $\pi^* = \pi$;否则令 $\pi = \pi'$,回到「策略评估」继续迭代。
3.4 价值迭代(Value Iteration)
核心思想:直接迭代最优价值函数,将策略评估的迭代次数简化为 1 次。 算法步骤:
- 初始化 $V_0(s)$ 为任意值(通常为 0)。
- 对于 $k=0,1,2,\ldots$,执行迭代更新: $$V_{k+1}(s) = \max_{a \in A} \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V_k(s') \right]$$
- 当 $\max_s |V_{k+1}(s) - V_k(s)| < \theta$ 时停止。
- 提取最优策略: $$\pi^(s) = \arg\max_{a \in A} \sum_{s' \in S} P(s' \mid s,a) \left[ R(s,a,s') + \gamma V^(s') \right]$$
算法流程:直接算出最优值,再提取策略。
第四部分:具体示例与计算
4.1 3×3 网格世界示例
环境设定:
- 状态:9 个格子(编号 0~8,右下角 8 为目标)
- 动作:上、下、左、右(4 个动作,撞墙时状态不变)
- 转移:确定性转移($P(s' \mid s,a) = 1$ 或 0)
- 奖励:普通移动 -1,到达目标 +10,目标状态 0
- 折扣因子:$\gamma = 0.9$
4.1.1 策略评估计算(随机策略)
以角落状态 $s_{00}$(编号 0)为例,随机策略下每个动作概率 0.25: $$\begin{align*} V^\pi(s_{00}) &= 0.25 \times [-1 + 0.9V^\pi(s_{00})] + 0.25 \times [-1 + 0.9V^\pi(s_{10})] \ &+ 0.25 \times [-1 + 0.9V^\pi(s_{01})] + 0.25 \times [-1 + 0.9V^\pi(s_{00})] \end{align*}$$ 简化得: $$V^\pi(s_{00}) = \frac{-1 + 0.225V^\pi(s_{10}) + 0.225V^\pi(s_{01})}{0.55}$$
4.1.2 价值迭代计算(中心状态 $s_{11}$)
初始 $V_0(s_{11}) = 0$:
- 第一次迭代:$V_1(s_{11}) = \max(-1, -1, -1, -1) = -1$
- 第二次迭代:$V_2(s_{11}) = \max(-1 + 0.9 \times (-1), \dots) = -1.9$
第五部分:代码实现(Python)
5.1 策略迭代代码
import numpy as np
class GridWorldMDP:
"""3×3 网格世界 MDP 实现"""
def __init__(self, size=3, goal_reward=10, step_cost=-1, gamma=0.9):
self.size = size
self.n_states = size * size
self.n_actions = 4 # 0:上,1:下,2:左,3:右
self.goal_reward = goal_reward
self.step_cost = step_cost
self.gamma = gamma
# 初始化转移概率 P(s'|s,a) 和奖励函数 R(s,a,s')
self.P = np.zeros((self.n_states, self.n_actions, self.n_states))
self.R = np.zeros((self.n_states, self.n_actions, self.n_states))
self._build_transitions()
def _build_transitions(self):
"""构建确定性转移和奖励函数"""
goal_state = self.n_states - 1 # 右下角为目标
for s in range(self.n_states):
row, col = divmod(s, self.size)
for a in range(self.n_actions):
# 计算下一个状态(撞墙时不变)
if a == 0:
next_row, next_col = max(row-1, 0), col
elif a == 1:
next_row, next_col = min(row+1, self.size-1), col
elif a == 2:
next_row, next_col = row, max(col-1, 0)
else:
next_row, next_col = row, min(col+1, self.size-1)
s_next = next_row * self.size + next_col
self.P[s, a, s_next] = 1.0 # 确定性转移
# 设置奖励
if s == goal_state:
self.R[s, a, s_next] = 0
elif s_next == goal_state:
self.R[s, a, s_next] = self.goal_reward
else:
self.R[s, a, s_next] = self.step_cost
def policy_evaluation(self, policy, max_iter=1000, theta=1e-6):
"""策略评估:迭代求解 V^π"""
V = np.zeros(self.n_states)
for i in range(max_iter):
delta = 0
V_new = np.zeros(self.n_states)
for s in range(self.n_states):
v = 0
for a in range(self.n_actions):
pi_a_s = policy[s, a]
expected_value = 0
for s_next in range(self.n_states):
p = self.P[s, a, s_next]
r = self.R[s, a, s_next]
expected_value += p * (r + self.gamma * V[s_next])
v += pi_a_s * expected_value
V_new[s] = v
delta = max(delta, abs(v - V[s]))
V = V_new
if delta < theta:
print(f"策略评估在{i+1}次迭代后收敛")
break
return V
def policy_improvement(self, V):
"""策略改进:贪心策略更新"""
new_policy = np.zeros((self.n_states, self.n_actions))
for s in range(self.n_states):
# 计算所有动作的 Q 值
q_values = np.zeros(self.n_actions)
for a in range(self.n_actions):
q = 0
for s_next in range(self.n_states):
p = self.P[s, a, s_next]
r = self.R[s, a, s_next]
q += p * (r + self.gamma * V[s_next])
q_values[a] = q
# 选择最优动作(多最优动作时平均分配概率)
best_actions = np.where(q_values == np.max(q_values))[0]
for a in best_actions:
new_policy[s, a] = 1.0 / len(best_actions)
return new_policy
def policy_iteration(self, max_iter=100):
"""策略迭代算法"""
# 初始化随机策略
policy = np.ones((self.n_states, self.n_actions)) / self.n_actions
for i in range(max_iter):
print(f"\n=== 策略迭代第{i+1}轮 ===")
# 策略评估
V = self.policy_evaluation(policy)
# 策略改进
new_policy = self.policy_improvement(V)
# 检查收敛
if np.allclose(policy, new_policy):
print(f"策略在{i+1}次迭代后收敛到最优")
return V, new_policy
policy = new_policy
print("达到最大迭代次数")
return V, policy
def value_iteration(self, max_iter=1000, theta=1e-6):
"""价值迭代算法"""
V = np.zeros(self.n_states)
for i in range(max_iter):
delta = 0
V_new = np.zeros(self.n_states)
for s in range(self.n_states):
# 计算所有动作的期望价值
action_values = np.zeros(self.n_actions)
for a in range(self.n_actions):
q = 0
for s_next in range(self.n_states):
p = self.P[s, a, s_next]
r = self.R[s, a, s_next]
q += p * (r + self.gamma * V[s_next])
action_values[a] = q
# 取最大值
V_new[s] = np.max(action_values)
delta = max(delta, abs(V_new[s] - V[s]))
V = V_new
if delta < theta:
print(f"价值迭代在{i+1}次迭代后收敛")
break
# 提取最优策略
optimal_policy = self.policy_improvement(V)
return V, optimal_policy
# 运行示例
if __name__ == "__main__":
print("="*60)
print("3×3 网格世界 MDP 示例")
print("="*60)
# 创建 MDP 实例
mdp = GridWorldMDP(size=3, goal_reward=10, step_cost=-1, gamma=0.9)
# 1. 策略迭代
print("\n1. 策略迭代:")
V_pi, policy_pi = mdp.policy_iteration(max_iter=10)
print(f"最优价值函数(策略迭代):\n{V_pi.reshape(3,3).round(4)}")
# 2. 价值迭代
print("\n" + "="*60)
print("\n2. 价值迭代:")
V_vi, policy_vi = mdp.value_iteration()
print(f"最优价值函数(价值迭代):\n{V_vi.reshape(3,3).round(4)}")
# 比较结果
print("\n" + "="*60)
print("策略迭代 vs 价值迭代:")
print(f"价值函数最大差异:{np.max(np.abs(V_pi - V_vi)):.6f}")
print(f"策略最大差异:{np.max(np.abs(policy_pi - policy_vi)):.6f}")
# 验证贝尔曼最优方程
print("\n" + "="*60)
print("验证贝尔曼最优方程:")
for s in range(mdp.n_states):
lhs = V_vi[s]
# 计算右侧
action_values = np.zeros(mdp.n_actions)
for a in range(mdp.n_actions):
q = 0
for s_next in range(mdp.n_states):
p = mdp.P[s, a, s_next]
r = mdp.R[s, a, s_next]
q += p * (r + mdp.gamma * V_vi[s_next])
action_values[a] = q
rhs = np.max(action_values)
print(f"状态{s}: LHS={lhs:.4f}, RHS={rhs:.4f}, 差异={abs(lhs-rhs):.6f}")
5.2 价值迭代代码
from Policy_iteration import GridWorldMDP
import numpy as np
class ValueIterationMDP(GridWorldMDP):
def value_iteration(self, theta=1e-6):
V = np.zeros(self.n_states)
print("="*60)
print("价值迭代算法开始")
print("="*60)
print(f"初始价值函数:\n{V.reshape(self.size, self.size)}\n")
for iteration in range(1000):
delta = 0
V_new = np.zeros(self.n_states)
for s in range(self.n_states):
action_value = []
for a in range(self.n_actions):
q = 0
# 计算每个动作 a 的完整 Q 值(求和所有转移)
for next_state in np.where(self.P[s, a]>0)[0]:
q += self.P[s, a, next_state]*(self.R[s, a, next_state]+ self.gamma * V[next_state])
action_value.append(q)
V_new[s] = max(action_value) if action_value else 0.0
delta = max(delta, abs(V[s]- V_new[s]))
V = V_new
if iteration % 5 == 0:
print(f"第{iteration +1}轮迭代 | 本轮最大误差:{delta:.6f}")
print(f"当前价值函数:\n{V.reshape(self.size, self.size)}\n")
# 收敛判断
if delta < theta:
print(f"\n✅ 价值迭代在第{iteration +1}次迭代收敛!")
print(f"最终收敛误差:{delta:.8f}")
break
# 提取最优策略
policy = self._extract_policy(V)
return V, policy
def _extract_policy(self, V):
policy = np.zeros((self.n_states, self.n_actions))
for s in range(self.n_states):
q_value = []
for a in range(self.n_actions):
q = 0
for next_state in np.where(self.P[s, a]>0)[0]:
q += self.P[s, a, next_state]*(self.R[s, a, next_state]+ self.gamma * V[next_state])
q_value.append(q)
max_q = max(q_value)
# 筛选所有最优动作(处理多最优动作均分概率)
best_action = [a for a, q in enumerate(q_value) if abs(q - max_q)<1e-6]
for a in best_action:
policy[s, a] = 1.0/len(best_action)
return policy
if __name__ == "__main__":
mdp_vi = ValueIterationMDP()
print("价值迭代算法完整演示")
print("="*60)
# 运行价值迭代
V_vi, policy_vi = mdp_vi.value_iteration()
# 1. 展示最终收敛的最优价值函数
print("\n" + "="*60)
print("最终收敛 - 最优价值函数")
print("="*60)
V_grid = V_vi.reshape(mdp_vi.size, mdp_vi.size)
print(V_grid)
# 2. 展示最终收敛的最优策略矩阵
print("\n" + "="*60)
print("最终收敛 - 最优策略矩阵 (状态数×动作数)")
print("="*60)
print(f"策略矩阵形状:{policy_vi.shape}")
print("【矩阵含义:行=状态,列=动作,值=选择该动作的概率】")
print(policy_vi)
# 3. 策略可视化
print("\n" + "="*60)
print(" 最优策略 - 网格可视化")
print("="*60)
# 动作映射(根据你的 GridWorldMDP 动作定义,通用上下左右映射)
action_map = {0:"↑",1:"↓",2:"←",3:"→"}
policy_grid = np.zeros((mdp_vi.size, mdp_vi.size), dtype=object)
for i in range(mdp_vi.size):
for j in range(mdp_vi.size):
s = i * mdp_vi.size + j # 网格坐标转状态编号
best_acts = np.where(policy_vi[s]>0)[0] # 筛选最优动作
act_str = "".join([action_map[a] for a in best_acts])
policy_grid[i, j] = act_str if act_str else "终端"
print("【每个网格值:当前位置的最优动作,多动作表示等概率选择】")
print(policy_grid)
补充说明:
- 策略迭代是基于初始的状态价值出发,在策略下不断迭代,最终得到对该策略下的状态价值的估计,然后使用这个状态价值,计算动作价值,并去更新策略,循环往复得到最优策略。
- 价值迭代也是从初始状态价值出发,但是和策略迭代不同的是,每次迭代他都直接计算在该数值(状态价值是状态 s 执行动作 a 后遵循策略 π 的期望回报,策略迭代的内层迭代专门对状态价值进行估计,而这里没有估计,所以这里以及称不上该值为状态价值了,只是一个值而已)下的动作价值,然后在找到每个状态哪个动作的动作价值最高,得到新的策略和新的 V 值,这里的新的策略不会影响后续的值,因为在下一轮迭代中只用用到上一轮迭代的新的 V 值和环境值(P,R)来计算新的动作价值。

