Browse Source

feat: new feature for service discovery (#574)

* feat: new feature for service discovery

1.Completed the code for organizing the workflow.
2.Implemented FileRegistryService based on local configuration files.

* style: go imports

* refactor: avoid the uncertainty of SQL field order in unit test.

refactor select_for_update_executor_test.go to avoid the uncertainty of SQL field order causing test code to pass and fail sometimes.
tags/v2.0.0
liushao GitHub 2 years ago
parent
commit
eed0557c77
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 792 additions and 42 deletions
  1. +9
    -1
      pkg/client/client.go
  2. +4
    -1
      pkg/client/config.go
  3. +14
    -0
      pkg/client/config_test.go
  4. +2
    -1
      pkg/datasource/sql/exec/at/select_for_update_executor_test.go
  5. +39
    -0
      pkg/discovery/base.go
  6. +92
    -0
      pkg/discovery/config.go
  7. +30
    -0
      pkg/discovery/consul.go
  8. +30
    -0
      pkg/discovery/etcd3.go
  9. +30
    -0
      pkg/discovery/eureka.go
  10. +88
    -0
      pkg/discovery/file.go
  11. +178
    -0
      pkg/discovery/file_test.go
  12. +61
    -0
      pkg/discovery/init.go
  13. +78
    -0
      pkg/discovery/init_test.go
  14. +30
    -0
      pkg/discovery/nacos.go
  15. +30
    -0
      pkg/discovery/redis.go
  16. +30
    -0
      pkg/discovery/sofa.go
  17. +30
    -0
      pkg/discovery/zk.go
  18. +8
    -17
      pkg/remoting/getty/rpc_client.go
  19. +0
    -16
      pkg/tm/config.go
  20. +9
    -6
      testdata/conf/seatago.yml

+ 9
- 1
pkg/client/client.go View File

@@ -23,6 +23,7 @@ import (
"github.com/seata/seata-go/pkg/datasource"
at "github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/exec/config"
"github.com/seata/seata-go/pkg/discovery"
"github.com/seata/seata-go/pkg/integration"
remoteConfig "github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/remoting/getty"
@@ -41,7 +42,7 @@ func Init() {
// InitPath init client with config path
func InitPath(configFilePath string) {
cfg := LoadPath(configFilePath)
initRegistry(cfg)
initRmClient(cfg)
initTmClient(cfg)
initDatasource()
@@ -51,6 +52,7 @@ var (
onceInitTmClient sync.Once
onceInitRmClient sync.Once
onceInitDatasource sync.Once
onceInitRegistry sync.Once
)

// InitTmClient init client tm client
@@ -95,3 +97,9 @@ func initDatasource() {
datasource.Init()
})
}

func initRegistry(cfg *Config) {
onceInitRegistry.Do(func() {
discovery.InitRegistry(&cfg.ServiceConfig, &cfg.RegistryConfig)
})
}

+ 4
- 1
pkg/client/config.go View File

@@ -31,6 +31,7 @@ import (
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/seata/seata-go/pkg/discovery"

"github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
@@ -81,7 +82,8 @@ type Config struct {
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig remoteConfig.Config `yaml:"getty" json:"getty" koanf:"getty"`
TransportConfig remoteConfig.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
ServiceConfig discovery.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
RegistryConfig discovery.RegistryConfig `yaml:"registry" json:"registry" koanf:"registry"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
@@ -98,6 +100,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.ClientConfig.RegisterFlagsWithPrefix("client", f)
c.GettyConfig.RegisterFlagsWithPrefix("getty", f)
c.TransportConfig.RegisterFlagsWithPrefix("transport", f)
c.RegistryConfig.RegisterFlagsWithPrefix("registry", f)
c.ServiceConfig.RegisterFlagsWithPrefix("service", f)
}



+ 14
- 0
pkg/client/config_test.go View File

@@ -112,6 +112,20 @@ func TestLoadPath(t *testing.T) {
assert.Equal(t, "default", cfg.ServiceConfig.VgroupMapping["default_tx_group"])
assert.Equal(t, "127.0.0.1:8091", cfg.ServiceConfig.Grouplist["default"])

assert.NotNil(t, cfg.RegistryConfig)
assert.Equal(t, "file", cfg.RegistryConfig.Type)
assert.Equal(t, "seatago.yml", cfg.RegistryConfig.File.Name)
assert.Equal(t, "seata-server", cfg.RegistryConfig.Nacos.Application)
assert.Equal(t, "127.0.0.1:8848", cfg.RegistryConfig.Nacos.ServerAddr)
assert.Equal(t, "SEATA_GROUP", cfg.RegistryConfig.Nacos.Group)
assert.Equal(t, "test-namespace", cfg.RegistryConfig.Nacos.Namespace)
assert.Equal(t, "test-username", cfg.RegistryConfig.Nacos.Username)
assert.Equal(t, "test-password", cfg.RegistryConfig.Nacos.Password)
assert.Equal(t, "test-access-key", cfg.RegistryConfig.Nacos.AccessKey)
assert.Equal(t, "test-secret-key", cfg.RegistryConfig.Nacos.SecretKey)
assert.Equal(t, "default", cfg.RegistryConfig.Etcd3.Cluster)
assert.Equal(t, "http://localhost:2379", cfg.RegistryConfig.Etcd3.ServerAddr)

// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}


+ 2
- 1
pkg/datasource/sql/exec/at/select_for_update_executor_test.go View File

@@ -44,7 +44,8 @@ func TestBuildSelectPKSQL(t *testing.T) {
ctx, err := parser.DoParser(sql)

metaData := types.TableMeta{
TableName: "t_user",
TableName: "t_user",
ColumnNames: []string{"id", "order_id", "age"},
Indexs: map[string]types.IndexMeta{
"id": {
IType: types.IndexTypePrimaryKey,


+ 39
- 0
pkg/discovery/base.go View File

@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

const (
FILE string = "file"
NACOS string = "nacos"
ETCD string = "etcd"
EUREKA string = "eureka"
REDIS string = "redis"
ZK string = "zk"
CONSUL string = "consul"
SOFA string = "sofa"
)

type ServiceInstance struct {
Addr string
Port int
}

type RegistryService interface {
Lookup(key string) ([]*ServiceInstance, error)
Close()
}

+ 92
- 0
pkg/discovery/config.go View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

import (
"flag"

"github.com/seata/seata-go/pkg/util/flagext"
)

type ServiceConfig struct {
VgroupMapping flagext.StringMap `yaml:"vgroup-mapping" json:"vgroup-mapping" koanf:"vgroup-mapping"`
Grouplist flagext.StringMap `yaml:"grouplist" json:"grouplist" koanf:"grouplist"`
EnableDegrade bool `yaml:"enable-degrade" json:"enable-degrade" koanf:"enable-degrade"`
DisableGlobalTransaction bool `yaml:"disable-global-transaction" json:"disable-global-transaction" koanf:"disable-global-transaction"`
}

func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.EnableDegrade, prefix+".enable-degrade", false, "degrade current not support.")
f.BoolVar(&cfg.DisableGlobalTransaction, prefix+".disable-global-transaction", false, "disable globalTransaction.")
f.Var(&cfg.VgroupMapping, prefix+".vgroup-mapping", "The vgroup mapping.")
f.Var(&cfg.Grouplist, prefix+".grouplist", "The group list.")
}

type RegistryConfig struct {
Type string `yaml:"type" json:"type" koanf:"type"`
File FileConfig `yaml:"file" json:"file" koanf:"file"`
Nacos NacosConfig `yaml:"nacos" json:"nacos" koanf:"nacos"`
Etcd3 Etcd3Config `yaml:"etcd3" json:"etcd3" koanf:"etcd3"`
}

func (cfg *RegistryConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Type, prefix+".type", "file", "The registry type.")
cfg.File.RegisterFlagsWithPrefix(prefix+".file", f)
cfg.Nacos.RegisterFlagsWithPrefix(prefix+".nacos", f)
cfg.Etcd3.RegisterFlagsWithPrefix(prefix+".etcd3", f)
}

type FileConfig struct {
Name string `yaml:"name" json:"name" koanf:"name"`
}

func (cfg *FileConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Name, prefix+".name", "registry.conf", "The file name of registry.")
}

type NacosConfig struct {
Application string `yaml:"application" json:"application" koanf:"application"`
ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"`
Group string `yaml:"group" json:"group" koanf:"group"`
Namespace string `yaml:"namespace" json:"namespace" koanf:"namespace"`
Username string `yaml:"username" json:"username" koanf:"username"`
Password string `yaml:"password" json:"password" koanf:"password"`
AccessKey string `yaml:"access-key" json:"access-key" koanf:"access-key"`
SecretKey string `yaml:"secret-key" json:"secret-key" koanf:"secret-key"`
}

func (cfg *NacosConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Application, prefix+".application", "seata", "The application name of registry.")
f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "", "The server address of registry.")
f.StringVar(&cfg.Group, prefix+".group", "SEATA_GROUP", "The group of registry.")
f.StringVar(&cfg.Namespace, prefix+".namespace", "", "The namespace of registry.")
f.StringVar(&cfg.Username, prefix+".username", "", "The username of registry.")
f.StringVar(&cfg.Password, prefix+".password", "", "The password of registry.")
f.StringVar(&cfg.AccessKey, prefix+".access-key", "", "The access key of registry.")
f.StringVar(&cfg.SecretKey, prefix+".secret-key", "", "The secret key of registry.")
}

type Etcd3Config struct {
Cluster string `yaml:"cluster" json:"cluster" koanf:"cluster"`
ServerAddr string `yaml:"server-addr" json:"server-addr" koanf:"server-addr"`
}

func (cfg *Etcd3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Cluster, prefix+".cluster", "default", "The server address of registry.")
f.StringVar(&cfg.ServerAddr, prefix+".server-addr", "http://localhost:2379", "The server address of registry.")
}

