Browse Source

feature add fence for tcc, and add fence sample in tcc local mode. (#191)

tags/1.0.2-RC1
juzimao GitHub 3 years ago
parent
commit
1fc63e6da6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1750 additions and 172 deletions
  1. +1
    -0
      go.mod
  2. +1
    -0
      go.sum
  3. +0
    -137
      pkg/common/error/error.go
  4. +56
    -0
      pkg/common/errors/error.go
  5. +103
    -0
      pkg/common/errors/error_code.go
  6. +4
    -3
      pkg/protocol/codec/branch_commit_response_codec.go
  7. +3
    -4
      pkg/protocol/codec/branch_commit_response_codec_test.go
  8. +3
    -3
      pkg/protocol/codec/branch_register_response_codec.go
  9. +2
    -2
      pkg/protocol/codec/branch_register_response_codec_test.go
  10. +3
    -3
      pkg/protocol/codec/branch_rollback_response_codec.go
  11. +2
    -2
      pkg/protocol/codec/branch_rollback_response_codec_test.go
  12. +3
    -3
      pkg/protocol/codec/common_global_end_response_codec.go
  13. +3
    -3
      pkg/protocol/codec/global_begin_response_codec.go
  14. +3
    -3
      pkg/protocol/codec/global_begin_response_codec_test.go
  15. +2
    -2
      pkg/protocol/message/response_message.go
  16. +2
    -2
      pkg/remoting/processor/client/rm_branch_commit_processor.go
  17. +1
    -1
      pkg/remoting/processor/client/rm_branch_rollback_processor.go
  18. +41
    -0
      pkg/rm/tcc/fence/config/tcc_fence_config.go
  19. +35
    -0
      pkg/rm/tcc/fence/enum/fence_phase.go
  20. +35
    -0
      pkg/rm/tcc/fence/enum/fence_status.go
  21. +69
    -0
      pkg/rm/tcc/fence/fence_api.go
  22. +85
    -0
      pkg/rm/tcc/fence/fence_api_test.go
  23. +250
    -0
      pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go
  24. +69
    -0
      pkg/rm/tcc/fence/store/db/dao/store_api.go
  25. +186
    -0
      pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go
  26. +205
    -0
      pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go
  27. +46
    -0
      pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go
  28. +67
    -0
      pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go
  29. +19
    -2
      pkg/rm/tcc/tcc_resource.go
  30. +7
    -2
      pkg/rm/tcc/tcc_service.go
  31. +17
    -0
      pkg/tm/context.go
  32. +17
    -0
      pkg/tm/context_test.go
  33. +9
    -0
      sample/tcc/fence/README_ZH.md
  34. +54
    -0
      sample/tcc/fence/cmd/main.go
  35. +31
    -0
      sample/tcc/fence/script/mysql.sql
  36. +30
    -0
      sample/tcc/fence/script/oracle.sql
  37. +30
    -0
      sample/tcc/fence/script/postgresql.sql
  38. +256
    -0
      sample/tcc/fence/service/service.go

+ 1
- 0
go.mod View File

@@ -5,6 +5,7 @@ go 1.16
require (
dubbo.apache.org/dubbo-go/v3 v3.0.2-0.20220508105316-b27ec53b7bab
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.12.5


+ 1
- 0
go.sum View File

@@ -56,6 +56,7 @@ github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw=
github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=


+ 0
- 137
pkg/common/error/error.go View File

@@ -1,137 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package error

import "github.com/pkg/errors"

var (
ErrorTooManySessions = errors.New("too many seeessions")
ErrorHeartBeatTimeOut = errors.New("heart beat time out")
)

type TransactionExceptionCode byte

const (
/**
* Unknown transaction exception code.
*/
TransactionExceptionCodeUnknown = TransactionExceptionCode(0)

/**
* BeginFailed
*/
TransactionExceptionCodeBeginFailed = TransactionExceptionCode(1)

/**
* Lock key conflict transaction exception code.
*/
TransactionExceptionCodeLockKeyConflict = TransactionExceptionCode(2)

/**
* Io transaction exception code.
*/
IO = TransactionExceptionCode(3)
/**
* Branch rollback failed retriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedRetriable = TransactionExceptionCode(4)

/**
* Branch rollback failed unretriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedUnretriable = TransactionExceptionCode(5)

/**
* Branch register failed transaction exception code.
*/
TransactionExceptionCodeBranchRegisterFailed = TransactionExceptionCode(6)

/**
* Branch report failed transaction exception code.
*/
TransactionExceptionCodeBranchReportFailed = TransactionExceptionCode(7)

/**
* Lockable check failed transaction exception code.
*/
TransactionExceptionCodeLockableCheckFailed = TransactionExceptionCode(8)

/**
* Branch transaction not exist transaction exception code.
*/
TransactionExceptionCodeBranchTransactionNotExist = TransactionExceptionCode(9)

/**
* Global transaction not exist transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotExist = TransactionExceptionCode(10)

/**
* Global transaction not active transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotActive = TransactionExceptionCode(11)

/**
* Global transaction status invalid transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionStatusInvalid = TransactionExceptionCode(12)

/**
* Failed to send branch commit request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchCommitRequest = TransactionExceptionCode(13)

/**
* Failed to send branch rollback request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchRollbackRequest = TransactionExceptionCode(14)

/**
* Failed to add branch transaction exception code.
*/
TransactionExceptionCodeFailedToAddBranch = TransactionExceptionCode(15)

/**
* Failed to lock global transaction exception code.
*/
TransactionExceptionCodeFailedLockGlobalTranscation = TransactionExceptionCode(16)

/**
* FailedWriteSession
*/
TransactionExceptionCodeFailedWriteSession = TransactionExceptionCode(17)

/**
* Failed to holder exception code
*/
FailedStore = TransactionExceptionCode(18)

/**
* Lock key conflict fail fast transaction exception code.
*/
LockKeyConflictFailFast = TransactionExceptionCode(19)
)

type TransactionException struct {
Code TransactionExceptionCode
Message string
}

func (e TransactionException) Error() string {
return "TransactionException: " + e.Message
}

+ 56
- 0
pkg/common/errors/error.go View File

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package errors

import (
"fmt"

"github.com/pkg/errors"
)

var (
ErrorTooManySessions = errors.New("too many sessions")
ErrorHeartBeatTimeOut = errors.New("heart beat time out")
)

type TransactionError struct {
Code byte
Message string
}

func (e TransactionError) Error() string {
return fmt.Sprintf("TransactionError code %d, msg %s", e.Code, e.Message)
}

type TccFenceError struct {
Code TransactionErrorCode
Message string
Parent error
}

func (e TccFenceError) Error() string {
return fmt.Sprintf("TccFenceError code %d, msg %s, parent msg is %s", e.Code, e.Message, e.Parent)
}

func NewTccFenceError(code TransactionErrorCode, msg string, parent error) *TccFenceError {
return &TccFenceError{
code,
msg,
parent,
}
}

+ 103
- 0
pkg/common/errors/error_code.go View File

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package errors

type TransactionErrorCode int32

const (
// TransactionErrorCodeUnknown Unknown transaction errors code.
TransactionErrorCodeUnknown = TransactionErrorCode(0)

// TransactionErrorCodeBeginFailed BeginFailed
TransactionErrorCodeBeginFailed = TransactionErrorCode(1)

// TransactionErrorCodeLockKeyConflict Lock key conflict transaction errors code.
TransactionErrorCodeLockKeyConflict = TransactionErrorCode(2)

// Io transaction errors code.
IO = TransactionErrorCode(3)

// TransactionErrorCodeBranchRollbackFailedRetriable Branch rollback failed retriable transaction errors code.
TransactionErrorCodeBranchRollbackFailedRetriable = TransactionErrorCode(4)

// TransactionErrorCodeBranchRollbackFailedUnretriable Branch rollback failed unretriable transaction errors code.
TransactionErrorCodeBranchRollbackFailedUnretriable = TransactionErrorCode(5)

// TransactionErrorCodeBranchRegisterFailed Branch register failed transaction errors code.
TransactionErrorCodeBranchRegisterFailed = TransactionErrorCode(6)

// TransactionErrorCodeBranchReportFailed Branch report failed transaction errors code.
TransactionErrorCodeBranchReportFailed = TransactionErrorCode(7)

// TransactionErrorCodeLockableCheckFailed Lockable check failed transaction errors code.
TransactionErrorCodeLockableCheckFailed = TransactionErrorCode(8)

// TransactionErrorCodeBranchTransactionNotExist Branch transaction not exist transaction errors code.
TransactionErrorCodeBranchTransactionNotExist = TransactionErrorCode(9)

// TransactionErrorCodeGlobalTransactionNotExist Global transaction not exist transaction errors code.
TransactionErrorCodeGlobalTransactionNotExist = TransactionErrorCode(10)

// TransactionErrorCodeGlobalTransactionNotActive Global transaction not active transaction errors code.
TransactionErrorCodeGlobalTransactionNotActive = TransactionErrorCode(11)

// TransactionErrorCodeGlobalTransactionStatusInvalid Global transaction status invalid transaction errors code.
TransactionErrorCodeGlobalTransactionStatusInvalid = TransactionErrorCode(12)

// TransactionErrorCodeFailedToSendBranchCommitRequest Failed to send branch commit request transaction errors code.
TransactionErrorCodeFailedToSendBranchCommitRequest = TransactionErrorCode(13)

// TransactionErrorCodeFailedToSendBranchRollbackRequest Failed to send branch rollback request transaction errors code.
TransactionErrorCodeFailedToSendBranchRollbackRequest = TransactionErrorCode(14)

// TransactionErrorCodeFailedToAddBranch Failed to add branch transaction errors code.
TransactionErrorCodeFailedToAddBranch = TransactionErrorCode(15)

// TransactionErrorCodeFailedLockGlobalTranscation Failed to lock global transaction errors code.
TransactionErrorCodeFailedLockGlobalTranscation = TransactionErrorCode(16)

// TransactionErrorCodeFailedWriteSession FailedWriteSession
TransactionErrorCodeFailedWriteSession = TransactionErrorCode(17)

// FailedStore Failed to holder errors code
FailedStore = TransactionErrorCode(18)

// LockKeyConflictFailFast Lock key conflict fail fast transaction exception code.
LockKeyConflictFailFast = TransactionErrorCode(19)

// TccFenceDbDuplicateKeyError Insert tcc fence record duplicate key errors
TccFenceDbDuplicateKeyError = TransactionErrorCode(20)

// RollbackFenceError rollback tcc fence error
RollbackFenceError = TransactionErrorCode(21)

// CommitFenceError commit tcc fence error
CommitFenceError = TransactionErrorCode(22)

// TccFenceDbError query tcc fence prepare sql failed
TccFenceDbError = TransactionErrorCode(23)

// PrepareFenceError prepare tcc fence error
PrepareFenceError = TransactionErrorCode(24)

// FenceBusinessError callback business method maybe return this error type
FenceBusinessError = TransactionErrorCode(26)

// FencePhaseError have fence phase but is not illegal value
FencePhaseError = TransactionErrorCode(27)
)

+ 4
- 3
pkg/protocol/codec/branch_commit_response_codec.go View File

@@ -20,8 +20,9 @@ package codec
import (
"math"

serror "github.com/seata/seata-go/pkg/common/errors"

"github.com/seata/seata-go/pkg/common/bytes"
serror "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
)
@@ -41,7 +42,7 @@ func (g *BranchCommitResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
data.BranchId = int64(bytes.ReadUInt64(buf))
data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf))
@@ -61,7 +62,7 @@ func (g *BranchCommitResponseCodec) Encode(in interface{}) []byte {
}
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
buf.WriteByte(byte(data.TransactionErrorCode))
bytes.WriteString16Length(data.Xid, buf)
buf.WriteInt64(data.BranchId)
buf.WriteByte(byte(data.BranchStatus))


+ 3
- 4
pkg/protocol/codec/branch_commit_response_codec_test.go View File

@@ -20,12 +20,11 @@ package codec
import (
"testing"

serror "github.com/seata/seata-go/pkg/common/error"
"github.com/stretchr/testify/assert"

serror "github.com/seata/seata-go/pkg/common/errors"
model2 "github.com/seata/seata-go/pkg/protocol/branch"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestBranchCommitResponseCodec(t *testing.T) {
@@ -35,7 +34,7 @@ func TestBranchCommitResponseCodec(t *testing.T) {
BranchId: 56678,
BranchStatus: model2.BranchStatusPhaseoneFailed,
AbstractTransactionResponse: message.AbstractTransactionResponse{
TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed,
TransactionErrorCode: serror.TransactionErrorCodeBeginFailed,
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",


+ 3
- 3
pkg/protocol/codec/branch_register_response_codec.go View File

@@ -21,7 +21,7 @@ import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
serror "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -40,7 +40,7 @@ func (g *BranchRegisterResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf))
data.BranchId = int64(bytes.ReadUInt64(buf))

return data
@@ -58,7 +58,7 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte {
}
bytes.WriteString16Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
buf.WriteByte(byte(data.TransactionErrorCode))
buf.WriteInt64(data.BranchId)

return buf.Bytes()


+ 2
- 2
pkg/protocol/codec/branch_register_response_codec_test.go View File

@@ -20,7 +20,7 @@ package codec
import (
"testing"

serror "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/errors"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
@@ -33,7 +33,7 @@ func TestBranchRegisterResponseCodec(t *testing.T) {
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
TransactionExceptionCode: serror.TransactionExceptionCodeUnknown,
TransactionErrorCode: serror.TransactionErrorCodeUnknown,
},
BranchId: 124356567,
}


+ 3
- 3
pkg/protocol/codec/branch_rollback_response_codec.go View File

@@ -21,7 +21,7 @@ import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
serror "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
)
@@ -41,7 +41,7 @@ func (g *BranchRollbackResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
data.BranchId = int64(bytes.ReadUInt64(buf))
data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf))
@@ -61,7 +61,7 @@ func (g *BranchRollbackResponseCodec) Encode(in interface{}) []byte {
}
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
buf.WriteByte(byte(data.TransactionErrorCode))
bytes.WriteString16Length(data.Xid, buf)
buf.WriteInt64(data.BranchId)
buf.WriteByte(byte(data.BranchStatus))


+ 2
- 2
pkg/protocol/codec/branch_rollback_response_codec_test.go View File

@@ -20,7 +20,7 @@ package codec
import (
"testing"

serror "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/errors"

model2 "github.com/seata/seata-go/pkg/protocol/branch"

@@ -35,7 +35,7 @@ func TestBranchRollbackResponseCodec(t *testing.T) {
BranchId: 56678,
BranchStatus: model2.BranchStatusPhaseoneFailed,
AbstractTransactionResponse: message.AbstractTransactionResponse{
TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed,
TransactionErrorCode: serror.TransactionErrorCodeBeginFailed,
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",


+ 3
- 3
pkg/protocol/codec/common_global_end_response_codec.go View File

@@ -21,7 +21,7 @@ import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
serror "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -40,7 +40,7 @@ func (c *CommonGlobalEndResponseCodec) Encode(in interface{}) []byte {
}
bytes.WriteString16Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
buf.WriteByte(byte(data.TransactionErrorCode))
buf.WriteByte(byte(data.GlobalStatus))

return buf.Bytes()
@@ -54,7 +54,7 @@ func (c *CommonGlobalEndResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf))
data.GlobalStatus = message.GlobalStatus(bytes.ReadByte(buf))

return data


+ 3
- 3
pkg/protocol/codec/global_begin_response_codec.go View File

@@ -21,7 +21,7 @@ import (
"math"

"github.com/seata/seata-go/pkg/common/bytes"
serror "github.com/seata/seata-go/pkg/common/error"
serror "github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/protocol/message"
)

@@ -44,7 +44,7 @@ func (c *GlobalBeginResponseCodec) Encode(in interface{}) []byte {
}
bytes.WriteString16Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionExceptionCode))
buf.WriteByte(byte(data.TransactionErrorCode))
bytes.WriteString16Length(data.Xid, buf)
bytes.WriteString16Length(string(data.ExtraData), buf)

@@ -59,7 +59,7 @@ func (g *GlobalBeginResponseCodec) Decode(in []byte) interface{} {
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString16Length(buf)
}
data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf))
data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf))
data.Xid = bytes.ReadString16Length(buf)
data.ExtraData = []byte(bytes.ReadString16Length(buf))



