diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index 9a688734..2fbce5f5 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -31,7 +31,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" ) @@ -1105,322 +1104,323 @@ func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype } func (s *ShuguangAi) GetResourceSpecs(ctx context.Context) (*collector.ResourceSpec, error) { - var timeout = 5 - var wg sync.WaitGroup - var uwg sync.WaitGroup - wg.Add(3) - uwg.Add(3) - var ch = make(chan *collector.Usage, 2) - var qCh = make(chan *collector.Usage, 2) - var sch = make(chan *collector.Usage, 1) - var cresCh = make(chan *collector.ClusterResource) - - resUsage := &collector.ResourceSpec{ - ClusterId: strconv.FormatInt(s.participantId, 10), - } - - var resources []interface{} - - // 查询用户可访问队列 - go func() { - defer wg.Done() - defer close(ch) - done := make(chan bool) - go func() { - defer uwg.Done() - queueResp, err := s.aCRpc.SelectQueueByUser(ctx, nil) - if err != nil { - done <- true - return - } - - if len(queueResp.Data) == 0 { - done <- true - return - } - - var data *hpcAC.QueueData - for _, datum := range queueResp.Data { - if datum.QueueName == RESOURCE_GROUP { - data = datum - break - } - } - - //rate - queChargeRate, _ := strconv.ParseFloat(data.QueChargeRate, 64) - rate := &collector.Usage{ - Type: strings.ToUpper(RATE), - Total: &collector.UnitValue{Unit: PERHOUR, Value: queChargeRate}, - } - - cresCh <- &collector.ClusterResource{Resource: rate} - - var freeNodes int64 - var cpuPerNode int64 - var dcuPerNode int64 - freeNodes, _ = strconv.ParseInt(data.QueFreeNodes, 10, 10) - cpuPerNode, _ = strconv.ParseInt(data.QueMaxPPN, 10, 10) - dcuPerNode, _ = strconv.ParseInt(data.QueMaxDcuPN, 10, 10) - - cpu := &collector.Usage{ - Type: strings.ToUpper(CPU), - Total: &collector.UnitValue{Unit: CPUCORE, Value: freeNodes * cpuPerNode}, - } - - ch <- cpu - - dcu := &collector.Usage{ - Type: DCU, - Name: Z100L, - Total: &collector.UnitValue{Unit: NUMBER, Value: freeNodes * dcuPerNode}, - } - - ch <- dcu - - done <- true - }() - - select { - case <-done: - return - case <-time.After(time.Duration(timeout) * time.Second): - return - } - - }() - - // 查询实时作业列表 - go func() { - defer wg.Done() - defer close(qCh) - done := make(chan bool) - go func() { - defer uwg.Done() - jobList, err := s.aCRpc.ListJob(ctx, nil) - if err != nil { - done <- true - return - } - - // running task num - run := &collector.Usage{} - run.Type = strings.ToUpper(RUNNINGTASK) - - if len(jobList.Jobs) == 0 { - var v int64 - run.Total = &collector.UnitValue{ - Unit: NUMBER, - Value: v, - } - - cresCh <- &collector.ClusterResource{Resource: run} - - done <- true - return - } else { - var v int64 - v = int64(len(jobList.Jobs)) - run.Total = &collector.UnitValue{ - Unit: NUMBER, - Value: v, - } - - cresCh <- &collector.ClusterResource{Resource: run} - } - - var cpureqed atomic.Int64 - var dcureqed atomic.Int64 - //var jwg sync.WaitGroup - //for _, j := range jobList.Jobs { - // jwg.Add(1) - // job := j - // go func() { - // defer jwg.Done() - // h := http.Request{} - // jreq := &hpcAC.JobDetailReq{ - // JobId: job.JobId, - // } - // detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq) - // if err != nil || detail.Data == nil { - // return - // } - // - // cpureqed.Add(int64(detail.Data.ProcNumReq)) - // dcureqed.Add(int64(detail.Data.DcuNumReq)) - // }() - //} - //jwg.Wait() - - for v := range ch { - switch v.Type { - case strings.ToUpper(CPU): - t, _ := v.Total.Value.(int64) - avail := t - cpureqed.Load() - cpu := &collector.Usage{ - Type: strings.ToUpper(CPU), - Name: v.Name, - Total: v.Total, - Available: &collector.UnitValue{Unit: CPUCORE, Value: avail}, - } - - qCh <- cpu - - case DCU: - t, _ := v.Total.Value.(int64) - avail := t - dcureqed.Load() - dcu := &collector.Usage{ - Type: DCU, - Name: v.Name, - Total: v.Total, - Available: &collector.UnitValue{Unit: CPUCORE, Value: avail}, - } - - qCh <- dcu - } - } - done <- true - }() - - select { - case <-done: - return - case <-time.After(time.Duration(timeout) * time.Second): - return - } - }() - - // 查询用户共享存储配额及使用量 - go func() { - defer wg.Done() - defer close(sch) - done := make(chan bool) - storage := &collector.Usage{} - go func() { - - diskReq := &hpcAC.ParaStorQuotaReq{} - diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) - if err != nil || diskResp.Data == nil { - done <- true - return - } - - totalStorage := common.RoundFloat(diskResp.Data[0].Threshold, 0) - availStorage := common.RoundFloat((diskResp.Data[0].Threshold - diskResp.Data[0].Usage), 0) - - storage.Type = STORAGE - storage.Name = DISK - storage.Total = &collector.UnitValue{ - Unit: GIGABYTE, - Value: totalStorage, - } - storage.Available = &collector.UnitValue{ - Unit: GIGABYTE, - Value: availStorage, - } - - done <- true - }() - - select { - case <-done: - sch <- storage - case <-time.After(time.Duration(timeout) * time.Second): - return - } - }() - - // 查询用户信息 - go func() { - defer uwg.Done() - done := make(chan bool) - cres := &collector.ClusterResource{} - go func() { - userReq := &hpcAC.GetUserInfoReq{} - userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq) - if err != nil || userinfo.Data == nil { - done <- true - return - } - balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) - bal := &collector.Usage{} - bal.Type = strings.ToUpper(BALANCE) - bal.Total = &collector.UnitValue{ - Unit: RMB, - Value: balance, - } - cres.Resource = bal - - done <- true - }() - - select { - case <-done: - cresCh <- cres - case <-time.After(time.Duration(timeout) * time.Second): - return - } - }() - - go func() { - uwg.Wait() - close(cresCh) - }() - - for v := range cresCh { - resources = append(resources, v) - } - - wg.Wait() - - cres := &collector.ClusterResource{} - bres := make([]*collector.Usage, 0) - if len(qCh) == 0 { - for v := range ch { - v.Available = v.Total - switch v.Type { - case DCU: - cres.Resource = v - case strings.ToUpper(CPU): - bres = append(bres, v) - } - } - } else { - for v := range qCh { - switch v.Type { - case DCU: - cres.Resource = v - case strings.ToUpper(CPU): - bres = append(bres, v) - } - } - } - - // temporarily set memory usage - //var dcuNum int + return nil, nil + //var timeout = 5 + //var wg sync.WaitGroup + //var uwg sync.WaitGroup + //wg.Add(3) + //uwg.Add(3) + //var ch = make(chan *collector.Usage, 2) + //var qCh = make(chan *collector.Usage, 2) + //var sch = make(chan *collector.Usage, 1) + //var cresCh = make(chan *collector.ClusterResource) // - //mem := &collector.Usage{ - // Type: strings.ToUpper(MEMORY), - // Name: strings.ToUpper(RAM), - // Total: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, - // Available: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, + //resUsage := &collector.ResourceSpec{ + // ClusterId: strconv.FormatInt(s.participantId, 10), //} - //vmem := &collector.Usage{ - // Type: strings.ToUpper(MEMORY), - // Name: strings.ToUpper(VRAM), - // Total: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, - // Available: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, + // + //var resources []interface{} + // + //// 查询用户可访问队列 + //go func() { + // defer wg.Done() + // defer close(ch) + // done := make(chan bool) + // go func() { + // defer uwg.Done() + // queueResp, err := s.aCRpc.SelectQueueByUser(ctx, nil) + // if err != nil { + // done <- true + // return + // } + // + // if len(queueResp.Data) == 0 { + // done <- true + // return + // } + // + // var data *hpcAC.QueueData + // for _, datum := range queueResp.Data { + // if datum.QueueName == RESOURCE_GROUP { + // data = datum + // break + // } + // } + // + // //rate + // queChargeRate, _ := strconv.ParseFloat(data.QueChargeRate, 64) + // rate := &collector.Usage{ + // Type: strings.ToUpper(RATE), + // Total: &collector.UnitValue{Unit: PERHOUR, Value: queChargeRate}, + // } + // + // cresCh <- &collector.ClusterResource{Resource: rate} + // + // var freeNodes int64 + // var cpuPerNode int64 + // var dcuPerNode int64 + // freeNodes, _ = strconv.ParseInt(data.QueFreeNodes, 10, 10) + // cpuPerNode, _ = strconv.ParseInt(data.QueMaxPPN, 10, 10) + // dcuPerNode, _ = strconv.ParseInt(data.QueMaxDcuPN, 10, 10) + // + // cpu := &collector.Usage{ + // Type: strings.ToUpper(CPU), + // Total: &collector.UnitValue{Unit: CPUCORE, Value: freeNodes * cpuPerNode}, + // } + // + // ch <- cpu + // + // dcu := &collector.Usage{ + // Type: DCU, + // Name: Z100L, + // Total: &collector.UnitValue{Unit: NUMBER, Value: freeNodes * dcuPerNode}, + // } + // + // ch <- dcu + // + // done <- true + // }() + // + // select { + // case <-done: + // return + // case <-time.After(time.Duration(timeout) * time.Second): + // return + // } + // + //}() + // + //// 查询实时作业列表 + //go func() { + // defer wg.Done() + // defer close(qCh) + // done := make(chan bool) + // go func() { + // defer uwg.Done() + // jobList, err := s.aCRpc.ListJob(ctx, nil) + // if err != nil { + // done <- true + // return + // } + // + // // running task num + // run := &collector.Usage{} + // run.Type = strings.ToUpper(RUNNINGTASK) + // + // if len(jobList.Jobs) == 0 { + // var v int64 + // run.Total = &collector.UnitValue{ + // Unit: NUMBER, + // Value: v, + // } + // + // cresCh <- &collector.ClusterResource{Resource: run} + // + // done <- true + // return + // } else { + // var v int64 + // v = int64(len(jobList.Jobs)) + // run.Total = &collector.UnitValue{ + // Unit: NUMBER, + // Value: v, + // } + // + // cresCh <- &collector.ClusterResource{Resource: run} + // } + // + // var cpureqed atomic.Int64 + // var dcureqed atomic.Int64 + // //var jwg sync.WaitGroup + // //for _, j := range jobList.Jobs { + // // jwg.Add(1) + // // job := j + // // go func() { + // // defer jwg.Done() + // // h := http.Request{} + // // jreq := &hpcAC.JobDetailReq{ + // // JobId: job.JobId, + // // } + // // detail, err := s.aCRpc.GetJobDetail(h.Context(), jreq) + // // if err != nil || detail.Data == nil { + // // return + // // } + // // + // // cpureqed.Add(int64(detail.Data.ProcNumReq)) + // // dcureqed.Add(int64(detail.Data.DcuNumReq)) + // // }() + // //} + // //jwg.Wait() + // + // for v := range ch { + // switch v.Type { + // case strings.ToUpper(CPU): + // t, _ := v.Total.Value.(int64) + // avail := t - cpureqed.Load() + // cpu := &collector.Usage{ + // Type: strings.ToUpper(CPU), + // Name: v.Name, + // Total: v.Total, + // Available: &collector.UnitValue{Unit: CPUCORE, Value: avail}, + // } + // + // qCh <- cpu + // + // case DCU: + // t, _ := v.Total.Value.(int64) + // avail := t - dcureqed.Load() + // dcu := &collector.Usage{ + // Type: DCU, + // Name: v.Name, + // Total: v.Total, + // Available: &collector.UnitValue{Unit: CPUCORE, Value: avail}, + // } + // + // qCh <- dcu + // } + // } + // done <- true + // }() + // + // select { + // case <-done: + // return + // case <-time.After(time.Duration(timeout) * time.Second): + // return + // } + //}() + // + //// 查询用户共享存储配额及使用量 + //go func() { + // defer wg.Done() + // defer close(sch) + // done := make(chan bool) + // storage := &collector.Usage{} + // go func() { + // + // diskReq := &hpcAC.ParaStorQuotaReq{} + // diskResp, err := s.aCRpc.ParaStorQuota(ctx, diskReq) + // if err != nil || diskResp.Data == nil { + // done <- true + // return + // } + // + // totalStorage := common.RoundFloat(diskResp.Data[0].Threshold, 0) + // availStorage := common.RoundFloat((diskResp.Data[0].Threshold - diskResp.Data[0].Usage), 0) + // + // storage.Type = STORAGE + // storage.Name = DISK + // storage.Total = &collector.UnitValue{ + // Unit: GIGABYTE, + // Value: totalStorage, + // } + // storage.Available = &collector.UnitValue{ + // Unit: GIGABYTE, + // Value: availStorage, + // } + // + // done <- true + // }() + // + // select { + // case <-done: + // sch <- storage + // case <-time.After(time.Duration(timeout) * time.Second): + // return + // } + //}() + // + //// 查询用户信息 + //go func() { + // defer uwg.Done() + // done := make(chan bool) + // cres := &collector.ClusterResource{} + // go func() { + // userReq := &hpcAC.GetUserInfoReq{} + // userinfo, err := s.aCRpc.GetUserInfo(ctx, userReq) + // if err != nil || userinfo.Data == nil { + // done <- true + // return + // } + // balance, _ := strconv.ParseFloat(userinfo.Data.AccountBalance, 64) + // bal := &collector.Usage{} + // bal.Type = strings.ToUpper(BALANCE) + // bal.Total = &collector.UnitValue{ + // Unit: RMB, + // Value: balance, + // } + // cres.Resource = bal + // + // done <- true + // }() + // + // select { + // case <-done: + // cresCh <- cres + // case <-time.After(time.Duration(timeout) * time.Second): + // return + // } + //}() + // + //go func() { + // uwg.Wait() + // close(cresCh) + //}() + // + //for v := range cresCh { + // resources = append(resources, v) //} - //bres = append(bres, mem) - //bres = append(bres, vmem) - - for v := range sch { - bres = append(bres, v) - } - - cres.BaseResources = bres - resources = append(resources, cres) - resUsage.Resources = resources - - return resUsage, nil + // + //wg.Wait() + // + //cres := &collector.ClusterResource{} + //bres := make([]*collector.Usage, 0) + //if len(qCh) == 0 { + // for v := range ch { + // v.Available = v.Total + // switch v.Type { + // case DCU: + // cres.Resource = v + // case strings.ToUpper(CPU): + // bres = append(bres, v) + // } + // } + //} else { + // for v := range qCh { + // switch v.Type { + // case DCU: + // cres.Resource = v + // case strings.ToUpper(CPU): + // bres = append(bres, v) + // } + // } + //} + // + //// temporarily set memory usage + ////var dcuNum int + //// + ////mem := &collector.Usage{ + //// Type: strings.ToUpper(MEMORY), + //// Name: strings.ToUpper(RAM), + //// Total: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, + //// Available: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, + ////} + ////vmem := &collector.Usage{ + //// Type: strings.ToUpper(MEMORY), + //// Name: strings.ToUpper(VRAM), + //// Total: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, + //// Available: &collector.UnitValue{Unit: GIGABYTE, Value: 2 * RAM_SIZE_1G}, + ////} + ////bres = append(bres, mem) + ////bres = append(bres, vmem) + // + //for v := range sch { + // bres = append(bres, v) + //} + // + //cres.BaseResources = bres + //resources = append(resources, cres) + //resUsage.Resources = resources + // + //return resUsage, nil }