Browse Source

Merge pull request #10 from luky116/addUnitTest

Add unit test
tags/v0.1.0-rc1
Xin.Zh GitHub 3 years ago
parent
commit
00d7850def
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
80 changed files with 1126 additions and 1131 deletions
  1. +6
    -2
      go.mod
  2. +16
    -520
      go.sum
  3. +2
    -0
      goimports.sh
  4. +2
    -6
      pkg/common/constants.go
  5. +107
    -8
      pkg/common/error/error.go
  6. +0
    -3
      pkg/common/log/logging.go
  7. +1
    -1
      pkg/config/client_config.go
  8. +5
    -5
      pkg/config/getty_config.go
  9. +11
    -11
      pkg/protocol/branch/branch.go
  10. +1
    -3
      pkg/protocol/codec/branch_register_req_codec.go
  11. +18
    -18
      pkg/protocol/codec/branch_register_req_codec_test.go
  12. +6
    -19
      pkg/protocol/codec/branch_register_response_codec.go
  13. +45
    -0
      pkg/protocol/codec/branch_register_response_codec_test.go
  14. +5
    -11
      pkg/protocol/codec/codec.go
  15. +0
    -0
      pkg/protocol/codec/codec_helper.go
  16. +9
    -20
      pkg/protocol/codec/common_global_end_request_codec.go
  17. +3
    -6
      pkg/protocol/codec/common_global_end_response_codec.go
  18. +1
    -4
      pkg/protocol/codec/common_identify_request_codec.go
  19. +0
    -3
      pkg/protocol/codec/common_identify_response_codec.go
  20. +1
    -4
      pkg/protocol/codec/global_begin_request_codec.go
  21. +38
    -0
      pkg/protocol/codec/global_begin_request_codec_test.go
  22. +3
    -6
      pkg/protocol/codec/global_begin_response_codec.go
  23. +47
    -0
      pkg/protocol/codec/global_begin_response_codec_test.go
  24. +1
    -1
      pkg/protocol/codec/global_commit_req_codec.go
  25. +40
    -0
      pkg/protocol/codec/global_commit_req_codec_test.go
  26. +6
    -1
      pkg/protocol/codec/global_commit_response_codec.go
  27. +45
    -0
      pkg/protocol/codec/global_commit_response_codec_test.go
  28. +1
    -1
      pkg/protocol/codec/global_report_request_codec.go
  29. +6
    -1
      pkg/protocol/codec/global_report_response_codec.go
  30. +45
    -0
      pkg/protocol/codec/global_report_response_codec_test.go
  31. +2
    -2
      pkg/protocol/codec/global_rollback_req_codec.go
  32. +40
    -0
      pkg/protocol/codec/global_rollback_req_codec_test.go
  33. +6
    -1
      pkg/protocol/codec/global_rollback_response_codec.go
  34. +45
    -0
      pkg/protocol/codec/global_rollback_response_codec_test.go
  35. +1
    -1
      pkg/protocol/codec/global_status_req_codec.go
  36. +40
    -0
      pkg/protocol/codec/global_status_req_codec_test.go
  37. +6
    -1
      pkg/protocol/codec/global_status_response_codec.go
  38. +45
    -0
      pkg/protocol/codec/global_status_response_codec_test.go
  39. +2
    -5
      pkg/protocol/codec/register_rm_request_codec.go
  40. +17
    -18
      pkg/protocol/codec/register_rm_request_codec_test.go
  41. +6
    -1
      pkg/protocol/codec/register_rm_response_codec.go
  42. +45
    -0
      pkg/protocol/codec/register_rm_response_codec_test.go
  43. +1
    -1
      pkg/protocol/codec/register_tm_request_codec.go
  44. +42
    -0
      pkg/protocol/codec/register_tm_request_codec_test.go
  45. +1
    -1
      pkg/protocol/codec/register_tm_response_codec.go
  46. +46
    -0
      pkg/protocol/codec/register_tm_response_codec_test.go
  47. +100
    -0
      pkg/protocol/message/constant.go
  48. +2
    -2
      pkg/protocol/message/message_apis.go
  49. +1
    -2
      pkg/protocol/message/request_message.go
  50. +3
    -3
      pkg/protocol/message/response_message.go
  51. +1
    -3
      pkg/protocol/resource/resource.go
  52. +0
    -127
      pkg/protocol/transaction/expection_code.go
  53. +0
    -119
      pkg/protocol/transaction/transaction_status.go
  54. +5
    -10
      pkg/remoting/getty/getty_client.go
  55. +0
    -6
      pkg/remoting/getty/getty_remoting.go
  56. +1
    -7
      pkg/remoting/getty/listener.go
  57. +0
    -7
      pkg/remoting/getty/readwriter.go
  58. +51
    -0
      pkg/remoting/getty/readwriter_test.go
  59. +0
    -6
      pkg/remoting/getty/rpc_client.go
  60. +0
    -2
      pkg/remoting/getty/session_manager.go
  61. +0
    -2
      pkg/remoting/processor/client/client_heart_beat_processon.go
  62. +6
    -2
      pkg/remoting/processor/client/client_on_response_processor.go
  63. +1
    -2
      pkg/remoting/processor/client/rm_branch_commit_processor.go
  64. +3
    -4
      pkg/remoting/processor/client/rm_branch_rollback_processor.go
  65. +0
    -2
      pkg/remoting/processor/remoting_processor.go
  66. +0
    -2
      pkg/rm/common/handler/rm_handler.go
  67. +0
    -2
      pkg/rm/common/handler/rm_handler_facade.go
  68. +0
    -8
      pkg/rm/common/remoting/rm_remoting.go
  69. +2
    -5
      pkg/rm/resource_manager.go
  70. +4
    -0
      pkg/rm/rm_client.go
  71. +1
    -1
      pkg/rm/tcc/api/business_action_context.go
  72. +3
    -5
      pkg/rm/tcc/tcc_resource.go
  73. +10
    -15
      pkg/rm/tcc/tcc_service.go
  74. +33
    -14
      pkg/tm/constant.go
  75. +28
    -25
      pkg/tm/context.go
  76. +13
    -21
      pkg/tm/global_transaction.go
  77. +1
    -1
      pkg/tm/suspended_resources_holder.go
  78. +35
    -32
      pkg/tm/transaction_executor.go
  79. +1
    -4
      test/rpc_remoting_client_test.go
  80. +5
    -5
      test/tcc_service_test.go

+ 6
- 2
go.mod View File

@@ -6,15 +6,19 @@ require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.11.23
github.com/dubbogo/tools v1.0.9 // indirect
github.com/fagongzi/goetty v1.3.1
github.com/fagongzi/log v0.0.0-20170831135209-9a647df25e0e
github.com/fagongzi/util v0.0.0-20181102105153-fd38e0f42a4f
github.com/golang/snappy v0.0.4 // indirect
github.com/incu6us/goimports-reviser v0.1.6 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
golang.org/x/tools v0.1.11 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect


+ 16
- 520
go.sum
File diff suppressed because it is too large
View File


+ 2
- 0
goimports.sh View File

@@ -0,0 +1,2 @@
go get -v golang.org/x/tools/cmd/goimports
goimports -w .

+ 2
- 6
pkg/common/constants.go View File

@@ -18,10 +18,6 @@
package common

const (
StartTime = "action-start-time"
HostName = "host-name"
TccBusinessActionContext = "tcc-business-action-context"

CONTEXT_VARIABLE = "contextVariable"
XID = "xid"
StartTime = "action-start-time"
HostName = "host-name"
)

+ 107
- 8
pkg/common/error/error.go View File

@@ -17,17 +17,116 @@

package error