+ 30
- 0
pkg/discovery/consul.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

type ConsulRegistryService struct{}

func (s *ConsulRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
}

func (s *ConsulRegistryService) Close() {
//TODO implement me
panic("implement me")
}

+ 30
- 0
pkg/discovery/etcd3.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

type EtcdRegistryService struct{}

func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
}

func (s *EtcdRegistryService) Close() {
//TODO implement me
panic("implement me")
}

+ 30
- 0
pkg/discovery/eureka.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

type EurekaRegistryService struct{}

func (s *EurekaRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
}

func (s *EurekaRegistryService) Close() {
//TODO implement me
panic("implement me")
}

+ 88
- 0
pkg/discovery/file.go View File

@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

import (
"fmt"
"strconv"
"strings"

"github.com/seata/seata-go/pkg/util/log"
)

const (
endPointSplitChar = ";"
ipPortSplitChar = ":"
)

type FileRegistryService struct {
serviceConfig *ServiceConfig
}

func newFileRegistryService(config *ServiceConfig) RegistryService {
if config == nil {
log.Fatalf("service config is nil")
panic("service config is nil")
}
return &FileRegistryService{
serviceConfig: config,
}
}

func (s *FileRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
var group string
if v, ok := s.serviceConfig.VgroupMapping[key]; ok {
group = v
}
if group == "" {
log.Errorf("vgroup is empty. key: %s", key)
return nil, fmt.Errorf("vgroup is empty. key: %s", key)
}

var addrStr string
if v, ok := s.serviceConfig.Grouplist[group]; ok {
addrStr = v
}
if addrStr == "" {
log.Errorf("endpoint is empty. key: %s group: %s", group)
return nil, fmt.Errorf("endpoint is empty. key: %s group: %s", key, group)
}

addrs := strings.Split(addrStr, endPointSplitChar)
instances := make([]*ServiceInstance, 0)
for _, addr := range addrs {
ipPort := strings.Split(addr, ipPortSplitChar)
if len(ipPort) != 2 {
return nil, fmt.Errorf("endpoint format should like ip:port. endpoint: %s", addr)
}
ip := ipPort[0]
port, err := strconv.Atoi(ipPort[1])
if err != nil {
return nil, err
}
instances = append(instances, &ServiceInstance{
Addr: ip,
Port: port,
})
}
return instances, nil
}

