You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

computing_resource_mgr.py 9.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """Compute resource manager."""
  16. import fractions
  17. import math
  18. import threading
  19. import multiprocessing
  20. from concurrent import futures
  21. from mindinsight.utils.log import utils_logger as logger
  22. from mindinsight.utils.constant import GeneralErrors
  23. from mindinsight.utils.exceptions import MindInsightException
# Multiprocessing context passed as ``mp_context`` to the ProcessPoolExecutor
# created in ComputingResourceManager.__init__; it selects the "forkserver"
# start method for the pool's worker processes.
# NOTE(review): "forkserver" is not offered on every platform, and get_context()
# raises ValueError when the method is unavailable -- confirm supported targets.
_MP_CONTEXT = multiprocessing.get_context(method="forkserver")
  25. class ComputingResourceManager:
  26. """
  27. Manager for computing resources.
  28. This class provides executors for computing tasks. Executors can only be used once.
  29. Args:
  30. executors_cnt (int): Number of executors to be provided by this class.
  31. max_processes_cnt (int): Max number of processes to be used for computing.
  32. """
  33. def __init__(self, executors_cnt=1, max_processes_cnt=4):
  34. self._max_processes_cnt = max_processes_cnt
  35. self._executors_cnt = executors_cnt
  36. self._lock = threading.Lock()
  37. self._executors = {
  38. ind: Executor(
  39. self, executor_id=ind,
  40. available_workers=fractions.Fraction(self._max_processes_cnt, self._executors_cnt))
  41. for ind in range(self._executors_cnt)
  42. }
  43. self._remaining_executors = len(self._executors)
  44. self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT)
  45. logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.",
  46. executors_cnt, max_processes_cnt)
  47. def __enter__(self):
  48. """This method is not thread safe."""
  49. return self
  50. def __exit__(self, exc_type, exc_val, exc_tb):
  51. """
  52. This should not block because every executor have waited. If it blocks, there may be some problem.
  53. This method is not thread safe.
  54. """
  55. self._backend.shutdown()
  56. def get_executor(self):
  57. """
  58. Get an executor.
  59. Returns:
  60. Executor, which can be used for submitting tasks.
  61. Raises:
  62. ComputeResourceManagerException: when no more executor is available.
  63. """
  64. with self._lock:
  65. self._remaining_executors -= 1
  66. if self._remaining_executors < 0:
  67. raise ComputingResourceManagerException("No more executors.")
  68. return self._executors[self._remaining_executors]
  69. def destroy_executor(self, executor_id):
  70. """
  71. Destroy an executor to reuse it's workers.
  72. Args:
  73. executor_id (int): Id of the executor to be destroyed.
  74. """
  75. with self._lock:
  76. released_workers = self._executors[executor_id].available_workers
  77. self._executors.pop(executor_id)
  78. remaining_executors = len(self._executors)
  79. logger.info("Destroy executor %s. Will release %s worker(s). Remaining executors: %s.",
  80. executor_id, released_workers, remaining_executors)
  81. if not remaining_executors:
  82. return
  83. for executor in self._executors.values():
  84. executor.add_worker(
  85. fractions.Fraction(
  86. released_workers.numerator,
  87. released_workers.denominator * remaining_executors))
  88. def submit(self, *args, **kwargs):
  89. """
  90. Submit a task.
  91. See concurrent.futures.Executor.submit() for details.
  92. This method should only be called by Executor. Users should not call this method directly.
  93. """
  94. with self._lock:
  95. return self._backend.submit(*args, **kwargs)
  96. class ComputingResourceManagerException(MindInsightException):
  97. """
  98. Indicates a computing resource error has occurred.
  99. This exception should not be presented to end users.
  100. Args:
  101. msg (str): Exception message.
  102. """
  103. def __init__(self, msg):
  104. super().__init__(error=GeneralErrors.COMPUTING_RESOURCE_ERROR, message=msg)
  105. class WrappedFuture:
  106. """
  107. Wrap Future objects with custom logics to release compute slots.
  108. Args:
  109. executor (Executor): The executor which generates this future.
  110. original_future (futures.Future): Original future object.
  111. """
  112. def __init__(self, executor, original_future: futures.Future):
  113. self._original_future = original_future
  114. self._executor = executor
  115. def add_done_callback(self, callback):
  116. """
  117. Add done callback.
  118. See futures.Future.add_done_callback() for details.
  119. """
  120. def _wrapped_callback(*args, **kwargs):
  121. logger.debug("Future callback called.")
  122. try:
  123. return callback(*args, **kwargs)
  124. finally:
  125. self._executor.release_slot()
  126. self._executor.remove_done_future(self._original_future)
  127. self._original_future.add_done_callback(_wrapped_callback)
  128. class Executor:
  129. """
  130. Task executor.
  131. Args:
  132. mgr (ComputingResourceManager): The ComputingResourceManager that generates this executor.
  133. executor_id (int): Executor id.
  134. available_workers (fractions.Fraction): Available workers.
  135. """
  136. def __init__(self, mgr: ComputingResourceManager, executor_id, available_workers):
  137. self._mgr = mgr
  138. self.closed = False
  139. self._available_workers = available_workers
  140. self._effective_workers = self._calc_effective_workers(self._available_workers)
  141. self._slots = threading.Semaphore(value=self._effective_workers)
  142. self._id = executor_id
  143. self._futures = set()
  144. self._lock = threading.Lock()
  145. logger.debug("Available workers: %s.", available_workers)
  146. def __enter__(self):
  147. """This method is not thread safe."""
  148. if self.closed:
  149. raise ComputingResourceManagerException("Can not reopen closed executor.")
  150. return self
  151. def __exit__(self, exc_type, exc_val, exc_tb):
  152. """This method is not thread safe."""
  153. self._close()
  154. def submit(self, *args, **kwargs):
  155. """
  156. Submit task.
  157. See concurrent.futures.Executor.submit() for details. This method is not thread safe.
  158. """
  159. logger.debug("Task submitted to executor %s.", self._id)
  160. if self.closed:
  161. raise ComputingResourceManagerException("Cannot submit task to a closed executor.")
  162. # Thread will wait on acquire().
  163. self._slots.acquire()
  164. future = self._mgr.submit(*args, **kwargs)
  165. # set.add is atomic in c-python.
  166. self._futures.add(future)
  167. return WrappedFuture(self, future)
  168. def release_slot(self):
  169. """
  170. Release a slot for new tasks to be submitted.
  171. Semaphore is itself thread safe, so no lock is needed.
  172. This method should only be called by ExecutorFuture.
  173. """
  174. self._slots.release()
  175. def remove_done_future(self, future):
  176. """
  177. Remove done futures so the executor will not track them.
  178. This method should only be called by WrappedFuture.
  179. """
  180. # set.remove is atomic in c-python so no lock is needed.
  181. self._futures.remove(future)
  182. @staticmethod
  183. def _calc_effective_workers(available_workers):
  184. return 1 if available_workers <= 1 else math.floor(available_workers)
  185. def _close(self):
  186. self.closed = True
  187. logger.debug("Executor is being closed, futures to wait: %s", self._futures)
  188. futures.wait(self._futures)
  189. logger.debug("Executor wait futures completed.")
  190. self._mgr.destroy_executor(self._id)
  191. logger.debug("Executor is closed.")
  192. @property
  193. def available_workers(self):
  194. """Get available workers."""
  195. with self._lock:
  196. return self._available_workers
  197. def add_worker(self, added_available_workers):
  198. """This method should only be called by ComputeResourceManager."""
  199. logger.debug("Add worker: %s", added_available_workers)
  200. with self._lock:
  201. self._available_workers += added_available_workers
  202. new_effective_workers = self._calc_effective_workers(self._available_workers)
  203. if new_effective_workers > self._effective_workers:
  204. for _ in range(new_effective_workers - self._effective_workers):
  205. self._slots.release()
  206. self._effective_workers = new_effective_workers
  207. def wait_all_tasks_finish(self):
  208. """
  209. Wait all tasks finish.
  210. This method is not thread safe.
  211. """
  212. futures.wait(self._futures)