From 7877f33b70e566f07b63297235be42468118b4b7 Mon Sep 17 00:00:00 2001
From: wangshuide2020 <7511764+wangshuide2020@user.noreply.gitee.com>
Date: Thu, 16 Jul 2020 21:21:24 +0800
Subject: [PATCH] Use multiple processes to calc events.

1. To accelerate summary file parsing, multiple processes are used.
2. This PR improves summary parsing throughput dramatically (by roughly cpu_count times).
3. As the first step of MindInsight parsing performance optimization, changes are confined to the _load_single_file function.

In the future, MindInsight will need a more general concurrent computing framework. See the gitee wiki doc for details.
---
 mindinsight/backend/run.py                    |  11 +-
 mindinsight/conf/constants.py                 |   2 +
 .../datavisual/data_transform/data_loader.py  |  11 +-
 .../datavisual/data_transform/data_manager.py |   8 +-
 .../datavisual/data_transform/events_data.py  |   1 +
 .../data_transform/ms_data_loader.py          | 149 ++++++++++++------
 .../data_transform/tensor_container.py        |   5 +-
 .../datavisual/processors/tensor_processor.py |   6 +-
 mindinsight/scripts/stop.py                   |  36 +++--
 9 files changed, 148 insertions(+), 81 deletions(-)

diff --git a/mindinsight/backend/run.py b/mindinsight/backend/run.py
index 23831b72..998f0bbd 100644
--- a/mindinsight/backend/run.py
+++ b/mindinsight/backend/run.py
@@ -236,9 +236,10 @@ def start():
     process = subprocess.Popen(
         shlex.split(cmd),
         shell=False,
-        stdin=subprocess.PIPE,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE
+        # Change stdout to DEVNULL to prevent broken pipe errors when creating new processes.
+        stdin=subprocess.DEVNULL,
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.STDOUT
     )
 
     # sleep 1 second for gunicorn appplication to load modules
@@ -246,9 +247,7 @@ def start():
 
     # check if gunicorn application is running
     if process.poll() is not None:
-        _, stderr = process.communicate()
-        for line in stderr.decode().split('\n'):
-            console.error(line)
+        console.error("Start MindInsight failed. See log for details.")
     else:
         state_result = _check_server_start_stat(errorlog_abspath, log_size)
         # print gunicorn start state to stdout
diff --git a/mindinsight/conf/constants.py b/mindinsight/conf/constants.py
index cdd8aa1a..9605c809 100644
--- a/mindinsight/conf/constants.py
+++ b/mindinsight/conf/constants.py
@@ -14,6 +14,7 @@
 # ============================================================================
 """Constants module for mindinsight settings."""
 import logging
+import os
 
 ####################################
 # Global default settings.
@@ -48,6 +49,7 @@ API_PREFIX = '/v1/mindinsight'
 # Datavisual default settings.
 ####################################
 MAX_THREADS_COUNT = 15
+MAX_PROCESSES_COUNT = max(os.cpu_count() or 0, 15)
 
 MAX_TAG_SIZE_PER_EVENTS_DATA = 300
 DEFAULT_STEP_SIZES_PER_TAG = 500
diff --git a/mindinsight/datavisual/data_transform/data_loader.py b/mindinsight/datavisual/data_transform/data_loader.py
index 84dfcd51..e61f8633 100644
--- a/mindinsight/datavisual/data_transform/data_loader.py
+++ b/mindinsight/datavisual/data_transform/data_loader.py
@@ -34,8 +34,13 @@ class DataLoader:
         self._summary_dir = summary_dir
         self._loader = None
 
