You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

commithpctasklogic.go 5.4 kB

11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package hpc
  2. import (
  3. "context"
  4. "errors"
  5. jsoniter "github.com/json-iterator/go"
  6. clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  10. "strconv"
  11. "time"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  14. "github.com/zeromicro/go-zero/core/logx"
  15. )
  16. type CommitHpcTaskLogic struct {
  17. logx.Logger
  18. ctx context.Context
  19. svcCtx *svc.ServiceContext
  20. hpcService *service.HpcService
  21. }
  22. func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
  23. cache := make(map[string]interface{}, 10)
  24. hpcService, err := service.NewHpcService(&svcCtx.Config, svcCtx.Scheduler.HpcStorages, cache)
  25. if err != nil {
  26. return nil
  27. }
  28. return &CommitHpcTaskLogic{
  29. Logger: logx.WithContext(ctx),
  30. ctx: ctx,
  31. svcCtx: svcCtx,
  32. hpcService: hpcService,
  33. }
  34. }
  35. func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
  36. req.Parameters["jobName"] = generateJobName(req)
  37. reqStr, _ := jsoniter.MarshalToString(req)
  38. yaml := utils.StringToYaml(reqStr)
  39. var clusterInfo types.ClusterInfo
  40. l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)
  41. if len(clusterInfo.Id) == 0 {
  42. return resp, errors.New("cluster not found")
  43. }
  44. // 构建主任务结构体
  45. userId, _ := strconv.ParseInt(req.Parameters["UserId"], 10, 64)
  46. taskModel := models.Task{
  47. Name: req.Name,
  48. Description: req.Description,
  49. CommitTime: time.Now(),
  50. Status: "Saved",
  51. AdapterTypeDict: "2",
  52. UserId: userId,
  53. YamlString: *yaml,
  54. }
  55. // 保存任务数据到数据库
  56. tx := l.svcCtx.DbEngin.Create(&taskModel)
  57. if tx.Error != nil {
  58. return nil, tx.Error
  59. }
  60. var adapterInfo types.AdapterInfo
  61. l.svcCtx.DbEngin.Raw("SELECT * FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterInfo)
  62. if adapterInfo.Id == "" {
  63. return resp, errors.New("adapter not found")
  64. }
  65. clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64)
  66. cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64)
  67. timelimit, _ := strconv.ParseInt(req.Parameters["timeLimit"], 10, 64)
  68. hpcInfo := models.TaskHpc{
  69. TaskId: taskModel.Id,
  70. AdapterId: clusterInfo.AdapterId,
  71. AdapterName: adapterInfo.Name,
  72. ClusterId: clusterId,
  73. ClusterName: clusterInfo.Name,
  74. Name: taskModel.Name,
  75. Backend: req.Backend,
  76. OperateType: req.OperateType,
  77. CmdScript: req.Parameters["cmdScript"],
  78. CardCount: cardCount,
  79. WorkDir: req.Parameters["workDir"],
  80. WallTime: req.Parameters["wallTime"],
  81. AppType: req.Parameters["appType"],
  82. AppName: req.App,
  83. Queue: req.Parameters["queue"],
  84. SubmitType: req.Parameters["submitType"],
  85. NNode: req.Parameters["nNode"],
  86. Account: clusterInfo.Username,
  87. StdInput: req.Parameters["stdInput"],
  88. Partition: req.Parameters["partition"],
  89. CreatedTime: time.Now(),
  90. UpdatedTime: time.Now(),
  91. Status: "Deploying",
  92. TimeLimit: timelimit,
  93. UserId: userId,
  94. YamlString: *yaml,
  95. }
  96. hpcInfo.WorkDir = clusterInfo.WorkDir + req.Parameters["WorkDir"]
  97. tx = l.svcCtx.DbEngin.Create(&hpcInfo)
  98. if tx.Error != nil {
  99. return nil, tx.Error
  100. }
  101. // 保存操作记录
  102. noticeInfo := clientCore.NoticeInfo{
  103. AdapterId: clusterInfo.AdapterId,
  104. AdapterName: adapterInfo.Name,
  105. ClusterId: clusterId,
  106. ClusterName: clusterInfo.Name,
  107. NoticeType: "create",
  108. TaskName: req.Name,
  109. Incident: "任务创建中",
  110. CreatedTime: time.Now(),
  111. }
  112. result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
  113. if result.Error != nil {
  114. logx.Errorf("Task creation failure, err: %v", result.Error)
  115. }
  116. // 数据上链
  117. // 查询资源价格
  118. //var price int64
  119. //l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price)
  120. //bytes, _ := json.Marshal(taskModel)
  121. //remoteUtil.Evidence(remoteUtil.EvidenceParam{
  122. // UserIp: req.Parameters["UserIp"],
  123. // Url: l.svcCtx.Config.BlockChain.Url,
  124. // ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
  125. // FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
  126. // Type: l.svcCtx.Config.BlockChain.Type,
  127. // Token: req.Parameters["Token"],
  128. // Amount: price,
  129. // Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
  130. //})
  131. // 提交job到指定集群
  132. logx.Info("提交job到指定集群")
  133. resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].SubmitTask(l.ctx, *req)
  134. if err != nil {
  135. return nil, err
  136. }
  137. // 更新任务状态
  138. updates := l.svcCtx.DbEngin.Model(&hpcInfo).Updates(models.TaskHpc{
  139. Id: hpcInfo.Id,
  140. JobId: resp.Data.JobInfo["jobId"],
  141. WorkDir: resp.Data.JobInfo["jobDir"],
  142. })
  143. if updates.Error != nil {
  144. return nil, updates.Error
  145. }
  146. resp.Data.JobInfo["taskId"] = strconv.FormatInt(taskModel.Id, 10)
  147. return resp, nil
  148. }
  149. // generateJobName 根据条件生成 jobName
  150. func generateJobName(req *types.CommitHpcTaskReq) string {
  151. if req.OperateType == "" {
  152. return req.Name
  153. }
  154. return req.Name + "_" + req.OperateType
  155. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.