package resource

import (
	"encoding/json"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"time"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/grampus"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/modelarts"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/routers/response"
	"code.gitea.io/gitea/services/admin/operate_log"
)

// AddResourceSpecification inserts a new resource specification.
// A zero Status defaults to models.SpecNotVerified.
func AddResourceSpecification(doerId int64, req models.ResourceSpecificationReq) error {
	if req.Status == 0 {
		req.Status = models.SpecNotVerified
	}
	_, err := models.InsertResourceSpecification(req.ToDTO())
	return err
}

// UpdateSpecUnitPrice updates the unit price (in points) of the given
// specification and records an admin operate log when the price changed.
func UpdateSpecUnitPrice(doerId int64, specId int64, unitPrice int) *response.BizError {
	oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: specId})
	if err != nil {
		return response.NewBizError(err)
	}
	if oldSpec == nil {
		return response.SPECIFICATION_NOT_EXIST
	}
	if err = models.UpdateSpecUnitPriceById(specId, unitPrice); err != nil {
		return response.NewBizError(err)
	}
	if oldSpec.UnitPrice != unitPrice {
		AddSpecOperateLog(doerId, "edit",
			operate_log.NewLogValues().Add("unitPrice", unitPrice),
			operate_log.NewLogValues().Add("unitPrice", oldSpec.UnitPrice),
			specId,
			fmt.Sprintf("修改资源规格单价从%d积分到%d积分", oldSpec.UnitPrice, unitPrice))
	}
	return nil
}

// SyncGrampusSpecs pulls resource specifications from grampus and syncs them
// into the local tables: unknown specs (matched by queue id + source spec id)
// are inserted with status SpecNotVerified, known ones are updated in place.
func SyncGrampusSpecs(doerId int64) error {
	res, err := grampus.GetResourceSpecs("")
	if err != nil {
		return err
	}
	log.Info("SyncGrampusSpecs result = %+v", res)

	specUpdateList := make([]models.ResourceSpecification, 0)
	specInsertList := make([]models.ResourceSpecification, 0)
	existIds := make([]int64, 0)
	for _, spec := range res.Infos {
		for _, c := range spec.Centers {
			computeResource := models.ParseComputeResourceFormGrampus(spec.SpecInfo.AccDeviceKind)
			if computeResource == "" {
				continue
			}
			accCardType := strings.ToUpper(spec.SpecInfo.AccDeviceModel)
			// fix: the original overwrote the MemorySize parse error with the
			// AccDeviceMemory one before checking, so a bad MemorySize was
			// silently ignored.
			memGiB, memErr := models.ParseMemSizeFromGrampus(spec.SpecInfo.MemorySize)
			gpuMemGiB, gpuMemErr := models.ParseMemSizeFromGrampus(spec.SpecInfo.AccDeviceMemory)
			if memErr != nil || gpuMemErr != nil {
				log.Error("ParseMemSizeFromGrampus error. MemorySize=%s AccDeviceMemory=%s", spec.SpecInfo.MemorySize, spec.SpecInfo.AccDeviceMemory)
			}
			// get resource queue. if the queue does not exist, skip this center
			queue, err := models.GetResourceQueue(&models.ResourceQueue{
				Cluster:         models.C2NetCluster,
				AiCenterCode:    c.ID,
				ComputeResource: computeResource,
				AccCardType:     accCardType,
			})
			if err != nil || queue == nil {
				continue
			}
			// if this specification already exists, update its params;
			// otherwise insert a new record with status SpecNotVerified
			oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{
				QueueId:      queue.ID,
				SourceSpecId: spec.ID,
			})
			if err != nil {
				return err
			}
			if oldSpec == nil {
				specInsertList = append(specInsertList, models.ResourceSpecification{
					QueueId:         queue.ID,
					SourceSpecId:    spec.ID,
					AccCardsNum:     spec.SpecInfo.AccDeviceNum,
					CpuCores:        spec.SpecInfo.CpuCoreNum,
					MemGiB:          memGiB,
					GPUMemGiB:       gpuMemGiB,
					Status:          models.SpecNotVerified,
					IsAutomaticSync: true,
					CreatedBy:       doerId,
					UpdatedBy:       doerId,
				})
			} else {
				existIds = append(existIds, oldSpec.ID)
				specUpdateList = append(specUpdateList, models.ResourceSpecification{
					ID:          oldSpec.ID,
					AccCardsNum: spec.SpecInfo.AccDeviceNum,
					CpuCores:    spec.SpecInfo.CpuCoreNum,
					MemGiB:      memGiB,
					GPUMemGiB:   gpuMemGiB,
					UpdatedBy:   doerId,
				})
			}
		}
	}
	return models.SyncGrampusSpecs(specUpdateList, specInsertList, existIds)
}

