@@ -12,6 +12,7 @@ require ( | |||
github.com/stretchr/testify v1.7.1 | |||
go.uber.org/atomic v1.9.0 | |||
go.uber.org/zap v1.21.0 | |||
golang.org/x/tools v0.1.11 // indirect | |||
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10 | |||
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect | |||
) |
@@ -770,6 +770,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de | |||
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | |||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | |||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= | |||
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= | |||
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= | |||
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= | |||
github.com/zouyx/agollo/v3 v3.4.5 h1:7YCxzY9ZYaH9TuVUBvmI6Tk0mwMggikah+cfbYogcHQ= | |||
@@ -840,6 +841,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U | |||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= | |||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= | |||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= | |||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= | |||
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M= | |||
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= | |||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= | |||
@@ -879,6 +881,8 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB | |||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | |||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | |||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= | |||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= | |||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= | |||
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= | |||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= | |||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= | |||
@@ -923,6 +927,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v | |||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= | |||
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | |||
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | |||
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | |||
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | |||
golang.org/x/net v0.0.0-20211105192438-b53810dc28af/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= | |||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE= | |||
@@ -1012,6 +1017,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc | |||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
golang.org/x/sys v0.0.0-20211106132015-ebca88c72f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | |||
@@ -1093,6 +1099,8 @@ golang.org/x/tools v0.0.0-20201014170642-d1624618ad65/go.mod h1:z6u4i615ZeAfBE4X | |||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= | |||
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= | |||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= | |||
golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY= | |||
golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4= | |||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= | |||
@@ -170,9 +170,9 @@ func (t *TCCResourceManager) BranchRollback(ctx context.Context, ranchType branc | |||
_, err := tccResource.TwoPhaseAction.Rollback(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData)) | |||
if err != nil { | |||
return branch.BranchStatusPhasetwoRollbacked, err | |||
return branch.BranchStatusPhasetwoRollbackFailedRetryable, err | |||
} | |||
return branch.BranchStatusPhasetwoRollbackFailedRetryable, err | |||
return branch.BranchStatusPhasetwoRollbacked, err | |||
} | |||
func (t *TCCResourceManager) GetBranchType() branch.BranchType { | |||
@@ -91,6 +91,7 @@ func (t *TCCServiceProxy) registeBranch(ctx context.Context) error { | |||
log.Errorf(err.Error()) | |||
return err | |||
} | |||
// todo add param | |||
tccContext := make(map[string]interface{}, 0) | |||
tccContext[common.StartTime] = time.Now().UnixNano() / 1e6 | |||
tccContext[common.HostName] = net.GetLocalIp() | |||
@@ -21,6 +21,7 @@ import ( | |||
"context" | |||
"fmt" | |||
"sync" | |||
"time" | |||
"github.com/pkg/errors" | |||
"github.com/seata/seata-go/pkg/common/log" | |||
@@ -97,8 +98,11 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa | |||
var ( | |||
err error | |||
res interface{} | |||
// todo retry and retryInterval should read from config | |||
retry = 10 | |||
retryInterval = 200 * time.Millisecond | |||
) | |||
for retry := 5; retry > 0; retry-- { | |||
for ; retry > 0; retry-- { | |||
req := message.GlobalCommitRequest{ | |||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ | |||
Xid: gtr.Xid, | |||
@@ -107,6 +111,7 @@ func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransa | |||
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req) | |||
if err != nil { | |||
log.Errorf("GlobalCommitRequest error, xid %s, error %v", gtr.Xid, err) | |||
time.Sleep(retryInterval) | |||
} else { | |||
break | |||
} | |||
@@ -135,8 +140,11 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran | |||
var ( | |||
err error | |||
res interface{} | |||
// todo retry and retryInterval should read from config | |||
retry = 10 | |||
retryInterval = 200 * time.Millisecond | |||
) | |||
for retry := 5; retry > 0; retry-- { | |||
for ; retry > 0; retry-- { | |||
req := message.GlobalRollbackRequest{ | |||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ | |||
Xid: gtr.Xid, | |||
@@ -145,18 +153,19 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran | |||
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req) | |||
if err != nil { | |||
log.Errorf("GlobalRollbackRequest error, xid %s, error %v", gtr.Xid, err) | |||
time.Sleep(retryInterval) | |||
} else { | |||
break | |||
} | |||
} | |||
if err == nil { | |||
log.Errorf("GlobalRollbackRequest rollback success, xid %s, error %v", gtr.Xid, err.Error()) | |||
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus | |||
UnbindXid(ctx) | |||
return nil | |||
if err != nil { | |||
log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err) | |||
return err | |||
} | |||
log.Errorf("GlobalRollbackRequest rollback failed, xid %s, error %v", gtr.Xid, err) | |||
return err | |||
log.Infof("GlobalRollbackRequest rollback success, xid %s,", gtr.Xid) | |||
gtr.Status = res.(message.GlobalRollbackResponse).GlobalStatus | |||
UnbindXid(ctx) | |||
return nil | |||
} | |||
// Suspend the global transaction. | |||
@@ -20,6 +20,7 @@ package tm | |||
import ( | |||
"context" | |||
"fmt" | |||
"time" | |||
"github.com/seata/seata-go/pkg/common/log" | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
@@ -79,24 +80,42 @@ func Begin(ctx context.Context, name string) context.Context { | |||
} | |||
// commit global transaction | |||
func CommitOrRollback(ctx context.Context, err *error) error { | |||
func CommitOrRollback(ctx context.Context, isSuccess bool) error { | |||
role := *GetTransactionRole(ctx) | |||
if role == PARTICIPANT { | |||
// Participant has no responsibility of rollback | |||
log.Debugf("Ignore Rollback(): just involved in global transaction [%s]", GetXID(ctx)) | |||
return nil | |||
} | |||
tx := &GlobalTransaction{ | |||
Xid: GetXID(ctx), | |||
Status: *GetTxStatus(ctx), | |||
Role: *GetTransactionRole(ctx), | |||
Role: role, | |||
} | |||
var resp error | |||
if *err == nil { | |||
resp = GetGlobalTransactionManager().Commit(ctx, tx) | |||
if resp != nil { | |||
log.Infof("transactionTemplate: commit transaction failed, error %v", err) | |||
var ( | |||
err error | |||
// todo retry and retryInterval should read from config | |||
retry = 10 | |||
retryInterval = 200 * time.Millisecond | |||
) | |||
for ; retry > 0; retry-- { | |||
if isSuccess { | |||
err = GetGlobalTransactionManager().Commit(ctx, tx) | |||
if err != nil { | |||
log.Infof("transactionTemplate: commit transaction failed, error %v", err) | |||
} | |||
} else { | |||
err = GetGlobalTransactionManager().Rollback(ctx, tx) | |||
if err != nil { | |||
log.Infof("transactionTemplate: Rollback transaction failed, error %v", err) | |||
} | |||
} | |||
} else { | |||
resp = GetGlobalTransactionManager().Rollback(ctx, tx) | |||
if resp != nil { | |||
log.Infof("transactionTemplate: Rollback transaction failed, error %v", err) | |||
if err == nil { | |||
break | |||
} else { | |||
time.Sleep(retryInterval) | |||
} | |||
} | |||
return resp | |||
// todo unbind xid | |||
return err | |||
} |
@@ -45,7 +45,7 @@ func test() { | |||
var err error | |||
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness") | |||
defer func() { | |||
resp := tm.CommitOrRollback(ctx, &err) | |||
resp := tm.CommitOrRollback(ctx, err == nil) | |||
logger.Infof("tx result %v", resp) | |||
<-make(chan struct{}) | |||
}() | |||
@@ -22,47 +22,27 @@ import ( | |||
"github.com/seata/seata-go/pkg/common/log" | |||
_ "github.com/seata/seata-go/pkg/imports" | |||
"github.com/seata/seata-go/pkg/rm/tcc" | |||
"github.com/seata/seata-go/pkg/tm" | |||
"github.com/seata/seata-go/sample/tcc/local/service" | |||
) | |||
func main() { | |||
var err error | |||
ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness") | |||
defer func() { | |||
resp := tm.CommitOrRollback(ctx, &err) | |||
resp := tm.CommitOrRollback(ctx, err == nil) | |||
log.Infof("tx result %v", resp) | |||
<-make(chan struct{}) | |||
}() | |||
tccService, err := tcc.NewTCCServiceProxy(&service.TestTCCServiceBusiness{}) | |||
if err != nil { | |||
log.Errorf("get TestTCCServiceBusiness tcc service proxy error, %v", err.Error()) | |||
return | |||
} | |||
err = tccService.RegisterResource() | |||
if err != nil { | |||
log.Errorf("TestTCCServiceBusiness register resource error, %v", err.Error()) | |||
return | |||
} | |||
tccService := service.NewTestTCCServiceBusinessProxy() | |||
tccService2 := service.NewTestTCCServiceBusiness2Proxy() | |||
_, err = tccService.Prepare(ctx, 1) | |||
if err != nil { | |||
log.Errorf("TestTCCServiceBusiness prepare error, %v", err.Error()) | |||
return | |||
} | |||
tccService2, err := tcc.NewTCCServiceProxy(&service.TestTCCServiceBusiness2{}) | |||
if err != nil { | |||
log.Errorf("get TestTCCServiceBusiness2 tcc service proxy error, %v", err.Error()) | |||
return | |||
} | |||
err = tccService2.RegisterResource() | |||
if err != nil { | |||
log.Errorf("TestTCCServiceBusiness2 register resource error, %v", err.Error()) | |||
return | |||
} | |||
_, err = tccService2.Prepare(ctx, 3) | |||
if err != nil { | |||
log.Errorf("TestTCCServiceBusiness2 prepare error, %v", err.Error()) | |||
@@ -19,14 +19,44 @@ package service | |||
import ( | |||
"context" | |||
"fmt" | |||
"sync" | |||
"github.com/seata/seata-go/pkg/rm/tcc" | |||
"github.com/seata/seata-go/pkg/common/log" | |||
"github.com/seata/seata-go/pkg/tm" | |||
) | |||
var ( | |||
tccService *tcc.TCCServiceProxy | |||
tccServiceOnce sync.Once | |||
tccService2 *tcc.TCCServiceProxy | |||
tccService2Once sync.Once | |||
) | |||
type TestTCCServiceBusiness struct { | |||
} | |||
func NewTestTCCServiceBusinessProxy() *tcc.TCCServiceProxy { | |||
if tccService != nil { | |||
return tccService | |||
} | |||
tccServiceOnce.Do(func() { | |||
var err error | |||
tccService, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness{}) | |||
if err != nil { | |||
panic(fmt.Errorf("get TestTCCServiceBusiness tcc service proxy error, %v", err.Error())) | |||
} | |||
err = tccService.RegisterResource() | |||
if err != nil { | |||
panic(fmt.Errorf("TestTCCServiceBusiness register resource error, %v", err.Error())) | |||
} | |||
}) | |||
return tccService | |||
} | |||
func (T TestTCCServiceBusiness) Prepare(ctx context.Context, params ...interface{}) (bool, error) { | |||
log.Infof("TestTCCServiceBusiness Prepare, param %v", params) | |||
return true, nil | |||
@@ -49,6 +79,24 @@ func (T TestTCCServiceBusiness) GetActionName() string { | |||
type TestTCCServiceBusiness2 struct { | |||
} | |||
func NewTestTCCServiceBusiness2Proxy() *tcc.TCCServiceProxy { | |||
if tccService2 != nil { | |||
return tccService2 | |||
} | |||
tccService2Once.Do(func() { | |||
var err error | |||
tccService2, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness2{}) | |||
if err != nil { | |||
panic(fmt.Errorf("TestTCCServiceBusiness2 get tcc service proxy error, %v", err.Error())) | |||
} | |||
err = tccService2.RegisterResource() | |||
if err != nil { | |||
panic(fmt.Errorf("TestTCCServiceBusiness2 register resource error, %v", err.Error())) | |||
} | |||
}) | |||
return tccService2 | |||
} | |||
func (T TestTCCServiceBusiness2) Prepare(ctx context.Context, params ...interface{}) (bool, error) { | |||
log.Infof("TestTCCServiceBusiness2 Prepare, param %v", params) | |||
return true, nil | |||