
added GetResourceUsage

pull/350/head
tzwang 11 months ago
commit a6267288be
3 changed files with 141 additions and 4 deletions

  1. internal/scheduler/service/collector/collector.go  +3 -3
  2. internal/storeLink/openi.go                         +7 -0
  3. internal/storeLink/shuguangai.go                    +131 -1

+3 -3  internal/scheduler/service/collector/collector.go

@@ -19,13 +19,13 @@ type AiCollector interface {

type ResourceUsage struct {
	ClusterId string
-	Resources []*Usage
+	Usages    []*Usage
}

type Usage struct {
-	Total     float64
+	Total     int64
	Type      string
-	Available float64
+	Available int64
}

type ResourceStats struct {
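
For context: the hunk above renames ResourceUsage.Resources to Usages and narrows the Usage counters from float64 to int64 (whole CPU cores and DCU cards). A minimal, self-contained sketch of how a caller might read the updated shape; the mirrored type definitions and the sample values are illustrative assumptions, only the field names and types come from the diff:

package main

import "fmt"

// Local mirrors of the collector types after this commit (illustrative).
type Usage struct {
	Total     int64
	Type      string
	Available int64
}

type ResourceUsage struct {
	ClusterId string
	Usages    []*Usage
}

func main() {
	// Sample values, not taken from the diff.
	ru := &ResourceUsage{
		ClusterId: "1",
		Usages: []*Usage{
			{Type: "CPU", Total: 64, Available: 48},
			{Type: "DCU", Total: 8, Available: 6},
		},
	}
	for _, u := range ru.Usages {
		// Integer counters avoid float rounding when subtracting
		// requested resources from totals.
		fmt.Printf("cluster %s %s: %d of %d available\n",
			ru.ClusterId, u.Type, u.Available, u.Total)
	}
}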


+7 -0  internal/storeLink/openi.go

@@ -1 +1,8 @@
package storeLink
+
+type OpenI struct {
+}
+
+func NewOpenI() *OpenI {
+	return &OpenI{}
+}

+131 -1  internal/storeLink/shuguangai.go

@@ -27,15 +27,18 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"mime/multipart"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

const (
RAM_SIZE_1G = 1024 // 1G
WORKER_NUMBER = 1
ACCPU = "CPU"
DCU = "DCU"
DCU_TOPS = 24.5
PYTORCH = "Pytorch"
@@ -1001,5 +1004,132 @@ func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype
}

func (s *ShuguangAi) GetResourceUsage(ctx context.Context) (*collector.ResourceUsage, error) {
-	return nil, nil
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	// Buffered to capacity 2 (one CPU usage, one DCU usage), so the
+	// producer never blocks even if the consumer exits early.
+	usageCh := make(chan *collector.Usage, 2)
+
+	resUsage := &collector.ResourceUsage{
+		ClusterId: strconv.FormatInt(s.participantId, 10),
+	}
+
+	var usages []*collector.Usage
+
+	// Producer: look up the resource group's queue and publish the total
+	// CPU and DCU capacity of its free nodes.
+	go func() {
+		defer wg.Done()
+		// Closing the channel on every exit path, including the early
+		// returns below, unblocks the consumer's range loop.
+		defer close(usageCh)
+
+		queueResp, err := s.aCRpc.SelectQueueByUser(ctx, nil)
+		if err != nil || len(queueResp.Data) == 0 {
+			return
+		}
+
+		var data *hpcAC.QueueData
+		for _, datum := range queueResp.Data {
+			if datum.QueueName == RESOURCE_GROUP {
+				data = datum
+				break
+			}
+		}
+		if data == nil {
+			// No queue matches RESOURCE_GROUP; nothing to report.
+			return
+		}
+
+		freeNodes, _ := strconv.ParseInt(data.QueFreeNodes, 10, 64)
+		cpuPerNode, _ := strconv.ParseInt(data.QueMaxPPN, 10, 64)
+		dcuPerNode, _ := strconv.ParseInt(data.QueMaxDcuPN, 10, 64)
+
+		usageCh <- &collector.Usage{
+			Type:  ACCPU,
+			Total: freeNodes * cpuPerNode,
+		}
+		usageCh <- &collector.Usage{
+			Type:  DCU,
+			Total: freeNodes * dcuPerNode,
+		}
+	}()
+
+	// Consumer: sum the CPU and DCU already requested by existing jobs,
+	// then derive Available from the totals the producer sends.
+	go func() {
+		defer wg.Done()
+
+		var cpuReqed, dcuReqed atomic.Int64
+
+		jobList, err := s.aCRpc.ListJob(ctx, nil)
+		if err == nil && len(jobList.Jobs) != 0 {
+			var jwg sync.WaitGroup
+			for _, j := range jobList.Jobs {
+				jwg.Add(1)
+				job := j // capture the loop variable for the goroutine
+				go func() {
+					defer jwg.Done()
+					jreq := &hpcAC.JobDetailReq{
+						JobId: job.JobId,
+					}
+					detail, err := s.aCRpc.GetJobDetail(ctx, jreq)
+					if err != nil {
+						return
+					}
+					cpuReqed.Add(int64(detail.Data.ProcNumReq))
+					dcuReqed.Add(int64(detail.Data.DcuNumReq))
+				}()
+			}
+			jwg.Wait()
+		}
+
+		// Drain the channel exactly once and collect the results here;
+		// a second range over an already-drained channel would see nothing.
+		for v := range usageCh {
+			switch v.Type {
+			case ACCPU:
+				v.Available = v.Total - cpuReqed.Load()
+			case DCU:
+				v.Available = v.Total - dcuReqed.Load()
+			}
+			usages = append(usages, v)
+		}
+	}()
+
+	wg.Wait()
+
+	resUsage.Usages = usages
+
+	return resUsage, nil
}
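
A minimal sketch of how a scheduler-side caller might drive the new method. Everything here except GetResourceUsage itself and the collector types is an assumption: the helper name printClusterUsage is hypothetical, the *ShuguangAi value is taken as already constructed, and the context and fmt imports are assumed to be in scope.

// Assumed to live in package storeLink, next to ShuguangAi.
func printClusterUsage(ctx context.Context, sgai *ShuguangAi) error {
	ru, err := sgai.GetResourceUsage(ctx)
	if err != nil {
		return err
	}
	for _, u := range ru.Usages {
		// Available was derived above as Total minus the resources
		// requested by running jobs.
		fmt.Printf("cluster %s %s: %d of %d available\n",
			ru.ClusterId, u.Type, u.Available, u.Total)
	}
	return nil
}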