// GetResourceSpecificationList returns specifications together with their queues.
func GetResourceSpecificationList(opts models.SearchResourceSpecificationOptions) (*models.ResourceSpecAndQueueListRes, error) {
	n, r, err := models.SearchResourceSpecification(opts)
	if err != nil {
		return nil, err
	}
	return models.NewResourceSpecAndQueueListRes(n, r), nil
}

// GetResourceSpecificationScenes returns the scenes referencing the given specification.
func GetResourceSpecificationScenes(specId int64) ([]models.ResourceSceneBriefRes, error) {
	r, err := models.GetSpecScenes(specId)
	if err != nil {
		return nil, err
	}
	return r, nil
}

// ResourceSpecOnShelf puts the specification on shelf with the given unit
// price and records an admin operate log.
func ResourceSpecOnShelf(doerId int64, id int64, unitPrice int) *response.BizError {
	spec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: id})
	if err != nil {
		return response.NewBizError(err)
	}
	if spec == nil {
		return response.SPECIFICATION_NOT_EXIST
	}
	// a spec can only go on shelf while its queue is still available
	if q, err := models.GetResourceQueue(&models.ResourceQueue{ID: spec.QueueId}); err != nil || q == nil {
		return response.RESOURCE_QUEUE_NOT_AVAILABLE
	}
	if err = models.ResourceSpecOnShelf(id, unitPrice); err != nil {
		return response.NewBizError(err)
	}
	if spec.UnitPrice != unitPrice {
		AddSpecOperateLog(doerId, "on-shelf",
			operate_log.NewLogValues().Add("UnitPrice", unitPrice),
			operate_log.NewLogValues().Add("UnitPrice", spec.UnitPrice),
			id,
			fmt.Sprintf("定价上架资源规格,单价为%d", unitPrice))
	} else {
		AddSpecOperateLog(doerId, "on-shelf", nil, nil, id, "上架资源规格")
	}
	return nil
}

// ResourceSpecOffShelf takes the specification off shelf and records an admin operate log.
func ResourceSpecOffShelf(doerId int64, id int64) *response.BizError {
	if _, err := models.ResourceSpecOffShelf(id); err != nil {
		return response.NewBizError(err)
	}
	AddSpecOperateLog(doerId, "off-shelf", nil, nil, id, "下架资源规格")
	return nil
}

// AddSpecOperateLog writes one admin operate-log entry for a specification
// change. newValue/oldValue may be nil when there is no value diff to record.
func AddSpecOperateLog(doerId int64, operateType string, newValue, oldValue *models.LogValues, specId int64, comment string) {
	var newString, oldString string
	if newValue != nil {
		newString = newValue.JsonString()
	}
	if oldValue != nil {
		oldString = oldValue.JsonString()
	}
	operate_log.Log(models.AdminOperateLog{
		BizType:     "SpecOperate",
		OperateType: operateType,
		OldValue:    oldString,
		NewValue:    newString,
		RelatedId:   fmt.Sprint(specId),
		CreatedBy:   doerId,
		Comment:     comment,
	})
}

// FindAvailableSpecs returns the specs matching opts that userId may use.
// Exclusive specs are kept only when the user is a member of one of the
// spec's exclusive organizations. The result contains no duplicate spec ids.
func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.Specification, error) {
	r, err := models.FindSpecs(opts)
	if err != nil {
		log.Error("FindAvailableSpecs error.%v", err)
		return nil, err
	}
	specs := make([]*models.Specification, 0, len(r))
	seen := make(map[int64]struct{}, len(r))
	// filter exclusive specs
	for _, spec := range r {
		if _, has := seen[spec.ID]; has {
			continue
		}
		if !spec.IsExclusive {
			specs = append(specs, spec)
			seen[spec.ID] = struct{}{}
			continue
		}
		for _, org := range strings.Split(spec.ExclusiveOrg, ";") {
			if isMember, _ := models.IsOrganizationMemberByOrgName(org, userId); isMember {
				specs = append(specs, spec)
				seen[spec.ID] = struct{}{}
				// fix: stop after the first matching org; the original kept
				// iterating and could append the same spec once per org the
				// user belongs to.
				break
			}
		}
	}
	return specs, nil
}

