Browse Source

feat: Implement the discovery service by Etcd (#605)

Implement the discovery service by Etcd
tags/v2.0.0
yizhibian GitHub 1 year ago
parent
commit
18e611d3a9
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
8 changed files with 613 additions and 7 deletions
  1. +6
    -0
      go.mod
  2. +6
    -0
      go.sum
  3. +238
    -5
      pkg/discovery/etcd3.go
  4. +141
    -0
      pkg/discovery/etcd3_test.go
  5. +2
    -1
      pkg/discovery/init.go
  6. +20
    -1
      pkg/discovery/init_test.go
  7. +192
    -0
      pkg/discovery/mock/mock_etcd_client.go
  8. +8
    -0
      pkg/discovery/mock/test_etcd_client.go

+ 6
- 0
go.mod View File

@@ -33,6 +33,8 @@ require (
require ( require (
github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0 github.com/agiledragon/gomonkey/v2 v2.9.0
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
) )


require ( require (
@@ -45,6 +47,8 @@ require (
github.com/bytedance/sonic v1.9.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/creasty/defaults v1.5.2 // indirect github.com/creasty/defaults v1.5.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect
@@ -52,6 +56,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect github.com/gorilla/websocket v1.4.2 // indirect
@@ -82,6 +87,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect github.com/ugorji/go/codec v1.2.11 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect
go.uber.org/multierr v1.8.0 // indirect go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect golang.org/x/arch v0.3.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.14.0 // indirect


+ 6
- 0
go.sum View File

@@ -148,10 +148,12 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
@@ -270,6 +272,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
@@ -766,12 +769,15 @@ go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw= go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw=
go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A=
go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8=
go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU=
go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ=
go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU= go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU=
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8= go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8=
go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY=
go.etcd.io/etcd/client/v3 v3.5.6 h1:coLs69PWCXE9G4FKquzNaSHrRyMCAXwF+IX1tAPVO8E=
go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk= go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk=
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY= go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY=
go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w= go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w=


+ 238
- 5
pkg/discovery/etcd3.go View File

@@ -17,14 +17,247 @@


package discovery package discovery


type EtcdRegistryService struct{}
import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/util/log"
etcd3 "go.etcd.io/etcd/client/v3"
"strconv"
"strings"
"sync"
)

const (
clusterNameSplitChar = "-"
addressSplitChar = ":"
etcdClusterPrefix = "registry-seata"
)

type EtcdRegistryService struct {
client *etcd3.Client
cfg etcd3.Config
vgroupMapping map[string]string
grouplist map[string][]*ServiceInstance
rwLock sync.RWMutex

stopCh chan struct{}
}

func newEtcdRegistryService(config *ServiceConfig, etcd3Config *Etcd3Config) RegistryService {

if etcd3Config == nil {
log.Fatalf("etcd config is nil")
panic("etcd config is nil")
}

cfg := etcd3.Config{
Endpoints: []string{etcd3Config.ServerAddr},
}
cli, err := etcd3.New(cfg)
if err != nil {
log.Fatalf("failed to create etcd3 client")
panic("failed to create etcd3 client")
}

vgroupMapping := config.VgroupMapping
grouplist := make(map[string][]*ServiceInstance, 0)

etcdRegistryService := &EtcdRegistryService{
client: cli,
cfg: cfg,
vgroupMapping: vgroupMapping,
grouplist: grouplist,
stopCh: make(chan struct{}),
}
go etcdRegistryService.watch(etcdClusterPrefix)

return etcdRegistryService
}

func (s *EtcdRegistryService) watch(key string) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp, err := s.client.Get(ctx, key, etcd3.WithPrefix())
if err != nil {
log.Infof("cant get server instances from etcd")
}

if resp != nil {
for _, kv := range resp.Kvs {
k := kv.Key
v := kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key has an incorrect format: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value has an incorrect format: ", err)
return
}
s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
} else {
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
}
s.rwLock.Unlock()
}

}
// watch the changes of endpoints
watchCh := s.client.Watch(ctx, key, etcd3.WithPrefix())

for {
select {
case watchResp, ok := <-watchCh:
if !ok {
log.Warnf("Watch channel closed")
return
}
for _, event := range watchResp.Events {
switch event.Type {
case etcd3.EventTypePut:
log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)

k := event.Kv.Key
v := event.Kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value err: ", err)
return
}

s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
s.rwLock.Unlock()
continue
}
if ifHaveSameServiceInstances(s.grouplist[clusterName], serverInstance) {
s.rwLock.Unlock()
continue
}
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
s.rwLock.Unlock()

case etcd3.EventTypeDelete:
log.Infof("Key %s deleted.\n", event.Kv.Key)

cluster, ip, port, err := getClusterAndAddress(event.Kv.Key)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}

s.rwLock.Lock()
serviceInstances := s.grouplist[cluster]
if serviceInstances == nil {
log.Warnf("etcd doesnt exit cluster: ", cluster)
s.rwLock.Unlock()
continue
}
s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port)
s.rwLock.Unlock()
}
}
case <-s.stopCh:
log.Warn("stop etcd watch")
return
}
}
}

