package cloudbrainTask

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"path"
	"regexp"
	"strings"

	"code.gitea.io/gitea/models"
	"code.gitea.io/gitea/modules/cloudbrain"
	"code.gitea.io/gitea/modules/context"
	"code.gitea.io/gitea/modules/git"
	"code.gitea.io/gitea/modules/grampus"
	"code.gitea.io/gitea/modules/log"
	"code.gitea.io/gitea/modules/modelarts"
	"code.gitea.io/gitea/modules/notification"
	"code.gitea.io/gitea/modules/obs"
	"code.gitea.io/gitea/modules/redis/redis_key"
	"code.gitea.io/gitea/modules/redis/redis_lock"
	"code.gitea.io/gitea/modules/setting"
	"code.gitea.io/gitea/modules/storage"
	api "code.gitea.io/gitea/modules/structs"
	"code.gitea.io/gitea/modules/timeutil"
	"code.gitea.io/gitea/modules/util"
	"code.gitea.io/gitea/services/cloudbrain/resource"
	"code.gitea.io/gitea/services/reward/point/account"

	"github.com/unknwon/com"
)

// jobNamePattern is the shape a display job name must have to be accepted by
// checkParameters: 3 to 36 characters, lower-case letters, digits, '-' and '_'.
var jobNamePattern = regexp.MustCompile(`^[a-z0-9][a-z0-9-_]{1,34}[a-z0-9-]$`)

// GrampusTrainJobGpuCreate validates the request, stages the repository code
// and an initial model directory through uploadCodeToMinio, builds the
// container command and submits a GPU train job via grampus.GenerateTrainJob.
//
// The outcome is always written to ctx as JSON with HTTP 200: the job ID on
// success, an error message otherwise.
func GrampusTrainJobGpuCreate(ctx *context.Context, option api.CreateTrainJobOption) {
	displayJobName := option.DisplayJobName
	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
	uuid := option.Attachment
	description := option.Description
	bootFile := strings.TrimSpace(option.BootFile)
	params := option.Params
	repo := ctx.Repo.Repository
	codeLocalPath := setting.JobPath + jobName + cloudbrain.CodeMountPath + "/"
	codeMinioPath := setting.CBCodePathPrefix + jobName + cloudbrain.CodeMountPath + "/"
	branchName := option.BranchName
	image := strings.TrimSpace(option.Image)

	lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
	// NOTE(review): this runs even when checkParameters failed to acquire the
	// lock; confirm UnLock cannot release a lock held by another request.
	defer lock.UnLock()

	spec, datasetInfos, datasetNames, err := checkParameters(ctx, option, lock, repo)
	if err != nil {
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error()))
		return
	}

	// prepare code and out path: start from a clean local code directory
	_, err = ioutil.ReadDir(codeLocalPath)
	if err == nil {
		os.RemoveAll(codeLocalPath)
	}

	if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
		log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
		// Stop here: continuing would upload from a broken directory and
		// could write a second response to the client.
		return
	}

	// todo: upload code (send to file_server todo this work?)
	// upload code
	if err := uploadCodeToMinio(codeLocalPath+"/", jobName, cloudbrain.CodeMountPath+"/"); err != nil {
		log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
		return
	}

	modelPath := setting.JobPath + jobName + cloudbrain.ModelMountPath + "/"
	if err := mkModelPath(modelPath); err != nil {
		log.Error("Failed to mkModelPath: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
		return
	}

	// init model readme
	if err := uploadCodeToMinio(modelPath, jobName, cloudbrain.ModelMountPath+"/"); err != nil {
		log.Error("Failed to uploadCodeToMinio: %s (%v)", repo.FullName(), err, ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
		return
	}

	// Both lists are built in the same pass so their entries stay paired even
	// though map iteration order is not fixed.
	var datasetRemotePath, allFileName string
	for _, datasetInfo := range datasetInfos {
		if datasetRemotePath == "" {
			datasetRemotePath = datasetInfo.DataLocalPath
			allFileName = datasetInfo.FullName
		} else {
			datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath
			allFileName = allFileName + ";" + datasetInfo.FullName
		}
	}

	// prepare command
	preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName)
	command, err := generateCommand(repo.Name, grampus.ProcessorTypeGPU, codeMinioPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CBCodePathPrefix+jobName+cloudbrain.ModelMountPath+"/", allFileName, preTrainModelPath, option.CkptName)
	if err != nil {
		log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("Create task failed, internal error"))
		return
	}

	// The commit ID is recorded best-effort; a lookup failure leaves it empty.
	commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)

	req := &grampus.GenerateTrainJobReq{
		JobName:          jobName,
		DisplayJobName:   displayJobName,
		ComputeResource:  models.GPUResource,
		ProcessType:      grampus.ProcessorTypeGPU,
		Command:          command,
		ImageUrl:         image,
		Description:      description,
		BootFile:         bootFile,
		Uuid:             uuid,
		CommitID:         commitID,
		BranchName:       branchName,
		Params:           option.Params,
		EngineName:       image,
		DatasetNames:     datasetNames,
		DatasetInfos:     datasetInfos,
		IsLatestVersion:  modelarts.IsLatestVersion,
		VersionCount:     modelarts.VersionCountOne,
		WorkServerNumber: 1,
		Spec:             spec,
	}

	if option.ModelName != "" { // train from a pre-trained model
		req.ModelName = option.ModelName
		req.LabelName = option.LabelName
		req.CkptName = option.CkptName
		req.ModelVersion = option.ModelVersion
		req.PreTrainModelUrl = option.PreTrainModelUrl
	}

	jobID, err := grampus.GenerateTrainJob(ctx, req)
	if err != nil {
		log.Error("GenerateTrainJob failed:%v", err.Error(), ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error()))
		return
	}
	ctx.JSON(http.StatusOK, models.BaseMessageApi{Code: 0, Message: jobID})
}

