| @@ -3,6 +3,7 @@ package schedule | |||
| import ( | |||
| "context" | |||
| "fmt" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity" | |||
| "slices" | |||
| "strings" | |||
| "time" | |||
| @@ -26,11 +27,6 @@ const ( | |||
| QUERY_RESOURCE_RETRY = 3 | |||
| ) | |||
| type ClustersWithDataDistributes struct { | |||
| Clusters []*strategy.AssignedCluster | |||
| DataDistributes *types.DataDistribute | |||
| } | |||
| type ScheduleCreateTaskLogic struct { | |||
| logx.Logger | |||
| ctx context.Context | |||
| @@ -47,14 +43,14 @@ func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) | |||
| } | |||
| } | |||
| func generateFilteredDataDistributes(clusters []*strategy.AssignedCluster, distribute types.DataDistribute) *ClustersWithDataDistributes { | |||
| func generateFilteredDataDistributes(clusters []*strategy.AssignedCluster, distribute types.DataDistribute) *entity.ClustersWithDataDistributes { | |||
| var clusterIds []string | |||
| for _, c := range clusters { | |||
| clusterIds = append(clusterIds, c.ClusterId) | |||
| } | |||
| clustersWithDataDistributes := &ClustersWithDataDistributes{ | |||
| clustersWithDataDistributes := &entity.ClustersWithDataDistributes{ | |||
| Clusters: clusters, | |||
| DataDistributes: &types.DataDistribute{ | |||
| Dataset: make([]*types.DatasetDistribute, 0), | |||
| @@ -263,7 +259,7 @@ func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types | |||
| return assignedClusters, nil | |||
| } | |||
| func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, userId int64, strategyName string, clustersWithDataDistributes *ClustersWithDataDistributes, token string, userIp string, userName string) (int64, error) { | |||
| func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, userId int64, strategyName string, clustersWithDataDistributes *entity.ClustersWithDataDistributes, token string, userIp string, userName string) (int64, error) { | |||
| var synergyStatus int64 | |||
| if len(clustersWithDataDistributes.Clusters) > 1 { | |||
| synergyStatus = 1 | |||
| @@ -60,7 +60,7 @@ func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *typ | |||
| } | |||
| } | |||
| var clustersWithDataDistributes ClustersWithDataDistributes | |||
| var clustersWithDataDistributes entity.ClustersWithDataDistributes | |||
| err = yaml.Unmarshal([]byte(task.YamlString), &clustersWithDataDistributes) | |||
| if err != nil { | |||
| return nil, err | |||
| @@ -128,7 +128,7 @@ func (l *ScheduleRunTaskLogic) SaveResult(task *models.Task, results []*schedule | |||
| } | |||
| func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *ClustersWithDataDistributes, scheduledDatas []*types.DataScheduleResults) ([]*strategy.AssignedCluster, error) { | |||
| func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *entity.ClustersWithDataDistributes, scheduledDatas []*types.DataScheduleResults) ([]*strategy.AssignedCluster, error) { | |||
| assignedClusters := make([]*strategy.AssignedCluster, 0) | |||
| if len(scheduledDatas) == 0 { | |||
| @@ -3,6 +3,9 @@ package xjlab | |||
| import ( | |||
| "context" | |||
| "fmt" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity" | |||
| "gopkg.in/yaml.v2" | |||
| "strings" | |||
| "github.com/pkg/errors" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| @@ -87,3 +90,45 @@ func (l *TaskResourceUsageLogic) GetHpcTaskResourceUsage(req *types.FId) (resp i | |||
| return resp, nil | |||
| } | |||
| func (l *TaskResourceUsageLogic) GetAiTaskResourceUsage(task *models.Task) (resp interface{}, err error) { | |||
| var clustersWithDataDistributes entity.ClustersWithDataDistributes | |||
| err = yaml.Unmarshal([]byte(task.YamlString), &clustersWithDataDistributes) | |||
| if err != nil { | |||
| return nil, err | |||
| } | |||
| rs := struct { | |||
| Card string `json:"card,optional"` | |||
| Cpu string `json:"cpu,optional"` | |||
| Memory string `json:"memory,optional"` | |||
| }{} | |||
| resourcesRequired := clustersWithDataDistributes.Clusters[0].ResourcesRequired | |||
| for _, res := range resourcesRequired { | |||
| //typeName, ok := res["type"] | |||
| //if !ok { | |||
| // continue | |||
| //} | |||
| name, ok := res["name"] | |||
| if !ok { | |||
| continue | |||
| } | |||
| if str, ok := name.(string); ok { | |||
| name = strings.ToLower(str) | |||
| } else { | |||
| continue | |||
| } | |||
| num, ok := res["number"] | |||
| if !ok { | |||
| continue | |||
| } | |||
| if str, ok := num.(string); ok { | |||
| num = strings.ToLower(str) | |||
| } else { | |||
| continue | |||
| } | |||
| } | |||
| resp = rs | |||
| return resp, nil | |||
| } | |||
| @@ -1,5 +1,10 @@ | |||
| package entity | |||
| import ( | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" | |||
| "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" | |||
| ) | |||
| type ProviderParams struct { | |||
| Disk_avail float64 | |||
| Mem_avail float64 | |||
| @@ -16,3 +21,8 @@ type JsonData struct { | |||
| Name string `json:"name"` | |||
| Id string `json:"id"` | |||
| } | |||
| type ClustersWithDataDistributes struct { | |||
| Clusters []*strategy.AssignedCluster | |||
| DataDistributes *types.DataDistribute | |||
| } | |||