如何使用 GeoMX 同步并行?#

GeoMX 目前支持两种基础同步算法,即全同步算法和复合同步算法,以及一种同步优化算法,即分层频率聚合算法。在本节中,我们将依次介绍它们。

全同步算法#

全同步算法 FSA 是默认的模型同步策略。在这种同步算法中,训练节点在每个轮次都会同步其模型数据(模型参数或梯度),并且数据中心内外的参数服务器系统都运行在同步并行模式下。所有训练节点都保持相同步调以确保模型一致性,但代价是训练速度,因为它需要在每个迭代执行模型同步,并等待所有计算和通信完成。

要使用全同步算法,只需要在初始化 kvstore 时指定为 dist_sync 运行模式:

import mxnet as mx

# Initialize distributed kvstore in synchronous mode.
kvstore_dist = mx.kv.create("dist_sync")

# Master worker sets the optimizer of the global parameter server to Adam.
if kvstore_dist.is_master_worker:
    kvstore_dist.set_optimizer(mx.optimizer.Adam(learning_rate=lr))

for epoch in range(num_epochs):
    for _, batch in enumerate(train_iter):
        # Perform forward and backward propagation to calculate gradients.
        ...
        # Synchronize gradients to obtain updated parameters.
        for idx, param in enumerate(net_params):
            if param.grad_req == "null": continue
            kvstore_dist.push(idx, param.grad(), priority=-idx)
            kvstore_dist.pull(idx, param.data(), priority=-idx)

示例代码可以在 examples/cnn.py 中找到。您可以简单地执行 bash scripts/xpu/run_vanilla_hips.sh 来运行这个示例,其中, xpu 需要设置为 cpugpu

复合同步算法#

复合同步算法 MixedSync 是全同步算法的异步版本,区别在于数据中心之间的参数服务器系统以异步并行模式运行。这种设置适用于数据中心内部训练节点具有同质性,而不同数据中心之间的资源异质性显著的情况。异步方法解决了低性能数据中心掉队导致的同步阻塞问题,从而能够加速分布式训练。

要使用复合同步算法,只需要在初始化 kvstore 时设置 dist_async (而不是 dist_sync)。其它设置保持不变:

import mxnet as mx

kvstore_dist = mx.kv.create("dist_async")

您也可以在我们提供的 Python 脚本中使用 --mixed-sync 选项来启用复合同步算法:

python examples/cnn.py --mixed-sync

您可以执行 bash scripts/xpu/run_mixed_sync.sh 来运行这个示例,其中 xpu 应为 cpugpu

在复合同步中使用 DCASGD 优化器#

为了缓解异步并行中的梯度过时问题,全局参数服务器可以配置为使用 DCASGD 优化器。这个优化有助于改善训练收敛性。

在复合同步中启用 DCASGD 的方式与在 MXNET 中一样:只需将 Adam 优化器替换为 DCASGD:

import mxnet as mx

kvstore_dist = mx.kv.create("dist_async")
if kvstore_dist.is_master_worker:
    kvstore_dist.set_optimizer(mx.optimizer.DCASGD(learning_rate=lr))

我们可以使用以下命令来同时启用复合同步和 DCASGD 优化器:

python examples/cnn.py --mixed-sync --dcasgd

请试试按上述提示修改 scripts/xpu/run_mixed_sync.sh 并运行它。

分层频率聚合#

受到这篇文章 (paper)的启发,我们的分层频率聚合算法首先在训练节点上执行 \(K_1\) 步本地更新,然后在域内参数服务器进行 \(K_2\) 次局域同步,然后才在全局参数服务器进行一次全局同步。这种方法有效减少了跨数据中心的模型同步频率,从而能够大幅提升分布式训练的效率。

要启用分层频率聚合,我们将 kvstore 初始化为 dist_sync 模式,并对训练循环做一些简单的修改:

import mxnet as mx

# Initialize distributed kvstore in synchronous mode.
kvstore_dist = mx.kv.create("dist_sync")

# Obtain K1 from environmental variables.
period_k1 = int(os.getenv('MXNET_KVSTORE_HFA_K1'))

# Obtain the number of training nodes in each data center.
num_local_workers = kvstore_dist.num_workers

# Define local trainer to use Adam optimizer.
optimizer = mx.optimizer.Adam(learning_rate=lr)
trainer = Trainer(net.collect_params(), optimizer=optimizer)

global_iters = 1
for epoch in range(num_epochs):
    for _, batch in enumerate(train_iter):
        # Perform forward and backward propagation to calculate gradients.
        ...
        # Update local model parameters.
        trainer.step(num_samples)
        # Synchronize model parameters every K1 round.
        if global_iters % period_k1 == 0:
            for idx, param in enumerate(net_params):
                kvstore_dist.push(idx, param.data() / num_local_workers, priority=-idx)
                kvstore_dist.pull(idx, param.data(), priority=-idx)
        # Update the iteration counter
        global_iters += 1

接下来,设置三个环境变量:

MXNET_KVSTORE_USE_HFA = 1  # whether HFA is enabled
MXNET_KVSTORE_HFA_K1 = 20  # number of loops before a local synchronization
MXNET_KVSTORE_HFA_K2 = 10  # number of loops before a global synchronization

演示代码可以在 examples/cnn_hfa.py 找到。您只需要简单地执行 bash scripts/xpu/run_hfa_sync.sh 即可。