Browse Source

PaddleFleetDriver的文档

tags/v1.0.0alpha
x54-729 3 years ago
parent
commit
6cbb5ceec4
9 changed files with 156 additions and 182 deletions
  1. +1
    -1
      fastNLP/core/controllers/trainer.py
  2. +128
    -153
      fastNLP/core/drivers/paddle_driver/fleet.py
  3. +7
    -8
      fastNLP/core/drivers/paddle_driver/paddle_driver.py
  4. +1
    -1
      fastNLP/core/drivers/paddle_driver/single_device.py
  5. +2
    -2
      fastNLP/core/utils/jittor_utils.py
  6. +4
    -4
      fastNLP/core/utils/paddle_utils.py
  7. +2
    -2
      fastNLP/core/utils/rich_progress.py
  8. +8
    -8
      fastNLP/core/utils/utils.py
  9. +3
    -3
      fastNLP/modules/mix_modules/utils.py

+ 1
- 1
fastNLP/core/controllers/trainer.py View File

@@ -264,7 +264,7 @@ class Trainer(TrainerEventTrigger):

* fleet_kwargs -- 用于在使用 ``PaddleFleetDriver`` 时指定 ``DataParallel`` 和 ``fleet`` 初始化时的参数,包括:

* is_collective -- 是否使用 paddle 集群式的分布式训练方法,目前仅支持为 True 的情况;
* is_collective -- 是否使用 paddle 集群式的分布式训练方法,目前仅支持为 ``True`` 的情况;
* role_maker -- 初始化 ``fleet`` 分布式训练 API 时使用的 ``RoleMaker``
* 其它用于初始化 ``DataParallel`` 的参数;
* *data_device* -- 一个具体的 driver 实例中,有 ``model_device`` 和 ``data_device``,前者表示模型所在的设备,后者表示


+ 128
- 153
fastNLP/core/drivers/paddle_driver/fleet.py View File

@@ -1,3 +1,69 @@
r"""
用于实现 **PaddlePaddle** 框架下使用 ``fleet`` 分布式训练 API 进行集群式(*collective*)多卡训练的 Driver。

.. note::

在 **PaddlePaddle** 框架中,使用分布式训练的方式可以参见 **PaddlePaddle** 的
`官方文档 <https://www.paddlepaddle.org.cn/documentation/docs/zh/guides/06_distributed_training/cluster_quick_start_cn.html>`_ 。
简言之,分布式训练的过程可以概括为:导入 ``fleet`` 包 -> 使用 :func:`fleet.init` 初始化分布式环境 -> 初始化模型,转换为并行模型开始训练。

**fastNLP** 支持三种启动分布式训练的方式(假设执行训练的文件名为 ``train.py``):

A. 用户自己不进行分布式的任何操作,直接使用我们的 :class:`~fastNLP.core.Trainer` 进行训练,此时将参数 ``device``
设置为一个列表,然后使用 ``python train.py`` 的方式开始训练;
B. 用户自己不进行分布式的任何操作,但是使用 ``python -m paddle.distributed.launch train.py`` 开始训练;
C. 用户自己在外面初始化分布式环境,并且通过 ``python -m paddle.distributed.launch train.py`` 开始训练;

.. note::

在后两种启动方式中,您需要通过参数 ``--gpus`` 来指定训练使用的设备,在 ``trainer`` 中设置的参数是无效的。

不过在使用该 Driver 之前,我们需要向您说明 **fastNLP** 实现 ``PaddleFleetDriver`` 的思路,以便于您理解代码编写过程中可能出现的问题。

在 **fastNLP** 中,为了尽可能减少单卡向分布式训练转换过程中的代码变动,我们需要在 ``PaddleFleetDriver`` 中进行 **分布式环境初始化**
和 **将模型转换为并行模式** 等操作,同时实现多卡训练的方法是从主进程(``rank=0``)中创建其它的所有子进程(``rank=1,2,...``)。
在这个过程中,我们发现由于 **PaddlePaddle** 框架的特性,会出现下面的问题:

1. **fastNLP** 中,初始化模型一定会在初始化 ``Driver`` 之前,因此调用 :func:`fleet.init` 的时机会在初始化模型之后;
此时子进程中模型将无法正常地初始化,提示无法找到设备 ``gpu:0``;
2. 在训练的过程中,会出现训练一个 ``batch`` 后程序卡住或程序会占用所有可见显卡的情况;

考虑到这些问题,我们为 **PaddlePaddle** 的分布式训练制定了这样的约束:在导入 **fastNLP** 之前,必须设置环境变量 ``FASTNLP_BACKEND``
为 ``paddle``。执行方法有两种::

>>> import os
>>> os.environ["FASTNLP_BACKEND"] = "paddle" # 设置环境变量
>>> import fastNLP # 设置之后才可以导入 fastNLP

或是在执行脚本(假设文件名为 ``train.py`` )时设置::

FASTNLP_BACKEND=paddle python train.py
FASTNLP_BACKEND=paddle python -m paddle.distributed.lauch train.py

设置 ``FASTNLP_BACKEND=paddle`` 后,**fastNLP** 会在 ``import paddle`` 之前通过 ``CUDA_VISIBLE_DEVICES`` 将设备限制在所有可见设备的第
**0** 张卡上,以此绕开通信和同步上的种种限制。我们会将用户希望可见的设备(如用户自己设置了 ``CUDA_VISIBLE_DEVICES`` 的情况)保存在另一个环境变量
``USER_CUDA_VISIBLE_DEVICES`` 中来确保 **fastNLP** 能够知道用户的设置。假设用户希望在 ``[0,2,3]`` 三张显卡上进行分布式训练,那么在三个训练进程中,
``CUDA_VISIBLE_DEVICES`` 就分别为 0、2 和 3 。

.. note::

我们会事先将设备限制在所有可见设备的第 **0** 张卡上,因此多卡训练的参数 ``device`` 一定要以 **0** 开始,否则会无法正常地启动。
如果您希望调整使用的第一张显卡,请使用 ``CUDA_VISIBLE_DEVICES`` 进行限制。

.. note::

根据 **PaddlePaddle** 的说明,设置 ``CUDA_VISIBLE_DEVICES`` 之后启动分布式训练时,情况A与情况BC设置设备的方式会有所不同。
情况A应设置为实际设备相对可见设备的索引,而情况BC应设置为实际的设备号:

1. 情况A中, ``CUDA_VISIBLE_DEVICES=3,4,5,6`` 且参数 ``device=[0,2,3]`` 代表使用 **3号、5号和6号** 显卡;
2. 情况BC中,``CUDA_VISIBLE_DEVICES=3,4,5,6`` 且参数 ``--gpu=3,5,6`` 代表使用 **3号、5号和6号** 显卡;

.. note::

多机的启动强制要求用户在每一台机器上使用 ``python -m paddle.distributed.launch`` 启动;因此我们不会在 ``PaddleFleetDriver``
中保存任何当前有多少台机器的信息;

"""
import os
from typing import List, Union, Optional, Dict, Tuple, Callable

