丰富的线上&线下活动,深入探索云世界
做任务,得社区积分和周边
最真实的开发者用云体验
让每位学生受益于普惠算力
让创作激发创新
资深技术专家手把手带教
遇见技术追梦人
技术交流,直击现场
海量开发者使用工具、手册,免费下载
极速、全面、稳定、安全的开源镜像
开发手册、白皮书、案例集等实战精华
为开发者定制的Chrome浏览器插件
数据并行(DataParallelism,DP)的核心思想是将大规模的数据集分割成若干个较小的数据子集,并将这些子集分配到不同的NPU计算节点上,每个节点运行相同的模型副本,但处理不同的数据子集。在每一轮训练结束后,各节点会将计算得到的梯度进行汇总,并更新模型参数。这样,每个节点都能在下一轮训练中使用更新后的模型参数,从而保证整个模型在所有节点上保持一致。
数据并行只能在单台机器上运行,采用单进程、多线程的实现方式,将原本在NPU0上进行的数据训练过程,扩展到多个NPU并行训练。在某NPU上随机初始化模型和优化器后,就可进行数据并行的训练,算法主要分为下面两个步骤:
如图所示,不断上述步骤重复进行,直到模型收敛或者达到预定的训练轮数。
另外在算法层面,全局的梯度累积和参数更新发生在一个NPU上,会出现明显的单个NPU利用率更高,其他NPU空闲的情况,造成了资源的浪费。同时如果在数据并行中的mini-batch设置过小,将导致NPU内并行度不足,从而降低训练速度;在通信开销的影响下,甚至可能出现比单NPU慢的情况。
分布式数据并行(DistributedDataParallel,DDP)是数据并行的一种高级形式,它综合了多种优化,是当前应用最广的并行算法之一,通常用于大型NPUAI集群和AI系统中。DDP在每个NPU上创建一个模型副本,并在每个训练步骤结束时,通过高效的梯度聚合和参数同步机制,确保模型的一致性。
除此以外,DDP针对数据并行的缺点做了许多改进,并拥有良好的的扩展性,如:完全分片的数据并行就是基于分布式数据并行的内存高效扩展版本。具体来说,分布式数据并行使用了多进程的实现方式,这避免了开发语言层面PythonGIL的限制,也将并行规模扩展到多台网络连接的机器,进一步扩大分布式规模和效率;同时,针对通信做了大量优化,如使用RingAllReduce算法和延迟隐藏技术进行高效的集合通信。分布式数据并行的各NPU负载也更均衡,没有单独在某一个NPU上工作的情况。
在分布式数据并行中,程序会启动设备数量个进程,每个进程单独启动一个主训练脚本副本。在开始时,主进程将模型从设备NPU0复制到其余NPU一次,保证各NPU的模型、优化器完全相同,接下来是分布式数据并行的训练过程:
将在后续内容具体介绍有关计算与通信的重叠的内容。梯度的一致性可确保各NPU的模型保持一致,避免使用其他模型的梯度进行参数更新而导致收敛问题。
在异步的数据并行中,还将会接着讨论模型不一致的情况,这将会带来一定的收敛问题,但是可以使整个迭代过程更快,同时NPU的利用率更高。
上述步骤重复进行,直到模型收敛或者达到预定的训练轮数。
数据并行是分布式训练中最基础和常见的并行算法。本文将重点介绍分布式数据并行(DDP)在PyTorch中的简单实现示例,并对数据并行的各个关键步骤如前向传播、反向传播、梯度更新等进行详细分析,以更深入地理解数据并行的实现原理和具体执行步骤。
下面在分布式环境下使用2个NPU训练简单网络的完整例子。
首先需要导入了实现分布式数据并行训练所需的库。包括PyTorch的核心库torch、神经网络模块torch.nn、优化器模块torch.optim、分布式启动模块torch.distributed和多进程模块torch.multiprocessing:
importosimporttorchimporttorch.nnasnnimporttorch.optimasoptimimporttorch.distributedasdistimporttorch.multiprocessingasmpfromtorch.nn.parallelimportDistributedDataParallelasDDP首先指使用dist.init_process_group初始化进程组。example函数实现了分布式数据并行训练的主要逻辑。该函数首先加载模型并将其复制到当前进程的NPU上,并使用torch.nn.parallel.DistributedDataParallel将其封装为分布式数据并行模型,同步不同进程的参数。
对于每个数据,该函数将首先把数据移动到当前NPU,然后前向传播计算模型输出,基于损失函数计算损失值并反向传播计算梯度,最后使用优化器更新模型参数。
接下来来看PyTorch2.0中分布式数据并行具体的实现方式,这里先不涉及PyTorch2.0或torchdynamo引入的编译部分。首先看看DDP的初始化与前向传播,以及在这个过程中是如何维护模型一致性的。
模型的一致性要求每次进行的前向传播每个进程的参数需要相同。它依赖于torch.nn.Module类和DistributedDataParallel类,在PyTorch中,所有的模型都会继承Module类(包括分布式数据并行类DistributedDataParallel)。
#torch.nn.modules.pyclassModule:..._parameters:Dict[str,Optional[Parameter]]_buffers:Dict[str,Optional[Tensor]]...DDP在构建时,会通过_sync_module_states同步各个进程的模型参数,包括_parameters和_buffers以达到模型的一致性。
#torch.nn.parallel.distributed.pyclassDistributedDataParallel(Module,Joinable):...def__init__(...#Syncparamsandbuffers.EnsuresallDDPmodels#startoffatthesamevalue._sync_module_states(module=self.module,process_group=self.process_group,broadcast_bucket_size=self.broadcast_bucket_size,src=0,params_and_buffers_to_ignore=self.parameters_to_ignore,)...同时,在每次网络传播开始前,DDP也都会通过_sync_module_states同步进程之间的buffer,维持状态的统一。
#torch.nn.parallel.distributed.pyclassDistributedDataParallel(Module,Joinable):...defforward(self,*inputs,**kwargs):...#Syncparamsandbuffers.EnsuresallDDPmodelsstartoffatthesamevalue._sync_module_states(module=self.module,process_group=self.process_group,broadcast_bucket_size=self.broadcast_bucket_size,src=0,params_and_buffers_to_ignore=self.parameters_to_ignore,)...DDP计算通信重叠在分布式数据并行中,一项重要的优化是在反向传播过程中同时进行参数更新,这一过程也被称为计算与通信的重叠。在分布式训练中,每个进程通常会在完成当前网络反向传播的同时进行梯度更新,以隐藏通信延迟。
这里同样使用PyTorch2.0进行举例。在此过程中涉及到钩子函数hook、参数桶bucket和归约管理器reducer三个关键部分。
钩子函数hook是在torch.Tensor上实现的,每次计算相对于张量的梯度时都会调用该钩子。通过钩子函数,当张量梯度计算完成后,就可以立即进行集合通信。需要注意的是,虽然DDP的关键代码是用C++实现的,但在C++和Python代码中,Tensor都提供了相似的hook接口,实现了类似的功能。
#torch._tensor.pyclassTensor(torch._C._TensorBase):...defregister_hook(self,hook):r"""Registersabackwardhook.ThehookwillbecalledeverytimeagradientwithrespecttotheTensoriscomputed....PyTorch使用归约管理器reducer在反向传播期间进行梯度同步。为提高通信效率,reducer将参数梯度组织到多个桶buckets中,并对每个桶进行集合通信(可通过在DDP构造函数中设置bucket_cap_mb参数来配置桶大小)。
其中参数梯度到桶的映射,在构造时基于桶大小限制和参数大小确定。模型参数按照给定模型Model.parameters()的大致相反顺序分配到桶中(使用相反顺序的原因是DDP期望在反向传播时以大致相同的顺序准备好梯度)。
除了分桶,reducer在构造阶段为每个参数注册了autograd钩子,在反向传播时当梯度准备就绪时触发这些钩子。PyTorch使用_ddp_init_helper函数,进行参数的reducer的初始化以及参数的装桶。
#torch.nn.parallel.distributed.pyclassDistributedDataParallel(Module,Joinable):...def__init__(...#Buildsreducer.self._ddp_init_helper(parameters,expect_sparse_gradient,param_to_name_mapping,static_graph,)......def_ddp_init_helper(self,parameters,expect_sparse_gradient,param_to_name_mapping,static_graph,):"""Initializationhelperfunctionthatdoesthefollowing:(1)bucketingtheparametersforreductions(2)resettingthebucketingstates(3)registeringthegradhooks(4)Loggingconstruction-timeDDPloggingdata(5)passingahandleofDDPtoSyncBatchNormLayer"""...如果一个参数在前向传播中没有被使用,当前参数的桶会在反向传播时永远等待缺失的梯度。如果设置了find_unused_parameters为True,DDP会分析来自本地模型的输出,从而确定在模型的子图上运行反向传播时哪些参数参与了计算。DDP通过从模型输出遍历计算图来找出未使用的参数,并将其标记为可供reduce。
在反向传播期间,reducer只会等待未就绪的参数,但它仍会对所有桶进行reduce操作。将参数梯度标记为就绪不会帮助DDP跳过桶,但会防止其在反向传播时永远等待缺失的梯度。值得注意的是,遍历计算图会带来额外开销,因此只有在必要时才应将find_unused_parameters设置为True。
由于反向传播的函数backward直接在损失张量上调用,这超出了DDP的控制范围。DDP使用在构造时注册的autograd钩子来触发梯度同步。当一个梯度准备就绪时,相应的DDP钩子会被触发,DDP将标记该参数梯度为就绪可供reduce。
当一个桶中的所有梯度都准备就绪时,reducer将在该桶上启动异步allreduce操作以计算所有进程中梯度的平均值。当所有桶都就绪时,reducer将阻塞等待所有allreduce操作完成。
完成后,平均梯度将被写入所有参数的param.grad字段。因此,在反向传播之后,不同DDP进程上相同的参数其grad字段应该是相同的。在之后的优化器步骤中,所有DDP进程上的模型副本可以保持同步,因为它们都从同一个状态开始,并且在每次迭代中具有相同的平均梯度。
所使用的DataLoader是一个迭代器,在加载__iter__方法时,会根据进程数量选择对应的迭代器并赋值给类变量_iterator,迭代器种类分为_SingleProcessDataLoaderIter和_MultiProcessingDataLoaderIter,其中_MultiProcessingDataLoaderIter负责多进程的数据读取。
#torch.utils.dat.dataLoader.pyclassDataLoader(Generic[T_co]):...def__iter__(self)->'_BaseDataLoaderIter':...ifself.persistent_workersandself.num_workers>0:ifself._iteratorisNone:self._iterator=self._get_iterator()else:self._iterator._reset(self)returnself._iteratorelse:returnself._get_iterator()...def_get_iterator(self)->'_BaseDataLoaderIter':ifself.num_workers==0:return_SingleProcessDataLoaderIter(self)else:self.check_worker_number_rationality()return_MultiProcessingDataLoaderIter(self)在获取数据时,这些迭代器会调用使用_reset初始化sampler,然后通过_next_data方法获取数据。
#torch.utils.data.dataLoader.pyclass_MultiProcessingDataLoaderIter(_BaseDataLoaderIter):def__init__(self,loader):...foriinrange(self._num_workers):#Nocertaintywhichmodulemultiprocessing_contextis#type:ignore[var-annotated]index_queue=multiprocessing_context.Queue()#Needto`cancel_join_thread`here!#Seesections(2)and(3b)above.index_queue.cancel_join_thread()w=multiprocessing_context.Process(target=_utils.worker._worker_loop,args=(self._dataset_kind,self._dataset,index_queue,self._worker_result_queue,self._workers_done_event,self._auto_collation,self._collate_fn,self._drop_last,self._base_seed,self._worker_init_fn,i,self._num_workers,self._persistent_workers,self._shared_seed))w.daemon=Truew.start()self._index_queues.append(index_queue)self._workers.append(w)...其中每一个进程都运行_worker_loop函数,从index_queue中获取index,而后从Dataset中获取对应的数据。
#torch.utils.data.dataLoader.pyclass_MultiProcessingDataLoaderIter(_BaseDataLoaderIter):def_next_data(self):whileTrue:...#Checkifthenextsamplehasalreadybeengeneratediflen(self._task_info[self._rcvd_idx])==2:data=self._task_info.pop(self._rcvd_idx)[1]returnself._process_data(data)def_process_data(self,data):self._rcvd_idx+=1self._try_put_index()ifisinstance(data,ExceptionWrapper):data.reraise()returndata如果设置了pin_memory=True,则主进程会启动一个内存固定线程,该线程从结果队列中获取数据,并使用_pin_memory_loop将其复制到NPU内存中。复制后的数据被放入数据队列中,供主进程使用。
#torch.utils.data.dataLoader.pyclass_MultiProcessingDataLoaderIter(_BaseDataLoaderIter):def__init__(self,loader):...pin_memory_thread=threading.Thread(target=_utils.pin_memory._pin_memory_loop,args=(self._worker_result_queue,self._data_queue,current_device,self._pin_memory_thread_done_event,self._pin_memory_device))在分布式环境下,通过DistributedSampler可以获取到基于NPU索引的数据切分,这样就确保了每个NPU可以拿到不同的数据。
#torch.utils.data.distributed.pyclassDistributedSampler(Sampler[T_co]):def__iter__(self)->Iterator[T_co]:ifself.shuffle:#deterministicallyshufflebasedonepochandseedg=torch.Generator()g.manual_seed(self.seed+self.epoch)indices=torch.randperm(len(self.dataset),generator=g).tolist()else:indices=list(range(len(self.dataset)))...#subsampleindices=indices[self.rank:self.total_size:self.num_replicas]assertlen(indices)==self.num_samplesreturniter(indices)DDP性能分析使用torch.profiler.profile对DDP的过程进行性能分析。只需要对训练的循环进行简单嵌套,就能得到清晰的具体分析结果。
这里使用了两张V100-SXM2-16GB作为设备并使用NV-Link连接,通过CIFAR10训练ResNet50网络。
同样,在追踪视图中,可以看到反向传播的主要计算函数autograd::engine::evaluate_function:ConvolutionBackward0与集合通信的函数nccl:all_reduce执行是重叠的。
DDP反向传播中计算与通信的重叠导致无需等待所有计算完成后再集中进行通信,也不必在计算完成后等待通信完成,提高了AI集群的使用率。
前面的介绍都是基于同步的数据并行,同步的数据并行特别适用于计算资源相对均衡的情况。在同步数据并行中,每个NPU都处理数据的一个子集,并独立地计算梯度。在每次迭代中,所有NPU都将它们的梯度汇总,并通过一致的规则来更新模型参数。这样,所有NPU上的模型都保持一致,不会出现不同步的情况。
由于所有NPU在每个训练步骤中都执行相同的更新操作,模型的收敛性更容易得到保证。且所有NPU都参与到梯度更新的计算中,整体计算效率也相对较高。此外,同步数据并行还易于实现,因为所有NPU的操作都是同步的,不需要复杂的同步机制。
异步的数据并行(AsynchronousDataParallelism,ADP)可以在一定程度上解决这些问题。在异步数据并行中,不同NPU的计算过程相互独立,不再需要等待其他NPU完成计算。每个NPU都按照自己的速度进行前向和反向传播,随时将计算得到的梯度更新到模型参数中。这样,快速的NPU不再受到慢速NPU的影响,整体计算效率得到提高。异步数据并行的步骤为:
由于是异步更新,不需要等待所有NPU都完成计算后再进行更新,这样快速的NPU可以频繁更新参数,而慢速的NPU则不影响整体更新进度。
异步数据并行的优点之一是它可以充分利用集群中每个NPU的计算能力,快速的NPU不会受到慢速NPU的影响,从而提高了整体的训练速度。此外,由于每个NPU都独立地进行计算和参数更新,异步数据并行也具有较好的扩展性,能够适应不同规模的集群和不同数量、类型的NPU。
但是异步数据并行也存在一些挑战。由于计算过程是异步的,可能会出现梯度更新之间的竞争条件,需要采取一些机制来解决,如:参数服务器。同时由于计算过程不再同步,模型的收敛性可能会受到影响,需要通过调整学习率或者采用一些优化算法来弥补。
弹性训练是一种分布式机器学习训练方法,旨在提高系统在动态环境中的容错性和灵活性。其核心理念是通过动态调整训练过程中的资源分配和任务调度,以应对文点故障、资源变化等不可预测的情况,从而保证训练过程的连续性和高效性。弹性训练主要通过以下方法实现其目标:
PyTorch提供了Torchelastic组件,用于支持分布式训练过程中的弹性调度和故障恢复。它使得在大规模分布式训练环境中,可以动态调整参与训练的节点数量,并在节点发生故障时进行自动恢复,从而提高训练过程的鲁棒性和效率。Torchelastic包括ElasticAgent服务器、Rendezvous等组件。
ElasticAgent是TorchElastic的控制面板。它是一个进程,负责启动和管理底层的工作进程。其主要职责包括:
最简单的Agent部署在每个节点上,管理本地进程。更高级的Agent可以远程启动和管理工作进程。Agent可以完全去中心化,根据其管理的工作进程独立做出决策,也可以通过与其他管理同一作业的Agent协作,做出集体决策。
下图是一个管理本地工作进程组的Agent的示意图。每个Agent都会管理多个Worker,并运行一个Rendezvous模块,用于在分布式环境中实现节点的同步和发现,阻塞会持续到至少Min个ElasticAgent加入后返回。
当有新的节点加入或现有节点退出时(即成员变更),Rendezvous过程会重新开始。Rendezvous过程包括两个关键步骤:屏障操作(barrier)和排名分配(rankassignment)。屏障操作确保所有节点在达到最小节点数量之前都处于等待状态,并在达到最大节点数量后立即完成。排名分配则为每个节点分配一个唯一的rank,确保每个节点在分布式训练中的rank是明确的。
ElasticAgent持续监控本地的工作进程状态。如果检测到任何工作进程失败或不健康,ElasticAgent会立即终止所有工作进程,并重新启动。这一过程通过重新启动(respawn)工作进程来实现,确保训练任务的持续进行。
监控工作进程的健康状态是一个持续的过程,ElasticAgent不断检查工作进程的状态,并在必要时触发故障恢复机制。
ElasticAgent还具备弹性调度功能。它能够动态调整参与训练的节点数量,响应成员变更,重新启动包含新成员的工作进程。这样,即使在训练过程中有节点故障或新增节点,ElasticAgent也能够及时调整,保证训练过程的鲁棒性和灵活性。
在TorchDistributedElastic中,Rendezvous是一种功能,结合了分布式同步原语和节点发现。它用于在分布式训练作业中聚集节点,确保所有节点对节点列表及其角色达成一致,并一致决定何时开始或恢复训练。Rendezvous的关键功能有:
TorchDistributedElastic提供了DynamicRendezvousHandler类,实现上述Rendezvous机制。它是一种与后端无关的类型,需在构造时指定特定的RendezvousBackend实例。Torch分布式用户可以实现自己的后端类型,或使用PyTorch提供的实现:
以下是使用DynamicRendezvousHandler的示例代码:
store=TCPStore("localhost")backend=C10dRendezvousBackend(store,"my_run_id")rdzv_handler=DynamicRendezvousHandler.from_backend(run_id="my_run_id",store=store,backend=backend,min_nodes=2,max_nodes=4)下面是描述Rendezvous工作流程的状态图。
弹性数据并行和数据并行的启动方式一致,但有以下区别:
以下是一个训练脚本的示例,它在每个epoch上进行检查点操作,因此在失败时最坏情况下会丢失一个完整epoch的训练进度。
defmain():args=parse_args(sys.argv[1:])state=load_checkpoint(args.checkpoint_path)initialize(state)torch.distributed.init_process_group(backend=args.backend)foriinrange(state.epoch,state.total_num_epochs):forbatchiniter(state.dataset):train(batch,state.model)state.epoch+=1save_checkpoint(state)可以通过torchrun启动分布式和弹性训练,在启动弹性作业时,需要在至少MIN_SIZE节点和最多MAX_SIZE节点上运行以下命令。