package schedule

import (
	"context"
	"errors"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)

const (
	// ADAPTERID is the heterogeneous adapter ID used when the caller does not
	// name any cluster explicitly.
	ADAPTERID = "1777144940459986944"
	// QUERY_TRAIN_RESOURCES is the local-cache key under which training
	// resource specs are looked up.
	QUERY_TRAIN_RESOURCES = "train_resources"
	// QUERY_INFERENCE_RESOURCES is the local-cache key under which inference
	// resource specs are looked up.
	QUERY_INFERENCE_RESOURCES = "inference_resources"
)

// QueryResourcesLogic serves resource-spec queries for AI clusters.
type QueryResourcesLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

// NewQueryResourcesLogic builds a QueryResourcesLogic bound to ctx and svcCtx,
// with a logger derived from ctx.
func NewQueryResourcesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *QueryResourcesLogic {
	return &QueryResourcesLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

// QueryResources returns one ResourceSpec per requested cluster.
//
// When req.ClusterIDs is empty, every cluster registered under ADAPTERID is
// reported. In that case the local cache is consulted first (inference key for
// req.Type == "Inference", training key for anything else); only when the
// cache holds no []*collector.ResourceSpec are the clusters queried live.
//
// When req.ClusterIDs is non-empty, each ID is resolved through storage and
// the clusters are always queried live.
//
// Clusters that yield no usable resources are reported with a placeholder
// entry (see handleEmptyResourceUsage). A storage or query error aborts the
// call and is returned with a nil response.
func (l *QueryResourcesLogic) QueryResources(req *types.QueryResourcesReq) (resp *types.QueryResourcesResp, err error) {
	resp = &types.QueryResourcesResp{}

	if len(req.ClusterIDs) == 0 {
		cs, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(ADAPTERID)
		if err != nil {
			return nil, err
		}

		// Any type other than "Inference" (including "Train") reads the
		// training cache entry.
		var cached interface{}
		switch req.Type {
		case "Inference":
			cached = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_INFERENCE_RESOURCES]
		default:
			cached = l.svcCtx.Scheduler.AiService.LocalCache[QUERY_TRAIN_RESOURCES]
		}
		if specs, ok := cached.([]*collector.ResourceSpec); ok {
			resp.Data = handleEmptyResourceUsage(cs.List, specs)
			return resp, nil
		}

		// Cache miss (or unexpected cached type): fall back to a live query.
		rus, err := l.QueryResourcesByClusterId(cs.List, req.Type)
		if err != nil {
			return nil, err
		}
		resp.Data = handleEmptyResourceUsage(cs.List, rus)
		return resp, nil
	}

	var clusters []types.ClusterInfo
	for _, id := range req.ClusterIDs {
		cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
		if err != nil {
			return nil, err
		}
		if cluster == nil {
			// Guard the dereference below; a missing cluster is simply skipped.
			continue
		}
		clusters = append(clusters, *cluster)
	}
	if len(clusters) == 0 {
		return nil, errors.New("no clusters found ")
	}

	rus, err := l.QueryResourcesByClusterId(clusters, req.Type)
	if err != nil {
		return nil, err
	}
	resp.Data = handleEmptyResourceUsage(clusters, rus)
	return resp, nil
}

// QueryResourcesByClusterId queries the resource specs of the given clusters
// concurrently and returns every non-nil spec that arrived in time.
//
// If clusterinfos is empty, all clusters registered under ADAPTERID are
// queried instead. Clusters without a registered collector, clusters whose
// query fails without producing a spec, and clusters that do not answer within
// the per-cluster timeout are omitted from the result. The order of the
// returned specs is not defined. The only error returned is a failure to list
// the clusters of ADAPTERID.
func (l *QueryResourcesLogic) QueryResourcesByClusterId(clusterinfos []types.ClusterInfo, resrcType string) ([]*collector.ResourceSpec, error) {
	clusters := clusterinfos
	if len(clusters) == 0 {
		cs, err := l.svcCtx.Scheduler.AiStorages.GetClustersByAdapterId(ADAPTERID)
		if err != nil {
			return nil, err
		}
		clusters = cs.List
	}

	// Buffered to len(clusters) so no worker ever blocks on send.
	specs := make(chan *collector.ResourceSpec, len(clusters))
	var wg sync.WaitGroup
	for _, cluster := range clusters {
		wg.Add(1)
		c := cluster // per-iteration copy for the goroutine below
		go func() {
			defer wg.Done()
			if spec := l.queryClusterSpec(c, resrcType); spec != nil {
				specs <- spec
			}
		}()
	}
	wg.Wait()
	close(specs)

	var ulist []*collector.ResourceSpec
	for spec := range specs {
		ulist = append(ulist, spec)
	}
	return ulist, nil
}

