From 4498264c4e631d824e13f2e73ccabb6cb2c47cae Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 18 Jun 2025 19:27:24 +0800 Subject: [PATCH] update octopusHttp service --- internal/scheduler/common/common.go | 28 ++ internal/scheduler/service/aiService.go | 5 +- internal/storeLink/octopusHttp/octopusHttp.go | 319 +++++++++++++++--- internal/storeLink/octopusHttp/token.go | 27 +- 4 files changed, 310 insertions(+), 69 deletions(-) diff --git a/internal/scheduler/common/common.go b/internal/scheduler/common/common.go index 424ec21f..1f10bcc9 100644 --- a/internal/scheduler/common/common.go +++ b/internal/scheduler/common/common.go @@ -16,10 +16,13 @@ package common import ( "encoding/json" + "fmt" "github.com/go-resty/resty/v2" "math" "math/rand" + "reflect" "strconv" + "strings" "time" ) @@ -158,3 +161,28 @@ func ConvertTypeToString(v interface{}) string { return "" } } + +func GetJSONTag(structObj interface{}, fieldName string) (string, error) { + // 获取结构体的反射类型 + t := reflect.TypeOf(structObj) + + // 确保传入的是结构体 + if t.Kind() != reflect.Struct { + return "", fmt.Errorf("expected a struct, got %T", structObj) + } + + // 查找字段 + field, found := t.FieldByName(fieldName) + if !found { + return "", fmt.Errorf("field '%s' not found", fieldName) + } + + // 获取 `json` 标签 + jsonTag := field.Tag.Get("json") + if jsonTag == "" { + return field.Name, nil // 默认使用字段名(小写) + } + + // 去掉可能的 `omitempty` 等选项 + return strings.Split(jsonTag, ",")[0], nil +} diff --git a/internal/scheduler/service/aiService.go b/internal/scheduler/service/aiService.go index f38c0392..2c416fda 100644 --- a/internal/scheduler/service/aiService.go +++ b/internal/scheduler/service/aiService.go @@ -76,7 +76,10 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st switch c.Name { case OCTOPUS: id, _ := strconv.ParseInt(c.Id, 10, 64) - octopus := octopusHttp.NewOctopusHttp(id, c.Nickname, c.Server, c.Address, c.Username, c.Password) + octopus, err := octopusHttp.NewOctopusHttp(id, c.Nickname, c.Server, c.Address, c.Username, c.Password) + if err != nil { + panic(err) + } collectorMap[c.Id] = octopus executorMap[c.Id] = octopus inferenceMap[c.Id] = octopus diff --git a/internal/storeLink/octopusHttp/octopusHttp.go b/internal/storeLink/octopusHttp/octopusHttp.go index 6bbf208f..898b5999 100644 --- a/internal/storeLink/octopusHttp/octopusHttp.go +++ b/internal/storeLink/octopusHttp/octopusHttp.go @@ -5,6 +5,8 @@ import ( "context" "encoding/json" "errors" + "fmt" + common2 "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" @@ -15,6 +17,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-openi/common" "mime/multipart" "net/http" + "strconv" "strings" ) @@ -28,6 +31,24 @@ const ( TASK_NAME_PREFIX = "trainJob" Python = "python " SemiColon = ";" + BALANCE = "balance" + RATE = "rate" + PERHOUR = "per-hour" + NUMBER = "number" + KILOBYTE = "kb" + GIGABYTE = "gb" + CPUCORE = "core" + STORAGE = "STORAGE" + DISK = "disk" + MEMORY = "memory" + RAM = "ram" + VRAM = "vram" + RMB = "rmb" + POINT = "point" + RUNNINGTASK = "RUNNING_TASK" + RUNNING = "RUNNING" + CPU = "cpu" + Gi = "Gi" ) const ( @@ -71,9 +92,12 @@ type OctopusHttp struct { token *Token } -func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) *OctopusHttp { - token, _ := NewToken(host, user, pwd) - return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token} +func NewOctopusHttp(id int64, name, server, host string, user string, pwd string) (*OctopusHttp, error) { + token, err := NewToken(server, host, user, pwd) + if err != nil { + return nil, err + } + return &OctopusHttp{platform: name, participantId: id, server: server, host: host, token: token}, nil } // executor @@ -291,50 +315,50 @@ func (o *OctopusHttp) GetUserBalance(ctx context.Context) (float64, error) { } func (o *OctopusHttp) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) { - //resp, err := o.resourceSpecs(ctx) - //if err != nil { - // return nil, err - //} - // - //res := &collector.ResourceSpec{ - // ClusterId: strconv.FormatInt(o.participantId, 10), - // Tag: resrcType, - //} - // - //if resp.Code != http.StatusOK { - // if resp.Data != nil { - // marshal, err := json.Marshal(resp.Data) - // if err != nil { - // return nil, err - // } - // - // errormdl := &omodel.Error{} - // err = json.Unmarshal(marshal, errormdl) - // if err != nil { - // return nil, err - // } - // return nil, errors.New(errormdl.Message) - // } - //} else { - // if resp.Data != nil { - // specs := &entity.OctResourceSpecs{} - // marshal, err := json.Marshal(resp.Data) - // if err != nil { - // return nil, err - // } - // err = json.Unmarshal(marshal, specs) - // if err != nil { - // return nil, err - // } - // clusterResources, err := genSpecs(specs, resrcType) - // if err != nil { - // return nil, err - // } - // res.Resources = clusterResources - // } - //} + resp, err := o.resourceSpecs(ctx) + if err != nil { + return nil, err + } - return nil, nil + res := &collector.ResourceSpec{ + ClusterId: strconv.FormatInt(o.participantId, 10), + Tag: resrcType, + } + + if resp.Code != http.StatusOK { + if resp.Data != nil { + marshal, err := json.Marshal(resp.Data) + if err != nil { + return nil, err + } + + errormdl := &omodel.Error{} + err = json.Unmarshal(marshal, errormdl) + if err != nil { + return nil, err + } + return nil, errors.New(errormdl.Message) + } + } else { + if resp.Data != nil { + specs := &entity.OctResourceSpecs{} + marshal, err := json.Marshal(resp.Data) + if err != nil { + return nil, err + } + err = json.Unmarshal(marshal, specs) + if err != nil { + return nil, err + } + clusterResources, err := genSpecs(specs, resrcType) + if err != nil { + return nil, err + } + res.Resources = clusterResources + } + } + + return res, nil } func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{}, error) { @@ -355,24 +379,207 @@ func genSpecs(specs *entity.OctResourceSpecs, resrcType string) ([]interface{}, if err != nil { return nil, err } - if spec.ResourceQuantity.BiV100 != "" { - + resType, err := chooseResourceType(spec) + if err != nil { + return nil, err } - //cres := &collector.ClusterResource{} - //card := &collector.Usage{ - // Type: ComputeSource[i], - // Name: strings.ToUpper(k), - // Total: &collector.UnitValue{Unit: spec.ResourceQuantity, Value: v.AccCardsNum}, - // Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum}, - //} - //spec.ResourceQuantity. + if resType == nil { + continue + } + res = append(res, resType) } } } + return res, nil +} + +func chooseResourceType(spec *omodel.Spec) (*collector.ClusterResource, error) { + if spec.ResourceQuantity.NvidiaA100 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.NvidiaA10080G != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.MrV100 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.BiV100 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.MRV50 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.BIV100 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.BIV150 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.MRV100 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.CambriconComMlu != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.HygonComDcu != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.HuaweiComAscend910 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.EnflameComGcu != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } else if spec.ResourceQuantity.MXN260 != "" { + tag, err := common2.GetJSONTag(spec, "NvidiaA100") + if err != nil { + return nil, err + } + cres, err := genClusterResources(tag, spec.ResourceQuantity.NvidiaA100, spec) + if err != nil { + return nil, err + } + return cres, nil + } + return nil, nil } +func genClusterResources(cType string, cNum string, s *omodel.Spec) (*collector.ClusterResource, error) { + cres := &collector.ClusterResource{} + bres := make([]*collector.Usage, 0) + + cardNum, err := strconv.ParseInt(cNum, 10, 64) + if err != nil { + return nil, err + } + cpuCore, err := strconv.ParseInt(s.ResourceQuantity.Cpu, 10, 64) + if err != nil { + return nil, err + } + gi := strings.Split(s.ResourceQuantity.Memory, Gi) + if len(gi) != 1 { + return nil, fmt.Errorf("s.ResourceQuantity.Memory convert error: %s", s.ResourceQuantity.Memory) + } + + memGi, err := strconv.ParseInt(gi[0], 10, 64) + if err != nil { + return nil, err + } + + card := &collector.Usage{ + Type: ComputeSourceToCardType[cType], + Name: strings.ToUpper(cType), + Total: &collector.UnitValue{Unit: NUMBER, Value: cardNum}, + Available: &collector.UnitValue{Unit: NUMBER, Value: cardNum}, + } + cpu := &collector.Usage{ + Type: strings.ToUpper(CPU), + Name: strings.ToUpper(CPU), + Total: &collector.UnitValue{Unit: CPUCORE, Value: cpuCore}, + Available: &collector.UnitValue{Unit: CPUCORE, Value: cpuCore}, + } + mem := &collector.Usage{ + Type: strings.ToUpper(MEMORY), + Name: strings.ToUpper(RAM), + Total: &collector.UnitValue{Unit: GIGABYTE, Value: memGi}, + Available: &collector.UnitValue{Unit: GIGABYTE, Value: memGi}, + } + + bres = append(bres, cpu) + bres = append(bres, mem) + + cres.Resource = card + cres.BaseResources = bres + + return cres, nil +} + // inference func (o *OctopusHttp) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) { return nil, errors.New(NotImplementError) diff --git a/internal/storeLink/octopusHttp/token.go b/internal/storeLink/octopusHttp/token.go index 42a86a12..4cd634e5 100644 --- a/internal/storeLink/octopusHttp/token.go +++ b/internal/storeLink/octopusHttp/token.go @@ -8,7 +8,7 @@ import ( ) const ( - GetToken = "openaiserver/v1/authmanage/token" + GetToken = "api/v1/token" ) type TokenModel struct { @@ -31,20 +31,23 @@ type Login struct { } type Token struct { - ip string - user string - pwd string - ttp *TokenTimePair + server string + host string + user string + pwd string + tokenUrl string + ttp *TokenTimePair } -func NewToken(ip, user, pwd string) (*Token, error) { +func NewToken(server, host, user, pwd string) (*Token, error) { login := Login{ Username: user, Password: pwd, } jsonStr, _ := json.Marshal(login) - tokenUrl := ip + Forward_Slash + GetToken - token, tm, err := generateToken(jsonStr, tokenUrl) + tokenUrl := server + GetToken + + token, tm, err := generateToken(jsonStr, tokenUrl, host) if err != nil { return nil, err } @@ -52,7 +55,7 @@ func NewToken(ip, user, pwd string) (*Token, error) { Token: token, ExpiredAt: tm, } - return &Token{ttp: ttp, ip: ip, user: user, pwd: pwd}, nil + return &Token{ttp: ttp, host: host, user: user, pwd: pwd, tokenUrl: tokenUrl}, nil } func (t *Token) update() error { @@ -61,8 +64,7 @@ func (t *Token) update() error { Password: t.pwd, } jsonStr, _ := json.Marshal(login) - tokenUrl := t.ip + Forward_Slash + GetToken - token, tm, err := generateToken(jsonStr, tokenUrl) + token, tm, err := generateToken(jsonStr, t.tokenUrl, t.host) if err != nil { return err } @@ -84,7 +86,7 @@ func (t *Token) Get() (string, error) { return t.ttp.Token, nil } -func generateToken(jsonStr []byte, tokenUrl string) (string, time.Time, error) { +func generateToken(jsonStr []byte, tokenUrl string, host string) (string, time.Time, error) { client := resty.New().SetTimeout(time.Duration(5) * time.Second) req := client.R() @@ -97,6 +99,7 @@ func generateToken(jsonStr []byte, tokenUrl string) (string, time.Time, error) { _, err := req. SetResult(&tokenResp). SetHeader("Content-Type", "application/json"). + SetQueryParam("addr", host). SetBody(jsonStr). Post(tokenUrl) if err != nil {