diff --git a/mindinsight/datavisual/common/exceptions.py b/mindinsight/datavisual/common/exceptions.py index c27c442a..6b6dab1e 100644 --- a/mindinsight/datavisual/common/exceptions.py +++ b/mindinsight/datavisual/common/exceptions.py @@ -57,17 +57,17 @@ class SummaryLogPathInvalid(MindInsightException): class CRCFailedError(MindInsightException): """CRC fail, record corrupted.""" - def __init__(self): - error_msg = 'CRC Failed.' + def __init__(self, error_detail): + error_msg = 'CRC Failed. Detail: %s' % error_detail super(CRCFailedError, self).__init__(DataVisualErrors.CRC_FAILED, error_msg, http_code=400) class CRCLengthFailedError(MindInsightException): - """CRC fail, record corrupted.""" - def __init__(self): - error_msg = 'CRC Length Failed.' + """CRC length fail, record corrupted.""" + def __init__(self, error_detail): + error_msg = 'CRC Length Failed. Detail: %s' % error_detail super(CRCLengthFailedError, self).__init__(DataVisualErrors.CRC_LENGTH_FAILED, error_msg, http_code=400) diff --git a/mindinsight/datavisual/data_transform/ms_data_loader.py b/mindinsight/datavisual/data_transform/ms_data_loader.py index 2a8cbc49..2a452273 100644 --- a/mindinsight/datavisual/data_transform/ms_data_loader.py +++ b/mindinsight/datavisual/data_transform/ms_data_loader.py @@ -335,7 +335,7 @@ class _SummaryParser(_Parser): else: self._latest_file_size = new_size # Wait for data in this file to be processed to avoid loading multiple files at the same time. - logger.info("Parse summary file offset %d, file path: %s.", self._latest_file_size, file_path) + logger.debug("Parse summary file offset %d, file path: %s.", self._latest_file_size, file_path) return False except UnknownError as ex: logger.warning("Parse summary file failed, detail: %r," @@ -371,7 +371,7 @@ class _SummaryParser(_Parser): while True: start_offset = file_handler.offset try: - event_str = self._event_load(file_handler) + event_str = self.event_load(file_handler) if event_str is None: file_handler.reset_offset(start_offset) return True @@ -399,27 +399,23 @@ class _SummaryParser(_Parser): future.add_done_callback(exception_no_raise_wrapper(_add_tensor_event_callback)) return False - except exceptions.CRCLengthFailedError: + except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as exc: file_handler.reset_offset(start_offset) - logger.warning( - "Check crc length failed, please check the summary file integrity, " - "the file may be in transfer, file_path: %s, offset=%s.", - file_handler.file_path, start_offset) - return True - except exceptions.CRCFailedError: - file_handler.reset_offset(start_offset) - logger.warning("Check crc faild and ignore this file, file_path=%s, " - "offset=%s.", file_handler.file_path, file_handler.offset) + file_size = file_handler.file_stat(file_handler.file_path).size + logger.error("Check crc failed and ignore this file, please check the integrity of the file, " + "file_path: %s, offset: %s, file size: %s. Detail: %s.", + file_handler.file_path, file_handler.offset, file_size, str(exc)) return True except (OSError, DecodeError, exceptions.MindInsightException) as ex: - logger.warning("Parse log file fail, and ignore this file, detail: %r," - "file path: %s.", str(ex), file_handler.file_path) + logger.error("Parse log file fail, and ignore this file, detail: %r, " + "file path: %s.", str(ex), file_handler.file_path) return True except Exception as ex: logger.exception(ex) raise UnknownError(str(ex)) - def _event_load(self, file_handler): + @staticmethod + def event_load(file_handler): """ Load binary string to event string. @@ -439,9 +435,9 @@ class _SummaryParser(_Parser): header_crc_str = '' if len(header_str) != HEADER_SIZE or len(header_crc_str) != CRC_STR_SIZE: - raise exceptions.CRCLengthFailedError + raise exceptions.CRCLengthFailedError("CRC header length or event header length is incorrect.") if not crc32.CheckValueAgainstData(header_crc_str, header_str, HEADER_SIZE): - raise exceptions.CRCFailedError() + raise exceptions.CRCFailedError("The header of event crc is failed.") # read the event body if integrity of header is verified header = struct.unpack('Q', header_str) @@ -455,9 +451,9 @@ class _SummaryParser(_Parser): event_crc_str = '' if len(event_str) != event_len or len(event_crc_str) != CRC_STR_SIZE: - raise exceptions.CRCLengthFailedError + raise exceptions.CRCLengthFailedError("The event sting length or crc length is incorrect.") if not crc32.CheckValueAgainstData(event_crc_str, event_str, event_len): - raise exceptions.CRCFailedError() + raise exceptions.CRCFailedError("The event string crc is incorrect.") return event_str diff --git a/mindinsight/datavisual/data_transform/summary_parser/event_parser.py b/mindinsight/datavisual/data_transform/summary_parser/event_parser.py index adaadef7..97a9aca3 100644 --- a/mindinsight/datavisual/data_transform/summary_parser/event_parser.py +++ b/mindinsight/datavisual/data_transform/summary_parser/event_parser.py @@ -12,18 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================ -""" -Scalar Writer. +"""Parse summary file and save it local file.""" -This module write scalar into a csv file. -""" import os import time -import struct from google.protobuf.message import DecodeError -from mindinsight.datavisual.utils import crc32 from mindinsight.datavisual.common import exceptions from mindinsight.datavisual.common.log import parse_summary_logger from mindinsight.datavisual.proto_files import lazy_read_pb2 @@ -31,6 +26,8 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.datavisual.data_transform.summary_parser.image_writer import ImageWriter from mindinsight.datavisual.data_transform.summary_parser.scalar_writer import ScalarWriter +from ..ms_data_loader import _SummaryParser + HEADER_SIZE = 8 CRC_STR_SIZE = 4 MAX_EVENT_STRING = 500000000 @@ -40,8 +37,8 @@ INFO_INTERVAL = 10 RETRY_TIMES = 2 -class EventParser(): - """Parse summary file and save it to csv file and image.""" +class EventParser: + """Parse summary file and save it to local file.""" def __init__(self, summary_file, output): self.summary_file = summary_file self._output = output @@ -90,7 +87,8 @@ class EventParser(): while True: start_offset = file_handler.offset try: - event_str = self._event_load(file_handler) + event_str = _SummaryParser.event_load(file_handler) + self._print_process(file_handler) crc_check_time = 0 if event_str is None: return True @@ -121,49 +119,8 @@ class EventParser(): file_handler.file_path) return False - def _event_load(self, file_handler): - """ - Load binary string to event string. - - Args: - file_handler (FileHandler): A file handler. - - Returns: - bytes, MindSpore event in bytes. - """ - # read the header - header_str = file_handler.read(HEADER_SIZE) - - if not header_str: - return None - - header_crc_str = file_handler.read(CRC_STR_SIZE) - if not header_crc_str: - header_crc_str = '' - - if len(header_str) != HEADER_SIZE or len(header_crc_str) != CRC_STR_SIZE: - raise exceptions.CRCLengthFailedError - - if not crc32.CheckValueAgainstData(header_crc_str, header_str, HEADER_SIZE): - raise exceptions.CRCFailedError() - - # read the event body if integrity of header is verified - header = struct.unpack('Q', header_str) - event_len = int(header[0]) - - event_str = file_handler.read(event_len) - if not event_str: - event_str = '' - event_crc_str = file_handler.read(CRC_STR_SIZE) - if not event_crc_str: - event_crc_str = '' - - if len(event_str) != event_len or len(event_crc_str) != CRC_STR_SIZE: - raise exceptions.CRCLengthFailedError - - if not crc32.CheckValueAgainstData(event_crc_str, event_str, event_len): - raise exceptions.CRCFailedError() - + def _print_process(self, file_handler): + """Prints the current parsing progress based on the progress of the read file.""" current_offset = file_handler.offset if current_offset >= self._process_info: parse_summary_logger.info("Current parsing process: %d/%d, %d%%.", current_offset, self._file_size, @@ -171,7 +128,6 @@ class EventParser(): self._process_info += self._file_size // INFO_INTERVAL if self._process_info > os.path.getsize(self.summary_file): self._process_info = os.path.getsize(self.summary_file) - return event_str def _event_parse(self, event_str): """ diff --git a/mindinsight/scripts/parse_summary.py b/mindinsight/scripts/parse_summary.py index 20126a28..e885c735 100644 --- a/mindinsight/scripts/parse_summary.py +++ b/mindinsight/scripts/parse_summary.py @@ -122,7 +122,6 @@ class Command(BaseCommand): date_time = datetime.datetime.now().strftime('output_%Y%m%d_%H%M%S_%f') output_path = os.path.join(args.output, date_time) - summary_dir = args.summary_dir if not self._check_dirpath(summary_dir): return @@ -139,8 +138,8 @@ class Command(BaseCommand): summary_file = FileHandler.join(summary_dir, filename) - if not (self._check_filepath(summary_file) and self._check_create_filepath( - output_path) and self._check_create_filepath(FileHandler.join(output_path, 'image'))): + if not (self._check_filepath(summary_file) and self._check_create_filepath(output_path) + and self._check_create_filepath(FileHandler.join(output_path, 'image'))): return eventparser = EventParser(summary_file, output_path) diff --git a/tests/ut/datavisual/data_transform/test_ms_data_loader.py b/tests/ut/datavisual/data_transform/test_ms_data_loader.py index 33da0cfd..777f8f76 100644 --- a/tests/ut/datavisual/data_transform/test_ms_data_loader.py +++ b/tests/ut/datavisual/data_transform/test_ms_data_loader.py @@ -75,7 +75,7 @@ class TestMsDataLoader: ms_loader._check_files_deleted(new_file_list, old_file_list) shutil.rmtree(summary_dir) assert MockLogger.log_msg['info'] == "There are some files has been deleted, " \ - "we will reload all files in path {}.".format(summary_dir) + "we will reload all files in path {}.".format(summary_dir) @pytest.mark.usefixtures('crc_pass') def test_load_success_with_crc_pass(self): @@ -100,7 +100,7 @@ class TestMsDataLoader: ms_loader = MSDataLoader(summary_dir) ms_loader.load() shutil.rmtree(summary_dir) - assert 'Check crc faild and ignore this file' in str(MockLogger.log_msg['warning']) + assert 'Check crc failed' in str(MockLogger.log_msg['error']) def test_filter_event_files(self): """Test filter_event_files function ok."""