+ 3
- 3
pkg/protocol/codec/global_begin_response_codec_test.go View File

@@ -20,10 +20,10 @@ package codec
import (
"testing"

serror "github.com/seata/seata-go/pkg/common/error"
"github.com/stretchr/testify/assert"

serror "github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestGlobalBeginResponseCodec(t *testing.T) {
@@ -33,7 +33,7 @@ func TestGlobalBeginResponseCodec(t *testing.T) {
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed,
TransactionErrorCode: serror.TransactionErrorCodeBeginFailed,
},

Xid: "test-transaction-id",


+ 2
- 2
pkg/protocol/message/response_message.go View File

@@ -18,13 +18,13 @@
package message

import (
serror "github.com/seata/seata-go/pkg/common/error"
"github.com/seata/seata-go/pkg/common/errors"
model2 "github.com/seata/seata-go/pkg/protocol/branch"
)

type AbstractTransactionResponse struct {
AbstractResultMessage
TransactionExceptionCode serror.TransactionExceptionCode
TransactionErrorCode errors.TransactionErrorCode
}

type AbstractBranchEndResponse struct {


+ 2
- 2
pkg/remoting/processor/client/rm_branch_commit_processor.go View File

@@ -52,7 +52,7 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag

status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, branchResource)
if err != nil {
log.Infof("branch commit error: %s", err.Error())
log.Errorf("branch commit error: %s", err.Error())
return err
}
log.Infof("branch commit success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)
@@ -69,7 +69,7 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag
}

// reply commit response to tc server
// todo add TransactionExceptionCode
// todo add TransactionErrorCode
response := message.BranchCommitResponse{
AbstractBranchEndResponse: message.AbstractBranchEndResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{


+ 1
- 1
pkg/remoting/processor/client/rm_branch_rollback_processor.go View File

@@ -52,7 +52,7 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess
}
status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, branchResource)
if err != nil {
log.Infof("branch rollback error: %s", err.Error())
log.Errorf("branch rollback error: %s", err.Error())
return err
}
log.Infof("branch rollback success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)


+ 41
- 0
pkg/rm/tcc/fence/config/tcc_fence_config.go View File

@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"go.uber.org/atomic"

"github.com/seata/seata-go/pkg/rm/tcc/fence/handler"
)

type TccFenceConfig struct {
Initialized atomic.Bool `default:"false"`
LogTableName string `default:"tcc_fence_log"`
}

func InitFence() {
// todo implement
}

func InitCleanTask() {
handler.GetFenceHandler().InitLogCleanChannel()
}

func Destroy() {
handler.GetFenceHandler().DestroyLogCleanChannel()
}

+ 35
- 0
pkg/rm/tcc/fence/enum/fence_phase.go View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package enum

// FencePhase used to mark the stage of a tcc transaction
type FencePhase byte

const (
// FencePhaseNotExist fence phase not exist
FencePhaseNotExist = FencePhase(0)

// FencePhasePrepare prepare fence phase
FencePhasePrepare = FencePhase(1)

// FencePhaseCommit commit fence phase
FencePhaseCommit = FencePhase(2)

// FencePhaseRollback rollback fence phase
FencePhaseRollback = FencePhase(3)
)

+ 35
- 0
pkg/rm/tcc/fence/enum/fence_status.go View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package enum

// FenceStatus Used to mark the state of a branch transaction
type FenceStatus byte

const (
// StatusTried phase 1: the commit tried.
StatusTried = FenceStatus(1)

// StatusCommitted phase 2: the committed.
StatusCommitted = FenceStatus(2)

// StatusRollbacked phase 2: the rollbacked.
StatusRollbacked = FenceStatus(3)

// StatusSuspended suspended status.
StatusSuspended = FenceStatus(4)
)

+ 69
- 0
pkg/rm/tcc/fence/fence_api.go View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"context"
"database/sql"
"fmt"

"github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/rm/tcc/fence/handler"
"github.com/seata/seata-go/pkg/tm"
)

