You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-'), and can be up to 35 characters long.

explain_manager.py 13 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. # Copyright 2020-2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """ExplainManager."""
  16. import os
  17. import threading
  18. import time
  19. from collections import OrderedDict
  20. from datetime import datetime
  21. from typing import Optional
  22. from mindinsight.conf import settings
  23. from mindinsight.datavisual.common import exceptions
  24. from mindinsight.datavisual.common.enums import BaseEnum
  25. from mindinsight.datavisual.data_access.file_handler import FileHandler
  26. from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  27. from mindinsight.explainer.common.log import logger
  28. from mindinsight.explainer.manager.explain_loader import ExplainLoader, _LoaderStatus
  29. from mindinsight.utils.exceptions import ParamValueError, UnknownError
  30. _MAX_LOADERS_NUM = 3
  31. class _ExplainManagerStatus(BaseEnum):
  32. """Manager status."""
  33. INIT = 'INIT'
  34. LOADING = 'LOADING'
  35. STOPPING = 'STOPPING'
  36. DONE = 'DONE'
  37. class ExplainManager:
  38. """ExplainManager."""
  39. def __init__(self, summary_base_dir: str):
  40. self._summary_base_dir = summary_base_dir
  41. self._loader_pool = OrderedDict()
  42. self._loading_status = _ExplainManagerStatus.INIT.value
  43. self._status_mutex = threading.Lock()
  44. self._load_data_mutex = threading.Lock()
  45. self._loader_pool_mutex = threading.Lock()
  46. self._max_loaders_num = _MAX_LOADERS_NUM
  47. self._summary_watcher = SummaryWatcher()
  48. @property
  49. def summary_base_dir(self):
  50. """Return the base directory for summary records."""
  51. return self._summary_base_dir
  52. def start_load_data(self, reload_interval: int = 0):
  53. """
  54. Start individual thread to cache explain_jobs and loading summary data periodically.
  55. Args:
  56. reload_interval (int): Specify the loading period in seconds. If interval == 0, data will only be loaded
  57. once. Default: 0.
  58. """
  59. thread = threading.Thread(target=self._repeat_loading,
  60. name='explainer.start_load_thread',
  61. args=(reload_interval,),
  62. daemon=True)
  63. thread.start()
  64. def get_job(self, loader_id: str) -> Optional[ExplainLoader]:
  65. """
  66. Return ExplainLoader given loader_id.
  67. If explain job w.r.t given loader_id is not found, None will be returned.
  68. Args:
  69. loader_id (str): The id of expected ExplainLoader.
  70. Returns:
  71. ExplainLoader, the data loader specified by loader_id.
  72. """
  73. self._check_status_valid()
  74. with self._loader_pool_mutex:
  75. if loader_id in self._loader_pool:
  76. self._loader_pool[loader_id].query_time = datetime.now().timestamp()
  77. self._loader_pool.move_to_end(loader_id, last=True)
  78. loader = self._loader_pool[loader_id]
  79. if loader.status == _LoaderStatus.STOP.value:
  80. self._reload_data_again()
  81. return self._loader_pool[loader_id]
  82. try:
  83. loader = self._generate_loader_from_relative_path(loader_id)
  84. loader.query_time = datetime.now().timestamp()
  85. self._add_loader(loader)
  86. self._reload_data_again()
  87. except ParamValueError:
  88. logger.warning('Cannot find summary in path: %s. No explain_job will be returned.', loader_id)
  89. return None
  90. return loader
  91. def get_job_list(self, offset=0, limit=None):
  92. """
  93. Return List of explain jobs. includes job ID, create and update time.
  94. Args:
  95. offset (int): An offset for page. Ex, offset is 0, mean current page is 1. Default: 0.
  96. limit (int): The max data items for per page. Default: 10.
  97. Returns:
  98. tuple, the elements of the returned tuple are:
  99. - total (int): The overall number of explain directories
  100. - dir_infos (list): List of summary directory info including the following attributes:
  101. - relative_path (str): Relative path of summary directory, referring to settings.SUMMARY_BASE_DIR,
  102. starting with "./".
  103. - create_time (datetime): Creation time of summary file.
  104. - update_time (datetime): Modification time of summary file.
  105. """
  106. total, dir_infos = \
  107. self._summary_watcher.list_explain_directories(self._summary_base_dir, offset=offset, limit=limit)
  108. return total, dir_infos
  109. def _repeat_loading(self, repeat_interval):
  110. """Periodically loading summary."""
  111. # Allocate CPU resources to enable gunicorn to start the web service.
  112. time.sleep(1)
  113. while True:
  114. try:
  115. if self.status == _ExplainManagerStatus.STOPPING.value:
  116. logger.debug('Current loading status is %s, we will not trigger repeat loading.',
  117. _ExplainManagerStatus.STOPPING.value)
  118. else:
  119. logger.info('Starts triggering repeat loading, repeat interval: %r.', repeat_interval)
  120. self._load_data()
  121. if not repeat_interval:
  122. return
  123. time.sleep(repeat_interval)
  124. except UnknownError as ex:
  125. logger.error('Unexpected error happens when loading data. Loading status: %s, loading pool size: %d'
  126. 'Detail: %s', self.status, len(self._loader_pool), str(ex))
  127. def _load_data(self):
  128. """
  129. Prepare loaders in cache and start loading the data from summaries.
  130. Only a limited number of loaders will be cached in terms of updated_time or query_time. The size of cache
  131. pool is determined by _MAX_LOADERS_NUM. When the manager start loading data, only the latest _MAX_LOADER_NUM
  132. summaries will be loaded in cache. If a cached loader if queries by 'get_job', the query_time of the loader
  133. will be updated as well as the the loader moved to the end of cache. If an uncached summary is queried,
  134. a new loader instance will be generated and put to the end cache.
  135. """
  136. try:
  137. with self._load_data_mutex:
  138. if self.status == _ExplainManagerStatus.LOADING.value:
  139. logger.info('Current status is %s, will ignore to load data.', self.status)
  140. return
  141. logger.info('Start to load data, and status change to %s.', _ExplainManagerStatus.LOADING.value)
  142. self.status = _ExplainManagerStatus.LOADING.value
  143. self._cache_loaders()
  144. if self.status == _ExplainManagerStatus.STOPPING.value:
  145. logger.info('The manager status has been %s, will not execute loading.', self.status)
  146. return
  147. self._execute_loading()
  148. logger.info('Load event data end, current status: %s, next status: %s, loader pool size: %d.',
  149. self.status, _ExplainManagerStatus.DONE.value, len(self._loader_pool))
  150. except Exception as ex:
  151. logger.exception(ex)
  152. raise UnknownError(str(ex))
  153. finally:
  154. self.status = _ExplainManagerStatus.DONE.value
  155. def _cache_loaders(self):
  156. """Cache explain loader in cache pool."""
  157. dir_map_mtimes = []
  158. _, summaries_info = self._summary_watcher.list_explain_directories(self._summary_base_dir)
  159. for summary_info in summaries_info:
  160. summary_path = summary_info.get('relative_path')
  161. summary_update_time = summary_info.get('update_time').timestamp()
  162. if summary_path in self._loader_pool:
  163. summary_update_time = max(summary_update_time, self._loader_pool[summary_path].query_time)
  164. dir_map_mtimes.append((summary_info, summary_update_time))
  165. sorted_summaries_info = sorted(dir_map_mtimes, key=lambda x: x[1])[-_MAX_LOADERS_NUM:]
  166. with self._loader_pool_mutex:
  167. for summary_info, query_time in sorted_summaries_info:
  168. summary_path = summary_info['relative_path']
  169. if summary_path not in self._loader_pool:
  170. loader = self._generate_loader_from_relative_path(summary_path)
  171. # The added loader by automatically refresh, using file creation time as the query time
  172. self._add_loader(loader)
  173. else:
  174. self._loader_pool[summary_path].query_time = query_time
  175. self._loader_pool.move_to_end(summary_path, last=True)
  176. def _generate_loader_from_relative_path(self, relative_path: str) -> ExplainLoader:
  177. """Generate explain loader from the given relative path."""
  178. self._check_summary_exist(relative_path)
  179. current_dir = os.path.realpath(FileHandler.join(self._summary_base_dir, relative_path))
  180. loader_id = self._generate_loader_id(relative_path)
  181. loader = ExplainLoader(loader_id=loader_id, summary_dir=current_dir)
  182. return loader
  183. def _add_loader(self, loader):
  184. """Add loader to the loader_pool."""
  185. if loader.train_id not in self._loader_pool:
  186. self._loader_pool[loader.train_id] = loader
  187. else:
  188. self._loader_pool.move_to_end(loader.train_id, last=True)
  189. while len(self._loader_pool) > self._max_loaders_num:
  190. self._loader_pool.popitem(last=False)
  191. def _execute_loading(self):
  192. """Execute the data loading."""
  193. # We will load the newest loader first.
  194. for loader_id in list(self._loader_pool.keys())[::-1]:
  195. with self._loader_pool_mutex:
  196. loader = self._loader_pool.get(loader_id, None)
  197. if loader is None:
  198. logger.debug('Loader %r has been deleted, will not load data.', loader_id)
  199. continue
  200. if self.status == _ExplainManagerStatus.STOPPING.value:
  201. logger.info('Loader %s status is %s, will return.', loader_id, loader.status)
  202. return
  203. loader.load()
  204. def _delete_loader(self, loader_id):
  205. """Delete loader given loader_id."""
  206. if loader_id in self._loader_pool:
  207. self._loader_pool.pop(loader_id)
  208. logger.debug('delete loader %s, and stop this loader loading data.', loader_id)
  209. def _check_status_valid(self):
  210. """Check manager status."""
  211. if self.status == _ExplainManagerStatus.INIT.value:
  212. raise exceptions.SummaryLogIsLoading('Data is loading, current status is %s' % self.status)
  213. def _check_summary_exist(self, loader_id):
  214. """Verify thee train_job is existed given loader_id."""
  215. if not self._summary_watcher.is_summary_directory(self._summary_base_dir, loader_id):
  216. raise ParamValueError('Can not find the train job in the manager.')
  217. def _reload_data_again(self):
  218. """Reload the data one more time."""
  219. logger.debug('Start to reload data again.')
  220. def _wrapper():
  221. if self.status == _ExplainManagerStatus.STOPPING.value:
  222. return
  223. self._stop_load_data()
  224. self._load_data()
  225. thread = threading.Thread(target=_wrapper, name='explainer.reload_data_thread')
  226. thread.daemon = False
  227. thread.start()
  228. def _stop_load_data(self):
  229. """Stop loading data, status changes to Stopping."""
  230. if self.status != _ExplainManagerStatus.LOADING.value:
  231. return
  232. logger.info('Start to stop loading data, set status to %s.', _ExplainManagerStatus.STOPPING.value)
  233. self.status = _ExplainManagerStatus.STOPPING.value
  234. for loader in self._loader_pool.values():
  235. loader.stop()
  236. while self.status != _ExplainManagerStatus.DONE.value:
  237. continue
  238. logger.info('Stop loading data end.')
  239. @property
  240. def status(self):
  241. """Get the status of this manager with lock."""
  242. with self._status_mutex:
  243. return self._loading_status
  244. @status.setter
  245. def status(self, status):
  246. """Set the status of this manager with lock."""
  247. with self._status_mutex:
  248. self._loading_status = status
  249. @staticmethod
  250. def _generate_loader_id(relative_path):
  251. """Generate loader id for given path"""
  252. loader_id = relative_path
  253. return loader_id
  254. EXPLAIN_MANAGER = ExplainManager(summary_base_dir=settings.SUMMARY_BASE_DIR)