Browse Source

1.修改部分注释; 2.增加prepare_dataloader函数;3.paddle的paddler修改为先集中使用numpy pad再一把to_tensor

tags/v1.0.0alpha
yh_cc 2 years ago
parent
commit
47a63c27f7
22 changed files with 195 additions and 90 deletions
  1. +1
    -0
      fastNLP/core/__init__.py
  2. +1
    -1
      fastNLP/core/callbacks/checkpoint_callback.py
  3. +1
    -1
      fastNLP/core/callbacks/early_stop_callback.py
  4. +3
    -3
      fastNLP/core/callbacks/has_monitor_callback.py
  5. +1
    -1
      fastNLP/core/callbacks/load_best_model_callback.py
  6. +1
    -1
      fastNLP/core/callbacks/more_evaluate_callback.py
  7. +2
    -2
      fastNLP/core/callbacks/progress_callback.py
  8. +1
    -1
      fastNLP/core/callbacks/topk_saver.py
  9. +7
    -6
      fastNLP/core/collators/collator.py
  10. +8
    -41
      fastNLP/core/collators/padders/paddle_padder.py
  11. +1
    -0
      fastNLP/core/controllers/evaluator.py
  12. +11
    -1
      fastNLP/core/controllers/trainer.py
  13. +4
    -1
      fastNLP/core/dataloaders/__init__.py
  14. +6
    -3
      fastNLP/core/dataloaders/jittor_dataloader/fdl.py
  15. +2
    -1
      fastNLP/core/dataloaders/paddle_dataloader/fdl.py
  16. +114
    -0
      fastNLP/core/dataloaders/prepare_dataloader.py
  17. +13
    -24
      fastNLP/core/dataloaders/torch_dataloader/fdl.py
  18. +1
    -1
      fastNLP/core/drivers/paddle_driver/fleet.py
  19. +1
    -1
      fastNLP/core/drivers/torch_driver/ddp.py
  20. +1
    -1
      fastNLP/core/drivers/torch_driver/initialize_torch_driver.py
  21. +2
    -0
      fastNLP/core/utils/utils.py
  22. +13
    -0
      tests/core/dataloaders/test_prepare_dataloader.py

+ 1
- 0
fastNLP/core/__init__.py View File

