From 9969502dcd69041f3badb8dfaf3c24913ba3e908 Mon Sep 17 00:00:00 2001 From: x54-729 <17307130121@fudan.edu.cn> Date: Fri, 8 Apr 2022 12:06:56 +0000 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4core/drivers/paddle=5Fdriver/?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/drivers/paddle_driver/__init__.py | 11 + fastNLP/core/drivers/paddle_driver/fleet.py | 426 ++++++++++++++++++ .../drivers/paddle_driver/fleet_launcher.py | 176 ++++++++ .../paddle_driver/initialize_paddle_driver.py | 87 ++++ .../drivers/paddle_driver/paddle_driver.py | 315 +++++++++++++ .../drivers/paddle_driver/single_device.py | 161 +++++++ fastNLP/core/drivers/paddle_driver/utils.py | 351 +++++++++++++++ 7 files changed, 1527 insertions(+) create mode 100644 fastNLP/core/drivers/paddle_driver/__init__.py create mode 100644 fastNLP/core/drivers/paddle_driver/fleet.py create mode 100644 fastNLP/core/drivers/paddle_driver/fleet_launcher.py create mode 100644 fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py create mode 100644 fastNLP/core/drivers/paddle_driver/paddle_driver.py create mode 100644 fastNLP/core/drivers/paddle_driver/single_device.py create mode 100644 fastNLP/core/drivers/paddle_driver/utils.py diff --git a/fastNLP/core/drivers/paddle_driver/__init__.py b/fastNLP/core/drivers/paddle_driver/__init__.py new file mode 100644 index 00000000..0dc85934 --- /dev/null +++ b/fastNLP/core/drivers/paddle_driver/__init__.py @@ -0,0 +1,11 @@ +__all__ = [ + "PaddleDriver", + "PaddleSingleDriver", + "PaddleFleetDriver", + "paddle_seed_everything", +] + +from .paddle_driver import PaddleDriver +from .single_device import PaddleSingleDriver +from .fleet import PaddleFleetDriver +from .utils import paddle_seed_everything \ No newline at end of file diff --git a/fastNLP/core/drivers/paddle_driver/fleet.py b/fastNLP/core/drivers/paddle_driver/fleet.py new file mode 100644 index 00000000..ff80cb9e --- /dev/null +++ b/fastNLP/core/drivers/paddle_driver/fleet.py @@ -0,0 +1,426 @@ +import os +from functools import partial +from typing import List, Union, Optional, Dict + +from .paddle_driver import PaddleDriver +from .fleet_launcher import FleetLauncher +from .utils import ( + _FleetWrappingModel, + ForwardState, + _MODE_PARAMETER, + get_host_name_ip, + get_device_from_visible, + reset_seed, +) + +from fastNLP.envs.imports import _NEED_IMPORT_PADDLE +from fastNLP.core.utils import ( + auto_param_call, + check_user_specific_params, + paddle_move_data_to_device, + is_in_paddle_dist, +) +from fastNLP.core.samplers import ReproducibleIterator, RandomSampler, UnrepeatedDistributedSampler +from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK, USER_CUDA_VISIBLE_DEVICES +from fastNLP.core.log import logger + +if _NEED_IMPORT_PADDLE: + import paddle + from paddle import DataParallel + import paddle.distributed.fleet as fleet + import paddle.distributed as dist + from paddle.io import BatchSampler + from paddle.optimizer import Optimizer + from paddle.fluid.reader import _DatasetKind + from paddle.fluid.dygraph import parallel_helper + +__all__ = [ + "PaddleFleetDriver", +] +# if os.path.exists(self.gloo_rendezvous_dir): +# shutil.rmtree(self.gloo_rendezvous_dir) +class PaddleFleetDriver(PaddleDriver): + def __init__( + self, + model, + parallel_device: Optional[Union[List[int], int]], + is_pull_by_paddle_run: bool = False, + fp16: bool = False, + **kwargs + ): + """ + 采用fleet接口进行并行paddle训练的driver + PaddleFleetDriver 目前考虑支持的三种启动方式: + 1. 
用户自己不进行 fleet 的任何操作,直接使用我们的 Trainer,并且只运行一个 main 脚本,这时是由我们自己使用 open_subprocesses 拉起 + 多个进程,然后由 Driver 自己进行初始化 + 2. 其它情况同 1,但是用户自己使用 python -m paddle.distributed.launch 拉起; + 3. 用户自己在外面初始化 Fleet,并且通过 python -m paddle.distributed.launch 拉起; + + 注意多机的启动强制要求用户在每一台机器上使用 python -m paddle.distributed.launch 启动; + + 如果用户自己在外面初始化了 fleet,那么 + parallel_device 为 None; + data_device 为 表示单卡的一个参数; + dist.is_initialized 为 true; + """ + super(PaddleFleetDriver, self).__init__(model, fp16=fp16, **kwargs) + + # 如果不是通过 launch 启动,要求用户必须传入 parallel_device + if not is_pull_by_paddle_run and parallel_device is None: + raise ValueError("Parameter `parallel_device` can not be None when using `PaddleFleetDriver`. This error is caused " + "when your value of parameter `device` is `None` in your `Trainer` instance.") + + # 如果用户自己初始化了 paddle 的分布式训练那么一定是通过 launch 拉起的 + self.is_pull_by_paddle_run = is_pull_by_paddle_run + self.parallel_device = parallel_device + # 在初始化时,如果发现 is_pull_by_paddle_run ,则将 parallel_device 设置成当前进程的gpu + if is_pull_by_paddle_run: + self._model_device = parallel_device + else: + self._model_device = parallel_device[self.local_rank] + + # 如果用户自己在外面初始化了并行模型; + self.outside_fleet = False + # 检测 paddle 分布式的环境变量 + if parallel_helper._is_parallel_ctx_initialized(): + # 如果用户自己在外面初始化了 DDP,那么我们要求用户传入的模型一定是已经由 DistributedDataParallel 包裹后的模型; + if not isinstance(model, DataParallel): + raise RuntimeError( + "It is not allowed to input a normal model instead of `paddle.DataParallel` when" + "you initialize the paddle distribued process out of our control.") + + self.outside_fleet = True + # 用户只有将模型上传到对应机器上后才能用 DataParallel 包裹,因此如果用户在外面初始化了 Fleet,那么在 PaddleFleetDriver 中 + # 我们就直接将 model_device 置为 None; + self._model_device = None + + def _running_fn_(batch, step_fn, signature_fn): + if isinstance(batch, Dict): + return auto_param_call(step_fn, batch, signature_fn=signature_fn) + else: + return self._validate_step(batch) + + model = model._layers + if hasattr(model, "train_step"): + logger.warning( + "Notice your model is a `paddle.DataParallel` model. And your " + "model also implements the `train_step` method, which we can not call actually, we will" + " call `forward` function instead of `train_step` and you should note that.") + self._train_step = partial(_running_fn_, step_fn=self.model, signature_fn=model.forward) + # self._train_signature_fn = model.forward + + if hasattr(model, "validate_step"): + logger.warning( + "Notice your model is a `paddle.DataParallel` model. And your " + "model also implements the `validate_step` method, which we can not call actually, " + "we will call `forward` function instead of `validate_step` and you should note that.") + self._validate_step = partial(_running_fn_, step_fn=self.model, signature_fn=model.forward) + # self._validate_signature_fn = model.forward + + if hasattr(model, "test_step"): + logger.warning( + "Notice your model is a `paddle.DataParallel` model. 
And your " + "model also implements the `test_step` method, which we can not call actually, we will" + " call `forward` function instead of `test_step` and you should note that.") + self._test_step = partial(_running_fn_, step_fn=self.model, signature_fn=model.forward) + + # 当参数 `device` 为 None 时并且该参数不为 None,表示将对应的数据移到指定的机器上; + self._data_device = kwargs.get("_data_device", None) + if self._data_device is not None: + if isinstance(self._data_device, int): + if self._data_device < 0: + raise ValueError("Parameter `_data_device` can not be smaller than 0.") + _could_use_device_num = paddle.device.cuda.device_count() + if self._data_device >= _could_use_device_num: + raise ValueError("The gpu device that parameter `device` specifies is not existed.") + self._data_device = f"gpu:{self._data_device}" + elif not isinstance(self._data_device, str): + raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.") + if self.outside_fleet and paddle.device.get_device() != self._data_device: + logger.warning("`Parameter data_device` is not equal to paddle.deivce.get_device(), " + "please keep them equal to avoid some potential bugs.") + + if not self.outside_fleet and parallel_device is None: + raise ValueError("Parameter `parallel_device` can not be None when using `PaddleFleetDriver`. This error is caused " + "when your value of parameter `device` is `None` in your `Trainer` instance.") + + # 可能需要放在参数里 + self.strategy = kwargs.get("strategy", fleet.DistributedStrategy()) + self.is_collective = kwargs.get("is_collective", True) + if not self.is_collective: + raise NotImplementedError("FastNLP dose not support `parameters server` for distributed training now.") + self.role_maker = kwargs.get("role_maker", None) + + self._master_port = None + self.world_size = None + self.global_rank = 0 + self._configured = False # 防止重复调用 configure_ddp() 函数使用 + self._has_setup = False # 防止重复调用 setup() 函数 + + self._fleet_kwargs = kwargs.get("paddle_fleet_kwargs", {}) + check_user_specific_params(self._fleet_kwargs, DataParallel.__init__) + # TODO 对这些参数的检查 + + if self.local_rank == 0 and not is_in_paddle_dist(): + # 由于使用driver时模型一定会被初始化,因此在一开始程序一定会占用一部分显存来存放模型,然而这部分显存没有 + # 发挥任何作用。 + logger.warning(f"The program will use some extra space on {paddle.device.get_device()} to place your model since the model " + "has already been initialized.") + + self.output_from_new_proc = kwargs.get("output_from_new_proc", "only_error") + assert isinstance(self.output_from_new_proc, str), "Parameter `output_from_new_proc` can only be `str` type." 
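+        # Any value of `output_from_new_proc` other than "all" / "ignore" / "only_error" is treated
+        # as a directory path: it is created below and the output of the spawned sub-processes is
+        # redirected into it.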
+ if self.output_from_new_proc not in {"all", "ignore", "only_error"}: + os.makedirs(name=self.output_from_new_proc, exist_ok=True) + self.output_from_new_proc = os.path.abspath(self.output_from_new_proc) + + def setup(self): + """ + 在主进程拉起其它子进程,将主进程作为rank 0 + """ + if self._has_setup: + return + self._has_setup = True + # 如果用户需要使用多机模式,那么一定进入到这里; + if self.is_pull_by_paddle_run: + + if self.outside_fleet: + # 已经初始化了多机环境 + self.set_from_fleet_environment() + else: + # 用户没有初始化多机环境 + # TODO 绕一下 + # dist.get_world_size() 只能在初始化之后进行调用; + self.world_size = int(os.environ.get("PADDLE_TRAINERS_NUM")) + self.global_rank = int(os.environ.get("PADDLE_TRAINER_ID")) + reset_seed() + logger.warning(f"\nworld size, global rank: {self.world_size}, {self.global_rank}\n") + fleet.init(self.role_maker, self.is_collective, self.strategy) + + else: + # 在用户只使用了一个分布式 trainer 的情况下 + # 此时 parallel_helper._is_parallel_ctx_initialized() 一定为 False + # parallel_device 是 list, + # if self.local_rank == 0 and FASTNLP_DISTRIBUTED_CHECK not in os.environ: + if not parallel_helper._is_parallel_ctx_initialized(): + # 没有初始化分布式环境,且是主进程 + self.init_fleet_and_set() + # 用户在这个 trainer 前面又初始化了一个 trainer,并且使用的是 PaddleFleetDriver; + else: + # 已经设置过一次,保证参数必须是一样的 + pre_gpus = os.environ[FASTNLP_DISTRIBUTED_CHECK] + pre_gpus = [int (x) for x in pre_gpus.split(",")] + if sorted(pre_gpus) != sorted(self.parallel_device): + raise RuntimeError("Notice you are using `PaddleFleetDriver` after one instantiated `PaddleFleetDriver`, it is not" + "allowed that your second `PaddleFleetDriver` has a new setting of parameters `parallel_device`.") + + if not self.outside_fleet: + # self.model.to(self.model_device) + self.configure_fleet() + + # 初始化 self._pids,从而使得每一个进程都能接受到 rank0 的 send 操作; + # TODO 不用.to会怎么样? 
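+        # Every rank contributes its pid via all_gather; LOCAL_WORLD_SIZE (or, as a fallback, an
+        # all_reduce(MAX) over the local ranks plus one) is then used to keep only the pids that
+        # live on this node, so that rank 0 can later signal the processes of its own machine.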
+ self._pids = [] + dist.all_gather(self._pids, paddle.to_tensor(os.getpid(), dtype="int32")) + # TODO LOCAL_WORLD_SIZE + local_world_size = int(os.environ.get("LOCAL_WORLD_SIZE")) if "LOCAL_WORLD_SIZE" in os.environ else None + if local_world_size is None: + local_world_size = paddle.to_tensor(self.local_rank, dtype="int32") + dist.all_reduce(local_world_size, op=dist.ReduceOp.MAX) + local_world_size = local_world_size.item() + 1 + + node_rank = self.global_rank // local_world_size + self._pids = self._pids[node_rank*local_world_size: (node_rank+1)*local_world_size] + self._pids = self.tensor_to_numeric(self._pids) + + def init_fleet_and_set(self): + """ + 使用 FleetLauncher 拉起子进程 + """ + if self.local_rank == 0: + # 是 rank0 的话,则拉起其它子进程 + launcher = FleetLauncher(self.parallel_device, self.output_from_new_proc) + launcher.launch() + # 设置参数和初始化分布式环境 + reset_seed() + fleet.init(self.role_maker, self.is_collective, self.strategy) + self.global_rank = int(os.getenv("PADDLE_TRAINER_ID")) + self.world_size = int(os.getenv("PADDLE_TRAINERS_NUM")) + + # 正常情况下不会Assert出问题,但还是保险一下 + assert self.global_rank is not None + assert self.world_size is not None + assert self.world_size == len(self.parallel_device) + + def set_from_fleet_environment(self): + """ + 当用户使用了 `python -m paddle.distributed.launch xxx.py` 启动时,我们需要 + 根据 paddle 设置的环境变量来获得各种属性 + """ + self.world_size = dist.get_world_size() + self.global_rank = dist.get_rank() + + def barrier(self): + dist.barrier() + + def configure_fleet(self): + if not self._configured and not isinstance(self.model, DataParallel): + self.model = DataParallel( + _FleetWrappingModel(self.model), + **self._fleet_kwargs + ) + + self._train_step = partial(self.model, **{_MODE_PARAMETER: ForwardState.TRAIN}) + self._validate_step = partial(self.model, **{_MODE_PARAMETER: ForwardState.VALIDATE}) + self._test_step = partial(self.model, **{_MODE_PARAMETER: ForwardState.TEST}) + + self._configured = True + + @property + def world_size(self) -> int: + return self._world_size + + @world_size.setter + def world_size(self, size: int) -> None: + self._world_size = size + + @property + def global_rank(self) -> int: + return self._global_rank + + @global_rank.setter + def global_rank(self, rank: int) -> None: + self._global_rank = rank + + @property + def local_rank(self) -> int: + return int(os.getenv("PADDLE_RANK_IN_NODE", "0")) + + @property + def model_device(self): + # 我认为这里的两个 device 应该返回真实值,对 CUDA_VISIBLDE_DEIVCES的转换应该在相应的 to 函数完成 + # 否则会造成用户的困惑 + return self._model_device + + @property + def data_device(self): + if self.outside_fleet: + return self._data_device + return self.model_device + + def train_step(self, batch): + return self._train_step(batch) + + def validate_step(self, batch): + return self._validate_step(batch) + + def test_step(self, batch): + return self._test_step(batch) + + def replace_sampler(self, dataloader, dist_sampler: Optional[Union[str, ReproducibleIterator]] = "dist", reproducible: bool = False): + + # 暂时不支持iterableDataset + assert dataloader.dataset_kind != _DatasetKind.ITER, \ + "FastNLP does not support `IteratorDataset` now." 
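+        # Dispatch overview: a ReproducibleIterator instance is plugged in directly; "dist" (trainer)
+        # makes the current or a newly created RandomSampler distributed with padding; "unrepeatdist"
+        # (evaluator) switches to UnrepeatedDistributedSampler so no sample is repeated across ranks;
+        # None is only legal when no reproducibility is requested.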
+ if isinstance(dist_sampler, ReproducibleIterator): + dataloader.batch_sampler.sampler = dist_sampler + return dataloader + + # paddle 的 BatchSampler 和 DataLoader 没有 shuffle 成员,只能根据 sampler 判断 + # 但是其子类 DistributedBatchSampler 却有 shuffle 成员 + # 因此用 type() 进行严格的判断 + if type(dataloader.batch_sampler) == BatchSampler: + shuffle = isinstance(dataloader.batch_sampler.sampler, RandomSampler) + else: + shuffle = dataloader.batch_sampler.shuffle + + # trainer, evaluator + if dist_sampler is None: + if reproducible: + raise RuntimeError("It is not allowed to use checkpoint retraining when you initialize fleet out of our " + "control.") + else: + return dataloader + # trainer + elif dist_sampler == "dist": + # 如果用户的 trainer.use_dist_sampler 为 True,那么此时其是否进行断点重训,不影响这里的行为; + if isinstance(dataloader.batch_sampler.sampler, ReproducibleIterator): + dataloader.batch_sampler.sampler.set_distributed( + num_replicas=self.world_size, + rank=self.global_rank, + pad=True + ) + return dataloader + else: + sampler = RandomSampler( + dataset=dataloader.dataset, + shuffle=shuffle, + seed=int(os.environ.get("FASTNLP_SEED", 0)) + ) + sampler.set_distributed( + num_replicas=self.world_size, + rank=self.global_rank, + pad=True + ) + dataloader.batch_sampler.sampler = sampler + return dataloader + # evaluator + elif dist_sampler == "unrepeatdist": + sampler = UnrepeatedDistributedSampler( + dataset=dataloader.dataset, + shuffle=shuffle, + seed=int(os.environ.get("FASTNLP_SEED", 0)) + ) + sampler.set_distributed( + num_replicas=self.world_size, + rank=self.global_rank + ) + dataloader.batch_sampler.sampler = sampler + return dataloader + else: + raise ValueError("Parameter `dist_sampler` can only be one of three values: ('dist', 'unrepeatdist', None).") + + def backward(self, loss): + self.grad_scaler.scale(loss).backward() + + def step(self): + for optimizer in self.optimizers: + self.grad_scaler.step(optimizer) + self.grad_scaler.update() + + def is_global_zero(self): + return self.global_rank == 0 + + def get_no_sync_context(self): + return self.model.no_sync + + def unwrap_model(self): + _layers = self.model._layers + if isinstance(_layers, _FleetWrappingModel): + return _layers.model + else: + return _layers + + def get_local_rank(self) ->int: + return self.local_rank + + def is_distributed(self): + return True + + def move_data_to_device(self, batch: 'paddle.Tensor'): + device = self.data_device + # 因为设置了CUDA_VISIBLE_DEVICES,在子进程中可能会引起错误 + if FASTNLP_DISTRIBUTED_CHECK in os.environ: + device = get_device_from_visible(device) + return paddle_move_data_to_device(batch, device) + + @staticmethod + def _check_optimizer_legality(optimizers): + """ + paddle存在设置分布式optimizers的函数,返回值为fleet.meta_optimizers.HybridParallelOptimizer + 重写是为了防止单卡下也传入了分布式的优化器 + """ + DistribuedOptimizer = fleet.meta_optimizers.HybridParallelOptimizer + for each_optimizer in optimizers: + if not isinstance(each_optimizer, (Optimizer, DistribuedOptimizer)): + raise ValueError(f"Each optimizer of parameter `optimizers` should be 'paddle.optimizer.Optimizer' type, " + f"not {type(each_optimizer)}.") diff --git a/fastNLP/core/drivers/paddle_driver/fleet_launcher.py b/fastNLP/core/drivers/paddle_driver/fleet_launcher.py new file mode 100644 index 00000000..66eccfca --- /dev/null +++ b/fastNLP/core/drivers/paddle_driver/fleet_launcher.py @@ -0,0 +1,176 @@ +import os +import sys +import __main__ +import tempfile +import copy +from typing import List + +from fastNLP.core.drivers.utils import distributed_open_proc +from fastNLP.envs.env import ( + 
FASTNLP_DISTRIBUTED_CHECK, + FASTNLP_LOG_LEVEL, + FASTNLP_GLOBAL_SEED, + USER_CUDA_VISIBLE_DEVICES, +) +from .utils import ( + find_free_ports, + reset_seed, +) + +# 记录各个进程信息 +class SubTrainer(object): + """ + 和fastnlp的Triainer没有关系,仅用于统计节点内不同训练的一些信息 + """ + def __init__(self, endpoint=None, rank=None): + self.devices = [] + self.endpoint = endpoint + self.rank = rank + + +class FleetLauncher: + """ + 复原了 paddle 的 launch_collective 函数,将其集成到一个类里 + 仅支持单机多卡的启动 + """ + def __init__( + self, + devices: List[int], + output_from_new_proc: str = "only_error" + ): + + self.devices = devices + self.output_from_new_proc = output_from_new_proc + + self.setup() + + def setup(self): + + self.set_endpoints() + self.sub_trainers = self.get_process_info() + + def launch(self) -> int: + # 设置环境变量 + self.global_envs = self.get_global_env() + self.open_subprocess() + reset_seed() + + def open_subprocess(self): + + if __main__.__spec__ is None: + # Script called as `python a/b/c.py` + # when user is using hydra find the absolute path + path_lib = os.path.abspath + + # pull out the commands used to run the script and resolve the abs file path + command = sys.argv + try: + full_path = path_lib(command[0]) + except Exception: + full_path = os.path.abspath(command[0]) + + command[0] = full_path + # use the same python interpreter and actually running + command = [sys.executable] + command + else: # Script called as `python -m a.b.c` + command = [sys.executable, "-m", __main__.__spec__._name] + sys.argv[1:] + + current_env = copy.copy(self.global_envs) + for idx, t in enumerate(self.sub_trainers): + proc_env = { + # global_rank + "PADDLE_TRAINER_ID": f"{t.rank}", + "PADDLE_CURRENT_ENDPOINT": f"{t.endpoint}", + # rank + "PADDLE_RANK_IN_NODE": f"{idx}", + "PADDLE_LOCAL_DEVICE_IDS": + ",".join([str(g) for g in t.devices]), + } + + if len(t.devices) > 0: + proc_env["FLAGS_selected_gpus"] = "%s" % ",".join( + [str(g) for g in t.devices]) + proc_env["FLAGS_selected_devices"] = "%s" % ",".join( + [str(g) for g in t.devices]) + + current_env.update(proc_env) + + if os.environ.get(FASTNLP_GLOBAL_SEED) is None and FASTNLP_GLOBAL_SEED in current_env: + del current_env[FASTNLP_GLOBAL_SEED] + + if idx != 0: + # 子进程 + if os.environ.get(FASTNLP_LOG_LEVEL, None) is None: + current_env[FASTNLP_LOG_LEVEL] = "warning" + proc = distributed_open_proc(self.output_from_new_proc, command, current_env, t.rank) + else: + # 更新当前的环境变量 + os.environ.update(current_env) + + def get_global_env(self): + + global_envs = copy.copy(os.environ.copy()) + self.gloo_rendezvous_dir = tempfile.mkdtemp() + # launch中涉及的gloo环境 + global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0")) + global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3" + global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir + global_envs["PADDLE_DISTRI_BACKEND"] = "nccl" + + # 通过FNLP初始化的标志 + global_envs[FASTNLP_DISTRIBUTED_CHECK] = f"{','.join([str(g) for g in self.devices])}" + + # 统计全局信息 + device_ids = [] + for t in self.sub_trainers: + device_ids.append([str(acc) for acc in t.devices]) + world_device_ids = [':'.join(ele) for ele in device_ids] + # 全局环境变量 + global_envs.update({ + # world_size + "PADDLE_TRAINERS_NUM": f"{len(self.sub_trainers)}", + "PADDLE_TRAINER_ENDPOINTS": ",".join(self.endpoints), + "PADDLE_WORLD_DEVICE_IDS": ",".join(world_device_ids), + }) + + return global_envs + + def set_endpoints(self): + """ + Reference to `get_cluster_from_args` + """ + self.node_ip = "127.0.0.1" + + free_ports = None + if os.environ.get("FLAGS_START_PORT") is None: + free_ports 
= find_free_ports(len(self.devices)) + if free_ports is not None: + free_ports = list(free_ports) + else: + start_port = int(os.getenv("FLAGS_START_PORT", "6070")) + + free_ports = [ + x for x in range(start_port, start_port + len(self.devices)) + ] + + self.endpoints = ["%s:%d" % (self.node_ip, port) for port in free_ports] + + def get_process_info(self): + """ + Reference to `get_cluster` + """ + sub_trainers = [] + assert len(self.endpoints) >= len( + self.devices + ), "current trainer_endpoints size should be greater equal than acclerators size." + + for i in range(len(self.devices)): + sub_trainer = SubTrainer(f"{self.endpoints[i]}", i) + if isinstance(self.devices[i], (list, tuple)): + sub_trainer.devices.extend(self.devices[i]) + else: + sub_trainer.devices.append(self.devices[i]) + + sub_trainers.append(sub_trainer) + + return sub_trainers diff --git a/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py b/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py new file mode 100644 index 00000000..0e76ceae --- /dev/null +++ b/fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py @@ -0,0 +1,87 @@ +import os + +from typing import Optional, List, Sequence, Union + +from .paddle_driver import PaddleDriver +from .single_device import PaddleSingleDriver +from .fleet import PaddleFleetDriver + +from fastNLP.envs.imports import _NEED_IMPORT_PADDLE +from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK +from fastNLP.core.log import logger + +if _NEED_IMPORT_PADDLE: + import paddle + +def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[int]]], + model: paddle.nn.Layer, **kwargs) -> PaddleDriver: + r""" + 用来根据参数 `driver` 和 `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去; + 注意如果输入的 `device` 如果和 `driver` 对应不上就直接报错; + + :param driver: 该参数的值应为以下之一:["paddle", "fleet"]; + :param device: 该参数的格式与 `Trainer` 对参数 `device` 的要求一致; + :param model: 训练或者评测的具体的模型; + + :return: 返回一个元组,元组的第一个值是具体的基于 pytorch 的 `Driver` 实例,元组的第二个值是该 driver 的名字(用于检测一个脚本中 + 先后 driver 的次序的正确问题); + """ + if "PADDLE_TRAINERS_NUM" in os.environ and "PADDLE_RANK_IN_NODE" in os.environ and FASTNLP_DISTRIBUTED_CHECK not in os.environ: + if device is not None: + logger.warning("Parameter `device` would be ignored when you are using `paddle.distributed.launch` to pull " + "up your script. 
And we will directly get the local device via " + "`f'gpu:{os.environ['FLAGS_selected_gpus']}')`.") + device = [int(g) for g in os.environ["FLAGS_selected_gpus"].split(",")] + return PaddleFleetDriver(model, f"gpu:{os.environ['PADDLE_RANK_IN_NODE']}", True, **kwargs) + + if driver not in {"paddle", "fleet"}: + raise ValueError("Parameter `driver` can only be one of these values: ['paddle', 'fleet'].") + + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + user_visible_devices = os.getenv("USER_CUDA_VISIBLE_DEVICES") + # 优先级 user > cuda + # 判断单机情况 device 的合法性 + # 分布式情况下通过 world_device 判断 + if user_visible_devices is not None: + _could_use_device_num = len(user_visible_devices.split(",")) + elif cuda_visible_devices is not None: + _could_use_device_num = len(cuda_visible_devices.split(",")) + else: + _could_use_device_num = paddle.device.cuda.device_count() + if isinstance(device, int): + if device < 0 and device != -1: + raise ValueError("Parameter `device` can only be '-1' when it is smaller than 0.") + if device >= _could_use_device_num: + raise ValueError("The gpu device that parameter `device` specifies is not existed.") + device = f"gpu:{device}" + elif isinstance(device, Sequence) and not isinstance(device, str): + device = list(set(device)) + for each in device: + if not isinstance(each, int): + raise ValueError("When parameter `device` is 'Sequence' type, the value in it should be 'int' type.") + elif each < 0: + raise ValueError("When parameter `device` is 'Sequence' type, the value in it should be bigger than 0.") + if len(device) == 1: + # 传入了 [1] 这样的,视为单卡。 + device = device[0] + elif device is not None and not isinstance(device, str): + raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.") + + if driver == "paddle": + if not isinstance(device, List): + return PaddleSingleDriver(model, device, **kwargs) + else: + logger.warning("Notice you are using `paddle` driver but your chosen `device` are multi gpus, we will use" + "`Fleetriver` by default. 
But if you mean using `PaddleFleetDriver`, you should choose parameter" + "`driver` as `PaddleFleetDriver`.") + return PaddleFleetDriver(model, device, **kwargs) + elif driver == "fleet": + if not isinstance(device, List): + if device == "cpu": + raise ValueError("You are using `fleet` driver, but your chosen `device` is 'cpu'.") + logger.warning("Notice you are using `fleet` driver, but your chosen `device` is only one gpu, we will" + "still use `PaddleFleetDriver` for you, but if you mean using `PaddleSingleDriver`, you should " + "choose `paddle` driver.") + return PaddleFleetDriver(model, device, **kwargs) + else: + return PaddleFleetDriver(model, device, **kwargs) diff --git a/fastNLP/core/drivers/paddle_driver/paddle_driver.py b/fastNLP/core/drivers/paddle_driver/paddle_driver.py new file mode 100644 index 00000000..84ce6ec2 --- /dev/null +++ b/fastNLP/core/drivers/paddle_driver/paddle_driver.py @@ -0,0 +1,315 @@ +import os +import random +from typing import Union, Optional, Callable, Dict +from functools import partial + +import numpy as np + +from .utils import _build_fp16_env +from fastNLP.envs.imports import _NEED_IMPORT_PADDLE +from fastNLP.core.drivers.driver import Driver +from fastNLP.core.utils import apply_to_collection, paddle_move_data_to_device +from fastNLP.envs import rank_zero_call +from fastNLP.envs import FASTNLP_SEED_WORKERS +from fastNLP.core.log import logger + +if _NEED_IMPORT_PADDLE: + import paddle + from paddle.io import DataLoader, IterableDataset + from paddle.optimizer import Optimizer + + _reduces = { + 'max': paddle.max, + 'min': paddle.min, + 'mean': paddle.mean, + 'sum': paddle.sum + } + +class PaddleDriver(Driver): + r""" + Paddle框架的Driver,包括实现单卡训练的`PaddleSingleDriver`和分布式训练的`PaddleFleetDriver`。 + """ + def __init__(self, model, fp16: Optional[bool] = False, **kwargs): + if not isinstance(model, paddle.nn.Layer): + raise ValueError(f"Parameter `model` can not be `{type(model)}` in `PaddleDriver`, it should be exactly " + f"`paddle.nn.Layer` type.") + + super(PaddleDriver, self).__init__(model) + self.fp16 = fp16 + + # scaler的参数 + self.auto_cast, _grad_scaler = _build_fp16_env(dummy=not fp16) + self.grad_scaler = _grad_scaler() + + def zero_grad(self, set_to_none: bool = False): + r""" + 实现深度学习中的梯度的置零操作,应当直接通过优化器 optimizers 来将梯度置零; + 注意梯度累积不需要在这里实现,trainer 已经在内部实现了梯度累积; + + :param set_to_none: 用来判断是否需要将梯度直接置为 None;Paddle中这个参数无效。 + """ + # if set_to_none: + # log.warning("Parameter `set_to_none` does nothing in paddle since grad cannot be set directly.") + for optimizer in self.optimizers: + optimizer.clear_grad() + + @staticmethod + def _check_dataloader_legality(dataloader, dataloader_name, is_train: bool = False): + r""" + 该函数会在 trainer 或者 evaluator 设置 dataloader 后检测 dataloader 的合法性。 + 要求传入的 dataloader 必须为 `paddle.io.DataLoader` 或包含该类型的字典。 + + :param dataloader: 需要检测的输入的 `dataloader`; + :param dataloader_name: + :param is_train: + """ + if is_train: + if not isinstance(dataloader, DataLoader): + raise ValueError(f"Parameter `{dataloader_name}` should be 'paddle.io.DataLoader' type, not {type(dataloader)}.") + # TODO 我们先禁止 dataloader 的 dataset 是 IterableDataset 种类; + if isinstance(dataloader.dataset, IterableDataset): + raise TypeError("`IterableDataset` is not allowed.") + else: + if not isinstance(dataloader, Dict): + raise ValueError(f"Parameter `{dataloader_name}` should be 'Dict' type, not {type(dataloader)}.") + else: + for each_dataloader in dataloader.values(): + if not isinstance(each_dataloader, DataLoader): + raise ValueError(f"Each 
dataloader of parameter `{dataloader_name}` should be 'paddle.io.DataLoader' " + f"type, not {type(each_dataloader)}.") + if isinstance(each_dataloader.dataset, IterableDataset): + raise TypeError("`IterableDataset` is not allowed.") + + @staticmethod + def _check_optimizer_legality(optimizers): + r""" + 对于用户传入 trainer 的每一个 optimizer检测其合法性,必须为`paddle.optimizer.Optimizer`类型。 + + :param optimizers: 需要检测的 `optimizers`; + """ + for each_optimizer in optimizers: + if not isinstance(each_optimizer, Optimizer): + raise ValueError(f"Each optimizer of parameter `optimizers` should be 'paddle.optimizer.Optimizer' type, " + f"not {type(each_optimizer)}.") + + def check_evaluator_mode(self, mode: str): + r""" + 因为我们在具体的 driver 的 validate_step 和 test_step 的逻辑是如果模型没有实现本函数,那么就去检测模型是否实现了另一个函数; + 因此如果用户的 evaluator mode 是 validate,但是传入的 model 却没有实现 validate_step 函数,而是实现了 test_step 函数,那么 + 我们应当提醒用户这一行为; + """ + model = self.unwrap_model() + if mode == "validate": + if not hasattr(model, "validate_step"): + if hasattr(model, "test_step"): + logger.warning( + "Your model does not have 'validate_step' method but has 'test_step' method, but you" + "are using 'Evaluator.validate', we are going to use 'test_step' to substitute for" + "'validate_step'.") + + else: + if not hasattr(model, "test_step"): + if hasattr(model, "validate_step"): + logger.warning("Your model does not have 'test_step' method but has 'validate' method, but you" + "are using 'Evaluator.test', we are going to use 'validate_step' to substitute for" + "'test_step'.") + + @staticmethod + def tensor_to_numeric(tensor, reduce=None): + r""" + 将一个 `tensor` 对象(类型为 `paddle.Tensor` )转换为 python 的 `numeric` 对象;如果 tensor 只包含一个 + 元素则返回 float 或 int 。 + + :param tensor: 需要被转换的 `tensor` 对象 + :param reduce: 可选 ['sum', 'max', 'mea', 'min'],如果不为 None 将使用该 reduce 方法来处理当前 tensor 再返回 + float 或 int 对象。 + :return: 转换后返回的结果 + """ + if tensor is None: + return None + + def _translate(_data): + # 如果只含有一个元素,则返回元素本身,而非list + if _data.numel().item() == 1: + return _data.item() + if reduce is None: + return _data.tolist() + else: + return _reduces[reduce](_data).item() + + return apply_to_collection( + data=tensor, + dtype=paddle.Tensor, + function=_translate + ) + + def set_model_mode(self, mode: str): + r""" + 设置模型为 `train` / `eval` 的模式;目的是为切换模型训练和推理(会关闭dropout等)模式; + + :param mode: 应为二者之一:["train", "eval"]; + """ + assert mode in {"train", "eval"} + getattr(self.model, mode)() + + @rank_zero_call + def save_model(self, filepath: str, only_state_dict: bool = True, model_save_fn: Optional[Callable]=None, **kwargs): + r""" + 保存模型的函数;注意函数 `save` 是用来进行断点重训的函数; + 如果 `model_save_fn` 是一个可调用的函数,那么我们会直接运行该函数; + + :param filepath: 保存文件的文件位置(需要包括文件名); + :param only_state_dict: 是否只保存模型的 `state_dict`;注意该参数仅当 `model_save_fn` 为 None 时有效; + :param model_save_fn: 用户传入的用来代替该函数本身保存逻辑的函数;如果该参数不为 None,那么我们会调用 model_save_fn(path); + """ + if model_save_fn is not None: + model_save_fn(filepath) + else: + model = self.unwrap_model() + if only_state_dict: + paddle.save(model.state_dict(), filepath) + else: + input_spec = kwargs.get("input_spec", None) + if input_spec is None: + raise Exception("To save the whole Paddle Layer, parameter 'input_spec' is needed.") + paddle.jit.save(model, filepath, input_spec) + + @staticmethod + @rank_zero_call + def load_model(filepath: str, load_dict: bool = True): + r""" + 加载模型的函数;注意函数 `load` 是用来进行断点重训的函数; + + :param filepath: 需要被加载的对象的文件位置(需要包括文件名); + :param load_dict: 是否加载state_dict,默认为True。当用户在save_model时将only_state_dict设置为False时, + 即保存了整个模型时,这个参数必须也为False + 
:return: 返回加载指定文件后的结果; + """ + if load_dict: + return paddle.load(filepath) + else: + return paddle.jit.load(filepath) + + @rank_zero_call + def save(self, folder, states: Dict): + r""" + 断点重训的保存函数,该函数会负责保存模型和 optimizers 的 state_dict; + 需要注意 driver 应当是无状态的,即不管什么时候调用 driver 的接口函数,其返回的结果应该都是一样的;因此,断点重训不需要保存 driver + 本身自己的任何状态;而每一个 driver 实例需要在该函数中实现保存模型和 optimizers 的 state_dict 的逻辑;同时妥善存储传入的 + states 中的内容(主要用于恢复 Trainer ,Callback 等) + 需要保证该函数只在 global rank 0 上运行 + + :param folder: 保存断点重训的状态的文件名; + :param states: 由 trainer 传入的一个字典,其中已经包含了为了实现断点重训所需要保存的其它对象的状态,Driver 应该只需要保存 + 该对象即可, Driver 应该不需要理解该对象,同时在 driver.load() 的时候,需要将 states 返回回去,load()返回的值与这里的 + 传入的值保持一致。 + """ + # 1. 保存模型的状态; + model = self.unwrap_model() + model_state_dict = {name: param.cpu().detach().clone() for name, param in model.state_dict().items()} + # 对于单卡的 driver 来讲,我们实际上(现在)不应该考虑用户在DDP环境下使用单卡模式,从而造成效率损失; + states["model_state_dict"] = model_state_dict + + # 2. 保存 optimizers 的状态; + optimizers_state_dict = {} + for i in range(len(self.optimizers)): + optimizer: Optimizer = self.optimizers[i] + optimizer_state = optimizer.state_dict() + optimizer_state = {name: param.cpu().detach().clone() for name, param in optimizer_state.items()} + optimizers_state_dict[f"optimizer{i}"] = optimizer_state # 注意这里没有使用 deepcopy,测试是不需要的; + states["optimizers_state_dict"] = optimizers_state_dict + + paddle.save(states, folder) + + def load(self, filepath) -> Dict: + r""" + 断点重训的加载函数,注意该函数会负责读取数据,并且恢复模型和 optimizers 的 state_dict 等; + driver 实例需要在该函数中先加载模型和 optimizers 的 state_dict,然后将一个 state 字典返回给 trainer 。 + 因此 save 函数和 load 函数的接受和返回值应该是对应的; + + 该函数需要在所有 rank 上执行。 + + :param filepath: 保存断点重训的状态的文件名; + :return: 需要返回 save 函数输入的 states 内容; + """ + states = paddle.load(filepath) + + # 1. 加载 optimizers 的状态; + optimizers_state_dict = states["optimizers_state_dict"] + for i in range(len(self.optimizers)): + optimizer: paddle.optimizer.Optimizer = self.optimizers[i] + optimizer.set_state_dict(optimizers_state_dict[f"optimizer{i}"]) + + # 2. 加载模型状态; + model = self.unwrap_model() + model.load_dict(states["model_state_dict"]) + + self.barrier() + return states + + def get_evaluate_context(self): + r""" + 返回一个不计算梯度的环境用来对模型进行评测; + + :return: context 上下文对象 `paddle.no_grad`; + """ + return paddle.no_grad + + @staticmethod + def move_model_to_device(model: 'paddle.nn.Layer', device: Union[str, int, 'paddle.CUDAPlace', 'paddle.CPUPlace']): + r""" + 用来将模型转移到指定的 device 上; + 在 Paddle 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。 + """ + if device is not None: + model.to(device) + + def move_data_to_device(self, batch: 'paddle.Tensor'): + r""" + 将数据迁移到指定的机器上;batch 可能是 list 也可能 dict ,或其嵌套结构。 + 在 Paddle 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。 + + :return: 将移动到指定机器上的 batch 对象返回; + """ + return paddle_move_data_to_device(batch, self.data_device) + + @staticmethod + def worker_init_function(worker_id: int, rank: Optional[int] = None) -> None: # pragma: no cover + """The worker_init_fn that Lightning automatically adds to your dataloader if you previously set set the seed + with ``seed_everything(seed, workers=True)``. + + See also the PyTorch documentation on + `randomness in DataLoaders `_. 
+ """ + # implementation notes: https://github.com/pytorch/pytorch/issues/5059#issuecomment-817392562 + global_rank = rank if rank is not None else rank_zero_call.rank + # TODO gpu + process_seed = paddle.fluid.core.default_cpu_generator().initial_seed() + # back out the base seed so we can use all the bits + base_seed = process_seed - worker_id + ss = np.random.SeedSequence([base_seed, worker_id, global_rank]) + # use 128 bits (4 x 32-bit words) + np.random.seed(ss.generate_state(4)) + # Spawn distinct SeedSequences for the PyTorch PRNG and the stdlib random module + paddle_ss, stdlib_ss = ss.spawn(2) + paddle.seed(paddle_ss.generate_state(1, dtype=np.uint64)[0]) + # use 128 bits expressed as an integer + stdlib_seed = (stdlib_ss.generate_state(2, dtype=np.uint64).astype(object) * [1 << 64, 1]).sum() + random.seed(stdlib_seed) + + def set_deterministic_dataloader(self, dataloader): + r""" + 为了确定性训练要对 dataloader 进行修改,保证在确定随机数种子后,每次重新训练得到的结果是一样的; + 作用是替换 datalaoder 的 `worker_init_fn`。 + """ + if int(os.environ.get(FASTNLP_SEED_WORKERS, 0)) and dataloader.worker_init_fn is None: + dataloader.worker_init_fn = partial(self.worker_init_function, rank=self.global_rank) + + def set_sampler_epoch(self, dataloader: 'DataLoader', cur_epoch_idx): + r""" + 对于分布式的 sampler,dataloader 需要在每一个 epoch 前设置随机数种子,来保证每一个进程上的 shuffle 是一样的; + + :param cur_epoch_idx: 当前是第几个 epoch; + """ + if callable(getattr(dataloader.batch_sampler, "set_epoch", None)): + dataloader.batch_sampler.set_epoch(cur_epoch_idx) diff --git a/fastNLP/core/drivers/paddle_driver/single_device.py b/fastNLP/core/drivers/paddle_driver/single_device.py new file mode 100644 index 00000000..849bf4d1 --- /dev/null +++ b/fastNLP/core/drivers/paddle_driver/single_device.py @@ -0,0 +1,161 @@ +from typing import Optional, Dict, Union + +from .paddle_driver import PaddleDriver +from fastNLP.envs.imports import _NEED_IMPORT_PADDLE +from fastNLP.core.utils import auto_param_call, get_paddle_gpu_str +from fastNLP.core.samplers import ReproducibleBatchSampler, ReproducibleIterator +from fastNLP.core.log import logger + +if _NEED_IMPORT_PADDLE: + import paddle + from paddle.fluid.reader import _DatasetKind + +__all__ = [ + "PaddleSingleDriver", +] + +class PaddleSingleDriver(PaddleDriver): + def __init__(self, model, device: Optional[str], fp16: Optional[bool] = False, **kwargs): + super(PaddleSingleDriver, self).__init__(model, fp16=fp16, **kwargs) + + if device is None: + raise ValueError("Parameter `device` can not be None in `PaddleSingleDriver`.") + + if isinstance(device, int): + self.model_device = get_paddle_gpu_str(device) + else: + self.model_device = device + + self.local_rank = 0 + self.global_rank = 0 + self.world_size = 1 + + if isinstance(model, paddle.DataParallel): + # 注意这里的 unwrap_model 调用的是具体子类的方法; + model = self.unwrap_model() + if hasattr(model, "train_step"): + logger.warning("Notice your model is a `paddle.DataParallel` model. And your model also " + "implements the `train_step` method, which we can not call actually, we will " + " call `forward` function instead of `train_step` and you should note that.") + self._train_step = self.model + self._train_signature_fn = model.forward + + if hasattr(model, "validate_step"): + logger.warning("Notice your model is a `paddle.DataParallel` model. 
And your model also " + "implements the `validate_step` method, which we can not call actually, we " + "will call `forward` function instead of `validate_step` and you should note that.") + self._validate_step = self.model + self._validate_signature_fn = model.forward + + if hasattr(model, "test_step"): + logger.warning("Notice your model is a `paddle.DataParallel` model. And your model also " + "implements the `test_step` method, which we can not call actually, we will " + "call `forward` function instead of `test_step` and you should note that.") + self._test_step = self.model + self._test_signature_fn = model.forward + else: + if hasattr(self.model, "train_step"): + self._train_step = self.model.train_step + self._train_signature_fn = None + else: + self._train_step = self.model + # 输入的模型是 `DataParallel`,我们需要保证其 signature_fn 是正确的; + model = self.unwrap_model() + self._train_signature_fn = model.forward + + if hasattr(self.model, "validate_step"): + self._validate_step = self.model.validate_step + self._validate_signature_fn = None + elif hasattr(self.model, "test_step"): + self._validate_step = self.model.test_step + self._validate_signature_fn = self.model.test_step + else: + self._validate_step = self.model + model = self.unwrap_model() + self._validate_signature_fn = model.forward + + if hasattr(self.model, "test_step"): + self._test_step = self.model.test_step + self._test_signature_fn = None + elif hasattr(self.model, "validate_step"): + self._test_step = self.model.validate_step + self._test_signature_fn = self.model.validate_step + else: + self._test_step = self.model + model = self.unwrap_model() + self._test_signature_fn = model.forward + + def setup(self): + paddle.device.set_device(self.model_device) + self.model.to(self.model_device) + + def train_step(self, batch) -> Dict: + # 如果 batch 是一个 Dict,我们就默认帮其做参数匹配,否则就直接传入到 `train_step` 函数中,让用户自己处理; + if isinstance(batch, Dict): + return auto_param_call(self._train_step, batch, signature_fn=self._train_signature_fn) + else: + return self._train_step(batch) + + def backward(self, loss): + self.grad_scaler.scale(loss).backward() + + def step(self): + for optimizer in self.optimizers: + self.grad_scaler.step(optimizer) + self.grad_scaler.update() + + def validate_step(self, batch) -> Dict: + if isinstance(batch, Dict): + return auto_param_call(self._validate_step, batch, signature_fn=self._validate_signature_fn) + else: + return self._validate_step(batch) + + def test_step(self, batch) -> Dict: + if isinstance(batch, Dict): + return auto_param_call(self._test_step, batch, signature_fn=self._test_signature_fn) + else: + return self._test_step(batch) + + def replace_sampler(self, dataloader, dist_sampler: Union[str, ReproducibleBatchSampler, ReproducibleIterator], reproducible: bool = False): + # 暂时不支持IteratorDataset + assert dataloader.dataset_kind != _DatasetKind.ITER, \ + "FastNLP does not support `IteratorDataset` now." 
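+        # A ReproducibleBatchSampler / ReproducibleIterator passed in is used directly; otherwise,
+        # when `reproducible` is requested and the dataloader does not already carry one, its
+        # batch_sampler is wrapped into a ReproducibleBatchSampler.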
+ if isinstance(dist_sampler, ReproducibleBatchSampler): + dataloader.batch_sampler = dist_sampler + return dataloader + if isinstance(dist_sampler, ReproducibleIterator): + dataloader.batch_sampler.sampler = dist_sampler + return dataloader + + if reproducible: + if isinstance(dataloader.batch_sampler.sampler, ReproducibleIterator): + return dataloader + elif isinstance(dataloader.batch_sampler, ReproducibleBatchSampler): + return dataloader + else: + # TODO + batch_sampler = ReproducibleBatchSampler( + batch_sampler=dataloader.batch_sampler, + batch_size=dataloader.batch_sampler.batch_size, + drop_last=dataloader.drop_last + ) + dataloader.batch_sampler = batch_sampler + return dataloader + else: + return dataloader + + def unwrap_model(self): + if isinstance(self.model, paddle.DataParallel): + return self.model._layers + else: + return self.model + + @property + def data_device(self): + """ + 单卡模式不支持 data_device; + """ + return self.model_device + + def is_distributed(self): + return False diff --git a/fastNLP/core/drivers/paddle_driver/utils.py b/fastNLP/core/drivers/paddle_driver/utils.py new file mode 100644 index 00000000..9b54a30a --- /dev/null +++ b/fastNLP/core/drivers/paddle_driver/utils.py @@ -0,0 +1,351 @@ +import socket +import os +import struct +import random +import inspect +import numpy as np +from contextlib import ExitStack, closing +from enum import IntEnum +from typing import Dict, Optional, Union + +from fastNLP.envs.imports import _NEED_IMPORT_PADDLE +from fastNLP.core.utils import get_paddle_device_id, auto_param_call +from fastNLP.envs.env import FASTNLP_GLOBAL_SEED, FASTNLP_SEED_WORKERS, USER_CUDA_VISIBLE_DEVICES +from fastNLP.core.log import logger + + +if _NEED_IMPORT_PADDLE: + import paddle + from paddle import nn + from paddle.nn import Layer + from paddle.io import DataLoader, BatchSampler + from paddle.amp import auto_cast, GradScaler +else: + from fastNLP.core.utils.dummy_class import DummyClass as Layer + + +__all__ = [ + "paddle_seed_everything", +] + +def _select_seed_randomly(min_seed_value: int = 0, max_seed_value: int = 255) -> int: + return random.randint(min_seed_value, max_seed_value) + +def paddle_seed_everything(seed: Optional[int] = None, workers: bool = False) -> int: + + max_seed_value = np.iinfo(np.uint32).max + min_seed_value = np.iinfo(np.uint32).min + + if seed is None: + env_seed = os.environ.get("GLOBAL_SEED") + if env_seed is None: + seed = _select_seed_randomly(min_seed_value, max_seed_value) + # rank_zero_warn(f"No seed found, seed set to {seed}") + else: + try: + seed = int(env_seed) + except ValueError: + seed = _select_seed_randomly(min_seed_value, max_seed_value) + # rank_zero_warn(f"Invalid seed found: {repr(env_seed)}, seed set to {seed}") + elif not isinstance(seed, int): + seed = int(seed) + + if not (min_seed_value <= seed <= max_seed_value): + logger.warning("Your seed value is two big or two small for numpy, we will choose a random seed for " + "you.") + + # rank_zero_warn(f"{seed} is not in bounds, numpy accepts from {min_seed_value} to {max_seed_value}") + seed = _select_seed_randomly(min_seed_value, max_seed_value) + + # using `log.info` instead of `rank_zero_info`, + # so users can verify the seed is properly set in distributed training. 
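+    # The chosen seed is exported via FASTNLP_GLOBAL_SEED so that sub-processes spawned by the
+    # fleet launcher can re-apply it through `reset_seed()`; python / numpy / paddle are seeded below.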
+ # log.info(f"Global seed set to {seed}") + os.environ[FASTNLP_GLOBAL_SEED] = str(seed) + random.seed(seed) + np.random.seed(seed) + # paddle的seed函数会自行判断是否在gpu环境,如果在的话会设置gpu的种子 + paddle.seed(seed) + os.environ[FASTNLP_SEED_WORKERS] = f"{int(workers)}" + return seed + + +def reset_seed() -> None: + """ + fleet 会开启多个进程,因此当用户在脚本中指定 seed_everything 时,在开启多个脚本后,会在每个脚本内重新 + 进行随机数的设置; + """ + seed = os.environ.get(FASTNLP_GLOBAL_SEED, None) + workers = os.environ.get(FASTNLP_SEED_WORKERS, "0") + if seed is not None: + paddle_seed_everything(int(seed), workers=bool(int(workers))) + +class ForwardState(IntEnum): + TRAIN = 0 + VALIDATE = 1 + TEST = 2 + PREDICT = 3 + +_MODE_PARAMETER = "_forward_state" + +class _FleetWrappingModel(Layer): + """ + 参考_DDPWrappingModel,paddle的分布式训练也需要用paddle.nn.DataParallel进行包装,采用和 + pytorch相似的处理方式 + """ + def __init__(self, model: 'nn.Layer'): + super(_FleetWrappingModel, self).__init__() + self.model = model + + if isinstance(model, paddle.DataParallel): + model = model._layers + if hasattr(model, "train_step"): + logger.warning( + "Notice your model is a `paddle.DataParallel` model. And your " + "model also implements the `train_step` method, which we can not call actually, we will" + " call `forward` function instead of `train_step` and you should note that.") + self._train_step = self.model + self._train_signature_fn = model.forward + + if hasattr(model, "validate_step"): + logger.warning( + "Notice your model is a `paddle.DataParallel` model. And your " + "model also implements the `validate_step` method, which we can not call actually, " + "we will call `forward` function instead of `validate_step` and you should note that.") + self._validate_step = self.model + self._validate_signature_fn = model.forward + + if hasattr(model, "test_step"): + logger.warning( + "Notice your model is a `paddle.DataParallel` model. 
And your " + "model also implements the `test_step` method, which we can not call actually, we will" + " call `forward` function instead of `test_step` and you should note that.") + self._test_step = self.model + self._test_signature_fn = model.forward + else: + if hasattr(model, "train_step"): + self._train_step = model.train_step + self._train_signature_fn = None + else: + self._train_step = model + self._train_signature_fn = model.forward + + if hasattr(model, "validate_step"): + self._validate_step = model.validate_step + self._validate_signature_fn = None + elif hasattr(model, "test_step"): + self._validate_step = model.test_step + self._validate_signature_fn = None + else: + self._validate_step = model + self._validate_signature_fn = model.forward + + if hasattr(model, "test_step"): + self._test_step = model.test_step + self._test_signature_fn = None + elif hasattr(model, "validate_step"): + self._test_step = model.validate_step + self._test_signature_fn = None + else: + self._test_step = model + self._test_signature_fn = model.forward + + def forward(self, batch, **kwargs) -> Dict: + + _forward_state = kwargs.pop(_MODE_PARAMETER) + + if _forward_state == ForwardState.TRAIN: + if isinstance(batch, Dict): + return auto_param_call(self._train_step, batch, signature_fn=self._train_signature_fn) + else: + return self._train_step(batch) + elif _forward_state == ForwardState.VALIDATE: + if isinstance(batch, Dict): + return auto_param_call(self._validate_step, batch, signature_fn=self._validate_signature_fn) + else: + return self._validate_step(batch) + elif _forward_state == ForwardState.TEST: + if isinstance(batch, Dict): + return auto_param_call(self._test_step, batch, signature_fn=self._test_signature_fn) + else: + return self._test_step(batch) + elif _forward_state == ForwardState.PREDICT: + raise NotImplementedError("'PREDICT' mode has not been implemented.") + else: + raise NotImplementedError("You should direct a concrete mode.") + +class DummyGradScaler: + """ + 用于仿造的GradScaler对象,防止重复写大量的if判断 + + """ + def __init__(self, *args, **kwargs): + pass + + def get_scale(self): + return 1.0 + + def is_enabled(self): + return False + + def scale(self, outputs): + return outputs + + def step(self, optimizer, *args, **kwargs): + optimizer.step(*args, **kwargs) + + def update(self, new_scale=None): + pass + + def unscale_(self, optimizer): + pass + + def load_state_dict(self, state_dict): + pass + + def state_dict(self): + return {} + + +def _build_fp16_env(dummy=False): + if dummy: + auto_cast = ExitStack + GradScaler = DummyGradScaler + else: + if not paddle.device.is_compiled_with_cuda(): + raise RuntimeError("No cuda") + if paddle.device.cuda.get_device_capability(0)[0] < 7: + logger.warning( + "NOTE: your device does NOT support faster training with fp16, " + "please switch to FP32 which is likely to be faster" + ) + return auto_cast, GradScaler + +def find_free_ports(num): + def __free_port(): + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, + struct.pack('ii', 1, 0)) + s.bind(('', 0)) + return s.getsockname()[1] + + port_set = set() + step = 0 + while True: + port = __free_port() + if port not in port_set: + port_set.add(port) + + if len(port_set) >= num: + return port_set + + step += 1 + if step > 400: + logger.error( + "can't find avilable port and use the specified static port now!" 
+ ) + return None + + return None + +def get_host_name_ip(): + try: + host_name = socket.gethostname() + host_ip = socket.gethostbyname(host_name) + return host_name, host_ip + except: + return None + +def get_device_from_visible(device: Union[str, int]): + """ + 在有 CUDA_VISIBLE_DEVICES 的情况下,获取对应的设备。 + 如 CUDA_VISIBLE_DEVICES=2,3 ,device=3 ,则返回1。 + :param devices:未转化的设备名 + :return: 转化后的设备id + """ + if device == "cpu": + return device + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + idx = get_paddle_device_id(device) + if cuda_visible_devices is None or cuda_visible_devices == "": + # 这个判断一般不会发生,因为 fastnlp 会为 paddle 强行注入 CUDA_VISIBLE_DEVICES + return idx + else: + # 利用 USER_CUDA_VISIBLDE_DEVICES 获取用户期望的设备 + user_visiblde_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) + if user_visiblde_devices is None or user_visiblde_devices != "": + # 不为空,说明用户设置了 CUDA_VISIBLDE_DEVICES + idx = user_visiblde_devices.split(",")[idx] + else: + idx = str(idx) + + cuda_visible_devices_list = cuda_visible_devices.split(',') + assert idx in cuda_visible_devices_list, "Can't find "\ + "your devices %s in CUDA_VISIBLE_DEVICES[%s]."\ + % (idx, cuda_visible_devices) + res = cuda_visible_devices_list.index(idx) + return res + +def replace_sampler(dataloader: "DataLoader", sampler: "BatchSampler"): + # 拿到实例属性; + instance_attrs = {k: v for k, v in vars(dataloader).items() if not k.startswith('_')} + + # 拿到 dataloader '__init__' 函数的默认函数签名; + init_params = dict(inspect.signature(dataloader.__init__).parameters) + + # 这里为什么要单独弄的原因在于,用户在定制自己的 dataloader 的同时可能为了方便只设定一些参数,而后面直接使用 **kwargs 的方式,这时如果 + # 其在初始化自己的 dataloader 实例的时候加入了一些其它的新的参数(首先这一步是必要的,因为我们只能通过这样加 sampler;另一方面,用户 + # 可能确实通过 **kwargs 加入了一些新的参数),如果假设用户是这样使用的: "super().__init__(**kwargs)",那么我们就只能去 DataLoader + # 中寻找; + has_variadic_kwargs = any(v.kind is v.VAR_KEYWORD for k, v in init_params.items()) + if has_variadic_kwargs: + init_params.update(dict(inspect.signature(DataLoader.__init__).parameters)) + del init_params["self"] + + # 因为我们刚才可能用 DataLoader 的默认参数将用户定制的 dataloader 的参数覆盖掉了,因此需要重新弄一遍; + non_default_params = {name for name, p in init_params.items() if + name in instance_attrs and p.default != instance_attrs[name]} + # add `dataset` as it might have been replaced with `*args` + non_default_params.add("dataset") + + reconstruct_args = {k: v for k, v in instance_attrs.items() if k in non_default_params} + reconstruct_args.update({"batch_sampler": sampler, "shuffle": False, "drop_last": False, "batch_size": 1}) + + required_args = { + p.name + for p in init_params.values() + if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + and p.default is p.empty + and p.name not in reconstruct_args + } + + # 这种错误针对的是 __init__ 中的参数没有用同样名字的 self 挂上; + if required_args: + required_args = sorted(required_args) + dataloader_self_name = dataloader.__class__.__name__ + raise Exception( + f"Trying to inject `DistributedBatchSampler` into the `{dataloader_self_name}` instance. " + "This would fail as some of the `__init__` arguments are not available as instance attributes. " + f"The missing attributes are {required_args}. " + f"HINT: If you wrote the `{dataloader_self_name}` class, define `self.missing_arg_name` or " + "manually add the `DistributedBatchSampler` as: " + f"`{dataloader_self_name}(dataset, sampler=DistributedBatchSampler(dataset))`." 
+ ) + + # 这种错误针对的是传入的 dataloader 不是直接的 DataLoader,而是定制了 DataLoader,但是 __init__ 中没有 **kwargs; + if not has_variadic_kwargs: + + # the dataloader signature does not allow keyword arguments that need to be passed + missing_kwargs = reconstruct_args.keys() - init_params.keys() + if missing_kwargs: + missing_kwargs = sorted(missing_kwargs) + dataloader_self_name = dataloader.__class__.__name__ + raise Exception( + f"Trying to inject `DistributedBatchSampler` into the `{dataloader_self_name}` instance. " + "This would fail as it doesn't expose all its attributes in the `__init__` signature. " + f"The missing arguments are {missing_kwargs}. " + f"HINT: If you wrote the `{dataloader_self_name}` class, add the `__init__` arguments or " + "manually add the `DistributedBatchSampler` as: " + f"`{dataloader_self_name}(dataset, sampler=DistributedBatchSampler(dataset))`." + ) + + return type(dataloader)(**reconstruct_args)