CS294(285) Actor Critic Example: the Path Utility Module (utils)
In the CS294(285) Actor Critic series, we follow the CS294(285) homework and implement our own Actor-Critic algorithm step by step. This post implements the path utility module, utils.
UC Berkeley CS294(285) series:
CS294(285) Actor Critic: agents https://duanzhihua.blog.csdn.net/article/details/103106090
CS294(285) Actor Critic: Critic https://duanzhihua.blog.csdn.net/article/details/103110785
Reinforcement learning series:
Deep Q Network https://duanzhihua.blog.csdn.net/article/details/102963043
Double Deep Q Network https://duanzhihua.blog.csdn.net/article/details/102990509
Policy Gradient https://duanzhihua.blog.csdn.net/article/details/102982992
Actor Critic https://duanzhihua.blog.csdn.net/article/details/103097054
The main functions of the path utility module utils are:
sample_trajectory: samples a single trajectory (Path), capped at max_path_length steps; each step records the observation, image observation, action, reward, next observation, and terminal flag.
sample_trajectories: samples multiple paths, each capped at max_path_length steps, and keeps collecting until the total number of timesteps (the sum of each path's pathLen) reaches min_timesteps_per_batch.
sample_n_trajectories: samples exactly n paths, each capped at max_path_length steps, with no limit on the total number of timesteps.
Path: builds a single path as a dictionary containing, for every step, the observation, image observation, action, reward, next observation, and terminal flag.
convert_listofrollouts: splits a list of paths into separate arrays, taking each element out of every path and regrouping by element:
observations: the per-step observations of all paths, concatenated.
actions: the per-step actions of all paths, concatenated.
next observations: the per-step next observations of all paths, concatenated.
terminals: the per-step terminal flags of all paths, concatenated.
concatenated rewards: the per-step rewards of all paths, concatenated into one flat array.
unconcatenated rewards: a list of reward arrays, one array per path.
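To make the difference between the concatenated and unconcatenated rewards concrete, here is a minimal sketch with two toy reward arrays (the values are made up purely for illustration):

import numpy as np

# two hypothetical paths with 3 and 2 steps respectively
rewards_path1 = np.array([1.0, 0.0, 2.0], dtype=np.float32)
rewards_path2 = np.array([0.5, 1.5], dtype=np.float32)

# concatenated rewards: one flat array over all timesteps, shape (5,)
concatenated = np.concatenate([rewards_path1, rewards_path2])

# unconcatenated rewards: a list with one array per path, lengths [3, 2]
unconcatenated = [rewards_path1, rewards_path2]

print(concatenated)                      # [1.  0.  2.  0.5 1.5]
print([len(r) for r in unconcatenated])  # [3, 2]

The complete utils code is listed below.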
import numpy as np
import time
############################################
############################################
def sample_trajectory(env, policy, max_path_length, render=False, render_mode=('rgb_array')):
    # initialize env for the beginning of a new rollout
    ob = env.reset()  # HINT: should be the output of resetting the env

    # init vars
    obs, acs, rewards, next_obs, terminals, image_obs = [], [], [], [], [], []
    steps = 0
    while True:

        # render image of the simulated env
        if render:
            if 'rgb_array' in render_mode:
                if hasattr(env, 'sim'):
                    if 'track' in env.env.model.camera_names:
                        image_obs.append(env.sim.render(camera_name='track', height=500, width=500)[::-1])
                    else:
                        image_obs.append(env.sim.render(height=500, width=500)[::-1])
                else:
                    image_obs.append(env.render(mode=render_mode))
            if 'human' in render_mode:
                env.render(mode=render_mode)
                time.sleep(env.model.opt.timestep)

        # use the most recent ob to decide what to do
        obs.append(ob)
        ac = policy.get_action(ob)  # HINT: query the policy's get_action function
        ac = ac[0]
        acs.append(ac)

        # take that action and record results
        ob, rew, done, _ = env.step(ac)

        # record result of taking that action
        steps += 1
        next_obs.append(ob)
        rewards.append(rew)

        # End the rollout if the rollout ended
        # Note that the rollout can end due to done, or due to max_path_length
        rollout_done = 1 if (done or (steps >= max_path_length)) else 0  # HINT: this is either 0 or 1
        terminals.append(rollout_done)

        if rollout_done:
            break

    return Path(obs, image_obs, acs, rewards, next_obs, terminals)
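As a rough usage sketch (not part of the homework code), sample_trajectory can be exercised with a toy policy that exposes get_action. This assumes the classic gym API, where env.step returns a 4-tuple as in the code above, and a made-up RandomPolicy whose get_action returns a batch of one action so that the ac = ac[0] indexing works:

import gym
import numpy as np

class RandomPolicy:
    """Hypothetical policy for illustration: returns a batch containing one random action."""
    def __init__(self, action_space):
        self.action_space = action_space

    def get_action(self, ob):
        # shape (1, ...) so that ac[0] picks out the single action
        return np.array([self.action_space.sample()])

env = gym.make('CartPole-v0')
path = sample_trajectory(env, RandomPolicy(env.action_space), max_path_length=200)
print(path["observation"].shape, path["reward"].shape)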
def sample_trajectories(env, policy, min_timesteps_per_batch, max_path_length, render=False, render_mode=('rgb_array')):
    """
    Collect rollouts until we have collected min_timesteps_per_batch steps.

    Hint1: use sample_trajectory to get each path (i.e. rollout) that goes into paths
    Hint2: use get_pathlength to count the timesteps collected in each path
    """
    timesteps_this_batch = 0
    paths = []
    print("\n")
    while timesteps_this_batch < min_timesteps_per_batch:
        path = sample_trajectory(env, policy, max_path_length, render, render_mode)
        paths.append(path)
        pathLen = get_pathlength(path)
        timesteps_this_batch += pathLen
        print("Steps for path " + str(len(paths)) + " = " + str(pathLen) + "\n")
    print("Num Rollouts " + str(len(paths)) + "\n")
    return paths, timesteps_this_batch
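A quick illustration of the stopping rule, with hypothetical numbers and the RandomPolicy from the sketch above: because the last path is appended in full before the loop re-checks its condition, the returned timesteps_this_batch can overshoot min_timesteps_per_batch by up to max_path_length - 1 steps.

# hypothetical call, reusing env and RandomPolicy from the earlier sketch
paths, timesteps_this_batch = sample_trajectories(
    env, RandomPolicy(env.action_space),
    min_timesteps_per_batch=1000, max_path_length=200)

assert timesteps_this_batch >= 1000         # at least the requested batch size
assert timesteps_this_batch < 1000 + 200    # overshoot bounded by one extra path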
def sample_n_trajectories(env, policy, ntraj, max_path_length, render=False, render_mode=('rgb_array')):
    """
    Collect ntraj rollouts.

    Hint1: use sample_trajectory to get each path (i.e. rollout) that goes into paths
    """
    paths = []
    for _ in range(ntraj):
        path = sample_trajectory(env, policy, max_path_length, render, render_mode)
        paths.append(path)
    return paths
############################################
############################################
def Path(obs, image_obs, acs, rewards, next_obs, terminals):
    """
    Take info (separate arrays) from a single rollout
    and return it in a single dictionary
    """
    if image_obs != []:
        image_obs = np.stack(image_obs, axis=0)
    return {"observation": np.array(obs, dtype=np.float32),
            "image_obs": np.array(image_obs, dtype=np.uint8),
            "reward": np.array(rewards, dtype=np.float32),
            "action": np.array(acs, dtype=np.float32),
            "next_observation": np.array(next_obs, dtype=np.float32),
            "terminal": np.array(terminals, dtype=np.float32)}
def convert_listofrollouts(paths):
    """
    Take a list of rollout dictionaries
    and return separate arrays,
    where each array is a concatenation of that array from across the rollouts
    """
    observations = np.concatenate([path["observation"] for path in paths])
    actions = np.concatenate([path["action"] for path in paths])
    next_observations = np.concatenate([path["next_observation"] for path in paths])
    terminals = np.concatenate([path["terminal"] for path in paths])
    concatenated_rewards = np.concatenate([path["reward"] for path in paths])
    unconcatenated_rewards = [path["reward"] for path in paths]
    return observations, actions, next_observations, terminals, concatenated_rewards, unconcatenated_rewards
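Putting the pieces together, here is a hedged end-to-end sketch: sample a few evaluation rollouts and flatten them into arrays. env and RandomPolicy are the hypothetical objects from the earlier sketches, not part of the homework code.

paths = sample_n_trajectories(env, RandomPolicy(env.action_space), ntraj=3, max_path_length=200)
obs, acs, next_obs, terminals, concat_rews, unconcat_rews = convert_listofrollouts(paths)

# obs, acs, next_obs, terminals and concat_rews share the same first dimension:
# the total number of timesteps across the 3 paths
print(obs.shape[0] == concat_rews.shape[0])   # True
print(len(unconcat_rews))                     # 3, one reward array per path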
############################################
############################################
def get_pathlength(path):
    return len(path["reward"])