You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

data_manager.py 38 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106
  1. # Copyright 2019 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """
  16. Management of all events data.
  17. This module exists to all loaders.
  18. It can read events data through the DataLoader.
  19. This module also acts as a thread pool manager.
  20. """
  21. import abc
  22. import datetime
  23. import threading
  24. import time
  25. import os
  26. from typing import Iterable, Optional
  27. from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
  28. from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  29. from mindinsight.conf import settings
  30. from mindinsight.datavisual.common import exceptions
  31. from mindinsight.datavisual.common.enums import CacheStatus
  32. from mindinsight.datavisual.common.log import logger
  33. from mindinsight.datavisual.common.enums import DataManagerStatus, DetailCacheManagerStatus
  34. from mindinsight.datavisual.common.enums import PluginNameEnum
  35. from mindinsight.datavisual.common.exceptions import TrainJobNotExistError
  36. from mindinsight.datavisual.data_transform.loader_generators.loader_generator import MAX_DATA_LOADER_SIZE
  37. from mindinsight.datavisual.data_transform.loader_generators.data_loader_generator import DataLoaderGenerator
  38. from mindinsight.utils.computing_resource_mgr import ComputingResourceManager
  39. from mindinsight.utils.exceptions import MindInsightException
  40. from mindinsight.utils.exceptions import ParamValueError
  41. from mindinsight.utils.exceptions import UnknownError
  42. from mindinsight.datavisual.utils.tools import exception_wrapper
  43. class _BasicTrainJob:
  44. """
  45. Basic info about train job.
  46. Args:
  47. train_id (str): Id of the train job.
  48. abs_summary_base_dir (str): The canonical path of summary base directory. It should be the return value of
  49. realpath().
  50. abs_summary_dir (str): The canonical path of summary directory. It should be the return value of realpath().
  51. create_time (DateTime): The create time of summary directory.
  52. update_time (DateTime): The latest modify time of summary files directly in the summary directory.
  53. profiler_dir (str): The relative path of profiler directory.
  54. """
  55. def __init__(self, train_id, abs_summary_base_dir, abs_summary_dir, create_time, update_time, profiler_dir):
  56. self._train_id = train_id
  57. self._abs_summary_base_dir = abs_summary_base_dir
  58. self._abs_summary_dir = abs_summary_dir
  59. self._create_time = create_time
  60. self._update_time = update_time
  61. self._profiler_dir = profiler_dir
  62. @property
  63. def abs_summary_dir(self):
  64. """Get summary directory path."""
  65. return self._abs_summary_dir
  66. @property
  67. def summary_base_dir(self):
  68. """Get summary base directory path."""
  69. return self._abs_summary_base_dir
  70. @property
  71. def train_id(self):
  72. """Get train id."""
  73. return self._train_id
  74. @property
  75. def profiler_dir(self):
  76. """Get profiler directory path."""
  77. return self._profiler_dir
  78. @property
  79. def create_time(self):
  80. """Get create time."""
  81. return self._create_time
  82. @property
  83. def update_time(self):
  84. """Get update time."""
  85. return self._update_time
  86. class CachedTrainJob:
  87. """
  88. Cache item for BriefCacheManager.
  89. DetailCacheManager will also wrap it's return value with this class.
  90. Args:
  91. basic_info (_BasicTrainJob): Basic info about the train job.
  92. """
  93. def __init__(self, basic_info: _BasicTrainJob):
  94. self._basic_info = basic_info
  95. self._last_access_time = datetime.datetime.utcnow()
  96. # Other cached content is stored here.
  97. self._content = {}
  98. self._cache_status = CacheStatus.NOT_IN_CACHE
  99. self._key_locks = {}
  100. @property
  101. def cache_status(self):
  102. """Get cache status."""
  103. return self._cache_status
  104. @cache_status.setter
  105. def cache_status(self, value):
  106. """Set cache status."""
  107. self._cache_status = value
  108. def update_access_time(self):
  109. """Update last access time of this cache item."""
  110. self._last_access_time = datetime.datetime.utcnow()
  111. @property
  112. def last_access_time(self):
  113. """Get last access time for purposes such as LRU."""
  114. return self._last_access_time
  115. @property
  116. def abs_summary_dir(self):
  117. """Get summary directory path."""
  118. return self._basic_info.abs_summary_dir
  119. @property
  120. def summary_base_dir(self):
  121. """Get summary base directory path."""
  122. return self._basic_info.summary_base_dir
  123. def set(self, key, value):
  124. """Set value to cache."""
  125. self._content[key] = value
  126. def delete(self, key, raise_exception=True):
  127. """Delete key in cache."""
  128. try:
  129. self._content.pop(key)
  130. except KeyError:
  131. if raise_exception:
  132. raise ParamValueError("Delete failed. Invalid cache key({}).".format(key))
  133. def get(self, key, raise_exception=True):
  134. """
  135. Get value from cache.
  136. Args:
  137. key (str): Key of content.
  138. raise_exception (bool): If the key does not exist and
  139. raise_exception is True, it will raise an Exception.
  140. Returns:
  141. Union[Object, None], Return value if key in content,
  142. return False else if raise_exception is False.
  143. Raises:
  144. ParamValueError, if the key does not exist and raise_exception is True.
  145. """
  146. try:
  147. return self._content[key]
  148. except KeyError:
  149. if raise_exception:
  150. raise ParamValueError("Invalid cache key({}).".format(key))
  151. return None
  152. @property
  153. def basic_info(self):
  154. """Get basic train job info."""
  155. return self._basic_info
  156. @basic_info.setter
  157. def basic_info(self, value):
  158. """Set basic train job info."""
  159. self._basic_info = value
  160. def lock_key(self, key):
  161. """Threading lock with given key."""
  162. return self._key_locks.setdefault(key, threading.Lock())
  163. class TrainJob:
  164. """
  165. Train job object.
  166. You must not create TrainJob objects manually. You should always get TrainJob objects from DataManager.
  167. Args:
  168. brief_train_job (CachedTrainJob): Brief info about train job.
  169. detail_train_job (Optional[CachedTrainJob]): Detailed info about train job. Default: None.
  170. """
  171. def __init__(self,
  172. brief_train_job: CachedTrainJob,
  173. detail_train_job: Optional[CachedTrainJob] = None):
  174. self._brief = brief_train_job
  175. self._detail = detail_train_job
  176. if self._detail is None:
  177. self._cache_status = CacheStatus.NOT_IN_CACHE
  178. else:
  179. self._cache_status = self._detail.cache_status
  180. def has_detail(self):
  181. """Whether this train job has detailed info in cache."""
  182. return bool(self._detail is not None)
  183. def get_detail(self, key):
  184. """
  185. Get detail content.
  186. Args:
  187. key (Any): Cache key.
  188. Returns:
  189. Any, cache content.
  190. Raises:
  191. TrainJobDetailNotInCacheError: when this train job has no detail cache.
  192. """
  193. if not self.has_detail():
  194. raise exceptions.TrainJobDetailNotInCacheError()
  195. return self._detail.get(key)
  196. def get_brief(self, key):
  197. """
  198. Get brief content.
  199. Args:
  200. key (Any): Cache key.
  201. Returns:
  202. Any, cache content.
  203. """
  204. return self._brief.get(key)
  205. def get_basic_info(self):
  206. """
  207. Get basic info.
  208. Returns:
  209. basic_info (_BasicTrainJob): Basic info about the train job.
  210. """
  211. return self._brief.basic_info
  212. @property
  213. def cache_status(self):
  214. """Get cache status."""
  215. return self._cache_status
  216. @cache_status.setter
  217. def cache_status(self, cache_status):
  218. """Set cache status."""
  219. self._cache_status = cache_status
  220. class BaseCacheItemUpdater(abc.ABC):
  221. """Abstract base class for other modules to update cache content."""
  222. def update_item(self, cache_item: CachedTrainJob):
  223. """
  224. Update cache item in place.
  225. Args:
  226. cache_item (CachedTrainJob): The cache item to be processed.
  227. """
  228. raise NotImplementedError()
  229. class _BaseCacheManager:
  230. """Base class for cache manager."""
  231. def __init__(self):
  232. # Use dict to remove duplicate updaters.
  233. self._updaters = {}
  234. # key is train_id
  235. self._lock = threading.Lock()
  236. self._cache_items = {}
  237. def size(self):
  238. """Gets used cache slots."""
  239. return len(self._cache_items)
  240. def register_cache_item_updater(self, updater: BaseCacheItemUpdater):
  241. """Register cache item updater."""
  242. self._updaters[updater.__class__.__qualname__] = updater
  243. def get_train_jobs(self):
  244. """Get cached train jobs."""
  245. copied_train_jobs = dict(self._cache_items)
  246. return copied_train_jobs
  247. def get_train_job(self, train_id):
  248. """Get cached train job."""
  249. try:
  250. return self._cache_items[train_id]
  251. except KeyError:
  252. raise TrainJobNotExistError(train_id)
  253. def cache_train_job(self, train_id) -> bool:
  254. """
  255. Cache given train job and update train job's last access time.
  256. This method should return true if reload actions should be taken to cache the train job.
  257. Args:
  258. train_id (str): Train Id.
  259. """
  260. raise NotImplementedError()
  261. def delete_train_job(self, train_id):
  262. """Delete train job from cache."""
  263. if train_id in self._cache_items:
  264. del self._cache_items[train_id]
  265. def has_content(self):
  266. """Whether this cache manager has train jobs."""
  267. return bool(self._cache_items)
  268. def update_cache(self, disk_train_jobs: Iterable[_BasicTrainJob]):
  269. """
  270. Update cache according to given train jobs on disk.
  271. Different cache manager should implement different cache update policies in this method.
  272. Args:
  273. disk_train_jobs (Iterable[_BasicTrainJob]): Train jobs on disk.
  274. """
  275. raise NotImplementedError()
  276. def _merge_with_disk(self, disk_train_jobs: Iterable[_BasicTrainJob]):
  277. """
  278. Merge train jobs in cache with train jobs from disk
  279. This method will remove train jobs not on disk. Call this function with lock for thread safety.
  280. Args:
  281. disk_train_jobs (Iterable[_BasicTrainJob]): Basic train jobs info from disk.
  282. Returns:
  283. dict, a dict containing train jobs to be cached.
  284. """
  285. new_cache_items = {}
  286. for train_job in disk_train_jobs:
  287. if train_job.train_id not in self._cache_items:
  288. new_cache_items[train_job.train_id] = CachedTrainJob(train_job)
  289. else:
  290. reused_train_job = self._cache_items[train_job.train_id]
  291. reused_train_job.basic_info = train_job
  292. new_cache_items[train_job.train_id] = reused_train_job
  293. return new_cache_items
  294. class _BriefCacheManager(_BaseCacheManager):
  295. """A cache manager that holds all disk train jobs on disk."""
  296. def cache_train_job(self, train_id):
  297. """
  298. Cache given train job.
  299. All disk train jobs are cached on every reload, so this method always return false.
  300. Args:
  301. train_id (str): Train Id.
  302. """
  303. if train_id in self._cache_items:
  304. self._cache_items[train_id].update_access_time()
  305. return False
  306. def update_cache(self, disk_train_jobs):
  307. """Update cache."""
  308. with self._lock:
  309. new_cache_items = self._merge_with_disk(disk_train_jobs)
  310. self._cache_items = new_cache_items
  311. for updater in self._updaters.values():
  312. for cache_item in self._cache_items.values():
  313. updater.update_item(cache_item)
  314. @property
  315. def cache_items(self):
  316. """Get cache items."""
  317. return self._cache_items
  318. # Key for plugin tags.
  319. DATAVISUAL_PLUGIN_KEY = "tag_mapping"
  320. # Detail train job cache key for datavisual content.
  321. DATAVISUAL_CACHE_KEY = "datavisual"
  322. class _DetailCacheManager(_BaseCacheManager):
  323. """A cache manager that holds detailed info for most recently used train jobs."""
  324. def __init__(self, loader_generators):
  325. super().__init__()
  326. self._loader_pool = {}
  327. self._deleted_id_list = []
  328. self._loader_pool_mutex = threading.Lock()
  329. self._max_threads_count = 30
  330. self._loader_generators = loader_generators
  331. self._status = DetailCacheManagerStatus.INIT.value
  332. self._loading_mutex = threading.Lock()
  333. @property
  334. def status(self):
  335. """Get loading status, if it is loading, return True."""
  336. return self._status
  337. def has_content(self):
  338. """Whether this cache manager has train jobs."""
  339. return bool(self._loader_pool)
  340. def size(self):
  341. """
  342. Get the number of items in this cache manager.
  343. To be implemented.
  344. Returns:
  345. int, the number of items in this cache manager.
  346. """
  347. raise NotImplementedError()
  348. def loader_pool_size(self):
  349. """Get loader pool size."""
  350. return len(self._loader_pool)
  351. def _load_in_cache(self):
  352. """Generate and execute loaders."""
  353. def load():
  354. self._generate_loaders()
  355. self._execute_load_data()
  356. try:
  357. exception_wrapper(load())
  358. except UnknownError as ex:
  359. logger.warning("Load event data failed. Detail: %s.", str(ex))
  360. finally:
  361. self._status = DetailCacheManagerStatus.DONE.value
  362. logger.info("Load event data end, status: %r, and loader pool size is %r.",
  363. self._status, self.loader_pool_size())
  364. def update_cache(self, disk_train_jobs: Iterable[_BasicTrainJob]):
  365. """
  366. Update cache.
  367. Will switch to using disk_train_jobs in the future.
  368. Args:
  369. disk_train_jobs (Iterable[_BasicTrainJob]): Basic info about train jobs on disk.
  370. """
  371. with self._loading_mutex:
  372. if self._status == DetailCacheManagerStatus.LOADING.value:
  373. logger.debug("Event data is loading, and loader pool size is %r.", self.loader_pool_size())
  374. return
  375. self._status = DetailCacheManagerStatus.LOADING.value
  376. thread = threading.Thread(target=self._load_in_cache, name="load_detail_in_cache")
  377. thread.start()
  378. def cache_train_job(self, train_id):
  379. """Cache given train job."""
  380. loader = None
  381. need_reload = False
  382. with self._loader_pool_mutex:
  383. if self._is_loader_in_loader_pool(train_id, self._loader_pool):
  384. loader = self._loader_pool.get(train_id)
  385. if loader is None:
  386. for generator in self._loader_generators:
  387. tmp_loader = generator.generate_loader_by_train_id(train_id)
  388. if loader and loader.latest_update_time > tmp_loader.latest_update_time:
  389. continue
  390. loader = tmp_loader
  391. if loader is None:
  392. raise TrainJobNotExistError(train_id)
  393. # Update cache status loader to CACHING if loader is NOT_IN_CACHE
  394. # before triggering the next interval.
  395. if loader.cache_status == CacheStatus.NOT_IN_CACHE:
  396. loader.cache_status = CacheStatus.CACHING
  397. self._add_loader(loader)
  398. need_reload = True
  399. self._update_loader_latest_update_time(loader.loader_id)
  400. return need_reload
  401. def get_train_jobs(self):
  402. """
  403. Get train jobs
  404. To be implemented.
  405. """
  406. def _add_loader(self, loader):
  407. """
  408. Add a loader to load data.
  409. Args:
  410. loader (LoaderStruct): A object of `Loader`.
  411. """
  412. if len(self._loader_pool) >= MAX_DATA_LOADER_SIZE:
  413. delete_number = len(self._loader_pool) - MAX_DATA_LOADER_SIZE + 1
  414. sorted_loaders = sorted(self._loader_pool.items(),
  415. key=lambda loader: loader[1].latest_update_time)
  416. for index in range(delete_number):
  417. delete_loader_id = sorted_loaders[index][0]
  418. self._delete_loader(delete_loader_id)
  419. self._loader_pool.update({loader.loader_id: loader})
  420. def _delete_loader(self, loader_id):
  421. """
  422. Delete loader from loader pool by loader id.
  423. Args:
  424. loader_id (str): ID of loader.
  425. """
  426. if self._loader_pool.get(loader_id) is not None:
  427. logger.debug("delete loader %s", loader_id)
  428. self._loader_pool.pop(loader_id)
  429. def _execute_loader(self, loader_id, computing_resource_mgr):
  430. """
  431. Load data form data_loader.
  432. If there is something wrong by loading, add logs and delete the loader.
  433. Args:
  434. loader_id (str): An ID for `Loader`.
  435. computing_resource_mgr (ComputingResourceManager): The ComputingResourceManager instance.
  436. """
  437. try:
  438. with self._loader_pool_mutex:
  439. loader = self._loader_pool.get(loader_id, None)
  440. if loader is None:
  441. logger.debug("Loader %r has been deleted, will not load data.", loader_id)
  442. return
  443. loader.data_loader.load(computing_resource_mgr)
  444. # Update loader cache status to CACHED.
  445. # Loader with cache status CACHED should remain the same cache status.
  446. loader.cache_status = CacheStatus.CACHED
  447. except MindInsightException as ex:
  448. logger.warning("Data loader %r load data failed. "
  449. "Delete data_loader. Detail: %s", loader_id, ex)
  450. with self._loader_pool_mutex:
  451. self._delete_loader(loader_id)
  452. def _generate_loaders(self):
  453. """This function generates the loader from given path."""
  454. loader_dict = {}
  455. for generator in self._loader_generators:
  456. loader_dict.update(generator.generate_loaders(self._loader_pool))
  457. sorted_loaders = sorted(loader_dict.items(), key=lambda loader: loader[1].latest_update_time)
  458. latest_loaders = sorted_loaders[-MAX_DATA_LOADER_SIZE:]
  459. self._deal_loaders(latest_loaders)
  460. def _deal_loaders(self, latest_loaders):
  461. """
  462. This function determines which loaders to keep or remove or added.
  463. It is based on the given dict of loaders.
  464. Args:
  465. latest_loaders (list[dict]): A list of <loader_id: LoaderStruct>.
  466. """
  467. with self._loader_pool_mutex:
  468. for loader_id, loader in latest_loaders:
  469. if self._loader_pool.get(loader_id, None) is None:
  470. self._add_loader(loader)
  471. continue
  472. # If this loader was updated manually before,
  473. # its latest_update_time may bigger than update_time in summary.
  474. if self._loader_pool[loader_id].latest_update_time < loader.latest_update_time:
  475. self._update_loader_latest_update_time(loader_id, loader.latest_update_time)
  476. def _execute_load_data(self):
  477. """Load data through multiple threads."""
  478. threads_count = self._get_threads_count()
  479. if not threads_count:
  480. logger.info("Can not find any valid train log path to load, loader pool is empty.")
  481. return
  482. logger.info("Start to execute load data. threads_count: %s.", threads_count)
  483. with ComputingResourceManager(
  484. executors_cnt=threads_count,
  485. max_processes_cnt=settings.MAX_PROCESSES_COUNT) as computing_resource_mgr:
  486. with ThreadPoolExecutor(max_workers=threads_count) as executor:
  487. futures = []
  488. loader_pool = self._get_snapshot_loader_pool()
  489. for loader_id in loader_pool:
  490. future = executor.submit(self._execute_loader, loader_id, computing_resource_mgr)
  491. futures.append(future)
  492. wait(futures, return_when=ALL_COMPLETED)
  493. def _get_threads_count(self):
  494. """
  495. Use the maximum number of threads available.
  496. Returns:
  497. int, number of threads.
  498. """
  499. threads_count = min(self._max_threads_count, len(self._loader_pool))
  500. return threads_count
  501. def delete_train_job(self, train_id):
  502. """
  503. Delete train job with a train id.
  504. Args:
  505. train_id (str): ID for train job.
  506. """
  507. with self._loader_pool_mutex:
  508. self._delete_loader(train_id)
  509. def list_tensors(self, train_id, tag):
  510. """
  511. List tensors of the given train job and tag.
  512. If the tensor can not find by the given tag, will raise exception.
  513. Args:
  514. train_id (str): ID for train job.
  515. tag (str): The tag name.
  516. Returns:
  517. list, the NameTuple format is `collections.namedtuple('_Tensor', ['wall_time', 'event_step', 'value'])`.
  518. the value will contain the given tag data.
  519. """
  520. loader_pool = self._get_snapshot_loader_pool()
  521. if not self._is_loader_in_loader_pool(train_id, loader_pool):
  522. raise TrainJobNotExistError("Can not find the given train job in cache.")
  523. data_loader = loader_pool[train_id].data_loader
  524. tensors = []
  525. try:
  526. events_data = data_loader.get_events_data()
  527. tensors = events_data.tensors(tag)
  528. except KeyError:
  529. error_msg = "Can not find any data in this train job by given tag."
  530. raise ParamValueError(error_msg)
  531. except AttributeError:
  532. logger.debug("Train job %r has been deleted or it has not loaded data, "
  533. "and set tags to empty list.", train_id)
  534. return tensors
  535. def _check_train_job_exist(self, train_id, loader_pool):
  536. """
  537. Check train job exist, if not exist, will raise exception.
  538. Args:
  539. train_id (str): The given train job id.
  540. loader_pool (dict[str, LoaderStruct]): Refer to self._loader_pool.
  541. Raises:
  542. TrainJobNotExistError: Can not find train job in data manager.
  543. """
  544. is_exist = False
  545. if train_id in loader_pool:
  546. return
  547. for generator in self._loader_generators:
  548. if generator.check_train_job_exist(train_id):
  549. is_exist = True
  550. break
  551. if not is_exist:
  552. raise TrainJobNotExistError("Can not find the train job in data manager.")
  553. def _is_loader_in_loader_pool(self, train_id, loader_pool):
  554. """
  555. Check train job exist, if not exist, return False. Else, return True.
  556. Args:
  557. train_id (str): The given train job id.
  558. loader_pool (dict): See self._loader_pool.
  559. Returns:
  560. bool, if loader in loader pool, return True.
  561. """
  562. if train_id in loader_pool:
  563. return True
  564. return False
  565. def _get_snapshot_loader_pool(self):
  566. """
  567. Create a snapshot of data loader pool to avoid concurrent mutation and iteration issues.
  568. Returns:
  569. dict, a copy of `self._loader_pool`.
  570. """
  571. with self._loader_pool_mutex:
  572. return dict(self._loader_pool)
  573. def get_train_job(self, train_id):
  574. """
  575. Get train job by train ID.
  576. This method overrides parent method.
  577. Args:
  578. train_id (str): Train ID for train job.
  579. Returns:
  580. dict, single train job, if can not find any data, will return None.
  581. """
  582. self._check_train_job_exist(train_id, self._loader_pool)
  583. loader = self._get_loader(train_id)
  584. if loader is None:
  585. logger.info("No valid summary log in train job %s, or it is not in the cache.", train_id)
  586. return None
  587. train_job = loader.to_dict()
  588. train_job.pop('data_loader')
  589. plugin_data = {}
  590. for plugin_name in PluginNameEnum.list_members():
  591. job = self.get_train_job_by_plugin(train_id, plugin_name=plugin_name)
  592. if job is None:
  593. plugin_data[plugin_name] = []
  594. else:
  595. plugin_data[plugin_name] = job['tags']
  596. train_job.update({DATAVISUAL_PLUGIN_KEY: plugin_data})
  597. # Will fill basic_info value in future.
  598. train_job_obj = CachedTrainJob(basic_info=None)
  599. train_job_obj.set(DATAVISUAL_CACHE_KEY, train_job)
  600. train_job_obj.cache_status = loader.cache_status
  601. return train_job_obj
  602. def _get_loader(self, train_id):
  603. """
  604. Get loader by train id.
  605. Args:
  606. train_id (str): Train Id.
  607. Returns:
  608. LoaderStruct, the loader.
  609. """
  610. loader = None
  611. with self._loader_pool_mutex:
  612. if self._is_loader_in_loader_pool(train_id, self._loader_pool):
  613. loader = self._loader_pool.get(train_id)
  614. return loader
  615. def _update_loader_latest_update_time(self, loader_id, latest_update_time=None):
  616. """
  617. Update loader with latest_update_time.
  618. Args:
  619. loader_id (str): ID of loader.
  620. latest_update_time (float): Timestamp.
  621. """
  622. if latest_update_time is None:
  623. latest_update_time = time.time()
  624. self._loader_pool[loader_id].latest_update_time = latest_update_time
  625. def get_train_job_by_plugin(self, train_id, plugin_name):
  626. """
  627. Get a train job by train job id.
  628. If the given train job does not has the given plugin data, the tag list will be empty.
  629. Args:
  630. train_id (str): Get train job info by the given id.
  631. plugin_name (str): Get tags by given plugin.
  632. Returns:
  633. TypedDict('TrainJobEntity', {'id': str, 'name': str, 'tags': List[str]}),
  634. a train job object.
  635. """
  636. self._check_train_job_exist(train_id, self._loader_pool)
  637. loader = self._get_loader(train_id)
  638. if loader is None:
  639. logger.warning("No valid summary log in train job %s, "
  640. "or it is not in the cache.", train_id)
  641. return None
  642. name = loader.name
  643. data_loader = loader.data_loader
  644. tags = []
  645. try:
  646. events_data = data_loader.get_events_data()
  647. tags = events_data.list_tags_by_plugin(plugin_name)
  648. except KeyError:
  649. logger.debug("Plugin name %r does not exist "
  650. "in train job %r, and set tags to empty list.", plugin_name, name)
  651. except AttributeError:
  652. logger.debug("Train job %r has been deleted or it has not loaded data, "
  653. "and set tags to empty list.", name)
  654. result = dict(id=train_id, name=name, tags=tags)
  655. return result
  656. class DataManager:
  657. """
  658. DataManager manages a pool of loader which help access events data.
  659. Each loader helps deal the data of the events.
  660. A loader corresponds to an events_data.
  661. The DataManager build a pool including all the data_loader.
  662. The data_loader provides extracting
  663. method to get the information of events.
  664. """
  665. def __init__(self, summary_base_dir):
  666. """
  667. Initialize the pool of loader and the dict of name-to-path.
  668. Args:
  669. summary_base_dir (str): Base summary directory.
  670. self._status: Refer `datavisual.common.enums.DataManagerStatus`.
  671. """
  672. self._summary_base_dir = os.path.realpath(summary_base_dir)
  673. self._status = DataManagerStatus.INIT.value
  674. self._status_mutex = threading.Lock()
  675. self._reload_interval = 3
  676. loader_generators = [DataLoaderGenerator(self._summary_base_dir)]
  677. self._detail_cache = _DetailCacheManager(loader_generators)
  678. self._brief_cache = _BriefCacheManager()
  679. # This lock is used to make sure that only one self._load_data_in_thread() is running.
  680. # Because self._load_data_in_thread() will create process pool when loading files, we can not
  681. # afford to run multiple self._load_data_in_thread() simultaneously (will create too many processes).
  682. self._load_data_lock = threading.Lock()
  683. @property
  684. def summary_base_dir(self):
  685. """Get summary base dir."""
  686. return self._summary_base_dir
  687. def start_load_data(self,
  688. reload_interval=settings.RELOAD_INTERVAL,
  689. max_threads_count=MAX_DATA_LOADER_SIZE):
  690. """
  691. Start threads for loading data.
  692. Args:
  693. reload_interval (int): Time to reload data once.
  694. max_threads_count (int): Max number of threads of execution.
  695. """
  696. logger.info("Start to load data, reload_interval: %s, "
  697. "max_threads_count: %s.", reload_interval, max_threads_count)
  698. DataManager.check_reload_interval(reload_interval)
  699. DataManager.check_max_threads_count(max_threads_count)
  700. self._reload_interval = reload_interval
  701. self._max_threads_count = max_threads_count
  702. thread = threading.Thread(target=self._reload_data_in_thread,
  703. name='start_load_data_thread')
  704. thread.daemon = True
  705. thread.start()
  706. def _reload_data_in_thread(self):
  707. """This function periodically loads the data."""
  708. # Let gunicorn load other modules first.
  709. time.sleep(1)
  710. while True:
  711. self._load_data_in_thread_wrapper()
  712. if not self._reload_interval:
  713. break
  714. time.sleep(self._reload_interval)
  715. def reload_data(self):
  716. """
  717. Reload the data once.
  718. This function needs to be used after `start_load_data` function.
  719. """
  720. logger.debug("start to reload data")
  721. thread = threading.Thread(target=self._load_data_in_thread_wrapper,
  722. name='reload_data_thread')
  723. thread.daemon = False
  724. thread.start()
  725. def _load_data_in_thread_wrapper(self):
  726. """Wrapper for load data in thread."""
  727. try:
  728. with self._load_data_lock:
  729. exception_wrapper(self._load_data())
  730. except UnknownError as exc:
  731. # Not raising the exception here to ensure that data reloading does not crash.
  732. logger.warning(exc.message)
  733. def _load_data(self):
  734. """This function will load data once and ignore it if the status is loading."""
  735. logger.info("Start to load data, reload interval: %r.", self._reload_interval)
  736. with self._status_mutex:
  737. if self.status == DataManagerStatus.LOADING.value:
  738. logger.debug("Current status is %s , will ignore to load data.", self.status)
  739. return
  740. self.status = DataManagerStatus.LOADING.value
  741. summaries_info = SummaryWatcher().list_summary_directories(self._summary_base_dir)
  742. basic_train_jobs = []
  743. for info in summaries_info:
  744. profiler = info['profiler']
  745. basic_train_jobs.append(_BasicTrainJob(
  746. train_id=info['relative_path'],
  747. abs_summary_base_dir=self._summary_base_dir,
  748. abs_summary_dir=os.path.realpath(os.path.join(
  749. self._summary_base_dir,
  750. info['relative_path']
  751. )),
  752. create_time=info['create_time'],
  753. update_time=info['update_time'],
  754. profiler_dir=None if profiler is None else profiler['directory'],
  755. ))
  756. self._brief_cache.update_cache(basic_train_jobs)
  757. self._detail_cache.update_cache(basic_train_jobs)
  758. if not self._brief_cache.has_content() and not self._detail_cache.has_content() \
  759. and self._detail_cache.status == DetailCacheManagerStatus.DONE.value:
  760. self.status = DataManagerStatus.INVALID.value
  761. else:
  762. self.status = DataManagerStatus.DONE.value
  763. logger.info("Load brief data end, and loader pool size is %r.", self._detail_cache.loader_pool_size())
  764. @staticmethod
  765. def check_reload_interval(reload_interval):
  766. """
  767. Check reload interval is valid.
  768. Args:
  769. reload_interval (int): Reload interval >= 0.
  770. """
  771. if not isinstance(reload_interval, int):
  772. raise ParamValueError("The value of reload interval should be integer.")
  773. if reload_interval < 0:
  774. raise ParamValueError("The value of reload interval should be >= 0.")
  775. @staticmethod
  776. def check_max_threads_count(max_threads_count):
  777. """
  778. Threads count should be a integer, and should > 0.
  779. Args:
  780. max_threads_count (int), should > 0.
  781. """
  782. if not isinstance(max_threads_count, int):
  783. raise ParamValueError("The value of max threads count should be integer.")
  784. if max_threads_count <= 0:
  785. raise ParamValueError("The value of max threads count should be > 0.")
  786. def get_train_job_by_plugin(self, train_id, plugin_name):
  787. """
  788. Get a train job by train job id.
  789. If the given train job does not has the given plugin data, the tag list will be empty.
  790. Args:
  791. train_id (str): Get train job info by the given id.
  792. plugin_name (str): Get tags by given plugin.
  793. Returns:
  794. TypedDict('TrainJobEntity', {'id': str, 'name': str, 'tags': List[str]}),
  795. a train job object.
  796. """
  797. self._check_status_valid()
  798. return self._detail_cache.get_train_job_by_plugin(train_id, plugin_name)
  799. def delete_train_job(self, train_id, only_delete_from_cache=True):
  800. """
  801. Delete train job with a train id.
  802. Args:
  803. train_id (str): ID for train job.
  804. """
  805. if not only_delete_from_cache:
  806. raise NotImplementedError("Delete from both cache and disk is not supported.")
  807. self._brief_cache.delete_train_job(train_id)
  808. self._detail_cache.delete_train_job(train_id)
  809. def list_tensors(self, train_id, tag):
  810. """
  811. List tensors of the given train job and tag.
  812. If the tensor can not find by the given tag, will raise exception.
  813. Args:
  814. train_id (str): ID for train job.
  815. tag (str): The tag name.
  816. Returns:
  817. NamedTuple, the tuple format is `collections.namedtuple('_Tensor', ['wall_time', 'event_step', 'value'])`.
  818. the value will contain the given tag data.
  819. """
  820. self._check_status_valid()
  821. return self._detail_cache.list_tensors(train_id, tag)
  822. def _check_status_valid(self):
  823. """Check if the status is valid to load data."""
  824. if self.status == DataManagerStatus.INIT.value:
  825. raise exceptions.SummaryLogIsLoading("Data is being loaded, current status: %s." % self._status)
  826. def get_train_job(self, train_id):
  827. """
  828. Get train job by train ID.
  829. Args:
  830. train_id (str): Train ID for train job.
  831. Returns:
  832. dict, single train job, if can not find any data, will return None.
  833. """
  834. self._check_status_valid()
  835. detail_train_job = self._detail_cache.get_train_job(train_id)
  836. brief_train_job = self._brief_cache.get_train_job(train_id)
  837. return TrainJob(brief_train_job, detail_train_job)
  838. @property
  839. def status(self):
  840. """
  841. Get the status of data manager.
  842. Returns:
  843. DataManagerStatus, the status of data manager.
  844. """
  845. return self._status
  846. @status.setter
  847. def status(self, status):
  848. """Set data manger status."""
  849. self._status = status
  850. def cache_train_job(self, train_id):
  851. """Cache given train job (async)."""
  852. brief_need_reload = self._brief_cache.cache_train_job(train_id)
  853. detail_need_reload = self._detail_cache.cache_train_job(train_id)
  854. if brief_need_reload or detail_need_reload:
  855. self.reload_data()
  856. def register_brief_cache_item_updater(self, updater: BaseCacheItemUpdater):
  857. """Register brief cache item updater for brief cache manager."""
  858. self._brief_cache.register_cache_item_updater(updater)
  859. def get_brief_cache(self):
  860. """Get brief cache."""
  861. return self._brief_cache
  862. def get_brief_train_job(self, train_id):
  863. """Get brief train job."""
  864. return self._brief_cache.get_train_job(train_id)
  865. def get_detail_cache_status(self):
  866. """Get detail status, just for ut/st."""
  867. return self._detail_cache.status
# Module-level DataManager instance constructed with settings.SUMMARY_BASE_DIR.
DATA_MANAGER = DataManager(settings.SUMMARY_BASE_DIR)