# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Summary watcher module."""
import os
import re
import datetime
from pathlib import Path

from mindinsight.datavisual.common.log import logger
from mindinsight.datavisual.common.validation import Validation
from mindinsight.datavisual.utils.tools import Counter
from mindinsight.datavisual.utils.utils import contains_null_byte
from mindinsight.datavisual.common.exceptions import MaxCountExceededError
from mindinsight.utils.exceptions import FileSystemPermissionError

LINEAGE_SUMMARY_SUFFIX = '_lineage'


class SummaryWatcher:
    """SummaryWatcher class."""

    # The named group "timestamp" is read back via the match object when
    # deriving the creation time of a summary file from its filename.
    SUMMARY_FILENAME_REGEX = r'summary\.(?P<timestamp>\d+)'
    PB_FILENAME_REGEX = r'\.pb$'
    PROFILER_DIRECTORY_REGEX = r'^profiler$'
    MAX_SUMMARY_DIR_COUNT = 999

    # scan at most 20000 files/directories (approximately 1 seconds)
    # if overall is False in SummaryWatcher.list_summary_directories
    # to avoid long-time blocking
    MAX_SCAN_COUNT = 20000

    def list_summary_directories(self, summary_base_dir, overall=True):
        """
        List summary directories within base directory.

        Only the base directory itself and its direct subdirectories are scanned.
        Symbolic links are skipped.

        Args:
            summary_base_dir (str): Path of summary base directory.
            overall (bool): Limit the total num of scanning if overall is False.

        Returns:
            list, list of summary directory info, each of which including the following attributes.
                - relative_path (str): Relative path of summary directory, referring to settings.SUMMARY_BASE_DIR,
                                        starting with "./".
                - create_time (datetime): Creation time of summary file.
                - update_time (datetime): Modification time of summary file.
                - summary_files (int): Number of summary files found.
                - lineage_files (int): Number of lineage summary files found.
                - graph_files (int): Number of pb files found.
                - profiler (dict): profiler info, including profiler subdirectory path, profiler creation time and
                                    profiler modification time.

        Raises:
            FileSystemPermissionError, if the summary base directory is not accessible.

        Examples:
            >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
            >>> summary_watcher = SummaryWatcher()
            >>> directories = summary_watcher.list_summary_directories('/summary/base/dir')
        """
        if contains_null_byte(summary_base_dir=summary_base_dir):
            return []

        relative_path = os.path.join('.', '')
        if not self._is_valid_summary_directory(summary_base_dir, relative_path):
            return []

        summary_dict = {}
        counter = Counter(max_count=None if overall else self.MAX_SCAN_COUNT)

        try:
            entries = os.scandir(summary_base_dir)
        except PermissionError as err:
            logger.error('Path of summary base directory is not accessible.')
            raise FileSystemPermissionError('Path of summary base directory is not accessible.') from err

        # The scandir iterator holds an OS directory handle; the context manager
        # releases it even when the loop stops early.
        with entries:
            for entry in entries:
                if len(summary_dict) == self.MAX_SUMMARY_DIR_COUNT:
                    break
                try:
                    counter.add()
                except MaxCountExceededError:
                    logger.info('Stop further scanning due to overall is False and '
                                'number of scanned files exceeds upper limit.')
                    break
                if entry.is_symlink():
                    continue
                if entry.is_file():
                    self._update_summary_dict(summary_dict, summary_base_dir, relative_path, entry)
                elif entry.is_dir():
                    entry_path = os.path.realpath(os.path.join(summary_base_dir, entry.name))
                    self._scan_subdir_entries(summary_dict, summary_base_dir, entry_path, entry.name, counter)

        directories = [
            {'relative_path': key, **value}
            for key, value in summary_dict.items()
        ]

        # sort by update time in descending order and relative path in ascending order
        directories.sort(key=lambda x: (-int(x['update_time'].timestamp()), x['relative_path']))

        return directories

    def _scan_subdir_entries(self, summary_dict, summary_base_dir, entry_path, entry_name, counter):
        """
        Scan subdir entries.

        Args:
            summary_dict (dict): Temporary data structure to hold summary directory info.
            summary_base_dir (str): Path of summary base directory.
            entry_path (str): Real path of the subdirectory to scan.
            entry_name (str): Name of the subdirectory, relative to the summary base directory.
            counter (Counter): Scan counter shared with the caller; scanning stops once
                it raises MaxCountExceededError.
        """
        try:
            subdir_entries = os.scandir(entry_path)
        except PermissionError:
            logger.warning('Path of %s under summary base directory is not accessible.', entry_name)
            return

        # Identical for every entry of this subdirectory, so compute it once.
        subdir_relative_path = os.path.join('.', entry_name)

        with subdir_entries:
            for subdir_entry in subdir_entries:
                if len(summary_dict) == self.MAX_SUMMARY_DIR_COUNT:
                    break
                try:
                    counter.add()
                except MaxCountExceededError:
                    logger.info('Stop further scanning due to overall is False and '
                                'number of scanned files exceeds upper limit.')
                    break
                # Skip symbolic links, consistent with every other scan in this class.
                if subdir_entry.is_symlink():
                    continue
                self._update_summary_dict(summary_dict, summary_base_dir, subdir_relative_path, subdir_entry)

    def _is_valid_summary_directory(self, summary_base_dir, relative_path):
        """
        Check if the given summary directory is valid.

        The directory is valid when it exists, is a directory and, after resolving
        symbolic links, is located inside the summary base directory.

        Args:
            summary_base_dir (str): Path of summary base directory.
            relative_path (str): Relative path of summary directory, referring to summary base directory,
                                starting with "./" .

        Returns:
            bool, indicates if summary directory is valid.
        """
        summary_base_dir = os.path.realpath(summary_base_dir)
        summary_directory = os.path.realpath(os.path.join(summary_base_dir, relative_path))

        if not os.path.exists(summary_directory):
            logger.warning('Path of summary directory not exists.')
            return False

        if not os.path.isdir(summary_directory):
            logger.warning('Path of summary directory is not a valid directory.')
            return False

        try:
            # relative_to raises ValueError when the resolved directory is outside the base directory.
            Path(summary_directory).relative_to(Path(summary_base_dir))
        except ValueError:
            logger.warning('Relative path %s is not subdirectory of summary_base_dir', relative_path)
            return False

        return True

    def _update_summary_dict(self, summary_dict, summary_base_dir, relative_path, entry):
        """
        Update summary_dict with ctime and mtime.

        Files matching the summary or pb filename pattern update the file counters
        and the create/update time of the directory; a valid profiler directory
        is recorded under the 'profiler' key.

        Args:
            summary_dict (dict): Temporary data structure to hold summary directory info.
            summary_base_dir (str): Path of summary base directory.
            relative_path (str): Relative path of summary directory, referring to summary base directory,
                                starting with "./" .
            entry (DirEntry): Directory entry instance needed to check with regular expression.
        """
        try:
            stat = entry.stat()
        except FileNotFoundError:
            logger.warning('File %s not found', entry.name)
            return

        ctime = datetime.datetime.fromtimestamp(stat.st_ctime).astimezone()
        mtime = datetime.datetime.fromtimestamp(stat.st_mtime).astimezone()

        if entry.is_file():
            summary_pattern = re.search(self.SUMMARY_FILENAME_REGEX, entry.name)
            pb_pattern = re.search(self.PB_FILENAME_REGEX, entry.name)
            if summary_pattern is None and pb_pattern is None:
                return

            if summary_pattern is not None:
                # extract created time from filename
                ctime = _datetime_from_timestamp(int(summary_pattern.group('timestamp')))
                if ctime is None:
                    return

            if relative_path not in summary_dict:
                summary_dict[relative_path] = _new_entry(ctime, mtime)

            # Keep the times of the most recently created file of the directory.
            if summary_dict[relative_path]['create_time'] < ctime:
                summary_dict[relative_path].update({
                    'create_time': ctime,
                    'update_time': mtime,
                })

            if not summary_pattern:
                summary_dict[relative_path]['graph_files'] += 1
            elif entry.name.endswith(LINEAGE_SUMMARY_SUFFIX):
                summary_dict[relative_path]['lineage_files'] += 1
            else:
                summary_dict[relative_path]['summary_files'] += 1
        elif entry.is_dir():
            # Check the cheap name pattern first so that the profiler directory
            # is only analysed when the name matches.
            if re.search(self.PROFILER_DIRECTORY_REGEX, entry.name) is None:
                return

            full_dir_path = os.path.join(summary_base_dir, relative_path, entry.name)
            is_valid_profiler_dir, profiler_type = self._is_valid_profiler_directory(full_dir_path)
            if not is_valid_profiler_dir:
                return

            profiler = {
                'directory': os.path.join('.', entry.name),
                'create_time': ctime,
                'update_time': mtime,
                "profiler_type": profiler_type
            }

            if relative_path in summary_dict:
                summary_dict[relative_path]['profiler'] = profiler
            else:
                summary_dict[relative_path] = _new_entry(ctime, mtime, profiler)

    def is_summary_directory(self, summary_base_dir, relative_path):
        """
        Check if the given summary directory is valid.

        A directory qualifies when it directly contains a summary file, a pb file
        or a valid profiler directory. Symbolic links are ignored.

        Args:
            summary_base_dir (str): Path of summary base directory.
            relative_path (str): Relative path of summary directory, referring to summary base directory,
                                starting with "./" .

        Returns:
            bool, indicates if the given summary directory is valid.

        Raises:
            FileSystemPermissionError, if the summary directory is not accessible.

        Examples:
            >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
            >>> summary_watcher = SummaryWatcher()
            >>> summaries = summary_watcher.is_summary_directory('/summary/base/dir', './job-01')
        """
        if contains_null_byte(summary_base_dir=summary_base_dir, relative_path=relative_path):
            return False

        if not self._is_valid_summary_directory(summary_base_dir, relative_path):
            return False

        summary_directory = os.path.realpath(os.path.join(summary_base_dir, relative_path))
        try:
            entries = os.scandir(summary_directory)
        except PermissionError as err:
            logger.error('Path of summary base directory is not accessible.')
            raise FileSystemPermissionError('Path of summary base directory is not accessible.') from err

        # Close the directory handle even though the loop returns on the first hit.
        with entries:
            for entry in entries:
                if entry.is_symlink():
                    continue

                summary_pattern = re.search(self.SUMMARY_FILENAME_REGEX, entry.name)
                if summary_pattern is not None and entry.is_file():
                    return True

                pb_pattern = re.search(self.PB_FILENAME_REGEX, entry.name)
                if pb_pattern is not None and entry.is_file():
                    return True

                profiler_pattern = re.search(self.PROFILER_DIRECTORY_REGEX, entry.name)
                if profiler_pattern is not None and entry.is_dir():
                    full_path = os.path.realpath(os.path.join(summary_directory, entry.name))
                    if self._is_valid_profiler_directory(full_path)[0]:
                        return True

        return False

    def _is_valid_profiler_directory(self, directory):
        """
        Check if the given directory is a valid profiler directory.

        Args:
            directory (str): Path of the candidate profiler directory.

        Returns:
            tuple[bool, str], whether a non-empty device list was found for the directory, and the
            profiler type (empty string if the profiler module cannot be imported).
        """
        profiler_type = ""
        try:
            # Imported lazily; when the profiler module is unavailable the
            # directory is reported as not valid.
            from mindinsight.profiler.common.util import analyse_device_list_from_profiler_dir
            device_list, profiler_type = analyse_device_list_from_profiler_dir(directory)
        except ImportError:
            device_list = []

        return bool(device_list), profiler_type

    def list_summary_directories_by_pagination(self, summary_base_dir, offset=0, limit=10):
        """
        List summary directories within base directory.

        Args:
            summary_base_dir (str): Path of summary base directory.
            offset (int): An offset for page. Ex, offset is 0, mean current page is 1. Default value is 0.
            limit (int): The max data items for per page. Default value is 10.

        Returns:
            tuple[total, directories], total indicates the overall number of summary directories and directories
                    indicate list of summary directory info including the following attributes.
                - relative_path (str): Relative path of summary directory, referring to settings.SUMMARY_BASE_DIR,
                                        starting with "./".
                - create_time (datetime): Creation time of summary file.
                - update_time (datetime): Modification time of summary file.

        Raises:
            ParamValueError, if offset < 0 or limit is out of valid value range.
            ParamTypeError, if offset or limit is not valid integer.

        Examples:
            >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
            >>> summary_watcher = SummaryWatcher()
            >>> total, directories = summary_watcher.list_summary_directories_by_pagination(
            ...     '/summary/base/dir', offset=0, limit=10)
        """
        offset = Validation.check_offset(offset=offset)
        limit = Validation.check_limit(limit, min_value=1, max_value=999)

        # overall=False bounds the number of scanned entries to avoid long blocking.
        directories = self.list_summary_directories(summary_base_dir, overall=False)
        return len(directories), directories[offset * limit:(offset + 1) * limit]

    def list_summaries(self, summary_base_dir, relative_path='./'):
        """
        Get info of latest summary file within the given summary directory.

        Args:
            summary_base_dir (str): Path of summary base directory.
            relative_path (str): Relative path of summary directory, referring to summary base directory,
                                starting with "./" .

        Returns:
            list, list of summary file including the following attributes.
                - file_name (str): Summary file name.
                - create_time (datetime): Creation time of summary file.
                - update_time (datetime): Modification time of summary file.

        Raises:
            FileSystemPermissionError, if the summary directory is not accessible.

        Examples:
            >>> from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher
            >>> summary_watcher = SummaryWatcher()
            >>> summaries = summary_watcher.list_summaries('/summary/base/dir', './job-01')
        """
        if contains_null_byte(summary_base_dir=summary_base_dir, relative_path=relative_path):
            return []

        if not self._is_valid_summary_directory(summary_base_dir, relative_path):
            return []

        summaries = []
        summary_directory = os.path.realpath(os.path.join(summary_base_dir, relative_path))
        try:
            entries = os.scandir(summary_directory)
        except PermissionError as err:
            logger.error('Path of summary directory is not accessible.')
            raise FileSystemPermissionError('Path of summary directory is not accessible.') from err

        with entries:
            for entry in entries:
                if entry.is_symlink() or not entry.is_file():
                    continue

                pattern = re.search(self.SUMMARY_FILENAME_REGEX, entry.name)
                if pattern is None:
                    continue

                # extract created time from filename
                ctime = _datetime_from_timestamp(int(pattern.group('timestamp')))
                if ctime is None:
                    continue

                try:
                    stat = entry.stat()
                except FileNotFoundError:
                    logger.warning('File %s not found.', entry.name)
                    continue

                mtime = datetime.datetime.fromtimestamp(stat.st_mtime).astimezone()

                summaries.append({
                    'file_name': entry.name,
                    'create_time': ctime,
                    'update_time': mtime,
                })

        # sort by update time in descending order and filename in ascending order
        summaries.sort(key=lambda x: (-int(x['update_time'].timestamp()), x['file_name']))

        return summaries


def _datetime_from_timestamp(timestamp):
    """
    Convert a timestamp taken from a filename into a timezone-aware datetime.

    Args:
        timestamp (int): Seconds since the epoch.

    Returns:
        datetime or None, None if the timestamp cannot be converted
        (out of range for the platform or for datetime).
    """
    try:
        return datetime.datetime.fromtimestamp(timestamp).astimezone()
    except (OverflowError, ValueError, OSError):
        # Filenames are arbitrary input; an unrepresentable timestamp must not abort the scan.
        return None


def _new_entry(ctime, mtime, profiler=None):
    """
    Create a new entry.

    Args:
        ctime (datetime): Creation time stored as 'create_time'.
        mtime (datetime): Modification time stored as 'update_time'.
        profiler (dict): Profiler info stored as 'profiler'. Default value is None.

    Returns:
        dict, summary directory info with all file counters set to 0.
    """
    return {
        'create_time': ctime,
        'update_time': mtime,
        'summary_files': 0,
        'lineage_files': 0,
        'graph_files': 0,
        'profiler': profiler
    }