您最多选择25个标签 标签必须以中文、字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

reghelper.go 9.6 kB

4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
4 年前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. package regworkerid
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-redis/redis/v8"
  6. "strconv"
  7. "sync"
  8. "time"
  9. )
  10. var _client *redis.Client
  11. var _ctx = context.Background()
  12. var _workerIdLock sync.Mutex
  13. var _workerIdList []int32 // 当前已注册的WorkerId
  14. var _loopCount = 0 // 循环数量
  15. var _lifeIndex = -1 // WorkerId本地生命时序(本地多次注册时,生命时序会不同)
  16. var _token = -1 // WorkerId远程注册时用的token,将存储在 IdGen:WorkerId:Value:xx 的值中(本功能暂未启用)
  17. var _WorkerIdLifeTimeSeconds = 15 // IdGen:WorkerId:Value:xx 的值在 redis 中的有效期(单位秒,最好是3的整数倍)
  18. var _MaxLoopCount = 10 // 最大循环次数(无可用WorkerId时循环查找)
  19. var _SleepMillisecondEveryLoop = 200 // 每次循环后,暂停时间
  20. var _MaxWorkerId int32 = 0 // 最大WorkerId值,超过此值从0开始
  21. var _RedisConnString = ""
  22. var _RedisPassword = ""
  23. const _WorkerIdIndexKey string = "IdGen:WorkerId:Index" // redis 中的key
  24. const _WorkerIdValueKeyPrefix string = "IdGen:WorkerId:Value:" // redis 中的key
  25. const _WorkerIdFlag = "Y" // IdGen:WorkerId:Value:xx 的值(将来可用 _token 替代)
  26. const _Log = false // 是否输出日志
  27. func Validate(workerId int32) int32 {
  28. for _, value := range _workerIdList {
  29. if value == workerId {
  30. return 1
  31. }
  32. }
  33. return 0
  34. //if workerId == _usingWorkerId {
  35. // return 0
  36. //} else {
  37. // return -1
  38. //}
  39. }
  40. func UnRegister() {
  41. _workerIdLock.Lock()
  42. _lifeIndex = -1
  43. for _, value := range _workerIdList {
  44. if value > -1 {
  45. _client.Del(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(value)))
  46. }
  47. }
  48. _workerIdList = []int32{}
  49. _workerIdLock.Unlock()
  50. }
  51. func autoUnRegister() {
  52. // 如果当前已注册过 WorkerId,则先注销,并终止先前的自动续期线程
  53. if len(_workerIdList) > 0 {
  54. UnRegister()
  55. }
  56. }
  57. func RegisterMany(ip string, port int32, password string, maxWorkerId int32, totalCount int32) []int32 {
  58. if maxWorkerId < 0 {
  59. return []int32{-2}
  60. }
  61. if totalCount < 1 {
  62. return []int32{-1}
  63. }
  64. autoUnRegister()
  65. _MaxWorkerId = maxWorkerId
  66. _RedisConnString = ip + ":" + strconv.Itoa(int(port))
  67. _RedisPassword = password
  68. _client = newRedisClient()
  69. if _client == nil {
  70. return []int32{-1}
  71. }
  72. defer func() {
  73. if _client != nil {
  74. _ = _client.Close()
  75. }
  76. }()
  77. //_, err := _client.Ping(_ctx).Result()
  78. //if err != nil {
  79. // //panic("init redis error")
  80. // return []int{-3}
  81. //} else {
  82. // if _Log {
  83. // fmt.Println("init redis ok")
  84. // }
  85. //}
  86. _lifeIndex++
  87. _workerIdList = make([]int32, totalCount)
  88. for key := range _workerIdList {
  89. _workerIdList[key] = -1 // 全部初始化-1
  90. }
  91. useExtendFunc := false
  92. for key := range _workerIdList {
  93. id := register(_lifeIndex)
  94. if id > -1 {
  95. useExtendFunc = true
  96. _workerIdList[key] = id //= append(_workerIdList, id)
  97. } else {
  98. break
  99. }
  100. }
  101. if useExtendFunc {
  102. go extendLifeTime(_lifeIndex)
  103. }
  104. return _workerIdList
  105. }
  106. func RegisterOne(ip string, port int32, password string, maxWorkerId int32) int32 {
  107. if maxWorkerId < 0 {
  108. return -2
  109. }
  110. autoUnRegister()
  111. _MaxWorkerId = maxWorkerId
  112. _RedisConnString = ip + ":" + strconv.Itoa(int(port))
  113. _RedisPassword = password
  114. _loopCount = 0
  115. _client = newRedisClient()
  116. if _client == nil {
  117. return -3
  118. }
  119. defer func() {
  120. if _client != nil {
  121. _ = _client.Close()
  122. }
  123. }()
  124. //_, err := _client.Ping(_ctx).Result()
  125. //if err != nil {
  126. // // panic("init redis error")
  127. // return -3
  128. //} else {
  129. // if _Log {
  130. // fmt.Println("init redis ok")
  131. // }
  132. //}
  133. _lifeIndex++
  134. var id = register(_lifeIndex)
  135. if id > -1 {
  136. _workerIdList = []int32{id}
  137. go extendLifeTime(_lifeIndex)
  138. }
  139. return id
  140. }
  141. func register(lifeTime int) int32 {
  142. _loopCount = 0
  143. return getNextWorkerId(lifeTime)
  144. }
  145. func newRedisClient() *redis.Client {
  146. return redis.NewClient(&redis.Options{
  147. Addr: _RedisConnString,
  148. Password: _RedisPassword,
  149. DB: 0,
  150. //PoolSize: 1000,
  151. //ReadTimeout: time.Millisecond * time.Duration(100),
  152. //WriteTimeout: time.Millisecond * time.Duration(100),
  153. //IdleTimeout: time.Second * time.Duration(60),
  154. })
  155. }
  156. func getNextWorkerId(lifeTime int) int32 {
  157. // 获取当前 WorkerIdIndex
  158. r, err := _client.Incr(_ctx, _WorkerIdIndexKey).Result()
  159. if err != nil {
  160. return -1
  161. }
  162. candidateId := int32(r)
  163. if _Log {
  164. fmt.Println("Begin candidateId:" + strconv.Itoa(int(candidateId)))
  165. }
  166. // 如果 candidateId 大于最大值,则重置
  167. if candidateId > _MaxWorkerId {
  168. if canReset() {
  169. // 当前应用获得重置 WorkerIdIndex 的权限
  170. setWorkerIdIndex(-1)
  171. endReset() // 此步有可能不被执行?
  172. _loopCount++
  173. // 超过一定次数,直接终止操作
  174. if _loopCount > _MaxLoopCount {
  175. _loopCount = 0
  176. // 返回错误
  177. return -1
  178. }
  179. // 每次一个大循环后,暂停一些时间
  180. time.Sleep(time.Duration(_SleepMillisecondEveryLoop*_loopCount) * time.Millisecond)
  181. if _Log {
  182. fmt.Println("canReset loop")
  183. }
  184. return getNextWorkerId(lifeTime)
  185. } else {
  186. // 如果有其它应用正在编辑,则本应用暂停200ms后,再继续
  187. time.Sleep(time.Duration(200) * time.Millisecond)
  188. if _Log {
  189. fmt.Println("not canReset loop")
  190. }
  191. return getNextWorkerId(lifeTime)
  192. }
  193. }
  194. if _Log {
  195. fmt.Println("candidateId:" + strconv.Itoa(int(candidateId)))
  196. }
  197. if isAvailable(candidateId) {
  198. if _Log {
  199. fmt.Println("AA: isAvailable:" + strconv.Itoa(int(candidateId)))
  200. }
  201. // 最新获得的 WorkerIdIndex,在 redis 中是可用状态
  202. setWorkerIdFlag(candidateId)
  203. _loopCount = 0
  204. // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime)
  205. // go extendWorkerIdLifeTime(lifeTime, candidateId)
  206. return candidateId
  207. } else {
  208. if _Log {
  209. fmt.Println("BB: not isAvailable:" + strconv.Itoa(int(candidateId)))
  210. }
  211. // 最新获得的 WorkerIdIndex,在 redis 中是不可用状态,则继续下一个 WorkerIdIndex
  212. return getNextWorkerId(lifeTime)
  213. }
  214. }
  215. func extendLifeTime(lifeIndex int) {
  216. // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime)
  217. var myLifeIndex = lifeIndex
  218. // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。
  219. for {
  220. time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second)
  221. // 上锁操作,防止跟 UnRegister 操作重叠
  222. _workerIdLock.Lock()
  223. // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis
  224. if myLifeIndex != _lifeIndex {
  225. break
  226. }
  227. // 已经被注销,则终止(此步是上一步的二次验证)
  228. if len(_workerIdList) < 1 {
  229. break
  230. }
  231. // 延长 redis 数据有效期
  232. for _, value := range _workerIdList {
  233. if value > -1 {
  234. extendWorkerIdFlag(value)
  235. }
  236. }
  237. _workerIdLock.Unlock()
  238. }
  239. }
  240. func extendWorkerIdLifeTime(lifeIndex int, workerId int32) {
  241. var myLifeIndex = lifeIndex
  242. var myWorkerId = workerId
  243. // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。
  244. for {
  245. time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second)
  246. // 上锁操作,防止跟 UnRegister 操作重叠
  247. _workerIdLock.Lock()
  248. // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis
  249. if myLifeIndex != _lifeIndex {
  250. break
  251. }
  252. // 已经被注销,则终止(此步是上一步的二次验证)
  253. //if _usingWorkerId < 0 {
  254. // break
  255. //}
  256. // 延长 redis 数据有效期
  257. extendWorkerIdFlag(myWorkerId)
  258. _workerIdLock.Unlock()
  259. }
  260. }
  261. func get(key string) (string, bool) {
  262. r, err := _client.Get(_ctx, key).Result()
  263. if err != nil {
  264. return "", false
  265. }
  266. return r, true
  267. }
  268. func set(key string, val string, expTime int32) {
  269. _client.Set(_ctx, key, val, time.Duration(expTime)*time.Second)
  270. }
  271. func setWorkerIdIndex(val int) {
  272. _client.Set(_ctx, _WorkerIdIndexKey, val, 0)
  273. }
  274. func setWorkerIdFlag(workerId int32) {
  275. _client.Set(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), _WorkerIdFlag, time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
  276. }
  277. func extendWorkerIdFlag(workerId int32) {
  278. var client = newRedisClient()
  279. if client == nil {
  280. return
  281. }
  282. defer func() {
  283. if client != nil {
  284. _ = client.Close()
  285. }
  286. }()
  287. client.Expire(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
  288. }
  289. func canReset() bool {
  290. r, err := _client.Incr(_ctx, _WorkerIdValueKeyPrefix+"Edit").Result()
  291. if err != nil {
  292. return false
  293. }
  294. if _Log {
  295. fmt.Println("canReset:" + strconv.Itoa(int(r)))
  296. }
  297. return r != 1
  298. }
  299. func endReset() {
  300. // _client.Set(_WorkerIdValueKeyPrefix+"Edit", 0, time.Duration(2)*time.Second)
  301. _client.Set(_ctx, _WorkerIdValueKeyPrefix+"Edit", 0, 0)
  302. }
  303. func getWorkerIdFlag(workerId int32) (string, bool) {
  304. r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result()
  305. if err != nil {
  306. return "", false
  307. }
  308. return r, true
  309. }
  310. func isAvailable(workerId int32) bool {
  311. r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result()
  312. if _Log {
  313. fmt.Println("XX isAvailable:" + r)
  314. fmt.Println("YY isAvailable:" + err.Error())
  315. }
  316. if err != nil {
  317. if err.Error() == "redis: nil" {
  318. return true
  319. }
  320. return false
  321. }
  322. return r != _WorkerIdFlag
  323. }