// checkParameters acquires the per-job-name lock and validates a train job
// request: job name format, boot file existence, the per-user limit of one
// unfinished task, boot file/branch sanity, job name uniqueness within the
// repository, the resource specification, the point balance and the datasets.
//
// It returns the resolved specification, the dataset infos and the dataset
// names. On failure the returned error carries the message shown to the user.
func checkParameters(ctx *context.Context, option api.CreateTrainJobOption, lock *redis_lock.DistributeLock, repo *models.Repository) (*models.Specification, map[string]models.DatasetInfo, string, error) {
	isOk, err := lock.Lock(models.CloudbrainKeyDuration)
	if !isOk {
		log.Error("lock processed failed:%v", err, ctx.Data["MsgID"])
		// errors.New, not fmt.Errorf: translated text must not be parsed as
		// a format string.
		return nil, nil, "", errors.New(ctx.Tr("repo.cloudbrain_samejob_err"))
	}

	if !jobNamePattern.MatchString(option.DisplayJobName) {
		return nil, nil, "", errors.New(ctx.Tr("repo.cloudbrain_jobname_err"))
	}

	bootFileExist, err := ctx.Repo.FileExists(option.BootFile, option.BranchName)
	if err != nil || !bootFileExist {
		log.Error("Get bootfile error:%v", err, ctx.Data["MsgID"])
		return nil, nil, "", errors.New(ctx.Tr("repo.cloudbrain_bootfile_err"))
	}

	computeResource := models.GPUResource
	if option.Type == 3 {
		computeResource = models.NPUResource
	}

	// check count limit
	count, err := GetNotFinalStatusTaskCount(ctx.User.ID, models.TypeC2Net, string(models.JobTypeTrain), computeResource)
	if err != nil {
		log.Error("GetGrampusCountByUserID failed:%v", err, ctx.Data["MsgID"])
		return nil, nil, "", errors.New("system error")
	}
	if count >= 1 {
		log.Error("the user already has running or waiting task", ctx.Data["MsgID"])
		return nil, nil, "", errors.New("you have already a running or waiting task, can not create more.")
	}

	// check param
	if err := grampusParamCheckCreateTrainJob(option.BootFile, option.BranchName); err != nil {
		log.Error("paramCheckCreateTrainJob failed:(%v)", err, ctx.Data["MsgID"])
		return nil, nil, "", err
	}

	// check whether the task name in the project is duplicated
	tasks, err := models.GetCloudbrainsByDisplayJobName(repo.ID, string(models.JobTypeTrain), option.DisplayJobName)
	if err == nil {
		if len(tasks) != 0 {
			log.Error("the job name did already exist", ctx.Data["MsgID"])
			return nil, nil, "", errors.New("The job name did already exist.")
		}
	} else if !models.IsErrJobNotExist(err) {
		// A "job not exist" error means the name is free; anything else is fatal.
		log.Error("system error, %v", err, ctx.Data["MsgID"])
		return nil, nil, "", errors.New("system error")
	}

	// check specification
	computeType := models.GPU
	if option.Type == 3 {
		computeType = models.NPU
	}
	spec, err := resource.GetAndCheckSpec(ctx.User.ID, option.SpecId, models.FindSpecsOptions{
		JobType:         models.JobTypeTrain,
		ComputeResource: computeType,
		Cluster:         models.C2NetCluster,
	})
	if err != nil || spec == nil {
		return nil, nil, "", errors.New("Resource specification is not available.")
	}

	if !account.IsPointBalanceEnough(ctx.User.ID, spec.UnitPrice) {
		log.Error("point balance is not enough,userId=%d specId=%d", ctx.User.ID, spec.ID)
		return nil, nil, "", errors.New(ctx.Tr("points.insufficient_points_balance"))
	}

	// check dataset
	datasetInfos, datasetNames, err := models.GetDatasetInfo(option.Attachment, computeType)
	if err != nil {
		log.Error("GetDatasetInfo failed: %v", err, ctx.Data["MsgID"])
		return nil, nil, "", errors.New(ctx.Tr("cloudbrain.error.dataset_select"))
	}

	return spec, datasetInfos, datasetNames, nil
}

