banner
Nagi-ovo

Nagi-ovo

Breezing homepage: [nagi.fun](nagi.fun)
github

Let's build AlphaZero

本文是对于 Sunrise:从头理解 AlphaZero,MCTS,Self-Play,UCB 等文章、视频教程和代码实现的消化与理解。

本文将从 AlphaGo 的设计原理出发,通过深入理解 MCTS 和 Self-Play 这两个核心机制,逐步揭示如何构建一个能超越人类的 AI 五子棋(Gomoku) 系统。

AlphaGo:从模仿到超越#

AlphaGo 的进化路径#

围棋的可能局面数超过宇宙中的原子数量,使得传统穷举搜索方法完全失效。AlphaGo 通过一个分阶段的方法解决这个问题:先学习人类知识,然后通过 self-play 不断进化。

image

这个进化过程可以分为三层:

  1. 模仿人类高手
  2. 自我博弈提升
  3. 学习评估局势

核心组件#

image

快速走子策略(Rollout Policy)#

最左边的一个轻量级策略网络,用于快速评估,准确性较低但计算效率高。

SL 策略网络#

监督学习策略网络 PσP_{\sigma} 通过模仿人类棋谱学习下棋:

  • 输入:棋盘状态
  • 输出:模仿人类专家的下一步走法概率分布
  • 训练数据:16 万场对局,约 3000 万个落子步骤

RL 策略网络#

类似当你棋艺达到一定水平时,开始自己复盘,自己和自己对战,发现新的战术和更深的策略。RL 网络 PρP_{\rho} 从 SL 网络初始化,该网络已经远超人类,可以找到许多人类未曾发现的强力策略。

  • 这个阶段生成了大量 self-play 的数据,输入依然是棋盘状态,输出是通过强化学习改进的走棋策略
  • 与历史版本的策略网络 pπp_\pi 进行 self-play,网络通过 “赢棋” 信号来调整参数,通过 Policy Gradiant 强化那些导致胜利的走法。:
tz(τ)logp(atst;ρ)\sum_{t} z(\tau) \log p(a_t \mid s_t; \rho)

其中:

  • τ\tau 是对局序列 (s0,a0,s1,a1,...)(s_0,a_0,s_1,a_1,...)
  • z(τ)z(\tau) 是胜负标签(胜为正,负为负)

价值网络#

价值网络 vθv_\theta 学习评估局势,可以是个 CNN:

  • 训练目标:最小化均方误差
i(zvθ(s))2\sum_{i} (z - v_\theta(s))^2
  • zz:最终胜负结果
  • vθ(s)v_\theta(s):预测的获胜概率

补充解释#

关于 PρP_\rhovθv_\theta 的关系

  • 策略网络 PρP_\rho 提供具体的走子概率,用于指导搜索。
  • 价值网络 vθv_\theta 提供局面评估,减少搜索树中不必要的模拟。
  • 两者配合,使 MCTS 不仅能更快地探索高胜率路径,还能显著提升整体的下棋水平。

AlphaGo 的 MCTS 实现#

Selection 阶段#

结合了探索与利用:

at=argmaxaQ(st,a)+u(st,a)a_t = \arg\max_a Q(s_t, a) + u(s_t, a)
u(st,a)P(st,a)1+N(st,a)u(s_t, a) \propto \frac{P(s_t, a)}{1 + N(s_t, a)}

其中:

  • Q(st,a)Q(s_t, a):长期回报估计
  • u(st,a)u(s_t, a):探索奖励
  • P(st,a)P(s_t, a):策略网络输出的先验概率
  • N(st,a)N(s_t, a):访问次数

Simulation 与评估#

在原始的 MCTS 算法中,模拟阶段 (Simulation) 的作用是通过快速 rollout 策略从叶子节点(扩展的新节点)进行随机对弈,直至游戏结束,然后根据对弈的胜负得到一个回报 。这个回报被传递回搜索树中的节点,用于更新这些节点的值估计(即 Q(s,a)Q(s, a) )。

这样的实现简单直接,但是 rollout 策略通常是随机或简单的规则,模拟质量可能较差。且只能给出短期信息,无法很好地结合全局的战略评估。

而 AlphaGo 在模拟阶段结合了 Value Network vθv_\theta 和 rollout 策略。Value Network 提供了更高效的叶子节点估计和全局能力评估,rollout 策略通过快速模拟捕捉局部的短期效果。

使用超参数 λ\lambda 权衡 vθ(sL)v_\theta(s_L)zLz_L,兼顾局部模拟和全局评估。评估函数:

V(sL)=(1λ)vθ(sL)+λzL V(s_L) = (1 - \lambda)v_\theta(s_L) + \lambda z_L

Backpropagation#

