From 6000644d671c9e8dcdad2569c5189fca43d2f781 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 15 Jan 2025 16:17:48 +0800 Subject: [PATCH] added saveToChain func --- .../schedule/schedulecreatetaskhandler.go | 4 +++ .../handler/schedule/schedulesubmithandler.go | 4 +++ .../logic/schedule/schedulecreatetasklogic.go | 8 ++--- .../logic/schedule/schedulesubmitlogic.go | 2 +- internal/scheduler/database/aiStorage.go | 14 ++++++-- internal/scheduler/scheduler.go | 6 ++-- internal/scheduler/schedulers/aiScheduler.go | 2 +- .../imageInference/imageInference.go | 2 +- .../inference/textInference/textInference.go | 2 +- .../service/utils/blockchain/chain.go | 35 +++++++++++++++++++ 10 files changed, 67 insertions(+), 12 deletions(-) create mode 100644 internal/scheduler/service/utils/blockchain/chain.go diff --git a/internal/handler/schedule/schedulecreatetaskhandler.go b/internal/handler/schedule/schedulecreatetaskhandler.go index 71415b6a..e059e01b 100644 --- a/internal/handler/schedule/schedulecreatetaskhandler.go +++ b/internal/handler/schedule/schedulecreatetaskhandler.go @@ -18,6 +18,10 @@ func ScheduleCreateTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return } + // 获取token信息 + token := r.Header.Get("Authorization") + req.Token = token + l := schedule.NewScheduleCreateTaskLogic(r.Context(), svcCtx) resp, err := l.ScheduleCreateTask(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/handler/schedule/schedulesubmithandler.go b/internal/handler/schedule/schedulesubmithandler.go index 81e3acff..6054136f 100644 --- a/internal/handler/schedule/schedulesubmithandler.go +++ b/internal/handler/schedule/schedulesubmithandler.go @@ -17,6 +17,10 @@ func ScheduleSubmitHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return } + // 获取token信息 + token := r.Header.Get("Authorization") + req.Token = token + l := schedule.NewScheduleSubmitLogic(r.Context(), svcCtx) resp, err := l.ScheduleSubmit(&req) result.HttpResult(r, w, resp, err) diff --git a/internal/logic/schedule/schedulecreatetasklogic.go b/internal/logic/schedule/schedulecreatetasklogic.go index d07fbb57..4df2d910 100644 --- a/internal/logic/schedule/schedulecreatetasklogic.go +++ b/internal/logic/schedule/schedulecreatetasklogic.go @@ -61,7 +61,7 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) ( ClusterId: req.JobResources.Clusters[0].ClusterID, }}, req.JobResources.Clusters) - taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters) + taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters, req.Token) if err != nil { return nil, err } @@ -87,7 +87,7 @@ func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) ( if err != nil { return nil, err } - taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters) + taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters, req.Token) if err != nil { return nil, err } @@ -210,7 +210,7 @@ func copyParams(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobC return result } -func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, clusters []*strategy.AssignedCluster) (int64, error) { +func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, clusters []*strategy.AssignedCluster, token string) (int64, error) { var synergyStatus int64 if len(clusters) > 1 { synergyStatus = 1 @@ -221,7 +221,7 @@ func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName strin fmt.Printf("Error while Marshaling. %v", err) } - taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, synergyStatus, strategyName, string(y)) + taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, synergyStatus, strategyName, string(y), token, &l.svcCtx.Config) if err != nil { return 0, err } diff --git a/internal/logic/schedule/schedulesubmitlogic.go b/internal/logic/schedule/schedulesubmitlogic.go index 522781a6..0e527428 100644 --- a/internal/logic/schedule/schedulesubmitlogic.go +++ b/internal/logic/schedule/schedulesubmitlogic.go @@ -65,7 +65,7 @@ func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *type synergystatus = 1 } - taskId, err := l.svcCtx.Scheduler.CreateTask(req.AiOption.TaskName, synergystatus, req.AiOption.Strategy, "") + taskId, err := l.svcCtx.Scheduler.CreateTask(req.AiOption.TaskName, synergystatus, req.AiOption.Strategy, "", req.Token, &l.svcCtx.Config) if err != nil { return nil, err } diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index c560c162..eafefb6d 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -103,7 +103,7 @@ func (s *AiStorage) GetAiTaskListById(id int64) ([]*models.TaskAi, error) { return aiTaskList, nil } -func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64, aiType string, yaml string) (int64, error) { +func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64, aiType string, yaml string, saveToChain func(task models.Task, id int64) error) (int64, error) { startTime := time.Now() // 构建主任务结构体 taskModel := models.Task{ @@ -123,7 +123,17 @@ func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int6 if tx.Error != nil { return 0, tx.Error } - return taskModel.Id, nil + + id := taskModel.Id + + // 数据上链 + if saveToChain != nil { + err := saveToChain(taskModel, id) + if err != nil { + logx.Error(err) + } + } + return id, nil } func (s *AiStorage) UpdateTask(task *types.TaskModel) error { diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 5399db0d..48388e4e 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -18,10 +18,12 @@ import ( "encoding/json" "github.com/pkg/errors" "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/blockchain" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/response" "gorm.io/gorm" @@ -186,13 +188,13 @@ func (s *Scheduler) SaveToDb() error { return nil } -func (s *Scheduler) CreateTask(taskName string, synergyCode int64, strategyName string, yaml string) (int64, error) { +func (s *Scheduler) CreateTask(taskName string, synergyCode int64, strategyName string, yaml string, token string, config *config.Config) (int64, error) { strategyCode, err := s.AiStorages.GetStrategyCode(strategyName) if err != nil { return 0, err } - id, err := s.AiStorages.SaveTask(taskName, strategyCode, synergyCode, "10", yaml) + id, err := s.AiStorages.SaveTask(taskName, strategyCode, synergyCode, "10", yaml, blockchain.SaveToChain(token, config)) if err != nil { return 0, err } diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index 6ef1dcf5..6aea740a 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -229,7 +229,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster, mode int synergystatus = 1 } - taskId, err := as.CreateTask(as.option.TaskName, synergystatus, as.option.StrategyName, "") + taskId, err := as.CreateTask(as.option.TaskName, synergystatus, as.option.StrategyName, "", "", nil) if err != nil { return nil, err } diff --git a/internal/scheduler/service/inference/imageInference/imageInference.go b/internal/scheduler/service/inference/imageInference/imageInference.go index a9a80aef..9881d91f 100644 --- a/internal/scheduler/service/inference/imageInference/imageInference.go +++ b/internal/scheduler/service/inference/imageInference/imageInference.go @@ -126,7 +126,7 @@ func (i *ImageInference) saveTask() (int64, error) { return 0, err } - id, err := i.storage.SaveTask(i.opt.TaskName, strategyCode, synergystatus, i.inference.GetAiType(), "") + id, err := i.storage.SaveTask(i.opt.TaskName, strategyCode, synergystatus, i.inference.GetAiType(), "", nil) if err != nil { return 0, err } diff --git a/internal/scheduler/service/inference/textInference/textInference.go b/internal/scheduler/service/inference/textInference/textInference.go index 1b4ea6bb..aa462d86 100644 --- a/internal/scheduler/service/inference/textInference/textInference.go +++ b/internal/scheduler/service/inference/textInference/textInference.go @@ -70,7 +70,7 @@ func (ti *TextInference) saveTask() (int64, error) { var synergystatus int64 var strategyCode int64 - id, err := ti.storage.SaveTask(ti.opt.TaskName, strategyCode, synergystatus, ti.inference.GetAiType(), "") + id, err := ti.storage.SaveTask(ti.opt.TaskName, strategyCode, synergystatus, ti.inference.GetAiType(), "", nil) if err != nil { return 0, err } diff --git a/internal/scheduler/service/utils/blockchain/chain.go b/internal/scheduler/service/utils/blockchain/chain.go new file mode 100644 index 00000000..32b08ae3 --- /dev/null +++ b/internal/scheduler/service/utils/blockchain/chain.go @@ -0,0 +1,35 @@ +package blockchain + +import ( + "encoding/json" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil" + "strconv" +) + +func SaveToChain(token string, config *config.Config) func(task models.Task, id int64) error { + return func(task models.Task, id int64) error { + if token == "" { + return nil + } + // 数据上链 + bytes, err := json.Marshal(task) + if err != nil { + return err + } + err = remoteUtil.Evidence(remoteUtil.EvidenceParam{ + Url: config.BlockChain.Url, + ContractAddress: config.BlockChain.ContractAddress, + FunctionName: config.BlockChain.FunctionName, + Type: config.BlockChain.Type, + Token: token, + Args: []string{strconv.FormatInt(id, 10), string(bytes)}, + }) + if err != nil { + return err + } + + return nil + } +}