Browse Source

Add RunGraphWithStreamAsync: run a graph asynchronously on a user-specified stream.

tags/v1.3.0
zhaozhixuan 3 years ago
parent
commit
3723f68b83
19 changed files with 545 additions and 2 deletions
  1. +41
    -0
      ge/client/ge_api.cc
  2. +66
    -0
      ge/graph/execute/graph_execute.cc
  3. +9
    -0
      ge/graph/execute/graph_execute.h
  4. +10
    -0
      ge/graph/load/graph_loader.cc
  5. +73
    -0
      ge/graph/manager/graph_manager.cc
  6. +16
    -0
      ge/graph/manager/graph_manager.h
  7. +1
    -0
      ge/graph/manager/graph_manager_utils.cc
  8. +3
    -0
      ge/graph/manager/graph_manager_utils.h
  9. +5
    -0
      ge/model/ge_root_model.h
  10. +45
    -0
      ge/session/inner_session.cc
  11. +3
    -0
      ge/session/inner_session.h
  12. +27
    -0
      ge/session/session_manager.cc
  13. +14
    -0
      ge/session/session_manager.h
  14. +12
    -0
      inc/external/ge/ge_api.h
  15. +1
    -1
      metadef
  16. +2
    -0
      tests/ut/ge/CMakeLists.txt
  17. +99
    -1
      tests/ut/ge/graph/ge_executor_unittest.cc
  18. +60
    -0
      tests/ut/ge/graph/manager/run_graph_unittest.cc
  19. +58
    -0
      tests/ut/ge/session/ge_api_unittest.cc

+ 41
- 0
ge/client/ge_api.cc View File

@@ -598,6 +598,47 @@ Status Session::RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, s
return ret; return ret;
} }


// Run Graph with stream Asynchronously
// Runs a previously added graph on a caller-supplied stream without blocking.
// @param graph_id  id of a graph already added to this session
// @param stream    caller-owned stream (rtStream_t passed as void *); lifetime managed by the caller
// @param inputs    input tensors
// @param outputs   output tensors, filled on success
// @return SUCCESS, or FAILED when GE is uninitialized or execution fails
// NOTE: log/error texts below fix the original misspelling "asyn" -> "async".
Status Session::RunGraphWithStreamAsync(uint32_t graph_id, void *stream, const std::vector<Tensor> &inputs,
                                        std::vector<Tensor> &outputs) {
  ErrorManager::GetInstance().SetStage(ErrorMessage::kModelCompile, ErrorMessage::kOther);
  GELOGT(TRACE_INIT, "Session run graph with stream async start");

  ErrorManager::GetInstance().GenWorkStreamIdBySessionGraph(sessionId_, graph_id);
  std::shared_ptr<GELib> instance_ptr = ge::GELib::GetInstance();
  if (instance_ptr == nullptr) {
    GELOGE(GE_CLI_GE_NOT_INITIALIZED,
           "[Run][Graph]Run graph with stream async failed, the GELib instance is nullptr,"
           "session id = %lu, graph id = %u, stream = %p.", sessionId_, graph_id, stream);
    REPORT_INNER_ERROR("E19999",
                       "Run graph with stream async failed, the GELib instance is nullptr,"
                       "session id = %lu, graph id = %u, stream = %p.", sessionId_, graph_id, stream);
    return FAILED;
  }
  if (!instance_ptr->InitFlag()) {
    GELOGE(GE_CLI_GE_NOT_INITIALIZED,
           "[Run][Graph]Run graph with stream async failed, the GELib instance is not init,"
           "session id = %lu, graph id = %u, stream = %p.", sessionId_, graph_id, stream);
    REPORT_INNER_ERROR("E19999",
                       "Run graph with stream async failed, the GELib instance is not init,"
                       "session id = %lu, graph id = %u, stream = %p.", sessionId_, graph_id, stream);
    return FAILED;
  }
  GELOGT(TRACE_RUNNING, "Run graph with stream async.");
  Status ret = instance_ptr->SessionManagerObj().RunGraphWithStreamAsync(sessionId_, graph_id, stream, inputs,
                                                                         outputs);
  if (ret != SUCCESS) {
    GELOGE(ret, "[Run][Graph]Run graph with stream async failed,"
           "error code = %u, session id = %lu, graph id = %u, stream = %p.", ret, sessionId_, graph_id, stream);
    REPORT_CALL_ERROR("E19999", "[Run][Graph]Run graph with stream async failed, error code = %u, session id = %lu,"
                      "graph id = %u, stream = %p.", ret, sessionId_, graph_id, stream);
    return FAILED;
  }

  GELOGT(TRACE_STOP, "Session run graph with stream async finished");
  return SUCCESS;
}

// Register Call Back // Register Call Back
Status Session::RegisterCallBackFunc(const std::string &key, const pCallBackFunc &callback) { Status Session::RegisterCallBackFunc(const std::string &key, const pCallBackFunc &callback) {
ErrorManager::GetInstance().GenWorkStreamIdDefault(); ErrorManager::GetInstance().GenWorkStreamIdDefault();


+ 66
- 0
ge/graph/execute/graph_execute.cc View File

@@ -403,6 +403,72 @@ Status GraphExecutor::ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &
return SUCCESS; return SUCCESS;
} }


