| @@ -18,7 +18,7 @@ require ( | |||
| github.com/prometheus/common v0.60.1 | |||
| github.com/robfig/cron/v3 v3.0.1 | |||
| github.com/zeromicro/go-zero v1.7.3 | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185 | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20241202005821-e7089b705a97 | |||
| gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 | |||
| gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240918011543-482dcd609877 | |||
| gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 | |||
| @@ -524,8 +524,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M | |||
| github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= | |||
| github.com/zeromicro/go-zero v1.7.3 h1:yDUQF2DXDhUHc77/NZF6mzsoRPMBfldjPmG2O/ZSzss= | |||
| github.com/zeromicro/go-zero v1.7.3/go.mod h1:9JIW3gHBGuc9LzvjZnNwINIq9QdiKu3AigajLtkJamQ= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185 h1:B+YBB5xHlIAS6ILuaCGQwbOpr/L6LOHAlj9PeFUCetM= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20241202005821-e7089b705a97 h1:m4eVyO1NkPk5Bqsr2htNLPfIpkUeF2GJN3scTV8BgQ4= | |||
| gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20241202005821-e7089b705a97/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= | |||
| gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 h1:WIs/189lRLNMXF2ui/Wm1+Y55eJ53BVGx+4+gdn9cls= | |||
| gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4/go.mod h1:YbuoRgF9sEVvNJPQtGRjdocX7Du6NBOTLn+GVwqRVjo= | |||
| gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240918011543-482dcd609877 h1:a+1FpxqLPRojlAkJlAeRhKRbxajymXYgrM+s9bfQx0E= | |||
| @@ -2,6 +2,9 @@ package schedule | |||
| import ( | |||
| "context" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" | |||
| "sync" | |||
| "time" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| @@ -25,11 +28,94 @@ func NewQueryResourcesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Qu | |||
| func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp *types.QueryResourcesResp, err error) { | |||
| resp = &types.QueryResourcesResp{} | |||
| var ulist []*collector.ResourceUsage | |||
| _, err = l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId]["req.ClusterId"].GetResourceUsage(l.ctx) | |||
| clusters, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(req.AdapterId) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| var ch = make(chan *collector.ResourceUsage, len(clusters.List)) | |||
| var wg sync.WaitGroup | |||
| for _, cluster := range clusters.List { | |||
| wg.Add(1) | |||
| c := cluster | |||
| go func() { | |||
| defer wg.Done() | |||
| ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | |||
| defer cancel() | |||
| col, found := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[req.AdapterId][c.Id] | |||
| if !found { | |||
| return | |||
| } | |||
| usage, err := col.GetResourceUsage(ctx) | |||
| if err != nil { | |||
| return | |||
| } | |||
| ch <- usage | |||
| }() | |||
| } | |||
| wg.Wait() | |||
| close(ch) | |||
| for v := range ch { | |||
| ulist = append(ulist, v) | |||
| } | |||
| // handle empty usage | |||
| rus := handleEmptyResourceUsage(clusters.List, ulist) | |||
| resp.Data = rus | |||
| return resp, nil | |||
| } | |||
| func handleEmptyResourceUsage(list []types.ClusterInfo, ulist []*collector.ResourceUsage) []*collector.ResourceUsage { | |||
| var rus []*collector.ResourceUsage | |||
| m := make(map[string]interface{}) | |||
| for _, u := range ulist { | |||
| if u == nil { | |||
| continue | |||
| } | |||
| m[u.ClusterId] = u | |||
| } | |||
| for _, l := range list { | |||
| s, ok := m[l.Id] | |||
| if !ok { | |||
| ru := &collector.ResourceUsage{ | |||
| ClusterId: l.Id, | |||
| Usages: nil, | |||
| Msg: "resources unavailable, please retry later", | |||
| } | |||
| rus = append(rus, ru) | |||
| } else { | |||
| if s == nil { | |||
| ru := &collector.ResourceUsage{ | |||
| ClusterId: l.Id, | |||
| Usages: nil, | |||
| Msg: "resources unavailable, please retry later", | |||
| } | |||
| rus = append(rus, ru) | |||
| } else { | |||
| r, ok := s.(*collector.ResourceUsage) | |||
| if ok { | |||
| if r.Usages == nil { | |||
| ru := &collector.ResourceUsage{ | |||
| ClusterId: r.ClusterId, | |||
| Usages: nil, | |||
| Msg: "resources unavailable, please retry later", | |||
| } | |||
| rus = append(rus, ru) | |||
| } else { | |||
| rus = append(rus, r) | |||
| } | |||
| } | |||
| } | |||
| } | |||
| } | |||
| return rus | |||
| } | |||
| @@ -20,6 +20,7 @@ type AiCollector interface { | |||
| type ResourceUsage struct { | |||
| ClusterId string | |||
| Usages []*Usage | |||
| Msg string | |||
| } | |||
| type Usage struct { | |||
| @@ -1017,6 +1017,7 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU | |||
| var usages []*collector.Usage | |||
| go func() { | |||
| defer close(ch) | |||
| queueResp, err := s.aCRpc.SelectQueueByUser(ctx, nil) | |||
| if err != nil { | |||
| wg.Done() | |||
| @@ -1062,6 +1063,7 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU | |||
| }() | |||
| go func() { | |||
| defer close(usageCh) | |||
| jobList, err := s.aCRpc.ListJob(ctx, nil) | |||
| if err != nil { | |||
| wg.Done() | |||
| @@ -1098,7 +1100,7 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU | |||
| for v := range ch { | |||
| switch v.Type { | |||
| case CPU: | |||
| case ACCPU: | |||
| cpu := &collector.Usage{ | |||
| Type: ACCPU, | |||
| Total: v.Total, | |||
| @@ -1123,8 +1125,6 @@ func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceU | |||
| }() | |||
| wg.Wait() | |||
| close(ch) | |||
| close(usageCh) | |||
| if len(usageCh) == 0 { | |||
| for v := range ch { | |||