// GetAndCheckSpec returns the spec identified by specId if it is available to
// userId, or nil when specId is 0 or no matching available spec exists.
func GetAndCheckSpec(userId int64, specId int64, opts models.FindSpecsOptions) (*models.Specification, error) {
	if specId == 0 {
		return nil, nil
	}
	opts.SpecId = specId
	r, err := FindAvailableSpecs(userId, opts)
	if err != nil {
		return nil, err
	}
	if len(r) == 0 {
		return nil, nil
	}
	return r[0], nil
}

// InsertCloudbrainSpec snapshots the spec used by a cloudbrain task so later
// queries do not depend on the mutable resource-specification tables.
func InsertCloudbrainSpec(cloudbrainId int64, s *models.Specification) error {
	c := models.CloudbrainSpec{
		CloudbrainID:    cloudbrainId,
		SpecId:          s.ID,
		SourceSpecId:    s.SourceSpecId,
		AccCardsNum:     s.AccCardsNum,
		AccCardType:     s.AccCardType,
		CpuCores:        s.CpuCores,
		MemGiB:          s.MemGiB,
		GPUMemGiB:       s.GPUMemGiB,
		ShareMemGiB:     s.ShareMemGiB,
		ComputeResource: s.ComputeResource,
		UnitPrice:       s.UnitPrice,
		QueueId:         s.QueueId,
		QueueCode:       s.QueueCode,
		Cluster:         s.Cluster,
		AiCenterCode:    s.AiCenterCode,
		AiCenterName:    s.AiCenterName,
		IsExclusive:     s.IsExclusive,
		ExclusiveOrg:    s.ExclusiveOrg,
	}
	if _, err := models.InsertCloudbrainSpec(c); err != nil {
		log.Error("InsertCloudbrainSpec error.CloudbrainSpec=%v. err=%v", c, err)
		return err
	}
	return nil
}

// GetCloudbrainSpec returns the spec snapshot of a cloudbrain task, or nil when none was recorded.
func GetCloudbrainSpec(cloudbrainId int64) (*models.Specification, error) {
	c, err := models.GetCloudbrainSpecByID(cloudbrainId)
	if err != nil {
		return nil, err
	}
	if c == nil {
		return nil, nil
	}
	return c.ConvertToSpecification(), nil
}

// RefreshHistorySpec backfills spec snapshots for historic cloudbrain tasks.
// When scopeAll is false only the given ids are handled; otherwise all tasks
// without a snapshot are processed in pages. Returns (total, success, err).
func RefreshHistorySpec(scopeAll bool, ids []int64) (int64, int64, error) {
	if !scopeAll {
		if len(ids) == 0 {
			return 0, 0, nil
		}
		total := int64(len(ids))
		tasks, err := models.GetCloudbrainWithDeletedByIDs(ids)
		if err != nil {
			return total, 0, err
		}
		return total, refreshTaskSpecs(tasks), nil
	}

	var success int64
	page := 1
	pageSize := 100
	total, err := models.CountNoSpecHistoricTask()
	if err != nil {
		log.Error("FindNoSpecHistoricTask CountNoSpecHistoricTask error. e=%v", err)
		return 0, 0, err
	}
	// page stays at 1 on purpose: successfully refreshed tasks drop out of the
	// "no spec" query, so the first page always holds the remaining work. The
	// iteration cap bounds retries on tasks that keep failing.
	for i := 0; i < 1000; i++ {
		list, err := models.FindNoSpecHistoricTask(page, pageSize)
		if err != nil {
			log.Error("FindNoSpecHistoricTask error.page=%d pageSize=%d e=%v", page, pageSize, err)
			return total, success, err
		}
		if len(list) == 0 {
			log.Info("RefreshHistorySpec. list is empty")
			break
		}
		success += refreshTaskSpecs(list)
		if len(list) < pageSize {
			log.Info("RefreshHistorySpec. list < pageSize")
			break
		}
	}
	return total, success, nil
}

// refreshTaskSpecs refreshes the spec snapshot of each task in order and
// returns how many succeeded. Sleeps between successes to throttle the
// remote API calls, exactly as the inline loops did before extraction.
func refreshTaskSpecs(tasks []*models.Cloudbrain) int64 {
	var success int64
	for _, task := range tasks {
		if err := RefreshOneHistorySpec(task); err != nil {
			log.Error("RefreshOneHistorySpec error.%v", err)
			continue
		}
		success++
		time.Sleep(500 * time.Millisecond)
	}
	return success
}

