Browse Source

update octopusHttp service

pull/500/head
tzwang 5 months ago
parent
commit
4498264c4e
4 changed files with 310 additions and 69 deletions
  1. +28
    -0
      internal/scheduler/common/common.go
  2. +4
    -1
      internal/scheduler/service/aiService.go
  3. +263
    -56
      internal/storeLink/octopusHttp/octopusHttp.go
  4. +15
    -12
      internal/storeLink/octopusHttp/token.go

+ 28
- 0
internal/scheduler/common/common.go View File

@@ -16,10 +16,13 @@ package common

import (
"encoding/json"
"fmt"
"github.com/go-resty/resty/v2"
"math"
"math/rand"
"reflect"
"strconv"
"strings"
"time"
)

@@ -158,3 +161,28 @@ func ConvertTypeToString(v interface{}) string {
return ""
}
}

// GetJSONTag returns the JSON key name declared for the field fieldName of
// structObj, i.e. the part of the field's `json` struct tag that precedes any
// comma-separated options such as "omitempty".
//
// structObj may be a struct value or a (possibly nested) pointer to a struct.
// When the field has no `json` tag, or the tag carries only options
// (for example `json:",omitempty"`), the Go field name is returned unchanged,
// which is the key encoding/json uses by default.
//
// An error is returned when structObj is nil or not a struct, or when the
// struct has no field called fieldName.
func GetJSONTag(structObj interface{}, fieldName string) (string, error) {
	t := reflect.TypeOf(structObj)

	// Unwrap pointers so callers can pass either T or *T. reflect.TypeOf(nil)
	// yields a nil Type, on which Kind() would panic, hence the nil guard.
	for t != nil && t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t == nil || t.Kind() != reflect.Struct {
		return "", fmt.Errorf("expected a struct, got %T", structObj)
	}

	field, found := t.FieldByName(fieldName)
	if !found {
		return "", fmt.Errorf("field '%s' not found", fieldName)
	}

	// Keep only the name part of the tag; options follow the first comma.
	name := strings.Split(field.Tag.Get("json"), ",")[0]
	if name == "" {
		// No tag, or an options-only tag: fall back to the Go field name.
		return field.Name, nil
	}
	return name, nil
}

+ 4
- 1
internal/scheduler/service/aiService.go View File

@@ -76,7 +76,10 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
switch c.Name {
case OCTOPUS:
id, _ := strconv.ParseInt(c.Id, 10, 64)
octopus := octopusHttp.NewOctopusHttp(id, c.Nickname, c.Server, c.Address, c.Username, c.Password)
octopus, err := octopusHttp.NewOctopusHttp(id, c.Nickname, c.Server, c.Address, c.Username, c.Password)
if err != nil {
panic(err)
}
collectorMap[c.Id] = octopus
executorMap[c.Id] = octopus
inferenceMap[c.Id] = octopus


+ 263
- 56
internal/storeLink/octopusHttp/octopusHttp.go View File

@@ -5,6 +5,8 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
common2 "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
@@ -15,6 +17,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-openi/common"
"mime/multipart"
"net/http"
"strconv"
"strings"
)

@@ -28,6 +31,24 @@ const (
TASK_NAME_PREFIX = "trainJob"
Python = "python "
SemiColon = ";"
BALANCE = "balance"
RATE = "rate"
PERHOUR = "per-hour"
NUMBER = "number"
KILOBYTE = "kb"
GIGABYTE = "gb"
CPUCORE = "core"
STORAGE = "STORAGE"
DISK = "disk"
MEMORY = "memory"
RAM = "ram"
VRAM = "vram"
RMB = "rmb"
POINT = "point"
RUNNINGTASK = "RUNNING_TASK"
RUNNING = "RUNNING"
CPU = "cpu"
Gi = "Gi"
)

const (
@@ -71,9 +92,12 @@ type OctopusHttp struct {
token *Token
}

func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) *OctopusHttp {
token, _ := NewToken(host, user, pwd)
return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}
func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) (*OctopusHttp, error) {
token, err := NewToken(server, host, user, pwd)
if err != nil {
return nil, err
}
return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}, nil
}

// executor
@@ -291,50 +315,50 @@ func (o *OctopusHttp) GetUserBalance(ctx context.Context) (float64, error) {
}

