Browse Source

refactor: refact getty module (#746)

* feat:add more linter

* feat:change golangclilint version to 1.57.x to support more linter

* feat:adjust lint conf and adjust the code to pass the check

* style: format some code; fix: some sql statement or rows was not been closed

* feat:close session when send heart beat message failed

* refactor: change GettyRemoting from singleton to GettyRemotingClient member

* remove rpc client

* refactor: complete GettyRemoting refactoring

* refactor: change getter name, improve variable naming and remove unused fields

* update something

* feat:add heart-beat failed retry times

* Refactor getty.
>
>
Co-authored-by: marsevilspirit <marsevilspirit@gmail.com>
Co-authored-by: solisamicus <solisamicus@163.com>
Co-authored-by: No-SilverBullet  <nosilverbullet.wu@gmail.com>

* Refactor getty.

Co-authored-by: marsevilspirit <marsevilspirit@gmail.com>
Co-authored-by: solisamicus <solisamicus@163.com>
Co-authored-by: No-SilverBullet  <nosilverbullet.wu@gmail.com>

---------

Co-authored-by: JayLiu <38887641+luky116@users.noreply.github.com>
Co-authored-by: SolisAmicus <solisamicus@163.com>
Co-authored-by: marsevilspirit <marsevilspirit@gmail.com>
tags/v2.0.0
xinfan.wu GitHub 9 months ago
parent
commit
8ce100621f
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
11 changed files with 233 additions and 214 deletions
  1. +5
    -3
      pkg/client/client.go
  2. +1
    -1
      pkg/remoting/config/config.go
  3. +28
    -6
      pkg/remoting/getty/getty_client.go
  4. +3
    -3
      pkg/remoting/getty/getty_client_test.go
  5. +29
    -0
      pkg/remoting/getty/getty_init.go
  6. +5
    -15
      pkg/remoting/getty/getty_remoting.go
  7. +20
    -16
      pkg/remoting/getty/getty_remoting_test.go
  8. +22
    -17
      pkg/remoting/getty/listener.go
  9. +0
    -138
      pkg/remoting/getty/rpc_client.go
  10. +112
    -8
      pkg/remoting/getty/session_manager.go
  11. +8
    -7
      pkg/remoting/processor/client/client_on_response_processor.go

+ 5
- 3
pkg/client/client.go View File

@@ -62,15 +62,17 @@ func initTmClient(cfg *Config) {
})
}

// initRemoting init rpc client
// initRemoting init remoting
func initRemoting(cfg *Config) {
getty.InitRpcClient(&cfg.GettyConfig, &remoteConfig.SeataConfig{
seataConfig := remoteConfig.SeataConfig{
ApplicationID: cfg.ApplicationID,
TxServiceGroup: cfg.TxServiceGroup,
ServiceVgroupMapping: cfg.ServiceConfig.VgroupMapping,
ServiceGrouplist: cfg.ServiceConfig.Grouplist,
LoadBalanceType: cfg.GettyConfig.LoadBalanceType,
})
}

getty.InitGetty(&cfg.GettyConfig, &seataConfig)
}

// InitRmClient init client rm client


+ 1
- 1
pkg/remoting/config/config.go View File

@@ -84,7 +84,7 @@ type SeataConfig struct {
LoadBalanceType string
}

func IniConfig(seataConf *SeataConfig) {
func InitConfig(seataConf *SeataConfig) {
seataConfig = seataConf
}



+ 28
- 6
pkg/remoting/getty/getty_client.go View File

@@ -35,14 +35,16 @@ var (
)

type GettyRemotingClient struct {
idGenerator *atomic.Uint32
idGenerator *atomic.Uint32
gettyRemoting *GettyRemoting
}

func GetGettyRemotingClient() *GettyRemotingClient {
if gettyRemotingClient == nil {
onceGettyRemotingClient.Do(func() {
gettyRemotingClient = &GettyRemotingClient{
idGenerator: &atomic.Uint32{},
idGenerator: &atomic.Uint32{},
gettyRemoting: newGettyRemoting(),
}
})
}
@@ -63,7 +65,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, client.asyncCallback)
return client.gettyRemoting.SendAsync(rpcMessage, nil, client.asyncCallback)
}

func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{}) error {
@@ -74,7 +76,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
return client.gettyRemoting.SendAsync(rpcMessage, nil, nil)
}

func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
@@ -85,7 +87,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
return client.gettyRemoting.SendSync(rpcMessage, nil, client.syncCallback)
}

