You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

getty_session_manager.go 12 kB

5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. package server
  2. import (
  3. "github.com/pkg/errors"
  4. "github.com/dubbogo/getty"
  5. "github.com/dk-lockdown/seata-golang/logging"
  6. "github.com/dk-lockdown/seata-golang/meta"
  7. "github.com/dk-lockdown/seata-golang/model"
  8. "github.com/dk-lockdown/seata-golang/protocal"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. )
  13. var (
  14. /**
  15. * resourceId -> applicationId -> ip -> port -> RpcContext
  16. */
  17. rm_sessions = sync.Map{}
  18. /**
  19. * ip+appname -> port -> RpcContext
  20. */
  21. tm_sessions = sync.Map{}
  22. )
  23. const (
  24. ClientIdSplitChar = ":"
  25. DbkeysSplitChar = ","
  26. )
  27. type GettySessionManager struct {
  28. IdentifiedSessions *sync.Map
  29. }
  30. var SessionManager GettySessionManager
  31. func init() {
  32. SessionManager = GettySessionManager{IdentifiedSessions:&sync.Map{}}
  33. }
  34. func (manager *GettySessionManager) IsRegistered(session getty.Session) bool {
  35. _,ok := manager.IdentifiedSessions.Load(session)
  36. return ok
  37. }
  38. func (manager *GettySessionManager) GetRoleFromGettySession(session getty.Session) meta.TransactionRole {
  39. context, ok := manager.IdentifiedSessions.Load(session)
  40. if ok {
  41. return context.(*RpcContext).ClientRole
  42. }
  43. return 0
  44. }
  45. func (manager *GettySessionManager) GetContextFromIdentified(session getty.Session) *RpcContext {
  46. context, ok := manager.IdentifiedSessions.Load(session)
  47. if ok {
  48. rpcContext := context.(*RpcContext)
  49. return rpcContext
  50. }
  51. return nil
  52. }
  53. func (manager *GettySessionManager) RegisterTmGettySession(request protocal.RegisterTMRequest,session getty.Session) {
  54. //todo check version
  55. rpcContext := buildGettySessionHolder(meta.TMROLE,request.Version,request.ApplicationId,request.TransactionServiceGroup,"",session)
  56. rpcContext.HoldInIdentifiedGettySessions(manager.IdentifiedSessions)
  57. clientIdentified := rpcContext.ApplicationId + ClientIdSplitChar + getClientIpFromGettySession(session)
  58. clientIdentifiedMap,_ := tm_sessions.LoadOrStore(clientIdentified,&sync.Map{})
  59. cMap := clientIdentifiedMap.(*sync.Map)
  60. rpcContext.HoldInClientGettySessions(cMap)
  61. }
  62. func (manager *GettySessionManager) RegisterRmGettySession(resourceManagerRequest protocal.RegisterRMRequest,session getty.Session){
  63. //todo check version
  64. var rpcContext *RpcContext
  65. dbKeySet := dbKeyToSet(resourceManagerRequest.ResourceIds)
  66. context,ok := manager.IdentifiedSessions.Load(session)
  67. if ok {
  68. rpcContext = context.(*RpcContext)
  69. rpcContext.AddResources(dbKeySet)
  70. } else {
  71. rpcContext = buildGettySessionHolder(meta.RMROLE,resourceManagerRequest.Version,resourceManagerRequest.ApplicationId,
  72. resourceManagerRequest.TransactionServiceGroup,resourceManagerRequest.ResourceIds,session)
  73. rpcContext.HoldInIdentifiedGettySessions(manager.IdentifiedSessions)
  74. }
  75. if dbKeySet == nil || dbKeySet.IsEmpty() { return }
  76. for _,resourceId := range dbKeySet.List() {
  77. applicationMap,_ := rm_sessions.LoadOrStore(resourceId,&sync.Map{})
  78. aMap,_ := applicationMap.(*sync.Map)
  79. ipMap,_ := aMap.LoadOrStore(resourceManagerRequest.ApplicationId,&sync.Map{})
  80. iMap,_ := ipMap.(*sync.Map)
  81. clientIp := getClientIpFromGettySession(session)
  82. portMap,_ := iMap.LoadOrStore(clientIp,&sync.Map{})
  83. pMap,_ := portMap.(*sync.Map)
  84. rpcContext.HoldInResourceManagerGettySessions(resourceId,pMap)
  85. // 老实讲,我不知道为什么要写这么一个方法,双重保证?
  86. manager.updateGettySessionsResource(resourceId,clientIp,resourceManagerRequest.ApplicationId)
  87. }
  88. }
  89. func (manager *GettySessionManager) updateGettySessionsResource(resourceId string,clientIp string,applicationId string) {
  90. applicationMap,_ := rm_sessions.Load(resourceId)
  91. aMap,_ := applicationMap.(*sync.Map)
  92. ipMap,_ := aMap.Load(applicationId)
  93. iMap,_ := ipMap.(*sync.Map)
  94. portMap,_ := iMap.Load(clientIp)
  95. pMap,_ := portMap.(*sync.Map)
  96. rm_sessions.Range(func (key interface{},value interface{}) bool {
  97. resourceKey,ok := key.(string)
  98. if ok && resourceKey != resourceId {
  99. appMap,_ := value.(*sync.Map)
  100. clientIpMap,clientIpMapLoaded := appMap.Load(applicationId)
  101. if clientIpMapLoaded {
  102. cipMap,_ := clientIpMap.(*sync.Map)
  103. clientPortMap, clientPortMapLoaded := cipMap.Load(clientIp)
  104. if clientPortMapLoaded {
  105. cpMap := clientPortMap.(*sync.Map)
  106. cpMap.Range(func (key interface{},value interface{}) bool{
  107. port,_ := key.(int)
  108. rpcContext,_ := value.(*RpcContext)
  109. _, ok := pMap.LoadOrStore(port,rpcContext)
  110. if ok {
  111. rpcContext.HoldInResourceManagerGettySessionsWithoutPortMap(resourceId,port)
  112. }
  113. return true
  114. })
  115. }
  116. }
  117. }
  118. return true
  119. })
  120. }
  121. func (manager *GettySessionManager) GetSameClientGettySession(session getty.Session) getty.Session {
  122. if !session.IsClosed() {
  123. return session
  124. }
  125. rpcContext := manager.GetContextFromIdentified(session)
  126. if rpcContext == nil {
  127. logging.Logger.Errorf("rpcContext is null,channel:{%v},active:{%t}",session,!session.IsClosed())
  128. }
  129. if !rpcContext.session.IsClosed() {
  130. return rpcContext.session
  131. }
  132. clientPort := getClientPortFromGettySession(session)
  133. if rpcContext.ClientRole == meta.TMROLE {
  134. clientIdentified := rpcContext.ApplicationId + ClientIdSplitChar + getClientIpFromGettySession(session)
  135. clientRpcMap, ok := tm_sessions.Load(clientIdentified)
  136. if !ok {
  137. return nil
  138. }
  139. clientMap := clientRpcMap.(*sync.Map)
  140. return getGettySessionFromSameClientMap(clientMap,clientPort)
  141. } else if rpcContext.ClientRole == meta.RMROLE {
  142. var sameClientSession getty.Session
  143. rpcContext.ClientRMHolderMap.Range(func (key interface{},value interface{}) bool {
  144. clientRmMap := value.(*sync.Map)
  145. sameClientSession = getGettySessionFromSameClientMap(clientRmMap,clientPort)
  146. if sameClientSession != nil {
  147. return false
  148. }
  149. return true
  150. })
  151. return sameClientSession
  152. }
  153. return nil
  154. }
  155. func getGettySessionFromSameClientMap(clientGettySessionMap *sync.Map,exclusivePort int) getty.Session {
  156. var session getty.Session
  157. if clientGettySessionMap != nil {
  158. clientGettySessionMap.Range(func (key interface{},value interface{}) bool {
  159. port,ok := key.(int)
  160. if ok {
  161. if port == exclusivePort {
  162. clientGettySessionMap.Delete(key)
  163. return true
  164. }
  165. }
  166. context := value.(*RpcContext)
  167. session = context.session
  168. if !session.IsClosed() {
  169. return false
  170. }
  171. clientGettySessionMap.Delete(key)
  172. return true
  173. })
  174. }
  175. return session
  176. }
  177. func (manager *GettySessionManager) GetGettySession(resourceId string,clientId string) (getty.Session,error) {
  178. var resultSession getty.Session
  179. clientIdInfo := strings.Split(clientId,ClientIdSplitChar)
  180. if clientIdInfo == nil || len(clientIdInfo) != 3 {
  181. return nil,errors.Errorf("Invalid Client ID:%d",clientId)
  182. }
  183. targetApplicationId := clientIdInfo[0]
  184. targetIP := clientIdInfo[1]
  185. targetPort,_ := strconv.Atoi(clientIdInfo[2])
  186. applicationMap,ok := rm_sessions.Load(resourceId)
  187. if targetApplicationId == "" || !ok || applicationMap == nil {
  188. logging.Logger.Infof("No channel is available for resource[%s]",resourceId)
  189. }
  190. appMap,_ := applicationMap.(*sync.Map)
  191. clientIpMap,clientIpMapLoaded := appMap.Load(targetApplicationId)
  192. if clientIpMapLoaded {
  193. ipMap,_ := clientIpMap.(*sync.Map)
  194. portMap,portMapLoaded := ipMap.Load(targetIP)
  195. if portMapLoaded {
  196. pMap,_ := portMap.(*sync.Map)
  197. context,contextLoaded := pMap.Load(targetPort)
  198. // Firstly, try to find the original channel through which the branch was registered.
  199. if contextLoaded {
  200. rpcContext := context.(*RpcContext)
  201. if !rpcContext.session.IsClosed() {
  202. resultSession = rpcContext.session
  203. logging.Logger.Debugf("Just got exactly the one %v for %s",rpcContext.session,clientId)
  204. } else {
  205. pMap.Delete(targetPort)
  206. logging.Logger.Infof("Removed inactive %d",rpcContext.session)
  207. }
  208. }
  209. // The original channel was broken, try another one.
  210. if resultSession == nil {
  211. pMap.Range(func (key interface{},value interface{}) bool {
  212. rpcContext := value.(*RpcContext)
  213. if !rpcContext.session.IsClosed() {
  214. resultSession = rpcContext.session
  215. logging.Logger.Infof("Choose %v on the same IP[%s] as alternative of %s",rpcContext.session,targetIP,clientId)
  216. //跳出 range 循环
  217. return false
  218. } else {
  219. pMap.Delete(key)
  220. logging.Logger.Infof("Removed inactive %d",rpcContext.session)
  221. }
  222. return true
  223. })
  224. }
  225. }
  226. // No channel on the this app node, try another one.
  227. if resultSession == nil {
  228. ipMap.Range(func (key interface{},value interface{}) bool {
  229. ip := key.(string)
  230. if ip == targetIP { return true }
  231. portMapOnOtherIP,_ := value.(*sync.Map)
  232. if portMapOnOtherIP == nil { return true }
  233. portMapOnOtherIP.Range(func (key interface{},value interface {}) bool {
  234. rpcContext := value.(*RpcContext)
  235. if !rpcContext.session.IsClosed() {
  236. resultSession = rpcContext.session
  237. logging.Logger.Infof("Choose %v on the same application[%s] as alternative of %s",rpcContext.session,targetApplicationId,clientId)
  238. //跳出 range 循环
  239. return false
  240. } else {
  241. portMapOnOtherIP.Delete(key)
  242. logging.Logger.Infof("Removed inactive %d",rpcContext.session)
  243. }
  244. return true
  245. })
  246. if resultSession != nil { return false }
  247. return true
  248. })
  249. }
  250. }
  251. if resultSession == nil {
  252. resultSession = tryOtherApp(appMap,targetApplicationId)
  253. if resultSession == nil {
  254. logging.Logger.Infof("No channel is available for resource[%s] as alternative of %s",resourceId,clientId)
  255. } else {
  256. logging.Logger.Infof("Choose %v on the same resource[%s] as alternative of %s", resultSession, resourceId, clientId)
  257. }
  258. }
  259. return resultSession,nil
  260. }
  261. func tryOtherApp(applicationMap *sync.Map,myApplicationId string) getty.Session {
  262. var chosenChannel getty.Session
  263. applicationMap.Range(func (key interface{},value interface{}) bool {
  264. applicationId := key.(string)
  265. if myApplicationId != "" && applicationId == myApplicationId {return true}
  266. targetIPMap,_ := value.(*sync.Map)
  267. targetIPMap.Range(func (key interface{},value interface{}) bool {
  268. if value == nil { return true }
  269. portMap,_ := value.(*sync.Map)
  270. portMap.Range(func (key interface{},value interface{}) bool {
  271. rpcContext := value.(*RpcContext)
  272. if !rpcContext.session.IsClosed() {
  273. chosenChannel = rpcContext.session
  274. return false
  275. } else {
  276. portMap.Delete(key)
  277. logging.Logger.Infof("Removed inactive %d",rpcContext.session)
  278. }
  279. return true
  280. })
  281. if chosenChannel != nil { return false }
  282. return true
  283. })
  284. if chosenChannel != nil { return false }
  285. return true
  286. })
  287. return chosenChannel
  288. }
  289. func buildGettySessionHolder(role meta.TransactionRole,version string,applicationId string,
  290. txServiceGroup string,dbKeys string,session getty.Session) *RpcContext {
  291. return &RpcContext{
  292. ClientRole: role,
  293. Version: version,
  294. ApplicationId: applicationId,
  295. TransactionServiceGroup: txServiceGroup,
  296. ClientId: buildClientId(applicationId,session),
  297. session: session,
  298. ResourceSets: dbKeyToSet(dbKeys),
  299. }
  300. }
  301. func dbKeyToSet(dbKey string) *model.Set {
  302. if dbKey == "" {
  303. return nil
  304. }
  305. keys := strings.Split(dbKey,DbkeysSplitChar)
  306. set := model.NewSet()
  307. for _,key := range keys {
  308. set.Add(key)
  309. }
  310. return set
  311. }
  312. func buildClientId(applicationId string, session getty.Session) string {
  313. return applicationId + ClientIdSplitChar + session.RemoteAddr()
  314. }
  315. func (manager *GettySessionManager) GetRmSessions() map[string]getty.Session {
  316. sessions := make(map[string]getty.Session)
  317. rm_sessions.Range(func (key interface{},value interface{}) bool {
  318. resourceId,_ := key.(string)
  319. applicationMap := value.(*sync.Map)
  320. session := tryOtherApp(applicationMap,"")
  321. if session == nil {
  322. return false
  323. }
  324. sessions[resourceId] = session
  325. return true
  326. })
  327. return sessions
  328. }

Go Implementation For Seata