Browse Source

fix getty auto close bug (#130)

tags/v0.1.0-rc1
Yuecai Liu GitHub 3 years ago
parent
commit
af70930997
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 153 additions and 149 deletions
  1. +9
    -1
      pkg/common/log/logging.go
  2. +3
    -3
      pkg/config/getty_config.go
  3. +4
    -4
      pkg/protocol/codec/branch_commit_response_codec.go
  4. +2
    -2
      pkg/protocol/codec/branch_rollback_response_codec.go
  5. +21
    -14
      pkg/remoting/getty/getty_client.go
  6. +29
    -51
      pkg/remoting/getty/getty_remoting.go
  7. +41
    -25
      pkg/remoting/getty/listener.go
  8. +36
    -42
      pkg/remoting/getty/session_manager.go
  9. +5
    -4
      pkg/remoting/processor/client/client_heart_beat_processon.go
  10. +1
    -1
      pkg/remoting/processor/client/client_heart_beat_processor_test.go
  11. +1
    -1
      pkg/remoting/processor/client/rm_branch_commit_processor.go
  12. +1
    -1
      pkg/remoting/processor/client/rm_branch_rollback_processor.go

+ 9
- 1
pkg/common/log/logging.go View File

@@ -104,7 +104,15 @@ var (
log Logger
zapLogger *zap.Logger

zapLoggerConfig = zap.NewDevelopmentConfig()
zapLoggerConfig = zap.Config{
// todo read level from config
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Development: true,
Encoding: "console",
EncoderConfig: zap.NewDevelopmentEncoderConfig(),
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
zapLoggerEncoderConfig = zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",


+ 3
- 3
pkg/config/getty_config.go View File

@@ -38,8 +38,8 @@ type GettyConfig struct {
// GetDefaultGettyConfig ...
func GetDefaultGettyConfig() GettyConfig {
return GettyConfig{
ReconnectInterval: 1,
ConnectionNum: 20,
ReconnectInterval: 0,
ConnectionNum: 1,
HeartbeatPeriod: 10 * time.Second,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
@@ -51,7 +51,7 @@ func GetDefaultGettyConfig() GettyConfig {
TCPReadTimeout: time.Second,
TCPWriteTimeout: 5 * time.Second,
WaitTimeout: time.Second,
CronPeriod: 5 * time.Second,
CronPeriod: time.Second,
MaxMsgLen: 4096,
SessionName: "rpc_client",
},


+ 4
- 4
pkg/protocol/codec/branch_commit_response_codec.go View File

@@ -39,7 +39,7 @@ func (g *BranchCommitResponseCodec) Decode(in []byte) interface{} {

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
@@ -56,10 +56,10 @@ func (g *BranchCommitResponseCodec) Encode(in interface{}) []byte {
buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
if len(data.Msg) > math.MaxInt8 {
msg = data.Msg[:math.MaxInt8]
}
bytes.WriteString16Length(msg, buf)
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)


+ 2
- 2
pkg/protocol/codec/branch_rollback_response_codec.go View File

@@ -39,7 +39,7 @@ func (g *BranchRollbackResponseCodec) Decode(in []byte) interface{} {

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
@@ -59,7 +59,7 @@ func (g *BranchRollbackResponseCodec) Encode(in interface{}) []byte {
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)


+ 21
- 14
pkg/remoting/getty/getty_client.go View File

@@ -19,8 +19,10 @@ package getty

import (
"sync"
"time"

gxtime "github.com/dubbogo/gost/time"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"go.uber.org/atomic"
@@ -60,18 +62,18 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage)
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, client.asyncCallback)
}

func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error {
func (client *GettyRemotingClient) SendAsyncResponse(msgID int32, msg interface{}) error {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
ID: msgID,
Type: message.GettyRequestType_Response,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage)
return GetGettyRemotingInstance().SendASync(rpcMessage, nil, nil)
}

func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
@@ -82,16 +84,21 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendSync(rpcMessage)
return GetGettyRemotingInstance().SendSync(rpcMessage, nil, client.syncCallback)
}

func (client *GettyRemotingClient) SendSyncRequestWithTimeout(msg interface{}, timeout time.Duration) (interface{}, error) {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: message.GettyRequestType_RequestSync,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
func (g *GettyRemotingClient) asyncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
go g.asyncCallback(reqMsg, respMsg)
return nil, nil
}

func (g *GettyRemotingClient) syncCallback(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(RPC_REQUEST_TIMEOUT):
GetGettyRemotingInstance().RemoveMergedMessageFuture(reqMsg.ID)
log.Errorf("wait resp timeout: %#v", reqMsg)
return nil, errors.Errorf("wait response timeout, request: %#v", reqMsg)
case <-respMsg.Done:
return respMsg.Response, respMsg.Err
}
return GetGettyRemotingInstance().SendSyncWithTimeout(rpcMessage, timeout)
}

+ 29
- 51
pkg/remoting/getty/getty_remoting.go View File

@@ -23,14 +23,12 @@ import (

getty "github.com/apache/dubbo-getty"

gxtime "github.com/dubbogo/gost/time"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
)

const (
RPC_REQUEST_TIMEOUT = 5 * time.Second
RPC_REQUEST_TIMEOUT = 2 * time.Second
)

var (
@@ -38,10 +36,13 @@ var (
onceGettyRemoting = &sync.Once{}
)

type GettyRemoting struct {
futures *sync.Map
mergeMsgMap *sync.Map
}
type (
callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error)
GettyRemoting struct {
futures *sync.Map
mergeMsgMap *sync.Map
}
)

func GetGettyRemotingInstance() *GettyRemoting {
if gettyRemoting == nil {
@@ -55,67 +56,44 @@ func GetGettyRemotingInstance() *GettyRemoting {
return gettyRemoting
}

func (client *GettyRemoting) SendSync(msg message.RpcMessage) (interface{}, error) {
ss := sessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, RPC_REQUEST_TIMEOUT)
}

func (client *GettyRemoting) SendSyncWithTimeout(msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
ss := sessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, timeout)
func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) {
if s == nil {
s = sessionManager.selectSession()
}
return g.sendAsync(s, msg, callback)
}

func (client *GettyRemoting) SendASync(msg message.RpcMessage) error {
ss := sessionManager.AcquireGettySession()
_, err := client.sendAsync(ss, msg, 0*time.Second)
func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession()
}
_, err := g.sendAsync(s, msg, callback)
return err
}

func (client *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
log.Infof("send async message: {%#v}", msg)
func (g *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, callback callbackMethod) (interface{}, error) {
if _, ok := msg.Body.(message.HeartBeatMessage); ok {
log.Debug("send async message: {%#v}", msg)
} else {
log.Infof("send async message: {%#v}", msg)
}
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")
return nil, err
}
resp := message.NewMessageFuture(msg)
client.futures.Store(msg.ID, resp)
g.futures.Store(msg.ID, resp)
_, _, err = session.WritePkg(msg, time.Duration(0))
if err != nil {
client.futures.Delete(msg.ID)
g.futures.Delete(msg.ID)
log.Errorf("send message: %#v, session: %s", msg, session.Stat())
return nil, err
}

log.Debugf("send message: %#v, session: %s", msg, session.Stat())

actualTimeOut := timeout
if timeout <= time.Duration(0) {
// todo timeoue use config
actualTimeOut = time.Duration(2000)
}

wait := func() (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(actualTimeOut):
client.futures.Delete(msg.ID)
if session != nil {
return nil, errors.Errorf("wait response timeout, ip: %s, request: %#v", session.RemoteAddr(), msg)
} else {
return nil, errors.Errorf("wait response timeout and session is nil, request: %#v", msg)
}
case <-resp.Done:
err = resp.Err
return resp.Response, err
}
}

if timeout > time.Duration(0) {
return wait()
} else {
go wait()
if callback != nil {
return callback(msg, resp)
}
return nil, err
return nil, nil
}

func (client *GettyRemoting) GetMessageFuture(msgID int32) *message.MessageFuture {


+ 41
- 25
pkg/remoting/getty/listener.go View File

@@ -24,6 +24,7 @@ import (
getty "github.com/apache/dubbo-getty"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/processor"
"go.uber.org/atomic"
@@ -39,7 +40,8 @@ type gettyClientHandler struct {
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
processorTable map[message.MessageType]processor.RemotingProcessor
sessionManager *SessionManager
processorMap map[message.MessageType]processor.RemotingProcessor
}

func GetGettyClientHandlerInstance() *gettyClientHandler {
@@ -50,70 +52,84 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
processorTable: make(map[message.MessageType]processor.RemotingProcessor, 0),
sessionManager: sessionManager,
processorMap: make(map[message.MessageType]processor.RemotingProcessor, 0),
}
})
}
return clientHandler
}

func (client *gettyClientHandler) OnOpen(session getty.Session) error {
sessionManager.RegisterGettySession(session)
func (g *gettyClientHandler) OnOpen(session getty.Session) error {
log.Infof("Open new getty session ")
g.sessionManager.registerSession(session)
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: client.conf.SeataVersion,
ApplicationId: client.conf.ApplicationID,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
Version: g.conf.SeataVersion,
ApplicationId: g.conf.ApplicationID,
TransactionServiceGroup: g.conf.TransactionServiceGroup,
}}
err := GetGettyRemotingClient().SendAsyncRequest(request)
//client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err != nil {
log.Errorf("OnOpen error: {%#v}", err.Error())
sessionManager.ReleaseGettySession(session)
g.sessionManager.releaseSession(session)
return
}
//todo
//client.GettySessionOnOpenChannel <- session.RemoteAddr()
}()

