diff --git a/ge/hybrid/executor/hybrid_execution_context.cc b/ge/hybrid/executor/hybrid_execution_context.cc index 13a6c9ec..87207e94 100644 --- a/ge/hybrid/executor/hybrid_execution_context.cc +++ b/ge/hybrid/executor/hybrid_execution_context.cc @@ -43,7 +43,7 @@ Status GraphExecutionContext::Synchronize(rtStream_t rt_stream) { if (rt_ret == kEndOfSequence || rt_ret == kEndOfSequenceNew) { GELOGI("Got end of sequence"); is_eos_ = true; - return SUCCESS; + return END_OF_SEQUENCE; } if (rt_ret == kModelAbortNormal || rt_ret == kModelAbortNormalNew) { diff --git a/ge/hybrid/executor/hybrid_execution_context.h b/ge/hybrid/executor/hybrid_execution_context.h index 49c54d2f..c398e83d 100644 --- a/ge/hybrid/executor/hybrid_execution_context.h +++ b/ge/hybrid/executor/hybrid_execution_context.h @@ -31,6 +31,20 @@ #include "hybrid/executor/rt_callback_manager.h" #include "hybrid/model/hybrid_model.h" +// If expr is not SUCCESS, print the log and return the same value +#define HYBRID_CHK_STATUS_RET(expr, ...) \ + do { \ + const ge::Status _status = (expr); \ + if (_status != ge::SUCCESS) { \ + if (_status == ge::END_OF_SEQUENCE) { \ + GELOGD("Got end of sequence"); \ + } else { \ + GELOGE(_status, __VA_ARGS__); \ + } \ + return _status; \ + } \ + } while (0) + namespace ge { namespace hybrid { struct GraphExecutionContext { diff --git a/ge/hybrid/executor/node_state.cc b/ge/hybrid/executor/node_state.cc index ceed40b0..93c6c58c 100644 --- a/ge/hybrid/executor/node_state.cc +++ b/ge/hybrid/executor/node_state.cc @@ -98,6 +98,11 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex break; } + if (context.is_eos_) { + GELOGD("[%s] Await pending shape cancelled due to end of sequence", node_item.NodeName().c_str()); + return END_OF_SEQUENCE; + } + if (context.GetStatus() != SUCCESS) { GELOGE(FAILED, "[%s] Await pending shape cancelled", node_item.NodeName().c_str()); break; @@ -114,7 +119,8 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex auto idx = p.first; auto &future = p.second; RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx); - auto src_tensor_desc = future.GetTensorDesc(); + GeTensorDescPtr src_tensor_desc; + GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(src_tensor_desc)); GE_CHECK_NOTNULL(src_tensor_desc); RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx); @@ -156,10 +162,11 @@ Status NodeState::AwaitInputTensors(GraphExecutionContext &context) const { node_item_->NodeName().c_str(), "[AwaitNodeDone] [%s] Start", src_node->GetName().c_str()); - if (!subgraph_context_->Await(src_node)) { - GELOGE(INTERNAL_ERROR, "[%s] Await node [%s] failed.", GetName().c_str(), src_node->GetName().c_str()); - return INTERNAL_ERROR; - } + + HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node), + "[%s] Await node [%s] failed.", + GetName().c_str(), + src_node->GetName().c_str()); RECORD_EXECUTION_EVENT(&context, node_item_->NodeName().c_str(), @@ -183,24 +190,18 @@ Status NodeState::WaitForPrepareDone() { Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) { GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); - if (!subgraph_context_->Await(src_node_)) { - GELOGE(INTERNAL_ERROR, "cancelled"); - return INTERNAL_ERROR; - } - + HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled"); shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->MutableShape(); ori_shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->GetOriginShape(); GELOGD("Get shape from %s:%u. shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str()); return SUCCESS; } -GeTensorDescPtr ShapeFuture::GetTensorDesc() { +Status ShapeFuture::GetTensorDesc(GeTensorDescPtr &tensor_desc) { GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); - if (!subgraph_context_->Await(src_node_)) { - GELOGE(INTERNAL_ERROR, "cancelled"); - return nullptr; - } - return src_node_->GetOpDesc()->MutableOutputDesc(src_index_); + HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled"); + tensor_desc = src_node_->GetOpDesc()->MutableOutputDesc(src_index_); + return SUCCESS; } } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/executor/node_state.h b/ge/hybrid/executor/node_state.h index 312e177f..02a362b4 100644 --- a/ge/hybrid/executor/node_state.h +++ b/ge/hybrid/executor/node_state.h @@ -35,7 +35,7 @@ class ShapeFuture { ShapeFuture(NodePtr src_node, uint32_t src_index, SubgraphContext *subgraph_context); ~ShapeFuture() = default; Status Get(GeShape &ori_shape, GeShape &shape); - GeTensorDescPtr GetTensorDesc(); + Status GetTensorDesc(GeTensorDescPtr &tensor_desc); private: NodePtr src_node_; diff --git a/ge/hybrid/executor/subgraph_context.cc b/ge/hybrid/executor/subgraph_context.cc index 923c2aa3..0889e51e 100644 --- a/ge/hybrid/executor/subgraph_context.cc +++ b/ge/hybrid/executor/subgraph_context.cc @@ -20,8 +20,8 @@ namespace ge { namespace hybrid { -SubgraphContext::SubgraphContext(const GraphItem *graph_item) : graph_item_(graph_item) { - +SubgraphContext::SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context) + : graph_item_(graph_item), execution_context_(execution_context) { } Status SubgraphContext::Init() { @@ -111,12 +111,22 @@ Status SubgraphContext::GetOutputs(std::vector &outputs) { return SUCCESS; } -bool SubgraphContext::Await(const NodePtr &node) { - return node_done_manager_.Await(node); +Status SubgraphContext::Await(const NodePtr &node) { + if (node_done_manager_.Await(node)) { + return SUCCESS; + } + + if (execution_context_->is_eos_) { + return END_OF_SEQUENCE; + } + + return FAILED; } void SubgraphContext::OnError(Status error) { - GELOGE(error, "[%s] Error occurred while executing graph.", graph_item_->GetName().c_str()); + if (error != END_OF_SEQUENCE) { + GELOGE(error, "[%s] Error occurred while executing graph.", graph_item_->GetName().c_str()); + } node_done_manager_.Destroy(); } diff --git a/ge/hybrid/executor/subgraph_context.h b/ge/hybrid/executor/subgraph_context.h index b86765f7..3eb66b02 100644 --- a/ge/hybrid/executor/subgraph_context.h +++ b/ge/hybrid/executor/subgraph_context.h @@ -20,6 +20,7 @@ #include #include "hybrid/common/tensor_value.h" +#include "hybrid/executor/hybrid_model_executor.h" #include "hybrid/executor/node_state.h" #include "hybrid/executor/node_done_manager.h" #include "hybrid/model/graph_item.h" @@ -29,7 +30,7 @@ namespace ge { namespace hybrid { class SubgraphContext { public: - explicit SubgraphContext(const GraphItem *graph_item); + explicit SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context); ~SubgraphContext() = default; Status Init(); @@ -43,11 +44,12 @@ class SubgraphContext { Status GetInput(int index, TensorValue &tensor); Status GetOutputs(std::vector &outputs); - bool Await(const NodePtr &node); + Status Await(const NodePtr &node); void NodeDone(const NodePtr &node); private: friend class TaskContext; + const GraphExecutionContext *execution_context_; const GraphItem *graph_item_; std::mutex mu_; std::vector all_inputs_; diff --git a/ge/hybrid/executor/subgraph_executor.cc b/ge/hybrid/executor/subgraph_executor.cc index 6286ea8c..b59f1acb 100644 --- a/ge/hybrid/executor/subgraph_executor.cc +++ b/ge/hybrid/executor/subgraph_executor.cc @@ -163,10 +163,10 @@ Status SubgraphExecutor::ExecuteAsyncForKnownShape(const std::vectorGetName().c_str(), - known_shape_task_context_->GetNodeName()); + HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, known_shape_task_context_, *context_), + "[%s] Failed to execute node [%s] for known subgraph.", + graph_item_->GetName().c_str(), + known_shape_task_context_->GetNodeName()); GELOGD("[%s] Done execute non-dynamic subgraph successfully.", graph_item_->GetName().c_str()); return SUCCESS; @@ -252,10 +252,10 @@ Status SubgraphExecutor::PrepareNodes() { Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) { const auto &node_item = *node_state.GetNodeItem(); - GE_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state), - "[%s] Failed to InferShape.", node_state.GetName().c_str()); - GE_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item), - "[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str()); + HYBRID_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state), + "[%s] Failed to InferShape.", node_state.GetName().c_str()); + HYBRID_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item), + "[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str()); return SUCCESS; } @@ -299,15 +299,9 @@ Status SubgraphExecutor::LaunchTasks() { GE_CHECK_NOTNULL(task_context); task_context->SetForceInferShape(force_infer_shape_); auto shared_task_context = std::shared_ptr(task_context.release()); - GE_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, shared_task_context, *context_), - "[%s] Execute node failed.", - node_state->GetName().c_str()); - - if (context_->is_eos_) { - GELOGD("Got end of sequence"); - ready_queue_.Stop(); - return SUCCESS; - } + HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, shared_task_context, *context_), + "[%s] Execute node failed.", + node_state->GetName().c_str()); GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str()); } } @@ -324,7 +318,6 @@ Status SubgraphExecutor::ScheduleTasks() { GELOGD("[%s] Start to execute subgraph.", graph_item_->GetName().c_str()); auto ret = LaunchTasks(); if (ret != SUCCESS) { - GELOGE(ret, "[%s] Failed to execute subgraph.", graph_item_->GetName().c_str()); subgraph_context_->OnError(ret); context_->SetErrorCode(ret); ready_queue_.Stop(); diff --git a/ge/hybrid/executor/worker/execution_engine.cc b/ge/hybrid/executor/worker/execution_engine.cc index 21dd8e4b..ea70ad69 100755 --- a/ge/hybrid/executor/worker/execution_engine.cc +++ b/ge/hybrid/executor/worker/execution_engine.cc @@ -406,9 +406,9 @@ Status ExecutionEngine::DoExecuteAsync(NodeState &node_state, // Wait for dependent nodes(DEPEND_COMPUTE), so that the input tensors are valid. RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[AwaitDependents] Start"); - GE_CHK_STATUS_RET(node_state.AwaitInputTensors(context), - "[%s] Failed to wait for dependent nodes.", - node_state.GetName().c_str()); + HYBRID_CHK_STATUS_RET(node_state.AwaitInputTensors(context), + "[%s] Failed to wait for dependent nodes.", + node_state.GetName().c_str()); const auto &node_item = *node_state.GetNodeItem(); auto executor = node_item.node_executor; @@ -438,9 +438,9 @@ Status ExecutionEngine::DoExecuteAsync(NodeState &node_state, }); } RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] Start"); - GE_CHK_STATUS_RET(node_item.node_executor->ExecuteTask(*task, task_context, callback), - "[%s] Failed to execute task", - node_state.GetName().c_str()); + HYBRID_CHK_STATUS_RET(node_item.node_executor->ExecuteTask(*task, task_context, callback), + "[%s] Failed to execute task", + node_state.GetName().c_str()); RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] End"); GELOGD("[%s] Done task launch successfully.", node_state.GetName().c_str()); diff --git a/ge/hybrid/executor/worker/shape_inference_engine.cc b/ge/hybrid/executor/worker/shape_inference_engine.cc index 66d0ede2..56ae3ea3 100755 --- a/ge/hybrid/executor/worker/shape_inference_engine.cc +++ b/ge/hybrid/executor/worker/shape_inference_engine.cc @@ -99,11 +99,7 @@ Status ShapeInferenceEngine::AwaitDependentNodes(NodeState &node_state) { node_item.NodeName().c_str(), "[AwaitNodeDone] [%s] Start", src_node->GetName().c_str()); - if (!subgraph_context_->Await(src_node)) { - GELOGE(INTERNAL_ERROR, "[%s] Await node failed.", src_node->GetName().c_str()); - return INTERNAL_ERROR; - } - + HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node), "[%s] Await node failed.", src_node->GetName().c_str()); RECORD_SHAPE_INFERENCE_EVENT(execution_context_, node_item.NodeName().c_str(), "[AwaitNodeDone] [%s] End", diff --git a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc index 0b34ecc3..63ce65e9 100755 --- a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc +++ b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc @@ -188,11 +188,7 @@ Status AicpuNodeTaskBase::ExecuteAsync(TaskContext &context, std::functionis_eos_) { - GELOGD("[%s] Got end of sequence", node_name_.c_str()); - return SUCCESS; - } + HYBRID_CHK_STATUS_RET(LaunchTask(context), "[%s] Failed to launch task", node_name_.c_str()); uint32_t task_id = 0; uint32_t stream_id = 0; diff --git a/ge/hybrid/node_executor/node_executor.cc b/ge/hybrid/node_executor/node_executor.cc index fe89464b..02427b91 100755 --- a/ge/hybrid/node_executor/node_executor.cc +++ b/ge/hybrid/node_executor/node_executor.cc @@ -20,6 +20,7 @@ #include "graph/utils/node_utils.h" #include "init/gelib.h" #include "graph/utils/tensor_utils.h" +#include "hybrid/executor/hybrid_execution_context.h" #include "hybrid/model/hybrid_model.h" #include "graph/debug/ge_attr_define.h" #include "opskernel_manager/ops_kernel_builder_manager.h" @@ -44,9 +45,9 @@ Status NodeExecutor::PrepareTask(NodeTask &task, TaskContext &context) const { } Status NodeExecutor::ExecuteTask(NodeTask &task, TaskContext &context, const std::function &callback) const { - GE_CHK_STATUS_RET(task.ExecuteAsync(context, callback), - "Failed to execute task. node = %s", - context.GetNodeItem().NodeName().c_str()); + HYBRID_CHK_STATUS_RET(task.ExecuteAsync(context, callback), + "Failed to execute task. node = %s", + context.GetNodeItem().NodeName().c_str()); return SUCCESS; }