package storeLink

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"mime/multipart"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/json-iterator/go"
	"github.com/rs/zerolog/log"
	openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
	"gitlink.org.cn/JointCloud/pcm-openi/common"
	"gitlink.org.cn/JointCloud/pcm-openi/model"
)

// Job types and fixed identifiers sent to the OpenI API.
const (
	DEBUG           = "DEBUG"
	TRAIN           = "TRAIN"
	INFERENCE       = "INFERENCE"
	C2NET           = "C2Net"
	TESTREPO        = "testrepo"
	ONLINEINFERENCE = "ONLINEINFERENCE" // online inference
)

// OpenI REST endpoints; each is appended to the configured host.
const (
	CreationRequirelUrl = "/api/v1/task/creationRequired"
	TaskCreatelUrl      = "/api/v1/task/create"
	ReposUrl            = "/api/v1/user/repos"
	TaskListUrl         = "/api/v1/task/list"
	TaskDetailsUrl      = "/api/v1/task/detail"
	TaskLogUrl          = "/api/v1/task/log"
	TaskStopUrl         = "/api/v1/task/stop"
	TaskOnlineInferUrl  = "/api/v1/task/onlineInferUrl"
)

// ComputeSource lists the compute source types that are matched against the
// "type" entry of a requested resource and queried by GetResourceSpecs.
var (
	ComputeSource = []string{"GPU", "NPU", "GCU", "MLU", "DCU", "CPU", "ILUVATAR-GPGPU", "METAX-GPGPU"}
)

// ResourceSpecOpenI is the accelerator requirement extracted from a task
// option: the compute source type, the card name and the number of cards.
type ResourceSpecOpenI struct {
	ResType string
	Name    string
	Number  int64
}

// OpenI is a client for one OpenI platform account.
type OpenI struct {
	participantId int64
	platform      string
	host          string
	userName      string
	accessToken   string
}

// NewOpenI builds an OpenI client for the given host and credentials.
func NewOpenI(host string, id int64, name string, token string, platform string) *OpenI {
	return &OpenI{
		host:          host,
		participantId: id,
		userName:      name,
		accessToken:   token,
		platform:      platform,
	}
}

// matchComputeSource returns the entry of ComputeSource equal to typeName and
// whether such an entry exists.
func matchComputeSource(typeName interface{}) (string, bool) {
	for _, s := range ComputeSource {
		if typeName == s {
			return s, true
		}
	}
	return "", false
}

// splitResourceId splits a "<specId>/<computeSource>" resource id into its two
// parts. It returns an error instead of panicking when the separator is missing.
func splitResourceId(resourceId string) (int, string, error) {
	specs := strings.Split(resourceId, FORWARD_SLASH)
	if len(specs) < 2 {
		return 0, "", fmt.Errorf("resourceId %s format is incorrect", resourceId)
	}
	specId, err := strconv.ParseInt(specs[0], 10, 0)
	if err != nil {
		return 0, "", err
	}
	return int(specId), specs[1], nil
}

// sendGetWithBody marshals param to JSON, sends it as the body of a request to
// endpoint and decodes the response into result (which must be a pointer).
//
// The request is deliberately issued through Send() without an explicit
// method, exactly as every call site did before this helper existed, so the
// way the body is transmitted is unchanged.
func (o *OpenI) sendGetWithBody(endpoint string, param interface{}, result interface{}) error {
	b, err := json.Marshal(param)
	if err != nil {
		return err
	}
	byt := bytes.NewBuffer(b)

	r, err := http.NewRequest("GET", endpoint, byt)
	if err != nil {
		return err
	}

	req := common.GetRestyRequest(common.TIMEOUT)
	req.RawRequest = r
	req.URL = endpoint
	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(common.ACCESSTOKEN, o.accessToken).
		SetBody(byt).
		SetResult(result).
		Send()
	return err
}