-    def load(self):
-        """Load the data when loader is exist."""
+    def load(self, workers_count=1):
+        """Load the data when the loader exists.
+
+        Args:
+            workers_count (int): The count of workers. Default value is 1.
+ """ + if self._loader is None: ms_dataloader = MSDataLoader(self._summary_dir) loaders = [ms_dataloader] @@ -48,7 +53,7 @@ class DataLoader: logger.warning("No valid files can be loaded, summary_dir: %s.", self._summary_dir) raise exceptions.SummaryLogPathInvalid() - self._loader.load() + self._loader.load(workers_count) def get_events_data(self): """ diff --git a/mindinsight/datavisual/data_transform/data_manager.py b/mindinsight/datavisual/data_transform/data_manager.py index 1f138f90..36ccb809 100644 --- a/mindinsight/datavisual/data_transform/data_manager.py +++ b/mindinsight/datavisual/data_transform/data_manager.py @@ -510,7 +510,7 @@ class _DetailCacheManager(_BaseCacheManager): logger.debug("delete loader %s", loader_id) self._loader_pool.pop(loader_id) - def _execute_loader(self, loader_id): + def _execute_loader(self, loader_id, workers_count): """ Load data form data_loader. @@ -518,7 +518,7 @@ class _DetailCacheManager(_BaseCacheManager): Args: loader_id (str): An ID for `Loader`. - + workers_count (int): The count of workers. """ try: with self._loader_pool_mutex: @@ -527,7 +527,7 @@ class _DetailCacheManager(_BaseCacheManager): logger.debug("Loader %r has been deleted, will not load data.", loader_id) return - loader.data_loader.load() + loader.data_loader.load(workers_count) # Update loader cache status to CACHED. # Loader with cache status CACHED should remain the same cache status. @@ -584,7 +584,7 @@ class _DetailCacheManager(_BaseCacheManager): futures = [] loader_pool = self._get_snapshot_loader_pool() for loader_id in loader_pool: - future = executor.submit(self._execute_loader, loader_id) + future = executor.submit(self._execute_loader, loader_id, threads_count) futures.append(future) wait(futures, return_when=ALL_COMPLETED) diff --git a/mindinsight/datavisual/data_transform/events_data.py b/mindinsight/datavisual/data_transform/events_data.py index d297189a..07d054c3 100644 --- a/mindinsight/datavisual/data_transform/events_data.py +++ b/mindinsight/datavisual/data_transform/events_data.py @@ -85,6 +85,7 @@ class EventsData: deleted_tag = self._check_tag_out_of_spec(plugin_name) if deleted_tag is not None: if tag in self._deleted_tags: + logger.debug("Tag is in deleted tags: %s.", tag) return self.delete_tensor_event(deleted_tag) diff --git a/mindinsight/datavisual/data_transform/ms_data_loader.py b/mindinsight/datavisual/data_transform/ms_data_loader.py index f4e062fd..e7c35092 100644 --- a/mindinsight/datavisual/data_transform/ms_data_loader.py +++ b/mindinsight/datavisual/data_transform/ms_data_loader.py @@ -19,12 +19,17 @@ This module is used to load the MindSpore training log file. Each instance will read an entire run, a run can contain one or more log file. 
""" +import concurrent.futures as futures +import math +import os import re import struct +import threading from google.protobuf.message import DecodeError from google.protobuf.text_format import ParseError +from mindinsight.conf import settings from mindinsight.datavisual.common import exceptions from mindinsight.datavisual.common.enums import PluginNameEnum from mindinsight.datavisual.common.log import logger @@ -32,13 +37,13 @@ from mindinsight.datavisual.data_access.file_handler import FileHandler from mindinsight.datavisual.data_transform.events_data import EventsData from mindinsight.datavisual.data_transform.events_data import TensorEvent from mindinsight.datavisual.data_transform.graph import MSGraph -from mindinsight.datavisual.proto_files import mindinsight_summary_pb2 as summary_pb2 -from mindinsight.datavisual.proto_files import mindinsight_anf_ir_pb2 as anf_ir_pb2 -from mindinsight.datavisual.utils import crc32 -from mindinsight.utils.exceptions import UnknownError from mindinsight.datavisual.data_transform.histogram import Histogram from mindinsight.datavisual.data_transform.histogram_container import HistogramContainer from mindinsight.datavisual.data_transform.tensor_container import TensorContainer +from mindinsight.datavisual.proto_files import mindinsight_anf_ir_pb2 as anf_ir_pb2 +from mindinsight.datavisual.proto_files import mindinsight_summary_pb2 as summary_pb2 +from mindinsight.datavisual.utils import crc32 +from mindinsight.utils.exceptions import UnknownError HEADER_SIZE = 8 CRC_STR_SIZE = 4 @@ -79,11 +84,14 @@ class MSDataLoader: "we will reload all files in path %s.", self._summary_dir) self.__init__(self._summary_dir) - def load(self): + def load(self, workers_count=1): """ Load all log valid files. When the file is reloaded, it will continue to load from where it left off. + + Args: + workers_count (int): The count of workers. Default value is 1. """ logger.debug("Start to load data in ms data loader.") filenames = self.filter_valid_files() @@ -95,7 +103,7 @@ class MSDataLoader: self._check_files_deleted(filenames, old_filenames) for parser in self._parser_list: - parser.parse_files(filenames, events_data=self._events_data) + parser.parse_files(workers_count, filenames, events_data=self._events_data) def filter_valid_files(self): """ @@ -125,11 +133,12 @@ class _Parser: self._latest_mtime = 0 self._summary_dir = summary_dir - def parse_files(self, filenames, events_data): + def parse_files(self, workers_count, filenames, events_data): """ Load files and parse files content. Args: + workers_count (int): The count of workers. filenames (list[str]): File name list. events_data (EventsData): The container of event data. """ @@ -177,7 +186,7 @@ class _Parser: class _PbParser(_Parser): """This class is used to parse pb file.""" - def parse_files(self, filenames, events_data): + def parse_files(self, workers_count, filenames, events_data): pb_filenames = self.filter_files(filenames) pb_filenames = self.sort_files(pb_filenames) for filename in pb_filenames: @@ -255,11 +264,12 @@ class _SummaryParser(_Parser): self._summary_file_handler = None self._events_data = None - def parse_files(self, filenames, events_data): + def parse_files(self, workers_count, filenames, events_data): """ Load summary file and parse file content. Args: + workers_count (int): The count of workers. filenames (list[str]): File name list. events_data (EventsData): The container of event data. 
""" @@ -285,7 +295,7 @@ class _SummaryParser(_Parser): self._latest_file_size = new_size try: - self._load_single_file(self._summary_file_handler) + self._load_single_file(self._summary_file_handler, workers_count) except UnknownError as ex: logger.warning("Parse summary file failed, detail: %r," "file path: %s.", str(ex), file_path) @@ -304,36 +314,75 @@ class _SummaryParser(_Parser): lambda filename: (re.search(r'summary\.\d+', filename) and not filename.endswith("_lineage")), filenames)) - def _load_single_file(self, file_handler): + def _load_single_file(self, file_handler, workers_count): """ Load a log file data. Args: file_handler (FileHandler): A file handler. + workers_count (int): The count of workers. """ - logger.debug("Load single summary file, file path: %s.", file_handler.file_path) - while True: - start_offset = file_handler.offset - try: - event_str = self._event_load(file_handler) - if event_str is None: + + default_concurrency = 1 + cpu_count = os.cpu_count() + if cpu_count is None: + concurrency = default_concurrency + else: + concurrency = min(math.floor(cpu_count / workers_count), + math.floor(settings.MAX_PROCESSES_COUNT / workers_count)) + if concurrency <= 0: + concurrency = default_concurrency + logger.debug("Load single summary file, file path: %s, concurrency: %s.", file_handler.file_path, concurrency) + + semaphore = threading.Semaphore(value=concurrency) + with futures.ProcessPoolExecutor(max_workers=concurrency) as executor: + while True: + start_offset = file_handler.offset + try: + event_str = self._event_load(file_handler) + if event_str is None: + file_handler.reset_offset(start_offset) + break + + # Make sure we have at most concurrency tasks not finished to save memory. + semaphore.acquire() + future = executor.submit(self._event_parse, event_str, self._latest_filename) + + def _add_tensor_event_callback(future_value): + try: + tensor_values = future_value.result() + for tensor_value in tensor_values: + if tensor_value.plugin_name == PluginNameEnum.GRAPH.value: + try: + graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value) + except KeyError: + graph_tags = [] + + summary_tags = self.filter_files(graph_tags) + for tag in summary_tags: + self._events_data.delete_tensor_event(tag) + + self._events_data.add_tensor_event(tensor_value) + except Exception as exc: + # Log exception for debugging. 
+                            logger.exception(exc)
+                            raise
+                        finally:
+                            semaphore.release()
+
+                    future.add_done_callback(_add_tensor_event_callback)
+                except exceptions.CRCFailedError:
                     file_handler.reset_offset(start_offset)
+                    logger.warning("Check crc failed and ignore this file, file_path=%s, "
+                                   "offset=%s.", file_handler.file_path, file_handler.offset)
                     break
-
-                event = summary_pb2.Event.FromString(event_str)
-                self._event_parse(event)
-            except exceptions.CRCFailedError:
-                file_handler.reset_offset(start_offset)
-                logger.warning("Check crc faild and ignore this file, file_path=%s, "
-                               "offset=%s.", file_handler.file_path, file_handler.offset)
-                break
-            except (OSError, DecodeError, exceptions.MindInsightException) as ex:
-                logger.warning("Parse log file fail, and ignore this file, detail: %r,"
-                               "file path: %s.", str(ex), file_handler.file_path)
-                break
-            except Exception as ex:
-                logger.exception(ex)
-                raise UnknownError(str(ex))
+                except (OSError, DecodeError, exceptions.MindInsightException) as ex:
+                    logger.warning("Parse log file failed, and ignore this file, detail: %r,"
+                                   "file path: %s.", str(ex), file_handler.file_path)
+                    break
+                except Exception as ex:
+                    logger.exception(ex)
+                    raise UnknownError(str(ex))
 
     def _event_load(self, file_handler):
         """
