@@ -11,11 +11,12 @@ from fastNLP.envs.imports import _NEED_IMPORT_PADDLE | |||||
if _NEED_IMPORT_PADDLE: | if _NEED_IMPORT_PADDLE: | ||||
import paddle | import paddle | ||||
import paddle.distributed as dist | |||||
from paddle.fluid.dygraph import parallel_helper | from paddle.fluid.dygraph import parallel_helper | ||||
def _simple_gather_all_tensors(result, group: Any, world_size: int) -> List: | def _simple_gather_all_tensors(result, group: Any, world_size: int) -> List: | ||||
gathered_result = [paddle.zeros_like(result) for _ in range(world_size)] | gathered_result = [paddle.zeros_like(result) for _ in range(world_size)] | ||||
paddle.distributed.all_gather(gathered_result, result, group) | |||||
dist.all_gather(gathered_result, result, group) | |||||
return gathered_result | return gathered_result | ||||
class PaddleBackend(Backend): | class PaddleBackend(Backend): | ||||
@@ -36,13 +37,13 @@ class PaddleBackend(Backend): | |||||
tensor = paddle.stack(tensor) | tensor = paddle.stack(tensor) | ||||
# 第一步, aggregate结果 | # 第一步, aggregate结果 | ||||
if method == 'sum': | if method == 'sum': | ||||
tensor = paddle.sum(tensor, dim=0) | |||||
tensor = paddle.sum(tensor, axis=0) | |||||
elif method == 'mean': | elif method == 'mean': | ||||
tensor = paddle.mean(tensor, dim=0) | |||||
tensor = paddle.mean(tensor, axis=0) | |||||
elif method == 'max': | elif method == 'max': | ||||
tensor, _ = paddle.max(tensor, dim=0) | |||||
tensor, _ = paddle.max(tensor, axis=0) | |||||
elif method == 'min': | elif method == 'min': | ||||
tensor, _ = paddle.min(tensor, dim=0) | |||||
tensor, _ = paddle.min(tensor, axis=0) | |||||
else: | else: | ||||
raise AggregateMethodError(should_have_aggregate_method=False) | raise AggregateMethodError(should_have_aggregate_method=False) | ||||
@@ -80,11 +81,12 @@ class PaddleBackend(Backend): | |||||
聚合 group 中所有的 result;由于不同 group 中 result 大小不同,因此在适当的时候需要进行 padding | 聚合 group 中所有的 result;由于不同 group 中 result 大小不同,因此在适当的时候需要进行 padding | ||||
""" | """ | ||||
# TODO check 正确性 | # TODO check 正确性 | ||||
if group is None: | |||||
group = paddle.distributed.get_group(0) | |||||
# 有 paddle 那边的 bug,2.3 版本的时候修复了,到时候改一下 | |||||
# if group is None: | |||||
# group = dist.get_group(0) | |||||
world_size = group.nranks | |||||
paddle.distributed.barrier(group=group) | |||||
world_size = group.nranks if group is not None else dist.get_world_size() | |||||
dist.barrier(group=group) | |||||
# 张量为 标量的情况,简单地gather就好 | # 张量为 标量的情况,简单地gather就好 | ||||
if result.ndim == 0: | if result.ndim == 0: | ||||
@@ -93,10 +95,10 @@ class PaddleBackend(Backend): | |||||
# 获得 result 的 shape | # 获得 result 的 shape | ||||
local_size = paddle.to_tensor(result.shape) | local_size = paddle.to_tensor(result.shape) | ||||
# 将 group 中所有 result 的大小聚合在一起 | # 将 group 中所有 result 的大小聚合在一起 | ||||
local_sizes = [paddle.zeros_like(local_size) for _ in range(world_size)] | |||||
paddle.distributed.all_gather(local_sizes, local_size, group=group) | |||||
local_sizes = [] | |||||
dist.all_gather(local_sizes, local_size, group=group) | |||||
# 堆叠后,计算出 shape 每一维度的最大值 | # 堆叠后,计算出 shape 每一维度的最大值 | ||||
max_size = paddle.stack(local_sizes).max(axis=0).values | |||||
max_size = paddle.stack(local_sizes).max(axis=0) | |||||
all_sizes_equal = all(all(ls == max_size) for ls in local_sizes) | all_sizes_equal = all(all(ls == max_size) for ls in local_sizes) | ||||
# 如果所有的结果大小相同,那么可以直接聚合 | # 如果所有的结果大小相同,那么可以直接聚合 | ||||
@@ -111,10 +113,10 @@ class PaddleBackend(Backend): | |||||
pad_dims.append(val.item()) | pad_dims.append(val.item()) | ||||
result_padded = paddle.nn.functional.pad(result, pad_dims) | result_padded = paddle.nn.functional.pad(result, pad_dims) | ||||
# 重新进行聚合 | # 重新进行聚合 | ||||
gathered_result = [paddle.zeros_like(result_padded) for _ in range(world_size)] | |||||
paddle.distributed.all_gather(gathered_result, result_padded, group) | |||||
gathered_result = [] | |||||
dist.all_gather(gathered_result, result_padded, group) | |||||
for idx, item_size in enumerate(local_sizes): | for idx, item_size in enumerate(local_sizes): | ||||
slice_param = [slice(dim_size) for dim_size in item_size] | |||||
slice_param = [slice(dim_size) for dim_size in item_size.tolist()] | |||||
gathered_result[idx] = gathered_result[idx][slice_param] | gathered_result[idx] = gathered_result[idx][slice_param] | ||||
return gathered_result | return gathered_result | ||||
@@ -0,0 +1,151 @@ | |||||
import pytest | |||||
import os | |||||
from typing import Any | |||||
from dataclasses import dataclass | |||||
from paddle.optimizer import Adam | |||||
from paddle.io import DataLoader | |||||
from fastNLP.core.controllers.trainer import Trainer | |||||
from fastNLP.core.metrics.accuracy import Accuracy | |||||
from fastNLP.core.callbacks.progress_callback import RichCallback | |||||
from fastNLP.envs import FASTNLP_DISTRIBUTED_CHECK | |||||
from tests.helpers.models.paddle_model import PaddleNormalModel_Classification | |||||
from tests.helpers.datasets.paddle_data import PaddleDataset_MNIST | |||||
from tests.helpers.callbacks.helper_callbacks import RecordLossCallback, RecordMetricCallback | |||||
from tests.helpers.utils import magic_argv_env_context | |||||
@dataclass
class MNISTTrainPaddleConfig:
    """Hyper-parameters for single-device MNIST training with the paddle driver."""
    num_labels: int = 10           # number of target classes
    feature_dimension: int = 784   # flattened 28x28 MNIST image
    batch_size: int = 32
    shuffle: bool = True
    # NOTE(review): `validate_every` and `device` carry no annotation, so the
    # dataclass machinery treats them as plain class attributes (excluded from
    # __init__/__repr__/fields) — confirm this is intentional.
    validate_every = -5            # negative: validate every |value| batches (project convention — verify)
    driver: str = "paddle"
    device = "gpu"
