diff --git a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
index a2e610b4..16a42f9c 100755
--- a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
+++ b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.cc
@@ -22,7 +22,6 @@
 #include "graph/utils/node_utils.h"
 #include "hybrid/executor/hybrid_execution_context.h"
 #include "hybrid/model/hybrid_model.h"
-#include "opskernel_manager/ops_kernel_builder_manager.h"
 
 namespace ge {
 namespace hybrid {
@@ -352,10 +351,54 @@ Status AicpuTfNodeTask::Init(const HybridModel &model) {
     GELOGD("[%s] Is GetNext, set need sync to true, node type = %s", node_name_.c_str(), node_type.c_str());
     need_sync_ = true;
   }
+  auto task_defs = model.GetTaskDefs(node_item_->node);
+  if (unknown_type_ == DEPEND_COMPUTE) {
+    GE_CHK_STATUS_RET_NOLOG(SetMemCopyTask((*task_defs)[1]));
+  }
   GELOGI("Node[%s] init end.", node_name_.c_str());
   return SUCCESS;
 }
 
+Status AicpuTfNodeTask::SetMemCopyTask(const domi::TaskDef &task_def) {
+  if (node_item_->num_outputs == 0) {
+    GELOGD("Node[%s] type[%s] has no output, no need set mem_copy task.",
+           node_name_.c_str(), node_item_->node_type.c_str());
+    return SUCCESS;
+  }
+
+  GELOGD("Start to set memcpy task for node[%s].", node_name_.c_str());
+  const domi::KernelExDef &kernel_def = task_def.kernel_ex();
+  if (kernel_def.args_size() > sizeof(STR_FWK_OP_KERNEL)) {
+    GELOGE(PARAM_INVALID, "sizeof STR_FWK_OP_KERNEL is: %lu, but args_size is: %d",
+           sizeof(STR_FWK_OP_KERNEL), kernel_def.args_size());
+    return PARAM_INVALID;
+  }
+  STR_FWK_OP_KERNEL aicpu_task = {0};
+  auto sec_ret = memcpy_s(&aicpu_task, sizeof(STR_FWK_OP_KERNEL),
+                          kernel_def.args().data(), kernel_def.args_size());
+  if (sec_ret != EOK) {
+    GELOGE(FAILED, "memcpy failed, ret: %d", sec_ret);
+    return FAILED;
+  }
+
+  GE_CHK_STATUS_RET(AllocTensorBuffer(kernel_def.task_info_size(), copy_workspace_buf_),
+                    "Node[%s] alloc copy task workspace buf failed, size=%zu.",
+                    node_name_.c_str(), kernel_def.task_info_size());
+
+  GE_CHK_RT_RET(rtMemcpy(copy_workspace_buf_->GetData(), kernel_def.task_info_size(),
+                         kernel_def.task_info().data(), kernel_def.task_info_size(), RT_MEMCPY_HOST_TO_DEVICE));
+
+  aicpu_task.fwkKernelBase.fwk_kernel.inputOutputAddr = reinterpret_cast<uintptr_t>(copy_ioaddr_dev_->GetData());
+  aicpu_task.fwkKernelBase.fwk_kernel.workspaceBaseAddr = reinterpret_cast<uintptr_t>(copy_workspace_buf_->GetData());
+  aicpu_task.fwkKernelBase.fwk_kernel.extInfoAddr = 0;
+  aicpu_task.fwkKernelBase.fwk_kernel.extInfoLen = 0;
+
+  GE_CHK_RT_RET(rtMemcpy(copy_task_args_buf_->GetData(), sizeof(STR_FWK_OP_KERNEL),
+                         &aicpu_task, sizeof(STR_FWK_OP_KERNEL), RT_MEMCPY_HOST_TO_DEVICE));
+  GELOGD("Set memcpy task for node[%s] successfully.", node_name_.c_str());
+  return SUCCESS;
+}
+
 uint64_t AicpuTfNodeTask::GetStepIdAddr(const HybridModel &model) {
   // get step_id_addr
   auto var_tensor = model.GetVariable(NODE_NAME_GLOBAL_STEP);
@@ -407,32 +450,7 @@ Status AicpuTfNodeTask::CopyDataToHbm(TaskContext &context,
                          "Node[%s] has %d outputs but out shape is %zu.",
                          node_name_.c_str(), node_item_->num_outputs, out_shape_hbm.size());
 
-  uint64_t copy_num = 0;
-  GE_CHK_STATUS_RET_NOLOG(PrepareCopyInputs(context, out_shape_hbm, copy_num));
-
-  STR_FWK_OP_KERNEL aicpu_task = {0};
-  std::string task_info;
-  RECORD_CALLBACK_EVENT(context.GetExecutionContext(), node_name_.c_str(),
-                        "[GenMemCopyTask] Start");
-  GE_CHK_STATUS_RET_NOLOG(GenMemCopyTask(copy_num, aicpu_task, task_info));
-  RECORD_CALLBACK_EVENT(context.GetExecutionContext(), node_name_.c_str(),
-                        "[GenMemCopyTask] End");
-
-  std::unique_ptr<TensorBuffer> kernel_workspace_buf;
-  GE_CHK_STATUS_RET(AllocTensorBuffer(task_info.size(), kernel_workspace_buf),
-                    "Node[%s] alloc copy task workspace buf failed, size=%zu.",
-                    node_name_.c_str(), task_info.size());
-
-  GE_CHK_RT_RET(rtMemcpy(kernel_workspace_buf->GetData(), task_info.size(),
-                         task_info.data(), task_info.size(), RT_MEMCPY_HOST_TO_DEVICE));
-
-  aicpu_task.fwkKernelBase.fwk_kernel.inputOutputAddr = reinterpret_cast<uintptr_t>(copy_ioaddr_dev_->GetData());
-  aicpu_task.fwkKernelBase.fwk_kernel.workspaceBaseAddr = reinterpret_cast<uintptr_t>(kernel_workspace_buf->GetData());
-  aicpu_task.fwkKernelBase.fwk_kernel.extInfoAddr = 0;
-  aicpu_task.fwkKernelBase.fwk_kernel.extInfoLen = 0;
-
-  GE_CHK_RT_RET(rtMemcpy(copy_task_args_buf_->GetData(), sizeof(STR_FWK_OP_KERNEL),
-                         &aicpu_task, sizeof(STR_FWK_OP_KERNEL), RT_MEMCPY_HOST_TO_DEVICE));
+  GE_CHK_STATUS_RET_NOLOG(PrepareCopyInputs(context, out_shape_hbm));
 
   RECORD_CALLBACK_EVENT(context.GetExecutionContext(), node_name_.c_str(), "[LaunchCopy] Start");
   GE_CHK_RT_RET(rtKernelLaunchEx(copy_task_args_buf_->GetData(), sizeof(STR_FWK_OP_KERNEL),
@@ -445,8 +463,7 @@
 }
 
 Status AicpuTfNodeTask::PrepareCopyInputs(const TaskContext &context,
-                                          const std::vector<std::unique_ptr<TensorBuffer>> &out_shape_hbm,
-                                          uint64_t &copy_num) {
+                                          const std::vector<std::unique_ptr<TensorBuffer>> &out_shape_hbm) {
   std::vector<uint64_t> copy_input_release_flag;
   std::vector<uint64_t> copy_input_data_size;
  std::vector<uint64_t> copy_input_src;
@@ -458,34 +475,23 @@ Status AicpuTfNodeTask::PrepareCopyInputs(const TaskContext &context,
            node_name_.c_str(), i, summary.shape_data_ptr, summary.shape_data_size,
            summary.raw_data_ptr, summary.raw_data_size);
-    if (summary.raw_data_size > 0) {
-      auto output = context.GetOutput(i);
-      GE_CHECK_NOTNULL(output);
-      GE_CHECK_NOTNULL(output->GetData());
-      copy_input_release_flag.emplace_back(kReleaseFlag);
-      copy_input_data_size.emplace_back(summary.raw_data_size);
-      copy_input_src.emplace_back(summary.raw_data_ptr);
-      copy_input_dst.emplace_back(reinterpret_cast<uintptr_t>(output->GetData()));
-    }
-
-    if (summary.shape_data_size > 0) {
-      const auto &shape_buffer = out_shape_hbm[i];
-      GE_CHECK_NOTNULL(shape_buffer);
-      GE_CHECK_NOTNULL(shape_buffer->GetData());
-      copy_input_release_flag.emplace_back(kReleaseFlag);
-      copy_input_data_size.emplace_back(summary.shape_data_size);
-      copy_input_src.emplace_back(summary.shape_data_ptr);
-      copy_input_dst.emplace_back(reinterpret_cast<uintptr_t>(shape_buffer->GetData()));
-    }
+    auto output = context.GetOutput(i);
+    GE_CHECK_NOTNULL(output);
+    copy_input_release_flag.emplace_back(kReleaseFlag);
+    copy_input_data_size.emplace_back(summary.raw_data_size);
+    copy_input_src.emplace_back(summary.raw_data_ptr);
+    copy_input_dst.emplace_back(reinterpret_cast<uintptr_t>(output->GetData()));
+
+    const auto &shape_buffer = out_shape_hbm[i];
+    GE_CHECK_NOTNULL(shape_buffer);
+    copy_input_release_flag.emplace_back(kReleaseFlag);
+    copy_input_data_size.emplace_back(summary.shape_data_size);
+    copy_input_src.emplace_back(summary.shape_data_ptr);
+    copy_input_dst.emplace_back(reinterpret_cast<uintptr_t>(shape_buffer->GetData()));
   }
 
-  copy_num = copy_input_release_flag.size();
-
-  GE_CHK_BOOL_RET_STATUS(copy_num > 0, INTERNAL_ERROR,
-                         "Node[%s] need copy num is 0", node_name_.c_str());
-
-  // copy task need copy output and output shape
-  const size_t copy_input_buf_len = copy_num * sizeof(uint64_t);
+  // copy task need copy all output_data and output_shape, len is 2 * output_num
+  const size_t copy_input_buf_len = node_item_->num_outputs * 2 * sizeof(uint64_t);
 
   GE_CHK_RT_RET(rtMemcpy(copy_input_release_flag_dev_->GetData(), copy_input_release_flag_dev_->GetSize(),
                          &copy_input_release_flag[0], copy_input_buf_len, RT_MEMCPY_HOST_TO_DEVICE));
@@ -498,15 +504,6 @@ Status AicpuTfNodeTask::PrepareCopyInputs(const TaskContext &context,
   return SUCCESS;
 }
 
-Status AicpuTfNodeTask::GenMemCopyTask(uint64_t copy_num, STR_FWK_OP_KERNEL &task, std::string &task_info) {
-  static constexpr const char *const kKernelLibName = "aicpu_tf_kernel";
-  auto kernel_builder = OpsKernelBuilderManager::Instance().GetOpsKernelBuilder(kKernelLibName);
-  GE_CHK_BOOL_RET_STATUS(kernel_builder != nullptr, FAILED, "Get op kernel info store[%s] failed", kKernelLibName);
-  auto ret = kernel_builder->GenMemCopyTask(copy_num, task, task_info);
-  GE_CHK_STATUS_RET(ret, "Call aicpu GenMemCopyTask failed, copy_num=%lu, ret=%u", copy_num, ret);
-  return SUCCESS;
-}
-
 Status AicpuTfNodeTask::UpdateShapeByHbmBuffer(TaskContext &context,
                                                const std::vector<std::unique_ptr<TensorBuffer>> &out_shape_hbm) {
   GE_CHK_BOOL_RET_STATUS(out_shape_hbm.size() == static_cast<size_t>(node_item_->num_outputs),
@@ -813,9 +810,9 @@ Status AiCpuNodeExecutor::LoadTask(const HybridModel &model,
     GE_CHK_BOOL_RET_STATUS((*task_defs).size() == 1, PARAM_INVALID, "Node[%s] task_def num[%zu] != 1",
                            node->GetName().c_str(), (*task_defs).size());
   } else {
-    // The number of tasks of the fourth type operator may be 2
-    GE_CHK_BOOL_RET_STATUS((*task_defs).size() == 1 || (*task_defs).size() == 2, PARAM_INVALID,
-                           "Node[%s] DEPEND_COMPUTE task_def num[%zu] != 1 or 2",
+    // The number of tasks of the fourth type operator must be 2
+    GE_CHK_BOOL_RET_STATUS((*task_defs).size() == 2, PARAM_INVALID,
+                           "Node[%s] DEPEND_COMPUTE task_def num[%zu] != 2",
                            node->GetName().c_str(), (*task_defs).size());
   }
   const auto &task_def = (*task_defs)[0];
diff --git a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
index 8f0b1d0a..b9cc8256 100644
--- a/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
+++ b/ge/hybrid/node_executor/aicpu/aicpu_node_executor.h
@@ -98,6 +98,8 @@ class AicpuTfNodeTask : public AicpuNodeTaskBase {
   Status UpdateIoAddr(TaskContext &context) override;
 
  private:
+  Status SetMemCopyTask(const domi::TaskDef &task_def);
+
   Status InitForDependComputeTask();
 
   Status UpdateShapeAndDataByResultSummary(TaskContext &context);
@@ -117,11 +119,9 @@ class AicpuTfNodeTask : public AicpuNodeTaskBase {
                             const std::vector<std::unique_ptr<TensorBuffer>> &out_shape_hbm);
 
   Status PrepareCopyInputs(const TaskContext &context,
-                           const std::vector<std::unique_ptr<TensorBuffer>> &out_shape_hbm,
-                           uint64_t &copy_num);
+                           const std::vector<std::unique_ptr<TensorBuffer>> &out_shape_hbm);
 
   static Status EnsureSessionCreated(uint64_t session_id);
-  static Status GenMemCopyTask(uint64_t count, STR_FWK_OP_KERNEL &task, std::string &task_info);
   static uint64_t GetStepIdAddr(const HybridModel &model);
 
  private:  // kernel buf, device mem
@@ -145,6 +145,8 @@ class AicpuTfNodeTask : public AicpuNodeTaskBase {
   std::unique_ptr<TensorBuffer> copy_input_src_dev_;
   std::unique_ptr<TensorBuffer> copy_input_dst_dev_;
   bool need_sync_ = false;
+
+  std::unique_ptr<TensorBuffer> copy_workspace_buf_;
 };
 
 class AicpuNodeTask : public AicpuNodeTaskBase {
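
Note (not part of the patch): the snippet below is a self-contained, simplified mock. CopyTaskArgs, MockBuffer and DependComputeCopyTask are hypothetical stand-ins and no GE or runtime APIs are called; it only sketches the design this change adopts for DEPEND_COMPUTE nodes, namely that the copy task's arguments are assembled once at init time (SetMemCopyTask) and each execution merely refreshes the contents of the pre-allocated input/output address buffer before launching, instead of regenerating the task via GenMemCopyTask on every run.

// mock_depend_compute_copy_task.cc -- illustrative only, not GE code
#include <cassert>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

// Stand-in for STR_FWK_OP_KERNEL: just the two address fields the patch fills in.
struct CopyTaskArgs {
  uint64_t input_output_addr = 0;
  uint64_t workspace_addr = 0;
};

// Stand-in for a fixed buffer (TensorBuffer on device in GE).
struct MockBuffer {
  std::vector<uint8_t> bytes;
  void *Data() { return bytes.data(); }
};

class DependComputeCopyTask {
 public:
  // Init-time step, mirroring SetMemCopyTask: the buffers are allocated once,
  // so their addresses can be baked into the task arguments exactly once.
  void SetMemCopyTask(int num_outputs) {
    // one src and one dst address per output-data block and per output-shape block
    io_addrs_.bytes.resize(static_cast<size_t>(num_outputs) * 2 * sizeof(uint64_t));
    args_.input_output_addr = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(io_addrs_.Data()));
    args_.workspace_addr = static_cast<uint64_t>(reinterpret_cast<uintptr_t>(workspace_.Data()));
  }

  // Run-time step, mirroring PrepareCopyInputs + CopyDataToHbm: only the contents
  // of the pre-allocated address buffer change; args_ itself is reused unchanged.
  void CopyDataToHbm(const std::vector<uint64_t> &src, const std::vector<uint64_t> &dst) {
    std::vector<uint64_t> addrs(src);
    addrs.insert(addrs.end(), dst.begin(), dst.end());
    assert(addrs.size() * sizeof(uint64_t) <= io_addrs_.bytes.size());
    std::memcpy(io_addrs_.Data(), addrs.data(), addrs.size() * sizeof(uint64_t));
    // In GE this is where rtKernelLaunchEx(copy_task_args_buf_, ...) would run.
    std::cout << "launch copy task, io addrs at 0x" << std::hex
              << args_.input_output_addr << std::dec << "\n";
  }

 private:
  MockBuffer io_addrs_;
  MockBuffer workspace_{std::vector<uint8_t>(64)};
  CopyTaskArgs args_;
};

int main() {
  DependComputeCopyTask task;
  task.SetMemCopyTask(2);                                    // once, during Init
  task.CopyDataToHbm({0x1000, 0x2000}, {0x3000, 0x4000});    // per-execution refresh + launch
  task.CopyDataToHbm({0x1000, 0x2000}, {0x5000, 0x6000});
  return 0;
}

Because the member buffers holding the copy arguments (copy_ioaddr_dev_, copy_workspace_buf_, copy_task_args_buf_ in the patch) are allocated up front, the addresses written into the task never change between executions, which is what makes the one-time setup at Init safe.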