diff --git a/desc/schedule/pcm-schedule.api b/desc/schedule/pcm-schedule.api index 02e8b8bd..d5b5d6e3 100644 --- a/desc/schedule/pcm-schedule.api +++ b/desc/schedule/pcm-schedule.api @@ -204,6 +204,7 @@ type ( CodeDistribute { DataName string `json:"dataName,optional"` PackageID int64 `json:"packageID"` + Output string `json:"output"` Clusters []*ClusterScheduled `json:"clusters"` } diff --git a/etc/pcm.yaml b/etc/pcm.yaml index 3ea863bd..90805d77 100644 --- a/etc/pcm.yaml +++ b/etc/pcm.yaml @@ -81,4 +81,7 @@ BlockChain: ContractAddress: 0x22ac23bf2d2cf1b4d8fec9cb4d279c7da6718e35 FunctionName: "storeEvidence" MemberName: "pcm" - Type: "2" \ No newline at end of file + Type: "2" + +JcsMiddleware: + Url: 101.201.215.196:7891 \ No newline at end of file diff --git a/go.sum b/go.sum index 38619a56..2e827dd1 100644 --- a/go.sum +++ b/go.sum @@ -528,14 +528,6 @@ gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20250107025835-8fc888b1d170 h1:/n3pl6WuH gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20250107025835-8fc888b1d170/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 h1:WIs/189lRLNMXF2ui/Wm1+Y55eJ53BVGx+4+gdn9cls= gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4/go.mod h1:YbuoRgF9sEVvNJPQtGRjdocX7Du6NBOTLn+GVwqRVjo= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250108072048-9adf0597b07c h1:9LphS29VNfoWT73eqhgwKV1nG8PcoDUNu7dRev845wA= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250108072048-9adf0597b07c/go.mod h1:V19vFg8dWRAbaskASoSj70dgpacswOqZu/SaI02dxac= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304023304-d556ce8161c7 h1:pv1WX3+ttqsHs7nr7+lfYNkvzUp1KIJQ0XzWbVetj6w= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304023304-d556ce8161c7/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304035519-da6ab53b969d h1:EfAxN4oaCVIRsnM3pnC7NskifFRjM/THBUiMGtQQzfg= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250304035519-da6ab53b969d/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306022112-4ed1f08d3170 h1:NsHFtWPpcL8nF0s4v0DHuHuPaPFgMO9xITQCMM7Du1E= -gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306022112-4ed1f08d3170/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207 h1:korhOkFl0x1wuQBKoKTsQHeFboDwLFRWwR2G9IPPfNg= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo= diff --git a/internal/config/config.go b/internal/config/config.go index 3719362a..76131a02 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,6 +51,8 @@ type Config struct { SnowflakeConf SnowflakeConf Monitoring Monitoring + + JcsMiddleware JcsMiddleware } type Monitoring struct { PromUrl string @@ -61,3 +63,7 @@ type Monitoring struct { type SnowflakeConf struct { MachineId int64 `json:"machineId"` } + +type JcsMiddleware struct { + Url string +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index c2fc2983..673ae0e9 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -29,7 +29,6 @@ import ( "gorm.io/gorm" "sigs.k8s.io/yaml" "strings" - "sync" ) type Scheduler struct { @@ -40,7 +39,6 @@ type Scheduler struct { result []string //pID:子任务yamlstring 键值对 AiStorages *database.AiStorage AiService *service.AiService - mu sync.RWMutex } type SubSchedule interface { diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index c280bea2..8174f528 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -26,6 +26,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy/param" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -256,6 +257,13 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass return err } + //report msg + report := &jcs.JobStatusReportReq{ + TaskName: "", + TaskID: strconv.FormatInt(taskId, 10), + Messages: make([]*jcs.ReportMessage, 0), + } + var errmsg string for _, err := range errs { e := (err).(struct { @@ -271,6 +279,15 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass if err != nil { return errors.New("database add failed: " + err.Error()) } + + //add report msg + jobMsg := &jcs.ReportMessage{ + Status: false, + Message: msg, + ClusterID: e.clusterId, + Output: "", + } + report.Messages = append(report.Messages, jobMsg) } for _, s := range results { as.option.ComputeCard = s.Card //execute card @@ -291,7 +308,21 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass return errors.New("database add failed: " + err.Error()) } } + //add report msg + jobMsg := &jcs.ReportMessage{ + Status: false, + Message: s.Msg, + ClusterID: s.ClusterId, + Output: "", + } + report.Messages = append(report.Messages, jobMsg) + } + + //report status + if mode == executor.SUBMIT_MODE_STORAGE_SCHEDULE { + _ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.Url, report) } + logx.Errorf(errors.New(errmsg).Error()) return errors.New(errmsg) } diff --git a/internal/scheduler/service/aiService.go b/internal/scheduler/service/aiService.go index f0be4a74..9d6064f7 100644 --- a/internal/scheduler/service/aiService.go +++ b/internal/scheduler/service/aiService.go @@ -29,6 +29,7 @@ type AiService struct { InferenceAdapterMap map[string]map[string]inference.ICluster Storage *database.AiStorage LocalCache map[string]interface{} + Conf *config.Config } func NewAiService(conf *config.Config, storages *database.AiStorage, localCache map[string]interface{}) (*AiService, error) { @@ -43,6 +44,7 @@ func NewAiService(conf *config.Config, storages *database.AiStorage, localCache InferenceAdapterMap: make(map[string]map[string]inference.ICluster), Storage: storages, LocalCache: localCache, + Conf: conf, } for _, id := range adapterIds { clusters, err := storages.GetClustersByAdapterId(id) diff --git a/internal/scheduler/service/utils/jcs/middleware.go b/internal/scheduler/service/utils/jcs/middleware.go new file mode 100644 index 00000000..0ff5c374 --- /dev/null +++ b/internal/scheduler/service/utils/jcs/middleware.go @@ -0,0 +1,38 @@ +package jcs + +import ( + "gitlink.org.cn/JointCloud/pcm-openi/common" +) + +type JobStatusReportReq struct { + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` + Messages []*ReportMessage `json:"messages"` +} +type ReportMessage struct { + Status bool `json:"status"` + Message string `json:"message"` + ClusterID string `json:"clusterID"` + Output string `json:"output"` +} + +func StatusReport(url string, report *JobStatusReportReq) error { + resp := struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data interface{} `json:"data"` + }{} + + req := common.GetRestyRequest(common.TIMEOUT) + _, err := req. + SetHeader("Content-Type", "application/json"). + SetBody(&report). + SetResult(&resp). + Post(url) + + if err != nil { + return err + } + + return nil +} diff --git a/internal/scheduler/service/utils/status/taskStatusSync.go b/internal/scheduler/service/utils/status/taskStatusSync.go index d33e82b4..e6c0f455 100644 --- a/internal/scheduler/service/utils/status/taskStatusSync.go +++ b/internal/scheduler/service/utils/status/taskStatusSync.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -61,6 +62,7 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { if len(aiTask) == 1 { if aiTask[0].Status == constants.Completed { task.Status = constants.Succeeded + _ = reportStatusMessages(svc, task, aiTask[0]) } else { task.Status = aiTask[0].Status } @@ -142,6 +144,26 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) { } } +func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error { + report := &jcs.JobStatusReportReq{ + TaskName: task.Name, + TaskID: strconv.FormatInt(task.Id, 10), + Messages: make([]*jcs.ReportMessage, 0), + } + //add report msg + jobMsg := &jcs.ReportMessage{ + Status: true, + Message: "", + ClusterID: strconv.FormatInt(aiTask.ClusterId, 10), + Output: aiTask.JobId, + } + report.Messages = append(report.Messages, jobMsg) + + _ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.Url, report) + + return nil +} + func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) { aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id) if err != nil { diff --git a/internal/types/types.go b/internal/types/types.go index cd14b4f3..e633fe24 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -5975,6 +5975,7 @@ type DatasetDistribute struct { type CodeDistribute struct { DataName string `json:"dataName,optional"` PackageID int64 `json:"packageID"` + Output string `json:"output"` Clusters []*ClusterScheduled `json:"clusters"` }