|
- package storeLink
-
- import (
- "bytes"
- "context"
- "encoding/json"
- "errors"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
- "gitlink.org.cn/JointCloud/pcm-openi/common"
- "gitlink.org.cn/JointCloud/pcm-openi/model"
- "mime/multipart"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- )
-
// Job type identifiers. GetResourceSpecs sends TRAIN as the JobType of its
// creationRequired request; DEBUG and INFERENCE are presumably the other job
// types accepted by OpenI (TODO confirm against the OpenI API).
const (
	DEBUG     = "DEBUG"
	TRAIN     = "TRAIN"
	INFERENCE = "INFERENCE"
)

// Fixed request parameters used by GetResourceSpecs: C2NET is sent as the
// ClusterType and TESTREPO as the RepoName of the creationRequired request.
const (
	C2NET    = "C2Net"
	TESTREPO = "testrepo"
)
-
// ComputeSource lists the compute source identifiers that GetResourceSpecs
// probes; one creationRequired request is issued per entry.
var ComputeSource = []string{
	"GPU",
	"NPU",
	"GCU",
	"MLU",
	"DCU",
	"CPU",
	"ILUVATAR-GPGPU",
	"METAX-GPGPU",
}
-
// OpenI holds the connection settings of one OpenI participant:
// participantId is rendered as the ClusterId of returned resource specs,
// host is the base URL that API paths are appended to, userName is sent as
// the UserName of API requests, and accessToken is sent as the access-token
// query parameter.
type OpenI struct {
	participantId               int64
	host, userName, accessToken string
}
-
- func NewOpenI(host string, id int64, name string, token string) *OpenI {
- return &OpenI{
- host: host,
- participantId: id,
- userName: name,
- accessToken: token,
- }
- }
-
- func (o OpenI) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
- return false
- }
-
- func (o OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
- return false
- }
-
- func (o OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
- return "", errors.New("failed to implement")
- }
-
- func (o OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
- return false
- }
-
- func (o OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
- return "", errors.New("failed to implement")
- }
-
- func (o OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
- return "", errors.New("failed to implement")
- }
-
- func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
- return "", errors.New("failed to implement")
- }
-
- func (o OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
- return errors.New("failed to implement")
- }
-
- func (o OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
- return nil, errors.New("failed to implement")
- }
-
- func (o OpenI) GetUserBalance(ctx context.Context) (float64, error) {
- return 0, errors.New("failed to implement")
- }
-
- func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
- var resources []interface{}
- res := &collector.ResourceSpec{
- ClusterId: strconv.FormatInt(o.participantId, 10),
- }
- creationRequirelUrl := o.host + "/api/v1/task/creationRequired"
- reposUrl := o.host + "/api/v1/user/repos"
- taskListUrl := o.host + "/api/v1/task/list"
- //taskDetailsUrl := o.host + "/api/v1/task/detail"
-
- var wg sync.WaitGroup
- var ch = make(chan *collector.Usage)
- var once sync.Once
- wg.Add(2)
-
- go func() {
- defer wg.Done()
- for c := range ComputeSource {
- wg.Add(1)
- i := c
- go func() {
- defer wg.Done()
- param := model.TaskCreationRequiredParam{
- UserName: o.userName,
- RepoName: TESTREPO,
- JobType: TRAIN,
- ComputeSource: ComputeSource[i],
- ClusterType: C2NET,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskCreationRequired `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
- req.RawRequest = r
- req.URL = creationRequirelUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return
- }
-
- if len(resp.Data.Data.Specs.All) == 0 {
- return
- }
-
- // balance
- var balanceCheck = func() {
- balance := resp.Data.Data.PointAccount.Balance
- bal := &collector.Usage{}
- bal.Type = strings.ToUpper(BALANCE)
- bal.Total = &collector.UnitValue{
- Unit: POINT,
- Value: balance,
- }
-
- ch <- bal
- }
- once.Do(balanceCheck)
-
- m := make(map[string]struct {
- Id int `json:"id"`
- AccCardsNum int `json:"acc_cards_num"`
- AccCardType string `json:"acc_card_type"`
- CpuCores int `json:"cpu_cores"`
- MemGiB int `json:"mem_gi_b"`
- GpuMemGiB int `json:"gpu_mem_gi_b"`
- ShareMemGiB int `json:"share_mem_gi_b"`
- ComputeResource string `json:"compute_resource"`
- UnitPrice int `json:"unit_price"`
- SourceSpecId string `json:"source_spec_id"`
- HasInternet int `json:"has_internet"`
- EnableVisualization bool `json:"enable_visualization"`
- })
-
- for _, s := range resp.Data.Data.Specs.All {
- e, ok := m[s.AccCardType]
- if ok {
- if s.AccCardsNum > e.AccCardsNum {
- m[s.AccCardType] = s
- }
- } else {
- m[s.AccCardType] = s
- }
- }
-
- for k, v := range m {
- u := &collector.Usage{
- Type: ComputeSource[i],
- Name: strings.ToUpper(k),
- Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
- Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
- }
- ch <- u
- }
- }()
- }
- }()
-
- // repos
- go func() {
- defer wg.Done()
- reporesp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data []model.Repo `json:"data"`
- }{}
-
- reporeq := common.GetRestyRequest(common.TIMEOUT)
- repor, _ := http.NewRequest("GET", reposUrl, nil)
- reporeq.RawRequest = repor
- reporeq.URL = reposUrl
-
- _, err := reporeq.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetResult(&reporesp).
- Send()
-
- if err != nil {
- return
- }
-
- if len(reporesp.Data) == 0 {
- return
- }
-
- // tasklist
- var runningJobs atomic.Int64
- var jwg sync.WaitGroup
- var errs []error
- var ech = make(chan error)
- jwg.Add(1)
- go func() {
- defer jwg.Done()
- for _, datum := range reporesp.Data {
- jwg.Add(1)
- dat := datum
- go func() {
- defer jwg.Done()
- param := model.TaskListParam{
- UserName: o.userName,
- RepoName: dat.Name,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskList `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", taskListUrl, byt)
- req.RawRequest = r
- req.URL = taskListUrl
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- // assume occupied running tasks
- ech <- err
- return
- }
-
- if len(resp.Data.Data.Tasks) == 0 {
- return
- }
-
- for _, task := range resp.Data.Data.Tasks {
- if task.Task.Status == RUNNING {
- runningJobs.Add(1)
- }
- }
- }()
- }
- }()
-
- go func() {
- jwg.Wait()
- close(ech)
- }()
-
- for v := range ech {
- errs = append(errs, v)
- }
-
- run := &collector.Usage{}
- run.Type = strings.ToUpper(RUNNINGTASK)
- if len(errs) == 0 {
- run.Total = &collector.UnitValue{
- Unit: NUMBER,
- Value: runningJobs.Load(),
- }
-
- ch <- run
- } else {
- running := int64(len(errs)) * 4
- run.Total = &collector.UnitValue{
- Unit: NUMBER,
- Value: running,
- }
-
- ch <- run
- }
- }()
-
- go func() {
- rate := &collector.Usage{
- Type: strings.ToUpper(RATE),
- Total: &collector.UnitValue{Unit: PERHOUR, Value: 1},
- }
-
- ch <- rate
- }()
-
- go func() {
- wg.Wait()
- close(ch)
- }()
-
- for v := range ch {
- resources = append(resources, v)
- }
-
- res.Resources = resources
-
- return res, nil
- }
|