Browse Source

!1632 rw lock for GetOrCreateNodeState

From: @chen_yemeng
Reviewed-by: @sheng-nan,@ji_chen
Signed-off-by:
tags/v1.3.0
mindspore-ci-bot Gitee 3 years ago
parent
commit
716129a69a
22 changed files with 589 additions and 34 deletions
  1. +12
    -2
      ge/graph/build/stream_allocator.cc
  2. +3
    -2
      ge/graph/build/stream_allocator.h
  3. +1
    -0
      ge/graph/manager/graph_caching_allocator.cc
  4. +4
    -1
      ge/graph/manager/host_mem_allocator.cc
  5. +1
    -0
      ge/hybrid/executor/hybrid_execution_context.h
  6. +29
    -5
      ge/hybrid/executor/hybrid_model_pipeline_executor.cc
  7. +1
    -0
      ge/hybrid/executor/hybrid_model_pipeline_executor.h
  8. +5
    -0
      ge/hybrid/executor/node_done_manager.cc
  9. +2
    -0
      ge/hybrid/executor/node_done_manager.h
  10. +57
    -5
      ge/hybrid/executor/subgraph_context.cc
  11. +4
    -3
      ge/hybrid/executor/subgraph_context.h
  12. +15
    -1
      ge/hybrid/executor/subgraph_executor.cc
  13. +11
    -7
      ge/hybrid/model/hybrid_model_builder.cc
  14. +10
    -5
      ge/hybrid/node_executor/hccl/hccl_node_executor.cc
  15. +84
    -0
      tests/depends/mmpa/src/mmpa_stub.cc
  16. +3
    -1
      tests/depends/runtime/src/runtime_stub.cc
  17. +3
    -0
      tests/ut/ge/CMakeLists.txt
  18. +190
    -0
      tests/ut/ge/graph/build/stream_allocator_unittest.cc
  19. +40
    -0
      tests/ut/ge/graph/manager/host_mem_allocator_unittest.cc
  20. +68
    -0
      tests/ut/ge/hybrid/executor/hybrid_model_pipeline_executor_unittest.cc
  21. +16
    -1
      tests/ut/ge/hybrid/executor/subgraph_executor_unittest.cc
  22. +30
    -1
      tests/ut/ge/hybrid/model/hybrid_model_builder_unittest.cc

+ 12
- 2
ge/graph/build/stream_allocator.cc View File

