
minddata_analyser.py

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Data process analyser."""
import os

from mindinsight.profiler.analyser.base_analyser import BaseAnalyser


class MinddataAnalyser(BaseAnalyser):
    """The Minddata profiling analyser."""

    DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD = 0.7
    DEVICE_QUEUE_NOT_EMPTY_THRESHOLD = 0.95
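    # Added note (inferred from analyse_queue_summary below): these are fractions of
    # the total sampled steps, used when the get_next queue has been seen empty. If
    # the device queue is empty in more than DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD of
    # the steps, data processing is flagged as the likely bottleneck; otherwise,
    # while the empty count stays below DEVICE_QUEUE_NOT_EMPTY_THRESHOLD of the
    # steps, the tdt / device_queue_op side is flagged instead.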

    def analyse_get_next_info(self, info_type="all"):
        """
        Analyse the get_next operation info.

        Args:
            info_type (str): The info type to return. Defaults to "all", which returns
                both queue and time info; other options are "queue" and "time".

        Returns:
            dict, queue size info of the get_next operation.
            dict, time cost info of the get_next operation.
        """
        # init queue info result
        queue_info = dict()
        queue_size_list = []
        empty_step_count = 0
        # init time info result
        time_info = dict()
        time_list = []
        total_cost = 0

        file_name = "minddata_aicpu_" + self._device_id + ".txt"
        file_path = MinddataAnalyser.find_target_file(self._profiling_dir, file_name)
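        # Added note (inferred from the parsing below): each line of
        # minddata_aicpu_<device_id>.txt is assumed to be whitespace-separated, and
        # GetNext_dequeue_wait records are expected to carry at least
        # "op_name start_time end_time queue_size"; the column layout is taken from
        # the indexing below, not from any specification shown in this file.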
        if file_path:
            with open(file_path) as data_file:
                for line in data_file.readlines():
                    node_info = line.split()
                    if node_info and node_info[0] == "GetNext_dequeue_wait":
                        # analyse target info type
                        if len(node_info) > 3 and info_type in ["all", "queue"]:
                            queue_size_list.append(int(node_info[3]))
                            if node_info[3] == '0':
                                empty_step_count += 1
                        if len(node_info) > 2 and info_type in ["all", "time"]:
                            one_step_cost_time = (float(node_info[2]) - float(node_info[1])) / 1e3
                            time_list.append(one_step_cost_time)
                            total_cost += one_step_cost_time

            if info_type in ["all", "queue"]:
                queue_info["size"] = len(queue_size_list)
                queue_info["info"] = {"queue": queue_size_list}
                queue_info["summary"] = {
                    "queue_summary": {
                        "empty_queue": empty_step_count
                    }
                }
            if info_type in ["all", "time"]:
                time_info["size"] = len(time_list)
                time_info["info"] = {"get_next": time_list}
                if time_info["size"]:
                    time_info["summary"] = {
                        "time_summary": {
                            "avg_cost": "0" if not time_list else str(total_cost / len(time_list))
                        }
                    }

        return queue_info, time_info

    def analyse_device_queue_info(self, info_type="all"):
        """
        Analyse the device_queue operation info.

        Args:
            info_type (str): The info type to return. Defaults to "all", which returns
                both queue and time info; other options are "queue" and "time".

        Returns:
            dict, queue size info.
            dict, time cost info.
        """
        # init queue info result
        queue_info = dict()
        queue_size_list = []
        empty_step, full_step = 0, 0
        # init time info result
        time_info = dict()
        get_time_list, push_time_list, total_time_list = [], [], []
        total_cost, total_push, total_get = 0, 0, 0

        file_path = self.get_device_queue_file_path()
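        # Added note (inferred from the parsing below): each line of the device-queue
        # profiling file is whitespace-separated, with column 0 as the record type
        # ("0" = time record, "1" = queue record). For queue records, column 1 is
        # read as the queue capacity, column 2 as the step number and column 3 as the
        # current queue size; a step counts as "full" when columns 1 and 3 match.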
        if file_path:
            with open(file_path) as data_file:
                for line in data_file.readlines():
                    op_info = line.split()
                    # time info
                    if op_info and op_info[0] == "0" and info_type in ["all", "time"]:
                        # sub_type: 0 get_time, 1 push time, 2 total time
                        # op_info: 2: step num 3: cost time
                        if op_info[1] == "0":
                            get_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_get += float(op_info[3])
                        elif op_info[1] == "1":
                            push_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_push += float(op_info[3])
                        elif op_info[1] == "2":
                            total_time_list.append([int(op_info[2]), float(op_info[3])])
                            total_cost += float(op_info[3])
                    elif op_info and op_info[0] == "1" and info_type in ["all", "queue"]:
                        queue_size_list.append([int(op_info[2]), int(op_info[3])])
                        if op_info[1] == op_info[3]:
                            full_step += 1
                        if op_info[3] == "0":
                            empty_step += 1

        if info_type in ["all", "time"]:
            total_time_list = MinddataAnalyser.sort_step(total_time_list)
            push_time_list = MinddataAnalyser.sort_step(push_time_list)
            get_time_list = MinddataAnalyser.sort_step(get_time_list)

            time_info["size"] = len(total_time_list)
            time_info["info"] = {"total_cost": total_time_list,
                                 "push_cost": push_time_list,
                                 "get_cost": get_time_list}
            if time_info["size"]:
                time_info["summary"] = {"time_summary": {"avg_cost": total_cost / time_info["size"]}}
                time_info["summary"]["time_summary"]["get_cost"] = total_get / time_info["size"]
                time_info["summary"]["time_summary"]["push_cost"] = total_push / time_info["size"]

        if info_type in ["all", "queue"]:
            queue_size_list = MinddataAnalyser.sort_step(queue_size_list)

            queue_info["size"] = len(queue_size_list)
            queue_info["info"] = {"queue": queue_size_list}
            queue_info["summary"] = {"queue_summary": {"empty_queue": empty_step}}
            queue_info["summary"]["queue_summary"]["full_queue"] = full_step

        return queue_info, time_info

    def get_device_queue_file_path(self):
        """
        Get the device queue file path.

        Returns:
            str, the file path.
        """
        device_queue_file_name = "device_queue_profiling_" + self._device_id + ".txt"
        device_queue_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, device_queue_file_name)
        feed_file_name = "dataset_iterator_profiling_" + self._device_id + ".txt"
        feed_file_path = MinddataAnalyser.find_target_file(self._profiling_dir, feed_file_name)

        file_path = ""
        if device_queue_file_path:
            file_path = device_queue_file_path
        elif feed_file_path:
            file_path = feed_file_path

        return file_path

    @staticmethod
    def analyse_queue_summary(get_next_queue_info, device_queue_info):
        """
        Analyse the queue summary info.

        Args:
            get_next_queue_info (dict): The get_next queue info returned by the analyser.
            device_queue_info (dict): The device queue info returned by the analyser.

        Returns:
            dict, the summary of the queues.
        """
        result = {}
        if get_next_queue_info and device_queue_info:
            result = {"data_process": {"status": "normal"},
                      "device_queue_op": {"status": "normal"},
                      "tdt": {"status": "normal"},
                      "get_next": {"status": "normal"}}

            get_next_queue_empty_count = get_next_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
            result["get_next_queue_info"] = {
                "summary": {
                    "empty_batch_count": get_next_queue_empty_count,
                    "total_batch": get_next_queue_info.get("size")
                }
            }

            device_queue_empty_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
            device_queue_full_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("full_queue", 0)
            result["device_queue_info"] = {"summary": {
                "empty_batch_count": device_queue_empty_count,
                "full_batch_count": device_queue_full_count,
                "total_batch": device_queue_info.get("size")}}

            if get_next_queue_empty_count:
                if device_queue_empty_count > device_queue_info.get("size", 0) * \
                        MinddataAnalyser.DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD:
                    result["data_process"]["status"] = "warning"
                elif device_queue_empty_count < device_queue_info.get("size", 0) * \
                        MinddataAnalyser.DEVICE_QUEUE_NOT_EMPTY_THRESHOLD:
                    result["tdt"]["status"] = "warning"
                    result["device_queue_op"]["status"] = "warning"

        elif device_queue_info and not get_next_queue_info:
            result = {"data_process": {"status": "normal"},
                      "fpbp": {"status": "normal"}}

            device_queue_empty_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("empty_queue", 0)
            device_queue_full_count = device_queue_info.get(
                "summary", {}).get("queue_summary", {}).get("full_queue", 0)
            result["device_queue_info"] = {
                "summary": {
                    "empty_batch_count": device_queue_empty_count,
                    "full_batch_count": device_queue_full_count,
                    "total_batch": device_queue_info.get("size")
                }
            }

            if device_queue_empty_count > \
                    device_queue_info.get("size", 0) * MinddataAnalyser.DEVICE_QUEUE_EMPTY_WARNING_THRESHOLD:
                result["data_process"]["status"] = "warning"

        return result

    @staticmethod
    def sort_step(step_info_list):
        """
        Sort the list by the first item and return a list of the second items.

        Args:
            step_info_list (list): The step info; each item is [step_num, info].

        Returns:
            list, the info list sorted by step.
        """
        step_info_list.sort(key=lambda x: x[0])
        result = []
        for item in step_info_list:
            result.append(item[1])

        return result

    @staticmethod
    def find_target_file(file_dir, file_name):
        """
        Find the target file under the given directory and return its path, or "" if not found.

        Args:
            file_dir (str): The directory to search.
            file_name (str): The target file name.

        Returns:
            str, the file path, or an empty string if the file is not found.
        """
        target_file_path = ""
        for root_path, _, file_names in os.walk(file_dir):
            for item in file_names:
                if item == file_name:
                    target_file_path = os.path.join(root_path, file_name)

        return target_file_path

    def _filter(self, filter_condition):
        """
        Filter the profiling data according to the filter condition.

        Args:
            filter_condition (dict): The filter condition.
        """

    def _load(self):
        """Load data according to the parsed profiling files."""