From 75cc7ad8ecae910211c0c2d7ed84adb3917f3264 Mon Sep 17 00:00:00 2001
From: zhangyunshu
Date: Wed, 21 Apr 2021 22:29:59 +0800
Subject: [PATCH] profiler: add RESTful API for cluster memory

---
 mindinsight/backend/profiler/profile_api.py  | 28 ++++++++++
 .../profiler/analyser/cluster_analyser.py    | 56 ++++++++++++++++++-
 .../analyser/memory_usage_analyser.py        |  2 +-
 .../profiler/analyser/timeline_analyser.py   |  4 +-
 4 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/mindinsight/backend/profiler/profile_api.py b/mindinsight/backend/profiler/profile_api.py
index 5253d84b..3fc44ea5 100644
--- a/mindinsight/backend/profiler/profile_api.py
+++ b/mindinsight/backend/profiler/profile_api.py
@@ -673,6 +673,34 @@ def get_cluster_step_trace_info():
     return jsonify(step_trace_info)
 
 
+@BLUEPRINT.route("/profile/cluster-peak-memory", methods=["GET"])
+def get_cluster_peak_memory():
+    """
+    Get cluster peak memory.
+
+    Returns:
+        str, the cluster peak memory.
+
+    Raises:
+        ParamValueError: If the cluster profiler dir is invalid.
+
+    Examples:
+        >>> GET http://xxx/v1/mindinsight/profile/cluster-peak-memory
+    """
+    train_id = get_train_id(request)
+    if not train_id:
+        raise ParamValueError('No train id.')
+    cluster_profiler_dir = os.path.join(settings.SUMMARY_BASE_DIR, train_id)
+    cluster_profiler_dir = validate_and_normalize_path(cluster_profiler_dir, 'cluster_profiler')
+    check_train_job_and_profiler_dir(cluster_profiler_dir)
+
+    analyser = AnalyserFactory.instance().get_analyser(
+        'cluster_memory', cluster_profiler_dir
+    )
+    peak_mem = analyser.get_peak_memory()
+    return jsonify(peak_mem)
+
+
 def init_module(app):
     """
     Init module entry.
diff --git a/mindinsight/profiler/analyser/cluster_analyser.py b/mindinsight/profiler/analyser/cluster_analyser.py
index 3fffbf5a..94cece33 100644
--- a/mindinsight/profiler/analyser/cluster_analyser.py
+++ b/mindinsight/profiler/analyser/cluster_analyser.py
@@ -18,7 +18,7 @@ import os
 
 from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
 from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
-    ProfilerDirNotFoundException
+    ProfilerDirNotFoundException, ProfilerIOException
 from mindinsight.profiler.common.log import logger as log
 from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path
 
@@ -251,3 +251,57 @@ class ClusterStepTraceAnalyser(ClusterAnalyser):
             'step_trace': result,
             'size': self._cluster_step_trace_info_size
         }
+
+
+class ClusterMemoryAnalyser(ClusterAnalyser):
+    """The analyser for analyzing the cluster memory usage."""
+    _summary_filename = 'memory_usage_summary_{}.json'
+
+    def __init__(self, cluster_profiler_dir, device_id='0'):
+        super().__init__(cluster_profiler_dir, device_id)
+        self._cluster_dir = os.path.join(cluster_profiler_dir, 'cluster_profiler')
+
+    def get_peak_memory(self):
+        """Get peak memory for each device."""
+        peak_mem_list = []
+
+        for host_map_ip, device_id, rank_id in self._host_device_rank_relation:
+            host_dir = os.path.join(self._cluster_dir, host_map_ip, 'profiler')
+            validate_and_normalize_path(host_dir, raise_key='Invalid host directory {}.'.format(host_map_ip))
+            file_path = self._get_memory_file_for_each_device(host_dir, device_id)
+            file_content = self._get_file_content(file_path)
+            capacity = file_content.get('capacity')
+            peak_mem = file_content.get('peak_mem')
+
+            mem_dict = {
+                'host_ip': host_map_ip,
+                'device_id': device_id,
+                'rank_id': rank_id,
+                'capacity': capacity,
+                'peak_mem': peak_mem
+            }
+            peak_mem_list.append(mem_dict)
+
+        return peak_mem_list
+
+    def _get_memory_file_for_each_device(self, path, device_id):
+        """Get memory file for each device."""
+        filename = self._summary_filename.format(device_id)
+        file_path = os.path.join(path, filename)
+        validate_and_normalize_path(
+            file_path, raise_key='Invalid memory usage file path.'
+        )
+
+        return file_path
+
+    @staticmethod
+    def _get_file_content(file_path):
+        """Get file content."""
+        try:
+            with open(file_path, 'r') as f_obj:
+                file_content = json.load(f_obj)
+        except (IOError, OSError, json.JSONDecodeError) as err:
+            log.error('Error occurred when read memory file: %s', err)
+            raise ProfilerIOException()
+
+        return file_content
diff --git a/mindinsight/profiler/analyser/memory_usage_analyser.py b/mindinsight/profiler/analyser/memory_usage_analyser.py
index fc2b6103..3ab22895 100644
--- a/mindinsight/profiler/analyser/memory_usage_analyser.py
+++ b/mindinsight/profiler/analyser/memory_usage_analyser.py
@@ -126,7 +126,7 @@ class MemoryUsageAnalyser(BaseAnalyser):
                 file_content = json.load(f_obj)
         except (IOError, OSError, json.JSONDecodeError) as err:
             logger.error('Error occurred when read memory file: %s', err)
-            raise ProfilerIOException
+            raise ProfilerIOException()
 
         return file_content
 
diff --git a/mindinsight/profiler/analyser/timeline_analyser.py b/mindinsight/profiler/analyser/timeline_analyser.py
index 80fecedb..5f4e5733 100644
--- a/mindinsight/profiler/analyser/timeline_analyser.py
+++ b/mindinsight/profiler/analyser/timeline_analyser.py
@@ -69,7 +69,7 @@ class TimelineAnalyser(BaseAnalyser):
                     timeline = json.load(f_obj)
             except (IOError, OSError, json.JSONDecodeError) as err:
                 logger.error('Error occurred when read timeline display file: %s', err)
-                raise ProfilerIOException
+                raise ProfilerIOException()
         else:
             logger.info('No timeline file. Please check the output path.')
 
@@ -101,7 +101,7 @@
                     timeline_summary = json.load(f_obj)
             except (IOError, OSError, json.JSONDecodeError) as err:
                 logger.error('Error occurred when read timeline summary file: %s', err)
-                raise ProfilerIOException
+                raise ProfilerIOException()
         else:
             logger.info('No timeline summary file. Please check the output path.')
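
A minimal client-side sketch of how the new endpoint could be queried once this patch is applied. The host, port, TRAIN_ID value, and the use of the requests package are illustrative assumptions, not part of the patch; the 'train_id' query parameter is assumed to be what get_train_id(request) reads, as with the other profiler endpoints in profile_api.py.

    # Hypothetical client for the new cluster peak-memory endpoint.
    # BASE_URL and TRAIN_ID below are assumptions for illustration only.
    import requests

    BASE_URL = 'http://127.0.0.1:8080/v1/mindinsight'  # assumed MindInsight address
    TRAIN_ID = './cluster_run'                         # illustrative train id under SUMMARY_BASE_DIR

    response = requests.get(
        BASE_URL + '/profile/cluster-peak-memory',
        params={'train_id': TRAIN_ID},
        timeout=10,
    )
    response.raise_for_status()

    # Each entry mirrors the dict built in ClusterMemoryAnalyser.get_peak_memory():
    # host_ip, device_id, rank_id, capacity and peak_mem per device.
    for entry in response.json():
        print(entry['host_ip'], entry['device_id'], entry['rank_id'],
              entry['capacity'], entry['peak_mem'])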