You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

_summary_adapter.py 7.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """The converter between proto format event of lineage and dict."""
  16. import socket
  17. import time
  18. from mindinsight.datavisual.proto_files.mindinsight_lineage_pb2 import LineageEvent
  19. from mindinsight.lineagemgr.common.exceptions.exceptions import LineageParamTypeError
  20. from mindinsight.lineagemgr.common.log import logger as log
  21. # Set the Event mark
  22. EVENT_FILE_NAME_MARK = "out.events."
  23. # Set lineage file mark
  24. LINEAGE_FILE_NAME_MARK = "_lineage"
  25. def package_dataset_graph(graph):
  26. """
  27. Package dataset graph.
  28. Args:
  29. graph (dict): Dataset graph.
  30. Returns:
  31. LineageEvent, the proto message event contains dataset graph.
  32. """
  33. dataset_graph_event = LineageEvent()
  34. dataset_graph_event.wall_time = time.time()
  35. dataset_graph = dataset_graph_event.dataset_graph
  36. if "children" in graph:
  37. children = graph.pop("children")
  38. if children:
  39. _package_children(children=children, message=dataset_graph)
  40. _package_current_dataset(operation=graph, message=dataset_graph)
  41. return dataset_graph_event
  42. def _package_children(children, message):
  43. """
  44. Package children in dataset operation.
  45. Args:
  46. children (list[dict]): Child operations.
  47. message (DatasetGraph): Children proto message.
  48. """
  49. for child in children:
  50. if child:
  51. child_graph_message = getattr(message, "children").add()
  52. grandson = child.pop("children")
  53. if grandson:
  54. _package_children(children=grandson, message=child_graph_message)
  55. # package other parameters
  56. _package_current_dataset(operation=child, message=child_graph_message)
  57. def _package_current_dataset(operation, message):
  58. """
  59. Package operation parameters in event message.
  60. Args:
  61. operation (dict): Operation dict.
  62. message (Operation): Operation proto message.
  63. """
  64. for key, value in operation.items():
  65. if value and key == "operations":
  66. for operator in value:
  67. _package_enhancement_operation(
  68. operator,
  69. message.operations.add()
  70. )
  71. elif value and key == "sampler":
  72. _package_enhancement_operation(
  73. value,
  74. message.sampler
  75. )
  76. else:
  77. _package_parameter(key, value, message.parameter)
  78. def _package_enhancement_operation(operation, message):
  79. """
  80. Package enhancement operation in MapDataset.
  81. Args:
  82. operation (dict): Enhancement operation.
  83. message (Operation): Enhancement operation proto message.
  84. """
  85. for key, value in operation.items():
  86. if isinstance(value, list):
  87. if all(isinstance(ele, int) for ele in value):
  88. message.size.extend(value)
  89. else:
  90. message.weights.extend(value)
  91. else:
  92. _package_parameter(key, value, message.operationParam)
  93. def _package_parameter(key, value, message):
  94. """
  95. Package parameters in operation.
  96. Args:
  97. key (str): Operation name.
  98. value (Union[str, bool, int, float, list, None]): Operation args.
  99. message (OperationParameter): Operation proto message.
  100. """
  101. if isinstance(value, str):
  102. message.mapStr[key] = value
  103. elif isinstance(value, bool):
  104. message.mapBool[key] = value
  105. elif isinstance(value, int):
  106. message.mapInt[key] = value
  107. elif isinstance(value, float):
  108. message.mapDouble[key] = value
  109. elif isinstance(value, list) and key != "operations":
  110. if value:
  111. replace_value_list = list(map(lambda x: "" if x is None else x, value))
  112. message.mapStrList[key].strValue.extend(replace_value_list)
  113. elif value is None:
  114. message.mapStr[key] = "None"
  115. else:
  116. error_msg = "Parameter {} is not supported " \
  117. "in event package.".format(key)
  118. log.error(error_msg)
  119. raise LineageParamTypeError(error_msg)
  120. def package_user_defined_info(user_dict):
  121. """
  122. Package user defined info.
  123. Args:
  124. user_dict(dict): User defined info dict.
  125. Returns:
  126. LineageEvent, the proto message event contains user defined info.
  127. """
  128. user_event = LineageEvent()
  129. user_event.wall_time = time.time()
  130. user_defined_info = user_event.user_defined_info
  131. _package_user_defined_info(user_dict, user_defined_info)
  132. return user_event
  133. def _package_user_defined_info(user_defined_dict, user_defined_message):
  134. """
  135. Setting attribute in user defined proto message.
  136. Args:
  137. user_defined_dict (dict): User define info dict.
  138. user_defined_message (LineageEvent): Proto message of user defined info.
  139. Raises:
  140. LineageParamValueError: When the value is out of range.
  141. LineageParamTypeError: When given a type not support yet.
  142. """
  143. for key, value in user_defined_dict.items():
  144. if not isinstance(key, str):
  145. error_msg = f"Invalid key type in user defined info. The {key}'s type" \
  146. f"'{type(key).__name__}' is not supported. It should be str."
  147. log.error(error_msg)
  148. if isinstance(value, int):
  149. attr_name = "map_int32"
  150. elif isinstance(value, float):
  151. attr_name = "map_double"
  152. elif isinstance(value, str):
  153. attr_name = "map_str"
  154. else:
  155. attr_name = "attr_name"
  156. add_user_defined_info = user_defined_message.user_info.add()
  157. try:
  158. getattr(add_user_defined_info, attr_name)[key] = value
  159. except AttributeError:
  160. error_msg = f"Invalid value type in user defined info. The {value}'s type" \
  161. f"'{type(value).__name__}' is not supported. It should be float, int or str."
  162. log.error(error_msg)
  163. def get_lineage_file_name():
  164. """
  165. Get lineage file name.
  166. Lineage filename format is:
  167. EVENT_FILE_NAME_MARK + "summary." + time(seconds) + "." + Hostname + lineage_suffix.
  168. Returns:
  169. str, the name of event log file.
  170. """
  171. time_second = str(int(time.time()))
  172. hostname = socket.gethostname()
  173. file_name = f'{EVENT_FILE_NAME_MARK}summary.{time_second}.{hostname}{LINEAGE_FILE_NAME_MARK}'
  174. return file_name