| @@ -288,6 +288,8 @@ class DavinciModel { | |||
| const vector<rtLabel_t> &GetLabelList() const { return label_list_; } | |||
| uint64_t GetAllStreamNum() const { return stream_list_.size() + all_hccl_stream_list_.size(); } | |||
| Status GetLabelGotoAddr(uint32_t label_index, rtMemType_t memory_type, void *&addr, uint32_t &size); | |||
| Status DestroyThread(); | |||
| @@ -44,9 +44,12 @@ const std::string kCmdTypeProfModelSubscribe = "prof_model_subscribe"; | |||
| const std::string kCmdTypeProfModelUnsubscribe = "prof_model_cancel_subscribe"; | |||
| const char *const kBatchLoadBuf = "batchLoadsoFrombuf"; | |||
| const char *const kDeleteCustOp = "deleteCustOp"; | |||
| const char *const kStreamResource = "stream"; | |||
| const char *const kEventResource = "event"; | |||
| const int kTimeSpecNano = 1000000000; | |||
| const int kTimeSpecMiro = 1000000; | |||
| const int kOpNameMaxSize = 100; | |||
| const int kMaxEventNum = 1024; | |||
| const uint64_t kInferSessionId = 0; | |||
| #pragma pack(push, 1) | |||
| struct CustAicpuSoBuf { | |||
| @@ -372,6 +375,126 @@ Status ModelManager::LoadModelOnline(uint32_t &model_id, const shared_ptr<ge::Ge | |||
| return ret; | |||
| } | |||
Status ModelManager::CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id) {
  GE_CHECK_NOTNULL(ge_model);
  int64_t value = 0;
  bool ret = ge::AttrUtils::GetInt(ge_model, ATTR_MODEL_STREAM_NUM, value);
  int64_t need_stream_num = ret ? value : 0;
  ret = ge::AttrUtils::GetInt(ge_model, ATTR_MODEL_EVENT_NUM, value);
  int64_t need_event_num = ret ? value : 0;
  GELOGD("The main stream number is %ld, event number is %ld", need_stream_num, need_event_num);
  int64_t hccl_follow_stream = 0;
  Status status = ModelUtils::CalculateFollowStream(ge_model, hccl_follow_stream);
  if (status != SUCCESS) {
    GELOGE(FAILED, "[Calculate][Stream] Calculate follow stream num failed");
    return FAILED;
  }
  need_stream_num += hccl_follow_stream;
  GELOGD("Model %u needs %ld streams in total", model_id, need_stream_num);
  int64_t free_stream_num = 0;
  status = GetFreeStream(free_stream_num);
  if (status != SUCCESS) {
    GELOGE(FAILED, "Get free stream num failed");
    return FAILED;
  }
  if (need_stream_num > free_stream_num) {
    status = ReleaseResource(need_stream_num, free_stream_num, kStreamResource);
    if (status != SUCCESS) {
      GELOGE(FAILED, "Release stream resource failed");
      return FAILED;
    }
  }
  int64_t free_event_num = 0;
  GetFreeEvent(free_event_num);
  if (need_event_num > free_event_num) {
    status = ReleaseResource(need_event_num, free_event_num, kEventResource);
    if (status != SUCCESS) {
      GELOGE(FAILED, "Release event resource failed");
      return FAILED;
    }
  }
  return SUCCESS;
}
Status ModelManager::ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind) {
  while (need_resource > free_resource) {
    uint32_t max_stream_model_id = 0;
    uint32_t max_event_model_id = 0;
    GetMaxStreamAndEventModel(max_stream_model_id, max_event_model_id);
    GELOGD("The max stream num model is %u, the max event num model is %u", max_stream_model_id,
           max_event_model_id);
    std::lock_guard<std::recursive_mutex> lock(map_mutex_);
    if (model_map_.empty()) {
      GELOGE(FAILED, "No model left to unload, can not release %s resource", resource_kind.c_str());
      return FAILED;
    }
    if (resource_kind == kStreamResource) {
      uint64_t max_stream_num = model_map_.at(max_stream_model_id)->GetAllStreamNum();
      Status ret = Unload(max_stream_model_id);
      if (ret != SUCCESS) {
        GELOGE(FAILED, "Unload max stream model failed, model id : %u", max_stream_model_id);
        return FAILED;
      }
      free_resource += static_cast<int64_t>(max_stream_num);
      GELOGD("Unload model for stream, model id : %u, stream num : %lu", max_stream_model_id, max_stream_num);
    } else if (resource_kind == kEventResource) {
      uint64_t max_event_num = model_map_.at(max_event_model_id)->GetEventList().size();
      Status ret = Unload(max_event_model_id);
      if (ret != SUCCESS) {
        GELOGE(FAILED, "Unload max event model failed, model id : %u", max_event_model_id);
        return FAILED;
      }
      free_resource += static_cast<int64_t>(max_event_num);
      GELOGD("Unload model for event, model id : %u, event num : %lu", max_event_model_id, max_event_num);
    }
  }
  return SUCCESS;
}
Status ModelManager::GetFreeStream(int64_t &free_stream) {
  uint32_t max_stream_count = 0U;
  uint32_t max_task_count = 0U;
  rtError_t ret = rtGetMaxStreamAndTask(RT_NORMAL_STREAM, &max_stream_count, &max_task_count);
  if (ret != RT_ERROR_NONE) {
    REPORT_INNER_ERROR("E19999", "Call rtGetMaxStreamAndTask failed");
    GELOGE(FAILED, "Get max stream and task count failed");
    return FAILED;
  }
  GELOGD("Allowed max stream count: %u, max task count per stream: %u", max_stream_count, max_task_count);
  std::lock_guard<std::recursive_mutex> lock(map_mutex_);
  uint64_t stream_sum = 0;
  for (auto &it : model_map_) {
    stream_sum += it.second->GetAllStreamNum();
  }
  free_stream = static_cast<int64_t>(max_stream_count) - static_cast<int64_t>(stream_sum);
  return SUCCESS;
}
void ModelManager::GetFreeEvent(int64_t &free_event) {
  std::lock_guard<std::recursive_mutex> lock(map_mutex_);
  uint64_t event_sum = 0;
  for (auto &it : model_map_) {
    event_sum += it.second->GetEventList().size();
  }
  free_event = kMaxEventNum - static_cast<int64_t>(event_sum);
}
void ModelManager::GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model) {
  std::lock_guard<std::recursive_mutex> lock(map_mutex_);
  uint64_t max_stream_num = 0;
  uint64_t max_event_num = 0;
  for (auto &it : model_map_) {
    uint64_t stream_num = it.second->GetAllStreamNum();
    uint64_t event_num = it.second->GetEventList().size();
    if (stream_num > max_stream_num) {
      max_stream_num = stream_num;
      max_stream_model = it.first;
    }
    if (event_num > max_event_num) {
      max_event_num = event_num;
      max_event_model = it.first;
    }
  }
}

