Browse Source

feature/datasource_at merge to master (#213)

feature: add transaction at datasource
tags/1.0.2-RC1
liaochuntao GitHub 3 years ago
parent
commit
0cb9e30fc6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 3798 additions and 25 deletions
  1. +3
    -0
      .gitignore
  2. +7
    -4
      go.mod
  3. +591
    -21
      go.sum
  4. +270
    -0
      pkg/datasource/sql/at.go
  5. +321
    -0
      pkg/datasource/sql/conn.go
  6. +66
    -0
      pkg/datasource/sql/connector.go
  7. +171
    -0
      pkg/datasource/sql/datasource/base/meta_cache.go
  8. +196
    -0
      pkg/datasource/sql/datasource/datasource_manager.go
  9. +29
    -0
      pkg/datasource/sql/datasource/mysql/default.go
  10. +45
    -0
      pkg/datasource/sql/datasource/mysql/meta_cache.go
  11. +118
    -0
      pkg/datasource/sql/db.go
  12. +202
    -0
      pkg/datasource/sql/driver.go
  13. +138
    -0
      pkg/datasource/sql/exec/executor.go
  14. +68
    -0
      pkg/datasource/sql/exec/hook.go
  15. +59
    -0
      pkg/datasource/sql/exec/hook/logger_executor.go
  16. +20
    -0
      pkg/datasource/sql/handler/rm_handler_at.go
  17. +20
    -0
      pkg/datasource/sql/handler/rm_handler_xa.go
  18. +18
    -0
      pkg/datasource/sql/lock.go
  19. +29
    -0
      pkg/datasource/sql/parser/executortype_string.go
  20. +87
    -0
      pkg/datasource/sql/parser/parser_factory.go
  21. +26
    -0
      pkg/datasource/sql/plugin.go
  22. +87
    -0
      pkg/datasource/sql/sql_test.go
  23. +198
    -0
      pkg/datasource/sql/stmt.go
  24. +215
    -0
      pkg/datasource/sql/tx.go
  25. +28
    -0
      pkg/datasource/sql/types/dbtype_string.go
  26. +57
    -0
      pkg/datasource/sql/types/error.go
  27. +101
    -0
      pkg/datasource/sql/types/image.go
  28. +64
    -0
      pkg/datasource/sql/types/meta.go
  29. +55
    -0
      pkg/datasource/sql/types/sql.go
  30. +53
    -0
      pkg/datasource/sql/types/sqltype_string.go
  31. +195
    -0
      pkg/datasource/sql/types/types.go
  32. +60
    -0
      pkg/datasource/sql/undo/base/undo.go
  33. +29
    -0
      pkg/datasource/sql/undo/mysql/default.go
  34. +62
    -0
      pkg/datasource/sql/undo/mysql/undo.go
  35. +110
    -0
      pkg/datasource/sql/undo/undo.go

+ 3
- 0
.gitignore View File

@@ -19,3 +19,6 @@ dist/

# Dependency directories (remove the comment below to include it)
# vendor/
.vscode
.codecc
vendor

+ 7
- 4
go.mod View File

@@ -8,15 +8,18 @@ require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.12.5
github.com/go-sql-driver/mysql v1.6.0
github.com/google/uuid v1.3.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pingcap/tidb v1.1.0-beta.0.20211124132551-4a1b2e9fe5b5
github.com/pingcap/tidb/parser v0.0.0-20211124132551-4a1b2e9fe5b5
github.com/pingcap/tipb v0.0.0-20220628092852-069ef6c8fc90 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
google.golang.org/grpc v1.45.0
google.golang.org/genproto v0.0.0-20220630174209-ad1d48641aa7 // indirect
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect


+ 591
- 21
go.sum
File diff suppressed because it is too large
View File


+ 270
- 0
pkg/datasource/sql/at.go View File

@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
)

const (
_defaultResourceSize = 16
_undoLogDeleteLimitSize = 1000
)

func init() {
datasource.RegisterResourceManager(branch.BranchTypeAT,
&ATSourceManager{
resourceCache: sync.Map{},
basic: datasource.NewBasicSourceManager(),
})
}

type ATSourceManager struct {
resourceCache sync.Map
worker *asyncATWorker
basic *datasource.BasicSourceManager
}

// Register a Resource to be managed by Resource Manager
func (mgr *ATSourceManager) RegisterResource(res rm.Resource) error {
mgr.resourceCache.Store(res.GetResourceId(), res)

return mgr.basic.RegisterResource(res)
}

// Unregister a Resource from the Resource Manager
func (mgr *ATSourceManager) UnregisterResource(res rm.Resource) error {
return mgr.basic.UnregisterResource(res)
}

// Get all resources managed by this manager
func (mgr *ATSourceManager) GetManagedResources() map[string]rm.Resource {
ret := make(map[string]rm.Resource)

mgr.resourceCache.Range(func(key, value interface{}) bool {
ret[key.(string)] = value.(rm.Resource)
return true
})

return ret
}

// BranchRollback Rollback the corresponding transactions according to the request
func (mgr *ATSourceManager) BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error) {
val, ok := mgr.resourceCache.Load(req.ResourceId)

if !ok {
return branch.BranchStatusPhaseoneFailed, fmt.Errorf("resource %s not found", req.ResourceId)
}

res := val.(*DBResource)

undoMgr, err := undo.GetUndoLogManager(res.dbType)
if err != nil {
return branch.BranchStatusUnknown, err
}

conn, err := res.target.Conn(ctx)
if err != nil {
return branch.BranchStatusUnknown, err
}

if err := undoMgr.RunUndo(req.Xid, req.BranchId, conn); err != nil {
transErr, ok := err.(*types.TransactionError)
if !ok {
return branch.BranchStatusPhaseoneFailed, err
}

if transErr.Code() == types.ErrorCodeBranchRollbackFailedUnretriable {
return branch.BranchStatusPhasetwoRollbackFailedUnretryable, nil
}

return branch.BranchStatusPhasetwoRollbackFailedRetryable, nil
}

return branch.BranchStatusPhasetwoRollbacked, nil
}

// BranchCommit
func (mgr *ATSourceManager) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
mgr.worker.branchCommit(ctx, req)
return branch.BranchStatusPhaseoneDone, nil
}

// LockQuery
func (mgr *ATSourceManager) LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error) {
return false, nil
}

// BranchRegister
func (mgr *ATSourceManager) BranchRegister(ctx context.Context, clientId string, req message.BranchRegisterRequest) (int64, error) {
return 0, nil
}

// BranchReport
func (mgr *ATSourceManager) BranchReport(ctx context.Context, req message.BranchReportRequest) error {
return nil
}

// CreateTableMetaCache
func (mgr *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType,
db *sql.DB) (datasource.TableMetaCache, error) {
return mgr.basic.CreateTableMetaCache(ctx, resID, dbType, db)
}

type asyncATWorker struct {
asyncCommitBufferLimit int64
commitQueue chan phaseTwoContext
resourceMgr datasource.DataSourceManager
}

func newAsyncATWorker() *asyncATWorker {
asyncCommitBufferLimit := int64(10000)

val := os.Getenv("CLIENT_RM_ASYNC_COMMIT_BUFFER_LIMIT")
if val != "" {
limit, _ := strconv.ParseInt(val, 10, 64)
if limit != 0 {
asyncCommitBufferLimit = limit
}
}

worker := &asyncATWorker{
commitQueue: make(chan phaseTwoContext, asyncCommitBufferLimit),
}

return worker
}

func (w *asyncATWorker) doBranchCommitSafely() {
batchSize := 64

ticker := time.NewTicker(1 * time.Second)
phaseCtxs := make([]phaseTwoContext, 0, batchSize)

for {
select {
case phaseCtx := <-w.commitQueue:
phaseCtxs = append(phaseCtxs, phaseCtx)
if len(phaseCtxs) == batchSize {
tmp := phaseCtxs
w.doBranchCommit(tmp)
phaseCtxs = make([]phaseTwoContext, 0, batchSize)
}
case <-ticker.C:
tmp := phaseCtxs
w.doBranchCommit(tmp)

phaseCtxs = make([]phaseTwoContext, 0, batchSize)
}
}
}