func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
@@ -96,10 +98,30 @@ func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *
func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(RpcRequestTimeout):
GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
g.gettyRemoting.RemoveMergedMessageFuture(reqMsg.ID)
log.Errorf("wait resp timeout: %#v", reqMsg)
return nil, fmt.Errorf("wait response timeout, request: %#v", reqMsg)
case <-respMsg.Done:
return respMsg.Response, respMsg.Err
}
}

func (client *GettyRemotingClient) GetMergedMessage(msgID int32) *message.MergedWarpMessage {
return client.gettyRemoting.GetMergedMessage(msgID)
}

func (client *GettyRemotingClient) GetMessageFuture(msgID int32) *message.MessageFuture {
return client.gettyRemoting.GetMessageFuture(msgID)
}

func (client *GettyRemotingClient) RemoveMessageFuture(msgID int32) {
client.gettyRemoting.RemoveMessageFuture(msgID)
}

func (client *GettyRemotingClient) RemoveMergedMessageFuture(msgID int32) {
client.gettyRemoting.RemoveMergedMessageFuture(msgID)
}

func (client *GettyRemotingClient) NotifyRpcMessageResponse(msg message.RpcMessage) {
client.gettyRemoting.NotifyRpcMessageResponse(msg)
}

+ 3
- 3
pkg/remoting/getty/getty_client_test.go View File

@@ -40,7 +40,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {
},
},
}
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendSync",
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendSync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{},
error) {
return respMsg, nil
@@ -52,7 +52,7 @@ func TestGettyRemotingClient_SendSyncRequest(t *testing.T) {

// TestGettyRemotingClient_SendAsyncResponse unit test for SendAsyncResponse function
func TestGettyRemotingClient_SendAsyncResponse(t *testing.T) {
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
return nil
})
@@ -77,7 +77,7 @@ func TestGettyRemotingClient_SendAsyncRequest(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingInstance()), "SendASync",
gomonkey.ApplyMethod(reflect.TypeOf(GetGettyRemotingClient().gettyRemoting), "SendAsync",
func(_ *GettyRemoting, msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
return nil
})


+ 29
- 0
pkg/remoting/getty/getty_init.go View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

import (
"seata.apache.org/seata-go/pkg/protocol/codec"
"seata.apache.org/seata-go/pkg/remoting/config"
)

func InitGetty(gettyConfig *config.Config, seataConfig *config.SeataConfig) {
config.InitConfig(seataConfig)
codec.Init()
initSessionManager(gettyConfig)
}

+ 5
- 15
pkg/remoting/getty/getty_remoting.go View File

@@ -33,11 +33,6 @@ const (
RpcRequestTimeout = 20 * time.Second
)

var (
gettyRemoting *GettyRemoting
onceGettyRemoting = &sync.Once{}
)

type (
callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error)
GettyRemoting struct {
@@ -46,16 +41,11 @@ type (
}
)

func GetGettyRemotingInstance() *GettyRemoting {
if gettyRemoting == nil {
onceGettyRemoting.Do(func() {
gettyRemoting = &GettyRemoting{
futures: &sync.Map{},
mergeMsgMap: &sync.Map{},
}
})
func newGettyRemoting() *GettyRemoting {
return &GettyRemoting{
futures: &sync.Map{},
mergeMsgMap: &sync.Map{},
}
return gettyRemoting
}

func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) {
@@ -72,7 +62,7 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba
return result, err
}

func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
func (g *GettyRemoting) SendAsync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession(msg)
}


+ 20
- 16
pkg/remoting/getty/getty_remoting_test.go View File

@@ -47,14 +47,15 @@ func TestGettyRemoting_GetMessageFuture(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.messageFuture != nil {
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Equal(t, *test.messageFuture, *messageFuture)
} else {
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, messageFuture)
}
})
@@ -78,13 +79,14 @@ func TestGettyRemoting_RemoveMessageFuture(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
GetGettyRemotingInstance().futures.Store(test.msgID, test.messageFuture)
messageFuture := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
gettyRemotingClient.gettyRemoting.futures.Store(test.msgID, test.messageFuture)
messageFuture := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Equal(t, messageFuture, test.messageFuture)
GetGettyRemotingInstance().RemoveMessageFuture(test.msgID)
messageFuture = GetGettyRemotingInstance().GetMessageFuture(test.msgID)
gettyRemotingClient.gettyRemoting.RemoveMessageFuture(test.msgID)
messageFuture = gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, messageFuture)
})
}
@@ -110,14 +112,15 @@ func TestGettyRemoting_GetMergedMessage(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mergedWarpMessage != nil {
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Equal(t, *test.mergedWarpMessage, *mergedWarpMessage)
} else {
mergedWarpMessage := GetGettyRemotingInstance().GetMessageFuture(test.msgID)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMessageFuture(test.msgID)
assert.Empty(t, mergedWarpMessage)
}
})
@@ -144,18 +147,19 @@ func TestGettyRemoting_RemoveMergedMessageFuture(t *testing.T) {
},
},
}
gettyRemotingClient := GetGettyRemotingClient()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.mergedWarpMessage != nil {
GetGettyRemotingInstance().mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.mergeMsgMap.Store(test.msgID, test.mergedWarpMessage)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.NotEmpty(t, mergedWarpMessage)
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage = GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage = gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Empty(t, mergedWarpMessage)
} else {
GetGettyRemotingInstance().RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage := GetGettyRemotingInstance().GetMergedMessage(test.msgID)
gettyRemotingClient.gettyRemoting.RemoveMergedMessageFuture(test.msgID)
mergedWarpMessage := gettyRemotingClient.gettyRemoting.GetMergedMessage(test.msgID)
assert.Empty(t, mergedWarpMessage)
}
})


