// Package cron contains periodic jobs that keep coordinator task rows and
// cached cluster resource statistics in sync with the remote AI adapters.
package cron

import (
	"context"
	"strconv"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/zrpc"
	"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/executor"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/storeLink"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
	"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
	"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
)

// Cluster kind names recognized by InitAiClusterMap.
const (
	OCTOPUS    = "octopus"
	MODELARTS  = "modelarts"
	SHUGUANGAI = "shuguangAi"
)

// GetTaskList returns the first page (10 rows) of non-deleted tasks,
// newest first by created_time.
func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
	const (
		limit  = 10
		offset = 0
	)
	var list []*types.TaskModel
	db := svc.DbEngin.Model(&types.TaskModel{}).Table("task").Where("deleted_at is null")

	// Count matching rows before pagination is applied.
	var total int64
	if err := db.Count(&total).Error; err != nil {
		return nil, err
	}
	// The original called db.Limit(...).Offset(...) without using the result,
	// relying on gorm's in-place statement mutation; chain explicitly into
	// Find so the pagination unambiguously applies to this query.
	if err := db.Limit(limit).Offset(offset).Order("created_time desc").Find(&list).Error; err != nil {
		return nil, err
	}
	return list, nil
}

// activeAiTasks returns the AI-typed tasks (AdapterTypeDict == 1) from
// tasklist that have not reached a terminal state. The input slice is left
// untouched.
func activeAiTasks(tasklist []*types.TaskModel) []*types.TaskModel {
	var list []*types.TaskModel
	for _, t := range tasklist {
		if t.AdapterTypeDict == 1 && t.Status != constants.Succeeded && t.Status != constants.Failed {
			list = append(list, t)
		}
	}
	return list
}

// leastRecentlyUpdated picks the task whose UpdatedTime (parsed with layout)
// is the oldest, so the stalest task is refreshed first. Unparseable values
// compare as the zero time, matching the original best-effort behavior.
// Callers must pass a non-empty list.
func leastRecentlyUpdated(list []*types.TaskModel, layout string) *types.TaskModel {
	task := list[0]
	for _, t := range list {
		earliest, _ := time.Parse(layout, task.UpdatedTime)
		candidate, _ := time.Parse(layout, t.UpdatedTime)
		if candidate.Before(earliest) {
			task = t
		}
	}
	return task
}

// UpdateAiTaskStatus refreshes the sub-task rows (task_ai) of the stalest
// still-running AI task by asking each sub-task's remote collector for the
// current training-task state, then persisting the result.
func UpdateAiTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
	list := activeAiTasks(tasklist)
	if len(list) == 0 {
		return
	}
	task := leastRecentlyUpdated(list, constants.Layout)

	var aiTaskList []*models.TaskAi
	tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
	if tx.Error != nil {
		logx.Errorf(tx.Error.Error())
		return
	}
	if len(aiTaskList) == 0 {
		return
	}

	var wg sync.WaitGroup
	for _, aitask := range aiTaskList {
		t := aitask // shadow for the goroutine below
		if t.Status == constants.Completed || t.Status == constants.Failed {
			continue // terminal states never change again
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Guard the double map lookup: a missing adapter/cluster entry
			// would otherwise yield a nil interface and panic on the call.
			col, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)]
			if !ok {
				return
			}
			trainingTask, err := col.GetTrainingTask(context.Background(), t.JobId)
			if err != nil {
				// The original logged the identical message for deadline and
				// generic errors, so the grpc-code branch was collapsed.
				logx.Errorf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
				return
			}
			if trainingTask == nil {
				return
			}
			t.Status = trainingTask.Status
			t.StartTime = trainingTask.Start
			t.EndTime = trainingTask.End
			if err := svc.Scheduler.AiStorages.UpdateAiTask(t); err != nil {
				logx.Errorf("###UpdateAiTaskStatus###, AiTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())
			}
		}()
	}
	wg.Wait()
}