func (w *asyncATWorker) doBranchCommit(phaseCtxs []phaseTwoContext) {
groupCtxs := make(map[string][]phaseTwoContext, _defaultResourceSize)

for i := range phaseCtxs {
if phaseCtxs[i].ResourceID == "" {
continue
}

if _, ok := groupCtxs[phaseCtxs[i].ResourceID]; !ok {
groupCtxs[phaseCtxs[i].ResourceID] = make([]phaseTwoContext, 0, 4)
}

ctxs := groupCtxs[phaseCtxs[i].ResourceID]
ctxs = append(ctxs, phaseCtxs[i])

groupCtxs[phaseCtxs[i].ResourceID] = ctxs
}

for k := range groupCtxs {
w.dealWithGroupedContexts(k, groupCtxs[k])
}
}

func (w *asyncATWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) {
val, ok := w.resourceMgr.GetManagedResources()[resID]
if !ok {
for i := range phaseCtxs {
w.commitQueue <- phaseCtxs[i]
}
return
}

res := val.(*DBResource)

conn, err := res.target.Conn(context.Background())
if err != nil {
for i := range phaseCtxs {
w.commitQueue <- phaseCtxs[i]
}
}

defer conn.Close()

undoMgr, err := undo.GetUndoLogManager(res.dbType)
if err != nil {
for i := range phaseCtxs {
w.commitQueue <- phaseCtxs[i]
}

return
}

for i := range phaseCtxs {
phaseCtx := phaseCtxs[i]
if err := undoMgr.DeleteUndoLogs([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil {
w.commitQueue <- phaseCtx
}
}
}

func (w *asyncATWorker) branchCommit(ctx context.Context, req message.BranchCommitRequest) {
phaseCtx := phaseTwoContext{
Xid: req.Xid,
BranchID: req.BranchId,
ResourceID: req.ResourceId,
}

select {
case w.commitQueue <- phaseCtx:
case <-ctx.Done():
}

return
}

type phaseTwoContext struct {
Xid string
BranchID int64
ResourceID string
}

+ 321
- 0
pkg/datasource/sql/conn.go View File

@@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
gosql "database/sql"
"database/sql/driver"
"errors"

"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

type Conn struct {
res *DBResource
txCtx *types.TransactionContext
conn driver.Conn
}

func (c *Conn) ResetSession(ctx context.Context) error {
conn, ok := c.conn.(driver.SessionResetter)
if !ok {
return driver.ErrSkip
}

c.txCtx = nil
return conn.ResetSession(ctx)
}

// Prepare returns a prepared statement, bound to this connection.
func (c *Conn) Prepare(query string) (driver.Stmt, error) {
s, err := c.conn.Prepare(query)
if err != nil {
return nil, err
}

return &Stmt{
conn: c,
stmt: s,
query: query,
res: c.res,
txCtx: c.txCtx,
}, nil
}

// PrepareContext
func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
conn, ok := c.conn.(driver.ConnPrepareContext)
if !ok {
stmt, err := c.conn.Prepare(query)
if err != nil {
return nil, err
}

return &Stmt{stmt: stmt, query: query, res: c.res, txCtx: c.txCtx}, nil
}

s, err := conn.PrepareContext(ctx, query)
if err != nil {
return nil, err
}

return &Stmt{
conn: c,
stmt: s,
query: query,
res: c.res,
txCtx: c.txCtx,
}, nil
}

// Exec
func (c *Conn) Exec(query string, args []driver.Value) (driver.Result, error) {
conn, ok := c.conn.(driver.Execer)
if !ok {
return nil, driver.ErrSkip
}

if c.txCtx != nil {
// in transaction, need run Executor
executor, err := exec.BuildExecutor(c.res.dbType, query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: c.txCtx,
Query: query,
Values: args,
}

ret, err := executor.ExecWithValue(context.Background(), execCtx,
func(ctx context.Context, query string, args []driver.Value) (types.ExecResult, error) {
ret, err := conn.Exec(query, args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithResult(ret)), nil
})
if err != nil {
return nil, err
}

return ret.GetResult(), nil
}

return conn.Exec(query, args)
}

// ExecContext
func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
conn, ok := c.conn.(driver.ExecerContext)
if ok {
values := make([]driver.Value, 0, len(args))

for i := range args {
values = append(values, args[i].Value)
}

return c.Exec(query, values)
}

executor, err := exec.BuildExecutor(c.res.dbType, query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: c.txCtx,
Query: query,
NamedValues: args,
}

ret, err := executor.ExecWithNamedValue(ctx, execCtx,
func(ctx context.Context, query string, args []driver.NamedValue) (types.ExecResult, error) {
ret, err := conn.ExecContext(ctx, query, args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithResult(ret)), nil
})
if err != nil {
return nil, err
}

return ret.GetResult(), nil
}

// QueryContext
func (c *Conn) Query(query string, args []driver.Value) (driver.Rows, error) {
conn, ok := c.conn.(driver.Queryer)
if !ok {
return nil, driver.ErrSkip
}

executor, err := exec.BuildExecutor(c.res.dbType, query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: c.txCtx,
Query: query,
Values: args,
}

ret, err := executor.ExecWithValue(context.Background(), execCtx,
func(ctx context.Context, query string, args []driver.Value) (types.ExecResult, error) {
ret, err := conn.Query(query, args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithRows(ret)), nil
})
if err != nil {
return nil, err
}

return ret.GetRows(), nil
}

// QueryContext
func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
conn, ok := c.conn.(driver.QueryerContext)
if !ok {
values := make([]driver.Value, 0, len(args))

for i := range args {
values = append(values, args[i].Value)
}

return c.Query(query, values)
}

executor, err := exec.BuildExecutor(c.res.dbType, query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: c.txCtx,
Query: query,
NamedValues: args,
}

ret, err := executor.ExecWithNamedValue(ctx, execCtx,
func(ctx context.Context, query string, args []driver.NamedValue) (types.ExecResult, error) {
ret, err := conn.QueryContext(ctx, query, args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithRows(ret)), nil
})
if err != nil {
return nil, err
}

return ret.GetRows(), nil
}

// Begin starts and returns a new transaction.
//
// Deprecated: Drivers should implement ConnBeginTx instead (or additionally).
func (c *Conn) Begin() (driver.Tx, error) {
tx, err := c.conn.Begin()
if err != nil {
return nil, err
}

c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.TxOpt = driver.TxOptions{}

return newTx(
withDriverConn(c),
withTxCtx(c.txCtx),
withOriginTx(tx),
)
}

func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
if conn, ok := c.conn.(driver.ConnBeginTx); ok {
tx, err := conn.BeginTx(ctx, opts)
if err != nil {
return nil, err
}

c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.TxOpt = opts

return newTx(
withDriverConn(c),
withTxCtx(c.txCtx),
withOriginTx(tx),
)
}

// Check the transaction level. If the transaction level is non-default
// then return an error here as the BeginTx driver value is not supported.
if opts.Isolation != driver.IsolationLevel(gosql.LevelDefault) {
return nil, errors.New("sql: driver does not support non-default isolation level")
}

// If a read-only transaction is requested return an error as the
// BeginTx driver value is not supported.
if opts.ReadOnly {
return nil, errors.New("sql: driver does not support read-only transactions")
}

if ctx.Done() == nil {
return c.Begin()
}

txi, err := c.Begin()
if err == nil {
select {
case <-ctx.Done():
txi.Rollback()
return nil, ctx.Err()
default:
}
}
return txi, err
}

// Close invalidates and potentially stops any current
// prepared statements and transactions, marking this
// connection as no longer in use.
//
// Because the sql package maintains a free pool of
// connections and only calls Close when there's a surplus of
// idle connections, it shouldn't be necessary for drivers to
// do their own connection caching.
//
// Drivers must ensure all network calls made by Close
// do not block indefinitely (e.g. apply a timeout).
func (c *Conn) Close() error {
c.txCtx = nil
return c.conn.Close()
}

+ 66
- 0
pkg/datasource/sql/connector.go View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
"database/sql/driver"
"sync"
)

type seataConnector struct {
conf *seataServerConfig
res *DBResource
once sync.Once
driver driver.Driver
target driver.Connector
}

// Connect returns a connection to the database.
// Connect may return a cached connection (one previously
// closed), but doing so is unnecessary; the sql package
// maintains a pool of idle connections for efficient re-use.
//
// The provided context.Context is for dialing purposes only
// (see net.DialContext) and should not be stored or used for
// other purposes. A default timeout should still be used
// when dialing as a connection pool may call Connect
// asynchronously to any query.
//
// The returned connection is only used by one goroutine at a
// time.
func (c *seataConnector) Connect(ctx context.Context) (driver.Conn, error) {
conn, err := c.target.Connect(ctx)
if err != nil {
return nil, err
}

return &Conn{conn: conn, res: c.res}, nil
}

