You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

syncresourcespeclogic.go 4.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package core
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/pkg/errors"
  6. "github.com/rs/zerolog/log"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  8. "gorm.io/gorm"
  9. "strconv"
  10. "time"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  13. "github.com/zeromicro/go-zero/core/logx"
  14. )
  15. type SyncResourceSpecLogic struct {
  16. logx.Logger
  17. ctx context.Context
  18. svcCtx *svc.ServiceContext
  19. }
  20. func NewSyncResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SyncResourceSpecLogic {
  21. return &SyncResourceSpecLogic{
  22. Logger: logx.WithContext(ctx),
  23. ctx: ctx,
  24. svcCtx: svcCtx,
  25. }
  26. }
  27. func (l *SyncResourceSpecLogic) SyncResourceSpec(req *types.SyncResourceReq) (resp *types.CommonResp, err error) {
  28. var mainSpec models.TResourceSpec
  29. if err := l.svcCtx.DbEngin.Where("id = ? AND deleted_at IS NULL", req.Id).
  30. First(&mainSpec).
  31. Error; err != nil {
  32. if errors.Is(err, gorm.ErrRecordNotFound) {
  33. return nil, errors.Errorf("资源规格不存在 (ID: %s)", req.Id)
  34. }
  35. return nil, errors.Wrapf(err, "查询资源规格失败 (ID: %s)", req.Id)
  36. }
  37. // 获取集群资源数据
  38. startTime := time.Now()
  39. compareLogic := NewCompareResourceSpecLogic(l.ctx, l.svcCtx)
  40. apiResources, err := compareLogic.FetchClusterResources(strconv.FormatInt(mainSpec.ClusterId, 10), mainSpec.Tag)
  41. log.Debug().Msgf("调用获取ai训练资源接口耗时: %v", time.Since(startTime))
  42. if err != nil {
  43. log.Error().Err(err).Msg("同步集群资源失败")
  44. return nil, fmt.Errorf("同步集群资源失败,请稍后重试")
  45. }
  46. for _, response := range apiResources {
  47. // 转换API响应到数据库模型
  48. _, apiSpecs, err := compareLogic.processAPIResponse(response, req.UserId)
  49. if err != nil {
  50. return nil, err
  51. }
  52. // 同步资源到数据库
  53. for _, spec := range apiSpecs {
  54. if spec.SourceKey == mainSpec.SourceKey {
  55. err := l.updateResource(&mainSpec, spec)
  56. if err != nil {
  57. return nil, err
  58. }
  59. }
  60. }
  61. }
  62. return nil, nil
  63. }
  64. func (l *SyncResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec) error {
  65. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  66. updates := map[string]interface{}{
  67. "type": newSpec.Type,
  68. "total_count": newSpec.TotalCount,
  69. "available_count": newSpec.AvailableCount,
  70. "change_type": ChangeTypeNormal,
  71. "update_time": time.Now(),
  72. }
  73. if err := tx.Model(existing).Updates(updates).Error; err != nil {
  74. return fmt.Errorf("failed to update resource: %w", err)
  75. }
  76. return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs, newSpec.Tag)
  77. })
  78. }
  79. func (l *SyncResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec, tag string) error {
  80. // 处理基础资源更新
  81. var existingResources []models.TBaseResourceSpec
  82. if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil {
  83. return fmt.Errorf("failed to query base resources: %w", err)
  84. }
  85. existingMap := make(map[string]models.TBaseResourceSpec)
  86. for _, r := range existingResources {
  87. key := resourceKey(r.Type, r.Name, tag)
  88. existingMap[key] = r
  89. }
  90. // 处理更新和新增
  91. for i, newRes := range newResources {
  92. newRes.ResourceSpecId = specID
  93. key := resourceKey(newRes.Type, newRes.Name, tag)
  94. if existing, exists := existingMap[key]; exists {
  95. newRes.Id = existing.Id
  96. newRes.CreateTime = existing.CreateTime
  97. if err := tx.Save(&newRes).Error; err != nil {
  98. return fmt.Errorf("failed to update base resource: %w", err)
  99. }
  100. } else {
  101. if err := tx.Create(&newRes).Error; err != nil {
  102. return fmt.Errorf("failed to create base resource: %w", err)
  103. }
  104. }
  105. newResources[i] = newRes
  106. }
  107. // 处理删除
  108. currentIDs := make(map[int64]struct{})
  109. for _, r := range newResources {
  110. currentIDs[r.Id] = struct{}{}
  111. }
  112. for _, existing := range existingResources {
  113. if _, exists := currentIDs[existing.Id]; !exists {
  114. if err := tx.Delete(&existing).Error; err != nil {
  115. return fmt.Errorf("failed to delete base resource: %w", err)
  116. }
  117. }
  118. }
  119. return nil
  120. }

PCM is positioned as Software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non intrusive and autonomous peer-to-peer manner.