You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

at.go 7.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package sql
  18. import (
  19. "context"
  20. "database/sql"
  21. "fmt"
  22. "os"
  23. "strconv"
  24. "sync"
  25. "time"
  26. "github.com/seata/seata-go/pkg/datasource/sql/undo"
  27. "github.com/seata/seata-go/pkg/datasource/sql/datasource"
  28. "github.com/seata/seata-go/pkg/datasource/sql/types"
  29. "github.com/seata/seata-go/pkg/protocol/branch"
  30. "github.com/seata/seata-go/pkg/protocol/message"
  31. "github.com/seata/seata-go/pkg/rm"
  32. )
  33. const (
  34. _defaultResourceSize = 16
  35. _undoLogDeleteLimitSize = 1000
  36. )
  37. func init() {
  38. datasource.RegisterResourceManager(branch.BranchTypeAT,
  39. &ATSourceManager{
  40. resourceCache: sync.Map{},
  41. basic: datasource.NewBasicSourceManager(),
  42. })
  43. }
  44. type ATSourceManager struct {
  45. resourceCache sync.Map
  46. worker *asyncATWorker
  47. basic *datasource.BasicSourceManager
  48. }
  49. // Register a Resource to be managed by Resource Manager
  50. func (mgr *ATSourceManager) RegisterResource(res rm.Resource) error {
  51. mgr.resourceCache.Store(res.GetResourceId(), res)
  52. return mgr.basic.RegisterResource(res)
  53. }
  54. // Unregister a Resource from the Resource Manager
  55. func (mgr *ATSourceManager) UnregisterResource(res rm.Resource) error {
  56. return mgr.basic.UnregisterResource(res)
  57. }
  58. // Get all resources managed by this manager
  59. func (mgr *ATSourceManager) GetManagedResources() map[string]rm.Resource {
  60. ret := make(map[string]rm.Resource)
  61. mgr.resourceCache.Range(func(key, value interface{}) bool {
  62. ret[key.(string)] = value.(rm.Resource)
  63. return true
  64. })
  65. return ret
  66. }
  67. // BranchRollback Rollback the corresponding transactions according to the request
  68. func (mgr *ATSourceManager) BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error) {
  69. val, ok := mgr.resourceCache.Load(req.ResourceId)
  70. if !ok {
  71. return branch.BranchStatusPhaseoneFailed, fmt.Errorf("resource %s not found", req.ResourceId)
  72. }
  73. res := val.(*DBResource)
  74. undoMgr, err := undo.GetUndoLogManager(res.dbType)
  75. if err != nil {
  76. return branch.BranchStatusUnknown, err
  77. }
  78. conn, err := res.target.Conn(ctx)
  79. if err != nil {
  80. return branch.BranchStatusUnknown, err
  81. }
  82. if err := undoMgr.RunUndo(req.Xid, req.BranchId, conn); err != nil {
  83. transErr, ok := err.(*types.TransactionError)
  84. if !ok {
  85. return branch.BranchStatusPhaseoneFailed, err
  86. }
  87. if transErr.Code() == types.ErrorCodeBranchRollbackFailedUnretriable {
  88. return branch.BranchStatusPhasetwoRollbackFailedUnretryable, nil
  89. }
  90. return branch.BranchStatusPhasetwoRollbackFailedRetryable, nil
  91. }
  92. return branch.BranchStatusPhasetwoRollbacked, nil
  93. }
  94. // BranchCommit
  95. func (mgr *ATSourceManager) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
  96. mgr.worker.branchCommit(ctx, req)
  97. return branch.BranchStatusPhaseoneDone, nil
  98. }
  99. // LockQuery
  100. func (mgr *ATSourceManager) LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error) {
  101. return false, nil
  102. }
  103. // BranchRegister
  104. func (mgr *ATSourceManager) BranchRegister(ctx context.Context, clientId string, req message.BranchRegisterRequest) (int64, error) {
  105. return 0, nil
  106. }
  107. // BranchReport
  108. func (mgr *ATSourceManager) BranchReport(ctx context.Context, req message.BranchReportRequest) error {
  109. return nil
  110. }
  111. // CreateTableMetaCache
  112. func (mgr *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType,
  113. db *sql.DB) (datasource.TableMetaCache, error) {
  114. return mgr.basic.CreateTableMetaCache(ctx, resID, dbType, db)
  115. }
  116. type asyncATWorker struct {
  117. asyncCommitBufferLimit int64
  118. commitQueue chan phaseTwoContext
  119. resourceMgr datasource.DataSourceManager
  120. }
  121. func newAsyncATWorker() *asyncATWorker {
  122. asyncCommitBufferLimit := int64(10000)
  123. val := os.Getenv("CLIENT_RM_ASYNC_COMMIT_BUFFER_LIMIT")
  124. if val != "" {
  125. limit, _ := strconv.ParseInt(val, 10, 64)
  126. if limit != 0 {
  127. asyncCommitBufferLimit = limit
  128. }
  129. }
  130. worker := &asyncATWorker{
  131. commitQueue: make(chan phaseTwoContext, asyncCommitBufferLimit),
  132. }
  133. return worker
  134. }
  135. func (w *asyncATWorker) doBranchCommitSafely() {
  136. batchSize := 64
  137. ticker := time.NewTicker(1 * time.Second)
  138. phaseCtxs := make([]phaseTwoContext, 0, batchSize)
  139. for {
  140. select {
  141. case phaseCtx := <-w.commitQueue:
  142. phaseCtxs = append(phaseCtxs, phaseCtx)
  143. if len(phaseCtxs) == batchSize {
  144. tmp := phaseCtxs
  145. w.doBranchCommit(tmp)
  146. phaseCtxs = make([]phaseTwoContext, 0, batchSize)
  147. }
  148. case <-ticker.C:
  149. tmp := phaseCtxs
  150. w.doBranchCommit(tmp)
  151. phaseCtxs = make([]phaseTwoContext, 0, batchSize)
  152. }
  153. }
  154. }
  155. func (w *asyncATWorker) doBranchCommit(phaseCtxs []phaseTwoContext) {
  156. groupCtxs := make(map[string][]phaseTwoContext, _defaultResourceSize)
  157. for i := range phaseCtxs {
  158. if phaseCtxs[i].ResourceID == "" {
  159. continue
  160. }
  161. if _, ok := groupCtxs[phaseCtxs[i].ResourceID]; !ok {
  162. groupCtxs[phaseCtxs[i].ResourceID] = make([]phaseTwoContext, 0, 4)
  163. }
  164. ctxs := groupCtxs[phaseCtxs[i].ResourceID]
  165. ctxs = append(ctxs, phaseCtxs[i])
  166. groupCtxs[phaseCtxs[i].ResourceID] = ctxs
  167. }
  168. for k := range groupCtxs {
  169. w.dealWithGroupedContexts(k, groupCtxs[k])
  170. }
  171. }
  172. func (w *asyncATWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) {
  173. val, ok := w.resourceMgr.GetManagedResources()[resID]
  174. if !ok {
  175. for i := range phaseCtxs {
  176. w.commitQueue <- phaseCtxs[i]
  177. }
  178. return
  179. }
  180. res := val.(*DBResource)
  181. conn, err := res.target.Conn(context.Background())
  182. if err != nil {
  183. for i := range phaseCtxs {
  184. w.commitQueue <- phaseCtxs[i]
  185. }
  186. }
  187. defer conn.Close()
  188. undoMgr, err := undo.GetUndoLogManager(res.dbType)
  189. if err != nil {
  190. for i := range phaseCtxs {
  191. w.commitQueue <- phaseCtxs[i]
  192. }
  193. return
  194. }
  195. for i := range phaseCtxs {
  196. phaseCtx := phaseCtxs[i]
  197. if err := undoMgr.BatchDeleteUndoLog([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil {
  198. w.commitQueue <- phaseCtx
  199. }
  200. }
  201. }
  202. func (w *asyncATWorker) branchCommit(ctx context.Context, req message.BranchCommitRequest) {
  203. phaseCtx := phaseTwoContext{
  204. Xid: req.Xid,
  205. BranchID: req.BranchId,
  206. ResourceID: req.ResourceId,
  207. }
  208. select {
  209. case w.commitQueue <- phaseCtx:
  210. case <-ctx.Done():
  211. }
  212. return
  213. }
  214. type phaseTwoContext struct {
  215. Xid string
  216. BranchID int64
  217. ResourceID string
  218. }