From 4f7e47485b903e18ff0394b40ce6386a11de2ab2 Mon Sep 17 00:00:00 2001 From: jagger Date: Fri, 15 Aug 2025 17:36:05 +0800 Subject: [PATCH] =?UTF-8?q?xjlab=20=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etc/pcm.yaml | 5 +- .../handler/cloud/commitgeneraltaskhandler.go | 4 +- .../handler/cloud/containercreatehandler.go | 6 +- internal/handler/hpc/commithpctaskhandler.go | 9 +- internal/handler/routes.go | 25 ++ .../schedule/schedulecreatetaskhandler.go | 4 +- internal/handler/xjlab/task.go | 64 +++ .../logic/cloud/commitgeneraltasklogic.go | 10 +- internal/logic/cloud/containercreatelogic.go | 8 +- internal/logic/hpc/commithpctasklogic.go | 96 +++-- .../logic/schedule/schedulecreatetasklogic.go | 15 +- .../logic/schedule/schedulesubmitlogic.go | 7 +- internal/logic/xjlab/pagelisttasklogic.go | 275 ++++++++++++ internal/logic/xjlab/task_resource_usage.go | 27 ++ .../logic/xjlab/task_status_statistics.go | 155 +++++++ internal/scheduler/database/aiStorage.go | 46 +- internal/scheduler/scheduler.go | 7 +- internal/scheduler/schedulers/aiScheduler.go | 9 +- .../imageInference/imageInference.go | 19 +- .../inference/textInference/textInference.go | 2 +- internal/types/cloud/container.go | 1 + internal/types/types.go | 52 +++ pkg/constants/const.go | 24 ++ pkg/models/taskaimodel_gen.go | 43 +- pkg/models/taskhpcmodel_gen.go | 107 ++--- pkg/models/taskmodel_gen.go | 1 + pkg/utils/slurm_parser.go | 394 ++++++++++++++++++ 27 files changed, 1239 insertions(+), 176 deletions(-) create mode 100644 internal/handler/xjlab/task.go create mode 100644 internal/logic/xjlab/pagelisttasklogic.go create mode 100644 internal/logic/xjlab/task_resource_usage.go create mode 100644 internal/logic/xjlab/task_status_statistics.go create mode 100644 pkg/constants/const.go create mode 100644 pkg/utils/slurm_parser.go diff --git a/etc/pcm.yaml b/etc/pcm.yaml index 3c1303e6..df52154a 100644 --- a/etc/pcm.yaml +++ b/etc/pcm.yaml @@ -94,4 +94,7 @@ JcsMiddleware: Participant: AdapterId: "1777144940456666666" - CloudAdapterId: "1770658294298316800" \ No newline at end of file + CloudAdapterId: "1770658294298316800" + +JccUserService: + Url: http://jcce-admin:8082/jcc-admin/admin/user/{id} \ No newline at end of file diff --git a/internal/handler/cloud/commitgeneraltaskhandler.go b/internal/handler/cloud/commitgeneraltaskhandler.go index 66e8997a..1559cf5b 100644 --- a/internal/handler/cloud/commitgeneraltaskhandler.go +++ b/internal/handler/cloud/commitgeneraltaskhandler.go @@ -1,11 +1,12 @@ package cloud import ( + "net/http" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "k8s.io/apimachinery/pkg/util/json" - "net/http" "github.com/zeromicro/go-zero/rest/httpx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/cloud" @@ -32,6 +33,7 @@ func CommitGeneralTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { user := &models.JccUserInfo{} json.Unmarshal([]byte(userStr), user) req.UserId = user.Id + req.UserName = user.UserName l := cloud.NewCommitGeneralTaskLogic(r.Context(), svcCtx) resp, err := l.CommitGeneralTask(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/handler/cloud/containercreatehandler.go b/internal/handler/cloud/containercreatehandler.go index f2fbf142..dff011b2 100644 --- a/internal/handler/cloud/containercreatehandler.go +++ b/internal/handler/cloud/containercreatehandler.go @@ -1,12 +1,13 @@ package cloud import ( + "io" + "net/http" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" - "io" "k8s.io/apimachinery/pkg/util/json" - "net/http" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" container "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types/cloud" @@ -30,6 +31,7 @@ func ContainerCreateHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { user := &models.JccUserInfo{} json.Unmarshal([]byte(userStr), user) req.UserId = user.Id + req.UserName = user.UserName l := cloud.NewContainerCreateLogic(r.Context(), svcCtx) resp, err := l.ContainerCreate(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/handler/hpc/commithpctaskhandler.go b/internal/handler/hpc/commithpctaskhandler.go index fec2dd4a..ebf5344e 100644 --- a/internal/handler/hpc/commithpctaskhandler.go +++ b/internal/handler/hpc/commithpctaskhandler.go @@ -1,14 +1,16 @@ package hpc import ( + "net/http" + "strconv" + "github.com/zeromicro/go-zero/rest/httpx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/hpc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "net/http" - "strconv" ) func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { @@ -30,7 +32,8 @@ func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { result.ParamErrorResult(r, w, err) return } - req.Parameters["UserId"] = strconv.FormatInt(jccUserInfo.Id, 10) + req.Parameters[constants.UserId] = strconv.FormatInt(jccUserInfo.Id, 10) + req.Parameters[constants.UserName] = jccUserInfo.UserName l := hpc.NewCommitHpcTaskLogic(r.Context(), svcCtx) resp, err := l.CommitHpcTask(&req) diff --git a/internal/handler/routes.go b/internal/handler/routes.go index c7cc4234..37160a2b 100644 --- a/internal/handler/routes.go +++ b/internal/handler/routes.go @@ -16,6 +16,7 @@ import ( storage "gitlink.org.cn/JointCloud/pcm-coordinator/internal/handler/storage" storelink "gitlink.org.cn/JointCloud/pcm-coordinator/internal/handler/storelink" vm "gitlink.org.cn/JointCloud/pcm-coordinator/internal/handler/vm" + xjlab "gitlink.org.cn/JointCloud/pcm-coordinator/internal/handler/xjlab" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "github.com/zeromicro/go-zero/rest" @@ -1735,4 +1736,28 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { }, rest.WithPrefix("/pcm/v1"), ) + + server.AddRoutes( + []rest.Route{ + { + // 查询任务列表 + Method: http.MethodGet, + Path: "/xjlab/taskList", + Handler: xjlab.TaskListHandler(serverCtx), + }, + { + // 查询指定任务资源使用情况 + Method: http.MethodGet, + Path: "/xjlab/taskResourceUsage", + Handler: xjlab.TaskResourceUsageHandler(serverCtx), + }, + { + //任务状态监控 + Method: http.MethodGet, + Path: "/xjlab/taskStatusStatistics", + Handler: xjlab.TaskStatusStatisticsHandler(serverCtx), + }, + }, + rest.WithPrefix("/pcm/v1"), + ) } diff --git a/internal/handler/schedule/schedulecreatetaskhandler.go b/internal/handler/schedule/schedulecreatetaskhandler.go index a430ebb3..0ec267d4 100644 --- a/internal/handler/schedule/schedulecreatetaskhandler.go +++ b/internal/handler/schedule/schedulecreatetaskhandler.go @@ -1,13 +1,14 @@ package schedule import ( + "net/http" + "github.com/zeromicro/go-zero/rest/httpx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "net/http" ) func ScheduleCreateTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { @@ -29,6 +30,7 @@ func ScheduleCreateTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return } req.UserId = jccUserInfo.Id + req.UserName = jccUserInfo.UserName l := schedule.NewScheduleCreateTaskLogic(r.Context(), svcCtx) resp, err := l.ScheduleCreateTask(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/handler/xjlab/task.go b/internal/handler/xjlab/task.go new file mode 100644 index 00000000..b59dd6a6 --- /dev/null +++ b/internal/handler/xjlab/task.go @@ -0,0 +1,64 @@ +package xjlab + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/xjlab" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" +) + +func TaskListHandler(ctx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.XJLABTaskReq + if err := httpx.Parse(r, &req); err != nil { + result.ParamErrorResult(r, w, err) + return + } + token := r.Header.Get("Authorization") + // 获取用户信息 + jccUserInfo, err := utils.ParseTokenWithoutVerify(token) + if err != nil { + result.ParamErrorResult(r, w, err) + return + } + req.UserId = jccUserInfo.Id + if req.UserName == "" { + req.UserName = jccUserInfo.UserName + } + l := xjlab.NewPageListTaskLogic(r.Context(), ctx) + resp, err := l.PageListTask(&req) + result.HttpResult(r, w, resp, err) + } +} + +func TaskResourceUsageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.FId + if err := httpx.Parse(r, &req); err != nil { + result.ParamErrorResult(r, w, err) + return + } + + l := xjlab.NewTaskResourceUsageLogic(r.Context(), svcCtx) + resp, err := l.TaskResourceUsage(&req) + result.HttpResult(r, w, resp, err) + } +} + +func TaskStatusStatisticsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.XJLABCommonReq + if err := httpx.Parse(r, &req); err != nil { + result.ParamErrorResult(r, w, err) + return + } + + l := xjlab.NewTaskStatusStatisticsLogic(r.Context(), svcCtx) + resp, err := l.GetSimpleTaskStatistics(&req) + result.HttpResult(r, w, resp, err) + } +} diff --git a/internal/logic/cloud/commitgeneraltasklogic.go b/internal/logic/cloud/commitgeneraltasklogic.go index 1759ac72..2d213b5b 100644 --- a/internal/logic/cloud/commitgeneraltasklogic.go +++ b/internal/logic/cloud/commitgeneraltasklogic.go @@ -3,6 +3,11 @@ package cloud import ( "bytes" "context" + "io" + "strconv" + "strings" + "time" + "github.com/pkg/errors" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" @@ -13,15 +18,11 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil" - "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" "k8s.io/apimachinery/pkg/util/json" kyaml "k8s.io/apimachinery/pkg/util/yaml" - "strconv" - "strings" - "time" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" @@ -102,6 +103,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) (r SynergyStatus: synergyStatus, Strategy: strategy, UserId: req.UserId, + UserName: req.UserName, } resp.TaskId = taskModel.Id var taskClouds []cloud.TaskCloudModel diff --git a/internal/logic/cloud/containercreatelogic.go b/internal/logic/cloud/containercreatelogic.go index 5c4c0f95..f393d668 100644 --- a/internal/logic/cloud/containercreatelogic.go +++ b/internal/logic/cloud/containercreatelogic.go @@ -18,6 +18,10 @@ import ( "context" "errors" "fmt" + "net/http" + "strconv" + "time" + "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/participant/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" @@ -26,9 +30,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" cloud2 "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "net/http" - "strconv" - "time" ) type ContainerCreateLogic struct { @@ -79,6 +80,7 @@ func (l *ContainerCreateLogic) ContainerCreate(req *container.CreateParam) (resp Description: req.Description, Name: req.Name, UserId: req.UserId, + UserName: req.UserName, AdapterTypeDict: "0", CommitTime: time.Now(), } diff --git a/internal/logic/hpc/commithpctasklogic.go b/internal/logic/hpc/commithpctasklogic.go index 0c667b3d..4f358f84 100644 --- a/internal/logic/hpc/commithpctasklogic.go +++ b/internal/logic/hpc/commithpctasklogic.go @@ -3,6 +3,13 @@ package hpc import ( "context" "fmt" + "regexp" + "strconv" + "strings" + "sync" + "text/template" + "time" + jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/rs/zerolog/log" @@ -12,14 +19,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "regexp" - "strconv" - "strings" - "sync" - "text/template" - "time" ) type CommitHpcTaskLogic struct { @@ -29,12 +31,6 @@ type CommitHpcTaskLogic struct { hpcService *service.HpcService } -const ( - statusSaved = "Saved" - statusDeploying = "Deploying" - adapterTypeHPC = "2" -) - type JobRequest struct { App string `json:"app"` Common CommonParams `json:"common"` @@ -201,16 +197,18 @@ func (l *CommitHpcTaskLogic) SaveHpcTaskToDB(req *types.CommitHpcTaskReq, jobScr } }() - userID, _ := strconv.ParseInt(req.Parameters["UserId"], 10, 64) + userID, _ := strconv.ParseInt(req.Parameters[constants.UserId], 10, 64) taskID := utils.GenSnowflakeID() + taskModel := models.Task{ Id: taskID, Name: req.Name, Description: req.Description, CommitTime: time.Now(), - Status: statusSaved, - AdapterTypeDict: adapterTypeHPC, + Status: constants.StatusSaved, + AdapterTypeDict: constants.AdapterTypeHPC, UserId: userID, + UserName: req.Parameters[constants.UserName], } if err = tx.Table("task").Create(&taskModel).Error; err != nil { @@ -226,36 +224,48 @@ func (l *CommitHpcTaskLogic) SaveHpcTaskToDB(req *types.CommitHpcTaskReq, jobScr if err != nil { return "", fmt.Errorf("failed to marshal parameters: %w", err) } - + //解析slurm脚本内容 + var resource models.ResourceSpec + if req.Backend == string(constants.HPC_SYSTEM_SLURM) { + parser := utils.NewSlurmParser() + slurmResource := parser.ParseScript(jobScript) + resource = models.ResourceSpec{ + //资源规格名称,采用拼接的方式 集群名+队列名 + ResourceName: fmt.Sprintf("%s_%s", clusterInfo.Name, slurmResource.Partition), + Partition: slurmResource.Partition, + Specifications: slurmResource, + } + } clusterID := utils.StringToInt64(clusterInfo.Id) hpcTask := models.TaskHpc{ - Id: utils.GenSnowflakeID(), - TaskId: taskID, - AdapterId: clusterInfo.AdapterId, - AdapterName: adapterInfo.Name, - ClusterId: clusterID, - ClusterName: clusterInfo.Name, - Name: taskModel.Name, - Backend: req.Backend, - OperateType: req.OperateType, - CmdScript: req.Parameters["cmdScript"], - WallTime: req.Parameters["wallTime"], - AppType: req.Parameters["appType"], - AppName: req.App, - Queue: req.Parameters["queue"], - SubmitType: req.Parameters["submitType"], - NNode: req.Parameters["nNode"], - Account: clusterInfo.Username, - StdInput: req.Parameters["stdInput"], - Partition: req.Parameters["partition"], - CreatedTime: time.Now(), - UpdatedTime: time.Now(), - Status: statusDeploying, - UserId: userID, - Params: paramsJSON, - Script: jobScript, - JobId: jobId, - WorkDir: workDir, + Id: utils.GenSnowflakeID(), + TaskId: taskID, + AdapterId: clusterInfo.AdapterId, + AdapterName: adapterInfo.Name, + ClusterId: clusterID, + ClusterName: clusterInfo.Name, + Name: taskModel.Name, + Backend: req.Backend, + OperateType: req.OperateType, + CmdScript: req.Parameters["cmdScript"], + WallTime: req.Parameters["wallTime"], + AppType: req.Parameters["appType"], + AppName: req.App, + Queue: req.Parameters["queue"], + SubmitType: req.Parameters["submitType"], + NNode: req.Parameters["nNode"], + Account: clusterInfo.Username, + StdInput: req.Parameters["stdInput"], + Partition: req.Parameters["partition"], + CreatedTime: time.Now(), + UpdatedTime: time.Now(), + Status: constants.StatusDeploying, + UserId: userID, + Params: paramsJSON, + Script: jobScript, + JobId: jobId, + WorkDir: workDir, + ResourceSpec: resource, } if err = tx.Table("task_hpc").Create(&hpcTask).Error; err != nil { diff --git a/internal/logic/schedule/schedulecreatetasklogic.go b/internal/logic/schedule/schedulecreatetasklogic.go index 474e54b2..deaa5696 100644 --- a/internal/logic/schedule/schedulecreatetasklogic.go +++ b/internal/logic/schedule/schedulecreatetasklogic.go @@ -3,6 +3,10 @@ package schedule import ( "context" "fmt" + "slices" + "strings" + "time" + "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" @@ -12,9 +16,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gopkg.in/yaml.v3" - "slices" - "strings" - "time" "github.com/zeromicro/go-zero/core/logx" ) @@ -167,7 +168,7 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) ( // filter data distribution clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes) - taskId, err := l.createTask(taskName, req.Description, req.UserId, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp) + taskId, err := l.createTask(taskName, req.Description, req.UserId, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp, req.UserName) if err != nil { return nil, err } @@ -198,7 +199,7 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) ( // filter data distribution clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes) - taskId, err := l.createTask(taskName, req.Description, req.UserId, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp) + taskId, err := l.createTask(taskName, req.Description, req.UserId, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp, req.UserName) if err != nil { return nil, err } @@ -262,7 +263,7 @@ func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types return assignedClusters, nil } -func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, userId int64, strategyName string, clustersWithDataDistributes *ClustersWithDataDistributes, token string, userIp string) (int64, error) { +func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, userId int64, strategyName string, clustersWithDataDistributes *ClustersWithDataDistributes, token string, userIp string, userName string) (int64, error) { var synergyStatus int64 if len(clustersWithDataDistributes.Clusters) > 1 { synergyStatus = 1 @@ -273,7 +274,7 @@ func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, userI fmt.Printf("Error while Marshaling. %v", err) } - taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, desc, userId, synergyStatus, strategyName, string(y), token, userIp, &l.svcCtx.Config) + taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, desc, userId, synergyStatus, strategyName, string(y), token, userIp, &l.svcCtx.Config, userName) if err != nil { return 0, err } diff --git a/internal/logic/schedule/schedulesubmitlogic.go b/internal/logic/schedule/schedulesubmitlogic.go index 60bb20b8..93b914e5 100644 --- a/internal/logic/schedule/schedulesubmitlogic.go +++ b/internal/logic/schedule/schedulesubmitlogic.go @@ -2,14 +2,15 @@ package schedule import ( "context" + "strconv" + "strings" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" - "strconv" - "strings" "github.com/zeromicro/go-zero/core/logx" ) @@ -65,7 +66,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type synergystatus = 1 } - taskId, err := l.svcCtx.Scheduler.CreateTask(req.AiOption.TaskName, "", 0, synergystatus, req.AiOption.Strategy, "", req.Token, "", &l.svcCtx.Config) + taskId, err := l.svcCtx.Scheduler.CreateTask(req.AiOption.TaskName, "", 0, synergystatus, req.AiOption.Strategy, "", req.Token, "", &l.svcCtx.Config, "") if err != nil { return nil, err } diff --git a/internal/logic/xjlab/pagelisttasklogic.go b/internal/logic/xjlab/pagelisttasklogic.go new file mode 100644 index 00000000..f01a2f71 --- /dev/null +++ b/internal/logic/xjlab/pagelisttasklogic.go @@ -0,0 +1,275 @@ +package xjlab + +import ( + "context" + "time" + + jsoniter "github.com/json-iterator/go" + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" + "gorm.io/gorm" +) + +type PageListTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewPageListTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *PageListTaskLogic { + return &PageListTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +type TaskResp struct { + Id int64 `json:"id,omitempty,string" db:"id"` // id + Name string `json:"name,omitempty" db:"name"` // 作业名称 + Description string `json:"description,omitempty" db:"description"` // 作业描述 + Status string `json:"status,omitempty" db:"status"` // 作业状态 + Strategy int64 `json:"strategy" db:"strategy"` // 策略 + SynergyStatus int64 `json:"synergyStatus" db:"synergy_status"` // 协同状态(0-未协同、1-已协同) + CommitTime string `json:"commitTime,omitempty" db:"commit_time"` // 提交时间 + StartTime string `json:"startTime,omitempty" db:"start_time"` // 开始时间 + EndTime string `json:"endTime,omitempty" db:"end_time"` // 结束运行时间 + RunningTime int64 `json:"runningTime" db:"running_time"` // 已运行时间(单位秒) + YamlString string `json:"yamlString,omitempty" db:"yaml_string"` + Result string `json:"result,omitempty" db:"result"` // 作业结果 + DeletedAt string `json:"deletedAt,omitempty" gorm:"index" db:"deleted_at"` + NsID string `json:"nsId,omitempty" db:"ns_id"` + TenantId string `json:"tenantId,omitempty" db:"tenant_id"` + CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"` + AdapterTypeDict string `json:"adapterTypeDict" db:"adapter_type_dict" gorm:"adapter_type_dict"` //适配器类型(对应字典表的值 + TaskTypeDict string `json:"taskTypeDict" db:"task_type_dict" gorm:"task_type_dict"` //任务类型(对应字典表的值 + UserId int64 `json:"userId,omitempty" db:"user_id"` + UserName string `json:"userName,omitempty" db:"user_name"` + + ClusterId string `json:"clusterId,omitempty" db:"cluster_id"` + ClusterName string `json:"clusterName,omitempty" db:"cluster_name"` + ResourceSpec string `json:"resourceSpec,omitempty" db:"resource_spec"` + Card string `json:"card,omitempty" db:"card"` +} + +// clusterInfo 集群信息结构体 +type clusterInfo struct { + ClusterId string `json:"cluster_id"` + ClusterName string `json:"cluster_name"` + ResourceSpec string `json:"resource_spec"` + Card string `json:"card"` +} + +func (l *PageListTaskLogic) PageListTask(req *types.XJLABTaskReq) (*types.PageResult, error) { + // 验证请求参数 + if err := l.validateRequest(req); err != nil { + return nil, err + } + + // 查询任务总数 + total, err := l.getTaskCount(req) + if err != nil { + return nil, err + } + + // 查询任务列表 + tasks, err := l.getTaskList(req) + if err != nil { + return nil, err + } + + // 异步更新任务状态 + l.updateTaskStatusAsync(tasks) + + // 处理任务响应数据 + taskResps := l.processTaskResponses(tasks) + + // 构建分页结果 + return &types.PageResult{ + List: &taskResps, + PageSize: req.PageSize, + PageNum: req.PageNum, + Total: total, + }, nil +} + +// validateRequest 验证请求参数 +func (l *PageListTaskLogic) validateRequest(req *types.XJLABTaskReq) error { + if req.PageSize <= 0 || req.PageNum <= 0 { + return result.NewDefaultError("Invalid page size or page number") + } + return nil +} + +// buildBaseQuery 构建基础查询条件 +func (l *PageListTaskLogic) buildBaseQuery(req *types.XJLABTaskReq) *gorm.DB { + db := l.svcCtx.DbEngin.Model(&types.TaskModel{}).Table("task").Where("deleted_at is null") + + // 用户权限过滤 + if req.UserName != "" && req.UserName != "admin" { + db = db.Where("user_name = ?", req.UserName) + } + + // 任务名称模糊查询 + if req.Name != "" { + db = db.Where("name LIKE ?", "%"+req.Name+"%") + } + + // 计算类型筛选 + if req.AdapterTypeDict != "" { + db = db.Where("adapter_type_dict = ?", req.AdapterTypeDict) + } + + // 任务状态筛选 + if req.Status != "" { + db = db.Where("status = ?", req.Status) + } + + // 时间范围筛选 + if req.StartTime != "" && req.EndTime != "" { + db = db.Where("created_time BETWEEN ? AND ?", req.StartTime, req.EndTime) + } + + return db +} + +// getTaskCount 获取任务总数 +func (l *PageListTaskLogic) getTaskCount(req *types.XJLABTaskReq) (int64, error) { + var total int64 + db := l.buildBaseQuery(req) + + if err := db.Count(&total).Error; err != nil { + return 0, result.NewDefaultError(err.Error()) + } + + return total, nil +} + +// getTaskList 获取任务列表 +func (l *PageListTaskLogic) getTaskList(req *types.XJLABTaskReq) ([]*types.TaskModel, error) { + var list []*types.TaskModel + + limit := req.PageSize + offset := req.PageSize * (req.PageNum - 1) + + db := l.buildBaseQuery(req) + + if err := db.Limit(limit).Offset(offset).Order("created_time desc").Find(&list).Error; err != nil { + return nil, result.NewDefaultError(err.Error()) + } + + return list, nil +} + +// updateTaskStatusAsync 异步更新任务状态 +func (l *PageListTaskLogic) updateTaskStatusAsync(tasks []*types.TaskModel) { + go l.svcCtx.Scheduler.AiService.St.UpdateTaskStatus(tasks) + go l.svcCtx.Scheduler.AiService.St.UpdateAiTaskStatus(tasks) +} + +// processTaskResponses 处理任务响应数据 +func (l *PageListTaskLogic) processTaskResponses(tasks []*types.TaskModel) []*TaskResp { + taskResps := make([]*TaskResp, 0, len(tasks)) + + for _, model := range tasks { + // 计算运行时间 + model.RunningTime = l.calculateRunningTime(model.StartTime, model.EndTime) + + // 转换为响应结构体 + taskResp, err := l.convertToTaskResp(model) + if err != nil { + l.Errorf("Failed to convert task model: %v", err) + continue + } + + // 丰富任务详情 + enrichedTask := l.enrichTaskDetails(taskResp) + taskResps = append(taskResps, &enrichedTask) + } + + return taskResps +} + +// convertToTaskResp 将TaskModel转换为TaskResp +func (l *PageListTaskLogic) convertToTaskResp(model *types.TaskModel) (*TaskResp, error) { + jsonData, err := jsoniter.Marshal(model) + if err != nil { + return nil, err + } + + var taskResp TaskResp + if err := jsoniter.Unmarshal(jsonData, &taskResp); err != nil { + return nil, err + } + + return &taskResp, nil +} + +// calculateRunningTime 计算任务的运行时间 +func (l *PageListTaskLogic) calculateRunningTime(startTimeStr, endTimeStr string) int64 { + if startTimeStr == "" { + return 0 + } + + startTime := timeutils.TimeStringToGoTime(startTimeStr) + + // 如果没有结束时间,计算到当前时间 + if endTimeStr == "" { + return int64(time.Since(startTime).Seconds()) + } + + endTime := timeutils.TimeStringToGoTime(endTimeStr) + return int64(endTime.Sub(startTime).Seconds()) +} + +// enrichTaskDetails 丰富任务详情 +func (l *PageListTaskLogic) enrichTaskDetails(task *TaskResp) TaskResp { + cluster := l.getClusterInfo(task.Id, task.AdapterTypeDict) + + task.ClusterId = cluster.ClusterId + task.ClusterName = cluster.ClusterName + task.ResourceSpec = cluster.ResourceSpec + task.Card = cluster.Card + + return *task +} + +// getClusterInfo 根据适配器类型获取集群信息 +func (l *PageListTaskLogic) getClusterInfo(taskId int64, adapterType string) clusterInfo { + var cluster clusterInfo + + switch adapterType { + case constants.AdapterTypeCloud: + // 云计算任务 + l.svcCtx.DbEngin.Table("task_cloud"). + Where("task_id = ?", taskId). + Select("cluster_id,cluster_name,resource_spec"). + Find(&cluster) + + case constants.AdapterTypeAI: + // AI计算任务 + l.svcCtx.DbEngin.Table("task_ai"). + Where("task_id = ?", taskId). + Select("cluster_id,cluster_name,resource_spec,card"). + Find(&cluster) + + // AI任务特殊处理:如果没有resource_spec,使用card字段 + if cluster.ResourceSpec == "" { + cluster.ResourceSpec = cluster.Card + } + + case constants.AdapterTypeHPC: + // 高性能计算任务 + l.svcCtx.DbEngin.Table("task_hpc"). + Where("task_id = ?", taskId). + Select("cluster_id,cluster_name,resource_spec"). + Find(&cluster) + } + + return cluster +} diff --git a/internal/logic/xjlab/task_resource_usage.go b/internal/logic/xjlab/task_resource_usage.go new file mode 100644 index 00000000..d27fdb97 --- /dev/null +++ b/internal/logic/xjlab/task_resource_usage.go @@ -0,0 +1,27 @@ +package xjlab + +import ( + "context" + + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" +) + +type TaskResourceUsageLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewTaskResourceUsageLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskResourceUsageLogic { + return &TaskResourceUsageLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *TaskResourceUsageLogic) TaskResourceUsage(req *types.FId) (*types.CommonResp, error) { + return nil, nil +} diff --git a/internal/logic/xjlab/task_status_statistics.go b/internal/logic/xjlab/task_status_statistics.go new file mode 100644 index 00000000..1b3035d7 --- /dev/null +++ b/internal/logic/xjlab/task_status_statistics.go @@ -0,0 +1,155 @@ +package xjlab + +import ( + "context" + + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" +) + +// 任务状态常量定义 +const ( + StatusCompleted = "Completed" + StatusFailed = "Failed" + StatusRunning = "Running" + StatusSaved = "Saved" + StatusStopped = "Stopped" + StatusSucceeded = "Succeeded" + StatusUndefined = "undefined" + StatusWaiting = "Waiting" + StatusWaitRestart = "WaitRestart" +) + +type TaskStatusStatistics struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewTaskStatusStatisticsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TaskStatusStatistics { + return &TaskStatusStatistics{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +// TaskStatus 任务状态统计结构体 +type TaskStatus struct { + // 总数 + Total int64 `json:"total"` + + // 按状态分类统计 + Completed int64 `json:"completed"` // 已完成 + Failed int64 `json:"failed"` // 失败 + Running int64 `json:"running"` // 运行中 + Saved int64 `json:"saved"` // 已保存 + Stopped int64 `json:"stopped"` // 已停止 + Succeeded int64 `json:"succeeded"` // 成功 + Undefined int64 `json:"undefined"` // 未定义 + Waiting int64 `json:"waiting"` // 等待中 + WaitRestart int64 `json:"waitRestart"` // 等待重启 + + // 业务分类统计 + NormalCount int64 `json:"normalCount"` // 正常任务数 (Completed + Succeeded + Saved) + RunningCount int64 `json:"runningCount"` // 运行中任务数 (Running + Waiting + WaitRestart) + ErrorCount int64 `json:"errorCount"` // 异常任务数 (Failed + Stopped + Undefined) +} + +// StatusResult 数据库查询结果结构体 +type StatusResult struct { + Status string `gorm:"column:status" json:"status"` + Count int64 `gorm:"column:count" json:"count"` +} + +// GetTaskStatusStatistics 获取任务状态统计 +func (l *TaskStatusStatistics) GetTaskStatusStatistics(req *types.XJLABCommonReq) (*TaskStatus, error) { + // 构建数据库查询 + db := l.svcCtx.DbEngin.Model(&types.TaskModel{}). + Table("task"). + Where("deleted_at IS NULL") + + // 查询状态统计 + var results []StatusResult + err := db. + Select("status, COUNT(*) as count"). + Group("status"). + Find(&results).Error + + if err != nil { + l.Errorf("Failed to query task status statistics: %v", err) + return nil, result.NewDefaultError("Failed to get task status statistics") + } + + // 初始化统计结构 + stats := &TaskStatus{} + + // 填充统计数据 + for _, r := range results { + stats.Total += r.Count + l.categorizeTaskStatus(stats, r.Status, r.Count) + } + + l.Infof("Task status statistics retrieved successfully. Total: %d, Normal: %d, Running: %d, Error: %d", + stats.Total, stats.NormalCount, stats.RunningCount, stats.ErrorCount) + + return stats, nil +} + +// categorizeTaskStatus 根据状态分类统计数据 +func (l *TaskStatusStatistics) categorizeTaskStatus(stats *TaskStatus, status string, count int64) { + switch status { + case StatusCompleted: + stats.Completed = count + stats.NormalCount += count + case StatusFailed: + stats.Failed = count + stats.ErrorCount += count + case StatusRunning: + stats.Running = count + stats.RunningCount += count + case StatusSaved: + stats.Saved = count + stats.NormalCount += count + case StatusStopped: + stats.Stopped = count + stats.ErrorCount += count + case StatusSucceeded: + stats.Succeeded = count + stats.NormalCount += count + case StatusUndefined: + stats.Undefined = count + stats.ErrorCount += count + case StatusWaiting: + stats.Waiting = count + stats.RunningCount += count + case StatusWaitRestart: + stats.WaitRestart = count + stats.RunningCount += count + default: + // 记录未知状态 + l.Logger.Errorf("Unknown task status encountered: %s with count: %d", status, count) + stats.ErrorCount += count + } +} + +// GetSimpleTaskStatistics 获取简化的任务统计数据 +func (l *TaskStatusStatistics) GetSimpleTaskStatistics(req *types.XJLABCommonReq) (*map[string]interface{}, error) { + // 获取完整统计 + resp, err := l.GetTaskStatusStatistics(req) + if err != nil { + return nil, err + } + + // 返回简化的统计数据,对应前端UI的三个卡片 + simpleStats := map[string]interface{}{ + "totalCount": resp.Total, // 任务总数 + "normalCount": resp.NormalCount, // 正常任务数 + "errorCount": resp.ErrorCount, // 任务告警数 + "runningCount": resp.RunningCount, // 运行中任务数 + } + + return &simpleStats, nil +} diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index e32bb02e..19b90a63 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -1,6 +1,10 @@ package database import ( + "strconv" + "time" + + jsoniter "github.com/json-iterator/go" "github.com/zeromicro/go-zero/core/logx" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" @@ -11,8 +15,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/tracker" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gorm.io/gorm" - "strconv" - "time" ) type AiStorage struct { @@ -125,7 +127,7 @@ func (s *AiStorage) DoesTaskNameExist(name string, taskType string) (bool, error return total > 0, nil } -func (s *AiStorage) SaveTask(name string, desc string, userId int64, strategyCode int64, synergyStatus int64, aiType string, yaml string, saveToChain func(task models.Task, id int64) error) (int64, error) { +func (s *AiStorage) SaveTask(name string, desc string, userId int64, strategyCode int64, synergyStatus int64, aiType string, yaml string, saveToChain func(task models.Task, id int64) error, userName string) (int64, error) { startTime := time.Now() // 构建主任务结构体 @@ -135,6 +137,7 @@ func (s *AiStorage) SaveTask(name string, desc string, userId int64, strategyCod Description: desc, Name: name, UserId: userId, + UserName: userName, SynergyStatus: synergyStatus, Strategy: strategyCode, AdapterTypeDict: "1", @@ -214,25 +217,26 @@ func (s *AiStorage) SaveAiTask(taskId int64, opt option.Option, adapterName stri if err != nil { return err } - + resourceSpec, _ := jsoniter.MarshalToString(aiOpt.ResourcesRequired) aiTaskModel := models.TaskAi{ - TaskId: taskId, - AdapterId: aId, - AdapterName: adapterName, - ClusterId: cId, - ClusterName: clusterName, - Name: aiOpt.TaskName, - Replica: int64(aiOpt.Replica), - JobId: jobId, - TaskType: aiOpt.TaskType, - ModelName: aiOpt.ModelName, - Strategy: aiOpt.StrategyName, - Status: status, - Msg: msg, - Output: aiOpt.Output, - Card: aiOpt.ComputeCard, - StartTime: time.Now().Format(time.RFC3339), - CommitTime: time.Now(), + TaskId: taskId, + AdapterId: aId, + AdapterName: adapterName, + ClusterId: cId, + ClusterName: clusterName, + Name: aiOpt.TaskName, + Replica: int64(aiOpt.Replica), + JobId: jobId, + TaskType: aiOpt.TaskType, + ModelName: aiOpt.ModelName, + Strategy: aiOpt.StrategyName, + Status: status, + Msg: msg, + Output: aiOpt.Output, + Card: aiOpt.ComputeCard, + StartTime: time.Now().Format(time.RFC3339), + CommitTime: time.Now(), + ResourceSpec: resourceSpec, } // 保存任务数据到数据库 tx := s.DbEngin.Create(&aiTaskModel) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index e2b60386..4a077d2b 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -16,6 +16,8 @@ package scheduler import ( "encoding/json" + "strings" + "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" @@ -27,7 +29,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response" "gorm.io/gorm" "sigs.k8s.io/yaml" - "strings" ) type Scheduler struct { @@ -189,13 +190,13 @@ func (s *Scheduler) SaveToDb() error { return nil } -func (s *Scheduler) CreateTask(taskName string, desc string, userId int64, synergyCode int64, strategyName string, yaml string, token string, userIp string, config *config.Config) (int64, error) { +func (s *Scheduler) CreateTask(taskName string, desc string, userId int64, synergyCode int64, strategyName string, yaml string, token string, userIp string, config *config.Config, userName string) (int64, error) { strategyCode, err := s.AiStorages.GetStrategyCode(strategyName) if err != nil { return 0, err } - id, err := s.AiStorages.SaveTask(taskName, desc, userId, strategyCode, synergyCode, "10", yaml, nil) + id, err := s.AiStorages.SaveTask(taskName, desc, userId, strategyCode, synergyCode, "10", yaml, nil, userName) if err != nil { return 0, err } diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index 99dffcdf..0ca593ca 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -19,6 +19,10 @@ import ( "encoding/json" "errors" "fmt" + "strconv" + "strings" + "sync" + "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-ac/hpcAC" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" @@ -39,9 +43,6 @@ import ( omodel "gitlink.org.cn/JointCloud/pcm-octopus/http/model" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-openi/model" - "strconv" - "strings" - "sync" ) type AiScheduler struct { @@ -248,7 +249,7 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass var taskId int64 switch mode { case executor.SUBMIT_MODE_JOINT_CLOUD: - tid, err := as.CreateTask(as.option.TaskName, "", 0, synergystatus, as.option.StrategyName, "", "", "", nil) + tid, err := as.CreateTask(as.option.TaskName, "", 0, synergystatus, as.option.StrategyName, "", "", "", nil, "0") if err != nil { return err } diff --git a/internal/scheduler/service/inference/imageInference/imageInference.go b/internal/scheduler/service/inference/imageInference/imageInference.go index 7d9b2e65..68de2158 100644 --- a/internal/scheduler/service/inference/imageInference/imageInference.go +++ b/internal/scheduler/service/inference/imageInference/imageInference.go @@ -3,6 +3,15 @@ package imageInference import ( "encoding/json" "errors" + "log" + "math/rand" + "mime/multipart" + "net/http" + "sort" + "strconv" + "sync" + "time" + "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" @@ -14,14 +23,6 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" - "log" - "math/rand" - "mime/multipart" - "net/http" - "sort" - "strconv" - "sync" - "time" ) type IImageInference interface { @@ -126,7 +127,7 @@ func (i *ImageInference) saveTask() (int64, error) { return 0, err } - id, err := i.storage.SaveTask(i.opt.TaskName, "", 0, strategyCode, synergystatus, i.inference.GetAiType(), "", nil) + id, err := i.storage.SaveTask(i.opt.TaskName, "", 0, strategyCode, synergystatus, i.inference.GetAiType(), "", nil, "0") if err != nil { return 0, err } diff --git a/internal/scheduler/service/inference/textInference/textInference.go b/internal/scheduler/service/inference/textInference/textInference.go index fdd15f23..051792de 100644 --- a/internal/scheduler/service/inference/textInference/textInference.go +++ b/internal/scheduler/service/inference/textInference/textInference.go @@ -70,7 +70,7 @@ func (ti *TextInference) saveTask() (int64, error) { var synergystatus int64 var strategyCode int64 - id, err := ti.storage.SaveTask(ti.opt.TaskName, "", 0, strategyCode, synergystatus, ti.inference.GetAiType(), "", nil) + id, err := ti.storage.SaveTask(ti.opt.TaskName, "", 0, strategyCode, synergystatus, ti.inference.GetAiType(), "", nil, "0") if err != nil { return 0, err } diff --git a/internal/types/cloud/container.go b/internal/types/cloud/container.go index de5b232d..18956613 100644 --- a/internal/types/cloud/container.go +++ b/internal/types/cloud/container.go @@ -14,6 +14,7 @@ type CreateParam struct { ContainerGroupName string `json:"containerGroupName"` Description string `json:"description,omitempty"` UserId int64 `json:"userId"` + UserName string `json:"userName"` Name string `json:"name"` Image string `json:"image"` ImageRegistry string `json:"imageRegistry,omitempty"` diff --git a/internal/types/types.go b/internal/types/types.go index 1aec0096..61b9e5cb 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -1465,6 +1465,7 @@ type CreateSubnetResp struct { type CreateTaskReq struct { Name string `json:"name"` UserId int64 `json:"userId,optional"` + UserName string `json:"userName,optional"` Description string `json:"description,optional"` Token string `json:"token,optional"` UserIp string `json:"userIp,optional"` @@ -2500,6 +2501,7 @@ type Floatingips struct { type GeneralTaskReq struct { Token string `json:"token,optional"` UserId int64 `json:"userId,optional"` + UserName string `json:"userName,optional"` Name string `json:"name"` AdapterIds []string `json:"adapterIds"` ClusterIds []string `json:"clusterIds"` @@ -5647,6 +5649,7 @@ type TaskModel struct { AdapterTypeDict string `json:"adapterTypeDict" db:"adapter_type_dict" gorm:"adapter_type_dict"` //适配器类型(对应字典表的值 TaskTypeDict string `json:"taskTypeDict" db:"task_type_dict" gorm:"task_type_dict"` //任务类型(对应字典表的值 UserId int64 `json:"userId,omitempty" db:"user_id"` + UserName string `json:"userName,omitempty" db:"user_name"` } type TaskSl struct { @@ -6537,3 +6540,52 @@ type TaskNumResp struct { History int `json:"history"` Failed int `json:"failed"` } + +// jcc 用户信息 +type JccUserInfo struct { + UserName string `json:"userName,optional"` + UserId int64 `json:"userId,optional"` +} + +type XJLABTaskReq struct { + Id string `form:"id,optional"` + Name string `form:"name,optional"` + Status string `form:"status,optional"` + UserName string `form:"userName,optional"` + AdapterTypeDict string `form:"adapterTypeDict,optional"` + StartTime string `form:"startTime,optional"` + EndTime string `form:"endTime,optional"` + PageInfo + JccUserInfo +} + +type XJLABTaskResp struct { + Id int64 `json:"id,omitempty,string" db:"id"` // id + Name string `json:"name,omitempty" db:"name"` // 作业名称 + Description string `json:"description,omitempty" db:"description"` // 作业描述 + Status string `json:"status,omitempty" db:"status"` // 作业状态 + Strategy int64 `json:"strategy" db:"strategy"` // 策略 + SynergyStatus int64 `json:"synergyStatus" db:"synergy_status"` // 协同状态(0-未协同、1-已协同) + CommitTime string `json:"commitTime,omitempty" db:"commit_time"` // 提交时间 + StartTime string `json:"startTime,omitempty" db:"start_time"` // 开始时间 + EndTime string `json:"endTime,omitempty" db:"end_time"` // 结束运行时间 + RunningTime int64 `json:"runningTime" db:"running_time"` // 已运行时间(单位秒) + YamlString string `json:"yamlString,omitempty" db:"yaml_string"` + Result string `json:"result,omitempty" db:"result"` // 作业结果 + DeletedAt string `json:"deletedAt,omitempty" gorm:"index" db:"deleted_at"` + NsID string `json:"nsId,omitempty" db:"ns_id"` + TenantId string `json:"tenantId,omitempty" db:"tenant_id"` + CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"` + AdapterTypeDict string `json:"adapterTypeDict" db:"adapter_type_dict" gorm:"adapter_type_dict"` //适配器类型(对应字典表的值 + TaskTypeDict string `json:"taskTypeDict" db:"task_type_dict" gorm:"task_type_dict"` //任务类型(对应字典表的值 + UserId int64 `json:"userId,omitempty" db:"user_id"` + UserName string `json:"userName,omitempty" db:"-"` // 用户名称 + ResourceId string `json:"resourceId,omitempty" db:"resource_id"` // 资源ID + ResourceName string `json:"resourceName,omitempty" db:"resource_name"` // 资源名称 + ClusterId string `json:"clusterId,omitempty" db:"cluster_id"` // 集群ID + ClusterName string `json:"clusterName,omitempty" db:"cluster_name"` // 集群 +} + +type XJLABCommonReq struct { +} diff --git a/pkg/constants/const.go b/pkg/constants/const.go new file mode 100644 index 00000000..8be0351f --- /dev/null +++ b/pkg/constants/const.go @@ -0,0 +1,24 @@ +package constants + +const ( + UserId = "UserId" + UserName = "UserName" +) + +const ( + StatusSaved = "Saved" + StatusDeploying = "Deploying" +) +const ( + AdapterTypeCloud = "0" + AdapterTypeAI = "1" + AdapterTypeHPC = "2" +) + +// HPCSystemType 超算计算系统类型 +type HPCSystemType string + +const ( + HPC_SYSTEM_SLURM HPCSystemType = "slurm" + HPC_SYSTEM_AC HPCSystemType = "ac" +) diff --git a/pkg/models/taskaimodel_gen.go b/pkg/models/taskaimodel_gen.go index 4d4f72a9..03cce164 100644 --- a/pkg/models/taskaimodel_gen.go +++ b/pkg/models/taskaimodel_gen.go @@ -35,27 +35,28 @@ type ( } TaskAi struct { - Id int64 `db:"id"` // id - TaskId int64 `db:"task_id"` // 任务id - AdapterId int64 `db:"adapter_id"` // 适配器id - AdapterName string `db:"adapter_name"` // 适配器名称 - ClusterId int64 `db:"cluster_id"` // 集群id - ClusterName string `db:"cluster_name"` // 集群名称 - Name string `db:"name"` // 任务名 - Replica int64 `db:"replica"` // 执行数 - JobId string `db:"job_id"` // 集群返回任务id - Strategy string `db:"strategy"` // 主任务使用策略 - Status string `db:"status"` // 任务状态 - Msg string `db:"msg"` // 集群返回任务信息 - CommitTime time.Time `db:"commit_time"` // 提交时间 - StartTime string `db:"start_time"` // 开始时间 - EndTime string `db:"end_time"` // 结束时间 - TaskType string `db:"task_type"` - DeletedAt *time.Time `db:"deleted_at"` - Card string `db:"card"` - InferUrl string `db:"infer_url"` - ModelName string `db:"model_name"` - Output string `db:"output"` + Id int64 `db:"id"` // id + TaskId int64 `db:"task_id"` // 任务id + AdapterId int64 `db:"adapter_id"` // 适配器id + AdapterName string `db:"adapter_name"` // 适配器名称 + ClusterId int64 `db:"cluster_id"` // 集群id + ClusterName string `db:"cluster_name"` // 集群名称 + Name string `db:"name"` // 任务名 + Replica int64 `db:"replica"` // 执行数 + JobId string `db:"job_id"` // 集群返回任务id + Strategy string `db:"strategy"` // 主任务使用策略 + Status string `db:"status"` // 任务状态 + Msg string `db:"msg"` // 集群返回任务信息 + CommitTime time.Time `db:"commit_time"` // 提交时间 + StartTime string `db:"start_time"` // 开始时间 + EndTime string `db:"end_time"` // 结束时间 + TaskType string `db:"task_type"` + DeletedAt *time.Time `db:"deleted_at"` + Card string `db:"card"` + InferUrl string `db:"infer_url"` + ModelName string `db:"model_name"` + Output string `db:"output"` + ResourceSpec string `db:"resource_spec"` } ) diff --git a/pkg/models/taskhpcmodel_gen.go b/pkg/models/taskhpcmodel_gen.go index 459934e7..f9a2cfb2 100644 --- a/pkg/models/taskhpcmodel_gen.go +++ b/pkg/models/taskhpcmodel_gen.go @@ -36,55 +36,64 @@ type ( } TaskHpc struct { - Id int64 `db:"id"` // id - TaskId int64 `db:"task_id"` // 任务id - JobId string `db:"job_id"` // 作业id(在第三方系统中的作业id) - AdapterId int64 `db:"adapter_d"` // 适配器id - AdapterName string `db:"adapter_name"` //适配器名称 - ClusterId int64 `db:"cluster_id"` //集群id - ClusterName string `db:"cluster_name"` //集群名称 - Name string `db:"name"` // 名称 - Backend string `db:"backend"` // 平台类型 - OperateType string `db:"operate_type"` // 操作类型 - Status string `db:"status"` // 状态 - CmdScript string `db:"cmd_script"` - StartTime string `db:"start_time"` // 开始时间 - EndTime string `db:"end_time"` // 结束时间 - RunningTime int64 `db:"running_time"` // 运行时间 - DerivedEs string `db:"derived_es"` - Cluster string `db:"cluster"` - BlockId int64 `db:"block_id"` - AllocNodes int64 `db:"alloc_nodes"` - AllocCpu int64 `db:"alloc_cpu"` - CardCount int64 `db:"card_count"` // 卡数 - Version string `db:"version"` - Account string `db:"account"` - WorkDir string `db:"work_dir"` // 工作路径 - AssocId int64 `db:"assoc_id"` - ExitCode int64 `db:"exit_code"` - WallTime string `db:"wall_time"` // 最大运行时间 - Result string `db:"result"` // 运行结果 - DeletedAt sql.NullTime `db:"deleted_at"` // 删除时间 - YamlString string `db:"yaml_string"` - AppType string `db:"app_type"` // 应用类型 - AppName string `db:"app_name"` // 应用名称 - Queue string `db:"queue"` // 队列名称 - SubmitType string `db:"submit_type"` // cmd(命令行模式) - NNode string `db:"n_node"` // 节点个数(当指定该参数时,GAP_NODE_STRING必须为"") - StdOutFile string `db:"std_out_file"` // 工作路径/std.err.%j - StdErrFile string `db:"std_err_file"` // 工作路径/std.err.%j - StdInput string `db:"std_input"` - Partition string `db:"partition"` - DeletedFlag int64 `db:"deleted_flag"` // 是否删除(0-否,1-是) - CreatedBy int64 `db:"created_by"` // 创建人 - CreatedTime time.Time `db:"created_time"` // 创建时间 - UpdatedBy int64 `db:"updated_by"` // 更新人 - UpdatedTime time.Time `db:"updated_time"` // 更新时间 - UserId int64 `db:"user_id"` - TimeLimit int64 `db:"time_limit"` - Params string `db:"params"` // 渲染参数 - Script string `db:"script"` // 生成的脚本 - TemplateId int64 `db:"template_id"` // 模板ID + Id int64 `db:"id"` // id + TaskId int64 `db:"task_id"` // 任务id + JobId string `db:"job_id"` // 作业id(在第三方系统中的作业id) + AdapterId int64 `db:"adapter_d"` // 适配器id + AdapterName string `db:"adapter_name"` //适配器名称 + ClusterId int64 `db:"cluster_id"` //集群id + ClusterName string `db:"cluster_name"` //集群名称 + Name string `db:"name"` // 名称 + Backend string `db:"backend"` // 平台类型 + OperateType string `db:"operate_type"` // 操作类型 + Status string `db:"status"` // 状态 + CmdScript string `db:"cmd_script"` + StartTime string `db:"start_time"` // 开始时间 + EndTime string `db:"end_time"` // 结束时间 + RunningTime int64 `db:"running_time"` // 运行时间 + DerivedEs string `db:"derived_es"` + Cluster string `db:"cluster"` + BlockId int64 `db:"block_id"` + AllocNodes int64 `db:"alloc_nodes"` + AllocCpu int64 `db:"alloc_cpu"` + CardCount int64 `db:"card_count"` // 卡数 + Version string `db:"version"` + Account string `db:"account"` + WorkDir string `db:"work_dir"` // 工作路径 + AssocId int64 `db:"assoc_id"` + ExitCode int64 `db:"exit_code"` + WallTime string `db:"wall_time"` // 最大运行时间 + Result string `db:"result"` // 运行结果 + DeletedAt sql.NullTime `db:"deleted_at"` // 删除时间 + YamlString string `db:"yaml_string"` + AppType string `db:"app_type"` // 应用类型 + AppName string `db:"app_name"` // 应用名称 + Queue string `db:"queue"` // 队列名称 + SubmitType string `db:"submit_type"` // cmd(命令行模式) + NNode string `db:"n_node"` // 节点个数(当指定该参数时,GAP_NODE_STRING必须为"") + StdOutFile string `db:"std_out_file"` // 工作路径/std.err.%j + StdErrFile string `db:"std_err_file"` // 工作路径/std.err.%j + StdInput string `db:"std_input"` + Partition string `db:"partition"` + DeletedFlag int64 `db:"deleted_flag"` // 是否删除(0-否,1-是) + CreatedBy int64 `db:"created_by"` // 创建人 + CreatedTime time.Time `db:"created_time"` // 创建时间 + UpdatedBy int64 `db:"updated_by"` // 更新人 + UpdatedTime time.Time `db:"updated_time"` // 更新时间 + UserId int64 `db:"user_id"` + TimeLimit int64 `db:"time_limit"` + Params string `db:"params"` // 渲染参数 + Script string `db:"script"` // 生成的脚本 + TemplateId int64 `db:"template_id"` // 模板ID + ResourceSpec ResourceSpec `json:"resourceSpec" gorm:"serializer:json"` + } + + ResourceSpec struct { + ResourceId string `json:"resourceId"` // 资源ID + ResourceName string `json:"resourceName"` // 资源名称 + ResourceType string `json:"resourceType"` // 资源类型 + Partition string `json:"partition"` // 分区 + Specifications interface{} `json:"specifications"` } ) diff --git a/pkg/models/taskmodel_gen.go b/pkg/models/taskmodel_gen.go index f099c4dc..1c368995 100644 --- a/pkg/models/taskmodel_gen.go +++ b/pkg/models/taskmodel_gen.go @@ -52,6 +52,7 @@ type ( AdapterTypeDict string `db:"adapter_type_dict" json:"adapterTypeDict"` //任务类型(对应字典表的值) TaskTypeDict string `db:"task_type_dict" json:"taskTypeDict"` UserId int64 `db:"user_id" json:"userId"` + UserName string `db:"user_name" json:"userName"` // 提交人 } ) diff --git a/pkg/utils/slurm_parser.go b/pkg/utils/slurm_parser.go new file mode 100644 index 00000000..78b1d811 --- /dev/null +++ b/pkg/utils/slurm_parser.go @@ -0,0 +1,394 @@ +package utils + +import ( + "bufio" + "fmt" + "os" + "regexp" + "strconv" + "strings" +) + +// SlurmResource 定义SLURM资源规格结构体 +type SlurmResource struct { + JobName string `json:"job_name"` + CPUs string `json:"cpus"` // 每任务CPU数 + Memory string `json:"memory"` // 内存 + Nodes string `json:"nodes"` // 节点数 + Tasks string `json:"tasks"` // 总任务数 + TasksPerNode string `json:"tasks_per_node"` // 每节点任务数 + CPUsPerTask string `json:"cpus_per_task"` // 每任务CPU数 + Partition string `json:"partition"` // 队列/分区 + Time string `json:"time"` // 时间限制 + Output string `json:"output"` // 输出文件 + Error string `json:"error"` // 错误文件 + QOS string `json:"qos"` // 服务质量 + Account string `json:"account"` // 账户 + GPUs string `json:"gpus"` // GPU数量 + GPUType string `json:"gpu_type"` // GPU类型 + Constraint string `json:"constraint"` // 节点约束 + Exclusive bool `json:"exclusive"` // 独占节点 + ArrayJobID string `json:"array_job_id"` // 数组作业ID + WorkingDir string `json:"working_dir"` // 工作目录 + MailType string `json:"mail_type"` // 邮件类型 + MailUser string `json:"mail_user"` // 邮件用户 +} + +// SlurmParser SLURM解析器 +type SlurmParser struct { + patterns map[string][]*regexp.Regexp +} + +// NewSlurmParser 创建新的SLURM解析器 +func NewSlurmParser() *SlurmParser { + parser := &SlurmParser{ + patterns: make(map[string][]*regexp.Regexp), + } + parser.initPatterns() + return parser +} + +// initPatterns 初始化所有匹配模式 +func (p *SlurmParser) initPatterns() { + // 作业名称的各种写法 + p.patterns["job_name"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--job-name[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-J\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--job-name\s*=\s*"([^"]+)"`), + regexp.MustCompile(`#SBATCH\s+--job-name\s*=\s*'([^']+)'`), + } + + // CPU相关的各种写法 + p.patterns["cpus_per_task"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--cpus-per-task[=\s]+(\d+)`), + regexp.MustCompile(`#SBATCH\s+-c\s+(\d+)`), + regexp.MustCompile(`#SBATCH\s+--cpus-per-task\s*=\s*(\d+)`), + } + + // 内存的各种写法 + p.patterns["memory"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--mem[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-m\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--mem\s*=\s*([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--mem-per-cpu[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--mem-per-node[=\s]+([^\s]+)`), + } + + // 节点数的各种写法 + p.patterns["nodes"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--nodes[=\s]+(\d+)`), + regexp.MustCompile(`#SBATCH\s+-N\s+(\d+)`), + regexp.MustCompile(`#SBATCH\s+--nodes\s*=\s*(\d+)`), + regexp.MustCompile(`#SBATCH\s+--nodes[=\s]+(\d+-\d+)`), // 范围格式 + } + + // 任务数的各种写法 + p.patterns["tasks"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--ntasks[=\s]+(\d+)`), + regexp.MustCompile(`#SBATCH\s+-n\s+(\d+)`), + regexp.MustCompile(`#SBATCH\s+--ntasks\s*=\s*(\d+)`), + } + + // 每节点任务数 + p.patterns["tasks_per_node"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--ntasks-per-node[=\s]+(\d+)`), + regexp.MustCompile(`#SBATCH\s+--ntasks-per-node\s*=\s*(\d+)`), + } + + // 分区/队列的各种写法 + p.patterns["partition"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--partition[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-p\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--partition\s*=\s*([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--partition\s*=\s*"([^"]+)"`), + } + + // 时间限制的各种写法 + p.patterns["time"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--time[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-t\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--time\s*=\s*([^\s]+)`), + } + + // 输出文件 + p.patterns["output"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--output[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-o\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--output\s*=\s*([^\s]+)`), + } + + // 错误文件 + p.patterns["error"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--error[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-e\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--error\s*=\s*([^\s]+)`), + } + + // 服务质量 + p.patterns["qos"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--qos[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--qos\s*=\s*([^\s]+)`), + } + + // 账户 + p.patterns["account"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--account[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-A\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--account\s*=\s*([^\s]+)`), + } + + // GPU相关 + p.patterns["gpus"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--gpus[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--gpus-per-node[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--gpus-per-task[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--gres[=\s]+gpu:(\d+)`), + regexp.MustCompile(`#SBATCH\s+--gres[=\s]+gpu:([^:]+):(\d+)`), // gpu类型:数量 + } + + // 约束条件 + p.patterns["constraint"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--constraint[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-C\s+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--constraint\s*=\s*"([^"]+)"`), + } + + // 独占节点 + p.patterns["exclusive"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--exclusive`), + } + + // 数组作业 + p.patterns["array"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--array[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-a\s+([^\s]+)`), + } + + // 工作目录 + p.patterns["workdir"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--chdir[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+--workdir[=\s]+([^\s]+)`), + regexp.MustCompile(`#SBATCH\s+-D\s+([^\s]+)`), + } + + // 邮件通知 + p.patterns["mail_type"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--mail-type[=\s]+([^\s]+)`), + } + + p.patterns["mail_user"] = []*regexp.Regexp{ + regexp.MustCompile(`#SBATCH\s+--mail-user[=\s]+([^\s]+)`), + } +} + +// ParseScript 解析SLURM脚本 +func (p *SlurmParser) ParseScript(scriptContent string) *SlurmResource { + resource := &SlurmResource{} + + scanner := bufio.NewScanner(strings.NewReader(scriptContent)) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + + // 跳过非SBATCH行和注释行 + if !strings.HasPrefix(line, "#SBATCH") { + continue + } + + // 处理每个字段 + p.parseField(line, "job_name", &resource.JobName) + p.parseField(line, "cpus_per_task", &resource.CPUsPerTask) + p.parseField(line, "memory", &resource.Memory) + p.parseField(line, "nodes", &resource.Nodes) + p.parseField(line, "tasks", &resource.Tasks) + p.parseField(line, "tasks_per_node", &resource.TasksPerNode) + p.parseField(line, "partition", &resource.Partition) + p.parseField(line, "time", &resource.Time) + p.parseField(line, "output", &resource.Output) + p.parseField(line, "error", &resource.Error) + p.parseField(line, "qos", &resource.QOS) + p.parseField(line, "account", &resource.Account) + p.parseField(line, "constraint", &resource.Constraint) + p.parseField(line, "array", &resource.ArrayJobID) + p.parseField(line, "workdir", &resource.WorkingDir) + p.parseField(line, "mail_type", &resource.MailType) + p.parseField(line, "mail_user", &resource.MailUser) + + // 处理GPU + p.parseGPU(line, resource) + + // 处理exclusive + if p.matchPattern(line, "exclusive") != "" { + resource.Exclusive = true + } + } + + // 后处理:推导缺失的信息 + p.postProcess(resource) + + return resource +} + +// parseField 解析单个字段 +func (p *SlurmParser) parseField(line, field string, target *string) { + if *target == "" { // 只在字段为空时才设置 + if value := p.matchPattern(line, field); value != "" { + *target = value + } + } +} + +// parseGPU 解析GPU相关信息 +func (p *SlurmParser) parseGPU(line string, resource *SlurmResource) { + if patterns, exists := p.patterns["gpus"]; exists { + for _, pattern := range patterns { + if matches := pattern.FindStringSubmatch(line); len(matches) > 1 { + if strings.Contains(pattern.String(), "gres.*gpu:([^:]+):(\\d+)") && len(matches) > 2 { + // gpu类型:数量格式 + resource.GPUType = matches[1] + resource.GPUs = matches[2] + } else { + resource.GPUs = matches[1] + } + break + } + } + } +} + +// matchPattern 匹配模式并返回值 +func (p *SlurmParser) matchPattern(line, field string) string { + if patterns, exists := p.patterns[field]; exists { + for _, pattern := range patterns { + if matches := pattern.FindStringSubmatch(line); len(matches) > 1 { + return matches[1] + } + } + } + return "" +} + +// postProcess 后处理,推导缺失信息 +func (p *SlurmParser) postProcess(resource *SlurmResource) { + // 如果没有指定CPUs但有tasks和cpus_per_task,计算总CPU数 + if resource.CPUs == "" && resource.Tasks != "" && resource.CPUsPerTask != "" { + if tasks, err1 := strconv.Atoi(resource.Tasks); err1 == nil { + if cpusPerTask, err2 := strconv.Atoi(resource.CPUsPerTask); err2 == nil { + resource.CPUs = strconv.Itoa(tasks * cpusPerTask) + } + } + } + + // 如果只有tasks但没有nodes,假设为单节点 + if resource.Tasks != "" && resource.Nodes == "" && resource.TasksPerNode == "" { + resource.Nodes = "1" + } +} + +// ParseFile 从文件解析SLURM脚本 +func (p *SlurmParser) ParseFile(filename string) (*SlurmResource, error) { + content, err := os.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("读取文件失败: %v", err) + } + return p.ParseScript(string(content)), nil +} + +// String 格式化输出 +func (r *SlurmResource) String() string { + var result strings.Builder + result.WriteString("SLURM资源规格:\n") + result.WriteString("====================\n") + + if r.JobName != "" { + result.WriteString(fmt.Sprintf("作业名称: %s\n", r.JobName)) + } + if r.Partition != "" { + result.WriteString(fmt.Sprintf("队列/分区: %s\n", r.Partition)) + } + if r.Nodes != "" { + result.WriteString(fmt.Sprintf("节点数: %s\n", r.Nodes)) + } + if r.Tasks != "" { + result.WriteString(fmt.Sprintf("任务数: %s\n", r.Tasks)) + } + if r.TasksPerNode != "" { + result.WriteString(fmt.Sprintf("每节点任务数: %s\n", r.TasksPerNode)) + } + if r.CPUsPerTask != "" { + result.WriteString(fmt.Sprintf("每任务CPU数: %s\n", r.CPUsPerTask)) + } + if r.CPUs != "" { + result.WriteString(fmt.Sprintf("总CPU数: %s\n", r.CPUs)) + } + if r.Memory != "" { + result.WriteString(fmt.Sprintf("内存: %s\n", r.Memory)) + } + if r.GPUs != "" { + result.WriteString(fmt.Sprintf("GPU数量: %s\n", r.GPUs)) + if r.GPUType != "" { + result.WriteString(fmt.Sprintf("GPU类型: %s\n", r.GPUType)) + } + } + if r.Time != "" { + result.WriteString(fmt.Sprintf("运行时间: %s\n", r.Time)) + } + if r.Account != "" { + result.WriteString(fmt.Sprintf("账户: %s\n", r.Account)) + } + if r.QOS != "" { + result.WriteString(fmt.Sprintf("服务质量: %s\n", r.QOS)) + } + if r.Constraint != "" { + result.WriteString(fmt.Sprintf("节点约束: %s\n", r.Constraint)) + } + if r.Exclusive { + result.WriteString("独占节点: 是\n") + } + if r.ArrayJobID != "" { + result.WriteString(fmt.Sprintf("数组作业: %s\n", r.ArrayJobID)) + } + if r.Output != "" { + result.WriteString(fmt.Sprintf("输出文件: %s\n", r.Output)) + } + if r.Error != "" { + result.WriteString(fmt.Sprintf("错误文件: %s\n", r.Error)) + } + + return result.String() +} + +// GetResourceSummary 获取核心资源摘要 +func (r *SlurmResource) GetResourceSummary() map[string]string { + summary := make(map[string]string) + + if r.JobName != "" { + summary["job_name"] = r.JobName + } + if r.Partition != "" { + summary["queue"] = r.Partition + } + if r.Nodes != "" { + summary["nodes"] = r.Nodes + } + if r.Tasks != "" { + summary["tasks"] = r.Tasks + } + if r.CPUs != "" { + summary["total_cpus"] = r.CPUs + } + if r.CPUsPerTask != "" { + summary["cpus_per_task"] = r.CPUsPerTask + } + if r.Memory != "" { + summary["memory"] = r.Memory + } + if r.GPUs != "" { + summary["gpus"] = r.GPUs + } + if r.Time != "" { + summary["time_limit"] = r.Time + } + + return summary +}