@@ -53,6 +119,33 @@ __all__ = [
]

class PaddleFleetDriver(PaddleDriver):
"""
:param model: 训练使用的模型;

* 如果不想自己初始化分布式环境,类型应为 :class:`paddle.nn.Layer`;
* 如果已经在外面初始化了分布式环境,类型应为 :class:`paddle.DataParallel`;

:param parallel_device: 多卡训练时使用的设备,必须是一个列表。
当使用 ``python -m paddle.distributed.launch`` 启动时,该参数无效;
:param is_pull_by_paddle_run: 标记当前进程是否为通过 ``python -m paddle.distributed.launch`` 启动的。
这个参数仅在 :class:`~fastNLP.core.Trainer` 中初始化 driver 时使用
:param fp16: 是否开启混合精度训练;
:kwargs:
* *paddle_kwargs* -- 用于在指定 ``driver`` 为 'paddle' 时设定具体 driver 实例的一些参数:

* fleet_kwargs -- 用于在使用 ``PaddleFleetDriver`` 时指定 ``DataParallel`` 和 ``fleet`` 初始化时的参数,包括:

* is_collective -- 是否使用 paddle 集群式的分布式训练方法,目前仅支持为 ``True`` 的情况;
* role_maker -- 初始化 ``fleet`` 分布式训练 API 时使用的 ``RoleMaker``
* 其它用于初始化 ``DataParallel`` 的参数;

* wo_auto_param_call (``bool``) -- 是否关闭在训练时调用我们的 ``auto_param_call`` 函数来自动匹配 batch 和前向函数的参数的行为;

.. note::

关于该参数的详细说明,请参见 :class:`~fastNLP.core.controllers.Trainer` 中的描述;函数 ``auto_param_call`` 详见 :func:`fastNLP.core.utils.auto_param_call`。

"""
def __init__(
self,
model,
@@ -61,143 +154,20 @@ class PaddleFleetDriver(PaddleDriver):
fp16: bool = False,
**kwargs
):
r"""
通过使用 PaddlePaddle 的 Fleet 框架启动多卡进程的 Driver。
需要注意的一点是,由于 PaddlePaddle 框架的特性,如果直接使用在 rank0 拉起其它进程的方法的话,如果不加以任何限制,PaddlePaddle会出现
第一次前向传播后卡住或占用所有显卡的现象;为了解决这一问题,我们在引入 FastNLP 时,会使用 `CUDA_VISIBLE_DEVICES` 将设备限制在卡0上,
而用户如果使用了这一环境变量,我们会将其储存在 `USER_CUDA_VISIBLE_DEVICES` 中,并且通过一定的手段实现了转换(详细的设置请参见:
`fastNLP/envs/set_backend.py`)。在拉起其它进程的时候,我们会如法炮制,将环境限制在对应的设备上。

`PaddleFleetDriver` 目前支持的三种启动方式:
1. 用户自己不进行分布式的任何操作,直接使用我们的 Trainer,这时是由我们自己使用 `FleetLauncher` 拉起多个进程,
然后 `PaddleFleetDriver` 自己通过调用 `fleet.init` 来初始化 ddp 的通信组;(情况 A)
2. 用户同样不在 Trainer 之外初始化分布式训练,但是用户自己使用 python -m paddle.distributed.launch 拉起来创建多个进程,这时我们仍旧
会通过调用 `fleet.init` 来初始化 ddp 的通信组;(情况 B)
3. 用户自己在外面初始化分布式,并且通过 python -m paddle.distributed.launch 拉起,这时无论是多个进程的拉起和通信组的建立
都由用户自己操作,我们只会在 driver.setup 的时候对 `PaddleFleetDriver` 设置一些必要的属性值;(情况 C)

注意多机的启动强制要求用户在每一台机器上使用 python -m paddle.distributed.launch 启动;因此我们不会在 `PaddleFleetDriver` 中保存
任何当前有多少台机器的信息;

Part 1:三种启动方式的具体分析:
(1)对于用户运行的脚本中,如果 `driver.setup` 只会被调用一次(意味着用户的启动脚本中只初始化了一个 trainer/evaluator)时,
`PaddleFleetDriver` 在初始化以及 `setup` 函数中会做的事情分别如下所示:
-> 情况 A:这种情况下用户传入的 model 在一定是普通的 model(没有经 `DataParallel` 包裹的model),
因为 `Parallel` 的使用一定要求 fleet.init 已经被调用用来建立当前的 ddp 通信组;但是这意味着如果
用户需要使用 2 张以上的显卡,那么其必然需要使用 paddle.distributed.launch 来启动,意味着就不是情况 A 了;
这时我们首先会调用 `FleetLauncher.launch` 函数来拉起多个进程,其中进程的数量等于用户传入给 trainer 的使用的 gpu
的数量(例如 `Trainer` 中的参数是 device=[0, 1, 6, 7],那么我们就会使用第 0、1、6、7 张 gpu 来拉起 4 个进程);
接着我们会调用 `fleet.init` 来初始化各个进程之间的通信组;
这里需要注意拉起的新的进程会从前到后完整地运行一遍用户的启动脚本(例如 main.py),因此也都会运行这两个函数,但是需要注意只有进程 0
才会去真正地运行 `FleetLauncher.launch`;进程 0 运行到 `fleet.init`,paddle 会阻塞进程 0 继续
向前运行,直到其它进程也运行到这里;
最后我们会设置这个进程对应的 device,然后将模型迁移到对应的机器上,再使用 `DataParallel` 将模型包裹;
至此,paddle 分布式的环境配置过程全部完成;

-> 情况 B:注意这种情况我们直接限定了用户是通过 paddle.distributed.launch 拉起,并且没有自己建立分布式的通信组。这时在
`PaddleFleetDriver` 的初始化和 setup 函数的调用过程中,与情况 A 首要的不同就在于用户在 trainer 中输入的参数 device 不再有效,
这时每个进程所使用的 gpu 是我们直接通过 `CUDA_VISIBLE_DEVICE` 来配置的;因此,如果用户想要实现使用特定 gpu
设备的目的,可以通过自己设置环境变量实现(例如 os.environ["CUDA_VISIBLE_DEVICE"] 来实现,我们会通过一定的手段将其保存起来);
剩下的操作和情况 A 类似;

-> 情况 C:注意这种情况我们限定了用户是通过 paddle.distributed.launch 拉起,并且 ddp 的通信组也是由自己建立。这时基本上所有的
与操作相关的操作都应当由用户自己完成,包括迁移模型到对应 gpu 上以及将模型用 `DataParallel` 包裹等。
(2)如果 `driver.setup` 函数在脚本中会被调用两次及以上(意味着用户的启动脚本初始化了两个及以上的 trainer/evaluator)时:
注意这种情况下我们是会保证前后两个 trainer/evaluator 使用的 `PaddleFleetDriver` 以及其初始化方式的一致性,换句话说,如果 trainer1
检测到的启动方式是 '情况 A',那么我们会保证 trainer2 检测到的启动方式同样是 '情况A'(即使这需要一些额外的处理);因此这里我们主要讨论
我们是通过怎样的操作来保证 trainer2/3/... 检测到的启动方式是和 trainer1 一致的;简单来说,我们是通过使用环境变量来标记每一种不同的
启动方式来实现这一点的:
我们会使用 `FASTNLP_DISTRIBUTED_CHECK` 来标记 '情况 A',使用 `fastnlp_torch_launch_not_ddp` 来标记 '情况 B',意味着我们在
使用 '情况 A' 来启动 `PaddleFleetDriver` 时,我们会将 `FASTNLP_DISTRIBUTED_CHECK` 这一字符串注入到环境变量中,而 '情况 B' 时则
会将 `fastnlp_torch_launch_not_ddp` 这一字符串注入到环境变量中。因此在 trainer2 的 `PaddleFleetDriver` 的初始化和 setup 过程中,
如果检测到这些特殊的环境变量,我们就会将启动方式变更为其对应的启动方式,即使其它的参数特征属于另外的启动方式。

Part 2:对应的代码细节:
1. 如何判断当前的各进程之间的通信组已经被建立(fleet 已经被初始化);
parallel_helper._is_parallel_ctx_initialized();
2. 如何判断不同的进程是否是由 `python -m paddle.distributed.launch` 拉起还是由我们的 `FleetLauncher.launch()`
函数拉起;
我们会在用户脚本 `import fastNLP` 的时候检测当前的环境变量中是否有 'PADDLE_RANK_IN_NODE'、'PADDLE_TRAINER_ID'
以及没有 `FASTNLP_DISTRIBUTED_CHECK`,
如果满足条件,则我们会向环境变量中注入特殊的值 'FASTNLP_BACKEND_LAUNCH' 来标记用户是否使用了 `python -m paddle.distributed.launch`
来拉起多个进程;
3. 整体的处理判断流程:
___________________________________
|进入 PaddleFleetDriver 的 __init__ 函数|
———————————————————————————————————
___________________________________________________
| 判断不同的进程是否是由 paddle.distributed.launch 拉起 |
|(或者我们自己的 FleetLauncher 函数拉起) | -------------->
———————————————————————————————————————————————————  |
↓ 是由 paddle.distributed.launch 拉起 | 我们自己的 FleetLauncher 函数拉起多个进程
 _____________________________            | 
←←←←← | 检测用户是否自己初始化了 fleet |              |
↓ —————————————————————————————                  ↓
↓ ↓ 是 ________
↓ ______ | 情况 A |
↓ 否 |情况 C| —————————
↓ ———————
↓ ______
↓ -----------> |情况 B|
  ———————
4. 为了完成全部的建立分布式所需要的操作,三种情况都需要做的事情,以及每件事情的职责归属:

情况 A | 情况 B | 情况 C
________________________________________________________________________________________________________
配置 fleet 所 | FleetLauncher.launch | paddle.distributed.launch| paddle.distributed.launch
需要的环境变量 | | |
————————————————————————————————————————————————————————————————————————————————————————————————————————
开启多个进程 | FleetLauncher.launch | paddle.distributed.launch| paddle.distributed.launch
————————————————————————————————————————————————————————————————————————————————————————————————————————
调用 fleet.init函数 | PaddleFleetDriver.setup | PaddleFleetDriver.setup | 用户自己调用
————————————————————————————————————————————————————————————————————————————————————————————————————————
设置 PaddleFleetDriver | | |
的 world_size 和 | PaddleFleetDriver.setup | PaddleFleetDriver.setup | PaddleFleetDriver.setup
global_rank 属性 | | |
————————————————————————————————————————————————————————————————————————————————————————————————————————

Part 3:其它的处理细节:
1. 环境变量;
fastNLP 的 `PaddleFleetDriver` 运行时所需要的环境变量分为两种,一种是 paddle fleet 运行所需要的环境变量;另一种是 fastNLP 自己
的环境变量。前者的配置情况如上表所示;而后者中的大多数环境变量则是在用户 import fastNLP 时就设置好了;
2. parallel_device, model_device 和 data_device 的关系;
parallel_device 为 `PaddleFleetDriver` 的参数,model_device 和 data_device 都为 driver 的属性;
其中 data_device 仅当情况 C 时由用户自己指定;如果其不为 None,那么在模型 forward 的时候,我们就会将数据迁移到 data_device 上;
model_device 永远都为单独的一个 torch.device;

情况 A | 情况 B | 情况 C
________________________________________________________________________________________________________
parallel_device | 由用户传入trainer的参数 | |
| device 决定,必须是一个list, | 为 CUDA_VISIBLE_DEVICES | 为 CUDA_VISIBLE_DEVICES
| 其中每一个对象都是 int | |
————————————————————————————————————————————————————————————————————————————————————————————————————————
model_device | parallel_device[local_rank] | parallel_device | None
————————————————————————————————————————————————————————————————————————————————————————————————————————
data_device | model_device | model_device | 由用户传入 trainer 的参数
| | | data_device 决定
————————————————————————————————————————————————————————————————————————————————————————————————————————

3. _DDPWrappingModel 的作用;
因为我们即需要调用模型的 `train_step`、`evaluate_step`、`test_step` 方法,又需要通过 `DataParallel` 的forward 函数来帮助
我们同步各个设备上的梯度,因此我们需要先将模型单独包裹一层,然后在 forward 的时候,其先经过 `DataParallel` 的 forward 方法,
然后再经过 `_DDPWrappingModel` 的 forward 方法,我们会在该 forward 函数中进行判断,确定调用的是模型自己的 forward 函数,还是
`train_step`、`evaluate_step`、`test_step` 方法。

4. 当某一个进程出现 exception 后,`PaddleFleetDriver` 的处理;

不管是什么情况,`PaddleFleetDriver` 在 `setup` 函数的最后,都会将所有进程的 pid 主动记录下来,这样当一个进程出现 exception 后,
driver 的 on_exception 函数就会被 trainer 调用,其会调用 os.kill 指令将其它进程 kill 掉;
"""
if USER_CUDA_VISIBLE_DEVICES not in os.environ:
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using FastNLP.")
super(PaddleFleetDriver, self).__init__(model, fp16=fp16, **kwargs)