// Driver returns the underlying Driver of the Connector,
// mainly to maintain compatibility with the Driver method
// on sql.DB.
func (c *seataConnector) Driver() driver.Driver {
c.once.Do(func() {
d := c.target.Driver()
c.driver = d
})

return &SeataDriver{target: c.driver}
}

+ 171
- 0
pkg/datasource/sql/datasource/base/meta_cache.go View File

@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package base

import (
"context"
"errors"
"sync"
"time"

"github.com/seata/seata-go/pkg/datasource/sql/types"
)

type (
// trigger
trigger interface {
LoadOne(table string) (types.TableMeta, error)

LoadAll() ([]types.TableMeta, error)
}

entry struct {
value types.TableMeta
lastAccess time.Time
}
)

// BaseTableMetaCache
type BaseTableMetaCache struct {
lock sync.RWMutex
expireDuration time.Duration
capity int32
size int32
cache map[string]*entry
cancel context.CancelFunc
trigger trigger
}

// NewBaseCache
func NewBaseCache(capity int32, expireDuration time.Duration, trigger trigger) (*BaseTableMetaCache, error) {
ctx, cancel := context.WithCancel(context.Background())

c := &BaseTableMetaCache{
lock: sync.RWMutex{},
capity: capity,
size: 0,
expireDuration: expireDuration,
cache: map[string]*entry{},
cancel: cancel,
trigger: trigger,
}

if err := c.Init(ctx); err != nil {
return nil, err
}

return c, nil
}

// init
func (c *BaseTableMetaCache) Init(ctx context.Context) error {
go c.refresh(ctx)
go c.scanExpire(ctx)

return nil
}

// refresh
func (c *BaseTableMetaCache) refresh(ctx context.Context) {
f := func() {
v, err := c.trigger.LoadAll()
if err != nil {
return
}

c.lock.Lock()
defer c.lock.Unlock()

for i := range v {
tm := v[i]
if _, ok := c.cache[tm.Name]; !ok {
c.cache[tm.Name] = &entry{
value: tm,
}
}
}
}

f()

ticker := time.NewTicker(time.Duration(1 * time.Minute))
defer ticker.Stop()
for range ticker.C {
f()
}
}

// scanExpire
func (c *BaseTableMetaCache) scanExpire(ctx context.Context) {
ticker := time.NewTicker(c.expireDuration)
defer ticker.Stop()
for range ticker.C {

f := func() {
c.lock.Lock()
defer c.lock.Unlock()

cur := time.Now()
for k := range c.cache {
entry := c.cache[k]

if cur.Sub(entry.lastAccess) > c.expireDuration {
delete(c.cache, k)
}
}
}

f()
}
}

// GetTableMeta
func (c *BaseTableMetaCache) GetTableMeta(table string) (types.TableMeta, error) {
c.lock.Lock()
defer c.lock.Unlock()

v, ok := c.cache[table]

if !ok {
meta, err := c.trigger.LoadOne(table)
if err != nil {
return types.TableMeta{}, err
}

if !meta.IsEmpty() {
c.cache[table] = &entry{
value: meta,
lastAccess: time.Now(),
}

return meta, nil
}

return types.TableMeta{}, errors.New("not found table metadata")
}

v.lastAccess = time.Now()
c.cache[table] = v

return v.value, nil
}

func (c *BaseTableMetaCache) Destroy() error {
c.cancel()
return nil
}

+ 196
- 0
pkg/datasource/sql/datasource/datasource_manager.go View File

@@ -0,0 +1,196 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package datasource

import (
"context"
"database/sql"
"errors"
"sync"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
)

var (
atOnce sync.Once
atMgr DataSourceManager
xaMgr DataSourceManager
solts = map[types.DBType]func() TableMetaCache{}
)

// RegisterTableCache
func RegisterTableCache(dbType types.DBType, builder func() TableMetaCache) {
solts[dbType] = builder
}

func RegisterResourceManager(b branch.BranchType, d DataSourceManager) {
if b == branch.BranchTypeAT {
atMgr = d
}

if b == branch.BranchTypeXA {
xaMgr = d
}
}

func GetDataSourceManager(b branch.BranchType) DataSourceManager {
if b == branch.BranchTypeAT {
return atMgr
}
if b == branch.BranchTypeXA {
return xaMgr
}
return nil
}

// DataSourceManager
type DataSourceManager interface {
// Register a Resource to be managed by Resource Manager
RegisterResource(resource rm.Resource) error
// Unregister a Resource from the Resource Manager
UnregisterResource(resource rm.Resource) error
// Get all resources managed by this manager
GetManagedResources() map[string]rm.Resource
// BranchRollback
BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error)
// BranchCommit
BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error)
// LockQuery
LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error)
// BranchRegister
BranchRegister(ctx context.Context, clientId string, req message.BranchRegisterRequest) (int64, error)
// BranchReport
BranchReport(ctx context.Context, req message.BranchReportRequest) error
// CreateTableMetaCache
CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType, db *sql.DB) (TableMetaCache, error)
}

type entry struct {
db *sql.DB
metaCache TableMetaCache
}

// BasicSourceManager
type BasicSourceManager struct {
// lock
lock sync.RWMutex
// tableMetaCache
tableMetaCache map[string]*entry
}

func NewBasicSourceManager() *BasicSourceManager {
return &BasicSourceManager{
tableMetaCache: make(map[string]*entry, 0),
}
}

// Commit a branch transaction
// TODO wait finish
func (dm *BasicSourceManager) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
return branch.BranchStatusPhaseoneDone, nil
}

// Rollback a branch transaction
// TODO wait finish
func (dm *BasicSourceManager) BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error) {
return branch.BranchStatusPhaseoneFailed, nil
}

// Branch register long
func (dm *BasicSourceManager) BranchRegister(ctx context.Context, clientId string, req message.BranchRegisterRequest) (int64, error) {
return 0, nil
}

// Branch report
func (dm *BasicSourceManager) BranchReport(ctx context.Context, req message.BranchReportRequest) error {
return nil
}

// Lock query boolean
func (dm *BasicSourceManager) LockQuery(ctx context.Context, branchType branch.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return true, nil
}

// Register a model.Resource to be managed by model.Resource Manager
func (dm *BasicSourceManager) RegisterResource(resource rm.Resource) error {
err := rm.GetRMRemotingInstance().RegisterResource(resource)
if err != nil {
return err
}
return nil
}

// Unregister a model.Resource from the model.Resource Manager
func (dm *BasicSourceManager) UnregisterResource(resource rm.Resource) error {
return errors.New("unsupport unregister resource")
}

// Get all resources managed by this manager
func (dm *BasicSourceManager) GetManagedResources() *sync.Map {
return nil
}

// Get the model.BranchType
func (dm *BasicSourceManager) GetBranchType() branch.BranchType {
return branch.BranchTypeAT
}

// CreateTableMetaCache
func (dm *BasicSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType, db *sql.DB) (TableMetaCache, error) {
dm.lock.Lock()
defer dm.lock.Unlock()

res, err := buildResource(ctx, dbType, db)
if err != nil {
return nil, err
}

dm.tableMetaCache[resID] = res

// 注册 AT 数据资源
// dm.resourceMgr.RegisterResource(ATResource)

return res.metaCache, err
}

// TableMetaCache tables metadata cache, default is open
type TableMetaCache interface {
// Init
Init(ctx context.Context, conn *sql.DB) error
// GetTableMeta
GetTableMeta(table string) (types.TableMeta, error)
// Destroy
Destroy() error
}

// buildResource
func buildResource(ctx context.Context, dbType types.DBType, db *sql.DB) (*entry, error) {
cache := solts[dbType]()

if err := cache.Init(ctx, db); err != nil {
return nil, err
}

return &entry{
db: db,
metaCache: cache,
}, nil
}

+ 29
- 0
pkg/datasource/sql/datasource/mysql/default.go View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mysql