// GrampusTrainJobNpuCreate validates the request, uploads the repository code
// through uploadCodeToObs, builds the container command and submits an NPU
// train job via grampus.GenerateTrainJob.
//
// The outcome is always written to ctx as JSON with HTTP 200: the job ID on
// success, an error message otherwise.
func GrampusTrainJobNpuCreate(ctx *context.Context, option api.CreateTrainJobOption) {
	displayJobName := option.DisplayJobName
	jobName := util.ConvertDisplayJobNameToJobName(displayJobName)
	uuid := option.Attachment
	description := option.Description
	bootFile := strings.TrimSpace(option.BootFile)
	params := option.Params
	repo := ctx.Repo.Repository
	codeLocalPath := setting.JobPath + jobName + modelarts.CodePath
	codeObsPath := grampus.JobPath + jobName + modelarts.CodePath
	branchName := option.BranchName
	isLatestVersion := modelarts.IsLatestVersion
	versionCount := modelarts.VersionCountOne
	engineName := option.Image

	lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
	// NOTE(review): this runs even when checkParameters failed to acquire the
	// lock; confirm UnLock cannot release a lock held by another request.
	defer lock.UnLock()

	spec, datasetInfos, datasetNames, err := checkParameters(ctx, option, lock, repo)
	if err != nil {
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error()))
		return
	}

	// prepare code and out path: start from a clean local code directory
	_, err = ioutil.ReadDir(codeLocalPath)
	if err == nil {
		os.RemoveAll(codeLocalPath)
	}

	if err := downloadZipCode(ctx, codeLocalPath, branchName); err != nil {
		log.Error("downloadZipCode failed, server timed out: %s (%v)", repo.FullName(), err)
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
		return
	}

	// todo: upload code (send to file_server todo this work?)
	if err := obsMkdir(setting.CodePathPrefix + jobName + modelarts.OutputPath); err != nil {
		log.Error("Failed to obsMkdir_output: %s (%v)", repo.FullName(), err)
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
		return
	}

	if err := uploadCodeToObs(codeLocalPath, jobName, ""); err != nil {
		log.Error("Failed to uploadCodeToObs: %s (%v)", repo.FullName(), err)
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(ctx.Tr("cloudbrain.load_code_failed")))
		return
	}

	// Unlike the GPU variant, each remote path carries the quoted file name.
	var datasetRemotePath, allFileName string
	for _, datasetInfo := range datasetInfos {
		if datasetRemotePath == "" {
			datasetRemotePath = datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
			allFileName = datasetInfo.FullName
		} else {
			datasetRemotePath = datasetRemotePath + ";" + datasetInfo.DataLocalPath + "'" + datasetInfo.FullName + "'"
			allFileName = allFileName + ";" + datasetInfo.FullName
		}
	}

	// prepare command
	preTrainModelPath := getPreTrainModelPath(option.PreTrainModelUrl, option.CkptName)
	command, err := generateCommand(repo.Name, grampus.ProcessorTypeNPU, codeObsPath+cloudbrain.DefaultBranchName+".zip", datasetRemotePath, bootFile, params, setting.CodePathPrefix+jobName+modelarts.OutputPath, allFileName, preTrainModelPath, option.CkptName)
	if err != nil {
		log.Error("Failed to generateCommand: %s (%v)", displayJobName, err, ctx.Data["MsgID"])
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi("Create task failed, internal error"))
		return
	}

	// The commit ID is recorded best-effort; a lookup failure leaves it empty.
	commitID, _ := ctx.Repo.GitRepo.GetBranchCommitID(branchName)

	req := &grampus.GenerateTrainJobReq{
		JobName:           jobName,
		DisplayJobName:    displayJobName,
		ComputeResource:   models.NPUResource,
		ProcessType:       grampus.ProcessorTypeNPU,
		Command:           command,
		ImageId:           option.ImageID,
		Description:       description,
		CodeObsPath:       codeObsPath,
		BootFileUrl:       codeObsPath + bootFile,
		BootFile:          bootFile,
		WorkServerNumber:  option.WorkServerNumber,
		Uuid:              uuid,
		CommitID:          commitID,
		IsLatestVersion:   isLatestVersion,
		BranchName:        branchName,
		Params:            option.Params,
		EngineName:        engineName,
		VersionCount:      versionCount,
		TotalVersionCount: modelarts.TotalVersionCount,
		DatasetNames:      datasetNames,
		DatasetInfos:      datasetInfos,
		Spec:              spec,
		CodeName:          strings.ToLower(repo.Name),
	}

	if option.ModelName != "" { // train from a pre-trained model
		req.ModelName = option.ModelName
		req.LabelName = option.LabelName
		req.CkptName = option.CkptName
		req.ModelVersion = option.ModelVersion
		req.PreTrainModelUrl = option.PreTrainModelUrl
		req.PreTrainModelPath = preTrainModelPath
	}

	jobID, err := grampus.GenerateTrainJob(ctx, req)
	if err != nil {
		log.Error("GenerateTrainJob failed:%v", err.Error())
		ctx.JSON(http.StatusOK, models.BaseErrorMessageApi(err.Error()))
		return
	}
	ctx.JSON(http.StatusOK, models.BaseMessageApi{Code: 0, Message: jobID})
}