import (
"github.com/pkg/errors"
import "github.com/pkg/errors"

var (
Error_TooManySessions = errors.New("too many seeessions")
Error_HeartBeatTimeOut = errors.New("heart beat time out")
)

type ErrorCode int32
type TransactionExceptionCode byte

const (
ErrorCode_IllegalState ErrorCode = 40001
)
/**
* Unknown transaction exception code.
*/
TransactionExceptionCodeUnknown = TransactionExceptionCode(0)

var (
Error_TooManySessions = errors.New("too many seeessions")
Error_HeartBeatTimeOut = errors.New("heart beat time out")
/**
* BeginFailed
*/
TransactionExceptionCodeBeginFailed = 1

/**
* Lock key conflict transaction exception code.
*/
TransactionExceptionCodeLockKeyConflict = 2

/**
* Io transaction exception code.
*/

/**
* Branch rollback failed retriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedRetriable = 3

/**
* Branch rollback failed unretriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedUnretriable = 4

/**
* Branch register failed transaction exception code.
*/
TransactionExceptionCodeBranchRegisterFailed = 5

/**
* Branch report failed transaction exception code.
*/
TransactionExceptionCodeBranchReportFailed = 6

/**
* Lockable check failed transaction exception code.
*/
TransactionExceptionCodeLockableCheckFailed = 7

/**
* Branch transaction not exist transaction exception code.
*/
TransactionExceptionCodeBranchTransactionNotExist = 8

/**
* Global transaction not exist transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotExist = 9

/**
* Global transaction not active transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotActive = 10

/**
* Global transaction status invalid transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionStatusInvalid = 11

/**
* Failed to send branch commit request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchCommitRequest = 12

/**
* Failed to send branch rollback request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchRollbackRequest = 13

/**
* Failed to add branch transaction exception code.
*/
TransactionExceptionCodeFailedToAddBranch = 14

/**
* Failed to lock global transaction exception code.
*/
TransactionExceptionCodeFailedLockGlobalTranscation = 15

/**
* FailedWriteSession
*/
TransactionExceptionCodeFailedWriteSession = 16

/**
* Failed to holder exception code
*/
FailedStore = 17
)

type TransactionException struct {
Code TransactionExceptionCode
Message string
}

func (e TransactionException) Error() string {
return "TransactionException: " + e.Message
}

+ 0
- 3
pkg/common/log/logging.go View File

@@ -21,11 +21,8 @@ import (
"bytes"
"errors"
"fmt"
)

import (
"github.com/natefinch/lumberjack"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)


+ 1
- 1
pkg/config/client_config.go View File

@@ -34,7 +34,7 @@ type ClientConfig struct {
DSN string `yaml:"dsn" json:"dsn,omitempty"`
ReportRetryCount int `default:"5" yaml:"report_retry_count" json:"report_retry_count,omitempty"`
ReportSuccessEnable bool `default:"false" yaml:"report_success_enable" json:"report_success_enable,omitempty"`
LockRetryInterval time.Duration `default:"10ms" yaml:"lock_retry_interval" json:"lock_retry_interval,omitempty"`
LockRetryInterval time.Duration `default:"10" yaml:"lock_retry_interval" json:"lock_retry_interval,omitempty"`
LockRetryTimes int `default:"30" yaml:"lock_retry_times" json:"lock_retry_times,omitempty"`
} `yaml:"at" json:"at,omitempty"`
}


+ 5
- 5
pkg/config/getty_config.go View File

@@ -63,13 +63,13 @@ type GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TCPNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TCPKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod time.Duration `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
CronPeriod time.Duration `default:"1s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
KeepAlivePeriod time.Duration `default:"180" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
CronPeriod time.Duration `default:"1" yaml:"cron_period" json:"cron_period,omitempty"`
TCPRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TCPWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
TCPReadTimeout time.Duration `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
TCPWriteTimeout time.Duration `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
WaitTimeout time.Duration `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
TCPReadTimeout time.Duration `default:"1" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
TCPWriteTimeout time.Duration `default:"5" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
WaitTimeout time.Duration `default:"7" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
MaxMsgLen int `default:"4096" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}

+ 11
- 11
pkg/protocol/branch/branch.go View File

@@ -36,67 +36,67 @@ const (
* The BranchStatus_Unknown.
* description:BranchStatus_Unknown branch status.
*/
BranchStatusUnknown BranchStatus = iota
BranchStatusUnknown = BranchStatus(0)

/**
* The BranchStatus_Registered.
* description:BranchStatus_Registered to TC.
*/
BranchStatusRegistered
BranchStatusRegistered = BranchStatus(1)

/**
* The Phase one done.
* description:Branch logic is successfully done at phase one.
*/
BranchStatusPhaseoneDone
BranchStatusPhaseoneDone = BranchStatus(2)

/**
* The Phase one failed.
* description:Branch logic is failed at phase one.
*/
BranchStatusPhaseoneFailed
BranchStatusPhaseoneFailed = BranchStatus(3)

/**
* The Phase one timeout.
* description:Branch logic is NOT reported for a timeout.
*/
BranchStatusPhaseoneTimeout
BranchStatusPhaseoneTimeout = BranchStatus(4)

/**
* The Phase two committed.
* description:Commit logic is successfully done at phase two.
*/
BranchStatusPhasetwoCommitted
BranchStatusPhasetwoCommitted = BranchStatus(5)

/**
* The Phase two commit failed retryable.
* description:Commit logic is failed but retryable.
*/
BranchStatusPhasetwoCommitFailedRetryable
BranchStatusPhasetwoCommitFailedRetryable = BranchStatus(6)

/**
* The Phase two commit failed unretryable.
* description:Commit logic is failed and NOT retryable.
*/
BranchStatusPhasetwoCommitFailedUnretryable
BranchStatusPhasetwoCommitFailedUnretryable = BranchStatus(7)

/**
* The Phase two rollbacked.
* description:Rollback logic is successfully done at phase two.
*/
BranchStatusPhasetwoRollbacked
BranchStatusPhasetwoRollbacked = BranchStatus(8)

/**
* The Phase two rollback failed retryable.
* description:Rollback logic is failed but retryable.
*/
BranchStatusPhasetwoRollbackFailedRetryable
BranchStatusPhasetwoRollbackFailedRetryable = BranchStatus(9)

/**
* The Phase two rollback failed unretryable.
* description:Rollback logic is failed but NOT retryable.
*/
BranchStatusPhasetwoRollbackFailedUnretryable
BranchStatusPhasetwoRollbackFailedUnretryable = BranchStatus(10)
)

func (s BranchStatus) String() string {


+ 1
- 3
pkg/protocol/codec/branch_register_req_codec.go View File

@@ -19,15 +19,13 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &BranchRegisterRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRegisterRequestCodec{})
}