// lookupResourceId queries the creationRequired endpoint for repoName/jobType
// and returns "<specId>/<computeSource>" for the first spec whose compute
// resource, card type and card count all match spec. It returns an empty
// string and a nil error when the request succeeded but nothing matched.
func (o *OpenI) lookupResourceId(repoName string, jobType string, spec *ResourceSpecOpenI) (string, error) {
	param := model.TaskCreationRequiredParam{
		UserName:      o.userName,
		RepoName:      repoName,
		JobType:       jobType,
		ComputeSource: spec.ResType,
		ClusterType:   C2NET,
	}

	resp := struct {
		Code int                        `json:"code"`
		Msg  string                     `json:"msg"`
		Data model.TaskCreationRequired `json:"data"`
	}{}

	if err := o.sendGetWithBody(o.host+CreationRequirelUrl, param, &resp); err != nil {
		return "", fmt.Errorf("failed to invoke TaskCreationRequired; %w", err)
	}

	if len(resp.Data.Data.Specs.All) == 0 {
		return "", errors.New("TaskCreationRequired specs are empty")
	}

	for _, s := range resp.Data.Data.Specs.All {
		if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType && int(spec.Number) == s.AccCardsNum {
			return strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType, nil
		}
	}
	return "", nil
}

// Execute submits a training task described by option.
//
// In executor.SUBMIT_MODE_STORAGE_SCHEDULE mode the accelerator requirement is
// read from option.ResourcesRequired, resolved to an OpenI spec id, and written
// back to option.ResourceId and option.ComputeCard before submission. In every
// other mode option is submitted as provided.
func (o *OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
	switch mode {
	case executor.SUBMIT_MODE_JOINT_CLOUD:
		// Nothing to resolve; Go cases do not fall through.
	case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
		codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
		if len(codePaths) != 3 {
			return nil, fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
		}
		repoName := codePaths[0]

		// The last resource entry carrying a known type, a name and a
		// number wins.
		spec := &ResourceSpecOpenI{}
		for _, res := range option.ResourcesRequired {
			typeName, ok := res["type"]
			if !ok {
				continue
			}
			name, ok := res["name"]
			if !ok {
				continue
			}
			source, ok := matchComputeSource(typeName)
			if !ok {
				continue
			}
			num, ok := res["number"]
			if !ok {
				continue
			}
			val, err := strconv.ParseInt(openIcom.ConvertTypeToString(num), 10, 64)
			if err != nil {
				return nil, err
			}
			spec.ResType = source
			spec.Name = openIcom.ConvertTypeToString(name)
			spec.Number = val
		}

		if spec.ResType == "" || spec.Name == "" {
			return nil, errors.New("resource spec not found")
		}

		resourceId, err := o.lookupResourceId(repoName, TRAIN, spec)
		if err != nil {
			return nil, err
		}
		if resourceId != "" {
			option.ResourceId = resourceId
		}
		if option.ResourceId == "" {
			return nil, errors.New("can not find spec Id")
		}

		option.ComputeCard = spec.Name
	}

	return o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
}

