// Package octopusHttp provides the OctopusHttp adapter, which talks to an
// Octopus cluster over HTTP to create algorithms, submit training jobs and
// query resource specifications, job details and job logs.
package octopusHttp

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"mime/multipart"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	common2 "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
	omodel "gitlink.org.cn/JointCloud/pcm-octopus/http/model"
	"gitlink.org.cn/JointCloud/pcm-openi/common"
)

// Shared string constants: request parameter names, separators, unit names
// and status labels.
const (
	RESOURCE_POOL              = "grampus-pool"
	Param_Token                = "token"
	Param_Addr                 = "addr"
	Forward_Slash              = "/"
	COMMA                      = ","
	UNDERSCORE                 = "_"
	TASK_NAME_PREFIX           = "trainJob"
	Python                     = "python "
	SemiColon                  = ";"
	BALANCE                    = "balance"
	RATE                       = "rate"
	PERHOUR                    = "per-hour"
	NUMBER                     = "number"
	KILOBYTE                   = "kb"
	GIGABYTE                   = "gb"
	CPUCORE                    = "core"
	STORAGE                    = "STORAGE"
	DISK                       = "disk"
	MEMORY                     = "memory"
	RAM                        = "ram"
	VRAM                       = "vram"
	RMB                        = "rmb"
	POINT                      = "point"
	RUNNINGTASK                = "RUNNING_TASK"
	RUNNING                    = "RUNNING"
	CPU                        = "cpu"
	Gi                         = "Gi"
	AlgorithmRecordOnlyVersion = "V1"
)

// NotImplementError is the message returned by operations this adapter does
// not implement.
const (
	NotImplementError = "not implemented"
)

// API paths, appended to the configured server address.
const (
	MyAlgorithmListUrl = "api/v1/algorithm/myAlgorithmList"
	CreateAlgorithm    = "api/v1/algorithm/create"
	ResourcespecsUrl   = "api/v1/resource/specs"
	CreateTrainJobUrl  = "api/v1/job/create"
	TrainJobDetail     = "api/v1/job/detail"
	TrainJobLog        = "api/v1/job/log"
)

// compute source
var (
	// ComputeSourceToCardType maps a compute resource name to its card type.
	// Names that are absent from the map yield an empty card type.
	ComputeSourceToCardType = map[string]string{
		"nvidia-a100":          "GPU",
		"nvidia-a100-80g":      "GPU",
		"mr-v100":              "ILUVATAR-GPGPU",
		"bi-v100":              "ILUVATAR-GPGPU",
		"MR-V50":               "ILUVATAR-GPGPU",
		"BI-V100":              "ILUVATAR-GPGPU",
		"BI-V150":              "ILUVATAR-GPGPU",
		"MR-V100":              "ILUVATAR-GPGPU",
		"cambricon.com/mlu":    "MLU",
		"hygon.com/dcu":        "DCU",
		"huawei.com/Ascend910": "NPU",
		"enflame.com/gcu":      "GCU",
		"ILUVATAR-GPGPU":       "ILUVATAR-GPGPU",
		"MXN260":               "METAX-GPGPU",
	}
)

// OctopusHttp is a client for one Octopus cluster.
type OctopusHttp struct {
	server        string // base URL the API paths are appended to
	host          string // sent as the "addr" query parameter
	platform      string
	participantId int64
	token         *Token // may be nil when NewToken failed in NewOctopusHttp
}

// NewOctopusHttp builds a client for the cluster identified by id. A failure
// to create the token is logged and does not prevent construction; requests
// made through such a client return an error instead of dereferencing the
// missing token.
func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) *OctopusHttp {
	token, err := NewToken(server, host, user, pwd)
	if err != nil {
		logx.Errorf("Init OctopusHttp, id: %d, host: %s, token error: %s \n", id, host, err)
	}
	return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}
}

// authToken returns the token used to authenticate requests, or an error when
// no token source is available.
func (o *OctopusHttp) authToken() (string, error) {
	if o.token == nil {
		return "", errors.New("octopus token is not initialized")
	}
	return o.token.Get()
}

// convertViaJSON copies src into dst by marshalling src to JSON and
// unmarshalling the resulting bytes into dst.
func convertViaJSON(src interface{}, dst interface{}) error {
	raw, err := json.Marshal(src)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, dst)
}

// responseError decodes the payload of a failed response as omodel.Error and
// returns its message as an error. A decoding failure is returned as is.
func responseError(data interface{}) error {
	errormdl := &omodel.Error{}
	if err := convertViaJSON(data, errormdl); err != nil {
		return err
	}
	return errors.New(errormdl.Message)
}

