|
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package sql
-
- import (
- "context"
- "flag"
- "time"
-
- "github.com/seata/seata-go/pkg/rm"
-
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promauto"
-
- "github.com/seata/seata-go/pkg/datasource/sql/datasource"
- "github.com/seata/seata-go/pkg/datasource/sql/undo"
- "github.com/seata/seata-go/pkg/protocol/branch"
- "github.com/seata/seata-go/pkg/util/fanout"
- "github.com/seata/seata-go/pkg/util/log"
- )
-
- type phaseTwoContext struct {
- Xid string
- BranchID int64
- ResourceID string
- }
-
- type AsyncWorkerConfig struct {
- BufferLimit int `yaml:"buffer_limit" json:"buffer_limit"`
- BufferCleanInterval time.Duration `yaml:"buffer_clean_interval" json:"buffer_clean_interval"`
- ReceiveChanSize int `yaml:"receive_chan_size" json:"receive_chan_size"`
- CommitWorkerCount int `yaml:"commit_worker_count" json:"commit_worker_count"`
- CommitWorkerBufferSize int `yaml:"commit_worker_buffer_size" json:"commit_worker_buffer_size"`
- }
-
- func (cfg *AsyncWorkerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
- f.IntVar(&cfg.BufferLimit, prefix+".buffer_size", 10000, "async worker commit buffer limit.")
- f.DurationVar(&cfg.BufferCleanInterval, prefix+".buffer.clean_interval", time.Second, "async worker commit buffer interval")
- f.IntVar(&cfg.ReceiveChanSize, prefix+".channel_size", 10000, "async worker commit channel size")
- f.IntVar(&cfg.CommitWorkerCount, prefix+".worker_count", 10, "async worker commit worker count")
- f.IntVar(&cfg.CommitWorkerBufferSize, prefix+".worker_buffer_size", 1000, "async worker commit worker buffer size")
- }
-
- // AsyncWorker executor for branch transaction commit and undo log
- type AsyncWorker struct {
- conf AsyncWorkerConfig
-
- commitQueue chan phaseTwoContext
- resourceMgr datasource.DataSourceManager
- commitWorker *fanout.Fanout
-
- branchCommitTotal prometheus.Counter
- doBranchCommitFailureTotal prometheus.Counter
- receiveChanLength prometheus.Gauge
- rePutBackToQueue prometheus.Counter
- }
-
- func NewAsyncWorker(prom prometheus.Registerer, conf AsyncWorkerConfig, sourceManager datasource.DataSourceManager) *AsyncWorker {
- var asyncWorker AsyncWorker
- asyncWorker.conf = conf
- asyncWorker.commitQueue = make(chan phaseTwoContext, asyncWorker.conf.ReceiveChanSize)
- asyncWorker.resourceMgr = sourceManager
- asyncWorker.commitWorker = fanout.New("asyncWorker",
- fanout.WithWorker(asyncWorker.conf.CommitWorkerCount),
- fanout.WithBuffer(asyncWorker.conf.CommitWorkerBufferSize),
- )
-
- asyncWorker.branchCommitTotal = promauto.With(prom).NewCounter(prometheus.CounterOpts{
- Name: "async_worker_branch_commit_total",
- Help: "the total count of branch commit total count",
- })
- asyncWorker.doBranchCommitFailureTotal = promauto.With(prom).NewCounter(prometheus.CounterOpts{
- Name: "async_worker_branch_commit_failure_total",
- Help: "the total count of branch commit failure count",
- })
- asyncWorker.receiveChanLength = promauto.With(prom).NewGauge(prometheus.GaugeOpts{
- Name: "async_worker_receive_channel_length",
- Help: "the current length of the receive channel size",
- })
- asyncWorker.rePutBackToQueue = promauto.With(prom).NewCounter(prometheus.CounterOpts{
- Name: "async_worker_commit_failure_retry_counter",
- Help: "the counter of commit failure retry counter",
- })
-
- go asyncWorker.run()
-
- return &asyncWorker
- }
-
- // BranchCommit commit branch transaction
- func (aw *AsyncWorker) BranchCommit(ctx context.Context, req rm.BranchResource) (branch.BranchStatus, error) {
- phaseCtx := phaseTwoContext{
- Xid: req.Xid,
- BranchID: req.BranchId,
- ResourceID: req.ResourceId,
- }
-
- aw.branchCommitTotal.Add(1)
-
- select {
- case aw.commitQueue <- phaseCtx:
- case <-ctx.Done():
- }
-
- aw.receiveChanLength.Add(float64(len(aw.commitQueue)))
-
- return branch.BranchStatusPhasetwoCommitted, nil
- }
-
- func (aw *AsyncWorker) run() {
- ticker := time.NewTicker(aw.conf.BufferCleanInterval)
- phaseCtxs := make([]phaseTwoContext, 0, aw.conf.BufferLimit)
- for {
- select {
- case phaseCtx := <-aw.commitQueue:
- phaseCtxs = append(phaseCtxs, phaseCtx)
- if len(phaseCtxs) >= aw.conf.BufferLimit*2/3 {
- aw.doBranchCommit(&phaseCtxs)
- }
- case <-ticker.C:
- aw.doBranchCommit(&phaseCtxs)
- }
- }
- }
-
- func (aw *AsyncWorker) doBranchCommit(phaseCtxs *[]phaseTwoContext) {
- if len(*phaseCtxs) == 0 {
- return
- }
-
- copyPhaseCtxs := make([]phaseTwoContext, len(*phaseCtxs))
- copy(copyPhaseCtxs, *phaseCtxs)
- *phaseCtxs = (*phaseCtxs)[:0]
-
- doBranchCommit := func(ctx context.Context) {
- groupCtxs := make(map[string][]phaseTwoContext, 16)
- for i := range copyPhaseCtxs {
- if copyPhaseCtxs[i].ResourceID == "" {
- continue
- }
-
- if _, ok := groupCtxs[copyPhaseCtxs[i].ResourceID]; !ok {
- groupCtxs[copyPhaseCtxs[i].ResourceID] = make([]phaseTwoContext, 0, 4)
- }
-
- ctxs := groupCtxs[copyPhaseCtxs[i].ResourceID]
- ctxs = append(ctxs, copyPhaseCtxs[i])
- groupCtxs[copyPhaseCtxs[i].ResourceID] = ctxs
- }
-
- for k := range groupCtxs {
- aw.dealWithGroupedContexts(k, groupCtxs[k])
- }
- }
-
- if err := aw.commitWorker.Do(context.Background(), doBranchCommit); err != nil {
- aw.doBranchCommitFailureTotal.Add(1)
- log.Errorf("do branch commit err:%v,phaseCtxs=%v", err, phaseCtxs)
- }
- }
-
- func (aw *AsyncWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) {
- val, ok := aw.resourceMgr.GetCachedResources().Load(resID)
- if !ok {
- for i := range phaseCtxs {
- aw.rePutBackToQueue.Add(1)
- aw.commitQueue <- phaseCtxs[i]
- }
- return
- }
-
- res := val.(*DBResource)
- conn, err := res.db.Conn(context.Background())
- if err != nil {
- for i := range phaseCtxs {
- aw.commitQueue <- phaseCtxs[i]
- }
- }
-
- defer conn.Close()
-
- undoMgr, err := undo.GetUndoLogManager(res.dbType)
- if err != nil {
- for i := range phaseCtxs {
- aw.rePutBackToQueue.Add(1)
- aw.commitQueue <- phaseCtxs[i]
- }
- return
- }
-
- for i := range phaseCtxs {
- phaseCtx := phaseCtxs[i]
- if err := undoMgr.BatchDeleteUndoLog([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil {
- aw.rePutBackToQueue.Add(1)
- aw.commitQueue <- phaseCtx
- }
- }
- }
|