Browse Source

!1248 refactored the computing resource manager to ease the elegant exiting procedure

From: @wenkai_dist
Reviewed-by: 
Signed-off-by:
pull/1248/MERGE
mindspore-ci-bot Gitee 4 years ago
parent
commit
8d7a2712c8
5 changed files with 127 additions and 245 deletions
  1. +15
    -10
      mindinsight/backend/config/gunicorn_conf.py
  2. +10
    -12
      mindinsight/datavisual/data_transform/data_manager.py
  3. +4
    -6
      mindinsight/datavisual/data_transform/ms_data_loader.py
  4. +4
    -2
      mindinsight/scripts/stop.py
  5. +94
    -215
      mindinsight/utils/computing_resource_mgr.py

+ 15
- 10
mindinsight/backend/config/gunicorn_conf.py View File

@@ -17,6 +17,7 @@
import os
import time
import signal
import sys
import multiprocessing
import threading
from importlib import import_module
@@ -51,6 +52,11 @@ def on_starting(server):
threading.Thread(target=hook.on_startup, args=(server.log,)).start()


# This global variable is to manage the listen process so that we can close the
# process when gunicorn is exiting.
LISTEN_PROCESS = None


def post_worker_init(worker):
"""
Launch a process to listen worker after gunicorn worker is initialized.
@@ -62,7 +68,9 @@ def post_worker_init(worker):
worker (ThreadWorker): worker instance.
"""
def murder_worker_children_processes():
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(
signal.SIGTERM,
lambda signal_num, handler: sys.exit(0))
processes_to_kill = []
# sleep 3 seconds so that all worker children processes have been launched.
time.sleep(3)
@@ -91,18 +99,15 @@ def post_worker_init(worker):
listen_process = multiprocessing.Process(target=murder_worker_children_processes,
name="murder_worker_children_processes")
listen_process.start()
global LISTEN_PROCESS
LISTEN_PROCESS = listen_process
worker.log.info("Server pid: %d, start to listening.", worker.ppid)


def worker_int(worker):
"""Terminate child processes when worker is interrupted."""
terminate()
process = psutil.Process(worker.pid)
children = process.children(recursive=True)
for child in children:
try:
child.send_signal(signal.SIGTERM)
except psutil.NoSuchProcess:
continue
except psutil.Error as ex:
worker.log.error("Stop process %d failed. Detail: %s.", child.pid, str(ex))
global LISTEN_PROCESS
if LISTEN_PROCESS is not None:
LISTEN_PROCESS.terminate()
worker.log.info("Worker int processed.")

+ 10
- 12
mindinsight/datavisual/data_transform/data_manager.py View File

@@ -912,18 +912,16 @@ class DataManager:
return
self.status = DataManagerStatus.LOADING.value

with ComputingResourceManager(executors_cnt=1,
max_processes_cnt=settings.MAX_PROCESSES_COUNT) as computing_resource_mgr:
with computing_resource_mgr.get_executor() as executor:
self._brief_cache.update_cache(executor)
brief_cache_update = time.time()
for _ in self._detail_cache.update_cache(executor):
update_interval = time.time() - brief_cache_update
logger.debug('Loading one round of detail cache taking %ss.', update_interval)
if update_interval > 3: # Use 3 seconds as threshold to avoid updating too often
self._brief_cache.update_cache(executor)
brief_cache_update += update_interval
executor.wait_all_tasks_finish()
with ComputingResourceManager.get_instance().get_executor(
max_processes_cnt=settings.MAX_PROCESSES_COUNT) as executor:
self._brief_cache.update_cache(executor)
brief_cache_update = time.time()
for _ in self._detail_cache.update_cache(executor):
update_interval = time.time() - brief_cache_update
logger.debug('Loading one round of detail cache taking %ss.', update_interval)
if update_interval > 3: # Use 3 seconds as threshold to avoid updating too often
self._brief_cache.update_cache(executor)
brief_cache_update += update_interval
with self._status_mutex:
if not self._brief_cache.has_content() and not self._detail_cache.has_content():
self.status = DataManagerStatus.INVALID.value


+ 4
- 6
mindinsight/datavisual/data_transform/ms_data_loader.py View File

@@ -102,12 +102,10 @@ class MSDataLoader:
if executor is not None:
raise TypeError("'executor' should be an Executor instance or None.")

with ComputingResourceManager() as mgr:
with mgr.get_executor() as new_executor:
while not self._load(new_executor):
pass
new_executor.wait_all_tasks_finish()
return True
with ComputingResourceManager.get_instance().get_executor() as new_executor:
while not self._load(new_executor):
pass
return True

