Browse Source

fix

Signed-off-by: jagger <cossjie@foxmail.com>
pull/464/head
jagger 7 months ago
parent
commit
0a8e47a317
6 changed files with 123 additions and 172 deletions
  1. +0
    -58
      internal/cron/hpc_cron_task.go
  2. +25
    -64
      internal/logic/hpc/commithpctasklogic.go
  3. +2
    -0
      internal/scheduler/service/collector/hpc_collector.go
  4. +30
    -6
      internal/scheduler/service/hpc/slurm.go
  5. +60
    -40
      internal/scheduler/service/hpc_service.go
  6. +6
    -4
      internal/scheduler/service/utils/status/hpc_task_sync.go

+ 0
- 58
internal/cron/hpc_cron_task.go View File

@@ -1,10 +1,6 @@
package cron package cron


import ( import (
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
) )
@@ -33,57 +29,3 @@ func GetHpcTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
} }
return list, nil return list, nil
} }

// UpdateHpcAdapterMaps refreshes the cached executor maps of all HPC adapters
// (adapter type "2").
//
// For every adapter id returned by HpcStorages.GetAdapterIdsByType it loads the
// adapter's clusters and then:
//   - skips the adapter when it has no clusters, or when its cached map already
//     holds exactly as many entries as there are clusters;
//   - stores a fresh map built by service.InitHpcClusterMap when the adapter
//     has no cached entry yet;
//   - otherwise delegates to HpcService.UpdateHpcClusterMaps.
//
// A storage error is logged and aborts the whole refresh; nothing is returned.
func UpdateHpcAdapterMaps(svc *svc.ServiceContext) {
	const hpcType = "2"

	adapterIds, err := svc.Scheduler.HpcStorages.GetAdapterIdsByType(hpcType)
	if err != nil {
		msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error())
		// msg embeds arbitrary error text, so it is logged as a value and
		// never used as a format string.
		logx.Error(errors.New(msg))
		return
	}

	for _, id := range adapterIds {
		clusters, err := svc.Scheduler.HpcStorages.GetClustersByAdapterId(id)
		if err != nil {
			msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error())
			logx.Error(errors.New(msg))
			return
		}
		if len(clusters.List) == 0 {
			continue
		}
		// Cached map already has one entry per cluster: nothing to do.
		if hpcAdapterExist(svc, id, len(clusters.List)) {
			continue
		}
		// Adapter not cached yet: build its cluster map from scratch.
		if hpcAdapterEmpty(svc, id) {
			svc.Scheduler.HpcService.HpcExecutorAdapterMap[id] = service.InitHpcClusterMap(&svc.Config, clusters.List)
			continue
		}
		// Adapter cached but the cluster count differs: update the existing map.
		svc.Scheduler.HpcService.UpdateHpcClusterMaps(&svc.Config, id, clusters.List)
	}
}

// hpcAdapterExist reports whether the adapter identified by id has a cached
// executor map containing exactly clusterNum entries.
func hpcAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
	cached, found := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id]
	return found && len(cached) == clusterNum
}

// hpcAdapterEmpty reports whether the adapter identified by id has no entry
// at all in the cached executor map.
func hpcAdapterEmpty(svc *svc.ServiceContext, id string) bool {
	_, found := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id]
	return !found
}

+ 25
- 64
internal/logic/hpc/commithpctasklogic.go View File