@@ -381,20 +430,29 @@ class _SummaryParser(_Parser):
 
         return event_str
 
-    def _event_parse(self, event):
+    @staticmethod
+    def _event_parse(event_str, latest_file_name):
         """
         Transform `Event` data to tensor_event and update it to EventsData.
 
+        This method is static to avoid sending unnecessary objects to other processes.
+
         Args:
-            event (Event): Message event in summary proto, data read from file handler.
+            event_str (str): Message event string in summary proto, data read from file handler.
+            latest_file_name (str): Latest file name.
         """
+
         plugins = {
             'scalar_value': PluginNameEnum.SCALAR,
             'image': PluginNameEnum.IMAGE,
            'histogram': PluginNameEnum.HISTOGRAM,
             'tensor': PluginNameEnum.TENSOR
         }
+        logger.debug("Start to parse event string. Event string len: %s.", len(event_str))
+        event = summary_pb2.Event.FromString(event_str)
+        logger.debug("Deserialize event string completed.")
+
+        ret_tensor_events = []
         if event.HasField('summary'):
             for value in event.summary.value:
                 for plugin in plugins:
@@ -402,6 +460,7 @@ class _SummaryParser(_Parser):
                         continue
                     plugin_name_enum = plugins[plugin]
                     tensor_event_value = getattr(value, plugin)