@@ -48,6 +48,7 @@ __all__ = [
'prepare_jittor_dataloader', 'prepare_jittor_dataloader',
'prepare_paddle_dataloader', 'prepare_paddle_dataloader',
'prepare_torch_dataloader', 'prepare_torch_dataloader',
"prepare_dataloader",


# dataset # dataset
'DataSet', 'DataSet',


+ 1
- 1
fastNLP/core/callbacks/checkpoint_callback.py View File

@@ -32,7 +32,7 @@ class CheckpointCallback(Callback):
model_save_fn 为 None ,则以上每个 folder 中,将生成 fastnlp_model.pkl.tar 文件。 model_save_fn 为 None ,则以上每个 folder 中,将生成 fastnlp_model.pkl.tar 文件。
若 model_save_fn 不为 None,则 fastNLP 将 folder 绝对路径传递给该函数,fastNLP 在该 folder 下不进行模型保存。 若 model_save_fn 不为 None,则 fastNLP 将 folder 绝对路径传递给该函数,fastNLP 在该 folder 下不进行模型保存。


:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结 的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结
果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。 果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。
:param folder: 保存的文件夹,fastNLP 将在该文件下以时间戳创建子文件夹,并在里面保存。因此不同次运行可以将被保存到不同的 :param folder: 保存的文件夹,fastNLP 将在该文件下以时间戳创建子文件夹,并在里面保存。因此不同次运行可以将被保存到不同的


+ 1
- 1
fastNLP/core/callbacks/early_stop_callback.py View File

@@ -12,7 +12,7 @@ class EarlyStopCallback(HasMonitorCallback):
def __init__(self, monitor:Union[str, Callable]=None, larger_better:bool=True, patience:int=10): def __init__(self, monitor:Union[str, Callable]=None, larger_better:bool=True, patience:int=10):
""" """


:param str monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
:param str monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结 的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结
果(字典类型),返回一个 float 值作为 monitor 的结果。 果(字典类型),返回一个 float 值作为 monitor 的结果。
:param larger_better: monitor 的值是否是越大越好。 :param larger_better: monitor 的值是否是越大越好。


+ 3
- 3
fastNLP/core/callbacks/has_monitor_callback.py View File

@@ -34,7 +34,7 @@ class ResultsMonitor:
""" """
可用于监控某个数值,并通过 is_better_results() 等接口实现检测结果是否变得更好了。 可用于监控某个数值,并通过 is_better_results() 等接口实现检测结果是否变得更好了。


:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结 的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结
果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。 果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。
:param larger_better: monitor 是否时越大越好 :param larger_better: monitor 是否时越大越好
@@ -171,7 +171,7 @@ class HasMonitorCallback(ResultsMonitor, Callback):
该 callback 不直接进行使用,作为其它相关 callback 的父类使用,如果 callback 有使用 monitor 可以继承该函数里面实现了 该 callback 不直接进行使用,作为其它相关 callback 的父类使用,如果 callback 有使用 monitor 可以继承该函数里面实现了
(1)判断monitor合法性;(2)在需要时, 根据trainer的monitor设置自己的monitor名称。 (1)判断monitor合法性;(2)在需要时, 根据trainer的monitor设置自己的monitor名称。


:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结 的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结
果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。 果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。
:param larger_better: monitor 是否时越大越好 :param larger_better: monitor 是否时越大越好
@@ -209,7 +209,7 @@ class ExecuteOnceBetterMonitor(HasMonitorCallback):
""" """
当监控的 monitor 结果更好的时候,调用 execute_fn 函数。 当监控的 monitor 结果更好的时候,调用 execute_fn 函数。


:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
:param monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结 的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结
果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。 果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。
:param larger_better: monitor 是否时越大越好 :param larger_better: monitor 是否时越大越好


+ 1
- 1
fastNLP/core/callbacks/load_best_model_callback.py View File

@@ -21,7 +21,7 @@ class LoadBestModelCallback(HasMonitorCallback):
""" """
保存最佳的 monitor 值最佳的模型,并在训练结束的时候重新加载模型。仅在训练正常结束的时候才能加载最好的模型。 保存最佳的 monitor 值最佳的模型,并在训练结束的时候重新加载模型。仅在训练正常结束的时候才能加载最好的模型。


:param str monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
:param str monitor: 监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结 的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,接受参数为 evaluation 的结
果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。 果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请返回 None 。
:param larger_better: 该 metric 值是否是越大越好。 :param larger_better: 该 metric 值是否是越大越好。


+ 1
- 1
fastNLP/core/callbacks/more_evaluate_callback.py View File

@@ -37,7 +37,7 @@ class MoreEvaluateCallback(HasMonitorCallback):
一个 bool 值,返回为 True 说明需要进行 evaluate ;将在每个 batch 结束后调用该函数判断是否需要 evaluate 。 一个 bool 值,返回为 True 说明需要进行 evaluate ;将在每个 batch 结束后调用该函数判断是否需要 evaluate 。
:param watch_monitor: 这个值用来表示监控的 Trainer 中的 evaluate 结果的,当该值不为 None ,evaluate_every 失效。本参数的 :param watch_monitor: 这个值用来表示监控的 Trainer 中的 evaluate 结果的,当该值不为 None ,evaluate_every 失效。本参数的
意义是,当检测到 Trainer 中 evaluate results 的 {watch_monitor} 的结果更好时,则进行一次 evaluate 。该参数有两种 意义是,当检测到 Trainer 中 evaluate results 的 {watch_monitor} 的结果更好时,则进行一次 evaluate 。该参数有两种
取值: (1) str 类型,监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最
取值: (1) str 类型,监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最
匹配的那个作为 monitor ; (2) 也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 匹配的那个作为 monitor ; (2) 也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor
的结果,如果当前结果中没有相关的monitor 值请返回 None 。 的结果,如果当前结果中没有相关的monitor 值请返回 None 。
:param watch_monitor_larger_better: watch_monitor 是否越大越好。 :param watch_monitor_larger_better: watch_monitor 是否越大越好。


+ 2
- 2
fastNLP/core/callbacks/progress_callback.py View File

@@ -46,7 +46,7 @@ class RichCallback(ProgressCallback):
:param print_every: 多少个 batch 更新一次显示。 :param print_every: 多少个 batch 更新一次显示。
:param loss_round_ndigit: 显示的 loss 保留多少位有效数字 :param loss_round_ndigit: 显示的 loss 保留多少位有效数字
:param monitor: 当检测到这个key的结果更好时,会打印出不同的颜色进行提示。监控的 metric 值。如果在 evaluation 结果中没有找到 :param monitor: 当检测到这个key的结果更好时,会打印出不同的颜色进行提示。监控的 metric 值。如果在 evaluation 结果中没有找到
完全一致的名称,将使用 最公共字符串算法 找到最匹配的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor
完全一致的名称,将使用 最公共字符串算法 找到最匹配的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor
。也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有 。也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有
相关的 monitor 值请返回 None 。 相关的 monitor 值请返回 None 。
:param larger_better: 是否是 monitor 的结果越大越好。 :param larger_better: 是否是 monitor 的结果越大越好。
@@ -141,7 +141,7 @@ class RawTextCallback(ProgressCallback):
:param print_every: 多少个 batch 更新一次显示。 :param print_every: 多少个 batch 更新一次显示。
:param loss_round_ndigit: 显示的 loss 保留多少位有效数字 :param loss_round_ndigit: 显示的 loss 保留多少位有效数字
:param monitor: 当检测到这个key的结果更好时,会打印出不同的颜色进行提示。监控的 metric 值。如果在 evaluation 结果中没有找到 :param monitor: 当检测到这个key的结果更好时,会打印出不同的颜色进行提示。监控的 metric 值。如果在 evaluation 结果中没有找到
完全一致的名称,将使用 最公共字符串算法 找到最匹配的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor
完全一致的名称,将使用 最公共字符串算法 找到最匹配的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor
。也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有 。也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有
相关的 monitor 值请返回 None 。 相关的 monitor 值请返回 None 。
:param larger_better: 是否是monitor的结果越大越好。 :param larger_better: 是否是monitor的结果越大越好。


+ 1
- 1
fastNLP/core/callbacks/topk_saver.py View File

@@ -183,7 +183,7 @@ class TopkSaver(ResultsMonitor, Saver):


:param topk: 保存 topk 多少的模型,-1 为保存所有模型;0 为都不保存;大于 0 的数为保存 topk 个。 :param topk: 保存 topk 多少的模型,-1 为保存所有模型;0 为都不保存;大于 0 的数为保存 topk 个。
:param monitor: 监控哪个指标判断是否是 topk 的。监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 :param monitor: 监控哪个指标判断是否是 topk 的。监控的 metric 值。如果在 evaluation 结果中没有找到完全一致的名称,将使用
公共字符串算法 找到最匹配的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,
公共字符串算法 找到最匹配的那个作为 monitor 。如果为 None,将尝试使用 Trainer 设置的 monitor 。也可以传入一个函数,
接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请 接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果,如果当前结果中没有相关的 monitor 值请
返回 None 。 返回 None 。
:param larger_better: 该 monitor 是否越大越好。 :param larger_better: 该 monitor 是否越大越好。


+ 7
- 6
fastNLP/core/collators/collator.py View File

@@ -6,19 +6,20 @@ from typing import List, Union, Dict, Callable, Sequence, Mapping
import os import os
import sys import sys
import inspect import inspect
import re


from fastNLP.core.log import logger from fastNLP.core.log import logger
from .padders.get_padder import get_padder from .padders.get_padder import get_padder
from ...envs import SUPPORT_BACKENDS


import re


from .packer_unpacker import SequencePackerUnpacker, SinglePackerUnpacker, MappingPackerUnpacker, \ from .packer_unpacker import SequencePackerUnpacker, SinglePackerUnpacker, MappingPackerUnpacker, \
NestedMappingPackerUnpacker NestedMappingPackerUnpacker


sequence_idx_str = re.compile(r'^_\d+$') # 形如_0, _1 sequence_idx_str = re.compile(r'^_\d+$') # 形如_0, _1
SUPPORTED_BACKENDS = ['torch', 'jittor', 'paddle', 'numpy', 'raw', 'auto', None] SUPPORTED_BACKENDS = ['torch', 'jittor', 'paddle', 'numpy', 'raw', 'auto', None]
CHECK_BACKEND = ['torch', 'jittor', 'paddle'] # backend 为 auto 时 检查是否是这些 backend
# 由于 jittor DataLoader 存在自动的 to_jittor 的转换,所以只需要 collate 成为 numpy 就行
AUTO_BACKEND_MAPPING = {'jittor': 'numpy'}


def _get_backend() -> str: def _get_backend() -> str:
""" """
@@ -40,7 +41,7 @@ def _get_backend() -> str:
catch_backend = [] catch_backend = []
try: try:
file = module.__file__ file = module.__file__
for backend in CHECK_BACKEND:
for backend in SUPPORT_BACKENDS:
if f'{os.sep}site-packages{os.sep}{backend}' in file: if f'{os.sep}site-packages{os.sep}{backend}' in file:
catch_backend = [backend, file] catch_backend = [backend, file]
except: except:
@@ -62,10 +63,10 @@ def _get_backend() -> str:
break break
if len(catch_backend): if len(catch_backend):
logger.debug(f"Find a file named:{catch_backend[1]} from stack contains backend:{catch_backend[0]}.") logger.debug(f"Find a file named:{catch_backend[1]} from stack contains backend:{catch_backend[0]}.")
return catch_backend[0]
return AUTO_BACKEND_MAPPING.get(catch_backend[0], catch_backend[0])


# 方式 (2) # 方式 (2)
for backend in CHECK_BACKEND:
for backend in SUPPORT_BACKENDS:
if backend in sys.modules: if backend in sys.modules:
logger.debug(f"sys.modules contains backend:{backend}.") logger.debug(f"sys.modules contains backend:{backend}.")
return backend return backend


+ 8
- 41
fastNLP/core/collators/padders/paddle_padder.py View File

@@ -30,7 +30,8 @@ if _NEED_IMPORT_PADDLE:
} }