// WithFence This method is a suspended API interface that asserts the phase timing of a transaction
// and performs corresponding database operations to ensure transaction consistency
// case 1: if fencePhase is FencePhaseNotExist, will return a fence not found error.
// case 2: if fencePhase is FencePhasePrepare, will do prepare fence operation.
// case 3: if fencePhase is FencePhaseCommit, will do commit fence operation.
// case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation.
// case 5: if fencePhase not in above case, will return a fence phase illegal error.
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
fp := tm.GetFencePhase(ctx)
h := handler.GetFenceHandler()

switch {
case fp == enum.FencePhaseNotExist:
err = errors.NewTccFenceError(
errors.FencePhaseError,
fmt.Sprintf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx)),
nil,
)
case fp == enum.FencePhasePrepare:
err = h.PrepareFence(ctx, tx, callback)
case fp == enum.FencePhaseCommit:
err = h.CommitFence(ctx, tx, callback)
case fp == enum.FencePhaseRollback:
err = h.RollbackFence(ctx, tx, callback)
default:
err = errors.NewTccFenceError(
errors.FencePhaseError,
fmt.Sprintf("fence phase: %v illegal", fp),
nil,
)
}

if err != nil {
log.Error(err)
}

return
}

+ 85
- 0
pkg/rm/tcc/fence/fence_api_test.go View File

