|
|
@@ -13,6 +13,7 @@ import ( |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" |
|
|
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" |
|
|
|
|
|
"regexp" |
|
|
"strconv" |
|
|
"strconv" |
|
|
"strings" |
|
|
"strings" |
|
|
"sync" |
|
|
"sync" |
|
|
@@ -85,13 +86,67 @@ func (l *CommitHpcTaskLogic) getClusterInfo(clusterID string) (*types.ClusterInf |
|
|
return &clusterInfo, &adapterInfo, nil |
|
|
return &clusterInfo, &adapterInfo, nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 自定义函数映射 |
|
|
|
|
|
func createFuncMap() template.FuncMap { |
|
|
|
|
|
return template.FuncMap{ |
|
|
|
|
|
"regexMatch": regexMatch, |
|
|
|
|
|
"required": required, |
|
|
|
|
|
"error": errorHandler, |
|
|
|
|
|
"default": defaultHandler, |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
func extractUserError(originalErr error) error { |
|
|
|
|
|
// 尝试匹配模板引擎返回的错误格式 |
|
|
|
|
|
re := regexp.MustCompile(`error calling \w+: (.*)$`) |
|
|
|
|
|
matches := re.FindStringSubmatch(originalErr.Error()) |
|
|
|
|
|
if len(matches) > 1 { |
|
|
|
|
|
return errors.New(matches[1]) |
|
|
|
|
|
} |
|
|
|
|
|
return originalErr |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 正则匹配函数 |
|
|
|
|
|
func regexMatch(pattern string) *regexp.Regexp { |
|
|
|
|
|
return regexp.MustCompile(pattern) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 必填字段检查 |
|
|
|
|
|
func required(msg string, val interface{}) (interface{}, error) { |
|
|
|
|
|
if val == nil || val == "" { |
|
|
|
|
|
return nil, errors.New(msg) |
|
|
|
|
|
} |
|
|
|
|
|
return val, nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 错误处理函数 |
|
|
|
|
|
func errorHandler(msg string) (string, error) { |
|
|
|
|
|
return "", errors.New(msg) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 默认值处理函数 |
|
|
|
|
|
func defaultHandler(defaultVal interface{}, val interface{}) interface{} { |
|
|
|
|
|
switch v := val.(type) { |
|
|
|
|
|
case nil: |
|
|
|
|
|
return defaultVal |
|
|
|
|
|
case string: |
|
|
|
|
|
if v == "" { |
|
|
|
|
|
return defaultVal |
|
|
|
|
|
} |
|
|
|
|
|
case int: |
|
|
|
|
|
if v == 0 { |
|
|
|
|
|
return defaultVal |
|
|
|
|
|
} |
|
|
|
|
|
// 可根据需要添加其他类型判断 |
|
|
|
|
|
} |
|
|
|
|
|
return val |
|
|
|
|
|
} |
|
|
func (l *CommitHpcTaskLogic) RenderJobScript(templateContent string, req *JobRequest) (string, error) { |
|
|
func (l *CommitHpcTaskLogic) RenderJobScript(templateContent string, req *JobRequest) (string, error) { |
|
|
// 使用缓存模板 |
|
|
// 使用缓存模板 |
|
|
tmpl, ok := templateCache.Load(templateContent) |
|
|
tmpl, ok := templateCache.Load(templateContent) |
|
|
if !ok { |
|
|
if !ok { |
|
|
parsedTmpl, err := template.New("jobScript").Parse(templateContent) |
|
|
|
|
|
|
|
|
parsedTmpl, err := template.New("slurmTemplate").Funcs(createFuncMap()).Parse(templateContent) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return "", fmt.Errorf("template parse failed: %w", err) |
|
|
|
|
|
|
|
|
return "", err |
|
|
} |
|
|
} |
|
|
templateCache.Store(templateContent, parsedTmpl) |
|
|
templateCache.Store(templateContent, parsedTmpl) |
|
|
tmpl = parsedTmpl |
|
|
tmpl = parsedTmpl |
|
|
@@ -104,7 +159,8 @@ func (l *CommitHpcTaskLogic) RenderJobScript(templateContent string, req *JobReq |
|
|
|
|
|
|
|
|
var buf strings.Builder |
|
|
var buf strings.Builder |
|
|
if err := tmpl.(*template.Template).Execute(&buf, params); err != nil { |
|
|
if err := tmpl.(*template.Template).Execute(&buf, params); err != nil { |
|
|
return "", fmt.Errorf("template render failed: %w", err) |
|
|
|
|
|
|
|
|
log.Error().Err(err).Msg("模板渲染失败") |
|
|
|
|
|
return "", extractUserError(err) |
|
|
} |
|
|
} |
|
|
return buf.String(), nil |
|
|
return buf.String(), nil |
|
|
} |
|
|
} |
|
|
@@ -235,36 +291,39 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
return nil, err |
|
|
return nil, err |
|
|
} |
|
|
} |
|
|
|
|
|
scriptContent := req.ScriptContent |
|
|
|
|
|
if scriptContent == "" { |
|
|
|
|
|
// 获取模板 |
|
|
|
|
|
var templateInfo types.HpcAppTemplateInfo |
|
|
|
|
|
tx := l.svcCtx.DbEngin.Table("hpc_app_template"). |
|
|
|
|
|
Where("cluster_id = ? and app = ? ", req.ClusterId, req.App) |
|
|
|
|
|
if req.OperateType != "" { |
|
|
|
|
|
tx.Where("app_type = ?", req.OperateType) |
|
|
|
|
|
} |
|
|
|
|
|
if err := tx.First(&templateInfo).Error; err != nil { |
|
|
|
|
|
return nil, fmt.Errorf("failed to get template: %w", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// 获取模板 |
|
|
|
|
|
var templateInfo types.HpcAppTemplateInfo |
|
|
|
|
|
tx := l.svcCtx.DbEngin.Table("hpc_app_template"). |
|
|
|
|
|
Where("cluster_id = ? and app = ? ", req.ClusterId, req.App) |
|
|
|
|
|
if req.OperateType != "" { |
|
|
|
|
|
tx.Where("app_type = ?", req.OperateType) |
|
|
|
|
|
} |
|
|
|
|
|
if err := tx.First(&templateInfo).Error; err != nil { |
|
|
|
|
|
return nil, fmt.Errorf("failed to get template: %w", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 转换请求参数 |
|
|
|
|
|
jobRequest, err := ConvertToJobRequest(req) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, fmt.Errorf("invalid job request: %w", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// 转换请求参数 |
|
|
|
|
|
jobRequest, err := ConvertToJobRequest(req) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, err |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// 渲染脚本 |
|
|
|
|
|
script, err := l.RenderJobScript(templateInfo.Content, &jobRequest) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, fmt.Errorf("script rendering failed: %w", err) |
|
|
|
|
|
|
|
|
// 渲染脚本 |
|
|
|
|
|
script, err := l.RenderJobScript(templateInfo.Content, &jobRequest) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return nil, err |
|
|
|
|
|
} |
|
|
|
|
|
scriptContent = script |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
q, _ := jsoniter.MarshalToString(script) |
|
|
|
|
|
|
|
|
q, _ := jsoniter.MarshalToString(scriptContent) |
|
|
submitQ := types.SubmitHpcTaskReq{ |
|
|
submitQ := types.SubmitHpcTaskReq{ |
|
|
App: req.App, |
|
|
App: req.App, |
|
|
ClusterId: req.ClusterId, |
|
|
ClusterId: req.ClusterId, |
|
|
JobName: jobName, |
|
|
JobName: jobName, |
|
|
ScriptContent: script, |
|
|
|
|
|
|
|
|
ScriptContent: scriptContent, |
|
|
Parameters: req.Parameters, |
|
|
Parameters: req.Parameters, |
|
|
Backend: req.Backend, |
|
|
Backend: req.Backend, |
|
|
} |
|
|
} |
|
|
@@ -276,7 +335,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t |
|
|
|
|
|
|
|
|
jobID := resp.Data.JobInfo["jobId"] |
|
|
jobID := resp.Data.JobInfo["jobId"] |
|
|
workDir := resp.Data.JobInfo["jobDir"] |
|
|
workDir := resp.Data.JobInfo["jobDir"] |
|
|
taskID, err := l.SaveHpcTaskToDB(req, script, jobID, workDir) |
|
|
|
|
|
|
|
|
taskID, err := l.SaveHpcTaskToDB(req, scriptContent, jobID, workDir) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
log.Error().Msgf("Failed to save task to DB: %v", err) |
|
|
log.Error().Msgf("Failed to save task to DB: %v", err) |
|
|
return nil, fmt.Errorf("db save failed: %w", err) |
|
|
return nil, fmt.Errorf("db save failed: %w", err) |
|
|
|