|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- # Copyright 2019 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
- """This file is used to define the model lineage python api."""
- import os
- import numpy as np
- import pandas as pd
-
- from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamValueError, \
- LineageQuerySummaryDataError, LineageParamSummaryPathError, \
- LineageQuerierParamException, LineageDirNotExistError, LineageSearchConditionParamError, \
- LineageParamTypeError, LineageSummaryParseException
- from mindinsight.lineagemgr.common.log import logger as log
- from mindinsight.lineagemgr.common.utils import normalize_summary_dir, get_relative_path
- from mindinsight.lineagemgr.common.validator.model_parameter import SearchModelConditionParameter
- from mindinsight.lineagemgr.common.validator.validate import validate_filter_key, validate_search_model_condition, \
- validate_condition, validate_path, validate_train_id
- from mindinsight.lineagemgr.lineage_parser import LineageParser, LineageOrganizer
- from mindinsight.lineagemgr.querier.querier import Querier
- from mindinsight.optimizer.common.enums import ReasonCode
- from mindinsight.optimizer.utils.utils import is_simple_numpy_number
- from mindinsight.utils.exceptions import MindInsightException
-
- _METRIC_PREFIX = "[M]"
- _USER_DEFINED_PREFIX = "[U]"
-
- USER_DEFINED_INFO_LIMIT = 100
-
-
- def get_summary_lineage(data_manager=None, summary_dir=None, keys=None):
- """
- Get summary lineage from data_manager or parsing from summaries.
-
- One of data_manager or summary_dir needs to be specified. Support getting
- super_lineage_obj from data_manager or parsing summaries by summary_dir.
-
- Args:
- data_manager (DataManager): Data manager defined as
- mindinsight.datavisual.data_transform.data_manager.DataManager
- summary_dir (str): The summary directory. It contains summary logs for
- one training.
- keys (list[str]): The filter keys of lineage information. The acceptable
- keys are `metric`, `user_defined`, `hyper_parameters`, `algorithm`,
- `train_dataset`, `model`, `valid_dataset` and `dataset_graph`.
- If it is `None`, all information will be returned. Default: None.
-
- Returns:
- dict, the lineage information for one training.
-
- Raises:
- LineageParamSummaryPathError: If summary path is invalid.
- LineageQuerySummaryDataError: If querying summary data fails.
- LineageFileNotFoundError: If the summary log file is not found.
-
- """
- default_result = {}
- if data_manager is None and summary_dir is None:
- raise LineageParamTypeError("One of data_manager or summary_dir needs to be specified.")
- if data_manager is not None and summary_dir is None:
- raise LineageParamTypeError("If data_manager is specified, the summary_dir needs to be "
- "specified as relative path.")
-
- if keys is not None:
- validate_filter_key(keys)
-
- if data_manager is None:
- normalize_summary_dir(summary_dir)
- super_lineage_obj = LineageParser(summary_dir).super_lineage_obj
- else:
- validate_train_id(summary_dir)
- super_lineage_obj = LineageOrganizer(data_manager=data_manager).get_super_lineage_obj(summary_dir)
-
- if super_lineage_obj is None:
- return default_result
-
- try:
- result = Querier({summary_dir: super_lineage_obj}).get_summary_lineage(summary_dir, keys)
- except (LineageQuerierParamException, LineageParamTypeError) as error:
- log.error(str(error))
- log.exception(error)
- raise LineageQuerySummaryDataError("Get summary lineage failed.")
- return result[0]
-
-
- def filter_summary_lineage(data_manager=None, summary_base_dir=None, search_condition=None, added=False):
- """
- Filter summary lineage from data_manager or parsing from summaries.
-
- One of data_manager or summary_base_dir needs to be specified. Support getting
- super_lineage_obj from data_manager or parsing summaries by summary_base_dir.
-
- Args:
- data_manager (DataManager): Data manager defined as
- mindinsight.datavisual.data_transform.data_manager.DataManager
- summary_base_dir (str): The summary base directory. It contains summary
- directories generated by training.
- search_condition (dict): The search condition.
- """
- if data_manager is None and summary_base_dir is None:
- raise LineageParamTypeError("One of data_manager or summary_base_dir needs to be specified.")
-
- if data_manager is None:
- summary_base_dir = normalize_summary_dir(summary_base_dir)
- else:
- summary_base_dir = data_manager.summary_base_dir
-
- search_condition = {} if search_condition is None else search_condition
-
- try:
- validate_condition(search_condition)
- validate_search_model_condition(SearchModelConditionParameter, search_condition)
- except MindInsightException as error:
- log.error(str(error))
- log.exception(error)
- raise LineageSearchConditionParamError(str(error.message))
-
- try:
- search_condition = _convert_relative_path_to_abspath(summary_base_dir, search_condition)
- except (LineageParamValueError, LineageDirNotExistError) as error:
- log.error(str(error))
- log.exception(error)
- raise LineageParamSummaryPathError(str(error.message))
-
- try:
- lineage_objects = LineageOrganizer(data_manager, summary_base_dir).super_lineage_objs
- result = Querier(lineage_objects).filter_summary_lineage(
- condition=search_condition,
- added=added
- )
- except LineageSummaryParseException:
- result = {'object': [], 'count': 0}
- except (LineageQuerierParamException, LineageParamTypeError) as error:
- log.error(str(error))
- log.exception(error)
- raise LineageQuerySummaryDataError("Filter summary lineage failed.")
-
- return result
-
-
- def _convert_relative_path_to_abspath(summary_base_dir, search_condition):
- """
- Convert relative path to absolute path.
-
- Args:
- summary_base_dir (str): The summary base directory.
- search_condition (dict): The search condition.
-
- Returns:
- dict, the updated search_condition.
-
- Raises:
- LineageParamValueError: If the value of input_name is invalid.
- """
- if ("summary_dir" not in search_condition) or (not search_condition.get("summary_dir")):
- return search_condition
-
- summary_dir_condition = search_condition.get("summary_dir")
-
- for key in ['in', 'not_in']:
- if key in summary_dir_condition:
- summary_paths = []
- for summary_dir in summary_dir_condition.get(key):
- if summary_dir.startswith('./'):
- abs_dir = os.path.join(
- summary_base_dir, summary_dir[2:]
- )
- abs_dir = validate_path(abs_dir)
- else:
- abs_dir = validate_path(summary_dir)
- summary_paths.append(abs_dir)
- search_condition.get('summary_dir')[key] = summary_paths
-
- if 'eq' in summary_dir_condition:
- summary_dir = summary_dir_condition.get('eq')
- if summary_dir.startswith('./'):
- abs_dir = os.path.join(
- summary_base_dir, summary_dir[2:]
- )
- abs_dir = validate_path(abs_dir)
- else:
- abs_dir = validate_path(summary_dir)
- search_condition.get('summary_dir')['eq'] = abs_dir
-
- return search_condition
-
-
- def get_flattened_lineage(data_manager, search_condition=None):
- """
- Get lineage data in a table from data manager.
-
- Args:
- data_manager (mindinsight.datavisual.data_manager.DataManager): An object to manage loading.
- search_condition (dict): The search condition.
-
- Returns:
- Dict[str, list]: A dict contains keys and values from lineages.
-
- """
- summary_base_dir, flatten_dict, user_count = data_manager.summary_base_dir, {'train_id': []}, 0
- lineages = filter_summary_lineage(data_manager=data_manager, search_condition=search_condition).get("object", [])
- for index, lineage in enumerate(lineages):
- flatten_dict['train_id'].append(get_relative_path(lineage.get("summary_dir"), summary_base_dir))
- for key, val in _flatten_lineage(lineage.get('model_lineage', {})):
- if key.startswith(_USER_DEFINED_PREFIX) and key not in flatten_dict:
- if user_count > USER_DEFINED_INFO_LIMIT:
- log.warning("The user_defined_info has reached the limit %s. %r is ignored",
- USER_DEFINED_INFO_LIMIT, key)
- continue
- user_count += 1
- if key not in flatten_dict:
- flatten_dict[key] = [None] * index
- flatten_dict[key].append(_parse_value(val))
- for vals in flatten_dict.values():
- if len(vals) == index:
- vals.append(None)
- return flatten_dict
-
-
- def _flatten_lineage(lineage):
- """Flatten the lineage."""
- for key, val in lineage.items():
- if key == 'metric':
- for k, v in val.items():
- yield f'{_METRIC_PREFIX}{k}', v
- elif key == 'user_defined':
- for k, v in val.items():
- yield f'{_USER_DEFINED_PREFIX}{k}', v
- else:
- yield key, val
-
-
- def _parse_value(val):
- """Parse value."""
- if isinstance(val, str) and val.lower() in ['nan', 'inf']:
- return np.nan
- return val
-
-
- class LineageTable:
- """Wrap lineage data in a table."""
- _LOSS_NAME = "loss"
- _NOT_TUNABLE_NAMES = [_LOSS_NAME, "train_id", "device_num", "model_size",
- "test_dataset_count", "train_dataset_count"]
-
- def __init__(self, df: pd.DataFrame):
- self._df = df
- self.train_ids = self._df["train_id"].tolist()
- self._drop_columns_info = []
- self._remove_unsupported_columns()
-
- def _remove_unsupported_columns(self):
- """Remove unsupported columns."""
- columns_to_drop = []
- for name, data in self._df.iteritems():
- if not is_simple_numpy_number(data.dtype):
- columns_to_drop.append(name)
-
- if columns_to_drop:
- log.debug("Unsupported columns: %s", columns_to_drop)
- self._df = self._df.drop(columns=columns_to_drop)
-
- for name in columns_to_drop:
- if not name.startswith(_USER_DEFINED_PREFIX):
- continue
- self._drop_columns_info.append({
- "name": name,
- "unselected": True,
- "reason_code": ReasonCode.NOT_ALL_NUMBERS.value
- })
-
- @property
- def target_names(self):
- """Get names for optimize targets (eg loss, accuracy)."""
- target_names = [name for name in self._df.columns if name.startswith(_METRIC_PREFIX)]
- if self._LOSS_NAME in self._df.columns:
- target_names.append(self._LOSS_NAME)
- return target_names
-
- @property
- def hyper_param_names(self, tunable=True):
- """Get hyper param names."""
- blocked_names = self._get_blocked_names(tunable)
-
- hyper_param_names = [
- name for name in self._df.columns
- if not name.startswith(_METRIC_PREFIX) and name not in blocked_names]
-
- if self._LOSS_NAME in hyper_param_names:
- hyper_param_names.remove(self._LOSS_NAME)
-
- return hyper_param_names
-
- def _get_blocked_names(self, tunable):
- if tunable:
- block_names = self._NOT_TUNABLE_NAMES
- else:
- block_names = []
- return block_names
-
- @property
- def user_defined_hyper_param_names(self):
- """Get user defined hyper param names."""
- names = [name for name in self._df.columns if name.startswith(_USER_DEFINED_PREFIX)]
- return names
-
- def get_column(self, name):
- """
- Get data for specified column.
- Args:
- name (str): column name.
-
- Returns:
- np.ndarray, specified column.
-
- """
- return self._df[name]
-
- def get_column_values(self, name):
- """
- Get data for specified column.
- Args:
- name (str): column name.
-
- Returns:
- list, specified column data. If value is np.nan, transform to None.
-
- """
- return [None if np.isnan(num) else num for num in self._df[name].tolist()]
-
- @property
- def dataframe_data(self):
- """Get the DataFrame."""
- return self._df
-
- @property
- def drop_column_info(self):
- """Get dropped columns info."""
- return self._drop_columns_info
|