Browse Source

Handle EndOfSequence

tags/v1.2.0
chuxing 4 years ago
parent
commit
664634d070
12 changed files with 94 additions and 8 deletions
  1. +27
    -0
      ge/hybrid/executor/hybrid_execution_context.cc
  2. +2
    -0
      ge/hybrid/executor/hybrid_execution_context.h
  3. +2
    -2
      ge/hybrid/executor/hybrid_model_async_executor.cc
  4. +1
    -0
      ge/hybrid/executor/hybrid_model_executor.cc
  5. +1
    -0
      ge/hybrid/executor/hybrid_model_executor.h
  6. +10
    -1
      ge/hybrid/executor/subgraph_executor.cc
  7. +14
    -1
      ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
  8. +1
    -0
      ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
  9. +26
    -2
      ge/hybrid/node_executor/compiledsubgraph/known_node_executor.cc
  10. +4
    -2
      ge/hybrid/node_executor/compiledsubgraph/known_node_executor.h
  11. +4
    -0
      ge/hybrid/node_executor/task_context.cc
  12. +2
    -0
      ge/hybrid/node_executor/task_context.h

+ 27
- 0
ge/hybrid/executor/hybrid_execution_context.cc View File

@@ -18,6 +18,12 @@


namespace ge { namespace ge {
namespace hybrid { namespace hybrid {
namespace {
const uint32_t kEndOfSequence = 0x0704000a;
const uint32_t kEndOfSequenceNew = 507005;
const int32_t kModelAbortNormal = 0x0704000e;
const int32_t kModelAbortNormalNew = 507024;
} // namespace
void GraphExecutionContext::SetErrorCode(Status error_code) { void GraphExecutionContext::SetErrorCode(Status error_code) {
std::lock_guard<std::mutex> lk(mu); std::lock_guard<std::mutex> lk(mu);
this->status = error_code; this->status = error_code;
@@ -27,5 +33,26 @@ Status GraphExecutionContext::GetStatus() const {
std::lock_guard<std::mutex> lk(mu); std::lock_guard<std::mutex> lk(mu);
return this->status; return this->status;
} }

Status GraphExecutionContext::Synchronize(rtStream_t rt_stream) {
auto rt_ret = rtStreamSynchronize(rt_stream);
if (rt_ret == SUCCESS) {
return SUCCESS;
}

if (rt_ret == kEndOfSequence || rt_ret == kEndOfSequenceNew) {
GELOGI("Got end of sequence");
is_eos_ = true;
return SUCCESS;
}

if (rt_ret == kModelAbortNormal || rt_ret == kModelAbortNormalNew) {
GELOGI("The model with multiple datasets aborts normally");
return SUCCESS;
}

GELOGE(RT_FAILED, "Failed to invoke rtStreamSynchronize, ret = %d", rt_ret);
return RT_FAILED;
}
} // namespace hybrid } // namespace hybrid
} // namespace ge } // namespace ge

+ 2
- 0
ge/hybrid/executor/hybrid_execution_context.h View File

@@ -36,6 +36,7 @@ namespace hybrid {
struct GraphExecutionContext { struct GraphExecutionContext {
void SetErrorCode(Status error_code); void SetErrorCode(Status error_code);
Status GetStatus() const; Status GetStatus() const;
Status Synchronize(rtStream_t rt_stream);


uint64_t session_id = 0; uint64_t session_id = 0;
const HybridModel *model = nullptr; const HybridModel *model = nullptr;
@@ -49,6 +50,7 @@ struct GraphExecutionContext {
DumpProperties dump_properties; DumpProperties dump_properties;
bool trace_enabled = false; bool trace_enabled = false;
bool dump_enabled = false; bool dump_enabled = false;
std::atomic_bool is_eos_;
long profiling_level = 0; long profiling_level = 0;
long iteration = 0; long iteration = 0;
Status status = SUCCESS; Status status = SUCCESS;


+ 2
- 2
ge/hybrid/executor/hybrid_model_async_executor.cc View File

@@ -24,7 +24,7 @@
namespace ge { namespace ge {
namespace hybrid { namespace hybrid {
namespace { namespace {
int kDataOutputIndex = 0;
const int kDataOutputIndex = 0;
} }
HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model) HybridModelAsyncExecutor::HybridModelAsyncExecutor(HybridModel *model)
: model_(model), run_flag_(false) { : model_(model), run_flag_(false) {
@@ -162,7 +162,7 @@ Status HybridModelAsyncExecutor::HandleResult(Status exec_ret,
OutputData *output_data) { OutputData *output_data) {
GELOGD("Start to handle result. model id = %u, data index = %u, execution ret = %u", model_id_, data_id, exec_ret); GELOGD("Start to handle result. model id = %u, data index = %u, execution ret = %u", model_id_, data_id, exec_ret);
std::vector<ge::OutputTensorInfo> output_tensor_info_list; std::vector<ge::OutputTensorInfo> output_tensor_info_list;
if (exec_ret == END_OF_SEQUENCE) {
if (args.is_eos) {
GELOGW("End of sequence, model id = %u", model_id_); GELOGW("End of sequence, model id = %u", model_id_);
return OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list); return OnComputeDone(data_id, END_OF_SEQUENCE, output_tensor_info_list);
} }


+ 1
- 0
ge/hybrid/executor/hybrid_model_executor.cc View File

@@ -58,6 +58,7 @@ Status HybridModelExecutor::Execute(HybridModelExecutor::ExecuteArgs &args) {
context_.profiler->Reset(); context_.profiler->Reset();
} }


args.is_eos = context_.is_eos_;
context_.iteration += 1; context_.iteration += 1;
return SUCCESS; return SUCCESS;
} }


+ 1
- 0
ge/hybrid/executor/hybrid_model_executor.h View File

@@ -31,6 +31,7 @@ class HybridModelExecutor {
std::vector<ConstGeTensorDescPtr> input_desc; std::vector<ConstGeTensorDescPtr> input_desc;
std::vector<TensorValue> outputs; std::vector<TensorValue> outputs;
std::vector<ConstGeTensorDescPtr> output_desc; std::vector<ConstGeTensorDescPtr> output_desc;
bool is_eos = false;
}; };


HybridModelExecutor(HybridModel *model, uint32_t device_id, rtStream_t stream); HybridModelExecutor(HybridModel *model, uint32_t device_id, rtStream_t stream);


+ 10
- 1
ge/hybrid/executor/subgraph_executor.cc View File

@@ -240,6 +240,10 @@ Status SubgraphExecutor::PrepareNodes() {
} }


if (!ready_queue_.Push(p_node_state)) { if (!ready_queue_.Push(p_node_state)) {
if (context_->is_eos_) {
GELOGD("Got end of sequence");
return SUCCESS;
}
GELOGE(INTERNAL_ERROR, "[%s] Error occurs while launching tasks. quit from preparing nodes.", GELOGE(INTERNAL_ERROR, "[%s] Error occurs while launching tasks. quit from preparing nodes.",
graph_item_->GetName().c_str()); graph_item_->GetName().c_str());
return INTERNAL_ERROR; return INTERNAL_ERROR;
@@ -295,6 +299,11 @@ Status SubgraphExecutor::LaunchTasks() {
"[%s] Execute node failed.", "[%s] Execute node failed.",
node_state->GetName().c_str()); node_state->GetName().c_str());


if (context_->is_eos_) {
GELOGD("Got end of sequence");
ready_queue_.Stop();
return SUCCESS;
}
GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str()); GELOGD("[%s] Done executing node successfully.", node_state->GetName().c_str());
} }
} }
@@ -350,7 +359,7 @@ Status SubgraphExecutor::GetOutputs(vector<TensorValue> &outputs, std::vector<Co


Status SubgraphExecutor::Synchronize() { Status SubgraphExecutor::Synchronize() {
GELOGD("[%s] Synchronize start.", graph_item_->GetName().c_str()); GELOGD("[%s] Synchronize start.", graph_item_->GetName().c_str());
GE_CHK_RT_RET(rtStreamSynchronize(context_->stream));
GE_CHK_STATUS_RET_NOLOG(context_->Synchronize(context_->stream));
GELOGD("[%s] Done synchronizing successfully.", graph_item_->GetName().c_str()); GELOGD("[%s] Done synchronizing successfully.", graph_item_->GetName().c_str());
return SUCCESS; return SUCCESS;
} }


+ 14
- 1
ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc View File

@@ -19,6 +19,7 @@
#include "common/formats/formats.h" #include "common/formats/formats.h"
#include "aicpu/common/aicpu_task_struct.h" #include "aicpu/common/aicpu_task_struct.h"
#include "graph/load/new_model_manager/model_manager.h" #include "graph/load/new_model_manager/model_manager.h"
#include "graph/utils/node_utils.h"
#include "hybrid/executor/hybrid_execution_context.h" #include "hybrid/executor/hybrid_execution_context.h"
#include "hybrid/model/hybrid_model.h" #include "hybrid/model/hybrid_model.h"
#include "opskernel_manager/ops_kernel_builder_manager.h" #include "opskernel_manager/ops_kernel_builder_manager.h"
@@ -188,6 +189,10 @@ Status AicpuNodeTaskBase::ExecuteAsync(TaskContext &context, std::function<void(
GELOGD("Node[%s] execute async start. unknown_type=%d.", node_name_.c_str(), unknown_type_); GELOGD("Node[%s] execute async start. unknown_type=%d.", node_name_.c_str(), unknown_type_);


GE_CHK_STATUS_RET(LaunchTask(context)); GE_CHK_STATUS_RET(LaunchTask(context));
if (context.GetExecutionContext()->is_eos_) {
GELOGD("[%s] Got end of sequence", node_name_.c_str());
return SUCCESS;
}


uint32_t task_id = 0; uint32_t task_id = 0;
uint32_t stream_id = 0; uint32_t stream_id = 0;
@@ -346,7 +351,11 @@ Status AicpuTfNodeTask::Init(const HybridModel &model) {
GE_CHK_RT_RET(rtMemcpy(kernel_buf_->GetData(), sizeof(STR_FWK_OP_KERNEL), GE_CHK_RT_RET(rtMemcpy(kernel_buf_->GetData(), sizeof(STR_FWK_OP_KERNEL),
&fwk_op_kernel, sizeof(STR_FWK_OP_KERNEL), &fwk_op_kernel, sizeof(STR_FWK_OP_KERNEL),
RT_MEMCPY_HOST_TO_DEVICE)); RT_MEMCPY_HOST_TO_DEVICE));

auto node_type = NodeUtils::GetNodeType(node_item_->node);
if (node_type.find(GETNEXT) != string::npos) {
GELOGD("[%s] Is GetNext, set need sync to true, node type = %s", node_name_.c_str(), node_type.c_str());
need_sync_ = true;
}
GELOGI("Node[%s] init end.", node_name_.c_str()); GELOGI("Node[%s] init end.", node_name_.c_str());
return SUCCESS; return SUCCESS;
} }
@@ -616,6 +625,10 @@ Status AicpuTfNodeTask::LaunchTask(TaskContext &context) {
GE_CHK_RT_RET(rtKernelLaunchEx(kernel_buf_->GetData(), kernel_buf_->GetSize(), flag, context.GetStream())); GE_CHK_RT_RET(rtKernelLaunchEx(kernel_buf_->GetData(), kernel_buf_->GetSize(), flag, context.GetStream()));
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), node_name_.c_str(), "[AicpuTfNodertKernelLaunchEx] End"); RECORD_EXECUTION_EVENT(context.GetExecutionContext(), node_name_.c_str(), "[AicpuTfNodertKernelLaunchEx] End");
GELOGD("Node[%s] launch end.", node_name_.c_str()); GELOGD("Node[%s] launch end.", node_name_.c_str());
if (need_sync_) {
GELOGD("[%s] Task needs sync", node_name_.c_str());
GE_CHK_STATUS_RET_NOLOG(context.Synchronize());
}
return SUCCESS; return SUCCESS;
} }




+ 1
- 0
ge/hybrid/node_executor/aicpu/aicpu_node_executor.h View File

@@ -144,6 +144,7 @@ class AicpuTfNodeTask : public AicpuNodeTaskBase {
std::unique_ptr<TensorBuffer> copy_input_data_size_dev_; std::unique_ptr<TensorBuffer> copy_input_data_size_dev_;
std::unique_ptr<TensorBuffer> copy_input_src_dev_; std::unique_ptr<TensorBuffer> copy_input_src_dev_;
std::unique_ptr<TensorBuffer> copy_input_dst_dev_; std::unique_ptr<TensorBuffer> copy_input_dst_dev_;
bool need_sync_ = false;
}; };


class AicpuNodeTask : public AicpuNodeTaskBase { class AicpuNodeTask : public AicpuNodeTaskBase {


+ 26
- 2
ge/hybrid/node_executor/compiledsubgraph/known_node_executor.cc View File

@@ -21,6 +21,8 @@
#include "common/ge/ge_util.h" #include "common/ge/ge_util.h"
#include "graph/attr_value.h" #include "graph/attr_value.h"
#include "graph/debug/ge_attr_define.h" #include "graph/debug/ge_attr_define.h"
#include "graph/utils/graph_utils.h"
#include "graph/utils/node_utils.h"
#include "graph/load/new_model_manager/model_utils.h" #include "graph/load/new_model_manager/model_utils.h"
#include "graph/load/new_model_manager/model_manager.h" #include "graph/load/new_model_manager/model_manager.h"
#include "hybrid/executor/hybrid_execution_context.h" #include "hybrid/executor/hybrid_execution_context.h"
@@ -29,7 +31,7 @@ namespace ge {
namespace hybrid { namespace hybrid {
REGISTER_NODE_EXECUTOR_BUILDER(NodeExecutorManager::ExecutorType::COMPILED_SUBGRAPH, KnownNodeExecutor); REGISTER_NODE_EXECUTOR_BUILDER(NodeExecutorManager::ExecutorType::COMPILED_SUBGRAPH, KnownNodeExecutor);


Status KnownNodeTask:: ExecuteAsync(TaskContext &context, std::function<void()> done_callback) {
Status KnownNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> done_callback) {
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeTaskExecuteAsync] Start"); RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeTaskExecuteAsync] Start");
GELOGD("[%s] KnownNodeTask::ExecuteAsync in.", context.GetNodeName()); GELOGD("[%s] KnownNodeTask::ExecuteAsync in.", context.GetNodeName());
if (davinci_model_->GetTaskList().empty()) { if (davinci_model_->GetTaskList().empty()) {
@@ -58,6 +60,10 @@ Status KnownNodeTask:: ExecuteAsync(TaskContext &context, std::function<void()
GELOGE(rt_ret, "rtModelExecute error, ret: hybrid_model_executorOx%X", rt_ret); return FAILED;); GELOGE(rt_ret, "rtModelExecute error, ret: hybrid_model_executorOx%X", rt_ret); return FAILED;);
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodertModelExecute] End"); RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodertModelExecute] End");


if (need_sync_) {
GELOGD("[%s] model need sync", context.GetNodeName());
GE_CHK_STATUS_RET_NOLOG(context.Synchronize());
}
GE_CHK_STATUS_RET_NOLOG(context.RegisterCallback(done_callback)); GE_CHK_STATUS_RET_NOLOG(context.RegisterCallback(done_callback));
GELOGD("[%s] KnownNodeTask::ExecuteAsync success.", context.GetNodeName()); GELOGD("[%s] KnownNodeTask::ExecuteAsync success.", context.GetNodeName());
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeTaskExecuteAsync] End"); RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeTaskExecuteAsync] End");
@@ -171,7 +177,9 @@ Status KnownNodeExecutor::LoadTask(const HybridModel &model, const NodePtr &node


GE_CHK_STATUS_RET(davinci_model->Assign(ge_model), "KnownNodeExecutor::LoadTask davincimodel assign failed."); GE_CHK_STATUS_RET(davinci_model->Assign(ge_model), "KnownNodeExecutor::LoadTask davincimodel assign failed.");


task = MakeShared<KnownNodeTask>(davinci_model);
bool need_sync = false;
GE_CHK_STATUS_RET_NOLOG(NeedSync(*ge_model, need_sync));
task = MakeShared<KnownNodeTask>(davinci_model, need_sync);
GE_CHECK_NOTNULL(task); GE_CHECK_NOTNULL(task);
GELOGI("[%s] KnownNodeExecutor::LoadTask success.", node->GetName().c_str()); GELOGI("[%s] KnownNodeExecutor::LoadTask success.", node->GetName().c_str());
return SUCCESS; return SUCCESS;
@@ -186,5 +194,21 @@ Status KnownNodeExecutor::ExecuteTask(NodeTask &task, TaskContext &context,
RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeExecutorExecuteTask] End"); RECORD_EXECUTION_EVENT(context.GetExecutionContext(), context.GetNodeName(), "[KnownNodeExecutorExecuteTask] End");
return SUCCESS; return SUCCESS;
} }

Status KnownNodeExecutor::NeedSync(GeModel &ge_model, bool &need_sync) {
auto compute_graph = GraphUtils::GetComputeGraph(ge_model.GetGraph());
GE_CHECK_NOTNULL(compute_graph);
for (auto &node : compute_graph->GetAllNodes()) {
auto type = NodeUtils::GetNodeType(node);
if (type == GETNEXT) {
GELOGD("Contains GetNext node: %s", node->GetName().c_str());
need_sync = true;
return SUCCESS;
}
}

need_sync = false;
return SUCCESS;
}
} // namespace hybrid } // namespace hybrid
} // namespace ge } // namespace ge

