Browse Source

cluster slow node query

tags/v1.2.0-rc1
zhangyihui 4 years ago
parent
commit
d5d276ba91
4 changed files with 345 additions and 20 deletions
  1. +37
    -0
      mindinsight/backend/profiler/profile_api.py
  2. +53
    -19
      mindinsight/datavisual/data_transform/summary_watcher.py
  3. +2
    -1
      mindinsight/profiler/analyser/__init__.py
  4. +253
    -0
      mindinsight/profiler/analyser/cluster_analyser.py

+ 37
- 0
mindinsight/backend/profiler/profile_api.py View File

@@ -629,6 +629,43 @@ def get_minddata_cpu_utilization_info():
return jsonify(cpu_utilization) return jsonify(cpu_utilization)




@BLUEPRINT.route("/profile/cluster-step-trace-summary", methods=["POST"])
def get_cluster_step_trace_info():
    """
    Get cluster step trace info.

    Returns:
        str, the cluster step trace info.

    Raises:
        ParamValueError: If the search condition contains some errors.

    Examples:
        >>>POST http://xxx/v1/mindinsight/profile/cluster-step-trace-summary
    """
    train_id = get_train_id(request)
    cluster_profiler_dir = os.path.join(settings.SUMMARY_BASE_DIR, train_id)
    try:
        cluster_profiler_dir = validate_and_normalize_path(cluster_profiler_dir, 'cluster_profiler')
    except ValidationError:
        raise ParamValueError('Invalid cluster_profiler dir')

    raw_body = request.stream.read()
    try:
        condition = json.loads(raw_body) if raw_body else {}
    except (json.JSONDecodeError, ValueError):
        raise ParamValueError("Json data parse failed.")

    # Validate that device_id parses as an integer; the analyser consumes
    # the original (string) form.
    device_id = condition.get("device_id", "0")
    to_int(device_id, 'device_id')

    analyser = AnalyserFactory.instance().get_analyser(
        'cluster_step_trace', cluster_profiler_dir, device_id)
    return jsonify(analyser.query(condition))


def init_module(app): def init_module(app):
""" """
Init module entry. Init module entry.


+ 53
- 19
mindinsight/datavisual/data_transform/summary_watcher.py View File

