Browse Source

Feat add two phase (#122)

* add two phase

* support seata dubbo
tags/v0.1.0-rc1
Yuecai Liu GitHub 3 years ago
parent
commit
1ced5eb5a2
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 1809 additions and 324 deletions
  1. +5
    -8
      go.mod
  2. +544
    -8
      go.sum
  3. +8
    -0
      pkg/common/constants.go
  4. +1
    -1
      pkg/common/log/logging.go
  5. +49
    -0
      pkg/common/types/types.go
  6. +3
    -3
      pkg/config/getty_config.go
  7. +96
    -0
      pkg/integration/dubbo/dubbo_transaction_filter.go
  8. +29
    -0
      pkg/integration/integration.go
  9. +1
    -1
      pkg/remoting/getty/getty_remoting.go
  10. +3
    -5
      pkg/remoting/getty/listener.go
  11. +1
    -1
      pkg/remoting/getty/rpc_client.go
  12. +5
    -10
      pkg/remoting/getty/session_manager.go
  13. +3
    -3
      pkg/remoting/processor/client/client_heart_beat_processor_test.go
  14. +3
    -3
      pkg/remoting/processor/client/client_on_response_processor_test.go
  15. +3
    -3
      pkg/remoting/processor/client/rm_branch_commit_processor_test.go
  16. +64
    -65
      pkg/remoting/processor/client/rm_branch_rollback_processor_test.go
  17. +0
    -97
      pkg/rm/tcc/remoting/remoting.go
  18. +26
    -10
      pkg/rm/tcc/tcc_resource.go
  19. +45
    -39
      pkg/rm/tcc/tcc_service.go
  20. +315
    -0
      pkg/rm/two_phase.go
  21. +208
    -0
      pkg/rm/two_phase_test.go
  22. +24
    -6
      pkg/tm/context.go
  23. +8
    -8
      pkg/tm/global_transaction.go
  24. +4
    -4
      pkg/tm/transaction_executor.go
  25. +64
    -0
      sample/tcc/dubbo/client/cmd/client.go
  26. +16
    -0
      sample/tcc/dubbo/client/conf/dubbogo.yml
  27. +35
    -0
      sample/tcc/dubbo/client/service/user_provider.go
  28. +77
    -0
      sample/tcc/dubbo/server/cmd/server.go
  29. +20
    -0
      sample/tcc/dubbo/server/conf/dubbogo.yml
  30. +48
    -0
      sample/tcc/dubbo/server/service/user_provider.go
  31. +72
    -0
      sample/tcc/local/cmd/local.go
  32. +0
    -37
      sample/tcc/local/cmd/tcc_local_transation.go
  33. +29
    -12
      sample/tcc/local/service/service.go

+ 5
- 8
go.mod View File

@@ -3,20 +3,17 @@ module github.com/seata/seata-go
go 1.16

require (
dubbo.apache.org/dubbo-go/v3 v3.0.2-0.20220508105316-b27ec53b7bab
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.12.3
github.com/golang/snappy v0.0.4 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/apache/dubbo-go-hessian2 v1.11.0
github.com/dubbogo/gost v1.12.5
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
go.uber.org/zap v1.21.0
golang.org/x/tools v0.1.11 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)

+ 544
- 8
go.sum
File diff suppressed because it is too large
View File


+ 8
- 0
pkg/common/constants.go View File

@@ -21,4 +21,12 @@ const (
StartTime = "action-start-time"
HostName = "host-name"
ActionContext = "actionContext"

SeataXidKey = "SEATA_XID"
XidKey = "TX_XID"
MdcXidKey = "X-TX-XID"
MdcBranchIDKey = "X-TX-BRANCH-ID"
BranchTypeKey = "TX_BRANCH_TYPE"
GlobalLockKey = "TX_LOCK"
SeataFilterKey = "seataDubboFilter"
)

+ 1
- 1
pkg/common/log/logging.go View File

@@ -22,7 +22,7 @@ import (
"errors"
"fmt"
"time"
getty "github.com/apache/dubbo-getty"
"github.com/natefinch/lumberjack"
"go.uber.org/zap"


+ 49
- 0
pkg/common/types/types.go View File

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

import "reflect"

type ReferencedService interface {
Reference() string
}

// GetReference return the reference id of the service.
// If the service implemented the ReferencedService interface,
// it will call the Reference method. If not, it will
// return the struct name as the reference id.
func GetReference(service interface{}) string {
if s, ok := service.(ReferencedService); ok {
return s.Reference()
}
ref := ""
sType := reflect.TypeOf(service)
kind := sType.Kind()
switch kind {
case reflect.Struct:
ref = sType.Name()
case reflect.Ptr:
sName := sType.Elem().Name()
if sName != "" {
ref = sName
} else {
ref = sType.Elem().Field(0).Name
}
}
return ref
}

+ 3
- 3
pkg/config/getty_config.go View File

@@ -38,8 +38,8 @@ type GettyConfig struct {
// GetDefaultGettyConfig ...
func GetDefaultGettyConfig() GettyConfig {
return GettyConfig{
ReconnectInterval: 0,
ConnectionNum: 2,
ReconnectInterval: 1,
ConnectionNum: 20,
HeartbeatPeriod: 10 * time.Second,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
@@ -51,7 +51,7 @@ func GetDefaultGettyConfig() GettyConfig {
TCPReadTimeout: time.Second,
TCPWriteTimeout: 5 * time.Second,
WaitTimeout: time.Second,
CronPeriod: time.Second,
CronPeriod: 5 * time.Second,
MaxMsgLen: 4096,
SessionName: "rpc_client",
},


+ 96
- 0
pkg/integration/dubbo/dubbo_transaction_filter.go View File

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dubbo

import (
"context"
"strings"
"sync"

"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/tm"
)

var (
seataFilter *dubboTransactionFilter
once sync.Once
)

type Filter interface {
}

type dubboTransactionFilter struct {
}

func GetDubboTransactionFilter() filter.Filter {
if seataFilter == nil {
once.Do(func() {
seataFilter = &dubboTransactionFilter{}
})
}
return seataFilter
}

func (d *dubboTransactionFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
xid := tm.GetXID(ctx)
rpcXid := d.getRpcXid(invocation)
log.Infof("xid in context is %s, xid in RpcContextis %s", xid, rpcXid)

if xid != "" {
// dubbo go
invocation.SetAttachment(common.SeataXidKey, xid)
// dubbo java
invocation.SetAttachment(common.XidKey, xid)
} else if rpcXid != xid {
ctx = tm.InitSeataContext(ctx)
tm.SetXIDCopy(ctx, rpcXid)
}
return invoker.Invoke(ctx, invocation)
// todo why should unbind xid???
}

func (*dubboTransactionFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}

func (d *dubboTransactionFilter) getRpcXid(invocation protocol.Invocation) string {
rpcXid := d.getDubboGoRpcXid(invocation)
if rpcXid == "" {
rpcXid = d.getDubboJavaRpcXid(invocation)
}
return rpcXid
}

func (*dubboTransactionFilter) getDubboGoRpcXid(invocation protocol.Invocation) string {
rpcXid := invocation.GetAttachmentWithDefaultValue(common.SeataXidKey, "")
if rpcXid == "" {
rpcXid = invocation.GetAttachmentWithDefaultValue(strings.ToLower(common.SeataXidKey), "")
}
return rpcXid
}

func (*dubboTransactionFilter) getDubboJavaRpcXid(invocation protocol.Invocation) string {
rpcXid := invocation.GetAttachmentWithDefaultValue(common.XidKey, "")
if rpcXid == "" {
rpcXid = invocation.GetAttachmentWithDefaultValue(strings.ToLower(common.XidKey), "")
}
return rpcXid
}

+ 29
- 0
pkg/integration/integration.go View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package integration

import (
"dubbo.apache.org/dubbo-go/v3/common/extension"

"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/integration/dubbo"
)

func UseDubbo() {
extension.SetFilter(common.SeataFilterKey, dubbo.GetDubboTransactionFilter)
}

+ 1
- 1
pkg/remoting/getty/getty_remoting.go View File

@@ -30,7 +30,7 @@ import (
)

const (
RPC_REQUEST_TIMEOUT = 30 * time.Second
RPC_REQUEST_TIMEOUT = 5 * time.Second
)

var (


+ 3
- 5
pkg/remoting/getty/listener.go View File

@@ -72,32 +72,30 @@ func (client *gettyClientHandler) OnOpen(session getty.Session) error {
sessionManager.ReleaseGettySession(session)
return
}

//todo
//client.GettySessionOnOpenChannel <- session.RemoteAddr()
}()

return nil
}

func (client *gettyClientHandler) OnError(session getty.Session, err error) {
log.Infof("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
sessionManager.ReleaseGettySession(session)
}

func (client *gettyClientHandler) OnClose(session getty.Session) {
log.Infof("OnClose session{%s} is closing......", session.Stat())
sessionManager.ReleaseGettySession(session)
}

func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
ctx := context.Background()
log.Debugf("received message: {%#v}", pkg)

rpcMessage, ok := pkg.(message.RpcMessage)
if !ok {
log.Errorf("received message is not protocol.RpcMessage. pkg: %#v", pkg)
return
}

if mm, ok := rpcMessage.Body.(message.MessageTypeAware); ok {
processor := client.processorTable[mm.GetTypeCode()]
if processor != nil {
@@ -111,7 +109,7 @@ func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface
}

func (client *gettyClientHandler) OnCron(session getty.Session) {
GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
//GetGettyRemotingClient().SendAsyncRequest(message.HeartBeatMessagePing)
}

func (client *gettyClientHandler) RegisterProcessor(msgType message.MessageType, processor processor.RemotingProcessor) {


+ 1
- 1
pkg/remoting/getty/rpc_client.go View File

@@ -58,7 +58,7 @@ func (c *RpcClient) init() {
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(address),
getty.WithConnectionNumber((int)(c.conf.GettyConfig.ConnectionNum)),
getty.WithConnectionNumber(c.conf.GettyConfig.ConnectionNum),
getty.WithReconnectInterval(c.conf.GettyConfig.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
)


+ 5
- 10
pkg/remoting/getty/session_manager.go View File

@@ -27,17 +27,12 @@ import (

var (
MAX_CHECK_ALIVE_RETRY = 600

CHECK_ALIVE_INTERNAL = 100

allSessions = sync.Map{}

CHECK_ALIVE_INTERNAL = 100
allSessions = sync.Map{}
// serverAddress -> rpc_client.Session -> bool
serverSessions = sync.Map{}

sessionSize int32 = 0

sessionManager = &GettySessionManager{}
serverSessions = sync.Map{}
sessionSize int32 = 0
sessionManager = &GettySessionManager{}
)

type GettySessionManager struct{}


+ 3
- 3
pkg/remoting/processor/client/client_heart_beat_processor_test.go View File

@@ -28,9 +28,9 @@ import (
func TestClientHeartBeatProcessor(t *testing.T) {
// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "chb-testcase1",


+ 3
- 3
pkg/remoting/processor/client/client_on_response_processor_test.go View File

@@ -28,9 +28,9 @@ import (
func TestClientOnResponseProcessor(t *testing.T) {
// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "cor-testcase1-mergeResult",


+ 3
- 3
pkg/remoting/processor/client/rm_branch_commit_processor_test.go View File

@@ -32,9 +32,9 @@ func TestRmBranchCommitProcessor(t *testing.T) {

// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "rbc-testcase1-failure",


+ 64
- 65
pkg/remoting/processor/client/rm_branch_rollback_processor_test.go View File

@@ -15,69 +15,68 @@
* limitations under the License.
*/

package client
package client

import (
"context"
"testing"
model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
)
func TestRmBranchRollbackProcessor(t *testing.T) {
// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "rbr-testcase1-failure",
rpcMsg: message.RpcMessage{
ID: 223,
Type: message.GettyRequestType(message.MessageType_BranchRollback),
Codec: byte(codec.CodecTypeSeata),
Compressor: byte(1),
HeadMap: map[string]string{
"name": " Jack",
"age": "12",
"address": "Beijing",
},
Body: message.BranchRollbackRequest{
AbstractBranchEndRequest: message.AbstractBranchEndRequest{
Xid: "123345",
BranchId: 56679,
BranchType: model2.BranchTypeTCC,
ResourceId: "1232324",
ApplicationData: []byte("TestExtraData"),
},
},
},
wantErr: true, // need dail to server, so err accured
},
}
var ctx context.Context
var rbrProcessor rmBranchRollbackProcessor
rm.GetResourceManagerInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())
// run tests
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := rbrProcessor.Process(ctx, tc.rpcMsg)
if (err != nil) != tc.wantErr {
t.Errorf("rmBranchRollbackProcessor wantErr: %v, got: %v", tc.wantErr, err)
return
}
})
}
}
import (
"context"
"testing"

model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/codec"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
)

func TestRmBranchRollbackProcessor(t *testing.T) {

// testcases
var tests = []struct {
name string // testcase name
rpcMsg message.RpcMessage // rpcMessage case
wantErr bool //want testcase err or not
}{
{
name: "rbr-testcase1-failure",
rpcMsg: message.RpcMessage{
ID: 223,
Type: message.GettyRequestType(message.MessageType_BranchRollback),
Codec: byte(codec.CodecTypeSeata),
Compressor: byte(1),
HeadMap: map[string]string{
"name": " Jack",
"age": "12",
"address": "Beijing",
},
Body: message.BranchRollbackRequest{
AbstractBranchEndRequest: message.AbstractBranchEndRequest{
Xid: "123345",
BranchId: 56679,
BranchType: model2.BranchTypeTCC,
ResourceId: "1232324",
ApplicationData: []byte("TestExtraData"),
},
},
},

wantErr: true, // need dail to server, so err accured
},
}

var ctx context.Context
var rbrProcessor rmBranchRollbackProcessor

rm.GetResourceManagerInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())

// run tests
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := rbrProcessor.Process(ctx, tc.rpcMsg)
if (err != nil) != tc.wantErr {
t.Errorf("rmBranchRollbackProcessor wantErr: %v, got: %v", tc.wantErr, err)
return
}
})
}

}

+ 0
- 97
pkg/rm/tcc/remoting/remoting.go View File

@@ -1,97 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package remoting

import (
"reflect"
)

type RemoteType byte
type ServiceType byte

const (
RemoteTypeSofaRpc RemoteType = 2
RemoteTypeDubbo RemoteType = 3
RemoteTypeRestful RemoteType = 4
RemoteTypeLocalService RemoteType = 5
RemoteTypeHsf RemoteType = 8

ServiceTypeReference ServiceType = 1
ServiceTypeProvider ServiceType = 2
)

type RemotingDesc struct {
/**
* is referenc bean ?
*/
isReference bool

/**
* rpc target bean, the service bean has this property
*/
targetBean interface{}

/**
* the tcc interface tyep
*/
interfaceClass reflect.Type

/**
* interface class name
*/
interfaceClassName string

/**
* rpc uniqueId: hsf, dubbo's version, sofa-rpc's uniqueId
*/
uniqueId string

/**
* dubbo/hsf 's group
*/
group string

/**
* protocol: sofa-rpc, dubbo, injvm etc.
*/
protocol RemoteType
}

type RemotingParser interface {
isRemoting(bean interface{}, beanName string) (bool, error)

/**
* if it is reference bean ?
*/
isReference(bean interface{}, beanName string) (bool, error)

/**
* if it is service bean ?
*/
isService(bean interface{}, beanName string) (bool, error)

/**
* get the remoting bean info
*/
getServiceDesc(bean interface{}, beanName string) (RemotingDesc, error)

/**
* the remoting protocol
*/
getProtocol() RemoteType
}

+ 26
- 10
pkg/rm/tcc/tcc_resource.go View File

@@ -24,10 +24,13 @@ import (
"sync"

"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/common/log"

"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/tm"

"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
)

var (
@@ -36,10 +39,23 @@ var (
)

type TCCResource struct {
TCCServiceBean TCCService
ResourceGroupId string `default:"DEFAULT"`
AppName string
ActionName string
*rm.TwoPhaseAction
}

func ParseTCCResource(v interface{}) (*TCCResource, error) {
t, err := rm.ParseTwoPhaseAction(v)
if err != nil {
log.Errorf("%#v is not tcc two phase service, %s", v, err.Error())
return nil, err
}
return &TCCResource{
// todo read from config
ResourceGroupId: `default:"DEFAULT"`,
AppName: "seata-go-mock-app-name",
TwoPhaseAction: t,
}, nil
}

func (t *TCCResource) GetResourceGroupId() string {
@@ -47,7 +63,7 @@ func (t *TCCResource) GetResourceGroupId() string {
}

func (t *TCCResource) GetResourceId() string {
return t.ActionName
return t.TwoPhaseAction.GetActionName()
}

func (t *TCCResource) GetBranchType() branch.BranchType {
@@ -118,14 +134,14 @@ func (t *TCCResourceManager) BranchCommit(ctx context.Context, ranchType branch.
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Commit(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
_, err := tccResource.TwoPhaseAction.Commit(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return branch.BranchStatusPhasetwoCommitFailedRetryable, err
}
return branch.BranchStatusPhasetwoCommitted, err
}

func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64, resourceID string, applicationData []byte) tm.BusinessActionContext {
func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64, resourceID string, applicationData []byte) *tm.BusinessActionContext {
var actionContextMap = make(map[string]interface{}, 2)
if len(applicationData) > 0 {
var tccContext map[string]interface{}
@@ -137,11 +153,11 @@ func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64
}
}

return tm.BusinessActionContext{
return &tm.BusinessActionContext{
Xid: xid,
BranchId: branchID,
ActionName: resourceID,
ActionContext: &actionContextMap,
ActionContext: actionContextMap,
}
}

@@ -155,7 +171,7 @@ func (t *TCCResourceManager) BranchRollback(ctx context.Context, ranchType branc
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Rollback(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
_, err := tccResource.TwoPhaseAction.Rollback(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return branch.BranchStatusPhasetwoRollbacked, err
}


+ 45
- 39
pkg/rm/tcc/tcc_service.go View File

@@ -21,8 +21,11 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/seata/seata-go/pkg/common/types"

"github.com/seata/seata-go/pkg/tm"

"github.com/pkg/errors"
@@ -33,56 +36,59 @@ import (
"github.com/seata/seata-go/pkg/rm"
)

type TCCService interface {
Prepare(ctx context.Context, params interface{}) error
Commit(ctx context.Context, businessActionContext tm.BusinessActionContext) error
Rollback(ctx context.Context, businessActionContext tm.BusinessActionContext) error

GetActionName() string
//GetRemoteType() remoting.RemoteType
//GetServiceType() remoting.ServiceType
}

type TCCServiceProxy struct {
TCCService
referenceName string
registerResourceOnce sync.Once
*TCCResource
}

func NewTCCServiceProxy(tccService TCCService) TCCService {
if tccService == nil {
panic("param tccService should not be nil")
}

// register resource
tccResource := TCCResource{
TCCServiceBean: tccService,
ResourceGroupId: "DEFAULT",
AppName: "",
ActionName: tccService.GetActionName(),
}
err := rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(&tccResource)
func NewTCCServiceProxy(service interface{}) (*TCCServiceProxy, error) {
tccResource, err := ParseTCCResource(service)
if err != nil {
panic(fmt.Sprintf("NewTCCServiceProxy registerResource error: {%#v}", err.Error()))
log.Errorf("invalid tcc service, err %v", err)
return nil, err
}

return &TCCServiceProxy{
TCCService: tccService,
TCCResource: tccResource,
}, err
}

func (t *TCCServiceProxy) RegisterResource() error {
var err error
t.registerResourceOnce.Do(func() {
err = rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource)
if err != nil {
log.Errorf("NewTCCServiceProxy RegisterResource error: %#v", err.Error())
}
})
return err
}

func (t *TCCServiceProxy) SetReferenceName(referenceName string) {
t.referenceName = referenceName
}

func (t *TCCServiceProxy) Reference() string {
if t.referenceName != "" {
return t.referenceName
}
return types.GetReference(t.TCCResource.TwoPhaseAction.GetTwoPhaseService())
}

func (t *TCCServiceProxy) Prepare(ctx context.Context, param interface{}) error {
if tm.HasXID(ctx) {
err := t.RegisteBranch(ctx, param)
func (t *TCCServiceProxy) Prepare(ctx context.Context, param ...interface{}) (interface{}, error) {
if tm.IsTransactionOpened(ctx) {
err := t.registeBranch(ctx)
if err != nil {
return err
return nil, err
}
}
return t.TCCService.Prepare(ctx, param)
return t.TCCResource.Prepare(ctx, param)
}

func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{}) error {
func (t *TCCServiceProxy) registeBranch(ctx context.Context) error {
// register transaction branch
if !tm.HasXID(ctx) {
err := errors.New("BranchRegister error, xid should not be nil")
if !tm.IsTransactionOpened(ctx) {
err := errors.New("BranchRegister error, transaction should be opened")
log.Errorf(err.Error())
return err
}
@@ -101,10 +107,10 @@ func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{})
}

actionContext := &tm.BusinessActionContext{
Xid: tm.GetXID(ctx),
BranchId: branchId,
ActionName: t.GetActionName(),
ActionContext: param,
Xid: tm.GetXID(ctx),
BranchId: branchId,
ActionName: t.GetActionName(),
//ActionContext: param,
}
tm.SetBusinessActionContext(ctx, actionContext)
return nil


+ 315
- 0
pkg/rm/two_phase.go View File

@@ -0,0 +1,315 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rm

import (
"context"
"reflect"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/tm"
)

const (
TwoPhaseActionTag = "seataTwoPhaseAction"
TwoPhaseActionNameTag = "seataTwoPhaseServiceName"
TwoPhaseActionPrepareTagVal = "prepare"
TwoPhaseActionCommitTagVal = "commit"
TwoPhaseActionRollbackTagVal = "rollback"
)

var (
typError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()).Type()
typContext = reflect.Zero(reflect.TypeOf((*context.Context)(nil)).Elem()).Type()
typBool = reflect.Zero(reflect.TypeOf((*bool)(nil)).Elem()).Type()
typBusinessContextInterface = reflect.Zero(reflect.TypeOf((*tm.BusinessActionContext)(nil))).Type()
)

type TwoPhaseInterface interface {
Prepare(ctx context.Context, params ...interface{}) (bool, error)
Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error)
Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error)
GetActionName() string
}

type TwoPhaseAction struct {
twoPhaseService interface{}
actionName string
prepareMethodName string
prepareMethod *reflect.Value
commitMethodName string
commitMethod *reflect.Value
rollbackMethodName string
rollbackMethod *reflect.Value
}

func (t *TwoPhaseAction) GetTwoPhaseService() interface{} {
return t.twoPhaseService
}

func (t *TwoPhaseAction) Prepare(ctx context.Context, params ...interface{}) (bool, error) {
values := make([]reflect.Value, 0, len(params))
values = append(values, reflect.ValueOf(ctx))
for _, param := range params {
values = append(values, reflect.ValueOf(param))
}
res := t.prepareMethod.Call(values)
var (
r0 = res[0].Interface()
r1 = res[1].Interface()
res0 bool
res1 error
)
if r0 != nil {
res0 = r0.(bool)
}
if r1 != nil {
res1 = r1.(error)
}
return res0, res1
}

func (t *TwoPhaseAction) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
res := t.commitMethod.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(businessActionContext)})
var (
r0 = res[0].Interface()
r1 = res[1].Interface()
res0 bool
res1 error
)
if r0 != nil {
res0 = r0.(bool)
}
if r1 != nil {
res1 = r1.(error)
}
return res0, res1
}

func (t *TwoPhaseAction) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
res := t.rollbackMethod.Call([]reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(businessActionContext)})
var (
r0 = res[0].Interface()
r1 = res[1].Interface()
res0 bool
res1 error
)
if r0 != nil {
res0 = r0.(bool)
}
if r1 != nil {
res1 = r1.(error)
}
return res0, res1
}

func (t *TwoPhaseAction) GetActionName() string {
return t.actionName
}

func IsTwoPhaseAction(v interface{}) bool {
m, err := ParseTwoPhaseAction(v)
return m != nil && err != nil
}

func ParseTwoPhaseAction(v interface{}) (*TwoPhaseAction, error) {
if m, ok := v.(TwoPhaseInterface); ok {
return parseTwoPhaseActionByTwoPhaseInterface(m), nil
}
return ParseTwoPhaseActionByInterface(v)
}

func parseTwoPhaseActionByTwoPhaseInterface(v TwoPhaseInterface) *TwoPhaseAction {
value := reflect.ValueOf(v)
mp := value.MethodByName("Prepare")
mc := value.MethodByName("Commit")
mr := value.MethodByName("Rollback")
return &TwoPhaseAction{
twoPhaseService: v,
actionName: v.GetActionName(),
prepareMethodName: "Prepare",
prepareMethod: &mp,
commitMethodName: "Commit",
commitMethod: &mc,
rollbackMethodName: "Rollback",
rollbackMethod: &mr,
}
}

func ParseTwoPhaseActionByInterface(v interface{}) (*TwoPhaseAction, error) {
valueOfElem := reflect.ValueOf(v).Elem()
typeOf := valueOfElem.Type()
k := typeOf.Kind()
if k != reflect.Struct {
return nil, errors.New("invalid type kind")
}
numField := typeOf.NumField()
if typeOf.Kind() != reflect.Struct {
return nil, errors.New("param should be a struct, instead of a pointer")
}

var (
hasPrepareMethodName bool
hasCommitMethodName bool
hasRollbackMethod bool
twoPhaseName string
result = TwoPhaseAction{
twoPhaseService: v,
}
)
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
f := valueOfElem.Field(i)
if ms, m, ok := getPrepareAction(t, f); ok {
hasPrepareMethodName = true
result.prepareMethod = m
result.prepareMethodName = ms
} else if ms, m, ok = getCommitMethod(t, f); ok {
hasCommitMethodName = true
result.commitMethod = m
result.commitMethodName = ms
} else if ms, m, ok = getRollbackMethod(t, f); ok {
hasRollbackMethod = true
result.rollbackMethod = m
result.rollbackMethodName = ms
}
}
if !hasPrepareMethodName {
return nil, errors.New("missing prepare method")
}
if !hasCommitMethodName {
return nil, errors.New("missing commit method")
}
if !hasRollbackMethod {
return nil, errors.New("missing rollback method")
}
twoPhaseName = getActionName(v)
if twoPhaseName == "" {
return nil, errors.New("missing two phase name")
}
result.actionName = twoPhaseName
return &result, nil
}

func getPrepareAction(t reflect.StructField, f reflect.Value) (string, *reflect.Value, bool) {
if t.Tag.Get(TwoPhaseActionTag) != TwoPhaseActionPrepareTagVal {
return "", nil, false
}
if f.Kind() != reflect.Func || !f.IsValid() {
return "", nil, false
}
// prepare has 2 retuen error value
if outNum := t.Type.NumOut(); outNum != 2 {
return "", nil, false
}
if returnType := t.Type.Out(0); returnType != typBool {
return "", nil, false
}
if returnType := t.Type.Out(1); returnType != typError {
return "", nil, false
}
// prepared method has at least 1 params, context.Context, and other params
if inNum := t.Type.NumIn(); inNum == 0 {
return "", nil, false
}
if inType := t.Type.In(0); inType != typContext {
return "", nil, false
}
return t.Name, &f, true
}

func getCommitMethod(t reflect.StructField, f reflect.Value) (string, *reflect.Value, bool) {
if t.Tag.Get(TwoPhaseActionTag) != TwoPhaseActionCommitTagVal {
return "", nil, false
}
if f.Kind() != reflect.Func || !f.IsValid() {
return "", nil, false
}
// commit method has 2 retuen error value
if outNum := t.Type.NumOut(); outNum != 2 {
return "", nil, false
}
if returnType := t.Type.Out(0); returnType != typBool {
return "", nil, false
}
if returnType := t.Type.Out(1); returnType != typError {
return "", nil, false
}
// commit method has at least 1 params, context.Context, and other params
if inNum := t.Type.NumIn(); inNum != 2 {
return "", nil, false
}
if inType := t.Type.In(0); inType != typContext {
return "", nil, false
}
if inType := t.Type.In(1); inType != typBusinessContextInterface {
return "", nil, false
}
return t.Name, &f, true

}

func getRollbackMethod(t reflect.StructField, f reflect.Value) (string, *reflect.Value, bool) {
if t.Tag.Get(TwoPhaseActionTag) != TwoPhaseActionRollbackTagVal {
return "", nil, false
}
if f.Kind() != reflect.Func || !f.IsValid() {
return "", nil, false
}
// rollback method has 2 retuen value
if outNum := t.Type.NumOut(); outNum != 2 {
return "", nil, false
}
if returnType := t.Type.Out(0); returnType != typBool {
return "", nil, false
}
if returnType := t.Type.Out(1); returnType != typError {
return "", nil, false
}
// rollback method has at least 1 params, context.Context, and other params
if inNum := t.Type.NumIn(); inNum != 2 {
return "", nil, false
}
if inType := t.Type.In(0); inType != typContext {
return "", nil, false
}
if inType := t.Type.In(1); inType != typBusinessContextInterface {
return "", nil, false
}
return t.Name, &f, true
}

func getActionName(v interface{}) string {
var (
actionName string
valueOf = reflect.ValueOf(v)
valueOfElem = valueOf.Elem()
typeOf = valueOfElem.Type()
)
if typeOf.Kind() != reflect.Struct {
return ""
}
numField := valueOfElem.NumField()
for i := 0; i < numField; i++ {
t := typeOf.Field(i)
if actionName = t.Tag.Get(TwoPhaseActionNameTag); actionName != "" {
break
}
}
return actionName
}

+ 208
- 0
pkg/rm/two_phase_test.go View File

@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rm

import (
"context"
"fmt"
"testing"

"github.com/seata/seata-go/pkg/tm"
"github.com/stretchr/testify/assert"
)

func TestParseTwoPhaseActionGetMethodName(t *testing.T) {
tests := []struct {
service interface{}
wantService *TwoPhaseAction
wantHasError bool
wantErrMsg string
}{{
service: &struct {
Prepare111 func(ctx context.Context, params string) (bool, error) `seataTwoPhaseAction:"prepare"`
Commit111 func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
Rollback11 func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback" seataTwoPhaseServiceName:"seataTwoPhaseName"`
GetName func() string
}{},
wantService: &TwoPhaseAction{
actionName: "seataTwoPhaseName",
prepareMethodName: "Prepare111",
commitMethodName: "Commit111",
rollbackMethodName: "Rollback11",
},
wantHasError: false,
}, {
service: &struct {
Prepare func(ctx context.Context, params interface{}) (bool, error) `seataTwoPhaseAction:"prepare"`
Commit func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
Rollback func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback"`
GetName func() string
}{},
wantService: nil,
wantHasError: true,
wantErrMsg: "missing two phase name",
}, {
service: &struct {
Prepare func(ctx context.Context, params interface{}) (bool, error) `serviceName:"seataTwoPhaseName"`
Commit func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
Rollback func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback"`
GetName func() string
}{},
wantService: nil,
wantHasError: true,
wantErrMsg: "missing prepare method",
}, {
service: &struct {
Prepare func(ctx context.Context, params interface{}) (bool, error) `seataTwoPhaseAction:"commit" seataTwoPhaseName:"seataTwoPhaseName"`
Commit func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error)
Rollback func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback"`
GetName func() string
}{},
wantService: nil,
wantHasError: true,
wantErrMsg: "missing prepare method",
}, {
service: &struct {
Prepare func(ctx context.Context, params interface{}) (bool, error) `seataTwoPhaseAction:"prepare" seataTwoPhaseName:"seataTwoPhaseName"`
Commit func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
Rollback func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error)
GetName func() string
}{},
wantService: nil,
wantHasError: true,
wantErrMsg: "missing rollback method",
}, {
service: &struct {
Prepare func(ctx context.Context, params interface{}) (bool, error) `seataTwoPhaseAction:"prepare"`
Commit func(ctx context.Context, businessActionContext tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
Rollback func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback" seataTwoPhaseServiceName:"seataTwoPhaseName"`
GetName func() string
}{},
wantService: nil,
wantHasError: true,
wantErrMsg: "missing commit method",
}, {
service: &struct {
Prepare func(ctx context.Context, params interface{}) (bool, error) `seataTwoPhaseAction:"prepare"`
Commit func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
Rollback func(ctx context.Context, businessActionContext tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback" seataTwoPhaseServiceName:"seataTwoPhaseName"`
GetName func() string
}{},
wantService: nil,
wantHasError: true,
wantErrMsg: "missing rollback method",
}}

for _, tt := range tests {
actual, err := ParseTwoPhaseAction(tt.service)
if tt.wantHasError {
assert.NotNil(t, err)
assert.Equal(t, tt.wantErrMsg, err.Error())
} else {
assert.Nil(t, err)
assert.Equal(t, actual.actionName, tt.wantService.actionName)
assert.Equal(t, actual.prepareMethodName, tt.wantService.prepareMethodName)
assert.Equal(t, actual.commitMethodName, tt.wantService.commitMethodName)
assert.Equal(t, actual.rollbackMethodName, tt.wantService.rollbackMethodName)
}
}
}

type TwoPhaseDemoService1 struct {
TwoPhasePrepare func(ctx context.Context, params ...interface{}) (bool, error) `seataTwoPhaseAction:"prepare" seataTwoPhaseServiceName:"TwoPhaseDemoService"`
TwoPhaseCommit func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
TwoPhaseRollback func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback"`
TwoPhaseDemoService func() string
}

func NewTwoPhaseDemoService1() *TwoPhaseDemoService1 {
return &TwoPhaseDemoService1{
TwoPhasePrepare: func(ctx context.Context, params ...interface{}) (bool, error) {
return false, fmt.Errorf("execute two phase prepare method, param %v", params)
},
TwoPhaseCommit: func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
return false, fmt.Errorf("execute two phase commit method, xid %v", businessActionContext.Xid)
},
TwoPhaseRollback: func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
return true, nil
},
TwoPhaseDemoService: func() string {
return "TwoPhaseDemoService"
},
}
}