@@ -3,9 +3,10 @@ package hpc
import ( import (
"context" "context"
"errors" "errors"
"github.com/go-resty/resty/v2"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv" "strconv"
@@ -19,39 +20,25 @@ import (


type CommitHpcTaskLogic struct { type CommitHpcTaskLogic struct {
logx.Logger logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
ctx context.Context
svcCtx *svc.ServiceContext
hpcService *service.HpcService
} }


func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic { func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
cache := make(map[string]interface{}, 10)
hpcService, err := service.NewHpcService(&svcCtx.Config, svcCtx.Scheduler.HpcStorages, cache)
if err != nil {
return nil
}
return &CommitHpcTaskLogic{ return &CommitHpcTaskLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
hpcService: hpcService,
} }
} }


// JobSpec is the request body that submitJob posts to the adapter's
// "/api/v1/jobs" endpoint; its fields are copied from CommitHpcTaskReq.
type JobSpec struct {
Name string // application name: BWA/lammps
Backend string // backend type: slurm/sugonac
App string
OperateType string // operation type within the application, e.g. bwa: build index / align sequences
Parameters map[string]string // common parameters
CustomParams map[string]string // platform-specific custom parameters
}
// ResultParticipant describes a JSON payload made of a status code, a message,
// a trace id and a data object carrying the backend name plus job information.
// NOTE(review): looks like the participant's job-submission response envelope,
// but nothing in the visible code references it — confirm before relying on it.
type ResultParticipant struct {
Code int `json:"code"`
Data struct {
Backend string `json:"backend"`
// JobInfo holds the directory and identifier reported for the job.
JobInfo struct {
JobDir string `json:"jobDir"`
JobId string `json:"jobId"`
} `json:"jobInfo"`
} `json:"data"`
Msg string `json:"msg"`
TraceId string `json:"trace_id"`
}

func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) { func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
reqStr, _ := jsoniter.MarshalToString(req) reqStr, _ := jsoniter.MarshalToString(req)
yaml := utils.StringToYaml(reqStr) yaml := utils.StringToYaml(reqStr)
@@ -68,7 +55,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
Name: req.Name, Name: req.Name,
Description: req.Description, Description: req.Description,
CommitTime: time.Now(), CommitTime: time.Now(),
Status: "Running",
Status: "Saved",
AdapterTypeDict: "2", AdapterTypeDict: "2",
UserId: userId, UserId: userId,
YamlString: *yaml, YamlString: *yaml,
@@ -80,12 +67,10 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
return nil, tx.Error return nil, tx.Error
} }


var adapterName string
l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName)
var server string
l.svcCtx.DbEngin.Raw("SELECT server FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&server)
if len(adapterName) == 0 || adapterName == "" {
return nil, errors.New("no corresponding adapter found")
var adapterInfo types.AdapterInfo
l.svcCtx.DbEngin.Raw("SELECT * FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterInfo)
if adapterInfo.Id == "" {
return resp, errors.New("adapter not found")
} }
clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64) clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64)
cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64) cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64)
@@ -93,14 +78,14 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
hpcInfo := models.TaskHpc{ hpcInfo := models.TaskHpc{
TaskId: taskModel.Id, TaskId: taskModel.Id,
AdapterId: clusterInfo.AdapterId, AdapterId: clusterInfo.AdapterId,
AdapterName: adapterName,
AdapterName: adapterInfo.Name,
ClusterId: clusterId, ClusterId: clusterId,
ClusterName: clusterInfo.Name, ClusterName: clusterInfo.Name,
Name: taskModel.Name, Name: taskModel.Name,
Backend: req.Backend, Backend: req.Backend,
OperateType: req.OperateType, OperateType: req.OperateType,
CmdScript: req.Parameters["cmdScript"], CmdScript: req.Parameters["cmdScript"],
StartTime: time.Now().String(),
StartTime: time.Now().Format(constants.Layout),
CardCount: cardCount, CardCount: cardCount,
WorkDir: req.Parameters["workDir"], WorkDir: req.Parameters["workDir"],
WallTime: req.Parameters["wallTime"], WallTime: req.Parameters["wallTime"],
@@ -127,7 +112,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
// 保存操作记录 // 保存操作记录
noticeInfo := clientCore.NoticeInfo{ noticeInfo := clientCore.NoticeInfo{
AdapterId: clusterInfo.AdapterId, AdapterId: clusterInfo.AdapterId,
AdapterName: adapterName,
AdapterName: adapterInfo.Name,
ClusterId: clusterId, ClusterId: clusterId,
ClusterName: clusterInfo.Name, ClusterName: clusterInfo.Name,
NoticeType: "create", NoticeType: "create",
@@ -141,8 +126,8 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
} }
// 数据上链 // 数据上链
// 查询资源价格 // 查询资源价格
var price int64
l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price)
//var price int64
//l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price)


//bytes, _ := json.Marshal(taskModel) //bytes, _ := json.Marshal(taskModel)
//remoteUtil.Evidence(remoteUtil.EvidenceParam{ //remoteUtil.Evidence(remoteUtil.EvidenceParam{
@@ -157,7 +142,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
//}) //})
// 提交job到指定集群 // 提交job到指定集群
logx.Info("提交job到指定集群") logx.Info("提交job到指定集群")
resp, err = submitJob(req, server)
resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].SubmitTask(context.Background(), *req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -172,27 +157,3 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
} }
return resp, nil return resp, nil
} }

