Collect timeline information after training finishes, and support timeline display on the web UI. (tag: v0.5.0-beta)
@@ -401,6 +401,52 @@ def get_minddata_pipeline_queue_info():
     return jsonify(op_queue_info)
 
 
+@BLUEPRINT.route("/profile/timeline-summary", methods=["GET"])
+def get_timeline_summary():
+    """
+    Get timeline summary info.
+
+    Returns:
+        Response, the timeline summary info.
+
+    Examples:
+        >>> GET http://xxxx/v1/mindinsight/profile/timeline-summary
+    """
+    summary_dir = request.args.get("dir")
+    profiler_dir = validate_and_normalize_profiler_path(summary_dir, settings.SUMMARY_BASE_DIR)
+    device_id = request.args.get("device_id", default='0')
+    _ = to_int(device_id, 'device_id')
+
+    analyser = AnalyserFactory.instance().get_analyser(
+        'timeline', profiler_dir, device_id)
+    summary = analyser.get_timeline_summary()
+    return summary
+
+
+@BLUEPRINT.route("/profile/timeline", methods=["GET"])
+def get_timeline_detail():
+    """
+    Get timeline detail.
+
+    Returns:
+        Response, the detail information of timeline.
+
+    Examples:
+        >>> GET http://xxxx/v1/mindinsight/profile/timeline
+    """
+    summary_dir = request.args.get("dir")
+    profiler_dir = validate_and_normalize_profiler_path(summary_dir, settings.SUMMARY_BASE_DIR)
+    device_id = request.args.get("device_id", default='0')
+    _ = to_int(device_id, 'device_id')
+
+    analyser = AnalyserFactory.instance().get_analyser(
+        'timeline', profiler_dir, device_id)
+    timeline = analyser.get_display_timeline()
+    return jsonify(timeline)
+
+
 def init_module(app):
     """
     Init module entry.
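Both routes follow the pattern of the existing profiler endpoints: validate the summary directory, coerce `device_id`, then delegate to an analyser. A minimal sketch of calling them from Python, assuming a MindInsight instance on `localhost:8080` and a hypothetical run directory named `run1`:

    import requests

    BASE = 'http://localhost:8080/v1/mindinsight'  # host and port are assumptions
    params = {'dir': 'run1', 'device_id': '0'}     # 'run1' is a hypothetical summary dir

    # Summary keys mirror _timeline_summary in the analyser below:
    # total_time, num_of_streams, num_of_ops, op_exe_times.
    summary = requests.get(BASE + '/profile/timeline-summary', params=params).json()

    # Detail returns the event list the web UI renders as a timeline.
    timeline = requests.get(BASE + '/profile/timeline', params=params).json()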
@@ -14,4 +14,4 @@
 # ============================================================================
 """The analyser module."""
 from . import analyser, minddata_pipeline_analyser, step_trace_analyser, \
-    minddata_analyser
+    minddata_analyser, timeline_analyser
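This one-line import is presumably what makes `get_analyser('timeline', ...)` resolvable: the analyser only becomes known to the factory once its defining module has been imported. A minimal sketch of that import-time registration pattern (illustrative only, not MindInsight's actual factory code):

    _ANALYSERS = {}

    def register(name):
        """Class decorator: record an analyser under its lookup key at import time."""
        def wrap(cls):
            _ANALYSERS[name] = cls
            return cls
        return wrap

    @register('timeline')
    class TimelineAnalyser:
        def __init__(self, profiling_dir, device_id):
            self._profiling_dir = profiling_dir
            self._device_id = device_id

    def get_analyser(name, profiling_dir, device_id):
        # Raises KeyError unless the defining module was imported first,
        # which is exactly what the package __init__ above guarantees.
        return _ANALYSERS[name](profiling_dir, device_id)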
@@ -0,0 +1,340 @@
+# Copyright 2020 Huawei Technologies Co., Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============================================================================
+"""The Timeline Analyser."""
+import json
+import os
+
+from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
+from mindinsight.profiler.parser.container import TimelineContainer
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
+    ProfilerIOException
+from mindinsight.profiler.common.log import logger
+from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path
+
+SIZE_LIMIT = 20 * 1024 * 1024  # 20MB
+
+
+class TimelineAnalyser(BaseAnalyser):
+    """
+    Analyse timeline data from file.
+    """
+    __col_names__ = ['op_name', 'stream_id', 'start_time', 'duration']
+    _output_timeline_data_file_path = 'output_timeline_data_{}.txt'
+    _min_cycle_counter_file_path = 'min_cycle_counter_{}.txt'
+    _timeline_filename = 'timeline_detail_{}.json'
+    _display_filename = 'timeline_display_{}.json'
+    _timeline_summary_filename = 'timeline_summary_{}.json'
+
+    _timeline_meta = []
+    _timeline_summary = {
+        'total_time': 0,
+        'num_of_streams': 0,
+        'num_of_ops': 0,
+        'op_exe_times': 0
+    }
+
+    def _load(self):
+        """Load data according to the parsed profiling files."""
+        self.load_timeline_data()
+        self._timeline_summary['op_exe_times'] = len(self._timeline_meta)
+
+    def _filter(self, filter_condition):
+        """
+        Filter the profiling data according to the filter condition.
+
+        Args:
+            filter_condition (dict): The filter condition.
+        """
+
+    def get_display_timeline(self):
+        """
+        Get timeline data for UI display.
+
+        Returns:
+            json, the content of timeline data.
+        """
+        # Search for the timeline json file under the profiling dir.
+        file_path = None
+        for filename in os.listdir(self._profiling_dir):
+            if filename.startswith('timeline_display') and filename.endswith('.json'):
+                file_path = os.path.join(self._profiling_dir, filename)
+                logger.debug('Display file found.')
+                break
+            elif filename.startswith('timeline_detail') and filename.endswith('.json'):
+                file_path = os.path.join(self._profiling_dir, filename)
+                logger.debug('Original file found.')
+                break
+
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid timeline json path.'
+        )
+
+        timeline = []
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f_obj:
+                    timeline = json.load(f_obj)
+            except (IOError, OSError) as err:
+                logger.error('Error occurred when reading timeline display file: %s', err)
+                raise ProfilerIOException
+        else:
+            logger.info('No timeline file. Please check the output path.')
+
+        return timeline
+
+    def get_timeline_summary(self):
+        """
+        Get timeline summary information for UI display.
+
+        Returns:
+            json, the content of timeline summary information.
+        """
+        file_path = None
+        summary_file_name = 'timeline_summary_{}.json'.format(self._device_id)
+        if summary_file_name in os.listdir(self._profiling_dir):
+            file_path = os.path.join(self._profiling_dir, summary_file_name)
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid timeline summary path.'
+        )
+
+        timeline_summary = {}
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f_obj:
+                    timeline_summary = json.load(f_obj)
+            except (IOError, OSError) as err:
+                logger.error('Error occurred when reading timeline summary file: %s', err)
+                raise ProfilerIOException
+
+        return timeline_summary
+
+    def write_timeline(self):
+        """Write the timeline metadata to file."""
+        # Write the full timeline to file.
+        file_size = self.write_timeline_to_json()
+
+        # If the file size is larger than 20MB, open a new file and
+        # write the first 20MB of content into it.
+        if file_size > SIZE_LIMIT:
+            logger.debug('File size is larger than 20MB, will be resized...')
+            # Write to a separate json file for display.
+            self.write_timeline_to_json_by_limitation()
+
+    def write_timeline_to_json(self):
+        """Write timeline to json."""
+        timeline_filename = self._timeline_filename.format(self._device_id)
+        timeline_file_path = os.path.join(
+            self._profiling_dir,
+            timeline_filename
+        )
+        timeline_file_path = validate_and_normalize_path(
+            timeline_file_path, raise_key='Invalid timeline json path.'
+        )
+
+        try:
+            with open(timeline_file_path, 'w') as json_file:
+                json.dump(self._timeline_meta, json_file)
+            file_size = os.path.getsize(timeline_file_path)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when writing timeline full details: %s', err)
+            raise ProfilerIOException
+
+        return file_size
+
+    def write_timeline_to_json_by_limitation(self):
+        """Write timeline to json within the size limitation."""
+        display_filename = self._display_filename.format(self._device_id)
+        display_file_path = os.path.join(
+            self._profiling_dir,
+            display_filename
+        )
+        display_file_path = validate_and_normalize_path(
+            display_file_path, raise_key='Invalid timeline display json path.'
+        )
+
+        try:
+            with open(display_file_path, 'w') as json_file:
+                json_file.write('[')
+                for index, item in enumerate(self._timeline_meta):
+                    if index:
+                        # Write the separator before each item, not after it, so the
+                        # array stays valid json whether or not the loop breaks early.
+                        json_file.write(',')
+                    json.dump(item, json_file)
+                    file_size = os.path.getsize(display_file_path)
+                    if file_size > SIZE_LIMIT:
+                        break
+                json_file.write(']')
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when writing timeline display file: %s', err)
+            raise ProfilerIOException
+
+    def write_timeline_summary(self):
+        """Write timeline summary to json."""
+        timeline_summary_file_path = os.path.join(
+            self._profiling_dir,
+            self._timeline_summary_filename.format(self._device_id)
+        )
+        timeline_summary_file_path = validate_and_normalize_path(
+            timeline_summary_file_path, raise_key='Invalid timeline summary path.'
+        )
+
+        try:
+            with open(timeline_summary_file_path, 'w') as json_file:
+                json.dump(self._timeline_summary, json_file)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when writing timeline summary file: %s', err)
+            raise ProfilerIOException
+
+    def load_timeline_data(self):
+        """Load timeline data from file."""
+        file_path = os.path.join(
+            self._profiling_dir,
+            self._output_timeline_data_file_path.format(self._device_id)
+        )
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid timeline txt file path.'
+        )
+        if not os.path.exists(file_path):
+            logger.error("Failed to find parsed timeline file.")
+            raise ProfilerFileNotFoundException('parsed timeline file')
+
+        stream_count_dict = {}
+        try:
+            with open(file_path, 'r') as f_obj:
+                for line in f_obj:
+                    if not line.startswith('=') and not line.startswith('op_name') and \
+                            not line.startswith('-'):
+                        line_list = line.split()
+                        self._parse_timeline_data(line_list)
+                        self._update_num_of_streams(line_list, stream_count_dict)
+        except (IOError, OSError) as err:
+            logger.error('Error occurred when reading timeline intermediate file: %s', err)
+            raise ProfilerIOException
+
+        # Update timeline summary info.
+        self._timeline_summary['num_of_streams'] = len(stream_count_dict.keys())
+
+    def _parse_timeline_data(self, line_list):
+        """Parse timeline data."""
+        factor = 1000
+        op_meta = TimelineContainer(line_list)
+        timeline_dict = {}
+        timeline_dict['name'] = op_meta.op_name
+        timeline_dict['ph'] = 'X'
+        timeline_dict['pid'] = int(self._device_id)
+        timeline_dict['tid'] = op_meta.stream_id
+        timeline_dict['ts'] = op_meta.start_time * factor
+        dur = op_meta.duration * factor
+        timeline_dict['dur'] = dur
+        self._timeline_summary['total_time'] += dur
+        self._timeline_meta.append(timeline_dict)
+
+    @staticmethod
+    def _update_num_of_streams(line_list, stream_count_dict):
+        """Update number of streams."""
+        stream_id = line_list[1]
+        if stream_id not in stream_count_dict.keys():
+            stream_count_dict[stream_id] = 1
+        else:
+            stream_count_dict[stream_id] += 1
+
+    def get_min_cycle_counter_from_file(self):
+        """
+        Get minimum cycle counter.
+
+        Returns:
+            float, the minimum value of the cycle counter.
+        """
+        file_path = os.path.join(
+            self._profiling_dir,
+            self._min_cycle_counter_file_path.format(self._device_id)
+        )
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid min cycle counter file path.'
+        )
+
+        if os.path.exists(file_path):
+            try:
+                with open(file_path, 'r') as f_obj:
+                    min_cycle_counter = f_obj.read()
+                    min_cycle_counter = float(min_cycle_counter) \
+                        if not min_cycle_counter == 'inf' else 0
+            except (IOError, OSError) as err:
+                logger.error('Error occurred when reading minimum cycle counter: %s', err)
+                raise ProfilerIOException
+        else:
+            min_cycle_counter = 0
+            logger.info("No min cycle counter recorded.")
+
+        return min_cycle_counter
+
+    def add_all_reduce_info(self, all_reduce_info):
+        """
+        Add all reduce info into timeline metadata.
+
+        Args:
+            all_reduce_info (list<dict>): The metadata of AllReduce operator.
+                [
+                    {
+                        'stream_id_1': [(start_time, end_time, duration, field_name)],
+                        ...
+                    },
+                    {...}
+                ]
+        """
+        logger.info('Adding AllReduce info...')
+        factor = 100
+        min_cycle_counter = self.get_min_cycle_counter_from_file()
+        for step_meta in all_reduce_info:
+            for stream_id, time_info_list in step_meta.items():
+                for time_info in time_info_list:
+                    start, _, dur, name = time_info
+                    all_reduce_dict = {}
+                    all_reduce_dict['name'] = name
+                    all_reduce_dict['ph'] = 'X'
+                    # Using 10000 to represent AllReduce.
+                    all_reduce_dict['pid'] = 10000
+                    all_reduce_dict['tid'] = int(stream_id)
+                    all_reduce_dict['ts'] = (start - min_cycle_counter) / factor
+                    all_reduce_dict['dur'] = dur / factor
+                    self._timeline_meta.append(all_reduce_dict)
+                    self._timeline_summary['total_time'] += all_reduce_dict['dur']
+
+    def add_framework_info(self, framework_info):
+        """
+        Add framework info into timeline metadata.
+
+        Args:
+            framework_info (dict): The framework metadata.
+        """
+        logger.info('Adding framework info...')
+        framework_obj_list = framework_info.get('object')
+        self._timeline_summary['num_of_ops'] = len(framework_obj_list)
+        for framework_obj in framework_obj_list:
+            op_name = framework_obj[0]
+            op_type = framework_obj[1]
+            op_full_name = framework_obj[4]
+            op_info = framework_obj[5]
+            for timeline_obj in self._timeline_meta:
+                if op_full_name == timeline_obj.get('name'):
+                    timeline_obj['name'] = op_name
+                    timeline_obj['args'] = {
+                        'type': op_type,
+                        'fullname': op_full_name
+                    }
+                    timeline_obj['args'].update(op_info)
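The dictionaries built in `_parse_timeline_data` and `add_all_reduce_info` use the key set of the Chrome trace event format (`name`, `ph`, `pid`, `tid`, `ts`, `dur`), which is presumably why the display JSON can be handed straight to a trace viewer; `ts` and `dur` are in microseconds there, hence the unit factors. A sketch of what one parsed line becomes, with made-up values:

    # One data line of output_timeline_data_0.txt: op_name stream_id start_time duration
    line_list = 'Default/network/Conv2D-op101 2 57.25 0.308'.split()

    op_name, stream_id, start_time, duration = line_list
    factor = 1000  # the intermediate file stores ms; trace events expect us
    event = {
        'name': op_name,
        'ph': 'X',              # 'X' marks a complete event with a duration
        'pid': 0,               # device id
        'tid': int(stream_id),  # one row per stream in the viewer
        'ts': float(start_time) * factor,
        'dur': float(duration) * factor,
    }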
@@ -0,0 +1,91 @@
+"""The container of metadata used in profiler parser."""
+
+
+class HWTSContainer:
+    """
+    HWTS output container.
+
+    Args:
+        split_list (list): The split list of metadata in HWTS output file.
+    """
+    def __init__(self, split_list):
+        self._op_name = ''
+        self._duration = None
+        self._status = split_list[0]
+        self._task_id = split_list[6]
+        self._cycle_counter = float(split_list[7])
+        self._stream_id = split_list[8]
+
+    @property
+    def status(self):
+        """Get the status of the operator, i.e. Start or End."""
+        return self._status
+
+    @property
+    def task_id(self):
+        """Get the task id of the operator."""
+        return self._task_id
+
+    @property
+    def cycle_counter(self):
+        """Get the cycle counter."""
+        return self._cycle_counter
+
+    @property
+    def stream_id(self):
+        """Get the stream id of the operator."""
+        return self._stream_id
+
+    @property
+    def op_name(self):
+        """Get the name of the operator."""
+        return self._op_name
+
+    @op_name.setter
+    def op_name(self, name):
+        """Set the name of the operator."""
+        self._op_name = name
+
+    @property
+    def duration(self):
+        """Get the duration of the operator execution."""
+        return self._duration
+
+    @duration.setter
+    def duration(self, value):
+        """Set the duration of the operator execution."""
+        self._duration = value
+
+
+class TimelineContainer:
+    """
+    A container of operator computation metadata.
+
+    Args:
+        split_list (list): The split list of metadata in op_compute output file.
+    """
+    def __init__(self, split_list):
+        self._op_name = split_list[0]
+        self._stream_id = int(split_list[1])
+        self._start_time = float(split_list[2])
+        self._duration = float(split_list[3])
+
+    @property
+    def op_name(self):
+        """Get the name of the operator."""
+        return self._op_name
+
+    @property
+    def stream_id(self):
+        """Get the stream id of the operator."""
+        return self._stream_id
+
+    @property
+    def start_time(self):
+        """Get the execution start time of the operator."""
+        return self._start_time
+
+    @property
+    def duration(self):
+        """Get the duration of the operator execution."""
+        return self._duration
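The containers simply give names (and types) to the positional fields of the two intermediate formats. For instance, feeding `TimelineContainer` one split line of the timeline output (the values here are made up):

    from mindinsight.profiler.parser.container import TimelineContainer

    fields = 'Default/network/MatMul-op12 2 57.25 0.308'.split()
    op = TimelineContainer(fields)
    assert op.op_name == 'Default/network/MatMul-op12'
    assert op.stream_id == 2        # cast to int
    assert op.start_time == 57.25   # cast to float
    assert op.duration == 0.308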
@@ -13,8 +13,15 @@
 # limitations under the License.
 # ============================================================================
 """Op compute time files parser."""
+import os
+
 from tabulate import tabulate
+
 from mindinsight.profiler.common._utils import fwrite_format
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException
+from mindinsight.profiler.common.log import logger
+from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path
+from mindinsight.profiler.parser.container import HWTSContainer
 
 
 class OPComputeTimeParser:
     """
@@ -28,11 +35,20 @@ class OPComputeTimeParser:
     _dst_file_title = 'title:op compute time'
     _dst_file_column_title = ['op_name', 'compute_time(ms)', 'stream_id']
+    _timeline_file_title = 'title:timeline info'
+    _timeline_file_column_title = ['op_name', 'stream_id', 'start_time', 'duration']
 
-    def __init__(self, hwts_output_file, output_filename, op_task_info):
+    def __init__(self, hwts_output_file, output_filename, op_task_info,
+                 output_path, device_id):
+        hwts_output_file = validate_and_normalize_path(
+            hwts_output_file, raise_key='Invalid hwts output file path.'
+        )
         self._hwts_output_file = hwts_output_file
         self._output_filename = output_filename
         self._op_task_info = op_task_info
+        self._output_path = output_path
+        self._device_id = device_id
+        self._min_cycle_counter = float("inf")
 
     def _get_op_task_id_map(self):
         """
@@ -44,78 +60,197 @@ class OPComputeTimeParser:
         op_map_result = []
         hwts_list = []
 
-        with(open(self._hwts_output_file, 'r')) as data_file:
+        if not os.path.exists(self._hwts_output_file):
+            logger.error('The hwts output file does not exist.')
+            raise ProfilerFileNotFoundException('hwts output file')
+
+        with open(self._hwts_output_file, 'r') as data_file:
             lines = data_file.readlines()
             for line in lines:
-                if line.startswith("Start of task"):
+                if line.startswith("Start of task") or line.startswith("End of task"):
                     line_split = line.split()
-                    hwts_list.append([line_split[0], line_split[6], line_split[7], line_split[8]])
-                if line.startswith('End of task'):
-                    line_split = line.split()
-                    hwts_list.append([line_split[0], line_split[6], line_split[7], line_split[8]])
+                    container = HWTSContainer(line_split)
+                    hwts_list.append(container)
 
         # hwts op map by taskId
         for hwts in hwts_list:
-            if hwts[1] in self._op_task_info.keys():
-                op_map_result.append([self._op_task_info[hwts[1]], hwts[0], hwts[1], hwts[2], hwts[3]])
+            if hwts.task_id in self._op_task_info.keys():
+                hwts.op_name = self._op_task_info[hwts.task_id]
+                op_map_result.append(hwts)
 
         return op_map_result
 
     def execute(self):
         """Execute the parser, compute all op, get op time, and write it to the output file."""
+        # Calculate the execution time of operators,
+        # and update the minimum cycle counter.
+        tmp_result_data = self._calculate_op_execution_time()
+
+        # Convert time units from nanoseconds to milliseconds.
+        # The unit of the cycle counter is 10 nanoseconds.
+        op_name_time_dict = {}
+        op_name_stream_dict = {}
+        op_name_count_dict = {}
+        op_name_task_dict = {}
+        op_name_start_time = {}
+        self._convert_op_time_unit(
+            tmp_result_data, op_name_time_dict, op_name_stream_dict,
+            op_name_count_dict, op_name_task_dict, op_name_start_time
+        )
 
         result_data = []
+        for op_name, time in op_name_time_dict.items():
+            if op_name in op_name_stream_dict.keys():
+                stream_id = op_name_stream_dict[op_name]
+                avg_time = time / op_name_count_dict[op_name]
+                result_data.append([op_name, avg_time, stream_id])
+
+        timeline_data = []
+        for op_name, time in op_name_time_dict.items():
+            if op_name in op_name_stream_dict.keys():
+                stream_id = op_name_stream_dict[op_name]
+                start_time_list = op_name_start_time.get(op_name)
+                for (start_time, duration) in start_time_list:
+                    timeline_data.append([op_name, stream_id, start_time, duration])
+
+        # Write the metadata of operators into the file,
+        # including operator name, average time, and stream id.
+        self._write_op_time_into_file(result_data)
+        # Write the timeline data into the file,
+        # including operator name, stream id, start time, and duration.
+        self._write_timeline_data_into_file(timeline_data)
+
+        # Write the minimum cycle counter into the file.
+        self.write_min_cycle_counter_to_file()
+
+    def _write_op_time_into_file(self, result_data):
+        """
+        Write the metadata of operators into the file, including
+        op name, average time, and stream id.
+
+        Args:
+            result_data (list): The metadata to be written into the file.
+                [
+                    ['op_name_1', 'avg_time_1', 'stream_id_1'],
+                    ['op_name_2', 'avg_time_2', 'stream_id_2'],
+                    [...]
+                ]
+        """
+        result_data.sort(key=lambda x: x[0])
+        total_time = 0
+        for item in result_data:
+            total_time += item[1]
+        result_data.append(["total op", total_time, 0])
+
+        fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
+        fwrite_format(self._output_filename, data_source=tabulate(result_data,
+                                                                  self._dst_file_column_title,
+                                                                  tablefmt='simple'))
+
+    def _write_timeline_data_into_file(self, timeline_data):
+        """
+        Write the timeline information into the file, including
+        operator name, stream id, start time and duration.
+
+        Args:
+            timeline_data (list): The metadata to be written into the file.
+                [
+                    ['op_name_1', 'stream_id_1', 'start_time_1', 'duration_1'],
+                    ['op_name_2', 'stream_id_2', 'start_time_2', 'duration_2'],
+                    [...]
+                ]
+        """
+        # Sort by start time.
+        timeline_data.sort(key=lambda x: float(x[2]))
+        filename = 'output_timeline_data_{}.txt'.format(self._device_id)
+        file_path = os.path.join(self._output_path, filename)
+        file_path = validate_and_normalize_path(file_path, raise_key='Invalid file path of timeline data.')
+
+        # Write to file.
+        fwrite_format(file_path, data_source=self._timeline_file_title, is_start=True)
+        fwrite_format(file_path, data_source=tabulate(
+            timeline_data, self._timeline_file_column_title, tablefmt='simple'
+        ))
+
+    def _calculate_op_execution_time(self):
+        """
+        Calculate the execution time of each operator.
+
+        Returns:
+            list, including the intermediate data of op execution time.
+        """
         tmp_result_data = []
         op_map_list = self._get_op_task_id_map()
 
         cur_index = 0
         length = len(op_map_list)
+        min_cycle_counter = float("inf")
 
         while cur_index < length:
             if cur_index + 1 == length:
                 break
-            op_start = op_map_list[cur_index]
-            op_end = op_map_list[cur_index+1]
-            if op_start[1] == "Start" and op_end[1] == "End"\
-                and op_start[0] == op_end[0]:
-                # op_name, task_id, cycle counter, stream_id
-                tmp_result_data.append([op_start[0], op_start[2], int(op_end[3]) - int(op_start[3]), op_start[4]])
+
+            op_start = op_map_list[cur_index]
+            op_end = op_map_list[cur_index + 1]
+            if op_start.status == "Start" and op_end.status == "End" \
+                    and op_start.op_name == op_end.op_name:
+                op_start.duration = op_end.cycle_counter - op_start.cycle_counter
+                tmp_result_data.append(op_start)
                 cur_index += 2
+                if not op_start.op_name.startswith("assign"):
+                    min_cycle_counter = min(min_cycle_counter, op_start.cycle_counter)
             else:
                 cur_index += 1
 
-        op_name_time_dict = {}
-        op_name_steamid_dict = {}
-        op_name_count_dict = {}
-        op_name_task_dict = {}
-
-        # compute all op
-        for item in tmp_result_data:
-            if item[0] in op_name_time_dict.keys():
-                op_name_time_dict[item[0]] += float(item[2])/1e5  # cycle counter/1*10^5 ms
-                if item[1] == op_name_task_dict[item[0]]:
-                    op_name_count_dict[item[0]] += 1
-            else:
-                op_name_time_dict[item[0]] = float(item[2])/1e5
-                op_name_steamid_dict[item[0]] = item[-1]
-                op_name_task_dict[item[0]] = item[1]
-                op_name_count_dict[item[0]] = 1
+        # Update the value of minimum cycle counter.
+        self._min_cycle_counter = min_cycle_counter
+
+        return tmp_result_data
+
+    def _convert_op_time_unit(self, op_data_list, op_name_time_dict, op_name_stream_dict,
+                              op_name_count_dict, op_name_task_dict, op_name_start_time):
+        """
+        Calculate the execution time of each operator and convert it into milliseconds.
+
+        Args:
+            op_data_list (list): The list of operator metadata.
+            op_name_time_dict (dict): The mapping relation of operator name and its execution time.
+            op_name_stream_dict (dict): The mapping relation of operator name and its stream id.
+            op_name_count_dict (dict): The mapping relation of operator name and its count.
+            op_name_task_dict (dict): The mapping relation of operator name and its task id.
+            op_name_start_time (dict): The mapping relation of operator name and its start time.
+        """
+        factor = 1e5
+        for item in op_data_list:
+            op_name = item.op_name
+            # Unit conversion: converting the cycle counter into ms.
+            op_start_time_str = str((item.cycle_counter - self._min_cycle_counter) / factor)
+            op_duration = item.duration / factor
+            op_duration_str = str(item.duration / factor)
+            if op_name in op_name_time_dict.keys():
+                op_name_time_dict[op_name] += op_duration
+                if item.task_id == op_name_task_dict[op_name]:
+                    op_name_count_dict[op_name] += 1
+                    op_name_start_time[op_name].append(
+                        (op_start_time_str, op_duration_str)
+                    )
+            else:
+                op_name_time_dict[op_name] = op_duration
+                op_name_stream_dict[op_name] = item.stream_id
+                op_name_task_dict[op_name] = item.task_id
+                op_name_count_dict[op_name] = 1
+                op_name_start_time[op_name] = []
+                op_name_start_time[op_name].append(
+                    (op_start_time_str, op_duration_str)
+                )
 
-        for op_name, time in op_name_time_dict.items():
-            if op_name in op_name_steamid_dict.keys():
-                stream_id = op_name_steamid_dict[op_name]
-                avg_time = time / op_name_count_dict[op_name]
-                result_data.append([op_name, avg_time, stream_id])
-
-        result_data.sort(key=lambda x: x[0])
-        total_time = 0
-        for item in result_data:
-            total_time += item[1]
-        result_data.append(["total op", total_time, 0])
-
-        fwrite_format(self._output_filename, data_source=self._dst_file_title, is_start=True)
-        fwrite_format(self._output_filename, data_source=tabulate(result_data,
-                                                                  self._dst_file_column_title,
-                                                                  tablefmt='simple'))
+
+    def write_min_cycle_counter_to_file(self):
+        """Write the minimum cycle counter into a txt file."""
+        min_cycle_counter = self._min_cycle_counter
+        file_name = 'min_cycle_counter_' + self._device_id + '.txt'
+        file_path = os.path.join(self._output_path, file_name)
+        file_path = validate_and_normalize_path(
+            file_path, raise_key='Invalid min cycle counter file path.'
+        )
+        with open(file_path, 'w') as file:
+            file.write(str(min_cycle_counter))
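The unit bookkeeping above is easy to miss: per the comment in `execute()`, the HWTS cycle counter ticks every 10 ns, so dividing a tick delta by 1e5 yields milliseconds, and the analyser later multiplies by 1000 to get the microseconds trace viewers expect. A worked check with hypothetical tick values:

    start_cc, end_cc = 1_234_500_000, 1_234_530_800  # hypothetical 10 ns ticks
    duration_ms = (end_cc - start_cc) / 1e5          # 30800 ticks * 10 ns = 0.308 ms
    assert duration_ms == 0.308
    duration_us = duration_ms * 1000                 # unit used by the timeline display
    print(duration_ms, duration_us)                  # 0.308 308.0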
@@ -16,11 +16,14 @@
 import os
 import time
+from marshmallow import ValidationError
 from tabulate import tabulate
 
 from mindinsight.profiler.analyser.analyser_factory import AnalyserFactory
 from mindinsight.profiler.analyser.integrator import Integrator
 from mindinsight.profiler.common._utils import get_file_names, fwrite_format
+from mindinsight.profiler.common.exceptions.exceptions import ProfilerFileNotFoundException, \
+    ProfilerIOException
 from mindinsight.profiler.common.log import logger
 from mindinsight.profiler.common.validator.checkparam import \
     check_bool, check_subgraph
@@ -76,6 +79,7 @@ class Profiler:
                  optypes_to_deal='', optypes_not_deal='Variable', job_id=""):
         # get device_id and device_target
         device_target = ""
+        dev_id = ""
         try:
             import mindspore.context as context
             dev_id = str(context.get_context("device_id"))
@@ -83,7 +87,7 @@ class Profiler:
         except ImportError:
             logger.error("Profiling: fail to import context from mindspore.")
         except ValueError as err:
-            logger.error("Profiling: fail to get context, %s", err.message)
+            logger.error("Profiling: fail to get context, %s", err)
 
         if not dev_id:
             dev_id = os.getenv('DEVICE_ID')
@@ -185,7 +189,10 @@ class Profiler:
         # get op compute time from hwts data and framework data, write output_op_compute_time.txt
         opcompute_output_filename = self._opcompute_output_filename_target + self._dev_id + ".txt"
         opcompute_output_filename = os.path.join(self._output_path, opcompute_output_filename)
-        optime_parser = OPComputeTimeParser(hwts_output_filename, opcompute_output_filename, op_task_dict)
+        optime_parser = OPComputeTimeParser(
+            hwts_output_filename, opcompute_output_filename,
+            op_task_dict, self._output_path, self._dev_id
+        )
         optime_parser.execute()
 
         # parse DATA_PREPROCESS.dev.AICPU file, write output_data_preprocess_aicpu_x.txt
@@ -216,6 +223,9 @@ class Profiler:
         # analyse step trace info
         self._analyse_step_trace(source_path, framework_parser)
 
+        # analyse timeline info
+        self._analyse_timeline()
+
     def _analyse_step_trace(self, source_path, framework_parser):
         """
         Analyse step trace data and save the result.
@@ -240,7 +250,34 @@ class Profiler:
         parser.parse_and_save()
         # print parser result
         parser.show()
+        logger.info("Finished saving the intermediate result: %s", step_trace_intermediate_file_path)
+
+    def _analyse_timeline(self):
+        """
+        Analyse and parse timeline info.
+        """
+        # Get framework info.
+        aicoredetail_analyser = AnalyserFactory.instance().get_analyser(
+            'aicore_detail', self._output_path, self._dev_id
+        )
+        framework_info = aicoredetail_analyser.query()
+
+        # Get all reduce info.
+        step_trace_analyser = AnalyserFactory.instance().get_analyser(
+            'step_trace', self._output_path, self._dev_id
+        )
+        all_reduce_info = step_trace_analyser.query_for_all_reduce()
+
+        # Get timeline info.
+        timeline_analyser = AnalyserFactory.instance().get_analyser(
+            'timeline', self._output_path, self._dev_id
+        )
+        timeline_analyser.add_framework_info(framework_info)
+        timeline_analyser.add_all_reduce_info(all_reduce_info)
+        try:
+            timeline_analyser.write_timeline()
+            timeline_analyser.write_timeline_summary()
+        except (ProfilerIOException, ProfilerFileNotFoundException, ValidationError) as err:
+            logger.warning('Failed to write timeline data: %s', err)
 
     def __del__(self):
         """Disable the profiling collection service, called after training."""
@@ -72,7 +72,6 @@ class TestProfilerAnalyse(TestCase):
     def test_step_trace_file_exist(self):
         """Test the step trace file has been generated"""
         output_files = os.listdir(self.profiler)
-        assert len(output_files) == 9
         assert self.step_trace_file in output_files
 
     @pytest.mark.level0