func (s *FileRegistryService) Close() {

}

+ 178
- 0
pkg/discovery/file_test.go View File

@@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

import (
"reflect"
"testing"
)

func TestFileRegistryService_Lookup(t *testing.T) {
type fields struct {
serviceConfig *ServiceConfig
}
type args struct {
key string
}
tests := []struct {
name string
fields fields
args args
want []*ServiceInstance
wantErr bool
wantErrMsg string
}{
{
name: "normal single endpoint.",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
Grouplist: map[string]string{
"default": "127.0.0.1:8091",
},
},
},
want: []*ServiceInstance{
{
Addr: "127.0.0.1",
Port: 8091,
},
},
wantErr: false,
},
{
name: "normal multi endpoints.",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
Grouplist: map[string]string{
"default": "127.0.0.1:8091;192.168.0.1:8092",
},
},
},
want: []*ServiceInstance{
{
Addr: "127.0.0.1",
Port: 8091,
},
{
Addr: "192.168.0.1",
Port: 8092,
},
},
wantErr: false,
},
{
name: "vgroup is empty.",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "",
},
},
},
want: nil,
wantErr: true,
wantErrMsg: "vgroup is empty. key: default_tx_group",
},
{
name: "endpoint is empty.",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
},
},
want: nil,
wantErr: true,
wantErrMsg: "endpoint is empty. key: default_tx_group group: default",
},
{
name: "format is not ip:port",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
Grouplist: map[string]string{
"default": "127.0.0.18091",
},
},
},
want: nil,
wantErr: true,
wantErrMsg: "endpoint format should like ip:port. endpoint: 127.0.0.18091",
},
{
name: "port is not number",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
Grouplist: map[string]string{
"default": "127.0.0.1:abc",
},
},
},
want: nil,
wantErr: true,
wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid syntax",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &FileRegistryService{
serviceConfig: tt.fields.serviceConfig,
}
got, err := s.Lookup(tt.args.key)
if (err != nil) != tt.wantErr {
t.Errorf("Lookup() error = %v, wantErr = %v", err, tt.wantErr)
}
if tt.wantErr && err.Error() != tt.wantErrMsg {
t.Errorf("Lookup() errMsg = %v, wantErrMsg = %v", err.Error(), tt.wantErrMsg)
}
if !tt.wantErr && !reflect.DeepEqual(got, tt.want) {
t.Errorf("Lookup() got = %v, want = %v", got, tt.want)
}
})
}
}