import (
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

func init() {
datasource.RegisterTableCache(types.DBTypeMySQL, func() datasource.TableMetaCache {
return &tableMetaCache{}
})
}

+ 45
- 0
pkg/datasource/sql/datasource/mysql/meta_cache.go View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mysql

import (
"context"
"database/sql"

"github.com/seata/seata-go/pkg/datasource/sql/datasource/base"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

type tableMetaCache struct {
cache *base.BaseTableMetaCache
}

// Init
func (c *tableMetaCache) Init(ctx context.Context, conn *sql.DB) error {
return nil
}

// GetTableMeta
func (c *tableMetaCache) GetTableMeta(table string) (types.TableMeta, error) {
return types.TableMeta{}, nil
}

// Destroy
func (c *tableMetaCache) Destroy() error {
return nil
}

+ 118
- 0
pkg/datasource/sql/db.go View File

@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
gosql "database/sql"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
)

type dbOption func(db *DBResource)

func withGroupID(id string) dbOption {
return func(db *DBResource) {
db.groupID = id
}
}

func withResourceID(id string) dbOption {
return func(db *DBResource) {
db.resourceID = id
}
}

func withTableMetaCache(c datasource.TableMetaCache) dbOption {
return func(db *DBResource) {
db.metaCache = c
}
}

func withDBType(dt types.DBType) dbOption {
return func(db *DBResource) {
db.dbType = dt
}
}

func withTarget(source *gosql.DB) dbOption {
return func(db *DBResource) {
db.target = source
}
}

func withConf(conf *seataServerConfig) dbOption {
return func(db *DBResource) {
db.conf = *conf
}
}

func newResource(opts ...dbOption) (*DBResource, error) {
db := new(DBResource)

for i := range opts {
opts[i](db)
}

return db, db.init()
}

// DB proxy sql.DB, enchance database/sql.DB to add distribute transaction ability
type DBResource struct {
// groupID
groupID string
// resourceID
resourceID string
// conf
conf seataServerConfig
// target
target *gosql.DB
// dbType
dbType types.DBType
// undoLogMgr
undoLogMgr undo.UndoLogManager
// metaCache
metaCache datasource.TableMetaCache
}

func (db *DBResource) init() error {
mgr := datasource.GetDataSourceManager(db.GetBranchType())
metaCache, err := mgr.CreateTableMetaCache(context.Background(), db.resourceID, db.dbType, db.target)
if err != nil {
return err
}

db.metaCache = metaCache

return nil
}

func (db *DBResource) GetResourceGroupId() string {
return db.groupID
}

func (db *DBResource) GetResourceId() string {
return db.resourceID
}

func (db *DBResource) GetBranchType() branch.BranchType {
return db.conf.BranchType
}

+ 202
- 0
pkg/datasource/sql/driver.go View File

@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"reflect"
"strings"
"unsafe"

"github.com/seata/seata-go/pkg/common/log"

"github.com/go-sql-driver/mysql"
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/protocol/branch"
)

const (
SeataMySQLDriver = "seata-mysql"
)

func init() {
sql.Register(SeataMySQLDriver, &SeataDriver{
target: mysql.MySQLDriver{},
})
}

type SeataDriver struct {
target driver.Driver
}

func (d *SeataDriver) Open(name string) (driver.Conn, error) {
conn, err := d.target.Open(name)
if err != nil {
log.Errorf("open connection: %w", err)
return nil, err
}

v := reflect.ValueOf(conn)
if v.Kind() == reflect.Ptr {
v = v.Elem()
}

field := v.FieldByName("connector")

connector, _ := GetUnexportedField(field).(driver.Connector)

dbType := types.ParseDBType(d.getTargetDriverName())
if dbType == types.DBTypeUnknown {
return nil, errors.New("unsupport conn type")
}

c, err := d.OpenConnector(name)
if err != nil {
log.Errorf("open connector: %w", err)
return nil, fmt.Errorf("open connector error: %v", err.Error())
}

proxy, err := registerResource(connector, dbType, sql.OpenDB(c), name)
if err != nil {
log.Errorf("register resource: %w", err)
return nil, err
}

SetUnexportedField(field, proxy)
return conn, nil
}

func (d *SeataDriver) OpenConnector(dataSourceName string) (driver.Connector, error) {
if driverCtx, ok := d.target.(driver.DriverContext); ok {
return driverCtx.OpenConnector(dataSourceName)
}
return &dsnConnector{dsn: dataSourceName, driver: d.target}, nil
}

func (d *SeataDriver) getTargetDriverName() string {
return strings.ReplaceAll(SeataMySQLDriver, "seata-", "")
}

type dsnConnector struct {
dsn string
driver driver.Driver
}

func (t *dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
return t.driver.Open(t.dsn)
}

func (t *dsnConnector) Driver() driver.Driver {
return t.driver
}

func registerResource(connector driver.Connector, dbType types.DBType, db *sql.DB,
dataSourceName string, opts ...seataOption) (driver.Connector, error) {
conf := loadConfig()
for i := range opts {
opts[i](conf)
}

if err := conf.validate(); err != nil {
log.Errorf("invalid conf: %w", err)
return connector, err
}

options := []dbOption{
withGroupID(conf.GroupID),
withResourceID(parseResourceID(dataSourceName)),
withConf(conf),
withTarget(db),
withDBType(dbType),
}

res, err := newResource(options...)
if err != nil {
log.Errorf("create new resource: %w", err)
return nil, err
}

if err = datasource.GetDataSourceManager(conf.BranchType).RegisterResource(res); err != nil {
log.Errorf("regisiter resource: %w", err)
return nil, err
}

return &seataConnector{
res: res,
target: connector,
conf: conf,
}, nil
}

type (
seataOption func(cfg *seataServerConfig)

// seataServerConfig
seataServerConfig struct {
// GroupID
GroupID string `yaml:"groupID"`
// BranchType
BranchType branch.BranchType
// Endpoints
Endpoints []string `yaml:"endpoints" json:"endpoints"`
}
)

func (c *seataServerConfig) validate() error {
return nil
}

// loadConfig
// TODO wait finish
func loadConfig() *seataServerConfig {
// 先设置默认配置

// 从默认文件获取
return &seataServerConfig{
GroupID: "DEFAULT_GROUP",
BranchType: branch.BranchTypeAT,
Endpoints: []string{"127.0.0.1:8888"},
}
}

func parseResourceID(dsn string) string {
i := strings.Index(dsn, "?")

res := dsn

if i > 0 {
res = dsn[:i]
}

return strings.ReplaceAll(res, ",", "|")
}

func GetUnexportedField(field reflect.Value) interface{} {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface()
}

func SetUnexportedField(field reflect.Value, value interface{}) {
reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).
Elem().
Set(reflect.ValueOf(value))
}

+ 138
- 0
pkg/datasource/sql/exec/executor.go View File

@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package exec

