diff --git a/fastNLP/core/drivers/paddle_driver/fleet.py b/fastNLP/core/drivers/paddle_driver/fleet.py index bde6f37f..a1275bed 100644 --- a/fastNLP/core/drivers/paddle_driver/fleet.py +++ b/fastNLP/core/drivers/paddle_driver/fleet.py @@ -19,6 +19,7 @@ from fastNLP.core.utils import ( check_user_specific_params, paddle_move_data_to_device, is_in_paddle_dist, + rank_zero_rm ) from fastNLP.core.samplers import ( RandomBatchSampler, @@ -55,20 +56,134 @@ class PaddleFleetDriver(PaddleDriver): fp16: bool = False, **kwargs ): - """ - 采用fleet接口进行并行paddle训练的driver - PaddleFleetDriver 目前考虑支持的三种启动方式: - 1. 用户自己不进行 fleet 的任何操作,直接使用我们的 Trainer,并且只运行一个 main 脚本,这时是由我们自己使用 open_subprocesses 拉起 - 多个进程,然后由 Driver 自己进行初始化 - 2. 其它情况同 1,但是用户自己使用 python -m paddle.distributed.launch 拉起; - 3. 用户自己在外面初始化 Fleet,并且通过 python -m paddle.distributed.launch 拉起; - - 注意多机的启动强制要求用户在每一台机器上使用 python -m paddle.distributed.launch 启动; - - 如果用户自己在外面初始化了 fleet,那么 - parallel_device 为 None; - data_device 为 表示单卡的一个参数; - dist.is_initialized 为 true; + r""" + 通过使用 PaddlePaddle 的 Fleet 框架启动多卡进程的 Driver。 + 需要注意的一点是,由于 PaddlePaddle 框架的特性,如果直接使用在 rank0 拉起其它进程的方法的话,如果不加以任何限制,PaddlePaddle会出现 + 第一次前向传播后卡住或占用所有显卡的现象;为了解决这一问题,我们在引入 FastNLP 时,会使用 `CUDA_VISIBLE_DEVICES` 将设备限制在卡0上, + 而用户如果使用了这一环境变量,我们会将其储存在 `USER_CUDA_VISIBLE_DEVICES` 中,并且通过一定的手段实现了转换(详细的设置请参见: + `fastNLP/envs/set_backend.py`)。在拉起其它进程的时候,我们会如法炮制,将环境限制在对应的设备上。 + + `PaddleFleetDriver` 目前支持的三种启动方式: + 1. 用户自己不进行分布式的任何操作,直接使用我们的 Trainer,这时是由我们自己使用 `FleetLauncher` 拉起多个进程, + 然后 `PaddleFleetDriver` 自己通过调用 `fleet.init` 来初始化 ddp 的通信组;(情况 A) + 2. 用户同样不在 Trainer 之外初始化分布式训练,但是用户自己使用 python -m paddle.distributed.launch 拉起来创建多个进程,这时我们仍旧 + 会通过调用 `fleet.init` 来初始化 ddp 的通信组;(情况 B) + 3. 用户自己在外面初始化分布式,并且通过 python -m paddle.distributed.launch 拉起,这时无论是多个进程的拉起和通信组的建立 + 都由用户自己操作,我们只会在 driver.setup 的时候对 `PaddleFleetDriver` 设置一些必要的属性值;(情况 C) + + 注意多机的启动强制要求用户在每一台机器上使用 python -m paddle.distributed.launch 启动;因此我们不会在 `PaddleFleetDriver` 中保存 + 任何当前有多少台机器的信息; + + Part 1:三种启动方式的具体分析: + (1)对于用户运行的脚本中,如果 `driver.setup` 只会被调用一次(意味着用户的启动脚本中只初始化了一个 trainer/evaluator)时, + `PaddleFleetDriver` 在初始化以及 `setup` 函数中会做的事情分别如下所示: + -> 情况 A:这种情况下用户传入的 model 在一定是普通的 model(没有经 `DataParallel` 包裹的model), + 因为 `Parallel` 的使用一定要求 fleet.init 已经被调用用来建立当前的 ddp 通信组;但是这意味着如果 + 用户需要使用 2 张以上的显卡,那么其必然需要使用 paddle.distributed.launch 来启动,意味着就不是情况 A 了; + 这时我们首先会调用 `FleetLauncher.launch` 函数来拉起多个进程,其中进程的数量等于用户传入给 trainer 的使用的 gpu + 的数量(例如 `Trainer` 中的参数是 device=[0, 1, 6, 7],那么我们就会使用第 0、1、6、7 张 gpu 来拉起 4 个进程); + 接着我们会调用 `fleet.init` 来初始化各个进程之间的通信组; + 这里需要注意拉起的新的进程会从前到后完整地运行一遍用户的启动脚本(例如 main.py),因此也都会运行这两个函数,但是需要注意只有进程 0 + 才会去真正地运行 `FleetLauncher.launch`;进程 0 运行到 `fleet.init`,paddle 会阻塞进程 0 继续 + 向前运行,直到其它进程也运行到这里; + 最后我们会设置这个进程对应的 device,然后将模型迁移到对应的机器上,再使用 `DataParallel` 将模型包裹; + 至此,paddle 分布式的环境配置过程全部完成; + + -> 情况 B:注意这种情况我们直接限定了用户是通过 paddle.distributed.launch 拉起,并且没有自己建立分布式的通信组。这时在 + `PaddleFleetDriver` 的初始化和 setup 函数的调用过程中,与情况 A 首要的不同就在于用户在 trainer 中输入的参数 device 不再有效, + 这时每个进程所使用的 gpu 是我们直接通过 `CUDA_VISIBLE_DEVICE` 来配置的;因此,如果用户想要实现使用特定 gpu + 设备的目的,可以通过自己设置环境变量实现(例如 os.environ["CUDA_VISIBLE_DEVICE"] 来实现,我们会通过一定的手段将其保存起来); + 剩下的操作和情况 A 类似; + + -> 情况 C:注意这种情况我们限定了用户是通过 paddle.distributed.launch 拉起,并且 ddp 的通信组也是由自己建立。这时基本上所有的 + 与操作相关的操作都应当由用户自己完成,包括迁移模型到对应 gpu 上以及将模型用 `DataParallel` 包裹等。 + (2)如果 `driver.setup` 函数在脚本中会被调用两次及以上(意味着用户的启动脚本初始化了两个及以上的 trainer/evaluator)时: + 注意这种情况下我们是会保证前后两个 trainer/evaluator 使用的 `PaddleFleetDriver` 以及其初始化方式的一致性,换句话说,如果 trainer1 + 检测到的启动方式是 '情况 A',那么我们会保证 trainer2 检测到的启动方式同样是 '情况A'(即使这需要一些额外的处理);因此这里我们主要讨论 + 我们是通过怎样的操作来保证 trainer2/3/... 检测到的启动方式是和 trainer1 一致的;简单来说,我们是通过使用环境变量来标记每一种不同的 + 启动方式来实现这一点的: + 我们会使用 `FASTNLP_DISTRIBUTED_CHECK` 来标记 '情况 A',使用 `fastnlp_torch_launch_not_ddp` 来标记 '情况 B',意味着我们在 + 使用 '情况 A' 来启动 `PaddleFleetDriver` 时,我们会将 `FASTNLP_DISTRIBUTED_CHECK` 这一字符串注入到环境变量中,而 '情况 B' 时则 + 会将 `fastnlp_torch_launch_not_ddp` 这一字符串注入到环境变量中。因此在 trainer2 的 `PaddleFleetDriver` 的初始化和 setup 过程中, + 如果检测到这些特殊的环境变量,我们就会将启动方式变更为其对应的启动方式,即使其它的参数特征属于另外的启动方式。 + + Part 2:对应的代码细节: + 1. 如何判断当前的各进程之间的通信组已经被建立(fleet 已经被初始化); + parallel_helper._is_parallel_ctx_initialized(); + 2. 如何判断不同的进程是否是由 `python -m paddle.distributed.launch` 拉起还是由我们的 `FleetLauncher.launch()` + 函数拉起; + 我们会在用户脚本 `import fastNLP` 的时候检测当前的环境变量中是否有 'PADDLE_RANK_IN_NODE'、'PADDLE_TRAINER_ID' + 以及没有 `FASTNLP_DISTRIBUTED_CHECK`, + 如果满足条件,则我们会向环境变量中注入特殊的值 'FASTNLP_BACKEND_LAUNCH' 来标记用户是否使用了 `python -m paddle.distributed.launch` + 来拉起多个进程; + 3. 整体的处理判断流程: + ___________________________________ + |进入 PaddleFleetDriver 的 __init__ 函数| + ——————————————————————————————————— + ↓ + ___________________________________________________ + | 判断不同的进程是否是由 paddle.distributed.launch 拉起 | + |(或者我们自己的 FleetLauncher 函数拉起) | --------------> + ———————————————————————————————————————————————————  | + ↓ 是由 paddle.distributed.launch 拉起 | 我们自己的 FleetLauncher 函数拉起多个进程 +  _____________________________            |  + ←←←←← | 检测用户是否自己初始化了 fleet |              | + ↓ —————————————————————————————                  ↓ + ↓ ↓ 是 ________ + ↓ ______ | 情况 A | + ↓ 否 |情况 C| ————————— + ↓ ——————— + ↓ + ↓ ______ + ↓ -----------> |情况 B| +   ——————— + 4. 为了完成全部的建立分布式所需要的操作,三种情况都需要做的事情,以及每件事情的职责归属: + + 情况 A | 情况 B | 情况 C + ________________________________________________________________________________________________________ + 配置 fleet 所 | FleetLauncher.launch | paddle.distributed.launch| paddle.distributed.launch + 需要的环境变量 | | | + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + 开启多个进程 | FleetLauncher.launch | paddle.distributed.launch| paddle.distributed.launch + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + 调用 fleet.init函数 | PaddleFleetDriver.setup | PaddleFleetDriver.setup | 用户自己调用 + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + 设置 PaddleFleetDriver | | | + 的 world_size 和 | PaddleFleetDriver.setup | PaddleFleetDriver.setup | PaddleFleetDriver.setup + global_rank 属性 | | | + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + + Part 3:其它的处理细节: + 1. 环境变量; + fastNLP 的 `PaddleFleetDriver` 运行时所需要的环境变量分为两种,一种是 paddle fleet 运行所需要的环境变量;另一种是 fastNLP 自己 + 的环境变量。前者的配置情况如上表所示;而后者中的大多数环境变量则是在用户 import fastNLP 时就设置好了; + 2. parallel_device, model_device 和 data_device 的关系; + parallel_device 为 `PaddleFleetDriver` 的参数,model_device 和 data_device 都为 driver 的属性; + 其中 data_device 仅当情况 C 时由用户自己指定;如果其不为 None,那么在模型 forward 的时候,我们就会将数据迁移到 data_device 上; + model_device 永远都为单独的一个 torch.device; + + 情况 A | 情况 B | 情况 C + ________________________________________________________________________________________________________ + parallel_device | 由用户传入trainer的参数 | | + | device 决定,必须是一个list, | 为 CUDA_VISIBLE_DEVICES | 为 CUDA_VISIBLE_DEVICES + | 其中每一个对象都是 int | | + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + model_device | parallel_device[local_rank] | parallel_device | None + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + data_device | model_device | model_device | 由用户传入 trainer 的参数 + | | | data_device 决定 + ———————————————————————————————————————————————————————————————————————————————————————————————————————— + + 3. _DDPWrappingModel 的作用; + 因为我们即需要调用模型的 `train_step`、`evaluate_step`、`test_step` 方法,又需要通过 `DataParallel` 的forward 函数来帮助 + 我们同步各个设备上的梯度,因此我们需要先将模型单独包裹一层,然后在 forward 的时候,其先经过 `DataParallel` 的 forward 方法, + 然后再经过 `_DDPWrappingModel` 的 forward 方法,我们会在该 forward 函数中进行判断,确定调用的是模型自己的 forward 函数,还是 + `train_step`、`evaluate_step`、`test_step` 方法。 + + 4. 当某一个进程出现 exception 后,`PaddleFleetDriver` 的处理; + + 不管是什么情况,`PaddleFleetDriver` 在 `setup` 函数的最后,都会将所有进程的 pid 主动记录下来,这样当一个进程出现 exception 后, + driver 的 on_exception 函数就会被 trainer 调用,其会调用 os.kill 指令将其它进程 kill 掉; """ super(PaddleFleetDriver, self).__init__(model, fp16=fp16, **kwargs) @@ -78,6 +193,7 @@ class PaddleFleetDriver(PaddleDriver): "when your value of parameter `device` is `None` in your `Trainer` instance.") # 如果用户自己初始化了 paddle 的分布式训练那么一定是通过 launch 拉起的 + # 这个参数会在 initialize_paddle_drvier 中设置。 self.is_pull_by_paddle_run = is_pull_by_paddle_run self.parallel_device = parallel_device # 在初始化时,如果发现 is_pull_by_paddle_run ,则将 parallel_device 设置成当前进程的gpu @@ -98,7 +214,7 @@ class PaddleFleetDriver(PaddleDriver): self.outside_fleet = True # 用户只有将模型上传到对应机器上后才能用 DataParallel 包裹,因此如果用户在外面初始化了 Fleet,那么在 PaddleFleetDriver 中 - # 我们就直接将 model_device 置为 None; + # 我们就直接将 model_device 置为 None; self._model_device = None # 当参数 `device` 为 None 时并且该参数不为 None,表示将对应的数据移到指定的机器上; @@ -119,9 +235,12 @@ class PaddleFleetDriver(PaddleDriver): self.world_size = None self.global_rank = 0 + self.gloo_rendezvous_dir = None + # 分布式环境的其它参数设置 self._fleet_kwargs = kwargs.get("paddle_fleet_kwargs", {}) check_user_specific_params(self._fleet_kwargs, DataParallel.__init__) + # fleet.init 中对于分布式策略的设置,详情可以参考 PaddlePaddle 的官方文档 self.strategy = self._fleet_kwargs.get("strategy", fleet.DistributedStrategy()) self.is_collective = self._fleet_kwargs.get("is_collective", True) if not self.is_collective: @@ -145,7 +264,10 @@ class PaddleFleetDriver(PaddleDriver): def setup(self): """ - 在主进程拉起其它子进程,将主进程作为rank 0 + 根据不同的情况进行不同的设置。 + 1、如果是通过 paddle.distributed.launch 方法启动时,则根据已经设置好的环境获取 + 分布式的属性。 + 2、否则,调用 FleetLauncher 类启动子进程 """ if self._has_setup: return @@ -174,7 +296,7 @@ class PaddleFleetDriver(PaddleDriver): # 此时 parallel_helper._is_parallel_ctx_initialized() 一定为 False # parallel_device 是 list, if not parallel_helper._is_parallel_ctx_initialized(): - # 没有初始化分布式环境,且是主进程 + # 拉起子进程并设置相应的属性 self.init_fleet_and_set() # 用户在这个 trainer 前面又初始化了一个 trainer,并且使用的是 PaddleFleetDriver; else: @@ -216,12 +338,13 @@ class PaddleFleetDriver(PaddleDriver): # 是 rank0 的话,则拉起其它子进程 launcher = FleetLauncher(self.parallel_device, self.output_from_new_proc) launcher.launch() + self.gloo_rendezvous_dir = launcher.gloo_rendezvous_dir # 设置参数和初始化分布式环境 fleet.init(self.role_maker, self.is_collective, self.strategy) self.global_rank = int(os.getenv("PADDLE_TRAINER_ID")) self.world_size = int(os.getenv("PADDLE_TRAINERS_NUM")) - # 正常情况下不会Assert出问题,但还是保险一下 + # 正常情况下不会 Assert 出问题,但还是保险一下 assert self.global_rank is not None assert self.world_size is not None assert self.world_size == len(self.parallel_device) @@ -235,10 +358,19 @@ class PaddleFleetDriver(PaddleDriver): self.global_rank = paddledist.get_rank() def barrier(self): + r""" + 用于在多进程工作时同步各进程的工作进度,运行快的进程运行到这里会等待运行慢的进程,只有所有进程都运行到此函数时,所有的进程才会继续运行; + 仅在多分布式训练场景中有使用。 + + 注意,该函数的行为会受到 FASTNLP_NO_SYNC 的影响。仅当 FASTNLP_NO_SYNC 在 os.environ 中不存在,或小于 1 时才真的执行 barrier 。 + """ if int(os.environ.get(FASTNLP_NO_SYNC, 0)) < 1: # 当 FASTNLP_NO_SYNC 小于 1 时实际执行 paddledist.barrier() def configure_fleet(self): + """ + 将模型用 DataParallel 和自定义的类型包裹起来 + """ if not self._has_fleetwrapped and not isinstance(self.model, DataParallel): self.model = DataParallel( _FleetWrappingModel(self.model), @@ -247,8 +379,14 @@ class PaddleFleetDriver(PaddleDriver): self._has_fleetwrapped = True def on_exception(self): - if os.path.exists(self.gloo_rendezvous_dir): - shutil.rmtree(self.gloo_rendezvous_dir) + """ + 该函数用于在训练或者预测过程中出现错误时正确地关掉其它的进程,这一点是通过在多进程 driver 调用 open_subprocess 的时候将每一个进程 + 的 pid 记录下来,然后在出现错误后,由出现错误的进程手动地将其它进程 kill 掉; + + 因此,每一个多进程 driver 如果想要该函数能够正确地执行,其需要在自己的 open_subprocess(开启多进程的函数)中正确地记录每一个进程的 + pid 的信息; + """ + rank_zero_rm(self.gloo_rendezvous_dir) super().on_exception() @property @@ -282,6 +420,17 @@ class PaddleFleetDriver(PaddleDriver): return self.model_device def model_call(self, batch, fn: Callable, signature_fn: Optional[Callable]) -> Dict: + """ + 通过调用 `fn` 来实现训练时的前向传播过程; + 注意 Trainer 和 Evaluator 会调用该函数来实现网络的前向传播过程,其中传入该函数的参数 `fn` 是函数 `get_model_call_fn` 所返回的 + 函数; + + :param batch: 当前的一个 batch 的数据;可以为字典或者其它类型; + :param fn: 调用该函数进行一次计算。 + :param signature_fn: 由 Trainer 传入的用于网络前向传播一次的签名函数,因为当 batch 是一个 Dict 的时候,我们会自动调用 auto_param_call + 函数,而一些被包裹的模型需要暴露其真正的函数签名,例如 DistributedDataParallel 的调用函数是 forward,但是需要其函数签名为 model.module.forward; + :return: 返回由 `fn` 返回的结果(应当为一个 dict 或者 dataclass,但是不需要我们去检查); + """ if self._has_fleetwrapped: return self.model(batch, fastnlp_fn=fn, fastnlp_signature_fn=signature_fn, wo_auto_param_call=self.wo_auto_param_call) @@ -292,6 +441,27 @@ class PaddleFleetDriver(PaddleDriver): return fn(batch) def get_model_call_fn(self, fn: str) -> Tuple: + """ + 该函数会接受 Trainer 的 train_fn 或者 Evaluator 的 evaluate_fn,返回一个实际用于调用 driver.model_call 时传入的函数参数; + 该函数会在 Trainer 和 Evaluator 在 driver.setup 函数之后调用; + + 之所以设置该函数的目的在于希望将具体的 model_call function 从 driver 中抽离出来,然后将其附着在 Trainer 或者 Evaluator 身上; + 这样是因为在新版的设计中,使用 model 的哪种方法来进行 `train step` 或者 `evaluate step` 是通过额外的参数 `train_fn` 和 + `evaluate_fn` 来确定的,而二者又分别是通过 Trainer 和 Evaluator 来控制的;因此不能将确定具体的 `train step fn` 和 + `evaluate step fn` 的逻辑放在每一个 driver 的初始化的时候(因此在 Trainer 初始化第一个 driver 时,Evaluator 还没有初始化,但是 + `evaluate step fn` 的确定却需要 Evaluator 的初始化),因此我们将这一逻辑抽象到这一函数当中; + + 这一函数应当通过参数 `fn` 来判断应当返回的实际的调用的函数,具体逻辑如下所示: + 1. 如果 fn == "train_step" or "evaluate_step",那么对传入的模型进行检测,如果模型没有定义方法 `fn`,则默认调用模型的 `forward` + 函数,然后给出 warning; + 2. 如果 fn 是其他字符串,那么如果模型没有定义方法 `fn` 则直接报错; + 注意不同的 driver 需要做额外的检测处理,例如在 DDPDriver 中,当传入的模型本身就是 DistributedDataParallel 中,我们只能调用模型的 + forward 函数,因此需要额外的 warning;这一点特别需要注意的问题在于 driver 自己在 setup 时也会对模型进行改变(DDPDriver),因此 + 可能需要额外标记最初传入 driver 的模型是哪种形式的; + + :param fn: 应当为一个字符串,该函数通过该字符串判断要返回模型的哪种方法; + :return: 返回一个元组,包含两个函数,用于在调用 driver.model_call 时传入; + """ model = self.unwrap_model() if self._has_fleetwrapped: if hasattr(model, fn): @@ -316,7 +486,25 @@ class PaddleFleetDriver(PaddleDriver): return self.model, model.forward def set_dist_repro_dataloader(self, dataloader, dist: Optional[Union[str, ReproducibleSampler, RandomBatchSampler]], - reproducible: bool = False, sampler_or_batch_sampler=None): + reproducible: bool = False): + r""" + 根据输入的 dataloader 得到一个 支持分布式 (distributed) 与 可复现的 (reproducible) 的 dataloader。 + + :param dataloader: 根据 dataloader 设置其对应的分布式版本以及可复现版本 + :param dist: 应当为一个字符串,其值应当为以下之一:[None, "dist", "unrepeatdist"];为 None 时,表示不需要考虑当前 dataloader + 切换为分布式状态;为 'dist' 时,表示该 dataloader 应该保证每个 gpu 上返回的 batch 的数量是一样多的,允许出现少量 sample ,在 + 不同 gpu 上出现重复;为 'unrepeatdist' 时,表示该 dataloader 应该保证所有 gpu 上迭代出来的数据合并起来应该刚好等于原始的 + 数据,允许不同 gpu 上 batch 的数量不一致。其中 trainer 中 kwargs 的参数 `use_dist_sampler` 为 True 时,该值为 "dist"; + 否则为 None ,evaluator 中的 kwargs 的参数 `use_dist_sampler` 为 True 时,该值为 "unrepeatdist",否则为 None; + 注意当 dist 为 ReproducibleSampler, ReproducibleBatchSampler 时,是断点重训加载时 driver.load 函数在调用; + 当 dist 为 str 或者 None 时,是 trainer 在初始化时调用该函数; + + :param reproducible: 如果为 False ,不要做任何考虑;如果为 True ,需要保证返回的 dataloader 可以保存当前的迭代状态,使得 + 可以可以加载。 + :return: 应当返回一个被替换 sampler 后的新的 dataloader 对象 (注意此处一定需要返回一个新的 dataloader 对象) ;此外, + 如果传入的 dataloader 中是 ReproducibleSampler 或者 ReproducibleBatchSampler 需要重新初始化一个放入返回的 + dataloader 中。如果 dist 为空,且 reproducible 为 False,可直接返回原对象。 + """ # 暂时不支持iterableDataset assert dataloader.dataset_kind != _DatasetKind.ITER, \ "FastNLP does not support `IteratorDataset` now." @@ -429,10 +617,7 @@ class PaddleFleetDriver(PaddleDriver): @staticmethod def _check_optimizer_legality(optimizers): - """ - paddle存在设置分布式optimizers的函数,返回值为fleet.meta_optimizers.HybridParallelOptimizer - 重写是为了防止单卡下也传入了分布式的优化器 - """ + # paddle 存在设置分布式 optimizers 的函数,返回值为 fleet.meta_optimizers.HybridParallelOptimizer DistribuedOptimizer = fleet.meta_optimizers.HybridParallelOptimizer for each_optimizer in optimizers: if not isinstance(each_optimizer, (Optimizer, DistribuedOptimizer)): diff --git a/fastNLP/core/drivers/paddle_driver/fleet_launcher.py b/fastNLP/core/drivers/paddle_driver/fleet_launcher.py index 66eccfca..471679a7 100644 --- a/fastNLP/core/drivers/paddle_driver/fleet_launcher.py +++ b/fastNLP/core/drivers/paddle_driver/fleet_launcher.py @@ -20,7 +20,7 @@ from .utils import ( # 记录各个进程信息 class SubTrainer(object): """ - 和fastnlp的Triainer没有关系,仅用于统计节点内不同训练的一些信息 + 用于统计节点内不同训练进程的信息,和 fastnlp 的 Triainer 没有关系 """ def __init__(self, endpoint=None, rank=None): self.devices = [] @@ -30,8 +30,8 @@ class SubTrainer(object): class FleetLauncher: """ - 复原了 paddle 的 launch_collective 函数,将其集成到一个类里 - 仅支持单机多卡的启动 + 复原了 paddle 的 launch_collective 函数,将其简化后集成到一个类里 + 仅支持每个机器单卡的情况。 """ def __init__( self, @@ -45,17 +45,26 @@ class FleetLauncher: self.setup() def setup(self): - + """ + 进行初始化设置的函数,根据传入的设备找到分布式训练使用的端口号 + """ self.set_endpoints() self.sub_trainers = self.get_process_info() - def launch(self) -> int: + def launch(self): + """ + 用于启动分布式进程。 + 首先设置 PaddlePaddle 分布式训练需要设置的环境变量,然后建立新的子进程 + """ # 设置环境变量 self.global_envs = self.get_global_env() self.open_subprocess() reset_seed() def open_subprocess(self): + """ + 从 sub_trainers 中获取各个 rank 的信息,并且使用 subprocess.Popen 建立新的子进程。 + """ if __main__.__spec__ is None: # Script called as `python a/b/c.py` @@ -77,6 +86,7 @@ class FleetLauncher: current_env = copy.copy(self.global_envs) for idx, t in enumerate(self.sub_trainers): + # 根据不同的 rank 设置环境变量 proc_env = { # global_rank "PADDLE_TRAINER_ID": f"{t.rank}", @@ -108,6 +118,14 @@ class FleetLauncher: os.environ.update(current_env) def get_global_env(self): + """ + 设置分布式训练需要的全局变量,包括: + 1、GLOO 相关的设置 + 2、`PADDLE_TRAINERS_NUM` :所有的进程数目 + 3、`PADDLE_TRAINER_ENDPOINTS` :使用的所有地址及其端口 + 4、`PADDLE_WORLD_DEVICE_IDS` :使用的所有设备 + 5、FASTNLP_DISTRIBUTED_CHECK:通过 fastNLP 建立子进程的标志,保存分布式训练使用的设备 + """ global_envs = copy.copy(os.environ.copy()) self.gloo_rendezvous_dir = tempfile.mkdtemp() @@ -137,7 +155,7 @@ class FleetLauncher: def set_endpoints(self): """ - Reference to `get_cluster_from_args` + 寻找用户设置的端口或是空闲端口用于分布式训练,参考了 PaddlePaddle 中的 `get_cluster_from_args` 函数 """ self.node_ip = "127.0.0.1" @@ -157,7 +175,7 @@ class FleetLauncher: def get_process_info(self): """ - Reference to `get_cluster` + 获取各个训练进程的设备、rank 和端口信息,参考 PaddlePaddle 的 `get_cluster` 函数。 """ sub_trainers = [] assert len(self.endpoints) >= len( diff --git a/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py b/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py index eac2d4a4..9a9d4198 100644 --- a/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py +++ b/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py @@ -17,14 +17,16 @@ def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[ model: paddle.nn.Layer, **kwargs) -> PaddleDriver: r""" 用来根据参数 `driver` 和 `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去; - 注意如果输入的 `device` 如果和 `driver` 对应不上就直接报错; + 1、如果检测到当前进程为用户通过 `python -m paddle.distributed.launch xxx.py` 方式拉起的,则将 + 设备自动设置为用户指定的设备(由于我们在引入 fastNLP 进行了特殊的设置,因此可以通过 `CUDA_VISIBLE_DEVICES` 获取) + 2、如果检测到输入的 `driver` 是 `paddle` 但 `device` 包含了多个设备,那么我们会给出警告并且自动返回多卡的 Driver + 3、如果检测到输入的 `driver` 是 `fleet` 但 `device` 仅有一个设备,那么我们会给出警告但仍旧返回多卡的 Driver :param driver: 该参数的值应为以下之一:["paddle", "fleet"]; :param device: 该参数的格式与 `Trainer` 对参数 `device` 的要求一致; :param model: 训练或者评测的具体的模型; - :return: 返回一个元组,元组的第一个值是具体的基于 pytorch 的 `Driver` 实例,元组的第二个值是该 driver 的名字(用于检测一个脚本中 - 先后 driver 的次序的正确问题); + :return: 返回构造的 `Driver` 实例。 """ if is_in_paddle_launch_dist(): if device is not None: diff --git a/fastNLP/core/drivers/paddle_driver/single_device.py b/fastNLP/core/drivers/paddle_driver/single_device.py index e47360ee..f140ad69 100644 --- a/fastNLP/core/drivers/paddle_driver/single_device.py +++ b/fastNLP/core/drivers/paddle_driver/single_device.py @@ -31,6 +31,9 @@ __all__ = [ ] class PaddleSingleDriver(PaddleDriver): + """ + 支持 paddle cpu 或单卡 gpu 训练的 driver + """ def __init__(self, model, device: Union[str, int], fp16: Optional[bool] = False, **kwargs): if isinstance(model, DataParallel): raise ValueError("`paddle.DataParallel` is not supported in `PaddleSingleDriver`") @@ -59,18 +62,53 @@ class PaddleSingleDriver(PaddleDriver): self.world_size = 1 def setup(self): + r""" + 该函数用来初始化训练环境,用于设置当前训练的设备,并将模型迁移到对应设备上。 + """ device = self.model_device device = get_device_from_visible(device, output_type=str) paddle.device.set_device(device) self.model.to(device) def model_call(self, batch, fn: Callable, signature_fn: Optional[Callable]) -> Dict: + """ + 通过调用 `fn` 来实现训练时的前向传播过程; + 注意 Trainer 和 Evaluator 会调用该函数来实现网络的前向传播过程,其中传入该函数的参数 `fn` 是函数 `get_model_call_fn` 所返回的 + 函数; + + :param batch: 当前的一个 batch 的数据;可以为字典或者其它类型; + :param fn: 调用该函数进行一次计算。 + :param signature_fn: 由 Trainer 传入的用于网络前向传播一次的签名函数,因为当 batch 是一个 Dict 的时候,我们会自动调用 auto_param_call + 函数,而一些被包裹的模型需要暴露其真正的函数签名,例如 DistributedDataParallel 的调用函数是 forward,但是需要其函数签名为 model.module.forward; + :return: 返回由 `fn` 返回的结果(应当为一个 dict 或者 dataclass,但是不需要我们去检查); + """ if isinstance(batch, Dict) and not self.wo_auto_param_call: return auto_param_call(fn, batch, signature_fn=signature_fn) else: return fn(batch) def get_model_call_fn(self, fn: str) -> Tuple: + """ + 该函数会接受 Trainer 的 train_fn 或者 Evaluator 的 evaluate_fn,返回一个实际用于调用 driver.model_call 时传入的函数参数; + 该函数会在 Trainer 和 Evaluator 在 driver.setup 函数之后调用; + + 之所以设置该函数的目的在于希望将具体的 model_call function 从 driver 中抽离出来,然后将其附着在 Trainer 或者 Evaluator 身上; + 这样是因为在新版的设计中,使用 model 的哪种方法来进行 `train step` 或者 `evaluate step` 是通过额外的参数 `train_fn` 和 + `evaluate_fn` 来确定的,而二者又分别是通过 Trainer 和 Evaluator 来控制的;因此不能将确定具体的 `train step fn` 和 + `evaluate step fn` 的逻辑放在每一个 driver 的初始化的时候(因此在 Trainer 初始化第一个 driver 时,Evaluator 还没有初始化,但是 + `evaluate step fn` 的确定却需要 Evaluator 的初始化),因此我们将这一逻辑抽象到这一函数当中; + + 这一函数应当通过参数 `fn` 来判断应当返回的实际的调用的函数,具体逻辑如下所示: + 1. 如果 fn == "train_step" or "evaluate_step",那么对传入的模型进行检测,如果模型没有定义方法 `fn`,则默认调用模型的 `forward` + 函数,然后给出 warning; + 2. 如果 fn 是其他字符串,那么如果模型没有定义方法 `fn` 则直接报错; + 注意不同的 driver 需要做额外的检测处理,例如在 DDPDriver 中,当传入的模型本身就是 DistributedDataParallel 中,我们只能调用模型的 + forward 函数,因此需要额外的 warning;这一点特别需要注意的问题在于 driver 自己在 setup 时也会对模型进行改变(DDPDriver),因此 + 可能需要额外标记最初传入 driver 的模型是哪种形式的; + + :param fn: 应当为一个字符串,该函数通过该字符串判断要返回模型的哪种方法; + :return: 返回一个元组,包含两个函数,用于在调用 driver.model_call 时传入; + """ if hasattr(self.model, fn): fn = getattr(self.model, fn) if not callable(fn): @@ -95,6 +133,24 @@ class PaddleSingleDriver(PaddleDriver): def set_dist_repro_dataloader(self, dataloader, dist: Union[str, ReproducibleBatchSampler, ReproducibleSampler]=None, reproducible: bool = False): + r""" + 根据输入的 dataloader 得到一个 支持分布式 (distributed) 与 可复现的 (reproducible) 的 dataloader。 + + :param dataloader: 根据 dataloader 设置其对应的分布式版本以及可复现版本 + :param dist: 应当为一个字符串,其值应当为以下之一:[None, "dist", "unrepeatdist"];为 None 时,表示不需要考虑当前 dataloader + 切换为分布式状态;为 'dist' 时,表示该 dataloader 应该保证每个 gpu 上返回的 batch 的数量是一样多的,允许出现少量 sample ,在 + 不同 gpu 上出现重复;为 'unrepeatdist' 时,表示该 dataloader 应该保证所有 gpu 上迭代出来的数据合并起来应该刚好等于原始的 + 数据,允许不同 gpu 上 batch 的数量不一致。其中 trainer 中 kwargs 的参数 `use_dist_sampler` 为 True 时,该值为 "dist"; + 否则为 None ,evaluator 中的 kwargs 的参数 `use_dist_sampler` 为 True 时,该值为 "unrepeatdist",否则为 None; + 注意当 dist 为 ReproducibleSampler, ReproducibleBatchSampler 时,是断点重训加载时 driver.load 函数在调用; + 当 dist 为 str 或者 None 时,是 trainer 在初始化时调用该函数; + + :param reproducible: 如果为 False ,不要做任何考虑;如果为 True ,需要保证返回的 dataloader 可以保存当前的迭代状态,使得 + 可以可以加载。 + :return: 应当返回一个被替换 sampler 后的新的 dataloader 对象 (注意此处一定需要返回一个新的 dataloader 对象) ;此外, + 如果传入的 dataloader 中是 ReproducibleSampler 或者 ReproducibleBatchSampler 需要重新初始化一个放入返回的 + dataloader 中。如果 dist 为空,且 reproducible 为 False,可直接返回原对象。 + """ # 暂时不支持iterableDataset assert dataloader.dataset_kind != _DatasetKind.ITER, \ diff --git a/fastNLP/core/drivers/paddle_driver/utils.py b/fastNLP/core/drivers/paddle_driver/utils.py index 48598a34..6cd7b252 100644 --- a/fastNLP/core/drivers/paddle_driver/utils.py +++ b/fastNLP/core/drivers/paddle_driver/utils.py @@ -69,7 +69,6 @@ def paddle_seed_everything(seed: Optional[int] = None, workers: bool = False) -> os.environ[FASTNLP_SEED_WORKERS] = f"{int(workers)}" return seed - def reset_seed() -> None: """ fleet 会开启多个进程,因此当用户在脚本中指定 seed_everything 时,在开启多个脚本后,会在每个脚本内重新 @@ -80,16 +79,10 @@ def reset_seed() -> None: if seed is not None: paddle_seed_everything(int(seed), workers=bool(int(workers))) -class ForwardState(IntEnum): - TRAIN = 0 - VALIDATE = 1 - TEST = 2 - PREDICT = 3 - class _FleetWrappingModel(Layer): """ - 参考_DDPWrappingModel,paddle的分布式训练也需要用paddle.nn.DataParallel进行包装,采用和 - pytorch相似的处理方式 + 参考 _DDPWrappingModel , paddle 的分布式训练也需要用 paddle.nn.DataParallel 进行包装,采用和 + pytorch 相似的处理方式 """ def __init__(self, model: 'nn.Layer'): super(_FleetWrappingModel, self).__init__() @@ -109,7 +102,6 @@ class _FleetWrappingModel(Layer): class DummyGradScaler: """ 用于仿造的GradScaler对象,防止重复写大量的if判断 - """ def __init__(self, *args, **kwargs): pass @@ -152,6 +144,9 @@ def _build_fp16_env(dummy=False): return auto_cast, GradScaler def find_free_ports(num): + """ + 在空闲的端口中找到 num 个端口 + """ def __free_port(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, @@ -178,18 +173,11 @@ def find_free_ports(num): return None -def get_host_name_ip(): - try: - host_name = socket.gethostname() - host_ip = socket.gethostbyname(host_name) - return host_name, host_ip - except: - return None - def get_device_from_visible(device: Union[str, int], output_type=int): """ 在有 CUDA_VISIBLE_DEVICES 的情况下,获取对应的设备。 如 CUDA_VISIBLE_DEVICES=2,3 ,device=3 ,则返回1。 + :param device: 未转化的设备名 :param output_type: 返回值的类型 :return: 转化后的设备id diff --git a/fastNLP/core/utils/paddle_utils.py b/fastNLP/core/utils/paddle_utils.py index 1f461e0f..e65cd735 100644 --- a/fastNLP/core/utils/paddle_utils.py +++ b/fastNLP/core/utils/paddle_utils.py @@ -22,6 +22,13 @@ from .utils import apply_to_collection def paddle_to(data, device: Union[str, int]): + """ + 将 `data` 迁移到指定的 `device` 上 + + :param data: 要迁移的张量 + :param device: 目标设备,可以是 `str` 或 `int` + :return: 迁移后的张量 + """ if device == "cpu": return data.cpu() @@ -31,6 +38,9 @@ def paddle_to(data, device: Union[str, int]): def get_paddle_gpu_str(device: Union[str, int]): """ 获得 `gpu:x` 类型的设备名 + + :param device: 设备编号或设备名 + :return: 返回对应的 `gpu:x` 格式的设备名 """ if isinstance(device, str): return device.replace("cuda", "gpu") @@ -38,7 +48,10 @@ def get_paddle_gpu_str(device: Union[str, int]): def get_paddle_device_id(device: Union[str, int]): """ - 获得 gpu 的设备id,注意不要传入 `cpu` 。 + 获得 gpu 的设备id + + :param: device: 设备编号或设备名 + :return: 设备对应的编号 """ if isinstance(device, int): return device