@@ -60,7 +60,12 @@ func initTmClient(cfg *Config) { | |||
// initRemoting init rpc client | |||
func initRemoting(cfg *Config) { | |||
getty.InitRpcClient() | |||
getty.InitRpcClient(&cfg.GettyConfig, &getty.SeataConfig{ | |||
ApplicationID: cfg.ApplicationID, | |||
TxServiceGroup: cfg.TxServiceGroup, | |||
ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping, | |||
ServiceGrouplist: cfg.ServiceConfig.Grouplist, | |||
}) | |||
} | |||
// InitRmClient init client rm client | |||
@@ -26,13 +26,12 @@ import ( | |||
"runtime" | |||
"strings" | |||
"github.com/seata/seata-go/pkg/datasource/sql/undo" | |||
"github.com/knadh/koanf" | |||
"github.com/knadh/koanf/parsers/json" | |||
"github.com/knadh/koanf/parsers/toml" | |||
"github.com/knadh/koanf/parsers/yaml" | |||
"github.com/knadh/koanf/providers/rawbytes" | |||
"github.com/seata/seata-go/pkg/datasource/sql/undo" | |||
"github.com/seata/seata-go/pkg/remoting/getty" | |||
"github.com/seata/seata-go/pkg/rm" | |||
"github.com/seata/seata-go/pkg/rm/tcc" | |||
@@ -80,7 +80,6 @@ func TestLoadPath(t *testing.T) { | |||
assert.NotNil(t, cfg.GettyConfig.SessionConfig) | |||
assert.Equal(t, 0, cfg.GettyConfig.ReconnectInterval) | |||
assert.Equal(t, 16, cfg.GettyConfig.ConnectionNum) | |||
assert.Equal(t, time.Second*15, cfg.GettyConfig.HeartbeatPeriod) | |||
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.CompressEncoding) | |||
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPNoDelay) | |||
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPKeepAlive) | |||
@@ -118,7 +117,7 @@ func TestLoadPath(t *testing.T) { | |||
} | |||
func TestLoadJson(t *testing.T) { | |||
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}` | |||
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}` | |||
cfg := LoadJson([]byte(confJson)) | |||
assert.NotNil(t, cfg) | |||
assert.Equal(t, false, cfg.Enabled) | |||
@@ -172,7 +171,6 @@ func TestLoadJson(t *testing.T) { | |||
assert.NotNil(t, cfg.GettyConfig.SessionConfig) | |||
assert.Equal(t, 1, cfg.GettyConfig.ReconnectInterval) | |||
assert.Equal(t, 10, cfg.GettyConfig.ConnectionNum) | |||
assert.Equal(t, time.Second*10, cfg.GettyConfig.HeartbeatPeriod) | |||
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.CompressEncoding) | |||
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPNoDelay) | |||
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPKeepAlive) | |||
@@ -36,5 +36,7 @@ const ( | |||
GlobalLockKey = "TX_LOCK" | |||
SeataFilterKey = "seataDubboFilter" | |||
SeataVersion = "1.1.0" | |||
TccBusinessActionContextParameter = "tccParam" | |||
) |
@@ -20,12 +20,17 @@ package getty | |||
import ( | |||
"flag" | |||
"time" | |||
"github.com/seata/seata-go/pkg/util/flagext" | |||
) | |||
var ( | |||
seataConfig *SeataConfig | |||
) | |||
type Config struct { | |||
ReconnectInterval int `yaml:"reconnect-interval" json:"reconnect-interval" koanf:"reconnect-interval"` | |||
ConnectionNum int `yaml:"connection-num" json:"connection-num" koanf:"connection-num"` | |||
HeartbeatPeriod time.Duration `yaml:"heartbeat-period" json:"heartbeat-period" koanf:"heartbeat-period"` | |||
SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"` | |||
} | |||
@@ -50,7 +55,6 @@ type TransportConfig struct { | |||
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.") | |||
f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 16, "The getty_session pool.") | |||
f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 15*time.Second, "The heartbeat period.") | |||
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f) | |||
} | |||
@@ -70,3 +74,18 @@ func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagS | |||
f.DurationVar(&cfg.RPCRmRequestTimeout, prefix+".rpc-rm-request-timeout", 30*time.Second, "RM send request timeout.") | |||
f.DurationVar(&cfg.RPCTmRequestTimeout, prefix+".rpc-tm-request-timeout", 30*time.Second, "TM send request timeout.") | |||
} | |||
type SeataConfig struct { | |||
ApplicationID string | |||
TxServiceGroup string | |||
ServiceVgroupMapping flagext.StringMap | |||
ServiceGrouplist flagext.StringMap | |||
} | |||
func iniConfig(seataConf *SeataConfig) { | |||
seataConfig = seataConf | |||
} | |||
func getSeataConfig() *SeataConfig { | |||
return seataConfig | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
"sync" | |||
getty "github.com/apache/dubbo-getty" | |||
"github.com/seata/seata-go/pkg/config" | |||
"github.com/seata/seata-go/pkg/constant" | |||
"github.com/seata/seata-go/pkg/protocol/codec" | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/seata/seata-go/pkg/remoting/processor" | |||
@@ -36,7 +36,7 @@ var ( | |||
) | |||
type gettyClientHandler struct { | |||
conf *config.ClientConfig | |||
conf *SeataConfig | |||
idGenerator *atomic.Uint32 | |||
msgFutures *sync.Map | |||
mergeMsgMap *sync.Map | |||
@@ -48,7 +48,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler { | |||
if clientHandler == nil { | |||
onceClientHandler.Do(func() { | |||
clientHandler = &gettyClientHandler{ | |||
conf: config.GetDefaultClientConfig("seata-go"), | |||
conf: getSeataConfig(), | |||
idGenerator: &atomic.Uint32{}, | |||
msgFutures: &sync.Map{}, | |||
mergeMsgMap: &sync.Map{}, | |||
@@ -65,9 +65,9 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error { | |||
g.sessionManager.registerSession(session) | |||
go func() { | |||
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{ | |||
Version: g.conf.SeataVersion, | |||
Version: constant.SeataVersion, | |||
ApplicationId: g.conf.ApplicationID, | |||
TransactionServiceGroup: g.conf.TransactionServiceGroup, | |||
TransactionServiceGroup: g.conf.TxServiceGroup, | |||
}} | |||
err := GetGettyRemotingClient().SendAsyncRequest(request) | |||
if err != nil { | |||
@@ -21,9 +21,9 @@ import ( | |||
"crypto/tls" | |||
"fmt" | |||
"net" | |||
"strings" | |||
"sync" | |||
"github.com/seata/seata-go/pkg/config" | |||
"github.com/seata/seata-go/pkg/protocol/codec" | |||
"github.com/seata/seata-go/pkg/util/log" | |||
@@ -33,14 +33,17 @@ import ( | |||
) | |||
type RpcClient struct { | |||
conf *config.ClientConfig | |||
gettyConf *Config | |||
seataConf *SeataConfig | |||
gettyClients []getty.Client | |||
futures *sync.Map | |||
} | |||
func InitRpcClient() { | |||
func InitRpcClient(gettyConfig *Config, seataConfig *SeataConfig) { | |||
iniConfig(seataConfig) | |||
rpcClient := &RpcClient{ | |||
conf: config.GetClientConfig(), | |||
gettyConf: gettyConfig, | |||
seataConf: seataConfig, | |||
gettyClients: make([]getty.Client, 0), | |||
} | |||
codec.Init() | |||
@@ -48,15 +51,15 @@ func InitRpcClient() { | |||
} | |||
func (c *RpcClient) init() { | |||
addressList := getAvailServerList() | |||
addressList := c.getAvailServerList() | |||
if len(addressList) == 0 { | |||
log.Warn("no have valid seata server list") | |||
} | |||
for _, address := range addressList { | |||
gettyClient := getty.NewTCPClient( | |||
getty.WithServerAddress(address), | |||
getty.WithConnectionNumber(c.conf.GettyConfig.ConnectionNum), | |||
getty.WithReconnectInterval(c.conf.GettyConfig.ReconnectInterval), | |||
getty.WithConnectionNumber(c.gettyConf.ConnectionNum), | |||
getty.WithReconnectInterval(c.gettyConf.ReconnectInterval), | |||
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)), | |||
) | |||
go gettyClient.RunEventLoop(c.newSession) | |||
@@ -64,9 +67,21 @@ func (c *RpcClient) init() { | |||
} | |||
} | |||
// todo mock | |||
func getAvailServerList() []string { | |||
return []string{"127.0.0.1:8091"} | |||
func (c *RpcClient) getAvailServerList() []string { | |||
defaultAddressList := []string{"127.0.0.1:8091"} | |||
txServiceGroup := c.seataConf.TxServiceGroup | |||
if txServiceGroup == "" { | |||
return defaultAddressList | |||
} | |||
clusterName := c.seataConf.ServiceVgroupMapping[txServiceGroup] | |||
if clusterName == "" { | |||
return defaultAddressList | |||
} | |||
grouplist := c.seataConf.ServiceGrouplist[clusterName] | |||
if grouplist == "" { | |||
return defaultAddressList | |||
} | |||
return strings.Split(grouplist, ",") | |||
} | |||
func (c *RpcClient) newSession(session getty.Session) error { | |||
@@ -76,18 +91,11 @@ func (c *RpcClient) newSession(session getty.Session) error { | |||
err error | |||
) | |||
if c.conf.GettyConfig.GettySessionParam.CompressEncoding { | |||
if c.gettyConf.SessionConfig.CompressEncoding { | |||
session.SetCompressType(getty.CompressZip) | |||
} | |||
if _, ok = session.Conn().(*tls.Conn); ok { | |||
session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName) | |||
session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen) | |||
session.SetPkgHandler(rpcPkgHandler) | |||
session.SetEventListener(GetGettyClientHandlerInstance()) | |||
session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout) | |||
session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout) | |||
session.SetCronPeriod((int)(c.conf.GettyConfig.GettySessionParam.CronPeriod)) | |||
session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout) | |||
c.setSessionConfig(session) | |||
log.Debugf("server accepts new tls session:%s\n", session.Stat()) | |||
return nil | |||
} | |||
@@ -100,34 +108,38 @@ func (c *RpcClient) newSession(session getty.Session) error { | |||
return errors.New(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn())) | |||
} | |||
if err = tcpConn.SetNoDelay(c.conf.GettyConfig.GettySessionParam.TCPNoDelay); err != nil { | |||
if err = tcpConn.SetNoDelay(c.gettyConf.SessionConfig.TCPNoDelay); err != nil { | |||
return err | |||
} | |||
if err = tcpConn.SetKeepAlive(c.conf.GettyConfig.GettySessionParam.TCPKeepAlive); err != nil { | |||
if err = tcpConn.SetKeepAlive(c.gettyConf.SessionConfig.TCPKeepAlive); err != nil { | |||
return err | |||
} | |||
if c.conf.GettyConfig.GettySessionParam.TCPKeepAlive { | |||
if err = tcpConn.SetKeepAlivePeriod(c.conf.GettyConfig.GettySessionParam.KeepAlivePeriod); err != nil { | |||
if c.gettyConf.SessionConfig.TCPKeepAlive { | |||
if err = tcpConn.SetKeepAlivePeriod(c.gettyConf.SessionConfig.KeepAlivePeriod); err != nil { | |||
return err | |||
} | |||
} | |||
if err = tcpConn.SetReadBuffer(c.conf.GettyConfig.GettySessionParam.TCPRBufSize); err != nil { | |||
if err = tcpConn.SetReadBuffer(c.gettyConf.SessionConfig.TCPRBufSize); err != nil { | |||
return err | |||
} | |||
if err = tcpConn.SetWriteBuffer(c.conf.GettyConfig.GettySessionParam.TCPWBufSize); err != nil { | |||
if err = tcpConn.SetWriteBuffer(c.gettyConf.SessionConfig.TCPWBufSize); err != nil { | |||
return err | |||
} | |||
} | |||
session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName) | |||
session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen) | |||
session.SetPkgHandler(rpcPkgHandler) | |||
session.SetEventListener(GetGettyClientHandlerInstance()) | |||
session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout) | |||
session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout) | |||
session.SetCronPeriod((int)(c.conf.GettyConfig.GettySessionParam.CronPeriod.Nanoseconds() / 1e6)) | |||
session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout) | |||
c.setSessionConfig(session) | |||
log.Debugf("rpc_client new session:%s\n", session.Stat()) | |||
return nil | |||
} | |||
func (c *RpcClient) setSessionConfig(session getty.Session) { | |||
session.SetName(c.gettyConf.SessionConfig.SessionName) | |||
session.SetMaxMsgLen(c.gettyConf.SessionConfig.MaxMsgLen) | |||
session.SetPkgHandler(rpcPkgHandler) | |||
session.SetEventListener(GetGettyClientHandlerInstance()) | |||
session.SetReadTimeout(c.gettyConf.SessionConfig.TCPReadTimeout) | |||
session.SetWriteTimeout(c.gettyConf.SessionConfig.TCPWriteTimeout) | |||
session.SetCronPeriod((int)(c.gettyConf.SessionConfig.CronPeriod.Milliseconds())) | |||
session.SetWaitTime(c.gettyConf.SessionConfig.WaitTimeout) | |||
} |
@@ -144,7 +144,6 @@ seata: | |||
getty: | |||
reconnect-interval: 0 | |||
connection-num: 16 | |||
heartbeat-period: 15s | |||
session: | |||
compress-encoding: false | |||
tcp-no-delay: true | |||