package core import ( "context" "fmt" "github.com/pkg/errors" "github.com/rs/zerolog/log" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gorm.io/gorm" "strconv" "time" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "github.com/zeromicro/go-zero/core/logx" ) type SyncResourceSpecLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewSyncResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SyncResourceSpecLogic { return &SyncResourceSpecLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } func (l *SyncResourceSpecLogic) SyncResourceSpec(req *types.SyncResourceReq) (resp *types.CommonResp, err error) { var mainSpec models.TResourceSpec if err := l.svcCtx.DbEngin.Where("id = ? AND deleted_at IS NULL", req.Id). First(&mainSpec). Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return nil, errors.Errorf("资源规格不存在 (ID: %s)", req.Id) } return nil, errors.Wrapf(err, "查询资源规格失败 (ID: %s)", req.Id) } // 获取集群资源数据 startTime := time.Now() compareLogic := NewCompareResourceSpecLogic(l.ctx, l.svcCtx) apiResources, err := compareLogic.FetchClusterResources(strconv.FormatInt(mainSpec.ClusterId, 10), mainSpec.Tag) log.Debug().Msgf("调用获取ai训练资源接口耗时: %v", time.Since(startTime)) if err != nil { log.Error().Err(err).Msg("同步集群资源失败") return nil, fmt.Errorf("同步集群资源失败,请稍后重试") } for _, response := range apiResources { // 转换API响应到数据库模型 _, apiSpecs, err := compareLogic.processAPIResponse(response, req.UserId) if err != nil { return nil, err } // 同步资源到数据库 for _, spec := range apiSpecs { if spec.SourceKey == mainSpec.SourceKey { err := l.updateResource(&mainSpec, spec) if err != nil { return nil, err } } } } return nil, nil } func (l *SyncResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec) error { return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error { updates := map[string]interface{}{ "type": newSpec.Type, "total_count": newSpec.TotalCount, "available_count": newSpec.AvailableCount, "change_type": ChangeTypeNormal, "update_time": time.Now(), } if err := tx.Model(existing).Updates(updates).Error; err != nil { return fmt.Errorf("failed to update resource: %w", err) } return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs, newSpec.Tag) }) } func (l *SyncResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec, tag string) error { // 处理基础资源更新 var existingResources []models.TBaseResourceSpec if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil { return fmt.Errorf("failed to query base resources: %w", err) } existingMap := make(map[string]models.TBaseResourceSpec) for _, r := range existingResources { key := resourceKey(r.Type, r.Name, tag) existingMap[key] = r } // 处理更新和新增 for i, newRes := range newResources { newRes.ResourceSpecId = specID key := resourceKey(newRes.Type, newRes.Name, tag) if existing, exists := existingMap[key]; exists { newRes.Id = existing.Id newRes.CreateTime = existing.CreateTime if err := tx.Save(&newRes).Error; err != nil { return fmt.Errorf("failed to update base resource: %w", err) } } else { if err := tx.Create(&newRes).Error; err != nil { return fmt.Errorf("failed to create base resource: %w", err) } } newResources[i] = newRes } // 处理删除 currentIDs := make(map[int64]struct{}) for _, r := range newResources { currentIDs[r.Id] = struct{}{} } for _, existing := range existingResources { if _, exists := currentIDs[existing.Id]; !exists { if err := tx.Delete(&existing).Error; err != nil { return fmt.Errorf("failed to delete base resource: %w", err) } } } return nil }