@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"context"
"database/sql"
"fmt"
"testing"

"github.com/seata/seata-go/pkg/common/errors"

"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"

"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/tm"
)

func TestWithFence(t *testing.T) {
tests := []struct {
xid string
txName string
fencePhase enum.FencePhase
bac tm.BusinessActionContext
callback func() error
wantErr bool
errStr string
wantCommit bool
wantRollback bool
}{
{
xid: "123",
txName: "test",
callback: func() error {
return nil
},
wantErr: true,
errStr: errors.NewTccFenceError(
errors.FencePhaseError,
fmt.Sprintf("xid 123, tx name test, fence phase not exist"),
nil,
).Error(),
},
}

for _, v := range tests {
db, mock, _ := sqlmock.New()
mock.ExpectBegin()
if v.wantCommit {
mock.ExpectCommit()
}
if v.wantRollback {
mock.ExpectRollback()
}
ctx := context.Background()
ctx = tm.InitSeataContext(ctx)
tm.SetXID(ctx, v.xid)
tm.SetTxName(ctx, v.txName)
tm.SetFencePhase(ctx, v.fencePhase)
tm.SetBusinessActionContext(ctx, &v.bac)
tx, _ := db.BeginTx(ctx, &sql.TxOptions{})

if v.wantErr {
assert.Equal(t, v.errStr, WithFence(ctx, tx, v.callback).Error())
} else {
assert.Nil(t, WithFence(ctx, tx, v.callback))
}
}
}

+ 250
- 0
pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go View File

@@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package handler

import (
"container/list"
"context"
"database/sql"
"fmt"
"sync"
"time"

seataErrors "github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/dao"
"github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model"
"github.com/seata/seata-go/pkg/tm"
)

type tccFenceWrapperHandler struct {
tccFenceDao dao.TCCFenceStore
logQueue chan *FenceLogIdentity
logCache list.List
logQueueOnce sync.Once
logQueueCloseOnce sync.Once
}

type FenceLogIdentity struct {
xid string
branchId int64
}

const (
maxQueueSize = 500
)

var (
fenceHandler *tccFenceWrapperHandler
fenceOnce sync.Once
)

func GetFenceHandler() *tccFenceWrapperHandler {
if fenceHandler == nil {
fenceOnce.Do(func() {
fenceHandler = &tccFenceWrapperHandler{
tccFenceDao: dao.GetTccFenceStoreDatabaseMapper(),
}
})
}
return fenceHandler
}

func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName

err := handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusTried)
if err != nil {
dbError, ok := err.(seataErrors.TccFenceError)
if ok && dbError.Code == seataErrors.TccFenceDbDuplicateKeyError {
// todo add clean command to channel.
handler.pushCleanChannel(xid, branchId)
}

return seataErrors.NewTccFenceError(
seataErrors.PrepareFenceError,
fmt.Sprintf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d", xid, branchId),
err,
)
}

log.Info("the phase 1 callback method will be called.")
err = callback()
if err != nil {
return seataErrors.NewTccFenceError(
seataErrors.FenceBusinessError,
fmt.Sprintf("the business method error msg of: %p", callback),
err,
)
}

return nil
}

func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId

fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
if err != nil {
return seataErrors.NewTccFenceError(seataErrors.CommitFenceError,
fmt.Sprintf(" commit fence method failed. xid= %s, branchId= %d ", xid, branchId),
err,
)
}
if fenceDo == nil {
return seataErrors.NewTccFenceError(seataErrors.CommitFenceError,
fmt.Sprintf("tcc fence record not exists, commit fence method failed. xid= %s, branchId= %d ", xid, branchId),
err,
)
}

if fenceDo.Status == enum.StatusCommitted {
log.Infof("branch transaction has already committed before. idempotency rejected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return nil
}
if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
// enable warn level
log.Warnf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
return seataErrors.NewTccFenceError(seataErrors.CommitFenceError,
fmt.Sprintf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status),
nil,
)
}

return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusCommitted)
}

func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName
fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)

if err != nil {
return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError,
fmt.Sprintf(" rollback fence method failed. xid= %s, branchId= %d ", xid, branchId),
err,
)
}

// record is null, mean the need suspend
if fenceDo == nil {
err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended)
if err != nil {
return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError,
fmt.Sprintf("insert tcc fence suspend record error, rollback fence method failed. xid= %s, branchId= %d", xid, branchId),
err,
)
}
log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId)
return nil
}

// have rollbacked or suspended
if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
// enable warn level
log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
return nil
}
if fenceDo.Status == enum.StatusCommitted {
log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError,
fmt.Sprintf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status),
err,
)
}

return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusRollbacked)
}

func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error {
tccFenceDo := model.TCCFenceDO{
Xid: xid,
BranchId: branchId,
ActionName: actionName,
Status: status,
}
return handler.tccFenceDao.InsertTCCFenceDO(tx, &tccFenceDo)
}

func (handler *tccFenceWrapperHandler) updateFenceStatusAndInvokeCallback(tx *sql.Tx, callback func() error, xid string, branchId int64, status enum.FenceStatus) error {
if err := handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status); err != nil {
return err
}

log.Infof("the phase %d callback method will be called", status)
if err := callback(); err != nil {
return seataErrors.NewTccFenceError(
seataErrors.FenceBusinessError,
fmt.Sprintf("the business method error msg of: %p", callback),
err,
)
}

return nil
}

func (handler *tccFenceWrapperHandler) InitLogCleanChannel() {
handler.logQueueOnce.Do(func() {
go handler.traversalCleanChannel()
})
}

func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() {
handler.logQueueCloseOnce.Do(func() {
close(handler.logQueue)
})
}

func (handler *tccFenceWrapperHandler) deleteFence(xid string, id int64) error {
// todo implement
return nil
}

func (handler *tccFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 {
// todo implement
return 0
}

func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) {
// todo implement
fli := &FenceLogIdentity{
xid: xid,
branchId: branchId,
}
select {
case handler.logQueue <- fli:
// todo add batch delete from log cache.
default:
handler.logCache.PushBack(fli)
}
log.Infof("add one log to clean queue: %v ", fli)
}