// obsMkdir puts an object with key dir and no body into setting.Bucket
// through storage.ObsCli.
func obsMkdir(dir string) error {
	input := &obs.PutObjectInput{}
	input.Bucket = setting.Bucket
	input.Key = dir
	if _, err := storage.ObsCli.PutObject(input); err != nil {
		log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
		return err
	}
	return nil
}

// uploadCodeToObs recursively uploads the content of the local directory
// codePath to setting.Bucket. codePath and parentDir are concatenated with
// entry names directly, so both must end with "/" (parentDir may be empty).
// It stops at the first error.
func uploadCodeToObs(codePath, jobName, parentDir string) error {
	files, err := readDir(codePath)
	if err != nil {
		log.Error("readDir(%s) failed: %s", codePath, err.Error())
		return err
	}

	for _, file := range files {
		if file.IsDir() {
			input := &obs.PutObjectInput{}
			input.Bucket = setting.Bucket
			// NOTE(review): file keys below are prefixed with
			// setting.CodePathPrefix+jobName+"/code/", this directory key is
			// not. Kept as is; confirm whether that is intended.
			input.Key = parentDir + file.Name() + "/"
			if _, err = storage.ObsCli.PutObject(input); err != nil {
				log.Error("PutObject(%s) failed: %s", input.Key, err.Error())
				return err
			}
			if err = uploadCodeToObs(codePath+file.Name()+"/", jobName, parentDir+file.Name()+"/"); err != nil {
				log.Error("uploadCodeToObs(%s) failed: %s", file.Name(), err.Error())
				return err
			}
			continue
		}

		input := &obs.PutFileInput{}
		input.Bucket = setting.Bucket
		input.Key = setting.CodePathPrefix + jobName + "/code/" + parentDir + file.Name()
		input.SourceFile = codePath + file.Name()
		if _, err = storage.ObsCli.PutFile(input); err != nil {
			log.Error("PutFile(%s) failed: %s", input.SourceFile, err.Error())
			return err
		}
	}
	return nil
}

