From 680421fe8663366a4566d7ba54bac3113cf3ee42 Mon Sep 17 00:00:00 2001 From: x54-729 <17307130121@fudan.edu.cn> Date: Sat, 9 Apr 2022 12:52:19 +0000 Subject: [PATCH] =?UTF-8?q?paddle=E5=88=86=E5=B8=83=E5=BC=8F=E8=AE=AD?= =?UTF-8?q?=E7=BB=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fastNLP/core/drivers/paddle_driver/fleet.py | 50 +++++++++---------- .../paddle_driver/initialize_paddle_driver.py | 11 ++-- fastNLP/core/utils/paddle_utils.py | 6 +-- tests/helpers/datasets/paddle_data.py | 22 +------- tests/helpers/models/paddle_model.py | 27 +++++++++- 5 files changed, 57 insertions(+), 59 deletions(-) diff --git a/fastNLP/core/drivers/paddle_driver/fleet.py b/fastNLP/core/drivers/paddle_driver/fleet.py index 77cd62c2..f16ea289 100644 --- a/fastNLP/core/drivers/paddle_driver/fleet.py +++ b/fastNLP/core/drivers/paddle_driver/fleet.py @@ -8,7 +8,6 @@ from .utils import ( _FleetWrappingModel, ForwardState, _MODE_PARAMETER, - get_host_name_ip, get_device_from_visible, reset_seed, ) @@ -81,9 +80,9 @@ class PaddleFleetDriver(PaddleDriver): # 如果用户自己在外面初始化了并行模型; self.outside_fleet = False - # 检测 paddle 分布式的环境变量 - if parallel_helper._is_parallel_ctx_initialized(): - # 如果用户自己在外面初始化了 DDP,那么我们要求用户传入的模型一定是已经由 DistributedDataParallel 包裹后的模型; + if parallel_helper._is_parallel_ctx_initialized() and FASTNLP_DISTRIBUTED_CHECK not in os.environ and \ + "fastnlp_paddle_launch_not_fleet" not in os.environ: + # 如果用户自己在外面初始化了 Fleet,那么我们要求用户传入的模型一定是已经由 DistributedDataParallel 包裹后的模型; if not isinstance(model, DataParallel): raise RuntimeError( "It is not allowed to input a normal model instead of `paddle.DataParallel` when" @@ -125,11 +124,11 @@ class PaddleFleetDriver(PaddleDriver): self._test_step = partial(_running_fn_, step_fn=self.model, signature_fn=model.forward) # 当参数 `device` 为 None 时并且该参数不为 None,表示将对应的数据移到指定的机器上; - self._data_device = kwargs.get("_data_device", None) + self._data_device = kwargs.get("data_device", None) if self._data_device is not None: if isinstance(self._data_device, int): if self._data_device < 0: - raise ValueError("Parameter `_data_device` can not be smaller than 0.") + raise ValueError("Parameter `data_device` can not be smaller than 0.") _could_use_device_num = paddle.device.cuda.device_count() if self._data_device >= _could_use_device_num: raise ValueError("The gpu device that parameter `device` specifies is not existed.") @@ -140,18 +139,6 @@ class PaddleFleetDriver(PaddleDriver): logger.warning("`Parameter data_device` is not equal to paddle.deivce.get_device(), " "please keep them equal to avoid some potential bugs.") - if not self.outside_fleet and parallel_device is None: - raise ValueError("Parameter `parallel_device` can not be None when using `PaddleFleetDriver`. This error is caused " - "when your value of parameter `device` is `None` in your `Trainer` instance.") - - # 可能需要放在参数里 - self.strategy = kwargs.get("strategy", fleet.DistributedStrategy()) - self.is_collective = kwargs.get("is_collective", True) - if not self.is_collective: - raise NotImplementedError("FastNLP dose not support `parameters server` for distributed training now.") - self.role_maker = kwargs.get("role_maker", None) - - self._master_port = None self.world_size = None self.global_rank = 0 self._configured = False # 防止重复调用 configure_ddp() 函数使用 @@ -159,7 +146,11 @@ class PaddleFleetDriver(PaddleDriver): self._fleet_kwargs = kwargs.get("paddle_fleet_kwargs", {}) check_user_specific_params(self._fleet_kwargs, DataParallel.__init__) - # TODO 对这些参数的检查 + self.strategy = self._fleet_kwargs.get("strategy", fleet.DistributedStrategy()) + self.is_collective = self._fleet_kwargs.get("is_collective", True) + if not self.is_collective: + raise NotImplementedError("FastNLP only support `collective` for distributed training now.") + self.role_maker = self._fleet_kwargs.get("role_maker", None) if self.local_rank == 0 and not is_in_paddle_dist(): # 由于使用driver时模型一定会被初始化,因此在一开始程序一定会占用一部分显存来存放模型,然而这部分显存没有 @@ -193,14 +184,16 @@ class PaddleFleetDriver(PaddleDriver): self.world_size = int(os.environ.get("PADDLE_TRAINERS_NUM")) self.global_rank = int(os.environ.get("PADDLE_TRAINER_ID")) reset_seed() - logger.warning(f"\nworld size, global rank: {self.world_size}, {self.global_rank}\n") - fleet.init(self.role_maker, self.is_collective, self.strategy) + logger.info(f"\nworld size, global rank: {self.world_size}, {self.global_rank}\n") + if not parallel_helper._is_parallel_ctx_initialized(): + fleet.init(self.role_maker, self.is_collective, self.strategy) + + os.environ["fastnlp_paddle_launch_not_fleet"] = "yes" else: # 在用户只使用了一个分布式 trainer 的情况下 # 此时 parallel_helper._is_parallel_ctx_initialized() 一定为 False # parallel_device 是 list, - # if self.local_rank == 0 and FASTNLP_DISTRIBUTED_CHECK not in os.environ: if not parallel_helper._is_parallel_ctx_initialized(): # 没有初始化分布式环境,且是主进程 self.init_fleet_and_set() @@ -212,11 +205,15 @@ class PaddleFleetDriver(PaddleDriver): if sorted(pre_gpus) != sorted(self.parallel_device): raise RuntimeError("Notice you are using `PaddleFleetDriver` after one instantiated `PaddleFleetDriver`, it is not" "allowed that your second `PaddleFleetDriver` has a new setting of parameters `parallel_device`.") + self.world_size = dist.get_world_size() + self.global_rank = dist.get_rank() if not self.outside_fleet: # self.model.to(self.model_device) self.configure_fleet() + self.barrier() + # 初始化 self._pids,从而使得每一个进程都能接受到 rank0 的 send 操作; # TODO 不用.to会怎么样? self._pids = [] @@ -238,6 +235,7 @@ class PaddleFleetDriver(PaddleDriver): """ if self.local_rank == 0: # 是 rank0 的话,则拉起其它子进程 + print("in launcher") launcher = FleetLauncher(self.parallel_device, self.output_from_new_proc) launcher.launch() # 设置参数和初始化分布式环境 @@ -255,6 +253,7 @@ class PaddleFleetDriver(PaddleDriver): 当用户使用了 `python -m paddle.distributed.launch xxx.py` 启动时,我们需要 根据 paddle 设置的环境变量来获得各种属性 """ + print("set_from_env") self.world_size = dist.get_world_size() self.global_rank = dist.get_rank() @@ -296,8 +295,6 @@ class PaddleFleetDriver(PaddleDriver): @property def model_device(self): - # 我认为这里的两个 device 应该返回真实值,对 CUDA_VISIBLDE_DEIVCES的转换应该在相应的 to 函数完成 - # 否则会造成用户的困惑 return self._model_device @property @@ -407,9 +404,8 @@ class PaddleFleetDriver(PaddleDriver): def move_data_to_device(self, batch: 'paddle.Tensor'): device = self.data_device - # 因为设置了CUDA_VISIBLE_DEVICES,在子进程中可能会引起错误 - if FASTNLP_DISTRIBUTED_CHECK in os.environ: - device = get_device_from_visible(device) + # 因为设置了CUDA_VISIBLE_DEVICES,可能会引起错误 + device = get_device_from_visible(device) return paddle_move_data_to_device(batch, device) @staticmethod diff --git a/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py b/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py index e362017e..db30517f 100644 --- a/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py +++ b/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py @@ -7,7 +7,7 @@ from .single_device import PaddleSingleDriver from .fleet import PaddleFleetDriver from fastNLP.envs.imports import _NEED_IMPORT_PADDLE -from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK +from fastNLP.core.utils import is_in_paddle_launch_dist from fastNLP.core.log import logger if _NEED_IMPORT_PADDLE: @@ -26,13 +26,14 @@ def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[ :return: 返回一个元组,元组的第一个值是具体的基于 pytorch 的 `Driver` 实例,元组的第二个值是该 driver 的名字(用于检测一个脚本中 先后 driver 的次序的正确问题); """ - if "PADDLE_TRAINERS_NUM" in os.environ and "PADDLE_RANK_IN_NODE" in os.environ and FASTNLP_DISTRIBUTED_CHECK not in os.environ: + if is_in_paddle_launch_dist(): if device is not None: logger.warning("Parameter `device` would be ignored when you are using `paddle.distributed.launch` to pull " "up your script. And we will directly get the local device via " - "`f'gpu:{os.environ['FLAGS_selected_gpus']}')`.") - device = [int(g) for g in os.environ["FLAGS_selected_gpus"].split(",")] - return PaddleFleetDriver(model, f"gpu:{os.environ['PADDLE_RANK_IN_NODE']}", True, **kwargs) + "and `os.environ['CUDA_VISIBLE_DEVICES']``.") + device = [int(g) for g in os.environ["CUDA_VISIBLE_DEVICES"].split(",")] + # TODO 目前一个进程仅对应一个卡,所以暂时传入一个 int + return PaddleFleetDriver(model, device[0], True, **kwargs) if driver not in {"paddle", "fleet"}: raise ValueError("Parameter `driver` can only be one of these values: ['paddle', 'fleet'].") diff --git a/fastNLP/core/utils/paddle_utils.py b/fastNLP/core/utils/paddle_utils.py index 2e1bfeda..51a19e89 100644 --- a/fastNLP/core/utils/paddle_utils.py +++ b/fastNLP/core/utils/paddle_utils.py @@ -13,7 +13,7 @@ import re from typing import Any, Optional, Union from fastNLP.envs.imports import _NEED_IMPORT_PADDLE -from fastNLP.envs import FASTNLP_DISTRIBUTED_CHECK +from fastNLP.envs import FASTNLP_DISTRIBUTED_CHECK, FASTNLP_BACKEND_LAUNCH if _NEED_IMPORT_PADDLE: import paddle @@ -94,6 +94,4 @@ def is_in_paddle_launch_dist(): """ 判断是否处于 launch 启动的分布式进程中 """ - return 'PADDLE_RANK_IN_NODE' in os.environ and \ - 'FLAGS_selected_gpus' in os.environ and \ - FASTNLP_DISTRIBUTED_CHECK not in os.environ \ No newline at end of file + return FASTNLP_BACKEND_LAUNCH in os.environ \ No newline at end of file diff --git a/tests/helpers/datasets/paddle_data.py b/tests/helpers/datasets/paddle_data.py index f00c8d95..17b2d310 100644 --- a/tests/helpers/datasets/paddle_data.py +++ b/tests/helpers/datasets/paddle_data.py @@ -15,7 +15,7 @@ class PaddleNormalDataset(Dataset): return self._data[item] -class PaddleRandomDataset(Dataset): +class PaddleRandomMaxDataset(Dataset): def __init__(self, num_samples, num_features): self.x = paddle.randn((num_samples, num_features)) self.y = self.x.argmax(axis=-1) @@ -25,23 +25,3 @@ class PaddleRandomDataset(Dataset): def __getitem__(self, item): return {"x": self.x[item], "y": self.y[item]} - - -class PaddleDataset_MNIST(Dataset): - def __init__(self, mode="train"): - - self.dataset = [ - ( - np.array(img).astype('float32').reshape(-1), - label - ) for img, label in paddle.vision.datasets.MNIST(mode=mode) - ] - - def __getitem__(self, idx): - return {"x": self.dataset[idx][0], "y": self.dataset[idx][1]} - - def __len__(self): - return len(self.dataset) - - - diff --git a/tests/helpers/models/paddle_model.py b/tests/helpers/models/paddle_model.py index 37b0ff45..a830b1ff 100644 --- a/tests/helpers/models/paddle_model.py +++ b/tests/helpers/models/paddle_model.py @@ -1,12 +1,12 @@ import paddle import paddle.nn as nn -class PaddleNormalModel_Classification(paddle.nn.Layer): +class PaddleNormalModel_Classification_1(paddle.nn.Layer): """ 基础的paddle分类模型 """ def __init__(self, num_labels, feature_dimension): - super(PaddleNormalModel_Classification, self).__init__() + super(PaddleNormalModel_Classification_1, self).__init__() self.num_labels = num_labels self.linear1 = nn.Linear(in_features=feature_dimension, out_features=64) @@ -30,3 +30,26 @@ class PaddleNormalModel_Classification(paddle.nn.Layer): x = self(x) return {"pred": x, "target": y.reshape((-1,))} + + +class PaddleNormalModel_Classification_2(paddle.nn.Layer): + """ + 基础的paddle分类模型,只实现 forward 函数测试用户自己初始化了分布式的场景 + """ + def __init__(self, num_labels, feature_dimension): + super(PaddleNormalModel_Classification_2, self).__init__() + self.num_labels = num_labels + + self.linear1 = nn.Linear(in_features=feature_dimension, out_features=64) + self.ac1 = nn.ReLU() + self.linear2 = nn.Linear(in_features=64, out_features=32) + self.ac2 = nn.ReLU() + self.output = nn.Linear(in_features=32, out_features=num_labels) + self.loss_fn = nn.CrossEntropyLoss() + + def forward(self, x, y): + x = self.ac1(self.linear1(x)) + x = self.ac2(self.linear2(x)) + x = self.output(x) + loss = self.loss_fn(x, y) + return {"loss": self.loss_fn(x, y), "pred": x, "target": y.reshape((-1,))}