本章内容包括:
理解数据并行、模型并行和管道并行
使用在Kubernetes中支持数据并行训练的示例训练服务
使用多个GPU进行训练大型模型
在深度学习研究领域中,一个明显的趋势是通过更大的数据集和更复杂的模型架构来提升模型性能。但是更多的数据和更庞大的模型也带来了一些问题:模型训练过程和模型开发过程变得缓慢。在计算中,性能往往与速度产生矛盾。例如,使用单个GPU训练BERT(双向编码器表示来自Transformer的表示)自然语言处理模型可能需要几个月的时间。
为了解决不断增长的数据集和模型参数规模的问题,研究人员提出了各种分布式训练策略。主要的训练框架,如TensorFlow和PyTorch,提供了实现这些训练策略的SDK。借助这些训练SDK,数据科学家可以编写在多个设备(CPU或GPU)上并行运行的训练代码。
在本章中,我们将从软件工程师的角度探讨如何支持分布式训练。具体而言,我们将了解如何编写一个训练服务,以在一组机器上执行由数据科学家开发的不同分布式训练代码。
阅读完本章后,您将全面了解分布式训练的工作原理,从数据科学家和开发者的角度来看。您将了解几种分布式训练策略和分布式训练代码模式,以及训练服务如何支持不同的分布式训练代码。
分布式训练方法的类型
有三种主要的分布式训练方法:模型并行、数据并行和管道并行。模型并行是将神经网络分割为多个连续的子网络,并在不同的设备(GPU或CPU)上运行每个子网络的策略。通过这种方式,我们可以使用一组GPU对大型模型进行训练。
管道并行是模型并行的高级版本。模型并行的一个主要问题是在训练过程中只有一个GPU处于活动状态,其他GPU处于空闲状态。通过将每个训练样本批次分成小的微批次,管道并行可以在层之间重叠计算,以最大化GPU的性能。这样不同的GPU可以同时处理不同的微批次。GPU的训练吞吐量和设备利用率得到改善,模型训练速度比模型并行快得多。
数据并行将数据集分成较小的子集,并让每个设备单独训练这些子集。因为每个设备现在训练的数据集较小,训练速度提高了。
将单设备训练代码转换为模型并行或管道并行训练需要进行大量的代码更改,包括将神经网络分割为多个子网络,在不同的GPU上运行子网络,并在不同的GPU上复制子网络的计算输出。这些更改的数量和复杂性使其变得棘手且难以调试。每个模型算法可能具有截然不同的模型架构,因此没有标准化的方法可以将模型分割为模型并行或管道并行。数据科学家必须根据实际情况逐个构建代码。
相反,数据并行仅需要对单设备训练代码进行最小的代码更改。并且有标准化的模式可以将非分布式训练代码转换为数据并行,而无需更改模型算法或架构。此外,数据并行代码相对容易理解和调试。这些优点使数据并行成为分布式训练的首选方法。
尽管数据并行具有许多优点,但模型并行和管道并行也有各自的优势和用途。例如,当你有无法放入一个GPU中的大型模型时,它们是最佳的分布式解决方案。我们将在第4.4节中更详细地讨论它们。
数据并行
在本节中,我们将探讨数据并行理论及其并行执行的挑战,以及使用PyTorch、TensorFlow和Horovod的示例训练代码。
理解数据并行化
数据并行化涉及一组训练设备共同处理一个大型数据集。通过让每个设备处理数据集的一个子集,我们可以大大减少训练时间。 同步数据并行是最常用的数据并行方法。它将模型网络复制到训练组中的每个设备上,无论是 GPU 还是 CPU。数据集被分成小批次,并将这些批次分发到所有设备上(CPU 或 GPU)。训练步骤同时进行,在每个设备上使用不同的小批次;因此,设备本身充当其自己的数据分区。在计算梯度以更新神经网络时,算法通过从每个设备聚合梯度来计算最终梯度。然后,它将聚合的梯度分发回每个设备,以更新其本地神经网络。尽管每个设备上的训练数据集不同,但这些设备本地的神经网络是相同的,因为它们在每个训练迭代中使用相同的梯度进行更新。因此,这个过程被称为同步数据并行。
您可以在图 4.1 中可视化此过程。图 (a) 在左侧显示了在单个 GPU 上进行深度学习训练的过程,图 (b) 在右侧显示了使用三个 GPU 进行同步数据并行训练的设置。 通过比较图 (a) 和 (b),您可以看到同步数据并行相对于单设备训练引入了两个额外的步骤。第一个额外的步骤是将一个训练批次分成三个小批次,以便每个设备可以处理自己的小批次。第二个步骤是同步从所有设备聚合的梯度,以便它们在更新本地模型时使用相同的梯度。 注意:要聚合不同工作器计算的梯度,可以使用 all-reduce 算法。这是一种流行的算法,可以独立地将所有进程的数据数组组合成一个数组。在“使用 PyTorch 编写分布式应用程序”(pytorch.org/tutorials/i…)中,您可以找到PyTorch 如何支持 all-reduce 算法的示例。 从实现的角度来看,数据并行化对于单设备模型训练过程的改变很小。它的主要开销在于添加梯度聚合的同步步骤。
模型参数更新:同步 vs. 异步
在数据并行化中,有两种关于在工作器之间聚合梯度的思路:同步更新和异步更新。让我们详细了解每种方法的工作原理、优点和缺点,以便您可以根据自己的需求进行选择:
- 同步模型更新:如图4.1所示,同步模型更新在梯度同步步骤中暂停训练迭代,直到所有设备接收到聚合的梯度。然后继续下一步,更新模型参数。这样,所有设备在每次训练迭代中同时得到相同的梯度更新,确保每个工作器的模型在同一页面上。同步模型更新的问题很明显:在梯度在工作器之间同步时,训练迭代被阻塞,因此没有一个工作器可以开始处理下一个小批量的数据。如果存在一些慢速机器或网络问题,整个分布式工作组都会被阻塞,而速度较快的工作器则处于空闲状态。
- 异步模型更新:相比之下,异步模型更新方法不强制每个训练设备或工作器等待接收来自其他设备的梯度。相反,只要一个设备完成计算梯度,它就立即更新本地模型,而无需检查其他设备。每个设备都独立工作,尽管它的梯度仍然需要复制到每个设备,但不需要同步这些更新。异步方法可能看起来非常吸引人;它简单,并且比同步方法每分钟可以运行更多的训练步骤。异步方法的缺点是训练时间较长,生成的模型比同步模型更新方法的准确性较低。
在使用异步方法时,不同设备上的梯度是独立计算的。某些机器运行得更快,而其他机器运行得更慢;因此,这些梯度可以来自每个设备的不同训练迭代。因此,不能保证聚合的梯度将指向最优方向。例如,假设梯度来自慢速机器计算的第5个训练迭代,而其他更快的机器已经进入第20个训练迭代。当我们聚合所有工作器的梯度时,较低迭代的梯度会应用于较高迭代的梯度;这会降低梯度的质量。
此外,异步方法通常收敛较慢,准确性损失较高。因此,现今大多数数据并行化库都使用同步模型更新。在本章中,当提及数据并行化及其代码实现时,我们指的是同步数据并行化。
数据集和模型的内存限制
在深度学习中,数据集和模型在训练期间占用了计算实例的大部分内存。如果训练数据或神经网络(模型)超出了本地设备的内存限制,训练过程将因内存不足(OOM)错误而中断。数据并行化旨在提高训练速度,而不是解决内存限制问题。
对于由加载数据集引起的内存不足(OOM),我们可以减小训练数据的批量大小,这样训练过程在每个训练循环中加载的数据量就较小。在数据并行化的上下文中,我们需要确保小批量训练数据可以适应每个工作设备的内存。
对于由模型大小引起的内存不足(OOM),我们需要采用模型并行化或管道并行化(参见第4.4节)。数据并行化涉及多个训练设备在一个大型数据集上共同工作。通过使每个设备处理数据集的子集,我们可以大大减少训练时间。
同步模型更新是最常用的数据并行化方法。它将模型网络复制到训练组中的每个设备上,无论是 GPU 还是 CPU。数据集被分成小批次,并将这些批次分发到所有设备上(无论是 CPU 还是 GPU)。训练步骤同时进行,在每个设备上使用不同的小批次;因此,设备本身充当其自己的数据分区。在计算梯度以更新神经网络时,算法通过从每个设备聚合梯度来计算最终梯度。然后,它将聚合的梯度分发回每个设备,以更新其本地神经网络。尽管每个设备上的训练数据集不同,但这些设备本地的神经网络是相同的,因为它们在每次训练迭代中使用相同的梯度进行更新。因此,这个过程被称为同步模型更新。
您可以在图 4.1 中可视化这个过程。图 (a) 在左侧显示了在单个 GPU 上进行深度学习训练的过程,图 (b) 在右侧显示了使用三个 GPU 进行同步数据并行训练的设置。
通过比较图 (a) 和 (b),您可以看到同步数据并行化相对于单设备训练引入了两个额外的步骤。第一个额外的步骤是将一个训练批次分成三个小批次,以便每个设备可以处理自己的小批次。第二个步骤是同步从所有机器聚合的梯度,以便它们在更新本地模型时使用相同的梯度。
注意:要聚合不同工作器计算的梯度,可以使用 all-reduce 算法。这是一种流行的算法,可以独立地将所有进程的数据数组组合成一个数组。在“使用 PyTorch 编写分布式应用程序”(pytorch.org/tutorials/i… PyTorch 如何支持 all-reduce 算法的示例。
从实现的角度来看,数据并行化对于单设备模型训练过程的改变很小。它的主要开销在于添加梯度聚合的同步步骤。
在深度学习中,数据集和模型占据了计算实例的大部分内存。如果训练数据或神经网络(模型)超出了本地设备的内存限制,训练过程将因内存不足(OOM)错误而中断。数据并行化旨在提高训练速度,而不是解决内存限制问题。
对于由加载数据集引起的内存不足(OOM),我们可以减小训练数据的批量大小,这样训练过程每个训练循环中加载的数据量就较小。在数据并行化的上下文中,我们需要确保小批量训练数据可以适应每个工作设备的内存。
对于由模型大小引起的内存不足(OOM),我们需要采用模型并行化或管道并行化(参见第4.4节)。数据并行化在单设备上的模型大小超过内存限制时无法正常工作。
多工作器训练的挑战
容错性和带宽饱和度是我们作为软件开发人员在执行数据并行代码时需要解决的两个挑战。解决这两个挑战对于降低数据并行分布式训练的运营成本和提高训练性能至关重要。
容错性
我们不希望整个分布式训练组在一个工作器意外失败时也随之失败。这不仅会影响服务的可用性,还会增加训练成本,因为如果一个工作器失败,其他所有工作器的努力都将白费。
为了提高容错性,我们可以在每个工作器的远程文件系统中保留每个训练步骤的训练状态(即模型参数)。然后,如果一个工作器失败或花费太长时间来完成一个训练迭代,我们可以重新启动该工作器并加载其最近的先前状态。
TensorFlow和PyTorch框架都具备备份和恢复功能。作为训练服务的开发人员,我们可以设置远程磁盘或备份存储系统,并将访问配置传递给训练容器。然后,在训练过程中,训练代码可以使用外部文件系统来备份或恢复状态。
带宽饱和度
向分布式训练组中添加更多的GPU和机器并不总是会提高性能。无论我们使用同步还是异步模型更新,算法都必须在每个训练迭代的末尾在训练工作器之间传递梯度或模型参数。将数据移入和移出GPU内存以及在网络中传输数据所花费的时间最终将超过通过拆分训练工作负载获得的加速效果。
因此,存在一个并行实例数量的上限,超过这个上限,数据并行性将达到其性能峰值。这个上限由模型参数的数量和模型的密度(模型权重中的非零值数量)决定。如果是一个参数和梯度传输较多的大型密集模型,它的饱和度将高于较小的模型或大型稀疏模型。
有一些推荐的并行实例数量,例如对于神经机器翻译的8个GPU可以实现6倍加速,对于ImageNet模型的50个GPU可以实现32倍加速。但是,我们需要通过自己的实验来确定最佳性能点,因为GPU和模型架构都在快速演进,标准推荐很快就会过时。作为平台开发人员,除了选择合适数量的并行工作器之外,我们还有三种降低带宽饱和度的方法。
首先,我们可以将并行工作器(即容器或Pod)分组到较少的机器上,以减少网络跳数。例如,在Kubernetes中,您可以使用节点选择器和亲和性规则(mng.bz/qo76)来在几个选定的服务器上分配训练实例(Kubernetes Pod),这些服务器具有更好的网络和更强大的计算能力。
第二个选择是始终升级训练镜像以使用最新版本的训练框架。流行的框架如PyTorch、TensorFlow等不断演进,以减少在分布式训练中在网络中传输的数据量。密切关注发布说明并利用这些改进。
最后,不要低估在初始化分布式组时进行小的调整所带来的收益。以PyTorch为例,PyTorch的数据并行库将神经网络参数梯度分割成桶(bucket),然后在梯度同步步骤中将这些桶发送给工作器。桶的大小决定了在不同设备之间传输的数据量。因此,通过选择适当的桶大小,我们可以在设备饱和度和网络饱和度之间找到一个最佳点,从而达到最佳的训练速度。可以在PyTorch分布式数据并行(DDP)组件的构造函数中配置桶的大小(mng.bz/7ZB7)。
为不同的训练框架编写分布式训练(数据并行)代码
在本节中,您将看到三种训练框架(TensorFlow、PyTorch和Horovod)中用于数据并行分布式训练的一些训练代码片段。如果这里的代码样本很难理解,不要担心。目的是体验数据科学家在处理分布式训练时的方式,以便让您对训练服务如何支持分布式训练有所了解。
PYTORCH
PyTorch框架有一个DDP(DistributedDataParallel)库,它在模块级别实现了数据并行。DDP包装了模型对象,使其可以在多台机器上无缝运行。它的训练过程可以在同一台机器上或跨多台机器上进行。
要将单设备/进程的训练代码转换为数据并行的分布式训练代码,我们需要进行以下两个修改。首先,我们必须通过允许每个训练进程向主进程注册自己来初始化训练组。其中一个进程将自称为主进程,其他进程将自称为工作进程。每个训练进程在此注册阶段将处于等待状态,直到所有工作进程加入分布式组。
要注册一个进程,我们需要知道训练进程的总数(world_size),该进程的唯一ID(rank),以及主进程的地址(在环境变量中定义MASTER_ADDR和MASTER_PORT)。代码示例如下:
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'xxxx'
os.environ['MASTER_PORT'] = 'xxx'
# initialize the process group, "gloo" is one of the communication
# backends Pytorch supports, it also supports MPI and NCCL.
# rank is the process’s rank, it's a globally unique id
# for this process. rank=0 means master process.
# world_size is the total number of processes in this training group.
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
其次,我们使用DDP类来包装模型对象。PyTorch的DDP类将处理分布式数据通信、梯度聚合和本地模型参数更新:
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
# create model and move it to GPU
model = DpModel().to(device)
# wrap the model with DDP
ddp_model = DDP(model, device_ids=[rank])
outputs = ddp_model(data)
# compute the loss and sync gradient with other workers.
# when 'backward' function returns, the param.grad already
# contains synchronized gradient tensor
loss_fn(outputs, labels).backward()
对于高级用例,PyTorch库提供了API,让您可以在较低层次上实现自己的梯度同步函数。您可以在官方教程“使用PyTorch编写分布式应用程序”(mng.bz/m27W)中了解详细信息。
TENSORFLOW/KERAS
TensorFlow以非常类似的方式支持分布式训练;它首先定义一个分布式训练策略(如MultiWorkerMirroredStrategy),然后使用该策略初始化模型。为了让策略识别分布式组中的工作进程,我们需要在每个训练进程中定义一个TF_CONFIG环境变量。TF_CONFIG包含工作进程的唯一ID和组中所有其他工作进程的地址。以下是代码示例:
# Step 1: define 'TF_CONFIG' environment variable to describe
# the training group and the role for the process.
# The worker array defines the IP addresses and ports of
# all the TensorFlow servers used in this training.
tf_config = {
'cluster': {
'worker': ['192.168.4.53:12345', '192.168.4.55:23456']
},
# A 'task' provides information of the current task and is
# different for each worker. It specifies the 'type' and
# 'index' of that worker.
'task': {'type': 'worker', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Step 2: define distributed training strategy,
# the MultiWorkerMirroredStrategy takes
# care of the synchronous data parallel distributed training.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers multi_worker_dataset = mnist.mnist_dataset(global_batch_size)
# Step 3: start the distributed training.
with strategy.scope():
# Model building/compiling need to be within 'strategy.scope()'.
multi_worker_model = mnist.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,epochs=3, steps_per_epoch=70)
**HOROVOD **
Horovod是一个单一用途的分布式框架。与TensorFlow和PyTorch可以用于数据处理、模型训练和模型服务等多种任务不同,Horovod只专注于一个任务:使分布式深度学习训练快速且易于使用。 Horovod最大的优势是它可以与不同的训练框架配合使用,如TensorFlow、Keras、PyTorch和Apache MXNet。因此,我们可以以一种方式(即Horovod方式)配置训练集群,以运行PyTorch、TensorFlow和其他框架的分布式训练。在这里,我们只列出了使用Horovod与TensorFlow和PyTorch的两个代码示例,但您可以在Horovod的网站上查看其他框架的示例。
让我们来看一下TensorFlow的示例。要设置数据并行的分布式训练,首先我们初始化Horovod训练组,它会自动查找集群中的其他Horovod节点。接下来,我们将0号(主工作节点)的初始变量状态广播给所有其他进程。这将确保所有工作节点的初始状态一致。然后,我们使用分布式梯度记录器来包装梯度记录器,它会在所有工作节点上平均梯度。其余的代码只是正常的TensorFlow训练代码。请参考以下代码(github.com/horovod/hor…):
hvd.init()
.. .. ..
@tf.function
def training_step(images, labels, first_batch):
with tf.GradientTape() as tape:
probs = mnist_model(images, training=True)
loss_value = loss(labels, probs)
# Wrap tape with Horovod Distributed GradientTape.
# This gradient tape averages gradients from all
# workers by using allreduce or allgather, and then
# applies those averaged gradients back to the local model.
tape = hvd.DistributedGradientTape(tape)
grads = tape.gradient(loss_value, mnist_model.trainable_variables) opt.apply_gradients(zip(grads, mnist_model.trainable_variables))
# Broadcast initial variable states
# from rank 0 to all other processes.
if first_batch:
hvd.broadcast_variables(mnist_model.variables, root_rank=0)
hvd.broadcast_variables(opt.variables(), root_rank=0)
return loss_value
for batch, (images, labels) in \
enumerate(dataset.take(10000 / hvd.size())):
loss_value = training_step(images, labels, batch == 0)
.. .. ..
# save checkpoints only on worker 0 to
# prevent other workers from corrupting it.
if hvd.rank() == 0:
checkpoint.save(checkpoint_dir)
以下代码是使用Horovod与PyTorch的示例。PyTorch Horovod的一些API与TensorFlow不同,例如hvd.DistributedOptimizer与hvd.DistributedGradientTape。但是这些API都来自同一个Horovod SDK,在底层共享相同的工作节点间机制。让我们看一下PyTorch的代码片段:
# Horovod: initialize Horovod.
import torch
import horovod.torch as hvd
# Initialize Horovod
hvd.init()
.. .. ..
# Build model...
model = ...
optimizer = optim.SGD(model.parameters())
# Add Horovod Distributed Optimizer, this is equal
# to hvd.DistributedGradientTape(tape)
# for Tensorflow2
optimizer = hvd.DistributedOptimizer(optimizer,named_parameters=model.named_parameters())
# Broadcast parameters from rank 0 to
# all other processes.
hvd.broadcast_parameters(model.state_dict(),root_rank=0)
for epoch in range(100):
for batch_idx, (data, target) in enumerate(train_loader):
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
.. .. ..
虽然这两个代码片段中的模型定义在两个不同的框架(TensorFlow 2和PyTorch)中,但我们可以看到它们都使用相同的Horovod SDK来运行分布式训练。这里的好处是我们可以使用一个标准方法(Horovod方法)来设置训练集群中的分布式工作组,并且它仍然适用于使用不同训练框架编写的训练代码。
关于训练代码,有两个要点需要强调:
- 尽管本节中的代码示例使用不同的框架和不同的API实现了分布式训练,但代码都遵循了第4.2.1节中描述的相同的数据并行主义范式。也就是说,代码总是(1)为每个并行训练过程设置通信组,并(2)配置模型对象以在所有工作节点上聚合梯度。因此,作为开发人员,我们可以使用统一的方法来设置和管理不同训练框架的分布式训练过程。
- 将模型训练代码从单设备训练扩展到数据并行分布式训练的工作相对较小。如今,分布式训练框架/SDK非常强大,我们不需要实现数据并行的每个细节,例如在网络上同步梯度的梯度同步。训练框架和SDK会处理这些过程,使其无缝运行。分布式数据并行训练代码与单设备训练代码几乎相同,除了在配置训练组时有所不同。
在数据并行分布式训练中的工程投入
因此,在生产环境中启用数据并行分布式训练的工作是什么样的呢?首先,它需要数据科学家和服务开发人员之间的共同工程努力。对于数据科学家来说,他们需要将单设备训练代码升级为可以分布式运行的代码,使用前面部分中的代码片段等。同时,服务开发人员必须增强训练服务,以自动设置分布式工作组,以实现分布式训练。
为了使训练服务对用户友好,服务应该整合不同的分布式训练框架的设置细节。因此,数据科学家只需定义他们所需的训练的并行实例数量。
让我们以TensorFlow分布式训练为例。根据我们在4.2.3节中的讨论,每个设备上的TensorFlow训练代码必须将tf_config(如下面的示例所示)作为环境变量。因此,训练过程中的TensorFlow分布式库知道如何与其他训练进程进行通信:
tf_config = { 'cluster': {
'worker': ['192.168.4.53:12345', '192.168.4.55:23456'] },
# A 'task' provides information of the current task
# and is different for each worker. It specifies
# the 'type' and 'index' of that worker.
'task': {'type': 'worker', 'index': 0}
}
从可用性的角度来看,我们不能指望数据科学家为每个分布式训练进程找到设置值 – 服务器IP地址和任务索引,尤其是如果整个训练组是动态配置的。一个训练服务应该自动创建一组计算资源用于分布式训练请求,使用正确的IP地址初始化分布式训练库,并启动训练过程。
图4.2是支持分布式训练的训练服务的概念图。从图中可以看到,数据科学家Alex发送一个训练请求以启动分布式训练。然后,由服务开发人员Tang构建的服务生成两台工作机器,并分布执行训练代码。除了准备训练代码外,Alex还可以指定训练运行的配置,例如并行工作人数和分布式训练框架的类型(TensorFlow、PyTorch或Horovod)。
让我们慢慢地浏览一下这个图表,以更好地理解系统是如何设置的,每个人的职责是什么。我们可以看到,作为工程师的Tang需要进行三项改进-在图4.2中标为1、2和3-将训练服务从单设备训练器(正如我们在第3章中看到的)转变为数据并行的分布式训练器。
第一步是更新训练服务,以在运行时根据需求构建分布式训练组。当服务接收到分布式训练的请求时,它会从训练集群中分配多个工作节点给训练任务,并将训练代码分发给每个工作节点。 第二步是以编程方式为每个训练进程初始化正确的服务器IP、端口号和训练进程ID。这确保了分布式库(如TensorFlow等框架)有足够的信息来为训练组设置节点间通信。正如我们在前面的章节中所看到的,每个分布式训练框架的设置配置都有所不同。训练服务应该知道如何为不同的框架设置节点间通信,这样数据科学家就只需要关注算法的开发,而不用担心底层基础设施。 第三步是提供远程存储来备份和恢复每个工作节点的训练状态。在分布式训练中,如果一个工作节点失败,整个训练组都会失败,大量计算资源将被浪费。因此,让分布式训练组具有从硬件故障或网络问题中恢复的能力非常重要。通过提供远程存储和备份API,分布式训练进程可以在每个训练迭代后保存其训练状态(神经网络)。当训练过程在训练中间失败时,它可以恢复到之前的状态并重新开始,整个训练组可以继续进行。 注意:如果你想了解更多关于数据并行的内容,你可以参考以下两篇文章:O’Reilly的博客文章《分布式TensorFlow:通过使用多个GPU服务器,减少神经网络的实验时间和训练时间》(作者:Jim Dowling,链接:www.oreilly.com/content/dis… uted-tensorflow/)和Google Brain的论文《重访分布式同步SGD》(作者:Chen等,链接:arxiv.org/pdf/1604.00…)。
一个支持数据并行分布式训练的示例服务
在本节中,我们将扩展前一章节(第3.3节)介绍的示例服务,以支持数据并行分布式训练。
服务概述
与第3.3节中讨论的单设备训练相比,用户工作流程保持不变。数据科学家Alex首先构建模型训练代码,并向训练服务发送训练请求。然后,服务运行实际的训练过程,并在最后生成模型。
然而,有一些关键的区别。首先,Alex将意图分类训练代码升级,使其可以在单个设备和多个设备上运行。其次,服务开发人员Tang修改了训练服务的API,提供了一个新的参数PARALLEL_INSTANCES。这个参数允许Alex定义他的分布式训练任务的工作组大小。
为了正确管理服务器集群,我们需要Kubernetes的帮助。Kubernetes可以帮助我们减少在工作资源分配和工作节点间通信方面的工作量。因此,我们引入了一个新的组件——Kubernetes作业跟踪器,用于在Kubernetes中管理训练作业。你可以在图4.3中看到更新后的服务设计图和用户工作流程。
图4.3(a)重复了我们在第3.3节中讨论的训练服务的系统图,该系统使用Docker作业跟踪器在Docker引擎中运行训练作业。图4.3(b)展示了更新后的训练服务,现在支持分布式训练,包括Kubernetes和Docker引擎后端。添加了Kubernetes作业跟踪器,用于在Kubernetes集群中运行分布式训练作业。该组件通过启动Kubernetes Pod来执行训练作业,并在内存存储中监视和更新作业执行状态。
我们还对意图分类的PyTorch训练代码进行了一些更改,使其能够进行分布式训练。我们将在4.3.5节中简要介绍这些更改。
一个很大的时间节省是,我们不需要改变已经创建的服务API接口(第3.3.3节)。我们的用户可以在Docker引擎和Kubernetes集群中使用相同的API来训练模型。这符合训练服务的第一原则,即使用统一的API,并使其与后端实现无关。
玩转服务
首先,让我们使用Kubernetes后端运行训练服务,请参考以下命令(scripts/ts-001-start-server-kube.sh):
$ docker build -t orca3/services:latest -f services.dockerfile .
$ docker run --name training-service -v \
$HOME/.kube/config:/.kube/config --env \
APP_CONFIG=config-kube.properties \
--network orca3 --rm -d -p
"${TS_PORT}":51001
orca3/services:latest training-service.jar
注意:本节仅包含运行示例服务所需的主要步骤和关键命令。因此,可以清晰地演示概念,而无需冗长的代码和执行输出。如果您想在本节中运行实验,请按照orca3/MiniAutoML git存储库中的“Distributed trainer training demo”(github.com/orca3/MiniAutoML/blob/main/training-service/distributed_trainer_demo.md)文档中的说明进行操作。
一旦训练服务容器正在运行,我们就可以提交一个训练的gRPC请求。虽然服务现在运行在Kubernetes后端,但训练API仍然保持不变。与我们在Docker后端演示中发送的训练请求(参见3.3.1节)相比,在请求有效负载中只添加了一个额外的参数PARALLEL_INSTANCES=3。这告诉训练服务创建一个由三个工作节点组成的分布式训练组来训练模型。如果将此参数设置为1,则为单设备训练请求。请参考以下代码片段,以使用三个并行实例提交分布式训练请求(scripts/ts-004-start-parallel-run.sh 1):
# submit a distributed training request
$ grpcurl -plaintext -d "{ "metadata":
{ "algorithm":"intent-classification",
"dataset_id":"1",
"Name":"test1",
"train_data_version_hash":"hashBA==",
"Parameters":{
"LR":"4","EPOCHS":"15",
"PARALLEL_INSTANCES": "3" ,
"BATCH_SIZE":"64","FC_SIZE":"128"}}
}"
${TS_SERVER}:${TS_PORT}
training.TrainingService/Train
检查训练执行进度的方法是使用GetTrainingStatus API:
grpcurl -plaintext -d "{"job_id": "$1"}"
${TS_SERVER}:"${TS_PORT}"
training.TrainingService/GetTrainingStatus
除了查询训练服务API以获取作业执行状态外,我们还可以在Kubernetes中检查训练进度。通过使用Kubernetes命令kubectl get all,我们可以看到在本地Kubernetes环境中创建了三个工作器(worker)Pod。其中一个是主工作器(master worker),另外两个是普通的工作器(normal workers)。还创建了一个名为intent-classification-1-master-service的Kubernetes服务对象,用于主工作器(master pod)与工作器(worker pod)之间的网络连接。以下是代码片段:
# check Kubernetes resources status.
# We could see a distributed training group contains
# with three pods and one service are created in Kubernetes
$ kubectl get all -n orca3
NAME READY STATUS
pod/intent-classification-1-1-worker 0/1 Completed
pod/intent-classification-1-2-worker 0/1 Completed
pod/intent-classification-1-master 0/1 Completed
NAME TYPE .. ..
service/intent-classification-1-master-service ClusterIP
启动训练作业
现在,让我们看看使用Kubernetes后端启动训练作业的工作流程。当接收到一个训练请求时,请求将被添加到作业队列中。同时,Kubernetes作业跟踪器会监视作业队列。当跟踪器发现等待的作业并且系统有可用容量时,它将开始处理这些作业。
为了启动一个PyTorch分布式训练作业,跟踪器首先创建所需数量的Kubernetes pod。每个pod托管一个训练进程。跟踪器还向每个pod传递单独的参数,然后将作业从作业队列移动到启动列表中(图4.4)。
在图4.4中,Kubernetes作业跟踪器可以处理单设备训练和分布式训练。它为单设备训练创建一个Kubernetes pod,并为分布式训练创建多个pod。 Kubernetes作业跟踪器类似于Docker作业跟踪器,它运行一个训练pod。它将所有用户定义的参数封装在环境变量中,并将它们传递给Kubernetes pod。
为了设置具有多个pod的PyTorch分布式训练,服务处理了另外两个功能。首先,它创建了一个Kubernetes服务对象,用于与主pod进行通信。
根据PyTorch分布式训练算法部分(4.2.3)的内容,我们知道每个PyTorch训练进程都需要主进程(pod)的IP地址来初始化分布式训练组。例如,每个PyTorch代码在训练逻辑开始之前需要添加以下代码片段:
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'xxx xxx xxx xxx'
os.environ['MASTER_PORT'] = '12356'
dist.init_process_group("gloo",rank=rank, world_size=world_size)
但是在Kubernetes中,Pod是一种短暂的资源,因此我们不能依赖Pod的IP地址来定位Pod。相反,我们使用Kubernetes的域名服务(DNS)作为永久地址来定位Pod。即使Pod在不同的节点被销毁和重建,IP也发生了变化,我们仍然可以使用相同的DNS来访问它。因此,为了启用训练组的初始化,我们首先为主Pod创建一个Kubernetes服务,然后将该DNS地址传递给所有的工作Pod作为主Pod的地址。
其次,它向每个Pod传递了四个环境变量。每个训练Pod所需的四个变量是WORLD_SIZE、RANK、MASTER_ADDR和MASTER_PORT:
- WORLD_SIZE表示训练组中Pod的总数,包括主Pod和工作Pod。
- RANK是一个训练过程的唯一ID;主进程的rank必须为0。
- MASTER_ADDR和MASTER_PORT定义了主进程的主机地址和端口号,以便每个工作进程可以使用它们来访问主Pod。
例如,在使用三个实例进行分布式训练时,我们为每个Pod创建了三个环境变量(一个主Pod,两个工作Pod):
Master Pod:
WORLD_SIZE:3; RANK:0,
MASTER_ADDR: intent-classification-1-master-service,
MASTER_PORT: 12356
Worker Pod 1:
WORLD_SIZE:3; RANK:1,
MASTER_ADDR: intent-classification-1-master-service,
MASTER_PORT: 12356
Worker Pod 2:
WORLD_SIZE:3; RANK:2,
MASTER_ADDR: intent-classification-1-master-service,
MASTER_PORT: 12356
鉴于所有的解释,让我们来看一下实际代码是如何实现的。以下清单突出了在Kubernetes中启动分布式训练的实现方式。
protected List<String> launchTrainingPods(int jobId, int worldSize, TrainingJobMetadata metadata, .. ..) {
.. .. ..
// It's a distributed training if the worldSize is greater than 1.
if (worldSize > 1) {
// .. .. ..
api.createNamespacedService(
config.kubeNamespace, serviceBody,null, null, null);
serviceTracker.add(masterServiceName);
logger.info(String.format("Launched master service %s",
masterServiceName));
.. .. ..
}
// create training pods definition
for (int rank = 0; rank < worldSize; rank++) {
envs.put("WORLD_SIZE", Integer.toString(worldSize)); // RANK 0 is master
envs.put("RANK", Integer.toString(rank));
envs.put("MASTER_ADDR", masterPodDnsName);
envs.put("MASTER_PORT", Integer.toString(masterPort));
V1PodSpec podSpec = new V1PodSpec()
.restartPolicy("Never")
.addContainersItem(new V1Container()
.image(algorithmToImage(metadata.getAlgorithm()))
.env(envVarsToList(envs)) .. .. ..
String workerPodName = rank == 0 ? masterPodName :String.format("job-%d-%d-%s-worker-%d", jobId,now, metadata.getName(), rank);
V1Pod workerPodBody = new V1Pod();
workerPodBody.apiVersion("v1");
.. .. ..
// (3)
api.createNamespacedPod(config.kubeNamespace,workerPodBody, null, null, null);
.. .. ..
}
return podNames;
}
您可能注意到,在这个示例中创建的Kubernetes的Pod和服务是为PyTorch分布式训练库定制的。实际上,这个示例服务并不限于PyTorch。为了支持其他框架编写的训练代码,比如TensorFlow 2,我们可以扩展Kubernetes作业跟踪器,以支持TensorFlow分布式训练的设置。
例如,我们可以收集所有worker pod的IP地址或DNS,并将它们组合起来,然后将它们广播回每个worker pod。在广播过程中,我们将worker组信息设置到每个pod的TF_CONFIG环境变量中,以启动分布式训练组。TF_CONFIG环境变量是TensorFlow分布式库的特殊要求。
更新和获取作业状态
创建训练 Pod 后,Kubernetes 作业跟踪器将继续查询 Pod 的执行状态,并在状态发生变化时将作业移动到其他作业列表中。例如,如果 Pod 成功创建并开始运行,跟踪器将作业从启动列表移动到运行列表中。如果 Pod 执行完成,跟踪器将作业从运行列表移动到已完成作业列表中。图 4.5 描述了这个过程。
当用户提交作业状态查询时,训练服务将在内存存储中的四个作业队列中搜索作业 ID,并返回作业对象。有趣的是,尽管存在多个训练 Pod,我们只需要检查主 Pod 的状态来跟踪分布式训练的进度。这是因为在同步数据并行训练中,所有的工作节点在每个训练周期都需要进行同步,所以主 Pod 可以代表其他工作 Pod。
查询和更新作业执行状态的代码与我们在第3.3.5节中看到的 Docker 作业跟踪器非常相似。唯一的区别是我们查询 Kubernetes 集群而不是 Docker 引擎来获取训练状态。我们留下代码供您探索;您可以在 KubectlTracker 类的 updateContainerStatus 方法中找到它。
将训练代码转换为分布式运行
我们对意图分类训练代码(在前一章节中介绍的第3.3.6节)进行了两处更改,以支持分布式模式和单设备模式。
第一个更改:初始化训练组
我们使用WORLD_SIZE环境变量来检查训练代码是否应该在分布式训练中运行。如果world size等于1,则使用我们在第3.3.6节中看到的相同的单设备训练代码。
但是如果值大于1,则初始化训练过程以加入分布式组。请注意,从训练服务(Kubernetes作业跟踪器)传递了每个pod的唯一RANK值,这对于分布式组的初始化是必需的。在自我注册到分布式组后,我们还声明了模型和数据采样器的分布式方式。以下是更改的代码示例:
def should_distribute():
return dist.is_available() and config.WORLD_SIZE > 1
def is_distributed():
return dist.is_available() and dist.is_initialized()
if should_distribute():
# initialize the distributed process group,
# wait until all works are ready.
dist.init_process_group("gloo",rank=config.RANK, world_size=config.WORLD_SIZE)
if is_distributed():
# wrap the model with DistributedDataParallel (DDP)
# package to enable data parallel training.
model = DDP(model)
if is_distributed():
# restricts data loading to a subset of the dataset
# exclusive to the current process
train_sampler = DistributedSampler(dataset=split_train_, num_replicas=config.WORLD_SIZE, rank=config.RANK)
第二个更改:只允许主节点(rank = 0)上传最终模型
第二个更改:仅允许主节点(rank = 0)上传最终模型。这是为了防止每个工作节点多次上传相同的模型:
if config.RANK == 0:
accu_test = evaluate(test_dataloader)
.. .. ..
# upload model to metadata store.
artifact = orca3_utils.create_artifact(Rank 0 is the master pod.
config.MODEL_BUCKET, config.MODEL_OBJECT_NAME)
.. .. ..
改进
如果我们继续将这个示例服务推向生产环境,我们可以按照第4.2.2节中的思路来改进容错性并减少网络带宽饱和度。我们还可以扩展Kubernetes作业跟踪器以支持TensorFlow和Horovod的分布式训练。从训练服务的角度来看,它们并没有太大的区别,因为训练服务传递给训练代码的配置是非常通用的;这些信息对于所有框架都是必需的,只是名称不同而已。只要训练服务和训练代码之间的协议清晰稳定,我们仍然可以将训练代码视为黑盒,即使在分布式环境中也是如此。
训练无法在单个GPU上加载的大型模型
神经网络的大小(由参数数量定义)在研究领域中迅速增长,我们不能忽视这一趋势。以ImageNet挑战为例,2014年的冠军(GoogleNet)有400万个参数;2017年的冠军(Squeeze-and-Excitation Networks)有1.458亿个参数;当前的领先方法拥有超过10亿个参数。
虽然我们的神经网络大小增长了近300倍,但GPU内存仅增加了4倍。在未来,我们会经常遇到无法训练模型的情况,因为它无法加载到单个GPU上。
在本节中,我们将讨论训练大型模型的常见策略。与第4.2节中介绍的数据并行策略不同,这里介绍的方法需要在训练代码上付出努力的工作。
注意:虽然本节介绍的方法通常由数据科学家实施,但我们希望您仍然能够理解它们。了解这些训练技术背后的策略对于设计训练服务和训练代码之间的通信协议非常有帮助。它还为训练服务中的故障排除或性能微调提供了洞察力。为了简化起见,我们只会以概念层面描述算法,并侧重从工程角度看所需的工作。
传统方法:内存节省
让我们假设您的数据科学团队希望训练一个可以加载到训练集群中最大的GPU上的模型,例如,他们想在10GB内存的GPU上训练一个24GB的BERT模型。在这种情况下,团队可以使用几种节省内存的技术来训练模型,包括梯度累积和内存交换。这些工作通常由数据科学家来实现。作为平台开发人员,您只需要了解这些选项即可。我们将简要介绍它们,以便您知道何时建议使用每种方法。
注意:还有其他几种节省内存的方法,例如OpenAI的梯度检查点技术(github.com/cybertronai…)和NVIDIA的vDNN(arxiv.org/abs/1602.08…),但由于本书不涉及深度学习算法,我们将留给独立研究。
梯度累积
在深度学习训练中,数据集被分成批次。在每个训练步骤中,为了计算损失、计算梯度和更新模型参数,我们一次性将整个批次的示例(训练数据)加载到内存中,并进行计算。
通过减小批次大小,我们可以减轻内存压力,例如,将批次大小从32减小到16。但是减小批次大小可能会导致模型收敛速度变慢。这时,梯度累积就可以发挥作用。
梯度累积将批次示例分成可配置数量的小批次,然后在每个小批次之后计算损失和梯度。但是,它不会立即更新模型参数,而是等待并累积所有小批次的梯度。最后,它根据累积的梯度更新模型参数。
让我们通过一个示例来看看这如何加速过程。假设由于GPU内存限制,我们无法使用批次大小为32进行训练。通过梯度累积,我们可以将每个批次分成四个小批次,每个小批次大小为8。因为我们累积了所有四个小批次的梯度,并且只在全部完成后更新模型,所以这个过程几乎等同于使用批次大小为32进行训练。不同之处在于,我们在GPU中一次只计算8个示例,而不是32个,因此与32个批次相比,速度慢了4倍。
内存交换(GPU和CPU)
内存交换方法非常简单:它在CPU和GPU之间来回复制激活(activation)。如果您对深度学习术语不熟悉,可以将激活视为神经网络每个节点的计算输出。其思想是仅在当前计算步骤中保留所需的数据在GPU上,并将计算结果交换到CPU内存中供将来的步骤使用。
基于这个思想,一种名为L2L(从层到层)的新中继式执行技术仅将执行层和中间缓冲区保留在GPU上。整个模型和保存状态的优化器存储在CPU空间中。L2L可以大大提高GPU的吞吐量,并允许我们在可承受的设备上开发大型模型。如果您对这种方法感兴趣,可以查看Pudipeddi等人的论文《Training Large Neural Networks with Constant Memory Using a New Execution Algorithm》(arxiv.org/abs/2002.05…),该论文还在GitHub上提供了PyTorch的实现。
梯度累积和内存交换都是在较小的GPU上训练大型模型的有效方法。但是,像大多数事物一样,它们也有一个代价:它们往往会减慢训练速度。由于这个缺点,我们通常只在原型验证阶段。
为了获得可行的训练速度,我们确实需要在多个GPU上进行分布式训练。因此,在下一节中,我们将介绍一种更接近生产环境的方法:管道并行。它可以以惊人的训练速度对大型模型进行分布式训练。
管道模型并行
在第4.2节中,我们讨论了最常用的分布式训练方法:数据并行。该方法在每个设备上保存整个模型的副本,并将数据分割到多个设备中。然后,它聚合梯度并在每个训练步骤中更新模型。数据并行的整个方法在能够将整个模型加载到一个GPU中时表现良好。然而,正如我们在本节中所看到的,我们并不总是能够做到这一点。而这就是管道并行的用武之地。在本节中,我们将学习管道并行,一种在多个GPU上分布式训练大型模型的方法。 为了理解管道并行,让我们首先简要了解模型并行。这个小插曲将使我们更容易理解管道并行。
模型并行
模型并行的思想是将神经网络分割成较小的子网络,并在不同的GPU上运行每个子网络。图4.6说明了模型并行的方法。
图4.6展示了模型并行的过程。它首先将一个神经网络(四层)转换为四个子神经网络(单层),然后为每个单层网络分配一个独立的GPU。通过这样做,我们在四个GPU上分布式地运行一个模型。 模型并行的概念很简单,但实际实现可能会比较棘手,这取决于网络的架构。为了让您有一个想法,以下是一个虚拟的PyTorch代码片段,使一个网络在两个GPU上运行。
gpu1 = 1
gpu2 = 2
class a_large_model(nn.Module):
def __init__(self):
super().__init__()
# initialize the network as two subnetworks.
self.subnet1 = ...
self.subnet2 = ...
# put subnetwork 1 and 2 to two different GPUs
self.subnet1.cuda(gpu1)
self.subnet2.cuda(gpu2)
def forward(x):
# load data to GPU 1 and calculate output for
# subnet 1, GPU 2 is idle at the moment.
x = x.cuda(gpu1)
x = self.subnet1(x)
# move the output of subnet 1 to GPU 2 and calculate
# output for subnet 2. GPU 1 is idle
x = x.cuda(gpu2)
x = self.sub_network2(x)
return x
正如您在4.2节的代码清单4.2中所看到的,两个子网络在__init__函数中初始化并分配给两个GPU,然后在forward函数中连接起来。由于深度学习网络的结构多样性,没有通用的方法(范例)来拆分网络。我们必须根据具体情况实现模型并行。
模型并行的另一个问题是其对GPU资源的严重低效利用。因为训练组中的所有设备都有顺序依赖性,一次只能有一个设备工作,这浪费了大量的GPU周期。图4.7可视化了使用三个GPU进行模型并行训练时的GPU利用情况。
让我们通过这个图来看看为什么GPU利用率如此低。在左侧的图4.7(a)中,我们看到了模型并行的设计。我们将一个模型网络拆分为三个子网络,并让每个子网络在不同的GPU上运行。在每个训练迭代中,在进行前向传播时,我们首先计算子网络1,然后是子网络2和子网络3;在进行反向传播时,梯度更新是反向进行的。 在右侧的图4.7(b)中,您可以看到训练期间三个GPU的资源利用情况。时间轴分为两个部分:前向传播和反向传播。前向传播表示模型推理的计算,从GPU 1到GPU 2和GPU 3,而反向传播表示模型权重更新的反向传播,从GPU 3到GPU 2和GPU 1。
无论是前向传播还是反向传播,如果您垂直查看时间条,您会发现一次只有一个GPU处于活动状态。这是由于每个子网络之间存在顺序依赖关系。例如,在前向传播中,子网络2需要等待子网络1的输出以完成自身的前向计算,因此在前向传播过程中,GPU 2将在GPU 1的计算完成之前处于空闲状态。
无论您添加多少个GPU,一次只有一个GPU可以工作,这是一种巨大的浪费。这就是流水线并行的用武之地。流水线并行通过消除这种浪费并充分利用GPU来使模型训练更加高效。让我们来看看它是如何工作的。
流水线并行
流水线并行本质上是模型并行的改进版。除了将网络分割到不同的GPU上,它还将每个训练示例批次划分为小的小批次,并在层之间重叠这些小批次的计算。通过这样做,它能够让所有的GPU大部分时间都保持忙碌状态,从而提高了GPU的利用率。
这种方法有两个主要的实现:PipeDream(微软)和GPipe(谷歌)。我们在这里使用GPipe作为演示示例,因为它在每个训练步骤中优化了梯度的更新,并具有更好的训练吞吐量。您可以从Huang等人的论文“GPipe: Easy scaling with micro-batch pipeline parallelism”中了解更多关于GPipe的细节(arxiv.org/abs/1811.06…)。让我们在图4.8中以高层次的方式看一下GPipe的工作原理。
图4.8(a)描绘了由四个子网络组成的神经网络;每个子网络加载在一个GPU上。F表示前向传播,B表示反向传播,Fk和Bk在GPUk上运行。训练的顺序是首先进行前向传播:F0 -> F1 -> F2 -> F3,然后进行反向传播:F3 -> (B3, F2) -> (B2, F2) -> (B1, F1) -> B0。 图4.8(b)展示了传统的模型并行方法的训练流程。我们可以看到GPU的利用率非常低;在前向传播和反向传播中,只有一个GPU处于活动状态;因此,每个GPU有75%的时间处于空闲状态。
图4.8(c)展示了GPipe在训练操作顺序上的改进。GPipe首先将每个训练示例批次分为四个相等的微批次,并通过四个GPU进行流水线处理。图中的F(0,2)表示在GPU 0上使用微批次2进行的前向传播计算。在反向传播过程中,根据用于前向传播的相同模型参数计算每个微批次的梯度。关键是它不会立即更新模型参数;相反,它会累积每个微批次的所有梯度。在每个训练批次结束时,我们使用来自所有四个微批次的累积梯度来更新跨所有四个GPU的模型参数。
通过比较图4.8(b)和(c),我们可以看到GPU利用率大大提高;现在每个GPU在47%的时间内处于空闲状态。让我们看一个使用PyTorch GPipe实现的代码示例,使用两个GPU训练一个Transformer模型(请参见下面的示例代码)。为了清晰地演示这个想法,我们只保留了与流水线相关的代码,并将其分为四个部分。您可以查看Pritam Damania的教程“PyTorch: Training transformer models using pipeline parallelism”以获取完整的代码(mng.bz/5mD8)。
## Part One: initialize remote communication
# for multiple machines
rpc.init_rpc(
name="worker",
# set rank number to this node, rank is the global
# unique id of a node, 0 is the master,
# other ranks are observers
rank=0,
# set the number of workers in the group
world_size=1,.. .. .. )
.. .. ..
## Part Two: split model to 2 subnetworks, load
# to different GPUs and initialize the pipeline.
num_gpus = 2
partition_len = ((nlayers - 1) // num_gpus) + 1
# Add all the necessary transformer blocks.
for i in range(nlayers):
transformer_block = TransformerEncoderLayer(emsize, nhead, nhid, dropout)
.. .. ..
# Load first half encoder layers to GPU 0 and second hard encoder layers to GPU 1.
device = i // (partition_len) tmp_list.append(**transformer_block.to(device)** )
# Load decoder to GPU 1.
tmp_list.append(Decoder(ntokens, emsize).cuda(num_gpus - 1)) module_list.append(nn.Sequential(*tmp_list))
## Part Three**: Build up the pipeline.
chunks = 8 # Set micro-batches number to 8.
model = Pipe(torch.nn.Sequential(*module_list), chunks = chunks)
.. .. ..
## Part 4: Train with pipeline
def train():
model.train() # Turn on the train mode
.. .. ..
for batch, i in enumerate(range(0, nbatches, bptt)):
data, targets = get_batch(train_data, i)
optimizer.zero_grad()
# Compute pipeline output,by following the pipeline setup,
# the Pytorch framework will coordinate the network computation
# between GPU 0 and GPU 1.
# Since the Pipe is only within a single host and process the "RRef"
# returned by forward method is local to this node and can simply
# retrieved via "RRef.local_value()".
output = model(data).local_value()
# Compute the loss on GPU 1.
# Need to move targets to the device where the output of the # pipeline resides.
loss = criterion(output.view(-1, ntokens), targets.cuda(1))
# Backprop and model parameters update are the same as single GPU training.
# The Pytorch framework hides all the details of micro-batches
# computation and model parameters update.
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
optimizer.step()
.. .. ..
正如我们从代码清单4.3中可以看到的那样,流水线并行代码比分布式数据并行代码要复杂得多。除了设置通信组之外,我们还需要考虑如何划分模型网络以及在工作节点之间进行梯度和激活值(子网络的前向输出)的传输。
软件工程师怎样支持流水线并行
你可能已经注意到,在本节中我们讨论的所有方法都是用于编写训练代码的技术。由于数据科学家通常编写训练代码,你可能想知道作为软件开发人员,我们可以做些什么来支持流水线并行训练。
首先,我们可以致力于构建训练服务,以自动化流水线训练的执行并改善资源利用率(例如,始终保持 GPU 繁忙)。这种自动化包括分配工作资源、启用工作进程间通信,并将相应的初始化参数与流水线训练代码分发给每个工作进程(例如工作进程的 IP 地址、进程 ID、GPU ID 和工作组大小)。
其次,我们可以向数据科学家团队介绍新的分布式训练选项。有时数据科学家团队可能不知道可以改善模型训练体验的新工程方法,因此沟通至关重要。我们可以与团队成员合作,并引导关于尝试流水线并行方法的讨论。
第三,我们可以致力于提高模型训练的可用性。在4.2.4节中,我们讨论了分布式训练的脆弱性;它要求每个工作进程保持一致的运行。如果一个工作进程出现故障,整个训练组都会失败,这将是时间和预算的巨大浪费。数据科学家会非常感激我们在训练过程监控、故障转移和故障恢复方面所做的努力。
总结
- 分布式训练有两种思路:数据并行和模型并行。管道并行是模型并行的改进版本。
- 如果模型可以加载到一个GPU中,数据并行是实现分布式训练的主要方法;它简单易用且能显著提高训练速度。
- 使用Kubernetes来管理计算集群可以大大降低计算资源管理的复杂性。
- 尽管每个训练框架(TensorFlow、PyTorch)提供了不同的配置和API来编写分布式训练代码,但它们的代码模式和执行流程非常相似。因此,训练服务可以使用统一的方法支持各种分布式训练代码。
- 在封装各种训练框架的设置配置之后,训练服务仍然可以将训练代码视为一个黑盒,在分布式训练环境中使用。
- 要获取数据并行训练的进展/状态,只需检查主工作进程,因为所有工作进程始终保持同步。此外,为了避免在训练作业完成时从所有工作进程保存重复的模型,可以将训练代码设置为仅在主工作进程执行时持久化模型和检查点文件。
- Horovod是一个很好的分布式训练框架。它提供了一个统一的方法来运行使用不同框架编写的分布式训练代码:PyTorch、TensorFlow、MXNet和PySpark。如果训练代码使用Horovod来实现分布式训练,训练服务可以使用单个方法(Horovod方法)来执行它,无论它使用哪个训练框架编写。
- 可用性、弹性和故障恢复是分布式训练的重要工程问题。
- 对于无法放入一个GPU中的模型,有两种策略:节省内存的方法和模型并行的方法。
- 节省内存的方法每次只加载模型的一部分或小批量数据到GPU中,例如梯度累积和内存交换。这些方法易于实现,但会降低模型训练速度。
- 模型并行方法将大模型划分为一组子神经网络,并将它们分布在多个GPU上。这种方法的缺点是GPU利用率低。为了克服这一问题,发明了管道模型并行。