You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number; they can include dashes ('-') and can be up to 35 characters long.

explain_parser.py 10 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """
  16. File parser for MindExplain data.
  17. This module is used to parse the MindExplain log file.
  18. """
  19. from collections import namedtuple
  20. from mindinsight.datavisual.common import exceptions
  21. from mindinsight.datavisual.data_access.file_handler import FileHandler
  22. from mindinsight.datavisual.data_transform.ms_data_loader import _SummaryParser
  23. from mindinsight.datavisual.proto_files import mindinsight_summary_pb2 as summary_pb2
  24. from mindinsight.explainer.common.enums import ExplainFieldsEnum
  25. from mindinsight.explainer.common.log import logger
  26. from mindinsight.utils.exceptions import UnknownError
  27. HEADER_SIZE = 8
  28. CRC_STR_SIZE = 4
  29. MAX_EVENT_STRING = 500000000
  30. BenchmarkContainer = namedtuple('BenchmarkContainer', ['benchmark', 'status'])
  31. MetadataContainer = namedtuple('MetadataContainer', ['metadata', 'status'])
  32. InferfenceContainer = namedtuple('InferenceContainer', ['ground_truth_prob',
  33. 'ground_truth_prob_sd',
  34. 'ground_truth_prob_itl95_low',
  35. 'ground_truth_prob_itl95_hi',
  36. 'predicted_label',
  37. 'predicted_prob',
  38. 'predicted_prob_sd',
  39. 'predicted_prob_itl95_low',
  40. 'predicted_prob_itl95_hi'])
  41. SampleContainer = namedtuple('SampleContainer', ['sample_id', 'image_path', 'ground_truth_label', 'inference',
  42. 'explanation', 'status'])
  43. class ExplainParser(_SummaryParser):
  44. """The summary file parser."""
  45. def __init__(self, summary_dir):
  46. super(ExplainParser, self).__init__(summary_dir)
  47. self._latest_offset = 0
  48. def list_events(self, filenames):
  49. """
  50. Load summary file and parse file content.
  51. Args:
  52. filenames (list[str]): File name list.
  53. Returns:
  54. tuple, will return (file_changed, is_end, event_data),
  55. file_changed (bool): True if the 9latest file is changed.
  56. is_end (bool): True if all the summary files are finished loading.
  57. event_data (dict): return an event data, key is field.
  58. """
  59. summary_files = self.sort_files(filenames)
  60. is_end = False
  61. file_changed = False
  62. event_data = {}
  63. filename = summary_files[-1]
  64. file_path = FileHandler.join(self._summary_dir, filename)
  65. if filename != self._latest_filename:
  66. self._summary_file_handler = FileHandler(file_path, 'rb')
  67. self._latest_filename = filename
  68. self._latest_offset = 0
  69. file_changed = True
  70. new_size = FileHandler.file_stat(file_path).size
  71. if new_size == self._latest_offset:
  72. is_end = True
  73. return file_changed, is_end, event_data
  74. while True:
  75. start_offset = self._summary_file_handler.offset
  76. try:
  77. event_str = self.event_load(self._summary_file_handler)
  78. if event_str is None:
  79. self._summary_file_handler.reset_offset(start_offset)
  80. is_end = True
  81. return file_changed, is_end, event_data
  82. if len(event_str) > MAX_EVENT_STRING:
  83. logger.warning("file_path: %s, event string: %d exceeds %d and drop it.",
  84. self._summary_file_handler.file_path, len(event_str), MAX_EVENT_STRING)
  85. continue
  86. field_list, tensor_value_list = self._event_decode(event_str)
  87. for field, tensor_value in zip(field_list, tensor_value_list):
  88. event_data[field] = tensor_value
  89. logger.debug("Parse summary file offset %d, file path: %s.",
  90. self._summary_file_handler.offset, file_path)
  91. return file_changed, is_end, event_data
  92. except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as ex:
  93. self._summary_file_handler.reset_offset(start_offset)
  94. is_end = True
  95. logger.warning("Check crc failed and reset offset, file_path=%s, offset=%s. Detail: %r.",
  96. self._summary_file_handler.file_path, self._summary_file_handler.offset, str(ex))
  97. return file_changed, is_end, event_data
  98. except Exception as ex:
  99. # Note: If an unknown error occurs, we will set the offset to the end of this file,
  100. # which is equivalent to stopping parsing this file. We do not delete the current job
  101. # and retain the data that has been successfully parsed.
  102. self._summary_file_handler.reset_offset(new_size)
  103. # Notice: If the current job is the latest one in the loader pool and the job is deleted,
  104. # the job goes into an infinite cycle of load-fail-delete-reload-load-fail-delete.
  105. # We need to prevent this infinite loop.
  106. logger.error("Parse summary file failed, will set offset to the file end. file_path: %s, "
  107. "offset: %d, detail: %s.", file_path, self._summary_file_handler.offset, str(ex))
  108. logger.exception(ex)
  109. raise UnknownError(str(ex))
  110. finally:
  111. self._latest_offset = self._summary_file_handler.offset
  112. @staticmethod
  113. def _event_decode(event_str):
  114. """
  115. Transform `Event` data to tensor_event and update it to EventsData.
  116. Args:
  117. event_str (str): Message event string in summary proto, data read from file handler.
  118. """
  119. logger.debug("Start to parse event string. Event string len: %s.", len(event_str))
  120. event = summary_pb2.Event.FromString(event_str)
  121. logger.debug("Deserialize event string completed.")
  122. fields = {
  123. 'sample_id': ExplainFieldsEnum.SAMPLE_ID,
  124. 'benchmark': ExplainFieldsEnum.BENCHMARK,
  125. 'metadata': ExplainFieldsEnum.METADATA
  126. }
  127. tensor_event_value = getattr(event, 'explain')
  128. field_list = []
  129. tensor_value_list = []
  130. for field in fields:
  131. if getattr(tensor_event_value, field, None) is None:
  132. continue
  133. if ExplainFieldsEnum.METADATA.value == field and not tensor_event_value.metadata.label:
  134. continue
  135. tensor_value = None
  136. if field == ExplainFieldsEnum.SAMPLE_ID.value:
  137. tensor_value = ExplainParser._add_image_data(tensor_event_value)
  138. elif field == ExplainFieldsEnum.BENCHMARK.value:
  139. tensor_value = ExplainParser._add_benchmark(tensor_event_value)
  140. elif field == ExplainFieldsEnum.METADATA.value:
  141. tensor_value = ExplainParser._add_metadata(tensor_event_value)
  142. logger.debug("Event generated, label is %s, step is %s.", field, event.step)
  143. field_list.append(field)
  144. tensor_value_list.append(tensor_value)
  145. return field_list, tensor_value_list
  146. @staticmethod
  147. def _add_image_data(tensor_event_value):
  148. """
  149. Parse image data based on sample_id in Explain message
  150. Args:
  151. tensor_event_value: the object of Explain message
  152. """
  153. inference = InferfenceContainer(
  154. ground_truth_prob=tensor_event_value.inference.ground_truth_prob,
  155. ground_truth_prob_sd=tensor_event_value.inference.ground_truth_prob_sd,
  156. ground_truth_prob_itl95_low=tensor_event_value.inference.ground_truth_prob_itl95_low,
  157. ground_truth_prob_itl95_hi=tensor_event_value.inference.ground_truth_prob_itl95_hi,
  158. predicted_label=tensor_event_value.inference.predicted_label,
  159. predicted_prob=tensor_event_value.inference.predicted_prob,
  160. predicted_prob_sd=tensor_event_value.inference.predicted_prob_sd,
  161. predicted_prob_itl95_low=tensor_event_value.inference.predicted_prob_itl95_low,
  162. predicted_prob_itl95_hi=tensor_event_value.inference.predicted_prob_itl95_hi
  163. )
  164. sample_data = SampleContainer(
  165. sample_id=tensor_event_value.sample_id,
  166. image_path=tensor_event_value.image_path,
  167. ground_truth_label=tensor_event_value.ground_truth_label,
  168. inference=inference,
  169. explanation=tensor_event_value.explanation,
  170. status=tensor_event_value.status
  171. )
  172. return sample_data
  173. @staticmethod
  174. def _add_benchmark(tensor_event_value):
  175. """
  176. Parse benchmark data from Explain message.
  177. Args:
  178. tensor_event_value: the object of Explain message
  179. Returns:
  180. benchmark_data: An object containing benchmark.
  181. """
  182. benchmark_data = BenchmarkContainer(
  183. benchmark=tensor_event_value.benchmark,
  184. status=tensor_event_value.status
  185. )
  186. return benchmark_data
  187. @staticmethod
  188. def _add_metadata(tensor_event_value):
  189. """
  190. Parse metadata from Explain message.
  191. Args:
  192. tensor_event_value: the object of Explain message
  193. Returns:
  194. benchmark_data: An object containing metadata.
  195. """
  196. metadata_value = MetadataContainer(
  197. metadata=tensor_event_value.metadata,
  198. status=tensor_event_value.status
  199. )
  200. return metadata_value