如何使用 GeoMX 通信加速器?#

考虑到广域网中的网络资源常常是受限的,并且随时间动态时变,跨数据中心的分布式训练仍然面临诸多通信瓶颈。为了缓解这些瓶颈,GeoMX 采用了多种优化技术。这些技术包括梯度稀疏化、低精度量化(例如 FP16)、混合精度量化、更先进的传输协议、同步算法、流量调度、优先级调度、负载均衡,以及其他一些正在开发或集成的技术,例如通信覆盖调度等)。这些技术全面地解决了跨广域分布式系统的通信问题,进一步提高了 GeoMX 系统训练的效率和稳健性。

本教程将详述在 GeoMX 系统中启用每种通信优化器所需的环境变量和超参数设置。

双向梯度稀疏化#

已有方法如深度梯度压缩 (DGC) 会对上行梯度张量进行稀疏化处理。为了进一步压缩通信流量,GeoMX 还对下行(聚合)梯度张量进行稀疏化处理,而不是拉取完整模型参数。该技术被设计用于数据中心之间,以减少跨广域传输的通信流量。请参考这篇文章 (paper) 了解更多关于双向梯度稀疏化的介绍。

要启用双向梯度稀疏化,需要在 kvstore_dist.set_gradient_compression 中定义它,并设置压缩率:

import mxnet as mx

# Initialize distributed kvstore in synchronous mode.
kvstore_dist = mx.kv.create("dist_sync")

# Obtain the total number of training nodes.
num_all_workers = kvstore_dist.num_all_workers

# Master worker enables bidirectional gradient sparsification on the global parameter server.
if kvstore_dist.is_master_worker:
    kvstore_dist.set_gradient_compression({"type": "bsc", "threshold": 0.01})

# Define local trainer to use Adam optimizer.
optimizer = mx.optimizer.Adam(learning_rate=lr)
trainer = Trainer(net.collect_params(), optimizer=optimizer)

for epoch in range(num_epochs):
    for _, batch in enumerate(train_iter):
        # Perform forward and backward propagation to calculate gradients.
        ...
        # Synchronize gradients for gradient aggregation.
        for idx, param in enumerate(net_params):
            if param.grad_req == "null": continue
            kvstore_dist.push(idx, param.grad(), priority=-idx)
            kvstore_dist.pull(idx, param.grad(), priority=-idx)
        # Use aggregated gradients to update local model parameters.
        trainer.step(num_all_workers * batch_size)
        # Put gradients to zero manually.
        for param in net_params:
            param.zero_grad()

梯度张量根据大小分为大张量和小张量,只有大张量才会被稀疏化传输。我们可以通过环境变量 MXNET_KVSTORE_SIZE_LOWER_BOUND 设置分类大小张量的阈值:

MXNET_KVSTORE_SIZE_LOWER_BOUND = 1000

演示代码可以在 examples/cnn_bsc.py 中找到。您可以简单地执行 bash scripts/xpu/run_bisparse_compression.sh 来运行此示例,其中 xpu 应为 cpugpu

低精度量化#

GeoMX 也支持将模型数据量化为较低精度进行传输,例如以 FP16 数值精度格式。在这种方案中,GeoMX 使用 FP32 计算模型,但在传输时,它将模型数据张量转换为 FP16。一旦接收到拉取的数据,GeoMX 就会将其恢复为 FP32 并继续进行模型计算。这种方法可以有效地将局域网和广域网上传输的数据流量减半。

为了以 FP16 数值精度格式量化模型数据进行传输,我们可以简单地在 Python 代码中使用 astype('float16') 来转换张量的数值精度:

import mxnet as mx

# Initialize distributed kvstore in synchronous mode.
kvstore_dist = mx.kv.create("dist_sync")
is_master_worker = kvstore_dist.is_master_worker

# Initialize 16-bit kvstore space on parameter servers to store model parameters or gradients.
for idx, param in enumerate(net_params):
    init_buff = param.data().astype('float16')
    kvstore_dist.init(idx, init_buff)
    if is_master_worker: continue
    kvstore_dist.pull(idx, init_buff)
    param.set_data(init_buff.astype('float32'))