func getClusterName(key []byte) (string, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}

cluster := keySplit[2]
return cluster, nil
}

func getServerInstance(value []byte) (*ServiceInstance, error) {
stringValue := string(value)
valueSplit := strings.Split(stringValue, addressSplitChar)
if len(valueSplit) != 2 {
return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", stringValue)
}
ip := valueSplit[0]
port, err := strconv.Atoi(valueSplit[1])
if err != nil {
return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}
serverInstance := &ServiceInstance{
Addr: ip,
Port: port,
}

return serverInstance, nil
}

func getClusterAndAddress(key []byte) (string, string, int, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}
cluster := keySplit[2]
address := strings.Split(keySplit[3], addressSplitChar)
ip := address[0]
port, err := strconv.Atoi(address[1])
if err != nil {
return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}

return cluster, ip, port, nil
}

func ifHaveSameServiceInstances(list []*ServiceInstance, value *ServiceInstance) bool {
for _, v := range list {
if v.Addr == value.Addr && v.Port == value.Port {
return true
}
}
return false
}

func removeValueFromList(list []*ServiceInstance, ip string, port int) []*ServiceInstance {
for k, v := range list {
if v.Addr == ip && v.Port == port {
result := list[:k]
if k < len(list)-1 {
result = append(result, list[k+1:]...)
}
return result
}
}

return list
}


func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) { func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
s.rwLock.RLock()
defer s.rwLock.RUnlock()
cluster := s.vgroupMapping[key]
if cluster == "" {
return nil, fmt.Errorf("cluster doesnt exit")
}

list := s.grouplist[cluster]
return list, nil
} }


func (s *EtcdRegistryService) Close() { func (s *EtcdRegistryService) Close() {
//TODO implement me
panic("implement me")
s.stopCh <- struct{}{}
} }

+ 141
- 0
pkg/discovery/etcd3_test.go View File

@@ -0,0 +1,141 @@
package discovery

import (
"github.com/golang/mock/gomock"
"github.com/seata/seata-go/pkg/discovery/mock"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"reflect"
"testing"
"time"
)

func TestEtcd3RegistryService_Lookup(t *testing.T) {

tests := []struct {
name string
getResp *clientv3.GetResponse
watchResp *clientv3.WatchResponse
want []*ServiceInstance
}{
{
name: "normal",
getResp: &clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte("registry-seata-default-172.0.0.1:8091"),
Value: []byte("172.0.0.1:8091"),
},
},
},
watchResp: nil,
want: []*ServiceInstance{
{
Addr: "172.0.0.1",
Port: 8091,
},
},
},
{
name: "use watch update ServiceInstances",
getResp: nil,
watchResp: &clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Type: clientv3.EventTypePut,
Kv: &mvccpb.KeyValue{
Key: []byte("registry-seata-default-172.0.0.1:8091"),
Value: []byte("172.0.0.1:8091"),
},
},
},
},
want: []*ServiceInstance{
{
Addr: "172.0.0.1",
Port: 8091,
},
},
},
{
name: "use watch del ServiceInstances",
getResp: &clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte("registry-seata-default-172.0.0.1:8091"),
Value: []byte("172.0.0.1:8091"),
},
{
Key: []byte("registry-seata-default-172.0.0.1:8092"),
Value: []byte("172.0.0.1:8092"),
},
},
},
watchResp: &clientv3.WatchResponse{
Events: []*clientv3.Event{
{
Type: clientv3.EventTypeDelete,
Kv: &mvccpb.KeyValue{
Key: []byte("registry-seata-default-172.0.0.1:8091"),
Value: []byte("172.0.0.1:8091"),
},
},
},
},
want: []*ServiceInstance{
{
Addr: "172.0.0.1",
Port: 8092,
},
},
},
}

