explain_manager.py

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""ExplainManager."""
from collections import OrderedDict
import os
import threading
import time
from datetime import datetime
from typing import Optional

from mindinsight.conf import settings
from mindinsight.datavisual.common import exceptions
from mindinsight.datavisual.common.enums import BaseEnum
from mindinsight.datavisual.data_access.file_handler import FileHandler
from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
from mindinsight.explainer.common.log import logger
from mindinsight.explainer.manager.explain_loader import ExplainLoader
from mindinsight.utils.exceptions import MindInsightException, ParamValueError, UnknownError

_MAX_LOADERS_NUM = 3

class _ExplainManagerStatus(BaseEnum):
    """Manager status."""
    INIT = 'INIT'
    LOADING = 'LOADING'
    STOPPING = 'STOPPING'
    DONE = 'DONE'


class ExplainManager:
    """ExplainManager."""

    def __init__(self, summary_base_dir: str):
        self._summary_base_dir = summary_base_dir
        self._loader_pool = OrderedDict()
        self._loading_status = _ExplainManagerStatus.INIT.value
        self._status_mutex = threading.Lock()
        self._load_data_mutex = threading.Lock()
        self._loader_pool_mutex = threading.Lock()
        self._max_loaders_num = _MAX_LOADERS_NUM
        self._summary_watcher = SummaryWatcher()

    @property
    def summary_base_dir(self):
        """Return the base directory for summary records."""
        return self._summary_base_dir

    def start_load_data(self, reload_interval: int = 0):
        """
        Start an individual thread to cache explain jobs and load summary data periodically.

        Args:
            reload_interval (int): Specify the loading period in seconds. If interval == 0, data will only be
                loaded once. Default: 0.
        """
        thread = threading.Thread(target=self._repeat_loading,
                                  name='explainer.start_load_thread',
                                  args=(reload_interval,),
                                  daemon=True)
        thread.start()

    def get_job(self, loader_id: str) -> Optional[ExplainLoader]:
        """
        Return the ExplainLoader for the given loader_id.

        If no explain job matches the given loader_id, None is returned.

        Args:
            loader_id (str): The id of the expected ExplainLoader.

        Returns:
            Optional[ExplainLoader], the explain job, or None if it is not found.
        """
        self._check_status_valid()

        with self._loader_pool_mutex:
            if loader_id in self._loader_pool:
                self._loader_pool[loader_id].query_time = datetime.now().timestamp()
                self._loader_pool.move_to_end(loader_id, last=True)
                return self._loader_pool[loader_id]

        try:
            loader = self._generate_loader_from_relative_path(loader_id)
            loader.query_time = datetime.now().timestamp()
            self._add_loader(loader)
            self._reload_data_again()
        except ParamValueError:
            logger.warning('Cannot find summary in path: %s. No explain_job will be returned.', loader_id)
            return None
        return loader

    def get_job_list(self, offset=0, limit=None):
        """
        Return a list of explain jobs, including job ID, create time and update time.

        Args:
            offset (int): Page offset. For example, offset 0 means the current page is the first page. Default: 0.
            limit (int): The max number of items per page. Default: 10.

        Returns:
            tuple[total, directories], where total is the overall number of explain directories and directories
            is a list of summary directory info with the following attributes.

                - relative_path (str): Relative path of the summary directory, referring to
                  settings.SUMMARY_BASE_DIR, starting with "./".
                - create_time (datetime): Creation time of the summary file.
                - update_time (datetime): Modification time of the summary file.
        """
        total, dir_infos = \
            self._summary_watcher.list_explain_directories(self._summary_base_dir, offset=offset, limit=limit)
        return total, dir_infos

    def _repeat_loading(self, repeat_interval):
        """Load summary data periodically."""
        # Sleep briefly to yield CPU resources so that gunicorn can start the web service.
        time.sleep(1)
        while True:
            try:
                if self.status == _ExplainManagerStatus.STOPPING.value:
                    logger.debug('Current loading status is %s, repeat loading will not be triggered.',
                                 _ExplainManagerStatus.STOPPING.value)
                else:
                    logger.info('Start triggering repeat loading, repeat interval: %r.', repeat_interval)
                    self._load_data()

                if not repeat_interval:
                    return
                time.sleep(repeat_interval)
            except UnknownError as ex:
                logger.error('Unexpected error happens when loading data. Loading status: %s, loading pool size: %d. '
                             'Detail: %s', self.status, len(self._loader_pool), str(ex))

    def _load_data(self):
        """
        Prepare loaders in the cache and start loading data from summaries.

        Only a limited number of loaders are cached, ranked by update_time or query_time. The size of the cache
        pool is determined by _MAX_LOADERS_NUM. When the manager starts loading data, only the latest
        _MAX_LOADERS_NUM summaries are loaded into the cache. If a cached loader is queried via 'get_job', its
        query_time is updated and the loader is moved to the end of the cache. If an uncached summary is queried,
        a new loader instance is generated and appended to the end of the cache.
        """
        try:
            with self._load_data_mutex:
                if self.status == _ExplainManagerStatus.LOADING.value:
                    logger.info('Current status is %s, data loading will be ignored.', self.status)
                    return

                logger.info('Start to load data, and status changes to %s.', _ExplainManagerStatus.LOADING.value)
                self.status = _ExplainManagerStatus.LOADING.value
                self._cache_loaders()

                if self.status == _ExplainManagerStatus.STOPPING.value:
                    logger.info('The manager status has been %s, loading will not be executed.', self.status)
                    return

                self._execute_loading()

                logger.info('Load event data end, current status: %s, next status: %s, loader pool size: %d.',
                            self.status, _ExplainManagerStatus.DONE.value, len(self._loader_pool))
        except Exception as ex:
            logger.exception(ex)
            raise UnknownError(str(ex))
        finally:
            self.status = _ExplainManagerStatus.DONE.value

    def _cache_loaders(self):
        """Cache explain loaders in the cache pool."""
        dir_map_mtimes = []
        _, summaries_info = self._summary_watcher.list_explain_directories(self._summary_base_dir)

        for summary_info in summaries_info:
            summary_path = summary_info.get('relative_path')
            summary_update_time = summary_info.get('update_time').timestamp()

            if summary_path in self._loader_pool:
                summary_update_time = max(summary_update_time, self._loader_pool[summary_path].query_time)

            dir_map_mtimes.append((summary_info, summary_update_time))

        sorted_summaries_info = sorted(dir_map_mtimes, key=lambda x: x[1])[-_MAX_LOADERS_NUM:]

        with self._loader_pool_mutex:
            for summary_info, query_time in sorted_summaries_info:
                summary_path = summary_info['relative_path']
                if summary_path not in self._loader_pool:
                    loader = self._generate_loader_from_relative_path(summary_path)
                    # Loaders added by the automatic refresh use the file creation time as the query time.
                    self._add_loader(loader)
                else:
                    self._loader_pool[summary_path].query_time = query_time
                    self._loader_pool.move_to_end(summary_path, last=True)

    def _generate_loader_from_relative_path(self, relative_path: str) -> ExplainLoader:
        """Generate an explain loader from the given relative path."""
        self._check_summary_exist(relative_path)
        current_dir = os.path.realpath(FileHandler.join(self._summary_base_dir, relative_path))
        loader_id = self._generate_loader_id(relative_path)
        loader = ExplainLoader(loader_id=loader_id, summary_dir=current_dir)
        return loader

    def _add_loader(self, loader):
        """Add a loader to the loader pool."""
        if loader.train_id not in self._loader_pool:
            self._loader_pool[loader.train_id] = loader
        else:
            self._loader_pool.move_to_end(loader.train_id, last=True)

        # Evict the least recently used loaders when the pool exceeds its capacity.
        while len(self._loader_pool) > self._max_loaders_num:
            self._loader_pool.popitem(last=False)

    def _execute_loading(self):
        """Execute the data loading."""
        # Load the newest loaders first.
        for loader_id in list(self._loader_pool.keys())[::-1]:
            try:
                with self._loader_pool_mutex:
                    loader = self._loader_pool.get(loader_id, None)
                    if loader is None:
                        logger.debug('Loader %r has been deleted, no data will be loaded.', loader_id)
                        continue

                if self.status == _ExplainManagerStatus.STOPPING.value:
                    logger.info('Loader %s status is %s, loading will return.', loader_id, loader.status)
                    return

                loader.load()

            except MindInsightException as ex:
                logger.warning('Data loader %r failed to load data. Delete data_loader. Detail: %s.', loader_id, ex)
                with self._loader_pool_mutex:
                    self._delete_loader(loader_id)

    def _delete_loader(self, loader_id):
        """Delete the loader with the given loader_id."""
        if loader_id in self._loader_pool:
            self._loader_pool.pop(loader_id)
            logger.debug('Delete loader %s, and stop this loader from loading data.', loader_id)

    def _check_status_valid(self):
        """Check manager status."""
        if self.status == _ExplainManagerStatus.INIT.value:
            raise exceptions.SummaryLogIsLoading('Data is loading, current status is %s' % self.status)

    def _check_summary_exist(self, loader_id):
        """Verify that the train job exists for the given loader_id."""
        if not self._summary_watcher.is_summary_directory(self._summary_base_dir, loader_id):
            raise ParamValueError('Can not find the train job in the manager.')

    def _reload_data_again(self):
        """Reload the data one more time."""
        logger.debug('Start to reload data again.')

        def _wrapper():
            if self.status == _ExplainManagerStatus.STOPPING.value:
                return
            self._stop_load_data()
            self._load_data()

        thread = threading.Thread(target=_wrapper, name='explainer.reload_data_thread')
        thread.daemon = False
        thread.start()

    def _stop_load_data(self):
        """Stop loading data; the status changes to STOPPING."""
        if self.status != _ExplainManagerStatus.LOADING.value:
            return

        logger.info('Start to stop loading data, set status to %s.', _ExplainManagerStatus.STOPPING.value)
        self.status = _ExplainManagerStatus.STOPPING.value
        for loader in self._loader_pool.values():
            loader.stop()

        # Wait until the loading thread marks the manager as DONE.
        while self.status != _ExplainManagerStatus.DONE.value:
            continue
        logger.info('Stop loading data end.')

    @property
    def status(self):
        """Get the status of this manager with lock."""
        with self._status_mutex:
            return self._loading_status

    @status.setter
    def status(self, status):
        """Set the status of this manager with lock."""
        with self._status_mutex:
            self._loading_status = status

    @staticmethod
    def _generate_loader_id(relative_path):
        """Generate a loader id for the given path."""
        loader_id = relative_path
        return loader_id


EXPLAIN_MANAGER = ExplainManager(summary_base_dir=settings.SUMMARY_BASE_DIR)
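
For reference, below is a minimal usage sketch of this module; it is not part of the original file. It assumes MindInsight is installed, that this file is importable as mindinsight.explainer.manager.explain_manager, and that a hypothetical './summaries' directory containing explain summary files exists; the reload interval and the wait time are illustrative values only.

    # Hypothetical standalone usage of ExplainManager (assumptions noted above).
    import time

    from mindinsight.explainer.manager.explain_manager import ExplainManager

    manager = ExplainManager(summary_base_dir='./summaries')

    # Start the daemon loading thread; a non-zero interval makes it re-scan periodically.
    manager.start_load_data(reload_interval=20)

    # Give the background thread a moment to populate the loader pool.
    time.sleep(5)

    # List available explain jobs (each entry carries relative_path, create_time, update_time).
    total, dir_infos = manager.get_job_list(offset=0, limit=10)
    print('total explain directories:', total)

    # Fetch a specific job by its relative path; get_job returns None if the summary is missing.
    if dir_infos:
        job = manager.get_job(dir_infos[0]['relative_path'])
        print('loaded job:', job.train_id if job else None)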