Browse Source

Merge pull request #31 from luky116/fix-codec

fix commit and rollback codec bug #14
tags/v0.1.0-rc1
Xin.Zh GitHub 3 years ago
parent
commit
52363172fb
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 180 additions and 81 deletions
  1. +1
    -0
      go.mod
  2. +26
    -9
      pkg/protocol/codec/branch_commit_response_codec.go
  3. +9
    -0
      pkg/protocol/codec/branch_commit_response_codec_test.go
  4. +7
    -7
      pkg/protocol/codec/branch_register_response_codec.go
  5. +2
    -2
      pkg/protocol/codec/branch_register_response_codec_test.go
  6. +17
    -0
      pkg/protocol/codec/branch_rollback_response_codec.go
  7. +9
    -0
      pkg/protocol/codec/branch_rollback_response_codec_test.go
  8. +7
    -7
      pkg/protocol/codec/common_global_end_response_codec.go
  9. +7
    -7
      pkg/protocol/codec/global_begin_response_codec.go
  10. +2
    -2
      pkg/protocol/codec/global_begin_response_codec_test.go
  11. +4
    -0
      pkg/protocol/message/other_message.go
  12. +2
    -2
      pkg/protocol/message/response_message.go
  13. +1
    -1
      pkg/remoting/getty/getty_remoting.go
  14. +2
    -2
      pkg/remoting/getty/listener.go
  15. +4
    -4
      pkg/remoting/processor/client/client_heart_beat_processon.go
  16. +19
    -19
      pkg/remoting/processor/client/client_on_response_processor.go
  17. +25
    -5
      pkg/remoting/processor/client/rm_branch_commit_processor.go
  18. +24
    -6
      pkg/remoting/processor/client/rm_branch_rollback_processor.go
  19. +1
    -1
      pkg/rm/tcc/tcc_service.go
  20. +10
    -5
      pkg/tm/global_transaction.go
  21. +1
    -2
      test/tcc_service_test.go

+ 1
- 0
go.mod View File

@@ -14,6 +14,7 @@ require (
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
golang.org/x/tools v0.1.11 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10


+ 26
- 9
pkg/protocol/codec/branch_commit_response_codec.go View File

@@ -18,7 +18,10 @@
package codec

import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
serror "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
)
@@ -31,23 +34,37 @@ type BranchCommitResponseCodec struct {
}

func (g *BranchCommitResponseCodec) Decode(in []byte) interface{} {
res := message.BranchCommitResponse{}
data := message.BranchCommitResponse{}
buf := bytes.NewByteBuffer(in)

res.Xid = bytes.ReadString16Length(buf)
res.BranchId = int64(bytes.ReadUInt64(buf))
res.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf))
data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
data.BranchId = int64(bytes.ReadUInt64(buf))
data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf))

return res
return data
}

func (g *BranchCommitResponseCodec) Encode(in interface{}) []byte {
req, _ := in.(message.BranchCommitResponse)
data, _ := in.(message.BranchCommitResponse)
buf := bytes.NewByteBuffer([]byte{})

bytes.WriteString16Length(req.Xid, buf)
buf.WriteInt64(req.BranchId)
buf.WriteByte(byte(req.BranchStatus))
buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)
buf.WriteInt64(data.BranchId)
buf.WriteByte(byte(data.BranchStatus))

return buf.Bytes()
}


+ 9
- 0
pkg/protocol/codec/branch_commit_response_codec_test.go View File

@@ -20,6 +20,8 @@ package codec
import (
"testing"

serror "github.com/seata/seata-go/pkg/common/error"

model2 "github.com/seata/seata-go/pkg/protocol/branch"

"github.com/seata/seata-go/pkg/protocol/message"
@@ -32,6 +34,13 @@ func TestBranchCommitResponseCodec(t *testing.T) {
Xid: "123344",
BranchId: 56678,
BranchStatus: model2.BranchStatusPhaseoneFailed,
AbstractTransactionResponse: message.AbstractTransactionResponse{
TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed,
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
},
},
}



+ 7
- 7
pkg/protocol/codec/branch_register_response_codec.go View File

@@ -18,8 +18,10 @@
package codec

import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
error2 "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -38,7 +40,7 @@ func (g *BranchRegisterResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = error2.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.BranchId = int64(bytes.ReadUInt64(buf))

