Browse Source

完善paddle_driver的部分文档

tags/v1.0.0alpha
x54-729 3 years ago
parent
commit
84d862c131
7 changed files with 121 additions and 78 deletions
  1. +4
    -10
      fastNLP/core/drivers/jittor_driver/jittor_driver.py
  2. +4
    -0
      fastNLP/core/drivers/jittor_driver/mpi.py
  3. +1
    -1
      fastNLP/core/drivers/jittor_driver/utils.py
  4. +2
    -0
      fastNLP/core/drivers/paddle_driver/dist_utils.py
  5. +69
    -48
      fastNLP/core/drivers/paddle_driver/paddle_driver.py
  6. +24
    -10
      fastNLP/core/drivers/paddle_driver/single_device.py
  7. +17
    -9
      fastNLP/core/drivers/paddle_driver/utils.py

+ 4
- 10
fastNLP/core/drivers/jittor_driver/jittor_driver.py View File

@@ -21,6 +21,9 @@ if _NEED_IMPORT_JITTOR:
'sum': jt.sum
}

__all__ = [
"JittorDriver",
]

class JittorDriver(Driver):
r"""
@@ -90,9 +93,6 @@ class JittorDriver(Driver):
"'test_step'.")

def save_model(self, filepath: str, only_state_dict: bool = False, model_save_fn: Optional[Callable]=None):
"""
保存模型
"""
if model_save_fn is not None:
outputs = model_save_fn(filepath)
if outputs is not None:
@@ -105,12 +105,6 @@ class JittorDriver(Driver):
jt.save(states, filepath)

def load_model(self, filepath: str):
"""
加载模型的加载函数;

:param file_path: 保存文件的文件位置(需要包括文件名);
:return: 加载后的state_dict
"""
if not os.path.exists(filepath):
raise FileNotFoundError("Checkpoint at {} not found.".format(filepath))
return jt.load(filepath)
@@ -156,7 +150,7 @@ class JittorDriver(Driver):

def move_data_to_device(self, batch: 'jt.Var'):
"""
jittor暂时没有提供数据迁移的函数,因此这个函数只是简单地返回batch
**jittor** 暂时没有提供数据迁移的函数,因此这个函数只是简单地返回 **batch**
"""
return batch



+ 4
- 0
fastNLP/core/drivers/jittor_driver/mpi.py View File

@@ -20,6 +20,10 @@ class JittorMPIDriver(JittorDriver):

这是一个正在开发中的功能,敬请期待。

.. todo:

实现断点重训中替换 dataloader 的 set_dist_repro_dataloader 函数

"""
def __init__(
self,


+ 1
- 1
fastNLP/core/drivers/jittor_driver/utils.py View File

@@ -9,7 +9,7 @@ __all__ = []

class DummyGradScaler:
"""
用于仿造的GradScaler对象,防止重复写大量的if判断
用于仿造的 **GradScaler** 对象,防止重复写大量的if判断
"""
def __init__(self, *args, **kwargs):
pass


+ 2
- 0
fastNLP/core/drivers/paddle_driver/dist_utils.py View File

@@ -21,6 +21,8 @@ if _NEED_IMPORT_PADDLE:
_parse_load_result,
)

__all__ = []

def _validate_output_list_for_rank(my_rank, dst, gather_list):
if dst == my_rank:
if not gather_list:


+ 69
- 48
fastNLP/core/drivers/paddle_driver/paddle_driver.py View File

@@ -1,14 +1,12 @@
import os
import random
from typing import Union, Optional, Dict
from typing import Union, Optional, Dict, Any
from pathlib import Path
from functools import partial
from dataclasses import dataclass

import numpy as np

from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES

from .utils import _build_fp16_env, optimizer_state_to_device, DummyGradScaler
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.core.drivers.driver import Driver
@@ -50,9 +48,26 @@ if _NEED_IMPORT_PADDLE:

class PaddleDriver(Driver):
r"""
Paddle框架的Driver,包括实现单卡训练的 `PaddleSingleDriver` 和分布式训练的 `PaddleFleetDriver`。
实现了 **PaddlePaddle** 框架训练功能的基本 Driver,实现了单卡和多卡情景下均需要实现的功能,以和 **fastNLP** 的
:class:`~fastNLP.core.Trainer` 兼容;通过这个 Driver,可以在 **fastNLP** 中实现从 **Pytorch** 框架到
**PaddlePaddle** 深度学习框架的切换。

这个类被以下子类继承:

1. :class:`~fastNLP.core.drivers.PaddleSingleDriver`:实现了使用单卡和 ``cpu`` 训练的具体功能;
2. :class:`~fastNLP.core.drivers.PaddleFleetDriver`:实现了使用 ``fleet`` 分布式训练 API 进行分布式训练的具体功能;

:param model: 训练时使用的 **PaddlePaddle** 模型;
:param fp16: 是否开启混合精度训练;
:kwargs:
* wo_auto_param_call (``bool``) -- 是否关闭在训练时调用我们的 ``auto_param_call`` 函数来自动匹配 batch 和前向函数的参数的行为;

.. note::

关于该参数的详细说明,请参见 :class:`~fastNLP.core.controllers.Trainer` 中的描述;函数 ``auto_param_call`` 详见 :func:`fastNLP.core.utils.auto_param_call`。

"""
def __init__(self, model, fp16: Optional[bool] = False, **kwargs):
def __init__(self, model: "paddle.nn.Layer", fp16: Optional[bool] = False, **kwargs):
if not isinstance(model, paddle.nn.Layer):
raise ValueError(f"Parameter `model` can not be `{type(model)}` in `PaddleDriver`, it should be exactly "
f"`paddle.nn.Layer` type.")
@@ -69,10 +84,10 @@ class PaddleDriver(Driver):

def zero_grad(self, set_to_none: bool = False):
r"""
实现深度学习中的梯度的置零操作,应当直接通过优化器 optimizers 来将梯度置零;
注意梯度累积不需要在这里实现,trainer 已经在内部实现了梯度累积;
实现深度学习中的梯度的置零操作,应当直接通过优化器 ``optimizers`` 来将梯度置零;
注意梯度累积不需要在这里实现,:class:`~fastNLP.core.Trainer` 已经在内部实现了梯度累积;

:param set_to_none: 用来判断是否需要将梯度直接置为 None;Paddle中这个参数无效。
:param set_to_none: 用来判断是否需要将梯度直接置为 ``None``在 **PaddlePaddle** 中这个参数无效。
"""
for optimizer in self.optimizers:
optimizer.clear_grad()
@@ -87,14 +102,6 @@ class PaddleDriver(Driver):

@staticmethod
def check_dataloader_legality(dataloader, dataloader_name, is_train: bool = False):
r"""
该函数会在 trainer 或者 evaluator 设置 dataloader 后检测 dataloader 的合法性。
要求传入的 dataloader 必须为 `paddle.io.DataLoader` 或包含该类型的字典。

:param dataloader: 需要检测的输入的 `dataloader`;
:param dataloader_name:
:param is_train:
"""
if is_train:
if not isinstance(dataloader, DataLoader):
raise ValueError(f"Parameter `{dataloader_name}` should be 'paddle.io.DataLoader' type, not {type(dataloader)}.")
@@ -164,16 +171,15 @@ class PaddleDriver(Driver):
@rank_zero_call
def save_model(self, filepath: str, only_state_dict: bool = True, **kwargs):
r"""
保存模型的函数;注意函数 `save` 是用来进行断点重训的函数;
将模型保存到 ``filepath`` 中。

:param filepath: 保存文件的文件位置(需要包括文件名);
:param only_state_dict: 是否只保存模型的 `state_dict`;如果为 False,则会调用 `paddle.jit.save` 函数
保存整个模型的参数,此时需要传入 `input_spec` 参数,否则在 load 时会报错。
:param kwargs:
input_spec: 描述存储模型 forward 方法的输入,当 `only_state_dict` 为 False时必须传入,否则加载时会报错。
可以通过 InputSpec 或者示例 Tensor 进行描述。详细的可以参考 paddle 关于`paddle.jit.save`
的文档:
https://www.paddlepaddle.org.cn/documentation/docs/zh/api/paddle/jit/save_cn.html#save
:param only_state_dict: 是否只保存模型的 ``state_dict``;如果为 ``False``,则会调用 ``paddle.jit.save`` 函数
保存整个模型的参数,此时需要传入 ``input_spec`` 参数;
:kwargs:
* input_spec -- 描述存储模型 ``forward`` 方法的输入;
当 ``only_state_dict`` 为 ``False`` 时必须传入,否则加载时会报错。您可以通过 ``InputSpec`` 或者示例 ``Tensor``
进行描述。详细的使用方法可以参考 **PaddlePaddle** `关于 paddle.jit.save 函数的文档 <https://www.paddlepaddle.org.cn/documentation/docs/zh/api/paddle/jit/save_cn.html#save>`_;
"""
model = self.unwrap_model()
if isinstance(filepath, Path):
@@ -189,14 +195,6 @@ class PaddleDriver(Driver):
paddle.jit.save(model, filepath, input_spec)