from .padder import Padder from .padder import Padder
from .utils import is_number_or_numpy_number, is_number, is_numpy_number_dtype, get_shape, is_numpy_generic_class
from .utils import is_number_or_numpy_number, is_number, is_numpy_number_dtype, is_numpy_generic_class, \
get_padded_numpy_array
from .exceptions import * from .exceptions import *




@@ -54,7 +55,6 @@ def is_paddle_dtype_str(dtype):
return False return False





def _get_dtype(ele_dtype, dtype, class_name): def _get_dtype(ele_dtype, dtype, class_name):
if not (ele_dtype is None or is_number_or_numpy_number(ele_dtype) or is_paddle_tensor(ele_dtype) or is_paddle_dtype_str(ele_dtype)): if not (ele_dtype is None or is_number_or_numpy_number(ele_dtype) or is_paddle_tensor(ele_dtype) or is_paddle_dtype_str(ele_dtype)):
raise EleDtypeUnsupportedError(f"`{class_name}` only supports padding python numbers " raise EleDtypeUnsupportedError(f"`{class_name}` only supports padding python numbers "
@@ -131,7 +131,7 @@ class PaddleTensorPadder(Padder):
def pad(batch_field, pad_val, dtype): def pad(batch_field, pad_val, dtype):
try: try:
if not isinstance(batch_field[0], paddle.Tensor): if not isinstance(batch_field[0], paddle.Tensor):
batch_field = [paddle.to_tensor(field.tolist(), dtype=dtype) for field in batch_field]
batch_field = [np.array(field.tolist()) for field in batch_field]
else: else:
if dtype is None: if dtype is None:
dtype = batch_field[0].dtype dtype = batch_field[0].dtype
@@ -141,46 +141,14 @@ class PaddleTensorPadder(Padder):


shapes = [field.shape for field in batch_field] shapes = [field.shape for field in batch_field]
max_shape = [len(batch_field)] + [max(*_) for _ in zip(*shapes)] max_shape = [len(batch_field)] + [max(*_) for _ in zip(*shapes)]
tensor = paddle.full(max_shape, fill_value=pad_val, dtype=dtype)
array = np.full(max_shape, fill_value=pad_val)
for i, field in enumerate(batch_field): for i, field in enumerate(batch_field):
slices = (i, ) + tuple(slice(0, s) for s in shapes[i]) slices = (i, ) + tuple(slice(0, s) for s in shapes[i])
tensor[slices] = field
array[slices] = field
tensor = paddle.to_tensor(array, dtype=dtype)
return tensor return tensor




def fill_tensor(batch_field, padded_batch, dtype):
"""
将 batch_field 中的值填入到 tensor 中。

:param batch_field: 需要填充进入 array 中的内容
:param padded_batch: 待填充的 tensor
:param dtype: 数据的类别

:return:
"""
if padded_batch.ndim == 2:
for i, content_i in enumerate(batch_field):
padded_batch[i, :len(content_i)] = paddle.to_tensor(content_i, dtype=dtype)
elif padded_batch.ndim == 3:
for i, content_i in enumerate(batch_field):
for j, content_ii in enumerate(content_i):
padded_batch[i, j, :len(content_ii)] = paddle.to_tensor(content_ii, dtype=dtype)
elif padded_batch.ndim == 4:
try: # 应该是图像,所以直接应该就 ok 了。
padded_batch = np.array(batch_field)
except:
for i, content_i in enumerate(batch_field):
for j, content_ii in enumerate(content_i):
for k, content_iii in enumerate(content_ii):
padded_batch[i, j, k, :len(content_iii)] = paddle.to_tensor(content_iii, dtype=dtype)
elif padded_batch.ndim == 1:
padded_batch[:] = paddle.to_tensor(batch_field, dtype=dtype)
else:
raise RuntimeError("fastNLP does not support padding for more than 3 dimensions. If you need this, please "
"report.")
return padded_batch


def get_padded_paddle_tensor(batch_field, dtype=None, pad_val=0): def get_padded_paddle_tensor(batch_field, dtype=None, pad_val=0):
""" """
例如: 例如:
@@ -192,7 +160,6 @@ def get_padded_paddle_tensor(batch_field, dtype=None, pad_val=0):
:param pad_val: pad 的 value :param pad_val: pad 的 value
:return: :return:
""" """
shapes = get_shape(batch_field)
tensor = paddle.to_tensor(np.full(shape=shapes, fill_value=pad_val), dtype=dtype)
tensor = fill_tensor(batch_field, tensor, dtype=dtype)
array = get_padded_numpy_array(batch_field=batch_field, dtype=None, pad_val=pad_val)
tensor = paddle.to_tensor(array, dtype=dtype)
return tensor return tensor

+ 1
- 0
fastNLP/core/controllers/evaluator.py View File

@@ -159,6 +159,7 @@ class Evaluator:
self.reset() self.reset()
self.driver.barrier() self.driver.barrier()
except BaseException as e: except BaseException as e:
self.driver.on_exception()
raise e raise e
finally: finally:
self.finally_progress_bar() self.finally_progress_bar()


+ 11
- 1
fastNLP/core/controllers/trainer.py View File

@@ -117,18 +117,20 @@ class Trainer(TrainerEventTrigger):
:param accumulation_steps: 梯度累积的步数,表示每隔几个 batch 优化器迭代一次;默认为 1; :param accumulation_steps: 梯度累积的步数,表示每隔几个 batch 优化器迭代一次;默认为 1;
:param fp16: 是否开启混合精度训练;默认为 False; :param fp16: 是否开启混合精度训练;默认为 False;
:param monitor: 当存在 evaluate_dataloaders 时,默认的 monitor metric 的名字。传入的 callback 如果有 monitor 参数且没有 :param monitor: 当存在 evaluate_dataloaders 时,默认的 monitor metric 的名字。传入的 callback 如果有 monitor 参数且没有
在 callback 初始化设定的,将采取这个值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
在 callback 初始化设定的,将采取这个值。如果在 evaluation 结果中没有找到完全一致的名称,将使用 最公共字符串算法 找到最匹配
的那个作为 monitor 。也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果。 的那个作为 monitor 。也可以传入一个函数,接受参数为 evaluation 的结果(字典类型),返回一个 float 值作为 monitor 的结果。
如果 evaluate_dataloaders 与 metrics 没有提供,该参数无意义。 如果 evaluate_dataloaders 与 metrics 没有提供,该参数无意义。
:param larger_better: monitor 的值是否是越大越好。 :param larger_better: monitor 的值是否是越大越好。
:param marker: 用于标记一个 Trainer 实例,从而在用户调用 `Trainer.on` 函数时,标记该 callback 函数属于哪一个具体的 'trainer' 实例;默认为 None; :param marker: 用于标记一个 Trainer 实例,从而在用户调用 `Trainer.on` 函数时,标记该 callback 函数属于哪一个具体的 'trainer' 实例;默认为 None;
:param kwargs: 一些其它的可能需要的参数,见下方的说明 :param kwargs: 一些其它的可能需要的参数,见下方的说明
:kwargs: :kwargs:
* *torch_kwargs* --
* *torch_non_blocking* -- 表示用于 pytorch 的 tensor 的 to 方法的参数 non_blocking; * *torch_non_blocking* -- 表示用于 pytorch 的 tensor 的 to 方法的参数 non_blocking;
* *data_device* -- 表示如果用户的模型 device (在 Driver 中对应为参数 model_device)为 None 时,我们会将数据迁移到 data_device 上; * *data_device* -- 表示如果用户的模型 device (在 Driver 中对应为参数 model_device)为 None 时,我们会将数据迁移到 data_device 上;
注意如果 model_device 为 None,那么 data_device 不会起作用; 注意如果 model_device 为 None,那么 data_device 不会起作用;
* *torch_ddp_kwargs* -- 用于配置 pytorch 的 DistributedDataParallel 初始化时的参数;仅用于 pytorch ddp 训练。例如传入 * *torch_ddp_kwargs* -- 用于配置 pytorch 的 DistributedDataParallel 初始化时的参数;仅用于 pytorch ddp 训练。例如传入
{'find_unused_parameters': True} 来解决有有参数不参与前向运算导致的报错等。 {'find_unused_parameters': True} 来解决有有参数不参与前向运算导致的报错等。
* *torch_single_kwargs* --
* *set_grad_to_none* -- 是否在训练过程中在每一次 optimizer 更新后将 grad 置为 None; * *set_grad_to_none* -- 是否在训练过程中在每一次 optimizer 更新后将 grad 置为 None;
* *use_dist_sampler* -- 表示是否使用分布式的 sampler 。在多卡时,分布式 sampler 将自动决定每张卡上读取的 sample ,使得一个epoch * *use_dist_sampler* -- 表示是否使用分布式的 sampler 。在多卡时,分布式 sampler 将自动决定每张卡上读取的 sample ,使得一个epoch
内所有卡的 sample 加起来为一整个数据集的 sample。默认会根据 driver 是否为分布式进行设置。 内所有卡的 sample 加起来为一整个数据集的 sample。默认会根据 driver 是否为分布式进行设置。
@@ -360,6 +362,14 @@ class Trainer(TrainerEventTrigger):
self.on_exception(e) self.on_exception(e)
if not catch_KeyboardInterrupt: if not catch_KeyboardInterrupt:
raise e raise e
except RuntimeError as e:
if 'torch' in self.driver_name.lower(): # 如果是 torch ,需要检测一下 find_unused_parameters
if 'find_unused_parameters' in e.args[0]:
logger.error("You may need to pass `torch_ddp_kwargs={'find_unused_parameters': True}` in the "
"Trainer initialization to avoid this error.")
self.driver.on_exception()
self.on_exception(e)
raise e
except BaseException as e: except BaseException as e:
self.driver.on_exception() self.driver.on_exception()
self.on_exception(e) self.on_exception(e)


+ 4
- 1
fastNLP/core/dataloaders/__init__.py View File

@@ -5,10 +5,13 @@ __all__ = [
'JittorDataLoader', 'JittorDataLoader',
'prepare_jittor_dataloader', 'prepare_jittor_dataloader',
'prepare_paddle_dataloader', 'prepare_paddle_dataloader',
'prepare_torch_dataloader'
'prepare_torch_dataloader',

"prepare_dataloader"
] ]


from .mix_dataloader import MixDataLoader from .mix_dataloader import MixDataLoader
from .jittor_dataloader import JittorDataLoader, prepare_jittor_dataloader from .jittor_dataloader import JittorDataLoader, prepare_jittor_dataloader
from .torch_dataloader import TorchDataLoader, prepare_torch_dataloader from .torch_dataloader import TorchDataLoader, prepare_torch_dataloader
from .paddle_dataloader import PaddleDataLoader, prepare_paddle_dataloader from .paddle_dataloader import PaddleDataLoader, prepare_paddle_dataloader
from .prepare_dataloader import prepare_dataloader

+ 6
- 3
fastNLP/core/dataloaders/jittor_dataloader/fdl.py View File

@@ -4,6 +4,7 @@ __all__ = [
] ]


