|
- /*
-
- Copyright (c) [2023] [pcm]
- [pcm-coordinator] is licensed under Mulan PSL v2.
- You can use this software according to the terms and conditions of the Mulan PSL v2.
- You may obtain a copy of Mulan PSL v2 at:
- http://license.coscl.org.cn/MulanPSL2
- THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
- EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
- MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
- See the Mulan PSL v2 for more details.
-
- */
-
- package nacos
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/JCCE-nudt/zero-contrib/zrpc/registry/nacos"
- "github.com/nacos-group/nacos-sdk-go/v2/clients"
- "github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
- "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_cache"
- "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy"
- "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
- "github.com/nacos-group/nacos-sdk-go/v2/common/http_agent"
- "github.com/nacos-group/nacos-sdk-go/v2/common/nacos_server"
- "github.com/nacos-group/nacos-sdk-go/v2/common/security"
- "github.com/nacos-group/nacos-sdk-go/v2/vo"
- "github.com/zeromicro/go-zero/core/logx"
- "github.com/zeromicro/go-zero/rest"
- "github.com/zeromicro/go-zero/zrpc"
- nacosVo "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/helper/nacos/vo"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
- "net/http"
- "sync"
- )
-
- type (
- // BootstrapConfig is a wrapper whose only field is the NacosConfig.
- BootstrapConfig struct {
- NacosConfig NacosConfig
- }
-
- // ListenConfig is the callback type accepted by NacosConfig.InitConfig; it is
- // invoked with the new configuration content when a non-empty change arrives.
- ListenConfig func(data string)
-
- // NacosServerConfig is the address of one Nacos server. buildConfig copies
- // IpAddr and Port into a constant.ServerConfig.
- NacosServerConfig struct {
- IpAddr string
- Port uint64
- }
-
- // NacosClientConfig holds the client-side settings that buildConfig copies
- // field by field into a constant.ClientConfig.
- NacosClientConfig struct {
- NamespaceId string
- TimeoutMs uint64
- NotLoadCacheAtStart bool
- LogDir string
- CacheDir string
- LogLevel string
- }
-
- // NacosConfig groups the Nacos server list, the client settings, and the
- // DataId/Group pair that InitConfig uses to read and watch a configuration.
- // Group is also applied to the registration options in Discovery.
- NacosConfig struct {
- ServerConfigs []NacosServerConfig
- ClientConfig NacosClientConfig
- DataId string
- Group string
- }
- )
-
- // NamingClient embeds nacos_client.INacosClient and carries a context with its
- // cancel function, a naming proxy, a service-info cache holder and the client
- // configuration.
- //
- // NOTE(review): nothing in the visible part of this file constructs or uses
- // this type; confirm whether it is still needed.
- type NamingClient struct {
- nacos_client.INacosClient
- ctx context.Context
- cancel context.CancelFunc
- serviceProxy naming_proxy.INamingProxy
- serviceInfoHolder *naming_cache.ServiceInfoHolder
- clientConfig constant.ClientConfig
- }
-
- // Discovery registers the RPC server described by c with Nacos.
- //
- // The registration options are built from c.Name, c.ListenOn and the
- // server/client configuration produced by buildConfig, and the group is set
- // to n.Group. It panics when buildConfig panics (empty server list) or when
- // nacos.RegisterService returns an error.
- func (n *NacosConfig) Discovery(c *zrpc.RpcServerConf) {
- 	serverConfigs, clientConfig := n.buildConfig()
-
- 	registration := nacos.NewNacosConfig(c.Name, c.ListenOn, serverConfigs, &clientConfig)
- 	registration.Group = n.Group
-
- 	if regErr := nacos.RegisterService(registration); regErr != nil {
- 		panic(regErr)
- 	}
- }
-
- // DiscoveryRest registers the REST server described by c with Nacos.
- //
- // The service address passed to the registration options is "Host:Port"
- // taken from c. It panics when buildConfig panics (empty server list) or when
- // nacos.RegisterService returns an error.
- //
- // NOTE(review): unlike Discovery, this method does not copy n.Group into the
- // registration options - confirm whether that difference is intentional.
- func (n *NacosConfig) DiscoveryRest(c *rest.RestConf) {
- 	serverConfigs, clientConfig := n.buildConfig()
-
- 	listenOn := fmt.Sprintf("%s:%d", c.Host, c.Port)
- 	registration := nacos.NewNacosConfig(c.Name, listenOn, serverConfigs, &clientConfig)
-
- 	if regErr := nacos.RegisterService(registration); regErr != nil {
- 		panic(regErr)
- 	}
- }
-
- // InitConfig reads the configuration identified by n.DataId and n.Group from
- // Nacos, registers a change listener for the same DataId/Group, and returns
- // the content that was read.
- //
- // listenConfigCallback is invoked with the new content each time the listener
- // reports a non-empty change. A nil callback is tolerated: changes are then
- // ignored instead of crashing the process with a nil function call.
- //
- // The method panics when the config client cannot be created, when reading
- // the configuration or registering the listener fails, or when the content
- // that was read is empty.
- func (n *NacosConfig) InitConfig(listenConfigCallback ListenConfig) string {
- 	// Nacos server and client settings.
- 	sc, cc := n.buildConfig()
-
- 	pa := vo.NacosClientParam{
- 		ClientConfig:  &cc,
- 		ServerConfigs: sc,
- 	}
- 	configClient, err := clients.NewConfigClient(pa)
- 	if err != nil {
- 		panic(err)
- 	}
-
- 	// Fetch the current content from the config center.
- 	content, err := configClient.GetConfig(vo.ConfigParam{
- 		DataId: n.DataId,
- 		Group:  n.Group,
- 	})
- 	if err != nil {
- 		panic(err)
- 	}
-
- 	// Register the config change listener.
- 	err = configClient.ListenConfig(vo.ConfigParam{
- 		DataId: n.DataId,
- 		Group:  n.Group,
- 		OnChange: func(namespace, group, dataId, data string) {
- 			// Triggered whenever the configuration changes.
- 			if len(data) == 0 {
- 				logx.Errorf("listen nacos data nil error , namespace : %s,group : %s , dataId : %s , data : %s", namespace, group, dataId, data)
- 				return
- 			}
- 			// Guard against a nil callback so a change cannot panic here.
- 			if listenConfigCallback == nil {
- 				return
- 			}
- 			listenConfigCallback(data)
- 		},
- 	})
- 	if err != nil {
- 		panic(err)
- 	}
-
- 	// The emptiness check runs after the listener is registered, as before.
- 	if len(content) == 0 {
- 		panic("read nacos nacos content err , content is nil")
- 	}
-
- 	return content
- }
-
- func (n *NacosConfig) buildConfig() ([]constant.ServerConfig, constant.ClientConfig) {
- var sc []constant.ServerConfig
- if len(n.ServerConfigs) == 0 {
- panic("nacos server no set")
- }
- for _, serveConfig := range n.ServerConfigs {
- sc = append(sc, constant.ServerConfig{
- Port: serveConfig.Port,
- IpAddr: serveConfig.IpAddr,
- },
- )
- }
-
- //nacos client
- cc := constant.ClientConfig{
- NamespaceId: n.ClientConfig.NamespaceId,
- TimeoutMs: n.ClientConfig.TimeoutMs,
- NotLoadCacheAtStart: n.ClientConfig.NotLoadCacheAtStart,
- LogDir: n.ClientConfig.LogDir,
- CacheDir: n.ClientConfig.CacheDir,
- LogLevel: n.ClientConfig.LogLevel,
- }
- return sc, cc
- }
-
- // NacosServer embeds sync.RWMutex and holds an auth client, a server list, an
- // HTTP agent, timeout/endpoint settings, refresh bookkeeping fields, a context
- // path, a current index and a change-signal channel.
- //
- // NOTE(review): the functions in this file call nacos_server.NewNacosServer
- // from the SDK rather than this type, and nothing visible here references
- // it - confirm whether this local declaration is still needed.
- type NacosServer struct {
- sync.RWMutex
- securityLogin security.AuthClient
- serverList []constant.ServerConfig
- httpAgent http_agent.IHttpAgent
- timeoutMs uint64
- endpoint string
- lastSrvRefTime int64
- vipSrvRefInterMills int64
- contextPath string
- currentIndex int32
- ServerSrcChangeSignal chan struct{}
- }
-
- // GetAllServicesInfo queries the Nacos catalog-services API and decodes the
- // JSON response into a NacosServiceList.
- //
- // The request is sent only to the first entry of n.ServerConfigs, uses
- // n.ClientConfig.NamespaceId as namespaceId, an empty groupName, page 1 and a
- // page size of 10000.
- //
- // It returns an error when no server is configured, when the Nacos server
- // handle cannot be created, when the request fails, or when the response
- // cannot be decoded.
- func (n *NacosConfig) GetAllServicesInfo() (nacosVo.NacosServiceList, error) {
- 	nacosServiceList := nacosVo.NacosServiceList{}
-
- 	// Indexing ServerConfigs[0] below would panic on an empty list.
- 	if len(n.ServerConfigs) == 0 {
- 		return nacosServiceList, fmt.Errorf("nacos server no set")
- 	}
-
- 	api := constant.SERVICE_BASE_PATH + "/catalog/services"
- 	nacosServer, err := nacos_server.NewNacosServer(context.Background(),
- 		[]constant.ServerConfig{*constant.NewServerConfig(n.ServerConfigs[0].IpAddr, n.ServerConfigs[0].Port)},
- 		constant.ClientConfig{},
- 		&http_agent.HttpAgent{},
- 		1000, // timeout argument, presumably milliseconds
- 		"")
- 	if err != nil {
- 		return nacosServiceList, err
- 	}
-
- 	params := map[string]string{
- 		"namespaceId": n.ClientConfig.NamespaceId,
- 		"groupName":   "",
- 		"pageNo":      "1",
- 		"pageSize":    "10000",
- 	}
- 	result, err := nacosServer.ReqApi(api, params, http.MethodGet, constant.ClientConfig{})
- 	if err != nil {
- 		logx.Errorf("Failed to get all services ,error: <%+v>, namespace : <%s> ", err, n.ClientConfig.NamespaceId)
- 		return nacosServiceList, err
- 	}
-
- 	// Return the decode error itself; previously the (nil) request error was
- 	// returned here, so callers saw success with an empty list.
- 	if err = json.Unmarshal([]byte(result), &nacosServiceList); err != nil {
- 		logx.Errorf("Conversion failed ,error: %+v, str: %s", err, result)
- 		return nacosServiceList, err
- 	}
- 	return nacosServiceList, nil
- }
-
- // GetAllGroupName queries the Nacos catalog-services API and returns the
- // group names of the listed services, de-duplicated with
- // utils.RemoveDuplication_map.
- //
- // The request is sent only to the first entry of n.ServerConfigs, uses
- // n.ClientConfig.NamespaceId as namespaceId, an empty groupName, page 1 and a
- // page size of 10000.
- //
- // It returns an error when no server is configured, when the Nacos server
- // handle cannot be created, when the request fails, or when the response
- // cannot be decoded.
- func (n *NacosConfig) GetAllGroupName() (nacosGroupList nacosVo.NacosGroupList, err error) {
- 	// Indexing ServerConfigs[0] below would panic on an empty list.
- 	if len(n.ServerConfigs) == 0 {
- 		return nacosGroupList, fmt.Errorf("nacos server no set")
- 	}
-
- 	api := constant.SERVICE_BASE_PATH + "/catalog/services"
- 	nacosServer, err := nacos_server.NewNacosServer(context.Background(),
- 		[]constant.ServerConfig{*constant.NewServerConfig(n.ServerConfigs[0].IpAddr, n.ServerConfigs[0].Port)},
- 		constant.ClientConfig{},
- 		&http_agent.HttpAgent{},
- 		1000, // timeout argument, presumably milliseconds
- 		"")
- 	if err != nil {
- 		return nacosGroupList, err
- 	}
-
- 	params := map[string]string{
- 		// Use the configured namespace (as GetAllServicesInfo does) instead of
- 		// the previously hard-coded "test" namespace.
- 		"namespaceId": n.ClientConfig.NamespaceId,
- 		"groupName":   "",
- 		"pageNo":      "1",
- 		"pageSize":    "10000",
- 	}
- 	result, err := nacosServer.ReqApi(api, params, http.MethodGet, constant.ClientConfig{})
- 	if err != nil {
- 		// Surface the request failure directly rather than letting it be
- 		// masked by a JSON error from decoding an empty body.
- 		logx.Errorf("Failed to get all services ,error: <%+v>, namespace : <%s> ", err, n.ClientConfig.NamespaceId)
- 		return nacosGroupList, err
- 	}
-
- 	nacosServiceList := nacosVo.NacosServiceList{}
- 	if err = json.Unmarshal([]byte(result), &nacosServiceList); err != nil {
- 		logx.Errorf("Conversion failed ,error: %+v, str: %s", err, result)
- 		return nacosGroupList, err
- 	}
-
- 	for _, v := range nacosServiceList.ServiceList {
- 		nacosGroupList.GroupName = append(nacosGroupList.GroupName, v.GroupName)
- 	}
- 	nacosGroupList.GroupName = utils.RemoveDuplication_map(nacosGroupList.GroupName)
-
- 	return nacosGroupList, nil
- }
|