Browse Source

!840 Handle EndOfSequnce

From: @xchu42
Reviewed-by: @ji_chen
Signed-off-by:
tags/v1.2.0
mindspore-ci-bot Gitee 3 years ago
parent
commit
70e1869d53
17 changed files with 170 additions and 88 deletions
  1. +27
    -0
      ge/hybrid/executor/hybrid_execution_context.cc
  2. +16
    -0
      ge/hybrid/executor/hybrid_execution_context.h
  3. +5
    -4
      ge/hybrid/executor/hybrid_model_async_executor.cc
  4. +9
    -5
      ge/hybrid/executor/hybrid_model_executor.cc
  5. +1
    -0
      ge/hybrid/executor/hybrid_model_executor.h
  6. +17
    -16
      ge/hybrid/executor/node_state.cc
  7. +1
    -1
      ge/hybrid/executor/node_state.h
  8. +16
    -5
      ge/hybrid/executor/subgraph_context.cc
  9. +4
    -2
      ge/hybrid/executor/subgraph_context.h
  10. +45
    -39
      ge/hybrid/executor/subgraph_executor.cc
  11. +6
    -6
      ge/hybrid/executor/worker/execution_engine.cc
  12. +1
    -5
      ge/hybrid/executor/worker/shape_inference_engine.cc
  13. +11
    -2
      ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
  14. +1
    -0
      ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
  15. +4
    -3
      ge/hybrid/node_executor/node_executor.cc
  16. +4
    -0
      ge/hybrid/node_executor/task_context.cc
  17. +2
    -0
      ge/hybrid/node_executor/task_context.h

+ 27
- 0
ge/hybrid/executor/hybrid_execution_context.cc View File

@@ -18,6 +18,12 @@

