package hpc

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"time"

	"github.com/go-resty/resty/v2"
	"github.com/zeromicro/go-zero/core/logx"
	clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil"
	v1 "gitlink.org.cn/JointCloud/pcm-hpc/routers/v1"
	"k8s.io/apimachinery/pkg/util/json"
)

// CommitHpcTaskLogic holds the request-scoped state used to submit an HPC
// task: a context-bound logger, the request context and the service context.
type CommitHpcTaskLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

// NewCommitHpcTaskLogic returns a CommitHpcTaskLogic bound to ctx and svcCtx.
func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitHpcTaskLogic {
	return &CommitHpcTaskLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

// CommitHpcTask records a new HPC task, submits the job to the cluster named
// in req through the cluster's adapter, and returns the job ID reported by
// the adapter.
//
// The steps are, in order:
//  1. look up the cluster row and the owning adapter's name and server address;
//  2. insert the main task row and the HPC task row;
//  3. call the adapter to submit the job;
//  4. insert a notice row (failure is only logged);
//  5. send the task as evidence to the configured blockchain endpoint.
//
// All request validation happens before anything is written, so a bad cluster
// ID or a missing adapter does not leave a task row behind. An error is
// returned when the cluster or adapter cannot be found, when the cluster ID is
// not a base-10 integer, when a task insert fails, or when job submission
// fails.
func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {
	var clusterInfo types.ClusterInfo
	l.svcCtx.DbEngin.Raw("SELECT * FROM `t_cluster` where id = ?", req.ClusterId).First(&clusterInfo)
	if len(clusterInfo.Id) == 0 {
		return nil, errors.New("cluster not found")
	}

	clusterID, err := strconv.ParseInt(req.ClusterId, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("parsing cluster id %q: %w", req.ClusterId, err)
	}

	var adapterName string
	if err := l.svcCtx.DbEngin.Raw("SELECT name FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterName).Error; err != nil {
		return nil, err
	}
	var server string
	if err := l.svcCtx.DbEngin.Raw("SELECT server FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&server).Error; err != nil {
		return nil, err
	}
	if adapterName == "" {
		return nil, errors.New("no corresponding adapter found")
	}

	// Build the main task record.
	taskModel := models.Task{
		Name:            req.Name,
		Description:     req.Description,
		CommitTime:      time.Now(),
		Status:          "Running",
		AdapterTypeDict: "2",
	}
	// Persist the task.
	if tx := l.svcCtx.DbEngin.Create(&taskModel); tx.Error != nil {
		return nil, tx.Error
	}

	hpcInfo := models.TaskHpc{
		TaskId:      taskModel.Id,
		AdapterId:   clusterInfo.AdapterId,
		AdapterName: adapterName,
		ClusterId:   clusterID,
		ClusterName: clusterInfo.Name,
		Name:        taskModel.Name,
		CmdScript:   req.CmdScript,
		StartTime:   time.Now().String(),
		CardCount:   req.CardCount,
		// The requested directory is appended to the cluster's base work dir.
		WorkDir:     clusterInfo.WorkDir + req.WorkDir,
		WallTime:    req.WallTime,
		AppType:     req.AppType,
		AppName:     req.AppName,
		Queue:       req.Queue,
		SubmitType:  req.SubmitType,
		NNode:       req.NNode,
		Account:     clusterInfo.Username,
		StdInput:    req.StdInput,
		Partition:   req.Partition,
		CreatedTime: time.Now(),
		UpdatedTime: time.Now(),
		Status:      "Running",
	}
	if tx := l.svcCtx.DbEngin.Create(&hpcInfo); tx.Error != nil {
		return nil, tx.Error
	}

	// Submit the job to the target cluster.
	logx.Info("提交job到指定集群")
	jobID, err := submitJob(&hpcInfo, &clusterInfo, server)
	logx.Info("提交job到指定集群完成")
	if err != nil {
		return nil, err
	}

	// Save the operation record; a failure here is logged, not returned.
	noticeInfo := clientCore.NoticeInfo{
		AdapterId:   clusterInfo.AdapterId,
		AdapterName: adapterName,
		ClusterId:   clusterID,
		ClusterName: clusterInfo.Name,
		NoticeType:  "create",
		TaskName:    req.Name,
		Incident:    "任务创建中",
		CreatedTime: time.Now(),
	}
	if result := l.svcCtx.DbEngin.Table("t_notice").Create(&noticeInfo); result.Error != nil {
		logx.Errorf("Task creation failure, err: %v", result.Error)
	}

	resp = &types.CommitHpcTaskResp{
		// strconv.Itoa yields the decimal text; string(int) would yield a
		// single rune with that code point.
		JobId: strconv.Itoa(jobID),
	}

	// Put the task on chain as evidence. This is best-effort: a marshal
	// failure is logged and the call still goes out, as before.
	payload, marshalErr := json.Marshal(taskModel)
	if marshalErr != nil {
		logx.Errorf("marshal task %d for evidence: %v", taskModel.Id, marshalErr)
	}
	remoteUtil.Evidence(remoteUtil.EvidenceParam{
		Url:             l.svcCtx.Config.BlockChain.Url,
		ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
		FunctionName:    l.svcCtx.Config.BlockChain.FunctionName,
		MemberName:      l.svcCtx.Config.BlockChain.MemberName,
		Type:            l.svcCtx.Config.BlockChain.Type,
		Args:            []string{strconv.FormatInt(taskModel.Id, 10), string(payload)},
	})
	return resp, nil
}

// submitJob posts a job-submission request built from hpcInfo and clusterInfo
// to adapterAddress + "/api/v1/job/submit" and returns the job ID from the
// decoded response.
//
// It returns an error when the HTTP call itself fails or when the adapter
// answers with an error status (resty treats status codes above 399 as
// errors); in both cases the returned job ID is 0.
func submitJob(hpcInfo *models.TaskHpc, clusterInfo *types.ClusterInfo, adapterAddress string) (int, error) {
	submitReq := v1.SubmitJobReq{
		Server:   clusterInfo.Server,
		Version:  clusterInfo.Version,
		Username: clusterInfo.Username,
		Token:    clusterInfo.Token,
		JobOptions: v1.JobOptions{
			Script: hpcInfo.CmdScript,
			Job: &v1.JobProperties{
				Account:                 hpcInfo.Account,
				Name:                    hpcInfo.Name,
				NTasks:                  1,
				CurrentWorkingDirectory: hpcInfo.WorkDir,
				Partition:               hpcInfo.Partition,
				Environment:             map[string]string{"PATH": clusterInfo.EnvPath, "LD_LIBRARY_PATH": clusterInfo.EnvLdPath},
				StandardOutput:          hpcInfo.WorkDir + "/job.out",
				StandardError:           hpcInfo.WorkDir + "/job.err",
			},
		},
	}

	var result v1.SubmitJobResp
	logx.Info("远程调用p端接口开始")
	httpResp, err := resty.New().R().
		SetHeader("Content-Type", "application/json").
		SetBody(submitReq).
		SetResult(&result).
		Post(adapterAddress + "/api/v1/job/submit")
	logx.Info("远程调用p端接口完成")
	if err != nil {
		return 0, err
	}
	// SetResult only decodes the body on a success status, so without this
	// check a rejected submission would come back as job ID 0 with no error.
	if httpResp.IsError() {
		return 0, fmt.Errorf("submitting job via adapter %s: unexpected status %s", adapterAddress, httpResp.Status())
	}
	return result.JobId, nil
}