// SubmitTask creates a TRAIN task on OpenI.
//
// algorithmId must have the form "<repo>/<branch>/<bootFile>" and resourceId
// the form "<specId>/<computeSource>". Each entry of params must have the form
// "<label>,<value>". Malformed inputs yield an error. On success the decoded
// model.CreateTask response is returned.
func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
	taskCreatelUrl := o.host + TaskCreatelUrl

	codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
	if len(codePaths) != 3 {
		return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
	}
	repoName, branchName, bootFile := codePaths[0], codePaths[1], codePaths[2]

	specId, computeSource, err := splitResourceId(resourceId)
	if err != nil {
		return nil, err
	}

	// params
	type taskParameter struct {
		Label string `json:"label"`
		Value string `json:"value"`
	}
	var parameters struct {
		Parameter []taskParameter `json:"parameter"`
	}
	for _, param := range params {
		s := strings.Split(param, COMMA)
		if len(s) < 2 {
			return nil, fmt.Errorf("param %s format is incorrect", param)
		}
		parameters.Parameter = append(parameters.Parameter, taskParameter{Label: s[0], Value: s[1]})
	}
	paramStr, err := json.Marshal(parameters)
	if err != nil {
		return nil, err
	}

	// choose imageId and imageUrl
	imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
	if err != nil {
		return nil, err
	}

	taskParam := &model.CreateTaskParam{
		Description:      algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
		JobType:          TRAIN,
		Cluster:          C2NET,
		DisplayJobName:   strings.ToLower(TRAIN + UNDERSCORE + utils.RandomString(10)),
		ComputeSource:    computeSource,
		SpecId:           specId,
		BranchName:       branchName,
		ImageId:          imgId,
		ImageUrl:         imgUrl,
		DatasetUuidStr:   datasetsId,
		Params:           string(paramStr),
		BootFile:         bootFile,
		HasInternet:      2, // 0: unrestricted; 1: internet not needed; 2: internet needed
		WorkServerNumber: 1, // number of worker nodes
	}

	param := model.CreateTaskReq{
		UserName:        o.userName,
		RepoName:        repoName,
		CreateTaskParam: taskParam,
	}

	resp := struct {
		Code int              `json:"code"`
		Msg  string           `json:"msg"`
		Data model.CreateTask `json:"data"`
	}{}

	req := common.GetRestyRequest(common.TIMEOUT)
	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(common.ACCESSTOKEN, o.accessToken).
		SetBody(&param).
		SetResult(&resp).
		Post(taskCreatelUrl)
	if err != nil {
		return nil, err
	}

	if resp.Code != http.StatusOK {
		return nil, errors.New(resp.Msg)
	}

	if resp.Data.Code != 0 {
		// The outer envelope reported success, so the useful diagnosis is
		// in the inner result rather than in resp.Msg.
		return nil, fmt.Errorf("failed to submit task, code: %v, msg: [%v]", resp.Data.Code, resp.Data.Msg)
	}

	if (resp.Data == model.CreateTask{}) {
		return nil, errors.New("failed to submit task, empty response")
	}

	return resp.Data, nil
}

// swapImageIdAndImageUrl decides whether imageId is an image id or an image
// URL. When utils.IsValidHostAddress accepts the host part of
// "http://"+imageId, the value is returned as the URL (second result);
// otherwise it is returned as the id (first result). An empty imageId is an
// error.
func swapImageIdAndImageUrl(imageId string) (string, string, error) {
	if imageId == "" {
		return "", "", errors.New("imageId is empty")
	}

	parsedURL, err := url.Parse("http://" + imageId)
	if err != nil {
		return "", "", err
	}

	if utils.IsValidHostAddress(parsedURL.Host) {
		return "", imageId, nil
	}
	return imageId, "", nil
}

// Stop stops the task with the given id. The repository name required by the
// stop endpoint is taken from the task description, which SubmitTask and
// SubmitInferTask set to the algorithm id.
func (o *OpenI) Stop(ctx context.Context, id string) error {
	task, err := o.getTrainingTask(ctx, id)
	if err != nil {
		return err
	}

	codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
	if len(codePaths) != 3 {
		return errors.New("failed to stop, openI desc not set")
	}
	repoName := codePaths[0]

	taskStopUrl := o.host + TaskStopUrl
	param := model.StopTaskParam{
		UserName: o.userName,
		RepoName: repoName,
		Id:       id,
	}

	resp := struct {
		Code int            `json:"code"`
		Msg  string         `json:"msg"`
		Data model.StopTask `json:"data"`
	}{}

	req := common.GetRestyRequest(common.TIMEOUT)
	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(common.ACCESSTOKEN, o.accessToken).
		SetBody(&param).
		SetResult(&resp).
		Post(taskStopUrl)
	if err != nil {
		return err
	}

	if resp.Code != http.StatusOK {
		return errors.New("failed to stop")
	}

	return nil
}