// RefreshOneHistorySpec finds the spec a historic task ran with (cloudbrain
// one or two) and stores it as the task's spec snapshot.
func RefreshOneHistorySpec(task *models.Cloudbrain) error {
	var spec *models.Specification
	var err error
	switch task.Type {
	case models.TypeCloudBrainOne:
		spec, err = getCloudbrainOneSpec(task)
	case models.TypeCloudBrainTwo:
		spec, err = getCloudbrainTwoSpec(task)
	}
	if err != nil {
		log.Error("find spec error,task.ID=%d err=%v", task.ID, err)
		return err
	}
	if spec == nil {
		log.Error("find spec failed,task.ID=%d", task.ID)
		return errors.New("find spec failed")
	}
	return InsertCloudbrainSpec(task.ID, spec)
}

// getCloudbrainOneSpec resolves the spec of a historic cloudbrain-one task:
// first from the remote job info and, when the job is unknown remotely, from
// the local resource-spec configuration. Returns (nil, nil) when no single
// matching spec can be determined.
func getCloudbrainOneSpec(task *models.Cloudbrain) (*models.Specification, error) {
	// find from remote
	result, err := cloudbrain.GetJob(task.JobID)
	if err != nil {
		log.Error("getCloudbrainOneSpec error. %v", err)
		return nil, err
	}
	if result != nil {
		// NOTE(review): the conversion/parse errors are deliberately ignored,
		// as in the original code; zero values just yield no matching spec.
		jobRes, _ := models.ConvertToJobResultPayload(result.Payload)
		memSize, _ := models.ParseMemSizeFromGrampus(jobRes.Resource.Memory)
		if task.ComputeResource == "CPU/GPU" {
			task.ComputeResource = models.GPU
		}
		var shmMB float32
		if len(jobRes.Config.TaskRoles) > 0 {
			shmMB = float32(jobRes.Config.TaskRoles[0].ShmMB) / 1024
		}
		opt := models.FindSpecsOptions{
			ComputeResource: task.ComputeResource,
			Cluster:         models.OpenICluster,
			AiCenterCode:    models.AICenterOfCloudBrainOne,
			QueueCode:       task.GpuQueue,
			AccCardsNum:     jobRes.Resource.NvidiaComGpu,
			UseAccCardsNum:  true,
			CpuCores:        jobRes.Resource.CPU,
			UseCpuCores:     true,
			MemGiB:          memSize,
			UseMemGiB:       memSize > 0,
			ShareMemGiB:     shmMB,
			UseShareMemGiB:  shmMB > 0,
			RequestAll:      true,
		}
		specs, err := models.FindSpecs(opt)
		if err != nil {
			log.Error("getCloudbrainOneSpec from remote error,%v", err)
			return nil, err
		}
		if len(specs) == 1 {
			return specs[0], nil
		}
		if len(specs) == 0 {
			s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
			if err != nil {
				log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
				return nil, nil
			}
			return s, nil
		}
		log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
		return nil, nil
	}

	// find from config
	specConfig := findCloudbrainOneSpecConfig(task)
	if specConfig == nil {
		log.Error("getCloudbrainOneSpec from config failed,task.ResourceSpecId=%d", task.ResourceSpecId)
		return nil, nil
	}
	opt := models.FindSpecsOptions{
		JobType:         models.JobType(task.JobType),
		ComputeResource: task.ComputeResource,
		Cluster:         models.OpenICluster,
		AiCenterCode:    models.AICenterOfCloudBrainOne,
		QueueCode:       task.GpuQueue,
		AccCardsNum:     specConfig.GpuNum,
		UseAccCardsNum:  true,
		// fix: the original filtered CpuCores by specConfig.GpuNum (the
		// accelerator-card count), which can only match a spec whose core
		// count happens to equal its card count.
		CpuCores:       specConfig.CpuNum,
		UseCpuCores:    true,
		MemGiB:         float32(specConfig.MemMiB) / 1024,
		UseMemGiB:      true,
		ShareMemGiB:    float32(specConfig.ShareMemMiB) / 1024,
		UseShareMemGiB: true,
		RequestAll:     true,
	}
	specs, err := models.FindSpecs(opt)
	if err != nil {
		log.Error("getCloudbrainOneSpec from config error,%v", err)
		return nil, err
	}
	if len(specs) > 1 {
		log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
		return nil, nil
	}
	if len(specs) == 0 {
		s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
		if err != nil {
			log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
			return nil, nil
		}
		return s, nil
	}
	return specs[0], nil
}

