@@ -55,7 +55,7 @@ class JittorDriver(Driver): | |||||
:param fp16: 是否开启混合精度训练; | :param fp16: 是否开启混合精度训练; | ||||
:param jittor_kwargs: | :param jittor_kwargs: | ||||
""" | """ | ||||
def __init__(self, model, fp16: bool = False, jittor_kwargs: Dict = {}, **kwargs): | |||||
def __init__(self, model, fp16: bool = False, jittor_kwargs: Dict = None, **kwargs): | |||||
if not isinstance(model, Module): | if not isinstance(model, Module): | ||||
raise ValueError(f"Parameter `model` can not be `{type(model)}` in `JittorDriver`, it should be exactly " | raise ValueError(f"Parameter `model` can not be `{type(model)}` in `JittorDriver`, it should be exactly " | ||||
f"`jittor.Module` type.") | f"`jittor.Module` type.") | ||||
@@ -67,7 +67,7 @@ class JittorDriver(Driver): | |||||
jt.flags.auto_mixed_precision_level = 0 | jt.flags.auto_mixed_precision_level = 0 | ||||
self.fp16 = fp16 | self.fp16 = fp16 | ||||
self._auto_cast = nullcontext | self._auto_cast = nullcontext | ||||
self._jittor_kwargs = jittor_kwargs | |||||
self._jittor_kwargs = jittor_kwargs if jittor_kwargs is not None else {} | |||||
# 用来设置是否关闭 auto_param_call 中的参数匹配问题; | # 用来设置是否关闭 auto_param_call 中的参数匹配问题; | ||||
self.wo_auto_param_call = kwargs.get("model_wo_auto_param_call", False) | self.wo_auto_param_call = kwargs.get("model_wo_auto_param_call", False) | ||||
@@ -34,7 +34,7 @@ class JittorMPIDriver(JittorDriver): | |||||
parallel_device: None, | parallel_device: None, | ||||
is_pull_by_jittor_run: bool = False, | is_pull_by_jittor_run: bool = False, | ||||
fp16: bool = False, | fp16: bool = False, | ||||
jittor_kwargs: Dict = {}, | |||||
jittor_kwargs: Dict = None, | |||||
**kwargs | **kwargs | ||||
): | ): | ||||
@@ -37,7 +37,7 @@ class JittorSingleDriver(JittorDriver): | |||||
:param jittor_kwargs: | :param jittor_kwargs: | ||||
""" | """ | ||||
def __init__(self, model, device=None, fp16: bool = False, jittor_kwargs: Dict = {}, **kwargs): | |||||
def __init__(self, model, device=None, fp16: bool = False, jittor_kwargs: Dict = None, **kwargs): | |||||
if device not in [None, "cpu", "gpu", "cuda"]: | if device not in [None, "cpu", "gpu", "cuda"]: | ||||
raise RuntimeError("Parameter `device` should be one of [None, 'cpu', 'gpu', 'cuda'] .") | raise RuntimeError("Parameter `device` should be one of [None, 'cpu', 'gpu', 'cuda'] .") | ||||
super(JittorSingleDriver, self).__init__(model, fp16, jittor_kwargs=jittor_kwargs) | super(JittorSingleDriver, self).__init__(model, fp16, jittor_kwargs=jittor_kwargs) | ||||
@@ -46,7 +46,7 @@ class OneflowDDPDriver(OneflowDriver): | |||||
任何当前有多少台机器的信息; | 任何当前有多少台机器的信息; | ||||
:param model: 传入给 ``Trainer`` 的 ``model`` 参数; | :param model: 传入给 ``Trainer`` 的 ``model`` 参数; | ||||
:param parallel_device: 该参数无效,**FastNLP** 会自动获取当前进程的设备; | |||||
:param parallel_device: 该参数无效,**fastNLP** 会自动获取当前进程的设备; | |||||
:param fp16: 是否开启 fp16 训练;目前该参数无效; | :param fp16: 是否开启 fp16 训练;目前该参数无效; | ||||
:param oneflow_kwargs: | :param oneflow_kwargs: | ||||
* *ddp_kwargs* -- 用于 ``DistributedDataParallel`` 的其它参数,详情可查阅 **oneflow** 的官方文档; | * *ddp_kwargs* -- 用于 ``DistributedDataParallel`` 的其它参数,详情可查阅 **oneflow** 的官方文档; | ||||
@@ -57,7 +57,7 @@ class OneflowDDPDriver(OneflowDriver): | |||||
model, | model, | ||||
parallel_device: Optional["oneflow.device"], | parallel_device: Optional["oneflow.device"], | ||||
fp16: bool = False, | fp16: bool = False, | ||||
oneflow_kwargs: Dict = {}, | |||||
oneflow_kwargs: Dict = None, | |||||
**kwargs | **kwargs | ||||
): | ): | ||||
@@ -48,11 +48,11 @@ class OneflowDriver(Driver): | |||||
您可以在使用 ``OneflowSingleDriver`` 和 ``OneflowDDPDriver`` 时使用 ``OneflowDriver`` 提供的接口; | 您可以在使用 ``OneflowSingleDriver`` 和 ``OneflowDDPDriver`` 时使用 ``OneflowDriver`` 提供的接口; | ||||
""" | """ | ||||
def __init__(self, model, fp16: Optional[bool] = False, oneflow_kwargs: Dict = {}, **kwargs): | |||||
def __init__(self, model, fp16: Optional[bool] = False, oneflow_kwargs: Dict = None, **kwargs): | |||||
super(OneflowDriver, self).__init__(model) | super(OneflowDriver, self).__init__(model) | ||||
""" 进行 fp16 的设置 """ | """ 进行 fp16 的设置 """ | ||||
self._oneflow_kwargs = oneflow_kwargs | |||||
self._oneflow_kwargs = oneflow_kwargs if oneflow_kwargs is not None else {} | |||||
self.fp16 = fp16 | self.fp16 = fp16 | ||||
if fp16: | if fp16: | ||||
@@ -29,14 +29,14 @@ class OneflowSingleDriver(OneflowDriver): | |||||
:param oneflow_kwargs: | :param oneflow_kwargs: | ||||
""" | """ | ||||
def __init__(self, model, device: "oneflow.device", fp16: bool = False, oneflow_kwargs: Dict = {}, **kwargs): | |||||
def __init__(self, model, device: "oneflow.device", fp16: bool = False, oneflow_kwargs: Dict = None, **kwargs): | |||||
cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", None) | cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", None) | ||||
if cuda_visible_devices == "": | if cuda_visible_devices == "": | ||||
device = oneflow.device("cpu") | device = oneflow.device("cpu") | ||||
logger.info("You have set `CUDA_VISIBLE_DEVICES` to '' in system environment variable, and we are gonna to" | logger.info("You have set `CUDA_VISIBLE_DEVICES` to '' in system environment variable, and we are gonna to" | ||||
"use `cpu` instead of `gpu` device.") | "use `cpu` instead of `gpu` device.") | ||||
super(OneflowSingleDriver, self).__init__(model, fp16=fp16, **kwargs) | |||||
super(OneflowSingleDriver, self).__init__(model, fp16=fp16, oneflow_kwargs=oneflow_kwargs, **kwargs) | |||||
if device is None: | if device is None: | ||||
logger.debug("device is not set, fastNLP will try to automatically get it.") | logger.debug("device is not set, fastNLP will try to automatically get it.") | ||||
@@ -152,12 +152,12 @@ class PaddleFleetDriver(PaddleDriver): | |||||
parallel_device: Optional[Union[List[str], str]], | parallel_device: Optional[Union[List[str], str]], | ||||
is_pull_by_paddle_run: bool = False, | is_pull_by_paddle_run: bool = False, | ||||
fp16: bool = False, | fp16: bool = False, | ||||
paddle_kwrags: Dict = {}, | |||||
paddle_kwargs: Dict = None, | |||||
**kwargs | **kwargs | ||||
): | ): | ||||
if USER_CUDA_VISIBLE_DEVICES not in os.environ: | if USER_CUDA_VISIBLE_DEVICES not in os.environ: | ||||
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
super(PaddleFleetDriver, self).__init__(model, fp16=fp16, paddle_kwrags=paddle_kwargs, **kwargs) | |||||
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using fastNLP.") | |||||
super(PaddleFleetDriver, self).__init__(model, fp16=fp16, paddle_kwargs=paddle_kwargs, **kwargs) | |||||
# 如果不是通过 launch 启动,要求用户必须传入 parallel_device | # 如果不是通过 launch 启动,要求用户必须传入 parallel_device | ||||
if not is_pull_by_paddle_run: | if not is_pull_by_paddle_run: | ||||
@@ -195,17 +195,14 @@ class PaddleFleetDriver(PaddleDriver): | |||||
self.world_size = None | self.world_size = None | ||||
self.global_rank = 0 | self.global_rank = 0 | ||||
self.gloo_rendezvous_dir = None | self.gloo_rendezvous_dir = None | ||||
# 分布式环境的其它参数设置 | |||||
paddle_kwargs = kwargs.get("paddle_kwargs", {}) | |||||
self._fleet_kwargs = paddle_kwargs.get("fleet_kwargs", {}) | |||||
self._fleet_kwargs = self._paddle_kwargs.get("fleet_kwargs", {}) | |||||
check_user_specific_params(self._fleet_kwargs, DataParallel.__init__, DataParallel.__name__) | check_user_specific_params(self._fleet_kwargs, DataParallel.__init__, DataParallel.__name__) | ||||
# fleet.init 中对于分布式策略的设置,详情可以参考 PaddlePaddle 的官方文档 | # fleet.init 中对于分布式策略的设置,详情可以参考 PaddlePaddle 的官方文档 | ||||
self.strategy = self._fleet_kwargs.get("strategy", fleet.DistributedStrategy()) | self.strategy = self._fleet_kwargs.get("strategy", fleet.DistributedStrategy()) | ||||
self.is_collective = self._fleet_kwargs.pop("is_collective", True) | self.is_collective = self._fleet_kwargs.pop("is_collective", True) | ||||
if not self.is_collective: | if not self.is_collective: | ||||
raise NotImplementedError("FastNLP only support `collective` for distributed training now.") | |||||
raise NotImplementedError("fastNLP only support `collective` for distributed training now.") | |||||
self.role_maker = self._fleet_kwargs.pop("role_maker", None) | self.role_maker = self._fleet_kwargs.pop("role_maker", None) | ||||
self.output_from_new_proc = kwargs.get("output_from_new_proc", "only_error") | self.output_from_new_proc = kwargs.get("output_from_new_proc", "only_error") | ||||
@@ -38,7 +38,7 @@ def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[ | |||||
user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES) | ||||
if is_in_paddle_launch_dist(): | if is_in_paddle_launch_dist(): | ||||
if user_visible_devices is None: | if user_visible_devices is None: | ||||
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using FastNLP.") | |||||
raise RuntimeError("To run paddle distributed training, please set `FASTNLP_BACKEND` to 'paddle' before using fastNLP.") | |||||
if device is not None: | if device is not None: | ||||
logger.rank_zero_warning("Parameter `device` would be ignored when you are using `paddle.distributed.launch` to pull " | logger.rank_zero_warning("Parameter `device` would be ignored when you are using `paddle.distributed.launch` to pull " | ||||
"up your script. And we will directly get the local device via environment variables.", once=True) | "up your script. And we will directly get the local device via environment variables.", once=True) | ||||
@@ -70,13 +70,14 @@ class PaddleDriver(Driver): | |||||
:param paddle_kwargs: | :param paddle_kwargs: | ||||
""" | """ | ||||
def __init__(self, model: "paddle.nn.Layer", fp16: Optional[bool] = False, paddle_kwrags: Dict = {}, **kwargs): | |||||
def __init__(self, model: "paddle.nn.Layer", fp16: Optional[bool] = False, paddle_kwargs: Dict = None, **kwargs): | |||||
if not isinstance(model, paddle.nn.Layer): | if not isinstance(model, paddle.nn.Layer): | ||||
raise ValueError(f"Parameter `model` can not be `{type(model)}` in `PaddleDriver`, it should be exactly " | raise ValueError(f"Parameter `model` can not be `{type(model)}` in `PaddleDriver`, it should be exactly " | ||||
f"`paddle.nn.Layer` type.") | f"`paddle.nn.Layer` type.") | ||||
super(PaddleDriver, self).__init__(model) | super(PaddleDriver, self).__init__(model) | ||||
self.fp16 = fp16 | self.fp16 = fp16 | ||||
self._paddle_kwargs = paddle_kwargs if paddle_kwargs is not None else {} | |||||
# scaler的参数 | # scaler的参数 | ||||
self.auto_cast, _grad_scaler = _build_fp16_env(dummy=not fp16) | self.auto_cast, _grad_scaler = _build_fp16_env(dummy=not fp16) | ||||
@@ -53,7 +53,7 @@ class PaddleSingleDriver(PaddleDriver): | |||||
关于该参数的详细说明,请参见 :class:`~fastNLP.core.controllers.Trainer` 中的描述;函数 ``auto_param_call`` 详见 :func:`fastNLP.core.utils.auto_param_call`。 | 关于该参数的详细说明,请参见 :class:`~fastNLP.core.controllers.Trainer` 中的描述;函数 ``auto_param_call`` 详见 :func:`fastNLP.core.utils.auto_param_call`。 | ||||
""" | """ | ||||
def __init__(self, model: "paddle.nn.Layer", device: Union[str, int], fp16: Optional[bool] = False, paddle_kwrags: Dict = {}, **kwargs): | |||||
def __init__(self, model: "paddle.nn.Layer", device: Union[str, int], fp16: Optional[bool] = False, paddle_kwargs: Dict = None, **kwargs): | |||||
if isinstance(model, DataParallel): | if isinstance(model, DataParallel): | ||||
raise ValueError("`paddle.DataParallel` is not supported in `PaddleSingleDriver`") | raise ValueError("`paddle.DataParallel` is not supported in `PaddleSingleDriver`") | ||||
@@ -63,7 +63,7 @@ class PaddleSingleDriver(PaddleDriver): | |||||
logger.info("You have set `CUDA_VISIBLE_DEVICES` to '' in system environment variable, and we are gonna to" | logger.info("You have set `CUDA_VISIBLE_DEVICES` to '' in system environment variable, and we are gonna to" | ||||
"use `cpu` instead of `gpu` device.") | "use `cpu` instead of `gpu` device.") | ||||
super(PaddleSingleDriver, self).__init__(model, fp16=fp16, paddle_kwrags=paddle_kwrags, **kwargs) | |||||
super(PaddleSingleDriver, self).__init__(model, fp16=fp16, paddle_kwargs=paddle_kwargs, **kwargs) | |||||
if device is None: | if device is None: | ||||
raise ValueError("Parameter `device` can not be None in `PaddleSingleDriver`.") | raise ValueError("Parameter `device` can not be None in `PaddleSingleDriver`.") | ||||
@@ -249,7 +249,7 @@ class TorchDDPDriver(TorchDriver): | |||||
parallel_device: Optional[Union[List["torch.device"], "torch.device"]], | parallel_device: Optional[Union[List["torch.device"], "torch.device"]], | ||||
is_pull_by_torch_run: bool = False, | is_pull_by_torch_run: bool = False, | ||||
fp16: bool = False, | fp16: bool = False, | ||||
torch_kwargs: Dict = {}, | |||||
torch_kwargs: Dict = None, | |||||
**kwargs | **kwargs | ||||
): | ): | ||||
@@ -111,7 +111,7 @@ class DeepSpeedDriver(TorchDDPDriver): | |||||
parallel_device: Union[List["torch.device"], "torch.device"], | parallel_device: Union[List["torch.device"], "torch.device"], | ||||
is_pull_by_torch_run = False, | is_pull_by_torch_run = False, | ||||
fp16: bool = False, | fp16: bool = False, | ||||
deepspeed_kwargs: Dict = {}, | |||||
deepspeed_kwargs: Dict = None, | |||||
**kwargs | **kwargs | ||||
): | ): | ||||
assert _NEED_IMPORT_DEEPSPEED, "Deepspeed is not imported." | assert _NEED_IMPORT_DEEPSPEED, "Deepspeed is not imported." | ||||
@@ -251,9 +251,9 @@ class DeepSpeedDriver(TorchDDPDriver): | |||||
if not self.outside_ddp: | if not self.outside_ddp: | ||||
torch.cuda.set_device(self.model_device) | torch.cuda.set_device(self.model_device) | ||||
# TODO 模型过大的话应该会导致显存溢出,但是不加的话显存会占用rank对应的设备 | |||||
# lightning里在之前通过broadcast_list广播了log_dir所以没有这种情况 | |||||
self.model.to(self.model_device) | |||||
# 不加 dist.broadcast_object_list 会发生设备在 4,5 但是模型会同步到 0,1 的情况 | |||||
# 原因未知 | |||||
dist.broadcast_object_list(["test"], 0, None) | |||||
self.configure_ddp() | self.configure_ddp() | ||||
self.barrier() | self.barrier() | ||||
@@ -35,11 +35,12 @@ class FairScaleDriver(TorchDDPDriver): | |||||
parallel_device: Union[List["torch.device"], "torch.device"], | parallel_device: Union[List["torch.device"], "torch.device"], | ||||
is_pull_by_torch_run = False, | is_pull_by_torch_run = False, | ||||
fp16: bool = False, | fp16: bool = False, | ||||
fairscale_kwargs: Dict = None, | |||||
**kwargs | **kwargs | ||||
): | ): | ||||
assert _NEED_IMPORT_FAIRSCALE, "fairscale is not imported." | assert _NEED_IMPORT_FAIRSCALE, "fairscale is not imported." | ||||
assert not dist.is_initialized(), "FairScaleDriver does not support initialize distributed by user." | assert not dist.is_initialized(), "FairScaleDriver does not support initialize distributed by user." | ||||
self._fairscale_kwargs = kwargs.get('fairscale_kwargs', {}) | |||||
self._fairscale_kwargs = fairscale_kwargs | |||||
self.fs_type = self._fairscale_kwargs.get('fs_type', 'sdp') # ddp, sdp, fsdp | self.fs_type = self._fairscale_kwargs.get('fs_type', 'sdp') # ddp, sdp, fsdp | ||||
if self.fs_type == 'fsdp': | if self.fs_type == 'fsdp': | ||||
self._fairscale_kwargs['set_grad_to_none'] = self._fairscale_kwargs.get('set_grad_to_none', True) | self._fairscale_kwargs['set_grad_to_none'] = self._fairscale_kwargs.get('set_grad_to_none', True) | ||||
@@ -41,7 +41,7 @@ class TorchSingleDriver(TorchDriver): | |||||
* *gradscaler_kwargs* -- 用于 fp16=True 时,提供给 ``torch.amp.cuda.GradScaler`` 的参数; | * *gradscaler_kwargs* -- 用于 fp16=True 时,提供给 ``torch.amp.cuda.GradScaler`` 的参数; | ||||
""" | """ | ||||
def __init__(self, model, device: "torch.device", fp16: bool = False, torch_kwargs: Dict = {}, **kwargs): | |||||
def __init__(self, model, device: "torch.device", fp16: bool = False, torch_kwargs: Dict = None, **kwargs): | |||||
if isinstance(model, DistributedDataParallel): | if isinstance(model, DistributedDataParallel): | ||||
raise ValueError("`DistributedDataParallel` is not supported in `TorchSingleDriver`") | raise ValueError("`DistributedDataParallel` is not supported in `TorchSingleDriver`") | ||||
@@ -51,11 +51,11 @@ class TorchDriver(Driver): | |||||
:param fp16: 是否开启混合精度训练; | :param fp16: 是否开启混合精度训练; | ||||
:param torch_kwargs: | :param torch_kwargs: | ||||
""" | """ | ||||
def __init__(self, model, fp16: Optional[bool] = False, torch_kwargs: Dict = {}, **kwargs): | |||||
def __init__(self, model, fp16: Optional[bool] = False, torch_kwargs: Dict = None, **kwargs): | |||||
super(TorchDriver, self).__init__(model) | super(TorchDriver, self).__init__(model) | ||||
""" 进行 fp16 的设置 """ | """ 进行 fp16 的设置 """ | ||||
self._torch_kwargs = torch_kwargs | |||||
self._torch_kwargs = torch_kwargs if torch_kwargs is not None else {} | |||||
# 因为 ddp 和 single_device 的混合精度训练的设置是一样的,因此可以统一抽象到这里; | # 因为 ddp 和 single_device 的混合精度训练的设置是一样的,因此可以统一抽象到这里; | ||||
self.fp16 = fp16 | self.fp16 = fp16 | ||||