diff --git a/ge/graph/load/model_manager/davinci_model.h b/ge/graph/load/model_manager/davinci_model.h
index 4c06ad98..7f130b43 100755
--- a/ge/graph/load/model_manager/davinci_model.h
+++ b/ge/graph/load/model_manager/davinci_model.h
@@ -288,6 +288,8 @@ class DavinciModel {
 
   const vector<rtLabel_t> &GetLabelList() const { return label_list_; }
 
+  uint64_t GetAllStreamNum() const { return stream_list_.size() + all_hccl_stream_list_.size(); }
+
   Status GetLabelGotoAddr(uint32_t label_index, rtMemType_t memory_type, void *&addr, uint32_t &size);
 
   Status DestroyThread();
diff --git a/ge/graph/load/model_manager/model_manager.cc b/ge/graph/load/model_manager/model_manager.cc
index 6a563d2f..c9f31019 100755
--- a/ge/graph/load/model_manager/model_manager.cc
+++ b/ge/graph/load/model_manager/model_manager.cc
@@ -44,9 +44,12 @@ const std::string kCmdTypeProfModelSubscribe = "prof_model_subscribe";
 const std::string kCmdTypeProfModelUnsubscribe = "prof_model_cancel_subscribe";
 const char *const kBatchLoadBuf = "batchLoadsoFrombuf";
 const char *const kDeleteCustOp = "deleteCustOp";
+const char *const kStreamResource = "stream";
+const char *const kEventResource = "event";
 const int kTimeSpecNano = 1000000000;
 const int kTimeSpecMiro = 1000000;
 const int kOpNameMaxSize = 100;
+const int kMaxEventNum = 1024;
 const uint64_t kInferSessionId = 0;
 #pragma pack(push, 1)
 struct CustAicpuSoBuf {
@@ -372,6 +375,126 @@ Status ModelManager::LoadModelOnline(uint32_t &model_id, const shared_ptr
+Status ModelManager::CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id) {
+  if (need_stream_num > free_stream_num) {
+    status = ReleaseResource(need_stream_num, free_stream_num, kStreamResource);
+    if (status != SUCCESS) {
+      GELOGE(FAILED, "Release stream resource failed");
+      return FAILED;
+    }
+  }
+
+  int64_t free_event_num = 0;
+  GetFreeEvent(free_event_num);
+  if (need_event_num > free_event_num) {
+    status = ReleaseResource(need_event_num, free_event_num, kEventResource);
+    if (status != SUCCESS) {
+      GELOGE(FAILED, "Release event resource failed");
+      return FAILED;
+    }
+  }
+  return SUCCESS;
+}
+
+Status ModelManager::ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind) {
+  // Keep unloading the loaded model that holds the most streams/events until enough resources are free.
+  while (need_resource > free_resource) {
+    uint32_t max_stream_model_id = 0;
+    uint32_t max_event_model_id = 0;
+    GetMaxStreamAndEventModel(max_stream_model_id, max_event_model_id);
+    GELOGD("The max stream num model is: %u, the max event num model is: %u", max_stream_model_id, max_event_model_id);
+    std::lock_guard<std::recursive_mutex> lock(map_mutex_);
+    if (resource_kind == kStreamResource) {
+      uint64_t max_stream_num = model_map_.at(max_stream_model_id)->GetAllStreamNum();
+      Status ret = Unload(max_stream_model_id);
+      if (ret != SUCCESS) {
+        GELOGE(FAILED, "Unload max stream model failed, model id : %u", max_stream_model_id);
+        return FAILED;
+      }
+      free_resource = free_resource + max_stream_num;
+      GELOGD("Unload model for stream, model id : %u, stream num : %lu", max_stream_model_id, max_stream_num);
+    }
+    if (resource_kind == kEventResource) {
+      uint64_t max_event_num = model_map_.at(max_event_model_id)->GetEventList().size();
+      Status ret = Unload(max_event_model_id);
+      if (ret != SUCCESS) {
+        GELOGE(FAILED, "Unload max event model failed, model id : %u", max_event_model_id);
+        return FAILED;
+      }
+      free_resource = free_resource + max_event_num;
+      GELOGD("Unload model for event, model id : %u, event num : %zu", max_event_model_id, max_event_num);
+    }
+  }
+  return SUCCESS;
+}
+
+Status ModelManager::GetFreeStream(int64_t &free_stream) {
+  uint32_t max_stream_cout;
+  uint32_t max_task_cout;
+  rtError_t ret = rtGetMaxStreamAndTask(RT_NORMAL_STREAM, &max_stream_cout, &max_task_cout);
+  if (ret != RT_ERROR_NONE) {
+    REPORT_INNER_ERROR("E19999", "Call rtGetMaxStreamAndTask failed");
+    GELOGE(FAILED, "Get max stream and task count failed");
+    return FAILED;
+  }
+  GELOGD("Allowed max stream count: %u, max task count per stream: %u", max_stream_cout, max_task_cout);
+  std::lock_guard<std::recursive_mutex> lock(map_mutex_);
+  uint64_t stream_sum = 0;
+
+  for (auto &it : model_map_) {
+    stream_sum = stream_sum + it.second->GetAllStreamNum();
+  }
+  free_stream = max_stream_cout - stream_sum;
+  return SUCCESS;
+}
+
+void ModelManager::GetFreeEvent(int64_t &free_event) {
+  std::lock_guard<std::recursive_mutex> lock(map_mutex_);
+  uint64_t event_sum = 0;
+  for (auto &it : model_map_) {
+    event_sum = event_sum + it.second->GetEventList().size();
+  }
+  free_event = kMaxEventNum - event_sum;
+}
+
+void ModelManager::GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model) {
+  std::lock_guard<std::recursive_mutex> lock(map_mutex_);
+  uint64_t max_stream_num = 0;
+  uint64_t max_event_num = 0;
+  for (auto &it : model_map_) {
+    if (it.second->GetAllStreamNum() > max_stream_num) {
+      max_stream_num = it.second->GetAllStreamNum();
+      max_stream_model = it.first;
+    }
+    if (it.second->GetEventList().size() > max_event_num) {
+      max_event_num = it.second->GetEventList().size();
+      max_event_model = it.first;
+    }
+  }
+}
+
 void ModelManager::InsertModel(uint32_t model_id, std::shared_ptr<DavinciModel> &davinci_model) {
   GE_CHK_BOOL_EXEC(davinci_model != nullptr, return, "[Check][Param] davinci_model ptr is null, id:%u", model_id);
   std::lock_guard<std::recursive_mutex> lock(map_mutex_);
diff --git a/ge/graph/load/model_manager/model_manager.h b/ge/graph/load/model_manager/model_manager.h
index e35bb7aa..ff56356c 100755
--- a/ge/graph/load/model_manager/model_manager.h
+++ b/ge/graph/load/model_manager/model_manager.h
@@ -336,7 +336,12 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager {
   ///
   void InsertModel(uint32_t model_id, std::shared_ptr<DavinciModel> &davinci_model);
   void InsertModel(uint32_t model_id, std::shared_ptr<hybrid::HybridDavinciModel> &hybrid_model);
-
+
+  Status CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id);
+  Status ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind);
+  Status GetFreeStream(int64_t &free_stream);
+  void GetFreeEvent(int64_t &free_event);
+  void GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model);
   ///
   /// @ingroup domi_ome
   /// @brief delete model from model manager set
diff --git a/ge/graph/load/model_manager/model_utils.cc b/ge/graph/load/model_manager/model_utils.cc
index 224a3331..16c2cd36 100755
--- a/ge/graph/load/model_manager/model_utils.cc
+++ b/ge/graph/load/model_manager/model_utils.cc
@@ -43,6 +43,9 @@
   }  \
 } while (0)
 
+namespace {
+const char *const kUsedStreamNum = "used_stream_num";
+}  // namespace
 namespace ge {
 ///
 /// @ingroup ge
@@ -620,4 +623,54 @@ Status ModelUtils::GetRtAddress(const RuntimeParam &param, uintptr_t logic_addr,
   mem_addr = runtime_base_addr + logic_addr;
   return SUCCESS;
 }
+
+Status ModelUtils::CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num) {
+  const auto &model_def = ge_model->GetModelTaskDefPtr();
+  GE_CHECK_NOTNULL(model_def);
+  Graph graph = ge_model->GetGraph();
+  ComputeGraphPtr compute_graph = GraphUtils::GetComputeGraph(graph);
+  GE_CHECK_NOTNULL(compute_graph);
+
+  map<int64_t, OpDescPtr> op_list;
+  for (const auto &node : compute_graph->GetDirectNode()) {
+    OpDescPtr op_desc = node->GetOpDesc();
+    GE_CHECK_NOTNULL(op_desc);
+    op_list.emplace(op_desc->GetId(), op_desc);
+  }
+
+  std::multimap<int64_t, int64_t> main_follow_num;
+  for (int32_t i = 0; i < model_def->task_size(); i++) {
+    const domi::TaskDef &task = model_def->task(i);
+    if (static_cast<rtModelTaskType_t>(task.type()) == RT_MODEL_TASK_HCCL) {
+      auto hccl_def = task.kernel_hccl();
+      OpDescPtr hccl_op_desc = op_list.at(hccl_def.op_index());
+      int64_t main_stream_id = hccl_op_desc->GetStreamId();
+      int64_t follow_stream_num = 0;
+      // Each HCCL op records its follow-stream demand in the "used_stream_num" attribute.
+      if (!ge::AttrUtils::GetInt(hccl_op_desc, kUsedStreamNum, follow_stream_num)) {
+        GELOGW("Get used stream num failed, op is %s", hccl_op_desc->GetName().c_str());
+      }
+      main_follow_num.emplace(main_stream_id, follow_stream_num);
+    }
+  }
+  hccl_fellow_stream_num = CalFollowStramSum(main_follow_num);
+  return SUCCESS;
+}
+
+int64_t ModelUtils::CalFollowStramSum(const std::multimap<int64_t, int64_t> &hccl_stream_map) {
+  // Keep only the largest follow-stream demand for each main stream, then sum across main streams.
+  std::map<int64_t, int64_t> max_follow_stream_map;
+  for (const auto &it : hccl_stream_map) {
+    auto max_it = max_follow_stream_map.find(it.first);
+    if (max_it == max_follow_stream_map.end()) {
+      max_follow_stream_map.emplace(it.first, it.second);
+    } else if (it.second > max_it->second) {
+      max_follow_stream_map.at(max_it->first) = it.second;
+    }
+  }
+  int64_t need_follow_stream_num = 0;
+  for (const auto &follow_it : max_follow_stream_map) {
+    need_follow_stream_num = need_follow_stream_num + follow_it.second;
+  }
+  GELOGD("Need follow stream num is %ld", need_follow_stream_num);
+  return need_follow_stream_num;
+}
 } // namespace ge
diff --git a/ge/graph/load/model_manager/model_utils.h b/ge/graph/load/model_manager/model_utils.h
index 8ce1b060..ac69d929 100755
--- a/ge/graph/load/model_manager/model_utils.h
+++ b/ge/graph/load/model_manager/model_utils.h
@@ -24,6 +24,7 @@
 #include "graph/load/model_manager/task_info/task_info.h"
 #include "graph/op_desc.h"
 #include "graph/utils/tensor_adapter.h"
+#include "model/ge_model.h"
 
 using std::vector;
 
@@ -108,6 +109,19 @@ class ModelUtils {
   ///
   static Status GetRtAddress(const RuntimeParam &model_param, uintptr_t logic_addr, uint8_t *&mem_addr);
 
+  ///
+  /// @ingroup ge
+  /// @brief Calculate hccl follow stream
+  /// @return Status
+  ///
+  static Status CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num);
+
+  ///
+  /// @ingroup ge
+  /// @brief Calculate the sum of follow stream
+  /// @return int64_t
+  ///
+  static int64_t CalFollowStramSum(const std::multimap<int64_t, int64_t> &hccl_stream_map);
 private:
   ///
   /// @ingroup ge
diff --git a/tests/ut/ge/graph/load/model_manager_unittest.cc b/tests/ut/ge/graph/load/model_manager_unittest.cc
index a3545b33..02fd3a81 100644
--- a/tests/ut/ge/graph/load/model_manager_unittest.cc
+++ b/tests/ut/ge/graph/load/model_manager_unittest.cc
@@ -438,4 +438,98 @@ TEST_F(UtestModelManagerModelManager, test_data_input_tensor) {
   auto ret = mm.DataInputTensor(model_id,inputs);
   EXPECT_EQ(PARAM_INVALID, ret); // HybridDavinciModel::impl_ is null.
 }
-} // namespace ge
+
+TEST_F(UtestModelManagerModelManager, test_init_dump_properties_with_new_session_id) {
+  ModelManager model_manager;
+  uint64_t session_id = 1;
+  model_manager.InitDumPropertiesWithNewSessionId(session_id);
+}
+
+TEST_F(UtestModelManagerModelManager, test_update_session_id) {
+  ModelManager model_manager;
+  uint32_t model_id = 0;
+  uint64_t session_id = 0;
+  GeModelPtr ge_model = MakeShared<GeModel>();
+  std::shared_ptr<DavinciModel> davinci_model = MakeShared<DavinciModel>(0, nullptr);
+  model_manager.UpdateSessionId(model_id, ge_model, davinci_model, session_id);
+}
+
+TEST_F(UtestModelManagerModelManager, test_has_var_node) {
+  ModelManager model_manager;
+  uint64_t session_id = 1;
+  Graph graph("test");
+  CreateGraph(graph);
+  auto compute_graph = ge::GraphUtils::GetComputeGraph(graph);
+  model_manager.HasVarNode(compute_graph);
+}
+
+TEST_F(UtestModelManagerModelManager, Cal_follow_stream_sum) {
+  std::multimap<int64_t, int64_t> hccl_stream_map = {{1, 10}, {1, 20}, {2, 10}, {2, 5}};
+  int64_t result = ModelUtils::CalFollowStramSum(hccl_stream_map);
+  EXPECT_EQ(result, 30);
+}
+
+TEST_F(UtestModelManagerModelManager, get_max_stream_and_event) {
+  ModelManager mm;
+  auto model1 = std::make_shared<DavinciModel>(1, nullptr);
+  auto model2 = std::make_shared<DavinciModel>(2, nullptr);
+  rtStream_t stream = nullptr;
+  rtStream_t stream2 = nullptr;
+  rtStream_t stream3 = nullptr;
+  rtStream_t stream4 = nullptr;
+  rtEvent_t event = nullptr;
+  rtEvent_t event2 = nullptr;
+  rtEvent_t event3 = nullptr;
+  model1->stream_list_ = {stream, stream2, stream3, stream4};
+  model1->event_list_ = {event, event2};
+  model2->stream_list_ = {stream, stream2};
+  model2->event_list_ = {event, event2, event3};
+
+  mm.InsertModel(1, model1);
+  mm.InsertModel(2, model2);
+  uint32_t max_stream_model;
+  uint32_t max_event_model;
+  mm.GetMaxStreamAndEventModel(max_stream_model, max_event_model);
+  EXPECT_EQ(max_stream_model, 1);
+  EXPECT_EQ(max_event_model, 2);
+
+  int64_t free_stream;
+  int64_t free_event;
+  Status ret = mm.GetFreeStream(free_stream);
+  EXPECT_EQ(ret, SUCCESS);
+}
+
+TEST_F(UtestModelManagerModelManager, release_resource_stream) {
+  ModelManager mm;
+  auto model1 = std::make_shared<DavinciModel>(1, nullptr);
+  auto model2 = std::make_shared<DavinciModel>(2, nullptr);
+  rtStream_t stream = nullptr;
+  rtStream_t stream2 = nullptr;
+  rtStream_t stream3 = nullptr;
+  rtStream_t stream4 = nullptr;
+  rtEvent_t event = nullptr;
+  rtEvent_t event2 = nullptr;
+  rtEvent_t event3 = nullptr;
+  model1->stream_list_ = {stream, stream2, stream3, stream4};
+  model1->event_list_ = {event, event2};
+  model2->stream_list_ = {stream, stream2};
+  model2->event_list_ = {event, event2, event3};
+
+  mm.InsertModel(1, model1);
+  mm.InsertModel(2, model2);
+  string kind = "stream";
+  Status ret = mm.ReleaseResource(110, 109, kind);
+  EXPECT_EQ(ret, SUCCESS);
+
+  string kind2 = "event";
+  Status ret2 = mm.ReleaseResource(110, 109, kind2);
+  EXPECT_EQ(ret2, SUCCESS);
+}
+
+TEST_F(UtestModelManagerModelManager, check_stream_and_event_resource) {
+  ModelManager mm;
+  auto ge_model = make_shared<GeModel>();
+  Status ret = mm.CheckAndReleaseStreamEventResource(ge_model, 1);
+  EXPECT_EQ(ret, FAILED);
+}
+} // namespace ge
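Reviewer note: the sketch below is not part of the patch. It is a minimal, self-contained C++ illustration of the aggregation rule that ModelUtils::CalFollowStramSum implements (for each HCCL main stream keep the largest follow-stream demand, then sum those maxima), run on the same data as the Cal_follow_stream_sum unit test. The function name SumOfPerStreamMax and the demo values are illustrative only.

#include <cstdint>
#include <iostream>
#include <map>

// Illustrative only: mirrors the per-main-stream max + sum logic of CalFollowStramSum.
int64_t SumOfPerStreamMax(const std::multimap<int64_t, int64_t> &main_to_follow) {
  std::map<int64_t, int64_t> max_per_main_stream;
  for (const auto &entry : main_to_follow) {
    auto it = max_per_main_stream.find(entry.first);
    if (it == max_per_main_stream.end() || entry.second > it->second) {
      // Keep the largest follow-stream demand seen so far for this main stream.
      max_per_main_stream[entry.first] = entry.second;
    }
  }
  int64_t total = 0;
  for (const auto &kv : max_per_main_stream) {
    total += kv.second;  // sum the per-main-stream maxima
  }
  return total;
}

int main() {
  // Same data as the unit test: max(10, 20) + max(10, 5) = 30.
  const std::multimap<int64_t, int64_t> demo = {{1, 10}, {1, 20}, {2, 10}, {2, 5}};
  std::cout << SumOfPerStreamMax(demo) << std::endl;  // prints 30
  return 0;
}

Taking the maximum per main stream (rather than the plain sum, which would be 45 here) matches the expectation of 30 in the unit test: HCCL tasks attached to the same main stream report their demands into one bucket, and only the largest demand per bucket contributes to the total.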