
Add core/drivers/paddle_driver/

tags/v1.0.0alpha
x54-729 committed 3 years ago
parent commit 9969502dcd
7 changed files with 1527 additions and 0 deletions
  1. fastNLP/core/drivers/paddle_driver/__init__.py (+11, -0)
  2. fastNLP/core/drivers/paddle_driver/fleet.py (+426, -0)
  3. fastNLP/core/drivers/paddle_driver/fleet_launcher.py (+176, -0)
  4. fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py (+87, -0)
  5. fastNLP/core/drivers/paddle_driver/paddle_driver.py (+315, -0)
  6. fastNLP/core/drivers/paddle_driver/single_device.py (+161, -0)
  7. fastNLP/core/drivers/paddle_driver/utils.py (+351, -0)

+11 -0   fastNLP/core/drivers/paddle_driver/__init__.py

@@ -0,0 +1,11 @@
__all__ = [
"PaddleDriver",
"PaddleSingleDriver",
"PaddleFleetDriver",
"paddle_seed_everything",
]

from .paddle_driver import PaddleDriver
from .single_device import PaddleSingleDriver
from .fleet import PaddleFleetDriver
from .utils import paddle_seed_everything
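
A minimal usage sketch of these exports (assuming a CUDA-enabled paddle install with one visible GPU; in practice the Trainer constructs the driver itself):

import paddle
from fastNLP.core.drivers.paddle_driver import PaddleSingleDriver, paddle_seed_everything

paddle_seed_everything(42, workers=True)            # fix the python / numpy / paddle seeds
model = paddle.nn.Linear(4, 2)                      # any paddle.nn.Layer
driver = PaddleSingleDriver(model, device="gpu:0")
driver.setup()                                      # sets the paddle device and moves the model onto it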

+426 -0   fastNLP/core/drivers/paddle_driver/fleet.py

@@ -0,0 +1,426 @@
import os
from functools import partial
from typing import List, Union, Optional, Dict

from .paddle_driver import PaddleDriver
from .fleet_launcher import FleetLauncher
from .utils import (
_FleetWrappingModel,
ForwardState,
_MODE_PARAMETER,
get_host_name_ip,
get_device_from_visible,
reset_seed,
)

from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.core.utils import (
auto_param_call,
check_user_specific_params,
paddle_move_data_to_device,
is_in_paddle_dist,
)
from fastNLP.core.samplers import ReproducibleIterator, RandomSampler, UnrepeatedDistributedSampler
from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK, USER_CUDA_VISIBLE_DEVICES
from fastNLP.core.log import logger

if _NEED_IMPORT_PADDLE:
import paddle
from paddle import DataParallel
import paddle.distributed.fleet as fleet
import paddle.distributed as dist
from paddle.io import BatchSampler
from paddle.optimizer import Optimizer
from paddle.fluid.reader import _DatasetKind
from paddle.fluid.dygraph import parallel_helper

__all__ = [
"PaddleFleetDriver",
]
# if os.path.exists(self.gloo_rendezvous_dir):
# shutil.rmtree(self.gloo_rendezvous_dir)
class PaddleFleetDriver(PaddleDriver):
def __init__(
self,
model,
parallel_device: Optional[Union[List[int], int]],
is_pull_by_paddle_run: bool = False,
fp16: bool = False,
**kwargs
):
"""
采用fleet接口进行并行paddle训练的driver
PaddleFleetDriver 目前考虑支持的三种启动方式:
1. 用户自己不进行 fleet 的任何操作,直接使用我们的 Trainer,并且只运行一个 main 脚本,这时是由我们自己使用 open_subprocesses 拉起
多个进程,然后由 Driver 自己进行初始化
2. 其它情况同 1,但是用户自己使用 python -m paddle.distributed.launch 拉起;
3. 用户自己在外面初始化 Fleet,并且通过 python -m paddle.distributed.launch 拉起;

注意多机的启动强制要求用户在每一台机器上使用 python -m paddle.distributed.launch 启动;

如果用户自己在外面初始化了 fleet,那么
parallel_device 为 None;
data_device 为 表示单卡的一个参数;
dist.is_initialized 为 true;
"""
super(PaddleFleetDriver, self).__init__(model, fp16=fp16, **kwargs)

# If we were not launched by paddle.distributed.launch, the user must pass parallel_device
if not is_pull_by_paddle_run and parallel_device is None:
raise ValueError("Parameter `parallel_device` can not be None when using `PaddleFleetDriver`. This error is caused "
"when your value of parameter `device` is `None` in your `Trainer` instance.")
# If the user initialized paddle distributed training themselves, it must have been launched via paddle.distributed.launch
self.is_pull_by_paddle_run = is_pull_by_paddle_run
self.parallel_device = parallel_device
# During initialization, if is_pull_by_paddle_run is set, parallel_device is already the gpu of the current process
if is_pull_by_paddle_run:
self._model_device = parallel_device
else:
self._model_device = parallel_device[self.local_rank]

# Whether the user initialized the parallel model outside of fastNLP;
self.outside_fleet = False
# Check whether the paddle distributed context has already been initialized
if parallel_helper._is_parallel_ctx_initialized():
# If the user initialized the distributed context themselves, the model passed in must already be wrapped by paddle.DataParallel;
if not isinstance(model, DataParallel):
raise RuntimeError(
"It is not allowed to input a normal model instead of `paddle.DataParallel` when"
"you initialize the paddle distribued process out of our control.")

self.outside_fleet = True
# The user can only wrap the model with DataParallel after it has been moved to the right device, so if Fleet was
# initialized outside of fastNLP, PaddleFleetDriver simply sets model_device to None;
self._model_device = None

def _running_fn_(batch, step_fn, signature_fn):
if isinstance(batch, Dict):
return auto_param_call(step_fn, batch, signature_fn=signature_fn)
else:
return step_fn(batch)

model = model._layers
if hasattr(model, "train_step"):
logger.warning(
"Notice your model is a `paddle.DataParallel` model. And your "
"model also implements the `train_step` method, which we can not call actually, we will"
" call `forward` function instead of `train_step` and you should note that.")
self._train_step = partial(_running_fn_, step_fn=self.model, signature_fn=model.forward)
# self._train_signature_fn = model.forward

if hasattr(model, "validate_step"):
logger.warning(
"Notice your model is a `paddle.DataParallel` model. And your "
"model also implements the `validate_step` method, which we can not call actually, "
"we will call `forward` function instead of `validate_step` and you should note that.")
self._validate_step = partial(_running_fn_, step_fn=self.model, signature_fn=model.forward)
# self._validate_signature_fn = model.forward

if hasattr(model, "test_step"):
logger.warning(
"Notice your model is a `paddle.DataParallel` model. And your "
"model also implements the `test_step` method, which we can not call actually, we will"
" call `forward` function instead of `test_step` and you should note that.")
self._test_step = partial(_running_fn_, step_fn=self.model, signature_fn=model.forward)

# When the parameter `device` is None but this parameter is not None, the data will be moved to the specified device;
self._data_device = kwargs.get("_data_device", None)
if self._data_device is not None:
if isinstance(self._data_device, int):
if self._data_device < 0:
raise ValueError("Parameter `_data_device` can not be smaller than 0.")
_could_use_device_num = paddle.device.cuda.device_count()
if self._data_device >= _could_use_device_num:
raise ValueError("The gpu device that parameter `device` specifies is not existed.")
self._data_device = f"gpu:{self._data_device}"
elif not isinstance(self._data_device, str):
raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.")
if self.outside_fleet and paddle.device.get_device() != self._data_device:
logger.warning("`Parameter data_device` is not equal to paddle.deivce.get_device(), "
"please keep them equal to avoid some potential bugs.")

if not self.outside_fleet and parallel_device is None:
raise ValueError("Parameter `parallel_device` can not be None when using `PaddleFleetDriver`. This error is caused "
"when your value of parameter `device` is `None` in your `Trainer` instance.")

# this may need to become an explicit parameter
self.strategy = kwargs.get("strategy", fleet.DistributedStrategy())
self.is_collective = kwargs.get("is_collective", True)
if not self.is_collective:
raise NotImplementedError("FastNLP dose not support `parameters server` for distributed training now.")
self.role_maker = kwargs.get("role_maker", None)

self._master_port = None
self.world_size = None
self.global_rank = 0
self._configured = False  # guard against calling configure_fleet() more than once
self._has_setup = False  # guard against calling setup() more than once

self._fleet_kwargs = kwargs.get("paddle_fleet_kwargs", {})
check_user_specific_params(self._fleet_kwargs, DataParallel.__init__)
# TODO: validate these parameters

if self.local_rank == 0 and not is_in_paddle_dist():
# Since the model is always instantiated when a driver is used, the program occupies some GPU memory for the model
# from the very beginning, even though that memory is not doing anything useful yet.
logger.warning(f"The program will use some extra space on {paddle.device.get_device()} to place your model since the model "
"has already been initialized.")

self.output_from_new_proc = kwargs.get("output_from_new_proc", "only_error")
assert isinstance(self.output_from_new_proc, str), "Parameter `output_from_new_proc` can only be `str` type."
if self.output_from_new_proc not in {"all", "ignore", "only_error"}:
os.makedirs(name=self.output_from_new_proc, exist_ok=True)
self.output_from_new_proc = os.path.abspath(self.output_from_new_proc)

def setup(self):
"""
Spawn the other subprocesses from the main process, which acts as rank 0
"""
if self._has_setup:
return
self._has_setup = True
# Multi-machine mode always goes through this branch;
if self.is_pull_by_paddle_run:

if self.outside_fleet:
# the multi-machine environment has already been initialized
self.set_from_fleet_environment()
else:
# the user has not initialized the multi-machine environment
# TODO find a cleaner workaround
# dist.get_world_size() can only be called after initialization;
self.world_size = int(os.environ.get("PADDLE_TRAINERS_NUM"))
self.global_rank = int(os.environ.get("PADDLE_TRAINER_ID"))
reset_seed()
logger.warning(f"\nworld size, global rank: {self.world_size}, {self.global_rank}\n")
fleet.init(self.role_maker, self.is_collective, self.strategy)

else:
# when the user only uses a single distributed trainer
# parallel_helper._is_parallel_ctx_initialized() must be False at this point
# parallel_device is a list,
# if self.local_rank == 0 and FASTNLP_DISTRIBUTED_CHECK not in os.environ:
if not parallel_helper._is_parallel_ctx_initialized():
# the distributed environment has not been initialized and this is the main process
self.init_fleet_and_set()
# the user already created another trainer before this one, also using PaddleFleetDriver;
else:
# the environment has been set once already; make sure the parameters are identical
pre_gpus = os.environ[FASTNLP_DISTRIBUTED_CHECK]
pre_gpus = [int(x) for x in pre_gpus.split(",")]
if sorted(pre_gpus) != sorted(self.parallel_device):
raise RuntimeError("Notice you are using `PaddleFleetDriver` after one instantiated `PaddleFleetDriver`, it is not "
"allowed that your second `PaddleFleetDriver` has a new setting of parameters `parallel_device`.")

if not self.outside_fleet:
# self.model.to(self.model_device)
self.configure_fleet()

# initialize self._pids so that every process can receive the send operation from rank 0;
# TODO what happens if .to is not used?
self._pids = []
dist.all_gather(self._pids, paddle.to_tensor(os.getpid(), dtype="int32"))
# TODO LOCAL_WORLD_SIZE
local_world_size = int(os.environ.get("LOCAL_WORLD_SIZE")) if "LOCAL_WORLD_SIZE" in os.environ else None
if local_world_size is None:
local_world_size = paddle.to_tensor(self.local_rank, dtype="int32")
dist.all_reduce(local_world_size, op=dist.ReduceOp.MAX)
local_world_size = local_world_size.item() + 1

node_rank = self.global_rank // local_world_size
self._pids = self._pids[node_rank*local_world_size: (node_rank+1)*local_world_size]
self._pids = self.tensor_to_numeric(self._pids)

def init_fleet_and_set(self):
"""
Spawn the subprocesses with FleetLauncher
"""
if self.local_rank == 0:
# rank 0 spawns the other subprocesses
launcher = FleetLauncher(self.parallel_device, self.output_from_new_proc)
launcher.launch()
# set parameters and initialize the distributed environment
reset_seed()
fleet.init(self.role_maker, self.is_collective, self.strategy)
self.global_rank = int(os.getenv("PADDLE_TRAINER_ID"))
self.world_size = int(os.getenv("PADDLE_TRAINERS_NUM"))

# these asserts should never fail, but keep them as a safety check
assert self.global_rank is not None
assert self.world_size is not None
assert self.world_size == len(self.parallel_device)

def set_from_fleet_environment(self):
"""
When the user launches with `python -m paddle.distributed.launch xxx.py`, we need to obtain
the relevant attributes from the environment variables set by paddle
"""
self.world_size = dist.get_world_size()
self.global_rank = dist.get_rank()

def barrier(self):
dist.barrier()

def configure_fleet(self):
if not self._configured and not isinstance(self.model, DataParallel):
self.model = DataParallel(
_FleetWrappingModel(self.model),
**self._fleet_kwargs
)

self._train_step = partial(self.model, **{_MODE_PARAMETER: ForwardState.TRAIN})
self._validate_step = partial(self.model, **{_MODE_PARAMETER: ForwardState.VALIDATE})
self._test_step = partial(self.model, **{_MODE_PARAMETER: ForwardState.TEST})

self._configured = True

@property
def world_size(self) -> int:
return self._world_size

@world_size.setter
def world_size(self, size: int) -> None:
self._world_size = size

@property
def global_rank(self) -> int:
return self._global_rank

@global_rank.setter
def global_rank(self, rank: int) -> None:
self._global_rank = rank

@property
def local_rank(self) -> int:
return int(os.getenv("PADDLE_RANK_IN_NODE", "0"))

@property
def model_device(self):
# I think both device properties here should return the real values; the translation for CUDA_VISIBLE_DEVICES should be
# done in the corresponding `to` functions, otherwise it will confuse users
return self._model_device

@property
def data_device(self):
if self.outside_fleet:
return self._data_device
return self.model_device

def train_step(self, batch):
return self._train_step(batch)

def validate_step(self, batch):
return self._validate_step(batch)

def test_step(self, batch):
return self._test_step(batch)

def replace_sampler(self, dataloader, dist_sampler: Optional[Union[str, ReproducibleIterator]] = "dist", reproducible: bool = False):
# IterableDataset is not supported for now
assert dataloader.dataset_kind != _DatasetKind.ITER, \
"FastNLP does not support `IteratorDataset` now."
if isinstance(dist_sampler, ReproducibleIterator):
dataloader.batch_sampler.sampler = dist_sampler
return dataloader

# paddle's BatchSampler and DataLoader have no shuffle attribute, so we have to infer it from the sampler,
# but the subclass DistributedBatchSampler does have a shuffle attribute,
# hence the strict type() check here
if type(dataloader.batch_sampler) == BatchSampler:
shuffle = isinstance(dataloader.batch_sampler.sampler, RandomSampler)
else:
shuffle = dataloader.batch_sampler.shuffle

# trainer, evaluator
if dist_sampler is None:
if reproducible:
raise RuntimeError("It is not allowed to use checkpoint retraining when you initialize fleet out of our "
"control.")
else:
return dataloader
# trainer
elif dist_sampler == "dist":
# if the user's trainer.use_dist_sampler is True, whether checkpoint retraining is enabled does not affect the behavior here;
if isinstance(dataloader.batch_sampler.sampler, ReproducibleIterator):
dataloader.batch_sampler.sampler.set_distributed(
num_replicas=self.world_size,
rank=self.global_rank,
pad=True
)
return dataloader
else:
sampler = RandomSampler(
dataset=dataloader.dataset,
shuffle=shuffle,
seed=int(os.environ.get("FASTNLP_SEED", 0))
)
sampler.set_distributed(
num_replicas=self.world_size,
rank=self.global_rank,
pad=True
)
dataloader.batch_sampler.sampler = sampler
return dataloader
# evaluator
elif dist_sampler == "unrepeatdist":
sampler = UnrepeatedDistributedSampler(
dataset=dataloader.dataset,
shuffle=shuffle,
seed=int(os.environ.get("FASTNLP_SEED", 0))
)
sampler.set_distributed(
num_replicas=self.world_size,
rank=self.global_rank
)
dataloader.batch_sampler.sampler = sampler
return dataloader
else:
raise ValueError("Parameter `dist_sampler` can only be one of three values: ('dist', 'unrepeatdist', None).")

def backward(self, loss):
self.grad_scaler.scale(loss).backward()

def step(self):
for optimizer in self.optimizers:
self.grad_scaler.step(optimizer)
self.grad_scaler.update()

def is_global_zero(self):
return self.global_rank == 0

def get_no_sync_context(self):
return self.model.no_sync

def unwrap_model(self):
_layers = self.model._layers
if isinstance(_layers, _FleetWrappingModel):
return _layers.model
else:
return _layers

def get_local_rank(self) ->int:
return self.local_rank

def is_distributed(self):
return True

def move_data_to_device(self, batch: 'paddle.Tensor'):
device = self.data_device
# since CUDA_VISIBLE_DEVICES has been set, using the raw device in subprocesses may cause errors
if FASTNLP_DISTRIBUTED_CHECK in os.environ:
device = get_device_from_visible(device)
return paddle_move_data_to_device(batch, device)

@staticmethod
def _check_optimizer_legality(optimizers):
"""
paddle provides an API for building distributed optimizers whose return type is fleet.meta_optimizers.HybridParallelOptimizer;
this check is overridden here so that such optimizers are accepted in addition to plain paddle.optimizer.Optimizer
"""
DistributedOptimizer = fleet.meta_optimizers.HybridParallelOptimizer
for each_optimizer in optimizers:
if not isinstance(each_optimizer, (Optimizer, DistributedOptimizer)):
raise ValueError(f"Each optimizer of parameter `optimizers` should be 'paddle.optimizer.Optimizer' type, "
f"not {type(each_optimizer)}.")

+176 -0   fastNLP/core/drivers/paddle_driver/fleet_launcher.py

@@ -0,0 +1,176 @@
import os
import sys
import __main__
import tempfile
import copy
from typing import List

from fastNLP.core.drivers.utils import distributed_open_proc
from fastNLP.envs.env import (
FASTNLP_DISTRIBUTED_CHECK,
FASTNLP_LOG_LEVEL,
FASTNLP_GLOBAL_SEED,
USER_CUDA_VISIBLE_DEVICES,
)
from .utils import (
find_free_ports,
reset_seed,
)

# records information about each process
class SubTrainer(object):
"""
Unrelated to fastNLP's Trainer; only used to record some information about the different trainers inside a node
"""
def __init__(self, endpoint=None, rank=None):
self.devices = []
self.endpoint = endpoint
self.rank = rank


class FleetLauncher:
"""
Re-implements paddle's launch_collective function, wrapped up in a single class.
Only single-machine multi-card launching is supported.
"""
def __init__(
self,
devices: List[int],
output_from_new_proc: str = "only_error"
):

self.devices = devices
self.output_from_new_proc = output_from_new_proc

self.setup()

def setup(self):

self.set_endpoints()
self.sub_trainers = self.get_process_info()

def launch(self) -> None:
# set environment variables
self.global_envs = self.get_global_env()
self.open_subprocess()
reset_seed()

def open_subprocess(self):

if __main__.__spec__ is None:
# Script called as `python a/b/c.py`
# when user is using hydra find the absolute path
path_lib = os.path.abspath

# pull out the commands used to run the script and resolve the abs file path
command = sys.argv
try:
full_path = path_lib(command[0])
except Exception:
full_path = os.path.abspath(command[0])

command[0] = full_path
# use the same python interpreter and actually running
command = [sys.executable] + command
else: # Script called as `python -m a.b.c`
command = [sys.executable, "-m", __main__.__spec__._name] + sys.argv[1:]

current_env = copy.copy(self.global_envs)
for idx, t in enumerate(self.sub_trainers):
proc_env = {
# global_rank
"PADDLE_TRAINER_ID": f"{t.rank}",
"PADDLE_CURRENT_ENDPOINT": f"{t.endpoint}",
# rank
"PADDLE_RANK_IN_NODE": f"{idx}",
"PADDLE_LOCAL_DEVICE_IDS":
",".join([str(g) for g in t.devices]),
}

if len(t.devices) > 0:
proc_env["FLAGS_selected_gpus"] = "%s" % ",".join(
[str(g) for g in t.devices])
proc_env["FLAGS_selected_devices"] = "%s" % ",".join(
[str(g) for g in t.devices])

current_env.update(proc_env)

if os.environ.get(FASTNLP_GLOBAL_SEED) is None and FASTNLP_GLOBAL_SEED in current_env:
del current_env[FASTNLP_GLOBAL_SEED]

if idx != 0:
# subprocesses
if os.environ.get(FASTNLP_LOG_LEVEL, None) is None:
current_env[FASTNLP_LOG_LEVEL] = "warning"
proc = distributed_open_proc(self.output_from_new_proc, command, current_env, t.rank)
else:
# update the environment variables of the current (rank 0) process
os.environ.update(current_env)

def get_global_env(self):

global_envs = copy.copy(os.environ.copy())
self.gloo_rendezvous_dir = tempfile.mkdtemp()
# gloo-related environment variables used by launch
global_envs["PADDLE_WITH_GLOO"] = str(os.getenv("PADDLE_WITH_GLOO", "0"))
global_envs["PADDLE_GLOO_RENDEZVOUS"] = "3"
global_envs["PADDLE_GLOO_FS_PATH"] = self.gloo_rendezvous_dir
global_envs["PADDLE_DISTRI_BACKEND"] = "nccl"

# flag marking that the processes were initialized by fastNLP
global_envs[FASTNLP_DISTRIBUTED_CHECK] = f"{','.join([str(g) for g in self.devices])}"

# collect global information
device_ids = []
for t in self.sub_trainers:
device_ids.append([str(acc) for acc in t.devices])
world_device_ids = [':'.join(ele) for ele in device_ids]
# global environment variables
global_envs.update({
# world_size
"PADDLE_TRAINERS_NUM": f"{len(self.sub_trainers)}",
"PADDLE_TRAINER_ENDPOINTS": ",".join(self.endpoints),
"PADDLE_WORLD_DEVICE_IDS": ",".join(world_device_ids),
})

return global_envs

def set_endpoints(self):
"""
Reference to `get_cluster_from_args`
"""
self.node_ip = "127.0.0.1"

free_ports = None
if os.environ.get("FLAGS_START_PORT") is None:
free_ports = find_free_ports(len(self.devices))
if free_ports is not None:
free_ports = list(free_ports)
else:
start_port = int(os.getenv("FLAGS_START_PORT", "6070"))

free_ports = [
x for x in range(start_port, start_port + len(self.devices))
]

self.endpoints = ["%s:%d" % (self.node_ip, port) for port in free_ports]

def get_process_info(self):
"""
Reference to `get_cluster`
"""
sub_trainers = []
assert len(self.endpoints) >= len(
self.devices
), "current trainer_endpoints size should be greater equal than acclerators size."

for i in range(len(self.devices)):
sub_trainer = SubTrainer(f"{self.endpoints[i]}", i)
if isinstance(self.devices[i], (list, tuple)):
sub_trainer.devices.extend(self.devices[i])
else:
sub_trainer.devices.append(self.devices[i])

sub_trainers.append(sub_trainer)

return sub_trainers
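
A small sketch of what the launcher prepares before launch() actually re-executes the script per rank (assumes two local GPUs; the ports are picked by find_free_ports, so the exact endpoints will differ):

from fastNLP.core.drivers.paddle_driver.fleet_launcher import FleetLauncher

launcher = FleetLauncher(devices=[0, 1], output_from_new_proc="only_error")
for t in launcher.sub_trainers:
    # e.g. rank 0 -> 127.0.0.1:<port0> on device [0], rank 1 -> 127.0.0.1:<port1> on device [1]
    print(t.rank, t.endpoint, t.devices)
# launcher.launch() would then export the PADDLE_* variables and start one subprocess per extra rank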

+87 -0   fastNLP/core/drivers/paddle_driver/initialize_paddle_driver.py

@@ -0,0 +1,87 @@
import os

from typing import Optional, List, Sequence, Union

from .paddle_driver import PaddleDriver
from .single_device import PaddleSingleDriver
from .fleet import PaddleFleetDriver

from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.envs.env import FASTNLP_DISTRIBUTED_CHECK
from fastNLP.core.log import logger

if _NEED_IMPORT_PADDLE:
import paddle

def initialize_paddle_driver(driver: str, device: Optional[Union[str, int, List[int]]],
model: paddle.nn.Layer, **kwargs) -> PaddleDriver:
r"""
Determines and initializes a concrete `Driver` instance according to the parameters `driver` and `device`, and returns it;
note that an error is raised immediately if the given `device` does not match `driver`;

:param driver: should be one of ["paddle", "fleet"];
:param device: same format as the `device` parameter of `Trainer`;
:param model: the concrete model used for training or evaluation;

:return: a concrete paddle-based `Driver` instance;
"""
if "PADDLE_TRAINERS_NUM" in os.environ and "PADDLE_RANK_IN_NODE" in os.environ and FASTNLP_DISTRIBUTED_CHECK not in os.environ:
if device is not None:
logger.warning("Parameter `device` would be ignored when you are using `paddle.distributed.launch` to pull "
"up your script. And we will directly get the local device via "
"`f'gpu:{os.environ['FLAGS_selected_gpus']}')`.")
device = [int(g) for g in os.environ["FLAGS_selected_gpus"].split(",")]
return PaddleFleetDriver(model, f"gpu:{os.environ['PADDLE_RANK_IN_NODE']}", True, **kwargs)

if driver not in {"paddle", "fleet"}:
raise ValueError("Parameter `driver` can only be one of these values: ['paddle', 'fleet'].")

cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
user_visible_devices = os.getenv("USER_CUDA_VISIBLE_DEVICES")
# priority: user > cuda
# check the validity of `device` in the single-machine case
# in the distributed case this is checked through world_device
if user_visible_devices is not None:
_could_use_device_num = len(user_visible_devices.split(","))
elif cuda_visible_devices is not None:
_could_use_device_num = len(cuda_visible_devices.split(","))
else:
_could_use_device_num = paddle.device.cuda.device_count()
if isinstance(device, int):
if device < 0 and device != -1:
raise ValueError("Parameter `device` can only be '-1' when it is smaller than 0.")
if device >= _could_use_device_num:
raise ValueError("The gpu device that parameter `device` specifies is not existed.")
device = f"gpu:{device}"
elif isinstance(device, Sequence) and not isinstance(device, str):
device = list(set(device))
for each in device:
if not isinstance(each, int):
raise ValueError("When parameter `device` is 'Sequence' type, the value in it should be 'int' type.")
elif each < 0:
raise ValueError("When parameter `device` is 'Sequence' type, the value in it should be bigger than 0.")
if len(device) == 1:
# a list such as [1] is treated as single-card.
device = device[0]
elif device is not None and not isinstance(device, str):
raise ValueError("Parameter `device` is wrong type, please check our documentation for the right use.")

if driver == "paddle":
if not isinstance(device, List):
return PaddleSingleDriver(model, device, **kwargs)
else:
logger.warning("Notice you are using `paddle` driver but your chosen `device` are multi gpus, we will use"
"`Fleetriver` by default. But if you mean using `PaddleFleetDriver`, you should choose parameter"
"`driver` as `PaddleFleetDriver`.")
return PaddleFleetDriver(model, device, **kwargs)
elif driver == "fleet":
if not isinstance(device, List):
if device == "cpu":
raise ValueError("You are using `fleet` driver, but your chosen `device` is 'cpu'.")
logger.warning("Notice you are using `fleet` driver, but your chosen `device` is only one gpu, we will"
"still use `PaddleFleetDriver` for you, but if you mean using `PaddleSingleDriver`, you should "
"choose `paddle` driver.")
return PaddleFleetDriver(model, device, **kwargs)
else:
return PaddleFleetDriver(model, device, **kwargs)
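
A sketch of how the `driver` / `device` combinations above resolve (assuming at least two visible GPUs; the calls are shown for illustration only):

import paddle
from fastNLP.core.drivers.paddle_driver.initialize_paddle_driver import initialize_paddle_driver

model = paddle.nn.Linear(4, 2)
initialize_paddle_driver("paddle", 0, model)        # -> PaddleSingleDriver on "gpu:0"
initialize_paddle_driver("paddle", [0, 1], model)   # -> PaddleFleetDriver, with a warning
initialize_paddle_driver("fleet", [0, 1], model)    # -> PaddleFleetDriver
initialize_paddle_driver("fleet", "cpu", model)     # -> ValueError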

+315 -0   fastNLP/core/drivers/paddle_driver/paddle_driver.py

@@ -0,0 +1,315 @@
import os
import random
from typing import Union, Optional, Callable, Dict
from functools import partial

import numpy as np

from .utils import _build_fp16_env
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.core.drivers.driver import Driver
from fastNLP.core.utils import apply_to_collection, paddle_move_data_to_device
from fastNLP.envs import rank_zero_call
from fastNLP.envs import FASTNLP_SEED_WORKERS
from fastNLP.core.log import logger

if _NEED_IMPORT_PADDLE:
import paddle
from paddle.io import DataLoader, IterableDataset
from paddle.optimizer import Optimizer

_reduces = {
'max': paddle.max,
'min': paddle.min,
'mean': paddle.mean,
'sum': paddle.sum
}

class PaddleDriver(Driver):
r"""
Driver for the Paddle framework; its subclasses are `PaddleSingleDriver` for single-card training and `PaddleFleetDriver` for distributed training.
"""
def __init__(self, model, fp16: Optional[bool] = False, **kwargs):
if not isinstance(model, paddle.nn.Layer):
raise ValueError(f"Parameter `model` can not be `{type(model)}` in `PaddleDriver`, it should be exactly "
f"`paddle.nn.Layer` type.")

super(PaddleDriver, self).__init__(model)
self.fp16 = fp16

# fp16 scaler settings
self.auto_cast, _grad_scaler = _build_fp16_env(dummy=not fp16)
self.grad_scaler = _grad_scaler()

def zero_grad(self, set_to_none: bool = False):
r"""
Zero the gradients; this should be done directly through the optimizers;
note that gradient accumulation does not need to be implemented here, since the trainer already handles it internally;

:param set_to_none: whether the gradients should be set directly to None; this parameter has no effect in Paddle.
"""
# if set_to_none:
# log.warning("Parameter `set_to_none` does nothing in paddle since grad cannot be set directly.")
for optimizer in self.optimizers:
optimizer.clear_grad()

@staticmethod
def _check_dataloader_legality(dataloader, dataloader_name, is_train: bool = False):
r"""
This function checks the validity of the dataloader after the trainer or evaluator sets it.
The dataloader passed in must be a `paddle.io.DataLoader` or a dict containing objects of that type.

:param dataloader: the input `dataloader` to check;
:param dataloader_name:
:param is_train:
"""
if is_train:
if not isinstance(dataloader, DataLoader):
raise ValueError(f"Parameter `{dataloader_name}` should be 'paddle.io.DataLoader' type, not {type(dataloader)}.")
# TODO for now we forbid the dataloader's dataset from being an IterableDataset;
if isinstance(dataloader.dataset, IterableDataset):
raise TypeError("`IterableDataset` is not allowed.")
else:
if not isinstance(dataloader, Dict):
raise ValueError(f"Parameter `{dataloader_name}` should be 'Dict' type, not {type(dataloader)}.")
else:
for each_dataloader in dataloader.values():
if not isinstance(each_dataloader, DataLoader):
raise ValueError(f"Each dataloader of parameter `{dataloader_name}` should be 'paddle.io.DataLoader' "
f"type, not {type(each_dataloader)}.")
if isinstance(each_dataloader.dataset, IterableDataset):
raise TypeError("`IterableDataset` is not allowed.")

@staticmethod
def _check_optimizer_legality(optimizers):
r"""
Checks the validity of every optimizer the user passes to the trainer; each must be of type `paddle.optimizer.Optimizer`.

:param optimizers: the `optimizers` to check;
"""
for each_optimizer in optimizers:
if not isinstance(each_optimizer, Optimizer):
raise ValueError(f"Each optimizer of parameter `optimizers` should be 'paddle.optimizer.Optimizer' type, "
f"not {type(each_optimizer)}.")

def check_evaluator_mode(self, mode: str):
r"""
In the concrete drivers, validate_step / test_step fall back to the other function when the model does not implement the
requested one; so if the user's evaluator mode is validate but the model only implements test_step instead of
validate_step, we should remind the user of this behaviour;
"""
model = self.unwrap_model()
if mode == "validate":
if not hasattr(model, "validate_step"):
if hasattr(model, "test_step"):
logger.warning(
"Your model does not have 'validate_step' method but has 'test_step' method, but you "
"are using 'Evaluator.validate', we are going to use 'test_step' to substitute for "
"'validate_step'.")

else:
if not hasattr(model, "test_step"):
if hasattr(model, "validate_step"):
logger.warning("Your model does not have 'test_step' method but has 'validate' method, but you"
"are using 'Evaluator.test', we are going to use 'validate_step' to substitute for"
"'test_step'.")

@staticmethod
def tensor_to_numeric(tensor, reduce=None):
r"""
Converts a `tensor` object (of type `paddle.Tensor`) into a python `numeric` object; if the tensor contains a single
element, a float or an int is returned.

:param tensor: the `tensor` object to convert
:param reduce: one of ['sum', 'max', 'mean', 'min']; if not None, this reduce method is applied to the tensor before
returning the float or int.
:return: the converted result
"""
if tensor is None:
return None

def _translate(_data):
# if there is only one element, return the element itself rather than a list
if _data.numel().item() == 1:
return _data.item()
if reduce is None:
return _data.tolist()
else:
return _reduces[reduce](_data).item()

return apply_to_collection(
data=tensor,
dtype=paddle.Tensor,
function=_translate
)

def set_model_mode(self, mode: str):
r"""
Sets the model to `train` / `eval` mode; used to switch between training and inference (which disables dropout etc.);

:param mode: one of ["train", "eval"];
"""
assert mode in {"train", "eval"}
getattr(self.model, mode)()

@rank_zero_call
def save_model(self, filepath: str, only_state_dict: bool = True, model_save_fn: Optional[Callable]=None, **kwargs):
r"""
Saves the model; note that the `save` function is the one used for checkpoint retraining;
if `model_save_fn` is a callable, we run it directly instead;

:param filepath: where to save the file (must include the file name);
:param only_state_dict: whether to save only the model's `state_dict`; only takes effect when `model_save_fn` is None;
:param model_save_fn: a user-supplied function replacing the saving logic of this function; if not None, we call model_save_fn(filepath);
"""
if model_save_fn is not None:
model_save_fn(filepath)
else:
model = self.unwrap_model()
if only_state_dict:
paddle.save(model.state_dict(), filepath)
else:
input_spec = kwargs.get("input_spec", None)
if input_spec is None:
raise Exception("To save the whole Paddle Layer, parameter 'input_spec' is needed.")
paddle.jit.save(model, filepath, input_spec)

@staticmethod
@rank_zero_call
def load_model(filepath: str, load_dict: bool = True):
r"""
Loads the model; note that the `load` function is the one used for checkpoint retraining;

:param filepath: the location of the object to load (must include the file name);
:param load_dict: whether to load a state_dict, defaults to True. If only_state_dict was set to False in save_model,
i.e. the whole model was saved, this parameter must also be False;
:return: the result of loading the specified file;
"""
if load_dict:
return paddle.load(filepath)
else:
return paddle.jit.load(filepath)

@rank_zero_call
def save(self, folder, states: Dict):
r"""
Saving function for checkpoint retraining; it is responsible for saving the state_dicts of the model and the optimizers;
note that a driver should be stateless, i.e. calling its interface functions at any time should return the same results,
so checkpoint retraining never needs to save any state of the driver itself; each driver instance only has to implement
saving the state_dicts of the model and the optimizers, and to store the passed-in states properly (mainly used to
restore the Trainer, Callbacks, etc.).
This function is guaranteed to run only on global rank 0.

:param folder: file name for saving the checkpoint-retraining state;
:param states: a dict passed in by the trainer that already contains the state of the other objects required for
checkpoint retraining; the Driver only needs to save this object and does not need to understand it; when
driver.load() is called, states must be returned again, and the value returned by load() must be identical to the
one passed in here.
"""
# 1. save the model state;
model = self.unwrap_model()
model_state_dict = {name: param.cpu().detach().clone() for name, param in model.state_dict().items()}
# for the single-card driver we do not (for now) need to consider users running single-card mode inside a distributed environment, which would cause an efficiency loss;
states["model_state_dict"] = model_state_dict

# 2. save the optimizers' state;
optimizers_state_dict = {}
for i in range(len(self.optimizers)):
optimizer: Optimizer = self.optimizers[i]
optimizer_state = optimizer.state_dict()
optimizer_state = {name: param.cpu().detach().clone() for name, param in optimizer_state.items()}
optimizers_state_dict[f"optimizer{i}"] = optimizer_state # 注意这里没有使用 deepcopy,测试是不需要的;
states["optimizers_state_dict"] = optimizers_state_dict

paddle.save(states, folder)

def load(self, filepath) -> Dict:
r"""
Loading function for checkpoint retraining; it reads the saved data and restores the state_dicts of the model and the optimizers;
the driver instance must first load the model and optimizer state_dicts in this function and then return a state dict to
the trainer, so the inputs of save and the return value of load must correspond to each other;

This function needs to be executed on all ranks.

:param filepath: file name of the saved checkpoint-retraining state;
:return: must return the states content that was passed into the save function;
"""
states = paddle.load(filepath)

# 1. load the optimizers' state;
optimizers_state_dict = states["optimizers_state_dict"]
for i in range(len(self.optimizers)):
optimizer: paddle.optimizer.Optimizer = self.optimizers[i]
optimizer.set_state_dict(optimizers_state_dict[f"optimizer{i}"])

# 2. load the model state;
model = self.unwrap_model()
model.load_dict(states["model_state_dict"])

self.barrier()
return states

def get_evaluate_context(self):
r"""
Returns a no-gradient context used to evaluate the model;

:return: the context object `paddle.no_grad`;
"""
return paddle.no_grad

@staticmethod
def move_model_to_device(model: 'paddle.nn.Layer', device: Union[str, int, 'paddle.CUDAPlace', 'paddle.CPUPlace']):
r"""
Moves the model to the specified device;
note that in Paddle this may cause problems if the device differs from the globally configured one.
"""
if device is not None:
model.to(device)

def move_data_to_device(self, batch: 'paddle.Tensor'):
r"""
Moves the data to the specified device; batch may be a list, a dict, or a nested structure of these.
Note that in Paddle this may cause problems if the device differs from the globally configured one.

:return: the batch object moved to the specified device;
"""
return paddle_move_data_to_device(batch, self.data_device)

@staticmethod
def worker_init_function(worker_id: int, rank: Optional[int] = None) -> None: # pragma: no cover
"""The worker_init_fn that Lightning automatically adds to your dataloader if you previously set set the seed
with ``seed_everything(seed, workers=True)``.

See also the PyTorch documentation on
`randomness in DataLoaders <https://pytorch.org/docs/stable/notes/randomness.html#dataloader>`_.
"""
# implementation notes: https://github.com/pytorch/pytorch/issues/5059#issuecomment-817392562
global_rank = rank if rank is not None else rank_zero_call.rank
# TODO gpu
process_seed = paddle.fluid.core.default_cpu_generator().initial_seed()
# back out the base seed so we can use all the bits
base_seed = process_seed - worker_id
ss = np.random.SeedSequence([base_seed, worker_id, global_rank])
# use 128 bits (4 x 32-bit words)
np.random.seed(ss.generate_state(4))
# Spawn distinct SeedSequences for the paddle PRNG and the stdlib random module
paddle_ss, stdlib_ss = ss.spawn(2)
paddle.seed(paddle_ss.generate_state(1, dtype=np.uint64)[0])
# use 128 bits expressed as an integer
stdlib_seed = (stdlib_ss.generate_state(2, dtype=np.uint64).astype(object) * [1 << 64, 1]).sum()
random.seed(stdlib_seed)

def set_deterministic_dataloader(self, dataloader):
r"""
Modifies the dataloader for deterministic training, so that once the random seed is fixed every rerun produces the same results;
this is done by replacing the dataloader's `worker_init_fn`.
"""
if int(os.environ.get(FASTNLP_SEED_WORKERS, 0)) and dataloader.worker_init_fn is None:
dataloader.worker_init_fn = partial(self.worker_init_function, rank=self.global_rank)

def set_sampler_epoch(self, dataloader: 'DataLoader', cur_epoch_idx):
r"""
For distributed samplers, the dataloader needs to set the random seed before every epoch so that the shuffle is the same
on every process;

:param cur_epoch_idx: the index of the current epoch;
"""
if callable(getattr(dataloader.batch_sampler, "set_epoch", None)):
dataloader.batch_sampler.set_epoch(cur_epoch_idx)
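
For instance, tensor_to_numeric recursively converts paddle tensors inside nested containers (a small sketch):

import paddle
from fastNLP.core.drivers.paddle_driver.paddle_driver import PaddleDriver

out = {"loss": paddle.to_tensor(0.5), "logits": paddle.to_tensor([1.0, 2.0])}
PaddleDriver.tensor_to_numeric(out)                            # -> {"loss": 0.5, "logits": [1.0, 2.0]}
PaddleDriver.tensor_to_numeric(out["logits"], reduce="sum")    # -> 3.0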

+161 -0   fastNLP/core/drivers/paddle_driver/single_device.py

@@ -0,0 +1,161 @@
from typing import Optional, Dict, Union

from .paddle_driver import PaddleDriver
from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.core.utils import auto_param_call, get_paddle_gpu_str
from fastNLP.core.samplers import ReproducibleBatchSampler, ReproducibleIterator
from fastNLP.core.log import logger

if _NEED_IMPORT_PADDLE:
import paddle
from paddle.fluid.reader import _DatasetKind

__all__ = [
"PaddleSingleDriver",
]

class PaddleSingleDriver(PaddleDriver):
def __init__(self, model, device: Optional[str], fp16: Optional[bool] = False, **kwargs):
super(PaddleSingleDriver, self).__init__(model, fp16=fp16, **kwargs)

if device is None:
raise ValueError("Parameter `device` can not be None in `PaddleSingleDriver`.")

if isinstance(device, int):
self.model_device = get_paddle_gpu_str(device)
else:
self.model_device = device

self.local_rank = 0
self.global_rank = 0
self.world_size = 1

if isinstance(model, paddle.DataParallel):
# note that unwrap_model here calls the concrete subclass's method;
model = self.unwrap_model()
if hasattr(model, "train_step"):
logger.warning("Notice your model is a `paddle.DataParallel` model. And your model also "
"implements the `train_step` method, which we can not call actually, we will "
" call `forward` function instead of `train_step` and you should note that.")
self._train_step = self.model
self._train_signature_fn = model.forward

if hasattr(model, "validate_step"):
logger.warning("Notice your model is a `paddle.DataParallel` model. And your model also "
"implements the `validate_step` method, which we can not call actually, we "
"will call `forward` function instead of `validate_step` and you should note that.")
self._validate_step = self.model
self._validate_signature_fn = model.forward

if hasattr(model, "test_step"):
logger.warning("Notice your model is a `paddle.DataParallel` model. And your model also "
"implements the `test_step` method, which we can not call actually, we will "
"call `forward` function instead of `test_step` and you should note that.")
self._test_step = self.model
self._test_signature_fn = model.forward
else:
if hasattr(self.model, "train_step"):
self._train_step = self.model.train_step
self._train_signature_fn = None
else:
self._train_step = self.model
# when falling back to calling the model directly, use the unwrapped model's `forward` as the signature_fn;
model = self.unwrap_model()
self._train_signature_fn = model.forward

if hasattr(self.model, "validate_step"):
self._validate_step = self.model.validate_step
self._validate_signature_fn = None
elif hasattr(self.model, "test_step"):
self._validate_step = self.model.test_step
self._validate_signature_fn = self.model.test_step
else:
self._validate_step = self.model
model = self.unwrap_model()
self._validate_signature_fn = model.forward

if hasattr(self.model, "test_step"):
self._test_step = self.model.test_step
self._test_signature_fn = None
elif hasattr(self.model, "validate_step"):
self._test_step = self.model.validate_step
self._test_signature_fn = self.model.validate_step
else:
self._test_step = self.model
model = self.unwrap_model()
self._test_signature_fn = model.forward

def setup(self):
paddle.device.set_device(self.model_device)
self.model.to(self.model_device)

def train_step(self, batch) -> Dict:
# if batch is a Dict, parameter matching is done automatically; otherwise it is passed straight into `train_step` for the user to handle;
if isinstance(batch, Dict):
return auto_param_call(self._train_step, batch, signature_fn=self._train_signature_fn)
else:
return self._train_step(batch)

def backward(self, loss):
self.grad_scaler.scale(loss).backward()

def step(self):
for optimizer in self.optimizers:
self.grad_scaler.step(optimizer)
self.grad_scaler.update()

def validate_step(self, batch) -> Dict:
if isinstance(batch, Dict):
return auto_param_call(self._validate_step, batch, signature_fn=self._validate_signature_fn)
else:
return self._validate_step(batch)

def test_step(self, batch) -> Dict:
if isinstance(batch, Dict):
return auto_param_call(self._test_step, batch, signature_fn=self._test_signature_fn)
else:
return self._test_step(batch)

def replace_sampler(self, dataloader, dist_sampler: Union[str, ReproducibleBatchSampler, ReproducibleIterator], reproducible: bool = False):
# IterableDataset is not supported for now
assert dataloader.dataset_kind != _DatasetKind.ITER, \
"FastNLP does not support `IteratorDataset` now."
if isinstance(dist_sampler, ReproducibleBatchSampler):
dataloader.batch_sampler = dist_sampler
return dataloader
if isinstance(dist_sampler, ReproducibleIterator):
dataloader.batch_sampler.sampler = dist_sampler
return dataloader

if reproducible:
if isinstance(dataloader.batch_sampler.sampler, ReproducibleIterator):
return dataloader
elif isinstance(dataloader.batch_sampler, ReproducibleBatchSampler):
return dataloader
else:
# TODO
batch_sampler = ReproducibleBatchSampler(
batch_sampler=dataloader.batch_sampler,
batch_size=dataloader.batch_sampler.batch_size,
drop_last=dataloader.drop_last
)
dataloader.batch_sampler = batch_sampler
return dataloader
else:
return dataloader

def unwrap_model(self):
if isinstance(self.model, paddle.DataParallel):
return self.model._layers
else:
return self.model

@property
def data_device(self):
"""
Single-card mode has no separate data_device; this simply returns model_device;
"""
return self.model_device

def is_distributed(self):
return False
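
A compressed sketch of how a trainer would drive these hooks for one batch; the model definition, the "loss" key, and setting driver.optimizers directly are assumptions made for illustration (optimizers are normally handed over by the Trainer through the base Driver), and a single visible GPU is assumed:

import paddle
from fastNLP.core.drivers.paddle_driver import PaddleSingleDriver

class MyModel(paddle.nn.Layer):
    def __init__(self):
        super().__init__()
        self.fc = paddle.nn.Linear(4, 2)

    def train_step(self, x, y):
        logits = self.fc(x)
        return {"loss": paddle.nn.functional.cross_entropy(logits, y)}

model = MyModel()
driver = PaddleSingleDriver(model, device="gpu:0")
driver.setup()
driver.optimizers = [paddle.optimizer.Adam(parameters=model.parameters())]

batch = {"x": paddle.randn([8, 4]), "y": paddle.randint(0, 2, [8])}
out = driver.train_step(batch)     # dict batch -> auto_param_call matches the keys to train_step(x, y)
driver.backward(out["loss"])       # grad_scaler.scale(loss).backward()
driver.step()                      # grad_scaler.step(optimizer) for every optimizer
driver.zero_grad()                 # optimizer.clear_grad()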

+351 -0   fastNLP/core/drivers/paddle_driver/utils.py

@@ -0,0 +1,351 @@
import socket
import os
import struct
import random
import inspect
import numpy as np
from contextlib import ExitStack, closing
from enum import IntEnum
from typing import Dict, Optional, Union

from fastNLP.envs.imports import _NEED_IMPORT_PADDLE
from fastNLP.core.utils import get_paddle_device_id, auto_param_call
from fastNLP.envs.env import FASTNLP_GLOBAL_SEED, FASTNLP_SEED_WORKERS, USER_CUDA_VISIBLE_DEVICES
from fastNLP.core.log import logger


if _NEED_IMPORT_PADDLE:
import paddle
from paddle import nn
from paddle.nn import Layer
from paddle.io import DataLoader, BatchSampler
from paddle.amp import auto_cast, GradScaler
else:
from fastNLP.core.utils.dummy_class import DummyClass as Layer


__all__ = [
"paddle_seed_everything",
]

def _select_seed_randomly(min_seed_value: int = 0, max_seed_value: int = 255) -> int:
return random.randint(min_seed_value, max_seed_value)

def paddle_seed_everything(seed: Optional[int] = None, workers: bool = False) -> int:

max_seed_value = np.iinfo(np.uint32).max
min_seed_value = np.iinfo(np.uint32).min

if seed is None:
env_seed = os.environ.get("GLOBAL_SEED")
if env_seed is None:
seed = _select_seed_randomly(min_seed_value, max_seed_value)
# rank_zero_warn(f"No seed found, seed set to {seed}")
else:
try:
seed = int(env_seed)
except ValueError:
seed = _select_seed_randomly(min_seed_value, max_seed_value)
# rank_zero_warn(f"Invalid seed found: {repr(env_seed)}, seed set to {seed}")
elif not isinstance(seed, int):
seed = int(seed)

if not (min_seed_value <= seed <= max_seed_value):
logger.warning("Your seed value is two big or two small for numpy, we will choose a random seed for "
"you.")

# rank_zero_warn(f"{seed} is not in bounds, numpy accepts from {min_seed_value} to {max_seed_value}")
seed = _select_seed_randomly(min_seed_value, max_seed_value)

# using `log.info` instead of `rank_zero_info`,
# so users can verify the seed is properly set in distributed training.
# log.info(f"Global seed set to {seed}")
os.environ[FASTNLP_GLOBAL_SEED] = str(seed)
random.seed(seed)
np.random.seed(seed)
# paddle's seed function checks by itself whether a gpu environment is present and sets the gpu seed if so
paddle.seed(seed)
os.environ[FASTNLP_SEED_WORKERS] = f"{int(workers)}"
return seed
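# Usage sketch: call paddle_seed_everything(seed, workers=True) once at the top of the main script.
# The seed is also exported through FASTNLP_GLOBAL_SEED, so the subprocesses spawned by FleetLauncher
# can reproduce it by calling reset_seed() below.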


def reset_seed() -> None:
"""
fleet starts multiple processes, so when the user calls seed_everything in the script, the random seeds are set
again inside every spawned process;
"""
seed = os.environ.get(FASTNLP_GLOBAL_SEED, None)
workers = os.environ.get(FASTNLP_SEED_WORKERS, "0")
if seed is not None:
paddle_seed_everything(int(seed), workers=bool(int(workers)))

class ForwardState(IntEnum):
TRAIN = 0
VALIDATE = 1
TEST = 2
PREDICT = 3

_MODE_PARAMETER = "_forward_state"

class _FleetWrappingModel(Layer):
"""
Modeled on _DDPWrappingModel: paddle distributed training also needs the model wrapped in paddle.DataParallel,
and is handled here in a way similar to the pytorch driver
"""
def __init__(self, model: 'nn.Layer'):
super(_FleetWrappingModel, self).__init__()
self.model = model

if isinstance(model, paddle.DataParallel):
model = model._layers
if hasattr(model, "train_step"):
logger.warning(
"Notice your model is a `paddle.DataParallel` model. And your "
"model also implements the `train_step` method, which we can not call actually, we will"
" call `forward` function instead of `train_step` and you should note that.")
self._train_step = self.model
self._train_signature_fn = model.forward

if hasattr(model, "validate_step"):
logger.warning(
"Notice your model is a `paddle.DataParallel` model. And your "
"model also implements the `validate_step` method, which we can not call actually, "
"we will call `forward` function instead of `validate_step` and you should note that.")
self._validate_step = self.model
self._validate_signature_fn = model.forward

if hasattr(model, "test_step"):
logger.warning(
"Notice your model is a `paddle.DataParallel` model. And your "
"model also implements the `test_step` method, which we can not call actually, we will"
" call `forward` function instead of `test_step` and you should note that.")
self._test_step = self.model
self._test_signature_fn = model.forward
else:
if hasattr(model, "train_step"):
self._train_step = model.train_step
self._train_signature_fn = None
else:
self._train_step = model
self._train_signature_fn = model.forward

if hasattr(model, "validate_step"):
self._validate_step = model.validate_step
self._validate_signature_fn = None
elif hasattr(model, "test_step"):
self._validate_step = model.test_step
self._validate_signature_fn = None
else:
self._validate_step = model
self._validate_signature_fn = model.forward

if hasattr(model, "test_step"):
self._test_step = model.test_step
self._test_signature_fn = None
elif hasattr(model, "validate_step"):
self._test_step = model.validate_step
self._test_signature_fn = None
else:
self._test_step = model
self._test_signature_fn = model.forward

def forward(self, batch, **kwargs) -> Dict:

_forward_state = kwargs.pop(_MODE_PARAMETER)

if _forward_state == ForwardState.TRAIN:
if isinstance(batch, Dict):
return auto_param_call(self._train_step, batch, signature_fn=self._train_signature_fn)
else:
return self._train_step(batch)
elif _forward_state == ForwardState.VALIDATE:
if isinstance(batch, Dict):
return auto_param_call(self._validate_step, batch, signature_fn=self._validate_signature_fn)
else:
return self._validate_step(batch)
elif _forward_state == ForwardState.TEST:
if isinstance(batch, Dict):
return auto_param_call(self._test_step, batch, signature_fn=self._test_signature_fn)
else:
return self._test_step(batch)
elif _forward_state == ForwardState.PREDICT:
raise NotImplementedError("'PREDICT' mode has not been implemented.")
else:
raise NotImplementedError("You should direct a concrete mode.")

class DummyGradScaler:
"""
A stand-in GradScaler object used to avoid writing a large number of repeated if checks

"""
def __init__(self, *args, **kwargs):
pass

def get_scale(self):
return 1.0

def is_enabled(self):
return False

def scale(self, outputs):
return outputs

def step(self, optimizer, *args, **kwargs):
optimizer.step(*args, **kwargs)

def update(self, new_scale=None):
pass

def unscale_(self, optimizer):
pass

def load_state_dict(self, state_dict):
pass

def state_dict(self):
return {}


def _build_fp16_env(dummy=False):
if dummy:
auto_cast = ExitStack
GradScaler = DummyGradScaler
else:
if not paddle.device.is_compiled_with_cuda():
raise RuntimeError("No cuda")
if paddle.device.cuda.get_device_capability(0)[0] < 7:
logger.warning(
"NOTE: your device does NOT support faster training with fp16, "
"please switch to FP32 which is likely to be faster"
)
return auto_cast, GradScaler
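# Sketch of how the pair returned here is consumed by PaddleDriver (see paddle_driver.py); the
# `with auto_cast():` around the forward pass is an assumption about how the trainer uses it:
#     auto_cast, scaler_cls = _build_fp16_env(dummy=not fp16)
#     grad_scaler = scaler_cls()
#     with auto_cast():                               # ExitStack() is a no-op context in the dummy case
#         outputs = model(batch)
#     grad_scaler.scale(outputs["loss"]).backward()   # DummyGradScaler.scale just returns its input
#     grad_scaler.step(optimizer)
#     grad_scaler.update()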

def find_free_ports(num):
def __free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
s.bind(('', 0))
return s.getsockname()[1]

port_set = set()
step = 0
while True:
port = __free_port()
if port not in port_set:
port_set.add(port)

if len(port_set) >= num:
return port_set

step += 1
if step > 400:
logger.error(
"can't find avilable port and use the specified static port now!"
)
return None

return None

def get_host_name_ip():
try:
host_name = socket.gethostname()
host_ip = socket.gethostbyname(host_name)
return host_name, host_ip
except:
return None

def get_device_from_visible(device: Union[str, int]):
"""
Gets the corresponding device when CUDA_VISIBLE_DEVICES is set.
For example, with CUDA_VISIBLE_DEVICES=2,3 and device=3, 1 is returned.
:param device: the untranslated device name
:return: the translated device id
"""
if device == "cpu":
return device
cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
idx = get_paddle_device_id(device)
if cuda_visible_devices is None or cuda_visible_devices == "":
# this branch should normally never be taken, because fastNLP forcibly injects CUDA_VISIBLE_DEVICES for paddle
return idx
else:
# use USER_CUDA_VISIBLE_DEVICES to get the device the user expects
user_visible_devices = os.getenv(USER_CUDA_VISIBLE_DEVICES)
if user_visible_devices is not None and user_visible_devices != "":
# not empty, meaning the user set CUDA_VISIBLE_DEVICES themselves
idx = user_visible_devices.split(",")[idx]
else:
idx = str(idx)

cuda_visible_devices_list = cuda_visible_devices.split(',')
assert idx in cuda_visible_devices_list, "Can't find "\
"your devices %s in CUDA_VISIBLE_DEVICES[%s]."\
% (idx, cuda_visible_devices)
res = cuda_visible_devices_list.index(idx)
return res

def replace_sampler(dataloader: "DataLoader", sampler: "BatchSampler"):
# grab the instance attributes;
instance_attrs = {k: v for k, v in vars(dataloader).items() if not k.startswith('_')}

# grab the default signature of the dataloader's '__init__' function;
init_params = dict(inspect.signature(dataloader.__init__).parameters)

# The reason for handling this separately: a user customizing their own dataloader may only declare a few parameters
# explicitly for convenience and forward everything else through **kwargs; if they then pass extra new parameters when
# instantiating their dataloader (this step is necessary because it is the only way we can inject the sampler; and the
# user may indeed add new parameters via **kwargs) and write "super().__init__(**kwargs)", we can only look those
# parameters up in DataLoader itself;
has_variadic_kwargs = any(v.kind is v.VAR_KEYWORD for k, v in init_params.items())
if has_variadic_kwargs:
init_params.update(dict(inspect.signature(DataLoader.__init__).parameters))
del init_params["self"]

# since the DataLoader defaults may just have overridden the parameters of the user's customized dataloader, redo this step;
non_default_params = {name for name, p in init_params.items() if
name in instance_attrs and p.default != instance_attrs[name]}
# add `dataset` as it might have been replaced with `*args`
non_default_params.add("dataset")

reconstruct_args = {k: v for k, v in instance_attrs.items() if k in non_default_params}
reconstruct_args.update({"batch_sampler": sampler, "shuffle": False, "drop_last": False, "batch_size": 1})

required_args = {
p.name
for p in init_params.values()
if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
and p.default is p.empty
and p.name not in reconstruct_args
}

# this error targets __init__ arguments that were not stored on self under the same name;
if required_args:
required_args = sorted(required_args)
dataloader_self_name = dataloader.__class__.__name__
raise Exception(
f"Trying to inject `DistributedBatchSampler` into the `{dataloader_self_name}` instance. "
"This would fail as some of the `__init__` arguments are not available as instance attributes. "
f"The missing attributes are {required_args}. "
f"HINT: If you wrote the `{dataloader_self_name}` class, define `self.missing_arg_name` or "
"manually add the `DistributedBatchSampler` as: "
f"`{dataloader_self_name}(dataset, sampler=DistributedBatchSampler(dataset))`."
)

# this error targets a dataloader that is not a plain DataLoader but a customized one whose __init__ has no **kwargs;
if not has_variadic_kwargs:

# the dataloader signature does not allow keyword arguments that need to be passed
missing_kwargs = reconstruct_args.keys() - init_params.keys()
if missing_kwargs:
missing_kwargs = sorted(missing_kwargs)
dataloader_self_name = dataloader.__class__.__name__
raise Exception(
f"Trying to inject `DistributedBatchSampler` into the `{dataloader_self_name}` instance. "
"This would fail as it doesn't expose all its attributes in the `__init__` signature. "
f"The missing arguments are {missing_kwargs}. "
f"HINT: If you wrote the `{dataloader_self_name}` class, add the `__init__` arguments or "
"manually add the `DistributedBatchSampler` as: "
f"`{dataloader_self_name}(dataset, sampler=DistributedBatchSampler(dataset))`."
)

return type(dataloader)(**reconstruct_args)
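
To illustrate get_device_from_visible with the docstring's example (a sketch; assumes fastNLP has set CUDA_VISIBLE_DEVICES for the process and USER_CUDA_VISIBLE_DEVICES is unset):

os.environ["CUDA_VISIBLE_DEVICES"] = "2,3"
get_device_from_visible("gpu:3")    # -> 1, physical card 3 is the second visible device
get_device_from_visible("cpu")      # -> "cpu", returned unchanged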