def _load(self, executor):
"""


+ 4
- 2
mindinsight/scripts/stop.py View File

@@ -109,9 +109,11 @@ class Command(BaseCommand):
processes.append(process)
try:
self._send_signal(process, signal.SIGINT)
# Wait 2 second, if not terminate, kill the worker process.
_, alive = psutil.wait_procs(processes, 2)
# Wait a moment, if not terminate, kill the worker process.
exit_timeout_seconds = 5
_, alive = psutil.wait_procs(processes, exit_timeout_seconds)
for alive_process in alive:
self.logfile.info("Stop process %d because timeout.", alive_process.pid)
self._send_signal(alive_process, signal.SIGKILL)
except psutil.Error as ex:
self.logfile.error("Stop process %d failed. Detail: %s.", pid, str(ex))


+ 94
- 215
mindinsight/utils/computing_resource_mgr.py View File

@@ -13,267 +13,146 @@
# limitations under the License.
# ============================================================================
"""Compute resource manager."""
import fractions
import math


import functools
import multiprocessing
import threading
from concurrent import futures
import multiprocessing

from mindinsight.utils.log import setup_logger
from mindinsight.utils.constant import GeneralErrors
from mindinsight.utils.exceptions import MindInsightException
import _thread

from mindinsight.utils.log import setup_logger

_MP_CONTEXT = multiprocessing.get_context(method="forkserver")
terminating = False


class ComputingResourceManager:
"""
Manager for computing resources.

This class provides executors for computing tasks. Executors can only be used once.

Args:
executors_cnt (int): Number of executors to be provided by this class.
max_processes_cnt (int): Max number of processes to be used for computing.
Note:
1. Please always use the get_instance method to get instance.
2. This class will be used in a multi-threaded env, so it needs to be
thread-safe.
"""
def __init__(self, executors_cnt=1, max_processes_cnt=4):
self._max_processes_cnt = max_processes_cnt
self._executors_cnt = executors_cnt
_cls_lock = threading.Lock()
_instance = None
_exiting = False

def __init__(self):
self._executors = {}
self._executor_id_counter = 1
self._lock = threading.Lock()
self._executors = {
ind: Executor(
self, executor_id=ind,
available_workers=fractions.Fraction(self._max_processes_cnt, self._executors_cnt))
for ind in range(self._executors_cnt)
}
self._remaining_executors = len(self._executors)
self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT)
self.logger = setup_logger("utils", "utils")
self.logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.",
executors_cnt, max_processes_cnt)
self._exiting = False
self._logger = setup_logger("utils", "utils")

def __enter__(self):
"""This method is not thread safe."""
return self
@classmethod
def get_instance(cls):
"""Get the singleton instance."""
with cls._cls_lock:
if cls._instance is None:
cls._instance = ComputingResourceManager()
return cls._instance

def __exit__(self, exc_type, exc_val, exc_tb):
def exit(self):
"""
This should not block because every executor have waited. If it blocks, there may be some problem.
Called when the gunicorn worker process is exiting.

This method is not thread safe.
This method will be called in the signal handling thread, which is not
the same thread with get_executor. Also, this method will hold the lock
to block other threads from operating the singleton or executors.
"""
self._backend.shutdown()
with self._lock:
self._logger.info("Start to exit.")
self._exiting = True
for executor in self._executors.values():
# It's safe to call executor.shutdown() multiple times.
executor.shutdown(wait=True)
self._logger.info("Exited.")

def get_executor(self):
def get_executor(self, max_processes_cnt=1):
"""
Get an executor.

Returns:
Executor, which can be used for submitting tasks.

Raises:
ComputeResourceManagerException: when no more executor is available.
This method may be called by different business from different threads.
So it needs to be tread-safe.
"""
with self._lock:
self._remaining_executors -= 1
if self._remaining_executors < 0:
raise ComputingResourceManagerException("No more executors.")
return self._executors[self._remaining_executors]

def destroy_executor(self, executor_id):
"""
Destroy an executor to reuse it's workers.

Args:
executor_id (int): Id of the executor to be destroyed.
"""
if self._exiting:
self._logger.info(
"System is exiting, will terminate the thread.")
_thread.exit()

executor = Executor(
max_processes_cnt=max_processes_cnt,
exit_callback=functools.partial(
self._remove_executor,
executor_id=self._executor_id_counter),
exit_check_fn=self._check_exit
)

self._executors[self._executor_id_counter] = executor
self._executor_id_counter += 1
return executor

def _remove_executor(self, executor_id):
with self._lock:
released_workers = self._executors[executor_id].available_workers
self._executors.pop(executor_id)

remaining_executors = len(self._executors)
self.logger.info("Destroy executor %s. Will release %s worker(s). Remaining executors: %s.",
executor_id, released_workers, remaining_executors)
if not remaining_executors:
return

for executor in self._executors.values():
executor.add_worker(
fractions.Fraction(
released_workers.numerator,
released_workers.denominator * remaining_executors))

def submit(self, *args, **kwargs):
"""
Submit a task.

See concurrent.futures.Executor.submit() for details.

This method should only be called by Executor. Users should not call this method directly.
"""
def _check_exit(self):
with self._lock:
if not terminating:
return self._backend.submit(*args, **kwargs)
self.logger.info('Got submit after process pool shutdown.')
return None


class ComputingResourceManagerException(MindInsightException):
"""
Indicates a computing resource error has occurred.

This exception should not be presented to end users.

Args:
msg (str): Exception message.
"""
def __init__(self, msg):
super().__init__(error=GeneralErrors.COMPUTING_RESOURCE_ERROR, message=msg)


class WrappedFuture:
"""
Wrap Future objects with custom logics to release compute slots.

