Browse Source

Merge pull request 'update octopushttp' (#541) from tzwang/pcm-coordinator:master into master

pull/542/head
tzwang 2 months ago
parent
commit
3e6daf350d
3 changed files with 399 additions and 92 deletions
  1. +391
    -54
      internal/storeLink/octopusHttp/octopusHttp.go
  2. +7
    -37
      internal/storeLink/octopusHttp/token.go
  3. +1
    -1
      internal/storeLink/openi.go

+ 391
- 54
internal/storeLink/octopusHttp/octopusHttp.go View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
common2 "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
@@ -99,9 +98,11 @@ type OctopusHttp struct {
}

func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) *OctopusHttp {
token, err := NewToken(server, host, user, pwd)
if err != nil {
logx.Infof("Init OctopusHttp, id: %d, host: %s, token error: %s \n", id, host, err)
token := &Token{
user: user,
pwd: pwd,
server: server,
host: host,
}
return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}
}
@@ -168,7 +169,21 @@ func (o *OctopusHttp) Execute(ctx context.Context, option *option.AiOption, mode
//}

// resource
option.ResourceId = "964fdee2db544928bfea74dac12a924f"
resp, err := o.resourceSpecs(ctx)
if err != nil {
return nil, err
}

id, err := matchResource(resp, option.ResourcesRequired)
if err != nil {
return nil, err
}

if id == nil {
return nil, errors.New("resource id is nil")
}

option.ResourceId = *id

// submit
task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
@@ -565,7 +580,7 @@ func (o *OctopusHttp) GetResourceSpecs(ctx context.Context, resrcType string) (*
if err != nil {
return nil, err
}
clusterResources, err := genSpecs(specs, resrcType)
clusterResources, err := genSpecs(specs, resrcType, nil)
if err != nil {
return nil, err
}
@@ -576,7 +591,88 @@ func (o *OctopusHttp) GetResourceSpecs(ctx context.Context, resrcType string) (*
return res, nil
}

func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{}, error) {
func findSpecId(cType string, cNum string, s *omodel.Spec, resourcesRequired []map[string]interface{}) (*string, error) {
var id string
for _, res := range resourcesRequired {
//typeName, ok := res["type"]
//if !ok {
// continue
//}
name, ok := res["name"]
if !ok {
continue
}
if str, ok := name.(string); ok {
name = strings.ToLower(str)
} else {
continue
}

num, ok := res["number"]
if !ok {
continue
}
if str, ok := num.(string); ok {
num = strings.ToLower(str)
} else {
continue
}

if cType == name && cNum == num {
id = s.Id
return &id, nil
}
}

return nil, nil
}

func matchResource(resp *entity.OctResp, resourcesRequired []map[string]interface{}) (*string, error) {
if resp.Code != http.StatusOK {
if resp.Data != nil {
marshal, err := json.Marshal(resp.Data)
if err != nil {
return nil, err
}

errormdl := &omodel.Error{}
err = json.Unmarshal(marshal, errormdl)
if err != nil {
return nil, err
}
return nil, errors.New(errormdl.Message)
}
} else {
if resp.Data != nil {
spec := &entity.OctResourceSpecs{}
marshal, err := json.Marshal(resp.Data)
if err != nil {
return nil, err
}
err = json.Unmarshal(marshal, spec)
if err != nil {
return nil, err
}

res, err := genSpecs(spec, "Train", resourcesRequired)
if err != nil {
return nil, err
}

if len(res) != 1 {
return nil, errors.New("resource not found")
}

if str, ok := res[0].(*string); ok {
return str, nil
}
}
}

return nil, errors.New("matchResource failed")
}

func genSpecs(specs *entity.OctResourceSpecs, resrcType string, resourcesRequired []map[string]interface{}) ([]interface{}, error) {
res := make([]interface{}, 0)
if resrcType == "Inference" {
return res, nil
@@ -594,7 +690,8 @@ func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{},
if err != nil {
return nil, err
}
resType, err := chooseResourceType(spec)

resType, err := chooseResourceType(spec, resourcesRequired)
if err != nil {
return nil, err
}
@@ -609,156 +706,396 @@ func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{},
return res, nil
}

func chooseResourceType(spec *omodel.Spec) (*collector.ClusterResource, error) {
func chooseResourceType(spec *omodel.Spec, resourcesRequired []map[string]interface{}) (interface{}, error) {
if spec.ResourceQuantity.NvidiaA100 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaA100")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec)
if err != nil {
return nil, err

var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.NvidiaA100, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec)
if err != nil {
return nil, err
}
cres = res
}

return cres, nil
} else if spec.ResourceQuantity.NvidiaA10080G != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaA10080G")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA10080G, spec)
if err != nil {
return nil, err

var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.NvidiaA10080G, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA10080G, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA10080G, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.MrV100 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MrV100")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.MrV100, spec)
if err != nil {
return nil, err

var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.MrV100, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.MrV100, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.MrV100, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.BiV100 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "BiV100")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.BiV100, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.BiV100, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.BiV100, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.BiV100, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.MRV50 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MRV50")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.MRV50, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.MRV50, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.MRV50, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.MRV50, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.BIV100 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaA100")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.NvidiaA100, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.BIV150 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "BIV150")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.BIV150, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.BIV150, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.BIV150, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.BIV150, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.MRV100 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MRV100")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.MRV100, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.MRV100, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.MRV100, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.MRV100, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.CambriconComMlu != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "CambriconComMlu")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.CambriconComMlu, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.CambriconComMlu, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.CambriconComMlu, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.CambriconComMlu, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.HygonComDcu != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "HygonComDcu")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.HygonComDcu, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.HygonComDcu, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.HygonComDcu, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.HygonComDcu, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.HuaweiComAscend910 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "HuaweiComAscend910")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.HuaweiComAscend910, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.HuaweiComAscend910, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.HuaweiComAscend910, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.HuaweiComAscend910, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.EnflameComGcu != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "EnflameComGcu")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.EnflameComGcu, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.EnflameComGcu, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.EnflameComGcu, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.EnflameComGcu, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.MXN260 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MXN260")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.MXN260, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.MXN260, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.MXN260, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.MXN260, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.NvidiaV100 != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "NvidiaV100")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaV100, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.NvidiaV100, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaV100, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaV100, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
} else if spec.ResourceQuantity.MetaxTechComGpu != "" {
tag, err := common2.GetJSONTag(spec.ResourceQuantity, "MetaxTechComGpu")
if err != nil {
return nil, err
}
cres, err := genClusterResources(tag, spec.ResourceQuantity.MetaxTechComGpu, spec)
if err != nil {
return nil, err
var cres interface{}
if resourcesRequired != nil {
id, err := findSpecId(tag, spec.ResourceQuantity.MetaxTechComGpu, spec, resourcesRequired)
if err != nil {
return nil, err
}
if id != nil {
cres = id
}
} else {
res, err := genClusterResources(tag, spec.ResourceQuantity.MetaxTechComGpu, spec)
if err != nil {
return nil, err
}
cres = res
}
//cres, err := genClusterResources(tag, spec.ResourceQuantity.MetaxTechComGpu, spec)
//if err != nil {
// return nil, err
//}
return cres, nil
}



+ 7
- 37
internal/storeLink/octopusHttp/token.go View File

@@ -3,7 +3,6 @@ package octopusHttp
import (
"crypto/tls"
"encoding/json"
"errors"
"github.com/go-resty/resty/v2"
"time"
)
@@ -29,54 +28,25 @@ type Login struct {
}

type Token struct {
server string
host string
user string
pwd string
tokenUrl string
ttp *TokenTimePair
token string
}

func NewToken(server, host, user, pwd string) (*Token, error) {
login := Login{
Username: user,
Password: pwd,
}
jsonStr, _ := json.Marshal(login)
tokenUrl := server + GetToken

token, err := generateToken(jsonStr, tokenUrl, host)
if err != nil {
return &Token{token: "", host: host, user: user, pwd: pwd, tokenUrl: tokenUrl}, err
}
return &Token{token: token, host: host, user: user, pwd: pwd, tokenUrl: tokenUrl}, nil
server string
host string
user string
pwd string
}

func (t *Token) Get() (string, error) {
if t.token == "" {
err := t.Update()
if err != nil {
return "", errors.New("get token failed")
}
}
return t.token, nil
}

func (t *Token) Update() error {
login := Login{
Username: t.user,
Password: t.pwd,
}
jsonStr, _ := json.Marshal(login)
tokenUrl := t.server + GetToken

token, err := generateToken(jsonStr, tokenUrl, t.host)
if err != nil {
return errors.New("get token failed")
return "", err
}

t.token = token
return nil
return token, nil
}

func generateToken(jsonStr []byte, tokenUrl string, host string) (string, error) {


+ 1
- 1
internal/storeLink/openi.go View File

@@ -563,7 +563,7 @@ func (o *OpenI) SubmitInferTask(ctx context.Context, imageId string, cmd string,
Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
JobType: ONLINEINFERENCE,
Cluster: C2NET,
DisplayJobName: ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10),
DisplayJobName: strings.ToLower(ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10)),
ComputeSource: computeSource,
SpecId: int(specId),
BranchName: branchName,


Loading…
Cancel
Save