When CRC error occurs, it is generally caused by incomplete data in the file, so error information should be given to prompt the user.tags/v1.1.0
| @@ -57,17 +57,17 @@ class SummaryLogPathInvalid(MindInsightException): | |||
| class CRCFailedError(MindInsightException): | |||
| """CRC fail, record corrupted.""" | |||
| def __init__(self): | |||
| error_msg = 'CRC Failed.' | |||
| def __init__(self, error_detail): | |||
| error_msg = 'CRC Failed. Detail: %s' % error_detail | |||
| super(CRCFailedError, self).__init__(DataVisualErrors.CRC_FAILED, | |||
| error_msg, | |||
| http_code=400) | |||
| class CRCLengthFailedError(MindInsightException): | |||
| """CRC fail, record corrupted.""" | |||
| def __init__(self): | |||
| error_msg = 'CRC Length Failed.' | |||
| """CRC length fail, record corrupted.""" | |||
| def __init__(self, error_detail): | |||
| error_msg = 'CRC Length Failed. Detail: %s' % error_detail | |||
| super(CRCLengthFailedError, self).__init__(DataVisualErrors.CRC_LENGTH_FAILED, | |||
| error_msg, | |||
| http_code=400) | |||
| @@ -335,7 +335,7 @@ class _SummaryParser(_Parser): | |||
| else: | |||
| self._latest_file_size = new_size | |||
| # Wait for data in this file to be processed to avoid loading multiple files at the same time. | |||
| logger.info("Parse summary file offset %d, file path: %s.", self._latest_file_size, file_path) | |||
| logger.debug("Parse summary file offset %d, file path: %s.", self._latest_file_size, file_path) | |||
| return False | |||
| except UnknownError as ex: | |||
| logger.warning("Parse summary file failed, detail: %r," | |||
| @@ -371,7 +371,7 @@ class _SummaryParser(_Parser): | |||
| while True: | |||
| start_offset = file_handler.offset | |||
| try: | |||
| event_str = self._event_load(file_handler) | |||
| event_str = self.event_load(file_handler) | |||
| if event_str is None: | |||
| file_handler.reset_offset(start_offset) | |||
| return True | |||
| @@ -399,27 +399,23 @@ class _SummaryParser(_Parser): | |||
| future.add_done_callback(exception_no_raise_wrapper(_add_tensor_event_callback)) | |||
| return False | |||
| except exceptions.CRCLengthFailedError: | |||
| except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as exc: | |||
| file_handler.reset_offset(start_offset) | |||
| logger.warning( | |||
| "Check crc length failed, please check the summary file integrity, " | |||
| "the file may be in transfer, file_path: %s, offset=%s.", | |||
| file_handler.file_path, start_offset) | |||
| return True | |||
| except exceptions.CRCFailedError: | |||
| file_handler.reset_offset(start_offset) | |||
| logger.warning("Check crc faild and ignore this file, file_path=%s, " | |||
| "offset=%s.", file_handler.file_path, file_handler.offset) | |||
| file_size = file_handler.file_stat(file_handler.file_path).size | |||
| logger.error("Check crc failed and ignore this file, please check the integrity of the file, " | |||
| "file_path: %s, offset: %s, file size: %s. Detail: %s.", | |||
| file_handler.file_path, file_handler.offset, file_size, str(exc)) | |||
| return True | |||
| except (OSError, DecodeError, exceptions.MindInsightException) as ex: | |||
| logger.warning("Parse log file fail, and ignore this file, detail: %r," | |||
| "file path: %s.", str(ex), file_handler.file_path) | |||
| logger.error("Parse log file fail, and ignore this file, detail: %r, " | |||
| "file path: %s.", str(ex), file_handler.file_path) | |||
| return True | |||
| except Exception as ex: | |||
| logger.exception(ex) | |||
| raise UnknownError(str(ex)) | |||
| def _event_load(self, file_handler): | |||
| @staticmethod | |||
| def event_load(file_handler): | |||
| """ | |||
| Load binary string to event string. | |||
| @@ -439,9 +435,9 @@ class _SummaryParser(_Parser): | |||
| header_crc_str = '' | |||
| if len(header_str) != HEADER_SIZE or len(header_crc_str) != CRC_STR_SIZE: | |||
| raise exceptions.CRCLengthFailedError | |||
| raise exceptions.CRCLengthFailedError("CRC header length or event header length is incorrect.") | |||
| if not crc32.CheckValueAgainstData(header_crc_str, header_str, HEADER_SIZE): | |||
| raise exceptions.CRCFailedError() | |||
| raise exceptions.CRCFailedError("The header of event crc is failed.") | |||
| # read the event body if integrity of header is verified | |||
| header = struct.unpack('Q', header_str) | |||
| @@ -455,9 +451,9 @@ class _SummaryParser(_Parser): | |||
| event_crc_str = '' | |||
| if len(event_str) != event_len or len(event_crc_str) != CRC_STR_SIZE: | |||
| raise exceptions.CRCLengthFailedError | |||
| raise exceptions.CRCLengthFailedError("The event sting length or crc length is incorrect.") | |||
| if not crc32.CheckValueAgainstData(event_crc_str, event_str, event_len): | |||
| raise exceptions.CRCFailedError() | |||
| raise exceptions.CRCFailedError("The event string crc is incorrect.") | |||
| return event_str | |||
| @@ -12,18 +12,13 @@ | |||
| # See the License for the specific language governing permissions and | |||
| # limitations under the License. | |||
| # ============================================================================ | |||
| """ | |||
| Scalar Writer. | |||
| """Parse summary file and save it local file.""" | |||
| This module write scalar into a csv file. | |||
| """ | |||
| import os | |||
| import time | |||
| import struct | |||
| from google.protobuf.message import DecodeError | |||
| from mindinsight.datavisual.utils import crc32 | |||
| from mindinsight.datavisual.common import exceptions | |||
| from mindinsight.datavisual.common.log import parse_summary_logger | |||
| from mindinsight.datavisual.proto_files import lazy_read_pb2 | |||
| @@ -31,6 +26,8 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler | |||
| from mindinsight.datavisual.data_transform.summary_parser.image_writer import ImageWriter | |||
| from mindinsight.datavisual.data_transform.summary_parser.scalar_writer import ScalarWriter | |||
| from ..ms_data_loader import _SummaryParser | |||
| HEADER_SIZE = 8 | |||
| CRC_STR_SIZE = 4 | |||
| MAX_EVENT_STRING = 500000000 | |||
| @@ -40,8 +37,8 @@ INFO_INTERVAL = 10 | |||
| RETRY_TIMES = 2 | |||
| class EventParser(): | |||
| """Parse summary file and save it to csv file and image.""" | |||
| class EventParser: | |||
| """Parse summary file and save it to local file.""" | |||
| def __init__(self, summary_file, output): | |||
| self.summary_file = summary_file | |||
| self._output = output | |||
| @@ -90,7 +87,8 @@ class EventParser(): | |||
| while True: | |||
| start_offset = file_handler.offset | |||
| try: | |||
| event_str = self._event_load(file_handler) | |||
| event_str = _SummaryParser.event_load(file_handler) | |||
| self._print_process(file_handler) | |||
| crc_check_time = 0 | |||
| if event_str is None: | |||
| return True | |||
| @@ -121,49 +119,8 @@ class EventParser(): | |||
| file_handler.file_path) | |||
| return False | |||
| def _event_load(self, file_handler): | |||
| """ | |||
| Load binary string to event string. | |||
| Args: | |||
| file_handler (FileHandler): A file handler. | |||
| Returns: | |||
| bytes, MindSpore event in bytes. | |||
| """ | |||
| # read the header | |||
| header_str = file_handler.read(HEADER_SIZE) | |||
| if not header_str: | |||
| return None | |||
| header_crc_str = file_handler.read(CRC_STR_SIZE) | |||
| if not header_crc_str: | |||
| header_crc_str = '' | |||
| if len(header_str) != HEADER_SIZE or len(header_crc_str) != CRC_STR_SIZE: | |||
| raise exceptions.CRCLengthFailedError | |||
| if not crc32.CheckValueAgainstData(header_crc_str, header_str, HEADER_SIZE): | |||
| raise exceptions.CRCFailedError() | |||
| # read the event body if integrity of header is verified | |||
| header = struct.unpack('Q', header_str) | |||
| event_len = int(header[0]) | |||
| event_str = file_handler.read(event_len) | |||
| if not event_str: | |||
| event_str = '' | |||
| event_crc_str = file_handler.read(CRC_STR_SIZE) | |||
| if not event_crc_str: | |||
| event_crc_str = '' | |||
| if len(event_str) != event_len or len(event_crc_str) != CRC_STR_SIZE: | |||
| raise exceptions.CRCLengthFailedError | |||
| if not crc32.CheckValueAgainstData(event_crc_str, event_str, event_len): | |||
| raise exceptions.CRCFailedError() | |||
| def _print_process(self, file_handler): | |||
| """Prints the current parsing progress based on the progress of the read file.""" | |||
| current_offset = file_handler.offset | |||
| if current_offset >= self._process_info: | |||
| parse_summary_logger.info("Current parsing process: %d/%d, %d%%.", current_offset, self._file_size, | |||
| @@ -171,7 +128,6 @@ class EventParser(): | |||
| self._process_info += self._file_size // INFO_INTERVAL | |||
| if self._process_info > os.path.getsize(self.summary_file): | |||
| self._process_info = os.path.getsize(self.summary_file) | |||
| return event_str | |||
| def _event_parse(self, event_str): | |||
| """ | |||
| @@ -122,7 +122,6 @@ class Command(BaseCommand): | |||
| date_time = datetime.datetime.now().strftime('output_%Y%m%d_%H%M%S_%f') | |||
| output_path = os.path.join(args.output, date_time) | |||
| summary_dir = args.summary_dir | |||
| if not self._check_dirpath(summary_dir): | |||
| return | |||
| @@ -139,8 +138,8 @@ class Command(BaseCommand): | |||
| summary_file = FileHandler.join(summary_dir, filename) | |||
| if not (self._check_filepath(summary_file) and self._check_create_filepath( | |||
| output_path) and self._check_create_filepath(FileHandler.join(output_path, 'image'))): | |||
| if not (self._check_filepath(summary_file) and self._check_create_filepath(output_path) | |||
| and self._check_create_filepath(FileHandler.join(output_path, 'image'))): | |||
| return | |||
| eventparser = EventParser(summary_file, output_path) | |||
| @@ -75,7 +75,7 @@ class TestMsDataLoader: | |||
| ms_loader._check_files_deleted(new_file_list, old_file_list) | |||
| shutil.rmtree(summary_dir) | |||
| assert MockLogger.log_msg['info'] == "There are some files has been deleted, " \ | |||
| "we will reload all files in path {}.".format(summary_dir) | |||
| "we will reload all files in path {}.".format(summary_dir) | |||
| @pytest.mark.usefixtures('crc_pass') | |||
| def test_load_success_with_crc_pass(self): | |||
| @@ -100,7 +100,7 @@ class TestMsDataLoader: | |||
| ms_loader = MSDataLoader(summary_dir) | |||
| ms_loader.load() | |||
| shutil.rmtree(summary_dir) | |||
| assert 'Check crc faild and ignore this file' in str(MockLogger.log_msg['warning']) | |||
| assert 'Check crc failed' in str(MockLogger.log_msg['error']) | |||
| def test_filter_event_files(self): | |||
| """Test filter_event_files function ok.""" | |||