For the maximum number of processes used for computing, we need to make sure every summary directory has a process to load data, while also not using so many processes that we cause system problems (e.g. out of memory). We therefore calculate the default maximum process count in _calc_default_max_processes_cnt().

For _load_data_in_thread_wrapper, because self._load_data_in_thread() creates a process pool when loading files, we cannot afford to run multiple self._load_data_in_thread() calls simultaneously. We use a lock to make sure that only one self._load_data_in_thread() is running at a time.
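To make the new default concrete, here is a minimal standalone sketch of the same clamping rule. The helper name `default_max_processes` and the sample core counts are illustrative only, not part of the patch:

```python
import math
import os

# Sketch of the clamping logic used for the default process count.
MIN_CNT = 15           # at least one process per summary directory (matches the thread count)
MAX_CNT = 45           # hard cap to avoid exhausting system resources
USED_CPU_RATIO = 0.75  # leave some cores free for other work


def default_max_processes(cpu_count=None):
    """Clamp 75% of the CPU count into [MIN_CNT, MAX_CNT]."""
    if cpu_count is None:
        cpu_count = os.cpu_count()
    if cpu_count is None:
        return MIN_CNT
    return min(max(math.floor(cpu_count * USED_CPU_RATIO), MIN_CNT), MAX_CNT)


if __name__ == "__main__":
    # 8 cores  -> floor(6)  -> clamped up to 15
    # 32 cores -> floor(24) -> 24
    # 96 cores -> floor(72) -> clamped down to 45
    for cores in (8, 32, 96):
        print(cores, default_max_processes(cores))
```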
@@ -14,8 +14,37 @@
 # ============================================================================
 """Constants module for mindinsight settings."""
 import logging
+import math
 import os
 
+_DEFAULT_MAX_THREADS_COUNT = 15
+
+
+def _calc_default_max_processes_cnt():
+    """Calc default processes count."""
+    # We need to make sure every summary directory has a process to load data.
+    min_cnt = _DEFAULT_MAX_THREADS_COUNT
+    # Do not use too many processes to avoid system problems (eg. out of memory).
+    max_cnt = 45
+    used_cpu_ratio = 0.75
+    cpu_count = os.cpu_count()
+    if cpu_count is None:
+        return min_cnt
+    processes_cnt = math.floor(cpu_count * used_cpu_ratio)
+    if processes_cnt < min_cnt:
+        return min_cnt
+    if processes_cnt > max_cnt:
+        return max_cnt
+    return processes_cnt
+
+
 ####################################
 # Global default settings.
 ####################################
@@ -48,8 +77,8 @@ API_PREFIX = '/v1/mindinsight'
 ####################################
 # Datavisual default settings.
 ####################################
-MAX_THREADS_COUNT = 15
-MAX_PROCESSES_COUNT = max(os.cpu_count() or 0, 15)
+MAX_THREADS_COUNT = _DEFAULT_MAX_THREADS_COUNT
+MAX_PROCESSES_COUNT = _calc_default_max_processes_cnt()
 
 MAX_TAG_SIZE_PER_EVENTS_DATA = 300
 DEFAULT_STEP_SIZES_PER_TAG = 500
@@ -831,6 +831,11 @@ class DataManager:
         self._detail_cache = _DetailCacheManager(loader_generators)
         self._brief_cache = _BriefCacheManager()
 
+        # This lock is used to make sure that only one self._load_data_in_thread() is running.
+        # Because self._load_data_in_thread() will create process pool when loading files, we can not
+        # afford to run multiple self._load_data_in_thread() simultaneously (will create too many processes).
+        self._load_data_lock = threading.Lock()
+
     @property
     def summary_base_dir(self):
         """Get summary base dir."""
@@ -886,7 +891,8 @@ class DataManager:
     def _load_data_in_thread_wrapper(self):
         """Wrapper for load data in thread."""
         try:
-            self._load_data_in_thread()
+            with self._load_data_lock:
+                self._load_data_in_thread()
         except MindInsightException as exc:
             # Not raising the exception here to ensure that data reloading does not crash.
             logger.warning(exc.message)
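The locking change follows a common pattern: serialize a worker that internally spins up a process pool so that overlapping reload triggers cannot multiply the pool. A minimal sketch of that pattern under those assumptions (the `Reloader` class and its pool size are hypothetical, not MindInsight code):

```python
import threading
from concurrent.futures import ProcessPoolExecutor


class Reloader:
    """Hypothetical reloader mirroring the locking pattern in the diff."""

    def __init__(self):
        # Serialize _load_data so overlapping reload triggers never create
        # more than one process pool at a time.
        self._load_data_lock = threading.Lock()

    def _load_data(self):
        # Each call creates its own short-lived process pool, which is why
        # running several calls concurrently would be expensive.
        with ProcessPoolExecutor(max_workers=4) as pool:
            return list(pool.map(abs, [-1, -2, -3]))

    def load_data_wrapper(self):
        # Only one thread at a time gets past the lock; the others block
        # until the running reload has finished.
        with self._load_data_lock:
            return self._load_data()


if __name__ == "__main__":
    reloader = Reloader()
    threads = [threading.Thread(target=reloader.load_data_wrapper) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

A blocking `with lock:` (as in the patch) queues late callers behind the running reload rather than dropping them, which keeps the wrapper simple at the cost of possibly stacking up waiting threads if reloads are triggered faster than they complete.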