// splitPair splits a "key,value" entry on commas and returns its first two
// fields. It returns an error when the entry contains no comma.
func splitPair(entry string) (string, string, error) {
	s := strings.Split(entry, COMMA)
	if len(s) < 2 {
		return "", "", fmt.Errorf("invalid key,value pair: %q", entry)
	}
	return s[0], s[1], nil
}

// getWithJSONBody sends payload as the JSON body of a request to url, with
// token and the cluster address as query parameters, and decodes the reply
// into an entity.OctResp.
func (o *OctopusHttp) getWithJSONBody(url string, token string, payload interface{}) (*entity.OctResp, error) {
	b, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	byt := bytes.NewBuffer(b)

	resp := &entity.OctResp{}
	req := common.GetRestyRequest(common.TIMEOUT)
	r, err := http.NewRequest("GET", url, byt)
	if err != nil {
		return nil, err
	}
	// NOTE(review): no method is set on req before Send(); presumably this is
	// what lets the body travel with a GET request. Confirm against the HTTP
	// client before changing the order or shape of these statements.
	req.RawRequest = r
	req.URL = url
	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(Param_Token, token).
		SetQueryParam(Param_Addr, o.host).
		SetBody(byt).
		SetResult(resp).
		Send()
	if err != nil {
		return nil, err
	}
	return resp, nil
}

// executor

// Execute runs a task in the given submit mode.
//
// In executor.SUBMIT_MODE_STORAGE_SCHEDULE it appends "python <AlgorithmId>"
// to option.Cmd, registers the algorithm, replaces option.AlgorithmId with
// the id returned by the server and submits a training job. option is
// modified in place. Every other mode returns nil, nil without doing any
// work.
func (o *OctopusHttp) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
	switch mode {
	case executor.SUBMIT_MODE_JOINT_CLOUD:
		// Nothing is submitted in this mode.
	case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
		// cmd
		if option.AlgorithmId == "" {
			return nil, errors.New("algorithmId is empty")
		}
		if option.Cmd != "" {
			option.Cmd = option.Cmd + SemiColon + Python + option.AlgorithmId
		} else {
			option.Cmd = Python + option.AlgorithmId
		}

		// algorithm
		param := &omodel.CreateMyAlgorithmParam{
			AlgorithmName: option.AlgorithmId,
			ModelName:     option.AlgorithmId,
		}
		algorithm, err := o.createAlgorithm(ctx, param)
		if err != nil {
			return nil, err
		}
		if algorithm.Code != http.StatusOK {
			if algorithm.Data != nil {
				return nil, responseError(algorithm.Data)
			}
			// A failed creation without an error payload must not go on to
			// submit a job for an algorithm that was never registered.
			return nil, errors.New("createAlgorithm failed")
		}
		if algorithm.Data == nil {
			return nil, errors.New("createAlgorithm failed")
		}
		result := &entity.OctCreateAlgorithm{}
		if err := convertViaJSON(algorithm.Data, result); err != nil {
			return nil, err
		}
		if result.AlgorithmId == "" {
			return nil, errors.New("createAlgorithm failed")
		}
		option.AlgorithmId = result.AlgorithmId

		// resource: a fixed resource spec id replaces whatever the caller set.
		option.ResourceId = "964fdee2db544928bfea74dac12a924f"

		// submit
		task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
		if err != nil {
			return nil, err
		}
		return task, nil
	}

	return nil, nil
}

// Stop is a no-op and always returns nil.
func (o *OctopusHttp) Stop(ctx context.Context, id string) error {
	return nil
}

// SubmitTask creates a single-task, non-distributed training job in
// RESOURCE_POOL and returns the decoded server response.
//
// Each element of params and envs must have the form "key,value"; an element
// without a comma makes SubmitTask return an error. datasetsId and aiType are
// accepted but not sent.
func (o *OctopusHttp) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
	// submit the task to octopus
	reqUrl := o.server + CreateTrainJobUrl

	token, err := o.authToken()
	if err != nil {
		return nil, err
	}

	// python parameters; left nil when params is empty
	var prms []struct {
		Key   string `json:"key"`
		Value string `json:"value"`
	}
	for _, param := range params {
		var p struct {
			Key   string `json:"key"`
			Value string `json:"value"`
		}
		key, value, err := splitPair(param)
		if err != nil {
			return nil, err
		}
		p.Key = key
		p.Value = value
		prms = append(prms, p)
	}

	// environment variables
	envMap := make(map[string]string)
	for _, env := range envs {
		key, value, err := splitPair(env)
		if err != nil {
			return nil, err
		}
		envMap[key] = value
	}

	param := &omodel.CreateTrainJobParam{
		// The dataset id and version are not part of the request.
		AlgorithmId:      algorithmId,
		AlgorithmVersion: AlgorithmRecordOnlyVersion,
		Name:             TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
		ImageId:          imageId,
		IsDistributed:    false,
		ResourcePool:     RESOURCE_POOL,
		Config: []*omodel.CreateTrainJobConf{
			{
				Command:               cmd,
				ResourceSpecId:        resourceId,
				MinFailedTaskCount:    1,
				MinSucceededTaskCount: 1,
				TaskNumber:            1,
				Parameters:            prms,
				Envs:                  envMap,
			},
		},
	}

	resp := &entity.OctResp{}
	req := common.GetRestyRequest(common.TIMEOUT)
	_, err = req.
		SetHeader("Authorization", "Bearer "+token).
		SetQueryParam(Param_Token, token).
		SetQueryParam(Param_Addr, o.host).
		SetBody(param).
		SetResult(resp).
		Post(reqUrl)
	if err != nil {
		return nil, err
	}

	return resp, nil
}

