
cpu_queue_schedule.cc 16 kB

/**
 * Copyright 2020 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "graph/load/model_manager/cpu_queue_schedule.h"

#include "framework/common/debug/ge_log.h"
#include "framework/common/debug/log.h"

namespace {
const uint32_t kCoreDim = 1;  // for rtCpuKernelLaunch
const char *const kCpuTaskModelPrepare = "modelPrepare";
const char *const kCpuTaskWaitEndGraph = "modelWaitEndGraph";
const char *const kCpuTaskModelPostpare = "modelPostpare";
}  // namespace

namespace ge {
CpuTaskInfo::CpuTaskInfo(rtStream_t stream) : args_(nullptr), args_size_(0) { stream_ = stream; }

CpuTaskInfo::~CpuTaskInfo() {
  if (args_ == nullptr) {
    return;
  }

  rtError_t status = rtFree(args_);
  if (status != RT_ERROR_NONE) {
    GELOGW("Call rt free failed, status: 0x%x", status);
  }
  args_ = nullptr;
}
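
///
/// @ingroup ge
/// @brief collect the zero-copy addresses of each node, then copy the address list and the
///        matching node-index list to device memory for the AICPU prepare kernel.
/// @param [in] node_addrs: zero-copy offsets keyed by node index.
/// @param [out] data_list_addr: device buffer holding the collected address list.
/// @param [out] index_list_addr: device buffer holding the matching node-index list.
/// @param [out] num: total number of collected addresses.
/// @return: 0 for success / others for failed
///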
Status CpuTaskModelPrepare::GenerateCpuAddr(const map<uint32_t, ZeroCopyOffset> &node_addrs, void *&data_list_addr,
                                            void *&index_list_addr, uint32_t &num) {
  vector<uint64_t> addrs_list;
  vector<uint32_t> index_list;
  for (const auto &addrs : node_addrs) {
    const auto &addrs_mapping_list = addrs.second.GetOutsideAddrs();
    GE_CHK_BOOL_EXEC(!addrs_mapping_list.empty(), return PARAM_INVALID, "[Check][Param] not set outside_addrs");
    std::map<const void *, std::vector<void *>> virtual_args_addrs = addrs_mapping_list[0];
    for (const auto &virtual_args_addr : virtual_args_addrs) {
      num += virtual_args_addr.second.size();
      for (size_t i = 0; i < virtual_args_addr.second.size(); ++i) {
        index_list.emplace_back(addrs.first);
        addrs_list.push_back(static_cast<uint64_t>(reinterpret_cast<uintptr_t>(virtual_args_addr.second.at(i))));
      }
    }
  }

  GE_CHK_RT_RET(rtMalloc(&data_list_addr, addrs_list.size() * sizeof(uint64_t), RT_MEMORY_HBM));
  rtError_t status = rtMemcpy(data_list_addr, addrs_list.size() * sizeof(uint64_t), addrs_list.data(),
                              addrs_list.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", addrs_list.size() * sizeof(uint64_t),
                      status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", addrs_list.size() * sizeof(uint64_t), status);
    return RT_ERROR_TO_GE_STATUS(status);
  }

  GE_CHK_RT_RET(rtMalloc(&index_list_addr, index_list.size() * sizeof(uint32_t), RT_MEMORY_HBM));
  status = rtMemcpy(index_list_addr, index_list.size() * sizeof(uint32_t), index_list.data(),
                    index_list.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", index_list.size() * sizeof(uint32_t),
                      status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", index_list.size() * sizeof(uint32_t), status);
    return RT_ERROR_TO_GE_STATUS(status);
  }
  return SUCCESS;
}
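
///
/// @ingroup ge
/// @brief collect the first data size of each output and copy the size list to device memory.
/// @param [in] outside_addrs: zero-copy offsets of the model outputs.
/// @param [out] output_size_list_addr: device buffer holding the output size list.
/// @return: 0 for success / others for failed
///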
Status CpuTaskModelPrepare::GenerateOutSizeAddr(const map<uint32_t, ZeroCopyOffset> &outside_addrs,
                                                void *&output_size_list_addr) {
  vector<uint32_t> output_sizes;
  for (const auto &addrs : outside_addrs) {
    if (addrs.second.GetDataInfo().empty()) {
      REPORT_INNER_ERROR("E19999", "Index:%u out_data_info is empty, check invalid", addrs.first);
      GELOGE(INTERNAL_ERROR, "[Check][Param] Index:%u out_data_info is empty, check invalid", addrs.first);
      return INTERNAL_ERROR;
    }
    uint32_t data_size = static_cast<uint32_t>(addrs.second.GetDataInfo().at(0).first);
    output_sizes.push_back(data_size);
  }

  GE_CHK_RT_RET(rtMalloc(&output_size_list_addr, output_sizes.size() * sizeof(uint32_t), RT_MEMORY_HBM));
  rtError_t status = rtMemcpy(output_size_list_addr, output_sizes.size() * sizeof(uint32_t), output_sizes.data(),
                              output_sizes.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", output_sizes.size() * sizeof(uint32_t),
                      status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", output_sizes.size() * sizeof(uint32_t), status);
    return RT_ERROR_TO_GE_STATUS(status);
  }
  return SUCCESS;
}
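
///
/// @ingroup ge
/// @brief build the AicpuPareInfo argument block (queue ids, address/index/size lists, mbuf pointer list)
///        and copy it to device memory for the modelPrepare kernel.
/// @param [in] input_queue_ids / output_queue_ids: queue ids bound to the model.
/// @param [in] inside_addrs / outside_addrs: zero-copy offsets of the model inputs and outputs.
/// @param [out] out_mbuf: device address of the mbuf pointer list, later consumed by CpuTaskModelPostpare.
/// @return: 0 for success / others for failed
///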
Status CpuTaskModelPrepare::Init(const vector<uint32_t> &input_queue_ids, const vector<uint32_t> &output_queue_ids,
                                 const map<uint32_t, ZeroCopyOffset> &inside_addrs,
                                 const map<uint32_t, ZeroCopyOffset> &outside_addrs, uintptr_t &out_mbuf) {
  if ((args_ != nullptr) || (args_size_ > 0)) {
    REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
    GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
    return FAILED;
  }

  GE_CHK_RT_RET(rtMalloc(&mbufptr_list_, output_queue_ids.size() * sizeof(uint64_t), RT_MEMORY_HBM));
  GE_CHK_RT_RET(rtMalloc(&queue_id_list_addr_, input_queue_ids.size() * sizeof(uint32_t), RT_MEMORY_HBM));
  rtError_t status = rtMemcpy(queue_id_list_addr_, input_queue_ids.size() * sizeof(uint32_t), input_queue_ids.data(),
                              input_queue_ids.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", input_queue_ids.size() * sizeof(uint32_t),
                      status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", input_queue_ids.size() * sizeof(uint32_t), status);
    return RT_ERROR_TO_GE_STATUS(status);
  }

  uint32_t input_addr_num = 0;
  uint32_t output_addr_num = 0;
  if (GenerateCpuAddr(inside_addrs, input_list_addr_, input_index_list_addr_, input_addr_num) != SUCCESS) {
    return FAILED;
  }
  if (GenerateCpuAddr(outside_addrs, output_list_addr_, output_index_list_addr_, output_addr_num) != SUCCESS) {
    return FAILED;
  }
  if (GenerateOutSizeAddr(outside_addrs, output_size_list_addr_) != SUCCESS) {
    return FAILED;
  }

  AicpuPareInfo aicpu_info;
  aicpu_info.aicpu_info_size = sizeof(AicpuPareInfo);
  aicpu_info.input_addr_num = input_addr_num;
  aicpu_info.input_addr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(input_list_addr_));
  aicpu_info.input_index_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(input_index_list_addr_));
  aicpu_info.output_addr_num = output_addr_num;
  aicpu_info.output_addr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(output_list_addr_));
  aicpu_info.output_index_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(output_index_list_addr_));
  aicpu_info.output_num = outside_addrs.size();
  aicpu_info.output_size_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(output_size_list_addr_));
  aicpu_info.in_queue_num = input_queue_ids.size();
  aicpu_info.in_queueid_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(queue_id_list_addr_));
  aicpu_info.out_queue_num = output_queue_ids.size();
  aicpu_info.mbufptr_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(mbufptr_list_));

  args_size_ = sizeof(AicpuPareInfo);
  GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM));
  status = rtMemcpy(args_, args_size_, &aicpu_info, sizeof(AicpuPareInfo), RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
    return RT_ERROR_TO_GE_STATUS(status);
  }

  out_mbuf = reinterpret_cast<uintptr_t>(mbufptr_list_);
  return SUCCESS;
}
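
///
/// @ingroup ge
/// @brief launch the modelPrepare AICPU kernel on stream_ with the prepared argument block.
/// @return: 0 for success / others for failed
///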
Status CpuTaskModelPrepare::Distribute() {
  if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
    REPORT_INNER_ERROR("E19999",
                       "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
                       "check invalid",
                       args_size_);
    GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
    return FAILED;
  }

  rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelPrepare, kCoreDim, args_, args_size_, nullptr, stream_);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
    GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
    return RT_ERROR_TO_GE_STATUS(status);
  }

  GELOGI("Cpu kernel launch model prepare task success.");
  return SUCCESS;
}

CpuTaskModelPrepare::~CpuTaskModelPrepare() {
  if (input_list_addr_ != nullptr) {
    GE_CHK_RT(rtFree(input_list_addr_));
  }
  if (input_index_list_addr_ != nullptr) {
    GE_CHK_RT(rtFree(input_index_list_addr_));
  }
  if (output_list_addr_ != nullptr) {
    GE_CHK_RT(rtFree(output_list_addr_));
  }
  if (output_index_list_addr_ != nullptr) {
    GE_CHK_RT(rtFree(output_index_list_addr_));
  }
  if (output_size_list_addr_ != nullptr) {
    GE_CHK_RT(rtFree(output_size_list_addr_));
  }
  if (queue_id_list_addr_ != nullptr) {
    GE_CHK_RT(rtFree(queue_id_list_addr_));
  }
  if (mbufptr_list_ != nullptr) {
    GE_CHK_RT(rtFree(mbufptr_list_));
  }

  input_list_addr_ = nullptr;
  input_index_list_addr_ = nullptr;
  output_list_addr_ = nullptr;
  output_index_list_addr_ = nullptr;
  output_size_list_addr_ = nullptr;
  queue_id_list_addr_ = nullptr;
  mbufptr_list_ = nullptr;
}
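
///
/// @ingroup ge
/// @brief build the argument block for the modelPostpare kernel: model id, output queue ids and the
///        mbuf pointer list produced by CpuTaskModelPrepare, copied to device memory.
/// @param [in] model_id: model id for post process.
/// @param [in] output_queue_ids: output queue ids bound to the model.
/// @param [in] out_mbuf: device address of the mbuf pointer list from CpuTaskModelPrepare.
/// @return: 0 for success / others for failed
///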
Status CpuTaskModelPostpare::Init(uint32_t model_id, const vector<uint32_t> &output_queue_ids, uintptr_t out_mbuf) {
  if ((args_ != nullptr) || (args_size_ > 0)) {
    REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
    GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
    return FAILED;
  }

  GE_CHK_RT_RET(rtMalloc(&queue_id_list_addr_, output_queue_ids.size() * sizeof(uint32_t), RT_MEMORY_HBM));
  rtError_t status = rtMemcpy(queue_id_list_addr_, output_queue_ids.size() * sizeof(uint32_t), output_queue_ids.data(),
                              output_queue_ids.size() * sizeof(uint32_t), RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%lu, ret:0x%X", output_queue_ids.size() * sizeof(uint32_t),
                      status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%lu, ret:0x%X", output_queue_ids.size() * sizeof(uint32_t),
           status);
    return RT_ERROR_TO_GE_STATUS(status);
  }

  AicpuPareInfo aicpu_info;
  aicpu_info.model_id = model_id;
  aicpu_info.out_queue_num = output_queue_ids.size();
  aicpu_info.out_queueid_list = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(queue_id_list_addr_));
  aicpu_info.mbufptr_list = static_cast<uint64_t>(out_mbuf);

  args_size_ = sizeof(AicpuPareInfo);
  GE_CHK_RT_RET(rtMalloc(&args_, args_size_, RT_MEMORY_HBM));
  status = rtMemcpy(args_, args_size_, &aicpu_info, sizeof(AicpuPareInfo), RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
    return RT_ERROR_TO_GE_STATUS(status);
  }
  return SUCCESS;
}

Status CpuTaskModelPostpare::Distribute() {
  if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
    REPORT_INNER_ERROR("E19999",
                       "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
                       "check invalid",
                       args_size_);
    GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
    return FAILED;
  }

  rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskModelPostpare, kCoreDim, args_, args_size_, nullptr, stream_);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
    GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
    return RT_ERROR_TO_GE_STATUS(status);
  }

  GELOGI("Cpu kernel launch model postpare task success.");
  return SUCCESS;
}

CpuTaskModelPostpare::~CpuTaskModelPostpare() {
  if (queue_id_list_addr_ != nullptr) {
    GE_CHK_RT(rtFree(queue_id_list_addr_));
  }
  queue_id_list_addr_ = nullptr;
}

///
/// @ingroup ge
/// @brief definiteness queue schedule, active entry stream.
/// @param [in] stream: stream to be activated.
/// @return: 0 for success / others for failed
///
Status CpuTaskActiveEntry::Init(rtStream_t stream) {
  if (stream == nullptr) {
    REPORT_INNER_ERROR("E19999", "Param stream is nullptr, check invalid");
    GELOGE(FAILED, "[Check][Param] Task active stream not valid");
    return FAILED;
  }

  active_stream_ = stream;
  return SUCCESS;
}

Status CpuTaskActiveEntry::Distribute() {
  if ((active_stream_ == nullptr) || (stream_ == nullptr)) {
    REPORT_INNER_ERROR("E19999", "Param stream is nullptr or active_stream_ is nullptr, check invalid");
    GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
    return FAILED;
  }

  rtError_t ret = rtStreamActive(active_stream_, stream_);
  if (ret != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtStreamActive failed, ret:0x%X", ret);
    GELOGE(RT_FAILED, "[Call][RtStreamActive] failed, ret:0x%X", ret);
    return RT_ERROR_TO_GE_STATUS(ret);
  }

  GELOGI("Cpu kernel launch active entry task success.");
  return SUCCESS;
}

///
/// @ingroup ge
/// @brief definiteness queue schedule, wait for end graph.
/// @param [in] model_id: model id for wait end graph.
/// @return: 0 for success / others for failed
///
Status CpuTaskWaitEndGraph::Init(uint32_t model_id) {
  if ((args_ != nullptr) || (args_size_ > 0)) {
    REPORT_INNER_ERROR("E19999", "Param args_ is not nullptr or args_size_:%u > 0, check invalid", args_size_);
    GELOGE(FAILED, "[Check][Param] Task already initialized, size:%u", args_size_);
    return FAILED;
  }

  args_size_ = sizeof(model_id);
  rtError_t status = rtMalloc(&args_, args_size_, RT_MEMORY_HBM);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMalloc failed, size:%u, ret:0x%X", args_size_, status);
    GELOGE(RT_FAILED, "[Call][RtMalloc] failed, size:%u, ret:0x%X", args_size_, status);
    return RT_ERROR_TO_GE_STATUS(status);
  }
  GE_PRINT_DYNAMIC_MEMORY(rtMalloc, "args data.", args_size_)

  status = rtMemcpy(args_, args_size_, &model_id, args_size_, RT_MEMCPY_HOST_TO_DEVICE);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtMemcpy failed, size:%u, ret:0x%X", args_size_, status);
    GELOGE(RT_FAILED, "[Call][RtMemcpy] failed, size:%u, ret:0x%X", args_size_, status);
    return RT_ERROR_TO_GE_STATUS(status);
  }
  return SUCCESS;
}

Status CpuTaskWaitEndGraph::Distribute() {
  if ((args_ == nullptr) || (args_size_ == 0) || (stream_ == nullptr)) {
    REPORT_INNER_ERROR("E19999",
                       "Param args_ is nullptr or args_size_:%u is 0 or stream_ is nullptr,"
                       "check invalid",
                       args_size_);
    GELOGE(FAILED, "[Check][Param] Task not initialized, distribute failed, size:%u", args_size_);
    return FAILED;
  }

  rtError_t status = rtCpuKernelLaunch(nullptr, kCpuTaskWaitEndGraph, kCoreDim, args_, args_size_, nullptr, stream_);
  if (status != RT_ERROR_NONE) {
    REPORT_CALL_ERROR("E19999", "Call rtCpuKernelLaunch failed, ret:0x%X", status);
    GELOGE(RT_FAILED, "[Call][RtCpuKernelLaunch] failed, ret:0x%X", status);
    return RT_ERROR_TO_GE_STATUS(status);
  }

  GELOGI("Cpu kernel launch wait end task success.");
  return SUCCESS;
}
}  // namespace ge
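
For orientation, here is a minimal usage sketch of how the tasks in this file chain together: the out_mbuf filled by CpuTaskModelPrepare::Init is later handed to CpuTaskModelPostpare::Init, and each task is distributed on the model's CPU-schedule stream. The helper name ScheduleCpuQueueTasks is hypothetical, and the sketch assumes each task class can be constructed from an rtStream_t, as the CpuTaskInfo base class is; the real orchestration lives in the model loader and is not part of this file.

// Hypothetical helper, assumed to live inside namespace ge next to the classes above.
// Illustrative only; constructors taking rtStream_t are an assumption from the base class.
Status ScheduleCpuQueueTasks(rtStream_t stream, rtStream_t entry_stream, uint32_t model_id,
                             const std::vector<uint32_t> &input_queue_ids,
                             const std::vector<uint32_t> &output_queue_ids,
                             const std::map<uint32_t, ZeroCopyOffset> &inside_addrs,
                             const std::map<uint32_t, ZeroCopyOffset> &outside_addrs) {
  uintptr_t out_mbuf = 0U;

  // Prepare: copy queue ids and zero-copy address lists to device, get the mbuf pointer list address.
  CpuTaskModelPrepare prepare(stream);
  if ((prepare.Init(input_queue_ids, output_queue_ids, inside_addrs, outside_addrs, out_mbuf) != SUCCESS) ||
      (prepare.Distribute() != SUCCESS)) {
    return FAILED;
  }

  // Active entry: activate the model's entry stream via rtStreamActive.
  CpuTaskActiveEntry active_entry(stream);
  if ((active_entry.Init(entry_stream) != SUCCESS) || (active_entry.Distribute() != SUCCESS)) {
    return FAILED;
  }

  // Wait end graph: launch the modelWaitEndGraph kernel with this model id.
  CpuTaskWaitEndGraph wait_end_graph(stream);
  if ((wait_end_graph.Init(model_id) != SUCCESS) || (wait_end_graph.Distribute() != SUCCESS)) {
    return FAILED;
  }

  // Postpare: pass the model id, output queue ids and the mbuf pointer list to the modelPostpare kernel.
  CpuTaskModelPostpare postpare(stream);
  if ((postpare.Init(model_id, output_queue_ids, out_mbuf) != SUCCESS) || (postpare.Distribute() != SUCCESS)) {
    return FAILED;
  }
  return SUCCESS;
}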

The Graph Engine (GE) module is a sub-module of MindSpore, implemented in C++. It sits between the frontend module ME and the underlying hardware, serving as the bridge between them. GE takes the graph issued by ME as input, performs a series of deep graph optimizations, and outputs a graph that runs efficiently on the underlying hardware. GE applies optimizations tailored to the hardware architecture of the Ascend AI processor in order to make full use of its computing power. During model training/inference, GE is invoked automatically and is transparent to the user. GE consists mainly of two parts, GE API and GE Core; the detailed architecture diagram is shown below.