diff --git a/mindinsight/backend/config/gunicorn_conf.py b/mindinsight/backend/config/gunicorn_conf.py index cbeaa000..c158d695 100644 --- a/mindinsight/backend/config/gunicorn_conf.py +++ b/mindinsight/backend/config/gunicorn_conf.py @@ -1,4 +1,4 @@ -# Copyright 2019 Huawei Technologies Co., Ltd +# Copyright 2019-2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,15 +15,17 @@ """Config file for gunicorn.""" import os -import multiprocessing +import time import signal +import multiprocessing import threading -import time from importlib import import_module import psutil import gunicorn +from mindinsight.utils.computing_resource_mgr import terminate + gunicorn.SERVER_SOFTWARE = 'unknown' @@ -60,6 +62,7 @@ def post_worker_init(worker): worker (ThreadWorker): worker instance. """ def murder_worker_children_processes(): + signal.signal(signal.SIGTERM, signal.SIG_IGN) processes_to_kill = [] # sleep 3 seconds so that all worker children processes have been launched. time.sleep(3) @@ -69,8 +72,10 @@ def post_worker_init(worker): processes_to_kill.append(child) while True: if os.getppid() != worker.pid: + # Kill the remaining sub-processes after the worker process died + _, alive = psutil.wait_procs(processes_to_kill, 0.1) current_worker_pid = os.getppid() - for proc in processes_to_kill: + for proc in alive: worker.log.info("Original worker pid: %d, current worker pid: %d, stop process %d", worker.pid, current_worker_pid, proc.pid) try: @@ -79,7 +84,7 @@ def post_worker_init(worker): continue except psutil.Error as ex: worker.log.error("Stop process %d failed. 
Detail: %s.", proc.pid, str(ex)) - worker.log.info("%d processes have been killed.", len(processes_to_kill)) + worker.log.info("%d processes have been terminated by listener.", len(alive)) break time.sleep(1) @@ -87,3 +92,17 @@ def post_worker_init(worker): name="murder_worker_children_processes") listen_process.start() worker.log.info("Server pid: %d, start to listening.", worker.ppid) + + +def worker_int(worker): + """Terminate child processes when worker is interrupted.""" + terminate() + process = psutil.Process(worker.pid) + children = process.children(recursive=True) + for child in children: + try: + child.send_signal(signal.SIGTERM) + except psutil.NoSuchProcess: + continue + except psutil.Error as ex: + worker.log.error("Stop process %d failed. Detail: %s.", child.pid, str(ex)) diff --git a/mindinsight/datavisual/data_transform/ms_data_loader.py b/mindinsight/datavisual/data_transform/ms_data_loader.py index 883a8e15..f8446f15 100644 --- a/mindinsight/datavisual/data_transform/ms_data_loader.py +++ b/mindinsight/datavisual/data_transform/ms_data_loader.py @@ -1,4 +1,4 @@ -# Copyright 2019 Huawei Technologies Co., Ltd +# Copyright 2019-2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -203,7 +203,8 @@ class _PbParser(_Parser): tensor_event = future_value.result() if tensor_event is not None: events_data.add_tensor_event(tensor_event) - future.add_done_callback(exception_no_raise_wrapper(add_tensor_event)) + if future is not None: + future.add_done_callback(exception_no_raise_wrapper(add_tensor_event)) return False return True @@ -397,7 +398,8 @@ class _SummaryParser(_Parser): events_data.add_tensor_event(tensor_value) - future.add_done_callback(exception_no_raise_wrapper(_add_tensor_event_callback)) + if future is not None: + future.add_done_callback(exception_no_raise_wrapper(_add_tensor_event_callback)) return False except (exceptions.CRCFailedError, exceptions.CRCLengthFailedError) as exc: file_handler.reset_offset(start_offset) diff --git a/mindinsight/scripts/stop.py b/mindinsight/scripts/stop.py index 55562ca9..0aa56825 100644 --- a/mindinsight/scripts/stop.py +++ b/mindinsight/scripts/stop.py @@ -1,4 +1,4 @@ -# Copyright 2019 Huawei Technologies Co., Ltd +# Copyright 2019-2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -105,23 +105,28 @@ class Command(BaseCommand): self.logfile.info('Stop mindinsight with port %s and pid %s.', port, pid) process = psutil.Process(pid) - processes_to_kill = [process] - # Set recursive to True to kill grand children processes. - for child in process.children(recursive=True): - processes_to_kill.append(child) - - for proc in processes_to_kill: - self.logfile.info('Stopping mindinsight process %s.', proc.pid) - try: - proc.send_signal(signal.SIGKILL) - except psutil.Error as ex: - self.logfile.warning("Stop process %s failed. Detail: %s.", proc.pid, str(ex)) + processes = process.children(recursive=True) + processes.append(process) + try: + self._send_signal(process, signal.SIGINT) + # Wait up to 2 seconds, then kill every process that is still alive. 
+ _, alive = psutil.wait_procs(processes, 2) + for alive_process in alive: + self._send_signal(alive_process, signal.SIGKILL) + except psutil.Error as ex: + self.logfile.error("Stop process %d failed. Detail: %s.", pid, str(ex)) for hook in HookUtils.instance().hooks(): hook.on_shutdown(self.logfile) self.console.info('Stop mindinsight service successfully') + def _send_signal(self, process, proc_signal): + try: + process.send_signal(proc_signal) + except psutil.NoSuchProcess: + pass + def get_process(self, port): """ Get mindinsight process diff --git a/mindinsight/utils/computing_resource_mgr.py b/mindinsight/utils/computing_resource_mgr.py index 20d0b9f1..ed9067d4 100644 --- a/mindinsight/utils/computing_resource_mgr.py +++ b/mindinsight/utils/computing_resource_mgr.py @@ -1,4 +1,4 @@ -# Copyright 2020 Huawei Technologies Co., Ltd +# Copyright 2020-2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,18 +13,21 @@ # limitations under the License. 
# ============================================================================ """Compute resource manager.""" +import sys import fractions import math import threading -import multiprocessing from concurrent import futures +import signal +import multiprocessing from mindinsight.utils.log import setup_logger from mindinsight.utils.constant import GeneralErrors from mindinsight.utils.exceptions import MindInsightException -_MP_CONTEXT = multiprocessing.get_context(method="forkserver") +_MP_CONTEXT = multiprocessing.get_context(method="fork") +terminating = False class ComputingResourceManager: @@ -48,7 +51,21 @@ class ComputingResourceManager: for ind in range(self._executors_cnt) } self._remaining_executors = len(self._executors) - self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT) + + def initializer(): + origin_handler = signal.getsignal(signal.SIGTERM) + + def handler(sig, frame): + # getsignal() may return SIG_DFL, SIG_IGN or None, which are not callable, + # so chain only to a real Python handler; the process always exits afterwards. + if callable(origin_handler): + origin_handler(sig, frame) + sys.exit(0) + + signal.signal(signal.SIGTERM, handler) + + self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT, + initializer=initializer) self.logger = setup_logger("utils", "utils") self.logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.", executors_cnt, max_processes_cnt) @@ -113,7 +130,10 @@ class ComputingResourceManager: This method should only be called by Executor. Users should not call this method directly. """ with self._lock: - return self._backend.submit(*args, **kwargs) + if not terminating: + return self._backend.submit(*args, **kwargs) + self.logger.info('Got submit after process pool shutdown.') + return None class ComputingResourceManagerException(MindInsightException): @@ -204,6 +224,8 @@ class Executor: # Thread will wait on acquire(). self._slots.acquire() future = self._mgr.submit(*args, **kwargs) + if future is None: + return None # set.add is atomic in c-python. 
self._futures.add(future) @@ -265,3 +287,9 @@ class Executor: This method is not thread safe. """ futures.wait(self._futures) + + +def terminate(): + """Set the terminating flag so that later submit calls are rejected and return None.""" + global terminating + terminating = True