@@ -37,6 +37,7 @@ class SummaryWatcher:
SUMMARY_FILENAME_REGEX = r'summary\.(?P<timestamp>\d+)' SUMMARY_FILENAME_REGEX = r'summary\.(?P<timestamp>\d+)'
PB_FILENAME_REGEX = r'\.pb$' PB_FILENAME_REGEX = r'\.pb$'
PROFILER_DIRECTORY_REGEX = r'^profiler' PROFILER_DIRECTORY_REGEX = r'^profiler'
CLUSTER_PROFILER_DIRECTORY_REGEX = r'^cluster_profiler$'
MAX_SUMMARY_DIR_COUNT = 999 MAX_SUMMARY_DIR_COUNT = 999
# scan at most 20000 files/directories (approximately 1 seconds) # scan at most 20000 files/directories (approximately 1 seconds)
@@ -98,6 +99,7 @@ class SummaryWatcher:
elif entry.is_file(): elif entry.is_file():
self._update_summary_dict(summary_dict, summary_base_dir, relative_path, entry, list_explain) self._update_summary_dict(summary_dict, summary_base_dir, relative_path, entry, list_explain)
elif entry.is_dir(): elif entry.is_dir():
self._update_summary_dict(summary_dict, summary_base_dir, relative_path, entry, list_explain)
entry_path = os.path.realpath(os.path.join(summary_base_dir, entry.name)) entry_path = os.path.realpath(os.path.join(summary_base_dir, entry.name))
self._scan_subdir_entries(summary_dict, summary_base_dir, entry_path, entry.name, counter, list_explain) self._scan_subdir_entries(summary_dict, summary_base_dir, entry_path, entry.name, counter, list_explain)
@@ -236,19 +238,24 @@ class SummaryWatcher:
if list_explain: if list_explain:
return return
profiler_type, is_find = self._find_profiler_dir(entry, summary_base_dir, relative_path)
if not is_find:
return
profiler = {'directory': os.path.join('.', entry.name),
'create_time': ctime,
'update_time': mtime,
"profiler_type": profiler_type}
if relative_path in summary_dict:
summary_dict[relative_path]['profiler'] = profiler
else:
summary_dict[relative_path] = _new_entry(ctime, mtime, profiler)
cluster_profiler_type, is_cluster_profiler = \
self._find_cluster_profiler_dir(entry, summary_base_dir, relative_path)
profiler_type, is_profiler = self._find_profiler_dir(entry, summary_base_dir, relative_path)
if is_cluster_profiler or is_profiler:
if is_cluster_profiler:
profiler_type = cluster_profiler_type
profiler = {
'directory': os.path.join('.', entry.name),
'create_time': ctime,
'update_time': mtime,
"profiler_type": profiler_type
}
if relative_path in summary_dict:
summary_dict[relative_path]['profiler'] = profiler
else:
summary_dict[relative_path] = _new_entry(ctime, mtime, profiler)
def _find_profiler_dir(self, entry, summary_base_dir, relative_path): def _find_profiler_dir(self, entry, summary_base_dir, relative_path):
"""Find profiler dir by the given relative path.""" """Find profiler dir by the given relative path."""
@@ -260,6 +267,16 @@ class SummaryWatcher:
return profiler_type, True return profiler_type, True
def _find_cluster_profiler_dir(self, entry, summary_base_dir, relative_path):
"""Find profiler cluster dir by the given relative path."""
cluster_profiler_pattern = re.search(self.CLUSTER_PROFILER_DIRECTORY_REGEX, entry.name)
full_dir_path = os.path.join(summary_base_dir, relative_path, entry.name)
is_valid_cluster_profiler_dir, profiler_type = self._is_valid_cluster_profiler_directory(full_dir_path)
if cluster_profiler_pattern is None or not is_valid_cluster_profiler_dir:
return profiler_type, False
return profiler_type, True
def _is_valid_pattern_result(self, summary_pattern, pb_pattern, list_explain, entry): def _is_valid_pattern_result(self, summary_pattern, pb_pattern, list_explain, entry):
"""Check the pattern result is valid.""" """Check the pattern result is valid."""
if summary_pattern is None and pb_pattern is None: if summary_pattern is None and pb_pattern is None:
@@ -313,12 +330,14 @@ class SummaryWatcher:
if pb_pattern is not None and entry.is_file(): if pb_pattern is not None and entry.is_file():
return True return True
profiler_pattern = re.search(self.PROFILER_DIRECTORY_REGEX, entry.name)
if profiler_pattern is not None and entry.is_dir():
full_path = os.path.realpath(os.path.join(summary_directory, entry.name))
if self._is_valid_profiler_directory(full_path)[0]:
return True
if entry.is_dir():
profiler_pattern = re.search(self.PROFILER_DIRECTORY_REGEX, entry.name)
cluster_profiler_pattern = re.search(self.CLUSTER_PROFILER_DIRECTORY_REGEX, entry.name)
if profiler_pattern is not None or cluster_profiler_pattern is not None:
full_path = os.path.realpath(os.path.join(summary_directory, entry.name))
if self._is_valid_profiler_directory(full_path)[0] or \
self._is_valid_cluster_profiler_directory(full_path)[0]:
return True
return False return False
def _is_valid_profiler_directory(self, directory): def _is_valid_profiler_directory(self, directory):
@@ -331,6 +350,21 @@ class SummaryWatcher:
return bool(device_list), profiler_type return bool(device_list), profiler_type
def _is_valid_cluster_profiler_directory(self, directory):
"""Determine whether it is a valid cluster profiler."""
cluster_profiler_type = 'cluster'
entries = os.scandir(directory)
for entry in entries:
if entry.is_symlink():
continue
if entry.is_dir():
full_path = os.path.join(directory, entry.name, 'profiler')
is_profile, profiler_type = self._is_valid_profiler_directory(full_path)
if is_profile:
return is_profile, cluster_profiler_type + '_' + profiler_type
return False, cluster_profiler_type
def list_summary_directories_by_pagination(self, summary_base_dir, offset=0, limit=10): def list_summary_directories_by_pagination(self, summary_base_dir, offset=0, limit=10):
""" """
List summary directories within base directory. List summary directories within base directory.


