@@ -43,7 +43,7 @@ Status GraphExecutionContext::Synchronize(rtStream_t rt_stream) { | |||||
if (rt_ret == kEndOfSequence || rt_ret == kEndOfSequenceNew) { | if (rt_ret == kEndOfSequence || rt_ret == kEndOfSequenceNew) { | ||||
GELOGI("Got end of sequence"); | GELOGI("Got end of sequence"); | ||||
is_eos_ = true; | is_eos_ = true; | ||||
return SUCCESS; | |||||
return END_OF_SEQUENCE; | |||||
} | } | ||||
if (rt_ret == kModelAbortNormal || rt_ret == kModelAbortNormalNew) { | if (rt_ret == kModelAbortNormal || rt_ret == kModelAbortNormalNew) { | ||||
@@ -31,6 +31,20 @@ | |||||
#include "hybrid/executor/rt_callback_manager.h" | #include "hybrid/executor/rt_callback_manager.h" | ||||
#include "hybrid/model/hybrid_model.h" | #include "hybrid/model/hybrid_model.h" | ||||
// If expr is not SUCCESS, print the log and return the same value | |||||
#define HYBRID_CHK_STATUS_RET(expr, ...) \ | |||||
do { \ | |||||
const ge::Status _status = (expr); \ | |||||
if (_status != ge::SUCCESS) { \ | |||||
if (_status == ge::END_OF_SEQUENCE) { \ | |||||
GELOGD("Got end of sequence"); \ | |||||
} else { \ | |||||
GELOGE(_status, __VA_ARGS__); \ | |||||
} \ | |||||
return _status; \ | |||||
} \ | |||||
} while (0) | |||||
namespace ge { | namespace ge { | ||||
namespace hybrid { | namespace hybrid { | ||||
struct GraphExecutionContext { | struct GraphExecutionContext { | ||||
@@ -98,6 +98,11 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex | |||||
break; | break; | ||||
} | } | ||||
if (context.is_eos_) { | |||||
GELOGD("[%s] Await pending shape cancelled due to end of sequence", node_item.NodeName().c_str()); | |||||
return END_OF_SEQUENCE; | |||||
} | |||||
if (context.GetStatus() != SUCCESS) { | if (context.GetStatus() != SUCCESS) { | ||||
GELOGE(FAILED, "[%s] Await pending shape cancelled", node_item.NodeName().c_str()); | GELOGE(FAILED, "[%s] Await pending shape cancelled", node_item.NodeName().c_str()); | ||||
break; | break; | ||||
@@ -114,7 +119,8 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex | |||||
auto idx = p.first; | auto idx = p.first; | ||||
auto &future = p.second; | auto &future = p.second; | ||||
RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx); | RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx); | ||||
auto src_tensor_desc = future.GetTensorDesc(); | |||||
GeTensorDescPtr src_tensor_desc; | |||||
GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(src_tensor_desc)); | |||||
GE_CHECK_NOTNULL(src_tensor_desc); | GE_CHECK_NOTNULL(src_tensor_desc); | ||||
RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx); | RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx); | ||||
@@ -156,10 +162,11 @@ Status NodeState::AwaitInputTensors(GraphExecutionContext &context) const { | |||||
node_item_->NodeName().c_str(), | node_item_->NodeName().c_str(), | ||||
"[AwaitNodeDone] [%s] Start", | "[AwaitNodeDone] [%s] Start", | ||||
src_node->GetName().c_str()); | src_node->GetName().c_str()); | ||||
if (!subgraph_context_->Await(src_node)) { | |||||
GELOGE(INTERNAL_ERROR, "[%s] Await node [%s] failed.", GetName().c_str(), src_node->GetName().c_str()); | |||||
return INTERNAL_ERROR; | |||||
} | |||||
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node), | |||||
"[%s] Await node [%s] failed.", | |||||
GetName().c_str(), | |||||
src_node->GetName().c_str()); | |||||
RECORD_EXECUTION_EVENT(&context, | RECORD_EXECUTION_EVENT(&context, | ||||
node_item_->NodeName().c_str(), | node_item_->NodeName().c_str(), | ||||
@@ -183,24 +190,18 @@ Status NodeState::WaitForPrepareDone() { | |||||
Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) { | Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) { | ||||
GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); | GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); | ||||
if (!subgraph_context_->Await(src_node_)) { | |||||
GELOGE(INTERNAL_ERROR, "cancelled"); | |||||
return INTERNAL_ERROR; | |||||
} | |||||
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled"); | |||||
shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->MutableShape(); | shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->MutableShape(); | ||||
ori_shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->GetOriginShape(); | ori_shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->GetOriginShape(); | ||||
GELOGD("Get shape from %s:%u. shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str()); | GELOGD("Get shape from %s:%u. shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str()); | ||||
return SUCCESS; | return SUCCESS; | ||||
} | } | ||||
GeTensorDescPtr ShapeFuture::GetTensorDesc() { | |||||
Status ShapeFuture::GetTensorDesc(GeTensorDescPtr &tensor_desc) { | |||||
GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); | GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str()); | ||||
if (!subgraph_context_->Await(src_node_)) { | |||||
GELOGE(INTERNAL_ERROR, "cancelled"); | |||||
return nullptr; | |||||
} | |||||
return src_node_->GetOpDesc()->MutableOutputDesc(src_index_); | |||||
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled"); | |||||
tensor_desc = src_node_->GetOpDesc()->MutableOutputDesc(src_index_); | |||||
return SUCCESS; | |||||
} | } | ||||
} // namespace hybrid | } // namespace hybrid | ||||
} // namespace ge | } // namespace ge |
@@ -35,7 +35,7 @@ class ShapeFuture { | |||||
ShapeFuture(NodePtr src_node, uint32_t src_index, SubgraphContext *subgraph_context); | ShapeFuture(NodePtr src_node, uint32_t src_index, SubgraphContext *subgraph_context); | ||||
~ShapeFuture() = default; | ~ShapeFuture() = default; | ||||
Status Get(GeShape &ori_shape, GeShape &shape); | Status Get(GeShape &ori_shape, GeShape &shape); | ||||
GeTensorDescPtr GetTensorDesc(); | |||||
Status GetTensorDesc(GeTensorDescPtr &tensor_desc); | |||||
private: | private: | ||||
NodePtr src_node_; | NodePtr src_node_; | ||||
@@ -20,8 +20,8 @@ | |||||
namespace ge { | namespace ge { | ||||
namespace hybrid { | namespace hybrid { | ||||
SubgraphContext::SubgraphContext(const GraphItem *graph_item) : graph_item_(graph_item) { | |||||
SubgraphContext::SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context) | |||||
: graph_item_(graph_item), execution_context_(execution_context) { | |||||
} | } | ||||
Status SubgraphContext::Init() { | Status SubgraphContext::Init() { | ||||
@@ -111,12 +111,22 @@ Status SubgraphContext::GetOutputs(std::vector<TensorValue> &outputs) { | |||||
return SUCCESS; | return SUCCESS; | ||||
} | } | ||||
bool SubgraphContext::Await(const NodePtr &node) { | |||||
return node_done_manager_.Await(node); | |||||
Status SubgraphContext::Await(const NodePtr &node) { | |||||
if (node_done_manager_.Await(node)) { | |||||
return SUCCESS; | |||||
} | |||||
if (execution_context_->is_eos_) { | |||||
return END_OF_SEQUENCE; | |||||
} | |||||
return FAILED; | |||||
} | } | ||||
void SubgraphContext::OnError(Status error) { | void SubgraphContext::OnError(Status error) { | ||||
GELOGE(error, "[%s] Error occurred while executing graph.", graph_item_->GetName().c_str()); | |||||
if (error != END_OF_SEQUENCE) { | |||||
GELOGE(error, "[%s] Error occurred while executing graph.", graph_item_->GetName().c_str()); | |||||
} | |||||
node_done_manager_.Destroy(); | node_done_manager_.Destroy(); | ||||
} | } | ||||
@@ -20,6 +20,7 @@ | |||||
#include <vector> | #include <vector> | ||||
#include "hybrid/common/tensor_value.h" | #include "hybrid/common/tensor_value.h" | ||||
#include "hybrid/executor/hybrid_model_executor.h" | |||||
#include "hybrid/executor/node_state.h" | #include "hybrid/executor/node_state.h" | ||||
#include "hybrid/executor/node_done_manager.h" | #include "hybrid/executor/node_done_manager.h" | ||||
#include "hybrid/model/graph_item.h" | #include "hybrid/model/graph_item.h" | ||||
@@ -29,7 +30,7 @@ namespace ge { | |||||
namespace hybrid { | namespace hybrid { | ||||
class SubgraphContext { | class SubgraphContext { | ||||
public: | public: | ||||
explicit SubgraphContext(const GraphItem *graph_item); | |||||
explicit SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context); | |||||
~SubgraphContext() = default; | ~SubgraphContext() = default; | ||||
Status Init(); | Status Init(); | ||||
@@ -43,11 +44,12 @@ class SubgraphContext { | |||||
Status GetInput(int index, TensorValue &tensor); | Status GetInput(int index, TensorValue &tensor); | ||||
Status GetOutputs(std::vector<TensorValue> &outputs); | Status GetOutputs(std::vector<TensorValue> &outputs); | ||||
bool Await(const NodePtr &node); | |||||
Status Await(const NodePtr &node); | |||||
void NodeDone(const NodePtr &node); | void NodeDone(const NodePtr &node); | ||||
private: | private: | ||||
friend class TaskContext; | friend class TaskContext; | ||||
const GraphExecutionContext *execution_context_; | |||||
const GraphItem *graph_item_; | const GraphItem *graph_item_; | ||||
std::mutex mu_; | std::mutex mu_; | ||||
std::vector<TensorValue> all_inputs_; | std::vector<TensorValue> all_inputs_; | ||||
@@ -163,10 +163,10 @@ Status SubgraphExecutor::ExecuteAsyncForKnownShape(const std::vector<TensorValue | |||||
known_shape_task_context_ = TaskContext::Create(*node_item, context_, subgraph_context_.get()); | known_shape_task_context_ = TaskContext::Create(*node_item, context_, subgraph_context_.get()); | ||||
GE_CHECK_NOTNULL(known_shape_task_context_); | GE_CHECK_NOTNULL(known_shape_task_context_); | ||||
GE_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, known_shape_task_context_, *context_), | |||||
"[%s] Failed to execute node [%s] for known subgraph.", | |||||
graph_item_->GetName().c_str(), | |||||
known_shape_task_context_->GetNodeName()); | |||||
HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, known_shape_task_context_, *context_), | |||||
"[%s] Failed to execute node [%s] for known subgraph.", | |||||
graph_item_->GetName().c_str(), | |||||
known_shape_task_context_->GetNodeName()); | |||||
GELOGD("[%s] Done execute non-dynamic subgraph successfully.", graph_item_->GetName().c_str()); | GELOGD("[%s] Done execute non-dynamic subgraph successfully.", graph_item_->GetName().c_str()); | ||||
return SUCCESS; | return SUCCESS; | ||||
@@ -252,10 +252,10 @@ Status SubgraphExecutor::PrepareNodes() { | |||||
Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) { | Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) { | ||||
const auto &node_item = *node_state.GetNodeItem(); | const auto &node_item = *node_state.GetNodeItem(); | ||||
GE_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state), | |||||
"[%s] Failed to InferShape.", node_state.GetName().c_str()); | |||||
GE_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item), | |||||
"[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str()); | |||||
HYBRID_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state), | |||||
"[%s] Failed to InferShape.", node_state.GetName().c_str()); | |||||
HYBRID_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item), | |||||
"[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str()); | |||||
return SUCCESS; | return SUCCESS; | ||||
} | } | ||||
@@ -299,15 +299,9 @@ Status SubgraphExecutor::LaunchTasks() { | |||||
GE_CHECK_NOTNULL(task_context); | GE_CHECK_NOTNULL(task_context); | ||||
task_context->SetForceInferShape(force_infer_shape_); | task_context->SetForceInferShape(force_infer_shape_); | ||||
auto shared_task_context = std::shared_ptr<TaskContext>(task_context.release()); | auto shared_task_context = std::shared_ptr<TaskContext>(task_context.release()); | ||||
GE_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, shared_task_context, *context_), | |||||
"[%s] Execute node failed.", | |||||
node_state->GetName().c_str()); | |||||
if (context_->is_eos_) { | |||||
GELOGD("Got end of sequence"); | |||||
ready_queue_.Stop(); | |||||
return SUCCESS; | |||||
} | |||||
HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, shared_task_context, *context_), | |||||
"[%s] Execute node failed.", | |||||
node_state->GetName().c_str()); | |||||
GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str()); | GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str()); | ||||
} | } | ||||
} | } | ||||
@@ -324,7 +318,6 @@ Status SubgraphExecutor::ScheduleTasks() { | |||||
GELOGD("[%s] Start to execute subgraph.", graph_item_->GetName().c_str()); | GELOGD("[%s] Start to execute subgraph.", graph_item_->GetName().c_str()); | ||||
auto ret = LaunchTasks(); | auto ret = LaunchTasks(); | ||||
if (ret != SUCCESS) { | if (ret != SUCCESS) { | ||||
GELOGE(ret, "[%s] Failed to execute subgraph.", graph_item_->GetName().c_str()); | |||||
subgraph_context_->OnError(ret); | subgraph_context_->OnError(ret); | ||||
context_->SetErrorCode(ret); | context_->SetErrorCode(ret); | ||||
ready_queue_.Stop(); | ready_queue_.Stop(); | ||||
@@ -406,9 +406,9 @@ Status ExecutionEngine::DoExecuteAsync(NodeState &node_state, | |||||
// Wait for dependent nodes(DEPEND_COMPUTE), so that the input tensors are valid. | // Wait for dependent nodes(DEPEND_COMPUTE), so that the input tensors are valid. | ||||
RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[AwaitDependents] Start"); | RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[AwaitDependents] Start"); | ||||
GE_CHK_STATUS_RET(node_state.AwaitInputTensors(context), | |||||
"[%s] Failed to wait for dependent nodes.", | |||||
node_state.GetName().c_str()); | |||||
HYBRID_CHK_STATUS_RET(node_state.AwaitInputTensors(context), | |||||
"[%s] Failed to wait for dependent nodes.", | |||||
node_state.GetName().c_str()); | |||||
const auto &node_item = *node_state.GetNodeItem(); | const auto &node_item = *node_state.GetNodeItem(); | ||||
auto executor = node_item.node_executor; | auto executor = node_item.node_executor; | ||||
@@ -438,9 +438,9 @@ Status ExecutionEngine::DoExecuteAsync(NodeState &node_state, | |||||
}); | }); | ||||
} | } | ||||
RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] Start"); | RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] Start"); | ||||
GE_CHK_STATUS_RET(node_item.node_executor->ExecuteTask(*task, task_context, callback), | |||||
"[%s] Failed to execute task", | |||||
node_state.GetName().c_str()); | |||||
HYBRID_CHK_STATUS_RET(node_item.node_executor->ExecuteTask(*task, task_context, callback), | |||||
"[%s] Failed to execute task", | |||||
node_state.GetName().c_str()); | |||||
RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] End"); | RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] End"); | ||||
GELOGD("[%s] Done task launch successfully.", node_state.GetName().c_str()); | GELOGD("[%s] Done task launch successfully.", node_state.GetName().c_str()); | ||||
@@ -99,11 +99,7 @@ Status ShapeInferenceEngine::AwaitDependentNodes(NodeState &node_state) { | |||||
node_item.NodeName().c_str(), | node_item.NodeName().c_str(), | ||||
"[AwaitNodeDone] [%s] Start", | "[AwaitNodeDone] [%s] Start", | ||||
src_node->GetName().c_str()); | src_node->GetName().c_str()); | ||||
if (!subgraph_context_->Await(src_node)) { | |||||
GELOGE(INTERNAL_ERROR, "[%s] Await node failed.", src_node->GetName().c_str()); | |||||
return INTERNAL_ERROR; | |||||
} | |||||
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node), "[%s] Await node failed.", src_node->GetName().c_str()); | |||||
RECORD_SHAPE_INFERENCE_EVENT(execution_context_, | RECORD_SHAPE_INFERENCE_EVENT(execution_context_, | ||||
node_item.NodeName().c_str(), | node_item.NodeName().c_str(), | ||||
"[AwaitNodeDone] [%s] End", | "[AwaitNodeDone] [%s] End", | ||||
@@ -188,11 +188,7 @@ Status AicpuNodeTaskBase::ExecuteAsync(TaskContext &context, std::function<void( | |||||
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[AicpuNodeTaskBaseExecuteAsync] Start"); | RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[AicpuNodeTaskBaseExecuteAsync] Start"); | ||||
GELOGD("Node[%s] execute async start. unknown_type=%d.", node_name_.c_str(), unknown_type_); | GELOGD("Node[%s] execute async start. unknown_type=%d.", node_name_.c_str(), unknown_type_); | ||||
GE_CHK_STATUS_RET(LaunchTask(context)); | |||||
if (context.GetExecutionContext()->is_eos_) { | |||||
GELOGD("[%s] Got end of sequence", node_name_.c_str()); | |||||
return SUCCESS; | |||||
} | |||||
HYBRID_CHK_STATUS_RET(LaunchTask(context), "[%s] Failed to launch task", node_name_.c_str()); | |||||
uint32_t task_id = 0; | uint32_t task_id = 0; | ||||
uint32_t stream_id = 0; | uint32_t stream_id = 0; | ||||
@@ -20,6 +20,7 @@ | |||||
#include "graph/utils/node_utils.h" | #include "graph/utils/node_utils.h" | ||||
#include "init/gelib.h" | #include "init/gelib.h" | ||||
#include "graph/utils/tensor_utils.h" | #include "graph/utils/tensor_utils.h" | ||||
#include "hybrid/executor/hybrid_execution_context.h" | |||||
#include "hybrid/model/hybrid_model.h" | #include "hybrid/model/hybrid_model.h" | ||||
#include "graph/debug/ge_attr_define.h" | #include "graph/debug/ge_attr_define.h" | ||||
#include "opskernel_manager/ops_kernel_builder_manager.h" | #include "opskernel_manager/ops_kernel_builder_manager.h" | ||||
@@ -44,9 +45,9 @@ Status NodeExecutor::PrepareTask(NodeTask &task, TaskContext &context) const { | |||||
} | } | ||||
Status NodeExecutor::ExecuteTask(NodeTask &task, TaskContext &context, const std::function<void()> &callback) const { | Status NodeExecutor::ExecuteTask(NodeTask &task, TaskContext &context, const std::function<void()> &callback) const { | ||||
GE_CHK_STATUS_RET(task.ExecuteAsync(context, callback), | |||||
"Failed to execute task. node = %s", | |||||
context.GetNodeItem().NodeName().c_str()); | |||||
HYBRID_CHK_STATUS_RET(task.ExecuteAsync(context, callback), | |||||
"Failed to execute task. node = %s", | |||||
context.GetNodeItem().NodeName().c_str()); | |||||
return SUCCESS; | return SUCCESS; | ||||
} | } | ||||