type BranchRegisterRequestCodec struct {


pkg/protocol/transaction/transaction_manager.go → pkg/protocol/codec/branch_register_req_codec_test.go View File

@@ -15,28 +15,28 @@
* limitations under the License.
*/

package transaction
package codec

type GlobalTransactionRole int8
import (
"testing"

const (
LAUNCHER GlobalTransactionRole = 0
PARTICIPANT GlobalTransactionRole = 1
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

type TransactionManager interface {
// GlobalStatusBegin a new global transaction.
Begin(applicationId, transactionServiceGroup, name string, timeout int64) (string, error)
func TestBranchRegisterRequestCodec(t *testing.T) {
msg := message.BranchRegisterRequest{
Xid: "abc134",
ResourceId: "124",
LockKey: "a:1,b:2",
ApplicationData: []byte("abc"),
BranchType: branch.BranchTypeTCC,
}

// Global commit.
Commit(xid string) (GlobalStatus, error)
codec := BranchRegisterRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

//Global rollback.
Rollback(xid string) (GlobalStatus, error)

// Get current status of the give transaction.
GetStatus(xid string) (GlobalStatus, error)

// Global report.
GlobalReport(xid string, globalStatus GlobalStatus) (GlobalStatus, error)
assert.Equal(t, msg, msg2)
}

+ 6
- 19
pkg/protocol/codec/branch_register_response_codec.go View File

@@ -19,15 +19,12 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
error2 "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &BranchRegisterResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRegisterResponseCodec{})
}

type BranchRegisterResponseCodec struct {
@@ -49,7 +46,7 @@ func (g *BranchRegisterResponseCodec) Decode(in []byte) interface{} {
}

exceptionCode := ReadByte(buf)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = error2.TransactionExceptionCode(exceptionCode)
msg.BranchId = int64(ReadUInt64(buf))

return msg
@@ -59,8 +56,8 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte {
buf := goetty.NewByteBuf(0)
resp, _ := in.(message.BranchRegisterResponse)

resultCode := ReadByte(buf)
if resultCode == byte(message.ResultCodeFailed) {
buf.WriteByte(byte(resp.ResultCode))
if resp.ResultCode == message.ResultCodeFailed {
var msg string
if len(resp.Msg) > 128 {
msg = resp.Msg[:128]
@@ -72,17 +69,7 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte {

buf.WriteByte(byte(resp.TransactionExceptionCode))
branchID := uint64(resp.BranchId)
branchIdBytes := []byte{
byte(branchID >> 56),
byte(branchID >> 48),
byte(branchID >> 40),
byte(branchID >> 32),
byte(branchID >> 24),
byte(branchID >> 16),
byte(branchID >> 8),
byte(branchID),
}
buf.Write(branchIdBytes)
buf.WriteUInt64(branchID)
return buf.RawBuf()
}



+ 45
- 0
pkg/protocol/codec/branch_register_response_codec_test.go View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
error2 "github.com/seata/seata-go/pkg/common/error"
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestBranchRegisterResponseCodec(t *testing.T) {
msg := message.BranchRegisterResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
TransactionExceptionCode: error2.TransactionExceptionCodeUnknown,
},
BranchId: 124356567,
}

codec := BranchRegisterResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 5
- 11
pkg/protocol/codec/codec.go View File

@@ -20,25 +20,19 @@ package codec
import (
"bytes"
"sync"
)

import (
"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"vimagination.zapto.org/byteio"
)

type CodecType byte

// TODO 待重构
const (
CodeTypeSeata = CodecType(0x1)
CodeTypeProtobuf = CodecType(0x2)
CodeTypeKRYO = CodecType(0x4)
CodeTypeFST = CodecType(0x8)
CodecTypeSeata = CodecType(0x1)
CodecTypeProtobuf = CodecType(0x2)
CodecTypeKRYO = CodecType(0x4)
CodecTypeFST = CodecType(0x8)
)

type Codec interface {


pkg/protocol/codec/codec_help.go → pkg/protocol/codec/codec_helper.go View File


+ 9
- 20
pkg/protocol/codec/common_global_end_request_codec.go View File

@@ -19,9 +19,6 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -43,25 +40,17 @@ func (c *CommonGlobalEndRequestCodec) Decode(in []byte) interface{} {

buf := goetty.NewByteBuf(len(in))
buf.Write(in)
var length uint16

var xidLen int
if buf.Readable() >= 2 {
xidLen = int(ReadUInt16(buf))
}
if buf.Readable() >= xidLen {
xidBytes := make([]byte, xidLen)
xidBytes = Read(buf, xidBytes)
res.Xid = string(xidBytes)
}

var extraDataLen int
if buf.Readable() >= 2 {
extraDataLen = int(ReadUInt16(buf))
length = ReadUInt16(buf)
if length > 0 {
bytes := make([]byte, length)
res.Xid = string(Read(buf, bytes))
}
if buf.Readable() >= extraDataLen {
extraDataBytes := make([]byte, xidLen)
extraDataBytes = Read(buf, extraDataBytes)
res.ExtraData = extraDataBytes
length = ReadUInt16(buf)
if length > 0 {
bytes := make([]byte, length)
res.ExtraData = Read(buf, bytes)
}

return res


+ 3
- 6
pkg/protocol/codec/common_global_end_response_codec.go View File

@@ -19,11 +19,8 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
transaction2 "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

type CommonGlobalEndResponseCodec struct {
@@ -66,10 +63,10 @@ func (c *CommonGlobalEndResponseCodec) Decode(in []byte) interface{} {
}

exceptionCode := ReadByte(buf)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = transaction2.TransactionExceptionCode(exceptionCode)

globalStatus := ReadByte(buf)
msg.GlobalStatus = transaction.GlobalStatus(globalStatus)
msg.GlobalStatus = message.GlobalStatus(globalStatus)

return msg
}

+ 1
- 4
pkg/protocol/codec/common_identify_request_codec.go View File

@@ -19,9 +19,6 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -80,7 +77,7 @@ func (c *AbstractIdentifyRequestCodec) Decode(in []byte) interface{} {
return msg
}
len = ReadUInt16(buf)
if len > 0 && uint16(buf.Readable()) > len {
if len > 0 && uint16(buf.Readable()) >= len {
extraDataBytes := make([]byte, len)
msg.ExtraData = Read(buf, extraDataBytes)
}


+ 0
- 3
pkg/protocol/codec/common_identify_response_codec.go View File

@@ -19,9 +19,6 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
"github.com/seata/seata-go/pkg/protocol/message"
)



+ 1
- 4
pkg/protocol/codec/global_begin_request_codec.go View File

@@ -19,14 +19,11 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
"github.com/seata/seata-go/pkg/protocol/message"
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalBeginRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalBeginRequestCodec{})
}

type GlobalBeginRequestCodec struct {


+ 38
- 0
pkg/protocol/codec/global_begin_request_codec_test.go View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalBeginRequestCodec(t *testing.T) {
msg := message.GlobalBeginRequest{
Timeout: 100,
TransactionName: "SeataGoTransaction",
}

codec := GlobalBeginRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 3
- 6
pkg/protocol/codec/global_begin_response_codec.go View File

@@ -19,15 +19,12 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
error2 "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalBeginResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalBeginResponseCodec{})
}

type GlobalBeginResponseCodec struct {
@@ -71,7 +68,7 @@ func (g *GlobalBeginResponseCodec) Decode(in []byte) interface{} {
}

exceptionCode := ReadByte(buf)
msg.TransactionExceptionCode = transaction.TransactionExceptionCode(exceptionCode)
msg.TransactionExceptionCode = error2.TransactionExceptionCode(exceptionCode)

lenth = ReadUInt16(buf)
if lenth > 0 {


+ 47
- 0
pkg/protocol/codec/global_begin_response_codec_test.go View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
error2 "github.com/seata/seata-go/pkg/common/error"
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalBeginResponseCodec(t *testing.T) {
msg := message.GlobalBeginResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
TransactionExceptionCode: error2.TransactionExceptionCodeBeginFailed,
},

Xid: "test-transaction-id",
ExtraData: []byte("TestExtraData"),
}

codec := GlobalBeginResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 1
- 1
pkg/protocol/codec/global_commit_req_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalCommitRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalCommitRequestCodec{})
}

type GlobalCommitRequestCodec struct {


+ 40
- 0
pkg/protocol/codec/global_commit_req_codec_test.go View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalCommitRequestCodec(t *testing.T) {
msg := message.GlobalCommitRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{
Xid: "test-transaction-id",
ExtraData: []byte("TestExtraData"),
},
}

codec := GlobalCommitRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 6
- 1
pkg/protocol/codec/global_commit_response_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalCommitResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalCommitResponseCodec{})
}

type GlobalCommitResponseCodec struct {
@@ -37,6 +37,11 @@ func (g *GlobalCommitResponseCodec) Decode(in []byte) interface{} {
}
}

func (g *GlobalCommitResponseCodec) Encode(in interface{}) []byte {
req := in.(message.GlobalCommitResponse)
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse)
}

func (g *GlobalCommitResponseCodec) GetMessageType() message.MessageType {
return message.MessageType_GlobalCommitResult
}

+ 45
- 0
pkg/protocol/codec/global_commit_response_codec_test.go View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalCommitResponseCodec(t *testing.T) {
msg := message.GlobalCommitResponse{
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "ResultCodeFailed message",
},
},
GlobalStatus: message.GlobalStatusCommitted,
},
}

codec := GlobalCommitResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

pkg/protocol/codec/global_report_request_codec.go.go → pkg/protocol/codec/global_report_request_codec.go View File

@@ -18,7 +18,7 @@
package codec

//func init() {
// GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalReportRequestCodec{})
// GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalReportRequestCodec{})
//}
//
//type GlobalReportRequestCodec struct {

+ 6
- 1
pkg/protocol/codec/global_report_response_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalReportResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalReportResponseCodec{})
}

type GlobalReportResponseCodec struct {
@@ -37,6 +37,11 @@ func (g *GlobalReportResponseCodec) Decode(in []byte) interface{} {
}
}

func (g *GlobalReportResponseCodec) Encode(in interface{}) []byte {
req := in.(message.GlobalReportResponse)
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse)
}

func (g *GlobalReportResponseCodec) GetMessageType() message.MessageType {
return message.MessageType_GlobalReportResult
}

+ 45
- 0
pkg/protocol/codec/global_report_response_codec_test.go View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalReportResponseCodec(t *testing.T) {
msg := message.GlobalReportResponse{
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "ResultCodeFailed message",
},
},
GlobalStatus: message.GlobalStatusCommitted,
},
}

codec := GlobalReportResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 2
- 2
pkg/protocol/codec/global_rollback_req_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalRollbackRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalRollbackRequestCodec{})
}

