- # Copyright 2019 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
- """This module is used to collect lineage information of model training."""
- import json
- import os
-
- import numpy as np
- from mindinsight.lineagemgr.common.exceptions.error_code import LineageErrorMsg, LineageErrors
- from mindinsight.lineagemgr.common.log import logger as log
- from mindinsight.utils.exceptions import MindInsightException
-
- from ._summary_record import LineageSummary
- from .base import Metadata
- from .utils import try_except, LineageParamRunContextError, LineageGetModelFileError, LineageLogError, \
- validate_int_params, validate_file_path, validate_raise_exception, \
- validate_user_defined_info, make_directory
-
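- # MindSpore is imported in a try block so that this module can still be imported
- # when MindSpore is not installed; in that case only a warning is logged.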
- try:
- from mindspore.common.tensor import Tensor
- from mindspore.train.callback import Callback, RunContext, ModelCheckpoint, SummaryStep
- from mindspore.nn import Cell, Optimizer
- from mindspore.nn.loss.loss import _Loss
- from mindspore.dataset.engine import Dataset, ImageFolderDatasetV2, MnistDataset, Cifar10Dataset, Cifar100Dataset, \
- VOCDataset, CelebADataset, MindDataset, ManifestDataset, TFRecordDataset, TextFileDataset
- import mindspore.dataset as ds
- except (ImportError, ModuleNotFoundError):
- log.warning('MindSpore Not Found!')
-
-
- class TrainLineage(Callback):
- """
- Collect lineage of a training job.
-
- Args:
-         summary_record (Union[SummaryRecord, str]): The `SummaryRecord` object used
-             to record summary values (see mindspore.train.summary.SummaryRecord),
-             or a log directory (as a `str`) that is passed to `LineageSummary` to
-             create a lineage summary recorder. Note that the lineage info is not
-             written through summary_record directly; the log directory is obtained
-             from it and a new summary file is created to write the lineage info.
-         raise_exception (bool): Whether to raise an exception when an error occurs
-             in TrainLineage. If True, the exception is raised. If False, it is
-             caught and training continues. Default: False.
-         user_defined_info (dict): User defined information. Only a flat dict with
-             str keys and int/float/str values is supported. Default: None.
-
- Raises:
- MindInsightException: If validating parameter fails.
- LineageLogError: If recording lineage information fails.
-
- Examples:
- >>> from mindinsight.lineagemgr import TrainLineage
- >>> from mindspore.train.callback import ModelCheckpoint, SummaryStep
- >>> from mindspore.train.summary import SummaryRecord
- >>> model = Model(train_network)
- >>> model_ckpt = ModelCheckpoint(directory='/dir/to/save/model/')
- >>> summary_writer = SummaryRecord(log_dir='./')
- >>> summary_callback = SummaryStep(summary_writer, flush_step=2)
- >>> lineagemgr = TrainLineage(summary_record=summary_writer)
- >>> model.train(epoch_num, dataset, callbacks=[model_ckpt, summary_callback, lineagemgr])
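-
-         An illustrative variant (the `user_defined_info` values below are
-         hypothetical) that also records user defined information:
-
-         >>> lineagemgr = TrainLineage(summary_writer, user_defined_info={'version': 'v1'})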
- """
- def __init__(self,
- summary_record,
- raise_exception=False,
- user_defined_info=None):
- super(TrainLineage, self).__init__()
- try:
- validate_raise_exception(raise_exception)
- self.raise_exception = raise_exception
-
- if isinstance(summary_record, str):
- # make directory if not exist
- self.lineage_log_dir = make_directory(summary_record)
- else:
- summary_log_path = summary_record.full_file_name
- validate_file_path(summary_log_path)
- self.lineage_log_dir = os.path.dirname(summary_log_path)
-
- self.lineage_summary = LineageSummary(self.lineage_log_dir)
-
- self.initial_learning_rate = None
-
- self.user_defined_info = user_defined_info
- if user_defined_info:
- validate_user_defined_info(user_defined_info)
-
- except MindInsightException as err:
- log.error(err)
- if raise_exception:
- raise
-
- @try_except(log)
- def begin(self, run_context):
- """
-         Initialize the lineage collection when the training job begins.
-
- Args:
- run_context (RunContext): It contains all lineage information,
- see mindspore.train.callback.RunContext.
-
- Raises:
- MindInsightException: If validating parameter fails.
- """
- log.info('Initialize training lineage collection...')
-
- if self.user_defined_info:
- self.lineage_summary.record_user_defined_info(self.user_defined_info)
-
- if not isinstance(run_context, RunContext):
-             error_msg = 'Invalid TrainLineage run_context.'
- log.error(error_msg)
- raise LineageParamRunContextError(error_msg)
-
- run_context_args = run_context.original_args()
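-         # Record the initial learning rate only once: prefer the optimizer passed in
-         # the run context; otherwise search the train network for an Optimizer cell.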
- if not self.initial_learning_rate:
- optimizer = run_context_args.get('optimizer')
- if optimizer and not isinstance(optimizer, Optimizer):
- log.error("The parameter optimizer is invalid. It should be an instance of "
- "mindspore.nn.optim.optimizer.Optimizer.")
- raise MindInsightException(error=LineageErrors.PARAM_OPTIMIZER_ERROR,
- message=LineageErrorMsg.PARAM_OPTIMIZER_ERROR.value)
- if optimizer:
- log.info('Obtaining initial learning rate...')
- self.initial_learning_rate = AnalyzeObject.analyze_optimizer(optimizer)
- log.debug('initial_learning_rate: %s', self.initial_learning_rate)
- else:
- network = run_context_args.get('train_network')
- optimizer = AnalyzeObject.get_optimizer_by_network(network)
- self.initial_learning_rate = AnalyzeObject.analyze_optimizer(optimizer)
- log.debug('initial_learning_rate: %s', self.initial_learning_rate)
-
- # get train dataset graph
- train_dataset = run_context_args.get('train_dataset')
- dataset_graph_dict = ds.serialize(train_dataset)
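-         # Round-trip the serialized pipeline through JSON to obtain a plain dict
-         # before recording it.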
- dataset_graph_json_str = json.dumps(dataset_graph_dict, indent=2)
- dataset_graph_dict = json.loads(dataset_graph_json_str)
- log.info('Logging dataset graph...')
- try:
- self.lineage_summary.record_dataset_graph(dataset_graph=dataset_graph_dict)
- except Exception as error:
- error_msg = f'Dataset graph log error in TrainLineage begin: {error}'
- log.error(error_msg)
- raise LineageLogError(error_msg)
- log.info('Dataset graph logged successfully.')
-
- @try_except(log)
- def end(self, run_context):
- """
- Collect lineage information when the training job ends.
-
- Args:
- run_context (RunContext): It contains all lineage information,
- see mindspore.train.callback.RunContext.
-
- Raises:
- LineageLogError: If recording lineage information fails.
- """
- log.info('Start to collect training lineage...')
- if not isinstance(run_context, RunContext):
-             error_msg = 'Invalid TrainLineage run_context.'
- log.error(error_msg)
- raise LineageParamRunContextError(error_msg)
-
- run_context_args = run_context.original_args()
-
- train_lineage = dict()
- train_lineage = AnalyzeObject.get_network_args(
- run_context_args, train_lineage
- )
-
- train_dataset = run_context_args.get('train_dataset')
- callbacks = run_context_args.get('list_callback')
- list_callback = getattr(callbacks, '_callbacks', [])
-
- log.info('Obtaining model files...')
- ckpt_file_path, _ = AnalyzeObject.get_file_path(list_callback)
-
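-         # Collect hyperparameters and run metadata from the run context.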
- train_lineage[Metadata.learning_rate] = self.initial_learning_rate
- train_lineage[Metadata.epoch] = run_context_args.get('epoch_num')
- train_lineage[Metadata.step_num] = run_context_args.get('cur_step_num')
- train_lineage[Metadata.parallel_mode] = run_context_args.get('parallel_mode')
- train_lineage[Metadata.device_num] = run_context_args.get('device_number')
- train_lineage[Metadata.batch_size] = run_context_args.get('batch_num')
- model_path_dict = {
- 'ckpt': ckpt_file_path
- }
- train_lineage[Metadata.model_path] = json.dumps(model_path_dict)
-
- log.info('Calculating model size...')
- train_lineage[Metadata.model_size] = AnalyzeObject.get_model_size(
- ckpt_file_path
- )
- log.debug('model_size: %s', train_lineage[Metadata.model_size])
-
- log.info('Analyzing dataset object...')
- train_lineage = AnalyzeObject.analyze_dataset(train_dataset, train_lineage, 'train')
-
- log.info('Logging lineage information...')
- try:
- self.lineage_summary.record_train_lineage(train_lineage)
- except IOError as error:
- error_msg = f'End error in TrainLineage: {error}'
- log.error(error_msg)
- raise LineageLogError(error_msg)
- except Exception as error:
- error_msg = f'End error in TrainLineage: {error}'
- log.error(error_msg)
-             log.error('Failed to log the lineage of the training job.')
- raise LineageLogError(error_msg)
-         log.info('The lineage of the training job has been logged successfully.')
-
-
- class EvalLineage(Callback):
- """
- Collect lineage of an evaluation job.
-
- Args:
-         summary_record (Union[SummaryRecord, str]): The `SummaryRecord` object used
-             to record summary values (see mindspore.train.summary.SummaryRecord),
-             or a log directory (as a `str`) that is passed to `LineageSummary` to
-             create a lineage summary recorder. Note that the lineage info is not
-             written through summary_record directly; the log directory is obtained
-             from it and a new summary file is created to write the lineage info.
-         raise_exception (bool): Whether to raise an exception when an error occurs
-             in EvalLineage. If True, the exception is raised. If False, it is
-             caught and evaluation continues. Default: False.
-         user_defined_info (dict): User defined information. Only a flat dict with
-             str keys and int/float/str values is supported. Default: None.
-
- Raises:
- MindInsightException: If validating parameter fails.
- LineageLogError: If recording lineage information fails.
-
- Examples:
- >>> from mindinsight.lineagemgr import EvalLineage
- >>> from mindspore.train.callback import ModelCheckpoint, SummaryStep
- >>> from mindspore.train.summary import SummaryRecord
- >>> model = Model(train_network)
- >>> model_ckpt = ModelCheckpoint(directory='/dir/to/save/model/')
- >>> summary_writer = SummaryRecord(log_dir='./')
- >>> summary_callback = SummaryStep(summary_writer, flush_step=2)
- >>> lineagemgr = EvalLineage(summary_record=summary_writer)
-         >>> model.eval(dataset, callbacks=[model_ckpt, summary_callback, lineagemgr])
- """
- def __init__(self,
- summary_record,
- raise_exception=False,
- user_defined_info=None):
- super(EvalLineage, self).__init__()
- try:
- validate_raise_exception(raise_exception)
- self.raise_exception = raise_exception
-
- if isinstance(summary_record, str):
- # make directory if not exist
- self.lineage_log_dir = make_directory(summary_record)
- else:
- summary_log_path = summary_record.full_file_name
- validate_file_path(summary_log_path)
- self.lineage_log_dir = os.path.dirname(summary_log_path)
-
- self.lineage_summary = LineageSummary(self.lineage_log_dir)
-
- self.user_defined_info = user_defined_info
- if self.user_defined_info:
- validate_user_defined_info(self.user_defined_info)
-
- except MindInsightException as err:
- log.error(err)
- if raise_exception:
- raise
-
- @try_except(log)
- def end(self, run_context):
- """
-         Collect lineage information when the evaluation job ends.
-
- Args:
- run_context (RunContext): It contains all lineage information,
- see mindspore.train.callback.RunContext.
-
- Raises:
- MindInsightException: If validating parameter fails.
- LineageLogError: If recording lineage information fails.
- """
- if self.user_defined_info:
- self.lineage_summary.record_user_defined_info(self.user_defined_info)
-
- if not isinstance(run_context, RunContext):
-             error_msg = 'Invalid EvalLineage run_context.'
- log.error(error_msg)
- raise LineageParamRunContextError(error_msg)
-
- run_context_args = run_context.original_args()
-
- valid_dataset = run_context_args.get('valid_dataset')
-
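-         # Metrics reported by the evaluation are recorded as a JSON string.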
- eval_lineage = dict()
- metrics = run_context_args.get('metrics')
- eval_lineage[Metadata.metrics] = json.dumps(metrics)
- eval_lineage[Metadata.step_num] = run_context_args.get('cur_step_num')
-
- log.info('Analyzing dataset object...')
- eval_lineage = AnalyzeObject.analyze_dataset(valid_dataset, eval_lineage, 'valid')
-
- log.info('Logging evaluation job lineage...')
- try:
- self.lineage_summary.record_evaluation_lineage(eval_lineage)
- except IOError as error:
- error_msg = f'End error in EvalLineage: {error}'
- log.error(error_msg)
-             log.error('Failed to log the lineage of the evaluation job.')
- raise LineageLogError(error_msg)
- except Exception as error:
- error_msg = f'End error in EvalLineage: {error}'
- log.error(error_msg)
-             log.error('Failed to log the lineage of the evaluation job.')
- raise LineageLogError(error_msg)
-         log.info('The lineage of the evaluation job has been logged successfully.')
-
-
- class AnalyzeObject:
- """Analyze class object in MindSpore."""
-
- @staticmethod
- def get_optimizer_by_network(network):
- """
- Get optimizer by analyzing network.
-
- Args:
- network (Cell): See mindspore.nn.Cell.
-
- Returns:
- Optimizer, an Optimizer object.
- """
- optimizer = None
- net_args = vars(network) if network else {}
- net_cell = net_args.get('_cells') if net_args else {}
- for _, value in net_cell.items():
- if isinstance(value, Optimizer):
- optimizer = value
- break
- return optimizer
-
- @staticmethod
- def get_loss_fn_by_network(network):
- """
- Get loss function by analyzing network.
-
- Args:
- network (Cell): See mindspore.nn.Cell.
-
- Returns:
-             Cell, the loss function of the network, or None if it is not found.
- """
- loss_fn = None
- inner_cell_list = []
- net_args = vars(network) if network else {}
- net_cell = net_args.get('_cells') if net_args else {}
- for _, value in net_cell.items():
- if isinstance(value, Cell) and \
- not isinstance(value, Optimizer):
- inner_cell_list.append(value)
-
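-         # Breadth-first search through the nested cells until a _Loss instance is found.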
- while inner_cell_list:
- inner_net_args = vars(inner_cell_list[0])
- inner_net_cell = inner_net_args.get('_cells')
-
- for value in inner_net_cell.values():
- if isinstance(value, _Loss):
- loss_fn = value
- break
- if isinstance(value, Cell):
- inner_cell_list.append(value)
- if loss_fn:
- break
-
- inner_cell_list.pop(0)
-
- return loss_fn
-
- @staticmethod
- def get_backbone_network(network):
- """
- Get the name of backbone network.
-
- Args:
- network (Cell): The train network.
-
- Returns:
- str, the name of the backbone network.
- """
- backbone_name = None
- has_network = False
- network_key = 'network'
- backbone_key = '_backbone'
-
- net_args = vars(network) if network else {}
- net_cell = net_args.get('_cells') if net_args else {}
-
- for key, value in net_cell.items():
- if key == network_key:
- network = value
- has_network = True
- break
-
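-         # Walk down nested 'network' attributes of the wrapper cells; the innermost
-         # wrapper is expected to expose the backbone through its '_backbone' attribute.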
- if has_network:
- while hasattr(network, network_key):
- network = getattr(network, network_key)
- if hasattr(network, backbone_key):
- backbone = getattr(network, backbone_key)
- backbone_name = type(backbone).__name__
- if backbone_name is None and network is not None:
- backbone_name = type(network).__name__
- return backbone_name
-
- @staticmethod
- def analyze_optimizer(optimizer):
- """
- Analyze Optimizer, a Cell object of MindSpore.
-
-         Only the initial learning rate is extracted here; other optimizer
-         attributes (weight_decay, momentum, weights) are not recorded.
-
- Args:
- optimizer (Optimizer): See mindspore.nn.optim.Optimizer.
-
- Returns:
-             float, the initial learning rate adopted by the optimizer, or None.
- """
- learning_rate = None
- if isinstance(optimizer, Optimizer):
- learning_rate = getattr(optimizer, 'learning_rate', None)
-
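-         # In the MindSpore version this module targets, Optimizer.learning_rate is a
-         # Parameter whose value is held in its default_input attribute.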
- if learning_rate:
- learning_rate = learning_rate.default_input
-
- # Get the real learning rate value
- if isinstance(learning_rate, Tensor):
- learning_rate = learning_rate.asnumpy()
- if learning_rate.ndim == 0:
- learning_rate = np.atleast_1d(learning_rate)
- learning_rate = list(learning_rate)
- elif isinstance(learning_rate, float):
- learning_rate = [learning_rate]
-
- return learning_rate[0] if learning_rate else None
-
- @staticmethod
- def analyze_dataset(dataset, lineage_dict, dataset_type):
- """
- Analyze Dataset, a Dataset object of MindSpore.
-
- In this way, we can obtain the following attributes:
- dataset_path (str),
- train_dataset_size (int),
- valid_dataset_size (int),
- batch_size (int)
-
- Args:
-             dataset (Dataset): See mindspore.dataset.engine.Dataset.
- lineage_dict (dict): A dict contains lineage metadata.
- dataset_type (str): Dataset type, train or valid.
-
- Returns:
- dict, the lineage metadata.
- """
- batch_num = dataset.get_dataset_size()
- batch_size = dataset.get_batch_size()
- if batch_num is not None:
- validate_int_params(batch_num, 'dataset_batch_num')
-             validate_int_params(batch_size, 'dataset_batch_size')
- log.debug('dataset_batch_num: %d', batch_num)
- log.debug('dataset_batch_size: %d', batch_size)
- dataset_path = AnalyzeObject.get_dataset_path_wrapped(dataset)
- if dataset_path and os.path.isfile(dataset_path):
- dataset_path, _ = os.path.split(dataset_path)
-
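-         # The dataset size is estimated as the number of batches times the batch size.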
- dataset_size = int(batch_num * batch_size)
- if dataset_type == 'train':
- lineage_dict[Metadata.train_dataset_path] = dataset_path
- lineage_dict[Metadata.train_dataset_size] = dataset_size
- elif dataset_type == 'valid':
- lineage_dict[Metadata.valid_dataset_path] = dataset_path
- lineage_dict[Metadata.valid_dataset_size] = dataset_size
-
- return lineage_dict
-
- def get_dataset_path(self, output_dataset):
- """
-         Get the dataset path of a MindSpore dataset object.
-
- Args:
- output_dataset (Union[Dataset, ImageFolderDatasetV2, MnistDataset, Cifar10Dataset, Cifar100Dataset,
- VOCDataset, CelebADataset, MindDataset, ManifestDataset, TFRecordDataset, TextFileDataset]):
-                 See mindspore.dataset.engine.Dataset.
-
- Returns:
- str, dataset path.
- """
- dataset_dir_set = (ImageFolderDatasetV2, MnistDataset, Cifar10Dataset,
- Cifar100Dataset, VOCDataset, CelebADataset)
- dataset_file_set = (MindDataset, ManifestDataset)
- dataset_files_set = (TFRecordDataset, TextFileDataset)
-
- if isinstance(output_dataset, dataset_file_set):
- return output_dataset.dataset_file
- if isinstance(output_dataset, dataset_dir_set):
- return output_dataset.dataset_dir
- if isinstance(output_dataset, dataset_files_set):
- return output_dataset.dataset_files[0]
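-         # Not a source dataset: recurse into the first input dataset of the pipeline.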
- return self.get_dataset_path(output_dataset.inputs[0])
-
- @staticmethod
- def get_dataset_path_wrapped(dataset):
- """
- A wrapper for obtaining dataset path.
-
- Args:
- dataset (Union[MindDataset, Dataset]): See
-                 mindspore.dataset.engine.Dataset.
-
- Returns:
- str, dataset path.
- """
- dataset_path = None
- if isinstance(dataset, Dataset):
- try:
- dataset_path = AnalyzeObject().get_dataset_path(dataset)
- except IndexError:
- dataset_path = None
- dataset_path = validate_file_path(dataset_path, allow_empty=True)
- return dataset_path
-
- @staticmethod
- def get_file_path(list_callback):
- """
-         Get ckpt_file_path and summary_log_path from the MindSpore callback list.
-
- Args:
- list_callback (list[Callback]): The MindSpore training Callback list.
-
- Returns:
-             tuple, contains ckpt_file_path and summary_log_path.
- """
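-         # Scan the callback list for ModelCheckpoint and SummaryStep instances to
-         # recover the checkpoint file path and the summary log path.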
- ckpt_file_path = None
- summary_log_path = None
- for callback in list_callback:
- if isinstance(callback, ModelCheckpoint):
- ckpt_file_path = callback.latest_ckpt_file_name
- if isinstance(callback, SummaryStep):
- summary_log_path = callback.summary_file_name
-
- if ckpt_file_path:
- validate_file_path(ckpt_file_path)
- ckpt_file_path = os.path.realpath(ckpt_file_path)
-
- if summary_log_path:
- validate_file_path(summary_log_path)
- summary_log_path = os.path.realpath(summary_log_path)
-
- return ckpt_file_path, summary_log_path
-
- @staticmethod
- def get_file_size(file_path):
- """
- Get the file size.
-
- Args:
- file_path (str): The file path.
-
- Returns:
- int, the file size.
- """
- try:
- return os.path.getsize(file_path)
- except (OSError, IOError) as error:
-             error_msg = f"Error when getting model file size: {error}"
- log.error(error_msg)
- raise LineageGetModelFileError(error_msg)
-
- @staticmethod
- def get_model_size(ckpt_file_path):
- """
-         Get the size of the model checkpoint file.
-
- Args:
- ckpt_file_path (str): The checkpoint file path.
-
- Returns:
-             int, the checkpoint file size in bytes.
- """
- if ckpt_file_path:
- ckpt_file_path = os.path.realpath(ckpt_file_path)
- ckpt_file_size = AnalyzeObject.get_file_size(ckpt_file_path)
- else:
- ckpt_file_size = 0
-
- return ckpt_file_size
-
- @staticmethod
- def get_network_args(run_context_args, train_lineage):
- """
-         Get the parameters related to the network, such as the optimizer and loss function.
-
- Args:
- run_context_args (dict): It contains all information of the training job.
- train_lineage (dict): A dict contains lineage metadata.
-
- Returns:
- dict, the lineage metadata.
- """
- network = run_context_args.get('train_network')
- optimizer = run_context_args.get('optimizer')
- if not optimizer:
- optimizer = AnalyzeObject.get_optimizer_by_network(network)
- loss_fn = run_context_args.get('loss_fn')
- if not loss_fn:
- loss_fn = AnalyzeObject.get_loss_fn_by_network(network)
- loss = None
- else:
- loss = run_context_args.get('net_outputs')
-
- if loss:
- log.info('Calculating loss...')
- loss_numpy = loss.asnumpy()
- loss = float(np.atleast_1d(loss_numpy)[0])
- log.debug('loss: %s', loss)
- train_lineage[Metadata.loss] = loss
- else:
- train_lineage[Metadata.loss] = None
-
- # Analyze classname of optimizer, loss function and training network.
- train_lineage[Metadata.optimizer] = type(optimizer).__name__ \
- if optimizer else None
- train_lineage[Metadata.train_network] = AnalyzeObject.get_backbone_network(network)
- train_lineage[Metadata.loss_function] = type(loss_fn).__name__ \
- if loss_fn else None
-
- return train_lineage