|
|
@@ -6,6 +6,7 @@ import ( |
|
|
jsoniter "github.com/json-iterator/go" |
|
|
jsoniter "github.com/json-iterator/go" |
|
|
"github.com/pkg/errors" |
|
|
"github.com/pkg/errors" |
|
|
"github.com/rs/zerolog/log" |
|
|
"github.com/rs/zerolog/log" |
|
|
|
|
|
"github.com/zeromicro/go-zero/core/logc" |
|
|
"github.com/zeromicro/go-zero/core/logx" |
|
|
"github.com/zeromicro/go-zero/core/logx" |
|
|
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" |
|
|
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" |
|
|
@@ -285,6 +286,12 @@ func (l *CommitHpcTaskLogic) SaveHpcTaskToDB(req *types.CommitHpcTaskReq, jobScr |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) { |
|
|
func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) { |
|
|
|
|
|
reqJSON, err := jsoniter.MarshalToString(req) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, fmt.Errorf("failed to marshal request: %w", err) |
|
|
|
|
|
} |
|
|
|
|
|
logc.Infof(l.ctx, "提交超算任务请求参数: %s", reqJSON) |
|
|
|
|
|
|
|
|
jobName := generateJobName(req) |
|
|
jobName := generateJobName(req) |
|
|
req.Parameters["jobName"] = jobName |
|
|
req.Parameters["jobName"] = jobName |
|
|
|
|
|
|
|
|
@@ -303,7 +310,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t |
|
|
tx.Where("app_type = ?", req.OperateType) |
|
|
tx.Where("app_type = ?", req.OperateType) |
|
|
} |
|
|
} |
|
|
if err := tx.First(&templateInfo).Error; err != nil { |
|
|
if err := tx.First(&templateInfo).Error; err != nil { |
|
|
return nil, fmt.Errorf("failed to get template: %w", err) |
|
|
|
|
|
|
|
|
return nil, fmt.Errorf("获取HPC应用【%s】模板失败: %w", req.App, err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// 转换请求参数 |
|
|
// 转换请求参数 |
|
|
@@ -332,7 +339,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t |
|
|
log.Info().Msgf("Submitting HPC task to cluster %s with params: %s", clusterInfo.Name, q) |
|
|
log.Info().Msgf("Submitting HPC task to cluster %s with params: %s", clusterInfo.Name, q) |
|
|
resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].SubmitTask(l.ctx, submitQ) |
|
|
resp, err = l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].SubmitTask(l.ctx, submitQ) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Error().Err(err).Msgf("提交HPC任务失败, cluster: %s, jobName: %s, scriptContent: %s", clusterInfo.Name, jobName, scriptContent) |
|
|
|
|
|
|
|
|
log.Error().Err(err).Msgf("提交超算任务失败, cluster: %s, jobName: %s, scriptContent: %s", clusterInfo.Name, jobName, scriptContent) |
|
|
return nil, fmt.Errorf("网络请求失败,请稍后重试") |
|
|
return nil, fmt.Errorf("网络请求失败,请稍后重试") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@@ -340,8 +347,8 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t |
|
|
workDir := resp.Data.JobInfo["jobDir"] |
|
|
workDir := resp.Data.JobInfo["jobDir"] |
|
|
taskID, err := l.SaveHpcTaskToDB(req, scriptContent, jobID, workDir) |
|
|
taskID, err := l.SaveHpcTaskToDB(req, scriptContent, jobID, workDir) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Error().Msgf("Failed to save task to DB: %v", err) |
|
|
|
|
|
return nil, fmt.Errorf("db save failed: %w", err) |
|
|
|
|
|
|
|
|
log.Error().Msgf("超算任务保存到数据库失败, cluster: %s, jobName: %s, scriptContent: %s, error: %v", clusterInfo.Name, jobName, scriptContent, err) |
|
|
|
|
|
return nil, fmt.Errorf("保存超算任务到数据库失败: %w", err) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
resp.Data.JobInfo["taskId"] = taskID |
|
|
resp.Data.JobInfo["taskId"] = taskID |
|
|
|