Browse Source

use processor on message

tags/v0.1.0-rc1
luky116_Liuyuecai 3 years ago
parent
commit
ca3cfe0930
6 changed files with 60 additions and 85 deletions
  1. +6
    -0
      pkg/imports/imports.go
  2. +0
    -4
      pkg/rm/resource_manager_facade.go
  3. +22
    -7
      pkg/rpc/getty/getty_remoting.go
  4. +14
    -64
      pkg/rpc/getty/listener.go
  5. +16
    -9
      pkg/rpc/processor/client/client_on_response_processor.go
  6. +2
    -1
      test/rpc_remoting_client_test.go

+ 6
- 0
pkg/imports/imports.go View File

@@ -0,0 +1,6 @@
package imports

import (
_ "github.com/seata/seata-go/pkg/rpc/getty"
_ "github.com/seata/seata-go/pkg/rpc/processor/client"
)

+ 0
- 4
pkg/rm/resource_manager_facade.go View File

@@ -17,10 +17,6 @@ var (
onceRMFacade = &sync.Once{}
)

func init() {
rmFacadeInstance = &ResourceManagerFacade{}
}

func GetResourceManagerFacadeInstance() *ResourceManagerFacade {
if rmFacadeInstance == nil {
onceRMFacade.Do(func() {


+ 22
- 7
pkg/rpc/getty/getty_remoting.go View File

@@ -101,15 +101,30 @@ func (client *GettyRemoting) GetMessageFuture(msgID int32) *protocol.MessageFutu
return nil
}

func (client *GettyRemoting) RemoveMessageFuture(msgID int32) {
client.futures.Delete(msgID)
}

func (client *GettyRemoting) RemoveMergedMessageFuture(msgID int32) {
client.mergeMsgMap.Delete(msgID)
}

func (client *GettyRemoting) GetMergedMessage(msgID int32) *protocol.MergedWarpMessage {
if msg, ok := client.mergeMsgMap.Load(msgID); ok {
return msg.(*protocol.MergedWarpMessage)
}
return nil
}

func (client *GettyRemoting) NotifytRpcMessageResponse(rpcMessage protocol.RpcMessage) {
messageFuture := client.GetMessageFuture(rpcMessage.ID)
if messageFuture == nil {
if messageFuture != nil {
messageFuture.Response = rpcMessage.Body
// todo messageFuture.Err怎么配置呢?
//messageFuture.Err = rpcMessage.Err
messageFuture.Done <- true
//client.futures.Delete(rpcMessage.ID)
} else {
log.Infof("msg: {} is not found in futures.", rpcMessage.ID)
return
}
messageFuture.Response = rpcMessage.Body
// todo messageFuture.Err怎么配置呢?
//messageFuture.Err = rpcMessage.Err
messageFuture.Done <- true
client.futures.Delete(rpcMessage.ID)
}

+ 14
- 64
pkg/rpc/getty/listener.go View File

@@ -1,6 +1,7 @@
package getty

import (
"context"
"sync"
)

@@ -34,10 +35,11 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
if clientHandler == nil {
onceClientHandler.Do(func() {
clientHandler = &gettyClientHandler{
conf: config.GetClientConfig(),
idGenerator: &atomic.Uint32{},
futures: &sync.Map{},
mergeMsgMap: &sync.Map{},
conf: config.GetClientConfig(),
idGenerator: &atomic.Uint32{},
futures: &sync.Map{},
mergeMsgMap: &sync.Map{},
processorTable: make(map[protocol.MessageType]processor.RemotingProcessor, 0),
}
})
}
@@ -81,6 +83,7 @@ func (client *gettyClientHandler) OnClose(session getty.Session) {
// OnMessage ...
func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
// TODO 需要把session里面的关键信息存储到context中,以方便在后面流程中获取使用。比如,XID等等
ctx := context.Background()
log.Debugf("received message: {%#v}", pkg)

rpcMessage, ok := pkg.(protocol.RpcMessage)
@@ -89,46 +92,15 @@ func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface
return
}

heartBeat, isHeartBeat := rpcMessage.Body.(protocol.HeartBeatMessage)
if isHeartBeat && heartBeat == protocol.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
return
}

if rpcMessage.Type == protocol.MSGTypeRequestSync ||
rpcMessage.Type == protocol.MSGTypeRequestOneway {
log.Debugf("msgID: %d, body: %#v", rpcMessage.ID, rpcMessage.Body)

client.onMessage(rpcMessage, session.RemoteAddr())
return
}

mergedResult, isMergedResult := rpcMessage.Body.(protocol.MergeResultMessage)
if isMergedResult {
mm, loaded := client.mergeMsgMap.Load(rpcMessage.ID)
if loaded {
mergedMessage := mm.(protocol.MergedWarpMessage)
log.Infof("rpcMessageID: %d, rpcMessage: %#v, result: %#v", rpcMessage.ID, mergedMessage, mergedResult)
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
resp, loaded := client.futures.Load(msgID)
if loaded {
response := resp.(*protocol.MessageFuture)
response.Response = mergedResult.Msgs[i]
response.Done <- true
client.futures.Delete(msgID)
}
}
client.mergeMsgMap.Delete(rpcMessage.ID)
if mm, ok := rpcMessage.Body.(protocol.MessageTypeAware); ok {
processor := client.processorTable[mm.GetTypeCode()]
if processor != nil {
processor.Process(ctx, rpcMessage)
} else {
log.Errorf("This message type [%v] has no processor.", mm.GetTypeCode())
}
} else {
resp, loaded := client.futures.Load(rpcMessage.ID)
if loaded {
response := resp.(*protocol.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.ID)
}
log.Errorf("This rpcMessage body[%v] is not MessageTypeAware type.", rpcMessage.Body)
}
}

@@ -137,28 +109,6 @@ func (client *gettyClientHandler) OnCron(session getty.Session) {
//GetGettyRemotingClient().SendAsyncRequest(protocol.HeartBeatMessagePing)
}

func (client *gettyClientHandler) onMessage(rpcMessage protocol.RpcMessage, serverAddress string) {
msg := rpcMessage.Body.(protocol.MessageTypeAware)
log.Debugf("onMessage: %#v", msg)
// todo
//switch msg.GetTypeCode() {
//case protocol.TypeBranchCommit:
// client.BranchCommitRequestChannel <- RpcRMMessage{
// RpcMessage: rpcMessage,
// ServerAddress: serverAddress,
// }
//case protocol.TypeBranchRollback:
// client.BranchRollbackRequestChannel <- RpcRMMessage{
// RpcMessage: rpcMessage,
// ServerAddress: serverAddress,
// }
//case protocol.TypeRmDeleteUndolog:
// break
//default:
// break
//}
}

func (client *gettyClientHandler) RegisterProcessor(msgType protocol.MessageType, processor processor.RemotingProcessor) {
if nil != processor {
client.processorTable[msgType] = processor


+ 16
- 9
pkg/rpc/processor/client/client_on_response_processor.go View File

@@ -24,21 +24,28 @@ type clientOnResponseProcessor struct {

func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage protocol.RpcMessage) error {
// 如果是合并的结果消息,直接通知已经处理完成
if _, ok := rpcMessage.Body.(protocol.MergeResultMessage); ok {
mergeMsg, _ := rpcMessage.Body.(protocol.MergeResultMessage)
for _, msg := range mergeMsg.Msgs {
// todo handle merge msg
log.Info(msg)
//for _, msg := range mergeMsg.Msgs {
// msgID := msgmergeMsg.
//
//}
if mergedResult, ok := rpcMessage.Body.(protocol.MergeResultMessage); ok {

mergedMessage := getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
if mergedMessage != nil {
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
response := getty.GetGettyRemotingInstance().GetMessageFuture(msgID)
if response != nil {
response.Response = mergedResult.Msgs[i]
response.Done <- true
getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
}
}
getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
}
return nil
} else {
// 如果是请求消息,做处理逻辑
msgFuture := getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
if msgFuture != nil {
getty.GetGettyRemotingInstance().NotifytRpcMessageResponse(rpcMessage)
getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
} else {
if _, ok := rpcMessage.Body.(protocol.AbstractResultMessage); ok {
log.Infof("the rm client received response msg [{}] from tc server.", msgFuture)


pkg/rpc/getty/rpc_remoting_client_test.go → test/rpc_remoting_client_test.go View File

@@ -1,6 +1,7 @@
package getty
package test

import (
_ "github.com/seata/seata-go/pkg/imports"
"testing"
"time"
)

Loading…
Cancel
Save