// findCloudbrainOneSpecConfig looks up the configured resource spec matching
// task.ResourceSpecId, choosing the spec list by job type and falling back to
// the special pools. Returns nil when nothing matches.
func findCloudbrainOneSpecConfig(task *models.Cloudbrain) *models.ResourceSpec {
	var specConfig *models.ResourceSpec
	switch task.JobType {
	case string(models.JobTypeTrain):
		if cloudbrain.TrainResourceSpecs == nil {
			// best-effort lazy load from settings, as in the original code;
			// an unmarshal failure leaves the list empty and we fall through
			json.Unmarshal([]byte(setting.TrainResourceSpecs), &cloudbrain.TrainResourceSpecs)
		}
		for _, tmp := range cloudbrain.TrainResourceSpecs.ResourceSpec {
			if tmp.Id == task.ResourceSpecId {
				specConfig = tmp
				break
			}
		}
	case string(models.JobTypeInference):
		if cloudbrain.InferenceResourceSpecs == nil {
			json.Unmarshal([]byte(setting.InferenceResourceSpecs), &cloudbrain.InferenceResourceSpecs)
		}
		for _, tmp := range cloudbrain.InferenceResourceSpecs.ResourceSpec {
			if tmp.Id == task.ResourceSpecId {
				specConfig = tmp
				break
			}
		}
	default:
		if cloudbrain.ResourceSpecs == nil {
			json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs)
		}
		for _, tmp := range cloudbrain.ResourceSpecs.ResourceSpec {
			if tmp.Id == task.ResourceSpecId {
				specConfig = tmp
				break
			}
		}
	}
	if specConfig != nil {
		return specConfig
	}
	// fall back to the special pools
	if cloudbrain.SpecialPools != nil {
		for _, specialPool := range cloudbrain.SpecialPools.Pools {
			if specialPool.ResourceSpec == nil {
				continue
			}
			for _, spec := range specialPool.ResourceSpec {
				if task.ResourceSpecId == spec.Id {
					return spec
				}
			}
		}
	}
	return nil
}

// getCloudbrainTwoSpec resolves the spec of a historic cloudbrain-two task
// from its flavor code, querying ModelArts when the task record has none.
func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error) {
	specMap, err := models.GetCloudbrainTwoSpecs()
	if err != nil {
		log.Error("InitCloudbrainTwoSpecs err.%v", err)
		return nil, err
	}
	if task.FlavorCode != "" {
		return specMap[task.FlavorCode], nil
	}
	if task.JobType == string(models.JobTypeDebug) {
		result, err := modelarts.GetNotebook2(task.JobID)
		if err != nil {
			log.Error("getCloudbrainTwoSpec GetNotebook2 error.%v", err)
			return nil, err
		}
		if result != nil {
			return specMap[result.Flavor], nil
		}
	} else if task.JobType == string(models.JobTypeTrain) || task.JobType == string(models.JobTypeInference) {
		result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
		if err != nil {
			// fix: the original format string had one verb for two arguments
			log.Error("getCloudbrainTwoSpec GetTrainJob error. jobName=%s err=%v", task.JobName, err)
			return nil, err
		}
		if result != nil {
			return specMap[result.Flavor.Code], nil
		}
	}
	return nil, nil
}

// RefreshCloudbrainTwoSpec is a placeholder; cloudbrain-two tasks are handled
// by RefreshOneHistorySpec via getCloudbrainTwoSpec.
func RefreshCloudbrainTwoSpec(task *models.Cloudbrain) error {
	return nil
}

// RefreshC2NetSpec is a placeholder for refreshing C2Net task spec snapshots.
func RefreshC2NetSpec(task *models.Cloudbrain) error {
	return nil
}

// InitQueueAndSpec creates the queue described by opt (when missing) together
// with an off-shelf specification carrying opt's resource numbers.
func InitQueueAndSpec(opt models.FindSpecsOptions, aiCenterName string, remark string) (*models.Specification, error) {
	return models.InitQueueAndSpec(models.ResourceQueue{
		QueueCode:       opt.QueueCode,
		Cluster:         opt.Cluster,
		AiCenterCode:    opt.AiCenterCode,
		AiCenterName:    aiCenterName,
		ComputeResource: opt.ComputeResource,
		AccCardType:     models.GetCloudbrainOneAccCardType(opt.QueueCode),
		Remark:          remark,
	}, models.ResourceSpecification{
		AccCardsNum: opt.AccCardsNum,
		CpuCores:    opt.CpuCores,
		MemGiB:      opt.MemGiB,
		GPUMemGiB:   opt.GPUMemGiB,
		ShareMemGiB: opt.ShareMemGiB,
		Status:      models.SpecOffShelf,
	})
}