You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

commithpctasklogic.go 3.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package hpc
  2. import (
  3. "context"
  4. "errors"
  5. clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  8. "k8s.io/apimachinery/pkg/util/json"
  9. "math/rand"
  10. "time"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  13. "github.com/zeromicro/go-zero/core/logx"
  14. )
  15. type CommitHpcTaskLogic struct {
  16. logx.Logger
  17. ctx context.Context
  18. svcCtx *svc.ServiceContext
  19. }
  20. func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
  21. return &CommitHpcTaskLogic{
  22. Logger: logx.WithContext(ctx),
  23. ctx: ctx,
  24. svcCtx: svcCtx,
  25. }
  26. }
  27. func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
  28. // 构建主任务结构体
  29. taskModel := models.Task{
  30. Name: req.Name,
  31. Description: req.Description,
  32. Status: constants.Saved,
  33. Strategy: 0,
  34. SynergyStatus: 0,
  35. CommitTime: time.Now(),
  36. AdapterTypeDict: "2",
  37. }
  38. // 保存任务数据到数据库
  39. tx := l.svcCtx.DbEngin.Create(&taskModel)
  40. if tx.Error != nil {
  41. return nil, tx.Error
  42. }
  43. var clusterIds []int64
  44. l.svcCtx.DbEngin.Raw("SELECT id FROM `t_cluster` where adapter_id in ? and label = ?", req.AdapterIds, req.ClusterType).Scan(&clusterIds)
  45. if len(clusterIds) == 0 || clusterIds == nil {
  46. resp.Code = 400
  47. resp.Msg = "no cluster found"
  48. return resp, nil
  49. }
  50. var clusterName string
  51. var adapterId int64
  52. var adapterName string
  53. clusterId := clusterIds[rand.Intn(len(clusterIds))]
  54. l.svcCtx.DbEngin.Raw("SELECT nickname FROM `t_cluster` where id = ?", clusterId).Scan(&clusterName)
  55. l.svcCtx.DbEngin.Raw("SELECT adapter_id FROM `t_cluster` where id = ?", clusterId).Scan(&adapterId)
  56. l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", adapterId).Scan(&adapterName)
  57. if len(adapterName) == 0 || adapterName == "" {
  58. return nil, errors.New("no corresponding adapter found")
  59. }
  60. env, _ := json.Marshal(req.Environment)
  61. hpcInfo := models.TaskHpc{
  62. TaskId: taskModel.Id,
  63. AdapterId: uint(adapterId),
  64. AdapterName: adapterName,
  65. ClusterId: uint(clusterId),
  66. ClusterName: clusterName,
  67. Name: taskModel.Name,
  68. Status: "Saved",
  69. CmdScript: req.CmdScript,
  70. StartTime: time.Now().String(),
  71. CardCount: req.CardCount,
  72. WorkDir: req.WorkDir,
  73. WallTime: req.WallTime,
  74. AppType: req.AppType,
  75. AppName: req.AppName,
  76. Queue: req.Queue,
  77. SubmitType: req.SubmitType,
  78. NNode: req.NNode,
  79. StdOutFile: req.StdOutFile,
  80. StdErrFile: req.StdErrFile,
  81. StdInput: req.StdInput,
  82. Partition: req.Partition,
  83. DeletedFlag: 0,
  84. CreatedBy: 0,
  85. CreatedTime: time.Now(),
  86. UpdatedBy: 0,
  87. UpdatedTime: time.Now(),
  88. Environment: string(env),
  89. }
  90. tx = l.svcCtx.DbEngin.Create(&hpcInfo)
  91. if tx.Error != nil {
  92. return nil, tx.Error
  93. }
  94. noticeInfo := clientCore.NoticeInfo{
  95. AdapterId: adapterId,
  96. AdapterName: adapterName,
  97. ClusterId: clusterId,
  98. ClusterName: clusterName,
  99. NoticeType: "create",
  100. TaskName: req.Name,
  101. Incident: "任务创建中",
  102. CreatedTime: time.Now(),
  103. }
  104. result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
  105. if result.Error != nil {
  106. logx.Errorf("Task creation failure, err: %v", result.Error)
  107. }
  108. // todo mq task manage
  109. //reqMessage, err := json.Marshal(mqInfo)
  110. //if err != nil {
  111. // logx.Error(err)
  112. // return nil, err
  113. //}
  114. //publish := l.svcCtx.RedisClient.Publish(context.Background(), mqInfo.TaskType, reqMessage)
  115. //if publish.Err() != nil {
  116. // return nil, publish.Err()
  117. //}
  118. resp = &types.CommitHpcTaskResp{
  119. Code: 200,
  120. Msg: "success",
  121. TaskId: taskModel.Id,
  122. }
  123. return resp, nil
  124. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.