/* Copyright (c) [2023] [pcm] [pcm-coordinator] is licensed under Mulan PSL v2. You can use this software according to the terms and conditions of the Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at: http://license.coscl.org.cn/MulanPSL2 THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. */ package storeLink import ( "context" "errors" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "mime/multipart" "strconv" "strings" "sync" "time" ) const ( RAM_SIZE_1G = 1024 // 1G WORKER_NUMBER = 1 DCU = "DCU" DCU_TOPS = 24.5 PYTORCH = "Pytorch" TASK_PYTORCH_PREFIX = "PytorchTask" TENSORFLOW = "Tensorflow" RESOURCE_GROUP = "kshdtest" WorkPath = "/work/home/acgnnmfbwo/pcmv1/" TimeoutLimit = "10:00:00" PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py" DATASETS_DIR = KUNSHAN_DIR + "/dataset" ALGORITHM_DIR = KUNSHAN_DIR + "/algorithm" KUNSHAN_DIR = "/public/home/acgnnmfbwo/pcmv1" TRAIN_FILE = "train.py" CPUCOREPRICEPERHOUR = 0.09 DCUPRICEPERHOUR = 2.0 KB = 1024 TIMEOUT = 20 DEPLOY_INSTANCE_LIMIT = 100 ProtocolType = "HTTP" ContainerPort = 8881 JUPYTER = "jupyter" Z100L = "Z100L" ) var ( RESOURCESGAIMAP = map[string]ResourceSpecSGAI{ "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": { CPU: 1, GPU: 1, RAM: 2 * RAM_SIZE_1G, }, "6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": { CPU: 1, GPU: 2, RAM: 2 * RAM_SIZE_1G, }, "OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": { CPU: 2, GPU: 3, RAM: 4 * RAM_SIZE_1G, }, "sBWfpkntUzsWYly11kdwEHZOYYIsFmve": { CPU: 4, GPU: 4, RAM: 8 * RAM_SIZE_1G, }, "jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": { CPU: 5, GPU: 5, RAM: 10 * RAM_SIZE_1G, }, } RESOURCESPECSAI = map[string]string{ "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": "CPU:1, DCU:1, RAM:2G", "6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": "CPU:1, DCU:2, RAM:2G", "OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": "CPU:2, DCU:3, RAM:4G", "sBWfpkntUzsWYly11kdwEHZOYYIsFmve": "CPU:4, DCU:4, RAM:8G", "jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": "CPU:5, DCU:5, RAM:10G", } ModelNameCmdMap = map[string]string{ "blip-image-captioning-base": "sudo pip install transformers python-multipart fastapi uvicorn[standard]; sudo python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/blip_image_captioning_base/infer.py", "imagenet_resnet50": "sudo pip install fastapi uvicorn[standard] python-multipart; sudo python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/imagenet_resnet50/infer.py", } ) type ResourceSpecSGAI struct { CPU int64 GPU int64 RAM int64 } type ShuguangAi struct { aCRpc hpcacclient.HpcAC platform string participantId int64 } func NewShuguangAi(aCRpc hpcAC.HpcACClient, name string, id int64) *ShuguangAi { return &ShuguangAi{aCRpc: aCRpc, platform: name, participantId: id} } func (s *ShuguangAi) UploadImage(ctx context.Context, path string) (interface{}, error) { return nil, nil } func (s *ShuguangAi) DeleteImage(ctx context.Context, imageId string) (interface{}, error) { return nil, nil } func (s *ShuguangAi) QueryImageList(ctx context.Context) (interface{}, error) { // shuguangAi获取镜像列表 req := &hpcAC.GetImageListAiReq{ AcceleratorType: DCU, TaskType: PYTORCH, } resp, err := s.aCRpc.GetImageListAi(ctx, req) if err != nil { return nil, err } return resp, nil } func (s *ShuguangAi) SubmitPytorchTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) { //判断是否resourceId匹配自定义资源Id _, isMapContainsKey := RESOURCESPECSAI[resourceId] if !isMapContainsKey { return nil, errors.New("shuguangAi资源Id不存在") } //根据imageId获取imagePath, version imageReq := &hpcAC.GetImageAiByIdReq{ImageId: imageId} imageResp, err := s.aCRpc.GetImageAiById(ctx, imageReq) if err != nil { return nil, err } //python参数 var pythonArg string for _, param := range params { s := strings.Split(param, COMMA) pythonArg += PY_PARAM_PREFIX + s[0] + "=" + s[1] + SPACE } //环境变量 var env string for _, e := range envs { s := strings.Split(e, COMMA) env += s[0] + "=" + s[1] + SPACE } //set paths var workPath string var codePath string paths := strings.Split(algorithmId, DASH) if len(paths) == 3 { workPath = ALGORITHM_DIR + FORWARD_SLASH + paths[0] + FORWARD_SLASH + paths[1] + DASH + paths[2] codePath = workPath + FORWARD_SLASH + TRAIN_FILE } else { // storage schedule submit mode codePath = algorithmId paths = strings.Split(algorithmId, FORWARD_SLASH) last := paths[len(paths)-1] workPath = strings.TrimSuffix(algorithmId, FORWARD_SLASH+last) } req := &hpcAC.SubmitPytorchTaskReq{ Params: &hpcAC.SubmitPytorchTaskParams{ TaskName: TASK_PYTORCH_PREFIX + UNDERSCORE + utils.RandomString(10), WorkPath: workPath, IsDistributed: false, IsHvd: false, Env: env, AcceleratorType: DCU, Version: imageResp.Image.Version, ImagePath: imageResp.Image.Path, WorkerNumber: WORKER_NUMBER, ResourceGroup: RESOURCE_GROUP, TimeoutLimit: TimeoutLimit, PythonCodePath: codePath, PythonArg: pythonArg, }, } updateSGAIRequestByResourceId(resourceId, req) resp, err := s.aCRpc.SubmitPytorchTask(ctx, req) if err != nil { return nil, err } return resp, nil } func updateSGAIRequestByResourceId(resourceId string, req *hpcAC.SubmitPytorchTaskReq) { spec := RESOURCESGAIMAP[resourceId] req.Params.WorkerCpuNumber = spec.CPU req.Params.WorkerGpuNumber = spec.GPU req.Params.WorkerRamSize = spec.RAM } func (s *ShuguangAi) SubmitTensorflowTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string) (interface{}, error) { //req := &hpcAC.SubmitTensorflowTaskReq{ // Params: &hpcAC.SubmitTensorflowTaskParams{ // // } //} return nil, nil } func (s *ShuguangAi) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { // set algorithmId temporarily for storelink submit if algorithmId == "" { algorithmId = "pytorch-mnist-fcn" } // shuguangAi提交任务 switch aiType { case PYTORCH_TASK: task, err := s.SubmitPytorchTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId) if err != nil { return nil, err } return task, nil case TENSORFLOW_TASK: task, err := s.SubmitTensorflowTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId) if err != nil { return nil, err } return task, nil default: task, err := s.SubmitPytorchTask(ctx, imageId, cmd, envs, params, resourceId, datasetsId, algorithmId) if err != nil { return nil, err } return task, nil } } func (s *ShuguangAi) QueryTask(ctx context.Context, taskId string) (interface{}, error) { // shuguangAi获取任务 req := &hpcAC.GetPytorchTaskReq{ Id: taskId, } resp, err := s.aCRpc.GetPytorchTask(ctx, req) if err != nil { return nil, err } return resp, nil } func (s *ShuguangAi) DeleteTask(ctx context.Context, taskId string) (interface{}, error) { // shuguangAi删除任务 req := &hpcAC.DeleteTaskAiReq{ Ids: taskId, } resp, err := s.aCRpc.DeleteTaskAi(ctx, req) if err != nil { return nil, err } return resp, nil } func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) { resp := &types.GetResourceSpecsResp{} for k, v := range RESOURCESPECSAI { var respec types.ResourceSpecSl respec.SpecId = k respec.SpecName = v respec.ParticipantId = s.participantId respec.ParticipantName = s.platform resp.ResourceSpecs = append(resp.ResourceSpecs, &respec) } resp.Success = true return resp, nil } func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { var wg sync.WaitGroup wg.Add(5) var cBalance = make(chan float64) var cMemTotal = make(chan float64) var cTotalCpu = make(chan int64) resourceStats := &collector.ResourceStats{ ClusterId: strconv.FormatInt(s.participantId, 10), Name: s.platform, } dcu := &collector.Card{ Platform: SHUGUANGAI, Type: CARD, Name: DCU, TOpsAtFp16: DCU_TOPS, } //history jobs go func() { hReq := &hpcAC.ListHistoryJobReq{} hReq.Start = 0 hReq.Limit = 1 hReq.IsQueryByQueueTime = "false" hReq.TimeType = "CUSTOM" hReq.StartTime = "2024-01-01 01:01:01" endTime := time.Now().Format("2006-01-02 15:04:05") hReq.EndTime = endTime hResp, err := s.aCRpc.ListHistoryJob(ctx, hReq) if err != nil || hResp.Code != "0" { wg.Done() return } if hResp.Data == nil { wg.Done() return } resourceStats.TaskCompleted = int64(hResp.Data.Total) wg.Done() }() //balance go func() { userReq := &hpcAC.GetUserInfoReq{} userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq) if err != nil { return } if userinfo.Data == nil { wg.Done() return } balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) resourceStats.Balance = balance cBalance <- balance }() //resource limit go func() { limitReq := &hpcAC.QueueReq{} limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq) if err != nil || limitResp.Code != "0" { wg.Done() return } if limitResp.Data == nil { wg.Done() return } totalCpu := limitResp.Data.AccountMaxCpu totalDcu := limitResp.Data.AccountMaxDcu dcu.CardNum = int32(totalDcu) resourceStats.CpuCoreTotal = totalCpu cTotalCpu <- totalCpu wg.Done() }() //disk go func() { diskReq := &hpcAC.ParaStorQuotaReq{} diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) if err != nil { wg.Done() return } if diskResp.Data == nil { wg.Done() return } totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3) availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3) resourceStats.DiskTotal = totalDisk resourceStats.DiskAvail = availDisk wg.Done() }() //memory go func() { nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil) if err != nil { wg.Done() return } if nodeResp.Data == nil { wg.Done() return } memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES resourceStats.MemTotal = memSize cMemTotal <- memSize wg.Done() }() //resources being occupied go func() { var memSize float64 var totalCpu int64 select { case v := <-cMemTotal: memSize = v case <-time.After(TIMEOUT * time.Second): wg.Done() return } select { case v := <-cTotalCpu: totalCpu = v case <-time.After(TIMEOUT * time.Second): wg.Done() return } memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil) if err != nil { wg.Done() return } if memberJobResp.Data == nil { wg.Done() return } var cpuCoreAvail int64 var memAvail float64 if len(memberJobResp.Data) != 0 { cpuCoreAvail = totalCpu memAvail = memSize } else { var cpuCoreUsed int64 var memUsed float64 for _, datum := range memberJobResp.Data { cpuCoreUsed += datum.CpuCore } memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core if cpuCoreUsed > totalCpu { cpuCoreAvail = 0 } else { cpuCoreAvail = totalCpu - cpuCoreUsed } if memUsed > memSize { memAvail = 0 } else { memAvail = memSize - memUsed } } resourceStats.CpuCoreAvail = cpuCoreAvail resourceStats.MemAvail = memAvail wg.Done() }() //usable hours var balance float64 select { case v := <-cBalance: balance = v case <-time.After(TIMEOUT * time.Second): return nil, errors.New("get balance rpc call failed") } var cards []*collector.Card cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3) cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3) dcu.CardHours = cardHours resourceStats.CpuCoreHours = cpuHours resourceStats.Balance = balance wg.Wait() cards = append(cards, dcu) resourceStats.CardsAvail = cards return resourceStats, nil } func (s *ShuguangAi) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) { req := &hpcAC.GetFileListReq{Limit: 100, Path: DATASETS_DIR, Start: 0} list, err := s.aCRpc.GetFileList(ctx, req) if err != nil { return nil, err } if list.Code != "0" { return nil, errors.New(list.Msg) } specs := []*collector.DatasetsSpecs{} for _, file := range list.Data.FileList { spec := &collector.DatasetsSpecs{Name: file.Name, Size: strconv.FormatInt(file.Size, 10)} specs = append(specs, spec) } return specs, nil } func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) { var algorithms []*collector.Algorithm for _, t := range GetTaskTypes() { taskType := t req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0, Order: "asc", OrderBy: "name", KeyWord: ""} list, err := s.aCRpc.GetFileList(ctx, req) if err != nil { return nil, err } if list.Code != "0" { return nil, errors.New(list.Msg) } for _, file := range list.Data.FileList { algorithm := &collector.Algorithm{Name: file.Name, Platform: SHUGUANGAI, TaskType: taskType} algorithms = append(algorithms, algorithm) } } return algorithms, nil } func (s *ShuguangAi) GetComputeCards(ctx context.Context) ([]string, error) { var cards []string cards = append(cards, DCU) return cards, nil } func (s *ShuguangAi) GetUserBalance(ctx context.Context) (float64, error) { userReq := &hpcAC.GetUserInfoReq{} userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq) if err != nil { return 0, err } balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) return balance, nil } func (s *ShuguangAi) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) { algoName := dataset + DASH + algorithm req := &hpcAC.GetFileReq{ Path: ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH + TRAIN_FILE, } resp, err := s.aCRpc.GetFile(ctx, req) if err != nil { return "", err } return resp.Content, nil } func (s *ShuguangAi) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error { algoName := dataset + DASH + algorithm req := &hpcAC.UploadFileReq{ Path: ALGORITHM_DIR + FORWARD_SLASH + taskType + FORWARD_SLASH + algoName + FORWARD_SLASH, Cover: "cover", File: code, } _, err := s.aCRpc.UploadFile(ctx, req) if err != nil { return err } return nil } func (s *ShuguangAi) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) { req := &hpcAC.GetInstanceLogReq{ TaskId: taskId, InstanceNum: instanceNum, LineCount: 1000, StartLineNum: -1, } resp, err := s.aCRpc.GetInstanceLog(ctx, req) if err != nil { return "", err } if resp.Code != "0" { resp.Data.Content = "waiting for logs..." } return resp.Data.Content, nil } func (s *ShuguangAi) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { resp, err := s.QueryTask(ctx, taskId) if err != nil { return nil, err } jobresp := (resp).(*hpcAC.GetPytorchTaskResp) if jobresp.Code != "0" { return nil, errors.New(jobresp.Msg) } var task collector.Task task.Id = jobresp.Data.Id if jobresp.Data.StartTime != "" { task.Start = jobresp.Data.StartTime } if jobresp.Data.EndTime != "" { task.End = jobresp.Data.EndTime } task.Status = jobresp.Data.Status return &task, nil } func (s *ShuguangAi) Stop(ctx context.Context, id string) error { req := &hpcAC.StopTaskAiReq{ Id: id, } resp, err := s.aCRpc.StopTaskAi(ctx, req) if err != nil { return err } if resp.Code != "0" { return errors.New(resp.Msg) } return nil } func (s *ShuguangAi) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { switch mode { case executor.SUBMIT_MODE_JOINT_CLOUD: err := s.GenerateSubmitParams(ctx, option) if err != nil { return nil, err } case executor.SUBMIT_MODE_STORAGE_SCHEDULE: var dcuNum int64 for _, res := range option.ResourcesRequired { typeName, ok := res["type"] if !ok { continue } switch typeName { case DCU: num, ok := res["number"] if !ok { continue } n := common.ConvertTypeToString(num) val, err := strconv.ParseInt(n, 10, 64) if err != nil { return nil, err } dcuNum = val } } for k, v := range RESOURCESGAIMAP { if dcuNum == v.GPU { option.ResourceId = k break } if dcuNum == 0 && v.GPU == 1 { option.ResourceId = k break } if dcuNum >= 5 && v.GPU == 5 { option.ResourceId = k break } } option.ComputeCard = DCU default: return nil, errors.New("failed to choose submit mode") } task, err := s.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) if err != nil { return nil, err } return task, nil } func (s *ShuguangAi) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error { err := s.generateResourceId(option) if err != nil { return err } err = s.generateImageId(ctx, option) if err != nil { return err } err = s.generateAlgorithmId(ctx, option) if err != nil { return err } err = s.generateCmd(option) if err != nil { return err } err = s.generateEnv(option) if err != nil { return err } err = s.generateParams(option) if err != nil { return err } return nil } func (s *ShuguangAi) generateResourceId(option *option.AiOption) error { if option.ResourceType == "" { return errors.New("ResourceType not set") } if option.ResourceType == CPU { option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi" option.ComputeCard = CPU return nil } if option.ResourceType == CARD { if option.ComputeCard == "" { option.ComputeCard = DCU } if strings.ToUpper(option.ComputeCard) != DCU { return errors.New("computeCard not found") } option.ComputeCard = DCU if 0 <= option.Tops && option.Tops <= DCU_TOPS { option.ResourceId = "WodTB2rJ8SobMgQ1nrtR245jxOrsovFi" return nil } cardNum := 5 for k, v := range RESOURCESGAIMAP { for i := 1; i <= cardNum; i++ { if float64(i)*DCU_TOPS <= option.Tops && option.Tops <= float64(v.GPU)*DCU_TOPS { option.ResourceId = k return nil } } } if option.Tops > float64(cardNum)*DCU_TOPS { option.ResourceId = "jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2" return nil } } return errors.New("failed to get ResourceId") } func (s *ShuguangAi) generateImageId(ctx context.Context, option *option.AiOption) error { if option.TaskType == "" { return errors.New("TaskType not set") } taskType := strings.Title(option.TaskType) req := &hpcAC.GetImageListAiReq{ AcceleratorType: DCU, TaskType: taskType, } resp, err := s.aCRpc.GetImageListAi(ctx, req) if err != nil { return errors.New("generateImageId / GetImageListAi: " + err.Error()) } if resp.Code != "0" { return errors.New("failed to get imageId") } for _, datum := range resp.Data { ns := strings.Split(datum.Version, COLON) if ns[0] == "jupyterlab-pytorch" { option.ImageId = datum.ImageId return nil } } return errors.New("failed to get ImageId") } func (s *ShuguangAi) generateAlgorithmId(ctx context.Context, option *option.AiOption) error { if option.DatasetsName == "" { return errors.New("DatasetsName not set") } req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + option.TaskType, Start: 0} list, err := s.aCRpc.GetFileList(ctx, req) if err != nil { return errors.New("generateAlgorithmId / GetFileListReq: " + err.Error()) } if list.Code != "0" { return errors.New(list.Msg) } var algorithmId string for _, file := range list.Data.FileList { ns := strings.Split(file.Name, DASH) if ns[0] == option.DatasetsName { algoName := ns[1] if option.AlgorithmName == "" { switch option.DatasetsName { case "cifar10": algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "cnn" option.AlgorithmId = algorithmId option.AlgorithmName = algoName return nil case "mnist": algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + "fcn" option.AlgorithmId = algorithmId option.AlgorithmName = algoName return nil } } else { if algoName == option.AlgorithmName { algorithmId = option.TaskType + DASH + option.DatasetsName + DASH + algoName option.AlgorithmId = algorithmId return nil } } } } if algorithmId == "" { return errors.New("Algorithm does not exist") } return errors.New("failed to get AlgorithmId") } func (s *ShuguangAi) generateCmd(option *option.AiOption) error { return nil } func (s *ShuguangAi) generateEnv(option *option.AiOption) error { return nil } func (s *ShuguangAi) generateParams(option *option.AiOption) error { if option.ResourceType == "" { return errors.New("ResourceType not set") } if len(option.Params) == 0 { epoch := "epoch" + COMMA + "1" option.Params = append(option.Params, epoch) } switch option.ResourceType { case CPU: card := "card" + COMMA + CPU option.Params = append(option.Params, card) return nil case CARD: card := "card" + COMMA + "cuda:0" option.Params = append(option.Params, card) return nil } return errors.New("failed to set params") } func (s *ShuguangAi) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) { var imageUrls []*inference.InferUrl urlReq := &hpcAC.GetInferUrlReq{ ModelName: option.ModelName, Type: option.ModelType, Card: "dcu", } urlResp, err := s.aCRpc.GetInferUrl(ctx, urlReq) if err != nil { return nil, err } imageUrl := &inference.InferUrl{ Url: urlResp.Url, Card: "dcu", } imageUrls = append(imageUrls, imageUrl) clusterWithUrl := &inference.ClusterInferUrl{ ClusterName: s.platform, ClusterType: TYPE_SHUGUANGAI, InferUrls: imageUrls, } return clusterWithUrl, nil } func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { var insList []*inference.DeployInstance params := &hpcAC.GetInstanceServiceListReqParam{ InstanceServiceName: DEPLOY_INSTANCE_PREFIEX, Status: "", TaskType: "", Start: 0, Limit: DEPLOY_INSTANCE_LIMIT, Sort: "desc", } req := &hpcacclient.GetInstanceServiceListReq{ Param: params, } list, err := s.aCRpc.GetInstanceServiceList(ctx, req) if err != nil { return nil, err } if list.Code != "0" { return nil, errors.New(list.Msg) } for _, datum := range list.Data { ins := &inference.DeployInstance{} ins.InstanceName = datum.InstanceServiceName ins.InstanceId = datum.Id ins.ClusterName = s.platform ins.Status = datum.Status ins.InferCard = DCU ins.CreatedTime = datum.CreateTime ins.ClusterType = TYPE_SHUGUANGAI insList = append(insList, ins) } return insList, nil } func (s *ShuguangAi) StartInferDeployInstance(ctx context.Context, id string) bool { req := &hpcAC.StartInstanceServiceReq{ InstanceServiceId: id, } resp, err := s.aCRpc.StartInstanceService(ctx, req) if err != nil || resp.Code != "0" { return false } if resp.Data == id && resp.Code == "0" { return true } return false } func (s *ShuguangAi) StopInferDeployInstance(ctx context.Context, id string) bool { ids := []string{id} req := &hpcAC.StopInstanceServiceReq{ Ids: ids, } resp, err := s.aCRpc.StopInstanceService(ctx, req) if err != nil || resp.Code != "0" { return false } if resp.Code == "0" { return true } return false } func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) { ins := &inference.DeployInstance{} req := &hpcAC.GetInstanceServiceDetailReq{ Id: id, } resp, err := s.aCRpc.GetInstanceServiceDetail(ctx, req) if err != nil || resp.Code != "0" || resp.Data == nil { return nil, err } if resp.Data == nil { return nil, errors.New("GetInferDeployInstance empty") } var url string if resp.Data.Status == constants.Running { url = resp.Data.ContainerPortInfoList[0].AccessUrl } var modelType string var modelName string var card string if resp.Data.Description != "" { str := strings.Split(resp.Data.Description, FORWARD_SLASH) if len(str) == 3 { modelType = str[0] modelName = str[1] card = str[2] } } ins.InstanceName = resp.Data.InstanceServiceName ins.InstanceId = resp.Data.Id ins.ClusterName = s.platform ins.Status = resp.Data.Status ins.InferCard = DCU ins.CreatedTime = resp.Data.CreateTime ins.ClusterType = TYPE_SHUGUANGAI ins.ModelType = modelType ins.ModelName = modelName ins.InferUrl = url ins.InferCard = card return ins, nil } func (s *ShuguangAi) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) { return "", nil } func (s *ShuguangAi) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) { containerPortInfoList := []*hpcAC.ContainerPortInfoList{ { ProtocolType: ProtocolType, ContainerPort: ContainerPort, }, } desc := option.ModelType + FORWARD_SLASH + option.ModelName + FORWARD_SLASH + strings.ToLower(DCU) instanceServiceName := "infer_instance" + UNDERSCORE + utils.TimeString() resourceGroup := "kshdtest" script, ok := ModelNameCmdMap[option.ModelName] if !ok { return "", errors.New("failed to set cmd, ModelName not exist") } param := &hpcAC.CreateParams{ AcceleratorType: strings.ToLower(DCU), ContainerPortInfoList: containerPortInfoList, CpuNumber: 8, Description: desc, //env GpuNumber: 1, ImagePath: "11.11.100.6:5000/dcu/admin/base/jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6", InstanceServiceName: instanceServiceName, MountInfoList: make([]*hpcAC.MountInfoList, 0), //originalVersion RamSize: 10 * RAM_SIZE_1G, //rdma ResourceGroup: resourceGroup, StartScriptActionScope: "all", StartScriptContent: script, //startServiceCommand //taskClassification: "interactive" TaskNumber: 1, TaskType: JUPYTER, TimeoutLimit: "01:00:00", UseStartScript: true, //useStartServiceCommand: false Version: "jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6", } req := &hpcacclient.CreateInstanceServiceReq{ Data: param, } resp, err := s.aCRpc.CreateInstanceService(ctx, req) if err != nil { return "", err } if resp.Code != "0" { return "", errors.New(resp.Msg) } return resp.Data, nil } func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype string) bool { modelPath := "model" + FORWARD_SLASH + name req := &hpcAC.IsExistFileReq{ Path: KUNSHAN_DIR + FORWARD_SLASH + modelPath, } resp, err := s.aCRpc.IsExistFile(ctx, req) if err != nil { return false } if resp.Code != "0" || resp.Data == nil { return false } return resp.Data.Exist } func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) { return nil, nil //var timeout = 5 //var wg sync.WaitGroup //var uwg sync.WaitGroup //wg.Add(3) //uwg.Add(3) //var ch = make(chan *collector.Usage, 2) //var qCh = make(chan *collector.Usage, 2) //var sch = make(chan *collector.Usage, 1) //var cresCh = make(chan *collector.ClusterResource) // //resUsage := &collector.ResourceSpec{ // ClusterId: strconv.FormatInt(s.participantId, 10), //} // //var resources []interface{} // //// 查询用户可访问队列 //go func() { // defer wg.Done() // defer close(ch) // done := make(chan bool) // go func() { // defer uwg.Done() // queueResp, err := s.aCRpc.SelectQueueByUser(ctx, nil) // if err != nil { // done <- true // return // } // // if len(queueResp.Data) == 0 { // done <- true // return // } // // var data *hpcAC.QueueData // for _, datum := range queueResp.Data { // if datum.QueueName == RESOURCE_GROUP { // data = datum // break // } // } // // //rate // queChargeRate, _ := strconv.ParseFloat(data.QueChargeRate, 64) // rate := &collector.Usage{ // Type: strings.ToUpper(RATE), // Total: &collector.UnitValue{Unit: PERHOUR, Value: queChargeRate}, // } // // cresCh <- &collector.ClusterResource{Resource: rate} // // var freeNodes int64 // var cpuPerNode int64 // var dcuPerNode int64 // freeNodes, _ = strconv.ParseInt(data.QueFreeNodes, 10, 10) // cpuPerNode, _ = strconv.ParseInt(data.QueMaxPPN, 10, 10) // dcuPerNode, _ = strconv.ParseInt(data.QueMaxDcuPN, 10, 10) // // cpu := &collector.Usage{ // Type: strings.ToUpper(CPU), // Total: &collector.UnitValue{Unit: CPUCORE, Value: freeNodes * cpuPerNode}, // } // // ch <- cpu // // dcu := &collector.Usage{ // Type: DCU, // Name: Z100L, // Total: &collector.UnitValue{Unit: NUMBER, Value: freeNodes * dcuPerNode}, // } // // ch <- dcu // // done <- true // }() // // select { // case <-done: // return // case <-time.After(time.Duration(timeout) * time.Second): // return // } // //}() // //// 查询实时作业列表 //go func() { // defer wg.Done() // defer close(qCh) // done := make(chan bool) // go func() { // defer uwg.Done() // jobList, err := s.aCRpc.ListJob(ctx, nil) // if err != nil { // done <- true // return // } // // // running task num // run := &collector.Usage{} // run.Type = strings.ToUpper(RUNNINGTASK) // // if len(jobList.Jobs) == 0 { // var v int64 // run.Total = &collector.UnitValue{ // Unit: NUMBER, // Value: v, // } // // cresCh <- &collector.ClusterResource{Resource: run} // // done <- true // return // } else { // var v int64 // v = int64(len(jobList.Jobs)) // run.Total = &collector.UnitValue{ // Unit: NUMBER, // Value: v, // } // // cresCh <- &collector.ClusterResource{Resource: run} // } // // var cpureqed atomic.Int64 // var dcureqed atomic.Int64 // //var jwg sync.WaitGroup // //for _, j := range jobList.Jobs { // // jwg.Add(1) // // job := j // // go func() { // // defer jwg.Done() // // h := http.Request{} // // jreq := &hpcAC.JobDetailReq{ // // JobId: job.JobId, // // } // // detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq) // // if err != nil || detail.Data == nil { // // return // // } // // // // cpureqed.Add(int64(detail.Data.ProcNumReq)) // // dcureqed.Add(int64(detail.Data.DcuNumReq)) // // }() // //} // //jwg.Wait() // // for v := range ch { // switch v.Type { // case strings.ToUpper(CPU): // t, _ := v.Total.Value.(int64) // avail := t - cpureqed.Load() // cpu := &collector.Usage{ // Type: strings.ToUpper(CPU), // Name: v.Name, // Total: v.Total, // Available: &collector.UnitValue{Unit: CPUCORE, Value: avail}, // } // // qCh <- cpu // // case DCU: // t, _ := v.Total.Value.(int64) // avail := t - dcureqed.Load() // dcu := &collector.Usage{ // Type: DCU, // Name: v.Name, // Total: v.Total, // Available: &collector.UnitValue{Unit: CPUCORE, Value: avail}, // } // // qCh <- dcu // } // } // done <- true // }() // // select { // case <-done: // return // case <-time.After(time.Duration(timeout) * time.Second): // return // } //}() // //// 查询用户共享存储配额及使用量 //go func() { // defer wg.Done() // defer close(sch) // done := make(chan bool) // storage := &collector.Usage{} // go func() { // // diskReq := &hpcAC.ParaStorQuotaReq{} // diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) // if err != nil || diskResp.Data == nil { // done <- true // return // } // // totalStorage := common.RoundFloat(diskResp.Data[0].Threshold, 0) // availStorage := common.RoundFloat((diskResp.Data[0].Threshold - diskResp.Data[0].Usage), 0) // // storage.Type = STORAGE // storage.Name = DISK // storage.Total = &collector.UnitValue{ // Unit: GIGABYTE, // Value: totalStorage, // } // storage.Available = &collector.UnitValue{ // Unit: GIGABYTE, // Value: availStorage, // } // // done <- true // }() // // select { // case <-done: // sch <- storage // case <-time.After(time.Duration(timeout) * time.Second): // return // } //}() // //// 查询用户信息 //go func() { // defer uwg.Done() // done := make(chan bool) // cres := &collector.ClusterResource{} // go func() { // userReq := &hpcAC.GetUserInfoReq{} // userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq) // if err != nil || userinfo.Data == nil { // done <- true // return // } // balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) // bal := &collector.Usage{} // bal.Type = strings.ToUpper(BALANCE) // bal.Total = &collector.UnitValue{ // Unit: RMB, // Value: balance, // } // cres.Resource = bal // // done <- true // }() // // select { // case <-done: // cresCh <- cres // case <-time.After(time.Duration(timeout) * time.Second): // return // } //}() // //go func() { // uwg.Wait() // close(cresCh) //}() // //for v := range cresCh { // resources = append(resources, v) //} // //wg.Wait() // //cres := &collector.ClusterResource{} //bres := make([]*collector.Usage, 0) //if len(qCh) == 0 { // for v := range ch { // v.Available = v.Total // switch v.Type { // case DCU: // cres.Resource = v // case strings.ToUpper(CPU): // bres = append(bres, v) // } // } //} else { // for v := range qCh { // switch v.Type { // case DCU: // cres.Resource = v // case strings.ToUpper(CPU): // bres = append(bres, v) // } // } //} // //// temporarily set memory usage ////var dcuNum int //// ////mem := &collector.Usage{ //// Type: strings.ToUpper(MEMORY), //// Name: strings.ToUpper(RAM), //// Total: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, //// Available: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, ////} ////vmem := &collector.Usage{ //// Type: strings.ToUpper(MEMORY), //// Name: strings.ToUpper(VRAM), //// Total: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, //// Available: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, ////} ////bres = append(bres, mem) ////bres = append(bres, vmem) // //for v := range sch { // bres = append(bres, v) //} // //cres.BaseResources = bres //resources = append(resources, cres) //resUsage.Resources = resources // //return resUsage, nil }