跳到主要内容
分布式训练框架的编程基础与 PyTorch RPC 实践 | 极客日志
Python AI 算法
分布式训练框架的编程基础与 PyTorch RPC 实践 分布式训练框架涉及多进程通信、显存优化及异步计算。文章基于 PyTorch 详解分布式环境初始化(init_process_group)、通信算子(all_reduce, broadcast, p2p)及通信组管理。通过蒸馏 Demo 展示数据并行与 Teacher-Student 通信实现。深入讲解 register_hook 在梯度累积中的应用,以及多进程启动模式对 CUDA 初始化的影响。最后介绍 TorchRPC 作为独立于 init_process_group 的通信方式,支持远程调用与跨设备映射,为构建大规模 LLM 训练框架提供底层编程基础。
未来可期 发布于 2025/2/6 更新于 2026/4/24 4 浏览为什么会有这篇文章:虽然工作内容不是 infra,但是我比较喜欢研究训练方法,魔改训练框架造轮子。正好最近看到 OpenRLHF 用 ray 管理 VLLM 的方案,感觉很有意思,遂研究了一下,发现 VLLM 的 TP 切分和 Megatron 是一套逻辑,用 torch 的 rpc 也可以代替 ray 的远程调用,所以打算用 Megatron+TorchRPC+VLLM 实现一套类似的框架,后期再把 VLLM 原地换掉直接 megatron 推理。在开始这个大工程之前,正好有机会写下这篇文章,就算是开工仪式了。
本文的主要内容
本文主要是从编程的角度,对LLM 训练框架 所涉及的一些前置编程知识 进行讲解,并且会举一些应用技巧,对应到当前的 LLM 训练框架,辅助理解训练框架的代码逻辑。举个例子,下面是一段 megatron 初始化多卡通信组的代码:
rank = torch.distributed.get_rank()
for ranks in rank_generator.get_ranks('dp' ):
group = torch.distributed.new_group(
ranks, timeout=timeout, pg_options=get_nccl_options('dp' , nccl_comm_cfgs)
)
group_gloo = torch.distributed.new_group(ranks, timeout=timeout, backend="gloo" )
if rank in ranks:
_DATA_PARALLEL_GROUP = group
_DATA_PARALLEL_GROUP_GLOO = group_gloo
_DATA_PARALLEL_GLOBAL_RANKS = ranks
读完本文你会了解:
什么是 rank、什么是 world_size,什么是通信组 group
为什么这段代码是先建通信组,再根据 rank in ranks 决定是否保存?判断 rank in ranks 了再创建 group 行不行?
在子进程和子线程中创建通信组有什么区别和要注意的地方。
backend="gloo":什么是 backend,gloo backend 是干什么的,为什么不用 nccl backend。
再比如下面是一段 deepspeed 在参数上注册的回调函数:
def create_reduce_and_remove_grad_hooks (self ):
self .grad_accs = []
for i, param_group in enumerate (self .bit16_groups):
for param in param_group:
if param.requires_grad:
def wrapper (param, i ):
param_tmp = param.expand_as(param)
grad_acc = param_tmp.grad_fn.next_functions[ ][ ]
( ):
.reduce_ready_partitions_and_remove_grads(param, i)
._grad_acc_hooks.append(grad_acc.register_hook(reduce_partition_and_remove_grads))
.grad_accs.append(grad_acc)
wrapper(param, i)
0
0
def
reduce_partition_and_remove_grads
*notneeded
self
self
self
register_hook 是干什么,什么时候触发。
通常都是直接在参数上注册 hook,为什么这里要在参数的视图的前一个算子注册 hook:param.expand_as(param).grad_fn.next_functions[0][0]
训练框架是干什么的 目前 LLM 训练有两大主流框架:Deepspeed 和 Megatron-LM。前者的主要提出和维护者是微软的工程师,后者是英伟达的工程师。两个框架从底层原理到设计语言可以说是大相径庭。训练框架的主要目标有 2:一是在有限的 GPU 中尽可能地塞入一个大号模型,二是高效地利用多 GPU 进行训练。完成第一个目标主要依赖的是模型切分,或者更笼统地说是降低单卡显存占用。完成第二个目标依赖的是异步、高重叠度、高带宽的数据通信。Deepspeed 在降低显存这方面应用的技术主要有 Zero-1、2、3,序列并行、CPU Offload,高效通信方面主要是依赖 register_hook 回调函数的异步通信、多 cuda 事件流、GPU 计算重叠、连续锁页缓存。Megatron 在降低显存方面的主要技术有 Distributed Optimizer、Tensor Model Parallel、Pipeline Model Parallel、序列并行,在高效通信方面主要的技术有 P2P 通信、重叠流水线并行、梯度缓存。在设计语言上,DeepSpeed 属于外挂框架,框架并不介入模型前向的计算图,因此对模型结构一般没有特殊要求,核心代码通过大量回调函数和 torch 派生类封装,并不暴露给用户。Megatron 则是属于内嵌框架,直接改变模型计算图,因此限制模型结构必须是类 Transformer 的结构,代码全部暴露在外,不怎么依赖回调函数。外挂框架的好处是兼容性好,对新手友好,缺点是启动慢、计算速度略低,适合数据量不大、不关注训练加速技术,专注于模型和数据迭代的人。内嵌框架的优点是启动、训练效率高一点,对于想魔改底层的人更友好,但是对于想轻微修改训练逻辑的不太友好,适合大规模训练和想要了解、改动并行训练代码的人。
Hello World import torch
import os
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
print (f'Hello World from rank:{torch.distributed.get_rank()} out of {torch.distributed.get_world_size()} worlds.' )
假设代码命名为 t1.py,则可以用 torchrun --nproc-per-node 2 t1.py 来启动。(如果不做特殊说明,后面都用这组参数启动。后面给出的 demo 大多数没有 GPU 也可以跑,只需要将张量的创建位置改为 cpu,以及通信相关的后端全部使用 gloo 后端即可)
正常情况下,可以看到程序打印出 Hello World from rank:0 out of 2 worlds. 和 Hello World from rank:1 out of 2 worlds.
启动命令 torchrun 是安装 torch 后自带的命令,作用是帮助我们以多进程方式启动指定脚本。
在分布式环境中,我们首先要区分多机和多卡,多机是指多台运行训练任务的服务器,多卡是指一台机器上有多个显卡。我们一般称一台服务器为 node,有几台服务器就有几个 node。
那么 --nproc-per-node 从字面意思上就能看出表示每台服务器上启动多少个进程。一般来说,我们启动的进程数与服务器上的 gpu 数相同,也就是 8 卡机 nproc-per-node 设为 8。当然一台 8 卡机你可以只启动 2 个进程,也可以启动 100 个进程,这个并不是硬限制。但是保持进程数和卡数相同可以简化我们的编程逻辑,让每个进程只负责一张卡上的计算。
除了 nproc-per-node,启动命令还有几个常见参数:
–master_addr 和–master_port:当启动的是多机环境时,需要用这两个参数指定主机的 IP 地址和端口号。这两个参数在所有 master 和 slaver 机器上都是一样的,不需要修改。
–nnodes:当多机启动分布式时,使用这个参数,表明总共有多少台机器。master 会根据这个参数配置来等待 slaver 链接,直到 slaver 数量凑够 nnodes 才会启动。这个参数在每台机器上都是一样的,不需要修改。
–node_rank:表示当前 node 是全部 node 中的第几个,这个参数需要根据启动的机器进行更改,master 的 rank 必须是 0。
上述启动参数要放在 torchrun 之后,脚本路径之前。写在脚本路径之前的参数不会被传递给脚本运行的环境,所以脚本中不要处理这些参数。
rank & world size 在启动的训练脚本中,需要先初始化分布式环境,也就是下面这几行:
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
如果是使用 torchrun 启动的脚本,torch 会帮你在环境变量中写入 RANK 和 WORLD_SIZE 这两个变量,在脚本中可以读出来。它们分别表示当前进程的序号和总进程数。这个进程序号是全局的进程序号,也就是说如果每台服务器启动 8 个进程,总共 10 台服务器参与训练,那么第十台机器的 8 个进程 rank 分别是 72、73、74、75、76、77、78、79。world_size 也是全局的进程数,而不是单台服务器上的进程数。
获得 rank 和 world_size 后,就可以用 torch.distributed.init_process_group 初始化分布式环境。
在初始化之后的任何地方,都可以用 torch.distributed.get_rank() 和 torch.distributed.get_world_size() 来获取当前进程序号和总进程数。
另外需要提的一点是,torch 并没有 local_rank 这个概念。这是一些训练框架自己定义的,通常用来表示当前进程是这台服务器上的第几个进程。
后端 backend 在初始化命令中,我们还指定了 backend 参数,这个参数表示的是分布式环境默认使用的通信后端是什么,一般可以选择 nccl、gloo 和 mpi 后端。gpu to gpu 的通信选择 nccl 后端。cpu to cpu 的通信选择 gloo 后端,一般不太会用到 mpi 后端。
nccl 通信会比 gloo 通信快,所以应该尽量使用 nccl 进行通信。但是在有些时候,比如读取数据的阶段,如果多个进程之间需要通信,一般使用用 gloo 通信。因为这时,我们并不希望这些还没有开始正向传播的张量过早出现在 gpu 上,避免过多占用显存。在后面讲 group 的部分,我们会提到怎么让训练进程使用 nccl 后端,数据读取进程使用 gloo 后端。
训练脚本开始阶段的一些小细节
set device 分布式训练中,在我们创建 gpu 张量时,经常使用 torch.cuda.current_device() 获取当前 rank 使用的 cuda 设备,然后直接在这个设备上创建。使用这个函数的前提是之前通过 set_device 显式设定过默认设备。另外还有一些算子会要求用户必须执行过 set_device。
所以一个好习惯是,在执行主要的训练逻辑之前,尽早设置默认的 cuda 设备:
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
如果不手动设置,current_device 默认会返回 cuda:0。
固定随机种子 分布式训练时随机种子是一个容易忽略的事,但却是很重要的事。在脚本初始化阶段设置一个固定的随机种子有两个好处,一个是让实验可复现,另一个就是让不同 rank 的模型以相同的方式初始化,不然还要用通信操作同步一下各个模型来保证模型初始化状态的一致性。
def set_random_seed (seed ):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
通信算子 通信算子指的是在分布式训练中,能够在不同 rank 之间进行数据交换的算子。torch 提供了很多通信相关的算子,最常用可以划分为 5 类:规约、聚集、广播、点对点通信和其他。
规约 规约最典型的就是 torch.distributed.all_reduce,这个算子能够对不同 rank 之间的数据进行求和、求均值、最大值等操作,特点是不论有多少个 rank,最终结果只有一个 tensor。例如下面这段代码,作用是对所有 rank 之间的"tensor"求和:
import torch
import os
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
tensor = torch.tensor([rank + 1 ], dtype=torch.long, device='cuda' )
torch.distributed.all_reduce(tensor)
print (f'rank:{rank} {tensor} ' )
rank0 的 tensor=1,rank1 的 tensor=2,求和结果为 3,因此能看到这样的输出:
rank:0 tensor(3)
rank:1 tensor(3)
规约操作基本可以算是最常用的通信算子。例如 rank 之间通过规约操作同步梯度、计算平均 loss,再比如分布式 softmax 利用规约计算最大值、归一化分母等等。
聚集 聚集操作中最典型的是 torch.distributed.all_gather(),能够把不同 rank 的数据收集到一起,特点是参与通信的 rank 有多少,就会得到多少个 tensor。例如下面这段代码,作用是将所有 rank 的 tensor 收集到一起:
import torch
import os
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
tensor = torch.tensor([rank+1 ], dtype=torch.long, device='cuda' )
tensors = [torch.empty_like(tensor) for _ in range (world_size)]
torch.distributed.all_gather(tensors,tensor)
print (f'rank:{rank} {tensors} ' )
由于'所有'rank 都收集了'所有'tensor,所以每个 rank 都会拥有其他所有 rank 和自己的 tensor,因此会打印出如下内容:
rank:0 [tensor(1), tensor(2)]
rank:1 [tensor(1), tensor(2)]
all_gather 可以模拟 all_reduce,比如 sum(tensors) 就等于 all_reduce sum,max(tensors) 就是 all_reduce max。还有一个需要注意的点是不能使用列表乘法创建 tensors:
tensors = [torch.empty_like(tensor)] * world_size
因为除了 all_gather_object 等支持 python 变量的通信算子,大部分都是原地操作,而列表乘法创建的 tensors 每一个 tensor 都是指向同一个对象,与原地操作会发生冲突。聚集相比规约更加灵活,但是规约的效率和显存占用通常会更好。
广播 广播的典型操作是 torch.distributed.broadcast(),作用是将指定 rank 的 tensor 发送给其他所有 rank,比如下面这段代码,是将 rank0 的 tensor 发送给其他所有 rank,这样最终所有 rank 的 tensor 都是一样的
import torch
import os
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
tensor = torch.tensor([rank+1 ], dtype=torch.long, device='cuda' )
torch.distributed.broadcast(tensor,0 )
print (f'rank:{rank} {tensor} ' )
rank:0 tensor(1)
rank:1 tensor(1)
广播的特点是数据由一个节点到所有节点,通常用于将只会出现在某一个 rank 的信号发送到全部 rank。例如动态数据量训练,是否到达最后一个 batch,会以最后一个数据并行 rank 能否获得一个完整的 micro batch 作为信号。此时可以利用广播操作,将最后一个数据并行 rank 的结束信号广播到其他所有 rank。再比如在 megatron 中,当使用 tensor 并行时,仅第一个 tensor rank 会获取数据,其他 tensor rank 的数据是由第一个 tensor rank 广播来的。
点对点通信、p2p 除了上述几个所有 rank 之间的通信,有时我们也需要两个 rank 之间的两两通信,这时就会用到 p2p 通信。p2p 通信的发送方使用 torch.distributed.send(),接收方使用 torch.distributed.recv()。比如下面这段代码,功能是所有偶数 rank 将 tensor 发送到下一个奇数 rank:
import torch
import os
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
if rank % 2 == 0 :
tensor = torch.tensor([999 ], dtype=torch.long, device='cuda' )
torch.distributed.send(tensor, rank+1 )
else :
tensor = torch.empty(1 , dtype=torch.long, device='cuda' )
torch.distributed.recv(tensor,rank-1 )
print (f'rank:{rank} {tensor} ' )
rank:0 tensor(999)
rank:1 tensor(999)
上面大多数算子都是原地操作,这就带来一个问题,原地操作要求先创建一个空张量,等待通信算子把数据放进来。但是一个 rank 怎么知道被通信过来的张量 shape 是什么样的,怎么提前创建这个空张量?尤其是在广播和 p2p 通信时经常会碰到这个问题。一个常用的方案是在通信以前,先通信一个固定 ndim 的张量用来表示接下来要通信的张量的 shape,然后再通信真正的数据,比如下面这样:
import torch
import os
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
if rank % 2 == 0 :
tensor = torch.randn(1 , 4 , dtype=torch.float16, device='cuda' )
shape_tensor = torch.tensor(tensor.size(), dtype=torch.long, device='cuda' )
torch.distributed.send(shape_tensor, rank+1 )
torch.distributed.send(tensor, rank+1 )
else :
shape_tensor = torch.empty(2 , dtype=torch.long, device='cuda' )
torch.distributed.recv(shape_tensor, rank-1 )
tensor = torch.empty(torch.Size(shape_tensor), dtype=torch.float16, device='cuda' )
torch.distributed.recv(tensor, rank-1 )
print (f'rank:{rank} {tensor} ' )
这种方法有点像定义一种通信协议,第一次握手通信 shape,第二次通信数据。如果在使用时 ndim 也固定不下来,或者 tensor 的 dtype 也需要通信,那么我们就可以像定义通信协议一样,定义一个长一点的定长 shape tensor,比如 shape=(10,) ,用前 9 位表示接下来要通信的数据张量的 shape,不足的位置补 0,最后一位用一个数字表示数据类型。点对点通信主要的应用场景一个是 pipeline 并行,由上一个 pipe 发送数据到下一个 pipe,一个是蒸馏的 teacher 模型发送 probabilities 给 student,以及类似的 reference-polocy model。
其他通信算子 还有一个经常使用的是同步屏障 torch.distributed.barrier(),这个操作不通信任何数据,作用是确保所有进程都运行到此处后再开始之后的动作。比如当存储 checkpoint 时,我们通常只会让第一个数据并行的 rank 进行保存,其他 rank 此时就应该使用同步屏障等待第一个 rank 保存结束。这样可以避免其他 rank 提前开始新的计算或提前结束导致保存失败,例如:
import torch
import os
import time
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
if rank == 0 :
time.sleep(20 )
else :
a = 1 + 2
torch.distributed.barrier()
除了上面这几种,还有 all-to-all 算子和 scatter 算子。在 LLM 场景中经这两个算子经常出现在序列并行的正、反向传播中。
在使用通信算子时,需要确保所有相关 rank 都要执行到这一段代码,不然已经执行到这一步的 rank 会 hang 住一直等待,导致程序无法继续。
关于通信模式 上面提到的通信算子基本都对应有自己特殊的通信模式,比如 reduce 算子的背后有 tree-reduce 和 ring-reduce 通信模式,广播、p2p 等等也都分别对应自己的通信模式。在 LLM 训练框架开发这个层面,我们可能需要关注下 ring-reduce 和 tree-reduce 这两种通信模式。按理说这两种通信模式属于 all_reduce 的底层实现,torch 这个层面应该不需要过多关注,但是我在实际开发中多次遇到相关问题,比如:
all_reduce 会根据通信数据量、GPU 的数量、NVLink 连接结构来推断用 ring 还是 tree,这导致在模型并行数、卡数变化时,同一个 all_reduce 操作使用不同的通信模式,有时候会导致一些内存溢出的 bug,尤其是当使用 tree-reduce 且通信环境比较复杂的时候。
ring-reduce 和 tree-reduce 在不同类型的 GPU 上精度损失是不一样的,知乎上有一篇很精彩的 debug 文就是关于追查这个问题的:AI 训练与计算:由 A800 平台训练 InternLM-7B 无法收敛引发的思考
我在 torch 2.3 版本还遇到了 communicator 创建了错误数量的 ranks、cuBus 错误复用等问题,2.4 得到修复。
当你怀疑可能自己也遇到了这类问题时,可以考虑通过环境变量 NCCL_ALGO 强制指定使用 tree 还是 ring。或者尝试用 all_gather 替换 all_reduce,或者更新 torch 版本。
当你打算涉足通信,就意味着你即将离开 torch 稳定的后方,站上痛苦 debug 的前线。
通信组 上面的提到的算子,除了 p2p 通信,基本都是所有 rank 参与的通信,如果只能这样未免有些太死板。想要前 5 个 rank all_reduce,后 5 个 rank all_gather 应该怎么做?广播只广播给奇数 rank 怎么做?一部分操作在 gpu 上用 nccl 通信,一部分在 cpu 上用 gloo 通信怎么实现?如果 2 个 rank 同时用 p2p 通信不同张量,怎么做区分?这里就轮到通信组登场了。
注:之后的例子会更复杂一点,之后的所有脚本都会启动 4 个 rank,也就是在此之后的所有 --nproc-per-node 都为 4
创建单个通信组 ranks = [0 ,1 ]
group = torch.distributed.new_group(ranks,backend='nccl' )
上面这段代码的意思是创建 1 个通信组,包含 rank0 和 rank1,以 nccl 作为后端。在使用通信算子时,我们可以通过 group 参数指定通信组,这样数据交换就只会发生在组内。比如当你这样使用 barrier 时,可以指定通信组,这样只要组内的 rank 都到达 barrier,就可以继续,而不是等待所有 rank 都到达 barrier 才能继续:
import torch
import os
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
ranks = [0 ,1 ]
group = torch.distributed.new_group(ranks)
if rank in ranks:
torch.distributed.barrier(group=group)
print (f'rank:{rank} finish' )
else :
torch.distributed.barrier()
print (f'rank:{rank} finish' )
rank:0 finish
rank:1 finish
rank:2 finish (blocked)
rank:3 finish (blocked)
因为 rank0、1 的 barrier 指定了通信组,所以只要 0、1 两个 rank 运行到 barrier 就可以继续,打印出 finish。rank2、3 的 barrier 没有指定通信组,因此他们会等待所有 rank 到达,但是 rank0、1 并不会进入这个 else 分支,所以 rank2、3 会卡住。
所有通信相关算子都支持指定通信组,作用都是类似的,就是把之前全部 rank 间的通信变为组内 rank 间的通信。对于 p2p 通信来说,group 还有一个重要作用就是可以用来作为通信标识符。如果两个 rank 间同时进行多个 p2p 通信,不同的 group 可以用于区分不同的通信。
创建多个通信组 通信组可以创建多个,并且每个可以指定使用不同的后端,方便进行 cuda 或 cpu 张量的混合通信:
group1 = torch.distributed.new_group([0 ,1 ])
group2 = torch.distributed.new_group([2 ,3 ])
group3 = torch.distributed.new_group([1 ,2 ,3 ])
group4 = torch.distributed.new_group([0 ,3 ], backend='gloo' )
在使用通信组时,rank 自身必须是这个通信组的成员,比如 rank3 不能使用 group1。那么既然 rank3 不能使用 group1,那 rank3 能不能干脆就不创建 group1,只创建 group2、3、4 呢?答案是不行。torch 对创建通信组有两个要求:
通信组中涉及到的所有 rank 必须全都执行创建通信组的代码
比如在 rank0 创建的第一个通信组是 [0, 1],那么 rank1 创建的第一个也必须这个,rank2、3 也一样,即使他们不能用这个通信组。所以多通信组的创建代码一般是长这样的:
group = None
rank = torch.distributed.get_rank()
for ranks in [[0 ,1 ],[2 ,3 ]]:
_group = torch.distributed.new_group(ranks)
if rank in ranks:
group = _group
这样保证 group 的创建顺序是一致的,并且只保留自己这个 rank 能用的组。想一想这个设计也是合理的,想象一下两个并行的进程,相互之间是访问不到对方变量的,那两个进程怎么知道对方是用哪个通信组呢?首先构成通信组的 rank 不行,因为可以两个组可以由相同的 rank 组成,这个不具有唯一性。创建时间也不行,因为每个 rank 并不是完全同步的。可以人为地为每个组指定一个唯一 id,那自增 id 也可以。
说到这里,其实基础的 torch 分布式训练功能就都讲完了,下面我们做个把这些功能都用上的小 demo。
分布式训练 demo 我们以语言模型为背景,实现一个蒸馏框架,这里 teacher 和 student 模型是模拟语言模型输入输出的假模型。蒸馏框架支持数据并行,训练数据也是模拟的。分布式优化器要自己实现。框架要支持 teacher 和 student 的重叠计算和数据通信。(如果有兴趣的话,推荐在 megatron 框架里实现一个蒸馏框架)
下面我们一步一步来完成,部分上面已经实现过的函数这里就不再实现了。写 demo 我习惯讲一步写一步,并且不给出可以直接执行的完整代码。
我们写一个模拟语言模型输入的迭代器来生成假的数据。语言模型的输入是 token 序列,也就是 shape = [batch_size, seq_length] 值为 0 - vocab_size 的整型张量,这里 seq_length 我们每次随机一个值。
import torch
class Dataloader :
def __init__ (self,batch_size, max_length, vocab_size ):
self .batch_size = batch_size
self .max_length = max_length
self .vocab_size = vocab_size
def __iter__ (self ):
while True :
length = torch.randint(2 ,self .max_length,size=(1 ,))
input_ids = torch.randint(0 ,self .vocab_size,size=(self .batch_size,length),device='cpu' )
yield input_ids
teacher 和 student 模型都是语言模型,输入是 token 序列,输出是 token 序列的词表概率,也就是输出一个 shape = [batch_size, seq_length, vocab_size] 的浮点张量。这里我们假设参数就假设只有一个 lm head 头。
class Model (torch.nn.Module):
def __init__ (self, vocab_size ):
super ().__init__()
self .lm_head = torch.nn.Parameter(torch.randn(1 , vocab_size, dtype=torch.float16))
def forward (self,input_ids:torch.Tensor ):
logits = input_ids.unsqueeze(-1 ).to(self .lm_head.dtype) @ self .lm_head
probs = logits.softmax(-1 )
return probs
我们要实现一个分布式优化器用来更新 student 模型的参数。因为我们使用的是数据并行,梯度分散在每张卡上,在更新模型参数前,需要先进行一次 all_reduce,把所有梯度加在一起。这里优化器需要传入一个通信组参数,包含所有学生 rank,因为 teacher 模型没有优化器,不参与 all_reduce。
class DistrubutedAdam (torch.optim.Adam):
def __init__ (self, *args, group=None , **kwargs ):
self .group = group
super ().__init__(*args, **kwargs)
def step (self, closure=None ):
if closure is not None :
closure.mean().backward()
for group in self .param_groups:
for param in group['params' ]:
if param.grad is not None :
torch.distributed.all_reduce(param.grad, group=self .group)
super ().step()
数据并行的通信组,用于区分模型的角色(student 还是 teacher),以便优化器只对当前角色的全部模型进行规约。
teacher-student 通信组,用于标记需要相互之间传递数据的 teacher 和 student。这个通信组在本例里这种简单通信设定下不是必要的,不创建也可以。
world_size 总数要求是偶数,前一半用来当 student,后一半是 teacher。
student0 和 teacher0 配对通信数据,student1 与 teacher1 配对通信数据,…
data_parallel_group = None
teacher_student_group = None
def create_group ():
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
assert world_size % 2 == 0
student_ranks = list (range (world_size // 2 ))
teacher_ranks = list (range (world_size // 2 , world_size))
global data_parallel_group
global teacher_student_group
for ranks in [student_ranks,teacher_ranks]:
group = torch.distributed.new_group(ranks, backend='nccl' )
if rank in ranks:
data_parallel_group = group
for ranks in zip (student_ranks,teacher_ranks):
group = torch.distributed.new_group(ranks, backend='nccl' )
if rank in ranks:
teacher_student_group = group
teacher 模型计算出的 prob 要发送给 student 模型。每条数据只需被一个 teacher 模型计算,teacher 模型计算结果也只需要发给一个 student,所以这里用 p2p 通信。我们的输入序列长度是随机变化的,所以这里需要用到上面提到的动态 shape 的 p2p 通信。之前说 p2p 算子用的是 send 和 recv,但是这两个算子是同步算子,这里我们用异步算子。
def send_tensor (tensor, dst ):
shape = torch.tensor(tensor.shape, dtype=torch.int64, device='cuda' )
ops = []
ops.append(torch.distributed.P2POp(
torch.distributed.isend,
shape,
dst,
group=teacher_student_group
))
reqs = torch.distributed.batch_isend_irecv(ops)
for req in reqs:
req.wait()
ops = []
ops.append(torch.distributed.P2POp(
torch.distributed.isend,
tensor,
dst,
group=teacher_student_group
))
reqs = torch.distributed.batch_isend_irecv(ops)
for req in reqs:
req.wait()
这里实现的简单点,tensor 的 ndim 和 dtype 作为参数传入
def recv_tensor (src, ndim, dtype ):
shape = torch.empty(ndim, dtype=torch.int64, device='cuda' )
ops = []
ops.append(torch.distributed.P2POp(
torch.distributed.irecv,
shape,
src,
group=teacher_student_group
))
reqs = torch.distributed.batch_isend_irecv(ops)
for req in reqs:
req.wait()
tensor = torch.empty(torch.Size(shape), dtype=dtype, device='cuda' )
ops = []
ops.append(torch.distributed.P2POp(
torch.distributed.irecv,
tensor,
src,
group=teacher_student_group
))
reqs = torch.distributed.batch_isend_irecv(ops)
for req in reqs:
req.wait()
return tensor
注:异步 p2p 操作是先创建算子(op),再批量执行(batch_isend_irecv),可以增加并行度。比如在 megatron 的 pp 平行中,向后一张卡发送计算结果和接收后一张卡回传梯度是同时进行的。
上面各个重要模块的功能已经实现完了,最后还剩下主函数。首先还是环境初始化,设置默认 cuda 设备。然后创建通信组,定义 vocab size 为 20,计算 teacher rank 的 offset:
if __name__ == '__main__' :
rank = int (os.getenv('RANK' ,'0' ))
world_size = int (os.getenv('WORLD_SIZE' ,'1' ))
torch.distributed.init_process_group(rank=rank, world_size=world_size, backend='nccl' )
devices = torch.cuda.device_count()
torch.cuda.set_device(rank % devices)
create_group()
vocab_size = 20
teacher_offset = world_size // 2
rank 小于 offset 的都是 student,首先设置随机种子,然后初始化数据集、模型、优化器。
load 到 input_ids 之后,student 先发送给 teacher,然后再开始计算,计算得到 prob 后再从 teacher 接收 tensor,来实现 student 和 teacher 的重叠。
计算 kl 散度作为 loss,反向传播,然后调用 optimizer.step 更新模型参数。
打印变量,让我们看到每个 rank 的 loss 和总的平均 loss,以及打印当前模型的前两个参数。
loss 在下降,且每个 rank 模型参数都始终是相同的我们的代码才算是正确的。
if rank < teacher_offset:
torch.random.manual_seed(1 )
dataloader = Dataloader(1 ,200 ,vocab_size)
model = Model(vocab_size).half().cuda()
optimizer = DistrubutedAdam(model.parameters(), lr=1e-2 , group=data_parallel_group, eps=1e-4 )
for i,input_ids in enumerate (dataloader):
if i % teacher_offset != rank:
continue
optimizer.zero_grad()
input_ids = input_ids.cuda()
send_tensor(input_ids, rank + teacher_offset)
student_probs = model(input_ids)
teacher_probs = recv_tensor(rank + teacher_offset, 3 , torch.float16)
kl_loss = teacher_probs * ((teacher_probs + 1e-5 ).log() - (student_probs + 1e-5 ).log())
kl_loss = kl_loss.sum (-1 ).mean() / torch.distributed.get_world_size(data_parallel_group)
kl_loss.backward()
optimizer.step()
reporting_kl_loss = kl_loss.clone()
torch.distributed.all_reduce(reporting_kl_loss, group=data_parallel_group)
print (f'rank:{rank} reporting kl loss:{reporting_kl_loss} kl loss:{kl_loss} weight:{model.lm_head.data[0 ,:2 ]} ' ,flush=True )
torch.distributed.barrier(group=data_parallel_group)
if i >= 10 :
break
else :
一般所有 rank 的随机种子我们都设成一样的就行,这里故意把 teacher 的随机种子设成不一样的,避免 teacher 和 student 计算出来的 prob 完全一致,kl 始终为 0。
if rank < teacher_offset:
else :
torch.random.manual_seed(2 )
model = Model(vocab_size).half().cuda()
model.eval ()
while True :
input_ids = recv_tensor(rank - teacher_offset, 2 , torch.int64)
teacher_probs = model(input_ids)
send_tensor(teacher_probs, rank - teacher_offset)
rank:0 reporting kl loss:tensor(0.0001) kl loss:tensor(0.0001) weight:tensor([-0.1234, 0.5678], dtype=torch.float16)
rank:1 reporting kl loss:tensor(0.0001) kl loss:tensor(0.0001) weight:tensor([-0.1234, 0.5678], dtype=torch.float16)
把训练 10 条数据退出的逻辑去掉,应该能看到 loss 很快降到 1e-4 以下,这样 demo 就算完成了。
这个 demo 并不是最优的蒸馏框架。首先 teacher 和 student 的参数量不一样,且 teacher 不进行反向传播,因此两者的计算速度不一样,一个 teacher 配一个 student 效率并不一定高。可能会需要几个 teacher 对 1 个 student,或者 1 个 teacher 对几个 student 的情况,这是完全体框架要实现的。其次,目前主流模型的词表大小在 15 万左右,训练数据的长度一般是 8k,也就是最后的 teacher_probs 是一个 8k * 150k 的 float 张量,通信成本太高。一种优化策略是 sample 一部分,比如取 top-n,或者放回、不放回采样。另一个策略是把 teacher 的 lm_head 层放到 student 所在的 rank。在计算 logits 之前,把 teacher 的 hidden_states 通信过来,在 student 本地乘 lm_head 算出 logits。
register_hook register_hook 虽然不是分布式相关的功能,但基本每个框架都会用到。register_hook 的作用是在参数或算子上注册一个回调函数,当该参数或算子的梯度计算完成,但还没有赋值给 grad 的时候调用。如果回调函数有返回值,会使用返回值替换原本的梯度。
import torch
def print_grad (grad ):
print (grad)
return grad / 2
w = torch.nn.Parameter(torch.randn(2 , 2 ))
w.register_hook(print_grad)
loss = (w - 1 ) ** 2
print ('before backward' )
loss.mean().backward()
print ('after backward' )
print (w.grad)
before backward
tensor([[...]])
after backward
tensor([[...]])
用 0 替换掉梯度里的 nan 值是一些文章介绍 register_hook 给出的例子,但是实际编程我不推荐这么做。我建议遇到 nan 直接抛出异常,不要改成某个安全值然后更新模型,否则模型出了问题完全无法定位。
register_hook 不仅可以把回调函数注册在参数上,还可以注册在算子上,这也是各个框架对 register_hook 的主要用法。比如下面这个操作,就是注册在了加法算子上:
import torch
def parameter_hook (grad ):
print ('parameter hook' )
def operator_hook (*grads ):
print ('operator_hook' )
w = torch.nn.Parameter(torch.randn(2 , 2 ))
w.register_hook(parameter_hook)
print ('first' )
y = w + 1
op1 = y.grad_fn
print (op1)
op1.register_hook(operator_hook)
y.sum ().backward()
print ('second' )
z = w + 1
op2 = z.grad_fn
print (op2)
z.sum ().backward()
first
<AddBackward0 object at ...>
operator_hook
parameter_hook
second
<AddBackward0 object at ...>
operator_hook
parameter_hook
算子一般都是一次性的,且是先执行算子的回调再执行参数的回调。但是有一个特殊的算子是梯度累积算子,它的回调函数发生在参数的回调函数之后,且这个算子不会每次都创建新的。
import torch
def parameter_hook (grad ):
print ('parameter hook' )
def operator_hook (*grads ):
print ('operator_hook' )
w = torch.nn.Parameter(torch.randn(2 , 2 ))
w.register_hook(parameter_hook)
y = w + 1
op = y.grad_fn.next_functions[0 ][0 ]
print (op)
op.register_hook(operator_hook)
print ('first' )
y.sum ().backward()
print ('second' )
z = w + 1
op2 = z.grad_fn.next_functions[0 ][0 ]
print (op2)
z.sum ().backward()
很多框架会围绕着梯度累计算子的这个特性展开。为了获得梯度累积算子,需要创建一个计算图。一般用 expand_as,这个计算结果的 grad_fn 指向的是 expand_as 自己,next_functions 指向的是上一个算子,也就是梯度累积算子:
grad_acc_op = w.expand_as(w).grad_fn.next_functions[0 ][0 ]
然后可以利用闭包注册一个 hook,让 hook 能够直接访问参数而不仅仅是梯度。
def make_grad_hook (param ):
def hook (*grads ):
print (param.grad)
return hook
grad_acc_op.register_hook(make_grad_hook(w))
我们可以简单看下 megatron 和 deepspeed 的代码,看看他们是怎么用的。下面这段是 megatron 的用法:
megatron 的优化器并不使用 param.grad,而是自己在参数上注册了一个 param.main_grad,用它来累积梯度。这个 main_grad 不会删除,只会累积和清 0。然后在最后一个 microbatch 的时候做 allreduce,而不是等优化器来做:
deepspeed 因为要支持连续内存和 cpu offload,逻辑更加复杂,会等积攒的 grad 数量够了一批一批的操作:
会使用不同事件流(stream)来增加计算和梯度累积的重叠度:
使用了锁页内存,并且会在 cpu 和 gpu 之间来回来去传递张量来进行计算。
有兴趣的读者可以自己研究一下,deepspeed 的代码我一直没机会仔细读一遍。
多进程 在理解 torch 分布式训练时,多进程这个概念是一直伴随我们左右的。使用 torchrun 启动脚本,就是以多进程方式启动脚本。这里我们还可以再深入了解一下 torch 与多进程。
import multiprocessing as mp
def main (rank, world ):
print (rank, world)
if __name__ == '__main__' :
world_size = 4
ps = [mp.Process(None , main, args=(rank, world_size)) for rank in range (world_size)]
for p in ps:
p.start()
for p in ps:
p.join()
这种多进程能不能拿来作为 torch 的分布式训练环境呢?当然是可以的,只需要这样操作:
import multiprocessing as mp
import torch
import torch.distributed
def main (rank, world_size, master_addr='127.0.0.1' , master_port=29500 ):
init_method = f'tcp://{master_addr} :{master_port} '
torch.distributed.init_process_group(rank=rank,world_size=world_size,init_method=init_method,backend='nccl' )
print (f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()} ' )
if __name__ == '__main__' :
world_size = 4
ps = [mp.Process(None , main, args=(rank, world_size)) for rank in range (world_size)]
for p in ps:
p.start()
for p in ps:
p.join()
这里 init_method 就是用来在初始化阶段,实现进程发现的方法,除了 tcp,还可以用本地文件发现或者环境变量。除了 multiprocessing 的多进程,用 subprocess 的多进程也是一样可以的。当然 torch 也提供了一种启动多进程的方法:
import torch
def main (rank, world_size, master_addr='127.0.0.1' , master_port=29500 ):
init_method = f'tcp://{master_addr} :{master_port} '
torch.distributed.init_process_group(rank=rank,world_size=world_size,init_method=init_method,backend='nccl' )
print (f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()} ' )
if __name__ == '__main__' :
world_size = 4
torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size)
import multiprocessing as mp
import torch
import torch.distributed
def sub_process (rank, world_size, master_addr='127.0.0.1' , master_port=29500 ):
init_method = f'tcp://{master_addr} :{master_port} '
torch.distributed.init_process_group(rank=rank, world_size=world_size, init_method=init_method, backend='nccl' )
torch.distributed.barrier()
print (f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()} ' )
def main (rank, world_size, master_addr='127.0.0.1' , master_port=29500 ):
init_method = f'tcp://{master_addr} :{master_port} '
process = mp.Process(None , sub_process, args=(rank + world_size, 2 * world_size,))
process.start()
torch.distributed.init_process_group(rank=rank, world_size=2 *world_size, init_method=init_method, backend='nccl' )
torch.distributed.barrier()
print (f'rank:{torch.distributed.get_rank()} world_size:{torch.distributed.get_world_size()} ' )
if __name__ == '__main__' :
world_size = 2
ps = [mp.Process(None , main, args=(rank, world_size)) for rank in range (world_size)]
for p in ps:
p.start()
for p in ps:
p.join()
在 torch 的多进程中,再次启动子进程有一点需要注意的地方。那就是如果在启动子进程之前触发了任何与 cuda 相关的操作,比如使用了 set_device,或者在 cuda 上创建了一个张量,那么子进程中就不能再使用 cuda。比如下面这段代码:
import multiprocessing as mp
import torch
import torch.distributed
def sub_process ():
tensor = torch.tensor([2 ]).cuda(0 )
if __name__ == '__main__' :
torch.cuda.set_device(0 )
process = mp.Process(None ,sub_process)
process.start()
process.join()
RuntimeError: Cannot re-initialize CUDA in forked subprocess
这个报错的意思是,cuda 环境只能初始化一次,并且与进程绑定。Linux 上创建的子进程默认使用的是 fork 的方式。fork 创建的子进程会继承父进程的内存空间,因此已经绑定了父进程的 cuda 环境被继承给了子进程,子进程使用 cuda 就会报错。报错中要求子进程以 spawn 方式启动,是因为 spawn 方式启动的子进程使用的是全新的解释器,cuda 还处于未初始化的状态。
这里用的时候需要权衡清楚,究竟需不需要子进程继承父进程的内存,以及是否需要在子进程使用 cuda。子进程如果用 spawn 方式启动不继承父进程,可能需要单独初始化分布式环境,父进程的全局变量子进程也用不了。如果用 fork 方式启动继承父进程内存,意味着继承了父进程创建的各种变量,以及父进程初始化过的分布式环境但是不能用 cuda。另外需要注意的是,就算用 fork 方式启动,子进程也继承不了父进程创建的通信组,但是会继承'通信组的创建顺序'。意思是如果 rank0 顺序创建了 5 个 group,rank1 创建的 3 个 group,然后用 fork 方式启动了一个子进程,子进程又创建了 2 个,这 2 个会去对应 rank0 的第 4、5 个 group。
说了这么多,多进程好像挺麻烦,那他相比 torchrun 有啥好处呢?
好处就是可以更加灵活的使用 init_process_group 初始化环境,以区分不同角色。比如上面的我们的 demo 中的这个蒸馏场景,我们是 4 个 rank,分成 2 个 student2 个 teacher,通信还不是很复杂。那如果 student 和 teacher 不是各占一卡,而是用了 3d 混合并行占很多卡,相互之间还有 tp、pp 的通信,通信逻辑就很复杂了。我们可以考虑给 teacher 和 student 分别用一套不同的 rank、world_size 个 init_method 初始化,让他们在这个分布式环境中只能看见自己这个角色的进程,这样就只需要实现自己的 3d 混合并行就可以了。
再比如如果你想在自己的训练环境中引入一个 VLLM 模型。VLLM 内部是会调用 init_process_group 创建自己的环境,使用自己的 rank 和 world_size 来实现 TP 并行的,和你训练环境的 init_process_group 是冲突的。这个时候使用自定义的多进程,就可以减小 VLLM 的干扰。
问题来了,每个角色都使用独立的分布式环境,相互之间怎么通信呢?这就是最后的部分,TorchRPC。
RPC TorchRPC 原本的用法是在本地创建远程变量的引用,在本地调用远程函数。但是我觉得这种编程不灵活且抽象,堪比 tf 的静态图,所以我这里不把 rpc 当作远程调用,只把他当作对 p2p 算子的封装,以及除 init_process_group 之外第二种建立 rank 间通信的方式。
rpc 的初始化和分布式环境很像。rpc 的初始化和 init_process_group 可以同时存在,且可以使用不同的 rank 和 world_size:
import multiprocessing as mp
import torch
def main (rank,world_size ):
torch.distributed.init_process_group(rank=0 ,world_size=1 ,backend='nccl' ,init_method=f'tcp://127.0.0.1:{29500 +rank} ' )
options = torch.distributed.rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30001' )
torch.distributed.rpc.init_rpc(f'worker-{rank} ' , rank=rank, world_size=world_size, rpc_backend_options=options)
print (f'rank: {torch.distributed.get_rank()} ' ,
f' world_size: {torch.distributed.get_world_size()} ' ,
f' {torch.distributed.rpc.get_worker_info()} ' )
torch.distributed.rpc.shutdown()
if __name__ == '__main__' :
world_size = 4
ps = [mp.Process(None ,main,args=(rank,world_size)) for rank in range (world_size)]
for p in ps:
p.start()
for p in ps:
p.join()
rank: 0 world_size: 4 worker-info...
这里需要注意一点,因为我们每个 rank 都独立各自初始化分布式环境,互不干扰,因此 init_method 的 port 要换一下。
我们在简单重写一下之前的蒸馏 demo,主要为了演示在这种用法下如何使用 rpc 通信。
import multiprocessing as mp
import torch
import torch.distributed
from torch.distributed import rpc
class Model :
def __call__ (self, tensor ) -> torch.Any :
return tensor + 1
def call_model (tensor ):
return model(tensor)
model = None
def teacher (rank,world_size ):
torch.distributed.init_process_group(rank=0 ,world_size=1 ,backend='nccl' ,init_method=f'tcp://127.0.0.1:{29500 +rank} ' )
options = rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000' )
global model
model = Model()
rpc.init_rpc('teacher' , rank=rank, world_size=world_size, rpc_backend_options=options)
rpc.shutdown()
先初始化分布式环境,然后创建模型,赋值给全局变量,然后再初始化 rpc,确保 rpc 初始化后模型一定已经准备好了,最后 shutdown 等待。这里初始化分布式环境只是模拟一下,RPC 本身并不依赖分布式环境。
def student (rank,world_size ):
torch.distributed.init_process_group(rank=0 ,world_size=1 ,backend='nccl' ,init_method=f'tcp://127.0.0.1:{29500 +rank} ' )
options = rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000' )
rpc.init_rpc('student' , rank=rank, world_size=world_size, rpc_backend_options=options)
input_ids = torch.randn(4 )
teacher_probs = rpc.rpc_async('teacher' , call_model, args=(input_ids,))
student_probs = input_ids
loss = teacher_probs.wait() - student_probs
print (loss)
rpc.shutdown()
student 也是先初始化分布式环境和 rpc,然后模拟一下输入数据 input_ids。然后通过 rpc.rpc_async 异步远程调用 teacher 进程的 call_model 函数。此时 teacher 进程的全局变量应该已经有值了,call_model 可以正常返回。这里使用异步调用,不需要等待结果,直接继续。下面就是假装 student 在计算,得到 student_probs,然后计算一下差值,使用 teacher_probs.wait() 等待远程调用的结果,不出意外应该等于全 1 向量。
def main (rank, world_size ):
teacher_offset = world_size // 2
if rank < teacher_offset:
student(rank, world_size)
else :
teacher(rank, world_size)
if __name__ == '__main__' :
world_size = 2
ps = [mp.Process(None ,main,args=(rank,world_size)) for rank in range (world_size)]
for p in ps:
p.start()
for p in ps:
p.join()
RPC 后端对 NVlink、IB 等都是支持的,也支持传递 cuda 张量,只需要在初始化 rpc 环境时,指定一下本机 cuda 和远程 cuda 的映射。
rpc.TensorPipeRpcBackendOptions(init_method='tcp://127.0.0.1:30000' , device_maps={'teacher' :{0 :1 }})
这个表示把 teacher 进程的 cuda1 映射到本地的 cuda0,这样本地 cuda0 张量通信到远端时就会被放到 cuda1,不需要移动到 cpu。
最后再说一下,RPC 的官方用法是远程调用和远程引用,可以去看官网教程。
后记 目前已经在 Megatron-RPC 框架下实现了 SFT、DPO、Distillation 和 on-policy RS,性能持平 P2P 通信,远超一些非 IB、NVLink 的通信方案,证明 RPC 方案确实可行。RPC 是 torch1.4 版本就引入的特性,支持远程引用(本地创建一个模型,但是占用远程机器的显存)、链式异步调用(以异步函数的方式调用远程模型,且调用过程中支持继续调用其他远程模型)和自动求导(远程调用返回的结果可以求导,梯度传递给远程模型)。不得不感慨 torch 确实有前瞻性,以前都没太关注过这个特性。
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online