from typing import Callable, Optional, List, Union from typing import Callable, Optional, List, Union
from copy import deepcopy


from fastNLP.envs.imports import _NEED_IMPORT_JITTOR from fastNLP.envs.imports import _NEED_IMPORT_JITTOR


@@ -75,10 +76,12 @@ class JittorDataLoader:
if isinstance(collate_fn, str): if isinstance(collate_fn, str):
if collate_fn == "auto": if collate_fn == "auto":
if isinstance(self.dataset.dataset, FDataSet): if isinstance(self.dataset.dataset, FDataSet):
self.collate_fn = self.dataset.dataset.collator
self.collate_fn.set_backend(backend="jittor")
self.collate_fn = deepcopy(self.dataset.dataset.collator)
# jittor 比较特殊,只需要保证返回 numpy.array, 其Dataloader会转为jt.var
self.collate_fn.set_backend(backend="numpy")
else: else:
self.collate_fn = Collator(backend="jittor")
# jittor 比较特殊,只需要保证返回 numpy.array, 其Dataloader会转为jt.var
self.collate_fn = Collator(backend="numpy")
else: else:
raise ValueError(f"collate_fn: {collate_fn} must be 'auto'") raise ValueError(f"collate_fn: {collate_fn} must be 'auto'")
elif isinstance(collate_fn, Callable): elif isinstance(collate_fn, Callable):