# 如果不是通过 launch 启动,要求用户必须传入 parallel_device
if not is_pull_by_paddle_run and parallel_device is None:
raise ValueError("Parameter `parallel_device` can not be None when using `PaddleFleetDriver`. This error is caused "
"when your value of parameter `device` is `None` in your `Trainer` instance.")
if not is_pull_by_paddle_run:
if parallel_device is None:
raise ValueError("Parameter `parallel_device` can not be None when using `PaddleFleetDriver`. This error is caused "
"when your value of parameter `device` is `None` in your `Trainer` instance.")
if not isinstance(parallel_device, List):
raise ValueError("Parameter `parallel_device`'s type must be List when using `PaddleFleetDriver`, "
f"not {type(parallel_device)}.")
if get_paddle_device_id(parallel_device[0]) != 0:
raise ValueError("The first device of `parallel_device` must be 'gpu:0' in fastNLP.")
# 如果用户自己初始化了 paddle 的分布式训练那么一定是通过 launch 拉起的
# 这个参数会在 initialize_paddle_drvier 中设置。
@@ -254,10 +224,10 @@ class PaddleFleetDriver(PaddleDriver):

def setup(self):
"""
根据不同的情况进行不同的设置
1、如果是通过 paddle.distributed.launch 方法启动时,则根据已经设置好的环境获取
分布式的属性。
2、否则,调用 FleetLauncher 类启动子进程
初始化分布式训练的环境
1. 如果是通过 ``paddle.distributed.launch`` 方法启动的,则根据已经设置好的环境获取分布式的属性。
2. 否则启动子进程。
"""
if self._has_setup:
return
@@ -267,7 +237,7 @@ class PaddleFleetDriver(PaddleDriver):

