Basic Concepts of the Policy Gradient Algorithm
The policy gradient algorithm is a method that optimizes the policy directly. Unlike value-based methods (such as DQN), it parameterizes the policy itself and learns the optimal policy by optimizing an objective function, usually the expected return.
The basic idea is to adjust the policy parameters so that the agent maximizes the cumulative return it obtains while interacting with the environment.
Optimization Objective
The policy gradient algorithm optimizes the following objective function:
$$
J(\theta) = \mathbb{E}_{\pi_\theta}\left[\sum_{t=0}^{\infty} \gamma^t R_{t+1}\right]
$$
where:
- $\pi_\theta$ denotes the current parameterized policy
- $\gamma$ is the discount factor
- $R_{t+1}$ is the reward the agent receives for the action taken at time $t$

It can also be shown that $J(\theta) = v_\pi$.
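To make this identity concrete, one common convention (an assumption here, since the post does not spell it out) is that states are weighted by the distribution $\eta(s)$ that also appears in the gradient expression below:

$$
J(\theta) = \sum_{s \in \mathcal{S}} \eta(s)\, v_\pi(s), \qquad
v_\pi(s) = \mathbb{E}\left[\sum_{t=0}^{\infty} \gamma^t R_{t+1} \,\Big|\, S_0 = s\right].
$$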
The REINFORCE Algorithm
First, take the gradient of the objective function:
$$
\nabla_\theta J(\theta) = \nabla_\theta\, \mathbb{E}\left[\sum_{t=0}^{\infty} \gamma^t R_{t+1}\right]
$$
This can be rewritten in the following form:
$$
\nabla_\theta J(\theta)
= \sum_{s \in \mathcal{S}} \eta(s) \sum_{a \in \mathcal{A}} \nabla_\theta \pi(a \mid s, \theta)\, q_\pi(s, a)
= \mathbb{E}\left[\nabla_\theta \ln \pi(A \mid S, \theta)\, q_\pi(S, A)\right]
$$
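The second equality is the log-derivative trick: since $\nabla_\theta \pi(a \mid s, \theta) = \pi(a \mid s, \theta)\, \nabla_\theta \ln \pi(a \mid s, \theta)$, the double sum becomes an expectation over $S \sim \eta$ and $A \sim \pi(\cdot \mid S, \theta)$:

$$
\sum_{s \in \mathcal{S}} \eta(s) \sum_{a \in \mathcal{A}} \pi(a \mid s, \theta)\, \nabla_\theta \ln \pi(a \mid s, \theta)\, q_\pi(s, a)
= \mathbb{E}\left[\nabla_\theta \ln \pi(A \mid S, \theta)\, q_\pi(S, A)\right].
$$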
We need the value of this expectation, but in practice it cannot be computed exactly. Following the basic idea of stochastic gradient descent, we can approximate the expectation by sampling, which gives
$$
\nabla_\theta J(\theta) \approx \nabla_\theta \ln \pi(a \mid s, \theta)\, q_\pi(s, a)
$$
Replacing $q_\pi$ with an estimate $q_t$ yields the update rule
$$
\theta_{t+1} = \theta_t + \alpha\, \nabla_\theta \ln \pi(a_t \mid s_t, \theta_t)\, q_t(s_t, a_t)
$$
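In an automatic-differentiation framework such as PyTorch, this ascent step is usually implemented by descending on the negated objective. A minimal single-step sketch (the probability and return values below are made-up placeholders, not taken from the text):

```python
import torch

# Placeholder values for illustration: pi(a_t | s_t, theta) = 0.25 and q_t = 3.0
prob = torch.tensor(0.25, requires_grad=True)
q_t = 3.0

loss = -torch.log(prob) * q_t  # descending on -ln(pi) * q is ascending on ln(pi) * q
loss.backward()                # populates prob.grad with d(loss)/d(prob)
print(prob.grad)               # negative, so a descent step on the loss increases prob
```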
Estimating $q_t$ with the Monte Carlo method,
$$
q_t(s_t, a_t) = \sum_{k=t+1}^{T} \gamma^{k-t-1} r_k
$$
The resulting algorithm is called REINFORCE.
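As a quick numerical illustration of the Monte Carlo estimate above, the returns $q_t(s_t, a_t)$ for a whole episode can be computed in one backward pass over the rewards; a minimal sketch with a made-up reward sequence:

```python
def monte_carlo_returns(rewards, gamma):
    """Compute q_t = sum_{k=t+1}^{T} gamma^(k-t-1) * r_k for every t, iterating backwards."""
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = gamma * G + r
        returns.append(G)
    returns.reverse()
    return returns

# Example: a three-step episode with rewards r_1 = r_2 = r_3 = 1
print(monte_carlo_returns([1.0, 1.0, 1.0], gamma=0.9))  # [2.71, 1.9, 1.0]
```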
Pseudocode
Pseudocode for the REINFORCE algorithm:
Pseudocode: Policy Gradient by Monte Carlo (REINFORCE)
Initialization: a parameterized policy $\pi(a \mid s, \theta)$, a discount factor $\gamma \in (0, 1)$, and a learning rate $\alpha > 0$.
Aim: search for an optimal policy maximizing $J(\theta)$.
For the $k$-th iteration, do
Select $s_0$ and generate an episode following $\pi(\theta_k)$. Suppose the episode is $\{s_0, a_0, r_1, \dots, s_{T-1}, a_{T-1}, r_T\}$.
For $t = 0, 1, \dots, T-1$, do
Value update: $q_t(s_t, a_t) = \sum_{k=t+1}^{T} \gamma^{k-t-1} r_k$
Policy update: $\theta_{t+1} = \theta_t + \alpha\, \nabla_\theta \ln \pi(a_t \mid s_t, \theta_t)\, q_t(s_t, a_t)$
$\theta_k = \theta_T$
Code Implementation
Defining the PolicyNet
```python
import torch
import torch.nn.functional as F


class PolicyNet(torch.nn.Module):
    """A two-layer MLP mapping a state to a probability distribution over actions."""

    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        # Softmax over the action dimension so the output is a valid distribution
        return F.softmax(self.fc2(x), dim=1)
```
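A quick sanity check that the network outputs a valid action distribution (the dimensions below are illustrative; CartPole has a 4-dimensional state and 2 actions):

```python
import torch

net = PolicyNet(state_dim=4, hidden_dim=128, action_dim=2)
dummy_state = torch.randn(1, 4)      # a batch containing one state
probs = net(dummy_state)
print(probs, probs.sum(dim=1))       # action probabilities, summing to 1
```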
Defining the REINFORCE Algorithm
```python
import torch

import R_Net  # local module containing the PolicyNet defined above


class REINFORCE:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, device):
        self.policy_net = R_Net.PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(), lr=learning_rate)
        self.gamma = gamma  # discount factor
        self.device = device

    def take_action(self, state):
        # Sample an action from the categorical distribution given by the policy network
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.policy_net(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        reward_list = transition_dict['rewards']
        state_list = transition_dict['states']
        action_list = transition_dict['actions']

        G = 0
        self.optimizer.zero_grad()
        for i in reversed(range(len(reward_list))):  # iterate from the last time step
            reward = reward_list[i]
            state = torch.tensor([state_list[i]], dtype=torch.float).to(self.device)
            action = torch.tensor([action_list[i]]).view(-1, 1).to(self.device)
            log_prob = torch.log(self.policy_net(state).gather(1, action))
            G = self.gamma * G + reward  # Monte Carlo return q_t(s_t, a_t)
            loss = -log_prob * G  # negative sign: descending on -ln(pi) * q ascends J
            loss.backward()  # accumulate gradients over the whole episode
        self.optimizer.step()  # one gradient step per episode
```
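A small usage sketch of the class above (assuming the R_Net module it references is on the path; the state passed to take_action is a made-up CartPole-like observation):

```python
import torch

device = torch.device("cpu")
agent = REINFORCE(state_dim=4, hidden_dim=128, action_dim=2,
                  learning_rate=1e-3, gamma=0.98, device=device)

# Sample an action for a dummy 4-dimensional state
action = agent.take_action([0.0, 0.0, 0.0, 0.0])
print(action)  # 0 or 1 for CartPole
```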
Training
```python
import random

import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
from tqdm import tqdm

import R_Algorithm  # local module containing the REINFORCE class defined above
import rl_utils     # utility module providing moving_average

learning_rate = 1e-3
num_episodes = 1000
hidden_dim = 128
gamma = 0.98
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
random.seed(0)
np.random.seed(0)
env.reset(seed=0)
torch.manual_seed(0)

state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = R_Algorithm.REINFORCE(state_dim, hidden_dim, action_dim, learning_rate, gamma, device)

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            transition_dict = {
                'states': [],
                'actions': [],
                'next_states': [],
                'rewards': [],
                'dones': []
            }
            state = env.reset()
            state = state[0]  # the new Gym API returns (observation, info)
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, truncated, _ = env.step(action)
                done = done or truncated
                transition_dict['states'].append(state)
                transition_dict['actions'].append(action)
                transition_dict['next_states'].append(next_state)
                transition_dict['rewards'].append(reward)
                transition_dict['dones'].append(done)
                state = next_state
                episode_return += reward
            return_list.append(episode_return)
            agent.update(transition_dict)  # one update per collected episode (on-policy)
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return': '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('REINFORCE on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('REINFORCE on {}'.format(env_name))
plt.show()
```
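The `rl_utils.moving_average` helper used above is not shown in this post; a minimal sketch of what such a function might look like (an assumption, not the author's actual utility code):

```python
import numpy as np

def moving_average(a, window_size):
    """Hypothetical stand-in for rl_utils.moving_average: smooth a 1-D sequence with a
    sliding-window mean, padding at the edges so the output keeps the input's length
    (the real helper may handle the edges differently)."""
    a = np.asarray(a, dtype=float)
    pad = window_size // 2
    padded = np.pad(a, (pad, window_size - 1 - pad), mode='edge')
    kernel = np.ones(window_size) / window_size
    return np.convolve(padded, kernel, mode='valid')
```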
Running the code produces a plot of the total return against the number of training episodes.
As the plot shows, REINFORCE effectively learns the optimal policy as more and more trajectories are collected. However, compared with the DQN algorithm discussed earlier, REINFORCE consumes more episodes: it is an on-policy algorithm, so previously collected trajectory data cannot be reused. In addition, its performance fluctuates noticeably, mainly because the return of each sampled trajectory has high variance; this is the main shortcoming of REINFORCE.