return data
@@ -50,11 +52,9 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte {

buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
var msg string
if len(data.Msg) > 128 {
msg = data.Msg[:128]
} else {
msg = data.Msg
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
}


+ 2
- 2
pkg/protocol/codec/branch_register_response_codec_test.go View File

@@ -20,7 +20,7 @@ package codec
import (
"testing"

error2 "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/error"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
@@ -33,7 +33,7 @@ func TestBranchRegisterResponseCodec(t *testing.T) {
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
TransactionExceptionCode: error2.TransactionExceptionCodeUnknown,
TransactionExceptionCode: serror.TransactionExceptionCodeUnknown,
},
BranchId: 124356567,
}


+ 17
- 0
pkg/protocol/codec/branch_rollback_response_codec.go View File

@@ -18,7 +18,10 @@
package codec

import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
serror "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
)
@@ -34,6 +37,11 @@ func (g *BranchRollbackResponseCodec) Decode(in []byte) interface{} {
data := message.BranchRollbackResponse{}
buf := bytes.NewByteBuffer(in)

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
data.BranchId = int64(bytes.ReadUInt64(buf))
data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf))
@@ -45,6 +53,15 @@ func (g *BranchRollbackResponseCodec) Encode(in interface{}) []byte {
data, _ := in.(message.BranchRollbackResponse)
buf := bytes.NewByteBuffer([]byte{})

buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
bytes.WriteString16Length(data.Xid, buf)
buf.WriteInt64(data.BranchId)
buf.WriteByte(byte(data.BranchStatus))


+ 9
- 0
pkg/protocol/codec/branch_rollback_response_codec_test.go View File

@@ -20,6 +20,8 @@ package codec
import (
"testing"

serror "github.com/seata/seata-go/pkg/common/error"

model2 "github.com/seata/seata-go/pkg/protocol/branch"

"github.com/seata/seata-go/pkg/protocol/message"
@@ -32,6 +34,13 @@ func TestBranchRollbackResponseCodec(t *testing.T) {
Xid: "123344",
BranchId: 56678,
BranchStatus: model2.BranchStatusPhaseoneFailed,
AbstractTransactionResponse: message.AbstractTransactionResponse{
TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed,
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
},
},
}



+ 7
- 7
pkg/protocol/codec/common_global_end_response_codec.go View File

@@ -18,8 +18,10 @@
package codec

import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
transaction2 "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -32,11 +34,9 @@ func (c *CommonGlobalEndResponseCodec) Encode(in interface{}) []byte {

buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
var msg string
if len(data.Msg) > 128 {
msg = data.Msg[:128]
} else {
msg = data.Msg
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
}
@@ -54,7 +54,7 @@ func (c *CommonGlobalEndResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = transaction2.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.GlobalStatus = message.GlobalStatus(bytes.ReadByte(buf))

return data


+ 7
- 7
pkg/protocol/codec/global_begin_response_codec.go View File

@@ -18,8 +18,10 @@
package codec

import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
error2 "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -36,11 +38,9 @@ func (c *GlobalBeginResponseCodec) Encode(in interface{}) []byte {

buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
var msg string
if len(data.Msg) > 128 {
msg = data.Msg[:128]
} else {
msg = data.Msg
msg := data.Msg
if len(data.Msg) > math.MaxInt16 {
msg = data.Msg[:math.MaxInt16]
}
bytes.WriteString16Length(msg, buf)
}
@@ -59,7 +59,7 @@ func (g *GlobalBeginResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = error2.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
data.ExtraData = []byte(bytes.ReadString16Length(buf))



+ 2
- 2
pkg/protocol/codec/global_begin_response_codec_test.go View File

@@ -20,7 +20,7 @@ package codec
import (
"testing"

error2 "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/error"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
@@ -33,7 +33,7 @@ func TestGlobalBeginResponseCodec(t *testing.T) {
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
TransactionExceptionCode: error2.TransactionExceptionCodeBeginFailed,
TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed,
},

Xid: "test-transaction-id",


+ 4
- 0
pkg/protocol/message/other_message.go View File

@@ -56,3 +56,7 @@ func (msg HeartBeatMessage) ToString() string {
return "services pong"
}
}

func (resp HeartBeatMessage) GetTypeCode() MessageType {
return MessageType_HeartbeatMsg
}

+ 2
- 2
pkg/protocol/message/response_message.go View File

@@ -18,13 +18,13 @@
package message

import (
transaction2 "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/error"
model2 "github.com/seata/seata-go/pkg/protocol/branch"
)

type AbstractTransactionResponse struct {
AbstractResultMessage
TransactionExceptionCode transaction2.TransactionExceptionCode
TransactionExceptionCode serror.TransactionExceptionCode
}

type AbstractBranchEndResponse struct {


+ 1
- 1
pkg/remoting/getty/getty_remoting.go View File

@@ -72,7 +72,7 @@ func (client *GettyRemoting) SendASync(msg message.RpcMessage) error {
}

func (client *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
log.Infof("send message: {%#v}", msg)
log.Infof("send async message: {%#v}", msg)
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")


+ 2
- 2
pkg/remoting/getty/listener.go View File

@@ -68,7 +68,7 @@ func (client *gettyClientHandler) OnOpen(session getty.Session) error {
err := GetGettyRemotingClient().SendAsyncRequest(request)
//client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err != nil {
log.Error("OnOpen error: {%#v}", err.Error())
log.Errorf("OnOpen error: {%#v}", err.Error())
sessionManager.ReleaseGettySession(session)
return
}
@@ -111,7 +111,7 @@ func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface
}

func (client *gettyClientHandler) OnCron(session getty.Session) {
// todo send heartbeat message
GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
}

func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {


+ 4
- 4
pkg/remoting/processor/client/client_heart_beat_processon.go View File

@@ -32,10 +32,10 @@ func init() {
type clientHeartBeatProcesson struct{}

func (f *clientHeartBeatProcesson) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
if _, ok := rpcMessage.Body.(message.HeartBeatMessage); ok {
// TODO 如何从context中获取远程服务的信息?
//log.Infof("received PONG from {}", ctx.channel().remoteAddress())
log.Infof("received PONG from {}", ctx)
if msg, ok := rpcMessage.Body.(message.HeartBeatMessage); ok {
if !msg.Ping {
log.Infof("received PONG from {}", ctx)
}
}
return nil
}

+ 19
- 19
pkg/remoting/processor/client/client_on_response_processor.go View File

@@ -23,23 +23,23 @@ import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"

getty2 "github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/remoting/getty"
)

func init() {
clientOnResponseProcessor := &clientOnResponseProcessor{}
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_SeataMergeResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRegisterResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchStatusReportResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalLockQueryResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegRmResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalBeginResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalCommitResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_SeataMergeResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRegisterResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchStatusReportResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalLockQueryResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegRmResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalBeginResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalCommitResult, clientOnResponseProcessor)

getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalReportResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalRollbackResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalStatusResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegCltResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalReportResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalRollbackResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalStatusResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegCltResult, clientOnResponseProcessor)
}

type clientOnResponseProcessor struct {
@@ -48,26 +48,26 @@ type clientOnResponseProcessor struct {
func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
log.Infof("the rm client received clientOnResponse msg %#v from tc server.", rpcMessage)
if mergedResult, ok := rpcMessage.Body.(message.MergeResultMessage); ok {
mergedMessage := getty2.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
mergedMessage := getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
if mergedMessage != nil {
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
response := getty2.GetGettyRemotingInstance().GetMessageFuture(msgID)
response := getty.GetGettyRemotingInstance().GetMessageFuture(msgID)
if response != nil {
response.Response = mergedResult.Msgs[i]
response.Done <- struct{}{}
getty2.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
}
}
getty2.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
}
return nil
} else {
// 如果是请求消息,做处理逻辑
msgFuture := getty2.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
msgFuture := getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
if msgFuture != nil {
getty2.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
getty2.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
} else {
if _, ok := rpcMessage.Body.(message.AbstractResultMessage); ok {
log.Infof("the rm client received response msg [{}] from tc server.", msgFuture)


+ 25
- 5
pkg/remoting/processor/client/rm_branch_commit_processor.go View File

@@ -23,13 +23,13 @@ import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"

getty2 "github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
)

func init() {
rmBranchCommitProcessor := &rmBranchCommitProcessor{}
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchCommit, rmBranchCommitProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchCommit, rmBranchCommitProcessor)
}

type rmBranchCommitProcessor struct {
@@ -46,22 +46,42 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag

status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("Branch commit error: %s", err.Error())
log.Infof("branch commit error: %s", err.Error())
return err
}
log.Infof("branch commit success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

var (
resultCode message.ResultCode
errMsg string
)
if err != nil {
resultCode = message.ResultCodeFailed
errMsg = err.Error()
} else {
resultCode = message.ResultCodeSuccess
}

// reply commit response to tc server
// todo add TransactionExceptionCode
response := message.BranchCommitResponse{
AbstractBranchEndResponse: message.AbstractBranchEndResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: resultCode,
Msg: errMsg,
},
},
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}
err = getty2.GetGettyRemotingClient().SendAsyncResponse(response)
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
if err != nil {
log.Error("BranchCommitResponse error: {%#v}", err.Error())
log.Errorf("send branch commit response error: {%#v}", err.Error())
return err
}
log.Infof("send branch commit response success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)
return nil
}

+ 24
- 6
pkg/remoting/processor/client/rm_branch_rollback_processor.go View File

@@ -23,13 +23,13 @@ import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"

getty2 "github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
)

func init() {
rmBranchRollbackProcessor := &rmBranchRollbackProcessor{}
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRollback, rmBranchRollbackProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRollback, rmBranchRollbackProcessor)
}

type rmBranchRollbackProcessor struct {
@@ -42,26 +42,44 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)
log.Infof("Branch rollback request: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("Branch commit error: %s", err.Error())
log.Infof("branch rollback error: %s", err.Error())
return err
}
log.Infof("branch rollback success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

var (
resultCode message.ResultCode
errMsg string
)
if err != nil {
resultCode = message.ResultCodeFailed
errMsg = err.Error()
} else {
resultCode = message.ResultCodeSuccess
}
// reply commit response to tc server
response := message.BranchRollbackResponse{
AbstractBranchEndResponse: message.AbstractBranchEndResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: resultCode,
Msg: errMsg,
},
},
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}
err = getty2.GetGettyRemotingClient().SendAsyncResponse(response)
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
if err != nil {
log.Error("BranchCommitResponse error: {%#v}", err.Error())
log.Errorf("send branch rollback response error: {%#v}", err.Error())
return err
}
log.Infof("send branch rollback response success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)
return nil
}

+ 1
- 1
pkg/rm/tcc/tcc_service.go View File

@@ -96,7 +96,7 @@ func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{})
branchId, err := rm.GetRMRemotingInstance().BranchRegister(branch.BranchTypeTCC, t.GetActionName(), "", tm.GetXID(ctx), string(tccContextStr), "")
if err != nil {
err = errors.New(fmt.Sprintf("BranchRegister error: %v", err.Error()))
log.Error(err.Error())
log.Errorf(err.Error())
return err
}