+ 4
- 2
ge/hybrid/node_executor/compiledsubgraph/known_node_executor.h View File

@@ -27,8 +27,8 @@ class HybridModel;


class KnownNodeTask : public NodeTask { class KnownNodeTask : public NodeTask {
public: public:
explicit KnownNodeTask(std::shared_ptr<DavinciModel> davinci_model)
: davinci_model_(davinci_model)
explicit KnownNodeTask(std::shared_ptr<DavinciModel> davinci_model, bool need_sync)
: davinci_model_(davinci_model), need_sync_(need_sync)
{} {}


~KnownNodeTask() {} ~KnownNodeTask() {}
@@ -39,6 +39,7 @@ class KnownNodeTask : public NodeTask {
private: private:
std::shared_ptr<DavinciModel> davinci_model_ = nullptr; std::shared_ptr<DavinciModel> davinci_model_ = nullptr;
bool load_flag_ = false; bool load_flag_ = false;
bool need_sync_;
}; };


class KnownNodeExecutor : public NodeExecutor { class KnownNodeExecutor : public NodeExecutor {
@@ -48,6 +49,7 @@ class KnownNodeExecutor : public NodeExecutor {
Status ExecuteTask(NodeTask &task, TaskContext &context, const std::function<void()> &callback) const; Status ExecuteTask(NodeTask &task, TaskContext &context, const std::function<void()> &callback) const;
~KnownNodeExecutor() {} ~KnownNodeExecutor() {}
private: private:
static Status NeedSync(GeModel &ge_model, bool &need_sync);
std::shared_ptr<DavinciModel> davinci_model_ = nullptr; std::shared_ptr<DavinciModel> davinci_model_ = nullptr;
}; };
} // namespace hybrid } // namespace hybrid


+ 4
- 0
ge/hybrid/node_executor/task_context.cc View File

@@ -494,5 +494,9 @@ const DumpProperties &TaskContext::GetDumpProperties() const {
bool TaskContext::NeedCallback() { bool TaskContext::NeedCallback() {
return node_item_->has_observer || IsDumpEnabled() || execution_context_->profiling_level > 0; return node_item_->has_observer || IsDumpEnabled() || execution_context_->profiling_level > 0;
} }

Status TaskContext::Synchronize() {
return execution_context_->Synchronize(GetStream());
}
} // namespace hybrid } // namespace hybrid
} // namespace ge } // namespace ge

+ 2
- 0
ge/hybrid/node_executor/task_context.h View File

@@ -102,6 +102,8 @@ class TaskContext {
uint32_t GetStreamId() const; uint32_t GetStreamId() const;
void SetStreamId(uint32_t stream_id); void SetStreamId(uint32_t stream_id);


Status Synchronize();

bool IsForceInferShape() const; bool IsForceInferShape() const;
void SetForceInferShape(bool force_infer_shape); void SetForceInferShape(bool force_infer_shape);
void *handle_ = nullptr; void *handle_ = nullptr;


Loading…
Cancel
Save