You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

compareresourcespeclogic.go 10 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. package core
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/mitchellh/mapstructure"
  6. "github.com/rs/zerolog/log"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  10. "gorm.io/gorm"
  11. "time"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  14. "github.com/zeromicro/go-zero/core/logx"
  15. )
  16. type CompareResourceSpecLogic struct {
  17. logx.Logger
  18. ctx context.Context
  19. svcCtx *svc.ServiceContext
  20. }
  21. const (
  22. ChangeTypeNormal = 0 // 资源规格正常
  23. ChangeTypeModified = 1 // 资源规格变更
  24. ChangeTypeDeleted = 2 // 资源被删除
  25. )
  26. type APIResponse struct {
  27. ClusterId string `json:"ClusterId"`
  28. ClusterType string `json:"clusterType"`
  29. Region string `json:"region"`
  30. Tag string `json:"tag"`
  31. Resources []Resource `json:"resources"`
  32. Msg string `json:"msg"`
  33. }
  34. type Resource struct {
  35. Resource ResourceDetail `json:"resource"`
  36. BaseResources []ResourceDetail `json:"baseResources"`
  37. }
  38. type ResourceDetail struct {
  39. Type string `json:"type"`
  40. Name string `json:"name"`
  41. Total Metric `json:"total"`
  42. Available Metric `json:"available"`
  43. }
  44. type Metric struct {
  45. Unit string `json:"unit"`
  46. Value float64 `json:"value"`
  47. }
  48. func NewCompareResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CompareResourceSpecLogic {
  49. return &CompareResourceSpecLogic{
  50. Logger: logx.WithContext(ctx),
  51. ctx: ctx,
  52. svcCtx: svcCtx,
  53. }
  54. }
  55. func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.FetchResourceSpecReq) (resp *types.PageResult, err error) {
  56. if req.ClusterId == "" {
  57. return resp, nil
  58. }
  59. // 获取集群资源数据
  60. startTime := time.Now()
  61. apiResources, err := l.FetchClusterResources(req.ClusterId, req.Tag)
  62. log.Debug().Msgf("调用获取ai训练资源接口耗时: %v", time.Since(startTime))
  63. if err != nil {
  64. log.Error().Msgf("调用第三方接口获取集群资源失败: %v", err)
  65. return nil, fmt.Errorf("调用第三方接口获取集群资源失败")
  66. }
  67. // 同步资源到数据库
  68. if err := l.syncResourcesToDB(apiResources, req.UserId); err != nil {
  69. return nil, fmt.Errorf("failed to sync resources: %w", err)
  70. }
  71. return
  72. }
  73. func (l *CompareResourceSpecLogic) FetchClusterResources(clusterId string, tag string) ([]APIResponse, error) {
  74. queryLogic := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx)
  75. resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{
  76. Type: tag,
  77. })
  78. if err != nil {
  79. return nil, fmt.Errorf("query resources failed: %w", err)
  80. }
  81. var apiResponses []APIResponse
  82. if err := decodeAPIResponse(resources.Data, &apiResponses); err != nil {
  83. return nil, fmt.Errorf("decode response failed: %w", err)
  84. }
  85. // 过滤出指定集群的资源
  86. var filteredResponses []APIResponse
  87. for _, response := range apiResponses {
  88. if response.ClusterId == clusterId && response.Resources != nil {
  89. filteredResponses = append(filteredResponses, response)
  90. }
  91. }
  92. if len(filteredResponses) == 0 {
  93. return nil, fmt.Errorf("no resources found for cluster ID: %s", clusterId)
  94. }
  95. return filteredResponses, nil
  96. }
  97. func decodeAPIResponse(input interface{}, output *[]APIResponse) error {
  98. config := &mapstructure.DecoderConfig{
  99. Result: output,
  100. TagName: "json",
  101. ErrorUnused: true,
  102. DecodeHook: mapstructure.ComposeDecodeHookFunc(
  103. mapstructure.StringToTimeHookFunc(time.RFC3339),
  104. mapstructure.StringToSliceHookFunc(","),
  105. ),
  106. }
  107. decoder, err := mapstructure.NewDecoder(config)
  108. if err != nil {
  109. return fmt.Errorf("failed to create decoder: %w", err)
  110. }
  111. if err := decoder.Decode(input); err != nil {
  112. return fmt.Errorf("decoding error: %w", err)
  113. }
  114. return nil
  115. }
  116. func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse, userId int64) error {
  117. for _, response := range apiResponses {
  118. // 转换API响应到数据库模型
  119. dbSpecs, apiSpecs, err := l.processAPIResponse(response, userId)
  120. if err != nil {
  121. return err
  122. }
  123. // 处理资源变更
  124. if err := l.handleResourceChanges(dbSpecs, apiSpecs); err != nil {
  125. return fmt.Errorf("failed to handle resource changes: %w", err)
  126. }
  127. }
  128. return nil
  129. }
  130. func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse, userId int64) ([]models.TResourceSpec, []models.TResourceSpec, error) {
  131. ClusterId := utils.StringToInt64(response.ClusterId)
  132. var dbSpecs []models.TResourceSpec
  133. if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
  134. Where("cluster_id = ?", ClusterId).
  135. Where("tag = ?", response.Tag).
  136. Find(&dbSpecs).Error; err != nil {
  137. return nil, nil, fmt.Errorf("database query failed: %w", err)
  138. }
  139. var apiSpecs []models.TResourceSpec
  140. for _, res := range response.Resources {
  141. // 检查资源类型和名称是否存在
  142. if res.Resource.Name == "" || res.Resource.Type == "" {
  143. continue
  144. }
  145. spec := l.convertToResourceSpec(ClusterId, res, response.Tag, userId)
  146. apiSpecs = append(apiSpecs, spec)
  147. }
  148. return dbSpecs, apiSpecs, nil
  149. }
  150. func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error {
  151. dbMap := make(map[string]models.TResourceSpec)
  152. for _, spec := range dbSpecs {
  153. key := spec.SourceKey
  154. dbMap[key] = spec
  155. }
  156. apiMap := make(map[string]models.TResourceSpec)
  157. for _, spec := range apiSpecs {
  158. key := resourceKey(spec.Type, spec.Name, spec.Tag)
  159. apiMap[key] = spec
  160. }
  161. var createSpecs []*models.TResourceSpec
  162. var modifiedIDs []int64
  163. var normalIDs []int64
  164. // 第一阶段:收集需要处理的操作
  165. for key, apiSpec := range apiMap {
  166. dbSpec, exists := dbMap[key]
  167. if !exists {
  168. // 创建资源副本避免指针重复
  169. newSpec := apiSpec
  170. // 初始化时间字段
  171. newSpec.CreateTime = time.Now()
  172. newSpec.UpdateTime = time.Now()
  173. newSpec.Tag = apiSpec.Tag
  174. createSpecs = append(createSpecs, &newSpec)
  175. continue
  176. }
  177. // 检查资源规格变更
  178. if l.isSpecChanged(dbSpec, apiSpec) {
  179. modifiedIDs = append(modifiedIDs, dbSpec.Id)
  180. } else {
  181. normalIDs = append(normalIDs, dbSpec.Id)
  182. }
  183. }
  184. // 第二阶段:批量处理数据库操作
  185. // 批量创建新资源及关联资源
  186. if len(createSpecs) > 0 {
  187. tx := l.svcCtx.DbEngin.Begin()
  188. if tx.Error != nil {
  189. return fmt.Errorf("failed to start transaction: %w", tx.Error)
  190. }
  191. // 批量插入主资源
  192. if err := tx.CreateInBatches(createSpecs, 100).Error; err != nil {
  193. tx.Rollback()
  194. return fmt.Errorf("failed to batch create resources: %w", err)
  195. }
  196. if err := tx.Commit().Error; err != nil {
  197. return fmt.Errorf("transaction commit failed: %w", err)
  198. }
  199. }
  200. // 批量更新变更资源
  201. now := time.Now()
  202. if len(modifiedIDs) > 0 {
  203. if err := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  204. Where("id IN ?", modifiedIDs).
  205. Updates(map[string]interface{}{
  206. "change_type": ChangeTypeModified,
  207. "update_time": now,
  208. }).Error; err != nil {
  209. return fmt.Errorf("batch update modified failed: %w", err)
  210. }
  211. }
  212. // 批量更新正常资源
  213. if len(normalIDs) > 0 {
  214. if err := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  215. Where("id IN ?", normalIDs).
  216. Updates(map[string]interface{}{
  217. "change_type": ChangeTypeNormal,
  218. "update_time": now,
  219. }).Error; err != nil {
  220. return fmt.Errorf("batch update normal failed: %w", err)
  221. }
  222. }
  223. // 处理删除的资源
  224. for key, dbSpec := range dbMap {
  225. if _, exists := apiMap[key]; !exists {
  226. if err := l.markResourceDeleted(dbSpec.Id); err != nil {
  227. return err
  228. }
  229. }
  230. }
  231. return nil
  232. }
  233. func resourceKey(resType, name, tag string) string {
  234. return fmt.Sprintf("%s::%s::%s", resType, name, tag)
  235. }
  236. func (l *CompareResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
  237. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  238. if err := tx.Create(spec).Error; err != nil {
  239. return fmt.Errorf("failed to create resource: %w", err)
  240. }
  241. return nil
  242. })
  243. }
  244. // 标识资源规格变更
  245. func (l *CompareResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec, changeType int) error {
  246. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  247. updates := map[string]interface{}{
  248. "change_type": changeType,
  249. "update_time": time.Now(),
  250. }
  251. if err := tx.Model(existing).Updates(updates).Error; err != nil {
  252. return fmt.Errorf("failed to update resource: %w", err)
  253. }
  254. return nil
  255. })
  256. }
  257. func (l *CompareResourceSpecLogic) markResourceDeleted(id int64) error {
  258. return l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  259. Where("id = ?", id).
  260. Update("change_type", ChangeTypeDeleted).
  261. Error
  262. }
  263. func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool {
  264. if old.TotalCount != new.TotalCount ||
  265. old.AvailableCount != new.AvailableCount ||
  266. old.Region != new.Region {
  267. return true
  268. }
  269. // 比较基础资源
  270. oldBaseMap := make(map[string]models.TBaseResourceSpec)
  271. for _, br := range old.BaseResourceSpecs {
  272. oldBaseMap[resourceKey(br.Type, br.Name, old.Tag)] = br
  273. }
  274. for _, newBr := range new.BaseResourceSpecs {
  275. key := resourceKey(newBr.Type, newBr.Name, new.Tag)
  276. oldBr, exists := oldBaseMap[key]
  277. if !exists ||
  278. oldBr.TotalValue != newBr.TotalValue ||
  279. oldBr.AvailableValue != newBr.AvailableValue {
  280. return true
  281. }
  282. delete(oldBaseMap, key)
  283. }
  284. return len(oldBaseMap) > 0
  285. }
  286. func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource, tag string, userId int64) models.TResourceSpec {
  287. spec := models.TResourceSpec{
  288. SourceKey: resourceKey(res.Resource.Type, res.Resource.Name, tag),
  289. Type: res.Resource.Type,
  290. Name: res.Resource.Name,
  291. Tag: tag,
  292. TotalCount: int64(res.Resource.Total.Value),
  293. AvailableCount: int64(res.Resource.Available.Value),
  294. ClusterId: ClusterId,
  295. CreateTime: time.Now(),
  296. UpdateTime: time.Now(),
  297. UserId: userId,
  298. ChangeType: ChangeTypeNormal,
  299. }
  300. for _, br := range res.BaseResources {
  301. spec.BaseResourceSpecs = append(spec.BaseResourceSpecs, models.TBaseResourceSpec{
  302. Type: br.Type,
  303. Name: br.Name,
  304. TotalValue: br.Total.Value,
  305. TotalUnit: br.Total.Unit,
  306. AvailableValue: br.Available.Value,
  307. AvailableUnit: br.Available.Unit,
  308. UserId: userId,
  309. CreateTime: time.Now(),
  310. UpdateTime: time.Now(),
  311. })
  312. }
  313. return spec
  314. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.