+ 10
- 5
pkg/tm/global_transaction.go View File

@@ -111,11 +111,13 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa
break
}
}
if err == nil && res != nil {
if err == nil {
log.Infof("GlobalCommitRequest commit success, xid %s", gtr.Xid)
gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus
UnbindXid(ctx)
return nil
}
UnbindXid(ctx)
log.Infof("GlobalCommitRequest commit success, xid %s", gtr.Xid)
log.Errorf("GlobalCommitRequest commit failed, xid %s, error %v", gtr.Xid, err)
return err
}

@@ -147,10 +149,13 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran
break
}
}
if err == nil && res != nil {
if err == nil {
log.Errorf("GlobalRollbackRequest rollback success, xid %s", gtr.Xid)
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus
UnbindXid(ctx)
return nil
}
UnbindXid(ctx)
log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err)
return err
}



+ 1
- 2
test/tcc_service_test.go View File

@@ -20,7 +20,6 @@ package test
import (
"context"
"testing"
"time"

"github.com/seata/seata-go/pkg/tm"

@@ -81,6 +80,7 @@ func TestNew(test *testing.T) {
defer func() {
resp := tm.CommitOrRollback(ctx, err)
log.Infof("tx result %v", resp)
<-make(chan bool)
}()

tccService := tcc.NewTCCServiceProxy(TestTCCServiceBusiness{})
@@ -97,5 +97,4 @@ func TestNew(test *testing.T) {
return
}

time.Sleep(time.Second * 1000)
}

Loading…
Cancel
Save