# Copyright 2020 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from mindspore.train import Model, ParallelMode from mindspore.nn.loss import SoftmaxCrossEntropyWithLogits from mindspore.nn.optim.momentum import Momentum from mindspore import Tensor, context import mindspore as ms import numpy as np import mindspore.nn as nn from tests.dataset_mock import MindData from mindspore import context from mindspore.common.api import _executor from mindspore.parallel import _cost_model_context as cost_model_context class Dataset(MindData): def __init__(self, predict, label, length=3): super(Dataset, self).__init__(size=length) self.predict = predict self.label = label self.index = 0 self.length = length def __iter__(self): return self def __next__(self): if self.index >= self.length: raise StopIteration self.index += 1 return self.predict, self.label def reset(self): self.index = 0 class DenseNet1(nn.Cell): def __init__(self, has_bias=True, activation='relu'): super(DenseNet1, self).__init__() self.fc1 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc2 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc3 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc4 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) def construct(self, x): q = self.fc1(x) k = self.fc2(q) v = self.fc3(k) s = self.fc4(v) return s class DenseNet2(nn.Cell): def __init__(self, has_bias=True, activation='relu'): super(DenseNet2, self).__init__() self.fc1 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc2 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc3 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc4 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc5 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc6 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc7 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) self.fc8 = nn.Dense(128, 128, has_bias=has_bias, activation=activation) def construct(self, x): q = self.fc1(x) k = self.fc2(q) v = self.fc3(k) s = self.fc4(v) t = self.fc5(s) u = self.fc6(t) w = self.fc7(u) z = self.fc8(w) return z class SimpleDMLNet(nn.Cell): def __init__(self, net1, net2): super(SimpleDMLNet, self).__init__() self.backbone1 = net1 self.backbone2 = net2 def construct(self, x): x1 = self.backbone1(x) x2 = self.backbone2(x) return x1 + x2 def train_common(net): batch_size = 32 learning_rate = 0.1 momentum = 0.9 epoch_size = 2 device_num=4 context.reset_auto_parallel_context() context.set_auto_parallel_context(parallel_mode=ParallelMode.SEMI_AUTO_PARALLEL, device_num=device_num, parameter_broadcast=False) context.set_context(mode=context.GRAPH_MODE) predict = Tensor(np.ones([batch_size, 128]), dtype=ms.float32) label = Tensor(np.ones([batch_size]), dtype=ms.int32) dataset = Dataset(predict, label, 2) loss = SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=True) opt = Momentum(net.trainable_params(), learning_rate, momentum) model = Model(net, loss, opt) model.train(epoch_size, dataset, dataset_sink_mode=False) allreduce_fusion_dict = _executor._get_allreduce_fusion(model._train_network) print(allreduce_fusion_dict) return allreduce_fusion_dict def test_allreduce_fusion_parameters(): cost_model_context.reset_cost_model_context() cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_algorithm=2) algorithm = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_algorithm') assert (algorithm == 2) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_algorithm=1) algorithm = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_algorithm') assert (algorithm == 1) cost_model_context.reset_cost_model_context() algorithm = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_algorithm') assert (algorithm == 0) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_times=2) fusion_times = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_times') assert (fusion_times == 2) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_tail_percent=0.2) tail_percent = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_tail_percent') assert (tail_percent == 0.2) cost_model_context.reset_cost_model_context() tail_percent = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_tail_percent') assert (tail_percent == 0.1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_tail_time=0.2) tail_time = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_tail_time') assert (tail_time == 0.2) cost_model_context.reset_cost_model_context() tail_time = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_tail_time') assert (tail_time == 0.1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_allreduce_inherent_time=0.2) allreduce_inherent_time = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_allreduce_inherent_time') assert (allreduce_inherent_time == 0.2) cost_model_context.reset_cost_model_context() allreduce_inherent_time = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_allreduce_inherent_time') assert (allreduce_inherent_time == 0.1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_allreduce_bandwidth=0.2) allreduce_bandwidth = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_allreduce_bandwidth') assert (allreduce_bandwidth == 0.2) cost_model_context.reset_cost_model_context() allreduce_bandwidth = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_allreduce_bandwidth') assert (allreduce_bandwidth == 0.1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_computation_time_parameter=0.2) computation_time_parameter = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_computation_time_parameter') assert (computation_time_parameter == 0.2) cost_model_context.reset_cost_model_context() computation_time_parameter = cost_model_context.get_cost_model_context('costmodel_allreduce_fusion_computation_time_parameter') assert (computation_time_parameter == 0.1) def test_allreduce_fusion1(): cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_algorithm=1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_times=2) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_tail_percent=0.5) net = SimpleDMLNet(DenseNet1(has_bias=False, activation=None), DenseNet2(has_bias=False, activation=None)) allreduce_fusion_dict = train_common(net) expect_dict = {'backbone2.fc8.weight': 2, 'backbone2.fc7.weight': 2, 'backbone2.fc6.weight': 2, 'backbone1.fc4.weight': 2, 'backbone1.fc3.weight': 2, 'backbone1.fc2.weight': 2, 'backbone2.fc5.weight': 1, 'backbone2.fc4.weight': 1, 'backbone2.fc3.weight': 1, 'backbone2.fc2.weight': 1, 'backbone2.fc1.weight': 1, 'backbone1.fc1.weight': 1} assert (allreduce_fusion_dict == expect_dict) cost_model_context.reset_cost_model_context() # reset_cost_model_context is called, the default value of costmodel_allreduce_fusion_times is 0, step_allreduce_fusion # is bypassed. def test_allreduce_fusion2(): cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_times=2) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_tail_percent=0.5) cost_model_context.reset_cost_model_context() net = SimpleDMLNet(DenseNet1(has_bias=False, activation=None), DenseNet2(has_bias=False, activation=None)) allreduce_fusion_dict = train_common(net) expect_dict = {} assert (allreduce_fusion_dict == expect_dict) cost_model_context.reset_cost_model_context() def test_allreduce_fusion3(): cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_algorithm=1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_times=3) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_tail_percent=0.3333333) net = SimpleDMLNet(DenseNet1(has_bias=True, activation='relu'), DenseNet2(has_bias=False, activation='relu')) allreduce_fusion_dict = train_common(net) expect_dict = {'backbone2.fc8.weight': 3, 'backbone2.fc7.weight': 3, 'backbone2.fc6.weight': 2, 'backbone2.fc5.weight': 2, 'backbone2.fc4.weight': 2, 'backbone2.fc3.weight': 1, 'backbone2.fc2.weight': 1, 'backbone2.fc1.weight': 1, 'backbone1.fc4.bias': 3, 'backbone1.fc4.weight': 3, 'backbone1.fc3.bias': 3, 'backbone1.fc3.weight': 2, 'backbone1.fc2.bias': 2, 'backbone1.fc2.weight': 2, 'backbone1.fc1.bias': 2, 'backbone1.fc1.weight': 2} assert (allreduce_fusion_dict == expect_dict) cost_model_context.reset_cost_model_context() def test_allreduce_fusion4(): cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_algorithm=1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_times=2) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_tail_percent=0.5) net = SimpleDMLNet(DenseNet2(has_bias=False, activation=None), DenseNet2(has_bias=False, activation=None)) allreduce_fusion_dict = train_common(net) expect_dict = {'backbone2.fc8.weight': 2, 'backbone2.fc7.weight': 2, 'backbone2.fc6.weight': 2, 'backbone1.fc8.weight': 2, 'backbone1.fc7.weight': 2, 'backbone1.fc6.weight': 2, 'backbone2.fc5.weight': 1, 'backbone2.fc4.weight': 1, 'backbone2.fc3.weight': 1, 'backbone2.fc2.weight': 1, 'backbone2.fc1.weight': 1, 'backbone1.fc5.weight': 1, 'backbone1.fc4.weight': 1, 'backbone1.fc3.weight': 1, 'backbone1.fc2.weight': 1, 'backbone1.fc1.weight': 1} assert (allreduce_fusion_dict == expect_dict) cost_model_context.reset_cost_model_context() def test_allreduce_fusion5(): cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_algorithm=2) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_tail_time=0.1) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_allreduce_inherent_time=0.05) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_allreduce_bandwidth=0.000001) cost_model_context.set_cost_model_context(costmodel_allreduce_fusion_computation_time_parameter=0.0000015) net = SimpleDMLNet(DenseNet2(has_bias=False, activation=None), DenseNet2(has_bias=False, activation=None)) allreduce_fusion_dict = train_common(net) expect_dict = {'backbone2.fc8.weight': 3, 'backbone2.fc7.weight': 3, 'backbone2.fc6.weight': 3, 'backbone2.fc5.weight': 2, 'backbone2.fc4.weight': 2, 'backbone2.fc3.weight': 2, 'backbone2.fc2.weight': 1, 'backbone2.fc1.weight': 1, 'backbone1.fc8.weight': 3, 'backbone1.fc7.weight': 3, 'backbone1.fc6.weight': 3, 'backbone1.fc5.weight': 2, 'backbone1.fc4.weight': 2, 'backbone1.fc3.weight': 2, 'backbone1.fc2.weight': 1, 'backbone1.fc1.weight': 1,} assert (allreduce_fusion_dict == expect_dict) cost_model_context.reset_cost_model_context()