From a685dbc20dc8ace63bca1f5b5f99e4ded51c8458 Mon Sep 17 00:00:00 2001 From: ougongchang Date: Fri, 25 Dec 2020 11:19:04 +0800 Subject: [PATCH] Prevent the infinite loop during XAI data loading failed If the current job is the latest one in the loader pool and the job is deleted, the job goes into an infinite cycle of load-fail-delete-reload-load-fail-delete. So we need to prevent the infinite loop during XAI data loading failed. --- .../explainer/manager/explain_loader.py | 7 +++-- .../explainer/manager/explain_manager.py | 29 +++++++------------ .../explainer/manager/explain_parser.py | 23 ++++++++------- 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/mindinsight/explainer/manager/explain_loader.py b/mindinsight/explainer/manager/explain_loader.py index 53c3b2be..5ee9c332 100644 --- a/mindinsight/explainer/manager/explain_loader.py +++ b/mindinsight/explainer/manager/explain_loader.py @@ -29,7 +29,7 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.explainer.common.enums import ExplainFieldsEnum from mindinsight.explainer.common.log import logger from mindinsight.explainer.manager.explain_parser import ExplainParser -from mindinsight.utils.exceptions import ParamValueError +from mindinsight.utils.exceptions import ParamValueError, UnknownError _NAN_CONSTANT = 'NaN' _NUM_DIGITS = 6 @@ -287,7 +287,10 @@ class ExplainLoader: is_end = False while not is_end and self.status != _LoaderStatus.STOP.value: - file_changed, is_end, event_dict = self._parser.list_events(filenames) + try: + file_changed, is_end, event_dict = self._parser.list_events(filenames) + except UnknownError: + break if file_changed: logger.info('Summary file in %s update, reload the data in the summary.', diff --git a/mindinsight/explainer/manager/explain_manager.py b/mindinsight/explainer/manager/explain_manager.py index 39072316..4d8ea791 100644 --- a/mindinsight/explainer/manager/explain_manager.py +++ b/mindinsight/explainer/manager/explain_manager.py @@ -14,11 +14,10 @@ # ============================================================================ """ExplainManager.""" -from collections import OrderedDict - import os import threading import time +from collections import OrderedDict from datetime import datetime from typing import Optional @@ -29,7 +28,7 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher from mindinsight.explainer.common.log import logger from mindinsight.explainer.manager.explain_loader import ExplainLoader -from mindinsight.utils.exceptions import MindInsightException, ParamValueError, UnknownError +from mindinsight.utils.exceptions import ParamValueError, UnknownError _MAX_LOADERS_NUM = 3 @@ -226,23 +225,17 @@ class ExplainManager: """Execute the data loading.""" # We will load the newest loader first. for loader_id in list(self._loader_pool.keys())[::-1]: - try: - with self._loader_pool_mutex: - loader = self._loader_pool.get(loader_id, None) - if loader is None: - logger.debug('Loader %r has been deleted, will not load data.', loader_id) - continue + with self._loader_pool_mutex: + loader = self._loader_pool.get(loader_id, None) + if loader is None: + logger.debug('Loader %r has been deleted, will not load data.', loader_id) + continue - if self.status == _ExplainManagerStatus.STOPPING.value: - logger.info('Loader %s status is %s, will return.', loader_id, loader.status) - return - - loader.load() + if self.status == _ExplainManagerStatus.STOPPING.value: + logger.info('Loader %s status is %s, will return.', loader_id, loader.status) + return - except MindInsightException as ex: - logger.warning('Data loader %r load data failed. Delete data_loader. Detail: %s.', loader_id, ex) - with self._loader_pool_mutex: - self._delete_loader(loader_id) + loader.load() def _delete_loader(self, loader_id): """Delete loader given loader_id.""" diff --git a/mindinsight/explainer/manager/explain_parser.py b/mindinsight/explainer/manager/explain_parser.py index 00576205..2e7e8e7c 100644 --- a/mindinsight/explainer/manager/explain_parser.py +++ b/mindinsight/explainer/manager/explain_parser.py @@ -19,14 +19,12 @@ This module is used to parse the MindExplain log file. """ from collections import namedtuple -from google.protobuf.message import DecodeError - from mindinsight.datavisual.common import exceptions -from mindinsight.explainer.common.enums import ExplainFieldsEnum -from mindinsight.explainer.common.log import logger from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.datavisual.data_transform.ms_data_loader import _SummaryParser from mindinsight.datavisual.proto_files import mindinsight_summary_pb2 as summary_pb2 +from mindinsight.explainer.common.enums import ExplainFieldsEnum +from mindinsight.explainer.common.log import logger from mindinsight.utils.exceptions import UnknownError HEADER_SIZE = 8 @@ -109,15 +107,20 @@ class ExplainParser(_SummaryParser): except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as ex: self._summary_file_handler.reset_offset(start_offset) is_end = True - logger.warning("Check crc failed and ignore this file, file_path=%s, offset=%s. Detail: %r.", + logger.warning("Check crc failed and reset offset, file_path=%s, offset=%s. Detail: %r.", self._summary_file_handler.file_path, self._summary_file_handler.offset, str(ex)) return file_changed, is_end, event_data - except (OSError, DecodeError, exceptions.MindInsightException) as ex: - is_end = True - logger.warning("Parse log file fail, and ignore this file, detail: %r," - "file path: %s.", str(ex), self._summary_file_handler.file_path) - return file_changed, is_end, event_data except Exception as ex: + # Note: If an unknown error occurs, we will set the offset to the end of this file, + # which is equivalent to stopping parsing this file. We do not delete the current job + # and retain the data that has been successfully parsed. + self._summary_file_handler.reset_offset(new_size) + + # Notice: If the current job is the latest one in the loader pool and the job is deleted, + # the job goes into an infinite cycle of load-fail-delete-reload-load-fail-delete. + # We need to prevent this infinite loop. + logger.error("Parse summary file failed, will set offset to the file end. file_path: %s, " + "offset: %d, detail: %s.", file_path, self._summary_file_handler.offset, str(ex)) logger.exception(ex) raise UnknownError(str(ex)) finally: