From dc66fa4b3f6699edc786bd09fe743497e66c1c78 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Thu, 25 Aug 2022 17:52:16 +0800 Subject: [PATCH] #2701 update --- models/resource_specification.go | 21 ++ .../resource/resource_specification.go | 287 ++++++++++-------- 2 files changed, 181 insertions(+), 127 deletions(-) diff --git a/models/resource_specification.go b/models/resource_specification.go index 0cb7bca67..764d5acdc 100644 --- a/models/resource_specification.go +++ b/models/resource_specification.go @@ -525,3 +525,24 @@ func InitCloudbrainTwoSpecs() (map[string]*Specification, error) { } return r, nil } + +var grampusSpecsInitFlag = false +var grampusSpecs map[string]*Specification + +func GetGrampusSpecs() (map[string]*Specification, error) { + if !grampusSpecsInitFlag { + r, err := FindSpecs(FindSpecsOptions{ + Cluster: C2NetCluster, + }) + if err != nil { + return nil, err + } + for _, spec := range r { + grampusSpecs[spec.SourceSpecId] = spec + grampusSpecs[spec.SourceSpecId+"_"+spec.AiCenterCode] = spec + } + grampusSpecsInitFlag = true + + } + return grampusSpecs, nil +} diff --git a/services/cloudbrain/resource/resource_specification.go b/services/cloudbrain/resource/resource_specification.go index ad07b30bd..26c6b6431 100644 --- a/services/cloudbrain/resource/resource_specification.go +++ b/services/cloudbrain/resource/resource_specification.go @@ -338,6 +338,7 @@ func RefreshHistorySpec(scopeAll bool, ids []int64) (int64, int64, error) { total = n for i := 0; i < 1000; i++ { list, err := models.FindNoSpecHistoricTask(page, pageSize) + page++ if err != nil { log.Error("FindNoSpecHistoricTask error.page=%d pageSize=%d e=%v", page, pageSize, err) return total, success, err @@ -373,6 +374,8 @@ func RefreshOneHistorySpec(task *models.Cloudbrain) error { spec, err = getCloudbrainOneSpec(task) case models.TypeCloudBrainTwo: spec, err = getCloudbrainTwoSpec(task) + case models.TypeC2Net: + spec, err = getGrampusSpec(task) } if err != nil { log.Error("find spec error,task.ID=%d err=%v", task.ID, err) @@ -386,155 +389,179 @@ func RefreshOneHistorySpec(task *models.Cloudbrain) error { } func getCloudbrainOneSpec(task *models.Cloudbrain) (*models.Specification, error) { + if task.GpuQueue == "" { + log.Info("gpu queue is empty.task.ID = %d", task.ID) + return nil, nil + } + //find from config + spec, err := findCloudbrainOneSpecFromConfig(task) + if err != nil { + log.Error("getCloudbrainOneSpec findCloudbrainOneSpecFromConfig error.%v", err) + return nil, err + } + if spec != nil { + return spec, nil + } //find from remote + return findCloudbrainOneSpecFromRemote(task) + +} + +func findCloudbrainOneSpecFromRemote(task *models.Cloudbrain) (*models.Specification, error) { + time.Sleep(200 * time.Millisecond) + log.Info("start findCloudbrainOneSpecFromRemote") result, err := cloudbrain.GetJob(task.JobID) if err != nil { log.Error("getCloudbrainOneSpec error. %v", err) return nil, err } - if result != nil { - jobRes, _ := models.ConvertToJobResultPayload(result.Payload) - memSize, _ := models.ParseMemSizeFromGrampus(jobRes.Resource.Memory) - if task.ComputeResource == "CPU/GPU" { - task.ComputeResource = models.GPU - } - var shmMB float32 - if jobRes.Config.TaskRoles != nil && len(jobRes.Config.TaskRoles) > 0 { - shmMB = float32(jobRes.Config.TaskRoles[0].ShmMB) / 1024 - } - opt := models.FindSpecsOptions{ - ComputeResource: task.ComputeResource, - Cluster: models.OpenICluster, - AiCenterCode: models.AICenterOfCloudBrainOne, - QueueCode: task.GpuQueue, - AccCardsNum: jobRes.Resource.NvidiaComGpu, - UseAccCardsNum: true, - CpuCores: jobRes.Resource.CPU, - UseCpuCores: true, - MemGiB: memSize, - UseMemGiB: memSize > 0, - ShareMemGiB: shmMB, - UseShareMemGiB: shmMB > 0, - RequestAll: true, - } - specs, err := models.FindSpecs(opt) + if result == nil { + log.Info("findCloudbrainOneSpecFromRemote failed,result is empty.task.ID=%d", task.ID) + return nil, nil + } + jobRes, _ := models.ConvertToJobResultPayload(result.Payload) + memSize, _ := models.ParseMemSizeFromGrampus(jobRes.Resource.Memory) + if task.ComputeResource == "CPU/GPU" { + task.ComputeResource = models.GPU + } + var shmMB float32 + if jobRes.Config.TaskRoles != nil && len(jobRes.Config.TaskRoles) > 0 { + shmMB = float32(jobRes.Config.TaskRoles[0].ShmMB) / 1024 + } + + opt := models.FindSpecsOptions{ + ComputeResource: task.ComputeResource, + Cluster: models.OpenICluster, + AiCenterCode: models.AICenterOfCloudBrainOne, + QueueCode: task.GpuQueue, + AccCardsNum: jobRes.Resource.NvidiaComGpu, + UseAccCardsNum: true, + CpuCores: jobRes.Resource.CPU, + UseCpuCores: true, + MemGiB: memSize, + UseMemGiB: memSize > 0, + ShareMemGiB: shmMB, + UseShareMemGiB: shmMB > 0, + RequestAll: true, + } + specs, err := models.FindSpecs(opt) + if err != nil { + log.Error("getCloudbrainOneSpec from remote error,%v", err) + return nil, err + } + if len(specs) == 1 { + return specs[0], nil + } + if len(specs) == 0 { + s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加") if err != nil { - log.Error("getCloudbrainOneSpec from remote error,%v", err) - return nil, err + log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err) + return nil, nil } - if len(specs) == 1 { - return specs[0], nil + return s, nil + } + log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt) + return nil, nil +} + +func findCloudbrainOneSpecFromConfig(task *models.Cloudbrain) (*models.Specification, error) { + //find from config + var specConfig *models.ResourceSpec + hasSpec := false + if task.JobType == string(models.JobTypeTrain) { + if cloudbrain.TrainResourceSpecs == nil { + json.Unmarshal([]byte(setting.TrainResourceSpecs), &cloudbrain.TrainResourceSpecs) } - if len(specs) == 0 { - s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加") - if err != nil { - log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err) - return nil, nil + for _, tmp := range cloudbrain.TrainResourceSpecs.ResourceSpec { + if tmp.Id == task.ResourceSpecId { + hasSpec = true + specConfig = tmp + break } - return s, nil } - if len(specs) > 1 { - log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt) - return nil, nil + } else if task.JobType == string(models.JobTypeInference) { + if cloudbrain.InferenceResourceSpecs == nil { + json.Unmarshal([]byte(setting.InferenceResourceSpecs), &cloudbrain.InferenceResourceSpecs) } - - } else { - //find from config - var specConfig *models.ResourceSpec - hasSpec := false - if task.JobType == string(models.JobTypeTrain) { - if cloudbrain.TrainResourceSpecs == nil { - json.Unmarshal([]byte(setting.TrainResourceSpecs), &cloudbrain.TrainResourceSpecs) - } - for _, tmp := range cloudbrain.TrainResourceSpecs.ResourceSpec { - if tmp.Id == task.ResourceSpecId { - hasSpec = true - specConfig = tmp - break - } - } - } else if task.JobType == string(models.JobTypeInference) { - if cloudbrain.InferenceResourceSpecs == nil { - json.Unmarshal([]byte(setting.InferenceResourceSpecs), &cloudbrain.InferenceResourceSpecs) - } - for _, tmp := range cloudbrain.InferenceResourceSpecs.ResourceSpec { - if tmp.Id == task.ResourceSpecId { - hasSpec = true - specConfig = tmp - break - } - } - } else { - if cloudbrain.ResourceSpecs == nil { - json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs) + for _, tmp := range cloudbrain.InferenceResourceSpecs.ResourceSpec { + if tmp.Id == task.ResourceSpecId { + hasSpec = true + specConfig = tmp + break } - for _, tmp := range cloudbrain.ResourceSpecs.ResourceSpec { - if tmp.Id == task.ResourceSpecId { - hasSpec = true - specConfig = tmp - break + } + } else { + if cloudbrain.ResourceSpecs == nil { + json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs) + } + for _, tmp := range cloudbrain.ResourceSpecs.ResourceSpec { + if tmp.Id == task.ResourceSpecId { + hasSpec = true + specConfig = tmp + break - } } } - if !hasSpec && cloudbrain.SpecialPools != nil { + } + if !hasSpec && cloudbrain.SpecialPools != nil { - for _, specialPool := range cloudbrain.SpecialPools.Pools { + for _, specialPool := range cloudbrain.SpecialPools.Pools { - if specialPool.ResourceSpec != nil { + if specialPool.ResourceSpec != nil { - for _, spec := range specialPool.ResourceSpec { - if task.ResourceSpecId == spec.Id { - hasSpec = true - specConfig = spec - break - } + for _, spec := range specialPool.ResourceSpec { + if task.ResourceSpecId == spec.Id { + hasSpec = true + specConfig = spec + break } } } } - if specConfig == nil { - log.Error("getCloudbrainOneSpec from config failed,task.ResourceSpecId=%d", task.ResourceSpecId) - return nil, nil - } - opt := models.FindSpecsOptions{ - JobType: models.JobType(task.JobType), - ComputeResource: task.ComputeResource, - Cluster: models.OpenICluster, - AiCenterCode: models.AICenterOfCloudBrainOne, - QueueCode: task.GpuQueue, - AccCardsNum: specConfig.GpuNum, - UseAccCardsNum: true, - CpuCores: specConfig.GpuNum, - UseCpuCores: true, - MemGiB: float32(specConfig.MemMiB) / 1024, - UseMemGiB: true, - ShareMemGiB: float32(specConfig.ShareMemMiB) / 1024, - UseShareMemGiB: true, - RequestAll: true, - } - specs, err := models.FindSpecs(opt) + } + if specConfig == nil { + log.Error("getCloudbrainOneSpec from config failed,task.ResourceSpecId=%d", task.ResourceSpecId) + return nil, nil + } + if task.ComputeResource == "CPU/GPU" { + task.ComputeResource = models.GPU + } + + opt := models.FindSpecsOptions{ + JobType: models.JobType(task.JobType), + ComputeResource: task.ComputeResource, + Cluster: models.OpenICluster, + AiCenterCode: models.AICenterOfCloudBrainOne, + QueueCode: task.GpuQueue, + AccCardsNum: specConfig.GpuNum, + UseAccCardsNum: true, + CpuCores: specConfig.CpuNum, + UseCpuCores: true, + MemGiB: float32(specConfig.MemMiB) / 1024, + UseMemGiB: true, + ShareMemGiB: float32(specConfig.ShareMemMiB) / 1024, + UseShareMemGiB: true, + RequestAll: true, + } + specs, err := models.FindSpecs(opt) + if err != nil { + log.Error("getCloudbrainOneSpec from config error,%v", err) + return nil, err + } + if len(specs) > 1 { + log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt) + return nil, nil + } + if len(specs) == 0 { + s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加") if err != nil { - log.Error("getCloudbrainOneSpec from config error,%v", err) - return nil, err - } - if len(specs) > 1 { - log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt) + log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err) return nil, nil } - if len(specs) == 0 { - s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加") - if err != nil { - log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err) - return nil, nil - } - return s, nil - } - return specs[0], nil + return s, nil } - return nil, nil - + return specs[0], nil } func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error) { @@ -546,6 +573,8 @@ func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error if task.FlavorCode != "" { return specMap[task.FlavorCode], nil } + time.Sleep(200 * time.Millisecond) + log.Info("start getCloudbrainTwoSpec FromRemote") if task.JobType == string(models.JobTypeDebug) { result, err := modelarts.GetNotebook2(task.JobID) if err != nil { @@ -568,12 +597,16 @@ func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error return nil, nil } -func RefreshCloudbrainTwoSpec(task *models.Cloudbrain) error { - return nil -} - -func RefreshC2NetSpec(task *models.Cloudbrain) error { - return nil +func getGrampusSpec(task *models.Cloudbrain) (*models.Specification, error) { + specMap, err := models.GetGrampusSpecs() + if err != nil { + log.Error("GetGrampusSpecs err.%v", err) + return nil, err + } + if task.AiCenter != "" { + return specMap[task.FlavorCode+"_"+task.AiCenter], nil + } + return specMap[task.FlavorCode], nil } func InitQueueAndSpec(opt models.FindSpecsOptions, aiCenterName string, remark string) (*models.Specification, error) {