@@ -905,6 +905,7 @@ Status StreamAllocator::SplitStreams(vector<set<int64_t>> &split_streams) {
added_stream_num_vec[stream_id]++;
new_stream_id_vec[stream_id] = last_stream_id;
split_streams[stream_id].emplace(last_stream_id);
split_ori_stream_map_[last_stream_id] = stream_id;
node_split_stream_map_[cur_node] = last_stream_id;

// Add the send/recv event to the first and last nodes of the split stream.
@@ -1104,7 +1105,7 @@ Status StreamAllocator::UpdateActiveStreamsForActiveNode(const vector<set<int64_
return SUCCESS;
}

Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const {
Status StreamAllocator::UpdateActiveStreamsForSubgraphs() {
// Update active stream list for active nodes
for (auto &node_stream_pair : node_split_stream_map_) {
auto node = node_stream_pair.first;
@@ -1134,6 +1135,7 @@ Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const {
if (IsActivated(new_split_stream)) {
continue;
}
specific_activated_streams_.emplace(new_split_stream);
new_active_streams.emplace(static_cast<uint32_t>(new_split_stream));
active_streams.assign(new_active_streams.begin(), new_active_streams.end());
if (!AttrUtils::SetListInt(active_op, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
@@ -1148,13 +1150,21 @@ Status StreamAllocator::UpdateActiveStreamsForSubgraphs() const {
}

bool StreamAllocator::IsActivated(int64_t stream_id) const {
const auto &iter = split_ori_stream_map_.find(stream_id);
if (iter == split_ori_stream_map_.end()) {
REPORT_INNER_ERROR("E19999", "Find original stream_id failed, split_stream_id=%ld", stream_id);
GELOGE(INTERNAL_ERROR, "[CheckActivated][Check] Find original stream_id failed, split_stream_id=%ld", stream_id);
return false;
}
int64_t ori_stream_id = iter->second;
for (const auto &node : whole_graph_->GetNodes(whole_graph_->GetGraphUnknownFlag())) {
auto op_desc = node->GetOpDesc();
vector<uint32_t> active_streams;
if (op_desc == nullptr || !AttrUtils::GetListInt(op_desc, ATTR_NAME_ACTIVE_STREAM_LIST, active_streams)) {
continue;
}
if (std::find(active_streams.begin(), active_streams.end(), stream_id) != active_streams.end()) {
if (std::find(active_streams.begin(), active_streams.end(), stream_id) != active_streams.end() ||
std::find(active_streams.begin(), active_streams.end(), ori_stream_id) != active_streams.end()) {
return true;
}
}


+ 3
- 2
ge/graph/build/stream_allocator.h View File

@@ -66,7 +66,7 @@ class StreamAllocator {
Status UpdateActiveStreamsForSwitchNode(NodePtr &switch_node);
Status InsertActiveNodesAfterSwitch(NodePtr &switch_nodes, std::vector<NodePtr> &switch_active_nodes);
Status UpdateActiveStreamsForActiveNode(const std::vector<std::set<int64_t>> &split_streams, NodePtr &node);
Status UpdateActiveStreamsForSubgraphs() const;
Status UpdateActiveStreamsForSubgraphs();
bool IsActivated(int64_t stream_id) const;
Status SetActiveStreamsForLoop();
Status CheckStreamActived() const;
@@ -114,6 +114,7 @@ class StreamAllocator {
std::map<int64_t, std::set<NodePtr>> specific_activated_streams_nodes_map_;

std::map<NodePtr, int64_t> node_split_stream_map_;
std::map<int64_t, int64_t> split_ori_stream_map_;
std::map<ComputeGraphPtr, NodePtr> subgraph_first_active_node_map_;

// send events corresponding to the node
@@ -123,4 +124,4 @@ class StreamAllocator {
std::map<NodePtr, std::vector<uint32_t>> node_to_recv_events_;
};
} // namespace ge
#endif // GE_GRAPH_BUILD_STREAM_ALLOCATOR_H_
#endif // GE_GRAPH_BUILD_STREAM_ALLOCATOR_H_

+ 1
- 0
ge/graph/manager/graph_caching_allocator.cc View File

@@ -137,6 +137,7 @@ uint8_t *CachingAllocator::Malloc(size_t size, uint8_t *org_ptr, uint32_t device
uint8_t *ptr = nullptr;
Block *block = FindFreeBlock(size, org_ptr, device_id);
if (block == nullptr) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
if (ge::SUCCESS == TryExtendCache(size, device_id)) {
block = FindFreeBlock(size, org_ptr, device_id);
if (block != nullptr) {


+ 4
- 1
ge/graph/manager/host_mem_allocator.cc View File

@@ -21,7 +21,10 @@
namespace ge {
const void *HostMemAllocator::Malloc(const std::shared_ptr<AlignedPtr> &aligned_ptr, size_t size) {
if (aligned_ptr == nullptr) {
GELOGW("Insert a null aligned_ptr");
GELOGW("Insert a null aligned_ptr, size=%zu", size);
if (size == 0) {
allocated_blocks_[nullptr] = { size, nullptr };
}
return nullptr;
}
GELOGD("allocate existed host memory succ, size=%zu", size);


+ 1
- 0
ge/hybrid/executor/hybrid_execution_context.h View File

@@ -62,6 +62,7 @@ struct GraphExecutionContext {
const HybridModel *model = nullptr;
const GEThreadLocalContext *ge_context = nullptr;
rtStream_t stream = nullptr;
rtStream_t hccl_stream = nullptr;
rtContext_t rt_context = nullptr;
rtContext_t rt_gen_context = nullptr;
std::unique_ptr<CallbackManager> callback_manager = nullptr;


+ 29
- 5
ge/hybrid/executor/hybrid_model_pipeline_executor.cc View File

@@ -18,14 +18,26 @@ const char *const kEnvProfilingLevel = "HYBRID_PROFILING_LEVEL";
StageExecutor::StageExecutor(int id, HybridModel *model, PipeExecutionConfig *config)
: id_(id), model_(model), pipe_config_(config) {}

StageExecutor::~StageExecutor() { GELOGD("~StageExecutor(), id = %d", id_); }
StageExecutor::~StageExecutor() {
GELOGD("~StageExecutor(), id = %d", id_);
if (stream_ != nullptr) {
GE_CHK_RT(rtStreamDestroy(stream_));
stream_ = nullptr;
}
if (hccl_stream_ != nullptr) {
GE_CHK_RT(rtStreamDestroy(hccl_stream_));
hccl_stream_ = nullptr;
}
}

Status StageExecutor::Init() {
GELOGD("[Executor: %d] Start to init StateExecutor", id_);
context_.rt_context = pipe_config_->rt_context;
GE_CHK_STATUS_RET_NOLOG(InitExecutionContext());
GE_CHK_RT_RET(rtStreamCreate(&stream_, RT_STREAM_PRIORITY_DEFAULT));
GE_CHK_RT_RET(rtStreamCreate(&hccl_stream_, RT_STREAM_PRIORITY_DEFAULT));
context_.stream = stream_;
context_.hccl_stream = hccl_stream_;

root_graph_executor_.reset(new (std::nothrow) SubgraphExecutor(model_->GetRootGraphItem(), &context_));
GE_CHECK_NOTNULL(root_graph_executor_);
@@ -78,11 +90,11 @@ Status StageExecutor::Start(const std::vector<TensorValue> &inputs, const std::v
if (task_info.event != nullptr) {
GELOGD("[%d] Add StreamWaitEvent", id_);
GE_CHK_RT_RET(rtStreamWaitEvent(stream_, task_info.event));
RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration - 1,
RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] EventWait End", task_info.iteration,
task_info.stage);
}

RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %lld] [Stage = %d] Start", task_info.iteration,
RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] Start", task_info.iteration,
task_info.stage);

if (task_info.stage == 0) {
@@ -102,6 +114,10 @@ Status StageExecutor::Start(const std::vector<TensorValue> &inputs, const std::v
StageTask next_task;
next_task.stage = task_info.stage;
next_task.iteration = task_info.iteration + 1;
if ((task_info.iteration + 1) % iteration_count > 0) {
GE_CHK_RT_RET(rtEventCreate(&next_task.event));
GE_CHK_RT_RET(rtEventRecord(next_task.event, context_.hccl_stream));
}

auto sync_result = Synchronize();
if (sync_result != SUCCESS) {
@@ -110,15 +126,22 @@ Status StageExecutor::Start(const std::vector<TensorValue> &inputs, const std::v
id_, sync_result, task_info.iteration);
REPORT_CALL_ERROR("E19999", "[Executor: %d] Failed to sync result:%d. iteration = %ld",
id_, sync_result, task_info.iteration);
context_.profiler->Dump(std::cout);
if (context_.profiler != nullptr) {
context_.profiler->Dump(std::cout);
}
context_.callback_manager->Destroy();
RuntimeInferenceContext::DestroyContext(std::to_string(context_.context_id));
return sync_result;
}
if (task_info.event != nullptr) {
GE_CHK_RT_RET(rtEventDestroy(task_info.event));
RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] EventDestroy End", task_info.iteration,
task_info.stage);
}

RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] [Stage = %d] End", task_info.iteration, task_info.stage);

// if not end stage
// if end stage
if (task_info.stage >= pipe_config_->num_stages - 1) {
RECORD_MODEL_EXECUTION_EVENT(&context_, "[iteration = %ld] Schedule End", task_info.iteration);
GELOGD("[Executor: %d] End of iteration [%ld]", id_, task_info.iteration);
@@ -163,6 +186,7 @@ Status StageExecutor::InitExecutionContext() {
context_.callback_manager = std::unique_ptr<CallbackManager>(new (std::nothrow) CallbackManager());
GE_CHECK_NOTNULL(context_.callback_manager);
context_.dump_properties = DumpManager::GetInstance().GetDumpProperties(context_.session_id);
context_.is_eos_ = false;
if (IsLogEnable(GE_MODULE_NAME, DLOG_DEBUG)) {
context_.trace_enabled = true;
}


+ 1
- 0
ge/hybrid/executor/hybrid_model_pipeline_executor.h View File

@@ -63,6 +63,7 @@ class StageExecutor {
StageExecutor *next_executor_ = nullptr;

rtStream_t stream_ = nullptr;
rtStream_t hccl_stream_ = nullptr;
};

class HybridModelPipelineExecutor {


+ 5
- 0
ge/hybrid/executor/node_done_manager.cc View File

@@ -121,5 +121,10 @@ void NodeDoneManager::Reset(const NodePtr &node) {
GELOGD("[%s] Node reset.", node->GetName().c_str());
}
}

void NodeDoneManager::Reset() {
subjects_.clear();
destroyed_ = false;
}
} // namespace hybrid
} // namespace ge

+ 2
- 0
ge/hybrid/executor/node_done_manager.h View File

@@ -35,6 +35,8 @@ class NodeDoneManager {

void Destroy();

void Reset();

private:
class Cond {
public:


+ 57
- 5
ge/hybrid/executor/subgraph_context.cc View File

@@ -15,8 +15,6 @@
*/

#include "subgraph_context.h"

#include "common/debug/log.h"
#include "hybrid/executor/hybrid_model_executor.h"

namespace ge {
@@ -25,6 +23,13 @@ SubgraphContext::SubgraphContext(const GraphItem *graph_item, const GraphExecuti
: graph_item_(graph_item), execution_context_(execution_context) {
}

SubgraphContext::~SubgraphContext() {
if (mmRWLockDestroy(&rw_lock_) != EN_OK) {
REPORT_CALL_ERROR("E19999", "Destroy rw_lock failed");
GELOGE(INTERNAL_ERROR, "[RWLock][Destroy] Destroy rw_lock failed");
}
}

Status SubgraphContext::Init() {
GE_CHECK_NOTNULL(graph_item_);
GELOGD("[%s] Start to init subgraph context. total inputs = %d, total outputs = %d",
@@ -33,7 +38,11 @@ Status SubgraphContext::Init() {
graph_item_->TotalOutputs());
all_inputs_.resize(static_cast<unsigned long>(graph_item_->TotalInputs()));
all_outputs_.resize(static_cast<unsigned long>(graph_item_->TotalOutputs()));

if (mmRWLockInit(&rw_lock_) != EN_OK) {
REPORT_CALL_ERROR("E19999", "Init rw_lock failed");
GELOGE(INTERNAL_ERROR, "[RWLock][Init] Init rw_lock failed");
return INTERNAL_ERROR;
}
return SUCCESS;
}

@@ -42,13 +51,48 @@ void SubgraphContext::ResetContext(const NodePtr &node) {
}

NodeStatePtr SubgraphContext::GetOrCreateNodeState(const NodeItem *node_item) {
std::lock_guard<std::mutex> lk(mu_);
GELOGD("[%s] lock for read", node_item->NodeName().c_str());
if (mmRWLockRDLock(&rw_lock_) != EN_OK) {
REPORT_CALL_ERROR("E19999", "[Node:%s] Lock for read failed", node_item->NodeName().c_str());
GELOGE(INTERNAL_ERROR, "[RWLock][Lock][Node:%s] Lock for read failed", node_item->NodeName().c_str());
return nullptr;
}
const auto &iter = node_states_.find(node_item);
if (iter != node_states_.end()) {
auto state = iter->second;
GELOGD("[%s] unlock for read", node_item->NodeName().c_str());
if (mmRDLockUnLock(&rw_lock_) != EN_OK) {
REPORT_CALL_ERROR("E19999", "[Node:%s] Unlock for read failed", node_item->NodeName().c_str());
GELOGE(INTERNAL_ERROR, "[RWLock][Unlock][Node:%s] Unlock for read failed", node_item->NodeName().c_str());
return nullptr;
}
return state;
}
GELOGD("[%s] unlock for read", node_item->NodeName().c_str());
if (mmRDLockUnLock(&rw_lock_) != EN_OK) {
REPORT_CALL_ERROR("E19999", "[Node:%s] Unlock for read failed", node_item->NodeName().c_str());
GELOGE(INTERNAL_ERROR, "[RWLock][Unlock][Node:%s] Unlock for read failed", node_item->NodeName().c_str());
return nullptr;
}

GELOGD("[%s] lock for write", node_item->NodeName().c_str());
if (mmRWLockWRLock(&rw_lock_) != EN_OK) {
REPORT_CALL_ERROR("E19999", "[Node:%s] Lock for write failed", node_item->NodeName().c_str());
GELOGE(INTERNAL_ERROR, "[RWLock][Lock][Node:%s] Lock for write failed", node_item->NodeName().c_str());
return nullptr;
}
auto &node_state = node_states_[node_item];
if (node_state == nullptr) {
const auto &guard = node_item->MutexGuard("GetOrCreateNodeState");
node_state.reset(new(std::nothrow)NodeState(*node_item, this));
node_state = std::move(std::unique_ptr<NodeState>(new(std::nothrow)NodeState(*node_item, this)));
(void)guard;
}
GELOGD("[%s] unlock for write", node_item->NodeName().c_str());
if (mmWRLockUnLock(&rw_lock_) != EN_OK) {
REPORT_CALL_ERROR("E19999", "[Node:%s] Unlock for write failed", node_item->NodeName().c_str());
GELOGE(INTERNAL_ERROR, "[RWLock][Unlock][Node:%s] Unlock for write failed", node_item->NodeName().c_str());
return nullptr;
}

return node_state;
}
@@ -144,5 +188,13 @@ void SubgraphContext::OnError(Status error) {
void SubgraphContext::NodeDone(const NodePtr &node) {
node_done_manager_.NodeDone(node);
}

void SubgraphContext::Reset() {
node_done_manager_.Reset();
if (mmRWLockWRLock(&rw_lock_) == EN_OK) {
node_states_.clear();
(void)mmWRLockUnLock(&rw_lock_);
}
}
} // namespace hybrid
} // namespace ge

+ 4
- 3
ge/hybrid/executor/subgraph_context.h View File

@@ -18,7 +18,7 @@
#define GE_HYBRID_EXECUTOR_ITERATION_CONTEXT_H_

#include <vector>
#include "mmpa/mmpa_api.h"
#include "hybrid/common/tensor_value.h"
#include "hybrid/executor/hybrid_execution_context.h"
#include "hybrid/executor/node_state.h"
@@ -31,10 +31,11 @@ namespace hybrid {
class SubgraphContext {
public:
explicit SubgraphContext(const GraphItem *graph_item, const GraphExecutionContext *execution_context);
~SubgraphContext() = default;
~SubgraphContext();

Status Init();
void ResetContext(const NodePtr &node);
void Reset();
NodeStatePtr GetOrCreateNodeState(const NodeItem *node_item);

void OnError(Status error);
@@ -52,7 +53,7 @@ class SubgraphContext {
friend class TaskContext;
const GraphItem *graph_item_;
const GraphExecutionContext *execution_context_;
std::mutex mu_;
mmRWLock_t rw_lock_;
std::vector<TensorValue> all_inputs_;
std::vector<TensorValue> all_outputs_;
NodeDoneManager node_done_manager_;


+ 15
- 1
ge/hybrid/executor/subgraph_executor.cc View File

@@ -704,7 +704,21 @@ Status SubgraphExecutor::PartialExecuteAsync(int task_group) {

Status SubgraphExecutor::InitForPartialExecution(const vector<TensorValue> &inputs,
const vector<ConstGeTensorDescPtr> &input_desc) {
return Init(inputs, input_desc);
if (subgraph_context_ == nullptr) {
return Init(inputs, input_desc);
}
subgraph_context_->Reset();
if (graph_item_->IsDynamic()) {
GE_CHK_STATUS_RET(InitInputsForUnknownShape(inputs, input_desc),
"[%s] Failed to set inputs.",
graph_item_->GetName().c_str());
} else {
GE_CHK_STATUS_RET(InitInputsForKnownShape(inputs),
"[Invoke][InitInputsForKnownShape][%s] Failed to init subgraph executor for known shape subgraph",
graph_item_->GetName().c_str());
}

return SUCCESS;
}
} // namespace hybrid
} // namespace ge

+ 11
- 7
ge/hybrid/model/hybrid_model_builder.cc View File

@@ -1005,14 +1005,18 @@ Status HybridModelBuilder::InitConstantOps() {
// Tensors return by api GetWeights share data with proto, whose addr is not confirmed to be aligned
GeTensor aligned_tensor = ge_tensor->Clone();
GELOGD("Init tensor with host constant %s size = %zu", var_name.c_str(), aligned_tensor.MutableData().GetSize());
if (MemManager::Instance().HostMemInstance(RT_MEMORY_HBM).Malloc(aligned_tensor.GetAlignedPtr(),
aligned_tensor.GetData().size()) == nullptr) {
GELOGE(MEMALLOC_FAILED, "[Malloc][HostMemory] for an existed GeTensor failed, model_name_:%s.",
GetGraphName());
return MEMALLOC_FAILED;
if (aligned_tensor.GetData().size() > 0) {
if (MemManager::Instance().HostMemInstance(RT_MEMORY_HBM).Malloc(aligned_tensor.GetAlignedPtr(),
aligned_tensor.GetData().size()) == nullptr) {
GELOGE(MEMALLOC_FAILED, "[Malloc][HostMemory] for an existed GeTensor failed, model_name_:%s.",
GetGraphName());
return MEMALLOC_FAILED;
}
var_tensor.reset(new(std::nothrow)TensorValue(aligned_tensor.MutableData().data(),
aligned_tensor.GetData().size()));
} else {
var_tensor.reset(new(std::nothrow)TensorValue(nullptr, 0));
}
var_tensor.reset(new(std::nothrow)TensorValue(aligned_tensor.MutableData().data(),
aligned_tensor.GetData().size()));
} else {
GE_CHK_STATUS_RET_NOLOG(VarNodeToTensor(var_node, var_tensor));
GELOGD("Init const op tensor. name = %s, size = %ld", var_name.c_str(), var_tensor->GetSize());


+ 10
- 5
ge/hybrid/node_executor/hccl/hccl_node_executor.cc View File

@@ -314,21 +314,26 @@ Status RdmaNodeTask::ExecuteAsync(TaskContext &context, std::function<void()> do
return SUCCESS;
}

rtEvent_t evt = nullptr;
if (context.GetExecutionContext()->hccl_stream != nullptr) {
GE_CHK_RT_RET(rtEventCreateWithFlag(&evt, 0x01));
GE_CHK_RT_RET(rtStreamWaitEvent(context.GetExecutionContext()->hccl_stream, evt));
}
TaskContext *p_ctx = &context;
auto callback = [p_ctx, done_callback](HcclResult status) {
auto callback = [p_ctx, done_callback, evt](HcclResult status) {
if (status != HCCL_SUCCESS) {
GELOGE(HCCL_E_INTERNAL, "[Call][HcomExcutorInitialize] failed for node:%s(%s), ret: 0x%X",
p_ctx->GetNodeName(), p_ctx->GetNodeItem().NodeType().c_str(), status);
p_ctx->SetStatus(FAILED);
}
done_callback();
if (evt != nullptr) {
GE_CHK_RT_RET(rtEventRecord(evt, nullptr));
GE_CHK_RT_RET(rtEventDestroy(evt));
}
GELOGI("rdma callback success.");
};

std::string executor_type = context.GetNodeItem().NodeType();
if (kRdmaScatterTypes.count(context.GetNodeItem().NodeType()) > 0) {
executor_type = context.GetNodeItem().NodeType() == HCOMREMOTEREFREAD ? HCOMREMOTEREAD : HCOMREMOTEWRITE;
}
HcclResult hccl_ret = HcomExecEnqueueRemoteAccess(context.GetNodeItem().NodeType(), addr_infos, callback);
if (hccl_ret != HCCL_SUCCESS) {
GELOGE(HCCL_E_INTERNAL, "[Call][HcomExecEnqueueRemoteAccess] failed for node:%s(%s), ret: 0x%X",


+ 84
- 0
tests/depends/mmpa/src/mmpa_stub.cc View File

@@ -242,6 +242,90 @@ INT32 mmRealPath(const CHAR *path, CHAR *realPath, INT32 realPathLen)
return ret;
}

INT32 mmRWLockInit(mmRWLock_t *rwLock)
{
if (rwLock == NULL) {
return EN_INVALID_PARAM;
}

INT32 ret = pthread_rwlock_init(rwLock, NULL);
if (ret != MMPA_ZERO) {
return EN_ERROR;
}

return EN_OK;
}

INT32 mmRWLockRDLock(mmRWLock_t *rwLock)
{
if (rwLock == NULL) {
return EN_INVALID_PARAM;
}

INT32 ret = pthread_rwlock_rdlock(rwLock);
if (ret != MMPA_ZERO) {
return EN_ERROR;
}

return EN_OK;
}

INT32 mmRWLockWRLock(mmRWLock_t *rwLock)
{
if (rwLock == NULL) {
return EN_INVALID_PARAM;
}

INT32 ret = pthread_rwlock_wrlock(rwLock);
if (ret != MMPA_ZERO) {
return EN_ERROR;
}

return EN_OK;
}

INT32 mmRDLockUnLock(mmRWLock_t *rwLock)
{
if (rwLock == NULL) {
return EN_INVALID_PARAM;
}

INT32 ret = pthread_rwlock_unlock(rwLock);
if (ret != MMPA_ZERO) {
return EN_ERROR;
}

return EN_OK;
}

INT32 mmWRLockUnLock(mmRWLock_t *rwLock)
{
if (rwLock == NULL) {
return EN_INVALID_PARAM;
}

INT32 ret = pthread_rwlock_unlock(rwLock);
if (ret != MMPA_ZERO) {
return EN_ERROR;
}

return EN_OK;
}

INT32 mmRWLockDestroy(mmRWLock_t *rwLock)
{
if (rwLock == NULL) {
return EN_INVALID_PARAM;
}

INT32 ret = pthread_rwlock_destroy(rwLock);
if (ret != MMPA_ZERO) {
return EN_ERROR;
}

return EN_OK;
}

INT32 mmGetErrorCode()
{
return 0;


+ 3
- 1
tests/depends/runtime/src/runtime_stub.cc View File

@@ -434,6 +434,8 @@ rtError_t rtGetRtCapability(rtFeatureType_t featureType, int32_t featureInfo, in

rtError_t rtGetMaxStreamAndTask(uint32_t streamType, uint32_t *maxStrCount, uint32_t *maxTaskCount)
{
*maxStrCount = 1024;
*maxTaskCount = 1024;
return RT_ERROR_NONE;
}

@@ -456,4 +458,4 @@ rtError_t rtDebugUnRegisterForStream(rtStream_t stream) {
}
#ifdef __cplusplus
}
#endif
#endif

+ 3
- 0
tests/ut/ge/CMakeLists.txt View File

@@ -784,6 +784,7 @@ set(MULTI_PARTS_TEST_FILES
"common/ge_auth_file_saver_unittest.cc"
"graph/variable_accelerate_ctrl_unittest.cc"
"graph/build/logical_stream_allocator_unittest.cc"
"graph/build/stream_allocator_unittest.cc"
"graph/build/model_builder_unittest.cc"
"graph/build/mem_assigner_unittest.cc"
"graph/build/task_generator_unittest.cc"
@@ -792,6 +793,7 @@ set(MULTI_PARTS_TEST_FILES
"graph/preprocess/graph_preprocess_unittest.cc"
"graph/manager/hcom_util_unittest.cc"
"graph/manager/graph_caching_allocator_unittest.cc"
"graph/manager/host_mem_allocator_unittest.cc"
"graph/manager/session_scope_mem_allocator_unittest.cc"
"graph/manager/run_graph_unittest.cc"
"graph/partition/dynamic_shape_partition_unittest.cc"
@@ -831,6 +833,7 @@ set(HYBRID_TEST_FILES
"hybrid/model/hybrid_model_builder_unittest.cc"
"hybrid/node_executor/rts/rts_node_task_unittest.cc"
"hybrid/executor/hybrid_model_async_executor_unittest.cc"
"hybrid/executor/hybrid_model_pipeline_executor_unittest.cc"
"hybrid/node_executor/aicore/aicore_task_compiler_unittest.cc"
)



+ 190
- 0
tests/ut/ge/graph/build/stream_allocator_unittest.cc View File

@@ -0,0 +1,190 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <string>
#include <vector>
#include <gtest/gtest.h>

#define protected public
#define private public
#include "graph/build/stream_allocator.h"
#undef protected
#undef private

#include "graph/debug/ge_attr_define.h"
#include "graph/utils/graph_utils.h"

namespace ge {
class UtestStreamAllocator : public testing::Test {
protected:
void SetUp() {}
void TearDown() {}
public:

///
/// A
/// / \
/// B C
/// | |
/// D 400
/// | |
/// | E
/// \ /
/// F
///
void make_graph_active(const ComputeGraphPtr &graph) {
const auto &a_desc = std::make_shared<OpDesc>("A", DATA);
a_desc->AddInputDesc(GeTensorDesc());
a_desc->AddOutputDesc(GeTensorDesc());
a_desc->SetStreamId(0);
const auto &a_node = graph->AddNode(a_desc);

const auto &b_desc = std::make_shared<OpDesc>("B", "testa");
b_desc->AddInputDesc(GeTensorDesc());
b_desc->AddOutputDesc(GeTensorDesc());
b_desc->SetStreamId(1);
AttrUtils::SetListStr(b_desc, ATTR_NAME_ACTIVE_LABEL_LIST, {"1"});
const auto &b_node = graph->AddNode(b_desc);

const auto &c_desc = std::make_shared<OpDesc>("C", "testa");
c_desc->AddInputDesc(GeTensorDesc());
c_desc->AddOutputDesc(GeTensorDesc());
c_desc->SetStreamId(2);
AttrUtils::SetStr(c_desc, ATTR_NAME_STREAM_LABEL, "1");
const auto &c_node = graph->AddNode(c_desc);

const auto &d_desc = std::make_shared<OpDesc>("D", "testa");
d_desc->AddInputDesc(GeTensorDesc());
d_desc->AddOutputDesc(GeTensorDesc());
d_desc->SetStreamId(1);
const auto &d_node = graph->AddNode(d_desc);

const auto &e_desc = std::make_shared<OpDesc>("E", "testa");
e_desc->AddInputDesc(GeTensorDesc());
e_desc->AddOutputDesc(GeTensorDesc());
e_desc->SetStreamId(2);
const auto &e_node = graph->AddNode(e_desc);

const auto &f_desc = std::make_shared<OpDesc>("F", "testa");
f_desc->AddInputDesc(GeTensorDesc());
f_desc->AddInputDesc(GeTensorDesc());
f_desc->AddOutputDesc(GeTensorDesc());
f_desc->SetStreamId(2);
const auto &f_node = graph->AddNode(f_desc);

std::vector<NodePtr> node_list(400);
for (int i = 0; i < 400; i++) {
const auto &op_desc = std::make_shared<OpDesc>("X", DATA);
op_desc->AddInputDesc(GeTensorDesc());
op_desc->AddOutputDesc(GeTensorDesc());
op_desc->SetStreamId(2);
node_list[i] = graph->AddNode(op_desc);
}

GraphUtils::AddEdge(a_node->GetOutDataAnchor(0), b_node->GetInDataAnchor(0));
GraphUtils::AddEdge(a_node->GetOutDataAnchor(0), c_node->GetInDataAnchor(0));
GraphUtils::AddEdge(b_node->GetOutDataAnchor(0), d_node->GetInDataAnchor(0));
GraphUtils::AddEdge(d_node->GetOutDataAnchor(0), f_node->GetInDataAnchor(0));
GraphUtils::AddEdge(c_node->GetOutDataAnchor(0), node_list[0]->GetInDataAnchor(0));
for (uint32_t i = 0; i < 399; i++) {
GraphUtils::AddEdge(node_list[i]->GetOutDataAnchor(0), node_list[i + 1]->GetInDataAnchor(0));
}
GraphUtils::AddEdge(node_list[399]->GetOutDataAnchor(0), e_node->GetInDataAnchor(0));
GraphUtils::AddEdge(e_node->GetOutDataAnchor(0), f_node->GetInDataAnchor(1));
}
};

TEST_F(UtestStreamAllocator, test_split_streams_active) {
const auto &graph = std::make_shared<ComputeGraph>("test_split_streams_active_graph");
EXPECT_NE(graph, nullptr);
make_graph_active(graph);

StreamAllocator allocator(graph, Graph2SubGraphInfoList());
allocator.stream_num_ = 3;
EXPECT_EQ(allocator.SetActiveStreamsByLabel(), SUCCESS);
std::vector<std::set<int64_t>> split_stream(3);
EXPECT_EQ(allocator.SplitStreams(split_stream), SUCCESS);
EXPECT_EQ(allocator.UpdateActiveStreams(split_stream), SUCCESS);
EXPECT_EQ(allocator.SetActiveStreamsForLoop(), SUCCESS);
EXPECT_EQ(allocator.specific_activated_streams_.count(3), 1);

const auto &node_b = graph->FindNode("B");
EXPECT_NE(node_b, nullptr);
std::vector<uint32_t> active_stream_list;
EXPECT_TRUE(AttrUtils::GetListInt(node_b->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list));
EXPECT_EQ(active_stream_list.size(), 2);
const auto &node_e = graph->FindNode("E");
EXPECT_NE(node_e, nullptr);
EXPECT_EQ(active_stream_list[0], node_e->GetOpDesc()->GetStreamId());
EXPECT_EQ(active_stream_list[1], 3);
}

TEST_F(UtestStreamAllocator, test_update_active_streams_for_subgraph) {
const auto &root_graph = std::make_shared<ComputeGraph>("test_update_active_streams_for_subgraph_root_graph");
EXPECT_NE(root_graph, nullptr);
root_graph->SetGraphUnknownFlag(false);
const auto &sub_graph1 = std::make_shared<ComputeGraph>("test_update_active_streams_for_subgraph_sub_graph1");
EXPECT_NE(sub_graph1, nullptr);
root_graph->AddSubGraph(sub_graph1);
const auto &sub_graph2 = std::make_shared<ComputeGraph>("test_update_active_streams_for_subgraph_sub_graph2");
EXPECT_NE(sub_graph2, nullptr);
root_graph->AddSubGraph(sub_graph2);

const auto &case_desc = std::make_shared<OpDesc>("case", CASE);
EXPECT_NE(case_desc, nullptr);
EXPECT_EQ(case_desc->AddInputDesc(GeTensorDesc()), GRAPH_SUCCESS);
EXPECT_EQ(case_desc->AddOutputDesc(GeTensorDesc()), GRAPH_SUCCESS);
case_desc->AddSubgraphName("branch1");
case_desc->SetSubgraphInstanceName(0, "test_update_active_streams_for_subgraph_sub_graph1");
case_desc->AddSubgraphName("branch2");
case_desc->SetSubgraphInstanceName(1, "test_update_active_streams_for_subgraph_sub_graph2");
const auto &case_node = root_graph->AddNode(case_desc);
EXPECT_NE(case_node, nullptr);
sub_graph1->SetParentNode(case_node);
sub_graph2->SetParentNode(case_node);

const auto &active_desc1 = std::make_shared<OpDesc>("active1", STREAMACTIVE);
EXPECT_NE(active_desc1, nullptr);
EXPECT_TRUE(AttrUtils::SetListInt(active_desc1, ATTR_NAME_ACTIVE_STREAM_LIST, {0}));
const auto &active_node1 = sub_graph1->AddNode(active_desc1);
EXPECT_NE(active_node1, nullptr);

const auto &active_desc2 = std::make_shared<OpDesc>("active2", STREAMACTIVE);
EXPECT_NE(active_desc2, nullptr);
EXPECT_TRUE(AttrUtils::SetListInt(active_desc2, ATTR_NAME_ACTIVE_STREAM_LIST, {1}));
const auto &active_node2 = sub_graph2->AddNode(active_desc2);
EXPECT_NE(active_node2, nullptr);

StreamAllocator allocator(root_graph, Graph2SubGraphInfoList());
allocator.node_split_stream_map_[active_node1] = 2;
allocator.node_split_stream_map_[active_node2] = 3;
allocator.split_ori_stream_map_[2] = 0;
allocator.subgraph_first_active_node_map_[sub_graph1] = active_node1;
allocator.subgraph_first_active_node_map_[sub_graph2] = active_node2;
EXPECT_EQ(allocator.UpdateActiveStreamsForSubgraphs(), SUCCESS);
std::vector<uint32_t> active_stream_list1;
EXPECT_TRUE(AttrUtils::GetListInt(active_node1->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list1));
EXPECT_EQ(active_stream_list1.size(), 1);
EXPECT_EQ(active_stream_list1[0], 0);
std::vector<uint32_t> active_stream_list2;
EXPECT_TRUE(AttrUtils::GetListInt(active_node2->GetOpDesc(), ATTR_NAME_ACTIVE_STREAM_LIST, active_stream_list2));
EXPECT_EQ(active_stream_list2.size(), 2);
EXPECT_EQ(active_stream_list2[0], 1);
EXPECT_EQ(active_stream_list2[1], 3);
EXPECT_EQ(allocator.specific_activated_streams_.size(), 1);
EXPECT_EQ(allocator.specific_activated_streams_.count(3), 1);
}
}

+ 40
- 0
tests/ut/ge/graph/manager/host_mem_allocator_unittest.cc View File

@@ -0,0 +1,40 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include <memory>

#define protected public
#define private public
#include "graph/manager/host_mem_allocator.h"
#undef protected
#undef private

namespace ge {
class UtestHostMemManagerTest : public testing::Test {
protected:
void SetUp() {}
void TearDown() {}
};

TEST_F(UtestHostMemManagerTest, malloc_zero_size) {
HostMemAllocator allocator(RT_MEMORY_HBM);
EXPECT_EQ(allocator.allocated_blocks_.size(), 0);
EXPECT_EQ(allocator.Malloc(nullptr, 0), nullptr);
EXPECT_EQ(allocator.allocated_blocks_.size(), 1);
EXPECT_EQ(allocator.Malloc(nullptr, 1), nullptr);
EXPECT_EQ(allocator.allocated_blocks_.size(), 1);
}
} // namespace ge

+ 68
- 0
tests/ut/ge/hybrid/executor/hybrid_model_pipeline_executor_unittest.cc View File

@@ -0,0 +1,68 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>
#include <gmock/gmock.h>

#define private public
#define protected public
#include "hybrid/executor/hybrid_model_pipeline_executor.h"
#include "graph/ge_context.h"

namespace ge {
using namespace hybrid;

class UtestStageExecutor : public testing::Test {
protected:
void SetUp() {}
void TearDown() { }
};

TEST_F(UtestStageExecutor, run_success) {
ComputeGraphPtr graph = std::make_shared<ComputeGraph>("test");
GeRootModelPtr ge_root_model = std::make_shared<GeRootModel>(graph);
HybridModel hybrid_model(ge_root_model);
hybrid_model.root_graph_item_ = std::unique_ptr<GraphItem>(new(std::nothrow)GraphItem());

PipeExecutionConfig config;
config.device_id = 0;
config.num_executors = 2;
config.num_stages = 1;
config.iteration_end = 2;
rtCtxGetCurrent(&config.rt_context);
StageExecutor executor(0, &hybrid_model, &config);
StageExecutor next_executor(1, &hybrid_model, &config);
executor.SetNext(&next_executor);
EXPECT_EQ(executor.Init(), SUCCESS);

auto allocator = NpuMemoryAllocator::GetAllocator(config.device_id);
EXPECT_NE(allocator, nullptr);
StageExecutor::StageTask task_info_1;
task_info_1.stage = 0;
task_info_1.iteration = 0;
EXPECT_EQ(rtEventCreate(&task_info_1.event), RT_ERROR_NONE);
EXPECT_EQ(executor.ExecuteAsync(task_info_1), SUCCESS);
EXPECT_EQ(executor.Start({}, {}, 2), SUCCESS);

StageExecutor::StageTask task_info_2;
task_info_2.stage = 0;
task_info_2.iteration = 1;
EXPECT_EQ(rtEventCreate(&task_info_2.event), RT_ERROR_NONE);
EXPECT_EQ(executor.ExecuteAsync(task_info_2), SUCCESS);
EXPECT_EQ(executor.Start({}, {}, 2), SUCCESS);
executor.Reset();
}
} // namespace ge

+ 16
- 1
tests/ut/ge/hybrid/executor/subgraph_executor_unittest.cc View File

@@ -264,4 +264,19 @@ TEST_F(UtestSubgraphExecutor, cond_graph_schedule_tasks) {
ASSERT_EQ(state_it_f->second->GetSwitchIndex(), 0);
ASSERT_EQ(graph_context.callback_manager->Destroy(), SUCCESS);
}
} // namespace ge

TEST_F(UtestSubgraphExecutor, partial_execution_init) {
ComputeGraphPtr graph = std::make_shared<ComputeGraph>("test");
ASSERT_NE(graph, nullptr);
GeRootModelPtr ge_root_model = std::make_shared<GeRootModel>(graph);
ASSERT_NE(ge_root_model, nullptr);
HybridModel hybrid_model(ge_root_model);
hybrid_model.root_graph_item_ = std::unique_ptr<GraphItem>(new(std::nothrow)GraphItem());
hybrid_model.root_graph_item_->is_dynamic_ = false;
GraphExecutionContext graph_context;
SubgraphExecutor executor(hybrid_model.GetRootGraphItem(), &graph_context);

ASSERT_EQ(executor.Init({}, {}), SUCCESS);
ASSERT_EQ(executor.InitForPartialExecution({}, {}), SUCCESS);
}
} // namespace ge

+ 30
- 1
tests/ut/ge/hybrid/model/hybrid_model_builder_unittest.cc View File

@@ -27,6 +27,7 @@
#include "graph/utils/tensor_utils.h"
#include "graph/utils/graph_utils.h"
#include "graph/debug/ge_attr_define.h"
#include "graph/ge_local_context.h"

using namespace std;
using namespace testing;
@@ -70,6 +71,15 @@ static NodePtr CreateNode(ComputeGraph &graph, const string &name, const string
return graph.AddNode(op_desc);
}

static NodePtr CreateConstantNode(const ComputeGraphPtr &graph, const string &name, size_t size) {
OpDescPtr op_desc = std::make_shared<OpDesc>(name, CONSTANTOP);
op_desc->AddOutputDesc(GeTensorDesc());
GeTensorPtr value = std::make_shared<GeTensor>(GeTensorDesc(), size);
(void)AttrUtils::SetTensor(op_desc, ATTR_NAME_WEIGHTS, value);

return graph->AddNode(op_desc);
}

TEST_F(UtestHybridModelBuilder, normal_hybrid_model_build) {
/*******************************************************************************
* Exit Identify
@@ -230,4 +240,23 @@ TEST_F(UtestHybridModelBuilder, stream_switch_n_group) {
AttrUtils::SetInt(switch_n->GetOpDesc(), ATTR_NAME_BATCH_NUM, batch_num);
ASSERT_EQ(hybrid_model_builder.CreateStreamSwitchNGroup(switch_n, &node_item), SUCCESS);
}
} // namespace ge

TEST_F(UtestHybridModelBuilder, init_constant_op_host_) {
ComputeGraphPtr graph = std::make_shared<ComputeGraph>("test");
GeRootModelPtr ge_root_model = make_shared<GeRootModel>(graph);
HybridModel hybrid_model(ge_root_model);
HybridModelBuilder hybrid_model_builder(hybrid_model);

auto const_1 = CreateConstantNode(graph, "const_1", 0);
hybrid_model_builder.constant_op_nodes_.emplace(const_1->GetName(), const_1);
auto const_2 = CreateConstantNode(graph, "const_2", 10);
hybrid_model_builder.constant_op_nodes_.emplace(const_2->GetName(), const_2);

std::map<std::string, string> options;
options["ge.exec.placement"] = "HOST";
GetThreadLocalContext().SetGraphOption(options);

EXPECT_EQ(hybrid_model_builder.InitConstantOps(), SUCCESS);
EXPECT_EQ(hybrid_model_builder.hybrid_model_.variable_tensors_.size(), 2);
}
} // namespace ge

Loading…
Cancel
Save