You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

computing_resource_mgr.py 5.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. # Copyright 2020-2021 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """Compute resource manager."""
  16. import functools
  17. import multiprocessing
  18. import threading
  19. from concurrent import futures
  20. import _thread
  21. from mindinsight.utils.log import setup_logger
  22. _MP_CONTEXT = multiprocessing.get_context(method="forkserver")
  23. class ComputingResourceManager:
  24. """
  25. Manager for computing resources.
  26. Note:
  27. 1. Please always use the get_instance method to get instance.
  28. 2. This class will be used in a multi-threaded env, so it needs to be
  29. thread-safe.
  30. """
  31. _cls_lock = threading.Lock()
  32. _instance = None
  33. _exiting = False
  34. def __init__(self):
  35. self._executors = {}
  36. self._executor_id_counter = 1
  37. self._lock = threading.Lock()
  38. self._exiting = False
  39. self._logger = setup_logger("utils", "utils")
  40. @classmethod
  41. def get_instance(cls):
  42. """Get the singleton instance."""
  43. with cls._cls_lock:
  44. if cls._instance is None:
  45. cls._instance = ComputingResourceManager()
  46. return cls._instance
  47. def exit(self):
  48. """
  49. Called when the gunicorn worker process is exiting.
  50. This method will be called in the signal handling thread, which is not
  51. the same thread with get_executor. Also, this method will hold the lock
  52. to block other threads from operating the singleton or executors.
  53. """
  54. with self._lock:
  55. self._logger.info("Start to exit.")
  56. self._exiting = True
  57. for executor in self._executors.values():
  58. # It's safe to call executor.shutdown() multiple times.
  59. executor.shutdown(wait=True)
  60. self._logger.info("Exited.")
  61. def get_executor(self, max_processes_cnt=1):
  62. """
  63. Get an executor.
  64. This method may be called by different business from different threads.
  65. So it needs to be tread-safe.
  66. """
  67. with self._lock:
  68. if self._exiting:
  69. self._logger.info(
  70. "System is exiting, will terminate the thread.")
  71. _thread.exit()
  72. executor = Executor(
  73. max_processes_cnt=max_processes_cnt,
  74. exit_callback=functools.partial(
  75. self._remove_executor,
  76. executor_id=self._executor_id_counter),
  77. exit_check_fn=self._check_exit
  78. )
  79. self._executors[self._executor_id_counter] = executor
  80. self._executor_id_counter += 1
  81. return executor
  82. def _remove_executor(self, executor_id):
  83. with self._lock:
  84. self._executors.pop(executor_id)
  85. def _check_exit(self):
  86. with self._lock:
  87. return self._exiting
  88. class Executor:
  89. """
  90. Wrapped ProcessPoolExecutor to help global management.
  91. Args:
  92. max_processes_cnt (int): Max processes to use.
  93. exit_callback (Callable): A callback that will be called after process
  94. pool exit.
  95. exit_check_fn (Callable): A function to check whether the system is
  96. exiting.
  97. """
  98. def __init__(self, max_processes_cnt, exit_callback, exit_check_fn):
  99. self._backend = futures.ProcessPoolExecutor(
  100. max_workers=max_processes_cnt,
  101. mp_context=_MP_CONTEXT)
  102. self._exit_callback = exit_callback
  103. self._task_slots = threading.Semaphore(value=max_processes_cnt)
  104. self._exit_check_fn = exit_check_fn
  105. self._logger = setup_logger("utils", "utils")
  106. def __enter__(self):
  107. self._backend.__enter__()
  108. return self
  109. def __exit__(self, *args, **kwargs):
  110. ret = self._backend.__exit__(*args, **kwargs)
  111. self._exit_callback()
  112. return ret
  113. def submit(self, *args, **kwargs):
  114. if self._exit_check_fn():
  115. self._logger.warning(
  116. "System exiting, will terminate current thread.")
  117. _thread.exit()
  118. self._task_slots.acquire()
  119. future = self._backend.submit(*args, **kwargs)
  120. # The future object is not needed for releasing semaphores.
  121. future.add_done_callback(lambda future_obj: self._task_slots.release())
  122. return future
  123. submit.__doc__ = futures.Executor.submit.__doc__
  124. def shutdown(self, wait):
  125. self._backend.shutdown(wait)
  126. shutdown.__doc__ = futures.Executor.shutdown.__doc__
  127. def terminate():
  128. """Set the terminating flag."""
  129. ComputingResourceManager.get_instance().exit()