diff --git a/fastNLP/core/drivers/torch_driver/ddp.py b/fastNLP/core/drivers/torch_driver/ddp.py index 020a0d4c..637b1e67 100644 --- a/fastNLP/core/drivers/torch_driver/ddp.py +++ b/fastNLP/core/drivers/torch_driver/ddp.py @@ -7,7 +7,6 @@ from time import sleep from typing import List, Optional, Union, Dict from functools import partial -# todo 这个等大家的 __all__ 都弄完后改为 from fastNLP.env import; from fastNLP.envs.imports import _NEED_IMPORT_TORCH if _NEED_IMPORT_TORCH: import torch @@ -44,20 +43,128 @@ class TorchDDPDriver(TorchDriver): fp16: bool = False, **kwargs ): - """ - DDP 目前考虑支持的三种启动方式: - 1. 用户自己不进行 ddp 的任何操作,直接使用我们的 Trainer,并且只运行一个 main 脚本,这时是由我们自己使用 open_subprocesses 拉起 - 多个进程,然后 TorchDDPDriver 自己 init_process_group; - 2. 其它情况同 1,但是用户自己使用 python -m torch.distributed.launch 拉起; - 3. 用户自己在外面初始化 DDP,并且通过 python -m torch.distributed.launch 拉起; - - 注意多机的启动强制要求用户在每一台机器上使用 python -m torch.distributed.launch 启动; - - 如果用户自己在外面初始化了 ddp,那么 - parallel_device 为 None; - data_device 为 表示单卡的一个参数; - dist.is_initialized 为 true; - + r""" + `TorchDDPDriver` 目前支持的三种启动方式: + 1. 用户自己不进行 ddp 的任何操作,直接使用我们的 Trainer,这时是由我们自己使用 `open_subprocesses` 拉起多个进程, + 然后 `TorchDDPDriver` 自己通过调用 `dist.init_process_group` 来初始化 ddp 的通信组;(情况 A) + 2. 用户同样不在 Trainer 之外初始化 ddp,但是用户自己使用 python -m torch.distributed.launch 拉起来创建多个进程,这时我们仍旧 + 会通过调用 `dist.init_process_group` 来初始化 ddp 的通信组;(情况 B) + 3. 用户自己在外面初始化 DDP,并且通过 python -m torch.distributed.launch 拉起,这时无论是多个进程的拉起和 ddp 的通信组的建立 + 都由用户自己操作,我们只会在 driver.setup 的时候对 `TorchDDPDriver` 设置一些必要的属性值;(情况 C) + + 注意多机的启动强制要求用户在每一台机器上使用 python -m torch.distributed.launch 启动;因此我们不会在 `TorchDDPDriver` 中保存 + 任何当前有多少台机器的信息(num_nodes,不是 gpu 的数量); + + Part 1:三种启动方式的具体分析: + (1)对于用户运行的脚本中,如果 `driver.setup` 只会被调用一次(意味着用户的启动脚本中只初始化了一个 trainer/evaluator)时, + `TorchDDPDriver` 在初始化以及 `setup` 函数中会做的事情分别如下所示: + -> 情况 A:这种情况下用户传入的 model 在一定是普通的 model(没有经 `DistributedDataParallel` 包裹的model), + 因为 `DistributedDataParallel` 的使用一定要求 init_process_group 已经被调用用来建立当前的 ddp 通信组;但是这意味着如果 + 用户需要使用 2 张以上的显卡,那么其必然需要使用 torch.distributed.launch 来启动,意味着就不是情况 A 了; + 这时我们首先会调用 `TorchDDPDriver.open_subprocess` 函数来拉起多个进程,其中进程的数量等于用户传入给 trainer 的使用的 gpu + 的数量(例如 `Trainer` 中的参数是 device=[0, 1, 6, 7],那么我们就会使用第 0、1、6、7 张 gpu 来拉起 4 个进程); + 接着我们会调用 `dist.init_process_group` 来初始化各个进程之间的通信组; + 这里需要注意拉起的新的进程会从前到后完整地运行一遍用户的启动脚本(例如 main.py),因此也都会运行这两个函数,但是需要注意只有进程 0 + 才会去真正地运行 `TorchDDPDriver.open_subprocess`;进程 0 运行到 `dist.init_process_group`,pytorch 会阻塞进程 0 继续 + 向前运行,直到其它进程也运行到这里; + 最后我们会设置这个进程对应的 device,然后将模型迁移到对应的机器上,再使用 `DistributedDataParallel` 将模型包裹; + 至此,ddp 的环境配置过程全部完成; + + -> 情况 B:注意这种情况我们直接限定了用户是通过 torch.distributed.launch 拉起,并且没有自己建立 ddp 的通信组。这时在 + `TorchDDPDriver` 的初始化和 setup 函数的调用过程中,与情况 A 首要的不同就在于用户在 trainer 中输入的参数 device 不再有效, + 这时每个进程所使用的 gpu 是我们直接通过 `torch.device("cuda:{local_rank}")` 来配置的;因此,如果用户想要实现使用特定 gpu + 设备的目的,可以通过自己设置环境变量实现(例如 os.environ["CUDA_VISIBLE_DEVICE"] 来实现);剩下的操作和情况 A 类似; + + -> 情况 C:注意这种情况我们限定了用户是通过 torch.distributed.launch 拉起,并且 ddp 的通信组也是由自己建立。这时基本上所有的 + 与操作相关的操作都应当由用户自己完成,包括迁移模型到对应 gpu 上以及将模型用 `DistributedDataParallel` 包裹等。 + (2)如果 `driver.setup` 函数在脚本中会被调用两次及以上(意味着用户的启动脚本初始化了两个及以上的 trainer/evaluator)时: + 注意这种情况下我们是会保证前后两个 trainer/evaluator 使用的 `TorchDDPDriver` 以及其初始化方式的一致性,换句话说,如果 trainer1 + 检测到的启动方式是 '情况 A',那么我们会保证 trainer2 检测到的启动方式同样是 '情况A'(即使这需要一些额外的处理);因此这里我们主要讨论 + 我们是通过怎样的操作来保证 trainer2/3/... 检测到的启动方式是和 trainer1 一致的;简单来说,我们是通过使用环境变量来标记每一种不同的 + 启动方式来实现这一点的: + 我们会使用 `FASTNLP_DISTRIBUTED_CHECK` 来标记 '情况 A',使用 `fastnlp_torch_launch_not_ddp` 来标记 '情况 B',意味着我们在 + 使用 '情况 A' 来启动 `TorchDDPDriver` 时,我们会将 `FASTNLP_DISTRIBUTED_CHECK` 这一字符串注入到环境变量中,而 '情况 B' 时则 + 会将 `fastnlp_torch_launch_not_ddp` 这一字符串注入到环境变量中。因此在 trainer2 的 `TorchDDPDriver` 的初始化和 setup 过程中, + 如果检测到这些特殊的环境变量,我们就会将启动方式变更为其对应的启动方式,即使其它的参数特征属于另外的启动方式。 + + Part 2:对应的代码细节: + 1. 如何判断当前的各进程之间的通信组已经被建立(ddp 已经被初始化); + dist.is_initialized(); + 2. 如何判断不同的进程是否是由 `python -m torch.distributed.launch` 拉起还是由我们的 `TorchDDPDriver.open_subprocess` + 函数拉起; + 我们会在用户脚本 `import fastNLP` 的时候检测当前的环境变量中是否有 'LOCAL_RANK'、'WORLD_SIZE' 以及没有 `FASTNLP_DISTRIBUTED_CHECK`, + 如果满足条件,则我们会向环境变量中注入特殊的值 'FASTNLP_BACKEND_LAUNCH' 来标记用户是否使用了 `python -m torch.distributed.launch` + 来拉起多个进程; + 3. 整体的处理判断流程: + ___________________________________ + |进入 TorchDDPDriver 的 __init__ 函数| + ——————————————————————————————————— + ↓ + ___________________________________________________ + | 判断不同的进程是否是由 torch.distributed.launch 拉起 | + |(或者我们自己的 open_subprocess 函数拉起) | --------------> + ———————————————————————————————————————————————————  | + ↓ 是由 torch.distributed.launch 拉起 | 我们自己的 open_subprocess 函数拉起多个进程 +  ___________________________            |  + ←←←←← | 检测用户是否自己初始化了 ddp |              | + ↓ ———————————————————————————                    ↓ + ↓ ↓ 是 ________ + ↓ ______ | 情况 A | + ↓ 否 |情况 C| ————————— + ↓ ——————— + ↓ + ↓ ______ + ↓ -----------> |情况 B| +   ——————— + 4. 为了完成全部的建立 ddp 所需要的操作,三种情况都需要做的事情,以及每件事情的职责归属: + + 情况 A | 情况 B | 情况 C + ________________________________________________________________________________________________________ + 配置 ddp 所 | TorchDDPDriver.open_subprocess | torch.distributed.launch| torch.distributed.launch + 需要的环境变量 | | | + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + 开启多个进程 | TorchDDPDriver.open_subprocess | torch.distributed.launch| torch.distributed.launch + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + 调用 dist. | | | + init_process\ | TorchDDPDriver.setup | TorchDDPDriver.setup | 用户自己调用 + _group 函数 | | | + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + 设置 TorchDDPDriver | | | + 的 world_size 和 | TorchDDPDriver.setup | TorchDDPDriver.setup | TorchDDPDriver.setup + global_rank 属性 | | | + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + + Part 3:其它的处理细节: + 1. 环境变量; + fastNLP 的 `TorchDDPDriver` 运行时所需要的环境变量分为两种,一种是 torch 的 ddp 运行所需要的环境变量;另一种是 fastNLP 自己 + 的环境变量。前者的配置情况如上表所示;而后者中的大多数环境变量则是在用户 import fastNLP 时就设置好了; + 2. parallel_device, model_device 和 data_device 的关系; + parallel_device 为 `TorchDDPDriver` 的参数,model_device 和 data_device 都为 driver 的属性; + 其中 data_device 仅当情况 C 时由用户自己指定;如果其不为 None,那么在模型 forward 的时候,我们就会将数据迁移到 data_device 上; + model_device 永远都为单独的一个 torch.device; + + 情况 A | 情况 B | 情况 C + ________________________________________________________________________________________________________ + parallel_device | 由用户传入trainer的参数 | 为 torch.device( | 为 torch.device( + | device 决定,必须是一个list, | "cuda:{local_rank}") | "cuda:{local_rank}") + | 其中每一个对象都是 torch.device | | + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + model_device | parallel_device[local_rank] | parallel_device | None + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + data_device | model_device | model_device | 由用户传入 trainer 的参数 + | | | data_device 决定 + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + + 3. _DDPWrappingModel 的作用; + 因为我们即需要调用模型的 `train_step`、`validate_step`、`test_step` 方法,又需要通过 `DistributedDataParallel` 的 + forward 函数来帮助我们同步各个设备上的梯度,因此我们需要先将模型单独包裹一层,然后在 forward 的时候,其先经过 `DistributedDataParallel` + 的 forward 方法,然后再经过 `_DDPWrappingModel` 的 forward 方法,我们会在该 forward 函数中进行判断,确定调用的是模型自己的 + forward 函数,还是 `train_step`、`validate_step`、`test_step` 方法。 + + 4. 当某一个进程出现 exception 后,`TorchDDPDriver` 的处理; + + 不管是什么情况,`TorchDDPDriver` 在 `setup` 函数的最后,都会将所有进程的 pid 主动记录下来,这样当一个进程出现 exception 后, + driver 的 on_exception 函数就会被 trainer 调用,其会调用 os.kill 指令将其它进程 kill 掉; """ super(TorchDDPDriver, self).__init__(model, fp16=fp16, **kwargs) @@ -81,7 +188,8 @@ class TorchDDPDriver(TorchDriver): # 如果用户自己在外面初始化了 DDP; self.outside_ddp = False - if dist.is_initialized() and FASTNLP_DISTRIBUTED_CHECK not in os.environ and "fastnlp_special" not in os.environ: + if dist.is_initialized() and FASTNLP_DISTRIBUTED_CHECK not in os.environ and \ + "fastnlp_torch_launch_not_ddp" not in os.environ: # 如果用户自己在外面初始化了 DDP,那么我们要求用户传入的模型一定是已经由 DistributedDataParallel 包裹后的模型; if not isinstance(model, DistributedDataParallel): raise RuntimeError( @@ -97,7 +205,7 @@ class TorchDDPDriver(TorchDriver): if isinstance(batch, Dict): return auto_param_call(step_fn, batch, signature_fn=signature_fn) else: - return self._validate_step(batch) + return step_fn(batch) model = model.module if hasattr(model, "train_step"): @@ -185,7 +293,7 @@ class TorchDDPDriver(TorchDriver): backend="nccl", rank=self.global_rank, world_size=self.world_size ) - os.environ["fastnlp_special"] = "yes" + os.environ["fastnlp_torch_launch_not_ddp"] = "yes" # 进入到这里的情况时: # dist.is_initialized 一定为 False; diff --git a/fastNLP/core/drivers/torch_driver/utils.py b/fastNLP/core/drivers/torch_driver/utils.py index 77a1f50a..406e030b 100644 --- a/fastNLP/core/drivers/torch_driver/utils.py +++ b/fastNLP/core/drivers/torch_driver/utils.py @@ -34,20 +34,13 @@ def _select_seed_randomly(min_seed_value: int = 0, max_seed_value: int = 255) -> def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) -> int: - """Function that sets seed for pseudo-random number generators in: pytorch, numpy, python.random In addition, - sets the following environment variables: + r""" + 为伪随机数生成器设置种子的函数:pytorch、numpy、python.random 另外, + 设置以下环境变量: - - `PL_GLOBAL_SEED`: will be passed to spawned subprocesses (e.g. ddp_spawn backend). - - `PL_SEED_WORKERS`: (optional) is set to 1 if ``workers=True``. - - Args: - seed: the integer value seed for global random state in Lightning. - If `None`, will read seed from `PL_GLOBAL_SEED` env variable - or select it randomly. - workers: if set to ``True``, will properly configure all dataloaders passed to the - Trainer with a ``worker_init_fn``. If the user already provides such a function - for their dataloaders, setting this argument will have no influence. See also: - :func:`~pytorch_lightning.utilities.seed.pl_worker_init_function`. + :param seed: 全局随机状态的整数值种子。如果为“无”,将从 "FASTNLP_GLOBAL_SEED" 环境变量中读取种子或随机选择。 + :param workers: 如果设置为“True”,将正确配置所有传递给带有“worker_init_fn”的培训师。如果用户已经提供了这样的功能对于他们的数据加载器, + 设置此参数将没有影响; """ max_seed_value = np.iinfo(np.uint32).max min_seed_value = np.iinfo(np.uint32).min @@ -56,7 +49,6 @@ def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) -> env_seed = os.environ.get(FASTNLP_GLOBAL_SEED) if env_seed is None: seed = _select_seed_randomly(min_seed_value, max_seed_value) - # rank_zero_warn(f"No seed found, seed set to {seed}") else: try: seed = int(env_seed) @@ -69,12 +61,8 @@ def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) -> if not (min_seed_value <= seed <= max_seed_value): logger.warning("Your seed value is two big or two small for numpy, we will choose a random seed for you.") - # rank_zero_warn(f"{seed} is not in bounds, numpy accepts from {min_seed_value} to {max_seed_value}") seed = _select_seed_randomly(min_seed_value, max_seed_value) - # using `log.info` instead of `rank_zero_info`, - # so users can verify the seed is properly set in distributed training. - # log.info(f"Global seed set to {seed}") random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) @@ -84,11 +72,9 @@ def torch_seed_everything(seed: Optional[int] = None, workers: bool = False) -> def reset_seed() -> None: - """ + r""" 这个函数主要是给 ddp 用的,因为 ddp 会开启多个进程,因此当用户在脚本中指定 seed_everything 时,在开启多个脚本后,会在每个脚本内重新 进行随机数的设置; - - If :func:`pytorch_lightning.utilities.seed.seed_everything` is unused, this function will do nothing. """ seed = os.environ.get(FASTNLP_GLOBAL_SEED, None) workers = os.environ.get(FASTNLP_SEED_WORKERS, "0")