n 次 MCTS 时节点访问次数更新(I\mathbb{I}是指示函数,访问则为 1):

N(st,a)=i=1nI(s,a,i)N(s_t, a) = \sum_{i=1}^{n} \mathbb{I}(s, a, i)

Q 值更新,即执行 a 到节点 sts_t 的长期预期回报:

Q(st,a)=1N(st,a)i=1nI(s,a,i)V(sLi)Q(s_t, a) = \frac{1}{N(s_t, a)} \sum_{i=1}^{n} \mathbb{I}(s, a, i) V(s_L^i)

总结#

  1. 结构创新

    • 策略网络提供先验知识
    • 价值网络提供全局评估
    • MCTS 提供战术验证
  2. 训练创新

    • 从监督学习起步
    • 通过强化学习超越教师
    • 自我博弈产生新知识
  3. MCTS 改进

    • 使用神经网络指导搜索
    • Policy Network 提了探索方向的先验概率,Value Network 提升了叶子节点评估的准确性。
    • 这样结合价值网络和 rollout 的评估,不仅减少了搜索宽度和深度,还显著提高了整体性能。
    • 高效的探索 - 利用平衡

这种设计使 AlphaGo 能在庞大的搜索空间中找到高效的解决方案,最终超越人类水平。

启发式搜索与 MCTS#

MCTS 就像是一个不断进化的探索者,在决策树中寻找最佳路径。

核心思想#

MCTS 的本质是什么?简单来说,它是一个 "边玩边学" 的过程。想象你在玩一盘全新的棋类游戏:

  • 开始时,你会尝试各种可能的走法(探索)
  • 慢慢发现某些走法效果更好(利用)
  • 在探索新策略和利用已知好策略之间取得平衡

这正是 MCTS 所做的,只不过它用数学的方式来系统化这个过程。其是一种 rollout 算法,通过累计蒙特卡洛模拟的价值估计来引导策略选择。

image

算法流程#

Monte Carlo Tree Search - YouTube这个老师对于 MCTS 的流程讲的很好。

MCTS 的优雅之处在于它的四个简单但强大的步骤,这里我用 A、B 两种理解方式来介绍:

  1. 选择 (Selection)

    • A:你知道小孩子是怎么学习的吗?他们总是在已知和未知之间徘徊。MCTS 也是如此:从根节点开始,使用 UCB (Upper Confidence Bound) 公式来权衡是选择已知的好路径,还是去探索新的可能。
    • B:从根节点出发,依据特定策略从当前节点中选择一个后续节点。该策略通常基于树的搜索历史,选取最具潜力的路径。例如,我们在每个节点依据当前的策略 π(s)\pi(s) 执行动作 aa,以平衡探索与利用,逐步深入。
  2. 扩展 (Expansion)

    • A:就像探险家在地图上开辟新的区域,当我们到达一个叶节点时,我们会向下扩展,创建新的可能性。
    • B:当选中的节点尚有未探索的子节点时,我们在此节点下依据可行动作集扩展新节点。这一过程的目的是增加决策树的广度,逐步生成可能的决策路径。通过这一扩展操作,我们确保搜索涵盖更多可能的 state action pair (s,a)(s,a)
  3. 模拟 (Simulation)

    • A:这是最有趣的部分。从新节点开始,我们进行一次 "假想" 的游戏,直到游戏结束。这就像下棋时在脑中推演 "如果我这样走,对手那样走..."。
    • B:从当前拓展的节点出发,执行随机模拟(rollout),在 MDP 环境中沿着当前策略 π(s)\pi(s) 进行采样直至终止状态。此过程提供了从当前节点出发到终点的回报估计,为路径的优劣提供数值依据。
  4. 回溯 (Backpropagation)

    • A:最后,我们把得到的结果沿路径返回,更新每个节点的统计信息。这就像在说:"嘿,我试过这条路,效果还不错(或不太好)"。
    • B:完成模拟后,将该模拟的估计回报回溯到经过的所有节点,以更新这些节点的价值。这一过程累积了历史回报信息,使得未来的选择更加精确地趋向高收益路径。

image

UCB1:探索与利用的完美平衡#

这里要特别提到 UCB1 公式,它是 MCTS 的灵魂:

UCB1=Xˉi+C×(ln(N)/ni)UCB1 = X̄ᵢ + C × \sqrt{(ln(N)/nᵢ)}

让我们解构一下:

  • XˉiX̄ᵢ 是平均收益(利用项)
  • (ln(N)/ni)\sqrt{(ln(N)/nᵢ)} 是不确定性度量(探索项)
  • CC 是探索参数

就像一个优秀的投资组合,既要关注已知的好机会,也要保持对新机会的开放(探索 - 利用权衡)。

image

