Browse Source

updated deployinstance status files

pull/308/head
tzwang 1 year ago
parent
commit
5153b6a31a
12 changed files with 118 additions and 94 deletions
  1. +5
    -4
      internal/cron/cron.go
  2. +2
    -2
      internal/logic/ai/getcenteroverviewlogic.go
  3. +2
    -2
      internal/logic/ai/getcentertasklistlogic.go
  4. +3
    -3
      internal/logic/core/pagelisttasklogic.go
  5. +4
    -4
      internal/logic/inference/deployinstancelistlogic.go
  6. +2
    -31
      internal/logic/inference/startallbydeploytaskidlogic.go
  7. +9
    -5
      internal/logic/inference/startdeployinstancelistlogic.go
  8. +2
    -31
      internal/logic/inference/stopallbydeploytaskidlogic.go
  9. +9
    -5
      internal/logic/inference/stopdeployinstancelogic.go
  10. +1
    -1
      internal/scheduler/service/utils/stat/clusterResources.go
  11. +78
    -5
      internal/scheduler/service/utils/status/deployInstance.go
  12. +1
    -1
      internal/scheduler/service/utils/status/taskStatusSync.go

+ 5
- 4
internal/cron/cron.go View File

@@ -16,7 +16,8 @@ package cron

import (
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
)

@@ -28,8 +29,8 @@ func AddCronGroup(svc *svc.ServiceContext) {
logx.Errorf(err.Error())
return
}
updater.UpdateTaskStatus(svc, list)
updater.UpdateAiTaskStatus(svc, list)
status.UpdateTaskStatus(svc, list)
status.UpdateAiTaskStatus(svc, list)
})

svc.Cron.AddFunc("*/5 * * * * ?", func() {
@@ -42,6 +43,6 @@ func AddCronGroup(svc *svc.ServiceContext) {
logx.Errorf(err.Error())
return
}
updater.UpdateClusterResources(svc, adapterList)
stat.UpdateClusterResources(svc, adapterList)
})
}

+ 2
- 2
internal/logic/ai/getcenteroverviewlogic.go View File

@@ -3,7 +3,7 @@ package ai
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)
@@ -37,7 +37,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
centerNum = int32(len(adapterList))
resp.CenterNum = centerNum

go updater.UpdateClusterResources(l.svcCtx, adapterList)
go stat.UpdateClusterResources(l.svcCtx, adapterList)

for _, adapter := range adapterList {
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)


+ 2
- 2
internal/logic/ai/getcentertasklistlogic.go View File

@@ -3,7 +3,7 @@ package ai
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
@@ -32,7 +32,7 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList
return nil, err
}

go updater.UpdateTrainingTaskStatus(l.svcCtx, adapterList)
go status.UpdateTrainingTaskStatus(l.svcCtx, adapterList)

for _, adapter := range adapterList {
taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id)


+ 3
- 3
internal/logic/core/pagelisttasklogic.go View File

@@ -2,7 +2,7 @@ package core

import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
@@ -52,8 +52,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
}

// 更新智算任务状态
go updater.UpdateTaskStatus(l.svcCtx, list)
go updater.UpdateAiTaskStatus(l.svcCtx, list)
go status.UpdateTaskStatus(l.svcCtx, list)
go status.UpdateAiTaskStatus(l.svcCtx, list)

for _, model := range list {
if model.StartTime != "" && model.EndTime == "" {


+ 4
- 4
internal/logic/inference/deployinstancelistlogic.go View File

@@ -5,7 +5,7 @@ import (
"errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@@ -71,7 +71,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi
list := common.ConcatMultipleSlices(slices)

if len(list) != 0 {
go updater.UpdateDeployInstanceStatusBatch(l.svcCtx, list)
go status.UpdateDeployInstanceStatusBatch(l.svcCtx, list, true)

ins := list[0]
for i := range list {
@@ -82,8 +82,8 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi
}
}

go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true)
go updater.UpdateDeployTaskStatus(l.svcCtx)
go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true)
go status.UpdateDeployTaskStatus(l.svcCtx)
}

resp.List = &deployTasks


+ 2
- 31
internal/logic/inference/startallbydeploytaskidlogic.go View File

@@ -5,8 +5,7 @@ import (
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@@ -84,7 +83,7 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta
<-buf
return
}
if checkStopStatus(in) {
if status.CheckStopStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StartInferDeployInstance(l.ctx, ins.InstanceId)
if !success {
e := struct {
@@ -136,31 +135,3 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta

return nil
}

// checkStopStatus reports whether the instance's status equals the stopped
// value used by its cluster type: "stopped" for TYPE_OCTOPUS and
// TYPE_MODELARTS, "Terminated" for TYPE_SHUGUANGAI. Any other cluster type
// yields false.
func checkStopStatus(in *inference.DeployInstance) bool {
	switch in.ClusterType {
	case storeLink.TYPE_OCTOPUS, storeLink.TYPE_MODELARTS:
		return in.Status == "stopped"
	case storeLink.TYPE_SHUGUANGAI:
		return in.Status == "Terminated"
	}
	return false
}

+ 9
- 5
internal/logic/inference/startdeployinstancelistlogic.go View File

@@ -4,7 +4,7 @@ import (
"context"
"errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"strconv"
@@ -33,12 +33,16 @@ func (l *StartDeployInstanceListLogic) StartDeployInstanceList(req *types.StartD
return nil, err
}

success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, req.InstanceId)
if !success {
return nil, errors.New("start instance failed")
in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)

if status.CheckStopStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, req.InstanceId)
if !success {
return nil, errors.New("start instance failed")
}
}