if self.outside_fleet:
# 已经初始化了多机环境
self.set_from_fleet_environment()
self._set_from_fleet_environment()
else:
# 用户没有初始化多机环境
# TODO 绕一下
@@ -287,7 +257,7 @@ class PaddleFleetDriver(PaddleDriver):
# parallel_device 是 list,
if not parallel_helper._is_parallel_ctx_initialized():
# 拉起子进程并设置相应的属性
self.init_fleet_and_set()
self._init_fleet_and_set()
# 用户在这个 trainer 前面又初始化了一个 trainer,并且使用的是 PaddleFleetDriver;
else:
# 已经设置过一次,保证参数必须是一样的
@@ -321,7 +291,7 @@ class PaddleFleetDriver(PaddleDriver):
self._pids = self._pids[node_rank*local_world_size: (node_rank+1)*local_world_size]
self._pids = self.tensor_to_numeric(self._pids)

def init_fleet_and_set(self):
def _init_fleet_and_set(self):
"""
使用 FleetLauncher 拉起子进程
"""
@@ -340,7 +310,7 @@ class PaddleFleetDriver(PaddleDriver):
assert self.world_size is not None
assert self.world_size == len(self.parallel_device)

def set_from_fleet_environment(self):
def _set_from_fleet_environment(self):
"""
当用户使用了 `python -m paddle.distributed.launch xxx.py` 启动时,我们需要
根据 paddle 设置的环境变量来获得各种属性
@@ -349,19 +319,11 @@ class PaddleFleetDriver(PaddleDriver):
self.global_rank = paddledist.get_rank()

def barrier(self):
r"""
用于在多进程工作时同步各进程的工作进度,运行快的进程运行到这里会等待运行慢的进程,只有所有进程都运行到此函数时,所有的进程才会继续运行;
仅在多分布式训练场景中有使用。

