From a3114f023d2384932fb1cadfc6b6a601a59dd8bf Mon Sep 17 00:00:00 2001 From: chenyemeng Date: Mon, 18 Jan 2021 16:53:09 +0800 Subject: [PATCH 1/3] cache support --- ge/CMakeLists.txt | 1 + ge/common/types.cc | 1 + ge/executor/CMakeLists.txt | 1 + ge/executor/module.mk | 1 + ge/ge_runner.mk | 1 + ge/graph/build/memory/var_mem_assign_util.cc | 8 +- .../load/new_model_manager/model_utils.cc | 37 ++++-- ge/graph/load/new_model_manager/model_utils.h | 9 ++ ge/graph/manager/graph_var_manager.cc | 74 +++++++++--- ge/graph/manager/graph_var_manager.h | 29 ++++- ge/graph/manager/rdma_pool_allocator.h | 4 + ge/graph/partition/dynamic_shape_partition.cc | 27 ++++- ge/graph/partition/dynamic_shape_partition.h | 3 +- ge/graph/partition/stage_partition.cc | 38 +++++- ge/graph/passes/subgraph_pass.cc | 7 +- .../ops_kernel_store/op/host_op.cc | 3 + .../executor/hybrid_model_async_executor.cc | 7 +- ge/hybrid/model/hybrid_model_builder.cc | 7 +- .../node_executor/hccl/hccl_node_executor.cc | 114 +++++++++++++----- .../node_executor/hccl/hccl_node_executor.h | 2 + .../host_cpu/kernel/assign_kernel.cc | 4 +- .../host_cpu/kernel/data_kernel.cc | 41 +++++++ .../host_cpu/kernel/data_kernel.h | 42 +++++++ .../host_cpu/kernel/no_op_kernel.cc | 2 +- .../host_cpu/kernel/random_uniform_kernel.cc | 4 +- .../host_cpu/kernel/variable_kernel.cc | 4 +- inc/framework/common/types.h | 1 + inc/framework/omg/parser/parser_types.h | 2 + tests/ut/ge/CMakeLists.txt | 1 + .../ut/ge/graph/load/model_utils_unittest.cc | 70 +++++++++++ third_party/fwkacllib/inc/runtime/mem.h | 1 + 31 files changed, 459 insertions(+), 87 deletions(-) create mode 100644 ge/hybrid/node_executor/host_cpu/kernel/data_kernel.cc create mode 100644 ge/hybrid/node_executor/host_cpu/kernel/data_kernel.h create mode 100644 tests/ut/ge/graph/load/model_utils_unittest.cc diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index a8eabf05..edbf837d 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -375,6 +375,7 @@ set(TRAIN_SRC_LIST "hybrid/node_executor/host_cpu/kernel/variable_kernel.cc" "hybrid/node_executor/host_cpu/kernel/assign_kernel.cc" "hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc" + "hybrid/node_executor/host_cpu/kernel/data_kernel.cc" "hybrid/node_executor/controlop/control_op_executor.cc" "hybrid/node_executor/partitioned_call/partitioned_call_node_executor.cc" "hybrid/node_executor/hccl/hccl_node_executor.cc" diff --git a/ge/common/types.cc b/ge/common/types.cc index 268e7caa..90ff9fe4 100644 --- a/ge/common/types.cc +++ b/ge/common/types.cc @@ -388,6 +388,7 @@ REGISTER_OPTYPE_DEFINE(HCOMRECEIVE, "HcomReceive"); REGISTER_OPTYPE_DEFINE(HCOMREMOTEREAD, "HcomRemoteRead"); REGISTER_OPTYPE_DEFINE(HCOMREMOTEREFREAD, "HcomRemoteRefRead"); REGISTER_OPTYPE_DEFINE(HCOMREMOTEWRITE, "HcomRemoteWrite"); +REGISTER_OPTYPE_DEFINE(HCOMREMOTESCATTERWRITE, "HcomRemoteScatterWrite"); REGISTER_OPTYPE_DEFINE(VARASSIGN, "VarAssign"); REGISTER_OPTYPE_DEFINE(VARISINITIALIZEDOP, "VarIsInitializedOp"); diff --git a/ge/executor/CMakeLists.txt b/ge/executor/CMakeLists.txt index 755bdf97..d7bca1fa 100644 --- a/ge/executor/CMakeLists.txt +++ b/ge/executor/CMakeLists.txt @@ -104,6 +104,7 @@ set(SRC_LIST "../hybrid/node_executor/host_cpu/kernel/variable_kernel.cc" "../hybrid/node_executor/host_cpu/kernel/assign_kernel.cc" "../hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc" + "../hybrid/node_executor/host_cpu/kernel/data_kernel.cc" "../hybrid/node_executor/controlop/control_op_executor.cc" 
"../hybrid/node_executor/partitioned_call/partitioned_call_node_executor.cc" "../hybrid/node_executor/rts/rts_node_executor.cc" diff --git a/ge/executor/module.mk b/ge/executor/module.mk index 87abdade..7f2c1c53 100644 --- a/ge/executor/module.mk +++ b/ge/executor/module.mk @@ -95,6 +95,7 @@ local_ge_executor_src_files := \ ../hybrid/node_executor/host_cpu/kernel/variable_kernel.cc \ ../hybrid/node_executor/host_cpu/kernel/assign_kernel.cc \ ../hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc \ + ../hybrid/node_executor/host_cpu/kernel/data_kernel.cc \ ../hybrid/node_executor/controlop/control_op_executor.cc \ ../hybrid/node_executor/partitioned_call/partitioned_call_node_executor.cc \ ../hybrid/node_executor/rts/rts_node_executor.cc \ diff --git a/ge/ge_runner.mk b/ge/ge_runner.mk index 460d5068..af938686 100644 --- a/ge/ge_runner.mk +++ b/ge/ge_runner.mk @@ -300,6 +300,7 @@ LIBGE_LOCAL_SRC_FILES := \ hybrid/node_executor/host_cpu/kernel/variable_kernel.cc \ hybrid/node_executor/host_cpu/kernel/assign_kernel.cc \ hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc \ + hybrid/node_executor/host_cpu/kernel/data_kernel.cc \ hybrid/node_executor/controlop/control_op_executor.cc \ hybrid/node_executor/partitioned_call/partitioned_call_node_executor.cc \ hybrid/node_executor/hccl/hccl_node_executor.cc \ diff --git a/ge/graph/build/memory/var_mem_assign_util.cc b/ge/graph/build/memory/var_mem_assign_util.cc index 639bfaa0..dfc633af 100755 --- a/ge/graph/build/memory/var_mem_assign_util.cc +++ b/ge/graph/build/memory/var_mem_assign_util.cc @@ -60,9 +60,14 @@ Status VarMemAssignUtil::AssignStaticMemory2Node(ge::ComputeGraphPtr &compute_gr return FAILED); ge::ConstGeTensorDescPtr tensor_desc = n->GetOpDesc()->GetOutputDescPtr(0); GE_CHECK_NOTNULL(tensor_desc); + rtMemType_t memory_type = RT_MEMORY_HBM; + uint32_t mem_type = 0; + if (AttrUtils::GetInt(n->GetOpDesc(), ATTR_OUTPUT_MEMORY_TYPE, mem_type) && (mem_type == 1)) { + memory_type = RT_MEMORY_RDMA_HBM; + } if (!VarManager::Instance(compute_graph->GetSessionID())->IsVarExist(node_name, *tensor_desc)) { GE_CHK_STATUS_RET( - VarManager::Instance(compute_graph->GetSessionID())->AssignVarMem(node_name, *tensor_desc, RT_MEMORY_HBM)); + VarManager::Instance(compute_graph->GetSessionID())->AssignVarMem(node_name, *tensor_desc, memory_type)); GE_IF_BOOL_EXEC(n->GetType() == VARIABLE, GE_CHK_STATUS_RET(AssignData2Fp32Var(n, compute_graph->GetSessionID()))); GE_CHK_STATUS_RET(VarManager::Instance(compute_graph->GetSessionID()) @@ -70,7 +75,6 @@ Status VarMemAssignUtil::AssignStaticMemory2Node(ge::ComputeGraphPtr &compute_gr } uint8_t *dev_ptr = nullptr; - rtMemType_t memory_type = RT_MEMORY_HBM; GE_CHK_STATUS_RET(VarManager::Instance(compute_graph->GetSessionID()) ->GetVarAddr(node_name, *tensor_desc, &dev_ptr, memory_type)); vector output_list = n->GetOpDesc()->GetOutputOffset(); diff --git a/ge/graph/load/new_model_manager/model_utils.cc b/ge/graph/load/new_model_manager/model_utils.cc index 22a657ad..efd8c619 100755 --- a/ge/graph/load/new_model_manager/model_utils.cc +++ b/ge/graph/load/new_model_manager/model_utils.cc @@ -15,18 +15,10 @@ */ #include "graph/load/new_model_manager/model_utils.h" - #include - #include "common/debug/log.h" #include "common/op/ge_op_utils.h" -#include "graph/debug/ge_attr_define.h" -#include "graph/utils/attr_utils.h" #include "graph/utils/tensor_utils.h" -#include "runtime/base.h" -#include "runtime/kernel.h" - -#include "framework/common/debug/ge_log.h" #include 
"graph/manager/graph_var_manager.h" #define VALIDATE_MEM_RANGE(OP, SIZE, OFFSET) \ @@ -342,8 +334,8 @@ vector ModelUtils::GetInputDataAddrs(const RuntimeParam &model_param, Co int64_t input_offset = v_input_offset[non_const_index]; non_const_index++; GE_IF_BOOL_EXEC(model_param.var_size != 0 && ge::VarManager::Instance(session_id)->IsVarAddr(input_offset), - VALIDATE_MEM_RANGE(op_desc, model_param.var_size, input_offset - model_param.logic_var_base); - uint8_t *variable_addr = model_param.var_base + input_offset - model_param.logic_var_base; + uint8_t *variable_addr = nullptr; + GE_CHK_STATUS_EXEC(GetVarAddr(model_param, op_desc, input_offset, variable_addr), return {}); v_input_data_addr.push_back(variable_addr); GELOGI("[IMAS]GetInputDataAddrs graph_%u type[V] name[%s] input[%lu] memaddr[%p]", model_param.graph_id, op_desc->GetName().c_str(), i, variable_addr); @@ -380,6 +372,27 @@ vector ModelUtils::GetInputDataAddrs(const RuntimeParam &model_param, Co return v_input_data_addr; } +/// +/// @ingroup ge +/// @brief Get variable address. +/// @return Status +/// +Status ModelUtils::GetVarAddr(const RuntimeParam &model_param, const ConstOpDescPtr &op_desc, int64_t offset, + uint8_t *&var_addr) { + if (ge::VarManager::Instance(model_param.session_id)->GetVarMemType(offset) == RT_MEMORY_RDMA_HBM) { + if (offset < 0) { + GELOGE(PARAM_INVALID, "rdma var addr is invalid, addr=%p", reinterpret_cast(offset)); + return PARAM_INVALID; + } + var_addr = reinterpret_cast(offset); + GE_CHECK_NOTNULL(var_addr); + } else { + VALIDATE_MEM_RANGE(op_desc, model_param.var_size, offset - model_param.logic_var_base); + var_addr = model_param.var_base + offset - model_param.logic_var_base; + } + return SUCCESS; +} + /// /// @ingroup ge /// @brief Get output data address. @@ -405,8 +418,8 @@ vector ModelUtils::GetOutputDataAddrs(const RuntimeParam &model_param, C } for (size_t i = 0; i < outputs_size; ++i) { GE_IF_BOOL_EXEC(model_param.var_size != 0 && ge::VarManager::Instance(session_id)->IsVarAddr(v_output_offset[i]), - VALIDATE_MEM_RANGE(op_desc, model_param.var_size, v_output_offset[i] - model_param.logic_var_base); - uint8_t *variable_addr = model_param.var_base + v_output_offset[i] - model_param.logic_var_base; + uint8_t *variable_addr = nullptr; + GE_CHK_STATUS_EXEC(GetVarAddr(model_param, op_desc, v_output_offset[i], variable_addr), return {}); v_output_data_addr.push_back(variable_addr); GELOGI("[IMAS]GetOutputDataAddrs graph_%u type[V] name[%s] output[%zu] memaddr[%p]", model_param.graph_id, op_desc->GetName().c_str(), i, variable_addr); diff --git a/ge/graph/load/new_model_manager/model_utils.h b/ge/graph/load/new_model_manager/model_utils.h index 4b3d7ae7..417b9b89 100755 --- a/ge/graph/load/new_model_manager/model_utils.h +++ b/ge/graph/load/new_model_manager/model_utils.h @@ -107,6 +107,15 @@ class ModelUtils { /// @return Status /// static Status GetRtAddress(const RuntimeParam &model_param, uintptr_t logic_addr, uint8_t *&mem_addr); + + private: + /// + /// @ingroup ge + /// @brief Get variable address. 
+ /// @return Status + /// + static Status GetVarAddr(const RuntimeParam &model_param, const ConstOpDescPtr &op_desc, int64_t offset, + uint8_t *&var_addr); }; } // namespace ge diff --git a/ge/graph/manager/graph_var_manager.cc b/ge/graph/manager/graph_var_manager.cc index 821de257..928c893f 100755 --- a/ge/graph/manager/graph_var_manager.cc +++ b/ge/graph/manager/graph_var_manager.cc @@ -16,17 +16,10 @@ #include "graph/manager/graph_var_manager.h" -#include - -#include "common/l2_cache_optimize.h" -#include "common/types.h" -#include "framework/common/debug/ge_log.h" -#include "framework/common/debug/log.h" -#include "ge/ge_api_types.h" #include "graph/debug/ge_attr_define.h" #include "graph/manager/graph_mem_allocator.h" +#include "graph/manager/rdma_pool_allocator.h" #include "graph/manager/trans_var_data_utils.h" -#include "graph/utils/attr_utils.h" #include "graph/utils/type_utils.h" using std::map; @@ -37,7 +30,7 @@ namespace ge { VarResource::VarResource(uint64_t session_id) : session_id_(session_id) {} VarResource::~VarResource() { - var_offset_set_.clear(); + var_offset_map_.clear(); var_addr_mgr_map_.clear(); cur_var_tensor_desc_map_.clear(); var_broad_cast_info_.clear(); @@ -91,8 +84,10 @@ ge::Status VarResource::SaveVarAddr(const std::string &var_name, const ge::GeTen std::string var_key = VarKey(var_name, tensor_desc); GELOGD("VarResource::SaveVarAddr, var_key = %s", var_key.c_str()); if (var_addr_mgr_map_.count(var_key) == 0) { - uint64_t logic_address = VarManager::Instance(session_id_)->GetVarMemLogicBase() + - static_cast(reinterpret_cast(address)); + uint64_t logic_address = static_cast(reinterpret_cast(address)); + if (memory_type != RT_MEMORY_RDMA_HBM) { + logic_address += VarManager::Instance(session_id_)->GetVarMemLogicBase(); + } GELOGI("SaveVarAddr node_name %s, tensor_desc format %s, type %s.", var_name.c_str(), TypeUtils::FormatToSerialString(tensor_desc.GetFormat()).c_str(), TypeUtils::DataTypeToSerialString(tensor_desc.GetDataType()).c_str()); @@ -102,7 +97,7 @@ ge::Status VarResource::SaveVarAddr(const std::string &var_name, const ge::GeTen var_addr_mgr.tensor_desc = tensor_desc; var_addr_mgr.memory_type = memory_type; var_addr_mgr_map_[var_key] = var_addr_mgr; - var_offset_set_.insert(logic_address); + var_offset_map_[logic_address] = memory_type; return SUCCESS; } @@ -211,7 +206,14 @@ ge::Status VarResource::SyncVarData(uint32_t graph_id, const std::string &var_na return SyncVarData2BroadCast(graph_id, var_name, var_tensor_desc, base_ptr); } -bool VarResource::IsVarAddr(const int64_t &offset) { return var_offset_set_.count(offset) > 0; } +bool VarResource::IsVarAddr(const int64_t &offset) { return var_offset_map_.count(offset) > 0; } + +rtMemType_t VarResource::GetVarMemType(const int64_t &offset) { + if (var_offset_map_.count(offset) > 0) { + return var_offset_map_[offset]; + } + return RT_MEMORY_HBM; +} VarTransRoad *VarResource::GetTransRoad(const std::string &var_name) { auto iter = var_to_trans_road_.find(var_name); @@ -252,7 +254,19 @@ Status VarResource::SetAllocatedGraphId(const std::string &var_name, uint32_t gr MemResource::MemResource() : total_size_(0), var_mem_size_(0) {} -Status MemResource::AssignVarMem(const std::string &var_name, uint64_t size, uint64_t session_id, size_t &mem_offset) { +MemResource *MemResource::BuildMemResourceFromType(rtMemType_t mem_type) { + switch (mem_type) { + case RT_MEMORY_HBM: + return new (std::nothrow) HbmMemResource(); + case RT_MEMORY_RDMA_HBM: + return new (std::nothrow) RdmaMemResource(); + default: + return 
nullptr; + } +} + +Status HbmMemResource::AssignVarMem(const std::string &var_name, uint64_t size, uint64_t session_id, + size_t &mem_offset) { size = (size + kSessionMemAlignSize - 1) / kSessionMemAlignSize * kSessionMemAlignSize; uint64_t real_size = size; total_size_ = VarManager::Instance(session_id)->GetVarMemMaxSize(); @@ -282,6 +296,19 @@ Status MemResource::AssignVarMem(const std::string &var_name, uint64_t size, uin return SUCCESS; } +Status RdmaMemResource::AssignVarMem(const std::string &var_name, uint64_t size, uint64_t session_id, size_t &address) { + uint8_t *buffer = MemManager::Instance().RdmaPoolInstance(RT_MEMORY_HBM).Malloc(size); + if (buffer == nullptr) { + GELOGE(MEMALLOC_FAILED, "Failed to malloc rdma memory for node %s, size = %llu", var_name.c_str(), size); + return MEMALLOC_FAILED; + } + address = reinterpret_cast(reinterpret_cast(buffer)); + var_mem_size_ += size; + GELOGI("[IMAS]AssignVarMem Set session_%llu name[%s] output[%d] addr to [%p] size[%llu].", + session_id, var_name.c_str(), 0, buffer, size); + return SUCCESS; +} + uint64_t MemResource::GetVarMemSize() const { return var_mem_size_; } void MemResource::UpdateVarMemSize(int64_t mem_size) { var_mem_size_ = mem_size; }; @@ -428,7 +455,7 @@ Status VarManager::UpdateVarMemSize(rtMemType_t memory_type, int64_t mem_size) { MemResource *mem_resource = nullptr; auto iter = mem_resource_map_.find(memory_type); if (iter == mem_resource_map_.end()) { - mem_resource = new (std::nothrow) MemResource(); + mem_resource = MemResource::BuildMemResourceFromType(memory_type); if (mem_resource == nullptr) { GELOGE(ge::INTERNAL_ERROR, "Alloc MemResource failed, memory_type = %u.", memory_type); return ge::INTERNAL_ERROR; @@ -465,7 +492,7 @@ ge::Status VarManager::AssignVarMem(const std::string &var_name, const ge::GeTen MemResource *mem_resource = nullptr; auto it = mem_resource_map_.find(memory_type); if (it == mem_resource_map_.end()) { - mem_resource = new (std::nothrow) MemResource(); + mem_resource = MemResource::BuildMemResourceFromType(memory_type); if (mem_resource == nullptr) { GELOGE(ge::INTERNAL_ERROR, "Alloc MemResource failed, memory_type = %u.", memory_type); return ge::INTERNAL_ERROR; @@ -629,6 +656,15 @@ bool VarManager::IsVarAddr(const int64_t &offset) { return var_resource_->IsVarAddr(offset); } +rtMemType_t VarManager::GetVarMemType(const int64_t &offset) { + std::lock_guard lock(mutex_); + if (var_resource_ == nullptr) { + GELOGW("VarManager has not been init."); + return RT_MEMORY_HBM; + } + return var_resource_->GetVarMemType(offset); +} + ge::Status VarManager::MallocVarMemory(size_t memory_size) { std::lock_guard lock(mutex_); uint8_t *var_mem_base = nullptr; @@ -654,12 +690,18 @@ ge::Status VarManager::MallocVarMemory(size_t memory_size) { uint8_t *VarManager::GetVarMemoryBase(rtMemType_t memory_type) { std::lock_guard lock(mutex_); + if (memory_type == RT_MEMORY_RDMA_HBM) { + return MemManager::Instance().RdmaPoolInstance(RT_MEMORY_HBM).GetRdmaBaseAddr(); + } string memory_key = std::to_string(session_id_); return MemManager::Instance(memory_type)->GetMemoryAddr(memory_key); } uint8_t *VarManager::GetVarMemoryAddr(uint8_t *logic_addr, rtMemType_t memory_type) { std::lock_guard lock(mutex_); + if (memory_type == RT_MEMORY_RDMA_HBM) { + return logic_addr; + } string mem_key = std::to_string(session_id_); uint8_t *mem_base = MemManager::Instance(memory_type)->GetMemoryAddr(mem_key); if (mem_base == nullptr) { diff --git a/ge/graph/manager/graph_var_manager.h b/ge/graph/manager/graph_var_manager.h 
index 9cf0068c..924ddcb7 100755
--- a/ge/graph/manager/graph_var_manager.h
+++ b/ge/graph/manager/graph_var_manager.h
@@ -158,13 +158,15 @@ class VarResource {
 
   bool IsVarAddr(const int64_t &offset);
 
+  rtMemType_t GetVarMemType(const int64_t &offset);
+
   std::unordered_map<std::string, ge::GeTensorDesc> GetAllVarDesc() const { return cur_var_tensor_desc_map_; }
 
  private:
   std::string VarKey(const std::string &var_name, const ge::GeTensorDesc &tensor_desc);
 
   uint64_t session_id_;
-  std::unordered_set<uint64_t> var_offset_set_;
+  std::unordered_map<uint64_t, rtMemType_t> var_offset_map_;
   std::unordered_map<std::string, VarAddrMgr> var_addr_mgr_map_;
   std::unordered_map<std::string, ge::GeTensorDesc> cur_var_tensor_desc_map_;
   std::unordered_map<std::string, std::vector<TransNodeInfo>> var_to_trans_road_;
@@ -176,19 +178,36 @@ class VarResource {
 class MemResource {
  public:
   MemResource();
-  ~MemResource() = default;
+  virtual ~MemResource() = default;
+  static MemResource *BuildMemResourceFromType(rtMemType_t mem_type);
 
-  Status AssignVarMem(const std::string &var_name, uint64_t size, uint64_t session_id, size_t &mem_offset);
+  virtual Status AssignVarMem(const std::string &var_name, uint64_t size, uint64_t session_id, size_t &mem_offset) = 0;
 
   uint64_t GetVarMemSize() const;
 
   void UpdateVarMemSize(int64_t mem_size);
 
- private:
+ protected:
   uint64_t total_size_;
   uint64_t var_mem_size_;
 };
 
+class HbmMemResource : public MemResource {
+ public:
+  HbmMemResource() = default;
+  ~HbmMemResource() override = default;
+
+  Status AssignVarMem(const std::string &var_name, uint64_t size, uint64_t session_id, size_t &address) override;
+};
+
+class RdmaMemResource : public MemResource {
+ public:
+  RdmaMemResource() = default;
+  ~RdmaMemResource() override = default;
+
+  Status AssignVarMem(const std::string &var_name, uint64_t size, uint64_t session_id, size_t &address) override;
+};
+
 class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY VarManager {
  public:
   static VarManager *Instance(uint64_t session_id);
@@ -275,6 +294,8 @@ class FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY VarManager {
 
   bool IsVarAddr(const int64_t &offset);
 
+  rtMemType_t GetVarMemType(const int64_t &offset);
+
   uint8_t *GetVarMemoryBase(rtMemType_t memory_type);
 
   uint8_t *GetVarMemoryAddr(uint8_t *logic_addr, rtMemType_t memory_type);
diff --git a/ge/graph/manager/rdma_pool_allocator.h b/ge/graph/manager/rdma_pool_allocator.h
index 4d8cf71e..0a895a11 100644
--- a/ge/graph/manager/rdma_pool_allocator.h
+++ b/ge/graph/manager/rdma_pool_allocator.h
@@ -53,6 +53,10 @@ class RdmaPoolAllocator {
 
   Status GetBaseAddr(uint64_t &base_addr, uint64_t &mem_size);
 
+  uint8_t *GetRdmaBaseAddr() { return rdma_base_addr_; }
+
+  size_t GetRdmaMemSize() { return rdma_mem_size_; }
+
  private:
   void MergeBlocks(Block *dst, Block *src);
 
diff --git a/ge/graph/partition/dynamic_shape_partition.cc b/ge/graph/partition/dynamic_shape_partition.cc
index 6c81b21f..1c82eaf3 100755
--- a/ge/graph/partition/dynamic_shape_partition.cc
+++ b/ge/graph/partition/dynamic_shape_partition.cc
@@ -213,6 +213,7 @@ std::string DynamicShapePartitioner::DebugString() const {
   size_t data = 0;
   size_t netoutput = 0;
   size_t is_inputnode = 0;
+  size_t stage = 0;
   std::stringstream ss;
   ss << "All unknown shape nodes:" << std::endl;
   for (const auto &node : unknown_shape_nodes_) {
@@ -229,10 +230,13 @@ std::string DynamicShapePartitioner::DebugString() const {
       netoutput++;
     } else if (cluster->IsInputNode()) {
       is_inputnode++;
+    } else if (cluster->IsIndependent()) {
+      stage++;
     }
   }
   ss << "All clusters:" << unique_clusters_.size() << ", data:" << data << ", known:" << known
-     << ", unknown:" << unknown << ", netoutput:" << netoutput << ", is_inputnode:" << is_inputnode << std::endl;
+     << ", unknown:" << unknown << ", netoutput:" << netoutput << ", is_inputnode:" << is_inputnode
+     << ", stage:" << stage << std::endl;
   for (const auto &cluster : unique_clusters_) {
     ss << "  " << cluster->DebugString() << std::endl;
   }
@@ -272,12 +276,15 @@ Status DynamicShapePartitioner::InitClusters() {
   for (const auto &node : graph->GetDirectNode()) {
     Cluster::Type type = Cluster::DATA;
     bool is_input = ((node->GetType() == CONSTANT) || (node->GetType() == CONSTANTOP)) && node->GetInNodes().empty();
+    REQUIRE_NOT_NULL(node->GetOpDesc(), "op_desc is null");
     if (node->GetType() == DATA) {
       type = Cluster::DATA;
     } else if (is_input) {
       type = Cluster::INPUT_NODE;
     } else if (node->GetType() == NETOUTPUT) {
       type = Cluster::NETOUTPUT;
+    } else if ((node->GetType() == PARTITIONEDCALL) && (node->GetOpDesc()->HasAttr(ATTR_STAGE_LEVEL))) {
+      type = Cluster::STAGE;
     } else if (unknown_shape_nodes_.count(node) > 0) {
       type = Cluster::UNKNOWN_SHAPE;
     } else {
@@ -360,6 +367,9 @@ static std::string ToString(const std::vector<ClusterPtr> &clusters) {
 void DynamicShapePartitioner::MergeClustersUnknownShape() {
   // Merge unknown shape clusters
   for (const auto &cluster : ordered_cluster_) {
+    if (cluster->IsIndependent()) {
+      continue;
+    }
     for (const auto &in_cluster : cluster->Inputs()) {
       if (!in_cluster->IsUnknownShape()) {
         continue;
@@ -379,6 +389,9 @@ void DynamicShapePartitioner::MergeClustersKnownShape() {
   // Merge known shape clusters
   for (const auto &cluster : ordered_cluster_) {
+    if (cluster->IsIndependent()) {
+      continue;
+    }
     if (cluster->IsRefVariable() && cluster->Inputs().size() == 1) {
       auto in_cluster = *(cluster->Inputs().begin());
       in_cluster->Merge(cluster);
@@ -606,6 +619,7 @@ void Cluster::UpdateRank(size_t rank) {
 bool Cluster::IsData() const { return type_ == DATA; };
 bool Cluster::IsKnownShape() const { return type_ == KNOWN_SHAPE; };
 bool Cluster::IsUnknownShape() const { return type_ == UNKNOWN_SHAPE; };
+bool Cluster::IsIndependent() const { return type_ == STAGE; };
 bool Cluster::IsNetOutput() const { return type_ == NETOUTPUT; };
 bool Cluster::IsInputNode() const { return type_ == INPUT_NODE; };
 bool Cluster::IsRefVariable() const {
@@ -641,6 +655,9 @@ void Cluster::RemoveOutput(ClusterPtr out) {
                           out->in_clusters_.end());
 };
 void Cluster::Merge(ClusterPtr other) {
+  if (other->IsIndependent()) {
+    return;
+  }
   nodes_.insert(nodes_.end(), other->nodes_.begin(), other->nodes_.end());
   other->in_clusters_.erase(std::remove(other->in_clusters_.begin(), other->in_clusters_.end(), shared_from_this()),
                             other->in_clusters_.end());
@@ -689,7 +706,9 @@ std::vector<ClusterPtr> Cluster::MergeAllPathFrom(ClusterPtr other) {
   std::unordered_set<ClusterPtr> forward_reached_clusters;
   std::unordered_set<ClusterPtr> backward_reached_clusters;
   std::vector<ClusterPtr> path_clusters;
-
+  if (other->IsIndependent()) {
+    return path_clusters;
+  }
   if (std::find(other->out_clusters_.begin(), other->out_clusters_.end(), shared_from_this()) ==
       other->out_clusters_.end()) {
     return path_clusters;
@@ -772,7 +791,7 @@ Status Cluster::BuildFrame() {
       }
     }
   }
-  if (IsData()) {
+  if (IsData() || IsIndependent()) {
     for (const auto &anchor : node->GetAllOutDataAnchors()) {
       AddFrameOutput(anchor);
     }
@@ -888,7 +907,7 @@ Status Cluster::CombinePartitionFrame() {
 }
 
 Status Cluster::BuildPartitionSubgraph() {
-  if (IsData() || IsNetOutput()) {
+  if (IsData() || IsNetOutput() || IsIndependent()) {
     return SUCCESS;
   }
   int64_t parent_node_index = 0;
diff --git a/ge/graph/partition/dynamic_shape_partition.h b/ge/graph/partition/dynamic_shape_partition.h
index 9772615e..e8408ff9 100644
--- a/ge/graph/partition/dynamic_shape_partition.h
+++ b/ge/graph/partition/dynamic_shape_partition.h
@@ -32,7 +32,7 @@ class DynamicShapePartitioner {
   // DATA:DATA, UNKNOWN_SHAPE:unknowshape, KNOWN_SHAPE:knowshape, NETOUTPUT:NETOUTPUT.
   class Cluster : public std::enable_shared_from_this<Cluster> {
    public:
-    enum Type { DATA, INPUT_NODE, NETOUTPUT, KNOWN_SHAPE, UNKNOWN_SHAPE };
+    enum Type { DATA, INPUT_NODE, NETOUTPUT, STAGE, KNOWN_SHAPE, UNKNOWN_SHAPE };
     Cluster(size_t rank, Type type, NodePtr node, DynamicShapePartitioner *partitioner)
         : id_(rank), min_(rank), max_(rank), type_(type), partitioner_(partitioner) {
       nodes_.push_back(node);
@@ -45,6 +45,7 @@ class DynamicShapePartitioner {
     bool IsData() const;
     bool IsKnownShape() const;
     bool IsUnknownShape() const;
+    bool IsIndependent() const;
     bool IsNetOutput() const;
     std::vector<std::shared_ptr<Cluster>> Inputs() const;
     std::vector<std::shared_ptr<Cluster>> Outputs() const;
diff --git a/ge/graph/partition/stage_partition.cc b/ge/graph/partition/stage_partition.cc
index 93a06afe..f6e49bbd 100644
--- a/ge/graph/partition/stage_partition.cc
+++ b/ge/graph/partition/stage_partition.cc
@@ -25,6 +25,10 @@
 #include "common/types.h"
 
 namespace ge {
+namespace {
+const std::set<std::string> kSrcNodeTypes = { DATA, AIPPDATA, ANN_DATA };
+}
+
 Status StagePartitioner::Partition() {
   GE_CHECK_NOTNULL(root_graph_);
   if (root_graph_->GetParentGraph() != nullptr) {
@@ -37,6 +41,10 @@ Status StagePartitioner::Partition() {
     if (!AttrUtils::GetInt(op_desc, ATTR_STAGE_LEVEL, level)) {
       continue;
     }
+    if ((kSrcNodeTypes.count(op_desc->GetType()) != 0) && node->GetInAllNodes().empty()) {
+      continue;
+    }
+    GELOGD("original node %s for stage %u", node->GetName().c_str(), level);
     stage_nodes_[level].insert(node);
   }
   if (stage_nodes_.empty()) {
@@ -54,6 +62,13 @@ Status StagePartitioner::Partition() {
     return FAILED;
   }
 
+  root_graph_->TopologicalSorting([](const NodePtr &a, const NodePtr &b) -> bool {
+    uint32_t a_level = UINT32_MAX;
+    (void)AttrUtils::GetInt(a->GetOpDesc(), ATTR_STAGE_LEVEL, a_level);
+    uint32_t b_level = UINT32_MAX;
+    (void)AttrUtils::GetInt(b->GetOpDesc(), ATTR_STAGE_LEVEL, b_level);
+    return a_level < b_level;
+  });
   if (root_graph_->TopologicalSorting() != GRAPH_SUCCESS) {
     GELOGE(FAILED, "Topological sort for graph %s after stage partition failed, "
            "maybe stage_level was not set correctly.", root_graph_->GetName().c_str());
@@ -76,20 +91,26 @@ Status StagePartitioner::SplitStageLevel() {
       auto node = nodes.top();
       nodes.pop();
       GE_CHECK_NOTNULL(node->GetOpDesc());
-      if (node->GetOpDesc()->HasAttr(ATTR_STAGE_LEVEL) && (cur_stage_nodes.count(node) == 0)) {
+      uint32_t tmp_level = cur_stage_level;
+      (void)AttrUtils::GetInt(node->GetOpDesc(), ATTR_STAGE_LEVEL, tmp_level);
+      if (tmp_level != cur_stage_level) {
         continue;
       }
       for (const auto &in_node : node->GetInAllNodes()) {
         if (visited_stage_nodes.count(in_node) != 0) {
           continue;
         }
+        if (!AttrUtils::SetInt(in_node->GetOpDesc(), ATTR_STAGE_LEVEL, cur_stage_level)) {
+          GELOGE(INTERNAL_ERROR, "Set attr ATTR_STAGE_LEVEL on node %s failed.", in_node->GetName().c_str());
+          return INTERNAL_ERROR;
+        }
+        GELOGD("Mark stage_level node %s, stage_level=%u", in_node->GetName().c_str(), cur_stage_level);
+        if ((kSrcNodeTypes.count(in_node->GetType()) != 0) && in_node->GetInAllNodes().empty()) {
+          GELOGD("skip data node %s for stage %u", in_node->GetName().c_str(), cur_stage_level);
+          continue;
+        }
         nodes.push(in_node);
       }
-      if (!AttrUtils::SetInt(node->GetOpDesc(), ATTR_STAGE_LEVEL, cur_stage_level)) {
-        GELOGE(INTERNAL_ERROR, "Set attr ATTR_STAGE_LEVEL on node %s failed.", node->GetName().c_str());
-        return INTERNAL_ERROR;
-      }
-      GELOGD("Mark stage_level node %s, stage_level=%u", node->GetName().c_str(), cur_stage_level);
       visited_stage_nodes.emplace(node);
     }
     for (const auto &node : visited_stage_nodes) {
@@ -219,6 +240,11 @@ NodePtr StagePartitioner::BuildSubgraphNode(const std::string &graph_name, const StageInfo &stage_info) {
   op_desc->AddSubgraphName("f");
   op_desc->SetSubgraphInstanceName(0, graph_name);
 
+  if (!AttrUtils::SetInt(op_desc, ATTR_STAGE_LEVEL, stage_info.stage_level)) {
+    GELOGE(INTERNAL_ERROR, "Set attr ATTR_STAGE_LEVEL on node %s failed", op_desc->GetName().c_str());
+    return nullptr;
+  }
+
   NodePtr subgraph_node = root_graph_->AddNode(op_desc);
   if (subgraph_node == nullptr) {
     GELOGE(FAILED, "Add node %s failed.", graph_name.c_str());
diff --git a/ge/graph/passes/subgraph_pass.cc b/ge/graph/passes/subgraph_pass.cc
index d1111d52..dc6269ac 100755
--- a/ge/graph/passes/subgraph_pass.cc
+++ b/ge/graph/passes/subgraph_pass.cc
@@ -142,17 +142,18 @@ Status SubgraphPass::SubgraphOutputNode(const ComputeGraphPtr &graph, const NodePtr &node) {
     GE_CHECK_NOTNULL(in_node);
 
     // Need insert memcpy
-    //   1. Const->NetOutput in subgraph
+    //   1. Const->NetOutput in subgraph & parent graph is known
     //   2. AtomicOp->NetOutput in subgraph
     //   3. OutputContinuesRequiredOp->NetOutput in subgraph
     //   4. Data->NetOutput in subgraph but parent_node is not while
     //   5. While->NetOutput in known subgraph
     std::string op_type;
-    bool insert_flag = NodeUtils::GetConstOpType(in_node, op_type) ||
+    bool insert_flag =
+        (NodeUtils::GetConstOpType(in_node, op_type) && !graph->GetParentGraph()->GetGraphUnknownFlag()) ||
         IsAtomicRequired(in_node, peer_out_anchor->GetIdx()) || IsOutputContinuesRequired(in_node) ||
         ((in_node->GetType() == DATA) && (kWhileOpTypes.count(graph->GetParentNode()->GetType()) == 0)) ||
         (!graph->GetGraphUnknownFlag() && NodeUtils::IsDynamicShape(node) &&
-        (kWhileOpTypes.count(in_node->GetType()) != 0));
+         (kWhileOpTypes.count(in_node->GetType()) != 0));
     if (insert_flag) {
       GELOGD("Insert MemcpyAsync node between %s and %s.", in_node->GetName().c_str(), node->GetName().c_str());
       std::string name = node->GetName() + "_input_" + std::to_string(in_data_anchor->GetIdx()) + "_Memcpy";
diff --git a/ge/host_cpu_engine/ops_kernel_store/op/host_op.cc b/ge/host_cpu_engine/ops_kernel_store/op/host_op.cc
index a6e00f4a..7f709f03 100644
--- a/ge/host_cpu_engine/ops_kernel_store/op/host_op.cc
+++ b/ge/host_cpu_engine/ops_kernel_store/op/host_op.cc
@@ -32,5 +32,8 @@ REGISTER_OP_CREATOR(Assign, HostOp);
 REGISTER_OP_CREATOR(RandomUniform, HostOp);
 REGISTER_OP_CREATOR(Add, HostOp);
 REGISTER_OP_CREATOR(Mul, HostOp);
+REGISTER_OP_CREATOR(ConcatV2, HostOp);
+REGISTER_OP_CREATOR(Data, HostOp);
+REGISTER_OP_CREATOR(Fill, HostOp);
 }  // namespace host_cpu
 }  // namespace ge
diff --git a/ge/hybrid/executor/hybrid_model_async_executor.cc b/ge/hybrid/executor/hybrid_model_async_executor.cc
index e9881224..3673edf0 100644
--- a/ge/hybrid/executor/hybrid_model_async_executor.cc
+++ b/ge/hybrid/executor/hybrid_model_async_executor.cc
@@ -59,6 +59,7 @@ Status HybridModelAsyncExecutor::Start(const std::shared_ptr<ModelListener> &listener) {
   run_flag_ = true;
   listener_ = listener;
   future_ = std::async(std::launch::async, [&]() -> Status {
+    GetThreadLocalContext() = *executor_->GetContext()->ge_context;
     GetContext().SetSessionId(executor_->GetContext()->session_id);
     return RunInternal();
   });
@@ -229,7 +230,11 @@ Status HybridModelAsyncExecutor::PrepareInputs(const InputData &current_data, HybridModelExecutor::ExecuteArgs &args) {
    }
    GE_CHECK_GE(tensor_size, 0);
-    auto tensor_buffer = TensorBuffer::Create(allocator, tensor_size);
+    AllocationAttr attr;
+    if (GetContext().GetHostExecFlag()) {
+      attr.SetMemType(HOST_DDR);
+    }
+    auto tensor_buffer = TensorBuffer::Create(allocator, tensor_size, &attr);
     GE_CHECK_NOTNULL(tensor_buffer);
     args.inputs.emplace_back(std::shared_ptr<TensorBuffer>(tensor_buffer.release()));
diff --git a/ge/hybrid/model/hybrid_model_builder.cc b/ge/hybrid/model/hybrid_model_builder.cc
index d1f61985..7ee0bef7 100755
--- a/ge/hybrid/model/hybrid_model_builder.cc
+++ b/ge/hybrid/model/hybrid_model_builder.cc
@@ -772,7 +772,12 @@ Status HybridModelBuilder::VarNodeToTensor(const NodePtr &var_node, std::unique_ptr<TensorValue> &tensor) {
            var_name.c_str(),
            hybrid_model_.GetSessionId());
 
-  uint8_t *dev_mem = var_manager_->GetVarMemoryAddr(var_logic, RT_MEMORY_HBM);
+  rtMemType_t memory_type = RT_MEMORY_HBM;
+  uint32_t mem_type = 0;
+  if (AttrUtils::GetInt(var_node->GetOpDesc(), ATTR_OUTPUT_MEMORY_TYPE, mem_type) && (mem_type == 1)) {
+    memory_type = RT_MEMORY_RDMA_HBM;
+  }
+  uint8_t *dev_mem = var_manager_->GetVarMemoryAddr(var_logic, memory_type);
   if (dev_mem == nullptr) {
     GELOGE(INTERNAL_ERROR,
            "Failed to copy var %s from device, cant not get "
diff --git a/ge/hybrid/node_executor/hccl/hccl_node_executor.cc b/ge/hybrid/node_executor/hccl/hccl_node_executor.cc
index 94c734ca..5387a176 100644
--- a/ge/hybrid/node_executor/hccl/hccl_node_executor.cc
+++ b/ge/hybrid/node_executor/hccl/hccl_node_executor.cc
@@ -15,23 +15,25 @@
  */
 
 #include "hybrid/node_executor/hccl/hccl_node_executor.h"
-#include "common/ge/ge_util.h"
 #include "common/ge/plugin_manager.h"
 #include "common/math/math_util.h"
-#include "framework/common/debug/ge_log.h"
 #include "graph/attr_value.h"
 #include "graph/debug/ge_attr_define.h"
 #include "graph/manager/util/hcom_util.h"
 #include "graph/runtime_inference_context.h"
-#include "hccl/hcom.h"
+#include "graph/utils/type_utils.h"
+#include "hybrid/executor/hybrid_execution_context.h"
 
+namespace ge {
 namespace {
-const size_t kVarTableDims = 2;
-const size_t kVarTableRowCnt = 3;
-const size_t kVarTableIdxAddr = 1;
-const size_t kVarTableIdxLen = 2;
+constexpr size_t kVarTableDims = 2;
+constexpr size_t kVarTableRowCnt = 3;
+constexpr size_t kVarTableIdxAddr = 1;
+constexpr size_t kVarTableIdxLen = 2;
+const std::set<std::string> kRdmaReadTypes = { HCOMREMOTEREAD, HCOMREMOTEREFREAD };
+const std::set<std::string> kRdmaWriteTypes = { HCOMREMOTEWRITE, HCOMREMOTESCATTERWRITE };
+const std::set<std::string> kRdmaScatterTypes = { HCOMREMOTEREFREAD, HCOMREMOTESCATTERWRITE };
 }  // namespace
-namespace ge {
 namespace hybrid {
 
 REGISTER_NODE_EXECUTOR_BUILDER(NodeExecutorManager::ExecutorType::HCCL, HcclNodeExecutor);
@@ -142,11 +144,22 @@ Status RdmaNodeTask::Init(TaskContext &context) {
   GE_CHECK_NOTNULL(peer_node->GetOpDesc());
 
   remote_index_ = {peer_node->GetOpDesc()->GetId(), out_data_anchor->GetIdx()};
-  if (node_item.node->GetType() == HCOMREMOTEREAD) {
+  if (kRdmaReadTypes.count(node_item.node->GetType()) > 0) {
     local_index_ = 0;
   } else {
     local_index_ = op_desc->GetInputIndexByName("local");
   }
+  int32_t offset_idx = node_item.op_desc->GetInputIndexByName("local_offset");
+  if ((offset_idx != -1) && (node_item.op_desc->GetInputDescPtr(offset_idx) != nullptr)) {
+    skip_flag_ = true;
+    GE_CHECK_NOTNULL(node_item.node->GetInDataAnchor(offset_idx));
+    GE_CHECK_NOTNULL(node_item.node->GetInDataAnchor(offset_idx)->GetPeerOutAnchor());
+    GE_CHECK_NOTNULL(node_item.node->GetInDataAnchor(offset_idx)->GetPeerOutAnchor()->GetOwnerNode());
+    GE_CHECK_NOTNULL(node_item.node->GetInDataAnchor(offset_idx)->GetPeerOutAnchor()->GetOwnerNode()->GetOpDesc());
+    offset_index_ = {
+        node_item.node->GetInDataAnchor(offset_idx)->GetPeerOutAnchor()->GetOwnerNode()->GetOpDesc()->GetId(),
+        node_item.node->GetInDataAnchor(offset_idx)->GetPeerOutAnchor()->GetIdx() };
+  }
   return SUCCESS;
 }
 
@@ -158,8 +171,13 @@ Status RdmaNodeTask::ExtractTensor(TaskContext &context, vector<HcomRemoteAccessAddrInfo> &addr_infos) {
   GE_CHK_STATUS_RET(ctx->GetTensor(remote_index_.first, remote_index_.second, remote_tensor));
   auto data = reinterpret_cast<uint64_t *>(remote_tensor.GetData());
   if (data == nullptr) {
-    GELOGE(FAILED, "Tensor data is nullptr.");
-    return FAILED;
+    if (kRdmaScatterTypes.count(context.GetNodeItem().NodeType()) > 0) {
+      GELOGD("data is null, no need to do rdma read/write, node=%s", context.GetNodeName());
+      return SUCCESS;
+    } else {
+      GELOGE(FAILED, "Tensor data is nullptr.");
+      return FAILED;
+    }
   }
   auto dims = remote_tensor.GetTensorDesc().GetShape().GetDims();
   if (dims.size() != kVarTableDims && dims.back() != kVarTableRowCnt) {
@@ -183,30 +201,63 @@ Status RdmaNodeTask::ExtractTensor(TaskContext &context, vector<HcomRemoteAccessAddrInfo> &addr_infos) {
       GE_CHK_STATUS_RET(context.SetOutput(i, TensorValue(std::shared_ptr<uint8_t>(tensor_buffer.release()))));
     }
+  } else if (context.GetNodeItem().NodeType() == HCOMREMOTEREFREAD) {
+    AllocationAttr attr;
+    attr.SetMemType(RDMA_HBM);
+    GE_CHK_STATUS_RET(context.AllocateOutputs(&attr))
   }
 
   TensorValue *tv;
-  if (context.GetNodeItem().NodeType() == HCOMREMOTEREAD) {
-    tv = context.MutableOutput(0);
+  if (kRdmaReadTypes.count(context.GetNodeItem().NodeType()) > 0) {
+    tv = context.MutableOutput(local_index_);
   } else {
     tv = context.MutableInput(local_index_);
   }
   GE_CHECK_NOTNULL(tv);
-  auto local_addr = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(tv->MutableData()));
   auto row_num = dims.front();
   addr_infos.resize(row_num);
-  auto device_len = tv->GetSize() / row_num;
-  if (device_len <= 0 || device_len > data[kVarTableIdxLen]) {
-    GELOGE(FAILED, "Local embedding length is out of range.");
-    return FAILED;
-  }
+  if (skip_flag_) {
+    int32_t offset_idx = context.GetNodeItem().op_desc->GetInputIndexByName("local_offset");
+    GE_CHECK_NOTNULL(context.GetNodeItem().op_desc->GetInputDescPtr(offset_idx));
+    auto data_type = context.GetNodeItem().op_desc->GetInputDesc(offset_idx).GetDataType();
+
+    Tensor offset_tensor;
+    GE_CHK_STATUS_RET(ctx->GetTensor(offset_index_.first, offset_index_.second, offset_tensor))
+    if (static_cast<int64_t>(offset_tensor.GetSize() / GetSizeByDataType(data_type)) != row_num) {
+      GELOGE(PARAM_INVALID, "num of offset and remote addr mismatch, offset size=%zu, remote_addr size=%lld, dtype=%s",
+             offset_tensor.GetSize(), row_num, TypeUtils::DataTypeToSerialString(data_type).c_str());
+      return PARAM_INVALID;
+    }
+
+    auto addr_offset = reinterpret_cast<uint64_t *>(offset_tensor.GetData());
+    GE_CHECK_NOTNULL(addr_offset);
+    auto base_addr = reinterpret_cast<float *>(tv->MutableData());
+    GE_CHECK_NOTNULL(base_addr);
+
+    for (auto idx = 0; idx < row_num; idx++) {
+      FMK_INT64_MULCHECK(idx, kVarTableRowCnt)
+      auto line_idx = idx * kVarTableRowCnt;
+      addr_infos[idx] = { static_cast<uint32_t>(data[line_idx]),
+                          data[line_idx + kVarTableIdxAddr],
+                          static_cast<uint64_t>(reinterpret_cast<uintptr_t>(base_addr + addr_offset[idx])),
+                          data[line_idx + kVarTableIdxLen] };
+    }
+  } else {
+    auto local_addr = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(tv->MutableData()));
+    auto device_len = tv->GetSize() / row_num;
+    if (device_len <= 0 || device_len > data[kVarTableIdxLen]) {
+      GELOGE(FAILED, "Local embedding length is out of range, expect %lld, but %lld exactly.",
             data[kVarTableIdxLen], device_len);
+      return FAILED;
+    }
 
-  for (auto idx = 0; idx < row_num; ++idx) {
-    FMK_INT64_MULCHECK(idx, kVarTableRowCnt);
-    auto line_idx = idx * kVarTableRowCnt;
-    addr_infos[idx] = {static_cast<uint32_t>(data[line_idx]), data[line_idx + kVarTableIdxAddr], local_addr,
-                       device_len};
-    local_addr += device_len;
+    for (auto idx = 0; idx < row_num; ++idx) {
+      FMK_INT64_MULCHECK(idx, kVarTableRowCnt)
+      auto line_idx = idx * kVarTableRowCnt;
+      addr_infos[idx] = { static_cast<uint32_t>(data[line_idx]), data[line_idx + kVarTableIdxAddr], local_addr,
+                          device_len };
+      local_addr += device_len;
+    }
   }
 
   return SUCCESS;
@@ -226,6 +277,10 @@ Status RdmaNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> done_callback) {
   }
   vector<HcomRemoteAccessAddrInfo> addr_infos;
   GE_CHK_STATUS_RET(ExtractTensor(context, addr_infos));
+  if (addr_infos.empty()) {
+    done_callback();
+    return SUCCESS;
+  }
 
   auto callback = [this](HcclResult status) {
     if (status != HCCL_SUCCESS) {
@@ -235,6 +290,11 @@ Status RdmaNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> done_callback) {
     this->cond_.notify_all();
     GELOGI("rdma callback success.");
   };
+
+  std::string executor_type = context.GetNodeItem().NodeType();
+  if (kRdmaScatterTypes.count(context.GetNodeItem().NodeType()) > 0) {
+    executor_type = context.GetNodeItem().NodeType() == HCOMREMOTEREFREAD ? HCOMREMOTEREAD : HCOMREMOTEWRITE;
+  }
-  HcclResult hccl_ret = HcomExecEnqueueRemoteAccess(context.GetNodeItem().NodeType(), addr_infos, callback);
+  HcclResult hccl_ret = HcomExecEnqueueRemoteAccess(executor_type, addr_infos, callback);
   if (hccl_ret != HCCL_SUCCESS) {
-    GELOGE(HCCL_E_INTERNAL, "Call HcomExecInitialize failed, ret: 0x%X", hccl_ret);
+    GELOGE(HCCL_E_INTERNAL, "Call HcomExecEnqueueRemoteAccess failed, ret: 0x%X", hccl_ret);
@@ -262,7 +322,7 @@ Status HcclNodeExecutor::PrepareTask(NodeTask &task, TaskContext &context) const {
   GE_CHK_STATUS_RET(task.Init(context), "hccl node load hccl so failed.");
   // allocate output mem, output mem or remote read will be calculated when node execute.
-  if (context.GetNodeItem().NodeType() != HCOMREMOTEREAD) {
+  if (kRdmaReadTypes.count(context.GetNodeItem().NodeType()) == 0) {
     GE_CHK_STATUS_RET(context.AllocateOutputs(), "hccl node task allocate output failed.");
   }
 
@@ -274,7 +334,7 @@ Status HcclNodeExecutor::LoadTask(const HybridModel &model, const NodePtr &node, shared_ptr<NodeTask> &task) const {
   GELOGI("[%s] HcclNodeExecutor::LoadTask in.", node->GetName().c_str());
   GE_CHECK_NOTNULL(node);
-  if (node->GetType() == HCOMREMOTEREAD || node->GetType() == HCOMREMOTEWRITE) {
+  if ((kRdmaReadTypes.count(node->GetType()) > 0) || (kRdmaWriteTypes.count(node->GetType()) > 0)) {
     task = MakeShared<RdmaNodeTask>();
   } else {
     task = MakeShared<HcclNodeTask>();
diff --git a/ge/hybrid/node_executor/hccl/hccl_node_executor.h b/ge/hybrid/node_executor/hccl/hccl_node_executor.h
index 07dd848b..873f259f 100644
--- a/ge/hybrid/node_executor/hccl/hccl_node_executor.h
+++ b/ge/hybrid/node_executor/hccl/hccl_node_executor.h
@@ -55,9 +55,11 @@ class RdmaNodeTask : public NodeTask {
  private:
   Status ExtractTensor(TaskContext &context, vector<HcomRemoteAccessAddrInfo> &addr_infos);
   std::pair<int64_t, int32_t> remote_index_;
+  std::pair<int64_t, int32_t> offset_index_;
   int32_t local_index_ = 0;
   std::mutex hccl_mutex_;
   std::condition_variable cond_;
+  bool skip_flag_ = false;
 };
 
 class HcclNodeExecutor : public NodeExecutor {
diff --git a/ge/hybrid/node_executor/host_cpu/kernel/assign_kernel.cc b/ge/hybrid/node_executor/host_cpu/kernel/assign_kernel.cc
index 01fd391d..d54195d6 100644
--- a/ge/hybrid/node_executor/host_cpu/kernel/assign_kernel.cc
+++ b/ge/hybrid/node_executor/host_cpu/kernel/assign_kernel.cc
@@ -29,8 +29,6 @@ namespace ge {
 namespace hybrid {
 namespace host_cpu {
 Status AssignKernel::Compute(TaskContext& context) {
-  GELOGI("[%s] compute begin.", node_->GetName().c_str());
-
auto ref_tensor = context.MutableInput(kAssignRefInputIndex); GE_CHECK_NOTNULL(ref_tensor); const auto value_tensor = context.GetInput(kAssignValueInputIndex); @@ -50,7 +48,7 @@ Status AssignKernel::Compute(TaskContext& context) { GE_CHK_STATUS_RET(context.SetOutput(kAssignRefOutputIndex, *ref_tensor), "[%s] Failed to set output.", context.GetNodeName()); - GELOGI("[%s] compute success.", node_->GetName().c_str()); + GELOGD("[%s] compute success.", node_->GetName().c_str()); return SUCCESS; } diff --git a/ge/hybrid/node_executor/host_cpu/kernel/data_kernel.cc b/ge/hybrid/node_executor/host_cpu/kernel/data_kernel.cc new file mode 100644 index 00000000..e34f601a --- /dev/null +++ b/ge/hybrid/node_executor/host_cpu/kernel/data_kernel.cc @@ -0,0 +1,41 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hybrid/node_executor/host_cpu/kernel/data_kernel.h" +#include "framework/common/debug/ge_log.h" +#include "framework/common/util.h" +#include "hybrid/node_executor/host_cpu/kernel_factory.h" + +namespace { +constexpr size_t kDataInputIndex = 0; +constexpr size_t kDataOutputIndex = 0; +} + +namespace ge { +namespace hybrid { +namespace host_cpu { +Status DataKernel::Compute(TaskContext& context) { + auto input = context.MutableInput(kDataInputIndex); + GE_CHECK_NOTNULL(input); + GE_CHK_STATUS_RET(context.SetOutput(kDataOutputIndex, *input), "[%s] Failed to set output.", context.GetNodeName()) + GELOGD("[%s] compute success.", node_->GetName().c_str()); + return SUCCESS; +} + +REGISTER_KERNEL_CREATOR(Data, DataKernel); +} // namespace host_cpu +} // namespace hybrid +} // namespace ge diff --git a/ge/hybrid/node_executor/host_cpu/kernel/data_kernel.h b/ge/hybrid/node_executor/host_cpu/kernel/data_kernel.h new file mode 100644 index 00000000..ca42d647 --- /dev/null +++ b/ge/hybrid/node_executor/host_cpu/kernel/data_kernel.h @@ -0,0 +1,42 @@ +/** + * Copyright 2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef GE_HYBRID_HOST_CPU_KERNEL_DATA_KERNEL_H_ +#define GE_HYBRID_HOST_CPU_KERNEL_DATA_KERNEL_H_ + +#include "hybrid/node_executor/host_cpu/kernel/kernel.h" + +namespace ge { +namespace hybrid { +namespace host_cpu { +class DataKernel : public Kernel { + public: + DataKernel(const NodePtr &node) : Kernel(node) {} + ~DataKernel() override = default; + DataKernel &operator=(const DataKernel &op) = delete; + DataKernel(const DataKernel &op) = delete; + + /** + * @brief compute for node_task. + * @return result + */ + Status Compute(TaskContext& context) override; +}; +} // namespace host_cpu +} // namespace hybrid +} // namespace ge + +#endif // GE_HYBRID_HOST_CPU_KERNEL_DATA_KERNEL_H_ diff --git a/ge/hybrid/node_executor/host_cpu/kernel/no_op_kernel.cc b/ge/hybrid/node_executor/host_cpu/kernel/no_op_kernel.cc index ff5a7c6d..b1b4e68c 100644 --- a/ge/hybrid/node_executor/host_cpu/kernel/no_op_kernel.cc +++ b/ge/hybrid/node_executor/host_cpu/kernel/no_op_kernel.cc @@ -23,7 +23,7 @@ namespace ge { namespace hybrid { namespace host_cpu { Status NoOpKernel::Compute(TaskContext& context) { - GELOGI("[%s] no need to compute.", node_->GetName().c_str()); + GELOGD("[%s] no need to compute.", node_->GetName().c_str()); return SUCCESS; } diff --git a/ge/hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc b/ge/hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc index 37b07e37..52d48821 100755 --- a/ge/hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc +++ b/ge/hybrid/node_executor/host_cpu/kernel/random_uniform_kernel.cc @@ -30,8 +30,6 @@ namespace ge { namespace hybrid { namespace host_cpu { Status RandomUniformKernel::Compute(TaskContext& context) { - GELOGI("[%s] compute begin.", node_->GetName().c_str()); - int64_t seed = 0; int64_t seed2 = 0; (void)AttrUtils::GetInt(node_->GetOpDesc(), "seed", seed); @@ -66,7 +64,7 @@ Status RandomUniformKernel::Compute(TaskContext& context) { return UNSUPPORTED; } - GELOGI("[%s] compute success.", node_->GetName().c_str()); + GELOGD("[%s] compute success.", node_->GetName().c_str()); return SUCCESS; } diff --git a/ge/hybrid/node_executor/host_cpu/kernel/variable_kernel.cc b/ge/hybrid/node_executor/host_cpu/kernel/variable_kernel.cc index 2a836458..16738c2a 100644 --- a/ge/hybrid/node_executor/host_cpu/kernel/variable_kernel.cc +++ b/ge/hybrid/node_executor/host_cpu/kernel/variable_kernel.cc @@ -23,8 +23,6 @@ namespace ge { namespace hybrid { namespace host_cpu { Status VariableKernel::Compute(TaskContext& context) { - GELOGI("[%s] compute begin.", node_->GetName().c_str()); - auto tensor = context.GetVariable(node_->GetName()); if (tensor == nullptr) { GELOGE(PARAM_INVALID, "tensor is NULL."); @@ -32,7 +30,7 @@ Status VariableKernel::Compute(TaskContext& context) { } // Constant & Variable Op has and only has one output GE_CHK_STATUS_RET(context.SetOutput(0, *tensor), "[%s] Failed to set output.", context.GetNodeName()); - GELOGI("[%s] compute success.", node_->GetName().c_str()); + GELOGD("[%s] compute success.", node_->GetName().c_str()); return SUCCESS; } diff --git a/inc/framework/common/types.h b/inc/framework/common/types.h index 4d4c54d1..2dbb1753 100644 --- a/inc/framework/common/types.h +++ b/inc/framework/common/types.h @@ -437,6 +437,7 @@ REGISTER_OPTYPE_DECLARE(HCOMRECEIVE, "HcomReceive"); REGISTER_OPTYPE_DECLARE(HCOMREMOTEREAD, "HcomRemoteRead"); REGISTER_OPTYPE_DECLARE(HCOMREMOTEREFREAD, "HcomRemoteRefRead"); REGISTER_OPTYPE_DECLARE(HCOMREMOTEWRITE, "HcomRemoteWrite"); 
+REGISTER_OPTYPE_DECLARE(HCOMREMOTESCATTERWRITE, "HcomRemoteScatterWrite"); REGISTER_OPTYPE_DECLARE(VARASSIGN, "VarAssign"); REGISTER_OPTYPE_DECLARE(VARISINITIALIZEDOP, "VarIsInitializedOp"); diff --git a/inc/framework/omg/parser/parser_types.h b/inc/framework/omg/parser/parser_types.h index 62c9c750..f2bd4e28 100644 --- a/inc/framework/omg/parser/parser_types.h +++ b/inc/framework/omg/parser/parser_types.h @@ -370,7 +370,9 @@ FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *HCOMREDUCESC FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *HCOMSEND; FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *HCOMRECEIVE; FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *HCOMREMOTEREAD; +FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *HCOMREMOTEREFREAD; FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *HCOMREMOTEWRITE; +FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *HCOMREMOTESCATTERWRITE; FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *VARASSIGN; FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *VARISINITIALIZEDOP; diff --git a/tests/ut/ge/CMakeLists.txt b/tests/ut/ge/CMakeLists.txt index 91a6620d..5979f5cf 100755 --- a/tests/ut/ge/CMakeLists.txt +++ b/tests/ut/ge/CMakeLists.txt @@ -589,6 +589,7 @@ set(DISTINCT_GRAPH_LOAD_TEST_FILES #"graph/graph_load_unittest.cc" "graph/ge_executor_unittest.cc" "graph/load/model_helper_unittest.cc" + "graph/load/model_utils_unittest.cc" ) set(PASS_TEST_FILES diff --git a/tests/ut/ge/graph/load/model_utils_unittest.cc b/tests/ut/ge/graph/load/model_utils_unittest.cc new file mode 100644 index 00000000..bd86c71e --- /dev/null +++ b/tests/ut/ge/graph/load/model_utils_unittest.cc @@ -0,0 +1,70 @@ +/** + * Copyright 2019-2020 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include <gtest/gtest.h>
+#define protected public
+#define private public
+#include "graph/load/new_model_manager/model_utils.h"
+#include "graph/manager/graph_var_manager.h"
+
+using namespace std;
+
+namespace ge {
+class UtestModelUtils : public testing::Test {
+ protected:
+  void TearDown() {}
+};
+
+// test ModelUtils::GetVarAddr
+TEST_F(UtestModelUtils, get_var_addr_hbm) {
+  uint8_t test = 2;
+  uint8_t *pf = &test;
+  RuntimeParam runtime_param;
+  runtime_param.session_id = 0;
+  runtime_param.logic_var_base = 0;
+  runtime_param.var_base = pf;
+  runtime_param.var_size = 16;
+
+  int64_t offset = 8;
+  EXPECT_EQ(VarManager::Instance(runtime_param.session_id)->Init(0, 0, 0, 0), SUCCESS);
+  EXPECT_NE(VarManager::Instance(runtime_param.session_id)->var_resource_, nullptr);
+  VarManager::Instance(runtime_param.session_id)->var_resource_->var_offset_map_[offset] = RT_MEMORY_HBM;
+  std::shared_ptr<OpDesc> op_desc = std::make_shared<OpDesc>("test", "test");
+  uint8_t *var_addr = nullptr;
+  EXPECT_EQ(ModelUtils::GetVarAddr(runtime_param, op_desc, offset, var_addr), SUCCESS);
+  EXPECT_EQ(runtime_param.var_base + offset - runtime_param.logic_var_base, var_addr);
+  VarManager::Instance(runtime_param.session_id)->Destory();
+}
+
+TEST_F(UtestModelUtils, get_var_addr_rdma_hbm) {
+  uint8_t test = 2;
+  uint8_t *pf = &test;
+  RuntimeParam runtime_param;
+  runtime_param.session_id = 0;
+  runtime_param.logic_var_base = 0;
+  runtime_param.var_base = pf;
+
+  int64_t offset = 8;
+  EXPECT_EQ(VarManager::Instance(runtime_param.session_id)->Init(0, 0, 0, 0), SUCCESS);
+  EXPECT_NE(VarManager::Instance(runtime_param.session_id)->var_resource_, nullptr);
+  VarManager::Instance(runtime_param.session_id)->var_resource_->var_offset_map_[offset] = RT_MEMORY_RDMA_HBM;
+  std::shared_ptr<OpDesc> op_desc = std::make_shared<OpDesc>("test", "test");
+  uint8_t *var_addr = nullptr;
+  EXPECT_EQ(ModelUtils::GetVarAddr(runtime_param, op_desc, offset, var_addr), SUCCESS);
+  EXPECT_EQ(reinterpret_cast<uint8_t *>(offset), var_addr);
+  VarManager::Instance(runtime_param.session_id)->Destory();
+}
+}  // namespace ge
diff --git a/third_party/fwkacllib/inc/runtime/mem.h b/third_party/fwkacllib/inc/runtime/mem.h
index 32bd9e6b..c305fb12 100644
--- a/third_party/fwkacllib/inc/runtime/mem.h
+++ b/third_party/fwkacllib/inc/runtime/mem.h
@@ -34,6 +34,7 @@ extern "C" {
  */
 #define RT_MEMORY_DEFAULT ((uint32_t)0x0)   // default memory on device
 #define RT_MEMORY_HBM ((uint32_t)0x2)       // HBM memory on device
+#define RT_MEMORY_RDMA_HBM ((uint32_t)0x3)  // RDMA-HBM memory on device
 #define RT_MEMORY_DDR ((uint32_t)0x4)       // DDR memory on device
 #define RT_MEMORY_SPM ((uint32_t)0x8)       // shared physical memory on device
 #define RT_MEMORY_P2P_HBM ((uint32_t)0x10)  // HBM memory on other 4P device

From 2fc8c77a01f54d4c8f2f57d7eea7314d89541b3b Mon Sep 17 00:00:00 2001
From: chenyemeng
Date: Mon, 18 Jan 2021 16:59:24 +0800
Subject: [PATCH 2/3] cache support

---
 inc/framework/omg/parser/parser_types.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/inc/framework/omg/parser/parser_types.h b/inc/framework/omg/parser/parser_types.h
index f2bd4e28..f3b7f00a 100644
--- a/inc/framework/omg/parser/parser_types.h
+++ b/inc/framework/omg/parser/parser_types.h
@@ -238,8 +238,8 @@ FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *SOFTSIGN;
 FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *COSH;
 FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *SINH;
 FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *SQUAREDDIFFERENCE;
-FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char
-    *REQUIREDSPACETOBATCHPADDINGS;  // for retinanet scope fusion
+// for retinanet scope fusion
+FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *REQUIREDSPACETOBATCHPADDINGS;
 FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *SSDPOSTPROCESSOR;
 FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *RETINANETBOXES;
 FMK_FUNC_HOST_VISIBILITY FMK_FUNC_DEV_VISIBILITY extern const char *RETINAMULTIANCHORS;

From a892b2bf901e9939e49d8125014dbaa599519902 Mon Sep 17 00:00:00 2001
From: chenyemeng
Date: Tue, 19 Jan 2021 12:35:38 +0800
Subject: [PATCH 3/3] cache support

---
 .../load/new_model_manager/model_utils.cc | 25 ++++++++++++-------
 ge/graph/manager/graph_var_manager.cc     |  4 +--
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/ge/graph/load/new_model_manager/model_utils.cc b/ge/graph/load/new_model_manager/model_utils.cc
index efd8c619..d9a9f3ca 100755
--- a/ge/graph/load/new_model_manager/model_utils.cc
+++ b/ge/graph/load/new_model_manager/model_utils.cc
@@ -379,17 +379,24 @@ vector<void *> ModelUtils::GetInputDataAddrs(const RuntimeParam &model_param, ConstOpDescPtr op_desc) {
 ///
 Status ModelUtils::GetVarAddr(const RuntimeParam &model_param, const ConstOpDescPtr &op_desc, int64_t offset,
                               uint8_t *&var_addr) {
-  if (ge::VarManager::Instance(model_param.session_id)->GetVarMemType(offset) == RT_MEMORY_RDMA_HBM) {
-    if (offset < 0) {
-      GELOGE(PARAM_INVALID, "rdma var addr is invalid, addr=%p", reinterpret_cast<uint8_t *>(offset));
+  rtMemType_t mem_type = ge::VarManager::Instance(model_param.session_id)->GetVarMemType(offset);
+  switch (mem_type) {
+    case RT_MEMORY_RDMA_HBM:
+      if (offset < 0) {
+        GELOGE(PARAM_INVALID, "rdma var addr is invalid, addr=%p", reinterpret_cast<uint8_t *>(offset));
+        return PARAM_INVALID;
+      }
+      var_addr = reinterpret_cast<uint8_t *>(offset);
+      break;
+    case RT_MEMORY_HBM:
+      VALIDATE_MEM_RANGE(op_desc, model_param.var_size, offset - model_param.logic_var_base);
+      var_addr = model_param.var_base + offset - model_param.logic_var_base;
+      break;
+    default:
+      GELOGE(PARAM_INVALID, "unsupported memory type %u", mem_type);
       return PARAM_INVALID;
-    }
-    var_addr = reinterpret_cast<uint8_t *>(offset);
-    GE_CHECK_NOTNULL(var_addr);
-  } else {
-    VALIDATE_MEM_RANGE(op_desc, model_param.var_size, offset - model_param.logic_var_base);
-    var_addr = model_param.var_base + offset - model_param.logic_var_base;
   }
+  GE_CHECK_NOTNULL(var_addr);
   return SUCCESS;
 }
diff --git a/ge/graph/manager/graph_var_manager.cc b/ge/graph/manager/graph_var_manager.cc
index 928c893f..8a829d47 100755
--- a/ge/graph/manager/graph_var_manager.cc
+++ b/ge/graph/manager/graph_var_manager.cc
@@ -212,7 +212,7 @@ rtMemType_t VarResource::GetVarMemType(const int64_t &offset) {
   if (var_offset_map_.count(offset) > 0) {
     return var_offset_map_[offset];
   }
-  return RT_MEMORY_HBM;
+  return RT_MEMORY_RESERVED;
 }
 
 VarTransRoad *VarResource::GetTransRoad(const std::string &var_name) {
   auto iter = var_to_trans_road_.find(var_name);
@@ -660,7 +660,7 @@ rtMemType_t VarManager::GetVarMemType(const int64_t &offset) {
   std::lock_guard<std::recursive_mutex> lock(mutex_);
   if (var_resource_ == nullptr) {
     GELOGW("VarManager has not been init.");
-    return RT_MEMORY_HBM;
+    return RT_MEMORY_RESERVED;
   }
   return var_resource_->GetVarMemType(offset);
 }
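
A standalone sketch of the addressing contract the three patches introduce, for reference: HBM variables keep the existing scheme (logical address = GetVarMemLogicBase() + pool offset, translated back through var_base), while RT_MEMORY_RDMA_HBM variables skip the logic base in VarResource::SaveVarAddr, so the recorded offset already is the device-visible address and ModelUtils::GetVarAddr / VarManager::GetVarMemoryAddr return it unchanged. The type and member names below (MemType, VarAddressSpace, Save, Resolve) are illustrative, not GE APIs.

#include <cassert>
#include <cstdint>
#include <unordered_map>

enum class MemType { kHbm, kRdmaHbm };

struct VarAddressSpace {
  uint8_t *var_base = nullptr;  // real base of the HBM variable pool (RuntimeParam::var_base)
  int64_t logic_var_base = 0;   // logical base (RuntimeParam::logic_var_base)
  std::unordered_map<int64_t, MemType> var_offset_map;  // stands in for VarResource::var_offset_map_

  // Mirrors VarResource::SaveVarAddr: HBM entries are pool offsets and get the
  // logic base added; RDMA-HBM entries are recorded as-is, because their
  // "offset" already is the device-visible address.
  int64_t Save(int64_t address, MemType type) {
    int64_t logic = address;
    if (type == MemType::kHbm) {
      logic += logic_var_base;
    }
    var_offset_map[logic] = type;
    return logic;
  }

  // Mirrors ModelUtils::GetVarAddr: only HBM offsets are rebased onto var_base;
  // RDMA-HBM offsets map to themselves (cf. VarManager::GetVarMemoryAddr).
  uint8_t *Resolve(int64_t offset) const {
    auto it = var_offset_map.find(offset);
    assert(it != var_offset_map.end());  // unknown offset: the RT_MEMORY_RESERVED case of patch 3/3
    if (it->second == MemType::kRdmaHbm) {
      return reinterpret_cast<uint8_t *>(offset);
    }
    return var_base + (offset - logic_var_base);
  }
};

int main() {
  static uint8_t hbm_pool[64];
  static uint8_t rdma_buf[16];

  VarAddressSpace space;
  space.var_base = hbm_pool;
  space.logic_var_base = 0x1000;

  // HBM variable at pool offset 8: the logical address is rebased on lookup.
  const int64_t hbm_logic = space.Save(8, MemType::kHbm);
  assert(space.Resolve(hbm_logic) == hbm_pool + 8);

  // RDMA-HBM variable: the recorded "offset" is already the real address.
  const int64_t rdma_logic = space.Save(reinterpret_cast<int64_t>(rdma_buf), MemType::kRdmaHbm);
  assert(space.Resolve(rdma_logic) == rdma_buf);
  return 0;
}

In the real code the unknown-offset case does not assert: after patch 3/3, VarResource::GetVarMemType returns RT_MEMORY_RESERVED for an unrecorded offset and ModelUtils::GetVarAddr rejects it with PARAM_INVALID, which is why the unit tests above seed var_offset_map_ before calling GetVarAddr.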