Browse Source

feat(fence): implement fence use datasource proxy (#420)

tags/v1.2.0
juzimao GitHub 2 years ago
parent
commit
735849420e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 456 additions and 45 deletions
  1. +4
    -1
      go.mod
  2. +1
    -0
      go.sum
  3. +27
    -21
      pkg/rm/tcc/fence/fence_api.go
  4. +100
    -0
      pkg/rm/tcc/fence/fence_driver.go
  5. +159
    -0
      pkg/rm/tcc/fence/fence_driver_conn.go
  6. +30
    -0
      pkg/rm/tcc/fence/fence_driver_conn_test.go
  7. +37
    -0
      pkg/rm/tcc/fence/fence_driver_tx.go
  8. +74
    -0
      pkg/rm/tcc/fence/fennce_driver_test.go
  9. +7
    -22
      pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go
  10. +17
    -1
      pkg/tm/context.go

+ 4
- 1
go.mod View File

@@ -30,7 +30,10 @@ require (
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
)

require github.com/agiledragon/gomonkey/v2 v2.9.0
require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0
)

require (
github.com/RoaringBitmap/roaring v1.2.0 // indirect


+ 1
- 0
go.sum View File

@@ -54,6 +54,7 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/Workiva/go-datastructures v1.0.52 h1:PLSK6pwn8mYdaoaCZEMsXBpBotr4HHn9abU0yMQt0NI=
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/agiledragon/gomonkey v2.0.2+incompatible h1:eXKi9/piiC3cjJD1658mEE2o3NjkJ5vDLgYjCQu0Xlw=
github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
github.com/agiledragon/gomonkey/v2 v2.9.0 h1:PDiKKybR596O6FHW+RVSG0Z7uGCBNbmbUXh3uCNQ7Hc=
github.com/agiledragon/gomonkey/v2 v2.9.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY=


+ 27
- 21
pkg/rm/tcc/fence/fence_api.go View File

@@ -25,36 +25,42 @@ import (
"github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
"github.com/seata/seata-go/pkg/rm/tcc/fence/handler"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
)

// WithFence This method is a suspended API interface that asserts the phase timing of a transaction
// WithFence Execute the fence database operation first and then call back the business method
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
if err = DoFence(ctx, tx); err != nil {
return err
}

if err := callback(); err != nil {
return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err)
}

return
}

// DeFence This method is a suspended API interface that asserts the phase timing of a transaction
// and performs corresponding database operations to ensure transaction consistency
// case 1: if fencePhase is FencePhaseNotExist, will return a fence not found error.
// case 2: if fencePhase is FencePhasePrepare, will do prepare fence operation.
// case 3: if fencePhase is FencePhaseCommit, will do commit fence operation.
// case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation.
// case 5: if fencePhase not in above case, will return a fence phase illegal error.
func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) {
fp := tm.GetFencePhase(ctx)
h := handler.GetFenceHandler()

switch {
case fp == enum.FencePhaseNotExist:
err = fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
case fp == enum.FencePhasePrepare:
err = h.PrepareFence(ctx, tx, callback)
case fp == enum.FencePhaseCommit:
err = h.CommitFence(ctx, tx, callback)
case fp == enum.FencePhaseRollback:
err = h.RollbackFence(ctx, tx, callback)
default:
err = fmt.Errorf("fence phase: %v illegal", fp)
}
func DoFence(ctx context.Context, tx *sql.Tx) error {
hd := handler.GetFenceHandler()
phase := tm.GetFencePhase(ctx)

if err != nil {
log.Error(err)
switch phase {
case enum.FencePhaseNotExist:
return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx))
case enum.FencePhasePrepare:
return hd.PrepareFence(ctx, tx)
case enum.FencePhaseCommit:
return hd.CommitFence(ctx, tx)
case enum.FencePhaseRollback:
return hd.RollbackFence(ctx, tx)
}

return
return fmt.Errorf("fence phase: %v illegal", phase)
}

+ 100
- 0
pkg/rm/tcc/fence/fence_driver.go View File

@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"context"
"database/sql"
"database/sql/driver"

"github.com/go-sql-driver/mysql"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/util/log"
)

const (
// SeataFenceMySQLDriver MySQL driver for fence
SeataFenceMySQLDriver = "seata-fence-mysql"
)

func init() {
sql.Register(SeataFenceMySQLDriver, &FenceDriver{
TargetDriver: &mysql.MySQLDriver{},
})
}

type FenceDriver struct {
TargetDriver driver.Driver
TargetDB *sql.DB
}

func (fd *FenceDriver) Open(name string) (driver.Conn, error) {
return nil, errors.New("operation unsupported")
}

func (fd *FenceDriver) OpenConnector(name string) (connector driver.Connector, re error) {
connector = &dsnConnector{dsn: name, driver: fd.TargetDriver}
if driverCtx, ok := fd.TargetDriver.(driver.DriverContext); ok {
connector, re = driverCtx.OpenConnector(name)
if re != nil {
log.Errorf("open connector: %w", re)
return nil, re
}
}

fd.TargetDB = sql.OpenDB(connector)

return &SeataFenceConnector{
TargetConnector: connector,
TargetDB: fd.TargetDB,
}, nil
}

type dsnConnector struct {
dsn string
driver driver.Driver
}

func (connector *dsnConnector) Connect(_ context.Context) (driver.Conn, error) {
return connector.driver.Open(connector.dsn)
}

func (connector *dsnConnector) Driver() driver.Driver {
return connector.driver
}

type SeataFenceConnector struct {
TargetConnector driver.Connector
TargetDB *sql.DB
}

func (connector *SeataFenceConnector) Connect(ctx context.Context) (driver.Conn, error) {
targetConn, err := connector.TargetConnector.Connect(ctx)
if err != nil {
return nil, err
}

return &FenceConn{
TargetConn: targetConn,
TargetDB: connector.TargetDB,
}, nil
}

func (connector *SeataFenceConnector) Driver() driver.Driver {
return connector.TargetConnector.Driver()
}

+ 159
- 0
pkg/rm/tcc/fence/fence_driver_conn.go View File

@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"context"
"database/sql"
"database/sql/driver"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/log"
)

type FenceConn struct {
TargetConn driver.Conn
TargetDB *sql.DB
}

func (c *FenceConn) ResetSession(ctx context.Context) error {
resetter, ok := c.TargetConn.(driver.SessionResetter)
if !ok {
return driver.ErrSkip
}

return resetter.ResetSession(ctx)
}

func (c *FenceConn) Prepare(query string) (driver.Stmt, error) {
return c.TargetConn.Prepare(query)
}

func (c *FenceConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
return c.TargetConn.Prepare(query)
}

func (c *FenceConn) Exec(query string, args []driver.Value) (driver.Result, error) {
execer, ok := c.TargetConn.(driver.Execer)
if !ok {
return nil, driver.ErrSkip
}

return execer.Exec(query, args)
}

func (c *FenceConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
execerContext, ok := c.TargetConn.(driver.ExecerContext)
if !ok {
values := make([]driver.Value, 0, len(args))
for i := range args {
values = append(values, args[i].Value)
}
return c.Exec(query, values)
}

return execerContext.ExecContext(ctx, query, args)
}

func (c *FenceConn) Query(query string, args []driver.Value) (driver.Rows, error) {
queryer, ok := c.TargetConn.(driver.Queryer)
if !ok {
return nil, driver.ErrSkip
}

return queryer.Query(query, args)
}

func (c *FenceConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
QueryerContext, ok := c.TargetConn.(driver.QueryerContext)
if !ok {
values := make([]driver.Value, 0, len(args))

for i := range args {
values = append(values, args[i].Value)
}

return c.Query(query, values)
}

return QueryerContext.QueryContext(ctx, query, args)
}

func (c *FenceConn) Begin() (driver.Tx, error) {
return nil, errors.New("operation unsupport")
}

func (c *FenceConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
beginer, ok := c.TargetConn.(driver.ConnBeginTx)
if !ok {
return nil, errors.New("operation unsupported")
}

tx, err := beginer.BeginTx(ctx, opts)
if err != nil {
return nil, err
}

if !tm.IsSeataContext(ctx) {
return nil, errors.New("there is not seata context")
}

// check if have been begin fence tx
if tm.IsFenceTxBegin(ctx) {
return tx, nil
}

tm.SetFenceTxBeginedFlag(ctx, true)

fenceTx, err := c.TargetDB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if err := fenceTx.Rollback(); err != nil {
log.Error(err)
}

// although it have not any db operations yet, is still rollback to avoid leak tx.
if err := tx.Rollback(); err != nil {
log.Error(err)
}
}
}()

// do fence operations
emptyCallback := func() error {
return nil
}

if err := WithFence(ctx, fenceTx, emptyCallback); err != nil {
return nil, err
}

return &FenceTx{
Ctx: ctx,
TargetTx: tx,
TargetFenceTx: fenceTx,
}, nil
}

func (c *FenceConn) Close() error {
return c.TargetConn.Close()
}

+ 30
- 0
pkg/rm/tcc/fence/fence_driver_conn_test.go View File

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fence

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBegin(t *testing.T) {
tx, err := (&FenceConn{}).Begin()
assert.NotNil(t, err)
assert.Nil(t, tx)
}

+ 37
- 0
pkg/rm/tcc/fence/fence_driver_tx.go View File

@@ -0,0 +1,37 @@
package fence

import (
"context"
"database/sql"
"database/sql/driver"

"github.com/seata/seata-go/pkg/tm"
)

type FenceTx struct {
Ctx context.Context
TargetTx driver.Tx
TargetFenceTx *sql.Tx
}

func (tx *FenceTx) Commit() error {
if err := tx.TargetTx.Commit(); err != nil {
return err
}

tx.clearFenceTx()
return tx.TargetFenceTx.Commit()
}

