PyTorch DDP Source Code: DistributedSampler

The full source of torch.utils.data.distributed.DistributedSampler is reproduced below, followed by a line-by-line walkthrough.
import math
from typing import TypeVar, Optional, Iterator

import torch
from . import Sampler, Dataset
import torch.distributed as dist

__all__ = ["DistributedSampler", ]

T_co = TypeVar('T_co', covariant=True)


class DistributedSampler(Sampler[T_co]):
    r"""Sampler that restricts data loading to a subset of the dataset.

    It is especially useful in conjunction with
    :class:`torch.nn.parallel.DistributedDataParallel`. In such a case, each
    process can pass a :class:`~torch.utils.data.DistributedSampler` instance as a
    :class:`~torch.utils.data.DataLoader` sampler, and load a subset of the
    original dataset that is exclusive to it.

    .. note::
        Dataset is assumed to be of constant size and that any instance of it always
        returns the same elements in the same order.

    Args:
        dataset: Dataset used for sampling.
        num_replicas (int, optional): Number of processes participating in
            distributed training. By default, :attr:`world_size` is retrieved from the
            current distributed group.
        rank (int, optional): Rank of the current process within :attr:`num_replicas`.
            By default, :attr:`rank` is retrieved from the current distributed
            group.
        shuffle (bool, optional): If ``True`` (default), sampler will shuffle the
            indices.
        seed (int, optional): random seed used to shuffle the sampler if
            :attr:`shuffle=True`. This number should be identical across all
            processes in the distributed group. Default: ``0``.
        drop_last (bool, optional): if ``True``, then the sampler will drop the
            tail of the data to make it evenly divisible across the number of
            replicas. If ``False``, the sampler will add extra indices to make
            the data evenly divisible across the replicas. Default: ``False``.

    .. warning::
        In distributed mode, calling the :meth:`set_epoch` method at
        the beginning of each epoch **before** creating the :class:`DataLoader` iterator
        is necessary to make shuffling work properly across multiple epochs. Otherwise,
        the same ordering will be always used.

    Example::

        >>> # xdoctest: +SKIP
        >>> sampler = DistributedSampler(dataset) if is_distributed else None
        >>> loader = DataLoader(dataset, shuffle=(sampler is None),
        ...                     sampler=sampler)
        >>> for epoch in range(start_epoch, n_epochs):
        ...     if is_distributed:
        ...         sampler.set_epoch(epoch)
        ...     train(loader)
    """

    def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
                 rank: Optional[int] = None, shuffle: bool = True,
                 seed: int = 0, drop_last: bool = False) -> None:
        if num_replicas is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            num_replicas = dist.get_world_size()
        if rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            rank = dist.get_rank()
        if rank >= num_replicas or rank < 0:
            raise ValueError(
                f"Invalid rank {rank}, rank should be in the interval [0, {num_replicas - 1}]")
        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.drop_last = drop_last
        # If the dataset length is evenly divisible by # of replicas, then there
        # is no need to drop any data, since the dataset will be split equally.
        if self.drop_last and len(self.dataset) % self.num_replicas != 0:  # type: ignore[arg-type]
            # Split to nearest available length that is evenly divisible.
            # This is to ensure each rank receives the same amount of data when
            # using this Sampler.
            self.num_samples = math.ceil(
                (len(self.dataset) - self.num_replicas) / self.num_replicas  # type: ignore[arg-type]
            )
        else:
            self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)  # type: ignore[arg-type]
        self.total_size = self.num_samples * self.num_replicas
        self.shuffle = shuffle
        self.seed = seed

    def __iter__(self) -> Iterator[T_co]:
        if self.shuffle:
            # deterministically shuffle based on epoch and seed
            g = torch.Generator()
            g.manual_seed(self.seed + self.epoch)
            indices = torch.randperm(len(self.dataset), generator=g).tolist()  # type: ignore[arg-type]
        else:
            indices = list(range(len(self.dataset)))  # type: ignore[arg-type]

        if not self.drop_last:
            # add extra samples to make it evenly divisible
            padding_size = self.total_size - len(indices)
            if padding_size <= len(indices):
                indices += indices[:padding_size]
            else:
                indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
        else:
            # remove tail of data to make it evenly divisible.
            indices = indices[:self.total_size]
        assert len(indices) == self.total_size

        # subsample
        indices = indices[self.rank:self.total_size:self.num_replicas]
        assert len(indices) == self.num_samples

        return iter(indices)

    def __len__(self) -> int:
        return self.num_samples

    def set_epoch(self, epoch: int) -> None:
        r"""
        Set the epoch for this sampler.

        When :attr:`shuffle=True`, this ensures all replicas
        use a different random ordering for each epoch. Otherwise, the next iteration of this
        sampler will yield the same ordering.

        Args:
            epoch (int): Epoch number.
        """
        self.epoch = epoch

Claude-3.5-Sonnet (via Poe):

Let me walk through this code in detail, line by line, and explain how the data ends up partitioned across multiple GPUs.

  1. The imports:

import math  # used for the ceil() arithmetic below
from typing import TypeVar, Optional, Iterator  # type hints
import torch
from . import Sampler, Dataset  # base classes (relative import inside torch.utils.data)
import torch.distributed as dist  # distributed communication module

# Names exported by this module
__all__ = ["DistributedSampler", ]

# Covariant type variable for the sampled element type
T_co = TypeVar('T_co', covariant=True)
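
As a side note, user code does not use the relative import shown above; the same classes are available from the public torch.utils.data namespace:

# Equivalent public imports for user code (the relative import is internal to torch)
from torch.utils.data import DataLoader, Dataset, Sampler, DistributedSampler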

  2. The constructor (__init__):

def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
             rank: Optional[int] = None, shuffle: bool = True,
             seed: int = 0, drop_last: bool = False) -> None:
    # If num_replicas (the number of participating processes, typically one per GPU)
    # is not given, query it from the current process group
    if num_replicas is None:
        if not dist.is_available():
            raise RuntimeError("Requires distributed package to be available")
        num_replicas = dist.get_world_size()  # total number of processes

    # If rank (the index of this process) is not given, query it from the process group
    if rank is None:
        if not dist.is_available():
            raise RuntimeError("Requires distributed package to be available")
        rank = dist.get_rank()  # rank of the current process

    # Validate the rank
    if rank >= num_replicas or rank < 0:
        raise ValueError(
            f"Invalid rank {rank}, rank should be in the interval [0, {num_replicas - 1}]")

    # Store the arguments
    self.dataset = dataset
    self.num_replicas = num_replicas  # number of processes (GPUs)
    self.rank = rank                  # rank of the current process
    self.epoch = 0                    # current epoch
    self.drop_last = drop_last        # whether to drop the uneven tail of the dataset
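
Because num_replicas and rank can be passed explicitly, the splitting logic can be inspected on a single machine without initializing a process group. A minimal sketch, using a hypothetical toy dataset of length 10 (illustrative values, not from the source):

# Inspect the per-rank split without a process group (toy dataset, illustrative only)
import torch
from torch.utils.data import TensorDataset, DistributedSampler

toy_dataset = TensorDataset(torch.arange(10))
for rank in range(2):
    sampler = DistributedSampler(toy_dataset, num_replicas=2, rank=rank, shuffle=False)
    print(rank, list(sampler))
# 0 [0, 2, 4, 6, 8]
# 1 [1, 3, 5, 7, 9]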

  3. Computing num_samples and total_size:

    # If drop_last is set and the dataset length is not divisible by the number of replicas
    if self.drop_last and len(self.dataset) % self.num_replicas != 0:
        # Per-replica sample count after dropping the tail
        self.num_samples = math.ceil(
            (len(self.dataset) - self.num_replicas) / self.num_replicas
        )
    else:
        # Otherwise round up; the shortfall is padded in __iter__
        self.num_samples = math.ceil(len(self.dataset) / self.num_replicas)

    # Total number of indices across all replicas (may exceed the dataset size)
    self.total_size = self.num_samples * self.num_replicas
    self.shuffle = shuffle
    self.seed = seed
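
To make the arithmetic concrete, here is a minimal sketch with a hypothetical dataset of length 10 split across 3 replicas (illustrative numbers, not from the source):

# num_samples / total_size arithmetic for a toy case: 10 samples, 3 replicas
import math

dataset_len, num_replicas = 10, 3

# drop_last=False: round up and pad later in __iter__
num_samples = math.ceil(dataset_len / num_replicas)                        # 4
total_size = num_samples * num_replicas                                    # 12 -> 2 indices padded

# drop_last=True: shrink to the nearest even split
num_samples_drop = math.ceil((dataset_len - num_replicas) / num_replicas)  # 3
total_size_drop = num_samples_drop * num_replicas                          # 9  -> 1 index dropped

print(num_samples, total_size, num_samples_drop, total_size_drop)          # 4 12 3 9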

  4. The iterator (__iter__):

def __iter__(self) -> Iterator[T_co]:
    if self.shuffle:
        # Deterministic shuffle derived from seed + epoch (identical on every rank)
        g = torch.Generator()
        g.manual_seed(self.seed + self.epoch)
        indices = torch.randperm(len(self.dataset), generator=g).tolist()
    else:
        # No shuffling: use sequential indices
        indices = list(range(len(self.dataset)))

    if not self.drop_last:
        # Pad with repeated indices so the total is divisible by the number of replicas
        padding_size = self.total_size - len(indices)
        if padding_size <= len(indices):
            indices += indices[:padding_size]
        else:
            indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
    else:
        # Drop the tail so the total is divisible by the number of replicas
        indices = indices[:self.total_size]
    assert len(indices) == self.total_size

    # Core partitioning step: strided subsampling with one offset per rank
    # rank 0 gets indices[0], indices[num_replicas], indices[2*num_replicas], ...
    # rank 1 gets indices[1], indices[num_replicas+1], indices[2*num_replicas+1], ...
    indices = indices[self.rank:self.total_size:self.num_replicas]
    assert len(indices) == self.num_samples

    return iter(indices)
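
The strided split is easiest to see on concrete numbers. A minimal sketch, assuming a hypothetical dataset of length 10, 3 replicas, shuffle=False and drop_last=False (illustrative values):

# Simulate the padding and strided subsampling above for a toy case
import math

dataset_len, num_replicas = 10, 3
num_samples = math.ceil(dataset_len / num_replicas)   # 4
total_size = num_samples * num_replicas               # 12

indices = list(range(dataset_len))                    # [0, 1, ..., 9]
padding_size = total_size - len(indices)              # 2
indices += indices[:padding_size]                     # [0, 1, ..., 9, 0, 1]

for rank in range(num_replicas):
    print(rank, indices[rank:total_size:num_replicas])
# 0 [0, 3, 6, 9]
# 1 [1, 4, 7, 0]   <- the padded duplicates land on ranks 1 and 2
# 2 [2, 5, 8, 1]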

  5. The remaining helper methods:

def __len__(self) -> int:
    return self.num_samples

def set_epoch(self, epoch: int) -> None:
    # Record the epoch so the next __iter__ call shuffles with a new seed (seed + epoch)
    self.epoch = epoch
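
The effect of set_epoch is easy to demonstrate. A minimal sketch, assuming a toy dataset of length 8 and a single replica so the whole permutation is visible (illustrative values):

# Show why set_epoch matters when shuffle=True (toy single-replica setup)
import torch
from torch.utils.data import TensorDataset, DistributedSampler

toy_dataset = TensorDataset(torch.arange(8))
sampler = DistributedSampler(toy_dataset, num_replicas=1, rank=0, shuffle=True, seed=0)

print(list(sampler))   # permutation for seed 0 + epoch 0
print(list(sampler))   # identical: the epoch was never updated

sampler.set_epoch(1)
print(list(sampler))   # different permutation: seed 0 + epoch 1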

How the data gets distributed to multiple GPUs:

  1. Partitioning mechanism:
  • DistributedSampler gives each GPU a disjoint slice of the (shuffled, possibly padded) index list
  • Strided sampling (indices[rank::num_replicas]) keeps every slice the same size
  • Each GPU only loads and processes the samples at its own indices

  2. Training workflow:

# Example training loop; model, optimizer, criterion and train_loader are assumed
# to already be set up, with train_loader built on the DistributedSampler above
for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # reseed the shuffle for this epoch

    for batch_idx, (data, target) in enumerate(train_loader):
        # Each GPU only receives its own shard of the data;
        # data has shape [local_batch_size, ...]
        optimizer.zero_grad()

        # Forward pass (runs in parallel on every GPU)
        output = model(data)
        loss = criterion(output, target)

        # Backward pass (DDP all-reduces the gradients automatically)
        loss.backward()

        # Parameter update (every GPU applies the same averaged gradients)
        optimizer.step()

  3. Communication pattern:
  • Data loading: each GPU independently loads its own data shard
  • Forward pass: every GPU computes in parallel
  • Backward pass: DDP automatically all-reduces the gradients
  • Parameter update: all GPUs stay in sync because each applies the same averaged gradients
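
Putting the pieces together, here is a minimal end-to-end sketch. It assumes a launch via torchrun (so RANK, LOCAL_RANK and WORLD_SIZE are set in the environment); the toy dataset, model and hyperparameters are placeholders, not part of the original source:

# Minimal end-to-end DDP sketch (assumes launching with `torchrun --nproc_per_node=N`;
# the toy dataset, model and hyperparameters are placeholders)
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset, DistributedSampler


def main():
    dist.init_process_group(backend="nccl")          # reads RANK/WORLD_SIZE from the env
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # Toy data and model, one process per GPU
    dataset = TensorDataset(torch.randn(1024, 16), torch.randn(1024, 1))
    model = DDP(torch.nn.Linear(16, 1).cuda(local_rank), device_ids=[local_rank])
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    criterion = torch.nn.MSELoss()

    # DistributedSampler shards the dataset, so the DataLoader itself must not shuffle
    sampler = DistributedSampler(dataset, shuffle=True)
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)

    for epoch in range(3):
        sampler.set_epoch(epoch)                     # new shuffle order each epoch
        for data, target in loader:
            data = data.cuda(local_rank)
            target = target.cuda(local_rank)
            optimizer.zero_grad()
            loss = criterion(model(data), target)
            loss.backward()                          # DDP all-reduces gradients here
            optimizer.step()

    dist.destroy_process_group()


if __name__ == "__main__":
    main()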