注意,该函数的行为会受到 FASTNLP_NO_SYNC 的影响。仅当 FASTNLP_NO_SYNC 在 os.environ 中不存在,或小于 1 时才真的执行 barrier 。
"""
if int(os.environ.get(FASTNLP_NO_SYNC, 0)) < 1: # 当 FASTNLP_NO_SYNC 小于 1 时实际执行
paddledist.barrier()

def configure_fleet(self):
"""
将模型用 DataParallel 和自定义的类型包裹起来
"""
# 将模型用 DataParallel 和自定义的类型包裹起来
if not self._has_fleetwrapped and not isinstance(self.model, DataParallel):
self.model = DataParallel(
_FleetWrappingModel(self.model),
@@ -395,10 +357,17 @@ class PaddleFleetDriver(PaddleDriver):

@property
def model_device(self):
"""
:return: 模型所在的设备;
"""
return self._model_device

@property
def data_device(self):
"""
:return: 数据所在的设备;由于 **PaddlePaddle** 可以通过环境变量获取当前进程的设备,因此该属性
和 ``model_device`` 表现相同;
"""
return self.model_device

def model_call(self, batch, fn: Callable, signature_fn: Optional[Callable]) -> Dict:
@@ -522,23 +491,29 @@ class PaddleFleetDriver(PaddleDriver):
else:
raise ValueError("Parameter `dist_sampler` can only be one of three values: ('dist', 'unrepeatdist', None).")

def is_global_zero(self):
def is_global_zero(self) -> bool:
return self.global_rank == 0

def get_model_no_sync_context(self):
return self.model.no_sync

def unwrap_model(self):
def unwrap_model(self) -> "paddle.nn.Layer":
"""
获得 driver 最原始的模型。该函数可以取出被 :class:`paddle.DataParallel` 包裹起来的模型。
"""
_layers = self.model._layers
if isinstance(_layers, _FleetWrappingModel):
return _layers.model
else:
return _layers

def get_local_rank(self) ->int:
def get_local_rank(self) -> int:
return self.local_rank

def is_distributed(self):
def is_distributed(self) -> bool:
"""
判断是否为分布式的 **Driver** ,在 ``PaddleFleetDriver`` 中,返回 ``True``。
"""
return True

@staticmethod


+ 7
- 8
fastNLP/core/drivers/paddle_driver/paddle_driver.py View File

@@ -55,7 +55,7 @@ class PaddleDriver(Driver):
这个类被以下子类继承:

1. :class:`~fastNLP.core.drivers.PaddleSingleDriver`:实现了使用单卡和 ``cpu`` 训练的具体功能;
2. :class:`~fastNLP.core.drivers.PaddleFleetDriver`:实现了使用 ``fleet`` 分布式训练 API 进行分布式训练的具体功能;
2. :class:`~fastNLP.core.drivers.PaddleFleetDriver`:实现了使用 ``fleet`` 分布式训练 API 进行集群式分布式训练的具体功能;

:param model: 训练时使用的 **PaddlePaddle** 模型;
:param fp16: 是否开启混合精度训练;
@@ -174,8 +174,8 @@ class PaddleDriver(Driver):
将模型保存到 ``filepath`` 中。

:param filepath: 保存文件的文件位置(需要包括文件名);
:param only_state_dict: 是否只保存模型的 ``state_dict``;如果为 ``False``,则会调用 ``paddle.jit.save`` 函数
保存整个模型的参数,此时需要传入 ``input_spec`` 参数;
:param only_state_dict: 是否只保存模型的 ``state_dict``;如果为 ``False``,则会调用 ``paddle.jit.save``
函数保存整个模型的参数,此时需要传入 ``input_spec`` 参数;
:kwargs:
* input_spec -- 描述存储模型 ``forward`` 方法的输入;
当 ``only_state_dict`` 为 ``False`` 时必须传入,否则加载时会报错。您可以通过 ``InputSpec`` 或者示例 ``Tensor``
@@ -212,11 +212,10 @@ class PaddleDriver(Driver):
断点重训的保存函数,该函数会负责保存模型和 optimizers, fp16 的 state_dict;以及模型的保存(若 should_save_model 为 True)

:param folder: 保存断点重训的状态的文件夹;save 函数应该在下面新增两(一)个文件 的 FASTNLP_CHECKPOINT_FILENAME 文件与
FASTNLP_MODEL_FILENAME (如果 should_save_model 为 True )。把 model 相关的内容放入到 FASTNLP_MODEL_FILENAME 文件
中,将传入的 states 以及自身产生其它状态一并保存在 FASTNLP_CHECKPOINT_FILENAME 里面。
:param states: 由 trainer 传入的一个字典,其中已经包含了为了实现断点重训所需要保存的其它对象的状态,Driver 应该只需要保存
该对象即可, Driver 应该不需要理解该对象,同时在 driver.load() 的时候,需要将 states 返回回去,load() 返回的值与这里的
传入的值保持一致。
FASTNLP_MODEL_FILENAME (如果 should_save_model 为 True )。把 model 相关的内容放入到 FASTNLP_MODEL_FILENAME 文件中,
将传入的 states 以及自身产生其它状态一并保存在 FASTNLP_CHECKPOINT_FILENAME 里面。
:param states: 由 trainer 传入的一个字典,其中已经包含了为了实现断点重训所需要保存的其它对象的状态,Driver 应该只需要保存该对象即可,
Driver 应该不需要理解该对象,同时在 driver.load() 的时候,需要将 states 返回回去,load() 返回的值与这里的传入的值保持一致。
:param dataloader: 正在使用的 dataloader,需要保存里面的状态使得之后可以从当前迭代的位置恢复。
:param only_state_dict: 是否只保存模型的参数,当 should_save_model 为 False ,该参数无效。
:param should_save_model: 是否应该保存模型,如果为False,Driver 将不负责 model 的保存。


+ 1
- 1
fastNLP/core/drivers/paddle_driver/single_device.py View File

@@ -152,6 +152,6 @@ class PaddleSingleDriver(PaddleDriver):

def is_distributed(self) -> bool:
"""
判断是否为分布式的 **Driver** ,在 ``PaddleSingleDriver`` 中,返回 ``False``
判断是否为分布式的 **Driver** ,在 ``PaddleSingleDriver`` 中,返回 ``False``
"""
return False

+ 2
- 2
fastNLP/core/utils/jittor_utils.py View File

@@ -32,8 +32,8 @@ def is_jittor_dataset(dataset) -> bool:

def jittor_collate_wraps(func, auto_collator: Callable):
"""
对 ``jittor`` 的 ``collate_fn`` 进行 ``wrap`` 封装,。如果数据集为 ``mapping`` 类型,那么采用 ``auto_collator`` ,否则
还是采用 ``jittor`` 的 ``collate_batch``。
对 ``jittor`` 的 ``collate_fn`` 进行 ``wrap`` 封装,。如果数据集为 ``mapping`` 类型,那么采用 ``auto_collator`` ,
否则还是采用 ``jittor`` 的 ``collate_batch``。