return nil
}

func (client *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
sessionManager.ReleaseGettySession(session)
func (g *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("session{%s} got error{%v}, will be closed.", session.Stat(), err)
g.sessionManager.releaseSession(session)
}

func (client *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("OnClose session{%s} is closing......", session.Stat())
sessionManager.ReleaseGettySession(session)
func (g *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("session{%s} is closing......", session.Stat())
g.sessionManager.releaseSession(session)
}

func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
func (g *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
ctx := context.Background()
log.Debugf("received message: {%#v}", pkg)
log.Debug("received message: {%#v}", pkg)

rpcMessage, ok := pkg.(message.RpcMessage)
if !ok {
log.Errorf("received message is not protocol.RpcMessage. pkg: %#v", pkg)
return
}

if mm, ok := rpcMessage.Body.(message.MessageTypeAware); ok {
processor := client.processorTable[mm.GetTypeCode()]
processor := g.processorMap[mm.GetTypeCode()]
if processor != nil {
processor.Process(ctx, rpcMessage)
} else {
log.Errorf("This message type [%v] has no processor.", mm.GetTypeCode())
log.Errorf("This message type %v has no processor.", mm.GetTypeCode())
}
} else {
log.Errorf("This rpcMessage body %#v is not MessageTypeAware type.", rpcMessage.Body)
}
}

func (client *gettyClientHandler) OnCron(session getty.Session) {
//GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
func (g *gettyClientHandler) OnCron(session getty.Session) {
log.Debug("session{%s} Oncron executing", session.Stat())
g.transferBeatHeart(session, message.HeartBeatMessagePing)
}

func (g *gettyClientHandler) transferBeatHeart(session getty.Session, msg message.HeartBeatMessage) {
rpcMessage := message.RpcMessage{
ID: int32(g.idGenerator.Inc()),
Type: message.GettyRequestType_HeartbeatRequest,
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
GetGettyRemotingInstance().SendASync(rpcMessage, session, nil)
}

func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
func (g *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
if nil != processor {
client.processorTable[msgType] = processor
g.processorMap[msgType] = processor
}
}

+ 36
- 42
pkg/remoting/getty/session_manager.go View File

@@ -25,25 +25,36 @@ import (
getty "github.com/apache/dubbo-getty"
)

const (
maxCheckAliveRetry = 600
checkAliveInternal = 100
)

var (
MAX_CHECK_ALIVE_RETRY = 600
CHECK_ALIVE_INTERNAL = 100
allSessions = sync.Map{}
// serverAddress -> rpc_client.Session -> bool
serverSessions = sync.Map{}
sessionSize int32 = 0
sessionManager = &GettySessionManager{}
sessionManager = newSessionManager()
)

type GettySessionManager struct{}
type SessionManager struct {
// serverAddress -> rpc_client.Session -> bool
serverSessions sync.Map
allSessions sync.Map
sessionSize int32
}

func newSessionManager() *SessionManager {
return &SessionManager{
allSessions: sync.Map{},
// serverAddress -> rpc_client.Session -> bool
serverSessions: sync.Map{},
}
}

func (sessionManager *GettySessionManager) AcquireGettySession() getty.Session {
// map 遍历是随机的
func (g *SessionManager) selectSession() getty.Session {
var session getty.Session
allSessions.Range(func(key, value interface{}) bool {
g.allSessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
g.releaseSession(session)
} else {
return false
}
@@ -52,15 +63,15 @@ func (sessionManager *GettySessionManager) AcquireGettySession() getty.Session {
if session != nil {
return session
}
if sessionSize == 0 {
ticker := time.NewTicker(time.Duration(CHECK_ALIVE_INTERNAL) * time.Millisecond)
if g.sessionSize == 0 {
ticker := time.NewTicker(time.Duration(checkAliveInternal) * time.Millisecond)
defer ticker.Stop()
for i := 0; i < MAX_CHECK_ALIVE_RETRY; i++ {
for i := 0; i < maxCheckAliveRetry; i++ {
<-ticker.C
allSessions.Range(func(key, value interface{}) bool {
g.allSessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
g.releaseSession(session)
} else {
return false
}
@@ -74,38 +85,21 @@ func (sessionManager *GettySessionManager) AcquireGettySession() getty.Session {
return nil
}

func (sessionManager *GettySessionManager) AcquireGettySessionByServerAddress(serverAddress string) getty.Session {
m, _ := serverSessions.LoadOrStore(serverAddress, &sync.Map{})
sMap := m.(*sync.Map)

var session getty.Session
sMap.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
} else {
return false
}
return true
})
return session
}

func (sessionManager *GettySessionManager) ReleaseGettySession(session getty.Session) {
allSessions.Delete(session)
func (g *SessionManager) releaseSession(session getty.Session) {
g.allSessions.Delete(session)
if !session.IsClosed() {
m, _ := serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
m, _ := g.serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Delete(session)
session.Close()
}
atomic.AddInt32(&sessionSize, -1)
atomic.AddInt32(&g.sessionSize, -1)
}

func (sessionManager *GettySessionManager) RegisterGettySession(session getty.Session) {
allSessions.Store(session, true)
m, _ := serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
func (g *SessionManager) registerSession(session getty.Session) {
g.allSessions.Store(session, true)
m, _ := g.serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Store(session, true)
atomic.AddInt32(&sessionSize, 1)
atomic.AddInt32(&g.sessionSize, 1)
}

+ 5
- 4
pkg/remoting/processor/client/client_heart_beat_processon.go View File

@@ -21,20 +21,21 @@ import (
"context"

"github.com/seata/seata-go/pkg/common/log"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/getty"
)

func init() {
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_HeartbeatMsg, &clientHeartBeatProcesson{})
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_HeartbeatMsg, &clientHeartBeatProcessor{})
}

type clientHeartBeatProcesson struct{}
type clientHeartBeatProcessor struct{}

func (f *clientHeartBeatProcesson) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
func (f *clientHeartBeatProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
if msg, ok := rpcMessage.Body.(message.HeartBeatMessage); ok {
if !msg.Ping {
log.Infof("received PONG from {}", ctx)
log.Debug("received PONG from {}", ctx)
}
}
return nil


+ 1
- 1
pkg/remoting/processor/client/client_heart_beat_processor_test.go View File

@@ -71,7 +71,7 @@ func TestClientHeartBeatProcessor(t *testing.T) {
}

var ctx context.Context
var chbProcessor clientHeartBeatProcesson
var chbProcessor clientHeartBeatProcessor
// run tests
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {


+ 1
- 1
pkg/remoting/processor/client/rm_branch_commit_processor.go View File

@@ -77,7 +77,7 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag
BranchStatus: status,
},
}
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
err = getty.GetGettyRemotingClient().SendAsyncResponse(rpcMessage.ID, response)
if err != nil {
log.Errorf("send branch commit response error: {%#v}", err.Error())
return err


+ 1
- 1
pkg/remoting/processor/client/rm_branch_rollback_processor.go View File

@@ -75,7 +75,7 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess
BranchStatus: status,
},
}
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
err = getty.GetGettyRemotingClient().SendAsyncResponse(rpcMessage.ID, response)
if err != nil {
log.Errorf("send branch rollback response error: {%#v}", err.Error())
return err


Loading…
Cancel
Save