From db7eb5f3568c7a22fa9b8d06c466d3e20478c5c5 Mon Sep 17 00:00:00 2001 From: chenyemeng Date: Tue, 11 May 2021 21:20:17 +0800 Subject: [PATCH] rw lock for GetOrCreateNodeState --- ge/graph/build/stream_allocator.cc | 14 +- ge/graph/build/stream_allocator.h | 5 +- ge/graph/manager/graph_caching_allocator.cc | 1 + ge/graph/manager/host_mem_allocator.cc | 5 +- ge/hybrid/executor/hybrid_execution_context.h | 1 + .../hybrid_model_pipeline_executor.cc | 34 +++- .../executor/hybrid_model_pipeline_executor.h | 1 + ge/hybrid/executor/node_done_manager.cc | 5 + ge/hybrid/executor/node_done_manager.h | 2 + ge/hybrid/executor/subgraph_context.cc | 62 +++++- ge/hybrid/executor/subgraph_context.h | 7 +- ge/hybrid/executor/subgraph_executor.cc | 16 +- ge/hybrid/model/hybrid_model_builder.cc | 18 +- .../node_executor/hccl/hccl_node_executor.cc | 15 +- tests/depends/mmpa/src/mmpa_stub.cc | 84 ++++++++ tests/depends/runtime/src/runtime_stub.cc | 4 +- tests/ut/ge/CMakeLists.txt | 3 + .../graph/build/stream_allocator_unittest.cc | 190 ++++++++++++++++++ .../manager/host_mem_allocator_unittest.cc | 40 ++++ ...hybrid_model_pipeline_executor_unittest.cc | 68 +++++++ .../executor/subgraph_executor_unittest.cc | 17 +- .../model/hybrid_model_builder_unittest.cc | 31 ++- 22 files changed, 589 insertions(+), 34 deletions(-) create mode 100644 tests/ut/ge/graph/build/stream_allocator_unittest.cc create mode 100644 tests/ut/ge/graph/manager/host_mem_allocator_unittest.cc create mode 100644 tests/ut/ge/hybrid/executor/hybrid_model_pipeline_executor_unittest.cc diff --git a/ge/graph/build/stream_allocator.cc b/ge/graph/build/stream_allocator.cc index 0e1a1aba..a5a1112e 100644 --- a/ge/graph/build/stream_allocator.cc +++ b/ge/graph/build/stream_allocator.cc @@ -905,6 +905,7 @@ Status StreamAllocator::SplitStreams(vector> &split_streams) { added_stream_num_vec[stream_id]++; new_stream_id_vec[stream_id] = last_stream_id; split_streams[stream_id].emplace(last_stream_id); + 
split_ori_stream_map_[last_stream_id] = stream_id; node_split_stream_map_[cur_node] = last_stream_id; // Add the send/recv event to the first and last nodes of the split stream. @@ -1104,7 +1105,7 @@ Status StreamAllocator::UpdateActiveStreamsForActiveNode(const vector(new_split_stream)); active_streams.assign(new_active_streams.begin(), new_active_streams.end()); if (!AttrUtils::SetListInt(active_op, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { @@ -1148,13 +1150,21 @@ Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const { } bool StreamAllocator::IsActivated(int64_t stream_id) const { + const auto &iter = split_ori_stream_map_.find(stream_id); + if (iter == split_ori_stream_map_.end()) { + REPORT_INNER_ERROR("E19999", "Find original stream_id failed, split_stream_id=%ld", stream_id); + GELOGE(INTERNAL_ERROR, "[CheckActivated][Check] Find original stream_id failed, split_stream_id=%ld", stream_id); + return false; + } + int64_t ori_stream_id = iter->second; for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) { auto op_desc = node->GetOpDesc(); vector active_streams; if (op_desc == nullptr || !AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) { continue; } - if (std::find(active_streams.begin(), active_streams.end(), stream_id) != active_streams.end()) { + if (std::find(active_streams.begin(), active_streams.end(), stream_id) != active_streams.end() || + std::find(active_streams.begin(), active_streams.end(), ori_stream_id) != active_streams.end()) { return true; } } diff --git a/ge/graph/build/stream_allocator.h b/ge/graph/build/stream_allocator.h index 44dcd673..34b2ec3a 100644 --- a/ge/graph/build/stream_allocator.h +++ b/ge/graph/build/stream_allocator.h @@ -66,7 +66,7 @@ class StreamAllocator { Status UpdateActiveStreamsForSwitchNode(NodePtr &switch_node); Status InsertActiveNodesAfterSwitch(NodePtr &switch_nodes, std::vector &switch_active_nodes); Status 
UpdateActiveStreamsForActiveNode(const std::vector> &split_streams, NodePtr &node); - Status UpdateActiveStreamsForSubgraphs() const; + Status UpdateActiveStreamsForSubgraphs(); bool IsActivated(int64_t stream_id) const; Status SetActiveStreamsForLoop(); Status CheckStreamActived() const; @@ -114,6 +114,7 @@ class StreamAllocator { std::map> specific_activated_streams_nodes_map_; std::map node_split_stream_map_; + std::map split_ori_stream_map_; std::map subgraph_first_active_node_map_; // send events corresponding to the node @@ -123,4 +124,4 @@ class StreamAllocator { std::map> node_to_recv_events_; }; } // namespace ge -#endif // GE_GRAPH_BUILD_STREAM_ALLOCATOR_H_ \ No newline at end of file +#endif // GE_GRAPH_BUILD_STREAM_ALLOCATOR_H_ diff --git a/ge/graph/manager/graph_caching_allocator.cc b/ge/graph/manager/graph_caching_allocator.cc index 75aa5c01..8326038d 100644 --- a/ge/graph/manager/graph_caching_allocator.cc +++ b/ge/graph/manager/graph_caching_allocator.cc @@ -137,6 +137,7 @@ uint8_t *CachingAllocator::Malloc(size_t size, uint8_t *org_ptr, uint32_t device uint8_t *ptr = nullptr; Block *block = FindFreeBlock(size, org_ptr, device_id); if (block == nullptr) { + std::lock_guard lock(mutex_); if (ge::SUCCESS == TryExtendCache(size, device_id)) { block = FindFreeBlock(size, org_ptr, device_id); if (block != nullptr) { diff --git a/ge/graph/manager/host_mem_allocator.cc b/ge/graph/manager/host_mem_allocator.cc index 98f9a313..baf81a60 100644 --- a/ge/graph/manager/host_mem_allocator.cc +++ b/ge/graph/manager/host_mem_allocator.cc @@ -21,7 +21,10 @@ namespace ge { const void *HostMemAllocator::Malloc(const std::shared_ptr &aligned_ptr, size_t size) { if (aligned_ptr == nullptr) { - GELOGW("Insert a null aligned_ptr"); + GELOGW("Insert a null aligned_ptr, size=%zu", size); + if (size == 0) { + allocated_blocks_[nullptr] = { size, nullptr }; + } return nullptr; } GELOGD("allocate existed host memory succ, size=%zu", size); diff --git 
a/ge/hybrid/executor/hybrid_execution_context.h b/ge/hybrid/executor/hybrid_execution_context.h index f2628409..489d6d99 100644 --- a/ge/hybrid/executor/hybrid_execution_context.h +++ b/ge/hybrid/executor/hybrid_execution_context.h @@ -62,6 +62,7 @@ struct GraphExecutionContext { const HybridModel *model = nullptr; const GEThreadLocalContext *ge_context = nullptr; rtStream_t stream = nullptr; + rtStream_t hccl_stream = nullptr; rtContext_t rt_context = nullptr; rtContext_t rt_gen_context = nullptr; std::unique_ptr callback_manager = nullptr; diff --git a/ge/hybrid/executor/hybrid_model_pipeline_executor.cc b/ge/hybrid/executor/hybrid_model_pipeline_executor.cc index ba24d78d..c0bd5c7d 100644 --- a/ge/hybrid/executor/hybrid_model_pipeline_executor.cc +++ b/ge/hybrid/executor/hybrid_model_pipeline_executor.cc @@ -18,14 +18,26 @@ const char *const kEnvProfilingLevel = "HYBRID_PROFILING_LEVEL"; StageExecutor::StageExecutor(int id, HybridModel *model, PipeExecutionConfig *config) : id_(id), model_(model), pipe_config_(config) {} -StageExecutor::~StageExecutor() { GELOGD("~StageExecutor(), id = %d", id_); } +StageExecutor::~StageExecutor() { + GELOGD("~StageExecutor(), id = %d", id_); + if (stream_ != nullptr) { + GE_CHK_RT(rtStreamDestroy(stream_)); + stream_ = nullptr; + } + if (hccl_stream_ != nullptr) { + GE_CHK_RT(rtStreamDestroy(hccl_stream_)); + hccl_stream_ = nullptr; + } +} Status StageExecutor::Init() { GELOGD("[Executor: %d] Start to init StateExecutor", id_); context_.rt_context = pipe_config_->rt_context; GE_CHK_STATUS_RET_NOLOG(InitExecutionContext()); GE_CHK_RT_RET(rtStreamCreate(&stream_, RT_STREAM_PRIORITY_DEFAULT)); + GE_CHK_RT_RET(rtStreamCreate(&hccl_stream_, RT_STREAM_PRIORITY_DEFAULT)); context_.stream = stream_; + context_.hccl_stream = hccl_stream_; root_graph_executor_.reset(new (std::nothrow) SubgraphExecutor(model_->GetRootGraphItem(), &context_)); GE_CHECK_NOTNULL(root_graph_executor_); @@ -78,11 +90,11 @@ Status StageExecutor::Start(const 
std::vector &inputs, const std::v if (task_info.event != nullptr) { GELOGD("[%d] Add StreamWaitEvent", id_); GE_CHK_RT_RET(rtStreamWaitEvent(stream_, task_info.event)); - RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration - 1, + RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] EventWait End", task_info.iteration, task_info.stage); } - RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %lld] [Stage = %d] Start", task_info.iteration, + RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] Start", task_info.iteration, task_info.stage); if (task_info.stage == 0) { @@ -102,6 +114,10 @@ Status StageExecutor::Start(const std::vector &inputs, const std::v StageTask next_task; next_task.stage = task_info.stage; next_task.iteration = task_info.iteration + 1; + if ((task_info.iteration + 1) % iteration_count > 0) { + GE_CHK_RT_RET(rtEventCreate(&next_task.event)); + GE_CHK_RT_RET(rtEventRecord(next_task.event, context_.hccl_stream)); + } auto sync_result = Synchronize(); if (sync_result != SUCCESS) { @@ -110,15 +126,22 @@ Status StageExecutor::Start(const std::vector &inputs, const std::v id_, sync_result, task_info.iteration); REPORT_CALL_ERROR("E19999", "[Executor: %d] Failed to sync result:%d. 
iteration = %ld", id_, sync_result, task_info.iteration); - context_.profiler->Dump(std::cout); + if (context_.profiler != nullptr) { + context_.profiler->Dump(std::cout); + } context_.callback_manager->Destroy(); RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id)); return sync_result; } + if (task_info.event != nullptr) { + GE_CHK_RT_RET(rtEventDestroy(task_info.event)); + RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] EventDestroy End", task_info.iteration, + task_info.stage); + } RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration, task_info.stage); - // if not end stage + // if end stage if (task_info.stage >= pipe_config_->num_stages - 1) { RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] Schedule End", task_info.iteration); GELOGD("[Executor: %d] End of iteration [%ld]", id_, task_info.iteration); @@ -163,6 +186,7 @@ Status StageExecutor::InitExecutionContext() { context_.callback_manager = std::unique_ptr(new (std::nothrow) CallbackManager()); GE_CHECK_NOTNULL(context_.callback_manager); context_.dump_properties = DumpManager::GetInstance().GetDumpProperties(context_.session_id); + context_.is_eos_ = false; if (IsLogEnable(GE_MODULE_NAME, DLOG_DEBUG)) { context_.trace_enabled = true; } diff --git a/ge/hybrid/executor/hybrid_model_pipeline_executor.h b/ge/hybrid/executor/hybrid_model_pipeline_executor.h index cb08d872..c59e1462 100644 --- a/ge/hybrid/executor/hybrid_model_pipeline_executor.h +++ b/ge/hybrid/executor/hybrid_model_pipeline_executor.h @@ -63,6 +63,7 @@ class StageExecutor { StageExecutor *next_executor_ = nullptr; rtStream_t stream_ = nullptr; + rtStream_t hccl_stream_ = nullptr; }; class HybridModelPipelineExecutor { diff --git a/ge/hybrid/executor/node_done_manager.cc b/ge/hybrid/executor/node_done_manager.cc index d31765c2..0ea04661 100644 --- a/ge/hybrid/executor/node_done_manager.cc +++ b/ge/hybrid/executor/node_done_manager.cc @@ 
-121,5 +121,10 @@ void NodeDoneManager::Reset(const NodePtr &node) { GELOGD("[%s] Node reset.", node->GetName().c_str()); } } + +void NodeDoneManager::Reset() { + subjects_.clear(); + destroyed_ = false; +} } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/executor/node_done_manager.h b/ge/hybrid/executor/node_done_manager.h index 292d1369..bedbff3d 100644 --- a/ge/hybrid/executor/node_done_manager.h +++ b/ge/hybrid/executor/node_done_manager.h @@ -35,6 +35,8 @@ class NodeDoneManager { void Destroy(); + void Reset(); + private: class Cond { public: diff --git a/ge/hybrid/executor/subgraph_context.cc b/ge/hybrid/executor/subgraph_context.cc index 9a9a97c2..5de0828f 100644 --- a/ge/hybrid/executor/subgraph_context.cc +++ b/ge/hybrid/executor/subgraph_context.cc @@ -15,8 +15,6 @@ */ #include "subgraph_context.h" - -#include "common/debug/log.h" #include "hybrid/executor/hybrid_model_executor.h" namespace ge { @@ -25,6 +23,13 @@ SubgraphContext::SubgraphContext(const GraphItem *graph_item, const GraphExecuti : graph_item_(graph_item), execution_context_(execution_context) { } +SubgraphContext::~SubgraphContext() { + if (mmRWLockDestroy(&rw_lock_) != EN_OK) { + REPORT_CALL_ERROR("E19999", "Destroy rw_lock failed"); + GELOGE(INTERNAL_ERROR, "[RWLock][Destroy] Destroy rw_lock failed"); + } +} + Status SubgraphContext::Init() { GE_CHECK_NOTNULL(graph_item_); GELOGD("[%s] Start to init subgraph context. 
total inputs = %d, total outputs = %d", @@ -33,7 +38,11 @@ Status SubgraphContext::Init() { graph_item_->TotalOutputs()); all_inputs_.resize(static_cast(graph_item_->TotalInputs())); all_outputs_.resize(static_cast(graph_item_->TotalOutputs())); - + if (mmRWLockInit(&rw_lock_) != EN_OK) { + REPORT_CALL_ERROR("E19999", "Init rw_lock failed"); + GELOGE(INTERNAL_ERROR, "[RWLock][Init] Init rw_lock failed"); + return INTERNAL_ERROR; + } return SUCCESS; } @@ -42,13 +51,48 @@ void SubgraphContext::ResetContext(const NodePtr &node) { } NodeStatePtr SubgraphContext::GetOrCreateNodeState(const NodeItem *node_item) { - std::lock_guard lk(mu_); + GELOGD("[%s] lock for read", node_item->NodeName().c_str()); + if (mmRWLockRDLock(&rw_lock_) != EN_OK) { + REPORT_CALL_ERROR("E19999", "[Node:%s] Lock for read failed", node_item->NodeName().c_str()); + GELOGE(INTERNAL_ERROR, "[RWLock][Lock][Node:%s] Lock for read failed", node_item->NodeName().c_str()); + return nullptr; + } + const auto &iter = node_states_.find(node_item); + if (iter != node_states_.end()) { + auto state = iter->second; + GELOGD("[%s] unlock for read", node_item->NodeName().c_str()); + if (mmRDLockUnLock(&rw_lock_) != EN_OK) { + REPORT_CALL_ERROR("E19999", "[Node:%s] Unlock for read failed", node_item->NodeName().c_str()); + GELOGE(INTERNAL_ERROR, "[RWLock][Unlock][Node:%s] Unlock for read failed", node_item->NodeName().c_str()); + return nullptr; + } + return state; + } + GELOGD("[%s] unlock for read", node_item->NodeName().c_str()); + if (mmRDLockUnLock(&rw_lock_) != EN_OK) { + REPORT_CALL_ERROR("E19999", "[Node:%s] Unlock for read failed", node_item->NodeName().c_str()); + GELOGE(INTERNAL_ERROR, "[RWLock][Unlock][Node:%s] Unlock for read failed", node_item->NodeName().c_str()); + return nullptr; + } + + GELOGD("[%s] lock for write", node_item->NodeName().c_str()); + if (mmRWLockWRLock(&rw_lock_) != EN_OK) { + REPORT_CALL_ERROR("E19999", "[Node:%s] Lock for write failed", node_item->NodeName().c_str()); + 
GELOGE(INTERNAL_ERROR, "[RWLock][Lock][Node:%s] Lock for write failed", node_item->NodeName().c_str()); + return nullptr; + } auto &node_state = node_states_[node_item]; if (node_state == nullptr) { const auto &guard = node_item->MutexGuard("GetOrCreateNodeState"); - node_state.reset(new(std::nothrow)NodeState(*node_item, this)); + node_state = std::move(std::unique_ptr(new(std::nothrow)NodeState(*node_item, this))); (void)guard; } + GELOGD("[%s] unlock for write", node_item->NodeName().c_str()); + if (mmWRLockUnLock(&rw_lock_) != EN_OK) { + REPORT_CALL_ERROR("E19999", "[Node:%s] Unlock for write failed", node_item->NodeName().c_str()); + GELOGE(INTERNAL_ERROR, "[RWLock][Unlock][Node:%s] Unlock for write failed", node_item->NodeName().c_str()); + return nullptr; + } return node_state; } @@ -144,5 +188,13 @@ void SubgraphContext::OnError(Status error) { void SubgraphContext::NodeDone(const NodePtr &node) { node_done_manager_.NodeDone(node); } + +void SubgraphContext::Reset() { + node_done_manager_.Reset(); + if (mmRWLockWRLock(&rw_lock_) == EN_OK) { + node_states_.clear(); + (void)mmWRLockUnLock(&rw_lock_); + } +} } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/executor/subgraph_context.h b/ge/hybrid/executor/subgraph_context.h index ff692ed9..7a99e324 100644 --- a/ge/hybrid/executor/subgraph_context.h +++ b/ge/hybrid/executor/subgraph_context.h @@ -18,7 +18,7 @@ #define GE_HYBRID_EXECUTOR_ITERATION_CONTEXT_H_ #include - +#include "mmpa/mmpa_api.h" #include "hybrid/common/tensor_value.h" #include "hybrid/executor/hybrid_execution_context.h" #include "hybrid/executor/node_state.h" @@ -31,10 +31,11 @@ namespace hybrid { class SubgraphContext { public: explicit SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context); - ~SubgraphContext() = default; + ~SubgraphContext(); Status Init(); void ResetContext(const NodePtr &node); + void Reset(); NodeStatePtr GetOrCreateNodeState(const NodeItem *node_item); void 
OnError(Status error); @@ -52,7 +53,7 @@ class SubgraphContext { friend class TaskContext; const GraphItem *graph_item_; const GraphExecutionContext *execution_context_; - std::mutex mu_; + mmRWLock_t rw_lock_; std::vector all_inputs_; std::vector all_outputs_; NodeDoneManager node_done_manager_; diff --git a/ge/hybrid/executor/subgraph_executor.cc b/ge/hybrid/executor/subgraph_executor.cc index 60895c7e..3536f295 100644 --- a/ge/hybrid/executor/subgraph_executor.cc +++ b/ge/hybrid/executor/subgraph_executor.cc @@ -704,7 +704,21 @@ Status SubgraphExecutor::PartialExecuteAsync(int task_group) { Status SubgraphExecutor::InitForPartialExecution(const vector &inputs, const vector &input_desc) { - return Init(inputs, input_desc); + if (subgraph_context_ == nullptr) { + return Init(inputs, input_desc); + } + subgraph_context_->Reset(); + if (graph_item_->IsDynamic()) { + GE_CHK_STATUS_RET(InitInputsForUnknownShape(inputs, input_desc), + "[%s] Failed to set inputs.", + graph_item_->GetName().c_str()); + } else { + GE_CHK_STATUS_RET(InitInputsForKnownShape(inputs), + "[Invoke][InitInputsForKnownShape][%s] Failed to init subgraph executor for known shape subgraph", + graph_item_->GetName().c_str()); + } + + return SUCCESS; } } // namespace hybrid } // namespace ge diff --git a/ge/hybrid/model/hybrid_model_builder.cc b/ge/hybrid/model/hybrid_model_builder.cc index 86530fe0..91188326 100755 --- a/ge/hybrid/model/hybrid_model_builder.cc +++ b/ge/hybrid/model/hybrid_model_builder.cc @@ -1005,14 +1005,18 @@ Status HybridModelBuilder::InitConstantOps() { // Tensors return by api GetWeights share data with proto, whose addr is not confirmed to be aligned GeTensor aligned_tensor = ge_tensor->Clone(); GELOGD("Init tensor with host constant %s size = %zu", var_name.c_str(), aligned_tensor.MutableData().GetSize()); - if (MemManager::Instance().HostMemInstance(RT_MEMORY_HBM).Malloc(aligned_tensor.GetAlignedPtr(), - aligned_tensor.GetData().size()) == nullptr) { - 
GELOGE(MEMALLOC_FAILED, "[Malloc][HostMemory] for an existed GeTensor failed, model_name_:%s.", - GetGraphName()); - return MEMALLOC_FAILED; + if (aligned_tensor.GetData().size() > 0) { + if (MemManager::Instance().HostMemInstance(RT_MEMORY_HBM).Malloc(aligned_tensor.GetAlignedPtr(), + aligned_tensor.GetData().size()) == nullptr) { + GELOGE(MEMALLOC_FAILED, "[Malloc][HostMemory] for an existed GeTensor failed, model_name_:%s.", + GetGraphName()); + return MEMALLOC_FAILED; + } + var_tensor.reset(new(std::nothrow)TensorValue(aligned_tensor.MutableData().data(), + aligned_tensor.GetData().size())); + } else { + var_tensor.reset(new(std::nothrow)TensorValue(nullptr, 0)); } - var_tensor.reset(new(std::nothrow)TensorValue(aligned_tensor.MutableData().data(), - aligned_tensor.GetData().size())); } else { GE_CHK_STATUS_RET_NOLOG(VarNodeToTensor(var_node, var_tensor)); GELOGD("Init const op tensor. name = %s, size = %ld", var_name.c_str(), var_tensor->GetSize()); diff --git a/ge/hybrid/node_executor/hccl/hccl_node_executor.cc b/ge/hybrid/node_executor/hccl/hccl_node_executor.cc index 20684194..c46d5080 100644 --- a/ge/hybrid/node_executor/hccl/hccl_node_executor.cc +++ b/ge/hybrid/node_executor/hccl/hccl_node_executor.cc @@ -314,21 +314,26 @@ Status RdmaNodeTask::ExecuteAsync(TaskContext &context, std::function do return SUCCESS; } + rtEvent_t evt = nullptr; + if (context.GetExecutionContext()->hccl_stream != nullptr) { + GE_CHK_RT_RET(rtEventCreateWithFlag(&evt, 0x01)); + GE_CHK_RT_RET(rtStreamWaitEvent(context.GetExecutionContext()->hccl_stream, evt)); + } TaskContext *p_ctx = &context; - auto callback = [p_ctx, done_callback](HcclResult status) { + auto callback = [p_ctx, done_callback, evt](HcclResult status) { if (status != HCCL_SUCCESS) { GELOGE(HCCL_E_INTERNAL, "[Call][HcomExcutorInitialize] failed for node:%s(%s), ret: 0x%X", p_ctx->GetNodeName(), p_ctx->GetNodeItem().NodeType().c_str(), status); p_ctx->SetStatus(FAILED); } done_callback(); + if (evt != nullptr) { + 
GE_CHK_RT_RET(rtEventRecord(evt, nullptr)); + GE_CHK_RT_RET(rtEventDestroy(evt)); + } GELOGI("rdma callback success."); }; - std::string executor_type = context.GetNodeItem().NodeType(); - if (kRdmaScatterTypes.count(context.GetNodeItem().NodeType()) > 0) { - executor_type = context.GetNodeItem().NodeType() == HCOMREMOTEREFREAD ? HCOMREMOTEREAD : HCOMREMOTEWRITE; - } HcclResult hccl_ret = HcomExecEnqueueRemoteAccess(context.GetNodeItem().NodeType(), addr_infos, callback); if (hccl_ret != HCCL_SUCCESS) { GELOGE(HCCL_E_INTERNAL, "[Call][HcomExecEnqueueRemoteAccess] failed for node:%s(%s), ret: 0x%X", diff --git a/tests/depends/mmpa/src/mmpa_stub.cc b/tests/depends/mmpa/src/mmpa_stub.cc index 62499ca1..a82621ef 100644 --- a/tests/depends/mmpa/src/mmpa_stub.cc +++ b/tests/depends/mmpa/src/mmpa_stub.cc @@ -242,6 +242,90 @@ INT32 mmRealPath(const CHAR *path, CHAR *realPath, INT32 realPathLen) return ret; } +INT32 mmRWLockInit(mmRWLock_t *rwLock) +{ + if (rwLock == NULL) { + return EN_INVALID_PARAM; + } + + INT32 ret = pthread_rwlock_init(rwLock, NULL); + if (ret != MMPA_ZERO) { + return EN_ERROR; + } + + return EN_OK; +} + +INT32 mmRWLockRDLock(mmRWLock_t *rwLock) +{ + if (rwLock == NULL) { + return EN_INVALID_PARAM; + } + + INT32 ret = pthread_rwlock_rdlock(rwLock); + if (ret != MMPA_ZERO) { + return EN_ERROR; + } + + return EN_OK; +} + +INT32 mmRWLockWRLock(mmRWLock_t *rwLock) +{ + if (rwLock == NULL) { + return EN_INVALID_PARAM; + } + + INT32 ret = pthread_rwlock_wrlock(rwLock); + if (ret != MMPA_ZERO) { + return EN_ERROR; + } + + return EN_OK; +} + +INT32 mmRDLockUnLock(mmRWLock_t *rwLock) +{ + if (rwLock == NULL) { + return EN_INVALID_PARAM; + } + + INT32 ret = pthread_rwlock_unlock(rwLock); + if (ret != MMPA_ZERO) { + return EN_ERROR; + } + + return EN_OK; +} + +INT32 mmWRLockUnLock(mmRWLock_t *rwLock) +{ + if (rwLock == NULL) { + return EN_INVALID_PARAM; + } + + INT32 ret = pthread_rwlock_unlock(rwLock); + if (ret != MMPA_ZERO) { + return EN_ERROR; + } + + return 
EN_OK; +} + +INT32 mmRWLockDestroy(mmRWLock_t *rwLock) +{ + if (rwLock == NULL) { + return EN_INVALID_PARAM; + } + + INT32 ret = pthread_rwlock_destroy(rwLock); + if (ret != MMPA_ZERO) { + return EN_ERROR; + } + + return EN_OK; +} + INT32 mmGetErrorCode() { return 0; diff --git a/tests/depends/runtime/src/runtime_stub.cc b/tests/depends/runtime/src/runtime_stub.cc index 59a98978..2b1af23c 100644 --- a/tests/depends/runtime/src/runtime_stub.cc +++ b/tests/depends/runtime/src/runtime_stub.cc @@ -434,6 +434,8 @@ rtError_t rtGetRtCapability(rtFeatureType_t featureType, int32_t featureInfo, in rtError_t rtGetMaxStreamAndTask(uint32_t streamType, uint32_t *maxStrCount, uint32_t *maxTaskCount) { + *maxStrCount = 1024; + *maxTaskCount = 1024; return RT_ERROR_NONE; } @@ -456,4 +458,4 @@ rtError_t rtDebugUnRegisterForStream(rtStream_t stream) { } #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/tests/ut/ge/CMakeLists.txt b/tests/ut/ge/CMakeLists.txt index 3f272023..4db4c0d1 100755 --- a/tests/ut/ge/CMakeLists.txt +++ b/tests/ut/ge/CMakeLists.txt @@ -782,6 +782,7 @@ set(MULTI_PARTS_TEST_FILES "common/ge_auth_file_saver_unittest.cc" "graph/variable_accelerate_ctrl_unittest.cc" "graph/build/logical_stream_allocator_unittest.cc" + "graph/build/stream_allocator_unittest.cc" "graph/build/model_builder_unittest.cc" "graph/build/mem_assigner_unittest.cc" "graph/build/task_generator_unittest.cc" @@ -790,6 +791,7 @@ set(MULTI_PARTS_TEST_FILES "graph/preprocess/graph_preprocess_unittest.cc" "graph/manager/hcom_util_unittest.cc" "graph/manager/graph_caching_allocator_unittest.cc" + "graph/manager/host_mem_allocator_unittest.cc" "graph/manager/session_scope_mem_allocator_unittest.cc" "graph/manager/run_graph_unittest.cc" "graph/partition/dynamic_shape_partition_unittest.cc" @@ -829,6 +831,7 @@ set(HYBRID_TEST_FILES "hybrid/model/hybrid_model_builder_unittest.cc" "hybrid/node_executor/rts/rts_node_task_unittest.cc" 
"hybrid/executor/hybrid_model_async_executor_unittest.cc" + "hybrid/executor/hybrid_model_pipeline_executor_unittest.cc" "hybrid/node_executor/aicore/aicore_task_compiler_unittest.cc" ) diff --git a/tests/ut/ge/graph/build/stream_allocator_unittest.cc b/tests/ut/ge/graph/build/stream_allocator_unittest.cc new file mode 100644 index 00000000..019e75d1 --- /dev/null +++ b/tests/ut/ge/graph/build/stream_allocator_unittest.cc @@ -0,0 +1,190 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include + +#define protected public +#define private public +#include "graph/build/stream_allocator.h" +#undef protected +#undef private + +#include "graph/debug/ge_attr_define.h" +#include "graph/utils/graph_utils.h" + +namespace ge { +class UtestStreamAllocator : public testing::Test { + protected: + void SetUp() {} + void TearDown() {} + public: + + /// + /// A + /// / \ + /// B C + /// | | + /// D 400 + /// | | + /// | E + /// \ / + /// F + /// + void make_graph_active(const ComputeGraphPtr &graph) { + const auto &a_desc = std::make_shared("A", DATA); + a_desc->AddInputDesc(GeTensorDesc()); + a_desc->AddOutputDesc(GeTensorDesc()); + a_desc->SetStreamId(0); + const auto &a_node = graph->AddNode(a_desc); + + const auto &b_desc = std::make_shared("B", "testa"); + b_desc->AddInputDesc(GeTensorDesc()); + b_desc->AddOutputDesc(GeTensorDesc()); + b_desc->SetStreamId(1); + AttrUtils::SetListStr(b_desc, ATTR_NAME_ACTIVE_LABEL_LIST, {"1"}); + const auto &b_node = graph->AddNode(b_desc); + + const auto &c_desc = std::make_shared("C", "testa"); + c_desc->AddInputDesc(GeTensorDesc()); + c_desc->AddOutputDesc(GeTensorDesc()); + c_desc->SetStreamId(2); + AttrUtils::SetStr(c_desc, ATTR_NAME_STREAM_LABEL, "1"); + const auto &c_node = graph->AddNode(c_desc); + + const auto &d_desc = std::make_shared("D", "testa"); + d_desc->AddInputDesc(GeTensorDesc()); + d_desc->AddOutputDesc(GeTensorDesc()); + d_desc->SetStreamId(1); + const auto &d_node = graph->AddNode(d_desc); + + const auto &e_desc = std::make_shared("E", "testa"); + e_desc->AddInputDesc(GeTensorDesc()); + e_desc->AddOutputDesc(GeTensorDesc()); + e_desc->SetStreamId(2); + const auto &e_node = graph->AddNode(e_desc); + + const auto &f_desc = std::make_shared("F", "testa"); + f_desc->AddInputDesc(GeTensorDesc()); + f_desc->AddInputDesc(GeTensorDesc()); + f_desc->AddOutputDesc(GeTensorDesc()); + f_desc->SetStreamId(2); + const auto &f_node = graph->AddNode(f_desc); + + std::vector node_list(400); 
+ for (int i = 0; i < 400; i++) { + const auto &op_desc = std::make_shared("X", DATA); + op_desc->AddInputDesc(GeTensorDesc()); + op_desc->AddOutputDesc(GeTensorDesc()); + op_desc->SetStreamId(2); + node_list[i] = graph->AddNode(op_desc); + } + + GraphUtils::AddEdge(a_node->GetOutDataAnchor(0), b_node->GetInDataAnchor(0)); + GraphUtils::AddEdge(a_node->GetOutDataAnchor(0), c_node->GetInDataAnchor(0)); + GraphUtils::AddEdge(b_node->GetOutDataAnchor(0), d_node->GetInDataAnchor(0)); + GraphUtils::AddEdge(d_node->GetOutDataAnchor(0), f_node->GetInDataAnchor(0)); + GraphUtils::AddEdge(c_node->GetOutDataAnchor(0), node_list[0]->GetInDataAnchor(0)); + for (uint32_t i = 0; i < 399; i++) { + GraphUtils::AddEdge(node_list[i]->GetOutDataAnchor(0), node_list[i + 1]->GetInDataAnchor(0)); + } + GraphUtils::AddEdge(node_list[399]->GetOutDataAnchor(0), e_node->GetInDataAnchor(0)); + GraphUtils::AddEdge(e_node->GetOutDataAnchor(0), f_node->GetInDataAnchor(1)); + } +}; + +TEST_F(UtestStreamAllocator, test_split_streams_active) { + const auto &graph = std::make_shared("test_split_streams_active_graph"); + EXPECT_NE(graph, nullptr); + make_graph_active(graph); + + StreamAllocator allocator(graph, Graph2SubGraphInfoList()); + allocator.stream_num_ = 3; + EXPECT_EQ(allocator.SetActiveStreamsByLabel(), SUCCESS); + std::vector> split_stream(3); + EXPECT_EQ(allocator.SplitStreams(split_stream), SUCCESS); + EXPECT_EQ(allocator.UpdateActiveStreams(split_stream), SUCCESS); + EXPECT_EQ(allocator.SetActiveStreamsForLoop(), SUCCESS); + EXPECT_EQ(allocator.specific_activated_streams_.count(3), 1); + + const auto &node_b = graph->FindNode("B"); + EXPECT_NE(node_b, nullptr); + std::vector active_stream_list; + EXPECT_TRUE(AttrUtils::GetListInt(node_b->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list)); + EXPECT_EQ(active_stream_list.size(), 2); + const auto &node_e = graph->FindNode("E"); + EXPECT_NE(node_e, nullptr); + EXPECT_EQ(active_stream_list[0], 
node_e->GetOpDesc()->GetStreamId()); + EXPECT_EQ(active_stream_list[1], 3); +} + +TEST_F(UtestStreamAllocator, test_update_active_streams_for_subgraph) { + const auto &root_graph = std::make_shared("test_update_active_streams_for_subgraph_root_graph"); + EXPECT_NE(root_graph, nullptr); + root_graph->SetGraphUnknownFlag(false); + const auto &sub_graph1 = std::make_shared("test_update_active_streams_for_subgraph_sub_graph1"); + EXPECT_NE(sub_graph1, nullptr); + root_graph->AddSubGraph(sub_graph1); + const auto &sub_graph2 = std::make_shared("test_update_active_streams_for_subgraph_sub_graph2"); + EXPECT_NE(sub_graph2, nullptr); + root_graph->AddSubGraph(sub_graph2); + + const auto &case_desc = std::make_shared("case", CASE); + EXPECT_NE(case_desc, nullptr); + EXPECT_EQ(case_desc->AddInputDesc(GeTensorDesc()), GRAPH_SUCCESS); + EXPECT_EQ(case_desc->AddOutputDesc(GeTensorDesc()), GRAPH_SUCCESS); + case_desc->AddSubgraphName("branch1"); + case_desc->SetSubgraphInstanceName(0, "test_update_active_streams_for_subgraph_sub_graph1"); + case_desc->AddSubgraphName("branch2"); + case_desc->SetSubgraphInstanceName(1, "test_update_active_streams_for_subgraph_sub_graph2"); + const auto &case_node = root_graph->AddNode(case_desc); + EXPECT_NE(case_node, nullptr); + sub_graph1->SetParentNode(case_node); + sub_graph2->SetParentNode(case_node); + + const auto &active_desc1 = std::make_shared("active1", STREAMACTIVE); + EXPECT_NE(active_desc1, nullptr); + EXPECT_TRUE(AttrUtils::SetListInt(active_desc1, ATTR_NAME_ACTIVE_STREAM_LIST, {0})); + const auto &active_node1 = sub_graph1->AddNode(active_desc1); + EXPECT_NE(active_node1, nullptr); + + const auto &active_desc2 = std::make_shared("active2", STREAMACTIVE); + EXPECT_NE(active_desc2, nullptr); + EXPECT_TRUE(AttrUtils::SetListInt(active_desc2, ATTR_NAME_ACTIVE_STREAM_LIST, {1})); + const auto &active_node2 = sub_graph2->AddNode(active_desc2); + EXPECT_NE(active_node2, nullptr); + + StreamAllocator allocator(root_graph, 
Graph2SubGraphInfoList()); + allocator.node_split_stream_map_[active_node1] = 2; + allocator.node_split_stream_map_[active_node2] = 3; + allocator.split_ori_stream_map_[2] = 0; + allocator.subgraph_first_active_node_map_[sub_graph1] = active_node1; + allocator.subgraph_first_active_node_map_[sub_graph2] = active_node2; + EXPECT_EQ(allocator.UpdateActiveStreamsForSubgraphs(), SUCCESS); + std::vector active_stream_list1; + EXPECT_TRUE(AttrUtils::GetListInt(active_node1->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list1)); + EXPECT_EQ(active_stream_list1.size(), 1); + EXPECT_EQ(active_stream_list1[0], 0); + std::vector active_stream_list2; + EXPECT_TRUE(AttrUtils::GetListInt(active_node2->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list2)); + EXPECT_EQ(active_stream_list2.size(), 2); + EXPECT_EQ(active_stream_list2[0], 1); + EXPECT_EQ(active_stream_list2[1], 3); + EXPECT_EQ(allocator.specific_activated_streams_.size(), 1); + EXPECT_EQ(allocator.specific_activated_streams_.count(3), 1); +} +} diff --git a/tests/ut/ge/graph/manager/host_mem_allocator_unittest.cc b/tests/ut/ge/graph/manager/host_mem_allocator_unittest.cc new file mode 100644 index 00000000..3d8e4890 --- /dev/null +++ b/tests/ut/ge/graph/manager/host_mem_allocator_unittest.cc @@ -0,0 +1,40 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#include  // NOTE(review): the header names on these two #include lines are missing in this copy - restore from upstream
+#include
+
+#define protected public
+#define private public
+#include "graph/manager/host_mem_allocator.h"  // included with private/protected redefined so the test can reach allocated_blocks_
+#undef protected
+#undef private
+
+namespace ge {
+class UtestHostMemManagerTest : public testing::Test {  // empty fixture: no shared setup or teardown
+ protected:
+  void SetUp() {}
+  void TearDown() {}
+};
+
+TEST_F(UtestHostMemManagerTest, malloc_zero_size) {  // pins Malloc() results and block bookkeeping for sizes 0 and 1
+  HostMemAllocator allocator(RT_MEMORY_HBM);
+  EXPECT_EQ(allocator.allocated_blocks_.size(), 0);  // a fresh allocator tracks no blocks
+  EXPECT_EQ(allocator.Malloc(nullptr, 0), nullptr);
+  EXPECT_EQ(allocator.allocated_blocks_.size(), 1);  // one entry is expected after the size-0 call; reason not visible here - see HostMemAllocator::Malloc
+  EXPECT_EQ(allocator.Malloc(nullptr, 1), nullptr);
+  EXPECT_EQ(allocator.allocated_blocks_.size(), 1);  // the size-1 call is expected to add no further entry
+}
+} // namespace ge
diff --git a/tests/ut/ge/hybrid/executor/hybrid_model_pipeline_executor_unittest.cc b/tests/ut/ge/hybrid/executor/hybrid_model_pipeline_executor_unittest.cc
new file mode 100644
index 00000000..7dc5a191
--- /dev/null
+++ b/tests/ut/ge/hybrid/executor/hybrid_model_pipeline_executor_unittest.cc
@@ -0,0 +1,68 @@
+/**
+ * Copyright 2021 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include  // NOTE(review): the header names on these two #include lines are missing in this copy - restore from upstream
+#include
+
+#define private public
+#define protected public
+#include "hybrid/executor/hybrid_model_pipeline_executor.h"
+#include "graph/ge_context.h"  // NOTE(review): private/protected stay redefined from here on; this file never #undefs them
+
+namespace ge {
+using namespace hybrid;
+
+class UtestStageExecutor : public testing::Test {  // empty fixture: no shared setup or teardown
+ protected:
+  void SetUp() {}
+  void TearDown() { }
+};
+
+TEST_F(UtestStageExecutor, run_success) {  // drives two chained StageExecutors through two iterations of stage 0
+  ComputeGraphPtr graph = std::make_shared<ComputeGraph>("test");
+  GeRootModelPtr ge_root_model = std::make_shared<GeRootModel>(graph);
+  HybridModel hybrid_model(ge_root_model);
+  hybrid_model.root_graph_item_ = std::unique_ptr<GraphItem>(new(std::nothrow)GraphItem());
+
+  PipeExecutionConfig config;
+  config.device_id = 0;
+  config.num_executors = 2;
+  config.num_stages = 1;
+  config.iteration_end = 2;
+  rtCtxGetCurrent(&config.rt_context);
+  StageExecutor executor(0, &hybrid_model, &config);
+  StageExecutor next_executor(1, &hybrid_model, &config);
+  executor.SetNext(&next_executor);
+  ASSERT_EQ(executor.Init(), SUCCESS);  // setup step: stop here rather than run ExecuteAsync/Start after a failed Init
+
+  auto allocator = NpuMemoryAllocator::GetAllocator(config.device_id);
+  EXPECT_NE(allocator, nullptr);
+  StageExecutor::StageTask task_info_1;
+  task_info_1.stage = 0;
+  task_info_1.iteration = 0;
+  ASSERT_EQ(rtEventCreate(&task_info_1.event), RT_ERROR_NONE);  // the task is only submitted with a created event
+  EXPECT_EQ(executor.ExecuteAsync(task_info_1), SUCCESS);
+  EXPECT_EQ(executor.Start({}, {}, 2), SUCCESS);
+
+  StageExecutor::StageTask task_info_2;
+  task_info_2.stage = 0;
+  task_info_2.iteration = 1;
+  ASSERT_EQ(rtEventCreate(&task_info_2.event), RT_ERROR_NONE);
+  EXPECT_EQ(executor.ExecuteAsync(task_info_2), SUCCESS);
+  EXPECT_EQ(executor.Start({}, {}, 2), SUCCESS);
+  executor.Reset();
+}
+} // namespace ge
diff --git a/tests/ut/ge/hybrid/executor/subgraph_executor_unittest.cc b/tests/ut/ge/hybrid/executor/subgraph_executor_unittest.cc
index fbda3776..445382bc 100644
--- a/tests/ut/ge/hybrid/executor/subgraph_executor_unittest.cc
+++ b/tests/ut/ge/hybrid/executor/subgraph_executor_unittest.cc
@@ -264,4 +264,19 @@ TEST_F(UtestSubgraphExecutor,
cond_graph_schedule_tasks) {
   ASSERT_EQ(state_it_f->second->GetSwitchIndex(), 0);
   ASSERT_EQ(graph_context.callback_manager->Destroy(), SUCCESS);
 }
-} // namespace ge
\ No newline at end of file
+
+TEST_F(UtestSubgraphExecutor, partial_execution_init) {  // Init() then InitForPartialExecution() on a static, empty root graph item
+  ComputeGraphPtr graph = std::make_shared<ComputeGraph>("test");
+  ASSERT_NE(graph, nullptr);
+  GeRootModelPtr ge_root_model = std::make_shared<GeRootModel>(graph);
+  ASSERT_NE(ge_root_model, nullptr);
+  HybridModel hybrid_model(ge_root_model);
+  hybrid_model.root_graph_item_ = std::unique_ptr<GraphItem>(new(std::nothrow)GraphItem());
+  hybrid_model.root_graph_item_->is_dynamic_ = false;  // NOTE(review): dereferences the nothrow-new result without a null check
+  GraphExecutionContext graph_context;
+  SubgraphExecutor executor(hybrid_model.GetRootGraphItem(), &graph_context);
+
+  ASSERT_EQ(executor.Init({}, {}), SUCCESS);
+  ASSERT_EQ(executor.InitForPartialExecution({}, {}), SUCCESS);
+}
+} // namespace ge
diff --git a/tests/ut/ge/hybrid/model/hybrid_model_builder_unittest.cc b/tests/ut/ge/hybrid/model/hybrid_model_builder_unittest.cc
index 9630b193..9c9dab17 100644
--- a/tests/ut/ge/hybrid/model/hybrid_model_builder_unittest.cc
+++ b/tests/ut/ge/hybrid/model/hybrid_model_builder_unittest.cc
@@ -27,6 +27,7 @@
 #include "graph/utils/tensor_utils.h"
 #include "graph/utils/graph_utils.h"
 #include "graph/debug/ge_attr_define.h"
+#include "graph/ge_local_context.h"
 
 using namespace std;
 using namespace testing;
@@ -70,6 +71,15 @@ static NodePtr CreateNode(ComputeGraph &graph, const string &name, const string
   return graph.AddNode(op_desc);
 }
 
+static NodePtr CreateConstantNode(const ComputeGraphPtr &graph, const string &name, size_t size) {  // adds a CONSTANTOP node with a weight tensor
+  OpDescPtr op_desc = std::make_shared<OpDesc>(name, CONSTANTOP);
+  op_desc->AddOutputDesc(GeTensorDesc());
+  GeTensorPtr value = std::make_shared<GeTensor>(GeTensorDesc(), size);  // tensor built from a default desc and the requested size
+  (void)AttrUtils::SetTensor(op_desc, ATTR_NAME_WEIGHTS, value);  // result deliberately ignored
+
+  return graph->AddNode(op_desc);
+}
+
 TEST_F(UtestHybridModelBuilder, normal_hybrid_model_build) {
/*******************************************************************************
  * Exit Identify
@@ -230,4 +240,32 @@ TEST_F(UtestHybridModelBuilder, stream_switch_n_group) {
   AttrUtils::SetInt(switch_n->GetOpDesc(), ATTR_NAME_BATCH_NUM, batch_num);
   ASSERT_EQ(hybrid_model_builder.CreateStreamSwitchNGroup(switch_n, &node_item), SUCCESS);
 }
-} // namespace ge
\ No newline at end of file
+
+// Checks that InitConstantOps() succeeds with "ge.exec.placement" set to "HOST" and leaves two
+// entries in variable_tensors_: one for a constant created with size 0, one with size 10.
+TEST_F(UtestHybridModelBuilder, init_constant_op_host_) {
+  ComputeGraphPtr graph = std::make_shared<ComputeGraph>("test");
+  GeRootModelPtr ge_root_model = make_shared<GeRootModel>(graph);
+  HybridModel hybrid_model(ge_root_model);
+  HybridModelBuilder hybrid_model_builder(hybrid_model);
+
+  auto const_1 = CreateConstantNode(graph, "const_1", 0);
+  ASSERT_NE(const_1, nullptr);  // the node is dereferenced on the next line
+  hybrid_model_builder.constant_op_nodes_.emplace(const_1->GetName(), const_1);
+  auto const_2 = CreateConstantNode(graph, "const_2", 10);
+  ASSERT_NE(const_2, nullptr);
+  hybrid_model_builder.constant_op_nodes_.emplace(const_2->GetName(), const_2);
+
+  std::map<std::string, std::string> options;
+  options["ge.exec.placement"] = "HOST";
+  GetThreadLocalContext().SetGraphOption(options);
+
+  EXPECT_EQ(hybrid_model_builder.InitConstantOps(), SUCCESS);
+  EXPECT_EQ(hybrid_model_builder.hybrid_model_.variable_tensors_.size(), 2);
+
+  // Hand back an empty option map so the HOST placement does not leak into tests that run later on
+  // this thread. NOTE(review): assumes SetGraphOption() replaces the stored map and that no options were set before - confirm.
+  options.clear();
+  GetThreadLocalContext().SetGraphOption(options);
+}
+} // namespace ge