import (
"context"
"database/sql/driver"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/datasource/sql/parser"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

// executorSolts
var executorSolts = make(map[types.DBType]map[parser.ExecutorType]func() SQLExecutor)

func RegisterExecutor(dt types.DBType, et parser.ExecutorType, builder func() SQLExecutor) {
if _, ok := executorSolts[dt]; !ok {
executorSolts[dt] = make(map[parser.ExecutorType]func() SQLExecutor)
}

val := executorSolts[dt]

val[et] = func() SQLExecutor {
return &BaseExecutor{ex: builder()}
}
}

type (
CallbackWithNamedValue func(ctx context.Context, query string, args []driver.NamedValue) (types.ExecResult, error)

CallbackWithValue func(ctx context.Context, query string, args []driver.Value) (types.ExecResult, error)

SQLExecutor interface {
// Interceptors
interceptors(interceptors []SQLInterceptor)
// Exec
ExecWithNamedValue(ctx context.Context, execCtx *ExecContext, f CallbackWithNamedValue) (types.ExecResult, error)
// Exec
ExecWithValue(ctx context.Context, execCtx *ExecContext, f CallbackWithValue) (types.ExecResult, error)
}
)

// buildExecutor
func BuildExecutor(dbType types.DBType, query string) (SQLExecutor, error) {
parseCtx, err := parser.DoParser(query)
if err != nil {
return nil, err
}

hooks := make([]SQLInterceptor, 0, 4)
hooks = append(hooks, commonHook...)
hooks = append(hooks, hookSolts[parseCtx.SQLType]...)

factories, ok := executorSolts[dbType]
if !ok {
log.Debugf("%s not found executor factories, return default Executor", dbType.String())
e := &BaseExecutor{}
e.interceptors(hooks)
return e, nil
}

supplier, ok := factories[parseCtx.ExecutorType]
if !ok {
log.Debugf("%s not found executor for %s, return default Executor",
dbType.String(), parseCtx.ExecutorType.String())
e := &BaseExecutor{}
e.interceptors(hooks)
return e, nil
}

executor := supplier()
executor.interceptors(hooks)
return executor, nil
}

type BaseExecutor struct {
is []SQLInterceptor
ex SQLExecutor
}

// Interceptors
func (e *BaseExecutor) interceptors(interceptors []SQLInterceptor) {
e.is = interceptors
}

// Exec
func (e *BaseExecutor) ExecWithNamedValue(ctx context.Context, execCtx *ExecContext, f CallbackWithNamedValue) (types.ExecResult, error) {
for i := range e.is {
e.is[i].Before(ctx, execCtx)
}

defer func() {
for i := range e.is {
e.is[i].After(ctx, execCtx)
}
}()

if e.ex != nil {
return e.ex.ExecWithNamedValue(ctx, execCtx, f)
}

return f(ctx, execCtx.Query, execCtx.NamedValues)
}

// Exec
func (e *BaseExecutor) ExecWithValue(ctx context.Context, execCtx *ExecContext, f CallbackWithValue) (types.ExecResult, error) {
for i := range e.is {
e.is[i].Before(ctx, execCtx)
}

defer func() {
for i := range e.is {
e.is[i].After(ctx, execCtx)
}
}()

if e.ex != nil {
return e.ex.ExecWithValue(ctx, execCtx, f)
}

return f(ctx, execCtx.Query, execCtx.Values)
}

+ 68
- 0
pkg/datasource/sql/exec/hook.go View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package exec

import (
"context"
"database/sql/driver"

"github.com/seata/seata-go/pkg/datasource/sql/types"
)

var (
commonHook = make([]SQLInterceptor, 0, 4)
hookSolts = map[types.SQLType][]SQLInterceptor{}
)

// RegisCommonHook not goroutine safe
func RegisCommonHook(hook SQLInterceptor) {
commonHook = append(commonHook, hook)
}

// RegisHook not goroutine safe
func RegisHook(hook SQLInterceptor) {
_, ok := hookSolts[hook.Type()]

if !ok {
hookSolts[hook.Type()] = make([]SQLInterceptor, 0, 4)
}

hookSolts[hook.Type()] = append(hookSolts[hook.Type()], hook)
}

// ExecContext
type ExecContext struct {
TxCtx *types.TransactionContext
Query string
NamedValues []driver.NamedValue
Values []driver.Value
}

// SQLHook SQL execution front and back interceptor
// case 1. Used to intercept SQL to achieve the generation of front and rear mirrors
// case 2. Burning point to report
// case 3. SQL black and white list
type SQLInterceptor interface {
Type() types.SQLType

// Before
Before(ctx context.Context, execCtx *ExecContext)

// After
After(ctx context.Context, execCtx *ExecContext)
}

+ 59
- 0
pkg/datasource/sql/exec/hook/logger_executor.go View File

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package exec

import (
"context"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"go.uber.org/zap"
)

func init() {
exec.RegisCommonHook(&loggerSQLHook{})
}

type loggerSQLHook struct{}

func (h *loggerSQLHook) Type() types.SQLType {
return types.SQLTypeUnknown
}

// Before
func (h *loggerSQLHook) Before(ctx context.Context, execCtx *exec.ExecContext) {
fields := []zap.Field{
zap.String("tx-id", execCtx.TxCtx.LocalTransID),
zap.String("sql", execCtx.Query),
}

if len(execCtx.NamedValues) != 0 {
fields = append(fields, zap.Any("namedValues", execCtx.NamedValues))
}

if len(execCtx.Values) != 0 {
fields = append(fields, zap.Any("values", execCtx.Values))
}

log.Debug("sql exec log", fields)
}

// After
func (h *loggerSQLHook) After(ctx context.Context, execCtx *exec.ExecContext) {
}

+ 20
- 0
pkg/datasource/sql/handler/rm_handler_at.go View File

@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package handler

type RMHandlerAT struct{}

+ 20
- 0
pkg/datasource/sql/handler/rm_handler_xa.go View File

@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package handler

type RMHandlerXA struct{}

+ 18
- 0
pkg/datasource/sql/lock.go View File

@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

+ 29
- 0
pkg/datasource/sql/parser/executortype_string.go View File

@@ -0,0 +1,29 @@
// Code generated by "stringer -type=ExecutorType"; DO NOT EDIT.

package parser

import "strconv"

func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[UnsupportExecutor-1]
_ = x[InsertExecutor-2]
_ = x[UpdateExecutor-3]
_ = x[DeleteExecutor-4]
_ = x[ReplaceIntoExecutor-5]
_ = x[InsertOnDuplicateExecutor-6]
}

const _ExecutorType_name = "UnsupportExecutorInsertExecutorUpdateExecutorDeleteExecutorReplaceIntoExecutorInsertOnDuplicateExecutor"

var _ExecutorType_index = [...]uint8{0, 17, 31, 45, 59, 78, 103}

func (i ExecutorType) String() string {
i -= 1
if i < 0 || i >= ExecutorType(len(_ExecutorType_index)-1) {
return "ExecutorType(" + strconv.FormatInt(int64(i+1), 10) + ")"
}
return _ExecutorType_name[_ExecutorType_index[i]:_ExecutorType_index[i+1]]
}

+ 87
- 0
pkg/datasource/sql/parser/parser_factory.go View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package parser

import (
tparser "github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
_ "github.com/pingcap/tidb/types/parser_driver"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

// ExecutorType
//go:generate stringer -type=ExecutorType
type ExecutorType int32

const (
_ ExecutorType = iota
UnsupportExecutor
InsertExecutor
UpdateExecutor
DeleteExecutor
ReplaceIntoExecutor
InsertOnDuplicateExecutor
)

type ParseContext struct {
// SQLType
SQLType types.SQLType
// ExecutorType
ExecutorType ExecutorType
// InsertStmt
InsertStmt *ast.InsertStmt
// UpdateStmt
UpdateStmt *ast.UpdateStmt
// DeleteStmt
DeleteStmt *ast.DeleteStmt
}

func DoParser(query string) (*ParseContext, error) {
p := tparser.New()
stmtNode, err := p.ParseOneStmt(query, "", "")
if err != nil {
return nil, err
}

parserCtx := new(ParseContext)

switch stmt := stmtNode.(type) {
case *ast.InsertStmt:
parserCtx.SQLType = types.SQLTypeInsert
parserCtx.InsertStmt = stmt
parserCtx.ExecutorType = InsertExecutor

if stmt.IsReplace {
parserCtx.ExecutorType = ReplaceIntoExecutor
}

if len(stmt.OnDuplicate) != 0 {
parserCtx.ExecutorType = InsertOnDuplicateExecutor
}
case *ast.UpdateStmt:
parserCtx.SQLType = types.SQLTypeUpdate
parserCtx.UpdateStmt = stmt
parserCtx.ExecutorType = UpdateExecutor
case *ast.DeleteStmt:
parserCtx.SQLType = types.SQLTypeDelete
parserCtx.DeleteStmt = stmt
parserCtx.ExecutorType = DeleteExecutor
}

return parserCtx, nil
}

+ 26
- 0
pkg/datasource/sql/plugin.go View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
_ "github.com/seata/seata-go/pkg/datasource/sql/exec/hook"

// mysql 相关插件
_ "github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql"
_ "github.com/seata/seata-go/pkg/datasource/sql/undo/mysql"
)

+ 87
- 0
pkg/datasource/sql/sql_test.go View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
"database/sql"
"fmt"
"sync"
"testing"

_ "github.com/go-sql-driver/mysql"
)

func Test_SQLOpen(t *testing.T) {
t.SkipNow()

db, err := sql.Open(SeataMySQLDriver, "root:polaris@tcp(127.0.0.1:3306)/polaris_server?multiStatements=true")
if err != nil {
t.Fatal(err)
}

defer db.Close()

sqlStmt := `
create table if not exists foo (id integer not null primary key, name text);
delete from foo;
`
_, err = db.Exec(sqlStmt)
if err != nil {
t.Fatal(err)
}

wait := sync.WaitGroup{}

txInvoker := func(prefix string, offset, total int) {
defer wait.Done()

tx, err := db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
t.Fatal(err)
}

stmt, err := tx.Prepare("insert into foo(id, name) values(?, ?)")
if err != nil {
t.Fatal(err)
}
defer stmt.Close()
for i := 0; i < total; i++ {
_, err = stmt.Exec(i+offset, fmt.Sprintf("%s-%03d", prefix, i))
if err != nil {
t.Fatal(err)
}
}
err = tx.Commit()
if err != nil {
t.Fatal(err)
}
}

wait.Add(2)

t.Parallel()
t.Run("", func(t *testing.T) {
txInvoker("seata-go-at-1", 0, 10)
})
t.Run("", func(t *testing.T) {
txInvoker("seata-go-at-2", 20, 10)
})