:param func:
:param auto_collator:


+ 4
- 4
fastNLP/core/utils/paddle_utils.py View File

@@ -61,8 +61,8 @@ def _convert_data_device(device: Union[str, int]) -> str:

def paddle_to(data: "paddle.Tensor", device: Union[str, int]) -> "paddle.Tensor":
"""
将 ``data`` 迁移到指定的 ``device`` 上。``paddle.Tensor`` 没有类似 ``torch.Tensor`` 的 ``to`` 函数,该函数
只是集成了 :func:`paddle.Tensor.cpu` 和 :func:`paddle.Tensor.cuda` 两个函数。
将 ``data`` 迁移到指定的 ``device`` 上。``paddle.Tensor`` 没有类似 ``torch.Tensor`` 的 ``to`` 函数,
该函数只是集成了 :func:`paddle.Tensor.cpu` 和 :func:`paddle.Tensor.cuda` 两个函数。

:param data: 要迁移的张量;
:param device: 目标设备,可以是 ``str`` 或 ``int`` 类型;
@@ -130,8 +130,8 @@ def paddle_move_data_to_device(batch: Any, device: Optional[Union[str, int]]) ->
将 **paddle** 的数据集合传输到给定设备。只有 :class:`paddle.Tensor` 对象会被传输到设备中,其余保持不变。

:param batch: 需要进行迁移的数据集合;
:param device: 目标设备。可以是显卡设备的编号,或是``cpu``, ``gpu`` 或 ``gpu:x`` 格式的字符串;当这个参数
为 `None`` 时,不会执行任何操作。
:param device: 目标设备。可以是显卡设备的编号,或是``cpu``, ``gpu`` 或 ``gpu:x`` 格式的字符串;
当这个参数为 `None`` 时,不会执行任何操作。
:return: 迁移到新设备上的数据集合;
"""
if device is None:


