Browse Source

updated stop task

pull/388/head
tzwang 10 months ago
parent
commit
cc816c9a18
3 changed files with 83 additions and 38 deletions
  1. +37
    -5
      internal/logic/schedule/schedulecanceltasklogic.go
  2. +8
    -0
      internal/scheduler/database/aiStorage.go
  3. +38
    -33
      internal/scheduler/service/utils/status/taskStatusSync.go

+ 37
- 5
internal/logic/schedule/schedulecanceltasklogic.go View File

@@ -3,6 +3,10 @@ package schedule
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"strconv"

"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
@@ -26,17 +30,33 @@ func NewScheduleCancelTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext)
}

func (l *ScheduleCancelTaskLogic) ScheduleCancelTask(req *types.CancelTaskReq) (resp *types.CancelTaskResp, err error) {
// find task
tasks, err := l.svcCtx.Scheduler.AiStorages.GetAiTaskListById(req.TaskId)
task, err := l.svcCtx.Scheduler.AiService.Storage.GetTaskById(req.TaskId)
if err != nil {
return nil, err
}

if len(tasks) == 0 {
return nil, errors.New("failed to stop task, ai sub tasks have not been created")
if (&models.Task{} == task) {
return nil, errors.New("failed to cancel task, task not found")
}

t := tasks[0]
// find ai tasks
aitasks, err := l.svcCtx.Scheduler.AiStorages.GetAiTaskListById(task.Id)
if err != nil {
return nil, err
}

if len(aitasks) == 0 {
return nil, errors.New("failed to cancel task, ai sub tasks have not been created")
}

// update status
status.UpdateAiTask(l.svcCtx, aitasks...)

t := aitasks[0]

if t.Status != constants.Running {
return nil, fmt.Errorf("failed to cancel task, ai sub tasks is %s", t.Status)
}

// assume a task has only one sub ai task
err = l.svcCtx.Scheduler.AiService.AiExecutorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].Stop(l.ctx, t.JobId)
@@ -44,5 +64,17 @@ func (l *ScheduleCancelTaskLogic) ScheduleCancelTask(req *types.CancelTaskReq) (
return nil, err
}

t.Status = constants.Cancelled
err = l.svcCtx.Scheduler.AiService.Storage.UpdateAiTask(t)
if err != nil {
return nil, err
}

task.Status = constants.Cancelled
err = l.svcCtx.Scheduler.AiService.Storage.UpdateTaskByModel(task)
if err != nil {
return nil, err
}

return resp, nil
}

+ 8
- 0
internal/scheduler/database/aiStorage.go View File

@@ -330,6 +330,14 @@ func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error {
return nil
}

func (s *AiStorage) UpdateTaskByModel(task *models.Task) error {
tx := s.DbEngin.Updates(task)
if tx.Error != nil {
return tx.Error
}
return nil
}

func (s *AiStorage) GetStrategyCode(name string) (int64, error) {
var strategy int64
sqlStr := `select t_dict_item.item_value


+ 38
- 33
internal/scheduler/service/utils/status/taskStatusSync.go View File

@@ -7,6 +7,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"net/http"
@@ -19,7 +20,7 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
list := make([]*types.TaskModel, len(tasklist))
copy(list, tasklist)
for i := len(list) - 1; i >= 0; i-- {
if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed || list[i].Status == constants.Cancelled {
list = append(list[:i], list[i+1:]...)
}
}
@@ -240,38 +241,7 @@ func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
}
}

func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
list := make([]*types.TaskModel, len(tasklist))
copy(list, tasklist)
for i := len(list) - 1; i >= 0; i-- {
if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
list = append(list[:i], list[i+1:]...)
}
}

if len(list) == 0 {
return
}

task := list[0]
for i := range list {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
if latest.Before(earliest) {
task = list[i]
}
}

aiTaskList, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
if err != nil {
logx.Errorf(err.Error())
return
}

if len(aiTaskList) == 0 {
return
}

func UpdateAiTask(svc *svc.ServiceContext, aiTaskList ...*models.TaskAi) {
var wg sync.WaitGroup
for _, aitask := range aiTaskList {
t := aitask
@@ -336,6 +306,41 @@ func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
wg.Wait()
}

func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
list := make([]*types.TaskModel, len(tasklist))
copy(list, tasklist)
for i := len(list) - 1; i >= 0; i-- {
if list[i].AdapterTypeDict != "1" || list[i].Status == constants.Succeeded || list[i].Status == constants.Failed {
list = append(list[:i], list[i+1:]...)
}
}

if len(list) == 0 {
return
}

task := list[0]
for i := range list {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, list[i].UpdatedTime)
if latest.Before(earliest) {
task = list[i]
}
}

aiTaskList, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
if err != nil {
logx.Errorf(err.Error())
return
}

if len(aiTaskList) == 0 {
return
}

UpdateAiTask(svc, aiTaskList...)
}

func UpdateTrainingTaskStatus(svc *svc.ServiceContext, list []*types.AdapterInfo) {
var wg sync.WaitGroup
for _, adapter := range list {


Loading…
Cancel
Save