From 7ce03a601b702ce759dd3e13cf8878d1f273f4d9 Mon Sep 17 00:00:00 2001
From: yh_cc
Date: Mon, 13 Sep 2021 10:26:45 +0800
Subject: [PATCH] 1. Modify the pin_memory parameter in Trainer; 2. Modify
 DistTrainer so that the DistTrainer and Trainer APIs stay as close as possible
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 fastNLP/core/dist_trainer.py | 55 +++++++++++++++++++++++++++---------
 fastNLP/core/tester.py       |  8 ++++--
 fastNLP/core/trainer.py      | 11 +++++---
 3 files changed, 55 insertions(+), 19 deletions(-)

diff --git a/fastNLP/core/dist_trainer.py b/fastNLP/core/dist_trainer.py
index a26673b2..a82587bd 100644
--- a/fastNLP/core/dist_trainer.py
+++ b/fastNLP/core/dist_trainer.py
@@ -32,6 +32,7 @@ from .utils import _build_args
 from .utils import _build_fp16_env
 from .utils import _get_func_signature
 from .utils import _move_dict_value_to_device
+from .sampler import Sampler
 
 __all__ = [
     'get_local_rank',
@@ -68,7 +69,7 @@ class DistTrainer():
                  dev_data=None, metrics=None, metric_key=None,
                  update_every=1, print_every=10, validate_every=-1,
                  save_path=None, device='auto',
-                 fp16=False, use_tqdm=True, **kwargs):
+                 fp16=False, use_tqdm=True, sampler=None, **kwargs):
         r"""
 
         :param train_data: the training set, of type :class:`~fastNLP.DataSet`.
@@ -101,6 +102,8 @@ class DistTrainer():
         :param str device: the device to use; one of gpu, cpu or auto.
         :param bool fp16: whether to train with half precision.
         :param bool use_tqdm: whether to use tqdm to show training progress; if False, the loss is printed to the terminal instead.
+        :param Sampler sampler: the sampler to use; if not given, DistributedSampler is used by default. This is typically needed when
+            the Dataset on each rank has been explicitly modified, so every rank holds the same number of samples but not the same samples.
         :param kwargs: supported optional parameters
             bool test_use_tqdm: whether to enable tqdm when evaluating on the dev set
             Sampler test_sampler: the sampler used during evaluation
@@ -108,6 +111,9 @@ class DistTrainer():
             bool test_use_fp16: use fp16 at test time
             bool set_grad_to_none: set grads to None instead of 0 during zero_grad
             GradScaler gradscaler: a custom gradient scaler
+            bool pin_memory: whether to put the produced tensors into pinned memory, which may speed up data transfer; the gain is usually noticeable when there are many tensors or the tensors are large.
+            bool find_unused_parameters: passed through when wrapping the model in DistributedDataParallel; unless the model really
+                has parameters that are unused in forward, this should not be needed.
         """
         assert device in ['auto', 'cuda', 'cpu'], "Please set correct device in [auto', 'cuda', 'cpu']"
         if device == 'auto':
@@ -126,6 +132,8 @@ class DistTrainer():
         self.rank = dist.get_rank()  # unique id for each process
 
         self.train_data = train_data
+        if kwargs.get('batch_size', None):
+            batch_size_per_gpu = int(kwargs.get('batch_size'))
         self.batch_size_per_gpu = int(batch_size_per_gpu)
         self.n_epochs = int(n_epochs)
         self.num_data_workers = int(num_workers)
@@ -163,7 +171,8 @@ class DistTrainer():
         # init DataParallel
         if parse_version(torch.__version__)>=parse_version('1.1'):
             self.ddp_model = DDP(model, device_ids=[self.local_rank],
-                                 output_device=self.local_rank, find_unused_parameters=True)
+                                 output_device=self.local_rank,
+                                 find_unused_parameters=kwargs.get('find_unused_parameters', False))
         else:
             self.ddp_model = DDP(model, device_ids=[self.local_rank],
                                  output_device=self.local_rank)
