diff --git a/go.mod b/go.mod index 7d0ff84e..cbbf17a9 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/stretchr/testify v1.7.0 go.uber.org/atomic v1.9.0 go.uber.org/zap v1.19.1 + golang.org/x/tools v0.1.11 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10 diff --git a/pkg/protocol/codec/branch_commit_response_codec.go b/pkg/protocol/codec/branch_commit_response_codec.go index a02e7777..561a4a1f 100644 --- a/pkg/protocol/codec/branch_commit_response_codec.go +++ b/pkg/protocol/codec/branch_commit_response_codec.go @@ -18,7 +18,10 @@ package codec import ( + "math" + "github.com/seata/seata-go/pkg/common/bytes" + serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -31,23 +34,37 @@ type BranchCommitResponseCodec struct { } func (g *BranchCommitResponseCodec) Decode(in []byte) interface{} { - res := message.BranchCommitResponse{} + data := message.BranchCommitResponse{} buf := bytes.NewByteBuffer(in) - res.Xid = bytes.ReadString16Length(buf) - res.BranchId = int64(bytes.ReadUInt64(buf)) - res.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf)) + data.ResultCode = message.ResultCode(bytes.ReadByte(buf)) + if data.ResultCode == message.ResultCodeFailed { + data.Msg = bytes.ReadString16Length(buf) + } + data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) + data.Xid = bytes.ReadString16Length(buf) + data.BranchId = int64(bytes.ReadUInt64(buf)) + data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf)) - return res + return data } func (g *BranchCommitResponseCodec) Encode(in interface{}) []byte { - req, _ := in.(message.BranchCommitResponse) + data, _ := in.(message.BranchCommitResponse) buf := bytes.NewByteBuffer([]byte{}) - bytes.WriteString16Length(req.Xid, buf) - buf.WriteInt64(req.BranchId) - buf.WriteByte(byte(req.BranchStatus)) + buf.WriteByte(byte(data.ResultCode)) + if data.ResultCode == message.ResultCodeFailed { + msg := data.Msg + if len(data.Msg) > math.MaxInt16 { + msg = data.Msg[:math.MaxInt16] + } + bytes.WriteString16Length(msg, buf) + } + buf.WriteByte(byte(data.TransactionExceptionCode)) + bytes.WriteString16Length(data.Xid, buf) + buf.WriteInt64(data.BranchId) + buf.WriteByte(byte(data.BranchStatus)) return buf.Bytes() } diff --git a/pkg/protocol/codec/branch_commit_response_codec_test.go b/pkg/protocol/codec/branch_commit_response_codec_test.go index 04f61835..6e5b0a61 100644 --- a/pkg/protocol/codec/branch_commit_response_codec_test.go +++ b/pkg/protocol/codec/branch_commit_response_codec_test.go @@ -20,6 +20,8 @@ package codec import ( "testing" + serror "github.com/seata/seata-go/pkg/common/error" + model2 "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" @@ -32,6 +34,13 @@ func TestBranchCommitResponseCodec(t *testing.T) { Xid: "123344", BranchId: 56678, BranchStatus: model2.BranchStatusPhaseoneFailed, + AbstractTransactionResponse: message.AbstractTransactionResponse{ + TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed, + AbstractResultMessage: message.AbstractResultMessage{ + ResultCode: message.ResultCodeFailed, + Msg: "FAILED", + }, + }, }, } diff --git a/pkg/protocol/codec/branch_register_response_codec.go b/pkg/protocol/codec/branch_register_response_codec.go index 9197db50..72fd18cd 100644 --- a/pkg/protocol/codec/branch_register_response_codec.go +++ b/pkg/protocol/codec/branch_register_response_codec.go @@ -18,8 +18,10 @@ package codec import ( + "math" + "github.com/seata/seata-go/pkg/common/bytes" - error2 "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -38,7 +40,7 @@ func (g *BranchRegisterResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString16Length(buf) } - data.TransactionExceptionCode = error2.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) data.BranchId = int64(bytes.ReadUInt64(buf)) return data @@ -50,11 +52,9 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte { buf.WriteByte(byte(data.ResultCode)) if data.ResultCode == message.ResultCodeFailed { - var msg string - if len(data.Msg) > 128 { - msg = data.Msg[:128] - } else { - msg = data.Msg + msg := data.Msg + if len(data.Msg) > math.MaxInt16 { + msg = data.Msg[:math.MaxInt16] } bytes.WriteString16Length(msg, buf) } diff --git a/pkg/protocol/codec/branch_register_response_codec_test.go b/pkg/protocol/codec/branch_register_response_codec_test.go index 27445f96..8a961cac 100644 --- a/pkg/protocol/codec/branch_register_response_codec_test.go +++ b/pkg/protocol/codec/branch_register_response_codec_test.go @@ -20,7 +20,7 @@ package codec import ( "testing" - error2 "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/message" "github.com/stretchr/testify/assert" @@ -33,7 +33,7 @@ func TestBranchRegisterResponseCodec(t *testing.T) { ResultCode: message.ResultCodeFailed, Msg: "FAILED", }, - TransactionExceptionCode: error2.TransactionExceptionCodeUnknown, + TransactionExceptionCode: serror.TransactionExceptionCodeUnknown, }, BranchId: 124356567, } diff --git a/pkg/protocol/codec/branch_rollback_response_codec.go b/pkg/protocol/codec/branch_rollback_response_codec.go index 0b4b0149..694d6317 100644 --- a/pkg/protocol/codec/branch_rollback_response_codec.go +++ b/pkg/protocol/codec/branch_rollback_response_codec.go @@ -18,7 +18,10 @@ package codec import ( + "math" + "github.com/seata/seata-go/pkg/common/bytes" + serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -34,6 +37,11 @@ func (g *BranchRollbackResponseCodec) Decode(in []byte) interface{} { data := message.BranchRollbackResponse{} buf := bytes.NewByteBuffer(in) + data.ResultCode = message.ResultCode(bytes.ReadByte(buf)) + if data.ResultCode == message.ResultCodeFailed { + data.Msg = bytes.ReadString16Length(buf) + } + data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) data.Xid = bytes.ReadString16Length(buf) data.BranchId = int64(bytes.ReadUInt64(buf)) data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf)) @@ -45,6 +53,15 @@ func (g *BranchRollbackResponseCodec) Encode(in interface{}) []byte { data, _ := in.(message.BranchRollbackResponse) buf := bytes.NewByteBuffer([]byte{}) + buf.WriteByte(byte(data.ResultCode)) + if data.ResultCode == message.ResultCodeFailed { + msg := data.Msg + if len(data.Msg) > math.MaxInt16 { + msg = data.Msg[:math.MaxInt16] + } + bytes.WriteString16Length(msg, buf) + } + buf.WriteByte(byte(data.TransactionExceptionCode)) bytes.WriteString16Length(data.Xid, buf) buf.WriteInt64(data.BranchId) buf.WriteByte(byte(data.BranchStatus)) diff --git a/pkg/protocol/codec/branch_rollback_response_codec_test.go b/pkg/protocol/codec/branch_rollback_response_codec_test.go index 641decc7..0059ca69 100644 --- a/pkg/protocol/codec/branch_rollback_response_codec_test.go +++ b/pkg/protocol/codec/branch_rollback_response_codec_test.go @@ -20,6 +20,8 @@ package codec import ( "testing" + serror "github.com/seata/seata-go/pkg/common/error" + model2 "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" @@ -32,6 +34,13 @@ func TestBranchRollbackResponseCodec(t *testing.T) { Xid: "123344", BranchId: 56678, BranchStatus: model2.BranchStatusPhaseoneFailed, + AbstractTransactionResponse: message.AbstractTransactionResponse{ + TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed, + AbstractResultMessage: message.AbstractResultMessage{ + ResultCode: message.ResultCodeFailed, + Msg: "FAILED", + }, + }, }, } diff --git a/pkg/protocol/codec/common_global_end_response_codec.go b/pkg/protocol/codec/common_global_end_response_codec.go index 62901db8..cb9f521c 100644 --- a/pkg/protocol/codec/common_global_end_response_codec.go +++ b/pkg/protocol/codec/common_global_end_response_codec.go @@ -18,8 +18,10 @@ package codec import ( + "math" + "github.com/seata/seata-go/pkg/common/bytes" - transaction2 "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -32,11 +34,9 @@ func (c *CommonGlobalEndResponseCodec) Encode(in interface{}) []byte { buf.WriteByte(byte(data.ResultCode)) if data.ResultCode == message.ResultCodeFailed { - var msg string - if len(data.Msg) > 128 { - msg = data.Msg[:128] - } else { - msg = data.Msg + msg := data.Msg + if len(data.Msg) > math.MaxInt16 { + msg = data.Msg[:math.MaxInt16] } bytes.WriteString16Length(msg, buf) } @@ -54,7 +54,7 @@ func (c *CommonGlobalEndResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString16Length(buf) } - data.TransactionExceptionCode = transaction2.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) data.GlobalStatus = message.GlobalStatus(bytes.ReadByte(buf)) return data diff --git a/pkg/protocol/codec/global_begin_response_codec.go b/pkg/protocol/codec/global_begin_response_codec.go index bc6bcd6d..b0a3b0df 100644 --- a/pkg/protocol/codec/global_begin_response_codec.go +++ b/pkg/protocol/codec/global_begin_response_codec.go @@ -18,8 +18,10 @@ package codec import ( + "math" + "github.com/seata/seata-go/pkg/common/bytes" - error2 "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -36,11 +38,9 @@ func (c *GlobalBeginResponseCodec) Encode(in interface{}) []byte { buf.WriteByte(byte(data.ResultCode)) if data.ResultCode == message.ResultCodeFailed { - var msg string - if len(data.Msg) > 128 { - msg = data.Msg[:128] - } else { - msg = data.Msg + msg := data.Msg + if len(data.Msg) > math.MaxInt16 { + msg = data.Msg[:math.MaxInt16] } bytes.WriteString16Length(msg, buf) } @@ -59,7 +59,7 @@ func (g *GlobalBeginResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString16Length(buf) } - data.TransactionExceptionCode = error2.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) data.Xid = bytes.ReadString16Length(buf) data.ExtraData = []byte(bytes.ReadString16Length(buf)) diff --git a/pkg/protocol/codec/global_begin_response_codec_test.go b/pkg/protocol/codec/global_begin_response_codec_test.go index a04f9f0d..2b645fba 100644 --- a/pkg/protocol/codec/global_begin_response_codec_test.go +++ b/pkg/protocol/codec/global_begin_response_codec_test.go @@ -20,7 +20,7 @@ package codec import ( "testing" - error2 "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/message" "github.com/stretchr/testify/assert" @@ -33,7 +33,7 @@ func TestGlobalBeginResponseCodec(t *testing.T) { ResultCode: message.ResultCodeFailed, Msg: "FAILED", }, - TransactionExceptionCode: error2.TransactionExceptionCodeBeginFailed, + TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed, }, Xid: "test-transaction-id", diff --git a/pkg/protocol/message/other_message.go b/pkg/protocol/message/other_message.go index 52e3cd65..ea3e8c25 100644 --- a/pkg/protocol/message/other_message.go +++ b/pkg/protocol/message/other_message.go @@ -56,3 +56,7 @@ func (msg HeartBeatMessage) ToString() string { return "services pong" } } + +func (resp HeartBeatMessage) GetTypeCode() MessageType { + return MessageType_HeartbeatMsg +} diff --git a/pkg/protocol/message/response_message.go b/pkg/protocol/message/response_message.go index 1d5eeb21..06ab5559 100644 --- a/pkg/protocol/message/response_message.go +++ b/pkg/protocol/message/response_message.go @@ -18,13 +18,13 @@ package message import ( - transaction2 "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/error" model2 "github.com/seata/seata-go/pkg/protocol/branch" ) type AbstractTransactionResponse struct { AbstractResultMessage - TransactionExceptionCode transaction2.TransactionExceptionCode + TransactionExceptionCode serror.TransactionExceptionCode } type AbstractBranchEndResponse struct { diff --git a/pkg/remoting/getty/getty_remoting.go b/pkg/remoting/getty/getty_remoting.go index ffc24fc0..6f1c9d6b 100644 --- a/pkg/remoting/getty/getty_remoting.go +++ b/pkg/remoting/getty/getty_remoting.go @@ -72,7 +72,7 @@ func (client *GettyRemoting) SendASync(msg message.RpcMessage) error { } func (client *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, timeout time.Duration) (interface{}, error) { - log.Infof("send message: {%#v}", msg) + log.Infof("send async message: {%#v}", msg) var err error if session == nil || session.IsClosed() { log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.") diff --git a/pkg/remoting/getty/listener.go b/pkg/remoting/getty/listener.go index 71a25539..24feb410 100644 --- a/pkg/remoting/getty/listener.go +++ b/pkg/remoting/getty/listener.go @@ -68,7 +68,7 @@ func (client *gettyClientHandler) OnOpen(session getty.Session) error { err := GetGettyRemotingClient().SendAsyncRequest(request) //client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT) if err != nil { - log.Error("OnOpen error: {%#v}", err.Error()) + log.Errorf("OnOpen error: {%#v}", err.Error()) sessionManager.ReleaseGettySession(session) return } @@ -111,7 +111,7 @@ func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface } func (client *gettyClientHandler) OnCron(session getty.Session) { - // todo send heartbeat message + GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing) } func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) { diff --git a/pkg/remoting/processor/client/client_heart_beat_processon.go b/pkg/remoting/processor/client/client_heart_beat_processon.go index a29d8d6c..acb04e46 100644 --- a/pkg/remoting/processor/client/client_heart_beat_processon.go +++ b/pkg/remoting/processor/client/client_heart_beat_processon.go @@ -32,10 +32,10 @@ func init() { type clientHeartBeatProcesson struct{} func (f *clientHeartBeatProcesson) Process(ctx context.Context, rpcMessage message.RpcMessage) error { - if _, ok := rpcMessage.Body.(message.HeartBeatMessage); ok { - // TODO 如何从context中获取远程服务的信息? - //log.Infof("received PONG from {}", ctx.channel().remoteAddress()) - log.Infof("received PONG from {}", ctx) + if msg, ok := rpcMessage.Body.(message.HeartBeatMessage); ok { + if !msg.Ping { + log.Infof("received PONG from {}", ctx) + } } return nil } diff --git a/pkg/remoting/processor/client/client_on_response_processor.go b/pkg/remoting/processor/client/client_on_response_processor.go index d811cabb..45fa9a83 100644 --- a/pkg/remoting/processor/client/client_on_response_processor.go +++ b/pkg/remoting/processor/client/client_on_response_processor.go @@ -23,23 +23,23 @@ import ( "github.com/seata/seata-go/pkg/common/log" "github.com/seata/seata-go/pkg/protocol/message" - getty2 "github.com/seata/seata-go/pkg/remoting/getty" + "github.com/seata/seata-go/pkg/remoting/getty" ) func init() { clientOnResponseProcessor := &clientOnResponseProcessor{} - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_SeataMergeResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRegisterResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchStatusReportResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalLockQueryResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegRmResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalBeginResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalCommitResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_SeataMergeResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRegisterResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchStatusReportResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalLockQueryResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegRmResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalBeginResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalCommitResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalReportResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalRollbackResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalStatusResult, clientOnResponseProcessor) - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegCltResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalReportResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalRollbackResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalStatusResult, clientOnResponseProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegCltResult, clientOnResponseProcessor) } type clientOnResponseProcessor struct { @@ -48,26 +48,26 @@ type clientOnResponseProcessor struct { func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error { log.Infof("the rm client received clientOnResponse msg %#v from tc server.", rpcMessage) if mergedResult, ok := rpcMessage.Body.(message.MergeResultMessage); ok { - mergedMessage := getty2.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID) + mergedMessage := getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID) if mergedMessage != nil { for i := 0; i < len(mergedMessage.Msgs); i++ { msgID := mergedMessage.MsgIds[i] - response := getty2.GetGettyRemotingInstance().GetMessageFuture(msgID) + response := getty.GetGettyRemotingInstance().GetMessageFuture(msgID) if response != nil { response.Response = mergedResult.Msgs[i] response.Done <- struct{}{} - getty2.GetGettyRemotingInstance().RemoveMessageFuture(msgID) + getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID) } } - getty2.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID) + getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID) } return nil } else { // 如果是请求消息,做处理逻辑 - msgFuture := getty2.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID) + msgFuture := getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID) if msgFuture != nil { - getty2.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage) - getty2.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID) + getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage) + getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID) } else { if _, ok := rpcMessage.Body.(message.AbstractResultMessage); ok { log.Infof("the rm client received response msg [{}] from tc server.", msgFuture) diff --git a/pkg/remoting/processor/client/rm_branch_commit_processor.go b/pkg/remoting/processor/client/rm_branch_commit_processor.go index 4e48565b..7c8e44ef 100644 --- a/pkg/remoting/processor/client/rm_branch_commit_processor.go +++ b/pkg/remoting/processor/client/rm_branch_commit_processor.go @@ -23,13 +23,13 @@ import ( "github.com/seata/seata-go/pkg/common/log" "github.com/seata/seata-go/pkg/protocol/message" - getty2 "github.com/seata/seata-go/pkg/remoting/getty" + "github.com/seata/seata-go/pkg/remoting/getty" "github.com/seata/seata-go/pkg/rm" ) func init() { rmBranchCommitProcessor := &rmBranchCommitProcessor{} - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchCommit, rmBranchCommitProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchCommit, rmBranchCommitProcessor) } type rmBranchCommitProcessor struct { @@ -46,22 +46,42 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData) if err != nil { - log.Infof("Branch commit error: %s", err.Error()) + log.Infof("branch commit error: %s", err.Error()) return err } + log.Infof("branch commit success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) + + var ( + resultCode message.ResultCode + errMsg string + ) + if err != nil { + resultCode = message.ResultCodeFailed + errMsg = err.Error() + } else { + resultCode = message.ResultCodeSuccess + } // reply commit response to tc server + // todo add TransactionExceptionCode response := message.BranchCommitResponse{ AbstractBranchEndResponse: message.AbstractBranchEndResponse{ + AbstractTransactionResponse: message.AbstractTransactionResponse{ + AbstractResultMessage: message.AbstractResultMessage{ + ResultCode: resultCode, + Msg: errMsg, + }, + }, Xid: xid, BranchId: branchID, BranchStatus: status, }, } - err = getty2.GetGettyRemotingClient().SendAsyncResponse(response) + err = getty.GetGettyRemotingClient().SendAsyncResponse(response) if err != nil { - log.Error("BranchCommitResponse error: {%#v}", err.Error()) + log.Errorf("send branch commit response error: {%#v}", err.Error()) return err } + log.Infof("send branch commit response success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) return nil } diff --git a/pkg/remoting/processor/client/rm_branch_rollback_processor.go b/pkg/remoting/processor/client/rm_branch_rollback_processor.go index 4507459f..b7ef81dd 100644 --- a/pkg/remoting/processor/client/rm_branch_rollback_processor.go +++ b/pkg/remoting/processor/client/rm_branch_rollback_processor.go @@ -23,13 +23,13 @@ import ( "github.com/seata/seata-go/pkg/common/log" "github.com/seata/seata-go/pkg/protocol/message" - getty2 "github.com/seata/seata-go/pkg/remoting/getty" + "github.com/seata/seata-go/pkg/remoting/getty" "github.com/seata/seata-go/pkg/rm" ) func init() { rmBranchRollbackProcessor := &rmBranchRollbackProcessor{} - getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRollback, rmBranchRollbackProcessor) + getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRollback, rmBranchRollbackProcessor) } type rmBranchRollbackProcessor struct { @@ -42,26 +42,44 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess branchID := request.BranchId resourceID := request.ResourceId applicationData := request.ApplicationData - log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) + log.Infof("Branch rollback request: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, request.BranchType, xid, branchID, resourceID, applicationData) if err != nil { - log.Infof("Branch commit error: %s", err.Error()) + log.Infof("branch rollback error: %s", err.Error()) return err } + log.Infof("branch rollback success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) + var ( + resultCode message.ResultCode + errMsg string + ) + if err != nil { + resultCode = message.ResultCodeFailed + errMsg = err.Error() + } else { + resultCode = message.ResultCodeSuccess + } // reply commit response to tc server response := message.BranchRollbackResponse{ AbstractBranchEndResponse: message.AbstractBranchEndResponse{ + AbstractTransactionResponse: message.AbstractTransactionResponse{ + AbstractResultMessage: message.AbstractResultMessage{ + ResultCode: resultCode, + Msg: errMsg, + }, + }, Xid: xid, BranchId: branchID, BranchStatus: status, }, } - err = getty2.GetGettyRemotingClient().SendAsyncResponse(response) + err = getty.GetGettyRemotingClient().SendAsyncResponse(response) if err != nil { - log.Error("BranchCommitResponse error: {%#v}", err.Error()) + log.Errorf("send branch rollback response error: {%#v}", err.Error()) return err } + log.Infof("send branch rollback response success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) return nil } diff --git a/pkg/rm/tcc/tcc_service.go b/pkg/rm/tcc/tcc_service.go index 7bfca407..f7b94bf7 100644 --- a/pkg/rm/tcc/tcc_service.go +++ b/pkg/rm/tcc/tcc_service.go @@ -96,7 +96,7 @@ func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{}) branchId, err := rm.GetRMRemotingInstance().BranchRegister(branch.BranchTypeTCC, t.GetActionName(), "", tm.GetXID(ctx), string(tccContextStr), "") if err != nil { err = errors.New(fmt.Sprintf("BranchRegister error: %v", err.Error())) - log.Error(err.Error()) + log.Errorf(err.Error()) return err } diff --git a/pkg/tm/global_transaction.go b/pkg/tm/global_transaction.go index 1ff3c39b..593091dc 100644 --- a/pkg/tm/global_transaction.go +++ b/pkg/tm/global_transaction.go @@ -111,11 +111,13 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa break } } - if err == nil && res != nil { + if err == nil { + log.Infof("GlobalCommitRequest commit success, xid %s", gtr.Xid) gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus + UnbindXid(ctx) + return nil } - UnbindXid(ctx) - log.Infof("GlobalCommitRequest commit success, xid %s", gtr.Xid) + log.Errorf("GlobalCommitRequest commit failed, xid %s, error %v", gtr.Xid, err) return err } @@ -147,10 +149,13 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran break } } - if err == nil && res != nil { + if err == nil { + log.Errorf("GlobalRollbackRequest rollback success, xid %s", gtr.Xid) gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus + UnbindXid(ctx) + return nil } - UnbindXid(ctx) + log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err) return err } diff --git a/test/tcc_service_test.go b/test/tcc_service_test.go index e0712f0b..6ab9824c 100644 --- a/test/tcc_service_test.go +++ b/test/tcc_service_test.go @@ -20,7 +20,6 @@ package test import ( "context" "testing" - "time" "github.com/seata/seata-go/pkg/tm" @@ -81,6 +80,7 @@ func TestNew(test *testing.T) { defer func() { resp := tm.CommitOrRollback(ctx, err) log.Infof("tx result %v", resp) + <-make(chan bool) }() tccService := tcc.NewTCCServiceProxy(TestTCCServiceBusiness{}) @@ -97,5 +97,4 @@ func TestNew(test *testing.T) { return } - time.Sleep(time.Second * 1000) }