func (handler *tccFenceWrapperHandler) traversalCleanChannel() {
handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize)
for li := range handler.logQueue {
if err := handler.deleteFence(li.xid, li.branchId); err != nil {
log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId)
}
}
}

+ 69
- 0
pkg/rm/tcc/fence/store/db/dao/store_api.go View File

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dao

import (
"database/sql"
"time"

"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"

"github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model"
)

// The TCC Fence Store
type TCCFenceStore interface {

// QueryTCCFenceDO tcc fence do.
// param tx the tx will bind with user business method
// param xid the global transaction id
// param branchId the branch transaction id
// return the tcc fence do and error msg
QueryTCCFenceDO(tx *sql.Tx, xid string, branchId int64) (*model.TCCFenceDO, error)

// InsertTCCFenceDO tcc fence do boolean.
// param tx the tx will bind with user business method
// param tccFenceDO the tcc fence do
// return the error msg
InsertTCCFenceDO(tx *sql.Tx, tccFenceDo *model.TCCFenceDO) error

// UpdateTCCFenceDO tcc fence do boolean.
// param tx the tx will bind with user business method
// param xid the global transaction id
// param branchId the branch transaction id
// param newStatus the new status
// return the error msg
UpdateTCCFenceDO(tx *sql.Tx, xid string, branchId int64, oldStatus enum.FenceStatus, newStatus enum.FenceStatus) error

// DeleteTCCFenceDO tcc fence do boolean.
// param tx the tx will bind with user business method
// param xid the global transaction id
// param branchId the branch transaction id
// return the error msg
DeleteTCCFenceDO(tx *sql.Tx, xid string, branchId int64) error

// DeleteTCCFenceDOByMdfDate tcc fence by datetime.
// param tx the tx will bind with user business method
// param datetime modify time
// return the error msg
DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error

// SetLogTableName LogTable Name
// param logTableName logTableName
SetLogTableName(logTable string)
}

+ 186
- 0
pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go View File

@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dao

import (
"context"
"database/sql"
"fmt"
"sync"
"time"

"github.com/go-sql-driver/mysql"

"github.com/seata/seata-go/pkg/common/errors"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model"
sql2 "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/sql"
)

var (
once sync.Once
tccFenceStoreDatabaseMapper *TccFenceStoreDatabaseMapper
)

func GetTccFenceStoreDatabaseMapper() *TccFenceStoreDatabaseMapper {
if tccFenceStoreDatabaseMapper == nil {
once.Do(func() {
tccFenceStoreDatabaseMapper = &TccFenceStoreDatabaseMapper{}
tccFenceStoreDatabaseMapper.InitLogTableName()
})
}
return tccFenceStoreDatabaseMapper

}

func (t *TccFenceStoreDatabaseMapper) InitLogTableName() {
// todo get log table name from config
// set log table name
// default name is tcc_fence_log
t.logTableName = "tcc_fence_log"
}

type TccFenceStoreDatabaseMapper struct {
logTableName string
}

func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, branchId int64) (*model.TCCFenceDO, error) {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetQuerySQLByBranchIdAndXid(t.logTableName))
if err != nil {
return nil, errors.NewTccFenceError(errors.TccFenceDbError, "query tcc fence prepare sql failed", err)
}
defer prepareStmt.Close()

result := prepareStmt.QueryRow(xid, branchId)
var (
actionName string
status enum.FenceStatus
gmtCreate time.Time
gmtModify time.Time
)

if err = result.Scan(&xid, &branchId, &actionName, &status, &gmtCreate, &gmtModify); err != nil {
// will return error, if rows is empty
if err.Error() == "sql: no rows in result set" {
return nil, errors.NewTccFenceError(errors.TccFenceDbError, "query tcc fence get scan row,no rows in result set", err)
} else {
return nil, errors.NewTccFenceError(errors.TccFenceDbError, "query tcc fence get scan row failed", err)
}
}

tccFenceDo := &model.TCCFenceDO{
Xid: xid,
BranchId: branchId,
ActionName: actionName,
Status: status,
GmtModified: gmtModify,
GmtCreate: gmtCreate,
}
return tccFenceDo, nil
}

func (t *TccFenceStoreDatabaseMapper) InsertTCCFenceDO(tx *sql.Tx, tccFenceDo *model.TCCFenceDO) error {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetInsertLocalTCCLogSQL(t.logTableName))
if err != nil {
return errors.NewTccFenceError(errors.TccFenceDbError, "insert tcc fence prepare sql failed", err)
}
defer prepareStmt.Close()

timeNow := time.Now()
result, err := prepareStmt.Exec(tccFenceDo.Xid, tccFenceDo.BranchId, tccFenceDo.ActionName, tccFenceDo.Status, timeNow, timeNow)
if err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); ok && mysqlError.Number == 1062 {
return errors.NewTccFenceError(errors.TccFenceDbDuplicateKeyError,
fmt.Sprintf("Insert tcc fence record duplicate key exception. xid= %s, branchId= %d", tccFenceDo.Xid, tccFenceDo.BranchId),
err)
} else {
return errors.NewTccFenceError(errors.TccFenceDbError, "insert tcc fence exec sql failed", err)
}
}

affected, err := result.RowsAffected()
if err != nil || affected == 0 {
return errors.NewTccFenceError(errors.TccFenceDbError, "insert tcc fence get row affect failed", err)
}

return nil
}

func (t *TccFenceStoreDatabaseMapper) UpdateTCCFenceDO(tx *sql.Tx, xid string, branchId int64, oldStatus enum.FenceStatus, newStatus enum.FenceStatus) error {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetUpdateStatusSQLByBranchIdAndXid(t.logTableName))
if err != nil {
return errors.NewTccFenceError(errors.TccFenceDbError, "update tcc fence prepare sql failed", err)
}
defer prepareStmt.Close()

result, err := prepareStmt.Exec(newStatus, time.Now(), xid, branchId, oldStatus)
if err != nil {
return errors.NewTccFenceError(errors.TccFenceDbError, "update tcc fence exec sql failed", err)
}

affected, err := result.RowsAffected()
if err != nil || affected == 0 {
return errors.NewTccFenceError(errors.TccFenceDbError, "update tcc fence get row affect failed", err)
}

return nil
}

func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDO(tx *sql.Tx, xid string, branchId int64) error {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByBranchIdAndXid(t.logTableName))
if err != nil {
return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence prepare sql failed ", err)
}
defer prepareStmt.Close()

result, err := prepareStmt.Exec(xid, branchId)
if err != nil {
return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence execute sql failed", err)
}

affected, err := result.RowsAffected()
if err != nil || affected == 0 {
return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence get rows affected failed", err)
}

return nil
}

func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByMdfDateAndStatus(t.logTableName))
if err != nil {
return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence prepare sql failed", err)
}
defer prepareStmt.Close()

result, err := prepareStmt.Exec(datetime)
if err != nil {
return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence exec sql failed", err)
}

affected, err := result.RowsAffected()
if err != nil || affected == 0 {
return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence get rows affected failed", err)
}

return nil
}

