|
- package storeLink
-
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "github.com/json-iterator/go"
- "github.com/rs/zerolog/log"
- openIcom "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
- "gitlink.org.cn/JointCloud/pcm-openi/common"
- "gitlink.org.cn/JointCloud/pcm-openi/model"
- "mime/multipart"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
-
// Job types and fixed identifiers sent to the OpenI API.
const (
	// Job types accepted in the JobType field of task requests.
	TRAIN           = "TRAIN"
	DEBUG           = "DEBUG"
	INFERENCE       = "INFERENCE"
	ONLINEINFERENCE = "ONLINEINFERENCE" // online inference

	// C2NET is the cluster type sent with task requests.
	C2NET = "C2Net"
	// TESTREPO is the repository name used for requests that are not bound
	// to a task-specific repository.
	TESTREPO = "testrepo"
)
-
// REST endpoint paths of the OpenI API; each one is appended to the
// configured host to build the request URL.
const (
	// Task lifecycle endpoints.
	CreationRequirelUrl = "/api/v1/task/creationRequired"
	TaskCreatelUrl      = "/api/v1/task/create"
	TaskStopUrl         = "/api/v1/task/stop"

	// Task inspection endpoints.
	TaskListUrl        = "/api/v1/task/list"
	TaskDetailsUrl     = "/api/v1/task/detail"
	TaskLogUrl         = "/api/v1/task/log"
	TaskOnlineInferUrl = "/api/v1/task/onlineInferUrl"

	// ReposUrl lists the repositories of the user.
	ReposUrl = "/api/v1/user/repos"
)
-
// ComputeSource lists the compute resource types that are recognized when a
// requested resource is matched and when resource specs are collected.
var ComputeSource = []string{
	"GPU",
	"NPU",
	"GCU",
	"MLU",
	"DCU",
	"CPU",
	"ILUVATAR-GPGPU",
	"METAX-GPGPU",
}
-
// ResourceSpecOpenI describes a requested accelerator: its compute resource
// type, the card name and the number of cards.
type ResourceSpecOpenI struct {
	ResType, Name string // compute resource type and card name
	Number        int64  // number of cards requested
}
-
// OpenI is the adapter for one OpenI platform instance. It holds the
// participant id, the platform name, the API host and the credentials
// (user name and access token) used for every request.
type OpenI struct {
	participantId                         int64
	platform, host, userName, accessToken string
}
-
- func NewOpenI(host string, id int64, name string, token string, platform string) *OpenI {
- return &OpenI{
- host: host,
- participantId: id,
- userName: name,
- accessToken: token,
- platform: platform,
- }
- }
-
- func (o *OpenI) Execute(ctx context.Context, option *option.AiOption, mode int) (interface{}, error) {
- switch mode {
- case executor.SUBMIT_MODE_JOINT_CLOUD:
-
- case executor.SUBMIT_MODE_STORAGE_SCHEDULE:
-
- var repoName string
-
- codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
- if len(codePaths) != 3 {
- return nil, fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
- }
- repoName = codePaths[0]
-
- spec := &ResourceSpecOpenI{}
- for _, res := range option.ResourcesRequired {
- typeName, ok := res["type"]
- if !ok {
- continue
- }
- name, ok := res["name"]
- if !ok {
- continue
- }
- for _, s := range ComputeSource {
- switch typeName {
- case s:
- num, ok := res["number"]
- if !ok {
- continue
- }
- n := openIcom.ConvertTypeToString(num)
- val, err := strconv.ParseInt(n, 10, 64)
- if err != nil {
- return nil, err
- }
- spec.ResType = s
- spec.Name = openIcom.ConvertTypeToString(name)
- spec.Number = val
- break
- }
- }
- }
-
- if spec.ResType == "" || spec.Name == "" {
- return nil, errors.New("resource spec not found")
- }
-
- creationRequirelUrl := o.host + CreationRequirelUrl
-
- param := model.TaskCreationRequiredParam{
- UserName: o.userName,
- RepoName: repoName,
- JobType: TRAIN,
- ComputeSource: spec.ResType,
- ClusterType: C2NET,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskCreationRequired `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
- req.RawRequest = r
- req.URL = creationRequirelUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return nil, errors.New("failed to invoke TaskCreationRequired; " + err.Error())
- }
-
- if len(resp.Data.Data.Specs.All) == 0 {
- return nil, errors.New("TaskCreationRequired specs are empty")
- }
-
- for _, s := range resp.Data.Data.Specs.All {
- if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
- if int(spec.Number) == s.AccCardsNum {
- option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
- break
- }
- }
- }
-
- if option.ResourceId == "" {
- return nil, errors.New("can not find spec Id")
- }
-
- option.ComputeCard = spec.Name
- }
-
- task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
- if err != nil {
- return nil, err
- }
- return task, nil
- }
-
- func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
- taskCreatelUrl := o.host + TaskCreatelUrl
- var repoName string
- var branchName string
- var bootFile string
- codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
- if len(codePaths) != 3 {
- return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
- }
-
- specs := strings.Split(resourceId, FORWARD_SLASH)
- specId, err := strconv.ParseInt(specs[0], 10, 0)
- if err != nil {
- return nil, err
- }
-
- computeSource := specs[1]
-
- repoName = codePaths[0]
- branchName = codePaths[1]
- bootFile = codePaths[2]
-
- // params
- var parameters struct {
- Parameter []struct {
- Label string `json:"label"`
- Value string `json:"value"`
- } `json:"parameter"`
- }
-
- // add default param
- current_id := strconv.Itoa(int(o.participantId))
- current_platform := CURRENT_PLATFORM + COMMA + current_id
- params = append(params, current_platform)
-
- for _, param := range params {
- s := strings.Split(param, COMMA)
- st := struct {
- Label string `json:"label"`
- Value string `json:"value"`
- }{
- Label: s[0],
- Value: s[1],
- }
- parameters.Parameter = append(parameters.Parameter, st)
- }
-
- paramStr, _ := json.Marshal(parameters)
-
- // choose imageId and imageUrl
- imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
- if err != nil {
- return nil, err
- }
-
- taskParam := &model.CreateTaskParam{
- Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
- JobType: TRAIN,
- Cluster: C2NET,
- DisplayJobName: strings.ToLower(TRAIN + UNDERSCORE + utils.RandomString(10)),
- ComputeSource: computeSource,
- SpecId: int(specId),
- BranchName: branchName,
- ImageId: imgId,
- ImageUrl: imgUrl,
- DatasetUuidStr: datasetsId,
- Params: string(paramStr),
- BootFile: bootFile,
- HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
- WorkServerNumber: 1, // 运行节点数
- }
- param := model.CreateTaskReq{
- UserName: o.userName,
- RepoName: repoName,
- CreateTaskParam: taskParam,
- }
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.CreateTask `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- _, err = req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(¶m).
- SetResult(&resp).
- Post(taskCreatelUrl)
-
- if err != nil {
- return nil, err
- }
-
- if resp.Code != 200 {
- return nil, errors.New(resp.Msg)
- }
-
- if resp.Data.Code != 0 {
- return nil, errors.New(resp.Msg)
- }
-
- if (resp.Data == model.CreateTask{}) {
- return nil, errors.New("failed to submit task, empty response")
- }
-
- return resp.Data, nil
- }
-
- func swapImageIdAndImageUrl(imageId string) (string, string, error) {
- if imageId == "" {
- return "", "", errors.New("imageId is empty")
- }
- var imgId string
- var imgUrl string
-
- parsedURL, err := url.Parse("http://" + imageId)
- if err != nil {
- return "", "", err
- }
-
- if utils.IsValidHostAddress(parsedURL.Host) {
- imgId = ""
- imgUrl = imageId
- } else {
- imgId = imageId
- imgUrl = ""
- }
-
- return imgId, imgUrl, nil
- }
-
- func (o *OpenI) Stop(ctx context.Context, id string) error {
- task, err := o.getTrainingTask(ctx, id)
- if err != nil {
- return err
- }
-
- codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
- if len(codePaths) != 3 {
- return errors.New("failed to stop, openI desc not set")
- }
-
- repoName := codePaths[0]
-
- taskStopUrl := o.host + TaskStopUrl
-
- param := model.StopTaskParam{
- UserName: o.userName,
- RepoName: repoName,
- Id: id,
- }
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.StopTask `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- _, err = req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(¶m).
- SetResult(&resp).
- Post(taskStopUrl)
-
- if err != nil {
- return err
- }
-
- if resp.Code != http.StatusOK {
- return errors.New("failed to stop")
- }
-
- return nil
- }
-
- func (o *OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o *OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o *OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
- return false
- }
-
- func (o *OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
- err := o.Stop(ctx, id)
- if err != nil {
- return false
- }
- return true
- }
-
- func (o *OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
- task, err := o.getTrainingTask(ctx, id)
- if err != nil {
- return nil, err
- }
- description := task.Data.Task.Description
-
- //从描述中解析出repoName
- codePaths := strings.SplitN(description, FORWARD_SLASH, 3)
- if len(codePaths) != 3 {
- return nil, fmt.Errorf("algorithmId %s format is incorrect", description)
- }
-
- repoName := codePaths[0]
- var resp inference.DeployInstance
- resp.InstanceId = id
- resp.InstanceName = task.Data.Task.DisplayJobName
- resp.ModelName = task.Data.Task.PretrainModelName
- resp.ModelType = ""
- resp.InferCard = task.Data.Task.Spec.ComputeResource + "_" + task.Data.Task.Spec.AccCardType
- resp.ClusterName = o.platform
- resp.ClusterType = TYPE_OPENI
- //获取在线推理url
- var inferUrl string
- if task.Data.Task.Status == "RUNNING" {
- inferUrl, err = o.getOnlineInferUrl(ctx, id, repoName)
- if err != nil {
- return nil, err
- }
- }
-
- resp.InferUrl = inferUrl
- resp.Status = task.Data.Task.Status
- resp.CreatedTime = time.Unix(int64(task.Data.Task.CreatedUnix), 0).Format(constants.Layout)
- log.Debug().Msgf("func GetInferDeployInstance, resp: %v", resp)
- return &resp, nil
- }
-
- func (o *OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
- var repoName string
-
- codePaths := strings.SplitN(option.AlgorithmId, FORWARD_SLASH, 3)
- if len(codePaths) != 3 {
- return "", fmt.Errorf("algorithmId %s format is incorrect", option.AlgorithmId)
- }
-
- repoName = codePaths[0]
-
- spec := &ResourceSpecOpenI{}
- for _, res := range option.ResourcesRequired {
- typeName, ok := res["type"]
- if !ok {
- continue
- }
- name, ok := res["name"]
- if !ok {
- continue
- }
- for _, s := range ComputeSource {
- switch typeName {
- case s:
- num, ok := res["number"]
- if !ok {
- continue
- }
- n := openIcom.ConvertTypeToString(num)
- val, err := strconv.ParseInt(n, 10, 64)
- if err != nil {
- return "", err
- }
- spec.ResType = s
- spec.Name = openIcom.ConvertTypeToString(name)
- spec.Number = val
- break
- }
- }
- }
-
- if spec.ResType == "" || spec.Name == "" {
- return "", errors.New("resource spec not found")
- }
-
- creationRequirelUrl := o.host + CreationRequirelUrl
-
- param := model.TaskCreationRequiredParam{
- UserName: o.userName,
- RepoName: repoName,
- JobType: ONLINEINFERENCE,
- ComputeSource: spec.ResType,
- ClusterType: C2NET,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskCreationRequired `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
- req.RawRequest = r
- req.URL = creationRequirelUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return "", errors.New("failed to invoke TaskCreationRequired")
- }
-
- if len(resp.Data.Data.Specs.All) == 0 {
- return "", errors.New("TaskCreationRequired specs are empty")
- }
-
- for _, s := range resp.Data.Data.Specs.All {
- if spec.ResType == s.ComputeResource && spec.Name == s.AccCardType {
- if int(spec.Number) == s.AccCardsNum {
- option.ResourceId = strconv.Itoa(s.Id) + FORWARD_SLASH + spec.ResType
- break
- }
- }
- }
-
- if option.ResourceId == "" {
- return "", errors.New("can not find spec Id")
- }
-
- option.ComputeCard = spec.Name
- task, err := o.SubmitInferTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.AlgorithmId, option.ModelID)
- if err != nil {
- return "", err
- }
- ma, err := jsoniter.Marshal(task)
- if err != nil {
- return "", err
- }
- taskId := jsoniter.Get(ma, "data").Get("id").ToString()
- return taskId, nil
- }
-
- func (o *OpenI) SubmitInferTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, algorithmId string, modelId string) (interface{}, error) {
- taskCreatelUrl := o.host + TaskCreatelUrl
- var repoName string
- var branchName string
- var bootFile string
-
- //从描述中解析出repoName
- codePaths := strings.SplitN(algorithmId, FORWARD_SLASH, 3)
- if len(codePaths) != 3 {
- return nil, fmt.Errorf("algorithmId %s format is incorrect", algorithmId)
- }
-
- specs := strings.Split(resourceId, FORWARD_SLASH)
- specId, err := strconv.ParseInt(specs[0], 10, 0)
- if err != nil {
- return nil, err
- }
-
- computeSource := specs[1]
-
- repoName = codePaths[0]
- branchName = codePaths[1]
- bootFile = strings.Join(codePaths[2:], "/")
- log.Printf("repoName: %s, branchName: %s, bootFile: %s", repoName, branchName, bootFile)
- //params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}"
-
- // choose imageId and imageUrl
- imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
- if err != nil {
- return nil, err
- }
-
- taskParam := &model.CreateTaskParam{
- Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
- JobType: ONLINEINFERENCE,
- Cluster: C2NET,
- DisplayJobName: strings.ToLower(ONLINEINFERENCE + UNDERSCORE + utils.RandomString(10)),
- ComputeSource: computeSource,
- SpecId: int(specId),
- BranchName: branchName,
- ImageId: imgId,
- ImageUrl: imgUrl,
- PretrainModelIdStr: modelId,
- BootFile: bootFile,
- HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
- WorkServerNumber: 1, // 运行节点数
- }
- param := model.CreateTaskReq{
- UserName: o.userName,
- RepoName: repoName,
- CreateTaskParam: taskParam,
- }
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.CreateTask `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- _, err = req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(¶m).
- SetResult(&resp).
- Post(taskCreatelUrl)
-
- if err != nil {
- return nil, err
- }
-
- if resp.Code != http.StatusOK {
- return nil, errors.New(resp.Msg)
- }
-
- if resp.Data.Data.Id == 0 {
- return nil, fmt.Errorf("failed to submit task, msg: [%s]", resp.Data.Msg)
- }
-
- return resp.Data, nil
- }
-
- func (o *OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
- return false
- }
-
- func (o *OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
- return "", errors.New("failed to implement")
- }
-
- func (o *OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o *OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o *OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o *OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
- task, err := o.getTrainingTask(ctx, taskId)
- if err != nil {
- return "", err
- }
-
- codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
- if len(codePaths) != 3 {
- return "", errors.New("failed to get log, openI desc not set")
- }
-
- repoName := codePaths[0]
-
- tasklogurl := o.host + TaskLogUrl
- param := model.GetLogParam{
- UserName: o.userName,
- RepoName: repoName,
- Id: taskId,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data string `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", tasklogurl, byt)
- req.RawRequest = r
- req.URL = tasklogurl
-
- _, err = req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return "", err
- }
-
- if resp.Data == "" {
- return "waiting for logs", nil
- }
-
- return resp.Data, nil
- }
-
- func (o *OpenI) getTrainingTask(ctx context.Context, taskId string) (*model.TaskDetail, error) {
- taskDetailsUrl := o.host + TaskDetailsUrl
-
- param := model.TaskDetailParam{
- UserName: o.userName,
- RepoName: TESTREPO,
- Id: taskId,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskDetail `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
- req.RawRequest = r
- req.URL = taskDetailsUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return nil, errors.New("failed to invoke taskDetails")
- }
-
- if resp.Data.Code != 0 && resp.Data.Msg != "" {
- return nil, errors.New(resp.Data.Msg)
- }
-
- return &resp.Data, nil
- }
-
- func (o *OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
- task, err := o.getTrainingTask(ctx, taskId)
- if err != nil {
- return nil, err
- }
-
- var resp collector.Task
- resp.Id = strconv.Itoa(task.Data.Task.Id)
- if task.Data.Task.StartTime != 0 {
- resp.Start = time.Unix(int64(task.Data.Task.StartTime), 0).Format(constants.Layout)
- }
- if task.Data.Task.EndTime != 0 {
- resp.End = time.Unix(int64(task.Data.Task.EndTime), 0).Format(constants.Layout)
- }
-
- switch task.Data.Task.Status {
- case "SUCCEEDED":
- resp.Status = constants.Completed
- case "FAILED":
- resp.Status = constants.Failed
- case "CREATED_FAILED":
- resp.Status = constants.Failed
- case "RUNNING":
- resp.Status = constants.Running
- case "STOPPED":
- resp.Status = constants.Stopped
- case "PENDING":
- resp.Status = constants.Pending
- case "WAITING":
- resp.Status = constants.Waiting
- default:
- resp.Status = "undefined"
- }
-
- return &resp, nil
- }
-
- func (o *OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
- return "", errors.New("failed to implement")
- }
-
- func (o *OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
- return errors.New("failed to implement")
- }
-
- func (o *OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o *OpenI) GetUserBalance(ctx context.Context) (float64, error) {
- return 0, errors.New("failed to implement")
- }
-
- func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
- var jobType string
- if resrcType == "Inference" {
- jobType = ONLINEINFERENCE
- } else if resrcType == "Train" {
- jobType = TRAIN
- }
-
- var resources []interface{}
- res := &collector.ResourceSpec{
- ClusterId: strconv.FormatInt(o.participantId, 10),
- Tag: resrcType,
- }
- //clres := &collector.ClusterResource{}
- creationRequirelUrl := o.host + CreationRequirelUrl
- reposUrl := o.host + ReposUrl
- taskListUrl := o.host + TaskListUrl
-
- var wg sync.WaitGroup
- var ch = make(chan *collector.ClusterResource)
- var once sync.Once
-
- wg.Add(2)
- go o.genComputeResources(&wg, ch, &once, jobType, creationRequirelUrl)
- go o.genRunningTaskNum(&wg, ch, reposUrl, taskListUrl)
-
- go func() {
- wg.Wait()
- close(ch)
- }()
-
- for v := range ch {
- resources = append(resources, v)
- }
-
- res.Resources = resources
-
- return res, nil
- }
-
- func (o *OpenI) genComputeResources(wg *sync.WaitGroup, ch chan *collector.ClusterResource, once *sync.Once, jobType string, creationRequirelUrl string) {
- defer wg.Done()
-
- for c := range ComputeSource {
- wg.Add(1)
- i := c
- go func() {
- defer wg.Done()
- param := model.TaskCreationRequiredParam{
- UserName: o.userName,
- RepoName: TESTREPO,
- JobType: jobType,
- ComputeSource: ComputeSource[i],
- ClusterType: C2NET,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskCreationRequired `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
- req.RawRequest = r
- req.URL = creationRequirelUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return
- }
-
- if len(resp.Data.Data.Specs.All) == 0 {
- return
- }
-
- // balance
- var balanceCheck = func() {
- balance := resp.Data.Data.PointAccount.Balance
- bal := &collector.Usage{}
- bal.Type = strings.ToUpper(BALANCE)
- bal.Total = &collector.UnitValue{
- Unit: POINT,
- Value: balance,
- }
-
- ch <- &collector.ClusterResource{Resource: bal}
-
- //rate
- var v float64
- v = 1
- rate := &collector.Usage{
- Type: strings.ToUpper(RATE),
- Total: &collector.UnitValue{Unit: PERHOUR, Value: v},
- }
-
- ch <- &collector.ClusterResource{Resource: rate}
- }
- once.Do(balanceCheck)
-
- m := make(map[string]struct {
- Id int `json:"id"`
- AccCardsNum int `json:"acc_cards_num"`
- AccCardType string `json:"acc_card_type"`
- CpuCores int `json:"cpu_cores"`
- MemGiB int `json:"mem_gi_b"`
- GpuMemGiB int `json:"gpu_mem_gi_b"`
- ShareMemGiB int `json:"share_mem_gi_b"`
- ComputeResource string `json:"compute_resource"`
- UnitPrice int `json:"unit_price"`
- SourceSpecId string `json:"source_spec_id"`
- HasInternet int `json:"has_internet"`
- EnableVisualization bool `json:"enable_visualization"`
- })
-
- for _, s := range resp.Data.Data.Specs.All {
- e, ok := m[s.AccCardType]
- if ok {
- if s.AccCardsNum > e.AccCardsNum {
- m[s.AccCardType] = s
- }
- } else {
- m[s.AccCardType] = s
- }
- }
-
- for k, v := range m {
- bres := make([]*collector.Usage, 0)
- cres := &collector.ClusterResource{}
- card := &collector.Usage{
- Type: ComputeSource[i],
- Name: strings.ToUpper(k),
- Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
- Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
- }
- cpu := &collector.Usage{
- Type: strings.ToUpper(CPU),
- Name: strings.ToUpper(CPU),
- Total: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
- Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
- }
- mem := &collector.Usage{
- Type: strings.ToUpper(MEMORY),
- Name: strings.ToUpper(RAM),
- Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
- Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
- }
- vmem := &collector.Usage{
- Type: strings.ToUpper(MEMORY),
- Name: strings.ToUpper(VRAM),
- Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
- Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
- }
-
- //storage
- var s float64
- s = 1024
- storage := &collector.Usage{}
- storage.Type = STORAGE
- storage.Name = DISK
- storage.Total = &collector.UnitValue{
- Unit: GIGABYTE,
- Value: s,
- }
- storage.Available = &collector.UnitValue{
- Unit: GIGABYTE,
- Value: s,
- }
-
- bres = append(bres, storage)
- bres = append(bres, cpu)
- bres = append(bres, mem)
- bres = append(bres, vmem)
-
- cres.Resource = card
- cres.BaseResources = bres
-
- ch <- cres
- }
- }()
- }
- }
-
- func (o *OpenI) genRunningTaskNum(wg *sync.WaitGroup, ch chan *collector.ClusterResource, reposUrl string, taskListUrl string) {
- defer wg.Done()
- reporesp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data []model.Repo `json:"data"`
- }{}
-
- reporeq := common.GetRestyRequest(common.TIMEOUT)
- repor, _ := http.NewRequest("GET", reposUrl, nil)
- reporeq.RawRequest = repor
- reporeq.URL = reposUrl
-
- _, err := reporeq.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetResult(&reporesp).
- Send()
-
- if err != nil {
- return
- }
-
- if len(reporesp.Data) == 0 {
- return
- }
-
- // tasklist
- var runningJobs atomic.Int64
- var jwg sync.WaitGroup
- var errs []error
- var ech = make(chan error)
- jwg.Add(1)
- go func() {
- defer jwg.Done()
- for _, datum := range reporesp.Data {
- jwg.Add(1)
- dat := datum
- go func() {
- defer jwg.Done()
- param := model.TaskListParam{
- UserName: o.userName,
- RepoName: dat.Name,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskList `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", taskListUrl, byt)
- req.RawRequest = r
- req.URL = taskListUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- // assume occupied running tasks
- ech <- err
- return
- }
-
- if len(resp.Data.Data.Tasks) == 0 {
- return
- }
-
- for _, task := range resp.Data.Data.Tasks {
- if task.Task.Status == RUNNING {
- runningJobs.Add(1)
- }
- }
- }()
- }
- }()
-
- go func() {
- jwg.Wait()
- close(ech)
- }()
-
- for v := range ech {
- errs = append(errs, v)
- }
-
- // running tasks num
- var runningNum int64
- runningNum = runningJobs.Load()
- run := &collector.Usage{}
- run.Type = strings.ToUpper(RUNNINGTASK)
- if len(errs) == 0 {
- run.Total = &collector.UnitValue{
- Unit: NUMBER,
- Value: runningNum,
- }
-
- ch <- &collector.ClusterResource{Resource: run}
- } else {
- runningNum = int64(len(errs)) * 4
- run.Total = &collector.UnitValue{
- Unit: NUMBER,
- Value: runningNum,
- }
-
- ch <- &collector.ClusterResource{Resource: run}
- }
- }
-
- func (o *OpenI) getOnlineInferUrl(ctx context.Context, taskId string, repoName string) (string, error) {
- taskDetailsUrl := o.host + TaskOnlineInferUrl
-
- param := model.TaskDetailParam{
- UserName: o.userName,
- RepoName: repoName,
- Id: taskId,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := model.SelfEndpointUrlResp{}
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", taskDetailsUrl, byt)
- req.RawRequest = r
- req.URL = taskDetailsUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return "", err
- }
-
- if resp.Code != http.StatusOK {
- return "", errors.New(resp.Msg)
- }
-
- return resp.Data.Url, nil
- }
|