// saveTask persists the (non-deleted) task row, logging any database error.
func saveTask(svc *svc.ServiceContext, task *types.TaskModel) {
	tx := svc.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
	if tx.Error != nil {
		logx.Errorf(tx.Error.Error())
	}
}

// UpdateTaskStatus aggregates the task_ai sub-task states of the stalest
// still-running AI task into its parent task row (status and time window).
func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
	list := activeAiTasks(tasklist)
	if len(list) == 0 {
		return
	}
	// NOTE(review): the original parsed UpdatedTime with time.RFC3339 here but
	// with constants.Layout in UpdateAiTaskStatus for the same column; unified
	// on constants.Layout — confirm against the stored format.
	task := leastRecentlyUpdated(list, constants.Layout)

	var aiTask []*models.TaskAi
	tx := svc.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
	if tx.Error != nil {
		logx.Errorf(tx.Error.Error())
		return
	}

	switch len(aiTask) {
	case 0:
		// No sub-tasks: persist the row unchanged (UpdatedTime untouched,
		// as in the original).
		saveTask(svc, task)
		return
	case 1:
		// Single sub-task: the parent mirrors it directly.
		if aiTask[0].Status == constants.Completed {
			task.Status = constants.Succeeded
		} else {
			task.Status = aiTask[0].Status
		}
		task.StartTime = aiTask[0].StartTime
		task.EndTime = aiTask[0].EndTime
		task.UpdatedTime = time.Now().Format(constants.Layout)
		saveTask(svc, task)
		return
	}

	// Multiple sub-tasks: drop the ones that never started; each removal
	// leaves its status on the parent (so the lowest-index unstarted sub-task
	// wins), matching the original loop.
	for i := len(aiTask) - 1; i >= 0; i-- {
		if aiTask[i].StartTime == "" {
			task.Status = aiTask[i].Status
			aiTask = append(aiTask[:i], aiTask[i+1:]...)
		}
	}
	if len(aiTask) == 0 {
		task.UpdatedTime = time.Now().Format(constants.Layout)
		saveTask(svc, task)
		return
	}

	// Aggregate the time window and an overall status across the started
	// sub-tasks. A failure short-circuits (window aggregation stops too,
	// as before). "aggStatus" avoids shadowing the grpc status package name
	// used elsewhere in this package.
	start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
	end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
	var aggStatus string
	completed := 0
	for _, a := range aiTask {
		s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
		e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
		if s.Before(start) {
			start = s
		}
		if e.After(end) {
			end = e
		}
		stop := false
		switch a.Status {
		case constants.Failed:
			aggStatus = a.Status // one failure fails the whole task
			stop = true
		case constants.Pending, constants.Running:
			aggStatus = a.Status
		case constants.Completed:
			completed++
		}
		if stop {
			break
		}
	}
	if completed == len(aiTask) {
		aggStatus = constants.Succeeded
	}
	if aggStatus != "" {
		task.Status = aggStatus
		task.StartTime = start.Format(constants.Layout)
		task.EndTime = end.Format(constants.Layout)
	}
	task.UpdatedTime = time.Now().Format(constants.Layout)
	saveTask(svc, task)
}

// UpdateAiAdapterMaps registers executor/collector clients for any AI adapter
// (type "1") that exists in storage but is missing from the in-memory maps.
func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
	const aiType = "1"
	adapterIds, err := svc.Scheduler.AiStorages.GetAdapterIdsByType(aiType)
	if err != nil {
		logx.Errorf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
		return
	}
	for _, id := range adapterIds {
		if isAdapterExist(svc, id) {
			continue
		}
		clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(id)
		if err != nil {
			logx.Errorf("###UpdateAiAdapterMaps###, error: %v \n", err.Error())
			return
		}
		if len(clusters.List) == 0 {
			continue
		}
		exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List)
		svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
		svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
	}
}

// isAdapterExist reports whether adapter id is registered in both the
// executor and collector maps.
func isAdapterExist(svc *svc.ServiceContext, id string) bool {
	_, inExec := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
	_, inColl := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
	return inExec && inColl
}

