Browse Source

Refactored the computing resource manager to:

1. Ease the elegant exiting procedure by providing a singleton access pattern. Now the exiting procedure can easily call the exit_non_block method to notify the executors to start exiting worker processes. If the worker processes not exit after a timeout, the mindinsight stop still can kill these processes.

2. Build a framework for managing computing resources used by different businesses. Different businesses can now call the get_executor() API to get their process pool to execute compute heavy tasks outside the mindinsight guinicorn worker process, which will make the restful api respond more quickly.
pull/1248/head
wenkai 4 years ago
parent
commit
f8f50d9508
5 changed files with 127 additions and 245 deletions
  1. +15
    -10
      mindinsight/backend/config/gunicorn_conf.py
  2. +10
    -12
      mindinsight/datavisual/data_transform/data_manager.py
  3. +4
    -6
      mindinsight/datavisual/data_transform/ms_data_loader.py
  4. +4
    -2
      mindinsight/scripts/stop.py
  5. +94
    -215
      mindinsight/utils/computing_resource_mgr.py

+ 15
- 10
mindinsight/backend/config/gunicorn_conf.py View File

@@ -17,6 +17,7 @@
import os import os
import time import time
import signal import signal
import sys
import multiprocessing import multiprocessing
import threading import threading
from importlib import import_module from importlib import import_module
@@ -51,6 +52,11 @@ def on_starting(server):
threading.Thread(target=hook.on_startup, args=(server.log,)).start() threading.Thread(target=hook.on_startup, args=(server.log,)).start()




# This global variable is to manage the listen process so that we can close the
# process when gunicorn is exiting.
LISTEN_PROCESS = None


def post_worker_init(worker): def post_worker_init(worker):
""" """
Launch a process to listen worker after gunicorn worker is initialized. Launch a process to listen worker after gunicorn worker is initialized.
@@ -62,7 +68,9 @@ def post_worker_init(worker):
worker (ThreadWorker): worker instance. worker (ThreadWorker): worker instance.
""" """
def murder_worker_children_processes(): def murder_worker_children_processes():
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(
signal.SIGTERM,
lambda signal_num, handler: sys.exit(0))
processes_to_kill = [] processes_to_kill = []
# sleep 3 seconds so that all worker children processes have been launched. # sleep 3 seconds so that all worker children processes have been launched.
time.sleep(3) time.sleep(3)
@@ -91,18 +99,15 @@ def post_worker_init(worker):
listen_process = multiprocessing.Process(target=murder_worker_children_processes, listen_process = multiprocessing.Process(target=murder_worker_children_processes,
name="murder_worker_children_processes") name="murder_worker_children_processes")
listen_process.start() listen_process.start()
global LISTEN_PROCESS
LISTEN_PROCESS = listen_process
worker.log.info("Server pid: %d, start to listening.", worker.ppid) worker.log.info("Server pid: %d, start to listening.", worker.ppid)




def worker_int(worker): def worker_int(worker):
"""Terminate child processes when worker is interrupted.""" """Terminate child processes when worker is interrupted."""
terminate() terminate()
process = psutil.Process(worker.pid)
children = process.children(recursive=True)
for child in children:
try:
child.send_signal(signal.SIGTERM)
except psutil.NoSuchProcess:
continue
except psutil.Error as ex:
worker.log.error("Stop process %d failed. Detail: %s.", child.pid, str(ex))
global LISTEN_PROCESS
if LISTEN_PROCESS is not None:
LISTEN_PROCESS.terminate()
worker.log.info("Worker int processed.")

+ 10
- 12
mindinsight/datavisual/data_transform/data_manager.py View File

@@ -912,18 +912,16 @@ class DataManager:
return return
self.status = DataManagerStatus.LOADING.value self.status = DataManagerStatus.LOADING.value


