Browse Source

update openi

pull/368/head
tzwang 11 months ago
parent
commit
2d82c1d401
6 changed files with 134 additions and 9 deletions
  1. +1
    -1
      go.mod
  2. +2
    -2
      go.sum
  3. +1
    -1
      internal/scheduler/service/aiService.go
  4. +4
    -2
      internal/scheduler/strategy/loadPriority.go
  5. +124
    -3
      internal/storeLink/openi.go
  6. +2
    -0
      internal/storeLink/storeLink.go

+ 1
- 1
go.mod View File

@@ -22,7 +22,7 @@ require (
gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7 gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241206092318-9f0c3142fced
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241217104229-e46c7e0c8e56
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203
gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5
go.opentelemetry.io/otel/trace v1.32.0 go.opentelemetry.io/otel/trace v1.32.0


+ 2
- 2
go.sum View File

@@ -532,8 +532,8 @@ gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7 h1:sP
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7/go.mod h1:V19vFg8dWRAbaskASoSj70dgpacswOqZu/SaI02dxac= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20241211021152-9771ba5670b7/go.mod h1:V19vFg8dWRAbaskASoSj70dgpacswOqZu/SaI02dxac=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241206092318-9f0c3142fced h1:N/UMOMm5fzELf52Neze+2oNMEUejFWWqbFwzhzJvgHc=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241206092318-9f0c3142fced/go.mod h1:oDJrr/TNbUCaVjI+RaOrUtGawD7UPAvp7U/oVgT2Dhc=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241217104229-e46c7e0c8e56 h1:zfE8/x16ZF4P2b8u24bHjH1ZBp/pA+4Zt4LKMasXlJs=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20241217104229-e46c7e0c8e56/go.mod h1:oDJrr/TNbUCaVjI+RaOrUtGawD7UPAvp7U/oVgT2Dhc=
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI=
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203/go.mod h1:i2rrbMQ+Fve345BY9Heh4MUqVTAimZQElQhzzRee5B8= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203/go.mod h1:i2rrbMQ+Fve345BY9Heh4MUqVTAimZQElQhzzRee5B8=
gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnzkJBfMRnya1NrhOzlroUtRa5ePiYbPKlHLoLV0= gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnzkJBfMRnya1NrhOzlroUtRa5ePiYbPKlHLoLV0=


+ 1
- 1
internal/scheduler/service/aiService.go View File

@@ -91,7 +91,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
inferenceMap[c.Id] = sgai inferenceMap[c.Id] = sgai
case OPENI: case OPENI:
id, _ := strconv.ParseInt(c.Id, 10, 64) id, _ := strconv.ParseInt(c.Id, 10, 64)
openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token)
openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token)
collectorMap[c.Id] = openi collectorMap[c.Id] = openi
executorMap[c.Id] = openi executorMap[c.Id] = openi
inferenceMap[c.Id] = openi inferenceMap[c.Id] = openi


+ 4
- 2
internal/scheduler/strategy/loadPriority.go View File

@@ -2,11 +2,13 @@ package strategy


type LoadPriority struct { type LoadPriority struct {
replicas int32 replicas int32
Clusters []*CLusterLoad
} }


type CLusterLoad struct { type CLusterLoad struct {
ClusterId string
TaskRunning string
ClusterId string
TaskRunningNum int64
TaskPredictedNum int64
} }


func NewLoadPriority() *LoadPriority { func NewLoadPriority() *LoadPriority {


+ 124
- 3
internal/storeLink/openi.go View File

@@ -15,6 +15,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
) )


const ( const (
@@ -123,11 +124,17 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e
res := &collector.ResourceSpec{ res := &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10), ClusterId: strconv.FormatInt(o.participantId, 10),
} }
url := o.host + "/api/v1/task/creationRequired"
creationRequirelUrl := o.host + "/api/v1/task/creationRequired"
reposUrl := o.host + "/api/v1/user/repos"
taskListUrl := o.host + "/api/v1/task/list"
//taskDetailsUrl := o.host + "/api/v1/task/detail"

var wg sync.WaitGroup var wg sync.WaitGroup
var ch = make(chan *collector.Usage) var ch = make(chan *collector.Usage)
defer close(ch) defer close(ch)


var once sync.Once

for c := range ComputeSource { for c := range ComputeSource {
wg.Add(1) wg.Add(1)
i := c i := c
@@ -151,9 +158,9 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e
}{} }{}


req := common.GetRestyRequest(common.TIMEOUT) req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", url, byt)
r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
req.RawRequest = r req.RawRequest = r
req.URL = url
req.URL = creationRequirelUrl


_, err := req. _, err := req.
SetHeader("Content-Type", "application/json"). SetHeader("Content-Type", "application/json").
@@ -170,6 +177,24 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e
return return
} }


// balance
wg.Add(1)
go func() {
defer wg.Done()
var balanceCheck = func() {
balance := resp.Data.Data.PointAccount.Balance
bal := &collector.Usage{}
bal.Type = strings.ToUpper(BALANCE)
bal.Total = &collector.UnitValue{
Unit: POINT,
Value: balance,
}

ch <- bal
}
once.Do(balanceCheck)
}()

m := make(map[string]struct { m := make(map[string]struct {
Id int `json:"id"` Id int `json:"id"`
AccCardsNum int `json:"acc_cards_num"` AccCardsNum int `json:"acc_cards_num"`
@@ -209,12 +234,108 @@ func (o OpenI) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, e
}() }()
} }


// tasks
var jch = make(chan *collector.Usage, 1)
wg.Add(1)
go func() {
defer close(jch)
defer wg.Done()
reporesp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data []model.Repo `json:"data"`
}{}

reporeq := common.GetRestyRequest(common.TIMEOUT)
repor, _ := http.NewRequest("GET", reposUrl, nil)
reporeq.RawRequest = repor
reporeq.URL = reposUrl

_, err := reporeq.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetResult(&reporesp).
Send()

if err != nil {
return
}

if len(reporesp.Data) == 0 {
return
}

var runningJobs atomic.Int64
var jwg sync.WaitGroup
for _, datum := range reporesp.Data {
jwg.Add(1)
dat := datum
go func() {
defer jwg.Done()
param := model.TaskListParam{
UserName: o.userName,
RepoName: dat.Name,
}

b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)

resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskList `json:"data"`
}{}

req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", taskListUrl, byt)
req.RawRequest = r
req.URL = taskListUrl

_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()

if err != nil {
return
}

if len(resp.Data.Data.Tasks) == 0 {
return
}

for _, task := range resp.Data.Data.Tasks {
if task.Task.Status == RUNNING {
runningJobs.Add(1)
}
}
}()
}
jwg.Wait()

run := &collector.Usage{}
run.Type = strings.ToUpper(RUNNING)
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: runningJobs.Load(),
}

jch <- run

}()

go func() { go func() {
for v := range ch { for v := range ch {
resources = append(resources, v) resources = append(resources, v)
} }
}() }()


for v := range jch {
resources = append(resources, v)
}

wg.Wait() wg.Wait()


res.Resources = resources res.Resources = resources


+ 2
- 0
internal/storeLink/storeLink.go View File

@@ -80,6 +80,8 @@ const (
STORAGE = "STORAGE" STORAGE = "STORAGE"
DISK = "disk" DISK = "disk"
RMB = "rmb" RMB = "rmb"
POINT = "point"
RUNNING = "RUNNING"
) )


var ( var (


Loading…
Cancel
Save