+ 2
- 1
fastNLP/core/dataloaders/paddle_dataloader/fdl.py View File

@@ -4,6 +4,7 @@ __all__ = [
] ]


from typing import Callable, List, Optional, Union, Dict, Sequence from typing import Callable, List, Optional, Union, Dict, Sequence
from copy import deepcopy


from fastNLP.envs.imports import _NEED_IMPORT_PADDLE from fastNLP.envs.imports import _NEED_IMPORT_PADDLE


@@ -68,7 +69,7 @@ class PaddleDataLoader(DataLoader):
if isinstance(collate_fn, str): if isinstance(collate_fn, str):
if collate_fn == 'auto': if collate_fn == 'auto':
if isinstance(dataset.dataset, FDataSet): if isinstance(dataset.dataset, FDataSet):
collate_fn = dataset.dataset.collator
collate_fn = deepcopy(dataset.dataset.collator)
collate_fn.set_backend(backend="paddle") collate_fn.set_backend(backend="paddle")
else: else:
collate_fn = Collator(backend="paddle") collate_fn = Collator(backend="paddle")


+ 114
- 0
fastNLP/core/dataloaders/prepare_dataloader.py View File

@@ -0,0 +1,114 @@
__all__ = [
'prepare_dataloader'
]

from typing import Union, Callable
import os
import sys

