
minddata_pipeline_analyser.py 10 kB

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The minddata pipeline analyser class."""
import csv
import json
import os
import sys

from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
from mindinsight.profiler.common.exceptions.exceptions import \
    ProfilerPipelineOpNotExistException
from mindinsight.profiler.common.log import logger


class MinddataPipelineAnalyser(BaseAnalyser):
    """
    The analyser for analyzing the minddata pipeline operator and queue data.

    Args:
        profiling_dir (str): The directory where the parsed profiling files are
            located.
        device_id (str): The device ID.

    Raises:
        ProfilerPathErrorException: If the profiling dir is invalid.
    """
    _col_names = ['op_id', 'op_type', 'num_workers', 'output_queue_size',
                  'output_queue_average_size', 'output_queue_length',
                  'output_queue_usage_rate', 'sample_interval', 'parent_id',
                  'children_id']
    _file_name_pipeline = 'minddata_pipeline_raw_{}.csv'
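
    # Column positions within one parsed pipeline row; they mirror the order
    # of `_col_names` above.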
    _index_op_id = 0
    _index_op_type = 1
    _index_num_workers = 2
    _index_output_queue_size = 3
    _index_output_queue_average_size = 4
    _index_output_queue_length = 5
    _index_output_queue_usage_rate = 6
    _index_sample_interval = 7
    _index_parent_id = 8
    _index_children_id = 9

    def __init__(self, profiling_dir, device_id):
        super().__init__(profiling_dir, device_id)
        self._none_filter_condition_key = ['threshold', 'is_display_op_detail']
        self._none_sort_col_names = ['output_queue_size', 'children_id']
        self._op_id_index_map = self._get_op_id_index_map()

    def get_op_and_parent_op_info(self, op_id):
        """
        Get the operator and parent operator information by `op_id`.

        Args:
            op_id (int): The minddata pipeline operator ID.

        Returns:
            dict, the operator and parent operator information.

        Raises:
            ProfilerPipelineOpNotExistException: If the minddata pipeline
                operator does not exist.
        """
        index = self._op_id_index_map.get(op_id)
        if index is None:
            raise ProfilerPipelineOpNotExistException(str(op_id))
        op_info = self._data[index]
        parent_id = op_info[self._index_parent_id]
        parent_index = self._op_id_index_map.get(parent_id)
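        # A root operator has no parent, so the parent operator and the queue
        # information are reported as None.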
        if parent_index is None:
            parent_op = None
            queue_info = None
        else:
            parent_op_info = self._data[parent_index]
            parent_op = {
                'op_id': parent_op_info[self._index_op_id],
                'op_type': parent_op_info[self._index_op_type],
                'num_workers': parent_op_info[self._index_num_workers]
            }
            queue_info = {
                'output_queue_size': op_info[self._index_output_queue_size],
                'output_queue_average_size':
                    op_info[self._index_output_queue_average_size],
                'output_queue_length': op_info[self._index_output_queue_length],
                'output_queue_usage_rate':
                    op_info[self._index_output_queue_usage_rate],
                'sample_interval': op_info[self._index_sample_interval]
            }

        current_op = {
            'op_id': op_info[self._index_op_id],
            'op_type': op_info[self._index_op_type],
            'num_workers': op_info[self._index_num_workers]
        }
        return {
            'current_op': current_op,
            'parent_op': parent_op,
            'queue_info': queue_info
        }

    def _load(self):
        """Load data according to the parsed minddata pipeline file."""
        pipeline_file_path = os.path.join(
            self._profiling_dir,
            self._file_name_pipeline.format(self._device_id)
        )
        if not os.path.isfile(pipeline_file_path):
            logger.warning('The file <%s> does not exist.', pipeline_file_path)
            return

        with open(pipeline_file_path, 'r') as file:
            csv.field_size_limit(sys.maxsize)
            csv_reader = csv.reader(file)
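            # The first row of the parsed file is the column header; skip it
            # before converting the data rows.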
            _ = next(csv_reader)
            for info in csv_reader:
                self._data.append(self._convert_field_type(info))

    def _filter(self, filter_condition):
        """
        Filter the profiling data according to the filter condition.

        Args:
            filter_condition (dict): The filter condition.
        """
        def _inner_filter(item: list):
            return self._default_filter(item, filter_condition)
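
        # Drop the `num_workers` and `output_queue_size` columns so that each
        # row matches the reduced column set chosen in _set_display_col_name.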
        def _inner_map(item: list):
            inner_item = item[0:2]
            inner_item.extend(item[4:])
            return inner_item

        threshold = filter_condition.get('threshold')
        is_display_op_detail = filter_condition.get(
            'is_display_op_detail', False
        )
        self._set_display_col_name(is_display_op_detail)

        filter_result = list(filter(_inner_filter, self._data))
        if threshold:
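            # The threshold filter condition is a two-element list interpreted
            # as [high_threshold, low_threshold].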
            low_threshold = threshold[1]
            high_threshold = threshold[0]
            filter_result = self._filter_outside_threshold(
                filter_result, low_threshold, high_threshold
            )

        if is_display_op_detail:
            self._result = filter_result
        else:
            self._result = list(map(_inner_map, filter_result))

    def _filter_outside_threshold(self, data, low_threshold, high_threshold):
        """
        Get the data outside the threshold range.

        Args:
            data (list[list]): The filtered data.
            low_threshold (float): The low threshold.
            high_threshold (float): The high threshold.

        Returns:
            list[list], the data outside the threshold range.
        """
        root_node = None
        leaf_nodes = []
        all_below_low_threshold = True
        all_higher_high_threshold = True
        result = []
        for item in data:
            parent_id = item[self._index_parent_id]
            if parent_id is None:
                root_node = item
                continue

            # current usage rate compared to the threshold
            cur_usage_rate = item[self._index_output_queue_usage_rate]
            is_low = False
            if cur_usage_rate < low_threshold:
                is_low = True
            else:
                all_below_low_threshold = False
            if cur_usage_rate < high_threshold:
                all_higher_high_threshold = False

            # the child node usage rate compared to the threshold
            child_ids = item[self._index_children_id]
            if not child_ids:
                leaf_nodes.append(item)
                continue
            child_usage_rates = [
                self._get_usage_rate_by_op_id(op_id) for op_id in child_ids
            ]
            is_high = True
            for usage_rate in child_usage_rates:
                if usage_rate < high_threshold:
                    is_high = False
                    break
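            # Keep an operator only when its own output queue usage is below
            # the low threshold while every child queue usage is at or above
            # the high threshold.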
            if is_high and is_low:
                result.append(item)

        if all_below_low_threshold:
            result = leaf_nodes
        elif all_higher_high_threshold:
            result = [root_node]
        return result

    def _get_usage_rate_by_op_id(self, op_id):
        """
        Get the usage rate of the queue corresponding to the specified operator.

        Args:
            op_id (int): The pipeline operator ID.

        Returns:
            float, the usage rate of the queue corresponding to the specified
                operator.
        """
        index = self._op_id_index_map.get(op_id)
        op_info = self._data[index]
        return op_info[self._index_output_queue_usage_rate]

    def _set_display_col_name(self, is_display_op_detail):
        """
        Set the display column names according to the filter condition.

        Args:
            is_display_op_detail (bool): Whether to display the detailed
                operator information.
        """
        if not is_display_op_detail:
            self._display_col_names = self._col_names[0:2]
            self._display_col_names.extend(self._col_names[4:])

    def _convert_field_type(self, row):
        """
        Convert the field types of one row of the parsed minddata pipeline
        file to their specific types.

        Args:
            row (list[str]): One row of data from the parsed file.

        Returns:
            list[Union[str, int, float]], the converted data.
        """
        return [
            int(row[self._index_op_id]),
            row[self._index_op_type],
            int(row[self._index_num_workers]),
            json.loads(row[self._index_output_queue_size])
            if row[self._index_output_queue_size] else None,
            float(row[self._index_output_queue_average_size])
            if row[self._index_output_queue_average_size] else None,
            int(row[self._index_output_queue_length])
            if row[self._index_output_queue_length] else None,
            float(row[self._index_output_queue_usage_rate])
            if row[self._index_output_queue_usage_rate] else None,
            int(row[self._index_sample_interval]),
            int(row[self._index_parent_id])
            if row[self._index_parent_id] else None,
            json.loads(row[self._index_children_id])
            if row[self._index_children_id] else None
        ]

    def _get_op_id_index_map(self):
        """
        Get the map between operator ID and its index in the loaded data.

        Returns:
            dict, the map between operator ID and index in the loaded data.
        """
        the_map = {}
        for index, op_info in enumerate(self._data):
            the_map[op_info[self._index_op_id]] = index
        return the_map
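

# ----------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It assumes that
# the parsed file `minddata_pipeline_raw_<device_id>.csv` already exists in
# the profiling directory and that `BaseAnalyser.__init__` invokes `_load()`;
# the path, device ID and operator ID below are placeholders.
if __name__ == '__main__':
    analyser = MinddataPipelineAnalyser('/path/to/profiler_output', '0')
    # Look up one pipeline operator together with its parent operator and the
    # statistics of its output queue.
    info = analyser.get_op_and_parent_op_info(op_id=0)
    print(info['current_op'], info['parent_op'], info['queue_info'])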