@@ -19,7 +19,7 @@ from fastNLP.core.utils import ( | |||||
paddle_move_data_to_device, | paddle_move_data_to_device, | ||||
is_in_paddle_dist, | is_in_paddle_dist, | ||||
) | ) | ||||
from fastNLP.core.samplers import ReproducibleIterator, RandomSampler, UnrepeatedDistributedSampler | |||||
from fastNLP.core.samplers import ReproducibleIterator, RandomSampler, UnrepeatedSampler | |||||
from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK, USER_CUDA_VISIBLE_DEVICES | from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK, USER_CUDA_VISIBLE_DEVICES | ||||
from fastNLP.core.log import logger | from fastNLP.core.log import logger | ||||
@@ -362,7 +362,7 @@ class PaddleFleetDriver(PaddleDriver): | |||||
return dataloader | return dataloader | ||||
# evaluator | # evaluator | ||||
elif dist == "unrepeatdist": | elif dist == "unrepeatdist": | ||||
sampler = UnrepeatedDistributedSampler( | |||||
sampler = UnrepeatedSampler( | |||||
dataset=dataloader.dataset, | dataset=dataloader.dataset, | ||||
shuffle=shuffle, | shuffle=shuffle, | ||||
seed=int(os.environ.get("FASTNLP_SEED", 0)) | seed=int(os.environ.get("FASTNLP_SEED", 0)) | ||||
@@ -28,7 +28,7 @@ from fastNLP.core.drivers.torch_driver.utils import ( | |||||
) | ) | ||||
from fastNLP.core.drivers.utils import distributed_open_proc | from fastNLP.core.drivers.utils import distributed_open_proc | ||||
from fastNLP.core.utils import auto_param_call, check_user_specific_params | from fastNLP.core.utils import auto_param_call, check_user_specific_params | ||||
from fastNLP.core.samplers import ReproducibleIterator, RandomSampler, UnrepeatedDistributedSampler, ReproducibleBatchSampler | |||||
from fastNLP.core.samplers import ReproducibleIterator, RandomSampler, UnrepeatedSampler, ReproducibleBatchSampler | |||||
from fastNLP.envs import FASTNLP_DISTRIBUTED_CHECK, FASTNLP_GLOBAL_RANK, FASTNLP_GLOBAL_SEED | from fastNLP.envs import FASTNLP_DISTRIBUTED_CHECK, FASTNLP_GLOBAL_RANK, FASTNLP_GLOBAL_SEED | ||||
from fastNLP.core.log import logger | from fastNLP.core.log import logger | ||||
from fastNLP.core.drivers.torch_driver.dist_utils import fastnlp_torch_all_gather, fastnlp_torch_broadcast_object | from fastNLP.core.drivers.torch_driver.dist_utils import fastnlp_torch_all_gather, fastnlp_torch_broadcast_object | ||||
@@ -513,7 +513,7 @@ class TorchDDPDriver(TorchDriver): | |||||
args = self.get_dataloader_args(dataloader) | args = self.get_dataloader_args(dataloader) | ||||
# todo 判断 batch_sampler; | # todo 判断 batch_sampler; | ||||
sampler = UnrepeatedDistributedSampler( | |||||
sampler = UnrepeatedSampler( | |||||
dataset=args.dataset, | dataset=args.dataset, | ||||
shuffle=args.shuffle, | shuffle=args.shuffle, | ||||
) | ) | ||||
@@ -3,19 +3,24 @@ __all__ = [ | |||||
'SortedSampler', | 'SortedSampler', | ||||
'ConstTokenNumSampler', | 'ConstTokenNumSampler', | ||||
'ConstantTokenNumSampler', | 'ConstantTokenNumSampler', | ||||
'UnrepeatedDistributedSampler', | |||||
'MixSampler', | 'MixSampler', | ||||
'InnerSampler', | |||||
'DopedSampler', | 'DopedSampler', | ||||
'MixSequentialSampler', | 'MixSequentialSampler', | ||||
'PollingSampler', | 'PollingSampler', | ||||
'ReproducibleIterator', | 'ReproducibleIterator', | ||||
'RandomSampler', | 'RandomSampler', | ||||
're_instantiate_sampler' | |||||
're_instantiate_sampler', | |||||
'UnrepeatedSampler', | |||||
"UnrepeatedSortedSampler" | |||||
] | ] | ||||
from .sampler import BucketSampler, SortedSampler, ConstTokenNumSampler, ConstantTokenNumSampler, UnrepeatedDistributedSampler | |||||
from .mix_sampler import MixSampler, InnerSampler, DopedSampler, MixSequentialSampler, PollingSampler | |||||
from .sampler import BucketSampler, SortedSampler, ConstTokenNumSampler, ConstantTokenNumSampler | |||||
from .unrepeated_sampler import UnrepeatedSampler, UnrepeatedSortedSampler | |||||
from .mix_sampler import MixSampler, DopedSampler, MixSequentialSampler, PollingSampler | |||||
from .reproducible_sampler import ReproducibleIterator, RandomSampler, re_instantiate_sampler | from .reproducible_sampler import ReproducibleIterator, RandomSampler, re_instantiate_sampler | ||||
from .reproducible_batch_sampler import ReproducibleBatchSampler, BucketedBatchSampler | from .reproducible_batch_sampler import ReproducibleBatchSampler, BucketedBatchSampler | ||||
@@ -4,7 +4,6 @@ from typing import Union, List, Iterable, Dict | |||||
__all__ = [ | __all__ = [ | ||||
'MixSampler', | 'MixSampler', | ||||
'InnerSampler', | |||||
'DopedSampler', | 'DopedSampler', | ||||
'MixSequentialSampler', | 'MixSequentialSampler', | ||||
'PollingSampler' | 'PollingSampler' | ||||
@@ -7,7 +7,6 @@ __all__ = [ | |||||
"SortedSampler", | "SortedSampler", | ||||
'ConstTokenNumSampler', | 'ConstTokenNumSampler', | ||||
"ConstantTokenNumSampler", | "ConstantTokenNumSampler", | ||||
"UnrepeatedDistributedSampler", | |||||
] | ] | ||||
from itertools import chain | from itertools import chain | ||||
@@ -18,7 +17,7 @@ import numpy as np | |||||
from fastNLP.envs.imports import _NEED_IMPORT_TORCH | from fastNLP.envs.imports import _NEED_IMPORT_TORCH | ||||
if _NEED_IMPORT_TORCH: | if _NEED_IMPORT_TORCH: | ||||
from torch.utils.data import SequentialSampler, Sampler, RandomSampler | |||||
from torch.utils.data import Sampler | |||||
else: | else: | ||||
from fastNLP.core.utils.dummy_class import DummyClass as Sampler | from fastNLP.core.utils.dummy_class import DummyClass as Sampler | ||||
@@ -727,87 +726,3 @@ def k_means_bucketing(lengths, buckets): | |||||
if buckets[bucket_id] is None or lengths[idx] <= buckets[bucket_id]: | if buckets[bucket_id] is None or lengths[idx] <= buckets[bucket_id]: | ||||
bucket_data[bucket_id].append(idx) | bucket_data[bucket_id].append(idx) | ||||
return bucket_data | return bucket_data | ||||
class UnrepeatedDistributedSampler: | |||||
def __init__(self, dataset, shuffle: bool = False, seed: int = 0): | |||||
""" | |||||
考虑在多卡evaluate的场景下,不能重复sample。 | |||||
:param dataset: | |||||
:param shuffle: | |||||
:param seed: | |||||
""" | |||||
self.dataset = dataset | |||||
self.shuffle = shuffle | |||||
self.seed = seed | |||||
# 多卡的相关的参数 | |||||
self.num_replicas = 1 | |||||
self.rank = 0 | |||||
self.epoch = -1 | |||||
def __len__(self): | |||||
""" | |||||
返回 sampler 一次完整的迭代过程会产生多少个index。多卡的情况下,只考虑当前rank; | |||||
:return: | |||||
""" | |||||
num_common = len(self.dataset)//self.num_replicas | |||||
self.num_samples = num_common + int(self.rank < (len(self.dataset)-num_common*self.num_replicas)) | |||||
return self.num_samples | |||||
def __iter__(self): | |||||
r""" | |||||
当前使用num_consumed_samples做法会在交替使用的时候遇到问题; | |||||
Example: | |||||
>>> sampler = RandomSampler() | |||||
>>> iter1 = iter(sampler) | |||||
>>> iter2 = iter(sampler) | |||||
>>> next(iter1) | |||||
>>> next(iter2) # 当前num_consumed_samples的数量会发生变化 | |||||
""" | |||||
indices = self.generate_indices() | |||||
# subsample | |||||
indices = indices[self.rank:len(indices):self.num_replicas] | |||||
assert len(indices) == len(self) | |||||
for index in indices: | |||||
yield index | |||||
def generate_indices(self) -> List[int]: | |||||
""" | |||||
生成随机序列 | |||||
:return: | |||||
""" | |||||
if self.shuffle: | |||||
indices = list(range(len(self.dataset))) | |||||
seed = self.seed + self.epoch | |||||
rng = np.random.default_rng(abs(seed)) | |||||
rng.shuffle(indices) | |||||
if self.epoch < 0: # 防止用户忘记调用 set_epoch,至少这样可以保证每次epoch出来的index顺序不同。 | |||||
self.epoch -= 1 | |||||
else: | |||||
indices = list(range(len(self.dataset))) | |||||
return indices | |||||
def set_epoch(self, epoch: int) -> None: | |||||
self.epoch = epoch | |||||
def set_distributed(self, num_replicas, rank): | |||||
""" | |||||
该方法本质上等同于 ddp 情形下的没有完成的初始化,应当在初始化该 sampler 本身后立即被调用; | |||||
:param num_replicas: | |||||
:param rank: | |||||
:return: | |||||
""" | |||||
assert num_replicas>0 and isinstance(num_replicas, int) | |||||
assert isinstance(rank, int) and 0<=rank<num_replicas | |||||
# 注意初始化该函数时,所有的状态都应当默认是一个 epoch 刚开始训练的状态; | |||||
self.num_replicas = num_replicas | |||||
self.rank = rank | |||||
return self |
@@ -0,0 +1,114 @@ | |||||
__all__ = [ | |||||
'UnrepeatedSortedSampler', | |||||
'UnrepeatedSampler' | |||||
] | |||||
from typing import List, Union | |||||
from fastNLP.core.dataset import DataSet | |||||
import numpy as np | |||||
class UnrepeatedSampler: | |||||
def __init__(self, dataset, shuffle: bool = False, seed: int = 0, **kwargs): | |||||
""" | |||||
考虑在多卡evaluate的场景下,不能重复sample。 | |||||
:param dataset: | |||||
:param shuffle: | |||||
:param seed: | |||||
""" | |||||
self.dataset = dataset | |||||
self.shuffle = shuffle | |||||
self.seed = seed | |||||
# 多卡的相关的参数 | |||||
self.num_replicas = kwargs.get('num_replicas', 1) | |||||
self.rank = kwargs.get('rank', 0) | |||||
self.epoch = kwargs.get('epoch', -1) | |||||
def __len__(self): | |||||
""" | |||||
返回 sampler 一次完整的迭代过程会产生多少个index。多卡的情况下,只考虑当前rank; | |||||
:return: | |||||
""" | |||||
num_common = len(self.dataset)//self.num_replicas | |||||
self.num_samples = num_common + int(self.rank < (len(self.dataset)-num_common*self.num_replicas)) | |||||
return self.num_samples | |||||
def __iter__(self): | |||||
indices = self.generate_indices() | |||||
# subsample | |||||
indices = indices[self.rank:len(indices):self.num_replicas] | |||||
assert len(indices) == len(self) | |||||
for index in indices: | |||||
yield index | |||||
def generate_indices(self) -> List[int]: | |||||
""" | |||||
生成随机序列 | |||||
:return: | |||||
""" | |||||
if self.shuffle: | |||||
indices = list(range(len(self.dataset))) | |||||
seed = self.seed + self.epoch | |||||
rng = np.random.default_rng(abs(seed)) | |||||
rng.shuffle(indices) | |||||
if self.epoch < 0: # 防止用户忘记调用 set_epoch,至少这样可以保证每次epoch出来的index顺序不同。 | |||||
self.epoch -= 1 | |||||
else: | |||||
indices = list(range(len(self.dataset))) | |||||
return indices | |||||
def set_epoch(self, epoch: int) -> None: | |||||
self.epoch = epoch | |||||
def set_distributed(self, num_replicas, rank): | |||||
""" | |||||
该方法本质上等同于 ddp 情形下的没有完成的初始化,应当在初始化该 sampler 本身后立即被调用; | |||||
:param num_replicas: | |||||
:param rank: | |||||
:return: | |||||
""" | |||||
assert num_replicas>0 and isinstance(num_replicas, int) | |||||
assert isinstance(rank, int) and 0<=rank<num_replicas | |||||
# 注意初始化该函数时,所有的状态都应当默认是一个 epoch 刚开始训练的状态; | |||||
self.num_replicas = num_replicas | |||||
self.rank = rank | |||||
return self | |||||
class UnrepeatedSortedSampler(UnrepeatedSampler): | |||||
def __init__(self, dataset, length:Union[str, List], seed: int = 0): | |||||
""" | |||||
将 dataset 中的数据根据 length 从长到短进行迭代,并且保证在多卡场景下数据不重复。本 sampler 可能导致各个机器上的 | |||||
batch 数量不完全一致。 | |||||
:param dataset: 实现了 __len__ 方法的数据容器。 | |||||
:param length: 如果为 List,应当与 dataset 有一样的长度,表示 dataset 中每个元素的数量;仅当传入的 dataset 为 fastNLP 的 | |||||
DataSet 时支持传入 str,会将该str理解为 dataset 的 field 名称,若 field 中的元素为 int,则认为该值是 sample 的长度。 | |||||
:param shuffle: 如果为 True,将不进行 shuffle,实际上数据会以从长到短的方式输出。 | |||||
:param seed: 设置的随机数种子 | |||||
:param kwargs: fastNLP 保留使用 | |||||
""" | |||||
super().__init__(dataset=dataset, shuffle=False, seed=seed) | |||||
if isinstance(dataset, DataSet): | |||||
length = dataset.get_field(length) | |||||
if not isinstance(length[0], int): | |||||
length = list(map(len, length)) | |||||
else: | |||||
assert len(length) == len(dataset), "When the dataset is not fastNLP.DataSet, " \ | |||||
"the length parameter can only be List[int]" | |||||
assert len(length) == len(dataset), "The length of `data` and `length` should be equal." | |||||
self.length = np.array(length, dtype=int) # 按照长到短排列的序号。 | |||||
self.sorted_indices = np.argsort(self.length)[::-1].tolist() # 按长度从高到低排序的 | |||||
def generate_indices(self) -> List[int]: | |||||
return self.sorted_indices |
@@ -0,0 +1,64 @@ | |||||
from itertools import chain | |||||
import pytest | |||||
from fastNLP.core.samplers import UnrepeatedSampler, UnrepeatedSortedSampler | |||||
class DatasetWithVaryLength: | |||||
def __init__(self, num_of_data=100): | |||||
self.data = list(range(num_of_data)) | |||||
def __getitem__(self, item): | |||||
return self.data[item] | |||||
def __len__(self): | |||||
return len(self.data) | |||||
class TestUnrepeatedSampler: | |||||
@pytest.mark.parametrize('shuffle', [True, False]) | |||||
def test_single(self, shuffle): | |||||
num_of_data = 100 | |||||
data = DatasetWithVaryLength(num_of_data) | |||||
sampler = UnrepeatedSampler(data, shuffle) | |||||
indexes = set(sampler) | |||||
assert indexes==set(range(num_of_data)) | |||||
@pytest.mark.parametrize('num_replica', [2, 3]) | |||||
@pytest.mark.parametrize('num_of_data', [2, 3, 4, 100]) | |||||
@pytest.mark.parametrize('shuffle', [False, True]) | |||||
def test_multi(self, num_replica, num_of_data, shuffle): | |||||
data = DatasetWithVaryLength(num_of_data=num_of_data) | |||||
samplers = [] | |||||
for i in range(num_replica): | |||||
sampler = UnrepeatedSampler(dataset=data, shuffle=shuffle) | |||||
sampler.set_distributed(num_replica, rank=i) | |||||
samplers.append(sampler) | |||||
indexes = set(chain(*samplers)) | |||||
assert indexes==set(range(num_of_data)) | |||||
class TestUnrepeatedSortedSampler: | |||||
@pytest.mark.parametrize('shuffle', [True, False]) | |||||
def test_single(self, shuffle): | |||||
num_of_data = 100 | |||||
data = DatasetWithVaryLength(num_of_data) | |||||
sampler = UnrepeatedSortedSampler(data, length=data.data) | |||||
indexes = list(sampler) | |||||
assert indexes==list(range(num_of_data-1, -1, -1)) | |||||
@pytest.mark.parametrize('num_replica', [2, 3]) | |||||
@pytest.mark.parametrize('num_of_data', [2, 3, 4, 100]) | |||||
@pytest.mark.parametrize('shuffle', [False, True]) | |||||
def test_multi(self, num_replica, num_of_data, shuffle): | |||||
data = DatasetWithVaryLength(num_of_data=num_of_data) | |||||
samplers = [] | |||||
for i in range(num_replica): | |||||
sampler = UnrepeatedSortedSampler(dataset=data, length=data.data) | |||||
sampler.set_distributed(num_replica, rank=i) | |||||
samplers.append(sampler) | |||||
indexes = set(chain(*samplers)) | |||||
assert indexes==set(range(num_of_data)) |