func (t *TccFenceStoreDatabaseMapper) SetLogTableName(logTable string) {
t.logTableName = logTable
}

+ 205
- 0
pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go View File

@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dao

import (
"context"
"database/sql"
"database/sql/driver"
"math"
"reflect"
"testing"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"

"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model"
sql2 "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/sql"
)

func TestTccFenceStoreDatabaseMapper_SetLogTableName(t *testing.T) {
GetTccFenceStoreDatabaseMapper().SetLogTableName("tcc_fence_log")
value := reflect.ValueOf(GetTccFenceStoreDatabaseMapper())
assert.Equal(t, "tcc_fence_log", value.Elem().FieldByName("logTableName").String())
}

func TestTccFenceStoreDatabaseMapper_InsertTCCFenceDO(t *testing.T) {
tccFenceDo := &model.TCCFenceDO{
Xid: "123123124124",
BranchId: 12312312312,
ActionName: "fence_test",
Status: enum.StatusSuspended,
}
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))

if err != nil {
t.Fatalf("open db failed msg: %v", err)
}
defer db.Close()

mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetInsertLocalTCCLogSQL("tcc_fence_log")).
ExpectExec().
WithArgs(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId), driver.Value(tccFenceDo.ActionName),
driver.Value(tccFenceDo.Status), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}

err = GetTccFenceStoreDatabaseMapper().InsertTCCFenceDO(tx, tccFenceDo)
tx.Commit()
assert.Equal(t, nil, err)
}

func TestTccFenceStoreDatabaseMapper_QueryTCCFenceDO(t *testing.T) {
now := time.Now()
tccFenceDo := &model.TCCFenceDO{
Xid: "123123124124",
BranchId: 12312312312,
ActionName: "fence_test",
Status: enum.StatusTried,
GmtCreate: now,
GmtModified: now,
}
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("open db failed msg: %v", err)
}
defer db.Close()
mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetQuerySQLByBranchIdAndXid("tcc_fence_log")).
ExpectQuery().
WithArgs(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId)).
WillReturnRows(sqlmock.NewRows([]string{"xid", "branch_id", "action_name", "status", "gmt_create", "gmt_modified"}).
AddRow(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId), driver.Value(tccFenceDo.ActionName),
driver.Value(tccFenceDo.Status), driver.Value(now), driver.Value(now)))
mock.ExpectCommit()
tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}

actualFenceDo, err := GetTccFenceStoreDatabaseMapper().QueryTCCFenceDO(tx, tccFenceDo.Xid, tccFenceDo.BranchId)
tx.Commit()
assert.Equal(t, tccFenceDo.Xid, actualFenceDo.Xid)
assert.Equal(t, tccFenceDo.BranchId, actualFenceDo.BranchId)
assert.Equal(t, tccFenceDo.Status, actualFenceDo.Status)
assert.Equal(t, tccFenceDo.ActionName, actualFenceDo.ActionName)
assert.NotEmpty(t, actualFenceDo.GmtModified)
assert.NotEmpty(t, actualFenceDo.GmtCreate)
assert.Nil(t, err)
}

func TestTccFenceStoreDatabaseMapper_UpdateTCCFenceDO(t *testing.T) {
now := time.Now()
tccFenceDo := &model.TCCFenceDO{
Xid: "123123124124",
BranchId: 12312312312,
ActionName: "fence_test",
Status: enum.StatusTried,
GmtCreate: now,
GmtModified: now,
}
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("open db failed msg: %v", err)
}
defer db.Close()

mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetUpdateStatusSQLByBranchIdAndXid("tcc_fence_log")).
ExpectExec().
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}

err = GetTccFenceStoreDatabaseMapper().
UpdateTCCFenceDO(tx, tccFenceDo.Xid, tccFenceDo.BranchId, tccFenceDo.Status, enum.StatusCommitted)
tx.Commit()
assert.Equal(t, nil, err)
}

func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDO(t *testing.T) {
now := time.Now()
tccFenceDo := &model.TCCFenceDO{
Xid: "123123124124",
BranchId: 12312312312,
ActionName: "fence_test",
Status: enum.StatusTried,
GmtCreate: now,
GmtModified: now,
}
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("open db failed msg: %v", err)
}
defer db.Close()
mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetDeleteSQLByBranchIdAndXid("tcc_fence_log")).
ExpectExec().
WithArgs(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId)).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}

err = GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDO(tx, tccFenceDo.Xid, tccFenceDo.BranchId)
tx.Commit()
assert.Equal(t, nil, err)
}

func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) {
now := time.Now()
tccFenceDo := &model.TCCFenceDO{
GmtCreate: now,
}
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("open db failed msg: %v", err)
}
defer db.Close()
mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetDeleteSQLByMdfDateAndStatus("tcc_fence_log")).
ExpectExec().
WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32))).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}
err = GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32))
tx.Commit()
assert.Equal(t, nil, err)
}

+ 46
- 0
pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package model

import (
"time"

"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
)

type TCCFenceDO struct {

// Xid the global transaction id
Xid string

// BranchId the branch transaction id
BranchId int64

// ActionName the action name
ActionName string

// Status the tcc fence status
// tried: 1; committed: 2; rollbacked: 3; suspended: 4
Status enum.FenceStatus

// GmtCreate create time
GmtCreate time.Time

// GmtModified update time
GmtModified time.Time
}

+ 67
- 0
pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go View File

@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"fmt"
"strconv"

"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
)

var (
// localTccLogPlaced The enum LocalTccLogPlaced
localTccLogPlaced = " %s "

// insertLocalTccLog The enum InsertLocalTccLog
insertLocalTccLog = "insert into " + localTccLogPlaced + " (xid, branch_id, action_name, status, gmt_create, gmt_modified) values ( ?,?,?,?,?,?)"

// queryByBranchIdAndXid The enum QueryByBranchIdAndXid
queryByBranchIdAndXid = "select xid, branch_id, action_name, status, gmt_create, gmt_modified from " + localTccLogPlaced + " where xid = ? and branch_id = ? for update"

// updateStatusByBranchIdAndXid The enum UpdateStatusByBranchIdAndXid
updateStatusByBranchIdAndXid = "update " + localTccLogPlaced + " set status = ?, gmt_modified = ? where xid = ? and branch_id = ? and status = ? "

// deleteByBranchIdAndXid The enum DeleteByBranchIdAndXid
deleteByBranchIdAndXid = "delete from " + localTccLogPlaced + " where xid = ? and branch_id = ? "

// deleteByDateAndStatus The enum DeleteByDateAndStatus
deleteByDateAndStatus = "delete from " + localTccLogPlaced + " where gmt_modified < ? and status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")"
)

func GetInsertLocalTCCLogSQL(localTccTable string) string {
return fmt.Sprintf(insertLocalTccLog, localTccTable)

}

func GetQuerySQLByBranchIdAndXid(localTccTable string) string {
return fmt.Sprintf(queryByBranchIdAndXid, localTccTable)
}

