From da797e3e2ffbc612092d1562f4fa7aa978ecb22f Mon Sep 17 00:00:00 2001 From: Li Hongzhang Date: Tue, 15 Sep 2020 17:48:15 +0800 Subject: [PATCH] parse pb files in executor --- .../data_transform/ms_data_loader.py | 38 ++++++++++--------- .../data_transform/summary_watcher.py | 30 +++++++-------- .../data_transform/test_ms_data_loader.py | 2 +- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/mindinsight/datavisual/data_transform/ms_data_loader.py b/mindinsight/datavisual/data_transform/ms_data_loader.py index f96780dd..c8028a8a 100644 --- a/mindinsight/datavisual/data_transform/ms_data_loader.py +++ b/mindinsight/datavisual/data_transform/ms_data_loader.py @@ -197,14 +197,16 @@ class _PbParser(_Parser): for filename in pb_filenames: if not self._set_latest_file(filename): continue - - try: - tensor_event = self._parse_pb_file(filename) - except UnknownError: - # Parse pb file failed, so return None. - continue - - events_data.add_tensor_event(tensor_event) + future = executor.submit(self._parse_pb_file, self._summary_dir, filename) + def add_tensor_event(future_value): + try: + tensor_event = future_value.result() + if tensor_event is not None: + events_data.add_tensor_event(tensor_event) + except Exception as ex: + logger.exception(ex) + raise UnknownError(str(ex)) + future.add_done_callback(add_tensor_event) return False return True @@ -249,7 +251,8 @@ class _PbParser(_Parser): return True - def _parse_pb_file(self, filename): + @staticmethod + def _parse_pb_file(summary_dir, filename): """ Parse pb file and write content to `EventsData`. @@ -259,7 +262,7 @@ class _PbParser(_Parser): Returns: TensorEvent, if load pb file and build graph success, will return tensor event, else return None. """ - file_path = FileHandler.join(self._summary_dir, filename) + file_path = FileHandler.join(summary_dir, filename) logger.info("Start to load graph from pb file, file path: %s.", file_path) filehandler = FileHandler(file_path) model_proto = anf_ir_pb2.ModelProto() @@ -280,7 +283,7 @@ class _PbParser(_Parser): logger.exception(ex) raise UnknownError(str(ex)) - tensor_event = TensorEvent(wall_time=FileHandler.file_stat(file_path), + tensor_event = TensorEvent(wall_time=FileHandler.file_stat(file_path).mtime, step=0, tag=filename, plugin_name=PluginNameEnum.GRAPH.value, @@ -298,7 +301,6 @@ class _SummaryParser(_Parser): super(_SummaryParser, self).__init__(summary_dir) self._latest_file_size = 0 self._summary_file_handler = None - self._events_data = None def parse_files(self, executor, filenames, events_data): """ @@ -312,7 +314,6 @@ class _SummaryParser(_Parser): Returns: bool, True if all the summary files are finished loading. """ - self._events_data = events_data summary_files = self.filter_files(filenames) summary_files = self.sort_files(summary_files) if self._latest_filename in summary_files: @@ -332,7 +333,7 @@ class _SummaryParser(_Parser): continue try: - if not self._load_single_file(self._summary_file_handler, executor): + if not self._load_single_file(self._summary_file_handler, executor, events_data): self._latest_file_size = self._summary_file_handler.offset else: self._latest_file_size = new_size @@ -358,13 +359,14 @@ class _SummaryParser(_Parser): lambda filename: (re.search(r'summary\.\d+', filename) and not filename.endswith("_lineage")), filenames)) - def _load_single_file(self, file_handler, executor): + def _load_single_file(self, file_handler, executor, events_data): """ Load a log file data. Args: file_handler (FileHandler): A file handler. executor (Executor): The executor instance. + events_data (EventsData): The container of event data. Returns: bool, True if the summary file is finished loading. @@ -389,15 +391,15 @@ class _SummaryParser(_Parser): for tensor_value in tensor_values: if tensor_value.plugin_name == PluginNameEnum.GRAPH.value: try: - graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value) + graph_tags = events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value) except KeyError: graph_tags = [] summary_tags = self.filter_files(graph_tags) for tag in summary_tags: - self._events_data.delete_tensor_event(tag) + events_data.delete_tensor_event(tag) - self._events_data.add_tensor_event(tensor_value) + events_data.add_tensor_event(tensor_value) except Exception as exc: # Log exception for debugging. logger.exception(exc) diff --git a/mindinsight/datavisual/data_transform/summary_watcher.py b/mindinsight/datavisual/data_transform/summary_watcher.py index 197a1130..80c9c7f9 100644 --- a/mindinsight/datavisual/data_transform/summary_watcher.py +++ b/mindinsight/datavisual/data_transform/summary_watcher.py @@ -205,14 +205,7 @@ class SummaryWatcher: except OverflowError: return if relative_path not in summary_dict: - summary_dict[relative_path] = { - 'create_time': ctime, - 'update_time': mtime, - 'summary_files': 0, - 'lineage_files': 0, - 'graph_files': 0, - 'profiler': None, - } + summary_dict[relative_path] = _new_entry(ctime, mtime) if summary_dict[relative_path]['create_time'] < ctime: summary_dict[relative_path].update({ 'create_time': ctime, @@ -241,14 +234,7 @@ class SummaryWatcher: if relative_path in summary_dict: summary_dict[relative_path]['profiler'] = profiler else: - summary_dict[relative_path] = { - 'create_time': ctime, - 'summary_files': 0, - 'lineage_files': 0, - 'graph_files': 0, - 'update_time': mtime, - 'profiler': profiler - } + summary_dict[relative_path] = _new_entry(ctime, mtime, profiler) def is_summary_directory(self, summary_base_dir, relative_path): """ @@ -410,3 +396,15 @@ class SummaryWatcher: summaries.sort(key=lambda x: (-int(x['update_time'].timestamp()), x['file_name'])) return summaries + + +def _new_entry(ctime, mtime, profiler=None): + """Create a new entry.""" + return { + 'create_time': ctime, + 'update_time': mtime, + 'summary_files': 0, + 'lineage_files': 0, + 'graph_files': 0, + 'profiler': profiler + } diff --git a/tests/ut/datavisual/data_transform/test_ms_data_loader.py b/tests/ut/datavisual/data_transform/test_ms_data_loader.py index c8530615..eb88663e 100644 --- a/tests/ut/datavisual/data_transform/test_ms_data_loader.py +++ b/tests/ut/datavisual/data_transform/test_ms_data_loader.py @@ -149,7 +149,7 @@ class TestPbParser: filename = 'ms_output.pb' create_graph_pb_file(output_dir=self._summary_dir, filename=filename) parser = _PbParser(self._summary_dir) - tensor_event = parser._parse_pb_file(filename) + tensor_event = parser._parse_pb_file(self._summary_dir, filename) assert isinstance(tensor_event, TensorEvent) def test_set_latest_file(self):