|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- package database
-
- import (
- "github.com/zeromicro/go-zero/core/logx"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
- "gorm.io/gorm"
- "strconv"
- "time"
- )
-
- type AiStorage struct {
- DbEngin *gorm.DB
- }
-
- func (s *AiStorage) GetParticipants() (*types.ClusterListResp, error) {
- var resp types.ClusterListResp
- tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL ORDER BY create_time Desc").Scan(&resp.List)
- if tx.Error != nil {
- logx.Errorf(tx.Error.Error())
- return nil, tx.Error
- }
- return &resp, nil
- }
-
- func (s *AiStorage) GetClustersByAdapterId(id string) (*types.ClusterListResp, error) {
- var resp types.ClusterListResp
- tx := s.DbEngin.Raw("select * from t_cluster where `deleted_at` IS NULL and `adapter_id` = ? ORDER BY create_time Desc", id).Scan(&resp.List)
- if tx.Error != nil {
- logx.Errorf(tx.Error.Error())
- return nil, tx.Error
- }
- return &resp, nil
- }
-
- func (s *AiStorage) GetAdapterIdsByType(adapterType string) ([]string, error) {
- var list []types.AdapterInfo
- var ids []string
- db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter")
- db = db.Where("type = ?", adapterType)
- err := db.Order("create_time desc").Find(&list).Error
- if err != nil {
- return nil, err
- }
- for _, info := range list {
- ids = append(ids, info.Id)
- }
- return ids, nil
- }
-
- func (s *AiStorage) GetAdaptersByType(adapterType string) ([]*types.AdapterInfo, error) {
- var list []*types.AdapterInfo
- db := s.DbEngin.Model(&types.AdapterInfo{}).Table("t_adapter")
- db = db.Where("type = ?", adapterType)
- err := db.Order("create_time desc").Find(&list).Error
- if err != nil {
- return nil, err
- }
- return list, nil
- }
-
- func (s *AiStorage) GetAiTasksByAdapterId(adapterId string) ([]*models.TaskAi, error) {
- var resp []*models.TaskAi
- tx := s.DbEngin.Raw("select * from task_ai where `adapter_id` = ? ", adapterId).Scan(&resp)
- if tx.Error != nil {
- logx.Errorf(tx.Error.Error())
- return nil, tx.Error
- }
- return resp, nil
- }
-
- func (s *AiStorage) SaveTask(name string, strategyCode int64, synergyStatus int64) (int64, error) {
- // 构建主任务结构体
- taskModel := models.Task{
- Status: constants.Saved,
- Description: "ai task",
- Name: name,
- SynergyStatus: synergyStatus,
- Strategy: strategyCode,
- AdapterTypeDict: 1,
- CommitTime: time.Now(),
- }
- // 保存任务数据到数据库
- tx := s.DbEngin.Create(&taskModel)
- if tx.Error != nil {
- return 0, tx.Error
- }
- return taskModel.Id, nil
- }
-
- func (s *AiStorage) SaveAiTask(taskId int64, option *option.AiOption, clusterId string, jobId string, status string, msg string) error {
- // 构建主任务结构体
- aId, err := strconv.ParseInt(option.AdapterId, 10, 64)
- if err != nil {
- return err
- }
- cId, err := strconv.ParseInt(clusterId, 10, 64)
- if err != nil {
- return err
- }
- aiTaskModel := models.TaskAi{
- TaskId: taskId,
- AdapterId: aId,
- ClusterId: cId,
- Name: option.TaskName,
- Replica: int64(option.Replica),
- JobId: jobId,
- TaskType: option.TaskType,
- Strategy: option.StrategyName,
- Status: status,
- Msg: msg,
- CommitTime: time.Now(),
- }
- // 保存任务数据到数据库
- tx := s.DbEngin.Create(&aiTaskModel)
- if tx.Error != nil {
- return tx.Error
- }
- return nil
- }
-
- func (s *AiStorage) SaveClusterTaskQueue(adapterId string, clusterId string, queueNum int64) error {
- aId, err := strconv.ParseInt(adapterId, 10, 64)
- if err != nil {
- return err
- }
- cId, err := strconv.ParseInt(clusterId, 10, 64)
- if err != nil {
- return err
- }
- taskQueue := models.TClusterTaskQueue{
- AdapterId: aId,
- ClusterId: cId,
- QueueNum: queueNum,
- }
- tx := s.DbEngin.Create(&taskQueue)
- if tx.Error != nil {
- return tx.Error
- }
- return nil
- }
-
- func (s *AiStorage) GetClusterTaskQueues(adapterId string, clusterId string) ([]*models.TClusterTaskQueue, error) {
- var taskQueues []*models.TClusterTaskQueue
- tx := s.DbEngin.Raw("select * from t_cluster_task_queue where `adapter_id` = ? and `cluster_id` = ?", adapterId, clusterId).Scan(&taskQueues)
- if tx.Error != nil {
- logx.Errorf(tx.Error.Error())
- return nil, tx.Error
- }
- return taskQueues, nil
- }
-
- func (s *AiStorage) GetAiTaskIdByClusterIdAndTaskId(clusterId string, taskId string) (string, error) {
- var aiTask models.TaskAi
- tx := s.DbEngin.Raw("select * from task_ai where `cluster_id` = ? and `task_id` = ?", clusterId, taskId).Scan(&aiTask)
- if tx.Error != nil {
- logx.Errorf(tx.Error.Error())
- return "", tx.Error
- }
- return aiTask.JobId, nil
- }
-
- func (s *AiStorage) GetClusterResourcesById(clusterId string) (*models.TClusterResource, error) {
- var clusterResource models.TClusterResource
- tx := s.DbEngin.Raw("select * from t_cluster_resource where `cluster_id` = ?", clusterId).Scan(&clusterResource)
- if tx.Error != nil {
- logx.Errorf(tx.Error.Error())
- return nil, tx.Error
- }
- return &clusterResource, nil
- }
-
- func (s *AiStorage) SaveClusterResources(clusterId string, clusterName string, clusterType int64, cpuAvail float64, cpuTotal float64,
- memAvail float64, memTotal float64, diskAvail float64, diskTotal float64, gpuAvail float64, gpuTotal float64, cardTotal int64, topsTotal float64) error {
- cId, err := strconv.ParseInt(clusterId, 10, 64)
- if err != nil {
- return err
- }
- clusterResource := models.TClusterResource{
- ClusterId: cId,
- ClusterName: clusterName,
- ClusterType: clusterType,
- CpuAvail: cpuAvail,
- CpuTotal: cpuTotal,
- MemAvail: memAvail,
- MemTotal: memTotal,
- DiskAvail: diskAvail,
- DiskTotal: diskTotal,
- GpuAvail: gpuAvail,
- GpuTotal: gpuTotal,
- CardTotal: cardTotal,
- CardTopsTotal: topsTotal,
- }
- tx := s.DbEngin.Create(&clusterResource)
- if tx.Error != nil {
- return tx.Error
- }
- return nil
- }
-
- func (s *AiStorage) UpdateClusterResources(clusterResource *models.TClusterResource) error {
- tx := s.DbEngin.Updates(clusterResource)
- if tx.Error != nil {
- return tx.Error
- }
- return nil
- }
-
- func (s *AiStorage) UpdateAiTask(task *models.TaskAi) error {
- tx := s.DbEngin.Updates(task)
- if tx.Error != nil {
- return tx.Error
- }
- return nil
- }
-
- func (s *AiStorage) GetStrategyCode(name string) (int64, error) {
- var strategy int64
- sqlStr := `select t_dict_item.item_value
- from t_dict
- left join t_dict_item on t_dict.id = t_dict_item.dict_id
- where item_text = ?
- and t_dict.dict_code = 'schedule_Strategy'`
- //查询调度策略
- err := s.DbEngin.Raw(sqlStr, name).Scan(&strategy).Error
- if err != nil {
- return strategy, nil
- }
- return strategy, nil
- }
|