You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

commithpctasklogic.go 5.6 kB

11 months ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package hpc
  import (
  	"context"
  	"errors"
  	"fmt"
  	"strconv"
  	"time"

  	"github.com/go-resty/resty/v2"
  	"github.com/zeromicro/go-zero/core/logx"
  	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  )
  14. type CommitHpcTaskLogic struct {
  15. logx.Logger
  16. ctx context.Context
  17. svcCtx *svc.ServiceContext
  18. }
  19. func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
  20. return &CommitHpcTaskLogic{
  21. Logger: logx.WithContext(ctx),
  22. ctx: ctx,
  23. svcCtx: svcCtx,
  24. }
  25. }
  // JobSpec is the request body sent to an adapter when submitting a job.
  type JobSpec struct {
  	// Name is the application name, e.g. BWA or lammps.
  	Name string
  	// Backend is the backend type, e.g. slurm or sugonac.
  	Backend string
  	App     string
  	// OperateType is the operation within the application, e.g. for bwa:
  	// build index / align sequences.
  	OperateType string
  	// Parameters holds the generic parameters.
  	Parameters map[string]string
  	// CustomParams holds platform-specific parameters.
  	CustomParams map[string]string
  }
  // ResultParticipant describes a JSON reply with a status code, a message, a
  // trace id and a data payload naming the backend and the created job.
  type ResultParticipant struct {
  	// Code is decoded from the "code" key.
  	Code int `json:"code"`
  	// Data is decoded from the "data" key.
  	Data struct {
  		Backend string `json:"backend"`
  		// JobInfo carries the job's directory and identifier.
  		JobInfo struct {
  			JobDir string `json:"jobDir"`
  			JobId  string `json:"jobId"`
  		} `json:"jobInfo"`
  	} `json:"data"`
  	// Msg is decoded from the "msg" key.
  	Msg string `json:"msg"`
  	// TraceId is decoded from the "trace_id" key.
  	TraceId string `json:"trace_id"`
  }
  46. func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
  47. var clusterInfo types.ClusterInfo
  48. l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)
  49. if len(clusterInfo.Id) == 0 {
  50. return resp, errors.New("cluster not found")
  51. }
  52. // 构建主任务结构体
  53. userId, _ := strconv.ParseInt(req.Parameters["UserId"], 10, 64)
  54. taskModel := models.Task{
  55. Name: req.Name,
  56. Description: req.Description,
  57. CommitTime: time.Now(),
  58. Status: "Running",
  59. AdapterTypeDict: "2",
  60. UserId: userId,
  61. }
  62. // 保存任务数据到数据库
  63. tx := l.svcCtx.DbEngin.Create(&taskModel)
  64. if tx.Error != nil {
  65. return nil, tx.Error
  66. }
  67. var adapterName string
  68. l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName)
  69. var server string
  70. l.svcCtx.DbEngin.Raw("SELECT server FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&server)
  71. if len(adapterName) == 0 || adapterName == "" {
  72. return nil, errors.New("no corresponding adapter found")
  73. }
  74. clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64)
  75. cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64)
  76. timelimit, _ := strconv.ParseInt(req.Parameters["timeLimit"], 10, 64)
  77. hpcInfo := models.TaskHpc{
  78. TaskId: taskModel.Id,
  79. AdapterId: clusterInfo.AdapterId,
  80. AdapterName: adapterName,
  81. ClusterId: clusterId,
  82. ClusterName: clusterInfo.Name,
  83. Name: taskModel.Name,
  84. Backend: req.Backend,
  85. OperateType: req.OperateType,
  86. CmdScript: req.Parameters["cmdScript"],
  87. StartTime: time.Now().String(),
  88. CardCount: cardCount,
  89. WorkDir: req.Parameters["workDir"],
  90. WallTime: req.Parameters["wallTime"],
  91. AppType: req.Parameters["appType"],
  92. AppName: req.Parameters["appName"],
  93. Queue: req.Parameters["queue"],
  94. SubmitType: req.Parameters["submitType"],
  95. NNode: req.Parameters["nNode"],
  96. Account: clusterInfo.Username,
  97. StdInput: req.Parameters["stdInput"],
  98. Partition: req.Parameters["partition"],
  99. CreatedTime: time.Now(),
  100. UpdatedTime: time.Now(),
  101. Status: "Running",
  102. TimeLimit: timelimit,
  103. }
  104. hpcInfo.WorkDir = clusterInfo.WorkDir + req.Parameters["WorkDir"]
  105. tx = l.svcCtx.DbEngin.Create(&hpcInfo)
  106. if tx.Error != nil {
  107. return nil, tx.Error
  108. }
  109. // 保存操作记录
  110. noticeInfo := clientCore.NoticeInfo{
  111. AdapterId: clusterInfo.AdapterId,
  112. AdapterName: adapterName,
  113. ClusterId: clusterId,
  114. ClusterName: clusterInfo.Name,
  115. NoticeType: "create",
  116. TaskName: req.Name,
  117. Incident: "任务创建中",
  118. CreatedTime: time.Now(),
  119. }
  120. result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo)
  121. if result.Error != nil {
  122. logx.Errorf("Task creation failure, err: %v", result.Error)
  123. }
  124. // 数据上链
  125. // 查询资源价格
  126. var price int64
  127. l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price)
  128. //bytes, _ := json.Marshal(taskModel)
  129. //remoteUtil.Evidence(remoteUtil.EvidenceParam{
  130. // UserIp: req.Parameters["UserIp"],
  131. // Url: l.svcCtx.Config.BlockChain.Url,
  132. // ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
  133. // FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
  134. // Type: l.svcCtx.Config.BlockChain.Type,
  135. // Token: req.Parameters["Token"],
  136. // Amount: price,
  137. // Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
  138. //})
  139. // 提交job到指定集群
  140. logx.Info("提交job到指定集群")
  141. resp, _ = submitJob(req, server)
  142. return resp, nil
  143. }
  144. func submitJob(req *types.CommitHpcTaskReq, adapterAddress string) (resp *types.CommitHpcTaskResp, err error) {
  145. req.Parameters["jobName"] = req.Name
  146. reqParticipant := JobSpec{
  147. Name: req.Name,
  148. Backend: req.Backend,
  149. App: req.App,
  150. OperateType: req.OperateType,
  151. Parameters: req.Parameters,
  152. CustomParams: req.CustomParams,
  153. }
  154. httpClient := resty.New().R()
  155. logx.Info("远程调用p端接口开始")
  156. httpClient.SetHeader("Content-Type", "application/json").
  157. SetBody(reqParticipant).
  158. SetResult(&resp).
  159. Post(adapterAddress + "/api/v1/jobs")
  160. logx.Info("远程调用p端接口完成")
  161. return resp, nil
  162. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.