// GetClusterInferUrl is not implemented for OpenI and always returns an error.
func (o *OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
	return nil, errors.New("failed to implement")
}

// GetInferDeployInstanceList is not implemented for OpenI and always returns
// an error.
func (o *OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
	return nil, errors.New("failed to implement")
}

// StartInferDeployInstance is not implemented for OpenI and always reports
// false.
func (o *OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
	return false
}

// StopInferDeployInstance stops the task with the given id and reports whether
// Stop succeeded.
func (o *OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
	return o.Stop(ctx, id) == nil
}

// GetInferDeployInstance returns the deploy-instance view of the task with the
// given id. The online inference URL is fetched only while the task status is
// "RUNNING"; otherwise InferUrl is left empty.
func (o *OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
	task, err := o.getTrainingTask(ctx, id)
	if err != nil {
		return nil, err
	}

	// Parse repoName out of the description.
	description := task.Data.Task.Description
	codePaths := strings.SplitN(description, FORWARD_SLASH, 3)
	if len(codePaths) != 3 {
		return nil, fmt.Errorf("algorithmId %s format is incorrect", description)
	}
	repoName := codePaths[0]

	var resp inference.DeployInstance
	resp.InstanceId = id
	resp.InstanceName = task.Data.Task.DisplayJobName
	resp.ModelName = task.Data.Task.PretrainModelName
	resp.ModelType = ""
	resp.InferCard = task.Data.Task.Spec.ComputeResource + "_" + task.Data.Task.Spec.AccCardType
	resp.ClusterName = o.platform
	resp.ClusterType = TYPE_OPENI

	// Fetch the online inference URL.
	var inferUrl string
	if task.Data.Task.Status == "RUNNING" {
		inferUrl, err = o.getOnlineInferUrl(ctx, id, repoName)
		if err != nil {
			return nil, err
		}
	}
	resp.InferUrl = inferUrl

	resp.Status = task.Data.Task.Status
	resp.CreatedTime = time.Unix(int64(task.Data.Task.CreatedUnix), 0).Format(constants.Layout)
	log.Debug().Msgf("func GetInferDeployInstance, resp: %v", resp)

	return &resp, nil
}

// CreateInferDeployInstance resolves the accelerator requirement in
// option.ResourcesRequired to an OpenI spec id, writes it to option.ResourceId
// and option.ComputeCard, submits an ONLINEINFERENCE task and returns the id
// of the created task as a string.
func (o *OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
	codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
	if len(codePaths) != 3 {
		return "", fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
	}
	repoName := codePaths[0]

	// The last resource entry carrying a known type, a name and a number
	// wins.
	spec := &ResourceSpecOpenI{}
	for _, res := range option.ResourcesRequired {
		typeName, ok := res["type"]
		if !ok {
			continue
		}
		name, ok := res["name"]
		if !ok {
			continue
		}
		source, ok := matchComputeSource(typeName)
		if !ok {
			continue
		}
		num, ok := res["number"]
		if !ok {
			continue
		}
		val, err := strconv.ParseInt(openIcom.ConvertTypeToString(num), 10, 64)
		if err != nil {
			return "", err
		}
		spec.ResType = source
		spec.Name = openIcom.ConvertTypeToString(name)
		spec.Number = val
	}

	if spec.ResType == "" || spec.Name == "" {
		return "", errors.New("resource spec not found")
	}

	resourceId, err := o.lookupResourceId(repoName, ONLINEINFERENCE, spec)
	if err != nil {
		return "", err
	}
	if resourceId != "" {
		option.ResourceId = resourceId
	}
	if option.ResourceId == "" {
		return "", errors.New("can not find spec Id")
	}

	option.ComputeCard = spec.Name

	task, err := o.SubmitInferTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AlgorithmId, option.ModelID)
	if err != nil {
		return "", err
	}

	// SubmitInferTask returns an interface{}; round-trip through JSON to
	// read data.id without depending on the concrete type.
	ma, err := jsoniter.Marshal(task)
	if err != nil {
		return "", err
	}
	taskId := jsoniter.Get(ma, "data").Get("id").ToString()
	return taskId, nil
}