+ 61
- 0
pkg/discovery/init.go View File

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

import (
"fmt"
)

var (
registryServiceInstance RegistryService
)

func InitRegistry(serviceConfig *ServiceConfig, registryConfig *RegistryConfig) {
var registryService RegistryService
var err error
switch registryConfig.Type {
case FILE:
//init file registry
registryService = newFileRegistryService(serviceConfig)
case ETCD:
//TODO: init etcd registry
case NACOS:
//TODO: init nacos registry
case EUREKA:
//TODO: init eureka registry
case REDIS:
//TODO: init redis registry
case ZK:
//TODO: init zk registry
case CONSUL:
//TODO: init consul registry
case SOFA:
//TODO: init sofa registry
default:
err = fmt.Errorf("service registry not support registry type:%s", registryConfig.Type)
}

if err != nil {
panic(fmt.Errorf("init service registry err:%v", err))
}
registryServiceInstance = registryService
}

func GetRegistry() RegistryService {
return registryServiceInstance
}

+ 78
- 0
pkg/discovery/init_test.go View File

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

import (
"reflect"
"testing"
)

func TestInitRegistry(t *testing.T) {
type args struct {
serviceConfig *ServiceConfig
registryConfig *RegistryConfig
}
tests := []struct {
name string
args args
hasPanic bool
expectedType string
}{
{
name: "normal",
args: args{
registryConfig: &RegistryConfig{
Type: FILE,
},
serviceConfig: &ServiceConfig{},
},
expectedType: "FileRegistryService",
},
{
name: "unknown type",
args: args{
registryConfig: &RegistryConfig{
Type: "unknown",
},
serviceConfig: &ServiceConfig{},
},
hasPanic: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer func() {
if r := recover(); r != nil {
if !tt.hasPanic {
t.Errorf("panic is not expected!")
}
} else if tt.hasPanic {
t.Errorf("Expected a panic but did not receive one")
}
}()
InitRegistry(tt.args.serviceConfig, tt.args.registryConfig)
instance := GetRegistry()
if !tt.hasPanic {
actualType := reflect.TypeOf(instance).Elem().Name()
if actualType != tt.expectedType {
t.Errorf("type = %v, want %v", actualType, tt.expectedType)
}
}
})
}
}

+ 30
- 0
pkg/discovery/nacos.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

type NacosRegistryService struct{}

func (s *NacosRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
}

func (NacosRegistryService) Close() {
//TODO implement me
panic("implement me")
}

+ 30
- 0
pkg/discovery/redis.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

type RedisRegistryService struct{}

func (s *RedisRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
}