wait.Wait()
}

+ 198
- 0
pkg/datasource/sql/stmt.go View File

@@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
"database/sql/driver"

"github.com/seata/seata-go/pkg/datasource/sql/exec"
"github.com/seata/seata-go/pkg/datasource/sql/types"
)

type Stmt struct {
conn *Conn
// res
res *DBResource
// txCtx
txCtx *types.TransactionContext
// query
query string
// stmt
stmt driver.Stmt
}

// Close closes the statement.
//
// As of Go 1.1, a Stmt will not be closed if it's in use
// by any queries.
//
// Drivers must ensure all network calls made by Close
// do not block indefinitely (e.g. apply a timeout).
func (s *Stmt) Close() error {
s.txCtx = nil
return s.stmt.Close()
}

// NumInput returns the number of placeholder parameters.
//
// If NumInput returns >= 0, the sql package will sanity check
// argument counts from callers and return errors to the caller
// before the statement's Exec or Query methods are called.
//
// NumInput may also return -1, if the driver doesn't know
// its number of placeholders. In that case, the sql package
// will not sanity check Exec or Query argument counts.
func (s *Stmt) NumInput() int {
return s.stmt.NumInput()
}

// Query executes a query that may return rows, such as a
// SELECT.
//
// Deprecated: Drivers should implement StmtQueryContext instead (or additionally).
func (s *Stmt) Query(args []driver.Value) (driver.Rows, error) {
executor, err := exec.BuildExecutor(s.res.dbType, s.query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: s.txCtx,
Query: s.query,
Values: args,
}

ret, err := executor.ExecWithValue(context.Background(), execCtx,
func(ctx context.Context, query string, args []driver.Value) (types.ExecResult, error) {
ret, err := s.stmt.Query(args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithRows(ret)), nil
})
if err != nil {
return nil, err
}

return ret.GetRows(), nil
}

// StmtQueryContext enhances the Stmt interface by providing Query with context.
// QueryContext executes a query that may return rows, such as a
// SELECT.
//
// QueryContext must honor the context timeout and return when it is canceled.
func (s *Stmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error) {
stmt, ok := s.stmt.(driver.StmtQueryContext)
if !ok {
return nil, driver.ErrSkip
}

executor, err := exec.BuildExecutor(s.res.dbType, s.query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: s.txCtx,
Query: s.query,
NamedValues: args,
}

ret, err := executor.ExecWithNamedValue(context.Background(), execCtx,
func(ctx context.Context, query string, args []driver.NamedValue) (types.ExecResult, error) {
ret, err := stmt.QueryContext(ctx, args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithRows(ret)), nil
})
if err != nil {
return nil, err
}

return ret.GetRows(), nil
}

// Exec executes a query that doesn't return rows, such
// as an INSERT or UPDATE.
//
// Deprecated: Drivers should implement StmtExecContext instead (or additionally).
func (s *Stmt) Exec(args []driver.Value) (driver.Result, error) {
// in transaction, need run Executor
executor, err := exec.BuildExecutor(s.res.dbType, s.query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: s.txCtx,
Query: s.query,
Values: args,
}

ret, err := executor.ExecWithValue(context.Background(), execCtx,
func(ctx context.Context, query string, args []driver.Value) (types.ExecResult, error) {
ret, err := s.stmt.Exec(args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithResult(ret)), nil
})

return ret.GetResult(), err
}

// ExecContext executes a query that doesn't return rows, such
// as an INSERT or UPDATE.
//
// ExecContext must honor the context timeout and return when it is canceled.
func (s *Stmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error) {
stmt, ok := s.stmt.(driver.StmtExecContext)
if !ok {
return nil, driver.ErrSkip
}

// in transaction, need run Executor
executor, err := exec.BuildExecutor(s.res.dbType, s.query)
if err != nil {
return nil, err
}

execCtx := &exec.ExecContext{
TxCtx: s.txCtx,
Query: s.query,
NamedValues: args,
}

ret, err := executor.ExecWithNamedValue(ctx, execCtx,
func(ctx context.Context, query string, args []driver.NamedValue) (types.ExecResult, error) {
ret, err := stmt.ExecContext(ctx, args)
if err != nil {
return nil, err
}

return types.NewResult(types.WithResult(ret)), nil
})

return ret.GetResult(), err
}

+ 215
- 0
pkg/datasource/sql/tx.go View File

@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
"database/sql/driver"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
)

const REPORT_RETRY_COUNT = 5

type txOption func(tx *Tx)

func newTx(opts ...txOption) (driver.Tx, error) {
tx := new(Tx)

for i := range opts {
opts[i](tx)
}

if err := tx.init(); err != nil {
return nil, err
}

return tx, nil
}

// withDriverConn
func withDriverConn(conn *Conn) txOption {
return func(t *Tx) {
t.conn = conn
}
}

// withOriginTx
func withOriginTx(tx driver.Tx) txOption {
return func(t *Tx) {
t.target = tx
}
}

// withTxCtx
func withTxCtx(ctx *types.TransactionContext) txOption {
return func(t *Tx) {
t.ctx = ctx
}
}

// Tx
type Tx struct {
conn *Conn
ctx *types.TransactionContext
target driver.Tx
}

// Commit do commit action
// case 1. no open global-transaction, just do local transaction commit
// case 2. not need flush undolog, is XA mode, do local transaction commit
// case 3. need run AT transaction
func (tx *Tx) Commit() error {
if tx.ctx.TransType == types.Local {
return tx.commitOnLocal()
}

// flush undo log if need, is XA mode
if tx.ctx.TransType == types.XAMode {
return tx.commitOnXA()
}

return tx.commitOnAT()
}

func (tx *Tx) Rollback() error {
err := tx.target.Rollback()
if err != nil {
if tx.ctx.OpenGlobalTrsnaction() && tx.ctx.IsBranchRegistered() {
tx.report(false)
}
}

return err
}

// init
func (tx *Tx) init() error {
return nil
}

// commitOnLocal
func (tx *Tx) commitOnLocal() error {
return tx.target.Commit()
}

// commitOnXA
func (tx *Tx) commitOnXA() error {
return nil
}

// commitOnAT
func (tx *Tx) commitOnAT() error {
// if TX-Mode is AT, run regis this transaction branch
if err := tx.regis(tx.ctx); err != nil {
return err
}

undoLogMgr, err := undo.GetUndoLogManager(tx.ctx.DBType)
if err != nil {
return err
}

if err := undoLogMgr.FlushUndoLog(tx.ctx, nil); err != nil {
if rerr := tx.report(false); rerr != nil {
return errors.WithStack(rerr)
}
return errors.WithStack(err)
}

if err := tx.commitOnLocal(); err != nil {
if rerr := tx.report(false); rerr != nil {
return errors.WithStack(rerr)
}
return errors.WithStack(err)
}

tx.report(true)
return nil
}

// regis
func (tx *Tx) regis(ctx *types.TransactionContext) error {
if !ctx.HasUndoLog() || !ctx.HasLockKey() {
return nil
}
lockKey := ""
for _, v := range ctx.LockKeys {
lockKey += v + ";"
}
request := message.BranchRegisterRequest{
Xid: ctx.XaID,
BranchType: branch.BranchType(ctx.TransType),
ResourceId: ctx.ResourceID,
LockKey: lockKey,
ApplicationData: nil,
}
dataSourceManager := datasource.GetDataSourceManager(branch.BranchType(ctx.TransType))
branchId, err := dataSourceManager.BranchRegister(context.Background(), "", request)
if err != nil {
log.Infof("Failed to report branch status: %s", err.Error())
return err
}
ctx.BranchID = uint64(branchId)
return nil
}

// report
func (tx *Tx) report(success bool) error {
if tx.ctx.BranchID == 0 {
return nil
}
status := getStatus(success)
request := message.BranchReportRequest{
Xid: tx.ctx.XaID,
BranchId: int64(tx.ctx.BranchID),
ResourceId: tx.ctx.ResourceID,
Status: status,
}
dataSourceManager := datasource.GetDataSourceManager(branch.BranchType(tx.ctx.TransType))
retry := REPORT_RETRY_COUNT
for retry > 0 {
err := dataSourceManager.BranchReport(context.Background(), request)
if err != nil {
retry--
log.Infof("Failed to report [%s / %s] commit done [%s] Retry Countdown: %s", tx.ctx.BranchID, tx.ctx.XaID, success, retry)
if retry == 0 {
log.Infof("Failed to report branch status: %s", err.Error())
return err
}
} else {
return nil
}
}
return nil
}