type GlobalRollbackRequestCodec struct {
@@ -32,7 +32,7 @@ type GlobalRollbackRequestCodec struct {
func (g *GlobalRollbackRequestCodec) Decode(in []byte) interface{} {
req := g.CommonGlobalEndRequestCodec.Decode(in)
abstractGlobalEndRequest := req.(message.AbstractGlobalEndRequest)
return message.GlobalCommitRequest{
return message.GlobalRollbackRequest{
AbstractGlobalEndRequest: abstractGlobalEndRequest,
}
}


+ 40
- 0
pkg/protocol/codec/global_rollback_req_codec_test.go View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalRollbackRequestCodec(t *testing.T) {
msg := message.GlobalRollbackRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{
Xid: "test-transaction-id",
ExtraData: []byte("TestExtraData"),
},
}

codec := GlobalRollbackRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 6
- 1
pkg/protocol/codec/global_rollback_response_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalRollbackResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalRollbackResponseCodec{})
}

type GlobalRollbackResponseCodec struct {
@@ -37,6 +37,11 @@ func (g *GlobalRollbackResponseCodec) Decode(in []byte) interface{} {
}
}

func (g *GlobalRollbackResponseCodec) Encode(in interface{}) []byte {
req := in.(message.GlobalRollbackResponse)
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse)
}

func (g *GlobalRollbackResponseCodec) GetMessageType() message.MessageType {
return message.MessageType_GlobalRollbackResult
}

+ 45
- 0
pkg/protocol/codec/global_rollback_response_codec_test.go View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalRollbackResponseCodec(t *testing.T) {
msg := message.GlobalRollbackResponse{
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "ResultCodeFailed message",
},
},
GlobalStatus: message.GlobalStatusCommitted,
},
}

codec := GlobalRollbackResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 1
- 1
pkg/protocol/codec/global_status_req_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalStatusRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalStatusRequestCodec{})
}

type GlobalStatusRequestCodec struct {


+ 40
- 0
pkg/protocol/codec/global_status_req_codec_test.go View File

@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalStatusRequestCodec(t *testing.T) {
msg := message.GlobalStatusRequest{
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{
Xid: "test-transaction-id",
ExtraData: []byte("TestExtraData"),
},
}

codec := GlobalStatusRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 6
- 1
pkg/protocol/codec/global_status_response_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalStatusResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalStatusResponseCodec{})
}

type GlobalStatusResponseCodec struct {
@@ -37,6 +37,11 @@ func (g *GlobalStatusResponseCodec) Decode(in []byte) interface{} {
}
}

func (g *GlobalStatusResponseCodec) Encode(in interface{}) []byte {
req := in.(message.GlobalStatusResponse)
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse)
}

func (g *GlobalStatusResponseCodec) GetMessageType() message.MessageType {
return message.MessageType_GlobalStatusResult
}

+ 45
- 0
pkg/protocol/codec/global_status_response_codec_test.go View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalStatusResponseCodec(t *testing.T) {
msg := message.GlobalStatusResponse{
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "ResultCodeFailed message",
},
},
GlobalStatus: message.GlobalStatusCommitted,
},
}

codec := GlobalStatusResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 2
- 5
pkg/protocol/codec/register_rm_request_codec.go View File

@@ -19,14 +19,11 @@ package codec

import (
"github.com/fagongzi/goetty"
)

import (
"github.com/seata/seata-go/pkg/protocol/message"
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterRMRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMRequestCodec{})
}

type RegisterRMRequestCodec struct {
@@ -78,7 +75,7 @@ func (c *RegisterRMRequestCodec) Encode(in interface{}) []byte {
Write16String(req.ApplicationId, buf)
Write16String(req.TransactionServiceGroup, buf)
Write16String(string(req.ExtraData), buf)
Write16String(req.ResourceIds, buf)
Write32String(req.ResourceIds, buf)

return buf.RawBuf()
}


pkg/rm/common/remoting/rm_remoting_test.go → pkg/protocol/codec/register_rm_request_codec_test.go View File

@@ -15,30 +15,29 @@
* limitations under the License.
*/

package remoting
package codec

import (
"github.com/seata/seata-go/pkg/common/log"
_ "github.com/seata/seata-go/pkg/imports"
"testing"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/stretchr/testify/assert"
)

func (RMRemoting) RegisterResource(resource resource.Resource) error {
req := message.RegisterRMRequest{
func TestRegisterRMRequestCodec(t *testing.T) {
msg := message.RegisterRMRequest{
AbstractIdentifyRequest: message.AbstractIdentifyRequest{
//todo replace with config
Version: "1.4.2",
ApplicationId: "tcc-sample",
TransactionServiceGroup: "my_test_tx_group",
Version: "V1,0",
ApplicationId: "TestApplicationId",
TransactionServiceGroup: "TestTransactionServiceGroup",
ExtraData: []byte("TestExtraData"),
},
ResourceIds: resource.GetResourceId(),
}
err := getty.GetGettyRemotingClient().SendAsyncRequest(req)
if err != nil {
log.Error("RegisterResourceManager error: {%#v}", err.Error())
return err
ResourceIds: "TestResourceIds",
}
return nil

codec := RegisterRMRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 6
- 1
pkg/protocol/codec/register_rm_response_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterRMResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMResponseCodec{})
}

type RegisterRMResponseCodec struct {
@@ -37,6 +37,11 @@ func (g *RegisterRMResponseCodec) Decode(in []byte) interface{} {
}
}

func (c *RegisterRMResponseCodec) Encode(in interface{}) []byte {
resp := in.(message.RegisterRMResponse)
return c.AbstractIdentifyResponseCodec.Encode(resp.AbstractIdentifyResponse)
}

func (g *RegisterRMResponseCodec) GetMessageType() message.MessageType {
return message.MessageType_RegRmResult
}

+ 45
- 0
pkg/protocol/codec/register_rm_response_codec_test.go View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestRegisterRMResponseCodec(t *testing.T) {
msg := message.RegisterRMResponse{
AbstractIdentifyResponse: message.AbstractIdentifyResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
//Msg: "TestMsg",
},
Version: "V1,0",
Identified: false,
},
}

codec := RegisterRMResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg.Identified, msg2.(message.RegisterRMResponse).Identified)
assert.Equal(t, msg.Version, msg2.(message.RegisterRMResponse).Version)
}

+ 1
- 1
pkg/protocol/codec/register_tm_request_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterTMRequestCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterTMRequestCodec{})
}

type RegisterTMRequestCodec struct {


+ 42
- 0
pkg/protocol/codec/register_tm_request_codec_test.go View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestRegisterTMRequestCodec(t *testing.T) {
msg := message.RegisterTMRequest{
AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: "V1,0",
ApplicationId: "TestApplicationId",
TransactionServiceGroup: "TestTransactionServiceGroup",
ExtraData: []byte("TestExtraData"),
},
}

codec := RegisterTMRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 1
- 1
pkg/protocol/codec/register_tm_response_codec.go View File

@@ -22,7 +22,7 @@ import (
)

func init() {
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterTMResponseCodec{})
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterTMResponseCodec{})
}

type RegisterTMResponseCodec struct {


+ 46
- 0
pkg/protocol/codec/register_tm_response_codec_test.go View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestRegisterTMResponseCodec(t *testing.T) {
msg := message.RegisterTMResponse{
AbstractIdentifyResponse: message.AbstractIdentifyResponse{
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "TestMsg",
},
ExtraData: []byte("TestExtraData"),
Version: "V1,0",
Identified: false,
},
}

codec := RegisterTMResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg.Identified, msg2.(message.RegisterTMResponse).Identified)
assert.Equal(t, msg.Version, msg2.(message.RegisterTMResponse).Version)
}

+ 100
- 0
pkg/protocol/message/constant.go View File

@@ -22,6 +22,7 @@ var MAGIC_CODE_BYTES = [2]byte{0xda, 0xda}
type (
MessageType int
GettyRequestType byte
GlobalStatus int64
)

