You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

async_worker.go 7.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package sql
  18. import (
  19. "context"
  20. "flag"
  21. "time"
  22. "seata.apache.org/seata-go/pkg/rm"
  23. "github.com/prometheus/client_golang/prometheus"
  24. "github.com/prometheus/client_golang/prometheus/promauto"
  25. "seata.apache.org/seata-go/pkg/datasource/sql/datasource"
  26. "seata.apache.org/seata-go/pkg/datasource/sql/undo"
  27. "seata.apache.org/seata-go/pkg/protocol/branch"
  28. "seata.apache.org/seata-go/pkg/util/fanout"
  29. "seata.apache.org/seata-go/pkg/util/log"
  30. )
  31. type phaseTwoContext struct {
  32. Xid string
  33. BranchID int64
  34. ResourceID string
  35. }
  36. type AsyncWorkerConfig struct {
  37. BufferLimit int `yaml:"buffer_limit" json:"buffer_limit"`
  38. BufferCleanInterval time.Duration `yaml:"buffer_clean_interval" json:"buffer_clean_interval"`
  39. ReceiveChanSize int `yaml:"receive_chan_size" json:"receive_chan_size"`
  40. CommitWorkerCount int `yaml:"commit_worker_count" json:"commit_worker_count"`
  41. CommitWorkerBufferSize int `yaml:"commit_worker_buffer_size" json:"commit_worker_buffer_size"`
  42. }
  43. func (cfg *AsyncWorkerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
  44. f.IntVar(&cfg.BufferLimit, prefix+".buffer_size", 10000, "async worker commit buffer limit.")
  45. f.DurationVar(&cfg.BufferCleanInterval, prefix+".buffer.clean_interval", time.Second, "async worker commit buffer interval")
  46. f.IntVar(&cfg.ReceiveChanSize, prefix+".channel_size", 10000, "async worker commit channel size")
  47. f.IntVar(&cfg.CommitWorkerCount, prefix+".worker_count", 10, "async worker commit worker count")
  48. f.IntVar(&cfg.CommitWorkerBufferSize, prefix+".worker_buffer_size", 1000, "async worker commit worker buffer size")
  49. }
  50. // AsyncWorker executor for branch transaction commit and undo log
  51. type AsyncWorker struct {
  52. conf AsyncWorkerConfig
  53. commitQueue chan phaseTwoContext
  54. resourceMgr datasource.DataSourceManager
  55. commitWorker *fanout.Fanout
  56. branchCommitTotal prometheus.Counter
  57. doBranchCommitFailureTotal prometheus.Counter
  58. receiveChanLength prometheus.Gauge
  59. rePutBackToQueue prometheus.Counter
  60. }
  61. func NewAsyncWorker(prom prometheus.Registerer, conf AsyncWorkerConfig, sourceManager datasource.DataSourceManager) *AsyncWorker {
  62. var asyncWorker AsyncWorker
  63. asyncWorker.conf = conf
  64. asyncWorker.commitQueue = make(chan phaseTwoContext, asyncWorker.conf.ReceiveChanSize)
  65. asyncWorker.resourceMgr = sourceManager
  66. asyncWorker.commitWorker = fanout.New("asyncWorker",
  67. fanout.WithWorker(asyncWorker.conf.CommitWorkerCount),
  68. fanout.WithBuffer(asyncWorker.conf.CommitWorkerBufferSize),
  69. )
  70. asyncWorker.branchCommitTotal = promauto.With(prom).NewCounter(prometheus.CounterOpts{
  71. Name: "async_worker_branch_commit_total",
  72. Help: "the total count of branch commit total count",
  73. })
  74. asyncWorker.doBranchCommitFailureTotal = promauto.With(prom).NewCounter(prometheus.CounterOpts{
  75. Name: "async_worker_branch_commit_failure_total",
  76. Help: "the total count of branch commit failure count",
  77. })
  78. asyncWorker.receiveChanLength = promauto.With(prom).NewGauge(prometheus.GaugeOpts{
  79. Name: "async_worker_receive_channel_length",
  80. Help: "the current length of the receive channel size",
  81. })
  82. asyncWorker.rePutBackToQueue = promauto.With(prom).NewCounter(prometheus.CounterOpts{
  83. Name: "async_worker_commit_failure_retry_counter",
  84. Help: "the counter of commit failure retry counter",
  85. })
  86. go asyncWorker.run()
  87. return &asyncWorker
  88. }
  89. // BranchCommit commit branch transaction
  90. func (aw *AsyncWorker) BranchCommit(ctx context.Context, req rm.BranchResource) (branch.BranchStatus, error) {
  91. phaseCtx := phaseTwoContext{
  92. Xid: req.Xid,
  93. BranchID: req.BranchId,
  94. ResourceID: req.ResourceId,
  95. }
  96. aw.branchCommitTotal.Add(1)
  97. select {
  98. case aw.commitQueue <- phaseCtx:
  99. case <-ctx.Done():
  100. }
  101. aw.receiveChanLength.Add(float64(len(aw.commitQueue)))
  102. return branch.BranchStatusPhasetwoCommitted, nil
  103. }
  104. func (aw *AsyncWorker) run() {
  105. ticker := time.NewTicker(aw.conf.BufferCleanInterval)
  106. phaseCtxs := make([]phaseTwoContext, 0, aw.conf.BufferLimit)
  107. for {
  108. select {
  109. case phaseCtx := <-aw.commitQueue:
  110. phaseCtxs = append(phaseCtxs, phaseCtx)
  111. if len(phaseCtxs) >= aw.conf.BufferLimit*2/3 {
  112. aw.doBranchCommit(&phaseCtxs)
  113. }
  114. case <-ticker.C:
  115. aw.doBranchCommit(&phaseCtxs)
  116. }
  117. }
  118. }
  119. func (aw *AsyncWorker) doBranchCommit(phaseCtxs *[]phaseTwoContext) {
  120. if len(*phaseCtxs) == 0 {
  121. return
  122. }
  123. copyPhaseCtxs := make([]phaseTwoContext, len(*phaseCtxs))
  124. copy(copyPhaseCtxs, *phaseCtxs)
  125. *phaseCtxs = (*phaseCtxs)[:0]
  126. doBranchCommit := func(ctx context.Context) {
  127. groupCtxs := make(map[string][]phaseTwoContext, 16)
  128. for i := range copyPhaseCtxs {
  129. if copyPhaseCtxs[i].ResourceID == "" {
  130. continue
  131. }
  132. if _, ok := groupCtxs[copyPhaseCtxs[i].ResourceID]; !ok {
  133. groupCtxs[copyPhaseCtxs[i].ResourceID] = make([]phaseTwoContext, 0, 4)
  134. }
  135. ctxs := groupCtxs[copyPhaseCtxs[i].ResourceID]
  136. ctxs = append(ctxs, copyPhaseCtxs[i])
  137. groupCtxs[copyPhaseCtxs[i].ResourceID] = ctxs
  138. }
  139. for k := range groupCtxs {
  140. aw.dealWithGroupedContexts(k, groupCtxs[k])
  141. }
  142. }
  143. if err := aw.commitWorker.Do(context.Background(), doBranchCommit); err != nil {
  144. aw.doBranchCommitFailureTotal.Add(1)
  145. log.Errorf("do branch commit err:%v,phaseCtxs=%v", err, phaseCtxs)
  146. }
  147. }
  148. func (aw *AsyncWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) {
  149. val, ok := aw.resourceMgr.GetCachedResources().Load(resID)
  150. if !ok {
  151. for i := range phaseCtxs {
  152. aw.rePutBackToQueue.Add(1)
  153. aw.commitQueue <- phaseCtxs[i]
  154. }
  155. return
  156. }
  157. res := val.(*DBResource)
  158. conn, err := res.db.Conn(context.Background())
  159. if err != nil {
  160. for i := range phaseCtxs {
  161. aw.commitQueue <- phaseCtxs[i]
  162. }
  163. }
  164. defer conn.Close()
  165. undoMgr, err := undo.GetUndoLogManager(res.dbType)
  166. if err != nil {
  167. for i := range phaseCtxs {
  168. aw.rePutBackToQueue.Add(1)
  169. aw.commitQueue <- phaseCtxs[i]
  170. }
  171. return
  172. }
  173. for i := range phaseCtxs {
  174. phaseCtx := phaseCtxs[i]
  175. if err := undoMgr.BatchDeleteUndoLog([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil {
  176. aw.rePutBackToQueue.Add(1)
  177. aw.commitQueue <- phaseCtx
  178. }
  179. }
  180. }