// grampusParamCheckCreateTrainJob checks that bootFile (ignoring surrounding
// whitespace) ends in ".py" and that branchName is not empty.
func grampusParamCheckCreateTrainJob(bootFile string, branchName string) error {
	if !strings.HasSuffix(strings.TrimSpace(bootFile), ".py") {
		log.Error("the boot file(%s) must be a python file", bootFile)
		return errors.New("启动文件必须是python文件")
	}
	if branchName == "" {
		log.Error("the branch must not be null!")
		return errors.New("代码分支不能为空!")
	}
	return nil
}

// downloadZipCode makes sure codePath exists and writes a ZIP archive of
// branchName's head commit to codePath/grampus.CodeArchiveName. An archive
// file that already exists at that location is left untouched.
func downloadZipCode(ctx *context.Context, codePath, branchName string) error {
	archivePath := codePath
	if !com.IsDir(archivePath) {
		if err := os.MkdirAll(archivePath, os.ModePerm); err != nil {
			log.Error("MkdirAll failed:%v", err)
			return err
		}
	}

	// Get corresponding commit.
	gitRepo := ctx.Repo.GitRepo
	if !gitRepo.IsBranchExist(branchName) {
		log.Error("the branch is not exist: %s", branchName)
		return errors.New("The branch does not exist.")
	}
	commit, err := gitRepo.GetBranchCommit(branchName)
	if err != nil {
		log.Error("GetBranchCommit failed:%v", err)
		return err
	}

	archivePath = path.Join(archivePath, grampus.CodeArchiveName)
	if com.IsFile(archivePath) {
		return nil
	}
	err = commit.CreateArchive(archivePath, git.CreateArchiveOpts{
		Format: git.ZIP,
		Prefix: setting.Repository.PrefixArchiveFiles,
	})
	if err != nil {
		log.Error("CreateArchive failed:%v", err)
		return err
	}
	return nil
}

// uploadCodeToMinio recursively uploads the files below the local directory
// codePath through storage.Attachments, using object names of the form
// setting.CBCodePathPrefix + jobName + parentDir + relative file path.
// codePath and parentDir are concatenated with entry names directly, so both
// must end with "/". It stops at the first error.
func uploadCodeToMinio(codePath, jobName, parentDir string) error {
	files, err := readDir(codePath)
	if err != nil {
		log.Error("readDir(%s) failed: %s", codePath, err.Error())
		return err
	}

	for _, file := range files {
		if file.IsDir() {
			if err = uploadCodeToMinio(codePath+file.Name()+"/", jobName, parentDir+file.Name()+"/"); err != nil {
				log.Error("uploadCodeToMinio(%s) failed: %s", file.Name(), err.Error())
				return err
			}
			continue
		}

		destObject := setting.CBCodePathPrefix + jobName + parentDir + file.Name()
		sourceFile := codePath + file.Name()
		if err = storage.Attachments.UploadObject(destObject, sourceFile); err != nil {
			log.Error("UploadObject(%s) failed: %s", file.Name(), err.Error())
			return err
		}
	}
	return nil
}

// readDir returns the entries of dirname in the order the file system yields
// them (unsorted). An io.EOF from the directory read is reported as an empty
// listing rather than an error.
func readDir(dirname string) ([]os.FileInfo, error) {
	f, err := os.Open(dirname)
	if err != nil {
		return nil, err
	}

	list, err := f.Readdir(0)
	f.Close()
	if err != nil {
		// todo: can not upload empty folder
		if err == io.EOF {
			return nil, nil
		}
		return nil, err
	}

	//sort.Slice(list, func(i, j int) bool { return list[i].Name() < list[j].Name() })
	return list, nil
}

// mkModelPath creates modelPath with a README placeholder explaining how the
// directory is meant to be used.
func mkModelPath(modelPath string) error {
	return mkPathAndReadMeFile(modelPath, "You can put the files into this directory and download the files by the web page.")
}

