# Copyright 2020-2021 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================ """Compute resource manager.""" import functools import multiprocessing import threading from concurrent import futures import _thread from mindinsight.utils.log import setup_logger _MP_CONTEXT = multiprocessing.get_context(method="forkserver") class ComputingResourceManager: """ Manager for computing resources. Note: 1. Please always use the get_instance method to get instance. 2. This class will be used in a multi-threaded env, so it needs to be thread-safe. """ _cls_lock = threading.Lock() _instance = None _exiting = False def __init__(self): self._executors = {} self._executor_id_counter = 1 self._lock = threading.Lock() self._exiting = False self._logger = setup_logger("utils", "utils") @classmethod def get_instance(cls): """Get the singleton instance.""" with cls._cls_lock: if cls._instance is None: cls._instance = ComputingResourceManager() return cls._instance def exit(self): """ Called when the gunicorn worker process is exiting. This method will be called in the signal handling thread, which is not the same thread with get_executor. Also, this method will hold the lock to block other threads from operating the singleton or executors. """ with self._lock: self._logger.info("Start to exit.") self._exiting = True for executor in self._executors.values(): # It's safe to call executor.shutdown() multiple times. executor.shutdown(wait=True) self._logger.info("Exited.") def get_executor(self, max_processes_cnt=1): """ Get an executor. This method may be called by different business from different threads. So it needs to be tread-safe. """ with self._lock: if self._exiting: self._logger.info( "System is exiting, will terminate the thread.") _thread.exit() executor = Executor( max_processes_cnt=max_processes_cnt, exit_callback=functools.partial( self._remove_executor, executor_id=self._executor_id_counter), exit_check_fn=self._check_exit ) self._executors[self._executor_id_counter] = executor self._executor_id_counter += 1 return executor def _remove_executor(self, executor_id): with self._lock: self._executors.pop(executor_id) def _check_exit(self): with self._lock: return self._exiting class Executor: """ Wrapped ProcessPoolExecutor to help global management. Args: max_processes_cnt (int): Max processes to use. exit_callback (Callable): A callback that will be called after process pool exit. exit_check_fn (Callable): A function to check whether the system is exiting. """ def __init__(self, max_processes_cnt, exit_callback, exit_check_fn): self._backend = futures.ProcessPoolExecutor( max_workers=max_processes_cnt, mp_context=_MP_CONTEXT) self._exit_callback = exit_callback self._task_slots = threading.Semaphore(value=max_processes_cnt) self._exit_check_fn = exit_check_fn self._logger = setup_logger("utils", "utils") def __enter__(self): self._backend.__enter__() return self def __exit__(self, *args, **kwargs): ret = self._backend.__exit__(*args, **kwargs) self._exit_callback() return ret def submit(self, *args, **kwargs): if self._exit_check_fn(): self._logger.warning( "System exiting, will terminate current thread.") _thread.exit() self._task_slots.acquire() future = self._backend.submit(*args, **kwargs) # The future object is not needed for releasing semaphores. future.add_done_callback(lambda future_obj: self._task_slots.release()) return future submit.__doc__ = futures.Executor.submit.__doc__ def shutdown(self, wait): self._backend.shutdown(wait) shutdown.__doc__ = futures.Executor.shutdown.__doc__ def terminate(): """Set the terminating flag.""" ComputingResourceManager.get_instance().exit()