|
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package handler
-
- import (
- "container/list"
- "context"
- "database/sql"
- "fmt"
- "sync"
- "time"
-
- "github.com/seata/seata-go/pkg/rm/tcc/fence/enum"
- "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/dao"
- "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model"
- "github.com/seata/seata-go/pkg/tm"
- seataErrors "github.com/seata/seata-go/pkg/util/errors"
- "github.com/seata/seata-go/pkg/util/log"
- )
-
- type tccFenceWrapperHandler struct {
- tccFenceDao dao.TCCFenceStore
- logQueue chan *FenceLogIdentity
- logCache list.List
- logQueueOnce sync.Once
- logQueueCloseOnce sync.Once
- }
-
- type FenceLogIdentity struct {
- xid string
- branchId int64
- }
-
- const (
- maxQueueSize = 500
- )
-
- var (
- fenceHandler *tccFenceWrapperHandler
- fenceOnce sync.Once
- )
-
- func GetFenceHandler() *tccFenceWrapperHandler {
- if fenceHandler == nil {
- fenceOnce.Do(func() {
- fenceHandler = &tccFenceWrapperHandler{
- tccFenceDao: dao.GetTccFenceStoreDatabaseMapper(),
- }
- })
- }
- return fenceHandler
- }
-
- func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
- xid := tm.GetBusinessActionContext(ctx).Xid
- branchId := tm.GetBusinessActionContext(ctx).BranchId
- actionName := tm.GetBusinessActionContext(ctx).ActionName
-
- err := handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusTried)
- if err != nil {
- dbError, ok := err.(seataErrors.TccFenceError)
- if ok && dbError.Code == seataErrors.TccFenceDbDuplicateKeyError {
- // todo add clean command to channel.
- handler.pushCleanChannel(xid, branchId)
- }
-
- return seataErrors.NewTccFenceError(
- seataErrors.PrepareFenceError,
- fmt.Sprintf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d", xid, branchId),
- err,
- )
- }
-
- log.Info("the phase 1 callback method will be called.")
- err = callback()
- if err != nil {
- return seataErrors.NewTccFenceError(
- seataErrors.FenceBusinessError,
- fmt.Sprintf("the business method error msg of: %p", callback),
- err,
- )
- }
-
- return nil
- }
-
- func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
- xid := tm.GetBusinessActionContext(ctx).Xid
- branchId := tm.GetBusinessActionContext(ctx).BranchId
-
- fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
- if err != nil {
- return seataErrors.NewTccFenceError(seataErrors.CommitFenceError,
- fmt.Sprintf(" commit fence method failed. xid= %s, branchId= %d ", xid, branchId),
- err,
- )
- }
- if fenceDo == nil {
- return seataErrors.NewTccFenceError(seataErrors.CommitFenceError,
- fmt.Sprintf("tcc fence record not exists, commit fence method failed. xid= %s, branchId= %d ", xid, branchId),
- err,
- )
- }
-
- if fenceDo.Status == enum.StatusCommitted {
- log.Infof("branch transaction has already committed before. idempotency rejected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
- return nil
- }
- if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
- // enable warn level
- log.Warnf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
- return seataErrors.NewTccFenceError(seataErrors.CommitFenceError,
- fmt.Sprintf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status),
- nil,
- )
- }
-
- return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusCommitted)
- }
-
- func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx, callback func() error) error {
- xid := tm.GetBusinessActionContext(ctx).Xid
- branchId := tm.GetBusinessActionContext(ctx).BranchId
- actionName := tm.GetBusinessActionContext(ctx).ActionName
- fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId)
- if err != nil {
- return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError,
- fmt.Sprintf(" rollback fence method failed. xid= %s, branchId= %d ", xid, branchId),
- err,
- )
- }
-
- // record is null, mean the need suspend
- if fenceDo == nil {
- err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended)
- if err != nil {
- return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError,
- fmt.Sprintf("insert tcc fence suspend record error, rollback fence method failed. xid= %s, branchId= %d", xid, branchId),
- err,
- )
- }
- log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId)
- return nil
- }
-
- // have rollbacked or suspended
- if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended {
- // enable warn level
- log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status)
- return nil
- }
- if fenceDo.Status == enum.StatusCommitted {
- log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status)
- return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError,
- fmt.Sprintf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status),
- err,
- )
- }
-
- return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusRollbacked)
- }
-
- func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error {
- tccFenceDo := model.TCCFenceDO{
- Xid: xid,
- BranchId: branchId,
- ActionName: actionName,
- Status: status,
- }
- return handler.tccFenceDao.InsertTCCFenceDO(tx, &tccFenceDo)
- }
-
- func (handler *tccFenceWrapperHandler) updateFenceStatusAndInvokeCallback(tx *sql.Tx, callback func() error, xid string, branchId int64, status enum.FenceStatus) error {
- if err := handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status); err != nil {
- return err
- }
-
- log.Infof("the phase %d callback method will be called", status)
- if err := callback(); err != nil {
- return seataErrors.NewTccFenceError(
- seataErrors.FenceBusinessError,
- fmt.Sprintf("the business method error msg of: %p", callback),
- err,
- )
- }
-
- return nil
- }
-
- func (handler *tccFenceWrapperHandler) InitLogCleanChannel() {
- handler.logQueueOnce.Do(func() {
- go handler.traversalCleanChannel()
- })
- }
-
- func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() {
- handler.logQueueCloseOnce.Do(func() {
- close(handler.logQueue)
- })
- }
-
- func (handler *tccFenceWrapperHandler) deleteFence(xid string, id int64) error {
- // todo implement
- return nil
- }
-
- func (handler *tccFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 {
- // todo implement
- return 0
- }
-
- func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) {
- // todo implement
- fli := &FenceLogIdentity{
- xid: xid,
- branchId: branchId,
- }
- select {
- case handler.logQueue <- fli:
- // todo add batch delete from log cache.
- default:
- handler.logCache.PushBack(fli)
- }
- log.Infof("add one log to clean queue: %v ", fli)
- }
-
- func (handler *tccFenceWrapperHandler) traversalCleanChannel() {
- handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize)
- for li := range handler.logQueue {
- if err := handler.deleteFence(li.xid, li.branchId); err != nil {
- log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId)
- }
- }
- }
|