func (RedisRegistryService) Close() {
//TODO implement me
panic("implement me")
}

+ 30
- 0
pkg/discovery/sofa.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

type SofaRegistryService struct{}

func (s *SofaRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
}

func (s *SofaRegistryService) Close() {
//TODO implement me
panic("implement me")
}

+ 30
- 0
pkg/discovery/zk.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package discovery

type ZkRegistryService struct{}

func (s *ZkRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
}

func (s *ZkRegistryService) Close() {
//TODO implement me
panic("implement me")
}

+ 8
- 17
pkg/remoting/getty/rpc_client.go View File

@@ -21,12 +21,11 @@ import (
"crypto/tls"
"fmt"
"net"
"strings"
"sync"

getty "github.com/apache/dubbo-getty"
gxsync "github.com/dubbogo/gost/sync"
"github.com/seata/seata-go/pkg/discovery"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/util/log"
@@ -57,7 +56,7 @@ func (c *RpcClient) init() {
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(address),
getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)),
// todo if read c.gettyConf.ConnectionNum, will cause the connect to fail
getty.WithConnectionNumber(1),
getty.WithReconnectInterval(c.gettyConf.ReconnectInterval),
@@ -68,21 +67,13 @@ func (c *RpcClient) init() {
}
}

func (c *RpcClient) getAvailServerList() []string {
defaultAddressList := []string{"127.0.0.1:8091"}
txServiceGroup := c.seataConf.TxServiceGroup
if txServiceGroup == "" {
return defaultAddressList
}
clusterName := c.seataConf.ServiceVgroupMapping[txServiceGroup]
if clusterName == "" {
return defaultAddressList
}
grouplist := c.seataConf.ServiceGrouplist[clusterName]
if grouplist == "" {
return defaultAddressList
func (c *RpcClient) getAvailServerList() []*discovery.ServiceInstance {
registryService := discovery.GetRegistry()
instances, err := registryService.Lookup(c.seataConf.TxServiceGroup)
if err != nil {
return nil
}
return strings.Split(grouplist, ",")
return instances
}

func (c *RpcClient) newSession(session getty.Session) error {


+ 0
- 16
pkg/tm/config.go View File

@@ -20,8 +20,6 @@ package tm
import (
"flag"
"time"

"github.com/seata/seata-go/pkg/util/flagext"
)

type TmConfig struct {
@@ -43,17 +41,3 @@ func (cfg *TmConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.DegradeCheckAllowTimes, prefix+".degrade-check-allow-times", 10*time.Second, "The duration allowed for degrade checking.")
f.IntVar(&cfg.InterceptorOrder, prefix+".interceptor-order", -2147482648, "The order of interceptor.")
}

type ServiceConfig struct {
VgroupMapping flagext.StringMap `yaml:"vgroup-mapping" json:"vgroup-mapping" koanf:"vgroup-mapping"`
Grouplist flagext.StringMap `yaml:"grouplist" json:"grouplist" koanf:"grouplist"`
EnableDegrade bool `yaml:"enable-degrade" json:"enable-degrade" koanf:"enable-degrade"`
DisableGlobalTransaction bool `yaml:"disable-global-transaction" json:"disable-global-transaction" koanf:"disable-global-transaction"`
}

func (cfg *ServiceConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.EnableDegrade, prefix+".enable-degrade", false, "degrade current not support.")
f.BoolVar(&cfg.DisableGlobalTransaction, prefix+".disable-global-transaction", false, "disable globalTransaction.")
f.Var(&cfg.VgroupMapping, prefix+".vgroup-mapping", "The vgroup mapping.")
f.Var(&cfg.Grouplist, prefix+".grouplist", "The group list.")
}

+ 9
- 6
testdata/conf/seatago.yml View File

@@ -122,17 +122,20 @@ seata:
registry:
type: file
file:
name: registry.conf
name: seatago.yml
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: "SEATA_GROUP"
namespace: ""
username: ""
password: ""
namespace: "test-namespace"
username: "test-username"
password: "test-password"
##if use MSE Nacos with auth, mutex with username/password attribute #
#access-key: "" #
#secret-key: "" #
access-key: "test-access-key" #
secret-key: "test-secret-key" #
etcd3:
cluster: "default"
server-addr: "http://localhost:2379"
log:
exception-rate: 100
tcc:


Loading…
Cancel
Save