You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

saga_resource_manager.go 5.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package rm
  18. import (
  19. "bytes"
  20. "context"
  21. "fmt"
  22. "log"
  23. "sync"
  24. "github.com/seata/seata-go/pkg/protocol/branch"
  25. "github.com/seata/seata-go/pkg/protocol/message"
  26. "github.com/seata/seata-go/pkg/rm"
  27. "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
  28. "github.com/seata/seata-go/pkg/saga/statemachine/statelang"
  29. seataErrors "github.com/seata/seata-go/pkg/util/errors"
  30. )
  31. var (
  32. sagaResourceManagerInstance *SagaResourceManager
  33. once sync.Once
  34. )
  35. type SagaResourceManager struct {
  36. rmRemoting *rm.RMRemoting
  37. resourceCache sync.Map
  38. }
  39. func InitSaga() {
  40. rm.GetRmCacheInstance().RegisterResourceManager(GetSagaResourceManager())
  41. }
  42. func GetSagaResourceManager() *SagaResourceManager {
  43. once.Do(func() {
  44. sagaResourceManagerInstance = &SagaResourceManager{
  45. rmRemoting: rm.GetRMRemotingInstance(),
  46. resourceCache: sync.Map{},
  47. }
  48. })
  49. return sagaResourceManagerInstance
  50. }
  51. func (s *SagaResourceManager) RegisterResource(resource rm.Resource) error {
  52. if _, ok := resource.(*SagaResource); !ok {
  53. return fmt.Errorf("register saga resource error, SagaResource is needed, param %v", resource)
  54. }
  55. s.resourceCache.Store(resource.GetResourceId(), resource)
  56. return s.rmRemoting.RegisterResource(resource)
  57. }
  58. func (s *SagaResourceManager) GetCachedResources() *sync.Map {
  59. return &s.resourceCache
  60. }
  61. func (s *SagaResourceManager) GetBranchType() branch.BranchType {
  62. return branch.BranchTypeSAGA
  63. }
  64. func (s *SagaResourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) {
  65. engine := GetStateMachineEngine()
  66. stMaInst, err := engine.Forward(ctx, resource.Xid, nil)
  67. if err != nil {
  68. if fie, ok := exception.IsForwardInvalidException(err); ok {
  69. log.Printf("StateMachine forward failed, xid: %s, err: %v", resource.Xid, err)
  70. if isInstanceNotExists(fie.ErrCode) {
  71. return branch.BranchStatusPhasetwoCommitted, nil
  72. }
  73. }
  74. log.Printf("StateMachine forward failed, xid: %s, err: %v", resource.Xid, err)
  75. return branch.BranchStatusPhasetwoCommitFailedRetryable, err
  76. }
  77. status := stMaInst.Status()
  78. compStatus := stMaInst.CompensationStatus()
  79. switch {
  80. case status == statelang.SU && compStatus == "":
  81. return branch.BranchStatusPhasetwoCommitted, nil
  82. case compStatus == statelang.SU:
  83. return branch.BranchStatusPhasetwoRollbacked, nil
  84. case compStatus == statelang.FA || compStatus == statelang.UN:
  85. return branch.BranchStatusPhasetwoRollbackFailedRetryable, nil
  86. case status == statelang.FA && compStatus == "":
  87. return branch.BranchStatusPhaseoneFailed, nil
  88. default:
  89. return branch.BranchStatusPhasetwoCommitFailedRetryable, nil
  90. }
  91. }
  92. func (s *SagaResourceManager) BranchRollback(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) {
  93. engine := GetStateMachineEngine()
  94. stMaInst, err := engine.ReloadStateMachineInstance(ctx, resource.Xid)
  95. if err != nil || stMaInst == nil {
  96. return branch.BranchStatusPhasetwoRollbacked, nil
  97. }
  98. strategy := stMaInst.StateMachine().RecoverStrategy()
  99. appData := resource.ApplicationData
  100. isTimeoutRollback := bytes.Equal(appData, []byte{byte(message.GlobalStatusTimeoutRollbacking)}) || bytes.Equal(appData, []byte{byte(message.GlobalStatusTimeoutRollbackRetrying)})
  101. if strategy == statelang.Forward && isTimeoutRollback {
  102. log.Printf("Retry by custom recover strategy [Forward] on timeout, SAGA global[%s]", resource.Xid)
  103. return branch.BranchStatusPhasetwoCommitFailedRetryable, nil
  104. }
  105. stMaInst, err = engine.Compensate(ctx, resource.Xid, nil)
  106. if err == nil && stMaInst.CompensationStatus() == statelang.SU {
  107. return branch.BranchStatusPhasetwoRollbacked, nil
  108. }
  109. if fie, ok := exception.IsEngineExecutionException(err); ok {
  110. log.Printf("StateMachine compensate failed, xid: %s, err: %v", resource.Xid, err)
  111. if isInstanceNotExists(fie.ErrCode) {
  112. return branch.BranchStatusPhasetwoRollbacked, nil
  113. }
  114. }
  115. log.Printf("StateMachine compensate failed, xid: %s, err: %v", resource.Xid, err)
  116. return branch.BranchStatusPhasetwoRollbackFailedRetryable, err
  117. }
  118. func (s *SagaResourceManager) BranchRegister(ctx context.Context, param rm.BranchRegisterParam) (int64, error) {
  119. return s.rmRemoting.BranchRegister(param)
  120. }
  121. func (s *SagaResourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error {
  122. return s.rmRemoting.BranchReport(param)
  123. }
  124. func (s *SagaResourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) {
  125. // LockQuery is not supported for Saga resources
  126. return false, fmt.Errorf("LockQuery is not supported for Saga resources")
  127. }
  128. func (s *SagaResourceManager) UnregisterResource(resource rm.Resource) error {
  129. // UnregisterResource is not supported for SagaResourceManager
  130. return fmt.Errorf("UnregisterResource is not supported for SagaResourceManager")
  131. }
  132. // isInstanceNotExists checks if the error code indicates StateMachineInstanceNotExists
  133. func isInstanceNotExists(errCode string) bool {
  134. return errCode == fmt.Sprintf("%v", seataErrors.StateMachineInstanceNotExists)
  135. }