|
|
@@ -23,7 +23,7 @@ import ( |
|
|
|
) |
|
|
|
|
|
|
|
func init() { |
|
|
|
extension.SetRegistry(constant.ETCDV3_KEY, newETCDRegistry) |
|
|
|
extension.SetRegistry(constant.Etcdv3Key, newETCDRegistry) |
|
|
|
} |
|
|
|
|
|
|
|
type etcdEventListener struct { |
|
|
@@ -60,7 +60,7 @@ type leaseWrapper struct { |
|
|
|
|
|
|
|
// Lookup Service Discovery |
|
|
|
func (r *etcdRegistry) Lookup() ([]string, error) { |
|
|
|
resp, err := r.client.Get(context.Background(), constant.ETCDV3_REGISTRY_PREFIX+r.clusterName, clientv3.WithPrefix()) |
|
|
|
resp, err := r.client.Get(context.Background(), constant.Etcdv3RegistryPrefix+r.clusterName, clientv3.WithPrefix()) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
@@ -70,7 +70,7 @@ func (r *etcdRegistry) Lookup() ([]string, error) { |
|
|
|
addrs = append(addrs, string(kv.Value)) |
|
|
|
} |
|
|
|
|
|
|
|
err = r.Subscribe("", &etcdEventListener{}) |
|
|
|
err = r.Subscribe(&etcdEventListener{}) |
|
|
|
if err != nil { |
|
|
|
return nil, err |
|
|
|
} |
|
|
@@ -98,13 +98,13 @@ func (r *etcdRegistry) UnRegister(addr *registry.Address) error { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
func (r *etcdRegistry) Subscribe(cluster string, listener registry.EventListener) error { |
|
|
|
func (r *etcdRegistry) Subscribe(listener registry.EventListener) error { |
|
|
|
resp, err := r.client.Get(context.Background(), constant.ETCDV3_REGISTRY_PREFIX+r.clusterName, clientv3.WithPrefix()) |
|
|
|
if err != nil { |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
wcCh := r.client.Watch(context.Background(), constant.ETCDV3_REGISTRY_PREFIX+r.clusterName, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision)) |
|
|
|
wcCh := r.client.Watch(context.Background(), constant.Etcdv3RegistryPrefix+r.clusterName, clientv3.WithPrefix(), clientv3.WithRev(resp.Header.Revision)) |
|
|
|
stopChan := make(chan struct{}) |
|
|
|
r.listenersChanMap.Store(r.clusterName, stopChan) |
|
|
|
r.regWg.Add(1) |
|
|
@@ -148,14 +148,14 @@ LOOP: |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (r *etcdRegistry) UnSubscribe(cluster string, listener registry.EventListener) error { |
|
|
|
func (r *etcdRegistry) UnSubscribe(listener registry.EventListener) error { |
|
|
|
stopChanUnCast, ok := r.listenersChanMap.Load(constant.ETCDV3_REGISTRY_PREFIX + r.clusterName) |
|
|
|
if !ok { |
|
|
|
return errors.New("failed to unsubscribe, not matching key in the map") |
|
|
|
} |
|
|
|
stopChan, _ := stopChanUnCast.(chan struct{}) |
|
|
|
stopChan <- struct{}{} |
|
|
|
r.listenersChanMap.Delete(constant.ETCDV3_REGISTRY_PREFIX + r.clusterName) |
|
|
|
r.listenersChanMap.Delete(constant.Etcdv3RegistryPrefix + r.clusterName) |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
@@ -184,14 +184,14 @@ LOOP: |
|
|
|
continue |
|
|
|
} |
|
|
|
|
|
|
|
if ttl.TTL <= constant.ETCDV3_LEASE_TTL_CRITICAL { |
|
|
|
if ttl.TTL <= constant.Etcdv3LeaseTtlCritical { |
|
|
|
_, err := r.client.KeepAliveOnce(context.Background(), *r.leaseWrp.leaseId) |
|
|
|
if err != nil { |
|
|
|
log.Warnf("failed to renew ttl, %s", err.Error()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
time.Sleep(time.Duration(constant.ETCDV3_LEASE_RENEW_INTERVAL) * time.Second) |
|
|
|
time.Sleep(time.Duration(constant.Etcdv3LeaseRenewInterval) * time.Second) |
|
|
|
} else { |
|
|
|
break LOOP |
|
|
|
} |
|
|
@@ -235,7 +235,7 @@ func newETCDRegistry() (registry.Registry, error) { |
|
|
|
return &etcdRegistry{}, err |
|
|
|
} |
|
|
|
|
|
|
|
resp, err := client.Grant(context.Background(), constant.ETCDV3_LEASE_TTL) |
|
|
|
resp, err := client.Grant(context.Background(), constant.Etcdv3LeaseTtl) |
|
|
|
if err != nil { |
|
|
|
return &etcdRegistry{}, err |
|
|
|
} |
|
|
@@ -256,8 +256,3 @@ func newETCDRegistry() (registry.Registry, error) { |
|
|
|
|
|
|
|
return r, nil |
|
|
|
} |
|
|
|
|
|
|
|
type pair struct { |
|
|
|
key interface{} |
|
|
|
value interface{} |
|
|
|
} |