If the current job is the latest one in the loader pool and the job is deleted, the job goes into an infinite cycle of load-fail-delete-reload-load-fail-delete. So we need to prevent this infinite loop when XAI data loading fails. (tags/v1.2.0-rc1)
| @@ -29,7 +29,7 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler | |||||
| from mindinsight.explainer.common.enums import ExplainFieldsEnum | from mindinsight.explainer.common.enums import ExplainFieldsEnum | ||||
| from mindinsight.explainer.common.log import logger | from mindinsight.explainer.common.log import logger | ||||
| from mindinsight.explainer.manager.explain_parser import ExplainParser | from mindinsight.explainer.manager.explain_parser import ExplainParser | ||||
| from mindinsight.utils.exceptions import ParamValueError | |||||
| from mindinsight.utils.exceptions import ParamValueError, UnknownError | |||||
| _NAN_CONSTANT = 'NaN' | _NAN_CONSTANT = 'NaN' | ||||
| _NUM_DIGITS = 6 | _NUM_DIGITS = 6 | ||||
| @@ -287,7 +287,10 @@ class ExplainLoader: | |||||
| is_end = False | is_end = False | ||||
| while not is_end and self.status != _LoaderStatus.STOP.value: | while not is_end and self.status != _LoaderStatus.STOP.value: | ||||
| file_changed, is_end, event_dict = self._parser.list_events(filenames) | |||||
| try: | |||||
| file_changed, is_end, event_dict = self._parser.list_events(filenames) | |||||
| except UnknownError: | |||||
| break | |||||
| if file_changed: | if file_changed: | ||||
| logger.info('Summary file in %s update, reload the data in the summary.', | logger.info('Summary file in %s update, reload the data in the summary.', | ||||
| @@ -14,11 +14,10 @@ | |||||
| # ============================================================================ | # ============================================================================ | ||||
| """ExplainManager.""" | """ExplainManager.""" | ||||
| from collections import OrderedDict | |||||
| import os | import os | ||||
| import threading | import threading | ||||
| import time | import time | ||||
| from collections import OrderedDict | |||||
| from datetime import datetime | from datetime import datetime | ||||
| from typing import Optional | from typing import Optional | ||||
| @@ -29,7 +28,7 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler | |||||
| from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher | from mindinsight.datavisual.data_transform.summary_watcher import SummaryWatcher | ||||
| from mindinsight.explainer.common.log import logger | from mindinsight.explainer.common.log import logger | ||||
| from mindinsight.explainer.manager.explain_loader import ExplainLoader | from mindinsight.explainer.manager.explain_loader import ExplainLoader | ||||
| from mindinsight.utils.exceptions import MindInsightException, ParamValueError, UnknownError | |||||
| from mindinsight.utils.exceptions import ParamValueError, UnknownError | |||||
| _MAX_LOADERS_NUM = 3 | _MAX_LOADERS_NUM = 3 | ||||
| @@ -226,23 +225,17 @@ class ExplainManager: | |||||
| """Execute the data loading.""" | """Execute the data loading.""" | ||||
| # We will load the newest loader first. | # We will load the newest loader first. | ||||
| for loader_id in list(self._loader_pool.keys())[::-1]: | for loader_id in list(self._loader_pool.keys())[::-1]: | ||||
| try: | |||||
| with self._loader_pool_mutex: | |||||
| loader = self._loader_pool.get(loader_id, None) | |||||
| if loader is None: | |||||
| logger.debug('Loader %r has been deleted, will not load data.', loader_id) | |||||
| continue | |||||
| with self._loader_pool_mutex: | |||||
| loader = self._loader_pool.get(loader_id, None) | |||||
| if loader is None: | |||||
| logger.debug('Loader %r has been deleted, will not load data.', loader_id) | |||||
| continue | |||||
| if self.status == _ExplainManagerStatus.STOPPING.value: | |||||
| logger.info('Loader %s status is %s, will return.', loader_id, loader.status) | |||||
| return | |||||
| loader.load() | |||||
| if self.status == _ExplainManagerStatus.STOPPING.value: | |||||
| logger.info('Loader %s status is %s, will return.', loader_id, loader.status) | |||||
| return | |||||
| except MindInsightException as ex: | |||||
| logger.warning('Data loader %r load data failed. Delete data_loader. Detail: %s.', loader_id, ex) | |||||
| with self._loader_pool_mutex: | |||||
| self._delete_loader(loader_id) | |||||
| loader.load() | |||||
| def _delete_loader(self, loader_id): | def _delete_loader(self, loader_id): | ||||
| """Delete loader given loader_id.""" | """Delete loader given loader_id.""" | ||||
| @@ -19,14 +19,12 @@ This module is used to parse the MindExplain log file. | |||||
| """ | """ | ||||
| from collections import namedtuple | from collections import namedtuple | ||||
| from google.protobuf.message import DecodeError | |||||
| from mindinsight.datavisual.common import exceptions | from mindinsight.datavisual.common import exceptions | ||||
| from mindinsight.explainer.common.enums import ExplainFieldsEnum | |||||
| from mindinsight.explainer.common.log import logger | |||||
| from mindinsight.datavisual.data_access.file_handler import FileHandler | from mindinsight.datavisual.data_access.file_handler import FileHandler | ||||
| from mindinsight.datavisual.data_transform.ms_data_loader import _SummaryParser | from mindinsight.datavisual.data_transform.ms_data_loader import _SummaryParser | ||||
| from mindinsight.datavisual.proto_files import mindinsight_summary_pb2 as summary_pb2 | from mindinsight.datavisual.proto_files import mindinsight_summary_pb2 as summary_pb2 | ||||
| from mindinsight.explainer.common.enums import ExplainFieldsEnum | |||||
| from mindinsight.explainer.common.log import logger | |||||
| from mindinsight.utils.exceptions import UnknownError | from mindinsight.utils.exceptions import UnknownError | ||||
| HEADER_SIZE = 8 | HEADER_SIZE = 8 | ||||
| @@ -109,15 +107,20 @@ class ExplainParser(_SummaryParser): | |||||
| except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as ex: | except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as ex: | ||||
| self._summary_file_handler.reset_offset(start_offset) | self._summary_file_handler.reset_offset(start_offset) | ||||
| is_end = True | is_end = True | ||||
| logger.warning("Check crc failed and ignore this file, file_path=%s, offset=%s. Detail: %r.", | |||||
| logger.warning("Check crc failed and reset offset, file_path=%s, offset=%s. Detail: %r.", | |||||
| self._summary_file_handler.file_path, self._summary_file_handler.offset, str(ex)) | self._summary_file_handler.file_path, self._summary_file_handler.offset, str(ex)) | ||||
| return file_changed, is_end, event_data | return file_changed, is_end, event_data | ||||
| except (OSError, DecodeError, exceptions.MindInsightException) as ex: | |||||
| is_end = True | |||||
| logger.warning("Parse log file fail, and ignore this file, detail: %r," | |||||
| "file path: %s.", str(ex), self._summary_file_handler.file_path) | |||||
| return file_changed, is_end, event_data | |||||
| except Exception as ex: | except Exception as ex: | ||||
| # Note: If an unknown error occurs, we will set the offset to the end of this file, | |||||
| # which is equivalent to stopping parsing this file. We do not delete the current job | |||||
| # and retain the data that has been successfully parsed. | |||||
| self._summary_file_handler.reset_offset(new_size) | |||||
| # Notice: If the current job is the latest one in the loader pool and the job is deleted, | |||||
| # the job goes into an infinite cycle of load-fail-delete-reload-load-fail-delete. | |||||
| # We need to prevent this infinite loop. | |||||
| logger.error("Parse summary file failed, will set offset to the file end. file_path: %s, " | |||||
| "offset: %d, detail: %s.", file_path, self._summary_file_handler.offset, str(ex)) | |||||
| logger.exception(ex) | logger.exception(ex) | ||||
| raise UnknownError(str(ex)) | raise UnknownError(str(ex)) | ||||
| finally: | finally: | ||||