From 11ade9f6fd87d03439a2f7442df5af2342839a19 Mon Sep 17 00:00:00 2001 From: tzwang Date: Tue, 31 Dec 2024 17:38:59 +0800 Subject: [PATCH] updated openi --- internal/scheduler/schedulers/aiScheduler.go | 4 +- internal/scheduler/service/aiService.go | 2 +- internal/storeLink/openi.go | 73 ++++++++++++++++++-- 3 files changed, 71 insertions(+), 8 deletions(-) diff --git a/internal/scheduler/schedulers/aiScheduler.go b/internal/scheduler/schedulers/aiScheduler.go index 8e1bfff8..b39ba4e4 100644 --- a/internal/scheduler/schedulers/aiScheduler.go +++ b/internal/scheduler/schedulers/aiScheduler.go @@ -394,8 +394,8 @@ func convertType(in interface{}) (*AiResult, error) { } return &result, nil - case *model.CreateTask: - resp := (in).(*model.CreateTask) + case model.CreateTask: + resp := (in).(model.CreateTask) if resp.Code != 0 { result.Msg = resp.Msg diff --git a/internal/scheduler/service/aiService.go b/internal/scheduler/service/aiService.go index 014cc323..1a26c82a 100644 --- a/internal/scheduler/service/aiService.go +++ b/internal/scheduler/service/aiService.go @@ -91,7 +91,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st inferenceMap[c.Id] = sgai case OPENI: id, _ := strconv.ParseInt(c.Id, 10, 64) - openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token) + openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token) collectorMap[c.Id] = openi executorMap[c.Id] = openi inferenceMap[c.Id] = openi diff --git a/internal/storeLink/openi.go b/internal/storeLink/openi.go index 10dfe6fb..309efbd7 100644 --- a/internal/storeLink/openi.go +++ b/internal/storeLink/openi.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-openi/common" "gitlink.org.cn/JointCloud/pcm-openi/model" @@ -19,6 +20,7 @@ import ( "strings" "sync" "sync/atomic" + "time" ) const ( @@ -34,6 +36,7 @@ const ( TaskCreatelUrl = "/api/v1/task/create" ReposUrl = "/api/v1/user/repos" TaskListUrl = "/api/v1/task/list" + TaskDetailsUrl = "/api/v1/task/detail" ) // compute source @@ -214,9 +217,9 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs byt := bytes.NewBuffer(b) resp := struct { - Code int `json:"code"` - Msg string `json:"msg"` - Data *model.CreateTask `json:"data"` + Code int `json:"code"` + Msg string `json:"msg"` + Data model.CreateTask `json:"data"` }{} req := common.GetRestyRequest(common.TIMEOUT) @@ -286,7 +289,68 @@ func (o OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNu } func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { - return nil, errors.New("failed to implement") + taskDetailsUrl := o.host + TaskDetailsUrl + + param := model.TaskDetailParam{ + UserName: o.userName, + RepoName: TESTREPO, + Id: taskId, + } + + b, _ := json.Marshal(param) + byt := bytes.NewBuffer(b) + + resp := struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data model.TaskDetail `json:"data"` + }{} + + req := common.GetRestyRequest(common.TIMEOUT) + r, _ := http.NewRequest("GET", taskDetailsUrl, byt) + req.RawRequest = r + req.URL = taskDetailsUrl + + _, err := req. + SetHeader("Content-Type", "application/json"). + SetQueryParam(common.ACCESSTOKEN, o.accessToken). + SetBody(byt). + SetResult(&resp). + Send() + + if err != nil { + return nil, errors.New("failed to invoke taskDetails") + } + + if resp.Data.Code != 0 { + return nil, errors.New(resp.Msg) + } + + var task collector.Task + task.Id = strconv.Itoa(resp.Data.Data.Task.Id) + if resp.Data.Data.Task.StartTime != 0 { + task.Start = time.Unix(int64(resp.Data.Data.Task.StartTime), 0).Format(constants.Layout) + } + if resp.Data.Data.Task.EndTime != 0 { + task.End = time.Unix(int64(resp.Data.Data.Task.EndTime), 0).Format(constants.Layout) + } + + switch resp.Data.Data.Task.Status { + case "SUCCEEDED": + task.Status = constants.Completed + case "FAILED": + task.Status = constants.Failed + case "RUNNING": + task.Status = constants.Running + case "STOPPED": + task.Status = constants.Stopped + case "PENDING": + task.Status = constants.Pending + default: + task.Status = "undefined" + } + + return &task, nil } func (o OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) { @@ -313,7 +377,6 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e creationRequirelUrl := o.host + CreationRequirelUrl reposUrl := o.host + ReposUrl taskListUrl := o.host + TaskListUrl - //taskDetailsUrl := o.host + "/api/v1/task/detail" var wg sync.WaitGroup var ch = make(chan *collector.Usage)