From 664634d0708eac7d677af140dc9d2851e66ba781 Mon Sep 17 00:00:00 2001 From: chuxing Date: Thu, 31 Dec 2020 12:33:06 +0800 Subject: [PATCH] Handle EndOfSequence --- .../executor/hybrid_execution_context.cc | 27 ++++++++++++++++++ ge/hybrid/executor/hybrid_execution_context.h | 2 ++ .../executor/hybrid_model_async_executor.cc | 4 +-- ge/hybrid/executor/hybrid_model_executor.cc | 1 + ge/hybrid/executor/hybrid_model_executor.h | 1 + ge/hybrid/executor/subgraph_executor.cc | 11 +++++++- .../aicpu/aicpu_node_executor.cc | 15 +++++++++- .../node_executor/aicpu/aicpu_node_executor.h | 1 + .../compiledsubgraph/known_node_executor.cc | 28 +++++++++++++++++-- .../compiledsubgraph/known_node_executor.h | 6 ++-- ge/hybrid/node_executor/task_context.cc | 4 +++ ge/hybrid/node_executor/task_context.h | 2 ++ 12 files changed, 94 insertions(+), 8 deletions(-) diff --git a/ge/hybrid/executor/hybrid_execution_context.cc b/ge/hybrid/executor/hybrid_execution_context.cc index 491220be..77089e15 100644 --- a/ge/hybrid/executor/hybrid_execution_context.cc +++ b/ge/hybrid/executor/hybrid_execution_context.cc @@ -18,6 +18,12 @@ namespace ge { namespace hybrid { +namespace { +const uint32_t kEndOfSequence = 0x0704000a; +const uint32_t kEndOfSequenceNew = 507005; +const int32_t kModelAbortNormal = 0x0704000e; +const int32_t kModelAbortNormalNew = 507024; +} // namespace void GraphExecutionContext::SetErrorCode(Status error_code) { std::lock_guard lk(mu); this->status = error_code; @@ -27,5 +33,26 @@ Status GraphExecutionContext::GetStatus() const { std::lock_guard lk(mu); return this->status; } + +Status GraphExecutionContext::Synchronize(rtStream_t rt_stream) { + auto rt_ret = rtStreamSynchronize(rt_stream); + if (rt_ret == SUCCESS) { + return SUCCESS; + } + + if (rt_ret == kEndOfSequence || rt_ret == kEndOfSequenceNew) { + GELOGI("Got end of sequence"); + is_eos_ = true; + return SUCCESS; + } + + if (rt_ret == kModelAbortNormal || rt_ret == kModelAbortNormalNew) { + GELOGI("The model with multiple datasets aborts normally"); + return SUCCESS; + } + + GELOGE(RT_FAILED, "Failed to invoke rtStreamSynchronize, ret = %d", rt_ret); + return RT_FAILED; +} } // namespace hybrid } // namespace ge \ No newline at end of file diff --git a/ge/hybrid/executor/hybrid_execution_context.h b/ge/hybrid/executor/hybrid_execution_context.h index f1c25290..49c54d2f 100644 --- a/ge/hybrid/executor/hybrid_execution_context.h +++ b/ge/hybrid/executor/hybrid_execution_context.h @@ -36,6 +36,7 @@ namespace hybrid { struct GraphExecutionContext { void SetErrorCode(Status error_code); Status GetStatus() const; + Status Synchronize(rtStream_t rt_stream); uint64_t session_id = 0; const HybridModel *model = nullptr; @@ -49,6 +50,7 @@ struct GraphExecutionContext { DumpProperties dump_properties; bool trace_enabled = false; bool dump_enabled = false; + std::atomic_bool is_eos_; long profiling_level = 0; long iteration = 0; Status status = SUCCESS; diff --git a/ge/hybrid/executor/hybrid_model_async_executor.cc b/ge/hybrid/executor/hybrid_model_async_executor.cc index ba717a2d..c17ff0d9 100644 --- a/ge/hybrid/executor/hybrid_model_async_executor.cc +++ b/ge/hybrid/executor/hybrid_model_async_executor.cc @@ -24,7 +24,7 @@ namespace ge { namespace hybrid { namespace { -int kDataOutputIndex = 0; +const int kDataOutputIndex = 0; } HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model) : model_(model), run_flag_(false) { @@ -162,7 +162,7 @@ Status HybridModelAsyncExecutor::HandleResult(Status exec_ret, OutputData *output_data) { GELOGD("Start to handle result. model id = %u, data index = %u, execution ret = %u", model_id_, data_id, exec_ret); std::vector output_tensor_info_list; - if (exec_ret == END_OF_SEQUENCE) { + if (args.is_eos) { GELOGW("End of sequence, model id = %u", model_id_); return OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list); } diff --git a/ge/hybrid/executor/hybrid_model_executor.cc b/ge/hybrid/executor/hybrid_model_executor.cc index e17998db..ee933090 100755 --- a/ge/hybrid/executor/hybrid_model_executor.cc +++ b/ge/hybrid/executor/hybrid_model_executor.cc @@ -58,6 +58,7 @@ Status HybridModelExecutor::Execute(HybridModelExecutor::ExecuteArgs &args) { context_.profiler->Reset(); } + args.is_eos = context_.is_eos_; context_.iteration += 1; return SUCCESS; } diff --git a/ge/hybrid/executor/hybrid_model_executor.h b/ge/hybrid/executor/hybrid_model_executor.h index 04aef6a5..6299d4ff 100644 --- a/ge/hybrid/executor/hybrid_model_executor.h +++ b/ge/hybrid/executor/hybrid_model_executor.h @@ -31,6 +31,7 @@ class HybridModelExecutor { std::vector input_desc; std::vector outputs; std::vector output_desc; + bool is_eos = false; }; HybridModelExecutor(HybridModel *model, uint32_t device_id, rtStream_t stream); diff --git a/ge/hybrid/executor/subgraph_executor.cc b/ge/hybrid/executor/subgraph_executor.cc index 4b6dddab..1b2024c7 100644 --- a/ge/hybrid/executor/subgraph_executor.cc +++ b/ge/hybrid/executor/subgraph_executor.cc @@ -240,6 +240,10 @@ Status SubgraphExecutor::PrepareNodes() { } if (!ready_queue_.Push(p_node_state)) { + if (context_->is_eos_) { + GELOGD("Got end of sequence"); + return SUCCESS; + } GELOGE(INTERNAL_ERROR, "[%s] Error occurs while launching tasks. quit from preparing nodes.", graph_item_->GetName().c_str()); return INTERNAL_ERROR; @@ -295,6 +299,11 @@ Status SubgraphExecutor::LaunchTasks() { "[%s] Execute node failed.", node_state->GetName().c_str()); + if (context_->is_eos_) { + GELOGD("Got end of sequence"); + ready_queue_.Stop(); + return SUCCESS; + } GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str()); } } @@ -350,7 +359,7 @@ Status SubgraphExecutor::GetOutputs(vector &outputs, std::vectorGetName().c_str()); - GE_CHK_RT_RET(rtStreamSynchronize(context_->stream)); + GE_CHK_STATUS_RET_NOLOG(context_->Synchronize(context_->stream)); GELOGD("[%s] Done synchronizing successfully.", graph_item_->GetName().c_str()); return SUCCESS; } diff --git a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc index 43f4f6d2..0b34ecc3 100755 --- a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc +++ b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc @@ -19,6 +19,7 @@ #include "common/formats/formats.h" #include "aicpu/common/aicpu_task_struct.h" #include "graph/load/new_model_manager/model_manager.h" +#include "graph/utils/node_utils.h" #include "hybrid/executor/hybrid_execution_context.h" #include "hybrid/model/hybrid_model.h" #include "opskernel_manager/ops_kernel_builder_manager.h" @@ -188,6 +189,10 @@ Status AicpuNodeTaskBase::ExecuteAsync(TaskContext &context, std::functionis_eos_) { + GELOGD("[%s] Got end of sequence", node_name_.c_str()); + return SUCCESS; + } uint32_t task_id = 0; uint32_t stream_id = 0; @@ -346,7 +351,11 @@ Status AicpuTfNodeTask::Init(const HybridModel &model) { GE_CHK_RT_RET(rtMemcpy(kernel_buf_->GetData(), sizeof(STR_FWK_OP_KERNEL), &fwk_op_kernel, sizeof(STR_FWK_OP_KERNEL), RT_MEMCPY_HOST_TO_DEVICE)); - + auto node_type = NodeUtils::GetNodeType(node_item_->node); + if (node_type.find(GETNEXT) != string::npos) { + GELOGD("[%s] Is GetNext, set need sync to true, node type = %s", node_name_.c_str(), node_type.c_str()); + need_sync_ = true; + } GELOGI("Node[%s] init end.", node_name_.c_str()); return SUCCESS; } @@ -616,6 +625,10 @@ Status AicpuTfNodeTask::LaunchTask(TaskContext &context) { GE_CHK_RT_RET(rtKernelLaunchEx(kernel_buf_->GetData(), kernel_buf_->GetSize(), flag, context.GetStream())); RECORD_EXECUTION_EVENT(context.GetExecutionContext(), node_name_.c_str(), "[AicpuTfNodertKernelLaunchEx] End"); GELOGD("Node[%s] launch end.", node_name_.c_str()); + if (need_sync_) { + GELOGD("[%s] Task needs sync", node_name_.c_str()); + GE_CHK_STATUS_RET_NOLOG(context.Synchronize()); + } return SUCCESS; } diff --git a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h index 1205b190..8f0b1d0a 100644 --- a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h +++ b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h @@ -144,6 +144,7 @@ class AicpuTfNodeTask : public AicpuNodeTaskBase { std::unique_ptr copy_input_data_size_dev_; std::unique_ptr copy_input_src_dev_; std::unique_ptr copy_input_dst_dev_; + bool need_sync_ = false; }; class AicpuNodeTask : public AicpuNodeTaskBase { diff --git a/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.cc b/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.cc index c914ac1b..3ef0a50f 100755 --- a/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.cc +++ b/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.cc @@ -21,6 +21,8 @@ #include "common/ge/ge_util.h" #include "graph/attr_value.h" #include "graph/debug/ge_attr_define.h" +#include "graph/utils/graph_utils.h" +#include "graph/utils/node_utils.h" #include "graph/load/new_model_manager/model_utils.h" #include "graph/load/new_model_manager/model_manager.h" #include "hybrid/executor/hybrid_execution_context.h" @@ -29,7 +31,7 @@ namespace ge { namespace hybrid { REGISTER_NODE_EXECUTOR_BUILDER(NodeExecutorManager::ExecutorType::COMPILED_SUBGRAPH, KnownNodeExecutor); -Status KnownNodeTask:: ExecuteAsync(TaskContext &context, std::function done_callback) { +Status KnownNodeTask::ExecuteAsync(TaskContext &context, std::function done_callback) { RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeTaskExecuteAsync] Start"); GELOGD("[%s] KnownNodeTask::ExecuteAsync in.", context.GetNodeName()); if (davinci_model_->GetTaskList().empty()) { @@ -58,6 +60,10 @@ Status KnownNodeTask:: ExecuteAsync(TaskContext &context, std::functionAssign(ge_model), "KnownNodeExecutor::LoadTask davincimodel assign failed."); - task = MakeShared(davinci_model); + bool need_sync = false; + GE_CHK_STATUS_RET_NOLOG(NeedSync(*ge_model, need_sync)); + task = MakeShared(davinci_model, need_sync); GE_CHECK_NOTNULL(task); GELOGI("[%s] KnownNodeExecutor::LoadTask success.", node->GetName().c_str()); return SUCCESS; @@ -186,5 +194,21 @@ Status KnownNodeExecutor::ExecuteTask(NodeTask &task, TaskContext &context, RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeExecutorExecuteTask] End"); return SUCCESS; } + +Status KnownNodeExecutor::NeedSync(GeModel &ge_model, bool &need_sync) { + auto compute_graph = GraphUtils::GetComputeGraph(ge_model.GetGraph()); + GE_CHECK_NOTNULL(compute_graph); + for (auto &node : compute_graph->GetAllNodes()) { + auto type = NodeUtils::GetNodeType(node); + if (type == GETNEXT) { + GELOGD("Contains GetNext node: %s", node->GetName().c_str()); + need_sync = true; + return SUCCESS; + } + } + + need_sync = false; + return SUCCESS; +} } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.h b/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.h index 2dde993b..dfd6bbd0 100644 --- a/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.h +++ b/ge/hybrid/node_executor/compiledsubgraph/known_node_executor.h @@ -27,8 +27,8 @@ class HybridModel; class KnownNodeTask : public NodeTask { public: - explicit KnownNodeTask(std::shared_ptr davinci_model) - : davinci_model_(davinci_model) + explicit KnownNodeTask(std::shared_ptr davinci_model, bool need_sync) + : davinci_model_(davinci_model), need_sync_(need_sync) {} ~KnownNodeTask() {} @@ -39,6 +39,7 @@ class KnownNodeTask : public NodeTask { private: std::shared_ptr davinci_model_ = nullptr; bool load_flag_ = false; + bool need_sync_; }; class KnownNodeExecutor : public NodeExecutor { @@ -48,6 +49,7 @@ class KnownNodeExecutor : public NodeExecutor { Status ExecuteTask(NodeTask &task, TaskContext &context, const std::function &callback) const; ~KnownNodeExecutor() {} private: + static Status NeedSync(GeModel &ge_model, bool &need_sync); std::shared_ptr davinci_model_ = nullptr; }; } // namespace hybrid diff --git a/ge/hybrid/node_executor/task_context.cc b/ge/hybrid/node_executor/task_context.cc index d15ea978..6488fbbe 100644 --- a/ge/hybrid/node_executor/task_context.cc +++ b/ge/hybrid/node_executor/task_context.cc @@ -494,5 +494,9 @@ const DumpProperties &TaskContext::GetDumpProperties() const { bool TaskContext::NeedCallback() { return node_item_->has_observer || IsDumpEnabled() || execution_context_->profiling_level > 0; } + +Status TaskContext::Synchronize() { + return execution_context_->Synchronize(GetStream()); +} } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/node_executor/task_context.h b/ge/hybrid/node_executor/task_context.h index 0e85a8e3..9ddde322 100644 --- a/ge/hybrid/node_executor/task_context.h +++ b/ge/hybrid/node_executor/task_context.h @@ -102,6 +102,8 @@ class TaskContext { uint32_t GetStreamId() const; void SetStreamId(uint32_t stream_id); + Status Synchronize(); + bool IsForceInferShape() const; void SetForceInferShape(bool force_infer_shape); void *handle_ = nullptr;