func getStatus(success bool) branch.BranchStatus {
if success {
return branch.BranchStatusPhaseoneDone
} else {
return branch.BranchStatusPhaseoneFailed
}
}

+ 28
- 0
pkg/datasource/sql/types/dbtype_string.go View File

@@ -0,0 +1,28 @@
// Code generated by "stringer -type=DBType"; DO NOT EDIT.

package types

import "strconv"

func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[DBTypeUnknown-1]
_ = x[DBTypeMySQL-2]
_ = x[DBTypePostgreSQL-3]
_ = x[DBTypeSQLServer-4]
_ = x[DBTypeOracle-5]
}

const _DBType_name = "DBTypeUnknownDBTypeMySQLDBTypePostgreSQLDBTypeSQLServerDBTypeOracle"

var _DBType_index = [...]uint8{0, 13, 24, 40, 55, 67}

func (i DBType) String() string {
i -= 1
if i < 0 || i >= DBType(len(_DBType_index)-1) {
return "DBType(" + strconv.FormatInt(int64(i+1), 10) + ")"
}
return _DBType_name[_DBType_index[i]:_DBType_index[i+1]]
}

+ 57
- 0
pkg/datasource/sql/types/error.go View File

@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

type ErrorCode int32

const (
_ ErrorCode = iota
ErrorCodeUnknown
ErrorCodeBeginFailed
ErrorCodeLockKeyConflict
ErrorCodeIO
ErrorCodeBranchRollbackFailedRetriable
ErrorCodeBranchRollbackFailedUnretriable
ErrorCodeBranchRegisterFailed
ErrorCodeBranchReportFailed
ErrorCodeLockableCheckFailed
ErrorCodeBranchTransactionNotExist
ErrorCodeGlobalTransactionNotExist
ErrorCodeGlobalTransactionNotActive
ErrorCodeGlobalTransactionStatusInvalid
ErrorCodeFailedToSendBranchCommitRequest
ErrorCodeFailedToSendBranchRollbackRequest
ErrorCodeFailedToAddBranch
ErrorCodeFailedWriteSession
ErrorCodeFailedLockGlobalTranscation
ErrorCodeFailedStore
ErrorCodeLockKeyConflictFailFast
)

type TransactionError struct {
code ErrorCode
msg string
}

func (e *TransactionError) Error() string {
return e.msg
}

func (e *TransactionError) Code() ErrorCode {
return e.code
}

+ 101
- 0
pkg/datasource/sql/types/image.go View File

@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

import (
gosql "database/sql"
)

// RoundRecordImage Front and rear mirror data
type RoundRecordImage struct {
bIndex int32
before RecordImages
aIndex int32
after RecordImages
}

// AppendBeofreImage
func (r *RoundRecordImage) AppendBeofreImage(image *RecordImage) {
r.bIndex++
image.index = r.bIndex

r.before = append(r.before, image)
}

// AppendAfterImage
func (r *RoundRecordImage) AppendAfterImage(image *RecordImage) {
r.aIndex++
image.index = r.aIndex

r.after = append(r.after, image)
}

func (r *RoundRecordImage) BeofreImages() RecordImages {
return r.before
}

func (r *RoundRecordImage) AfterImages() RecordImages {
return r.after
}

func (r *RoundRecordImage) IsEmpty() bool {
return false
}

type RecordImages []*RecordImage

// Reserve The order of reverse mirrors, when executing undo, needs to be executed in reverse
func (rs RecordImages) Reserve() {
l := 0
r := len(rs) - 1

for l <= r {
rs[l], rs[r] = rs[r], rs[l]

l++
r--
}
}

// RecordImage
type RecordImage struct {
// index
index int32
// Table
Table string
// SQLType
SQLType string
// Rows
Rows []RowImage
}

// RowImage Mirror data information information
type RowImage struct {
// Columns All columns of image data
Columns []ColumnImage
}

// ColumnImage The mirror data information of the column
type ColumnImage struct {
// Name column name
Name string
// Type column type
Type gosql.ColumnType
// Value column value
Value interface{}
}

+ 64
- 0
pkg/datasource/sql/types/meta.go View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

import (
"database/sql"
)

// ColumnMeta
type ColumnMeta struct {
// Schema
Schema string
// Table
Table string
// Autoincrement
Autoincrement bool
// Info
Info sql.ColumnType
}

// IndexMeta
type IndexMeta struct {
// Schema
Schema string
// Table
Table string
// Name
Name string
// IType
IType IndexType
// Values
Values []ColumnMeta
}

// TableMeta
type TableMeta struct {
// Schema
Schema string
// Name
Name string
// Columns
Columns map[string]ColumnMeta
// Indexs
Indexs map[string]IndexMeta
}

func (m TableMeta) IsEmpty() bool {
return m.Name == ""
}

+ 55
- 0
pkg/datasource/sql/types/sql.go View File

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

//go:generate stringer -type=SQLType
type SQLType int32

const (
_ SQLType = iota
SQLTypeUnknown
SQLTypeSelect
SQLTypeInsert
SQLTypeUpdate
SQLTypeDelete
SQLTypeSelectForUpdate
SQLTypeReplace
SQLTypeTruncate
SQLTypeCreate
SQLTypeDrop
SQLTypeLoad
SQLTypeMerge
SQLTypeShow
SQLTypeAlter
SQLTypeRename
SQLTypeDump
SQLTypeDebug
SQLTypeExplain
SQLTypeDesc
SQLTypeSet
SQLTypeReload
SQLTypeSelectUnion
SQLTypeCreateTable
SQLTypeDropTable
SQLTypeAlterTable
SQLTypeSelectFromUpdate
SQLTypeMultiDelete
SQLTypeMultiUpdate
SQLTypeCreateIndex
SQLTypeDropIndex
)

+ 53
- 0
pkg/datasource/sql/types/sqltype_string.go View File

@@ -0,0 +1,53 @@
// Code generated by "stringer -type=SQLType"; DO NOT EDIT.

package types

import "strconv"

func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[SQLTypeUnknown-1]
_ = x[SQLTypeSelect-2]
_ = x[SQLTypeInsert-3]
_ = x[SQLTypeUpdate-4]
_ = x[SQLTypeDelete-5]
_ = x[SQLTypeSelectForUpdate-6]
_ = x[SQLTypeReplace-7]
_ = x[SQLTypeTruncate-8]
_ = x[SQLTypeCreate-9]
_ = x[SQLTypeDrop-10]
_ = x[SQLTypeLoad-11]
_ = x[SQLTypeMerge-12]
_ = x[SQLTypeShow-13]
_ = x[SQLTypeAlter-14]
_ = x[SQLTypeRename-15]
_ = x[SQLTypeDump-16]
_ = x[SQLTypeDebug-17]
_ = x[SQLTypeExplain-18]
_ = x[SQLTypeDesc-19]
_ = x[SQLTypeSet-20]
_ = x[SQLTypeReload-21]
_ = x[SQLTypeSelectUnion-22]
_ = x[SQLTypeCreateTable-23]
_ = x[SQLTypeDropTable-24]
_ = x[SQLTypeAlterTable-25]
_ = x[SQLTypeSelectFromUpdate-26]
_ = x[SQLTypeMultiDelete-27]
_ = x[SQLTypeMultiUpdate-28]
_ = x[SQLTypeCreateIndex-29]
_ = x[SQLTypeDropIndex-30]
}

const _SQLType_name = "SQLTypeUnknownSQLTypeSelectSQLTypeInsertSQLTypeUpdateSQLTypeDeleteSQLTypeSelectForUpdateSQLTypeReplaceSQLTypeTruncateSQLTypeCreateSQLTypeDropSQLTypeLoadSQLTypeMergeSQLTypeShowSQLTypeAlterSQLTypeRenameSQLTypeDumpSQLTypeDebugSQLTypeExplainSQLTypeDescSQLTypeSetSQLTypeReloadSQLTypeSelectUnionSQLTypeCreateTableSQLTypeDropTableSQLTypeAlterTableSQLTypeSelectFromUpdateSQLTypeMultiDeleteSQLTypeMultiUpdateSQLTypeCreateIndexSQLTypeDropIndex"

var _SQLType_index = [...]uint16{0, 14, 27, 40, 53, 66, 88, 102, 117, 130, 141, 152, 164, 175, 187, 200, 211, 223, 237, 248, 258, 271, 289, 307, 323, 340, 363, 381, 399, 417, 433}