void ModelManager::InsertModel(uint32_t model_id, std::shared_ptr<DavinciModel> &davinci_model) {
  GE_CHK_BOOL_EXEC(davinci_model != nullptr, return, "[Check][Param] davinci_model ptr is null, id:%u", model_id);
  std::lock_guard<std::recursive_mutex> lock(map_mutex_);
| @@ -336,7 +336,12 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY ModelManager { | |||
| /// | |||
| void InsertModel(uint32_t model_id, std::shared_ptr<DavinciModel> &davinci_model); | |||
| void InsertModel(uint32_t model_id, std::shared_ptr<hybrid::HybridDavinciModel> &hybrid_model); | |||
| Status CheckAndReleaseStreamEventResource(const GeModelPtr &ge_model, uint32_t model_id); | |||
| Status ReleaseResource(int64_t need_resource, int64_t free_resource, const string &resource_kind); | |||
| Status GetFreeStream(int64_t &free_stream); | |||
| void GetFreeEvent(int64_t &free_event); | |||
| void GetMaxStreamAndEventModel(uint32_t &max_stream_model, uint32_t &max_event_model); | |||
| /// | |||
| /// @ingroup domi_ome | |||
| /// @brief delete model from model manager set | |||
| @@ -43,6 +43,9 @@ | |||
| } \ | |||
| } while (0) | |||
| namespace { | |||
| const char *const kUsedStreamNum = "used_stream_num"; | |||
| } // namespace | |||
| namespace ge { | |||
| /// | |||
| /// @ingroup ge | |||
| @@ -620,4 +623,54 @@ Status ModelUtils::GetRtAddress(const RuntimeParam ¶m, uintptr_t logic_addr, | |||
| mem_addr = runtime_base_addr + logic_addr; | |||
| return SUCCESS; | |||
| } | |||
Status ModelUtils::CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_follow_stream_num) {
  const auto &model_def = ge_model->GetModelTaskDefPtr();
  GE_CHECK_NOTNULL(model_def);
  Graph graph = ge_model->GetGraph();
  ComputeGraphPtr compute_graph = GraphUtils::GetComputeGraph(graph);
  GE_CHECK_NOTNULL(compute_graph);
  map<uint32_t, OpDescPtr> op_list;
  for (const auto &node : compute_graph->GetDirectNode()) {
    OpDescPtr op_desc = node->GetOpDesc();
    GE_CHECK_NOTNULL(op_desc);
    op_list.emplace(op_desc->GetId(), op_desc);
  }
  std::multimap<int64_t, int64_t> main_follow_num;
  for (int32_t i = 0; i < model_def->task_size(); i++) {
    const domi::TaskDef &task = model_def->task(i);
    if (static_cast<rtModelTaskType_t>(task.type()) == RT_MODEL_TASK_HCCL) {
      const auto &hccl_def = task.kernel_hccl();
      auto op_iter = op_list.find(hccl_def.op_index());
      if (op_iter == op_list.end()) {
        GELOGE(FAILED, "Can not find hccl op by index %u", hccl_def.op_index());
        return FAILED;
      }
      OpDescPtr hccl_op_desc = op_iter->second;
      int64_t main_stream_id = hccl_op_desc->GetStreamId();
      int64_t follow_stream_num = 0;
      if (!ge::AttrUtils::GetInt(hccl_op_desc, kUsedStreamNum, follow_stream_num)) {
        GELOGW("Get used stream num failed, op is %s", hccl_op_desc->GetName().c_str());
      }
      main_follow_num.emplace(main_stream_id, follow_stream_num);
    }
  }
  hccl_follow_stream_num = CalFollowStreamSum(main_follow_num);
  return SUCCESS;
}
int64_t ModelUtils::CalFollowStreamSum(const std::multimap<int64_t, int64_t> &hccl_stream_map) {
  std::map<int64_t, int64_t> max_follow_stream_map;
  for (const auto &it : hccl_stream_map) {
    auto max_it = max_follow_stream_map.find(it.first);
    if (max_it == max_follow_stream_map.end()) {
      max_follow_stream_map.emplace(it.first, it.second);
    } else if (it.second > max_it->second) {
      max_it->second = it.second;
    }
  }
  int64_t need_follow_stream_num = 0;
  for (const auto &follow_it : max_follow_stream_map) {
    need_follow_stream_num += follow_it.second;
  }
  GELOGD("Need follow stream num is %ld", need_follow_stream_num);
  return need_follow_stream_num;
}
} // namespace ge

