diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index 4a296e87..8fcf97ef 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -373,6 +373,7 @@ set(TRAIN_SRC_LIST "opskernel_manager/ops_kernel_builder_manager.cc" "session/inner_session.cc" "session/session_manager.cc" + "graph/execute/model_executor.cc" "single_op/single_op.cc" "single_op/single_op_manager.cc" "single_op/single_op_model.cc" @@ -475,6 +476,7 @@ set(INFER_SRC_LIST "init/gelib.cc" "session/inner_session.cc" "session/session_manager.cc" + "graph/execute/model_executor.cc" "engine_manager/dnnengine_manager.cc" "opskernel_manager/ops_kernel_manager.cc" "opskernel_manager/ops_kernel_builder_manager.cc" diff --git a/ge/common/executor.h b/ge/common/executor.h new file mode 100644 index 00000000..7f1d7ef9 --- /dev/null +++ b/ge/common/executor.h @@ -0,0 +1,89 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef GE_COMMON_EXECUTOR_H +#define GE_COMMON_EXECUTOR_H + +#include "external/ge/ge_api_types.h" +#include "graph/ge_local_context.h" +#include "graph/manager/graph_manager_utils.h" + +namespace ge { +struct RunArgs { + GraphNodePtr graph_node; + GraphId graph_id; + uint64_t session_id; + struct error_message::Context error_context; + std::vector input_tensor; + GeRootModelPtr ge_root_model; + GEThreadLocalContext context; + RunAsyncCallback callback; +}; + +class Executor { + public: + /// + /// @ingroup ge + /// @brief Load mode from graph. + /// @param [in] GeRootModel: root model of graph compiled. + /// @param [in] GraphNode: node of graph. + /// @return Status result of function + /// + virtual Status LoadGraph(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node) = 0; + + /// + /// @ingroup ge + /// @brief Unload mode. + /// @param [in] GeRootModel: root model of graph compiled. + /// @param [in] graph_id: graph identifier. + /// @return Status result of function + /// + virtual Status UnloadGraph(const GeRootModelPtr &ge_root_model, uint32_t graph_id) = 0; + + /// + /// @ingroup ge + /// @brief Push model execution params to queue. + /// @param [in] RunArgs of for model execution. + /// @return Status result of function + /// + virtual Status PushGraph(const RunArgs &args) = 0; + + /// + /// @ingroup ge + /// @brief Run graph for synchronize model. + /// @param [in] graph_node: node of graph. + /// @param [in] graph_id: graph identifier. + /// @param [in] inputs: input data for the graph running. + /// @param [out] outputs: output data of the graph running + /// @return Status result of function + /// + virtual Status RunGraph(const GraphNodePtr &graph_node, GraphId graph_id, + const std::vector &inputs, std::vector &outputs) = 0; + + /// + /// @ingroup ge + /// @brief Run graph for NN synchronize model. + /// @param [in] graph_node: node of graph. + /// @param [in] graph_id: graph identifier. + /// @param [in] stream: Stream for model running. + /// @param [in] inputs: input data for the graph running. 
+ /// @param [out] outputs: output data of the graph running + /// @return Status result of function + /// + virtual Status RunGraphWithStream(const GraphNodePtr &graph_node, GraphId graph_id, rtStream_t stream, + const std::vector &inputs, std::vector &outputs) = 0; +}; +} +#endif // GE_COMMON_EXECUTOR_H diff --git a/ge/graph/common/local_context.cc b/ge/graph/common/local_context.cc index fa2f78e0..bd747021 100644 --- a/ge/graph/common/local_context.cc +++ b/ge/graph/common/local_context.cc @@ -16,13 +16,12 @@ #include "graph/common/local_context.h" -#include "framework/common/ge_inner_error_codes.h" #include "framework/common/debug/ge_log.h" -#include "framework/omg/omg_inner_types.h" namespace ge { namespace { thread_local OmgContext *omg_context = nullptr; +thread_local OmeContext *ome_context = nullptr; } void SetLocalOmgContext(OmgContext &context) { @@ -37,4 +36,18 @@ OmgContext &GetLocalOmgContext() { return domi::GetContext(); } } + +void SetLocalOmeContext(OmeContext &context) { + ome_context = &context; +} + +OmeContext &GetLocalOmeContext() { + if (ome_context != nullptr) { + return *ome_context; + } + + GELOGW("ome_context is nullptr."); + static OmeContext context; + return context; +} } diff --git a/ge/graph/common/local_context.h b/ge/graph/common/local_context.h index 4aa95855..751c6692 100644 --- a/ge/graph/common/local_context.h +++ b/ge/graph/common/local_context.h @@ -22,5 +22,22 @@ namespace ge { void SetLocalOmgContext(OmgContext &context); OmgContext &GetLocalOmgContext(); + + +struct OmeContext { + bool need_multi_batch = false; + std::string dynamic_node_type; + std::vector data_nodes; + std::vector getnext_nosink_nodes; + std::vector dynamic_shape_dims; + std::vector>> user_input_dims; + std::vector> user_real_input_dims; +}; + +GE_FUNC_VISIBILITY +void SetLocalOmeContext(OmeContext &context); + +GE_FUNC_VISIBILITY +OmeContext &GetLocalOmeContext(); } // namespace ge #endif // GE_GRAPH_COMMON_LOCAL_CONTEXT_H_ diff --git a/ge/graph/execute/graph_execute.cc b/ge/graph/execute/graph_execute.cc index 02d7d3ca..ba35e7c0 100755 --- a/ge/graph/execute/graph_execute.cc +++ b/ge/graph/execute/graph_execute.cc @@ -31,7 +31,6 @@ GraphExecutor::GraphExecutor() sync_run_mutex_(nullptr), condition_(nullptr), graph_run_listener_(nullptr), - graph_context_(nullptr), last_graph_id_(UINT32_MAX), malloc_flag_(false) {} @@ -79,16 +78,6 @@ Status GraphExecutor::SetCondition(std::mutex *mutex, std::condition_variable *c return SUCCESS; } -Status GraphExecutor::SetGraphContext(GraphContextPtr graph_context_ptr) { - if (graph_context_ptr == nullptr) { - REPORT_INNER_ERROR("E19999", "Check param graph_context_ptr nullptr"); - GELOGE(GE_GRAPH_PARAM_NULLPTR, "[Check][Param] input param graph_context_ptr is nullptr"); - return GE_GRAPH_PARAM_NULLPTR; - } - graph_context_ = graph_context_ptr; - return SUCCESS; -} - Status GraphExecutor::SetDynamicSize(uint32_t model_id, const std::vector &batch_num, int32_t dynamic_type) { auto model_manager = ge::ModelManager::GetInstance(); GE_CHECK_NOTNULL(model_manager); diff --git a/ge/graph/execute/graph_execute.h b/ge/graph/execute/graph_execute.h index 879a124c..b6d56dff 100755 --- a/ge/graph/execute/graph_execute.h +++ b/ge/graph/execute/graph_execute.h @@ -60,8 +60,6 @@ class GraphExecutor { Status SetCondition(std::mutex *mutex, std::condition_variable *cond, std::shared_ptr listener); - Status SetGraphContext(GraphContextPtr graph_context_ptr); - static Status SetDynamicSize(uint32_t model_id, const std::vector &batch_num, int32_t dynamic_type); 
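[Editorial note, not part of the patch] The new `ge/common/executor.h` interface above is the seam this whole diff is built around: `GraphManager` keeps compiling graphs but hands every load/unload/run call to an `Executor*`, and the new `ModelExecutor` (added later in this diff in `ge/graph/execute/model_executor.cc`) is the concrete implementation. The hunks further down show the real wiring (`GraphManager::Initialize(options, executor)`, `executor_ = executor;`, `executor_->LoadGraph(...)`). A minimal, self-contained sketch of that intended usage follows; every type and signature here is a simplified stand-in for illustration only, not the real GE API.

```cpp
// Sketch of the Executor seam introduced by this patch (simplified stand-in types).
#include <cstdint>
#include <iostream>

using Status = int;
constexpr Status SUCCESS = 0;
constexpr Status PARAM_INVALID = 1;

// Stand-in for ge::Executor (ge/common/executor.h): pure-virtual execution interface.
class Executor {
 public:
  virtual ~Executor() = default;
  virtual Status LoadGraph(uint32_t graph_id) = 0;
  virtual Status UnloadGraph(uint32_t graph_id) = 0;
};

// Stand-in for ModelExecutor: owns the actual model load/unload/run logic.
class ModelExecutor : public Executor {
 public:
  Status LoadGraph(uint32_t graph_id) override {
    std::cout << "load graph " << graph_id << "\n";
    return SUCCESS;
  }
  Status UnloadGraph(uint32_t graph_id) override {
    std::cout << "unload graph " << graph_id << "\n";
    return SUCCESS;
  }
};

// Stand-in for GraphManager: compiles graphs, delegates execution to the injected Executor.
class GraphManager {
 public:
  Status Initialize(Executor *executor) {
    executor_ = executor;  // may stay null in a compile-only deployment
    return SUCCESS;
  }
  Status LoadGraph(uint32_t graph_id) {
    if (executor_ == nullptr) {
      return PARAM_INVALID;  // mirrors the GE_CHECK_NOTNULL(executor_) guards in the patch
    }
    return executor_->LoadGraph(graph_id);
  }

 private:
  Executor *executor_ = nullptr;
};

int main() {
  ModelExecutor model_executor;
  GraphManager graph_manager;
  if (graph_manager.Initialize(&model_executor) != SUCCESS) {
    return 1;
  }
  return graph_manager.LoadGraph(1U) == SUCCESS ? 0 : 1;
}
```

The design point, as the later `GraphManager` hunks show, is that execution state (run thread, run-args queue, model listener, memory release) moves behind this interface, so a build-only process can initialize `GraphManager` without an executor and simply refuse run requests.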
void SetTrainFlag(bool is_train_graph); @@ -160,8 +158,6 @@ class GraphExecutor { // Run graph asynchronous call back listener std::shared_ptr graph_run_listener_; - GraphContextPtr graph_context_; - std::vector outputs_desc_; GraphId last_graph_id_; diff --git a/ge/graph/execute/model_executor.cc b/ge/graph/execute/model_executor.cc new file mode 100644 index 00000000..50e8a5a5 --- /dev/null +++ b/ge/graph/execute/model_executor.cc @@ -0,0 +1,553 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "graph/execute/model_executor.h" + +#include "graph/ge_context.h" +#include "graph/debug/ge_attr_define.h" +#include "graph/common/ge_call_wrapper.h" +#include "graph/common/local_context.h" +#include "graph/manager/graph_var_manager.h" +#include "graph/utils/tensor_adapter.h" +#include "graph/load/graph_loader.h" +#include "common/math/math_util.h" +#include "common/formats/utils/formats_trans_utils.h" + +namespace { +constexpr int32_t kBase = 10; +constexpr uint8_t kNeverLoaded = 0; +} + +namespace ge { +/// +/// @ingroup ge +/// @brief graph executor init +/// @param [in] options user config params +/// @return Status result of function +/// +Status ModelExecutor::Initialize(const map &options) { + graph_run_listener_ = MakeShared(sync_run_mutex_, condition_); + if (graph_run_listener_ == nullptr) { + REPORT_CALL_ERROR("E19999", "New GraphModelListener fail"); + GELOGE(MEMALLOC_FAILED, "[New][GraphModelListener] failed"); + return MEMALLOC_FAILED; + } + + train_graph_flag_ = ParseTrainGraphFlag(); + thread_run_flag_.store(true); + run_thread_ = std::thread(&ModelExecutor::RunThread, this); + + init_flag_ = true; + return SUCCESS; +} + +/// +/// @ingroup ge +/// @brief graph executor finalize +/// @return Status result of function +/// +Status ModelExecutor::Finalize() { + if (!init_flag_) { + GELOGW("ModelExecutor has not been initialized."); + return SUCCESS; + } + + StopQueue(); + if (run_thread_.joinable()) { + run_thread_.join(); + } + + if (graph_executor_.FreeExecuteMemory() != SUCCESS) { + GELOGW("Graph executor FreeExecuteMemory failed, resources may not be released correctly."); + } + + return SUCCESS; +} + +// OPTION_GRAPH_RUN_MODE is supposed to be a session-level option, but it used to be set to global-level in the past. +// If can not parse from session, it can parse from global by GetContext(). 
+bool ModelExecutor::ParseTrainGraphFlag() { + string run_mode; + if (GetContext().GetOption(OPTION_GRAPH_RUN_MODE, run_mode) == SUCCESS && !run_mode.empty()) { + if (GraphRunMode(std::strtol(run_mode.c_str(), nullptr, kBase)) >= TRAIN) { + GELOGI("Graph train flag set."); + return true; + } + } + return false; +} + +void ModelExecutor::AddGraphNode(GraphId graph_id, const GraphNodePtr &graph_node) { + std::lock_guard lock(mutex_); + graph_nodes_.emplace(graph_id, graph_node); +} + +void ModelExecutor::RemoveGraphNode(GraphId graph_id) { + std::lock_guard lock(mutex_); + graph_nodes_.erase(graph_id); +} + +/// +/// @ingroup ge +/// @brief Load mode for graph. +/// @param [in] GeRootModel: root model of graph compiled. +/// @param [in] GraphNode: node of graph. +/// @return Status result of function +/// +Status ModelExecutor::LoadGraph(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node) { + GE_CHECK_NOTNULL(graph_node); + if (ge_root_model == nullptr) { + return SUCCESS; + } + + UpdateLocalOmeContext(graph_node); + return graph_node->IsAsync() ? ModelLoadAsync(ge_root_model, graph_node) : ModelLoadSync(ge_root_model, graph_node); +} + +/// +/// @ingroup ge +/// @brief Unload mode for graph. +/// @param [in] GeRootModel: root model of graph compiled. +/// @param [in] graph_id: graph identifier. +/// @return Status result of function +/// +Status ModelExecutor::UnloadGraph(const GeRootModelPtr &ge_root_model, uint32_t graph_id) { + GE_CHECK_NOTNULL(ge_root_model); + rtError_t rt_ret = rtSetDevice(GetContext().DeviceId()); + if (rt_ret != RT_ERROR_NONE) { + GELOGW("[GraphExecutor] rtSetDevice failed, modelId=%u, graphId=%u.", ge_root_model->GetModelId(), graph_id); + return FAILED; + } + + RemoveGraphNode(graph_id); + Status ret = UnloadModel(ge_root_model, graph_id); + if (ret != SUCCESS) { + GELOGW("[GraphExecutor] unload model failed, graph_id=%u.", graph_id); + } + rt_ret = rtDeviceReset(GetContext().DeviceId()); + if (rt_ret != RT_ERROR_NONE) { + GELOGW("[GraphExecutor] rtDeviceReset failed, graphId=%u.", graph_id); + } + + return ret; +} + +Status ModelExecutor::UnloadModel(const GeRootModelPtr &ge_root_model, uint32_t graph_id) { + GE_CHECK_NOTNULL(ge_root_model); + for (size_t i = 0; i < ge_root_model->GetAllModelId().size(); ++i) { + uint32_t model_id = ge_root_model->GetAllModelId()[i]; + GELOGI("Unload model %u.", model_id); + Status ret = GraphLoader::UnloadModel(model_id); + if (ret != SUCCESS) { + GELOGE(ret, "[GraphExecutor] unload model failed, modelId=%u, graphId=%u.", model_id, graph_id); + return ret; + } + } + return SUCCESS; +} + +void ModelExecutor::StopQueue() { + thread_run_flag_.store(false); + run_args_q_.Stop(); +} + +void ModelExecutor::ReturnError(RunAsyncCallback callback, Status ret, const string &log) { + StopQueue(); + GELOGE(ret, "%s.", log.c_str()); + std::vector outputs; + callback(ret, outputs); +} + +void ModelExecutor::UpdateLocalOmeContext(const GraphNodePtr &graph_node) { + std::lock_guard lock(mutex_); + SetLocalOmeContext(graph_node->GetOmeContext()); +} + +/// +/// @ingroup ge +/// @brief Push model execution params to queue. +/// @param [in] RunArgs of for model execution. +/// @return Status result of function +/// +Status ModelExecutor::PushGraph(const RunArgs &args) { + return run_args_q_.Push(args) ? 
SUCCESS : FAILED; +} + +void ModelExecutor::RunThread() { + ErrorManager::GetInstance().SetStage(error_message::kModelExecute, error_message::kModelExecute); + if (prctl(PR_SET_NAME, ("GE_Run")) != 0) { + GELOGW("Set thread name failed."); + } + + RunArgs args; + while (thread_run_flag_) { + if (!run_args_q_.Pop(args)) { + continue; + } + + GELOGI("[RunThread] A new loop start, graph_id:%u.", args.graph_id); + ErrorManager::GetInstance().SetErrorContext(args.error_context); + GetContext().SetSessionId(args.session_id); + GetThreadLocalContext() = args.context; + UpdateLocalOmeContext(args.graph_node); + + // parse inputs.dims to vector> dynamic_dims + Status ret = ParseInputsDims(args.input_tensor); + if (ret != SUCCESS) { + ReturnError(args.callback, ret, "ParseInputsDims failed, thread exit."); + args.graph_node->Unlock(); + return; + } + + args.graph_node->UpdateLoadFlag(); + if (!args.graph_node->GetLoadFlag()) { + ErrorManager::GetInstance().SetStage(error_message::kModelLoad, error_message::kModelLoad); + args.ge_root_model->SetTrainFlag(train_graph_flag_); + ret = ModelLoadAsync(args.ge_root_model, args.graph_node); + if (ret != SUCCESS || args.ge_root_model == nullptr) { + StopQueue(); + ReturnError(args.callback, ret, "LoadGraphAsync failed, thread exit."); + args.graph_node->Unlock(); + return; + } + // control the times of graph loading in multi-thread scenario + args.graph_node->DecreaseLoadCount(); + args.graph_node->IncreaseLoadRecord(); + + args.graph_node->SetLoadFlag(true); + GELOGI("LoadGraph[%u], model[%u] success and set LoadFlag to true.", args.graph_node->GetGraphId(), + args.ge_root_model->GetModelId()); + } + + ErrorManager::GetInstance().SetStage(error_message::kModelExecute, error_message::kModelExecute); + if (train_graph_flag_) { + graph_executor_.SetTrainFlag(train_graph_flag_); + } + + ret = graph_executor_.ExecuteGraphAsync(args.graph_id, args.graph_node->GetGeRootModel(), + args.input_tensor, args.callback); + args.graph_node->SetRunFlag(false); + if (ret != SUCCESS) { + ReturnError(args.callback, ret, "ExecuteGraphAsync failed, thread exit."); + args.graph_node->Unlock(); + return; + } + args.graph_node->Unlock(); + GELOGI("[GraphExecutor] Run graph async success, graph_id=%u.", args.graph_id); + } +} + +/// +/// @ingroup ge +/// @brief Run graph for synchronize model. +/// @param [in] graph_node: node of graph. +/// @param [in] graph_id: graph identifier. +/// @param [in] inputs: input data for the graph running. +/// @param [out] outputs: output data of the graph running +/// @return Status result of function +/// +Status ModelExecutor::RunGraph(const GraphNodePtr &graph_node, GraphId graph_id, + const std::vector &inputs, std::vector &outputs) { + Status ret = graph_executor_.SetCondition(&sync_run_mutex_, &condition_, graph_run_listener_); + if (ret != SUCCESS) { + GELOGE(GE_GRAPH_RUNGRAPH_FAILED, "[Set][Condition] failed, graph_id = %u.", graph_id); + graph_node->SetRunFlag(false); + return GE_GRAPH_RUNGRAPH_FAILED; + } + + if (train_graph_flag_) { + graph_executor_.SetTrainFlag(train_graph_flag_); + } + ret = graph_executor_.ExecuteGraph(graph_id, graph_node->GetGeRootModel(), inputs, outputs); + + graph_node->SetRunFlag(false); + if (ret != SUCCESS) { + GELOGE(ret, "[Execute][Graph] failed, graph_id = %u.", graph_id); + return ret; + } + return SUCCESS; +} + +/// +/// @ingroup ge +/// @brief Run graph for NN synchronize model. +/// @param [in] graph_node: node of graph. +/// @param [in] graph_id: graph identifier. 
+/// @param [in] stream: Stream for model running. +/// @param [in] inputs: input data for the graph running. +/// @param [out] outputs: output data of the graph running +/// @return Status result of function +/// +Status ModelExecutor::RunGraphWithStream(const GraphNodePtr &graph_node, GraphId graph_id, rtStream_t stream, + const std::vector &inputs, std::vector &outputs) { + auto ret = graph_executor_.SetCondition(&sync_run_mutex_, &condition_, graph_run_listener_); + if (ret != SUCCESS) { + GELOGE(GE_GRAPH_RUNGRAPH_FAILED, "[Set][Condition] failed, graph id = %u, stream = %p.", graph_id, stream); + graph_node->SetRunFlag(false); + return GE_GRAPH_RUNGRAPH_FAILED; + } + + ret = graph_executor_.ExecuteGraphWithStream(graph_id, stream, graph_node->GetGeRootModel(), inputs, outputs); + graph_node->SetRunFlag(false); + graph_node->SetIsSpecificStream(false); + if (ret != SUCCESS) { + GELOGE(ret, "[Execute][Graph] With Stream failed, graph id = %u, stream = %p.", graph_id, stream); + return ret; + } + GELOGI("[Run][GraphWithStreamAsync] run graph success, graph id = %u, stream = %p.", graph_id, stream); + return SUCCESS; +} + +Status ModelExecutor::ModelLoadSync(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node) { + ge_root_model->SetIsSpecificStream(graph_node->IsSpecificStream()); + return ModelLoad(ge_root_model, graph_node, graph_run_listener_); +} + +Status ModelExecutor::ModelLoadAsync(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node) { + auto listener = MakeShared(); + GE_CHECK_NOTNULL(listener); + return ModelLoad(ge_root_model, graph_node, listener); +} + +Status ModelExecutor::ModelLoad(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node, + const std::shared_ptr &listener) { + ge_root_model->SetTrainFlag(train_graph_flag_); + bool is_unknown_shape = false; + GE_CHK_STATUS_RET(ge_root_model->CheckIsUnknownShape(is_unknown_shape)); + if (!is_unknown_shape) { + if (getenv(kEnvGeuseStaticMemory) != nullptr) { + GELOGI("[LoadGraph] GE_USE_STATIC_MEMORY is seted."); + } else { + auto root_graph = ge_root_model->GetRootGraph(); + GE_CHECK_NOTNULL(root_graph); + auto name_to_model = ge_root_model->GetSubgraphInstanceNameToModel(); + GeModelPtr ge_model = name_to_model[root_graph->GetName()]; + GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node)); + } + } + GE_TIMESTAMP_START(LoadModelOnline); + uint32_t model_id = INVALID_MODEL_ID; + Status ret = GraphLoader::LoadModelOnline(model_id, ge_root_model, listener); + GE_TIMESTAMP_EVENT_END(LoadModelOnline, "GraphLoader::LoadModelOnline"); + if (ret != SUCCESS) { + GELOGE(ret, "[Load][ModelOnline] Failed, model_id:%u", model_id); + graph_node->SetRunFlag(false); + return ret; + } + graph_node->SetLoadFlag(true); + ge_root_model->SetModelId(model_id); + graph_node->SetGeRootModel(ge_root_model); + AddGraphNode(graph_node->GetGraphId(), graph_node); + return SUCCESS; +} + +void ModelExecutor::ReleaseMemory(const GeModelPtr &ge_model, const GraphNodePtr &graph_node, + const std::vector &model_ids, uint32_t graph_id, uint64_t session_id) { + rtError_t rt_ret = rtSetDevice(GetContext().DeviceId()); + if (rt_ret != RT_ERROR_NONE) { + REPORT_CALL_ERROR("E19999", "Call rtSetDevice failed, device_id:%u", GetContext().DeviceId()); + GELOGE(RT_FAILED, "[Call][RtSetDevice] failed, device_id=%u.", GetContext().DeviceId()); + return; + } + for (auto model_id : model_ids) { + uint64_t max_memory_size = 0; + Status result = GraphLoader::GetMaxUsedMemory(model_id, max_memory_size); + if (result != 
SUCCESS) { + continue; + } + GELOGI("try to UnloadGraph[%u], model[%u] which MaxUsedMemory[%lu].", graph_id, model_id, max_memory_size); + if (model_ids.size() > 1) { + result = ge_model->GetSessionId(model_id, session_id); + if (result != SUCCESS) { + GELOGW("[GraphExecutor:] get session failed when dynamic memory, modelId=%u, graphId=%u.", model_id, + graph_id); + continue; + } + } + result = GraphLoader::DestroyAicpuKernel(session_id, model_id, 0); + if (result != SUCCESS) { + GELOGW("[GraphExecutor:] destroy aicpu kernel failed when dynamic memory, modelId=%u, graphId=%u.", model_id, + graph_id); + } + result = GraphLoader::UnloadModel(model_id); + if (result != SUCCESS) { + GELOGW("[GraphExecutor:] unload model failed, modelId=%u, graphId=%u.", model_id, graph_id); + } + GELOGI("UnloadGraph[%u], model[%u] success.", graph_id, model_id); + } + graph_node->SetLoadFlag(false); + // Allow model to be loaded agagin without adding graph again + graph_node->SetLoadCount(graph_node->GetLoadRecord()); + graph_node->SetLoadRecord(kNeverLoaded); + GeRootModelPtr ge_root_model = graph_node->GetGeRootModel(); + if (ge_root_model == nullptr) { + GELOGW("ge_root_model is null, graph_id:%u", graph_id); + return; + } + ge_root_model->ClearAllModelId(); + rt_ret = rtDeviceReset(GetContext().DeviceId()); + if (rt_ret != RT_ERROR_NONE) { + REPORT_CALL_ERROR("E19999", "Call rtDeviceReset failed, device_id:%u", GetContext().DeviceId()); + GELOGE(RT_FAILED, "[Call][RtDeviceReset] failed, device_id:%u.", GetContext().DeviceId()); + return; + } +} + +Status ModelExecutor::CheckAndReleaseMemory(const GeModelPtr &ge_model, const GraphNodePtr &graph_node) { + GELOGI("graph_id[%u]", graph_node->GetGraphId()); + int64_t free_memory = 0; + Status result = GraphLoader::GetMemoryInfo(free_memory); + if (result != SUCCESS) { + return result; + } + + int64_t value = 0; + int64_t memory_size = AttrUtils::GetInt(ge_model, ATTR_MODEL_MEMORY_SIZE, value) ? value : 0; + int64_t weight_size = AttrUtils::GetInt(ge_model, ATTR_MODEL_WEIGHT_SIZE, value) ? value : 0; + int64_t session_id = AttrUtils::GetInt(ge_model, MODEL_ATTR_SESSION_ID, value) ? 
value : 0; + + GELOGI("Graph[%u] need memory_size[%ld], weight_size[%ld], Device[%u] free_memory_size[%ld]", + graph_node->GetGraphId(), memory_size, weight_size, GetContext().DeviceId(), free_memory); + if (CheckInt64AddOverflow(memory_size, weight_size) != SUCCESS) { + REPORT_INNER_ERROR("E19999", "memory_size:%ld and weight_size:%ld will overflow after add, check invalid", + memory_size, weight_size); + GELOGE(INTERNAL_ERROR, "[Check][Param] memory_size:%ld and weight_size:%ld will overflow after add", + memory_size, weight_size); + return INTERNAL_ERROR; + } + if (free_memory >= (memory_size + weight_size)) { + return SUCCESS; + } + + std::lock_guard lock(mutex_); + for (const auto &it : graph_nodes_) { + auto graph_id = it.second->GetGraphId(); + auto model = it.second->GetGeRootModel(); + if (model == nullptr) { + continue; + } + auto model_id = model->GetModelId(); + auto model_ids = model->GetAllModelId(); + // unload model not release + bool is_unknown_shape = false; + GE_CHK_STATUS_RET(model->CheckIsUnknownShape(is_unknown_shape)); + if (is_unknown_shape) { + GELOGD("model_id[%u] graph_id[%u] is unknown model, not release memory", model_id, graph_id); + continue; + } + // not loaded,no need unload + if (!it.second->GetLoadFlag()) { + GELOGI("CheckAndReleaseMemory graph[%u] has not been loaded.", graph_id); + continue; + } + ReleaseMemory(ge_model, it.second, model_ids, graph_id, static_cast(session_id)); + } + + return SUCCESS; +} + +void ModelExecutor::ParseInputsDimsForData(const std::vector &input_tensor) { + GELOGD("Start parse input dims from data."); + for (size_t i = 0; i < input_tensor.size(); ++i) { + const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc(); + const Shape &shape = tensor_desc.GetShape(); + const auto &shape_dims = shape.GetDims(); + GELOGD("Input tensor dims is %s.", formats::JoinToString(shape_dims).c_str()); + GetLocalOmeContext().user_real_input_dims.emplace_back(shape_dims); + } +} + +Status ModelExecutor::ParseInputsDimsForGetNextNoSinkAndData(const vector &dynamic_nodes, + const std::vector &input_tensor) { + GELOGD("Start parse inputs dims when coexist data and getnext sink."); + for (size_t i = 0; i < dynamic_nodes.size(); ++i) { + auto op_desc = dynamic_nodes.at(i)->GetOpDesc(); + if (op_desc == nullptr) { + continue; + } + GeAttrValue::INT index = 0; + if (!(AttrUtils::GetInt(op_desc, ATTR_NAME_INDEX, index))) { + REPORT_CALL_ERROR("E19999", "Get Attr:%s from op:%s(%s) fail", ATTR_NAME_INDEX.c_str(), + op_desc->GetName().c_str(), op_desc->GetType().c_str()); + GELOGE(PARAM_INVALID, "[Get][Attr] %s from op:%s(%s) fail", ATTR_NAME_INDEX.c_str(), + op_desc->GetName().c_str(), op_desc->GetType().c_str()); + return PARAM_INVALID; + } + if (static_cast(index) > input_tensor.size()) { + REPORT_INNER_ERROR("E19999", "Attr:%s in op:%s(%s) value:%ld > param input_tensor.size:%zu, " + "check invalid", ATTR_NAME_INDEX.c_str(), + op_desc->GetName().c_str(), op_desc->GetType().c_str(), + index, input_tensor.size()); + GELOGE(PARAM_INVALID, "[Check][Param] Attr:%s in op:%s(%s) value:%ld > param input_tensor.size:%zu", + ATTR_NAME_INDEX.c_str(), op_desc->GetName().c_str(), op_desc->GetType().c_str(), + index, input_tensor.size()); + return PARAM_INVALID; + } + + const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc(); + const Shape &shape = tensor_desc.GetShape(); + const auto &shape_dims = shape.GetDims(); + GELOGI("Shape dims of %zu data is %s.", index, formats::JoinToString(shape_dims).c_str()); + 
GetLocalOmeContext().user_real_input_dims.emplace_back(std::move(shape_dims)); + } + return SUCCESS; +} + +Status ModelExecutor::ParseInputsDims(const std::vector &input_tensor) { + GELOGI("Start parse input dims of %zu input tensor.", input_tensor.size()); + GetLocalOmeContext().user_real_input_dims.clear(); + if (GetLocalOmeContext().dynamic_node_type.empty()) { + return SUCCESS; + } + + const vector &data_nodes = GetLocalOmeContext().data_nodes; + const vector &getnext_nosink_nodes = GetLocalOmeContext().getnext_nosink_nodes; + GELOGD("Data nodes count is %zu, getnext nosink nodes count is %zu.", data_nodes.size(), + getnext_nosink_nodes.size()); + if (GetLocalOmeContext().dynamic_node_type == DATA) { + if (getnext_nosink_nodes.empty()) { + // just data or data+getnext_sink + ParseInputsDimsForData(input_tensor); + } else { + // data+getnext_nosink, but only need to get shape_dims of data + if (ParseInputsDimsForGetNextNoSinkAndData(data_nodes, input_tensor) != SUCCESS) { + GELOGE(PARAM_INVALID, "[Parse][Dims] from data failed, when data coexist with getnext nosink."); + return PARAM_INVALID; + } + } + } else { + if (getnext_nosink_nodes.empty()) { + // just getnext_sink or getnext_sink+data, need to get shape_dims from aicpu op + GELOGI("Need to get dims from aicpu op: GETDYNAMICDIMS."); + return SUCCESS; + } else { + if (data_nodes.empty()) { + // just getnext_nosink + ParseInputsDimsForData(input_tensor); + } else { + // getnext_nosink + data, but only need to get shape_dims of getnext_nosink + if (ParseInputsDimsForGetNextNoSinkAndData(getnext_nosink_nodes, input_tensor) != SUCCESS) { + GELOGE(PARAM_INVALID, "[Parse][Dims] from getnext nosink failed, when data coexist with getnext nosink"); + return PARAM_INVALID; + } + } + } + } + + GELOGI("Parse %zu inputs dims success.", GetLocalOmeContext().user_real_input_dims.size()); + return SUCCESS; +} +} // namespace ge diff --git a/ge/graph/execute/model_executor.h b/ge/graph/execute/model_executor.h new file mode 100644 index 00000000..f8e717a1 --- /dev/null +++ b/ge/graph/execute/model_executor.h @@ -0,0 +1,139 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef GE_GRAPH_EXECUTE_MODEL_EXECUTOR_H +#define GE_GRAPH_EXECUTE_MODEL_EXECUTOR_H + +#include + +#include "common/executor.h" +#include "graph/execute/graph_execute.h" + +namespace ge { +class ModelExecutor : public Executor { + public: + /// + /// @ingroup ge + /// @brief graph executor init + /// @param [in] options user config params + /// @return Status result of function + /// + Status Initialize(const map &options); + + /// + /// @ingroup ge + /// @brief graph executor finalize + /// @return Status result of function + /// + Status Finalize(); + + /// + /// @ingroup ge + /// @brief Load mode for graph. + /// @param [in] GeRootModel: root model of graph compiled. + /// @param [in] GraphNode: node of graph. 
+ /// @return Status result of function + /// + Status LoadGraph(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node); + + /// + /// @ingroup ge + /// @brief Unload mode for graph. + /// @param [in] GeRootModel: root model of graph compiled. + /// @param [in] graph_id: graph identifier. + /// @return Status result of function + /// + Status UnloadGraph(const GeRootModelPtr &ge_root_model, uint32_t graph_id); + + /// + /// @ingroup ge + /// @brief Push model execution params to queue. + /// @param [in] RunArgs of for model execution. + /// @return Status result of function + /// + Status PushGraph(const RunArgs &args); + + /// + /// @ingroup ge + /// @brief Run graph for synchronize model. + /// @param [in] graph_node: node of graph. + /// @param [in] graph_id: graph identifier. + /// @param [in] inputs: input data for the graph running. + /// @param [out] outputs: output data of the graph running + /// @return Status result of function + /// + Status RunGraph(const GraphNodePtr &graph_node, GraphId graph_id, + const std::vector &inputs, std::vector &outputs); + + /// + /// @ingroup ge + /// @brief Run graph for NN synchronize model. + /// @param [in] graph_node: node of graph. + /// @param [in] graph_id: graph identifier. + /// @param [in] stream: Stream for model running. + /// @param [in] inputs: input data for the graph running. + /// @param [out] outputs: output data of the graph running + /// @return Status result of function + /// + Status RunGraphWithStream(const GraphNodePtr &graph_node, GraphId graph_id, rtStream_t stream, + const std::vector &inputs, std::vector &outputs); + + private: + bool ParseTrainGraphFlag(); + + void AddGraphNode(GraphId graph_id, const GraphNodePtr &graph_node); + void RemoveGraphNode(GraphId graph_id); + + Status ModelLoadSync(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node); + Status ModelLoadAsync(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node); + Status ModelLoad(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node, + const std::shared_ptr &listener); + + Status UnloadModel(const GeRootModelPtr &ge_root_model, uint32_t graph_id); + + void ReleaseMemory(const GeModelPtr &ge_model, const GraphNodePtr &graph_node, const std::vector &model_ids, + uint32_t graph_id, uint64_t session_id); + Status CheckAndReleaseMemory(const GeModelPtr &ge_model, const GraphNodePtr &graph_node); + + void UpdateLocalOmeContext(const GraphNodePtr &graph_node); + + void RunThread(); + void StopQueue(); + void ReturnError(RunAsyncCallback callback, Status ret, const string &log); + + void ParseInputsDimsForData(const std::vector &input_tensor); + Status ParseInputsDimsForGetNextNoSinkAndData(const vector &dynamic_nodes, + const std::vector &input_tensor); + Status ParseInputsDims(const std::vector &input_tensor); + + bool init_flag_{false}; + bool train_graph_flag_{false}; + GraphExecutor graph_executor_; + + std::mutex mutex_; + std::map graph_nodes_; + + std::thread run_thread_; + std::atomic_bool thread_run_flag_{false}; + BlockingQueue run_args_q_; + + // for run graph synchronous return + std::mutex sync_run_mutex_; + std::condition_variable condition_; + // run graph synchronization call back listener + std::shared_ptr graph_run_listener_; +}; +} +#endif // GE_GRAPH_EXECUTE_MODEL_EXECUTOR_H \ No newline at end of file diff --git a/ge/graph/load/model_manager/model_manager.cc b/ge/graph/load/model_manager/model_manager.cc index 3c31014d..45540ba0 100755 --- 
a/ge/graph/load/model_manager/model_manager.cc +++ b/ge/graph/load/model_manager/model_manager.cc @@ -513,8 +513,7 @@ Status ModelManager::GetCurDynamicDims(const vector> &user_real_ } GELOGD("Cur dynamic dims is %s.", formats::JoinToString(cur_dynamic_dims).c_str()); bool cur_dynamic_dims_valid = false; - std::vector shape_strs = ge::StringUtils::Split(GetLocalOmgContext().dynamic_dims, ';'); - for (auto dynamic_dim : shape_strs) { + for (auto dynamic_dim : GetLocalOmeContext().dynamic_shape_dims) { if (dynamic_dim == formats::JoinToString(cur_dynamic_dims)) { cur_dynamic_dims_valid = true; break; @@ -556,10 +555,10 @@ Status ModelManager::DataInputTensor(uint32_t model_id, const std::vector cur_dynamic_dims; - if (!GetLocalOmgContext().user_real_input_dims.empty()) { - if (GetCurDynamicDims(GetLocalOmgContext().user_real_input_dims, GetLocalOmgContext().user_input_dims, + if (!GetLocalOmeContext().user_real_input_dims.empty()) { + if (GetCurDynamicDims(GetLocalOmeContext().user_real_input_dims, GetLocalOmeContext().user_input_dims, cur_dynamic_dims) != SUCCESS) { GELOGE(INTERNAL_ERROR, "[Get][CurDynamicDims] [Train_Dynamic] Failed to Parse real_dynamic_dims."); return INTERNAL_ERROR; diff --git a/ge/graph/manager/graph_manager.cc b/ge/graph/manager/graph_manager.cc index 04e0f51c..96dc59c5 100755 --- a/ge/graph/manager/graph_manager.cc +++ b/ge/graph/manager/graph_manager.cc @@ -129,8 +129,6 @@ const uint32_t kInitGraphCount = 1; const uint32_t kNotAdded = 0; const uint32_t kStartAdd = 1; const uint32_t kDoneAdded = 2; -const uint32_t kNeverLoaded = 0; -const size_t kAlignment = 64; bool IsTailingOptimization() { string is_tailing_optimization_option; @@ -164,26 +162,12 @@ ge::Status CheckFpCeilingMode() { } // namespace namespace ge { -GraphManager::GraphManager() - : thread_run_flag_(false), - graph_run_listener_(nullptr), - init_flag_(false) { -} - -Status GraphManager::Initialize(const std::map &options) { +Status GraphManager::Initialize(const std::map &options, Executor *executor) { ErrorManager::GetInstance().SetStage(error_message::kInitialize, error_message::kOther); if (init_flag_) { GELOGW("[Initialize] GraphManager already initialized."); return SUCCESS; } - - // malloc - graph_run_listener_ = MakeShared(sync_run_mutex_, condition_); - if (graph_run_listener_ == nullptr) { - REPORT_CALL_ERROR("E19999", "New GraphModelListener fail"); - GELOGE(MEMALLOC_FAILED, "[New][GraphModelListener] failed"); - return MEMALLOC_FAILED; - } // graph context graph_context_ = MakeShared(); if (graph_context_ == nullptr) { @@ -211,31 +195,18 @@ Status GraphManager::Initialize(const std::map &options) { return ret; } - graph_map_.clear(); - cache_helper_map_.clear(); - graph_id_to_add_graph_cond_.clear(); - graph_count_.clear(); + executor_ = executor; init_flag_ = true; thread_run_flag_ = true; - prerun_thread_ = std::thread(GraphManager::PreRunThread, this); - run_thread_ = std::thread(GraphManager::RunThread, this); + prerun_thread_ = std::thread(&GraphManager::PreRunThread, this); return SUCCESS; } Status GraphManager::UnloadModel(GeRootModelPtr ge_root_model, uint32_t graph_id) { - Status ret = SUCCESS; - for (size_t i = 0; i < ge_root_model->GetAllModelId().size(); ++i) { - uint32_t model_id = ge_root_model->GetAllModelId()[i]; - GELOGI("Unload model %u.", model_id); - ret = GraphLoader::UnloadModel(model_id); - if (ret != SUCCESS) { - GELOGW("[GraphManager] unload model failed, modelId=%u, graphId=%u.", model_id, graph_id); - return ret; - } - } - return ret; + 
GE_CHECK_NOTNULL(executor_); + return executor_->UnloadGraph(ge_root_model, graph_id); } Status GraphManager::Finalize() { @@ -244,23 +215,13 @@ Status GraphManager::Finalize() { return SUCCESS; } - if (graph_executor_.FreeExecuteMemory() != SUCCESS) { - GELOGW("Graph executor FreeExecuteMemory failed, resources may not be released correctly."); - } - - StopQueue(this); - + StopQueue(); if (prerun_thread_.joinable()) { prerun_thread_.join(); } - if (run_thread_.joinable()) { - run_thread_.join(); - } // check graph whether running or not Status unload_model_ret = SUCCESS; - Status ret; - rtError_t rt_ret; for (auto iter = graph_map_.begin(); iter != graph_map_.end(); ++iter) { GraphNodePtr graph_node = iter->second; if (graph_node->GetRunFlag()) { @@ -271,22 +232,10 @@ Status GraphManager::Finalize() { // unload model auto ge_root_model = graph_node->GetGeRootModel(); if (ge_root_model != nullptr && ge_root_model->GetModelId() != INVALID_MODEL_ID && graph_node->GetLoadFlag()) { - rt_ret = rtSetDevice(GetContext().DeviceId()); - if (rt_ret != RT_ERROR_NONE) { - GELOGW("[GraphManager] rtSetDevice failed, modelId=%u, graphId=%u.", ge_root_model->GetModelId(), iter->first); - unload_model_ret = FAILED; - continue; - } - ret = UnloadModel(ge_root_model, iter->first); + Status ret = UnloadModel(ge_root_model, iter->first); if (ret != SUCCESS) { - GELOGW("[GraphManager] unload model failed, graph_id=%u.", iter->first); unload_model_ret = ret; - } - rt_ret = rtDeviceReset(GetContext().DeviceId()); - if (rt_ret != RT_ERROR_NONE) { - GELOGW("[GraphManager] rtDeviceReset failed, graphId=%u.", iter->first); - unload_model_ret = FAILED; - continue; + GELOGW("[GraphManager] unload model failed, graph_id=%u.", iter->first); } } @@ -1122,12 +1071,7 @@ Status GraphManager::StartForRunGraph(const GraphNodePtr &graph_node, const std: return ret; } } - ErrorManager::GetInstance().SetStage(error_message::kModelLoad, error_message::kModelLoad); - if (!graph_node->IsAsync()) { - ret = LoadGraph(ge_root_model, graph_node); - } else { - ret = LoadGraphAsync(ge_root_model, graph_node); - } + ret = LoadGraph(ge_root_model, graph_node); if (ret != SUCCESS) { GELOGE(ret, "[Load][Graph] Failed, graph_id:%u.", graph_node->GetGraphId()); return ret; @@ -1135,13 +1079,8 @@ Status GraphManager::StartForRunGraph(const GraphNodePtr &graph_node, const std: graph_node->SetBuildFlag(true); var_acc_ctrl_.SetGraphBuildEnd(graph_node->GetGraphId()); } else if (!graph_node->GetLoadFlag()) { - ErrorManager::GetInstance().SetStage(error_message::kModelLoad, error_message::kModelLoad); GeRootModelPtr ge_root_model_ptr = graph_node->GetGeRootModel(); - if (!graph_node->IsAsync()) { - ret = LoadGraph(ge_root_model_ptr, graph_node); - } else { - ret = LoadGraphAsync(ge_root_model_ptr, graph_node); - } + ret = LoadGraph(ge_root_model, graph_node); if (ret != SUCCESS) { GELOGE(ret, "[Load][Graph] Failed, graph_id:%u.", graph_node->GetGraphId()); return ret; @@ -1149,40 +1088,16 @@ Status GraphManager::StartForRunGraph(const GraphNodePtr &graph_node, const std: } return ret; } + Status GraphManager::LoadGraph(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node) { GELOGI("[LoadGraph] run_graph_flag[%d], graph_id[%u]", options_.run_graph_flag, graph_node->GetGraphId()); - if (options_.run_graph_flag && ge_root_model != nullptr) { - ge_root_model->SetTrainFlag(GetTrainFlag()); - // synchronization run graph with model - std::shared_ptr model_listener = GetModelListener(); - ModelIdInfo model_id_info; - bool is_unknown_shape = 
false; - GE_CHK_STATUS_RET(ge_root_model->CheckIsUnknownShape(is_unknown_shape)); - if (!is_unknown_shape) { - if (getenv(kEnvGeuseStaticMemory) != nullptr) { - GELOGI("[LoadGraph] GE_USE_STATIC_MEMORY is seted."); - } else { - auto root_graph = ge_root_model->GetRootGraph(); - GE_CHECK_NOTNULL(root_graph); - auto name_to_model = ge_root_model->GetSubgraphInstanceNameToModel(); - GeModelPtr ge_model = name_to_model[root_graph->GetName()]; - GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node)); - } - } - ge_root_model->SetIsSpecificStream(graph_node->IsSpecificStream()); - GE_TIMESTAMP_START(LoadGraph); - Status ret = GraphLoader::LoadModelOnline(model_id_info.model_id, ge_root_model, model_listener); - GE_TIMESTAMP_EVENT_END(LoadGraph, "GraphManager::LoadGraph"); - if (ret != SUCCESS) { - GELOGE(ret, "[Load][Model] failed, ret:%d", ret); - graph_node->SetRunFlag(false); - return ret; - } - graph_node->SetLoadFlag(true); - ge_root_model->SetModelId(model_id_info.model_id); - graph_node->SetGeRootModel(ge_root_model); + if (!options_.run_graph_flag) { + return SUCCESS; } - return SUCCESS; + + ErrorManager::GetInstance().SetStage(error_message::kModelLoad, error_message::kModelLoad); + GE_CHECK_NOTNULL(executor_); + return executor_->LoadGraph(ge_root_model, graph_node); } Status GraphManager::LoadFromCache(const GraphNodePtr &graph_node, const ModelCacheHelperPtr &cache_helper, @@ -1272,45 +1187,14 @@ Status GraphManager::SaveCacheAfterBuild(uint32_t graph_id, ge::ComputeGraphPtr Status GraphManager::InnerRunGraph(GraphNodePtr &graph_node, const GraphId &graph_id, const std::vector &inputs, std::vector &outputs) { - Status ret = graph_executor_.SetCondition(&sync_run_mutex_, &condition_, graph_run_listener_); - if (ret != SUCCESS) { - GELOGE(GE_GRAPH_RUNGRAPH_FAILED, "[Set][Condition] failed, graph_id = %u.", graph_id); - graph_node->SetRunFlag(false); - return GE_GRAPH_RUNGRAPH_FAILED; - } - - if (GetTrainFlag()) { - GE_CHK_STATUS_RET(graph_executor_.SetGraphContext(GetGraphContext())); - graph_executor_.SetTrainFlag(options_.train_graph_flag); - } - ret = graph_executor_.ExecuteGraph(graph_id, graph_node->GetGeRootModel(), inputs, outputs); - - graph_node->SetRunFlag(false); - if (ret != SUCCESS) { - GELOGE(ret, "[Execute][Graph] failed, graph_id = %u.", graph_id); - return ret; - } - return SUCCESS; + GE_CHECK_NOTNULL(executor_); + return executor_->RunGraph(graph_node, graph_id, inputs, outputs); } Status GraphManager::InnerRunGraphWithStream(GraphNodePtr &graph_node, const GraphId &graph_id, rtStream_t stream, const std::vector &inputs, std::vector &outputs) { - auto ret = graph_executor_.SetCondition(&sync_run_mutex_, &condition_, graph_run_listener_); - if (ret != SUCCESS) { - GELOGE(GE_GRAPH_RUNGRAPH_FAILED, "[Set][Condition] failed, graph id = %u, stream = %p.", graph_id, stream); - graph_node->SetRunFlag(false); - return GE_GRAPH_RUNGRAPH_FAILED; - } - - ret = graph_executor_.ExecuteGraphWithStream(graph_id, stream, graph_node->GetGeRootModel(), inputs, outputs); - graph_node->SetRunFlag(false); - graph_node->SetIsSpecificStream(false); - if (ret != SUCCESS) { - GELOGE(ret, "[Execute][Graph] With Stream failed, graph id = %u, stream = %p.", graph_id, stream); - return ret; - } - GELOGI("[Run][GraphWithStreamAsync] run graph success, graph id = %u, stream = %p.", graph_id, stream); - return SUCCESS; + GE_CHECK_NOTNULL(executor_); + return executor_->RunGraphWithStream(graph_node, graph_id, stream, inputs, outputs); } Status GraphManager::RunGraphWithStreamAsync(const 
GraphId &graph_id, rtStream_t stream, uint64_t session_id, @@ -1665,38 +1549,18 @@ Status GraphManager::RemoveGraph(const GraphId &graph_id) { std::lock_guard lock(unload_model_mutex_); - Status middle_ret; - rtError_t rt_ret; var_acc_ctrl_.RemoveGraph(graph_id); RemoveGraphNode(graph_id); - RemoveModelCacheHelper(graph_id); auto ge_root_model = graph_node->GetGeRootModel(); if (CheckModelLoad(ge_root_model, graph_node->GetLoadFlag())) { - rt_ret = rtSetDevice(GetContext().DeviceId()); - if (rt_ret != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtSetDevice failed, device_id:%u, graph_id:%u", - GetContext().DeviceId(), graph_id); - GELOGE(RT_FAILED, "[Call][RtSetDevice] failed, modelId=%u, graphId=%u.", ge_root_model->GetModelId(), - graph_id); - return FAILED; - } - // same graph may be added for several times, different models were created separately, - // unload them respectively. - middle_ret = UnloadModel(ge_root_model, graph_id); + Status middle_ret = UnloadModel(ge_root_model, graph_id); if (middle_ret != SUCCESS) { REPORT_INNER_ERROR("E19999", "UnloadModel for graph:%u failed, check invalid", graph_id); GELOGE(middle_ret, "[Unload][Model] model failed, graph_id=%u.", graph_id); ret = middle_ret; } - rt_ret = rtDeviceReset(GetContext().DeviceId()); - if (rt_ret != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtDeviceReset failed, device_id:%u, graph_id:%u", - GetContext().DeviceId(), graph_id); - GELOGE(RT_FAILED, "[Call][RtDeviceReset] failed, device_id:%u, graph_id:%u", GetContext().DeviceId(), graph_id); - ret = FAILED; - } } RemoveCompilerStages(graph_id); @@ -2120,8 +1984,6 @@ Status GraphManager::SummaryHandle(const GraphId &graph_id, std::vector &outputs) { GELOGI("[GraphManager] CheckpointHandle, outputsSize=%zu.", outputs.size()); - std::vector outputs_desc = graph_executor_.GetOutputsDesc(); - GELOGI("[GraphManager] CheckpointHandle, outputsDescSize=%zu.", outputs_desc.size()); std::map save_results; NodePtr netoutput = nullptr; @@ -2786,160 +2648,6 @@ void GraphManager::ChangeConstTypeWhenTraining(const ComputeGraphPtr &compute_gr } } -Status GraphManager::LoadGraphAsync(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node) { - GELOGI("[LoadGraphAsync] run_graph_flag[%d], graph_id[%u]", options_.run_graph_flag, graph_node->GetGraphId()); - if (options_.run_graph_flag && ge_root_model != nullptr) { - ge_root_model->SetTrainFlag(GetTrainFlag()); - // synchronization run graph with model - ModelIdInfo model_id_info; - bool is_unknown_shape = false; - GE_CHK_STATUS_RET(ge_root_model->CheckIsUnknownShape(is_unknown_shape)); - if (!is_unknown_shape) { - if (getenv(kEnvGeuseStaticMemory) != nullptr) { - GELOGI("[LoadGraphAsync] GE_USE_STATIC_MEMORY is seted."); - } else { - auto root_graph = ge_root_model->GetRootGraph(); - GE_CHECK_NOTNULL(root_graph); - auto name_to_model = ge_root_model->GetSubgraphInstanceNameToModel(); - GeModelPtr ge_model = name_to_model[root_graph->GetName()]; - GE_CHK_STATUS_RET(CheckAndReleaseMemory(ge_model, graph_node)); - } - } - GE_TIMESTAMP_START(LoadGraph); - auto listener = MakeShared(); - GE_CHECK_NOTNULL(listener); - Status ret = GraphLoader::LoadModelOnline(model_id_info.model_id, ge_root_model, listener); - GE_TIMESTAMP_EVENT_END(LoadGraph, "GraphManager::LoadGraphAsync"); - if (ret != SUCCESS) { - GELOGE(ret, "[Load][ModelOnline] Failed, model_id:%u", model_id_info.model_id); - graph_node->SetRunFlag(false); - return ret; - } - graph_node->SetLoadFlag(true); - ge_root_model->SetModelId(model_id_info.model_id); 
- graph_node->SetGeRootModel(ge_root_model); - } - return SUCCESS; -} - -void GraphManager::ReleaseMemory(const GeModelPtr &ge_model, GraphNodePtr &graph_node, - const std::vector &model_ids, uint32_t graph_id, uint64_t session_id) { - rtError_t rt_ret = rtSetDevice(GetContext().DeviceId()); - if (rt_ret != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtSetDevice failed, device_id:%u", GetContext().DeviceId()); - GELOGE(RT_FAILED, "[Call][RtSetDevice] failed, device_id=%u.", GetContext().DeviceId()); - return; - } - for (auto model_id : model_ids) { - uint64_t max_memory_size = 0; - Status result = GraphLoader::GetMaxUsedMemory(model_id, max_memory_size); - if (result != SUCCESS) { - continue; - } - GELOGI("CheckAndReleaseMemory try to UnloadGraph[%u], model[%u] which MaxUsedMemory[%lu].", graph_id, model_id, - max_memory_size); - if (model_ids.size() > 1) { - result = ge_model->GetSessionId(model_id, session_id); - if (result != SUCCESS) { - GELOGW("[GraphManager:] get session failed when dynamic memory, modelId=%u, graphId=%u.", model_id, - graph_id); - continue; - } - } - result = GraphLoader::DestroyAicpuKernel(session_id, model_id, 0); - if (result != SUCCESS) { - GELOGW("[GraphManager:] destroy aicpu kernel failed when dynamic memory, modelId=%u, graphId=%u.", model_id, - graph_id); - } - result = GraphLoader::UnloadModel(model_id); - if (result != SUCCESS) { - GELOGW("[GraphManager:] unload model failed, modelId=%u, graphId=%u.", model_id, graph_id); - } - GELOGI("CheckAndReleaseMemory UnloadGraph[%u], model[%u] success.", graph_id, model_id); - } - graph_node->SetLoadFlag(false); - // Allow model to be loaded agagin without adding graph again - graph_node->SetLoadCount(graph_node->GetLoadRecord()); - graph_node->SetLoadRecord(kNeverLoaded); - GeRootModelPtr ge_root_model = graph_node->GetGeRootModel(); - if (ge_root_model == nullptr) { - GELOGW("ge_root_model is null, graph_id:%u", graph_id); - return; - } - ge_root_model->ClearAllModelId(); - rt_ret = rtDeviceReset(GetContext().DeviceId()); - if (rt_ret != RT_ERROR_NONE) { - REPORT_CALL_ERROR("E19999", "Call rtDeviceReset failed, device_id:%u", GetContext().DeviceId()); - GELOGE(RT_FAILED, "[Call][RtDeviceReset] failed, device_id:%u.", GetContext().DeviceId()); - return; - } -} - -Status GraphManager::CheckAndReleaseMemory(const GeModelPtr &ge_model, const GraphNodePtr &graph_node) { - GELOGI("CheckAndReleaseMemory graph_id[%u]", graph_node->GetGraphId()); - int64_t value = 0; - bool ret = ge::AttrUtils::GetInt(ge_model, ATTR_MODEL_MEMORY_SIZE, value); - int64_t memory_size = ret ? value : 0; - ret = ge::AttrUtils::GetInt(ge_model, ATTR_MODEL_WEIGHT_SIZE, value); - int64_t weight_size = ret ? value : 0; - ret = ge::AttrUtils::GetInt(ge_model, MODEL_ATTR_SESSION_ID, value); - uint64_t session_id = ret ? 
value : 0; - - int64_t free_memory = 0; - Status result = GraphLoader::GetMemoryInfo(free_memory); - if (result != SUCCESS) { - return result; - } - - GELOGI( - "CheckAndReleaseMemory Graph[%u] need memory_size[%ld], weight_size[%ld]," - " Device[%u] free_memory_size[%ld]", - graph_node->GetGraphId(), memory_size, weight_size, GetContext().DeviceId(), free_memory); - if (ge::CheckInt64AddOverflow(memory_size, weight_size) != SUCCESS) { - REPORT_INNER_ERROR("E19999", "memory_size:%ld and weight_size:%ld will overflow after add, check invalid", - memory_size, weight_size); - GELOGE(INTERNAL_ERROR, "[Check][Param] memory_size:%ld and weight_size:%ld will overflow after add", - memory_size, weight_size); - return INTERNAL_ERROR; - } - if (free_memory >= (memory_size + weight_size)) { - return SUCCESS; - } - - std::lock_guard lock(unload_model_mutex_); - - std::map graph_map; - { - std::lock_guard lock(member_mutex_); - graph_map = graph_map_; - } - - for (auto &it : graph_map) { - auto graph_id = it.second->GetGraphId(); - auto model = it.second->GetGeRootModel(); - if (model == nullptr) { - continue; - } - auto model_id = model->GetModelId(); - auto model_ids = model->GetAllModelId(); - // unload model not release - bool is_unknown_shape = false; - GE_CHK_STATUS_RET(model->CheckIsUnknownShape(is_unknown_shape)); - if (is_unknown_shape) { - GELOGD("model_id[%u] graph_id[%u] is unknown model, not release memory", model_id, graph_id); - continue; - } - // not loaded,no need unload - if (!it.second->GetLoadFlag()) { - GELOGI("CheckAndReleaseMemory graph[%u] has not been loaded.", graph_id); - continue; - } - ReleaseMemory(ge_model, it.second, model_ids, graph_id, session_id); - } - - return SUCCESS; -} - Status GraphManager::ProcessSubGraphWithMultiThreads(GraphManager *graph_manager, GraphId root_graph_id, const SubGraphInfoPtr &sub_graph_info_ptr, const std::string &root_graph_name, @@ -3069,14 +2777,14 @@ Status GraphManager::IncreBuild(const GraphNodePtr &graph_node, GeModelPtr &ge_m return FAILED; } -Status GraphManager::CheckIncreBuildAndPreRun(GraphManager *graph_manager, const PreRunArgs &args, +Status GraphManager::CheckIncreBuildAndPreRun(const PreRunArgs &args, GraphNodePtr &graph_node, GeRootModelPtr &ge_root_model) { - if (!graph_manager->IsGraphNeedBuild(graph_node)) { + if (!IsGraphNeedBuild(graph_node)) { ge_root_model = graph_node->GetGeRootModel(); return SUCCESS; } if (graph_node->GetBuildFlag()) { - ReturnError(graph_manager, args.callback, PARAM_INVALID, + ReturnError(args.callback, PARAM_INVALID, "The graph " + std::to_string(graph_node->GetGraphId()) + " need to re-build, you should remove it" " from GE first, then AddGraph again and rebuild it."); @@ -3084,55 +2792,53 @@ Status GraphManager::CheckIncreBuildAndPreRun(GraphManager *graph_manager, const } // check need incre build. 
GeModelPtr ge_model = nullptr; - if (graph_manager->IncreBuild(graph_node, ge_model) != SUCCESS) { + if (IncreBuild(graph_node, ge_model) != SUCCESS) { std::vector ge_inputs; for (const auto &item: args.input_tensor) { ge_inputs.emplace_back(TensorAdapter::AsGeTensor(item)); } - Status ret = graph_manager->PreRun(graph_node, ge_inputs, ge_root_model, args.session_id); + Status ret = PreRun(graph_node, ge_inputs, ge_root_model, args.session_id); // release rts generate context RtContextUtil::GetInstance().DestroyRtContexts(args.session_id, graph_node->GetGraphId()); if (ret != SUCCESS) { - ReturnError(graph_manager, args.callback, ret, "PreRun Failed."); + ReturnError(args.callback, ret, "PreRun Failed."); return ret; } } graph_node->SetBuildFlag(true); - graph_manager->var_acc_ctrl_.SetGraphBuildEnd(graph_node->GetGraphId()); + var_acc_ctrl_.SetGraphBuildEnd(graph_node->GetGraphId()); return SUCCESS; } -void GraphManager::PreRunThread(GraphManager *graph_manager) { +void GraphManager::PreRunThread() { if (prctl(PR_SET_NAME, ("GE_PreRun")) != 0) { GELOGW("Set thread name failed."); } PreRunArgs args; - while (graph_manager->thread_run_flag_) { - bool pop_status = graph_manager->prerun_args_q_.Pop(args); - if (!pop_status) { + while (thread_run_flag_) { + if (!prerun_args_q_.Pop(args)) { continue; } GELOGI("[PreRunThread] A new loop start, graph_id:%u.", args.graph_id); - ErrorManager::GetInstance().SetErrorContext(args.error_context); ErrorManager::GetInstance().SetStage(error_message::kModelCompile, error_message::kOther); GetContext().SetSessionId(args.session_id); GetThreadLocalContext() = args.context; - graph_manager->UpdateLocalOmgContext(args.graph_id); + UpdateLocalOmgContext(args.graph_id); // find graph GraphNodePtr graph_node = nullptr; - Status ret = graph_manager->GetGraphNode(args.graph_id, graph_node); + Status ret = GetGraphNode(args.graph_id, graph_node); if (ret != SUCCESS) { - ReturnError(graph_manager, args.callback, GE_GRAPH_GRAPH_NODE_NULL, + ReturnError(args.callback, GE_GRAPH_GRAPH_NODE_NULL, "[RunGraph] graph not exist, graph_id=" + std::to_string(args.graph_id)); return; } // more than one graph owns same graph_id uint32_t count = 0; - if (graph_manager->GetGraphCount(args.graph_id, count) != SUCCESS) { + if (GetGraphCount(args.graph_id, count) != SUCCESS) { GELOGE(INTERNAL_ERROR, "[Get][GraphCount] failed, graph id:%u.", args.graph_id); return; } @@ -3142,7 +2848,7 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) { // In online inference concurrency senario, graph_node is allowed to be locked for 'count' times graph_node->SetSemSize(count); graph_node->Lock(); - graph_manager->run_args_q_.Push(RunArgs( { graph_node, args.graph_id, args.session_id, args.error_context, + PushGraph(RunArgs( { graph_node, args.graph_id, args.session_id, args.error_context, args.input_tensor, graph_node->GetGeRootModel(), GetThreadLocalContext(), args.callback })); GELOGI("[PreRunThread] Loop end. 
Start to run with cached build model."); continue; @@ -3151,7 +2857,7 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) { graph_node->Lock(); if (graph_node->GetRunFlag()) { - ReturnError(graph_manager, args.callback, GE_GRAPH_ALREADY_RUNNING, + ReturnError(args.callback, GE_GRAPH_ALREADY_RUNNING, "[RunGraph] graph already running, graph id=" + std::to_string(args.graph_id)); graph_node->Unlock(); return; @@ -3162,25 +2868,25 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) { ComputeGraphPtr compute_graph_tmp = GraphUtils::GetComputeGraph(*(graph_node->GetGraph())); if (compute_graph_tmp == nullptr) { - ReturnError(graph_manager, args.callback, GE_GRAPH_GRAPH_NODE_NULL, + ReturnError(args.callback, GE_GRAPH_GRAPH_NODE_NULL, "[RunGraph] compute_graph_tmp is NULL, graph id = %u."); graph_node->Unlock(); return; } // when set incre build, save cache helper. - graph_manager->AddModelCacheHelperToMap(args.graph_id, args.session_id, compute_graph_tmp); + AddModelCacheHelperToMap(args.graph_id, args.session_id, compute_graph_tmp); std::vector ge_models; - if (graph_manager->options_.local_fmk_op_flag) { - graph_manager->GetCompilerStages(graph_node->GetGraphId()).optimizer.TranFrameOp(compute_graph_tmp); + if (options_.local_fmk_op_flag) { + GetCompilerStages(graph_node->GetGraphId()).optimizer.TranFrameOp(compute_graph_tmp); } // it will not execute graph preprocess, optimize, parition, build if the graph has built successful. GELOGI("Start for run graph async."); GeRootModelPtr ge_root_model = nullptr; - ret = CheckIncreBuildAndPreRun(graph_manager, args, graph_node, ge_root_model); + ret = CheckIncreBuildAndPreRun(args, graph_node, ge_root_model); if (ret != SUCCESS) { graph_node->SetRunFlag(false); if (!ge::Analyzer::GetInstance()->IsEnableNetAnalyzeDebug()) { @@ -3193,252 +2899,49 @@ void GraphManager::PreRunThread(GraphManager *graph_manager) { continue; } } - graph_manager->run_args_q_.Push(RunArgs( { graph_node, args.graph_id, args.session_id, args.error_context, + + PushGraph(RunArgs( { graph_node, args.graph_id, args.session_id, args.error_context, args.input_tensor, ge_root_model, GetThreadLocalContext(), args.callback })); GELOGI("[PreRunThread] Loop end."); } } -void GraphManager::ParseInputsDimsForData(const std::vector &input_tensor) { - GELOGD("Start parse input dims from data."); - for (size_t i = 0; i < input_tensor.size(); ++i) { - const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc(); - const Shape &shape = tensor_desc.GetShape(); - const auto &shape_dims = shape.GetDims(); - GELOGD("Input tensor dims is %s.", formats::JoinToString(shape_dims).c_str()); - GetLocalOmgContext().user_real_input_dims.emplace_back(shape_dims); - } -} - -Status GraphManager::ParseInputsDimsForGetNexNosinkAndData(const vector &dynamic_nodes, - const std::vector &input_tensor) { - GELOGD("Start parse inputs dims when coexist data and getnext sink."); - for (size_t i = 0; i < dynamic_nodes.size(); ++i) { - auto op_desc = dynamic_nodes.at(i)->GetOpDesc(); - if (op_desc == nullptr) { - continue; - } - GeAttrValue::INT index = 0; - if (!(AttrUtils::GetInt(op_desc, ATTR_NAME_INDEX, index))) { - REPORT_CALL_ERROR("E19999", "Get Attr:%s from op:%s(%s) fail", ATTR_NAME_INDEX.c_str(), - op_desc->GetName().c_str(), op_desc->GetType().c_str()); - GELOGE(PARAM_INVALID, "[Get][Attr] %s from op:%s(%s) fail", ATTR_NAME_INDEX.c_str(), - op_desc->GetName().c_str(), op_desc->GetType().c_str()); - return PARAM_INVALID; - } - if (static_cast(index) > input_tensor.size()) { - 
REPORT_INNER_ERROR("E19999", "Attr:%s in op:%s(%s) value:%ld > param input_tensor.size:%zu, " - "check invalid", ATTR_NAME_INDEX.c_str(), - op_desc->GetName().c_str(), op_desc->GetType().c_str(), - index, input_tensor.size()); - GELOGE(PARAM_INVALID, "[Check][Param] Attr:%s in op:%s(%s) value:%ld > param input_tensor.size:%zu", - ATTR_NAME_INDEX.c_str(), op_desc->GetName().c_str(), op_desc->GetType().c_str(), - index, input_tensor.size()); - return PARAM_INVALID; - } - - const TensorDesc &tensor_desc = input_tensor[i].GetTensorDesc(); - const Shape &shape = tensor_desc.GetShape(); - const auto &shape_dims = shape.GetDims(); - GELOGI("Shape dims of %zu data is %s.", index, formats::JoinToString(shape_dims).c_str()); - GetLocalOmgContext().user_real_input_dims.emplace_back(std::move(shape_dims)); +void GraphManager::PushGraph(const RunArgs &args) { + if (executor_ == nullptr) { + GELOGW("Just compile model, not support execute."); + return; } - return SUCCESS; -} -Status GraphManager::ParseInputsDims(const std::vector &input_tensor) { - GELOGI("Start parse input dims of %zu input tensor.", input_tensor.size()); - GetLocalOmgContext().user_real_input_dims.clear(); - if (!GetLocalOmgContext().dynamic_node_type.empty()) { - vector data_nodes; - vector getnext_nosink_nodes; - data_nodes = GetLocalOmgContext().data_nodes; - getnext_nosink_nodes = GetLocalOmgContext().getnext_nosink_nodes; - GELOGD("Data nodes count is %zu, getnext nosink nodes count is %zu.", data_nodes.size(), - getnext_nosink_nodes.size()); - if (GetLocalOmgContext().dynamic_node_type == DATA) { - if (getnext_nosink_nodes.empty()) { - // just data or data+getnext_sink - ParseInputsDimsForData(input_tensor); - } else { - // data+getnext_nosink, but only need to get shape_dims of data - if (ParseInputsDimsForGetNexNosinkAndData(data_nodes, input_tensor) != SUCCESS) { - GELOGE(PARAM_INVALID, "[Parse][Dims] from data failed, when data coexist with getnext nosink."); - return PARAM_INVALID; - } - } - } else { - if (getnext_nosink_nodes.empty()) { - // just getnext_sink or getnext_sink+data, need to get shape_dims from aicpu op - GELOGI("Need to get dims from aicpu op: GETDYNAMICDIMS."); - return SUCCESS; - } else { - if (data_nodes.empty()) { - // just getnext_nosink - ParseInputsDimsForData(input_tensor); - } else { - // getnext_nosink + data, but only need to get shape_dims of getnext_nosink - if (ParseInputsDimsForGetNexNosinkAndData(getnext_nosink_nodes, input_tensor) != SUCCESS) { - GELOGE(PARAM_INVALID, "[Parse][Dims] from getnext nosink failed, when data coexist with getnext nosink"); - return PARAM_INVALID; - } - } - } - } - } - GELOGI("Parse %zu inputs dims success.", GetLocalOmgContext().user_real_input_dims.size()); - return SUCCESS; + (void)executor_->PushGraph(args); } -void GraphManager::RunThread(GraphManager *graph_manager) { - ErrorManager::GetInstance().SetStage(error_message::kModelExecute, error_message::kModelExecute); - if (prctl(PR_SET_NAME, ("GE_Run")) != 0) { - GELOGW("Set thread name failed."); - } - - RunArgs args; - while (graph_manager->thread_run_flag_) { - bool pop_status = graph_manager->run_args_q_.Pop(args); - if (!pop_status) { - continue; - } - - GELOGI("[RunThread] A new loop start, graph_id:%u.", args.graph_id); - - ErrorManager::GetInstance().SetErrorContext(args.error_context); - GetContext().SetSessionId(args.session_id); - GetThreadLocalContext() = args.context; - graph_manager->UpdateLocalOmgContext(args.graph_id); - - Status ret; - // parse inputs.dims to vector> dynamic_dims - ret = 
graph_manager->ParseInputsDims(args.input_tensor); - if (ret != SUCCESS) { - ReturnError(graph_manager, args.callback, ret, "ParseInputsDims failed, thread exit."); - args.graph_node->Unlock(); - return; - } - - args.graph_node->UpdateLoadFlag(); - if (!args.graph_node->GetLoadFlag()) { - ErrorManager::GetInstance().SetStage(error_message::kModelLoad, error_message::kModelLoad); - args.ge_root_model->SetTrainFlag(graph_manager->GetTrainFlag()); - ret = graph_manager->LoadGraphAsync(args.ge_root_model, args.graph_node); - if (ret != SUCCESS || args.ge_root_model == nullptr) { - StopQueue(graph_manager); - ReturnError(graph_manager, args.callback, ret, "LoadGraphAsync failed, thread exit."); - args.graph_node->Unlock(); - return; - } - // control the times of graph loading in multi-thread scenario - args.graph_node->DecreaseLoadCount(); - args.graph_node->IncreaseLoadRecord(); +void GraphManager::SetRunContext(const GraphNodePtr &graph_node) { + OmeContext ome_context; + ome_context.need_multi_batch = GetLocalOmgContext().need_multi_batch; + ome_context.dynamic_node_type = GetLocalOmgContext().dynamic_node_type; + ome_context.dynamic_shape_dims = StringUtils::Split(GetLocalOmgContext().dynamic_dims, ';'); + ome_context.user_input_dims = GetLocalOmgContext().user_input_dims; - args.graph_node->SetLoadFlag(true); - GELOGI("LoadGraph[%u], model[%u] success and set LoadFlag to true.", args.graph_node->GetGraphId(), - args.ge_root_model->GetModelId()); - } + ome_context.data_nodes = GetLocalOmgContext().data_nodes; + ome_context.getnext_nosink_nodes = GetLocalOmgContext().getnext_nosink_nodes; - ErrorManager::GetInstance().SetStage(error_message::kModelExecute, error_message::kModelExecute); - if (graph_manager->GetTrainFlag()) { - ret = graph_manager->graph_executor_.SetGraphContext(graph_manager->GetGraphContext()); - if (ret != SUCCESS) { - GELOGW("[GraphManager] SetGraphContext failed, graph_id=%u.", args.graph_id); - } - graph_manager->graph_executor_.SetTrainFlag(graph_manager->options_.train_graph_flag); - } + ome_context.user_real_input_dims = GetLocalOmgContext().user_real_input_dims; - ret = graph_manager->graph_executor_.ExecuteGraphAsync(args.graph_id, args.graph_node->GetGeRootModel(), - args.input_tensor, args.callback); - args.graph_node->SetRunFlag(false); - if (ret != SUCCESS) { - ReturnError(graph_manager, args.callback, ret, "ExecuteGraphAsync failed, thread exit."); - args.graph_node->Unlock(); - return; - } - args.graph_node->Unlock(); - GELOGI("[GraphManager] Run graph async success, graph_id=%u.", args.graph_id); - } + graph_node->SetOmeContext(ome_context); } -void GraphManager::StopQueue(GraphManager *graph_manager) { - if (graph_manager == nullptr) { - return; - } - - graph_manager->thread_run_flag_.store(false); - graph_manager->prerun_args_q_.Stop(); - graph_manager->run_args_q_.Stop(); +void GraphManager::StopQueue() { + thread_run_flag_.store(false); + prerun_args_q_.Stop(); } -void GraphManager::ReturnError(GraphManager *graph_manager, RunAsyncCallback callback, Status ret, const string &log) { - if (graph_manager == nullptr) { - return; - } - StopQueue(graph_manager); +void GraphManager::ReturnError(RunAsyncCallback callback, Status ret, const string &log) { + StopQueue(); GELOGE(ret, "%s.", log.c_str()); std::vector outputs; callback(ret, outputs); } -void GraphManager::ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_node, RunAsyncCallback callback, - Status ret, const string &log) { - std::vector outputs; - auto compute_graph = 
GraphUtils::GetComputeGraph(*graph_node->GetGraph()); - if (graph_manager == nullptr || compute_graph == nullptr) { - REPORT_INNER_ERROR("E19999", "Param graph_manager or compute_graph in graph_node is nullptr, check invalid"); - GELOGE(GRAPH_FAILED, "[Check][Param] compute graph or graph manager is nullptr"); - callback(GRAPH_FAILED, outputs); - return; - } - - for (const auto &node : compute_graph->GetAllNodes()) { - if (node->GetType() != "NetOutput") { - continue; - } - for (size_t i = 0; i < node->GetAllInDataAnchorsSize(); i++) { - auto input_desc = node->GetOpDesc()->MutableInputDesc(i); - GeShape ge_shape(input_desc->GetShape().GetDims()); - GeTensorDesc ge_tensor_desc; - ge_tensor_desc.SetShape(ge_shape); - GeTensor ge_tensor(ge_tensor_desc); - int64_t len = 1; - if (input_desc->GetShape().GetDims() != std::vector({})) { - len = input_desc->GetShape().GetShapeSize(); - } - if (len < 0) { - REPORT_INNER_ERROR("E19999", "InputIndex:%zu ShapeSize:%ld of op:%s(%s) < 0, unknown shape is not support, " - "check invalid", i, len, - node->GetName().c_str(), node->GetType().c_str()); - GELOGE(GRAPH_FAILED, "[Check][Param] InputIndex:%zu ShapeSize:%ld of op:%s(%s) < 0, " - "unknown shape is not support", i, len, node->GetName().c_str(), node->GetType().c_str()); - callback(GRAPH_FAILED, outputs); - return; - } else if (len == 0) { - GELOGI("getted shape size is 0.Do process as empty tensor!"); - len = 1; - } - auto length = GetSizeInBytes(len, input_desc->GetDataType()); - auto aligned_ptr = MakeShared(length, kAlignment); - if (aligned_ptr == nullptr) { - REPORT_CALL_ERROR("E19999", "New AlignedPtr failed, len:%ld", length); - GELOGE(GRAPH_FAILED, "[Create][AlignedPtr] failed, len:%ld", length); - return; - } - ge_tensor.SetData(aligned_ptr, length); - ge::Tensor tensor = TensorAdapter::AsTensor(ge_tensor); - // To avoid global step too small and can not stop, totally set a bigger value - auto ptr = aligned_ptr->MutableGet(); - for (int64_t i = 0; i < length; i++) { - ptr[i] = 0x7F; // here stands for a positive max value - } - outputs.emplace_back(std::move(tensor)); - } - } - callback(SUCCESS, outputs); - return; -} - bool GraphManager::IsGraphNeedRebuild(uint32_t graph_id) { // find graph GraphNodePtr graph_node = nullptr; @@ -3649,6 +3152,7 @@ Status GraphManager::Build(const GraphNodePtr &graph_node, ComputeGraphPtr &comp GraphUtils::DumpGEGraph(compute_graph, "Build", is_always_dump); GraphUtils::DumpGEGraphToOnnx(*compute_graph, "Build"); + SetRunContext(graph_node); graph_node->SetGeRootModel(ge_root_model); return SUCCESS; } diff --git a/ge/graph/manager/graph_manager.h b/ge/graph/manager/graph_manager.h index 3475da6d..6773787c 100644 --- a/ge/graph/manager/graph_manager.h +++ b/ge/graph/manager/graph_manager.h @@ -31,7 +31,6 @@ #include "external/graph/types.h" #include "external/ge/ge_api_types.h" #include "graph/build/graph_builder.h" -#include "graph/execute/graph_execute.h" #include "graph/ge_local_context.h" #include "graph/load/graph_loader.h" #include "graph/manager/graph_manager_utils.h" @@ -41,11 +40,12 @@ #include "graph/preprocess/graph_preprocess.h" #include "graph/tuning_utils.h" #include "model/ge_model.h" +#include "common/executor.h" namespace ge { class GraphManager { public: - GraphManager(); + GraphManager() = default; ~GraphManager() = default; /// @@ -54,7 +54,7 @@ class GraphManager { /// @param [in] options user config params /// @return Status result of function /// - Status Initialize(const std::map &options); + Status Initialize(const std::map &options, 
Executor *executor = nullptr); /// /// @ingroup ge_graph @@ -113,7 +113,7 @@ class GraphManager { /// @param [out] outputs output data /// @return Status result of function /// - Status RunGraphWithStreamAsync(const GraphId &graph_id, rtStream_t stream, uint64_t session_id, + Status RunGraphWithStreamAsync(const GraphId &graph_id, rtStream_t stream, uint64_t session_id, const std::vector &inputs, std::vector &outputs); /// @@ -227,34 +227,18 @@ class GraphManager { RunAsyncCallback callback; }; - struct RunArgs { - GraphNodePtr graph_node; - GraphId graph_id; - uint64_t session_id; - struct error_message::Context error_context; - std::vector input_tensor; - GeRootModelPtr ge_root_model; - GEThreadLocalContext context; - RunAsyncCallback callback; - }; - void AddGraphNode(GraphId graph_id, const GraphNodePtr &graph_node); void RemoveGraphNode(GraphId graph_id); bool HasGraphNode(GraphId graph_id); Status GetGraphNode(const GraphId &graph_id, GraphNodePtr &out); - std::shared_ptr GetModelListener() const { return graph_run_listener_; } - static Status ProcessSubGraphWithMultiThreads(GraphManager *graph_manager, GraphId root_graph_id, const SubGraphInfoPtr &sub_graph_info_ptr, const std::string &root_graph_name, uint64_t session_id, const struct error_message::Context &error_context, const GEThreadLocalContext &ge_context); - Status ParseInputsDims(const std::vector &input_tensor); - void ParseInputsDimsForData(const std::vector &input_tensor); - Status ParseInputsDimsForGetNexNosinkAndData(const vector &dynamic_nodes, - const std::vector &input_tensor); + Status RunCustomPass(const GraphNodePtr &graph_node); Status PreRun(const GraphNodePtr &graph_node, const std::vector &inputs, GeRootModelPtr &ge_root_model, uint64_t session_id = INVALID_SESSION_ID); @@ -350,10 +334,6 @@ class GraphManager { Status SubexpressionMigration(ComputeGraphPtr &compute_graph); - Status LoadGraphAsync(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node); - - Status CheckAndReleaseMemory(const GeModelPtr &ge_model, const GraphNodePtr &graph_node); - bool CheckModelLoad(const GeRootModelPtr &ge_model, bool load_flag); Status LoadGraph(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node); @@ -368,12 +348,12 @@ class GraphManager { void RemoveModelCacheHelper(const GraphId &graph_id); ModelCacheHelperPtr FindModelCacheHelper(GraphId graph_id); - static void PreRunThread(GraphManager *graph_manager); - static void RunThread(GraphManager *graph_manager); - static void StopQueue(GraphManager *graph_manager); - static void ReturnError(GraphManager *graph_manager, RunAsyncCallback callback, Status ret, const string &log); - static void ReturnError(GraphManager *graph_manager, GraphNodePtr &graph_node, RunAsyncCallback callback, - Status ret, const string &log); + void SetRunContext(const GraphNodePtr &graph_node); + void PushGraph(const RunArgs &args); + + void PreRunThread(); + void StopQueue(); + void ReturnError(RunAsyncCallback callback, Status ret, const string &log); void ChangeConstTypeWhenTraining(const ComputeGraphPtr &compute_graph); @@ -409,11 +389,7 @@ class GraphManager { CompilerStages &GetCompilerStages(GraphId graph_id); void RemoveCompilerStages(GraphId graph_id); - static Status CheckIncreBuildAndPreRun(GraphManager *graph_manager, const PreRunArgs &args, GraphNodePtr &graph_node, - GeRootModelPtr &ge_root_model); - - void ReleaseMemory(const GeModelPtr &ge_model, GraphNodePtr &graph_node, const std::vector &model_ids, - uint32_t graph_id, uint64_t session_id); + Status 
CheckIncreBuildAndPreRun(const PreRunArgs &args, GraphNodePtr &graph_node, GeRootModelPtr &ge_root_model); Status CheckRepeatAdd(uint32_t graph_id, bool &is_added); @@ -431,34 +407,25 @@ class GraphManager { static Status CheckGraphAdded(const GraphId &graph_id, const Graph &graph); - std::atomic_bool thread_run_flag_; + std::atomic_bool thread_run_flag_{false}; BlockingQueue prerun_args_q_{}; - BlockingQueue run_args_q_{}; std::thread prerun_thread_; - std::thread run_thread_; ComputeGraphPtr compute_graph_; std::map graph_map_; std::map cache_helper_map_; - // for run graph synchronous return - std::mutex sync_run_mutex_; - std::condition_variable condition_; - // run graph synchronization call back listener - std::shared_ptr graph_run_listener_; - // summary and checkpoint callback function list for ME, key is summary or checkpoint std::map &)>> me_callback_map_; std::map &)>> callback_map_; - bool init_flag_; - + bool init_flag_{false}; GraphManagerOptions options_; GraphContextPtr graph_context_ = nullptr; map omg_contexts_; map compiler_stages_; - GraphExecutor graph_executor_; + Executor *executor_{nullptr}; VarAccelerateCtrl var_acc_ctrl_; diff --git a/ge/graph/manager/graph_manager_utils.h b/ge/graph/manager/graph_manager_utils.h index 6ed76e57..9cec6b6d 100644 --- a/ge/graph/manager/graph_manager_utils.h +++ b/ge/graph/manager/graph_manager_utils.h @@ -33,6 +33,7 @@ #include "framework/common/debug/ge_log.h" #include "framework/common/ge_inner_error_codes.h" #include "graph/compute_graph.h" +#include "graph/common/local_context.h" #include "external/graph/graph.h" #include "graph/model.h" #include "model/ge_model.h" @@ -154,6 +155,9 @@ class GraphNode { bool GetRunFlag() const { return run_flag_; } void SetRunFlag(bool flag) { run_flag_ = flag; } + void SetOmeContext(const OmeContext &context) { context_ = context; } + OmeContext &GetOmeContext() { return context_; } + bool IsAsync() const { return async_; } void SetAsync(bool flag) { async_ = flag; } @@ -196,6 +200,8 @@ class GraphNode { bool run_flag_; std::vector subgraph_ptr_list_; + OmeContext context_; + GraphPtr graph_; ComputeGraphPtr compute_graph_; bool build_flag_; diff --git a/ge/session/inner_session.cc b/ge/session/inner_session.cc index 58b78f41..236ec783 100755 --- a/ge/session/inner_session.cc +++ b/ge/session/inner_session.cc @@ -124,7 +124,7 @@ Status InnerSession::Initialize() { GE_CHK_STATUS_RET(dump_properties.InitByOptions(), "Init dump properties failed."); GE_CHK_STATUS_RET(AddDumpProperties(dump_properties), "[Add][DumpProperties] failed."); - ret = graph_manager_.Initialize(options_); + ret = InnerInitialize(); if (ret != SUCCESS) { GELOGE(ret, "[Init][GraphManager] failed, InnerSession:%lu.", session_id_); REPORT_CALL_ERROR("E19999", "GraphManager initialize failed, InnerSession:%lu.", session_id_); @@ -136,7 +136,7 @@ Status InnerSession::Initialize() { if (ret != SUCCESS) { GELOGE(ret, "[Set][MemoryMallocSize] failed."); REPORT_CALL_ERROR("E19999", "VarManager SetMemoryMallocSize failed, InnerSession:%lu.", session_id_); - (void)graph_manager_.Finalize(); + (void)InnerFinalize(); GE_CHK_STATUS(RemoveDumpProperties(), "[Remove][DumpProperties] failed."); GE_CHK_RT(rtDeviceReset(static_cast(GetContext().DeviceId()))); return ret; @@ -162,7 +162,7 @@ Status InnerSession::Finalize() { return SUCCESS; } UpdateThreadContext(std::map{}); - Status ret = graph_manager_.Finalize(); + Status ret = InnerFinalize(); if (ret != SUCCESS) { // Subsequent code execution is required, so no return is required 
GELOGE(ret, "[Finalize][GraphManager] failed, InnerSession:%lu.", session_id_); @@ -188,6 +188,44 @@ Status InnerSession::Finalize() { return ret; } +Status InnerSession::InnerInitialize() { + Status ret = model_executor_.Initialize(options_); + if (ret != SUCCESS) { + GELOGE(ret, "[Init][GraphExecutor] failed, InnerSession:%lu.", session_id_); + REPORT_CALL_ERROR("E19999", "GraphExecutor initialize failed, InnerSession:%lu.", session_id_); + GE_CHK_STATUS(RemoveDumpProperties(), "[Remove][DumpProperties] failed."); + return ret; + } + + ret = graph_manager_.Initialize(options_, &model_executor_); + if (ret != SUCCESS) { + GELOGE(ret, "[Init][GraphManager] failed, InnerSession:%lu.", session_id_); + REPORT_CALL_ERROR("E19999", "GraphManager initialize failed, InnerSession:%lu.", session_id_); + GE_CHK_STATUS(RemoveDumpProperties(), "[Remove][DumpProperties] failed."); + return ret; + } + + return SUCCESS; +} + +Status InnerSession::InnerFinalize() { + Status ret = graph_manager_.Finalize(); + if (ret != SUCCESS) { + // Subsequent code execution is required, so no return is required + GELOGE(ret, "[Finalize][GraphManager] failed, InnerSession:%lu.", session_id_); + REPORT_CALL_ERROR("E19999", "GraphManager Finalize failed, InnerSession:%lu.", session_id_); + } + + ret = model_executor_.Finalize(); + if (ret != SUCCESS) { + // Subsequent code execution is required, so no return is required + GELOGE(ret, "[Finalize][GraphExecutor] failed, InnerSession:%lu.", session_id_); + REPORT_CALL_ERROR("E19999", "GraphExecutor Finalize failed, InnerSession:%lu.", session_id_); + } + + return SUCCESS; +} + Status InnerSession::GetVariable(const std::string &name, Tensor &val) { UpdateThreadContext(std::map{}); return graph_manager_.GetVariable(name, val); diff --git a/ge/session/inner_session.h b/ge/session/inner_session.h index 35fe4692..afc273ac 100644 --- a/ge/session/inner_session.h +++ b/ge/session/inner_session.h @@ -23,6 +23,7 @@ #include "framework/common/ge_types.h" #include "external/ge/ge_api_types.h" #include "graph/manager/graph_manager.h" +#include "graph/execute/model_executor.h" namespace ge { class InnerSession { @@ -82,10 +83,14 @@ class InnerSession { void SetRtSocVersion(); private: + Status InnerInitialize(); + Status InnerFinalize(); + bool init_flag_; uint64_t session_id_; std::map options_; GraphManager graph_manager_; + ModelExecutor model_executor_; std::mutex resource_mutex_; // AddGraph, RemoveGraph and Finalize use void UpdateThreadContext(const std::map &options); void UpdateThreadContext(uint32_t graph_id); diff --git a/ge/single_op/task/op_task.h b/ge/single_op/task/op_task.h index 085bb5ff..a73bcfda 100644 --- a/ge/single_op/task/op_task.h +++ b/ge/single_op/task/op_task.h @@ -268,7 +268,7 @@ class MemcpyAsyncTask : public OpTask { friend class SingleOpModel; friend class RtsKernelTaskBuilder; - uintptr_t addresses_[kAddressNum]; + uintptr_t addresses_[kAddressNum] = {0}; size_t dst_max_; size_t count_; rtMemcpyKind_t kind_; diff --git a/tests/ut/ge/CMakeLists.txt b/tests/ut/ge/CMakeLists.txt index d8fcd6c3..7832c7b0 100755 --- a/tests/ut/ge/CMakeLists.txt +++ b/tests/ut/ge/CMakeLists.txt @@ -161,8 +161,9 @@ set(COMMON_SRC_FILES "${GE_CODE_DIR}/ge/common/profiling/profiling_manager.cc" "${GE_CODE_DIR}/ge/common/profiling/ge_profiling.cc" "${GE_CODE_DIR}/ge/graph/manager/host_mem_manager.cc" - "${GE_CODE_DIR}/ge/graph/manager/memory_api.cc" + "${GE_CODE_DIR}/ge/graph/manager/memory_api.cc" "${GE_CODE_DIR}/ge/session/inner_session.cc" + 
"${GE_CODE_DIR}/ge/graph/execute/model_executor.cc" "${GE_CODE_DIR}/ge/graph/manager/util/rt_context_util.cc" "${GE_CODE_DIR}/ge/graph/execute/graph_execute.cc" "${GE_CODE_DIR}/ge/graph/preprocess/graph_preprocess.cc" @@ -469,6 +470,7 @@ set(GRAPH_BUILD_COMMON_SRC_FILES "${GE_CODE_DIR}/ge/client/ge_api.cc" "${GE_CODE_DIR}/ge/session/inner_session.cc" "${GE_CODE_DIR}/ge/session/session_manager.cc" + "${GE_CODE_DIR}/ge/graph/execute/model_executor.cc" "${GE_CODE_DIR}/ge/engine_manager/dnnengine_manager.cc" "${GE_CODE_DIR}/ge/plugin/engine/engine_manage.cc" "${GE_CODE_DIR}/ge/graph/build/logical_stream_allocator.cc" @@ -810,6 +812,7 @@ set(MULTI_PARTS_TEST_FILES "graph/build/task_generator_unittest.cc" "graph/build/buffer_pool_mem_assigner_unittest.cc" "graph/execute/graph_execute_unittest.cc" + "graph/execute/model_executor_unittest.cc" "graph/preprocess/graph_preprocess_unittest.cc" "graph/manager/hcom_util_unittest.cc" "graph/manager/graph_caching_allocator_unittest.cc" diff --git a/tests/ut/ge/common/datatype_transfer_unittest.cc b/tests/ut/ge/common/datatype_transfer_unittest.cc index c311a7cf..ea131b2c 100644 --- a/tests/ut/ge/common/datatype_transfer_unittest.cc +++ b/tests/ut/ge/common/datatype_transfer_unittest.cc @@ -47,7 +47,7 @@ TEST_F(UtestDataTypeTransfer, fp16_fp32) { EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); bool is_equal = true; - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { if (abs((reinterpret_cast(result.data.get()))[i] - ret[i]) > 1.0e-6) { is_equal = false; break; @@ -60,7 +60,7 @@ TEST_F(UtestDataTypeTransfer, fp16_fp32) { CastArgs args2{reinterpret_cast(ret), sizeof(ret) / sizeof(ret[0]), DT_FLOAT, DT_FLOAT16}; EXPECT_EQ(transfer2.TransDataType(args2, result2), SUCCESS); EXPECT_EQ(result2.length, sizeof(data)); - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { EXPECT_FLOAT_EQ((reinterpret_cast(result2.data.get()))[i].val, data[i].val); } EXPECT_EQ(TransDataType(args2, result2), SUCCESS); @@ -81,7 +81,7 @@ TEST_F(UtestDataTypeTransfer, int32_fp16) { CastArgs args{reinterpret_cast(data), sizeof(ret) / sizeof(ret[0]), DT_INT32, DT_FLOAT16}; EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { EXPECT_FLOAT_EQ((reinterpret_cast(result.data.get()))[i].val, ret[i].val); } @@ -91,7 +91,7 @@ TEST_F(UtestDataTypeTransfer, int32_fp16) { EXPECT_EQ(transfer2.TransDataType(args2, result2), SUCCESS); EXPECT_EQ(result2.length, sizeof(data)); bool is_equal = true; - for (int i = 0; i < sizeof(data) / sizeof(data[0]); ++i) { + for (size_t i = 0; i < sizeof(data) / sizeof(data[0]); ++i) { if (abs((reinterpret_cast(result2.data.get()))[i] - data[i]) / abs(data[i]) > 0.05) { is_equal = false; break; @@ -154,7 +154,7 @@ TEST_F(UtestDataTypeTransfer, fp32_fp16) { EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); bool is_equal = true; - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { if (abs((reinterpret_cast(result.data.get()))[i] - ret[i]) > 1.0e-6) { is_equal = false; break; @@ -167,7 +167,7 @@ TEST_F(UtestDataTypeTransfer, fp32_fp16) { CastArgs args2{reinterpret_cast(ret), sizeof(data) / sizeof(data[0]), DT_FLOAT, 
DT_FLOAT16}; EXPECT_EQ(transfer2.TransDataType(args2, result2), SUCCESS); EXPECT_EQ(result2.length, sizeof(data)); - for (int i = 0; i < sizeof(data) / sizeof(data[0]); ++i) { + for (size_t i = 0; i < sizeof(data) / sizeof(data[0]); ++i) { EXPECT_FLOAT_EQ((reinterpret_cast(result2.data.get()))[i].val, data[i].val); } } @@ -238,7 +238,7 @@ TEST_F(UtestDataTypeTransfer, uint8_fp32) { DataTypeTransfer transfer; EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { EXPECT_EQ((reinterpret_cast(result.data.get()))[i], ret[i]); } } @@ -259,7 +259,7 @@ TEST_F(UtestDataTypeTransfer, uint8_int32) { DataTypeTransfer transfer; EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { EXPECT_EQ((reinterpret_cast(result.data.get()))[i], ret[i]); } } @@ -282,7 +282,7 @@ TEST_F(UtestDataTypeTransfer, fp32_int32) { DataTypeTransfer transfer; EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { EXPECT_FLOAT_EQ((reinterpret_cast(result.data.get()))[i], ret[i]); } } @@ -304,7 +304,7 @@ TEST_F(UtestDataTypeTransfer, int32_fp32) { DataTypeTransfer transfer; EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { EXPECT_FLOAT_EQ((reinterpret_cast(result.data.get()))[i], ret[i]); } } @@ -329,7 +329,7 @@ TEST_F(UtestDataTypeTransfer, int32_uint8) { DataTypeTransfer transfer; EXPECT_EQ(transfer.TransDataType(args, result), SUCCESS); EXPECT_EQ(result.length, sizeof(ret)); - for (int i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { + for (size_t i = 0; i < sizeof(ret) / sizeof(ret[0]); ++i) { EXPECT_FLOAT_EQ((reinterpret_cast(result.data.get()))[i], ret[i]); } } diff --git a/tests/ut/ge/graph/execute/model_executor_unittest.cc b/tests/ut/ge/graph/execute/model_executor_unittest.cc new file mode 100644 index 00000000..33643993 --- /dev/null +++ b/tests/ut/ge/graph/execute/model_executor_unittest.cc @@ -0,0 +1,327 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#define protected public +#define private public +#include "graph/execute/model_executor.h" +#include "graph/manager/graph_manager.h" +#include "graph/load/model_manager/model_manager.h" +#include "graph/load/model_manager/davinci_model.h" + +using namespace std; + +namespace ge { +class UtestModelExecutorTest : public testing::Test { + protected: + void SetUp() {} + void TearDown() {} +}; + +static NodePtr CreateNode(ComputeGraph &graph, const string &name, const string &type, int in_num, int out_num) { + OpDescPtr op_desc = std::make_shared(name, type); + op_desc->SetStreamId(0); + static int32_t index = 0; + op_desc->SetId(index++); + + GeTensorDesc tensor(GeShape(), FORMAT_ND, DT_INT64); + TensorUtils::SetSize(tensor, 64); + vector input_offset; + for (int i = 0; i < in_num; i++) { + op_desc->AddInputDesc(tensor); + input_offset.emplace_back(index * 64 + i * 64); + } + op_desc->SetInputOffset(input_offset); + + vector output_offset; + for (int i = 0; i < out_num; i++) { + op_desc->AddOutputDesc(tensor); + output_offset.emplace_back(index * 64 + in_num * 64 + i * 64); + } + op_desc->SetOutputOffset(output_offset); + + op_desc->SetWorkspace({}); + op_desc->SetWorkspaceBytes({}); + op_desc->SetOpKernelLibName("DNN_VM_RTS_OP_STORE"); + + return graph.AddNode(op_desc); +} + +TEST_F(UtestModelExecutorTest, test_load_graph_sync) { + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + auto compute_graph = MakeShared("test_graph"); + GeRootModelPtr ge_root_model = MakeShared(compute_graph); + + GeModelPtr ge_model = MakeShared(); + ge_model->SetGraph(GraphUtils::CreateGraphFromComputeGraph(compute_graph)); + ge_root_model->SetSubgraphInstanceNameToModel(compute_graph->GetName(), ge_model); + + GraphId graph_id = 1; + GraphNodePtr graph_node = MakeShared(graph_id); + graph_node->SetGeRootModel(ge_root_model); + graph_node->SetLoadFlag(true); + graph_node->SetAsync(false); + + EXPECT_EQ(model_executor.LoadGraph(ge_root_model, graph_node), SUCCESS); + EXPECT_EQ(model_executor.UnloadGraph(ge_root_model, graph_id), SUCCESS); + + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, test_load_graph_async) { + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + Graph graph("test_graph"); + auto compute_graph = MakeShared("test_graph"); + GeRootModelPtr ge_root_model = MakeShared(compute_graph); + + GeModelPtr ge_model = MakeShared(); + ge_model->SetGraph(GraphUtils::CreateGraphFromComputeGraph(compute_graph)); + ge_root_model->SetSubgraphInstanceNameToModel(compute_graph->GetName(), ge_model); + + GraphId graph_id = 1; + GraphNodePtr graph_node = MakeShared(graph_id); + graph_node->SetGeRootModel(ge_root_model); + graph_node->SetLoadFlag(true); + graph_node->SetAsync(true); + + EXPECT_EQ(model_executor.LoadGraph(ge_root_model, graph_node), SUCCESS); + + EXPECT_EQ(model_executor.UnloadGraph(ge_root_model, graph_id), SUCCESS); + + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, test_load_graph_failed) { + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + Graph graph("test_graph"); + auto compute_graph = MakeShared("test_graph"); + GeRootModelPtr ge_root_model = MakeShared(compute_graph); + + GraphId graph_id = 1; + GraphNodePtr graph_node = MakeShared(graph_id); + graph_node->SetGeRootModel(ge_root_model); + graph_node->SetLoadFlag(true); + graph_node->SetAsync(true); + + // GeModel is null, DavinciModel::Assign 
will return FAILED + setenv(kEnvGeuseStaticMemory, "1", true); + EXPECT_EQ(model_executor.LoadGraph(ge_root_model, graph_node), FAILED); + EXPECT_EQ(model_executor.UnloadGraph(ge_root_model, graph_id), SUCCESS); + + EXPECT_EQ(model_executor.Finalize(), SUCCESS); + unsetenv(kEnvGeuseStaticMemory); +} + +TEST_F(UtestModelExecutorTest, test_check_and_release_memory) { + { + auto listener = MakeShared(); + shared_ptr davinci_model1 = MakeShared(1, listener); + davinci_model1->SetId(1); + ModelManager::GetInstance()->InsertModel(1, davinci_model1); + shared_ptr davinci_model2 = MakeShared(2, listener); + davinci_model1->SetId(2); + ModelManager::GetInstance()->InsertModel(2, davinci_model2); + } + + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + GeModelPtr ge_model = make_shared(); + int64_t memory_size = 25 * 1024UL * 1024UL * 1024UL; + int64_t weight_size = 25 * 1024UL * 1024UL * 1024UL; + uint64_t session_id = 0; + EXPECT_TRUE(AttrUtils::SetInt(ge_model, ATTR_MODEL_MEMORY_SIZE, memory_size)); + EXPECT_TRUE(AttrUtils::SetInt(ge_model, ATTR_MODEL_WEIGHT_SIZE, weight_size)); + EXPECT_TRUE(AttrUtils::SetInt(ge_model, MODEL_ATTR_SESSION_ID, session_id)); + + GraphId graph_id = 1; + GraphNodePtr graph_node = MakeShared(graph_id); + model_executor.AddGraphNode(graph_id, graph_node); + + ComputeGraphPtr compute_graph = MakeShared("test_graph"); + GeRootModelPtr ge_root_model = MakeShared(compute_graph); + ge_root_model->SetModelId(1); + ge_root_model->SetModelId(2); + graph_node->SetGeRootModel(ge_root_model); + graph_node->SetLoadFlag(true); + + EXPECT_EQ(model_executor.CheckAndReleaseMemory(ge_model, graph_node), SUCCESS); + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, parse_inputs_dims_data) { + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + OmeContext context; + SetLocalOmeContext(context); + ComputeGraphPtr compute_graph = MakeShared("test_graph"); + const auto data1 = CreateNode(*compute_graph, DATA, "data1", 1, 1); + const auto next1 = CreateNode(*compute_graph, GETNEXT, "data1", 1, 1); + + Tensor tensor; + std::vector input_tensors; + input_tensors.emplace_back(tensor); + EXPECT_EQ(model_executor.ParseInputsDims(input_tensors), SUCCESS); // dynamic_node_type is empty, just return + + context.dynamic_node_type = DATA; + EXPECT_EQ(model_executor.ParseInputsDims(input_tensors), SUCCESS); // ParseInputsDimsForData + + context.getnext_nosink_nodes.emplace_back(next1); + EXPECT_EQ(model_executor.ParseInputsDims(input_tensors), SUCCESS); // ParseInputsDimsForGetNexNosinkAndData + + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, parse_inputs_dims_getnext) { + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + OmeContext context; + SetLocalOmeContext(context); + ComputeGraphPtr compute_graph = MakeShared("test_graph"); + const auto data1 = CreateNode(*compute_graph, DATA, "data1", 1, 1); + const auto next1 = CreateNode(*compute_graph, GETNEXT, "data1", 1, 1); + + Tensor tensor; + std::vector input_tensors; + input_tensors.emplace_back(tensor); + + context.dynamic_node_type = GETNEXT; + EXPECT_EQ(model_executor.ParseInputsDims(input_tensors), SUCCESS); // just getnext_sink + + context.getnext_nosink_nodes.emplace_back(next1); + EXPECT_EQ(model_executor.ParseInputsDims(input_tensors), SUCCESS); // ParseInputsDimsForData + + context.data_nodes.emplace_back(data1); + 
EXPECT_EQ(model_executor.ParseInputsDims(input_tensors), PARAM_INVALID); // ParseInputsDimsForGetNexNosinkAndData + AttrUtils::SetInt(next1->GetOpDesc(), ATTR_NAME_INDEX, 0); + EXPECT_EQ(model_executor.ParseInputsDims(input_tensors), SUCCESS); // ParseInputsDimsForGetNexNosinkAndData + + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, test_run_thread) { + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + GraphId graph_id = 1; + uint64_t session_id = 0; + error_message::Context error_context; + GEThreadLocalContext context; + const auto callback = [](Status status, std::vector &outputs) { }; + + auto compute_graph = MakeShared("test_graph"); + GeRootModelPtr ge_root_model = MakeShared(compute_graph); + + GeModelPtr ge_model = MakeShared(); + ge_model->SetGraph(GraphUtils::CreateGraphFromComputeGraph(compute_graph)); + ge_root_model->SetSubgraphInstanceNameToModel(compute_graph->GetName(), ge_model); + + GraphNodePtr graph_node = MakeShared(graph_id); + graph_node->SetGeRootModel(ge_root_model); + graph_node->SetLoadFlag(false); + graph_node->SetAsync(true); + graph_node->IncreaseLoadCount(); + graph_node->Lock(); + + Tensor tensor; + std::vector input_tensors; + input_tensors.emplace_back(tensor); + + RunArgs run_args{graph_node, graph_id, session_id, error_context, input_tensors, ge_root_model, context, callback}; + EXPECT_EQ(model_executor.PushGraph(run_args), SUCCESS); + + while (model_executor.run_args_q_.Size() > 0) { + usleep(10); // 0.01ms, Wait for RunThread. + } + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +static void test_run_graph(ModelExecutor &model_executor) { + auto compute_graph = MakeShared("test_graph"); + GeRootModelPtr ge_root_model = MakeShared(compute_graph); + + GeModelPtr ge_model = MakeShared(); + ge_model->SetGraph(GraphUtils::CreateGraphFromComputeGraph(compute_graph)); + ge_root_model->SetSubgraphInstanceNameToModel(compute_graph->GetName(), ge_model); + + GraphId graph_id = 1; + GraphNodePtr graph_node = MakeShared(graph_id); + graph_node->SetGeRootModel(ge_root_model); + graph_node->SetLoadFlag(false); + graph_node->SetAsync(false); // RunGraph is Synchronization. 
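// LoadGraph() below is the synchronous load path of the new Executor interface
// (see common/executor.h); RunGraph() is its blocking execution entry point, as
// opposed to PushGraph(), which queues work for the executor's run thread.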
+ EXPECT_EQ(model_executor.LoadGraph(ge_root_model, graph_node), SUCCESS); + + std::vector inputs; + std::vector outputs; + EXPECT_EQ(model_executor.RunGraph(graph_node, graph_id, inputs, outputs), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, test_run_graph_train) { + GetThreadLocalContext().SetGlobalOption({{OPTION_GRAPH_RUN_MODE, "1"}}); + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + test_run_graph(model_executor); + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, test_run_graph_infer) { + GetThreadLocalContext().SetGlobalOption({}); + GetThreadLocalContext().SetSessionOption({}); + GetThreadLocalContext().SetGraphOption({}); + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + test_run_graph(model_executor); + EXPECT_EQ(model_executor.Finalize(), SUCCESS); +} + +TEST_F(UtestModelExecutorTest, test_run_graph_with_stream) { + ModelExecutor model_executor; + EXPECT_EQ(model_executor.Initialize({}), SUCCESS); + + GraphId graph_id = 1; + auto compute_graph = MakeShared("test_graph"); + GeRootModelPtr ge_root_model = MakeShared(compute_graph); + + GeModelPtr ge_model = MakeShared(); + ge_model->SetGraph(GraphUtils::CreateGraphFromComputeGraph(compute_graph)); + ge_root_model->SetSubgraphInstanceNameToModel(compute_graph->GetName(), ge_model); + + GraphNodePtr graph_node = MakeShared(graph_id); + graph_node->SetGeRootModel(ge_root_model); + graph_node->SetLoadFlag(false); + graph_node->SetAsync(true); + + GeTensor tensor; + std::vector inputs{tensor}; + std::vector outputs; + + rtStream_t stream = nullptr; + rtStreamCreate(&stream, 0); + EXPECT_EQ(model_executor.RunGraphWithStream(graph_node, graph_id, stream, inputs, outputs), 145003); + + EXPECT_EQ(model_executor.Finalize(), SUCCESS); + rtStreamDestroy(stream); +} +} // namespace ge diff --git a/tests/ut/ge/graph/manager/graph_manager_unittest.cc b/tests/ut/ge/graph/manager/graph_manager_unittest.cc index 9bae10eb..9663e90f 100644 --- a/tests/ut/ge/graph/manager/graph_manager_unittest.cc +++ b/tests/ut/ge/graph/manager/graph_manager_unittest.cc @@ -15,20 +15,9 @@ */ #include + #include #include -#define protected public -#define private public -#include "graph/manager/graph_manager.h" -#include "graph/load/model_manager/model_manager.h" -#include "graph/load/model_manager/davinci_model.h" -#define const -#include "common/helper/model_cache_helper.h" -#undef const -#include "init/gelib.h" -#undef private -#undef public - #include #include #include @@ -38,6 +27,14 @@ #include #include +#define protected public +#define private public +#include "graph/manager/graph_manager.h" +#define const +#include "common/helper/model_cache_helper.h" +#undef const +#include "init/gelib.h" + #include "common/math/math_util.h" #include "common/thread_pool.h" #include "common/dump/dump_manager.h" @@ -121,7 +118,6 @@ using namespace std; using namespace testing; -using namespace ge; using namespace domi; namespace { @@ -129,6 +125,8 @@ const uint32_t kNotAdded = 0; const uint32_t kStartAdd = 1; const uint32_t kDoneAdded = 2; } + +namespace ge { class UtestGraphManagerTest : public testing::Test { protected: void SetUp() {} @@ -136,6 +134,31 @@ class UtestGraphManagerTest : public testing::Test { void TearDown() {} }; +class StubExecutor : public Executor { + public: + Status LoadGraph(const GeRootModelPtr &ge_root_model, const GraphNodePtr &graph_node) { + return SUCCESS; + } + + Status UnloadGraph(const GeRootModelPtr &ge_root_model, uint32_t 
graph_id) { + return SUCCESS; + } + + Status PushGraph(const RunArgs &args) { + return SUCCESS; + } + + Status RunGraph(const GraphNodePtr &graph_node, GraphId graph_id, + const std::vector &inputs, std::vector &outputs) { + return SUCCESS; + } + + Status RunGraphWithStream(const GraphNodePtr &graph_node, GraphId graph_id, rtStream_t stream, + const std::vector &inputs, std::vector &outputs){ + return SUCCESS; + } +}; + void CreateGraph(Graph &graph) { TensorDesc desc(ge::Shape({1, 3, 224, 224})); uint32_t size = desc.GetShape().GetShapeSize(); @@ -288,26 +311,20 @@ TEST_F(UtestGraphManagerTest, test_remove_graph_1) { TEST_F(UtestGraphManagerTest, test_remove_graph_2) { GraphId graph_id = 1; GraphManager graph_manager; + StubExecutor stub_executor; + graph_manager.executor_ = &stub_executor; + GraphNodePtr graph_node = MakeShared(graph_id); Graph graph("test_graph"); CreateGraph(graph); auto compute_graph = GraphUtils::GetComputeGraph(graph); GeRootModelPtr ge_root_model = MakeShared(compute_graph); - auto model_manager = ModelManager::GetInstance(); - auto listener = MakeShared(); - shared_ptr davinci_model1 = MakeShared(1, listener); - davinci_model1->SetId(1); - shared_ptr davinci_model2 = MakeShared(2, listener); - davinci_model1->SetId(2); - model_manager->InsertModel(1, davinci_model1); - model_manager->InsertModel(2, davinci_model2); ge_root_model->SetModelId(1); ge_root_model->SetModelId(2); graph_node->SetGeRootModel(ge_root_model); graph_node->SetLoadFlag(true); graph_manager.AddGraphNode(graph_id, graph_node); - Status status = graph_manager.RemoveGraph(graph_id); - EXPECT_EQ(status, ge::SUCCESS); + EXPECT_EQ(graph_manager.RemoveGraph(graph_id), SUCCESS); } TEST_F(UtestGraphManagerTest, test_pre_run_thread) { @@ -327,7 +344,7 @@ TEST_F(UtestGraphManagerTest, test_pre_run_thread) { GraphNodePtr graph_node = MakeShared(graph_id); graph_manager.AddGraphNode(graph_id, graph_node); - graph_manager.PreRunThread(&graph_manager); + graph_manager.PreRunThread(); // end with failed } @@ -355,48 +372,10 @@ TEST_F(UtestGraphManagerTest, test_pre_run_thread_2) { graph_manager.AddGraphNode(graph_id, graph_node_2); ret = graph_manager.prerun_args_q_.Push({graph_id, input_tensor, session_id, error_context, context, callback}); EXPECT_EQ(ret, true); - graph_manager.PreRunThread(&graph_manager); + graph_manager.PreRunThread(); // end with failed } -TEST_F(UtestGraphManagerTest, test_check_and_release_memory) { - - GraphManager graph_manager; - GeModelPtr ge_model = make_shared(); - int64_t memory_size = 25 * 1024UL * 1024UL * 1024UL; - int64_t weight_size = 25 * 1024UL * 1024UL * 1024UL; - uint64_t session_id = 0; - ge::AttrUtils::SetInt(ge_model, ATTR_MODEL_MEMORY_SIZE, memory_size); - ge::AttrUtils::SetInt(ge_model, ATTR_MODEL_WEIGHT_SIZE, weight_size); - ge::AttrUtils::SetInt(ge_model, MODEL_ATTR_SESSION_ID, session_id); - - - GraphId graph_id = 1; - GraphNodePtr graph_node = MakeShared(graph_id); - graph_manager.AddGraphNode(graph_id, graph_node); - graph_manager.IncreaseGraphCount(graph_id); - graph_manager.IncreaseGraphCount(graph_id); - - auto model_manager = ModelManager::GetInstance(); - auto listener = MakeShared(); - shared_ptr davinci_model1 = MakeShared(1, listener); - davinci_model1->SetId(1); - shared_ptr davinci_model2 = MakeShared(2, listener); - davinci_model1->SetId(2); - model_manager->InsertModel(1, davinci_model1); - model_manager->InsertModel(2, davinci_model2); - ComputeGraphPtr compute_graph = MakeShared("test_graph"); - bool is_dynamic_shape = false; - 
(void)AttrUtils::GetBool(compute_graph, ATTR_NAME_DYNAMIC_SHAPE_PARTITIONED, is_dynamic_shape); - GeRootModelPtr ge_root_model = MakeShared(compute_graph); - ge_root_model->SetModelId(1); - ge_root_model->SetModelId(2); - graph_node->SetGeRootModel(ge_root_model); - graph_node->SetLoadFlag(true); - Status status = graph_manager.CheckAndReleaseMemory(ge_model, graph_node); - EXPECT_EQ(status, ge::SUCCESS); -} - TEST_F(UtestGraphManagerTest, test_check_incre_build_and_pre_run_1) { // no need to build GraphId graph_id = 1; @@ -406,7 +385,7 @@ TEST_F(UtestGraphManagerTest, test_check_incre_build_and_pre_run_1) { GraphManager::PreRunArgs arg; GraphNodePtr graph_node = MakeShared(graph_id); graph_node->SetBuildFlag(true); - Status status = graph_manager.CheckIncreBuildAndPreRun(&graph_manager, arg, graph_node, ge_root_model); + Status status = graph_manager.CheckIncreBuildAndPreRun(arg, graph_node, ge_root_model); EXPECT_EQ(status, ge::SUCCESS); } @@ -422,7 +401,7 @@ TEST_F(UtestGraphManagerTest, test_check_incre_build_and_pre_run_2) { graph_node->SetBuildFlag(true); graph_node->Lock(); graph_manager.var_acc_ctrl_.graph_ids_need_rebuild_.insert(graph_id); - Status status = graph_manager.CheckIncreBuildAndPreRun(&graph_manager, arg, graph_node, ge_root_model); + Status status = graph_manager.CheckIncreBuildAndPreRun(arg, graph_node, ge_root_model); EXPECT_EQ(status, ge::PARAM_INVALID); } @@ -437,7 +416,7 @@ TEST_F(UtestGraphManagerTest, test_check_incre_build_and_pre_run_3) { GraphNodePtr graph_node = MakeShared(graph_id); graph_node->SetBuildFlag(false); graph_node->Lock(); - Status status = graph_manager.CheckIncreBuildAndPreRun(&graph_manager, arg, graph_node, ge_root_model); + Status status = graph_manager.CheckIncreBuildAndPreRun(arg, graph_node, ge_root_model); EXPECT_NE(status, ge::SUCCESS); } @@ -471,14 +450,6 @@ TEST_F(UtestGraphManagerTest, test_add_graph_with_copy_fail) { EXPECT_NE(status, ge::SUCCESS); } -TEST_F(UtestGraphManagerTest, ParseInputsDimsForData_success) { - GraphManager graph_manager; - std::vector input_tensors; - ge::Tensor tensor; - input_tensors.emplace_back(tensor); - graph_manager.ParseInputsDimsForData(input_tensors); -} - TEST_F(UtestGraphManagerTest, test_prerunthread_failed_1) { GraphId graph_id = 1; GraphManager graph_manager; @@ -509,7 +480,7 @@ TEST_F(UtestGraphManagerTest, test_prerunthread_failed_1) { graph_node->SetRunFlag(false); // function return. 
graph_manager.prerun_args_q_.Push(args); - auto t1 = std::thread(GraphManager::PreRunThread, &graph_manager); + auto t1 = std::thread(&GraphManager::PreRunThread, &graph_manager); if (t1.joinable()) { t1.join(); } @@ -549,7 +520,7 @@ TEST_F(UtestGraphManagerTest, test_prerunthread_failed_2) { int ret = setenv("ENABLE_NETWORK_ANALYSIS_DEBUG", "1", 1); EXPECT_EQ(ret, 0); graph_manager.prerun_args_q_.Push(args); - auto t1 = std::thread(GraphManager::PreRunThread, &graph_manager); + auto t1 = std::thread(&GraphManager::PreRunThread, &graph_manager); if (t1.joinable()) { t1.join(); } @@ -593,3 +564,4 @@ TEST_F(UtestGraphManagerTest, ChangeAndDeleteConst_success) { auto all_nodes = graph->GetDirectNode(); EXPECT_EQ(all_nodes.size(), 3); } +} // namespace ge diff --git a/tests/ut/ge/graph/passes/folding_kernel/gather_v2_kernel_unittest.cc b/tests/ut/ge/graph/passes/folding_kernel/gather_v2_kernel_unittest.cc index 0083146b..ad165d25 100644 --- a/tests/ut/ge/graph/passes/folding_kernel/gather_v2_kernel_unittest.cc +++ b/tests/ut/ge/graph/passes/folding_kernel/gather_v2_kernel_unittest.cc @@ -92,7 +92,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT32Axis0VersionA) { GeTensorPtr tensor_out = outputs[0]; int32_t *data_buf = (int32_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -139,7 +139,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT32Axis0VersionB) { GeTensorPtr tensor_out = outputs[0]; int32_t *data_buf = (int32_t *)tensor_out->GetData().data(); vector expect_out = {3, 3}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -186,7 +186,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT64Axis0) { GeTensorPtr tensor_out = outputs[0]; int64_t *data_buf = (int64_t *)tensor_out->GetData().data(); vector expect_out = {3, 3}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -233,7 +233,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT32Axis0) { GeTensorPtr tensor_out = outputs[0]; int32_t *data_buf = (int32_t *)tensor_out->GetData().data(); vector expect_out = {11, 12, 13, 14, 15, 16, 17, 18, 19, 11, 12, 13, 14, 15, 16, 17, 18, 19}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -279,7 +279,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT32Axis0And1) { GeTensorPtr tensor_out = outputs[0]; int32_t *data_buf = (int32_t *)tensor_out->GetData().data(); vector expect_out = {11, 12, 13, 14, 15, 16, 17, 18, 19, 1, 2, 3, 4, 5, 6, 7, 8, 9}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -327,7 +327,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT32Axis1) { GeTensorPtr tensor_out = outputs[0]; int32_t *data_buf = (int32_t *)tensor_out->GetData().data(); vector expect_out = {4, 5, 6, 4, 5, 6, 14, 15, 16, 14, 15, 16}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -374,7 +374,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT32Axis2) { GeTensorPtr tensor_out = outputs[0]; int32_t 
*data_buf = (int32_t *)tensor_out->GetData().data(); vector expect_out = {1, 1, 4, 4, 7, 7, 11, 11, 14, 14, 17, 17}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -422,7 +422,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT32Axis3) { GeTensorPtr tensor_out = outputs[0]; int32_t *data_buf = (int32_t *)tensor_out->GetData().data(); vector expect_out = {1, 2, 4, 5, 7, 8, 11, 12, 14, 15, 17, 18, 1, 2, 4, 5, 7, 8, 11, 12, 14, 15, 17, 18}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -470,7 +470,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT8Axis0) { GeTensorPtr tensor_out = outputs[0]; int8_t *data_buf = (int8_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -517,7 +517,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, INT16Axis0) { GeTensorPtr tensor_out = outputs[0]; int16_t *data_buf = (int16_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -564,7 +564,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, UINT8Axis0) { GeTensorPtr tensor_out = outputs[0]; uint8_t *data_buf = (uint8_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -611,7 +611,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, UINT16Axis0) { GeTensorPtr tensor_out = outputs[0]; uint16_t *data_buf = (uint16_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -658,7 +658,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, UINT32Axis0) { GeTensorPtr tensor_out = outputs[0]; uint32_t *data_buf = (uint32_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -705,7 +705,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, UINT64Axis0) { GeTensorPtr tensor_out = outputs[0]; uint64_t *data_buf = (uint64_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { EXPECT_EQ(*(data_buf + i), expect_out[i]); } } @@ -753,7 +753,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, DoubleAxis0) { GeTensorPtr tensor_out = outputs[0]; double *data_buf = (double *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { double diff = *(data_buf + i) - expect_out[i]; bool is_same = fabs(diff) < 0.0001 ? 
true : false; EXPECT_EQ(is_same, true); @@ -802,7 +802,7 @@ TEST_F(UtestGraphPassesFoldingKernelGatherV2Kernel, Float16Axis0) { GeTensorPtr tensor_out = outputs[0]; fp16_t *data_buf = (fp16_t *)tensor_out->GetData().data(); vector expect_out = {2, 2}; - for (int i = 0; i < expect_out.size(); i++) { + for (size_t i = 0; i < expect_out.size(); i++) { double diff = (double)*(data_buf + i) - (double)expect_out[i]; bool is_same = fabs(diff) < 0.0001 ? true : false; EXPECT_EQ(is_same, true); diff --git a/tests/ut/ge/graph/passes/mark_node_unknown_shape_pass_unittest.cc b/tests/ut/ge/graph/passes/mark_node_unknown_shape_pass_unittest.cc index 5157e510..c7d36582 100644 --- a/tests/ut/ge/graph/passes/mark_node_unknown_shape_pass_unittest.cc +++ b/tests/ut/ge/graph/passes/mark_node_unknown_shape_pass_unittest.cc @@ -33,7 +33,7 @@ protected: void SetUp() {} void TearDown() {} public: - NodePtr MakeNode(const ComputeGraphPtr &graph, uint32_t in_num, uint32_t out_num, string name, string type) { + NodePtr MakeNode(const ComputeGraphPtr &graph, int in_num, int out_num, string name, string type) { GeTensorDesc test_desc(GeShape(), FORMAT_NCHW, DT_FLOAT); auto op_desc = std::make_shared(name, type); for (auto i = 0; i < in_num; ++i) { diff --git a/tests/ut/ge/graph/passes/multi_batch_clone_pass_unittest.cc b/tests/ut/ge/graph/passes/multi_batch_clone_pass_unittest.cc index 1b75a613..c752cea4 100644 --- a/tests/ut/ge/graph/passes/multi_batch_clone_pass_unittest.cc +++ b/tests/ut/ge/graph/passes/multi_batch_clone_pass_unittest.cc @@ -45,7 +45,7 @@ protected: } public: - NodePtr MakeNode(const ComputeGraphPtr &graph, uint32_t in_num, uint32_t out_num, string name, string type) { + NodePtr MakeNode(const ComputeGraphPtr &graph, int in_num, int out_num, string name, string type) { GeTensorDesc test_desc(GeShape(), FORMAT_NCHW, DT_FLOAT); auto op_desc = std::make_shared(name, type); for (auto i = 0; i < in_num; ++i) { diff --git a/tests/ut/ge/graph/passes/subgraph_const_migration_pass_unittest.cc b/tests/ut/ge/graph/passes/subgraph_const_migration_pass_unittest.cc index 00157395..c633c0e1 100644 --- a/tests/ut/ge/graph/passes/subgraph_const_migration_pass_unittest.cc +++ b/tests/ut/ge/graph/passes/subgraph_const_migration_pass_unittest.cc @@ -32,7 +32,7 @@ class UtestSubgraphConstMigrationPass : public testing::Test { void TearDown() {} public: - NodePtr MakeNode(const ComputeGraphPtr &graph, uint32_t in_num, uint32_t out_num, string name, string type) { + NodePtr MakeNode(const ComputeGraphPtr &graph, int in_num, int out_num, string name, string type) { GeTensorDesc test_desc(GeShape(), FORMAT_NCHW, DT_FLOAT); auto op_desc = std::make_shared(name, type); for (auto i = 0; i < in_num; ++i) { diff --git a/tests/ut/ge/session/inner_session_unittest.cc b/tests/ut/ge/session/inner_session_unittest.cc index 0d20f06a..80cc2834 100644 --- a/tests/ut/ge/session/inner_session_unittest.cc +++ b/tests/ut/ge/session/inner_session_unittest.cc @@ -19,21 +19,18 @@ #define private public #define protected public #include "session/inner_session.h" -#undef private -#undef protected - using namespace std; namespace ge { -class Utest_Inner_session : public testing::Test { +class UtestInnerSession : public testing::Test { protected: void SetUp() override {} void TearDown() override {} }; -TEST_F(Utest_Inner_session, build_graph_success) { +TEST_F(UtestInnerSession, build_graph_success) { std::map options; uint64_t session_id = 1; InnerSession inner_seesion(session_id, options); @@ -44,17 +41,15 @@ TEST_F(Utest_Inner_session, 
build_graph_success) { EXPECT_NE(ret, ge::SUCCESS); } -TEST_F(Utest_Inner_session, initialize) { - std::map options = { - {ge::MODIFY_MIXLIST, "/modify.json"} - }; +TEST_F(UtestInnerSession, initialize) { + std::map options = {}; uint64_t session_id = 1; InnerSession inner_session(session_id, options); - auto ret = inner_session.Initialize(); - EXPECT_NE(ret, ge::SUCCESS); + EXPECT_EQ(inner_session.Initialize(), SUCCESS); + EXPECT_EQ(inner_session.Finalize(), SUCCESS); } -TEST_F(Utest_Inner_session, check_op_precision_mode) { +TEST_F(UtestInnerSession, check_op_precision_mode) { std::map options = { {ge::OP_PRECISION_MODE, "./op_precision_mode.ini"} };
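For orientation, the wiring this patch introduces between InnerSession, GraphManager and the new ModelExecutor (as exercised by the unit tests above) is roughly the following. This is an illustrative sketch only, not part of the diff, and it omits the error handling shown in inner_session.cc:

    // Sketch of the new ownership chain (illustrative, error handling omitted).
    ModelExecutor model_executor;                   // implements the Executor interface
    GraphManager graph_manager;
    model_executor.Initialize({});                  // InnerSession::InnerInitialize starts the executor first
    graph_manager.Initialize({}, &model_executor);  // manager compiles graphs, delegates load/run via Executor*
    // AddGraph / RunGraphAsync behave as before; PreRunThread now hands work to the executor via PushGraph().
    graph_manager.Finalize();                       // InnerSession::InnerFinalize stops the manager first,
    model_executor.Finalize();                      // then shuts the executor down.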