diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index 7276e820..8e1bfff8 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -34,6 +34,8 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" + "gitlink.org.cn/JointCloud/pcm-openi/model" + "strconv" "sync" ) @@ -391,6 +393,16 @@ func convertType(in interface{}) (*AiResult, error) { result.JobId = resp.Metadata.Id } + return &result, nil + case *model.CreateTask: + resp := (in).(*model.CreateTask) + + if resp.Code != 0 { + result.Msg = resp.Msg + } else { + result.JobId = strconv.Itoa(resp.Data.Id) + } + return &result, nil default: return nil, errors.New("ai task response failed") diff --git a/internal/scheduler/service/aiService.go b/internal/scheduler/service/aiService.go index 1a26c82a..014cc323 100644 --- a/internal/scheduler/service/aiService.go +++ b/internal/scheduler/service/aiService.go @@ -91,7 +91,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st inferenceMap[c.Id] = sgai case OPENI: id, _ := strconv.ParseInt(c.Id, 10, 64) - openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token) + openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token) collectorMap[c.Id] = openi executorMap[c.Id] = openi inferenceMap[c.Id] = openi diff --git a/internal/storeLink/openi.go b/internal/storeLink/openi.go index df53e3f9..10dfe6fb 100644 --- a/internal/storeLink/openi.go +++ b/internal/storeLink/openi.go @@ -5,10 +5,12 @@ import ( "context" "encoding/json" "errors" + openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-openi/common" "gitlink.org.cn/JointCloud/pcm-openi/model" "mime/multipart" @@ -27,11 +29,24 @@ const ( TESTREPO = "testrepo" ) +const ( + CreationRequirelUrl = "/api/v1/task/creationRequired" + TaskCreatelUrl = "/api/v1/task/create" + ReposUrl = "/api/v1/user/repos" + TaskListUrl = "/api/v1/task/list" +) + // compute source var ( ComputeSource = []string{"GPU", "NPU", "GCU", "MLU", "DCU", "CPU", "ILUVATAR-GPGPU", "METAX-GPGPU"} ) +type ResourceSpecOpenI struct { + ResType string + Name string + Number int64 +} + type OpenI struct { participantId int64 host string @@ -54,6 +69,99 @@ func (o OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) ( case executor.SUBMIT_MODE_STORAGE_SCHEDULE: + var repoName string + + codePaths := strings.Split(option.AlgorithmId, FORWARD_SLASH) + if len(codePaths) != 3 { + return nil, errors.New("algorithmId format is incorrect") + } + repoName = codePaths[0] + + spec := &ResourceSpecOpenI{} + for _, res := range option.ResourcesRequired { + typeName, ok := res["type"] + if !ok { + continue + } + name, ok := res["name"] + if !ok { + continue + } + for _, s := range ComputeSource { + switch typeName { + case s: + num, ok := res["number"] + if !ok { + continue + } + n := openIcom.ConvertTypeToString(num) + val, err := strconv.ParseInt(n, 10, 64) + if err != nil { + return nil, err + } + spec.ResType = s + spec.Name = openIcom.ConvertTypeToString(name) + spec.Number = val + break + } + } + } + + if spec.ResType == "" || spec.Name == "" { + return nil, errors.New("resource spec not found") + } + + creationRequirelUrl := o.host + CreationRequirelUrl + + param := model.TaskCreationRequiredParam{ + UserName: o.userName, + RepoName: repoName, + JobType: TRAIN, + ComputeSource: spec.ResType, + ClusterType: C2NET, + } + + b, _ := json.Marshal(param) + byt := bytes.NewBuffer(b) + + resp := struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data model.TaskCreationRequired `json:"data"` + }{} + + req := common.GetRestyRequest(common.TIMEOUT) + r, _ := http.NewRequest("GET", creationRequirelUrl, byt) + req.RawRequest = r + req.URL = creationRequirelUrl + + _, err := req. + SetHeader("Content-Type", "application/json"). + SetQueryParam(common.ACCESSTOKEN, o.accessToken). + SetBody(byt). + SetResult(&resp). + Send() + + if err != nil { + return nil, errors.New("failed to invoke TaskCreationRequired") + } + + if len(resp.Data.Data.Specs.All) == 0 { + return nil, errors.New("TaskCreationRequired specs are empty") + } + + for _, s := range resp.Data.Data.Specs.All { + if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType { + if int(spec.Number) == s.AccCardsNum { + option.ResourceId = strconv.Itoa(s.Id) + break + } + } + } + + if option.ResourceId == "" { + return nil, errors.New("can not find spec Id") + } } task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) @@ -64,12 +172,41 @@ func (o OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) ( } func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { - taskCreatelUrl := o.host + "/api/v1/task/create" + taskCreatelUrl := o.host + TaskCreatelUrl + var repoName string + var branchName string + var bootFile string + codePaths := strings.Split(algorithmId, FORWARD_SLASH) + if len(codePaths) != 3 { + return nil, errors.New("algorithmId format is incorrect") + } - taskParam := &model.CreateTaskParam{} + specId, err := strconv.ParseInt(resourceId, 10, 0) + if err != nil { + return nil, err + } + repoName = codePaths[0] + branchName = codePaths[1] + bootFile = codePaths[2] + + //params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}" + + taskParam := &model.CreateTaskParam{ + JobType: TRAIN, + Cluster: C2NET, + DisplayJobName: TRAIN + UNDERSCORE + utils.RandomString(10), + ComputeSource: "", + SpecId: int(specId), + BranchName: branchName, + ImageId: imageId, + DatasetUuidStr: datasetsId, + BootFile: bootFile, + HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网 + WorkServerNumber: 1, // 运行节点数 + } param := model.CreateTaskReq{ UserName: o.userName, - RepoName: algorithmId, + RepoName: repoName, CreateTaskParam: taskParam, } @@ -77,9 +214,9 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs byt := bytes.NewBuffer(b) resp := struct { - Code int `json:"code"` - Msg string `json:"msg"` - Data model.CreateTask `json:"data"` + Code int `json:"code"` + Msg string `json:"msg"` + Data *model.CreateTask `json:"data"` }{} req := common.GetRestyRequest(common.TIMEOUT) @@ -87,7 +224,7 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs req.RawRequest = r req.URL = taskCreatelUrl - _, err := req. + _, err = req. SetHeader("Content-Type", "application/json"). SetQueryParam(common.ACCESSTOKEN, o.accessToken). SetBody(byt). @@ -173,9 +310,9 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e res := &collector.ResourceSpec{ ClusterId: strconv.FormatInt(o.participantId, 10), } - creationRequirelUrl := o.host + "/api/v1/task/creationRequired" - reposUrl := o.host + "/api/v1/user/repos" - taskListUrl := o.host + "/api/v1/task/list" + creationRequirelUrl := o.host + CreationRequirelUrl + reposUrl := o.host + ReposUrl + taskListUrl := o.host + TaskListUrl //taskDetailsUrl := o.host + "/api/v1/task/detail" var wg sync.WaitGroup