// SubmitInferTask creates an ONLINEINFERENCE task on OpenI.
//
// algorithmId must have the form "<repo>/<branch>/<bootFile>" and resourceId
// the form "<specId>/<computeSource>"; malformed inputs yield an error. On
// success the decoded model.CreateTask response is returned.
func (o *OpenI) SubmitInferTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, algorithmId string, modelId string) (interface{}, error) {
	taskCreatelUrl := o.host + TaskCreatelUrl

	// Parse repoName, branch and boot file out of the algorithm id.
	codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
	if len(codePaths) != 3 {
		return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
	}
	repoName, branchName, bootFile := codePaths[0], codePaths[1], codePaths[2]

	specId, computeSource, err := splitResourceId(resourceId)
	if err != nil {
		return nil, err
	}

	log.Printf("repoName: %s, branchName: %s, bootFile: %s", repoName, branchName, bootFile)

	// choose imageId and imageUrl
	imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
	if err != nil {
		return nil, err
	}

	taskParam := &model.CreateTaskParam{
		Description:        algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
		JobType:            ONLINEINFERENCE,
		Cluster:            C2NET,
		DisplayJobName:     ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10),
		ComputeSource:      computeSource,
		SpecId:             specId,
		BranchName:         branchName,
		ImageId:            imgId,
		ImageUrl:           imgUrl,
		PretrainModelIdStr: modelId,
		BootFile:           bootFile,
		HasInternet:        2, // 0: unrestricted; 1: internet not needed; 2: internet needed
		WorkServerNumber:   1, // number of worker nodes
	}

	param := model.CreateTaskReq{
		UserName:        o.userName,
		RepoName:        repoName,
		CreateTaskParam: taskParam,
	}

	resp := struct {
		Code int              `json:"code"`
		Msg  string           `json:"msg"`
		Data model.CreateTask `json:"data"`
	}{}

	req := common.GetRestyRequest(common.TIMEOUT)
	_, err = req.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(common.ACCESSTOKEN, o.accessToken).
		SetBody(&param).
		SetResult(&resp).
		Post(taskCreatelUrl)
	if err != nil {
		return nil, err
	}

	if resp.Code != http.StatusOK {
		return nil, errors.New(resp.Msg)
	}

	if resp.Data.Data.Id == 0 {
		return nil, fmt.Errorf("failed to submit task, msg: [%s]", resp.Data.Msg)
	}

	return resp.Data, nil
}

// CheckModelExistence is not implemented for OpenI and always reports false.
func (o *OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
	return false
}

// GetImageInferResult is not implemented for OpenI and always returns an error.
func (o *OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
	return "", errors.New("failed to implement")
}

// GetResourceStats is not implemented for OpenI and always returns an error.
func (o *OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
	return nil, errors.New("failed to implement")
}

// GetDatasetsSpecs is not implemented for OpenI and always returns an error.
func (o *OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
	return nil, errors.New("failed to implement")
}

// GetAlgorithms is not implemented for OpenI and always returns an error.
func (o *OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
	return nil, errors.New("failed to implement")
}

// GetTrainingTaskLog returns the log of the task with the given id, or the
// placeholder "waiting for logs" when the platform returns an empty log.
// instanceNum is not used.
func (o *OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
	task, err := o.getTrainingTask(ctx, taskId)
	if err != nil {
		return "", err
	}

	codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
	if len(codePaths) != 3 {
		return "", errors.New("failed to get log, openI desc not set")
	}
	repoName := codePaths[0]

	param := model.GetLogParam{
		UserName: o.userName,
		RepoName: repoName,
		Id:       taskId,
	}

	resp := struct {
		Code int    `json:"code"`
		Msg  string `json:"msg"`
		Data string `json:"data"`
	}{}

	if err := o.sendGetWithBody(o.host+TaskLogUrl, param, &resp); err != nil {
		return "", err
	}

	if resp.Data == "" {
		return "waiting for logs", nil
	}

	return resp.Data, nil
}