+                    logger.debug("Processing plugin value: %s.", plugin_name_enum)
 
                     if plugin == 'histogram':
                         tensor_event_value = HistogramContainer(tensor_event_value)
@@ -419,29 +478,23 @@ class _SummaryParser(_Parser):
                         tag='{}/{}'.format(value.tag, plugin_name_enum.value),
                         plugin_name=plugin_name_enum.value,
                         value=tensor_event_value,
-                        filename=self._latest_filename)
-                    self._events_data.add_tensor_event(tensor_event)
+                        filename=latest_file_name)
+                    logger.debug("Tensor event generated, plugin is %s, tag is %s, step is %s.",
+                                 plugin_name_enum, value.tag, event.step)
+                    ret_tensor_events.append(tensor_event)
 
         elif event.HasField('graph_def'):
             graph = MSGraph()
             graph.build_graph(event.graph_def)
             tensor_event = TensorEvent(wall_time=event.wall_time,
                                        step=event.step,
-                                       tag=self._latest_filename,
+                                       tag=latest_file_name,
                                        plugin_name=PluginNameEnum.GRAPH.value,
                                        value=graph,
-                                       filename=self._latest_filename)
-
-            try:
-                graph_tags = self._events_data.list_tags_by_plugin(PluginNameEnum.GRAPH.value)
-            except KeyError:
-                graph_tags = []
-
-            summary_tags = self.filter_files(graph_tags)
-            for tag in summary_tags:
-                self._events_data.delete_tensor_event(tag)
+                                       filename=latest_file_name)
+            ret_tensor_events.append(tensor_event)
 
-            self._events_data.add_tensor_event(tensor_event)
+        return ret_tensor_events
 
     @staticmethod
     def _compare_summary_file(current_file, dst_file):
diff --git a/mindinsight/datavisual/data_transform/tensor_container.py b/mindinsight/datavisual/data_transform/tensor_container.py
index fc1cbba1..17535367 100644
--- a/mindinsight/datavisual/data_transform/tensor_container.py
+++ b/mindinsight/datavisual/data_transform/tensor_container.py
@@ -199,8 +199,8 @@ class TensorContainer:
 
     def __init__(self, tensor_message):
         self._lock = threading.Lock