with ComputingResourceManager(executors_cnt=1,
max_processes_cnt=settings.MAX_PROCESSES_COUNT) as computing_resource_mgr:
with computing_resource_mgr.get_executor() as executor:
self._brief_cache.update_cache(executor)
brief_cache_update = time.time()
for _ in self._detail_cache.update_cache(executor):
update_interval = time.time() - brief_cache_update
logger.debug('Loading one round of detail cache taking %ss.', update_interval)
if update_interval > 3: # Use 3 seconds as threshold to avoid updating too often
self._brief_cache.update_cache(executor)
brief_cache_update += update_interval
executor.wait_all_tasks_finish()
with ComputingResourceManager.get_instance().get_executor(
max_processes_cnt=settings.MAX_PROCESSES_COUNT) as executor:
self._brief_cache.update_cache(executor)
brief_cache_update = time.time()
for _ in self._detail_cache.update_cache(executor):
update_interval = time.time() - brief_cache_update
logger.debug('Loading one round of detail cache taking %ss.', update_interval)
if update_interval > 3: # Use 3 seconds as threshold to avoid updating too often
self._brief_cache.update_cache(executor)
brief_cache_update += update_interval
with self._status_mutex: with self._status_mutex:
if not self._brief_cache.has_content() and not self._detail_cache.has_content(): if not self._brief_cache.has_content() and not self._detail_cache.has_content():
self.status = DataManagerStatus.INVALID.value self.status = DataManagerStatus.INVALID.value


+ 4
- 6
mindinsight/datavisual/data_transform/ms_data_loader.py View File

@@ -102,12 +102,10 @@ class MSDataLoader:
if executor is not None: if executor is not None:
raise TypeError("'executor' should be an Executor instance or None.") raise TypeError("'executor' should be an Executor instance or None.")


with ComputingResourceManager() as mgr:
with mgr.get_executor() as new_executor:
while not self._load(new_executor):
pass
new_executor.wait_all_tasks_finish()
return True
with ComputingResourceManager.get_instance().get_executor() as new_executor:
while not self._load(new_executor):
pass
return True


def _load(self, executor): def _load(self, executor):
""" """


+ 4
- 2
mindinsight/scripts/stop.py View File

@@ -109,9 +109,11 @@ class Command(BaseCommand):
processes.append(process) processes.append(process)
try: try:
self._send_signal(process, signal.SIGINT) self._send_signal(process, signal.SIGINT)
# Wait 2 second, if not terminate, kill the worker process.
_, alive = psutil.wait_procs(processes, 2)
# Wait a moment, if not terminate, kill the worker process.
exit_timeout_seconds = 5
_, alive = psutil.wait_procs(processes, exit_timeout_seconds)
for alive_process in alive: for alive_process in alive:
self.logfile.info("Stop process %d because timeout.", alive_process.pid)
self._send_signal(alive_process, signal.SIGKILL) self._send_signal(alive_process, signal.SIGKILL)
except psutil.Error as ex: except psutil.Error as ex:
self.logfile.error("Stop process %d failed. Detail: %s.", pid, str(ex)) self.logfile.error("Stop process %d failed. Detail: %s.", pid, str(ex))


+ 94
- 215
mindinsight/utils/computing_resource_mgr.py View File

@@ -13,267 +13,146 @@
# limitations under the License. # limitations under the License.
# ============================================================================ # ============================================================================
"""Compute resource manager.""" """Compute resource manager."""
import fractions
import math


import functools
import multiprocessing
import threading import threading
from concurrent import futures from concurrent import futures
import multiprocessing


from mindinsight.utils.log import setup_logger
from mindinsight.utils.constant import GeneralErrors
from mindinsight.utils.exceptions import MindInsightException
import _thread


from mindinsight.utils.log import setup_logger


_MP_CONTEXT = multiprocessing.get_context(method="forkserver") _MP_CONTEXT = multiprocessing.get_context(method="forkserver")
terminating = False




class ComputingResourceManager: class ComputingResourceManager:
""" """
Manager for computing resources. Manager for computing resources.


This class provides executors for computing tasks. Executors can only be used once.

Args:
executors_cnt (int): Number of executors to be provided by this class.
max_processes_cnt (int): Max number of processes to be used for computing.
Note:
1. Please always use the get_instance method to get instance.
2. This class will be used in a multi-threaded env, so it needs to be
thread-safe.
""" """
def __init__(self, executors_cnt=1, max_processes_cnt=4):
self._max_processes_cnt = max_processes_cnt
self._executors_cnt = executors_cnt
_cls_lock = threading.Lock()
_instance = None
_exiting = False

def __init__(self):
self._executors = {}
self._executor_id_counter = 1
self._lock = threading.Lock() self._lock = threading.Lock()
self._executors = {
ind: Executor(
self, executor_id=ind,
available_workers=fractions.Fraction(self._max_processes_cnt, self._executors_cnt))
for ind in range(self._executors_cnt)
}
self._remaining_executors = len(self._executors)
self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT)
self.logger = setup_logger("utils", "utils")
self.logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.",
executors_cnt, max_processes_cnt)
self._exiting = False
self._logger = setup_logger("utils", "utils")