func (o *OctopusHttp) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
//resp, err := o.resourceSpecs(ctx)
//if err != nil {
// return nil, err
//}
//
//res := &collector.ResourceSpec{
// ClusterId: strconv.FormatInt(o.participantId, 10),
// Tag: resrcType,
//}
//
//if resp.Code != http.StatusOK {
// if resp.Data != nil {
// marshal, err := json.Marshal(resp.Data)
// if err != nil {
// return nil, err
// }
//
// errormdl := &omodel.Error{}
// err = json.Unmarshal(marshal, errormdl)
// if err != nil {
// return nil, err
// }
// return nil, errors.New(errormdl.Message)
// }
//} else {
// if resp.Data != nil {
// specs := &entity.OctResourceSpecs{}
// marshal, err := json.Marshal(resp.Data)
// if err != nil {
// return nil, err
// }
// err = json.Unmarshal(marshal, specs)
// if err != nil {
// return nil, err
// }
// clusterResources, err := genSpecs(specs, resrcType)
// if err != nil {
// return nil, err
// }
// res.Resources = clusterResources
// }
//}
resp, err := o.resourceSpecs(ctx)
if err != nil {
return nil, err
}

return nil, nil
res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10),
Tag: resrcType,
}

if resp.Code != http.StatusOK {
if resp.Data != nil {
marshal, err := json.Marshal(resp.Data)
if err != nil {
return nil, err
}

errormdl := &omodel.Error{}
err = json.Unmarshal(marshal, errormdl)
if err != nil {
return nil, err
}
return nil, errors.New(errormdl.Message)
}
} else {
if resp.Data != nil {
specs := &entity.OctResourceSpecs{}
marshal, err := json.Marshal(resp.Data)
if err != nil {
return nil, err
}
err = json.Unmarshal(marshal, specs)
if err != nil {
return nil, err
}
clusterResources, err := genSpecs(specs, resrcType)
if err != nil {
return nil, err
}
res.Resources = clusterResources
}
}

return res, nil
}

func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{}, error) {
@@ -355,24 +379,207 @@ func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{},
if err != nil {
return nil, err
}
if spec.ResourceQuantity.BiV100 != "" {

resType, err := chooseResourceType(spec)
if err != nil {
return nil, err
}
//cres := &collector.ClusterResource{}
//card := &collector.Usage{
// Type: ComputeSource[i],
// Name: strings.ToUpper(k),
// Total: &collector.UnitValue{Unit: spec.ResourceQuantity, Value: v.AccCardsNum},
// Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
//}
//spec.ResourceQuantity.
if resType == nil {
continue
}
res = append(res, resType)
}
}
}

return res, nil
}

// chooseResourceType builds the cluster resource for the first accelerator
// field of spec.ResourceQuantity that holds a non-empty quantity.
//
// Fields are checked in a fixed priority order (see the table below). For the
// first match, the field's JSON tag is used as the card type and the field's
// own value as the card count. It returns (nil, nil) when no accelerator field
// is set, and an error when the tag lookup or the resource conversion fails.
func chooseResourceType(spec *omodel.Spec) (*collector.ClusterResource, error) {
	rq := spec.ResourceQuantity

	// Each entry pairs the Go field name (needed for the tag lookup) with the
	// quantity stored in that same field, so the card type and the card count
	// can never come from different accelerators.
	candidates := []struct {
		field string
		count string
	}{
		{"NvidiaA100", rq.NvidiaA100},
		{"NvidiaA10080G", rq.NvidiaA10080G},
		{"MrV100", rq.MrV100},
		{"BiV100", rq.BiV100},
		{"MRV50", rq.MRV50},
		{"BIV100", rq.BIV100},
		{"BIV150", rq.BIV150},
		{"MRV100", rq.MRV100},
		{"CambriconComMlu", rq.CambriconComMlu},
		{"HygonComDcu", rq.HygonComDcu},
		{"HuaweiComAscend910", rq.HuaweiComAscend910},
		{"EnflameComGcu", rq.EnflameComGcu},
		{"MXN260", rq.MXN260},
	}

	for _, c := range candidates {
		if c.count == "" {
			continue
		}
		// The accelerator fields are declared on ResourceQuantity, not on Spec,
		// so the tag must be looked up on that type.
		// NOTE(review): assumes ResourceQuantity is something GetJSONTag
		// accepts as a struct — confirm against omodel.Spec.
		tag, err := common2.GetJSONTag(rq, c.field)
		if err != nil {
			return nil, err
		}
		cres, err := genClusterResources(tag, c.count, spec)
		if err != nil {
			return nil, err
		}
		return cres, nil
	}

	return nil, nil
}