go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true)
go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true)

return resp, nil
}

+ 2
- 31
internal/logic/inference/stopallbydeploytaskidlogic.go View File

@@ -4,8 +4,7 @@ import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@@ -85,7 +84,7 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc
<-buf
return
}
if checkStatus(in) {
if status.CheckRunningStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId)
if !success {
e := struct {
@@ -137,31 +136,3 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc

return nil
}

// checkStatus reports whether the instance's status equals the running value
// used by its cluster type: "running" for TYPE_OCTOPUS and TYPE_MODELARTS,
// "Running" for TYPE_SHUGUANGAI. Any other cluster type yields false.
func checkStatus(in *inference.DeployInstance) bool {
	switch in.ClusterType {
	case storeLink.TYPE_OCTOPUS, storeLink.TYPE_MODELARTS:
		return in.Status == "running"
	case storeLink.TYPE_SHUGUANGAI:
		return in.Status == "Running"
	}
	return false
}

+ 9
- 5
internal/logic/inference/stopdeployinstancelogic.go View File

@@ -4,7 +4,7 @@ import (
"context"
"errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"strconv"
@@ -33,12 +33,16 @@ func (l *StopDeployInstanceLogic) StopDeployInstance(req *types.StopDeployInstan
return nil, err
}

success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, req.InstanceId)
if !success {
return nil, errors.New("stop instance failed")
in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)

if status.CheckRunningStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, req.InstanceId)
if !success {
return nil, errors.New("stop instance failed")
}
}

go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true)
go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true)

return resp, nil
}

internal/scheduler/service/updater/clusterResources.go → internal/scheduler/service/utils/stat/clusterResources.go View File

@@ -1,4 +1,4 @@
package updater
package stat

import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"

internal/scheduler/service/updater/deployInstance.go → internal/scheduler/service/utils/status/deployInstance.go View File

@@ -1,6 +1,7 @@
package updater
package status

import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
@@ -10,12 +11,15 @@ import (
"time"
)

func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance) {
func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance, needfilter bool) {
list := make([]*models.AiInferDeployInstance, len(insList))
copy(list, insList)
for i := len(list) - 1; i >= 0; i-- {
if list[i].Status == constants.Running || list[i].Status == constants.Stopped {
list = append(list[:i], list[i+1:]...)

if needfilter {
for i := len(list) - 1; i >= 0; i-- {
if list[i].Status == constants.Running || list[i].Status == constants.Stopped {
list = append(list[:i], list[i+1:]...)
}
}
}

@@ -119,3 +123,72 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe
return
}
}

// UpdateAutoStoppedInstance loads the inference deploy instance list from
// storage and hands it to UpdateDeployInstanceStatusBatch with filtering
// turned off. It returns without doing anything when the lookup fails or the
// list is empty.
func UpdateAutoStoppedInstance(svc *svc.ServiceContext) {
	instances, err := svc.Scheduler.AiStorages.GetInferDeployInstanceList()
	if err != nil || len(instances) == 0 {
		return
	}

	// needfilter is false so the batch update does not drop any instance
	// from the list before processing it.
	UpdateDeployInstanceStatusBatch(svc, instances, false)
}

// CheckStopStatus reports whether the deploy instance is in the stopped state
// as spelled by its cluster type: "stopped" for TYPE_OCTOPUS and
// TYPE_MODELARTS, "Terminated" for TYPE_SHUGUANGAI.
//
// It returns false for a nil instance and for any cluster type not listed
// above.
func CheckStopStatus(in *inference.DeployInstance) bool {
	// A caller may pass the result of a failed instance lookup; treat a
	// missing instance as "not stopped" instead of panicking on a nil pointer.
	if in == nil {
		return false
	}

	switch in.ClusterType {
	case storeLink.TYPE_OCTOPUS, storeLink.TYPE_MODELARTS:
		return in.Status == "stopped"
	case storeLink.TYPE_SHUGUANGAI:
		return in.Status == "Terminated"
	default:
		return false
	}
}

// CheckRunningStatus reports whether the deploy instance is in the running
// state as spelled by its cluster type: "running" for TYPE_OCTOPUS and
// TYPE_MODELARTS, "Running" for TYPE_SHUGUANGAI.
//
// It returns false for a nil instance and for any cluster type not listed
// above.
func CheckRunningStatus(in *inference.DeployInstance) bool {
	// A caller may pass the result of a failed instance lookup; treat a
	// missing instance as "not running" instead of panicking on a nil pointer.
	if in == nil {
		return false
	}

	switch in.ClusterType {
	case storeLink.TYPE_OCTOPUS, storeLink.TYPE_MODELARTS:
		return in.Status == "running"
	case storeLink.TYPE_SHUGUANGAI:
		return in.Status == "Running"
	default:
		return false
	}
}

internal/scheduler/service/updater/taskStatusSync.go → internal/scheduler/service/utils/status/taskStatusSync.go View File

@@ -1,4 +1,4 @@
package updater
package status

import (
"errors"

Loading…
Cancel
Save