Browse Source

Merge pull request 'fix openi algorithmId format err' (#492) from tzwang/pcm-coordinator:master into master

pull/494/head
tzwang 5 months ago
parent
commit
fa13f14503
3 changed files with 246 additions and 246 deletions
  1. +0
    -3
      internal/storeLink/modelarts.go
  2. +2
    -2
      internal/storeLink/octopus.go
  3. +244
    -241
      internal/storeLink/openi.go

+ 0
- 3
internal/storeLink/modelarts.go View File

@@ -997,7 +997,6 @@ func (m *ModelArtsLink) CheckImageExist(ctx context.Context, option *option.Infe
}

func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
var wg sync.WaitGroup
MoUsage := MoUsage{}
var cpusum int64 = 0
var npusum int64 = 0
@@ -1021,7 +1020,6 @@ func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string)
}
respJobFlavors, err := m.modelArtsRpc.GetTrainingJobFlavors(ctx, reqJobFlavors)
if err != nil {
wg.Done()
return nil, err
}
respJobFlavorsMarshal, err2 := json.Marshal(respJobFlavors)
@@ -1095,7 +1093,6 @@ func (m *ModelArtsLink) GetResourceSpecs(ctx context.Context, resrcType string)
}
ListSpecificationsResp, err := m.modelArtsRpc.ListSpecifications(ctx, req)
if err != nil {
wg.Done()
return nil, err
}
respJobSpecificationsMarshal, err2 := json.Marshal(ListSpecificationsResp)


+ 2
- 2
internal/storeLink/octopus.go View File

@@ -1280,11 +1280,11 @@ func (o *OctopusLink) CheckModelExistence(ctx context.Context, name string, mtyp
}