func GetUpdateStatusSQLByBranchIdAndXid(localTccTable string) string {
return fmt.Sprintf(updateStatusByBranchIdAndXid, localTccTable)
}

func GetDeleteSQLByBranchIdAndXid(localTccTable string) string {
return fmt.Sprintf(deleteByBranchIdAndXid, localTccTable)

}

func GetDeleteSQLByMdfDateAndStatus(localTccTable string) string {
return fmt.Sprintf(deleteByDateAndStatus, localTccTable)
}

+ 19
- 2
pkg/rm/tcc/tcc_resource.go View File

@@ -27,6 +27,7 @@ import (
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/tm"
)

@@ -132,7 +133,15 @@ func (t *TCCResourceManager) BranchCommit(ctx context.Context, branchResource rm
tccResource, _ = resource.(*TCCResource)
}

_, err := tccResource.TwoPhaseAction.Commit(ctx, t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData))
businessActionContext := t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData)

// to set up the fence phase
ctx = tm.InitSeataContext(ctx)
tm.SetXID(ctx, branchResource.Xid)
tm.SetFencePhase(ctx, enum.FencePhaseCommit)
tm.SetBusinessActionContext(ctx, businessActionContext)

_, err := tccResource.TwoPhaseAction.Commit(ctx, businessActionContext)
if err != nil {
return branch.BranchStatusPhasetwoCommitFailedRetryable, err
}
@@ -169,7 +178,15 @@ func (t *TCCResourceManager) BranchRollback(ctx context.Context, branchResource
tccResource, _ = resource.(*TCCResource)
}

_, err := tccResource.TwoPhaseAction.Rollback(ctx, t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData))
businessActionContext := t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData)

// to set up the fence phase
ctx = tm.InitSeataContext(ctx)
tm.SetXID(ctx, branchResource.Xid)
tm.SetFencePhase(ctx, enum.FencePhaseRollback)
tm.SetBusinessActionContext(ctx, businessActionContext)

_, err := tccResource.TwoPhaseAction.Rollback(ctx, businessActionContext)
if err != nil {
return branch.BranchStatusPhasetwoRollbackFailedRetryable, err
}


+ 7
- 2
pkg/rm/tcc/tcc_service.go View File

@@ -25,12 +25,14 @@ import (
"time"

"github.com/pkg/errors"

"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/common/net"
"github.com/seata/seata-go/pkg/common/types"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/tm"
)

@@ -81,6 +83,9 @@ func (t *TCCServiceProxy) Prepare(ctx context.Context, params interface{}) (inte
return nil, err
}
}

// to set up the fence phase
tm.SetFencePhase(ctx, enum.FencePhasePrepare)
return t.TCCResource.Prepare(ctx, params)
}

@@ -176,7 +181,7 @@ func (t *TCCServiceProxy) initBusinessActionContext(ctx context.Context, params
// 1. null: create new BusinessActionContext
// 2. tm.BusinessActionContext: return it
// 3. *tm.BusinessActionContext: if nil then create new BusinessActionContext, else return it
// 4. Struct: if there is an attribute of businessactioncontext type and it is not nil, return it
// 4. Struct: if there is an attribute of businessactioncontext enum and it is not nil, return it
// 5. else: create new BusinessActionContext
func (t *TCCServiceProxy) getOrCreateBusinessActionContext(params interface{}) *tm.BusinessActionContext {
if params == nil {
@@ -222,7 +227,7 @@ func (t *TCCServiceProxy) getOrCreateBusinessActionContext(params interface{}) *
return &tm.BusinessActionContext{}
}

// obtainStructValueType check o is struct or pointer type
// obtainStructValueType check o is struct or pointer enum
func obtainStructValueType(o interface{}) (bool, reflect.Value, reflect.Type) {
v := reflect.ValueOf(o)
t := reflect.TypeOf(o)


+ 17
- 0
pkg/tm/context.go View File

@@ -21,6 +21,7 @@ import (
"context"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
)

type ContextParam string
@@ -42,6 +43,7 @@ type ContextVariable struct {
TxName string
Xid string
XidCopy string
FencePhase enum.FencePhase
TxRole *GlobalTransactionRole
BusinessActionContext *BusinessActionContext
TxStatus *message.GlobalStatus
@@ -157,3 +159,18 @@ func UnbindXid(ctx context.Context) {
variable.(*ContextVariable).XidCopy = ""
}
}

func SetFencePhase(ctx context.Context, phase enum.FencePhase) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).FencePhase = phase
}
}

func GetFencePhase(ctx context.Context) enum.FencePhase {
variable := ctx.Value(seataContextVariable)
if variable != nil {
return variable.(*ContextVariable).FencePhase
}
return enum.FencePhaseNotExist
}

+ 17
- 0
pkg/tm/context_test.go View File

@@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
)

func TestInitSeataContext(t *testing.T) {
@@ -134,3 +135,19 @@ func TestUnbindXid(t *testing.T) {
UnbindXid(ctx)
assert.Empty(t, GetXID(ctx))
}

func TestSetFencePhase(t *testing.T) {
ctx := InitSeataContext(context.Background())
phase := enum.FencePhaseCommit
SetFencePhase(ctx, phase)
assert.Equal(t, phase,
ctx.Value(seataContextVariable).(*ContextVariable).FencePhase)
}

func TestGetFencePhase(t *testing.T) {
ctx := InitSeataContext(context.Background())
phase := enum.FencePhaseCommit
SetFencePhase(ctx, phase)
assert.Equal(t, phase,
GetFencePhase(ctx))
}

+ 9
- 0
sample/tcc/fence/README_ZH.md View File

@@ -0,0 +1,9 @@
## 用例介绍
此用例介绍如何在tcc本地模式下使用防悬挂功能

## 使用步骤

- 在您的数据库中使用``./sample/tcc/fence/script/mysql.sql``脚本创建防悬挂所需的日志记录表,如果您使用的是其他数据库则运行对应数据库的脚本文件。
- 在``./sample/tcc/fence/service/service.go``中修改数据库驱动名为对应数据库类型并引入相关驱动包,mysql无需修改。此外需要注意用户名和密码是否正确。
- 启动``seata tc server``
- 使用以下命令运行用例``go run ./sample/tcc/fence/cmd/main.go``

+ 54
- 0
sample/tcc/fence/cmd/main.go View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"

_ "github.com/go-sql-driver/mysql"

"github.com/seata/seata-go/pkg/client"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/sample/tcc/fence/service"
)

func main() {
client.Init()
var err error
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, err == nil)
log.Infof("tx result %v", resp)
<-make(chan struct{})
}()

tccService := service.NewTestTCCServiceBusinessProxy()
tccService2 := service.NewTestTCCServiceBusiness2Proxy()

_, err = tccService.Prepare(ctx, 1)
if err != nil {
log.Errorf("TestTCCServiceBusiness prepare error, %v", err.Error())
return
}
_, err = tccService2.Prepare(ctx, 3)
if err != nil {
log.Errorf("TestTCCServiceBusiness2 prepare error, %v", err.Error())
return
}
}