// submitJob posts the task described by req to the participant service at
// adapterAddress + "/api/v1/jobs" and returns the decoded response.
//
// As a side effect it sets req.Parameters["jobName"] to
// "<req.Name>_<req.OperateType>", creating the parameter map when it is nil.
//
// It returns an error when the request cannot be sent or when the participant
// answers with an HTTP error status; in both cases the response is nil.
func submitJob(req *types.CommitHpcTaskReq, adapterAddress string) (resp *types.CommitHpcTaskResp, err error) {
	// Writing to a nil map panics, so make sure the parameter map exists first.
	if req.Parameters == nil {
		req.Parameters = make(map[string]string)
	}
	req.Parameters["jobName"] = req.Name + "_" + req.OperateType

	reqParticipant := JobSpec{
		Name:         req.Name,
		Backend:      req.Backend,
		App:          req.App,
		OperateType:  req.OperateType,
		Parameters:   req.Parameters,
		CustomParams: req.CustomParams,
	}

	// Decode into an allocated value instead of a pointer to a nil pointer.
	result := &types.CommitHpcTaskResp{}

	logx.Info("远程调用p端接口开始")
	httpResp, err := resty.New().R().
		SetHeader("Content-Type", "application/json").
		SetBody(reqParticipant).
		SetResult(result).
		Post(adapterAddress + "/api/v1/jobs")
	if err != nil {
		return nil, err
	}
	// resty only fills the result on 2xx responses; surface error statuses
	// instead of handing the caller an empty response with a nil error.
	if httpResp.IsError() {
		return nil, errors.New("submit job to participant: unexpected status " + httpResp.Status())
	}
	logx.Info("远程调用p端接口完成")

	return result, nil
}

+ 2
- 0
internal/scheduler/service/collector/hpc_collector.go View File

@@ -2,11 +2,13 @@ package collector


import ( import (
"context" "context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"time" "time"
) )


type HPCCollector interface { type HPCCollector interface {
GetTask(ctx context.Context, taskId string) (*Task, error) GetTask(ctx context.Context, taskId string) (*Task, error)
SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error)
} }


type JobInfo struct { type JobInfo struct {


+ 30
- 6
internal/scheduler/service/hpc/slurm.go View File

@@ -5,6 +5,7 @@ import (
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
) )


@@ -17,9 +18,18 @@ type ParticipantHpc struct {
} }


const ( const (
JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}"
JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}"
SubmitTaskUrl = "/api/v1/jobs"
) )


// NewHpc builds a ParticipantHpc whose host, participantId and platform fields
// are set from the given host, id and platform.
func NewHpc(host string, id int64, platform string) *ParticipantHpc {
	hpc := new(ParticipantHpc)
	hpc.host = host
	hpc.participantId = id
	hpc.platform = platform
	return hpc
}

func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector.Task, error) { func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector.Task, error) {
reqUrl := c.host + JobDetailUrl reqUrl := c.host + JobDetailUrl
hpcResp := &collector.HpcJobDetailResp{} hpcResp := &collector.HpcJobDetailResp{}
@@ -64,10 +74,24 @@ func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector
return &resp, nil return &resp, nil
} }