// createAlgorithm posts param to the algorithm creation endpoint and returns
// the decoded response.
func (o *OctopusHttp) createAlgorithm(ctx context.Context, param *omodel.CreateMyAlgorithmParam) (*entity.OctResp, error) {
	createAlgorithmUrl := o.server + CreateAlgorithm

	token, err := o.authToken()
	if err != nil {
		return nil, err
	}

	resp := &entity.OctResp{}
	req := common.GetRestyRequest(common.TIMEOUT)
	_, err = req.
		SetHeader("Authorization", "Bearer "+token).
		SetBody(param).
		SetResult(resp).
		Post(createAlgorithmUrl)
	if err != nil {
		return nil, err
	}

	return resp, nil
}

// collector

// resourceSpecs requests the resource specifications of RESOURCE_POOL.
func (o *OctopusHttp) resourceSpecs(ctx context.Context) (*entity.OctResp, error) {
	resourcespecsUrl := o.server + ResourcespecsUrl

	token, err := o.authToken()
	if err != nil {
		return nil, err
	}

	param := omodel.ResourceSpecParam{
		ResourcePool: RESOURCE_POOL,
	}
	return o.getWithJSONBody(resourcespecsUrl, token, param)
}

// GetResourceStats fetches the resource specifications and checks that the
// reply decodes. The decoded specifications are not converted into
// collector.ResourceStats: the result is nil, nil whenever the request
// succeeds, and also when a failed response carries no error payload.
func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
	resp, err := o.resourceSpecs(ctx)
	if err != nil {
		return nil, err
	}

	if resp.Code != http.StatusOK {
		if resp.Data != nil {
			return nil, responseError(resp.Data)
		}
		return nil, nil
	}

	if resp.Data != nil {
		spec := &entity.OctResourceSpecs{}
		if err := convertViaJSON(resp.Data, spec); err != nil {
			return nil, err
		}
	}
	return nil, nil
}

// GetDatasetsSpecs is not implemented and returns nil, nil.
func (o *OctopusHttp) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
	return nil, nil
}

// GetAlgorithms is not implemented.
func (o *OctopusHttp) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
	return nil, errors.New(NotImplementError)
}

// GetTrainingTaskLog returns the log payload of job taskId encoded as JSON.
// It returns "waiting for logs..." when the payload is missing, is an empty
// string, or contains "404 Not Found". instanceNum is not used.
func (o *OctopusHttp) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
	taskDetailsUrl := o.server + TrainJobLog

	token, err := o.authToken()
	if err != nil {
		return "", err
	}

	param := omodel.TrainJobLog{
		JobId: taskId,
	}
	resp, err := o.getWithJSONBody(taskDetailsUrl, token, param)
	if err != nil {
		return "", fmt.Errorf("failed to invoke taskDetails: %w", err)
	}
	if resp.Code != http.StatusOK {
		return "", errors.New("failed to invoke taskDetails")
	}

	marshal, err := json.Marshal(resp.Data)
	if err != nil {
		return "", err
	}
	log := string(marshal)

	// json.Marshal never produces an empty string: a missing payload encodes
	// as null and an empty string payload as "".
	if log == "" || log == "null" || log == `""` || strings.Contains(log, "404 Not Found") {
		log = "waiting for logs..."
	}

	return log, nil
}

