You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

lineage_parser.py 9.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """This file is used to parse lineage info."""
  16. import os
  17. from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  18. from mindinsight.lineagemgr.common.exceptions.exceptions import LineageSummaryAnalyzeException, \
  19. LineageEventNotExistException, LineageEventFieldNotExistException, LineageFileNotFoundError, \
  20. MindInsightException
  21. from mindinsight.lineagemgr.common.log import logger
  22. from mindinsight.lineagemgr.common.path_parser import SummaryPathParser
  23. from mindinsight.lineagemgr.summary.file_handler import FileHandler
  24. from mindinsight.lineagemgr.summary.lineage_summary_analyzer import LineageSummaryAnalyzer
  25. from mindinsight.lineagemgr.querier.query_model import LineageObj
  26. from mindinsight.utils.exceptions import ParamValueError
# Cache key of a train job's lineage item; equals the key this module reads
# from the data manager's brief cache ("lineage").
LINEAGE = "lineage"
  28. class SuperLineageObj:
  29. """This is an object for LineageObj and its additional info."""
  30. def __init__(self, lineage_obj: LineageObj, update_time, added_info=None):
  31. self._lineage_obj = lineage_obj
  32. self._update_time = update_time
  33. self._added_info = added_info if added_info is not None else dict()
  34. @property
  35. def lineage_obj(self):
  36. """Get lineage object."""
  37. return self._lineage_obj
  38. @property
  39. def added_info(self):
  40. """Get added info."""
  41. return self._added_info
  42. @added_info.setter
  43. def added_info(self, added_info):
  44. """Set added info."""
  45. self._added_info = added_info
  46. @property
  47. def update_time(self):
  48. """Get update time."""
  49. return self._update_time
  50. class LineageParser:
  51. """Lineage parser."""
  52. def __init__(self, summary_dir, update_time=None, added_info=None):
  53. self._summary_dir = summary_dir
  54. self.update_time = update_time
  55. self._added_info = added_info
  56. self._init_variables()
  57. self.load()
  58. def _init_variables(self):
  59. """Init variables."""
  60. self._super_lineage_obj = None
  61. self._latest_filename = None
  62. self._latest_file_size = None
  63. self._cached_file_list = None
  64. def load(self):
  65. """Find and load summaries."""
  66. # get sorted lineage files
  67. lineage_files = SummaryPathParser.get_lineage_summaries(self._summary_dir, is_sorted=True)
  68. if not lineage_files:
  69. logger.info('There is no summary log file under summary_dir %s.', self._summary_dir)
  70. raise LineageFileNotFoundError(
  71. 'There is no summary log file under summary_dir.'
  72. )
  73. self._init_if_files_deleted(lineage_files)
  74. index = 0
  75. if self._latest_filename is not None:
  76. index = lineage_files.index(self._latest_filename)
  77. for filename in lineage_files[index:]:
  78. if filename != self._latest_filename:
  79. self._latest_filename = filename
  80. self._latest_file_size = 0
  81. file_path = os.path.join(self._summary_dir, filename)
  82. new_size = FileHandler(file_path).size
  83. if new_size == self._latest_file_size:
  84. continue
  85. self._latest_file_size = new_size
  86. self._parse_summary_log()
  87. def _init_if_files_deleted(self, file_list):
  88. """Init variables if files deleted."""
  89. cached_file_list = self._cached_file_list
  90. self._cached_file_list = file_list
  91. if cached_file_list is None:
  92. return
  93. deleted_files = set(cached_file_list) - set(file_list)
  94. if deleted_files:
  95. logger.info("There are some files has been deleted, "
  96. "all files will be reloaded in path %s.", self._summary_dir)
  97. self._init_variables()
  98. def _parse_summary_log(self):
  99. """
  100. Parse the single summary log.
  101. Returns:
  102. bool, `True` if parse summary log success, else `False`.
  103. """
  104. file_path = os.path.realpath(os.path.join(self._summary_dir, self._latest_filename))
  105. lineage_info = LineageSummaryAnalyzer.get_summary_infos(file_path)
  106. user_defined_info = LineageSummaryAnalyzer.get_user_defined_info(file_path)
  107. self._update_lineage_obj(lineage_info, user_defined_info)
  108. def _update_lineage_obj(self, lineage_info, user_defined_info):
  109. """Update lineage object."""
  110. if self._super_lineage_obj is None:
  111. lineage_obj = LineageObj(
  112. self._summary_dir,
  113. train_lineage=lineage_info.train_lineage,
  114. evaluation_lineage=lineage_info.eval_lineage,
  115. dataset_graph=lineage_info.dataset_graph,
  116. user_defined_info=user_defined_info
  117. )
  118. self._super_lineage_obj = SuperLineageObj(lineage_obj, self.update_time, self._added_info)
  119. else:
  120. self._super_lineage_obj.lineage_obj.parse_and_update_lineage(
  121. train_lineage=lineage_info.train_lineage,
  122. evaluation_lineage=lineage_info.eval_lineage,
  123. dataset_graph=lineage_info.dataset_graph,
  124. user_defined_info=user_defined_info
  125. )
  126. @property
  127. def super_lineage_obj(self):
  128. """Get super lineage object."""
  129. return self._super_lineage_obj
  130. class LineageOrganizer:
  131. """Lineage organizer."""
  132. def __init__(self, data_manager=None, summary_base_dir=None):
  133. self._data_manager = data_manager
  134. self._summary_base_dir = summary_base_dir
  135. self._check_params()
  136. self._super_lineage_objs = {}
  137. self._organize_from_cache()
  138. self._organize_from_disk()
  139. def _check_params(self):
  140. """Check params."""
  141. if self._data_manager is not None and self._summary_base_dir is not None:
  142. self._summary_base_dir = None
  143. def _organize_from_disk(self):
  144. """Organize lineage objs from disk."""
  145. if self._summary_base_dir is None:
  146. return
  147. summary_watcher = SummaryWatcher()
  148. relative_dirs = summary_watcher.list_summary_directories(
  149. summary_base_dir=self._summary_base_dir
  150. )
  151. no_lineage_count = 0
  152. for item in relative_dirs:
  153. relative_dir = item.get('relative_path')
  154. update_time = item.get('update_time')
  155. abs_summary_dir = os.path.realpath(os.path.join(self._summary_base_dir, relative_dir))
  156. try:
  157. lineage_parser = LineageParser(abs_summary_dir, update_time)
  158. super_lineage_obj = lineage_parser.super_lineage_obj
  159. if super_lineage_obj is not None:
  160. self._super_lineage_objs.update({abs_summary_dir: super_lineage_obj})
  161. except LineageFileNotFoundError:
  162. no_lineage_count += 1
  163. except (LineageSummaryAnalyzeException,
  164. LineageEventNotExistException,
  165. LineageEventFieldNotExistException) as error:
  166. logger.warning("Parse file failed under summary_dir %s. Detail: %s.", relative_dir, str(error))
  167. except MindInsightException as error:
  168. logger.exception(error)
  169. logger.warning("Parse file failed under summary_dir %s.", relative_dir)
  170. if no_lineage_count == len(relative_dirs):
  171. logger.info('There is no summary log file under summary_base_dir.')
  172. raise LineageFileNotFoundError(
  173. 'There is no summary log file under summary_base_dir.'
  174. )
  175. def _organize_from_cache(self):
  176. """Organize lineage objs from cache."""
  177. if self._data_manager is None:
  178. return
  179. brief_cache = self._data_manager.get_brief_cache()
  180. cache_items = brief_cache.cache_items
  181. for relative_dir, cache_train_job in cache_items.items():
  182. try:
  183. super_lineage_obj = cache_train_job.get("lineage").super_lineage_obj
  184. self._super_lineage_objs.update({relative_dir: super_lineage_obj})
  185. except ParamValueError:
  186. logger.debug("This is no lineage info in train job %s.", relative_dir)
  187. @property
  188. def super_lineage_objs(self):
  189. """Get super lineage objects."""
  190. return self._super_lineage_objs
  191. def get_super_lineage_obj(self, relative_path):
  192. """Get super lineage object by given relative path."""
  193. return self._super_lineage_objs.get(relative_path)