@@ -29,6 +29,81 @@ __all__ = [
 ]
 class DeepSpeedDriver(TorchDDPDriver):
| """ | |||||
| 实现 ``deepspeed`` 分布式训练的 ``Driver``。 | |||||
| .. note:: | |||||
| 您在绝大多数情况下不需要自己使用到该类,通过向 ``Trainer`` 传入正确的参数,您可以方便快速地部署您的分布式训练; | |||||
| ``DeepSpeedDriver`` 目前支持的三种启动方式: | |||||
| 1. 用户自己不进行任何操作,直接使用我们的 ``Trainer``,这时是由我们自己使用 ``open_subprocesses`` 拉起多个进程, | |||||
| 然后 ``DeepSpeedDriver`` 自己通过调用 ``deepspeed.initialize`` 来初始化模型和同心组;(情况 A) | |||||
| .. code-block:: | |||||
| trainer = Trainer( | |||||
| ... | |||||
| driver='deepspeed', | |||||
| device=[0, 1] | |||||
| ) | |||||
| trainer.run() | |||||
| 通过运行 ``python train.py`` 启动; | |||||
| 2. 用户同样不在 ``Trainer`` 之外初始化 ``deepspeed``,但是用户自己使用 ``python -m torch.distributed.launch`` 拉起来创建多个进程,这时我们仍旧 | |||||
| 会通过调用 ``model.initialize`` 来初始化 ``ddp`` 的通信组;(情况 B) | |||||
| .. code-block:: | |||||
| trainer = Trainer( | |||||
| ... | |||||
| driver='deepspeed', | |||||
| device=None | |||||
| ) | |||||
| trainer.run() | |||||
| 通过运行 ``deepspeed train.py`` 启动; | |||||
| 3. 用户自己在外面初始化 ``deepspeed``,并且通过 ``deepspeed train.py`` 拉起,这时无论是多个进程的拉起和通信组的建立 | |||||
| 都由用户自己操作,我们只会在 ``driver.setup`` 的时候对 ``DeepSpeedDriver`` 设置一些必要的属性值;(情况 C) | |||||
| .. code-block:: | |||||
| import deepspeed | |||||
| # 初始化 | |||||
| model, _, _, _ = deepspeed.initialize(model, ...) | |||||
| trainer = Trainer( | |||||
| ... | |||||
| driver='deepspeed', | |||||
| device=None | |||||
| ) | |||||
| trainer.run() | |||||
| 通过运行 ``deepspeed train.py`` 启动; | |||||
| :param model: 传入给 ``Trainer`` 的 ``model`` 参数; | |||||
| :param parallel_device: 用于分布式训练的 ``gpu`` 设备; | |||||
| :param is_pull_by_torch_run: 标志当前的脚本的启动是否由 ``python -m torch.distributed.launch`` 启动的; | |||||
| :param fp16: 是否开启 fp16 训练; | |||||
| :param deepspeed_kwargs: | |||||
| * *strategy* -- 使用 ZeRO 优化的策略,默认为 ``deepspeed``;目前仅支持以下值: | |||||
| * ``deepspeed`` -- 使用 ZeRO 的第二阶段,等同于 ``deepspeed_stage_2``; | |||||
| * ``deepspeed_stage_1`` -- 使用 ZeRO 的第一阶段,仅将 ``optimizer`` 的状态分散到不同设备上; | |||||
| * ``deepspeed_stage_2`` -- 使用 ZeRO 的第二阶段,将 ``optimizer`` 和**梯度**分散到不同设备上; | |||||
| * ``deepspeed_stage_2_offload`` -- 使用 ZeRO 的第二阶段,并且借助 cpu 的内存来进一步节约显存; | |||||
| * ``deepspeed_stage_3`` -- 使用 ZeRO 的第三阶段,将 ``optimizer`` 、**梯度**和**模型**分散到不同设备上; | |||||
| * ``deepspeed_stage_3_offload`` -- 使用 ZeRO 的第三阶段,并且借助 cpu 的内存来进一步节约显存; | |||||
| * ``deepspeed_stage_3_offload_nvme`` -- 使用 ZeRO 的第三阶段,并且借助 NVMe 硬盘来进一步节约显存; | |||||
| * *logging_level* -- ``deepspeed`` 库的日志等级,默认为 **logging.ERROR**; | |||||
| * *config* -- ``deepspeed`` 的各项设置;**FastNLP** 允许用户传入自己的设置以增强灵活性,但这会使参数 | |||||
| 中的 ``optimizer`` 、``strategy`` 、 ``fp16`` 等失效,即当这个参数存在时,**FastNLP** 会用该参数覆盖 | |||||
| 其它的设置; | |||||
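+
+        For example, a minimal sketch of passing these options through ``Trainer`` (assuming ``deepspeed_kwargs`` is
+        forwarded unchanged to this driver, as in the constructor below; the keys inside ``config`` follow DeepSpeed's
+        own configuration schema and are only illustrative):
+
+        .. code-block::
+
+            trainer = Trainer(
+                ...
+                driver='deepspeed',
+                device=[0, 1],
+                deepspeed_kwargs={
+                    'strategy': 'deepspeed_stage_2',
+                    'logging_level': logging.WARNING,
+                    # uncommenting 'config' would override 'strategy', 'fp16' and the optimizer settings above:
+                    # 'config': {
+                    #     'train_micro_batch_size_per_gpu': 8,
+                    #     'zero_optimization': {'stage': 2},
+                    # },
+                }
+            )
+            trainer.run()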
| """ | |||||
| # TODO fp16 load_config | # TODO fp16 load_config | ||||
| def __init__( | def __init__( | ||||
| self, | self, | ||||
@@ -36,11 +111,13 @@ class DeepSpeedDriver(TorchDDPDriver):
         parallel_device: Union[List["torch.device"], "torch.device"],
         is_pull_by_torch_run = False,
         fp16: bool = False,
+        deepspeed_kwargs: Dict = {},
         **kwargs
     ):
         assert _NEED_IMPORT_DEEPSPEED, "Deepspeed is not imported."
-        # assert not dist.is_initialized(), "DeepSpeedDriver does not support initialize distributed by user."
-        TorchDriver.__init__(self, model=model, fp16=False, **kwargs)
+        kwargs.pop("torch_kwargs", None)
+        self._ds_kwargs = deepspeed_kwargs
+        TorchDriver.__init__(self, model=model, fp16=False, torch_kwargs=deepspeed_kwargs, **kwargs)
         self.fp16 = fp16
         # if the user initialized DDP by themselves on the outside, it must have been launched via python -m torch.distributed.launch;
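A note on the ``kwargs.pop("torch_kwargs", None)`` added above: the DeepSpeed options are forwarded to ``TorchDriver`` as its ``torch_kwargs``, so any ``torch_kwargs`` the caller may also have passed has to be dropped first, otherwise the explicit keyword and the one inside ``**kwargs`` would collide. A minimal, self-contained sketch of the failure this guards against (all names here are purely illustrative):

    def init_torch_driver(torch_kwargs=None, **kwargs):
        print("torch_kwargs:", torch_kwargs, "rest:", kwargs)

    def make_driver(**kwargs):
        deepspeed_kwargs = {"strategy": "deepspeed_stage_2"}
        # without this pop, the call below would raise:
        #   TypeError: init_torch_driver() got multiple values for keyword argument 'torch_kwargs'
        kwargs.pop("torch_kwargs", None)
        init_torch_driver(torch_kwargs=deepspeed_kwargs, **kwargs)

    make_driver(torch_kwargs={"non_blocking": True}, fp16=False)
    # -> torch_kwargs: {'strategy': 'deepspeed_stage_2'} rest: {'fp16': False}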
@@ -108,7 +185,6 @@ class DeepSpeedDriver(TorchDDPDriver):
                            "to 1 for deepspeed configuration.")
             self.train_micro_batch_size = 1
-        self._ds_kwargs = kwargs.get("deepspeed_kwargs", {})
         self.strategy = self._ds_kwargs.get("strategy", "deepspeed")
         deepspeed_logging_level = self._ds_kwargs.get("logging_level", logging.ERROR)
         deepspeed.utils.logging.logger.setLevel(deepspeed_logging_level)
@@ -125,7 +201,7 @@ class DeepSpeedDriver(TorchDDPDriver):
         Prepare the distributed environment. This function mainly does the following two things:

         1. spawn multiple processes, one dedicated process per gpu device;
-        2. each process moves the model onto its own ``gpu`` device and then wraps it with ``DistributedDataParallel``;
+        2. wrap the model with ``deepspeed.initialize``;
         """
         if len(self.optimizers) != 1:
             raise ValueError("Multi optimizers is not supported for `DeepSpeedDriver` right now.")
@@ -160,15 +236,15 @@ class DeepSpeedDriver(TorchDDPDriver):
             self.open_subprocess()
             self.global_rank = self.local_rank  # the rank must be obtained from the environment variables;
             deepspeed.init_distributed("nccl", distributed_port=self.master_port)
-        # the user has already initialized another trainer before this one, using TorchDDPDriver;
+        # the user has already initialized another trainer before this one, using DeepSpeedDriver;
         else:
-            # if `dist.is_initialized() == True`, then TorchDDPDriver has already been initialized and set up once before,
-            # so we must make sure that the TorchDDPDriver used now (i.e. the later one) is configured exactly like the first TorchDDPDriver;
+            # if `dist.is_initialized() == True`, then DeepSpeedDriver has already been initialized and set up once before,
+            # so we must make sure that the DeepSpeedDriver used now (i.e. the later one) is configured exactly like the first DeepSpeedDriver;
             pre_num_processes = int(os.environ[FASTNLP_DISTRIBUTED_CHECK])
             if pre_num_processes != len(self.parallel_device):
                 raise RuntimeError(
-                    "Notice you are using `TorchDDPDriver` after one instantiated `TorchDDPDriver`, it is not"
-                    "allowed that your second `TorchDDPDriver` has a new setting of parameters "
+                    "Notice you are using `DeepSpeedDriver` after one instantiated `DeepSpeedDriver`, it is not "
+                    "allowed that your second `DeepSpeedDriver` has a new setting of parameters "
                     "`num_nodes` and `num_processes`.")
             self.world_size = dist.get_world_size()
             self.global_rank = dist.get_rank()
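For context on the first branch above, a hedged sketch of the distributed bootstrap: whichever way the workers were started (``open_subprocess`` in case A, an external launcher in cases B/C), each process ends up with the standard ``torch.distributed`` environment variables, and ``deepspeed.init_distributed`` reads them to build the NCCL process group. The port value below is only the usual default, not something this driver hard-codes:

    import os
    import deepspeed

    # RANK / LOCAL_RANK / WORLD_SIZE / MASTER_ADDR / MASTER_PORT are set per worker
    # by the launcher (or by open_subprocess); init_distributed picks them up.
    deepspeed.init_distributed(
        dist_backend="nccl",
        distributed_port=int(os.environ.get("MASTER_PORT", 29500)),
    )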
@@ -302,7 +378,7 @@ class DeepSpeedDriver(TorchDDPDriver):
         Save the current driver's model to ``folder``.

         :param filepath: the folder to save to;
-        :param only_state_dict: whether to save only the weights;
+        :param only_state_dict: whether to save only the weights; this argument has no effect in ``DeepSpeedDriver``;
         :return:
         """
         # the deepspeed engine requires save_checkpoint to be called on every rank, so the rank_zero_call decorator is removed here
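Continuing the illustrative ``engine`` sketch from the ``setup`` section above: DeepSpeed checkpoints are written collectively, with each rank contributing its shard of the partitioned state, so the call below has to run on every rank rather than only rank 0; the directory and tag are illustrative:

    # must be executed by every rank, not only rank 0
    engine.save_checkpoint("outputs/deepspeed_ckpt", tag="epoch_3")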
@@ -325,7 +401,7 @@ class DeepSpeedDriver(TorchDDPDriver):
         Load weights from ``folder`` and assign them to the current driver's model.

         :param filepath: the path to load the weights or model from
-        :param load_state_dict: whether the saved content is only the weights.
+        :param load_state_dict: whether the saved content is only the weights; this argument has no effect in ``DeepSpeedDriver``;
         :param kwargs:
         :return:
         """