Browse Source

Adjust the structure of the project

tags/v0.1.0-rc1
luky116_Liuyuecai 3 years ago
parent
commit
1c0cc22211
70 changed files with 1477 additions and 1558 deletions
  1. +0
    -0
      pkg/common/error/error.go
  2. +0
    -0
      pkg/common/log/logging.go
  3. +0
    -8
      pkg/common/model/resource.go
  4. +0
    -43
      pkg/common/model/resource_manager.go
  5. +1
    -1
      pkg/common/net/net_utils.go
  6. +0
    -0
      pkg/common/runtime/goroutine.go
  7. +0
    -0
      pkg/common/xid/xid_utils.go
  8. +2
    -2
      pkg/imports/imports.go
  9. +1
    -1
      pkg/protocol/branch/branch.go
  10. +64
    -67
      pkg/protocol/codec/codec.go
  11. +83
    -85
      pkg/protocol/codec/seata_decoder.go
  12. +43
    -46
      pkg/protocol/codec/seata_encoder.go
  13. +0
    -30
      pkg/protocol/constant.go
  14. +0
    -16
      pkg/protocol/heart_beat_message.go
  15. +0
    -20
      pkg/protocol/identify.go
  16. +0
    -18
      pkg/protocol/merged_message.go
  17. +151
    -0
      pkg/protocol/message/constant.go
  18. +48
    -0
      pkg/protocol/message/message_apis.go
  19. +41
    -0
      pkg/protocol/message/other_message.go
  20. +154
    -0
      pkg/protocol/message/request_message.go
  21. +109
    -0
      pkg/protocol/message/response_message.go
  22. +0
    -17
      pkg/protocol/message_future.go
  23. +0
    -121
      pkg/protocol/message_type.go
  24. +0
    -5
      pkg/protocol/message_type_aware.go
  25. +0
    -226
      pkg/protocol/request_response.go
  26. +51
    -0
      pkg/protocol/resource/resource.go
  27. +0
    -18
      pkg/protocol/result_code.go
  28. +0
    -18
      pkg/protocol/rm.go
  29. +0
    -10
      pkg/protocol/rpc_message.go
  30. +0
    -17
      pkg/protocol/tm.go
  31. +1
    -1
      pkg/protocol/transaction/context.go
  32. +28
    -24
      pkg/protocol/transaction/executor/transactional_template.go
  33. +1
    -1
      pkg/protocol/transaction/expection_code.go
  34. +154
    -0
      pkg/protocol/transaction/manager/global_transaction_manager.go
  35. +0
    -33
      pkg/protocol/transaction/rm_handler.go
  36. +1
    -1
      pkg/protocol/transaction/transaction.go
  37. +1
    -1
      pkg/protocol/transaction/transaction_manager.go
  38. +1
    -1
      pkg/protocol/transaction/transaction_status.go
  39. +12
    -12
      pkg/remoting/getty/getty_client.go
  40. +12
    -15
      pkg/remoting/getty/getty_remoting.go
  41. +25
    -25
      pkg/remoting/getty/listener.go
  42. +15
    -15
      pkg/remoting/getty/readwriter.go
  43. +1
    -1
      pkg/remoting/getty/rpc_client.go
  44. +0
    -0
      pkg/remoting/getty/session_manager.go
  45. +23
    -0
      pkg/remoting/processor/client/client_heart_beat_processon.go
  46. +55
    -0
      pkg/remoting/processor/client/client_on_response_processor.go
  47. +10
    -10
      pkg/remoting/processor/client/rm_branch_commit_processor.go
  48. +10
    -10
      pkg/remoting/processor/client/rm_branch_rollback_processor.go
  49. +10
    -0
      pkg/remoting/processor/remoting_processor.go
  50. +0
    -8
      pkg/rm/api/transactional_executor.go
  51. +64
    -0
      pkg/rm/common/handler/rm_handler.go
  52. +8
    -11
      pkg/rm/common/handler/rm_handler_facade.go
  53. +19
    -18
      pkg/rm/common/remoting/rm_remoting.go
  54. +8
    -8
      pkg/rm/common/remoting/rm_remoting_test.go
  55. +0
    -54
      pkg/rm/common_rm_handler.go
  56. +87
    -0
      pkg/rm/resource_manager.go
  57. +0
    -89
      pkg/rm/resource_manager_facade.go
  58. +22
    -0
      pkg/rm/tcc/handler/tcc_rm_handler.go
  59. +135
    -3
      pkg/rm/tcc/tcc_resource.go
  60. +0
    -135
      pkg/rm/tcc/tcc_rm.go
  61. +0
    -20
      pkg/rm/tcc/tcc_rm_handler.go
  62. +0
    -22
      pkg/rm/tcc/tcc_runner.go
  63. +16
    -15
      pkg/rm/tcc/tcc_service.go
  64. +0
    -26
      pkg/rpc/processor/client/client_heart_beat_processon.go
  65. +0
    -58
      pkg/rpc/processor/client/client_on_response_processor.go
  66. +0
    -13
      pkg/rpc/processor/remoting_processor.go
  67. +5
    -0
      pkg/server/server.go
  68. +0
    -154
      pkg/tm/global_transaction.go
  69. +3
    -3
      test/tcc_service_test.go
  70. +2
    -2
      testdata/mock/mock_tcc_service.go

pkg/utils/error/error.go → pkg/common/error/error.go View File


pkg/utils/log/logging.go → pkg/common/log/logging.go View File


+ 0
- 8
pkg/common/model/resource.go View File

@@ -1,8 +0,0 @@
package model

// Resource that can be managed by Resource Manager and involved into global transaction
type Resource interface {
GetResourceGroupId() string
GetResourceId() string
GetBranchType() BranchType
}

+ 0
- 43
pkg/common/model/resource_manager.go View File

@@ -1,43 +0,0 @@
package model

import (
"context"
"sync"
)

