Former-commit-id: cead65aae1
pull/314/head
| @@ -2,7 +2,7 @@ name: Sync Mirror Repository | |||
| on: | |||
| schedule: | |||
| - cron: '0 * * * *' # 每小时同步一次 | |||
| - cron: '0 */8 * * *' # sync every 8 hours | |||
| workflow_dispatch: # 允许手动触发 | |||
| jobs: | |||
| @@ -18,7 +18,7 @@ require ( | |||
| github.com/prometheus/common v0.59.1 | |||
| github.com/robfig/cron/v3 v3.0.1 | |||
| github.com/zeromicro/go-zero v1.7.2 | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437 | |||
| gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240909072501-939c3144cd9e | |||
| gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 | |||
| gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 | |||
| @@ -466,8 +466,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M | |||
| github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= | |||
| github.com/zeromicro/go-zero v1.7.2 h1:a8lyVOG3KXG4LrAy6ZmtJTJtisX4Ostc4Pst4fE704I= | |||
| github.com/zeromicro/go-zero v1.7.2/go.mod h1:WFXfF92Exw0O7WECifS6r99JSzv4KEN49x9RhAfgkMc= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 h1:DicBXoQiC6mumMBeyqSPNrsjtqJIgk5Pv2hscu2xryw= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437 h1:ta6h9+FU7AQ2fNyQiXrZnMdlNBjOKdyBx4e3RF7BE84= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= | |||
| gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240909072501-939c3144cd9e h1:6LYJggBoeAQxy/otzWjt40Pa7gnVvUR4c5YMi6A/NdU= | |||
| gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240909072501-939c3144cd9e/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA= | |||
| gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo= | |||
| @@ -16,7 +16,8 @@ package cron | |||
| import ( | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| ) | |||
| @@ -28,20 +29,24 @@ func AddCronGroup(svc *svc.ServiceContext) { | |||
| logx.Errorf(err.Error()) | |||
| return | |||
| } | |||
| updater.UpdateTaskStatus(svc, list) | |||
| updater.UpdateAiTaskStatus(svc, list) | |||
| status.UpdateTaskStatus(svc, list) | |||
| status.UpdateAiTaskStatus(svc, list) | |||
| }) | |||
| svc.Cron.AddFunc("*/5 * * * * ?", func() { | |||
| UpdateAiAdapterMaps(svc) | |||
| }) | |||
| svc.Cron.AddFunc("*/59 * * * * ?", func() { | |||
| svc.Cron.AddFunc("30 * * * * ?", func() { | |||
| adapterList, err := svc.Scheduler.AiStorages.GetAdaptersByType("1") | |||
| if err != nil { | |||
| logx.Errorf(err.Error()) | |||
| return | |||
| } | |||
| updater.UpdateClusterResources(svc, adapterList) | |||
| stat.UpdateClusterResources(svc, adapterList) | |||
| }) | |||
| svc.Cron.AddFunc("@hourly", func() { | |||
| status.UpdateAutoStoppedInstance(svc) | |||
| }) | |||
| } | |||
| @@ -3,7 +3,7 @@ package ai | |||
| import ( | |||
| "context" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| ) | |||
| @@ -37,7 +37,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview | |||
| centerNum = int32(len(adapterList)) | |||
| resp.CenterNum = centerNum | |||
| go updater.UpdateClusterResources(l.svcCtx, adapterList) | |||
| go stat.UpdateClusterResources(l.svcCtx, adapterList) | |||
| for _, adapter := range adapterList { | |||
| taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) | |||
| @@ -3,7 +3,7 @@ package ai | |||
| import ( | |||
| "context" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" | |||
| @@ -32,7 +32,7 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList | |||
| return nil, err | |||
| } | |||
| go updater.UpdateTrainingTaskStatus(l.svcCtx, adapterList) | |||
| go status.UpdateTrainingTaskStatus(l.svcCtx, adapterList) | |||
| for _, adapter := range adapterList { | |||
| taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) | |||
| @@ -2,7 +2,7 @@ package core | |||
| import ( | |||
| "context" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" | |||
| @@ -52,8 +52,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa | |||
| } | |||
| // 更新智算任务状态 | |||
| go updater.UpdateTaskStatus(l.svcCtx, list) | |||
| go updater.UpdateAiTaskStatus(l.svcCtx, list) | |||
| go status.UpdateTaskStatus(l.svcCtx, list) | |||
| go status.UpdateAiTaskStatus(l.svcCtx, list) | |||
| for _, model := range list { | |||
| if model.StartTime != "" && model.EndTime == "" { | |||
| @@ -5,7 +5,7 @@ import ( | |||
| "errors" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" | |||
| @@ -71,7 +71,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi | |||
| list := common.ConcatMultipleSlices(slices) | |||
| if len(list) != 0 { | |||
| go updater.UpdateDeployInstanceStatusBatch(l.svcCtx, list) | |||
| go status.UpdateDeployInstanceStatusBatch(l.svcCtx, list, true) | |||
| ins := list[0] | |||
| for i := range list { | |||
| @@ -82,8 +82,8 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi | |||
| } | |||
| } | |||
| go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) | |||
| go updater.UpdateDeployTaskStatus(l.svcCtx) | |||
| go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil) | |||
| go status.UpdateDeployTaskStatus(l.svcCtx) | |||
| } | |||
| resp.List = &deployTasks | |||
| @@ -5,8 +5,7 @@ import ( | |||
| "errors" | |||
| "fmt" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" | |||
| @@ -84,7 +83,7 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta | |||
| <-buf | |||
| return | |||
| } | |||
| if checkStopStatus(in) { | |||
| if status.CheckStopStatus(in) { | |||
| success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StartInferDeployInstance(l.ctx, ins.InstanceId) | |||
| if !success { | |||
| e := struct { | |||
| @@ -136,31 +135,3 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta | |||
| return nil | |||
| } | |||
| func checkStopStatus(in *inference.DeployInstance) bool { | |||
| switch in.ClusterType { | |||
| case storeLink.TYPE_OCTOPUS: | |||
| switch in.Status { | |||
| case "stopped": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_MODELARTS: | |||
| switch in.Status { | |||
| case "stopped": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_SHUGUANGAI: | |||
| switch in.Status { | |||
| case "Terminated": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| default: | |||
| return false | |||
| } | |||
| } | |||
| @@ -4,7 +4,7 @@ import ( | |||
| "context" | |||
| "errors" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "strconv" | |||
| @@ -33,12 +33,19 @@ func (l *StartDeployInstanceListLogic) StartDeployInstanceList(req *types.StartD | |||
| return nil, err | |||
| } | |||
| success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, req.InstanceId) | |||
| if !success { | |||
| return nil, errors.New("start instance failed") | |||
| in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| if status.CheckStopStatus(in) { | |||
| success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, in.InstanceId) | |||
| if !success { | |||
| return nil, errors.New("start instance failed") | |||
| } | |||
| } | |||
| go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) | |||
| go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil) | |||
| return resp, nil | |||
| } | |||
| @@ -4,8 +4,7 @@ import ( | |||
| "context" | |||
| "errors" | |||
| "fmt" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" | |||
| @@ -85,7 +84,7 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc | |||
| <-buf | |||
| return | |||
| } | |||
| if checkStatus(in) { | |||
| if status.CheckRunningStatus(in) { | |||
| success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId) | |||
| if !success { | |||
| e := struct { | |||
| @@ -137,31 +136,3 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc | |||
| return nil | |||
| } | |||
| func checkStatus(in *inference.DeployInstance) bool { | |||
| switch in.ClusterType { | |||
| case storeLink.TYPE_OCTOPUS: | |||
| switch in.Status { | |||
| case "running": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_MODELARTS: | |||
| switch in.Status { | |||
| case "running": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_SHUGUANGAI: | |||
| switch in.Status { | |||
| case "Running": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| default: | |||
| return false | |||
| } | |||
| } | |||
| @@ -4,7 +4,7 @@ import ( | |||
| "context" | |||
| "errors" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "strconv" | |||
| @@ -33,12 +33,19 @@ func (l *StopDeployInstanceLogic) StopDeployInstance(req *types.StopDeployInstan | |||
| return nil, err | |||
| } | |||
| success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, req.InstanceId) | |||
| if !success { | |||
| return nil, errors.New("stop instance failed") | |||
| in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| if status.CheckRunningStatus(in) { | |||
| success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, in.InstanceId) | |||
| if !success { | |||
| return nil, errors.New("stop instance failed") | |||
| } | |||
| } | |||
| go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) | |||
| go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil) | |||
| return resp, nil | |||
| } | |||
| @@ -240,7 +240,7 @@ func (i *ImageInference) inferImages(cs []*FilteredCluster) ([]*types.ImageResul | |||
| var wg sync.WaitGroup | |||
| var ch = make(chan *types.ImageResult, len(i.files)) | |||
| var results []*types.ImageResult | |||
| limit := make(chan bool, 7) | |||
| limit := make(chan bool, 5) | |||
| var imageNumIdx int32 = 0 | |||
| var imageNumIdxEnd int32 = 0 | |||
| @@ -290,22 +290,22 @@ func (i *ImageInference) updateStatus(aiTaskList []*models.TaskAi, cs []*Filtere | |||
| //change cluster status | |||
| if len(i.clusters) != len(cs) { | |||
| var acs []*strategy.AssignedCluster | |||
| var rcs []*strategy.AssignedCluster | |||
| var failedclusters []*strategy.AssignedCluster | |||
| var runningclusters []*strategy.AssignedCluster | |||
| for _, cluster := range i.clusters { | |||
| if contains(cs, cluster.ClusterId) { | |||
| var ac *strategy.AssignedCluster | |||
| ac = cluster | |||
| rcs = append(rcs, ac) | |||
| runningclusters = append(runningclusters, ac) | |||
| } else { | |||
| var ac *strategy.AssignedCluster | |||
| ac = cluster | |||
| acs = append(acs, ac) | |||
| failedclusters = append(failedclusters, ac) | |||
| } | |||
| } | |||
| // update failed cluster status | |||
| for _, ac := range acs { | |||
| for _, ac := range failedclusters { | |||
| for _, t := range aiTaskList { | |||
| if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { | |||
| t.Status = constants.Failed | |||
| @@ -322,7 +322,7 @@ func (i *ImageInference) updateStatus(aiTaskList []*models.TaskAi, cs []*Filtere | |||
| } | |||
| // update running cluster status | |||
| for _, ac := range rcs { | |||
| for _, ac := range runningclusters { | |||
| for _, t := range aiTaskList { | |||
| if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { | |||
| t.Status = constants.Running | |||
| @@ -396,7 +396,6 @@ func (i *ImageInference) sendInferReq(images []*ImageFile, cluster *FilteredClus | |||
| return | |||
| } | |||
| }(image, cluster) | |||
| <-limit | |||
| } | |||
| } | |||
| @@ -1,4 +1,4 @@ | |||
| package updater | |||
| package stat | |||
| import ( | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| @@ -1,6 +1,7 @@ | |||
| package updater | |||
| package status | |||
| import ( | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" | |||
| @@ -10,12 +11,15 @@ import ( | |||
| "time" | |||
| ) | |||
| func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance) { | |||
| func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance, needfilter bool) { | |||
| list := make([]*models.AiInferDeployInstance, len(insList)) | |||
| copy(list, insList) | |||
| for i := len(list) - 1; i >= 0; i-- { | |||
| if list[i].Status == constants.Running || list[i].Status == constants.Stopped { | |||
| list = append(list[:i], list[i+1:]...) | |||
| if needfilter { | |||
| for i := len(list) - 1; i >= 0; i-- { | |||
| if list[i].Status == constants.Running || list[i].Status == constants.Stopped { | |||
| list = append(list[:i], list[i+1:]...) | |||
| } | |||
| } | |||
| } | |||
| @@ -23,8 +27,10 @@ func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models. | |||
| return | |||
| } | |||
| buffer := make(chan bool, 3) | |||
| for _, instance := range list { | |||
| go UpdateDeployInstanceStatus(svc, instance, false) | |||
| buffer <- true | |||
| go UpdateDeployInstanceStatus(svc, instance, false, buffer) | |||
| } | |||
| } | |||
| @@ -47,23 +53,37 @@ func UpdateDeployTaskStatus(svc *svc.ServiceContext) { | |||
| return | |||
| } | |||
| buffer := make(chan bool, 2) | |||
| for _, instance := range inslist { | |||
| go UpdateDeployInstanceStatus(svc, instance, false) | |||
| buffer <- true | |||
| go UpdateDeployInstanceStatus(svc, instance, false, buffer) | |||
| } | |||
| } | |||
| func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool) { | |||
| func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool, ch chan bool) { | |||
| amap, found := svc.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(instance.AdapterId, 10)] | |||
| if !found { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| cmap, found := amap[strconv.FormatInt(instance.ClusterId, 10)] | |||
| if !found { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| h := http.Request{} | |||
| ins, err := cmap.GetInferDeployInstance(h.Context(), instance.InstanceId) | |||
| if err != nil { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| switch instance.ClusterType { | |||
| @@ -71,11 +91,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe | |||
| switch ins.Status { | |||
| case "running": | |||
| if instance.Status == constants.Running { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| instance.Status = constants.Running | |||
| case "stopped": | |||
| if instance.Status == constants.Stopped { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| instance.Status = constants.Stopped | |||
| @@ -86,11 +114,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe | |||
| switch ins.Status { | |||
| case "running": | |||
| if instance.Status == constants.Running { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| instance.Status = constants.Running | |||
| case "stopped": | |||
| if instance.Status == constants.Stopped { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| instance.Status = constants.Stopped | |||
| @@ -101,11 +137,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe | |||
| switch ins.Status { | |||
| case "Running": | |||
| if instance.Status == constants.Running { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| instance.Status = constants.Running | |||
| case "Terminated": | |||
| if instance.Status == constants.Stopped { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| instance.Status = constants.Stopped | |||
| @@ -116,6 +160,84 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe | |||
| err = svc.Scheduler.AiStorages.UpdateInferDeployInstance(instance, updatetime) | |||
| if err != nil { | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| return | |||
| } | |||
| if ch != nil { | |||
| <-ch | |||
| return | |||
| } | |||
| } | |||
| func UpdateAutoStoppedInstance(svc *svc.ServiceContext) { | |||
| list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceList() | |||
| if err != nil { | |||
| return | |||
| } | |||
| if len(list) == 0 { | |||
| return | |||
| } | |||
| UpdateDeployInstanceStatusBatch(svc, list, false) | |||
| } | |||
| func CheckStopStatus(in *inference.DeployInstance) bool { | |||
| switch in.ClusterType { | |||
| case storeLink.TYPE_OCTOPUS: | |||
| switch in.Status { | |||
| case "stopped": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_MODELARTS: | |||
| switch in.Status { | |||
| case "stopped": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_SHUGUANGAI: | |||
| switch in.Status { | |||
| case "Terminated": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| default: | |||
| return false | |||
| } | |||
| } | |||
| func CheckRunningStatus(in *inference.DeployInstance) bool { | |||
| switch in.ClusterType { | |||
| case storeLink.TYPE_OCTOPUS: | |||
| switch in.Status { | |||
| case "running": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_MODELARTS: | |||
| switch in.Status { | |||
| case "running": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| case storeLink.TYPE_SHUGUANGAI: | |||
| switch in.Status { | |||
| case "Running": | |||
| return true | |||
| default: | |||
| return false | |||
| } | |||
| default: | |||
| return false | |||
| } | |||
| } | |||
| @@ -1,4 +1,4 @@ | |||
| package updater | |||
| package status | |||
| import ( | |||
| "errors" | |||
| @@ -791,11 +791,14 @@ func (m *ModelArtsLink) CreateInferDeployInstance(ctx context.Context, option *o | |||
| } | |||
| var configItems []*modelarts.ServiceConfig | |||
| configItems = append(configItems, configParam) | |||
| now := time.Now() | |||
| timestampSec := now.Unix() | |||
| str := strconv.FormatInt(timestampSec, 10) | |||
| req := &modelarts.CreateServiceReq{ | |||
| Platform: m.platform, | |||
| Config: configItems, | |||
| InferType: "real-time", | |||
| ServiceName: option.ModelName + "_" + option.ModelType + "_" + Npu, | |||
| ServiceName: option.ModelName + "_" + option.ModelType + "_" + Npu + "_" + str, | |||
| } | |||
| ctx, cancel := context.WithTimeout(context.Background(), 150*time.Second) | |||
| defer cancel() | |||
| @@ -812,10 +815,37 @@ func (m *ModelArtsLink) CheckModelExistence(ctx context.Context, name string, mt | |||
| ModelName: name, | |||
| ModelType: mtype, | |||
| } | |||
| err := m.GetModelId(ctx, ifoption) | |||
| err := m.CheckImageExist(ctx, ifoption) | |||
| if err != nil { | |||
| return false | |||
| } | |||
| return true | |||
| } | |||
| func (m *ModelArtsLink) CheckImageExist(ctx context.Context, option *option.InferOption) error { | |||
| req := &modelarts.ListImagesReq{ | |||
| Limit: m.pageSize, | |||
| Offset: m.pageIndex, | |||
| } | |||
| ListImageResp, err := m.modelArtsRpc.ListImages(ctx, req) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| var modelName string | |||
| if ListImageResp.Code == 200 { | |||
| //return errors.New("failed to get ModelId") | |||
| for _, ListImage := range ListImageResp.Data { | |||
| if option.ModelName == "ChatGLM-6B" { | |||
| modelName = "chatglm-6b" | |||
| } else { | |||
| modelName = option.ModelName | |||
| } | |||
| if ListImage.Name == modelName { | |||
| return nil | |||
| } | |||
| } | |||
| } | |||
| return errors.New("failed to find Image ") | |||
| } | |||
| @@ -84,10 +84,10 @@ var ( | |||
| CardModelNameCmdMap = map[string]map[string]string{ | |||
| BIV100: {"blip-image-captioning-base": "pip install -U transformers; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code; python infer_biv100.py", | |||
| "imagenet_resnet50": "pip install -U transformers; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code/infer; python infer_biv100.py", | |||
| "chatGLM_6B": "su root; pip install transformers==4.33.2; pip install fastapi uvicorn[standard]; cd /code; python infer_biv100.py"}, | |||
| "ChatGLM_6B": "su root; pip install transformers==4.33.2; pip install fastapi uvicorn[standard]; cd /code; python infer_biv100.py"}, | |||
| MLU: {"blip-image-captioning-base": "", | |||
| "imagenet_resnet50": "su root; . /torch/venv3/pytorch/bin/activate; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code/infer; python infer_mlu.py", | |||
| "chatGLM_6B": ""}, | |||
| "ChatGLM_6B": ""}, | |||
| } | |||
| ) | |||
| @@ -24,6 +24,7 @@ import ( | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" | |||
| "mime/multipart" | |||
| "strconv" | |||
| @@ -800,8 +801,11 @@ func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inferen | |||
| var insList []*inference.DeployInstance | |||
| params := &hpcAC.GetInstanceServiceListReqParam{ | |||
| InstanceServiceName: DEPLOY_INSTANCE_PREFIEX, | |||
| Status: "", | |||
| TaskType: "", | |||
| Start: 0, | |||
| Limit: DEPLOY_INSTANCE_LIMIT, | |||
| Sort: "desc", | |||
| } | |||
| req := &hpcacclient.GetInstanceServiceListReq{ | |||
| Param: params, | |||
| @@ -867,6 +871,25 @@ func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*in | |||
| if err != nil || resp.Code != "0" { | |||
| return nil, err | |||
| } | |||
| var url string | |||
| if resp.Data.Status == constants.Running { | |||
| url = resp.Data.ContainerPortInfoList[0].AccessUrl | |||
| } | |||
| var modelType string | |||
| var modelName string | |||
| var card string | |||
| if resp.Data.Description != "" { | |||
| str := strings.Split(resp.Data.Description, FORWARD_SLASH) | |||
| if len(str) == 3 { | |||
| modelType = str[0] | |||
| modelName = str[1] | |||
| card = str[2] | |||
| } | |||
| } | |||
| ins.InstanceName = resp.Data.InstanceServiceName | |||
| ins.InstanceId = resp.Data.Id | |||
| ins.ClusterName = s.platform | |||
| @@ -874,6 +897,10 @@ func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*in | |||
| ins.InferCard = DCU | |||
| ins.CreatedTime = resp.Data.CreateTime | |||
| ins.ClusterType = TYPE_SHUGUANGAI | |||
| ins.ModelType = modelType | |||
| ins.ModelName = modelName | |||
| ins.InferUrl = url | |||
| ins.InferCard = card | |||
| return ins, nil | |||
| } | |||