// Flatten GeTensors into raw DataBuffer blobs and collect their descriptors.
// The buffers alias the tensors' internal storage (no copy); callers must keep
// input_tensor alive while the blobs are in use.
Status GraphExecutor::GetExecuteData(const std::vector<GeTensor> &input_tensor, std::vector<DataBuffer> &blobs,
                                     std::vector<GeTensorDesc> &tensor_desc) {
  for (const auto &ge_tensor : input_tensor) {
    // check placement
    DataBuffer buffer;
    buffer.length = ge_tensor.GetData().size();
    buffer.data = const_cast<uint8_t *>(ge_tensor.GetData().data());
    buffer.isDataSupportMemShare = false;
    tensor_desc.emplace_back(ge_tensor.GetTensorDesc());
    blobs.emplace_back(buffer);
  }
  return SUCCESS;
}

// Execute a loaded graph asynchronously on a caller-provided stream.
// @param graph_id      id of the graph to execute
// @param stream        caller-owned stream the model execution is issued on
// @param ge_root_model loaded root model; supplies the model id
// @param input_tensor  input tensors, wrapped into DataBuffers without copying
// @param output_tensor pre-allocated output tensors, wrapped the same way
// @return SUCCESS, or the error status of the first failing step
Status GraphExecutor::ExecuteGraphWithStream(GraphId graph_id,
                                             rtStream_t stream,
                                             const GeRootModelPtr &ge_root_model,
                                             const std::vector<GeTensor> &input_tensor,
                                             std::vector<GeTensor> &output_tensor) {
  GELOGI("[GraphExecutor] Start to execute graph with stream, graph id = %u, stream = %p.", graph_id, stream);
  // SetCondition() must have run first; it sets init_flag_.
  if (!init_flag_) {
    REPORT_INNER_ERROR("E19999", "No SetCondition called before, graph id = %u, stream = %p, check invalid.",
                       graph_id, stream);
    GELOGE(GE_GRAPH_EXECUTE_NOT_INIT, "[GraphExecutor] AI Core Engine without calling SetCondition!");
    return GE_GRAPH_EXECUTE_NOT_INIT;
  }

  // Switching to a different graph: release memory held for the previous one
  // before recording the new id.
  if (graph_id != last_graph_id_) {
    auto ret = FreeExecuteMemory();
    if (ret != SUCCESS) {
      return ret;
    }
  }
  last_graph_id_ = graph_id;

  GE_CHECK_NOTNULL_EXEC(ge_root_model, return FAILED);
  auto model_id = ge_root_model->GetModelId();
  // Package inputs as blobs + descriptors for the model manager.
  InputData input_data;
  input_data.index = 0;
  input_data.model_id = model_id;
  std::vector<GeTensorDesc> input_desc;
  auto ret = GetExecuteData(input_tensor, input_data.blobs, input_desc);
  if (ret != SUCCESS) {
    return ret;
  }
  // Outputs are caller-allocated; package them the same way.
  OutputData output_data;
  output_data.index = 0;
  output_data.model_id = model_id;
  std::vector<GeTensorDesc> output_desc;
  ret = GetExecuteData(output_tensor, output_data.blobs, output_desc);
  if (ret != SUCCESS) {
    return ret;
  }

  // Issue the model on the caller's stream in async mode; no blocking wait here.
  auto async_mode = true;
  auto model_manager = ge::ModelManager::GetInstance();
  GE_CHECK_NOTNULL(model_manager);
  ret = model_manager->ExecuteModel(model_id, stream, async_mode, input_data, input_desc, output_data, output_desc);
  if (ret != SUCCESS) {
    return ret;
  }

  GELOGI("[GraphExecutor] Async execute graph with stream success graph id = %u, stream = %p.", graph_id, stream);
  return SUCCESS;
}

bool CompareByLoad(const Uint32Pair &lhs, const Uint32Pair &rhs) { bool CompareByLoad(const Uint32Pair &lhs, const Uint32Pair &rhs) {
return lhs.second < rhs.second; return lhs.second < rhs.second;
} }


+ 9
- 0
ge/graph/execute/graph_execute.h View File

@@ -52,6 +52,12 @@ class GraphExecutor {
ge::Status ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model, ge::Status ExecuteGraphAsync(GraphId graph_id, const GeRootModelPtr &ge_root_model,
const std::vector<InputTensorInfo> &input_tensor, const RunAsyncCallback &callback); const std::vector<InputTensorInfo> &input_tensor, const RunAsyncCallback &callback);


Status ExecuteGraphWithStream(GraphId graph_id,
rtStream_t stream,
const GeRootModelPtr &ge_root_model,
const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor);

Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr<GraphModelListener> listener); Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr<GraphModelListener> listener);


Status SetGraphContext(GraphContextPtr graph_context_ptr); Status SetGraphContext(GraphContextPtr graph_context_ptr);
@@ -125,6 +131,9 @@ class GraphExecutor {
Status PrepareInputData(const std::vector<GeTensor> &input_tensor, InputData &graph_input_data, Status PrepareInputData(const std::vector<GeTensor> &input_tensor, InputData &graph_input_data,
OutputData &graph_output_data, std::vector<InputOutputDescInfo> &output_desc); OutputData &graph_output_data, std::vector<InputOutputDescInfo> &output_desc);


Status GetExecuteData(const std::vector<GeTensor> &input_tensor, std::vector<DataBuffer> &blobs,
std::vector<GeTensorDesc> &tensor_desc);

Status SyncExecuteModel(uint32_t model_id, const std::vector<GeTensor> &input_tensor, Status SyncExecuteModel(uint32_t model_id, const std::vector<GeTensor> &input_tensor,
std::vector<GeTensor> &output_tensor); std::vector<GeTensor> &output_tensor);




+ 10
- 0
ge/graph/load/graph_loader.cc View File

@@ -75,6 +75,16 @@ Status GraphLoader::LoadModelOnline(uint32_t &model_id, const std::shared_ptr<ge
return ret; return ret;
} }


