You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

hpc_cron_task.go 2.4 kB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package cron
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  9. )
  10. // GetHpcTaskList get hpc task list
  11. func GetHpcTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
  12. limit := 10
  13. offset := 0
  14. var list []*types.TaskModel
  15. db := svc.DbEngin.Model(&types.TaskModel{}).Table("task").
  16. Joins("join task_hpc hpc on task.id = hpc.task_id").
  17. Select("task.* ,hpc.job_id, hpc.work_dir, hpc.status, hpc.updated_time").
  18. Where("task.adapter_type_dict = 2 AND task.status NOT IN ('Succeeded', 'Failed') and task.deleted_at is null")
  19. //count total
  20. var total int64
  21. err := db.Count(&total).Error
  22. if err != nil {
  23. return nil, err
  24. }
  25. db.Limit(limit).Offset(offset)
  26. err = db.Order("created_time desc").Scan(&list).Error
  27. if err != nil {
  28. return nil, err
  29. }
  30. return list, nil
  31. }
  32. func UpdateHpcAdapterMaps(svc *svc.ServiceContext) {
  33. var hpcType = "2"
  34. adapterIds, err := svc.Scheduler.HpcStorages.GetAdapterIdsByType(hpcType)
  35. if err != nil {
  36. msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error())
  37. logx.Errorf(errors.New(msg).Error())
  38. return
  39. }
  40. if len(adapterIds) == 0 {
  41. return
  42. }
  43. for _, id := range adapterIds {
  44. clusters, err := svc.Scheduler.HpcStorages.GetClustersByAdapterId(id)
  45. if err != nil {
  46. msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error())
  47. logx.Errorf(errors.New(msg).Error())
  48. return
  49. }
  50. if len(clusters.List) == 0 {
  51. continue
  52. }
  53. if hpcAdapterExist(svc, id, len(clusters.List)) {
  54. continue
  55. } else {
  56. if hpcAdapterEmpty(svc, id) {
  57. exeClusterMap := service.InitHpcClusterMap(&svc.Config, clusters.List)
  58. svc.Scheduler.HpcService.HpcExecutorAdapterMap[id] = exeClusterMap
  59. } else {
  60. svc.Scheduler.HpcService.UpdateHpcClusterMaps(&svc.Config, id, clusters.List)
  61. }
  62. }
  63. }
  64. }
  65. func hpcAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
  66. emap, ok := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id]
  67. if ok {
  68. if len(emap) == clusterNum {
  69. return true
  70. }
  71. }
  72. return false
  73. }
  74. func hpcAdapterEmpty(svc *svc.ServiceContext, id string) bool {
  75. _, ok := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id]
  76. if !ok {
  77. return true
  78. }
  79. return false
  80. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.