def load_model(self, filepath: str, only_state_dict: bool = True, **kwargs):
r"""
加载模型的函数;将 filepath 中的模型加载并赋值给当前 model 。

:param filepath: 需要被加载的对象的文件位置(需要包括文件名);
:param load_state_dict: 保存的文件是否只是模型的权重,还是完整的模型。即便是保存的完整的模型,此处也只能使用尝试加载filepath
模型中的权重到自身模型,而不会直接替代当前 Driver 中的模型。
:return: 返回加载指定文件后的结果;
"""
model = self.unwrap_model()
if isinstance(filepath, Path):
filepath = str(filepath)
@@ -210,6 +208,28 @@ class PaddleDriver(Driver):

@rank_zero_call
def save(self, folder: Path, states: Dict, dataloader, only_state_dict: bool = True, should_save_model: bool = True, **kwargs):
r"""
断点重训的保存函数,该函数会负责保存模型和 optimizers, fp16 的 state_dict;以及模型的保存(若 should_save_model 为 True)

:param folder: 保存断点重训的状态的文件夹;save 函数应该在下面新增两(一)个文件 的 FASTNLP_CHECKPOINT_FILENAME 文件与
FASTNLP_MODEL_FILENAME (如果 should_save_model 为 True )。把 model 相关的内容放入到 FASTNLP_MODEL_FILENAME 文件
中,将传入的 states 以及自身产生其它状态一并保存在 FASTNLP_CHECKPOINT_FILENAME 里面。
:param states: 由 trainer 传入的一个字典,其中已经包含了为了实现断点重训所需要保存的其它对象的状态,Driver 应该只需要保存
该对象即可, Driver 应该不需要理解该对象,同时在 driver.load() 的时候,需要将 states 返回回去,load() 返回的值与这里的
传入的值保持一致。
:param dataloader: 正在使用的 dataloader,需要保存里面的状态使得之后可以从当前迭代的位置恢复。
:param only_state_dict: 是否只保存模型的参数,当 should_save_model 为 False ,该参数无效。
:param should_save_model: 是否应该保存模型,如果为False,Driver 将不负责 model 的保存。
:kwargs:
* input_spec -- 描述存储模型 ``forward`` 方法的输入;
当 ``only_state_dict`` 为 ``False`` 时必须传入,否则加载时会报错。您可以通过 ``InputSpec`` 或者示例 ``Tensor``
进行描述。详细的使用方法可以参考 **PaddlePaddle** `关于 paddle.jit.save 函数的文档 <https://www.paddlepaddle.org.cn/documentation/docs/zh/api/paddle/jit/save_cn.html#save>`_;
.. todo:

等 Driver 的文档写完

"""
# 传入的 dataloader 参数是 trainer 的 dataloader 属性,因为 driver 的所有 dataloader 我们是不会去改变它的,而是通过改变
# trainer.dataloader 来改变 dataloader 的状态,从而适配训练或者评测环境;

@@ -352,37 +372,41 @@ class PaddleDriver(Driver):
r"""
返回一个不计算梯度的环境用来对模型进行评测;

:return: context 上下文对象 `paddle.no_grad`;
:return: 上下文对象 ``paddle.no_grad``;
"""
return paddle.no_grad

@staticmethod
def move_model_to_device(model: "paddle.nn.Layer", device: Union[str, int, "paddle.CUDAPlace", "paddle.CPUPlace"]):
r"""
用来将模型转移到指定的 device 上;
在 Paddle 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。
用来将模型 ``model`` 转移到指定的设备上;

.. note::

在 **Paddle** 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。

:param model: 需要进行转移的模型;
:param device: 目标设备;
"""
if device is not None:
model.to(device)

