diff --git a/ge/hybrid/model/hybrid_model_builder.cc b/ge/hybrid/model/hybrid_model_builder.cc index 669fafb1..25dabd78 100755 --- a/ge/hybrid/model/hybrid_model_builder.cc +++ b/ge/hybrid/model/hybrid_model_builder.cc @@ -1089,14 +1089,14 @@ Status HybridModelBuilder::LoadTask(NodeItem &node_item) { Status HybridModelBuilder::LoadTasks() { GE_CHK_STATUS_RET(CheckAicpuOpList(), "Check Aicpu op failed."); - std::map ordered_partitioned_calls; + std::map> ordered_partitioned_calls; for (auto &it : hybrid_model_.node_items_) { auto &node_item = it.second; if (node_item->node_type == NETOUTPUT) { continue; } if (node_item->node_type == PARTITIONEDCALL) { - ordered_partitioned_calls.emplace(node_item->node_id, node_item.get()); + ordered_partitioned_calls[node_item->node_id][node_item->node_name] = node_item.get(); continue; } GE_CHK_STATUS_RET_NOLOG(LoadTask(*node_item)); @@ -1104,7 +1104,9 @@ Status HybridModelBuilder::LoadTasks() { // HCCL operators need to be loaded in the same order across different processes for (auto &it : ordered_partitioned_calls) { - GE_CHK_STATUS_RET_NOLOG(LoadTask(*it.second)); + for (auto &it2 : it.second) { + GE_CHK_STATUS_RET_NOLOG(LoadTask(*it2.second)); + } } return SUCCESS; @@ -1637,6 +1639,7 @@ Status HybridModelBuilder::LoadKnownShapedSubgraph(ComputeGraph &graph, NodeItem auto temp_graph = MakeShared("temp"); GE_CHECK_NOTNULL(temp_graph); auto wrapper_node = temp_graph->AddNode(wrapper_op_desc); + wrapper_op_desc->SetId(parent_node_item->node_id); GeModelPtr ge_model = subgraph_models_[subgraph_name]; GE_CHECK_NOTNULL(ge_model); hybrid_model_.known_shape_sub_models_.emplace(wrapper_node, ge_model); @@ -1916,7 +1919,6 @@ Status HybridModelBuilder::LoadDynamicSubgraph(ComputeGraph &graph, bool is_root NodeItem *node_item = nullptr; GE_CHK_STATUS_RET_NOLOG(GetOrCreateNodeItem(node, &node_item)); GE_CHK_STATUS_RET_NOLOG(BuildNodeItem(node, *node_item)); - GE_CHK_STATUS_RET_NOLOG(CollectParallelGroups(node_item)); 
GE_CHK_STATUS_RET_NOLOG(UpdateAnchorStatus(node)); // needed by FE generate task node_item->input_start = input_start; @@ -2069,22 +2071,17 @@ Status HybridModelBuilder::CollectParallelGroups(NodeItem *node_item) { } Status HybridModelBuilder::ParseDependentByParallelGroup() { + for (auto &it : hybrid_model_.node_items_) { + GE_CHK_STATUS_RET_NOLOG(CollectParallelGroups(it.second.get())); + } for (const auto &it : node_to_parallel_groups_) { auto node_item = it.first; - auto dst_engine_type = NodeExecutorManager::GetInstance().ResolveExecutorType(*node_item->node); + auto dst_executor_type = NodeExecutorManager::GetInstance().ResolveExecutorType(*node_item->node); for (const auto &parallel_group : it.second) { auto &dependent_nodes = parallel_group_to_nodes_[parallel_group]; NodeItem *nearest_dep_node = nullptr; int max_id = -1; for (auto &dep_node : dependent_nodes) { - if (node_item == dep_node) { - continue; - } - auto src_engine_type = NodeExecutorManager::GetInstance().ResolveExecutorType(*dep_node->node); - if (src_engine_type == dst_engine_type) { - continue; - } - if (dep_node->node_id < node_item->node_id && dep_node->node_id > max_id) { nearest_dep_node = dep_node; max_id = dep_node->node_id; @@ -2092,17 +2089,25 @@ Status HybridModelBuilder::ParseDependentByParallelGroup() { } if (nearest_dep_node != nullptr) { - GELOGD("Add dependency for nodes of same parallel group[%s], src = [%s], dst = [%s]", - parallel_group.c_str(), - nearest_dep_node->NodeName().c_str(), - node_item->NodeName().c_str()); + GELOGD("[%s] Nearest node = [%s]", node_item->NodeName().c_str(), nearest_dep_node->NodeName().c_str()); + auto src_engine_type = NodeExecutorManager::GetInstance().ResolveExecutorType(*nearest_dep_node->node); + if (src_engine_type == dst_executor_type) { + GELOGD("No need to add dependency for nodes with same executor type"); + continue; + } auto &deps = node_item->dependents_for_execution; if (std::find(deps.begin(), deps.end(), nearest_dep_node->node) != 
deps.end()) { - GELOGD("Already has dependency, skip it"); + GELOGD("%s->%s Already has dependency, skip it", + nearest_dep_node->node->GetName().c_str(), + node_item->NodeName().c_str()); continue; } nearest_dep_node->has_observer = true; deps.emplace_back(nearest_dep_node->node); + GELOGD("Add dependency for nodes with the same parallel group[%s], src = [%s], dst = [%s]", + parallel_group.c_str(), + nearest_dep_node->NodeName().c_str(), + node_item->NodeName().c_str()); } } } diff --git a/tests/ut/ge/hybrid/ge_hybrid_unittest.cc b/tests/ut/ge/hybrid/ge_hybrid_unittest.cc index 2166b274..f38037a0 100644 --- a/tests/ut/ge/hybrid/ge_hybrid_unittest.cc +++ b/tests/ut/ge/hybrid/ge_hybrid_unittest.cc @@ -315,16 +315,21 @@ TEST_F(UtestGeHybrid, test_parse_parallel_group) { ASSERT_EQ(builder.parallel_group_to_nodes_["group_1"].size(), 2); ASSERT_EQ(builder.parallel_group_to_nodes_["group_2"].size(), 1); - ASSERT_FALSE(node_item->has_observer); - ASSERT_TRUE(node_item_1->dependents_for_execution.empty()); + builder.parallel_group_to_nodes_.clear(); + builder.node_ref_inputs_.clear(); + model.node_items_[node] = std::move(node_item); + model.node_items_[node_1] = std::move(node_item_1); + + ASSERT_FALSE(model.node_items_[node]->has_observer); + ASSERT_TRUE(model.node_items_[node_1]->dependents_for_execution.empty()); ASSERT_EQ(builder.ParseDependentByParallelGroup(), SUCCESS); - ASSERT_TRUE(node_item->has_observer); - ASSERT_EQ(node_item_1->dependents_for_execution.size(), 1); - ASSERT_EQ(node_item_1->dependents_for_execution[0], node); + ASSERT_TRUE(model.node_items_[node]->has_observer); + ASSERT_EQ(model.node_items_[node_1]->dependents_for_execution.size(), 1); + ASSERT_EQ(model.node_items_[node_1]->dependents_for_execution[0], node); // repeat parse ASSERT_EQ(builder.ParseDependentByParallelGroup(), SUCCESS); - ASSERT_TRUE(node_item->has_observer); - ASSERT_EQ(node_item_1->dependents_for_execution.size(), 1); - ASSERT_EQ(node_item_1->dependents_for_execution[0], node); 
+ ASSERT_TRUE(model.node_items_[node]->has_observer); + ASSERT_EQ(model.node_items_[node_1]->dependents_for_execution.size(), 1); + ASSERT_EQ(model.node_items_[node_1]->dependents_for_execution[0], node); } \ No newline at end of file