diff --git a/go.mod b/go.mod index 86520cda..f833c6b3 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/prometheus/common v0.60.1 github.com/robfig/cron/v3 v3.0.1 github.com/zeromicro/go-zero v1.7.3 - gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185 + gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20241202005821-e7089b705a97 gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240918011543-482dcd609877 gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 diff --git a/go.sum b/go.sum index e6baf4e3..2dc2c864 100644 --- a/go.sum +++ b/go.sum @@ -524,8 +524,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeromicro/go-zero v1.7.3 h1:yDUQF2DXDhUHc77/NZF6mzsoRPMBfldjPmG2O/ZSzss= github.com/zeromicro/go-zero v1.7.3/go.mod h1:9JIW3gHBGuc9LzvjZnNwINIq9QdiKu3AigajLtkJamQ= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185 h1:B+YBB5xHlIAS6ILuaCGQwbOpr/L6LOHAlj9PeFUCetM= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20241202005821-e7089b705a97 h1:m4eVyO1NkPk5Bqsr2htNLPfIpkUeF2GJN3scTV8BgQ4= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20241202005821-e7089b705a97/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 h1:WIs/189lRLNMXF2ui/Wm1+Y55eJ53BVGx+4+gdn9cls= gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4/go.mod h1:YbuoRgF9sEVvNJPQtGRjdocX7Du6NBOTLn+GVwqRVjo= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240918011543-482dcd609877 h1:a+1FpxqLPRojlAkJlAeRhKRbxajymXYgrM+s9bfQx0E= diff --git a/internal/logic/schedule/queryresourceslogic.go b/internal/logic/schedule/queryresourceslogic.go index 9191df30..4af16683 100644 --- a/internal/logic/schedule/queryresourceslogic.go +++ b/internal/logic/schedule/queryresourceslogic.go @@ -2,6 +2,9 @@ package schedule import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "sync" + "time" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" @@ -25,11 +28,94 @@ func NewQueryResourcesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Qu func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp *types.QueryResourcesResp, err error) { resp = &types.QueryResourcesResp{} + var ulist []*collector.ResourceUsage - _, err = l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId]["req.ClusterId"].GetResourceUsage(l.ctx) + clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(req.AdapterId) if err != nil { return nil, err } + var ch = make(chan *collector.ResourceUsage, len(clusters.List)) + + var wg sync.WaitGroup + for _, cluster := range clusters.List { + wg.Add(1) + c := cluster + go func() { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + col, found := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][c.Id] + if !found { + return + } + usage, err := col.GetResourceUsage(ctx) + if err != nil { + return + } + ch <- usage + }() + } + wg.Wait() + close(ch) + + for v := range ch { + ulist = append(ulist, v) + } + + // handle empty usage + rus := handleEmptyResourceUsage(clusters.List, ulist) + resp.Data = rus + return resp, nil } + +func handleEmptyResourceUsage(list []types.ClusterInfo, ulist []*collector.ResourceUsage) []*collector.ResourceUsage { + var rus []*collector.ResourceUsage + m := make(map[string]interface{}) + for _, u := range ulist { + if u == nil { + continue + } + m[u.ClusterId] = u + } + + for _, l := range list { + s, ok := m[l.Id] + if !ok { + ru := &collector.ResourceUsage{ + ClusterId: l.Id, + Usages: nil, + Msg: "resources unavailable, please retry later", + } + rus = append(rus, ru) + } else { + if s == nil { + ru := &collector.ResourceUsage{ + ClusterId: l.Id, + Usages: nil, + Msg: "resources unavailable, please retry later", + } + rus = append(rus, ru) + } else { + r, ok := s.(*collector.ResourceUsage) + if ok { + if r.Usages == nil { + ru := &collector.ResourceUsage{ + ClusterId: r.ClusterId, + Usages: nil, + Msg: "resources unavailable, please retry later", + } + rus = append(rus, ru) + } else { + rus = append(rus, r) + } + } + } + } + } + + return rus +} diff --git a/internal/scheduler/service/collector/collector.go b/internal/scheduler/service/collector/collector.go index 3ece0fdc..5641770c 100644 --- a/internal/scheduler/service/collector/collector.go +++ b/internal/scheduler/service/collector/collector.go @@ -20,6 +20,7 @@ type AiCollector interface { type ResourceUsage struct { ClusterId string Usages []*Usage + Msg string } type Usage struct { diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index ca4239f5..bfc2655a 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -1017,6 +1017,7 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU var usages []*collector.Usage go func() { + defer close(ch) queueResp, err := s.aCRpc.SelectQueueByUser(ctx, nil) if err != nil { wg.Done() @@ -1062,6 +1063,7 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU }() go func() { + defer close(usageCh) jobList, err := s.aCRpc.ListJob(ctx, nil) if err != nil { wg.Done() @@ -1098,7 +1100,7 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU for v := range ch { switch v.Type { - case CPU: + case ACCPU: cpu := &collector.Usage{ Type: ACCPU, Total: v.Total, @@ -1123,8 +1125,6 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU }() wg.Wait() - close(ch) - close(usageCh) if len(usageCh) == 0 { for v := range ch {