|
- package storeLink
-
- import (
- "bytes"
- "context"
- "encoding/json"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
- "gitlink.org.cn/JointCloud/pcm-openi/common"
- "gitlink.org.cn/JointCloud/pcm-openi/model"
- "mime/multipart"
- "net/http"
- "strconv"
- "strings"
- "sync"
- )
-
- const (
- DEBUG = "DEBUG"
- TRAIN = "TRAIN"
- INFERENCE = "INFERENCE"
- C2NET = "C2Net"
- TESTREPO = "testrepo"
- )
-
- // compute source
- var (
- ComputeSource = []string{"GPU", "NPU", "GCU", "MLU", "DCU", "CPU", "ILUVATAR-GPGPU", "METAX-GPGPU"}
- )
-
- type OpenI struct {
- participantId int64
- host string
- userName string
- accessToken string
- }
-
- func NewOpenI(host string, id int64, name string, token string) *OpenI {
- return &OpenI{
- host: host,
- participantId: id,
- userName: name,
- accessToken: token,
- }
- }
-
- func (o OpenI) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) StartInferDeployInstance(ctx context.Context, id string) bool {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) StopInferDeployInstance(ctx context.Context, id string) bool {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetInferDeployInstance(ctx context.Context, id string) (*inference.DeployInstance, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) CheckModelExistence(ctx context.Context, modelName string, modelType string) bool {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceNum string) (string, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.Task, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) DownloadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string) (string, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetComputeCards(ctx context.Context) ([]string, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetUserBalance(ctx context.Context) (float64, error) {
- //TODO implement me
- panic("implement me")
- }
-
- func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) {
- var resources []interface{}
- res := &collector.ResourceSpec{
- ClusterId: strconv.FormatInt(o.participantId, 10),
- }
- url := o.host + "/api/v1/task/creationRequired"
- var wg sync.WaitGroup
- var ch = make(chan *collector.Usage)
- defer close(ch)
-
- for c := range ComputeSource {
- wg.Add(1)
- i := c
- go func() {
- defer wg.Done()
- param := model.TaskCreationRequiredParam{
- UserName: o.userName,
- RepoName: TESTREPO,
- JobType: TRAIN,
- ComputeSource: ComputeSource[i],
- ClusterType: C2NET,
- }
-
- b, _ := json.Marshal(param)
- byt := bytes.NewBuffer(b)
-
- resp := struct {
- Code int `json:"code"`
- Msg string `json:"msg"`
- Data model.TaskCreationRequired `json:"data"`
- }{}
-
- req := common.GetRestyRequest(common.TIMEOUT)
- r, _ := http.NewRequest("GET", url, byt)
- req.RawRequest = r
- req.URL = url
-
- _, err := req.
- SetHeader("Content-Type", "application/json").
- SetQueryParam(common.ACCESSTOKEN, o.accessToken).
- SetBody(byt).
- SetResult(&resp).
- Send()
-
- if err != nil {
- return
- }
-
- if len(resp.Data.Data.Specs.All) == 0 {
- return
- }
-
- m := make(map[string]struct {
- Id int `json:"id"`
- AccCardsNum int `json:"acc_cards_num"`
- AccCardType string `json:"acc_card_type"`
- CpuCores int `json:"cpu_cores"`
- MemGiB int `json:"mem_gi_b"`
- GpuMemGiB int `json:"gpu_mem_gi_b"`
- ShareMemGiB int `json:"share_mem_gi_b"`
- ComputeResource string `json:"compute_resource"`
- UnitPrice int `json:"unit_price"`
- SourceSpecId string `json:"source_spec_id"`
- HasInternet int `json:"has_internet"`
- EnableVisualization bool `json:"enable_visualization"`
- })
-
- for _, s := range resp.Data.Data.Specs.All {
- e, ok := m[s.AccCardType]
- if ok {
- if s.AccCardsNum > e.AccCardsNum {
- m[s.AccCardType] = s
- }
- } else {
- m[s.AccCardType] = s
- }
- }
-
- for k, v := range m {
- u := &collector.Usage{
- Type: strings.ToUpper(k),
- Total: v.AccCardsNum,
- Available: v.AccCardsNum,
- }
- ch <- u
- }
-
- }()
- }
-
- go func() {
- for v := range ch {
- resources = append(resources, v)
- }
- }()
-
- wg.Wait()
-
- res.Resource = resources
-
- return res, nil
- }
|