You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

minddata_analyser.py 13 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. # Copyright 2020 Huawei Technologies Co., Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. # ============================================================================
  15. """Data process analyser."""
  16. import os
  17. from mindinsight.profiler.analyser.base_analyser import BaseAnalyser
  18. from mindinsight.profiler.common.validator.validate_path import validate_and_normalize_path
  19. class MinddataAnalyser(BaseAnalyser):
  20. """The Minddata profiling analyser."""
  21. DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD = 0.7
  22. DEVICE_QUEUE_NOT_EMPTY_THRESHOLD = 0.95
  23. def analyse_get_next_info(self, info_type="all"):
  24. """
  25. Analyse the get_next operation info.
  26. Args:
  27. info_type (str): The info type to return, default return both queue and time info,
  28. other options are ["queue", "time"].
  29. Returns:
  30. list[list], all get_next operation info, each info contains node_name, start, end, queue_size.
  31. """
  32. # init queue info result
  33. queue_info = dict()
  34. queue_size_list = []
  35. empty_step_count = 0
  36. # init time info result
  37. time_info = dict()
  38. time_list = []
  39. total_cost = 0
  40. file_name = "minddata_aicpu_" + self._device_id + ".txt"
  41. file_path = MinddataAnalyser.find_target_file(self._profiling_dir, file_name)
  42. # the GPU minddata profiler file
  43. if not file_path:
  44. file_name = "minddata_getnext_profiling_" + self._device_id + ".txt"
  45. file_path = MinddataAnalyser.find_target_file(self._profiling_dir, file_name)
  46. if file_path:
  47. file_path = validate_and_normalize_path(
  48. file_path, raise_key="Invaild minddata_getnext file path.")
  49. with open(file_path) as data_file:
  50. for line in data_file.readlines():
  51. node_info = line.split()
  52. # Ascend:GetNext_dequeue_wait GPU:GetNext
  53. if node_info and node_info[0][0:7] == "GetNext":
  54. # analyse target info type
  55. if len(node_info) > 3 and info_type in ["all", "queue"]:
  56. queue_size_list.append(int(node_info[3]))
  57. if node_info[3] == '0':
  58. empty_step_count += 1
  59. if len(node_info) > 2 and info_type in ["all", "time"]:
  60. one_step_cost_time = (float(node_info[2]) - float(node_info[1]))/1e3
  61. # The time stamp in Ascend is μs but in GPU is ns.
  62. if 'minddata_getnext_profiling' in file_name:
  63. one_step_cost_time = one_step_cost_time/1e3
  64. time_list.append(one_step_cost_time)
  65. total_cost += one_step_cost_time
  66. if info_type in ["all", "queue"]:
  67. queue_info["size"] = len(queue_size_list)
  68. queue_info["info"] = {"queue": queue_size_list}
  69. queue_info["summary"] = {
  70. "queue_summary": {
  71. "empty_queue": empty_step_count
  72. }
  73. }
  74. if len(node_info) > 2 and info_type in ["all", "time"]:
  75. time_info["size"] = len(time_list)
  76. time_info["info"] = {"get_next": time_list}
  77. if time_info["size"]:
  78. time_info["summary"] = {
  79. "time_summary": {
  80. "avg_cost": "0" if not time_list else str(total_cost / len(time_list))
  81. }
  82. }
  83. return queue_info, time_info
  84. def analyse_device_queue_info(self, info_type="all"):
  85. """
  86. Analyse the device_queue operation info.
  87. Args:
  88. info_type (str): The info type to return, default return both queue and time info,
  89. other options are ["queue", "time"].
  90. Returns:
  91. dict, queue size info.
  92. dict, time cost info.
  93. """
  94. # init queue info result
  95. queue_info = dict()
  96. get_time_list, push_time_list, total_time_list = [], [], []
  97. total_cost, total_push, total_get = 0, 0, 0
  98. # init time info result
  99. time_info = dict()
  100. queue_size_list = []
  101. empty_step, full_step = 0, 0
  102. file_path = self.get_device_queue_file_path()
  103. if file_path:
  104. file_path = validate_and_normalize_path(
  105. file_path, raise_key="Invaild device_queue file path.")
  106. with open(file_path) as data_file:
  107. for line in data_file.readlines():
  108. op_info = line.split()
  109. # time info
  110. if op_info and op_info[0] == "0" and info_type in ["all", "time"]:
  111. # sub_type: 0 get_time, 1 push time, 2 total time
  112. # op_info: 2: step num 3: cost time
  113. if op_info[1] == "0":
  114. get_time_list.append([int(op_info[2]), float(op_info[3])])
  115. total_get += float(op_info[3])
  116. elif op_info[1] == "1":
  117. push_time_list.append([int(op_info[2]), float(op_info[3])])
  118. total_push += float(op_info[3])
  119. elif op_info[1] == "2":
  120. total_time_list.append([int(op_info[2]), float(op_info[3])])
  121. total_cost += float(op_info[3])
  122. elif op_info and op_info[0] == "1" and info_type in ["all", "queue"]:
  123. queue_size_list.append([int(op_info[2]), int(op_info[3])])
  124. if op_info[1] == op_info[3]:
  125. full_step += 1
  126. if op_info[3] == "0":
  127. empty_step += 1
  128. if info_type in ["all", "time"]:
  129. total_time_list = MinddataAnalyser.sort_step(total_time_list)
  130. push_time_list = MinddataAnalyser.sort_step(push_time_list)
  131. get_time_list = MinddataAnalyser.sort_step(get_time_list)
  132. time_info["size"] = len(total_time_list)
  133. time_info["info"] = {"total_cost": total_time_list,
  134. "push_cost": push_time_list,
  135. "get_cost": get_time_list}
  136. if time_info["size"]:
  137. time_info["summary"] = {"time_summary": {"avg_cost": total_cost/time_info["size"]}}
  138. time_info["summary"]["time_summary"]["get_cost"] = total_get/time_info["size"]
  139. time_info["summary"]["time_summary"]["push_cost"] = total_push/time_info["size"]
  140. if info_type in ["all", "queue"]:
  141. queue_size_list = MinddataAnalyser.sort_step(queue_size_list)
  142. queue_info["size"] = len(queue_size_list)
  143. queue_info["info"] = {"queue": queue_size_list}
  144. queue_info["summary"] = {"queue_summary": {"empty_queue": empty_step}}
  145. queue_info["summary"]["queue_summary"]["full_queue"] = full_step
  146. return queue_info, time_info
  147. def get_device_queue_file_path(self):
  148. """
  149. Get device queue file path.
  150. Returns:
  151. str, the file path.
  152. """
  153. device_queue_file_name = "device_queue_profiling_" + self._device_id + ".txt"
  154. device_queue_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, device_queue_file_name)
  155. feed_file_name = "dataset_iterator_profiling_" + self._device_id + ".txt"
  156. feed_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, feed_file_name)
  157. file_path = ""
  158. if device_queue_file_path:
  159. file_path = device_queue_file_path
  160. elif not device_queue_file_path and feed_file_path:
  161. file_path = feed_file_path
  162. return file_path
  163. @staticmethod
  164. def analyse_queue_summary(get_next_queue_info, device_queue_info):
  165. """
  166. Analyse the queue summary info.
  167. Args:
  168. get_next_queue_info (dict): the get_next queue info return by ananlyser.
  169. device_queue_info (dict): the device queue info return by ananlyser.
  170. Returns:
  171. dict, the summary of queue.
  172. """
  173. result = {}
  174. if get_next_queue_info and device_queue_info:
  175. result = {"data_process": {"status": "normal"},
  176. "device_queue_op": {"status": "normal"},
  177. "data_transmission": {"status": "normal"},
  178. "get_next": {"status": "normal"}}
  179. get_next_queue_empty_count = get_next_queue_info.get(
  180. "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
  181. result["get_next_queue_info"] = {
  182. "summary": {
  183. "empty_batch_count": get_next_queue_empty_count,
  184. "total_batch": get_next_queue_info.get("size")
  185. }
  186. }
  187. device_queue_empty_count = device_queue_info.get(
  188. "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
  189. device_queue_full_count = device_queue_info.get(
  190. "summary", {}).get("queue_summary", {}).get("full_queue", 0)
  191. result["device_queue_info"] = {"summary": {
  192. "empty_batch_count": device_queue_empty_count,
  193. "full_batch_count": device_queue_full_count,
  194. "total_batch": device_queue_info.get("size")}}
  195. # Adapt to the case that the first step data in the GPU is always empty
  196. if get_next_queue_empty_count > 1:
  197. if device_queue_empty_count > device_queue_info.get("size", 0)*\
  198. MinddataAnalyser.DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD:
  199. result["data_process"]["status"] = "warning"
  200. elif device_queue_empty_count < device_queue_info.get("size", 0)*\
  201. MinddataAnalyser.DEVICE_QUEUE_NOT_EMPTY_THRESHOLD:
  202. result["data_transmission"]["status"] = "warning"
  203. result["device_queue_op"]["status"] = "warning"
  204. elif device_queue_info and not get_next_queue_info:
  205. result = {"data_process": {"status": "normal"},
  206. "fpbp": {"status": "normal"}}
  207. device_queue_empty_count = device_queue_info.get(
  208. "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
  209. device_queue_full_count = device_queue_info.get(
  210. "summary", {}).get("queue_summary", {}).get("full_queue", 0)
  211. result["device_queue_info"] = {
  212. "summary": {
  213. "empty_batch_count": device_queue_empty_count,
  214. "full_batch_count": device_queue_full_count,
  215. "total_batch": device_queue_info.get("size")
  216. }
  217. }
  218. if device_queue_empty_count > device_queue_info.get("size", 0)*0.7:
  219. result["data_process"]["status"] = "warning"
  220. return result
  221. @staticmethod
  222. def sort_step(step_info_list):
  223. """
  224. Sorting the list by the first item and return the list of second item.
  225. Args:
  226. step_info_list (list): the step info, contains [step_num, info].
  227. Returns:
  228. list, the info list sorted by step.
  229. """
  230. step_info_list.sort(key=lambda x: x[0])
  231. result = []
  232. for item in step_info_list:
  233. result.append(item[1])
  234. return result
  235. @staticmethod
  236. def find_target_file(file_dir, file_name):
  237. """
  238. Find the target file in dir, and return the find file's abs path or "".
  239. Args:
  240. file_dir (str): The target file dir.
  241. file_name (str): The target file name.
  242. Returns:
  243. str, the abs file path.
  244. """
  245. target_file_path = ""
  246. for root_path, _, file_names in os.walk(file_dir):
  247. for item in file_names:
  248. if item == file_name:
  249. target_file_path = os.path.join(root_path, file_name)
  250. return target_file_path
  251. def _filter(self, filter_condition):
  252. """
  253. Filter the profiling data according to the filter condition.
  254. Args:
  255. filter_condition (dict): The filter condition.
  256. """
  257. def _load(self):
  258. """Load data according to the parsed profiling files."""