
updated shuguangai getresources func

Former-commit-id: edfde2b701
pull/113/head
tzwang 1 year ago
commit 5d3da2c8a3
3 changed files with 202 additions and 87 deletions
  1. +31 -7    api/internal/scheduler/schedulers/aiScheduler.go
  2. +120 -70  api/internal/storeLink/shuguangai.go
  3. +51 -10   api/internal/storeLink/storeLink.go

+31 -7    api/internal/scheduler/schedulers/aiScheduler.go

@@ -182,6 +182,13 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
for s := range ch {
if s.Msg != "" {
msg += fmt.Sprintf("clusterId: %v , error: %v \n", s.ClusterId, s.Msg)
} else {
msg += fmt.Sprintf("clusterId: %v , submitted successfully, taskId: %v \n", s.ClusterId, s.TaskId)
}
}
return nil, errors.New(msg)
}

@@ -196,18 +203,26 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) (interfa
func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats, error) {
var wg sync.WaitGroup
var ch = make(chan *collector.ResourceStats, len(*as.ResourceCollector))
var errCh = make(chan error, len(*as.ResourceCollector))
var errCh = make(chan interface{}, len(*as.ResourceCollector))

var resourceSpecs []*collector.ResourceStats
var errs []error
var errs []interface{}

for _, resourceCollector := range *as.ResourceCollector {
for s, resourceCollector := range *as.ResourceCollector {
wg.Add(1)
rc := resourceCollector
id := s
go func() {
spec, err := rc.GetResourceStats(as.ctx)
if err != nil {
errCh <- err
e := struct {
err error
clusterId string
}{
err: err,
clusterId: id,
}
errCh <- e
wg.Done()
return
}
@@ -227,13 +242,22 @@ func (as *AiScheduler) findClustersWithResources() ([]*collector.ResourceStats,
errs = append(errs, e)
}

if len(errs) != 0 {
if len(errs) == len(*as.ResourceCollector) {
return nil, errors.New("get resources failed")
}

if len(resourceSpecs) == 0 {
return nil, errors.New("no resource found")
if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}

return resourceSpecs, nil
}
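
The findClustersWithResources change above swaps the plain `error` channel for one that also carries the failing cluster's id, so the aggregated error message can name each cluster instead of reporting a bare failure. A minimal sketch of the same fan-out/aggregate pattern, using a named struct on the channel rather than `interface{}` plus a type assertion (every identifier below is illustrative, not the repository's types):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// clusterError pairs a failure with the cluster that produced it.
type clusterError struct {
	clusterId string
	err       error
}

// fetchStats stands in for rc.GetResourceStats(ctx); it is a stub.
func fetchStats(clusterId string) (string, error) {
	if clusterId == "cluster-2" {
		return "", errors.New("rpc unreachable")
	}
	return "stats:" + clusterId, nil
}

func collectStats(clusterIds []string) ([]string, error) {
	var wg sync.WaitGroup
	statsCh := make(chan string, len(clusterIds))
	errCh := make(chan clusterError, len(clusterIds))

	for _, id := range clusterIds {
		wg.Add(1)
		id := id // capture the loop variable for the goroutine
		go func() {
			defer wg.Done()
			s, err := fetchStats(id)
			if err != nil {
				errCh <- clusterError{clusterId: id, err: err}
				return
			}
			statsCh <- s
		}()
	}

	wg.Wait()
	close(statsCh)
	close(errCh)

	var stats []string
	for s := range statsCh {
		stats = append(stats, s)
	}
	var errs []clusterError
	for e := range errCh {
		errs = append(errs, e)
	}

	// Generic error when every cluster failed, per-cluster detail otherwise.
	if len(errs) == len(clusterIds) {
		return nil, errors.New("get resources failed")
	}
	if len(errs) != 0 {
		var msg string
		for _, e := range errs {
			msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err)
		}
		return nil, errors.New(msg)
	}
	return stats, nil
}

func main() {
	stats, err := collectStats([]string{"cluster-1", "cluster-2", "cluster-3"})
	fmt.Println(stats, err)
}
```

A typed channel such as `chan clusterError` keeps the aggregation loop free of the `(err).(struct{ err error; clusterId string })` assertion that the diff performs on each element.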



+120 -70  api/internal/storeLink/shuguangai.go

@@ -26,6 +26,8 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv"
"strings"
"sync"
"time"
)

const (
@@ -266,96 +268,144 @@ func (s *ShuguangAi) QuerySpecs(ctx context.Context) (interface{}, error) {
}

func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
//balance
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
if err != nil {
return nil, err
var wg sync.WaitGroup
wg.Add(4)
var cBalance = make(chan float64)
var cMemTotal = make(chan float64)
var cTotalCpu = make(chan int64)

resourceStats := &collector.ResourceStats{
ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform,
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)

//resource limit
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
if err != nil {
return nil, err
dcu := &collector.Card{
Platform: SHUGUANGAI,
Type: CARD,
Name: DCU,
TOpsAtFp16: DCU_TOPS,
}
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu

//balance
go func() {
userReq := &hpcAC.GetUserInfoReq{}
userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq)
if err != nil {
return
}
balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64)
resourceStats.Balance = balance

cBalance <- balance
}()

//resource limit
go func() {
limitReq := &hpcAC.QueueReq{}
limitResp, err := s.aCRpc.QueryUserQuotasLimit(ctx, limitReq)
if err != nil {
wg.Done()
return
}
totalCpu := limitResp.Data.AccountMaxCpu
totalDcu := limitResp.Data.AccountMaxDcu

dcu.CardNum = int32(totalDcu)
resourceStats.CpuCoreTotal = totalCpu

cTotalCpu <- totalCpu
wg.Done()
}()

//disk
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
return nil, err
}
go func() {
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
wg.Done()
return
}

totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)

totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
resourceStats.DiskTotal = totalDisk
resourceStats.DiskAvail = availDisk
wg.Done()
}()

//memory
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
if err != nil {
return nil, err
}
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES
go func() {
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
if err != nil {
wg.Done()
return
}
memSize := common.RoundFloat(float64(nodeResp.Data.MemorySize)*KB*KB, 3) // MB to BYTES

resourceStats.MemTotal = memSize
cMemTotal <- memSize
wg.Done()
}()

//resources being occupied
memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
if err != nil {
return nil, err
}
var CpuCoreAvail int64
var MemAvail float64
if len(memberJobResp.Data) != 0 {
CpuCoreAvail = totalCpu
MemAvail = memSize
} else {
var cpuCoreUsed int64
var memUsed float64
for _, datum := range memberJobResp.Data {
cpuCoreUsed += datum.CpuCore
}
memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
if cpuCoreUsed > totalCpu {
CpuCoreAvail = 0
} else {
CpuCoreAvail = totalCpu - cpuCoreUsed
go func() {
memSize := <-cMemTotal
totalCpu := <-cTotalCpu
memberJobResp, err := s.aCRpc.GetMemberJobs(ctx, nil)
if err != nil {
wg.Done()
return
}
if memUsed > memSize {
MemAvail = 0
var cpuCoreAvail int64
var memAvail float64
if len(memberJobResp.Data) != 0 {
cpuCoreAvail = totalCpu
memAvail = memSize
} else {
MemAvail = memSize - memUsed
var cpuCoreUsed int64
var memUsed float64
for _, datum := range memberJobResp.Data {
cpuCoreUsed += datum.CpuCore
}
memUsed = float64(cpuCoreUsed * 2 * KB * KB * KB) // 2 GB per cpu core
if cpuCoreUsed > totalCpu {
cpuCoreAvail = 0
} else {
cpuCoreAvail = totalCpu - cpuCoreUsed
}
if memUsed > memSize {
memAvail = 0
} else {
memAvail = memSize - memUsed
}
}
}
resourceStats.CpuCoreAvail = cpuCoreAvail
resourceStats.MemAvail = memAvail
wg.Done()
}()

//usable hours
var balance float64

select {
case v := <-cBalance:
balance = v
case <-time.After(2 * time.Second):
return nil, errors.New("get balance rpc call failed")
}

var cards []*collector.Card
cardHours := common.RoundFloat(balance/DCUPRICEPERHOUR, 3)
cpuHours := common.RoundFloat(balance/CPUCOREPRICEPERHOUR, 3)

dcu := &collector.Card{
Platform: SHUGUANGAI,
Type: CARD,
Name: DCU,
TOpsAtFp16: DCU_TOPS,
CardHours: cardHours,
CardNum: int32(totalDcu),
}
dcu.CardHours = cardHours
resourceStats.CpuCoreHours = cpuHours

wg.Wait()

cards = append(cards, dcu)
resourceStats := &collector.ResourceStats{
ClusterId: strconv.FormatInt(s.participantId, 10),
Name: s.platform,
Balance: balance,
CpuCoreTotal: totalCpu,
CpuCoreAvail: CpuCoreAvail,
DiskTotal: totalDisk,
DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: MemAvail,
CpuCoreHours: cpuHours,
CardsAvail: cards,
}
resourceStats.CardsAvail = cards

return resourceStats, nil
}
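
The rewritten GetResourceStats fans its RPC calls out into goroutines: the quota, disk, memory and member-job calls are tracked by the WaitGroup, the member-job goroutine receives the memory and CPU totals through channels, and the balance comes back through a `select` with a two-second `time.After` timeout so a hung RPC cannot block the whole call. A condensed sketch of that shape with stub fetchers (all names and the price constant below are illustrative, not the pcm-ac client API):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type resourceStats struct {
	Balance      float64
	CpuCoreTotal int64
	MemTotal     float64
	CpuCoreHours float64
}

// The fetch* functions stand in for the aCRpc calls; they are stubs.
func fetchBalance() (float64, error)  { return 1234.5, nil }
func fetchCpuTotal() (int64, error)   { return 64, nil }
func fetchMemTotal() (float64, error) { return 256 * 1024 * 1024 * 1024, nil }

func getResourceStats() (*resourceStats, error) {
	var wg sync.WaitGroup
	wg.Add(2) // the balance goroutine is not tracked; its result arrives via the channel
	stats := &resourceStats{}
	cBalance := make(chan float64, 1)

	go func() {
		if b, err := fetchBalance(); err == nil {
			cBalance <- b
		}
	}()

	go func() {
		defer wg.Done()
		if c, err := fetchCpuTotal(); err == nil {
			stats.CpuCoreTotal = c
		}
	}()

	go func() {
		defer wg.Done()
		if m, err := fetchMemTotal(); err == nil {
			stats.MemTotal = m
		}
	}()

	// Card/CPU hours cannot be priced without the balance, so wait for it
	// with a bounded timeout instead of blocking forever on a hung RPC.
	select {
	case b := <-cBalance:
		stats.Balance = b
	case <-time.After(2 * time.Second):
		return nil, errors.New("get balance rpc call failed")
	}
	const cpuCorePricePerHour = 0.09 // illustrative, not the real CPUCOREPRICEPERHOUR
	stats.CpuCoreHours = stats.Balance / cpuCorePricePerHour

	wg.Wait()
	return stats, nil
}

func main() {
	stats, err := getResourceStats()
	fmt.Printf("%+v %v\n", stats, err)
}
```

Note that every WaitGroup-tracked goroutine has to call wg.Done on each return path, including the early error returns, or wg.Wait would hang; the diff does this by calling wg.Done() before each early return, while the balance goroutine is deliberately left outside the WaitGroup and bounded by the timeout instead.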


+51 -10   api/internal/storeLink/storeLink.go

@@ -16,6 +16,7 @@ package storeLink

import (
"context"
"fmt"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-ac/hpcAC"
"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
@@ -129,19 +130,27 @@ func GetResourceTypes() []string {

func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.AiCollector) ([]string, error) {
var wg sync.WaitGroup
var errCh = make(chan error, len(*collectorMap))
var errs []error
var errCh = make(chan interface{}, len(*collectorMap))
var errs []interface{}
var names []string
var mu sync.Mutex
colMap := *collectorMap
for _, col := range colMap {
for s, col := range colMap {
wg.Add(1)
c := col
id := s
go func() {
var ns []string
specs, err := c.GetDatasetsSpecs(ctx)
if err != nil {
errCh <- err
e := struct {
err error
clusterId string
}{
err: err,
clusterId: id,
}
errCh <- e
wg.Done()
return
}
@@ -167,12 +176,24 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai
wg.Wait()
close(errCh)

if len(errs) == len(colMap) {
return nil, errors.New("get DatasetsNames failed")
}

for e := range errCh {
errs = append(errs, e)
}

if len(errs) != 0 {
return nil, errors.New("get DatasetsNames failed")
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}

names = common.RemoveDuplicates(names)
@@ -182,19 +203,27 @@ func GetDatasetsNames(ctx context.Context, collectorMap *map[string]collector.Ai
func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCollector, resourceType string, taskType string, dataset string) ([]string, error) {
var names []string
var wg sync.WaitGroup
var errCh = make(chan error, len(*collectorMap))
var errs []error
var errCh = make(chan interface{}, len(*collectorMap))
var errs []interface{}
var mu sync.Mutex

colMap := *collectorMap
for _, col := range colMap {
for s, col := range colMap {
wg.Add(1)
c := col
id := s
go func() {
var ns []string
algorithms, err := c.GetAlgorithms(ctx)
if err != nil {
errCh <- err
e := struct {
err error
clusterId string
}{
err: err,
clusterId: id,
}
errCh <- e
wg.Done()
return
}
@@ -240,10 +269,22 @@ func GetAlgorithms(ctx context.Context, collectorMap *map[string]collector.AiCol
errs = append(errs, e)
}

if len(errs) != 0 {
if len(errs) == len(colMap) {
return nil, errors.New("get Algorithms failed")
}

if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
msg += fmt.Sprintf("clusterId: %v , error: %v \n", e.clusterId, e.err.Error())
}
return nil, errors.New(msg)
}

names = common.RemoveDuplicates(names)
return names, nil
}
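
GetDatasetsNames and GetAlgorithms share the same shape: every collector is queried concurrently, its names are appended to a shared slice under a mutex, failures are reported per cluster as above, and the merged list is deduplicated at the end. A small sketch of the merge-and-dedup half under those assumptions (removeDuplicates below is a local stand-in for common.RemoveDuplicates, and the dataset names are made up):

```go
package main

import (
	"fmt"
	"sync"
)

// removeDuplicates is a stand-in for common.RemoveDuplicates.
func removeDuplicates(in []string) []string {
	seen := make(map[string]struct{}, len(in))
	out := make([]string, 0, len(in))
	for _, s := range in {
		if _, ok := seen[s]; ok {
			continue
		}
		seen[s] = struct{}{}
		out = append(out, s)
	}
	return out
}

func main() {
	// Names reported by each cluster's collector; illustrative data only.
	perCluster := map[string][]string{
		"cluster-1": {"mnist", "cifar10"},
		"cluster-2": {"cifar10", "imagenet"},
	}

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		names []string
	)
	for id, ns := range perCluster {
		wg.Add(1)
		id, ns := id, ns // capture loop variables for the goroutine
		go func() {
			defer wg.Done()
			// A real collector would issue GetDatasetsSpecs here; we only merge.
			mu.Lock()
			names = append(names, ns...)
			mu.Unlock()
			fmt.Printf("collector %s contributed %d names\n", id, len(ns))
		}()
	}
	wg.Wait()

	fmt.Println(removeDuplicates(names))
}
```

The mutex keeps the concurrent appends safe, and deduplication happens once after wg.Wait, which matches the names = common.RemoveDuplicates(names) call at the end of both functions.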

