From: @ouwenchang Reviewed-by: @scukzz,@wenkai_dist Signed-off-by:tags/v1.1.0
| @@ -57,17 +57,17 @@ class SummaryLogPathInvalid(MindInsightException): | |||||
| class CRCFailedError(MindInsightException): | class CRCFailedError(MindInsightException): | ||||
| """CRC fail, record corrupted.""" | """CRC fail, record corrupted.""" | ||||
| def __init__(self): | |||||
| error_msg = 'CRC Failed.' | |||||
| def __init__(self, error_detail): | |||||
| error_msg = 'CRC Failed. Detail: %s' % error_detail | |||||
| super(CRCFailedError, self).__init__(DataVisualErrors.CRC_FAILED, | super(CRCFailedError, self).__init__(DataVisualErrors.CRC_FAILED, | ||||
| error_msg, | error_msg, | ||||
| http_code=400) | http_code=400) | ||||
| class CRCLengthFailedError(MindInsightException): | class CRCLengthFailedError(MindInsightException): | ||||
| """CRC fail, record corrupted.""" | |||||
| def __init__(self): | |||||
| error_msg = 'CRC Length Failed.' | |||||
| """CRC length fail, record corrupted.""" | |||||
| def __init__(self, error_detail): | |||||
| error_msg = 'CRC Length Failed. Detail: %s' % error_detail | |||||
| super(CRCLengthFailedError, self).__init__(DataVisualErrors.CRC_LENGTH_FAILED, | super(CRCLengthFailedError, self).__init__(DataVisualErrors.CRC_LENGTH_FAILED, | ||||
| error_msg, | error_msg, | ||||
| http_code=400) | http_code=400) | ||||
| @@ -335,7 +335,7 @@ class _SummaryParser(_Parser): | |||||
| else: | else: | ||||
| self._latest_file_size = new_size | self._latest_file_size = new_size | ||||
| # Wait for data in this file to be processed to avoid loading multiple files at the same time. | # Wait for data in this file to be processed to avoid loading multiple files at the same time. | ||||
| logger.info("Parse summary file offset %d, file path: %s.", self._latest_file_size, file_path) | |||||
| logger.debug("Parse summary file offset %d, file path: %s.", self._latest_file_size, file_path) | |||||
| return False | return False | ||||
| except UnknownError as ex: | except UnknownError as ex: | ||||
| logger.warning("Parse summary file failed, detail: %r," | logger.warning("Parse summary file failed, detail: %r," | ||||
| @@ -371,7 +371,7 @@ class _SummaryParser(_Parser): | |||||
| while True: | while True: | ||||
| start_offset = file_handler.offset | start_offset = file_handler.offset | ||||
| try: | try: | ||||
| event_str = self._event_load(file_handler) | |||||
| event_str = self.event_load(file_handler) | |||||
| if event_str is None: | if event_str is None: | ||||
| file_handler.reset_offset(start_offset) | file_handler.reset_offset(start_offset) | ||||
| return True | return True | ||||
| @@ -399,27 +399,23 @@ class _SummaryParser(_Parser): | |||||
| future.add_done_callback(exception_no_raise_wrapper(_add_tensor_event_callback)) | future.add_done_callback(exception_no_raise_wrapper(_add_tensor_event_callback)) | ||||
| return False | return False | ||||
| except exceptions.CRCLengthFailedError: | |||||
| except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as exc: | |||||
| file_handler.reset_offset(start_offset) | file_handler.reset_offset(start_offset) | ||||
| logger.warning( | |||||
| "Check crc length failed, please check the summary file integrity, " | |||||
| "the file may be in transfer, file_path: %s, offset=%s.", | |||||
| file_handler.file_path, start_offset) | |||||
| return True | |||||
| except exceptions.CRCFailedError: | |||||
| file_handler.reset_offset(start_offset) | |||||
| logger.warning("Check crc faild and ignore this file, file_path=%s, " | |||||
| "offset=%s.", file_handler.file_path, file_handler.offset) | |||||
| file_size = file_handler.file_stat(file_handler.file_path).size | |||||
| logger.error("Check crc failed and ignore this file, please check the integrity of the file, " | |||||
| "file_path: %s, offset: %s, file size: %s. Detail: %s.", | |||||
| file_handler.file_path, file_handler.offset, file_size, str(exc)) | |||||
| return True | return True | ||||
| except (OSError, DecodeError, exceptions.MindInsightException) as ex: | except (OSError, DecodeError, exceptions.MindInsightException) as ex: | ||||
| logger.warning("Parse log file fail, and ignore this file, detail: %r," | |||||
| "file path: %s.", str(ex), file_handler.file_path) | |||||
| logger.error("Parse log file fail, and ignore this file, detail: %r, " | |||||
| "file path: %s.", str(ex), file_handler.file_path) | |||||
| return True | return True | ||||
| except Exception as ex: | except Exception as ex: | ||||
| logger.exception(ex) | logger.exception(ex) | ||||
| raise UnknownError(str(ex)) | raise UnknownError(str(ex)) | ||||
| def _event_load(self, file_handler): | |||||
| @staticmethod | |||||
| def event_load(file_handler): | |||||
| """ | """ | ||||
| Load binary string to event string. | Load binary string to event string. | ||||
| @@ -439,9 +435,9 @@ class _SummaryParser(_Parser): | |||||
| header_crc_str = '' | header_crc_str = '' | ||||
| if len(header_str) != HEADER_SIZE or len(header_crc_str) != CRC_STR_SIZE: | if len(header_str) != HEADER_SIZE or len(header_crc_str) != CRC_STR_SIZE: | ||||
| raise exceptions.CRCLengthFailedError | |||||
| raise exceptions.CRCLengthFailedError("CRC header length or event header length is incorrect.") | |||||
| if not crc32.CheckValueAgainstData(header_crc_str, header_str, HEADER_SIZE): | if not crc32.CheckValueAgainstData(header_crc_str, header_str, HEADER_SIZE): | ||||
| raise exceptions.CRCFailedError() | |||||
| raise exceptions.CRCFailedError("The header of event crc is failed.") | |||||
| # read the event body if integrity of header is verified | # read the event body if integrity of header is verified | ||||
| header = struct.unpack('Q', header_str) | header = struct.unpack('Q', header_str) | ||||
| @@ -455,9 +451,9 @@ class _SummaryParser(_Parser): | |||||
| event_crc_str = '' | event_crc_str = '' | ||||
| if len(event_str) != event_len or len(event_crc_str) != CRC_STR_SIZE: | if len(event_str) != event_len or len(event_crc_str) != CRC_STR_SIZE: | ||||
| raise exceptions.CRCLengthFailedError | |||||
| raise exceptions.CRCLengthFailedError("The event sting length or crc length is incorrect.") | |||||
| if not crc32.CheckValueAgainstData(event_crc_str, event_str, event_len): | if not crc32.CheckValueAgainstData(event_crc_str, event_str, event_len): | ||||
| raise exceptions.CRCFailedError() | |||||
| raise exceptions.CRCFailedError("The event string crc is incorrect.") | |||||
| return event_str | return event_str | ||||
| @@ -12,18 +12,13 @@ | |||||
| # See the License for the specific language governing permissions and | # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | # limitations under the License. | ||||
| # ============================================================================ | # ============================================================================ | ||||
| """ | |||||
| Scalar Writer. | |||||
| """Parse summary file and save it local file.""" | |||||
| This module write scalar into a csv file. | |||||
| """ | |||||
| import os | import os | ||||
| import time | import time | ||||
| import struct | |||||
| from google.protobuf.message import DecodeError | from google.protobuf.message import DecodeError | ||||
| from mindinsight.datavisual.utils import crc32 | |||||
| from mindinsight.datavisual.common import exceptions | from mindinsight.datavisual.common import exceptions | ||||
| from mindinsight.datavisual.common.log import parse_summary_logger | from mindinsight.datavisual.common.log import parse_summary_logger | ||||
| from mindinsight.datavisual.proto_files import lazy_read_pb2 | from mindinsight.datavisual.proto_files import lazy_read_pb2 | ||||
| @@ -31,6 +26,8 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler | |||||
| from mindinsight.datavisual.data_transform.summary_parser.image_writer import ImageWriter | from mindinsight.datavisual.data_transform.summary_parser.image_writer import ImageWriter | ||||
| from mindinsight.datavisual.data_transform.summary_parser.scalar_writer import ScalarWriter | from mindinsight.datavisual.data_transform.summary_parser.scalar_writer import ScalarWriter | ||||
| from ..ms_data_loader import _SummaryParser | |||||
| HEADER_SIZE = 8 | HEADER_SIZE = 8 | ||||
| CRC_STR_SIZE = 4 | CRC_STR_SIZE = 4 | ||||
| MAX_EVENT_STRING = 500000000 | MAX_EVENT_STRING = 500000000 | ||||
| @@ -40,8 +37,8 @@ INFO_INTERVAL = 10 | |||||
| RETRY_TIMES = 2 | RETRY_TIMES = 2 | ||||
| class EventParser(): | |||||
| """Parse summary file and save it to csv file and image.""" | |||||
| class EventParser: | |||||
| """Parse summary file and save it to local file.""" | |||||
| def __init__(self, summary_file, output): | def __init__(self, summary_file, output): | ||||
| self.summary_file = summary_file | self.summary_file = summary_file | ||||
| self._output = output | self._output = output | ||||
| @@ -90,7 +87,8 @@ class EventParser(): | |||||
| while True: | while True: | ||||
| start_offset = file_handler.offset | start_offset = file_handler.offset | ||||
| try: | try: | ||||
| event_str = self._event_load(file_handler) | |||||
| event_str = _SummaryParser.event_load(file_handler) | |||||
| self._print_process(file_handler) | |||||
| crc_check_time = 0 | crc_check_time = 0 | ||||
| if event_str is None: | if event_str is None: | ||||
| return True | return True | ||||
| @@ -121,49 +119,8 @@ class EventParser(): | |||||
| file_handler.file_path) | file_handler.file_path) | ||||
| return False | return False | ||||
| def _event_load(self, file_handler): | |||||
| """ | |||||
| Load binary string to event string. | |||||
| Args: | |||||
| file_handler (FileHandler): A file handler. | |||||
| Returns: | |||||
| bytes, MindSpore event in bytes. | |||||
| """ | |||||
| # read the header | |||||
| header_str = file_handler.read(HEADER_SIZE) | |||||
| if not header_str: | |||||
| return None | |||||
| header_crc_str = file_handler.read(CRC_STR_SIZE) | |||||
| if not header_crc_str: | |||||
| header_crc_str = '' | |||||
| if len(header_str) != HEADER_SIZE or len(header_crc_str) != CRC_STR_SIZE: | |||||
| raise exceptions.CRCLengthFailedError | |||||
| if not crc32.CheckValueAgainstData(header_crc_str, header_str, HEADER_SIZE): | |||||
| raise exceptions.CRCFailedError() | |||||
| # read the event body if integrity of header is verified | |||||
| header = struct.unpack('Q', header_str) | |||||
| event_len = int(header[0]) | |||||
| event_str = file_handler.read(event_len) | |||||
| if not event_str: | |||||
| event_str = '' | |||||
| event_crc_str = file_handler.read(CRC_STR_SIZE) | |||||
| if not event_crc_str: | |||||
| event_crc_str = '' | |||||
| if len(event_str) != event_len or len(event_crc_str) != CRC_STR_SIZE: | |||||
| raise exceptions.CRCLengthFailedError | |||||
| if not crc32.CheckValueAgainstData(event_crc_str, event_str, event_len): | |||||
| raise exceptions.CRCFailedError() | |||||
| def _print_process(self, file_handler): | |||||
| """Prints the current parsing progress based on the progress of the read file.""" | |||||
| current_offset = file_handler.offset | current_offset = file_handler.offset | ||||
| if current_offset >= self._process_info: | if current_offset >= self._process_info: | ||||
| parse_summary_logger.info("Current parsing process: %d/%d, %d%%.", current_offset, self._file_size, | parse_summary_logger.info("Current parsing process: %d/%d, %d%%.", current_offset, self._file_size, | ||||
| @@ -171,7 +128,6 @@ class EventParser(): | |||||
| self._process_info += self._file_size // INFO_INTERVAL | self._process_info += self._file_size // INFO_INTERVAL | ||||
| if self._process_info > os.path.getsize(self.summary_file): | if self._process_info > os.path.getsize(self.summary_file): | ||||
| self._process_info = os.path.getsize(self.summary_file) | self._process_info = os.path.getsize(self.summary_file) | ||||
| return event_str | |||||
| def _event_parse(self, event_str): | def _event_parse(self, event_str): | ||||
| """ | """ | ||||
| @@ -122,7 +122,6 @@ class Command(BaseCommand): | |||||
| date_time = datetime.datetime.now().strftime('output_%Y%m%d_%H%M%S_%f') | date_time = datetime.datetime.now().strftime('output_%Y%m%d_%H%M%S_%f') | ||||
| output_path = os.path.join(args.output, date_time) | output_path = os.path.join(args.output, date_time) | ||||
| summary_dir = args.summary_dir | summary_dir = args.summary_dir | ||||
| if not self._check_dirpath(summary_dir): | if not self._check_dirpath(summary_dir): | ||||
| return | return | ||||
| @@ -139,8 +138,8 @@ class Command(BaseCommand): | |||||
| summary_file = FileHandler.join(summary_dir, filename) | summary_file = FileHandler.join(summary_dir, filename) | ||||
| if not (self._check_filepath(summary_file) and self._check_create_filepath( | |||||
| output_path) and self._check_create_filepath(FileHandler.join(output_path, 'image'))): | |||||
| if not (self._check_filepath(summary_file) and self._check_create_filepath(output_path) | |||||
| and self._check_create_filepath(FileHandler.join(output_path, 'image'))): | |||||
| return | return | ||||
| eventparser = EventParser(summary_file, output_path) | eventparser = EventParser(summary_file, output_path) | ||||
| @@ -75,7 +75,7 @@ class TestMsDataLoader: | |||||
| ms_loader._check_files_deleted(new_file_list, old_file_list) | ms_loader._check_files_deleted(new_file_list, old_file_list) | ||||
| shutil.rmtree(summary_dir) | shutil.rmtree(summary_dir) | ||||
| assert MockLogger.log_msg['info'] == "There are some files has been deleted, " \ | assert MockLogger.log_msg['info'] == "There are some files has been deleted, " \ | ||||
| "we will reload all files in path {}.".format(summary_dir) | |||||
| "we will reload all files in path {}.".format(summary_dir) | |||||
| @pytest.mark.usefixtures('crc_pass') | @pytest.mark.usefixtures('crc_pass') | ||||
| def test_load_success_with_crc_pass(self): | def test_load_success_with_crc_pass(self): | ||||
| @@ -100,7 +100,7 @@ class TestMsDataLoader: | |||||
| ms_loader = MSDataLoader(summary_dir) | ms_loader = MSDataLoader(summary_dir) | ||||
| ms_loader.load() | ms_loader.load() | ||||
| shutil.rmtree(summary_dir) | shutil.rmtree(summary_dir) | ||||
| assert 'Check crc faild and ignore this file' in str(MockLogger.log_msg['warning']) | |||||
| assert 'Check crc failed' in str(MockLogger.log_msg['error']) | |||||
| def test_filter_event_files(self): | def test_filter_event_files(self): | ||||
| """Test filter_event_files function ok.""" | """Test filter_event_files function ok.""" | ||||