if (ge_root_model_ptr->IsSpecificStream()) {
GELOGI("No need to start a new thread to run model in specific scene.");
rt_ret = rtDeviceReset(GetContext().DeviceId());
if (rt_ret != RT_ERROR_NONE) {
REPORT_CALL_ERROR("E19999", "Call rtDeviceReset failed, device_id:%u, ret:0x%X",
GetContext().DeviceId(), rt_ret);
GELOGE(RT_FAILED, "Call rt api failed, ret: 0x%X", rt_ret);
}
return SUCCESS;
}
ret = model_manager->Start(model_id); ret = model_manager->Start(model_id);
if (ret != SUCCESS) { if (ret != SUCCESS) {
if (model_manager->Unload(model_id) != SUCCESS) { if (model_manager->Unload(model_id) != SUCCESS) {


+ 73
- 0
ge/graph/manager/graph_manager.cc View File

@@ -1109,6 +1109,7 @@ Status GraphManager::LoadGraph(const GeRootModelPtr &ge_root_model, const GraphN
GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node)); GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node));
} }
} }
ge_root_model->SetIsSpecificStream(graph_node->IsSpecificStream());
GE_TIMESTAMP_START(LoadGraph); GE_TIMESTAMP_START(LoadGraph);
Status ret = GraphLoader::LoadModelOnline(model_id_info.model_id, ge_root_model, model_listener); Status ret = GraphLoader::LoadModelOnline(model_id_info.model_id, ge_root_model, model_listener);
GE_TIMESTAMP_EVENT_END(LoadGraph, "GraphManager::LoadGraph"); GE_TIMESTAMP_EVENT_END(LoadGraph, "GraphManager::LoadGraph");
@@ -1232,6 +1233,78 @@ Status GraphManager::InnerRunGraph(GraphNodePtr &graph_node, const GraphId &grap
return SUCCESS; return SUCCESS;
} }


// Bind the run listener/condition to the graph executor, then execute the graph
// on the given stream. The node's run and specific-stream flags are cleared as
// soon as execution returns, success or failure.
Status GraphManager::InnerRunGraphWithStream(GraphNodePtr &graph_node, const GraphId &graph_id, rtStream_t stream,
                                             const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs) {
  Status status = graph_executor_.SetCondition(&sync_run_mutex_, &condition_, graph_run_listener_);
  if (status != SUCCESS) {
    GELOGE(GE_GRAPH_RUNGRAPH_FAILED, "[Run][GraphWithStreamAsync] set condition failed, "
           "graph id = %u, stream = %p.", graph_id, stream);
    graph_node->SetRunFlag(false);
    return GE_GRAPH_RUNGRAPH_FAILED;
  }

  status = graph_executor_.ExecuteGraphWithStream(graph_id, stream, graph_node->GetGeRootModel(), inputs, outputs);
  graph_node->SetRunFlag(false);
  graph_node->SetIsSpecificStream(false);
  if (status == SUCCESS) {
    GELOGI("[Run][GraphWithStreamAsync] run graph success, graph id = %u, stream = %p.", graph_id, stream);
    return SUCCESS;
  }
  GELOGE(status, "[Run][GraphWithStreamAsync] execute graph failed, graph id = %u, stream = %p.", graph_id, stream);
  return status;
}

// Run a graph asynchronously on a caller-provided stream. Looks up the graph
// node, compiles/loads it on first use (StartForRunGraph), then hands off to
// InnerRunGraphWithStream for execution.
// @param graph_id   id of a previously added graph
// @param stream     caller-owned stream execution is issued on
// @param session_id owning session id
// @param inputs     input tensors (may be empty for initialize-only subgraphs)
// @param outputs    output tensors, filled on success
// @return Status    SUCCESS or the error of the first failing stage
Status GraphManager::RunGraphWithStreamAsync(const GraphId &graph_id, rtStream_t stream, uint64_t session_id,
                                             const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs) {
  // Use error_message:: here for consistency with RunGraph and the rest of this file.
  ErrorManager::GetInstance().SetStage(error_message::kModelCompile, error_message::kOther);
  std::lock_guard<std::mutex> lock(run_mutex_);
  GELOGI("Start to run graph with stream async, graph id = %u, stream = %p.", graph_id, stream);

  if (inputs.empty()) {
    GELOGI("Run graph with stream async, initialize sub graph has no inputs.");
  }

  // find graph
  GraphNodePtr graph_node = nullptr;
  Status ret = GetGraphNode(graph_id, graph_node);
  if (ret != SUCCESS) {
    REPORT_INNER_ERROR("E19999", "graph id = %u not exist in graph_map, check invalid.", graph_id);
    GELOGE(ret, "Run graph with stream async graph not exist, graph id = %u.", graph_id);
    return ret;
  }
  if (graph_node == nullptr) {
    REPORT_INNER_ERROR("E19999", "Graph node is nullptr in graph_map, graph id = %u, check invalid.", graph_id);
    GELOGE(GE_GRAPH_GRAPH_NODE_NULL, "Run graph with stream async graph node is NULL, graph id = %u.", graph_id);
    return GE_GRAPH_GRAPH_NODE_NULL;
  }
  // A graph may not be executed twice concurrently.
  if (graph_node->GetRunFlag()) {
    REPORT_INNER_ERROR("E19999", "Graph is already running, can't be run again, graph id = %u, "
                       "check invalid.", graph_id);
    GELOGE(GE_GRAPH_ALREADY_RUNNING, "Run graph with stream async graph already running, graph id = %u.", graph_id);
    return GE_GRAPH_ALREADY_RUNNING;
  }

  UpdateLocalOmgContext(graph_id);
  // set graph's run flag; mark it so loading skips starting a model thread
  // (see GraphLoader::LoadModelOnline handling of IsSpecificStream).
  graph_node->SetRunFlag(true);
  graph_node->SetIsSpecificStream(true);
  ComputeGraphPtr compute_graph_tmp = GraphUtils::GetComputeGraph(*(graph_node->GetGraph()));

  // when set incre build, add cache helper map
  AddModelCacheHelperToMap(graph_id, session_id, compute_graph_tmp);
  if (options_.local_fmk_op_flag) {
    GetCompilerStages(graph_id).optimizer.TranFrameOp(compute_graph_tmp);
  }
  GeRootModelPtr ge_root_model = nullptr;
  ret = StartForRunGraph(graph_node, inputs, ge_root_model, session_id);
  if (ret != SUCCESS) {
    GELOGE(ret, "[Run][GraphWithStreamAsync] StartForRunGraph failed!");
    graph_node->SetRunFlag(false);
    return ret;
  }
  return InnerRunGraphWithStream(graph_node, graph_id, stream, inputs, outputs);
}

