Browse Source

fix: fix xa bug (#557)

tags/v1.2.0
georgehao GitHub 2 years ago
parent
commit
bbc1af0fe9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 12 additions and 6 deletions
  1. +2
    -4
      pkg/datasource/sql/conn_xa.go
  2. +8
    -0
      pkg/datasource/sql/xa/mysql_xa_connection.go
  3. +2
    -2
      pkg/datasource/sql/xa_resource_manager.go

+ 2
- 4
pkg/datasource/sql/conn_xa.go View File

@@ -370,15 +370,13 @@ func (c *XAConn) CloseForce() error {
return nil
}

func (c *XAConn) XaCommit(ctx context.Context, xid string, branchId int64) error {
xaXid := XaIdBuild(xid, uint64(branchId))
func (c *XAConn) XaCommit(ctx context.Context, xaXid XAXid) error {
err := c.xaResource.Commit(ctx, xaXid.String(), false)
c.releaseIfNecessary()
return err
}

func (c *XAConn) XaRollbackByBranchId(ctx context.Context, xid string, branchId int64) error {
xaXid := XaIdBuild(xid, uint64(branchId))
func (c *XAConn) XaRollbackByBranchId(ctx context.Context, xaXid XAXid) error {
return c.XaRollback(ctx, xaXid)
}



+ 8
- 0
pkg/datasource/sql/xa/mysql_xa_connection.go View File

@@ -38,7 +38,9 @@ func NewMysqlXaConn(conn driver.Conn) *MysqlXAConn {
func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) error {
var sb strings.Builder
sb.WriteString("XA COMMIT ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")
if onePhase {
sb.WriteString(" ONE PHASE")
}
@@ -51,7 +53,9 @@ func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) err
func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error {
var sb strings.Builder
sb.WriteString("XA END ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")

switch flags {
case TMSuccess:
@@ -89,7 +93,9 @@ func (c *MysqlXAConn) IsSameRM(ctx context.Context, xares XAResource) bool {
func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error {
var sb strings.Builder
sb.WriteString("XA PREPARE ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
@@ -139,7 +145,9 @@ func (c *MysqlXAConn) Recover(ctx context.Context, flag int) (xids []string, err
func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error {
var sb strings.Builder
sb.WriteString("XA ROLLBACK ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)


+ 2
- 2
pkg/datasource/sql/xa_resource_manager.go View File

@@ -167,7 +167,7 @@ func (xaManager *XAResourceManager) BranchCommit(ctx context.Context, branchReso
return branch.BranchStatusPhasetwoRollbackFailedUnretryable, err
}

if commitErr := connectionProxyXA.XaCommit(ctx, xaID.String(), branchResource.BranchId); commitErr != nil {
if commitErr := connectionProxyXA.XaCommit(ctx, xaID); commitErr != nil {
err := fmt.Errorf("rollback xa, resourceId: %s", branchResource.ResourceId)
log.Errorf(err.Error())
setBranchStatus(xaID.String(), branch.BranchStatusPhasetwoCommitted)
@@ -185,7 +185,7 @@ func (xaManager *XAResourceManager) BranchRollback(ctx context.Context, branchRe
return branch.BranchStatusPhasetwoRollbackFailedUnretryable, err
}

if rollbackErr := connectionProxyXA.XaRollbackByBranchId(ctx, xaID.String(), branchResource.BranchId); rollbackErr != nil {
if rollbackErr := connectionProxyXA.XaRollbackByBranchId(ctx, xaID); rollbackErr != nil {
err := fmt.Errorf("rollback xa, resourceId: %s", branchResource.ResourceId)
log.Errorf(err.Error())
setBranchStatus(xaID.String(), branch.BranchStatusPhasetwoRollbacked)


Loading…
Cancel
Save