Browse Source

fix getresources and dynamicResources strategy bugs

Former-commit-id: fb05379230
pull/94/head
tzwang 1 year ago
parent
commit
db55f5e5a3
5 changed files with 49 additions and 29 deletions
  1. +5
    -5
      api/internal/logic/schedule/schedulesubmitlogic.go
  2. +2
    -3
      api/internal/scheduler/database/aiStorage.go
  3. +26
    -5
      api/internal/scheduler/schedulers/aiScheduler.go
  4. +1
    -1
      api/internal/scheduler/strategy/dynamicResources.go
  5. +15
    -15
      api/internal/storeLink/shuguangai.go

+ 5
- 5
api/internal/logic/schedule/schedulesubmitlogic.go View File

@@ -28,11 +28,11 @@ func NewScheduleSubmitLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Sc
func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) { func (l *ScheduleSubmitLogic) ScheduleSubmit(req *types.ScheduleReq) (resp *types.ScheduleResp, err error) {
resp = &types.ScheduleResp{} resp = &types.ScheduleResp{}
opt := &option.AiOption{ opt := &option.AiOption{
ResourceType: req.AiOption.ResourceType,
Tops: 0,
TaskType: req.AiOption.TaskType,
DatasetsName: req.AiOption.Datasets,
AlgorithmName: "cnn",
ResourceType: req.AiOption.ResourceType,
Tops: 0,
TaskType: req.AiOption.TaskType,
DatasetsName: req.AiOption.Datasets,
//AlgorithmName: "cnn",
StrategyName: req.AiOption.Strategy, StrategyName: req.AiOption.Strategy,
ClusterToStaticWeight: nil, ClusterToStaticWeight: nil,
Params: []string{ Params: []string{


+ 2
- 3
api/internal/scheduler/database/aiStorage.go View File

@@ -2,7 +2,6 @@ package database


import ( import (
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@@ -24,12 +23,12 @@ func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) {
return &resp, nil return &resp, nil
} }


func (s *AiStorage) SaveTask(cluster strategy.AssignedCluster) error {
func (s *AiStorage) SaveTask(name string) error {
// 构建主任务结构体 // 构建主任务结构体
taskModel := models.Task{ taskModel := models.Task{
Status: constants.Saved, Status: constants.Saved,
Description: "ai task", Description: "ai task",
Name: "testAi",
Name: name,
CommitTime: time.Now(), CommitTime: time.Now(),
} }
// 保存任务数据到数据库 // 保存任务数据到数据库


+ 26
- 5
api/internal/scheduler/schedulers/aiScheduler.go View File

@@ -100,6 +100,8 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
return nil, errors.New("clusters is nil") return nil, errors.New("clusters is nil")
} }


//res := struct {
//}{}
var wg sync.WaitGroup var wg sync.WaitGroup
var result []interface{} var result []interface{}
var errs []error var errs []error
@@ -115,6 +117,7 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
wg.Add(1) wg.Add(1)
go func() { go func() {
resp, err := executorMap[c.Name].Execute(as.ctx, as.option) resp, err := executorMap[c.Name].Execute(as.ctx, as.option)

if err != nil { if err != nil {
// TODO: database operation // TODO: database operation
errCh <- err errCh <- err
@@ -122,15 +125,20 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
return return
} }
// TODO: database operation // TODO: database operation
ch <- resp
data := struct {
Resp interface{}
ClusterId int64
}{
Resp: resp,
ClusterId: c.ParticipantId,
}
ch <- data
wg.Done() wg.Done()
}() }()
} }
wg.Wait() wg.Wait()

for s := range ch {
result = append(result, s)
}
close(ch)
close(errCh)


for e := range errCh { for e := range errCh {
errs = append(errs, e) errs = append(errs, e)
@@ -140,6 +148,19 @@ func (as *AiScheduler) AssignTask(clusters []*strategy.AssignedCluster) ([]inter
return nil, errors.New("submit task failed") return nil, errors.New("submit task failed")
} }


for s := range ch {
data := (s).(struct {
Resp interface{}
ClusterId int64
})

result = append(result, data.Resp)
}

err := as.AiStorages.SaveTask(as.option.TaskName)
if err != nil {
return nil, err
}
return result, nil return result, nil
} }




+ 1
- 1
api/internal/scheduler/strategy/dynamicResources.go View File

@@ -49,7 +49,7 @@ func (ps *DynamicResourcesStrategy) Schedule() ([]*AssignedCluster, error) {
if opt.ResourceType == "computeCard" { if opt.ResourceType == "computeCard" {
var maxCurrentCardHours float64 var maxCurrentCardHours float64
for _, card := range res.CardsAvail { for _, card := range res.CardsAvail {
cardHours := common.RoundFloat(card.TOpsAtFp16*card.CardHours, 3)
cardHours := common.RoundFloat( /*card.TOpsAtFp16**/ card.CardHours, 3)
if cardHours > maxCurrentCardHours { if cardHours > maxCurrentCardHours {
maxCurrentCardHours = cardHours maxCurrentCardHours = cardHours
} }


+ 15
- 15
api/internal/storeLink/shuguangai.go View File

@@ -284,14 +284,14 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
totalDcu := limitResp.Data.AccountMaxDcu totalDcu := limitResp.Data.AccountMaxDcu


//disk //disk
diskReq := &hpcAC.ParaStorQuotaReq{}
diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
if err != nil {
return nil, err
}
totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)
//diskReq := &hpcAC.ParaStorQuotaReq{}
//diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq)
//if err != nil {
// return nil, err
//}
//
//totalDisk := common.RoundFloat(diskResp.Data[0].Threshold*KB*KB*KB, 3)
//availDisk := common.RoundFloat((diskResp.Data[0].Threshold-diskResp.Data[0].Usage)*KB*KB*KB, 3)


//memory //memory
nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil) nodeResp, err := s.aCRpc.GetNodeResources(ctx, nil)
@@ -349,12 +349,12 @@ func (s *ShuguangAi) GetResourceStats(ctx context.Context) (*collector.ResourceS
Balance: balance, Balance: balance,
CpuCoreTotal: totalCpu, CpuCoreTotal: totalCpu,
CpuCoreAvail: CpuCoreAvail, CpuCoreAvail: CpuCoreAvail,
DiskTotal: totalDisk,
DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: MemAvail,
CpuCoreHours: cpuHours,
CardsAvail: cards,
//DiskTotal: totalDisk,
//DiskAvail: availDisk,
MemTotal: memSize,
MemAvail: MemAvail,
CpuCoreHours: cpuHours,
CardsAvail: cards,
} }


return resourceStats, nil return resourceStats, nil
@@ -381,7 +381,7 @@ func (s *ShuguangAi) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm,
var algorithms []*collector.Algorithm var algorithms []*collector.Algorithm
for _, t := range GetTaskTypes() { for _, t := range GetTaskTypes() {
taskType := t taskType := t
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0}
req := &hpcAC.GetFileListReq{Limit: 100, Path: ALGORITHM_DIR + FORWARD_SLASH + taskType, Start: 0, Order: "asc", OrderBy: "name", KeyWord: ""}
list, err := s.aCRpc.GetFileList(ctx, req) list, err := s.aCRpc.GetFileList(ctx, req)
if err != nil { if err != nil {
return nil, err return nil, err


Loading…
Cancel
Save