You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

merge_to_stream_merge_pass.cc 11 kB

5 years ago
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /**
  2. * Copyright 2020 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "graph/passes/merge_to_stream_merge_pass.h"
  17. #include "common/ge/ge_util.h"
  18. #include "ge/ge_api_types.h"
  19. #include "graph/common/omg_util.h"
  20. namespace ge {
  21. Status MergeToStreamMergePass::Run(ComputeGraphPtr graph) {
  22. GELOGD("MergeToStreamMergePass Enter");
  23. bypass_nodes_.clear();
  24. for (const auto &node : graph->GetDirectNode()) {
  25. if ((node->GetType() != MERGE) && (node->GetType() != REFMERGE)) {
  26. continue;
  27. }
  28. OpDescPtr merge_op_desc = node->GetOpDesc();
  29. GE_CHECK_NOTNULL(merge_op_desc);
  30. if (merge_op_desc->HasAttr(ATTR_INSERT_BY_MBATCH)) {
  31. GE_CHK_STATUS_RET(AddMemcpyAsyncNodes(graph, node, true), "Merge add memcpy node failed.");
  32. GE_CHK_STATUS_RET(SetStreamLabel(node, node->GetName()), "Set stream label failed");
  33. } else {
  34. GE_CHK_STATUS_RET(ReplaceMergeNode(graph, node), "Add StreamMerge node failed.");
  35. }
  36. }
  37. for (const auto &node : bypass_nodes_) {
  38. GE_CHK_BOOL_EXEC(GraphUtils::RemoveNodeWithoutRelink(graph, node) == GRAPH_SUCCESS, return FAILED,
  39. "Remove merge node failed.");
  40. }
  41. GELOGD("MergeToStreamMergePass Leave");
  42. return SUCCESS;
  43. }
  44. ///
  45. /// @brief Replace Merge Op
  46. /// @param [in] graph
  47. /// @param [in] merge_node
  48. /// @return Status
  49. ///
  50. Status MergeToStreamMergePass::ReplaceMergeNode(const ComputeGraphPtr &graph, const NodePtr &merge_node) {
  51. OpDescPtr merge_op_desc = merge_node->GetOpDesc();
  52. GE_CHECK_NOTNULL(merge_op_desc);
  53. const std::string &node_name = merge_node->GetName();
  54. GELOGI("Create StreamMerge Op, name=%s.", node_name.c_str());
  55. OpDescPtr op_desc = MakeShared<OpDesc>(node_name, STREAMMERGE);
  56. if (op_desc == nullptr) {
  57. GELOGE(FAILED, "Create op_desc failed, StreamMerge:%s.", node_name.c_str());
  58. return FAILED;
  59. }
  60. for (const InDataAnchorPtr &in_anchor : merge_node->GetAllInDataAnchors()) {
  61. GE_CHK_BOOL_EXEC(op_desc->AddInputDesc(merge_op_desc->GetInputDesc(in_anchor->GetIdx())) == GRAPH_SUCCESS,
  62. return FAILED, "Create StreamMerge op: add input desc failed.");
  63. }
  64. for (const OutDataAnchorPtr &out_anchor : merge_node->GetAllOutDataAnchors()) {
  65. GE_CHK_BOOL_EXEC(op_desc->AddOutputDesc(merge_op_desc->GetOutputDesc(out_anchor->GetIdx())) == GRAPH_SUCCESS,
  66. return FAILED, "Create StreamMerge op: add output desc failed.");
  67. }
  68. NodePtr stream_merge = graph->AddNode(op_desc);
  69. GE_CHK_BOOL_EXEC(stream_merge != nullptr, return FAILED, "Insert StreamMerge node failed.");
  70. GE_CHK_STATUS_RET(MoveEdges(merge_node, stream_merge), "Move edges failed.");
  71. bypass_nodes_.insert(merge_node);
  72. if (merge_op_desc->HasAttr(ATTR_NAME_NEXT_ITERATION)) {
  73. std::string next_iteration_name;
  74. GE_IF_BOOL_EXEC(!AttrUtils::GetStr(merge_op_desc, ATTR_NAME_NEXT_ITERATION, next_iteration_name),
  75. GELOGE(INTERNAL_ERROR, "Get ATTR_NAME_NEXT_ITERATION failed");
  76. return INTERNAL_ERROR);
  77. GE_CHK_STATUS_RET(SetNextIteration(stream_merge, next_iteration_name), "Set next iteration failed");
  78. }
  79. if (merge_op_desc->HasAttr(ATTR_NAME_BATCH_LABEL)) {
  80. string batch_label;
  81. (void)AttrUtils::GetStr(merge_op_desc, ATTR_NAME_BATCH_LABEL, batch_label);
  82. if (!batch_label.empty()) {
  83. auto stream_merge_desc = stream_merge->GetOpDesc();
  84. GE_CHECK_NOTNULL(stream_merge_desc);
  85. (void)AttrUtils::SetStr(stream_merge_desc, ATTR_NAME_BATCH_LABEL, batch_label);
  86. }
  87. }
  88. return AddMemcpyAsyncNodes(graph, stream_merge, false);
  89. }
  90. ///
  91. /// @brief Add MemcpyAsync Op as StreamMerge in_node
  92. /// @param [in] graph
  93. /// @param [in] node
  94. /// @param [in] multi_batch_flag
  95. /// @return Status
  96. ///
  97. Status MergeToStreamMergePass::AddMemcpyAsyncNodes(const ComputeGraphPtr &graph, const NodePtr &node,
  98. bool multi_batch_flag) {
  99. GE_CHK_BOOL_EXEC(node != nullptr, return FAILED, "Param of pre node is null.");
  100. for (const InDataAnchorPtr &in_data_anchor : node->GetAllInDataAnchors()) {
  101. OutDataAnchorPtr peer_out_anchor = in_data_anchor->GetPeerOutAnchor();
  102. GE_IF_BOOL_EXEC(peer_out_anchor == nullptr, continue);
  103. NodePtr in_node = peer_out_anchor->GetOwnerNode();
  104. const std::string &type = in_node->GetType();
  105. // For WhileLoop no need memcpy & active for merge.
  106. GE_IF_BOOL_EXEC((type == ENTER) || (type == REFENTER) || (type == NEXTITERATION) || (type == REFNEXTITERATION),
  107. continue);
  108. const std::string &memcpy_name = node->GetName() + "_input_" + std::to_string(in_data_anchor->GetIdx());
  109. NodePtr memcpy_node = CreateMemcpyAsyncNode(graph, memcpy_name, peer_out_anchor, multi_batch_flag);
  110. GE_CHK_BOOL_EXEC(memcpy_node != nullptr, return FAILED, "Create MemcpyAsync node failed.");
  111. GE_CHK_STATUS(GraphUtils::RemoveEdge(peer_out_anchor, in_data_anchor), "MemcpyAsync node remove edge failed.");
  112. GE_CHK_STATUS(GraphUtils::AddEdge(peer_out_anchor, memcpy_node->GetInDataAnchor(0)),
  113. "MemcpyAsync node add edge failed.");
  114. GE_CHK_STATUS(GraphUtils::AddEdge(memcpy_node->GetOutDataAnchor(0), in_data_anchor),
  115. "MemcpyAsync node add edge failed.");
  116. NodePtr active_node = CreateActiveNode(graph, memcpy_node);
  117. GE_CHK_BOOL_EXEC(active_node != nullptr, return FAILED, "Create StreamActive node failed.");
  118. GE_CHK_STATUS(GraphUtils::AddEdge(active_node->GetOutControlAnchor(), node->GetInControlAnchor()),
  119. "StreamActive add ctrl edge failed.");
  120. if (SetActiveLabelList(active_node, { node->GetName() }) != SUCCESS) {
  121. GELOGE(FAILED, "SetActiveLabelList for node %s failed.", active_node->GetName().c_str());
  122. return FAILED;
  123. }
  124. }
  125. return SUCCESS;
  126. }
  127. ///
  128. /// @brief Add MemcpyAsync Node
  129. /// @param [in] graph
  130. /// @param [in] name
  131. /// @param [in] out_data_anchor
  132. /// @param [in] multi_batch_flag
  133. /// @return ge::NodePtr
  134. ///
  135. NodePtr MergeToStreamMergePass::CreateMemcpyAsyncNode(const ComputeGraphPtr &graph, const std::string &name,
  136. const OutDataAnchorPtr &out_data_anchor, bool multi_batch_flag) {
  137. GE_CHK_BOOL_EXEC(out_data_anchor != nullptr, return nullptr, "Param of input node is null.");
  138. OpDescPtr pre_op_desc = out_data_anchor->GetOwnerNode()->GetOpDesc();
  139. GE_CHK_BOOL_EXEC(pre_op_desc != nullptr, return nullptr, "OpDesc of pre node is invalid.");
  140. const std::string &memcpy_type = multi_batch_flag ? MEMCPYADDRASYNC : MEMCPYASYNC;
  141. const std::string &node_name = name + "_" + memcpy_type;
  142. GELOGI("Create MemcpyAsync op:%s.", node_name.c_str());
  143. OpDescPtr op_desc = MakeShared<OpDesc>(node_name, memcpy_type);
  144. if (op_desc == nullptr) {
  145. GELOGE(FAILED, "Create op_desc failed, MemcpyAsync:%s.", node_name.c_str());
  146. return nullptr;
  147. }
  148. GE_CHK_BOOL_EXEC(op_desc->AddInputDesc(pre_op_desc->GetOutputDesc(out_data_anchor->GetIdx())) == GRAPH_SUCCESS,
  149. return nullptr, "Create MemcpyAsync op: add input desc failed.");
  150. GE_CHK_BOOL_EXEC(op_desc->AddOutputDesc(pre_op_desc->GetOutputDesc(out_data_anchor->GetIdx())) == GRAPH_SUCCESS,
  151. return nullptr, "Create MemcpyAsync op: add output desc failed.");
  152. return graph->AddNode(op_desc);
  153. }
  154. ///
  155. /// @brief Create Active Op
  156. /// @param [in] graph
  157. /// @param [in] node
  158. /// @return ge::NodePtr
  159. ///
  160. NodePtr MergeToStreamMergePass::CreateActiveNode(const ComputeGraphPtr &graph, const NodePtr &node) {
  161. const std::string &node_name = node->GetName() + "_" + STREAMACTIVE;
  162. GELOGI("Create StreamActive op:%s.", node_name.c_str());
  163. OpDescPtr op_desc = MakeShared<OpDesc>(node_name, STREAMACTIVE);
  164. if (op_desc == nullptr) {
  165. GELOGE(FAILED, "Create op_desc failed, StreamActive:%s.", node_name.c_str());
  166. return nullptr;
  167. }
  168. NodePtr active_node = graph->AddNode(op_desc);
  169. GE_CHK_BOOL_EXEC(active_node != nullptr, return nullptr, "Create StreamActive node failed.");
  170. GE_IF_BOOL_EXEC(GraphUtils::AddEdge(node->GetOutControlAnchor(), active_node->GetInControlAnchor()) != SUCCESS,
  171. GELOGE(INTERNAL_ERROR, "add edge failed");
  172. return nullptr);
  173. GE_IF_BOOL_EXEC(SetSwitchBranchNodeLabel(active_node, node_name) != SUCCESS,
  174. GELOGE(INTERNAL_ERROR, "set switch branch node label failed");
  175. return nullptr);
  176. return active_node;
  177. }
  178. ///
  179. /// @brief move edges from old_node to new_node
  180. /// @param [in] old_node
  181. /// @param [in] new_node
  182. /// @return Status
  183. ///
  184. Status MergeToStreamMergePass::MoveEdges(const NodePtr &old_node, const NodePtr &new_node) {
  185. for (const InDataAnchorPtr &in_data_anchor : old_node->GetAllInDataAnchors()) {
  186. OutDataAnchorPtr peer_out_anchor = in_data_anchor->GetPeerOutAnchor();
  187. GE_IF_BOOL_EXEC(peer_out_anchor == nullptr, continue);
  188. GE_CHK_STATUS(GraphUtils::RemoveEdge(peer_out_anchor, in_data_anchor), "Merge remove in data edge failed.");
  189. GE_CHK_STATUS(GraphUtils::AddEdge(peer_out_anchor, new_node->GetInDataAnchor(in_data_anchor->GetIdx())),
  190. "StreamMerge add in data edge failed.");
  191. }
  192. for (const OutDataAnchorPtr &out_data_anchor : old_node->GetAllOutDataAnchors()) {
  193. for (const InDataAnchorPtr &peer_in_anchor : out_data_anchor->GetPeerInDataAnchors()) {
  194. GE_CHK_STATUS(GraphUtils::RemoveEdge(out_data_anchor, peer_in_anchor), "Merge remove out data edge failed.");
  195. GE_CHK_STATUS(GraphUtils::AddEdge(new_node->GetOutDataAnchor(out_data_anchor->GetIdx()), peer_in_anchor),
  196. "StreamMerge add out data edge failed.");
  197. }
  198. }
  199. for (const NodePtr &in_ctrl_node : old_node->GetInControlNodes()) {
  200. GE_CHK_STATUS(GraphUtils::RemoveEdge(in_ctrl_node->GetOutControlAnchor(), old_node->GetInControlAnchor()),
  201. "Merge remove in ctrl edge failed.");
  202. GE_CHK_STATUS(GraphUtils::AddEdge(in_ctrl_node->GetOutControlAnchor(), new_node->GetInControlAnchor()),
  203. "StreamMerge add in ctrl edge failed.");
  204. }
  205. for (const NodePtr &out_ctrl_node : old_node->GetOutControlNodes()) {
  206. GE_CHK_STATUS(GraphUtils::RemoveEdge(old_node->GetOutControlAnchor(), out_ctrl_node->GetInControlAnchor()),
  207. "Merge remove out ctrl edge failed.");
  208. GE_CHK_STATUS(GraphUtils::AddEdge(new_node->GetOutControlAnchor(), out_ctrl_node->GetInControlAnchor()),
  209. "StreamMerge add out ctrl edge failed.");
  210. }
  211. return SUCCESS;
  212. }
  213. } // namespace ge

图引擎模块(GE)是MindSpore的一个子模块,其代码由C++实现,位于前端模块ME和底层硬件之间,起到承接作用。图引擎模块以ME下发的图作为输入,然后进行一系列的深度图优化操作,最后输出一张可以在底层硬件上高效运行的图。GE针对昇腾AI处理器的硬件结构特点,做了特定的优化工作,以此来充分发挥出昇腾AI处理器的强大算力。在进行模型训练/推理时,GE会被自动调用,用户对此并无感知。GE主要由GE API和GE Core两部分组成,详细的架构图如下所示: