Browse Source

optimize : tccfence log table deleted should be optimized (#745)

* feat: optimized the daily deletion of fence logs

* feat: optimized the daily deletion of fence logs

* feat: configure the fence through the configuration file

* fix: fixed some issues with sql statements

---------

Co-authored-by: JayLiu <38887641+luky116@users.noreply.github.com>
Co-authored-by: FengZhang <zfcode@qq.com>
tags/v2.0.0-rc01
ZeruiYang GitHub 5 months ago
parent
commit
5e40c6c271
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
13 changed files with 348 additions and 63 deletions
  1. +1
    -1
      pkg/client/client.go
  2. +5
    -1
      pkg/client/config_test.go
  3. +5
    -10
      pkg/rm/tcc/fence/config/tcc_fence_config.go
  4. +22
    -1
      pkg/rm/tcc/fence/fence.go
  5. +1
    -1
      pkg/rm/tcc/fence/fennce_driver_test.go
  6. +116
    -30
      pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go
  7. +14
    -3
      pkg/rm/tcc/fence/store/db/dao/store_api.go
  8. +78
    -12
      pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go
  9. +71
    -2
      pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go
  10. +9
    -0
      pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go
  11. +21
    -1
      pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go
  12. +3
    -1
      pkg/rm/tcc/tcc_resource.go
  13. +2
    -0
      testdata/conf/seatago.yml

+ 1
- 1
pkg/client/client.go View File

@@ -88,7 +88,7 @@ func initRmClient(cfg *Config) {
config.Init(cfg.ClientConfig.RmConfig.LockConfig)
client.RegisterProcessor()
integration.Init()
tcc.InitTCC()
tcc.InitTCC(cfg.TCCConfig.FenceConfig)
at.InitAT(cfg.ClientConfig.UndoConfig, cfg.AsyncWorkerConfig)
at.InitXA(cfg.ClientConfig.XaConfig)
})


+ 5
- 1
pkg/client/config_test.go View File

@@ -39,6 +39,8 @@ func TestLoadPath(t *testing.T) {

assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)
assert.Equal(t, false, cfg.TCCConfig.FenceConfig.Enable)
assert.Equal(t, "root:12345678@tcp(127.0.0.1:3306)/seata_client1?charset=utf8&parseTime=True", cfg.TCCConfig.FenceConfig.Url)
assert.Equal(t, "tcc_fence_log_test", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*60, cfg.TCCConfig.FenceConfig.CleanPeriod)

@@ -131,7 +133,7 @@ func TestLoadPath(t *testing.T) {
}

