Browse Source

fix xa branch commit bug (#564)

* fix xa branch commit bug
tags/v1.2.0
Yuecai Liu GitHub 2 years ago
parent
commit
bfb9eb12f4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 46 additions and 10 deletions
  1. +1
    -1
      goimports.sh
  2. +1
    -1
      pkg/client/client.go
  3. +6
    -6
      pkg/datasource/sql/conn_xa.go
  4. +9
    -1
      pkg/datasource/sql/db.go
  5. +1
    -0
      pkg/datasource/sql/driver.go
  6. +1
    -1
      pkg/datasource/sql/exec/at/multi_delete_executor.go
  7. +27
    -0
      pkg/datasource/sql/xa/mysql_xa_connection.go

+ 1
- 1
goimports.sh View File

@@ -16,7 +16,7 @@
#

# format go imports style
go install -v golang.org/x/tools/cmd/goimports
go install golang.org/x/tools/cmd/goimports
goimports -local github.com/seata/seata-go -w .

# format licence style


+ 1
- 1
pkg/client/client.go View File

@@ -18,7 +18,6 @@
package client

import (
"github.com/seata/seata-go/pkg/remoting/getty"
"sync"

"github.com/seata/seata-go/pkg/datasource"
@@ -26,6 +25,7 @@ import (
"github.com/seata/seata-go/pkg/datasource/sql/exec/config"
"github.com/seata/seata-go/pkg/integration"
remoteConfig "github.com/seata/seata-go/pkg/remoting/config"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/remoting/processor/client"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"


+ 6
- 6
pkg/datasource/sql/conn_xa.go View File

@@ -322,22 +322,22 @@ func (c *XAConn) Commit(ctx context.Context) error {

now := time.Now()
if c.end(ctx, xa.TMSuccess) != nil {
return c.commitErrorHandle()
return c.commitErrorHandle(ctx)
}

if c.checkTimeout(ctx, now) != nil {
return c.commitErrorHandle()
return c.commitErrorHandle(ctx)
}

if c.xaResource.XAPrepare(ctx, c.xaBranchXid.String()) != nil {
return c.commitErrorHandle()
return c.commitErrorHandle(ctx)
}
return nil
}

func (c *XAConn) commitErrorHandle() error {
func (c *XAConn) commitErrorHandle(ctx context.Context) error {
var err error
if err = c.tx.Rollback(); err != nil {
if err = c.XaRollback(ctx, c.xaBranchXid); err != nil {
err = fmt.Errorf("failed to report XA branch commit-failure xid:%s, err:%w", c.txCtx.XID, err)
}
c.cleanXABranchContext()
@@ -389,7 +389,7 @@ func (c *XAConn) XaRollbackByBranchId(ctx context.Context, xaXid XAXid) error {
}

func (c *XAConn) XaRollback(ctx context.Context, xaXid XAXid) error {
err := c.xaResource.Rollback(ctx, xaXid.GetGlobalXid())
err := c.xaResource.Rollback(ctx, xaXid.String())
c.releaseIfNecessary()
return err
}

+ 9
- 1
pkg/datasource/sql/db.go View File

@@ -22,14 +22,15 @@ import (
"database/sql"
"database/sql/driver"
"fmt"
"github.com/seata/seata-go/pkg/util/log"
"sync"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/datasource/sql/util"
"github.com/seata/seata-go/pkg/datasource/sql/xa"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/util/log"
)

type dbOption func(db *DBResource)
@@ -218,10 +219,17 @@ func (db *DBResource) ConnectionForXA(ctx context.Context, xaXid XAXid) (*XAConn
if err != nil {
return nil, fmt.Errorf("get xa new connection failure, xid:%s, err:%v", xaXid.String(), err)
}
xaResource, err := xa.CreateXAResource(newDriverConn, types.DBTypeMySQL)
if err != nil {
return nil, fmt.Errorf("create xa resoruce err:%w", err)
}
xaConn := &XAConn{
Conn: &Conn{
targetConn: newDriverConn,
res: db,
},
xaBranchXid: XaIdBuild(xaXid.GetGlobalXid(), xaXid.GetBranchId()),
xaResource: xaResource,
}
return xaConn, nil
}


+ 1
- 0
pkg/datasource/sql/driver.go View File

@@ -28,6 +28,7 @@ import (
"strings"

"github.com/go-sql-driver/mysql"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
mysql2 "github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql"
"github.com/seata/seata-go/pkg/datasource/sql/types"


+ 1
- 1
pkg/datasource/sql/exec/at/multi_delete_executor.go View File

@@ -70,7 +70,7 @@ type multiDelete struct {
clear bool
}

//NewMultiDeleteExecutor get multiDelete executor
// NewMultiDeleteExecutor get multiDelete executor
func NewMultiDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) *multiDeleteExecutor {
return &multiDeleteExecutor{parserCtx: parserCtx, execContext: execContent, baseExecutor: baseExecutor{hooks: hooks}}
}


+ 27
- 0
pkg/datasource/sql/xa/mysql_xa_connection.go View File

@@ -25,6 +25,8 @@ import (
"io"
"strings"
"time"

"github.com/seata/seata-go/pkg/util/log"
)

type MysqlXAConn struct {
@@ -36,6 +38,8 @@ func NewMysqlXaConn(conn driver.Conn) *MysqlXAConn {
}

func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) error {
log.Infof("xa branch commit, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA COMMIT ")
sb.WriteString("'")
@@ -47,10 +51,15 @@ func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) err

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch commit failed, xid %s, err %v", xid, err)
}
return err
}

func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error {
log.Infof("xa branch end, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA END ")
sb.WriteString("'")
@@ -71,6 +80,9 @@ func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch end failed, xid %s, err %v", xid, err)
}
return err
}

@@ -91,6 +103,8 @@ func (c *MysqlXAConn) IsSameRM(ctx context.Context, xares XAResource) bool {
}

func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error {
log.Infof("xa branch prepare, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA PREPARE ")
sb.WriteString("'")
@@ -99,6 +113,9 @@ func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch prepare failed, xid %s, err %v", xid, err)
}
return err
}

@@ -143,6 +160,8 @@ func (c *MysqlXAConn) Recover(ctx context.Context, flag int) (xids []string, err
}

func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error {
log.Infof("xa branch rollback, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA ROLLBACK ")
sb.WriteString("'")
@@ -151,6 +170,9 @@ func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch rollback failed, xid %s, err %v", xid, err)
}
return err
}

@@ -159,6 +181,8 @@ func (c *MysqlXAConn) SetTransactionTimeout(duration time.Duration) bool {
}

func (c *MysqlXAConn) Start(ctx context.Context, xid string, flags int) error {
log.Infof("xa branch start, xid %s", xid)

var sb strings.Builder
sb.WriteString("XA START ")
sb.WriteString("'")
@@ -180,5 +204,8 @@ func (c *MysqlXAConn) Start(ctx context.Context, xid string, flags int) error {

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch start failed, xid %s, err %v", xid, err)
}
return err
}

Loading…
Cancel
Save