def move_data_to_device(self, batch: "paddle.Tensor"):
def move_data_to_device(self, batch: Any) -> Any:
r"""
将数据迁移到指定的机器上;batch 可能是 list 也可能 dict ,或其嵌套结构。
在 Paddle 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。
将数据集合 ``batch`` 迁移到指定的机器上。
.. note::

在 **Paddle** 中使用可能会引起因与设置的设备不一致而产生的问题,请注意。

:return: 将移动到指定机器上的 batch 对象返回;
:param batch: 包含 :class:`paddle.Tensor` 的数据集合,可以是 **List**、**Dict** 等嵌套类型;
:return: 移动到指定机器后的 `batch``;
"""
device = _convert_data_device(self.data_device)
return paddle_move_data_to_device(batch, device)

@staticmethod
def worker_init_function(worker_id: int, rank: Optional[int] = None) -> None: # pragma: no cover
"""The worker_init_fn that Lightning automatically adds to your dataloader if you previously set set the seed
with ``seed_everything(seed, workers=True)``.

See also the PyTorch documentation on
`randomness in DataLoaders <https://pytorch.org/docs/stable/notes/randomness.html#dataloader>`_.
"""
# implementation notes: https://github.com/pytorch/pytorch/issues/5059#issuecomment-817392562
global_rank = rank if rank is not None else int(os.environ.get(FASTNLP_GLOBAL_RANK, 0))
# TODO gpu
@@ -409,9 +433,6 @@ class PaddleDriver(Driver):

@staticmethod
def get_dataloader_args(dataloader: "DataLoader"):
"""
获取 dataloader 的 shuffle 和 drop_last 属性;
"""

@dataclass
class Res:


+ 24
- 10
fastNLP/core/drivers/paddle_driver/single_device.py View File

