diff --git a/models/cloudbrain.go b/models/cloudbrain.go index b2b24bacc..4a010ffd6 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -1974,6 +1974,12 @@ func GetCloudbrainByID(id string) (*Cloudbrain, error) { return getRepoCloudBrain(cb) } +func IsCloudbrainExistByJobName(jobName string)(bool,error){ + return x.Unscoped().Exist(&Cloudbrain{ + JobName: jobName, + }) +} + func GetCloudbrainByIDWithDeleted(id string) (*Cloudbrain, error) { idInt64, _ := strconv.ParseInt(id, 10, 64) cb := &Cloudbrain{ID: idInt64} @@ -2119,19 +2125,37 @@ func GetCloudBrainUnStoppedJob() ([]*Cloudbrain, error) { Find(&cloudbrains) } -func GetCloudBrainOneStoppedJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) { +func GetCloudBrainOneStoppedNotDebugJobDaysAgo(days int, limit int) ([]*Cloudbrain, error) { cloudbrains := make([]*Cloudbrain, 0, 10) endTimeBefore := time.Now().Unix() - int64(days)*24*3600 missEndTimeBefore := endTimeBefore - 24*3600 - return cloudbrains, x.Cols("id,job_name,job_id"). + return cloudbrains, x.Unscoped().Cols("id,job_name,job_id"). In("status", JobStopped, JobSucceeded, JobFailed, ModelArtsCreateFailed, ModelArtsStartFailed, ModelArtsUnavailable, ModelArtsResizFailed, ModelArtsDeleted, ModelArtsStopped, ModelArtsTrainJobCanceled, ModelArtsTrainJobCheckFailed, ModelArtsTrainJobCompleted, ModelArtsTrainJobDeleteFailed, ModelArtsTrainJobDeployServiceFailed, ModelArtsTrainJobFailed, ModelArtsTrainJobImageFailed, ModelArtsTrainJobKilled, ModelArtsTrainJobLost, ModelArtsTrainJobSubmitFailed, ModelArtsTrainJobSubmitModelFailed). - Where("(((end_time is null or end_time=0) and updated_unix 0 { + uId = userIds[0] + } + //filter exclusive specs + specs := FilterExclusiveSpecs(s.RelatedSpecs, uId) + + centerIds := make([]string, len(specs)) + for i, v := range specs { + centerIds[i] = v.AiCenterCode + } + return centerIds +} + +func FilterExclusiveSpecs(r []*Specification, userId int64) []*Specification { + if userId == 0 { + return r + } + specs := make([]*Specification, 0, len(r)) + specMap := make(map[int64]string, 0) + for i := 0; i < len(r); i++ { + spec := r[i] + if _, has := specMap[spec.ID]; has { + continue + } + if !spec.IsExclusive { + specs = append(specs, spec) + specMap[spec.ID] = "" + continue + } + orgs := strings.Split(spec.ExclusiveOrg, ";") + for _, org := range orgs { + isMember, _ := IsOrganizationMemberByOrgName(org, userId) + if isMember { + specs = append(specs, spec) + specMap[spec.ID] = "" + break + } + } + } + return specs +} + +func DistinctSpecs(r []*Specification) []*Specification { + specs := make([]*Specification, 0, len(r)) + sourceSpecIdMap := make(map[string]string, 0) + for i := 0; i < len(r); i++ { + spec := r[i] + if spec.SourceSpecId == "" { + specs = append(specs, spec) + continue + } + if _, has := sourceSpecIdMap[spec.SourceSpecId]; has { + continue + } + specs = append(specs, spec) + sourceSpecIdMap[spec.SourceSpecId] = "" + } + return specs +} + func InsertResourceSpecification(r ResourceSpecification) (int64, error) { return x.Insert(&r) } diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index 169bf5625..ff03680da 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -282,8 +282,6 @@ func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (job func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) { createTime := timeutil.TimeStampNow() - centerID, centerName := getCentersParamter(ctx, req) - var datasetGrampus, modelGrampus []models.GrampusDataset var codeGrampus models.GrampusDataset if ProcessorTypeNPU == req.ProcessType { @@ -315,8 +313,7 @@ func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId str ResourceSpecId: req.Spec.SourceSpecId, ImageId: req.ImageId, ImageUrl: req.ImageUrl, - CenterID: centerID, - CenterName: centerName, + CenterID: req.Spec.GetAvailableCenterIds(ctx.User.ID), ReplicaNum: 1, Datasets: datasetGrampus, Models: modelGrampus, diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 3506e2715..bf7eb2352 100755 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -618,6 +618,7 @@ var ( Enabled bool ResultSaveDays int BatchSize int + DebugJobSize int TrashSaveDays int Cron string RunAtStart bool @@ -1696,6 +1697,7 @@ func getClearStrategy(){ ClearStrategy.Enabled=sec.Key("ENABLED").MustBool(false) ClearStrategy.ResultSaveDays=sec.Key("RESULT_SAVE_DAYS").MustInt(30) ClearStrategy.BatchSize=sec.Key("BATCH_SIZE").MustInt(500) + ClearStrategy.DebugJobSize=sec.Key("DEBUG_BATCH_SIZE").MustInt(100) ClearStrategy.TrashSaveDays=sec.Key("TRASH_SAVE_DAYS").MustInt(90) ClearStrategy.Cron=sec.Key("CRON").MustString("* 0,30 2-8 * * ?") ClearStrategy.RunAtStart=sec.Key("RUN_AT_START").MustBool(false) diff --git a/routers/api/v1/repo/cloudbrain_dashboard.go b/routers/api/v1/repo/cloudbrain_dashboard.go index 7fe5d603c..0d68fff30 100755 --- a/routers/api/v1/repo/cloudbrain_dashboard.go +++ b/routers/api/v1/repo/cloudbrain_dashboard.go @@ -968,6 +968,8 @@ func GetWaittingTop(ctx *context.Context) { taskDetail.RepoID = ciTasks[i].RepoID if ciTasks[i].Repo != nil { taskDetail.RepoName = ciTasks[i].Repo.OwnerName + "/" + ciTasks[i].Repo.Name + } else { + taskDetail.RepoName = "" } WaitTimeInt := time.Now().Unix() - ciTasks[i].Cloudbrain.CreatedUnix.AsTime().Unix() taskDetail.WaitTime = models.ConvertDurationToStr(WaitTimeInt) @@ -975,6 +977,13 @@ func GetWaittingTop(ctx *context.Context) { if WaitTimeInt < 0 { taskDetail.WaitTime = "00:00:00" } + + taskDetail.ID = ciTasks[i].Cloudbrain.ID + taskDetail.ComputeResource = ciTasks[i].Cloudbrain.ComputeResource + taskDetail.JobType = ciTasks[i].Cloudbrain.JobType + taskDetail.JobID = ciTasks[i].Cloudbrain.JobID + taskDetail.Type = ciTasks[i].Cloudbrain.Type + tasks = append(tasks, taskDetail) } ctx.JSON(http.StatusOK, map[string]interface{}{ @@ -1001,6 +1010,12 @@ func GetRunningTop(ctx *context.Context) { taskDetail.RepoName = ciTasks[i].Repo.OwnerName + "/" + ciTasks[i].Repo.Name } + taskDetail.ID = ciTasks[i].Cloudbrain.ID + taskDetail.ComputeResource = ciTasks[i].Cloudbrain.ComputeResource + taskDetail.JobType = ciTasks[i].Cloudbrain.JobType + taskDetail.JobID = ciTasks[i].Cloudbrain.JobID + taskDetail.Type = ciTasks[i].Cloudbrain.Type + tasks = append(tasks, taskDetail) } ctx.JSON(http.StatusOK, map[string]interface{}{ diff --git a/services/cloudbrain/clear.go b/services/cloudbrain/clear.go index bbdfdd024..44613ee3c 100644 --- a/services/cloudbrain/clear.go +++ b/services/cloudbrain/clear.go @@ -13,11 +13,22 @@ import ( ) func ClearCloudbrainResultSpace() { + log.Info("clear cloudbrain one result space begin.") if !setting.ClearStrategy.Enabled{ return } - tasks, err := models.GetCloudBrainOneStoppedJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.BatchSize) + tasks, err := models.GetCloudBrainOneStoppedNotDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.BatchSize) + if err != nil { + log.Warn("Failed to get cloudbrain, clear result failed.", err) + return + } + debugTasks, err := models.GetCloudBrainOneStoppedDebugJobDaysAgo(setting.ClearStrategy.ResultSaveDays, setting.ClearStrategy.DebugJobSize) + if err != nil { + log.Warn("Failed to get debug cloudbrain.", err) + + } + tasks=append(tasks,debugTasks...) if err != nil { log.Warn("Failed to get cloudbrain, clear result failed.", err) @@ -37,11 +48,12 @@ func ClearCloudbrainResultSpace() { log.Warn("Failed to set cloudbrain cleared status", err) } //如果云脑表处理完了,通过遍历minio对象处理历史垃圾数据,如果存在的话 - if len(tasks) < setting.ClearStrategy.BatchSize { + if len(tasks) < setting.ClearStrategy.BatchSize+setting.ClearStrategy.DebugJobSize { clearLocalHistoryTrashFile() clearMinioHistoryTrashFile() } + log.Info("clear cloudbrain one result space end.") } @@ -57,11 +69,15 @@ func clearMinioHistoryTrashFile() { SortModTimeAscend(miniofiles) for _, file := range miniofiles { - if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { - dirPath := setting.CBCodePathPrefix + file.Name() + "/" - log.Info("clear job in minio trash:"+file.Name()) - storage.Attachments.DeleteDir(dirPath) - processCount++ + if file.Name()!="" && file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { + + has,err:=models.IsCloudbrainExistByJobName(file.Name()) + if err==nil && !has { + dirPath := setting.CBCodePathPrefix + file.Name() + "/" + log.Info("clear job in minio trash:" + file.Name()) + storage.Attachments.DeleteDir(dirPath) + processCount++ + } if processCount == setting.ClearStrategy.BatchSize { break } @@ -83,10 +99,13 @@ func clearLocalHistoryTrashFile() { SortModTimeAscend(files) for _, file := range files { //清理n天前的历史垃圾数据,清理job目录 - if file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { - os.RemoveAll(setting.JobPath + file.Name()) - log.Info("clear job in local trash:"+file.Name()) - processCount++ + if file.Name()!="" && file.ModTime().Before(time.Now().AddDate(0, 0, -setting.ClearStrategy.TrashSaveDays)) { + has,err:=models.IsCloudbrainExistByJobName(file.Name()) + if err==nil && !has{ + os.RemoveAll(setting.JobPath + file.Name()) + log.Info("clear job in local trash:"+file.Name()) + processCount++ + } if processCount == setting.ClearStrategy.BatchSize { break } @@ -105,15 +124,12 @@ func SortModTimeAscend(files []os.FileInfo) { return files[i].ModTime().Before(files[j].ModTime()) }) } -func SortModTimeAscendForMinio(files []storage.FileInfo) { - sort.Slice(files, func(i, j int) bool { - timeI, _ := time.Parse("2006-01-02 15:04:05", files[i].ModTime) - timeJ, _ := time.Parse("2006-01-02 15:04:05", files[i].ModTime) - return timeI.Before(timeJ) - }) -} func DeleteCloudbrainOneJobStorage(jobName string) error { + + if jobName==""{ + return nil + } //delete local localJobPath := setting.JobPath + jobName err := os.RemoveAll(localJobPath) diff --git a/services/cloudbrain/resource/resource_specification.go b/services/cloudbrain/resource/resource_specification.go index 8f4182d87..5070d7c1e 100644 --- a/services/cloudbrain/resource/resource_specification.go +++ b/services/cloudbrain/resource/resource_specification.go @@ -246,10 +246,10 @@ func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.S return nil, err } //filter exclusive specs - specs := filterExclusiveSpecs(r, userId) + specs := models.FilterExclusiveSpecs(r, userId) //distinct by sourceSpecId - specs = distinctSpecs(specs) + specs = models.DistinctSpecs(specs) return specs, err } @@ -265,50 +265,6 @@ func FindAvailableSpecs4Show(userId int64, opts models.FindSpecsOptions) ([]*api return result, nil } -func filterExclusiveSpecs(r []*models.Specification, userId int64) []*models.Specification { - specs := make([]*models.Specification, 0, len(r)) - specMap := make(map[int64]string, 0) - for i := 0; i < len(r); i++ { - spec := r[i] - if _, has := specMap[spec.ID]; has { - continue - } - if !spec.IsExclusive { - specs = append(specs, spec) - specMap[spec.ID] = "" - continue - } - orgs := strings.Split(spec.ExclusiveOrg, ";") - for _, org := range orgs { - isMember, _ := models.IsOrganizationMemberByOrgName(org, userId) - if isMember { - specs = append(specs, spec) - specMap[spec.ID] = "" - break - } - } - } - return specs -} - -func distinctSpecs(r []*models.Specification) []*models.Specification { - specs := make([]*models.Specification, 0, len(r)) - sourceSpecIdMap := make(map[string]string, 0) - for i := 0; i < len(r); i++ { - spec := r[i] - if spec.SourceSpecId == "" { - specs = append(specs, spec) - continue - } - if _, has := sourceSpecIdMap[spec.SourceSpecId]; has { - continue - } - specs = append(specs, spec) - sourceSpecIdMap[spec.SourceSpecId] = "" - } - return specs -} - func GetAndCheckSpec(userId int64, specId int64, opts models.FindSpecsOptions) (*models.Specification, error) { if specId == 0 { return nil, nil