+ 2
- 1
mindinsight/profiler/analyser/__init__.py View File

@@ -14,4 +14,5 @@
# ============================================================================ # ============================================================================
"""The analyser module.""" """The analyser module."""
from . import analyser, minddata_pipeline_analyser, step_trace_analyser, minddata_analyser, \ from . import analyser, minddata_pipeline_analyser, step_trace_analyser, minddata_analyser, \
timeline_analyser, cpu_analyser, gpu_analyser, memory_usage_analyser, minddata_cpu_utilization_analyser
timeline_analyser, cpu_analyser, gpu_analyser, memory_usage_analyser, \
minddata_cpu_utilization_analyser, cluster_analyser

+ 253
- 0
mindinsight/profiler/analyser/cluster_analyser.py View File

@@ -0,0 +1,253 @@
# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The specific cluster analyser class"""
import json
import os

from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
ProfilerDirNotFoundException
from mindinsight.profiler.common.log import logger as log
from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path


class ClusterAnalyser(BaseAnalyser):
    """The analyser for analyzing the cluster."""
    # Text file mapping each rank-table host ip to the ip used to name the
    # per-host sub-directories under `cluster_profiler`.
    _host_ips_mapping_filename = 'host_ips_mapping.txt'

    def __init__(self, cluster_profiler_dir, device_id):
        super().__init__(cluster_profiler_dir, device_id)
        self._cluster_profiler_dir = cluster_profiler_dir
        self._host_ips_mapping_info = self._get_host_ips_mapping_info()
        self._host_ips_dir = self._get_host_ips_dir()
        self._host_device_rank_relation = self._get_host_device_rank_relation()

    def _get_host_ips_mapping_info(self):
        """Get host ips mapping info.

        Returns:
            list, items of [host_ip, host_mapping_ip].

        Raises:
            ProfilerDirNotFoundException: If the mapping file is missing.
        """
        host_ips_mapping_info = list()
        file_path = os.path.join(self._cluster_profiler_dir, self._host_ips_mapping_filename)
        file_path = validate_and_normalize_path(
            file_path, raise_key="Invalid file path.")
        if not os.path.exists(file_path):
            log.error('Did not find host_ips_mapping file: %s', file_path)
            raise ProfilerDirNotFoundException(msg='Did not find host_ips_mapping:{}'.format(file_path))

        with open(file_path, 'r') as src_file:
            for line in src_file:
                mapping_info = line.split()
                if len(mapping_info) > 1:
                    # mapping_info[0]:host_ip, mapping_info[1]:host_mapping_ip
                    host_ips_mapping_info.append([mapping_info[0], mapping_info[1]])

        return host_ips_mapping_info

    def _get_host_ips_dir(self):
        """Get the host sub-directories whose name matches a mapped host ip."""
        host_ips_dir = []
        target_dir_path = os.path.join(self._cluster_profiler_dir, 'cluster_profiler')
        target_dir_path = validate_and_normalize_path(
            target_dir_path, raise_key="Invalid cluster_profiler dir path.")
        if not os.path.exists(target_dir_path):
            log.error('Did not find cluster_profiler dir : %s', target_dir_path)
            raise ProfilerDirNotFoundException(msg='Did not find cluster_profiler dir:{}'.format(target_dir_path))

        # host_mapping_id_index:1; a set gives O(1) membership tests.
        host_mapping_ips = {i[1] for i in self._host_ips_mapping_info}
        # Context manager closes the scandir file descriptor promptly.
        with os.scandir(target_dir_path) as entries:
            for entry in entries:
                if entry.is_symlink():
                    continue
                if entry.is_dir() and entry.name in host_mapping_ips:
                    host_ips_dir.append(entry.name)
        return host_ips_dir

    def _get_host_device_rank_relation(self):
        """Get host_ip device_id rank_id relation.

        Returns:
            list, items of [host_mapping_ip, device_id, rank_id].

        Raises:
            ProfilerFileNotFoundException: If no rank table file is found.
        """
        rank_table_file_path = self._get_rank_table_file_path()
        if not os.path.exists(rank_table_file_path):
            log.error('Did not find rank table file under %s', self._cluster_profiler_dir)
            raise ProfilerFileNotFoundException(msg='Did not find rank table file')
        # Initialize before the try so a parse failure degrades to an empty
        # relation instead of a NameError on an unbound variable.
        relation_info = {}
        with open(rank_table_file_path, 'r', encoding='utf-8') as file:
            try:
                relation_info = json.load(file)
            except json.JSONDecodeError as err:
                log.exception(err)
        host_device_rank_relation = list()
        # `or []` guards against a missing/null "server_list" key.
        servers_info = relation_info.get("server_list") or []
        for server_info in servers_info:
            server_id = server_info.get("server_id")
            devices_info = server_info.get("device") or []
            for device_info in devices_info:
                device_id = device_info.get("device_id")
                rank_id = device_info.get("rank_id")
                host_device_rank_relation.append([server_id, device_id, rank_id])

        # Reuse the mapping already loaded in __init__ instead of re-reading
        # the mapping file from disk.
        host_ips_mapping_info = self._host_ips_mapping_info
        for item in host_device_rank_relation:
            # host_ip_index:0,host_mapping_id_index:1
            target_info = [i for i in host_ips_mapping_info if item[0] == i[0]]
            # target_info is like:[[host_ip, host_mapping_ip]]
            if target_info:
                item[0] = target_info[0][1]
            else:
                # No mapping entry for this host: keep the raw server_id and
                # log instead of raising IndexError.
                log.error('Did not find the mapping ip for host %s', item[0])

        return host_device_rank_relation

    def _get_rank_table_file_path(self):
        """Get the path of the first json file in the cluster profiler dir."""
        file_path = ''
        target_dir_path = self._cluster_profiler_dir
        with os.scandir(target_dir_path) as entries:
            for entry in entries:
                if entry.is_symlink():
                    continue
                if entry.is_file() and entry.name.endswith('.json'):
                    file_path = os.path.join(target_dir_path, entry.name)
                    break
        return file_path

    def _load(self):
        """Load data according to the parsed profiling files."""

    def _filter(self, filter_condition):
        """
        Filter the profiling data according to the filter condition.

        Args:
            filter_condition (dict): The filter condition.
        """