@@ -172,7 +181,17 @@ class DistTrainer():
         optimizer = self._get_optimizer(optimizer)
         self.optimizer = optimizer
         if isinstance(self.train_data, DataSet):
-            self.sampler = DistributedSampler(self.train_data)
+            if sampler is None:
+                self.sampler = DistributedSampler(self.train_data)
+            else:
+                # sampler check
+                if sampler is not None and not isinstance(sampler, (Sampler, torch.utils.data.Sampler)):
+                    raise ValueError(
+                        f"The type of sampler should be fastNLP.BaseSampler or pytorch's Sampler, got {type(sampler)}")
+                elif hasattr(sampler, 'set_batch_size'):
+                    sampler.set_batch_size(batch_size_per_gpu)
+                self.sampler = sampler
+        self.pin_memory = kwargs.get('pin_memory', True)
         self.data_iterator = self._get_data_iter(self.train_data)
         self.batch_size = self.world_size * self.batch_size_per_gpu
         self.n_steps = self._get_n_steps()
@@ -191,7 +210,6 @@ class DistTrainer():
                 batch_size=dev_batch_size, num_workers=num_workers, sampler=kwargs.get('test_sampler', None),
                 use_tqdm=self.test_use_tqdm)
             self.test_manager.add_callback([cb], master=True)
-        # Setup logging
 
         # synchronize start_time
         sync_time = torch.tensor(time.time(), dtype=torch.double).to(self.device)
@@ -233,7 +251,8 @@ class DistTrainer():
     def _get_data_iter(self, dataset):
         if isinstance(dataset, DataSet):
             return DataSetIter(dataset=dataset, batch_size=self.batch_size_per_gpu, sampler=self.sampler,
-                               num_workers=self.num_data_workers, drop_last=self.drop_last)
+                               num_workers=self.num_data_workers, drop_last=self.drop_last,
+                               pin_memory=self.pin_memory)
         elif isinstance(dataset, BatchIter):
             return dataset
         else:
@@ -347,7 +366,7 @@ class DistTrainer():
             for batch_x, batch_y in data_iterator:
                 self.step += 1
                 self.ddp_model.train()
-                _move_dict_value_to_device(batch_x, batch_y, device=self.device)
+                _move_dict_value_to_device(batch_x, batch_y, device=self.device, non_blocking=self.pin_memory)
                 indices = data_iterator.get_batch_indices()
                 # negative sampling; replace unknown; re-weight batch_y
                 self.callback_manager.on_batch_begin(batch_x, batch_y, indices)
@@ -361,10 +380,9 @@ class DistTrainer():
                 # Is loss NaN or inf? requires_grad = False
                 self.callback_manager.on_backward_begin(loss)
 
-                self.grad_scaler.scale(loss).backward()
+                self._grad_backward(loss)
                 self.callback_manager.on_backward_end()
-                if self.step % self.update_every == 0:
-                    self._update()
+                self._update()
                 self.callback_manager.on_step_end()
 
                 if self.step % self.print_every == 0:
@@ -390,7 +408,7 @@ class DistTrainer():
                 self.pbar = None
             # ============ tqdm end ==============
 
-    def _clear_grad_opt(self, optimizer):
+    def _clear_grad(self, optimizer):
         if self.set_grad_to_none:
             for group in optimizer.param_groups:
                 for p in group['params']:
@@ -399,13 +417,24 @@ class DistTrainer():
         else:
             optimizer.zero_grad()
 
