Browse Source

feat: add xid loadbalance (#534)

tags/v1.2.0
georgehao GitHub 2 years ago
parent
commit
d9a8ffa044
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 230 additions and 78 deletions
  1. +4
    -2
      pkg/client/client.go
  2. +7
    -7
      pkg/client/config.go
  3. +18
    -17
      pkg/remoting/config/config.go
  4. +1
    -1
      pkg/remoting/config/session_config.go
  5. +2
    -3
      pkg/remoting/getty/getty_client_test.go
  6. +2
    -2
      pkg/remoting/getty/getty_remoting.go
  7. +2
    -1
      pkg/remoting/getty/listener.go
  8. +21
    -26
      pkg/remoting/getty/readwriter.go
  9. +8
    -8
      pkg/remoting/getty/rpc_client.go
  10. +29
    -11
      pkg/remoting/getty/session_manager.go
  11. +43
    -0
      pkg/remoting/loadbalance/loadbalance.go
  12. +37
    -0
      pkg/remoting/loadbalance/random_loadbalance.go
  13. +56
    -0
      pkg/remoting/loadbalance/xid_loadbalance.go

+ 4
- 2
pkg/client/client.go View File

@@ -18,13 +18,14 @@
package client

import (
"github.com/seata/seata-go/pkg/remoting/getty"
"sync"

"github.com/seata/seata-go/pkg/datasource"
at "github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/exec/config"
"github.com/seata/seata-go/pkg/integration"
"github.com/seata/seata-go/pkg/remoting/getty"
remoteConfig "github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/remoting/processor/client"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
@@ -61,11 +62,12 @@ func initTmClient(cfg *Config) {

// initRemoting init rpc client
func initRemoting(cfg *Config) {
getty.InitRpcClient(&cfg.GettyConfig, &getty.SeataConfig{
getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{
ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping,
ServiceGrouplist: cfg.ServiceConfig.Grouplist,
LoadBalanceType: cfg.GettyConfig.LoadBalanceType,
})
}



+ 7
- 7
pkg/client/config.go View File

@@ -34,7 +34,7 @@ import (

"github.com/seata/seata-go/pkg/datasource/sql"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/remoting/getty"
remoteConfig "github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/tm"
@@ -76,12 +76,12 @@ type Config struct {
EnableAutoDataSourceProxy bool `yaml:"enable-auto-data-source-proxy" json:"enable-auto-data-source-proxy,omitempty" koanf:"enable-auto-data-source-proxy"`
DataSourceProxyMode string `yaml:"data-source-proxy-mode" json:"data-source-proxy-mode,omitempty" koanf:"data-source-proxy-mode"`

AsyncWorkerConfig sql.AsyncWorkerConfig `yaml:"async" json:"async" koanf:"async"`
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"`
TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
AsyncWorkerConfig sql.AsyncWorkerConfig `yaml:"async" json:"async" koanf:"async"`
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig remoteConfig.Config `yaml:"getty" json:"getty" koanf:"getty"`
TransportConfig remoteConfig.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {


pkg/remoting/getty/config.go → pkg/remoting/config/config.go View File

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package getty
package config

import (
"flag"
@@ -24,20 +24,31 @@ import (
"github.com/seata/seata-go/pkg/util/flagext"
)

var (
seataConfig *SeataConfig
)
var seataConfig *SeataConfig

type Config struct {
ReconnectInterval int `yaml:"reconnect-interval" json:"reconnect-interval" koanf:"reconnect-interval"`
ConnectionNum int `yaml:"connection-num" json:"connection-num" koanf:"connection-num"`
LoadBalanceType string `yaml:"load-balance-type" json:"load-balance-type" koanf:"load-balance-type"`
SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.")
f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 1, "The getty_session pool.")
f.StringVar(&cfg.LoadBalanceType, prefix+".load-balance-type", "XID", "default load balance type")
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f)
}

type ShutdownConfig struct {
Wait time.Duration `yaml:"wait" json:"wait" konaf:"wait"`
}

func (cfg *ShutdownConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Wait, prefix+".wait", 3*time.Second, "Shutdown wait time.")
}

type TransportConfig struct {
ShutdownConfig ShutdownConfig `yaml:"shutdown" json:"shutdown" koanf:"shutdown"`
Type string `yaml:"type" json:"type" koanf:"type"`
@@ -51,17 +62,6 @@ type TransportConfig struct {
RPCTmRequestTimeout time.Duration `yaml:"rpc-tm-request-timeout" json:"rpc-tm-request-timeout" koanf:"rpc-tm-request-timeout"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.")
f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 1, "The getty_session pool.")
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f)
}

func (cfg *ShutdownConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Wait, prefix+".wait", 3*time.Second, "Shutdown wait time.")
}

func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.ShutdownConfig.RegisterFlagsWithPrefix(prefix+".shutdown", f)
f.StringVar(&cfg.Type, prefix+".type", "TCP", "Transport protocol type.")
@@ -81,12 +81,13 @@ type SeataConfig struct {
TxServiceGroup string
ServiceVgroupMapping flagext.StringMap
ServiceGrouplist flagext.StringMap
LoadBalanceType string
}

func iniConfig(seataConf *SeataConfig) {
func IniConfig(seataConf *SeataConfig) {
seataConfig = seataConf
}

func getSeataConfig() *SeataConfig {
func GetSeataConfig() *SeataConfig {
return seataConfig
}

pkg/remoting/getty/session_config.go → pkg/remoting/config/session_config.go View File

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package getty
package config

import (
"flag"

+ 2
- 3
pkg/remoting/getty/getty_client_test.go View File

@@ -23,13 +23,12 @@ import (
"testing"

"github.com/agiledragon/gomonkey/v2"
getty "github.com/apache/dubbo-getty"
"github.com/stretchr/testify/assert"

"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/util/log"

getty "github.com/apache/dubbo-getty"
"github.com/stretchr/testify/assert"
)

// TestGettyRemotingClient_SendSyncRequest unit test for SendSyncRequest function


+ 2
- 2
pkg/remoting/getty/getty_remoting.go View File

@@ -59,14 +59,14 @@ func GetGettyRemotingInstance() *GettyRemoting {

func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) {
if s == nil {
s = sessionManager.selectSession()
s = sessionManager.selectSession(msg)
}
return g.sendAsync(s, msg, callback)
}

func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession()
s = sessionManager.selectSession(msg)
}
_, err := g.sendAsync(s, msg, callback)
return err


+ 2
- 1
pkg/remoting/getty/listener.go View File

@@ -27,6 +27,7 @@ import (
"github.com/seata/seata-go/pkg/constant"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/remoting/processor"
"github.com/seata/seata-go/pkg/util/log"
)
@@ -62,7 +63,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
func (g *gettyClientHandler) OnOpen(session getty.Session) error {
log.Infof("Open new getty session ")
g.sessionManager.registerSession(session)
conf := getSeataConfig()
conf := config.GetSeataConfig()
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: constant.SeataVersion,


+ 21
- 26
pkg/remoting/getty/readwriter.go View File

@@ -18,40 +18,35 @@
package getty

import (
"errors"
"fmt"

"github.com/seata/seata-go/pkg/util/bytes"

getty "github.com/apache/dubbo-getty"
"github.com/pkg/errors"

"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/util/bytes"
)

/**
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | magic |Proto | Full length | Head | Msg |Seria|Compr| RequestID |
* | code |clVer | (head+body) | Length |Type |lizer|ess | |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | Head Map [Optional] |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | body |
* | |
* | ... ... |
* +-----------------------------------------------------------------------------------------------+
* </pre>
* <p>
* <li>Full Length: include all data </li>
* <li>Head Length: include head data from magic code to head map. </li>
* <li>Body Length: Full Length - Head Length</li>
* </p>
* https://github.com/seata/seata/issues/893
*/
// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
// | magic |Proto | Full length | Head | Msg |Seria|Compr| RequestID |
// | code |clVer | (head+body) | Length |Type |lizer|ess | |
// +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
// | |
// | Head Map [Optional] |
// +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
// | |
// | body |
// | |
// | ... ... |
// +-----------------------------------------------------------------------------------------------+
// <li>Full Length: include all data </li>
// <li>Head Length: include head data from magic code to head map. </li>
// <li>Body Length: Full Length - Head Length</li>
// </p>
// https://github.com/seata/seata/issues/893

const (
Seatav1HeaderLength = 16
)


+ 8
- 8
pkg/remoting/getty/rpc_client.go View File

@@ -24,23 +24,23 @@ import (
"strings"
"sync"

"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/util/log"

getty "github.com/apache/dubbo-getty"

gxsync "github.com/dubbogo/gost/sync"

"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/util/log"
)

type RpcClient struct {
gettyConf *Config
seataConf *SeataConfig
gettyConf *config.Config
seataConf *config.SeataConfig
gettyClients []getty.Client
futures *sync.Map
}

func InitRpcClient(gettyConfig *Config, seataConfig *SeataConfig) {
iniConfig(seataConfig)
func InitRpcClient(gettyConfig *config.Config, seataConfig *config.SeataConfig) {
config.IniConfig(seataConfig)
rpcClient := &RpcClient{
gettyConf: gettyConfig,
seataConf: seataConfig,


+ 29
- 11
pkg/remoting/getty/session_manager.go View File

@@ -18,11 +18,16 @@
package getty

import (
"reflect"
"sync"
"sync/atomic"
"time"

getty "github.com/apache/dubbo-getty"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/remoting/loadbalance"
)

const (
@@ -47,20 +52,12 @@ func newSessionManager() *SessionManager {
}
}

func (g *SessionManager) selectSession() getty.Session {
var session getty.Session
g.allSessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
g.releaseSession(session)
} else {
return false
}
return true
})
func (g *SessionManager) selectSession(msg interface{}) getty.Session {
session := loadbalance.Select(config.GetSeataConfig().LoadBalanceType, &g.allSessions, g.getXid(msg))
if session != nil {
return session
}

if g.sessionSize == 0 {
ticker := time.NewTicker(time.Duration(checkAliveInternal) * time.Millisecond)
defer ticker.Stop()
@@ -83,6 +80,27 @@ func (g *SessionManager) selectSession() getty.Session {
return nil
}

func (g *SessionManager) getXid(msg interface{}) string {
var xid string
if tmpMsg, ok := msg.(message.AbstractGlobalEndRequest); ok {
xid = tmpMsg.Xid
} else if tmpMsg, ok := msg.(message.GlobalBeginRequest); ok {
xid = tmpMsg.TransactionName
} else if tmpMsg, ok := msg.(message.BranchRegisterRequest); ok {
xid = tmpMsg.Xid
} else if tmpMsg, ok := msg.(message.BranchReportRequest); ok {
xid = tmpMsg.Xid
} else {
msgType := reflect.TypeOf(msg)
msgValue := reflect.ValueOf(msg)
if msgType.Kind() == reflect.Ptr {
msgValue = msgValue.Elem()
}
xid = msgValue.FieldByName("Xid").String()
}
return xid
}

func (g *SessionManager) releaseSession(session getty.Session) {
g.allSessions.Delete(session)
if !session.IsClosed() {


+ 43
- 0
pkg/remoting/loadbalance/loadbalance.go View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"sync"

getty "github.com/apache/dubbo-getty"
)

const (
randomLoadBalance = "RandomLoadBalance"
xidLoadBalance = "XID"
roundRobinLoadBalance = "RoundRobinLoadBalance"
consistentHashLoadBalance = "ConsistentHashLoadBalance"
leastActiveLoadBalance = "LeastActiveLoadBalance"
)

func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Session {
switch loadBalanceType {
case randomLoadBalance:
return RandomLoadBalance(sessions, xid)
case xidLoadBalance:
return XidLoadBalance(sessions, xid)
default:
return RandomLoadBalance(sessions, xid)
}
}

+ 37
- 0
pkg/remoting/loadbalance/random_loadbalance.go View File

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"sync"

getty "github.com/apache/dubbo-getty"
)

func RandomLoadBalance(sessions *sync.Map, xid string) getty.Session {
var session getty.Session
sessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessions.Delete(session)
return true
}
return false
})
return session
}

+ 56
- 0
pkg/remoting/loadbalance/xid_loadbalance.go View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package loadbalance

import (
"strings"
"sync"

getty "github.com/apache/dubbo-getty"
)

func XidLoadBalance(sessions *sync.Map, xid string) getty.Session {
var session getty.Session

// ip:port:transactionId
tmpSplits := strings.Split(xid, ":")
if len(tmpSplits) == 3 {
ip := tmpSplits[0]
port := tmpSplits[1]
ipPort := ip + ":" + port
sessions.Range(func(key, value interface{}) bool {
tmpSession := key.(getty.Session)
if tmpSession.IsClosed() {
sessions.Delete(tmpSession)
return true
}
connectedIpPort := session.RemoteAddr()
if ipPort == connectedIpPort {
session = tmpSession
return false
}
return true
})
}

if session == nil {
return RandomLoadBalance(sessions, xid)
}

return session
}

Loading…
Cancel
Save