Browse Source

updated openi

pull/379/head
tzwang 10 months ago
parent
commit
11ade9f6fd
3 changed files with 71 additions and 8 deletions
  1. +2
    -2
      internal/scheduler/schedulers/aiScheduler.go
  2. +1
    -1
      internal/scheduler/service/aiService.go
  3. +68
    -5
      internal/storeLink/openi.go

+ 2
- 2
internal/scheduler/schedulers/aiScheduler.go View File

@@ -394,8 +394,8 @@ func convertType(in interface{}) (*AiResult, error) {
} }


return &result, nil return &result, nil
case *model.CreateTask:
resp := (in).(*model.CreateTask)
case model.CreateTask:
resp := (in).(model.CreateTask)


if resp.Code != 0 { if resp.Code != 0 {
result.Msg = resp.Msg result.Msg = resp.Msg


+ 1
- 1
internal/scheduler/service/aiService.go View File

@@ -91,7 +91,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
inferenceMap[c.Id] = sgai inferenceMap[c.Id] = sgai
case OPENI: case OPENI:
id, _ := strconv.ParseInt(c.Id, 10, 64) id, _ := strconv.ParseInt(c.Id, 10, 64)
openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token)
openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token)
collectorMap[c.Id] = openi collectorMap[c.Id] = openi
executorMap[c.Id] = openi executorMap[c.Id] = openi
inferenceMap[c.Id] = openi inferenceMap[c.Id] = openi


+ 68
- 5
internal/storeLink/openi.go View File

@@ -10,6 +10,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-openi/common" "gitlink.org.cn/JointCloud/pcm-openi/common"
"gitlink.org.cn/JointCloud/pcm-openi/model" "gitlink.org.cn/JointCloud/pcm-openi/model"
@@ -19,6 +20,7 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
) )


const ( const (
@@ -34,6 +36,7 @@ const (
TaskCreatelUrl = "/api/v1/task/create" TaskCreatelUrl = "/api/v1/task/create"
ReposUrl = "/api/v1/user/repos" ReposUrl = "/api/v1/user/repos"
TaskListUrl = "/api/v1/task/list" TaskListUrl = "/api/v1/task/list"
TaskDetailsUrl = "/api/v1/task/detail"
) )


// compute source // compute source
@@ -214,9 +217,9 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs
byt := bytes.NewBuffer(b) byt := bytes.NewBuffer(b)


resp := struct { resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data *model.CreateTask `json:"data"`
Code int `json:"code"`
Msg string `json:"msg"`
Data model.CreateTask `json:"data"`
}{} }{}


req := common.GetRestyRequest(common.TIMEOUT) req := common.GetRestyRequest(common.TIMEOUT)
@@ -286,7 +289,68 @@ func (o OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNu
} }


func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) { func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
return nil, errors.New("failed to implement")
taskDetailsUrl := o.host + TaskDetailsUrl

param := model.TaskDetailParam{
UserName: o.userName,
RepoName: TESTREPO,
Id: taskId,
}

b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)

resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskDetail `json:"data"`
}{}

req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
req.RawRequest = r
req.URL = taskDetailsUrl

_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()

if err != nil {
return nil, errors.New("failed to invoke taskDetails")
}

if resp.Data.Code != 0 {
return nil, errors.New(resp.Msg)
}

var task collector.Task
task.Id = strconv.Itoa(resp.Data.Data.Task.Id)
if resp.Data.Data.Task.StartTime != 0 {
task.Start = time.Unix(int64(resp.Data.Data.Task.StartTime), 0).Format(constants.Layout)
}
if resp.Data.Data.Task.EndTime != 0 {
task.End = time.Unix(int64(resp.Data.Data.Task.EndTime), 0).Format(constants.Layout)
}

switch resp.Data.Data.Task.Status {
case "SUCCEEDED":
task.Status = constants.Completed
case "FAILED":
task.Status = constants.Failed
case "RUNNING":
task.Status = constants.Running
case "STOPPED":
task.Status = constants.Stopped
case "PENDING":
task.Status = constants.Pending
default:
task.Status = "undefined"
}

return &task, nil
} }


func (o OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) { func (o OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
@@ -313,7 +377,6 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e
creationRequirelUrl := o.host + CreationRequirelUrl creationRequirelUrl := o.host + CreationRequirelUrl
reposUrl := o.host + ReposUrl reposUrl := o.host + ReposUrl
taskListUrl := o.host + TaskListUrl taskListUrl := o.host + TaskListUrl
//taskDetailsUrl := o.host + "/api/v1/task/detail"


var wg sync.WaitGroup var wg sync.WaitGroup
var ch = make(chan *collector.Usage) var ch = make(chan *collector.Usage)


Loading…
Cancel
Save