func TestParseTwoPhaseActionExecuteMethod1(t *testing.T) {
twoPhaseService, err := ParseTwoPhaseAction(NewTwoPhaseDemoService1())
ctx := context.Background()
assert.Nil(t, err)
assert.Equal(t, "TwoPhasePrepare", twoPhaseService.prepareMethodName)
assert.Equal(t, "TwoPhaseCommit", twoPhaseService.commitMethodName)
assert.Equal(t, "TwoPhaseRollback", twoPhaseService.rollbackMethodName)
assert.Equal(t, "TwoPhaseDemoService", twoPhaseService.actionName)

resp, err := twoPhaseService.Prepare(ctx, 11)
assert.Equal(t, false, resp)
assert.Equal(t, "execute two phase prepare method, param [11]", err.Error())

resp, err = twoPhaseService.Commit(ctx, &tm.BusinessActionContext{Xid: "1234"})
assert.Equal(t, false, resp)
assert.Equal(t, "execute two phase commit method, xid 1234", err.Error())

resp, err = twoPhaseService.Rollback(ctx, &tm.BusinessActionContext{Xid: "1234"})
assert.Equal(t, true, resp)
assert.Nil(t, err)

assert.Equal(t, "TwoPhaseDemoService", twoPhaseService.GetActionName())
}

