From 84d862c13176395fb6f799bf707abeb66d1524f1 Mon Sep 17 00:00:00 2001 From: x54-729 <17307130121@fudan.edu.cn> Date: Wed, 11 May 2022 10:48:37 +0000 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84paddle=5Fdriver=E7=9A=84?= =?UTF-8?q?=E9=83=A8=E5=88=86=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../drivers/jittor_driver/jittor_driver.py | 14 +-- fastNLP/core/drivers/jittor_driver/mpi.py | 4 + fastNLP/core/drivers/jittor_driver/utils.py | 2 +- .../core/drivers/paddle_driver/dist_utils.py | 2 + .../drivers/paddle_driver/paddle_driver.py | 117 +++++++++++------- .../drivers/paddle_driver/single_device.py | 34 +++-- fastNLP/core/drivers/paddle_driver/utils.py | 26 ++-- 7 files changed, 121 insertions(+), 78 deletions(-) diff --git a/fastNLP/core/drivers/jittor_driver/jittor_driver.py b/fastNLP/core/drivers/jittor_driver/jittor_driver.py index 7efff348..25fc4af6 100644 --- a/fastNLP/core/drivers/jittor_driver/jittor_driver.py +++ b/fastNLP/core/drivers/jittor_driver/jittor_driver.py @@ -21,6 +21,9 @@ if _NEED_IMPORT_JITTOR: 'sum': jt.sum } +__all__ = [ + "JittorDriver", +] class JittorDriver(Driver): r""" @@ -90,9 +93,6 @@ class JittorDriver(Driver): "'test_step'.") def save_model(self, filepath: str, only_state_dict: bool = False, model_save_fn: Optional[Callable]=None): - """ - 保存模型 - """ if model_save_fn is not None: outputs = model_save_fn(filepath) if outputs is not None: @@ -105,12 +105,6 @@ class JittorDriver(Driver): jt.save(states, filepath) def load_model(self, filepath: str): - """ - 加载模型的加载函数; - - :param file_path: 保存文件的文件位置(需要包括文件名); - :return: 加载后的state_dict - """ if not os.path.exists(filepath): raise FileNotFoundError("Checkpoint at {} not found.".format(filepath)) return jt.load(filepath) @@ -156,7 +150,7 @@ class JittorDriver(Driver): def move_data_to_device(self, batch: 'jt.Var'): """ - jittor暂时没有提供数据迁移的函数,因此这个函数只是简单地返回batch + **jittor** 暂时没有提供数据迁移的函数,因此这个函数只是简单地返回 **batch** """ return batch diff --git a/fastNLP/core/drivers/jittor_driver/mpi.py b/fastNLP/core/drivers/jittor_driver/mpi.py index bfa49e68..4ade3fd1 100644 --- a/fastNLP/core/drivers/jittor_driver/mpi.py +++ b/fastNLP/core/drivers/jittor_driver/mpi.py @@ -20,6 +20,10 @@ class JittorMPIDriver(JittorDriver): 这是一个正在开发中的功能,敬请期待。 + .. todo: + + 实现断点重训中替换 dataloader 的 set_dist_repro_dataloader 函数 + """ def __init__( self, diff --git a/fastNLP/core/drivers/jittor_driver/utils.py b/fastNLP/core/drivers/jittor_driver/utils.py index 43be9ac3..c6d44cfc 100644 --- a/fastNLP/core/drivers/jittor_driver/utils.py +++ b/fastNLP/core/drivers/jittor_driver/utils.py @@ -9,7 +9,7 @@ __all__ = [] class DummyGradScaler: """ - 用于仿造的GradScaler对象,防止重复写大量的if判断 + 用于仿造的 **GradScaler** 对象,防止重复写大量的if判断 """ def __init__(self, *args, **kwargs): pass diff --git a/fastNLP/core/drivers/paddle_driver/dist_utils.py b/fastNLP/core/drivers/paddle_driver/dist_utils.py index ffa142d3..4dea268d 100644 --- a/fastNLP/core/drivers/paddle_driver/dist_utils.py +++ b/fastNLP/core/drivers/paddle_driver/dist_utils.py @@ -21,6 +21,8 @@ if _NEED_IMPORT_PADDLE: _parse_load_result, ) +__all__ = [] + def _validate_output_list_for_rank(my_rank, dst, gather_list): if dst == my_rank: if not gather_list: diff --git a/fastNLP/core/drivers/paddle_driver/paddle_driver.py b/fastNLP/core/drivers/paddle_driver/paddle_driver.py index 74c7b7a8..a228a90d 100644 --- a/fastNLP/core/drivers/paddle_driver/paddle_driver.py +++ b/fastNLP/core/drivers/paddle_driver/paddle_driver.py @@ -1,14 +1,12 @@ import os import random -from typing import Union, Optional, Dict +from typing import Union, Optional, Dict, Any from pathlib import Path from functools import partial from dataclasses import dataclass import numpy as np -from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES - from .utils import _build_fp16_env, optimizer_state_to_device, DummyGradScaler from fastNLP.envs.imports import _NEED_IMPORT_PADDLE from fastNLP.core.drivers.driver import Driver @@ -50,9 +48,26 @@ if _NEED_IMPORT_PADDLE: class PaddleDriver(Driver): r""" - Paddle框架的Driver,包括实现单卡训练的 `PaddleSingleDriver` 和分布式训练的 `PaddleFleetDriver`。 + 实现了 **PaddlePaddle** 框架训练功能的基本 Driver,实现了单卡和多卡情景下均需要实现的功能,以和 **fastNLP** 的 + :class:`~fastNLP.core.Trainer` 兼容;通过这个 Driver,可以在 **fastNLP** 中实现从 **Pytorch** 框架到 + **PaddlePaddle** 深度学习框架的切换。 + + 这个类被以下子类继承: + + 1. :class:`~fastNLP.core.drivers.PaddleSingleDriver`:实现了使用单卡和 ``cpu`` 训练的具体功能; + 2. :class:`~fastNLP.core.drivers.PaddleFleetDriver`:实现了使用 ``fleet`` 分布式训练 API 进行分布式训练的具体功能; + + :param model: 训练时使用的 **PaddlePaddle** 模型; + :param fp16: 是否开启混合精度训练; + :kwargs: + * wo_auto_param_call (``bool``) -- 是否关闭在训练时调用我们的 ``auto_param_call`` 函数来自动匹配 batch 和前向函数的参数的行为; + + .. note:: + + 关于该参数的详细说明,请参见 :class:`~fastNLP.core.controllers.Trainer` 中的描述;函数 ``auto_param_call`` 详见 :func:`fastNLP.core.utils.auto_param_call`。 + """ - def __init__(self, model, fp16: Optional[bool] = False, **kwargs): + def __init__(self, model: "paddle.nn.Layer", fp16: Optional[bool] = False, **kwargs): if not isinstance(model, paddle.nn.Layer): raise ValueError(f"Parameter `model` can not be `{type(model)}` in `PaddleDriver`, it should be exactly " f"`paddle.nn.Layer` type.") @@ -69,10 +84,10 @@ class PaddleDriver(Driver): def zero_grad(self, set_to_none: bool = False): r""" - 实现深度学习中的梯度的置零操作,应当直接通过优化器 optimizers 来将梯度置零; - 注意梯度累积不需要在这里实现,trainer 已经在内部实现了梯度累积; + 实现深度学习中的梯度的置零操作,应当直接通过优化器 ``optimizers`` 来将梯度置零; + 注意梯度累积不需要在这里实现,:class:`~fastNLP.core.Trainer` 已经在内部实现了梯度累积; - :param set_to_none: 用来判断是否需要将梯度直接置为 None;Paddle中这个参数无效。 + :param set_to_none: 用来判断是否需要将梯度直接置为 ``None``;在 **PaddlePaddle** 中这个参数无效。 """ for optimizer in self.optimizers: optimizer.clear_grad() @@ -87,14 +102,6 @@ class PaddleDriver(Driver): @staticmethod def check_dataloader_legality(dataloader, dataloader_name, is_train: bool = False): - r""" - 该函数会在 trainer 或者 evaluator 设置 dataloader 后检测 dataloader 的合法性。 - 要求传入的 dataloader 必须为 `paddle.io.DataLoader` 或包含该类型的字典。 - - :param dataloader: 需要检测的输入的 `dataloader`; - :param dataloader_name: - :param is_train: - """ if is_train: if not isinstance(dataloader, DataLoader): raise ValueError(f"Parameter `{dataloader_name}` should be 'paddle.io.DataLoader' type, not {type(dataloader)}.") @@ -164,16 +171,15 @@ class PaddleDriver(Driver): @rank_zero_call def save_model(self, filepath: str, only_state_dict: bool = True, **kwargs): r""" - 保存模型的函数;注意函数 `save` 是用来进行断点重训的函数; + 将模型保存到 ``filepath`` 中。 :param filepath: 保存文件的文件位置(需要包括文件名); - :param only_state_dict: 是否只保存模型的 `state_dict`;如果为 False,则会调用 `paddle.jit.save` 函数 - 保存整个模型的参数,此时需要传入 `input_spec` 参数,否则在 load 时会报错。 - :param kwargs: - input_spec: 描述存储模型 forward 方法的输入,当 `only_state_dict` 为 False时必须传入,否则加载时会报错。 - 可以通过 InputSpec 或者示例 Tensor 进行描述。详细的可以参考 paddle 关于`paddle.jit.save` - 的文档: - https://www.paddlepaddle.org.cn/documentation/docs/zh/api/paddle/jit/save_cn.html#save + :param only_state_dict: 是否只保存模型的 ``state_dict``;如果为 ``False``,则会调用 ``paddle.jit.save`` 函数 + 保存整个模型的参数,此时需要传入 ``input_spec`` 参数; + :kwargs: + * input_spec -- 描述存储模型 ``forward`` 方法的输入; + 当 ``only_state_dict`` 为 ``False`` 时必须传入,否则加载时会报错。您可以通过 ``InputSpec`` 或者示例 ``Tensor`` + 进行描述。详细的使用方法可以参考 **PaddlePaddle** `关于 paddle.jit.save 函数的文档 `_; """ model = self.unwrap_model() if isinstance(filepath, Path): @@ -189,14 +195,6 @@ class PaddleDriver(Driver): paddle.jit.save(model, filepath, input_spec) def load_model(self, filepath: str, only_state_dict: bool = True, **kwargs): - r""" - 加载模型的函数;将 filepath 中的模型加载并赋值给当前 model 。 - - :param filepath: 需要被加载的对象的文件位置(需要包括文件名); - :param load_state_dict: 保存的文件是否只是模型的权重,还是完整的模型。即便是保存的完整的模型,此处也只能使用尝试加载filepath - 模型中的权重到自身模型,而不会直接替代当前 Driver 中的模型。 - :return: 返回加载指定文件后的结果; - """ model = self.unwrap_model() if isinstance(filepath, Path): filepath = str(filepath) @@ -210,6 +208,28 @@ class PaddleDriver(Driver): @rank_zero_call def save(self, folder: Path, states: Dict, dataloader, only_state_dict: bool = True, should_save_model: bool = True, **kwargs): + r""" + 断点重训的保存函数,该函数会负责保存模型和 optimizers, fp16 的 state_dict;以及模型的保存(若 should_save_model 为 True) + + :param folder: 保存断点重训的状态的文件夹;save 函数应该在下面新增两(一)个文件 的 FASTNLP_CHECKPOINT_FILENAME 文件与 + FASTNLP_MODEL_FILENAME (如果 should_save_model 为 True )。把 model 相关的内容放入到 FASTNLP_MODEL_FILENAME 文件 + 中,将传入的 states 以及自身产生其它状态一并保存在 FASTNLP_CHECKPOINT_FILENAME 里面。 + :param states: 由 trainer 传入的一个字典,其中已经包含了为了实现断点重训所需要保存的其它对象的状态,Driver 应该只需要保存 + 该对象即可, Driver 应该不需要理解该对象,同时在 driver.load() 的时候,需要将 states 返回回去,load() 返回的值与这里的 + 传入的值保持一致。 + :param dataloader: 正在使用的 dataloader,需要保存里面的状态使得之后可以从当前迭代的位置恢复。 + :param only_state_dict: 是否只保存模型的参数,当 should_save_model 为 False ,该参数无效。 + :param should_save_model: 是否应该保存模型,如果为False,Driver 将不负责 model 的保存。 + :kwargs: + * input_spec -- 描述存储模型 ``forward`` 方法的输入; + 当 ``only_state_dict`` 为 ``False`` 时必须传入,否则加载时会报错。您可以通过 ``InputSpec`` 或者示例 ``Tensor`` + 进行描述。详细的使用方法可以参考 **PaddlePaddle** `关于 paddle.jit.save 函数的文档 `_; + + .. todo: + + 等 Driver 的文档写完 + + """ # 传入的 dataloader 参数是 trainer 的 dataloader 属性,因为 driver 的所有 dataloader 我们是不会去改变它的,而是通过改变 # trainer.dataloader 来改变 dataloader 的状态,从而适配训练或者评测环境; @@ -352,37 +372,41 @@ class PaddleDriver(Driver): r""" 返回一个不计算梯度的环境用来对模型进行评测; - :return: context 上下文对象 `paddle.no_grad`; + :return: 上下文对象 ``paddle.no_grad``; """ return paddle.no_grad @staticmethod def move_model_to_device(model: "paddle.nn.Layer", device: Union[str, int, "paddle.CUDAPlace", "paddle.CPUPlace"]): r""" - 用来将模型转移到指定的 device 上; - 在 Paddle 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。 + 用来将模型 ``model`` 转移到指定的设备上; + + .. note:: + + 在 **Paddle** 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。 + + :param model: 需要进行转移的模型; + :param device: 目标设备; """ if device is not None: model.to(device) - def move_data_to_device(self, batch: "paddle.Tensor"): + def move_data_to_device(self, batch: Any) -> Any: r""" - 将数据迁移到指定的机器上;batch 可能是 list 也可能 dict ,或其嵌套结构。 - 在 Paddle 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。 + 将数据集合 ``batch`` 迁移到指定的机器上。 + + .. note:: + + 在 **Paddle** 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。 - :return: 将移动到指定机器上的 batch 对象返回; + :param batch: 包含 :class:`paddle.Tensor` 的数据集合,可以是 **List**、**Dict** 等嵌套类型; + :return: 移动到指定机器后的 `batch``; """ device = _convert_data_device(self.data_device) return paddle_move_data_to_device(batch, device) @staticmethod def worker_init_function(worker_id: int, rank: Optional[int] = None) -> None: # pragma: no cover - """The worker_init_fn that Lightning automatically adds to your dataloader if you previously set set the seed - with ``seed_everything(seed, workers=True)``. - - See also the PyTorch documentation on - `randomness in DataLoaders `_. - """ # implementation notes: https://github.com/pytorch/pytorch/issues/5059#issuecomment-817392562 global_rank = rank if rank is not None else int(os.environ.get(FASTNLP_GLOBAL_RANK, 0)) # TODO gpu @@ -409,9 +433,6 @@ class PaddleDriver(Driver): @staticmethod def get_dataloader_args(dataloader: "DataLoader"): - """ - 获取 dataloader 的 shuffle 和 drop_last 属性; - """ @dataclass class Res: diff --git a/fastNLP/core/drivers/paddle_driver/single_device.py b/fastNLP/core/drivers/paddle_driver/single_device.py index c0957dbf..a9e92fd3 100644 --- a/fastNLP/core/drivers/paddle_driver/single_device.py +++ b/fastNLP/core/drivers/paddle_driver/single_device.py @@ -33,9 +33,20 @@ __all__ = [ class PaddleSingleDriver(PaddleDriver): """ - 支持 paddle cpu 或单卡 gpu 训练的 driver + 实现了 **PaddlePaddle** 框架下在单卡或 ``cpu`` 环境下训练功能的 **Driver**。 + + :param model: 训练时使用的 **PaddlePaddle** 模型; + :param device: 训练使用的设备; + :param fp16: 是否开启混合精度训练; + :kwargs: + * wo_auto_param_call (``bool``) -- 是否关闭在训练时调用我们的 ``auto_param_call`` 函数来自动匹配 batch 和前向函数的参数的行为; + + .. note:: + + 关于该参数的详细说明,请参见 :class:`~fastNLP.core.controllers.Trainer` 中的描述;函数 ``auto_param_call`` 详见 :func:`fastNLP.core.utils.auto_param_call`。 + """ - def __init__(self, model, device: Union[str, int], fp16: Optional[bool] = False, **kwargs): + def __init__(self, model: "paddle.nn.Layer", device: Union[str, int], fp16: Optional[bool] = False, **kwargs): if isinstance(model, DataParallel): raise ValueError("`paddle.DataParallel` is not supported in `PaddleSingleDriver`") @@ -62,7 +73,7 @@ class PaddleSingleDriver(PaddleDriver): def setup(self): r""" - 该函数用来初始化训练环境,用于设置当前训练的设备,并将模型迁移到对应设备上。 + 初始化训练环境;设置当前训练的设备,并将模型迁移到对应设备上。 """ device = _convert_data_device(self.data_device) @@ -127,17 +138,20 @@ class PaddleSingleDriver(PaddleDriver): return dataloader def unwrap_model(self): - if isinstance(self.model, paddle.DataParallel): - return self.model._layers - else: - return self.model + """ + 返回训练使用的模型。 + """ + return self.model @property - def data_device(self): + def data_device(self) -> str: """ - 返回数据所在的设备。由于单卡模式不支持 data_device,因此返回的是 model_device + :return: 数据和模型所在的设备; """ return self.model_device - def is_distributed(self): + def is_distributed(self) -> bool: + """ + 判断是否为分布式的 **Driver** ,在 ``PaddleSingleDriver`` 中,返回 ``False`` + """ return False diff --git a/fastNLP/core/drivers/paddle_driver/utils.py b/fastNLP/core/drivers/paddle_driver/utils.py index 6362193e..1a324c97 100644 --- a/fastNLP/core/drivers/paddle_driver/utils.py +++ b/fastNLP/core/drivers/paddle_driver/utils.py @@ -31,7 +31,15 @@ __all__ = [ def _select_seed_randomly(min_seed_value: int = 0, max_seed_value: int = 255) -> int: return random.randint(min_seed_value, max_seed_value) -def paddle_seed_everything(seed: Optional[int] = None, workers: bool = False) -> int: +def paddle_seed_everything(seed: Optional[int], workers: bool = False) -> int: + r""" + 为 **paddle**、**numpy**、**python.random** 伪随机数生成器设置种子。 + + :param seed: 全局随机状态的整数值种子。如果为 ``None``,将从环境变量 ``FASTNLP_GLOBAL_SEED`` 中读取种子或随机选择; + :param workers: 如果为 ``True`` ,则会设置环境变量 ``FASTNLP_SEED_WORKERS`` 。该环境变量会在 :class:`~fastNLP.core.Trainer` + 中配置 ``dataloader`` 时用于设置 ``worker_init_fn`` 。如果用户已经为 ``dataloader`` 提供了 ``worker_init_fn`` ,则设置 + 此参数将没有影响; + """ max_seed_value = np.iinfo(np.uint32).max min_seed_value = np.iinfo(np.uint32).min @@ -70,7 +78,7 @@ def paddle_seed_everything(seed: Optional[int] = None, workers: bool = False) -> def reset_seed() -> None: """ - fleet 会开启多个进程,因此当用户在脚本中指定 seed_everything 时,在开启多个脚本后,会在每个脚本内重新 + ``fleet`` 会开启多个进程,因此当用户在脚本中指定 ``seed_everything`` 时,在开启多个脚本后,会在每个脚本内重新 进行随机数的设置; """ seed = os.environ.get(FASTNLP_GLOBAL_SEED, None) @@ -80,8 +88,8 @@ def reset_seed() -> None: class _FleetWrappingModel(Layer): """ - 参考 _DDPWrappingModel , paddle 的分布式训练也需要用 paddle.nn.DataParallel 进行包装,采用和 - pytorch 相似的处理方式 + 参考 :class:`fastNLP.core.drivers.torch_driver.utils._DDPWrappingModel` , **PaddlePaddle** 的分布式训练也需要用 :class:`paddle.nn.DataParallel` 进行包装,采用和 + **pytorch** 相似的处理方式 """ def __init__(self, model: 'nn.Layer'): super(_FleetWrappingModel, self).__init__() @@ -100,7 +108,7 @@ class _FleetWrappingModel(Layer): class DummyGradScaler: """ - 用于仿造的GradScaler对象,防止重复写大量的if判断 + 用于仿造的 **GradScaler** 对象,防止重复写大量的if判断 """ def __init__(self, *args, **kwargs): pass @@ -144,7 +152,7 @@ def _build_fp16_env(dummy=False): def find_free_ports(num): """ - 在空闲的端口中找到 num 个端口 + 在空闲的端口中找到 ``num`` 个端口 """ def __free_port(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: @@ -174,8 +182,8 @@ def find_free_ports(num): def replace_batch_sampler(dataloader: "DataLoader", batch_sampler: "BatchSampler"): """ - 利用 `batch_sampler` 重新构建一个 DataLoader,起到替换 `batch_sampler` 又不影响原 `dataloader` 的作用。 - 考虑了用户自己定制了 DataLoader 的情形。 + 利用 ``batch_sampler`` 重新构建一个 ``DataLoader``,起到替换 ``batch_sampler`` 又不影响原 ``dataloader`` 的作用。 + 考虑了用户自己定制了 ``DataLoader`` 的情形。 """ # 拿到非下划线开头的实例属性; instance_attrs = {k: v for k, v in vars(dataloader).items() if not k.startswith('_')} @@ -246,7 +254,7 @@ def replace_batch_sampler(dataloader: "DataLoader", batch_sampler: "BatchSampler def replace_sampler(dataloader, new_sampler): """ - 使用 `new_sampler` 重新构建一个 BatchSampler,并替换到 `dataloader` 中 + 使用 ``new_sampler`` 重新构建一个 ``BatchSampler``,并替换到 ``dataloader`` 中 """ new_batch_sampler = deepcopy(dataloader.batch_sampler) new_batch_sampler.sampler = new_sampler