You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

lineage_parser.py 9.1 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """This file is used to parse lineage info."""
  16. import os
  17. from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  18. from mindinsight.lineagemgr.common.exceptions.exceptions import LineageSummaryAnalyzeException, \
  19. LineageEventNotExistException, LineageEventFieldNotExistException, LineageFileNotFoundError, \
  20. MindInsightException
  21. from mindinsight.lineagemgr.common.log import logger
  22. from mindinsight.lineagemgr.common.path_parser import SummaryPathParser
  23. from mindinsight.lineagemgr.summary.file_handler import FileHandler
  24. from mindinsight.lineagemgr.summary.lineage_summary_analyzer import LineageSummaryAnalyzer
  25. from mindinsight.lineagemgr.querier.query_model import LineageObj
  26. from mindinsight.utils.exceptions import ParamValueError
  # Module-level name constant "lineage"; not referenced in this file's visible
  # code (presumably imported by other modules — verify against callers).
  LINEAGE = 'lineage'
  class SuperLineageObj:
      """Bundle a `LineageObj` with its update time and any extra info."""

      def __init__(self, lineage_obj: "LineageObj", update_time, added_info=None):
          """
          Args:
              lineage_obj (LineageObj): The wrapped lineage object.
              update_time: Update time stored alongside the lineage object.
              added_info (dict): Extra info to attach. When None, a fresh empty
                  dict is created for this instance. Default: None.
          """
          if added_info is None:
              # A new dict per instance, so instances never share state.
              added_info = {}
          self._lineage_obj = lineage_obj
          self._update_time = update_time
          self._added_info = added_info

      @property
      def lineage_obj(self):
          """The wrapped lineage object."""
          return self._lineage_obj

      @property
      def added_info(self):
          """The extra info attached to this object."""
          return self._added_info

      @added_info.setter
      def added_info(self, added_info):
          """Replace the extra info attached to this object."""
          self._added_info = added_info

      @property
      def update_time(self):
          """The update time given at construction."""
          return self._update_time
  class LineageParser:
      """Parse the lineage summary files of a single summary directory.

      Parsing is incremental. `load` remembers the last file it parsed and that
      file's size, resumes from that file on the next call, and re-parses a file
      only when it is new or its size changed. If any previously seen file has
      disappeared, all cached state is dropped and every file is parsed again.
      """

      def __init__(self, summary_dir, update_time=None, added_info=None):
          """
          Args:
              summary_dir (str): Directory containing the lineage summary files.
              update_time: Update time handed to the created `SuperLineageObj`.
                  Default: None.
              added_info (dict): Extra info handed to the created
                  `SuperLineageObj`. Default: None.

          Raises:
              LineageFileNotFoundError: If no lineage summary file exists under
                  `summary_dir`.
          """
          self._summary_dir = summary_dir
          self.update_time = update_time
          self._added_info = added_info
          self._init_variables()
          self.load()

      def _init_variables(self):
          """Reset all parsing state to its initial (nothing parsed) values."""
          self._super_lineage_obj = None
          self._latest_filename = None
          self._latest_file_size = None
          self._cached_file_list = None

      def load(self):
          """
          Find the lineage summary files and parse the new or changed ones.

          Files are visited in sorted order, starting from the last file parsed
          by a previous call; files before it are not re-checked. A file is
          parsed only when it is new or its size differs from the recorded size.

          Raises:
              LineageFileNotFoundError: If there is no lineage summary file under
                  the summary directory.
          """
          lineage_files = SummaryPathParser.get_lineage_summaries(self._summary_dir, is_sorted=True)
          if not lineage_files:
              logger.info('There is no summary log file under summary_dir %s.', self._summary_dir)
              raise LineageFileNotFoundError(
                  'There is no summary log file under summary_dir.'
              )

          # Drops all state (including _latest_filename) if a known file vanished,
          # so the index() lookup below only ever sees a file that still exists.
          self._init_if_files_deleted(lineage_files)

          index = 0
          if self._latest_filename is not None:
              index = lineage_files.index(self._latest_filename)

          for filename in lineage_files[index:]:
              if filename != self._latest_filename:
                  # A file not parsed before: start its size tracking from zero.
                  self._latest_filename = filename
                  self._latest_file_size = 0

              file_path = os.path.join(self._summary_dir, filename)
              new_size = FileHandler(file_path).size
              if new_size == self._latest_file_size:
                  # Unchanged since the last parse.
                  continue

              self._latest_file_size = new_size
              self._parse_summary_log()

      def _init_if_files_deleted(self, file_list):
          """
          Reset all cached state if any previously seen file has been deleted.

          Args:
              file_list (list[str]): The lineage summary files currently present
                  in the summary directory.
          """
          cached_file_list = self._cached_file_list
          if cached_file_list is not None:
              deleted_files = set(cached_file_list) - set(file_list)
              if deleted_files:
                  logger.info("There are some files has been deleted, "
                              "all files will be reloaded in path %s.", self._summary_dir)
                  self._init_variables()
          # Record the current list only AFTER a possible reset: _init_variables()
          # clears _cached_file_list. Leaving it None would make the next call
          # skip deletion detection, and load() could then call list.index() on a
          # deleted latest file and raise ValueError.
          self._cached_file_list = file_list

      def _parse_summary_log(self):
          """
          Parse the latest summary log file and merge its lineage info.

          Parse failures are logged and not re-raised.

          Returns:
              bool, `True` if parse summary log success, else `False`.
          """
          file_path = os.path.realpath(os.path.join(self._summary_dir, self._latest_filename))
          try:
              lineage_info = LineageSummaryAnalyzer.get_summary_infos(file_path)
              user_defined_info = LineageSummaryAnalyzer.get_user_defined_info(file_path)
              self._update_lineage_obj(lineage_info, user_defined_info)
          except LineageSummaryAnalyzeException:
              logger.warning("Parse file failed under summary_dir %s.", file_path)
          except (LineageEventNotExistException, LineageEventFieldNotExistException) as error:
              logger.warning("Parse file failed under summary_dir %s. Detail: %s.", file_path, str(error))
          except MindInsightException as error:
              logger.exception(error)
              logger.warning("Parse file failed under summary_dir %s.", file_path)
          else:
              return True
          return False

      def _update_lineage_obj(self, lineage_info, user_defined_info):
          """
          Create the super lineage object on first use, otherwise update it.

          Args:
              lineage_info: Parsed summary info exposing `train_lineage`,
                  `eval_lineage` and `dataset_graph`.
              user_defined_info: User-defined info read from the same file.
          """
          if self._super_lineage_obj is None:
              lineage_obj = LineageObj(
                  self._summary_dir,
                  train_lineage=lineage_info.train_lineage,
                  evaluation_lineage=lineage_info.eval_lineage,
                  dataset_graph=lineage_info.dataset_graph,
                  user_defined_info=user_defined_info
              )
              self._super_lineage_obj = SuperLineageObj(lineage_obj, self.update_time, self._added_info)
          else:
              self._super_lineage_obj.lineage_obj.parse_and_update_lineage(
                  train_lineage=lineage_info.train_lineage,
                  evaluation_lineage=lineage_info.eval_lineage,
                  dataset_graph=lineage_info.dataset_graph,
                  user_defined_info=user_defined_info
              )

      @property
      def super_lineage_obj(self):
          """The parsed `SuperLineageObj`, or None if nothing parsed successfully."""
          return self._super_lineage_obj
  class LineageOrganizer:
      """Collect `SuperLineageObj` instances from a data manager cache or from disk.

      Exactly one source is used: when a data manager is given, the summary base
      directory is ignored.
      """

      def __init__(self, data_manager=None, summary_base_dir=None):
          """
          Args:
              data_manager: Object providing `get_brief_cache()`; when given,
                  lineage objects are read from its cache. Default: None.
              summary_base_dir (str): Base directory whose summary directories
                  are parsed from disk. Ignored if `data_manager` is also given.
                  Default: None.

          Raises:
              LineageFileNotFoundError: If the disk source is used and no
                  summary directory under it yields a lineage summary file
                  (including when there are no summary directories at all).
          """
          self._data_manager = data_manager
          self._summary_base_dir = summary_base_dir
          self._check_params()
          self._super_lineage_objs = {}
          self._organize_from_cache()
          self._organize_from_disk()

      def _check_params(self):
          """Prefer the data manager: drop the base dir when both are given."""
          if self._data_manager is not None and self._summary_base_dir is not None:
              self._summary_base_dir = None

      def _organize_from_disk(self):
          """Parse each summary directory under the base dir, keyed by absolute path."""
          if self._summary_base_dir is None:
              return
          summary_watcher = SummaryWatcher()
          relative_dirs = summary_watcher.list_summary_directories(
              summary_base_dir=self._summary_base_dir
          )
          no_lineage_count = 0
          for item in relative_dirs:
              relative_dir = item.get('relative_path')
              update_time = item.get('update_time')
              abs_summary_dir = os.path.realpath(os.path.join(self._summary_base_dir, relative_dir))
              try:
                  lineage_parser = LineageParser(abs_summary_dir, update_time)
              except LineageFileNotFoundError:
                  no_lineage_count += 1
                  continue
              super_lineage_obj = lineage_parser.super_lineage_obj
              # None means no file in the directory parsed successfully.
              if super_lineage_obj is not None:
                  self._super_lineage_objs[abs_summary_dir] = super_lineage_obj
          if no_lineage_count == len(relative_dirs):
              logger.info('There is no summary log file under summary_base_dir.')
              raise LineageFileNotFoundError(
                  'There is no summary log file under summary_base_dir.'
              )

      def _organize_from_cache(self):
          """Take lineage objects from the data manager's brief cache, keyed by relative dir."""
          if self._data_manager is None:
              return
          brief_cache = self._data_manager.get_brief_cache()
          cache_items = brief_cache.cache_items
          for relative_dir, cache_train_job in cache_items.items():
              try:
                  super_lineage_obj = cache_train_job.get("lineage").super_lineage_obj
              except ParamValueError:
                  logger.info("This is no lineage info in train job %s.", relative_dir)
                  continue
              # Skip jobs whose lineage could not be parsed, as _organize_from_disk
              # does, so every stored value is a real SuperLineageObj.
              if super_lineage_obj is not None:
                  self._super_lineage_objs[relative_dir] = super_lineage_obj

      @property
      def super_lineage_objs(self):
          """Mapping from summary directory to its `SuperLineageObj`."""
          return self._super_lineage_objs

      def get_super_lineage_obj(self, relative_path):
          """Return the `SuperLineageObj` stored under `relative_path`, or None."""
          return self._super_lineage_objs.get(relative_path)