type TwoPhaseDemoService2 struct {
}

func (t *TwoPhaseDemoService2) Prepare(ctx context.Context, params ...interface{}) (bool, error) {
return false, fmt.Errorf("execute two phase prepare method, param %v", params)
}

func (t *TwoPhaseDemoService2) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
return true, fmt.Errorf("execute two phase commit method, xid %v", businessActionContext.Xid)
}

func (t *TwoPhaseDemoService2) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
return false, fmt.Errorf("execute two phase rollback method, xid %v", businessActionContext.Xid)
}

func (t *TwoPhaseDemoService2) GetActionName() string {
return "TwoPhaseDemoService2"
}

func TestParseTwoPhaseActionExecuteMethod2(t *testing.T) {
twoPhaseService, err := ParseTwoPhaseAction(&TwoPhaseDemoService2{})
ctx := context.Background()
assert.Nil(t, err)
resp, err := twoPhaseService.Prepare(ctx, 11)
assert.Equal(t, false, resp)
assert.Equal(t, "execute two phase prepare method, param [11]", err.Error())

resp, err = twoPhaseService.Commit(ctx, &tm.BusinessActionContext{Xid: "1234"})
assert.Equal(t, true, resp)
assert.Equal(t, "execute two phase commit method, xid 1234", err.Error())

resp, err = twoPhaseService.Rollback(ctx, &tm.BusinessActionContext{Xid: "1234"})
assert.Equal(t, false, resp)
assert.Equal(t, "execute two phase rollback method, xid 1234", err.Error())
}

