You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

hpcac.go 4.2 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package main
  2. import (
  3. "PCM/adaptor/PCM-CORE/rpc/pcmcoreclient"
  4. "PCM/adaptor/PCM-HPC/PCM-AC/rpc/hpcAC"
  5. "PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/config"
  6. "PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/logic"
  7. "PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/server"
  8. "PCM/adaptor/PCM-HPC/PCM-AC/rpc/internal/svc"
  9. "PCM/common/tool"
  10. "context"
  11. "flag"
  12. "github.com/zeromicro/go-zero/core/conf"
  13. "github.com/zeromicro/go-zero/core/logx"
  14. "github.com/zeromicro/go-zero/core/service"
  15. "github.com/zeromicro/go-zero/zrpc"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/reflection"
  18. )
  19. var configFile = flag.String("f", "adaptor/PCM-HPC/PCM-AC/rpc/etc/hpcac.yaml", "the config file")
  20. func main() {
  21. flag.Parse()
  22. var c config.Config
  23. conf.MustLoad(*configFile, &c)
  24. // start log component
  25. logx.MustSetup(c.LogConf)
  26. ctx := svc.NewServiceContext(c)
  27. ctx.Cron.Start()
  28. s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
  29. hpcAC.RegisterHpcACServer(grpcServer, server.NewHpcACServer(ctx))
  30. if c.Mode == service.DevMode || c.Mode == service.TestMode {
  31. reflection.Register(grpcServer)
  32. }
  33. })
  34. defer s.Stop()
  35. logx.Infof("Starting rpc server at %s...\n", c.ListenOn)
  36. initCron(ctx)
  37. s.Start()
  38. }
  39. func initCron(svc *svc.ServiceContext) {
  40. submitJobLogic := logic.NewSubmitJobLogic(context.Background(), svc)
  41. listLogic := logic.NewListJobLogic(context.Background(), svc)
  42. svc.Cron.AddFunc("*/5 * * * * ?", func() {
  43. syncInfoReq := pcmcoreclient.SyncInfoReq{
  44. Kind: "hpc",
  45. ServiceName: "ac",
  46. }
  47. // 查询core端分发下来的任务列表
  48. infoList, err := queryCoreInfoList(svc)
  49. if err != nil {
  50. logx.Error(err)
  51. return
  52. }
  53. // 提交任务
  54. submitJob(infoList, submitJobLogic)
  55. // 查询运行中的任务列表同步信息
  56. listReq := hpcAC.ListJobReq{}
  57. listJob, err := listLogic.ListJob(&listReq)
  58. if err != nil {
  59. logx.Error(err)
  60. return
  61. }
  62. for index1, _ := range infoList.HpcInfoList {
  63. for index2, _ := range listJob.Jobs {
  64. if listJob.Jobs[index2].JobName == infoList.HpcInfoList[index1].Name {
  65. infoList.HpcInfoList[index1].StartTime = listJob.Jobs[index2].JobStartTime
  66. infoList.HpcInfoList[index1].RunningTime = int64(tool.RunTimeToSeconds(listJob.Jobs[index2].JobRunTime))
  67. if listJob.Jobs[index2].JobStatus == "statR" {
  68. infoList.HpcInfoList[index1].Status = "Running"
  69. }
  70. if listJob.Jobs[index2].JobStatus == "statC" {
  71. infoList.HpcInfoList[index1].Status = "Completed"
  72. }
  73. }
  74. }
  75. }
  76. // 同步信息到core端
  77. if len(infoList.HpcInfoList) != 0 {
  78. syncInfoReq.HpcInfoList = infoList.HpcInfoList
  79. svc.PcmCoreRpc.SyncInfo(context.Background(), &syncInfoReq)
  80. }
  81. })
  82. }
  83. func submitJob(infoList *pcmcoreclient.InfoListResp, submitJobLogic *logic.SubmitJobLogic) {
  84. for index, _ := range infoList.HpcInfoList {
  85. if infoList.HpcInfoList[index].Status == "Saved" {
  86. submitReq := hpcAC.SubmitJobReq{
  87. Appname: "BASE",
  88. Apptype: "BASIC",
  89. StrJobManagerID: 1638523853,
  90. MapAppJobInfo: &hpcAC.MapAppJobInfo{
  91. GAP_CMD_FILE: "sleep 10",
  92. GAP_NNODE: "1",
  93. GAP_SUBMIT_TYPE: "cmd",
  94. GAP_JOB_NAME: infoList.HpcInfoList[index].Name,
  95. GAP_WORK_DIR: infoList.HpcInfoList[index].WorkDir,
  96. GAP_QUEUE: "debug2",
  97. GAP_NPROC: "1",
  98. GAP_APPNAME: "BASE",
  99. GAP_WALL_TIME: infoList.HpcInfoList[index].WallTime,
  100. GAP_STD_OUT_FILE: "/public/home/zhijiang/test/testjob1/std.out.%j",
  101. GAP_STD_ERR_FILE: " /public/home/zhijiang/test/testjob1/std.err.%j",
  102. },
  103. }
  104. jobResult, _ := submitJobLogic.SubmitJob(&submitReq)
  105. if jobResult.Code == "0" {
  106. infoList.HpcInfoList[index].Status = "Pending"
  107. infoList.HpcInfoList[index].JobId = jobResult.Data
  108. } else {
  109. infoList.HpcInfoList[index].Result = "Failed"
  110. infoList.HpcInfoList[index].Result = jobResult.Msg
  111. }
  112. }
  113. }
  114. }
  115. func queryCoreInfoList(svc *svc.ServiceContext) (*pcmcoreclient.InfoListResp, error) {
  116. infoReq := pcmcoreclient.InfoListReq{
  117. Kind: "hpc",
  118. ServiceName: "ac",
  119. }
  120. infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
  121. if err != nil {
  122. return nil, err
  123. }
  124. return infoList, nil
  125. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.