-        self._msg = tensor_message
-        self._dims = tensor_message.dims
+        # Original dims cannot be pickled for transfer to other processes, so a tuple is used.
+        self._dims = tuple(tensor_message.dims)
         self._data_type = tensor_message.data_type
         self._np_array = None
         self._data = _get_data_from_tensor(tensor_message)
@@ -265,5 +265,4 @@ class TensorContainer:
                 logger.error("Reshape array fail, detail: %r", str(ex))
                 return
 
-        self._msg = None
         self._np_array = ndarray
diff --git a/mindinsight/datavisual/processors/tensor_processor.py b/mindinsight/datavisual/processors/tensor_processor.py
index 07996f12..bd1ba979 100644
--- a/mindinsight/datavisual/processors/tensor_processor.py
+++ b/mindinsight/datavisual/processors/tensor_processor.py
@@ -245,7 +245,7 @@ class TensorProcessor(BaseProcessor):
             # This value is an instance of TensorContainer
             value = tensor.value
             value_dict = {
-                "dims": tuple(value.dims),
+                "dims": value.dims,
                 "data_type": anf_ir_pb2.DataType.Name(value.data_type)
             }
             if detail and detail == 'stats':
@@ -313,7 +313,7 @@ class TensorProcessor(BaseProcessor):
                 "wall_time": tensor.wall_time,
                 "step": tensor.step,
                 "value": {
-                    "dims": tuple(value.dims),
+                    "dims": value.dims,
                     "data_type": anf_ir_pb2.DataType.Name(value.data_type),
                     "data": res_data.tolist(),
                     "statistics": get_statistics_dict(value, flatten_data)
@@ -362,7 +362,7 @@ class TensorProcessor(BaseProcessor):
                 "wall_time": tensor.wall_time,
                 "step": tensor.step,
                 "value": {
-                    "dims": tuple(value.dims),
+                    "dims": value.dims,
                     "data_type": anf_ir_pb2.DataType.Name(value.data_type),
                     "histogram_buckets": buckets,
                     "statistics": get_statistics_dict(value, None)
diff --git a/mindinsight/scripts/stop.py b/mindinsight/scripts/stop.py
index ca9fe494..ebffe177 100644
--- a/mindinsight/scripts/stop.py
+++ b/mindinsight/scripts/stop.py
@@ -103,21 +103,17 @@ class Command(BaseCommand):
         self.logfile.info('Stop mindinsight with port %s and pid %s.', port, pid)
 
         process = psutil.Process(pid)
-        child_pids = [child.pid for child in process.children()]
+        processes_to_kill = [process]
+        # Set recursive to True to also kill grandchild processes.
+        for child in process.children(recursive=True):
+            processes_to_kill.append(child)
 
-        # kill gunicorn master process
-        try:
-            os.kill(pid, signal.SIGKILL)
-        except PermissionError:
-            self.console.info('kill pid %s failed due to permission error', pid)
-            sys.exit(1)
-
-        # cleanup gunicorn worker processes
-        for child_pid in child_pids:
+        for proc in processes_to_kill:
+            self.logfile.info('Stopping mindinsight process %s.', proc.pid)
             try:
-                os.kill(child_pid, signal.SIGKILL)
-            except ProcessLookupError:
-                pass
+                proc.send_signal(signal.SIGKILL)
+            except psutil.Error as ex:
+                self.logfile.warning("Stop process %s failed. Detail: %s.", proc.pid, str(ex))
 
         for hook in HookUtils.instance().hooks():
             hook.on_shutdown(self.logfile)
@@ -154,7 +150,19 @@ class Command(BaseCommand):
             if user != process.username():
                 continue
 
-            pid = process.pid if process.ppid() == 1 else process.ppid()
+            gunicorn_master_process = process
+
+            # The gunicorn master process might have grandchildren (e.g. forked by the process pool).
+            while True:
+                parent_process = gunicorn_master_process.parent()
+                if parent_process is None or parent_process.pid == 1:
+                    break
+                parent_cmd = parent_process.cmdline()
+                if ' '.join(parent_cmd).find(self.cmd_regex) == -1:
+                    break
+                gunicorn_master_process = parent_process
+
+            pid = gunicorn_master_process.pid
 
             for open_file in process.open_files():
                 if open_file.path.endswith(self.access_log_path):