from ..samplers import RandomBatchSampler, RandomSampler
from .torch_dataloader import prepare_torch_dataloader
from .paddle_dataloader import prepare_paddle_dataloader
from .jittor_dataloader import prepare_jittor_dataloader
from ...envs import FASTNLP_BACKEND, SUPPORT_BACKENDS, _module_available
from ..log import logger


def prepare_dataloader(dataset, batch_size: int = 16, shuffle: bool = False, drop_last: bool = False,
collate_fn: Union[Callable, str, None] = 'auto', num_workers: int = 0,
seed: int = 0, backend: str = 'auto'):
"""
自动创建合适的 ``DataLoader`` 对象。例如,检测当当前环境是 ``torch`` 的,则返回 ``TorchDataLoader`` , 是 ``paddle`` 的则
返回 ``PaddleDataLoader`` 。如果有更多需要定制的参数,请直接使用对应的 ``prepare`` 函数,例如
:func:`~fastNLP.prepare_torch_dataloader` 或 :func:`~fastNLP.prepare_paddle_dataloader` 等。

:param dataset: 实现 __getitem__() 和 __len__() 的对象;或这种对象的序列;或字典。

* 为单个数据集对象时
返回一个 DataLoader 。
* 为数据集对象序列时
返回一个序列的 DataLoader 。
* 为字典型 或 :class:`~fastNLP.io.DataBundle` 数据时,返回 `Dict` 类型的数据。
返回一个字典 。

:param batch_size: 批次大小。
:param shuffle: 是否打乱数据集。
:param drop_last: 当最后一个 batch 不足 batch_size 数量的是否,是否丢弃。
:param collate_fn: 用于处理一个 batch 的函数,一般包括 padding 和转为 tensor。有以下三种取值:

* 为 ``auto`` 时
使用 :class:`~fastNLP.Collator` 进行 padding 和 转tensor 。
* 为 ``Callable`` 时
应当接受一个 ``batch`` 的数据作为参数,同时输出一个对象 。
* 为 ``None`` 时
使用各个框架的 DataLoader 的默认 ``collate_fn`` 。
:param num_workers: 使用多少进程进行数据的 fetch 。
:param seed: 使用的随机数种子。
:param backend: 当前支持 ``["auto", "torch", "paddle", "jittor"]`` 四种类型。

* 为 ``auto`` 时
首先(1) 根据环境变量 "FASTNLP_BACKEND" 进行判断;如果没有设置则,(2)通过当前 ``sys.modules`` 中已经 import 的
``backend`` 进行判定。如果以上均无法判定,则报错。如果找到了 ``backend`` ,则按照下述的方式处理。
* 为 ``torch`` 时
使用 :func:`~fastNLP.prepare_torch_dataloader` 。
* 为 ``paddle`` 时
使用 :func:`~fastNLP.prepare_paddle_dataloader` 。
* 为 ``jittor`` 时
使用 :func:`~fastNLP.prepare_jittor_dataloader` 。

:return
"""
if backend == 'auto':
backend = _get_backend()
if backend == 'torch':
batch_sampler = RandomBatchSampler(dataset=dataset, batch_size=batch_size, shuffle=shuffle,
drop_last=drop_last, seed=seed)
return prepare_torch_dataloader(ds_or_db=dataset, batch_sampler=batch_sampler, collate_fn=collate_fn,
num_workers=num_workers, shuffle=False, sampler=None)
elif backend == 'paddle':
batch_sampler = RandomBatchSampler(dataset=dataset, batch_size=batch_size, shuffle=shuffle,
drop_last=drop_last, seed=seed)
return prepare_paddle_dataloader(ds_or_db=dataset, batch_sampler=batch_sampler, collate_fn=collate_fn,
num_workers=num_workers)
elif backend == 'jittor':
sampler = RandomSampler(dataset=dataset, shuffle=shuffle, seed=seed)
prepare_jittor_dataloader(ds_or_db=dataset, sampler=sampler, collate_fn=collate_fn,
num_workers=num_workers, batch_size=batch_size, shuffle=shuffle,
drop_last=drop_last)
else:
raise ValueError(f"Currently we do not support backend:{backend}.")


