From 7c70874b4a0424af8d157ad4ed43ededee464b61 Mon Sep 17 00:00:00 2001 From: yh_cc Date: Fri, 15 Apr 2022 16:04:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4core.sampelrs.sampler.py?= =?UTF-8?q?=EF=BC=9B=E5=A2=9E=E5=8A=A0torch=E7=9A=84clipgradient=E5=92=8Cw?= =?UTF-8?q?armupcallback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fastNLP/core/callbacks/__init__.py | 6 +- .../callbacks/torch_callbacks/__init__.py | 8 + .../torch_grad_clip_callback.py | 52 ++ .../torch_lr_sched_callback.py | 58 ++ fastNLP/core/drivers/driver.py | 2 +- .../drivers/torch_driver/single_device.py | 8 +- fastNLP/core/samplers/__init__.py | 6 - fastNLP/core/samplers/sampler.py | 728 ------------------ fastNLP/io/loader/conll.py | 3 +- .../callbacks/torch_callbacks/__init__.py | 0 .../test_torch_grad_clip_callback.py | 41 + .../test_torch_warmup_callback.py | 34 + tests/core/samplers/test_sampler.py | 31 - .../prepare_trainer_args_for_torch_test.py | 68 ++ 14 files changed, 275 insertions(+), 770 deletions(-) create mode 100644 fastNLP/core/callbacks/torch_callbacks/__init__.py create mode 100644 fastNLP/core/callbacks/torch_callbacks/torch_grad_clip_callback.py create mode 100644 fastNLP/core/callbacks/torch_callbacks/torch_lr_sched_callback.py delete mode 100644 fastNLP/core/samplers/sampler.py create mode 100644 tests/core/callbacks/torch_callbacks/__init__.py create mode 100644 tests/core/callbacks/torch_callbacks/test_torch_grad_clip_callback.py create mode 100644 tests/core/callbacks/torch_callbacks/test_torch_warmup_callback.py delete mode 100644 tests/core/samplers/test_sampler.py create mode 100644 tests/helpers/callbacks/prepare_trainer_args_for_torch_test.py diff --git a/fastNLP/core/callbacks/__init__.py b/fastNLP/core/callbacks/__init__.py index fc5d9d5b..58de0319 100644 --- a/fastNLP/core/callbacks/__init__.py +++ b/fastNLP/core/callbacks/__init__.py @@ -11,7 +11,10 @@ __all__ = [ 'RichCallback', "LRSchedCallback", 'LoadBestModelCallback', - "EarlyStopCallback" + "EarlyStopCallback", + + "TorchWarmupCallback", + "TorchGradClipCallback" ] @@ -23,4 +26,5 @@ from .progress_callback import choose_progress_callback, ProgressCallback, RichC from .lr_scheduler_callback import LRSchedCallback from .load_best_model_callback import LoadBestModelCallback from .early_stop_callback import EarlyStopCallback +from .torch_callbacks import * diff --git a/fastNLP/core/callbacks/torch_callbacks/__init__.py b/fastNLP/core/callbacks/torch_callbacks/__init__.py new file mode 100644 index 00000000..1cadd7f6 --- /dev/null +++ b/fastNLP/core/callbacks/torch_callbacks/__init__.py @@ -0,0 +1,8 @@ +__all__ = [ + 'TorchWarmupCallback', + 'TorchGradClipCallback' +] + + +from .torch_lr_sched_callback import TorchWarmupCallback +from .torch_grad_clip_callback import TorchGradClipCallback \ No newline at end of file diff --git a/fastNLP/core/callbacks/torch_callbacks/torch_grad_clip_callback.py b/fastNLP/core/callbacks/torch_callbacks/torch_grad_clip_callback.py new file mode 100644 index 00000000..d5104a26 --- /dev/null +++ b/fastNLP/core/callbacks/torch_callbacks/torch_grad_clip_callback.py @@ -0,0 +1,52 @@ +__all__ = [ + 'TorchGradClipCallback' +] +from ..callback import Callback + + +class TorchGradClipCallback(Callback): + def __init__(self, clip_value=1, clip_type='norm', parameters=None): + r""" + 在每次 optimizer update 之前将 parameter 进行 clip + + :param float clip_value: 将gradient 限制到[-clip_value, clip_value]。clip_value应该为正数 + :param str clip_type: 支持'norm', 'value' + 两种:: + + 1 'norm', 将gradient的norm rescale到[-clip_value, clip_value] + + 2 'value', 将gradient限制在[-clip_value, clip_value], + 小于-clip_value的gradient被赋值为-clip_value; + 大于clip_value的gradient被赋值为clip_value. + :param None,torch.Tensor,List[torch.Tensor] parameters: 一般通过model.parameters()获得。 + 如果为None则默认对 Trainer 的 optimizers 中所有参数进行梯度裁剪。 + """ + super().__init__() + + from torch import nn + if clip_type == 'norm': + self.clip_fun = nn.utils.clip_grad_norm_ + elif clip_type == 'value': + self.clip_fun = nn.utils.clip_grad_value_ + else: + raise ValueError("Only supports `norm` or `value` right now.") + if parameters is not None: + self.parameters = list(parameters) + else: + self.parameters = None + self.clip_value = clip_value + + def on_after_trainer_initialized(self, trainer, driver): + assert 'torch' in driver.__class__.__name__.lower(), f"Callback:{self.__class__.__name__} only supports torch " \ + f"related drivers for now." + parameters = [] + for optimizer in trainer.driver.optimizers: + for param_group in optimizer.param_groups: + parameters.extend(param_group['params']) + self.parameters = parameters + assert len(self.parameters), "There is no parameters need to be clipped." + + def on_before_optimizers_step(self, trainer, optimizers): + for optimizer in trainer.driver.optimizers: + trainer.driver.grad_scaler.unscale_(optimizer) + self.clip_fun(self.parameters, self.clip_value) diff --git a/fastNLP/core/callbacks/torch_callbacks/torch_lr_sched_callback.py b/fastNLP/core/callbacks/torch_callbacks/torch_lr_sched_callback.py new file mode 100644 index 00000000..3d428d47 --- /dev/null +++ b/fastNLP/core/callbacks/torch_callbacks/torch_lr_sched_callback.py @@ -0,0 +1,58 @@ +__all__ = [ + 'TorchWarmupCallback' +] +import math + +from ..callback import Callback + + +class TorchWarmupCallback(Callback): + def __init__(self, warmup=0.1, schedule='constant'): + r""" + 调整 learning rate 的 callback 。仅在实际发生参数更新的情况下 + + :param int,float warmup: 如果warmup为int,则在该step之前,learning rate根据schedule的策略变化; 如果warmup为float, + 如0.1, 则前10%的step是按照schedule策略调整learning rate。 + :param str schedule: 以哪种方式调整。 + linear: 前warmup的step上升到指定的learning rate(从Trainer中的optimizer处获取的), 后warmup的step下降到0; + constant前warmup的step上升到指定learning rate,后面的step保持learning rate. + """ + super().__init__() + self.warmup = max(warmup, 0.) + + self.initial_lrs = [] # 存放param_group的learning rate + if schedule == 'constant': + self.get_lr = self._get_constant_lr + elif schedule == 'linear': + self.get_lr = self._get_linear_lr + else: + raise RuntimeError("Only support 'linear', 'constant'.") + + def _get_constant_lr(self, progress): + if progress 1: + self.warmup = self.warmup / self.t_steps + self.t_steps = max(2, self.t_steps) # 不能小于2 + # 防止 t_steps 不能整除 accumulation_steps + self.t_steps = math.ceil(self.t_steps/trainer.accumulation_steps) * trainer.accumulation_steps + # 获取param_group的初始learning rate + for optimizer in trainer.driver.optimizers: + for group in optimizer.param_groups: + self.initial_lrs.append(group['lr']) + + def on_before_optimizers_step(self, trainer, optimizers): + # 这里需要加 accumulation_steps 是防止 lr 从 0 开始 + progress = (trainer.global_forward_batches + trainer.accumulation_steps) / self.t_steps + for optimizer in trainer.driver.optimizers: + for lr, group in zip(self.initial_lrs, optimizer.param_groups): + group['lr'] = lr * self.get_lr(progress) diff --git a/fastNLP/core/drivers/driver.py b/fastNLP/core/drivers/driver.py index 0ef7f053..06547516 100644 --- a/fastNLP/core/drivers/driver.py +++ b/fastNLP/core/drivers/driver.py @@ -129,7 +129,7 @@ class Driver(ABC): @property def optimizers(self) -> List: r""" - 如下所示,driver 返回的 optimizers 一定是一个 List,如果用户直接向 Trainer 传入一个单独的 optimzer,我们会使用一个 List 将其 + 如下所示,driver 返回的 optimizers 一定是一个 List,如果用户直接向 Trainer 传入一个单独的 optimizer,我们会使用一个 List 将其 包裹; :return: List[optimizer0, optimizer1, optimizer2, ...] diff --git a/fastNLP/core/drivers/torch_driver/single_device.py b/fastNLP/core/drivers/torch_driver/single_device.py index adc61bd1..99ba754e 100644 --- a/fastNLP/core/drivers/torch_driver/single_device.py +++ b/fastNLP/core/drivers/torch_driver/single_device.py @@ -37,7 +37,12 @@ class TorchSingleDriver(TorchDriver): super(TorchSingleDriver, self).__init__(model, fp16=fp16, **kwargs) if device is None: - raise ValueError("Parameter `device` can not be None in `TorchSingleDriver`.") + logger.debug("device is not set, fastNLP will try to automatically get it.") + try: + device = next(model.parameters()).device + assert isinstance(device, torch.device) + except: + raise ValueError("fastNLP cannot get device automatically, please set device explicitly.") self.model_device = device @@ -70,6 +75,7 @@ class TorchSingleDriver(TorchDriver): return self.model, model.forward else: + # TODO 这种直接调用模型某个接口的方法无法触发hook,也许需要做一个warning,如果用户有钩子,提醒他train_step无法触发。 if hasattr(self.model, fn): fn = getattr(self.model, fn) if not callable(fn): diff --git a/fastNLP/core/samplers/__init__.py b/fastNLP/core/samplers/__init__.py index 61433e8e..edc1f891 100644 --- a/fastNLP/core/samplers/__init__.py +++ b/fastNLP/core/samplers/__init__.py @@ -1,9 +1,4 @@ __all__ = [ - 'BucketSampler', - 'SortedSampler', - 'ConstTokenNumSampler', - 'ConstantTokenNumSampler', - 'MixSampler', 'DopedSampler', 'MixSequentialSampler', @@ -26,7 +21,6 @@ __all__ = [ "re_instantiate_sampler" ] -from .sampler import BucketSampler, SortedSampler, ConstTokenNumSampler, ConstantTokenNumSampler from .unrepeated_sampler import UnrepeatedSampler, UnrepeatedRandomSampler, UnrepeatedSortedSampler, UnrepeatedSequentialSampler from .mix_sampler import MixSampler, DopedSampler, MixSequentialSampler, PollingSampler from .reproducible_sampler import ReproducibleSampler, RandomSampler, SequentialSampler, SortedSampler diff --git a/fastNLP/core/samplers/sampler.py b/fastNLP/core/samplers/sampler.py deleted file mode 100644 index 89751884..00000000 --- a/fastNLP/core/samplers/sampler.py +++ /dev/null @@ -1,728 +0,0 @@ -r""" -sampler 子类实现了 fastNLP 所需的各种采样器。 -""" - -__all__ = [ - "BucketSampler", - "SortedSampler", - 'ConstTokenNumSampler', - "ConstantTokenNumSampler", -] - -from itertools import chain -from typing import List, Iterable - -import numpy as np - -from fastNLP.envs.imports import _NEED_IMPORT_TORCH - -if _NEED_IMPORT_TORCH: - from torch.utils.data import Sampler -else: - from fastNLP.core.utils.dummy_class import DummyClass as Sampler - -# class DopedSampler(Sampler): -# """ -# 定制给MixDataLoader的BatchSampler,其功能是将传入的datasets的list列表混合采样组成一个个batch返回。 -# """ -# -# def __init__(self, dataset: Union[List, Dict], batch_size: int = None, -# sampler: Union[List[Sampler], Dict[str, Sampler]] = None, -# ds_ratio: Union[str, None, List[float], Dict[str, float]] = None, drop_last: bool = False) -> None: -# if batch_size <= 0: -# raise ValueError("batch_size should be a positive integer value, " -# "but got batch_size={}".format(batch_size)) -# if not isinstance(drop_last, bool): -# raise ValueError("drop_last should be a boolean value, but got " -# "drop_last={}".format(drop_last)) -# self.batch_size = batch_size -# self.drop_last = drop_last -# self.ds_ratio = ds_ratio -# if sampler is None: -# if isinstance(dataset, List): -# self.sampler = [SequentialSampler(ds) for ds in dataset] -# elif isinstance(dataset, Dict): -# self.sampler = {name: SequentialSampler(ds) for name, ds in dataset.items()} -# -# elif isinstance(sampler, List): -# if len(sampler) != len(dataset): -# raise ValueError("the length of sampler != the length of sampler") -# self.sampler = sampler -# else: -# self.sampler = sampler -# if ds_ratio == 'pad_to_most' or ds_ratio == 'truncate_to_least' or ds_ratio is None: -# self.ds_ratio = ds_ratio -# elif isinstance(ds_ratio, List): -# if not all(item >= 0 for item in ds_ratio): -# raise ValueError("batch_size should be a positive integer value, " -# "but got batch_size={}".format(ds_ratio)) -# self.ds_ratio = ds_ratio -# else: -# raise ValueError(f"{ds_ratio} must be pad_to_least or truncate_to_least or None") -# -# def __iter__(self): -# samplers, index = [], 0 -# if isinstance(self.sampler, List): -# for idx, sampler in enumerate(self.sampler): -# samplers.append((iter(sampler), self.batch_size, index, 0, idx)) -# index += len(sampler) -# elif isinstance(self.sampler, Dict): -# for name, sampler in self.sampler.items(): -# samplers.append((iter(sampler), self.batch_size, index, 0, name)) -# index += len(sampler) -# -# def __len__(self): -# lens = 0 -# max_len, ds_len = 0, 0 -# if self.ds_ratio == 'truncate_to_least': -# if isinstance(self.sampler, List): -# max_len = min(len(sampler) for sampler in self.sampler) -# ds_len = len(self.sampler) -# elif isinstance(self.sampler, Dict): -# max_len = min(len(sampler) for _, sampler in self.sampler.items()) -# for _, _ in self.sampler.items(): -# ds_len += 1 -# -# elif self.ds_ratio == 'pad_to_most': -# if isinstance(self.sampler, List): -# max_len = max(len(sampler) for sampler in self.sampler) -# ds_len = len(self.sampler) -# elif isinstance(self.sampler, Dict): -# max_len = max(len(sampler) for _, sampler in self.sampler.items()) -# for _, _ in self.sampler.items(): -# ds_len += 1 -# -# if self.ds_ratio is None: -# if isinstance(self.sampler, List): -# for i in range(len(self.sampler)): -# sampler = self.sampler[i] -# if self.drop_last: -# lens += len(sampler) // self.batch_size -# else: -# lens += (len(sampler) + self.batch_size - 1) // self.batch_size -# elif isinstance(self.sampler, Dict): -# for name, sampler in self.sampler.items(): -# if self.drop_last: -# lens += len(sampler) // self.batch_size -# else: -# lens += (len(sampler) + self.batch_size - 1) // self.batch_size -# elif self.ds_ratio == 'truncate_to_least' or self.ds_ratio == 'pad_to_most': -# for i in range(ds_len): -# if self.drop_last: -# lens += max_len // self.batch_size -# else: -# lens += (max_len + self.batch_size - 1) // self.batch_size -# return lens -# -# def demo(self): -# indexes = np.array([0]*self.batch_size + [1]*self.batch_size + [2]*self.batch_size) -# shift = np.array([0]*self.batch_size + [len(ds1)]*self.batch_size + [len(ds1)+len(ds2)]*self.batch_size) -# buffer = np.zeros(self.batch_size*self.num_ds, dtype=int) -# select_sampler = np.random.randint(0, self.batch_size*self.num_ds, num_sample=self.batch_size) -# select_indices = buffer[select_sampler] + shift[select_sampler] -# num_1 = (indexes[select_sampler]==0).sum() -# - - -# class MixSequentialSampler(Sampler): -# """ -# 定制给MixDataLoader的BatchSampler,其功能是将传入的datasets的list列表顺序采样并返回index,只有处理了上一个dataset才会处理下一个。 -# """ -# -# def __init__(self, dataset: Union[List, Dict], batch_size: int = None, -# sampler: Union[List[Sampler], Dict[str, Sampler], None] = None, -# drop_last: bool = False) -> None: -# """ -# -# :param dataset: 实现了__getitem__和__len__的数据容器列表 -# :param batch_size: 对应dataset的批次大小,可以为list或者为int,当为int时默认所有dataset -# :param sampler: 实例化好的sampler,每个dataset对应一个sampler对象 -# :param drop_last: 是否去掉最后一个batch的数据,其长度小于batch_size -# """ -# # 如果dataset为Dict,则其他参数如collate_fn必须为Dict或者Callable, -# if isinstance(dataset, Dict) and isinstance(sampler, List): -# raise ValueError(f"{sampler} must be dict") -# -# # 判断batch_size是否大于等于0 -# if batch_size <= 0: -# raise ValueError("batch_size should be a positive integer value, " -# "but got batch_size={}".format(batch_size)) -# -# if not isinstance(drop_last, bool): -# raise ValueError("drop_last should be a boolean value, but got " -# "drop_last={}".format(drop_last)) -# self.batch_size = batch_size -# self.drop_last = drop_last -# if sampler is None: -# if isinstance(dataset, List): -# self.sampler = [SequentialSampler(ds) for ds in dataset] -# elif isinstance(dataset, Dict): -# self.sampler = {name: SequentialSampler(ds) for name, ds in dataset.items()} -# elif isinstance(sampler, List): -# if len(sampler) != len(dataset): -# raise ValueError("the length of sampler != the length of sampler") -# self.sampler = sampler -# -# def __iter__(self) -> Iterable[List[int]]: -# """ -# 按照dataset的顺序采样,打包成一个batch后返回 -# :return: -# """ -# index = 0 -# batch = [] -# if isinstance(self. sampler, List): -# for i in range(len(self.sampler)): -# sampler = self.sampler[i] -# for idx in sampler: -# batch.append(idx + index) -# if len(batch) == self.batch_size: -# yield batch -# batch = [] -# if len(batch) > 0 and not self.drop_last: -# yield batch -# batch = [] -# index += len(sampler) -# elif isinstance(self.sampler, Dict): -# for name, sampler in self.sampler.items(): -# for idx in sampler: -# batch.append(idx + index) -# if len(batch) == self.batch_size: -# yield batch -# batch = [] -# if len(batch) > 0 and not self.drop_last: -# yield batch -# batch = [] -# index += len(sampler) -# -# def __len__(self) -> int: -# lens = 0 -# if isinstance(self.sampler, List): -# for i in range(len(self.sampler)): -# sampler = self.sampler[i] -# if self.drop_last: -# lens += len(sampler) // self.batch_size -# else: -# lens += (len(sampler) + self.batch_size - 1) // self.batch_size -# elif isinstance(self.sampler, Dict): -# for _, sampler in self.sampler.items(): -# if self.drop_last: -# lens += len(sampler) // self.batch_size -# else: -# lens += (len(sampler) + self.batch_size - 1) // self.batch_size -# return lens - - -# class PollingSampler(Sampler): -# """ -# 定制给MixDataLoader的BatchSampler,其功能是将传入的datasets的list列表轮流采样并返回index,处理了上个dataset的一个batch后会处理下一个。 -# """ -# -# def __init__(self, dataset: Union[List, Dict], batch_size: int = 16, -# sampler: Union[List[Sampler], Dict[str, Sampler]] = None, -# drop_last: bool = False, ds_ratio="pad_to_most") -> None: -# """ -# -# :param dataset: 实现了__getitem__和__len__的数据容器列表 -# :param batch_size: 对应dataset的批次大小,可以为list或者为int,当为int时默认所有dataset -# :param sampler: 实例化好的sampler,每个dataset对应一个sampler对象 -# :param drop_last: 是否去掉最后一个batch的数据,其长度小于batch_size -# :param ds_ratio: 当ds_ratio=None时候, 轮流采样dataset列表直至所有的数据集采样完;当ds_ratio='truncate_to_least'时, -# 以dataset列表最短的ds为基准,长的数据集会被截断;当ds_ratio='pad_to_most'时,以dataset列表最长ds为基准,短的数据集会被重采样 -# """ -# # 如果dataset为Dict,则其他参数如collate_fn必须为Dict或者Callable, -# if isinstance(dataset, Dict) and isinstance(sampler, List): -# raise ValueError(f"{sampler} must be dict") -# if isinstance(dataset, List) and isinstance(sampler, Dict): -# raise ValueError(f"{sampler} must be list") -# # 判断batch_size是否大于等于0 -# if batch_size <= 0: -# raise ValueError("batch_size should be a positive integer value, " -# "but got batch_size={}".format(batch_size)) -# -# if not isinstance(drop_last, bool): -# raise ValueError("drop_last should be a boolean value, but got " -# "drop_last={}".format(drop_last)) -# -# self.batch_size = batch_size -# self.drop_last = drop_last -# if sampler is None: -# if isinstance(dataset, List): -# self.sampler = [SequentialSampler(ds) for ds in dataset] -# elif isinstance(dataset, Dict): -# self.sampler = {name: SequentialSampler(ds) for name, ds in dataset.items()} -# -# elif isinstance(sampler, List): -# if len(sampler) != len(dataset): -# raise ValueError("the length of sampler != the length of sampler") -# self.sampler = sampler -# else: -# self.sampler = sampler -# if ds_ratio == 'pad_to_most' or ds_ratio == 'truncate_to_least' or ds_ratio is None: -# self.ds_ratio = ds_ratio -# else: -# raise ValueError(f"{ds_ratio} must be pad_to_least or truncate_to_least or None") -# -# def __iter__(self) -> Iterable[List[int]]: -# # index是数据集下标基址, pointer指向数据集列表的某个数据集 -# index, pointer, samplers, flag = 0, 0, [], False -# -# if isinstance(self.sampler, List): -# for idx, sampler in enumerate(self.sampler): -# samplers.append((iter(sampler), self.batch_size, index, 0, idx)) -# index += len(sampler) -# elif isinstance(self.sampler, Dict): -# for name, sampler in self.sampler.items(): -# samplers.append((iter(sampler), self.batch_size, index, 0, name)) -# index += len(sampler) -# if self.ds_ratio == 'pad_to_most': -# if isinstance(self.sampler, List): -# limit_len = max(len(ds) for ds in self.sampler) -# else: -# limit_len = max(len(ds) for _, ds in self.sampler.items()) -# elif self.ds_ratio == 'truncate_to_least': -# if isinstance(self.sampler, List): -# limit_len = min(len(ds) for ds in self.sampler) -# else: -# limit_len = min(len(ds) for _, ds in self.sampler.items()) -# else: -# limit_len = 0 -# # 最后一个批次的大小 -# last_batch_size = limit_len % self.batch_size -# -# while True: -# # 全部采样完,退出 -# if len(samplers) == 0: -# break -# batch, flag = [], False -# # sampler_len代表已经取出来的数据个数 -# sampler, batch_size, index, sampler_len, name = samplers.pop(0) -# for _ in range(batch_size): -# try: -# batch.append(index + next(sampler)) -# sampler_len += 1 -# except StopIteration: -# flag = True -# # ds_ratio为None,第一种情况,删除掉采样完的数据即可。 -# if self.ds_ratio == 'pad_to_most' and sampler_len < limit_len: -# # 重置sampler,并取足一个batch数据 -# sampler = iter(self.sampler[name]) -# # 由于batch_size一定小于等于ds的长度,故能够取足一个batch_size的数据 -# for _ in range(batch_size-len(batch)): -# batch.append(next(sampler) + index) -# sampler_len += 1 -# break -# -# # ds_ratio不为None情况 -# # 两种情况会触发一下逻辑:1.truncate_to_least时,最短的数据集最后一个batch大小不等于batch_size时, -# # 其他较长的数据集的最后一个batch长度会较长;2. pad_to_most,最长的数据集最后一个batch不等于batch_size时,较短数据集最后一个 -# # batch长度会较长 -# if limit_len != 0 and limit_len < sampler_len: -# batch = batch[:last_batch_size] -# # ds_ratio为任意情况下, 没有取完所有数据,则添加到队列尾部 -# elif (limit_len == 0 and flag == False) or limit_len > sampler_len: -# samplers.append((sampler, batch_size, index, sampler_len, name)) -# if len(batch) == batch_size: -# yield batch -# elif len(batch) > 0 and not self.drop_last: -# yield batch -# -# def __len__(self) -> int: -# lens = 0 -# max_len, ds_len = 0, 0 -# if self.ds_ratio == 'truncate_to_least': -# if isinstance(self.sampler, List): -# max_len = min(len(sampler) for sampler in self.sampler) -# ds_len = len(self.sampler) -# elif isinstance(self.sampler, Dict): -# max_len = min(len(sampler) for _, sampler in self.sampler.items()) -# for _, _ in self.sampler.items(): -# ds_len += 1 -# -# elif self.ds_ratio == 'pad_to_most': -# if isinstance(self.sampler, List): -# max_len = max(len(sampler) for sampler in self.sampler) -# ds_len = len(self.sampler) -# elif isinstance(self.sampler, Dict): -# max_len = max(len(sampler) for _, sampler in self.sampler.items()) -# for _, _ in self.sampler.items(): -# ds_len += 1 -# if self.ds_ratio is None: -# if isinstance(self.sampler, List): -# for i in range(len(self.sampler)): -# sampler = self.sampler[i] -# if self.drop_last: -# lens += len(sampler) // self.batch_size -# else: -# lens += (len(sampler) + self.batch_size - 1) // self.batch_size -# elif isinstance(self.sampler, Dict): -# for name, sampler in self.sampler.items(): -# if self.drop_last: -# lens += len(sampler) // self.batch_size -# else: -# lens += (len(sampler) + self.batch_size - 1) // self.batch_size -# else: -# for i in range(ds_len): -# if self.drop_last: -# lens += max_len // self.batch_size -# else: -# lens += (max_len + self.batch_size - 1) // self.batch_size -# return lens - - -class BucketSampler(Sampler): - r""" - 带Bucket的 `Random Sampler`. 可以随机地取出长度相似的元素 - """ - - def __init__(self, dataset, num_buckets=10, batch_size=None, seq_len_field_name='seq_len', drop_last=False) -> None: - r""" - - :param int num_buckets: bucket的数量 - :param int batch_size: batch的大小. 默认为None,Trainer/Tester在调用BucketSampler时,会将该值正确设置,如果是非 - Trainer/Tester场景使用,需要显示传递该值 - :param str seq_len_field_name: 对应序列长度的 `field` 的名字 - """ - self.dataset = dataset - self.num_buckets = num_buckets - self.batch_size = batch_size - self.seq_len_field_name = seq_len_field_name - - def set_batch_size(self, batch_size) -> None: - r""" - - :param int batch_size: 每个batch的大小 - :return: - """ - self.batch_size = batch_size - - def __iter__(self): - if self.batch_size is None: - raise RuntimeError("batch_size is None.") - seq_lens = self.dataset.get_all_fields()[self.seq_len_field_name].content - total_sample_num = len(seq_lens) - - bucket_indexes = [] - assert total_sample_num >= self.num_buckets, "The number of samples is smaller than the number of buckets." - num_sample_per_bucket = total_sample_num // self.num_buckets - for i in range(self.num_buckets): - bucket_indexes.append([num_sample_per_bucket * i, num_sample_per_bucket * (i + 1)]) - bucket_indexes[-1][1] = total_sample_num - - sorted_seq_lens = list(sorted([(idx, seq_len) for - idx, seq_len in zip(range(total_sample_num), seq_lens)], - key=lambda x: x[1])) - - batchs = [] - - left_init_indexes = [] - for b_idx in range(self.num_buckets): - start_idx = bucket_indexes[b_idx][0] - end_idx = bucket_indexes[b_idx][1] - sorted_bucket_seq_lens = sorted_seq_lens[start_idx:end_idx] - left_init_indexes.extend([tup[0] for tup in sorted_bucket_seq_lens]) - num_batch_per_bucket = len(left_init_indexes) // self.batch_size - np.random.shuffle(left_init_indexes) - for i in range(num_batch_per_bucket): - batchs.append(left_init_indexes[i * self.batch_size:(i + 1) * self.batch_size]) - left_init_indexes = left_init_indexes[num_batch_per_bucket * self.batch_size:] - if (left_init_indexes) != 0: - batchs.append(left_init_indexes) - np.random.shuffle(batchs) - - return chain(*batchs) - - -class ConstTokenNumSampler(Sampler): - """ - 尽量保证每个batch的输入token数量是接近的。 - - """ - - def __init__(self, dataset, seq_len_field_name: List[int], max_token: int = 4096, max_sentence: int = -1, - need_be_multiple_of: int = 1, num_bucket: int = -1) -> None: - """ - - :param dataset: - :param List[int] seq_len_field_name: 哪个field指示的sample的长度 - :param int max_token: 每个batch的最大的token数量 - :param int max_sentence: 每个batch最多多少个instance, -1表示根据max_token决定 - :param int need_be_multiple_of: 生成的batch的instance的数量需要是几的倍数,在DataParallel场景下会用到 - :param int num_bucket: 将数据按长度拆分为num_bucket个bucket,batch中的sample尽量在bucket之中进行组合,这样可以减少padding。 - """ - assert (max_sentence != -1 and max_sentence >= need_be_multiple_of) or max_sentence < 1 - self.dataset = dataset - self.seq_len_field_name = seq_len_field_name - self.num_bucket = num_bucket - self.max_token = max_token - self._max_sentence = max_sentence - self.need_be_multiple_of = need_be_multiple_of - - assert len(self.dataset) > self.num_bucket, "The number of samples should be larger than buckets." - seq_len = self.dataset.get_field(self.seq_len_field_name) - self.seq_len = seq_len - seq_len_indice = [(length, i) for i, length in enumerate(seq_len)] - seq_len_indice.sort(key=lambda x: x[0]) - indice_in_buckets = [] - if self.num_bucket > 0: - sample_per_bucket = len(seq_len_indice) // self.num_bucket - i = 0 - while len(indice_in_buckets) < len(seq_len_indice): - indice_in_buckets.append(seq_len_indice[i * sample_per_bucket:(i + 1) * sample_per_bucket]) - i += 1 - else: - indice_in_buckets = [seq_len_indice] - self.indice_in_buckets = indice_in_buckets - self.get_new_order() - - @property - def max_sentence(self): - if self._max_sentence < 1: - return 100000000 - return self._max_sentence - - @max_sentence.setter - def max_sentence(self, max_sentence): - self._max_sentence = max_sentence - - def get_new_order(self) -> None: - np.random.shuffle(self.indice_in_buckets) - for bucket in self.indice_in_buckets: - np.random.shuffle(bucket) - indices = list(chain(*self.indice_in_buckets)) - batches = [] - cur_max_len = 0 - batch = [] - for length, i in indices: - max_len = max(length, cur_max_len) - if max_len * (len(batch) + 1) > self.max_token or len(batch) >= self.max_sentence: - left_sample = len(batch) % self.need_be_multiple_of - add_samples = batch.copy() - cur_max_len = length - if left_sample != 0: - add_samples = add_samples[:-left_sample] - batch = batch[-left_sample:] - cur_max_len = max(cur_max_len, max(batch)) - else: - batch = [] - if len(add_samples) == 0: - raise RuntimeError( - f"The sample `{i}` is too long to make a batch with {self.need_be_multiple_of} samples.") - batches.append(add_samples) - else: - cur_max_len = max_len - batch.append(i) - if batch: - left_sample = len(batch) % self.need_be_multiple_of - add_samples = batch.copy() - if left_sample != 0: - add_samples = add_samples[:-left_sample].copy() - if add_samples: - batches.append(add_samples) - np.random.shuffle(batches) - self.batches = batches - - def __iter__(self) -> Iterable[int]: - for batch in self.batches: - yield batch - self.get_new_order() - - def __len__(self): - return len(self.batches) - - -class ConstantTokenNumSampler: - """ - 尽量保证每个batch的输入token数量是接近的。 - - """ - - def __init__(self, seq_len, max_token: List[int] = 4096, max_sentence: int = -1, - need_be_multiple_of: int = 1, num_bucket: int = -1) -> None: - """ - - :param List[int] seq_len: list[int], 是每个sample的长度。一般可以通过dataset.get_field('seq_len').content传入 - :param int max_token: 每个batch的最大的token数量 - :param int max_sentence: 每个batch最多多少个instance, -1表示根据max_token决定 - :param int need_be_multiple_of: 生成的batch的instance的数量需要是几的倍数,在DataParallel场景下会用到 - :param int num_bucket: 将数据按长度拆分为num_bucket个bucket,batch中的sample尽量在bucket之中进行组合,这样可以减少padding。 - """ - assert (max_sentence != -1 and max_sentence >= need_be_multiple_of) or max_sentence < 1 - assert len(seq_len) > num_bucket, "The number of samples should be larger than buckets." - self.seq_len = seq_len - self.max_token = max_token - self._max_sentence = max_sentence - self.need_be_multiple_of = need_be_multiple_of - seq_len_indice = [(length, i) for i, length in enumerate(seq_len)] - seq_len_indice.sort(key=lambda x: x[0]) - indice_in_buckets = [] - if num_bucket > 0: - sample_per_bucket = len(seq_len_indice) // num_bucket - i = 0 - while len(indice_in_buckets) < len(seq_len_indice): - indice_in_buckets.append(seq_len_indice[i * sample_per_bucket:(i + 1) * sample_per_bucket]) - i += 1 - else: - indice_in_buckets = [seq_len_indice] - self.indice_in_buckets = indice_in_buckets - self.get_new_order() - - @property - def max_sentence(self): - if self._max_sentence < 1: - return 100000000 - return self._max_sentence - - @max_sentence.setter - def max_sentence(self, max_sentence): - self._max_sentence = max_sentence - - def get_new_order(self) -> None: - np.random.shuffle(self.indice_in_buckets) - for bucket in self.indice_in_buckets: - np.random.shuffle(bucket) - indices = list(chain(*self.indice_in_buckets)) - batches = [] - cur_max_len = 0 - batch = [] - for length, i in indices: - max_len = max(length, cur_max_len) - if max_len * (len(batch) + 1) > self.max_token or len(batch) >= self.max_sentence: - left_sample = len(batch) % self.need_be_multiple_of - add_samples = batch.copy() - cur_max_len = length - if left_sample != 0: - add_samples = add_samples[:-left_sample] - batch = batch[-left_sample:] - cur_max_len = max(cur_max_len, max(batch)) - else: - batch = [] - if len(add_samples) == 0: - raise RuntimeError( - f"The sample `{i}` is too long to make a batch with {self.need_be_multiple_of} samples.") - batches.append(add_samples) - else: - cur_max_len = max_len - batch.append(i) - if batch: - left_sample = len(batch) % self.need_be_multiple_of - add_samples = batch.copy() - if left_sample != 0: - add_samples = add_samples[:-left_sample].copy() - if add_samples: - batches.append(add_samples) - np.random.shuffle(batches) - self.batches = batches - - def __iter__(self) -> Iterable[int]: - for batch in self.batches: - yield batch - self.get_new_order() - - def __len__(self): - return len(self.batches) - - -class SortedSampler(Sampler): - r""" - 按照sample的长度进行排序,主要在测试的时候使用,可以加速测试(因为减少了padding) - """ - - def __init__(self, dataset, seq_len_field_name: str = 'seq_len', descending: bool = True) -> None: - """ - - :param str seq_len_field_name: 按哪个field进行排序。如果传入的field是数字,则直接按照该数字大小排序;如果传入的field不是 - 数字,则使用该field的长度进行排序 - :param bool descending: 是否降序排列 - """ - self.dataset = dataset - self.seq_len_field_name = seq_len_field_name - self.descending = descending - - def __iter__(self) -> Iterable[int]: - seq_lens = self.dataset.get_field(self.seq_len_field_name).content - try: - seq_lens = list(map(len, seq_lens)) - except: - pass - - orders = np.argsort(seq_lens).tolist() # 从小到大的顺序 - if self.descending: - orders = orders[::-1] - for order in orders: - yield order - - -def simple_sort_bucketing(lengths): - r""" - - :param lengths: list of int, the lengths of all examples. - :return data: 2-level list - :: - - [ - [index_11, index_12, ...], # bucket 1 - [index_21, index_22, ...], # bucket 2 - ... - ] - - """ - lengths_mapping = [(idx, length) for idx, length in enumerate(lengths)] - sorted_lengths = sorted(lengths_mapping, key=lambda x: x[1]) - # TODO: need to return buckets - return [idx for idx, _ in sorted_lengths] - - -def k_means_1d(x, k, max_iter=100): - r"""Perform k-means on 1-D data. - - :param x: list of int, representing points in 1-D. - :param k: the number of clusters required. - :param max_iter: maximum iteration - :return centroids: numpy array, centroids of the k clusters - assignment: numpy array, 1-D, the bucket id assigned to each example. - """ - sorted_x = sorted(list(set(x))) - x = np.array(x) - if len(sorted_x) < k: - raise ValueError("too few buckets") - gap = len(sorted_x) / k - - centroids = np.array([sorted_x[int(x * gap)] for x in range(k)]) - assign = None - - for i in range(max_iter): - # Cluster Assignment step - assign = np.array([np.argmin([np.absolute(x_i - x) for x in centroids]) for x_i in x]) - # Move centroids step - new_centroids = np.array([x[assign == k].mean() for k in range(k)]) - if (new_centroids == centroids).all(): - centroids = new_centroids - break - centroids = new_centroids - return np.array(centroids), assign - - -def k_means_bucketing(lengths, buckets): - r"""Assign all instances into possible buckets using k-means, such that instances in the same bucket have similar lengths. - - :param lengths: list of int, the length of all samples. - :param buckets: list of int. The length of the list is the number of buckets. Each integer is the maximum length - threshold for each bucket (This is usually None.). - :return data: 2-level list - :: - - [ - [index_11, index_12, ...], # bucket 1 - [index_21, index_22, ...], # bucket 2 - ... - ] - - """ - bucket_data = [[] for _ in buckets] - num_buckets = len(buckets) - _, assignments = k_means_1d(lengths, num_buckets) - - for idx, bucket_id in enumerate(assignments): - if buckets[bucket_id] is None or lengths[idx] <= buckets[bucket_id]: - bucket_data[bucket_id].append(idx) - return bucket_data diff --git a/fastNLP/io/loader/conll.py b/fastNLP/io/loader/conll.py index e099331f..90045e46 100644 --- a/fastNLP/io/loader/conll.py +++ b/fastNLP/io/loader/conll.py @@ -50,8 +50,6 @@ class ConllLoader(Loader): ConllLoader返回的DataSet的field由传入的headers确定。 - 数据中以"-DOCSTART-"开头的行将被忽略,因为该符号在conll 2003中被用为文档分割符。 - """ def __init__(self, headers, sep=None, indexes=None, dropna=True): @@ -93,6 +91,7 @@ class ConllLoader(Loader): class Conll2003Loader(ConllLoader): r""" 用于读取conll2003任务的数据。数据的内容应该类似与以下的内容, 第一列为raw_words, 第二列为pos, 第三列为chunking,第四列为ner。 + 数据中以"-DOCSTART-"开头的行将被忽略,因为该符号在conll 2003中被用为文档分割符。 Example:: diff --git a/tests/core/callbacks/torch_callbacks/__init__.py b/tests/core/callbacks/torch_callbacks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/core/callbacks/torch_callbacks/test_torch_grad_clip_callback.py b/tests/core/callbacks/torch_callbacks/test_torch_grad_clip_callback.py new file mode 100644 index 00000000..7f2016e2 --- /dev/null +++ b/tests/core/callbacks/torch_callbacks/test_torch_grad_clip_callback.py @@ -0,0 +1,41 @@ +import pytest +import numpy as np + +from fastNLP.core.callbacks import TorchGradClipCallback, Callback +from fastNLP import Trainer +from fastNLP.envs.imports import _NEED_IMPORT_TORCH + +if _NEED_IMPORT_TORCH: + import torch + +from tests.helpers.callbacks.prepare_trainer_args_for_torch_test import get_trainer_args + + +class CheckClipCallback(Callback): + def __init__(self, parameters, clip_type, clip_value): + self.parameters = parameters + self.clip_type = clip_type + self.clip_value = clip_value + + def on_after_optimizers_step(self, trainer, optimizers): + for param in self.parameters: + if self.clip_type == 'value': + assert param.grad.max().item()<=self.clip_value + else: + assert np.linalg.norm(param.grad.cpu().view(-1).numpy())<=self.clip_value + + +@pytest.mark.parametrize('accumulation_steps', [1, 3, 5]) +@pytest.mark.parametrize('fp16', [True, False]) +@pytest.mark.parametrize('clip_type', ['norm', 'value']) +@pytest.mark.parametrize('clip_value', [1, 2]) +def test_torch_grad_clip_callback(accumulation_steps, fp16, clip_type, clip_value): + if not torch.cuda.is_available() and fp16: + pytest.skip("No cuda, cannot test fp16.") + device = 'cuda' if fp16 else 'cpu' + kwargs = get_trainer_args(lr=1, device=device) + callbacks = [] + callbacks.append(TorchGradClipCallback(clip_value=clip_value, clip_type=clip_type)) + callbacks.append(CheckClipCallback(kwargs['model'].parameters(), clip_type, clip_value)) + trainer = Trainer(**kwargs, callbacks=callbacks, fp16=fp16) + trainer.run() diff --git a/tests/core/callbacks/torch_callbacks/test_torch_warmup_callback.py b/tests/core/callbacks/torch_callbacks/test_torch_warmup_callback.py new file mode 100644 index 00000000..6367c458 --- /dev/null +++ b/tests/core/callbacks/torch_callbacks/test_torch_warmup_callback.py @@ -0,0 +1,34 @@ +import pytest +import numpy as np + +from fastNLP.core.callbacks import TorchWarmupCallback, Callback +from fastNLP import Trainer + +from tests.helpers.callbacks.prepare_trainer_args_for_torch_test import get_trainer_args + + +class RecordLrCallback(Callback): + def __init__(self): + self.lrs = [] + + def on_after_optimizers_step(self, trainer, optimizers): + self.lrs.append(trainer.driver.optimizers[0].param_groups[0]['lr']) + + +@pytest.mark.parametrize('warmup', [5, 0.1]) +@pytest.mark.parametrize('schedule', ['constant', 'linear']) +@pytest.mark.parametrize('accumulation_steps', [1, 3, 4]) +def test_torch_warmup_callback(warmup, schedule, accumulation_steps): + kwargs = get_trainer_args(lr=0.1, bsz=4) + callback = TorchWarmupCallback(warmup, schedule) + r_callback = RecordLrCallback() + kwargs['callbacks'] = [callback, r_callback] + trainer = Trainer(**kwargs, accumulation_steps=accumulation_steps) + trainer.run() + + if schedule == 'linear': + assert kwargs['optimizers'].param_groups[0]['lr'] <= 0.01 + elif schedule == 'constant': + assert np.allclose(0.1, kwargs['optimizers'].param_groups[0]['lr']) + + assert len(r_callback.lrs)<=trainer.total_batches//accumulation_steps+1 \ No newline at end of file diff --git a/tests/core/samplers/test_sampler.py b/tests/core/samplers/test_sampler.py deleted file mode 100644 index 63d8e860..00000000 --- a/tests/core/samplers/test_sampler.py +++ /dev/null @@ -1,31 +0,0 @@ -import unittest -import random -from fastNLP.core.samplers import SequentialSampler, RandomSampler, BucketSampler -from fastNLP.core.dataset import DataSet -from array import array -import torch - -from fastNLP.core.samplers.sampler import ReproduceBatchSampler -from fastNLP.core.drivers.torch_driver.utils import replace_batch_sampler -from tests.helpers.datasets.torch_data import TorchNormalDataset - - -class SamplerTest(unittest.TestCase): - - def test_sequentialsampler(self): - ds = DataSet({'x': [1, 2, 3, 4] * 10}) - sqspl = SequentialSampler(ds) - for idx, inst in enumerate(sqspl): - self.assertEqual(idx, inst) - - def test_randomsampler(self): - ds = DataSet({'x': [1, 2, 3, 4] * 10}) - rdspl = RandomSampler(ds) - ans = [ds[i] for i in rdspl] - self.assertEqual(len(ans), len(ds)) - - def test_bucketsampler(self): - data_set = DataSet({"x": [[0] * random.randint(1, 10)] * 10, "y": [[5, 6]] * 10}) - sampler = BucketSampler(data_set, num_buckets=3, batch_size=16, seq_len_field_name="seq_len") - - diff --git a/tests/helpers/callbacks/prepare_trainer_args_for_torch_test.py b/tests/helpers/callbacks/prepare_trainer_args_for_torch_test.py new file mode 100644 index 00000000..01544e0a --- /dev/null +++ b/tests/helpers/callbacks/prepare_trainer_args_for_torch_test.py @@ -0,0 +1,68 @@ + +""" +这个文件主要用于提供测试 callback 时的 Trainer 的参数,可以直接使用进行对Trainer进行初始化。只需要再额外传入相应的callback就可以运行 + +""" + +from fastNLP.envs.imports import _NEED_IMPORT_TORCH +from fastNLP.core.metrics import Accuracy + + +if _NEED_IMPORT_TORCH: + import torch + from torch import nn + from torch.utils.data import DataLoader + import torch.nn.functional as F + + class DataSet: + def __init__(self, num_samples=1000, num_features=10): + g = torch.Generator() + g.manual_seed(1000) + self.data = torch.randn(num_samples, num_features, generator=g) + self.y = self.data.argmax(dim=-1) + + def __getitem__(self, item): + return {'x': self.data[item], 'target': self.y[item]} + + def __len__(self): + return len(self.data) + + + class Model(nn.Module): + def __init__(self, num_features=5): + super().__init__() + self.mlps = nn.Sequential( + nn.Linear(num_features, 20), + nn.ReLU(), + nn.Linear(20, 20), + nn.Dropout(p=0.3), + nn.ReLU(), + nn.Linear(20, num_features) + ) + + def forward(self, x, target): + y = self.mlps(x) + if self.training: + return {'loss': F.cross_entropy(y, target)} + return {'pred': y} + + +def get_trainer_args(num_features=5, num_samples=20, bsz=4, lr=0.1, n_epochs=5, device=None): + ds = DataSet(num_samples=num_samples, num_features=num_features) + dl = DataLoader(ds, batch_size=bsz) + model = Model(num_features=num_features) + + optimizer = torch.optim.SGD(model.parameters(), lr=lr) + + kwargs = { + 'model': model, + 'driver': 'torch', + 'device': device, + 'optimizers': optimizer, + 'train_dataloader': dl, + 'evaluate_dataloaders': dl, + 'metrics': {'acc': Accuracy()}, + 'n_epochs': n_epochs + } + + return kwargs \ No newline at end of file