// mkPathAndReadMeFile creates the directory path (including parents) and
// writes text to path+"README", truncating an existing file. path is
// concatenated as is, so it must end with "/".
func mkPathAndReadMeFile(path string, text string) error {
	if err := os.MkdirAll(path, os.ModePerm); err != nil {
		log.Error("MkdirAll(%s) failed:%v", path, err)
		return err
	}

	fileName := path + "README"
	f, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		log.Error("OpenFile failed: %v", err)
		return err
	}
	defer f.Close()

	if _, err = f.WriteString(text); err != nil {
		log.Error("WriteString failed: %v", err)
		return err
	}
	return nil
}

// getPreTrainModelPath drops everything up to and including the first "/" of
// pretrainModelDir and appends fileName. It returns "" when pretrainModelDir
// has no "/" or starts with one.
func getPreTrainModelPath(pretrainModelDir string, fileName string) string {
	index := strings.Index(pretrainModelDir, "/")
	if index <= 0 {
		return ""
	}
	return pretrainModelDir[index+1:] + fileName
}

// generateCommand assembles the shell command line executed inside the train
// container: prepare the sync scripts, (GPU only) download and unpack code
// and datasets, export the environment, run the boot file with the parameters
// decoded from the JSON in paramSrc, upload the output directory and finally
// exit with the training script's status.
//
// For a processorType that is neither GPU nor NPU only the processor
// independent parts are emitted, using the NPU work directory. An error is
// returned only when paramSrc is not valid JSON.
//
// NOTE(review): bootFile, parameter labels/values and the dataset names are
// inserted into the shell command without escaping; confirm callers only pass
// trusted or validated values.
func generateCommand(repoName, processorType, codeRemotePath, dataRemotePath, bootFile, paramSrc, outputRemotePath, datasetName, pretrainModelPath, pretrainModelFileName string) (string, error) {
	workDir := grampus.NpuWorkDir
	if processorType == grampus.ProcessorTypeGPU {
		workDir = grampus.GpuWorkDir
	}

	var command string
	command += "pwd;cd " + workDir + fmt.Sprintf(grampus.CommandPrepareScript, setting.Grampus.SyncScriptProject, setting.Grampus.SyncScriptProject)

	// download code & dataset
	switch processorType {
	case grampus.ProcessorTypeNPU:
		// no need to download code & dataset by internet
	case grampus.ProcessorTypeGPU:
		commandDownload := "./downloader_for_minio " + setting.Grampus.Env + " " + codeRemotePath + " " + grampus.CodeArchiveName + " '" + dataRemotePath + "' '" + datasetName + "'"
		command += processPretrainModelParameter(pretrainModelPath, pretrainModelFileName, commandDownload)
	}

	// unzip code & dataset
	switch processorType {
	case grampus.ProcessorTypeNPU:
		// no need to process
	case grampus.ProcessorTypeGPU:
		unZipDatasetCommand := generateDatasetUnzipCommand(datasetName)
		command += "cd " + workDir + "code;unzip -q master.zip;echo \"start to unzip dataset\";cd " + workDir + "dataset;" + unZipDatasetCommand
	}

	command += "echo \"unzip finished;start to exec code;\";"

	// set export
	var commandExport string
	switch processorType {
	case grampus.ProcessorTypeNPU:
		commandExport = "export bucket=" + setting.Bucket + " && export remote_path=" + outputRemotePath + ";"
	case grampus.ProcessorTypeGPU:
		commandExport = "export env=" + setting.Grampus.Env + " && export remote_path=" + outputRemotePath + ";"
	}
	command += commandExport

	// exec code
	var parameters models.Parameters
	var paramCode string
	if len(paramSrc) != 0 {
		if err := json.Unmarshal([]byte(paramSrc), &parameters); err != nil {
			log.Error("Failed to Unmarshal params: %s (%v)", paramSrc, err)
			return command, err
		}
		for _, parameter := range parameters.Parameter {
			paramCode += " --" + parameter.Label + "=" + parameter.Value
		}
	}

	var commandCode string
	switch processorType {
	case grampus.ProcessorTypeNPU:
		commandCode = "/bin/bash /home/work/run_train_for_openi.sh /home/work/openi.py /tmp/log/train.log" + paramCode + ";"
	case grampus.ProcessorTypeGPU:
		if pretrainModelFileName != "" {
			paramCode += " --ckpt_url" + "=" + workDir + "pretrainmodel/" + pretrainModelFileName
		}
		commandCode = "cd " + workDir + "code/" + strings.ToLower(repoName) + ";python " + bootFile + paramCode + ";"
	}
	command += commandCode

	// get exec result: remember the training exit status before uploading
	command += "result=$?;"

	// upload models
	switch processorType {
	case grampus.ProcessorTypeNPU:
		command += "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_npu " + setting.Bucket + " " + outputRemotePath + " " + workDir + "output/;"
	case grampus.ProcessorTypeGPU:
		command += "cd " + workDir + setting.Grampus.SyncScriptProject + "/;./uploader_for_gpu " + setting.Grampus.Env + " " + outputRemotePath + " " + workDir + "output/;"
	}

	// check exec result
	command += "bash -c \"[[ $result -eq 0 ]] && exit 0 || exit -1\""

	return command, nil
}