// InitAiClusterMap builds per-cluster executor and collector clients, keyed
// by cluster id, for every supported cluster kind in clusters. Clusters with
// an unrecognized Name are skipped.
func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
	executorMap := make(map[string]executor.AiExecutor, len(clusters))
	collectorMap := make(map[string]collector.AiCollector, len(clusters))
	for _, c := range clusters {
		// Best-effort id parse: 0 on failure, matching the original.
		id, _ := strconv.ParseInt(c.Id, 10, 64)
		switch c.Name {
		case OCTOPUS:
			octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
			link := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
			collectorMap[c.Id] = link
			executorMap[c.Id] = link
		case MODELARTS:
			modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
			modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
			link := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
			collectorMap[c.Id] = link
			executorMap[c.Id] = link
		case SHUGUANGAI:
			aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
			link := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
			collectorMap[c.Id] = link
			executorMap[c.Id] = link
		}
	}
	return executorMap, collectorMap
}

// UpdateClusterResource refreshes the cached resource statistics of every
// cluster of every AI adapter, creating the cache row on first sight and
// updating it afterwards. Per-cluster work runs concurrently.
func UpdateClusterResource(svc *svc.ServiceContext) {
	list, err := svc.Scheduler.AiStorages.GetAdaptersByType("1")
	if err != nil {
		return
	}
	var wg sync.WaitGroup
	for _, adapter := range list {
		// BUG FIX: shadow the loop variable — the goroutines below captured
		// the shared `adapter`, so late goroutines could read a later
		// adapter's Id/Type (pre-Go-1.22 loop-variable semantics).
		adapter := adapter
		clusters, err := svc.Scheduler.AiStorages.GetClustersByAdapterId(adapter.Id)
		if err != nil {
			continue
		}
		for _, cluster := range clusters.List {
			c := cluster // shadow for the goroutine below
			clusterResource, err := svc.Scheduler.AiStorages.GetClusterResourcesById(c.Id)
			if err != nil {
				continue
			}
			wg.Add(1)
			go func() {
				defer wg.Done()
				col, ok := svc.Scheduler.AiService.AiCollectorAdapterMap[adapter.Id][c.Id]
				if !ok {
					return
				}
				stat, err := col.GetResourceStats(context.Background())
				if err != nil || stat == nil {
					return
				}
				clusterType, err := strconv.ParseInt(adapter.Type, 10, 64)
				if err != nil {
					return
				}
				var cardTotal int64
				var topsTotal float64
				for _, card := range stat.CardsAvail {
					cardTotal += int64(card.CardNum)
					topsTotal += card.TOpsAtFp16 * float64(card.CardNum)
				}
				if (models.TClusterResource{}) == *clusterResource {
					// Zero-valued row: first sighting, create the cache entry.
					if err := svc.Scheduler.AiStorages.SaveClusterResources(c.Id, c.Name, clusterType,
						float64(stat.CpuCoreAvail), float64(stat.CpuCoreTotal), stat.MemAvail, stat.MemTotal,
						stat.DiskAvail, stat.DiskTotal, float64(stat.GpuAvail), float64(stat.GpuTotal),
						cardTotal, topsTotal); err != nil {
						return
					}
					return
				}
				// Ignore obviously bogus snapshots (zero totals).
				if stat.CpuCoreTotal == 0 || stat.MemTotal == 0 || stat.DiskTotal == 0 {
					return
				}
				clusterResource.CardTotal = cardTotal
				clusterResource.CardTopsTotal = topsTotal
				clusterResource.CpuAvail = float64(stat.CpuCoreAvail)
				clusterResource.CpuTotal = float64(stat.CpuCoreTotal)
				clusterResource.MemAvail = stat.MemAvail
				clusterResource.MemTotal = stat.MemTotal
				clusterResource.DiskAvail = stat.DiskAvail
				clusterResource.DiskTotal = stat.DiskTotal
				if err := svc.Scheduler.AiStorages.UpdateClusterResources(clusterResource); err != nil {
					return
				}
			}()
		}
	}
	wg.Wait()
}