/* Copyright (c) [2023] [pcm] [pcm-coordinator] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. */ package storeLink import ( "context" "fmt" "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" modelartsclient "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" "log" "mime/multipart" "strconv" "strings" "sync" "time" ) const ( Ascend = "Ascend" Npu = "npu" ImageNetResnet50Cmd = "cd /home/ma-user & python ./inference_ascend.py" ChatGLM6BCmd = "cd /home/ma-user && python ./download_model.py && python ./inference_chatGLM.py" ) type ModelArtsLink struct { modelArtsRpc modelartsservice.ModelArtsService modelArtsImgRpc imagesservice.ImagesService platform string participantId int64 pageIndex int32 pageSize int32 SourceLocation string Version string ModelId string ModelType string } type MoUsage struct { CpuSize int64 NpuSize int64 CpuAvailable int64 NpuAvailable int64 } // Version 结构体表示版本号 type Version struct { Major, Minor, Patch int } // ParseVersion 从字符串解析版本号 func ParseVersion(versionStr string) (*Version, error) { parts := strings.Split(versionStr, ".") if len(parts) != 3 { return nil, fmt.Errorf("invalid version format: %s", versionStr) } major, err := strconv.Atoi(parts[0]) if err != nil { return nil, err } minor, err := strconv.Atoi(parts[1]) if err != nil { return nil, err } patch, err := strconv.Atoi(parts[2]) if err != nil { return nil, err } return &Version{Major: major, Minor: minor, Patch: patch}, nil } // Increment 根据给定规则递增版本号 func (v *Version) Increment() { if v.Patch < 9 { v.Patch++ } else { v.Patch = 0 if v.Minor < 9 { v.Minor++ } else { v.Minor = 0 v.Major++ } } } // String 将版本号转换回字符串格式 func (v *Version) String() string { return fmt.Sprintf("%d.%d.%d", v.Major, v.Minor, v.Patch) } func NewModelArtsLink(modelArtsRpc modelartsservice.ModelArtsService, modelArtsImgRpc imagesservice.ImagesService, name string, id int64, nickname string) *ModelArtsLink { return &ModelArtsLink{modelArtsRpc: modelArtsRpc, modelArtsImgRpc: modelArtsImgRpc, platform: nickname, participantId: id, pageIndex: 0, pageSize: 50} } func (m *ModelArtsLink) UploadImage(ctx context.Context, path string) (interface{}, error) { //TODO modelArts上传镜像 return nil, nil } func (m *ModelArtsLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) { // TODO modelArts删除镜像 return nil, nil } func (m *ModelArtsLink) QueryImageList(ctx context.Context) (interface{}, error) { // modelArts获取镜像列表 req := &modelarts.ListRepoReq{ Offset: "0", Limit: strconv.Itoa(int(m.pageSize)), Platform: m.platform, } resp, err := m.modelArtsImgRpc.ListReposDetails(ctx, req) if err != nil { return nil, err } return resp, nil } func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // modelArts提交任务 environments := make(map[string]string) parameters := make([]*modelarts.ParametersTrainJob, 0) for _, env := range envs { s := strings.Split(env, COMMA) environments[s[0]] = s[1] } for _, param := range params { s := strings.Split(param, COMMA) parameters = append(parameters, &modelarts.ParametersTrainJob{ Name: s[0], Value: s[1], }) } req := &modelarts.CreateTrainingJobReq{ Kind: "job", Metadata: &modelarts.MetadataS{ Name: TASK_NAME_PREFIX + utils.RandomString(10), WorkspaceId: "0", }, Algorithm: &modelarts.Algorithms{ Id: algorithmId, Engine: &modelarts.EngineCreateTraining{ ImageUrl: imageId, }, Command: cmd, Environments: environments, Parameters: parameters, }, Spec: &modelarts.SpecsC{ Resource: &modelarts.ResourceCreateTraining{ FlavorId: resourceId, NodeCount: 1, }, }, Platform: m.platform, } resp, err := m.modelArtsRpc.CreateTrainingJob(ctx, req) if err != nil { return nil, err } return resp, nil } func (m *ModelArtsLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) { // 获取任务 req := &modelarts.DetailTrainingJobsReq{ TrainingJobId: taskId, Platform: m.platform, } resp, err := m.modelArtsRpc.GetTrainingJobs(ctx, req) if err != nil { return nil, err } return resp, nil } func (m *ModelArtsLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) { // 删除任务 req := &modelarts.DeleteTrainingJobReq{ TrainingJobId: taskId, Platform: m.platform, } resp, err := m.modelArtsRpc.DeleteTrainingJob(ctx, req) if err != nil { return nil, err } return resp, nil } func (m *ModelArtsLink) QuerySpecs(ctx context.Context) (interface{}, error) { // octopus查询资源规格 req := &modelarts.TrainingJobFlavorsReq{ Platform: m.platform, } resp, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, req) if err != nil { return nil, err } return resp, nil } func (m *ModelArtsLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { req := &modelarts.GetPoolsRuntimeMetricsReq{} resp, err := m.modelArtsRpc.GetPoolsRuntimeMetrics(ctx, req) if err != nil { return nil, err } if resp.ErrorMsg != "" { return nil, errors.New("failed to get algorithms") } resourceStats := &collector.ResourceStats{} CpuCoreTotalSum := int64(0) CpuCoreAvailSum := int64(0) MemTotalSum := float64(0) MemAvailSum := float64(0) var CpuCoreTotal int64 var CpuCoreAvail int64 var MemTotal float64 var MemAvail float64 for _, items := range resp.Items { //TODO The value of taskType is temporarily fixed to "pytorch" CpuCoreTotal, err = strconv.ParseInt(items.Table.Capacity.Value.Cpu, 10, 64) CpuCoreTotalSum += CpuCoreTotal CpuCoreAvail, err = strconv.ParseInt(items.Table.Allocated.Value.Cpu, 10, 64) CpuCoreAvailSum += CpuCoreAvail MemTotal, err = strconv.ParseFloat(items.Table.Capacity.Value.Memory, 64) MemTotalSum += MemTotal MemAvail, err = strconv.ParseFloat(items.Table.Allocated.Value.Memory, 64) MemAvailSum += MemAvail } resourceStats.CpuCoreTotal = CpuCoreTotalSum resourceStats.CpuCoreAvail = CpuCoreAvailSum resourceStats.MemTotal = MemTotalSum resourceStats.MemAvail = MemAvailSum req1 := &modelarts.GetResourceFlavorsReq{} resp1, err := m.modelArtsRpc.GetResourceFlavors(ctx, req1) num32, _ := strconv.Atoi(resp1.Items[0].Spec.Npu.Size) var cards []*collector.Card card := &collector.Card{ Platform: MODELARTS, Type: CARD, Name: Npu, CardNum: int32(num32), TOpsAtFp16: float64(num32 * 320), } cards = append(cards, card) resourceStats.CardsAvail = cards return resourceStats, nil } func (m *ModelArtsLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) { return nil, nil } func (m *ModelArtsLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) { var algorithms []*collector.Algorithm req := &modelarts.ListAlgorithmsReq{ Platform: m.platform, Offset: m.pageIndex, Limit: m.pageSize, } resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req) if err != nil { return nil, err } if resp.ErrorMsg != "" { return nil, errors.New("failed to get algorithms") } for _, a := range resp.Items { //TODO The value of taskType is temporarily fixed to "pytorch" algorithm := &collector.Algorithm{Name: a.Metadata.Name, Platform: MODELARTS, TaskType: "pytorch"} algorithms = append(algorithms, algorithm) } return algorithms, nil } func (m *ModelArtsLink) GetComputeCards(ctx context.Context) ([]string, error) { var cards []string cards = append(cards, Ascend) return cards, nil } func (m *ModelArtsLink) GetUserBalance(ctx context.Context) (float64, error) { return 0, nil } func (m *ModelArtsLink) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) { algoName := dataset + DASH + algorithm req := &modelarts.GetFileReq{ Path: algoName + FORWARD_SLASH + TRAIN_FILE, } resp, err := m.modelArtsRpc.GetFile(ctx, req) if err != nil { return "", err } return string(resp.Content), nil } func (m *ModelArtsLink) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error { return nil } // Determine whether there is a necessary image in image management and query the image name based on the image name func (m *ModelArtsLink) getSourceLocationFromImages(ctx context.Context, option *option.InferOption) error { req := &modelarts.ListImagesReq{ //Platform: m.platform, Limit: 50, Offset: 0, } ListImagesResp, err := m.modelArtsRpc.ListImages(ctx, req) if err != nil { return err } if ListImagesResp.Code != 200 { return errors.New("failed to get ListImages") } for _, ListImages := range ListImagesResp.Data { if option.ModelName == "ChatGLM-6B" { if ListImages.Name == "chatglm-6b" { m.SourceLocation = ListImages.SwrPath return nil } } else { if ListImages.Name == option.ModelName { m.SourceLocation = ListImages.SwrPath return nil } } } return errors.New("SourceLocation not set") } // Get AI Application List func (m *ModelArtsLink) GetModelId(ctx context.Context, option *option.InferOption) error { req := &modelarts.ListModelReq{ Platform: m.platform, ModelName: option.ModelName, //ModelType: "Image", Limit: int64(m.pageIndex), Offset: int64(m.pageSize), } ListModelResp, err := m.modelArtsRpc.ListModels(ctx, req) if err != nil { return err } if ListModelResp.Code == 200 { //return errors.New("failed to get ModelId") for _, ListModel := range ListModelResp.Models { if ListModel.ModelName == option.ModelName { option.ModelId = ListModel.ModelId m.Version = ListModel.ModelVersion return nil } } } err = m.CreateModel(ctx, option) if err != nil { return err } return nil } func (m *ModelArtsLink) GetModel(ctx context.Context, option *option.InferOption) string { req := &modelarts.ShowModelReq{ Platform: m.platform, ModelId: option.ModelId, } ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second) defer cancel() ShowModelsResp, err := m.modelArtsRpc.ShowModels(ctx, req) if err != nil { if err == context.DeadlineExceeded { log.Println("Request timed out") // 重试请求或其他处理 } else { log.Fatalf("could not call method: %v", err) } } if ShowModelsResp.Code != 200 { errors.New("failed to get findModelsStatus") } m.ModelType = ShowModelsResp.ShowModelDetail.ModelAlgorithm return ShowModelsResp.ShowModelDetail.ModelStatus } // Get AI Application List func (m *ModelArtsLink) GetModelStatus(ctx context.Context, option *option.InferOption) error { var wg sync.WaitGroup wg.Add(1) // 使用goroutine进行轮询 //defer wg.Done() for { status := m.GetModel(ctx, option) if status == "published" { fmt.Println("Model is now published.") break // 一旦状态变为published,就退出循环 } fmt.Println("Waiting for model to be published...") time.Sleep(5 * time.Second) // 等待一段时间后再次检查 } // 在这里执行模型状态为published后需要进行的操作 fmt.Println("Continuing with the program...") return nil } // Create an AI application func (m *ModelArtsLink) CreateModel(ctx context.Context, option *option.InferOption) error { //Before creating an AI application, check if there are any images that can be created err := m.getSourceLocationFromImages(ctx, option) if err != nil { // return errors.New("No image available for creationd") } // var CMD string if option.ModelName == "imagenet_resnet50" { CMD = ImageNetResnet50Cmd } else if option.ModelName == "ChatGLM-6B" { CMD = ChatGLM6BCmd } if m.Version == "" { m.Version = "0.0.1" } version, err := ParseVersion(m.Version) version.Increment() req := &modelarts.CreateModelReq{ Platform: m.platform, ModelName: option.ModelName, ModelType: "Image", ModelVersion: version.String(), SourceLocation: m.SourceLocation, InstallType: []string{"real-time"}, Cmd: CMD, ModelAlgorithm: option.ModelType, } ModelResp, err := m.modelArtsRpc.CreateModel(ctx, req) if err != nil { return err } if ModelResp.Code != 200 { return errors.New("failed to get ModelId") } option.ModelId = ModelResp.ModelId return nil } func (m *ModelArtsLink) GetSpecifications(ctx context.Context, option *option.AiOption, ifoption *option.InferOption) error { req := &modelarts.ListSpecificationsReq{ //Platform: m.platform, IsPersonalCluster: false, InferType: "real-time", Limit: m.pageIndex, OffSet: m.pageSize, } ListSpecificationsResp, err := m.modelArtsRpc.ListSpecifications(ctx, req) if err != nil { return err } for _, ListSpecifications := range ListSpecificationsResp.Specifications { if ListSpecifications.Specification == "modelarts.kat1.xlarge" { ifoption.Specification = ListSpecifications.Specification return nil } } return nil } func (m *ModelArtsLink) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { req := &modelartsservice.GetTrainingJobLogsPreviewReq{ Platform: m.platform, TaskId: "worker-0", TrainingJobId: taskId, } resp, err := m.modelArtsRpc.GetTrainingJobLogsPreview(ctx, req) if err != nil { return "", err } if strings.Contains(resp.Content, "404 Not Found") { resp.Content = "waiting for logs..." } return resp.Content, nil } func (m *ModelArtsLink) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { resp, err := m.QueryTask(ctx, taskId) if err != nil { return nil, err } jobresp, ok := (resp).(*modelartsservice.JobResponse) if jobresp.ErrorMsg != "" || !ok { if jobresp.ErrorMsg != "" { return nil, errors.New(jobresp.ErrorMsg) } else { return nil, errors.New("get training task failed, empty error returned") } } var task collector.Task task.Id = jobresp.Metadata.Id switch strings.ToLower(jobresp.Status.Phase) { case "completed": milliTimestamp := int64(jobresp.Status.StartTime) task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime) duration := int64(jobresp.Status.Duration) task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime) task.Status = constants.Completed case "failed": milliTimestamp := int64(jobresp.Status.StartTime) task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime) duration := int64(jobresp.Status.Duration) task.End = timeutils.MillisecondsToAddDurationToUTCString(milliTimestamp, duration, time.DateTime) task.Status = constants.Failed case "running": milliTimestamp := int64(jobresp.Status.StartTime) task.Start = timeutils.MillisecondsToUTCString(milliTimestamp, time.DateTime) task.Status = constants.Running case "stopped": task.Status = constants.Stopped case "pending": task.Status = constants.Pending case "terminated": //TODO Failed task.Status = constants.Failed default: task.Status = "undefined" } return &task, nil } func (m *ModelArtsLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) { err := m.GenerateSubmitParams(ctx, option) if err != nil { return nil, err } task, err := m.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } return task, nil } func (m *ModelArtsLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error { err := m.generateResourceId(ctx, option, nil) if err != nil { return err } err = m.generateAlgorithmId(ctx, option) if err != nil { return err } err = m.generateImageId(option) if err != nil { return err } err = m.generateCmd(option) if err != nil { return err } err = m.generateEnv(option) if err != nil { return err } err = m.generateParams(option) if err != nil { return err } return nil } func (m *ModelArtsLink) generateResourceId(ctx context.Context, option *option.AiOption, ifoption *option.InferOption) error { option.ResourceId = "modelarts.kat1.xlarge" return nil } func (m *ModelArtsLink) generateImageId(option *option.AiOption) error { return nil } func (m *ModelArtsLink) generateCmd(option *option.AiOption) error { return nil } func (m *ModelArtsLink) generateEnv(option *option.AiOption) error { return nil } func (m *ModelArtsLink) generateParams(option *option.AiOption) error { return nil } func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error { req := &modelarts.ListAlgorithmsReq{ Platform: m.platform, Offset: m.pageIndex, Limit: m.pageSize, } resp, err := m.modelArtsRpc.ListAlgorithms(ctx, req) if err != nil { return err } if resp.ErrorMsg != "" { return errors.New("failed to get algorithmId") } for _, algorithm := range resp.Items { engVersion := algorithm.JobConfig.Engine.EngineVersion if strings.Contains(engVersion, option.TaskType) { ns := strings.Split(algorithm.Metadata.Name, DASH) if ns[0] != option.TaskType { continue } if ns[1] != option.DatasetsName { continue } if ns[2] != option.AlgorithmName { continue } option.AlgorithmId = algorithm.Metadata.Id return nil } } if option.AlgorithmId == "" { return errors.New("Algorithm does not exist") } return errors.New("failed to get AlgorithmId") } func (m *ModelArtsLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) { var imageUrls []*inference.InferUrl urlReq := &modelartsclient.ImageReasoningUrlReq{ ServiceName: option.ModelName, Type: option.ModelType, Card: "npu", } urlResp, err := m.modelArtsRpc.ImageReasoningUrl(ctx, urlReq) if err != nil { return nil, err } imageUrl := &inference.InferUrl{ Url: urlResp.Url, Card: "npu", } imageUrls = append(imageUrls, imageUrl) clusterWithUrl := &inference.ClusterInferUrl{ ClusterName: m.platform, ClusterType: TYPE_MODELARTS, InferUrls: imageUrls, } return clusterWithUrl, nil } func (m *ModelArtsLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { var insList []*inference.DeployInstance req := &modelarts.ListServicesReq{ Platform: m.platform, OffSet: m.pageIndex, Limit: m.pageSize, } //list, err := m.modelArtsRpc.ListServices(ctx, req) resp, err := m.modelArtsRpc.ListServices(ctx, req) if err != nil { return nil, err } if resp.ErrorMsg != "" { return nil, errors.New(resp.Msg) } for _, services := range resp.Services { ins := &inference.DeployInstance{} ins.InstanceName = services.ServiceName ins.InstanceId = services.ServiceId ins.Status = services.Status ins.InferCard = "NPU" ins.ClusterName = m.platform ins.CreatedTime = string(services.StartTime) ins.ClusterType = TYPE_MODELARTS insList = append(insList, ins) } return insList, nil } func (m *ModelArtsLink) StartInferDeployInstance(ctx context.Context, id string) bool { req := &modelartsclient.UpdateServiceReq{ ServiceId: id, Status: "running", } resp, err := m.modelArtsRpc.UpdateService(ctx, req) if err != nil || resp.Code != 0 { return false } if resp.Code == 0 { return true } return false } func (m *ModelArtsLink) StopInferDeployInstance(ctx context.Context, id string) bool { req := &modelartsclient.UpdateServiceReq{ ServiceId: id, Status: "stopped", } resp, err := m.modelArtsRpc.UpdateService(ctx, req) if err != nil || resp.Code != 0 { return false } if resp.Code == 0 { return true } return false } func (m *ModelArtsLink) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) { req := &modelarts.ShowServiceReq{ ServiceId: id, } resp, err := m.modelArtsRpc.ShowService(ctx, req) if err != nil { return nil, err } if resp.ErrorMsg != "" { return nil, errors.New(resp.Msg) } ins := &inference.DeployInstance{} ins.InstanceName = resp.ServiceName ins.InstanceId = resp.ServiceId ins.Status = resp.Status ins.InferCard = "NPU" ins.ClusterName = m.platform ins.CreatedTime = string(resp.StartTime) ins.ClusterType = TYPE_MODELARTS ins.ModelName = resp.Config[0].ModelName ins.ModelType = m.ModelType ins.InferUrl = resp.AccessAddress return ins, nil } func (m *ModelArtsLink) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) { return "", nil } func (m *ModelArtsLink) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) { err := m.GetModelId(ctx, option) if err != nil { return "", err } err = m.GetModelStatus(ctx, option) if err != nil { return "", err } configParam := &modelarts.ServiceConfig{ Specification: "modelarts.kat1.xlarge", Weight: 100, ModelId: option.ModelId, InstanceCount: 1, } var configItems []*modelarts.ServiceConfig configItems = append(configItems, configParam) now := time.Now() timestampSec := now.Unix() str := strconv.FormatInt(timestampSec, 10) req := &modelarts.CreateServiceReq{ Platform: m.platform, Config: configItems, InferType: "real-time", ServiceName: option.ModelName + "_" + option.ModelType + "_" + Npu + "_" + str, } ctx, cancel := context.WithTimeout(context.Background(), 150*time.Second) defer cancel() resp, err := m.modelArtsRpc.CreateService(ctx, req) if err != nil { return "", err } return resp.ServiceId, nil } func (m *ModelArtsLink) CheckModelExistence(ctx context.Context, name string, mtype string) bool { ifoption := &option.InferOption{ ModelName: name, ModelType: mtype, } err := m.CheckImageExist(ctx, ifoption) if err != nil { return false } return true } func (m *ModelArtsLink) CheckImageExist(ctx context.Context, option *option.InferOption) error { req := &modelarts.ListImagesReq{ Limit: m.pageSize, Offset: m.pageIndex, } ListImageResp, err := m.modelArtsRpc.ListImages(ctx, req) if err != nil { return err } var modelName string if ListImageResp.Code == 200 { //return errors.New("failed to get ModelId") for _, ListImage := range ListImageResp.Data { if option.ModelName == "ChatGLM-6B" { modelName = "chatglm-6b" } else { modelName = option.ModelName } if ListImage.Name == modelName { return nil } } } return errors.New("failed to find Image ") } func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) { var wg sync.WaitGroup req := &modelarts.GetResourceFlavorsReq{} resp, err := m.modelArtsRpc.GetResourceFlavors(ctx, req) if err != nil { return nil, err } if resp.Msg != "" { return nil, errors.New(resp.Msg) } MoUsage := MoUsage{} var cpusum int64 = 0 var npusum int64 = 0 for _, Flavors := range resp.Items { MoUsage.CpuSize, err = strconv.ParseInt(Flavors.Spec.Cpu, 10, 64) //CPU的值 if err != nil { // 如果转换失败,处理错误 fmt.Println("转换错误:", err) } cpusum += MoUsage.CpuSize MoUsage.NpuSize, err = strconv.ParseInt(Flavors.Spec.Npu.Size, 10, 64) //NPU的值 if err != nil { // 如果转换失败,处理错误 fmt.Println("转换错误:", err) } npusum += MoUsage.NpuSize } reqTraining := &modelarts.ListTrainingJobsreq{ Platform: m.platform, } //查询作业列表 respList, err := m.modelArtsRpc.GetListTrainingJobs(ctx, reqTraining) if err != nil { wg.Done() } var CoreNum int32 = 0 var NpuNum int32 = 0 for _, TrainLists := range respList.Items { if len(respList.Items) == 0 { wg.Done() } if TrainLists.Status.Phase == "Running" { CoreNum += TrainLists.Spec.Resource.FlavorDetail.FlavorInfo.Cpu.CoreNum NpuNum += TrainLists.Spec.Resource.FlavorDetail.FlavorInfo.Npu.UnitNum } } MoUsage.CpuAvailable = cpusum - int64(CoreNum) MoUsage.NpuAvailable = npusum - int64(NpuNum) UsageCPU := &collector.Usage{Type: strings.ToUpper(CPU), Total: cpusum, Available: MoUsage.CpuAvailable} UsageNPU := &collector.Usage{Type: strings.ToUpper(NPU), Total: npusum, Available: MoUsage.NpuAvailable} resUsage := &collector.ResourceSpec{ ClusterId: m.participantId, } resUsage.Resources = append(resUsage.Resources, UsageCPU) resUsage.Resources = append(resUsage.Resources, UsageNPU) return resUsage, nil }