diff --git a/internal/scheduler/entity/entity.go b/internal/scheduler/entity/entity.go index 9654f887..82bac5ba 100644 --- a/internal/scheduler/entity/entity.go +++ b/internal/scheduler/entity/entity.go @@ -16,3 +16,29 @@ type JsonData struct { Name string `json:"name"` Id string `json:"id"` } + +type OctCreateJobResp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data interface{} `json:"data"` +} + +type OctResourceSpecsResp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data interface{} `json:"data"` +} + +type OctResourceSpecs struct { + MapResourceSpecIdList struct { + Debug struct { + ResourceSpecs []interface{} `json:"resourceSpecs"` + } `json:"debug"` + Deploy struct { + ResourceSpecs []interface{} `json:"resourceSpecs"` + } `json:"deploy"` + Train struct { + ResourceSpecs []interface{} `json:"resourceSpecs"` + } `json:"train"` + } `json:"mapResourceSpecIdList"` +} diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index f1ec62ba..403298ee 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -23,6 +23,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" @@ -35,6 +36,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" + omodel "gitlink.org.cn/JointCloud/pcm-octopus/http/model" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-openi/model" "strconv" @@ -467,6 +469,17 @@ func convertType(in interface{}) (*AiResult, error) { result.JobId = strconv.Itoa(resp.Data.Id) } + return &result, nil + case *entity.OctCreateJobResp: + resp := (in).(entity.OctCreateJobResp) + + if resp.Code != 200 { + result.Msg = resp.Msg + } else { + job := (resp.Data).(*omodel.OctCreateJob) + result.JobId = job.JobId + } + return &result, nil default: return nil, errors.New("ai task response failed") diff --git a/internal/scheduler/service/aiService.go b/internal/scheduler/service/aiService.go index 9dbcafa8..f38c0392 100644 --- a/internal/scheduler/service/aiService.go +++ b/internal/scheduler/service/aiService.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink/octopusHttp" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" @@ -75,8 +76,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st switch c.Name { case OCTOPUS: id, _ := strconv.ParseInt(c.Id, 10, 64) - octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf)) - octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) + octopus := octopusHttp.NewOctopusHttp(id, c.Nickname, c.Server, c.Address, c.Username, c.Password) collectorMap[c.Id] = octopus executorMap[c.Id] = octopus inferenceMap[c.Id] = octopus diff --git a/internal/storeLink/octopusHttp/octopusHttp.go b/internal/storeLink/octopusHttp/octopusHttp.go index 6c46a073..df25a464 100644 --- a/internal/storeLink/octopusHttp/octopusHttp.go +++ b/internal/storeLink/octopusHttp/octopusHttp.go @@ -5,21 +5,30 @@ import ( "context" "encoding/json" "errors" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" omodel "gitlink.org.cn/JointCloud/pcm-octopus/http/model" "gitlink.org.cn/JointCloud/pcm-openi/common" "mime/multipart" "net/http" + "strconv" + "strings" ) const ( - RESOURCE_POOL = "common-pool" - Param_Token = "token" - Param_Addr = "addr" - Octopus = "octopus" - Forward_Slash = "/" + RESOURCE_POOL = "common-pool" + Param_Token = "token" + Param_Addr = "addr" + Forward_Slash = "/" + COMMA = "," + UNDERSCORE = "_" + TASK_NAME_PREFIX = "trainJob" + Python = "python " + SemiColon = ";" ) const ( @@ -27,8 +36,32 @@ const ( ) const ( - MyAlgorithmListUrl = "/api/v1/algorithm/myAlgorithmList" - ResourcespecsUrl = "/api/v1/resource/specs" + MyAlgorithmListUrl = "api/v1/algorithm/myAlgorithmList" + ResourcespecsUrl = "api/v1/resource/specs" + CreateTrainJobUrl = "api/v1/job/create" + TrainJobDetail = "api/v1/job/detail" +) + +// compute source +var ( + ComputeSourceToCardType = map[string]string{ + "nvidia-a100": "GPU", + "nvidia-a100-80g": "GPU", + "mr-v100": "ILUVATAR-GPGPU", + "bi-v100": "ILUVATAR-GPGPU", + "MR-V50": "ILUVATAR-GPGPU", + "BI-V100": "ILUVATAR-GPGPU", + "BI-V150": "ILUVATAR-GPGPU", + "MR-V100": "ILUVATAR-GPGPU", + + "cambricon.com/mlu": "MLU", + "hygon.com/dcu": "DCU", + + "huawei.com/Ascend910": "NPU", + "enflame.com/gcu": "GCU", + "ILUVATAR-GPGPU": "ILUVATAR-GPGPU", + "MXN260": "METAX-GPGPU", + } ) type OctopusHttp struct { @@ -46,8 +79,22 @@ func NewOctopusHttp(id int64, name, server, host string, user string, pwd string // executor func (o *OctopusHttp) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) { - //TODO implement me - panic("implement me") + switch mode { + case executor.SUBMIT_MODE_JOINT_CLOUD: + + case executor.SUBMIT_MODE_STORAGE_SCHEDULE: + // cmd + if option.AlgorithmId != "" { + option.Cmd = option.Cmd + SemiColon + Python + option.AlgorithmId + } + option.ResourceId = "9e2feeae30e04492a4298755179f2ae0" + task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType) + if err != nil { + return nil, err + } + return task, nil + } + return nil, nil } func (o *OctopusHttp) Stop(ctx context.Context, id string) error { @@ -55,9 +102,80 @@ func (o *OctopusHttp) Stop(ctx context.Context, id string) error { panic("implement me") } +func (o *OctopusHttp) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) { + + // octopus提交任务 + reqUrl := o.server + CreateTrainJobUrl + + token, err := o.token.Get() + if err != nil { + return nil, err + } + + // python参数 + var prms []struct { + Key string `json:"key"` + Value string `json:"value"` + } + for _, param := range params { + var p struct { + Key string `json:"key"` + Value string `json:"value"` + } + s := strings.Split(param, COMMA) + p.Key = s[0] + p.Value = s[1] + prms = append(prms, p) + } + + //环境变量 + envMap := make(map[string]string) + for _, env := range envs { + s := strings.Split(env, COMMA) + envMap[s[0]] = s[1] + } + + param := &omodel.CreateTrainJobParam{ + //DataSetId: datasetsId, + //DataSetVersion: VERSION, + //AlgorithmId: algorithmId, + //AlgorithmVersion: VERSION, + Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10), + ImageId: imageId, + IsDistributed: false, + ResourcePool: RESOURCE_POOL, + Config: []*omodel.CreateTrainJobConf{ + { + Command: cmd, + ResourceSpecId: resourceId, + MinFailedTaskCount: 1, + MinSucceededTaskCount: 1, + TaskNumber: 1, + Parameters: prms, + Envs: envMap, + }, + }, + } + + resp := &entity.OctCreateJobResp{} + + req := common.GetRestyRequest(common.TIMEOUT) + _, err = req. + SetHeader("Authorization", "Bearer "+token). + SetBody(param). + SetResult(resp). + Post(reqUrl) + + if err != nil { + return nil, err + } + return resp, nil + +} + // collector -func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { - resourcespecsUrl := o.server + Forward_Slash + ResourcespecsUrl +func (o *OctopusHttp) resourceSpecs(ctx context.Context) (*entity.OctResourceSpecsResp, error) { + resourcespecsUrl := o.server + ResourcespecsUrl token, err := o.token.Get() if err != nil { return nil, err @@ -70,11 +188,7 @@ func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.Resource b, _ := json.Marshal(param) byt := bytes.NewBuffer(b) - resp := struct { - Code int `json:"code"` - Msg string `json:"msg"` - Data interface{} `json:"data"` - }{} + resp := &entity.OctResourceSpecsResp{} req := common.GetRestyRequest(common.TIMEOUT) r, _ := http.NewRequest("GET", resourcespecsUrl, byt) @@ -86,13 +200,21 @@ func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.Resource SetQueryParam(Param_Token, token). SetQueryParam(Param_Addr, o.host). SetBody(byt). - SetResult(&resp). + SetResult(resp). Send() if err != nil { return nil, err } + return resp, nil +} + +func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) { + resp, err := o.resourceSpecs(ctx) + if err != nil { + return nil, err + } if resp.Code != http.StatusOK { if resp.Data != nil { marshal, err := json.Marshal(resp.Data) @@ -109,13 +231,12 @@ func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.Resource } } else { if resp.Data != nil { - spec := omodel.ResourceSpec{} - + spec := &entity.OctResourceSpecs{} marshal, err := json.Marshal(resp.Data) if err != nil { return nil, err } - err = json.Unmarshal(marshal, &spec.Payload) + err = json.Unmarshal(marshal, spec) if err != nil { return nil, err } @@ -126,8 +247,7 @@ func (o *OctopusHttp) GetResourceStats(ctx context.Context) (*collector.Resource } func (o *OctopusHttp) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) { - //TODO implement me - panic("implement me") + return nil, nil } func (o *OctopusHttp) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) { @@ -166,8 +286,86 @@ func (o *OctopusHttp) GetUserBalance(ctx context.Context) (float64, error) { } func (o *OctopusHttp) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) { - //TODO implement me - panic("implement me") + resp, err := o.resourceSpecs(ctx) + if err != nil { + return nil, err + } + + res := &collector.ResourceSpec{ + ClusterId: strconv.FormatInt(o.participantId, 10), + Tag: resrcType, + } + + if resp.Code != http.StatusOK { + if resp.Data != nil { + marshal, err := json.Marshal(resp.Data) + if err != nil { + return nil, err + } + + errormdl := &omodel.Error{} + err = json.Unmarshal(marshal, errormdl) + if err != nil { + return nil, err + } + return nil, errors.New(errormdl.Message) + } + } else { + if resp.Data != nil { + specs := &entity.OctResourceSpecs{} + marshal, err := json.Marshal(resp.Data) + if err != nil { + return nil, err + } + err = json.Unmarshal(marshal, specs) + if err != nil { + return nil, err + } + clusterResources, err := genSpecs(specs, resrcType) + if err != nil { + return nil, err + } + res.Resources = clusterResources + } + } + + return res, nil +} + +func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{}, error) { + res := make([]interface{}, 0) + if resrcType == "Inference" { + return res, nil + } else if resrcType == "Train" { + if specs.MapResourceSpecIdList.Train.ResourceSpecs == nil { + return res, nil + } else { + for _, s := range specs.MapResourceSpecIdList.Train.ResourceSpecs { + spec := &omodel.Spec{} + marshal, err := json.Marshal(s) + if err != nil { + return nil, err + } + err = json.Unmarshal(marshal, specs) + if err != nil { + return nil, err + } + if spec.ResourceQuantity.BiV100 != "" { + + } + //cres := &collector.ClusterResource{} + //card := &collector.Usage{ + // Type: ComputeSource[i], + // Name: strings.ToUpper(k), + // Total: &collector.UnitValue{Unit: spec.ResourceQuantity, Value: v.AccCardsNum}, + // Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum}, + //} + //spec.ResourceQuantity. + } + } + } + + return nil, nil } // inference