You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

summary_watcher.py 16 kB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """Summary watcher module."""
  16. import os
  17. import re
  18. import datetime
  19. from pathlib import Path
  20. from mindinsight.datavisual.common.log import logger
  21. from mindinsight.datavisual.common.validation import Validation
  22. from mindinsight.datavisual.utils.tools import Counter
  23. from mindinsight.utils.exceptions import ParamValueError
  24. from mindinsight.utils.exceptions import FileSystemPermissionError
  25. class SummaryWatcher:
  26. """SummaryWatcher class."""
  27. SUMMARY_FILENAME_REGEX = r'summary\.(?P<timestamp>\d+)'
  28. PB_FILENAME_REGEX = r'\.pb$'
  29. MAX_SUMMARY_DIR_COUNT = 999
  30. # scan at most 20000 files/directories (approximately 1 seconds)
  31. # if overall=False in SummaryWatcher.list_summary_directories
  32. # to avoid long-time blocking
  33. MAX_SCAN_COUNT = 20000
  34. def list_summary_directories(self, summary_base_dir, overall=True):
  35. """
  36. List summary directories within base directory.
  37. Args:
  38. summary_base_dir (str): Path of summary base directory.
  39. overall (bool): Limit the total num of scanning if overall is False.
  40. Returns:
  41. list, list of summary directory info, each of which including the following attributes.
  42. - relative_path (str): Relative path of summary directory, referring to settings.SUMMARY_BASE_DIR,
  43. starting with "./".
  44. - create_time (datetime): Creation time of summary file.
  45. - update_time (datetime): Modification time of summary file.
  46. Examples:
  47. >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  48. >>> summary_watcher = SummaryWatcher()
  49. >>> directories = summary_watcher.list_summary_directories('/summary/base/dir')
  50. """
  51. if self._contains_null_byte(summary_base_dir=summary_base_dir):
  52. return []
  53. if not os.path.exists(summary_base_dir):
  54. logger.warning('Path of summary base directory not exists.')
  55. return []
  56. if not os.path.isdir(summary_base_dir):
  57. logger.warning('Path of summary base directory is not a valid directory.')
  58. return []
  59. summary_dict = {}
  60. if not overall:
  61. counter = Counter(max_count=self.MAX_SCAN_COUNT)
  62. else:
  63. counter = Counter()
  64. try:
  65. entries = os.scandir(summary_base_dir)
  66. except PermissionError:
  67. logger.error('Path of summary base directory is not accessible.')
  68. raise FileSystemPermissionError('Path of summary base directory is not accessible.')
  69. for entry in entries:
  70. if len(summary_dict) == self.MAX_SUMMARY_DIR_COUNT:
  71. break
  72. try:
  73. counter.add()
  74. except ParamValueError:
  75. logger.info('Stop further scanning due to overall is False and '
  76. 'number of scanned files exceeds upper limit.')
  77. break
  78. relative_path = os.path.join('.', '')
  79. if entry.is_symlink():
  80. pass
  81. elif entry.is_file():
  82. self._update_summary_dict(summary_dict, relative_path, entry)
  83. elif entry.is_dir():
  84. full_path = os.path.realpath(os.path.join(summary_base_dir, entry.name))
  85. try:
  86. subdir_entries = os.scandir(full_path)
  87. except PermissionError:
  88. logger.warning('Path of %s under summary base directory is not accessible.', entry.name)
  89. continue
  90. self._scan_subdir_entries(summary_dict, subdir_entries, entry.name, counter)
  91. directories = [{
  92. 'relative_path': key,
  93. 'create_time': value['ctime'],
  94. 'update_time': value['mtime'],
  95. } for key, value in summary_dict.items()]
  96. # sort by update time in descending order and relative path in ascending order
  97. directories.sort(key=lambda x: (-int(x['update_time'].timestamp()), x['relative_path']))
  98. return directories
  99. def _scan_subdir_entries(self, summary_dict, subdir_entries, entry_name, counter):
  100. """
  101. Scan subdir entries.
  102. Args:
  103. summary_dict (dict): Temporary data structure to hold summary directory info.
  104. subdir_entries(DirEntry): Directory entry instance.
  105. entry_name (str): Name of entry.
  106. counter (Counter): An instance of CountLimiter.
  107. """
  108. for subdir_entry in subdir_entries:
  109. if len(summary_dict) == self.MAX_SUMMARY_DIR_COUNT:
  110. break
  111. try:
  112. counter.add()
  113. except ParamValueError:
  114. logger.info('Stop further scanning due to overall is False and '
  115. 'number of scanned files exceeds upper limit.')
  116. break
  117. subdir_relative_path = os.path.join('.', entry_name)
  118. if subdir_entry.is_symlink():
  119. pass
  120. elif subdir_entry.is_file():
  121. self._update_summary_dict(summary_dict, subdir_relative_path, subdir_entry)
  122. def _contains_null_byte(self, **kwargs):
  123. """
  124. Check if arg contains null byte.
  125. Args:
  126. kwargs (Any): Check if arg contains null byte.
  127. Returns:
  128. bool, indicates if any arg contains null byte.
  129. """
  130. for key, value in kwargs.items():
  131. if not isinstance(value, str):
  132. continue
  133. if '\x00' in value:
  134. logger.warning('%s contains null byte \\x00.', key)
  135. return True
  136. return False
  137. def _is_valid_summary_directory(self, summary_base_dir, relative_path):
  138. """
  139. Check if the given summary directory is valid.
  140. Args:
  141. summary_base_dir (str): Path of summary base directory.
  142. relative_path (str): Relative path of summary directory, referring to summary base directory,
  143. starting with "./" .
  144. Returns:
  145. bool, indicates if summary directory is valid.
  146. """
  147. summary_base_dir = os.path.realpath(summary_base_dir)
  148. summary_directory = os.path.realpath(os.path.join(summary_base_dir, relative_path))
  149. if summary_base_dir == summary_directory:
  150. return True
  151. if not os.path.exists(summary_directory):
  152. logger.warning('Path of summary directory not exists.')
  153. return False
  154. if not os.path.isdir(summary_directory):
  155. logger.warning('Path of summary directory is not a valid directory.')
  156. return False
  157. try:
  158. Path(summary_directory).relative_to(Path(summary_base_dir))
  159. except ValueError:
  160. logger.warning('Relative path %s is not subdirectory of summary_base_dir', relative_path)
  161. return False
  162. return True
  163. def _update_summary_dict(self, summary_dict, relative_path, entry):
  164. """
  165. Update summary_dict with ctime and mtime.
  166. Args:
  167. summary_dict (dict): Temporary data structure to hold summary directory info.
  168. relative_path (str): Relative path of summary directory, referring to summary base directory,
  169. starting with "./" .
  170. entry (DirEntry): Directory entry instance needed to check with regular expression.
  171. """
  172. summary_pattern = re.search(self.SUMMARY_FILENAME_REGEX, entry.name)
  173. pb_pattern = re.search(self.PB_FILENAME_REGEX, entry.name)
  174. if summary_pattern is None and pb_pattern is None:
  175. return
  176. if summary_pattern is not None:
  177. timestamp = int(summary_pattern.groupdict().get('timestamp'))
  178. try:
  179. # extract created time from filename
  180. ctime = datetime.datetime.fromtimestamp(timestamp).astimezone()
  181. except OverflowError:
  182. return
  183. else:
  184. ctime = datetime.datetime.fromtimestamp(entry.stat().st_ctime).astimezone()
  185. # extract modified time from filesystem
  186. mtime = datetime.datetime.fromtimestamp(entry.stat().st_mtime).astimezone()
  187. if relative_path not in summary_dict or summary_dict[relative_path]['ctime'] < ctime:
  188. summary_dict[relative_path] = {
  189. 'ctime': ctime,
  190. 'mtime': mtime,
  191. }
  192. def is_summary_directory(self, summary_base_dir, relative_path):
  193. """
  194. Check if the given summary directory is valid.
  195. Args:
  196. summary_base_dir (str): Path of summary base directory.
  197. relative_path (str): Relative path of summary directory, referring to summary base directory,
  198. starting with "./" .
  199. Returns:
  200. bool, indicates if the given summary directory is valid.
  201. Examples:
  202. >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  203. >>> summary_watcher = SummaryWatcher()
  204. >>> summaries = summary_watcher.is_summary_directory('/summary/base/dir', './job-01')
  205. """
  206. if self._contains_null_byte(summary_base_dir=summary_base_dir, relative_path=relative_path):
  207. return False
  208. if not self._is_valid_summary_directory(summary_base_dir, relative_path):
  209. return False
  210. summary_directory = os.path.realpath(os.path.join(summary_base_dir, relative_path))
  211. try:
  212. entries = os.scandir(summary_directory)
  213. except PermissionError:
  214. logger.error('Path of summary base directory is not accessible.')
  215. raise FileSystemPermissionError('Path of summary base directory is not accessible.')
  216. for entry in entries:
  217. if entry.is_symlink() or not entry.is_file():
  218. continue
  219. summary_pattern = re.search(self.SUMMARY_FILENAME_REGEX, entry.name)
  220. pb_pattern = re.search(self.PB_FILENAME_REGEX, entry.name)
  221. if summary_pattern or pb_pattern:
  222. return True
  223. return False
  224. def list_summary_directories_by_pagination(self, summary_base_dir, offset=0, limit=10):
  225. """
  226. List summary directories within base directory.
  227. Args:
  228. summary_base_dir (str): Path of summary base directory.
  229. offset (int): An offset for page. Ex, offset is 0, mean current page is 1. Default value is 0.
  230. limit (int): The max data items for per page. Default value is 10.
  231. Returns:
  232. tuple[total, directories], total indicates the overall number of summary directories and directories
  233. indicate list of summary directory info including the following attributes.
  234. - relative_path (str): Relative path of summary directory, referring to settings.SUMMARY_BASE_DIR,
  235. starting with "./".
  236. - create_time (datetime): Creation time of summary file.
  237. - update_time (datetime): Modification time of summary file.
  238. Raises:
  239. ParamValueError, if offset < 0 or limit is out of valid value range.
  240. ParamTypeError, if offset or limit is not valid integer.
  241. Examples:
  242. >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  243. >>> summary_watcher = SummaryWatcher()
  244. >>> total, directories = summary_watcher.list_summary_directories_by_pagination(
  245. '/summary/base/dir', offset=0, limit=10)
  246. """
  247. offset = Validation.check_offset(offset=offset)
  248. limit = Validation.check_limit(limit, min_value=1, max_value=999)
  249. directories = self.list_summary_directories(summary_base_dir, overall=False)
  250. return len(directories), directories[offset * limit:(offset + 1) * limit]
  251. def list_summaries(self, summary_base_dir, relative_path='./'):
  252. """
  253. Get info of latest summary file within the given summary directory.
  254. Args:
  255. summary_base_dir (str): Path of summary base directory.
  256. relative_path (str): Relative path of summary directory, referring to summary base directory,
  257. starting with "./" .
  258. Returns:
  259. list, list of summary file including the following attributes.
  260. - file_name (str): Summary file name.
  261. - create_time (datetime): Creation time of summary file.
  262. - update_time (datetime): Modification time of summary file.
  263. Examples:
  264. >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
  265. >>> summary_watcher = SummaryWatcher()
  266. >>> summaries = summary_watcher.list_summaries('/summary/base/dir', './job-01')
  267. """
  268. if self._contains_null_byte(summary_base_dir=summary_base_dir, relative_path=relative_path):
  269. return []
  270. if not self._is_valid_summary_directory(summary_base_dir, relative_path):
  271. return []
  272. summaries = []
  273. summary_directory = os.path.realpath(os.path.join(summary_base_dir, relative_path))
  274. try:
  275. entries = os.scandir(summary_directory)
  276. except PermissionError:
  277. logger.error('Path of summary directory is not accessible.')
  278. raise FileSystemPermissionError('Path of summary directory is not accessible.')
  279. for entry in entries:
  280. if entry.is_symlink() or not entry.is_file():
  281. continue
  282. pattern = re.search(self.SUMMARY_FILENAME_REGEX, entry.name)
  283. if pattern is None:
  284. continue
  285. timestamp = int(pattern.groupdict().get('timestamp'))
  286. try:
  287. # extract created time from filename
  288. ctime = datetime.datetime.fromtimestamp(timestamp).astimezone()
  289. except OverflowError:
  290. continue
  291. # extract modified time from filesystem
  292. mtime = datetime.datetime.fromtimestamp(entry.stat().st_mtime).astimezone()
  293. summaries.append({
  294. 'file_name': entry.name,
  295. 'create_time': ctime,
  296. 'update_time': mtime,
  297. })
  298. # sort by update time in descending order and filename in ascending order
  299. summaries.sort(key=lambda x: (-int(x['update_time'].timestamp()), x['file_name']))
  300. return summaries

MindInsight为MindSpore提供了简单易用的调优调试能力。在训练过程中,可以将标量、张量、图像、计算图、模型超参、训练耗时等数据记录到文件中,通过MindInsight可视化页面进行查看及分析。