def __enter__(self):
"""This method is not thread safe."""
return self
@classmethod
def get_instance(cls):
"""Get the singleton instance."""
with cls._cls_lock:
if cls._instance is None:
cls._instance = ComputingResourceManager()
return cls._instance


def __exit__(self, exc_type, exc_val, exc_tb):
def exit(self):
""" """
This should not block because every executor have waited. If it blocks, there may be some problem.
Called when the gunicorn worker process is exiting.


This method is not thread safe.
This method will be called in the signal handling thread, which is not
the same thread with get_executor. Also, this method will hold the lock
to block other threads from operating the singleton or executors.
""" """
self._backend.shutdown()
with self._lock:
self._logger.info("Start to exit.")
self._exiting = True
for executor in self._executors.values():
# It's safe to call executor.shutdown() multiple times.
executor.shutdown(wait=True)
self._logger.info("Exited.")


def get_executor(self):
def get_executor(self, max_processes_cnt=1):
""" """
Get an executor. Get an executor.


Returns:
Executor, which can be used for submitting tasks.

Raises:
ComputeResourceManagerException: when no more executor is available.
This method may be called by different business from different threads.
So it needs to be tread-safe.
""" """
with self._lock: with self._lock:
self._remaining_executors -= 1
if self._remaining_executors < 0:
raise ComputingResourceManagerException("No more executors.")
return self._executors[self._remaining_executors]

def destroy_executor(self, executor_id):
"""
Destroy an executor to reuse it's workers.

Args:
executor_id (int): Id of the executor to be destroyed.
"""
if self._exiting:
self._logger.info(
"System is exiting, will terminate the thread.")
_thread.exit()

executor = Executor(
max_processes_cnt=max_processes_cnt,
exit_callback=functools.partial(
self._remove_executor,
executor_id=self._executor_id_counter),
exit_check_fn=self._check_exit
)

self._executors[self._executor_id_counter] = executor
self._executor_id_counter += 1
return executor

def _remove_executor(self, executor_id):
with self._lock: with self._lock:
released_workers = self._executors[executor_id].available_workers
self._executors.pop(executor_id) self._executors.pop(executor_id)


remaining_executors = len(self._executors)
self.logger.info("Destroy executor %s. Will release %s worker(s). Remaining executors: %s.",
executor_id, released_workers, remaining_executors)
if not remaining_executors:
return

for executor in self._executors.values():
executor.add_worker(
fractions.Fraction(
released_workers.numerator,
released_workers.denominator * remaining_executors))

def submit(self, *args, **kwargs):
"""
Submit a task.

See concurrent.futures.Executor.submit() for details.

This method should only be called by Executor. Users should not call this method directly.
"""
def _check_exit(self):
with self._lock: with self._lock:
if not terminating:
return self._backend.submit(*args, **kwargs)
self.logger.info('Got submit after process pool shutdown.')
return None


class ComputingResourceManagerException(MindInsightException):
"""
Indicates a computing resource error has occurred.

This exception should not be presented to end users.

Args:
msg (str): Exception message.
"""
def __init__(self, msg):
super().__init__(error=GeneralErrors.COMPUTING_RESOURCE_ERROR, message=msg)


class WrappedFuture:
"""
Wrap Future objects with custom logics to release compute slots.

Args:
executor (Executor): The executor which generates this future.
original_future (futures.Future): Original future object.
"""
def __init__(self, executor, original_future: futures.Future):
self._original_future = original_future
self._executor = executor
self.logger = setup_logger("utils", "utils")

def add_done_callback(self, callback):
"""
Add done callback.

See futures.Future.add_done_callback() for details.
"""
def _wrapped_callback(*args, **kwargs):
self.logger.debug("Future callback called.")
try:
return callback(*args, **kwargs)
finally:
self._executor.release_slot()
self._executor.remove_done_future(self._original_future)
self._original_future.add_done_callback(_wrapped_callback)
return self._exiting




class Executor: class Executor:
""" """
Task executor.
Wrapped ProcessPoolExecutor to help global management.


