You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

db_transaction_store_manager.go 9.1 kB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
5 years ago
4 years ago
5 years ago
5 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
5 years ago
4 years ago
4 years ago
4 years ago
4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package holder
  2. import (
  3. "github.com/transaction-wg/seata-golang/pkg/base/meta"
  4. "github.com/transaction-wg/seata-golang/pkg/tc/model"
  5. "github.com/transaction-wg/seata-golang/pkg/tc/session"
  6. "github.com/transaction-wg/seata-golang/pkg/util/log"
  7. )
  8. type DBTransactionStoreManager struct {
  9. logQueryLimit int
  10. LogStore LogStore
  11. }
  12. func (storeManager *DBTransactionStoreManager) WriteSession(logOperation LogOperation, session session.SessionStorable) bool {
  13. if LogOperationGlobalAdd == logOperation {
  14. globalSession := convertGlobalTransactionDO(session)
  15. if globalSession != nil {
  16. storeManager.LogStore.InsertGlobalTransactionDO(*globalSession)
  17. }
  18. return true
  19. } else if LogOperationGlobalUpdate == logOperation {
  20. globalSession := convertGlobalTransactionDO(session)
  21. if globalSession != nil {
  22. storeManager.LogStore.UpdateGlobalTransactionDO(*globalSession)
  23. }
  24. return true
  25. } else if LogOperationGlobalRemove == logOperation {
  26. globalSession := convertGlobalTransactionDO(session)
  27. if globalSession != nil {
  28. storeManager.LogStore.DeleteGlobalTransactionDO(*globalSession)
  29. }
  30. return true
  31. } else if LogOperationBranchAdd == logOperation {
  32. branchSession := convertBranchTransactionDO(session)
  33. if branchSession != nil {
  34. storeManager.LogStore.InsertBranchTransactionDO(*branchSession)
  35. }
  36. return true
  37. } else if LogOperationBranchUpdate == logOperation {
  38. branchSession := convertBranchTransactionDO(session)
  39. if branchSession != nil {
  40. storeManager.LogStore.UpdateBranchTransactionDO(*branchSession)
  41. }
  42. return true
  43. } else if LogOperationBranchRemove == logOperation {
  44. branchSession := convertBranchTransactionDO(session)
  45. if branchSession != nil {
  46. storeManager.LogStore.DeleteBranchTransactionDO(*branchSession)
  47. }
  48. return true
  49. }
  50. log.Errorf("Unknown LogOperation:%v", logOperation)
  51. return false
  52. }
  53. func (storeManager *DBTransactionStoreManager) ReadSession(xid string) *session.GlobalSession {
  54. return storeManager.ReadSessionWithBranchSessions(xid, true)
  55. }
  56. func (storeManager *DBTransactionStoreManager) ReadSessionWithBranchSessions(xid string, withBranchSessions bool) *session.GlobalSession {
  57. globalTransactionDO := storeManager.LogStore.QueryGlobalTransactionDOByXID(xid)
  58. if globalTransactionDO == nil {
  59. return nil
  60. }
  61. var branchTransactionDOs []*model.BranchTransactionDO
  62. if withBranchSessions {
  63. branchTransactionDOs = storeManager.LogStore.QueryBranchTransactionDOByXID(globalTransactionDO.XID)
  64. }
  65. return getGlobalSession(globalTransactionDO, branchTransactionDOs)
  66. }
  67. func (storeManager *DBTransactionStoreManager) ReadSessionByTransactionID(transactionID int64) *session.GlobalSession {
  68. globalTransactionDO := storeManager.LogStore.QueryGlobalTransactionDOByTransactionID(transactionID)
  69. if globalTransactionDO == nil {
  70. return nil
  71. }
  72. branchTransactionDOs := storeManager.LogStore.QueryBranchTransactionDOByXID(globalTransactionDO.XID)
  73. return getGlobalSession(globalTransactionDO, branchTransactionDOs)
  74. }
  75. func (storeManager *DBTransactionStoreManager) readSessionByStatuses(statuses []meta.GlobalStatus) []*session.GlobalSession {
  76. states := make([]int, 0)
  77. for _, status := range statuses {
  78. states = append(states, int(status))
  79. }
  80. globalTransactionDOs := storeManager.LogStore.QueryGlobalTransactionDOByStatuses(states, storeManager.logQueryLimit)
  81. if globalTransactionDOs == nil || len(globalTransactionDOs) == 0 {
  82. return nil
  83. }
  84. xids := make([]string, 0)
  85. for _, globalTransactionDO := range globalTransactionDOs {
  86. xids = append(xids, globalTransactionDO.XID)
  87. }
  88. branchTransactionDOs := storeManager.LogStore.QueryBranchTransactionDOByXIDs(xids)
  89. branchTransactionMap := make(map[string][]*model.BranchTransactionDO)
  90. for _, branchTransactionDO := range branchTransactionDOs {
  91. branchTransactions, ok := branchTransactionMap[branchTransactionDO.XID]
  92. if ok {
  93. branchTransactions = append(branchTransactions, branchTransactionDO)
  94. branchTransactionMap[branchTransactionDO.XID] = branchTransactions
  95. } else {
  96. branchTransactions = make([]*model.BranchTransactionDO, 0)
  97. branchTransactions = append(branchTransactions, branchTransactionDO)
  98. branchTransactionMap[branchTransactionDO.XID] = branchTransactions
  99. }
  100. }
  101. globalSessions := make([]*session.GlobalSession, 0)
  102. for _, globalTransaction := range globalTransactionDOs {
  103. globalSession := getGlobalSession(globalTransaction, branchTransactionMap[globalTransaction.XID])
  104. globalSessions = append(globalSessions, globalSession)
  105. }
  106. return globalSessions
  107. }
  108. func (storeManager *DBTransactionStoreManager) ReadSessionWithSessionCondition(sessionCondition model.SessionCondition) []*session.GlobalSession {
  109. if sessionCondition.XID != "" {
  110. globalSession := storeManager.ReadSession(sessionCondition.XID)
  111. if globalSession != nil {
  112. globalSessions := make([]*session.GlobalSession, 0)
  113. globalSessions = append(globalSessions, globalSession)
  114. return globalSessions
  115. }
  116. } else if sessionCondition.TransactionID != 0 {
  117. globalSession := storeManager.ReadSessionByTransactionID(sessionCondition.TransactionID)
  118. if globalSession != nil {
  119. globalSessions := make([]*session.GlobalSession, 0)
  120. globalSessions = append(globalSessions, globalSession)
  121. return globalSessions
  122. }
  123. } else if sessionCondition.Statuses != nil && len(sessionCondition.Statuses) > 0 {
  124. return storeManager.readSessionByStatuses(sessionCondition.Statuses)
  125. }
  126. return nil
  127. }
  128. func (storeManager *DBTransactionStoreManager) Shutdown() {
  129. }
  130. func getGlobalSession(globalTransactionDO *model.GlobalTransactionDO, branchTransactionDOs []*model.BranchTransactionDO) *session.GlobalSession {
  131. globalSession := convertGlobalTransaction(globalTransactionDO)
  132. if branchTransactionDOs != nil && len(branchTransactionDOs) > 0 {
  133. for _, branchTransactionDO := range branchTransactionDOs {
  134. globalSession.Add(convertBranchSession(branchTransactionDO))
  135. }
  136. }
  137. return globalSession
  138. }
  139. func convertGlobalTransaction(globalTransactionDO *model.GlobalTransactionDO) *session.GlobalSession {
  140. globalSession := session.NewGlobalSession(
  141. session.WithGsXID(globalTransactionDO.XID),
  142. session.WithGsApplicationID(globalTransactionDO.ApplicationID),
  143. session.WithGsTransactionID(globalTransactionDO.TransactionID),
  144. session.WithGsTransactionName(globalTransactionDO.TransactionName),
  145. session.WithGsTransactionServiceGroup(globalTransactionDO.TransactionServiceGroup),
  146. session.WithGsStatus(meta.GlobalStatus(globalTransactionDO.Status)),
  147. session.WithGsTimeout(globalTransactionDO.Timeout),
  148. session.WithGsBeginTime(globalTransactionDO.BeginTime),
  149. session.WithGsApplicationData(globalTransactionDO.ApplicationData),
  150. )
  151. return globalSession
  152. }
  153. func convertBranchSession(branchTransactionDO *model.BranchTransactionDO) *session.BranchSession {
  154. branchSession := session.NewBranchSession(
  155. session.WithBsXid(branchTransactionDO.XID),
  156. session.WithBsTransactionID(branchTransactionDO.TransactionID),
  157. session.WithBsApplicationData(branchTransactionDO.ApplicationData),
  158. session.WithBsBranchID(branchTransactionDO.BranchID),
  159. session.WithBsBranchType(meta.ValueOfBranchType(branchTransactionDO.BranchType)),
  160. session.WithBsResourceID(branchTransactionDO.ResourceID),
  161. session.WithBsClientID(branchTransactionDO.ClientID),
  162. session.WithBsResourceGroupID(branchTransactionDO.ResourceGroupID),
  163. session.WithBsStatus(meta.BranchStatus(branchTransactionDO.Status)),
  164. )
  165. return branchSession
  166. }
  167. func convertGlobalTransactionDO(sessionStorable session.SessionStorable) *model.GlobalTransactionDO {
  168. globalSession, ok := sessionStorable.(*session.GlobalSession)
  169. if sessionStorable == nil || !ok {
  170. log.Errorf("the parameter of SessionStorable is not available, SessionStorable:%v", sessionStorable)
  171. return nil
  172. }
  173. globalTransactionDO := &model.GlobalTransactionDO{
  174. XID: globalSession.XID,
  175. TransactionID: globalSession.TransactionID,
  176. Status: int32(globalSession.Status),
  177. ApplicationID: globalSession.ApplicationID,
  178. TransactionServiceGroup: globalSession.TransactionServiceGroup,
  179. TransactionName: globalSession.TransactionName,
  180. Timeout: globalSession.Timeout,
  181. BeginTime: globalSession.BeginTime,
  182. ApplicationData: globalSession.ApplicationData,
  183. }
  184. return globalTransactionDO
  185. }
  186. func convertBranchTransactionDO(sessionStorable session.SessionStorable) *model.BranchTransactionDO {
  187. branchSession, ok := sessionStorable.(*session.BranchSession)
  188. if sessionStorable == nil || !ok {
  189. log.Errorf("the parameter of SessionStorable is not available, SessionStorable:%v", sessionStorable)
  190. return nil
  191. }
  192. branchSessionDO := &model.BranchTransactionDO{
  193. XID: branchSession.XID,
  194. TransactionID: branchSession.TransactionID,
  195. BranchID: branchSession.BranchID,
  196. ResourceGroupID: branchSession.ResourceGroupID,
  197. ResourceID: branchSession.ResourceID,
  198. BranchType: branchSession.BranchType.String(),
  199. Status: int32(branchSession.Status),
  200. ClientID: branchSession.ClientID,
  201. ApplicationData: branchSession.ApplicationData,
  202. }
  203. return branchSessionDO
  204. }