Args:
executor (Executor): The executor which generates this future.
original_future (futures.Future): Original future object.
"""
def __init__(self, executor, original_future: futures.Future):
self._original_future = original_future
self._executor = executor
self.logger = setup_logger("utils", "utils")

def add_done_callback(self, callback):
"""
Add done callback.

See futures.Future.add_done_callback() for details.
"""
def _wrapped_callback(*args, **kwargs):
self.logger.debug("Future callback called.")
try:
return callback(*args, **kwargs)
finally:
self._executor.release_slot()
self._executor.remove_done_future(self._original_future)
self._original_future.add_done_callback(_wrapped_callback)
return self._exiting


class Executor:
"""
Task executor.
Wrapped ProcessPoolExecutor to help global management.

Args:
mgr (ComputingResourceManager): The ComputingResourceManager that generates this executor.
executor_id (int): Executor id.
available_workers (fractions.Fraction): Available workers.
max_processes_cnt (int): Max processes to use.
exit_callback (Callable): A callback that will be called after process
pool exit.
exit_check_fn (Callable): A function to check whether the system is
exiting.
"""
def __init__(self, mgr: ComputingResourceManager, executor_id, available_workers):
self._mgr = mgr
self.closed = False
self._available_workers = available_workers
self._effective_workers = self._calc_effective_workers(self._available_workers)
self._slots = threading.Semaphore(value=self._effective_workers)
self._id = executor_id
self._futures = set()

self._lock = threading.Lock()
self.logger = setup_logger("utils", "utils")
self.logger.debug("Available workers: %s.", available_workers)
def __init__(self, max_processes_cnt, exit_callback, exit_check_fn):
self._backend = futures.ProcessPoolExecutor(
max_workers=max_processes_cnt,
mp_context=_MP_CONTEXT)
self._exit_callback = exit_callback
self._task_slots = threading.Semaphore(value=max_processes_cnt)
self._exit_check_fn = exit_check_fn
self._logger = setup_logger("utils", "utils")

def __enter__(self):
"""This method is not thread safe."""
if self.closed:
raise ComputingResourceManagerException("Can not reopen closed executor.")
self._backend.__enter__()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""This method is not thread safe."""
self._close()
def __exit__(self, *args, **kwargs):
ret = self._backend.__exit__(*args, **kwargs)
self._exit_callback()
return ret

def submit(self, *args, **kwargs):
"""
Submit task.

See concurrent.futures.Executor.submit() for details. This method is not thread safe.
"""
self.logger.debug("Task submitted to executor %s.", self._id)

if self.closed:
raise ComputingResourceManagerException("Cannot submit task to a closed executor.")

# Thread will wait on acquire().
self._slots.acquire()
future = self._mgr.submit(*args, **kwargs)
if future is None:
return None

# set.add is atomic in c-python.
self._futures.add(future)
return WrappedFuture(self, future)
if self._exit_check_fn():
self._logger.warning(
"System exiting, will terminate current thread.")
_thread.exit()
self._task_slots.acquire()
future = self._backend.submit(*args, **kwargs)
# The future object is not needed for releasing semaphores.
future.add_done_callback(lambda future_obj: self._task_slots.release())
return future

def release_slot(self):
"""
Release a slot for new tasks to be submitted.

Semaphore is itself thread safe, so no lock is needed.

This method should only be called by ExecutorFuture.
"""
self._slots.release()

def remove_done_future(self, future):
"""
Remove done futures so the executor will not track them.

This method should only be called by WrappedFuture.
"""
# set.remove is atomic in c-python so no lock is needed.
self._futures.remove(future)

@staticmethod
def _calc_effective_workers(available_workers):
return 1 if available_workers <= 1 else math.floor(available_workers)
submit.__doc__ = futures.Executor.submit.__doc__

def _close(self):
self.closed = True
self.logger.debug("Executor is being closed, futures to wait: %s", self._futures)
futures.wait(self._futures)
self.logger.debug("Executor wait futures completed.")
self._mgr.destroy_executor(self._id)
self.logger.debug("Executor is closed.")
def shutdown(self, wait):
self._backend.shutdown(wait)

@property
def available_workers(self):
"""Get available workers."""
with self._lock:
return self._available_workers

def add_worker(self, added_available_workers):
"""This method should only be called by ComputeResourceManager."""
self.logger.debug("Add worker: %s", added_available_workers)
with self._lock:
self._available_workers += added_available_workers
new_effective_workers = self._calc_effective_workers(self._available_workers)
if new_effective_workers > self._effective_workers:
for _ in range(new_effective_workers - self._effective_workers):
self._slots.release()

self._effective_workers = new_effective_workers

def wait_all_tasks_finish(self):
"""
Wait all tasks finish.

This method is not thread safe.
"""
futures.wait(self._futures)
shutdown.__doc__ = futures.Executor.shutdown.__doc__


def terminate():
"""Set the terminating flag."""
global terminating
terminating = True
ComputingResourceManager.get_instance().exit()

Loading…
Cancel
Save