You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

compareresourcespeclogic.go 12 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package core
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/mitchellh/mapstructure"
  6. "github.com/rs/zerolog/log"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  10. "gorm.io/gorm"
  11. "time"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  14. "github.com/zeromicro/go-zero/core/logx"
  15. )
  16. type CompareResourceSpecLogic struct {
  17. logx.Logger
  18. ctx context.Context
  19. svcCtx *svc.ServiceContext
  20. }
  21. const (
  22. ChangeTypeNormal = 0 // 资源规格正常
  23. ChangeTypeModified = 1 // 资源规格变更
  24. ChangeTypeDeleted = 2 // 资源被删除
  25. )
  26. type APIResponse struct {
  27. ClusterId string `json:"ClusterId"`
  28. ClusterType string `json:"clusterType"`
  29. Region string `json:"region"`
  30. Resources []Resource `json:"resources"`
  31. Msg string `json:"msg"`
  32. }
  33. type Resource struct {
  34. Resource ResourceDetail `json:"resource"`
  35. BaseResources []ResourceDetail `json:"baseResources"`
  36. }
  37. type ResourceDetail struct {
  38. Type string `json:"type"`
  39. Name string `json:"name"`
  40. Total Metric `json:"total"`
  41. Available Metric `json:"available"`
  42. }
  43. type Metric struct {
  44. Unit string `json:"unit"`
  45. Value float64 `json:"value"`
  46. }
  47. func NewCompareResourceSpecLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CompareResourceSpecLogic {
  48. return &CompareResourceSpecLogic{
  49. Logger: logx.WithContext(ctx),
  50. ctx: ctx,
  51. svcCtx: svcCtx,
  52. }
  53. }
  54. func (l *CompareResourceSpecLogic) CompareResourceSpec(req *types.ResourceSpecReq) (resp *types.PageResult, err error) {
  55. log.Debug().Msgf("开始比较资源规格,ClusterId: %s, PageNum: %d, PageSize: %d", req.ClusterId, req.PageNum, req.PageSize)
  56. if req.ClusterId == "" {
  57. return resp, nil
  58. }
  59. // 获取集群资源数据
  60. startTime := time.Now()
  61. apiResources, err := l.FetchClusterResources(req.ClusterId)
  62. log.Debug().Msgf("调用获取ai训练资源接口耗时: %v", time.Since(startTime))
  63. if err != nil {
  64. return nil, fmt.Errorf("failed to fetch cluster resources: %w", err)
  65. }
  66. // 同步资源到数据库
  67. if err := l.syncResourcesToDB(apiResources); err != nil {
  68. return nil, fmt.Errorf("failed to sync resources: %w", err)
  69. }
  70. // 查询数据库结果
  71. return l.queryDatabaseResults(req)
  72. }
  73. func (l *CompareResourceSpecLogic) FetchClusterResources(ClusterId string) ([]APIResponse, error) {
  74. queryLogic := schedule.NewQueryResourcesLogic(l.ctx, l.svcCtx)
  75. resources, err := queryLogic.QueryResources(&types.QueryResourcesReq{
  76. ClusterIDs: []string{ClusterId},
  77. })
  78. if err != nil {
  79. return nil, fmt.Errorf("query resources failed: %w", err)
  80. }
  81. var apiResponses []APIResponse
  82. if err := decodeAPIResponse(resources.Data, &apiResponses); err != nil {
  83. return nil, fmt.Errorf("decode response failed: %w", err)
  84. }
  85. for _, resource := range apiResponses {
  86. if resource.Resources == nil {
  87. return nil, fmt.Errorf(resource.Msg)
  88. }
  89. }
  90. return apiResponses, nil
  91. }
  92. func decodeAPIResponse(input interface{}, output *[]APIResponse) error {
  93. config := &mapstructure.DecoderConfig{
  94. Result: output,
  95. TagName: "json",
  96. ErrorUnused: true,
  97. DecodeHook: mapstructure.ComposeDecodeHookFunc(
  98. mapstructure.StringToTimeHookFunc(time.RFC3339),
  99. mapstructure.StringToSliceHookFunc(","),
  100. ),
  101. }
  102. decoder, err := mapstructure.NewDecoder(config)
  103. if err != nil {
  104. return fmt.Errorf("failed to create decoder: %w", err)
  105. }
  106. if err := decoder.Decode(input); err != nil {
  107. return fmt.Errorf("decoding error: %w", err)
  108. }
  109. return nil
  110. }
  111. func (l *CompareResourceSpecLogic) syncResourcesToDB(apiResponses []APIResponse) error {
  112. for _, response := range apiResponses {
  113. // 转换API响应到数据库模型
  114. dbSpecs, apiSpecs, err := l.processAPIResponse(response)
  115. if err != nil {
  116. return err
  117. }
  118. // 处理资源变更
  119. if err := l.handleResourceChanges(dbSpecs, apiSpecs); err != nil {
  120. return fmt.Errorf("failed to handle resource changes: %w", err)
  121. }
  122. }
  123. return nil
  124. }
  125. func (l *CompareResourceSpecLogic) processAPIResponse(response APIResponse) ([]models.TResourceSpec, []models.TResourceSpec, error) {
  126. ClusterId := utils.StringToInt64(response.ClusterId)
  127. var dbSpecs []models.TResourceSpec
  128. if err := l.svcCtx.DbEngin.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
  129. Where("cluster_id = ?", ClusterId).
  130. Find(&dbSpecs).Error; err != nil {
  131. return nil, nil, fmt.Errorf("database query failed: %w", err)
  132. }
  133. var apiSpecs []models.TResourceSpec
  134. for _, res := range response.Resources {
  135. // 检查资源类型和名称是否存在
  136. if res.Resource.Name == "" || res.Resource.Type == "" {
  137. continue
  138. }
  139. spec := l.convertToResourceSpec(ClusterId, res)
  140. apiSpecs = append(apiSpecs, spec)
  141. }
  142. return dbSpecs, apiSpecs, nil
  143. }
  144. func (l *CompareResourceSpecLogic) handleResourceChanges(dbSpecs, apiSpecs []models.TResourceSpec) error {
  145. // 创建资源映射
  146. dbMap := make(map[string]models.TResourceSpec)
  147. for _, spec := range dbSpecs {
  148. key := spec.SourceKey
  149. dbMap[key] = spec
  150. }
  151. apiMap := make(map[string]models.TResourceSpec)
  152. for _, spec := range apiSpecs {
  153. key := resourceKey(spec.Type, spec.Name)
  154. apiMap[key] = spec
  155. }
  156. // 处理新增或更新的资源
  157. for key, apiSpec := range apiMap {
  158. dbSpec, exists := dbMap[key]
  159. if !exists {
  160. // 资源不存在于数据库,创建新资源
  161. if err := l.createNewResource(&apiSpec); err != nil {
  162. return err
  163. }
  164. continue
  165. }
  166. // 资源已存在于数据库,检查是否有变更,资源规格有变更则更新changeType为变更
  167. if l.isSpecChanged(dbSpec, apiSpec) {
  168. if err := l.updateResource(&dbSpec, apiSpec, ChangeTypeModified); err != nil {
  169. return err
  170. }
  171. } else {
  172. // 如果资源存在且没有变更,更新changeType为正常
  173. if err := l.updateResource(&dbSpec, apiSpec, ChangeTypeNormal); err != nil {
  174. return err
  175. }
  176. }
  177. }
  178. // 处理删除的资源
  179. for key, dbSpec := range dbMap {
  180. if _, exists := apiMap[key]; !exists {
  181. if err := l.markResourceDeleted(dbSpec.Id); err != nil {
  182. return err
  183. }
  184. }
  185. }
  186. return nil
  187. }
  188. func resourceKey(resType, name string) string {
  189. return fmt.Sprintf("%s::%s", resType, name)
  190. }
  191. func (l *CompareResourceSpecLogic) createNewResource(spec *models.TResourceSpec) error {
  192. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  193. if err := tx.Create(spec).Error; err != nil {
  194. return fmt.Errorf("failed to create resource: %w", err)
  195. }
  196. return nil
  197. })
  198. }
  199. // 标识资源规格变更
  200. func (l *CompareResourceSpecLogic) updateResource(existing *models.TResourceSpec, newSpec models.TResourceSpec, changeType int) error {
  201. return l.svcCtx.DbEngin.Transaction(func(tx *gorm.DB) error {
  202. updates := map[string]interface{}{
  203. "change_type": changeType,
  204. "update_time": time.Now(),
  205. }
  206. if err := tx.Model(existing).Updates(updates).Error; err != nil {
  207. return fmt.Errorf("failed to update resource: %w", err)
  208. }
  209. return l.syncBaseResources(tx, existing.Id, newSpec.BaseResourceSpecs)
  210. })
  211. }
  212. func (l *CompareResourceSpecLogic) syncBaseResources(tx *gorm.DB, specID int64, newResources []models.TBaseResourceSpec) error {
  213. // 处理基础资源更新
  214. var existingResources []models.TBaseResourceSpec
  215. if err := tx.Where("resource_spec_id = ?", specID).Find(&existingResources).Error; err != nil {
  216. return fmt.Errorf("failed to query base resources: %w", err)
  217. }
  218. existingMap := make(map[string]models.TBaseResourceSpec)
  219. for _, r := range existingResources {
  220. key := resourceKey(r.Type, r.Name)
  221. existingMap[key] = r
  222. }
  223. // 处理更新和新增
  224. for i, newRes := range newResources {
  225. newRes.ResourceSpecId = specID
  226. key := resourceKey(newRes.Type, newRes.Name)
  227. if existing, exists := existingMap[key]; exists {
  228. newRes.Id = existing.Id
  229. newRes.CreateTime = existing.CreateTime
  230. if err := tx.Save(&newRes).Error; err != nil {
  231. return fmt.Errorf("failed to update base resource: %w", err)
  232. }
  233. } else {
  234. if err := tx.Create(&newRes).Error; err != nil {
  235. return fmt.Errorf("failed to create base resource: %w", err)
  236. }
  237. }
  238. newResources[i] = newRes
  239. }
  240. // 处理删除
  241. currentIDs := make(map[int64]struct{})
  242. for _, r := range newResources {
  243. currentIDs[r.Id] = struct{}{}
  244. }
  245. for _, existing := range existingResources {
  246. if _, exists := currentIDs[existing.Id]; !exists {
  247. if err := tx.Delete(&existing).Error; err != nil {
  248. return fmt.Errorf("failed to delete base resource: %w", err)
  249. }
  250. }
  251. }
  252. return nil
  253. }
  254. func (l *CompareResourceSpecLogic) markResourceDeleted(id int64) error {
  255. return l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  256. Where("id = ?", id).
  257. Update("change_type", ChangeTypeDeleted).
  258. Error
  259. }
  260. func (l *CompareResourceSpecLogic) isSpecChanged(old, new models.TResourceSpec) bool {
  261. if old.TotalCount != new.TotalCount ||
  262. old.AvailableCount != new.AvailableCount ||
  263. old.Region != new.Region {
  264. return true
  265. }
  266. // 比较基础资源
  267. oldBaseMap := make(map[string]models.TBaseResourceSpec)
  268. for _, br := range old.BaseResourceSpecs {
  269. oldBaseMap[resourceKey(br.Type, br.Name)] = br
  270. }
  271. for _, newBr := range new.BaseResourceSpecs {
  272. key := resourceKey(newBr.Type, newBr.Name)
  273. oldBr, exists := oldBaseMap[key]
  274. if !exists ||
  275. oldBr.TotalValue != newBr.TotalValue ||
  276. oldBr.AvailableValue != newBr.AvailableValue {
  277. return true
  278. }
  279. delete(oldBaseMap, key)
  280. }
  281. return len(oldBaseMap) > 0
  282. }
  283. func (l *CompareResourceSpecLogic) convertToResourceSpec(ClusterId int64, res Resource) models.TResourceSpec {
  284. spec := models.TResourceSpec{
  285. SourceKey: resourceKey(res.Resource.Type, res.Resource.Name),
  286. Type: res.Resource.Type,
  287. Name: res.Resource.Name,
  288. TotalCount: int64(res.Resource.Total.Value),
  289. AvailableCount: int64(res.Resource.Available.Value),
  290. ClusterId: ClusterId,
  291. CreateTime: time.Now(),
  292. UpdateTime: time.Now(),
  293. ChangeType: ChangeTypeNormal,
  294. }
  295. for _, br := range res.BaseResources {
  296. spec.BaseResourceSpecs = append(spec.BaseResourceSpecs, models.TBaseResourceSpec{
  297. Type: br.Type,
  298. Name: br.Name,
  299. TotalValue: br.Total.Value,
  300. TotalUnit: br.Total.Unit,
  301. AvailableValue: br.Available.Value,
  302. AvailableUnit: br.Available.Unit,
  303. CreateTime: time.Now(),
  304. UpdateTime: time.Now(),
  305. })
  306. }
  307. return spec
  308. }
  309. func (l *CompareResourceSpecLogic) queryDatabaseResults(req *types.ResourceSpecReq) (*types.PageResult, error) {
  310. result := &types.PageResult{
  311. PageNum: req.PageNum,
  312. PageSize: req.PageSize,
  313. }
  314. query := l.buildBaseQuery(req)
  315. if err := query.Count(&result.Total).Error; err != nil {
  316. return nil, fmt.Errorf("failed to count records: %w", err)
  317. }
  318. var specs []*models.TResourceSpec
  319. if err := query.Model(models.TResourceSpec{}).Preload("BaseResourceSpecs").
  320. Scopes(paginate(req.PageNum, req.PageSize)).
  321. Find(&specs).Error; err != nil {
  322. return nil, fmt.Errorf("failed to query resources: %w", err)
  323. }
  324. result.List = specs
  325. return result, nil
  326. }
  327. func (l *CompareResourceSpecLogic) buildBaseQuery(req *types.ResourceSpecReq) *gorm.DB {
  328. query := l.svcCtx.DbEngin.Model(&models.TResourceSpec{}).
  329. Where("cluster_id = ?", utils.StringToInt64(req.ClusterId)).
  330. Where("deleted_at IS NULL")
  331. if req.Status != "" {
  332. query = query.Where("status = ?", req.Status)
  333. }
  334. if req.ChangeType != "" {
  335. query = query.Where("change_type = ?", req.ChangeType)
  336. }
  337. if req.Type != "" {
  338. query = query.Where("type = ?", req.Type)
  339. }
  340. if req.Name != "" {
  341. query = query.Where("name LIKE ?", "%"+req.Name+"%")
  342. }
  343. return query
  344. }
  345. func paginate(pageNum, pageSize int) func(db *gorm.DB) *gorm.DB {
  346. return func(db *gorm.DB) *gorm.DB {
  347. offset := (pageNum - 1) * pageSize
  348. return db.Offset(offset).Limit(pageSize).Order("create_time DESC")
  349. }
  350. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration in JCC in a non-intrusive, autonomous, peer-to-peer manner.