// processPretrainModelParameter appends the quoted pre-trained model path and
// file name to commandDownload when pretrainModelPath is set, and always
// terminates the command with ";".
func processPretrainModelParameter(pretrainModelPath string, pretrainModelFileName string, commandDownload string) string {
	result := commandDownload
	if pretrainModelPath != "" {
		result += " '" + pretrainModelPath + "' '" + pretrainModelFileName + "'"
	}
	return result + ";"
}

// generateDatasetUnzipCommand returns the shell commands that unpack the
// datasets named in the ";"-separated list datasetName.
//
// A single dataset is unpacked in place (for ".tar.gz", with the leading path
// component stripped). With several datasets each ".tar.gz" is extracted in
// place and every other archive is unzipped into a directory named after the
// archive without its ".zip" suffix.
func generateDatasetUnzipCommand(datasetName string) string {
	names := strings.Split(datasetName, ";")

	if len(names) == 1 { // single dataset
		if strings.HasSuffix(names[0], ".tar.gz") {
			return "tar --strip-components=1 -zxvf '" + datasetName + "';"
		}
		return "unzip -q '" + datasetName + "';"
	}

	// multiple datasets
	var unZipDatasetCommand string
	for _, name := range names {
		if strings.HasSuffix(name, ".tar.gz") {
			unZipDatasetCommand += "tar -zxvf '" + name + "';"
		} else {
			unZipDatasetCommand += "unzip -q '" + name + "' -d './" + strings.TrimSuffix(name, ".zip") + "';"
		}
	}
	return unZipDatasetCommand
}

// getPoolId returns the ID of the first resource pool decoded from
// setting.ResourcePools, or "" when no pool is configured.
func getPoolId() string {
	var resourcePools modelarts.ResourcePool
	if err := json.Unmarshal([]byte(setting.ResourcePools), &resourcePools); err != nil {
		log.Error("Unmarshal setting.ResourcePools failed: %v", err)
	}
	// Guard the index: an empty or undecodable setting must not panic.
	if len(resourcePools.Info) == 0 {
		log.Error("no resource pool is configured")
		return ""
	}
	return resourcePools.Info[0].ID
}

// PrepareSpec4Show loads the specification stored for task and attaches it to
// task.Spec. On a lookup error task is left unchanged.
func PrepareSpec4Show(task *models.Cloudbrain) {
	s, err := resource.GetCloudbrainSpec(task.ID)
	if err != nil {
		log.Info("error:%v", err)
		return
	}
	task.Spec = s
}

// IsTaskNotStop reports whether task.Status is one of the not-final statuses
// of the platform the task runs on.
func IsTaskNotStop(task *models.Cloudbrain) bool {
	statuses := CloudbrainOneNotFinalStatuses
	switch task.Type {
	case models.TypeCloudBrainOne:
		// keep the CloudBrainOne list; it used to be overwritten by the
		// fallback below
	case models.TypeCloudBrainTwo, models.TypeCDCenter:
		statuses = CloudbrainTwoNotFinalStatuses
	default:
		statuses = GrampusNotFinalStatuses
	}

	for _, status := range statuses {
		if task.Status == status {
			return true
		}
	}
	return false
}