class ClusterStepTraceAnalyser(ClusterAnalyser):
    """The analyser for analyzing the cluster step trace."""
    _col_names = ['iteration_interval', 'fp_and_bp', 'tail']

    def __init__(self, cluster_profiler_dir, device_id):
        super().__init__(cluster_profiler_dir, device_id)
        self._none_sort_col_names = []
        # Defined up front so _organize_query_result cannot raise
        # AttributeError if it runs before _filter.
        self._cluster_step_trace_info_size = 0
        self._total_step_num = self._get_total_step_num()

    def _get_total_step_num(self):
        """Get the num of train step.

        Returns:
            str or int, step count read from the step trace file, 0 when no
            host directory or no data is available.
        """
        total_step_num = 0
        if not self._host_ips_dir:
            # No host directory found: report zero steps instead of raising
            # IndexError below.
            return total_step_num
        # take the data of one of the machines to get the total number of steps.
        host_ip_dir = self._host_ips_dir[0]
        target_dir_path = os.path.join(self._cluster_profiler_dir, 'cluster_profiler', host_ip_dir, 'profiler')
        target_dir_path = validate_and_normalize_path(
            target_dir_path, raise_key="Invalid profiler dir path.")
        if not os.path.exists(target_dir_path):
            log.error('Did not find cluster_profiler dir : %s', target_dir_path)
            raise ProfilerDirNotFoundException(msg='Did not find cluster_profiler dir:{}'.format(target_dir_path))

        # Context manager closes the scandir file descriptor promptly.
        with os.scandir(target_dir_path) as entries:
            for entry in entries:
                if entry.is_symlink():
                    continue
                if entry.is_file() and entry.name.startswith('step_trace_raw'):
                    file_path = os.path.join(target_dir_path, entry.name)
                    with open(file_path, 'r') as src_file:
                        lines = src_file.readlines()
                    # The penultimate line represents the information of the last step
                    # The step num index is 0
                    if len(lines) > 1:
                        total_step_num = lines[-2].split(',')[0]
                    break
        return total_step_num

    def _get_cluster_step_trace_info(self, step_num):
        """Get cluster step trace info for every device in the cluster."""
        cluster_step_trace_info = list()
        for item in self._host_device_rank_relation:
            # item[0]:host_ip, item[1]:device_id, item[2]:rank_id
            step_trace_info = self._get_step_trace_info(item[0], item[1], step_num)
            step_trace_info.append(item[0])
            step_trace_info.append(item[1])
            step_trace_info.append(item[2])
            cluster_step_trace_info.append(step_trace_info)
        self._cluster_step_trace_info_size = len(cluster_step_trace_info)
        return cluster_step_trace_info

    def _get_step_trace_info(self, host_ip, device_id, step_num):
        """Get step trace info.

        Args:
            host_ip (str): the mapped host ip the device belongs to.
            device_id (str): the device id.
            step_num (int or str): the step to query; 0 means the average.

        Returns:
            list, [iteration_interval, fp_and_bp, tail] in milliseconds.

        Raises:
            ProfilerFileNotFoundException: If the step trace file is missing
                or holds no row for the requested step.
        """
        file_name = 'step_trace_raw_{}_detail_time.csv'.format(device_id)
        step_trace_file_path = \
            os.path.join(self._cluster_profiler_dir, 'cluster_profiler', host_ip, 'profiler', file_name)
        step_trace_file_path = validate_and_normalize_path(
            step_trace_file_path, raise_key="Invalid step trace file path.")
        if not os.path.exists(step_trace_file_path):
            log.error('Did not find the file: %s', step_trace_file_path)
            raise ProfilerFileNotFoundException(msg='Did not find the file:{}'.format(step_trace_file_path))
        step_trace_info = list()
        step_num = str(step_num)
        with open(step_trace_file_path, 'r') as src_file:
            lines = src_file.readlines()
        # when the step_num value is 0, it means the average value.
        # The last line of the step_trace_raw_{}_detail_time.csv records the average value.
        if step_num == '0':
            step_trace_info = lines[-1].strip('\n').split(',')
        else:
            for line in lines:
                line = line.strip('\n').split(',')
                if line[0] == step_num:
                    step_trace_info = line
                    # Step numbers are unique; stop at the first match.
                    break
        if len(step_trace_info) < 9:
            # Requested step not present (or malformed row): fail with a
            # clear error instead of an IndexError.
            log.error('Did not find step %s in file: %s', step_num, step_trace_file_path)
            raise ProfilerFileNotFoundException(
                msg='Did not find step trace info for step:{}'.format(step_num))
        # step_trace_info[6]: iteration_interval time
        # step_trace_info[7]: fp_and_bp time
        # step_trace_info[8]: tail time
        # divided by 1e5, the unit becomes a millisecond
        iteration_interval = float(step_trace_info[6]) / 1e5
        fp_and_bp = float(step_trace_info[7]) / 1e5
        tail = float(step_trace_info[8]) / 1e5
        return [iteration_interval, fp_and_bp, tail]

    def _load(self):
        """Load data according to the parsed profiling files."""

    def _filter(self, filter_condition):
        """
        Filter the profiling data according to the filter condition.

        Args:
            filter_condition (dict): The filter condition.

                - step_id (int): The selected step id.
        """
        step_id = filter_condition.get("step_id", 0)
        self._result = self._get_cluster_step_trace_info(step_id)

    def _organize_query_result(self):
        """
        Organize the query result.

        Returns:
            dict, the query result.
        """
        result = list()
        # item[0]:iteration_interval, item[1]:fp_and_bp, item[2]:tail
        # item[3]:host_ip, item[4]:device_id, item[5]:rank_id
        for item in self._result:
            step_trace_info = dict()
            step_trace_info["step_trace_info"] = item[0:3]
            step_trace_info["host_ip"] = item[3]
            step_trace_info["device_id"] = item[4]
            step_trace_info["rank_id"] = item[5]
            step_trace_info["profiler_dir"] = 'profiler'
            result.append(step_trace_info)

        return {
            'total_step_num': self._total_step_num,
            'step_trace': result,
            'size': self._cluster_step_trace_info_size
        }

Loading…
Cancel
Save