
hybrid_model_pipeline_executor.cc

  1. #include "hybrid_model_pipeline_executor.h"
  2. #include "common/math/math_util.h"
  3. #include "common/dump/dump_manager.h"
  4. #include "graph/ge_context.h"
  5. #include "graph/runtime_inference_context.h"
  6. namespace ge {
  7. namespace hybrid {
  8. namespace {
  9. constexpr int kNumExecutors = 2;
  10. const int kMinLoopCount = 2;
  11. const int kIntBase = 10;
  12. const char *const kEnvProfilingLevel = "HYBRID_PROFILING_LEVEL";
  13. }
  14. StageExecutor::StageExecutor(int id, HybridModel *model, PipeExecutionConfig *config)
  15. : id_(id), model_(model), pipe_config_(config) {}
  16. StageExecutor::~StageExecutor() { GELOGD("~StageExecutor(), id = %d", id_); }
Status StageExecutor::Init() {
  GELOGD("[Executor: %d] Start to init StageExecutor", id_);
  context_.rt_context = pipe_config_->rt_context;
  GE_CHK_STATUS_RET_NOLOG(InitExecutionContext());
  GE_CHK_RT_RET(rtStreamCreate(&stream_, RT_STREAM_PRIORITY_DEFAULT));
  context_.stream = stream_;
  root_graph_executor_.reset(new (std::nothrow) SubgraphExecutor(model_->GetRootGraphItem(), &context_));
  GE_CHECK_NOTNULL(root_graph_executor_);
  GELOGD("[Executor: %d] Init stage executor successfully", id_);
  return SUCCESS;
}

Status StageExecutor::ResetExecutionContext(GraphExecutionContext &context) {
  GE_CHK_STATUS_RET_NOLOG(context.callback_manager->Init());
  string ctx_id = std::to_string(context.context_id);
  RuntimeInferenceContext::DestroyContext(ctx_id);
  GE_CHK_GRAPH_STATUS_RET(RuntimeInferenceContext::CreateContext(ctx_id), "Failed to create RuntimeInferenceContext");
  return SUCCESS;
}

Status StageExecutor::Start(const std::vector<TensorValue> &inputs, const std::vector<ConstGeTensorDescPtr> &input_desc,
                            int iteration_count) {
  GELOGD("Start");
  GE_CHK_RT_RET(rtCtxSetCurrent(context_.rt_context));
  // Iterations are distributed round-robin across the executors; executors
  // whose id falls below the remainder take one extra iteration.
  int num_loops = iteration_count / pipe_config_->num_executors;
  if (id_ < iteration_count % pipe_config_->num_executors) {
    num_loops += 1;
  }
  // Each iteration contributes one task per stage.
  FMK_INT32_MULCHECK(num_loops, pipe_config_->num_stages);
  num_loops *= pipe_config_->num_stages;
  GELOGD("[Executor: %d] loop count = %d", id_, num_loops);
  for (int loop_idx = 0; loop_idx < num_loops; ++loop_idx) {
    GELOGD("[Executor: %d] Start to wait for task.", id_);
    StageTask task_info;
    task_queue_.Pop(task_info);
    GELOGD("[Executor: %d] Got task, stage = %d, iteration = %ld", id_, task_info.stage, task_info.iteration);
    if (task_info.iteration >= pipe_config_->iteration_end) {
      GELOGE(INTERNAL_ERROR, "[Executor: %d] Unexpected iteration: %ld", id_, task_info.iteration);
      return INTERNAL_ERROR;
    }
    if (task_info.event != nullptr) {
      GELOGD("[%d] Add StreamWaitEvent", id_);
      GE_CHK_RT_RET(rtStreamWaitEvent(stream_, task_info.event));
      RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration - 1,
                                   task_info.stage);
    }
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] Start", task_info.iteration,
                                 task_info.stage);
    if (task_info.stage == 0) {
      GELOGD("[Executor: %d] To ResetExecutionContext", id_);
      GE_CHK_STATUS_RET(ResetExecutionContext(context_), "[Executor: %d] Failed to reset context", id_);
      context_.iteration = task_info.iteration;
      GE_CHK_STATUS_RET_NOLOG(SetInputs(inputs, input_desc));
    }
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[Stage = %d] PartialExecuteAsync Start", task_info.stage);
    GE_CHK_STATUS_RET(root_graph_executor_->PartialExecuteAsync(task_info.stage));
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[Stage = %d] PartialExecuteAsync End", task_info.stage);
    GELOGD("[Executor: %d] PartialExecuteAsync successfully.", id_);
    // notify next execution unit
    StageTask next_task;
    next_task.stage = task_info.stage;
    next_task.iteration = task_info.iteration + 1;
    auto sync_result = Synchronize();
    if (sync_result != SUCCESS) {
      GELOGE(sync_result, "[Executor: %d] Failed to sync result. iteration = %ld", id_, task_info.iteration);
      if (context_.profiler != nullptr) {
        context_.profiler->Dump(std::cout);
      }
      context_.callback_manager->Destroy();
      RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
      return sync_result;
    }
    RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration, task_info.stage);
    // last stage of the current iteration: tear down this iteration's context
    if (task_info.stage >= pipe_config_->num_stages - 1) {
      RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] Schedule End", task_info.iteration);
      GELOGD("[Executor: %d] End of iteration [%ld]", id_, task_info.iteration);
      context_.callback_manager->Destroy();
      RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
    }
    next_executor_->ExecuteAsync(next_task);
    GELOGD("[Executor: %d] Push item successfully.", id_);
  }
  GELOGD("[Executor: %d] Process task ended.", id_);
  return SUCCESS;
}

Status StageExecutor::ExecuteAsync(const StageTask &args) {
  (void)task_queue_.Push(args);
  return SUCCESS;
}

Status StageExecutor::Synchronize() {
  auto ret = root_graph_executor_->Synchronize();
  RECORD_MODEL_EXECUTION_EVENT(&context_, "[Synchronize] End, ret = %u", ret);
  return ret;
}

HybridModelPipelineExecutor::HybridModelPipelineExecutor(HybridModel *model, uint32_t device_id)
    : model_(model), device_id_(device_id) {
  config_.num_executors = kNumExecutors;
  config_.num_stages = model_->GetRootGraphItem()->NumGroups();
  config_.device_id = device_id_;
}

Status StageExecutor::InitExecutionContext() {
  GE_CHK_RT_RET(rtCtxCreate(&context_.rt_gen_context, RT_CTX_GEN_MODE, 0));
  GE_CHK_RT_RET(rtCtxSetCurrent(context_.rt_context));
  context_.model = model_;
  context_.session_id = ::ge::GetContext().SessionId();
  GELOGD("session id from model = %lu, from context = %lu", model_->GetSessionId(), context_.session_id);
  context_.allocator = NpuMemoryAllocator::GetAllocator(pipe_config_->device_id);
  GE_CHECK_NOTNULL(context_.allocator);
  context_.callback_manager = std::unique_ptr<CallbackManager>(new (std::nothrow) CallbackManager());
  GE_CHECK_NOTNULL(context_.callback_manager);
  context_.dump_properties = DumpManager::GetInstance().GetDumpProperties(context_.session_id);
  if (IsLogEnable(GE_MODULE_NAME, DLOG_DEBUG)) {
    context_.trace_enabled = true;
  }
  return SUCCESS;
}

Status StageExecutor::SetInputs(const vector<TensorValue> &inputs, const vector<ConstGeTensorDescPtr> &input_desc) {
  GE_CHK_STATUS_RET_NOLOG(root_graph_executor_->InitForPartialExecution(inputs, input_desc));
  return SUCCESS;
}

Status StageExecutor::GetOutputs(vector<TensorValue> &outputs, vector<ConstGeTensorDescPtr> &output_desc) {
  return root_graph_executor_->GetOutputs(outputs, output_desc);
}

void StageExecutor::Reset() {
  task_queue_.Stop();
  task_queue_.Clear();
  task_queue_.Restart();
}

Status HybridModelPipelineExecutor::Init() {
  const char *profiling_level = std::getenv(kEnvProfilingLevel);
  if (profiling_level != nullptr) {
    context_.profiling_level = std::strtol(profiling_level, nullptr, kIntBase);
    GELOGD("Got profiling level = %ld", context_.profiling_level);
    if (context_.profiling_level > 0) {
      context_.profiler.reset(new (std::nothrow) HybridProfiler());
      GE_CHECK_NOTNULL(context_.profiler);
    }
  }
  GELOGD("Number of stages = %d, number of executors = %d", config_.num_stages, config_.num_executors);
  GE_CHK_RT_RET(rtCtxGetCurrent(&config_.rt_context));
  GE_CHK_STATUS_RET_NOLOG(InitStageExecutors());
  return SUCCESS;
}

Status HybridModelPipelineExecutor::InitStageExecutors() {
  for (int i = 0; i < config_.num_executors; ++i) {
    auto stage_executor = std::unique_ptr<StageExecutor>(new (std::nothrow) StageExecutor(i, model_, &config_));
    GE_CHECK_NOTNULL(stage_executor);
    GE_CHK_STATUS_RET_NOLOG(stage_executor->Init());
    if (context_.profiler != nullptr) {
      // the profiler is shared across executors; ownership stays here,
      // and unique_ptr::release is called in the destructor
      stage_executor->context_.profiler.reset(context_.profiler.get());
      stage_executor->context_.profiling_level = context_.profiling_level;
    }
    stage_executors_.emplace_back(std::move(stage_executor));
  }
  // build the propagation ring: each executor forwards tasks to the next,
  // and the last one feeds back into the first
  for (int i = 0; i < config_.num_executors - 1; ++i) {
    stage_executors_[i]->SetNext(stage_executors_[i + 1].get());
  }
  stage_executors_[config_.num_executors - 1]->SetNext(stage_executors_[0].get());
  return SUCCESS;
}

Status HybridModelPipelineExecutor::Execute(HybridModelExecutor::ExecuteArgs &args) {
  int loop_count = args.num_loops;
  GE_CHECK_GE(loop_count, kMinLoopCount);
  auto &inputs = args.inputs;
  auto &input_desc = args.input_desc;
  // Start schedulers
  std::vector<std::future<Status>> futures;
  for (size_t i = 0; i < stage_executors_.size(); ++i) {
    GELOGD("Starting executor %zu", i);
    auto executor = stage_executors_[i].get();
    executor->Reset();
    auto future = std::async(
        [loop_count, executor, inputs, input_desc]() { return executor->Start(inputs, input_desc, loop_count); });
    futures.emplace_back(std::move(future));
  }
  // Push initial tasks
  GELOGD("Start to execute with loops, loop count = %d", loop_count);
  config_.iteration_end = iteration_ + loop_count;
  for (int i = 0; i < config_.num_stages; ++i) {
    StageExecutor::StageTask task_info;
    task_info.stage = i;
    task_info.iteration = iteration_;
    stage_executors_[0]->ExecuteAsync(task_info);
  }
  // Wait for end of iterations
  bool has_error = false;
  for (size_t i = 0; i < stage_executors_.size(); ++i) {
    GELOGD("Start to sync result of executor[%zu]", i);
    auto ret = futures[i].get();
    if (ret != SUCCESS) {
      GELOGE(ret, "[Executor: %zu] Failed to schedule tasks.", i);
      has_error = true;
      continue;
    }
    ret = stage_executors_[i]->Synchronize();
    if (ret != SUCCESS) {
      GELOGE(ret, "[Executor: %zu] Failed to synchronize result.", i);
      has_error = true;
      continue;
    }
  }
  // record for profiling analyzer
  RECORD_MODEL_EXECUTION_EVENT(&context_, "[Cleanup] End");
  if (context_.profiler != nullptr) {
    context_.profiler->Dump(std::cout);
  }
  iteration_ = config_.iteration_end;
  if (has_error) {
    GELOGE(FAILED, "Error occurred during execution");
    return FAILED;
  }
  auto last_iter_executor_idx = loop_count % stage_executors_.size();
  GE_CHK_STATUS_RET(stage_executors_[last_iter_executor_idx]->GetOutputs(args.outputs, args.output_desc),
                    "Failed to get output from executor[%zu]", last_iter_executor_idx);
  return SUCCESS;
}

HybridModelPipelineExecutor::~HybridModelPipelineExecutor() {
  GELOGD("~HybridModelPipelineExecutor()");
  for (auto &executor : stage_executors_) {
    // the profiler is shared with each executor's context; release instead
    // of deleting to avoid a double free
    (void)executor->context_.profiler.release();
  }
}

}  // namespace hybrid
}  // namespace ge
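
The scheduling in Start() and InitStageExecutors() above amounts to a ring of executors handing (stage, iteration) tasks to one another, so consecutive iterations run on alternating executors and overlap in a pipeline. The following self-contained sketch reproduces just that pattern; MiniExecutor, Task, and the constants are illustrative stand-ins, with plain threads and a blocking queue in place of streams and the real TaskQueue, and it simply stops forwarding at the last iteration where the real code forwards unconditionally and rejects out-of-range iterations.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <queue>
#include <thread>

struct Task {
  int stage;
  int iteration;
};

// A stripped-down stand-in for StageExecutor: a blocking task queue plus
// a pointer to the next executor in the ring.
class MiniExecutor {
 public:
  explicit MiniExecutor(int id) : id_(id) {}
  void SetNext(MiniExecutor *next) { next_ = next; }

  void Push(const Task &task) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      queue_.push(task);
    }
    cv_.notify_one();
  }

  // Mirrors StageExecutor::Start(): pop num_loops tasks, "execute" each,
  // then forward the same stage of the next iteration to the next executor.
  void Run(int num_loops, int iteration_end) {
    for (int i = 0; i < num_loops; ++i) {
      Task task = Pop();
      std::printf("[Executor %d] stage = %d, iteration = %d\n", id_, task.stage, task.iteration);
      if (task.iteration + 1 < iteration_end) {
        next_->Push({task.stage, task.iteration + 1});
      }
    }
  }

 private:
  Task Pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !queue_.empty(); });
    Task task = queue_.front();
    queue_.pop();
    return task;
  }

  int id_;
  MiniExecutor *next_ = nullptr;
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<Task> queue_;
};

int main() {
  const int kExecutors = 2, kStages = 3, kIterations = 4;
  MiniExecutor e0(0), e1(1);
  e0.SetNext(&e1);
  e1.SetNext(&e0);  // ring: the last executor feeds back into the first
  // Seed all stages of the first iteration on executor 0, as Execute() does.
  for (int s = 0; s < kStages; ++s) {
    e0.Push({s, 0});
  }
  // Each executor handles kIterations / kExecutors iterations, one task per stage.
  const int per_executor = kIterations / kExecutors * kStages;
  std::thread t0([&] { e0.Run(per_executor, kIterations); });
  std::thread t1([&] { e1.Run(per_executor, kIterations); });
  t0.join();
  t1.join();
  return 0;
}

Running this prints even iterations on executor 0 and odd iterations on executor 1, which is exactly how the two StageExecutors alternate over iterations in the real pipeline.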

The Graph Engine (GE) module is a submodule of MindSpore, implemented in C++. It sits between the front-end module (ME) and the underlying hardware, acting as a bridge: GE takes the graph delivered by ME as input, performs a series of deep graph optimizations, and outputs a graph that can run efficiently on the underlying hardware. GE applies optimizations tailored to the hardware architecture of the Ascend AI processor in order to fully exploit its compute power. During model training and inference, GE is invoked automatically and is transparent to the user. GE consists of two main parts, GE API and GE Core; the detailed architecture diagram is shown below.
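
Although GE is invoked automatically by the framework, the file above defines a complete public surface for the pipelined path. Based solely on those interfaces (the constructor, Init(), Execute(), and the ExecuteArgs fields used in the file), a call site looks roughly like the sketch below; RunPipelined is a hypothetical wrapper, and how `model`, `inputs`, and `input_desc` are produced is framework-specific and elided here.

// Hypothetical call site; uses only interfaces defined in the file above.
// Assumes `model` (HybridModel *) and the input tensors already exist.
Status RunPipelined(HybridModel *model, uint32_t device_id,
                    const std::vector<TensorValue> &inputs,
                    const std::vector<ConstGeTensorDescPtr> &input_desc) {
  hybrid::HybridModelPipelineExecutor executor(model, device_id);
  GE_CHK_STATUS_RET(executor.Init());  // creates kNumExecutors stage executors and their streams

  HybridModelExecutor::ExecuteArgs args;
  args.inputs = inputs;
  args.input_desc = input_desc;
  args.num_loops = 4;  // must be >= kMinLoopCount (2) to pass GE_CHECK_GE

  GE_CHK_STATUS_RET(executor.Execute(args));  // runs num_loops pipelined iterations
  // args.outputs / args.output_desc now hold the results of the last iteration
  return SUCCESS;
}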