+ 24
- 6
pkg/tm/context.go View File

@@ -34,12 +34,13 @@ type BusinessActionContext struct {
Xid string
BranchId int64
ActionName string
ActionContext interface{}
ActionContext map[string]interface{}
}

type ContextVariable struct {
TxName string
Xid string
XidCopy string
Status *message.GlobalStatus
TxRole *GlobalTransactionRole
BusinessActionContext *BusinessActionContext
@@ -114,16 +115,25 @@ func SetTransactionRole(ctx context.Context, role GlobalTransactionRole) {
}
}

func GetXID(ctx context.Context) string {
func IsTransactionOpened(ctx context.Context) bool {
variable := ctx.Value(seataContextVariable)
if variable == nil {
return ""
return false
}
return variable.(*ContextVariable).Xid
xid := variable.(*ContextVariable).Xid
return xid != ""
}

func HasXID(ctx context.Context) bool {
return GetXID(ctx) != ""
func GetXID(ctx context.Context) string {
variable := ctx.Value(seataContextVariable)
if variable == nil {
return ""
}
xid := variable.(*ContextVariable).Xid
if xid == "" {
xid = variable.(*ContextVariable).XidCopy
}
return xid
}

func SetXID(ctx context.Context, xid string) {
@@ -133,9 +143,17 @@ func SetXID(ctx context.Context, xid string) {
}
}

func SetXIDCopy(ctx context.Context, xid string) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).XidCopy = xid
}
}

