From 041f87dd6e281ad1febce70c4db34be30cb543f3 Mon Sep 17 00:00:00 2001 From: yitter Date: Thu, 1 Apr 2021 16:50:17 +0800 Subject: [PATCH] recodeGo --- Go/source/regworkerid/reghelper.go | 44 ++++++++++++++++++------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/Go/source/regworkerid/reghelper.go b/Go/source/regworkerid/reghelper.go index 09485dc..e628a2f 100644 --- a/Go/source/regworkerid/reghelper.go +++ b/Go/source/regworkerid/reghelper.go @@ -13,7 +13,7 @@ var _workerIdLock sync.Mutex var _usingWorkerId int = -1 // 当前已注册的WorkerId var _loopCount int = 0 // 循环数量 -var _liftIndex int = -1 // WorkerId本地生命时序(本地多次注册时,生命时序会不同) +var _lifeIndex int = -1 // WorkerId本地生命时序(本地多次注册时,生命时序会不同) var _token int = -1 // WorkerId远程注册时用的token,将存储在 IdGen:WorkerId:Value:xx 的值中(本功能暂未启用) var _WorkerIdLifeTimeSeconds = 15 // IdGen:WorkerId:Value:xx 的值在 redis 中的有效期(单位秒,最好是3的整数倍) @@ -38,10 +38,11 @@ func UnRegisterWorkerId() { if _usingWorkerId < 0 { return } + _workerIdLock.Lock() _client.Del(_WorkerIdValueKeyPrefix + strconv.Itoa(_usingWorkerId)) _usingWorkerId = -1 - _liftIndex = -1 + _lifeIndex = -1 _workerIdLock.Unlock() } @@ -85,13 +86,13 @@ func getNextWorkerId() int { return 0 } - currentId := int(r) + candidateId := int(r) if _Log { - fmt.Println("Begin currentId:" + strconv.Itoa(currentId)) + fmt.Println("Begin candidateId:" + strconv.Itoa(candidateId)) } - // 如果 currentId 大于最大值,则重置 - if currentId > _MaxWorkerId { + // 如果 candidateId 大于最大值,则重置 + if candidateId > _MaxWorkerId { if canReset() { // 当前应用获得重置 WorkerIdIndex 的权限 setWorkerIdIndex(-1) @@ -124,27 +125,27 @@ func getNextWorkerId() int { } if _Log { - fmt.Println("currentId:" + strconv.Itoa(currentId)) + fmt.Println("candidateId:" + strconv.Itoa(candidateId)) } - if isAvailable(currentId) { + if isAvailable(candidateId) { if _Log { - fmt.Println("AA: isAvailable:" + strconv.Itoa(currentId)) + fmt.Println("AA: isAvailable:" + strconv.Itoa(candidateId)) } // 最新获得的 WorkerIdIndex,在 redis 中是可用状态 - setWorkerIdFlag(currentId) - _usingWorkerId = currentId + setWorkerIdFlag(candidateId) + _usingWorkerId = candidateId _loopCount = 0 // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime) - _liftIndex++ - go extendWorkerIdLifeTime(_liftIndex) + _lifeIndex++ + go extendWorkerIdLifeTime(_lifeIndex) - return currentId + return candidateId } else { if _Log { - fmt.Println("BB: not isAvailable:" + strconv.Itoa(currentId)) + fmt.Println("BB: not isAvailable:" + strconv.Itoa(candidateId)) } // 最新获得的 WorkerIdIndex,在 redis 中是不可用状态,则继续下一个 WorkerIdIndex return getNextWorkerId() @@ -152,13 +153,17 @@ func getNextWorkerId() int { } func extendWorkerIdLifeTime(lifeIndex int) { - var index = lifeIndex + var myLifeIndex = lifeIndex + + // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。 for { time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Millisecond) + // 上锁操作,防止跟 UnRegisterWorkerId 操作重叠 _workerIdLock.Lock() - if index != _liftIndex { - // 如果临时变量 index 不等于 全局变量 _liftIndex,表明全局状态被修改,当前线程可终止 + + // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis + if myLifeIndex != _lifeIndex { break } @@ -167,7 +172,9 @@ func extendWorkerIdLifeTime(lifeIndex int) { break } + // 延长 redis 数据有效期 extendWorkerIdFlag(_usingWorkerId) + _workerIdLock.Unlock() } } @@ -236,5 +243,6 @@ func isAvailable(index int) bool { } return false } + return r != _WorkerIdFlag }