const (
@@ -166,3 +167,102 @@ const (
// Heartbeat Response
GettyRequestType_HeartbeatResponse GettyRequestType = 4
)

const (

/**
* Un known global status.
*/
// Unknown
GlobalStatusUnKnown GlobalStatus = 0

/**
* The GlobalStatusBegin.
*/
// PHASE 1: can accept new branch registering.
GlobalStatusBegin GlobalStatus = 1

/**
* PHASE 2: Running Status: may be changed any time.
*/
// Committing.
GlobalStatusCommitting GlobalStatus = 2

/**
* The Commit retrying.
*/
// Retrying commit after a recoverable failure.
GlobalStatusCommitRetrying GlobalStatus = 3

/**
* Rollbacking global status.
*/
// Rollbacking
GlobalStatusRollbacking GlobalStatus = 4

/**
* The Rollback retrying.
*/
// Retrying rollback after a recoverable failure.
GlobalStatusRollbackRetrying GlobalStatus = 5

/**
* The Timeout rollbacking.
*/
// Rollbacking since timeout
GlobalStatusTimeoutRollbacking GlobalStatus = 6

/**
* The Timeout rollback retrying.
*/
// Retrying rollback GlobalStatus = since timeout) after a recoverable failure.
GlobalStatusTimeoutRollbackRetrying GlobalStatus = 7

/**
* All branches can be async committed. The committing is NOT done yet, but it can be seen as committed for TM/RM
* client.
*/
GlobalStatusAsyncCommitting GlobalStatus = 8

/**
* PHASE 2: Final Status: will NOT change any more.
*/
// Finally: global transaction is successfully committed.
GlobalStatusCommitted GlobalStatus = 9

/**
* The Commit failed.
*/
// Finally: failed to commit
GlobalStatusCommitFailed GlobalStatus = 10

/**
* The Rollbacked.
*/
// Finally: global transaction is successfully rollbacked.
GlobalStatusRollbacked GlobalStatus = 11

/**
* The Rollback failed.
*/
// Finally: failed to rollback
GlobalStatusRollbackFailed GlobalStatus = 12

/**
* The Timeout rollbacked.
*/
// Finally: global transaction is successfully rollbacked since timeout.
GlobalStatusTimeoutRollbacked GlobalStatus = 13

/**
* The Timeout rollback failed.
*/
// Finally: failed to rollback since timeout
GlobalStatusTimeoutRollbackFailed GlobalStatus = 14

/**
* The Finished.
*/
// Not managed in session MAP any more
GlobalStatusFinished GlobalStatus = 15
)

+ 2
- 2
pkg/protocol/message/message_apis.go View File

@@ -60,6 +60,6 @@ func (resp MergeResultMessage) GetTypeCode() MessageType {
type ResultCode byte

const (
ResultCodeFailed ResultCode = iota
ResultCodeSuccess
ResultCodeFailed = ResultCode(0)
ResultCodeSuccess = ResultCode(1)
)

+ 1
- 2
pkg/protocol/message/request_message.go View File

@@ -19,7 +19,6 @@ package message

import (
model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

type AbstractBranchEndRequest struct {
@@ -105,7 +104,7 @@ func (req GlobalLockQueryRequest) GetTypeCode() MessageType {
type GlobalReportRequest struct {
AbstractGlobalEndRequest

GlobalStatus transaction.GlobalStatus
GlobalStatus GlobalStatus
}

func (req GlobalReportRequest) GetTypeCode() MessageType {


+ 3
- 3
pkg/protocol/message/response_message.go View File

@@ -18,13 +18,13 @@
package message

import (
transaction2 "github.com/seata/seata-go/pkg/common/error"
model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/transaction"
)

type AbstractTransactionResponse struct {
AbstractResultMessage
TransactionExceptionCode transaction.TransactionExceptionCode
TransactionExceptionCode transaction2.TransactionExceptionCode
}

type AbstractBranchEndResponse struct {
@@ -36,7 +36,7 @@ type AbstractBranchEndResponse struct {

type AbstractGlobalEndResponse struct {
AbstractTransactionResponse
GlobalStatus transaction.GlobalStatus
GlobalStatus GlobalStatus
}

type BranchRegisterResponse struct {


+ 1
- 3
pkg/protocol/resource/resource.go View File

@@ -20,9 +20,7 @@ package resource
import (
"context"
"sync"
)

import (
"github.com/seata/seata-go/pkg/protocol/branch"
)

@@ -61,7 +59,7 @@ type ResourceManager interface {
// Unregister a Resource from the Resource Manager
UnregisterResource(resource Resource) error
// Get all resources managed by this manager
GetManagedResources() sync.Map
GetManagedResources() *sync.Map
// Get the BranchType
GetBranchType() branch.BranchType
}


+ 0
- 127
pkg/protocol/transaction/expection_code.go View File

@@ -1,127 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package transaction

type TransactionExceptionCode byte

const (
/**
* Unknown transaction exception code.
*/
TransactionExceptionCodeUnknown TransactionExceptionCode = iota

/**
* BeginFailed
*/
TransactionExceptionCodeBeginFailed

/**
* Lock key conflict transaction exception code.
*/
TransactionExceptionCodeLockKeyConflict

/**
* Io transaction exception code.
*/
IO

/**
* Branch rollback failed retriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedRetriable

/**
* Branch rollback failed unretriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedUnretriable

/**
* Branch register failed transaction exception code.
*/
TransactionExceptionCodeBranchRegisterFailed

/**
* Branch report failed transaction exception code.
*/
TransactionExceptionCodeBranchReportFailed

/**
* Lockable check failed transaction exception code.
*/
TransactionExceptionCodeLockableCheckFailed

/**
* Branch transaction not exist transaction exception code.
*/
TransactionExceptionCodeBranchTransactionNotExist

/**
* Global transaction not exist transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotExist

/**
* Global transaction not active transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotActive

/**
* Global transaction status invalid transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionStatusInvalid

/**
* Failed to send branch commit request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchCommitRequest

/**
* Failed to send branch rollback request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchRollbackRequest

/**
* Failed to add branch transaction exception code.
*/
TransactionExceptionCodeFailedToAddBranch

/**
* Failed to lock global transaction exception code.
*/
TransactionExceptionCodeFailedLockGlobalTranscation

/**
* FailedWriteSession
*/
TransactionExceptionCodeFailedWriteSession

/**
* Failed to holder exception code
*/
FailedStore
)

type TransactionException struct {
Code TransactionExceptionCode
Message string
}

//Error 隐式继承 builtin.error 接口
func (e TransactionException) Error() string {
return "TransactionException: " + e.Message
}

+ 0
- 119
pkg/protocol/transaction/transaction_status.go View File

@@ -1,119 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package transaction

type GlobalStatus int64

const (

/**
* Un known global status.
*/
// Unknown
GlobalStatusUnKnown GlobalStatus = 0

/**
* The GlobalStatusBegin.
*/
// PHASE 1: can accept new branch registering.
GlobalStatusBegin GlobalStatus = 1

/**
* PHASE 2: Running Status: may be changed any time.
*/
// Committing.
GlobalStatusCommitting GlobalStatus = 2

/**
* The Commit retrying.
*/
// Retrying commit after a recoverable failure.
GlobalStatusCommitRetrying GlobalStatus = 3

/**
* Rollbacking global status.
*/
// Rollbacking
GlobalStatusRollbacking GlobalStatus = 4

/**
* The Rollback retrying.
*/
// Retrying rollback after a recoverable failure.
GlobalStatusRollbackRetrying GlobalStatus = 5

/**
* The Timeout rollbacking.
*/
// Rollbacking since timeout
GlobalStatusTimeoutRollbacking GlobalStatus = 6

/**
* The Timeout rollback retrying.
*/
// Retrying rollback GlobalStatus = since timeout) after a recoverable failure.
GlobalStatusTimeoutRollbackRetrying GlobalStatus = 7

/**
* All branches can be async committed. The committing is NOT done yet, but it can be seen as committed for TM/RM
* client.
*/
GlobalStatusAsyncCommitting GlobalStatus = 8

/**
* PHASE 2: Final Status: will NOT change any more.
*/
// Finally: global transaction is successfully committed.
GlobalStatusCommitted GlobalStatus = 9

/**
* The Commit failed.
*/
// Finally: failed to commit
GlobalStatusCommitFailed GlobalStatus = 10

/**
* The Rollbacked.
*/
// Finally: global transaction is successfully rollbacked.
GlobalStatusRollbacked GlobalStatus = 11

/**
* The Rollback failed.
*/
// Finally: failed to rollback
GlobalStatusRollbackFailed GlobalStatus = 12

/**
* The Timeout rollbacked.
*/
// Finally: global transaction is successfully rollbacked since timeout.
GlobalStatusTimeoutRollbacked GlobalStatus = 13

/**
* The Timeout rollback failed.
*/
// Finally: failed to rollback since timeout
GlobalStatusTimeoutRollbackFailed GlobalStatus = 14

/**
* The Finished.
*/
// Not managed in session MAP any more
GlobalStatusFinished GlobalStatus = 15
)

+ 5
- 10
pkg/remoting/getty/getty_client.go View File

@@ -20,15 +20,10 @@ package getty
import (
"sync"
"time"
)

import (
"go.uber.org/atomic"
)

import (
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"go.uber.org/atomic"
)

var (
@@ -61,7 +56,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: msgType,
Codec: byte(codec.CodeTypeSeata),
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
@@ -72,7 +67,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error {
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: message.GettyRequestType_Response,
Codec: byte(codec.CodeTypeSeata),
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
@@ -83,7 +78,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: message.GettyRequestType_RequestSync,
Codec: byte(codec.CodeTypeSeata),
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}
@@ -94,7 +89,7 @@ func (client *GettyRemotingClient) SendSyncRequestWithTimeout(msg interface{}, t
rpcMessage := message.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: message.GettyRequestType_RequestSync,
Codec: byte(codec.CodeTypeSeata),
Codec: byte(codec.CodecTypeSeata),
Compressor: 0,
Body: msg,
}


+ 0
- 6
pkg/remoting/getty/getty_remoting.go View File

@@ -20,17 +20,11 @@ package getty
import (
"sync"
"time"
)

import (
getty "github.com/apache/dubbo-getty"

gxtime "github.com/dubbogo/gost/time"

"github.com/pkg/errors"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
)


+ 1
- 7
pkg/remoting/getty/listener.go View File

@@ -20,19 +20,13 @@ package getty
import (
"context"
"sync"
)

import (
getty "github.com/apache/dubbo-getty"

"go.uber.org/atomic"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/processor"
"go.uber.org/atomic"
)

var (


+ 0
- 7
pkg/remoting/getty/readwriter.go View File

@@ -19,17 +19,10 @@ package getty

import (
"fmt"
)

import (
getty "github.com/apache/dubbo-getty"

"github.com/fagongzi/goetty"

"github.com/pkg/errors"
)

import (
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
)


+ 51
- 0
pkg/remoting/getty/readwriter_test.go View File

@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestRpcPackageHandler(t *testing.T) {
msg := message.RpcMessage{
ID: 1123,
Type: message.GettyRequestType_RequestSync,
Codec: byte(codec.CodecTypeSeata),
Compressor: byte(1),
HeadMap: map[string]string{
"name": " Jack",
"age": "12",
"address": "Beijing",
},
Body: message.GlobalBeginRequest{
Timeout: 100,
TransactionName: "SeataGoTransaction",
},
}

codec := RpcPackageHandler{}
bytes, err := codec.Write(nil, msg)
assert.Nil(t, err)
msg2, _, _ := codec.Read(nil, bytes)

assert.Equal(t, msg, msg2)
}

+ 0
- 6
pkg/remoting/getty/rpc_client.go View File

@@ -22,17 +22,11 @@ import (
"fmt"
"net"
"sync"
)

import (
getty "github.com/apache/dubbo-getty"

gxsync "github.com/dubbogo/gost/sync"

"github.com/pkg/errors"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/config"
)


+ 0
- 2
pkg/remoting/getty/session_manager.go View File

@@ -21,9 +21,7 @@ import (
"sync"
"sync/atomic"
"time"
)

import (
getty "github.com/apache/dubbo-getty"
)



+ 0
- 2
pkg/remoting/processor/client/client_heart_beat_processon.go View File

@@ -19,9 +19,7 @@ package client

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/getty"


+ 6
- 2
pkg/remoting/processor/client/client_on_response_processor.go View File

@@ -19,11 +19,10 @@ package client

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"

getty2 "github.com/seata/seata-go/pkg/remoting/getty"
)

@@ -36,6 +35,11 @@ func init() {
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegRmResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalBeginResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalCommitResult, clientOnResponseProcessor)

getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalReportResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalRollbackResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_GlobalStatusResult, clientOnResponseProcessor)
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_RegCltResult, clientOnResponseProcessor)
}

type clientOnResponseProcessor struct {


+ 1
- 2
pkg/remoting/processor/client/rm_branch_commit_processor.go View File

@@ -19,11 +19,10 @@ package client

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"

getty2 "github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
)


+ 3
- 4
pkg/remoting/processor/client/rm_branch_rollback_processor.go View File

@@ -19,18 +19,17 @@ package client

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"

getty2 "github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
)

func init() {
rmBranchCommitProcessor := &rmBranchCommitProcessor{}
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchCommit, rmBranchCommitProcessor)
rmBranchRollbackProcessor := &rmBranchRollbackProcessor{}
getty2.GetGettyClientHandlerInstance().RegisterProcessor(message.MessageType_BranchRollback, rmBranchRollbackProcessor)
}

type rmBranchRollbackProcessor struct {


+ 0
- 2
pkg/remoting/processor/remoting_processor.go View File

@@ -19,9 +19,7 @@ package processor

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/protocol/message"
)



+ 0
- 2
pkg/rm/common/handler/rm_handler.go View File

@@ -19,9 +19,7 @@ package handler

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"


+ 0
- 2
pkg/rm/common/handler/rm_handler_facade.go View File

@@ -20,9 +20,7 @@ package handler
import (
"context"
"sync"
)

import (
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
)


+ 0
- 8
pkg/rm/common/remoting/rm_remoting.go View File

@@ -19,9 +19,7 @@ package remoting

import (
"sync"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
@@ -88,12 +86,6 @@ func (r *RMRemoting) RegisterResource(resource resource.Resource) error {
}

func isRegisterSuccess(response interface{}) bool {
//if res, ok := response.(protocol.RegisterTMResponse); ok {
// return res.Identified
//} else if res, ok := response.(protocol.RegisterRMResponse); ok {
// return res.Identified
//}
//return false
if res, ok := response.(message.RegisterRMResponse); ok {
return res.Identified
}


+ 2
- 5
pkg/rm/resource_manager.go View File

@@ -21,9 +21,7 @@ import (
"context"
"fmt"
"sync"
)

import (
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/resource"
)
@@ -48,7 +46,6 @@ func GetResourceManagerInstance() *ResourceManager {
type ResourceManager struct {
}

// 将事务管理器注册到这里
func RegisterResourceManager(resourceManager resource.ResourceManager) {
resourceManagerMap.Store(resourceManager.GetBranchType(), resourceManager)
}
@@ -97,8 +94,8 @@ func (d *ResourceManager) UnregisterResource(resource resource.Resource) error {
}

// Get all resources managed by this manager
func (d *ResourceManager) GetManagedResources() sync.Map {
return resourceManagerMap
func (d *ResourceManager) GetManagedResources() *sync.Map {
return &resourceManagerMap
}

// Get the model.BranchType


+ 4
- 0
pkg/rm/rm_client.go View File

@@ -0,0 +1,4 @@
package rm

func init() {
}

+ 1
- 1
pkg/rm/tcc/api/business_action_context.go View File

@@ -19,7 +19,7 @@ package api

type BusinessActionContext struct {
Xid string
BranchId string
BranchId int64
ActionName string
ActionContext interface{}
}

+ 3
- 5
pkg/rm/tcc/tcc_resource.go View File

@@ -21,9 +21,7 @@ import (
"context"
"fmt"
"sync"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
@@ -120,8 +118,8 @@ func (t *TCCResourceManager) RegisterResource(resource resource.Resource) error
return t.rmRemoting.RegisterResource(resource)
}

func (t *TCCResourceManager) GetManagedResources() sync.Map {
return t.resourceManagerMap
func (t *TCCResourceManager) GetManagedResources() *sync.Map {
return &t.resourceManagerMap
}

// Commit a branch transaction
@@ -144,7 +142,7 @@ func (t *TCCResourceManager) BranchCommit(ctx context.Context, ranchType branch.
func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64, resourceID string, applicationData []byte) api.BusinessActionContext {
return api.BusinessActionContext{
Xid: xid,
BranchId: string(branchID),
BranchId: branchID,
ActionName: resourceID,
// todo get ActionContext
//ActionContext:,


+ 10
- 15
pkg/rm/tcc/tcc_service.go View File

@@ -21,21 +21,16 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/seata/seata-go/pkg/tm"
"time"
)

import (
"github.com/pkg/errors"
)

import (
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/common/net"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/seatactx"
context2 "github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/rm"

api2 "github.com/seata/seata-go/pkg/rm/tcc/api"
)

@@ -76,7 +71,7 @@ func NewTCCServiceProxy(tccService TCCService) TCCService {
}

func (t *TCCServiceProxy) Prepare(ctx context.Context, param interface{}) error {
if seatactx.HasXID(ctx) {
if tm.HasXID(ctx) {
err := t.RegisteBranch(ctx, param)
if err != nil {
return err
@@ -87,7 +82,7 @@ func (t *TCCServiceProxy) Prepare(ctx context.Context, param interface{}) error

func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{}) error {
// register transaction branch
if !seatactx.HasXID(ctx) {
if !tm.HasXID(ctx) {
err := errors.New("BranchRegister error, xid should not be nil")
log.Errorf(err.Error())
return err
@@ -98,7 +93,7 @@ func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{})
tccContextStr, _ := json.Marshal(tccContext)

branchId, err := rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC).BranchRegister(
ctx, branch.BranchTypeTCC, t.GetActionName(), "", seatactx.GetXID(ctx), string(tccContextStr), "")
ctx, branch.BranchTypeTCC, t.GetActionName(), "", tm.GetXID(ctx), string(tccContextStr), "")
if err != nil {
err = errors.New(fmt.Sprintf("BranchRegister error: %v", err.Error()))
log.Error(err.Error())
@@ -106,18 +101,18 @@ func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{})
}

actionContext := &api2.BusinessActionContext{
Xid: seatactx.GetXID(ctx),
BranchId: string(branchId),
Xid: tm.GetXID(ctx),
BranchId: branchId,
ActionName: t.GetActionName(),
ActionContext: param,
}
seatactx.SetBusinessActionContext(ctx, actionContext)
tm.SetBusinessActionContext(ctx, actionContext)
return nil
}

func (t *TCCServiceProxy) GetTransactionInfo() context2.TransactionInfo {
func (t *TCCServiceProxy) GetTransactionInfo() tm.TransactionInfo {
// todo replace with config
return context2.TransactionInfo{
return tm.TransactionInfo{
TimeOut: 10000,
Name: t.GetActionName(),
//Propagation, Propagation


pkg/protocol/transaction/transaction.go → pkg/tm/constant.go View File

@@ -15,16 +15,35 @@
* limitations under the License.
*/

package transaction
package tm

type Propagation int8
import "github.com/seata/seata-go/pkg/protocol/message"

type TransactionInfo struct {
TimeOut int32
Name string
Propagation Propagation
LockRetryInternal int64
LockRetryTimes int64
const (
LAUNCHER GlobalTransactionRole = 0
PARTICIPANT GlobalTransactionRole = 1
)

type (
Propagation int8
GlobalTransactionRole int8
)

type TransactionManager interface {
// GlobalStatusBegin a new global transaction.
Begin(applicationId, transactionServiceGroup, name string, timeout int64) (string, error)

// Global commit.
Commit(xid string) (message.GlobalStatus, error)

//Global rollback.
Rollback(xid string) (message.GlobalStatus, error)

// Get current status of the give transaction.
GetStatus(xid string) (message.GlobalStatus, error)

// Global report.
GlobalReport(xid string, globalStatus message.GlobalStatus) (message.GlobalStatus, error)
}

const (
@@ -57,7 +76,7 @@ const (
* </pre></code>
* </p>
*/
REQUIRED Propagation = iota
REQUIRED = Propagation(0)

/**
* The REQUIRES_NEW.
@@ -90,7 +109,7 @@ const (
* </pre></code>
* </p>
*/
REQUIRES_NEW
REQUIRES_NEW = Propagation(1)

/**
* The NOT_SUPPORTED.
@@ -115,7 +134,7 @@ const (
* </pre></code>
* </p>
*/
NOT_SUPPORTED
NOT_SUPPORTED = Propagation(2)

/**
* The SUPPORTS.
@@ -136,7 +155,7 @@ const (
* </pre></code>
* </p>
*/
SUPPORTS
SUPPORTS = Propagation(3)

/**
* The NEVER.
@@ -156,7 +175,7 @@ const (
* </pre></code>
* </p>
*/
NEVER
NEVER = Propagation(4)

/**
* The MANDATORY.
@@ -176,5 +195,5 @@ const (
* </pre></code>
* </p>
*/
MANDATORY
MANDATORY = Propagation(5)
)

pkg/protocol/seatactx/context.go → pkg/tm/context.go View File

@@ -15,48 +15,51 @@
* limitations under the License.
*/

package seatactx
package tm

import (
"context"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm/tcc/api"
)

import (
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/rm/tcc/api"
type ContextParam string

const (
seataContextVariable = ContextParam("seataContextVariable")
tccBusinessActionContext = ContextParam("tcc-business-action-context")
)

type ContextVariable struct {
TxName string
Xid string
Status *transaction.GlobalStatus
TxRole *transaction.GlobalTransactionRole
Status *message.GlobalStatus
TxRole *GlobalTransactionRole
BusinessActionContext *api.BusinessActionContext
TxStatus *transaction.GlobalStatus
TxStatus *message.GlobalStatus
}

func InitSeataContext(ctx context.Context) context.Context {
return context.WithValue(ctx, common.CONTEXT_VARIABLE, &ContextVariable{})
return context.WithValue(ctx, seataContextVariable, &ContextVariable{})
}

func GetTxStatus(ctx context.Context) *transaction.GlobalStatus {
variable := ctx.Value(common.CONTEXT_VARIABLE)
func GetTxStatus(ctx context.Context) *message.GlobalStatus {
variable := ctx.Value(seataContextVariable)
if variable == nil {
return nil
}
return variable.(*ContextVariable).TxStatus
}

func SetTxStatus(ctx context.Context, status transaction.GlobalStatus) {
variable := ctx.Value(common.CONTEXT_VARIABLE)
func SetTxStatus(ctx context.Context, status message.GlobalStatus) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).TxStatus = &status
}
}

func GetTxName(ctx context.Context) string {
variable := ctx.Value(common.CONTEXT_VARIABLE)
variable := ctx.Value(seataContextVariable)
if variable == nil {
return ""
}
@@ -64,18 +67,18 @@ func GetTxName(ctx context.Context) string {
}

func SetTxName(ctx context.Context, name string) {
variable := ctx.Value(common.TccBusinessActionContext)
variable := ctx.Value(tccBusinessActionContext)
if variable != nil {
variable.(*ContextVariable).TxName = name
}
}

func IsSeataContext(ctx context.Context) bool {
return ctx.Value(common.CONTEXT_VARIABLE) != nil
return ctx.Value(seataContextVariable) != nil
}

func GetBusinessActionContext(ctx context.Context) *api.BusinessActionContext {
variable := ctx.Value(common.TccBusinessActionContext)
variable := ctx.Value(tccBusinessActionContext)
if variable == nil {
return nil
}
@@ -83,29 +86,29 @@ func GetBusinessActionContext(ctx context.Context) *api.BusinessActionContext {
}

func SetBusinessActionContext(ctx context.Context, businessActionContext *api.BusinessActionContext) {
variable := ctx.Value(common.TccBusinessActionContext)
variable := ctx.Value(tccBusinessActionContext)
if variable != nil {
variable.(*ContextVariable).BusinessActionContext = businessActionContext
}
}

func GetTransactionRole(ctx context.Context) *transaction.GlobalTransactionRole {
variable := ctx.Value(common.CONTEXT_VARIABLE)
func GetTransactionRole(ctx context.Context) *GlobalTransactionRole {
variable := ctx.Value(seataContextVariable)
if variable == nil {
return nil
}
return variable.(*ContextVariable).TxRole
}

func SetTransactionRole(ctx context.Context, role transaction.GlobalTransactionRole) {
variable := ctx.Value(common.CONTEXT_VARIABLE)
func SetTransactionRole(ctx context.Context, role GlobalTransactionRole) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).TxRole = &role
}
}

func GetXID(ctx context.Context) string {
variable := ctx.Value(common.CONTEXT_VARIABLE)
variable := ctx.Value(seataContextVariable)
if variable == nil {
return ""
}
@@ -117,14 +120,14 @@ func HasXID(ctx context.Context) bool {
}

func SetXID(ctx context.Context, xid string) {
variable := ctx.Value(common.CONTEXT_VARIABLE)
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).Xid = xid
}
}

func UnbindXid(ctx context.Context) {
variable := ctx.Value(common.CONTEXT_VARIABLE)
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).Xid = ""
}

pkg/protocol/transaction/manager/global_transaction_manager.go → pkg/tm/global_transaction.go View File

@@ -15,31 +15,23 @@
* limitations under the License.
*/

package manager
package tm

import (
"context"
"fmt"
"sync"
)

import (
"github.com/pkg/errors"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/protocol/seatactx"
"github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/tm/api"
)

type GlobalTransaction struct {
Xid string
Status transaction.GlobalStatus
Role transaction.GlobalTransactionRole
Status message.GlobalStatus
Role GlobalTransactionRole
}

var (
@@ -62,7 +54,7 @@ type GlobalTransactionManager struct {

// Begin a new global transaction with given timeout and given name.
func (g *GlobalTransactionManager) Begin(ctx context.Context, gtr *GlobalTransaction, timeout int32, name string) error {
if gtr.Role != transaction.LAUNCHER {
if gtr.Role != LAUNCHER {
log.Infof("Ignore GlobalStatusBegin(): just involved in global transaction %s", gtr.Xid)
return nil
}
@@ -85,15 +77,15 @@ func (g *GlobalTransactionManager) Begin(ctx context.Context, gtr *GlobalTransac
}
log.Infof("GlobalBeginRequest success, xid %s, res %v", gtr.Xid, res)

gtr.Status = transaction.GlobalStatusBegin
gtr.Status = message.GlobalStatusBegin
gtr.Xid = res.(message.GlobalBeginResponse).Xid
seatactx.SetXID(ctx, res.(message.GlobalBeginResponse).Xid)
SetXID(ctx, res.(message.GlobalBeginResponse).Xid)
return nil
}

// Commit the global transaction.
func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransaction) error {
if gtr.Role != transaction.LAUNCHER {
if gtr.Role != LAUNCHER {
log.Infof("Ignore Commit(): just involved in global gtr [{}]", gtr.Xid)
return nil
}
@@ -122,14 +114,14 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa
if err == nil && res != nil {
gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus
}
seatactx.UnbindXid(ctx)
UnbindXid(ctx)
log.Infof("GlobalCommitRequest commit success, xid %s", gtr.Xid)
return err
}

// Rollback the global transaction.
func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTransaction) error {
if gtr.Role != transaction.LAUNCHER {
if gtr.Role != LAUNCHER {
log.Infof("Ignore Commit(): just involved in global gtr [{}]", gtr.Xid)
return nil
}
@@ -158,21 +150,21 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran
if err == nil && res != nil {
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus
}
seatactx.UnbindXid(ctx)
UnbindXid(ctx)
return err
}

// Suspend the global transaction.
func (g *GlobalTransactionManager) Suspend() (api.SuspendedResourcesHolder, error) {
func (g *GlobalTransactionManager) Suspend() (SuspendedResourcesHolder, error) {
panic("implement me")
}

// Resume the global transaction.
func (g *GlobalTransactionManager) Resume(suspendedResourcesHolder api.SuspendedResourcesHolder) error {
func (g *GlobalTransactionManager) Resume(suspendedResourcesHolder SuspendedResourcesHolder) error {
panic("implement me")
}

// report the global transaction status.
func (g *GlobalTransactionManager) GlobalReport(globalStatus transaction.GlobalStatus) error {
func (g *GlobalTransactionManager) GlobalReport(globalStatus message.GlobalStatus) error {
panic("implement me")
}

pkg/tm/api/suspended_resources_holder.go → pkg/tm/suspended_resources_holder.go View File

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package api
package tm

type SuspendedResourcesHolder struct {
Xid string

pkg/protocol/transaction/api/api.go → pkg/tm/transaction_executor.go View File

@@ -15,58 +15,61 @@
* limitations under the License.
*/

package api
package tm

import (
"context"
"fmt"
)

import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/seatactx"
"github.com/seata/seata-go/pkg/protocol/transaction"
"github.com/seata/seata-go/pkg/protocol/transaction/manager"
"github.com/seata/seata-go/pkg/protocol/message"
)

type TransactionInfo struct {
TimeOut int32
Name string
Propagation Propagation
LockRetryInternal int64
LockRetryTimes int64
}

type TransactionalExecutor interface {
Execute(ctx context.Context, param interface{}) (interface{}, error)
GetTransactionInfo() transaction.TransactionInfo
GetTransactionInfo() TransactionInfo
}

func Begin(ctx context.Context, name string) context.Context {
if !seatactx.IsSeataContext(ctx) {
ctx = seatactx.InitSeataContext(ctx)
if !IsSeataContext(ctx) {
ctx = InitSeataContext(ctx)
}

seatactx.SetTxName(ctx, name)
if seatactx.GetTransactionRole(ctx) == nil {
seatactx.SetTransactionRole(ctx, transaction.LAUNCHER)
SetTxName(ctx, name)
if GetTransactionRole(ctx) == nil {
SetTransactionRole(ctx, LAUNCHER)
}

var tx *manager.GlobalTransaction
if seatactx.HasXID(ctx) {
tx = &manager.GlobalTransaction{
Xid: seatactx.GetXID(ctx),
Status: transaction.GlobalStatusBegin,
Role: transaction.PARTICIPANT,
var tx *GlobalTransaction
if HasXID(ctx) {
tx = &GlobalTransaction{
Xid: GetXID(ctx),
Status: message.GlobalStatusBegin,
Role: PARTICIPANT,
}
seatactx.SetTxStatus(ctx, transaction.GlobalStatusBegin)
SetTxStatus(ctx, message.GlobalStatusBegin)
}

// todo: Handle the transaction propagation.

if tx == nil {
tx = &manager.GlobalTransaction{
Xid: seatactx.GetXID(ctx),
Status: transaction.GlobalStatusUnKnown,
Role: transaction.LAUNCHER,
tx = &GlobalTransaction{
Xid: GetXID(ctx),
Status: message.GlobalStatusUnKnown,
Role: LAUNCHER,
}
seatactx.SetTxStatus(ctx, transaction.GlobalStatusUnKnown)
SetTxStatus(ctx, message.GlobalStatusUnKnown)
}

// todo timeout should read from config
err := manager.GetGlobalTransactionManager().Begin(ctx, tx, 50, name)
err := GetGlobalTransactionManager().Begin(ctx, tx, 50, name)
if err != nil {
panic(fmt.Sprintf("transactionTemplate: begin transaction failed, error %v", err))
}
@@ -76,20 +79,20 @@ func Begin(ctx context.Context, name string) context.Context {

// commit global transaction
func CommitOrRollback(ctx context.Context, err error) error {
tx := &manager.GlobalTransaction{
Xid: seatactx.GetXID(ctx),
Status: *seatactx.GetTxStatus(ctx),
Role: *seatactx.GetTransactionRole(ctx),
tx := &GlobalTransaction{
Xid: GetXID(ctx),
Status: *GetTxStatus(ctx),
Role: *GetTransactionRole(ctx),
}

var resp error
if err == nil {
resp = manager.GetGlobalTransactionManager().Commit(ctx, tx)
resp = GetGlobalTransactionManager().Commit(ctx, tx)
if resp != nil {
log.Infof("transactionTemplate: commit transaction failed, error %v", err)
}
} else {
resp = manager.GetGlobalTransactionManager().Rollback(ctx, tx)
resp = GetGlobalTransactionManager().Rollback(ctx, tx)
if resp != nil {
log.Infof("transactionTemplate: Rollback transaction failed, error %v", err)
}

+ 1
- 4
test/rpc_remoting_client_test.go View File

@@ -19,10 +19,7 @@ package test

import (
"testing"
"time"
)

import (
_ "github.com/seata/seata-go/pkg/imports"
)

@@ -40,5 +37,5 @@ func TestSendMsgWithResponse(test *testing.T) {
//}
//handler := GetGettyClientHandlerInstance()
//handler.sendMergedMessage(mergedMessage)
time.Sleep(100000 * time.Second)
//time.Sleep(100000 * time.Second)
}

+ 5
- 5
test/tcc_service_test.go View File

@@ -19,14 +19,14 @@ package test

import (
"context"
"github.com/seata/seata-go/pkg/tm"
"testing"
"time"
)

import (
"github.com/seata/seata-go/pkg/common/log"

_ "github.com/seata/seata-go/pkg/imports"
txapi "github.com/seata/seata-go/pkg/protocol/transaction/api"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/rm/tcc/api"
)
@@ -77,9 +77,9 @@ func (T TestTCCServiceBusiness2) GetActionName() string {

func TestNew(test *testing.T) {
var err error
ctx := txapi.Begin(context.Background(), "TestTCCServiceBusiness")
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := txapi.CommitOrRollback(ctx, err)
resp := tm.CommitOrRollback(ctx, err)
log.Infof("tx result %v", resp)
}()



Loading…
Cancel
Save