
explain_manager.py

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""ExplainManager."""
import os
import threading
import time

from mindinsight.datavisual.common import exceptions
from mindinsight.datavisual.common.enums import BaseEnum
from mindinsight.explainer.common.log import logger
from mindinsight.explainer.manager.explain_job import ExplainJob
from mindinsight.datavisual.data_access.file_handler import FileHandler
from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
from mindinsight.utils.exceptions import MindInsightException, ParamValueError, UnknownError

_MAX_LOADER_NUM = 3
_MAX_INTERVAL = 3

class _ExplainManagerStatus(BaseEnum):
    """Manager status."""
    INIT = 'INIT'
    LOADING = 'LOADING'
    DONE = 'DONE'
    INVALID = 'INVALID'


class ExplainManager:
    """ExplainManager."""

    def __init__(self, summary_base_dir: str):
        self._summary_base_dir = summary_base_dir
        self._loader_pool = {}
        self._deleted_ids = []
        self._status = _ExplainManagerStatus.INIT.value
        self._status_mutex = threading.Lock()
        self._loader_pool_mutex = threading.Lock()
        self._max_loader_num = _MAX_LOADER_NUM
        self._reload_interval = None

    def _reload_data(self):
        """Periodically load summaries from file."""
        while True:
            try:
                self._load_data()

                if not self._reload_interval:
                    break
                time.sleep(self._reload_interval)
            except UnknownError as ex:
                logger.exception(ex)
                logger.error('Unknown error raised when loading summary files, status: %r, '
                             'and loader pool size is %r. Detail: %s',
                             self._status, len(self._loader_pool), str(ex))
                self._status = _ExplainManagerStatus.INVALID.value

    def _load_data(self):
        """Load the summaries in the given base directory."""
        logger.info('Start to load data, reload interval: %r.', self._reload_interval)
        with self._status_mutex:
            if self._status == _ExplainManagerStatus.LOADING.value:
                logger.info('Current status is %s, will ignore to load data.', self._status)
                return
            self._status = _ExplainManagerStatus.LOADING.value

            try:
                self._generate_loaders()
                self._execute_load_data()
            except Exception as ex:
                raise UnknownError(ex)

            if not self._loader_pool:
                self._status = _ExplainManagerStatus.INVALID.value
            else:
                self._status = _ExplainManagerStatus.DONE.value

            logger.info('Load event data end, status: %r, and loader pool size is %r',
                        self._status, len(self._loader_pool))

    def _update_loader_latest_update_time(self, loader_id, latest_update_time=None):
        """Update the latest_update_time of the loader with the given id."""
        if latest_update_time is None:
            latest_update_time = time.time()
        self._loader_pool[loader_id].latest_update_time = latest_update_time

    def _delete_loader(self, loader_id):
        """Delete the loader with the given loader_id."""
        if self._loader_pool.get(loader_id, None) is not None:
            self._loader_pool.pop(loader_id)
            logger.debug('delete loader %s', loader_id)

    def _add_loader(self, loader):
        """Add a loader to the loader_pool."""
        if len(self._loader_pool) >= _MAX_LOADER_NUM:
            delete_num = len(self._loader_pool) - _MAX_LOADER_NUM + 1
            sorted_loaders = sorted(
                self._loader_pool.items(),
                key=lambda x: x[1].latest_update_time)
            for index in range(delete_num):
                delete_loader_id = sorted_loaders[index][0]
                self._delete_loader(delete_loader_id)
        self._loader_pool.update({loader.loader_id: loader})

    def _deal_loaders(self, latest_loaders):
        """Update the loader pool."""
        with self._loader_pool_mutex:
            for loader_id, loader in latest_loaders:
                if self._loader_pool.get(loader_id, None) is None:
                    self._add_loader(loader)
                    continue

                if (self._loader_pool[loader_id].latest_update_time
                        < loader.latest_update_time):
                    self._update_loader_latest_update_time(
                        loader_id, loader.latest_update_time)

    @staticmethod
    def _generate_loader_id(relative_path):
        """Generate loader id for the given path."""
        loader_id = relative_path
        return loader_id

    @staticmethod
    def _generate_loader_name(relative_path):
        """Generate loader name for the given path."""
        loader_name = relative_path
        return loader_name

    def _generate_loader_by_relative_path(self, relative_path: str) -> ExplainJob:
        """Generate explain job from given relative path."""
        current_dir = os.path.realpath(FileHandler.join(
            self._summary_base_dir, relative_path
        ))
        loader_id = self._generate_loader_id(relative_path)
        loader = ExplainJob(
            job_id=loader_id,
            summary_dir=current_dir,
            create_time=ExplainJob.get_create_time(current_dir),
            latest_update_time=ExplainJob.get_update_time(current_dir))
        return loader

    def _generate_loaders(self):
        """Generate job loaders from the summary watcher."""
        dir_map_mtime_dict = {}
        loader_dict = {}
        min_modify_time = None
        _, summaries = SummaryWatcher().list_explain_directories(
            self._summary_base_dir)

        for item in summaries:
            relative_path = item.get('relative_path')
            modify_time = item.get('update_time').timestamp()
            loader_id = self._generate_loader_id(relative_path)

            loader = self._loader_pool.get(loader_id, None)
            if loader is not None and loader.latest_update_time > modify_time:
                modify_time = loader.latest_update_time

            if min_modify_time is None:
                min_modify_time = modify_time

            if len(dir_map_mtime_dict) < _MAX_LOADER_NUM:
                if modify_time < min_modify_time:
                    min_modify_time = modify_time
                dir_map_mtime_dict.update({relative_path: modify_time})
            else:
                if modify_time >= min_modify_time:
                    dir_map_mtime_dict.update({relative_path: modify_time})

        sorted_dir_tuple = sorted(dir_map_mtime_dict.items(),
                                  key=lambda d: d[1])[-_MAX_LOADER_NUM:]

        for relative_path, modify_time in sorted_dir_tuple:
            loader_id = self._generate_loader_id(relative_path)
            loader = self._generate_loader_by_relative_path(relative_path)
            loader_dict.update({loader_id: loader})

        sorted_loaders = sorted(loader_dict.items(),
                                key=lambda x: x[1].latest_update_time)
        latest_loaders = sorted_loaders[-_MAX_LOADER_NUM:]
        self._deal_loaders(latest_loaders)

    def _execute_loader(self, loader_id):
        """Execute the data loading."""
        try:
            with self._loader_pool_mutex:
                loader = self._loader_pool.get(loader_id, None)
                if loader is None:
                    logger.debug('Loader %r has been deleted, will not load '
                                 'data', loader_id)
                    return
            loader.load()
        except MindInsightException as ex:
            logger.warning('Data loader %r load data failed. Delete data_loader. Detail: %s', loader_id, ex)
            with self._loader_pool_mutex:
                self._delete_loader(loader_id)

    def _execute_load_data(self):
        """Execute the loaders in the pool to load data."""
        loader_pool = self._get_snapshot_loader_pool()
        for loader_id in loader_pool:
            self._execute_loader(loader_id)

    def _get_snapshot_loader_pool(self):
        """Get a snapshot of the loader_pool."""
        with self._loader_pool_mutex:
            return dict(self._loader_pool)

    def _check_status_valid(self):
        """Check manager status."""
        if self._status == _ExplainManagerStatus.INIT.value:
            raise exceptions.SummaryLogIsLoading('Data is loading, current status is %s' % self._status)

    @staticmethod
    def _check_train_id_valid(train_id: str):
        """Verify that the given train_id is valid."""
        if not train_id.startswith('./'):
            logger.warning('train_id does not start with "./"')
            return False

        if len(train_id.split('/')) > 2:
            logger.warning('train_id contains multiple "/"')
            return False

        return True

    def _check_train_job_exist(self, train_id):
        """Verify that the train_job exists for the given train_id."""
        if train_id in self._loader_pool:
            return
        self._check_train_id_valid(train_id)
        if SummaryWatcher().is_summary_directory(self._summary_base_dir, train_id):
            return
        raise ParamValueError('Can not find the train job in the manager, train_id: %s' % train_id)

    def _reload_data_again(self):
        """Reload the data one more time."""
        logger.debug('Start to reload data again.')
        thread = threading.Thread(target=self._load_data,
                                  name='reload_data_thread')
        thread.daemon = False
        thread.start()

    def _get_job(self, train_id):
        """Retrieve train_job given train_id."""
        is_reload = False
        with self._loader_pool_mutex:
            loader = self._loader_pool.get(train_id, None)

            if loader is None:
                relative_path = train_id
                temp_loader = self._generate_loader_by_relative_path(
                    relative_path)

                if temp_loader is None:
                    return None

                self._add_loader(temp_loader)
                is_reload = True

        if is_reload:
            self._reload_data_again()
        return loader

    @property
    def summary_base_dir(self):
        """Return the base directory for summary records."""
        return self._summary_base_dir

    def get_job(self, train_id):
        """
        Return the ExplainJob for the given train_id.

        If no explain job w.r.t. the given train_id is found, None will be returned.

        Args:
            train_id (str): The id of the expected ExplainJob.

        Returns:
            explain_job
        """
        self._check_status_valid()
        self._check_train_job_exist(train_id)

        loader = self._get_job(train_id)
        if loader is None:
            return None
        return loader

    def start_load_data(self, reload_interval=_MAX_INTERVAL):
        """
        Start threads for loading data.

        Args:
            reload_interval (int): Interval to reload the summary from file.
        """
        self._reload_interval = reload_interval
        thread = threading.Thread(target=self._reload_data, name='start_load_data_thread')
        thread.daemon = True
        thread.start()
        # wait for data loading
        time.sleep(1)
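

# --- Illustrative usage sketch (not part of the original file) ---
# A minimal, hypothetical example of how ExplainManager is typically driven,
# assuming this module is importable as mindinsight.explainer.manager.explain_manager
# and that the summary base directory and train_id below are placeholders.
#
#     from mindinsight.explainer.manager.explain_manager import ExplainManager
#
#     manager = ExplainManager(summary_base_dir='/path/to/summary_base_dir')
#     manager.start_load_data(reload_interval=3)   # spawns the daemon reload thread
#     # train_id is a relative path starting with './'; get_job may raise
#     # ParamValueError if no such explain job directory exists.
#     job = manager.get_job('./sample_job')
#     if job is None:
#         print('Explain job not found; data may still be loading.')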