From 22a8702d225e5d39f526daa3c56bd2f16ff7500f Mon Sep 17 00:00:00 2001
From: yh
Date: Thu, 18 Jul 2019 23:43:10 +0800
Subject: [PATCH] 1. Trainer supports training with DistributedDataParallel
 (not yet widely tested, use with caution); 2. fix an `import os` bug;
 3. FitlogCallback works without being given any DataSet; 4.
 NullOptimizer.construct_from_pytorch returns self; 5. fix the pooled_cls
 bug in Bert
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 fastNLP/core/batch.py                  |  16 +++-
 fastNLP/core/callback.py               |   2 +-
 fastNLP/core/dataset.py                |  12 ++-
 fastNLP/core/losses.py                 |   2 +-
 fastNLP/core/optimizer.py              |   2 +-
 fastNLP/core/sampler.py                |   6 +-
 fastNLP/core/tester.py                 |  14 ++--
 fastNLP/core/trainer.py                | 104 ++++++++++++++++---------
 fastNLP/core/utils.py                  |  70 +++--------------
 fastNLP/embeddings/bert_embedding.py   |   6 +-
 fastNLP/embeddings/elmo_embedding.py   |   2 +-
 fastNLP/embeddings/static_embedding.py |   4 +-
 fastNLP/modules/encoder/bert.py        |   2 +
 fastNLP/modules/utils.py               |   4 +-
 test/core/test_dataset.py              |  11 +++
 test/core/test_field.py                |  10 +--
 16 files changed, 139 insertions(+), 128 deletions(-)

diff --git a/fastNLP/core/batch.py b/fastNLP/core/batch.py
index 64c5f48e..538f583a 100644
--- a/fastNLP/core/batch.py
+++ b/fastNLP/core/batch.py
@@ -93,9 +93,13 @@ class DataSetGetter:
 
 class SamplerAdapter(torch.utils.data.Sampler):
     def __init__(self, sampler, dataset):
+        super().__init__(dataset)
         self.sampler = sampler
         self.dataset = dataset
 
+    def __len__(self):
+        return len(self.dataset)
+
     def __iter__(self):
         return iter(self.sampler(self.dataset))
 
@@ -165,15 +169,19 @@ class DataSetIter(BatchIter):
                  timeout=0, worker_init_fn=None):
         super().__init__()
         assert isinstance(dataset, DataSet)
-        sampler = SamplerAdapter(sampler=sampler or SequentialSampler(), dataset=dataset)
+        if not isinstance(sampler, torch.utils.data.Sampler):
+            self.sampler = SamplerAdapter(sampler=sampler or SequentialSampler(), dataset=dataset)
+        else:
+            self.sampler = sampler
         dataset = DataSetGetter(dataset, as_numpy)
         collate_fn = dataset.collate_fn if hasattr(dataset, 'collate_fn') else None
         self.dataiter = torch.utils.data.DataLoader(
-            dataset=dataset, batch_size=batch_size, sampler=sampler,
+            dataset=dataset, batch_size=batch_size, sampler=self.sampler,
             collate_fn=collate_fn, num_workers=num_workers,
             pin_memory=pin_memory, drop_last=drop_last,
             timeout=timeout, worker_init_fn=worker_init_fn)
-        self.num_batches = self.get_num_batches(len(dataset), batch_size, drop_last)
+        # base the count on the sampler: with a DistributedSampler, each process only consumes its own share of the data
+        self.num_batches = self.get_num_batches(len(self.dataiter.sampler), batch_size, drop_last)
         self.batch_size = batch_size
 
 
@@ -182,7 +190,7 @@ class TorchLoaderIter(BatchIter):
         super().__init__()
         assert isinstance(dataset, torch.utils.data.DataLoader)
         self.dataiter = dataset
-        self.num_batches = self.get_num_batches(len(dataset), dataset.batch_size, dataset.drop_last)
+        self.num_batches = self.get_num_batches(len(dataset.sampler), dataset.batch_size, dataset.drop_last)
         self.batch_size = dataset.batch_size
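
The num_batches change above is the distributed-critical piece: a DistributedSampler only yields the current process's share of the data, so len(dataset) would overcount. A minimal sketch of the difference, assuming a hypothetical 4-process job (this snippet is illustrative and not part of the patch):

    import math
    from torch.utils.data import Dataset, DistributedSampler

    class ToyDataset(Dataset):
        """Hypothetical 100-item dataset used only for this illustration."""
        def __len__(self):
            return 100
        def __getitem__(self, idx):
            return idx

    # Passing num_replicas/rank explicitly avoids needing an initialized
    # process group; rank 0 of a 4-process job sees only a quarter of the data.
    sampler = DistributedSampler(ToyDataset(), num_replicas=4, rank=0)
    batch_size = 8
    print(len(sampler))                          # 25, not 100
    print(math.ceil(len(sampler) / batch_size))  # 4 batches per epoch on this process
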
diff --git a/fastNLP/core/callback.py b/fastNLP/core/callback.py
index 6f855397..874d0ad9 100644
--- a/fastNLP/core/callback.py
+++ b/fastNLP/core/callback.py
@@ -479,7 +479,7 @@ class FitlogCallback(Callback):
                     self.datasets[key] = value
         elif isinstance(data, DataSet):
             self.datasets['test'] = data
-        else:
+        elif data is not None:
             raise TypeError("data receives dict[DataSet] or DataSet object.")
         
         self.verbose = verbose
diff --git a/fastNLP/core/dataset.py b/fastNLP/core/dataset.py
index 7b7fa87a..2955eff6 100644
--- a/fastNLP/core/dataset.py
+++ b/fastNLP/core/dataset.py
@@ -487,7 +487,7 @@ class DataSet(object):
         """
         Delete the index-th instance.
         
-        :param int index: index of the instance to delete, starting from 0
+        :param int index: index of the instance to delete; numbering starts from 0.
         """
         assert isinstance(index, int), "Only integer supported."
         if len(self) <= index:
@@ -566,7 +566,7 @@ class DataSet(object):
             raise KeyError("DataSet has no field named {}.".format(old_name))
         return self
     
-    def set_target(self, *field_names, flag=True):
+    def set_target(self, *field_names, flag=True, use_1st_ins_infer_dim_type=True):
         """
         Mark the fields named in field_names as target
         
@@ -577,11 +577,14 @@ class DataSet(object):
         
         :param str field_names: names of the fields
         :param bool flag: set the target status of these fields to flag
+        :param bool use_1st_ins_infer_dim_type: if True, do not check that every entry of the column has the same
+            dimension and type; the column's type and dimension are inferred directly from the first row.
         """
         assert isinstance(flag, bool), "Only bool type supported."
         for name in field_names:
             if name in self.field_arrays:
                 try:
+                    self.field_arrays[name]._use_1st_ins_infer_dim_type = bool(use_1st_ins_infer_dim_type)
                     self.field_arrays[name].is_target = flag
                 except SetInputOrTargetException as e:
                     print(f"Cannot set field:{name} as target.")
@@ -589,7 +592,7 @@ class DataSet(object):
         else:
             raise KeyError("{} is not a valid field name.".format(name))
     
-    def set_input(self, *field_names, flag=True):
+    def set_input(self, *field_names, flag=True, use_1st_ins_infer_dim_type=True):
         """
         Mark the fields named in field_names as input::
         
@@ -598,10 +601,13 @@ class DataSet(object):
         
         :param str field_names: names of the fields
         :param bool flag: set the input status of these fields to flag
+        :param bool use_1st_ins_infer_dim_type: if True, do not check that every entry of the column has the same
+            dimension and type; the column's type and dimension are inferred directly from the first row.
         """
         for name in field_names:
             if name in self.field_arrays:
                 try:
+                    self.field_arrays[name]._use_1st_ins_infer_dim_type = bool(use_1st_ins_infer_dim_type)
                     self.field_arrays[name].is_input = flag
                 except SetInputOrTargetException as e:
                     print(f"Cannot set field:{name} as input, exception happens at the {e.index} value.")
diff --git a/fastNLP/core/losses.py b/fastNLP/core/losses.py
index 1f8923eb..21c024f0 100644
--- a/fastNLP/core/losses.py
+++ b/fastNLP/core/losses.py
@@ -225,7 +225,7 @@ class CrossEntropyLoss(LossBase):
     
     def get_loss(self, pred, target, seq_len=None):
         if pred.dim() > 2:
-            if pred.size(1) != target.size(1):
+            if pred.size(1) != target.size(1):  # the dimension order may have been swapped
                 pred = pred.transpose(1, 2)
             pred = pred.reshape(-1, pred.size(-1))
             target = target.reshape(-1)
diff --git a/fastNLP/core/optimizer.py b/fastNLP/core/optimizer.py
index 3036257c..e95047b4 100644
--- a/fastNLP/core/optimizer.py
+++ b/fastNLP/core/optimizer.py
@@ -49,7 +49,7 @@ class NullOptimizer(Optimizer):
         super().__init__(None)
     
     def construct_from_pytorch(self, model_params):
-        pass
+        return self
     
     def __getattr__(self, item):
         def pass_func(*args, **kwargs):
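
Returning self matters because callers typically keep the return value of construct_from_pytorch. A sketch of the pattern this fixes, assuming (the call site is not shown in this hunk) that Trainer rebinds its optimizer to whatever the method returns:

    import torch.nn as nn
    from fastNLP.core.optimizer import NullOptimizer

    model = nn.Linear(4, 2)

    # Under the old `pass`, optimizer would be None here and step() would crash.
    optimizer = NullOptimizer().construct_from_pytorch(model.parameters())
    optimizer.step()  # silently absorbed by the pass_func from NullOptimizer.__getattr__
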
diff --git a/fastNLP/core/sampler.py b/fastNLP/core/sampler.py
index d8ba1ad1..9ca04fa0 100644
--- a/fastNLP/core/sampler.py
+++ b/fastNLP/core/sampler.py
@@ -25,9 +25,9 @@ class Sampler(object):
     
     def __call__(self, data_set):
         """
-    :param DataSet data_set: the `DataSet` object, the data to sample from
-    :return result: list(int), a sequence of indices; elements of ``data_set`` are then fetched in the order given by ``result``
-    """
+        :param DataSet data_set: the `DataSet` object, the data to sample from
+        :return result: list(int), a sequence of indices; elements of ``data_set`` are then fetched in the order given by ``result``
+        """
         raise NotImplementedError
diff --git a/fastNLP/core/tester.py b/fastNLP/core/tester.py
index c1d270d1..3d672ccc 100644
--- a/fastNLP/core/tester.py
+++ b/fastNLP/core/tester.py
@@ -47,6 +47,7 @@ from .utils import _get_func_signature
 from .utils import _get_model_device
 from .utils import _move_model_to_device
 from ._parallel_utils import _data_parallel_wrapper
+from .utils import _model_contains_inner_module
 from functools import partial
 
 __all__ = [
@@ -83,9 +84,7 @@ class Tester(object):
     
     def __init__(self, data, model, metrics, batch_size=16, num_workers=0, device=None, verbose=1):
         super(Tester, self).__init__()
-
-        if not isinstance(data, DataSet):
-            raise TypeError(f"The type of data must be `fastNLP.DataSet`, got `{type(data)}`.")
+        
         if not isinstance(model, nn.Module):
             raise TypeError(f"The type of model must be `torch.nn.Module`, got `{type(model)}`.")
         
@@ -106,19 +105,22 @@ class Tester(object):
         
         # check predict
         if (hasattr(self._model, 'predict') and callable(self._model.predict)) or \
-                (isinstance(self._model, nn.DataParallel) and hasattr(self._model.module, 'predict') and
-                 callable(self._model.module.predict)):
+                (_model_contains_inner_module(self._model) and hasattr(self._model.module, 'predict') and
+                 callable(self._model.module.predict)):
             if isinstance(self._model, nn.DataParallel):
                 self._predict_func_wrapper = partial(_data_parallel_wrapper('predict',
                                                                             self._model.device_ids,
                                                                             self._model.output_device),
                                                      network=self._model.module)
+                self._predict_func = self._model.module.predict  # used for parameter matching
+            elif isinstance(self._model, nn.parallel.DistributedDataParallel):
                 self._predict_func = self._model.module.predict
+                self._predict_func_wrapper = self._model.module.predict  # used for the actual call
             else:
                 self._predict_func = self._model.predict
                 self._predict_func_wrapper = self._model.predict
         else:
-            if isinstance(self._model, nn.DataParallel):
+            if _model_contains_inner_module(model):
                 self._predict_func_wrapper = self._model.forward
                 self._predict_func = self._model.module.forward
             else:
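
The fastNLP/core/utils.py hunk that defines _model_contains_inner_module is not shown in this excerpt. An implementation consistent with every call site above would look roughly like the following; treat it as a guess, not the patch's actual text:

    import torch.nn as nn

    def _model_contains_inner_module(model):
        """True for wrapper modules (nn.DataParallel, nn.parallel.DistributedDataParallel)
        that keep the real network under model.module."""
        if isinstance(model, nn.Module):
            return isinstance(model, (nn.DataParallel, nn.parallel.DistributedDataParallel))
        return False
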
diff --git a/fastNLP/core/trainer.py b/fastNLP/core/trainer.py
index 671e2736..09e8a437 100644
--- a/fastNLP/core/trainer.py
+++ b/fastNLP/core/trainer.py
@@ -352,7 +352,7 @@
 from .utils import _move_dict_value_to_device
 from .utils import _get_func_signature
 from .utils import _get_model_device
 from .utils import _move_model_to_device
-
+from .utils import _model_contains_inner_module
 
 class Trainer(object):
     """
@@ -389,8 +389,8 @@ class Trainer(object):
         to specify which metric to use as the criterion. Some metrics are better when smaller, e.g. the perplexity of a
        language model; in that case, prefix the key with '-' to indicate that smaller is better during validation
        (e.g. "-ppl"). Only effective when dev_data is passed.
     :param int validate_every: validate on the dev set every this many steps; if -1, validate once at the end of every
        epoch. Only effective when dev_data is passed.
-    :param str,None save_path: path under which to save the model. If None, the model is not saved. If dev_data is None,
-        the model of the last iteration is saved. Both the parameters and the model structure are saved. Even when DataParallel is used, only the model itself is saved.
+    :param str,None save_path: path under which to save the model; if the path does not exist, the folder is created
+        automatically. If None, the model is not saved. If dev_data is None, the model of the last iteration is saved.
+        Both the parameters and the model structure are saved. Even when DataParallel is used, only the model itself is saved.
     :param bool use_tqdm: whether to use tqdm to show training progress; if False, the loss is printed to the terminal.
     :param str,int,torch.device,list(int) device: the device to load the model onto. Defaults to None, i.e. the Trainer
         does not manage where the model is computed. The following inputs are supported:
@@ -440,7 +440,7 @@ class Trainer(object):
         
         # check update every
         assert update_every >= 1, "update_every must be no less than 1."
         self.update_every = int(update_every)
-
+        
         # check save_path
         if not (save_path is None or isinstance(save_path, str)):
             raise ValueError("save_path can only be None or `str`.")
@@ -458,30 +458,69 @@ class Trainer(object):
             self.metric_key = None
         # prepare loss
         losser = _prepare_losser(loss)
-        
-        # sampler check
-        if sampler is not None and not isinstance(sampler, Sampler):
-            raise ValueError("The type of sampler should be fastNLP.BaseSampler, got {}.".format(type(sampler)))
-        if sampler is None:
-            sampler = RandomSampler()
-        elif hasattr(sampler, 'set_batch_size'):
-            sampler.set_batch_size(batch_size)
+        if isinstance(train_data, BatchIter):
+            if sampler is not None:
+                warnings.warn("sampler is ignored when train_data is a BatchIter.")
+            if num_workers>0:
+                warnings.warn("num_workers is ignored when train_data is BatchIter.")
+            if drop_last:
+                warnings.warn("drop_last is ignored when train_data is BatchIter.")
+        
+        if isinstance(model, nn.parallel.DistributedDataParallel):  # the distributed case
+            # device must be None
+            if device is not None:
+                warnings.warn("device is ignored when model is nn.parallel.DistributedDataParallel.")
+                device = None
+            # the sampler must be distributed as well
+            if sampler is None:
+                sampler = torch.utils.data.DistributedSampler(train_data)
+            elif not isinstance(sampler, torch.utils.data.DistributedSampler):
+                raise TypeError("When using nn.parallel.DistributedDataParallel, "
+                                "sampler must be None or torch.utils.data.DistributedSampler.")
+            # saving the model is not supported
+            if save_path:
+                raise RuntimeError("Saving model in Distributed situation is not allowed right now.")
+        else:
+            # sampler check
+            if sampler is not None and not isinstance(sampler, (Sampler, torch.utils.data.Sampler)):
+                raise ValueError(f"The type of sampler should be fastNLP.BaseSampler or pytorch's Sampler, got {type(sampler)}")
+            if sampler is None:
+                sampler = RandomSampler()
+            elif hasattr(sampler, 'set_batch_size'):
+                sampler.set_batch_size(batch_size)
         
         if isinstance(train_data, DataSet):
             self.data_iterator = DataSetIter(
                 dataset=train_data, batch_size=batch_size, num_workers=num_workers, sampler=sampler, drop_last=drop_last)
         elif isinstance(train_data, BatchIter):
             self.data_iterator = train_data
+            train_data = train_data.dataset
         else:
             raise TypeError("train_data type {} not support".format(type(train_data)))
         
-        if check_code_level > -1 and isinstance(self.data_iterator, DataSetIter):
-            _check_code(dataset=train_data, model=model, losser=losser, metrics=metrics, dev_data=dev_data,
-                        metric_key=self.metric_key, check_level=check_code_level,
-                        batch_size=min(batch_size, DEFAULT_CHECK_BATCH_SIZE))
-            # _check_code is how fastNLP checks that your code is correct; if you see this comment in an error traceback, check your code carefully
         self.model = _move_model_to_device(model, device=device)
+        if _model_contains_inner_module(self.model):
+            self._forward_func = self.model.module.forward
+        else:
+            self._forward_func = self.model.forward
+        if check_code_level > -1:
+            # _check_code is how fastNLP checks that your code is correct. If you see this comment in an error
+            # traceback, carefully check that your field names match the model's input names.
+            dev_dataset = dev_data
+            if isinstance(dev_data, BatchIter):
+                dev_dataset = None
+                warnings.warn("dev_data is of BatchIter type, ignore validation checking.")
+            check_batch_size = min(batch_size, DEFAULT_CHECK_BATCH_SIZE)
+            if isinstance(self.model, nn.DataParallel):
+                _num_devices = len(self.model.device_ids)
+                if batch_size//_num_devices>1:  # if each card can take more than one sample, give every card two samples
+                    check_batch_size = max(len(self.model.device_ids)*2, check_batch_size)
+                else:
+                    check_batch_size = max(len(self.model.device_ids), check_batch_size)
+            _check_code(dataset=train_data, model=self.model, losser=losser, forward_func=self._forward_func, metrics=metrics,
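
Pieced together from the checks above, a distributed training run would be wired up roughly as follows. This is a sketch only: MyModel, train_ds and loss stand in for a user's own objects, the launch mechanism is an assumption, and the commit message itself warns that this path has not been widely tested:

    # one process per GPU, e.g. started via torch.distributed.launch (assumed)
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from fastNLP import Trainer

    dist.init_process_group(backend='nccl')
    local_rank = dist.get_rank()
    torch.cuda.set_device(local_rank)

    # MyModel / train_ds / loss are hypothetical placeholders for a user's own objects
    model = DDP(MyModel().cuda(local_rank), device_ids=[local_rank])
    trainer = Trainer(train_data=train_ds, model=model, loss=loss,
                      device=None,     # anything else is ignored with a warning for DDP models
                      sampler=None,    # the Trainer substitutes a torch DistributedSampler
                      save_path=None)  # saving is rejected in the distributed case
    trainer.train()
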
                        dev_data=dev_dataset, metric_key=self.metric_key, check_level=check_code_level,
+                        batch_size=check_batch_size)
         
         self.train_data = train_data
         self.dev_data = dev_data  # If None, No validation.
@@ -496,8 +535,7 @@ class Trainer(object):
         self.best_dev_epoch = None
         self.best_dev_step = None
         self.best_dev_perf = None
-        self.n_steps = (len(self.train_data) // self.batch_size + int(
-            len(self.train_data) % self.batch_size != 0)) * int(drop_last==0) * self.n_epochs
+        self.n_steps = len(self.data_iterator) * self.n_epochs
         
         if isinstance(optimizer, torch.optim.Optimizer):
             self.optimizer = optimizer
@@ -600,10 +638,6 @@ class Trainer(object):
         self.step = 0
         self.epoch = 0
         start = time.time()
-        if isinstance(self.model, nn.DataParallel):
-            self._forward_func = self.model.module.forward
-        else:
-            self._forward_func = self.model.forward
         with inner_tqdm(total=self.n_steps, postfix='loss:{0:<6.5f}', leave=False, dynamic_ncols=True) as pbar:
             self.pbar = pbar
             avg_loss = 0
@@ -745,7 +779,7 @@ class Trainer(object):
             model_path = os.path.join(self.save_path, model_name)
             if not os.path.exists(self.save_path):
                 os.makedirs(self.save_path, exist_ok=True)
-            if isinstance(model, nn.DataParallel):
+            if _model_contains_inner_module(model):
                 model = model.module
             if only_param:
                 state_dict = model.state_dict()
@@ -765,7 +799,7 @@ class Trainer(object):
                 states = torch.load(model_path)
             else:
                 states = torch.load(model_path).state_dict()
-            if isinstance(model, nn.DataParallel):
+            if _model_contains_inner_module(model):
                 model.module.load_state_dict(states)
             else:
                 model.load_state_dict(states)
@@ -823,12 +857,10 @@ def _get_value_info(_dict):
 from numbers import Number
 from .batch import _to_tensor
 
-def _check_code(dataset, model, losser, metrics, batch_size=DEFAULT_CHECK_BATCH_SIZE,
-                dev_data=None, metric_key=None,
-                check_level=0):
+def _check_code(dataset, model, losser, metrics, forward_func, batch_size=DEFAULT_CHECK_BATCH_SIZE,
+                dev_data=None, metric_key=None, check_level=0):
     # check the get_loss method
-    model_devcie = _get_model_device(model=model)
-    
+    model_device = _get_model_device(model=model)
     def _iter():
         start_idx = 0
         while start_idx