You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

rpc_context.go 3.6 kB

5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package server
  2. import (
  3. "errors"
  4. "github.com/dubbogo/getty"
  5. "github.com/dk-lockdown/seata-golang/meta"
  6. "github.com/dk-lockdown/seata-golang/model"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. )
  11. const IpPortSplitChar = ":"
  12. type RpcContext struct {
  13. ClientRole meta.TransactionRole
  14. Version string
  15. ApplicationId string
  16. TransactionServiceGroup string
  17. ClientId string
  18. session getty.Session
  19. ResourceSets *model.Set
  20. /**
  21. * <getty.Session,*RpcContext>
  22. */
  23. ClientIDHolderMap *sync.Map
  24. /**
  25. * <int,RpcContext>
  26. */
  27. ClientTMHolderMap *sync.Map
  28. /**
  29. * resourceId -> int -> RpcContext>
  30. */
  31. ClientRMHolderMap *sync.Map
  32. }
  33. func (context *RpcContext) Release() {
  34. clientPort := getClientPortFromGettySession(context.session)
  35. if context.ClientIDHolderMap != nil {
  36. context.ClientIDHolderMap = nil
  37. }
  38. if context.ClientRole == meta.TMROLE && context.ClientTMHolderMap != nil {
  39. context.ClientTMHolderMap.Delete(clientPort)
  40. context.ClientTMHolderMap = nil
  41. }
  42. if context.ClientRole == meta.RMROLE && context.ClientRMHolderMap != nil {
  43. context.ClientRMHolderMap.Range(func (key interface{}, value interface{}) bool {
  44. m := value.(*sync.Map)
  45. m.Delete(clientPort)
  46. return true
  47. })
  48. context.ClientRMHolderMap = nil
  49. }
  50. if context.ResourceSets != nil {
  51. context.ResourceSets.Clear()
  52. }
  53. }
  54. func (context *RpcContext) HoldInClientGettySessions(clientTMHolderMap *sync.Map) error {
  55. if context.ClientTMHolderMap != nil {
  56. return errors.New("illegal state")
  57. }
  58. context.ClientTMHolderMap = clientTMHolderMap
  59. clientPort := getClientPortFromGettySession(context.session)
  60. context.ClientTMHolderMap.Store(clientPort,context)
  61. return nil
  62. }
  63. func (context *RpcContext) HoldInIdentifiedGettySessions(clientIDHolderMap *sync.Map) error {
  64. if context.ClientIDHolderMap != nil {
  65. return errors.New("illegal state")
  66. }
  67. context.ClientIDHolderMap = clientIDHolderMap
  68. context.ClientIDHolderMap.Store(context.session,context)
  69. return nil
  70. }
  71. func (context *RpcContext) HoldInResourceManagerGettySessions(resourceId string,portMap *sync.Map) {
  72. if context.ClientRMHolderMap == nil {
  73. context.ClientRMHolderMap = &sync.Map{}
  74. }
  75. clientPort := getClientPortFromGettySession(context.session)
  76. portMap.Store(clientPort,context)
  77. context.ClientRMHolderMap.Store(resourceId,portMap)
  78. }
  79. func (context *RpcContext) HoldInResourceManagerGettySessionsWithoutPortMap(resourceId string,clientPort int) {
  80. if context.ClientRMHolderMap == nil {
  81. context.ClientRMHolderMap = &sync.Map{}
  82. }
  83. portMap,_ := context.ClientRMHolderMap.LoadOrStore(resourceId,&sync.Map{})
  84. pm := portMap.(*sync.Map)
  85. pm.Store(clientPort,context)
  86. }
  87. func (context *RpcContext) AddResource(resource string) {
  88. if resource != "" {
  89. if context.ResourceSets == nil {
  90. context.ResourceSets = model.NewSet()
  91. }
  92. context.ResourceSets.Add(resource)
  93. }
  94. }
  95. func (context *RpcContext) AddResources(resources *model.Set) {
  96. if resources != nil {
  97. if context.ResourceSets == nil {
  98. context.ResourceSets = model.NewSet()
  99. }
  100. for _,resource := range resources.List() {
  101. context.ResourceSets.Add(resource)
  102. }
  103. }
  104. }
  105. func getClientIpFromGettySession(session getty.Session) string {
  106. clientIp := session.RemoteAddr()
  107. if strings.Contains(clientIp,IpPortSplitChar) {
  108. idx := strings.Index(clientIp,IpPortSplitChar)
  109. clientIp = clientIp[:idx]
  110. }
  111. return clientIp
  112. }
  113. func getClientPortFromGettySession(session getty.Session) int {
  114. address := session.RemoteAddr()
  115. port := 0
  116. if strings.Contains(address,IpPortSplitChar) {
  117. idx := strings.LastIndex(address,IpPortSplitChar)
  118. port,_ = strconv.Atoi(address[idx+1:])
  119. }
  120. return port
  121. }

Go Implementation For Seata