for _, tt := range tests {
ctrl := gomock.NewController(t)
mockEtcdClient := mock.NewMockEtcdClient(ctrl)
etcdRegistryService := &EtcdRegistryService{
client: &clientv3.Client{
KV: mockEtcdClient,
Watcher: mockEtcdClient,
},
vgroupMapping: map[string]string{
"default_tx_group": "default",
},
grouplist: make(map[string][]*ServiceInstance, 0),
stopCh: make(chan struct{}),
}

mockEtcdClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.getResp, nil)
ch := make(chan clientv3.WatchResponse)
mockEtcdClient.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(ch)

go func() {
etcdRegistryService.watch("registry-seata")
}()
// wait a second for watch
time.Sleep(1 * time.Second)

if tt.watchResp != nil {
go func() {
ch <- *tt.watchResp
}()
}

// wait one more second for update
time.Sleep(1 * time.Second)
serviceInstances, err := etcdRegistryService.Lookup("default_tx_group")
if err != nil {
t.Errorf("error happen when look up . err = %e", err)
}
t.Logf(tt.name)
for i := range serviceInstances {
t.Log(serviceInstances[i].Addr)
t.Log(serviceInstances[i].Port)
}
assert.True(t, reflect.DeepEqual(serviceInstances, tt.want))

etcdRegistryService.Close()
}
}

+ 2
- 1
pkg/discovery/init.go View File

@@ -33,7 +33,8 @@ func InitRegistry(serviceConfig *ServiceConfig, registryConfig *RegistryConfig)
//init file registry //init file registry
registryService = newFileRegistryService(serviceConfig) registryService = newFileRegistryService(serviceConfig)
case ETCD: case ETCD:
//TODO: init etcd registry
//init etcd registry
registryService = newEtcdRegistryService(serviceConfig, &registryConfig.Etcd3)
case NACOS: case NACOS:
//TODO: init nacos registry //TODO: init nacos registry
case EUREKA: case EUREKA:


+ 20
- 1
pkg/discovery/init_test.go View File

