diff --git a/pkg/client/client.go b/pkg/client/client.go index 8fcf943c..2819a660 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,13 +18,14 @@ package client import ( + "github.com/seata/seata-go/pkg/remoting/getty" "sync" "github.com/seata/seata-go/pkg/datasource" at "github.com/seata/seata-go/pkg/datasource/sql" "github.com/seata/seata-go/pkg/datasource/sql/exec/config" "github.com/seata/seata-go/pkg/integration" - "github.com/seata/seata-go/pkg/remoting/getty" + remoteConfig "github.com/seata/seata-go/pkg/remoting/config" "github.com/seata/seata-go/pkg/remoting/processor/client" "github.com/seata/seata-go/pkg/rm" "github.com/seata/seata-go/pkg/rm/tcc" @@ -61,11 +62,12 @@ func initTmClient(cfg *Config) { // initRemoting init rpc client func initRemoting(cfg *Config) { - getty.InitRpcClient(&cfg.GettyConfig, &getty.SeataConfig{ + getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{ ApplicationID: cfg.ApplicationID, TxServiceGroup: cfg.TxServiceGroup, ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping, ServiceGrouplist: cfg.ServiceConfig.Grouplist, + LoadBalanceType: cfg.GettyConfig.LoadBalanceType, }) } diff --git a/pkg/client/config.go b/pkg/client/config.go index e2abe347..a4a9eb77 100644 --- a/pkg/client/config.go +++ b/pkg/client/config.go @@ -34,7 +34,7 @@ import ( "github.com/seata/seata-go/pkg/datasource/sql" "github.com/seata/seata-go/pkg/datasource/sql/undo" - "github.com/seata/seata-go/pkg/remoting/getty" + remoteConfig "github.com/seata/seata-go/pkg/remoting/config" "github.com/seata/seata-go/pkg/rm" "github.com/seata/seata-go/pkg/rm/tcc" "github.com/seata/seata-go/pkg/tm" @@ -76,12 +76,12 @@ type Config struct { EnableAutoDataSourceProxy bool `yaml:"enable-auto-data-source-proxy" json:"enable-auto-data-source-proxy,omitempty" koanf:"enable-auto-data-source-proxy"` DataSourceProxyMode string `yaml:"data-source-proxy-mode" json:"data-source-proxy-mode,omitempty" koanf:"data-source-proxy-mode"` - AsyncWorkerConfig sql.AsyncWorkerConfig `yaml:"async" json:"async" koanf:"async"` - TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"` - ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"` - GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"` - TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"` - ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"` + AsyncWorkerConfig sql.AsyncWorkerConfig `yaml:"async" json:"async" koanf:"async"` + TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"` + ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"` + GettyConfig remoteConfig.Config `yaml:"getty" json:"getty" koanf:"getty"` + TransportConfig remoteConfig.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"` + ServiceConfig tm.ServiceConfig `yaml:"service" json:"service" koanf:"service"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { diff --git a/pkg/remoting/getty/config.go b/pkg/remoting/config/config.go similarity index 92% rename from pkg/remoting/getty/config.go rename to pkg/remoting/config/config.go index b85ce55f..5d234d30 100644 --- a/pkg/remoting/getty/config.go +++ b/pkg/remoting/config/config.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package getty +package config import ( "flag" @@ -24,20 +24,31 @@ import ( "github.com/seata/seata-go/pkg/util/flagext" ) -var ( - seataConfig *SeataConfig -) +var seataConfig *SeataConfig type Config struct { ReconnectInterval int `yaml:"reconnect-interval" json:"reconnect-interval" koanf:"reconnect-interval"` ConnectionNum int `yaml:"connection-num" json:"connection-num" koanf:"connection-num"` + LoadBalanceType string `yaml:"load-balance-type" json:"load-balance-type" koanf:"load-balance-type"` SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"` } +// RegisterFlagsWithPrefix for Config. +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.") + f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 1, "The getty_session pool.") + f.StringVar(&cfg.LoadBalanceType, prefix+".load-balance-type", "XID", "default load balance type") + cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f) +} + type ShutdownConfig struct { Wait time.Duration `yaml:"wait" json:"wait" konaf:"wait"` } +func (cfg *ShutdownConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.Wait, prefix+".wait", 3*time.Second, "Shutdown wait time.") +} + type TransportConfig struct { ShutdownConfig ShutdownConfig `yaml:"shutdown" json:"shutdown" koanf:"shutdown"` Type string `yaml:"type" json:"type" koanf:"type"` @@ -51,17 +62,6 @@ type TransportConfig struct { RPCTmRequestTimeout time.Duration `yaml:"rpc-tm-request-timeout" json:"rpc-tm-request-timeout" koanf:"rpc-tm-request-timeout"` } -// RegisterFlagsWithPrefix for Config. -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.") - f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 1, "The getty_session pool.") - cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f) -} - -func (cfg *ShutdownConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.DurationVar(&cfg.Wait, prefix+".wait", 3*time.Second, "Shutdown wait time.") -} - func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.ShutdownConfig.RegisterFlagsWithPrefix(prefix+".shutdown", f) f.StringVar(&cfg.Type, prefix+".type", "TCP", "Transport protocol type.") @@ -81,12 +81,13 @@ type SeataConfig struct { TxServiceGroup string ServiceVgroupMapping flagext.StringMap ServiceGrouplist flagext.StringMap + LoadBalanceType string } -func iniConfig(seataConf *SeataConfig) { +func IniConfig(seataConf *SeataConfig) { seataConfig = seataConf } -func getSeataConfig() *SeataConfig { +func GetSeataConfig() *SeataConfig { return seataConfig } diff --git a/pkg/remoting/getty/session_config.go b/pkg/remoting/config/session_config.go similarity index 99% rename from pkg/remoting/getty/session_config.go rename to pkg/remoting/config/session_config.go index be9defe9..391ab857 100644 --- a/pkg/remoting/getty/session_config.go +++ b/pkg/remoting/config/session_config.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package getty +package config import ( "flag" diff --git a/pkg/remoting/getty/getty_client_test.go b/pkg/remoting/getty/getty_client_test.go index 41b96e5e..4704cd7e 100644 --- a/pkg/remoting/getty/getty_client_test.go +++ b/pkg/remoting/getty/getty_client_test.go @@ -23,13 +23,12 @@ import ( "testing" "github.com/agiledragon/gomonkey/v2" + getty "github.com/apache/dubbo-getty" + "github.com/stretchr/testify/assert" "github.com/seata/seata-go/pkg/protocol/codec" "github.com/seata/seata-go/pkg/protocol/message" "github.com/seata/seata-go/pkg/util/log" - - getty "github.com/apache/dubbo-getty" - "github.com/stretchr/testify/assert" ) // TestGettyRemotingClient_SendSyncRequest unit test for SendSyncRequest function diff --git a/pkg/remoting/getty/getty_remoting.go b/pkg/remoting/getty/getty_remoting.go index 4fed4de8..d0474388 100644 --- a/pkg/remoting/getty/getty_remoting.go +++ b/pkg/remoting/getty/getty_remoting.go @@ -59,14 +59,14 @@ func GetGettyRemotingInstance() *GettyRemoting { func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) { if s == nil { - s = sessionManager.selectSession() + s = sessionManager.selectSession(msg) } return g.sendAsync(s, msg, callback) } func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error { if s == nil { - s = sessionManager.selectSession() + s = sessionManager.selectSession(msg) } _, err := g.sendAsync(s, msg, callback) return err diff --git a/pkg/remoting/getty/listener.go b/pkg/remoting/getty/listener.go index 8f6b6a59..81890c78 100644 --- a/pkg/remoting/getty/listener.go +++ b/pkg/remoting/getty/listener.go @@ -27,6 +27,7 @@ import ( "github.com/seata/seata-go/pkg/constant" "github.com/seata/seata-go/pkg/protocol/codec" "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/remoting/config" "github.com/seata/seata-go/pkg/remoting/processor" "github.com/seata/seata-go/pkg/util/log" ) @@ -62,7 +63,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler { func (g *gettyClientHandler) OnOpen(session getty.Session) error { log.Infof("Open new getty session ") g.sessionManager.registerSession(session) - conf := getSeataConfig() + conf := config.GetSeataConfig() go func() { request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{ Version: constant.SeataVersion, diff --git a/pkg/remoting/getty/readwriter.go b/pkg/remoting/getty/readwriter.go index 358405d6..ef5751da 100644 --- a/pkg/remoting/getty/readwriter.go +++ b/pkg/remoting/getty/readwriter.go @@ -18,40 +18,35 @@ package getty import ( + "errors" "fmt" - "github.com/seata/seata-go/pkg/util/bytes" - getty "github.com/apache/dubbo-getty" - "github.com/pkg/errors" "github.com/seata/seata-go/pkg/protocol/codec" "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/util/bytes" ) -/** - *
- * 0     1     2     3     4     5     6     7     8     9    10     11    12    13    14    15    16
- * +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
- * |   magic   |Proto |     Full length      |    Head   | Msg |Seria|Compr|     RequestID         |
- * |   code    |clVer |    (head+body)       |   Length  |Type |lizer|ess  |                       |
- * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
- * |                                                                                               |
- * |                                   Head Map [Optional]                                         |
- * +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
- * |                                                                                               |
- * |                                         body                                                  |
- * |                                                                                               |
- * |                                        ... ...                                                |
- * +-----------------------------------------------------------------------------------------------+
- * 
- *

- *

  • Full Length: include all data
  • - *
  • Head Length: include head data from magic code to head map.
  • - *
  • Body Length: Full Length - Head Length
  • - *

    - * https://github.com/seata/seata/issues/893 - */ +// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 +// +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ +// | magic |Proto | Full length | Head | Msg |Seria|Compr| RequestID | +// | code |clVer | (head+body) | Length |Type |lizer|ess | | +// +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ +// | | +// | Head Map [Optional] | +// +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ +// | | +// | body | +// | | +// | ... ... | +// +-----------------------------------------------------------------------------------------------+ +//
  • Full Length: include all data
  • +//
  • Head Length: include head data from magic code to head map.
  • +//
  • Body Length: Full Length - Head Length
  • +//

    +// https://github.com/seata/seata/issues/893 + const ( Seatav1HeaderLength = 16 ) diff --git a/pkg/remoting/getty/rpc_client.go b/pkg/remoting/getty/rpc_client.go index 1aaca738..12f8a440 100644 --- a/pkg/remoting/getty/rpc_client.go +++ b/pkg/remoting/getty/rpc_client.go @@ -24,23 +24,23 @@ import ( "strings" "sync" - "github.com/seata/seata-go/pkg/protocol/codec" - "github.com/seata/seata-go/pkg/util/log" - getty "github.com/apache/dubbo-getty" - gxsync "github.com/dubbogo/gost/sync" + + "github.com/seata/seata-go/pkg/protocol/codec" + "github.com/seata/seata-go/pkg/remoting/config" + "github.com/seata/seata-go/pkg/util/log" ) type RpcClient struct { - gettyConf *Config - seataConf *SeataConfig + gettyConf *config.Config + seataConf *config.SeataConfig gettyClients []getty.Client futures *sync.Map } -func InitRpcClient(gettyConfig *Config, seataConfig *SeataConfig) { - iniConfig(seataConfig) +func InitRpcClient(gettyConfig *config.Config, seataConfig *config.SeataConfig) { + config.IniConfig(seataConfig) rpcClient := &RpcClient{ gettyConf: gettyConfig, seataConf: seataConfig, diff --git a/pkg/remoting/getty/session_manager.go b/pkg/remoting/getty/session_manager.go index be925ff0..cf8dba14 100644 --- a/pkg/remoting/getty/session_manager.go +++ b/pkg/remoting/getty/session_manager.go @@ -18,11 +18,16 @@ package getty import ( + "reflect" "sync" "sync/atomic" "time" getty "github.com/apache/dubbo-getty" + + "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/remoting/config" + "github.com/seata/seata-go/pkg/remoting/loadbalance" ) const ( @@ -47,20 +52,12 @@ func newSessionManager() *SessionManager { } } -func (g *SessionManager) selectSession() getty.Session { - var session getty.Session - g.allSessions.Range(func(key, value interface{}) bool { - session = key.(getty.Session) - if session.IsClosed() { - g.releaseSession(session) - } else { - return false - } - return true - }) +func (g *SessionManager) selectSession(msg interface{}) getty.Session { + session := loadbalance.Select(config.GetSeataConfig().LoadBalanceType, &g.allSessions, g.getXid(msg)) if session != nil { return session } + if g.sessionSize == 0 { ticker := time.NewTicker(time.Duration(checkAliveInternal) * time.Millisecond) defer ticker.Stop() @@ -83,6 +80,27 @@ func (g *SessionManager) selectSession() getty.Session { return nil } +func (g *SessionManager) getXid(msg interface{}) string { + var xid string + if tmpMsg, ok := msg.(message.AbstractGlobalEndRequest); ok { + xid = tmpMsg.Xid + } else if tmpMsg, ok := msg.(message.GlobalBeginRequest); ok { + xid = tmpMsg.TransactionName + } else if tmpMsg, ok := msg.(message.BranchRegisterRequest); ok { + xid = tmpMsg.Xid + } else if tmpMsg, ok := msg.(message.BranchReportRequest); ok { + xid = tmpMsg.Xid + } else { + msgType := reflect.TypeOf(msg) + msgValue := reflect.ValueOf(msg) + if msgType.Kind() == reflect.Ptr { + msgValue = msgValue.Elem() + } + xid = msgValue.FieldByName("Xid").String() + } + return xid +} + func (g *SessionManager) releaseSession(session getty.Session) { g.allSessions.Delete(session) if !session.IsClosed() { diff --git a/pkg/remoting/loadbalance/loadbalance.go b/pkg/remoting/loadbalance/loadbalance.go new file mode 100644 index 00000000..c5ddb679 --- /dev/null +++ b/pkg/remoting/loadbalance/loadbalance.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "sync" + + getty "github.com/apache/dubbo-getty" +) + +const ( + randomLoadBalance = "RandomLoadBalance" + xidLoadBalance = "XID" + roundRobinLoadBalance = "RoundRobinLoadBalance" + consistentHashLoadBalance = "ConsistentHashLoadBalance" + leastActiveLoadBalance = "LeastActiveLoadBalance" +) + +func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Session { + switch loadBalanceType { + case randomLoadBalance: + return RandomLoadBalance(sessions, xid) + case xidLoadBalance: + return XidLoadBalance(sessions, xid) + default: + return RandomLoadBalance(sessions, xid) + } +} diff --git a/pkg/remoting/loadbalance/random_loadbalance.go b/pkg/remoting/loadbalance/random_loadbalance.go new file mode 100644 index 00000000..9700f919 --- /dev/null +++ b/pkg/remoting/loadbalance/random_loadbalance.go @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "sync" + + getty "github.com/apache/dubbo-getty" +) + +func RandomLoadBalance(sessions *sync.Map, xid string) getty.Session { + var session getty.Session + sessions.Range(func(key, value interface{}) bool { + session = key.(getty.Session) + if session.IsClosed() { + sessions.Delete(session) + return true + } + return false + }) + return session +} diff --git a/pkg/remoting/loadbalance/xid_loadbalance.go b/pkg/remoting/loadbalance/xid_loadbalance.go new file mode 100644 index 00000000..f855856f --- /dev/null +++ b/pkg/remoting/loadbalance/xid_loadbalance.go @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package loadbalance + +import ( + "strings" + "sync" + + getty "github.com/apache/dubbo-getty" +) + +func XidLoadBalance(sessions *sync.Map, xid string) getty.Session { + var session getty.Session + + // ip:port:transactionId + tmpSplits := strings.Split(xid, ":") + if len(tmpSplits) == 3 { + ip := tmpSplits[0] + port := tmpSplits[1] + ipPort := ip + ":" + port + sessions.Range(func(key, value interface{}) bool { + tmpSession := key.(getty.Session) + if tmpSession.IsClosed() { + sessions.Delete(tmpSession) + return true + } + connectedIpPort := session.RemoteAddr() + if ipPort == connectedIpPort { + session = tmpSession + return false + } + return true + }) + } + + if session == nil { + return RandomLoadBalance(sessions, xid) + } + + return session +}