// getTrainingTask fetches the task detail for taskId. The request is always
// issued with TESTREPO as repository name.
func (o *OpenI) getTrainingTask(ctx context.Context, taskId string) (*model.TaskDetail, error) {
	param := model.TaskDetailParam{
		UserName: o.userName,
		RepoName: TESTREPO,
		Id:       taskId,
	}

	resp := struct {
		Code int              `json:"code"`
		Msg  string           `json:"msg"`
		Data model.TaskDetail `json:"data"`
	}{}

	if err := o.sendGetWithBody(o.host+TaskDetailsUrl, param, &resp); err != nil {
		return nil, fmt.Errorf("failed to invoke taskDetails; %w", err)
	}

	if resp.Data.Code != 0 && resp.Data.Msg != "" {
		return nil, errors.New(resp.Data.Msg)
	}

	return &resp.Data, nil
}

// GetTrainingTask returns the id, start/end time and normalized status of the
// task with the given id. Start and End are left empty while the
// corresponding timestamp is zero; an unknown platform status maps to
// "undefined".
func (o *OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
	task, err := o.getTrainingTask(ctx, taskId)
	if err != nil {
		return nil, err
	}

	var resp collector.Task
	resp.Id = strconv.Itoa(task.Data.Task.Id)
	if task.Data.Task.StartTime != 0 {
		resp.Start = time.Unix(int64(task.Data.Task.StartTime), 0).Format(constants.Layout)
	}
	if task.Data.Task.EndTime != 0 {
		resp.End = time.Unix(int64(task.Data.Task.EndTime), 0).Format(constants.Layout)
	}

	switch task.Data.Task.Status {
	case "SUCCEEDED":
		resp.Status = constants.Completed
	case "FAILED", "CREATED_FAILED":
		resp.Status = constants.Failed
	case "RUNNING":
		resp.Status = constants.Running
	case "STOPPED":
		resp.Status = constants.Stopped
	case "PENDING":
		resp.Status = constants.Pending
	case "WAITING":
		resp.Status = constants.Waiting
	default:
		resp.Status = "undefined"
	}

	return &resp, nil
}

// DownloadAlgorithmCode is not implemented for OpenI and always returns an
// error.
func (o *OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
	return "", errors.New("failed to implement")
}

// UploadAlgorithmCode is not implemented for OpenI and always returns an error.
func (o *OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
	return errors.New("failed to implement")
}

// GetComputeCards is not implemented for OpenI and always returns an error.
func (o *OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
	return nil, errors.New("failed to implement")
}

// GetUserBalance is not implemented for OpenI and always returns an error.
func (o *OpenI) GetUserBalance(ctx context.Context) (float64, error) {
	return 0, errors.New("failed to implement")
}

// GetResourceSpecs collects the cluster resources for resrcType ("Inference"
// or "Train"; any other value queries with an empty job type). Compute
// resources and the running-task count are gathered concurrently and returned
// in arrival order. Failures of individual queries are skipped, so the error
// result is always nil.
func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
	var jobType string
	switch resrcType {
	case "Inference":
		jobType = ONLINEINFERENCE
	case "Train":
		jobType = TRAIN
	}

	res := &collector.ResourceSpec{
		ClusterId: strconv.FormatInt(o.participantId, 10),
		Tag:       resrcType,
	}

	creationRequirelUrl := o.host + CreationRequirelUrl
	reposUrl := o.host + ReposUrl
	taskListUrl := o.host + TaskListUrl

	var (
		wg   sync.WaitGroup
		once sync.Once
		ch   = make(chan *collector.ClusterResource)
	)

	wg.Add(2)
	go o.genComputeResources(&wg, ch, &once, jobType, creationRequirelUrl)
	go o.genRunningTaskNum(&wg, ch, reposUrl, taskListUrl)

	// Close ch once every producer is done so the range below terminates.
	go func() {
		wg.Wait()
		close(ch)
	}()

	var resources []interface{}
	for v := range ch {
		resources = append(resources, v)
	}

	res.Resources = resources

	return res, nil
}