Args: Args:
mgr (ComputingResourceManager): The ComputingResourceManager that generates this executor.
executor_id (int): Executor id.
available_workers (fractions.Fraction): Available workers.
max_processes_cnt (int): Max processes to use.
exit_callback (Callable): A callback that will be called after process
pool exit.
exit_check_fn (Callable): A function to check whether the system is
exiting.
""" """
def __init__(self, mgr: ComputingResourceManager, executor_id, available_workers):
self._mgr = mgr
self.closed = False
self._available_workers = available_workers
self._effective_workers = self._calc_effective_workers(self._available_workers)
self._slots = threading.Semaphore(value=self._effective_workers)
self._id = executor_id
self._futures = set()

self._lock = threading.Lock()
self.logger = setup_logger("utils", "utils")
self.logger.debug("Available workers: %s.", available_workers)
def __init__(self, max_processes_cnt, exit_callback, exit_check_fn):
self._backend = futures.ProcessPoolExecutor(
max_workers=max_processes_cnt,
mp_context=_MP_CONTEXT)
self._exit_callback = exit_callback
self._task_slots = threading.Semaphore(value=max_processes_cnt)
self._exit_check_fn = exit_check_fn
self._logger = setup_logger("utils", "utils")


def __enter__(self): def __enter__(self):
"""This method is not thread safe."""
if self.closed:
raise ComputingResourceManagerException("Can not reopen closed executor.")
self._backend.__enter__()
return self return self


def __exit__(self, exc_type, exc_val, exc_tb):
"""This method is not thread safe."""
self._close()
def __exit__(self, *args, **kwargs):
ret = self._backend.__exit__(*args, **kwargs)
self._exit_callback()
return ret


def submit(self, *args, **kwargs): def submit(self, *args, **kwargs):
"""
Submit task.

See concurrent.futures.Executor.submit() for details. This method is not thread safe.
"""
self.logger.debug("Task submitted to executor %s.", self._id)

if self.closed:
raise ComputingResourceManagerException("Cannot submit task to a closed executor.")

# Thread will wait on acquire().
self._slots.acquire()
future = self._mgr.submit(*args, **kwargs)
if future is None:
return None

# set.add is atomic in c-python.
self._futures.add(future)
return WrappedFuture(self, future)
if self._exit_check_fn():
self._logger.warning(
"System exiting, will terminate current thread.")
_thread.exit()
self._task_slots.acquire()
future = self._backend.submit(*args, **kwargs)
# The future object is not needed for releasing semaphores.
future.add_done_callback(lambda future_obj: self._task_slots.release())
return future


def release_slot(self):
"""
Release a slot for new tasks to be submitted.

Semaphore is itself thread safe, so no lock is needed.

This method should only be called by ExecutorFuture.
"""
self._slots.release()

def remove_done_future(self, future):
"""
Remove done futures so the executor will not track them.

This method should only be called by WrappedFuture.
"""
# set.remove is atomic in c-python so no lock is needed.
self._futures.remove(future)

@staticmethod
def _calc_effective_workers(available_workers):
return 1 if available_workers <= 1 else math.floor(available_workers)
submit.__doc__ = futures.Executor.submit.__doc__


def _close(self):
self.closed = True
self.logger.debug("Executor is being closed, futures to wait: %s", self._futures)
futures.wait(self._futures)
self.logger.debug("Executor wait futures completed.")
self._mgr.destroy_executor(self._id)
self.logger.debug("Executor is closed.")
def shutdown(self, wait):
self._backend.shutdown(wait)


@property
def available_workers(self):
"""Get available workers."""
with self._lock:
return self._available_workers

def add_worker(self, added_available_workers):
"""This method should only be called by ComputeResourceManager."""
self.logger.debug("Add worker: %s", added_available_workers)
with self._lock:
self._available_workers += added_available_workers
new_effective_workers = self._calc_effective_workers(self._available_workers)
if new_effective_workers > self._effective_workers:
for _ in range(new_effective_workers - self._effective_workers):
self._slots.release()

self._effective_workers = new_effective_workers

def wait_all_tasks_finish(self):
"""
Wait all tasks finish.

This method is not thread safe.
"""
futures.wait(self._futures)
shutdown.__doc__ = futures.Executor.shutdown.__doc__




def terminate(): def terminate():
"""Set the terminating flag.""" """Set the terminating flag."""
global terminating
terminating = True
ComputingResourceManager.get_instance().exit()

Loading…
Cancel
Save