func NewHpc(host string, id int64, platform string) *ParticipantHpc {
return &ParticipantHpc{
host: host,
participantId: id,
platform: platform,
func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) {
reqUrl := c.host + SubmitTaskUrl
req.Parameters["jobName"] = req.Name + "_" + req.OperateType
resp := types.CommitHpcTaskResp{}
httpClient := resty.New().R()
_, err := httpClient.SetHeader("Content-Type", "application/json").
SetBody(map[string]interface{}{
"name": req.Name, // 应用名称: BWA/lammps
"backend": req.Backend, // 后端类型:slurm/sugonac
"app": req.App, // 超算应用: bwa/lammps
"operateType": req.OperateType, // 应用内操作类型: bwa:构建索引/对比序列
"parameters": req.Parameters, // 通用参数
"customParams": req.CustomParams, // 各平台自定义参数
}).
SetResult(&resp).
Post(reqUrl)
if err != nil {
return nil, err
} }
return &resp, nil
} }

+ 60
- 40
internal/scheduler/service/hpc_service.go View File

@@ -1,6 +1,7 @@
package service package service


import ( import (
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
@@ -11,69 +12,88 @@ import (
) )


const ( const (
Slurm_Arm = "slurm_arm"
PcmSlurm = "pcm-slurm"
) )


type HpcService struct { type HpcService struct {
HpcExecutorAdapterMap map[string]map[string]collector.HPCCollector
HpcExecutorAdapterMap map[string]collector.HPCCollector
Storage *database.HpcStorage Storage *database.HpcStorage
LocalCache map[string]interface{} LocalCache map[string]interface{}
Conf *config.Config Conf *config.Config
TaskSyncLock sync.Mutex TaskSyncLock sync.Mutex
} }


// NewHpcService 创建并初始化HpcService实例
func NewHpcService(conf *config.Config, storages *database.HpcStorage, localCache map[string]interface{}) (*HpcService, error) { func NewHpcService(conf *config.Config, storages *database.HpcStorage, localCache map[string]interface{}) (*HpcService, error) {
var aiType = "2"
adapterIds, err := storages.GetAdapterIdsByType(aiType)
if err != nil {
return nil, err
}
hpcService := &HpcService{ hpcService := &HpcService{
HpcExecutorAdapterMap: make(map[string]map[string]collector.HPCCollector),
HpcExecutorAdapterMap: make(map[string]collector.HPCCollector),
Storage: storages, Storage: storages,
LocalCache: localCache, LocalCache: localCache,
Conf: conf, Conf: conf,
} }
for _, id := range adapterIds {
clusters, err := storages.GetClustersByAdapterId(id)
if err != nil {
return nil, err
}
if len(clusters.List) == 0 {
continue
}
exeClusterMap := InitHpcClusterMap(conf, clusters.List)
hpcService.HpcExecutorAdapterMap[id] = exeClusterMap

if err := hpcService.initAdapters(); err != nil {
return nil, err
} }


return hpcService, nil return hpcService, nil
} }


func InitHpcClusterMap(conf *config.Config, clusters []types.ClusterInfo) map[string]collector.HPCCollector {
executorMap := make(map[string]collector.HPCCollector)
for _, c := range clusters {
switch c.Name {
case Slurm_Arm:
id, _ := strconv.ParseInt(c.Id, 10, 64)
slurm := hpcservice.NewHpc(c.Server, id, c.Nickname)
executorMap[c.Id] = slurm
// initAdapters 初始化所有适配器
func (s *HpcService) initAdapters() error {
adapters, err := s.loadAdapters()
if err != nil {
return err
}
for _, adapter := range adapters {
if err := s.processAdapter(*adapter); err != nil {
return err
} }
} }
return executorMap

return nil
} }


func (as *HpcService) UpdateHpcClusterMaps(conf *config.Config, adapterId string, clusters []types.ClusterInfo) {
for _, c := range clusters {
_, ok := as.HpcExecutorAdapterMap[adapterId][c.Id]
if !ok {
switch c.Name {
case Slurm_Arm:
id, _ := strconv.ParseInt(c.Id, 10, 64)
slurm := hpcservice.NewHpc(c.Server, id, c.Nickname)
as.HpcExecutorAdapterMap[adapterId][c.Id] = slurm
}
} else {
continue
}
// loadAdapters returns the adapters of type "2" as reported by
// Storage.GetAdaptersByType, together with any error from that call.
func (s *HpcService) loadAdapters() ([]*types.AdapterInfo, error) {
	const adapterType = "2"
	adapters, err := s.Storage.GetAdaptersByType(adapterType)
	return adapters, err
}

// processAdapter creates the executor for a single adapter and stores it in
// HpcExecutorAdapterMap under the adapter's id.
//
// Adapters with an empty id are ignored, and nothing is stored when
// createExecutor yields no executor. An error from createExecutor is returned
// unchanged.
func (s *HpcService) processAdapter(adapter types.AdapterInfo) error {
	if adapter.Id == "" {
		return nil
	}

	exec, err := s.createExecutor(adapter)
	switch {
	case err != nil:
		return err
	case exec != nil:
		s.HpcExecutorAdapterMap[adapter.Id] = exec
	}
	return nil
}

// createExecutor picks the executor implementation from the adapter's
// Nickname. Only PcmSlurm is handled; any other nickname yields a nil executor
// and a nil error.
func (s *HpcService) createExecutor(adapter types.AdapterInfo) (collector.HPCCollector, error) {
	if adapter.Nickname == PcmSlurm {
		return s.CreateSlurmExecutor(adapter)
	}
	// Other adapter kinds can be added here. Returning an error instead of
	// nil, nil is an option, depending on business requirements.
	return nil, nil
}

// CreateSlurmExecutor 创建Slurm执行器
func (s *HpcService) CreateSlurmExecutor(adapter types.AdapterInfo) (collector.HPCCollector, error) {
id, err := strconv.ParseInt(adapter.Id, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse adapter ID %s: %v", adapter.Id, err)
} }
return hpcservice.NewHpc(adapter.Server, id, adapter.Nickname), nil
} }

+ 6
- 4
internal/scheduler/service/utils/status/hpc_task_sync.go View File

@@ -2,6 +2,7 @@ package status


import ( import (
"fmt" "fmt"
jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
@@ -28,10 +29,11 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc
Status: true, Status: true,
Message: "", Message: "",
ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10), ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10),
Output: hpcTask.JobId,
Output: hpcTask.WorkDir,
} }
report.Messages = append(report.Messages, jobMsg) report.Messages = append(report.Messages, jobMsg)
log.Debug().Msgf("通知中间件任务状态参数: [%v]", report)
marshal, _ := jsoniter.MarshalToString(report)
log.Debug().Msgf("通知中间件任务状态参数: [%v]", marshal)
_ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) _ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)