def _check_module(module):
"""
检查该 module 是否含有 某个 backend 的特征

:param module: module 对象
:return:
"""
try:
file = module.__file__
for backend in SUPPORT_BACKENDS:
if f'{os.sep}site-packages{os.sep}{backend}' in file:
return backend
except:
pass
return None


def _get_backend():
if os.environ.get(FASTNLP_BACKEND, None) != None:
backend = os.environ.get(FASTNLP_BACKEND)
logger.debug(f"Get Dataloader backend:{backend} from os.environ")
else:
available_backends = set()
for module in sys.modules.values():
_backend = _check_module(module)
if _backend:
available_backends.add(_backend)
if len(available_backends) == 1:
backend = available_backends.pop()
logger.debug(f"Get Dataloader backend:{backend} from sys.modules.")
else:
raise RuntimeError("Fail to detect dataloader backend automatically, please set it manually.")
return backend

+ 13
- 24
fastNLP/core/dataloaders/torch_dataloader/fdl.py View File

@@ -4,7 +4,7 @@ __all__ = [
] ]


from typing import Optional, Callable, Sequence, Union, Tuple, Dict, Mapping, List from typing import Optional, Callable, Sequence, Union, Tuple, Dict, Mapping, List
import inspect
from copy import deepcopy


from fastNLP.core.dataset import DataSet from fastNLP.core.dataset import DataSet
from fastNLP.core.collators import Collator from fastNLP.core.collators import Collator
@@ -84,7 +84,7 @@ class TorchDataLoader(DataLoader):
if isinstance(collate_fn, str): if isinstance(collate_fn, str):
if collate_fn == 'auto': if collate_fn == 'auto':
if isinstance(dataset.dataset, DataSet): # 使用了 fastnlp dataset if isinstance(dataset.dataset, DataSet): # 使用了 fastnlp dataset
collate_fn = dataset.dataset.collator
collate_fn = deepcopy(dataset.dataset.collator)
collate_fn.set_backend(backend="torch") collate_fn.set_backend(backend="torch")
else: else:
collate_fn = Collator(backend="torch") collate_fn = Collator(backend="torch")
@@ -178,8 +178,8 @@ class TorchDataLoader(DataLoader):