// GetTrainingTask fetches job taskId and converts it into a collector.Task.
// Start and End are formatted with constants.Layout when the corresponding
// timestamp is non-zero; a job status outside the known set is reported as
// "undefined".
func (o *OctopusHttp) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	if taskId == "" {
		return nil, errors.New("empty taskId")
	}

	resp, err := o.getTrainingTask(ctx, taskId)
	if err != nil {
		return nil, err
	}

	if resp.Code != http.StatusOK {
		if resp.Data != nil {
			return nil, responseError(resp.Data)
		}
		return nil, errors.New("failed to get trainjob")
	}
	if resp.Data == nil {
		return nil, errors.New("failed to get trainjob")
	}

	job := &entity.OctTrainJob{}
	if err := convertViaJSON(resp.Data, job); err != nil {
		return nil, err
	}

	var task collector.Task
	task.Id = job.TrainJob.Id
	if job.TrainJob.StartedAt != 0 {
		task.Start = time.Unix(int64(job.TrainJob.StartedAt), 0).Format(constants.Layout)
	}
	if job.TrainJob.CompletedAt != 0 {
		task.End = time.Unix(int64(job.TrainJob.CompletedAt), 0).Format(constants.Layout)
	}
	switch job.TrainJob.Status {
	case "succeeded":
		task.Status = constants.Completed
	case "failed":
		task.Status = constants.Failed
	case "running":
		task.Status = constants.Running
	case "stopped":
		task.Status = constants.Stopped
	case "pending":
		task.Status = constants.Pending
	default:
		task.Status = "undefined"
	}

	return &task, nil
}

// getTrainingTask requests the details of job taskId.
func (o *OctopusHttp) getTrainingTask(ctx context.Context, taskId string) (*entity.OctResp, error) {
	taskDetailsUrl := o.server + TrainJobDetail

	token, err := o.authToken()
	if err != nil {
		return nil, err
	}

	param := omodel.TrainJobDetailParam{
		JobId: taskId,
	}
	resp, err := o.getWithJSONBody(taskDetailsUrl, token, param)
	if err != nil {
		return nil, fmt.Errorf("failed to invoke taskDetails: %w", err)
	}

	return resp, nil
}

// DownloadAlgorithmCode is not implemented.
func (o *OctopusHttp) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
	return "", errors.New(NotImplementError)
}

// UploadAlgorithmCode is a no-op and always returns nil.
func (o *OctopusHttp) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
	return nil
}

// GetComputeCards is not implemented.
func (o *OctopusHttp) GetComputeCards(ctx context.Context) ([]string, error) {
	return nil, errors.New(NotImplementError)
}

// GetUserBalance is not implemented.
func (o *OctopusHttp) GetUserBalance(ctx context.Context) (float64, error) {
	return 0, errors.New(NotImplementError)
}

// GetResourceSpecs returns the resource specifications of this cluster,
// tagged with resrcType. Resources is left unset when the response carries no
// payload, including a failed response without an error payload.
func (o *OctopusHttp) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
	resp, err := o.resourceSpecs(ctx)
	if err != nil {
		return nil, err
	}

	res := &collector.ResourceSpec{
		ClusterId: strconv.FormatInt(o.participantId, 10),
		Tag:       resrcType,
	}

	if resp.Code != http.StatusOK {
		if resp.Data != nil {
			return nil, responseError(resp.Data)
		}
		return res, nil
	}
	if resp.Data == nil {
		return res, nil
	}

	specs := &entity.OctResourceSpecs{}
	if err := convertViaJSON(resp.Data, specs); err != nil {
		return nil, err
	}
	clusterResources, err := genSpecs(specs, resrcType)
	if err != nil {
		return nil, err
	}
	res.Resources = clusterResources

	return res, nil
}

// genSpecs converts the "Train" resource specs into cluster resources. Any
// other resrcType, including "Inference", yields an empty list. Specs for
// which chooseResourceType finds no accelerator are skipped.
func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{}, error) {
	res := make([]interface{}, 0)
	if resrcType != "Train" {
		return res, nil
	}

	for _, s := range specs.MapResourceSpecIdList.Train.ResourceSpecs {
		spec := &omodel.Spec{}
		if err := convertViaJSON(s, spec); err != nil {
			return nil, err
		}
		resType, err := chooseResourceType(spec)
		if err != nil {
			return nil, err
		}
		if resType == nil {
			continue
		}
		res = append(res, resType)
	}

	return res, nil
}

