Signed-off-by: devad <cossjie@foxmail.com>
Former-commit-id: 04346b58bc
scheduler_restruct
| @@ -19,10 +19,6 @@ import ( | |||||
| ) | ) | ||||
| func AddCronGroup(svc *svc.ServiceContext) { | func AddCronGroup(svc *svc.ServiceContext) { | ||||
| // 同步任务信息到core端 | |||||
| svc.Cron.AddFunc("*/5 * * * * ?", func() { | |||||
| SyncParticipantRpc(svc) | |||||
| }) | |||||
| // 删除三天前的监控信息 | // 删除三天前的监控信息 | ||||
| svc.Cron.AddFunc("0 0 0 ? * ? ", func() { | svc.Cron.AddFunc("0 0 0 ? * ? ", func() { | ||||
| ClearMetricsData(svc) | ClearMetricsData(svc) | ||||
| @@ -1,54 +0,0 @@ | |||||
| /* | |||||
| Copyright (c) [2023] [pcm] | |||||
| [pcm-coordinator] is licensed under Mulan PSL v2. | |||||
| You can use this software according to the terms and conditions of the Mulan PSL v2. | |||||
| You may obtain a copy of Mulan PSL v2 at: | |||||
| http://license.coscl.org.cn/MulanPSL2 | |||||
| THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | |||||
| EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | |||||
| MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | |||||
| See the Mulan PSL v2 for more details. | |||||
| */ | |||||
| package cron | |||||
| import ( | |||||
| "github.com/zeromicro/go-zero/zrpc" | |||||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" | |||||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" | |||||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" | |||||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/tracker" | |||||
| "gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient" | |||||
| ) | |||||
| func SyncParticipantRpc(svc *svc.ServiceContext) { | |||||
| // 查询出所有p端信息 | |||||
| var participants []*models.ScParticipantPhyInfo | |||||
| tx := svc.DbEngin.Find(&participants) | |||||
| if tx.Error != nil { | |||||
| } | |||||
| for _, participant := range participants { | |||||
| if len(participant.RpcAddress) != 0 && svc.K8sRpc[participant.Id] == nil { | |||||
| switch participant.Type { | |||||
| case constants.CLOUD, "SEALOS": | |||||
| // 初始化p端rpc客户端 | |||||
| svc.K8sRpc[participant.Id] = kubernetesclient.NewKubernetes(zrpc.MustNewClient(zrpc.RpcClientConf{ | |||||
| Endpoints: []string{participant.RpcAddress}, | |||||
| NonBlock: true, | |||||
| })) | |||||
| // 初始化p端prometheus client | |||||
| promClient, err := tracker.NewPrometheus(participant.MetricsUrl) | |||||
| if err != nil { | |||||
| return | |||||
| } | |||||
| svc.PromClient[participant.Id] = promClient | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| @@ -27,7 +27,7 @@ func NewAppDetailLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AppDeta | |||||
| func (l *AppDetailLogic) AppDetail(req *types.AppDetailReq) (resp *kubernetes.AppDetailResp, err error) { | func (l *AppDetailLogic) AppDetail(req *types.AppDetailReq) (resp *kubernetes.AppDetailResp, err error) { | ||||
| resp = &kubernetes.AppDetailResp{} | resp = &kubernetes.AppDetailResp{} | ||||
| //调用p端接口查询应用详情 | //调用p端接口查询应用详情 | ||||
| appDetail, err := l.svcCtx.K8sRpc[1727670068428410880].GetAppDetail(context.Background(), &kubernetes.AppDetailReq{ | |||||
| appDetail, err := l.svcCtx.K8sRpc.GetAppDetail(context.Background(), &kubernetes.AppDetailReq{ | |||||
| Namespace: req.NsID, | Namespace: req.NsID, | ||||
| Name: req.Name, | Name: req.Name, | ||||
| }) | }) | ||||
| @@ -27,7 +27,7 @@ func NewAppPodsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *AppPodsLo | |||||
| func (l *AppPodsLogic) AppPods(req *types.AppDetailReq) (resp *kubernetes.PodDetailResp, err error) { | func (l *AppPodsLogic) AppPods(req *types.AppDetailReq) (resp *kubernetes.PodDetailResp, err error) { | ||||
| resp = &kubernetes.PodDetailResp{} | resp = &kubernetes.PodDetailResp{} | ||||
| //调用p端接口查询应用详情 | //调用p端接口查询应用详情 | ||||
| podList, err := l.svcCtx.K8sRpc[1727670068428410880].GetAppPodsByAppName(context.Background(), &kubernetes.AppDetailReq{ | |||||
| podList, err := l.svcCtx.K8sRpc.GetAppPodsByAppName(context.Background(), &kubernetes.AppDetailReq{ | |||||
| Namespace: req.NsID, | Namespace: req.NsID, | ||||
| Name: req.Name, | Name: req.Name, | ||||
| }) | }) | ||||
| @@ -30,7 +30,7 @@ func NewNoticeTenantLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Noti | |||||
| func (l *NoticeTenantLogic) NoticeTenant() (resp *types.CloudResp, err error) { | func (l *NoticeTenantLogic) NoticeTenant() (resp *types.CloudResp, err error) { | ||||
| var tenants []*models.ScTenantInfo | var tenants []*models.ScTenantInfo | ||||
| //从p端kubernetes获取租户信息 | //从p端kubernetes获取租户信息 | ||||
| namespace, err := l.svcCtx.K8sRpc[1727670068428410880].ListNamespace(context.Background(), &kubernetes.NamespaceListReq{}) | |||||
| namespace, err := l.svcCtx.K8sRpc.ListNamespace(context.Background(), &kubernetes.NamespaceListReq{}) | |||||
| if err != nil { | if err != nil { | ||||
| logx.Errorf("获取租户信息失败:%v", err) | logx.Errorf("获取租户信息失败:%v", err) | ||||
| return nil, err | return nil, err | ||||
| @@ -28,7 +28,7 @@ func NewUpdateTenantLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Upda | |||||
| func (l *UpdateTenantLogic) UpdateTenant(req *types.UpdateTenantReq) (resp *types.CloudResp, err error) { | func (l *UpdateTenantLogic) UpdateTenant(req *types.UpdateTenantReq) (resp *types.CloudResp, err error) { | ||||
| var tenants []*models.ScTenantInfo | var tenants []*models.ScTenantInfo | ||||
| //从p端kubernetes获取租户信息 | //从p端kubernetes获取租户信息 | ||||
| namespace, err := l.svcCtx.K8sRpc[1696118513460056064].ListNamespace(context.Background(), &kubernetes.NamespaceListReq{}) | |||||
| namespace, err := l.svcCtx.K8sRpc.ListNamespace(context.Background(), &kubernetes.NamespaceListReq{}) | |||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| @@ -46,7 +46,7 @@ func (l *TaskDetailLogic) TaskDetail(req *types.TaskDetailReq) (resp *types.Task | |||||
| var clouds []models.Cloud | var clouds []models.Cloud | ||||
| l.svcCtx.DbEngin.Where("task_id = ?", req.TaskId).Find(&clouds) | l.svcCtx.DbEngin.Where("task_id = ?", req.TaskId).Find(&clouds) | ||||
| for _, cloud := range clouds { | for _, cloud := range clouds { | ||||
| if l.svcCtx.K8sRpc[cloud.ParticipantId] == nil { | |||||
| if l.svcCtx.K8sRpc == nil { | |||||
| continue | continue | ||||
| } | } | ||||
| // 查询监控地址 | // 查询监控地址 | ||||
| @@ -57,7 +57,7 @@ func (l *TaskDetailLogic) TaskDetail(req *types.TaskDetailReq) (resp *types.Task | |||||
| var pods []*kubernetesclient.Pod | var pods []*kubernetesclient.Pod | ||||
| switch cloud.Kind { | switch cloud.Kind { | ||||
| case "Job": | case "Job": | ||||
| jobResult, err := l.svcCtx.K8sRpc[cloud.ParticipantId].JobDetail(context.Background(), &kubernetesclient.JobDetailReq{ | |||||
| jobResult, err := l.svcCtx.K8sRpc.JobDetail(context.Background(), &kubernetesclient.JobDetailReq{ | |||||
| Namespace: cloud.Namespace, | Namespace: cloud.Namespace, | ||||
| Name: cloud.Name, | Name: cloud.Name, | ||||
| }) | }) | ||||
| @@ -67,7 +67,7 @@ func (l *TaskDetailLogic) TaskDetail(req *types.TaskDetailReq) (resp *types.Task | |||||
| // 查询出job下关联的pod列表 | // 查询出job下关联的pod列表 | ||||
| uid := jobResult.Job.Metadata.Labels["controller-uid"] | uid := jobResult.Job.Metadata.Labels["controller-uid"] | ||||
| LabelSelector := "controller-uid=" + uid | LabelSelector := "controller-uid=" + uid | ||||
| podResp, err := l.svcCtx.K8sRpc[cloud.ParticipantId].PodList(context.Background(), &kubernetesclient.PodListReq{ | |||||
| podResp, err := l.svcCtx.K8sRpc.PodList(context.Background(), &kubernetesclient.PodListReq{ | |||||
| ListOptions: &kubernetesclient.ListOptions{ | ListOptions: &kubernetesclient.ListOptions{ | ||||
| LabelSelector: &LabelSelector, | LabelSelector: &LabelSelector, | ||||
| }, | }, | ||||
| @@ -78,7 +78,7 @@ func (l *TaskDetailLogic) TaskDetail(req *types.TaskDetailReq) (resp *types.Task | |||||
| pods = podResp.PodList.Items | pods = podResp.PodList.Items | ||||
| case "Deployment": | case "Deployment": | ||||
| deploymentResult, err := l.svcCtx.K8sRpc[cloud.ParticipantId].DeploymentDetail(context.Background(), &kubernetesclient.DeploymentDetailReq{ | |||||
| deploymentResult, err := l.svcCtx.K8sRpc.DeploymentDetail(context.Background(), &kubernetesclient.DeploymentDetailReq{ | |||||
| Namespace: cloud.Namespace, | Namespace: cloud.Namespace, | ||||
| Name: cloud.Name, | Name: cloud.Name, | ||||
| }) | }) | ||||
| @@ -89,7 +89,7 @@ func (l *TaskDetailLogic) TaskDetail(req *types.TaskDetailReq) (resp *types.Task | |||||
| uid := deploymentResult.Deployment.Spec.Selector.MatchLabels["app"] | uid := deploymentResult.Deployment.Spec.Selector.MatchLabels["app"] | ||||
| LabelSelector := "app=" + uid | LabelSelector := "app=" + uid | ||||
| podResp, err := l.svcCtx.K8sRpc[cloud.ParticipantId].PodList(context.Background(), &kubernetesclient.PodListReq{ | |||||
| podResp, err := l.svcCtx.K8sRpc.PodList(context.Background(), &kubernetesclient.PodListReq{ | |||||
| ListOptions: &kubernetesclient.ListOptions{ | ListOptions: &kubernetesclient.ListOptions{ | ||||
| LabelSelector: &LabelSelector, | LabelSelector: &LabelSelector, | ||||
| }, | }, | ||||
| @@ -57,7 +57,7 @@ type ServiceContext struct { | |||||
| DockerClient *client.Client | DockerClient *client.Client | ||||
| Downloader *s3manager.Downloader | Downloader *s3manager.Downloader | ||||
| Uploader *s3manager.Uploader | Uploader *s3manager.Uploader | ||||
| K8sRpc map[int64]kubernetesclient.Kubernetes | |||||
| K8sRpc kubernetesclient.Kubernetes | |||||
| PromClient map[int64]tracker.Prometheus | PromClient map[int64]tracker.Prometheus | ||||
| ParticipantRpc participantservice.ParticipantService | ParticipantRpc participantservice.ParticipantService | ||||
| } | } | ||||
| @@ -115,7 +115,7 @@ func NewServiceContext(c config.Config) *ServiceContext { | |||||
| ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), | ACRpc: hpcacclient.NewHpcAC(zrpc.MustNewClient(c.ACRpcConf)), | ||||
| OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), | OctopusRpc: octopusclient.NewOctopus(zrpc.MustNewClient(c.OctopusRpcConf)), | ||||
| OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), | OpenstackRpc: openstackclient.NewOpenstack(zrpc.MustNewClient(c.OpenstackRpcConf)), | ||||
| K8sRpc: make(map[int64]kubernetesclient.Kubernetes), | |||||
| K8sRpc: kubernetesclient.NewKubernetes(zrpc.MustNewClient(c.K8sNativeConf)), | |||||
| PromClient: make(map[int64]tracker.Prometheus), | PromClient: make(map[int64]tracker.Prometheus), | ||||
| ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)), | ParticipantRpc: participantservice.NewParticipantService(zrpc.MustNewClient(c.PcmCoreRpcConf)), | ||||
| DockerClient: dockerClient, | DockerClient: dockerClient, | ||||