@dataclass
class MNISTTrainFleetConfig:
    """Hyper-parameters for distributed (fleet) MNIST training."""
    num_labels: int = 10           # number of target classes
    feature_dimension: int = 784   # flattened 28x28 MNIST image
    batch_size: int = 32
    shuffle: bool = True
    # NOTE(review): no annotation, so this is a plain class attribute rather
    # than a dataclass field — confirm intentional.
    validate_every = -5
@dataclass
class TrainerParameters:
    """Bag of arguments assembled by a test and forwarded to ``Trainer``.

    NOTE(review): tests also assign a ``validate_every`` attribute on
    instances dynamically; consider declaring it here as a field.
    """
    model: Any = None
    optimizers: Any = None
    train_dataloader: Any = None
    validate_dataloaders: Any = None
    input_mapping: Any = None
    output_mapping: Any = None
    metrics: Any = None
# @pytest.fixture(params=[0], autouse=True) | |||||
# def model_and_optimizers(request): | |||||
# """ | |||||
# 初始化单卡模式的模型和优化器 | |||||
# """ | |||||
# trainer_params = TrainerParameters() | |||||
# print(paddle.device.get_device()) | |||||
# if request.param == 0: | |||||
# trainer_params.model = PaddleNormalModel_Classification( | |||||
# num_labels=MNISTTrainPaddleConfig.num_labels, | |||||
# feature_dimension=MNISTTrainPaddleConfig.feature_dimension | |||||
# ) | |||||
# trainer_params.optimizers = Adam(parameters=trainer_params.model.parameters(), learning_rate=0.0001) | |||||
# train_dataloader = DataLoader( | |||||
# dataset=PaddleDataset_MNIST("train"), | |||||
# batch_size=MNISTTrainPaddleConfig.batch_size, | |||||
# shuffle=True | |||||
# ) | |||||
# val_dataloader = DataLoader( | |||||
# dataset=PaddleDataset_MNIST(mode="test"), | |||||
# batch_size=MNISTTrainPaddleConfig.batch_size, | |||||
# shuffle=True | |||||
# ) | |||||
# trainer_params.train_dataloader = train_dataloader | |||||
# trainer_params.validate_dataloaders = val_dataloader | |||||
# trainer_params.validate_every = MNISTTrainPaddleConfig.validate_every | |||||
# trainer_params.metrics = {"acc": Accuracy()} | |||||
# return trainer_params | |||||
@pytest.mark.parametrize("driver,device", [("paddle", "cpu"), ("paddle", 1)])
# @pytest.mark.parametrize("driver,device", [("fleet", [0, 1])])
@pytest.mark.parametrize("callbacks", [[RecordMetricCallback(monitor="acc#acc", metric_threshold=0.7, larger_better=True),
                                        RichCallback(5), RecordLossCallback(loss_threshold=0.3)]])
