From c5e6846c42aa7a770ea67ca6f99da3524cb48206 Mon Sep 17 00:00:00 2001 From: ychao_1983 Date: Mon, 21 Nov 2022 15:40:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/cloudbrain.go | 37 +- modules/grampus/grampus.go | 107 ++--- modules/grampus/resty.go | 42 +- routers/api/v1/api.go | 3 + routers/repo/grampus.go | 370 ++++++++++++++++-- routers/routes/routes.go | 10 +- .../cloudbrain/cloudbrainTask/sync_status.go | 50 +++ 7 files changed, 514 insertions(+), 105 deletions(-) diff --git a/models/cloudbrain.go b/models/cloudbrain.go index a2a69316f..627c161f9 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -1444,16 +1444,16 @@ type GrampusJobInfo struct { } type GrampusNotebookInfo struct { - StartedAt int64 `json:"startedAt"` - RunSec int64 `json:"runSec"` - CompletedAt int64 `json:"completedAt"` - CreatedAt int64 `json:"createdAt"` - UpdatedAt int64 `json:"updatedAt"` - Desc string `json:"desc"` - JobID string `json:"id"` - Name string `json:"name"` - Status string `json:"status"` - UserID string `json:"userId"` + StartedAt int64 `json:"startedAt"` + RunSec int64 `json:"runSec"` + CompletedAt int64 `json:"completedAt"` + CreatedAt int64 `json:"createdAt"` + UpdatedAt int64 `json:"updatedAt"` + Desc string `json:"desc"` + JobID string `json:"id"` + Name string `json:"name"` + Status string `json:"status"` + UserID string `json:"userId"` Tasks []GrampusNotebookTask `json:"tasks"` } type Center struct { @@ -1537,6 +1537,11 @@ type GrampusNotebookResponse struct { JobInfo GrampusNotebookInfo `json:"otJob"` } +type GrampusNotebookRestartResponse struct { + NewId string `json:"newId"` + Status string `json:"status"` +} + type GrampusStopJobResponse struct { GrampusResult StoppedAt int64 `json:"stoppedAt"` @@ -1558,18 +1563,20 @@ type GrampusTasks struct { } type GrampusNotebookTask struct { AutoStopDuration int `json:"autoStopDuration"` - Name string `json:"name"` + Name string `json:"name"` Capacity int `json:"capacity"` CenterID []string `json:"centerID"` CenterName []string `json:"centerName"` Code GrampusDataset `json:"code"` Datasets []GrampusDataset `json:"datasets"` + CodeUrl string `json:"codeUrl"` + DataUrl string `json:"dataUrl"` ImageId string `json:"imageId"` ImageUrl string `json:"imageUrl"` ResourceSpecId string `json:"resourceSpecId"` - Token string `json:"token"` - Url string `json:"url"` - Status string `json:"status"` + Token string `json:"token"` + Url string `json:"url"` + Status string `json:"status"` } type GrampusDataset struct { @@ -1585,7 +1592,7 @@ type CreateGrampusJobRequest struct { } type CreateGrampusNotebookRequest struct { - Name string `json:"name"` + Name string `json:"name"` Tasks []GrampusNotebookTask `json:"tasks"` } diff --git a/modules/grampus/grampus.go b/modules/grampus/grampus.go index c8fc381d8..26e85a1cf 100755 --- a/modules/grampus/grampus.go +++ b/modules/grampus/grampus.go @@ -26,9 +26,9 @@ const ( CodeArchiveName = "master.zip" - BucketRemote = "grampus" - RemoteModelPath = "/output/" + models.ModelSuffix - autoStopDurationMs = 4 * 60 * 60 * 1000 + BucketRemote = "grampus" + RemoteModelPath = "/output/" + models.ModelSuffix + autoStopDurationMs = 4 * 60 * 60 * 1000 ) var ( @@ -83,22 +83,22 @@ type GenerateTrainJobReq struct { } type GenerateNotebookJobReq struct { - JobName string - Command string - ImageUrl string - ImageId string - DisplayJobName string - Uuid string - Description string - CodeObsPath string - CommitID string - BranchName string - ComputeResource string - ProcessType string - DatasetNames string - DatasetInfos map[string]models.DatasetInfo - Spec *models.Specification - CodeName string + JobName string + Command string + ImageUrl string + ImageId string + DisplayJobName string + Uuid string + Description string + CodeStoragePath string + CommitID string + BranchName string + ComputeResource string + ProcessType string + DatasetNames string + DatasetInfos map[string]models.DatasetInfo + Spec *models.Specification + CodeName string } func getEndPoint() string { @@ -133,22 +133,31 @@ func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (job Name: req.CodeName, Bucket: setting.Bucket, EndPoint: getEndPoint(), - ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip", + ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip", } + } else { + + codeGrampus = models.GrampusDataset{ + Name: req.CodeName, + Bucket: setting.Bucket, + EndPoint: getEndPoint(), + ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip", + } + } jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{ Name: req.JobName, Tasks: []models.GrampusNotebookTask{ { - Name: req.JobName, - ResourceSpecId: req.Spec.SourceSpecId, - ImageId: req.ImageId, - ImageUrl: req.ImageUrl, - Datasets: datasetGrampus, - Code: codeGrampus, - AutoStopDuration:autoStopDurationMs, - Capacity: setting.Capacity, + Name: req.JobName, + ResourceSpecId: req.Spec.SourceSpecId, + ImageId: req.ImageId, + ImageUrl: req.ImageUrl, + Datasets: datasetGrampus, + Code: codeGrampus, + AutoStopDuration: autoStopDurationMs, + Capacity: setting.Capacity, }, }, }) @@ -159,27 +168,27 @@ func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (job jobID := jobResult.JobInfo.JobID err = models.CreateCloudbrain(&models.Cloudbrain{ - Status: TransTrainJobStatus(jobResult.JobInfo.Status), - UserID: ctx.User.ID, - RepoID: ctx.Repo.Repository.ID, - JobID: jobID, - JobName: req.JobName, - DisplayJobName: req.DisplayJobName, - JobType: string(models.JobTypeDebug), - Type: models.TypeC2Net, - Uuid: req.Uuid, - DatasetName: req.DatasetNames, - CommitID: req.CommitID, - IsLatestVersion: "1", - ComputeResource: req.ComputeResource, - ImageID: req.ImageId, - BranchName: req.BranchName, - Description: req.Description, - WorkServerNumber: 1, - EngineName: req.ImageUrl, - CreatedUnix: createTime, - UpdatedUnix: createTime, - Spec: req.Spec, + Status: TransTrainJobStatus(jobResult.JobInfo.Status), + UserID: ctx.User.ID, + RepoID: ctx.Repo.Repository.ID, + JobID: jobID, + JobName: req.JobName, + DisplayJobName: req.DisplayJobName, + JobType: string(models.JobTypeDebug), + Type: models.TypeC2Net, + Uuid: req.Uuid, + DatasetName: req.DatasetNames, + CommitID: req.CommitID, + IsLatestVersion: "1", + ComputeResource: req.ComputeResource, + ImageID: req.ImageId, + BranchName: req.BranchName, + Description: req.Description, + WorkServerNumber: 1, + EngineName: req.ImageUrl, + CreatedUnix: createTime, + UpdatedUnix: createTime, + Spec: req.Spec, }) if err != nil { diff --git a/modules/grampus/resty.go b/modules/grampus/resty.go index a5d55a71f..13e6866fc 100755 --- a/modules/grampus/resty.go +++ b/modules/grampus/resty.go @@ -26,7 +26,7 @@ const ( urlGetResourceSpecs = urlOpenApiV1 + "resourcespec" urlGetAiCenter = urlOpenApiV1 + "sharescreen/aicenter" urlGetImages = urlOpenApiV1 + "image" - urlNotebookJob = urlOpenApiV1 + "notebook" + urlNotebookJob = urlOpenApiV1 + "notebook" errorIllegalToken = 1005 ) @@ -154,8 +154,7 @@ sendjob: return &result, nil } - -func GetNotebookJob(jobID string)(*models.GrampusNotebookResponse, error){ +func GetNotebookJob(jobID string) (*models.GrampusNotebookResponse, error) { checkSetting() client := getRestyClient() var result models.GrampusNotebookResponse @@ -251,7 +250,7 @@ sendjob: return &result, nil } -func GetImages(processorType string) (*models.GetGrampusImagesResult, error) { +func GetImages(processorType string, jobType string) (*models.GetGrampusImagesResult, error) { checkSetting() client := getRestyClient() var result models.GetGrampusImagesResult @@ -262,7 +261,7 @@ sendjob: _, err := client.R(). SetAuthToken(TOKEN). SetResult(&result). - Get(HOST + urlGetImages + "?processorType=" + processorType) + Get(HOST + urlGetImages + "?processorType=" + processorType + "&jobType=" + jobType) if err != nil { return nil, fmt.Errorf("resty GetImages: %v", err) @@ -338,19 +337,26 @@ func GetGrampusMetrics(jobID string) (models.GetTrainJobMetricStatisticResult, e return result, nil } -func StopJob(jobID string) (*models.GrampusStopJobResponse, error) { +func StopJob(jobID string, jobType ...string) (*models.GrampusStopJobResponse, error) { checkSetting() client := getRestyClient() var result models.GrampusStopJobResponse retry := 0 + url := urlTrainJob + if len(jobType) > 0 { + if jobType[0] == string(models.JobTypeDebug) { + url = urlNotebookJob + } + } + sendjob: _, err := client.R(). //SetHeader("Content-Type", "application/json"). SetAuthToken(TOKEN). SetResult(&result). - Post(HOST + urlTrainJob + "/" + jobID + "/stop") + Post(HOST + url + "/" + jobID + "/stop") if err != nil { return &result, fmt.Errorf("resty StopTrainJob: %v", err) @@ -402,3 +408,25 @@ sendjob: return &result, nil } + +func RestartNotebookJob(jobID string) (*models.GrampusNotebookRestartResponse, error) { + checkSetting() + client := getRestyClient() + var restartResponse *models.GrampusNotebookRestartResponse + + res, err := client.R(). + SetAuthToken(TOKEN). + SetResult(&restartResponse). + Get(HOST + urlNotebookJob + "/" + jobID + "/start") + + if err != nil { + return nil, fmt.Errorf("resty grampus restart note book job: %v", err) + } + + if res.StatusCode() != http.StatusOK { + log.Error("resty grampus restart note book job failed(%s): %v", res.String(), err.Error()) + return nil, fmt.Errorf("resty grampus restart note book job failed: %v", err) + } + + return restartResponse, nil +} diff --git a/routers/api/v1/api.go b/routers/api/v1/api.go index 2afbb9b7d..dda8b0e80 100755 --- a/routers/api/v1/api.go +++ b/routers/api/v1/api.go @@ -1046,6 +1046,9 @@ func RegisterRoutes(m *macaron.Macaron) { }) }, reqRepoReader(models.UnitTypeCloudBrain)) m.Group("/grampus", func() { + m.Group("/notebook", func() { + m.Get("/:id", repo_ext.GetGrampusNotebook) + }) m.Group("/train-job", func() { m.Group("/:jobid", func() { m.Get("", repo.GetModelArtsTrainJobVersion) diff --git a/routers/repo/grampus.go b/routers/repo/grampus.go index 4942e1df2..f2f26cf0d 100755 --- a/routers/repo/grampus.go +++ b/routers/repo/grampus.go @@ -45,6 +45,7 @@ import ( const ( tplGrampusTrainJobShow base.TplName = "repo/grampus/trainjob/show" + tplGrampusNotebookShow base.TplName = "repo/grampus/notebook/show" //GPU tplGrampusNotebookGPUNew base.TplName = "repo/grampus/notebook/gpu/new" @@ -104,6 +105,10 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook repo := ctx.Repo.Repository branchName := form.BranchName image := strings.TrimSpace(form.Image) + codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/" + + codeStoragePath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/" + tpl := tplGrampusNotebookGPUNew processType := grampus.ProcessorTypeGPU computeSource := models.GPUResource @@ -112,7 +117,8 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook tpl = tplGrampusNotebookNPUNew processType = grampus.ProcessorTypeNPU computeSource = models.NPUResource - computeSourceSimple := models.NPU + computeSourceSimple = models.NPU + codeStoragePath = grampus.JobPath + jobName + modelarts.CodePath } lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeDebug), displayJobName)) @@ -135,13 +141,13 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeDebug), computeSource) if err != nil { log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"]) - grampusTrainJobNewDataPrepare(ctx, processType) + grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("system error", tpl, &form) return } else { if count >= 1 { log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) - grampusTrainJobNewDataPrepare(ctx, processType) + grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("you have already a running or waiting task, can not create more", tpl, &form) return } @@ -152,14 +158,14 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook if err == nil { if len(tasks) != 0 { log.Error("the job name did already exist", ctx.Data["MsgID"]) - grampusTrainJobNewDataPrepare(ctx, processType) + grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("the job name did already exist", tpl, &form) return } } else { if !models.IsErrJobNotExist(err) { log.Error("system error, %v", err, ctx.Data["MsgID"]) - grampusTrainJobNewDataPrepare(ctx, processType) + grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("system error", tpl, &form) return } @@ -172,41 +178,80 @@ func GrampusNotebookCreate(ctx *context.Context, form auth.CreateGrampusNotebook Cluster: models.C2NetCluster, }) if err != nil || spec == nil { - grampusTrainJobNewDataPrepare(ctx, processType) + grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr("Resource specification not available", tpl, &form) return } if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) - grampusTrainJobNewDataPrepare(ctx, processType) + grampusNotebookNewDataPrepare(ctx, processType) ctx.RenderWithErr(ctx.Tr("points.insufficient_points_balance"), tpl, &form) return } + datasetInfos, datasetNames, err := models.GetDatasetInfo(uuid, computeSourceSimple) + if err != nil { + log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"]) + grampusNotebookNewDataPrepare(ctx, processType) + ctx.RenderWithErr(ctx.Tr("cloudbrain.error.dataset_select"), tpl, &form) + return + } + + //prepare code and out path + _, err = ioutil.ReadDir(codeLocalPath) + if err == nil { + os.RemoveAll(codeLocalPath) + } + + if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil { + log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err) + grampusNotebookNewDataPrepare(ctx, processType) + ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) + return + } + + if processType == grampus.ProcessorTypeGPU { + if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil { + log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"]) + grampusNotebookNewDataPrepare(ctx, processType) + ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) + return + } + } else { + + if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil { + log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err) + grampusNotebookNewDataPrepare(ctx, processType) + ctx.RenderWithErr(ctx.Tr("cloudbrain.load_code_failed"), tpl, &form) + return + } + } + commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName) command := "" - req := &grampus.GenerateNotebookReq{ - JobName: jobName, - DisplayJobName: displayJobName, - ComputeResource: computeSource, - ProcessType: processType, - Command: command, - ImageUrl: image, - ImageId: form.ImageID, - Description: description, - Uuid: uuid, - CommitID: commitID, - BranchName: branchName, - DatasetNames: form.DatasetName, - WorkServerNumber: 1, - Spec: spec, + req := &grampus.GenerateNotebookJobReq{ + JobName: jobName, + DisplayJobName: displayJobName, + ComputeResource: computeSource, + ProcessType: processType, + Command: command, + ImageUrl: image, + ImageId: form.ImageID, + Description: description, + Uuid: uuid, + CommitID: commitID, + BranchName: branchName, + DatasetNames: datasetNames, + DatasetInfos: datasetInfos, + Spec: spec, + CodeStoragePath: codeStoragePath, } - _, err = grampus.GenerateNotebook(ctx, req) + _, err = grampus.GenerateNotebookJob(ctx, req) if err != nil { - log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"]) + log.Error("GenerateNotebookJob failed:%v", err.Error(), ctx.Data["MsgID"]) grampusTrainJobNewDataPrepare(ctx, processType) ctx.RenderWithErr(err.Error(), tpl, &form) return @@ -844,22 +889,54 @@ func grampusTrainJobNpuCreate(ctx *context.Context, form auth.CreateGrampusTrain ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/modelarts/train-job") } +func GetGrampusNotebook(ctx *context.APIContext) { + var ( + err error + ) + + ID := ctx.Params(":id") + job, err := models.GetCloudbrainByID(ID) + if err != nil { + ctx.NotFound("", err) + log.Error("GetCloudbrainByID failed:", err) + return + } + + jobAfter, err := cloudbrainTask.SyncGrampusNotebookStatus(job) + + if err != nil { + ctx.NotFound(err) + log.Error("Sync cloud brain one status failed:", err) + return + } + + ctx.JSON(http.StatusOK, map[string]interface{}{ + "ID": ID, + "JobName": jobAfter.JobName, + "JobStatus": jobAfter.Status, + "SubState": "", + "CreatedTime": jobAfter.CreatedUnix.Format("2006-01-02 15:04:05"), + "CompletedTime": jobAfter.UpdatedUnix.Format("2006-01-02 15:04:05"), + "JobDuration": jobAfter.TrainJobDuration, + }) +} + func GrampusStopJob(ctx *context.Context) { - var ID = ctx.Params(":jobid") + var ID = ctx.Params(":id") var resultCode = "0" var errorMsg = "" var status = "" task := ctx.Cloudbrain for { - if task.Status == string(models.GrampusStatusStopped) || task.Status == string(models.GrampusStatusFailed) || task.Status == string(models.GrampusStatusSucceeded) { + if task.Status == models.GrampusStatusStopped || task.Status == models.GrampusStatusFailed || task.Status == models.GrampusStatusSucceeded { log.Error("the job(%s) has been stopped", task.JobName, ctx.Data["msgID"]) resultCode = "-1" - errorMsg = "system error" + errorMsg = "System error" break } - res, err := grampus.StopJob(task.JobID) + res, err := grampus.StopJob(task.JobID, task.JobType) if err != nil { log.Error("StopJob(%s) failed:%v", task.JobName, err, ctx.Data["msgID"]) resultCode = strconv.Itoa(res.ErrorCode) @@ -896,6 +973,25 @@ func GrampusStopJob(ctx *context.Context) { }) } +func GrampusNotebookDel(ctx *context.Context) { + var listType = ctx.Query("listType") + if err := deleteGrampusJob(ctx); err != nil { + log.Error("deleteGrampusJob failed: %v", err, ctx.Data["msgID"]) + ctx.ServerError(err.Error(), err) + return + } + + var isAdminPage = ctx.Query("isadminpage") + var isHomePage = ctx.Query("ishomepage") + if ctx.IsUserSiteAdmin() && isAdminPage == "true" { + ctx.Redirect(setting.AppSubURL + "/admin" + "/cloudbrains") + } else if isHomePage == "true" { + ctx.Redirect(setting.AppSubURL + "/cloudbrains") + } else { + ctx.Redirect(setting.AppSubURL + ctx.Repo.RepoLink + "/debugjob?debugListType=" + listType) + } +} + func GrampusTrainJobDel(ctx *context.Context) { var listType = ctx.Query("listType") if err := deleteGrampusJob(ctx); err != nil { @@ -918,9 +1014,9 @@ func GrampusTrainJobDel(ctx *context.Context) { func deleteGrampusJob(ctx *context.Context) error { task := ctx.Cloudbrain - if task.Status != string(models.GrampusStatusStopped) && task.Status != string(models.GrampusStatusSucceeded) && task.Status != string(models.GrampusStatusFailed) { + if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed { log.Error("the job(%s) has not been stopped", task.JobName, ctx.Data["msgID"]) - return errors.New("the job has not been stopped") + return errors.New(ctx.Tr("cloudbrain.Not_Stopped")) } err := models.DeleteJob(task) @@ -938,6 +1034,89 @@ func deleteGrampusJob(ctx *context.Context) error { return nil } +func GrampusNotebookShow(ctx *context.Context) { + ctx.Data["PageIsCloudBrain"] = true + + var task *models.Cloudbrain + task, err := models.GetCloudbrainByJobIDWithDeleted(ctx.Params(":jobid")) + if err != nil { + log.Error("GetCloudbrainByJobID failed:" + err.Error()) + ctx.NotFound(ctx.Req.URL.RequestURI(), nil) + return + } + task.ContainerIp = "" + + if task.DeletedAt.IsZero() { //normal record + result, err := grampus.GetNotebookJob(task.JobID) + if err != nil { + log.Error("GetJob failed:" + err.Error()) + ctx.NotFound(ctx.Req.URL.RequestURI(), nil) + return + } + + if result != nil { + if len(result.JobInfo.Tasks[0].CenterID) == 1 && len(result.JobInfo.Tasks[0].CenterName) == 1 { + task.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] + } + oldStatus := task.Status + task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + if task.Status != oldStatus || task.Status == models.GrampusStatusRunning { + task.Duration = result.JobInfo.RunSec + if task.Duration < 0 { + task.Duration = 0 + } + task.TrainJobDuration = models.ConvertDurationToStr(task.Duration) + + if task.StartTime == 0 && result.JobInfo.StartedAt > 0 { + task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt) + } + if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 { + task.EndTime = task.StartTime.Add(task.Duration) + } + task.CorrectCreateUnix() + if oldStatus != task.Status { + notification.NotifyChangeCloudbrainStatus(task, oldStatus) + if models.IsTrainJobTerminal(task.Status) && task.ComputeResource == models.NPUResource { + if len(result.JobInfo.Tasks[0].CenterID) == 1 { + urchin.GetBackNpuModel(task.ID, grampus.GetRemoteEndPoint(result.JobInfo.Tasks[0].CenterID[0]), grampus.BucketRemote, grampus.GetNpuModelObjectKey(task.JobName), grampus.GetCenterProxy(setting.Grampus.LocalCenterID)) + } + } + } + } + err = models.UpdateJob(task) + if err != nil { + log.Error("UpdateJob failed:" + err.Error()) + } + } + } + + if len(task.Parameters) > 0 { + var parameters models.Parameters + err := json.Unmarshal([]byte(task.Parameters), ¶meters) + if err != nil { + log.Error("Failed to Unmarshal Parameters: %s (%v)", task.Parameters, err) + ctx.ServerError("system error", err) + return + } + + if len(parameters.Parameter) > 0 { + paramTemp := "" + for _, Parameter := range parameters.Parameter { + param := Parameter.Label + " = " + Parameter.Value + "; " + paramTemp = paramTemp + param + } + task.Parameters = paramTemp[:len(paramTemp)-2] + } else { + task.Parameters = "" + } + } + prepareSpec4Show(ctx, task) + ctx.Data["datasetDownload"] = GetCloudBrainDataSetInfo(task.Uuid, task.DatasetName, false) + ctx.Data["canDownload"] = cloudbrain.CanModifyJob(ctx, task) + ctx.Data["ai_center"] = cloudbrainService.GetAiCenterShow(task.AiCenter, ctx) + ctx.HTML(http.StatusOK, tplGrampusNotebookShow) +} + func GrampusTrainJobShow(ctx *context.Context) { ctx.Data["PageIsCloudBrain"] = true @@ -1281,3 +1460,134 @@ func HandleTaskWithAiCenter(ctx *context.Context) { r["updateCounts"] = updateCounts ctx.JSON(http.StatusOK, response.SuccessWithData(r)) } + +func GrampusNotebookDebug(ctx *context.Context) { + + result, err := grampus.GetNotebookJob(ctx.Cloudbrain.JobID) + + if err != nil { + ctx.RenderWithErr(err.Error(), tplDebugJobIndex, nil) + return + } + if len(result.JobInfo.Tasks) > 0 { + ctx.Redirect(result.JobInfo.Tasks[0].Url + "?token=" + result.JobInfo.Tasks[0].Token) + return + } + ctx.NotFound("Can not find the job.", nil) + +} + +func GrampusNotebookRestart(ctx *context.Context) { + var id = ctx.Params(":id") + var resultCode = "-1" + var errorMsg = "" + var status = "" + var spec *models.Specification + + task := ctx.Cloudbrain + if ctx.Written() { + return + } + + for { + + if task.Status != models.GrampusStatusStopped && task.Status != models.GrampusStatusSucceeded && task.Status != models.GrampusStatusFailed { + log.Error("the job(%s) is not stopped", task.JobName, ctx.Data["MsgID"]) + errorMsg = "the job is not stopped" + break + } + + count, err := cloudbrainTask.GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeDebug), task.ComputeResource) + + if err != nil { + log.Error("GetCloudbrainNotebookCountByUserID failed:%v", err, ctx.Data["MsgID"]) + errorMsg = "system error" + break + } else { + if count >= 1 { + log.Error("the user already has running or waiting task", ctx.Data["MsgID"]) + resultCode = "2" + errorMsg = ctx.Tr("repo.cloudbrain.morethanonejob") + break + } + } + + oldSpec, err := resource.GetCloudbrainSpec(task.ID) + if err != nil || oldSpec == nil { + log.Error("NotebookManage GetCloudbrainSpec error.%v", err) + errorMsg = "Resource specification not available" + break + } + + computeSourceSimple := models.GPU + action := models.ActionCreateGrampusGPUDebugTask + if task.ComputeResource == models.NPUResource { + computeSourceSimple = models.NPU + action = models.ActionCreateGrampusNPUDebugTask + } + spec, err = resource.GetAndCheckSpec(ctx.User.ID, oldSpec.ID, models.FindSpecsOptions{ + JobType: models.JobType(task.JobType), + ComputeResource: computeSourceSimple, + Cluster: models.C2NetCluster, + }) + if err != nil || spec == nil { + log.Error("NotebookManage GetAndCheckSpec error.task.id = %d", task.ID) + errorMsg = "Resource specification not support any more" + break + } + if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) { + log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID) + errorMsg = ctx.Tr("points.insufficient_points_balance") + break + } + createTime := timeutil.TimeStampNow() + + res, err := grampus.RestartNotebookJob(task.JobID) + if err != nil { + log.Error("ManageNotebook2(%s) failed:%v", task.DisplayJobName, err.Error(), ctx.Data["MsgID"]) + errorMsg = err.Error() + break + } + + newTask := &models.Cloudbrain{ + Status: res.Status, + UserID: task.UserID, + RepoID: task.RepoID, + JobID: res.NewId, + JobName: task.JobName, + DisplayJobName: task.DisplayJobName, + JobType: task.JobType, + Type: task.Type, + Uuid: task.Uuid, + Image: task.Image, + ComputeResource: task.ComputeResource, + Description: task.Description, + CreatedUnix: createTime, + UpdatedUnix: createTime, + Spec: spec, + } + + err = models.RestartCloudbrain(task, newTask) + if err != nil { + log.Error("RestartCloudbrain(%s) failed:%v", task.JobName, err.Error(), ctx.Data["MsgID"]) + errorMsg = "system error" + break + } + + id = strconv.FormatInt(newTask.ID, 10) + + status = res.Status + resultCode = "0" + + notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, id, newTask.DisplayJobName, action) + + break + } + + ctx.JSON(200, map[string]string{ + "result_code": resultCode, + "error_msg": errorMsg, + "status": status, + "id": id, + }) +} diff --git a/routers/routes/routes.go b/routers/routes/routes.go index 450c2ac1b..c56ca46ee 100755 --- a/routers/routes/routes.go +++ b/routers/routes/routes.go @@ -1217,10 +1217,12 @@ func RegisterRoutes(m *macaron.Macaron) { }, context.RepoRef()) m.Group("/grampus", func() { m.Group("/notebook", func() { - m.Group("/:jobid", func() { - m.Get("", reqRepoCloudBrainReader, repo.GrampusTrainJobShow) + m.Group("/:id", func() { + m.Get("", reqRepoCloudBrainReader, repo.GrampusNotebookShow) + m.Get("/debug", reqWechatBind, cloudbrain.AdminOrJobCreaterRight, repo.GrampusNotebookDebug) + m.Post("/restart", reqWechatBind, cloudbrain.AdminOrJobCreaterRight, repo.GrampusNotebookRestart) m.Post("/stop", cloudbrain.AdminOrOwnerOrJobCreaterRight, repo.GrampusStopJob) - m.Post("/del", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.GrampusTrainJobDel) + m.Post("/del", cloudbrain.AdminOrOwnerOrJobCreaterRight, repo.GrampusNotebookDel) }) m.Get("/create", reqWechatBind, reqRepoCloudBrainWriter, context.PointAccount(), repo.GrampusNotebookNew) @@ -1230,7 +1232,7 @@ func RegisterRoutes(m *macaron.Macaron) { m.Group("/train-job", func() { m.Group("/:jobid", func() { m.Get("", reqRepoCloudBrainReader, repo.GrampusTrainJobShow) - m.Post("/stop", cloudbrain.AdminOrOwnerOrJobCreaterRight, repo.GrampusStopJob) + m.Post("/stop", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.GrampusStopJob) m.Post("/del", cloudbrain.AdminOrOwnerOrJobCreaterRightForTrain, repo.GrampusTrainJobDel) m.Get("/model_download", cloudbrain.AdminOrJobCreaterRightForTrain, repo.ModelDownload) m.Get("/create_version", reqWechatBind, cloudbrain.AdminOrJobCreaterRightForTrain, repo.GrampusTrainJobVersionNew) diff --git a/services/cloudbrain/cloudbrainTask/sync_status.go b/services/cloudbrain/cloudbrainTask/sync_status.go index 67dc4d3b7..99ef7c419 100644 --- a/services/cloudbrain/cloudbrainTask/sync_status.go +++ b/services/cloudbrain/cloudbrainTask/sync_status.go @@ -3,6 +3,9 @@ package cloudbrainTask import ( "net/http" + "code.gitea.io/gitea/modules/grampus" + "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/cloudbrain" "code.gitea.io/gitea/modules/httplib" @@ -57,6 +60,53 @@ func SyncCloudBrainOneStatus(task *models.Cloudbrain) (*models.Cloudbrain, error } +func SyncGrampusNotebookStatus(job *models.Cloudbrain) (*models.Cloudbrain, error) { + result, err := grampus.GetNotebookJob(job.JobID) + if err != nil { + + log.Error("GetJob(%s) failed:%v", job.JobName, err) + + return job, err + } + + if job.StartTime == 0 && result.JobInfo.StartedAt > 0 { + job.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt) + } + oldStatus := job.Status + job.Status = grampus.TransTrainJobStatus(result.JobInfo.Status) + job.Duration = result.JobInfo.RunSec + job.TrainJobDuration = models.ConvertDurationToStr(job.Duration) + + if job.EndTime == 0 && models.IsTrainJobTerminal(job.Status) && job.StartTime > 0 { + job.EndTime = job.StartTime.Add(job.Duration) + } + job.CorrectCreateUnix() + + if len(job.AiCenter) == 0 { + if len(result.JobInfo.Tasks) > 0 { + if len(result.JobInfo.Tasks[0].CenterID) > 0 && len(result.JobInfo.Tasks[0].CenterName) > 0 { + job.AiCenter = result.JobInfo.Tasks[0].CenterID[0] + "+" + result.JobInfo.Tasks[0].CenterName[0] + } + } + } + + if job.Status != models.GrampusStatusWaiting { + if oldStatus != job.Status { + notification.NotifyChangeCloudbrainStatus(job, oldStatus) + } + job.TrainUrl = result.JobInfo.Tasks[0].CodeUrl + job.DataUrl = result.JobInfo.Tasks[0].DataUrl + err = models.UpdateJob(job) + if err != nil { + log.Error("UpdateJob failed:", err) + return nil, err + } + } + + return job, nil + +} + func isNoteBookReady(task *models.Cloudbrain) bool { if task.JobType != string(models.JobTypeDebug) { return true