You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

transaction_coordinator.go 31 kB

5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
4 years ago
5 years ago

  1. package server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "sync"
  8. "time"
  9. "github.com/gogo/protobuf/types"
  10. "go.uber.org/atomic"
  11. "google.golang.org/grpc/codes"
  12. "google.golang.org/grpc/metadata"
  13. "google.golang.org/grpc/status"
  14. "github.com/opentrx/seata-golang/v2/pkg/apis"
  15. common2 "github.com/opentrx/seata-golang/v2/pkg/common"
  16. "github.com/opentrx/seata-golang/v2/pkg/tc/config"
  17. "github.com/opentrx/seata-golang/v2/pkg/tc/event"
  18. "github.com/opentrx/seata-golang/v2/pkg/tc/holder"
  19. "github.com/opentrx/seata-golang/v2/pkg/tc/lock"
  20. "github.com/opentrx/seata-golang/v2/pkg/tc/model"
  21. "github.com/opentrx/seata-golang/v2/pkg/tc/storage/driver/factory"
  22. "github.com/opentrx/seata-golang/v2/pkg/util/common"
  23. "github.com/opentrx/seata-golang/v2/pkg/util/log"
  24. "github.com/opentrx/seata-golang/v2/pkg/util/runtime"
  25. time2 "github.com/opentrx/seata-golang/v2/pkg/util/time"
  26. "github.com/opentrx/seata-golang/v2/pkg/util/uuid"
  27. )
