@@ -282,32 +282,41 @@ class Trainer(TrainerEventTrigger): | |||||
:kwargs: | :kwargs: | ||||
* *torch_kwargs* -- 用于在指定 ``driver`` 为 'torch' 时设定具体 driver 实例的一些参数: | * *torch_kwargs* -- 用于在指定 ``driver`` 为 'torch' 时设定具体 driver 实例的一些参数: | ||||
* ddp_kwargs -- 用于在使用 ``TorchDDPDriver`` 时指定 ``DistributedDataParallel`` 初始化时的参数;例如传入 | * ddp_kwargs -- 用于在使用 ``TorchDDPDriver`` 时指定 ``DistributedDataParallel`` 初始化时的参数;例如传入 | ||||
{'find_unused_parameters': True} 来解决有参数不参与前向运算导致的报错等; | |||||
{'find_unused_parameters': True} 来解决有参数不参与前向运算导致的报错等; | |||||
* set_grad_to_none -- 是否在训练过程中在每一次 optimizer 更新后将 grad 置为 None; | * set_grad_to_none -- 是否在训练过程中在每一次 optimizer 更新后将 grad 置为 None; | ||||
* torch_non_blocking -- 表示用于 pytorch 的 tensor 的 to 方法的参数 non_blocking; | * torch_non_blocking -- 表示用于 pytorch 的 tensor 的 to 方法的参数 non_blocking; | ||||
* *paddle_kwargs* -- 用于在指定 ``driver`` 为 'paddle' 时设定具体 driver 实例的一些参数: | |||||
* fleet_kwargs -- 用于在使用 ``PaddleFleetDriver`` 时指定 ``DataParallel`` 和 ``fleet`` 初始化时的参数,包括: | |||||
* is_collective -- 是否使用 paddle 集群式的分布式训练方法,目前仅支持为 True 的情况; | |||||
* role_maker -- 初始化 ``fleet`` 分布式训练 API 时使用的 ``RoleMaker`` | |||||
* 其它用于初始化 ``DataParallel`` 的参数; | |||||
* *data_device* -- 一个具体的 driver 实例中,有 ``model_device`` 和 ``data_device``,前者表示模型所在的设备,后者表示 | * *data_device* -- 一个具体的 driver 实例中,有 ``model_device`` 和 ``data_device``,前者表示模型所在的设备,后者表示 | ||||
当 ``model_device`` 为 None 时应当将数据迁移到哪个设备; | |||||
当 ``model_device`` 为 None 时应当将数据迁移到哪个设备; | |||||
.. note:: | |||||
.. note:: | |||||
注意您在绝大部分情况下不会用到该参数! | 注意您在绝大部分情况下不会用到该参数! | ||||
1. 当 driver 实例的 ``model_device`` 不为 None 时,该参数无效; | 1. 当 driver 实例的 ``model_device`` 不为 None 时,该参数无效; | ||||
2. 对于 pytorch,仅当用户自己通过 ``python -m torch.distributed.launch`` 并且自己初始化 ``init_process_group`` 时, | 2. 对于 pytorch,仅当用户自己通过 ``python -m torch.distributed.launch`` 并且自己初始化 ``init_process_group`` 时, | ||||
driver 实例的 ``model_device`` 才会为 None; | driver 实例的 ``model_device`` 才会为 None; | ||||
3. 对于 paddle,该参数无效; | |||||
* *use_dist_sampler* -- 表示是否使用分布式的 ``sampler``。在多卡时,分布式 ``sampler`` 将自动决定每张卡上读取的 sample ,使得一个 epoch | * *use_dist_sampler* -- 表示是否使用分布式的 ``sampler``。在多卡时,分布式 ``sampler`` 将自动决定每张卡上读取的 sample ,使得一个 epoch | ||||
内所有卡的 sample 加起来为一整个数据集的 sample。默认会根据 driver 是否为分布式进行设置。 | |||||
内所有卡的 sample 加起来为一整个数据集的 sample。默认会根据 driver 是否为分布式进行设置。 | |||||
* *evaluate_use_dist_sampler* -- 表示在 ``Evaluator`` 中在使用分布式的时候是否将 dataloader 的 ``sampler`` 替换为分布式的 ``sampler``;默认为 ``True``; | * *evaluate_use_dist_sampler* -- 表示在 ``Evaluator`` 中在使用分布式的时候是否将 dataloader 的 ``sampler`` 替换为分布式的 ``sampler``;默认为 ``True``; | ||||
* *output_from_new_proc* -- 应当为一个字符串,表示在多进程的 driver 中其它进程的输出流应当被做如何处理;其值应当为以下之一: | * *output_from_new_proc* -- 应当为一个字符串,表示在多进程的 driver 中其它进程的输出流应当被做如何处理;其值应当为以下之一: | ||||
["all", "ignore", "only_error"];当该参数的值不是以上值时,该值应当表示一个文件夹的名字,我们会将其他 rank 的输出流重定向到 | |||||
log 文件中,然后将 log 文件保存在通过该参数值设定的文件夹中;默认为 "only_error"; | |||||
["all", "ignore", "only_error"];当该参数的值不是以上值时,该值应当表示一个文件夹的名字,我们会将其他 rank 的输出流重定向到 | |||||
log 文件中,然后将 log 文件保存在通过该参数值设定的文件夹中;默认为 "only_error"; | |||||
注意该参数仅当使用分布式的 ``driver`` 时才有效,例如 ``TorchDDPDriver``; | |||||
注意该参数仅当使用分布式的 ``driver`` 时才有效,例如 ``TorchDDPDriver``; | |||||
* *progress_bar* -- 以哪种方式显示 progress ,目前支持[None, 'raw', 'rich', 'auto'] 或者 RichCallback, RawTextCallback对象, | * *progress_bar* -- 以哪种方式显示 progress ,目前支持[None, 'raw', 'rich', 'auto'] 或者 RichCallback, RawTextCallback对象, | ||||
默认为 auto , auto 表示如果检测到当前 terminal 为交互型则使用 RichCallback,否则使用 RawTextCallback对象。如果 | |||||
需要定制 progress bar 的参数,例如打印频率等,可以传入 RichCallback, RawTextCallback 对象。 | |||||
默认为 auto , auto 表示如果检测到当前 terminal 为交互型则使用 RichCallback,否则使用 RawTextCallback对象。如果 | |||||
需要定制 progress bar 的参数,例如打印频率等,可以传入 RichCallback, RawTextCallback 对象。 | |||||
* *train_input_mapping* -- 与 input_mapping 一致,但是只用于 ``Trainer`` 中。与 input_mapping 互斥。 | * *train_input_mapping* -- 与 input_mapping 一致,但是只用于 ``Trainer`` 中。与 input_mapping 互斥。 | ||||
* *train_output_mapping* -- 与 output_mapping 一致,但是只用于 ``Trainer`` 中。与 output_mapping 互斥。 | * *train_output_mapping* -- 与 output_mapping 一致,但是只用于 ``Trainer`` 中。与 output_mapping 互斥。 | ||||
* *evaluate_input_mapping* -- 与 input_mapping 一致,但是只用于 ``Evaluator`` 中。与 input_mapping 互斥。 | * *evaluate_input_mapping* -- 与 input_mapping 一致,但是只用于 ``Evaluator`` 中。与 input_mapping 互斥。 | ||||
@@ -164,7 +164,7 @@ class PaddleDataLoader(DataLoader): | |||||
""" | """ | ||||
获取当前 ``batch`` 中每条数据对应的索引。 | 获取当前 ``batch`` 中每条数据对应的索引。 | ||||
:return: 当前 ``batch`` 数据的索引 | |||||
:return: 当前 ``batch`` 数据的索引; | |||||
""" | """ | ||||
return self.cur_batch_indices | return self.cur_batch_indices | ||||
@@ -172,7 +172,7 @@ class TorchDataLoader(DataLoader): | |||||
""" | """ | ||||
获取当前 ``batch`` 中每条数据对应的索引。 | 获取当前 ``batch`` 中每条数据对应的索引。 | ||||
:return: 当前 ``batch`` 数据的索引 | |||||
:return: 当前 ``batch`` 数据的索引; | |||||
""" | """ | ||||
return self.cur_batch_indices | return self.cur_batch_indices | ||||
@@ -400,16 +400,22 @@ class DataSet: | |||||
new_field_name: str = None, num_proc: int = 0, | new_field_name: str = None, num_proc: int = 0, | ||||
progress_desc: str = None, show_progress_bar: bool = True): | progress_desc: str = None, show_progress_bar: bool = True): | ||||
r""" | r""" | ||||
将 :class:`~DataSet` 每个 ``instance`` 中为 ``field_name`` 的 ``field`` 传给函数 ``func``,并获取函数的返回值。 | |||||
:param field_name: 传入 ``func`` 的 ``field`` 名称。 | |||||
:param func: 一个函数,其输入是 ``instance`` 中名为 ``field_name`` 的 ``field`` 的内容。 | |||||
:param new_field_name: 将 ``func`` 返回的内容放入到 ``new_field_name`` 对应的 ``field`` 中,如果名称与已有的 ``field`` 相同 | |||||
则进行覆盖。如果为 ``None`` 则不会覆盖和创建 ``field`` 。 | |||||
:param num_proc: 使用进程的数量。请注意,由于 ``python`` 语言的特性,使用了多少进程就会导致多少倍内存的增长。 | |||||
:param progress_desc: 进度条的描述字符,默认为 ``Main``。 | |||||
:param show_progress_bar: 是否展示进度条;默认为展示。 | |||||
:return: 从函数 ``func`` 中得到的返回值。 | |||||
将 :class:`~DataSet` 每个 ``instance`` 中为 ``field_name`` 的 ``field`` 传给函数 ``func``,并写入到 ``new_field_name`` | |||||
中。 | |||||
:param field_name: 传入 ``func`` 的 ``field`` 名称; | |||||
:param func: 对指定 ``field`` 进行处理的函数,注意其输入应为 ``instance`` 中名为 ``field_name`` 的 ``field`` 的内容; | |||||
:param new_field_name: 函数执行结果写入的 ``field`` 名称。该函数会将 ``func`` 返回的内容放入到 ``new_field_name`` 对 | |||||
应的 ``field`` 中,注意如果名称与已有的 ``field`` 相同则会进行覆盖。如果为 ``None`` 则不会覆盖和创建 ``field`` ; | |||||
:param num_proc: 使用进程的数量。 | |||||
.. note:: | |||||
由于 ``python`` 语言的特性,设置该参数后会导致相应倍数的内存增长,这可能会对您程序的执行带来一定的影响。 | |||||
:param progress_desc: 进度条的描述字符,默认为 ``Main``; | |||||
:param show_progress_bar: 是否在处理过程中展示进度条; | |||||
:return: 从函数 ``func`` 中得到的返回值; | |||||
""" | """ | ||||
assert len(self) != 0, "Null DataSet cannot use apply_field()." | assert len(self) != 0, "Null DataSet cannot use apply_field()." | ||||
if not self.has_field(field_name=field_name): | if not self.has_field(field_name=field_name): | ||||
@@ -23,9 +23,9 @@ def choose_driver(model, driver: Union[str, Driver], device: Optional[Union[int, | |||||
elif driver in {"jittor"}: | elif driver in {"jittor"}: | ||||
from fastNLP.core.drivers.jittor_driver.initialize_jittor_driver import initialize_jittor_driver | from fastNLP.core.drivers.jittor_driver.initialize_jittor_driver import initialize_jittor_driver | ||||
return initialize_jittor_driver(driver, device, model, **kwargs) | return initialize_jittor_driver(driver, device, model, **kwargs) | ||||
elif driver in {"paddle", "fleet"}: | |||||
elif driver in {"paddle"}: | |||||
from fastNLP.core.drivers.paddle_driver.initialize_paddle_driver import initialize_paddle_driver | from fastNLP.core.drivers.paddle_driver.initialize_paddle_driver import initialize_paddle_driver | ||||
return initialize_paddle_driver(driver, device, model, **kwargs) | return initialize_paddle_driver(driver, device, model, **kwargs) | ||||
else: | else: | ||||
raise ValueError("Parameter `driver` can only be one of these values: ['torch', 'fairscale', " | raise ValueError("Parameter `driver` can only be one of these values: ['torch', 'fairscale', " | ||||
"'jittor', 'paddle', 'fleet'].") | |||||
"'jittor', 'paddle'].") |
@@ -7,18 +7,22 @@ from fastNLP.envs.imports import _NEED_IMPORT_JITTOR | |||||
if _NEED_IMPORT_JITTOR: | if _NEED_IMPORT_JITTOR: | ||||
import jittor | import jittor | ||||
__all__ = [] | |||||
def initialize_jittor_driver(driver: str, device: Union[str, int, List[int]], model: jittor.Module, **kwargs) -> JittorDriver: | def initialize_jittor_driver(driver: str, device: Union[str, int, List[int]], model: jittor.Module, **kwargs) -> JittorDriver: | ||||
r""" | r""" | ||||
用来根据参数 `driver` 和 `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去; | |||||
在这个函数中,我们会根据用户设置的device来确定JittorDriver的mode。 | |||||
用来根据参数 ``device`` 来确定并且初始化一个具体的 ``Driver`` 实例然后返回回去。 | |||||
.. todo:: | |||||
创建多卡的 driver | |||||
:param driver: 该参数的值应为以下之一:["jittor"]; | |||||
:param device: jittor运行的设备 | |||||
:param driver: 该参数的值应为以下之一:``["jittor"]``; | |||||
:param device: ``jittor`` 运行的设备; | |||||
:param model: 训练或者评测的具体的模型; | :param model: 训练或者评测的具体的模型; | ||||
:param kwargs: | :param kwargs: | ||||
:return: 返回一个元组,元组的第一个值是具体的基于 jittor 的 `Driver` 实例,元组的第二个值是该 driver 的名字(用于检测一个脚本中 | |||||
先后 driver 的次序的正确问题); | |||||
:return: :class:`~fastNLP.core.JittorSingleDriver` 或 :class:`~fastNLP.core.JittorMPIDriver` 实例; | |||||
""" | """ | ||||
if driver not in {"jittor"}: | if driver not in {"jittor"}: | ||||
@@ -24,7 +24,17 @@ if _NEED_IMPORT_JITTOR: | |||||
class JittorDriver(Driver): | class JittorDriver(Driver): | ||||
r""" | r""" | ||||
Jittor 框架的 Driver | |||||
``Jittor`` 框架的 ``Driver`` | |||||
.. note:: | |||||
这是一个正在开发中的功能,敬请期待。 | |||||
.. todo:: | |||||
实现 fp16 的设置,且支持 cpu 和 gpu 的切换; | |||||
实现用于断点重训的 save 和 load 函数; | |||||
""" | """ | ||||
def __init__(self, model, fp16: bool = False, **kwargs): | def __init__(self, model, fp16: bool = False, **kwargs): | ||||
@@ -13,6 +13,14 @@ __all__ = [ | |||||
] | ] | ||||
class JittorMPIDriver(JittorDriver): | class JittorMPIDriver(JittorDriver): | ||||
""" | |||||
执行 ``Jittor`` 框架下分布式训练的 ``Driver``。 | |||||
.. note:: | |||||
这是一个正在开发中的功能,敬请期待。 | |||||
""" | |||||
def __init__( | def __init__( | ||||
self, | self, | ||||
model, | model, | ||||
@@ -16,8 +16,17 @@ __all__ = [ | |||||
class JittorSingleDriver(JittorDriver): | class JittorSingleDriver(JittorDriver): | ||||
r""" | r""" | ||||
用于 cpu 和 单卡 gpu 运算 | |||||
TODO: jittor 的 fp16 | |||||
``Jittor`` 框架下用于 ``cpu`` 和单卡 ``gpu`` 运算的 ``Driver``。 | |||||
.. note:: | |||||
这是一个正在开发中的功能,敬请期待。 | |||||
.. todo:: | |||||
支持 cpu 和 gpu 的切换; | |||||
实现断点重训中替换 dataloader 的 set_dist_repro_dataloader 函数 | |||||
""" | """ | ||||
def __init__(self, model, device=None, fp16: bool = False, **kwargs): | def __init__(self, model, device=None, fp16: bool = False, **kwargs): | ||||
@@ -30,11 +39,6 @@ class JittorSingleDriver(JittorDriver): | |||||
self.world_size = 1 | self.world_size = 1 | ||||
def step(self): | def step(self): | ||||
""" | |||||
jittor optimizers 的step函数可以传入参数loss | |||||
此时会同时进行 zero_grad 和 backward | |||||
为了统一,这里暂不使用这样的方式 | |||||
""" | |||||
for optimizer in self.optimizers: | for optimizer in self.optimizers: | ||||
optimizer.step() | optimizer.step() | ||||
@@ -5,10 +5,11 @@ from fastNLP.envs.imports import _NEED_IMPORT_JITTOR | |||||
if _NEED_IMPORT_JITTOR: | if _NEED_IMPORT_JITTOR: | ||||
import jittor | import jittor | ||||
__all__ = [] | |||||
class DummyGradScaler: | class DummyGradScaler: | ||||
""" | """ | ||||
用于仿造的GradScaler对象,防止重复写大量的if判断 | 用于仿造的GradScaler对象,防止重复写大量的if判断 | ||||
""" | """ | ||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
pass | pass | ||||
@@ -1,8 +1,6 @@ | |||||
import os | import os | ||||
from typing import List, Union, Optional, Dict, Tuple, Callable | from typing import List, Union, Optional, Dict, Tuple, Callable | ||||
from fastNLP.core.utils.paddle_utils import get_device_from_visible | |||||
from .paddle_driver import PaddleDriver | from .paddle_driver import PaddleDriver | ||||
from .fleet_launcher import FleetLauncher | from .fleet_launcher import FleetLauncher | ||||
from .utils import ( | from .utils import ( | ||||
@@ -19,7 +17,9 @@ from fastNLP.core.utils import ( | |||||
check_user_specific_params, | check_user_specific_params, | ||||
is_in_paddle_dist, | is_in_paddle_dist, | ||||
is_in_paddle_dist, | is_in_paddle_dist, | ||||
get_paddle_device_id, | |||||
) | ) | ||||
from fastNLP.core.utils.paddle_utils import _convert_data_device | |||||
from fastNLP.envs.distributed import rank_zero_rm | from fastNLP.envs.distributed import rank_zero_rm | ||||
from fastNLP.core.samplers import ( | from fastNLP.core.samplers import ( | ||||
ReproduceBatchSampler, | ReproduceBatchSampler, | ||||
@@ -31,7 +31,12 @@ from fastNLP.core.samplers import ( | |||||
re_instantiate_sampler, | re_instantiate_sampler, | ||||
conversion_between_reproducible_and_unrepeated_sampler, | conversion_between_reproducible_and_unrepeated_sampler, | ||||
) | ) | ||||
from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK, FASTNLP_GLOBAL_SEED, FASTNLP_NO_SYNC | |||||
from fastNLP.envs.env import ( | |||||
FASTNLP_DISTRIBUTED_CHECK, | |||||
FASTNLP_GLOBAL_SEED, | |||||
FASTNLP_NO_SYNC, | |||||
USER_CUDA_VISIBLE_DEVICES, | |||||
) | |||||
from fastNLP.core.log import logger | from fastNLP.core.log import logger | ||||
if _NEED_IMPORT_PADDLE: | if _NEED_IMPORT_PADDLE: | ||||
@@ -51,7 +56,7 @@ class PaddleFleetDriver(PaddleDriver): | |||||
def __init__( | def __init__( | ||||
self, | self, | ||||
model, | model, | ||||
parallel_device: Optional[Union[List[int], int]], | |||||
parallel_device: Optional[Union[List[str], str]], | |||||
is_pull_by_paddle_run: bool = False, | is_pull_by_paddle_run: bool = False, | ||||
fp16: bool = False, | fp16: bool = False, | ||||
**kwargs | **kwargs | ||||
@@ -185,6 +190,8 @@ class PaddleFleetDriver(PaddleDriver): | |||||
不管是什么情况,`PaddleFleetDriver` 在 `setup` 函数的最后,都会将所有进程的 pid 主动记录下来,这样当一个进程出现 exception 后, | 不管是什么情况,`PaddleFleetDriver` 在 `setup` 函数的最后,都会将所有进程的 pid 主动记录下来,这样当一个进程出现 exception 后, | ||||
driver 的 on_exception 函数就会被 trainer 调用,其会调用 os.kill 指令将其它进程 kill 掉; | driver 的 on_exception 函数就会被 trainer 调用,其会调用 os.kill 指令将其它进程 kill 掉; | ||||
""" | """ | ||||
if USER_CUDA_VISIBLE_DEVICES not in os.environ: | |||||
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
super(PaddleFleetDriver, self).__init__(model, fp16=fp16, **kwargs) | super(PaddleFleetDriver, self).__init__(model, fp16=fp16, **kwargs) | ||||
# 如果不是通过 launch 启动,要求用户必须传入 parallel_device | # 如果不是通过 launch 启动,要求用户必须传入 parallel_device | ||||
@@ -213,25 +220,6 @@ class PaddleFleetDriver(PaddleDriver): | |||||
"you initialize the paddle distribued process out of our control.") | "you initialize the paddle distribued process out of our control.") | ||||
self.outside_fleet = True | self.outside_fleet = True | ||||
# 用户只有将模型上传到对应机器上后才能用 DataParallel 包裹,因此如果用户在外面初始化了 Fleet,那么在 PaddleFleetDriver 中 | |||||
# 我们就直接将 model_device 置为 None; | |||||
self._model_device = None | |||||
# 当参数 `device` 为 None 时并且该参数不为 None,表示将对应的数据移到指定的机器上; | |||||
self._data_device = kwargs.get("data_device", None) | |||||
if self._data_device is not None: | |||||
if isinstance(self._data_device, int): | |||||
if self._data_device < 0: | |||||
raise ValueError("Parameter `data_device` can not be smaller than 0.") | |||||
_could_use_device_num = paddle.device.cuda.device_count() | |||||
if self._data_device >= _could_use_device_num: | |||||
raise ValueError("The gpu device that parameter `device` specifies is not existed.") | |||||
self._data_device = f"gpu:{self._data_device}" | |||||
elif not isinstance(self._data_device, str): | |||||
raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.") | |||||
if self.outside_fleet and paddle.device.get_device() != self._data_device: | |||||
logger.warning("`Parameter data_device` is not equal to paddle.deivce.get_device(), " | |||||
"please keep them equal to avoid some potential bugs.") | |||||
self.world_size = None | self.world_size = None | ||||
self.global_rank = 0 | self.global_rank = 0 | ||||
@@ -304,7 +292,8 @@ class PaddleFleetDriver(PaddleDriver): | |||||
else: | else: | ||||
# 已经设置过一次,保证参数必须是一样的 | # 已经设置过一次,保证参数必须是一样的 | ||||
pre_gpus = os.environ[FASTNLP_DISTRIBUTED_CHECK] | pre_gpus = os.environ[FASTNLP_DISTRIBUTED_CHECK] | ||||
pre_gpus = [int (x) for x in pre_gpus.split(",")] | |||||
pre_gpus = [int(x) for x in pre_gpus.split(",")] | |||||
cur_gpus = [get_paddle_device_id(g) for g in self.parallel_device] | |||||
if sorted(pre_gpus) != sorted(self.parallel_device): | if sorted(pre_gpus) != sorted(self.parallel_device): | ||||
raise RuntimeError("Notice you are using `PaddleFleetDriver` after one instantiated `PaddleFleetDriver`, it is not" | raise RuntimeError("Notice you are using `PaddleFleetDriver` after one instantiated `PaddleFleetDriver`, it is not" | ||||
"allowed that your second `PaddleFleetDriver` has a new setting of parameters `parallel_device`.") | "allowed that your second `PaddleFleetDriver` has a new setting of parameters `parallel_device`.") | ||||
@@ -410,8 +399,6 @@ class PaddleFleetDriver(PaddleDriver): | |||||
@property | @property | ||||
def data_device(self): | def data_device(self): | ||||
if self.outside_fleet: | |||||
return self._data_device | |||||
return self.model_device | return self.model_device | ||||
def model_call(self, batch, fn: Callable, signature_fn: Optional[Callable]) -> Dict: | def model_call(self, batch, fn: Callable, signature_fn: Optional[Callable]) -> Dict: | ||||
@@ -565,7 +552,7 @@ class PaddleFleetDriver(PaddleDriver): | |||||
def broadcast_object(self, obj, src:int=0, group=None, **kwargs): | def broadcast_object(self, obj, src:int=0, group=None, **kwargs): | ||||
# 因为设置了CUDA_VISIBLE_DEVICES,可能会引起错误 | # 因为设置了CUDA_VISIBLE_DEVICES,可能会引起错误 | ||||
device = get_device_from_visible(self.data_device) | |||||
device = _convert_data_device(self.data_device) | |||||
return fastnlp_paddle_broadcast_object(obj, src, device=device, group=group) | return fastnlp_paddle_broadcast_object(obj, src, device=device, group=group) | ||||
def all_gather(self, obj, group=None) -> List: | def all_gather(self, obj, group=None) -> List: | ||||
@@ -11,11 +11,14 @@ from fastNLP.envs.env import ( | |||||
FASTNLP_LOG_LEVEL, | FASTNLP_LOG_LEVEL, | ||||
FASTNLP_GLOBAL_SEED, | FASTNLP_GLOBAL_SEED, | ||||
) | ) | ||||
from fastNLP.core.utils import get_paddle_device_id | |||||
from .utils import ( | from .utils import ( | ||||
find_free_ports, | find_free_ports, | ||||
reset_seed, | reset_seed, | ||||
) | ) | ||||
__all__ = [] | |||||
# 记录各个进程信息 | # 记录各个进程信息 | ||||
class SubTrainer(object): | class SubTrainer(object): | ||||
""" | """ | ||||
@@ -34,11 +37,11 @@ class FleetLauncher: | |||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
devices: List[int], | |||||
devices: List[str], | |||||
output_from_new_proc: str = "only_error" | output_from_new_proc: str = "only_error" | ||||
): | ): | ||||
self.devices = devices | |||||
self.devices = [ get_paddle_device_id(g) for g in devices] | |||||
self.output_from_new_proc = output_from_new_proc | self.output_from_new_proc = output_from_new_proc | ||||
self.setup() | self.setup() | ||||
@@ -7,50 +7,58 @@ from .single_device import PaddleSingleDriver | |||||
from .fleet import PaddleFleetDriver | from .fleet import PaddleFleetDriver | ||||
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | ||||
from fastNLP.core.utils import is_in_paddle_launch_dist | |||||
from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES | |||||
from fastNLP.core.utils import is_in_paddle_launch_dist, get_paddle_gpu_str | |||||
from fastNLP.core.log import logger | from fastNLP.core.log import logger | ||||
if _NEED_IMPORT_PADDLE: | if _NEED_IMPORT_PADDLE: | ||||
import paddle | import paddle | ||||
__all__ = [] | |||||
def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[int]]], | def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[int]]], | ||||
model: "paddle.nn.Layer", **kwargs) -> PaddleDriver: | model: "paddle.nn.Layer", **kwargs) -> PaddleDriver: | ||||
r""" | r""" | ||||
用来根据参数 `driver` 和 `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去; | |||||
1、如果检测到当前进程为用户通过 `python -m paddle.distributed.launch xxx.py` 方式拉起的,则将 | |||||
设备自动设置为用户指定的设备(由于我们在引入 fastNLP 进行了特殊的设置,因此可以通过 `CUDA_VISIBLE_DEVICES` 获取) | |||||
2、如果检测到输入的 `driver` 是 `paddle` 但 `device` 包含了多个设备,那么我们会给出警告并且自动返回多卡的 Driver | |||||
3、如果检测到输入的 `driver` 是 `fleet` 但 `device` 仅有一个设备,那么我们会给出警告但仍旧返回多卡的 Driver | |||||
用来根据参数 ``device`` 来确定并且初始化一个具体的 ``Driver`` 实例。 | |||||
1. 如果检测到当前进程为用户通过 ``python -m paddle.distributed.launch xxx.py`` 方式拉起的,则将 | |||||
设备自动设置为用户指定的设备(由于我们要求分布式训练必须进行 ``backend`` 的设置,因此可以通过 ``CUDA_VISIBLE_DEVICES`` 获取) | |||||
2. 如果 ``device`` 包含了多个设备,则返回一个 :class:`~fastNLP.core.PaddleFleetDriver` 实例,否则返回 | |||||
单卡的 :class:`~fastNLP.core.PaddleSingleDriver` 实例 | |||||
:param driver: 使用的 ``driver`` 类型,在这个函数中仅支持 ``paddle`` | |||||
:param device: 该参数的格式与 `Trainer` 对参数 `device` 的要求一致; | |||||
:param driver: 使用的 ``driver`` 类型,在这个函数中仅支持 ``paddle``; | |||||
:param device: 该参数的格式与 ``Trainer`` 对参数 ``device`` 的要求一致; | |||||
:param model: 训练或者评测的具体的模型; | :param model: 训练或者评测的具体的模型; | ||||
:return: 返回构造的 `Driver` 实例。 | |||||
:return: 一个 :class:`~fastNLP.core.PaddleSingleDriver` 或 :class:`~fastNLP.core.PaddleFleetDriver` 实例; | |||||
""" | """ | ||||
if driver != "paddle": | if driver != "paddle": | ||||
raise ValueError("When initialize PaddleDriver, parameter `driver` must be 'paddle'.") | raise ValueError("When initialize PaddleDriver, parameter `driver` must be 'paddle'.") | ||||
user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | |||||
if is_in_paddle_launch_dist(): | if is_in_paddle_launch_dist(): | ||||
if user_visible_devices is None: | |||||
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
if device is not None: | if device is not None: | ||||
logger.warning_once("Parameter `device` would be ignored when you are using `paddle.distributed.launch` to pull " | logger.warning_once("Parameter `device` would be ignored when you are using `paddle.distributed.launch` to pull " | ||||
"up your script. And we will directly get the local device via " | |||||
"and `os.environ['CUDA_VISIBLE_DEVICES']``.") | |||||
device = [int(g) for g in os.environ["CUDA_VISIBLE_DEVICES"].split(",")] | |||||
# TODO 目前一个进程仅对应一个卡,所以暂时传入一个 int | |||||
"up your script. And we will directly get the local device via environment variables.") | |||||
_visible_list = user_visible_devices.split(",") | |||||
device = [ f"gpu:{_visible_list.index(g) }" for g in os.environ["CUDA_VISIBLE_DEVICES"].split(",")] | |||||
# TODO 目前一个进程仅对应一个卡,所以暂时传入单个 | |||||
return PaddleFleetDriver(model, device[0], True, **kwargs) | return PaddleFleetDriver(model, device[0], True, **kwargs) | ||||
user_visible_devices = os.getenv("USER_CUDA_VISIBLE_DEVICES") | |||||
if user_visible_devices is None: | if user_visible_devices is None: | ||||
raise RuntimeError("`USER_CUDA_VISIBLE_DEVICES` cannot be None, please check if you have set " | |||||
"`FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
_could_use_device_num = len(user_visible_devices.split(",")) | |||||
_could_use_device_num = paddle.device.cuda.device_count() | |||||
else: | |||||
_could_use_device_num = len(user_visible_devices.split(",")) | |||||
if isinstance(device, int): | if isinstance(device, int): | ||||
if device < 0 and device != -1: | if device < 0 and device != -1: | ||||
raise ValueError("Parameter `device` can only be '-1' when it is smaller than 0.") | raise ValueError("Parameter `device` can only be '-1' when it is smaller than 0.") | ||||
if device >= _could_use_device_num: | if device >= _could_use_device_num: | ||||
raise ValueError("The gpu device that parameter `device` specifies is not existed.") | raise ValueError("The gpu device that parameter `device` specifies is not existed.") | ||||
if device == -1: | if device == -1: | ||||
device = list(range(_could_use_device_num)) | |||||
device = [ get_paddle_gpu_str(g) for g in range(_could_use_device_num)] | |||||
elif isinstance(device, Sequence) and not isinstance(device, str): | elif isinstance(device, Sequence) and not isinstance(device, str): | ||||
device = list(set(device)) | device = list(set(device)) | ||||
for each in device: | for each in device: | ||||
@@ -61,8 +69,10 @@ def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[ | |||||
elif each >= _could_use_device_num: | elif each >= _could_use_device_num: | ||||
raise ValueError("When parameter `device` is 'Sequence' type, the value in it should not be bigger than" | raise ValueError("When parameter `device` is 'Sequence' type, the value in it should not be bigger than" | ||||
" the available gpu number.") | " the available gpu number.") | ||||
device = [get_paddle_gpu_str(g) for g in device] | |||||
elif device is not None and not isinstance(device, str): | elif device is not None and not isinstance(device, str): | ||||
raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.") | raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.") | ||||
if isinstance(device, List): | if isinstance(device, List): | ||||
return PaddleFleetDriver(model, device, **kwargs) | return PaddleFleetDriver(model, device, **kwargs) | ||||
else: | else: | ||||
@@ -7,10 +7,13 @@ from dataclasses import dataclass | |||||
import numpy as np | import numpy as np | ||||
from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES | |||||
from .utils import _build_fp16_env, optimizer_state_to_device, DummyGradScaler | from .utils import _build_fp16_env, optimizer_state_to_device, DummyGradScaler | ||||
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | ||||
from fastNLP.core.drivers.driver import Driver | from fastNLP.core.drivers.driver import Driver | ||||
from fastNLP.core.utils import apply_to_collection, paddle_move_data_to_device, get_device_from_visible | |||||
from fastNLP.core.utils import apply_to_collection, paddle_move_data_to_device | |||||
from fastNLP.core.utils.paddle_utils import _convert_data_device | |||||
from fastNLP.envs import ( | from fastNLP.envs import ( | ||||
FASTNLP_SEED_WORKERS, | FASTNLP_SEED_WORKERS, | ||||
FASTNLP_MODEL_FILENAME, | FASTNLP_MODEL_FILENAME, | ||||
@@ -369,7 +372,7 @@ class PaddleDriver(Driver): | |||||
:return: 将移动到指定机器上的 batch 对象返回; | :return: 将移动到指定机器上的 batch 对象返回; | ||||
""" | """ | ||||
device = get_device_from_visible(self.data_device) | |||||
device = _convert_data_device(self.data_device) | |||||
return paddle_move_data_to_device(batch, device) | return paddle_move_data_to_device(batch, device) | ||||
@staticmethod | @staticmethod | ||||
@@ -8,10 +8,10 @@ from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | |||||
from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES | from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES | ||||
from fastNLP.core.utils import ( | from fastNLP.core.utils import ( | ||||
auto_param_call, | auto_param_call, | ||||
get_device_from_visible, | |||||
get_paddle_gpu_str, | get_paddle_gpu_str, | ||||
get_paddle_device_id, | get_paddle_device_id, | ||||
) | ) | ||||
from fastNLP.core.utils.paddle_utils import _convert_data_device | |||||
from fastNLP.core.utils.utils import _get_fun_msg | from fastNLP.core.utils.utils import _get_fun_msg | ||||
from fastNLP.core.samplers import ( | from fastNLP.core.samplers import ( | ||||
ReproducibleBatchSampler, | ReproducibleBatchSampler, | ||||
@@ -40,9 +40,6 @@ class PaddleSingleDriver(PaddleDriver): | |||||
raise ValueError("`paddle.DataParallel` is not supported in `PaddleSingleDriver`") | raise ValueError("`paddle.DataParallel` is not supported in `PaddleSingleDriver`") | ||||
cuda_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | cuda_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | ||||
if cuda_visible_devices is None: | |||||
raise RuntimeError("`USER_CUDA_VISIBLE_DEVICES` cannot be None, please check if you have set " | |||||
"`FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
if cuda_visible_devices == "": | if cuda_visible_devices == "": | ||||
device = "cpu" | device = "cpu" | ||||
logger.info("You have set `CUDA_VISIBLE_DEVICES` to '' in system environment variable, and we are gonna to" | logger.info("You have set `CUDA_VISIBLE_DEVICES` to '' in system environment variable, and we are gonna to" | ||||
@@ -54,11 +51,9 @@ class PaddleSingleDriver(PaddleDriver): | |||||
raise ValueError("Parameter `device` can not be None in `PaddleSingleDriver`.") | raise ValueError("Parameter `device` can not be None in `PaddleSingleDriver`.") | ||||
if device != "cpu": | if device != "cpu": | ||||
if isinstance(device, int): | |||||
device_id = device | |||||
else: | |||||
device_id = get_paddle_device_id(device) | |||||
os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices.split(",")[device_id] | |||||
device_id = get_paddle_device_id(device) | |||||
if cuda_visible_devices is not None: | |||||
os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices.split(",")[device_id] | |||||
self.model_device = get_paddle_gpu_str(device) | self.model_device = get_paddle_gpu_str(device) | ||||
self.local_rank = 0 | self.local_rank = 0 | ||||
@@ -69,7 +64,8 @@ class PaddleSingleDriver(PaddleDriver): | |||||
r""" | r""" | ||||
该函数用来初始化训练环境,用于设置当前训练的设备,并将模型迁移到对应设备上。 | 该函数用来初始化训练环境,用于设置当前训练的设备,并将模型迁移到对应设备上。 | ||||
""" | """ | ||||
device = get_device_from_visible(self.model_device, output_type=str) | |||||
device = _convert_data_device(self.data_device) | |||||
paddle.device.set_device(device) | paddle.device.set_device(device) | ||||
with contextlib.redirect_stdout(None): | with contextlib.redirect_stdout(None): | ||||
self.model.to(device) | self.model.to(device) | ||||
@@ -10,19 +10,18 @@ from .ddp import TorchDDPDriver | |||||
from fastNLP.core.log import logger | from fastNLP.core.log import logger | ||||
from fastNLP.envs import FASTNLP_BACKEND_LAUNCH | from fastNLP.envs import FASTNLP_BACKEND_LAUNCH | ||||
__all__ = [] | |||||
def initialize_torch_driver(driver: str, device: Optional[Union[str, "torch.device", int, List[int]]], | def initialize_torch_driver(driver: str, device: Optional[Union[str, "torch.device", int, List[int]]], | ||||
model: "torch.nn.Module", **kwargs) -> TorchDriver: | model: "torch.nn.Module", **kwargs) -> TorchDriver: | ||||
r""" | r""" | ||||
用来根据参数 `driver` 和 `device` 来确定并且初始化一个具体的 `Driver` 实例然后返回回去; | |||||
注意如果输入的 `device` 如果和 `driver` 对应不上就直接报错; | |||||
用来根据参数 ``driver` 和 ``device`` 来确定并且初始化一个具体的 ``Driver`` 实例然后返回回去; | |||||
:param driver: 该参数的值应为以下之一:["torch", "torch_ddp", "fairscale"]; | |||||
:param device: 该参数的格式与 `Trainer` 对参数 `device` 的要求一致; | |||||
:param driver: 该参数的值应为以下之一:``["torch", "fairscale"]``; | |||||
:param device: 该参数的格式与 ``Trainer`` 对参数 ``device`` 的要求一致; | |||||
:param model: 训练或者评测的具体的模型; | :param model: 训练或者评测的具体的模型; | ||||
:return: 返回一个元组,元组的第一个值是具体的基于 pytorch 的 `Driver` 实例,元组的第二个值是该 driver 的名字(用于检测一个脚本中 | |||||
先后 driver 的次序的正确问题); | |||||
:return: 返回一个 :class:`~fastNLP.core.TorchSingleDriver` 或 :class:`~fastNLP.core.TorchDDPDriver` 实例; | |||||
""" | """ | ||||
# world_size 和 rank | # world_size 和 rank | ||||
if FASTNLP_BACKEND_LAUNCH in os.environ: | if FASTNLP_BACKEND_LAUNCH in os.environ: | ||||
@@ -1,12 +1,14 @@ | |||||
import os | |||||
from typing import List, Any | from typing import List, Any | ||||
import numpy as np | import numpy as np | ||||
from fastNLP.core.metrics.backend import Backend | from fastNLP.core.metrics.backend import Backend | ||||
from fastNLP.core.utils.paddle_utils import paddle_to, get_device_from_visible | |||||
from fastNLP.core.utils.paddle_utils import paddle_to, _convert_data_device | |||||
from fastNLP.core.metrics.utils import AggregateMethodError | from fastNLP.core.metrics.utils import AggregateMethodError | ||||
from fastNLP.core.drivers.paddle_driver.dist_utils import fastnlp_paddle_all_gather | from fastNLP.core.drivers.paddle_driver.dist_utils import fastnlp_paddle_all_gather | ||||
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | ||||
from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES | |||||
if _NEED_IMPORT_PADDLE: | if _NEED_IMPORT_PADDLE: | ||||
import paddle | import paddle | ||||
@@ -79,7 +81,7 @@ class PaddleBackend(Backend): | |||||
raise ValueError(f"tensor: {tensor} can not convert to ndarray!") | raise ValueError(f"tensor: {tensor} can not convert to ndarray!") | ||||
def move_tensor_to_device(self, tensor, device): | def move_tensor_to_device(self, tensor, device): | ||||
device = get_device_from_visible(device) | |||||
device = _convert_data_device(device) | |||||
return paddle_to(tensor, device) | return paddle_to(tensor, device) | ||||
def all_gather_object(self, obj, group=None) -> List: | def all_gather_object(self, obj, group=None) -> List: | ||||
@@ -2,7 +2,6 @@ __all__ = [ | |||||
'cache_results', | 'cache_results', | ||||
'is_jittor_dataset', | 'is_jittor_dataset', | ||||
'jittor_collate_wraps', | 'jittor_collate_wraps', | ||||
'get_device_from_visible', | |||||
'paddle_to', | 'paddle_to', | ||||
'paddle_move_data_to_device', | 'paddle_move_data_to_device', | ||||
'get_paddle_device_id', | 'get_paddle_device_id', | ||||
@@ -28,7 +27,7 @@ __all__ = [ | |||||
from .cache_results import cache_results | from .cache_results import cache_results | ||||
from .jittor_utils import is_jittor_dataset, jittor_collate_wraps | from .jittor_utils import is_jittor_dataset, jittor_collate_wraps | ||||
from .paddle_utils import get_device_from_visible, paddle_to, paddle_move_data_to_device, get_paddle_device_id, get_paddle_gpu_str, is_in_paddle_dist, \ | |||||
from .paddle_utils import paddle_to, paddle_move_data_to_device, get_paddle_device_id, get_paddle_gpu_str, is_in_paddle_dist, \ | |||||
is_in_fnlp_paddle_dist, is_in_paddle_launch_dist | is_in_fnlp_paddle_dist, is_in_paddle_launch_dist | ||||
from .rich_progress import f_rich_progress | from .rich_progress import f_rich_progress | ||||
from .torch_utils import torch_move_data_to_device | from .torch_utils import torch_move_data_to_device | ||||
@@ -15,6 +15,12 @@ from fastNLP.core.dataset import Instance | |||||
def is_jittor_dataset(dataset) -> bool: | def is_jittor_dataset(dataset) -> bool: | ||||
""" | |||||
判断传入的 ``dataset`` 是否是 :class:`jittor.dataset.Dataset` 类型 | |||||
:param dataset: 数据集; | |||||
:return: 当前 ``dataset`` 是否为 ``jittor`` 的数据集类型; | |||||
""" | |||||
try: | try: | ||||
if isinstance(dataset, jt.dataset.Dataset): | if isinstance(dataset, jt.dataset.Dataset): | ||||
return True | return True | ||||
@@ -26,7 +32,8 @@ def is_jittor_dataset(dataset) -> bool: | |||||
def jittor_collate_wraps(func, auto_collator: Callable): | def jittor_collate_wraps(func, auto_collator: Callable): | ||||
""" | """ | ||||
对jittor的collate_fn进行wrap封装, 如果数据集为mapping类型,那么采用auto_collator,否则还是采用jittor自带的collate_batch | |||||
对 ``jittor`` 的 ``collate_fn`` 进行 ``wrap`` 封装,。如果数据集为 ``mapping`` 类型,那么采用 ``auto_collator`` ,否则 | |||||
还是采用 ``jittor`` 的 ``collate_batch``。 | |||||
:param func: | :param func: | ||||
:param auto_collator: | :param auto_collator: | ||||
@@ -1,5 +1,4 @@ | |||||
__all__ = [ | __all__ = [ | ||||
"get_device_from_visible", | |||||
"paddle_to", | "paddle_to", | ||||
"paddle_move_data_to_device", | "paddle_move_data_to_device", | ||||
"get_paddle_gpu_str", | "get_paddle_gpu_str", | ||||
@@ -21,73 +20,90 @@ if _NEED_IMPORT_PADDLE: | |||||
from .utils import apply_to_collection | from .utils import apply_to_collection | ||||
def get_device_from_visible(device: Union[str, int], output_type=int): | |||||
def _convert_data_device(device: Union[str, int]) -> str: | |||||
""" | """ | ||||
在有 CUDA_VISIBLE_DEVICES 的情况下,获取对应的设备。 | |||||
如 CUDA_VISIBLE_DEVICES=2,3 ,device=3 ,则返回1。 | |||||
用于转换 ``driver`` 的 ``data_device`` 的函数。如果用户设置了 ``FASTNLP_BACKEND=paddle``,那么 ``fastNLP`` 会将 | |||||
可见的设备保存在 ``USER_CUDA_VISIBLE_DEVICES`` 中,并且将 ``CUDA_VISIBLE_DEVICES`` 设置为可见的第一张显卡;这是为 | |||||
了顺利执行 ``paddle`` 的分布式训练而设置的。 | |||||
在这种情况下,单纯使用 ``driver.data_device`` 是无效的。比如在分布式训练中将设备设置为 ``[0,2,3]`` ,且用户设置了 | |||||
``CUDA_VISIBLE_DEVICES=3,4,5,6`` ,那么在 ``rank1``的进程中有:: | |||||
:param device: 未转化的设备名 | |||||
:param output_type: 返回值的类型 | |||||
:return: 转化后的设备id | |||||
""" | |||||
if output_type not in [int, str]: | |||||
raise ValueError("Parameter `output_type` should be one of these types: [int, str]") | |||||
if device == "cpu": | |||||
return device | |||||
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") | |||||
user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | |||||
if user_visible_devices is None: | |||||
raise RuntimeError("`USER_CUDA_VISIBLE_DEVICES` cannot be None, please check if you have set " | |||||
"`FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
idx = get_paddle_device_id(device) | |||||
# 利用 USER_CUDA_VISIBLDE_DEVICES 获取用户期望的设备 | |||||
if user_visible_devices is None: | |||||
raise RuntimeError("This situation cannot happen, please report a bug to us.") | |||||
idx = user_visible_devices.split(",")[idx] | |||||
cuda_visible_devices_list = cuda_visible_devices.split(',') | |||||
if idx not in cuda_visible_devices_list: | |||||
raise ValueError(f"Can't find your devices {idx} in CUDA_VISIBLE_DEVICES[{cuda_visible_devices}]. ") | |||||
res = cuda_visible_devices_list.index(idx) | |||||
if output_type == int: | |||||
return res | |||||
else: | |||||
return f"gpu:{res}" | |||||
os.environ["CUDA_VISIBLE_DEVICES"] = "5" | |||||
os.environ["USER_CUDA_VISIBLE_DEVICES"] = "3,4,5,6" | |||||
driver.data_device = "gpu:2" # 为了向用户正确地反映他们设置的设备减少歧义,因此这里没有设置为 "gpu:5" | |||||
此时我们便需要通过这个函数将 ``data_device`` 转换为 ``gpu:0``。具体过程便是通过索引 **2** 在 ``USER_CUDA_VISIBLE_DEVICES`` 中 | |||||
找到设备 **5**,然后在 ``CUDA_VISIBLE_DEVICES`` 中找到设备 **5** 的索引 **0** 返回。 | |||||
.. note:: | |||||
def paddle_to(data, device: Union[str, int]): | |||||
在分布式单进程仅支持单卡的情况下中,这个函数实际等同于直接转换为 ``gpu:0`` 返回。 | |||||
:param device: 未转化的设备; | |||||
:return: 转化后的设备,格式为 ``gpu:x``; | |||||
""" | |||||
try: | |||||
user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | |||||
if device == "cpu" or user_visible_devices is None: | |||||
# 传入的是 CPU,或者没有设置 USER_CUDA_VISIBLE_DEVICES | |||||
# 此时不需要进行转换 | |||||
return get_paddle_gpu_str(device) | |||||
idx = get_paddle_device_id(device) | |||||
idx = user_visible_devices.split(",")[idx] | |||||
# 此时 CUDA_VISIBLE_DEVICES 一定不是 None | |||||
cuda_visible_devices_list = os.getenv("CUDA_VISIBLE_DEVICES").split(',') | |||||
return f"gpu:{cuda_visible_devices_list.index(idx)}" | |||||
except Exception as e: | |||||
raise ValueError(f"Can't convert device {device} when USER_CUDA_VISIBLE_DEVICES={user_visible_devices} " | |||||
"and CUDA_VISIBLE_DEVICES={cuda_visible_devices}. If this situation happens, please report this bug to us.") | |||||
def paddle_to(data: "paddle.Tensor", device: Union[str, int]) -> "paddle.Tensor": | |||||
""" | """ | ||||
将 `data` 迁移到指定的 `device` 上 | |||||
将 ``data`` 迁移到指定的 ``device`` 上。``paddle.Tensor`` 没有类似 ``torch.Tensor`` 的 ``to`` 函数,该函数 | |||||
只是集成了 :func:`paddle.Tensor.cpu` 和 :func:`paddle.Tensor.cuda` 两个函数。 | |||||
:param data: 要迁移的张量 | |||||
:param device: 目标设备,可以是 `str` 或 `int` | |||||
:return: 迁移后的张量 | |||||
:param data: 要迁移的张量; | |||||
:param device: 目标设备,可以是 ``str`` 或 ``int`` 类型; | |||||
:return: 迁移后的张量; | |||||
""" | """ | ||||
if device == "cpu": | if device == "cpu": | ||||
return data.cpu() | return data.cpu() | ||||
else: | else: | ||||
# device = get_device_from_visible(device, output_type=int) | |||||
return data.cuda(get_paddle_device_id(device)) | return data.cuda(get_paddle_device_id(device)) | ||||
def get_paddle_gpu_str(device: Union[str, int]): | |||||
def get_paddle_gpu_str(device: Union[str, int]) -> str: | |||||
""" | """ | ||||
获得 `gpu:x` 类型的设备名 | |||||
获得 ``gpu:x`` 格式的设备名:: | |||||
:param device: 设备编号或设备名 | |||||
:return: 返回对应的 `gpu:x` 格式的设备名 | |||||
>>> get_paddle_gpu_str(1) | |||||
'gpu:1' | |||||
>>> get_paddle_gpu_str("cuda:1") | |||||
'gpu:1' | |||||
:param device: 设备编号或设备名; | |||||
:return: 返回对应的 ``gpu:x`` 格式的设备名; | |||||
""" | """ | ||||
if isinstance(device, str): | if isinstance(device, str): | ||||
return device.replace("cuda", "gpu") | return device.replace("cuda", "gpu") | ||||
return f"gpu:{device}" | return f"gpu:{device}" | ||||
def get_paddle_device_id(device: Union[str, int]): | |||||
def get_paddle_device_id(device: Union[str, int]) -> int: | |||||
""" | """ | ||||
获得 gpu 的设备id | |||||
获得 ``device`` 的设备编号:: | |||||
>>> get_paddle_device_id("gpu:1") | |||||
1 | |||||
>>> get_paddle_device_id("gpu") | |||||
0 | |||||
请注意不要向这个函数中传入 ``cpu``。 | |||||
:param: device: 设备编号或设备名 | |||||
:return: 设备对应的编号 | |||||
:param: device: 设备编号或设备名; | |||||
:return: 设备对应的编号; | |||||
""" | """ | ||||
if isinstance(device, int): | if isinstance(device, int): | ||||
return device | return device | ||||
@@ -109,21 +125,17 @@ def get_paddle_device_id(device: Union[str, int]): | |||||
return device_id | return device_id | ||||
def paddle_move_data_to_device(batch: Any, device: Optional[str] = None, | |||||
data_device: Optional[str] = None) -> Any: | |||||
def paddle_move_data_to_device(batch: Any, device: Optional[Union[str, int]]) -> Any: | |||||
r""" | r""" | ||||
将数据集合传输到给定设备。只有paddle.Tensor对象会被传输到设备中,其余保持不变 | |||||
将 ``paddle`` 的数据集合传输到给定设备。只有 :class:`paddle.Tensor` 对象会被传输到设备中,其余保持不变。 | |||||
:param batch: | |||||
:param device: `cpu`, `gpu` or `gpu:x` | |||||
:param data_device: | |||||
:return: 相同的集合,但所有包含的张量都驻留在新设备上; | |||||
:param batch: 需要进行迁移的数据集合; | |||||
:param device: 目标设备。可以是显卡设备的编号,或是``cpu``, ``gpu`` 或 ``gpu:x`` 格式的字符串;当这个参数 | |||||
为 `None`` 时,不会执行任何操作。 | |||||
:return: 迁移到新设备上的数据集合; | |||||
""" | """ | ||||
if device is None: | if device is None: | ||||
if data_device is not None: | |||||
device = data_device | |||||
else: | |||||
return batch | |||||
return batch | |||||
def batch_to(data: Any) -> Any: | def batch_to(data: Any) -> Any: | ||||
return paddle_to(data, device) | return paddle_to(data, device) | ||||
@@ -131,22 +143,22 @@ def paddle_move_data_to_device(batch: Any, device: Optional[str] = None, | |||||
return apply_to_collection(batch, dtype=paddle.Tensor, function=batch_to) | return apply_to_collection(batch, dtype=paddle.Tensor, function=batch_to) | ||||
def is_in_paddle_dist(): | |||||
def is_in_paddle_dist() -> bool: | |||||
""" | """ | ||||
判断是否处于分布式的进程下,使用 global_rank 和 selected_gpus 判断 | |||||
判断是否处于 ``paddle`` 分布式的进程下,使用 ``PADDLE_RANK_IN_NODE`` 和 ``FLAGS_selected_gpus`` 判断。 | |||||
""" | """ | ||||
return ('PADDLE_RANK_IN_NODE' in os.environ and 'FLAGS_selected_gpus' in os.environ) | return ('PADDLE_RANK_IN_NODE' in os.environ and 'FLAGS_selected_gpus' in os.environ) | ||||
def is_in_fnlp_paddle_dist(): | |||||
def is_in_fnlp_paddle_dist() -> bool: | |||||
""" | """ | ||||
判断是否处于 FastNLP 拉起的分布式进程中 | |||||
判断是否处于 ``fastNLP`` 拉起的 ``paddle`` 分布式进程中 | |||||
""" | """ | ||||
return FASTNLP_DISTRIBUTED_CHECK in os.environ | return FASTNLP_DISTRIBUTED_CHECK in os.environ | ||||
def is_in_paddle_launch_dist(): | |||||
def is_in_paddle_launch_dist() -> bool: | |||||
""" | """ | ||||
判断是否处于 launch 启动的分布式进程中 | |||||
判断是否处于 ``python -m paddle.distributed.launch`` 方法启动的 ``paddle`` 分布式进程中 | |||||
""" | """ | ||||
return FASTNLP_BACKEND_LAUNCH in os.environ | return FASTNLP_BACKEND_LAUNCH in os.environ |
@@ -44,12 +44,12 @@ class TorchTransferableDataType(ABC): | |||||
def torch_move_data_to_device(batch: Any, device: Optional[Union[str, "torch.device"]] = None, | def torch_move_data_to_device(batch: Any, device: Optional[Union[str, "torch.device"]] = None, | ||||
non_blocking: Optional[bool] = True) -> Any: | non_blocking: Optional[bool] = True) -> Any: | ||||
r""" | r""" | ||||
将数据集合传输到给定设备。任何定义方法 “to(device)” 的对象都将被移动并且集合中的所有其他对象将保持不变; | |||||
在 ``pytorch`` 中将数据集合 ``batch`` 传输到给定设备。任何定义方法 ``to(device)`` 的对象都将被移动并且集合中的所有其他对象将保持不变; | |||||
:param batch: 应当迁移的数据; | |||||
:param device: 数据应当迁移到的设备;当该参数的值为 None 时,表示迁移数据的操作由用户自己完成,我们不需要经管; | |||||
:param non_blocking: pytorch 的迁移数据方法 `to` 的参数; | |||||
:return: 相同的集合,但所有包含的张量都驻留在新设备上; | |||||
:param batch: 需要迁移的数据; | |||||
:param device: 数据应当迁移到的设备;当该参数的值为 ``None`` 时则不执行任何操作; | |||||
:param non_blocking: ``pytorch`` 的数据迁移方法 ``to`` 的参数; | |||||
:return: 迁移到新设备上的数据集合; | |||||
""" | """ | ||||
if device is None: | if device is None: | ||||
return batch | return batch | ||||
@@ -38,10 +38,16 @@ __all__ = [ | |||||
def get_fn_arg_names(fn: Callable) -> List[str]: | def get_fn_arg_names(fn: Callable) -> List[str]: | ||||
r""" | r""" | ||||
返回一个函数所有参数的名字 | |||||
该函数可以返回一个函数所有参数的名字:: | |||||
:param fn: 需要查询的函数 | |||||
:return: 一个列表,其中的元素是函数 ``fn`` 参数的字符串名字 | |||||
>>> def function(a, b=1): | |||||
... return a | |||||
... | |||||
>>> get_fn_arg_names(function) | |||||
['a', 'b'] | |||||
:param fn: 需要查询的函数; | |||||
:return: 包含函数 ``fn`` 参数名的列表; | |||||
""" | """ | ||||
return list(inspect.signature(fn).parameters) | return list(inspect.signature(fn).parameters) | ||||
@@ -49,7 +55,7 @@ def get_fn_arg_names(fn: Callable) -> List[str]: | |||||
def auto_param_call(fn: Callable, *args, signature_fn: Optional[Callable] = None, | def auto_param_call(fn: Callable, *args, signature_fn: Optional[Callable] = None, | ||||
mapping: Optional[Dict[AnyStr, AnyStr]] = None) -> Any: | mapping: Optional[Dict[AnyStr, AnyStr]] = None) -> Any: | ||||
r""" | r""" | ||||
该函数会根据输入函数的形参名从 ``*args`` (因此都需要是 ``dict`` 类型)中找到匹配的值进行调用,如果传入的数据与 ``fn`` 的形参不匹配,可以通过 | |||||
该函数会根据输入函数的形参名从 ``*args`` (均为 ``dict`` 类型)中找到匹配的值进行调用,如果传入的数据与 ``fn`` 的形参不匹配,可以通过 | |||||
``mapping`` 参数进行转换。``mapping`` 参数中的一对 ``(key, value)`` 表示在 ``*args`` 中找到 ``key`` 对应的值,并将这个值传递给形参中名为 | ``mapping`` 参数进行转换。``mapping`` 参数中的一对 ``(key, value)`` 表示在 ``*args`` 中找到 ``key`` 对应的值,并将这个值传递给形参中名为 | ||||
``value`` 的参数。 | ``value`` 的参数。 | ||||
@@ -161,13 +167,13 @@ def _get_keys(args:List[Dict]) -> List[List[str]]: | |||||
def _get_fun_msg(fn, with_fp=True)->str: | def _get_fun_msg(fn, with_fp=True)->str: | ||||
""" | """ | ||||
获取函数的基本信息,帮助报错。 | |||||
ex: | |||||
print(_get_fun_msg(_get_fun_msg)) | |||||
# `_get_fun_msg(fn) -> str`(In file:/Users/hnyan/Desktop/projects/fastNLP/fastNLP/fastNLP/core/utils/utils.py) | |||||
获取函数的基本信息,帮助报错:: | |||||
>>>> print(_get_fun_msg(_get_fun_msg)) | |||||
`_get_fun_msg(fn) -> str`(In file:/Users/hnyan/Desktop/projects/fastNLP/fastNLP/fastNLP/core/utils/utils.py) | |||||
:param callable fn: | :param callable fn: | ||||
:param with_fp: 是否包含函数所在的文件信息。 | |||||
:param with_fp: 是否包含函数所在的文件信息; | |||||
:return: | :return: | ||||
""" | """ | ||||
if isinstance(fn, functools.partial): | if isinstance(fn, functools.partial): | ||||
@@ -224,7 +230,7 @@ def _check_valid_parameters_number(fn, expected_params:List[str], fn_name=None): | |||||
def check_user_specific_params(user_params: Dict, fn: Callable): | def check_user_specific_params(user_params: Dict, fn: Callable): | ||||
""" | """ | ||||
该函数使用用户的输入来对指定函数的参数进行赋值,主要用于一些用户无法直接调用函数的情况; | 该函数使用用户的输入来对指定函数的参数进行赋值,主要用于一些用户无法直接调用函数的情况; | ||||
该函数主要的作用在于帮助检查用户对使用函数 ``fn`` 的参数输入是否有误; | |||||
主要作用在于帮助检查用户对使用函数 ``fn`` 的参数输入是否有误; | |||||
:param user_params: 用户指定的参数的值,应当是一个字典,其中 ``key`` 表示每一个参数的名字, | :param user_params: 用户指定的参数的值,应当是一个字典,其中 ``key`` 表示每一个参数的名字, | ||||
``value`` 为每一个参数的值; | ``value`` 为每一个参数的值; | ||||
@@ -241,7 +247,7 @@ def check_user_specific_params(user_params: Dict, fn: Callable): | |||||
def dataclass_to_dict(data: "dataclasses.dataclass") -> Dict: | def dataclass_to_dict(data: "dataclasses.dataclass") -> Dict: | ||||
""" | """ | ||||
将传入的 `dataclass` 实例转换为字典。 | |||||
将传入的 ``dataclass`` 实例转换为字典。 | |||||
""" | """ | ||||
if not is_dataclass(data): | if not is_dataclass(data): | ||||
raise TypeError(f"Parameter `data` can only be `dataclass` type instead of {type(data)}.") | raise TypeError(f"Parameter `data` can only be `dataclass` type instead of {type(data)}.") | ||||
@@ -253,12 +259,12 @@ def dataclass_to_dict(data: "dataclasses.dataclass") -> Dict: | |||||
def match_and_substitute_params(mapping: Optional[Union[Callable, Dict]] = None, data: Optional[Any] = None) -> Any: | def match_and_substitute_params(mapping: Optional[Union[Callable, Dict]] = None, data: Optional[Any] = None) -> Any: | ||||
r""" | r""" | ||||
用来实现将输入的 ``batch``,或者输出的 ``outputs``,通过 ``mapping`` 将键值进行更换的功能; | |||||
用来实现将输入的 ``batch`` 或者输出的 ``outputs`` 通过 ``mapping`` 将键值进行更换的功能; | |||||
该函数应用于 ``input_mapping`` 和 ``output_mapping``; | 该函数应用于 ``input_mapping`` 和 ``output_mapping``; | ||||
对于 ``input_mapping``,该函数会在 :class:`~fastNLP.core.controllers.TrainBatchLoop` 中取完数据后立刻被调用; | |||||
对于 ``output_mapping``,该函数会在 :class:`~fastNLP.core.Trainer` 的 :meth:`~fastNLP.core.Trainer.train_step` | |||||
以及 :class:`~fastNLP.core.Evaluator` 的 :meth:`~fastNLP.core.Evaluator.train_step` 中得到结果后立刻被调用; | |||||
* 对于 ``input_mapping``,该函数会在 :class:`~fastNLP.core.controllers.TrainBatchLoop` 中取完数据后立刻被调用; | |||||
* 对于 ``output_mapping``,该函数会在 :class:`~fastNLP.core.Trainer` 的 :meth:`~fastNLP.core.Trainer.train_step` | |||||
以及 :class:`~fastNLP.core.Evaluator` 的 :meth:`~fastNLP.core.Evaluator.train_step` 中得到结果后立刻被调用; | |||||
转换的逻辑按优先级依次为: | 转换的逻辑按优先级依次为: | ||||
@@ -277,9 +283,9 @@ def match_and_substitute_params(mapping: Optional[Union[Callable, Dict]] = None, | |||||
然后使用 ``mapping`` 对这个 ``Dict`` 进行转换,如果没有匹配上 ``mapping`` 中的 ``key`` 则保持 ``\'\_number\'`` 这个形式。 | 然后使用 ``mapping`` 对这个 ``Dict`` 进行转换,如果没有匹配上 ``mapping`` 中的 ``key`` 则保持 ``\'\_number\'`` 这个形式。 | ||||
:param mapping: 用于转换的字典或者函数;``mapping`` 是函数时,返回值必须为字典类型。 | |||||
:param mapping: 用于转换的字典或者函数;当 ``mapping`` 是函数时,返回值必须为字典类型; | |||||
:param data: 需要被转换的对象; | :param data: 需要被转换的对象; | ||||
:return: 返回转换好的结果; | |||||
:return: 返回转换后的结果; | |||||
""" | """ | ||||
if mapping is None: | if mapping is None: | ||||
return data | return data | ||||
@@ -331,19 +337,19 @@ def apply_to_collection( | |||||
**kwargs: Any, | **kwargs: Any, | ||||
) -> Any: | ) -> Any: | ||||
""" | """ | ||||
使用函数 ``function`` 递归地在 ``data`` 中的元素执行,但是仅在满足元素为 ``dtype`` 时执行。 | |||||
递归地对 ``data`` 中的元素执行函数 ``function``,且仅在满足元素为 ``dtype`` 时执行。 | |||||
该函数参考了 `pytorch-lightning <https://github.com/PyTorchLightning/pytorch-lightning>`_ 的实现 | 该函数参考了 `pytorch-lightning <https://github.com/PyTorchLightning/pytorch-lightning>`_ 的实现 | ||||
:param data: 需要进行处理的数据集合或数据 | |||||
:param dtype: 数据的类型,函数 ``function`` 只会被应用于 ``data`` 中类型为 ``dtype`` 的数据 | |||||
:param function: 对数据进行处理的函数 | |||||
:param args: ``function`` 所需要的其它参数 | |||||
:param data: 需要进行处理的数据集合或数据; | |||||
:param dtype: 数据的类型,函数 ``function`` 只会被应用于 ``data`` 中类型为 ``dtype`` 的数据; | |||||
:param function: 对数据进行处理的函数; | |||||
:param args: ``function`` 所需要的其它参数; | |||||
:param wrong_dtype: ``function`` 一定不会生效的数据类型。如果数据既是 ``wrong_dtype`` 类型又是 ``dtype`` 类型 | :param wrong_dtype: ``function`` 一定不会生效的数据类型。如果数据既是 ``wrong_dtype`` 类型又是 ``dtype`` 类型 | ||||
那么也不会生效。 | |||||
:param include_none: 是否包含执行结果为 ``None`` 的数据,默认为 ``True``。 | |||||
:param kwargs: ``function`` 所需要的其它参数 | |||||
:return: 经过 ``function`` 处理后的数据集合 | |||||
那么也不会生效; | |||||
:param include_none: 是否包含执行结果为 ``None`` 的数据,默认为 ``True``; | |||||
:param kwargs: ``function`` 所需要的其它参数; | |||||
:return: 经过 ``function`` 处理后的数据集合; | |||||
""" | """ | ||||
# Breaking condition | # Breaking condition | ||||
if isinstance(data, dtype) and (wrong_dtype is None or not isinstance(data, wrong_dtype)): | if isinstance(data, dtype) and (wrong_dtype is None or not isinstance(data, wrong_dtype)): | ||||
@@ -411,20 +417,20 @@ def apply_to_collection( | |||||
@contextmanager | @contextmanager | ||||
def nullcontext(): | def nullcontext(): | ||||
r""" | r""" | ||||
实现一个什么都不做的上下文环境 | |||||
实现一个什么都不做的上下文环境。 | |||||
""" | """ | ||||
yield | yield | ||||
def sub_column(string: str, c: int, c_size: int, title: str) -> str: | def sub_column(string: str, c: int, c_size: int, title: str) -> str: | ||||
r""" | r""" | ||||
对传入的字符串进行截断,方便在命令行中显示 | |||||
对传入的字符串进行截断,方便在命令行中显示。 | |||||
:param string: 要被截断的字符串 | |||||
:param c: 命令行列数 | |||||
:param c_size: :class:`~fastNLP.core.Instance` 或 :class:`fastNLP.core.DataSet` 的 ``field`` 数目 | |||||
:param title: 列名 | |||||
:return: 对一个过长的列进行截断的结果 | |||||
:param string: 要被截断的字符串; | |||||
:param c: 命令行列数; | |||||
:param c_size: :class:`~fastNLP.core.Instance` 或 :class:`fastNLP.core.DataSet` 的 ``field`` 数目; | |||||
:param title: 列名; | |||||
:return: 对一个过长的列进行截断的结果; | |||||
""" | """ | ||||
avg = max(int(c / c_size / 2), len(title)) | avg = max(int(c / c_size / 2), len(title)) | ||||
string = str(string) | string = str(string) | ||||
@@ -453,7 +459,7 @@ def _is_iterable(value): | |||||
def pretty_table_printer(dataset_or_ins) -> PrettyTable: | def pretty_table_printer(dataset_or_ins) -> PrettyTable: | ||||
r""" | r""" | ||||
在 ``fastNLP`` 中展示数据的函数:: | |||||
用于在 ``fastNLP`` 中展示数据的函数:: | |||||
>>> ins = Instance(field_1=[1, 1, 1], field_2=[2, 2, 2], field_3=["a", "b", "c"]) | >>> ins = Instance(field_1=[1, 1, 1], field_2=[2, 2, 2], field_3=["a", "b", "c"]) | ||||
+-----------+-----------+-----------------+ | +-----------+-----------+-----------------+ | ||||
@@ -462,8 +468,8 @@ def pretty_table_printer(dataset_or_ins) -> PrettyTable: | |||||
| [1, 1, 1] | [2, 2, 2] | ['a', 'b', 'c'] | | | [1, 1, 1] | [2, 2, 2] | ['a', 'b', 'c'] | | ||||
+-----------+-----------+-----------------+ | +-----------+-----------+-----------------+ | ||||
:param dataset_or_ins: 要展示的 :class:`~fastNLP.core.DataSet` 或者 :class:`~fastNLP.core.Instance` | |||||
:return: 根据 ``terminal`` 大小进行自动截断的数据表格 | |||||
:param dataset_or_ins: 要展示的 :class:`~fastNLP.core.DataSet` 或者 :class:`~fastNLP.core.Instance` 实例; | |||||
:return: 根据命令行大小进行自动截断的数据表格; | |||||
""" | """ | ||||
x = PrettyTable() | x = PrettyTable() | ||||
try: | try: | ||||
@@ -529,7 +535,7 @@ def deprecated(help_message: Optional[str] = None): | |||||
""" | """ | ||||
标记当前功能已经过时的装饰器。 | 标记当前功能已经过时的装饰器。 | ||||
:param help_message: 一段指引信息,告知用户如何将代码切换为当前版本提倡的用法。 | |||||
:param help_message: 一段指引信息,告知用户如何将代码切换为当前版本提倡的用法; | |||||
""" | """ | ||||
def decorator(deprecated_function: Callable): | def decorator(deprecated_function: Callable): | ||||
@@ -578,10 +584,10 @@ def seq_len_to_mask(seq_len, max_len: Optional[int]): | |||||
>>>print(mask.size()) | >>>print(mask.size()) | ||||
torch.Size([14, 100]) | torch.Size([14, 100]) | ||||
:param seq_len: 大小为是 ``(B,)`` 的长度序列 | |||||
:param int max_len: 将长度 ``pad`` 到 ``max_len``。默认情况(为 ``None``)使用的是 ``seq_len`` 中最长的长度。 | |||||
:param seq_len: 大小为 ``(B,)`` 的长度序列; | |||||
:param int max_len: 将长度补齐或截断到 ``max_len``。默认情况(为 ``None``)使用的是 ``seq_len`` 中最长的长度; | |||||
但在 :class:`torch.nn.DataParallel` 等分布式的场景下可能不同卡的 ``seq_len`` 会有区别,所以需要传入 | 但在 :class:`torch.nn.DataParallel` 等分布式的场景下可能不同卡的 ``seq_len`` 会有区别,所以需要传入 | ||||
一个 ``max_len`` 使得 ``mask`` 的长度 ``pad`` 到该长度。 | |||||
一个 ``max_len`` 使得 ``mask`` 的补齐或截断到该长度。 | |||||
:return: 大小为 ``(B, max_len)`` 的 ``mask``, 元素类型为 ``bool`` 或 ``uint8`` | :return: 大小为 ``(B, max_len)`` 的 ``mask``, 元素类型为 ``bool`` 或 ``uint8`` | ||||
""" | """ | ||||
if isinstance(seq_len, np.ndarray): | if isinstance(seq_len, np.ndarray): | ||||
@@ -51,23 +51,33 @@ def _set_backend(): | |||||
assert _module_available(backend), f"You must have {backend} available to use {backend} backend." | assert _module_available(backend), f"You must have {backend} available to use {backend} backend." | ||||
assert 'paddle' not in sys.modules, "You have to use `set_backend()` before `import paddle`." | assert 'paddle' not in sys.modules, "You have to use `set_backend()` before `import paddle`." | ||||
user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | ||||
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") | |||||
if 'PADDLE_RANK_IN_NODE' in os.environ and 'FLAGS_selected_gpus' in os.environ: | if 'PADDLE_RANK_IN_NODE' in os.environ and 'FLAGS_selected_gpus' in os.environ: | ||||
# 在分布式子进程下,根据 USER_VISIBLE_DEVICES 得到进程真正占有的设备 | # 在分布式子进程下,根据 USER_VISIBLE_DEVICES 得到进程真正占有的设备 | ||||
selected_gpus = os.environ['FLAGS_selected_gpus'].split(',') | selected_gpus = os.environ['FLAGS_selected_gpus'].split(',') | ||||
if user_visible_devices is not None: | if user_visible_devices is not None: | ||||
# 用户通过 CUDA_VISIBLE_DEVICES 启动了分布式训练 | |||||
# 用户使用 fastNLP 启动了分布式训练 | |||||
# 此时经过 set_backend,用户的设置会保存在 USER_CUDA_VISIBLE_DEVICES 中 | # 此时经过 set_backend,用户的设置会保存在 USER_CUDA_VISIBLE_DEVICES 中 | ||||
# 我们需要从中找到真正使用的设备编号 | |||||
# 我们需要从中转换为用户找到真正使用的设备编号 | |||||
user_visible_devices = user_visible_devices.split(",") | user_visible_devices = user_visible_devices.split(",") | ||||
selected_gpus = ",".join([user_visible_devices[int(i)] for i in selected_gpus]) | |||||
selected_gpus = [user_visible_devices[int(i)] for i in selected_gpus] | |||||
# 没有找到 USER_CUDA_VISIBLE_DEVICES,说明用户是直接用 launch 启动的 | |||||
elif cuda_visible_devices: | |||||
# 用户设置了可见设备,需要进行转换 | |||||
# 如 CUDA_VISIBLE_DEVICES = 0,2,3 --gpus=0,2,3 | |||||
# 在 rank1 中此时 selected_gpus = ['1'],需要转换为设备 2 | |||||
os.environ[USER_CUDA_VISIBLE_DEVICES] = cuda_visible_devices | |||||
cuda_visible_devices = cuda_visible_devices.split(",") | |||||
selected_gpus = [cuda_visible_devices[int(i)] for i in selected_gpus] | |||||
else: | else: | ||||
# 没有找到 USER_CUDA_VISIBLE_DEVICES,则将之设置为所有的设备 | |||||
# 用户没有设置可见设备,则赋值成所有的设备 | |||||
os.environ[USER_CUDA_VISIBLE_DEVICES] = ",".join(map(str, list( | os.environ[USER_CUDA_VISIBLE_DEVICES] = ",".join(map(str, list( | ||||
range(get_gpu_count()) | range(get_gpu_count()) | ||||
))) | ))) | ||||
os.environ['CUDA_VISIBLE_DEVICES'] = ",".join(selected_gpus) | os.environ['CUDA_VISIBLE_DEVICES'] = ",".join(selected_gpus) | ||||
os.environ['FLAGS_selected_gpus'] = ",".join([str(g) for g in range(len(selected_gpus))]) | os.environ['FLAGS_selected_gpus'] = ",".join([str(g) for g in range(len(selected_gpus))]) | ||||
os.environ['FLAGS_selected_accelerators'] = ",".join([str(g) for g in range(len(selected_gpus))]) | os.environ['FLAGS_selected_accelerators'] = ",".join([str(g) for g in range(len(selected_gpus))]) | ||||
elif 'CUDA_VISIBLE_DEVICES' in os.environ: | elif 'CUDA_VISIBLE_DEVICES' in os.environ: | ||||
# 主进程中,用户设置了 CUDA_VISIBLE_DEVICES | # 主进程中,用户设置了 CUDA_VISIBLE_DEVICES | ||||
# 将用户设置的 CUDA_VISIBLE_DEVICES hack 掉 | # 将用户设置的 CUDA_VISIBLE_DEVICES hack 掉 | ||||
@@ -91,6 +101,11 @@ def _set_backend(): | |||||
elif backend == 'torch': | elif backend == 'torch': | ||||
assert _module_available(backend), f"You must have {backend} available to use {backend} backend." | assert _module_available(backend), f"You must have {backend} available to use {backend} backend." | ||||
if 'PADDLE_RANK_IN_NODE' in os.environ and 'FLAGS_selected_gpus' in os.environ \ | |||||
and "USER_CUDA_VISIBLE_DEVICES" not in os.environ: | |||||
# 当用户没有设置 backend 并且使用 launch 启动了多卡,应该提醒用户进行设置 | |||||
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
def set_env(global_seed=None): | def set_env(global_seed=None): | ||||
""" | """ | ||||
@@ -0,0 +1,9 @@ | |||||
__all__ = [ | |||||
# "MixModule", | |||||
"torch2paddle", | |||||
"paddle2torch", | |||||
"torch2jittor", | |||||
"jittor2torch", | |||||
] | |||||
from .mix_modules import torch2paddle, paddle2torch, torch2jittor, jittor2torch |
@@ -0,0 +1,10 @@ | |||||
__all__ = [ | |||||
# "MixModule", | |||||
"torch2paddle", | |||||
"paddle2torch", | |||||
"torch2jittor", | |||||
"jittor2torch", | |||||
] | |||||
# from .mix_module import MixModule | |||||
from .utils import * |
@@ -1,7 +1,15 @@ | |||||
""" | """ | ||||
这个文件测试用户以python -m paddle.distributed.launch 启动的情况 | |||||
看看有没有用pytest执行的机会 | |||||
FASTNLP_BACKEND=paddle python -m paddle.distributed.launch --gpus=0,2,3 _test_trainer_fleet.py | |||||
这个文件测试多卡情况下使用 paddle 的情况:: | |||||
>>> # 测试用 python -m paddle.distributed.launch 启动 | |||||
>>> FASTNLP_BACKEND=paddle python -m paddle.distributed.launch --gpus=0,2,3 _test_trainer_fleet.py | |||||
>>> # 测试在限制 GPU 的情况下用 python -m paddle.distributed.launch 启动 | |||||
>>> CUDA_VISIBLE_DEVICES=0,2,3 FASTNLP_BACKEND=paddle python -m paddle.distributed.launch --gpus=0,2,3 _test_trainer_fleet.py | |||||
>>> # 测试直接使用多卡 | |||||
>>> FASTNLP_BACKEND=paddle python _test_trainer_fleet.py | |||||
>>> # 测试在限制 GPU 的情况下直接使用多卡 | |||||
>>> CUDA_VISIBLE_DEVICES=3,4,5,6 FASTNLP_BACKEND=paddle python _test_trainer_fleet.py | |||||
""" | """ | ||||
import os | import os | ||||
import sys | import sys | ||||
@@ -71,14 +79,13 @@ def test_trainer_fleet( | |||||
n_epochs=n_epochs, | n_epochs=n_epochs, | ||||
callbacks=callbacks, | callbacks=callbacks, | ||||
output_from_new_proc="logs", | |||||
# output_from_new_proc="logs", | |||||
) | ) | ||||
trainer.run() | trainer.run() | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
driver = "paddle" | driver = "paddle" | ||||
device = [0,2,3] | |||||
# driver = "paddle" | |||||
device = [0,1,3] | |||||
# device = 2 | # device = 2 | ||||
callbacks = [ | callbacks = [ | ||||
# RecordMetricCallback(monitor="acc#acc", metric_threshold=0.0, larger_better=True), | # RecordMetricCallback(monitor="acc#acc", metric_threshold=0.0, larger_better=True), | ||||
@@ -1,7 +1,11 @@ | |||||
""" | """ | ||||
这个文件测试用户以python -m paddle.distributed.launch 启动的情况 | |||||
并且自己初始化了 fleet | |||||
FASTNLP_BACKEND=paddle python -m paddle.distributed.launch --gpus=0,2,3 _test_trainer_fleet_outside.py | |||||
这个文件测试用户自己初始化分布式环境后使用 paddle 的情况: | |||||
>>> # 测试用 python -m paddle.distributed.launch 启动 | |||||
>>> FASTNLP_BACKEND=paddle python -m paddle.distributed.launch --gpus=0,2,3 _test_trainer_fleet_outside.py | |||||
>>> # 测试在限制 GPU 的情况下用 python -m paddle.distributed.launch 启动 | |||||
>>> CUDA_VISIBLE_DEVICES=0,2,3 FASTNLP_BACKEND=paddle python -m paddle.distributed.launch --gpus=0,2,3 _test_trainer_fleet_outside.py | |||||
""" | """ | ||||
import os | import os | ||||
import sys | import sys | ||||
@@ -77,14 +81,13 @@ def test_trainer_fleet( | |||||
n_epochs=n_epochs, | n_epochs=n_epochs, | ||||
callbacks=callbacks, | callbacks=callbacks, | ||||
output_from_new_proc="logs", | |||||
data_device=f"gpu:{os.environ['CUDA_VISIBLE_DEVICES']}" | |||||
# output_from_new_proc="logs", | |||||
) | ) | ||||
trainer.run() | trainer.run() | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
driver = "paddle" | driver = "paddle" | ||||
device = [0,2,3] | |||||
device = [0,1,3] | |||||
callbacks = [ | callbacks = [ | ||||
# RecordMetricCallback(monitor="acc#acc", metric_threshold=0.0, larger_better=True), | # RecordMetricCallback(monitor="acc#acc", metric_threshold=0.0, larger_better=True), | ||||
RichCallback(5), | RichCallback(5), | ||||
@@ -0,0 +1,237 @@ | |||||
import os | |||||
import sys | |||||
import time | |||||
# os.environ["cuda_archs"] = "61" | |||||
# os.environ["FAS"] | |||||
os.environ["log_silent"] = "1" | |||||
sys.path.append("../../../") | |||||
from datasets import load_dataset | |||||
from datasets import DatasetDict | |||||
import jittor as jt | |||||
from jittor import nn, Module | |||||
from jittor.dataset import Dataset | |||||
jt.flags.use_cuda = True | |||||
from fastNLP.core.controllers.trainer import Trainer | |||||
from fastNLP.core.metrics.accuracy import Accuracy | |||||
from fastNLP.core.vocabulary import Vocabulary | |||||
from fastNLP.core.callbacks.progress_callback import RichCallback | |||||
from fastNLP.core.callbacks.callback import Callback | |||||
from fastNLP.core.dataloaders.jittor_dataloader.fdl import JittorDataLoader | |||||
class TextClassificationDataset(Dataset): | |||||
def __init__(self, dataset): | |||||
super(TextClassificationDataset, self).__init__() | |||||
self.dataset = dataset | |||||
self.set_attrs(total_len=len(dataset)) | |||||
def __getitem__(self, idx): | |||||
return {"x": self.dataset["input_ids"][idx], "y": self.dataset["label"][idx]} | |||||
class LSTM(Module): | |||||
def __init__(self, num_of_words, hidden_size, features): | |||||
self.embedding = nn.Embedding(num_of_words, features) | |||||
self.lstm = nn.LSTM(features, hidden_size, batch_first=True) | |||||
self.layer = nn.Linear(hidden_size, 2) | |||||
self.softmax = nn.Softmax(dim=1) | |||||
self.loss_fn = nn.CrossEntropyLoss() | |||||
self.hidden_size = hidden_size | |||||
self.features = features | |||||
def init_hidden(self, x): | |||||
# batch_first | |||||
batch_size = x.shape[0] | |||||
h0 = jt.randn(1, batch_size, hidden_size) | |||||
c0 = jt.randn(1, batch_size, hidden_size) | |||||
return h0, c0 | |||||
def execute(self, input_ids): | |||||
output = self.embedding(input_ids) | |||||
# TODO 去除padding | |||||
output, (h, c) = self.lstm(output, self.init_hidden(output)) | |||||
# len, batch, hidden_size | |||||
output = self.layer(output[-1]) | |||||
return output | |||||
def train_step(self, x, y): | |||||
x = self(x) | |||||
outputs = self.loss_fn(x, y) | |||||
return {"loss": outputs} | |||||
def evaluate_step(self, x, y): | |||||
x = self(x) | |||||
return {"pred": x, "target": y.reshape((-1,))} | |||||
class PrintWhileTrainingCallBack(Callback): | |||||
""" | |||||
通过该Callback实现训练过程中loss的输出 | |||||
""" | |||||
def __init__(self, print_every_epoch, print_every_batch): | |||||
self.print_every_epoch = print_every_epoch | |||||
self.print_every_batch = print_every_batch | |||||
self.loss = 0 | |||||
self.start = 0 | |||||
self.epoch_start = 0 | |||||
def on_train_begin(self, trainer): | |||||
""" | |||||
在训练开始前输出信息 | |||||
""" | |||||
print("Start training. Total {} epochs and {} batches in each epoch.".format( | |||||
trainer.n_epochs, trainer.num_batches_per_epoch | |||||
)) | |||||
self.start = time.time() | |||||
def on_before_backward(self, trainer, outputs): | |||||
""" | |||||
每次反向传播前统计loss,用于计算平均值 | |||||
""" | |||||
loss = trainer.extract_loss_from_outputs(outputs) | |||||
loss = trainer.driver.tensor_to_numeric(loss) | |||||
self.loss += loss | |||||
def on_train_epoch_begin(self, trainer): | |||||
self.epoch_start = time.time() | |||||
def on_train_epoch_end(self, trainer): | |||||
""" | |||||
在每经过一定epoch或最后一个epoch时输出当前epoch的平均loss和使用时间 | |||||
""" | |||||
if trainer.cur_epoch_idx % self.print_every_epoch == 0 \ | |||||
or trainer.cur_epoch_idx == trainer.n_epochs: | |||||
print("Epoch: {} Loss: {} Current epoch training time: {}s".format( | |||||
trainer.cur_epoch_idx, self.loss / trainer.num_batches_per_epoch, time.time() - self.epoch_start | |||||
)) | |||||
# 将loss清零 | |||||
self.loss = 0 | |||||
def on_train_batch_end(self, trainer): | |||||
""" | |||||
在每经过一定batch或最后一个batch时输出当前epoch截止目前的平均loss | |||||
""" | |||||
if trainer.batch_idx_in_epoch % self.print_every_batch == 0 \ | |||||
or trainer.batch_idx_in_epoch == trainer.num_batches_per_epoch: | |||||
print("\tBatch: {} Loss: {}".format( | |||||
trainer.batch_idx_in_epoch, self.loss / trainer.batch_idx_in_epoch | |||||
)) | |||||
def on_train_end(self, trainer): | |||||
print("Total training time: {}s".format(time.time() - self.start)) | |||||
def process_data(ds: DatasetDict, vocabulary: Vocabulary, max_len=256) -> DatasetDict: | |||||
# 分词 | |||||
ds = ds.map(lambda x: {"input_ids": text_to_id(vocabulary, x["text"], max_len)}) | |||||
ds.set_format(type="numpy", columns=ds.column_names) | |||||
return ds | |||||
def set_vocabulary(vocab, dataset): | |||||
for data in dataset: | |||||
vocab.update(data["text"].split()) | |||||
return vocab | |||||
def text_to_id(vocab, text: str, max_len): | |||||
text = text.split() | |||||
# to index | |||||
ids = [vocab.to_index(word) for word in text] | |||||
# padding | |||||
ids += [vocab.padding_idx] * (max_len - len(text)) | |||||
return ids[:max_len] | |||||
def get_dataset(name, max_len, train_format="", test_format=""): | |||||
# datasets | |||||
train_dataset = load_dataset(name, split="train" + train_format).shuffle(seed=123) | |||||
test_dataset = load_dataset(name, split="test" + test_format).shuffle(seed=321) | |||||
split = train_dataset.train_test_split(test_size=0.2, seed=123) | |||||
train_dataset = split["train"] | |||||
val_dataset = split["test"] | |||||
vocab = Vocabulary() | |||||
vocab = set_vocabulary(vocab, train_dataset) | |||||
vocab = set_vocabulary(vocab, val_dataset) | |||||
train_dataset = process_data(train_dataset, vocab, max_len) | |||||
val_dataset = process_data(val_dataset, vocab, max_len) | |||||
test_dataset = process_data(test_dataset, vocab, max_len) | |||||
return TextClassificationDataset(train_dataset), TextClassificationDataset(val_dataset), \ | |||||
TextClassificationDataset(test_dataset), vocab | |||||
if __name__ == "__main__": | |||||
# 训练参数 | |||||
max_len = 20 | |||||
epochs = 40 | |||||
lr = 1 | |||||
batch_size = 64 | |||||
features = 100 | |||||
hidden_size = 128 | |||||
# 获取数据集 | |||||
# imdb.py SetFit/sst2 | |||||
train_data, val_data, test_data, vocab = get_dataset("SetFit/sst2", max_len, "", "") | |||||
# 使用dataloader | |||||
train_dataloader = JittorDataLoader( | |||||
dataset=train_data, | |||||
batch_size=batch_size, | |||||
shuffle=True, | |||||
num_workers=4, | |||||
) | |||||
val_dataloader = JittorDataLoader( | |||||
dataset=val_data, | |||||
batch_size=batch_size, | |||||
shuffle=True, | |||||
num_workers=4, | |||||
) | |||||
test_dataloader = JittorDataLoader( | |||||
dataset=test_data, | |||||
batch_size=1, | |||||
shuffle=False, | |||||
) | |||||
# 初始化模型 | |||||
model = LSTM(len(vocab), hidden_size, features) | |||||
# 优化器 | |||||
# 也可以是多个优化器的list | |||||
optimizer = nn.SGD(model.parameters(), lr) | |||||
# Metrics | |||||
metrics = {"acc": Accuracy()} | |||||
# callbacks | |||||
callbacks = [ | |||||
PrintWhileTrainingCallBack(print_every_epoch=1, print_every_batch=10), | |||||
# RichCallback(), # print_every参数默认为1,即每一个batch更新一次进度条 | |||||
] | |||||
trainer = Trainer( | |||||
model=model, | |||||
driver="jittor", | |||||
device=[0,1,2,3,4], | |||||
optimizers=optimizer, | |||||
train_dataloader=train_dataloader, | |||||
validate_dataloaders=val_dataloader, | |||||
validate_every=-1, | |||||
input_mapping=None, | |||||
output_mapping=None, | |||||
metrics=metrics, | |||||
n_epochs=epochs, | |||||
callbacks=callbacks, | |||||
# progress_bar="raw" | |||||
) | |||||
trainer.run() |
@@ -0,0 +1,110 @@ | |||||
# coding=utf-8 | |||||
# Copyright 2020 The TensorFlow Datasets Authors and the HuggingFace Datasets Authors. | |||||
# | |||||
# Licensed under the Apache License, Version 2.0 (the "License"); | |||||
# you may not use this file except in compliance with the License. | |||||
# You may obtain a copy of the License at | |||||
# | |||||
# http://www.apache.org/licenses/LICENSE-2.0 | |||||
# | |||||
# Unless required by applicable law or agreed to in writing, software | |||||
# distributed under the License is distributed on an "AS IS" BASIS, | |||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
# See the License for the specific language governing permissions and | |||||
# limitations under the License. | |||||
# Lint as: python3 | |||||
"""IMDB movie reviews dataset.""" | |||||
import datasets | |||||
from datasets.tasks import TextClassification | |||||
_DESCRIPTION = """\ | |||||
Large Movie Review Dataset. | |||||
This is a dataset for binary sentiment classification containing substantially \ | |||||
more data than previous benchmark datasets. We provide a set of 25,000 highly \ | |||||
polar movie reviews for training, and 25,000 for testing. There is additional \ | |||||
unlabeled data for use as well.\ | |||||
""" | |||||
_CITATION = """\ | |||||
@InProceedings{maas-EtAl:2011:ACL-HLT2011, | |||||
author = {Maas, Andrew L. and Daly, Raymond E. and Pham, Peter T. and Huang, Dan and Ng, Andrew Y. and Potts, Christopher}, | |||||
title = {Learning Word Vectors for Sentiment Analysis}, | |||||
booktitle = {Proceedings of the 49th Annual Meeting of the Association for Computational Linguistics: Human Language Technologies}, | |||||
month = {June}, | |||||
year = {2011}, | |||||
address = {Portland, Oregon, USA}, | |||||
publisher = {Association for Computational Linguistics}, | |||||
pages = {142--150}, | |||||
url = {http://www.aclweb.org/anthology/P11-1015} | |||||
} | |||||
""" | |||||
_DOWNLOAD_URL = "http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz" | |||||
class IMDBReviewsConfig(datasets.BuilderConfig): | |||||
"""BuilderConfig for IMDBReviews.""" | |||||
def __init__(self, **kwargs): | |||||
"""BuilderConfig for IMDBReviews. | |||||
Args: | |||||
**kwargs: keyword arguments forwarded to super. | |||||
""" | |||||
super(IMDBReviewsConfig, self).__init__(version=datasets.Version("1.0.0", ""), **kwargs) | |||||
class Imdb(datasets.GeneratorBasedBuilder): | |||||
"""IMDB movie reviews dataset.""" | |||||
BUILDER_CONFIGS = [ | |||||
IMDBReviewsConfig( | |||||
name="plain_text", | |||||
description="Plain text", | |||||
) | |||||
] | |||||
def _info(self): | |||||
return datasets.DatasetInfo( | |||||
description=_DESCRIPTION, | |||||
features=datasets.Features( | |||||
{"text": datasets.Value("string"), "label": datasets.features.ClassLabel(names=["neg", "pos"])} | |||||
), | |||||
supervised_keys=None, | |||||
homepage="http://ai.stanford.edu/~amaas/data/sentiment/", | |||||
citation=_CITATION, | |||||
task_templates=[TextClassification(text_column="text", label_column="label")], | |||||
) | |||||
def _split_generators(self, dl_manager): | |||||
archive = dl_manager.download(_DOWNLOAD_URL) | |||||
return [ | |||||
datasets.SplitGenerator( | |||||
name=datasets.Split.TRAIN, gen_kwargs={"files": dl_manager.iter_archive(archive), "split": "train"} | |||||
), | |||||
datasets.SplitGenerator( | |||||
name=datasets.Split.TEST, gen_kwargs={"files": dl_manager.iter_archive(archive), "split": "test"} | |||||
), | |||||
datasets.SplitGenerator( | |||||
name=datasets.Split("unsupervised"), | |||||
gen_kwargs={"files": dl_manager.iter_archive(archive), "split": "train", "labeled": False}, | |||||
), | |||||
] | |||||
def _generate_examples(self, files, split, labeled=True): | |||||
"""Generate aclImdb examples.""" | |||||
# For labeled examples, extract the label from the path. | |||||
if labeled: | |||||
label_mapping = {"pos": 1, "neg": 0} | |||||
for path, f in files: | |||||
if path.startswith(f"aclImdb/{split}"): | |||||
label = label_mapping.get(path.split("/")[2]) | |||||
if label is not None: | |||||
yield path, {"text": f.read().decode("utf-8"), "label": label} | |||||
else: | |||||
for path, f in files: | |||||
if path.startswith(f"aclImdb/{split}"): | |||||
if path.split("/")[2] == "unsup": | |||||
yield path, {"text": f.read().decode("utf-8"), "label": -1} |
@@ -1,3 +1,5 @@ | |||||
import os | |||||
from typing import List | |||||
import pytest | import pytest | ||||
from dataclasses import dataclass | from dataclasses import dataclass | ||||
@@ -5,6 +7,7 @@ from fastNLP.core.controllers.trainer import Trainer | |||||
from fastNLP.core.metrics.accuracy import Accuracy | from fastNLP.core.metrics.accuracy import Accuracy | ||||
from fastNLP.core.callbacks.progress_callback import RichCallback | from fastNLP.core.callbacks.progress_callback import RichCallback | ||||
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | ||||
from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES | |||||
if _NEED_IMPORT_PADDLE: | if _NEED_IMPORT_PADDLE: | ||||
from paddle.optimizer import Adam | from paddle.optimizer import Adam | ||||
@@ -34,6 +37,8 @@ def test_trainer_paddle( | |||||
callbacks, | callbacks, | ||||
n_epochs=2, | n_epochs=2, | ||||
): | ): | ||||
if isinstance(device, List) and USER_CUDA_VISIBLE_DEVICES not in os.environ: | |||||
pytest.skip("Skip test fleet if FASTNLP_BACKEND is not set to paddle.") | |||||
model = PaddleNormalModel_Classification_1( | model = PaddleNormalModel_Classification_1( | ||||
num_labels=TrainPaddleConfig.num_labels, | num_labels=TrainPaddleConfig.num_labels, | ||||
feature_dimension=TrainPaddleConfig.feature_dimension | feature_dimension=TrainPaddleConfig.feature_dimension | ||||
@@ -2,37 +2,42 @@ import os | |||||
import pytest | import pytest | ||||
from fastNLP.core.utils.paddle_utils import get_device_from_visible, paddle_to, paddle_move_data_to_device | |||||
from fastNLP.core.utils.paddle_utils import _convert_data_device, paddle_to, paddle_move_data_to_device | |||||
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | ||||
if _NEED_IMPORT_PADDLE: | if _NEED_IMPORT_PADDLE: | ||||
import paddle | import paddle | ||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
("user_visible_devices, cuda_visible_devices, device, output_type, correct"), | |||||
("user_visible_devices, cuda_visible_devices, device, correct"), | |||||
( | ( | ||||
("0,1,2,3,4,5,6,7", "0", "cpu", str, "cpu"), | |||||
("0,1,2,3,4,5,6,7", "0", "cpu", int, "cpu"), | |||||
("0,1,2,3,4,5,6,7", "3,4,5", "gpu:4", int, 1), | |||||
("0,1,2,3,4,5,6,7", "3,4,5", "gpu:5", str, "gpu:2"), | |||||
("3,4,5,6", "3,5", 0, int, 0), | |||||
("3,6,7,8", "6,7,8", "gpu:2", str, "gpu:1"), | |||||
(None, None, 1, "gpu:1"), | |||||
(None, "2,4,5,6", 2, "gpu:2"), | |||||
(None, "3,4,5", 1, "gpu:1"), | |||||
("0,1,2,3,4,5,6,7", "0", "cpu", "cpu"), | |||||
("3,4,5,6,7", "0", "cpu", "cpu"), | |||||
("0,1,2,3,4,5,6,7", "3,4,5", "gpu:4", "gpu:1"), | |||||
("0,1,2,3,4,5,6,7", "3,4,5", "gpu:5", "gpu:2"), | |||||
("3,4,5,6", "3,5", 0, "gpu:0"), | |||||
("3,6,7,8", "6,7,8", "gpu:2", "gpu:1"), | |||||
) | ) | ||||
) | ) | ||||
@pytest.mark.paddle | |||||
def test_get_device_from_visible(user_visible_devices, cuda_visible_devices, device, output_type, correct): | |||||
def test_convert_data_device(user_visible_devices, cuda_visible_devices, device, correct): | |||||
_cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") | _cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") | ||||
_user_visible_devices = os.getenv("USER_CUDA_VISIBLE_DEVICES") | _user_visible_devices = os.getenv("USER_CUDA_VISIBLE_DEVICES") | ||||
os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices | |||||
os.environ["USER_CUDA_VISIBLE_DEVICES"] = user_visible_devices | |||||
res = get_device_from_visible(device, output_type) | |||||
if cuda_visible_devices is not None: | |||||
os.environ["CUDA_VISIBLE_DEVICES"] = cuda_visible_devices | |||||
if user_visible_devices is not None: | |||||
os.environ["USER_CUDA_VISIBLE_DEVICES"] = user_visible_devices | |||||
res = _convert_data_device(device) | |||||
assert res == correct | assert res == correct | ||||
# 还原环境变量 | # 还原环境变量 | ||||
if _cuda_visible_devices is None: | if _cuda_visible_devices is None: | ||||
del os.environ["CUDA_VISIBLE_DEVICES"] | |||||
os.environ.pop("CUDA_VISIBLE_DEVICES", None) | |||||
else: | else: | ||||
os.environ["CUDA_VISIBLE_DEVICES"] = _cuda_visible_devices | os.environ["CUDA_VISIBLE_DEVICES"] = _cuda_visible_devices | ||||
if _user_visible_devices is None: | if _user_visible_devices is None: | ||||
del os.environ["USER_CUDA_VISIBLE_DEVICES"] | |||||
os.environ.pop("USER_CUDA_VISIBLE_DEVICES", None) | |||||
else: | else: | ||||
os.environ["USER_CUDA_VISIBLE_DEVICES"] = _user_visible_devices | os.environ["USER_CUDA_VISIBLE_DEVICES"] = _user_visible_devices | ||||
@@ -0,0 +1,442 @@ | |||||
import pytest | |||||
from fastNLP.envs.imports import _NEED_IMPORT_JITTOR, _NEED_IMPORT_PADDLE, _NEED_IMPORT_TORCH | |||||
from fastNLP.modules.mix_modules.utils import ( | |||||
paddle2torch, | |||||
torch2paddle, | |||||
jittor2torch, | |||||
torch2jittor, | |||||
) | |||||
if _NEED_IMPORT_TORCH: | |||||
import torch | |||||
if _NEED_IMPORT_PADDLE: | |||||
import paddle | |||||
if _NEED_IMPORT_JITTOR: | |||||
import jittor | |||||
############################################################################ | |||||
# | |||||
# 测试paddle到torch的转换 | |||||
# | |||||
############################################################################ | |||||
@pytest.mark.torchpaddle | |||||
class TestPaddle2Torch: | |||||
def check_torch_tensor(self, tensor, device, requires_grad): | |||||
""" | |||||
检查张量设备和梯度情况的工具函数 | |||||
""" | |||||
assert isinstance(tensor, torch.Tensor) | |||||
assert tensor.device == torch.device(device) | |||||
assert tensor.requires_grad == requires_grad | |||||
def test_gradient(self): | |||||
""" | |||||
测试张量转换后的反向传播是否正确 | |||||
""" | |||||
x = paddle.to_tensor([1.0, 2.0, 3.0, 4.0, 5.0], stop_gradient=False) | |||||
y = paddle2torch(x) | |||||
z = 3 * (y ** 2) | |||||
z.sum().backward() | |||||
assert y.grad.tolist() == [6, 12, 18, 24, 30] | |||||
def test_tensor_transfer(self): | |||||
""" | |||||
测试单个张量的设备和梯度转换是否正确 | |||||
""" | |||||
paddle_tensor = paddle.rand((3, 4, 5)).cpu() | |||||
res = paddle2torch(paddle_tensor) | |||||
self.check_torch_tensor(res, "cpu", not paddle_tensor.stop_gradient) | |||||
res = paddle2torch(paddle_tensor, target_device="cuda:2", no_gradient=None) | |||||
self.check_torch_tensor(res, "cuda:2", not paddle_tensor.stop_gradient) | |||||
res = paddle2torch(paddle_tensor, target_device="cuda:1", no_gradient=True) | |||||
self.check_torch_tensor(res, "cuda:1", False) | |||||
res = paddle2torch(paddle_tensor, target_device="cuda:1", no_gradient=False) | |||||
self.check_torch_tensor(res, "cuda:1", True) | |||||
def test_list_transfer(self): | |||||
""" | |||||
测试张量列表的转换 | |||||
""" | |||||
paddle_list = [paddle.rand((6, 4, 2)).cuda(1) for i in range(10)] | |||||
res = paddle2torch(paddle_list) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_torch_tensor(t, "cuda:1", False) | |||||
res = paddle2torch(paddle_list, target_device="cpu", no_gradient=False) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_torch_tensor(t, "cpu", True) | |||||
def test_tensor_tuple_transfer(self): | |||||
""" | |||||
测试张量元组的转换 | |||||
""" | |||||
paddle_list = [paddle.rand((6, 4, 2)).cuda(1) for i in range(10)] | |||||
paddle_tuple = tuple(paddle_list) | |||||
res = paddle2torch(paddle_tuple) | |||||
assert isinstance(res, tuple) | |||||
for t in res: | |||||
self.check_torch_tensor(t, "cuda:1", False) | |||||
def test_dict_transfer(self): | |||||
""" | |||||
测试包含复杂结构的字典的转换 | |||||
""" | |||||
paddle_dict = { | |||||
"tensor": paddle.rand((3, 4)).cuda(0), | |||||
"list": [paddle.rand((6, 4, 2)).cuda(0) for i in range(10)], | |||||
"dict":{ | |||||
"list": [paddle.rand((6, 4, 2)).cuda(0) for i in range(10)], | |||||
"tensor": paddle.rand((3, 4)).cuda(0) | |||||
}, | |||||
"int": 2, | |||||
"string": "test string" | |||||
} | |||||
res = paddle2torch(paddle_dict) | |||||
assert isinstance(res, dict) | |||||
self.check_torch_tensor(res["tensor"], "cuda:0", False) | |||||
assert isinstance(res["list"], list) | |||||
for t in res["list"]: | |||||
self.check_torch_tensor(t, "cuda:0", False) | |||||
assert isinstance(res["int"], int) | |||||
assert isinstance(res["string"], str) | |||||
assert isinstance(res["dict"], dict) | |||||
assert isinstance(res["dict"]["list"], list) | |||||
for t in res["dict"]["list"]: | |||||
self.check_torch_tensor(t, "cuda:0", False) | |||||
self.check_torch_tensor(res["dict"]["tensor"], "cuda:0", False) | |||||
############################################################################ | |||||
# | |||||
# 测试torch到paddle的转换 | |||||
# | |||||
############################################################################ | |||||
@pytest.mark.torchpaddle | |||||
class TestTorch2Paddle: | |||||
def check_paddle_tensor(self, tensor, device, stop_gradient): | |||||
""" | |||||
检查得到的paddle张量设备和梯度情况的工具函数 | |||||
""" | |||||
assert isinstance(tensor, paddle.Tensor) | |||||
if device == "cpu": | |||||
assert tensor.place.is_cpu_place() | |||||
elif device.startswith("gpu"): | |||||
paddle_device = paddle.device._convert_to_place(device) | |||||
assert tensor.place.is_gpu_place() | |||||
if hasattr(tensor.place, "gpu_device_id"): | |||||
# paddle中,有两种Place | |||||
# paddle.fluid.core.Place是创建Tensor时使用的类型 | |||||
# 有函数gpu_device_id获取设备 | |||||
assert tensor.place.gpu_device_id() == paddle_device.get_device_id() | |||||
else: | |||||
# 通过_convert_to_place得到的是paddle.CUDAPlace | |||||
# 通过get_device_id获取设备 | |||||
assert tensor.place.get_device_id() == paddle_device.get_device_id() | |||||
else: | |||||
raise NotImplementedError | |||||
assert tensor.stop_gradient == stop_gradient | |||||
def test_gradient(self): | |||||
""" | |||||
测试转换后梯度的反向传播 | |||||
""" | |||||
x = torch.tensor([1.0, 2.0, 3.0, 4.0, 5.0], requires_grad=True) | |||||
y = torch2paddle(x) | |||||
z = 3 * (y ** 2) | |||||
z.sum().backward() | |||||
assert y.grad.tolist() == [6, 12, 18, 24, 30] | |||||
def test_tensor_transfer(self): | |||||
""" | |||||
测试单个张量的转换 | |||||
""" | |||||
torch_tensor = torch.rand((3, 4, 5)) | |||||
res = torch2paddle(torch_tensor) | |||||
self.check_paddle_tensor(res, "cpu", True) | |||||
res = torch2paddle(torch_tensor, target_device="gpu:2", no_gradient=None) | |||||
self.check_paddle_tensor(res, "gpu:2", True) | |||||
res = torch2paddle(torch_tensor, target_device="gpu:2", no_gradient=True) | |||||
self.check_paddle_tensor(res, "gpu:2", True) | |||||
res = torch2paddle(torch_tensor, target_device="gpu:2", no_gradient=False) | |||||
self.check_paddle_tensor(res, "gpu:2", False) | |||||
def test_tensor_list_transfer(self): | |||||
""" | |||||
测试张量列表的转换 | |||||
""" | |||||
torch_list = [torch.rand(6, 4, 2) for i in range(10)] | |||||
res = torch2paddle(torch_list) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_paddle_tensor(t, "cpu", True) | |||||
res = torch2paddle(torch_list, target_device="gpu:1", no_gradient=False) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_paddle_tensor(t, "gpu:1", False) | |||||
def test_tensor_tuple_transfer(self): | |||||
""" | |||||
测试张量元组的转换 | |||||
""" | |||||
torch_list = [torch.rand(6, 4, 2) for i in range(10)] | |||||
torch_tuple = tuple(torch_list) | |||||
res = torch2paddle(torch_tuple, target_device="cpu") | |||||
assert isinstance(res, tuple) | |||||
for t in res: | |||||
self.check_paddle_tensor(t, "cpu", True) | |||||
def test_dict_transfer(self): | |||||
""" | |||||
测试复杂的字典结构的转换 | |||||
""" | |||||
torch_dict = { | |||||
"tensor": torch.rand((3, 4)), | |||||
"list": [torch.rand(6, 4, 2) for i in range(10)], | |||||
"dict":{ | |||||
"list": [torch.rand(6, 4, 2) for i in range(10)], | |||||
"tensor": torch.rand((3, 4)) | |||||
}, | |||||
"int": 2, | |||||
"string": "test string" | |||||
} | |||||
res = torch2paddle(torch_dict) | |||||
assert isinstance(res, dict) | |||||
self.check_paddle_tensor(res["tensor"], "cpu", True) | |||||
assert isinstance(res["list"], list) | |||||
for t in res["list"]: | |||||
self.check_paddle_tensor(t, "cpu", True) | |||||
assert isinstance(res["int"], int) | |||||
assert isinstance(res["string"], str) | |||||
assert isinstance(res["dict"], dict) | |||||
assert isinstance(res["dict"]["list"], list) | |||||
for t in res["dict"]["list"]: | |||||
self.check_paddle_tensor(t, "cpu", True) | |||||
self.check_paddle_tensor(res["dict"]["tensor"], "cpu", True) | |||||
############################################################################ | |||||
# | |||||
# 测试jittor到torch的转换 | |||||
# | |||||
############################################################################ | |||||
class TestJittor2Torch: | |||||
def check_torch_tensor(self, tensor, device, requires_grad): | |||||
""" | |||||
检查得到的torch张量的工具函数 | |||||
""" | |||||
assert isinstance(tensor, torch.Tensor) | |||||
if device == "cpu": | |||||
assert not tensor.is_cuda | |||||
else: | |||||
assert tensor.device == torch.device(device) | |||||
assert tensor.requires_grad == requires_grad | |||||
def test_var_transfer(self): | |||||
""" | |||||
测试单个Jittor Var的转换 | |||||
""" | |||||
jittor_var = jittor.rand((3, 4, 5)) | |||||
res = jittor2torch(jittor_var) | |||||
self.check_torch_tensor(res, "cpu", True) | |||||
res = jittor2torch(jittor_var, target_device="cuda:2", no_gradient=None) | |||||
self.check_torch_tensor(res, "cuda:2", True) | |||||
res = jittor2torch(jittor_var, target_device="cuda:2", no_gradient=True) | |||||
self.check_torch_tensor(res, "cuda:2", False) | |||||
res = jittor2torch(jittor_var, target_device="cuda:2", no_gradient=False) | |||||
self.check_torch_tensor(res, "cuda:2", True) | |||||
def test_var_list_transfer(self): | |||||
""" | |||||
测试Jittor列表的转换 | |||||
""" | |||||
jittor_list = [jittor.rand((6, 4, 2)) for i in range(10)] | |||||
res = jittor2torch(jittor_list) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_torch_tensor(t, "cpu", True) | |||||
res = jittor2torch(jittor_list, target_device="cuda:1", no_gradient=False) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_torch_tensor(t, "cuda:1", True) | |||||
def test_var_tuple_transfer(self): | |||||
""" | |||||
测试Jittor变量元组的转换 | |||||
""" | |||||
jittor_list = [jittor.rand((6, 4, 2)) for i in range(10)] | |||||
jittor_tuple = tuple(jittor_list) | |||||
res = jittor2torch(jittor_tuple, target_device="cpu") | |||||
assert isinstance(res, tuple) | |||||
for t in res: | |||||
self.check_torch_tensor(t, "cpu", True) | |||||
def test_dict_transfer(self): | |||||
""" | |||||
测试字典结构的转换 | |||||
""" | |||||
jittor_dict = { | |||||
"tensor": jittor.rand((3, 4)), | |||||
"list": [jittor.rand(6, 4, 2) for i in range(10)], | |||||
"dict":{ | |||||
"list": [jittor.rand(6, 4, 2) for i in range(10)], | |||||
"tensor": jittor.rand((3, 4)) | |||||
}, | |||||
"int": 2, | |||||
"string": "test string" | |||||
} | |||||
res = jittor2torch(jittor_dict) | |||||
assert isinstance(res, dict) | |||||
self.check_torch_tensor(res["tensor"], "cpu", True) | |||||
assert isinstance(res["list"], list) | |||||
for t in res["list"]: | |||||
self.check_torch_tensor(t, "cpu", True) | |||||
assert isinstance(res["int"], int) | |||||
assert isinstance(res["string"], str) | |||||
assert isinstance(res["dict"], dict) | |||||
assert isinstance(res["dict"]["list"], list) | |||||
for t in res["dict"]["list"]: | |||||
self.check_torch_tensor(t, "cpu", True) | |||||
self.check_torch_tensor(res["dict"]["tensor"], "cpu", True) | |||||
############################################################################ | |||||
# | |||||
# 测试torch到jittor的转换 | |||||
# | |||||
############################################################################ | |||||
class TestTorch2Jittor: | |||||
def check_jittor_var(self, var, requires_grad): | |||||
""" | |||||
检查得到的Jittor Var梯度情况的工具函数 | |||||
""" | |||||
assert isinstance(var, jittor.Var) | |||||
assert var.requires_grad == requires_grad | |||||
def test_gradient(self): | |||||
""" | |||||
测试反向传播的梯度 | |||||
""" | |||||
x = torch.tensor([1.0, 2.0, 3.0, 4.0, 5.0], requires_grad=True) | |||||
y = torch2jittor(x) | |||||
z = 3 * (y ** 2) | |||||
grad = jittor.grad(z, y) | |||||
assert grad.tolist() == [6.0, 12.0, 18.0, 24.0, 30.0] | |||||
def test_tensor_transfer(self): | |||||
""" | |||||
测试单个张量转换为Jittor | |||||
""" | |||||
torch_tensor = torch.rand((3, 4, 5)) | |||||
res = torch2jittor(torch_tensor) | |||||
self.check_jittor_var(res, False) | |||||
res = torch2jittor(torch_tensor, no_gradient=None) | |||||
self.check_jittor_var(res, False) | |||||
res = torch2jittor(torch_tensor, no_gradient=True) | |||||
self.check_jittor_var(res, False) | |||||
res = torch2jittor(torch_tensor, no_gradient=False) | |||||
self.check_jittor_var(res, True) | |||||
def test_tensor_list_transfer(self): | |||||
""" | |||||
测试张量列表的转换 | |||||
""" | |||||
torch_list = [torch.rand((6, 4, 2)) for i in range(10)] | |||||
res = torch2jittor(torch_list) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_jittor_var(t, False) | |||||
res = torch2jittor(torch_list, no_gradient=False) | |||||
assert isinstance(res, list) | |||||
for t in res: | |||||
self.check_jittor_var(t, True) | |||||
def test_tensor_tuple_transfer(self): | |||||
""" | |||||
测试张量元组的转换 | |||||
""" | |||||
torch_list = [torch.rand((6, 4, 2)) for i in range(10)] | |||||
torch_tuple = tuple(torch_list) | |||||
res = torch2jittor(torch_tuple) | |||||
assert isinstance(res, tuple) | |||||
for t in res: | |||||
self.check_jittor_var(t, False) | |||||
def test_dict_transfer(self): | |||||
""" | |||||
测试字典结构的转换 | |||||
""" | |||||
torch_dict = { | |||||
"tensor": torch.rand((3, 4)), | |||||
"list": [torch.rand(6, 4, 2) for i in range(10)], | |||||
"dict":{ | |||||
"list": [torch.rand(6, 4, 2) for i in range(10)], | |||||
"tensor": torch.rand((3, 4)) | |||||
}, | |||||
"int": 2, | |||||
"string": "test string" | |||||
} | |||||
res = torch2jittor(torch_dict) | |||||
assert isinstance(res, dict) | |||||
self.check_jittor_var(res["tensor"], False) | |||||
assert isinstance(res["list"], list) | |||||
for t in res["list"]: | |||||
self.check_jittor_var(t, False) | |||||
assert isinstance(res["int"], int) | |||||
assert isinstance(res["string"], str) | |||||
assert isinstance(res["dict"], dict) | |||||
assert isinstance(res["dict"]["list"], list) | |||||
for t in res["dict"]["list"]: | |||||
self.check_jittor_var(t, False) | |||||
self.check_jittor_var(res["dict"]["tensor"], False) |