You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

compareresourcespeclogic.go 12 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. package core
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/mitchellh/mapstructure"
  6. "github.com/rs/zerolog/log"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  10. "gorm.io/gorm"
  11. "time"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  14. "github.com/zeromicro/go-zero/core/logx"
  15. )
  16. type CompareResourceSpecLogic struct {
  17. logx.Logger
  18. ctx context.Context
  19. svcCtx *svc.ServiceContext
  20. }
  // Values written to the change_type column of a resource spec.
  const (
  	ChangeTypeNormal   = 0 // spec matches the latest API report
  	ChangeTypeModified = 1 // spec differs from the latest API report
  	ChangeTypeDeleted  = 2 // spec is no longer present in the API report
  )
  26. type APIResponse struct {
  27. ClusterId string `json:"ClusterId"`
  28. ClusterType string `json:"clusterType"`
  29. Region string `json:"region"`
  30. Resources []Resource `json:"resources"`
  31. Msg string `json:"msg"`
  32. }
  33. type Resource struct {
  34. Resource ResourceDetail `json:"resource"`
  35. BaseResources []ResourceDetail `json:"baseResources"`
  36. }
  37. type ResourceDetail struct {
  38. Type string `json:"type"`
  39. Name string `json:"name"`
  40. Total Metric `json:"total"`
  41. Available Metric `json:"available"`
  42. }
  // Metric is an amount (Value) expressed in Unit.
  type Metric struct {
  	Unit  string  `json:"unit"`
  	Value float64 `json:"value"`
  }
  47. func NewCompareResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CompareResourceSpecLogic {
  48. return &CompareResourceSpecLogic{
  49. Logger: logx.WithContext(ctx),
  50. ctx: ctx,
  51. svcCtx: svcCtx,
  52. }
  53. }
  54. func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.ResourceSpecReq) (resp *types.PageResult, err error) {
  55. log.Debug().Msgf("开始比较资源规格,ClusterId: %s, PageNum: %d, PageSize: %d", req.ClusterId, req.PageNum, req.PageSize)
  56. if req.ClusterId == "" {
  57. return resp, nil
  58. }
  59. // 获取集群资源数据
  60. startTime := time.Now()
  61. apiResources, err := l.FetchClusterResources(req.ClusterId)
  62. log.Debug().Msgf("调用获取ai训练资源接口耗时: %v", time.Since(startTime))
  63. if err != nil {
  64. return nil, fmt.Errorf("failed to fetch cluster resources: %w", err)
  65. }
  66. // 同步资源到数据库
  67. if err := l.syncResourcesToDB(apiResources); err != nil {
  68. return nil, fmt.Errorf("failed to sync resources: %w", err)
  69. }
  70. // 查询数据库结果
  71. return l.queryDatabaseResults(req)
  72. }
  73. func (l *CompareResourceSpecLogic) FetchClusterResources(clusterId string) ([]APIResponse, error) {
  74. queryLogic := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx)
  75. resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{})
  76. if err != nil {
  77. return nil, fmt.Errorf("query resources failed: %w", err)
  78. }
  79. var apiResponses []APIResponse
  80. if err := decodeAPIResponse(resources.Data, &apiResponses); err != nil {
  81. return nil, fmt.Errorf("decode response failed: %w", err)
  82. }
  83. // 过滤出指定集群的资源
  84. var filteredResponses []APIResponse
  85. for _, response := range apiResponses {
  86. if response.ClusterId == clusterId && response.Resources != nil {
  87. filteredResponses = append(filteredResponses, response)
  88. }
  89. }
  90. if len(filteredResponses) == 0 {
  91. return nil, fmt.Errorf("no resources found for cluster ID: %s", clusterId)
  92. }
  93. return filteredResponses, nil
  94. }
  95. func decodeAPIResponse(input interface{}, output *[]APIResponse) error {
  96. config := &mapstructure.DecoderConfig{
  97. Result: output,
  98. TagName: "json",
  99. ErrorUnused: true,
  100. DecodeHook: mapstructure.ComposeDecodeHookFunc(
  101. mapstructure.StringToTimeHookFunc(time.RFC3339),
  102. mapstructure.StringToSliceHookFunc(","),
  103. ),
  104. }
  105. decoder, err := mapstructure.NewDecoder(config)
  106. if err != nil {
  107. return fmt.Errorf("failed to create decoder: %w", err)
  108. }
  109. if err := decoder.Decode(input); err != nil {
  110. return fmt.Errorf("decoding error: %w", err)
  111. }
  112. return nil
  113. }
  114. func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error {
  115. for _, response := range apiResponses {
  116. // 转换API响应到数据库模型
  117. dbSpecs, apiSpecs, err := l.processAPIResponse(response)
  118. if err != nil {
  119. return err
  120. }
  121. // 处理资源变更
  122. if err := l.handleResourceChanges(dbSpecs, apiSpecs); err != nil {
  123. return fmt.Errorf("failed to handle resource changes: %w", err)
  124. }
  125. }
  126. return nil
  127. }
  128. func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) {
  129. ClusterId := utils.StringToInt64(response.ClusterId)
  130. var dbSpecs []models.TResourceSpec
  131. if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
  132. Where("cluster_id = ?", ClusterId).
  133. Find(&dbSpecs).Error; err != nil {
  134. return nil, nil, fmt.Errorf("database query failed: %w", err)
  135. }
  136. var apiSpecs []models.TResourceSpec
  137. for _, res := range response.Resources {
  138. // 检查资源类型和名称是否存在
  139. if res.Resource.Name == "" || res.Resource.Type == "" {
  140. continue
  141. }
  142. spec := l.convertToResourceSpec(ClusterId, res)
  143. apiSpecs = append(apiSpecs, spec)
  144. }
  145. return dbSpecs, apiSpecs, nil
  146. }
  147. func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error {
  148. dbMap := make(map[string]models.TResourceSpec)
  149. for _, spec := range dbSpecs {
  150. key := spec.SourceKey
  151. dbMap[key] = spec
  152. }
  153. apiMap := make(map[string]models.TResourceSpec)
  154. for _, spec := range apiSpecs {
  155. key := resourceKey(spec.Type, spec.Name)
  156. apiMap[key] = spec
  157. }
  158. var createSpecs []*models.TResourceSpec
  159. var modifiedIDs []int64
  160. var normalIDs []int64
  161. // 第一阶段:收集需要处理的操作
  162. for key, apiSpec := range apiMap {
  163. dbSpec, exists := dbMap[key]
  164. if !exists {
  165. // 创建资源副本避免指针重复
  166. newSpec := apiSpec
  167. // 初始化时间字段
  168. newSpec.CreateTime = time.Now()
  169. newSpec.UpdateTime = time.Now()
  170. createSpecs = append(createSpecs, &newSpec)
  171. continue
  172. }
  173. // 检查资源规格变更
  174. if l.isSpecChanged(dbSpec, apiSpec) {
  175. modifiedIDs = append(modifiedIDs, dbSpec.Id)
  176. } else {
  177. normalIDs = append(normalIDs, dbSpec.Id)
  178. }
  179. }
  180. // 第二阶段:批量处理数据库操作
  181. // 批量创建新资源及关联资源
  182. if len(createSpecs) > 0 {
  183. tx := l.svcCtx.DbEngin.Begin()
  184. if tx.Error != nil {
  185. return fmt.Errorf("failed to start transaction: %w", tx.Error)
  186. }
  187. // 批量插入主资源
  188. if err := tx.CreateInBatches(createSpecs, 100).Error; err != nil {
  189. tx.Rollback()
  190. return fmt.Errorf("failed to batch create resources: %w", err)
  191. }
  192. // 准备关联资源数据
  193. var baseResources []models.TBaseResourceSpec
  194. for _, spec := range createSpecs {
  195. for i := range spec.BaseResourceSpecs {
  196. br := &spec.BaseResourceSpecs[i]
  197. br.ResourceSpecId = spec.Id
  198. br.CreateTime = time.Now()
  199. br.UpdateTime = time.Now()
  200. baseResources = append(baseResources, *br)
  201. }
  202. }
  203. // 批量插入关联资源
  204. if len(baseResources) > 0 {
  205. if err := tx.CreateInBatches(baseResources, 100).Error; err != nil {
  206. tx.Rollback()
  207. return fmt.Errorf("failed to batch create base resources: %w", err)
  208. }
  209. }
  210. if err := tx.Commit().Error; err != nil {
  211. return fmt.Errorf("transaction commit failed: %w", err)
  212. }
  213. }
  214. // 批量更新变更资源
  215. now := time.Now()
  216. if len(modifiedIDs) > 0 {
  217. if err := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  218. Where("id IN ?", modifiedIDs).
  219. Updates(map[string]interface{}{
  220. "change_type": ChangeTypeModified,
  221. "update_time": now,
  222. }).Error; err != nil {
  223. return fmt.Errorf("batch update modified failed: %w", err)
  224. }
  225. }
  226. // 批量更新正常资源
  227. if len(normalIDs) > 0 {
  228. if err := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  229. Where("id IN ?", normalIDs).
  230. Updates(map[string]interface{}{
  231. "change_type": ChangeTypeNormal,
  232. "update_time": now,
  233. }).Error; err != nil {
  234. return fmt.Errorf("batch update normal failed: %w", err)
  235. }
  236. }
  237. // 处理删除的资源
  238. for key, dbSpec := range dbMap {
  239. if _, exists := apiMap[key]; !exists {
  240. if err := l.markResourceDeleted(dbSpec.Id); err != nil {
  241. return err
  242. }
  243. }
  244. }
  245. return nil
  246. }
  // resourceKey builds the "type::name" identity used to match resources,
  // e.g. resourceKey("GPU", "A100") == "GPU::A100".
  func resourceKey(resType, name string) string {
  	return resType + "::" + name
  }
  250. func (l *CompareResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
  251. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  252. if err := tx.Create(spec).Error; err != nil {
  253. return fmt.Errorf("failed to create resource: %w", err)
  254. }
  255. return nil
  256. })
  257. }
  258. // 标识资源规格变更
  259. func (l *CompareResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec, changeType int) error {
  260. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  261. updates := map[string]interface{}{
  262. "change_type": changeType,
  263. "update_time": time.Now(),
  264. }
  265. if err := tx.Model(existing).Updates(updates).Error; err != nil {
  266. return fmt.Errorf("failed to update resource: %w", err)
  267. }
  268. return nil
  269. })
  270. }
  271. func (l *CompareResourceSpecLogic) markResourceDeleted(id int64) error {
  272. return l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  273. Where("id = ?", id).
  274. Update("change_type", ChangeTypeDeleted).
  275. Error
  276. }
  277. func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool {
  278. if old.TotalCount != new.TotalCount ||
  279. old.AvailableCount != new.AvailableCount ||
  280. old.Region != new.Region {
  281. return true
  282. }
  283. // 比较基础资源
  284. oldBaseMap := make(map[string]models.TBaseResourceSpec)
  285. for _, br := range old.BaseResourceSpecs {
  286. oldBaseMap[resourceKey(br.Type, br.Name)] = br
  287. }
  288. for _, newBr := range new.BaseResourceSpecs {
  289. key := resourceKey(newBr.Type, newBr.Name)
  290. oldBr, exists := oldBaseMap[key]
  291. if !exists ||
  292. oldBr.TotalValue != newBr.TotalValue ||
  293. oldBr.AvailableValue != newBr.AvailableValue {
  294. return true
  295. }
  296. delete(oldBaseMap, key)
  297. }
  298. return len(oldBaseMap) > 0
  299. }
  300. func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource) models.TResourceSpec {
  301. spec := models.TResourceSpec{
  302. SourceKey: resourceKey(res.Resource.Type, res.Resource.Name),
  303. Type: res.Resource.Type,
  304. Name: res.Resource.Name,
  305. TotalCount: int64(res.Resource.Total.Value),
  306. AvailableCount: int64(res.Resource.Available.Value),
  307. ClusterId: ClusterId,
  308. CreateTime: time.Now(),
  309. UpdateTime: time.Now(),
  310. ChangeType: ChangeTypeNormal,
  311. }
  312. for _, br := range res.BaseResources {
  313. spec.BaseResourceSpecs = append(spec.BaseResourceSpecs, models.TBaseResourceSpec{
  314. Type: br.Type,
  315. Name: br.Name,
  316. TotalValue: br.Total.Value,
  317. TotalUnit: br.Total.Unit,
  318. AvailableValue: br.Available.Value,
  319. AvailableUnit: br.Available.Unit,
  320. CreateTime: time.Now(),
  321. UpdateTime: time.Now(),
  322. })
  323. }
  324. return spec
  325. }
  326. func (l *CompareResourceSpecLogic) queryDatabaseResults(req *types.ResourceSpecReq) (*types.PageResult, error) {
  327. result := &types.PageResult{
  328. PageNum: req.PageNum,
  329. PageSize: req.PageSize,
  330. }
  331. query := l.buildBaseQuery(req)
  332. if err := query.Count(&result.Total).Error; err != nil {
  333. return nil, fmt.Errorf("failed to count records: %w", err)
  334. }
  335. var specs []*models.TResourceSpec
  336. if err := query.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
  337. Scopes(paginate(req.PageNum, req.PageSize)).
  338. Find(&specs).Error; err != nil {
  339. return nil, fmt.Errorf("failed to query resources: %w", err)
  340. }
  341. result.List = specs
  342. return result, nil
  343. }
  344. func (l *CompareResourceSpecLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB {
  345. query := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  346. Where("cluster_id = ?", utils.StringToInt64(req.ClusterId)).
  347. Where("deleted_at IS NULL")
  348. if req.Status != "" {
  349. query = query.Where("status = ?", req.Status)
  350. }
  351. if req.ChangeType != "" {
  352. query = query.Where("change_type = ?", req.ChangeType)
  353. }
  354. if req.Type != "" {
  355. query = query.Where("type = ?", req.Type)
  356. }
  357. if req.Name != "" {
  358. query = query.Where("name LIKE ?", "%"+req.Name+"%")
  359. }
  360. return query
  361. }
  362. func paginate(pageNum, pageSize int) func(db *gorm.DB) *gorm.DB {
  363. return func(db *gorm.DB) *gorm.DB {
  364. offset := (pageNum - 1) * pageSize
  365. return db.Offset(offset).Limit(pageSize).Order("create_time DESC")
  366. }
  367. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.