// chooseResourceType builds the cluster resource for the first accelerator
// field of spec.ResourceQuantity that is set, checking the fields in the
// order listed below. It returns nil, nil when none of them is set.
func chooseResourceType(spec *omodel.Spec) (*collector.ClusterResource, error) {
	q := spec.ResourceQuantity
	// field is the ResourceQuantity field name whose JSON tag names the card;
	// count is that field's value.
	candidates := []struct {
		field string
		count string
	}{
		{"NvidiaA100", q.NvidiaA100},
		{"NvidiaA10080G", q.NvidiaA10080G},
		{"MrV100", q.MrV100},
		{"BiV100", q.BiV100},
		{"MRV50", q.MRV50},
		{"BIV100", q.BIV100},
		{"BIV150", q.BIV150},
		{"MRV100", q.MRV100},
		{"CambriconComMlu", q.CambriconComMlu},
		{"HygonComDcu", q.HygonComDcu},
		{"HuaweiComAscend910", q.HuaweiComAscend910},
		{"EnflameComGcu", q.EnflameComGcu},
		{"MXN260", q.MXN260},
		{"NvidiaV100", q.NvidiaV100},
		{"MetaxTechComGpu", q.MetaxTechComGpu},
	}

	for _, c := range candidates {
		if c.count == "" {
			continue
		}
		tag, err := common2.GetJSONTag(spec.ResourceQuantity, c.field)
		if err != nil {
			return nil, err
		}
		return genClusterResources(tag, c.count, spec)
	}

	return nil, nil
}

// genClusterResources describes one resource spec: cType/cNum become the
// accelerator entry, and the CPU and memory quantities of s become the base
// resources. Total and Available carry the same value.
//
// Counts that do not parse as integers are reported as 0. A non-empty memory
// quantity must contain the "Gi" unit exactly once, otherwise an error is
// returned.
func genClusterResources(cType string, cNum string, s *omodel.Spec) (*collector.ClusterResource, error) {
	// ParseInt can return a non-zero value together with an error, so the
	// value is reset explicitly.
	cardNum, err := strconv.ParseInt(cNum, 10, 64)
	if err != nil {
		cardNum = 0
	}

	cpuCore, err := strconv.ParseInt(s.ResourceQuantity.Cpu, 10, 64)
	if err != nil {
		cpuCore = 0
	}

	var memGi int64
	if s.ResourceQuantity.Memory != "" {
		gi := strings.Split(s.ResourceQuantity.Memory, Gi)
		if len(gi) != 2 {
			return nil, fmt.Errorf("s.ResourceQuantity.Memory convert error: %s", s.ResourceQuantity.Memory)
		}
		if mGi, err := strconv.ParseInt(gi[0], 10, 64); err == nil {
			memGi = mGi
		}
	}

	card := &collector.Usage{
		Type:      ComputeSourceToCardType[cType],
		Name:      strings.ToUpper(cType),
		Total:     &collector.UnitValue{Unit: NUMBER, Value: cardNum},
		Available: &collector.UnitValue{Unit: NUMBER, Value: cardNum},
	}
	cpu := &collector.Usage{
		Type:      strings.ToUpper(CPU),
		Name:      strings.ToUpper(CPU),
		Total:     &collector.UnitValue{Unit: CPUCORE, Value: cpuCore},
		Available: &collector.UnitValue{Unit: CPUCORE, Value: cpuCore},
	}
	mem := &collector.Usage{
		Type:      strings.ToUpper(MEMORY),
		Name:      strings.ToUpper(RAM),
		Total:     &collector.UnitValue{Unit: GIGABYTE, Value: memGi},
		Available: &collector.UnitValue{Unit: GIGABYTE, Value: memGi},
	}

	bres := make([]*collector.Usage, 0)
	bres = append(bres, cpu)
	bres = append(bres, mem)

	cres := &collector.ClusterResource{}
	cres.Resource = card
	cres.BaseResources = bres

	return cres, nil
}

// inference

// GetClusterInferUrl is not implemented.
func (o *OctopusHttp) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
	return nil, errors.New(NotImplementError)
}

// GetInferDeployInstanceList is not implemented.
func (o *OctopusHttp) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
	return nil, errors.New(NotImplementError)
}

// StartInferDeployInstance is not implemented and always returns false.
func (o *OctopusHttp) StartInferDeployInstance(ctx context.Context, id string) bool {
	return false
}

// StopInferDeployInstance is not implemented and always returns false.
func (o *OctopusHttp) StopInferDeployInstance(ctx context.Context, id string) bool {
	return false
}

// GetInferDeployInstance is not implemented.
func (o *OctopusHttp) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
	return nil, errors.New(NotImplementError)
}

// CreateInferDeployInstance is not implemented.
func (o *OctopusHttp) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
	return "", errors.New(NotImplementError)
}

// CheckModelExistence is not implemented and always returns false.
func (o *OctopusHttp) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
	return false
}

// GetImageInferResult is not implemented.
func (o *OctopusHttp) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
	return "", errors.New(NotImplementError)
}