You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

lineage_summary_analyzer.py 8.4 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """This module provides python APIs to get lineage summary from summary log."""
  16. import struct
  17. from collections import namedtuple
  18. from enum import Enum
  19. from google.protobuf.json_format import MessageToDict
  20. from google.protobuf.message import DecodeError
  21. from mindinsight.datavisual.proto_files.mindinsight_lineage_pb2 import LineageEvent
  22. from mindinsight.datavisual.utils import crc32
  23. from mindinsight.lineagemgr.common.exceptions.exceptions import MindInsightException, \
  24. LineageVerificationException, LineageSummaryAnalyzeException
  25. from mindinsight.lineagemgr.common.log import logger as log
  26. from mindinsight.lineagemgr.common.validator.validate_path import safe_normalize_path
  27. from mindinsight.lineagemgr.summary.file_handler import FileHandler
  28. LineageInfo = namedtuple('LineageInfo', ['train_lineage', 'eval_lineage', 'dataset_graph'])
  29. class SummaryTag(Enum):
  30. """The tag value of lineage fields."""
  31. # the value is `field_number << 3 | wire_type`
  32. WALL_TIME = 'wall_time'
  33. STEP = 'step'
  34. VERSION = 'version'
  35. GRAPH = 'graph'
  36. SUMMARY = 'summary'
  37. TRAIN_LINEAGE = 'train_lineage'
  38. EVAL_LINEAGE = 'evaluation_lineage'
  39. DATASET_GRAPH = 'dataset_graph'
  40. class SummaryAnalyzer:
  41. """
  42. Summary log Analyzer.
  43. Args:
  44. file_path (str): The path of summary log.
  45. Raises:
  46. LineageVerificationException: Raise when verification failed.
  47. """
  48. HEADER_SIZE = 8
  49. HEADER_CRC_SIZE = 4
  50. BODY_CRC_SIZE = 4
  51. def __init__(self, file_path):
  52. self.file_handler = FileHandler(file_path)
  53. def load_events(self):
  54. """
  55. Load events in summary log.
  56. Returns:
  57. generator, the event generator.
  58. """
  59. while self._has_next():
  60. yield self._read_event()
  61. def _has_next(self):
  62. """
  63. Check if the file has reached the end.
  64. Returns:
  65. bool, whether the file has reached the end.
  66. """
  67. current_offset = self.file_handler.tell()
  68. if current_offset < self.file_handler.size:
  69. return True
  70. return False
  71. def _read_event(self):
  72. """
  73. Read event.
  74. Returns:
  75. LineageEvent, the event body.
  76. """
  77. body_size = self._read_header()
  78. body_str = self._read_body(body_size)
  79. event = LineageEvent().FromString(body_str)
  80. return event
  81. def _read_header(self):
  82. """
  83. Read header information.
  84. Returns:
  85. int, the length of event body.
  86. """
  87. header_str = self.file_handler.read(self.HEADER_SIZE)
  88. header_crc_str = self.file_handler.read(self.HEADER_CRC_SIZE)
  89. SummaryAnalyzer._check_crc(header_str, header_crc_str)
  90. body_len = struct.unpack("<Q", header_str)[0]
  91. return body_len
  92. def _read_body(self, body_size):
  93. """
  94. Read event body information.
  95. Args:
  96. body_size (int): The size of event body.
  97. Returns:
  98. bytes, the event body in bytes.
  99. """
  100. body_str = self.file_handler.read(body_size)
  101. body_crc_str = self.file_handler.read(self.BODY_CRC_SIZE)
  102. SummaryAnalyzer._check_crc(body_str, body_crc_str)
  103. return body_str
  104. @staticmethod
  105. def _check_crc(source_str, crc_str):
  106. """
  107. Check the integrity of source string.
  108. Args:
  109. source_str (bytes): Source string in bytes.
  110. crc_str (bytes): CRC string of source string in bytes.
  111. Raises:
  112. LineageVerificationException: Raise when verification failed.
  113. """
  114. if not crc32.CheckValueAgainstData(crc_str, source_str, len(source_str)):
  115. log.debug("The CRC verification not pass. source_str: %s. crc_str: %s.", source_str, crc_str)
  116. raise LineageVerificationException("The CRC verification failed.")
  117. class LineageSummaryAnalyzer(SummaryAnalyzer):
  118. """
  119. Summary log analyzer for lineage information.
  120. Args:
  121. file_path (str): The path of summary log.
  122. Raises:
  123. LineageSummaryAnalyzeException: If failed to get lineage information.
  124. """
  125. def __init__(self, file_path):
  126. file_path = safe_normalize_path(file_path, 'lineage_summary_path', None)
  127. super(LineageSummaryAnalyzer, self).__init__(file_path)
  128. def get_latest_info(self):
  129. """
  130. Get latest lineage info in summary log file.
  131. Returns:
  132. LineageInfo, the lineage summary information.
  133. """
  134. lineage_events = {
  135. SummaryTag.TRAIN_LINEAGE: None,
  136. SummaryTag.EVAL_LINEAGE: None,
  137. SummaryTag.DATASET_GRAPH: None
  138. }
  139. for event in self.load_events():
  140. for tag, _ in lineage_events.items():
  141. if event.HasField(tag.value):
  142. lineage_events[tag] = event
  143. break
  144. lineage_info = LineageInfo(
  145. train_lineage=lineage_events.get(SummaryTag.TRAIN_LINEAGE),
  146. eval_lineage=lineage_events.get(SummaryTag.EVAL_LINEAGE),
  147. dataset_graph=lineage_events.get(SummaryTag.DATASET_GRAPH)
  148. )
  149. return lineage_info
  150. @classmethod
  151. def get_summary_infos(cls, file_path):
  152. """
  153. Get lineage summary information from summary log file.
  154. Args:
  155. file_path (str): The file path of summary log.
  156. Returns:
  157. LineageInfo, the lineage summary information.
  158. Raises:
  159. LineageSummaryAnalyzeException: If failed to get lineage information.
  160. """
  161. analyzer = cls(file_path)
  162. err_msg = "Can not analyze lineage info, file path is %s. Detail: %s"
  163. try:
  164. lineage_info = analyzer.get_latest_info()
  165. except (MindInsightException, IOError, DecodeError) as err:
  166. log.debug(err_msg, file_path, str(err))
  167. raise LineageSummaryAnalyzeException(str(err))
  168. except Exception as err:
  169. log.debug(err_msg, file_path, str(err))
  170. raise LineageSummaryAnalyzeException(str(err))
  171. return lineage_info
  172. @staticmethod
  173. def get_user_defined_info(file_path):
  174. """
  175. Get user defined info.
  176. Args:
  177. file_path (str): The file path of summary log.
  178. Returns:
  179. list, the list of dict format user defined information
  180. which converted from proto message.
  181. """
  182. all_user_message = []
  183. summary_analyzer = SummaryAnalyzer(file_path)
  184. for event in summary_analyzer.load_events():
  185. if event.HasField("user_defined_info"):
  186. user_defined_info = MessageToDict(
  187. event,
  188. preserving_proto_field_name=True
  189. ).get("user_defined_info")
  190. user_dict = LineageSummaryAnalyzer._get_dict_from_proto(user_defined_info)
  191. all_user_message.append(user_dict)
  192. return all_user_message
  193. @staticmethod
  194. def _get_dict_from_proto(user_defined_info):
  195. """
  196. Convert the proto message UserDefinedInfo to its dict format.
  197. Args:
  198. user_defined_info (UserDefinedInfo): The proto message of user defined info.
  199. Returns:
  200. dict, the converted dict.
  201. """
  202. user_dict = dict()
  203. proto_dict = user_defined_info.get("user_info")
  204. for proto_item in proto_dict:
  205. if proto_item and isinstance(proto_item, dict):
  206. key, value = list(list(proto_item.values())[0].items())[0]
  207. if isinstance(value, dict):
  208. user_dict[key] = LineageSummaryAnalyzer._get_dict_from_proto(value)
  209. else:
  210. user_dict[key] = value
  211. return user_dict