// genComputeResources queries the creationRequired endpoint once per entry of
// ComputeSource, each in its own goroutine registered on wg, and sends the
// results on ch. For every card type the spec with the most cards is reported
// together with its CPU, memory, video memory and a fixed 1024 GiB storage
// entry. The first successful response also emits, exactly once via once, the
// account balance and a fixed rate of 1 per hour. Failed or empty responses
// are skipped silently.
func (o *OpenI) genComputeResources(wg *sync.WaitGroup, ch chan *collector.ClusterResource, once *sync.Once, jobType string, creationRequirelUrl string) {
	defer wg.Done()

	for c := range ComputeSource {
		// Registered while this function still holds its own wg slot, so
		// the counter cannot reach zero before the workers are counted.
		wg.Add(1)
		i := c
		go func() {
			defer wg.Done()

			param := model.TaskCreationRequiredParam{
				UserName:      o.userName,
				RepoName:      TESTREPO,
				JobType:       jobType,
				ComputeSource: ComputeSource[i],
				ClusterType:   C2NET,
			}

			resp := struct {
				Code int                        `json:"code"`
				Msg  string                     `json:"msg"`
				Data model.TaskCreationRequired `json:"data"`
			}{}

			if err := o.sendGetWithBody(creationRequirelUrl, param, &resp); err != nil {
				return
			}

			if len(resp.Data.Data.Specs.All) == 0 {
				return
			}

			// balance
			var balanceCheck = func() {
				bal := &collector.Usage{
					Type: strings.ToUpper(BALANCE),
					Total: &collector.UnitValue{
						Unit:  POINT,
						Value: resp.Data.Data.PointAccount.Balance,
					},
				}
				ch <- &collector.ClusterResource{Resource: bal}

				// rate
				rate := &collector.Usage{
					Type:  strings.ToUpper(RATE),
					Total: &collector.UnitValue{Unit: PERHOUR, Value: float64(1)},
				}
				ch <- &collector.ClusterResource{Resource: rate}
			}
			once.Do(balanceCheck)

			// Keep, per card type, the spec with the highest card count.
			// The element type must stay identical to the element type of
			// Specs.All, tags included, for the assignments to compile.
			m := make(map[string]struct {
				Id                  int    `json:"id"`
				AccCardsNum         int    `json:"acc_cards_num"`
				AccCardType         string `json:"acc_card_type"`
				CpuCores            int    `json:"cpu_cores"`
				MemGiB              int    `json:"mem_gi_b"`
				GpuMemGiB           int    `json:"gpu_mem_gi_b"`
				ShareMemGiB         int    `json:"share_mem_gi_b"`
				ComputeResource     string `json:"compute_resource"`
				UnitPrice           int    `json:"unit_price"`
				SourceSpecId        string `json:"source_spec_id"`
				HasInternet         int    `json:"has_internet"`
				EnableVisualization bool   `json:"enable_visualization"`
			})

			for _, s := range resp.Data.Data.Specs.All {
				if e, ok := m[s.AccCardType]; !ok || s.AccCardsNum > e.AccCardsNum {
					m[s.AccCardType] = s
				}
			}

			for k, v := range m {
				card := &collector.Usage{
					Type:      ComputeSource[i],
					Name:      strings.ToUpper(k),
					Total:     &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
					Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
				}
				cpu := &collector.Usage{
					Type:      strings.ToUpper(CPU),
					Name:      strings.ToUpper(CPU),
					Total:     &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
					Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
				}
				mem := &collector.Usage{
					Type:      strings.ToUpper(MEMORY),
					Name:      strings.ToUpper(RAM),
					Total:     &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
					Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
				}
				vmem := &collector.Usage{
					Type:      strings.ToUpper(MEMORY),
					Name:      strings.ToUpper(VRAM),
					Total:     &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
					Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
				}

				// storage
				storage := &collector.Usage{
					Type:      STORAGE,
					Name:      DISK,
					Total:     &collector.UnitValue{Unit: GIGABYTE, Value: float64(1024)},
					Available: &collector.UnitValue{Unit: GIGABYTE, Value: float64(1024)},
				}

				ch <- &collector.ClusterResource{
					Resource:      card,
					BaseResources: []*collector.Usage{storage, cpu, mem, vmem},
				}
			}
		}()
	}
}

