From 2d82c1d401ca994b2ba6310969456b85593c933a Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 17 Dec 2024 20:20:12 +0800 Subject: [PATCH] update openi --- go.mod | 2 +- go.sum | 4 +- internal/scheduler/service/aiService.go | 2 +- internal/scheduler/strategy/loadPriority.go | 6 +- internal/storeLink/openi.go | 127 +++++++++++++++++++- internal/storeLink/storeLink.go | 2 + 6 files changed, 134 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 05b30ef1..667cd805 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7 gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 - gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241206092318-9f0c3142fced + gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241217104229-e46c7e0c8e56 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 go.opentelemetry.io/otel/trace v1.32.0 diff --git a/go.sum b/go.sum index 85174bb2..d9ee12bb 100644 --- a/go.sum +++ b/go.sum @@ -532,8 +532,8 @@ gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7 h1:sP gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7/go.mod h1:V19vFg8dWRAbaskASoSj70dgpacswOqZu/SaI02dxac= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ= -gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241206092318-9f0c3142fced h1:N/UMOMm5fzELf52Neze+2oNMEUejFWWqbFwzhzJvgHc= -gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241206092318-9f0c3142fced/go.mod h1:oDJrr/TNbUCaVjI+RaOrUtGawD7UPAvp7U/oVgT2Dhc= +gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241217104229-e46c7e0c8e56 h1:zfE8/x16ZF4P2b8u24bHjH1ZBp/pA+4Zt4LKMasXlJs= +gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241217104229-e46c7e0c8e56/go.mod h1:oDJrr/TNbUCaVjI+RaOrUtGawD7UPAvp7U/oVgT2Dhc= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203/go.mod h1:i2rrbMQ+Fve345BY9Heh4MUqVTAimZQElQhzzRee5B8= gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnzkJBfMRnya1NrhOzlroUtRa5ePiYbPKlHLoLV0= diff --git a/internal/scheduler/service/aiService.go b/internal/scheduler/service/aiService.go index 1a26c82a..014cc323 100644 --- a/internal/scheduler/service/aiService.go +++ b/internal/scheduler/service/aiService.go @@ -91,7 +91,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st inferenceMap[c.Id] = sgai case OPENI: id, _ := strconv.ParseInt(c.Id, 10, 64) - openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token) + openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token) collectorMap[c.Id] = openi executorMap[c.Id] = openi inferenceMap[c.Id] = openi diff --git a/internal/scheduler/strategy/loadPriority.go b/internal/scheduler/strategy/loadPriority.go index 562a27c4..f205d1a2 100644 --- a/internal/scheduler/strategy/loadPriority.go +++ b/internal/scheduler/strategy/loadPriority.go @@ -2,11 +2,13 @@ package strategy type LoadPriority struct { replicas int32 + Clusters []*CLusterLoad } type CLusterLoad struct { - ClusterId string - TaskRunning string + ClusterId string + TaskRunningNum int64 + TaskPredictedNum int64 } func NewLoadPriority() *LoadPriority { diff --git a/internal/storeLink/openi.go b/internal/storeLink/openi.go index d4b0fe77..5bcf699d 100644 --- a/internal/storeLink/openi.go +++ b/internal/storeLink/openi.go @@ -15,6 +15,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" ) const ( @@ -123,11 +124,17 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e res := &collector.ResourceSpec{ ClusterId: strconv.FormatInt(o.participantId, 10), } - url := o.host + "/api/v1/task/creationRequired" + creationRequirelUrl := o.host + "/api/v1/task/creationRequired" + reposUrl := o.host + "/api/v1/user/repos" + taskListUrl := o.host + "/api/v1/task/list" + //taskDetailsUrl := o.host + "/api/v1/task/detail" + var wg sync.WaitGroup var ch = make(chan *collector.Usage) defer close(ch) + var once sync.Once + for c := range ComputeSource { wg.Add(1) i := c @@ -151,9 +158,9 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e }{} req := common.GetRestyRequest(common.TIMEOUT) - r, _ := http.NewRequest("GET", url, byt) + r, _ := http.NewRequest("GET", creationRequirelUrl, byt) req.RawRequest = r - req.URL = url + req.URL = creationRequirelUrl _, err := req. SetHeader("Content-Type", "application/json"). @@ -170,6 +177,24 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e return } + // balance + wg.Add(1) + go func() { + defer wg.Done() + var balanceCheck = func() { + balance := resp.Data.Data.PointAccount.Balance + bal := &collector.Usage{} + bal.Type = strings.ToUpper(BALANCE) + bal.Total = &collector.UnitValue{ + Unit: POINT, + Value: balance, + } + + ch <- bal + } + once.Do(balanceCheck) + }() + m := make(map[string]struct { Id int `json:"id"` AccCardsNum int `json:"acc_cards_num"` @@ -209,12 +234,108 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e }() } + // tasks + var jch = make(chan *collector.Usage, 1) + wg.Add(1) + go func() { + defer close(jch) + defer wg.Done() + reporesp := struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data []model.Repo `json:"data"` + }{} + + reporeq := common.GetRestyRequest(common.TIMEOUT) + repor, _ := http.NewRequest("GET", reposUrl, nil) + reporeq.RawRequest = repor + reporeq.URL = reposUrl + + _, err := reporeq. + SetHeader("Content-Type", "application/json"). + SetQueryParam(common.ACCESSTOKEN, o.accessToken). + SetResult(&reporesp). + Send() + + if err != nil { + return + } + + if len(reporesp.Data) == 0 { + return + } + + var runningJobs atomic.Int64 + var jwg sync.WaitGroup + for _, datum := range reporesp.Data { + jwg.Add(1) + dat := datum + go func() { + defer jwg.Done() + param := model.TaskListParam{ + UserName: o.userName, + RepoName: dat.Name, + } + + b, _ := json.Marshal(param) + byt := bytes.NewBuffer(b) + + resp := struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data model.TaskList `json:"data"` + }{} + + req := common.GetRestyRequest(common.TIMEOUT) + r, _ := http.NewRequest("GET", taskListUrl, byt) + req.RawRequest = r + req.URL = taskListUrl + + _, err := req. + SetHeader("Content-Type", "application/json"). + SetQueryParam(common.ACCESSTOKEN, o.accessToken). + SetBody(byt). + SetResult(&resp). + Send() + + if err != nil { + return + } + + if len(resp.Data.Data.Tasks) == 0 { + return + } + + for _, task := range resp.Data.Data.Tasks { + if task.Task.Status == RUNNING { + runningJobs.Add(1) + } + } + }() + } + jwg.Wait() + + run := &collector.Usage{} + run.Type = strings.ToUpper(RUNNING) + run.Total = &collector.UnitValue{ + Unit: NUMBER, + Value: runningJobs.Load(), + } + + jch <- run + + }() + go func() { for v := range ch { resources = append(resources, v) } }() + for v := range jch { + resources = append(resources, v) + } + wg.Wait() res.Resources = resources diff --git a/internal/storeLink/storeLink.go b/internal/storeLink/storeLink.go index 39e06a71..c8c5e132 100644 --- a/internal/storeLink/storeLink.go +++ b/internal/storeLink/storeLink.go @@ -80,6 +80,8 @@ const ( STORAGE = "STORAGE" DISK = "disk" RMB = "rmb" + POINT = "point" + RUNNING = "RUNNING" ) var (