Browse Source

support ipv6 (#786)

* support ipv6

* add license

* support ipv6

* support ipv6

---------

Co-authored-by: JayLiu <38887641+luky116@users.noreply.github.com>
Co-authored-by: FengZhang <zfcode@qq.com>
tags/v2.0.0-rc01
lxfeng1997 GitHub 4 months ago
parent
commit
a0d485fc6a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
9 changed files with 327 additions and 41 deletions
  1. +7
    -12
      pkg/discovery/etcd3.go
  2. +18
    -0
      pkg/discovery/etcd3_test.go
  3. +4
    -9
      pkg/discovery/file.go
  4. +70
    -1
      pkg/discovery/file_test.go
  5. +2
    -1
      pkg/remoting/getty/session_manager.go
  6. +30
    -18
      pkg/remoting/loadbalance/xid_loadbalance.go
  7. +6
    -0
      pkg/remoting/loadbalance/xid_loadbalance_test.go
  8. +59
    -0
      pkg/util/net/address_validator.go
  9. +131
    -0
      pkg/util/net/address_validator_test.go

+ 7
- 12
pkg/discovery/etcd3.go View File

@@ -20,11 +20,13 @@ package discovery
import (
"context"
"fmt"
etcd3 "go.etcd.io/etcd/client/v3"
"seata.apache.org/seata-go/pkg/util/log"
"strconv"
"strings"
"sync"

etcd3 "go.etcd.io/etcd/client/v3"

"seata.apache.org/seata-go/pkg/util/log"
"seata.apache.org/seata-go/pkg/util/net"
)

const (
@@ -189,12 +191,7 @@ func getClusterName(key []byte) (string, error) {

func getServerInstance(value []byte) (*ServiceInstance, error) {
stringValue := string(value)
valueSplit := strings.Split(stringValue, addressSplitChar)
if len(valueSplit) != 2 {
return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", stringValue)
}
ip := valueSplit[0]
port, err := strconv.Atoi(valueSplit[1])
ip, port, err := net.SplitIPPortStr(stringValue)
if err != nil {
return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}
@@ -213,9 +210,7 @@ func getClusterAndAddress(key []byte) (string, string, int, error) {
return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}
cluster := keySplit[2]
address := strings.Split(keySplit[3], addressSplitChar)
ip := address[0]
port, err := strconv.Atoi(address[1])
ip, port, err := net.SplitIPPortStr(keySplit[3])
if err != nil {
return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}


+ 18
- 0
pkg/discovery/etcd3_test.go View File

@@ -54,6 +54,24 @@ func TestEtcd3RegistryService_Lookup(t *testing.T) {
},
},
},
{
name: "host is ipv6",
getResp: &clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte("registry-seata-default-2000:0000:0000:0000:0001:2345:6789:abcd:8091"),
Value: []byte("2000:0000:0000:0000:0001:2345:6789:abcd:8091"),
},
},
},
watchResp: nil,
want: []*ServiceInstance{
{
Addr: "2000:0000:0000:0000:0001:2345:6789:abcd",
Port: 8091,
},
},
},
{
name: "use watch update ServiceInstances",
getResp: nil,


+ 4
- 9
pkg/discovery/file.go View File

@@ -19,15 +19,14 @@ package discovery

import (
"fmt"
"strconv"
"strings"

"seata.apache.org/seata-go/pkg/util/log"
"seata.apache.org/seata-go/pkg/util/net"
)

const (
endPointSplitChar = ";"
ipPortSplitChar = ":"
)

type FileRegistryService struct {
@@ -66,17 +65,13 @@ func (s *FileRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
addrs := strings.Split(addrStr, endPointSplitChar)
instances := make([]*ServiceInstance, 0)
for _, addr := range addrs {
ipPort := strings.Split(addr, ipPortSplitChar)
if len(ipPort) != 2 {
return nil, fmt.Errorf("endpoint format should like ip:port. endpoint: %s", addr)
}
ip := ipPort[0]
port, err := strconv.Atoi(ipPort[1])
host, port, err := net.SplitIPPortStr(addr)
if err != nil {
log.Errorf("endpoint err. endpoint: %s", addr)
return nil, err
}
instances = append(instances, &ServiceInstance{
Addr: ip,
Addr: host,
Port: port,
})
}


+ 70
- 1
pkg/discovery/file_test.go View File

@@ -136,7 +136,7 @@ func TestFileRegistryService_Lookup(t *testing.T) {
},
want: nil,
wantErr: true,
wantErrMsg: "endpoint format should like ip:port. endpoint: 127.0.0.18091",
wantErrMsg: "address 127.0.0.18091: missing port in address",
},
{
name: "port is not number",
@@ -157,6 +157,75 @@ func TestFileRegistryService_Lookup(t *testing.T) {
wantErr: true,
wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid syntax",
},
{
name: "endpoint is ipv6",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
Grouplist: map[string]string{
"default": "[2000:0000:0000:0000:0001:2345:6789:abcd]:8080",
},
},
},
want: []*ServiceInstance{
{
Addr: "2000:0000:0000:0000:0001:2345:6789:abcd",
Port: 8080,
},
},
wantErr: false,
},
{
name: "endpoint is ipv6",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
Grouplist: map[string]string{
"default": "[2000:0000:0000:0000:0001:2345:6789:abcd%10]:8080",
},
},
},
want: []*ServiceInstance{
{
Addr: "2000:0000:0000:0000:0001:2345:6789:abcd",
Port: 8080,
},
},
wantErr: false,
},
{
name: "endpoint is ipv6",
args: args{
key: "default_tx_group",
},
fields: fields{
serviceConfig: &ServiceConfig{
VgroupMapping: map[string]string{
"default_tx_group": "default",
},
Grouplist: map[string]string{
"default": "[::]:8080",
},
},
},
want: []*ServiceInstance{
{
Addr: "::",
Port: 8080,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {


+ 2
- 1
pkg/remoting/getty/session_manager.go View File

@@ -22,6 +22,7 @@ import (
"fmt"
"net"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -76,7 +77,7 @@ func (g *SessionManager) init() {
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)),
getty.WithServerAddress(net.JoinHostPort(address.Addr, strconv.Itoa(address.Port))),
// todo if read c.gettyConf.ConnectionNum, will cause the connect to fail
getty.WithConnectionNumber(1),
getty.WithReconnectInterval(g.gettyConf.ReconnectInterval),


+ 30
- 18
pkg/remoting/loadbalance/xid_loadbalance.go View File

@@ -18,34 +18,46 @@
package loadbalance

import (
"fmt"
"strings"
"sync"

getty "github.com/apache/dubbo-getty"

"seata.apache.org/seata-go/pkg/util/log"
"seata.apache.org/seata-go/pkg/util/net"
)

func XidLoadBalance(sessions *sync.Map, xid string) getty.Session {
var session getty.Session
const delimiter = ":"

if len(xid) > 0 && strings.Contains(xid, delimiter) {
// ip:port:transactionId -> ip:port
index := strings.LastIndex(xid, delimiter)
serverAddress := xid[:index]

// ip:port:transactionId
tmpSplits := strings.Split(xid, ":")
if len(tmpSplits) == 3 {
ip := tmpSplits[0]
port := tmpSplits[1]
ipPort := ip + ":" + port
sessions.Range(func(key, value interface{}) bool {
tmpSession := key.(getty.Session)
if tmpSession.IsClosed() {
sessions.Delete(tmpSession)
// ip:port -> port
// ipv4/v6
ip, port, err := net.SplitIPPortStr(serverAddress)
if err != nil {
log.Errorf("xid load balance err, xid:%s, %v , change use random load balance", xid, err)
} else {
sessions.Range(func(key, value interface{}) bool {
tmpSession := key.(getty.Session)
if tmpSession.IsClosed() {
sessions.Delete(tmpSession)
return true
}
ipPort := fmt.Sprintf("%s:%d", ip, port)
connectedIpPort := tmpSession.RemoteAddr()
if ipPort == connectedIpPort {
session = tmpSession
return false
}
return true
}
connectedIpPort := tmpSession.RemoteAddr()
if ipPort == connectedIpPort {
session = tmpSession
return false
}
return true
})
})
}
}

if session == nil {


+ 6
- 0
pkg/remoting/loadbalance/xid_loadbalance_test.go View File

@@ -84,6 +84,12 @@ func TestXidLoadBalance(t *testing.T) {
xid: "127.0.0.1:9000:111",
returnAddrs: []string{"127.0.0.1:8000", "127.0.0.1:8002"},
},
{
name: "ip is ipv6",
sessions: sessions,
xid: "2000:0000:0000:0000:0001:2345:6789:abcd:8002:111",
returnAddrs: []string{"127.0.0.1:8000", "127.0.0.1:8002"},
},
}
for _, test := range testCases {
session := XidLoadBalance(test.sessions, test.xid)


+ 59
- 0
pkg/util/net/address_validator.go View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net

import (
"fmt"
"regexp"
"strconv"
"strings"
)

const (
addressSplitChar = ":"
)

func SplitIPPortStr(addr string) (string, int, error) {
if addr == "" {
return "", 0, fmt.Errorf("split ip err: param addr must not empty")
}

if addr[0] == '[' {
reg := regexp.MustCompile("[\\[\\]]")
addr = reg.ReplaceAllString(addr, "")
}

i := strings.LastIndex(addr, addressSplitChar)
if i < 0 {
return "", 0, fmt.Errorf("address %s: missing port in address", addr)
}

host := addr[:i]
port := addr[i+1:]

if strings.Contains(host, "%") {
reg := regexp.MustCompile("\\%[0-9]+")
host = reg.ReplaceAllString(host, "")
}

portInt, err := strconv.Atoi(port)
if err != nil {
return "", 0, err
}
return host, portInt, nil
}

+ 131
- 0
pkg/util/net/address_validator_test.go View File

@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net

import (
"reflect"
"testing"
)

func TestAddressValidator(t *testing.T) {
type IPAddr struct {
host string
port int
}
tests := []struct {
name string
address string
want *IPAddr
wantErr bool
wantErrMsg string
}{
{
name: "normal single endpoint.",
address: "127.0.0.1:8091",
want: &IPAddr{
"127.0.0.1", 8091,
},
wantErr: false,
},
{
name: "addr is empty.",
address: "",
want: nil,
wantErr: true,
wantErrMsg: "split ip err: param addr must not empty",
},
{
name: "format is not ip:port",
address: "127.0.0.18091",
want: nil,
wantErr: true,
wantErrMsg: "address 127.0.0.18091: missing port in address",
},
{
name: "port is not number",
address: "127.0.0.1:abc",
want: nil,
wantErr: true,
wantErrMsg: "strconv.Atoi: parsing \"abc\": invalid syntax",
},
{
name: "endpoint is ipv6",
address: "[2000:0000:0000:0000:0001:2345:6789:abcd]:8080",
want: &IPAddr{
"2000:0000:0000:0000:0001:2345:6789:abcd", 8080,
},
wantErr: false,
},
{
name: "endpoint is ipv6",
address: "[2000:0000:0000:0000:0001:2345:6789:abcd%10]:8080",
want: &IPAddr{
"2000:0000:0000:0000:0001:2345:6789:abcd", 8080,
},
wantErr: false,
},
{
name: "endpoint is ipv6",
address: "2000:0000:0000:0000:0001:2345:6789:abcd:8080",
want: &IPAddr{
"2000:0000:0000:0000:0001:2345:6789:abcd", 8080,
},
wantErr: false,
},
{
name: "endpoint is ipv6",
address: "[::]:8080",
want: &IPAddr{
"::", 8080,
},
wantErr: false,
},
{
name: "endpoint is ipv6",
address: "::FFFF:192.168.1.2:8080",
want: &IPAddr{
"::FFFF:192.168.1.2", 8080,
},
wantErr: false,
},
{
name: "endpoint is ipv6",
address: "[::FFFF:192.168.1.2]:8080",
want: &IPAddr{
"::FFFF:192.168.1.2", 8080,
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
host, port, err := SplitIPPortStr(tt.address)

if (err != nil) != tt.wantErr {
t.Errorf("SplitIPPortStr() error = %v, wantErr = %v", err, tt.wantErr)
}
if tt.wantErr && err.Error() != tt.wantErrMsg {
t.Errorf("SplitIPPortStr() errMsg = %v, wantErrMsg = %v", err.Error(), tt.wantErrMsg)
}
got := &IPAddr{host, port}
if !tt.wantErr && !reflect.DeepEqual(got, tt.want) {
t.Errorf("SplitIPPortStr() got = %v, want = %v", got, tt.want)
}
})
}
}

Loading…
Cancel
Save