Browse Source

updated openi

pull/378/head
tzwang 10 months ago
parent
commit
03eca58eb8
3 changed files with 160 additions and 11 deletions
  1. +12
    -0
      internal/scheduler/schedulers/aiScheduler.go
  2. +1
    -1
      internal/scheduler/service/aiService.go
  3. +147
    -10
      internal/storeLink/openi.go

+ 12
- 0
internal/scheduler/schedulers/aiScheduler.go View File

@@ -34,6 +34,8 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"gitlink.org.cn/JointCloud/pcm-openi/model"
"strconv"
"sync"
)

@@ -391,6 +393,16 @@ func convertType(in interface{}) (*AiResult, error) {
result.JobId = resp.Metadata.Id
}

return &result, nil
case *model.CreateTask:
resp := (in).(*model.CreateTask)

if resp.Code != 0 {
result.Msg = resp.Msg
} else {
result.JobId = strconv.Itoa(resp.Data.Id)
}

return &result, nil
default:
return nil, errors.New("ai task response failed")


+ 1
- 1
internal/scheduler/service/aiService.go View File

@@ -91,7 +91,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
inferenceMap[c.Id] = sgai
case OPENI:
id, _ := strconv.ParseInt(c.Id, 10, 64)
openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token)
openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token)
collectorMap[c.Id] = openi
executorMap[c.Id] = openi
inferenceMap[c.Id] = openi


+ 147
- 10
internal/storeLink/openi.go View File

@@ -5,10 +5,12 @@ import (
"context"
"encoding/json"
"errors"
openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-openi/common"
"gitlink.org.cn/JointCloud/pcm-openi/model"
"mime/multipart"
@@ -27,11 +29,24 @@ const (
TESTREPO = "testrepo"
)

// REST endpoint paths of the OpenI service. Each one is appended to the
// configured host address to build the full request URL.

// CreationRequirelUrl is the path queried for the prerequisites of creating a
// task, including the resource specs that are available.
const CreationRequirelUrl = "/api/v1/task/creationRequired"

// TaskCreatelUrl is the path used to submit a new task.
const TaskCreatelUrl = "/api/v1/task/create"

// ReposUrl is the path of the user repositories endpoint.
const ReposUrl = "/api/v1/user/repos"

// TaskListUrl is the path of the task listing endpoint.
const TaskListUrl = "/api/v1/task/list"

// ComputeSource lists the compute source type names that a resource
// requirement's "type" value is matched against when a resource spec is
// selected.
var ComputeSource = []string{
	"GPU",
	"NPU",
	"GCU",
	"MLU",
	"DCU",
	"CPU",
	"ILUVATAR-GPGPU",
	"METAX-GPGPU",
}

// ResourceSpecOpenI describes the compute resource picked out of a task's
// resource requirements; it is compared against the specs returned by OpenI
// to find the matching spec id.
type ResourceSpecOpenI struct {
	// ResType is the compute source type (e.g. "GPU"); Name is the value of
	// the requirement's "name" entry, matched against a spec's card type.
	ResType, Name string
	// Number is the value parsed from the requirement's "number" entry,
	// matched against a spec's card count.
	Number int64
}

type OpenI struct {
participantId int64
host string
@@ -54,6 +69,99 @@ func (o OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (

case executor.SUBMIT_MODE_STORAGE_SCHEDULE:

var repoName string

codePaths := strings.Split(option.AlgorithmId, FORWARD_SLASH)
if len(codePaths) != 3 {
return nil, errors.New("algorithmId format is incorrect")
}
repoName = codePaths[0]

spec := &ResourceSpecOpenI{}
for _, res := range option.ResourcesRequired {
typeName, ok := res["type"]
if !ok {
continue
}
name, ok := res["name"]
if !ok {
continue
}
for _, s := range ComputeSource {
switch typeName {
case s:
num, ok := res["number"]
if !ok {
continue
}
n := openIcom.ConvertTypeToString(num)
val, err := strconv.ParseInt(n, 10, 64)
if err != nil {
return nil, err
}
spec.ResType = s
spec.Name = openIcom.ConvertTypeToString(name)
spec.Number = val
break
}
}
}

if spec.ResType == "" || spec.Name == "" {
return nil, errors.New("resource spec not found")
}

creationRequirelUrl := o.host + CreationRequirelUrl

param := model.TaskCreationRequiredParam{
UserName: o.userName,
RepoName: repoName,
JobType: TRAIN,
ComputeSource: spec.ResType,
ClusterType: C2NET,
}

b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)

resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskCreationRequired `json:"data"`
}{}

req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
req.RawRequest = r
req.URL = creationRequirelUrl

_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()

if err != nil {
return nil, errors.New("failed to invoke TaskCreationRequired")
}

if len(resp.Data.Data.Specs.All) == 0 {
return nil, errors.New("TaskCreationRequired specs are empty")
}

for _, s := range resp.Data.Data.Specs.All {
if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
if int(spec.Number) == s.AccCardsNum {
option.ResourceId = strconv.Itoa(s.Id)
break
}
}
}

if option.ResourceId == "" {
return nil, errors.New("can not find spec Id")
}
}

task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
@@ -64,12 +172,41 @@ func (o OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (
}

func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
taskCreatelUrl := o.host + "/api/v1/task/create"
taskCreatelUrl := o.host + TaskCreatelUrl
var repoName string
var branchName string
var bootFile string
codePaths := strings.Split(algorithmId, FORWARD_SLASH)
if len(codePaths) != 3 {
return nil, errors.New("algorithmId format is incorrect")
}

taskParam := &model.CreateTaskParam{}
specId, err := strconv.ParseInt(resourceId, 10, 0)
if err != nil {
return nil, err
}
repoName = codePaths[0]
branchName = codePaths[1]
bootFile = codePaths[2]

//params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}"

taskParam := &model.CreateTaskParam{
JobType: TRAIN,
Cluster: C2NET,
DisplayJobName: TRAIN + UNDERSCORE + utils.RandomString(10),
ComputeSource: "",
SpecId: int(specId),
BranchName: branchName,
ImageId: imageId,
DatasetUuidStr: datasetsId,
BootFile: bootFile,
HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
WorkServerNumber: 1, // 运行节点数
}
param := model.CreateTaskReq{
UserName: o.userName,
RepoName: algorithmId,
RepoName: repoName,
CreateTaskParam: taskParam,
}

@@ -77,9 +214,9 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs
byt := bytes.NewBuffer(b)

resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.CreateTask `json:"data"`
Code int `json:"code"`
Msg string `json:"msg"`
Data *model.CreateTask `json:"data"`
}{}

req := common.GetRestyRequest(common.TIMEOUT)
@@ -87,7 +224,7 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs
req.RawRequest = r
req.URL = taskCreatelUrl

_, err := req.
_, err = req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
@@ -173,9 +310,9 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e
res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10),
}
creationRequirelUrl := o.host + "/api/v1/task/creationRequired"
reposUrl := o.host + "/api/v1/user/repos"
taskListUrl := o.host + "/api/v1/task/list"
creationRequirelUrl := o.host + CreationRequirelUrl
reposUrl := o.host + ReposUrl
taskListUrl := o.host + TaskListUrl
//taskDetailsUrl := o.host + "/api/v1/task/detail"

var wg sync.WaitGroup


Loading…
Cancel
Save