// genClusterResources converts one resource spec into a ClusterResource.
//
// cType is the accelerator identifier; it is used as the key into
// ComputeSourceToCardType and, upper-cased, as the card name. cNum is the
// accelerator count as a base-10 integer string. The CPU core count and the
// memory size are read from s.ResourceQuantity; memory is an integer with an
// optional "Gi" suffix (for example "32Gi").
//
// Total and Available are both set to the parsed quantity for every resource.
// An error is returned when any of the three quantities is not a valid
// integer.
func genClusterResources(cType string, cNum string, s *omodel.Spec) (*collector.ClusterResource, error) {
	cardNum, err := strconv.ParseInt(cNum, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("parse card count %q: %w", cNum, err)
	}
	cpuCore, err := strconv.ParseInt(s.ResourceQuantity.Cpu, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("parse cpu cores %q: %w", s.ResourceQuantity.Cpu, err)
	}

	// Strip the unit suffix before parsing. Splitting on "Gi" and requiring a
	// single element would reject every value that actually carries the unit,
	// because "32Gi" splits into ["32", ""].
	memGi, err := strconv.ParseInt(strings.TrimSuffix(s.ResourceQuantity.Memory, Gi), 10, 64)
	if err != nil {
		return nil, fmt.Errorf("s.ResourceQuantity.Memory convert error: %s: %w", s.ResourceQuantity.Memory, err)
	}

	card := &collector.Usage{
		Type:      ComputeSourceToCardType[cType],
		Name:      strings.ToUpper(cType),
		Total:     &collector.UnitValue{Unit: NUMBER, Value: cardNum},
		Available: &collector.UnitValue{Unit: NUMBER, Value: cardNum},
	}
	cpu := &collector.Usage{
		Type:      strings.ToUpper(CPU),
		Name:      strings.ToUpper(CPU),
		Total:     &collector.UnitValue{Unit: CPUCORE, Value: cpuCore},
		Available: &collector.UnitValue{Unit: CPUCORE, Value: cpuCore},
	}
	mem := &collector.Usage{
		Type:      strings.ToUpper(MEMORY),
		Name:      strings.ToUpper(RAM),
		Total:     &collector.UnitValue{Unit: GIGABYTE, Value: memGi},
		Available: &collector.UnitValue{Unit: GIGABYTE, Value: memGi},
	}

	return &collector.ClusterResource{
		Resource:      card,
		BaseResources: []*collector.Usage{cpu, mem},
	}, nil
}

// inference
func (o *OctopusHttp) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
return nil, errors.New(NotImplementError)


+ 15
- 12
internal/storeLink/octopusHttp/token.go View File

@@ -8,7 +8,7 @@ import (
)

const (
GetToken = "openaiserver/v1/authmanage/token"
GetToken = "api/v1/token"
)

type TokenModel struct {
@@ -31,20 +31,23 @@ type Login struct {
}

type Token struct {
ip string
user string
pwd string
ttp *TokenTimePair
server string
host string
user string
pwd string
tokenUrl string
ttp *TokenTimePair
}

func NewToken(ip, user, pwd string) (*Token, error) {
func NewToken(server, host, user, pwd string) (*Token, error) {
login := Login{
Username: user,
Password: pwd,
}
jsonStr, _ := json.Marshal(login)
tokenUrl := ip + Forward_Slash + GetToken
token, tm, err := generateToken(jsonStr, tokenUrl)
tokenUrl := server + GetToken

token, tm, err := generateToken(jsonStr, tokenUrl, host)
if err != nil {
return nil, err
}
@@ -52,7 +55,7 @@ func NewToken(ip, user, pwd string) (*Token, error) {
Token: token,
ExpiredAt: tm,
}
return &Token{ttp: ttp, ip: ip, user: user, pwd: pwd}, nil
return &Token{ttp: ttp, host: host, user: user, pwd: pwd, tokenUrl: tokenUrl}, nil
}

func (t *Token) update() error {
@@ -61,8 +64,7 @@ func (t *Token) update() error {
Password: t.pwd,
}
jsonStr, _ := json.Marshal(login)
tokenUrl := t.ip + Forward_Slash + GetToken
token, tm, err := generateToken(jsonStr, tokenUrl)
token, tm, err := generateToken(jsonStr, t.tokenUrl, t.host)
if err != nil {
return err
}
@@ -84,7 +86,7 @@ func (t *Token) Get() (string, error) {
return t.ttp.Token, nil
}

func generateToken(jsonStr []byte, tokenUrl string) (string, time.Time, error) {
func generateToken(jsonStr []byte, tokenUrl string, host string) (string, time.Time, error) {
client := resty.New().SetTimeout(time.Duration(5) * time.Second)
req := client.R()

@@ -97,6 +99,7 @@ func generateToken(jsonStr []byte, tokenUrl string) (string, time.Time, error) {
_, err := req.
SetResult(&tokenResp).
SetHeader("Content-Type", "application/json").
SetQueryParam("addr", host).
SetBody(jsonStr).
Post(tokenUrl)
if err != nil {


Loading…
Cancel
Save