|
|
|
@@ -19,7 +19,7 @@ import threading |
|
|
|
import multiprocessing |
|
|
|
from concurrent import futures |
|
|
|
|
|
|
|
from mindinsight.utils.log import utils_logger as logger |
|
|
|
from mindinsight.utils.log import setup_logger |
|
|
|
from mindinsight.utils.constant import GeneralErrors |
|
|
|
from mindinsight.utils.exceptions import MindInsightException |
|
|
|
|
|
|
|
@@ -49,8 +49,9 @@ class ComputingResourceManager: |
|
|
|
} |
|
|
|
self._remaining_executors = len(self._executors) |
|
|
|
self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT) |
|
|
|
logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.", |
|
|
|
executors_cnt, max_processes_cnt) |
|
|
|
self.logger = setup_logger("utils", "utils") |
|
|
|
self.logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.", |
|
|
|
executors_cnt, max_processes_cnt) |
|
|
|
|
|
|
|
def __enter__(self): |
|
|
|
"""This method is not thread safe.""" |
|
|
|
@@ -92,8 +93,8 @@ class ComputingResourceManager: |
|
|
|
self._executors.pop(executor_id) |
|
|
|
|
|
|
|
remaining_executors = len(self._executors) |
|
|
|
logger.info("Destroy executor %s. Will release %s worker(s). Remaining executors: %s.", |
|
|
|
executor_id, released_workers, remaining_executors) |
|
|
|
self.logger.info("Destroy executor %s. Will release %s worker(s). Remaining executors: %s.", |
|
|
|
executor_id, released_workers, remaining_executors) |
|
|
|
if not remaining_executors: |
|
|
|
return |
|
|
|
|
|
|
|
@@ -139,6 +140,7 @@ class WrappedFuture: |
|
|
|
def __init__(self, executor, original_future: futures.Future): |
|
|
|
self._original_future = original_future |
|
|
|
self._executor = executor |
|
|
|
self.logger = setup_logger("utils", "utils") |
|
|
|
|
|
|
|
def add_done_callback(self, callback): |
|
|
|
""" |
|
|
|
@@ -147,7 +149,7 @@ class WrappedFuture: |
|
|
|
See futures.Future.add_done_callback() for details. |
|
|
|
""" |
|
|
|
def _wrapped_callback(*args, **kwargs): |
|
|
|
logger.debug("Future callback called.") |
|
|
|
self.logger.debug("Future callback called.") |
|
|
|
try: |
|
|
|
return callback(*args, **kwargs) |
|
|
|
finally: |
|
|
|
@@ -175,8 +177,8 @@ class Executor: |
|
|
|
self._futures = set() |
|
|
|
|
|
|
|
self._lock = threading.Lock() |
|
|
|
|
|
|
|
logger.debug("Available workers: %s.", available_workers) |
|
|
|
self.logger = setup_logger("utils", "utils") |
|
|
|
self.logger.debug("Available workers: %s.", available_workers) |
|
|
|
|
|
|
|
def __enter__(self): |
|
|
|
"""This method is not thread safe.""" |
|
|
|
@@ -194,7 +196,7 @@ class Executor: |
|
|
|
|
|
|
|
See concurrent.futures.Executor.submit() for details. This method is not thread safe. |
|
|
|
""" |
|
|
|
logger.debug("Task submitted to executor %s.", self._id) |
|
|
|
self.logger.debug("Task submitted to executor %s.", self._id) |
|
|
|
|
|
|
|
if self.closed: |
|
|
|
raise ComputingResourceManagerException("Cannot submit task to a closed executor.") |
|
|
|
@@ -232,11 +234,11 @@ class Executor: |
|
|
|
|
|
|
|
def _close(self): |
|
|
|
self.closed = True |
|
|
|
logger.debug("Executor is being closed, futures to wait: %s", self._futures) |
|
|
|
self.logger.debug("Executor is being closed, futures to wait: %s", self._futures) |
|
|
|
futures.wait(self._futures) |
|
|
|
logger.debug("Executor wait futures completed.") |
|
|
|
self.logger.debug("Executor wait futures completed.") |
|
|
|
self._mgr.destroy_executor(self._id) |
|
|
|
logger.debug("Executor is closed.") |
|
|
|
self.logger.debug("Executor is closed.") |
|
|
|
|
|
|
|
@property |
|
|
|
def available_workers(self): |
|
|
|
@@ -246,7 +248,7 @@ class Executor: |
|
|
|
|
|
|
|
def add_worker(self, added_available_workers): |
|
|
|
"""This method should only be called by ComputeResourceManager.""" |
|
|
|
logger.debug("Add worker: %s", added_available_workers) |
|
|
|
self.logger.debug("Add worker: %s", added_available_workers) |
|
|
|
with self._lock: |
|
|
|
self._available_workers += added_available_workers |
|
|
|
new_effective_workers = self._calc_effective_workers(self._available_workers) |
|
|
|
|