You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

hybrid_model_async_executor.cc 16 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. /**
  2. * Copyright 2019-2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "hybrid/executor/hybrid_model_async_executor.h"
  17. #include "graph/load/new_model_manager/model_utils.h"
  18. #include "graph/utils/tensor_utils.h"
  19. #include "graph/utils/type_utils.h"
  20. #include "omm/csa_interact.h"
  21. namespace ge {
  22. namespace hybrid {
  23. namespace {
  24. int kDataOutputIndex = 0;
  25. }
  26. HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model) : model_(model), run_flag_(false) {}
  27. HybridModelAsyncExecutor::~HybridModelAsyncExecutor() {
  28. if (stream_ != nullptr) {
  29. GE_CHK_RT(rtStreamDestroy(stream_));
  30. }
  31. }
  32. void HybridModelAsyncExecutor::SetDeviceId(uint32_t device_id) { device_id_ = device_id; }
  33. void HybridModelAsyncExecutor::SetModelId(uint32_t model_id) { model_id_ = model_id; }
  34. Status HybridModelAsyncExecutor::EnqueueData(const shared_ptr<InputDataWrapper> &data) {
  35. GE_CHK_STATUS_EXEC(data_inputer_->Push(data), return domi::DATA_QUEUE_ISFULL,
  36. "Data queue is full, please call again later, model_id %u ", model_id_);
  37. GELOGD("EnqueueData successfully. model_id = %u, data_index = %u", data->GetInput().model_id, data->GetInput().index);
  38. return SUCCESS;
  39. }
  40. Status HybridModelAsyncExecutor::Start(const std::shared_ptr<ModelListener> &listener) {
  41. GELOGD("HybridModelExecutor::Start IN, listener = %p", listener.get());
  42. std::lock_guard<std::mutex> lk(mu_);
  43. GE_CHK_BOOL_RET_STATUS(!run_flag_, INTERNAL_ERROR, "Model already started.");
  44. run_flag_ = true;
  45. listener_ = listener;
  46. future_ = std::async([&]() -> Status { return RunInternal(); });
  47. GE_CHK_BOOL_RET_STATUS(future_.valid(), INTERNAL_ERROR, "Failed to start.");
  48. GELOGD("HybridModelExecutor::Start successfully");
  49. return SUCCESS;
  50. }
  51. Status HybridModelAsyncExecutor::Stop() {
  52. std::lock_guard<std::mutex> lk(mu_);
  53. run_flag_ = false;
  54. data_inputer_->Stop();
  55. auto ret = future_.get();
  56. if (stream_ != nullptr) {
  57. GE_CHK_RT(rtStreamDestroy(stream_));
  58. stream_ = nullptr;
  59. }
  60. return ret;
  61. }
  62. Status HybridModelAsyncExecutor::Init() {
  63. data_inputer_ = std::unique_ptr<DataInputer>(new (std::nothrow) DataInputer());
  64. GE_CHECK_NOTNULL(data_inputer_);
  65. GE_CHK_RT_RET(rtStreamCreate(&stream_, RT_STREAM_PRIORITY_DEFAULT));
  66. executor_ = std::unique_ptr<HybridModelExecutor>(new (std::nothrow) HybridModelExecutor(model_, device_id_, stream_));
  67. GE_CHECK_NOTNULL(executor_);
  68. GE_CHK_STATUS_RET(executor_->Init(), "Failed to init hybrid engine");
  69. GE_CHK_STATUS_RET(InitInputTensors(), "Failed to init input tensors");
  70. return SUCCESS;
  71. }
  72. Status HybridModelAsyncExecutor::PreRun(InputData &current_data) {
  73. GE_CHK_STATUS_RET(SyncVarData(), "Failed to sync var data");
  74. RECORD_MODEL_EXECUTION_EVENT(executor_->GetContext(), "[SyncVarData] End");
  75. GE_CHK_STATUS_RET(CopyInputData(current_data), "Failed to copy input data to model");
  76. RECORD_MODEL_EXECUTION_EVENT(executor_->GetContext(), "[CopyInputData] End");
  77. return SUCCESS;
  78. }
  79. Status HybridModelAsyncExecutor::RunInternal() {
  80. auto device_id = static_cast<int32_t>(device_id_);
  81. GELOGD("Hybrid model start. model_id = %u, device_id = %u", model_id_, device_id_);
  82. GE_CHK_RT_RET(rtSetDevice(device_id));
  83. // DeviceReset before thread run finished!
  84. GE_MAKE_GUARD(not_used_var, [&] { GE_CHK_RT(rtDeviceReset(device_id)); });
  85. while (run_flag_) {
  86. std::shared_ptr<InputDataWrapper> data_wrapper;
  87. Status ret = data_inputer_->Pop(data_wrapper);
  88. if (data_wrapper == nullptr || ret != SUCCESS) {
  89. GELOGI("data_wrapper is null!, ret = %u", ret);
  90. continue;
  91. }
  92. GELOGI("Getting the input data, model_id:%u", model_id_);
  93. GE_IF_BOOL_EXEC(!run_flag_, break);
  94. InputData current_data = data_wrapper->GetInput();
  95. GELOGI("Model thread Run begin, model id:%u, data index:%u.", model_id_, current_data.index);
  96. HybridModelExecutor::ExecuteArgs args;
  97. args.inputs.resize(input_tensors_.size());
  98. for (auto &it : input_tensors_) {
  99. args.inputs[it.first] = it.second;
  100. }
  101. RECORD_MODEL_EXECUTION_EVENT(executor_->GetContext(), "[RunInternal] [iteration = %d] Start", iterator_count_);
  102. ret = PreRun(current_data);
  103. GE_CHK_BOOL_TRUE_EXEC_WITH_LOG(
  104. ret != SUCCESS, (void)HandleResult(ret, current_data.index, args, data_wrapper->GetOutput());
  105. CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_FMK, JOBSUBSTATE_GRAPH_EXEC);
  106. continue, "PreRun failed."); // [No need to check value]
  107. ret = executor_->Execute(args);
  108. ret = HandleResult(ret, current_data.index, args, data_wrapper->GetOutput());
  109. if (ret != SUCCESS) {
  110. CsaInteract::GetInstance().StoreInternalErrorCode(ret, ERROR_MODULE_RUNTIME, JOBSUBSTATE_GRAPH_EXEC);
  111. continue;
  112. }
  113. RECORD_MODEL_EXECUTION_EVENT(executor_->GetContext(), "[RunInternal] [iteration = %d] End", iterator_count_);
  114. iterator_count_++;
  115. GELOGI("run iterator count is %lu", iterator_count_);
  116. }
  117. CsaInteract::GetInstance().WriteInternalErrorCode();
  118. GELOGI("Model run end, model id:%u", model_id_);
  119. return SUCCESS;
  120. }
  121. Status HybridModelAsyncExecutor::HandleResult(Status exec_ret, uint32_t data_id, HybridModelExecutor::ExecuteArgs &args,
  122. OutputData *output_data) {
  123. GELOGD("Start to handle result. model id = %u, data index = %u, execution ret = %u", model_id_, data_id, exec_ret);
  124. std::vector<ge::OutputTensorInfo> output_tensor_info_list;
  125. if (exec_ret == END_OF_SEQUENCE) {
  126. GELOGW("End of sequence, model id = %u", model_id_);
  127. return OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list);
  128. }
  129. if (exec_ret != SUCCESS) {
  130. GELOGE(exec_ret, "Failed to execute graph. model_id = %u", model_id_);
  131. return OnComputeDone(data_id, INTERNAL_ERROR, output_tensor_info_list);
  132. }
  133. GE_CHECK_NOTNULL(output_data);
  134. auto ret = CopyOutputs(args, output_data, output_tensor_info_list);
  135. if (ret != SUCCESS) {
  136. OnComputeDone(data_id, INTERNAL_ERROR, output_tensor_info_list);
  137. return INTERNAL_ERROR;
  138. }
  139. GELOGD("Executed graph successfully, model id = %u, data_index = %u", model_id_, data_id);
  140. return OnComputeDone(data_id, SUCCESS, output_tensor_info_list);
  141. }
  142. Status HybridModelAsyncExecutor::SyncVarData() {
  143. GELOGI("Sync var data, model id:%u", model_id_);
  144. TensorValue *global_step_var = model_->GetVariable(NODE_NAME_GLOBAL_STEP);
  145. if (global_step_var != nullptr) {
  146. std::vector<uint64_t> v_step;
  147. v_step.push_back(iterator_count_);
  148. GE_CHK_RT_RET(rtMemcpy(global_step_var->MutableData(), global_step_var->GetSize(), v_step.data(),
  149. v_step.size() * sizeof(uint64_t), RT_MEMCPY_HOST_TO_DEVICE));
  150. } else {
  151. GELOGD("No GLOBAL_STEP variable was found.");
  152. }
  153. return SUCCESS;
  154. }
  155. Status HybridModelAsyncExecutor::CopyInputData(const InputData &current_data) {
  156. const std::vector<DataBuffer> &blobs = current_data.blobs;
  157. for (const auto &it : input_tensors_) {
  158. auto input_index = it.first;
  159. auto input_tensor = it.second;
  160. auto data_size = input_tensor.GetSize();
  161. GELOGD("To copy input data for input[%u]", input_index);
  162. if (input_index >= blobs.size()) {
  163. GELOGE(FAILED, "Blobs not match: blobs=%zu, tensor=%zu, index=%u, size=%ld", blobs.size(),
  164. model_->input_nodes_.size(), input_index, data_size);
  165. return FAILED;
  166. }
  167. const DataBuffer &data_buf = blobs[input_index];
  168. auto mem_size = static_cast<uint32_t>(data_size);
  169. GE_CHK_BOOL_RET_STATUS(mem_size >= data_buf.length, PARAM_INVALID,
  170. "input data size(%u) does not match model required size(%u), ret failed.", data_buf.length,
  171. mem_size);
  172. GELOGI("[IMAS]CopyPlainData memcpy graph_%u type[F] output[%u] memaddr[%p] mem_size[%u] datasize[%u]",
  173. model_->root_runtime_param_.graph_id, input_index, input_tensor.GetData(), mem_size, data_buf.length);
  174. GE_CHK_RT_RET(
  175. rtMemcpy(input_tensor.MutableData(), mem_size, data_buf.data, data_buf.length, RT_MEMCPY_HOST_TO_DEVICE));
  176. }
  177. return SUCCESS;
  178. }
  179. Status HybridModelAsyncExecutor::InitInputTensors() {
  180. auto allocator = NpuMemoryAllocator::GetAllocator(device_id_);
  181. GE_CHECK_NOTNULL(allocator);
  182. int input_index = 0;
  183. for (const auto &input_node : model_->GetRootGraphItem()->GetInputNodes()) {
  184. GELOGD("Init input[%u], node = %s", input_index, input_node->NodeName().c_str());
  185. auto output_desc = input_node->op_desc->GetOutputDescPtr(kDataOutputIndex);
  186. GE_CHECK_NOTNULL(output_desc);
  187. int64_t tensor_size = 0;
  188. GE_CHK_GRAPH_STATUS_RET(TensorUtils::GetSize(*output_desc, tensor_size), "Failed to get size from %s",
  189. input_node->NodeName().c_str());
  190. if (tensor_size == 0) {
  191. GELOGW("[%s] Tensor size == 0", input_node->NodeName().c_str());
  192. GE_CHK_GRAPH_STATUS_RET(TensorUtils::GetTensorMemorySizeInBytes(*output_desc, tensor_size),
  193. "Failed to calc tensor size");
  194. GELOGD("[%s] Tensor size updated to %ld", input_node->NodeName().c_str(), tensor_size);
  195. }
  196. auto buffer = TensorBuffer::Create(allocator, tensor_size);
  197. GE_CHECK_NOTNULL(buffer);
  198. TensorValue tensor(shared_ptr<TensorBuffer>(buffer.release()));
  199. tensor.SetName("Input_" + input_node->NodeName());
  200. input_tensors_.emplace(input_index, tensor);
  201. input_index += 1;
  202. }
  203. return SUCCESS;
  204. }
  205. Status HybridModelAsyncExecutor::OnComputeDone(uint32_t data_index, uint32_t result_code,
  206. std::vector<ge::OutputTensorInfo> &outputs) {
  207. GELOGD("OnComputeDone. model id = %u, data index = %u, execution ret = %u", model_id_, data_index, result_code);
  208. if (listener_ != nullptr) {
  209. GE_CHK_STATUS(listener_->OnComputeDone(model_id_, data_index, result_code, outputs), "OnComputeDone failed");
  210. }
  211. return result_code;
  212. }
  213. Status HybridModelAsyncExecutor::CopyOutputs(HybridModelExecutor::ExecuteArgs &args, OutputData *output_data,
  214. std::vector<ge::OutputTensorInfo> &outputs) {
  215. // copy output data from op to designated position
  216. std::vector<ConstGeTensorDescPtr> &output_tensor_desc_list = args.output_desc;
  217. std::vector<TensorValue> &output_tensors = args.outputs;
  218. if (output_tensor_desc_list.size() != output_tensors.size()) {
  219. GELOGE(INTERNAL_ERROR, "Output sizes mismatch. From op_desc = %zu, and from output tensors = %zu",
  220. output_tensor_desc_list.size(), output_tensors.size());
  221. return INTERNAL_ERROR;
  222. }
  223. GELOGD("Number of outputs = %zu", output_tensor_desc_list.size());
  224. for (size_t i = 0; i < output_tensors.size(); ++i) {
  225. GELOGD("Start to process output[%zu]", i);
  226. auto &output_tensor = output_tensors[i];
  227. auto &tensor_desc = output_tensor_desc_list.at(i);
  228. GE_CHECK_NOTNULL(tensor_desc);
  229. int64_t output_size = -1;
  230. GE_CHK_GRAPH_STATUS_RET(TensorUtils::CalcTensorMemSize(tensor_desc->GetShape(), tensor_desc->GetFormat(),
  231. tensor_desc->GetDataType(), output_size),
  232. "Failed to calc tensor size for output[%zu]. shape = [%s], type = %s, format = %s", i,
  233. tensor_desc->GetShape().ToString().c_str(),
  234. TypeUtils::DataTypeToSerialString(tensor_desc->GetDataType()).c_str(),
  235. TypeUtils::FormatToSerialString(tensor_desc->GetFormat()).c_str());
  236. GELOGD("Got tensor size for output[%zu] successfully. shape = [%s], type = %s, format = %s, size = %ld", i,
  237. tensor_desc->GetShape().ToString().c_str(),
  238. TypeUtils::DataTypeToSerialString(tensor_desc->GetDataType()).c_str(),
  239. TypeUtils::FormatToSerialString(tensor_desc->GetFormat()).c_str(), output_size);
  240. GE_CHECK_GE(output_size, 0);
  241. GE_CHECK_LE(output_size, UINT32_MAX);
  242. if (output_tensor.GetSize() < static_cast<size_t>(output_size)) {
  243. GELOGE(INTERNAL_ERROR, "output[%zu] tensor size(%zu) is not enough for output shape [%s]", i,
  244. output_tensor.GetSize(), tensor_desc->GetShape().ToString().c_str());
  245. return INTERNAL_ERROR;
  246. }
  247. ge::OutputTensorInfo output;
  248. output.data_type = static_cast<uint32_t>(tensor_desc->GetDataType());
  249. output.dims = tensor_desc->GetShape().GetDims();
  250. output.length = output_size;
  251. if (output_size > 0) {
  252. std::unique_ptr<uint8_t[]> data_buf(new (std::nothrow) uint8_t[output_size]);
  253. GE_CHECK_NOTNULL(data_buf);
  254. GE_CHK_RT_RET(
  255. rtMemcpy(data_buf.get(), output_size, output_tensor.GetData(), output_size, RT_MEMCPY_DEVICE_TO_HOST));
  256. output.data = std::move(data_buf);
  257. output_data->blobs.emplace_back(data_buf.get(), static_cast<uint32_t>(output_size), false);
  258. } else {
  259. GELOGW("Output[%zu] is empty. shape = [%s]", i, tensor_desc->GetShape().ToString().c_str());
  260. output.data = nullptr;
  261. output_data->blobs.emplace_back(nullptr, 0U, false);
  262. }
  263. outputs.emplace_back(std::move(output));
  264. GELOGD("Output[%zu] added, type = %s, shape = [%s], size = %ld", i,
  265. TypeUtils::DataTypeToSerialString(tensor_desc->GetDataType()).c_str(),
  266. tensor_desc->GetShape().ToString().c_str(), output_size);
  267. }
  268. return SUCCESS;
  269. }
  270. Status HybridModelAsyncExecutor::Execute(const vector<GeTensor> &inputs, vector<GeTensor> &outputs) {
  271. GELOGD("Start to execute model.");
  272. // prepare inputs
  273. InputData input_data;
  274. for (auto &tensor : inputs) {
  275. DataBuffer buffer;
  276. buffer.data = const_cast<uint8_t *>(tensor.GetData().GetData());
  277. buffer.length = tensor.GetData().size();
  278. input_data.blobs.emplace_back(buffer);
  279. }
  280. GE_CHK_STATUS_RET(CopyInputData(input_data), "Failed to copy input data to model");
  281. GELOGD("Done copying input data successfully.");
  282. HybridModelExecutor::ExecuteArgs args;
  283. args.inputs.resize(input_tensors_.size());
  284. args.input_desc.resize(input_tensors_.size());
  285. for (auto &it : input_tensors_) {
  286. args.inputs[it.first] = it.second;
  287. args.input_desc[it.first] = MakeShared<GeTensorDesc>(inputs[it.first].GetTensorDesc());
  288. }
  289. GE_CHK_STATUS_RET(executor_->Execute(args), "Failed to execute model.");
  290. std::vector<ge::OutputTensorInfo> output_tensor_info_list;
  291. OutputData output_data;
  292. GE_CHK_STATUS_RET(CopyOutputs(args, &output_data, output_tensor_info_list), "Failed to copy outputs.");
  293. GELOGD("Done copying output data successfully. output count = %zu", output_tensor_info_list.size());
  294. int out_index = 0;
  295. outputs.resize(output_tensor_info_list.size());
  296. for (auto &out_tensor_info : output_tensor_info_list) {
  297. auto &ge_tensor = outputs[out_index];
  298. if (out_tensor_info.length > 0) {
  299. GE_CHK_GRAPH_STATUS_RET(ge_tensor.SetData(out_tensor_info.data.get(), out_tensor_info.length),
  300. "Failed to set output[%d].", out_index);
  301. }
  302. ge_tensor.MutableTensorDesc() = *args.output_desc[out_index];
  303. GELOGD("Set output[%d], tensor size = %ld, shape = [%s]", out_index, out_tensor_info.length,
  304. ge_tensor.MutableTensorDesc().MutableShape().ToString().c_str());
  305. ++out_index;
  306. }
  307. return SUCCESS;
  308. }
  309. } // namespace hybrid
  310. } // namespace ge

图引擎模块(GE)是MindSpore的一个子模块,其代码由C++实现,位于前端模块ME和底层硬件之间,起到承接作用。图引擎模块以ME下发的图作为输入,然后进行一系列的深度图优化操作,最后输出一张可以在底层硬件上高效运行的图。GE针对昇腾AI处理器的硬件结构特点,做了特定的优化工作,以此来充分发挥出昇腾AI处理器的强大算力。在进行模型训练/推理时,GE会被自动调用而用户并不感知。GE主要由GE API和GE Core两部分组成,详细的架构图如下所示