diff --git a/internal/cron/hpc_cron_task.go b/internal/cron/hpc_cron_task.go index 3977d9a1..8298cbba 100644 --- a/internal/cron/hpc_cron_task.go +++ b/internal/cron/hpc_cron_task.go @@ -1,10 +1,6 @@ package cron import ( - "errors" - "fmt" - "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" ) @@ -33,57 +29,3 @@ func GetHpcTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) { } return list, nil } - -func UpdateHpcAdapterMaps(svc *svc.ServiceContext) { - var hpcType = "2" - adapterIds, err := svc.Scheduler.HpcStorages.GetAdapterIdsByType(hpcType) - if err != nil { - msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error()) - logx.Errorf(errors.New(msg).Error()) - return - } - if len(adapterIds) == 0 { - return - } - - for _, id := range adapterIds { - clusters, err := svc.Scheduler.HpcStorages.GetClustersByAdapterId(id) - if err != nil { - msg := fmt.Sprintf("###UpdateHpcAdapterMaps###, error: %v \n", err.Error()) - logx.Errorf(errors.New(msg).Error()) - return - } - if len(clusters.List) == 0 { - continue - } - if hpcAdapterExist(svc, id, len(clusters.List)) { - continue - } else { - if hpcAdapterEmpty(svc, id) { - exeClusterMap := service.InitHpcClusterMap(&svc.Config, clusters.List) - svc.Scheduler.HpcService.HpcExecutorAdapterMap[id] = exeClusterMap - } else { - svc.Scheduler.HpcService.UpdateHpcClusterMaps(&svc.Config, id, clusters.List) - } - } - } -} - -func hpcAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { - emap, ok := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id] - - if ok { - if len(emap) == clusterNum { - return true - } - } - return false -} - -func hpcAdapterEmpty(svc *svc.ServiceContext, id string) bool { - _, ok := svc.Scheduler.HpcService.HpcExecutorAdapterMap[id] - if !ok { - return true - } - return false -} diff --git a/internal/logic/hpc/commithpctasklogic.go b/internal/logic/hpc/commithpctasklogic.go index 6aef90f4..abcd10b9 100644 --- a/internal/logic/hpc/commithpctasklogic.go +++ b/internal/logic/hpc/commithpctasklogic.go @@ -3,9 +3,10 @@ package hpc import ( "context" "errors" - "github.com/go-resty/resty/v2" jsoniter "github.com/json-iterator/go" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "strconv" @@ -19,39 +20,25 @@ import ( type CommitHpcTaskLogic struct { logx.Logger - ctx context.Context - svcCtx *svc.ServiceContext + ctx context.Context + svcCtx *svc.ServiceContext + hpcService *service.HpcService } func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic { + cache := make(map[string]interface{}, 10) + hpcService, err := service.NewHpcService(&svcCtx.Config, svcCtx.Scheduler.HpcStorages, cache) + if err != nil { + return nil + } return &CommitHpcTaskLogic{ - Logger: logx.WithContext(ctx), - ctx: ctx, - svcCtx: svcCtx, + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + hpcService: hpcService, } } -type JobSpec struct { - Name string // 应用名称: BWA/lammps - Backend string // 后端类型:slurm/sugonac - App string - OperateType string // 应用内操作类型: bwa:构建索引/对比序列 - Parameters map[string]string // 通用参数 - CustomParams map[string]string // 各平台自定义参数 -} -type ResultParticipant struct { - Code int `json:"code"` - Data struct { - Backend string `json:"backend"` - JobInfo struct { - JobDir string `json:"jobDir"` - JobId string `json:"jobId"` - } `json:"jobInfo"` - } `json:"data"` - Msg string `json:"msg"` - TraceId string `json:"trace_id"` -} - func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) { reqStr, _ := jsoniter.MarshalToString(req) yaml := utils.StringToYaml(reqStr) @@ -68,7 +55,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t Name: req.Name, Description: req.Description, CommitTime: time.Now(), - Status: "Running", + Status: "Saved", AdapterTypeDict: "2", UserId: userId, YamlString: *yaml, @@ -80,12 +67,10 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t return nil, tx.Error } - var adapterName string - l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName) - var server string - l.svcCtx.DbEngin.Raw("SELECT server FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&server) - if len(adapterName) == 0 || adapterName == "" { - return nil, errors.New("no corresponding adapter found") + var adapterInfo types.AdapterInfo + l.svcCtx.DbEngin.Raw("SELECT * FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterInfo) + if adapterInfo.Id == "" { + return resp, errors.New("adapter not found") } clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64) cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64) @@ -93,14 +78,14 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t hpcInfo := models.TaskHpc{ TaskId: taskModel.Id, AdapterId: clusterInfo.AdapterId, - AdapterName: adapterName, + AdapterName: adapterInfo.Name, ClusterId: clusterId, ClusterName: clusterInfo.Name, Name: taskModel.Name, Backend: req.Backend, OperateType: req.OperateType, CmdScript: req.Parameters["cmdScript"], - StartTime: time.Now().String(), + StartTime: time.Now().Format(constants.Layout), CardCount: cardCount, WorkDir: req.Parameters["workDir"], WallTime: req.Parameters["wallTime"], @@ -127,7 +112,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t // 保存操作记录 noticeInfo := clientCore.NoticeInfo{ AdapterId: clusterInfo.AdapterId, - AdapterName: adapterName, + AdapterName: adapterInfo.Name, ClusterId: clusterId, ClusterName: clusterInfo.Name, NoticeType: "create", @@ -141,8 +126,8 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t } // 数据上链 // 查询资源价格 - var price int64 - l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price) + //var price int64 + //l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price) //bytes, _ := json.Marshal(taskModel) //remoteUtil.Evidence(remoteUtil.EvidenceParam{ @@ -157,7 +142,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t //}) // 提交job到指定集群 logx.Info("提交job到指定集群") - resp, err = submitJob(req, server) + resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].SubmitTask(context.Background(), *req) if err != nil { return nil, err } @@ -172,27 +157,3 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t } return resp, nil } - -func submitJob(req *types.CommitHpcTaskReq, adapterAddress string) (resp *types.CommitHpcTaskResp, err error) { - req.Parameters["jobName"] = req.Name + "_" + req.OperateType - reqParticipant := JobSpec{ - Name: req.Name, - Backend: req.Backend, - App: req.App, - OperateType: req.OperateType, - Parameters: req.Parameters, - CustomParams: req.CustomParams, - } - httpClient := resty.New().R() - logx.Info("远程调用p端接口开始") - _, err = httpClient.SetHeader("Content-Type", "application/json"). - SetBody(reqParticipant). - SetResult(&resp). - Post(adapterAddress + "/api/v1/jobs") - if err != nil { - return nil, err - } - logx.Info("远程调用p端接口完成") - - return resp, nil -} diff --git a/internal/scheduler/service/collector/hpc_collector.go b/internal/scheduler/service/collector/hpc_collector.go index 41f31dd9..282f198c 100644 --- a/internal/scheduler/service/collector/hpc_collector.go +++ b/internal/scheduler/service/collector/hpc_collector.go @@ -2,11 +2,13 @@ package collector import ( "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "time" ) type HPCCollector interface { GetTask(ctx context.Context, taskId string) (*Task, error) + SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) } type JobInfo struct { diff --git a/internal/scheduler/service/hpc/slurm.go b/internal/scheduler/service/hpc/slurm.go index 7b6dae24..97ee1aca 100644 --- a/internal/scheduler/service/hpc/slurm.go +++ b/internal/scheduler/service/hpc/slurm.go @@ -5,6 +5,7 @@ import ( "github.com/go-resty/resty/v2" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" ) @@ -17,9 +18,18 @@ type ParticipantHpc struct { } const ( - JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}" + JobDetailUrl = "/api/v1/jobs/detail/{backend}/{jobId}" + SubmitTaskUrl = "/api/v1/jobs" ) +func NewHpc(host string, id int64, platform string) *ParticipantHpc { + return &ParticipantHpc{ + host: host, + participantId: id, + platform: platform, + } +} + func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector.Task, error) { reqUrl := c.host + JobDetailUrl hpcResp := &collector.HpcJobDetailResp{} @@ -64,10 +74,24 @@ func (c *ParticipantHpc) GetTask(ctx context.Context, taskId string) (*collector return &resp, nil } -func NewHpc(host string, id int64, platform string) *ParticipantHpc { - return &ParticipantHpc{ - host: host, - participantId: id, - platform: platform, +func (c *ParticipantHpc) SubmitTask(ctx context.Context, req types.CommitHpcTaskReq) (*types.CommitHpcTaskResp, error) { + reqUrl := c.host + SubmitTaskUrl + req.Parameters["jobName"] = req.Name + "_" + req.OperateType + resp := types.CommitHpcTaskResp{} + httpClient := resty.New().R() + _, err := httpClient.SetHeader("Content-Type", "application/json"). + SetBody(map[string]interface{}{ + "name": req.Name, // 应用名称: BWA/lammps + "backend": req.Backend, // 后端类型:slurm/sugonac + "app": req.App, // 超算应用: bwa/lammps + "operateType": req.OperateType, // 应用内操作类型: bwa:构建索引/对比序列 + "parameters": req.Parameters, // 通用参数 + "customParams": req.CustomParams, // 各平台自定义参数 + }). + SetResult(&resp). + Post(reqUrl) + if err != nil { + return nil, err } + return &resp, nil } diff --git a/internal/scheduler/service/hpc_service.go b/internal/scheduler/service/hpc_service.go index b398e2c8..51346a28 100644 --- a/internal/scheduler/service/hpc_service.go +++ b/internal/scheduler/service/hpc_service.go @@ -1,6 +1,7 @@ package service import ( + "fmt" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" @@ -11,69 +12,88 @@ import ( ) const ( - Slurm_Arm = "slurm_arm" + PcmSlurm = "pcm-slurm" ) type HpcService struct { - HpcExecutorAdapterMap map[string]map[string]collector.HPCCollector + HpcExecutorAdapterMap map[string]collector.HPCCollector Storage *database.HpcStorage LocalCache map[string]interface{} Conf *config.Config TaskSyncLock sync.Mutex } +// NewHpcService 创建并初始化HpcService实例 func NewHpcService(conf *config.Config, storages *database.HpcStorage, localCache map[string]interface{}) (*HpcService, error) { - var aiType = "2" - adapterIds, err := storages.GetAdapterIdsByType(aiType) - if err != nil { - return nil, err - } hpcService := &HpcService{ - HpcExecutorAdapterMap: make(map[string]map[string]collector.HPCCollector), + HpcExecutorAdapterMap: make(map[string]collector.HPCCollector), Storage: storages, LocalCache: localCache, Conf: conf, } - for _, id := range adapterIds { - clusters, err := storages.GetClustersByAdapterId(id) - if err != nil { - return nil, err - } - if len(clusters.List) == 0 { - continue - } - exeClusterMap := InitHpcClusterMap(conf, clusters.List) - hpcService.HpcExecutorAdapterMap[id] = exeClusterMap + + if err := hpcService.initAdapters(); err != nil { + return nil, err } return hpcService, nil } -func InitHpcClusterMap(conf *config.Config, clusters []types.ClusterInfo) map[string]collector.HPCCollector { - executorMap := make(map[string]collector.HPCCollector) - for _, c := range clusters { - switch c.Name { - case Slurm_Arm: - id, _ := strconv.ParseInt(c.Id, 10, 64) - slurm := hpcservice.NewHpc(c.Server, id, c.Nickname) - executorMap[c.Id] = slurm +// initAdapters 初始化所有适配器 +func (s *HpcService) initAdapters() error { + adapters, err := s.loadAdapters() + if err != nil { + return err + } + for _, adapter := range adapters { + if err := s.processAdapter(*adapter); err != nil { + return err } } - return executorMap + + return nil } -func (as *HpcService) UpdateHpcClusterMaps(conf *config.Config, adapterId string, clusters []types.ClusterInfo) { - for _, c := range clusters { - _, ok := as.HpcExecutorAdapterMap[adapterId][c.Id] - if !ok { - switch c.Name { - case Slurm_Arm: - id, _ := strconv.ParseInt(c.Id, 10, 64) - slurm := hpcservice.NewHpc(c.Server, id, c.Nickname) - as.HpcExecutorAdapterMap[adapterId][c.Id] = slurm - } - } else { - continue - } +// loadAdapters 从存储中加载适配器 +func (s *HpcService) loadAdapters() ([]*types.AdapterInfo, error) { + const aiType = "2" + return s.Storage.GetAdaptersByType(aiType) +} + +// processAdapter 处理单个适配器 +func (s *HpcService) processAdapter(adapter types.AdapterInfo) error { + if adapter.Id == "" { + return nil + } + + executor, err := s.createExecutor(adapter) + if err != nil { + return err + } + + if executor != nil { + s.HpcExecutorAdapterMap[adapter.Id] = executor + } + + return nil +} + +// createExecutor 根据适配器类型创建对应的执行器 +func (s *HpcService) createExecutor(adapter types.AdapterInfo) (collector.HPCCollector, error) { + switch adapter.Nickname { + case PcmSlurm: + return s.CreateSlurmExecutor(adapter) + // 可以在这里添加其他类型的适配器 + default: + return nil, nil // 或者返回错误,取决于业务需求 + } +} + +// CreateSlurmExecutor 创建Slurm执行器 +func (s *HpcService) CreateSlurmExecutor(adapter types.AdapterInfo) (collector.HPCCollector, error) { + id, err := strconv.ParseInt(adapter.Id, 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse adapter ID %s: %v", adapter.Id, err) } + return hpcservice.NewHpc(adapter.Server, id, adapter.Nickname), nil } diff --git a/internal/scheduler/service/utils/status/hpc_task_sync.go b/internal/scheduler/service/utils/status/hpc_task_sync.go index 411d23f6..62ace239 100644 --- a/internal/scheduler/service/utils/status/hpc_task_sync.go +++ b/internal/scheduler/service/utils/status/hpc_task_sync.go @@ -2,6 +2,7 @@ package status import ( "fmt" + jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/rs/zerolog/log" "github.com/zeromicro/go-zero/core/logx" @@ -28,10 +29,11 @@ func reportHpcStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, hpc Status: true, Message: "", ClusterID: strconv.FormatInt(hpcTask.ClusterId, 10), - Output: hpcTask.JobId, + Output: hpcTask.WorkDir, } report.Messages = append(report.Messages, jobMsg) - log.Debug().Msgf("通知中间件任务状态参数: [%v]", report) + marshal, _ := jsoniter.MarshalToString(report) + log.Debug().Msgf("通知中间件任务状态参数: [%v]", marshal) _ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report) return nil @@ -62,7 +64,7 @@ func UpdateTaskStatusByHpc(svc *svc.ServiceContext, tasklist []*types.TaskModel) _ = reportHpcStatusMessages(svc, task, hpcTaskList[0]) case constants.Running: - task.Status = constants.Succeeded + task.Status = constants.Running logx.Errorf("############ Report Status Message Before Sending %s", task.Status) _ = reportHpcStatusMessages(svc, task, hpcTaskList[0]) @@ -128,7 +130,7 @@ func updateHpcTask(svc *svc.ServiceContext, hpcTaskList ...*models.TaskHpc) { wg.Add(1) go func() { h := http.Request{} - hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(t.AdapterId, 10)][strconv.FormatInt(t.ClusterId, 10)].GetTask(h.Context(), t.JobId) + hpcTask, err := svc.Scheduler.HpcService.HpcExecutorAdapterMap[strconv.FormatInt(t.AdapterId, 10)].GetTask(h.Context(), t.JobId) if err != nil { if status.Code(err) == codes.DeadlineExceeded { msg := fmt.Sprintf("###UpdateHpcTaskStatus###, HpcTaskId: %v, clusterId: %v , JobId: %v, error: %v \n", t.Id, t.ClusterId, t.JobId, err.Error())