// queryClusterSpec fetches the resource spec of a single cluster, giving up
// after a fixed timeout. It returns nil when the cluster has no registered
// collector, when the collector produced no spec, or when the timeout elapsed.
func (l *QueryResourcesLogic) queryClusterSpec(c types.ClusterInfo, resrcType string) *collector.ResourceSpec {
	const queryTimeout = 10 * time.Second

	col, found := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[strconv.FormatInt(c.AdapterId, 10)][c.Id]
	if !found {
		return nil
	}

	// Capacity 1 lets the fetching goroutine finish its send and exit even
	// after the timeout branch below has stopped listening; an unbuffered
	// channel would leave it blocked forever.
	done := make(chan *collector.ResourceSpec, 1)
	go func() {
		spec, err := col.GetResourceSpecs(l.ctx, resrcType)
		if err != nil {
			l.Errorf("query resource specs of cluster %s: %v", c.Id, err)
		}
		// Whatever spec was returned is forwarded, even alongside an error;
		// the caller drops nil specs.
		done <- spec
	}()

	timer := time.NewTimer(queryTimeout)
	defer timer.Stop()

	select {
	case spec := <-done:
		return spec
	case <-timer.C:
		return nil
	}
}

// handleEmptyResourceUsage returns exactly one ResourceSpec per cluster in
// list, in list order.
//
// Specs in ulist are matched to clusters by ClusterId (nil entries are
// ignored; on duplicate IDs the last entry wins). A cluster with no matching
// spec, or whose spec has no resources, is represented by a placeholder that
// carries only the cluster ID and an "unavailable" message. Otherwise the
// matched spec itself is returned, with ClusterType filled in when the
// title-cased cluster name is a key of storeLink.ClusterTypeMap.
//
// NOTE(review): the matched spec is mutated in place, so specs that come from
// a shared cache are written on every call — confirm this is safe under
// concurrent requests.
func handleEmptyResourceUsage(list []types.ClusterInfo, ulist []*collector.ResourceSpec) []*collector.ResourceSpec {
	const unavailableMsg = "resources unavailable, please retry later"

	byCluster := make(map[string]*collector.ResourceSpec, len(ulist))
	for _, u := range ulist {
		if u == nil {
			continue
		}
		byCluster[u.ClusterId] = u
	}

	var rus []*collector.ResourceSpec
	for _, c := range list {
		spec, ok := byCluster[c.Id]
		if !ok || len(spec.Resources) == 0 {
			rus = append(rus, &collector.ResourceSpec{
				ClusterId: c.Id,
				Resources: nil,
				Msg:       unavailableMsg,
			})
			continue
		}

		// strings.Title is deprecated, but its replacement lives outside the
		// standard library; it is kept so the lookup key stays unchanged.
		if t, ok := storeLink.ClusterTypeMap[strings.Title(c.Name)]; ok {
			spec.ClusterType = t
		}
		rus = append(rus, spec)
	}
	return rus
}

// checkCachingCondition reports whether the number of specs that carry
// resources equals the number of clusters. Nil specs are counted as carrying
// no resources.
func checkCachingCondition(clusters []types.ClusterInfo, specs []*collector.ResourceSpec) bool {
	var count int
	for _, spec := range specs {
		if spec != nil && spec.Resources != nil {
			count++
		}
	}
	return count == len(clusters)
}