|
DouZero是快手团队开发的一个能够通过自我对战从而学到如何玩三人斗地主的模型。我看了一下官方的代码,不是很复杂,作为一个强化学习入门的例子还不错,毕竟能真跑起来的东西不太好找。本文就其基本思路和代码实现进行分析和讲解。
这个链接可以找到DouZero的官方代码实现和论文链接。
这里是Demo程序代码地址。
我研究了这两个项目的代码,并且把DouZero的三人对战模型改成了双人对战,并且将Demo程序也由三人对战改成了两个人对战的。
基本思路
DouZero 的基本思路是训练了两个神经网络,三个模型,其中一个神经网络对应地主,另外一个神经网络对应农民,农民有两个角色,一个是地主上家,一个是地主下家,他们使用了一个网络结构,但是分别训练了模型,所以是两个网络结构,三个模型。
神经网络的作用是,输入当前的牌局情况和某种合法出牌序列,神经网络会对这个出牌序列进行打分,如果反复输入当前的牌局状态和穷尽所有的合法出牌序列,那么就可以为玩家当前情况下的所有合法出牌序列打分,选择分值最高的出牌。
训练这个神经网络的方法是,通过 ACTOR 过程依据当下正在训练的神经网络进行模拟牌局,也就是三个机器人互相打,每个机器人使用属于自己的神经网络和参数,当轮到自己出牌的时候,机器人就会把当前的牌局信息和手里的合法出牌序列做个BATCH扔给神经网络,神经网络就会给它一系列的分值,每个分值对应一个合法的出牌可能,那么挑一个分最大的,出牌。即使是一团乱战,几番下来,一定有个幸运的机器人赢了,对方输了,那么关键的操作来了,要把获胜一方的所有历史出牌序列进行奖励,把失败一方的所有历史出牌序列进行惩罚,所谓奖励和惩罚就是打标签了,相当于对赢的一方说干得好,下次还这样干,每出的一把牌都经典,赶紧记录下来;对输牌的一方说,不行啊,下回别这么出了,没有一把出的对,统统都是错。
这时候另外一个角色上场了 LEANER,学习者。ACTOR 打牌,并且做了标记,总结了很多牌局的数据,但是ACTOR 还是继续打牌,并不继续使用它自己总结的数据。这些数据会交给 LEARNER,LEARNER 拿到这些牌局信息,出牌的序列,还有奖励惩罚的信息,可以用来做监督训练。具体做法是,将每一刻(时间步)的牌局信息和出牌的序列提供给神经网络,训练神经网络,让其输出的值尽可能靠近当前步的奖励/惩罚的值。对于赢牌方的序列和当下牌局信息,神经网络将其作为输入,将其奖励值作为输出,训练网络让网络匹配这组输入和输出,对于输掉牌局的一方,训练神经网络让神经网络识别这些牌局和这样的出牌序列,知道是要输的,跟惩罚值匹配。
每次 LEARNER 完成一个完整牌局的训练就会为 ACTOR 的神经网络更新参数,让 ACTOR 的神经网络别那么随机,俗话说,长点心。虽然 ACTOR 必然有输就有赢,但高手之间对决和新手乱打区别还是很大的。
无论 ACTOR 还是 LERNER 都是三个角色,三个模型,但是两个农民使用相同的神经网络结构。
代码分析
DouZero 为了加速,到处使用多进程和数据共享,这些为了并行而写的代码对于理解模型的逻辑是个障碍,但是为了训练加速又不得不用,强化学习比较难于训练,论文上说他们用了一个48 CORES CPU 加上4个1080Ti,最好的效果训练 30 天,也有 2天就可以训练出一个基本可玩的模型了。总之如果把这些训练加速的程序去掉,就会简单很多。
阅读这些模型的实现还是基本的套路,先看模型训练好了它是如何做推理,然后再看网络结构和训练的代码。如果学有余力可能再看看斗地主的外围应用是如何使用的。
DouZero 有一个单独的评估应用 evaluate,这个评估也不是说结果有多好,只是能跑,并且统计各个玩家的角色胜负。只有在真正和其它模型对战或者和人对战才能做出所谓的评估。
评估之前,它先通过一个脚本生成一大堆测试牌局:
mport argparse
import pickle
import numpy as np
deck = []
for i in range(3, 15):
deck.extend([i for _ in range(4)])
deck.extend([17 for _ in range(4)])
deck.extend([20, 30])
def get_parser():
parser = argparse.ArgumentParser(description='DouZero: random data generator')
parser.add_argument('--output', default='eval_data', type=str)
parser.add_argument('--num_games', default=10000, type=int)
return parser
def generate():
_deck = deck.copy()
np.random.shuffle(_deck)
card_play_data = {'landlord': _deck[:20],
'landlord_up': _deck[20:37],
'landlord_down': _deck[37:54],
'three_landlord_cards': _deck[17:20],
}
for key in card_play_data:
card_play_data[key].sort()
return card_play_data
if __name__ == '__main__':
flags = get_parser().parse_args()
output_pickle = flags.output + '.pkl'
print("output_pickle:", output_pickle)
print("generating data...")
data = []
for _ in range(flags.num_games):
data.append(generate())
print("saving pickle file...")
with open(output_pickle,'wb') as g:
pickle.dump(data,g,pickle.HIGHEST_PROTOCOL)
其中deck 对牌面进行了映射:
deck = []
for i in range(3, 15):
deck.extend([i for _ in range(4)])
deck.extend([17 for _ in range(4)])
deck.extend([20, 30])
从3-15 是普通牌,17 是 “2”,20 是“小王”,30 是“大王”。数值大小表示了牌本身的大小。
然后随机生成牌局,默认10000个牌局,并且把牌局作为数据存起来。相当于发牌。
在 simulation 中关键的一段就是下面的代码,就是主要的玩牌的过程,稍微做了一点抽象。根据玩家初始化整个GameEnv,然后根据刚才造的10000个牌局开始玩牌,一局一局,每一次牌局结束,GameEnv的game over 变量都会设置为 True,就是出现了某个获胜方,然后环境会 reset 再来一局。
def mp_simulate(card_play_data_list, card_play_model_path_dict, q):
players = load_card_play_models(card_play_model_path_dict)
env = GameEnv(players)
for idx, card_play_data in enumerate(card_play_data_list):
env.card_play_init(card_play_data)
while not env.game_over:
env.step()
env.reset()
q.put((env.num_wins['landlord'],
env.num_wins['farmer'],
env.num_scores['landlord'],
env.num_scores['farmer']
))
主要看一下这个 step 函数,看哪里为模型提供输入和拿到模型的输出。
def step(self):
action = self.players[self.acting_player_position].act(
self.game_infoset)
assert action in self.game_infoset.legal_actions
if len(action) > 0:
self.last_pid = self.acting_player_position
if action in bombs:
self.bomb_num += 1
self.last_move_dict[
self.acting_player_position] = action.copy()
self.card_play_action_seq.append(action)
self.update_acting_player_hand_cards(action)
self.played_cards[self.acting_player_position] += action
if self.acting_player_position == 'landlord' and \
len(action) > 0 and \
len(self.three_landlord_cards) > 0:
for card in action:
if len(self.three_landlord_cards) > 0:
if card in self.three_landlord_cards:
self.three_landlord_cards.remove(card)
else:
break
self.game_done()
if not self.game_over:
self.get_acting_player_position()
self.game_infoset = self.get_infoset()第一句话就是我们要找的,因为它是通过玩家的角色进行 ACT 选取,game infoset 就是牌局的信息,这个函数返回一个合法的出牌。
class DeepAgent:
def __init__(self, position, model_path):
self.model = _load_model(position, model_path)
def act(self, infoset):
if len(infoset.legal_actions) == 1:
return infoset.legal_actions[0]
obs = get_obs(infoset)
z_batch = torch.from_numpy(obs['z_batch']).float()
x_batch = torch.from_numpy(obs['x_batch']).float()
if torch.cuda.is_available():
z_batch, x_batch = z_batch.cuda(), x_batch.cuda()
y_pred = self.model.forward(z_batch, x_batch, return_value=True)['values']
y_pred = y_pred.detach().cpu().numpy()
best_action_index = np.argmax(y_pred, axis=0)[0]
best_action = infoset.legal_actions[best_action_index]
return best_action如果 合法的出牌序列 legal actions 中只有一种可能,那就直接返回好了(它怎么没有判断没有合法的出牌序列?)
如果有好多种合法的出牌序列,这时候就要呼唤神经网络给出一个最好的选择。就是把所有的可能做成了一个MINI-BATCH扔给神经网络,返回最大的值对应的那个出牌。
这就是整个评估的过程,可能 env step 稍微麻烦一点,因为它涉及到整理牌局信息和根据当前手里牌遍历所有的合法出牌的组合,这些都是需要写挺长的代码才能解决。也许训练的过程中我们还会看到一些具体细节。
网络结构和训练过程
在 dmc/models.py 下:
"""
This file includes the torch models. We wrap the three
models into one class for convenience.
"""
import numpy as np
import torch
from torch import nn
class LandlordLstmModel(nn.Module):
def __init__(self):
super().__init__()
self.lstm = nn.LSTM(162, 128, batch_first=True)
self.dense1 = nn.Linear(373 + 128, 512)
self.dense2 = nn.Linear(512, 512)
self.dense3 = nn.Linear(512, 512)
self.dense4 = nn.Linear(512, 512)
self.dense5 = nn.Linear(512, 512)
self.dense6 = nn.Linear(512, 1)
def forward(self, z, x, return_value=False, flags=None):
lstm_out, (h_n, _) = self.lstm(z)
lstm_out = lstm_out[:,-1,:]
x = torch.cat([lstm_out,x], dim=-1)
x = self.dense1(x)
x = torch.relu(x)
x = self.dense2(x)
x = torch.relu(x)
x = self.dense3(x)
x = torch.relu(x)
x = self.dense4(x)
x = torch.relu(x)
x = self.dense5(x)
x = torch.relu(x)
x = self.dense6(x)
if return_value:
return dict(values=x)
else:
if flags is not None and flags.exp_epsilon > 0 and np.random.rand() < flags.exp_epsilon:
action = torch.randint(x.shape[0], (1,))[0]
else:
action = torch.argmax(x,dim=0)[0]
return dict(action=action)
class FarmerLstmModel(nn.Module):
def __init__(self):
super().__init__()
self.lstm = nn.LSTM(162, 128, batch_first=True)
self.dense1 = nn.Linear(484 + 128, 512)
self.dense2 = nn.Linear(512, 512)
self.dense3 = nn.Linear(512, 512)
self.dense4 = nn.Linear(512, 512)
self.dense5 = nn.Linear(512, 512)
self.dense6 = nn.Linear(512, 1)
def forward(self, z, x, return_value=False, flags=None):
lstm_out, (h_n, _) = self.lstm(z)
lstm_out = lstm_out[:,-1,:]
x = torch.cat([lstm_out,x], dim=-1)
x = self.dense1(x)
x = torch.relu(x)
x = self.dense2(x)
x = torch.relu(x)
x = self.dense3(x)
x = torch.relu(x)
x = self.dense4(x)
x = torch.relu(x)
x = self.dense5(x)
x = torch.relu(x)
x = self.dense6(x)
if return_value:
return dict(values=x)
else:
if flags is not None and flags.exp_epsilon > 0 and np.random.rand() < flags.exp_epsilon:
action = torch.randint(x.shape[0], (1,))[0]
else:
action = torch.argmax(x,dim=0)[0]
return dict(action=action)
# Model dict is only used in evaluation but not training
model_dict = {}
model_dict[&#39;landlord&#39;] = LandlordLstmModel
model_dict[&#39;landlord_up&#39;] = FarmerLstmModel
model_dict[&#39;landlord_down&#39;] = FarmerLstmModel
class Model:
&#34;&#34;&#34;
The wrapper for the three models. We also wrap several
interfaces such as share_memory, eval, etc.
&#34;&#34;&#34;
def __init__(self, device=0):
self.models = {}
if not device == &#34;cpu&#34;:
device = &#39;cuda:&#39; + str(device)
self.models[&#39;landlord&#39;] = LandlordLstmModel().to(torch.device(device))
self.models[&#39;landlord_up&#39;] = FarmerLstmModel().to(torch.device(device))
self.models[&#39;landlord_down&#39;] = FarmerLstmModel().to(torch.device(device))
def forward(self, position, z, x, training=False, flags=None):
model = self.models[position]
return model.forward(z, x, training, flags)
def share_memory(self):
self.models[&#39;landlord&#39;].share_memory()
self.models[&#39;landlord_up&#39;].share_memory()
self.models[&#39;landlord_down&#39;].share_memory()
def eval(self):
self.models[&#39;landlord&#39;].eval()
self.models[&#39;landlord_up&#39;].eval()
self.models[&#39;landlord_down&#39;].eval()
def parameters(self, position):
return self.models[position].parameters()
def get_model(self, position):
return self.models[position]
def get_models(self):
return self.models前面也讲过了,两个网络结构,三个模型实例,因为两个农民分别是地主上家和地主下家,他们在同样的局面下,出牌的策略是不同的,所以采用了单独的模型,地主和农民的模型结构其实是一样的,只是维度不同,因为农民有队友,而地主没有队友,所以农名要考虑队友曾经的出牌序列,而地主不用管那么多,就是干他们。
下面的这段代码在论文中有着重指出,就是在 ACTOR 出牌的时候也不是专挑神经网络的输出值最大的,还随机选,因为神经网络可能是个自负的大傻子,所以无论多好的神经网络也不全信。
if flags is not None and flags.exp_epsilon > 0 and np.random.rand() < flags.exp_epsilon:
action = torch.randint(x.shape[0], (1,))[0]
else:
action = torch.argmax(x,dim=0)[0]
return dict(action=action)
整体网络的思路是把当前桌面上的历史出牌序列先搞到一个 LSTM 里面,压出一个向量来,相当于对已经出现的出牌顺序做一个特别的关注,如果不考虑顺序很多出牌会失去意义。然后把 LSTM 的信息再和一些即时的信息拼接在一起,这些信息都在这个类里面,然后再接入到6个全连接网络里面。
class InfoSet(object):
&#34;&#34;&#34;
The game state is described as infoset, which
includes all the information in the current situation,
such as the hand cards of the three players, the
historical moves, etc.
&#34;&#34;&#34;
def __init__(self, player_position):
# The player position, i.e., landlord, landlord_down, or landlord_up
self.player_position = player_position
# The hand cands of the current player. A list.
self.player_hand_cards = None
# The number of cards left for each player. It is a dict with str-->int
self.num_cards_left_dict = None
# The three landload cards. A list.
self.three_landlord_cards = None
# The historical moves. It is a list of list
self.card_play_action_seq = None
# The union of the hand cards of the other two players for the current player
self.other_hand_cards = None
# The legal actions for the current move. It is a list of list
self.legal_actions = None
# The most recent valid move
self.last_move = None
# The most recent two moves
self.last_two_moves = None
# The last moves for all the postions
self.last_move_dict = None
# The played cands so far. It is a list.
self.played_cards = None
# The hand cards of all the players. It is a dict.
self.all_handcards = None
# Last player position that plays a valid move, i.e., not `pass`
self.last_pid = None
# The number of bombs played so far
self.bomb_num = None这些信息是要统统扔给神经网络的,但是有一个是特别重要的,就是legal actions,这个actions 表示当下牌局下所有合法的的出牌,如果是 ACTOR,要根据 actions 的数量复制多份牌局信息,然后做成一个 batch,这个 batch 中的数据除了 action 之外其它都是重复的,神经网络会为每个 action + 信息 给出一个 value,那么 ACTOR 就会根据这个评分去选 action。
但是对于 LEARNER 只需要一份牌局信息和一个 ACTOR 最终选择的那个 ACTION 就可以,在代码中能看到:
x_batch = np.hstack((my_handcards_batch,
other_handcards_batch,
last_action_batch,
landlord_up_played_cards_batch,
landlord_down_played_cards_batch,
landlord_up_num_cards_left_batch,
landlord_down_num_cards_left_batch,
bomb_num_batch,
my_action_batch))
x_no_action = np.hstack((my_handcards,
other_handcards,
last_action,
landlord_up_played_cards,
landlord_down_played_cards,
landlord_up_num_cards_left,
landlord_down_num_cards_left,
bomb_num)) x_batch 就是给 ACTOR 准备的,x no action 就是给 LEARNER 准备的。
def learn(position,
actor_models,
model,
batch,
optimizer,
flags,
lock):
&#34;&#34;&#34;Performs a learning (optimization) step.&#34;&#34;&#34;
if flags.training_device != &#34;cpu&#34;:
device = torch.device(&#39;cuda:&#39;+str(flags.training_device))
else:
device = torch.device(&#39;cpu&#39;)
obs_x_no_action = batch[&#39;obs_x_no_action&#39;].to(device)
obs_action = batch[&#39;obs_action&#39;].to(device)
obs_x = torch.cat((obs_x_no_action, obs_action), dim=2).float()
obs_x = torch.flatten(obs_x, 0, 1)
obs_z = torch.flatten(batch[&#39;obs_z&#39;].to(device), 0, 1).float()
target = torch.flatten(batch[&#39;target&#39;].to(device), 0, 1)
episode_returns = batch[&#39;episode_return&#39;][batch[&#39;done&#39;]]
mean_episode_return_buf[position].append(torch.mean(episode_returns).to(device))
with lock:
learner_outputs = model(obs_z, obs_x, return_value=True)
loss = compute_loss(learner_outputs[&#39;values&#39;], target)
stats = {
&#39;mean_episode_return_&#39;+position: torch.mean(torch.stack([_r for _r in mean_episode_return_buf[position]])).item(),
&#39;loss_&#39;+position: loss.item(),
}
optimizer.zero_grad()
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), flags.max_grad_norm)
optimizer.step()
for actor_model in actor_models.values():
actor_model.get_model(position).load_state_dict(model.state_dict())
return stats
这是 LEARNER,用到了 no action 的数据,还有不太一样的地方就是:
for actor_model in actor_models.values():
actor_model.get_model(position).load_state_dict(model.state_dict())这是在更新 ACTOR 的网络。
train的过程很多是进程处理相关的,实际上就是启动 actor 和 learner。代码就不贴了。
Github 上作者也致谢了一个Facebook 的项目,我大概看了一下,复制了图,是不是跟我上文描写的 ACTOR 和 LEANER 有点像。 ACTOR 负责产生训练数据,LEARNER 负责 TRAIN 模型,LEARNER 训练的模型参数再同步给 ACTOR 接着打。
|-----------------| |-----------------| |-----------------|
| ACTOR 1 | | ACTOR 2 | | ACTOR n |
|-------| | |-------| | |-------| |
| | .......| | | .......| . . . | | .......|
| Env |<-.Model.| | Env |<-.Model.| | Env |<-.Model.|
| |->.......| | |->.......| | |->.......|
|-----------------| |-----------------| |-----------------|
^ I ^ I ^ I
| I | I | I Actors
| I rollout | I rollout weights| I send
| I | I /--------/ I rollouts
| I weights| I | I (frames,
| I | I | I actions
| I | v | I etc)
| L=======>|--------------------------------------|<===========J
| |......... LEARNER |
\--------------|..Model.. Consumes rollouts, updates |
Learner |......... model weights |
sends |--------------------------------------|
weights
后续
后来我又将三个模型改成了一个,去支持地主和一个农民对战,双人的策略和规则和三人不一样了,但是两者基本对称了,我就改成一个模型训练,还改了一下 DEMO 变成了两人玩,一个真人一个 AI。
在训练的过程中,由于 ACTOR 速度太快,LEARNER 根本训练不过来,后来把 ACTOR 减少,才得以真正训练起来。
强化学习还有VAE或者GAN都是比较难训练的,因为它们都是均衡,不是一直强就好,训练两步网络,即使网络结构一样。
还有一个小小心得,对于博弈性质的机器学习,保留随机性是非常必要的,不然只要是重复的有明确规律性的必然被干。(跟量化交易有点干系)
参考
(1)Zha, Daochen et al. “DouZero: Mastering DouDizhu with Self-Play Deep Reinforcement Learning.” ICML (2021). |
|