// Control a branch transaction commit or rollback
type ResourceManagerInbound interface {
// Commit a branch transaction
BranchCommit(ctx context.Context, branchType BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (BranchStatus, error)
// Rollback a branch transaction
BranchRollback(ctx context.Context, ranchType BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (BranchStatus, error)
}

// Resource Manager: send outbound request to TC
type ResourceManagerOutbound interface {
// Branch register long
BranchRegister(ctx context.Context, ranchType BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error)
// Branch report
BranchReport(ctx context.Context, ranchType BranchType, xid string, branchId int64, status BranchStatus, applicationData string) error
// Lock query boolean
LockQuery(ctx context.Context, ranchType BranchType, resourceId, xid, lockKeys string) (bool, error)
}

// Resource Manager: common behaviors
type ResourceManager interface {
ResourceManagerInbound
ResourceManagerOutbound

// Register a Resource to be managed by Resource Manager
RegisterResource(resource Resource) error
// Unregister a Resource from the Resource Manager
UnregisterResource(resource Resource) error
// Get all resources managed by this manager
GetManagedResources() sync.Map
// Get the BranchType
GetBranchType() BranchType
}

type ResourceManagerGetter interface {
GetResourceManager() ResourceManager
}

pkg/utils/net_utils.go → pkg/common/net/net_utils.go View File

@@ -1,4 +1,4 @@
package utils
package net

func GetLocalIp() string {
// todo

pkg/utils/runtime/goroutine.go → pkg/common/runtime/goroutine.go View File


pkg/utils/xid/xid_utils.go → pkg/common/xid/xid_utils.go View File


+ 2
- 2
pkg/imports/imports.go View File

@@ -1,7 +1,7 @@
package imports

import (
_ "github.com/seata/seata-go/pkg/remoting/getty"
_ "github.com/seata/seata-go/pkg/remoting/processor/client"
_ "github.com/seata/seata-go/pkg/rm/tcc"
_ "github.com/seata/seata-go/pkg/rpc/getty"
_ "github.com/seata/seata-go/pkg/rpc/processor/client"
)

pkg/common/model/branch.go → pkg/protocol/branch/branch.go View File

@@ -1,4 +1,4 @@
package model
package branch

import (
"fmt"

+ 64
- 67
pkg/protocol/codec/codec.go View File

@@ -2,17 +2,14 @@ package codec

import (
"bytes"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
)

import (
"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/utils/log"
)

type SerializerType byte

// TODO 待重构
@@ -49,7 +46,7 @@ func MessageDecoder(codecType byte, in []byte) (interface{}, int) {

func SeataEncoder(in interface{}) []byte {
var result = make([]byte, 0)
msg := in.(protocol.MessageTypeAware)
msg := in.(message.MessageTypeAware)
typeCode := msg.GetTypeCode()
encoder := getMessageEncoder(typeCode)

@@ -66,32 +63,32 @@ func SeataDecoder(in []byte) (interface{}, int) {
r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
typeCode, _, _ := r.ReadInt16()

decoder := getMessageDecoder(protocol.MessageType(typeCode))
decoder := getMessageDecoder(message.MessageType(typeCode))
if decoder != nil {
return decoder(in[2:])
}
return nil, 0
}

func getMessageEncoder(typeCode protocol.MessageType) Encoder {
func getMessageEncoder(typeCode message.MessageType) Encoder {
switch typeCode {
case protocol.MessageTypeSeataMerge:
case message.MessageType_SeataMerge:
return MergedWarpMessageEncoder
case protocol.MessageTypeSeataMergeResult:
case message.MessageType_SeataMergeResult:
return MergeResultMessageEncoder
case protocol.MessageTypeRegClt:
case message.MessageType_RegClt:
return RegisterTMRequestEncoder
case protocol.MessageTypeRegCltResult:
case message.MessageType_RegCltResult:
return RegisterTMResponseEncoder
case protocol.MessageTypeRegRm:
case message.MessageType_RegRm:
return RegisterRMRequestEncoder
case protocol.MessageTypeRegRmResult:
case message.MessageType_RegRmResult:
return RegisterRMResponseEncoder
case protocol.MessageTypeBranchCommit:
case message.MessageType_BranchCommit:
return BranchCommitRequestEncoder
case protocol.MessageTypeBranchRollback:
case message.MessageType_BranchRollback:
return BranchRollbackRequestEncoder
case protocol.MessageTypeGlobalReport:
case message.MessageType_GlobalReport:
return GlobalReportRequestEncoder
default:
var encoder Encoder
@@ -108,23 +105,23 @@ func getMessageEncoder(typeCode protocol.MessageType) Encoder {
}
}

func getMergeRequestMessageEncoder(typeCode protocol.MessageType) Encoder {
func getMergeRequestMessageEncoder(typeCode message.MessageType) Encoder {
switch typeCode {
case protocol.MessageTypeGlobalBegin:
case message.MessageType_GlobalBegin:
return GlobalBeginRequestEncoder
case protocol.MessageTypeGlobalCommit:
case message.MessageType_GlobalCommit:
return GlobalCommitRequestEncoder
case protocol.MessageTypeGlobalRollback:
case message.MessageType_GlobalRollback:
return GlobalRollbackRequestEncoder
case protocol.MessageTypeGlobalStatus:
case message.MessageType_GlobalStatus:
return GlobalStatusRequestEncoder
case protocol.MessageTypeGlobalLockQuery:
case message.MessageType_GlobalLockQuery:
return GlobalLockQueryRequestEncoder
case protocol.MessageTypeBranchRegister:
case message.MessageType_BranchRegister:
return BranchRegisterRequestEncoder
case protocol.MessageTypeBranchStatusReport:
case message.MessageType_BranchStatusReport:
return BranchReportRequestEncoder
case protocol.MessageTypeGlobalReport:
case message.MessageType_GlobalReport:
return GlobalReportRequestEncoder
default:
break
@@ -132,27 +129,27 @@ func getMergeRequestMessageEncoder(typeCode protocol.MessageType) Encoder {
return nil
}

func getMergeResponseMessageEncoder(typeCode protocol.MessageType) Encoder {
func getMergeResponseMessageEncoder(typeCode message.MessageType) Encoder {
switch typeCode {
case protocol.MessageTypeGlobalBeginResult:
case message.MessageType_GlobalBeginResult:
return GlobalBeginResponseEncoder
case protocol.MessageTypeGlobalCommitResult:
case message.MessageType_GlobalCommitResult:
return GlobalCommitResponseEncoder
case protocol.MessageTypeGlobalRollbackResult:
case message.MessageType_GlobalRollbackResult:
return GlobalRollbackResponseEncoder
case protocol.MessageTypeGlobalStatusResult:
case message.MessageType_GlobalStatusResult:
return GlobalStatusResponseEncoder
case protocol.MessageTypeGlobalLockQueryResult:
case message.MessageType_GlobalLockQueryResult:
return GlobalLockQueryResponseEncoder
case protocol.MessageTypeBranchRegisterResult:
case message.MessageType_BranchRegisterResult:
return BranchRegisterResponseEncoder
case protocol.MessageTypeBranchStatusReportResult:
case message.MessageType_BranchStatusReportResult:
return BranchReportResponseEncoder
case protocol.MessageTypeBranchCommitResult:
case message.MessageType_BranchCommitResult:
return BranchCommitResponseEncoder
case protocol.MessageTypeBranchRollbackResult:
case message.MessageType_BranchRollbackResult:
return BranchRollbackResponseEncoder
case protocol.MessageTypeGlobalReportResult:
case message.MessageType_GlobalReportResult:
return GlobalReportResponseEncoder
default:
break
@@ -160,25 +157,25 @@ func getMergeResponseMessageEncoder(typeCode protocol.MessageType) Encoder {
return nil
}

func getMessageDecoder(typeCode protocol.MessageType) Decoder {
func getMessageDecoder(typeCode message.MessageType) Decoder {
switch typeCode {
case protocol.MessageTypeSeataMerge:
case message.MessageType_SeataMerge:
return MergedWarpMessageDecoder
case protocol.MessageTypeSeataMergeResult:
case message.MessageType_SeataMergeResult:
return MergeResultMessageDecoder
case protocol.MessageTypeRegClt:
case message.MessageType_RegClt:
return RegisterTMRequestDecoder
case protocol.MessageTypeRegCltResult:
case message.MessageType_RegCltResult:
return RegisterTMResponseDecoder
case protocol.MessageTypeRegRm:
case message.MessageType_RegRm:
return RegisterRMRequestDecoder
case protocol.MessageTypeRegRmResult:
case message.MessageType_RegRmResult:
return RegisterRMResponseDecoder
case protocol.MessageTypeBranchCommit:
case message.MessageType_BranchCommit:
return BranchCommitRequestDecoder
case protocol.MessageTypeBranchRollback:
case message.MessageType_BranchRollback:
return BranchRollbackRequestDecoder
case protocol.MessageTypeGlobalReport:
case message.MessageType_GlobalReport:
return GlobalReportRequestDecoder
default:
var Decoder Decoder
@@ -195,23 +192,23 @@ func getMessageDecoder(typeCode protocol.MessageType) Decoder {
}
}

func getMergeRequestMessageDecoder(typeCode protocol.MessageType) Decoder {
func getMergeRequestMessageDecoder(typeCode message.MessageType) Decoder {
switch typeCode {
case protocol.MessageTypeGlobalBegin:
case message.MessageType_GlobalBegin:
return GlobalBeginRequestDecoder
case protocol.MessageTypeGlobalCommit:
case message.MessageType_GlobalCommit:
return GlobalCommitRequestDecoder
case protocol.MessageTypeGlobalRollback:
case message.MessageType_GlobalRollback:
return GlobalRollbackRequestDecoder
case protocol.MessageTypeGlobalStatus:
case message.MessageType_GlobalStatus:
return GlobalStatusRequestDecoder
case protocol.MessageTypeGlobalLockQuery:
case message.MessageType_GlobalLockQuery:
return GlobalLockQueryRequestDecoder
case protocol.MessageTypeBranchRegister:
case message.MessageType_BranchRegister:
return BranchRegisterRequestDecoder
case protocol.MessageTypeBranchStatusReport:
case message.MessageType_BranchStatusReport:
return BranchReportRequestDecoder
case protocol.MessageTypeGlobalReport:
case message.MessageType_GlobalReport:
return GlobalReportRequestDecoder
default:
break
@@ -219,27 +216,27 @@ func getMergeRequestMessageDecoder(typeCode protocol.MessageType) Decoder {
return nil
}

func getMergeResponseMessageDecoder(typeCode protocol.MessageType) Decoder {
func getMergeResponseMessageDecoder(typeCode message.MessageType) Decoder {
switch typeCode {
case protocol.MessageTypeGlobalBeginResult:
case message.MessageType_GlobalBeginResult:
return GlobalBeginResponseDecoder
case protocol.MessageTypeGlobalCommitResult:
case message.MessageType_GlobalCommitResult:
return GlobalCommitResponseDecoder
case protocol.MessageTypeGlobalRollbackResult:
case message.MessageType_GlobalRollbackResult:
return GlobalRollbackResponseDecoder
case protocol.MessageTypeGlobalStatusResult:
case message.MessageType_GlobalStatusResult:
return GlobalStatusResponseDecoder
case protocol.MessageTypeGlobalLockQueryResult:
case message.MessageType_GlobalLockQueryResult:
return GlobalLockQueryResponseDecoder
case protocol.MessageTypeBranchRegisterResult:
case message.MessageType_BranchRegisterResult:
return BranchRegisterResponseDecoder
case protocol.MessageTypeBranchStatusReportResult:
case message.MessageType_BranchStatusReportResult:
return BranchReportResponseDecoder
case protocol.MessageTypeBranchCommitResult:
case message.MessageType_BranchCommitResult:
return BranchCommitResponseDecoder
case protocol.MessageTypeBranchRollbackResult:
case message.MessageType_BranchRollbackResult:
return BranchRollbackResponseDecoder
case protocol.MessageTypeGlobalReportResult:
case message.MessageType_GlobalReportResult:
return GlobalReportResponseDecoder
default:
break


+ 83
- 85
pkg/protocol/codec/seata_decoder.go View File

@@ -2,17 +2,15 @@ package codec

import (
"bytes"
model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

import (
"vimagination.zapto.org/byteio"
)

import (
model2 "github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
)

// TODO 待重构
func AbstractResultMessageDecoder(in []byte) (interface{}, int) {
var (
@@ -20,13 +18,13 @@ func AbstractResultMessageDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractResultMessage{}
msg := message.AbstractResultMessage{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
msg.ResultCode = protocol.ResultCode(resultCode)
msg.ResultCode = message.ResultCode(resultCode)
totalReadN += 1
if msg.ResultCode == protocol.ResultCodeFailed {
if msg.ResultCode == message.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
@@ -44,7 +42,7 @@ func MergedWarpMessageDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
result := protocol.MergedWarpMessage{}
result := message.MergedWarpMessage{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -52,15 +50,15 @@ func MergedWarpMessageDecoder(in []byte) (interface{}, int) {
totalReadN += 4
size16, readN, _ = r.ReadInt16()
totalReadN += readN
result.Msgs = make([]protocol.MessageTypeAware, 0)
result.Msgs = make([]message.MessageTypeAware, 0)
for index := 0; index < int(size16); index++ {
typeCode, _, _ := r.ReadInt16()
totalReadN += 2
decoder := getMessageDecoder(protocol.MessageType(typeCode))
decoder := getMessageDecoder(message.MessageType(typeCode))
if decoder != nil {
msg, readN := decoder(in[totalReadN:])
totalReadN += readN
result.Msgs = append(result.Msgs, msg.(protocol.MessageTypeAware))
result.Msgs = append(result.Msgs, msg.(message.MessageTypeAware))
}
}
return result, totalReadN
@@ -72,7 +70,7 @@ func MergeResultMessageDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
result := protocol.MergeResultMessage{}
result := message.MergeResultMessage{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -80,16 +78,16 @@ func MergeResultMessageDecoder(in []byte) (interface{}, int) {
totalReadN += 4
size16, readN, _ = r.ReadInt16()
totalReadN += readN
result.Msgs = make([]protocol.MessageTypeAware, 0)
result.Msgs = make([]message.MessageTypeAware, 0)

for index := 0; index < int(size16); index++ {
typeCode, _, _ := r.ReadInt16()
totalReadN += 2
decoder := getMessageDecoder(protocol.MessageType(typeCode))
decoder := getMessageDecoder(message.MessageType(typeCode))
if decoder != nil {
msg, readN := decoder(in[totalReadN:])
totalReadN += readN
result.Msgs = append(result.Msgs, msg.(protocol.MessageTypeAware))
result.Msgs = append(result.Msgs, msg.(message.MessageTypeAware))
}
}
return result, totalReadN
@@ -101,7 +99,7 @@ func AbstractIdentifyRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractIdentifyRequest{}
msg := message.AbstractIdentifyRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -143,7 +141,7 @@ func AbstractIdentifyResponseDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractIdentifyResponse{}
msg := message.AbstractIdentifyResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -172,7 +170,7 @@ func RegisterRMRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.RegisterRMRequest{}
msg := message.RegisterRMRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -217,22 +215,22 @@ func RegisterRMRequestDecoder(in []byte) (interface{}, int) {

func RegisterRMResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractIdentifyResponseDecoder(in)
abstractIdentifyResponse := resp.(protocol.AbstractIdentifyResponse)
msg := protocol.RegisterRMResponse{AbstractIdentifyResponse: abstractIdentifyResponse}
abstractIdentifyResponse := resp.(message.AbstractIdentifyResponse)
msg := message.RegisterRMResponse{AbstractIdentifyResponse: abstractIdentifyResponse}
return msg, totalReadN
}

func RegisterTMRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractIdentifyRequestDecoder(in)
abstractIdentifyRequest := req.(protocol.AbstractIdentifyRequest)
msg := protocol.RegisterTMRequest{AbstractIdentifyRequest: abstractIdentifyRequest}
abstractIdentifyRequest := req.(message.AbstractIdentifyRequest)
msg := message.RegisterTMRequest{AbstractIdentifyRequest: abstractIdentifyRequest}
return msg, totalReadN
}

func RegisterTMResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractIdentifyResponseDecoder(in)
abstractIdentifyResponse := resp.(protocol.AbstractIdentifyResponse)
msg := protocol.RegisterRMResponse{AbstractIdentifyResponse: abstractIdentifyResponse}
abstractIdentifyResponse := resp.(message.AbstractIdentifyResponse)
msg := message.RegisterRMResponse{AbstractIdentifyResponse: abstractIdentifyResponse}
return msg, totalReadN
}

@@ -242,13 +240,13 @@ func AbstractTransactionResponseDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractTransactionResponse{}
msg := message.AbstractTransactionResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
msg.ResultCode = message.ResultCode(resultCode)
if msg.ResultCode == message.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
@@ -259,7 +257,7 @@ func AbstractTransactionResponseDecoder(in []byte) (interface{}, int) {

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model2.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)

return msg, totalReadN
}
@@ -270,7 +268,7 @@ func AbstractBranchEndRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractBranchEndRequest{}
msg := message.AbstractBranchEndRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -311,13 +309,13 @@ func AbstractBranchEndResponseDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractBranchEndResponse{}
msg := message.AbstractBranchEndResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
msg.ResultCode = message.ResultCode(resultCode)
if msg.ResultCode == message.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
@@ -328,7 +326,7 @@ func AbstractBranchEndResponseDecoder(in []byte) (interface{}, int) {

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model2.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
@@ -352,7 +350,7 @@ func AbstractGlobalEndRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractGlobalEndRequest{}
msg := message.AbstractGlobalEndRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -380,13 +378,13 @@ func AbstractGlobalEndResponseDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.AbstractGlobalEndResponse{}
msg := message.AbstractGlobalEndResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
msg.ResultCode = message.ResultCode(resultCode)
if msg.ResultCode == message.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
@@ -397,26 +395,26 @@ func AbstractGlobalEndResponseDecoder(in []byte) (interface{}, int) {

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model2.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)

globalStatus, _ := r.ReadByte()
totalReadN += 1
msg.GlobalStatus = model2.GlobalStatus(globalStatus)
msg.GlobalStatus = transaction.GlobalStatus(globalStatus)

return msg, totalReadN
}

func BranchCommitRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractBranchEndRequestDecoder(in)
abstractBranchEndRequest := req.(protocol.AbstractBranchEndRequest)
msg := protocol.BranchCommitRequest{AbstractBranchEndRequest: abstractBranchEndRequest}
abstractBranchEndRequest := req.(message.AbstractBranchEndRequest)
msg := message.BranchCommitRequest{AbstractBranchEndRequest: abstractBranchEndRequest}
return msg, totalReadN
}

func BranchCommitResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractBranchEndResponseDecoder(in)
abstractBranchEndResponse := resp.(protocol.AbstractBranchEndResponse)
msg := protocol.BranchCommitResponse{AbstractBranchEndResponse: abstractBranchEndResponse}
abstractBranchEndResponse := resp.(message.AbstractBranchEndResponse)
msg := message.BranchCommitResponse{AbstractBranchEndResponse: abstractBranchEndResponse}
return msg, totalReadN
}

@@ -427,7 +425,7 @@ func BranchRegisterRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.BranchRegisterRequest{}
msg := message.BranchRegisterRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -473,13 +471,13 @@ func BranchRegisterResponseDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.BranchRegisterResponse{}
msg := message.BranchRegisterResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
msg.ResultCode = message.ResultCode(resultCode)
if msg.ResultCode == message.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
@@ -490,7 +488,7 @@ func BranchRegisterResponseDecoder(in []byte) (interface{}, int) {

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model2.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)

msg.BranchId, readN, _ = r.ReadInt64()
totalReadN += readN
@@ -504,7 +502,7 @@ func BranchReportRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.BranchReportRequest{}
msg := message.BranchReportRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -543,22 +541,22 @@ func BranchReportRequestDecoder(in []byte) (interface{}, int) {

func BranchReportResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractTransactionResponseDecoder(in)
abstractTransactionResponse := resp.(protocol.AbstractTransactionResponse)
msg := protocol.BranchReportResponse{AbstractTransactionResponse: abstractTransactionResponse}
abstractTransactionResponse := resp.(message.AbstractTransactionResponse)
msg := message.BranchReportResponse{AbstractTransactionResponse: abstractTransactionResponse}
return msg, totalReadN
}

func BranchRollbackRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractBranchEndRequestDecoder(in)
abstractBranchEndRequest := req.(protocol.AbstractBranchEndRequest)
msg := protocol.BranchRollbackRequest{AbstractBranchEndRequest: abstractBranchEndRequest}
abstractBranchEndRequest := req.(message.AbstractBranchEndRequest)
msg := message.BranchRollbackRequest{AbstractBranchEndRequest: abstractBranchEndRequest}
return msg, totalReadN
}

func BranchRollbackResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractBranchEndResponseDecoder(in)
abstractBranchEndResponse := resp.(protocol.AbstractBranchEndResponse)
msg := protocol.BranchRollbackResponse{AbstractBranchEndResponse: abstractBranchEndResponse}
abstractBranchEndResponse := resp.(message.AbstractBranchEndResponse)
msg := message.BranchRollbackResponse{AbstractBranchEndResponse: abstractBranchEndResponse}
return msg, totalReadN
}

@@ -568,7 +566,7 @@ func GlobalBeginRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.GlobalBeginRequest{}
msg := message.GlobalBeginRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -592,13 +590,13 @@ func GlobalBeginResponseDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.GlobalBeginResponse{}
msg := message.GlobalBeginResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
msg.ResultCode = message.ResultCode(resultCode)
if msg.ResultCode == message.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
@@ -609,7 +607,7 @@ func GlobalBeginResponseDecoder(in []byte) (interface{}, int) {

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model2.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
@@ -631,22 +629,22 @@ func GlobalBeginResponseDecoder(in []byte) (interface{}, int) {

func GlobalCommitRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractGlobalEndRequestDecoder(in)
abstractGlobalEndRequest := req.(protocol.AbstractGlobalEndRequest)
msg := protocol.GlobalCommitRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
abstractGlobalEndRequest := req.(message.AbstractGlobalEndRequest)
msg := message.GlobalCommitRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
return msg, totalReadN
}

func GlobalCommitResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalCommitResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
abstractGlobalEndResponse := resp.(message.AbstractGlobalEndResponse)
msg := message.GlobalCommitResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

func GlobalLockQueryRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := BranchRegisterRequestDecoder(in)
branchRegisterRequest := req.(protocol.BranchRegisterRequest)
msg := protocol.GlobalLockQueryRequest{BranchRegisterRequest: branchRegisterRequest}
branchRegisterRequest := req.(message.BranchRegisterRequest)
msg := message.GlobalLockQueryRequest{BranchRegisterRequest: branchRegisterRequest}
return msg, totalReadN
}

@@ -656,13 +654,13 @@ func GlobalLockQueryResponseDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.GlobalLockQueryResponse{}
msg := message.GlobalLockQueryResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
msg.ResultCode = message.ResultCode(resultCode)
if msg.ResultCode == message.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
@@ -673,7 +671,7 @@ func GlobalLockQueryResponseDecoder(in []byte) (interface{}, int) {

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model2.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)

lockable, readN, _ := r.ReadUint16()
totalReadN += readN
@@ -692,7 +690,7 @@ func GlobalReportRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.GlobalReportRequest{}
msg := message.GlobalReportRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

@@ -713,43 +711,43 @@ func GlobalReportRequestDecoder(in []byte) (interface{}, int) {

globalStatus, _ := r.ReadByte()
totalReadN += 1
msg.GlobalStatus = model2.GlobalStatus(globalStatus)
msg.GlobalStatus = transaction.GlobalStatus(globalStatus)

return msg, totalReadN
}

func GlobalReportResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalReportResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
abstractGlobalEndResponse := resp.(message.AbstractGlobalEndResponse)
msg := message.GlobalReportResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

func GlobalRollbackRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractGlobalEndRequestDecoder(in)
abstractGlobalEndRequest := req.(protocol.AbstractGlobalEndRequest)
msg := protocol.GlobalRollbackRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
abstractGlobalEndRequest := req.(message.AbstractGlobalEndRequest)
msg := message.GlobalRollbackRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
return msg, totalReadN
}

func GlobalRollbackResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalRollbackResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
abstractGlobalEndResponse := resp.(message.AbstractGlobalEndResponse)
msg := message.GlobalRollbackResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

func GlobalStatusRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractGlobalEndRequestDecoder(in)
abstractGlobalEndRequest := req.(protocol.AbstractGlobalEndRequest)
msg := protocol.GlobalStatusRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
abstractGlobalEndRequest := req.(message.AbstractGlobalEndRequest)
msg := message.GlobalStatusRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
return msg, totalReadN
}

func GlobalStatusResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalStatusResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
abstractGlobalEndResponse := resp.(message.AbstractGlobalEndResponse)
msg := message.GlobalStatusResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

@@ -759,7 +757,7 @@ func UndoLogDeleteRequestDecoder(in []byte) (interface{}, int) {
readN = 0
totalReadN = 0
)
msg := protocol.UndoLogDeleteRequest{}
msg := message.UndoLogDeleteRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
branchType, _ := r.ReadByte()
@@ -774,7 +772,7 @@ func UndoLogDeleteRequestDecoder(in []byte) (interface{}, int) {
}

day, readN, _ := r.ReadInt16()
msg.SaveDays = protocol.MessageType(day)
msg.SaveDays = message.MessageType(day)
totalReadN += readN

return msg, totalReadN


+ 43
- 46
pkg/protocol/codec/seata_encoder.go View File

@@ -2,17 +2,14 @@ package codec

import (
"bytes"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
)

import (
"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/utils/log"
)

// TODO 待重构
func AbstractResultMessageEncoder(in interface{}) []byte {
var (
@@ -21,18 +18,18 @@ func AbstractResultMessageEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

message := in.(protocol.AbstractResultMessage)
msgs := in.(message.AbstractResultMessage)

w.WriteByte(byte(message.ResultCode))
if message.ResultCode == protocol.ResultCodeFailed {
w.WriteByte(byte(msgs.ResultCode))
if msgs.ResultCode == message.ResultCodeFailed {
var msg string
if message.Msg != "" {
if len(message.Msg) > 128 {
msg = message.Msg[:128]
if msgs.Msg != "" {
if len(msgs.Msg) > 128 {
msg = msgs.Msg[:128]
} else {
msg = message.Msg
msg = msgs.Msg
}
// 暂时不考虑 message.Msg 包含中文的情况,这样字符串的长度就是 byte 数组的长度
// 暂时不考虑 msg.Msg 包含中文的情况,这样字符串的长度就是 byte 数组的长度

w.WriteInt16(int16(len(msg)))
w.WriteString(msg)
@@ -51,7 +48,7 @@ func MergedWarpMessageEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.MergedWarpMessage)
req, _ := in.(message.MergedWarpMessage)
w.WriteInt16(int16(len(req.Msgs)))

for _, msg := range req.Msgs {
@@ -80,7 +77,7 @@ func MergeResultMessageEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.MergeResultMessage)
req, _ := in.(message.MergeResultMessage)
w.WriteInt16(int16(len(req.Msgs)))

for _, msg := range req.Msgs {
@@ -109,7 +106,7 @@ func AbstractIdentifyRequestEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req := in.(protocol.AbstractIdentifyRequest)
req := in.(message.AbstractIdentifyRequest)

if req.Version != "" {
w.WriteInt16(int16(len(req.Version)))
@@ -143,7 +140,7 @@ func AbstractIdentifyRequestEncoder(in interface{}) []byte {
}

func AbstractIdentifyResponseEncoder(in interface{}) []byte {
resp := in.(protocol.AbstractIdentifyResponse)
resp := in.(message.AbstractIdentifyResponse)

var (
zero16 int16 = 0
@@ -168,7 +165,7 @@ func AbstractIdentifyResponseEncoder(in interface{}) []byte {
}

func RegisterRMRequestEncoder(in interface{}) []byte {
req := in.(protocol.RegisterRMRequest)
req := in.(message.RegisterRMRequest)
data := AbstractIdentifyRequestEncoder(req.AbstractIdentifyRequest)

var (
@@ -189,22 +186,22 @@ func RegisterRMRequestEncoder(in interface{}) []byte {
}

func RegisterRMResponseEncoder(in interface{}) []byte {
resp := in.(protocol.RegisterRMResponse)
resp := in.(message.RegisterRMResponse)
return AbstractIdentifyResponseEncoder(resp.AbstractIdentifyResponse)
}

func RegisterTMRequestEncoder(in interface{}) []byte {
req := in.(protocol.RegisterTMRequest)
req := in.(message.RegisterTMRequest)
return AbstractIdentifyRequestEncoder(req.AbstractIdentifyRequest)
}

func RegisterTMResponseEncoder(in interface{}) []byte {
resp := in.(protocol.RegisterTMResponse)
resp := in.(message.RegisterTMResponse)
return AbstractIdentifyResponseEncoder(resp.AbstractIdentifyResponse)
}

func AbstractTransactionResponseEncoder(in interface{}) []byte {
resp := in.(protocol.AbstractTransactionResponse)
resp := in.(message.AbstractTransactionResponse)
data := AbstractResultMessageEncoder(resp.AbstractResultMessage)

result := append(data, byte(resp.TransactionExceptionCode))
@@ -220,7 +217,7 @@ func AbstractBranchEndRequestEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.AbstractBranchEndRequest)
req, _ := in.(message.AbstractBranchEndRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
@@ -250,7 +247,7 @@ func AbstractBranchEndRequestEncoder(in interface{}) []byte {
}

func AbstractBranchEndResponseEncoder(in interface{}) []byte {
resp, _ := in.(protocol.AbstractBranchEndResponse)
resp, _ := in.(message.AbstractBranchEndResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

var (
@@ -281,7 +278,7 @@ func AbstractGlobalEndRequestEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.AbstractGlobalEndRequest)
req, _ := in.(message.AbstractGlobalEndRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
@@ -300,7 +297,7 @@ func AbstractGlobalEndRequestEncoder(in interface{}) []byte {
}

func AbstractGlobalEndResponseEncoder(in interface{}) []byte {
resp := in.(protocol.AbstractGlobalEndResponse)
resp := in.(message.AbstractGlobalEndResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

result := append(data, byte(resp.GlobalStatus))
@@ -309,12 +306,12 @@ func AbstractGlobalEndResponseEncoder(in interface{}) []byte {
}

func BranchCommitRequestEncoder(in interface{}) []byte {
req := in.(protocol.BranchCommitRequest)
req := in.(message.BranchCommitRequest)
return AbstractBranchEndRequestEncoder(req.AbstractBranchEndRequest)
}

func BranchCommitResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchCommitResponse)
resp := in.(message.BranchCommitResponse)
return AbstractBranchEndResponseEncoder(resp.AbstractBranchEndResponse)
}

@@ -326,7 +323,7 @@ func BranchRegisterRequestEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.BranchRegisterRequest)
req, _ := in.(message.BranchRegisterRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
@@ -362,7 +359,7 @@ func BranchRegisterRequestEncoder(in interface{}) []byte {
}

func BranchRegisterResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchRegisterResponse)
resp := in.(message.BranchRegisterResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

c := uint64(resp.BranchId)
@@ -389,7 +386,7 @@ func BranchReportRequestEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.BranchReportRequest)
req, _ := in.(message.BranchReportRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
@@ -421,17 +418,17 @@ func BranchReportRequestEncoder(in interface{}) []byte {
}

func BranchReportResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchReportResponse)
resp := in.(message.BranchReportResponse)
return AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)
}

func BranchRollbackRequestEncoder(in interface{}) []byte {
req := in.(protocol.BranchRollbackRequest)
req := in.(message.BranchRollbackRequest)
return AbstractBranchEndRequestEncoder(req.AbstractBranchEndRequest)
}

func BranchRollbackResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchRollbackResponse)
resp := in.(message.BranchRollbackResponse)
return AbstractBranchEndResponseEncoder(resp.AbstractBranchEndResponse)
}

@@ -442,7 +439,7 @@ func GlobalBeginRequestEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.GlobalBeginRequest)
req, _ := in.(message.GlobalBeginRequest)

w.WriteInt32(req.Timeout)
if req.TransactionName != "" {
@@ -456,7 +453,7 @@ func GlobalBeginRequestEncoder(in interface{}) []byte {
}

func GlobalBeginResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalBeginResponse)
resp := in.(message.GlobalBeginResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

var (
@@ -484,12 +481,12 @@ func GlobalBeginResponseEncoder(in interface{}) []byte {
}

func GlobalCommitRequestEncoder(in interface{}) []byte {
req := in.(protocol.GlobalCommitRequest)
req := in.(message.GlobalCommitRequest)
return AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)
}

func GlobalCommitResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalCommitResponse)
resp := in.(message.GlobalCommitResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

@@ -498,7 +495,7 @@ func GlobalLockQueryRequestEncoder(in interface{}) []byte {
}

func GlobalLockQueryResponseEncoder(in interface{}) []byte {
resp, _ := in.(protocol.GlobalLockQueryResponse)
resp, _ := in.(message.GlobalLockQueryResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

var result []byte
@@ -512,7 +509,7 @@ func GlobalLockQueryResponseEncoder(in interface{}) []byte {
}

func GlobalReportRequestEncoder(in interface{}) []byte {
req, _ := in.(protocol.GlobalReportRequest)
req, _ := in.(message.GlobalReportRequest)
data := AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)

result := append(data, byte(req.GlobalStatus))
@@ -520,27 +517,27 @@ func GlobalReportRequestEncoder(in interface{}) []byte {
}

func GlobalReportResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalReportResponse)
resp := in.(message.GlobalReportResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

func GlobalRollbackRequestEncoder(in interface{}) []byte {
req := in.(protocol.GlobalRollbackRequest)
req := in.(message.GlobalRollbackRequest)
return AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)
}

func GlobalRollbackResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalRollbackResponse)
resp := in.(message.GlobalRollbackResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

func GlobalStatusRequestEncoder(in interface{}) []byte {
req := in.(protocol.GlobalStatusRequest)
req := in.(message.GlobalStatusRequest)
return AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)
}

func GlobalStatusResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalStatusResponse)
resp := in.(message.GlobalStatusResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

@@ -551,7 +548,7 @@ func UndoLogDeleteRequestEncoder(in interface{}) []byte {
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.UndoLogDeleteRequest)
req, _ := in.(message.UndoLogDeleteRequest)

w.WriteByte(byte(req.BranchType))
if req.ResourceId != "" {


+ 0
- 30
pkg/protocol/constant.go View File

@@ -1,30 +0,0 @@
package protocol

var MAGIC_CODE_BYTES = [2]byte{0xda, 0xda}

type RequestType byte

const (
VERSION = 1

// MaxFrameLength max frame length
MaxFrameLength = 8 * 1024 * 1024

// V1HeadLength v1 head length
V1HeadLength = 16

// Request message type
MSGTypeRequestSync RequestType = 0

// Response message type
MSGTypeResponse RequestType = 1

// Request which no need response
MSGTypeRequestOneway RequestType = 2

// Heartbeat Request
MSGTypeHeartbeatRequest RequestType = 3

// Heartbeat Response
MSGTypeHeartbeatResponse RequestType = 4
)

+ 0
- 16
pkg/protocol/heart_beat_message.go View File

@@ -1,16 +0,0 @@
package protocol

type HeartBeatMessage struct {
Ping bool
}

var HeartBeatMessagePing = HeartBeatMessage{true}
var HeartBeatMessagePong = HeartBeatMessage{false}

func (msg HeartBeatMessage) ToString() string {
if msg.Ping {
return "services ping"
} else {
return "services pong"
}
}

+ 0
- 20
pkg/protocol/identify.go View File

@@ -1,20 +0,0 @@
package protocol

type AbstractResultMessage struct {
ResultCode ResultCode
Msg string
}

type AbstractIdentifyRequest struct {
Version string
ApplicationId string `json:"applicationId"`
TransactionServiceGroup string
ExtraData []byte
}

type AbstractIdentifyResponse struct {
AbstractResultMessage
Version string
ExtraData []byte
Identified bool
}

+ 0
- 18
pkg/protocol/merged_message.go View File

@@ -1,18 +0,0 @@
package protocol

type MergedWarpMessage struct {
Msgs []MessageTypeAware
MsgIds []int32
}

func (req MergedWarpMessage) GetTypeCode() MessageType {
return MessageTypeSeataMerge
}

type MergeResultMessage struct {
Msgs []MessageTypeAware
}

func (resp MergeResultMessage) GetTypeCode() MessageType {
return MessageTypeSeataMergeResult
}

+ 151
- 0
pkg/protocol/message/constant.go View File

@@ -0,0 +1,151 @@
package message

var MAGIC_CODE_BYTES = [2]byte{0xda, 0xda}

type (
MessageType int
GettyRequestType byte
)

const (
/**
* The constant TYPE_GLOBAL_BEGIN.
*/
MessageType_GlobalBegin MessageType = 1
/**
* The constant TYPE_GLOBAL_BEGIN_RESULT.
*/
MessageType_GlobalBeginResult MessageType = 2
/**
* The constant TYPE_GLOBAL_COMMIT.
*/
MessageType_GlobalCommit MessageType = 7
/**
* The constant TYPE_GLOBAL_COMMIT_RESULT.
*/
MessageType_GlobalCommitResult MessageType = 8
/**
* The constant TYPE_GLOBAL_ROLLBACK.
*/
MessageType_GlobalRollback MessageType = 9
/**
* The constant TYPE_GLOBAL_ROLLBACK_RESULT.
*/
MessageType_GlobalRollbackResult MessageType = 10
/**
* The constant TYPE_GLOBAL_STATUS.
*/
MessageType_GlobalStatus MessageType = 15
/**
* The constant TYPE_GLOBAL_STATUS_RESULT.
*/
MessageType_GlobalStatusResult MessageType = 16
/**
* The constant TYPE_GLOBAL_REPORT.
*/
MessageType_GlobalReport MessageType = 17
/**
* The constant TYPE_GLOBAL_REPORT_RESULT.
*/
MessageType_GlobalReportResult MessageType = 18
/**
* The constant TYPE_GLOBAL_LOCK_QUERY.
*/
MessageType_GlobalLockQuery MessageType = 21
/**
* The constant TYPE_GLOBAL_LOCK_QUERY_RESULT.
*/
MessageType_GlobalLockQueryResult MessageType = 22

/**
* The constant TYPE_BRANCH_COMMIT.
*/
MessageType_BranchCommit MessageType = 3
/**
* The constant TYPE_BRANCH_COMMIT_RESULT.
*/
MessageType_BranchCommitResult MessageType = 4
/**
* The constant TYPE_BRANCH_ROLLBACK.
*/
MessageType_BranchRollback MessageType = 5
/**
* The constant TYPE_BRANCH_ROLLBACK_RESULT.
*/
MessageType_BranchRollbackResult MessageType = 6
/**
* The constant TYPE_BRANCH_REGISTER.
*/
MessageType_BranchRegister MessageType = 11
/**
* The constant TYPE_BRANCH_REGISTER_RESULT.
*/
MessageType_BranchRegisterResult MessageType = 12
/**
* The constant TYPE_BRANCH_STATUS_REPORT.
*/
MessageType_BranchStatusReport MessageType = 13
/**
* The constant TYPE_BRANCH_STATUS_REPORT_RESULT.
*/
MessageType_BranchStatusReportResult MessageType = 14

/**
* The constant TYPE_SEATA_MERGE.
*/
MessageType_SeataMerge MessageType = 59
/**
* The constant TYPE_SEATA_MERGE_RESULT.
*/
MessageType_SeataMergeResult MessageType = 60

/**
* The constant TYPE_REG_CLT.
*/
MessageType_RegClt MessageType = 101
/**
* The constant TYPE_REG_CLT_RESULT.
*/
MessageType_RegCltResult MessageType = 102
/**
* The constant TYPE_REG_RM.
*/
MessageType_RegRm MessageType = 103
/**
* The constant TYPE_REG_RM_RESULT.
*/
MessageType_RegRmResult MessageType = 104
/**
* The constant TYPE_RM_DELETE_UNDOLOG.
*/
MessageType_RmDeleteUndolog MessageType = 111
/**
* the constant TYPE_HEARTBEAT_MSG
*/
MessageType_HeartbeatMsg MessageType = 120
)

const (
VERSION = 1

// MaxFrameLength max frame length
MaxFrameLength = 8 * 1024 * 1024

// V1HeadLength v1 head length
V1HeadLength = 16

// Request message type
GettyRequestType_RequestSync GettyRequestType = 0

// Response message type
GettyRequestType_Response GettyRequestType = 1

// Request which no need response
GettyRequestType_RequestOneway GettyRequestType = 2

// Heartbeat Request
GettyRequestType_HeartbeatRequest GettyRequestType = 3

// Heartbeat Response
GettyRequestType_HeartbeatResponse GettyRequestType = 4
)

+ 48
- 0
pkg/protocol/message/message_apis.go View File

@@ -0,0 +1,48 @@
package message

type AbstractResultMessage struct {
ResultCode ResultCode
Msg string
}

type AbstractIdentifyRequest struct {
Version string
ApplicationId string `json:"applicationId"`
TransactionServiceGroup string
ExtraData []byte
}

type AbstractIdentifyResponse struct {
AbstractResultMessage
Version string
ExtraData []byte
Identified bool
}

type MessageTypeAware interface {
GetTypeCode() MessageType
}

type MergedWarpMessage struct {
Msgs []MessageTypeAware
MsgIds []int32
}

func (req MergedWarpMessage) GetTypeCode() MessageType {
return MessageType_SeataMerge
}

type MergeResultMessage struct {
Msgs []MessageTypeAware
}

func (resp MergeResultMessage) GetTypeCode() MessageType {
return MessageType_SeataMergeResult
}

type ResultCode byte

const (
ResultCodeFailed ResultCode = iota
ResultCodeSuccess
)

+ 41
- 0
pkg/protocol/message/other_message.go View File

@@ -0,0 +1,41 @@
package message

type RpcMessage struct {
ID int32
Type GettyRequestType
Codec byte
Compressor byte
HeadMap map[string]string
Body interface{}
}

type MessageFuture struct {
ID int32
Err error
Response interface{}
Done chan bool
}

func NewMessageFuture(message RpcMessage) *MessageFuture {
return &MessageFuture{
ID: message.ID,
Done: make(chan bool),
}
}

type HeartBeatMessage struct {
Ping bool
}

var (
HeartBeatMessagePing = HeartBeatMessage{true}
HeartBeatMessagePong = HeartBeatMessage{false}
)

func (msg HeartBeatMessage) ToString() string {
if msg.Ping {
return "services ping"
} else {
return "services pong"
}
}

+ 154
- 0
pkg/protocol/message/request_message.go View File

@@ -0,0 +1,154 @@
package message

import (
model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

type AbstractBranchEndRequest struct {
Xid string
BranchId int64
BranchType model2.BranchType
ResourceId string
ApplicationData []byte
}

type AbstractGlobalEndRequest struct {
Xid string
ExtraData []byte
}

type BranchRegisterRequest struct {
Xid string
BranchType model2.BranchType
ResourceId string
LockKey string
ApplicationData []byte
}

func (req BranchRegisterRequest) GetTypeCode() MessageType {
return MessageType_BranchRegister
}

type BranchReportRequest struct {
Xid string
BranchId int64
ResourceId string
Status model2.BranchStatus
ApplicationData []byte
BranchType model2.BranchType
}

func (req BranchReportRequest) GetTypeCode() MessageType {
return MessageType_BranchStatusReport
}

type BranchCommitRequest struct {
AbstractBranchEndRequest
}

func (req BranchCommitRequest) GetTypeCode() MessageType {
return MessageType_BranchCommit
}

type BranchRollbackRequest struct {
AbstractBranchEndRequest
}

func (req BranchRollbackRequest) GetTypeCode() MessageType {
return MessageType_BranchRollback
}

type GlobalBeginRequest struct {
Timeout int32
TransactionName string
}

func (req GlobalBeginRequest) GetTypeCode() MessageType {
return MessageType_GlobalBegin
}

type GlobalStatusRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalStatusRequest) GetTypeCode() MessageType {
return MessageType_GlobalStatus
}

type GlobalLockQueryRequest struct {
BranchRegisterRequest
}

func (req GlobalLockQueryRequest) GetTypeCode() MessageType {
return MessageType_GlobalLockQuery
}

type GlobalReportRequest struct {
AbstractGlobalEndRequest

GlobalStatus transaction.GlobalStatus
}

func (req GlobalReportRequest) GetTypeCode() MessageType {
return MessageType_GlobalStatus
}

type GlobalCommitRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalCommitRequest) GetTypeCode() MessageType {
return MessageType_GlobalCommit
}

type GlobalRollbackRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalRollbackRequest) GetTypeCode() MessageType {
return MessageType_GlobalRollback
}

type UndoLogDeleteRequest struct {
ResourceId string
SaveDays MessageType
BranchType model2.BranchType
}

func (req UndoLogDeleteRequest) GetTypeCode() MessageType {
return MessageType_RmDeleteUndolog
}

type RegisterTMRequest struct {
AbstractIdentifyRequest
}

func (req RegisterTMRequest) GetTypeCode() MessageType {
return MessageType_RegClt
}

type RegisterTMResponse struct {
AbstractIdentifyResponse
}

func (resp RegisterTMResponse) GetTypeCode() MessageType {
return MessageType_RegCltResult
}

type RegisterRMRequest struct {
AbstractIdentifyRequest
ResourceIds string
}

func (req RegisterRMRequest) GetTypeCode() MessageType {
return MessageType_RegRm
}

type RegisterRMResponse struct {
AbstractIdentifyResponse
}

func (resp RegisterRMResponse) GetTypeCode() MessageType {
return MessageType_RegRmResult
}

+ 109
- 0
pkg/protocol/message/response_message.go View File

@@ -0,0 +1,109 @@
package message

import (
model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

type AbstractTransactionResponse struct {
AbstractResultMessage
TransactionExceptionCode transaction.TransactionExceptionCode
}

type AbstractBranchEndResponse struct {
AbstractTransactionResponse
Xid string
BranchId int64
BranchStatus model2.BranchStatus
}

type AbstractGlobalEndResponse struct {
AbstractTransactionResponse
GlobalStatus transaction.GlobalStatus
}

type BranchRegisterResponse struct {
AbstractTransactionResponse
BranchId int64
}

func (resp BranchRegisterResponse) GetTypeCode() MessageType {
return MessageType_BranchRegisterResult
}

type BranchReportResponse struct {
AbstractTransactionResponse
}

func (resp BranchReportResponse) GetTypeCode() MessageType {
return MessageType_BranchStatusReportResult
}

type BranchCommitResponse struct {
AbstractBranchEndResponse
}

func (resp BranchCommitResponse) GetTypeCode() MessageType {
return MessageType_BranchCommitResult
}

type BranchRollbackResponse struct {
AbstractBranchEndResponse
}

func (resp BranchRollbackResponse) GetTypeCode() MessageType {
return MessageType_GlobalRollbackResult
}

type GlobalBeginResponse struct {
AbstractTransactionResponse

Xid string
ExtraData []byte
}

func (resp GlobalBeginResponse) GetTypeCode() MessageType {
return MessageType_GlobalBeginResult
}

type GlobalStatusResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalStatusResponse) GetTypeCode() MessageType {
return MessageType_GlobalStatusResult
}

type GlobalLockQueryResponse struct {
AbstractTransactionResponse

Lockable bool
}

func (resp GlobalLockQueryResponse) GetTypeCode() MessageType {
return MessageType_GlobalLockQueryResult
}

type GlobalReportResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalReportResponse) GetTypeCode() MessageType {
return MessageType_GlobalStatusResult
}

type GlobalCommitResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalCommitResponse) GetTypeCode() MessageType {
return MessageType_GlobalCommitResult
}

type GlobalRollbackResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalRollbackResponse) GetTypeCode() MessageType {
return MessageType_GlobalRollbackResult
}

+ 0
- 17
pkg/protocol/message_future.go View File

@@ -1,17 +0,0 @@
package protocol

// MessageFuture ...
type MessageFuture struct {
ID int32
Err error
Response interface{}
Done chan bool
}

// NewMessageFuture ...
func NewMessageFuture(message RpcMessage) *MessageFuture {
return &MessageFuture{
ID: message.ID,
Done: make(chan bool),
}
}

+ 0
- 121
pkg/protocol/message_type.go View File

@@ -1,121 +0,0 @@
package protocol

type MessageType int

const (
/**
* The constant TYPE_GLOBAL_BEGIN.
*/
MessageTypeGlobalBegin MessageType = 1
/**
* The constant TYPE_GLOBAL_BEGIN_RESULT.
*/
MessageTypeGlobalBeginResult MessageType = 2
/**
* The constant TYPE_GLOBAL_COMMIT.
*/
MessageTypeGlobalCommit MessageType = 7
/**
* The constant TYPE_GLOBAL_COMMIT_RESULT.
*/
MessageTypeGlobalCommitResult MessageType = 8
/**
* The constant TYPE_GLOBAL_ROLLBACK.
*/
MessageTypeGlobalRollback MessageType = 9
/**
* The constant TYPE_GLOBAL_ROLLBACK_RESULT.
*/
MessageTypeGlobalRollbackResult MessageType = 10
/**
* The constant TYPE_GLOBAL_STATUS.
*/
MessageTypeGlobalStatus MessageType = 15
/**
* The constant TYPE_GLOBAL_STATUS_RESULT.
*/
MessageTypeGlobalStatusResult MessageType = 16
/**
* The constant TYPE_GLOBAL_REPORT.
*/
MessageTypeGlobalReport MessageType = 17
/**
* The constant TYPE_GLOBAL_REPORT_RESULT.
*/
MessageTypeGlobalReportResult MessageType = 18
/**
* The constant TYPE_GLOBAL_LOCK_QUERY.
*/
MessageTypeGlobalLockQuery MessageType = 21
/**
* The constant TYPE_GLOBAL_LOCK_QUERY_RESULT.
*/
MessageTypeGlobalLockQueryResult MessageType = 22

/**
* The constant TYPE_BRANCH_COMMIT.
*/
MessageTypeBranchCommit MessageType = 3
/**
* The constant TYPE_BRANCH_COMMIT_RESULT.
*/
MessageTypeBranchCommitResult MessageType = 4
/**
* The constant TYPE_BRANCH_ROLLBACK.
*/
MessageTypeBranchRollback MessageType = 5
/**
* The constant TYPE_BRANCH_ROLLBACK_RESULT.
*/
MessageTypeBranchRollbackResult MessageType = 6
/**
* The constant TYPE_BRANCH_REGISTER.
*/
MessageTypeBranchRegister MessageType = 11
/**
* The constant TYPE_BRANCH_REGISTER_RESULT.
*/
MessageTypeBranchRegisterResult MessageType = 12
/**
* The constant TYPE_BRANCH_STATUS_REPORT.
*/
MessageTypeBranchStatusReport MessageType = 13
/**
* The constant TYPE_BRANCH_STATUS_REPORT_RESULT.
*/
MessageTypeBranchStatusReportResult MessageType = 14

/**
* The constant TYPE_SEATA_MERGE.
*/
MessageTypeSeataMerge MessageType = 59
/**
* The constant TYPE_SEATA_MERGE_RESULT.
*/
MessageTypeSeataMergeResult MessageType = 60

/**
* The constant TYPE_REG_CLT.
*/
MessageTypeRegClt MessageType = 101
/**
* The constant TYPE_REG_CLT_RESULT.
*/
MessageTypeRegCltResult MessageType = 102
/**
* The constant TYPE_REG_RM.
*/
MessageTypeRegRm MessageType = 103
/**
* The constant TYPE_REG_RM_RESULT.
*/
MessageTypeRegRmResult MessageType = 104
/**
* The constant TYPE_RM_DELETE_UNDOLOG.
*/
MessageTypeRmDeleteUndolog MessageType = 111
/**
* the constant TYPE_HEARTBEAT_MSG
*/
MessageTypeHeartbeatMsg MessageType = 120
)

+ 0
- 5
pkg/protocol/message_type_aware.go View File

@@ -1,5 +0,0 @@
package protocol

type MessageTypeAware interface {
GetTypeCode() MessageType
}

+ 0
- 226
pkg/protocol/request_response.go View File

@@ -1,226 +0,0 @@
package protocol

import (
model2 "github.com/seata/seata-go/pkg/common/model"
)

type AbstractTransactionResponse struct {
AbstractResultMessage
TransactionExceptionCode model2.TransactionExceptionCode
}

type AbstractBranchEndRequest struct {
Xid string
BranchId int64
BranchType model2.BranchType
ResourceId string
ApplicationData []byte
}

type AbstractBranchEndResponse struct {
AbstractTransactionResponse

Xid string
BranchId int64
BranchStatus model2.BranchStatus
}

type AbstractGlobalEndRequest struct {
Xid string
ExtraData []byte
}

type AbstractGlobalEndResponse struct {
AbstractTransactionResponse

GlobalStatus model2.GlobalStatus
}

type BranchRegisterRequest struct {
Xid string
BranchType model2.BranchType
ResourceId string
LockKey string
ApplicationData []byte
}

func (req BranchRegisterRequest) GetTypeCode() MessageType {
return MessageTypeBranchRegister
}

type BranchRegisterResponse struct {
AbstractTransactionResponse

BranchId int64
}

func (resp BranchRegisterResponse) GetTypeCode() MessageType {
return MessageTypeBranchRegisterResult
}

type BranchReportRequest struct {
Xid string
BranchId int64
ResourceId string
Status model2.BranchStatus
ApplicationData []byte
BranchType model2.BranchType
}

func (req BranchReportRequest) GetTypeCode() MessageType {
return MessageTypeBranchStatusReport
}

type BranchReportResponse struct {
AbstractTransactionResponse
}

func (resp BranchReportResponse) GetTypeCode() MessageType {
return MessageTypeBranchStatusReportResult
}

type BranchCommitRequest struct {
AbstractBranchEndRequest
}

func (req BranchCommitRequest) GetTypeCode() MessageType {
return MessageTypeBranchCommit
}

type BranchCommitResponse struct {
AbstractBranchEndResponse
}

func (resp BranchCommitResponse) GetTypeCode() MessageType {
return MessageTypeBranchCommitResult
}

type BranchRollbackRequest struct {
AbstractBranchEndRequest
}

func (req BranchRollbackRequest) GetTypeCode() MessageType {
return MessageTypeBranchRollback
}

type BranchRollbackResponse struct {
AbstractBranchEndResponse
}

func (resp BranchRollbackResponse) GetTypeCode() MessageType {
return MessageTypeGlobalRollbackResult
}

type GlobalBeginRequest struct {
Timeout int32
TransactionName string
}

func (req GlobalBeginRequest) GetTypeCode() MessageType {
return MessageTypeGlobalBegin
}

type GlobalBeginResponse struct {
AbstractTransactionResponse

Xid string
ExtraData []byte
}

func (resp GlobalBeginResponse) GetTypeCode() MessageType {
return MessageTypeGlobalBeginResult
}

type GlobalStatusRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalStatusRequest) GetTypeCode() MessageType {
return MessageTypeGlobalStatus
}

type GlobalStatusResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalStatusResponse) GetTypeCode() MessageType {
return MessageTypeGlobalStatusResult
}

type GlobalLockQueryRequest struct {
BranchRegisterRequest
}

func (req GlobalLockQueryRequest) GetTypeCode() MessageType {
return MessageTypeGlobalLockQuery
}

type GlobalLockQueryResponse struct {
AbstractTransactionResponse

Lockable bool
}

func (resp GlobalLockQueryResponse) GetTypeCode() MessageType {
return MessageTypeGlobalLockQueryResult
}

type GlobalReportRequest struct {
AbstractGlobalEndRequest

GlobalStatus model2.GlobalStatus
}

func (req GlobalReportRequest) GetTypeCode() MessageType {
return MessageTypeGlobalStatus
}

type GlobalReportResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalReportResponse) GetTypeCode() MessageType {
return MessageTypeGlobalStatusResult
}

type GlobalCommitRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalCommitRequest) GetTypeCode() MessageType {
return MessageTypeGlobalCommit
}

type GlobalCommitResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalCommitResponse) GetTypeCode() MessageType {
return MessageTypeGlobalCommitResult
}

type GlobalRollbackRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalRollbackRequest) GetTypeCode() MessageType {
return MessageTypeGlobalRollback
}

type GlobalRollbackResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalRollbackResponse) GetTypeCode() MessageType {
return MessageTypeGlobalRollbackResult
}

type UndoLogDeleteRequest struct {
ResourceId string
SaveDays MessageType
BranchType model2.BranchType
}

func (req UndoLogDeleteRequest) GetTypeCode() MessageType {
return MessageTypeRmDeleteUndolog
}

+ 51
- 0
pkg/protocol/resource/resource.go View File

@@ -0,0 +1,51 @@
package resource

import (
"context"
"github.com/seata/seata-go/pkg/protocol/branch"
"sync"
)

// Resource that can be managed by Resource Manager and involved into global transaction
type Resource interface {
GetResourceGroupId() string
GetResourceId() string
GetBranchType() branch.BranchType
}

// Control a branch transaction commit or rollback
type ResourceManagerInbound interface {
// Commit a branch transaction
BranchCommit(ctx context.Context, branchType branch.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (branch.BranchStatus, error)
// Rollback a branch transaction
BranchRollback(ctx context.Context, ranchType branch.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (branch.BranchStatus, error)
}

// Resource Manager: send outbound request to TC
type ResourceManagerOutbound interface {
// Branch register long
BranchRegister(ctx context.Context, ranchType branch.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error)
// Branch report
BranchReport(ctx context.Context, ranchType branch.BranchType, xid string, branchId int64, status branch.BranchStatus, applicationData string) error
// Lock query boolean
LockQuery(ctx context.Context, ranchType branch.BranchType, resourceId, xid, lockKeys string) (bool, error)
}

// Resource Manager: common behaviors
type ResourceManager interface {
ResourceManagerInbound
ResourceManagerOutbound

// Register a Resource to be managed by Resource Manager
RegisterResource(resource Resource) error
// Unregister a Resource from the Resource Manager
UnregisterResource(resource Resource) error
// Get all resources managed by this manager
GetManagedResources() sync.Map
// Get the BranchType
GetBranchType() branch.BranchType
}

type ResourceManagerGetter interface {
GetResourceManager() ResourceManager
}

+ 0
- 18
pkg/protocol/result_code.go View File

@@ -1,18 +0,0 @@
package protocol

type ResultCode byte

const (

/**
* ResultCodeFailed result code.
*/
// ResultCodeFailed
ResultCodeFailed ResultCode = iota

/**
* Success result code.
*/
// Success
ResultCodeSuccess
)

+ 0
- 18
pkg/protocol/rm.go View File

@@ -1,18 +0,0 @@
package protocol

type RegisterRMRequest struct {
AbstractIdentifyRequest
ResourceIds string
}

func (req RegisterRMRequest) GetTypeCode() MessageType {
return MessageTypeRegRm
}

type RegisterRMResponse struct {
AbstractIdentifyResponse
}

func (resp RegisterRMResponse) GetTypeCode() MessageType {
return MessageTypeRegRmResult
}

+ 0
- 10
pkg/protocol/rpc_message.go View File

@@ -1,10 +0,0 @@
package protocol

type RpcMessage struct {
ID int32
Type RequestType
Codec byte
Compressor byte
HeadMap map[string]string
Body interface{}
}

+ 0
- 17
pkg/protocol/tm.go View File

@@ -1,17 +0,0 @@
package protocol

type RegisterTMRequest struct {
AbstractIdentifyRequest
}

func (req RegisterTMRequest) GetTypeCode() MessageType {
return MessageTypeRegClt
}

type RegisterTMResponse struct {
AbstractIdentifyResponse
}

func (resp RegisterTMResponse) GetTypeCode() MessageType {
return MessageTypeRegCltResult
}

pkg/common/model/context.go → pkg/protocol/transaction/context.go View File

@@ -1,4 +1,4 @@
package model
package transaction

import (
"context"

pkg/rm/api/transactional_template.go → pkg/protocol/transaction/executor/transactional_template.go View File

@@ -1,16 +1,20 @@
package api
package executor

import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/utils/log"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/protocol/transaction/manager"
"sync"
)

type TransactionalExecutor interface {
Execute(ctx context.Context, param interface{}) (interface{}, error)
GetTransactionInfo() transaction.TransactionInfo
}

var (
// singletone ResourceManagerFacade
transactionTemplate *TransactionTemplate
onceTransactionTemplate = &sync.Once{}
)
@@ -28,32 +32,32 @@ type TransactionTemplate struct {
}

func (t *TransactionTemplate) Execute(ctx context.Context, business TransactionalExecutor, param interface{}) (interface{}, error) {
if !model.IsSeataContext(ctx) {
if !transaction.IsSeataContext(ctx) {
err := errors.New("context should be inited as seata context!")
log.Error(err)
return nil, err
}

if model.GetTransactionRole(ctx) == nil {
model.SetTransactionRole(ctx, model.LAUNCHER)
if transaction.GetTransactionRole(ctx) == nil {
transaction.SetTransactionRole(ctx, transaction.LAUNCHER)
}

var tx *tm.GlobalTransaction
if model.HasXID(ctx) {
tx = &tm.GlobalTransaction{
Xid: model.GetXID(ctx),
Status: model.Begin,
Role: model.PARTICIPANT,
var tx *manager.GlobalTransaction
if transaction.HasXID(ctx) {
tx = &manager.GlobalTransaction{
Xid: transaction.GetXID(ctx),
Status: transaction.Begin,
Role: transaction.PARTICIPANT,
}
}

// todo: Handle the transaction propagation.

if tx == nil {
tx = &tm.GlobalTransaction{
Xid: model.GetXID(ctx),
Status: model.UnKnown,
Role: model.LAUNCHER,
tx = &manager.GlobalTransaction{
Xid: transaction.GetXID(ctx),
Status: transaction.UnKnown,
Role: transaction.LAUNCHER,
}
}

@@ -70,7 +74,7 @@ func (t *TransactionTemplate) Execute(ctx context.Context, business Transactiona
res, err := business.Execute(ctx, param)
if err != nil {
log.Infof("transactionTemplate: execute business failed, error %v", err)
return nil, tm.GetGlobalTransactionManager().Rollback(ctx, tx)
return nil, manager.GetGlobalTransactionManager().Rollback(ctx, tx)
}

// commit global transaction
@@ -78,15 +82,15 @@ func (t *TransactionTemplate) Execute(ctx context.Context, business Transactiona
if err != nil {
log.Infof("transactionTemplate: commit transaction failed, error %v", err)
// rollback transaction
return nil, tm.GetGlobalTransactionManager().Rollback(ctx, tx)
return nil, manager.GetGlobalTransactionManager().Rollback(ctx, tx)
}
return res, err
}

func (TransactionTemplate) BeginTransaction(ctx context.Context, tx *tm.GlobalTransaction, timeout int32, name string) error {
return tm.GetGlobalTransactionManager().Begin(ctx, tx, timeout, name)
func (TransactionTemplate) BeginTransaction(ctx context.Context, tx *manager.GlobalTransaction, timeout int32, name string) error {
return manager.GetGlobalTransactionManager().Begin(ctx, tx, timeout, name)
}

func (TransactionTemplate) CommitTransaction(ctx context.Context, tx *tm.GlobalTransaction) error {
return tm.GetGlobalTransactionManager().Commit(ctx, tx)
func (TransactionTemplate) CommitTransaction(ctx context.Context, tx *manager.GlobalTransaction) error {
return manager.GetGlobalTransactionManager().Commit(ctx, tx)
}

pkg/common/model/transaction_exception_code.go → pkg/protocol/transaction/expection_code.go View File

@@ -1,4 +1,4 @@
package model
package transaction

type TransactionExceptionCode byte


+ 154
- 0
pkg/protocol/transaction/manager/global_transaction_manager.go View File

@@ -0,0 +1,154 @@
package manager

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/tm/api"
"sync"
)

type GlobalTransaction struct {
Xid string
Status transaction.GlobalStatus
Role transaction.GlobalTransactionRole
}

var (
// singletone ResourceManagerFacade
globalTransactionManager *GlobalTransactionManager
onceGlobalTransactionManager = &sync.Once{}
)

func GetGlobalTransactionManager() *GlobalTransactionManager {
if globalTransactionManager == nil {
onceGlobalTransactionManager.Do(func() {
globalTransactionManager = &GlobalTransactionManager{}
})
}
return globalTransactionManager
}

type GlobalTransactionManager struct {
}

// Begin a new global transaction with given timeout and given name.
func (g *GlobalTransactionManager) Begin(ctx context.Context, gtr *GlobalTransaction, timeout int32, name string) error {
if gtr.Role != transaction.LAUNCHER {
log.Infof("Ignore Begin(): just involved in global transaction %s", gtr.Xid)
return nil
}
if gtr.Xid != "" {
return errors.New(fmt.Sprintf("Global transaction already exists,can't begin a new global transaction, currentXid = %s ", gtr.Xid))
}

req := message.GlobalBeginRequest{
TransactionName: name,
Timeout: timeout,
}
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalBeginRequest error, xid %s, error %v", gtr.Xid, err)
return err
}
if res == nil || res.(message.GlobalBeginResponse).ResultCode == message.ResultCodeFailed {
log.Errorf("GlobalBeginRequest error, xid %s, res %v", gtr.Xid, res)
return err
}
log.Infof("GlobalBeginRequest success, xid %s, res %v", gtr.Xid, res)

gtr.Status = transaction.Begin
gtr.Xid = res.(message.GlobalBeginResponse).Xid
transaction.SetXID(ctx, res.(message.GlobalBeginResponse).Xid)
return nil
}

// Commit the global transaction.
func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransaction) error {
if gtr.Role != transaction.LAUNCHER {
log.Infof("Ignore Commit(): just involved in global gtr [{}]", gtr.Xid)
return nil
}
if gtr.Xid == "" {
return errors.New("Commit xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
)
for retry := 5; retry > 0; retry-- {
req := message.GlobalCommitRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{
Xid: gtr.Xid,
},
}
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalCommitRequest error, xid %s, error %v", gtr.Xid, err)
} else {
break
}
}
if err == nil && res != nil {
gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus
}
transaction.UnbindXid(ctx)
log.Infof("GlobalCommitRequest commit success, xid %s", gtr.Xid)
return err
}

// Rollback the global transaction.
func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTransaction) error {
if gtr.Role != transaction.LAUNCHER {
log.Infof("Ignore Commit(): just involved in global gtr [{}]", gtr.Xid)
return nil
}
if gtr.Xid == "" {
return errors.New("Commit xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
)
for retry := 5; retry > 0; retry-- {
req := message.GlobalRollbackRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{
Xid: gtr.Xid,
},
}
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalRollbackRequest error, xid %s, error %v", gtr.Xid, err)
} else {
break
}
}
if err == nil && res != nil {
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus
}
transaction.UnbindXid(ctx)
return err
}

// Suspend the global transaction.
func (g *GlobalTransactionManager) Suspend() (api.SuspendedResourcesHolder, error) {
panic("implement me")
}

// Resume the global transaction.
func (g *GlobalTransactionManager) Resume(suspendedResourcesHolder api.SuspendedResourcesHolder) error {
panic("implement me")
}

// report the global transaction status.
func (g *GlobalTransactionManager) GlobalReport(globalStatus transaction.GlobalStatus) error {
panic("implement me")
}

+ 0
- 33
pkg/protocol/transaction/rm_handler.go View File

@@ -1,33 +0,0 @@
package transaction

import (
"context"
"github.com/seata/seata-go/pkg/protocol"
)

type RMInboundHandler interface {

/**
* Handle branch commit response.
*
* @param request the request
* @return the branch commit response
*/
HandleBranchCommitRequest(ctx context.Context, request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error)

/**
* Handle branch rollback response.
*
* @param request the request
* @return the branch rollback response
*/

HandleBranchRollbackRequest(ctx context.Context, request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error)

/**
* Handle delete undo log .
*
* @param request the request
*/
HandleUndoLogDeleteRequest(ctx context.Context, request protocol.UndoLogDeleteRequest) error
}

pkg/rm/api/transaction.go → pkg/protocol/transaction/transaction.go View File

@@ -1,4 +1,4 @@
package api
package transaction

type Propagation int8


pkg/common/model/transaction_manager.go → pkg/protocol/transaction/transaction_manager.go View File

@@ -1,4 +1,4 @@
package model
package transaction

type GlobalTransactionRole int8


pkg/common/model/global_status.go → pkg/protocol/transaction/transaction_status.go View File

@@ -1,4 +1,4 @@
package model
package transaction

type GlobalStatus int64


pkg/rpc/getty/getty_client.go → pkg/remoting/getty/getty_client.go View File

@@ -1,6 +1,7 @@
package getty

import (
"github.com/seata/seata-go/pkg/protocol/message"
"sync"
"time"
)
@@ -10,7 +11,6 @@ import (
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/protocol/codec"
)

@@ -35,13 +35,13 @@ func GetGettyRemotingClient() *GettyRemotingClient {
}

func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
var msgType protocol.RequestType
if _, ok := msg.(protocol.HeartBeatMessage); ok {
msgType = protocol.MSGTypeHeartbeatRequest
var msgType message.GettyRequestType
if _, ok := msg.(message.HeartBeatMessage); ok {
msgType = message.GettyRequestType_HeartbeatRequest
} else {
msgType = protocol.MSGTypeRequestOneway
msgType = message.GettyRequestType_RequestOneway
}
rpcMessage := protocol.RpcMessage{
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: msgType,
Codec: codec.SEATA,
@@ -52,9 +52,9 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
}

func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error {
rpcMessage := protocol.RpcMessage{
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: protocol.MSGTypeResponse,
Type: message.GettyRequestType_Response,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
@@ -63,9 +63,9 @@ func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error {
}

func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
rpcMessage := protocol.RpcMessage{
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: protocol.MSGTypeRequestSync,
Type: message.GettyRequestType_RequestSync,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
@@ -74,9 +74,9 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
}

func (client *GettyRemotingClient) SendSyncRequestWithTimeout(msg interface{}, timeout time.Duration) (interface{}, error) {
rpcMessage := protocol.RpcMessage{
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: protocol.MSGTypeRequestSync,
Type: message.GettyRequestType_RequestSync,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,

pkg/rpc/getty/getty_remoting.go → pkg/remoting/getty/getty_remoting.go View File

@@ -1,6 +1,8 @@
package getty

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"sync"
"time"
)
@@ -13,11 +15,6 @@ import (
"github.com/pkg/errors"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/utils/log"
)

const (
RPC_REQUEST_TIMEOUT = 30 * time.Second
)
@@ -44,29 +41,29 @@ func GetGettyRemotingInstance() *GettyRemoting {
return gettyRemoting
}

func (client *GettyRemoting) SendSync(msg protocol.RpcMessage) (interface{}, error) {
func (client *GettyRemoting) SendSync(msg message.RpcMessage) (interface{}, error) {
ss := clientSessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, RPC_REQUEST_TIMEOUT)
}

func (client *GettyRemoting) SendSyncWithTimeout(msg protocol.RpcMessage, timeout time.Duration) (interface{}, error) {
func (client *GettyRemoting) SendSyncWithTimeout(msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
ss := clientSessionManager.AcquireGettySession()
return client.sendAsync(ss, msg, timeout)
}

func (client *GettyRemoting) SendASync(msg protocol.RpcMessage) error {
func (client *GettyRemoting) SendASync(msg message.RpcMessage) error {
ss := clientSessionManager.AcquireGettySession()
_, err := client.sendAsync(ss, msg, 0*time.Second)
return err
}

// TODO 待重构
func (client *GettyRemoting) sendAsync(session getty.Session, msg protocol.RpcMessage, timeout time.Duration) (interface{}, error) {
func (client *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, timeout time.Duration) (interface{}, error) {
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")
}
resp := protocol.NewMessageFuture(msg)
resp := message.NewMessageFuture(msg)
client.futures.Store(msg.ID, resp)
_, _, err = session.WritePkg(msg, time.Duration(0))
if err != nil {
@@ -106,9 +103,9 @@ func (client *GettyRemoting) sendAsync(session getty.Session, msg protocol.RpcMe
return nil, err
}

func (client *GettyRemoting) GetMessageFuture(msgID int32) *protocol.MessageFuture {
func (client *GettyRemoting) GetMessageFuture(msgID int32) *message.MessageFuture {
if msg, ok := client.futures.Load(msgID); ok {
return msg.(*protocol.MessageFuture)
return msg.(*message.MessageFuture)
}
return nil
}
@@ -121,14 +118,14 @@ func (client *GettyRemoting) RemoveMergedMessageFuture(msgID int32) {
client.mergeMsgMap.Delete(msgID)
}

func (client *GettyRemoting) GetMergedMessage(msgID int32) *protocol.MergedWarpMessage {
func (client *GettyRemoting) GetMergedMessage(msgID int32) *message.MergedWarpMessage {
if msg, ok := client.mergeMsgMap.Load(msgID); ok {
return msg.(*protocol.MergedWarpMessage)
return msg.(*message.MergedWarpMessage)
}
return nil
}

func (client *GettyRemoting) NotifyRpcMessageResponse(rpcMessage protocol.RpcMessage) {
func (client *GettyRemoting) NotifyRpcMessageResponse(rpcMessage message.RpcMessage) {
messageFuture := client.GetMessageFuture(rpcMessage.ID)
if messageFuture != nil {
messageFuture.Response = rpcMessage.Body

pkg/rpc/getty/listener.go → pkg/remoting/getty/listener.go View File

@@ -2,6 +2,9 @@ package getty

import (
"context"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/processor"
"sync"
)

@@ -13,9 +16,6 @@ import (

import (
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/processor"
"github.com/seata/seata-go/pkg/utils/log"
)

var (
@@ -28,7 +28,7 @@ type gettyClientHandler struct {
idGenerator *atomic.Uint32
msgFutures *sync.Map
mergeMsgMap *sync.Map
processorTable map[protocol.MessageType]processor.RemotingProcessor
processorTable map[message.MessageType]processor.RemotingProcessor
}

func GetGettyClientHandlerInstance() *gettyClientHandler {
@@ -39,7 +39,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
idGenerator: &atomic.Uint32{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
processorTable: make(map[protocol.MessageType]processor.RemotingProcessor, 0),
processorTable: make(map[message.MessageType]processor.RemotingProcessor, 0),
}
})
}
@@ -48,23 +48,23 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {

func (client *gettyClientHandler) OnOpen(session getty.Session) error {
clientSessionManager.RegisterGettySession(session)
//go func() {
// request := protocol.RegisterTMRequest{AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
// Version: client.conf.SeataVersion,
// ApplicationId: client.conf.ApplicationID,
// TransactionServiceGroup: client.conf.TransactionServiceGroup,
// }}
// err := GetGettyRemotingClient().SendAsyncRequest(request)
// //client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
// if err != nil {
// log.Error("OnOpen error: {%#v}", err.Error())
// clientSessionManager.ReleaseGettySession(session)
// return
// }
//
// //todo
// //client.GettySessionOnOpenChannel <- session.RemoteAddr()
//}()
go func() {
request := message.RegisterTMRequest{AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: client.conf.SeataVersion,
ApplicationId: client.conf.ApplicationID,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
err := GetGettyRemotingClient().SendAsyncRequest(request)
//client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err != nil {
log.Error("OnOpen error: {%#v}", err.Error())
clientSessionManager.ReleaseGettySession(session)
return
}
//todo
//client.GettySessionOnOpenChannel <- session.RemoteAddr()
}()

return nil
}
@@ -82,13 +82,13 @@ func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface
ctx := context.Background()
log.Debugf("received message: {%#v}", pkg)

rpcMessage, ok := pkg.(protocol.RpcMessage)
rpcMessage, ok := pkg.(message.RpcMessage)
if !ok {
log.Errorf("received message is not protocol.RpcMessage. pkg: %#v", pkg)
return
}

if mm, ok := rpcMessage.Body.(protocol.MessageTypeAware); ok {
if mm, ok := rpcMessage.Body.(message.MessageTypeAware); ok {
processor := client.processorTable[mm.GetTypeCode()]
if processor != nil {
processor.Process(ctx, rpcMessage)
@@ -104,7 +104,7 @@ func (client *gettyClientHandler) OnCron(session getty.Session) {
// todo 发送心跳消息
}

func (client *gettyClientHandler) RegisterProcessor(msgType protocol.MessageType, processor processor.RemotingProcessor) {
func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {
if nil != processor {
client.processorTable[msgType] = processor
}

pkg/rpc/getty/readwriter.go → pkg/remoting/getty/readwriter.go View File

@@ -3,6 +3,7 @@ package getty
import (
"bytes"
"encoding/binary"
"github.com/seata/seata-go/pkg/protocol/message"
)

import (
@@ -14,7 +15,6 @@ import (
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/protocol/codec"
)

@@ -66,7 +66,7 @@ type SeataV1PackageHeader struct {
Version byte
TotalLength uint32
HeadLength uint16
MessageType protocol.RequestType
MessageType message.GettyRequestType
CodecType byte
CompressType byte
ID uint32
@@ -87,7 +87,7 @@ func (h *SeataV1PackageHeader) Unmarshal(buf *bytes.Buffer) (int, error) {
if err := binary.Read(buf, binary.BigEndian, &(h.Magic1)); err != nil {
return 0, err
}
if h.Magic0 != protocol.MAGIC_CODE_BYTES[0] || h.Magic1 != protocol.MAGIC_CODE_BYTES[1] {
if h.Magic0 != message.MAGIC_CODE_BYTES[0] || h.Magic1 != message.MAGIC_CODE_BYTES[1] {
return 0, ErrIllegalMagic
}
// version
@@ -150,7 +150,7 @@ func (p *RpcPackageHandler) Read(ss getty.Session, data []byte) (interface{}, in
}

//r := byteio.BigEndianReader{Reader: bytes.NewReader(data)}
rpcMessage := protocol.RpcMessage{
rpcMessage := message.RpcMessage{
Codec: header.CodecType,
ID: int32(header.ID),
Compressor: header.CompressType,
@@ -158,10 +158,10 @@ func (p *RpcPackageHandler) Read(ss getty.Session, data []byte) (interface{}, in
HeadMap: header.Meta,
}

if header.MessageType == protocol.MSGTypeHeartbeatRequest {
rpcMessage.Body = protocol.HeartBeatMessagePing
} else if header.MessageType == protocol.MSGTypeHeartbeatResponse {
rpcMessage.Body = protocol.HeartBeatMessagePong
if header.MessageType == message.GettyRequestType_HeartbeatRequest {
rpcMessage.Body = message.HeartBeatMessagePing
} else if header.MessageType == message.GettyRequestType_HeartbeatResponse {
rpcMessage.Body = message.HeartBeatMessagePong
} else {
if header.BodyLength > 0 {
//todo compress
@@ -175,20 +175,20 @@ func (p *RpcPackageHandler) Read(ss getty.Session, data []byte) (interface{}, in

// Write write rpc message to binary data
func (p *RpcPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
msg, ok := pkg.(protocol.RpcMessage)
msg, ok := pkg.(message.RpcMessage)
if !ok {
return nil, ErrInvalidPackage
}

fullLength := protocol.V1HeadLength
headLength := protocol.V1HeadLength
fullLength := message.V1HeadLength
headLength := message.V1HeadLength
var result = make([]byte, 0, fullLength)

var b bytes.Buffer
w := byteio.BigEndianWriter{Writer: &b}

result = append(result, protocol.MAGIC_CODE_BYTES[:2]...)
result = append(result, protocol.VERSION)
result = append(result, message.MAGIC_CODE_BYTES[:2]...)
result = append(result, message.VERSION)

w.WriteByte(byte(msg.Type))
w.WriteByte(msg.Codec)
@@ -202,8 +202,8 @@ func (p *RpcPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, er
w.Write(headMapBytes)
}

if msg.Type != protocol.MSGTypeHeartbeatRequest &&
msg.Type != protocol.MSGTypeHeartbeatResponse {
if msg.Type != message.GettyRequestType_HeartbeatRequest &&
msg.Type != message.GettyRequestType_HeartbeatResponse {

bodyBytes := codec.MessageEncoder(msg.Codec, msg.Body)
fullLength += len(bodyBytes)

pkg/rpc/getty/rpc_client.go → pkg/remoting/getty/rpc_client.go View File

@@ -2,6 +2,7 @@ package getty

import (
"fmt"
"github.com/seata/seata-go/pkg/common/log"
"net"
"sync"
)
@@ -14,7 +15,6 @@ import (

import (
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/utils/log"
)

type RpcClient struct {

pkg/rpc/getty/getty_client_session_manager.go → pkg/remoting/getty/session_manager.go View File


+ 23
- 0
pkg/remoting/processor/client/client_heart_beat_processon.go View File

@@ -0,0 +1,23 @@
package client

import (
"context"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/getty"
)

func init() {
getty.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_HeartbeatMsg, &clientHeartBeatProcesson{})
}

type clientHeartBeatProcesson struct{}

func (f *clientHeartBeatProcesson) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
if _, ok := rpcMessage.Body.(message.HeartBeatMessage); ok {
// TODO 如何从context中获取远程服务的信息?
//log.Infof("received PONG from {}", ctx.channel().remoteAddress())
log.Infof("received PONG from {}", ctx)
}
return nil
}

+ 55
- 0
pkg/remoting/processor/client/client_on_response_processor.go View File

@@ -0,0 +1,55 @@
package client

import (
"context"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
getty2 "github.com/seata/seata-go/pkg/remoting/getty"
)

func init() {
clientOnResponseProcessor := &clientOnResponseProcessor{}
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_SeataMergeResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRegisterResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchStatusReportResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalLockQueryResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegRmResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalBeginResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalCommitResult, clientOnResponseProcessor)
}

type clientOnResponseProcessor struct {
}

func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
// 如果是合并的结果消息,直接通知已经处理完成
if mergedResult, ok := rpcMessage.Body.(message.MergeResultMessage); ok {

mergedMessage := getty2.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
if mergedMessage != nil {
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
response := getty2.GetGettyRemotingInstance().GetMessageFuture(msgID)
if response != nil {
response.Response = mergedResult.Msgs[i]
response.Done <- true
getty2.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
}
}
getty2.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
}
return nil
} else {
// 如果是请求消息,做处理逻辑
msgFuture := getty2.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
if msgFuture != nil {
getty2.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
getty2.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
} else {
if _, ok := rpcMessage.Body.(message.AbstractResultMessage); ok {
log.Infof("the rm client received response msg [{}] from tc server.", msgFuture)
}
}
}
return nil
}

pkg/rpc/processor/client/rm_branch_commit_processor.go → pkg/remoting/processor/client/rm_branch_commit_processor.go View File

@@ -2,44 +2,44 @@ package client

import (
"context"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
getty2 "github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
)

func init() {
rmBranchCommitProcessor := &rmBranchCommitProcessor{}
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeBranchCommit, rmBranchCommitProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchCommit, rmBranchCommitProcessor)
}

type rmBranchCommitProcessor struct {
}

func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage protocol.RpcMessage) error {
func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
log.Infof("rm client handle branch commit process %v", rpcMessage)
request := rpcMessage.Body.(protocol.BranchCommitRequest)
request := rpcMessage.Body.(message.BranchCommitRequest)
xid := request.Xid
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerFacadeInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("Branch commit error: %s", err.Error())
return err
}

// reply commit response to tc server
response := protocol.BranchCommitResponse{
AbstractBranchEndResponse: protocol.AbstractBranchEndResponse{
response := message.BranchCommitResponse{
AbstractBranchEndResponse: message.AbstractBranchEndResponse{
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
err = getty2.GetGettyRemotingClient().SendAsyncResponse(response)
if err != nil {
log.Error("BranchCommitResponse error: {%#v}", err.Error())
return err

pkg/rpc/processor/client/rm_branch_rollback_processor.go → pkg/remoting/processor/client/rm_branch_rollback_processor.go View File

@@ -2,44 +2,44 @@ package client

import (
"context"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
getty2 "github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
)

func init() {
rmBranchCommitProcessor := &rmBranchCommitProcessor{}
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeBranchCommit, rmBranchCommitProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchCommit, rmBranchCommitProcessor)
}

type rmBranchRollbackProcessor struct {
}

func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage protocol.RpcMessage) error {
func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage message.RpcMessage) error {
log.Infof("rm client handle branch commit process %v", rpcMessage)
request := rpcMessage.Body.(protocol.BranchCommitRequest)
request := rpcMessage.Body.(message.BranchCommitRequest)
xid := request.Xid
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerFacadeInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("Branch commit error: %s", err.Error())
return err
}

// reply commit response to tc server
response := protocol.BranchCommitResponse{
AbstractBranchEndResponse: protocol.AbstractBranchEndResponse{
response := message.BranchCommitResponse{
AbstractBranchEndResponse: message.AbstractBranchEndResponse{
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
err = getty2.GetGettyRemotingClient().SendAsyncResponse(response)
if err != nil {
log.Error("BranchCommitResponse error: {%#v}", err.Error())
return err

+ 10
- 0
pkg/remoting/processor/remoting_processor.go View File

@@ -0,0 +1,10 @@
package processor

import (
"context"
"github.com/seata/seata-go/pkg/protocol/message"
)

type RemotingProcessor interface {
Process(ctx context.Context, rpcMessage message.RpcMessage) error
}

+ 0
- 8
pkg/rm/api/transactional_executor.go View File

@@ -1,8 +0,0 @@
package api

import "context"

type TransactionalExecutor interface {
Execute(ctx context.Context, param interface{}) (interface{}, error)
GetTransactionInfo() TransactionInfo
}

+ 64
- 0
pkg/rm/common/handler/rm_handler.go View File

@@ -0,0 +1,64 @@
package handler

import (
"context"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/resource"
)

type RMInboundHandler interface {
// Handle branch commit response.
HandleBranchCommitRequest(ctx context.Context, request message.BranchCommitRequest) (*message.BranchCommitResponse, error)
// Handle branch rollback response.
HandleBranchRollbackRequest(ctx context.Context, request message.BranchRollbackRequest) (*message.BranchRollbackResponse, error)
// Handle delete undo log .
HandleUndoLogDeleteRequest(ctx context.Context, request message.UndoLogDeleteRequest) error
}

type CommonRMHandler struct {
rmGetter resource.ResourceManagerGetter
}

func (h *CommonRMHandler) SetRMGetter(rmGetter resource.ResourceManagerGetter) {
h.rmGetter = rmGetter
}

// Handle branch commit response.
func (h *CommonRMHandler) HandleBranchCommitRequest(ctx context.Context, request message.BranchCommitRequest) (*message.BranchCommitResponse, error) {
xid := request.Xid
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := h.rmGetter.GetResourceManager().BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
// TODO: handle error
return nil, err
}
return &message.BranchCommitResponse{
AbstractBranchEndResponse: message.AbstractBranchEndResponse{
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}, nil
}

// Handle branch rollback response.
// TODO
func (h *CommonRMHandler) HandleBranchRollbackRequest(ctx context.Context, request message.BranchRollbackRequest) (*message.BranchRollbackResponse, error) {
return nil, nil
}

// Handle delete undo log .
// TODO
func (h *CommonRMHandler) HandleUndoLogDeleteRequest(ctx context.Context, request message.UndoLogDeleteRequest) error {
return nil
}

func (h *CommonRMHandler) GetBranchType() branch.BranchType {
return h.rmGetter.GetResourceManager().GetBranchType()
}

pkg/rm/rm_handler_facade.go → pkg/rm/common/handler/rm_handler_facade.go View File

@@ -1,15 +1,12 @@
package rm
package handler

import (
"context"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"sync"
)

import (
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
)

var (
onceRMHandlerFacade = &sync.Once{}
rmHandler *RMHandlerFacade
@@ -21,7 +18,7 @@ type RMHandlerFacade struct {

func GetRMHandlerFacadeInstance() *RMHandlerFacade {
if rmHandler == nil {
onceRMFacade.Do(func() {
onceRMHandlerFacade.Do(func() {
rmHandler = &RMHandlerFacade{}
})
}
@@ -29,19 +26,19 @@ func GetRMHandlerFacadeInstance() *RMHandlerFacade {
}

// Handle branch commit response.
func (h *RMHandlerFacade) HandleBranchCommitRequest(ctx context.Context, request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error) {
func (h *RMHandlerFacade) HandleBranchCommitRequest(ctx context.Context, request message.BranchCommitRequest) (*message.BranchCommitResponse, error) {
return h.getRMHandler(request.BranchType).HandleBranchCommitRequest(ctx, request)
}

// Handle branch rollback response.
// TODO
func (h *RMHandlerFacade) HandleBranchRollbackRequest(ctx context.Context, request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error) {
func (h *RMHandlerFacade) HandleBranchRollbackRequest(ctx context.Context, request message.BranchRollbackRequest) (*message.BranchRollbackResponse, error) {
return h.getRMHandler(request.BranchType).HandleBranchRollbackRequest(ctx, request)
}

// Handle delete undo log .
// TODO
func (h *RMHandlerFacade) HandleUndoLogDeleteRequest(ctx context.Context, request protocol.UndoLogDeleteRequest) error {
func (h *RMHandlerFacade) HandleUndoLogDeleteRequest(ctx context.Context, request message.UndoLogDeleteRequest) error {
return h.getRMHandler(request.BranchType).HandleUndoLogDeleteRequest(ctx, request)
}

@@ -52,7 +49,7 @@ func (h *RMHandlerFacade) RegisteRMHandler(handler *CommonRMHandler) {
h.rmHandlerMap.Store(handler.GetBranchType(), handler)
}

func (h *RMHandlerFacade) getRMHandler(branchType model.BranchType) *CommonRMHandler {
func (h *RMHandlerFacade) getRMHandler(branchType branch.BranchType) *CommonRMHandler {
if handler, ok := h.rmHandlerMap.Load(branchType); ok {
return handler.(*CommonRMHandler)
}

pkg/rm/rm_remoting.go → pkg/rm/common/remoting/rm_remoting.go View File

@@ -1,10 +1,11 @@
package rm
package remoting

import (
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/remoting/getty"
"sync"
)

@@ -27,23 +28,23 @@ type RMRemoting struct {
}

// Branch register long
func (RMRemoting) BranchRegister(branchType model.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
func (RMRemoting) BranchRegister(branchType branch.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return 0, nil
}

// Branch report
func (RMRemoting) BranchReport(branchType model.BranchType, xid string, branchId int64, status model.BranchStatus, applicationData string) error {
func (RMRemoting) BranchReport(branchType branch.BranchType, xid string, branchId int64, status branch.BranchStatus, applicationData string) error {
return nil
}

// Lock query boolean
func (RMRemoting) LockQuery(branchType model.BranchType, resourceId, xid, lockKeys string) (bool, error) {
func (RMRemoting) LockQuery(branchType branch.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return false, nil
}

func (r *RMRemoting) RegisterResource(resource model.Resource) error {
req := protocol.RegisterRMRequest{
AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
func (r *RMRemoting) RegisterResource(resource resource.Resource) error {
req := message.RegisterRMRequest{
AbstractIdentifyRequest: message.AbstractIdentifyRequest{
//todo replace with config
Version: "1.4.2",
ApplicationId: "tcc-sample",
@@ -58,9 +59,9 @@ func (r *RMRemoting) RegisterResource(resource model.Resource) error {
}

if isRegisterSuccess(res) {
r.onRegisterRMSuccess(res.(protocol.RegisterRMResponse))
r.onRegisterRMSuccess(res.(message.RegisterRMResponse))
} else {
r.onRegisterRMFailure(res.(protocol.RegisterRMResponse))
r.onRegisterRMFailure(res.(message.RegisterRMResponse))
}

return nil
@@ -73,28 +74,28 @@ func isRegisterSuccess(response interface{}) bool {
// return res.Identified
//}
//return false
if res, ok := response.(protocol.RegisterRMResponse); ok {
if res, ok := response.(message.RegisterRMResponse); ok {
return res.Identified
}
return false
}

func (r *RMRemoting) onRegisterRMSuccess(response protocol.RegisterRMResponse) {
func (r *RMRemoting) onRegisterRMSuccess(response message.RegisterRMResponse) {
// TODO
log.Infof("register RM success. response: %#v", response)
}

func (r *RMRemoting) onRegisterRMFailure(response protocol.RegisterRMResponse) {
func (r *RMRemoting) onRegisterRMFailure(response message.RegisterRMResponse) {
// TODO
log.Infof("register RM failure. response: %#v", response)
}

func (r *RMRemoting) onRegisterTMSuccess(response protocol.RegisterTMResponse) {
func (r *RMRemoting) onRegisterTMSuccess(response message.RegisterTMResponse) {
// TODO
log.Infof("register TM success. response: %#v", response)
}

func (r *RMRemoting) onRegisterTMFailure(response protocol.RegisterTMResponse) {
func (r *RMRemoting) onRegisterTMFailure(response message.RegisterTMResponse) {
// TODO
log.Infof("register TM failure. response: %#v", response)
}

pkg/rm/rm_remoting_test.go → pkg/rm/common/remoting/rm_remoting_test.go View File

@@ -1,16 +1,16 @@
package rm
package remoting

import (
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/common/log"
_ "github.com/seata/seata-go/pkg/imports"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/remoting/getty"
)

func (RMRemoting) RegisterResource(resource model.Resource) error {
req := protocol.RegisterRMRequest{
AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
func (RMRemoting) RegisterResource(resource resource.Resource) error {
req := message.RegisterRMRequest{
AbstractIdentifyRequest: message.AbstractIdentifyRequest{
//todo replace with config
Version: "1.4.2",
ApplicationId: "tcc-sample",

+ 0
- 54
pkg/rm/common_rm_handler.go View File

@@ -1,54 +0,0 @@
package rm

import (
"context"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/utils/log"
)

type CommonRMHandler struct {
rmGetter model.ResourceManagerGetter
}

func (h *CommonRMHandler) SetRMGetter(rmGetter model.ResourceManagerGetter) {
h.rmGetter = rmGetter
}

// Handle branch commit response.
func (h *CommonRMHandler) HandleBranchCommitRequest(ctx context.Context, request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error) {
xid := request.Xid
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := h.rmGetter.GetResourceManager().BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
// TODO: handle error
return nil, err
}
return &protocol.BranchCommitResponse{
AbstractBranchEndResponse: protocol.AbstractBranchEndResponse{
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}, nil
}

// Handle branch rollback response.
// TODO
func (h *CommonRMHandler) HandleBranchRollbackRequest(ctx context.Context, request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error) {
return nil, nil
}

// Handle delete undo log .
// TODO
func (h *CommonRMHandler) HandleUndoLogDeleteRequest(ctx context.Context, request protocol.UndoLogDeleteRequest) error {
return nil
}

func (h *CommonRMHandler) GetBranchType() model.BranchType {
return h.rmGetter.GetResourceManager().GetBranchType()
}

+ 87
- 0
pkg/rm/resource_manager.go View File

@@ -0,0 +1,87 @@
package rm

import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/resource"
"sync"
)

var (
// BranchType -> ResourceManager
resourceManagerMap sync.Map
// singletone ResourceManager
rmFacadeInstance *ResourceManager
onceRMFacade = &sync.Once{}
)

func GetResourceManagerInstance() *ResourceManager {
if rmFacadeInstance == nil {
onceRMFacade.Do(func() {
rmFacadeInstance = &ResourceManager{}
})
}
return rmFacadeInstance
}

type ResourceManager struct {
}

// 将事务管理器注册到这里
func RegisterResourceManager(resourceManager resource.ResourceManager) {
resourceManagerMap.Store(resourceManager.GetBranchType(), resourceManager)
}

func (*ResourceManager) GetResourceManager(branchType branch.BranchType) resource.ResourceManager {
rm, ok := resourceManagerMap.Load(branchType)
if !ok {
panic(fmt.Sprintf("No ResourceManager for BranchType: %v", branchType))
}
return rm.(resource.ResourceManager)
}

// Commit a branch transaction
func (d *ResourceManager) BranchCommit(ctx context.Context, branchType branch.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (branch.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchCommit(ctx, branchType, xid, branchId, resourceId, applicationData)
}

// Rollback a branch transaction
func (d *ResourceManager) BranchRollback(ctx context.Context, branchType branch.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (branch.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchRollback(ctx, branchType, xid, branchId, resourceId, applicationData)
}

// Branch register long
func (d *ResourceManager) BranchRegister(ctx context.Context, branchType branch.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return d.GetResourceManager(branchType).BranchRegister(ctx, branchType, resourceId, clientId, xid, applicationData, lockKeys)
}

// Branch report
func (d *ResourceManager) BranchReport(ctx context.Context, branchType branch.BranchType, xid string, branchId int64, status branch.BranchStatus, applicationData string) error {
return d.GetResourceManager(branchType).BranchReport(ctx, branchType, xid, branchId, status, applicationData)
}

// Lock query boolean
func (d *ResourceManager) LockQuery(ctx context.Context, branchType branch.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return d.GetResourceManager(branchType).LockQuery(ctx, branchType, resourceId, xid, lockKeys)
}

// Register a model.Resource to be managed by model.Resource Manager
func (d *ResourceManager) RegisterResource(resource resource.Resource) error {
return d.GetResourceManager(resource.GetBranchType()).RegisterResource(resource)
}

// Unregister a model.Resource from the model.Resource Manager
func (d *ResourceManager) UnregisterResource(resource resource.Resource) error {
return d.GetResourceManager(resource.GetBranchType()).UnregisterResource(resource)
}

// Get all resources managed by this manager
func (d *ResourceManager) GetManagedResources() sync.Map {
return resourceManagerMap
}

// Get the model.BranchType
func (d *ResourceManager) GetBranchType() branch.BranchType {
panic("DefaultResourceManager isn't a real ResourceManager")
}

+ 0
- 89
pkg/rm/resource_manager_facade.go View File

@@ -1,89 +0,0 @@
package rm

import (
"context"
"fmt"
"sync"
)

import (
model2 "github.com/seata/seata-go/pkg/common/model"
)

var (
// BranchType -> ResourceManager
resourceManagerMap sync.Map
// singletone ResourceManagerFacade
rmFacadeInstance *ResourceManagerFacade
onceRMFacade = &sync.Once{}
)

func GetResourceManagerFacadeInstance() *ResourceManagerFacade {
if rmFacadeInstance == nil {
onceRMFacade.Do(func() {
rmFacadeInstance = &ResourceManagerFacade{}
})
}
return rmFacadeInstance
}

type ResourceManagerFacade struct {
}

// 将事务管理器注册到这里
func RegisterResourceManager(resourceManager model2.ResourceManager) {
resourceManagerMap.Store(resourceManager.GetBranchType(), resourceManager)
}

func (*ResourceManagerFacade) GetResourceManager(branchType model2.BranchType) model2.ResourceManager {
rm, ok := resourceManagerMap.Load(branchType)
if !ok {
panic(fmt.Sprintf("No ResourceManager for BranchType: %v", branchType))
}
return rm.(model2.ResourceManager)
}

// Commit a branch transaction
func (d *ResourceManagerFacade) BranchCommit(ctx context.Context, branchType model2.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (model2.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchCommit(ctx, branchType, xid, branchId, resourceId, applicationData)
}

// Rollback a branch transaction
func (d *ResourceManagerFacade) BranchRollback(ctx context.Context, branchType model2.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (model2.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchRollback(ctx, branchType, xid, branchId, resourceId, applicationData)
}

// Branch register long
func (d *ResourceManagerFacade) BranchRegister(ctx context.Context, branchType model2.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return d.GetResourceManager(branchType).BranchRegister(ctx, branchType, resourceId, clientId, xid, applicationData, lockKeys)
}

// Branch report
func (d *ResourceManagerFacade) BranchReport(ctx context.Context, branchType model2.BranchType, xid string, branchId int64, status model2.BranchStatus, applicationData string) error {
return d.GetResourceManager(branchType).BranchReport(ctx, branchType, xid, branchId, status, applicationData)
}

// Lock query boolean
func (d *ResourceManagerFacade) LockQuery(ctx context.Context, branchType model2.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return d.GetResourceManager(branchType).LockQuery(ctx, branchType, resourceId, xid, lockKeys)
}

// Register a model.Resource to be managed by model.Resource Manager
func (d *ResourceManagerFacade) RegisterResource(resource model2.Resource) error {
return d.GetResourceManager(resource.GetBranchType()).RegisterResource(resource)
}

// Unregister a model.Resource from the model.Resource Manager
func (d *ResourceManagerFacade) UnregisterResource(resource model2.Resource) error {
return d.GetResourceManager(resource.GetBranchType()).UnregisterResource(resource)
}

// Get all resources managed by this manager
func (d *ResourceManagerFacade) GetManagedResources() sync.Map {
return resourceManagerMap
}

// Get the model.BranchType
func (d *ResourceManagerFacade) GetBranchType() model2.BranchType {
panic("DefaultResourceManager isn't a real ResourceManager")
}

+ 22
- 0
pkg/rm/tcc/handler/tcc_rm_handler.go View File

@@ -0,0 +1,22 @@
package handler

import (
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/common/handler"
)

type TCCRMHandler struct {
handler.CommonRMHandler
}

func NewTCCRMHandler() TCCRMHandler {
handler := TCCRMHandler{}
handler.CommonRMHandler.SetRMGetter(handler)
return handler
}

func (TCCRMHandler) GetResourceManager() resource.ResourceManager {
return rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC)
}

+ 135
- 3
pkg/rm/tcc/tcc_resource.go View File

@@ -1,7 +1,25 @@
package tcc

import (
"github.com/seata/seata-go/pkg/common/model"
"context"
"fmt"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm/common/remoting"
"github.com/seata/seata-go/pkg/rm/tcc/api"
"sync"
)

import (
"github.com/seata/seata-go/pkg/rm"
)

var (
tCCResourceManager *TCCResourceManager
onceTCCResourceManager = &sync.Once{}
)

type TCCResource struct {
@@ -25,6 +43,120 @@ func (t *TCCResource) GetResourceId() string {
return t.ActionName
}

func (t *TCCResource) GetBranchType() model.BranchType {
return model.BranchTypeTCC
func (t *TCCResource) GetBranchType() branch.BranchType {
return branch.BranchTypeTCC
}

func init() {
rm.RegisterResourceManager(GetTCCResourceManagerInstance())
}

func GetTCCResourceManagerInstance() *TCCResourceManager {
if tCCResourceManager == nil {
onceTCCResourceManager.Do(func() {
tCCResourceManager = &TCCResourceManager{
resourceManagerMap: sync.Map{},
rmRemoting: remoting.GetRMRemotingInstance(),
}
})
}
return tCCResourceManager
}

type TCCResourceManager struct {
rmRemoting *remoting.RMRemoting
// resourceID -> resource
resourceManagerMap sync.Map
}

// register transaction branch
func (t *TCCResourceManager) BranchRegister(ctx context.Context, branchType branch.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
request := message.BranchRegisterRequest{
Xid: xid,
BranchType: t.GetBranchType(),
ResourceId: resourceId,
LockKey: lockKeys,
ApplicationData: []byte(applicationData),
}
resp, err := getty.GetGettyRemotingClient().SendSyncRequest(request)
if err != nil || resp == nil {
log.Errorf("BranchRegister error: %v, res %v", err.Error(), resp)
return 0, err
}
return resp.(message.BranchRegisterResponse).BranchId, nil
}

func (t *TCCResourceManager) BranchReport(ctx context.Context, ranchType branch.BranchType, xid string, branchId int64, status branch.BranchStatus, applicationData string) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) LockQuery(ctx context.Context, ranchType branch.BranchType, resourceId, xid, lockKeys string) (bool, error) {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) UnregisterResource(resource resource.Resource) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) RegisterResource(resource resource.Resource) error {
if _, ok := resource.(*TCCResource); !ok {
panic(fmt.Sprintf("register tcc resource error, TCCResource is needed, param %v", resource))
}
t.resourceManagerMap.Store(resource.GetResourceId(), resource)
return t.rmRemoting.RegisterResource(resource)
}

func (t *TCCResourceManager) GetManagedResources() sync.Map {
return t.resourceManagerMap
}

// Commit a branch transaction
func (t *TCCResourceManager) BranchCommit(ctx context.Context, ranchType branch.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (branch.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", resourceID)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Commit(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return branch.BranchStatusPhasetwoCommitFailedRetryable, err
}
return branch.BranchStatusPhasetwoCommitted, err
}

func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64, resourceID string, applicationData []byte) api.BusinessActionContext {
return api.BusinessActionContext{
Xid: xid,
BranchId: string(branchID),
ActionName: resourceID,
// todo get ActionContext
//ActionContext:,
}
}

// Rollback a branch transaction
func (t *TCCResourceManager) BranchRollback(ctx context.Context, ranchType branch.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (branch.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", resourceID)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Rollback(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return branch.BranchStatusPhasetwoRollbacked, err
}
return branch.BranchStatusPhasetwoRollbackFailedRetryable, err
}

func (t *TCCResourceManager) GetBranchType() branch.BranchType {
return branch.BranchTypeTCC
}

+ 0
- 135
pkg/rm/tcc/tcc_rm.go View File

@@ -1,135 +0,0 @@
package tcc

import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rm/tcc/api"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
"sync"
)

import (
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/rm"
)

var (
tCCResourceManager *TCCResourceManager
onceTCCResourceManager = &sync.Once{}
)

func init() {
rm.RegisterResourceManager(GetTCCResourceManagerInstance())
}

func GetTCCResourceManagerInstance() *TCCResourceManager {
if tCCResourceManager == nil {
onceTCCResourceManager.Do(func() {
tCCResourceManager = &TCCResourceManager{
resourceManagerMap: sync.Map{},
rmRemoting: rm.GetRMRemotingInstance(),
}
})
}
return tCCResourceManager
}

type TCCResourceManager struct {
rmRemoting *rm.RMRemoting
// resourceID -> resource
resourceManagerMap sync.Map
}

// register transaction branch
func (t *TCCResourceManager) BranchRegister(ctx context.Context, branchType model.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
request := protocol.BranchRegisterRequest{
Xid: xid,
BranchType: t.GetBranchType(),
ResourceId: resourceId,
LockKey: lockKeys,
ApplicationData: []byte(applicationData),
}
response, err := getty.GetGettyRemotingClient().SendSyncRequest(request)
if err != nil || response == nil {
log.Errorf("BranchRegister error: %v, res %v", err.Error(), response)
return 0, err
}
return response.(protocol.BranchRegisterResponse).BranchId, nil
}

func (t *TCCResourceManager) BranchReport(ctx context.Context, ranchType model.BranchType, xid string, branchId int64, status model.BranchStatus, applicationData string) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) LockQuery(ctx context.Context, ranchType model.BranchType, resourceId, xid, lockKeys string) (bool, error) {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) UnregisterResource(resource model.Resource) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) RegisterResource(resource model.Resource) error {
if _, ok := resource.(*TCCResource); !ok {
panic(fmt.Sprintf("register tcc resource error, TCCResource is needed, param %v", resource))
}
t.resourceManagerMap.Store(resource.GetResourceId(), resource)
return t.rmRemoting.RegisterResource(resource)
}

func (t *TCCResourceManager) GetManagedResources() sync.Map {
return t.resourceManagerMap
}

// Commit a branch transaction
func (t *TCCResourceManager) BranchCommit(ctx context.Context, ranchType model.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (model.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", resourceID)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Commit(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return model.BranchStatusPhasetwoCommitFailedRetryable, err
}
return model.BranchStatusPhasetwoCommitted, err
}

func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64, resourceID string, applicationData []byte) api.BusinessActionContext {
return api.BusinessActionContext{
Xid: xid,
BranchId: string(branchID),
ActionName: resourceID,
// todo get ActionContext
//ActionContext:,
}
}

// Rollback a branch transaction
func (t *TCCResourceManager) BranchRollback(ctx context.Context, ranchType model.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (model.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", resourceID)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Rollback(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return model.BranchStatusPhasetwoRollbacked, err
}
return model.BranchStatusPhasetwoRollbackFailedRetryable, err
}

func (t *TCCResourceManager) GetBranchType() model.BranchType {
return model.BranchTypeTCC
}

+ 0
- 20
pkg/rm/tcc/tcc_rm_handler.go View File

@@ -1,20 +0,0 @@
package tcc

import (
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/rm"
)

type TCCRMHandler struct {
rm.CommonRMHandler
}

func NewTCCRMHandler() TCCRMHandler {
handler := TCCRMHandler{}
handler.CommonRMHandler.SetRMGetter(handler)
return handler
}

func (TCCRMHandler) GetResourceManager() model.ResourceManager {
return rm.GetResourceManagerFacadeInstance().GetResourceManager(model.BranchTypeTCC)
}

+ 0
- 22
pkg/rm/tcc/tcc_runner.go View File

@@ -1,22 +0,0 @@
package tcc

import (
"context"
)

var (
tccRunnerSingleton = &TccRunner{}
)

type TccRunner struct {
}

func (c *TccRunner) run(ctx context.Context, tccService TCCService, params []interface{}) {
// step1: register rm service

// step2: open global transaction

// step3: do prepare Method

// step4: commit prepare transaction
}

+ 16
- 15
pkg/rm/tcc/tcc_service.go View File

@@ -6,16 +6,17 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/common/net"
"github.com/seata/seata-go/pkg/protocol/branch"
context2 "github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/protocol/transaction/executor"
"github.com/seata/seata-go/pkg/rm"
api2 "github.com/seata/seata-go/pkg/rm/tcc/api"
"github.com/seata/seata-go/pkg/utils"
"github.com/seata/seata-go/pkg/utils/log"
"time"
)

import (
"github.com/seata/seata-go/pkg/rm/api"
"github.com/seata/seata-go/pkg/rm/tcc/remoting"
)

@@ -45,7 +46,7 @@ func NewTCCServiceProxy(tccService TCCService) TCCService {
AppName: "",
ActionName: tccService.GetActionName(),
}
err := rm.GetResourceManagerFacadeInstance().GetResourceManager(model.BranchTypeTCC).RegisterResource(&tccResource)
err := rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(&tccResource)
if err != nil {
panic(fmt.Sprintf("NewTCCServiceProxy registerResource error: {%#v}", err.Error()))
}
@@ -57,9 +58,9 @@ func NewTCCServiceProxy(tccService TCCService) TCCService {

func (t *TCCServiceProxy) Prepare(ctx context.Context, param interface{}) error {
var err error
if model.IsSeataContext(ctx) {
if context2.IsSeataContext(ctx) {
// execute transaction
_, err = api.GetTransactionTemplate().Execute(ctx, t, param)
_, err = executor.GetTransactionTemplate().Execute(ctx, t, param)
} else {
log.Warn("context is not inited as seata context, will not execute transaction!")
err = t.TCCService.Prepare(ctx, param)
@@ -79,18 +80,18 @@ func (t *TCCServiceProxy) Execute(ctx context.Context, param interface{}) (inter

func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{}) error {
// register transaction branch
if !model.HasXID(ctx) {
if !context2.HasXID(ctx) {
err := errors.New("BranchRegister error, xid should not be nil")
log.Errorf(err.Error())
return err
}
tccContext := make(map[string]interface{}, 0)
tccContext[common.StartTime] = time.Now().UnixNano() / 1e6
tccContext[common.HostName] = utils.GetLocalIp()
tccContext[common.HostName] = net.GetLocalIp()
tccContextStr, _ := json.Marshal(tccContext)

branchId, err := rm.GetResourceManagerFacadeInstance().GetResourceManager(model.BranchTypeTCC).BranchRegister(
ctx, model.BranchTypeTCC, t.GetActionName(), "", model.GetXID(ctx), string(tccContextStr), "")
branchId, err := rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC).BranchRegister(
ctx, branch.BranchTypeTCC, t.GetActionName(), "", context2.GetXID(ctx), string(tccContextStr), "")
if err != nil {
err = errors.New(fmt.Sprintf("BranchRegister error: %v", err.Error()))
log.Error(err.Error())
@@ -98,18 +99,18 @@ func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{})
}

actionContext := &api2.BusinessActionContext{
Xid: model.GetXID(ctx),
Xid: context2.GetXID(ctx),
BranchId: string(branchId),
ActionName: t.GetActionName(),
ActionContext: param,
}
model.SetBusinessActionContext(ctx, actionContext)
context2.SetBusinessActionContext(ctx, actionContext)
return nil
}

func (t *TCCServiceProxy) GetTransactionInfo() api.TransactionInfo {
func (t *TCCServiceProxy) GetTransactionInfo() context2.TransactionInfo {
// todo replace with config
return api.TransactionInfo{
return context2.TransactionInfo{
TimeOut: 10000,
Name: t.GetActionName(),
//Propagation, Propagation


+ 0
- 26
pkg/rpc/processor/client/client_heart_beat_processon.go View File

@@ -1,26 +0,0 @@
package client

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
)

func init() {
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeHeartbeatMsg, &clientHeartBeatProcesson{})
}

type clientHeartBeatProcesson struct{}

func (f *clientHeartBeatProcesson) Process(ctx context.Context, rpcMessage protocol.RpcMessage) error {
if _, ok := rpcMessage.Body.(protocol.HeartBeatMessage); ok {
// TODO 如何从context中获取远程服务的信息?
//log.Infof("received PONG from {}", ctx.channel().remoteAddress())
log.Infof("received PONG from {}", ctx)
}
return nil
}

+ 0
- 58
pkg/rpc/processor/client/client_on_response_processor.go View File

@@ -1,58 +0,0 @@
package client

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
)

func init() {
clientOnResponseProcessor := &clientOnResponseProcessor{}
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeSeataMergeResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeBranchRegisterResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeBranchStatusReportResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeGlobalLockQueryResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeRegRmResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeGlobalBeginResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeGlobalCommitResult, clientOnResponseProcessor)
}

type clientOnResponseProcessor struct {
}

func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage protocol.RpcMessage) error {
// 如果是合并的结果消息,直接通知已经处理完成
if mergedResult, ok := rpcMessage.Body.(protocol.MergeResultMessage); ok {

mergedMessage := getty.GetGettyRemotingInstance().GetMergedMessage(rpcMessage.ID)
if mergedMessage != nil {
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
response := getty.GetGettyRemotingInstance().GetMessageFuture(msgID)
if response != nil {
response.Response = mergedResult.Msgs[i]
response.Done <- true
getty.GetGettyRemotingInstance().RemoveMessageFuture(msgID)
}
}
getty.GetGettyRemotingInstance().RemoveMergedMessageFuture(rpcMessage.ID)
}
return nil
} else {
// 如果是请求消息,做处理逻辑
msgFuture := getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
if msgFuture != nil {
getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
} else {
if _, ok := rpcMessage.Body.(protocol.AbstractResultMessage); ok {
log.Infof("the rm client received response msg [{}] from tc server.", msgFuture)
}
}
}
return nil
}

+ 0
- 13
pkg/rpc/processor/remoting_processor.go View File

@@ -1,13 +0,0 @@
package processor

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/protocol"
)

type RemotingProcessor interface {
Process(ctx context.Context, rpcMessage protocol.RpcMessage) error
}

+ 5
- 0
pkg/server/server.go View File

@@ -0,0 +1,5 @@
package main

func main() {
// start seata server
}

+ 0
- 154
pkg/tm/global_transaction.go View File

@@ -1,154 +0,0 @@
package tm

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/tm/api"
"github.com/seata/seata-go/pkg/utils/log"
"sync"
)

type GlobalTransaction struct {
Xid string
Status model.GlobalStatus
Role model.GlobalTransactionRole
}

var (
// singletone ResourceManagerFacade
globalTransactionManager *GlobalTransactionManager
onceGlobalTransactionManager = &sync.Once{}
)

func GetGlobalTransactionManager() *GlobalTransactionManager {
if globalTransactionManager == nil {
onceGlobalTransactionManager.Do(func() {
globalTransactionManager = &GlobalTransactionManager{}
})
}
return globalTransactionManager
}

type GlobalTransactionManager struct {
}

// Begin a new global transaction with given timeout and given name.
func (g *GlobalTransactionManager) Begin(ctx context.Context, transaction *GlobalTransaction, timeout int32, name string) error {
if transaction.Role != model.LAUNCHER {
log.Infof("Ignore Begin(): just involved in global transaction %s", transaction.Xid)
return nil
}
if transaction.Xid != "" {
return errors.New(fmt.Sprintf("Global transaction already exists,can't begin a new global transaction, currentXid = %s ", transaction.Xid))
}

req := protocol.GlobalBeginRequest{
TransactionName: name,
Timeout: timeout,
}
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalBeginRequest error, xid %s, error %v", transaction.Xid, err)
return err
}
if res == nil || res.(protocol.GlobalBeginResponse).ResultCode == protocol.ResultCodeFailed {
log.Errorf("GlobalBeginRequest error, xid %s, res %v", transaction.Xid, res)
return err
}
log.Infof("GlobalBeginRequest success, xid %s, res %v", transaction.Xid, res)

transaction.Status = model.Begin
transaction.Xid = res.(protocol.GlobalBeginResponse).Xid
model.SetXID(ctx, res.(protocol.GlobalBeginResponse).Xid)
return nil
}

// Commit the global transaction.
func (g *GlobalTransactionManager) Commit(ctx context.Context, transaction *GlobalTransaction) error {
if transaction.Role != model.LAUNCHER {
log.Infof("Ignore Commit(): just involved in global transaction [{}]", transaction.Xid)
return nil
}
if transaction.Xid == "" {
return errors.New("Commit xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
)
for retry := 5; retry > 0; retry-- {
req := protocol.GlobalCommitRequest{
AbstractGlobalEndRequest: protocol.AbstractGlobalEndRequest{
Xid: transaction.Xid,
},
}
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalCommitRequest error, xid %s, error %v", transaction.Xid, err)
} else {
break
}
}
if err == nil && res != nil {
transaction.Status = res.(protocol.GlobalCommitResponse).GlobalStatus
}
model.UnbindXid(ctx)
log.Infof("GlobalCommitRequest commit success, xid %s", transaction.Xid)
return err
}

// Rollback the global transaction.
func (g *GlobalTransactionManager) Rollback(ctx context.Context, transaction *GlobalTransaction) error {
if transaction.Role != model.LAUNCHER {
log.Infof("Ignore Commit(): just involved in global transaction [{}]", transaction.Xid)
return nil
}
if transaction.Xid == "" {
return errors.New("Commit xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
)
for retry := 5; retry > 0; retry-- {
req := protocol.GlobalRollbackRequest{
AbstractGlobalEndRequest: protocol.AbstractGlobalEndRequest{
Xid: transaction.Xid,
},
}
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalRollbackRequest error, xid %s, error %v", transaction.Xid, err)
} else {
break
}
}
if err == nil && res != nil {
transaction.Status = res.(protocol.GlobalRollbackResponse).GlobalStatus
}
model.UnbindXid(ctx)
return err
}

// Suspend the global transaction.
func (g *GlobalTransactionManager) Suspend() (api.SuspendedResourcesHolder, error) {
panic("implement me")
}

// Resume the global transaction.
func (g *GlobalTransactionManager) Resume(suspendedResourcesHolder api.SuspendedResourcesHolder) error {
panic("implement me")
}

// report the global transaction status.
func (g *GlobalTransactionManager) GlobalReport(globalStatus model.GlobalStatus) error {
panic("implement me")
}

+ 3
- 3
test/tcc_service_test.go View File

@@ -2,12 +2,12 @@ package test

import (
"context"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/common/log"
_ "github.com/seata/seata-go/pkg/imports"
context2 "github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/rm/tcc/api"
"github.com/seata/seata-go/pkg/rm/tcc/remoting"
"github.com/seata/seata-go/pkg/utils/log"
"testing"
)

@@ -43,7 +43,7 @@ func (T TestTCCServiceBusiness) GetServiceType() remoting.ServiceType {

func TestNew(test *testing.T) {
tccService := tcc.NewTCCServiceProxy(TestTCCServiceBusiness{})
tccService.Prepare(model.InitSeataContext(context.Background()), 1)
tccService.Prepare(context2.InitSeataContext(context.Background()), 1)

//time.Sleep(time.Second * 1000)
}

+ 2
- 2
testdata/mock/mock_tcc_service.go View File

@@ -3,13 +3,13 @@ package mock
import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/common/xid"
"github.com/seata/seata-go/pkg/rm/tcc/api"
)

import (
"github.com/seata/seata-go/pkg/rm/tcc/remoting"
_ "github.com/seata/seata-go/pkg/utils/xid"
xid_utils "github.com/seata/seata-go/pkg/utils/xid"
)

// 注册RM资源
@@ -21,7 +21,7 @@ type MockTccService struct {
}

func (*MockTccService) Prepare(ctx context.Context, params interface{}) error {
xid := xid_utils.GetXID(ctx)
xid := xid_utils.xid_utils.GetXID(ctx)
fmt.Printf("TccActionOne prepare, xid:" + xid)
return nil
}


Loading…
Cancel
Save