@@ -34,7 +34,7 @@ func TestInitRegistry(t *testing.T) {
expectedType string expectedType string
}{ }{
{ {
name: "normal",
name: "file",
args: args{ args: args{
registryConfig: &RegistryConfig{ registryConfig: &RegistryConfig{
Type: FILE, Type: FILE,
@@ -43,6 +43,25 @@ func TestInitRegistry(t *testing.T) {
}, },
expectedType: "FileRegistryService", expectedType: "FileRegistryService",
}, },
{
name: "etcd",
args: args{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
},
registryConfig: &RegistryConfig{
Type: ETCD,
Etcd3: Etcd3Config{
ServerAddr: "127.0.0.1:2379",
Cluster: "default",
},
},
},
hasPanic: false,
expectedType: "EtcdRegistryService",
},
{ {
name: "unknown type", name: "unknown type",
args: args{ args: args{


+ 192
- 0
pkg/discovery/mock/mock_etcd_client.go View File

@@ -0,0 +1,192 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: test_etcd_client.go

// Package mock is a generated GoMock package.
package mock

import (
context "context"
reflect "reflect"

gomock "github.com/golang/mock/gomock"
clientv3 "go.etcd.io/etcd/client/v3"
)

// MockEtcdClient is a mock of EtcdClient interface.
type MockEtcdClient struct {
ctrl *gomock.Controller
recorder *MockEtcdClientMockRecorder
}

// MockEtcdClientMockRecorder is the mock recorder for MockEtcdClient.
type MockEtcdClientMockRecorder struct {
mock *MockEtcdClient
}

// NewMockEtcdClient creates a new mock instance.
func NewMockEtcdClient(ctrl *gomock.Controller) *MockEtcdClient {
mock := &MockEtcdClient{ctrl: ctrl}
mock.recorder = &MockEtcdClientMockRecorder{mock}
return mock
}

// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockEtcdClient) EXPECT() *MockEtcdClientMockRecorder {
return m.recorder
}

// Close mocks base method.
func (m *MockEtcdClient) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}

// Close indicates an expected call of Close.
func (mr *MockEtcdClientMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEtcdClient)(nil).Close))
}

// Compact mocks base method.
func (m *MockEtcdClient) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, rev}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Compact", varargs...)
ret0, _ := ret[0].(*clientv3.CompactResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// Compact indicates an expected call of Compact.
func (mr *MockEtcdClientMockRecorder) Compact(ctx, rev interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, rev}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Compact", reflect.TypeOf((*MockEtcdClient)(nil).Compact), varargs...)
}

// Delete mocks base method.
func (m *MockEtcdClient) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, key}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Delete", varargs...)
ret0, _ := ret[0].(*clientv3.DeleteResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// Delete indicates an expected call of Delete.
func (mr *MockEtcdClientMockRecorder) Delete(ctx, key interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, key}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockEtcdClient)(nil).Delete), varargs...)
}

// Do mocks base method.
func (m *MockEtcdClient) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Do", ctx, op)
ret0, _ := ret[0].(clientv3.OpResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// Do indicates an expected call of Do.
func (mr *MockEtcdClientMockRecorder) Do(ctx, op interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Do", reflect.TypeOf((*MockEtcdClient)(nil).Do), ctx, op)
}

// Get mocks base method.
func (m *MockEtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, key}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Get", varargs...)
ret0, _ := ret[0].(*clientv3.GetResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// Get indicates an expected call of Get.
func (mr *MockEtcdClientMockRecorder) Get(ctx, key interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, key}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockEtcdClient)(nil).Get), varargs...)
}

// Put mocks base method.
func (m *MockEtcdClient) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, key, val}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Put", varargs...)
ret0, _ := ret[0].(*clientv3.PutResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// Put indicates an expected call of Put.
func (mr *MockEtcdClientMockRecorder) Put(ctx, key, val interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, key, val}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockEtcdClient)(nil).Put), varargs...)
}

// RequestProgress mocks base method.
func (m *MockEtcdClient) RequestProgress(ctx context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RequestProgress", ctx)
ret0, _ := ret[0].(error)
return ret0
}

// RequestProgress indicates an expected call of RequestProgress.
func (mr *MockEtcdClientMockRecorder) RequestProgress(ctx interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestProgress", reflect.TypeOf((*MockEtcdClient)(nil).RequestProgress), ctx)
}

// Txn mocks base method.
func (m *MockEtcdClient) Txn(ctx context.Context) clientv3.Txn {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Txn", ctx)
ret0, _ := ret[0].(clientv3.Txn)
return ret0
}

// Txn indicates an expected call of Txn.
func (mr *MockEtcdClientMockRecorder) Txn(ctx interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Txn", reflect.TypeOf((*MockEtcdClient)(nil).Txn), ctx)
}

// Watch mocks base method.
func (m *MockEtcdClient) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, key}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Watch", varargs...)
ret0, _ := ret[0].(clientv3.WatchChan)
return ret0
}

// Watch indicates an expected call of Watch.
func (mr *MockEtcdClientMockRecorder) Watch(ctx, key interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, key}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockEtcdClient)(nil).Watch), varargs...)
}

+ 8
- 0
pkg/discovery/mock/test_etcd_client.go View File

@@ -0,0 +1,8 @@
package mock

import "go.etcd.io/etcd/client/v3"

type EtcdClient interface {
clientv3.KV
clientv3.Watcher
}

Loading…
Cancel
Save