| @@ -1013,6 +1013,24 @@ bool StreamAllocator::IsActivated(int64_t stream_id) const { | |||||
| return false; | return false; | ||||
| } | } | ||||
| // Iteraotor loop : | |||||
| // StreamSwitch -> StreamActive | |||||
| // FpBp loop: | |||||
| // StreamSwitch -> AssignAdd -> StreamActive | |||||
| NodePtr FindSwitchNodeBeforeLoopActiveNode(const NodePtr &active_node) { | |||||
| for (auto pre_node : active_node->GetInControlNodes()) { | |||||
| if (pre_node->GetType() == STREAMSWITCH) { | |||||
| return pre_node; | |||||
| } | |||||
| for (auto pre_pre_node : pre_node->GetInControlNodes()) { | |||||
| if (pre_pre_node->GetType() == STREAMSWITCH) { | |||||
| return pre_pre_node; | |||||
| } | |||||
| } | |||||
| } | |||||
| return nullptr; | |||||
| } | |||||
| Status StreamAllocator::SetActiveStreamsForLoop() { | Status StreamAllocator::SetActiveStreamsForLoop() { | ||||
| vector<uint32_t> loop_active_streams; | vector<uint32_t> loop_active_streams; | ||||
| for (int64_t stream_id = 0; stream_id < stream_num_; stream_id++) { | for (int64_t stream_id = 0; stream_id < stream_num_; stream_id++) { | ||||
| @@ -1038,6 +1056,13 @@ Status StreamAllocator::SetActiveStreamsForLoop() { | |||||
| bool is_loop_active = false; | bool is_loop_active = false; | ||||
| if (AttrUtils::GetBool(node->GetOpDesc(), ATTR_NAME_IS_LOOP_ACTIVE, is_loop_active) && is_loop_active) { | if (AttrUtils::GetBool(node->GetOpDesc(), ATTR_NAME_IS_LOOP_ACTIVE, is_loop_active) && is_loop_active) { | ||||
| vector<string> activated_label_list; | vector<string> activated_label_list; | ||||
| NodePtr pre_switch_node = FindSwitchNodeBeforeLoopActiveNode(node); | |||||
| if (pre_switch_node == nullptr) { | |||||
| GELOGE(FAILED, "find switch node before loop active node %s failed", node->GetName().c_str()); | |||||
| return FAILED; | |||||
| } | |||||
| if (!AttrUtils::GetListStr(node->GetOpDesc(), ATTR_NAME_ACTIVE_LABEL_LIST, activated_label_list) || | if (!AttrUtils::GetListStr(node->GetOpDesc(), ATTR_NAME_ACTIVE_LABEL_LIST, activated_label_list) || | ||||
| activated_label_list.empty()) { | activated_label_list.empty()) { | ||||
| GE_CHK_BOOL_EXEC(AttrUtils::SetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, loop_active_streams), | GE_CHK_BOOL_EXEC(AttrUtils::SetListInt(node->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, loop_active_streams), | ||||
| @@ -1053,7 +1078,7 @@ Status StreamAllocator::SetActiveStreamsForLoop() { | |||||
| // it may cause some stream actived by iterator next step when this stream still alive. | // it may cause some stream actived by iterator next step when this stream still alive. | ||||
| // If above situation happen, active message will lose, cause process block in next iteration. | // If above situation happen, active message will lose, cause process block in next iteration. | ||||
| // In order to avoid this abnormal happen, | // In order to avoid this abnormal happen, | ||||
| // add event between each last node and iterator active node in target active stream | |||||
| // add event between each last node and iterator switch node | |||||
| GELOGI("there are %zu next iterator target streams has streamswitch node.", streams_skip_iterator_event.size()); | GELOGI("there are %zu next iterator target streams has streamswitch node.", streams_skip_iterator_event.size()); | ||||
| for (auto iter : stream_id_to_last_node) { | for (auto iter : stream_id_to_last_node) { | ||||
| if (streams_skip_iterator_event.find(iter.first) != streams_skip_iterator_event.end()) { | if (streams_skip_iterator_event.find(iter.first) != streams_skip_iterator_event.end()) { | ||||
| @@ -1067,7 +1092,7 @@ Status StreamAllocator::SetActiveStreamsForLoop() { | |||||
| continue; | continue; | ||||
| } | } | ||||
| AddSendEventId(iter.second, event_num_); | AddSendEventId(iter.second, event_num_); | ||||
| AddRecvEventId(node, event_num_); | |||||
| AddRecvEventId(pre_switch_node, event_num_); | |||||
| event_num_++; | event_num_++; | ||||
| } | } | ||||