| @@ -2308,11 +2308,10 @@ func GetWaitingCloudbrainCount(cloudbrainType int, computeResource string, jobTy | |||
| } | |||
| return sess.Count(new(Cloudbrain)) | |||
| } | |||
| func GetNotFinalStatusTaskCount(userID int64, notFinalStatus []string, jobTypes []JobType, cloudbrainTypes []int, computeResource string) (int, error) { | |||
| func GetNotFinalStatusTaskCount(userID int64, notFinalStatus []string, jobTypes []JobType) (int, error) { | |||
| count, err := x.In("status", notFinalStatus). | |||
| In("job_type", jobTypes). | |||
| In("type", cloudbrainTypes). | |||
| And("user_id = ? and compute_resource = ?", userID, computeResource).Count(new(Cloudbrain)) | |||
| And("user_id = ? ", userID).Count(new(Cloudbrain)) | |||
| return int(count), err | |||
| } | |||
| @@ -690,6 +690,8 @@ var ( | |||
| IncubationSourceOrgName string | |||
| PaperRepoTopicName string | |||
| CloudbrainUniquenessLockTime time.Duration | |||
| //nginx proxy | |||
| PROXYURL string | |||
| RadarMap = struct { | |||
| @@ -1506,6 +1508,8 @@ func NewContext() { | |||
| CullInterval = sec.Key("CULL_INTERVAL").MustString("60") | |||
| DebugAttachSize = sec.Key("DEBUG_ATTACH_SIZE").MustInt(20) | |||
| CloudbrainUniquenessLockTime = sec.Key("UNIQUENESS_LOCK_TIME").MustDuration(5 * time.Minute) | |||
| sec = Cfg.Section("benchmark") | |||
| IsBenchmarkEnabled = sec.Key("ENABLED").MustBool(false) | |||
| BenchmarkOwner = sec.Key("OWNER").MustString("") | |||
| @@ -576,11 +576,10 @@ func AiSafetyCreateForPost(ctx *context.Context) { | |||
| tpname = tplCloudBrainModelSafetyNewGpu | |||
| } | |||
| limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeInference)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock(limiterCtx) | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| @@ -41,8 +41,6 @@ import ( | |||
| "code.gitea.io/gitea/modules/git" | |||
| "code.gitea.io/gitea/modules/log" | |||
| "code.gitea.io/gitea/modules/modelarts" | |||
| "code.gitea.io/gitea/modules/redis/redis_key" | |||
| "code.gitea.io/gitea/modules/redis/redis_lock" | |||
| "code.gitea.io/gitea/modules/setting" | |||
| "code.gitea.io/gitea/modules/storage" | |||
| "code.gitea.io/gitea/modules/util" | |||
| @@ -229,11 +227,10 @@ func cloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { | |||
| if jobType == string(models.JobTypeTrain) { | |||
| tpl = tplCloudBrainTrainJobNew | |||
| } | |||
| limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: form.JobType}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock(limiterCtx) | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| @@ -467,15 +464,19 @@ func CloudBrainInferenceJobCreate(ctx *context.Context, form auth.CreateCloudBra | |||
| repo := ctx.Repo.Repository | |||
| tpl := tplCloudBrainInferenceJobNew | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| cloudBrainNewDataPrepare(ctx, jobType) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: jobType}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| ckptUrl := setting.Attachment.Minio.RealPath + form.TrainUrl + form.CkptName | |||
| log.Info("ckpt url:" + ckptUrl) | |||
| @@ -650,7 +651,24 @@ func CloudBrainRestart(ctx *context.Context) { | |||
| var errorMsg = "" | |||
| var status = string(models.JobWaiting) | |||
| task := ctx.Cloudbrain | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| resultCode = "-1" | |||
| errorMsg = ctx.Tr(errMsg) | |||
| } | |||
| for { | |||
| if errorMsg != "" { | |||
| break | |||
| } | |||
| if task.Status != string(models.JobStopped) && task.Status != string(models.JobSucceeded) && task.Status != string(models.JobFailed) { | |||
| log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"]) | |||
| resultCode = "-1" | |||
| @@ -2393,11 +2411,10 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo | |||
| ctx.Data["benchmarkTypeID"] = benchmarkTypeID | |||
| ctx.Data["benchmark_child_types_id_hidden"] = benchmarkChildTypeID | |||
| limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: form.JobType}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock(limiterCtx) | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| @@ -2587,15 +2604,19 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm) | |||
| tpl := tplCloudBrainBenchmarkNew | |||
| command := cloudbrain.GetCloudbrainDebugCommand() | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), jobType, displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| cloudBrainNewDataPrepare(ctx, jobType) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tpl, &form) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: jobType}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, jobType, displayJobName) | |||
| if err == nil { | |||
| @@ -28,8 +28,6 @@ import ( | |||
| "code.gitea.io/gitea/modules/grampus" | |||
| "code.gitea.io/gitea/modules/modelarts" | |||
| "code.gitea.io/gitea/modules/notification" | |||
| "code.gitea.io/gitea/modules/redis/redis_key" | |||
| "code.gitea.io/gitea/modules/redis/redis_lock" | |||
| "code.gitea.io/gitea/modules/timeutil" | |||
| "code.gitea.io/gitea/modules/util" | |||
| "github.com/unknwon/com" | |||
| @@ -132,11 +130,10 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook | |||
| codeStoragePath = setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" | |||
| } | |||
| limiterCtx := &lock.LockContext{Repo: ctx.Repo.Repository, DisplayJobName: displayJobName, User: ctx.User} | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(limiterCtx) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeDebug)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock(limiterCtx) | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| @@ -498,19 +495,23 @@ func grampusTrainJobGpuCreate(ctx *context.Context, form auth.CreateGrampusTrain | |||
| image := strings.TrimSpace(form.Image) | |||
| tpl := tplGrampusTrainJobGPUNew | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| if !jobNamePattern.MatchString(displayJobName) { | |||
| grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobGPUNew, &form) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| if !jobNamePattern.MatchString(displayJobName) { | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeGPU) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) | |||
| return | |||
| } | |||
| @@ -746,19 +747,23 @@ func grampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain | |||
| engineName := form.EngineName | |||
| tpl := tplGrampusTrainJobNPUNew | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| if !jobNamePattern.MatchString(displayJobName) { | |||
| grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplGrampusTrainJobNPUNew, &form) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| if !jobNamePattern.MatchString(displayJobName) { | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| grampusTrainJobNewDataPrepare(ctx, grampus.ProcessorTypeNPU) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_jobname_err"), tpl, &form) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tpl, &form) | |||
| return | |||
| } | |||
| @@ -1641,7 +1646,22 @@ func GrampusNotebookRestart(ctx *context.Context) { | |||
| return | |||
| } | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| errorMsg = ctx.Tr(errMsg) | |||
| } | |||
| for { | |||
| if errorMsg != "" { | |||
| break | |||
| } | |||
| if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed { | |||
| log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"]) | |||
| @@ -2,6 +2,7 @@ package repo | |||
| import ( | |||
| "archive/zip" | |||
| "code.gitea.io/gitea/services/lock" | |||
| "encoding/json" | |||
| "errors" | |||
| "fmt" | |||
| @@ -35,8 +36,6 @@ import ( | |||
| "code.gitea.io/gitea/modules/modelarts" | |||
| "code.gitea.io/gitea/modules/notification" | |||
| "code.gitea.io/gitea/modules/obs" | |||
| "code.gitea.io/gitea/modules/redis/redis_key" | |||
| "code.gitea.io/gitea/modules/redis/redis_lock" | |||
| "code.gitea.io/gitea/modules/setting" | |||
| "code.gitea.io/gitea/modules/storage" | |||
| "code.gitea.io/gitea/modules/timeutil" | |||
| @@ -176,15 +175,19 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm | |||
| imageId := form.ImageId | |||
| repo := ctx.Repo.Repository | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeDebug)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| notebookNewDataPrepare(ctx) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsNotebookNew, &form) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsNotebookNew, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeDebug)) | |||
| @@ -494,8 +497,21 @@ func NotebookRestart(ctx *context.Context) { | |||
| var spec *models.Specification | |||
| task := ctx.Cloudbrain | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainRestart(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{JobType: task.JobType}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| errorMsg = ctx.Tr(errMsg) | |||
| } | |||
| for { | |||
| if errMsg != "" { | |||
| break | |||
| } | |||
| ctx.CheckWechatBind() | |||
| if ctx.Written() { | |||
| return | |||
| @@ -1083,15 +1099,19 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) | |||
| return | |||
| } | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| trainJobNewDataPrepare(ctx) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobNew, &form) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsTrainJobNew, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeTrain)) | |||
| @@ -1461,6 +1481,20 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ | |||
| ctx.Data["PageIsTrainJob"] = true | |||
| var jobID = ctx.Params(":jobid") | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: form.DisplayJobName, JobType: string(models.JobTypeTrain)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| trainJobNewVersionDataPrepare(ctx) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsTrainJobVersionNew, &form) | |||
| return | |||
| } | |||
| errStr := checkMultiNode(ctx.User.ID, form.WorkServerNumber) | |||
| if errStr != "" { | |||
| trainJobNewVersionDataPrepare(ctx) | |||
| @@ -1512,16 +1546,6 @@ func TrainJobCreateVersion(ctx *context.Context, form auth.CreateModelArtsTrainJ | |||
| EngineName := form.EngineName | |||
| isLatestVersion := modelarts.IsLatestVersion | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| trainJobNewVersionDataPrepare(ctx) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsTrainJobVersionNew, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| canNewJob, _ := canUserCreateTrainJobVersion(ctx, latestTask.UserID) | |||
| if !canNewJob { | |||
| trainJobNewVersionDataPrepare(ctx) | |||
| @@ -2109,15 +2133,19 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference | |||
| return | |||
| } | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeInference), displayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("lock processed failed:%v", err, ctx.Data["MsgID"]) | |||
| lockOperator, errMsg := cloudbrainService.Lock4CloudbrainCreation(&lock.LockContext{Repo: ctx.Repo.Repository, Task: &models.Cloudbrain{DisplayJobName: displayJobName, JobType: string(models.JobTypeInference)}, User: ctx.User}) | |||
| defer func() { | |||
| if lockOperator != nil { | |||
| lockOperator.Unlock() | |||
| } | |||
| }() | |||
| if errMsg != "" { | |||
| log.Error("lock processed failed:%s", errMsg, ctx.Data["MsgID"]) | |||
| inferenceJobErrorNewDataPrepare(ctx, form) | |||
| ctx.RenderWithErr(ctx.Tr("repo.cloudbrain_samejob_err"), tplModelArtsInferenceJobNew, &form) | |||
| ctx.RenderWithErr(ctx.Tr(errMsg), tplModelArtsInferenceJobNew, &form) | |||
| return | |||
| } | |||
| defer lock.UnLock() | |||
| count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeCloudBrainTwo, string(models.JobTypeInference)) | |||
| @@ -16,7 +16,7 @@ type StatusInfo struct { | |||
| var CloudbrainOneNotFinalStatuses = []string{string(models.JobWaiting), string(models.JobRunning)} | |||
| var CloudbrainTwoNotFinalStatuses = []string{string(models.ModelArtsTrainJobInit), string(models.ModelArtsTrainJobImageCreating), string(models.ModelArtsTrainJobSubmitTrying), string(models.ModelArtsTrainJobWaiting), string(models.ModelArtsTrainJobRunning), string(models.ModelArtsTrainJobScaling), string(models.ModelArtsTrainJobCheckInit), string(models.ModelArtsTrainJobCheckRunning), string(models.ModelArtsTrainJobCheckRunningCompleted)} | |||
| var GrampusNotFinalStatuses = []string{models.GrampusStatusWaiting, models.GrampusStatusRunning} | |||
| var GrampusNotFinalStatuses = []string{models.GrampusStatusWaiting, models.GrampusStatusRunning, models.GrampusStatusPending} | |||
| var StatusInfoDict = map[string]StatusInfo{string(models.JobTypeDebug) + "-" + strconv.Itoa(models.TypeCloudBrainOne): { | |||
| CloudBrainTypes: []int{models.TypeCloudBrainOne}, | |||
| JobType: []models.JobType{models.JobTypeDebug}, | |||
| @@ -92,7 +92,7 @@ func GetNotFinalStatusTaskCount(uid int64, cloudbrainType int, jobType string, c | |||
| if statusInfo, ok := StatusInfoDict[key]; ok { | |||
| return models.GetNotFinalStatusTaskCount(uid, statusInfo.NotFinalStatuses, statusInfo.JobType, statusInfo.CloudBrainTypes, statusInfo.ComputeResource) | |||
| return models.GetNotFinalStatusTaskCount(uid, statusInfo.NotFinalStatuses, statusInfo.JobType) | |||
| } else { | |||
| return 0, fmt.Errorf("Can not find the status info.") | |||
| @@ -2,12 +2,19 @@ package cloudbrain | |||
| import "code.gitea.io/gitea/services/lock" | |||
| var defaultChain = lock.NewLockChainOperator().Add(lock.CloudbrainUniquenessLock{}).Add(lock.CloudbrainDisplayJobNameLock{}) | |||
| func Lock4CloudbrainCreation(ctx *lock.LockContext) (*lock.LockChainOperator, string) { | |||
| errCode := defaultChain.Lock(ctx) | |||
| op := lock.NewLockChainOperator(ctx).Add(lock.CloudbrainUniquenessLock{}).Add(lock.CloudbrainDisplayJobNameLock{}) | |||
| errCode := op.Lock() | |||
| if errCode != "" { | |||
| return nil, errCode | |||
| } | |||
| return op, "" | |||
| } | |||
| func Lock4CloudbrainRestart(ctx *lock.LockContext) (*lock.LockChainOperator, string) { | |||
| op := lock.NewLockChainOperator(ctx).Add(lock.CloudbrainUniquenessLock{}) | |||
| errCode := op.Lock() | |||
| if errCode != "" { | |||
| return nil, errCode | |||
| } | |||
| return defaultChain, "" | |||
| return op, "" | |||
| } | |||
| @@ -16,7 +16,7 @@ func (c CloudbrainDisplayJobNameLock) IsMatch(ctx *LockContext) bool { | |||
| } | |||
| func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string { | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), ctx.Task.JobType, ctx.Task.DisplayJobName)) | |||
| isOk, err := lock.Lock(models.CloudbrainKeyDuration) | |||
| if !isOk { | |||
| log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) | |||
| @@ -26,6 +26,6 @@ func (c CloudbrainDisplayJobNameLock) Lock(ctx *LockContext) string { | |||
| } | |||
| func (c CloudbrainDisplayJobNameLock) Unlock(ctx *LockContext) error { | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), string(models.JobTypeDebug), ctx.DisplayJobName)) | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(ctx.Repo.ID), ctx.Task.JobType, ctx.Task.DisplayJobName)) | |||
| return lock.UnLock() | |||
| } | |||
| @@ -1,11 +1,10 @@ | |||
| package lock | |||
| import ( | |||
| "code.gitea.io/gitea/models" | |||
| "code.gitea.io/gitea/modules/log" | |||
| "code.gitea.io/gitea/modules/redis/redis_key" | |||
| "code.gitea.io/gitea/modules/redis/redis_lock" | |||
| "time" | |||
| "code.gitea.io/gitea/modules/setting" | |||
| ) | |||
| type CloudbrainUniquenessLock struct { | |||
| @@ -16,16 +15,16 @@ func (c CloudbrainUniquenessLock) IsMatch(ctx *LockContext) bool { | |||
| } | |||
| func (c CloudbrainUniquenessLock) Lock(ctx *LockContext) string { | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) | |||
| isOk, err := lock.Lock(5 * time.Minute) | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, ctx.Task.JobType)) | |||
| isOk, err := lock.Lock(setting.CloudbrainUniquenessLockTime) | |||
| if !isOk { | |||
| log.Error("CloudbrainDisplayJobNameLock lock failed:%v", err) | |||
| return "you have already a running or waiting task, can not create more" | |||
| return "repo.cloudbrain.morethanonejob" | |||
| } | |||
| return "" | |||
| } | |||
| func (c CloudbrainUniquenessLock) Unlock(ctx *LockContext) error { | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, string(models.JobTypeDebug))) | |||
| lock := redis_lock.NewDistributeLock(redis_key.CloudbrainUniquenessKey(ctx.User.ID, ctx.Task.JobType)) | |||
| return lock.UnLock() | |||
| } | |||
| @@ -5,10 +5,9 @@ import ( | |||
| ) | |||
| type LockContext struct { | |||
| Repo *models.Repository | |||
| DisplayJobName string | |||
| User *models.User | |||
| LockedList []Lock | |||
| Repo *models.Repository | |||
| Task *models.Cloudbrain | |||
| User *models.User | |||
| } | |||
| type Lock interface { | |||
| @@ -1,40 +1,42 @@ | |||
| package lock | |||
| type LockChainOperator struct { | |||
| ChainList []Lock | |||
| chainList []Lock | |||
| lockedList []Lock | |||
| ctx *LockContext | |||
| } | |||
| func NewLockChainOperator() *LockChainOperator { | |||
| return &LockChainOperator{} | |||
| func NewLockChainOperator(ctx *LockContext) *LockChainOperator { | |||
| return &LockChainOperator{ctx: ctx} | |||
| } | |||
| func (b *LockChainOperator) Add(l Lock) *LockChainOperator { | |||
| b.ChainList = append(b.ChainList, l) | |||
| b.chainList = append(b.chainList, l) | |||
| return b | |||
| } | |||
| func (b *LockChainOperator) Lock(ctx *LockContext) string { | |||
| for i := 0; i < len(b.ChainList); i++ { | |||
| l := b.ChainList[i] | |||
| if !l.IsMatch(ctx) { | |||
| func (b *LockChainOperator) Lock() string { | |||
| for i := 0; i < len(b.chainList); i++ { | |||
| l := b.chainList[i] | |||
| if !l.IsMatch(b.ctx) { | |||
| continue | |||
| } | |||
| if errCode := l.Lock(ctx); errCode != "" { | |||
| b.Unlock(ctx) | |||
| if errCode := l.Lock(b.ctx); errCode != "" { | |||
| b.Unlock() | |||
| return errCode | |||
| } | |||
| ctx.LockedList = append(ctx.LockedList, l) | |||
| b.lockedList = append(b.lockedList, l) | |||
| } | |||
| return "" | |||
| } | |||
| func (b *LockChainOperator) Unlock(ctx *LockContext) error { | |||
| if b.ChainList == nil || len(b.ChainList) == 0 { | |||
| func (b *LockChainOperator) Unlock() error { | |||
| if b.chainList == nil || len(b.chainList) == 0 { | |||
| return nil | |||
| } | |||
| for j := len(ctx.LockedList) - 1; j >= 0; j-- { | |||
| ctx.LockedList[j].Unlock(ctx) | |||
| for j := len(b.lockedList) - 1; j >= 0; j-- { | |||
| b.lockedList[j].Unlock(b.ctx) | |||
| } | |||
| return nil | |||
| } | |||