// genRunningTaskNum lists the user's repositories, counts the tasks whose
// status is RUNNING across all of them (one goroutine per repository) and
// sends the count on ch as a RUNNINGTASK usage. When any task-list request
// fails, the count is replaced by four times the number of failed requests.
// Nothing is sent when the repository list cannot be fetched or is empty.
func (o *OpenI) genRunningTaskNum(wg *sync.WaitGroup, ch chan *collector.ClusterResource, reposUrl string, taskListUrl string) {
	defer wg.Done()

	reporesp := struct {
		Code int          `json:"code"`
		Msg  string       `json:"msg"`
		Data []model.Repo `json:"data"`
	}{}

	repor, err := http.NewRequest("GET", reposUrl, nil)
	if err != nil {
		return
	}
	reporeq := common.GetRestyRequest(common.TIMEOUT)
	reporeq.RawRequest = repor
	reporeq.URL = reposUrl

	_, err = reporeq.
		SetHeader("Content-Type", "application/json").
		SetQueryParam(common.ACCESSTOKEN, o.accessToken).
		SetResult(&reporesp).
		Send()
	if err != nil {
		return
	}

	if len(reporesp.Data) == 0 {
		return
	}

	// tasklist
	var (
		runningJobs atomic.Int64
		jwg         sync.WaitGroup
		errs        []error
		ech         = make(chan error)
	)

	jwg.Add(len(reporesp.Data))
	for _, datum := range reporesp.Data {
		dat := datum
		go func() {
			defer jwg.Done()

			param := model.TaskListParam{
				UserName: o.userName,
				RepoName: dat.Name,
			}

			resp := struct {
				Code int            `json:"code"`
				Msg  string         `json:"msg"`
				Data model.TaskList `json:"data"`
			}{}

			if err := o.sendGetWithBody(taskListUrl, param, &resp); err != nil {
				// assume occupied running tasks
				ech <- err
				return
			}

			for _, task := range resp.Data.Data.Tasks {
				if task.Task.Status == RUNNING {
					runningJobs.Add(1)
				}
			}
		}()
	}

	// Close ech once every worker is done so the range below terminates.
	go func() {
		jwg.Wait()
		close(ech)
	}()

	for v := range ech {
		errs = append(errs, v)
	}

	// running tasks num
	runningNum := runningJobs.Load()
	if len(errs) != 0 {
		runningNum = int64(len(errs)) * 4
	}

	run := &collector.Usage{
		Type: strings.ToUpper(RUNNINGTASK),
		Total: &collector.UnitValue{
			Unit:  NUMBER,
			Value: runningNum,
		},
	}
	ch <- &collector.ClusterResource{Resource: run}
}

// getOnlineInferUrl returns the online inference URL of the task taskId in
// repository repoName. A response code other than http.StatusOK is reported as
// an error carrying the response message.
func (o *OpenI) getOnlineInferUrl(ctx context.Context, taskId string, repoName string) (string, error) {
	param := model.TaskDetailParam{
		UserName: o.userName,
		RepoName: repoName,
		Id:       taskId,
	}

	resp := model.SelfEndpointUrlResp{}

	if err := o.sendGetWithBody(o.host+TaskOnlineInferUrl, param, &resp); err != nil {
		return "", err
	}

	if resp.Code != http.StatusOK {
		return "", errors.New(resp.Msg)
	}

	return resp.Data.Url, nil
}