+    def _grad_backward(self, loss):
+        r"""Compute gradient with the chain rule.
+
+        :param loss: a scalar where back-prop starts
+
+        For PyTorch, just do "loss.backward()"
+        """
+        if (self.step-1) % self.update_every == 0:
+            self._clear_grad(self.optimizer)
+        self.grad_scaler.scale(loss).backward()
+
     def _update(self):
         r"""Perform weight update on a model.
 
""" - self.grad_scaler.step(self.optimizer) - self.grad_scaler.update() - self._clear_grad_opt(self.optimizer) + if self.step % self.update_every == 0: + self.grad_scaler.step(self.optimizer) + self.grad_scaler.update() def _data_forward(self, network, x): x = _build_args(self._forward_func, **x) diff --git a/fastNLP/core/tester.py b/fastNLP/core/tester.py index 4cf83fac..55ffd9cf 100644 --- a/fastNLP/core/tester.py +++ b/fastNLP/core/tester.py @@ -98,6 +98,7 @@ class Tester(object): :param bool fp16: 是否使用float16进行验证 :param kwargs: Sampler sampler: 支持传入sampler控制测试顺序 + bool pin_memory: 是否将产生的tensor使用pin memory, 可能会加快数据速度。 """ super(Tester, self).__init__() @@ -112,6 +113,7 @@ class Tester(object): self.verbose = verbose self.use_tqdm = use_tqdm self.logger = logger + self.pin_memory = kwargs.get('pin_memory', True) if isinstance(data, DataSet): sampler = kwargs.get('sampler', None) @@ -122,7 +124,8 @@ class Tester(object): if hasattr(sampler, 'set_batch_size'): sampler.set_batch_size(batch_size) self.data_iterator = DataSetIter(dataset=data, batch_size=batch_size, sampler=sampler, - num_workers=num_workers) + num_workers=num_workers, + pin_memory=self.pin_memory) elif isinstance(data, BatchIter): self.data_iterator = data else: @@ -179,7 +182,8 @@ class Tester(object): start_time = time.time() for batch_x, batch_y in data_iterator: - _move_dict_value_to_device(batch_x, batch_y, device=self._model_device) + _move_dict_value_to_device(batch_x, batch_y, device=self._model_device, + non_blocking=self.pin_memory) with self.auto_cast(): pred_dict = self._data_forward(self._predict_func, batch_x) if not isinstance(pred_dict, dict): diff --git a/fastNLP/core/trainer.py b/fastNLP/core/trainer.py index eab28854..398272bf 100644 --- a/fastNLP/core/trainer.py +++ b/fastNLP/core/trainer.py @@ -432,6 +432,7 @@ class Trainer(object): bool set_grad_to_none: 在zero_grad的时候是否将gradient设置为None,而不是设置为zero GradScaler grad_scaler: 仅在fp16为True时有效,如果不使用torch.cuda.amp.GradScaler的初始化参数,可传入一个已经初始化后的 grad_scaler。 + bool pin_memory: 是否将产生的tensor使用pin memory, 可能会加快数据速度。 """ super(Trainer, self).__init__() if not isinstance(model, nn.Module): @@ -472,7 +473,7 @@ class Trainer(object): warnings.warn("num_workers is ignored when train_data is BatchIter.") if drop_last: warnings.warn("drop_last is ignored when train_data is BatchIter.") - + self.pin_memory = kwargs.get('pin_memory', True) if isinstance(model, nn.parallel.DistributedDataParallel): # 如果是分布式的 # device为None if device is not None: @@ -502,12 +503,13 @@ class Trainer(object): sampler(train_data) train_data = DataSetIter(train_data, batch_size=1, sampler=None, as_numpy=False, num_workers=num_workers, - pin_memory=False, drop_last=drop_last, timeout=0, worker_init_fn=None, + pin_memory=self.pin_memory, drop_last=drop_last, timeout=0, worker_init_fn=None, batch_sampler=sampler) if isinstance(train_data, DataSet): self.data_iterator = DataSetIter(dataset=train_data, batch_size=batch_size, sampler=sampler, - num_workers=num_workers, drop_last=drop_last) + num_workers=num_workers, drop_last=drop_last, + pin_memory=self.pin_memory) elif isinstance(train_data, BatchIter): self.data_iterator = train_data train_data = train_data.dataset @@ -600,7 +602,8 @@ class Trainer(object): use_tqdm=self.test_use_tqdm, sampler=kwargs.get('test_sampler', None), fp16=self.test_use_fp16, - num_workers=num_workers) + num_workers=num_workers, + pin_memory=self.pin_memory) self.start_time = None # start timestamp