package core import ( "context" "fmt" "github.com/mitchellh/mapstructure" "github.com/rs/zerolog/log" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gorm.io/gorm" "time" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "github.com/zeromicro/go-zero/core/logx" ) type CompareResourceSpecLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } const ( ChangeTypeNormal = 0 // 资源规格正常 ChangeTypeModified = 1 // 资源规格变更 ChangeTypeDeleted = 2 // 资源被删除 ) type APIResponse struct { ClusterId string `json:"ClusterId"` ClusterType string `json:"clusterType"` Region string `json:"region"` Resources []Resource `json:"resources"` Msg string `json:"msg"` } type Resource struct { Resource ResourceDetail `json:"resource"` BaseResources []ResourceDetail `json:"baseResources"` } type ResourceDetail struct { Type string `json:"type"` Name string `json:"name"` Total Metric `json:"total"` Available Metric `json:"available"` } type Metric struct { Unit string `json:"unit"` Value float64 `json:"value"` } func NewCompareResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CompareResourceSpecLogic { return &CompareResourceSpecLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.ResourceSpecReq) (resp *types.PageResult, err error) { log.Debug().Msgf("开始比较资源规格,ClusterId: %s, PageNum: %d, PageSize: %d", req.ClusterId, req.PageNum, req.PageSize) if req.ClusterId == "" { return resp, nil } // 获取集群资源数据 startTime := time.Now() apiResources, err := l.FetchClusterResources(req.ClusterId) log.Debug().Msgf("调用获取ai训练资源接口耗时: %v", time.Since(startTime)) if err != nil { return nil, fmt.Errorf("failed to fetch cluster resources: %w", err) } // 同步资源到数据库 if err := l.syncResourcesToDB(apiResources); err != nil { return nil, fmt.Errorf("failed to sync resources: %w", err) } // 查询数据库结果 return l.queryDatabaseResults(req) } func (l *CompareResourceSpecLogic) FetchClusterResources(clusterId string) ([]APIResponse, error) { queryLogic := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx) resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{}) if err != nil { return nil, fmt.Errorf("query resources failed: %w", err) } var apiResponses []APIResponse if err := decodeAPIResponse(resources.Data, &apiResponses); err != nil { return nil, fmt.Errorf("decode response failed: %w", err) } // 过滤出指定集群的资源 var filteredResponses []APIResponse for _, response := range apiResponses { if response.ClusterId == clusterId && response.Resources != nil { filteredResponses = append(filteredResponses, response) } } if len(filteredResponses) == 0 { return nil, fmt.Errorf("no resources found for cluster ID: %s", clusterId) } return filteredResponses, nil } func decodeAPIResponse(input interface{}, output *[]APIResponse) error { config := &mapstructure.DecoderConfig{ Result: output, TagName: "json", ErrorUnused: true, DecodeHook: mapstructure.ComposeDecodeHookFunc( mapstructure.StringToTimeHookFunc(time.RFC3339), mapstructure.StringToSliceHookFunc(","), ), } decoder, err := mapstructure.NewDecoder(config) if err != nil { return fmt.Errorf("failed to create decoder: %w", err) } if err := decoder.Decode(input); err != nil { return fmt.Errorf("decoding error: %w", err) } return nil } func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error { for _, response := range apiResponses { // 转换API响应到数据库模型 dbSpecs, apiSpecs, err := l.processAPIResponse(response) if err != nil { return err } // 处理资源变更 if err := l.handleResourceChanges(dbSpecs, apiSpecs); err != nil { return fmt.Errorf("failed to handle resource changes: %w", err) } } return nil } func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) { ClusterId := utils.StringToInt64(response.ClusterId) var dbSpecs []models.TResourceSpec if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs"). Where("cluster_id = ?", ClusterId). Find(&dbSpecs).Error; err != nil { return nil, nil, fmt.Errorf("database query failed: %w", err) } var apiSpecs []models.TResourceSpec for _, res := range response.Resources { // 检查资源类型和名称是否存在 if res.Resource.Name == "" || res.Resource.Type == "" { continue } spec := l.convertToResourceSpec(ClusterId, res) apiSpecs = append(apiSpecs, spec) } return dbSpecs, apiSpecs, nil } func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error { dbMap := make(map[string]models.TResourceSpec) for _, spec := range dbSpecs { key := spec.SourceKey dbMap[key] = spec } apiMap := make(map[string]models.TResourceSpec) for _, spec := range apiSpecs { key := resourceKey(spec.Type, spec.Name) apiMap[key] = spec } var createSpecs []*models.TResourceSpec var modifiedIDs []int64 var normalIDs []int64 // 第一阶段:收集需要处理的操作 for key, apiSpec := range apiMap { dbSpec, exists := dbMap[key] if !exists { // 创建资源副本避免指针重复 newSpec := apiSpec // 初始化时间字段 newSpec.CreateTime = time.Now() newSpec.UpdateTime = time.Now() createSpecs = append(createSpecs, &newSpec) continue } // 检查资源规格变更 if l.isSpecChanged(dbSpec, apiSpec) { modifiedIDs = append(modifiedIDs, dbSpec.Id) } else { normalIDs = append(normalIDs, dbSpec.Id) } } // 第二阶段:批量处理数据库操作 // 批量创建新资源及关联资源 if len(createSpecs) > 0 { tx := l.svcCtx.DbEngin.Begin() if tx.Error != nil { return fmt.Errorf("failed to start transaction: %w", tx.Error) } // 批量插入主资源 if err := tx.CreateInBatches(createSpecs, 100).Error; err != nil { tx.Rollback() return fmt.Errorf("failed to batch create resources: %w", err) } // 准备关联资源数据 var baseResources []models.TBaseResourceSpec for _, spec := range createSpecs { for i := range spec.BaseResourceSpecs { br := &spec.BaseResourceSpecs[i] br.ResourceSpecId = spec.Id br.CreateTime = time.Now() br.UpdateTime = time.Now() baseResources = append(baseResources, *br) } } // 批量插入关联资源 if len(baseResources) > 0 { if err := tx.CreateInBatches(baseResources, 100).Error; err != nil { tx.Rollback() return fmt.Errorf("failed to batch create base resources: %w", err) } } if err := tx.Commit().Error; err != nil { return fmt.Errorf("transaction commit failed: %w", err) } } // 批量更新变更资源 now := time.Now() if len(modifiedIDs) > 0 { if err := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}). Where("id IN ?", modifiedIDs). Updates(map[string]interface{}{ "change_type": ChangeTypeModified, "update_time": now, }).Error; err != nil { return fmt.Errorf("batch update modified failed: %w", err) } } // 批量更新正常资源 if len(normalIDs) > 0 { if err := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}). Where("id IN ?", normalIDs). Updates(map[string]interface{}{ "change_type": ChangeTypeNormal, "update_time": now, }).Error; err != nil { return fmt.Errorf("batch update normal failed: %w", err) } } // 处理删除的资源 for key, dbSpec := range dbMap { if _, exists := apiMap[key]; !exists { if err := l.markResourceDeleted(dbSpec.Id); err != nil { return err } } } return nil } func resourceKey(resType, name string) string { return fmt.Sprintf("%s::%s", resType, name) } func (l *CompareResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error { return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error { if err := tx.Create(spec).Error; err != nil { return fmt.Errorf("failed to create resource: %w", err) } return nil }) } // 标识资源规格变更 func (l *CompareResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec, changeType int) error { return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error { updates := map[string]interface{}{ "change_type": changeType, "update_time": time.Now(), } if err := tx.Model(existing).Updates(updates).Error; err != nil { return fmt.Errorf("failed to update resource: %w", err) } return nil }) } func (l *CompareResourceSpecLogic) markResourceDeleted(id int64) error { return l.svcCtx.DbEngin.Model(&models.TResourceSpec{}). Where("id = ?", id). Update("change_type", ChangeTypeDeleted). Error } func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool { if old.TotalCount != new.TotalCount || old.AvailableCount != new.AvailableCount || old.Region != new.Region { return true } // 比较基础资源 oldBaseMap := make(map[string]models.TBaseResourceSpec) for _, br := range old.BaseResourceSpecs { oldBaseMap[resourceKey(br.Type, br.Name)] = br } for _, newBr := range new.BaseResourceSpecs { key := resourceKey(newBr.Type, newBr.Name) oldBr, exists := oldBaseMap[key] if !exists || oldBr.TotalValue != newBr.TotalValue || oldBr.AvailableValue != newBr.AvailableValue { return true } delete(oldBaseMap, key) } return len(oldBaseMap) > 0 } func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource) models.TResourceSpec { spec := models.TResourceSpec{ SourceKey: resourceKey(res.Resource.Type, res.Resource.Name), Type: res.Resource.Type, Name: res.Resource.Name, TotalCount: int64(res.Resource.Total.Value), AvailableCount: int64(res.Resource.Available.Value), ClusterId: ClusterId, CreateTime: time.Now(), UpdateTime: time.Now(), ChangeType: ChangeTypeNormal, } for _, br := range res.BaseResources { spec.BaseResourceSpecs = append(spec.BaseResourceSpecs, models.TBaseResourceSpec{ Type: br.Type, Name: br.Name, TotalValue: br.Total.Value, TotalUnit: br.Total.Unit, AvailableValue: br.Available.Value, AvailableUnit: br.Available.Unit, CreateTime: time.Now(), UpdateTime: time.Now(), }) } return spec } func (l *CompareResourceSpecLogic) queryDatabaseResults(req *types.ResourceSpecReq) (*types.PageResult, error) { result := &types.PageResult{ PageNum: req.PageNum, PageSize: req.PageSize, } query := l.buildBaseQuery(req) if err := query.Count(&result.Total).Error; err != nil { return nil, fmt.Errorf("failed to count records: %w", err) } var specs []*models.TResourceSpec if err := query.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs"). Scopes(paginate(req.PageNum, req.PageSize)). Find(&specs).Error; err != nil { return nil, fmt.Errorf("failed to query resources: %w", err) } result.List = specs return result, nil } func (l *CompareResourceSpecLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB { query := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}). Where("cluster_id = ?", utils.StringToInt64(req.ClusterId)). Where("deleted_at IS NULL") if req.Status != "" { query = query.Where("status = ?", req.Status) } if req.ChangeType != "" { query = query.Where("change_type = ?", req.ChangeType) } if req.Type != "" { query = query.Where("type = ?", req.Type) } if req.Name != "" { query = query.Where("name LIKE ?", "%"+req.Name+"%") } return query } func paginate(pageNum, pageSize int) func(db *gorm.DB) *gorm.DB { return func(db *gorm.DB) *gorm.DB { offset := (pageNum - 1) * pageSize return db.Offset(offset).Limit(pageSize).Order("create_time DESC") } }