@@ -47,8 +47,6 @@ class TimelineAnalyser(BaseAnalyser):
 
     def _load(self):
         """Load data according to the parsed profiling files."""
-        self.load_timeline_data()
-        self._timeline_summary['op_exe_times'] = len(self._timeline_meta)
 
     def _filter(self, filter_condition):
         """
@@ -122,6 +120,7 @@ class TimelineAnalyser(BaseAnalyser):
     def write_timeline(self):
         """Load data according to the parsed profiling files."""
         # Write timeline to file.
+        logger.info('Writing timeline file...')
         file_size = self.write_timeline_to_json()
 
         # If the file size is larger than 20MB, open a new file and
@@ -131,6 +130,8 @@ class TimelineAnalyser(BaseAnalyser):
             # write to json file for display
             self.write_timeline_to_json_by_limitation()
 
+        logger.info('Finished file writing!')
+
     def write_timeline_to_json(self):
         """Write timeline to json."""
         timeline_filename = self._timeline_filename.format(self._device_id)
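
Note: per the hunk above, write_timeline first dumps the full timeline and
then, when the result is too large, writes a size-limited copy for display. A
minimal sketch of that dispatch, assuming a hypothetical SIZE_LIMIT constant
of 20MB (the real threshold check lives in code outside this diff):

    # Sketch only; SIZE_LIMIT and the surrounding control flow are assumptions
    # inferred from the comments above, not the actual implementation.
    SIZE_LIMIT = 20 * 1024 * 1024  # 20MB
    file_size = self.write_timeline_to_json()
    if file_size > SIZE_LIMIT:
        # write a truncated json file for display
        self.write_timeline_to_json_by_limitation()
    logger.info('Finished file writing!')
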
@@ -197,7 +198,7 @@ class TimelineAnalyser(BaseAnalyser):
             logger.error('Error occurred when write timeline summary file: %s', err)
             raise ProfilerIOException
 
-    def load_timeline_data(self):
+    def _load_timeline_data(self):
         """Load timeline data from file."""
         file_path = os.path.join(
             self._profiling_dir,
@@ -210,34 +211,37 @@ class TimelineAnalyser(BaseAnalyser):
             logger.error("Failed to find parsed timeline file.")
             raise ProfilerFileNotFoundException('parsed timeline file')
 
-        stream_count_dict = {}
+        timeline_list = []
         try:
             with open(file_path, 'r') as f_obj:
                 for line in f_obj:
                     if not line.startswith('op_name'):
                         line_list = line.strip('\n').split(',')
-                        self._parse_timeline_data(line_list)
-                        self._update_num_of_streams(line_list, stream_count_dict)
+                        timeline_list.append(line_list)
         except (IOError, OSError) as err:
             logger.error('Error occurred when read timeline intermediate file: %s', err)
             raise ProfilerIOException
 
-        # Update timeline summary info
-        self._timeline_summary['num_of_streams'] = len(stream_count_dict.keys())
+        return timeline_list
 
     def _parse_timeline_data(self, line_list):
         """Parse timeline data."""
+        # factor to convert the time unit from 1ms to 1us for timeline display
         factor = 1000
         op_meta = TimelineContainer(line_list)
         timeline_dict = {}
         timeline_dict['name'] = op_meta.op_name
         timeline_dict['ph'] = 'X'
-        timeline_dict['pid'] = int(self._device_id)
         timeline_dict['tid'] = op_meta.stream_id
         timeline_dict['ts'] = op_meta.start_time * factor
         dur = op_meta.duration * factor
         timeline_dict['dur'] = dur
-        self._timeline_summary['total_time'] += dur
+        if op_meta.pid == 10000:  # AllReduce PID
+            timeline_dict['pid'] = 10000
+        else:
+            timeline_dict['pid'] = int(self._device_id)
+        # Update total time of operator execution.
+        self._timeline_summary['total_time'] += dur
         self._timeline_meta.append(timeline_dict)
 
     @staticmethod
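
Each dict assembled in _parse_timeline_data is a Chrome trace event of the
"complete" type ('ph': 'X'), so the resulting JSON can be opened directly in
chrome://tracing. An illustrative entry (the operator and stream values here
are made up; 'ts' and 'dur' are in microseconds after the factor-1000
conversion, and 'pid' is the device id, or 10000 for AllReduce events):

    # Illustrative only: one entry appended to self._timeline_meta.
    {
        'name': 'Conv2D-op1',  # op_meta.op_name (hypothetical)
        'ph': 'X',             # complete event: carries both start and duration
        'pid': 0,              # int(device_id), or 10000 for AllReduce
        'tid': 5,              # op_meta.stream_id (hypothetical)
        'ts': 1234.0,          # start time, us
        'dur': 56.0,           # duration, us
    }
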
@@ -249,7 +253,7 @@ class TimelineAnalyser(BaseAnalyser):
         else:
             stream_count_dict[stream_id] += 1
 
-    def get_min_cycle_counter_from_file(self):
+    def get_min_cycle_counter(self):
         """
         Get minimum cycle counter.
 
@@ -280,48 +284,50 @@ class TimelineAnalyser(BaseAnalyser):
 
         return min_cycle_counter
 
-    def add_all_reduce_info(self, all_reduce_info):
+    def init_timeline(self, all_reduce_info, framework_info):
         """
-        Add all reduce info into timeline metadata.
+        Init timeline metadata, adding all collected info.
 
         Args:
-            all_reduce_info (list<dict>): The metadata of AllReduce operator.
-                [
-                    {
-                        'stream_id_1': [(start_time, end_time, duration, field_name)],
-                        ...
-                    },
-                    {...}
-                ]
+            all_reduce_info (list[list]): The metadata of AllReduce operator.
+            framework_info (dict): The framework metadata.
         """
-        logger.info('Adding AllReduce info...')
-        factor = 100
-        min_cycle_counter = self.get_min_cycle_counter_from_file()
-        for step_meta in all_reduce_info:
-            for stream_id, time_info_list in step_meta.items():
-                for time_info in time_info_list:
-                    start, _, dur, name = time_info
-                    all_reduce_dict = {}
-                    all_reduce_dict['name'] = name
-                    all_reduce_dict['ph'] = 'X'
-                    # Using 10000 to represent AllReduce
-                    all_reduce_dict['pid'] = 10000
-                    all_reduce_dict['tid'] = int(stream_id)
-                    all_reduce_dict['ts'] = (start - min_cycle_counter) / factor
-                    all_reduce_dict['dur'] = dur / factor
-                    self._timeline_meta.append(all_reduce_dict)
-                    self._timeline_summary['total_time'] += all_reduce_dict['dur']
-
-    def add_framework_info(self, framework_info):
+        logger.info('Initiating timeline...')
+        timeline_list = self._load_timeline_data()
+        self._timeline_summary['op_exe_times'] = len(timeline_list)
+
+        # Add AllReduce info to timeline temp list and sort by start time.
+        if all_reduce_info:
+            logger.debug('AllReduce info found. Start adding info into timeline...')
+            timeline_list.extend(all_reduce_info)
+            timeline_list.sort(key=lambda x: float(x[2]))
+
+        # Init a dict for counting the num of streams.
+        stream_count_dict = {}
+        for timeline in timeline_list:
+            self._parse_timeline_data(timeline)
+            # Updating the collection of streams.
+            if len(timeline) == 4:
+                self._update_num_of_streams(timeline, stream_count_dict)
+
+        # Get framework metadata.
+        framework_obj_list = framework_info.get('object')
+        # The length of list is the number of operators.
+        self._timeline_summary['num_of_ops'] = len(framework_obj_list)
+        self._add_framework_info(framework_obj_list)
+        logger.info('Finished adding info into timeline...')
+
+        # Update timeline summary info
+        self._timeline_summary['num_of_streams'] = len(stream_count_dict.keys())
+
+    def _add_framework_info(self, framework_obj_list):
         """
         Add framework info into timeline metadata.
 
         Args:
-            framework_info (dict): The framework metadata.
+            framework_obj_list (list): The framework metadata.
         """
-        logger.info('Adding framework info...')
-        framework_obj_list = framework_info.get('object')
-        self._timeline_summary['num_of_ops'] = len(framework_obj_list)
+        logger.debug('Start adding framework info into timeline...')
         for framework_obj in framework_obj_list:
             op_name = framework_obj[0]
             op_type = framework_obj[1]
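
A note on the merge in init_timeline above: extending timeline_list with
all_reduce_info before parsing works because both row types keep their start
time at index 2, which is all the sort key reads; rows of length 4 are the
ordinary operator rows counted by _update_num_of_streams. A small sketch under
that assumption (the row contents are hypothetical):

    # Hypothetical rows; only the start time at index 2 matters to the sort.
    timeline_list = [['MatMul-op3', '5', '12.5', '0.4'],
                     ['Conv2D-op1', '5', '10.0', '1.2']]
    all_reduce_info = [['AllReduce-op0', '7', '11.0', '2.0']]
    timeline_list.extend(all_reduce_info)
    timeline_list.sort(key=lambda x: float(x[2]))  # ascending start time
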
@@ -335,3 +341,5 @@ class TimelineAnalyser(BaseAnalyser):
                         'fullname': op_full_name
                     }
                     timeline_obj['args'].update(op_info)
+
+        logger.debug('Finished adding framework info into timeline...')
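
Since the entries follow the Chrome trace event format, the file written by
write_timeline_to_json can also be checked from the consumer side; a quick
sketch, assuming a placeholder file name and that the file holds a JSON array
of events:

    import json

    # 'timeline_display_0.json' is a placeholder; the real name comes from
    # self._timeline_filename.format(self._device_id).
    with open('timeline_display_0.json', 'r') as f:
        events = json.load(f)
    total_us = sum(e['dur'] for e in events if e['ph'] == 'X')
    print('total op execution time (us):', total_us)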