
step_trace_parser.py

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""The parser for step trace data."""
import csv
import json
import os
import stat
import struct
from collections import namedtuple
from decimal import Decimal

from mindinsight.profiler.common.exceptions.exceptions import ProfilerPathErrorException, \
    JobIdMismatchException, ProfilerIOException
from mindinsight.profiler.common.log import logger as log
from mindinsight.profiler.common.util import get_summary_for_step_trace

StepTraceStruct = namedtuple(
    'TrainingTraceStruct', ['tag_id', 'task_id', 'stream_id', 'sys_count']
)
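

# Each record in the binary step trace file is one profiling event: which
# trace point was hit (tag_id), on which task and stream, and the system
# counter (sys_count) reading at that moment.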
class StepTraceParser:
    """
    The parser for step trace data.

    Args:
        input_dir (str): The directory that contains the original step trace data.
        output_file_path (str): The output file path.
        job_id (int): The job id used to define the start of a new step. Default: 0.
        skip_first_step (bool): Whether to skip the first step. Default: False.
    """
    _event_size = 20
    _fp_tag = 1
    _bp_tag = 2
    _end_tag = 255
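
    # A minimal usage sketch (paths are hypothetical):
    #
    #     parser = StepTraceParser('/path/to/profiler_dir',
    #                              '/path/to/step_trace_raw.csv')
    #     parser.parse_and_save()
    #     parser.show()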

    def __init__(self, input_dir, output_file_path, job_id=0, skip_first_step=False):
        self._input_dir = input_dir
        self._output_path = output_file_path
        self._job_id = job_id
        self._skip_first_step = skip_first_step
        self._result = []
        self._header = []
        self._step_num = 0
        self._tag_map = {}

    @property
    def output_file(self):
        """The name of the output file."""
        file_name = self._output_path.rsplit('/', 2)
        return file_name[-1] if len(file_name) == 3 else ''

    def show(self):
        """Print the summary of the parsed step trace info."""
        summary_info = {}
        if self._result:
            summary_info = get_summary_for_step_trace(self._result[-1], self._header)
            summary_info['total_steps'] = len(self._result) - 1
        print('\nStep trace summary info (unit: syscnt):')
        print(summary_info)
        print('\nThe step trace parse result is saved under ${summary_dir}/profiler/%s'
              % self.output_file)

    def parse_and_save(self):
        """Parse step trace files and save the result."""
        try:
            source_files = self._get_step_trace_files()
            self._parse(source_files)
            self._save()
        except IOError as err:
            log.exception(err)
            raise ProfilerIOException()
        else:
            log.info("Finished saving the intermediate result for the step trace file.")

    def record_point_info(self, point_info, output_path):
        """
        Record point info into json.

        Args:
            point_info (dict): The point info about tag id and relative op name.
            output_path (str): The output path for saving point info.

        Returns:
            dict, parsed point info.
        """
        points = {
            'fp_start': point_info.get(self._fp_tag, ''),
            'bp_end': point_info.get(self._bp_tag, '')
        }
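        # The saved JSON maps the two trace points to op names, e.g.
        # (op names purely illustrative):
        #     {"fp_start": "Default/network/conv1", "bp_end": "Default/network/grad_conv1"}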
        try:
            with open(output_path, 'w') as json_file:
                json.dump(points, json_file)
            os.chmod(output_path, stat.S_IREAD)
        except (IOError, OSError) as err:
            log.warning('Failed to save point info. %s', err)
            raise ProfilerIOException()
        return points

    def update_tag_op_type_map(self, point_info):
        """
        Update the map from tag id to op type.

        Args:
            point_info (dict): The point info about tag id and relative op name.
        """
        tag_map = {}
        for tag, op_name in point_info.items():
            op_type = self._get_op_type(tag, op_name)
            tag_map[tag] = op_type
        log.info("Get tag types for step trace analysis: %s", tag_map)
        self._tag_map = tag_map

    def _get_op_type(self, tag, name):
        """
        Get op type from tag and name.

        Args:
            tag (int): The tag id.
            name (str): The op name.

        Returns:
            str, the op type.
        """
        tag_map = {self._fp_tag: 'fp', self._bp_tag: 'bp', self._end_tag: 'end'}
        # get solid tag type
        op_type = tag_map.get(tag, '')
        if op_type:
            return op_type
        # check if the tag is a step tag
        if tag > self._end_tag or tag == 0:
            return 'start'
        # analyze the reduce tag
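        # The type is the prefix of the last path component before the first
        # '-', e.g. a (hypothetical) name 'Default/.../AllReduce-op12' maps
        # to 'AllReduce'.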
        op_type = name.rsplit('/', 1)[-1].split('-')[0]
        if not op_type:
            log.warning("Unexpected op name: %s", name)
        return op_type

    def _get_step_trace_files(self):
        """Get step trace files."""
        # step trace files may be under $profiler_dir or $profiler_dir/data
        profiler_dir = self._input_dir
        step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            # try to find step trace files under $profiler_dir/data
            profiler_dir = os.path.join(profiler_dir, 'data')
            step_trace_files = self._search_file(profiler_dir)
        if not step_trace_files:
            raise ProfilerPathErrorException('Training trace file does not exist.')
        return step_trace_files

    @staticmethod
    def _search_file(input_dir):
        """Search for step trace files under the specified input directory."""
        # validate input_dir
        if not os.path.isdir(input_dir):
            raise ProfilerPathErrorException(
                '{} does not exist or is not a dir'.format(input_dir)
            )
        # get step trace files
        files = os.listdir(input_dir)
        step_trace_files = list(
            filter(
                lambda file: file.startswith('training_trace') and not file.endswith('.done'),
                files
            )
        )
        # validate result
        if len(step_trace_files) > 1:
            # the file name format is
            # `training_trace.46.dev.profiler_default_tag.$id.slice_$number`;
            # use the $number as the sort key
            try:
                step_trace_files.sort(key=lambda path: int(path.rsplit('_', 1)[-1]))
            except ValueError as err:
                log.warning("Unable to parse file names: %s. %s", step_trace_files, err)
                step_trace_files = []
        file_paths = [os.path.join(input_dir, file) for file in step_trace_files]
        log.info("Found %d step trace files.", len(file_paths))
        return file_paths

    def _parse(self, source_files):
        """Parse source step trace files."""
        log.info("Start to parse step trace files.")
        event_info = {}
        for source_file in source_files:
            with open(source_file, 'rb') as handler:
                content = handler.read()
                for step_trace in self._get_next_step_trace(content, event_info):
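                    # the first step often carries one-off costs (e.g. graph
                    # compilation), so the caller may ask to drop it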
                    if self._skip_first_step:
                        self._skip_first_step = False
                        continue
                    self._record_trace_event(step_trace)
        self._record_average_info()
        log.info("Finished parsing the step trace files.")

    def _get_next_step_trace(self, content, event_info):
        """
        Get next step trace info.

        Args:
            content (bytes): The input step trace info.
            event_info (dict): The event info.

        Yields:
            dict, the step trace info, one step at a time.
        """
        for pos in range(0, len(content), self._event_size):
            next_event = self._get_trace_struct(content[pos:pos + self._event_size])
            self._construct_event_info(next_event, event_info)
            if event_info.get('end'):
                yield event_info
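
    # Each event record occupies `_event_size` (20) bytes: an 8-byte tag id,
    # two 2-byte ids (task and stream) and an 8-byte system counter, hence
    # the '=QHHQ' struct format (native byte order, no padding).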
    def _get_trace_struct(self, bin_info):
        """Translate event info to StepTraceStruct."""
        if len(bin_info) == self._event_size:
            parsed_info = struct.unpack('=QHHQ', bin_info)
            return StepTraceStruct(*parsed_info)
        return None

    def _construct_event_info(self, next_event, event_info):
        """Construct event info according to next_event."""
        min_job_id = 255
        step_flag = lambda tag: tag > min_job_id or tag == 0
        end_flag = lambda tag: tag == min_job_id
        fp_flag = lambda tag: tag == self._fp_tag
        bp_flag = lambda tag: tag == self._bp_tag

        def _on_step_event():
            """Handle step event."""
            self._validate_tag_id(tag_id)
            start_time = event_info.get('end', '-')
            event_info.clear()
            event_info['start'] = start_time
            event_info['reduce'] = {}

        def _on_reduce_event(reduce_tag_id):
            """Handle reduce event."""
            stream_id = next_event.stream_id
            if event_info['reduce'].get(stream_id):
                event_info['reduce'][stream_id].append((reduce_tag_id, sys_count))
            else:
                event_info['reduce'][stream_id] = [(reduce_tag_id, sys_count)]

        tag_id = next_event.tag_id
        sys_count = next_event.sys_count
        if end_flag(tag_id):
            event_info['end'] = sys_count
        elif step_flag(tag_id):
            _on_step_event()
        elif fp_flag(tag_id):
            event_info['fp'] = sys_count
        elif bp_flag(tag_id):
            event_info['bp'] = sys_count
        else:
            _on_reduce_event(tag_id)

    def _validate_tag_id(self, job_id):
        """Check that the job id in the source step trace file matches the one the user set."""
        if not self._job_id:
            self._job_id = job_id
        elif self._job_id != job_id:
            raise JobIdMismatchException()

    def _record_trace_event(self, step_trace):
        """Record trace event."""
        self._step_num += 1
        start_time = step_trace.get('start')
        end_time = step_trace.get('end')
        fp_time = step_trace.get('fp')
        bp_time = step_trace.get('bp')
        if not (start_time and end_time and fp_time and bp_time):
            log.warning("Step %d lacks one of the basic time points.", self._step_num)
            return
        if start_time == '-':
            start_time = fp_time
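        # All points below are raw syscnt (system counter) readings, so the
        # derived durations (total, iteration_interval, fp_and_bp, tail) are
        # syscnt deltas as well.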
        row_data = {
            'step_num': self._step_num,
            'start_point': start_time,
            'end_point': end_time,
            'total': end_time - start_time,
            'fp_point': fp_time,
            'bp_point': bp_time,
            'iteration_interval': fp_time - start_time,
            'fp_and_bp': bp_time - fp_time,
            'tail': end_time - bp_time
        }
        # update reduce info
        self._update_reduce_info(step_trace, row_data)
        # save the row data
        if not self._header:
            self._header = list(row_data.keys())
        row_data_list = [row_data.get(header_name, 0) for header_name in self._header]
        self._result.append(row_data_list)

    def _update_reduce_info(self, step_trace, row_data):
        """Extract reduce info."""
        reduce_time = step_trace.get('reduce', {})
        for stream_id, time_points in reduce_time.items():
            time_point_num = len(time_points)
            if time_point_num % 2:
                log.warning("Stream %d has an odd number (%d) of reduce time points.",
                            stream_id, time_point_num)
                continue
            for index, point_id in enumerate(range(0, time_point_num, 2)):
                field_name = f'stream_{stream_id}_{index}'
                reduce_info = self._get_single_reduce_event_info(
                    field_name, time_points[point_id], time_points[point_id + 1])
                row_data.update(reduce_info)

    def _get_single_reduce_event_info(self, field_name, start_point, end_point):
        """
        Get single reduce info.

        Args:
            field_name (str): The field name.
            start_point (Tuple[int, int]): Start point time info, including (tag_id, sys_count).
            end_point (Tuple[int, int]): End point time info, including (tag_id, sys_count).

        Returns:
            dict, reduce info.
        """
        reduce_info = {}
        if end_point[0] - start_point[0] != 1 or end_point[0] % 2:
            log.warning("Unmatched reduce event <%s, %s>.", start_point, end_point)
            return reduce_info
        op_type = self._tag_map.get(start_point[0])
        # append the op type to the field name
        if not op_type:
            log.warning("Can't recognize the inner type for point tag: %d.", start_point[0])
            field_name += '_parallel'
        else:
            field_name += '_' + op_type
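        # The resulting keys look like (ids and op type illustrative):
        #     stream_5_0_AllReduce, stream_5_0_AllReduce_start_point,
        #     stream_5_0_AllReduce_end_point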
        reduce_info[field_name] = end_point[1] - start_point[1]
        reduce_info[field_name + '_start_point'] = start_point[1]
        reduce_info[field_name + '_end_point'] = end_point[1]
        return reduce_info

    def _record_average_info(self):
        """Calculate average info."""
        result_size = len(self._result)
        # calculate the average for each column in the result data
        average_data = [0] * len(self._header)
        if result_size >= 2:
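            # average over all recorded steps except the first one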
            for row_info in self._result[1:]:
                average_data = [
                    Decimal(i) + Decimal(j) for i, j in zip(row_info, average_data)
                ]
            average_data = [
                round((item / (result_size - 1))) for item in average_data
            ]
            # the step num has no meaningful average, so mark it with '-'
            step_num_index = self._header.index('step_num')
            average_data[step_num_index] = '-'
        self._result.append(average_data)
        log.info("Finished adding average info for step trace.")

    def _save(self):
        """Save the parsed step trace result as a CSV file."""
        log.info("Start to save step trace file.")
        if not self._header:
            return
        with open(self._output_path, 'w') as file_handle:
            csv_writer = csv.writer(file_handle)
            csv_writer.writerow(self._header)
            for row_data in self._result:
                csv_writer.writerow(row_data)
        os.chmod(self._output_path, stat.S_IREAD)