def prepare_torch_dataloader(ds_or_db: Union[DataSet, Sequence[DataSet], Mapping[str, DataSet]], def prepare_torch_dataloader(ds_or_db: Union[DataSet, Sequence[DataSet], Mapping[str, DataSet]],
batch_size: int = 16,
shuffle: bool = True,
batch_size: int = 1,
shuffle: bool = False,
sampler: Union["Sampler[int]", ReproducibleSampler, UnrepeatedSampler] = None, sampler: Union["Sampler[int]", ReproducibleSampler, UnrepeatedSampler] = None,
batch_sampler: Union["Sampler[Sequence[int]]", ReproducibleBatchSampler] = None, batch_sampler: Union["Sampler[Sequence[int]]", ReproducibleBatchSampler] = None,
num_workers: int = 0, collate_fn: Union[str, Callable, None] = 'auto', num_workers: int = 0, collate_fn: Union[str, Callable, None] = 'auto',
@@ -250,26 +250,15 @@ def prepare_torch_dataloader(ds_or_db: Union[DataSet, Sequence[DataSet], Mapping
elif isinstance(ds_or_db, Sequence): elif isinstance(ds_or_db, Sequence):
dl_bundle = [] dl_bundle = []
for idx, ds in enumerate(ds_or_db): for idx, ds in enumerate(ds_or_db):
if idx == 0:
dl_bundle.append(
TorchDataLoader(dataset=ds, batch_size=batch_size,
shuffle=shuffle, sampler=sampler, batch_sampler=batch_sampler,
num_workers=num_workers, collate_fn=collate_fn, pin_memory=pin_memory,
drop_last=drop_last, timeout=timeout, worker_init_fn=worker_init_fn,
multiprocessing_context=multiprocessing_context, generator=generator,
prefetch_factor=prefetch_factor, persistent_workers=persistent_workers,
)
)
else:
dl_bundle.append(
TorchDataLoader(dataset=ds, batch_size=batch_size,
shuffle=shuffle, sampler=sampler, batch_sampler=batch_sampler,
num_workers=num_workers, collate_fn=collate_fn, pin_memory=pin_memory,
drop_last=drop_last, timeout=timeout, worker_init_fn=worker_init_fn,
multiprocessing_context=multiprocessing_context, generator=generator,
prefetch_factor=prefetch_factor, persistent_workers=persistent_workers,
)
)
dl_bundle.append(
TorchDataLoader(dataset=ds, batch_size=batch_size,
shuffle=shuffle, sampler=sampler, batch_sampler=batch_sampler,
num_workers=num_workers, collate_fn=collate_fn, pin_memory=pin_memory,
drop_last=drop_last, timeout=timeout, worker_init_fn=worker_init_fn,
multiprocessing_context=multiprocessing_context, generator=generator,
prefetch_factor=prefetch_factor, persistent_workers=persistent_workers,
)
)
return dl_bundle return dl_bundle


elif isinstance(ds_or_db, Mapping): elif isinstance(ds_or_db, Mapping):


+ 1
- 1
fastNLP/core/drivers/paddle_driver/fleet.py View File

@@ -285,7 +285,7 @@ class PaddleFleetDriver(PaddleDriver):
self.world_size = int(os.environ.get("PADDLE_TRAINERS_NUM")) self.world_size = int(os.environ.get("PADDLE_TRAINERS_NUM"))
self.global_rank = int(os.environ.get("PADDLE_TRAINER_ID")) self.global_rank = int(os.environ.get("PADDLE_TRAINER_ID"))
reset_seed() reset_seed()
logger.info(f"\nworld size, global rank: {self.world_size}, {self.global_rank}\n")
logger.info(f"World size: {self.world_size}, Global rank: {self.global_rank}")
if not parallel_helper._is_parallel_ctx_initialized(): if not parallel_helper._is_parallel_ctx_initialized():
fleet.init(self.role_maker, self.is_collective, self.strategy) fleet.init(self.role_maker, self.is_collective, self.strategy)




+ 1
- 1
fastNLP/core/drivers/torch_driver/ddp.py View File

@@ -251,7 +251,7 @@ class TorchDDPDriver(TorchDriver):
self.world_size = int(os.environ.get("WORLD_SIZE")) self.world_size = int(os.environ.get("WORLD_SIZE"))
self.global_rank = int(os.environ.get("RANK")) self.global_rank = int(os.environ.get("RANK"))
reset_seed() reset_seed()
logger.info(f"World size:{self.world_size}, Global rank:{self.global_rank}")
logger.info(f"World size: {self.world_size}, Global rank: {self.global_rank}")


if not dist.is_initialized(): if not dist.is_initialized():
dist.init_process_group( dist.init_process_group(


+ 1
- 1
fastNLP/core/drivers/torch_driver/initialize_torch_driver.py View File

@@ -61,7 +61,7 @@ def initialize_torch_driver(driver: str, device: Optional[Union[str, "torch.devi
elif device is not None and not isinstance(device, torch.device): elif device is not None and not isinstance(device, torch.device):
raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.") raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.")


if driver == "torch":
if driver == "torch": # single, ddp, 直接启动。
if not isinstance(device, List): if not isinstance(device, List):
return TorchSingleDriver(model, device, **kwargs) return TorchSingleDriver(model, device, **kwargs)
else: else:


+ 2
- 0
fastNLP/core/utils/utils.py View File

@@ -22,6 +22,8 @@ import numpy as np
from pathlib import Path from pathlib import Path


from fastNLP.core.log import logger from fastNLP.core.log import logger
from ...envs import SUPPORT_BACKENDS



__all__ = [ __all__ = [
'get_fn_arg_names', 'get_fn_arg_names',


+ 13
- 0
tests/core/dataloaders/test_prepare_dataloader.py View File

@@ -0,0 +1,13 @@
import pytest

from fastNLP import prepare_dataloader
from fastNLP import DataSet


@pytest.mark.torch
def test_torch():
import torch
ds = DataSet({"x": [[1, 2], [2, 3, 4], [4, 5, 6, 7]] * 10, "y": [1, 0, 1] * 10})
dl = prepare_dataloader(ds, batch_size=2, shuffle=True)
for batch in dl:
assert isinstance(batch['x'], torch.Tensor)

Loading…
Cancel
Save