func TestLoadJson(t *testing.T) {
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":"30s","retry-times":10,"retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":"30s","retry-times":10,"retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"enable":false,"url":"root:12345678@tcp(127.0.0.1:3306)/seata_client1?charset=utf8&parseTime=True","log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)
assert.Equal(t, false, cfg.Enabled)
@@ -168,6 +170,8 @@ func TestLoadJson(t *testing.T) {

assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)
assert.Equal(t, false, cfg.TCCConfig.FenceConfig.Enable)
assert.Equal(t, "root:12345678@tcp(127.0.0.1:3306)/seata_client1?charset=utf8&parseTime=True", cfg.TCCConfig.FenceConfig.Url)
assert.Equal(t, "tcc_fence_log_test2", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*80, cfg.TCCConfig.FenceConfig.CleanPeriod)



+ 5
- 10
pkg/rm/tcc/fence/config/tcc_fence_config.go View File

@@ -18,22 +18,17 @@
package config

import (
"go.uber.org/atomic"

"seata.apache.org/seata-go/pkg/rm/tcc/fence/handler"
)

type TccFenceConfig struct {
Initialized atomic.Bool `default:"false"`
LogTableName string `default:"tcc_fence_log"`
}

func InitFence() {
// todo implement

}

func InitCleanTask() {
handler.GetFenceHandler().InitLogCleanChannel()
func InitCleanTask(dsn string) {

go handler.GetFenceHandler().InitLogCleanChannel(dsn)

}

func Destroy() {


+ 22
- 1
pkg/rm/tcc/fence/fence.go View File

@@ -19,16 +19,37 @@ package fence

import (
"flag"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/config"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/handler"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao"
"time"
)

var (
FenceConfig Config
)

func InitFenceConfig(cfg Config) {
FenceConfig = cfg

if FenceConfig.Enable {
dao.GetTccFenceStoreDatabaseMapper().InitLogTableName(FenceConfig.LogTableName)
handler.GetFenceHandler().InitCleanPeriod(FenceConfig.CleanPeriod)
config.InitCleanTask(FenceConfig.Url)
}
}

type Config struct {
Enable bool `yaml:"enable" json:"enable" koanf:"enable"`
Url string `yaml:"url" json:"url" koanf:"url"`
LogTableName string `yaml:"log-table-name" json:"log-table-name" koanf:"log-table-name"`
CleanPeriod time.Duration `yaml:"clean-period" json:"clean-period" koanf:"clean-period"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.Enable, prefix+".enable", false, "Whether the fence is initialized.")
f.StringVar(&cfg.Url, prefix+".url", "", "Data source name.")
f.StringVar(&cfg.LogTableName, prefix+".log-table-name", "tcc_fence_log", "Undo log table name.")
f.DurationVar(&cfg.CleanPeriod, prefix+".clean-period", 24*time.Hour, "Undo log retention time.")
f.DurationVar(&cfg.CleanPeriod, prefix+".clean-period", 5*time.Minute, "Undo log retention time.")
}

+ 1
- 1
pkg/rm/tcc/fence/fennce_driver_test.go View File

@@ -24,7 +24,7 @@ import (
"reflect"
"testing"

gomonkey "github.com/agiledragon/gomonkey/v2"
"github.com/agiledragon/gomonkey/v2"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
)


+ 116
- 30
pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go View File

@@ -23,38 +23,38 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/go-sql-driver/mysql"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/model"
"sync"
"time"

"github.com/go-sql-driver/mysql"

"seata.apache.org/seata-go/pkg/rm/tcc/fence/enum"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/model"
"seata.apache.org/seata-go/pkg/tm"
"seata.apache.org/seata-go/pkg/util/log"
)

type tccFenceWrapperHandler struct {
tccFenceDao dao.TCCFenceStore
logQueue chan *FenceLogIdentity
logQueue chan *model.FenceLogIdentity
logCache list.List
logQueueOnce sync.Once
logQueueCloseOnce sync.Once
}

type FenceLogIdentity struct {
xid string
branchId int64
logTaskOnce sync.Once
db *sql.DB
dbMutex sync.RWMutex
}

const (
maxQueueSize = 500
maxQueueSize = 500
channelDelete = 5
cleanExpired = 24 * time.Hour
)

var (
fenceHandler *tccFenceWrapperHandler
fenceOnce sync.Once
fenceHandler *tccFenceWrapperHandler
fenceOnce sync.Once
cleanInterval = 5 * time.Minute
)

func GetFenceHandler() *tccFenceWrapperHandler {
@@ -68,6 +68,10 @@ func GetFenceHandler() *tccFenceWrapperHandler {
return fenceHandler
}

func (handler *tccFenceWrapperHandler) InitCleanPeriod(time time.Duration) {
cleanInterval = time
}

func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql.Tx) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
@@ -76,7 +80,7 @@ func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql
err := handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusTried)
if err != nil {
if mysqlError, ok := errors.Unwrap(err).(*mysql.MySQLError); ok && mysqlError.Number == 1062 {
// todo add clean command to channel.
log.Warnf("tcc fence record already exists, idempotency rejected. xid: %s, branchId: %d", xid, branchId)
handler.pushCleanChannel(xid, branchId)
}
return fmt.Errorf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
@@ -114,6 +118,7 @@ func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sq
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName

fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
if err != nil {
return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
@@ -157,33 +162,86 @@ func (handler *tccFenceWrapperHandler) updateFenceStatus(tx *sql.Tx, xid string,
return handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status)
}

func (handler *tccFenceWrapperHandler) InitLogCleanChannel() {
func (handler *tccFenceWrapperHandler) InitLogCleanChannel(dsn string) {

db, err := sql.Open("mysql", dsn)
if err != nil {
log.Warnf("failed to open database: %v", err)
return
}

handler.dbMutex.Lock()
handler.db = db
handler.dbMutex.Unlock()

handler.logQueueOnce.Do(func() {
go handler.traversalCleanChannel()
go handler.traversalCleanChannel(db)
})

handler.logTaskOnce.Do(func() {
go handler.initLogCleanTask(db)
})

}

func (handler *tccFenceWrapperHandler) initLogCleanTask(db *sql.DB) {

ticker := time.NewTicker(cleanInterval)
defer ticker.Stop()

for range ticker.C {
tx, err := db.Begin()
if err != nil {
log.Warnf("failed to begin transaction: %v", err)
continue
}

expiredTime := time.Now().Add(-cleanExpired)
identityList, err := handler.tccFenceDao.QueryTCCFenceLogIdentityByMdDate(tx, expiredTime)

if err != nil {
log.Warnf("failed to delete expired logs: %v", err)
tx.Rollback()
continue
}

err = tx.Commit()
if err != nil {
log.Errorf("failed to commit transaction: %v", err)
}

// push to clean channel
for _, identity := range identityList {
handler.logQueue <- &identity
}
}
}

func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() {
handler.logQueueCloseOnce.Do(func() {
close(handler.logQueue)
handler.dbMutex.Lock()
if handler.db != nil {
handler.db.Close()
handler.db = nil
}
handler.dbMutex.Unlock()
})
}

func (handler *tccFenceWrapperHandler) deleteFence(xid string, id int64) error {
// todo implement
func (handler *tccFenceWrapperHandler) deleteBatchFence(tx *sql.Tx, batch []model.FenceLogIdentity) error {
err := handler.tccFenceDao.DeleteMultipleTCCFenceLogIdentity(tx, batch)
if err != nil {
return fmt.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err)
}
return nil
}

func (handler *tccFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 {
// todo implement
return 0
}

func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) {
// todo implement
fli := &FenceLogIdentity{
xid: xid,
branchId: branchId,
fli := &model.FenceLogIdentity{
Xid: xid,
BranchId: branchId,
}
select {
case handler.logQueue <- fli:
@@ -194,11 +252,39 @@ func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int
log.Infof("add one log to clean queue: %v ", fli)
}

func (handler *tccFenceWrapperHandler) traversalCleanChannel() {
handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize)
func (handler *tccFenceWrapperHandler) traversalCleanChannel(db *sql.DB) {

if handler.logQueue == nil {
handler.logQueue = make(chan *model.FenceLogIdentity, maxQueueSize)
}

counter := 0
batch := []model.FenceLogIdentity{}

for li := range handler.logQueue {
if err := handler.deleteFence(li.xid, li.branchId); err != nil {
log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId)
counter++
batch = append(batch, *li)

if counter%channelDelete == 0 {
tx, _ := db.Begin()
err := handler.deleteBatchFence(tx, batch)
if err != nil {
log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err)
} else {
tx.Commit()
}
counter = 0
batch = []model.FenceLogIdentity{}
}
}

if len(batch) > 0 {
tx, _ := db.Begin()
err := handler.deleteBatchFence(tx, batch)
if err != nil {
log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err)
} else {
tx.Commit()
}
}
}

+ 14
- 3
pkg/rm/tcc/fence/store/db/dao/store_api.go View File

@@ -36,6 +36,11 @@ type TCCFenceStore interface {
// return the tcc fence do and error msg
QueryTCCFenceDO(tx *sql.Tx, xid string, branchId int64) (*model.TCCFenceDO, error)

// QueryTCCFenceLogIdentityByMdDate tcc fence do by status.
// param tx the tx will bind with user business method
// param datetime modify time
QueryTCCFenceLogIdentityByMdDate(tx *sql.Tx, datetime time.Time) ([]model.FenceLogIdentity, error)

// InsertTCCFenceDO tcc fence do boolean.
// param tx the tx will bind with user business method
// param tccFenceDO the tcc fence do
@@ -57,11 +62,17 @@ type TCCFenceStore interface {
// return the error msg
DeleteTCCFenceDO(tx *sql.Tx, xid string, branchId int64) error

// DeleteTCCFenceDOByMdfDate tcc fence by datetime.
// DeleteMultipleTCCFenceLogIdentity tcc fence log identity boolean.
// param tx the tx will bind with user business method
// param datetime modify time
// param identity the tcc fence log identity
// return the error msg
DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error
DeleteMultipleTCCFenceLogIdentity(tx *sql.Tx, identity []model.FenceLogIdentity) error

// DeleteTCCFenceDOByMdfDate tcc fence by datetime.
// param tx the tx will bind with user business method
// param datetime modify time, int32 limit delete
// return the delete number and error msg
DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time, limit int32) (int64, error)

// SetLogTableName LogTable ColumnName
// param logTableName logTableName


+ 78
- 12
pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go View File

@@ -21,6 +21,8 @@ import (
"context"
"database/sql"
"fmt"
"seata.apache.org/seata-go/pkg/util/log"
"strings"
"sync"
"time"

@@ -39,18 +41,22 @@ var (
func GetTccFenceStoreDatabaseMapper() *TccFenceStoreDatabaseMapper {
if tccFenceStoreDatabaseMapper == nil {
once.Do(func() {
tccFenceStoreDatabaseMapper = &TccFenceStoreDatabaseMapper{}
tccFenceStoreDatabaseMapper.InitLogTableName()
tccFenceStoreDatabaseMapper = &TccFenceStoreDatabaseMapper{
logTableName: "tcc_fence_log",
}
})
}
return tccFenceStoreDatabaseMapper
}

func (t *TccFenceStoreDatabaseMapper) InitLogTableName() {
// todo get log table name from config
func (t *TccFenceStoreDatabaseMapper) InitLogTableName(logTableName string) {
// set log table name
// default name is tcc_fence_log
t.logTableName = "tcc_fence_log"
if logTableName != "" {
t.logTableName = logTableName
} else {
t.logTableName = "tcc_fence_log"
}
}

type TccFenceStoreDatabaseMapper struct {
@@ -75,7 +81,7 @@ func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, br
if err = result.Scan(&xid, &branchId, &actionName, &status, &gmtCreate, &gmtModify); err != nil {
// will return error, if rows is empty
if err.Error() == "sql: no rows in result set" {
return nil, fmt.Errorf("query tcc fence get scan row,no rows in result set, [%w]", err)
return nil, nil
} else {
return nil, fmt.Errorf("query tcc fence get scan row failed, [%w]", err)
}
@@ -92,6 +98,35 @@ func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, br
return tccFenceDo, nil
}

func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceLogIdentityByMdDate(tx *sql.Tx, datetime time.Time) ([]model.FenceLogIdentity, error) {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetQuerySQLByMdDate(t.logTableName))
if err != nil {
return nil, fmt.Errorf("query tcc fence prepare sql failed, [%w]", err)
}
defer prepareStmt.Close()

rows, err := prepareStmt.Query(datetime)
if err != nil {
return nil, fmt.Errorf("query tcc fence exec sql failed, [%w]", err)
}
defer rows.Close()

var fenceLogIdentities []model.FenceLogIdentity
for rows.Next() {
var xid string
var branchId int64
err := rows.Scan(&xid, &branchId)
if err != nil {
return nil, fmt.Errorf("query tcc fence get scan row failed, [%w]", err)
}
fenceLogIdentities = append(fenceLogIdentities, model.FenceLogIdentity{
Xid: xid,
BranchId: branchId,
})
}
return fenceLogIdentities, nil
}

func (t *TccFenceStoreDatabaseMapper) InsertTCCFenceDO(tx *sql.Tx, tccFenceDo *model.TCCFenceDO) error {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetInsertLocalTCCLogSQL(t.logTableName))
if err != nil {
@@ -157,24 +192,55 @@ func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDO(tx *sql.Tx, xid string, b
return nil
}

func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByMdfDateAndStatus(t.logTableName))
func (t *TccFenceStoreDatabaseMapper) DeleteMultipleTCCFenceLogIdentity(tx *sql.Tx, identities []model.FenceLogIdentity) error {

placeholders := strings.Repeat("(?,?),", len(identities)-1) + "(?,?)"
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GertDeleteSQLByBranchIdsAndXids(t.logTableName, placeholders))
if err != nil {
return fmt.Errorf("delete tcc fence prepare sql failed, [%w]", err)
}
defer prepareStmt.Close()

result, err := prepareStmt.Exec(datetime)
// prepare args
args := make([]interface{}, 0, len(identities)*2)
for _, identity := range identities {
args = append(args, identity.Xid, identity.BranchId)
}

result, err := prepareStmt.Exec(args...)

if err != nil {
return fmt.Errorf("delete tcc fences exec sql failed, [%w]", err)
}

log.Debugf("Delete SQL: %s, args: %v", sql2.GertDeleteSQLByBranchIdsAndXids(t.logTableName, placeholders), args)

_, err = result.RowsAffected()
if err != nil {
return fmt.Errorf("delete tcc fences get affected rows failed, [%w]", err)
}

return nil
}

func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time, limit int32) (int64, error) {
prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByMdfDateAndStatus(t.logTableName))
if err != nil {
return -1, fmt.Errorf("delete tcc fence prepare sql failed, [%w]", err)
}
defer prepareStmt.Close()

result, err := prepareStmt.Exec(datetime, limit)
if err != nil {
return fmt.Errorf("delete tcc fence exec sql failed, [%w]", err)
return -1, fmt.Errorf("delete tcc fence exec sql failed, [%w]", err)
}

affected, err := result.RowsAffected()
if err != nil || affected == 0 {
return fmt.Errorf("delete tcc fence get affected rows failed, [%w]", err)
return 0, fmt.Errorf("delete tcc fence get affected rows failed, [%w]", err)
}

return nil
return affected, nil
}

func (t *TccFenceStoreDatabaseMapper) SetLogTableName(logTable string) {


+ 71
- 2
pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go View File

@@ -23,6 +23,7 @@ import (
"database/sql/driver"
"math"
"reflect"
"strings"
"testing"
"time"

@@ -111,6 +112,40 @@ func TestTccFenceStoreDatabaseMapper_QueryTCCFenceDO(t *testing.T) {
assert.Nil(t, err)
}

func TestTccFenceStoreDatabaseMapper_QueryTCCFenceLogIdentityByMdDate(t *testing.T) {
now := time.Now()
tccFenceDo := &model.TCCFenceDO{
Xid: "123123124124",
BranchId: 12312312312,
ActionName: "fence_test",
Status: enum.StatusTried,
GmtCreate: now,
GmtModified: now,
}
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("open db failed msg: %v", err)
}
defer db.Close()
mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetQuerySQLByMdDate("tcc_fence_log")).
ExpectQuery().
WithArgs(driver.Value(tccFenceDo.GmtModified)).
WillReturnRows(sqlmock.NewRows([]string{"xid", "branch_id"}).
AddRow(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId)))
mock.ExpectCommit()
tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}

actualFenceDo, err := GetTccFenceStoreDatabaseMapper().QueryTCCFenceLogIdentityByMdDate(tx, tccFenceDo.GmtModified)
tx.Commit()
assert.Equal(t, tccFenceDo.Xid, actualFenceDo[0].Xid)
assert.Equal(t, tccFenceDo.BranchId, actualFenceDo[0].BranchId)
assert.Nil(t, err)
}

func TestTccFenceStoreDatabaseMapper_UpdateTCCFenceDO(t *testing.T) {
now := time.Now()
tccFenceDo := &model.TCCFenceDO{
@@ -177,6 +212,35 @@ func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDO(t *testing.T) {
assert.Equal(t, nil, err)
}

func TestTccFenceStoreDatabaseMapper_DeleteMultipleTCCFenceDO(t *testing.T) {
identities := []model.FenceLogIdentity{
{Xid: "123123124124", BranchId: 12312312312},
{Xid: "123123124125", BranchId: 12312312313},
}
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("open db failed msg: %v", err)
}
defer db.Close()

placeholders := strings.Repeat("(?,?),", len(identities)-1) + "(?,?)"
mock.ExpectBegin()
mock.ExpectPrepare(sql2.GertDeleteSQLByBranchIdsAndXids("tcc_fence_log", placeholders)).
ExpectExec().
WithArgs(driver.Value(identities[0].Xid), driver.Value(identities[0].BranchId), driver.Value(identities[1].Xid), driver.Value(identities[1].BranchId)).
WillReturnResult(sqlmock.NewResult(1, 2))
mock.ExpectCommit()

tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}

err = GetTccFenceStoreDatabaseMapper().DeleteMultipleTCCFenceLogIdentity(tx, identities)
tx.Commit()
assert.Equal(t, nil, err)
}

func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) {
now := time.Now()
tccFenceDo := &model.TCCFenceDO{
@@ -190,7 +254,7 @@ func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) {
mock.ExpectBegin()
mock.ExpectPrepare(sql2.GetDeleteSQLByMdfDateAndStatus("tcc_fence_log")).
ExpectExec().
WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32))).
WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32)), 1000).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

@@ -198,7 +262,12 @@ func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) {
if err != nil {
t.Fatalf("open conn failed msg :%v", err)
}
err = GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32))
affect, err := GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32), 1000)
tx.Commit()
assert.Equal(t, nil, err)
assert.Equal(t, int64(1), affect)

if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %v", err)
}
}

+ 9
- 0
pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go View File

@@ -44,3 +44,12 @@ type TCCFenceDO struct {
// GmtModified update time
GmtModified time.Time
}

type FenceLogIdentity struct {

// Xid the global transaction id
Xid string

// BranchId the branch transaction id
BranchId int64
}

+ 21
- 1
pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go View File

@@ -28,20 +28,32 @@ var (
// localTccLogPlaced The enum LocalTccLogPlaced
localTccLogPlaced = " %s "

pramPlaceHolder = "%s "

// insertLocalTccLog The enum InsertLocalTccLog
insertLocalTccLog = "insert into " + localTccLogPlaced + " (xid, branch_id, action_name, status, gmt_create, gmt_modified) values ( ?,?,?,?,?,?)"

// queryByBranchIdAndXid The enum QueryByBranchIdAndXid
queryByBranchIdAndXid = "select xid, branch_id, action_name, status, gmt_create, gmt_modified from " + localTccLogPlaced + " where xid = ? and branch_id = ? for update"

// queryByMdDate The enum QueryByMdDate
queryByMdDate = "select xid, branch_id from " + localTccLogPlaced + " where gmt_modified < ? " +
" and status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")"

// updateStatusByBranchIdAndXid The enum UpdateStatusByBranchIdAndXid
updateStatusByBranchIdAndXid = "update " + localTccLogPlaced + " set status = ?, gmt_modified = ? where xid = ? and branch_id = ? and status = ? "

// deleteByBranchIdAndXid The enum DeleteByBranchIdAndXid
deleteByBranchIdAndXid = "delete from " + localTccLogPlaced + " where xid = ? and branch_id = ? "

// deleteByBranchIdsAndXids The enum DeleteByBranchIdsAndXids
deleteByBranchIdsAndXids = "delete from " + localTccLogPlaced + " where (xid,branch_id) in (" + pramPlaceHolder + ")"

// deleteByDateAndStatus The enum DeleteByDateAndStatus
deleteByDateAndStatus = "delete from " + localTccLogPlaced + " where gmt_modified < ? and status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")"
deleteByDateAndStatus = "delete from " + localTccLogPlaced +
" where gmt_modified < ? and" +
" status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")" +
" limit ?"
)

func GetInsertLocalTCCLogSQL(localTccTable string) string {
@@ -60,6 +72,14 @@ func GetDeleteSQLByBranchIdAndXid(localTccTable string) string {
return fmt.Sprintf(deleteByBranchIdAndXid, localTccTable)
}

func GertDeleteSQLByBranchIdsAndXids(localTccTable string, paramsPlaceHolder string) string {
return fmt.Sprintf(deleteByBranchIdsAndXids, localTccTable, paramsPlaceHolder)
}

func GetDeleteSQLByMdfDateAndStatus(localTccTable string) string {
return fmt.Sprintf(deleteByDateAndStatus, localTccTable)
}

func GetQuerySQLByMdDate(localTccTable string) string {
return fmt.Sprintf(queryByMdDate, localTccTable)
}

+ 3
- 1
pkg/rm/tcc/tcc_resource.go View File

@@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"seata.apache.org/seata-go/pkg/rm/tcc/fence"
"sync"

"seata.apache.org/seata-go/pkg/constant"
@@ -68,7 +69,8 @@ func (t *TCCResource) GetBranchType() branch.BranchType {
return branch.BranchTypeTCC
}

func InitTCC() {
func InitTCC(cfg fence.Config) {
fence.InitFenceConfig(cfg)
rm.GetRmCacheInstance().RegisterResourceManager(GetTCCResourceManagerInstance())
}



+ 2
- 0
testdata/conf/seatago.yml View File

@@ -141,6 +141,8 @@ seata:
tcc:
fence:
# Anti suspension table name
enable: false
url: root:12345678@tcp(127.0.0.1:3306)/seata_client1?charset=utf8&parseTime=True
log-table-name: tcc_fence_log_test
clean-period: 60s
# getty configuration


Loading…
Cancel
Save