// SyncTaskStatus refreshes task from the platform it runs on and persists the
// change, notifying listeners when the status changed:
//   - CloudBrainOne: status, container ID and duration from cloudbrain.GetJob;
//   - CloudBrainTwo / CDCenter: delegated to modelarts.HandleTrainJobInfo;
//   - C2Net: AI center, status, duration and start/end time from grampus.GetJob.
//
// Every failure is reported as the error "repo.cloudbrain_query_fail". Tasks
// of any other type are left untouched.
func SyncTaskStatus(task *models.Cloudbrain) error {
	switch {
	case task.Type == models.TypeCloudBrainOne:
		result, err := cloudbrain.GetJob(task.JobID)
		if err != nil {
			log.Info("error:%v", err)
			return errors.New("repo.cloudbrain_query_fail")
		}
		if result == nil {
			// err is nil here, so it must not be dereferenced for the log.
			log.Info("error: GetJob(%v) returned no result", task.JobID)
			return errors.New("repo.cloudbrain_query_fail")
		}

		jobRes, err := models.ConvertToJobResultPayload(result.Payload)
		if err != nil {
			log.Error("ConvertToJobResultPayload failed:%v", err)
			return errors.New("repo.cloudbrain_query_fail")
		}
		taskRole, ok := jobRes.TaskRoles[cloudbrain.SubTaskName].(map[string]interface{})
		if !ok {
			log.Error("task role %v is missing in the job result of %v", cloudbrain.SubTaskName, task.JobID)
			return errors.New("repo.cloudbrain_query_fail")
		}
		taskRes, err := models.ConvertToTaskPod(taskRole)
		if err != nil {
			log.Error("ConvertToTaskPod failed:%v", err)
			return errors.New("repo.cloudbrain_query_fail")
		}
		if len(taskRes.TaskStatuses) == 0 {
			log.Error("the job result of %v has no task status", task.JobID)
			return errors.New("repo.cloudbrain_query_fail")
		}

		oldStatus := task.Status
		task.Status = taskRes.TaskStatuses[0].State
		task.ContainerID = taskRes.TaskStatuses[0].ContainerID
		models.ParseAndSetDurationFromCloudBrainOne(jobRes, task)

		if task.DeletedAt.IsZero() { // normal record
			if oldStatus != task.Status {
				notification.NotifyChangeCloudbrainStatus(task, oldStatus)
			}
			if err = models.UpdateJob(task); err != nil {
				return errors.New("repo.cloudbrain_query_fail")
			}
		}

	case task.Type == models.TypeCloudBrainTwo || task.Type == models.TypeCDCenter:
		if err := modelarts.HandleTrainJobInfo(task); err != nil {
			return errors.New("repo.cloudbrain_query_fail")
		}

	case task.Type == models.TypeC2Net:
		result, err := grampus.GetJob(task.JobID)
		if err != nil {
			log.Error("GetJob failed:%v", err)
			return errors.New("repo.cloudbrain_query_fail")
		}
		if result == nil {
			return nil
		}

		tasks := result.JobInfo.Tasks
		if len(tasks) > 0 && len(tasks[0].CenterID) == 1 && len(tasks[0].CenterName) == 1 {
			task.AiCenter = tasks[0].CenterID[0] + "+" + tasks[0].CenterName[0]
		}

		oldStatus := task.Status
		task.Status = grampus.TransTrainJobStatus(result.JobInfo.Status)

		// NOTE(review): this compares the translated status with the raw
		// Grampus status, not with oldStatus; kept as is, confirm the intent.
		if task.Status != result.JobInfo.Status || result.JobInfo.Status == models.GrampusStatusRunning {
			task.Duration = result.JobInfo.RunSec
			if task.Duration < 0 {
				task.Duration = 0
			}
			task.TrainJobDuration = models.ConvertDurationToStr(task.Duration)

			if task.StartTime == 0 && result.JobInfo.StartedAt > 0 {
				task.StartTime = timeutil.TimeStamp(result.JobInfo.StartedAt)
			}
			if task.EndTime == 0 && models.IsTrainJobTerminal(task.Status) && task.StartTime > 0 {
				task.EndTime = task.StartTime.Add(task.Duration)
			}
			task.CorrectCreateUnix()

			if oldStatus != task.Status {
				notification.NotifyChangeCloudbrainStatus(task, oldStatus)
			}
			if err = models.UpdateJob(task); err != nil {
				log.Error("UpdateJob failed:%v", err)
				return errors.New("repo.cloudbrain_query_fail")
			}
		}
	}

	return nil
}