for epoch in range(num_epochs):
    for _, batch in enumerate(train_iter):
        # Perform forward and backward propagation to calculate gradients.
        ...
        # Synchronize gradients for gradient aggregation.
        for idx, param in enumerate(net_params):
            if param.grad_req == "null": continue
            # Push / pull large tensors in 16 bits.
            grad_buff = param.grad().astype('float16')
            kvstore_dist.push(idx, grad_buff, priority=-idx)
            kvstore_dist.pull(idx, grad_buff, priority=-idx)
            # Convert received gradient tensors back to 32 bits.
            param.grad()[:] = grad_buff.astype('float32')
        # Use aggregated gradients to update local model parameters.
        trainer.step(num_all_workers * batch_size)
        # Put gradients to zero manually.
        for param in net_params:
            param.zero_grad()

示例代码位于 examples/cnn_fp16.py,我们可以使用 bash scripts/xpu/run_fp16.sh 来运行它。

混合精度量化#

混合精度量化结合了双向梯度稀疏化和低精度量化两种技术。在这种方案中,小张量被量化为 FP16 格式进行传输,而大张量则保持为 FP32 格式。但是,这些大张量在传输前将经过稀疏化处理。这样设计是为了减少关键信息的丢失,避免对模型性能造成明显损伤。

表格 1:双向梯度稀疏化、低精度量化与混合精度量化的作用域汇总#

数据中心内部

数据中心之间

大张量

小张量

大张量

小张量

双向梯度稀疏化

单精度,密集张量

单精度,密集张量

单精度,稀疏张量

单精度,密集张量

半精度量化

半精度,密集张量

半精度,密集张量

半精度,密集张量

半精度,密集张量

混合精度量化

单精度,密集张量

半精度,密集张量

单精度,稀疏张量

半精度,密集张量

关于如何分类大张量和小张量,请参阅 双向梯度稀疏化。以下给出使用混合精度量化的示例代码:

import os
import mxnet as mx

# Define the threshold to classify large and tiny tensors, here, the threshold
# is the same as that in Bidirectional Gradient Sparsification.
size_lower_bound = int(os.getenv('MXNET_KVSTORE_SIZE_LOWER_BOUND', 1e3))

# Initialize distributed kvstore in synchronous mode.
kvstore_dist = mx.kv.create("dist_sync")
is_master_worker = kvstore_dist.is_master_worker

# Master worker enables bidirectional gradient sparsification on the global parameter server.
if is_master_worker:
    kvstore_dist.set_gradient_compression({"type": "bsc", "threshold": compression_ratio})

# Initialize kvstore space on parameter servers to store model parameters or gradients.
# Create 32-bit space for large tensors and 16-bit space for tiny tensors.
for idx, param in enumerate(net_params):
    init_buff = param.data() if param.data().size > size_lower_bound \
        else param.data().astype('float16')
    kvstore_dist.init(idx, init_buff)
    if is_master_worker: continue
    kvstore_dist.pull(idx, init_buff)
    param.set_data(init_buff.astype('float32'))

for epoch in range(num_epochs):
    for _, batch in enumerate(train_iter):
        # Perform forward and backward propagation to calculate gradients.
        ...
        # Synchronize gradients for gradient aggregation.
        for idx, param in enumerate(net_params):
            if param.grad_req == "null": continue
            # Push / pull large tensors in 32 bits, but tiny tensors in 16 bits.
            grad_buff = param.grad() if param.grad().size > size_lower_bound \
                else param.grad().astype('float16')
            kvstore_dist.push(idx, grad_buff, priority=-idx)
            kvstore_dist.pull(idx, grad_buff, priority=-idx)
            # Convert received gradient tensors back to 32 bits.
            param.grad()[:] = grad_buff.astype('float32')
        # Use aggregated gradients to update local model parameters.
        trainer.step(num_all_workers * batch_size)
        # Put gradients to zero manually.
        for param in net_params:
            param.zero_grad()

您可以在 examples/cnn_mpq.py 中找到它们,并执行 scripts/xpu/run_mixed_precision.sh 来运行。

差异梯度传输#