+ 2
- 2
fastNLP/core/utils/rich_progress.py View File

@@ -1,6 +1,6 @@
"""
该文件用于为 **fastNLP** 提供一个统一的 ``progress bar`` 管理,通过共用一个``Task`` 对象, :class:`~fastNLP.core.Trainer`
的 ``progress bar`` 和 :class:`~fastNLP.core.Evaluator` 中的 ``progress bar`` 才能不冲突
该文件用于为 **fastNLP** 提供一个统一的 ``progress bar`` 管理,通过共用一个``Task`` 对象, :class:`~fastNLP.core.Trainer`
的 ``progress bar`` 和 :class:`~fastNLP.core.Evaluator` 中的 ``progress bar`` 才能不冲突
"""
import sys
from typing import Any, Union, Optional


+ 8
- 8
fastNLP/core/utils/utils.py View File

@@ -60,7 +60,7 @@ def auto_param_call(fn: Callable, *args, signature_fn: Optional[Callable] = None
``value`` 的参数。

1. 该函数用来提供给用户根据字符串匹配从而实现自动调用;
2. 注意 ``mapping`` 默认为 ``None``,如果你希望指定输入和运行函数的参数的对应方式,那么你应当让 ``mapping`` 为一个字典传入进来;
2. 注意 ``mapping`` 默认为 ``None``,如果您希望指定输入和运行函数的参数的对应方式,那么您应当让 ``mapping`` 为一个字典传入进来;
如果 ``mapping`` 不为 ``None``,那么我们一定会先使用 ``mapping`` 将输入的字典的 ``keys`` 修改过来,因此请务必亲自检查 ``mapping`` 的正确性;
3. 如果输入的函数的参数有默认值,那么如果之后的输入中没有该参数对应的值,我们就会使用该参数对应的默认值,否则也会使用之后的输入的值;
4. 如果输入的函数是一个 ``partial`` 函数,情况同第三点,即和默认参数的情况相同;
@@ -82,8 +82,8 @@ def auto_param_call(fn: Callable, *args, signature_fn: Optional[Callable] = None

:param fn: 用来进行实际计算的函数,其参数可以包含有默认值;
:param args: 一系列的位置参数,应当为一系列的字典,我们需要从这些输入中提取 ``fn`` 计算所需要的实际参数;
:param signature_fn: 函数,用来替换 ``fn`` 的函数签名,如果该参数不为 ``None``,那么我们首先会从该函数中提取函数签名,然后通过该函数签名提取
参数值后,再传给 ``fn`` 进行实际的运算;
:param signature_fn: 函数,用来替换 ``fn`` 的函数签名,如果该参数不为 ``None``,那么我们首先会从该函数中提取函数签名,
然后通过该函数签名提取参数值后,再传给 ``fn`` 进行实际的运算;
:param mapping: 一个字典,用来更改其前面的字典的键值;

:return: 返回 ``fn`` 运行的结果;
@@ -195,8 +195,8 @@ def _get_fun_msg(fn, with_fp=True)->str:

def _check_valid_parameters_number(fn, expected_params:List[str], fn_name=None):
"""
检查一个函数是否需要 expected_params 参数(检测数量是否匹配)。除掉 self (如果是method),给定默认值的参数等。如果匹配不上,就会
进行报错。
检查一个函数是否需要 expected_params 参数(检测数量是否匹配)。除掉 self (如果是method),给定默认值的参数等。
如果匹配不上,就会进行报错。

:param fn: 需要检测的函数,可以是 method 或者 function 。
:param expected_params: 期待应该支持的参数。
@@ -345,8 +345,8 @@ def apply_to_collection(
:param dtype: 数据的类型,函数 ``function`` 只会被应用于 ``data`` 中类型为 ``dtype`` 的数据;
:param function: 对数据进行处理的函数;
:param args: ``function`` 所需要的其它参数;
:param wrong_dtype: ``function`` 一定不会生效的数据类型。如果数据既是 ``wrong_dtype`` 类型又是 ``dtype`` 类型
那么也不会生效;
:param wrong_dtype: ``function`` 一定不会生效的数据类型。
如果数据既是 ``wrong_dtype`` 类型又是 ``dtype`` 类型那么也不会生效;
:param include_none: 是否包含执行结果为 ``None`` 的数据,默认为 ``True``;
:param kwargs: ``function`` 所需要的其它参数;
:return: 经过 ``function`` 处理后的数据集合;
@@ -587,7 +587,7 @@ def seq_len_to_mask(seq_len, max_len: Optional[int]):
:param seq_len: 大小为 ``(B,)`` 的长度序列;
:param int max_len: 将长度补齐或截断到 ``max_len``。默认情况(为 ``None``)使用的是 ``seq_len`` 中最长的长度;
但在 :class:`torch.nn.DataParallel` 等分布式的场景下可能不同卡的 ``seq_len`` 会有区别,所以需要传入
一个 ``max_len`` 使得 ``mask`` 的补齐或截断到该长度。
``max_len`` 使得 ``mask`` 的补齐或截断到该长度。
:return: 大小为 ``(B, max_len)`` 的 ``mask``, 元素类型为 ``bool`` 或 ``uint8``
"""
if isinstance(seq_len, np.ndarray):


+ 3
- 3
fastNLP/modules/mix_modules/utils.py View File

@@ -202,12 +202,12 @@ def jittor2torch(batch: Any, device: str = None, no_gradient: bool = None) -> An

.. note::

注意,由于 **pytorch** 和 **jittor** 之间的差异,从 :class:`jittor.Var` 转换
:class:`torch.Tensor` 的过程中无法保留原张量的梯度。
注意,由于 **pytorch** 和 **jittor** 之间的差异,从 :class:`jittor.Var` 转换
:class:`torch.Tensor` 的过程中无法保留原张量的梯度。

:param batch: 包含 :class:`jittor.Var` 类型的数据集合;
:param device: 是否将转换后的张量迁移到特定设备上。为 ``None``时,和输入保持一致;
:param no_gradient: 是否保留原张量的梯度,在这个函数中该参数无效
:param no_gradient: 是否保留原张量的梯度,在这个函数中该参数无效;
:return: 转换后的数据;
"""



Loading…
Cancel
Save