diff --git a/mindinsight/backend/config/gunicorn_conf.py b/mindinsight/backend/config/gunicorn_conf.py index c158d695..c4ae6ce3 100644 --- a/mindinsight/backend/config/gunicorn_conf.py +++ b/mindinsight/backend/config/gunicorn_conf.py @@ -17,6 +17,7 @@ import os import time import signal +import sys import multiprocessing import threading from importlib import import_module @@ -51,6 +52,11 @@ def on_starting(server): threading.Thread(target=hook.on_startup, args=(server.log,)).start() +# This global variable is to manage the listen process so that we can close the +# process when gunicorn is exiting. +LISTEN_PROCESS = None + + def post_worker_init(worker): """ Launch a process to listen worker after gunicorn worker is initialized. @@ -62,7 +68,9 @@ def post_worker_init(worker): worker (ThreadWorker): worker instance. """ def murder_worker_children_processes(): - signal.signal(signal.SIGTERM, signal.SIG_IGN) + signal.signal( + signal.SIGTERM, + lambda signal_num, handler: sys.exit(0)) processes_to_kill = [] # sleep 3 seconds so that all worker children processes have been launched. time.sleep(3) @@ -91,18 +99,15 @@ def post_worker_init(worker): listen_process = multiprocessing.Process(target=murder_worker_children_processes, name="murder_worker_children_processes") listen_process.start() + global LISTEN_PROCESS + LISTEN_PROCESS = listen_process worker.log.info("Server pid: %d, start to listening.", worker.ppid) def worker_int(worker): """Terminate child processes when worker is interrupted.""" terminate() - process = psutil.Process(worker.pid) - children = process.children(recursive=True) - for child in children: - try: - child.send_signal(signal.SIGTERM) - except psutil.NoSuchProcess: - continue - except psutil.Error as ex: - worker.log.error("Stop process %d failed. 
Detail: %s.", child.pid, str(ex)) + global LISTEN_PROCESS + if LISTEN_PROCESS is not None: + LISTEN_PROCESS.terminate() + worker.log.info("Worker int processed.") diff --git a/mindinsight/datavisual/data_transform/data_manager.py b/mindinsight/datavisual/data_transform/data_manager.py index d8090899..633997f3 100644 --- a/mindinsight/datavisual/data_transform/data_manager.py +++ b/mindinsight/datavisual/data_transform/data_manager.py @@ -912,18 +912,16 @@ class DataManager: return self.status = DataManagerStatus.LOADING.value - with ComputingResourceManager(executors_cnt=1, - max_processes_cnt=settings.MAX_PROCESSES_COUNT) as computing_resource_mgr: - with computing_resource_mgr.get_executor() as executor: - self._brief_cache.update_cache(executor) - brief_cache_update = time.time() - for _ in self._detail_cache.update_cache(executor): - update_interval = time.time() - brief_cache_update - logger.debug('Loading one round of detail cache taking %ss.', update_interval) - if update_interval > 3: # Use 3 seconds as threshold to avoid updating too often - self._brief_cache.update_cache(executor) - brief_cache_update += update_interval - executor.wait_all_tasks_finish() + with ComputingResourceManager.get_instance().get_executor( + max_processes_cnt=settings.MAX_PROCESSES_COUNT) as executor: + self._brief_cache.update_cache(executor) + brief_cache_update = time.time() + for _ in self._detail_cache.update_cache(executor): + update_interval = time.time() - brief_cache_update + logger.debug('Loading one round of detail cache taking %ss.', update_interval) + if update_interval > 3: # Use 3 seconds as threshold to avoid updating too often + self._brief_cache.update_cache(executor) + brief_cache_update += update_interval with self._status_mutex: if not self._brief_cache.has_content() and not self._detail_cache.has_content(): self.status = DataManagerStatus.INVALID.value diff --git a/mindinsight/datavisual/data_transform/ms_data_loader.py 
b/mindinsight/datavisual/data_transform/ms_data_loader.py index f8446f15..e854a7cd 100644 --- a/mindinsight/datavisual/data_transform/ms_data_loader.py +++ b/mindinsight/datavisual/data_transform/ms_data_loader.py @@ -102,12 +102,10 @@ class MSDataLoader: if executor is not None: raise TypeError("'executor' should be an Executor instance or None.") - with ComputingResourceManager() as mgr: - with mgr.get_executor() as new_executor: - while not self._load(new_executor): - pass - new_executor.wait_all_tasks_finish() - return True + with ComputingResourceManager.get_instance().get_executor() as new_executor: + while not self._load(new_executor): + pass + return True def _load(self, executor): """ diff --git a/mindinsight/scripts/stop.py b/mindinsight/scripts/stop.py index 0aa56825..bc4b781b 100644 --- a/mindinsight/scripts/stop.py +++ b/mindinsight/scripts/stop.py @@ -109,9 +109,11 @@ class Command(BaseCommand): processes.append(process) try: self._send_signal(process, signal.SIGINT) - # Wait 2 second, if not terminate, kill the worker process. - _, alive = psutil.wait_procs(processes, 2) + # Wait a moment, if not terminate, kill the worker process. + exit_timeout_seconds = 5 + _, alive = psutil.wait_procs(processes, exit_timeout_seconds) for alive_process in alive: + self.logfile.info("Stop process %d because timeout.", alive_process.pid) self._send_signal(alive_process, signal.SIGKILL) except psutil.Error as ex: self.logfile.error("Stop process %d failed. Detail: %s.", pid, str(ex)) diff --git a/mindinsight/utils/computing_resource_mgr.py b/mindinsight/utils/computing_resource_mgr.py index fe5778a8..0b451e01 100644 --- a/mindinsight/utils/computing_resource_mgr.py +++ b/mindinsight/utils/computing_resource_mgr.py @@ -13,267 +13,146 @@ # limitations under the License. 
# ============================================================================ """Compute resource manager.""" -import fractions -import math + + +import functools +import multiprocessing import threading from concurrent import futures -import multiprocessing -from mindinsight.utils.log import setup_logger -from mindinsight.utils.constant import GeneralErrors -from mindinsight.utils.exceptions import MindInsightException +import _thread +from mindinsight.utils.log import setup_logger _MP_CONTEXT = multiprocessing.get_context(method="forkserver") -terminating = False class ComputingResourceManager: """ Manager for computing resources. - This class provides executors for computing tasks. Executors can only be used once. - - Args: - executors_cnt (int): Number of executors to be provided by this class. - max_processes_cnt (int): Max number of processes to be used for computing. + Note: + 1. Please always use the get_instance method to get instance. + 2. This class will be used in a multi-threaded env, so it needs to be + thread-safe. 
""" - def __init__(self, executors_cnt=1, max_processes_cnt=4): - self._max_processes_cnt = max_processes_cnt - self._executors_cnt = executors_cnt + _cls_lock = threading.Lock() + _instance = None + _exiting = False + + def __init__(self): + self._executors = {} + self._executor_id_counter = 1 self._lock = threading.Lock() - self._executors = { - ind: Executor( - self, executor_id=ind, - available_workers=fractions.Fraction(self._max_processes_cnt, self._executors_cnt)) - for ind in range(self._executors_cnt) - } - self._remaining_executors = len(self._executors) - self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT) - self.logger = setup_logger("utils", "utils") - self.logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.", - executors_cnt, max_processes_cnt) + self._exiting = False + self._logger = setup_logger("utils", "utils") - def __enter__(self): - """This method is not thread safe.""" - return self + @classmethod + def get_instance(cls): + """Get the singleton instance.""" + with cls._cls_lock: + if cls._instance is None: + cls._instance = ComputingResourceManager() + return cls._instance - def __exit__(self, exc_type, exc_val, exc_tb): + def exit(self): """ - This should not block because every executor have waited. If it blocks, there may be some problem. + Called when the gunicorn worker process is exiting. - This method is not thread safe. + This method will be called in the signal handling thread, which is not + the same thread with get_executor. Also, this method will hold the lock + to block other threads from operating the singleton or executors. """ - self._backend.shutdown() + with self._lock: + self._logger.info("Start to exit.") + self._exiting = True + for executor in self._executors.values(): + # It's safe to call executor.shutdown() multiple times. 
+ executor.shutdown(wait=True) + self._logger.info("Exited.") - def get_executor(self): + def get_executor(self, max_processes_cnt=1): """ Get an executor. - Returns: - Executor, which can be used for submitting tasks. - - Raises: - ComputeResourceManagerException: when no more executor is available. + This method may be called by different business from different threads. + So it needs to be thread-safe. """ with self._lock: - self._remaining_executors -= 1 - if self._remaining_executors < 0: - raise ComputingResourceManagerException("No more executors.") - return self._executors[self._remaining_executors] - - def destroy_executor(self, executor_id): - """ - Destroy an executor to reuse it's workers. - - Args: - executor_id (int): Id of the executor to be destroyed. - """ + if self._exiting: + self._logger.info( + "System is exiting, will terminate the thread.") + _thread.exit() + + executor = Executor( + max_processes_cnt=max_processes_cnt, + exit_callback=functools.partial( + self._remove_executor, + executor_id=self._executor_id_counter), + exit_check_fn=self._check_exit + ) + + self._executors[self._executor_id_counter] = executor + self._executor_id_counter += 1 + return executor + + def _remove_executor(self, executor_id): with self._lock: - released_workers = self._executors[executor_id].available_workers self._executors.pop(executor_id) - remaining_executors = len(self._executors) - self.logger.info("Destroy executor %s. Will release %s worker(s). Remaining executors: %s.", - executor_id, released_workers, remaining_executors) - if not remaining_executors: - return - - for executor in self._executors.values(): - executor.add_worker( - fractions.Fraction( - released_workers.numerator, - released_workers.denominator * remaining_executors)) - - def submit(self, *args, **kwargs): - """ - Submit a task. - - See concurrent.futures.Executor.submit() for details. - - This method should only be called by Executor. Users should not call this method directly. 
- """ + def _check_exit(self): with self._lock: - if not terminating: - return self._backend.submit(*args, **kwargs) - self.logger.info('Got submit after process pool shutdown.') - return None - - -class ComputingResourceManagerException(MindInsightException): - """ - Indicates a computing resource error has occurred. - - This exception should not be presented to end users. - - Args: - msg (str): Exception message. - """ - def __init__(self, msg): - super().__init__(error=GeneralErrors.COMPUTING_RESOURCE_ERROR, message=msg) - - -class WrappedFuture: - """ - Wrap Future objects with custom logics to release compute slots. - - Args: - executor (Executor): The executor which generates this future. - original_future (futures.Future): Original future object. - """ - def __init__(self, executor, original_future: futures.Future): - self._original_future = original_future - self._executor = executor - self.logger = setup_logger("utils", "utils") - - def add_done_callback(self, callback): - """ - Add done callback. - - See futures.Future.add_done_callback() for details. - """ - def _wrapped_callback(*args, **kwargs): - self.logger.debug("Future callback called.") - try: - return callback(*args, **kwargs) - finally: - self._executor.release_slot() - self._executor.remove_done_future(self._original_future) - self._original_future.add_done_callback(_wrapped_callback) + return self._exiting class Executor: """ - Task executor. + Wrapped ProcessPoolExecutor to help global management. Args: - mgr (ComputingResourceManager): The ComputingResourceManager that generates this executor. - executor_id (int): Executor id. - available_workers (fractions.Fraction): Available workers. + max_processes_cnt (int): Max processes to use. + exit_callback (Callable): A callback that will be called after process + pool exit. + exit_check_fn (Callable): A function to check whether the system is + exiting. 
""" - def __init__(self, mgr: ComputingResourceManager, executor_id, available_workers): - self._mgr = mgr - self.closed = False - self._available_workers = available_workers - self._effective_workers = self._calc_effective_workers(self._available_workers) - self._slots = threading.Semaphore(value=self._effective_workers) - self._id = executor_id - self._futures = set() - - self._lock = threading.Lock() - self.logger = setup_logger("utils", "utils") - self.logger.debug("Available workers: %s.", available_workers) + def __init__(self, max_processes_cnt, exit_callback, exit_check_fn): + self._backend = futures.ProcessPoolExecutor( + max_workers=max_processes_cnt, + mp_context=_MP_CONTEXT) + self._exit_callback = exit_callback + self._task_slots = threading.Semaphore(value=max_processes_cnt) + self._exit_check_fn = exit_check_fn + self._logger = setup_logger("utils", "utils") def __enter__(self): - """This method is not thread safe.""" - if self.closed: - raise ComputingResourceManagerException("Can not reopen closed executor.") + self._backend.__enter__() return self - def __exit__(self, exc_type, exc_val, exc_tb): - """This method is not thread safe.""" - self._close() + def __exit__(self, *args, **kwargs): + ret = self._backend.__exit__(*args, **kwargs) + self._exit_callback() + return ret def submit(self, *args, **kwargs): - """ - Submit task. - - See concurrent.futures.Executor.submit() for details. This method is not thread safe. - """ - self.logger.debug("Task submitted to executor %s.", self._id) - - if self.closed: - raise ComputingResourceManagerException("Cannot submit task to a closed executor.") - - # Thread will wait on acquire(). - self._slots.acquire() - future = self._mgr.submit(*args, **kwargs) - if future is None: - return None - - # set.add is atomic in c-python. 
- self._futures.add(future) - return WrappedFuture(self, future) + if self._exit_check_fn(): + self._logger.warning( + "System exiting, will terminate current thread.") + _thread.exit() + self._task_slots.acquire() + future = self._backend.submit(*args, **kwargs) + # The future object is not needed for releasing semaphores. + future.add_done_callback(lambda future_obj: self._task_slots.release()) + return future - def release_slot(self): - """ - Release a slot for new tasks to be submitted. - - Semaphore is itself thread safe, so no lock is needed. - - This method should only be called by ExecutorFuture. - """ - self._slots.release() - - def remove_done_future(self, future): - """ - Remove done futures so the executor will not track them. - - This method should only be called by WrappedFuture. - """ - # set.remove is atomic in c-python so no lock is needed. - self._futures.remove(future) - - @staticmethod - def _calc_effective_workers(available_workers): - return 1 if available_workers <= 1 else math.floor(available_workers) + submit.__doc__ = futures.Executor.submit.__doc__ - def _close(self): - self.closed = True - self.logger.debug("Executor is being closed, futures to wait: %s", self._futures) - futures.wait(self._futures) - self.logger.debug("Executor wait futures completed.") - self._mgr.destroy_executor(self._id) - self.logger.debug("Executor is closed.") + def shutdown(self, wait): + self._backend.shutdown(wait) - @property - def available_workers(self): - """Get available workers.""" - with self._lock: - return self._available_workers - - def add_worker(self, added_available_workers): - """This method should only be called by ComputeResourceManager.""" - self.logger.debug("Add worker: %s", added_available_workers) - with self._lock: - self._available_workers += added_available_workers - new_effective_workers = self._calc_effective_workers(self._available_workers) - if new_effective_workers > self._effective_workers: - for _ in range(new_effective_workers - 
self._effective_workers): - self._slots.release() - - self._effective_workers = new_effective_workers - - def wait_all_tasks_finish(self): - """ - Wait all tasks finish. - - This method is not thread safe. - """ - futures.wait(self._futures) + shutdown.__doc__ = futures.Executor.shutdown.__doc__ def terminate(): """Set the terminating flag.""" - global terminating - terminating = True + ComputingResourceManager.get_instance().exit()