|
- # Copyright 2020 Huawei Technologies Co., Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # ============================================================================
- """The converter between proto format event of lineage and dict."""
- import socket
- import time
-
- from mindinsight.datavisual.proto_files.mindinsight_lineage_pb2 import LineageEvent
- from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamTypeError
- from mindinsight.lineagemgr.common.log import logger as log
-
- # Set the Event mark
- EVENT_FILE_NAME_MARK = "out.events."
- # Set lineage file mark
- LINEAGE_FILE_NAME_MARK = "_lineage"
-
-
- def package_dataset_graph(graph):
- """
- Package dataset graph.
-
- Args:
- graph (dict): Dataset graph.
-
- Returns:
- LineageEvent, the proto message event contains dataset graph.
- """
- dataset_graph_event = LineageEvent()
- dataset_graph_event.wall_time = time.time()
-
- dataset_graph = dataset_graph_event.dataset_graph
- if "children" in graph:
- children = graph.pop("children")
- if children:
- _package_children(children=children, message=dataset_graph)
- _package_current_dataset(operation=graph, message=dataset_graph)
-
- return dataset_graph_event
-
-
- def _package_children(children, message):
- """
- Package children in dataset operation.
-
- Args:
- children (list[dict]): Child operations.
- message (DatasetGraph): Children proto message.
- """
- for child in children:
- if child:
- child_graph_message = getattr(message, "children").add()
- grandson = child.pop("children")
- if grandson:
- _package_children(children=grandson, message=child_graph_message)
- # package other parameters
- _package_current_dataset(operation=child, message=child_graph_message)
-
-
- def _package_current_dataset(operation, message):
- """
- Package operation parameters in event message.
-
- Args:
- operation (dict): Operation dict.
- message (Operation): Operation proto message.
- """
- for key, value in operation.items():
- if value and key == "operations":
- for operator in value:
- _package_enhancement_operation(
- operator,
- message.operations.add()
- )
- elif value and key == "sampler":
- _package_enhancement_operation(
- value,
- message.sampler
- )
- else:
- _package_parameter(key, value, message.parameter)
-
-
- def _package_enhancement_operation(operation, message):
- """
- Package enhancement operation in MapDataset.
-
- Args:
- operation (dict): Enhancement operation.
- message (Operation): Enhancement operation proto message.
- """
- for key, value in operation.items():
- if isinstance(value, list):
- if all(isinstance(ele, int) for ele in value):
- message.size.extend(value)
- else:
- message.weights.extend(value)
- else:
- _package_parameter(key, value, message.operationParam)
-
-
- def _package_parameter(key, value, message):
- """
- Package parameters in operation.
-
- Args:
- key (str): Operation name.
- value (Union[str, bool, int, float, list, None]): Operation args.
- message (OperationParameter): Operation proto message.
- """
- if isinstance(value, str):
- message.mapStr[key] = value
- elif isinstance(value, bool):
- message.mapBool[key] = value
- elif isinstance(value, int):
- message.mapInt[key] = value
- elif isinstance(value, float):
- message.mapDouble[key] = value
- elif isinstance(value, list) and key != "operations":
- if value:
- replace_value_list = list(map(lambda x: "" if x is None else x, value))
- message.mapStrList[key].strValue.extend(replace_value_list)
- elif value is None:
- message.mapStr[key] = "None"
- else:
- error_msg = "Parameter {} is not supported " \
- "in event package.".format(key)
- log.error(error_msg)
- raise LineageParamTypeError(error_msg)
-
-
- def package_user_defined_info(user_dict):
- """
- Package user defined info.
-
- Args:
- user_dict(dict): User defined info dict.
-
- Returns:
- LineageEvent, the proto message event contains user defined info.
-
- """
- user_event = LineageEvent()
- user_event.wall_time = time.time()
- user_defined_info = user_event.user_defined_info
- _package_user_defined_info(user_dict, user_defined_info)
-
- return user_event
-
-
- def _package_user_defined_info(user_defined_dict, user_defined_message):
- """
- Setting attribute in user defined proto message.
-
- Args:
- user_defined_dict (dict): User define info dict.
- user_defined_message (LineageEvent): Proto message of user defined info.
-
- Raises:
- LineageParamValueError: When the value is out of range.
- LineageParamTypeError: When given a type not support yet.
- """
- for key, value in user_defined_dict.items():
- if not isinstance(key, str):
- error_msg = f"Invalid key type in user defined info. The {key}'s type" \
- f"'{type(key).__name__}' is not supported. It should be str."
- log.error(error_msg)
-
- if isinstance(value, int):
- attr_name = "map_int32"
- elif isinstance(value, float):
- attr_name = "map_double"
- elif isinstance(value, str):
- attr_name = "map_str"
- else:
- attr_name = "attr_name"
-
- add_user_defined_info = user_defined_message.user_info.add()
- try:
- getattr(add_user_defined_info, attr_name)[key] = value
- except AttributeError:
- error_msg = f"Invalid value type in user defined info. The {value}'s type" \
- f"'{type(value).__name__}' is not supported. It should be float, int or str."
- log.error(error_msg)
-
-
- def get_lineage_file_name():
- """
- Get lineage file name.
-
- Lineage filename format is:
- EVENT_FILE_NAME_MARK + "summary." + time(seconds) + "." + Hostname + lineage_suffix.
-
- Returns:
- str, the name of event log file.
- """
-
- time_second = str(int(time.time()))
- hostname = socket.gethostname()
- file_name = f'{EVENT_FILE_NAME_MARK}summary.{time_second}.{hostname}{LINEAGE_FILE_NAME_MARK}'
-
- return file_name
|