@@ -207,12 +207,6 @@ class PaddleFleetDriver(PaddleDriver):
             raise NotImplementedError("FastNLP only support `collective` for distributed training now.")
         self.role_maker = self._fleet_kwargs.pop("role_maker", None)

-        if self.local_rank == 0 and not is_in_paddle_dist():
-            # Since the model is always initialized when the driver is used, the program occupies some device
-            # memory to hold it right from the start, even though that memory is not doing any useful work yet.
-            logger.warning(f"The program will use some extra space on {paddle.device.get_device()} to place your model since the model "
-                           "has already been initialized.")
-
         self.output_from_new_proc = kwargs.get("output_from_new_proc", "only_error")
         assert isinstance(self.output_from_new_proc, str), "Parameter `output_from_new_proc` can only be `str` type."
         if self.output_from_new_proc not in {"all", "ignore", "only_error"}:
@@ -10,6 +10,7 @@ from fastNLP.envs.env import (
     FASTNLP_DISTRIBUTED_CHECK,
     FASTNLP_LOG_LEVEL,
     FASTNLP_GLOBAL_SEED,
+    FASTNLP_GLOBAL_RANK
 )
 from fastNLP.core.utils import get_paddle_device_id
 from .utils import (
@@ -130,6 +131,8 @@ class FleetLauncher:
         """
         global_envs = copy.copy(os.environ.copy())
+        os.environ[FASTNLP_GLOBAL_RANK] = "0"
         self.gloo_rendezvous_dir = tempfile.mkdtemp()
         # gloo environment variables involved in the launch
         global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
@@ -6,6 +6,7 @@ from fastNLP.core.metrics.backend import Backend

 if _NEED_IMPORT_JITTOR:
     import jittor

+__all__ = []

 class JittorBackend(Backend):
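
An empty __all__ keeps `from ... import *` from re-exporting the module's own imports (here `jittor` and `Backend`) into the caller's namespace. A tiny, generic demonstration of the mechanism, using a throw-away module rather than the real backend file:

    import types

    # Build a disposable module with an empty __all__, mimicking the backend modules.
    mod = types.ModuleType("fake_backend")
    mod.some_detail = 42
    mod.__all__ = []

    # "from fake_backend import *" would copy only the names listed in __all__;
    # emulate that lookup directly:
    exported = list(mod.__all__)
    print(exported)  # [] -> nothing is re-exported
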
@@ -4,23 +4,17 @@ from typing import List, Any
 import numpy as np

 from fastNLP.core.metrics.backend import Backend
-from fastNLP.core.utils.paddle_utils import paddle_to, _convert_data_device
+from fastNLP.core.utils.paddle_utils import paddle_to, _convert_data_device, is_in_paddle_dist
 from fastNLP.core.metrics.utils import AggregateMethodError
 from fastNLP.core.drivers.paddle_driver.dist_utils import fastnlp_paddle_all_gather
 from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
-from fastNLP.envs.env import USER_CUDA_VISIBLE_DEVICES

 if _NEED_IMPORT_PADDLE:
     import paddle
     import paddle.distributed as dist
     from paddle.fluid.dygraph import parallel_helper

-def _simple_gather_all_tensors(result, group: Any, world_size: int) -> List:
-    gathered_result = [paddle.zeros_like(result) for _ in range(world_size)]
-    dist.all_gather(gathered_result, result, group)
-    return gathered_result
+__all__ = []

 class PaddleBackend(Backend):
     def __init__(self):
@@ -80,6 +74,13 @@ class PaddleBackend(Backend):
         else:
             raise ValueError(f"tensor: {tensor} can not convert to ndarray!")

+    @staticmethod
+    def is_distributed() -> bool:
+        """
+        :return: whether the current process is running in a paddle distributed environment.
+        """
+        return is_in_paddle_dist()
+
     def move_tensor_to_device(self, tensor, device):
         device = _convert_data_device(device)
         return paddle_to(tensor, device)
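
The new is_distributed() hook lets metric code ask the backend whether cross-process synchronization is needed before gathering statistics. A minimal usage sketch under that assumption; the synced_sum helper below is illustrative rather than fastNLP API, and it uses plain paddle.distributed calls instead of fastnlp_paddle_all_gather:

    import paddle
    import paddle.distributed as dist

    def synced_sum(backend, local_count):
        # Single-process run: the local value already is the global value.
        if not backend.is_distributed():
            return local_count
        # Distributed run: collect one tensor per rank and reduce locally.
        parts = []
        dist.all_gather(parts, local_count)
        return paddle.add_n(parts)
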
@@ -12,12 +12,7 @@ if _NEED_IMPORT_TORCH:
     import torch
     import torch.distributed as dist

-def _simple_gather_all_tensors(result, group: Any, world_size: int) -> List:
-    gathered_result = [torch.zeros_like(result) for _ in range(world_size)]
-    dist.all_gather(gathered_result, result, group)
-    return gathered_result
+__all__ = []

 class TorchBackend(Backend):
     def __init__(self):