| @@ -24,6 +24,7 @@ | |||
| #include "graph/load/model_manager/task_info/task_info.h" | |||
| #include "graph/op_desc.h" | |||
| #include "graph/utils/tensor_adapter.h" | |||
| #include "model/ge_model.h" | |||
| using std::vector; | |||
| @@ -108,6 +109,19 @@ class ModelUtils { | |||
| /// | |||
| static Status GetRtAddress(const RuntimeParam &model_param, uintptr_t logic_addr, uint8_t *&mem_addr); | |||
| /// | |||
| /// @ingroup ge | |||
| /// @brief Calculate hccl follw stream | |||
| /// @return Status | |||
| /// | |||
| static Status CalculateFollowStream(const GeModelPtr &ge_model, int64_t &hccl_fellow_stream_num); | |||
| /// | |||
| /// @ingroup ge | |||
| /// @brief Calculate the sum of follow stream | |||
| /// @return int64_t | |||
| /// | |||
| static int64_t CalFollowStramSum(const std::multimap<int64_t, int64_t> &hccl_stream_map); | |||
| private: | |||
| /// | |||
| /// @ingroup ge | |||
| @@ -438,4 +438,98 @@ TEST_F(UtestModelManagerModelManager, test_data_input_tensor) { | |||
| auto ret = mm.DataInputTensor(model_id,inputs); | |||
| EXPECT_EQ(PARAM_INVALID, ret); // HybridDavinciModel::impl_ is null. | |||
| } | |||

TEST_F(UtestModelManagerModelManager, test_init_dump_properties_with_new_session_id) {
  ModelManager model_manager;
  uint64_t session_id = 1;
  model_manager.InitDumPropertiesWithNewSessionId(session_id);
}

TEST_F(UtestModelManagerModelManager, test_update_session_id) {
  ModelManager model_manager;
  uint32_t model_id = 0;
  uint64_t session_id = 0;
  GeModelPtr ge_model = MakeShared<GeModel>();
  std::shared_ptr<DavinciModel> davinci_model = MakeShared<DavinciModel>(0, nullptr);
  model_manager.UpdateSessionId(model_id, ge_model, davinci_model, session_id);
}

TEST_F(UtestModelManagerModelManager, test_has_var_node) {
  ModelManager model_manager;
  Graph graph("test");
  CreateGraph(graph);
  auto compute_graph = ge::GraphUtils::GetComputeGraph(graph);
  model_manager.HasVarNode(compute_graph);
}
TEST_F(UtestModelManagerModelManager, Cal_follow_stream_sum) {
  std::multimap<int64_t, int64_t> hccl_stream_map = {{1, 10}, {1, 20}, {2, 10}, {2, 5}};
  int64_t result = ModelUtils::CalFollowStreamSum(hccl_stream_map);
  EXPECT_EQ(result, 30);
}

TEST_F(UtestModelManagerModelManager, get_max_stream_and_event) {
  ModelManager mm;
  auto model1 = std::make_shared<DavinciModel>(1, nullptr);
  auto model2 = std::make_shared<DavinciModel>(2, nullptr);
  rtStream_t stream = nullptr;
  rtStream_t stream2 = nullptr;
  rtStream_t stream3 = nullptr;
  rtStream_t stream4 = nullptr;
  rtEvent_t event = nullptr;
  rtEvent_t event2 = nullptr;
  rtEvent_t event3 = nullptr;
  model1->stream_list_ = {stream, stream2, stream3, stream4};
  model1->event_list_ = {event, event2};
  model2->stream_list_ = {stream, stream2};
  model2->event_list_ = {event, event2, event3};
  mm.InsertModel(1, model1);
  mm.InsertModel(2, model2);
  uint32_t max_stream_model = 0;
  uint32_t max_event_model = 0;
  mm.GetMaxStreamAndEventModel(max_stream_model, max_event_model);
  EXPECT_EQ(max_stream_model, 1);
  EXPECT_EQ(max_event_model, 2);
  int64_t free_stream = 0;
  int64_t free_event = 0;
  Status ret = mm.GetFreeStream(free_stream);
  EXPECT_EQ(ret, SUCCESS);
  mm.GetFreeEvent(free_event);
}

TEST_F(UtestModelManagerModelManager, release_resource_stream) {
  ModelManager mm;
  auto model1 = std::make_shared<DavinciModel>(1, nullptr);
  auto model2 = std::make_shared<DavinciModel>(2, nullptr);
  rtStream_t stream = nullptr;
  rtStream_t stream2 = nullptr;
  rtStream_t stream3 = nullptr;
  rtStream_t stream4 = nullptr;
  rtEvent_t event = nullptr;
  rtEvent_t event2 = nullptr;
  rtEvent_t event3 = nullptr;
  model1->stream_list_ = {stream, stream2, stream3, stream4};
  model1->event_list_ = {event, event2};
  model2->stream_list_ = {stream, stream2};
  model2->event_list_ = {event, event2, event3};
  mm.InsertModel(1, model1);
  mm.InsertModel(2, model2);
  string kind = "stream";
  Status ret = mm.ReleaseResource(110, 109, kind);
  EXPECT_EQ(ret, SUCCESS);
  string kind2 = "event";
  Status ret2 = mm.ReleaseResource(110, 109, kind2);
  EXPECT_EQ(ret2, SUCCESS);
}

TEST_F(UtestModelManagerModelManager, check_stream_and_event_resource) {
  ModelManager mm;
  auto ge_model = make_shared<GeModel>();
  Status ret = mm.CheckAndReleaseStreamEventResource(ge_model, 1);
  EXPECT_EQ(ret, FAILED);
}
} // namespace ge