func (tx *FenceTx) Rollback() error {
if err := tx.TargetTx.Rollback(); err != nil {
return err
}

tx.clearFenceTx()
return tx.TargetFenceTx.Rollback()
}

func (tx *FenceTx) clearFenceTx() {
tm.SetFenceTxBeginedFlag(tx.Ctx, false)
}

+ 74
- 0
pkg/rm/tcc/fence/fennce_driver_test.go View File

@@ -0,0 +1,74 @@
package fence

import (
"context"
"database/sql"
"database/sql/driver"
"reflect"
"testing"

"github.com/agiledragon/gomonkey"
"github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
)

func TestOpen(t *testing.T) {
conn, err := (&FenceDriver{}).Open("")
assert.NotNil(t, err)
assert.Nil(t, conn)
}

func TestOpenConnector(t *testing.T) {
mysqlDriver := &mysql.MySQLDriver{}
d := &FenceDriver{TargetDriver: mysqlDriver}

openDBStub := gomonkey.ApplyFunc(sql.OpenDB, func(_ driver.Connector) *sql.DB {
return &sql.DB{}
})

openConnectorStub := gomonkey.ApplyMethod(
reflect.TypeOf(mysqlDriver),
"OpenConnector",
func(_ *mysql.MySQLDriver, name string) (connector driver.Connector, re error) {
return nil, nil
},
)

connector, err := d.OpenConnector("mock")
assert.Nil(t, err)
seataConnector, ok := connector.(*SeataFenceConnector)
assert.True(t, ok)
assert.Equal(t, d.TargetDB, seataConnector.TargetDB)

openDBStub.Reset()
openConnectorStub.Reset()
}

func TestConnect(t *testing.T) {
c := &dsnConnector{
driver: &mysql.MySQLDriver{},
}

openStub := gomonkey.ApplyMethod(
reflect.TypeOf(c.driver),
"Open",
func(_ *mysql.MySQLDriver, name string) (driver.Conn, error) {
return nil, nil
},
)

conn, err := c.Connect(context.Background())
assert.Nil(t, err)
assert.Nil(t, conn)

openStub.Reset()

}

func TestDriver(t *testing.T) {
c := &dsnConnector{
driver: &mysql.MySQLDriver{},
}

assert.Equal(t, c.driver, c.Driver())
}

+ 7
- 22
pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go View File

@@ -68,7 +68,7 @@ func GetFenceHandler() *tccFenceWrapperHandler {
return fenceHandler
}

func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql.Tx) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName
@@ -82,16 +82,10 @@ func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql
return fmt.Errorf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err)
}

log.Info("the phase 1 callback method will be called.")
err = callback()
if err != nil {
return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err)
}

return nil
}

func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.Tx) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId

@@ -113,10 +107,10 @@ func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.
return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
}

return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusCommitted)
return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted)
}

func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) error {
xid := tm.GetBusinessActionContext(ctx).Xid
branchId := tm.GetBusinessActionContext(ctx).BranchId
actionName := tm.GetBusinessActionContext(ctx).ActionName
@@ -146,7 +140,7 @@ func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sq
return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
}

return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusRollbacked)
return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked)
}

func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error {
@@ -159,17 +153,8 @@ func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string,
return handler.tccFenceDao.InsertTCCFenceDO(tx, &tccFenceDo)
}

func (handler *tccFenceWrapperHandler) updateFenceStatusAndInvokeCallback(tx *sql.Tx, callback func() error, xid string, branchId int64, status enum.FenceStatus) error {
if err := handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status); err != nil {
return err
}

log.Infof("the phase %d callback method will be called", status)
if err := callback(); err != nil {
return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err)
}

return nil
func (handler *tccFenceWrapperHandler) updateFenceStatus(tx *sql.Tx, xid string, branchId int64, status enum.FenceStatus) error {
return handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status)
}

func (handler *tccFenceWrapperHandler) InitLogCleanChannel() {


+ 17
- 1
pkg/tm/context.go View File

@@ -50,7 +50,9 @@ type BusinessActionContext struct {
}

type ContextVariable struct {
FencePhase enum.FencePhase
FencePhase enum.FencePhase
FenceTxBegined bool

BusinessActionContext *BusinessActionContext
// GlobalTransaction Represent seata ctx is a global transaction
GlobalTransaction
@@ -195,3 +197,17 @@ func GetFencePhase(ctx context.Context) enum.FencePhase {
}
return enum.FencePhaseNotExist
}

func SetFenceTxBeginedFlag(ctx context.Context, fenceTxBegined bool) {
if variable := ctx.Value(seataContextVariable); variable != nil {
variable.(*ContextVariable).FenceTxBegined = fenceTxBegined
}
}

func IsFenceTxBegin(ctx context.Context) bool {
if variable := ctx.Value(seataContextVariable); variable != nil {
return variable.(*ContextVariable).FenceTxBegined
}

return false
}

Loading…
Cancel
Save