From 20b1fdbbba9aa04474199099236be4ecc5ff3b58 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 18 Dec 2024 18:04:58 +0800 Subject: [PATCH] update loadpriority ---
// Reviewer notes (free text after the three-dash separator; not part of the commit message or the diff).
// NOTE(review): loadPriority.go asserts r.Total.Value.(int64), but the shuguangai.go hunks in this patch store
//   `Value: 0` and `Value: len(jobList.Jobs)`, which are Go `int` values once boxed in an interface. The
//   assertion would then fail (ok == false) and TaskRunningNum would stay 0 - confirm, and either store
//   int64(...) or accept both int and int64.
// NOTE(review): NewLoadPriority switches on `case storeLink.RUNNINGTASK`, while shuguangai.go sets
//   run.Type = strings.ToUpper(RUNNINGTASK). The definition of RUNNINGTASK is not visible here; if it is
//   lower-case like RATE = "rate", the case never matches - confirm, or compare with strings.EqualFold.
// NOTE(review): r.Total is dereferenced without a nil check; Total is built as a pointer
//   (&collector.UnitValue{...}) in these hunks, so a Usage without Total would panic - TODO confirm.
// NOTE(review): the goroutine added to openi.go sends on ch but shows no wg.Add/wg.Done, while the next
//   goroutine closes ch after wg.Wait(). If the close can run first, the send panics - verify against
//   the part of GetResourceSpecs outside this hunk.
// NOTE(review): shuguangai.go now calls uwg.Add(4), but the visible `if err != nil { return }` just before
//   `run := &collector.Usage{}` returns without a uwg.Done(). On that path uwg.Wait() and the
//   `for v := range uCh` loop would presumably block forever - confirm all early-return paths.
// NOTE(review): PERHOUR changes from "rmb-per-hour" to "per-hour"; check any consumer that matches the old
//   unit string.
internal/scheduler/strategy/loadPriority.go | 36 +++++++++++- internal/storeLink/openi.go | 9 +++ internal/storeLink/shuguangai.go | 63 +++++++++++++++------ internal/storeLink/storeLink.go | 2 +- 4 files changed, 89 insertions(+), 21 deletions(-) diff --git a/internal/scheduler/strategy/loadPriority.go b/internal/scheduler/strategy/loadPriority.go index f205d1a2..325f3bdc 100644 --- a/internal/scheduler/strategy/loadPriority.go +++ b/internal/scheduler/strategy/loadPriority.go @@ -1,7 +1,12 @@ package strategy +import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" +) + type LoadPriority struct { - replicas int32 + Replicas int32 Clusters []*CLusterLoad } @@ -11,6 +16,31 @@ type CLusterLoad struct { TaskPredictedNum int64 } -func NewLoadPriority() *LoadPriority { - return &LoadPriority{} +func NewLoadPriority(replicas int32, resources []*collector.ResourceSpec) *LoadPriority { + var clusters []*CLusterLoad + + for _, resource := range resources { + if resource.ClusterId == "" { + continue + } + cluster := &CLusterLoad{ + ClusterId: resource.ClusterId, + } + for _, res := range resource.Resources { + r, ok := res.(*collector.Usage) + if !ok { + continue + } + switch r.Type { + case storeLink.RUNNINGTASK: + num, ok := r.Total.Value.(int64) + if !ok { + continue + } + cluster.TaskRunningNum = num + } + } + clusters = append(clusters, cluster) + } + return &LoadPriority{Replicas: replicas, Clusters: clusters} } diff --git a/internal/storeLink/openi.go b/internal/storeLink/openi.go index 5ac8ac99..7aea5f8c 100644 --- a/internal/storeLink/openi.go +++ b/internal/storeLink/openi.go @@ -346,6 +346,15 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e } }() + go func() { + rate := 
&collector.Usage{ + Type: strings.ToUpper(RATE), + Total: &collector.UnitValue{Unit: PERHOUR, Value: 1}, + } + + ch <- rate + }() + go func() { wg.Wait() close(ch) diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index cb116580..807dcce2 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -1005,12 +1005,12 @@ func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) { var wg sync.WaitGroup - wg.Add(4) - + var uwg sync.WaitGroup + wg.Add(2) + uwg.Add(4) var ch = make(chan *collector.Usage, 2) var qCh = make(chan *collector.Usage, 2) var uCh = make(chan *collector.Usage) - defer close(uCh) resUsage := &collector.ResourceSpec{ ClusterId: strconv.FormatInt(s.participantId, 10), @@ -1041,12 +1041,15 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS queChargeRate, _ := strconv.ParseFloat(data.QueChargeRate, 64) - rate := &collector.Usage{ - Type: strings.ToUpper(RATE), - Total: &collector.UnitValue{Unit: PERHOUR, Value: queChargeRate}, - } + go func() { + defer uwg.Done() + rate := &collector.Usage{ + Type: strings.ToUpper(RATE), + Total: &collector.UnitValue{Unit: PERHOUR, Value: queChargeRate}, + } - uCh <- rate + uCh <- rate + }() var freeNodes int64 var cpuPerNode int64 @@ -1080,8 +1083,32 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS if err != nil { return } + + run := &collector.Usage{} + run.Type = strings.ToUpper(RUNNINGTASK) + if len(jobList.Jobs) == 0 { + go func() { + defer uwg.Done() + run.Total = &collector.UnitValue{ + Unit: NUMBER, + Value: 0, + } + + uCh <- run + }() + return + } else { + go func() { + defer uwg.Done() + run.Total = &collector.UnitValue{ + Unit: NUMBER, + Value: len(jobList.Jobs), + } + + uCh <- run + }() } var cpureqed atomic.Int64 @@ -1091,19 +1118,18 @@ func (s *ShuguangAi) 
GetResourceSpecs(ctx context.Context) (*collector.ResourceS jwg.Add(1) job := j go func() { + defer jwg.Done() h := http.Request{} jreq := &hpcAC.JobDetailReq{ JobId: job.JobId, } detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq) if err != nil || detail.Data == nil { - jwg.Done() return } cpureqed.Add(int64(detail.Data.ProcNumReq)) dcureqed.Add(int64(detail.Data.DcuNumReq)) - jwg.Done() }() } jwg.Wait() @@ -1139,7 +1165,7 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS // 查询用户共享存储配额及使用量 go func() { - defer wg.Done() + defer uwg.Done() diskReq := &hpcAC.ParaStorQuotaReq{} diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) if err != nil { @@ -1167,7 +1193,7 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS // 查询用户信息 go func() { - defer wg.Done() + defer uwg.Done() userReq := &hpcAC.GetUserInfoReq{} userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq) if err != nil { @@ -1187,11 +1213,16 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS }() go func() { - for v := range uCh { - resources = append(resources, v) - } + uwg.Wait() + close(uCh) }() + for v := range uCh { + resources = append(resources, v) + } + + wg.Wait() + if len(qCh) == 0 { for v := range ch { v.Available = v.Total @@ -1203,8 +1234,6 @@ func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceS } } - wg.Wait() - resUsage.Resources = resources return resUsage, nil diff --git a/internal/storeLink/storeLink.go b/internal/storeLink/storeLink.go index e3b76286..3b1c2956 100644 --- a/internal/storeLink/storeLink.go +++ b/internal/storeLink/storeLink.go @@ -73,7 +73,7 @@ const ( DEPLOY_INSTANCE_PREFIEX = "infer" BALANCE = "balance" RATE = "rate" - PERHOUR = "rmb-per-hour" + PERHOUR = "per-hour" NUMBER = "number" KILOBYTE = "kb" CPUCORE = "core"