func (i SQLType) String() string {
i -= 1
if i < 0 || i >= SQLType(len(_SQLType_index)-1) {
return "SQLType(" + strconv.FormatInt(int64(i+1), 10) + ")"
}
return _SQLType_name[_SQLType_index[i]:_SQLType_index[i+1]]
}

+ 195
- 0
pkg/datasource/sql/types/types.go View File

@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package types

import (
"database/sql/driver"
"strings"

"github.com/google/uuid"
)

//go:generate stringer -type=DBType
type DBType int16

type (
// DBType
// BranchPhase
BranchPhase int8
// IndexType index type
IndexType int16
)

const (
_ DBType = iota
DBTypeUnknown
DBTypeMySQL
DBTypePostgreSQL
DBTypeSQLServer
DBTypeOracle

BranchPhase_Unknown = 0
BranchPhase_Done = 1
BranchPhase_Failed = 2

// Index_Primary primary index type.
IndexPrimary = 0
// Index_Normal normal index type.
IndexNormal = 1
// Index_Unique unique index type.
IndexUnique = 2
// Index_FullText full text index type.
IndexFullText = 3
)

func ParseDBType(driverName string) DBType {
switch strings.ToLower(driverName) {
case "mysql":
return DBTypeMySQL
default:
return DBTypeUnknown
}
}

// TransactionType
type TransactionType int8

const (
_ TransactionType = iota
Local
XAMode
ATMode
)

// TransactionContext seata-go‘s context of transaction
type TransactionContext struct {
// LocalTransID locals transaction id
LocalTransID string
// LockKeys
LockKeys []string
// DBType db type, eg. MySQL/PostgreSQL/SQLServer
DBType DBType
// TxOpt transaction option
TxOpt driver.TxOptions
// TransType transaction mode, eg. XA/AT
TransType TransactionType
// ResourceID resource id, database-table
ResourceID string
// BranchID transaction branch unique id
BranchID uint64
// XaID XA id
XaID string
// GlobalLockRequire
GlobalLockRequire bool
// RoundImages when run in AT mode, record before and after Row image
RoundImages *RoundRecordImage
}

func NewTxCtx() *TransactionContext {
return &TransactionContext{
LockKeys: make([]string, 0, 4),
TransType: ATMode,
LocalTransID: uuid.New().String(),
}
}

// HasUndoLog
func (t *TransactionContext) HasUndoLog() bool {
return t.TransType == ATMode && !t.RoundImages.IsEmpty()
}

// HasLockKey
func (t *TransactionContext) HasLockKey() bool {
return len(t.LockKeys) != 0
}

func (t *TransactionContext) OpenGlobalTrsnaction() bool {
return t.TransType != Local
}

func (t *TransactionContext) IsBranchRegistered() bool {
return t.BranchID != 0
}

type (
ExecResult interface {
GetRows() driver.Rows

GetResult() driver.Result
}

queryResult struct {
Rows driver.Rows
}

writeResult struct {
Result driver.Result
}
)

func (r *queryResult) GetRows() driver.Rows {
return r.Rows
}

func (r *queryResult) GetResult() driver.Result {
panic("writeResult no support")
}

func (r *writeResult) GetRows() driver.Rows {
panic("writeResult no support")
}

func (r *writeResult) GetResult() driver.Result {
return r.Result
}

type option struct {
rows driver.Rows
ret driver.Result
}

type Option func(*option)

func WithRows(rows driver.Rows) Option {
return func(o *option) {
o.rows = rows
}
}

func WithResult(ret driver.Result) Option {
return func(o *option) {
o.ret = ret
}
}

func NewResult(opts ...Option) ExecResult {
o := &option{}

for i := range opts {
opts[i](o)
}

if o.ret != nil {
return &writeResult{Result: o.ret}
}
if o.rows != nil {
return &queryResult{Rows: o.rows}
}

panic("not expect result, impossible run into here")
}

+ 60
- 0
pkg/datasource/sql/undo/base/undo.go View File

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package base

import (
"database/sql"
"database/sql/driver"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
)

var _ undo.UndoLogManager = (*BaseUndoLogManager)(nil)

// BaseUndoLogManager
type BaseUndoLogManager struct{}

// Init
func (m *BaseUndoLogManager) Init() {
}

// InsertUndoLog
func (m *BaseUndoLogManager) InsertUndoLog(l []undo.BranchUndoLog, tx driver.Tx) error {
return nil
}

// DeleteUndoLog
func (m *BaseUndoLogManager) DeleteUndoLogs(xid []string, branchID []int64, conn *sql.Conn) error {
return nil
}

// FlushUndoLog
func (m *BaseUndoLogManager) FlushUndoLog(txCtx *types.TransactionContext, tx driver.Tx) error {
return nil
}

// RunUndo
func (m *BaseUndoLogManager) RunUndo(xid string, branchID int64, conn *sql.Conn) error {
return nil
}

// DBType
func (m *BaseUndoLogManager) DBType() types.DBType {
panic("implement me")
}

+ 29
- 0
pkg/datasource/sql/undo/mysql/default.go View File

@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mysql

import (
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
)

func init() {
if err := undo.Regis(&undoLogManager{}); err != nil {
panic(errors.WithStack(err))
}
}

+ 62
- 0
pkg/datasource/sql/undo/mysql/undo.go View File

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mysql

import (
"database/sql"
"database/sql/driver"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/undo/base"
)

var _ undo.UndoLogManager = (*undoLogManager)(nil)

type undoLogManager struct {
Base *base.BaseUndoLogManager
}

// Init
func (m *undoLogManager) Init() {
}

// InsertUndoLog
func (m *undoLogManager) InsertUndoLog(l []undo.BranchUndoLog, tx driver.Tx) error {
return m.Base.InsertUndoLog(l, tx)
}

// DeleteUndoLog
func (m *undoLogManager) DeleteUndoLogs(xid []string, branchID []int64, conn *sql.Conn) error {
return m.Base.DeleteUndoLogs(xid, branchID, conn)
}

// FlushUndoLog
func (m *undoLogManager) FlushUndoLog(txCtx *types.TransactionContext, tx driver.Tx) error {
return m.Base.FlushUndoLog(txCtx, tx)
}

// RunUndo
func (m *undoLogManager) RunUndo(xid string, branchID int64, conn *sql.Conn) error {
return m.Base.RunUndo(xid, branchID, conn)
}

// DBType
func (m *undoLogManager) DBType() types.DBType {
return types.DBTypeMySQL
}

+ 110
- 0
pkg/datasource/sql/undo/undo.go View File

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package undo

import (
"database/sql"
"database/sql/driver"
"errors"
"sync"

"github.com/seata/seata-go/pkg/datasource/sql/types"
)

var solts = map[types.DBType]*undoLogMgrHolder{}

type undoLogMgrHolder struct {
once sync.Once
mgr UndoLogManager
}

func Regis(m UndoLogManager) error {
if _, exist := solts[m.DBType()]; exist {
return nil
}

solts[m.DBType()] = &undoLogMgrHolder{
mgr: m,
once: sync.Once{},
}
return nil
}

// UndoLogManager
type UndoLogManager interface {
Init()
// InsertUndoLog
InsertUndoLog(l []BranchUndoLog, tx driver.Tx) error
// DeleteUndoLog
DeleteUndoLogs(xid []string, branchID []int64, conn *sql.Conn) error
// FlushUndoLog
FlushUndoLog(txCtx *types.TransactionContext, tx driver.Tx) error
// RunUndo
RunUndo(xid string, branchID int64, conn *sql.Conn) error
// DBType
DBType() types.DBType
}

// GetUndoLogManager
func GetUndoLogManager(d types.DBType) (UndoLogManager, error) {
v, ok := solts[d]

if !ok {
return nil, errors.New("not found UndoLogManager")
}

v.once.Do(func() {
v.mgr.Init()
})

return v.mgr, nil
}

// BranchUndoLog
type BranchUndoLog struct {
// Xid
Xid string
// BranchID
BranchID string
// Logs
Logs []SQLUndoLog
}

// Marshal
func (b *BranchUndoLog) Marshal() []byte {
return nil
}

// SQLUndoLog
type SQLUndoLog struct {
SQLType types.SQLType
TableName string
Images types.RoundRecordImage
}

// UndoLogParser
type UndoLogParser interface {
// GetName
GetName() string
// GetDefaultContent
GetDefaultContent() []byte
// Encode
Encode(l BranchUndoLog) []byte
// Decode
Decode(b []byte) BranchUndoLog
}

Loading…
Cancel
Save