// AlwaysRetryBoundary is the constant 0. Its use is outside this view; judging
// by the name it presumably marks a max-retry timeout of 0 as "retry forever"
// (NOTE(review): confirm against the retry handlers).
const AlwaysRetryBoundary = 0
// TransactionCoordinator holds the state of the transaction coordinator (TC)
// service: configuration copied from config.Configuration, the session store,
// the lock managers, and the maps used to exchange messages with branches.
type TransactionCoordinator struct {
	sync.Mutex

	// Retry limits copied from conf.Server. How they are enforced is not
	// visible in this part of the file (presumably by the retry handlers —
	// TODO confirm units and semantics).
	maxCommitRetryTimeout            int64
	maxRollbackRetryTimeout          int64
	rollbackRetryTimeoutUnlockEnable bool

	// Delay between two runs of the corresponding background loop
	// (processAsyncCommitting, processRetryCommitting,
	// processRetryRollingBack, processTimeoutCheck).
	asyncCommittingRetryPeriod time.Duration
	committingRetryPeriod      time.Duration
	rollingBackRetryPeriod     time.Duration
	timeoutRetryPeriod         time.Duration

	// How long branchCommit/branchRollback wait for a branch's reply.
	streamMessageTimeout time.Duration

	// holder stores and looks up global and branch sessions.
	holder *holder.SessionHolder
	// resourceDataLocker acquires and releases the locks of branch sessions.
	resourceDataLocker *lock.LockManager
	// locker guards a global session while its state is being changed.
	locker GlobalSessionLocker

	// idGenerator produces the IDs of outgoing branch messages.
	idGenerator *atomic.Uint64
	// futures maps a branch message ID to the *common2.MessageFuture waiting
	// for its reply.
	futures *sync.Map
	// activeApplications maps an addressing string to the int count of open
	// BranchCommunicate streams for it.
	activeApplications *sync.Map
	// callBackMessages maps an addressing string to the *CallbackMessageQueue
	// of messages waiting to be sent to it.
	callBackMessages *sync.Map
}
  47. func NewTransactionCoordinator(conf *config.Configuration) *TransactionCoordinator {
  48. driver, err := factory.Create(conf.Storage.Type(), conf.Storage.Parameters())
  49. if err != nil {
  50. log.Fatalf("failed to construct %s driver: %v", conf.Storage.Type(), err)
  51. os.Exit(1)
  52. }
  53. tc := &TransactionCoordinator{
  54. maxCommitRetryTimeout: conf.Server.MaxCommitRetryTimeout,
  55. maxRollbackRetryTimeout: conf.Server.MaxRollbackRetryTimeout,
  56. rollbackRetryTimeoutUnlockEnable: conf.Server.RollbackRetryTimeoutUnlockEnable,
  57. asyncCommittingRetryPeriod: conf.Server.AsyncCommittingRetryPeriod,
  58. committingRetryPeriod: conf.Server.CommittingRetryPeriod,
  59. rollingBackRetryPeriod: conf.Server.RollingBackRetryPeriod,
  60. timeoutRetryPeriod: conf.Server.TimeoutRetryPeriod,
  61. streamMessageTimeout: conf.Server.StreamMessageTimeout,
  62. holder: holder.NewSessionHolder(driver),
  63. resourceDataLocker: lock.NewLockManager(driver),
  64. locker: new(UnimplementedGlobalSessionLocker),
  65. idGenerator: &atomic.Uint64{},
  66. futures: &sync.Map{},
  67. activeApplications: &sync.Map{},
  68. callBackMessages: &sync.Map{},
  69. }
  70. go tc.processTimeoutCheck()
  71. go tc.processAsyncCommitting()
  72. go tc.processRetryCommitting()
  73. go tc.processRetryRollingBack()
  74. return tc
  75. }
  76. func (tc *TransactionCoordinator) Begin(ctx context.Context, request *apis.GlobalBeginRequest) (*apis.GlobalBeginResponse, error) {
  77. transactionID := uuid.NextID()
  78. xid := common.GenerateXID(request.Addressing, transactionID)
  79. gt := model.GlobalTransaction{
  80. GlobalSession: &apis.GlobalSession{
  81. Addressing: request.Addressing,
  82. XID: xid,
  83. TransactionID: transactionID,
  84. TransactionName: request.TransactionName,
  85. Timeout: request.Timeout,
  86. },
  87. }
  88. gt.Begin()
  89. err := tc.holder.AddGlobalSession(gt.GlobalSession)
  90. if err != nil {
  91. return &apis.GlobalBeginResponse{
  92. ResultCode: apis.ResultCodeFailed,
  93. ExceptionCode: apis.BeginFailed,
  94. Message: err.Error(),
  95. }, nil
  96. }
  97. runtime.GoWithRecover(func() {
  98. evt := event.NewGlobalTransactionEvent(gt.TransactionID, event.RoleTC, gt.TransactionName, gt.BeginTime, 0, gt.Status)
  99. event.EventBus.GlobalTransactionEventChannel <- evt
  100. }, nil)
  101. log.Infof("successfully begin global transaction xid = {}", gt.XID)
  102. return &apis.GlobalBeginResponse{
  103. ResultCode: apis.ResultCodeSuccess,
  104. XID: xid,
  105. }, nil
  106. }
  107. func (tc *TransactionCoordinator) GetStatus(ctx context.Context, request *apis.GlobalStatusRequest) (*apis.GlobalStatusResponse, error) {
  108. gs := tc.holder.FindGlobalSession(request.XID)
  109. if gs != nil {
  110. return &apis.GlobalStatusResponse{
  111. ResultCode: apis.ResultCodeSuccess,
  112. GlobalStatus: gs.Status,
  113. }, nil
  114. }
  115. return &apis.GlobalStatusResponse{
  116. ResultCode: apis.ResultCodeSuccess,
  117. GlobalStatus: apis.Finished,
  118. }, nil
  119. }
  120. func (tc *TransactionCoordinator) GlobalReport(ctx context.Context, request *apis.GlobalReportRequest) (*apis.GlobalReportResponse, error) {
  121. return nil, status.Errorf(codes.Unimplemented, "method GlobalReport not implemented")
  122. }
  123. func (tc *TransactionCoordinator) Commit(ctx context.Context, request *apis.GlobalCommitRequest) (*apis.GlobalCommitResponse, error) {
  124. gt := tc.holder.FindGlobalTransaction(request.XID)
  125. if gt == nil {
  126. return &apis.GlobalCommitResponse{
  127. ResultCode: apis.ResultCodeSuccess,
  128. GlobalStatus: apis.Finished,
  129. }, nil
  130. }
  131. shouldCommit, err := func(gt *model.GlobalTransaction) (bool, error) {
  132. result, err := tc.locker.TryLock(gt.GlobalSession, time.Duration(gt.Timeout)*time.Millisecond)
  133. if err != nil {
  134. return false, err
  135. }
  136. if result {
  137. defer tc.locker.Unlock(gt.GlobalSession)
  138. if gt.Active {
  139. // Active need persistence
  140. // Highlight: Firstly, close the session, then no more branch can be registered.
  141. err = tc.holder.InactiveGlobalSession(gt.GlobalSession)
  142. if err != nil {
  143. return false, err
  144. }
  145. }
  146. tc.resourceDataLocker.ReleaseGlobalSessionLock(gt)
  147. if gt.Status == apis.Begin {
  148. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.Committing)
  149. if err != nil {
  150. return false, err
  151. }
  152. return true, nil
  153. }
  154. return false, nil
  155. }
  156. return false, fmt.Errorf("failed to lock global transaction xid = %s", request.XID)
  157. }(gt)
  158. if err != nil {
  159. return &apis.GlobalCommitResponse{
  160. ResultCode: apis.ResultCodeFailed,
  161. ExceptionCode: apis.FailedLockGlobalTransaction,
  162. Message: err.Error(),
  163. GlobalStatus: gt.Status,
  164. }, nil
  165. }
  166. if !shouldCommit {
  167. if gt.Status == apis.AsyncCommitting {
  168. return &apis.GlobalCommitResponse{
  169. ResultCode: apis.ResultCodeSuccess,
  170. GlobalStatus: apis.Committed,
  171. }, nil
  172. }
  173. return &apis.GlobalCommitResponse{
  174. ResultCode: apis.ResultCodeSuccess,
  175. GlobalStatus: gt.Status,
  176. }, nil
  177. }
  178. if gt.CanBeCommittedAsync() {
  179. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.AsyncCommitting)
  180. if err != nil {
  181. return nil, err
  182. }
  183. return &apis.GlobalCommitResponse{
  184. ResultCode: apis.ResultCodeSuccess,
  185. GlobalStatus: apis.Committed,
  186. }, nil
  187. }
  188. _, err = tc.doGlobalCommit(gt, false)
  189. if err != nil {
  190. return &apis.GlobalCommitResponse{
  191. ResultCode: apis.ResultCodeFailed,
  192. ExceptionCode: apis.UnknownErr,
  193. Message: err.Error(),
  194. GlobalStatus: gt.Status,
  195. }, nil
  196. }
  197. return &apis.GlobalCommitResponse{
  198. ResultCode: apis.ResultCodeSuccess,
  199. GlobalStatus: apis.Committed,
  200. }, nil
  201. }
  202. func (tc *TransactionCoordinator) doGlobalCommit(gt *model.GlobalTransaction, retrying bool) (bool, error) {
  203. var err error
  204. runtime.GoWithRecover(func() {
  205. evt := event.NewGlobalTransactionEvent(gt.TransactionID, event.RoleTC, gt.TransactionName, gt.BeginTime, 0, gt.Status)
  206. event.EventBus.GlobalTransactionEventChannel <- evt
  207. }, nil)
  208. if gt.IsSaga() {
  209. return false, status.Errorf(codes.Unimplemented, "method Commit not supported saga mode")
  210. }
  211. for bs := range gt.BranchSessions {
  212. if bs.Status == apis.PhaseOneFailed {
  213. tc.resourceDataLocker.ReleaseLock(bs)
  214. delete(gt.BranchSessions, bs)
  215. err = tc.holder.RemoveBranchSession(gt.GlobalSession, bs)
  216. if err != nil {
  217. return false, err
  218. }
  219. continue
  220. }
  221. branchStatus, err1 := tc.branchCommit(bs)
  222. if err1 != nil {
  223. log.Errorf("exception committing branch xid=%d branchID=%d, err: %v", bs.GetXID(), bs.BranchID, err1)
  224. if !retrying {
  225. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.CommitRetrying)
  226. if err != nil {
  227. return false, err
  228. }
  229. }
  230. return false, err1
  231. }
  232. switch branchStatus {
  233. case apis.PhaseTwoCommitted:
  234. tc.resourceDataLocker.ReleaseLock(bs)
  235. delete(gt.BranchSessions, bs)
  236. err = tc.holder.RemoveBranchSession(gt.GlobalSession, bs)
  237. if err != nil {
  238. return false, err
  239. }
  240. continue
  241. case apis.PhaseTwoCommitFailedCanNotRetry:
  242. {
  243. if gt.CanBeCommittedAsync() {
  244. log.Errorf("by [%s], failed to commit branch %v", bs.Status.String(), bs)
  245. continue
  246. } else {
  247. // change status first, if need retention global session data,
  248. // might not remove global session, then, the status is very important.
  249. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.CommitFailed)
  250. if err != nil {
  251. return false, err
  252. }
  253. tc.resourceDataLocker.ReleaseGlobalSessionLock(gt)
  254. err = tc.holder.RemoveGlobalTransaction(gt)
  255. if err != nil {
  256. return false, err
  257. }
  258. log.Errorf("finally, failed to commit global[%d] since branch[%d] commit failed", gt.XID, bs.BranchID)
  259. return false, nil
  260. }
  261. }
  262. default:
  263. {
  264. if !retrying {
  265. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.CommitRetrying)
  266. if err != nil {
  267. return false, err
  268. }
  269. return false, nil
  270. }
  271. if gt.CanBeCommittedAsync() {
  272. log.Errorf("by [%s], failed to commit branch %v", bs.Status.String(), bs)
  273. continue
  274. } else {
  275. log.Errorf("failed to commit global[%d] since branch[%d] commit failed, will retry later.", gt.XID, bs.BranchID)
  276. return false, nil
  277. }
  278. }
  279. }
  280. }
  281. gs := tc.holder.FindGlobalTransaction(gt.XID)
  282. if gs != nil && gs.HasBranch() {
  283. log.Infof("global[%d] committing is NOT done.", gt.XID)
  284. return false, nil
  285. }
  286. // change status first, if need retention global session data,
  287. // might not remove global session, then, the status is very important.
  288. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.Committed)
  289. if err != nil {
  290. return false, err
  291. }
  292. tc.resourceDataLocker.ReleaseGlobalSessionLock(gt)
  293. err = tc.holder.RemoveGlobalTransaction(gt)
  294. if err != nil {
  295. return false, err
  296. }
  297. runtime.GoWithRecover(func() {
  298. evt := event.NewGlobalTransactionEvent(gt.TransactionID, event.RoleTC, gt.TransactionName, gt.BeginTime,
  299. int64(time2.CurrentTimeMillis()), gt.Status)
  300. event.EventBus.GlobalTransactionEventChannel <- evt
  301. }, nil)
  302. log.Infof("global[%d] committing is successfully done.", gt.XID)
  303. return true, err
  304. }
  305. func (tc *TransactionCoordinator) branchCommit(bs *apis.BranchSession) (apis.BranchSession_BranchStatus, error) {
  306. request := &apis.BranchCommitRequest{
  307. XID: bs.XID,
  308. BranchID: bs.BranchID,
  309. ResourceID: bs.ResourceID,
  310. LockKey: bs.LockKey,
  311. BranchType: bs.Type,
  312. ApplicationData: bs.ApplicationData,
  313. }
  314. content, err := types.MarshalAny(request)
  315. if err != nil {
  316. return bs.Status, err
  317. }
  318. message := &apis.BranchMessage{
  319. ID: int64(tc.idGenerator.Inc()),
  320. BranchMessageType: apis.TypeBranchCommit,
  321. Message: content,
  322. }
  323. queue, _ := tc.callBackMessages.LoadOrStore(bs.Addressing, NewCallbackMessageQueue())
  324. q := queue.(*CallbackMessageQueue)
  325. q.Enqueue(message)
  326. resp := common2.NewMessageFuture(message)
  327. tc.futures.Store(message.ID, resp)
  328. timer := time.NewTimer(tc.streamMessageTimeout)
  329. select {
  330. case <-timer.C:
  331. tc.futures.Delete(resp.ID)
  332. return bs.Status, fmt.Errorf("wait branch commit response timeout")
  333. case <-resp.Done:
  334. timer.Stop()
  335. }
  336. response, ok := resp.Response.(*apis.BranchCommitResponse)
  337. if !ok {
  338. log.Infof("rollback response: %v", resp.Response)
  339. return bs.Status, fmt.Errorf("response type not right")
  340. }
  341. if response.ResultCode == apis.ResultCodeSuccess {
  342. return response.BranchStatus, nil
  343. }
  344. return bs.Status, fmt.Errorf(response.Message)
  345. }
  346. func (tc *TransactionCoordinator) Rollback(ctx context.Context, request *apis.GlobalRollbackRequest) (*apis.GlobalRollbackResponse, error) {
  347. gt := tc.holder.FindGlobalTransaction(request.XID)
  348. if gt == nil {
  349. return &apis.GlobalRollbackResponse{
  350. ResultCode: apis.ResultCodeSuccess,
  351. GlobalStatus: apis.Finished,
  352. }, nil
  353. }
  354. shouldRollBack, err := func(gt *model.GlobalTransaction) (bool, error) {
  355. result, err := tc.locker.TryLock(gt.GlobalSession, time.Duration(gt.Timeout)*time.Millisecond)
  356. if err != nil {
  357. return false, err
  358. }
  359. if result {
  360. defer tc.locker.Unlock(gt.GlobalSession)
  361. if gt.Active {
  362. // Active need persistence
  363. // Highlight: Firstly, close the session, then no more branch can be registered.
  364. err = tc.holder.InactiveGlobalSession(gt.GlobalSession)
  365. if err != nil {
  366. return false, err
  367. }
  368. }
  369. if gt.Status == apis.Begin {
  370. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.RollingBack)
  371. if err != nil {
  372. return false, err
  373. }
  374. return true, nil
  375. }
  376. return false, nil
  377. }
  378. return false, fmt.Errorf("failed to lock global transaction xid = %s", request.XID)
  379. }(gt)
  380. if err != nil {
  381. return &apis.GlobalRollbackResponse{
  382. ResultCode: apis.ResultCodeFailed,
  383. ExceptionCode: apis.FailedLockGlobalTransaction,
  384. Message: err.Error(),
  385. GlobalStatus: gt.Status,
  386. }, nil
  387. }
  388. if !shouldRollBack {
  389. return &apis.GlobalRollbackResponse{
  390. ResultCode: apis.ResultCodeSuccess,
  391. GlobalStatus: gt.Status,
  392. }, nil
  393. }
  394. _, err = tc.doGlobalRollback(gt, false)
  395. if err != nil {
  396. return nil, err
  397. }
  398. return &apis.GlobalRollbackResponse{
  399. ResultCode: apis.ResultCodeSuccess,
  400. GlobalStatus: gt.Status,
  401. }, nil
  402. }
  403. func (tc *TransactionCoordinator) doGlobalRollback(gt *model.GlobalTransaction, retrying bool) (bool, error) {
  404. var err error
  405. runtime.GoWithRecover(func() {
  406. evt := event.NewGlobalTransactionEvent(gt.TransactionID, event.RoleTC, gt.TransactionName, gt.BeginTime, 0, gt.Status)
  407. event.EventBus.GlobalTransactionEventChannel <- evt
  408. }, nil)
  409. if gt.IsSaga() {
  410. return false, status.Errorf(codes.Unimplemented, "method Commit not supported saga mode")
  411. }
  412. for bs := range gt.BranchSessions {
  413. if bs.Status == apis.PhaseOneFailed {
  414. tc.resourceDataLocker.ReleaseLock(bs)
  415. delete(gt.BranchSessions, bs)
  416. err = tc.holder.RemoveBranchSession(gt.GlobalSession, bs)
  417. if err != nil {
  418. return false, err
  419. }
  420. continue
  421. }
  422. branchStatus, err1 := tc.branchRollback(bs)
  423. if err1 != nil {
  424. log.Errorf("exception rolling back branch xid=%d branchID=%d, err: %v", gt.XID, bs.BranchID, err1)
  425. if !retrying {
  426. if gt.IsTimeoutGlobalStatus() {
  427. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.TimeoutRollbackRetrying)
  428. if err != nil {
  429. return false, err
  430. }
  431. } else {
  432. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.RollbackRetrying)
  433. if err != nil {
  434. return false, err
  435. }
  436. }
  437. }
  438. return false, err1
  439. }
  440. switch branchStatus {
  441. case apis.PhaseTwoRolledBack:
  442. tc.resourceDataLocker.ReleaseLock(bs)
  443. delete(gt.BranchSessions, bs)
  444. err = tc.holder.RemoveBranchSession(gt.GlobalSession, bs)
  445. if err != nil {
  446. return false, err
  447. }
  448. log.Infof("successfully rollback branch xid=%d branchID=%d", gt.XID, bs.BranchID)
  449. continue
  450. case apis.PhaseTwoRollbackFailedCanNotRetry:
  451. if gt.IsTimeoutGlobalStatus() {
  452. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.TimeoutRollbackFailed)
  453. if err != nil {
  454. return false, err
  455. }
  456. } else {
  457. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.RollbackFailed)
  458. if err != nil {
  459. return false, err
  460. }
  461. }
  462. tc.resourceDataLocker.ReleaseGlobalSessionLock(gt)
  463. err = tc.holder.RemoveGlobalTransaction(gt)
  464. if err != nil {
  465. return false, err
  466. }
  467. log.Infof("failed to rollback branch and stop retry xid=%d branchID=%d", gt.XID, bs.BranchID)
  468. return false, nil
  469. default:
  470. log.Infof("failed to rollback branch xid=%d branchID=%d", gt.XID, bs.BranchID)
  471. if !retrying {
  472. if gt.IsTimeoutGlobalStatus() {
  473. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.TimeoutRollbackRetrying)
  474. if err != nil {
  475. return false, err
  476. }
  477. } else {
  478. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.RollbackRetrying)
  479. if err != nil {
  480. return false, err
  481. }
  482. }
  483. }
  484. return false, nil
  485. }
  486. }
  487. // In db mode, there is a problem of inconsistent data in multiple copies, resulting in new branch
  488. // transaction registration when rolling back.
  489. // 1. New branch transaction and rollback branch transaction have no data association
  490. // 2. New branch transaction has data association with rollback branch transaction
  491. // The second query can solve the first problem, and if it is the second problem, it may cause a rollback
  492. // failure due to data changes.
  493. gs := tc.holder.FindGlobalTransaction(gt.XID)
  494. if gs != nil && gs.HasBranch() {
  495. log.Infof("Global[%d] rolling back is NOT done.", gt.XID)
  496. return false, nil
  497. }
  498. if gt.IsTimeoutGlobalStatus() {
  499. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.TimeoutRolledBack)
  500. if err != nil {
  501. return false, err
  502. }
  503. } else {
  504. err = tc.holder.UpdateGlobalSessionStatus(gt.GlobalSession, apis.RolledBack)
  505. if err != nil {
  506. return false, err
  507. }
  508. }
  509. tc.resourceDataLocker.ReleaseGlobalSessionLock(gt)
  510. err = tc.holder.RemoveGlobalTransaction(gt)
  511. if err != nil {
  512. return false, err
  513. }
  514. runtime.GoWithRecover(func() {
  515. evt := event.NewGlobalTransactionEvent(gt.TransactionID, event.RoleTC, gt.TransactionName, gt.BeginTime,
  516. int64(time2.CurrentTimeMillis()), gt.Status)
  517. event.EventBus.GlobalTransactionEventChannel <- evt
  518. }, nil)
  519. log.Infof("successfully rollback global, xid = %d", gt.XID)
  520. return true, err
  521. }
  522. func (tc *TransactionCoordinator) branchRollback(bs *apis.BranchSession) (apis.BranchSession_BranchStatus, error) {
  523. request := &apis.BranchRollbackRequest{
  524. XID: bs.XID,
  525. BranchID: bs.BranchID,
  526. ResourceID: bs.ResourceID,
  527. LockKey: bs.LockKey,
  528. BranchType: bs.Type,
  529. ApplicationData: bs.ApplicationData,
  530. }
  531. content, err := types.MarshalAny(request)
  532. if err != nil {
  533. return bs.Status, err
  534. }
  535. message := &apis.BranchMessage{
  536. ID: int64(tc.idGenerator.Inc()),
  537. BranchMessageType: apis.TypeBranchRollback,
  538. Message: content,
  539. }
  540. queue, _ := tc.callBackMessages.LoadOrStore(bs.Addressing, NewCallbackMessageQueue())
  541. q := queue.(*CallbackMessageQueue)
  542. q.Enqueue(message)
  543. resp := common2.NewMessageFuture(message)
  544. tc.futures.Store(message.ID, resp)
  545. timer := time.NewTimer(tc.streamMessageTimeout)
  546. select {
  547. case <-timer.C:
  548. tc.futures.Delete(resp.ID)
  549. timer.Stop()
  550. return bs.Status, fmt.Errorf("wait branch rollback response timeout")
  551. case <-resp.Done:
  552. timer.Stop()
  553. }
  554. response := resp.Response.(*apis.BranchRollbackResponse)
  555. if response.ResultCode == apis.ResultCodeSuccess {
  556. return response.BranchStatus, nil
  557. }
  558. return bs.Status, fmt.Errorf(response.Message)
  559. }
  560. func (tc *TransactionCoordinator) BranchCommunicate(stream apis.ResourceManagerService_BranchCommunicateServer) error {
  561. var addressing string
  562. done := make(chan bool)
  563. ctx := stream.Context()
  564. md, ok := metadata.FromIncomingContext(ctx)
  565. if ok {
  566. addressing = md.Get("addressing")[0]
  567. c, ok := tc.activeApplications.Load(addressing)
  568. if ok {
  569. count := c.(int)
  570. tc.activeApplications.Store(addressing, count+1)
  571. } else {
  572. tc.activeApplications.Store(addressing, 1)
  573. }
  574. defer func() {
  575. c, _ := tc.activeApplications.Load(addressing)
  576. count := c.(int)
  577. tc.activeApplications.Store(addressing, count-1)
  578. }()
  579. }
  580. queue, _ := tc.callBackMessages.LoadOrStore(addressing, NewCallbackMessageQueue())
  581. q := queue.(*CallbackMessageQueue)
  582. runtime.GoWithRecover(func() {
  583. for {
  584. select {
  585. case _, ok := <-done:
  586. if !ok {
  587. return
  588. }
  589. default:
  590. msg := q.Dequeue()
  591. if msg == nil {
  592. break
  593. }
  594. err := stream.Send(msg)
  595. if err != nil {
  596. return
  597. }
  598. }
  599. }
  600. }, nil)
  601. for {
  602. select {
  603. case <-ctx.Done():
  604. close(done)
  605. return ctx.Err()
  606. default:
  607. branchMessage, err := stream.Recv()
  608. if err == io.EOF {
  609. close(done)
  610. return nil
  611. }
  612. if err != nil {
  613. close(done)
  614. return err
  615. }
  616. switch branchMessage.GetBranchMessageType() {
  617. case apis.TypeBranchCommitResult:
  618. response := &apis.BranchCommitResponse{}
  619. data := branchMessage.GetMessage().GetValue()
  620. err := response.Unmarshal(data)
  621. if err != nil {
  622. log.Error(err)
  623. continue
  624. }
  625. resp, loaded := tc.futures.Load(branchMessage.ID)
  626. if loaded {
  627. future := resp.(*common2.MessageFuture)
  628. future.Response = response
  629. future.Done <- true
  630. tc.futures.Delete(branchMessage.ID)
  631. }
  632. case apis.TypeBranchRollBackResult:
  633. response := &apis.BranchRollbackResponse{}
  634. data := branchMessage.GetMessage().GetValue()
  635. err := response.Unmarshal(data)
  636. if err != nil {
  637. log.Error(err)
  638. continue
  639. }
  640. resp, loaded := tc.futures.Load(branchMessage.ID)
  641. if loaded {
  642. future := resp.(*common2.MessageFuture)
  643. future.Response = response
  644. future.Done <- true
  645. tc.futures.Delete(branchMessage.ID)
  646. }
  647. }
  648. }
  649. }
  650. }
  651. func (tc *TransactionCoordinator) BranchRegister(ctx context.Context, request *apis.BranchRegisterRequest) (*apis.BranchRegisterResponse, error) {
  652. gt := tc.holder.FindGlobalTransaction(request.XID)
  653. if gt == nil {
  654. log.Errorf("could not found global transaction xid = %s", request.XID)
  655. return &apis.BranchRegisterResponse{
  656. ResultCode: apis.ResultCodeFailed,
  657. ExceptionCode: apis.GlobalTransactionNotExist,
  658. Message: fmt.Sprintf("could not found global transaction xid = %s", request.XID),
  659. }, nil
  660. }
  661. result, err := tc.locker.TryLock(gt.GlobalSession, time.Duration(gt.Timeout)*time.Millisecond)
  662. if err != nil {
  663. return &apis.BranchRegisterResponse{
  664. ResultCode: apis.ResultCodeFailed,
  665. ExceptionCode: apis.FailedLockGlobalTransaction,
  666. Message: fmt.Sprintf("could not found global transaction xid = %s", request.XID),
  667. }, nil
  668. }
  669. if result {
  670. defer tc.locker.Unlock(gt.GlobalSession)
  671. if !gt.Active {
  672. return &apis.BranchRegisterResponse{
  673. ResultCode: apis.ResultCodeFailed,
  674. ExceptionCode: apis.GlobalTransactionNotActive,
  675. Message: fmt.Sprintf("could not register branch into global session xid = %s status = %d", gt.XID, gt.Status),
  676. }, nil
  677. }
  678. if gt.Status != apis.Begin {
  679. return &apis.BranchRegisterResponse{
  680. ResultCode: apis.ResultCodeFailed,
  681. ExceptionCode: apis.GlobalTransactionStatusInvalid,
  682. Message: fmt.Sprintf("could not register branch into global session xid = %s status = %d while expecting %d",
  683. gt.XID, gt.Status, apis.Begin),
  684. }, nil
  685. }
  686. bs := &apis.BranchSession{
  687. Addressing: request.Addressing,
  688. XID: request.XID,
  689. BranchID: uuid.NextID(),
  690. TransactionID: gt.TransactionID,
  691. ResourceID: request.ResourceID,
  692. LockKey: request.LockKey,
  693. Type: request.BranchType,
  694. Status: apis.Registered,
  695. ApplicationData: request.ApplicationData,
  696. }
  697. if bs.Type == apis.AT {
  698. result := tc.resourceDataLocker.AcquireLock(bs)
  699. if !result {
  700. return &apis.BranchRegisterResponse{
  701. ResultCode: apis.ResultCodeFailed,
  702. ExceptionCode: apis.LockKeyConflict,
  703. Message: fmt.Sprintf("branch lock acquire failed xid = %s resourceId = %s, lockKey = %s",
  704. request.XID, request.ResourceID, request.LockKey),
  705. }, nil
  706. }
  707. }
  708. err := tc.holder.AddBranchSession(gt.GlobalSession, bs)
  709. if err != nil {
  710. log.Error(err)
  711. return &apis.BranchRegisterResponse{
  712. ResultCode: apis.ResultCodeFailed,
  713. ExceptionCode: apis.BranchRegisterFailed,
  714. Message: fmt.Sprintf("branch register failed, xid = %s, branchID = %d, err: %s", gt.XID, bs.BranchID, err.Error()),
  715. }, nil
  716. }
  717. return &apis.BranchRegisterResponse{
  718. ResultCode: apis.ResultCodeSuccess,
  719. BranchID: bs.BranchID,
  720. }, nil
  721. }
  722. return &apis.BranchRegisterResponse{
  723. ResultCode: apis.ResultCodeFailed,
  724. ExceptionCode: apis.FailedLockGlobalTransaction,
  725. Message: fmt.Sprintf("failed to lock global transaction xid = %s", request.XID),
  726. }, nil
  727. }
  728. func (tc *TransactionCoordinator) BranchReport(ctx context.Context, request *apis.BranchReportRequest) (*apis.BranchReportResponse, error) {
  729. gt := tc.holder.FindGlobalTransaction(request.XID)
  730. if gt == nil {
  731. log.Errorf("could not found global transaction xid = %s", request.XID)
  732. return &apis.BranchReportResponse{
  733. ResultCode: apis.ResultCodeFailed,
  734. ExceptionCode: apis.GlobalTransactionNotExist,
  735. Message: fmt.Sprintf("could not found global transaction xid = %s", request.XID),
  736. }, nil
  737. }
  738. bs := gt.GetBranch(request.BranchID)
  739. if bs == nil {
  740. return &apis.BranchReportResponse{
  741. ResultCode: apis.ResultCodeFailed,
  742. ExceptionCode: apis.BranchTransactionNotExist,
  743. Message: fmt.Sprintf("could not found branch session xid = %s branchID = %d", gt.XID, request.BranchID),
  744. }, nil
  745. }
  746. err := tc.holder.UpdateBranchSessionStatus(bs, request.BranchStatus)
  747. if err != nil {
  748. return &apis.BranchReportResponse{
  749. ResultCode: apis.ResultCodeFailed,
  750. ExceptionCode: apis.BranchReportFailed,
  751. Message: fmt.Sprintf("branch report failed, xid = %s, branchID = %d, err: %s", gt.XID, bs.BranchID, err.Error()),
  752. }, nil
  753. }
  754. return &apis.BranchReportResponse{
  755. ResultCode: apis.ResultCodeSuccess,
  756. }, nil
  757. }
  758. func (tc *TransactionCoordinator) LockQuery(ctx context.Context, request *apis.GlobalLockQueryRequest) (*apis.GlobalLockQueryResponse, error) {
  759. result := tc.resourceDataLocker.IsLockable(request.XID, request.ResourceID, request.LockKey)
  760. return &apis.GlobalLockQueryResponse{
  761. ResultCode: apis.ResultCodeSuccess,
  762. Lockable: result,
  763. }, nil
  764. }
  765. func (tc *TransactionCoordinator) processTimeoutCheck() {
  766. for {
  767. timer := time.NewTimer(tc.timeoutRetryPeriod)
  768. <-timer.C
  769. tc.timeoutCheck()
  770. timer.Stop()
  771. }
  772. }
  773. func (tc *TransactionCoordinator) processRetryRollingBack() {
  774. for {
  775. timer := time.NewTimer(tc.rollingBackRetryPeriod)
  776. <-timer.C
  777. tc.handleRetryRollingBack()
  778. timer.Stop()
  779. }
  780. }
  781. func (tc *TransactionCoordinator) processRetryCommitting() {
  782. for {
  783. timer := time.NewTimer(tc.committingRetryPeriod)
  784. <-timer.C
  785. tc.handleRetryCommitting()
  786. timer.Stop()
  787. }
  788. }
  789. func (tc *TransactionCoordinator) processAsyncCommitting() {
  790. for {
  791. timer := time.NewTimer(tc.asyncCommittingRetryPeriod)
  792. <-timer.C
  793. tc.handleAsyncCommitting()
  794. timer.Stop()
  795. }
  796. }
  797. func (tc *TransactionCoordinator) timeoutCheck() {
  798. sessions := tc.holder.FindGlobalSessions([]apis.GlobalSession_GlobalStatus{apis.Begin})
  799. if len(sessions) == 0 {
  800. return
  801. }
  802. for _, globalSession := range sessions {
  803. if isGlobalSessionTimeout(globalSession) {
  804. result, err := tc.locker.TryLock(globalSession, time.Duration(globalSession.Timeout)*time.Millisecond)
  805. if err == nil && result {
  806. if globalSession.Active {
  807. // Active need persistence
  808. // Highlight: Firstly, close the session, then no more branch can be registered.
  809. err = tc.holder.InactiveGlobalSession(globalSession)
  810. if err != nil {
  811. return
  812. }
  813. }
  814. err = tc.holder.UpdateGlobalSessionStatus(globalSession, apis.TimeoutRollingBack)
  815. if err != nil {
  816. return
  817. }
  818. tc.locker.Unlock(globalSession)
  819. evt := event.NewGlobalTransactionEvent(globalSession.TransactionID, event.RoleTC, globalSession.TransactionName, globalSession.BeginTime, 0, globalSession.Status)
  820. event.EventBus.GlobalTransactionEventChannel <- evt
  821. }
  822. }
  823. }
  824. }
  825. func (tc *TransactionCoordinator) handleRetryRollingBack() {
  826. addressingIdentities := tc.getAddressingIdentities()
  827. if len(addressingIdentities) == 0 {
  828. return
  829. }
  830. rollbackTransactions := tc.holder.FindRetryRollbackGlobalTransactions(addressingIdentities)
  831. if len(rollbackTransactions) == 0 {
  832. return
  833. }
  834. now := time2.CurrentTimeMillis()
  835. for _, transaction := range rollbackTransactions {
  836. if transaction.Status == apis.RollingBack && !transaction.IsRollingBackDead() {
  837. continue
  838. }
  839. if isRetryTimeout(int64(now), tc.maxRollbackRetryTimeout, transaction.BeginTime) {
  840. if tc.rollbackRetryTimeoutUnlockEnable {
  841. tc.resourceDataLocker.ReleaseGlobalSessionLock(transaction)
  842. }
  843. err := tc.holder.RemoveGlobalTransaction(transaction)
  844. if err != nil {
  845. log.Error(err)
  846. }
  847. log.Errorf("GlobalSession rollback retry timeout and removed [%s]", transaction.XID)
  848. continue
  849. }
  850. _, err := tc.doGlobalRollback(transaction, true)
  851. if err != nil {
  852. log.Errorf("failed to retry rollback [%s]", transaction.XID)
  853. }
  854. }
  855. }
  856. func isRetryTimeout(now int64, timeout int64, beginTime int64) bool {
  857. if timeout >= AlwaysRetryBoundary && now-beginTime > timeout {
  858. return true
  859. }
  860. return false
  861. }
  862. func (tc *TransactionCoordinator) handleRetryCommitting() {
  863. addressingIdentities := tc.getAddressingIdentities()
  864. if len(addressingIdentities) == 0 {
  865. return
  866. }
  867. committingTransactions := tc.holder.FindRetryCommittingGlobalTransactions(addressingIdentities)
  868. if len(committingTransactions) == 0 {
  869. return
  870. }
  871. now := time2.CurrentTimeMillis()
  872. for _, transaction := range committingTransactions {
  873. if isRetryTimeout(int64(now), tc.maxCommitRetryTimeout, transaction.BeginTime) {
  874. err := tc.holder.RemoveGlobalTransaction(transaction)
  875. if err != nil {
  876. log.Error(err)
  877. }
  878. log.Errorf("GlobalSession commit retry timeout and removed [%s]", transaction.XID)
  879. continue
  880. }
  881. _, err := tc.doGlobalCommit(transaction, true)
  882. if err != nil {
  883. log.Errorf("failed to retry committing [%s]", transaction.XID)
  884. }
  885. }
  886. }
  887. func (tc *TransactionCoordinator) handleAsyncCommitting() {
  888. addressingIdentities := tc.getAddressingIdentities()
  889. if len(addressingIdentities) == 0 {
  890. return
  891. }
  892. asyncCommittingTransactions := tc.holder.FindAsyncCommittingGlobalTransactions(addressingIdentities)
  893. if len(asyncCommittingTransactions) == 0 {
  894. return
  895. }
  896. for _, transaction := range asyncCommittingTransactions {
  897. if transaction.Status != apis.AsyncCommitting {
  898. continue
  899. }
  900. _, err := tc.doGlobalCommit(transaction, true)
  901. if err != nil {
  902. log.Errorf("failed to async committing [%s]", transaction.XID)
  903. }
  904. }
  905. }
  906. func (tc *TransactionCoordinator) getAddressingIdentities() []string {
  907. var addressIdentities []string
  908. tc.activeApplications.Range(func(key, value interface{}) bool {
  909. count := value.(int)
  910. if count > 0 {
  911. addressing := key.(string)
  912. addressIdentities = append(addressIdentities, addressing)
  913. }
  914. return true
  915. })
  916. return addressIdentities
  917. }
  918. func isGlobalSessionTimeout(gt *apis.GlobalSession) bool {
  919. return (time2.CurrentTimeMillis() - uint64(gt.BeginTime)) > uint64(gt.Timeout)
  920. }