From a6267288be745c304c8d83c9faa9840b82b67a1a Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 29 Nov 2024 11:39:33 +0800 Subject: [PATCH] added GetResourceUsage --- .../scheduler/service/collector/collector.go | 6 +- internal/storeLink/openi.go | 7 + internal/storeLink/shuguangai.go | 132 +++++++++++++++++- 3 files changed, 141 insertions(+), 4 deletions(-) diff --git a/internal/scheduler/service/collector/collector.go b/internal/scheduler/service/collector/collector.go index 9ea2284e..3ece0fdc 100644 --- a/internal/scheduler/service/collector/collector.go +++ b/internal/scheduler/service/collector/collector.go @@ -19,13 +19,13 @@ type AiCollector interface { type ResourceUsage struct { ClusterId string - Resources []*Usage + Usages []*Usage } type Usage struct { - Total float64 + Total int64 Type string - Available float64 + Available int64 } type ResourceStats struct { diff --git a/internal/storeLink/openi.go b/internal/storeLink/openi.go index 8d163d40..d9fb5f6d 100644 --- a/internal/storeLink/openi.go +++ b/internal/storeLink/openi.go @@ -1 +1,8 @@ package storeLink + +type OpenI struct { +} + +func NewOpenI() *OpenI { + return &OpenI{} +} diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index fe428a56..30f99138 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -27,15 +27,18 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "mime/multipart" + "net/http" "strconv" "strings" "sync" + "sync/atomic" "time" ) const ( RAM_SIZE_1G = 1024 // 1G WORKER_NUMBER = 1 + ACCPU = "CPU" DCU = "DCU" DCU_TOPS = 24.5 PYTORCH = "Pytorch" @@ -1001,5 +1004,132 @@ func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype } func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceUsage, error) { - return nil, nil + var wg sync.WaitGroup + wg.Add(2) + + var usageCh = make(chan *collector.Usage, 2) + + resUsage := &collector.ResourceUsage{ + ClusterId: strconv.FormatInt(s.participantId, 10), + } + + var usages []*collector.Usage + + go func() { + queueResp, err := s.aCRpc.SelectQueueByUser(ctx, nil) + if err != nil { + wg.Done() + return + } + + if len(queueResp.Data) == 0 { + wg.Done() + return + } + + var data *hpcAC.QueueData + for _, datum := range queueResp.Data { + if datum.QueueName == RESOURCE_GROUP { + data = datum + break + } + } + + var freeNodes int64 + var cpuPerNode int64 + var dcuPerNode int64 + freeNodes, _ = strconv.ParseInt(data.QueFreeNodes, 10, 10) + cpuPerNode, _ = strconv.ParseInt(data.QueMaxPPN, 10, 10) + dcuPerNode, _ = strconv.ParseInt(data.QueMaxDcuPN, 10, 10) + + cpu := &collector.Usage{ + Type: ACCPU, + Total: freeNodes * cpuPerNode, + } + + usageCh <- cpu + + dcu := &collector.Usage{ + Type: DCU, + Total: freeNodes * dcuPerNode, + } + + usageCh <- dcu + + close(usageCh) + + }() + + go func() { + jobList, err := s.aCRpc.ListJob(ctx, nil) + if err != nil { + wg.Done() + return + } + if len(jobList.Jobs) == 0 { + wg.Done() + return + } + + var cpureqed atomic.Int64 + var dcureqed atomic.Int64 + var jwg sync.WaitGroup + for _, j := range jobList.Jobs { + jwg.Add(1) + job := j + go func() { + h := http.Request{} + jreq := &hpcAC.JobDetailReq{ + JobId: job.JobId, + } + detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq) + if err != nil { + jwg.Done() + return + } + cpureqed.Add(int64(detail.Data.ProcNumReq)) + dcureqed.Add(int64(detail.Data.DcuNumReq)) + jwg.Done() + }() + } + jwg.Wait() + + for v := range usageCh { + switch v.Type { + case CPU: + v.Available = v.Total - cpureqed.Load() + case DCU: + v.Available = v.Total - dcureqed.Load() + } + } + + //for i := 0; i < len(usageCh); i++ { + // select { + // case v := <-usageCh: + // switch v.Type { + // case CPU: + // v.Available = v.Total - cpureqed.Load() + // case DCU: + // v.Available = v.Total - dcureqed.Load() + // } + // + // case <-time.After(3 * time.Second): + // fmt.Println("timeout") + // continue + // } + //} + + wg.Done() + + }() + + wg.Wait() + + for v := range usageCh { + usages = append(usages, v) + } + + resUsage.Usages = usages + + return resUsage, nil }