+ 31
- 0
sample/tcc/fence/script/mysql.sql View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- -------------------------------- The script use tcc fence --------------------------------
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
`xid` VARCHAR(128) NOT NULL COMMENT 'global id',
`branch_id` BIGINT NOT NULL COMMENT 'branch id',
`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',
`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',
PRIMARY KEY (`xid`, `branch_id`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

+ 30
- 0
sample/tcc/fence/script/oracle.sql View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- -------------------------------- The script used for tcc fence --------------------------------
CREATE TABLE tcc_fence_log
(
xid VARCHAR2(128) NOT NULL,
branch_id NUMBER(19) NOT NULL,
action_name VARCHAR2(64) NOT NULL,
status NUMBER(3) NOT NULL,
gmt_create TIMESTAMP(3) NOT NULL,
gmt_modified TIMESTAMP(3) NOT NULL,
PRIMARY KEY (xid, branch_id)
);
CREATE INDEX idx_gmt_modified ON tcc_fence_log (gmt_modified);
CREATE INDEX idx_status ON tcc_fence_log (status);

+ 30
- 0
sample/tcc/fence/script/postgresql.sql View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- -------------------------------- The script used for tcc fence --------------------------------
CREATE TABLE IF NOT EXISTS public.tcc_fence_log
(
xid VARCHAR(128) NOT NULL,
branch_id BIGINT NOT NULL,
action_name VARCHAR(64) NOT NULL,
status SMALLINT NOT NULL,
gmt_create TIMESTAMP(3) NOT NULL,
gmt_modified TIMESTAMP(3) NOT NULL,
CONSTRAINT pk_tcc_fence_log PRIMARY KEY (xid, branch_id)
);
CREATE INDEX idx_gmt_modified ON public.tcc_fence_log (gmt_modified);
CREATE INDEX idx_status ON public.tcc_fence_log (status);

+ 256
- 0
sample/tcc/fence/service/service.go View File

@@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package service

import (
"context"
"database/sql"
"fmt"
"sync"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/rm/tcc/fence"
"github.com/seata/seata-go/pkg/tm"
)

const (
DriverName = "mysql"
Url = "root:root@tcp(127.0.0.1:3306)/seata?charset=utf8&parseTime=True"
)

var (
tccService *tcc.TCCServiceProxy
tccServiceOnce sync.Once

tccService2 *tcc.TCCServiceProxy
tccService2Once sync.Once

commitTimes int
commitFenceTimes int

rollbackTimes int
rollbackFenceTimes int
)

type TestTCCServiceBusiness struct {
}

func NewTestTCCServiceBusinessProxy() *tcc.TCCServiceProxy {
if tccService != nil {
return tccService
}
tccServiceOnce.Do(func() {
var err error
tccService, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness{})
if err != nil {
panic(fmt.Errorf("get TestTCCServiceBusiness tcc service proxy error, %v", err.Error()))
}
})
return tccService
}

func (T TestTCCServiceBusiness) Prepare(ctx context.Context, params interface{}) (b bool, err error) {
db, err := sql.Open(DriverName, Url)
if err != nil {
return false, fmt.Errorf("database connect failed, msg :%s", err.Error())
}
defer db.Close()
tx, err := db.Begin()
if err != nil {
return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error())
}

defer func() {
if err != nil {
err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback())
return
}
b, err = true, tx.Commit()
}()

err = fence.WithFence(ctx, tx, func() error {
log.Infof("TestTCCServiceBusiness Prepare, param %v", params)
return nil
})

return
}

func (T TestTCCServiceBusiness) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) {
db, err := sql.Open(DriverName, Url)
if err != nil {
return false, fmt.Errorf("database connect failed, msg :%s", err.Error())
}
defer db.Close()
tx, err := db.Begin()
if err != nil {
return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error())
}

defer func() {
if err != nil {
err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback())
return
}
b, err = true, tx.Commit()
}()

err = fence.WithFence(ctx, tx, func() error {
log.Infof("TestTCCServiceBusiness Commit, param %v", businessActionContext)
return nil
})

return
}

func (T TestTCCServiceBusiness) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) {
db, err := sql.Open(DriverName, Url)
if err != nil {
return false, fmt.Errorf("database connect failed, msg :%s", err.Error())
}
defer db.Close()
tx, err := db.Begin()
if err != nil {
return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error())
}

defer func() {
if err != nil {
err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback())
return
}
b, err = true, tx.Commit()
}()

err = fence.WithFence(ctx, tx, func() error {
log.Infof("TestTCCServiceBusiness Rollback, param %v", businessActionContext)
return nil
})

return
}

func (T TestTCCServiceBusiness) GetActionName() string {
return "TestTCCServiceBusiness"
}

type TestTCCServiceBusiness2 struct {
}

func NewTestTCCServiceBusiness2Proxy() *tcc.TCCServiceProxy {
if tccService2 != nil {
return tccService2
}
tccService2Once.Do(func() {
var err error
tccService2, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness2{})
if err != nil {
panic(fmt.Errorf("TestTCCServiceBusiness2 get tcc service proxy error, %v", err.Error()))
}
if err != nil {
panic(fmt.Errorf("TestTCCServiceBusiness2 register resource error, %v", err.Error()))
}
})
return tccService2
}

func (T TestTCCServiceBusiness2) Prepare(ctx context.Context, params interface{}) (b bool, err error) {
db, err := sql.Open(DriverName, Url)
if err != nil {
return false, fmt.Errorf("database connect failed, msg :%s", err.Error())
}
defer db.Close()
tx, err := db.Begin()
if err != nil {
return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error())
}

defer func() {
if err != nil {
err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback())
return
}
b, err = true, tx.Commit()
}()

err = fence.WithFence(ctx, tx, func() error {
log.Infof("TestTCCServiceBusiness2 Prepare, param %v", params)
return nil
})

return
}

func (T TestTCCServiceBusiness2) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) {
db, err := sql.Open(DriverName, Url)
if err != nil {
return false, fmt.Errorf("database connect failed, msg :%s", err.Error())
}
defer db.Close()
tx, err := db.Begin()
if err != nil {
return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error())
}

defer func() {
if err != nil {
err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback())
return
}
b, err = true, tx.Commit()
}()

err = fence.WithFence(ctx, tx, func() error {
log.Infof("TestTCCServiceBusiness2 Commit, param %v", businessActionContext)
return nil
})

return
}

func (T TestTCCServiceBusiness2) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) {
db, err := sql.Open(DriverName, Url)
if err != nil {
return false, fmt.Errorf("database connect failed, msg :%s", err.Error())
}
defer db.Close()
tx, err := db.Begin()
if err != nil {
return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error())
}

defer func() {
if err != nil {
err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback())
return
}
b, err = true, tx.Commit()
}()

err = fence.WithFence(ctx, tx, func() error {
log.Infof("TestTCCServiceBusiness2 Rollback, param %v", businessActionContext)
return nil
})

return
}

func (T TestTCCServiceBusiness2) GetActionName() string {
return "TestTCCServiceBusiness2"
}

Loading…
Cancel
Save