CS294(285) Actor Critic algorithm series
CS294(285) Actor Critic: agents (https://duanzhihua.blog.csdn.net/article/details/103106090)
CS294(285) Actor Critic: Critic (https://duanzhihua.blog.csdn.net/article/details/103110785)
CS294(285) Actor Critic: trajectory utilities (utils) (https://duanzhihua.blog.csdn.net/article/details/103142032)
This section implements the Policy component of Actor Critic.
Build the base class BasePolicy:
import numpy as np
class BasePolicy (object):
def __init__(self,**kwargs):
super(BasePolicy,self).__init__(**kwargs)
def build_graph(self):
raise NotImplementedError
def get_action(self,obs):
raise NotImplementedError
def update(self,obs,acs):
raise NotImplementedError
def save(self,filepath):
raise NotImplementedError
def restore(self,filepath):
raise NotImplementedError
Build the MLPPolicy subclass:
It inherits from the base class BasePolicy and implements the corresponding methods. (TensorFlow has been upgraded to version 2.0 and tensorflow_probability to version 0.8.0, so the code runs through the v1 compatibility API.)
import numpy as np
#import tensorflow as tf
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
from .base_policy import BasePolicy
from cs285.infrastructure.tf_utils import build_mlp
import tensorflow_probability as tfp
class MLPPolicy (BasePolicy):
def __init__(self,
sess,
ac_dim,
ob_dim,
n_layers,
size,
learning_rate=1e-4,
training=True,
policy_scope='policy_vars',
discrete=False,
nn_baseline=False,
**kwargs):
super().__init__(**kwargs)
self.sess =sess
self.ac_dim =ac_dim
self.ob_dim =ob_dim
self.n_layers =n_layers
self.size =size
self.discrete =discrete
self.learning_rate =learning_rate
self.training =training
self.nn_baseline = nn_baseline
#build the TensorFlow computation graph
with tf.variable_scope(policy_scope,reuse=tf.AUTO_REUSE):
self.build_graph()
#saver for the policy variables that are not part of training
self.policy_vars =[v for v in tf.all_variables() if policy_scope in v.name and 'train' not in v.name]
self.policy_saver =tf.train.Saver(self.policy_vars,max_to_keep=None)
def build_graph(self):
self.define_placeholders()
self.define_forward_pass()
self.build_action_sampling()
if self.training:
with tf.variable_scope('train',reuse=tf.AUTO_REUSE):
if self.nn_baseline:
self.build_baseline_forward_pass()
self.define_train_op()
def define_placeholders(self):
raise NotImplementedError
def define_forward_pass(self):
if self.discrete:
logits_na=build_mlp(self.observations_pl,output_size=self.ac_dim,scope='discrete_logits',n_layers=self.n_layers,size=self.size)
self.parameters=logits_na
else:
mean =build_mlp(self.observations_pl,output_size=self.ac_dim,scope='continuous_logits',n_layers=self.n_layers,size=self.size)
logstd =tf.Variable(tf.zeros(self.ac_dim),name='logstd')
self.parameters =(mean,logstd)
def build_action_sampling(self):
if self.discrete:
logits_na =self.parameters
'''
tf.multinomial samples from a categorical distribution.
The first argument, logits, is a 2-D tensor whose entries are the (unnormalized log) scores of each index; with n entries per row, the sampled values lie in [0, n-1].
The second argument, num_samples, is the number of samples to draw per row.
'''
self.sample_ac=tf.squeeze(tf.multinomial(logits_na,num_samples=1),axis=1)
else:
mean,logstd =self.parameters
self.sample_ac =mean + tf.exp(logstd)*tf.random_normal(tf.shape(mean),0,1)
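#Note on shapes: with logits_na of shape [N, ac_dim], tf.multinomial(logits_na, num_samples=1)
#returns int64 samples of shape [N, 1], and tf.squeeze(..., axis=1) yields action indices of shape [N].
#In the continuous branch, the reparameterized sample mean + exp(logstd) * eps has the same shape as mean, i.e. [N, ac_dim].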
def define_train_op(self):
raise NotImplementedError
def define_log_prob(self):
if self.discrete:
#log probability under a categorical distribution
logits_na =self.parameters
self.logprob_n = tf.distributions.Categorical(logits=logits_na).log_prob(self.actions_pl)
else:
#log probability under a multivariate Gaussian
mean,logstd=self.parameters
#note: log_prob must be called on the distribution itself, with the actions placeholder self.actions_pl
self.logprob_n=tfp.distributions.MultivariateNormalDiag(
loc=mean,
scale_diag=tf.exp(logstd)).log_prob(self.actions_pl)
def build_baseline_forward_pass(self):
self.baseline_prediction = tf.squeeze(build_mlp(self.observations_pl,
output_size=1,scope='nn_baseline',
n_layers=self.n_layers,size=self.size))
def save(self,filepath):
self.policy_saver.save(self.sess,filepath,write_meta_graph=False)
def restore(self,filepath):
self.policy_saver.restore(self.sess,filepath)
def update(self,obs,acs):
raise NotImplementedError
def get_action(self,obs):
if len(obs.shape)>1:
observation =obs
else:
observation =obs[None]
return self.sess.run(self.sample_ac,feed_dict={self.observations_pl:observation})
class MLPPolicyPG(MLPPolicy):
def define_placeholders(self):
self.observations_pl =tf.placeholder(shape=[None,self.ob_dim],name='ob',dtype=tf.float32)
if self.discrete:
self.actions_pl = tf.placeholder(shape=[None],name="ac",dtype=tf.int32)
else:
self.actions_pl =tf.placeholder(shape =[None,self.ac_dim],name="ac",dtype=tf.float32)
if self.training:
#advantage values
self.adv_n =tf.placeholder(shape=[None],name="adv",dtype=tf.float32)
#baseline targets
if self.nn_baseline:
self.targets_n=tf.placeholder(shape=[None],name="baseline_target",dtype=tf.float32)
def define_train_op(self):
#define the log probability of the chosen action at the given observation under the current policy
self.define_log_prob()
'''
Define the loss that is minimized when training the policy with policy gradient.
Hints:
We want to maximize the expected return over the collected trajectories: sum_{t=0}^{T-1} [grad [log pi(a_t|s_t) * (Q_t - b_t)]]
define_log_prob provides log pi(a_t|s_t), the log probability of the taken action at the observation under the current policy.
The placeholder adv_n holds the advantage estimates A(s,a) = Q(s,a) - V(s).
Negating the objective turns the MAXIMIZE problem into a MINIMIZE problem over the loss.
'''
self.loss =tf.reduce_sum(-self.logprob_n*self.adv_n)
self.train_op =tf.train.AdamOptimizer(self.learning_rate).minimize(self.loss)
if self.nn_baseline:
self.baseline_loss =tf.losses.mean_squared_error(self.baseline_prediction,self.targets_n)
self.baseline_update_op =tf.train.AdamOptimizer(self.learning_rate).minimize(self.baseline_loss)
def run_baseline_prediction(self,obs):
if len(obs.shape)>1:
observation =obs
else:
observation =obs[None]
return self.sess.run(self.baseline_prediction,feed_dict={self.observations_pl:observation})
def update(self,observations,acs_na,adv_n=None,acs_labels_na=None,qvals=None):
assert self.training, 'Policy must be created with training=True in order to perform training updates...'
_,loss= self.sess.run([self.train_op,self.loss],feed_dict ={self.observations_pl:observations,
self.actions_pl:acs_na,self.adv_n:adv_n})
if self.nn_baseline:
targets_n=(qvals -np.mean(qvals))/(np.std(qvals)+1e-8)
self.sess.run(self.baseline_update_op,feed_dict={self.targets_n:targets_n,self.observations_pl:observations})
return loss
class MLPPolicyAC(MLPPolicyPG):
'''
The actor-critic algorithm needs an MLP policy.
Note: this class can be essentially the same as MLPPolicyPG, except that it never uses the baseline network
(self.nn_baseline is always False).
'''
def __init__(self,*args,**kwargs):
if 'nn_baseline' in kwargs.keys():
assert kwargs['nn_baseline'] ==False,"MLPPolicyAC should not use the nn_baseline flag"
super().__init__(*args,**kwargs)
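For reference, below is a minimal sketch of how the policy class above might be exercised on its own; the environment, hyperparameters and the one-step batch are purely illustrative (in the assignment, the agent and RL_Trainer drive the policy instead):

import gym
import numpy as np
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

# Hypothetical standalone usage: build MLPPolicyPG for CartPole, sample an action, run one update.
env = gym.make('CartPole-v0')
sess = tf.Session()
policy = MLPPolicyPG(sess, ac_dim=env.action_space.n, ob_dim=env.observation_space.shape[0],
                     n_layers=2, size=64, discrete=True, nn_baseline=False)
sess.run(tf.global_variables_initializer())

ob = env.reset()
ac = policy.get_action(ob)[0]              # sampled action for a single observation
obs = np.array([ob], dtype=np.float32)     # a fake one-step "batch"
acs = np.array([ac], dtype=np.int32)
adv = np.array([1.0], dtype=np.float32)    # made-up advantage estimate
loss = policy.update(obs, acs, adv_n=adv)  # one policy-gradient step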
The entry program, run_hw3_actor_critic_stepbystep.py:
import os
import gym
import pdb
import time
import numpy as np
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
#import tensorflow as tf
import sys
'''
python run_hw3_actor_critic_stepbystep.py --env_name CartPole-v0 -n 100 -b 1000 --exp_name 1_1 -ntu 1 -ngsptu 1
'''
curPath =os.path.abspath((os.path.dirname(__file__)))
rootPath=os.path.split(curPath)[0]
needPath =os.path.split(rootPath)[0]
sys.path.append(needPath)
print(needPath)
from cs285.infrastructure.rl_trainer import RL_Trainer
from cs285.agents.ac_agent import ACAgent
class AC_Trainer(object):
def __init__(self,params):
computation_graph_args ={
'n_layers':params['n_layers'],
'size':params['size'],
'learning_rate':params['learning_rate'],
'num_target_updates':params['num_target_updates'],
'num_grad_steps_per_target_update':params['num_grad_steps_per_target_update']
}
estimate_advantage_args={
'gamma':params['discount'],
'standardize_advantages':not(params['dont_standardize_advantages']),
}
train_args ={
'num_agent_train_steps_per_iter': params['num_agent_train_steps_per_iter'],
'num_critic_updates_per_agent_update': params['num_critic_updates_per_agent_update'],
'num_actor_updates_per_agent_update': params['num_actor_updates_per_agent_update'],
}
agent_params ={**computation_graph_args,**estimate_advantage_args,**train_args}
self.params = params
self.params['agent_class'] = ACAgent
self.params['agent_params'] = agent_params
self.params['batch_size_initial'] = self.params['batch_size']
self.rl_trainer =RL_Trainer(self.params)
def run_training_loop(self):
self.rl_trainer.run_training_loop(
self.params['n_iter'],
collect_policy=self.rl_trainer.agent.actor,
eval_policy=self.rl_trainer.agent.actor,
)
def main():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--env_name', type=str)
parser.add_argument('--ep_len', type=int, default=200)
parser.add_argument('--exp_name', type=str, default='todo')
parser.add_argument('--n_iter', '-n', type=int, default=200)
parser.add_argument('--num_agent_train_steps_per_iter', type=int, default=1)
parser.add_argument('--num_critic_updates_per_agent_update', type=int, default=1)
parser.add_argument('--num_actor_updates_per_agent_update', type=int, default=1)
parser.add_argument('--batch_size', '-b', type=int, default=1000) #steps collected per train iteration
parser.add_argument('--eval_batch_size', '-eb', type=int, default=400) #steps collected per eval iteration
parser.add_argument('--train_batch_size', '-tb', type=int, default=1000) ##steps used per gradient step
parser.add_argument('--discount', type=float, default=1.0)
parser.add_argument('--learning_rate', '-lr', type=float, default=5e-3)
parser.add_argument('--dont_standardize_advantages', '-dsa', action='store_true')
parser.add_argument('--num_target_updates', '-ntu', type=int, default=10)
parser.add_argument('--num_grad_steps_per_target_update', '-ngsptu', type=int, default=10)
parser.add_argument('--n_layers', '-l', type=int, default=2)
parser.add_argument('--size', '-s', type=int, default=64)
parser.add_argument('--seed', type=int, default=1)
parser.add_argument('--use_gpu', '-gpu', action='store_true')
parser.add_argument('--which_gpu', '-gpu_id', default=0)
parser.add_argument('--video_log_freq', type=int, default=-1)
parser.add_argument('--scalar_log_freq', type=int, default=10)
parser.add_argument('--save_params', action='store_true')
args = parser.parse_args()
# convert to dictionary
params = vars(args)
# for policy gradient, we made a design decision
# to force batch_size = train_batch_size;
# the train_batch_size argument parsed above is therefore overridden here
params['train_batch_size'] = params['batch_size']
logdir_prefix = 'ac_'
data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../data')
if not (os.path.exists(data_path)):
os.makedirs(data_path)
logdir = logdir_prefix + args.exp_name + '_' + args.env_name + '_' + time.strftime("%d-%m-%Y_%H-%M-%S")
logdir = os.path.join(data_path, logdir)
params['logdir'] = logdir
if not(os.path.exists(logdir)):
os.makedirs(logdir)
print("\n\n\nLOGGING TO: ", logdir, "\n\n\n")
trainer = AC_Trainer(params)
trainer.run_training_loop()
if __name__ =="__main__":
main()
In the directory structure of the CS294(285) Actor Critic assignment, the DQN part builds its deep neural networks with tensorflow.contrib.layers. Since TensorFlow 2.0, the tensorflow.contrib package is no longer updated, so the assignment code needs minor modifications: here the fully connected network is implemented by hand. Interested readers can also implement custom convolutional networks.
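For example, one possible TF2-friendly replacement for contrib's fully_connected is tf.keras.layers.Dense (a sketch under that assumption, not the assignment's required solution), which also works in the v1 compatibility graph used here:

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

def dense_stack(obs, num_actions):
    # Sketch: the same 64-64-num_actions architecture as lander_model below,
    # built with tf.keras.layers.Dense instead of tensorflow.contrib.layers.fully_connected.
    # (Note: variable sharing via variable_scope(reuse=...) does not apply to Keras layers;
    # reuse the layer objects themselves if sharing is needed.)
    out = tf.keras.layers.Dense(64, activation=tf.nn.relu)(obs)
    out = tf.keras.layers.Dense(64, activation=tf.nn.relu)(out)
    return tf.keras.layers.Dense(num_actions)(out)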
dqn_utils.py code:
import gym
import numpy as np
import random
from collections import namedtuple
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
import tensorflow.contrib.layers as layers #only needed by atari_model; tensorflow.contrib is unavailable in TF 2.x
from cs285.infrastructure.atari_wrappers import wrap_deepmind
OptimizerSpec =namedtuple("OptimizerSpec",["constructor","kwargs","lr_schedule"])
def get_wrapper_by_name(env, classname):
currentenv = env
while True:
if classname in currentenv.__class__.__name__:
return currentenv
elif isinstance(currentenv, gym.Wrapper):
currentenv = currentenv.env
else:
raise ValueError("Couldn't find wrapper named %s"%classname)
class MemoryOptimizedReplayBuffer(object):
def __init__(self,size,frame_history_len,lander=False):
self.lander =lander
self.size =size
self.frame_history_len =frame_history_len
self.next_idx =0
self.num_in_buffer =0
self.obs =None
self.action =None
self.reward =None
self.done =None
def can_sample(self,batch_size):
return batch_size +1 <= self.num_in_buffer
def _encode_sample(self,idxes):
obs_batch =np.concatenate([self._encode_observation(idx)[None] for idx in idxes],0)
act_batch=self.action[idxes]
rew_batch =self.reward[idxes]
next_obs_batch =np.concatenate([self._encode_observation(idx+1)[None] for idx in idxes],0)
done_mask =np.array([1.0 if self.done[idx] else 0.0 for idx in idxes],dtype=np.float32)
return obs_batch,act_batch,rew_batch,next_obs_batch,done_mask
def sample(self,batch_size):
assert self.can_sample(batch_size)
idxes =sample_n_unique(lambda:random.randint(0,self.num_in_buffer -2),batch_size )
return self._encode_sample(idxes)
def encode_recent_observation(self):
assert self.num_in_buffer >0
return self._encode_observation((self.next_idx -1)% self.size)
def _encode_observation(self,idx):
end_idx =idx+1
start_idx =end_idx-self.frame_history_len
if len(self.obs.shape) ==2:
return self.obs[end_idx-1]
if start_idx <0 and self.num_in_buffer!=self.size:
start_idx=0
for idx in range(start_idx,end_idx-1):
if self.done[idx % self.size]:
start_idx =idx +1
missing_context =self.frame_history_len -(end_idx-start_idx)
if start_idx<0 or missing_context>0:
frames =[np.zeros_like(self.obs[0]) for _ in range(missing_context)]
for idx in range(start_idx,end_idx):
frames.append(self.obs[idx % self.size])
return np.concatenate(frames,2)
else:
img_h,img_w =self.obs.shape[1],self.obs.shape[2]
return self.obs[start_idx:end_idx].transpose(1,2,0,3).reshape(img_h, img_w,-1)
def store_frame(self,frame):
if self.obs is None:
self.obs =np.empty([self.size]+list(frame.shape),dtype=np.float32 if self.lander else np.uint8)
self.action=np.empty([self.size],dtype=np.int32)
self.reward=np.empty([self.size],dtype=np.float32)
self.done=np.empty([self.size],dtype=np.bool)
self.obs[self.next_idx] =frame
ret=self.next_idx
self.next_idx=(self.next_idx+1)%self.size
self.num_in_buffer =min(self.size,self.num_in_buffer+1)
return ret
def store_effect(self,idx,action,reward,done):
self.action[idx]=action
self.reward[idx]=reward
self.done[idx]=done
def sample_n_unique(sampling_f,n):
res =[]
while len(res)<n:
candidate =sampling_f()
if candidate not in res:
res.append(candidate)
return res
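A minimal sketch of how the replay buffer is typically driven; the dummy observations, shapes and action count below are illustrative only (in the assignment, the DQN trainer performs these steps against a real environment):

import numpy as np

# Hypothetical usage of MemoryOptimizedReplayBuffer with 1-D observations (lander=True).
buffer = MemoryOptimizedReplayBuffer(size=50000, frame_history_len=1, lander=True)

obs = np.zeros(8, dtype=np.float32)              # fake LunarLander-style observation
for t in range(100):
    idx = buffer.store_frame(obs)                # store the current frame, get its buffer index
    action = np.random.randint(4)                # pretend the policy picked an action
    next_obs = np.random.randn(8).astype(np.float32)
    buffer.store_effect(idx, action, reward=0.0, done=False)
    obs = next_obs

if buffer.can_sample(32):
    obs_b, act_b, rew_b, next_obs_b, done_b = buffer.sample(32)
    print(obs_b.shape, act_b.shape)              # (32, 8) (32,) for 1-D observations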
#linear interpolation between l and r
def linear_interpolation(l,r,alpha):
return l+alpha*(r-l)
class PiecewiseSchedule(object):
def __init__(self,endpoint,interpolation=linear_interpolation,outside_value=None):
idxes=[e[0] for e in endpoint]
assert idxes == sorted(idxes)
self._interpolation =interpolation
self._outside_value =outside_value
self._endpoints =endpoint
#return the interpolated value of the schedule at time t
def value(self,t):
for(l_t,l),(r_t,r) in zip(self._endpoints[:-1],self._endpoints[1:]):
if l_t<=t and t<r_t:
alpha =float(t-l_t)/(r_t -l_t)
return self._interpolation(l,r,alpha)
assert self._outside_value is not None
return self._outside_value
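To illustrate how PiecewiseSchedule interpolates between its endpoints (the endpoint values below are chosen only for this example):

# The schedule is 1.0 at t=0, decays linearly to 0.1 at t=1000,
# and returns outside_value once t is beyond the last endpoint.
schedule = PiecewiseSchedule([(0, 1.0), (1000, 0.1)], outside_value=0.05)
print(schedule.value(0))     # 1.0
print(schedule.value(500))   # 0.55  (halfway between 1.0 and 0.1)
print(schedule.value(2000))  # 0.05  (outside the endpoints)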
'''
Gradient clipping
'''
def minimize_and_clip(optimizer,objective,var_list,clip_val=10):
gradients =optimizer.compute_gradients(objective,var_list=var_list)
for i,(grad,var) in enumerate(gradients):
if grad is not None:
gradients[i]= (tf.clip_by_norm(grad,clip_val),var)
return optimizer.apply_gradients(gradients)
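A small sketch of how minimize_and_clip might be used; the toy loss and variable below are made up for illustration:

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

# Clip each per-variable gradient norm to 10 before applying Adam updates.
x = tf.Variable(tf.zeros([4]))
target = tf.constant([1.0, 2.0, 3.0, 4.0])
loss = tf.reduce_sum(tf.square(x - target))
optimizer = tf.train.AdamOptimizer(learning_rate=1e-2)
train_op = minimize_and_clip(optimizer, loss, var_list=[x], clip_val=10)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for _ in range(10):
        sess.run(train_op)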
'''
Parameterized loss functions:
Huber loss mitigates the effect of outlier data points (squared near zero, linear far from zero);
Focal loss, in contrast, addresses class imbalance in classification problems.
'''
def huber_loss(x,delta=1.0):
return tf.where(
tf.abs(x)<delta,
tf.square(x)*0.5,
delta*(tf.abs(x)-0.5*delta)
)
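Concretely, huber_loss computes 0.5*x^2 when |x| < delta and delta*(|x| - 0.5*delta) otherwise; a quick numeric check (values chosen only for illustration):

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

with tf.Session() as sess:
    vals = sess.run(huber_loss(tf.constant([0.5, 2.0]), delta=1.0))
    print(vals)  # approximately [0.125, 1.5]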
# fully connected layer
def fcn_layer(
inputs, # input tensor
input_dim, # number of input units
output_dim, # number of output units
activation=None): # activation function
W = tf.Variable(tf.truncated_normal([input_dim, output_dim], stddev=0.1))
# initialize W from a truncated normal distribution
b = tf.Variable(tf.zeros([output_dim]))
XWb = tf.matmul(inputs, W) + b # y = xW + b
if activation is None: # no activation by default
outputs = XWb
else:
outputs = activation(XWb) # apply the chosen activation function
return outputs
def lander_model(obs,num_actions,scope,reuse=False):
with tf.variable_scope(scope,reuse=reuse):
out =obs
with tf.variable_scope("action_value"):
out = fcn_layer(out, out.shape[1], 64, tf.nn.relu)
out = fcn_layer(out, out.shape[1], 64, tf.nn.relu)
out = fcn_layer(out, out.shape[1], num_actions)
'''
out = layers.fully_connected(out,num_outputs =64,activation_fn=tf.nn.relu)
out = layers.fully_connected(out,num_outputs =64,activation_fn=tf.nn.relu)
out = layers.fully_connected(out,num_outputs=num_actions,activation_fn=None)
'''
return out
def get_env_kwargs(env_name):
print("\n #################",env_name)
if env_name == 'PongNoFrameskip-v4':
kwargs = {
'learning_starts': 50000,
'target_update_freq': 10000,
'replay_buffer_size': int(1e6),
'num_timesteps': int(2e8),
'q_func': atari_model,
'learning_freq': 4,
'grad_norm_clipping': 10,
'input_shape': (84, 84, 4),
'env_wrappers': wrap_deepmind,
'frame_history_len': 4,
'gamma': 0.99,
}
kwargs['optimizer_spec'] = atari_optimizer(kwargs['num_timesteps'])
kwargs['exploration_schedule'] = atari_exploration_schedule(kwargs['num_timesteps'])
elif env_name == 'LunarLander-v2':
def lunar_empty_wrapper(env):
return env
kwargs = {
'optimizer_spec': lander_optimizer(),
'q_func': lander_model,
'replay_buffer_size': 50000,
'batch_size': 32,
'gamma': 1.00,
'learning_starts': 1000,
'learning_freq': 1,
'frame_history_len': 1,
'target_update_freq': 3000,
'grad_norm_clipping': 10,
'lander': True,
'num_timesteps': 500000,
'env_wrappers': lunar_empty_wrapper
}
kwargs['exploration_schedule'] = lander_exploration_schedule(kwargs['num_timesteps'])
else:
raise NotImplementedError
return kwargs
def atari_model(img_input, num_actions, scope, reuse=False):
with tf.variable_scope(scope, reuse=reuse):
out = tf.cast(img_input, tf.float32) / 255.0
with tf.variable_scope("convnet"):
# original architecture
out = layers.convolution2d(out, num_outputs=32, kernel_size=8, stride=4, activation_fn=tf.nn.relu)
out = layers.convolution2d(out, num_outputs=64, kernel_size=4, stride=2, activation_fn=tf.nn.relu)
out = layers.convolution2d(out, num_outputs=64, kernel_size=3, stride=1, activation_fn=tf.nn.relu)
out = layers.flatten(out)
with tf.variable_scope("action_value"):
out = layers.fully_connected(out, num_outputs=512, activation_fn=tf.nn.relu)
out = layers.fully_connected(out, num_outputs=num_actions, activation_fn=None)
return out
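Following the earlier remark that the convolutional layers could likewise be hand-rolled, below is a sketch of the same architecture expressed with tf.keras layers (my substitution, not the assignment's code; note that variable_scope(reuse=...) does not govern Keras layer variables):

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

def atari_model_keras(img_input, num_actions, scope):
    # Same architecture as atari_model: three conv layers, then two dense layers.
    with tf.variable_scope(scope):
        out = tf.cast(img_input, tf.float32) / 255.0
        out = tf.keras.layers.Conv2D(32, 8, strides=4, padding='same', activation=tf.nn.relu)(out)
        out = tf.keras.layers.Conv2D(64, 4, strides=2, padding='same', activation=tf.nn.relu)(out)
        out = tf.keras.layers.Conv2D(64, 3, strides=1, padding='same', activation=tf.nn.relu)(out)
        out = tf.keras.layers.Flatten()(out)
        out = tf.keras.layers.Dense(512, activation=tf.nn.relu)(out)
        out = tf.keras.layers.Dense(num_actions)(out)
    return out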
#learning rate schedule that decays over the training iterations
def atari_optimizer(num_timesteps):
num_iterations = num_timesteps /4
lr_multiplier =1.0
lr_schedule =PiecewiseSchedule([
(0,1e-4 *lr_multiplier),
(num_iterations /10,1e-4*lr_multiplier),
(num_iterations /2,5e-5*lr_multiplier)
],
outside_value=5e-5*lr_multiplier)
return OptimizerSpec(
constructor=tf.train.AdamOptimizer,
kwargs=dict(epsilon=1e-4),
lr_schedule=lr_schedule
)
#exploration rate schedule that decays over the training iterations
def atari_exploration_schedule(num_timesteps):
return PiecewiseSchedule(
[
(0,1.0),
(1e6,0.1),
(num_timesteps /8,0.1),
],outside_value=0.01
)
def lander_exploration_schedule(num_timesteps):
return PiecewiseSchedule(
[
(0,1),
(num_timesteps*0.1,0.02),
],outside_value=0.02
)
def lander_optimizer():
return OptimizerSpec(
constructor=tf.train.AdamOptimizer,
lr_schedule=ConstantSchedule(1e-3),
kwargs={}
)
class ConstantSchedule(object):
def __init__(self, value):
"""Value remains constant over time.
Parameters
----------
value: float
Constant value of the schedule
"""
self._v = value
def value(self, t):
"""See Schedule.value"""
return self._v
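Finally, OptimizerSpec only bundles an optimizer constructor, its keyword arguments and a learning-rate schedule; a sketch of how a training loop might consume it (the assignment's DQN agent does something along these lines, but the details here are assumed):

import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

# Feed a learning-rate placeholder with lr_schedule.value(t) at each training step.
spec = lander_optimizer()
learning_rate = tf.placeholder(tf.float32, (), name='learning_rate')
optimizer = spec.constructor(learning_rate=learning_rate, **spec.kwargs)

# ... build the loss and train_op with this optimizer, then at step t:
# sess.run(train_op, feed_dict={..., learning_rate: spec.lr_schedule.value(t)})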