Best Multi-Armed Bandit Strategy? (feat: UCB Method) 这个视频对 Multi-Armed Bandit 和 UCB Method 讲的很好,我这里借鉴这个老师用的例子:

一个吃货尝试理解 UCB#

想象你刚到一个城市,有 100 家餐厅可选择,你有 300 天时间。每天你都要选择一家餐厅就餐,希望在这 300 天里平均吃到最好的美食。

ε-greedy 策略:简单但不够智能#

这就像是用掷骰子决定:

  • 90% 的时间 (ε=0.1):去已知最好的餐厅(利用)
  • 10% 的时间:随机尝试一家新餐厅(探索)
def epsilon_greedy(restaurants, ratings, epsilon=0.1):
    if random.random() < epsilon:
        return random.choice(restaurants)  # 探索
    else:
        return restaurants[np.argmax(ratings)]  # 利用

这样的效果是:

  • 探索完全随机,可能重复访问已知很差的餐厅
  • 探索比例固定,不会随时间调整
  • 不考虑访问次数的影响

UCB 策略:更智能的选择权衡#

UCB 公式在餐厅选择中的含义如下:

评分=平均得分+C×ln(总访问天数)/该餐厅访问次数评分 = 平均得分 + C × \sqrt{ln(总访问天数)/该餐厅访问次数}

例如,考虑两家餐厅在第 100 天时的情况:

A 餐厅:

  • 访问 50 次,平均分 4.5
  • UCB 分数 = 4.5+2×ln(100)/504.5+0.6=5.14.5 + 2×\sqrt{ln(100)/50} ≈ 4.5 + 0.6 = 5.1

B 餐厅:

  • 访问 5 次,平均分 4.0
  • UCB 分数 = 4.0+2×ln(100)/54.0+1.9=5.94.0 + 2×\sqrt{ln(100)/5} ≈ 4.0 + 1.9 = 5.9

虽然 B 餐厅平均分较低,但因为访问次数少,不确定性高,所以 UCB 给予更高的探索奖励

image

代码实现:

class Restaurant:
    def __init__(self, name):
        self.name = name
        self.total_rating = 0
        self.visits = 0
        
def ucb_choice(restaurants, total_days, c=2):
    # 确保每家餐厅至少访问一次
    for r in restaurants:
        if r.visits == 0:
            return r
            
    # 使用UCB公式选择餐厅
    scores = []
    for r in restaurants:
        avg_rating = r.total_rating / r.visits
        exploration_term = c * math.sqrt(math.log(total_days) / r.visits)
        ucb_score = avg_rating + exploration_term
        scores.append(ucb_score)
        
    return restaurants[np.argmax(scores)]

为什么 UCB 更好?#

  1. 自适应探索

    • 访问次数少的餐厅获得更高的探索奖励
    • 随着总天数增加,探索项会逐渐减小,以更好地进行利用
  2. 平衡时间投资

    • 不会在明显较差的餐厅上浪费太多时间
    • 会在潜力相近的餐厅之间合理分配访问次数
  3. 理论保证

    • Regret Bound(与最优选择的差距)随时间呈对数增长
    • 300 天的探索足够找到最好的几家餐厅

我们回到 MCTS:

image

为什么 MCTS 如此强大?#

  • 高效处理组合爆炸: MCTS 不需要像 Minimax 穷举搜索所有可能,而是专注于最有希望的分支,使其能够有效处理分支因子巨大的问题。
  • 自适应搜索: MCTS 动态调整其搜索策略,将更多资源分配给更有希望的区域,从而更快地找到好的解决方案。
  • 平衡探索与利用: 通过 UCB 公式,MCTS 在探索新可能性和利用已知良好选择之间取得平衡,避免陷入局部最优。
  • 无需领域知识: MCTS 不依赖于特定领域的专业知识,仅依靠游戏规则和模拟结果进行学习,使其具有广泛的适用性。
  • 可随时停止: MCTS 是一种 “随时” 算法,可以随时中断并返回当前最佳解决方案,这对于实时应用至关重要。

AlphaZero:从 MCTS 到自我进化#

AlphaZero 是 DeepMind 在 AlphaGo 之后推出的通用强化学习算法,它能够在不使用人类棋谱的情况下,通过自我对弈(Self-Play)从零开始学习并最终超越专业水平。

AlphaZero 对传统的 MCTS 进行了改进,引入了 神经网络 来指导搜索:

  • 策略先验:使用神经网络预测每个动作的先验概率,使 搜索更加高效
  • 价值评估:在叶节点,使用神经网络的价值预测代替随机模拟,降低计算成本。

下面我们我们以五子棋为例,实现一个可以这样自学成才的 AI。这里学习 & 借鉴 schinger/alphazero.py 的优秀实现。

