From 8550da0ca9a5f9a8b540a280fbec0d998b87cd59 Mon Sep 17 00:00:00 2001 From: Road2Melon <57566066+Road2Melon@users.noreply.github.com> Date: Sat, 10 May 2025 19:46:46 +0800 Subject: [PATCH] optimize: optimize transaction timeout judgment (#777) * optimize: optimize transaction timeout judgment (#777) * optimize: timeout func --------- Co-authored-by: FengZhang Co-authored-by: JayLiu <38887641+luky116@users.noreply.github.com> --- pkg/tm/context.go | 24 +++++++- pkg/tm/global_transaction.go | 16 +++++ pkg/tm/transaction_executor.go | 4 ++ pkg/tm/transaction_executor_test.go | 95 ++++++++++++++++++++++++++--- 4 files changed, 128 insertions(+), 11 deletions(-) diff --git a/pkg/tm/context.go b/pkg/tm/context.go index aede33df..635f1294 100644 --- a/pkg/tm/context.go +++ b/pkg/tm/context.go @@ -19,9 +19,9 @@ package tm import ( "context" - "seata.apache.org/seata-go/pkg/protocol/message" "seata.apache.org/seata-go/pkg/rm/tcc/fence/enum" + "time" ) type ContextParam string @@ -56,6 +56,13 @@ type ContextVariable struct { BusinessActionContext *BusinessActionContext // GlobalTransaction Represent seata ctx is a global transaction GlobalTransaction + // use to calculate the timeout + timeInfo TimeInfo +} + +type TimeInfo struct { + createTime time.Duration + timeout time.Duration } func InitSeataContext(ctx context.Context) context.Context { @@ -211,3 +218,18 @@ func IsFenceTxBegin(ctx context.Context) bool { return false } + +func GetTimeInfo(ctx context.Context) (ti *TimeInfo) { + variable := ctx.Value(seataContextVariable) + if variable != nil { + ti = &variable.(*ContextVariable).timeInfo + } + return +} + +func SetTimeInfo(ctx context.Context, ti TimeInfo) { + variable := ctx.Value(seataContextVariable) + if variable != nil { + variable.(*ContextVariable).timeInfo = ti + } +} diff --git a/pkg/tm/global_transaction.go b/pkg/tm/global_transaction.go index 3984bdd0..8846e799 100644 --- a/pkg/tm/global_transaction.go +++ b/pkg/tm/global_transaction.go @@ -71,6 +71,14 @@ func (g *GlobalTransactionManager) Begin(ctx context.Context, timeout time.Durat // Commit the global transaction. func (g *GlobalTransactionManager) Commit(ctx context.Context, gtr *GlobalTransaction) error { + if isTimeout(ctx) { + log.Infof("Rollback: tm detected timeout in global gtr %s", gtr.Xid) + if err := GetGlobalTransactionManager().Rollback(ctx, gtr); err != nil { + log.Errorf("Rollback transaction failed, error: %v in global gtr % s", err, gtr.Xid) + return err + } + return nil + } if gtr.TxRole != Launcher { log.Infof("Ignore Commit(): just involved in global gtr %s", gtr.Xid) return nil @@ -151,3 +159,11 @@ func (g *GlobalTransactionManager) Rollback(ctx context.Context, gtr *GlobalTran return nil } + +func isTimeout(ctx context.Context) bool { + ti := GetTimeInfo(ctx) + if ti == nil || ti.createTime == 0 || ti.timeout == 0 { + return false + } + return time.Since(time.Unix(int64(ti.createTime), 0)) > ti.timeout +} diff --git a/pkg/tm/transaction_executor.go b/pkg/tm/transaction_executor.go index 6f376210..a9f42b56 100644 --- a/pkg/tm/transaction_executor.go +++ b/pkg/tm/transaction_executor.go @@ -97,6 +97,10 @@ func WithGlobalTx(ctx context.Context, gc *GtxConfig, business CallbackWithCtx) // construct a new context object and set the xid. // the advantage of this is that the suspend and resume operations of xid need not to be considered. func begin(ctx context.Context, gc *GtxConfig) error { + // record time + ti := TimeInfo{createTime: time.Duration(time.Now().Unix()), timeout: gc.Timeout} + SetTimeInfo(ctx, ti) + switch pg := gc.Propagation; pg { case NotSupported: // If transaction is existing, suspend it diff --git a/pkg/tm/transaction_executor_test.go b/pkg/tm/transaction_executor_test.go index 8fe541c1..dafcea91 100644 --- a/pkg/tm/transaction_executor_test.go +++ b/pkg/tm/transaction_executor_test.go @@ -19,6 +19,7 @@ package tm import ( "context" + "fmt" "reflect" "testing" "time" @@ -378,16 +379,21 @@ func TestWithGlobalTx(t *testing.T) { } type testCase struct { - GtxConfig *GtxConfig - occurError bool - errMessage string - callbackErr bool - mockBeginFunc interface{} - mockBeginTarget interface{} - mockSecondPhaseFunc interface{} - mockSecondPhaseTarget interface{} - secondErr bool - callback CallbackWithCtx + GtxConfig *GtxConfig + occurError bool + timeoutErr bool + errMessage string + callbackErr bool + mockBeginFunc interface{} + mockBeginTarget interface{} + mockSecondPhaseFunc interface{} + mockSecondPhaseTarget interface{} + mockRollbackTargetName string + mockRollbackFunc interface{} + mockTimeoutTarget interface{} + mockTimeoutFunc interface{} + secondErr bool + callback CallbackWithCtx } gts := []testCase{ @@ -464,18 +470,74 @@ func TestWithGlobalTx(t *testing.T) { }, secondErr: true, }, + + // case tm detected a timeout and executed rollback successfully. + { + GtxConfig: &GtxConfig{ + Name: "MockGtxConfig", + Timeout: time.Second * 30, + }, + timeoutErr: false, + mockBeginTarget: begin, + mockBeginFunc: func(ctx context.Context, gc *GtxConfig) error { + SetXID(ctx, "123456") + SetTxRole(ctx, Launcher) + return nil + }, + callback: callbackNil, + mockTimeoutTarget: isTimeout, + mockTimeoutFunc: func(ctx context.Context) bool { + return true + }, + mockRollbackTargetName: "Rollback", + mockRollbackFunc: func(_ *GlobalTransactionManager, ctx context.Context, gtr *GlobalTransaction) error { + return nil + }, + }, + // tm detected a timeout but rollback threw an exception. + { + GtxConfig: &GtxConfig{ + Name: "MockGtxConfig", + Timeout: time.Second * 30, + }, + timeoutErr: true, + errMessage: "tm detected a timeout but rollback threw an exception", + mockBeginTarget: begin, + mockBeginFunc: func(ctx context.Context, gc *GtxConfig) error { + SetXID(ctx, "123456") + SetTxRole(ctx, Launcher) + return nil + }, + callback: callbackNil, + mockTimeoutTarget: isTimeout, + mockTimeoutFunc: func(ctx context.Context) bool { + return true + }, + mockRollbackTargetName: "Rollback", + mockRollbackFunc: func(_ *GlobalTransactionManager, ctx context.Context, gtr *GlobalTransaction) error { + return fmt.Errorf("tm detected a timeout but rollback threw an exception") + }, + }, } for i, v := range gts { t.Logf("Case %v: %+v", i, v) var beginStub *gomonkey.Patches var secondStub *gomonkey.Patches + var timeoutStub *gomonkey.Patches + var rollbackStub *gomonkey.Patches if v.mockBeginTarget != nil { beginStub = gomonkey.ApplyFunc(v.mockBeginTarget, v.mockBeginFunc) } if v.mockSecondPhaseTarget != nil { secondStub = gomonkey.ApplyFunc(v.mockSecondPhaseTarget, v.mockSecondPhaseFunc) } + if v.mockTimeoutTarget != nil { + timeoutStub = gomonkey.ApplyFunc(v.mockTimeoutTarget, v.mockTimeoutFunc) + } + if v.mockRollbackTargetName != "" { + rollbackStub = gomonkey.ApplyMethod(reflect.TypeOf(GetGlobalTransactionManager()), v.mockRollbackTargetName, v.mockRollbackFunc) + } ctx := context.Background() err := WithGlobalTx(ctx, v.GtxConfig, v.callback) @@ -492,6 +554,10 @@ func TestWithGlobalTx(t *testing.T) { assert.NotNil(t, err) } + if v.timeoutErr { + assert.Regexp(t, v.errMessage, err.Error()) + } + if v.mockBeginTarget != nil { beginStub.Reset() } @@ -499,5 +565,14 @@ func TestWithGlobalTx(t *testing.T) { if v.mockSecondPhaseTarget != nil { secondStub.Reset() } + + if v.mockTimeoutTarget != nil { + timeoutStub.Reset() + } + + if v.mockRollbackTargetName != "" { + rollbackStub.Reset() + } + } }