diff --git a/ge/CMakeLists.txt b/ge/CMakeLists.txt index 6f54cf7b..fb5b2ef6 100755 --- a/ge/CMakeLists.txt +++ b/ge/CMakeLists.txt @@ -380,6 +380,7 @@ set(TRAIN_SRC_LIST "single_op/task/tbe_task_builder.cc" "single_op/task/aicpu_task_builder.cc" "single_op/task/aicpu_kernel_task_builder.cc" + "single_op/task/rts_kernel_task_builder.cc" "hybrid/common/tensor_value.cc" "hybrid/common/npu_memory_allocator.cc" "hybrid/executor/rt_callback_manager.cc" diff --git a/ge/executor/CMakeLists.txt b/ge/executor/CMakeLists.txt index d7da8916..e188086d 100644 --- a/ge/executor/CMakeLists.txt +++ b/ge/executor/CMakeLists.txt @@ -65,6 +65,7 @@ set(SRC_LIST "../single_op/task/tbe_task_builder.cc" "../single_op/task/aicpu_task_builder.cc" "../single_op/task/aicpu_kernel_task_builder.cc" + "../single_op/task/rts_kernel_task_builder.cc" "../hybrid/common/tensor_value.cc" "../hybrid/common/npu_memory_allocator.cc" "../hybrid/executor/rt_callback_manager.cc" diff --git a/ge/single_op/single_op_model.cc b/ge/single_op/single_op_model.cc index 5b5f24a2..67642f2e 100755 --- a/ge/single_op/single_op_model.cc +++ b/ge/single_op/single_op_model.cc @@ -30,6 +30,7 @@ #include "runtime/rt.h" #include "task/aicpu_task_builder.h" #include "task/aicpu_kernel_task_builder.h" +#include "task/rts_kernel_task_builder.h" #include "task/tbe_task_builder.h" #include "hybrid/executor/hybrid_model_executor.h" #include "hybrid/node_executor/node_executor.h" @@ -266,7 +267,9 @@ Status SingleOpModel::ParseInputsAndOutputs() { for (auto &op_desc : data_ops_) { GE_CHK_STATUS_RET_NOLOG(ParseInputNode(op_desc)); } - ParseOutputNode(netoutput_op_); + if (netoutput_op_ != nullptr) { + ParseOutputNode(netoutput_op_); + } return SUCCESS; } @@ -323,10 +326,7 @@ Status SingleOpModel::BuildTaskList(StreamResource *stream_resource, SingleOp &s OpTask *task = nullptr; uint64_t singleop_kernel_id = aicpu_kernel_id++; GELOGI("Build singleOp CCTask, kernel_id = %lu", singleop_kernel_id); - auto ret = BuildCpuKernelTask(task_def.kernel(), &task, singleop_kernel_id); - if (ret != SUCCESS) { - return ret; - } + GE_CHK_STATUS_RET_NOLOG(BuildCpuKernelTask(task_def.kernel(), &task, singleop_kernel_id)); task->SetModelArgs(model_name_, model_id_); ParseArgTable(task, single_op); single_op.tasks_.emplace_back(task); @@ -345,13 +345,22 @@ Status SingleOpModel::BuildTaskList(StreamResource *stream_resource, SingleOp &s bool depend_compute_flag = false; uint64_t singleop_kernel_id = aicpu_kernel_id++; GELOGI("Build singleOp TfTask, kernel_id = %lu", singleop_kernel_id); - auto ret = BuildKernelExTask(task_def.kernel_ex(), &aicpu_task, false, depend_compute_flag, singleop_kernel_id); - if (ret != SUCCESS) { - return ret; - } + GE_CHK_STATUS_RET_NOLOG( + BuildKernelExTask(task_def.kernel_ex(), &aicpu_task, false, depend_compute_flag, singleop_kernel_id)); aicpu_task->SetModelArgs(model_name_, model_id_); ParseArgTable(aicpu_task, single_op); single_op.tasks_.emplace_back(aicpu_task); + } else if ((task_type == RT_MODEL_TASK_MEMCPY_ASYNC) || (task_type == RT_MODEL_TASK_MEMCPY_ADDR_ASYNC)) { + auto kernel_def = task_def.memcpy_async(); + auto node = op_list_[kernel_def.op_index()]; + GE_CHECK_NOTNULL(node); + auto op_desc = node->GetOpDesc(); + GE_CHECK_NOTNULL(op_desc); + std::unique_ptr task; + GE_CHK_STATUS_RET_NOLOG(RtsKernelTaskBuilder::BuildMemcpyAsyncTask(op_desc, kernel_def, model_params_, task)); + task->SetModelArgs(model_name_, model_id_); + ParseArgTable(task.get(), single_op); + single_op.tasks_.emplace_back(task.release()); } else { // skip GELOGD("Skip task type: %d", static_cast(task_type)); diff --git a/ge/single_op/single_op_model.h b/ge/single_op/single_op_model.h index e7d07ee0..529a442d 100755 --- a/ge/single_op/single_op_model.h +++ b/ge/single_op/single_op_model.h @@ -26,6 +26,7 @@ #include "common/helper/model_helper.h" #include "single_op/single_op.h" #include "single_op/stream_resource.h" +#include "single_op/task/op_task.h" namespace ge { struct SingleOpModelParam { diff --git a/ge/single_op/task/op_task.cc b/ge/single_op/task/op_task.cc index fbc3d68b..e48677f8 100755 --- a/ge/single_op/task/op_task.cc +++ b/ge/single_op/task/op_task.cc @@ -34,6 +34,7 @@ namespace ge { namespace { constexpr int kLaunchRetryTimes = 1000; +constexpr size_t kMemcpyArgCount = 2; constexpr int kSleepTime = 10; constexpr uint64_t kReleaseFlag = 1; constexpr int kCopyNum = 2; @@ -963,4 +964,17 @@ void AiCpuCCTask::GetIoAddr(uintptr_t *&arg_base, size_t &arg_count) { arg_base = io_addr_; arg_count = io_addr_num_; } + +Status MemcpyAsyncTask::LaunchKernel(rtStream_t stream) { + auto src_addr = reinterpret_cast(addresses_[0]); + auto dst_addr = reinterpret_cast(addresses_[1]); + kind_ = (kind_ == RT_MEMCPY_ADDR_DEVICE_TO_DEVICE) ? RT_MEMCPY_DEVICE_TO_DEVICE : kind_; + GE_CHK_RT_RET(rtMemcpyAsync(dst_addr, dst_max_, src_addr, count_, kind_, stream)); + return SUCCESS; +} + +void MemcpyAsyncTask::GetIoAddr(uintptr_t *&arg_base, size_t &arg_count) { + arg_base = addresses_; + arg_count = kMemcpyArgCount; +} } // namespace ge diff --git a/ge/single_op/task/op_task.h b/ge/single_op/task/op_task.h index 0c64ecb4..ed6cf40f 100644 --- a/ge/single_op/task/op_task.h +++ b/ge/single_op/task/op_task.h @@ -44,6 +44,9 @@ class OpTask { virtual Status UpdateArgTable(const SingleOpModelParam ¶m); void SetModelArgs(std::string model_name, uint32_t model_id); Status GetProfilingArgs(TaskDescInfo &task_desc_info, uint32_t &model_id); + void SetOpDesc(const OpDescPtr &op_desc) { + op_desc_ = op_desc; + } const OpDescPtr &GetOpdesc() const {return op_desc_;} Status OpenDump(rtStream_t stream); virtual void GetIoAddr(uintptr_t *&arg_base, size_t &arg_count) = 0; @@ -244,6 +247,22 @@ private: std::string op_type_; uint64_t kernel_id_ = 0; }; + +class MemcpyAsyncTask : public OpTask { + public: + Status LaunchKernel(rtStream_t stream) override; + void GetIoAddr(uintptr_t *&arg_base, size_t &arg_count) override; + + private: + friend class SingleOpModel; + friend class RtsKernelTaskBuilder; + + uintptr_t addresses_[2]; + size_t dst_max_; + size_t count_; + rtMemcpyKind_t kind_; + NodePtr node_; +}; } // namespace ge #endif // GE_SINGLE_OP_TASK_OP_TASK_H_ diff --git a/ge/single_op/task/rts_kernel_task_builder.cc b/ge/single_op/task/rts_kernel_task_builder.cc new file mode 100644 index 00000000..aad78fd9 --- /dev/null +++ b/ge/single_op/task/rts_kernel_task_builder.cc @@ -0,0 +1,45 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "single_op/task/rts_kernel_task_builder.h" +#include "build_task_utils.h" + +namespace ge { +namespace { +const size_t kNumAddresses = 2; +} // namespace + +Status RtsKernelTaskBuilder::BuildMemcpyAsyncTask(const OpDescPtr &op_desc, + const domi::MemcpyAsyncDef &kernel_def, + const SingleOpModelParam ¶m, + std::unique_ptr &task) { + task.reset(new(std::nothrow)MemcpyAsyncTask()); + GE_CHECK_NOTNULL(task); + task->SetOpDesc(op_desc); + task->dst_max_ = kernel_def.dst_max(); + task->count_ = kernel_def.count(); + task->kind_ = static_cast(kernel_def.kind()); + auto addresses = BuildTaskUtils::JoinAddresses(BuildTaskUtils::GetAddresses(op_desc, param, false)); + if (addresses.size() != kNumAddresses) { + GELOGE(INTERNAL_ERROR, "[Build][MemcpyAsyncTask] Invalid address count: %zu", addresses.size()); + return INTERNAL_ERROR; + } + + task->addresses_[0] = reinterpret_cast(addresses[0]); + task->addresses_[1] = reinterpret_cast(addresses[1]); + return SUCCESS; +} +} // namespace ge \ No newline at end of file diff --git a/ge/single_op/task/rts_kernel_task_builder.h b/ge/single_op/task/rts_kernel_task_builder.h new file mode 100644 index 00000000..80bf92a3 --- /dev/null +++ b/ge/single_op/task/rts_kernel_task_builder.h @@ -0,0 +1,34 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef GE_SINGLE_OP_TASK_RTS_KERNEL_TASK_BUILDER_H_ +#define GE_SINGLE_OP_TASK_RTS_KERNEL_TASK_BUILDER_H_ + +#include +#include "graph/op_desc.h" +#include "single_op/single_op.h" +#include "single_op/single_op_model.h" + +namespace ge { +class RtsKernelTaskBuilder { + public: + static Status BuildMemcpyAsyncTask(const OpDescPtr &op_desc, + const domi::MemcpyAsyncDef &kernel_def, + const SingleOpModelParam ¶m, + std::unique_ptr &task); +}; +} // namespace ge +#endif // GE_SINGLE_OP_TASK_RTS_KERNEL_TASK_BUILDER_H_ \ No newline at end of file diff --git a/tests/ut/ge/CMakeLists.txt b/tests/ut/ge/CMakeLists.txt index ec0b146c..9ca694d6 100755 --- a/tests/ut/ge/CMakeLists.txt +++ b/tests/ut/ge/CMakeLists.txt @@ -585,6 +585,7 @@ set(SINGLE_OP_SRC_FILES "${GE_CODE_DIR}/ge/single_op/single_op_manager.cc" "${GE_CODE_DIR}/ge/single_op/task/aicpu_task_builder.cc" "${GE_CODE_DIR}/ge/single_op/task/aicpu_kernel_task_builder.cc" + "${GE_CODE_DIR}/ge/single_op/task/rts_kernel_task_builder.cc" "${GE_CODE_DIR}/ge/hybrid/common/tensor_value.cc" "${GE_CODE_DIR}/ge/hybrid/common/npu_memory_allocator.cc" "${GE_CODE_DIR}/ge/hybrid/executor/rt_callback_manager.cc" diff --git a/tests/ut/ge/single_op/single_op_model_unittest.cc b/tests/ut/ge/single_op/single_op_model_unittest.cc index f5d1a83c..a2c1cb02 100644 --- a/tests/ut/ge/single_op/single_op_model_unittest.cc +++ b/tests/ut/ge/single_op/single_op_model_unittest.cc @@ -25,6 +25,11 @@ #define private public #include "single_op/single_op_model.h" #include "single_op/task/tbe_task_builder.h" +#include "single_op/task/rts_kernel_task_builder.h" +#include "single_op/task/op_task.h" +#include "framework/common/helper/model_helper.h" +#include "single_op/single_op.h" +#include "single_op/stream_resource.h" #undef private #undef protected #include "graph/passes/graph_builder_utils.h" @@ -240,3 +245,45 @@ TEST_F(UtestSingleOpModel, test_host_mem) { DynamicSingleOp single_op(0, &stream_mu_, nullptr); ASSERT_EQ(model.SetHostMemTensor(single_op), SUCCESS); } + +TEST_F(UtestSingleOpModel, BuildTaskList) { + ComputeGraphPtr graph = make_shared("single_op"); + GeModelPtr ge_model = make_shared(); + ge_model->SetGraph(GraphUtils::CreateGraphFromComputeGraph(graph)); + shared_ptr model_task_def = make_shared(); + ge_model->SetModelTaskDef(model_task_def); + NodePtr node = nullptr; + { + auto op_desc = std::make_shared("memcpy", MEMCPYASYNC); + GeTensorDesc tensor(GeShape(), FORMAT_NCHW, DT_FLOAT); + op_desc->AddInputDesc(tensor); + op_desc->AddOutputDesc(tensor); + op_desc->SetInputOffset({0}); + op_desc->SetOutputOffset({0}); + node = graph->AddNode(op_desc); + + domi::TaskDef *task_def = model_task_def->add_task(); + task_def->set_stream_id(0); + task_def->set_type(RT_MODEL_TASK_MEMCPY_ASYNC); + domi::MemcpyAsyncDef *memcpy_async = task_def->mutable_memcpy_async(); + memcpy_async->set_src(0); + memcpy_async->set_dst(0); + memcpy_async->set_dst_max(512); + memcpy_async->set_count(1); + memcpy_async->set_kind(RT_MEMCPY_DEVICE_TO_DEVICE); + memcpy_async->set_op_index(0); + } + + string model_data_str = "123456789"; + SingleOpModel model("model", model_data_str.c_str(), model_data_str.size()); + StreamResource *res = new (std::nothrow) StreamResource(1); + std::mutex stream_mu; + rtStream_t stream = nullptr; + rtStreamCreate(&stream, 0); + SingleOp single_op(res, &stream_mu, stream); + model.model_helper_.model_ = ge_model; + model.op_list_.emplace(0, node); + ASSERT_EQ(model.BuildTaskList(res, single_op), SUCCESS); + MemcpyAsyncTask mem_task; + ASSERT_EQ(mem_task.LaunchKernel(0), SUCCESS); +}