|
- package regworkerid
-
- import (
- "context"
- "fmt"
- "github.com/go-redis/redis/v8"
- "strconv"
- "sync"
- "time"
- )
-
- var _client *redis.Client
- var _ctx = context.Background()
- var _workerIdLock sync.Mutex
-
- var _usingWorkerId int = -1 // 当前已注册的WorkerId
- var _loopCount int = 0 // 循环数量
- var _lifeIndex int = -1 // WorkerId本地生命时序(本地多次注册时,生命时序会不同)
- var _token int = -1 // WorkerId远程注册时用的token,将存储在 IdGen:WorkerId:Value:xx 的值中(本功能暂未启用)
-
- var _WorkerIdLifeTimeSeconds = 15 // IdGen:WorkerId:Value:xx 的值在 redis 中的有效期(单位秒,最好是3的整数倍)
- var _MaxLoopCount = 10 // 最大循环次数(无可用WorkerId时循环查找)
- var _SleepMillisecondEveryLoop = 200 // 每次循环后,暂停时间
- var _MaxWorkerId int = 0 // 最大WorkerId值,超过此值从0开始
-
- const _WorkerIdIndexKey string = "IdGen:WorkerId:Index" // redis 中的key
- const _WorkerIdValueKeyPrefix string = "IdGen:WorkerId:Value:" // redis 中的key
- const _WorkerIdFlag = "Y" // IdGen:WorkerId:Value:xx 的值(将来可用 _token 替代)
- const _Log = false // 是否输出日志
-
- func ValidateLocalWorkerId(workerId int) int {
- if workerId == _usingWorkerId {
- return 0
- } else {
- return -1
- }
- }
-
- func UnRegisterWorkerId() {
- if _usingWorkerId < 0 {
- return
- }
-
- _workerIdLock.Lock()
- _client.Del(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(_usingWorkerId))
- _usingWorkerId = -1
- _lifeIndex = -1
- _workerIdLock.Unlock()
-
- _client.Quit(_ctx)
- }
-
- func RegisterWorkerId(ip string, port int, password string, maxWorkerId int) int {
- // maxWorkerId不能小于0
- if maxWorkerId < 0 {
- return -1
- }
-
- // 如果当前已注册过 WorkerId,则先注销,并终止先前的自动续期线程
- if _usingWorkerId > -1 {
- UnRegisterWorkerId()
- }
-
- _MaxWorkerId = maxWorkerId
- _client = redis.NewClient(&redis.Options{
- Addr: ip + ":" + strconv.Itoa(port),
- Password: password,
- DB: 0,
- //PoolSize: 1000,
- //ReadTimeout: time.Millisecond * time.Duration(100),
- //WriteTimeout: time.Millisecond * time.Duration(100),
- //IdleTimeout: time.Second * time.Duration(60),
- })
-
- _, err := _client.Ping(_ctx).Result()
- if err != nil {
- panic("init redis error")
- } else {
- if _Log {
- fmt.Println("init redis ok")
- }
- }
-
- _loopCount = 0
- return getNextWorkerId()
- }
-
- func getNextWorkerId() int {
- // 获取当前 WorkerIdIndex
- r, err := _client.Incr(_ctx, _WorkerIdIndexKey).Result()
- if err != nil {
- return 0
- }
-
- candidateId := int(r)
- if _Log {
- fmt.Println("Begin candidateId:" + strconv.Itoa(candidateId))
- }
-
- // 如果 candidateId 大于最大值,则重置
- if candidateId > _MaxWorkerId {
- if canReset() {
- // 当前应用获得重置 WorkerIdIndex 的权限
- setWorkerIdIndex(-1)
- endReset() // 此步有可能不被执行?
- _loopCount++
- // 超过一定次数,直接终止操作
- if _loopCount > _MaxLoopCount {
- _loopCount = 0
- return -1
- }
-
- // 每次一个大循环后,暂停一些时间
- time.Sleep(time.Duration(_SleepMillisecondEveryLoop*_loopCount) * time.Millisecond)
-
- if _Log {
- fmt.Println("canReset loop")
- }
-
- return getNextWorkerId()
- } else {
- // 如果有其它应用正在编辑,则本应用暂停200ms后,再继续
- time.Sleep(time.Duration(200) * time.Millisecond)
-
- if _Log {
- fmt.Println("not canReset loop")
- }
-
- return getNextWorkerId()
- }
- }
-
- if _Log {
- fmt.Println("candidateId:" + strconv.Itoa(candidateId))
- }
-
- if isAvailable(candidateId) {
- if _Log {
- fmt.Println("AA: isAvailable:" + strconv.Itoa(candidateId))
- }
-
- // 最新获得的 WorkerIdIndex,在 redis 中是可用状态
- setWorkerIdFlag(candidateId)
- _usingWorkerId = candidateId
- _loopCount = 0
-
- // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime)
- _lifeIndex++
- go extendWorkerIdLifeTime(_lifeIndex)
-
- return candidateId
- } else {
- if _Log {
- fmt.Println("BB: not isAvailable:" + strconv.Itoa(candidateId))
- }
- // 最新获得的 WorkerIdIndex,在 redis 中是不可用状态,则继续下一个 WorkerIdIndex
- return getNextWorkerId()
- }
- }
-
- func extendWorkerIdLifeTime(lifeIndex int) {
- var myLifeIndex = lifeIndex
-
- // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。
- for {
- time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Millisecond)
-
- // 上锁操作,防止跟 UnRegisterWorkerId 操作重叠
- _workerIdLock.Lock()
-
- // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis
- if myLifeIndex != _lifeIndex {
- break
- }
-
- // 已经被注销,则终止(此步是上一步的二次验证)
- if _usingWorkerId < 0 {
- break
- }
-
- // 延长 redis 数据有效期
- extendWorkerIdFlag(_usingWorkerId)
-
- _workerIdLock.Unlock()
- }
- }
-
- func get(key string) (string, bool) {
- r, err := _client.Get(_ctx, key).Result()
- if err != nil {
- return "", false
- }
- return r, true
- }
-
- func set(key string, val string, expTime int32) {
- _client.Set(_ctx, key, val, time.Duration(expTime)*time.Second)
- }
-
- func setWorkerIdIndex(val int) {
- _client.Set(_ctx, _WorkerIdIndexKey, val, 0)
- }
-
- func setWorkerIdFlag(index int) {
- _client.Set(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index), _WorkerIdFlag, time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
- }
-
- func extendWorkerIdFlag(index int) {
- _client.Expire(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index), time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
- }
-
- func canReset() bool {
- r, err := _client.Incr(_ctx, _WorkerIdValueKeyPrefix+"Edit").Result()
- if err != nil {
- return false
- }
-
- if _Log {
- fmt.Println("canReset:" + string(r))
- }
-
- return r != 1
- }
-
- func endReset() {
- // _client.Set(_WorkerIdValueKeyPrefix+"Edit", 0, time.Duration(2)*time.Second)
- _client.Set(_ctx, _WorkerIdValueKeyPrefix+"Edit", 0, 0)
- }
-
- func getWorkerIdFlag(index int) (string, bool) {
- r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index)).Result()
- if err != nil {
- return "", false
- }
- return r, true
- }
-
- func isAvailable(index int) bool {
- r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index)).Result()
-
- if _Log {
- fmt.Println("XX isAvailable:" + r)
- fmt.Println("YY isAvailable:" + err.Error())
- }
-
- if err != nil {
- if err.Error() == "redis: nil" {
- return true
- }
- return false
- }
-
- return r != _WorkerIdFlag
- }
|