+ 22
- 17
pkg/remoting/getty/listener.go View File

@@ -38,22 +38,16 @@ var (
)

type gettyClientHandler struct {
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
sessionManager *SessionManager
processorMap map[message.MessageType]processor.RemotingProcessor
idGenerator *atomic.Uint32
processorMap map[message.MessageType]processor.RemotingProcessor
}

func GetGettyClientHandlerInstance() *gettyClientHandler {
if clientHandler == nil {
onceClientHandler.Do(func() {
clientHandler = &gettyClientHandler{
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
sessionManager: sessionManager,
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
idGenerator: &atomic.Uint32{},
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
}
})
}
@@ -62,7 +56,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {

func (g *gettyClientHandler) OnOpen(session getty.Session) error {
log.Infof("Open new getty session ")
g.sessionManager.registerSession(session)
sessionManager.registerSession(session)
conf := config.GetSeataConfig()
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
@@ -73,7 +67,7 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {
err := GetGettyRemotingClient().SendAsyncRequest(request)
if err != nil {
log.Errorf("OnOpen error: {%#v}", err.Error())
g.sessionManager.releaseSession(session)
sessionManager.releaseSession(session)
return
}
}()
@@ -83,12 +77,12 @@ func (g *gettyClientHandler) OnOpen(session getty.Session) error {

func (g *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
g.sessionManager.releaseSession(session)
sessionManager.releaseSession(session)
}

func (g *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("session{%s} is closing......", session.Stat())
g.sessionManager.releaseSession(session)
sessionManager.releaseSession(session)
}

func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
@@ -117,8 +111,19 @@ func (g *gettyClientHandler) OnCron(session getty.Session) {
log.Debug("session{%s} Oncron executing", session.Stat())
err := g.transferHeartBeat(session, message.HeartBeatMessagePing)
if err != nil {
log.Errorf("failed to send heart beat: {%#v}", err.Error())
g.sessionManager.releaseSession(session)
log.Warnf("failed to send heart beat: {%#v}", err.Error())
if session.GetAttribute(heartBeatRetryTimesKey) != nil {
retryTimes := session.GetAttribute(heartBeatRetryTimesKey).(int)
if retryTimes >= maxHeartBeatRetryTimes {
log.Warnf("heartbeat retry times exceed default max retry times{%d}, close the session{%s}",
maxHeartBeatRetryTimes, session.Stat())
sessionManager.releaseSession(session)
return
}
session.SetAttribute(heartBeatRetryTimesKey, retryTimes+1)
} else {
session.SetAttribute(heartBeatRetryTimesKey, 1)
}
}
}

@@ -130,7 +135,7 @@ func (g *gettyClientHandler) transferHeartBeat(session getty.Session, msg messag
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
return GetGettyRemotingClient().gettyRemoting.SendAsync(rpcMessage, session, nil)
}

func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {


+ 0
- 138
pkg/remoting/getty/rpc_client.go View File

@@ -1,138 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

import (
"crypto/tls"
"fmt"
"net"
"sync"

getty "github.com/apache/dubbo-getty"
gxsync "github.com/dubbogo/gost/sync"

"seata.apache.org/seata-go/pkg/discovery"
"seata.apache.org/seata-go/pkg/protocol/codec"
"seata.apache.org/seata-go/pkg/remoting/config"
"seata.apache.org/seata-go/pkg/util/log"
)

type RpcClient struct {
gettyConf *config.Config
seataConf *config.SeataConfig
gettyClients []getty.Client
futures *sync.Map
}

func InitRpcClient(gettyConfig *config.Config, seataConfig *config.SeataConfig) {
config.IniConfig(seataConfig)
rpcClient := &RpcClient{
gettyConf: gettyConfig,
seataConf: seataConfig,
gettyClients: make([]getty.Client, 0),
}
codec.Init()
rpcClient.init()
}

func (c *RpcClient) init() {
addressList := c.getAvailServerList()
if len(addressList) == 0 {
log.Warn("no have valid seata server list")
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)),
// todo if read c.gettyConf.ConnectionNum, will cause the connect to fail
getty.WithConnectionNumber(1),
getty.WithReconnectInterval(c.gettyConf.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
)
go gettyClient.RunEventLoop(c.newSession)
// c.gettyClients = append(c.gettyClients, gettyClient)
}
}

func (c *RpcClient) getAvailServerList() []*discovery.ServiceInstance {
registryService := discovery.GetRegistry()
instances, err := registryService.Lookup(c.seataConf.TxServiceGroup)
if err != nil {
return nil
}
return instances
}

func (c *RpcClient) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
err error
)

if c.gettyConf.SessionConfig.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if _, ok = session.Conn().(*tls.Conn); ok {
c.setSessionConfig(session)
log.Debugf("server accepts new tls session:%s\n", session.Stat())
return nil
}
if _, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp connection\n", session.Stat(), session.Conn()))
}

if _, ok = session.Conn().(*tls.Conn); !ok {
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
return fmt.Errorf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn())
}