namespace ge {
namespace hybrid {
namespace {
const uint32_t kEndOfSequence = 0x0704000a;
const uint32_t kEndOfSequenceNew = 507005;
const int32_t kModelAbortNormal = 0x0704000e;
const int32_t kModelAbortNormalNew = 507024;
} // namespace
void GraphExecutionContext::SetErrorCode(Status error_code) {
std::lock_guard<std::mutex> lk(mu);
this->status = error_code;
@@ -27,5 +33,26 @@ Status GraphExecutionContext::GetStatus() const {
std::lock_guard<std::mutex> lk(mu);
return this->status;
}

Status GraphExecutionContext::Synchronize(rtStream_t rt_stream) {
auto rt_ret = rtStreamSynchronize(rt_stream);
if (rt_ret == RT_ERROR_NONE) {
return SUCCESS;
}

if (rt_ret == kEndOfSequence || rt_ret == kEndOfSequenceNew) {
GELOGI("Got end of sequence");
is_eos_ = true;
return END_OF_SEQUENCE;
}

if (rt_ret == kModelAbortNormal || rt_ret == kModelAbortNormalNew) {
GELOGI("The model with multiple datasets aborts normally");
return SUCCESS;
}

GELOGE(RT_FAILED, "Failed to invoke rtStreamSynchronize, ret = %d", rt_ret);
return RT_FAILED;
}
} // namespace hybrid
} // namespace ge

+ 16
- 0
ge/hybrid/executor/hybrid_execution_context.h View File

@@ -31,11 +31,26 @@
#include "hybrid/executor/rt_callback_manager.h"
#include "hybrid/model/hybrid_model.h"

// If expr is not SUCCESS, print the log and return the same value
#define HYBRID_CHK_STATUS_RET(expr, ...) \
do { \
const ge::Status _status = (expr); \
if (_status != ge::SUCCESS) { \
if (_status == ge::END_OF_SEQUENCE) { \
GELOGD("Got end of sequence"); \
} else { \
GELOGE(_status, __VA_ARGS__); \
} \
return _status; \
} \
} while (0)

namespace ge {
namespace hybrid {
struct GraphExecutionContext {
void SetErrorCode(Status error_code);
Status GetStatus() const;
Status Synchronize(rtStream_t rt_stream);

uint64_t session_id = 0;
const HybridModel *model = nullptr;
@@ -49,6 +64,7 @@ struct GraphExecutionContext {
DumpProperties dump_properties;
bool trace_enabled = false;
bool dump_enabled = false;
std::atomic_bool is_eos_;
long profiling_level = 0;
long iteration = 0;
Status status = SUCCESS;


+ 5
- 4
ge/hybrid/executor/hybrid_model_async_executor.cc View File

@@ -24,7 +24,7 @@
namespace ge {
namespace hybrid {
namespace {
int kDataOutputIndex = 0;
const int kDataOutputIndex = 0;
}
HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model)
: model_(model), run_flag_(false) {
@@ -157,9 +157,10 @@ Status HybridModelAsyncExecutor::HandleResult(Status exec_ret,
OutputData *output_data) {
GELOGD("Start to handle result. model id = %u, data index = %u, execution ret = %u", model_id_, data_id, exec_ret);
std::vector<ge::OutputTensorInfo> output_tensor_info_list;
if (exec_ret == END_OF_SEQUENCE) {
GELOGW("End of sequence, model id = %u", model_id_);
return OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list);
if (args.is_eos) {
GELOGI("End of sequence, model id = %u", model_id_);
GE_CHK_STATUS_RET_NOLOG(OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list));
return SUCCESS;
}

if (exec_ret != SUCCESS) {


+ 9
- 5
ge/hybrid/executor/hybrid_model_executor.cc View File

@@ -50,15 +50,18 @@ Status HybridModelExecutor::Execute(HybridModelExecutor::ExecuteArgs &args) {
auto ret = ExecuteGraphInternal(executor, args);
Cleanup();
RECORD_MODEL_EXECUTION_EVENT(&context_, "[Cleanup] End");
GE_CHK_STATUS_RET(ret, "Failed to execute model");
GELOGD("Model executed successfully.");

if (context_.profiler != nullptr) {
context_.profiler->Dump(std::cout);
context_.profiler->Reset();
}

context_.iteration += 1;
if (ret == END_OF_SEQUENCE) {
args.is_eos = true;
} else {
GE_CHK_STATUS_RET(ret, "Failed to execute model");
}
return SUCCESS;
}

@@ -68,13 +71,13 @@ Status HybridModelExecutor::ExecuteGraphInternal(SubgraphExecutor &executor,
GE_CHK_STATUS_RET_NOLOG(ResetExecutionContext(context_));
RECORD_MODEL_EXECUTION_EVENT(&context_, "[InitContext] End");

GE_CHK_STATUS_RET(executor.ExecuteAsync(args.inputs, args.input_desc), "Failed to execute partitioned call.");
HYBRID_CHK_STATUS_RET(executor.ExecuteAsync(args.inputs, args.input_desc), "Failed to execute partitioned call.");
RECORD_MODEL_EXECUTION_EVENT(&context_, "[ExecuteAsync] End");

GE_CHK_STATUS_RET(executor.Synchronize(), "Failed to sync root graph.");
HYBRID_CHK_STATUS_RET(executor.Synchronize(), "Failed to sync root graph.");
RECORD_MODEL_EXECUTION_EVENT(&context_, "[Synchronize] End");

GE_CHK_STATUS_RET(executor.GetOutputs(args.outputs, args.output_desc), "Failed to get outputs");
HYBRID_CHK_STATUS_RET(executor.GetOutputs(args.outputs, args.output_desc), "Failed to get outputs");
RECORD_MODEL_EXECUTION_EVENT(&context_, "[GetOutput] End");
return SUCCESS;
}
@@ -94,6 +97,7 @@ Status HybridModelExecutor::InitExecutionContext() {

context_.stream = stream_;
context_.model = model_;
context_.is_eos_ = false;
context_.session_id = ::ge::GetContext().SessionId();
context_.ge_context = &GetThreadLocalContext();
GELOGD("session id from model = %lu, from context = %lu", model_->GetSessionId(), context_.session_id);


+ 1
- 0
ge/hybrid/executor/hybrid_model_executor.h View File

@@ -31,6 +31,7 @@ class HybridModelExecutor {
std::vector<ConstGeTensorDescPtr> input_desc;
std::vector<TensorValue> outputs;
std::vector<ConstGeTensorDescPtr> output_desc;
bool is_eos = false;
};

HybridModelExecutor(HybridModel *model, uint32_t device_id, rtStream_t stream);


+ 17
- 16
ge/hybrid/executor/node_state.cc View File

@@ -98,6 +98,11 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex
break;
}

if (context.is_eos_) {
GELOGD("[%s] Await pending shape cancelled due to end of sequence", node_item.NodeName().c_str());
return END_OF_SEQUENCE;
}

if (context.GetStatus() != SUCCESS) {
GELOGE(FAILED, "[%s] Await pending shape cancelled", node_item.NodeName().c_str());
break;
@@ -114,7 +119,8 @@ Status ShapeInferenceState::AwaitShapesReady(const GraphExecutionContext &contex
auto idx = p.first;
auto &future = p.second;
RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] Start", idx);
auto src_tensor_desc = future.GetTensorDesc();
GeTensorDescPtr src_tensor_desc;
GE_CHK_STATUS_RET_NOLOG(future.GetTensorDesc(src_tensor_desc));
GE_CHECK_NOTNULL(src_tensor_desc);
RECORD_SHAPE_INFERENCE_EVENT(&context, node_item.NodeName().c_str(), "[AwaitShape] [idx = %u] End", idx);

@@ -156,10 +162,11 @@ Status NodeState::AwaitInputTensors(GraphExecutionContext &context) const {
node_item_->NodeName().c_str(),
"[AwaitNodeDone] [%s] Start",
src_node->GetName().c_str());
if (!subgraph_context_->Await(src_node)) {
GELOGE(INTERNAL_ERROR, "[%s] Await node [%s] failed.", GetName().c_str(), src_node->GetName().c_str());
return INTERNAL_ERROR;
}

HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node),
"[%s] Await node [%s] failed.",
GetName().c_str(),
src_node->GetName().c_str());

RECORD_EXECUTION_EVENT(&context,
node_item_->NodeName().c_str(),
@@ -183,24 +190,18 @@ Status NodeState::WaitForPrepareDone() {

Status ShapeFuture::Get(GeShape &ori_shape, GeShape &shape) {
GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
if (!subgraph_context_->Await(src_node_)) {
GELOGE(INTERNAL_ERROR, "cancelled");
return INTERNAL_ERROR;
}

HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled");
shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->MutableShape();
ori_shape = src_node_->GetOpDesc()->MutableOutputDesc(src_index_)->GetOriginShape();
GELOGD("Get shape from %s:%u. shape = [%s]", src_node_->GetName().c_str(), src_index_, shape.ToString().c_str());
return SUCCESS;
}

GeTensorDescPtr ShapeFuture::GetTensorDesc() {
Status ShapeFuture::GetTensorDesc(GeTensorDescPtr &tensor_desc) {
GELOGD("Start to wait node: %s for getting shape", src_node_->GetName().c_str());
if (!subgraph_context_->Await(src_node_)) {
GELOGE(INTERNAL_ERROR, "cancelled");
return nullptr;
}
return src_node_->GetOpDesc()->MutableOutputDesc(src_index_);
HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node_), "cancelled");
tensor_desc = src_node_->GetOpDesc()->MutableOutputDesc(src_index_);
return SUCCESS;
}
} // namespace hybrid
} // namespace ge

+ 1
- 1
ge/hybrid/executor/node_state.h View File

@@ -35,7 +35,7 @@ class ShapeFuture {
ShapeFuture(NodePtr src_node, uint32_t src_index, SubgraphContext *subgraph_context);
~ShapeFuture() = default;
Status Get(GeShape &ori_shape, GeShape &shape);
GeTensorDescPtr GetTensorDesc();
Status GetTensorDesc(GeTensorDescPtr &tensor_desc);

private:
NodePtr src_node_;


+ 16
- 5
ge/hybrid/executor/subgraph_context.cc View File

@@ -17,11 +17,12 @@
#include "subgraph_context.h"

#include "common/debug/log.h"
#include "hybrid/executor/hybrid_model_executor.h"

namespace ge {
namespace hybrid {
SubgraphContext::SubgraphContext(const GraphItem *graph_item) : graph_item_(graph_item) {
SubgraphContext::SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context)
: graph_item_(graph_item), execution_context_(execution_context) {
}

Status SubgraphContext::Init() {
@@ -111,12 +112,22 @@ Status SubgraphContext::GetOutputs(std::vector<TensorValue> &outputs) {
return SUCCESS;
}

bool SubgraphContext::Await(const NodePtr &node) {
return node_done_manager_.Await(node);
Status SubgraphContext::Await(const NodePtr &node) {
if (node_done_manager_.Await(node)) {
return SUCCESS;
}

if (execution_context_->is_eos_) {
return END_OF_SEQUENCE;
}

return FAILED;
}

void SubgraphContext::OnError(Status error) {
GELOGE(error, "[%s] Error occurred while executing graph.", graph_item_->GetName().c_str());
if (error != END_OF_SEQUENCE) {
GELOGE(error, "[%s] Error occurred while executing graph.", graph_item_->GetName().c_str());
}
node_done_manager_.Destroy();
}



+ 4
- 2
ge/hybrid/executor/subgraph_context.h View File

@@ -20,6 +20,7 @@
#include <vector>

#include "hybrid/common/tensor_value.h"
#include "hybrid/executor/hybrid_execution_context.h"
#include "hybrid/executor/node_state.h"
#include "hybrid/executor/node_done_manager.h"
#include "hybrid/model/graph_item.h"
@@ -29,7 +30,7 @@ namespace ge {
namespace hybrid {
class SubgraphContext {
public:
explicit SubgraphContext(const GraphItem *graph_item);
explicit SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context);
~SubgraphContext() = default;

Status Init();
@@ -43,12 +44,13 @@ class SubgraphContext {
Status GetInput(int index, TensorValue &tensor);
Status GetOutputs(std::vector<TensorValue> &outputs);

bool Await(const NodePtr &node);
Status Await(const NodePtr &node);
void NodeDone(const NodePtr &node);

private:
friend class TaskContext;
const GraphItem *graph_item_;
const GraphExecutionContext *execution_context_;
std::mutex mu_;
std::vector<TensorValue> all_inputs_;
std::vector<TensorValue> all_outputs_;


+ 45
- 39
ge/hybrid/executor/subgraph_executor.cc View File

@@ -40,7 +40,7 @@ SubgraphExecutor::~SubgraphExecutor() {

Status SubgraphExecutor::Init(const std::vector<TensorValue> &inputs,
const std::vector<ConstGeTensorDescPtr> &input_desc) {
subgraph_context_.reset(new(std::nothrow)SubgraphContext(graph_item_));
subgraph_context_.reset(new(std::nothrow)SubgraphContext(graph_item_, context_));
GE_CHECK_NOTNULL(subgraph_context_);
GE_CHK_STATUS_RET(subgraph_context_->Init(), "[%s] Failed to init subgraph context.", graph_item_->GetName().c_str());

@@ -139,7 +139,7 @@ Status SubgraphExecutor::ExecuteAsync(const std::vector<TensorValue> &inputs,
return ExecuteAsyncForKnownShape(inputs);
}

GE_CHK_STATUS_RET(ScheduleTasks(), "[%s] Failed to execute tasks.", graph_item_->GetName().c_str());
HYBRID_CHK_STATUS_RET(ScheduleTasks(), "[%s] Failed to execute tasks.", graph_item_->GetName().c_str());
GELOGD("[%s] Done executing subgraph successfully.", graph_item_->GetName().c_str());
return SUCCESS;
}
@@ -163,10 +163,10 @@ Status SubgraphExecutor::ExecuteAsyncForKnownShape(const std::vector<TensorValue
known_shape_task_context_ = TaskContext::Create(*node_item, context_, subgraph_context_.get());
GE_CHECK_NOTNULL(known_shape_task_context_);

GE_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, known_shape_task_context_, *context_),
"[%s] Failed to execute node [%s] for known subgraph.",
graph_item_->GetName().c_str(),
known_shape_task_context_->GetNodeName());
HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, known_shape_task_context_, *context_),
"[%s] Failed to execute node [%s] for known subgraph.",
graph_item_->GetName().c_str(),
known_shape_task_context_->GetNodeName());

GELOGD("[%s] Done execute non-dynamic subgraph successfully.", graph_item_->GetName().c_str());
return SUCCESS;
@@ -211,35 +211,34 @@ Status SubgraphExecutor::PrepareNodes() {
GE_CHECK_NOTNULL(node_state);
auto p_node_state = node_state.get();

if (node_item.node_type == NETOUTPUT) {
// Wait for all inputs become valid
// after PrepareNodes returned. all output tensors and shapes are valid
GE_CHK_STATUS_RET_NOLOG(p_node_state->GetShapeInferenceState().AwaitShapesReady(*context_));
GE_CHK_STATUS_RET_NOLOG(p_node_state->AwaitInputTensors(*context_));
continue;
}
if (node_item.node_type != NETOUTPUT) {
// only do shape inference and compilation for nodes with dynamic shapes.
if (node_item.is_dynamic) {
auto prepare_future = pre_run_pool_.commit([this, p_node_state]() -> Status {
GetContext().SetSessionId(context_->session_id);
GE_CHK_STATUS_RET_NOLOG(InferShape(shape_inference_engine_.get(), *p_node_state));
return PrepareForExecution(context_, *p_node_state);
});

// only do shape inference and compilation for nodes with dynamic shapes.
if (node_item.is_dynamic) {
auto prepare_future = pre_run_pool_.commit([this, p_node_state]() -> Status {
GetContext().SetSessionId(context_->session_id);
GE_CHK_STATUS_RET_NOLOG(InferShape(shape_inference_engine_.get(), *p_node_state));
return PrepareForExecution(context_, *p_node_state);
});

p_node_state->SetPrepareFuture(std::move(prepare_future));
} else {
GELOGD("[%s] Skipping shape inference and compilation for node with static shape.", node_item.NodeName().c_str());
if (node_item.kernel_task == nullptr) {
GELOGW("[%s] Node of static shape got no task.", node_item.NodeName().c_str());
GE_CHK_STATUS_RET(TaskCompileEngine::Compile(*p_node_state, context_),
"[%s] Failed to create task.", p_node_state->GetName().c_str());
p_node_state->SetPrepareFuture(std::move(prepare_future));
} else {
node_state->SetKernelTask(node_item.kernel_task);
GELOGD("[%s] Skipping shape inference and compilation for node with static shape.",
node_item.NodeName().c_str());
if (node_item.kernel_task == nullptr) {
GELOGW("[%s] Node of static shape got no task.", node_item.NodeName().c_str());
GE_CHK_STATUS_RET(TaskCompileEngine::Compile(*p_node_state, context_),
"[%s] Failed to create task.", p_node_state->GetName().c_str());
} else {
node_state->SetKernelTask(node_item.kernel_task);
}
}
}

if (!ready_queue_.Push(p_node_state)) {
if (context_->is_eos_) {
GELOGD("Got end of sequence");
return SUCCESS;
}
GELOGE(INTERNAL_ERROR, "[%s] Error occurs while launching tasks. quit from preparing nodes.",
graph_item_->GetName().c_str());
return INTERNAL_ERROR;
@@ -253,10 +252,10 @@ Status SubgraphExecutor::PrepareNodes() {

Status SubgraphExecutor::InferShape(ShapeInferenceEngine *shape_inference_engine, NodeState &node_state) {
const auto &node_item = *node_state.GetNodeItem();
GE_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state),
"[%s] Failed to InferShape.", node_state.GetName().c_str());
GE_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item),
"[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str());
HYBRID_CHK_STATUS_RET(shape_inference_engine->InferShape(node_state),
"[%s] Failed to InferShape.", node_state.GetName().c_str());
HYBRID_CHK_STATUS_RET(shape_inference_engine->PropagateOutputShapes(node_item),
"[%s] Failed to PropagateOutputShapes.", node_state.GetName().c_str());
return SUCCESS;
}

@@ -284,6 +283,15 @@ Status SubgraphExecutor::LaunchTasks() {
return SUCCESS;
}

if (node_state->GetType() == NETOUTPUT) {
// Wait for all inputs become valid
// after PrepareNodes returned. all output tensors and shapes are valid
GE_CHK_STATUS_RET_NOLOG(node_state->GetShapeInferenceState().AwaitShapesReady(*context_));
GE_CHK_STATUS_RET_NOLOG(node_state->AwaitInputTensors(*context_));
GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str());
continue;
}

GE_CHK_STATUS_RET_NOLOG(node_state->WaitForPrepareDone());

GELOGD("[%s] Start to execute.", node_state->GetName().c_str());
@@ -291,10 +299,9 @@ Status SubgraphExecutor::LaunchTasks() {
GE_CHECK_NOTNULL(task_context);
task_context->SetForceInferShape(force_infer_shape_);
auto shared_task_context = std::shared_ptr<TaskContext>(task_context.release());
GE_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, shared_task_context, *context_),
"[%s] Execute node failed.",
node_state->GetName().c_str());

HYBRID_CHK_STATUS_RET(ExecutionEngine::ExecuteAsync(*node_state, shared_task_context, *context_),
"[%s] Execute node failed.",
node_state->GetName().c_str());
GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str());
}
}
@@ -311,7 +318,6 @@ Status SubgraphExecutor::ScheduleTasks() {
GELOGD("[%s] Start to execute subgraph.", graph_item_->GetName().c_str());
auto ret = LaunchTasks();
if (ret != SUCCESS) {
GELOGE(ret, "[%s] Failed to execute subgraph.", graph_item_->GetName().c_str());
subgraph_context_->OnError(ret);
context_->SetErrorCode(ret);
ready_queue_.Stop();
@@ -350,7 +356,7 @@ Status SubgraphExecutor::GetOutputs(vector<TensorValue> &outputs, std::vector<Co

Status SubgraphExecutor::Synchronize() {
GELOGD("[%s] Synchronize start.", graph_item_->GetName().c_str());
GE_CHK_RT_RET(rtStreamSynchronize(context_->stream));
GE_CHK_STATUS_RET_NOLOG(context_->Synchronize(context_->stream));
GELOGD("[%s] Done synchronizing successfully.", graph_item_->GetName().c_str());
return SUCCESS;
}


+ 6
- 6
ge/hybrid/executor/worker/execution_engine.cc View File

@@ -408,9 +408,9 @@ Status ExecutionEngine::DoExecuteAsync(NodeState &node_state,

// Wait for dependent nodes(DEPEND_COMPUTE), so that the input tensors are valid.
RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[AwaitDependents] Start");
GE_CHK_STATUS_RET(node_state.AwaitInputTensors(context),
"[%s] Failed to wait for dependent nodes.",
node_state.GetName().c_str());
HYBRID_CHK_STATUS_RET(node_state.AwaitInputTensors(context),
"[%s] Failed to wait for dependent nodes.",
node_state.GetName().c_str());

const auto &node_item = *node_state.GetNodeItem();
auto executor = node_item.node_executor;
@@ -440,9 +440,9 @@ Status ExecutionEngine::DoExecuteAsync(NodeState &node_state,
});
}
RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] Start");
GE_CHK_STATUS_RET(node_item.node_executor->ExecuteTask(*task, task_context, callback),
"[%s] Failed to execute task",
node_state.GetName().c_str());
HYBRID_CHK_STATUS_RET(node_item.node_executor->ExecuteTask(*task, task_context, callback),
"[%s] Failed to execute task",
node_state.GetName().c_str());
RECORD_EXECUTION_EVENT(&context, task_context.GetNodeName(), "[ExecuteTask] End");

GELOGD("[%s] Done task launch successfully.", node_state.GetName().c_str());


+ 1
- 5
ge/hybrid/executor/worker/shape_inference_engine.cc View File

@@ -99,11 +99,7 @@ Status ShapeInferenceEngine::AwaitDependentNodes(NodeState &node_state) {
node_item.NodeName().c_str(),
"[AwaitNodeDone] [%s] Start",
src_node->GetName().c_str());
if (!subgraph_context_->Await(src_node)) {
GELOGE(INTERNAL_ERROR, "[%s] Await node failed.", src_node->GetName().c_str());
return INTERNAL_ERROR;
}

HYBRID_CHK_STATUS_RET(subgraph_context_->Await(src_node), "[%s] Await node failed.", src_node->GetName().c_str());
RECORD_SHAPE_INFERENCE_EVENT(execution_context_,
node_item.NodeName().c_str(),
"[AwaitNodeDone] [%s] End",


+ 11
- 2
ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc View File

@@ -19,6 +19,7 @@
#include "common/formats/formats.h"
#include "aicpu/common/aicpu_task_struct.h"
#include "graph/load/new_model_manager/model_manager.h"
#include "graph/utils/node_utils.h"
#include "hybrid/executor/hybrid_execution_context.h"
#include "hybrid/model/hybrid_model.h"
#include "opskernel_manager/ops_kernel_builder_manager.h"
@@ -187,7 +188,7 @@ Status AicpuNodeTaskBase::ExecuteAsync(TaskContext &context, std::function<void(
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[AicpuNodeTaskBaseExecuteAsync] Start");
GELOGD("Node[%s] execute async start. unknown_type=%d.", node_name_.c_str(), unknown_type_);

GE_CHK_STATUS_RET(LaunchTask(context));
HYBRID_CHK_STATUS_RET(LaunchTask(context), "[%s] Failed to launch task", node_name_.c_str());

uint32_t task_id = 0;
uint32_t stream_id = 0;
@@ -346,7 +347,11 @@ Status AicpuTfNodeTask::Init(const HybridModel &model) {
GE_CHK_RT_RET(rtMemcpy(kernel_buf_->GetData(), sizeof(STR_FWK_OP_KERNEL),
&fwk_op_kernel, sizeof(STR_FWK_OP_KERNEL),
RT_MEMCPY_HOST_TO_DEVICE));

auto node_type = NodeUtils::GetNodeType(node_item_->node);
if (node_type.find(GETNEXT) != string::npos) {
GELOGD("[%s] Is GetNext, set need sync to true, node type = %s", node_name_.c_str(), node_type.c_str());
need_sync_ = true;
}
GELOGI("Node[%s] init end.", node_name_.c_str());
return SUCCESS;
}
@@ -616,6 +621,10 @@ Status AicpuTfNodeTask::LaunchTask(TaskContext &context) {
GE_CHK_RT_RET(rtKernelLaunchEx(kernel_buf_->GetData(), kernel_buf_->GetSize(), flag, context.GetStream()));
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), node_name_.c_str(), "[AicpuTfNodertKernelLaunchEx] End");
GELOGD("Node[%s] launch end.", node_name_.c_str());
if (need_sync_) {
GELOGD("[%s] Task needs sync", node_name_.c_str());
GE_CHK_STATUS_RET_NOLOG(context.Synchronize());
}
return SUCCESS;
}



+ 1
- 0
ge/hybrid/node_executor/aicpu/aicpu_node_executor.h View File

@@ -144,6 +144,7 @@ class AicpuTfNodeTask : public AicpuNodeTaskBase {
std::unique_ptr<TensorBuffer> copy_input_data_size_dev_;
std::unique_ptr<TensorBuffer> copy_input_src_dev_;
std::unique_ptr<TensorBuffer> copy_input_dst_dev_;
bool need_sync_ = false;
};

class AicpuNodeTask : public AicpuNodeTaskBase {


+ 4
- 3
ge/hybrid/node_executor/node_executor.cc View File

@@ -20,6 +20,7 @@
#include "graph/utils/node_utils.h"
#include "init/gelib.h"
#include "graph/utils/tensor_utils.h"
#include "hybrid/executor/hybrid_execution_context.h"
#include "hybrid/model/hybrid_model.h"
#include "graph/debug/ge_attr_define.h"
#include "opskernel_manager/ops_kernel_builder_manager.h"
@@ -44,9 +45,9 @@ Status NodeExecutor::PrepareTask(NodeTask &task, TaskContext &context) const {
}

Status NodeExecutor::ExecuteTask(NodeTask &task, TaskContext &context, const std::function<void()> &callback) const {
GE_CHK_STATUS_RET(task.ExecuteAsync(context, callback),
"Failed to execute task. node = %s",
context.GetNodeItem().NodeName().c_str());
HYBRID_CHK_STATUS_RET(task.ExecuteAsync(context, callback),
"Failed to execute task. node = %s",
context.GetNodeItem().NodeName().c_str());
return SUCCESS;
}



+ 4
- 0
ge/hybrid/node_executor/task_context.cc View File

@@ -494,5 +494,9 @@ const DumpProperties &TaskContext::GetDumpProperties() const {
bool TaskContext::NeedCallback() {
return node_item_->has_observer || IsDumpEnabled() || execution_context_->profiling_level > 0;
}

Status TaskContext::Synchronize() {
return execution_context_->Synchronize(GetStream());
}
} // namespace hybrid
} // namespace ge

+ 2
- 0
ge/hybrid/node_executor/task_context.h View File

@@ -102,6 +102,8 @@ class TaskContext {
uint32_t GetStreamId() const;
void SetStreamId(uint32_t stream_id);

Status Synchronize();

bool IsForceInferShape() const;
void SetForceInferShape(bool force_infer_shape);
void *handle_ = nullptr;


Loading…
Cancel
Save