From 20ca0842eab90f78515ec4e049fd87fb9ea9db5f Mon Sep 17 00:00:00 2001 From: wjtest001 Date: Fri, 2 Sep 2022 11:08:15 +0800 Subject: [PATCH] add scripts --- npu_multiNode/README.md | 2 + npu_multiNode/config.py | 33 +++++ npu_multiNode/dataset.py | 60 ++++++++ npu_multiNode/dataset_distributed.py | 54 +++++++ npu_multiNode/lenet.py | 60 ++++++++ npu_multiNode/train.py | 211 +++++++++++++++++++++++++++ npu_multiNode/说明.md | 5 + 7 files changed, 425 insertions(+) create mode 100755 npu_multiNode/README.md create mode 100755 npu_multiNode/config.py create mode 100755 npu_multiNode/dataset.py create mode 100755 npu_multiNode/dataset_distributed.py create mode 100755 npu_multiNode/lenet.py create mode 100755 npu_multiNode/train.py create mode 100755 npu_multiNode/说明.md diff --git a/npu_multiNode/README.md b/npu_multiNode/README.md new file mode 100755 index 0000000..298794f --- /dev/null +++ b/npu_multiNode/README.md @@ -0,0 +1,2 @@ +# MNIST_PytorchExample_npu_multiNode + diff --git a/npu_multiNode/config.py b/npu_multiNode/config.py new file mode 100755 index 0000000..22d68e2 --- /dev/null +++ b/npu_multiNode/config.py @@ -0,0 +1,33 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +""" +network config setting, will be used in train.py +""" + +from easydict import EasyDict as edict + +mnist_cfg = edict({ + 'num_classes': 10, + 'lr': 0.01, + 'momentum': 0.9, + 'epoch_size': 10, + 'batch_size': 32, + 'buffer_size': 1000, + 'image_height': 32, + 'image_width': 32, + 'save_checkpoint_steps': 1875, + 'keep_checkpoint_max': 150, + 'air_name': "lenet", +}) diff --git a/npu_multiNode/dataset.py b/npu_multiNode/dataset.py new file mode 100755 index 0000000..df9eecd --- /dev/null +++ b/npu_multiNode/dataset.py @@ -0,0 +1,60 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +""" +Produce the dataset +""" + +import mindspore.dataset as ds +import mindspore.dataset.vision.c_transforms as CV +import mindspore.dataset.transforms.c_transforms as C +from mindspore.dataset.vision import Inter +from mindspore.common import dtype as mstype + + +def create_dataset(data_path, batch_size=32, repeat_size=1, + num_parallel_workers=1): + """ + create dataset for train or test + """ + # define dataset + mnist_ds = ds.MnistDataset(data_path) + + resize_height, resize_width = 32, 32 + rescale = 1.0 / 255.0 + shift = 0.0 + rescale_nml = 1 / 0.3081 + shift_nml = -1 * 0.1307 / 0.3081 + + # define map operations + resize_op = CV.Resize((resize_height, resize_width), interpolation=Inter.LINEAR) # Bilinear mode + rescale_nml_op = CV.Rescale(rescale_nml, shift_nml) + rescale_op = CV.Rescale(rescale, shift) + hwc2chw_op = CV.HWC2CHW() + type_cast_op = C.TypeCast(mstype.int32) + + # apply map operations on images + mnist_ds = mnist_ds.map(operations=type_cast_op, input_columns="label", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=resize_op, input_columns="image", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=rescale_op, input_columns="image", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=rescale_nml_op, input_columns="image", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=hwc2chw_op, input_columns="image", num_parallel_workers=num_parallel_workers) + + # apply DatasetOps + buffer_size = 10000 + mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size) # 10000 as in LeNet train script + mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True) + mnist_ds = mnist_ds.repeat(repeat_size) + + return mnist_ds diff --git a/npu_multiNode/dataset_distributed.py b/npu_multiNode/dataset_distributed.py new file mode 100755 index 0000000..66cca60 --- /dev/null +++ b/npu_multiNode/dataset_distributed.py @@ -0,0 +1,54 @@ + +""" +Produce the dataset: +与单机不同的是,在数据集接口需要传入num_shards和shard_id参数,分别对应卡的数量和逻辑序号,建议通过HCCL接口获取: +get_rank:获取当前设备在集群中的ID。 +get_group_size:获取集群数量。 + +""" + +import mindspore.dataset as ds +import mindspore.dataset.vision.c_transforms as CV +import mindspore.dataset.transforms.c_transforms as C +from mindspore.dataset.vision import Inter +from mindspore.common import dtype as mstype +from mindspore.communication.management import get_rank, get_group_size + +def create_dataset_parallel(data_path, batch_size=32, repeat_size=1, + num_parallel_workers=1, shard_id=0, num_shards=8): + """ + create dataset for train or test + """ + + resize_height, resize_width = 32, 32 + rescale = 1.0 / 255.0 + shift = 0.0 + rescale_nml = 1 / 0.3081 + shift_nml = -1 * 0.1307 / 0.3081 + # get shard_id and num_shards.Get the ID of the current device in the cluster And Get the number of clusters. + shard_id = get_rank() + num_shards = get_group_size() + # define dataset + mnist_ds = ds.MnistDataset(data_path, num_shards=num_shards, shard_id=shard_id) + + # define map operations + resize_op = CV.Resize((resize_height, resize_width), interpolation=Inter.LINEAR) # Bilinear mode + rescale_nml_op = CV.Rescale(rescale_nml, shift_nml) + rescale_op = CV.Rescale(rescale, shift) + hwc2chw_op = CV.HWC2CHW() + type_cast_op = C.TypeCast(mstype.int32) + + # apply map operations on images + mnist_ds = mnist_ds.map(operations=type_cast_op, input_columns="label", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=resize_op, input_columns="image", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=rescale_op, input_columns="image", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=rescale_nml_op, input_columns="image", num_parallel_workers=num_parallel_workers) + mnist_ds = mnist_ds.map(operations=hwc2chw_op, input_columns="image", num_parallel_workers=num_parallel_workers) + + # apply DatasetOps + buffer_size = 10000 + mnist_ds = mnist_ds.shuffle(buffer_size=buffer_size) # 10000 as in LeNet train script + mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True) + mnist_ds = mnist_ds.repeat(repeat_size) + + return mnist_ds diff --git a/npu_multiNode/lenet.py b/npu_multiNode/lenet.py new file mode 100755 index 0000000..0600793 --- /dev/null +++ b/npu_multiNode/lenet.py @@ -0,0 +1,60 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================ +"""LeNet.""" +import mindspore.nn as nn +from mindspore.common.initializer import Normal + + +class LeNet5(nn.Cell): + """ + Lenet network + + Args: + num_class (int): Number of classes. Default: 10. + num_channel (int): Number of channels. Default: 1. + + Returns: + Tensor, output tensor + Examples: + >>> LeNet(num_class=10) + + """ + def __init__(self, num_class=10, num_channel=1, include_top=True): + super(LeNet5, self).__init__() + self.conv1 = nn.Conv2d(num_channel, 6, 5, pad_mode='valid') + self.conv2 = nn.Conv2d(6, 16, 5, pad_mode='valid') + self.relu = nn.ReLU() + self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2) + self.include_top = include_top + if self.include_top: + self.flatten = nn.Flatten() + self.fc1 = nn.Dense(16 * 5 * 5, 120, weight_init=Normal(0.02)) + self.fc2 = nn.Dense(120, 84, weight_init=Normal(0.02)) + self.fc3 = nn.Dense(84, num_class, weight_init=Normal(0.02)) + + def construct(self, x): + x = self.conv1(x) + x = self.relu(x) + x = self.max_pool2d(x) + x = self.conv2(x) + x = self.relu(x) + x = self.max_pool2d(x) + if not self.include_top: + return x + x = self.flatten(x) + x = self.relu(self.fc1(x)) + x = self.relu(self.fc2(x)) + x = self.fc3(x) + return x diff --git a/npu_multiNode/train.py b/npu_multiNode/train.py new file mode 100755 index 0000000..7471a3f --- /dev/null +++ b/npu_multiNode/train.py @@ -0,0 +1,211 @@ +""" +######################## single-dataset train lenet example ######################## +This example is a single-dataset training tutorial. If it is a multi-dataset, please refer to the multi-dataset training +tutorial train_for_multidataset.py. This example cannot be used for multi-datasets! + +######################## Instructions for using the training environment ######################## +The image of the debugging environment and the image of the training environment are two different images, +and the working local directories are different. In the training task, you need to pay attention to the following points. +1、(1)The structure of the dataset uploaded for single dataset training in this example + MNISTData.zip + ├── test + └── train + + +2、Single dataset training requires predefined functions +(1)Copy single dataset from obs to training image +function ObsToEnv(obs_data_url, data_dir) + +(2)Copy the output to obs +function EnvToObs(train_dir, obs_train_url) + +(3)Download the input from Qizhi And Init +function DownloadFromQizhi(obs_data_url, data_dir) + +(4)Upload the output to Qizhi +function UploadToQizhi(train_dir, obs_train_url) + +3、3 parameters need to be defined +--data_url is the dataset you selected on the Qizhi platform + +--data_url,--train_url,--device_target,These 3 parameters must be defined first in a single dataset task, +otherwise an error will be reported. +There is no need to add these parameters to the running parameters of the Qizhi platform, +because they are predefined in the background, you only need to define them in your code. + +4、How the dataset is used +A single dataset uses data_url as the input, and data_dir (ie:'/cache/data') as the calling method +of the dataset in the image. +For details, please refer to the following sample code. + +""" + +import os +import argparse +import moxing as mox +from config import mnist_cfg as cfg +from dataset import create_dataset +from dataset_distributed import create_dataset_parallel +from lenet import LeNet5 +import mindspore.nn as nn +from mindspore import context +from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor +from mindspore.train import Model +from mindspore.nn.metrics import Accuracy +from mindspore.context import ParallelMode +from mindspore.communication.management import init, get_rank, get_group_size +import mindspore.ops as ops +import time + +### Copy single dataset from obs to training image### +def ObsToEnv(obs_data_url, data_dir): + try: + mox.file.copy_parallel(obs_data_url, data_dir) + print("Successfully Download {} to {}".format(obs_data_url, data_dir)) + except Exception as e: + print('moxing download {} to {} failed: '.format(obs_data_url, data_dir) + str(e)) + #Set a cache file to determine whether the data has been copied to obs. + #If this file exists during multi-card training, there is no need to copy the dataset multiple times. + f = open("/cache/download_input.txt", 'w') + f.close() + try: + if os.path.exists("/cache/download_input.txt"): + print("download_input succeed") + except Exception as e: + print("download_input failed") + return +### Copy the output to obs### +def EnvToObs(train_dir, obs_train_url): + try: + mox.file.copy_parallel(train_dir, obs_train_url) + print("Successfully Upload {} to {}".format(train_dir,obs_train_url)) + except Exception as e: + print('moxing upload {} to {} failed: '.format(train_dir,obs_train_url) + str(e)) + return +def DownloadFromQizhi(obs_data_url, data_dir): + device_num = int(os.getenv('RANK_SIZE')) + node_num = get_group_size() + if device_num == 1: + ObsToEnv(obs_data_url,data_dir) + context.set_context(mode=context.GRAPH_MODE,device_target=args.device_target) + if device_num > 1 and node_num == 1: + # set device_id and init for multi-card training + context.set_context(mode=context.GRAPH_MODE, device_target=args.device_target, device_id=int(os.getenv('ASCEND_DEVICE_ID'))) + context.reset_auto_parallel_context() + context.set_auto_parallel_context(device_num = device_num, parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True, parameter_broadcast=True) + init() + #Copying obs data does not need to be executed multiple times, just let the 0th card copy the data + local_rank=int(os.getenv('RANK_ID')) + if local_rank%8==0: + ObsToEnv(obs_data_url,data_dir) + #If the cache file does not exist, it means that the copy data has not been completed, + #and Wait for 0th card to finish copying data + while not os.path.exists("/cache/download_input.txt"): + time.sleep(1) + if node_num > 1: + ObsToEnv(obs_data_url,data_dir) + return +def UploadToQizhi(train_dir, obs_train_url): + device_num = int(os.getenv('RANK_SIZE')) + local_rank=int(os.getenv('RANK_ID')) + if device_num == 1: + EnvToObs(train_dir, obs_train_url) + if device_num > 1: + if local_rank%8==0: + EnvToObs(train_dir, obs_train_url) + return + +### --data_url,--train_url,--device_target,These 3 parameters must be defined first in a single dataset, +### otherwise an error will be reported. +###There is no need to add these parameters to the running parameters of the Qizhi platform, +###because they are predefined in the background, you only need to define them in your code. +parser = argparse.ArgumentParser(description='MindSpore Lenet Example') +parser.add_argument('--data_url', + help='path to training/inference dataset folder', + default= '/cache/data/') + +parser.add_argument('--train_url', + help='output folder to save/load', + default= '/cache/output/') + +parser.add_argument( + '--device_target', + type=str, + default="Ascend", + choices=['Ascend', 'CPU'], + help='device where the code will be implemented (default: Ascend),if to use the CPU on the Qizhi platform:device_target=CPU') + +parser.add_argument('--epoch_size', + type=int, + default=5, + help='Training epochs.') + +parser.add_argument('--distributed', + type=bool, + default=True, + help='Whether to perform distributed training.') + +if __name__ == "__main__": + args = parser.parse_args() + data_dir = '/cache/data' + train_dir = '/cache/output' + if not os.path.exists(data_dir): + os.makedirs(data_dir) + if not os.path.exists(train_dir): + os.makedirs(train_dir) + ###Initialize and copy data to training image + if args.distributed: + init() + DownloadFromQizhi(args.data_url, data_dir) + ###The dataset path is used here:data_dir +"/train" + device_num = int(os.getenv('RANK_SIZE')) + if device_num == 1: + ds_train = create_dataset(os.path.join(data_dir, "train"), cfg.batch_size) + if device_num > 1: + ds_train = create_dataset_parallel(os.path.join(data_dir, "train"), cfg.batch_size) + if ds_train.get_dataset_size() == 0: + raise ValueError("Please check dataset size > 0 and batch_size <= dataset size") + + network = LeNet5(cfg.num_classes) + net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean") + net_opt = nn.Momentum(network.trainable_params(), cfg.lr, cfg.momentum) + time_cb = TimeMonitor(data_size=ds_train.get_dataset_size()) + + if args.device_target != "Ascend": + model = Model(network, + net_loss, + net_opt, + metrics={"accuracy": Accuracy()}) + else: + model = Model(network, + net_loss, + net_opt, + metrics={"accuracy": Accuracy()}, + amp_level="O2") + + config_ck = CheckpointConfig( + save_checkpoint_steps=cfg.save_checkpoint_steps, + keep_checkpoint_max=cfg.keep_checkpoint_max) + #Note that this method saves the model file on each card. You need to specify the save path on each card. + # In this example, get_rank() is added to distinguish different paths. + if device_num == 1: + outputDirectory = train_dir + "/" + if device_num > 1: + outputDirectory = train_dir + "/" + str(get_rank()) + "/" + ckpoint_cb = ModelCheckpoint(prefix="checkpoint_lenet", + directory=outputDirectory, + config=config_ck) + print("============== Starting Training ==============") + epoch_size = cfg['epoch_size'] + if (args.epoch_size): + epoch_size = args.epoch_size + print('epoch_size is: ', epoch_size) + + model.train(epoch_size, + ds_train, + callbacks=[time_cb, ckpoint_cb, + LossMonitor()]) + + ###Copy the trained output data from the local running environment back to obs, + ###and download it in the training task corresponding to the Qizhi platform + UploadToQizhi(train_dir,args.train_url) diff --git a/npu_multiNode/说明.md b/npu_multiNode/说明.md new file mode 100755 index 0000000..8aa9aac --- /dev/null +++ b/npu_multiNode/说明.md @@ -0,0 +1,5 @@ + + +需要是多机多节点的任务,比如2节点2卡 + +数据集选1个就行,mnistData。 \ No newline at end of file