You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-'), and can be up to 35 characters long.

lineage_parser.py 9.4 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """This file is used to parse lineage info."""
  16. import os
  17. from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  18. from mindinsight.lineagemgr.common.exceptions.exceptions import LineageSummaryAnalyzeException, \
  19. LineageEventNotExistException, LineageEventFieldNotExistException, LineageFileNotFoundError, \
  20. MindInsightException
  21. from mindinsight.lineagemgr.common.log import logger
  22. from mindinsight.lineagemgr.common.path_parser import SummaryPathParser
  23. from mindinsight.lineagemgr.summary.file_handler import FileHandler
  24. from mindinsight.lineagemgr.summary.lineage_summary_analyzer import LineageSummaryAnalyzer
  25. from mindinsight.lineagemgr.querier.query_model import LineageObj
  26. from mindinsight.utils.exceptions import ParamValueError
  27. LINEAGE = "lineage"
  28. class SuperLineageObj:
  29. """This is an object for LineageObj and its additional info."""
  30. def __init__(self, lineage_obj: LineageObj, update_time, added_info=None):
  31. self._lineage_obj = lineage_obj
  32. self._update_time = update_time
  33. self._added_info = added_info if added_info is not None else dict()
  34. @property
  35. def lineage_obj(self):
  36. """Get lineage object."""
  37. return self._lineage_obj
  38. @property
  39. def added_info(self):
  40. """Get added info."""
  41. return self._added_info
  42. @added_info.setter
  43. def added_info(self, added_info):
  44. """Set added info."""
  45. self._added_info = added_info
  46. @property
  47. def update_time(self):
  48. """Get update time."""
  49. return self._update_time
  50. @update_time.setter
  51. def update_time(self, update_time):
  52. """Set update_time."""
  53. self._update_time = update_time
  54. class LineageParser:
  55. """Lineage parser."""
  56. def __init__(self, train_id, summary_dir, update_time=None, added_info=None):
  57. self._summary_dir = summary_dir
  58. self._train_id = train_id
  59. self._update_time = update_time
  60. self._added_info = added_info
  61. self._init_variables()
  62. self.load()
  63. @property
  64. def update_time(self):
  65. return self._update_time
  66. @update_time.setter
  67. def update_time(self, update_time):
  68. self._update_time = update_time
  69. if self._super_lineage_obj is not None:
  70. self._super_lineage_obj.update_time = update_time
  71. def _init_variables(self):
  72. """Init variables."""
  73. self._super_lineage_obj = None
  74. self._latest_filename = None
  75. self._latest_file_size = None
  76. self._cached_file_list = None
  77. def load(self):
  78. """Find and load summaries."""
  79. # get sorted lineage files
  80. lineage_files = SummaryPathParser.get_lineage_summaries(self._summary_dir, is_sorted=True)
  81. if not lineage_files:
  82. logger.info('There is no summary log file under summary_dir %s.', self._summary_dir)
  83. raise LineageFileNotFoundError(
  84. 'There is no summary log file under summary_dir.'
  85. )
  86. self._init_if_files_deleted(lineage_files)
  87. index = 0
  88. if self._latest_filename is not None:
  89. index = lineage_files.index(self._latest_filename)
  90. for filename in lineage_files[index:]:
  91. if filename != self._latest_filename:
  92. self._latest_filename = filename
  93. self._latest_file_size = 0
  94. file_path = os.path.join(self._summary_dir, filename)
  95. new_size = FileHandler(file_path).size
  96. if new_size == self._latest_file_size:
  97. continue
  98. self._latest_file_size = new_size
  99. try:
  100. self._parse_summary_log()
  101. except (LineageSummaryAnalyzeException,
  102. LineageEventNotExistException,
  103. LineageEventFieldNotExistException) as error:
  104. logger.error("Parse file failed, file_path is %s. Detail: %s", file_path, str(error))
  105. except MindInsightException as error:
  106. logger.exception(error)
  107. logger.error("Parse file failed, file_path is %s.", file_path)
  108. def _init_if_files_deleted(self, file_list):
  109. """Init variables if files deleted."""
  110. cached_file_list = self._cached_file_list
  111. self._cached_file_list = file_list
  112. if cached_file_list is None:
  113. return
  114. deleted_files = set(cached_file_list) - set(file_list)
  115. if deleted_files:
  116. logger.info("There are some files has been deleted, "
  117. "all files will be reloaded in path %s.", self._summary_dir)
  118. self._init_variables()
  119. def _parse_summary_log(self):
  120. """
  121. Parse the single summary log.
  122. Returns:
  123. bool, `True` if parse summary log success, else `False`.
  124. """
  125. file_path = os.path.realpath(os.path.join(self._summary_dir, self._latest_filename))
  126. lineage_info = LineageSummaryAnalyzer.get_summary_infos(file_path)
  127. user_defined_info = LineageSummaryAnalyzer.get_user_defined_info(file_path)
  128. self._update_lineage_obj(lineage_info, user_defined_info)
  129. def _update_lineage_obj(self, lineage_info, user_defined_info):
  130. """Update lineage object."""
  131. if self._super_lineage_obj is None:
  132. lineage_obj = LineageObj(
  133. self._train_id,
  134. train_lineage=lineage_info.train_lineage,
  135. evaluation_lineage=lineage_info.eval_lineage,
  136. dataset_graph=lineage_info.dataset_graph,
  137. user_defined_info=user_defined_info
  138. )
  139. self._super_lineage_obj = SuperLineageObj(lineage_obj, self.update_time, self._added_info)
  140. else:
  141. self._super_lineage_obj.lineage_obj.parse_and_update_lineage(
  142. train_lineage=lineage_info.train_lineage,
  143. evaluation_lineage=lineage_info.eval_lineage,
  144. dataset_graph=lineage_info.dataset_graph,
  145. user_defined_info=user_defined_info
  146. )
  147. @property
  148. def super_lineage_obj(self):
  149. """Get super lineage object."""
  150. return self._super_lineage_obj
  151. class LineageOrganizer:
  152. """Lineage organizer."""
  153. def __init__(self, data_manager=None, summary_base_dir=None):
  154. self._data_manager = data_manager
  155. self._summary_base_dir = summary_base_dir
  156. self._check_params()
  157. self._super_lineage_objs = {}
  158. self._organize_from_cache()
  159. self._organize_from_disk()
  160. def _check_params(self):
  161. """Check params."""
  162. if self._data_manager is not None and self._summary_base_dir is not None:
  163. self._summary_base_dir = None
  164. def _organize_from_disk(self):
  165. """Organize lineage objs from disk."""
  166. if self._summary_base_dir is None:
  167. return
  168. summary_watcher = SummaryWatcher()
  169. relative_dirs = summary_watcher.list_summary_directories(
  170. summary_base_dir=self._summary_base_dir
  171. )
  172. no_lineage_count = 0
  173. for item in relative_dirs:
  174. relative_dir = item.get('relative_path')
  175. update_time = item.get('update_time')
  176. abs_summary_dir = os.path.realpath(os.path.join(self._summary_base_dir, relative_dir))
  177. try:
  178. lineage_parser = LineageParser(relative_dir, abs_summary_dir, update_time)
  179. super_lineage_obj = lineage_parser.super_lineage_obj
  180. if super_lineage_obj is not None:
  181. self._super_lineage_objs.update({abs_summary_dir: super_lineage_obj})
  182. except LineageFileNotFoundError:
  183. no_lineage_count += 1
  184. if no_lineage_count == len(relative_dirs):
  185. logger.info('There is no summary log file under summary_base_dir.')
  186. def _organize_from_cache(self):
  187. """Organize lineage objs from cache."""
  188. if self._data_manager is None:
  189. return
  190. brief_cache = self._data_manager.get_brief_cache()
  191. cache_items = brief_cache.cache_items
  192. for relative_dir, cache_train_job in cache_items.items():
  193. try:
  194. super_lineage_obj = cache_train_job.get("lineage").super_lineage_obj
  195. if super_lineage_obj is not None:
  196. self._super_lineage_objs.update({relative_dir: super_lineage_obj})
  197. except ParamValueError:
  198. logger.debug("This is no lineage info in train job %s.", relative_dir)
  199. @property
  200. def super_lineage_objs(self):
  201. """Get super lineage objects."""
  202. return self._super_lineage_objs
  203. def get_super_lineage_obj(self, relative_path):
  204. """Get super lineage object by given relative path."""
  205. return self._super_lineage_objs.get(relative_path)