func (o *OctopusLink) GetResourceSpecs(ctx context.Context, resrcType string) (*collector.ResourceSpec, error) {
res := &collector.ResourceSpec{
_ = &collector.ResourceSpec{
ClusterId: strconv.FormatInt(o.participantId, 10),
Resources: make([]interface{}, 0),
}
return res, nil
return nil, nil
}

func (o *OctopusLink) Stop(ctx context.Context, id string) error {


+ 244
- 241
internal/storeLink/openi.go View File

@@ -320,7 +320,7 @@ func (o *OpenI) Stop(ctx context.Context, id string) error {
return err
}

codePaths := strings.Split(task.Data.Task.Description, FORWARD_SLASH)
codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return errors.New("failed to stop, openI desc not set")
}
@@ -635,9 +635,9 @@ func (o *OpenI) GetTrainingTaskLog(ctx context.Context, taskId string, instanceN
return "", err
}

codePaths := strings.Split(task.Data.Task.Description, FORWARD_SLASH)
codePaths := strings.SplitN(task.Data.Task.Description, FORWARD_SLASH, 3)
if len(codePaths) != 3 {
return "", errors.New("failed to get log, openI desc not set")
return "", errors.New("failed to stop, openI desc not set")
}

repoName := codePaths[0]
@@ -796,289 +796,292 @@ func (o *OpenI) GetResourceSpecs(ctx context.Context, resrcType string) (*collec
var wg sync.WaitGroup
var ch = make(chan *collector.ClusterResource)
var once sync.Once

wg.Add(2)
go o.genComputeResources(&wg, ch, &once, jobType, creationRequirelUrl)
go o.genRunningTaskNum(&wg, ch, reposUrl, taskListUrl)

go func() {
defer wg.Done()
for c := range ComputeSource {
wg.Add(1)
i := c
go func() {
defer wg.Done()
param := model.TaskCreationRequiredParam{
UserName: o.userName,
RepoName: TESTREPO,
JobType: jobType,
ComputeSource: ComputeSource[i],
ClusterType: C2NET,
}
wg.Wait()
close(ch)
}()

b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
for v := range ch {
resources = append(resources, v)
}

resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskCreationRequired `json:"data"`
}{}
res.Resources = resources

req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
req.RawRequest = r
req.URL = creationRequirelUrl
return res, nil
}

_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
func (o *OpenI) genComputeResources(wg *sync.WaitGroup, ch chan *collector.ClusterResource, once *sync.Once, jobType string, creationRequirelUrl string) {
defer wg.Done()

if err != nil {
return
}
for c := range ComputeSource {
wg.Add(1)
i := c
go func() {
defer wg.Done()
param := model.TaskCreationRequiredParam{
UserName: o.userName,
RepoName: TESTREPO,
JobType: jobType,
ComputeSource: ComputeSource[i],
ClusterType: C2NET,
}

if len(resp.Data.Data.Specs.All) == 0 {
return
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)

resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskCreationRequired `json:"data"`
}{}

req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", creationRequirelUrl, byt)
req.RawRequest = r
req.URL = creationRequirelUrl

_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()

if err != nil {
return
}

// balance
var balanceCheck = func() {
balance := resp.Data.Data.PointAccount.Balance
bal := &collector.Usage{}
bal.Type = strings.ToUpper(BALANCE)
bal.Total = &collector.UnitValue{
Unit: POINT,
Value: balance,
}
if len(resp.Data.Data.Specs.All) == 0 {
return
}

ch <- &collector.ClusterResource{Resource: bal}
// balance
var balanceCheck = func() {
balance := resp.Data.Data.PointAccount.Balance
bal := &collector.Usage{}
bal.Type = strings.ToUpper(BALANCE)
bal.Total = &collector.UnitValue{
Unit: POINT,
Value: balance,
}

//rate
var v float64
v = 1
rate := &collector.Usage{
Type: strings.ToUpper(RATE),
Total: &collector.UnitValue{Unit: PERHOUR, Value: v},
}
ch <- &collector.ClusterResource{Resource: bal}

ch <- &collector.ClusterResource{Resource: rate}
//rate
var v float64
v = 1
rate := &collector.Usage{
Type: strings.ToUpper(RATE),
Total: &collector.UnitValue{Unit: PERHOUR, Value: v},
}
once.Do(balanceCheck)

m := make(map[string]struct {
Id int `json:"id"`
AccCardsNum int `json:"acc_cards_num"`
AccCardType string `json:"acc_card_type"`
CpuCores int `json:"cpu_cores"`
MemGiB int `json:"mem_gi_b"`
GpuMemGiB int `json:"gpu_mem_gi_b"`
ShareMemGiB int `json:"share_mem_gi_b"`
ComputeResource string `json:"compute_resource"`
UnitPrice int `json:"unit_price"`
SourceSpecId string `json:"source_spec_id"`
HasInternet int `json:"has_internet"`
EnableVisualization bool `json:"enable_visualization"`
})
for _, s := range resp.Data.Data.Specs.All {
e, ok := m[s.AccCardType]
if ok {
if s.AccCardsNum > e.AccCardsNum {
m[s.AccCardType] = s
}
} else {
ch <- &collector.ClusterResource{Resource: rate}
}
once.Do(balanceCheck)
m := make(map[string]struct {
Id int `json:"id"`
AccCardsNum int `json:"acc_cards_num"`
AccCardType string `json:"acc_card_type"`
CpuCores int `json:"cpu_cores"`
MemGiB int `json:"mem_gi_b"`
GpuMemGiB int `json:"gpu_mem_gi_b"`
ShareMemGiB int `json:"share_mem_gi_b"`
ComputeResource string `json:"compute_resource"`
UnitPrice int `json:"unit_price"`
SourceSpecId string `json:"source_spec_id"`
HasInternet int `json:"has_internet"`
EnableVisualization bool `json:"enable_visualization"`
})
for _, s := range resp.Data.Data.Specs.All {
e, ok := m[s.AccCardType]
if ok {
if s.AccCardsNum > e.AccCardsNum {
m[s.AccCardType] = s
}
} else {
m[s.AccCardType] = s
}
}

for k, v := range m {
bres := make([]*collector.Usage, 0)
cres := &collector.ClusterResource{}
card := &collector.Usage{
Type: ComputeSource[i],
Name: strings.ToUpper(k),
Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
}
cpu := &collector.Usage{
Type: strings.ToUpper(CPU),
Name: strings.ToUpper(CPU),
Total: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
}
mem := &collector.Usage{
Type: strings.ToUpper(MEMORY),
Name: strings.ToUpper(RAM),
Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
}
vmem := &collector.Usage{
Type: strings.ToUpper(MEMORY),
Name: strings.ToUpper(VRAM),
Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
}

//storage
var s float64
s = 1024
storage := &collector.Usage{}
storage.Type = STORAGE
storage.Name = DISK
storage.Total = &collector.UnitValue{
Unit: GIGABYTE,
Value: s,
}
storage.Available = &collector.UnitValue{
Unit: GIGABYTE,
Value: s,
}

bres = append(bres, storage)
bres = append(bres, cpu)
bres = append(bres, mem)
bres = append(bres, vmem)

cres.Resource = card
cres.BaseResources = bres

ch <- cres
for k, v := range m {
bres := make([]*collector.Usage, 0)
cres := &collector.ClusterResource{}
card := &collector.Usage{
Type: ComputeSource[i],
Name: strings.ToUpper(k),
Total: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
Available: &collector.UnitValue{Unit: NUMBER, Value: v.AccCardsNum},
}
cpu := &collector.Usage{
Type: strings.ToUpper(CPU),
Name: strings.ToUpper(CPU),
Total: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
Available: &collector.UnitValue{Unit: CPUCORE, Value: v.CpuCores},
}
mem := &collector.Usage{
Type: strings.ToUpper(MEMORY),
Name: strings.ToUpper(RAM),
Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.MemGiB},
}
vmem := &collector.Usage{
Type: strings.ToUpper(MEMORY),
Name: strings.ToUpper(VRAM),
Total: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
Available: &collector.UnitValue{Unit: GIGABYTE, Value: v.GpuMemGiB},
}
}()
}
}()

// repos
go func() {
defer wg.Done()
reporesp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data []model.Repo `json:"data"`
}{}

reporeq := common.GetRestyRequest(common.TIMEOUT)
repor, _ := http.NewRequest("GET", reposUrl, nil)
reporeq.RawRequest = repor
reporeq.URL = reposUrl
//storage
var s float64
s = 1024
storage := &collector.Usage{}
storage.Type = STORAGE
storage.Name = DISK
storage.Total = &collector.UnitValue{
Unit: GIGABYTE,
Value: s,
}
storage.Available = &collector.UnitValue{
Unit: GIGABYTE,
Value: s,
}

_, err := reporeq.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetResult(&reporesp).
Send()
bres = append(bres, storage)
bres = append(bres, cpu)
bres = append(bres, mem)
bres = append(bres, vmem)

if err != nil {
return
}
cres.Resource = card
cres.BaseResources = bres

if len(reporesp.Data) == 0 {
return
}
ch <- cres
}
}()
}
}

// tasklist
var runningJobs atomic.Int64
var jwg sync.WaitGroup
var errs []error
var ech = make(chan error)
jwg.Add(1)
go func() {
defer jwg.Done()
for _, datum := range reporesp.Data {
jwg.Add(1)
dat := datum
go func() {
defer jwg.Done()
param := model.TaskListParam{
UserName: o.userName,
RepoName: dat.Name,
}
func (o *OpenI) genRunningTaskNum(wg *sync.WaitGroup, ch chan *collector.ClusterResource, reposUrl string, taskListUrl string) {
defer wg.Done()
reporesp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data []model.Repo `json:"data"`
}{}

b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)
reporeq := common.GetRestyRequest(common.TIMEOUT)
repor, _ := http.NewRequest("GET", reposUrl, nil)
reporeq.RawRequest = repor
reporeq.URL = reposUrl

resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskList `json:"data"`
}{}
_, err := reporeq.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetResult(&reporesp).
Send()

req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", taskListUrl, byt)
req.RawRequest = r
req.URL = taskListUrl
if err != nil {
return
}

_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()
if len(reporesp.Data) == 0 {
return
}

if err != nil {
// assume occupied running tasks
ech <- err
return
}
// tasklist
var runningJobs atomic.Int64
var jwg sync.WaitGroup
var errs []error
var ech = make(chan error)
jwg.Add(1)
go func() {
defer jwg.Done()
for _, datum := range reporesp.Data {
jwg.Add(1)
dat := datum
go func() {
defer jwg.Done()
param := model.TaskListParam{
UserName: o.userName,
RepoName: dat.Name,
}

if len(resp.Data.Data.Tasks) == 0 {
return
}
b, _ := json.Marshal(param)
byt := bytes.NewBuffer(b)

for _, task := range resp.Data.Data.Tasks {
if task.Task.Status == RUNNING {
runningJobs.Add(1)
}
}
}()
}
}()
resp := struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data model.TaskList `json:"data"`
}{}

go func() {
jwg.Wait()
close(ech)
}()
req := common.GetRestyRequest(common.TIMEOUT)
r, _ := http.NewRequest("GET", taskListUrl, byt)
req.RawRequest = r
req.URL = taskListUrl

for v := range ech {
errs = append(errs, v)
}
_, err := req.
SetHeader("Content-Type", "application/json").
SetQueryParam(common.ACCESSTOKEN, o.accessToken).
SetBody(byt).
SetResult(&resp).
Send()

// running tasks num
var runningNum int64
runningNum = runningJobs.Load()
run := &collector.Usage{}
run.Type = strings.ToUpper(RUNNINGTASK)
if len(errs) == 0 {
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: runningNum,
}
if err != nil {
// assume occupied running tasks
ech <- err
return
}

ch <- &collector.ClusterResource{Resource: run}
} else {
runningNum = int64(len(errs)) * 4
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: runningNum,
}
if len(resp.Data.Data.Tasks) == 0 {
return
}

ch <- &collector.ClusterResource{Resource: run}
for _, task := range resp.Data.Data.Tasks {
if task.Task.Status == RUNNING {
runningJobs.Add(1)
}
}
}()
}
}()

go func() {
wg.Wait()
close(ch)
jwg.Wait()
close(ech)
}()

for v := range ch {
resources = append(resources, v)
for v := range ech {
errs = append(errs, v)
}

res.Resources = resources
// running tasks num
var runningNum int64
runningNum = runningJobs.Load()
run := &collector.Usage{}
run.Type = strings.ToUpper(RUNNINGTASK)
if len(errs) == 0 {
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: runningNum,
}

return res, nil
ch <- &collector.ClusterResource{Resource: run}
} else {
runningNum = int64(len(errs)) * 4
run.Total = &collector.UnitValue{
Unit: NUMBER,
Value: runningNum,
}

ch <- &collector.ClusterResource{Resource: run}
}
}

func (o *OpenI) getOnlineInferUrl(ctx context.Context, taskId string, repoName string) (string, error) {


Loading…
Cancel
Save