return nil return nil
@@ -62,7 +64,7 @@ func UpdateTaskStatusByHpc(svc *svc.ServiceContext, tasklist []*types.TaskModel)


_ = reportHpcStatusMessages(svc, task, hpcTaskList[0]) _ = reportHpcStatusMessages(svc, task, hpcTaskList[0])
case constants.Running: case constants.Running:
task.Status = constants.Succeeded
task.Status = constants.Running
logx.Errorf("############ Report Status Message Before Sending %s", task.Status) logx.Errorf("############ Report Status Message Before Sending %s", task.Status)


_ = reportHpcStatusMessages(svc, task, hpcTaskList[0]) _ = reportHpcStatusMessages(svc, task, hpcTaskList[0])
@@ -128,7 +130,7 @@ func updateHpcTask(svc *svc.ServiceContext, hpcTaskList ...*models.TaskHpc) {
wg.Add(1) wg.Add(1)
go func() { go func() {
h := http.Request{} h := http.Request{}
hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTask(h.Context(), t.JobId)
hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(t.AdapterId, 10)].GetTask(h.Context(), t.JobId)
if err != nil { if err != nil {
if status.Code(err) == codes.DeadlineExceeded { if status.Code(err) == codes.DeadlineExceeded {
msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error()) msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())


Loading…
Cancel
Save