if err = tcpConn.SetNoDelay(c.gettyConf.SessionConfig.TCPNoDelay); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(c.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
return err
}
if c.gettyConf.SessionConfig.TCPKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(c.gettyConf.SessionConfig.KeepAlivePeriod); err != nil {
return err
}
}
if err = tcpConn.SetReadBuffer(c.gettyConf.SessionConfig.TCPRBufSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(c.gettyConf.SessionConfig.TCPWBufSize); err != nil {
return err
}
}

c.setSessionConfig(session)
log.Debugf("rpc_client new session:%s\n", session.Stat())

return nil
}

func (c *RpcClient) setSessionConfig(session getty.Session) {
session.SetName(c.gettyConf.SessionConfig.SessionName)
session.SetMaxMsgLen(c.gettyConf.SessionConfig.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(c.gettyConf.SessionConfig.TCPReadTimeout)
session.SetWriteTimeout(c.gettyConf.SessionConfig.TCPWriteTimeout)
session.SetCronPeriod((int)(c.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
session.SetWaitTime(c.gettyConf.SessionConfig.WaitTimeout)
}

+ 112
- 8
pkg/remoting/getty/session_manager.go View File

@@ -18,38 +18,142 @@
package getty

import (
"crypto/tls"
"fmt"
"net"
"reflect"
"sync"
"sync/atomic"
"time"

getty "github.com/apache/dubbo-getty"
gxsync "github.com/dubbogo/gost/sync"

"seata.apache.org/seata-go/pkg/discovery"
"seata.apache.org/seata-go/pkg/protocol/message"
"seata.apache.org/seata-go/pkg/remoting/config"
"seata.apache.org/seata-go/pkg/remoting/loadbalance"
"seata.apache.org/seata-go/pkg/util/log"
)

const (
maxCheckAliveRetry = 600
checkAliveInternal = 100
maxCheckAliveRetry = 600
checkAliveInternal = 100
heartBeatRetryTimesKey = "heartbeat-retry-times"
maxHeartBeatRetryTimes = 3
)

var sessionManager = newSessionManager()
var (
sessionManager *SessionManager
onceSessionManager = &sync.Once{}
)

type SessionManager struct {
// serverAddress -> rpc_client.Session -> bool
serverSessions sync.Map
allSessions sync.Map
sessionSize int32
gettyConf *config.Config
}

func initSessionManager(gettyConfig *config.Config) {
if sessionManager == nil {
onceSessionManager.Do(func() {
sessionManager = &SessionManager{
allSessions: sync.Map{},
serverSessions: sync.Map{},
gettyConf: gettyConfig,
}
sessionManager.init()
})
}
}

func newSessionManager() *SessionManager {
return &SessionManager{
allSessions: sync.Map{},
// serverAddress -> rpc_client.Session -> bool
serverSessions: sync.Map{},
func (g *SessionManager) init() {
addressList := g.getAvailServerList()
if len(addressList) == 0 {
log.Warn("no have valid seata server list")
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(fmt.Sprintf("%s:%d", address.Addr, address.Port)),
// todo if read c.gettyConf.ConnectionNum, will cause the connect to fail
getty.WithConnectionNumber(1),
getty.WithReconnectInterval(g.gettyConf.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
)
go gettyClient.RunEventLoop(g.newSession)
}
}

func (g *SessionManager) getAvailServerList() []*discovery.ServiceInstance {
registryService := discovery.GetRegistry()
instances, err := registryService.Lookup(config.GetSeataConfig().TxServiceGroup)
if err != nil {
return nil
}
return instances
}

func (g *SessionManager) setSessionConfig(session getty.Session) {
session.SetName(g.gettyConf.SessionConfig.SessionName)
session.SetMaxMsgLen(g.gettyConf.SessionConfig.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(GetGettyClientHandlerInstance())
session.SetReadTimeout(g.gettyConf.SessionConfig.TCPReadTimeout)
session.SetWriteTimeout(g.gettyConf.SessionConfig.TCPWriteTimeout)
session.SetCronPeriod((int)(g.gettyConf.SessionConfig.CronPeriod.Milliseconds()))
session.SetWaitTime(g.gettyConf.SessionConfig.WaitTimeout)
session.SetAttribute(heartBeatRetryTimesKey, 0)
}

func (g *SessionManager) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
err error
)

if g.gettyConf.SessionConfig.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}
if _, ok = session.Conn().(*tls.Conn); ok {
g.setSessionConfig(session)
log.Debugf("server accepts new tls session:%s\n", session.Stat())
return nil
}
if _, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not a tcp connection\n", session.Stat(), session.Conn()))
}

if _, ok = session.Conn().(*tls.Conn); !ok {
if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
return fmt.Errorf("%s, session.conn{%#v} is not tcp connection", session.Stat(), session.Conn())
}

if err = tcpConn.SetNoDelay(g.gettyConf.SessionConfig.TCPNoDelay); err != nil {
return err
}
if err = tcpConn.SetKeepAlive(g.gettyConf.SessionConfig.TCPKeepAlive); err != nil {
return err
}
if g.gettyConf.SessionConfig.TCPKeepAlive {
if err = tcpConn.SetKeepAlivePeriod(g.gettyConf.SessionConfig.KeepAlivePeriod); err != nil {
return err
}
}
if err = tcpConn.SetReadBuffer(g.gettyConf.SessionConfig.TCPRBufSize); err != nil {
return err
}
if err = tcpConn.SetWriteBuffer(g.gettyConf.SessionConfig.TCPWBufSize); err != nil {
return err
}
}

g.setSessionConfig(session)
log.Debugf("rpc_client new session:%s\n", session.Stat())

return nil
}

func (g *SessionManager) selectSession(msg interface{}) getty.Session {


+ 8
- 7
pkg/remoting/processor/client/client_on_response_processor.go View File

@@ -46,27 +46,28 @@ type clientOnResponseProcessor struct{}

func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
log.Infof("the rm client received clientOnResponse msg %#v from tc server.", rpcMessage)
gettyRemotingClient := getty.GetGettyRemotingClient()
if mergedResult, ok := rpcMessage.Body.(message.MergeResultMessage); ok {
mergedMessage := getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
mergedMessage := gettyRemotingClient.GetMergedMessage(rpcMessage.ID)
if mergedMessage != nil {
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
response := getty.GetGettyRemotingInstance().GetMessageFuture(msgID)
response := gettyRemotingClient.GetMessageFuture(msgID)
if response != nil {
response.Response = mergedResult.Msgs[i]
response.Done <- struct{}{}
getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
gettyRemotingClient.RemoveMessageFuture(msgID)
}
}
getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
gettyRemotingClient.RemoveMergedMessageFuture(rpcMessage.ID)
}
return nil
} else {
// 如果是请求消息,做处理逻辑
msgFuture := getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
msgFuture := gettyRemotingClient.GetMessageFuture(rpcMessage.ID)
if msgFuture != nil {
getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
gettyRemotingClient.NotifyRpcMessageResponse(rpcMessage)
gettyRemotingClient.RemoveMessageFuture(rpcMessage.ID)
} else {
if _, ok := rpcMessage.Body.(message.AbstractResultMessage); ok {
log.Infof("the rm client received response msg [{}] from tc server.", msgFuture)


Loading…
Cancel
Save