You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

transaction_executor.go 7.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package tm
  18. import (
  19. "context"
  20. "fmt"
  21. "time"
  22. "github.com/pkg/errors"
  23. "seata.apache.org/seata-go/pkg/protocol/message"
  24. "seata.apache.org/seata-go/pkg/util/log"
  25. )
  // GtxConfig describes the global transaction that WithGlobalTx should run.
  type GtxConfig struct {
  	// Timeout is recorded in the context's TimeInfo and handed to the
  	// transaction manager when a new transaction is begun; a zero value
  	// selects config.DefaultGlobalTransactionTimeout.
  	Timeout time.Duration
  	// Name identifies the transaction; WithGlobalTx rejects an empty name.
  	Name string
  	// Propagation selects how begin treats a transaction that is already
  	// bound to the context.
  	Propagation Propagation
  	// LockRetryInternal is not read in this file.
  	// NOTE(review): presumably the wait between lock retries (the name looks
  	// like a misspelling of "Interval") — confirm against its consumer.
  	LockRetryInternal time.Duration
  	// LockRetryTimes is not read in this file.
  	// NOTE(review): presumably the maximum number of lock retries — confirm.
  	LockRetryTimes int16
  }
  // CallbackWithCtx is the business callback executed by WithGlobalTx.
  // It receives the context prepared by WithGlobalTx; a non-nil return value
  // is treated as a first-phase failure.
  type CallbackWithCtx func(ctx context.Context) error
  35. // WithGlobalTx begin a global transaction and make it step into committed or rollbacked status.
  36. func WithGlobalTx(ctx context.Context, gc *GtxConfig, business CallbackWithCtx) (re error) {
  37. if gc == nil {
  38. return fmt.Errorf("global transaction config info is required.")
  39. }
  40. if gc.Name == "" {
  41. return fmt.Errorf("global transaction name is required.")
  42. }
  43. // open global transaction for the first time
  44. if !IsSeataContext(ctx) {
  45. ctx = InitSeataContext(ctx)
  46. }
  47. if IsGlobalTx(ctx) {
  48. ctx = transferTx(ctx)
  49. }
  50. if re = begin(ctx, gc); re != nil {
  51. return
  52. }
  53. defer func() {
  54. var err error
  55. deferErr := recover()
  56. // no need to do second phase if propagation is some type e.g. NotSupported.
  57. if IsGlobalTx(ctx) {
  58. // business maybe to throw panic, so need to recover it here.
  59. if err = commitOrRollback(ctx, deferErr == nil && re == nil); err != nil {
  60. log.Errorf("global transaction xid %s, name %s second phase error", GetXID(ctx), GetTxName(ctx), err)
  61. }
  62. }
  63. if re != nil || err != nil {
  64. re = fmt.Errorf("first phase error: %v, second phase error: %v", re, err)
  65. }
  66. }()
  67. re = business(ctx)
  68. return
  69. }
  70. // begin a global transaction, it will obtain a xid and put into ctx from tc by tcp rpc.
  71. // it will to call two function beginNewGtx and useExistGtx
  72. // they do these operations on the transaction:
  73. // beginNewGtx:
  74. // start a new transaction, the previous transaction may be suspended or not exist
  75. // useExistGtx:
  76. // use the previous transaction, but the transaction obtained by propagation,
  77. // but will modify the current transaction role to participant.
  78. // in local transaction mode, the transaction role may be overwritten due to sharing a ctx,
  79. // so it is also necessary to provide suspend and resume operations like xid,
  80. // but here I do not use this method, instead, simulate the call in rpc mode,
  81. // construct a new context object and set the xid.
  82. // the advantage of this is that the suspend and resume operations of xid need not to be considered.
  83. func begin(ctx context.Context, gc *GtxConfig) error {
  84. // record time
  85. ti := TimeInfo{createTime: time.Duration(time.Now().Unix()), timeout: gc.Timeout}
  86. SetTimeInfo(ctx, ti)
  87. switch pg := gc.Propagation; pg {
  88. case NotSupported:
  89. // If transaction is existing, suspend it
  90. // return then to execute without transaction
  91. if IsGlobalTx(ctx) {
  92. // because each global transaction operation will use a new context,
  93. // there is no need to implement a suspend operation, just unbind the xid here.
  94. // the same is true for the following case that needs to be suspended.
  95. UnbindXid(ctx)
  96. }
  97. return nil
  98. case Supports:
  99. // if transaction is not existing, return then to execute without transaction
  100. // else beginNewGtx transaction then return
  101. if IsGlobalTx(ctx) {
  102. useExistGtx(ctx, gc)
  103. }
  104. return nil
  105. case RequiresNew:
  106. // if transaction is existing, suspend it, and then Begin beginNewGtx transaction.
  107. if IsGlobalTx(ctx) {
  108. UnbindXid(ctx)
  109. }
  110. case Required:
  111. // default case, If current transaction is existing, execute with current transaction,
  112. // else continue and execute with beginNewGtx transaction.
  113. if IsGlobalTx(ctx) {
  114. useExistGtx(ctx, gc)
  115. return nil
  116. }
  117. case Never:
  118. // if transaction is existing, throw exception.
  119. if IsGlobalTx(ctx) {
  120. return fmt.Errorf("existing transaction found for transaction marked with pg 'never', xid = %s", GetXID(ctx))
  121. }
  122. // return then to execute without transaction.
  123. return nil
  124. case Mandatory:
  125. // if transaction is not existing, throw exception.
  126. // else execute with current transaction.
  127. if IsGlobalTx(ctx) {
  128. useExistGtx(ctx, gc)
  129. return nil
  130. }
  131. return fmt.Errorf("no existing transaction found for transaction marked with pg 'mandatory'")
  132. default:
  133. return fmt.Errorf("not supported propagation:%d", pg)
  134. }
  135. // the follow will to construct a new transaction with xid.
  136. return beginNewGtx(ctx, gc)
  137. }
  138. // commitOrRollback commit or rollback the global transaction
  139. func commitOrRollback(ctx context.Context, isSuccess bool) (re error) {
  140. switch *GetTxRole(ctx) {
  141. case Launcher:
  142. if tx := GetTx(ctx); isSuccess {
  143. if re = GetGlobalTransactionManager().Commit(ctx, tx); re != nil {
  144. log.Errorf("transactionTemplate: commit transaction failed, error %v", re)
  145. }
  146. } else {
  147. if re = GetGlobalTransactionManager().Rollback(ctx, tx); re != nil {
  148. log.Errorf("transactionTemplate: Rollback transaction failed, error %v", re)
  149. }
  150. }
  151. case Participant:
  152. // participant has no responsibility of rollback
  153. log.Infof("ignore second phase(commit or rollback): just involved in global transaction [%s/%s]", GetTxName(ctx), GetXID(ctx))
  154. case UnKnow:
  155. re = errors.New("global transaction role is UnKnow.")
  156. }
  157. return
  158. }
  159. // beginNewGtx to construct a default global transaction
  160. func beginNewGtx(ctx context.Context, gc *GtxConfig) error {
  161. timeout := gc.Timeout
  162. if timeout == 0 {
  163. timeout = config.DefaultGlobalTransactionTimeout
  164. }
  165. SetTxRole(ctx, Launcher)
  166. SetTxName(ctx, gc.Name)
  167. SetTxStatus(ctx, message.GlobalStatusBegin)
  168. if err := GetGlobalTransactionManager().Begin(ctx, timeout); err != nil {
  169. return fmt.Errorf("transactionTemplate: Begin transaction failed, error %v", err)
  170. }
  171. return nil
  172. }
  173. // useExistGtx if xid is not empty, then construct a global transaction
  174. func useExistGtx(ctx context.Context, gc *GtxConfig) {
  175. if xid := GetXID(ctx); xid != "" {
  176. SetTx(ctx, &GlobalTransaction{
  177. Xid: GetXID(ctx),
  178. TxStatus: message.GlobalStatusBegin,
  179. TxRole: Participant,
  180. TxName: gc.Name,
  181. })
  182. }
  183. }
  184. // transferTx transfer the gtx into a new ctx from old ctx.
  185. // use it to implement suspend and resume instead
  186. func transferTx(ctx context.Context) context.Context {
  187. newCtx := InitSeataContext(ctx)
  188. SetXID(newCtx, GetXID(ctx))
  189. return newCtx
  190. }