DQN
DQN (Deep Q-Network) is the neural-network form of Q-Learning. Compared with ordinary Q-Learning, it makes the following improvements and performance optimizations: it approximates the Q-function with a neural network instead of a table, stores transitions in an experience replay buffer, and computes TD targets with a separate target network.
How the DQN network is updated
In short, a DQN update minimizes the following loss function:
$$L(\theta) = \mathbb{E}_{(s, a, r, s')} \left[ \left( r + \gamma \max_{a'} Q_{\text{target}}(s', a'; \theta_{\text{target}}) - Q(s, a; \theta) \right)^2 \right]$$
where $(s, a, r, s')$ is a transition sampled from the replay buffer, $\gamma$ is the discount factor, $Q(s, a; \theta)$ is the online Q-network being trained, and $Q_{\text{target}}(s', a'; \theta_{\text{target}})$ is the target network, whose parameters $\theta_{\text{target}}$ are periodically copied from $\theta$.
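As a minimal sketch of this loss in PyTorch (with made-up batch tensors and two stand-in linear layers, not the real training code shown later):

```python
import torch
import torch.nn.functional as F

gamma = 0.98
q_net = torch.nn.Linear(4, 2)         # stands in for the online Q-network
target_q_net = torch.nn.Linear(4, 2)  # stands in for the target Q-network

# Toy batch: 3 transitions, 4-dimensional states, 2 actions
states = torch.randn(3, 4)
actions = torch.tensor([[0], [1], [0]])
rewards = torch.ones(3, 1)
next_states = torch.randn(3, 4)
dones = torch.zeros(3, 1)

q_sa = q_net(states).gather(1, actions)                       # Q(s, a; θ)
max_next_q = target_q_net(next_states).max(1)[0].view(-1, 1)  # max_a' Q_target(s', a'; θ_target)
td_target = rewards + gamma * max_next_q * (1 - dones)        # r + γ max_a' Q_target(...)
loss = F.mse_loss(q_sa, td_target.detach())                   # L(θ)
```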
Implementing the DQN algorithm
First, define the experience replay buffer, which stores transitions and samples them for training:
```python
import collections
import random

import numpy as np


class ReplayBuffer:
    def __init__(self, capacity):
        # Fixed-size FIFO buffer: old transitions are discarded once capacity is reached
        self.buffer = collections.deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Sample a random mini-batch and unzip it into separate sequences
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    def size(self):
        return len(self.buffer)
```
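A quick usage sketch of the buffer (the transition values here are dummies, just to show the interface):

```python
buffer = ReplayBuffer(capacity=1000)
for _ in range(5):
    # state / next_state are 4-dimensional CartPole observations; values are dummies
    buffer.add(np.zeros(4), action=0, reward=1.0, next_state=np.ones(4), done=False)

states, actions, rewards, next_states, dones = buffer.sample(batch_size=3)
print(states.shape)  # (3, 4)
```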
Since we implement DQN in the CartPole environment, the network does not need to be complicated; a network with a single hidden layer is enough:
```python
import torch
import torch.nn.functional as F


class Qnet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(Qnet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))  # hidden layer with ReLU activation
        return self.fc2(x)       # one Q-value per action
```
The input layer of this network receives a state from the environment. In CartPole the state space is a 4-dimensional vector, namely (cart position, cart velocity, pole angle, pole angular velocity), and the output layer produces the Q-value of each available action.
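To confirm these dimensions, one can inspect the Gym spaces directly (a small sketch, assuming gym and CartPole-v0 are installed):

```python
import gym

env = gym.make('CartPole-v0')
print(env.observation_space.shape)  # (4,)  -> state_dim = 4
print(env.action_space.n)           # 2     -> action_dim = 2 (push left / push right)
```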
What the network computes
At its core, the network is just performing matrix operations.
Let the input state matrix be $x$, with shape [batch_size, state_dim], where batch_size is the number of sampled transitions and state_dim is the dimension of the state space.
We feed $x$ into the first linear layer fc1, which computes $x = x W_1 + b_1$, where $W_1$ has shape [state_dim, hidden_dim]. A ReLU activation is then applied to the result, setting all negative values to zero, which gives the network its non-linearity. The activated output finally passes through the second linear layer fc2 to produce the output.
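As a small sketch of the shape bookkeeping (the concrete numbers below are only illustrative):

```python
import torch
import torch.nn.functional as F

batch_size, state_dim, hidden_dim, action_dim = 64, 4, 128, 2
x = torch.randn(batch_size, state_dim)    # [64, 4]
W1 = torch.randn(state_dim, hidden_dim)   # [4, 128]
b1 = torch.zeros(hidden_dim)
h = F.relu(x @ W1 + b1)                   # [64, 128], negatives zeroed out
W2 = torch.randn(hidden_dim, action_dim)  # [128, 2]
b2 = torch.zeros(action_dim)
q = h @ W2 + b2                           # [64, 2]: one Q-value per action
```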
Next, define the DQN algorithm:
```python
import numpy as np
import torch
import torch.nn.functional as F

import DQN_Net


class DQN:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,
                 epsilon, target_update, device):
        self.action_dim = action_dim
        self.q_net = DQN_Net.Qnet(state_dim, hidden_dim, self.action_dim).to(device)
        self.target_q_net = DQN_Net.Qnet(state_dim, hidden_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
        self.gamma = gamma                  # discount factor
        self.epsilon = epsilon              # exploration rate for epsilon-greedy
        self.target_update = target_update  # how often to sync the target network
        self.count = 0                      # number of update steps so far
        self.device = device

    def take_action(self, state):
        # Epsilon-greedy action selection
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.action_dim)
        else:
            state = torch.tensor([state], dtype=torch.float).to(self.device)
            action = self.q_net(state).argmax().item()
        return action

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

        # Q(s, a) for the actions that were actually taken
        q_values = self.q_net(states).gather(1, actions)
        # max_a' Q_target(s', a') from the target network
        max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)
        # TD target; (1 - dones) removes the bootstrap term for terminal states
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)

        dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()

        # Periodically copy the online network's weights into the target network
        if self.count % self.target_update == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.count += 1
```
The take_action method uses an ε-greedy policy to choose which action to take in the given state.
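For intuition, with epsilon = 0.01 roughly 1% of the chosen actions come from the random branch. A rough empirical check (a sketch that assumes an `agent` already constructed as in the training script below):

```python
import numpy as np

state = np.zeros(4, dtype=np.float32)  # dummy CartPole state
actions = [agent.take_action(state) for _ in range(10000)]
# The greedy action should dominate; roughly 1% of picks come from the random branch
print(np.bincount(actions, minlength=2) / len(actions))
```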
The update method performs one training step on the online Q-network and periodically copies its parameters into the target Q-network.
Finally, set the hyperparameters and start training:
```python
import random
import collections

import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
from tqdm import tqdm

import DQN_Net
import DQN_Algorithm
import rl_utils

np.bool8 = np.bool  # compatibility shim: gym still references np.bool8 on newer NumPy

lr = 2e-3
num_episodes = 500
hidden_dim = 128
gamma = 0.98
epsilon = 0.01
target_update = 10
buffer_size = 10000
minimal_size = 500
batch_size = 64
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
random.seed(0)
np.random.seed(0)
env.reset(seed=0)
torch.manual_seed(0)

replay_buffer = DQN_Net.ReplayBuffer(buffer_size)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQN_Algorithm.DQN(state_dim, hidden_dim, action_dim, lr, gamma, epsilon,
                          target_update, device)

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            state = env.reset()
            state = state[0]  # new gym API returns (observation, info)
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, truncated, _ = env.step(action)
                done = done or truncated
                replay_buffer.add(state, action, reward, next_state, done)
                state = next_state
                episode_return += reward
                # Start training only once the buffer holds enough transitions
                if replay_buffer.size() > minimal_size:
                    s, a, r, ns, d = replay_buffer.sample(batch_size)
                    transition_dict = {
                        'states': s,
                        'actions': a,
                        'rewards': r,
                        'next_states': ns,
                        'dones': d
                    }
                    agent.update(transition_dict)
            return_list.append(episode_return)
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return': '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DQN on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('DQN on {}'.format(env_name))
plt.show()
```
Running the code produces a plot of the total return per episode (Return) against the episode index (Episode).
After roughly 100 episodes the return rises sharply and quickly converges to the optimal value of 200. However, even after DQN's performance has improved, the curve keeps oscillating to some degree; this is mainly an effect of the argmax operation after the network overfits to some local portions of the experience data.
Dueling DQN
Dueling DQN is an improved variant of DQN. It learns the differences between actions well and is particularly effective in environments with large action spaces.
What Dueling DQN improves
We define $A(s, a) = Q(s, a) - V(s)$ as the advantage function of each action. Dueling DQN models the state-value function $V(s)$ and the advantage function $A(s, a)$ separately, as two branches of the network, and sums them to obtain the Q-value. The benefit of modelling them separately is that in some situations the agent only cares about the value of the state and not about the differences between actions; splitting the two lets the agent handle such action-independent states better.
$$Q(s, a) = V(s) + \left( A(s, a) - \frac{1}{|A|} \sum_{a'} A(s, a') \right)$$
The correction term $\left( A(s, a) - \frac{1}{|A|} \sum_{a'} A(s, a') \right)$ subtracts the mean advantage, so that the advantages of all actions average to zero; without this constraint, $V$ and $A$ would not be uniquely determined, since any constant could be shifted between them. This lets Dueling DQN learn the state-value function more efficiently: every update adjusts $V(s)$, which in turn affects the Q-values of all actions, whereas vanilla DQN only updates the Q-value of the single action that was taken and leaves the others untouched. As a result, Dueling DQN learns the state-value function more frequently and more accurately.
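A tiny numerical example of the mean-subtraction (the values are arbitrary):

```python
import torch

V = torch.tensor([[2.0]])             # V(s) for one state
A = torch.tensor([[1.0, -1.0, 3.0]])  # A(s, a) for three actions
Q = V + A - A.mean(1, keepdim=True)   # subtract the mean advantage (here 1.0)
print(Q)  # tensor([[2., 0., 4.]]) -- the Q-values now average to V(s) = 2.0
```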
Dueling DQN implementation
Changes to the network
The network is modified to output two branches, which are then combined:
```python
import torch
import torch.nn.functional as F


class VAnet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(VAnet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)   # shared hidden layer
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)  # advantage branch A(s, a)
        self.fc3 = torch.nn.Linear(hidden_dim, 1)           # state-value branch V(s)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        A = self.fc2(x)
        V = self.fc3(x)
        # Subtract the mean advantage so the decomposition is well defined
        Q = V + A - A.mean(1).view(-1, 1)
        return Q
```
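A quick shape check of the dueling head (a sketch, assuming the class above):

```python
net = VAnet(state_dim=4, hidden_dim=128, action_dim=2)
batch = torch.randn(8, 4)  # 8 dummy CartPole states
q = net(batch)
print(q.shape)  # torch.Size([8, 2]) -- one Q-value per action, as before
```

From the outside, the dueling network has the same interface as Qnet, which is why the rest of the DQN code can stay almost unchanged.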
The algorithm
```python
import numpy as np
import torch
import torch.nn.functional as F

import D_DQN_Net


class DQN:
    def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma,
                 epsilon, target_update, device, dqn_type='VanillaDQN'):
        self.action_dim = action_dim
        if dqn_type == 'DuelingDQN':
            # Dueling architecture with separate value and advantage branches
            self.q_net = D_DQN_Net.VAnet(state_dim, hidden_dim, self.action_dim).to(device)
            self.target_q_net = D_DQN_Net.VAnet(state_dim, hidden_dim, self.action_dim).to(device)
        else:
            self.q_net = D_DQN_Net.Qnet(state_dim, hidden_dim, self.action_dim).to(device)
            self.target_q_net = D_DQN_Net.Qnet(state_dim, hidden_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
        self.gamma = gamma
        self.epsilon = epsilon
        self.target_update = target_update
        self.count = 0
        self.dqn_type = dqn_type
        self.device = device

    def take_action(self, state):
        # Epsilon-greedy action selection
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.action_dim)
        else:
            state = torch.tensor([state], dtype=torch.float).to(self.device)
            action = self.q_net(state).argmax().item()
        return action

    def max_q_value(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        return self.q_net(state).max().item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)

        q_values = self.q_net(states).gather(1, actions)
        if self.dqn_type == 'DoubleDQN':
            # Double DQN: select the action with the online network,
            # evaluate it with the target network
            max_action = self.q_net(next_states).max(1)[1].view(-1, 1)
            max_next_q_values = self.target_q_net(next_states).gather(1, max_action)
        else:
            max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)

        dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))
        self.optimizer.zero_grad()
        dqn_loss.backward()
        self.optimizer.step()

        if self.count % self.target_update == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.count += 1
```
Hyperparameters and training:
```python
# Imports are the same as in the DQN script above, with the D_DQN_* modules swapped in
import random

import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
from tqdm import tqdm

import D_DQN_Net
import D_DQN_Algorithm
import rl_utils

np.bool8 = np.bool  # same NumPy/gym compatibility shim as before

lr = 2e-3
num_episodes = 1000
hidden_dim = 128
gamma = 0.98
epsilon = 0.01
target_update = 10
buffer_size = 10000
minimal_size = 500
batch_size = 64
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
random.seed(0)
np.random.seed(0)
env.reset(seed=0)
torch.manual_seed(0)

replay_buffer = D_DQN_Net.ReplayBuffer(buffer_size)
agent = D_DQN_Algorithm.DQN(state_dim, hidden_dim, action_dim, lr, gamma, epsilon,
                            target_update, device, 'DuelingDQN')

return_list = []
for i in range(10):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            episode_return = 0
            state = env.reset()
            state = state[0]
            done = False
            while not done:
                action = agent.take_action(state)
                next_state, reward, done, truncated, _ = env.step(action)
                done = done or truncated
                replay_buffer.add(state, action, reward, next_state, done)
                state = next_state
                episode_return += reward
                if replay_buffer.size() > minimal_size:
                    s, a, r, ns, d = replay_buffer.sample(batch_size)
                    transition_dict = {
                        'states': s,
                        'actions': a,
                        'rewards': r,
                        'next_states': ns,
                        'dones': d
                    }
                    agent.update(transition_dict)
            return_list.append(episode_return)
            if (i_episode + 1) % 10 == 0:
                pbar.set_postfix({
                    'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return': '%.3f' % np.mean(return_list[-10:])
                })
            pbar.update(1)

episodes_list = list(range(len(return_list)))
mv_return = rl_utils.moving_average(return_list, 5)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('Dueling DQN on {}'.format(env_name))
plt.show()
```
Running the code and completing training gives the following result.
A question arises here: why does this converge much more slowly than DQN? Doesn't that suggest Dueling DQN performs poorly?
On closer inspection, the CartPole action space has only two actions and the problem is very simple, so Dueling DQN cannot show its advantage here; its somewhat more complex network also makes each step more expensive, which lowers its efficiency. In a more complex environment, its convergence would be noticeably faster than DQN's.
The complete source code is available here.