@@ -33,9 +33,20 @@ __all__ = [

class PaddleSingleDriver(PaddleDriver):
"""
支持 paddle cpu 或单卡 gpu 训练的 driver
实现了 **PaddlePaddle** 框架下在单卡或 ``cpu`` 环境下训练功能的 **Driver**。

:param model: 训练时使用的 **PaddlePaddle** 模型;
:param device: 训练使用的设备;
:param fp16: 是否开启混合精度训练;
:kwargs:
* wo_auto_param_call (``bool``) -- 是否关闭在训练时调用我们的 ``auto_param_call`` 函数来自动匹配 batch 和前向函数的参数的行为;

.. note::

关于该参数的详细说明,请参见 :class:`~fastNLP.core.controllers.Trainer` 中的描述;函数 ``auto_param_call`` 详见 :func:`fastNLP.core.utils.auto_param_call`。

"""
def __init__(self, model, device: Union[str, int], fp16: Optional[bool] = False, **kwargs):
def __init__(self, model: "paddle.nn.Layer", device: Union[str, int], fp16: Optional[bool] = False, **kwargs):
if isinstance(model, DataParallel):
raise ValueError("`paddle.DataParallel` is not supported in `PaddleSingleDriver`")

@@ -62,7 +73,7 @@ class PaddleSingleDriver(PaddleDriver):

def setup(self):
r"""
该函数用来初始化训练环境,用于设置当前训练的设备,并将模型迁移到对应设备上。
初始化训练环境;设置当前训练的设备,并将模型迁移到对应设备上。
"""
device = _convert_data_device(self.data_device)

@@ -127,17 +138,20 @@ class PaddleSingleDriver(PaddleDriver):
return dataloader

def unwrap_model(self):
if isinstance(self.model, paddle.DataParallel):
return self.model._layers
else:
return self.model
"""
返回训练使用的模型。
"""
return self.model

@property
def data_device(self):
def data_device(self) -> str:
"""
返回数据所在的设备。由于单卡模式不支持 data_device,因此返回的是 model_device
:return: 数据和模型所在的设备;
"""
return self.model_device

def is_distributed(self):
def is_distributed(self) -> bool:
"""
判断是否为分布式的 **Driver** ,在 ``PaddleSingleDriver`` 中,返回 ``False``
"""
return False

+ 17
- 9
fastNLP/core/drivers/paddle_driver/utils.py View File

@@ -31,7 +31,15 @@ __all__ = [
def _select_seed_randomly(min_seed_value: int = 0, max_seed_value: int = 255) -> int:
return random.randint(min_seed_value, max_seed_value)

def paddle_seed_everything(seed: Optional[int] = None, workers: bool = False) -> int:
def paddle_seed_everything(seed: Optional[int], workers: bool = False) -> int:
r"""
为 **paddle**、**numpy**、**python.random** 伪随机数生成器设置种子。

:param seed: 全局随机状态的整数值种子。如果为 ``None``,将从环境变量 ``FASTNLP_GLOBAL_SEED`` 中读取种子或随机选择;
:param workers: 如果为 ``True`` ,则会设置环境变量 ``FASTNLP_SEED_WORKERS`` 。该环境变量会在 :class:`~fastNLP.core.Trainer`
中配置 ``dataloader`` 时用于设置 ``worker_init_fn`` 。如果用户已经为 ``dataloader`` 提供了 ``worker_init_fn`` ,则设置
此参数将没有影响;
"""

max_seed_value = np.iinfo(np.uint32).max
min_seed_value = np.iinfo(np.uint32).min
@@ -70,7 +78,7 @@ def paddle_seed_everything(seed: Optional[int] = None, workers: bool = False) ->

def reset_seed() -> None:
"""
fleet 会开启多个进程,因此当用户在脚本中指定 seed_everything 时,在开启多个脚本后,会在每个脚本内重新
``fleet`` 会开启多个进程,因此当用户在脚本中指定 ``seed_everything`` 时,在开启多个脚本后,会在每个脚本内重新
进行随机数的设置;
"""
seed = os.environ.get(FASTNLP_GLOBAL_SEED, None)
@@ -80,8 +88,8 @@ def reset_seed() -> None:

class _FleetWrappingModel(Layer):
"""
参考 _DDPWrappingModel , paddle 的分布式训练也需要用 paddle.nn.DataParallel 进行包装,采用和
pytorch 相似的处理方式
参考 :class:`fastNLP.core.drivers.torch_driver.utils._DDPWrappingModel` , **PaddlePaddle** 的分布式训练也需要用 :class:`paddle.nn.DataParallel` 进行包装,采用和
**pytorch** 相似的处理方式
"""
def __init__(self, model: 'nn.Layer'):
super(_FleetWrappingModel, self).__init__()
@@ -100,7 +108,7 @@ class _FleetWrappingModel(Layer):

class DummyGradScaler:
"""
用于仿造的GradScaler对象,防止重复写大量的if判断
用于仿造的 **GradScaler** 对象,防止重复写大量的if判断
"""
def __init__(self, *args, **kwargs):
pass
@@ -144,7 +152,7 @@ def _build_fp16_env(dummy=False):

def find_free_ports(num):
"""
在空闲的端口中找到 num 个端口
在空闲的端口中找到 ``num`` 个端口
"""
def __free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
@@ -174,8 +182,8 @@ def find_free_ports(num):

def replace_batch_sampler(dataloader: "DataLoader", batch_sampler: "BatchSampler"):
"""
利用 `batch_sampler` 重新构建一个 DataLoader,起到替换 `batch_sampler` 又不影响原 `dataloader` 的作用。
考虑了用户自己定制了 DataLoader 的情形。
利用 ``batch_sampler`` 重新构建一个 ``DataLoader``,起到替换 ``batch_sampler`` 又不影响原 ``dataloader`` 的作用。
考虑了用户自己定制了 ``DataLoader`` 的情形。
"""
# 拿到非下划线开头的实例属性;
instance_attrs = {k: v for k, v in vars(dataloader).items() if not k.startswith('_')}
@@ -246,7 +254,7 @@ def replace_batch_sampler(dataloader: "DataLoader", batch_sampler: "BatchSampler

def replace_sampler(dataloader, new_sampler):
"""
使用 `new_sampler` 重新构建一个 BatchSampler,并替换到 `dataloader` 中
使用 ``new_sampler`` 重新构建一个 ``BatchSampler``,并替换到 ``dataloader`` 中
"""
new_batch_sampler = deepcopy(dataloader.batch_sampler)
new_batch_sampler.sampler = new_sampler


Loading…
Cancel
Save