func UnbindXid(ctx context.Context) {
variable := ctx.Value(seataContextVariable)
if variable != nil {
variable.(*ContextVariable).Xid = ""
variable.(*ContextVariable).XidCopy = ""
}
}

+ 8
- 8
pkg/tm/global_transaction.go View File

@@ -111,14 +111,14 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa
break
}
}
if err == nil {
log.Infof("GlobalCommitRequest commit success, xid %s", gtr.Xid)
gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus
UnbindXid(ctx)
return nil
if err != nil {
log.Infof("send global commit request failed, xid %s, error %v", gtr.Xid, err)
return err
}
log.Errorf("GlobalCommitRequest commit failed, xid %s, error %v", gtr.Xid, err)
return err
log.Infof("send global commit request success, xid %s", gtr.Xid)
gtr.Status = res.(message.GlobalCommitResponse).GlobalStatus
UnbindXid(ctx)
return nil
}

// Rollback the global transaction.
@@ -150,7 +150,7 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran
}
}
if err == nil {
log.Errorf("GlobalRollbackRequest rollback success, xid %s", gtr.Xid)
log.Errorf("GlobalRollbackRequest rollback success, xid %s, error %v", gtr.Xid, err.Error())
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus
UnbindXid(ctx)
return nil


+ 4
- 4
pkg/tm/transaction_executor.go View File

@@ -49,7 +49,7 @@ func Begin(ctx context.Context, name string) context.Context {
}

var tx *GlobalTransaction
if HasXID(ctx) {
if IsTransactionOpened(ctx) {
tx = &GlobalTransaction{
Xid: GetXID(ctx),
Status: message.GlobalStatusBegin,
@@ -70,7 +70,7 @@ func Begin(ctx context.Context, name string) context.Context {
}

// todo timeout should read from config
err := GetGlobalTransactionManager().Begin(ctx, tx, 50, name)
err := GetGlobalTransactionManager().Begin(ctx, tx, 60000*30, name)
if err != nil {
panic(fmt.Sprintf("transactionTemplate: begin transaction failed, error %v", err))
}
@@ -79,7 +79,7 @@ func Begin(ctx context.Context, name string) context.Context {
}

// commit global transaction
func CommitOrRollback(ctx context.Context, err error) error {
func CommitOrRollback(ctx context.Context, err *error) error {
tx := &GlobalTransaction{
Xid: GetXID(ctx),
Status: *GetTxStatus(ctx),
@@ -87,7 +87,7 @@ func CommitOrRollback(ctx context.Context, err error) error {
}

var resp error
if err == nil {
if *err == nil {
resp = GetGlobalTransactionManager().Commit(ctx, tx)
if resp != nil {
log.Infof("transactionTemplate: commit transaction failed, error %v", err)


+ 64
- 0
sample/tcc/dubbo/client/cmd/client.go View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"

"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
_ "dubbo.apache.org/dubbo-go/v3/imports"
_ "github.com/seata/seata-go/pkg/imports"
"github.com/seata/seata-go/pkg/integration"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/sample/tcc/dubbo/client/service"
)

// need to setup environment variable "DUBBO_GO_CONFIG_PATH" to "conf/dubbogo.yml" before run
func main() {
integration.UseDubbo()
config.SetConsumerService(service.UserProviderInstance)
err := config.Load()
if err != nil {
panic(err)
}
test()
}

func test() {
var err error
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, &err)
logger.Infof("tx result %v", resp)
<-make(chan struct{})
}()

userProviderProxy, err := tcc.NewTCCServiceProxy(service.UserProviderInstance)
if err != nil {
logger.Infof("userProviderProxyis not tcc service")
return
}
resp, err := userProviderProxy.Prepare(ctx, 1)
if err != nil {
logger.Infof("response prepare: %v", err)
return
}
logger.Infof("get resp %#v", resp)
}

+ 16
- 0
sample/tcc/dubbo/client/conf/dubbogo.yml View File

@@ -0,0 +1,16 @@
# dubbo client yaml configure file

dubbo:
registries:
demoZK:
protocol: zookeeper
address: 127.0.0.1:2181
consumer:
filter: seataDubboFilter
references:
UserProvider:
protocol: dubbo
interface: com.github.seata.sample.UserProvider
logger:
zap-config:
level: info

+ 35
- 0
sample/tcc/dubbo/client/service/user_provider.go View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package service

import (
"context"

"github.com/seata/seata-go/pkg/tm"
)

var (
UserProviderInstance = &UserProvider{}
)

type UserProvider struct {
Prepare func(ctx context.Context, params ...interface{}) (bool, error) `seataTwoPhaseAction:"prepare" seataTwoPhaseServiceName:"TwoPhaseDemoService"`
Commit func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"commit"`
Rollback func(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) `seataTwoPhaseAction:"rollback"`
GetActionName func() string
}

+ 77
- 0
sample/tcc/dubbo/server/cmd/server.go View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
_ "dubbo.apache.org/dubbo-go/v3/imports"
_ "github.com/seata/seata-go/pkg/imports"
"github.com/seata/seata-go/pkg/integration"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/sample/tcc/dubbo/server/service"
)

// need to setup environment variable "DUBBO_GO_CONFIG_PATH" to "conf/dubbogo.yml" before run
func main() {
integration.UseDubbo()
userProviderProxy, err := tcc.NewTCCServiceProxy(&service.UserProvider{})
if err != nil {
logger.Errorf("get userProviderProxy tcc service proxy error, %v", err.Error())
return
}
// server should register resource
err = userProviderProxy.RegisterResource()
if err != nil {
logger.Errorf("userProviderProxy register resource error, %v", err.Error())
return
}
config.SetProviderService(userProviderProxy)
if err := config.Load(); err != nil {
panic(err)
}
initSignal()
}

func initSignal() {
signals := make(chan os.Signal, 1)
// It is not possible to block SIGKILL or syscall.SIGSTOP
signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
sig := <-signals
logger.Infof("get signal %s", sig.String())
switch sig {
case syscall.SIGHUP:
// reload()
default:
time.AfterFunc(time.Duration(int(3e9)), func() {
logger.Warnf("app exit now by force...")
os.Exit(1)
})
// The program exits normally or timeout forcibly exits.
fmt.Println("provider app exit now...")
return
}
}
}

+ 20
- 0
sample/tcc/dubbo/server/conf/dubbogo.yml View File

@@ -0,0 +1,20 @@
# dubbo server yaml configure file

dubbo:
registries:
demoZK:
protocol: zookeeper
timeout: 3s
address: 127.0.0.1:2181
protocols:
dubbo:
name: dubbo
port: 20000
provider:
services:
UserProvider:
interface: com.github.seata.sample.UserProvider
filter: seataDubboFilter
logger:
zap-config:
level: info

+ 48
- 0
sample/tcc/dubbo/server/service/user_provider.go View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package service

import (
"context"

"dubbo.apache.org/dubbo-go/v3/common/logger"
"github.com/seata/seata-go/pkg/tm"
)

type UserProvider struct {
}

func (t *UserProvider) Prepare(ctx context.Context, params ...interface{}) (bool, error) {
logger.Infof("Prepare result: %v, xid %v", params, tm.GetXID(ctx))
return true, nil
}

func (t *UserProvider) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
logger.Infof("Commit result: %v, xid %s", businessActionContext, tm.GetXID(ctx))
return true, nil
}

func (t *UserProvider) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
logger.Infof("Rollback result: %v, xid %s", businessActionContext, tm.GetXID(ctx))
return true, nil
}

func (t *UserProvider) GetActionName() string {
logger.Infof("GetActionName result")
return "TwoPhaseDemoService2"
}

+ 72
- 0
sample/tcc/local/cmd/local.go View File

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"

"github.com/seata/seata-go/pkg/common/log"
_ "github.com/seata/seata-go/pkg/imports"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/sample/tcc/local/service"
)

func main() {

var err error
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, &err)
log.Infof("tx result %v", resp)
<-make(chan struct{})
}()

tccService, err := tcc.NewTCCServiceProxy(&service.TestTCCServiceBusiness{})
if err != nil {
log.Errorf("get TestTCCServiceBusiness tcc service proxy error, %v", err.Error())
return
}
err = tccService.RegisterResource()
if err != nil {
log.Errorf("TestTCCServiceBusiness register resource error, %v", err.Error())
return
}
_, err = tccService.Prepare(ctx, 1)
if err != nil {
log.Errorf("TestTCCServiceBusiness prepare error, %v", err.Error())
return
}

tccService2, err := tcc.NewTCCServiceProxy(&service.TestTCCServiceBusiness2{})
if err != nil {
log.Errorf("get TestTCCServiceBusiness2 tcc service proxy error, %v", err.Error())
return
}
err = tccService2.RegisterResource()
if err != nil {
log.Errorf("TestTCCServiceBusiness2 register resource error, %v", err.Error())
return
}
_, err = tccService2.Prepare(ctx, 3)
if err != nil {
log.Errorf("TestTCCServiceBusiness2 prepare error, %v", err.Error())
return
}

}

+ 0
- 37
sample/tcc/local/cmd/tcc_local_transation.go View File

@@ -1,37 +0,0 @@
package main

import (
"context"

"github.com/seata/seata-go/pkg/common/log"
_ "github.com/seata/seata-go/pkg/imports"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/sample/tcc/local/service"
)

func main() {

var err error
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness")
defer func() {
resp := tm.CommitOrRollback(ctx, err)
log.Infof("tx result %v", resp)
<-make(chan struct{})
}()

tccService := tcc.NewTCCServiceProxy(service.TestTCCServiceBusiness{})
err = tccService.Prepare(ctx, 1)
if err != nil {
log.Errorf("execute TestTCCServiceBusiness prepare error %s", err.Error())
return
}

tccService2 := tcc.NewTCCServiceProxy(service.TestTCCServiceBusiness2{})
err = tccService2.Prepare(ctx, 3)
if err != nil {
log.Errorf("execute TestTCCServiceBusiness2 prepare error %s", err.Error())
return
}

}

+ 29
- 12
sample/tcc/local/service/service.go View File

@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package service

import (
@@ -10,19 +27,19 @@ import (
type TestTCCServiceBusiness struct {
}

func (T TestTCCServiceBusiness) Prepare(ctx context.Context, params interface{}) error {
func (T TestTCCServiceBusiness) Prepare(ctx context.Context, params ...interface{}) (bool, error) {
log.Infof("TestTCCServiceBusiness Prepare, param %v", params)
return nil
return true, nil
}

func (T TestTCCServiceBusiness) Commit(ctx context.Context, businessActionContext tm.BusinessActionContext) error {
func (T TestTCCServiceBusiness) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
log.Infof("TestTCCServiceBusiness Commit, param %v", businessActionContext)
return nil
return true, nil
}

func (T TestTCCServiceBusiness) Rollback(ctx context.Context, businessActionContext tm.BusinessActionContext) error {
func (T TestTCCServiceBusiness) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
log.Infof("TestTCCServiceBusiness Rollback, param %v", businessActionContext)
return nil
return true, nil
}

func (T TestTCCServiceBusiness) GetActionName() string {
@@ -32,19 +49,19 @@ func (T TestTCCServiceBusiness) GetActionName() string {
type TestTCCServiceBusiness2 struct {
}

func (T TestTCCServiceBusiness2) Prepare(ctx context.Context, params interface{}) error {
func (T TestTCCServiceBusiness2) Prepare(ctx context.Context, params ...interface{}) (bool, error) {
log.Infof("TestTCCServiceBusiness2 Prepare, param %v", params)
return nil
return true, nil
}

func (T TestTCCServiceBusiness2) Commit(ctx context.Context, businessActionContext tm.BusinessActionContext) error {
func (T TestTCCServiceBusiness2) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
log.Infof("TestTCCServiceBusiness2 Commit, param %v", businessActionContext)
return nil
return true, nil
}

func (T TestTCCServiceBusiness2) Rollback(ctx context.Context, businessActionContext tm.BusinessActionContext) error {
func (T TestTCCServiceBusiness2) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (bool, error) {
log.Infof("TestTCCServiceBusiness2 Rollback, param %v", businessActionContext)
return nil
return true, nil
}

func (T TestTCCServiceBusiness2) GetActionName() string {


Loading…
Cancel
Save