@magic_argv_env_context
def test_trainer_paddle(
    # model_and_optimizers: TrainerParameters,
    driver,
    device,
    callbacks,
    n_epochs=15,
):
    """End-to-end check of ``Trainer`` with the paddle driver on MNIST.

    When ``device`` is a multi-device list and ``FASTNLP_DISTRIBUTED_CHECK``
    is absent from the environment, constructing the ``Trainer`` is expected
    to re-launch subprocesses and exit this process via ``SystemExit(0)``.
    Otherwise the trainer is built and run to completion; the callbacks
    assert the loss/metric thresholds.
    """
    trainer_params = TrainerParameters()
    trainer_params.model = PaddleNormalModel_Classification(
        num_labels=MNISTTrainPaddleConfig.num_labels,
        feature_dimension=MNISTTrainPaddleConfig.feature_dimension
    )
    trainer_params.optimizers = Adam(parameters=trainer_params.model.parameters(), learning_rate=0.0001)
    trainer_params.train_dataloader = DataLoader(
        dataset=PaddleDataset_MNIST("train"),
        batch_size=MNISTTrainPaddleConfig.batch_size,
        shuffle=True
    )
    trainer_params.validate_dataloaders = DataLoader(
        dataset=PaddleDataset_MNIST(mode="test"),
        batch_size=MNISTTrainPaddleConfig.batch_size,
        shuffle=True
    )
    trainer_params.validate_every = MNISTTrainPaddleConfig.validate_every
    trainer_params.metrics = {"acc": Accuracy(backend="paddle")}

    # Both branches below construct the Trainer with identical arguments;
    # build it in one place so the argument lists cannot drift apart.
    def _build_trainer():
        return Trainer(
            model=trainer_params.model,
            driver=driver,
            device=device,
            optimizers=trainer_params.optimizers,
            train_dataloader=trainer_params.train_dataloader,
            validate_dataloaders=trainer_params.validate_dataloaders,
            validate_every=trainer_params.validate_every,
            input_mapping=trainer_params.input_mapping,
            output_mapping=trainer_params.output_mapping,
            metrics=trainer_params.metrics,
            n_epochs=n_epochs,
            callbacks=callbacks,
        )

    if not isinstance(device, (int, str)) and len(device) > 1 and FASTNLP_DISTRIBUTED_CHECK not in os.environ:
        # Parent process of a distributed launch: Trainer should spawn the
        # worker processes and exit cleanly with code 0.
        with pytest.raises(SystemExit) as exc:
            _build_trainer()
        assert exc.value.code == 0
        return
    trainer = _build_trainer()
    trainer.run()
@@ -1,17 +1,11 @@ | |||||
import unittest | import unittest | ||||
import torch | import torch | ||||
from fastNLP.envs.set_env import set_env | |||||
from fastNLP.envs.set_env_on_import import set_env_on_import_paddle | |||||
set_env_on_import_paddle() | |||||
set_env("paddle") | |||||
from fastNLP.core.drivers.paddle_driver.paddle_driver import PaddleDriver | |||||
import paddle | import paddle | ||||
from paddle.io import Dataset, DataLoader | from paddle.io import Dataset, DataLoader | ||||
from fastNLP.core.drivers.paddle_driver.paddle_driver import PaddleDriver | |||||
class Net(paddle.nn.Layer): | class Net(paddle.nn.Layer): | ||||
def __init__(self): | def __init__(self): | ||||
super(Net, self).__init__() | super(Net, self).__init__() | ||||