游戏环境的设计#

在实现 AlphaZero 之前,我们需要先定义游戏环境。

定义游戏接口#

在 AlphaZero 中,神经网络接收当前的棋盘状态,输出一个策略向量 P\boldsymbol{P} 表示每个动作的概率,以及一个标量值 vv 表示当前玩家的胜率预测。

为使 MCTS 和神经网络能够通用地与游戏交互,我们需要定义一个一致的游戏接口。

class GomokuGame:
    def __init__(self, n=15):
        self.n = n  # 棋盘大小,默认15x15

    def getInitBoard(self):
        """
        返回初始的棋盘状态,所有位置都为空。
        """
        b = Board(self.n)
        return np.array(b.pieces)

    def getBoardSize(self):
        """
        返回棋盘的尺寸,即 (n, n)。
        """
        return (self.n, self.n)

    def getActionSize(self):
        """
        返回动作的总数,这里是 n * n,因为每个格子都可能是一个动作。
        """
        return self.n * self.n

    def getNextState(self, board, player, action):
        """
        执行动作,返回下一个棋盘状态和下一个玩家。

        参数:
        - board: 当前棋盘状态
        - player: 当前玩家(1 或 -1)
        - action: 当前动作,0 ~ n*n-1

        返回:
        - (next_board, next_player): 执行动作后的棋盘和下一玩家
        """
        b = Board(self.n)
        b.pieces = np.copy(board)
        # 将动作转换为坐标 (x, y)
        move = (action // self.n, action % self.n)
        b.execute_move(move, player)
        return (b.pieces, -player)
  • 统一的接口:使得我们的 MCTS 和神经网络可以在不同的游戏中复用,只需实现游戏的具体逻辑。
  • 棋盘表示:使用二维数组表示棋盘,便于处理和可视化。

核心算法实现#

双头神经网络#

网络结构#

在 AlphaZero 中,我们使用一个统一的双头神经网络来同时预测策略(policy)和价值(value)。这个神经网络接收当前的棋盘状态以计算输出:

pp :策略头输出的概率分布,表示在状态 ss 下每个可能动作的选择概率。这个策略头与 MCTS 搜索结合,生成更强的决策。
vv :价值头输出一个标量,表示当前棋盘状态 ss 的预测值(最终获胜的概率)。

import torch
import torch.nn as nn
import torch.nn.functional as F

class AlphaZeroNNet(nn.Module):
    def __init__(self, game, args):
        """
        参数:
        - game: 游戏对象,提供棋盘大小和动作空间大小等信息。
        - args: 包含网络结构的参数,例如通道数、dropout 概率等。
        """
        super(AlphaZeroNNet, self).__init__()
        self.board_x, self.board_y = game.getBoardSize()  # 棋盘尺寸
        self.action_size = game.getActionSize()           # 动作空间大小
        self.args = args

        # 卷积层块
        self.conv_layers = nn.Sequential(
            nn.Conv2d(1, args.num_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(args.num_channels),
            nn.ReLU(),
            nn.Conv2d(args.num_channels, args.num_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(args.num_channels),
            nn.ReLU(),
            nn.Conv2d(args.num_channels, args.num_channels, kernel_size=3),
            nn.BatchNorm2d(args.num_channels),
            nn.ReLU(),
            nn.Conv2d(args.num_channels, args.num_channels, kernel_size=3),
            nn.BatchNorm2d(args.num_channels),
            nn.ReLU(),
        )

        # 计算卷积层输出的尺寸
        conv_output_size = self._get_conv_output_size()

        # 全连接层块
        self.fc_layers = nn.Sequential(
            nn.Linear(conv_output_size, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Dropout(args.dropout),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(args.dropout),
        )

        # 策略头:输出每个动作的对数概率
        self.policy_head = nn.Linear(512, self.action_size)

        # 价值头:输出当前状态的价值估计
        self.value_head = nn.Linear(512, 1)

    def _get_conv_output_size(self):
        """
        计算卷积层输出的特征数量。
        """
        dummy_input = torch.zeros(1, 1, self.board_x, self.board_y)
        output_feat = self.conv_layers(dummy_input)
        n_size = output_feat.view(1, -1).size(1)
        return n_size

    def forward(self, s):
        """
        前向传播函数。

        参数:
        - s: 输入的棋盘状态,形状为 (batch_size, board_x, board_y)。

        返回:
        - log_policies: 策略的对数概率,形状为 (batch_size, action_size)。
        - values: 价值估计,形状为 (batch_size, 1)。
        """
        # 输入形状调整
        s = s.view(-1, 1, self.board_x, self.board_y)  # (batch_size, 1, board_x, board_y)

        # 卷积层提取特征
        s = self.conv_layers(s)  # (batch_size, num_channels, new_board_x, new_board_y)

        # 展平卷积层输出
        s = s.view(s.size(0), -1)  # (batch_size, conv_output_size)

        # 全连接层提取高级特征
        s = self.fc_layers(s)  # (batch_size, 512)

        # 策略头输出
        policies = self.policy_head(s)  # (batch_size, action_size)
        log_policies = F.log_softmax(policies, dim=1)

        # 价值头输出
        values = self.value_head(s)  # (batch_size, 1)
        values = torch.tanh(values)  # 将价值限定在 [-1, 1]

        return log_policies, values
  • 卷积层提取特征:通过多层卷积层,提取棋盘的空间特征。
  • 全连接层输出:将特征展平,通过全连接层,分别输出策略和价值。
  • 激活函数:
    • 策略输出使用 log_softmax,方便后续计算交叉熵损失。
    • 价值输出使用 tanh,将值限制在 [1,1][-1, 1] 之间。

损失函数#

训练目标通过以下损失函数定义实现同时训练网络的策略和价值评估能力

l=(zv)2πlogp+cθ2l = (z - v)^2 - \pi \log p + c \| \theta \|^2

(zv)2(z - v)^2 :价值损失,要求网络输出的 vv 尽量接近对局结果 zz(胜利为 +1,失败为 -1,平均为 0)。
πlogp- \pi \log p:策略损失,通过交叉熵,要求网络输出的策略 pp 尽量匹配 MCTS 得到的最终策略 π\pi
cθ2c \| \theta \|^2 :L2 正则化项,用于防止过拟合,保持模型参数 θ\theta 的权重规模适当。

MCTS 类#

class MCTS:
    def __init__(self, game, nnet, args):
        self.game = game         # 游戏环境
        self.nnet = nnet         # 神经网络
        self.args = args         # 参数
        self.Qsa = {}            # 存储 Q 值:Q(s,a)
        self.Nsa = {}            # 存储边的访问次数:N(s,a)
        self.Ns = {}             # 存储节点的访问次数:N(s)
        self.Ps = {}             # 存储策略先验:P(s,a)

        self.Es = {}             # 存储游戏结束信息:E(s)
        self.Vs = {}             # 存储合法动作:V(s)

这里的一个亮点是使用了缓存机制:使用字典来缓存计算结果,避免重复计算,提高效率。

有效动作掩码#

在很多游戏中,某些动作在特定状态下是非法的。比如在五子棋中,如果某个位置已经被占据,那么在该位置下子就是非法的。因此,确保只考虑合法动作 对于算法的正确性和效率至关重要。这是通过 有效动作掩码(valid mask) 来实现的。

search 方法中,当我们到达叶节点并需要使用神经网络进行预测时,我们对神经网络输出的策略进行处理,使用 valid mask 来屏蔽非法动作。

if s not in self.Ps: # 不在策略先验存储 P(s,a) 中
    # 叶节点,根据神经网络输出策略进行扩展
    self.Ps[s], v = self.nnet.predict(canonicalBoard)
    valids = self.game.getValidMoves(canonicalBoard, 1)  # 获取合法动作的掩码,1 为合法
    self.Ps[s] = self.Ps[s] * valids  # 掩码非法动作
    sum_Ps_s = np.sum(self.Ps[s])
    
    if sum_Ps_s > 0:
        self.Ps[s] /= sum_Ps_s  # 归一化策略概率
    else:
        # 所有动作被屏蔽,进行均匀分布
        self.Ps[s] = self.Ps[s] + valids
        self.Ps[s] /= np.sum(self.Ps[s])

    self.Vs[s] = valids  # 存储合法动作的掩码
    self.Ns[s] = 0       # 初始化状态访问次数
    return v

PUCT#

UCB 公式的一种变体,专门用于 MCTS。

在 AlhphaZero 中,MCTS 使用神经网络的输出来指导搜索方向。在搜索过程中,神经网络通过改进的上置信限公式 PUCT (Polynomial Upper Confidence Trees for Trees) 来选择动作 PUCT ,引入了先验概率 P(s,a)P(s, a),这使得 MCTS 可以利用外部知识(策略网络)来指导搜索。:

U(s,a)=Q(s,a)+cpuctP(s,a)N(s)1+N(s,a)U(s, a) = Q(s, a) + c_{puct} \cdot P(s, a) \cdot \frac{\sqrt{N(s)}}{1 + N(s, a)}

其中:

  • Q(s,a)Q(s, a) 是在状态 ss 选择动作 aa 的平均价值。
  • P(s,a)P(s, a) 是先验概率,由神经网络提供。
  • N(s)N(s)N(s,a)N(s, a) 分别是状态 ss 和边 (s,a)(s, a) 被访问的次数。
  • cpuctc_{puct} 是一个控制探索程度的常数。结合后面的探索项可以鼓励选择访问次数较少但先验概率高的动作
cur_best = -float('inf') # 记录当前最大的 U 值
best_act = -1            # 记录具有最大 U 值的动作

# 遍历所有可能的动作
for a in range(self.game.getActionSize()):
    if valids[a]: 
        # 对于合法的动作,计算 PUCT 值
        if (s, a) in self.Qsa:
            # 如果之前访问过 (s, a),使用已计算的 Q 值和访问次数
            u = self.Qsa[(s, a)] + self.args.cpuct * self.Ps[s][a] * \
                math.sqrt(self.Ns[s]) / (1 + self.Nsa[(s, a)])
        else:
            # 如果是未访问过的节点,Q 值为 0,鼓励探索
            u = self.args.cpuct * self.Ps[s][a] * math.sqrt(self.Ns[s] + 1e-8) # 对未访问过的动作给予更高的 U 值,避免除零错误

        # 选择具有最高 U 值的动作
        if u > cur_best:
            cur_best = u
            best_act = a

平衡探索与利用#

  • 未访问过的节点由于 N(s, a) = 0,分母更小,使得探索项更大,鼓励算法探索新的动作。

  • Q (s, a) 反映了当前对动作价值的估计,代表了利用。

价值取反#

在玩家 AA 的视角下,获得越高的价值对玩家 BB 就意味着越低的价值,反之亦然。因此在整个搜索和价值评估过程中,确保价值反映的是当前玩家的利益最大化,从而保持视角一致性。

在代码中,这一实现就体现在如下调用中:

v = -self.search(next_s)

Search 函数#

def search(self, canonicalBoard):
    """
    执行一次 MCTS 搜索。

    参数:
    - canonicalBoard: 当前玩家的棋盘状态(规范形式)

    返回:
    - v: 对当前节点的价值评估
    """
    s = self.game.stringRepresentation(canonicalBoard)

    if s not in self.Es:
        self.Es[s] = self.game.getGameEnded(canonicalBoard, 1)
    if self.Es[s] is not None:
        # 如果游戏结束,返回结果
        return self.Es[s]

    '''
    Valid Mask & 叶节点拓展
    '''

    valids = self.Vs[s]

    '''
    使用 PUCT 公式选择最优动作
    '''

    a = best_act
    next_s, next_player = self.game.getNextState(canonicalBoard, 1, a)
    next_s = self.game.getCanonicalForm(next_s, next_player)

    # 下一状态的价值取反
    v = -self.search(next_s)

    # 更新 Q 值和访问次数
    if (s, a) in self.Qsa:
        self.Qsa[(s, a)] = (self.Nsa[(s, a)] * self.Qsa[(s, a)] + v) / (self.Nsa[(s, a)] + 1)
        self.Nsa[(s, a)] += 1
    else:
        self.Qsa[(s, a)] = v
        self.Nsa[(s, a)] = 1

    self.Ns[s] += 1
    return v
  • 通过递归深入搜索,模拟未来的可能性。
  • 在叶节点使用神经网络预测,加快搜索速度。
  • 由于玩家轮流,下一状态的价值取反。

Self-Play#

不需要人类对弈数据训练的围棋算法,仅通过 self-play 获取数据进行训练。

self-play 即每走一步,执行若干次 MCTS 得到提升策略 π(s,a)\pi(s,a) 来选择走子。依次轮换黑白子玩家直至游戏结束。自我对弈生成了大量的训练数据,帮助神经网络学习更好的策略和价值估计。

image

实现 “执行一局自我对弈”#

首先来看 Self-Play 的核心函数:executeEpisode,它负责执行一整局游戏,直到终局,并收集训练数据。

def executeEpisode(self):
    """
    执行一局自我对弈,返回训练样本列表。

    返回:
        trainExamples: 一个列表,包含多个 (canonicalBoard, pi, v) 元组。
    """
    trainExamples = []
    board = self.game.getInitBoard()  # 初始化棋盘
    self.curPlayer = 1                # 当前玩家(1 或 -1)
    episodeStep = 0                   # 记录步数

    while True: # 直到游戏结束
        episodeStep += 1
        canonicalBoard = self.game.getCanonicalForm(board, self.curPlayer)

        temp = int(episodeStep < self.args.tempThreshold)  # 温度参数

        pi = self.mcts.getActionProb(canonicalBoard, temp=temp)  # 使用 MCTS 获取动作的概率分布(策略)
        sym = self.game.getSymmetries(canonicalBoard, pi)        # 数据增强(对称性)
        for b, p in sym:
            trainExamples.append([b, self.curPlayer, p, None])

        action = np.random.choice(len(pi), p=pi)  # 按照策略概率选择动作
        board, self.curPlayer = self.game.getNextState(board, self.curPlayer, action) # 执行动作,更新棋盘和当前玩家。

        r = self.game.getGameEnded(board, self.curPlayer)  # 检查游戏是否结束
        if r is not None:
            # 为每个训练样本赋予最终的价值,对于获胜的玩家为 +1,失败的玩家为 -1,平局为 0
            return [(x[0], x[2], r * ((-1)  (x[1] != self.curPlayer))) for x in trainExamples]
  • 温度参数:控制策略的探索性。前期设置为 1,鼓励探索;后期设置为 0,选择最优策略偏向利用。

  • 数据增强:在五子棋中,棋盘的旋转和翻转不会改变游戏的本质。利用这些对称性,生成等价的棋盘和策略,增加训练数据以提高模型的泛化能力。

如果游戏结束,则根据对弈记录生成训练样本,返回 trainExamples

自我对弈的主循环#

自我对弈不仅仅是执行一局游戏,我们需要在多次迭代中,不断地进行自我对弈,更新模型。

def learn(self):
    """
    进行多次迭代的自我对弈和模型更新。
    """
    for i in range(1, self.args.numIters + 1):
        log.info(f"Starting Iter #{i} ...")
        # 存储本次迭代的训练样本
        iterationTrainExamples = deque([], maxlen=self.args.maxlenOfQueue)

        # 进行指定次数的自我对弈
        for _ in tqdm(range(self.args.numEps), desc="Self Play"):
            self.mcts = MCTS(self.game, self.nnet, self.args)  # 重置 MCTS
            iterationTrainExamples += self.executeEpisode()

        # 保存训练样本
        self.trainExamplesHistory.append(iterationTrainExamples)

        # 维护固定长度的训练样本历史
        if len(self.trainExamplesHistory) > self.args.numItersForTrainExamplesHistory:
            log.warning(f"Removing the oldest entry in trainExamples.")
            self.trainExamplesHistory.pop(0)

        # 合并并打乱所有的训练样本
        trainExamples = []
        for e in self.trainExamplesHistory:
            trainExamples.extend(e)
        shuffle(trainExamples)

        # 训练神经网络
        self.nnet.save_checkpoint(folder=self.args.checkpoint, filename="temp.pth.tar")
        self.pnet.load_checkpoint(folder=self.args.checkpoint, filename="temp.pth.tar")
        pmcts = MCTS(self.game, self.pnet, self.args)

        self.nnet.train(trainExamples)
        nmcts = MCTS(self.game, self.nnet, self.args)
  • 如果历史长度超过设定的阈值,则移除最旧的样本,确保模型不会过度拟合于旧数据。
  • 在收集到足够的训练数据后,使用 self.nnet.train(trainExamples) 来训练模型。

筛选机制#

为了确保模型的棋力不断提升,AlphaGo Zero 引入了 新旧模型对弈的筛选机制

  • 在每轮训练完成后,将新模型与旧模型进行对弈。
    • 如果新模型的胜率超过设定的阈值(如 55%),则接受新模型,继续进入下一轮自我对弈;否则,恢复旧模型的权重继续以旧模型为基础重新生成数据并训练,避免无意义的退化和过拟合问题。
        # 评估新模型
        log.info("PITTING AGAINST PREVIOUS VERSION")
        arena = game.Arena(lambda x: np.argmax(pmcts.getActionProb(x, temp=0)),
                           lambda x: np.argmax(nmcts.getActionProb(x, temp=0)),
                           self.game)
        pwins, nwins, draws = arena.playGames(self.args.arenaCompare)

        log.info(f"NEW/PREV WINS : {nwins} / {pwins} ; DRAWS : {draws}")
        if pwins + nwins == 0 or float(nwins) / (pwins + nwins) < self.args.updateThreshold:
            log.info("REJECTING NEW MODEL")
            self.nnet.load_checkpoint(folder=self.args.checkpoint, filename="temp.pth.tar")
        else:
            log.info("ACCEPTING NEW MODEL")
            self.nnet.save_checkpoint(folder=self.args.checkpoint, filename="best.pth.tar")

从 MCTS 中获取策略#

在自我对弈过程中,我们使用 MCTS 来选择动作。getActionProb 函数从 MCTS 中获取策略概率分布。

def getActionProb(self, canonicalBoard, temp=1):
    """
    执行多次 MCTS 搜索,返回动作的概率分布。

    参数:
        canonicalBoard: 当前规范化的棋盘状态
        temp: 温度参数

    返回:
        probs: 动作的概率分布
    """
    # 进行指定次数的 MCTS 搜索
    for _ in range(self.args.numMCTSSims):
        self.search(canonicalBoard) 

    s = self.game.stringRepresentation(canonicalBoard)
    # 记录每个动作被访问的次数 N(s, a)
    counts = [self.Nsa[(s, a)] if (s, a) in self.Nsa else 0 
              for a in range(self.game.getActionSize())]

    if temp == 0:
        bestAs = np.array(np.argwhere(counts == np.max(counts))).flatten()
        bestA = np.random.choice(bestAs)
        probs = [0] * len(counts)
        probs[bestA] = 1
        return probs

    counts = [x  (1. / temp) for x in counts]
    counts_sum = float(sum(counts))
    probs = [x / counts_sum for x in counts]
    return probs

访问次数 N(s, a) 反映了 MCTS 对动作的信心。

计算策略概率:

  • temp = 0 时:
    • 选择访问次数最多的动作,概率为 1。
    • 保证策略的确定性,通常用于游戏的后期或评估阶段。
  • temp > 0 时:
    • 对访问次数进行温度变换:counts = [x (1. / temp) for x in counts]
    • 温度越高,策略越接近均匀分布,探索性越强。

训练历程#

MCTS 模拟次数:400,cpuct:1.0,训练 50 轮:

image

此时模型已经可以学到五子棋的规则,达到了初级难度的 AI,你可以在棋盘边缘偷偷取胜。

image

加入 1cycle 学习率调整#

动态调整学习率可以提高模型的收敛速度和泛化能力。

  • 第一阶段(前半周期):学习率从最小值(1.0×1041.0 \times 10^{-4})逐渐增加到最大值(1.0×1021.0 \times 10^{-2})。
  • 第二阶段(后半周期):学习率从最大值逐渐减少回最小值。
class NNetWrapper:
    def __init__(self, game, args):
        ...
        self.optimizer = optim.Adam(self.nnet.parameters(), lr=args.max_lr)
        
        # 1cycle 学习率参数
        self.total_steps = args.numIters * args.epochs * (args.maxlenOfQueue // args.batch_size)
        self.current_step = 0

    def get_learning_rate(self):
        """实现 1cycle 学习率策略"""
        if self.current_step >= self.total_steps:
            return self.args.min_lr
        
        half_cycle = self.total_steps // 2
        
        if self.current_step <= half_cycle:
            # 第一阶段:从 min_lr 增加到 max_lr
            phase = self.current_step / half_cycle
            lr = self.args.min_lr + (self.args.max_lr - self.args.min_lr) * phase
        else:
            # 第二阶段:从 max_lr 减少到 min_lr
            phase = (self.current_step - half_cycle) / half_cycle
            lr = self.args.max_lr - (self.args.max_lr - self.args.min_lr) * phase
        
        return lr

加入梯度裁剪#

可以复习一下 PPO

为防止梯度爆炸,稳定训练过程,我们在反向传播后加入 梯度裁剪。
假设某个参数的梯度向量为 g\mathbf{g},其当前的 L2 范数为 g\|\mathbf{g}\|

  • 如果 g1.0\|\mathbf{g}\| \leq 1.0,则 g\mathbf{g} 保持不变。
  • 如果 g>1.0\|\mathbf{g}\| > 1.0,则将 g\mathbf{g} 缩放为 1.0gg\frac{1.0}{\|\mathbf{g}\|} \cdot \mathbf{g}

这使得:

1.0gg=1.0\|\frac{1.0}{\|\mathbf{g}\|} \cdot \mathbf{g}\| = 1.0
def train(self, examples):
    ...
    for epoch in range(self.args.epochs):
        ...
        for _ in t:
            ...
            # 计算损失并反向传播
            total_loss.backward()

            # 梯度裁剪
            if self.args.grad_clip:
                torch.nn.utils.clip_grad_norm_(self.nnet.parameters(), self.args.grad_clip)

            # 更新参数
            self.optimizer.step()
            ...

梯度裁剪将梯度的范数限制在指定阈值内,防止梯度过大导致训练不稳定。通过在每次参数更新前剪裁梯度,我们确保训练过程的稳健性,加速模型的收敛。

结语#

最后的代码和 Demo 可以在下面仓库中找到,模型权重可以在文档中的 huggingface 链接仓库里找到。

做出上述这些改进后,后面将 MCTS 模拟增加到 2000 次,cput:4.0 增加探索倾向,训练 22 轮(在单张 3090 Ti 上运行了 4 天),这时我们的模型逐渐升级到普通难度(在 50 次 MCTS 模拟设置下,前期压迫感尚可,中后期会注意不到对方的赢点)

image

如果增加推理时的 MCTS 模拟次数可以让其达到困难人机的水平,感兴趣的话可以试试能否获胜。

Screenshot 2024-12-15 at 15.54.38

参考资料#

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。