diff --git a/mindinsight/datavisual/data_transform/summary_watcher.py b/mindinsight/datavisual/data_transform/summary_watcher.py index 8345ab16..bf8f8a63 100644 --- a/mindinsight/datavisual/data_transform/summary_watcher.py +++ b/mindinsight/datavisual/data_transform/summary_watcher.py @@ -85,6 +85,9 @@ class SummaryWatcher: logger.error('Path of summary base directory is not accessible.') raise FileSystemPermissionError('Path of summary base directory is not accessible.') + # sort in ascending order according to modification time. + entries = [entry for entry in entries if not entry.is_symlink()] + entries = sorted(entries, key=lambda x: x.stat().st_mtime) for entry in entries: if len(summary_dict) == self.MAX_SUMMARY_DIR_COUNT: break @@ -127,7 +130,6 @@ class SummaryWatcher: entry_name (str): Name of entry. counter (Counter): An instance of CountLimiter. list_explain (bool): Indicates whether to list only the mindexplain folder. - """ try: subdir_entries = os.scandir(entry_path) @@ -135,6 +137,9 @@ class SummaryWatcher: logger.warning('Path of %s under summary base directory is not accessible.', entry_name) return + # sort in ascending order according to modification time. 
+ subdir_entries = [subdir_entry for subdir_entry in subdir_entries if not subdir_entry.is_symlink()] + subdir_entries = sorted(subdir_entries, key=lambda x: x.stat().st_mtime) for subdir_entry in subdir_entries: if len(summary_dict) == self.MAX_SUMMARY_DIR_COUNT: break @@ -200,7 +205,6 @@ class SummaryWatcher: ctime = datetime.datetime.fromtimestamp(stat.st_ctime).astimezone() mtime = datetime.datetime.fromtimestamp(stat.st_mtime).astimezone() - if entry.is_file(): summary_pattern = re.search(self.SUMMARY_FILENAME_REGEX, entry.name) pb_pattern = re.search(self.PB_FILENAME_REGEX, entry.name) diff --git a/mindinsight/profiler/analyser/minddata_cpu_utilization_analyser.py b/mindinsight/profiler/analyser/minddata_cpu_utilization_analyser.py index cd6da16c..bed8eec7 100644 --- a/mindinsight/profiler/analyser/minddata_cpu_utilization_analyser.py +++ b/mindinsight/profiler/analyser/minddata_cpu_utilization_analyser.py @@ -31,6 +31,16 @@ class MinddataCpuUtilizationAnalyser(BaseAnalyser): def __init__(self, profiling_dir, device_id): super().__init__(profiling_dir, device_id) self._steps_info = self._get_minddata_cpu_utilization_steps_info() + self._cpu_utilization_info = dict() + + def get_idle_utilization_avg(self): + """Get the idle utilization information of the whole machine.""" + filter_condition = {} + self._filter(filter_condition) + device_key_value = "device_info" + self._get_cpu_utilization_average_value(device_key_value) + idle_utilization_avg = self._cpu_utilization_info.get("device_info").get("idle_utilization").get("avg_value") + return idle_utilization_avg def query(self, condition=None): """ @@ -48,50 +58,19 @@ class MinddataCpuUtilizationAnalyser(BaseAnalyser): filter_condition = condition.get('filter_condition', {}) log.info("Receive query request. 
%s", filter_condition) self._filter(filter_condition) - result = dict() - result["sampling_interval"] = self._data.get("sampling_interval") - result["step_info"] = self._steps_info - result["step_total_num"] = self._step_total_num - result["cpu_processor_num"] = self._data.get("cpu_processor_num") + self._cpu_utilization_info["sampling_interval"] = self._data.get("sampling_interval") + self._cpu_utilization_info["step_info"] = self._steps_info + self._cpu_utilization_info["step_total_num"] = self._step_total_num + self._cpu_utilization_info["cpu_processor_num"] = self._data.get("cpu_processor_num") # device average CPU utilization - result["device_info"] = dict() - for key in self._data.get("device_info").keys(): - arr = self._data.get("device_info")[key] - avg_value = round(sum(arr) / len(arr)) if arr else 0 - result["device_info"][key] = {"metrics": arr, "avg_value": avg_value} - + device_key_value = "device_info" + self._get_cpu_utilization_average_value(device_key_value) # process average CPU utilization - result["process_info"] = dict() - for key in self._data.get("process_info").keys(): - arr = self._data.get("process_info")[key] - avg_value = round(sum(arr) / len(arr)) if arr else 0 - result["process_info"][key] = {"metrics": arr, "avg_value": avg_value} - + process_key_value = "process_info" + self._get_cpu_utilization_average_value(process_key_value) # op average CPU utilization - minddata_pipeline_op_info = self._get_minddata_pipeline_info() - result["op_info"] = dict() - result["op_info"]["op_list"] = [] - result["op_info"]["total_op_avg_value"] = dict() - result["op_info"]["total_op_avg_value"]["user_utilization"] = 0 - result["op_info"]["total_op_avg_value"]["sys_utilization"] = 0 - - for item in self._data.get("op_info"): - # Filtering out non minddata pipeline operator - if str(item.get("op_id")) == "-1": - continue - op_info_dict = dict() - op_info_dict["metrics"] = dict() - for key in item.get("metrics").keys(): - arr = item.get("metrics")[key] - 
avg_value = round(sum(arr) / len(arr)) if arr else 0 - op_info_dict["metrics"][key] = {"metrics": arr, "avg_value": avg_value} - result["op_info"]["total_op_avg_value"][key] += avg_value - op_info_dict["op_id"] = item.get("op_id") - op_info = [i for i in minddata_pipeline_op_info if i.get("op_id") == item.get("op_id")] - # op_info is like [{"num_workers":int,"op_id":int}] - op_info_dict["num_workers"] = op_info[0].get("num_workers") - result["op_info"]["op_list"].append(op_info_dict) - return result + self._get_cpu_utilization_op_average_value() + return self._cpu_utilization_info def _load(self): """Load cpu_utilization info.""" @@ -101,7 +80,7 @@ class MinddataCpuUtilizationAnalyser(BaseAnalyser): file_path, raise_key="Invalid cpu_utilization_info file path.") if not os.path.exists(file_path): log.error('Did not find the cpu utilization file: %s', file_path) - raise ProfilerFileNotFoundException(msg='Did not find the cpu utilization file:{}'.format(file_path)) + raise ProfilerFileNotFoundException(msg='Did not find the cpu utilization file.') with open(file_path, 'r', encoding='utf-8') as file: try: @@ -124,8 +103,6 @@ class MinddataCpuUtilizationAnalyser(BaseAnalyser): """ start_step = filter_condition.get("start_step", 1) end_step = filter_condition.get("end_step", self._step_total_num) - if start_step == 1 and end_step == self._step_total_num: - return while not self._steps_info.count(str(start_step)): start_step += 1 left_index = self._steps_info.index(str(start_step)) @@ -149,17 +126,22 @@ class MinddataCpuUtilizationAnalyser(BaseAnalyser): item["metrics"][key] = item.get("metrics").get(key)[left_index:right_index + 1] def _get_minddata_cpu_utilization_steps_info(self): - """Establish a connection between cpu utilization sampling points and host queue capacity""" + """Establish a connection between cpu utilization sampling points and host queue capacity.""" steps_info = [] left_index = 0 right_index = 0 time_stamp = self._data.get("time_stamp") 
queue_step_time_info = self._get_minddata_queue_step_time_info() self._step_total_num = len(queue_step_time_info) + step0 = 0 for item in time_stamp: + # queue_step_time_info[][0]:step_num + # queue_step_time_info[][1]:sample time + # sampling points earlier than the first step's sample time are classified as step0 + if float(item) < float(queue_step_time_info[0][1]): + steps_info.append(step0) + continue while right_index < len(queue_step_time_info): - # queue_step_time_info[][0]:step_num - # queue_step_time_info[][1]:sample time if float(item) <= float(queue_step_time_info[right_index][1]): if float(item) < float(queue_step_time_info[right_index][1]): steps_info.append(queue_step_time_info[left_index][0]) @@ -183,13 +165,16 @@ class MinddataCpuUtilizationAnalyser(BaseAnalyser): file_path, raise_key="Invalid device_queue file path") if not os.path.exists(file_path): log.error('Did not find the device queue file: %s', file_path) - raise ProfilerFileNotFoundException(msg='Did not find the device queue file:{}'.format(file_path)) + raise ProfilerFileNotFoundException(msg='Did not find the device queue file.') with open(file_path) as data_file: for line in data_file.readlines(): op_info = line.split() - # op_info[0]=="1":queue info, op_info[1]:Connector capacity, - # op_info[2]:step_num, op_info[3]:Connector size, op_info[4]:sampling time + # op_info is a list like: ['1', '64', '8', '2', '85406783'] + # The value of the first element in op_info is '0' or '1'. + # '0' means that the time information is recorded. + # '1' means that the queue information is recorded. + # In the example above: '1': queue info, '64': queue capacity, '8': step_num, '2': queue size, '85406783': sampling time. 
if op_info and op_info[0] == "1": minddata_queue_step_time_info.append([op_info[2], op_info[4]]) return minddata_queue_step_time_info @@ -218,3 +203,36 @@ class MinddataCpuUtilizationAnalyser(BaseAnalyser): op_info_dict["num_workers"] = item.get("num_workers") minddata_pipeline_op_info.append(op_info_dict) return minddata_pipeline_op_info + + def _get_cpu_utilization_average_value(self, key_value): + """Get cpu_utilization average value for host or process.""" + self._cpu_utilization_info[key_value] = dict() + for key in self._data.get(key_value).keys(): + arr = self._data.get(key_value)[key] + avg_value = round(sum(arr) / len(arr)) if arr else 0 + self._cpu_utilization_info[key_value][key] = {"metrics": arr, "avg_value": avg_value} + + def _get_cpu_utilization_op_average_value(self): + """Get cpu_utilization average value for op.""" + minddata_pipeline_op_info = self._get_minddata_pipeline_info() + self._cpu_utilization_info["op_info"] = { + "op_list": [], + "total_op_avg_value": {"user_utilization": 0, "sys_utilization": 0} + } + + for item in self._data.get("op_info"): + # Filtering out non minddata pipeline operator + if str(item.get("op_id")) == "-1": + continue + op_info_dict = dict() + op_info_dict["metrics"] = dict() + for key in item.get("metrics").keys(): + arr = item.get("metrics")[key] + avg_value = round(sum(arr) / len(arr)) if arr else 0 + op_info_dict["metrics"][key] = {"metrics": arr, "avg_value": avg_value} + self._cpu_utilization_info["op_info"]["total_op_avg_value"][key] += avg_value + op_info_dict["op_id"] = item.get("op_id") + op_info = [i for i in minddata_pipeline_op_info if i.get("op_id") == item.get("op_id")] + # op_info is like [{"num_workers":int,"op_id":int}] + op_info_dict["num_workers"] = op_info[0].get("num_workers") + self._cpu_utilization_info["op_info"]["op_list"].append(op_info_dict) diff --git a/mindinsight/profiler/proposer/allproposers/minddata_proposer.py b/mindinsight/profiler/proposer/allproposers/minddata_proposer.py index 
075ce1fa..357c395e 100644 --- a/mindinsight/profiler/proposer/allproposers/minddata_proposer.py +++ b/mindinsight/profiler/proposer/allproposers/minddata_proposer.py @@ -1,4 +1,4 @@ -# Copyright 2020 Huawei Technologies Co., Ltd +# Copyright 2020-2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,6 +20,8 @@ from collections import OrderedDict from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory from mindinsight.profiler.analyser.minddata_analyser import MinddataAnalyser from mindinsight.profiler.proposer.allproposers.base_proposer import Proposer +from mindinsight.profiler.common.log import logger as log +from mindinsight.profiler.common.exceptions.exceptions import ProfilerRawFileException, ProfilerFileNotFoundException class MinddataProposer(Proposer): @@ -96,11 +98,13 @@ class MinddataProposer(Proposer): minddata_cpu_utilization = OrderedDict() minddata_cpu_utilization_analyser = AnalyserFactory.instance().get_analyser( 'minddata_cpu_utilization', self.profiling_path, self.device_id) - result = minddata_cpu_utilization_analyser.query() - idle_utilization_avg = result.get("device_info").get("idle_utilization").get("avg_value") - # The maximum value of this cpu_activate_utilization_avg is 100%. - cpu_activate_utilization_avg = 100 - idle_utilization_avg - cpu_activate_utilization_threshold = 80 - if cpu_activate_utilization_avg > cpu_activate_utilization_threshold: - minddata_cpu_utilization["minddata_cpu_utilization"] = [cpu_activate_utilization_avg] - self.__proposal_dict.update(minddata_cpu_utilization) + try: + idle_utilization_avg = minddata_cpu_utilization_analyser.get_idle_utilization_avg() + # The maximum value of this cpu_activate_utilization_avg is 100%. 
+ cpu_activate_utilization_avg = 100 - idle_utilization_avg + cpu_activate_utilization_threshold = 80 + if cpu_activate_utilization_avg > cpu_activate_utilization_threshold: + minddata_cpu_utilization["minddata_cpu_utilization"] = [cpu_activate_utilization_avg] + self.__proposal_dict.update(minddata_cpu_utilization) + except (ProfilerRawFileException, ProfilerFileNotFoundException) as err: + log.exception(err)