Status GraphManager::RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs, Status GraphManager::RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs, uint64_t session_id) { std::vector<GeTensor> &outputs, uint64_t session_id) {
ErrorManager::GetInstance().SetStage(error_message::kModelCompile, error_message::kOther); ErrorManager::GetInstance().SetStage(error_message::kModelCompile, error_message::kOther);


+ 16
- 0
ge/graph/manager/graph_manager.h View File

@@ -103,6 +103,19 @@ class GraphManager {
Status RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs, Status RunGraph(const GraphId &graph_id, const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs,
uint64_t session_id = INVALID_SESSION_ID); uint64_t session_id = INVALID_SESSION_ID);


///
/// @ingroup ge_graph
/// @brief run specific graph with specific session id and stream
/// @param [in] graph_id graph id
/// @param [in] stream specific stream
/// @param [in] session_id session id
/// @param [in] inputs input data
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraphWithStreamAsync(const GraphId &graph_id, rtStream_t stream, uint64_t session_id,
const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs);

/// ///
/// @ingroup ge_graph /// @ingroup ge_graph
/// @brief build specific graph /// @brief build specific graph
@@ -258,6 +271,9 @@ class GraphManager {
Status InnerRunGraph(GraphNodePtr &graph_node, const GraphId &graph_id, const std::vector<GeTensor> &inputs, Status InnerRunGraph(GraphNodePtr &graph_node, const GraphId &graph_id, const std::vector<GeTensor> &inputs,
std::vector<GeTensor> &outputs); std::vector<GeTensor> &outputs);


Status InnerRunGraphWithStream(GraphNodePtr &graph_node, const GraphId &graph_id, rtStream_t stream,
const std::vector<GeTensor> &inputs, std::vector<GeTensor> &outputs);

Status ParseOptions(const std::map<std::string, std::string> &options); Status ParseOptions(const std::map<std::string, std::string> &options);


static void ParseOption(const std::map<std::string, std::string> &options, const std::string &key, static void ParseOption(const std::map<std::string, std::string> &options, const std::string &key,


+ 1
- 0
ge/graph/manager/graph_manager_utils.cc View File

@@ -41,6 +41,7 @@ GraphNode::GraphNode(GraphId graph_id)
build_flag_(false), build_flag_(false),
load_flag_(false), load_flag_(false),
async_(false), async_(false),
is_specific_stream_(false),
ge_model_(nullptr), ge_model_(nullptr),
sem_(1) { sem_(1) {
graph_run_async_listener_ = MakeShared<RunAsyncListener>(); graph_run_async_listener_ = MakeShared<RunAsyncListener>();


+ 3
- 0
ge/graph/manager/graph_manager_utils.h View File

@@ -167,6 +167,8 @@ class GraphNode {
void UpdateLoadFlag() { load_flag_ = load_count_ == 0 || load_record_ >= kMaxLoadNum; } void UpdateLoadFlag() { load_flag_ = load_count_ == 0 || load_record_ >= kMaxLoadNum; }
void SetLoadFlag(bool load_flag) { load_flag_ = load_flag; } void SetLoadFlag(bool load_flag) { load_flag_ = load_flag; }
void SetGeModel(const GeModelPtr &ge_model) { ge_model_ = ge_model; } void SetGeModel(const GeModelPtr &ge_model) { ge_model_ = ge_model; }
// Specific-stream flag: set while the graph is being run via
// RunGraphWithStreamAsync on a caller-provided stream; cleared after execution.
void SetIsSpecificStream(bool specific_stream) { is_specific_stream_ = specific_stream; }
bool IsSpecificStream() const { return is_specific_stream_; }
GeModelPtr GetGeModel() const { return ge_model_; } GeModelPtr GetGeModel() const { return ge_model_; }
void SetGeRootModel(const GeRootModelPtr &ge_root_model) { ge_root_model_ = ge_root_model; } void SetGeRootModel(const GeRootModelPtr &ge_root_model) { ge_root_model_ = ge_root_model; }
GeRootModelPtr GetGeRootModel() const { return ge_root_model_; } GeRootModelPtr GetGeRootModel() const { return ge_root_model_; }
@@ -200,6 +202,7 @@ class GraphNode {
// load_flag_ is true if more than 1 model were loaded // load_flag_ is true if more than 1 model were loaded
bool load_flag_; bool load_flag_;
bool async_; bool async_;
bool is_specific_stream_;
GeModelPtr ge_model_; GeModelPtr ge_model_;
GeRootModelPtr ge_root_model_; GeRootModelPtr ge_root_model_;
BlockingQueue<uint8_t> sem_; BlockingQueue<uint8_t> sem_;


+ 5
- 0
ge/model/ge_root_model.h View File

@@ -40,6 +40,10 @@ class GeRootModel {
} }
uint32_t GetModelId() const { return model_id_; } uint32_t GetModelId() const { return model_id_; }


// Specific-stream flag, copied from the graph node before loading; when true,
// GraphLoader::LoadModelOnline skips starting a model thread for this model.
void SetIsSpecificStream(bool is_specific_stream) { is_specific_stream_ = is_specific_stream; }

bool IsSpecificStream() const { return is_specific_stream_; }

void SetModelName(const std::string &model_name) { model_name_ = model_name; } void SetModelName(const std::string &model_name) { model_name_ = model_name; }


const std::string &GetModelName() const { return model_name_; } const std::string &GetModelName() const { return model_name_; }
@@ -64,6 +68,7 @@ class GeRootModel {
std::vector<uint32_t> model_ids_; std::vector<uint32_t> model_ids_;
bool train_flag_ = false; bool train_flag_ = false;
std::string model_name_; std::string model_name_;
bool is_specific_stream_ = false;
}; };
} // namespace ge } // namespace ge
using GeRootModelPtr = std::shared_ptr<ge::GeRootModel>; using GeRootModelPtr = std::shared_ptr<ge::GeRootModel>;


+ 45
- 0
ge/session/inner_session.cc View File

@@ -262,6 +262,51 @@ Status InnerSession::RunGraph(uint32_t graph_id, const std::vector<Tensor> &inpu
} }
} }


// Run a graph of this session asynchronously on a caller-provided stream.
// Rejects concurrent runs on the same session (try_lock) instead of blocking.
// @param graph_id graph previously added to this session
// @param stream   caller-owned stream
// @param inputs   input tensors
// @param outputs  output tensors, filled on success
// @return SUCCESS, GE_SESS_ALREADY_RUNNING, GE_SESS_INIT_FAILED, or a GraphManager error
// NOTE: fixes the original error-report typos "RunGrapWithStreamhAsync" and "failed failed".
Status InnerSession::RunGraphWithStreamAsync(uint32_t graph_id, rtStream_t stream,
                                             const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs) {
  GELOGI("Run graph with stream, session id = %lu, graph id = %u, stream = %p.",
         session_id_, graph_id, stream);
  // Guard clause: another run is in flight on this session.
  if (!mutex_.try_lock()) {
    GELOGE(GE_SESS_ALREADY_RUNNING, "[Run][GraphWithStreamAsync]failed because mutex try_lock false,"
           "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
    REPORT_INNER_ERROR("E19999", "[Run][GraphWithStreamAsync]failed because mutex try_lock false,"
                       "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
    return GE_SESS_ALREADY_RUNNING;
  }
  // Adopt the lock we just acquired so it is released on every return path.
  std::lock_guard<std::mutex> lock(mutex_, std::adopt_lock);
  if (!init_flag_) {
    GELOGE(GE_SESS_INIT_FAILED, "[Run][GraphWithStream]failed because GraphManager not Init,"
           "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
    REPORT_INNER_ERROR("E19999", "RunGraphWithStreamAsync failed because GraphManager not Init,"
                       "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
    return GE_SESS_INIT_FAILED;
  }
  UpdateThreadContext(graph_id);
  // Convert external Tensors to GeTensors without copying the payloads.
  vector<GeTensor> ge_inputs;
  for (auto &item : inputs) {
    ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item));
  }
  vector<GeTensor> ge_outputs;
  for (auto &item : outputs) {
    ge_outputs.emplace_back(TensorAdapter::AsGeTensor(item));
  }
  Status ret = graph_manager_.RunGraphWithStreamAsync(graph_id, stream, session_id_, ge_inputs, ge_outputs);
  // Per-run output-node selections must not leak into the next run.
  domi::GetContext().out_nodes_map.clear();
  domi::GetContext().user_out_nodes.clear();
  if (ret != SUCCESS) {
    GELOGE(ret, "[Run][GraphWithStreamAsync]failed,"
           "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
    REPORT_CALL_ERROR("E19999", "GraphManager RunGraphWithStreamAsync failed,"
                      "session id = %lu, graph id = %u, stream = %p.", session_id_, graph_id, stream);
    return ret;
  }

  GELOGI("Run graph with stream success, session id = %lu, graph id = %u, stream = %p.",
         session_id_, graph_id, stream);
  return SUCCESS;
}

Status InnerSession::RemoveGraph(uint32_t graph_id) { Status InnerSession::RemoveGraph(uint32_t graph_id) {
std::lock_guard<std::mutex> lock(resource_mutex_); std::lock_guard<std::mutex> lock(resource_mutex_);
if (!init_flag_) { if (!init_flag_) {


+ 3
- 0
ge/session/inner_session.h View File

@@ -41,6 +41,9 @@ class InnerSession {


Status RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs); Status RunGraph(uint32_t graph_id, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs);


Status RunGraphWithStreamAsync(uint32_t graph_id, rtStream_t stream, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs);

Status RemoveGraph(uint32_t graph_id); Status RemoveGraph(uint32_t graph_id);


Status BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs); Status BuildGraph(uint32_t graph_id, const std::vector<InputTensorInfo> &inputs);


+ 27
- 0
ge/session/session_manager.cc View File

@@ -242,6 +242,33 @@ Status SessionManager::RunGraph(SessionId session_id, uint32_t graph_id, const s
return innerSession->RunGraph(graph_id, inputs, outputs); return innerSession->RunGraph(graph_id, inputs, outputs);
} }


// Dispatch RunGraphWithStreamAsync to the inner session owning session_id.
// The session map is locked only for the lookup; the run itself happens outside
// the manager lock.
Status SessionManager::RunGraphWithStreamAsync(SessionId session_id,
                                               uint32_t graph_id,
                                               rtStream_t stream,
                                               const std::vector<Tensor> &inputs,
                                               std::vector<Tensor> &outputs) {
  if (!init_flag_) {
    GELOGE(GE_SESSION_MANAGER_NOT_INIT,
           "[RunWithStream][Graph]Session manager is not initialized,"
           "session id = %lu, graph id = %u, stream = %p.", session_id, graph_id, stream);
    REPORT_INNER_ERROR("E19999",
                       "RunGraphWithStreamAsync fail for Session manager is not initialized,"
                       "session id = %lu, graph id = %u, stream = %p.", session_id, graph_id, stream);
    return GE_SESSION_MANAGER_NOT_INIT;
  }
  SessionPtr inner_session = nullptr;
  {
    std::lock_guard<std::mutex> lock(mutex_);
    auto found = session_manager_map_.find(session_id);
    if (found == session_manager_map_.end()) {
      return GE_SESSION_NOT_EXIST;
    }
    inner_session = found->second;
  }
  return inner_session->RunGraphWithStreamAsync(graph_id, stream, inputs, outputs);
}

Status SessionManager::RemoveGraph(SessionId session_id, uint32_t graph_id) { Status SessionManager::RemoveGraph(SessionId session_id, uint32_t graph_id) {
if (!init_flag_) { if (!init_flag_) {
GELOGE(GE_SESSION_MANAGER_NOT_INIT, GELOGE(GE_SESSION_MANAGER_NOT_INIT,


+ 14
- 0
ge/session/session_manager.h View File

@@ -25,6 +25,7 @@
#include "common/ge_inner_error_codes.h" #include "common/ge_inner_error_codes.h"
#include "ge/ge_api_types.h" #include "ge/ge_api_types.h"
#include "session/inner_session.h" #include "session/inner_session.h"
#include "runtime/base.h"


namespace ge { namespace ge {
using SessionPtr = std::shared_ptr<InnerSession>; using SessionPtr = std::shared_ptr<InnerSession>;
@@ -96,6 +97,19 @@ class SessionManager {
Status RunGraph(SessionId session_id, uint32_t graph_id, const std::vector<Tensor> &inputs, Status RunGraph(SessionId session_id, uint32_t graph_id, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs); std::vector<Tensor> &outputs);


///
/// @ingroup ge_session
/// @brief run a graph of the session with specific stream asynchronously
/// @param [in] session_id session id
/// @param [in] graph_id graph id
/// @param [in] stream specific stream
/// @param [in] inputs input data
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraphWithStreamAsync(SessionId session_id, uint32_t graph_id, rtStream_t stream,
const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs);

/// ///
/// @ingroup ge_session /// @ingroup ge_session
/// @brief remove a graph from the session with specific session id /// @brief remove a graph from the session with specific session id


+ 12
- 0
inc/external/ge/ge_api.h View File

@@ -121,6 +121,18 @@ class GE_FUNC_VISIBILITY Session {
/// ///
Status RunGraph(uint32_t graphId, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs); Status RunGraph(uint32_t graphId, const std::vector<Tensor> &inputs, std::vector<Tensor> &outputs);


///
/// @ingroup ge_graph
/// @brief run a graph of the session with specific session id and specific stream asynchronously
/// @param [in] graph_id graph id
/// @param [in] stream specific stream
/// @param [in] inputs input data
/// @param [out] outputs output data
/// @return Status result of function
///
Status RunGraphWithStreamAsync(uint32_t graph_id, void *stream, const std::vector<Tensor> &inputs,
std::vector<Tensor> &outputs);

/// ///
/// @ingroup ge_graph /// @ingroup ge_graph
/// @brief build graph in the session with specific session id /// @brief build graph in the session with specific session id


+ 1
- 1
metadef

@@ -1 +1 @@
Subproject commit 1c41e02f73b6e8f95369e052ee4de285145fb34f
Subproject commit 0facaa5af36b64c9d39603ed419191d21832df8a

+ 2
- 0
tests/ut/ge/CMakeLists.txt View File

@@ -788,9 +788,11 @@ set(MULTI_PARTS_TEST_FILES
"graph/preprocess/graph_preprocess_unittest.cc" "graph/preprocess/graph_preprocess_unittest.cc"
"graph/manager/hcom_util_unittest.cc" "graph/manager/hcom_util_unittest.cc"
"graph/manager/graph_caching_allocator_unittest.cc" "graph/manager/graph_caching_allocator_unittest.cc"
"graph/manager/run_graph_unittest.cc"
"graph/partition/dynamic_shape_partition_unittest.cc" "graph/partition/dynamic_shape_partition_unittest.cc"
"graph/manager/graph_manager_unittest.cc" "graph/manager/graph_manager_unittest.cc"
"session/omg_omg_unittest.cc" "session/omg_omg_unittest.cc"
"session/ge_api_unittest.cc"
) )


set(GENERATOR_TEST_FILES set(GENERATOR_TEST_FILES


+ 99
- 1
tests/ut/ge/graph/ge_executor_unittest.cc View File

@@ -38,6 +38,7 @@
#include "graph/load/model_manager/model_manager.h" #include "graph/load/model_manager/model_manager.h"
#include "graph/load/model_manager/task_info/kernel_task_info.h" #include "graph/load/model_manager/task_info/kernel_task_info.h"
#include "graph/load/model_manager/task_info/kernel_ex_task_info.h" #include "graph/load/model_manager/task_info/kernel_ex_task_info.h"
#include "graph/execute/graph_execute.h"
#include "ge/common/dump/dump_properties.h" #include "ge/common/dump/dump_properties.h"
#include "graph/manager/graph_mem_allocator.h" #include "graph/manager/graph_mem_allocator.h"
#include "graph/utils/graph_utils.h" #include "graph/utils/graph_utils.h"
@@ -192,6 +193,104 @@ TEST_F(UtestGeExecutor, kernel_ex_InitDumpTask) {
kernel_ex_task_info.InitDumpTask(nullptr, op_desc); kernel_ex_task_info.InitDumpTask(nullptr, op_desc);
} }


// Builds a minimal 4-node model (data -> square -> memcpy -> netoutput) with two
// task defs, initializes it as a DavinciModel, then drives
// GraphExecutor::ExecuteGraphWithStream as a smoke test.
TEST_F(UtestGeExecutor, execute_graph_with_stream) {
  DavinciModel model(0, nullptr);
  ComputeGraphPtr graph = make_shared<ComputeGraph>("default");

  GeModelPtr ge_model = make_shared<GeModel>();
  ge_model->SetGraph(GraphUtils::CreateGraphFromComputeGraph(graph));
  AttrUtils::SetInt(ge_model, ATTR_MODEL_MEMORY_SIZE, 10240);
  AttrUtils::SetInt(ge_model, ATTR_MODEL_STREAM_NUM, 1);

  shared_ptr<domi::ModelTaskDef> model_task_def = make_shared<domi::ModelTaskDef>();
  ge_model->SetModelTaskDef(model_task_def);

  // All nodes share one 512-byte NCHW float tensor description.
  GeTensorDesc tensor(GeShape(), FORMAT_NCHW, DT_FLOAT);
  TensorUtils::SetSize(tensor, 512);
  {
    OpDescPtr op_desc = CreateOpDesc("data", DATA);
    op_desc->AddInputDesc(tensor);
    op_desc->AddOutputDesc(tensor);
    op_desc->SetInputOffset({1024});
    op_desc->SetOutputOffset({1024});
    NodePtr node = graph->AddNode(op_desc);  // op_index = 0
  }

  {
    // Compute node backed by a TE kernel task.
    OpDescPtr op_desc = CreateOpDesc("square", "Square");
    op_desc->AddInputDesc(tensor);
    op_desc->AddOutputDesc(tensor);
    op_desc->SetInputOffset({1024});
    op_desc->SetOutputOffset({1024});
    NodePtr node = graph->AddNode(op_desc);  // op_index = 1

    domi::TaskDef *task_def = model_task_def->add_task();
    task_def->set_stream_id(0);
    task_def->set_type(RT_MODEL_TASK_KERNEL);
    domi::KernelDef *kernel_def = task_def->mutable_kernel();
    kernel_def->set_stub_func("stub_func");
    kernel_def->set_args_size(64);
    string args(64, '1');
    kernel_def->set_args(args.data(), 64);
    domi::KernelContext *context = kernel_def->mutable_context();
    context->set_op_index(op_desc->GetId());
    context->set_kernel_type(2);  // ccKernelType::TE
    uint16_t args_offset[9] = {0};
    context->set_args_offset(args_offset, 9 * sizeof(uint16_t));
  }

  {
    // Device-to-device copy from the square output (1024) to the netoutput input (5120).
    OpDescPtr op_desc = CreateOpDesc("memcpy", MEMCPYASYNC);
    op_desc->AddInputDesc(tensor);
    op_desc->AddOutputDesc(tensor);
    op_desc->SetInputOffset({1024});
    op_desc->SetOutputOffset({5120});
    NodePtr node = graph->AddNode(op_desc);  // op_index = 2

    domi::TaskDef *task_def = model_task_def->add_task();
    task_def->set_stream_id(0);
    task_def->set_type(RT_MODEL_TASK_MEMCPY_ASYNC);
    domi::MemcpyAsyncDef *memcpy_async = task_def->mutable_memcpy_async();
    memcpy_async->set_src(1024);
    memcpy_async->set_dst(5120);
    memcpy_async->set_dst_max(512);
    memcpy_async->set_count(1);
    memcpy_async->set_kind(RT_MEMCPY_DEVICE_TO_DEVICE);
    memcpy_async->set_op_index(op_desc->GetId());
  }

  {
    OpDescPtr op_desc = CreateOpDesc("output", NETOUTPUT);
    op_desc->AddInputDesc(tensor);
    op_desc->SetInputOffset({5120});
    op_desc->SetSrcName( { "memcpy" } );
    op_desc->SetSrcIndex( { 0 } );
    NodePtr node = graph->AddNode(op_desc);  // op_index = 3
  }

  EXPECT_EQ(model.Assign(ge_model), SUCCESS);
  EXPECT_EQ(model.Init(), SUCCESS);

  // data + netoutput give one input and one output address list; only the
  // kernel and memcpy ops produce runtime tasks.
  EXPECT_EQ(model.input_addrs_list_.size(), 1);
  EXPECT_EQ(model.output_addrs_list_.size(), 1);
  EXPECT_EQ(model.task_list_.size(), 2);

  OutputData output_data;
  vector<OutputTensorInfo> outputs;
  EXPECT_EQ(model.GenOutputTensorInfo(&output_data, outputs), SUCCESS);

  // Smoke-run the stream execution path. init_flag_ is forced (private access
  // enabled in this TU); NOTE(review): the return status is not asserted.
  GraphExecutor graph_executer;
  graph_executer.init_flag_ = true;
  GeRootModelPtr ge_root_model = make_shared<GeRootModel>(graph);
  std::vector<GeTensor> input_tensor;
  std::vector<GeTensor> output_tensor;
  std::vector<InputOutputDescInfo> output_desc;
  InputOutputDescInfo desc0;
  output_desc.push_back(desc0);
  graph_executer.ExecuteGraphWithStream(0, nullptr, ge_root_model, input_tensor, output_tensor);
}

TEST_F(UtestGeExecutor, get_op_attr) { TEST_F(UtestGeExecutor, get_op_attr) {
shared_ptr<DavinciModel> model = MakeShared<DavinciModel>(1, g_label_call_back); shared_ptr<DavinciModel> model = MakeShared<DavinciModel>(1, g_label_call_back);
model->SetId(1); model->SetId(1);
@@ -223,5 +322,4 @@ TEST_F(UtestGeExecutor, get_op_attr) {
EXPECT_EQ(ret, UNSUPPORTED); EXPECT_EQ(ret, UNSUPPORTED);
ret = ge_executor.GetOpAttr(3, "test", ge::ATTR_NAME_DATA_DUMP_ORIGIN_OP_NAMES, attr_value); ret = ge_executor.GetOpAttr(3, "test", ge::ATTR_NAME_DATA_DUMP_ORIGIN_OP_NAMES, attr_value);
EXPECT_EQ(ret, ACL_ERROR_GE_EXEC_MODEL_ID_INVALID); EXPECT_EQ(ret, ACL_ERROR_GE_EXEC_MODEL_ID_INVALID);
}
} }

+ 60
- 0
tests/ut/ge/graph/manager/run_graph_unittest.cc View File

@@ -0,0 +1,60 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <memory>
#include "graph/anchor.h"
#include "graph/attr_value.h"
#include "graph/debug/ge_attr_define.h"
#include "graph/utils/graph_utils.h"
#include "graph/utils/node_utils.h"
#include "graph/utils/op_desc_utils.h"
#include "graph/utils/tensor_utils.h"
#include "omg/omg_inner_types.h"
#define protected public
#define private public
#include"graph/manager/graph_manager_utils.h"
#include "graph/manager/graph_manager.h"
#undef protected
#undef private
using namespace std;
using namespace testing;
using namespace ge;
using domi::GetContext;
// Fixture for GraphManager::RunGraphWithStreamAsync tests; clears the global
// omg out-nodes map after each case so tests stay independent.
class UtestGraphRunTest : public testing::Test {
 protected:
  void SetUp() {}
  void TearDown() { GetContext().out_nodes_map.clear(); }
};
// Smoke test: registers a graph node marked as already built and invokes
// RunGraphWithStreamAsync with a null stream.
TEST_F(UtestGraphRunTest, RunGraphWithStreamAsync) {
  GraphManager graph_manager;
  GeTensor input0, input1;
  std::vector<GeTensor> inputs{input0, input1};
  std::vector<GeTensor> outputs;
  GraphNodePtr graph_node = std::make_shared<GraphNode>(1);
  graph_manager.AddGraphNode(1, graph_node);
  GraphPtr graph = std::make_shared<Graph>("test");
  graph_node->SetGraph(graph);
  graph_node->SetRunFlag(false);
  // build flag true so the run path skips (re)building the graph
  graph_node->SetBuildFlag(true);
  // NOTE(review): ret is intentionally not asserted (smoke test only);
  // consider pinning the expected status once the UT environment fixes it.
  auto ret = graph_manager.RunGraphWithStreamAsync(1, nullptr, 0, inputs, outputs);
}

+ 58
- 0
tests/ut/ge/session/ge_api_unittest.cc View File

@@ -0,0 +1,58 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <vector>
#include <string>
#include <map>
#define protected public
#define private public
#include "common/ge/ge_util.h"
#include "proto/ge_ir.pb.h"
#include "inc/external/ge/ge_api.h"
#include "session/session_manager.h"
#undef protected
#undef private
using namespace std;
namespace ge {
// Fixture for the Session/SessionManager/InnerSession RunGraphWithStreamAsync
// failure-path tests; no shared state is needed.
class UtestGeApi : public testing::Test {
 protected:
  void SetUp() override {}
  void TearDown() override {}
};
// Exercises the RunGraphWithStreamAsync failure paths at all three layers
// (Session, SessionManager, InnerSession) without GE being initialized.
TEST_F(UtestGeApi, run_graph_with_stream) {
  vector<Tensor> inputs;
  vector<Tensor> outputs;
  std::map<std::string, std::string> options;
  // GE was never initialized, so the session-level call must fail.
  Session session(options);
  auto ret = session.RunGraphWithStreamAsync(10, nullptr, inputs, outputs);
  ASSERT_NE(ret, SUCCESS);
  // Force init_flag_ (private access enabled in this TU) so the manager
  // proceeds to the session lookup, which fails: session 10 was never added.
  SessionManager session_manager;
  session_manager.init_flag_ = true;
  ret = session_manager.RunGraphWithStreamAsync(10, 10, nullptr, inputs, outputs);
  ASSERT_NE(ret, SUCCESS);
  // Inner session marked initialized, but graph 10 was never added, so the
  // run is expected to fail downstream.
  InnerSession inner_session(1, options);
  inner_session.init_flag_ = true;
  ret = inner_session.RunGraphWithStreamAsync(10, nullptr, inputs, outputs);
  ASSERT_NE(ret, SUCCESS);
}
} // namespace ge

Loading…
Cancel
Save