差异梯度传输是一种针对分布式机器学习任务进行了特别优化的新型传输协议,它利用梯度下降算法对部分梯度丢失的容忍性,使用多个具有不同可靠性和优先级的通道传输梯度。梯度被调度于哪个通道进行传输,取决于它们对模型收敛的贡献。通过这些优先级通道,重要梯度在传输中得到优先处理,而其他不太重要的梯度则以较低的优先级和可靠性进行尽力而为传输。这有助于减少分布式机器学习通信流量的尾流时延,从而减少参数同步的完成时间。关于 DGT 协议更加详细的说明请参考这篇文章 (Paper),如果希望独立使用 DGT 协议,请尝试这个代码库 (Repo)。

要启用差异梯度传输协议,设置以下环境变量:

ENABLE_DGT = 2  # whether to enable DGT, use value 2 for DGT instead of value 1
DMLC_UDP_CHANNEL_NUM = 3  # number of transmission channels
DMLC_K = 0.8  # compression ratio
ADAPTIVE_K_FLAG = 1  # set value K adaptively

您可以使用示例脚本 scripts/xpu/run_dgt.sh 来尝试运行它。

通信覆盖调度#

为了解决分布式系统通信的 TCP Incast 问题,GeoMX 整合了 TSEngine,这是一个为广域网中的高效通信覆盖设计的自适应通信调度器。TSEngine 可以根据实时网络条件,动态优化分布式节点之间的拓扑覆盖和通信逻辑。在系统通信效率和可扩展性方面,这种自适应调度器相较于现有的通信模式展现出明显优势。请参阅此文章 (Paper) 获取更多详情,也可以使用这个代码库 (Repo) 独立应用 TSEngine。

与差异梯度传输类似,我们只需要设置一些环境变量就可以启用 TSEngine:

ENABLE_INTER_TS = 1  # whether to enable TSEngine within the data center
ENABLE_INTRA_TS = 1  # whether to enable TSEngine between data centers
MAX_GREED_RATE_TS = 0.9  # perform exploration with a probability of 10%

使用示例脚本 scripts/xpu/run_tsengine.sh 来体验它!

备注

如果使用了 ENABLE_INTER_TS,TSEngine 就会在跨数据中心之间启用。反之,如果使用了 ENABLE_INTRA_TS,TSEngine 就会在数据中心内部启用。我们可以同时启用 ENABLE_INTER_TSENABLE_INTRA_TS,像这个例子给出的一样,但我们也可以选择只启用其中一个。

优先级参数传播#

在传统的实现中,第 \(r\) 轮的梯度同步与第 \(r+1\) 轮的前向传播不重叠,因为前向传播依赖于梯度同步的完成。为了提高系统效率,GeoMX 集成了 P3 调度器,该调度器优先传输浅层梯度。这种设置使得前向传播和梯度同步可以重叠,允许更早地执行下一轮的前向传播,从而加速分布式训练。请参阅此论文 (Paper) 以获取更多详情,如果希望独立使用 P3 调度器,请使用这个代码库 (Repo)。

要启用优先级参数传播,我们只需要设置一个环境变量:

ENABLE_P3 = 1  # whether to enable P3

请使用示例脚本 scripts/xpu/run_p3.sh 来体验它!

多全局参数服务器负载均衡#

GeoMX 支持启用多个全局参数服务器以均衡工作负载,包括通信流量、参数存储和聚合计算等。多全局参数服务器负载均衡技术 MultiGPS 可以避免单一全局参数服务器成为性能瓶颈,从而提高 GeoMX 系统的效率、可扩展性和整体性能。

要启用多全局参数服务器负载均衡,需要将所有机构的 DMLC_NUM_GLOBAL_SERVER 和中央机构内的 DMLC_NUM_SERVER 设置为大于 1 的整数。

# In the central party:
# For the global scheduler
DMLC_NUM_GLOBAL_SERVER = 2
# For the global server 0
DMLC_NUM_GLOBAL_SERVER = 2
DMLC_NUM_SERVER = 2
# For the global server 1
DMLC_NUM_GLOBAL_SERVER = 2
DMLC_NUM_SERVER = 2
# For the master worker
DMLC_NUM_SERVER = 2
# For the local scheduler in the central party
DMLC_NUM_SERVER = 2

# In the other parties:
# For the local server
DMLC_NUM_GLOBAL_SERVER = 2

我们也已经提供好了相应的示例脚本,使用 scripts/xpu/run_multi_gps.sh 来体验它!