You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

computing_resource_mgr.py 10 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. # Copyright 2020-2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """Compute resource manager."""
  16. import sys
  17. import fractions
  18. import math
  19. import threading
  20. from concurrent import futures
  21. import signal
  22. import multiprocessing
  23. from mindinsight.utils.log import setup_logger
  24. from mindinsight.utils.constant import GeneralErrors
  25. from mindinsight.utils.exceptions import MindInsightException
  # Multiprocessing context using the "fork" start method; it is handed to the
  # process pool created by ComputingResourceManager.
  _MP_CONTEXT = multiprocessing.get_context(method="fork")
  # Module-wide flag: while it is True, ComputingResourceManager.submit() does not
  # forward tasks to the process pool. It is switched on by terminate().
  terminating = False
  class ComputingResourceManager:
      """
      Manager for computing resources.

      This class owns one process pool and hands out a fixed number of executors that
      share it. Executors can only be used once.

      Args:
          executors_cnt (int): Number of executors to be provided by this class.
          max_processes_cnt (int): Max number of processes to be used for computing.
      """

      def __init__(self, executors_cnt=1, max_processes_cnt=4):
          self._max_processes_cnt = max_processes_cnt
          self._executors_cnt = executors_cnt
          self._lock = threading.Lock()
          # Every executor starts with an equal, possibly fractional, share of the workers.
          self._executors = {
              ind: Executor(
                  self, executor_id=ind,
                  available_workers=fractions.Fraction(self._max_processes_cnt, self._executors_cnt))
              for ind in range(self._executors_cnt)
          }
          self._remaining_executors = len(self._executors)

          def initializer():
              """Install a SIGTERM handler in the worker that chains the previous one, then exits."""
              origin_handler = signal.getsignal(signal.SIGTERM)

              def handler(sig, frame):
                  # signal.getsignal() may return SIG_DFL, SIG_IGN or None, none of which
                  # can be called; only chain to a real Python-level handler.
                  if callable(origin_handler):
                      origin_handler(sig, frame)
                  sys.exit(0)

              signal.signal(signal.SIGTERM, handler)

          self._backend = futures.ProcessPoolExecutor(max_workers=max_processes_cnt, mp_context=_MP_CONTEXT,
                                                      initializer=initializer)
          self.logger = setup_logger("utils", "utils")
          self.logger.info("Initialized ComputingResourceManager with executors_cnt=%s, max_processes_cnt=%s.",
                           executors_cnt, max_processes_cnt)

      def __enter__(self):
          """This method is not thread safe."""
          return self

      def __exit__(self, exc_type, exc_val, exc_tb):
          """
          Shut down the process pool.

          This should not block because every executor has already waited for its tasks.
          If it blocks, there may be some problem. This method is not thread safe.
          """
          self._backend.shutdown()

      def get_executor(self):
          """
          Get an executor.

          Returns:
              Executor, which can be used for submitting tasks.

          Raises:
              ComputingResourceManagerException: when no more executor is available.
          """
          with self._lock:
              # Executors are handed out from the highest id down to 0.
              self._remaining_executors -= 1
              if self._remaining_executors < 0:
                  raise ComputingResourceManagerException("No more executors.")
              return self._executors[self._remaining_executors]

      def destroy_executor(self, executor_id):
          """
          Destroy an executor to reuse its workers.

          The workers held by the destroyed executor are split evenly among the
          executors that are still registered.

          Args:
              executor_id (int): Id of the executor to be destroyed.
          """
          with self._lock:
              released_workers = self._executors[executor_id].available_workers
              self._executors.pop(executor_id)
              remaining_executors = len(self._executors)
              self.logger.info("Destroy executor %s. Will release %s worker(s). Remaining executors: %s.",
                               executor_id, released_workers, remaining_executors)
              if not remaining_executors:
                  return
              # Fraction / int stays an exact Fraction, so no share is lost to rounding.
              share = released_workers / remaining_executors
              for executor in self._executors.values():
                  executor.add_worker(share)

      def submit(self, *args, **kwargs):
          """
          Submit a task.

          See concurrent.futures.Executor.submit() for details.
          This method should only be called by Executor. Users should not call this method directly.

          Returns:
              futures.Future, or None when the module-level terminating flag is set.
          """
          with self._lock:
              if not terminating:
                  return self._backend.submit(*args, **kwargs)
              self.logger.info('Got submit after process pool shutdown.')
              return None
  class ComputingResourceManagerException(MindInsightException):
      """
      Raised when a computing resource error has occurred.

      This exception should not be presented to end users.

      Args:
          msg (str): Exception message.
      """

      def __init__(self, msg):
          error_code = GeneralErrors.COMPUTING_RESOURCE_ERROR
          super().__init__(error=error_code, message=msg)
  class WrappedFuture:
      """
      Future wrapper that gives the compute slot back once the done callback has run.

      Args:
          executor (Executor): The executor which generates this future.
          original_future (futures.Future): Original future object.
      """

      def __init__(self, executor, original_future: futures.Future):
          self._original_future = original_future
          self._executor = executor
          self.logger = setup_logger("utils", "utils")

      def _finish(self):
          """Release the slot held by the task and stop tracking its future."""
          self._executor.release_slot()
          self._executor.remove_done_future(self._original_future)

      def add_done_callback(self, callback):
          """
          Add done callback.

          See futures.Future.add_done_callback() for details. After the callback
          returns or raises, the executor slot is released and the future is removed
          from the executor's tracked futures.
          """
          def _on_done(*cb_args, **cb_kwargs):
              self.logger.debug("Future callback called.")
              try:
                  result = callback(*cb_args, **cb_kwargs)
              finally:
                  # Bookkeeping must happen even when the user callback raises.
                  self._finish()
              return result

          self._original_future.add_done_callback(_on_done)
  class Executor:
      """
      Task executor.

      An executor limits the number of in-flight tasks with a semaphore whose size
      follows the (possibly fractional) number of workers assigned to it.

      Args:
          mgr (ComputingResourceManager): The ComputingResourceManager that generates this executor.
          executor_id (int): Executor id.
          available_workers (fractions.Fraction): Available workers.
      """

      def __init__(self, mgr: "ComputingResourceManager", executor_id, available_workers):
          self._mgr = mgr
          self.closed = False
          self._available_workers = available_workers
          self._effective_workers = self._calc_effective_workers(self._available_workers)
          # One slot per effective worker; submit() blocks when all slots are taken.
          self._slots = threading.Semaphore(value=self._effective_workers)
          self._id = executor_id
          self._futures = set()
          self._lock = threading.Lock()
          self.logger = setup_logger("utils", "utils")
          self.logger.debug("Available workers: %s.", available_workers)

      def __enter__(self):
          """This method is not thread safe."""
          if self.closed:
              raise ComputingResourceManagerException("Can not reopen closed executor.")
          return self

      def __exit__(self, exc_type, exc_val, exc_tb):
          """This method is not thread safe."""
          self._close()

      def submit(self, *args, **kwargs):
          """
          Submit task.

          See concurrent.futures.Executor.submit() for details. This method is not thread safe.

          Returns:
              WrappedFuture, or None when the manager did not accept the task.

          Raises:
              ComputingResourceManagerException: when the executor is already closed.
          """
          self.logger.debug("Task submitted to executor %s.", self._id)
          if self.closed:
              raise ComputingResourceManagerException("Cannot submit task to a closed executor.")
          # Thread will wait on acquire().
          self._slots.acquire()
          try:
              future = self._mgr.submit(*args, **kwargs)
          except BaseException:
              # No task was started, so nothing will ever release this slot for us.
              self._slots.release()
              raise
          if future is None:
              # The manager refused the task; give the slot back instead of leaking it,
              # otherwise repeated submits would eventually block forever on acquire().
              self._slots.release()
              return None
          # set.add is atomic in c-python.
          self._futures.add(future)
          return WrappedFuture(self, future)

      def release_slot(self):
          """
          Release a slot for new tasks to be submitted.

          Semaphore is itself thread safe, so no lock is needed.
          This method should only be called by WrappedFuture.
          """
          self._slots.release()

      def remove_done_future(self, future):
          """
          Remove done futures so the executor will not track them.

          This method should only be called by WrappedFuture.
          """
          # set.remove is atomic in c-python so no lock is needed.
          self._futures.remove(future)

      @staticmethod
      def _calc_effective_workers(available_workers):
          """Round the worker share down to an integer, but never below one."""
          return 1 if available_workers <= 1 else math.floor(available_workers)

      def _close(self):
          """Mark the executor closed, wait for its tasks and return its workers to the manager."""
          self.closed = True
          self.logger.debug("Executor is being closed, futures to wait: %s", self._futures)
          # Wait on a snapshot: done callbacks remove futures from self._futures meanwhile.
          futures.wait(set(self._futures))
          self.logger.debug("Executor wait futures completed.")
          self._mgr.destroy_executor(self._id)
          self.logger.debug("Executor is closed.")

      @property
      def available_workers(self):
          """Get available workers."""
          with self._lock:
              return self._available_workers

      def add_worker(self, added_available_workers):
          """
          Add workers to this executor.

          This method should only be called by ComputingResourceManager.

          Args:
              added_available_workers (fractions.Fraction): Workers to be added.
          """
          self.logger.debug("Add worker: %s", added_available_workers)
          with self._lock:
              self._available_workers += added_available_workers
              new_effective_workers = self._calc_effective_workers(self._available_workers)
              if new_effective_workers > self._effective_workers:
                  # Open one extra slot per newly gained whole worker.
                  for _ in range(new_effective_workers - self._effective_workers):
                      self._slots.release()
                  self._effective_workers = new_effective_workers

      def wait_all_tasks_finish(self):
          """
          Wait all tasks finish.

          This method is not thread safe.
          """
          # Wait on a snapshot: done callbacks remove futures from self._futures meanwhile.
          futures.wait(set(self._futures))
  def terminate():
      """
      Set the terminating flag.

      Turns the module-level ``terminating`` variable on.
      """
      global terminating
      terminating = True