Browse Source

Merge pull request #37 from LoveBeforT/feat/idregister

python workerid注册器以及变量命名规范修改
pull/19/MERGE
yitter GitHub 2 years ago
parent
commit
e74490f5eb
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 434 additions and 217 deletions
  1. +25
    -8
      Python/README.md
  2. +27
    -0
      Python/example.py
  3. +0
    -17
      Python/source/Generator.py
  4. +0
    -27
      Python/source/Options.py
  5. +0
    -11
      Python/source/SnowFlake.py
  6. +0
    -136
      Python/source/SnowFlakeM1.py
  7. +0
    -0
      Python/source/__init__.py
  8. +38
    -0
      Python/source/generator.py
  9. +134
    -0
      Python/source/idregister.py
  10. +43
    -0
      Python/source/options.py
  11. +20
    -0
      Python/source/snowflake.py
  12. +147
    -0
      Python/source/snowflake_m1.py
  13. +0
    -18
      Python/test.py

+ 25
- 8
Python/README.md View File

@@ -1,4 +1,4 @@
# ❄ idgenerator-Python
# ❄ idgenerator-Python
## 运行环境
@@ -9,18 +9,35 @@ Python 3.6+
## 调用示例
​ 调用方法如下,其中worker_id为一个全局唯一的数字。
```python
# 导入包
from source import Options,Generator
# 声明id生成器参数,需要自己构建一个workerId
options = Options.IdGeneratorOptions(workerId=23)
# 参数中,WorkerIdBitLength 默认值6,支持的 WorkerId 最大值为2^6-1,若 WorkerId 超过64,可设置更大的 WorkerIdBitLength
idgen = Generator.DefaultIdGenerator()
from source import options, generator
# 声明id生成器参数,需要自己构建一个worker_id
options = options.IdGeneratorOptions(worker_id=23)
# 参数中,worker_id_bit_length 默认值6,支持的 worker_id 最大值为2^6-1,若 worker_id 超过64,可设置更大的 worker_id_bit_length
idgen = generator.DefaultIdGenerator()
# 保存参数
idgen.SetIdGernerator(options)
idgen.set_id_generator(options)
# 生成id
uid = idgen.NextId()
uid = idgen.next_id()
# 打印出来查看
print("%d, %x" % (uid,uid))
```
​ 包里面也提供了一个基于redis的worker id注册器,使用方法如下:
```python
from source import idregister
# 声明注册器,提供redis地址
register = idregister.Register(host="127.0.0.1", port=6379, max_worker_id=100)
# 获取worker id
worker_id = register.get_worker_id()
# 打印出来查看
print(worker_id)
# 程序退出的时候调用一次stop
register.stop()
```
​ 需要注意,注册器会启动一个线程,每隔一定时间向redis续期worker id,可以在最后退出程序的时候调用一次stop函数,使该线程退出,不过这需要等待几秒钟。

+ 27
- 0
Python/example.py View File

@@ -0,0 +1,27 @@
from source import options, generator, idregister

if __name__ == '__main__':
try:
# 连接redis
register = idregister.Register(host="127.0.0.1", port=6379)

# 获取worker id
worker_id = register.get_worker_id()

# 生成id generator
options = options.IdGeneratorOptions(worker_id=worker_id, seq_bit_length=10)
options.base_time = 12311111112
idgen = generator.DefaultIdGenerator()
idgen.set_id_generator(options)

uid = idgen.next_id()

print(worker_id)
print(uid)
print(options.__dict__)

# 退出注册器线程
register.stop()

except ValueError as e:
print(e)

+ 0
- 17
Python/source/Generator.py View File

@@ -1,17 +0,0 @@
from .Options import IdGeneratorOptions
from .SnowFlakeM1 import SnowFlakeM1

class DefaultIdGenerator():

def SetIdGernerator(self, options:IdGeneratorOptions) :
if options.BaseTime < 100000 :
raise ValueError ("BaseTime error.")
self.SnowFlake= SnowFlakeM1(options)

def NextId(self) -> int:
"""
获取新的UUID
"""
return self.SnowFlake.NextId()


+ 0
- 27
Python/source/Options.py View File

@@ -1,27 +0,0 @@

class IdGeneratorOptions():
def __init__(self, workerId = 0, workerIdBitLength = 6, seqBitLength = 6):
# 雪花计算方法,(1-漂移算法|2-传统算法),默认1。目前只实现了1。
self.Method = 1
# 基础时间(ms单位),不能超过当前系统时间
self.BaseTime = 1288834974657
# 机器码,必须由外部设定,最大值 2^WorkerIdBitLength-1
self.WorkerId = workerId
# 机器码位长,默认值6,取值范围 [1, 15](要求:序列数位长+机器码位长不超过22)
self.WorkerIdBitLength = workerIdBitLength
# 序列数位长,默认值6,取值范围 [3, 21](要求:序列数位长+机器码位长不超过22)
self.SeqBitLength = seqBitLength
# 最大序列数(含),设置范围 [MinSeqNumber, 2^SeqBitLength-1],默认值0,表示最大序列数取最大值(2^SeqBitLength-1])
self.MaxSeqNumber = 0
# 最小序列数(含),默认值5,取值范围 [5, MaxSeqNumber],每毫秒的前5个序列数对应编号0-4是保留位,其中1-4是时间回拨相应预留位,0是手工新值预留位
self.MinSeqNumber = 5
# 最大漂移次数(含),默认2000,推荐范围500-10000(与计算能力有关)
self.TopOverCostCount = 2000

+ 0
- 11
Python/source/SnowFlake.py View File

@@ -1,11 +0,0 @@
#!/usr/bin/python
# coding=UTF-8

# 组件编号生成器
class SnowFlake(object):
def __init__(self, options):
self.Options = options
def NextId(self) -> int:
return 0

+ 0
- 136
Python/source/SnowFlakeM1.py View File

@@ -1,136 +0,0 @@
#!/usr/bin/python
# coding=UTF-8
from .SnowFlake import SnowFlake
from .Options import IdGeneratorOptions
import threading,time

# 组件编号生成器
class SnowFlakeM1(SnowFlake):
def __init__(self, options:IdGeneratorOptions):
# 1.BaseTime
if options.BaseTime != 0:
self.BaseTime = int(options.BaseTime)
else:
self.BaseTime = 1582136402000
# 2.WorkerIdBitLength
if options.WorkerIdBitLength == 0:
self.WorkerIdBitLength = 6
else:
self.WorkerIdBitLength = int(options.WorkerIdBitLength)
# 3.WorkerId
self.WorkerId = options.WorkerId

# 4.SeqBitLength
if options.SeqBitLength == 0:
self.SeqBitLength = 6
else:
self.SeqBitLength = int(options.SeqBitLength)

# 5.MaxSeqNumber
if options.MaxSeqNumber <= 0:
self.MaxSeqNumber = (1 << self.SeqBitLength) - 1
else:
self.MaxSeqNumber = int(options.MaxSeqNumber)

# 6.MinSeqNumber
self.MinSeqNumber = int(options.MinSeqNumber)

# 7.TopOverCostCount
self.TopOverCostCount = int(options.TopOverCostCount)

# 8.Others
self.__TimestampShift = self.WorkerIdBitLength + self.SeqBitLength
self.__CurrentSeqNumber = self.MinSeqNumber
self.__LastTimeTick:int = 0
self.__TurnBackTimeTick:int = 0
self.__TurnBackIndex:int = 0
self.__IsOverCost = False
self.__OverCostCountInOneTerm:int = 0
self.__IDLock = threading.Lock()

def __NextOverCostId(self) -> int:
CurrentTimeTick = self.__GetCurrentTimeTick()
if CurrentTimeTick > self.__LastTimeTick:
self.__LastTimeTick = CurrentTimeTick
self.__CurrentSeqNumber = self.MinSeqNumber
self.__IsOverCost = False
self.__OverCostCountInOneTerm = 0
return self.__CalcId(self.__LastTimeTick)
if self.__OverCostCountInOneTerm >= self.TopOverCostCount:
self.__LastTimeTick = self.__GetNextTimeTick()
self.__CurrentSeqNumber = self.MinSeqNumber
self.__IsOverCost = False
self.__OverCostCountInOneTerm = 0
return self.__CalcId(self.__LastTimeTick)
if self.__CurrentSeqNumber > self.MaxSeqNumber:
self.__LastTimeTick+=1
self.__CurrentSeqNumber = self.MinSeqNumber
self.__IsOverCost = True
self.__OverCostCountInOneTerm+=1
return self.__CalcId(self.__LastTimeTick)

return self.__CalcId(self.__LastTimeTick)
def __NextNormalId(self) -> int:
CurrentTimeTick = self.__GetCurrentTimeTick()
if CurrentTimeTick < self.__LastTimeTick:
if self.__TurnBackTimeTick < 1:
self.__TurnBackTimeTick = self.__LastTimeTick - 1
self.__TurnBackIndex+=1
# 每毫秒序列数的前5位是预留位,0用于手工新值,1-4是时间回拨次序
# 支持4次回拨次序(避免回拨重叠导致ID重复),可无限次回拨(次序循环使用)。
if self.__TurnBackIndex > 4:
self.__TurnBackIndex = 1

return self.__CalcTurnBackId(self.__TurnBackTimeTick)

# 时间追平时,_TurnBackTimeTick清零
if self.__TurnBackTimeTick > 0:
self.__TurnBackTimeTick = 0

if CurrentTimeTick > self.__LastTimeTick:
self.__LastTimeTick = CurrentTimeTick
self.__CurrentSeqNumber = self.MinSeqNumber
return self.__CalcId(self.__LastTimeTick)

if self.__CurrentSeqNumber > self.MaxSeqNumber:
self.__LastTimeTick+=1
self.__CurrentSeqNumber = self.MinSeqNumber
self.__IsOverCost = True
self.__OverCostCountInOneTerm = 1
return self.__CalcId(self.__LastTimeTick)

return self.__CalcId(self.__LastTimeTick)

def __CalcId(self,useTimeTick) -> int:
self.__CurrentSeqNumber+=1
return ((useTimeTick<<self.__TimestampShift) + (self.WorkerId<<self.SeqBitLength) + self.__CurrentSeqNumber) % int(1e64)

def __CalcTurnBackId(self,useTimeTick) -> int:
self.__TurnBackTimeTick-=1
return ((useTimeTick<<self.__TimestampShift) + (self.WorkerId<<self.SeqBitLength) + self.__TurnBackIndex) % int(1e64)

def __GetCurrentTimeTick(self) -> int:
return int((time.time_ns() / 1e6) - self.BaseTime)

def __GetNextTimeTick(self) -> int:
TempTimeTicker = self.__GetCurrentTimeTick()
while TempTimeTicker <= self.__LastTimeTick:
# 0.001 = 1 mili sec
time.sleep(0.001)
TempTimeTicker = self.__GetCurrentTimeTick()
return TempTimeTicker

def NextId(self) -> int:
self.__IDLock.acquire()
if self.__IsOverCost:
id = self.__NextOverCostId()
else:
id = self.__NextNormalId()
self.__IDLock.release()
return id

+ 0
- 0
Python/source/__init__.py View File


+ 38
- 0
Python/source/generator.py View File

@@ -0,0 +1,38 @@
"""
雪花算法生成器IdGenerator
"""

# !/usr/bin/python
# coding=UTF-8


from . import options
from . import snowflake_m1


class DefaultIdGenerator:
"""
ID生成器
"""

def __init__(self):
self.snowflake = None

def set_id_generator(self, option: options.IdGeneratorOptions):
"""
设置id生成规则信息
"""

if option.base_time < 100000:
raise ValueError("base time error.")

self.snowflake = snowflake_m1.SnowFlakeM1(option)

def next_id(self) -> int:
"""
获取新的UUID
"""

if self.snowflake is None:
raise ValueError("please set id generator at first.")
return self.snowflake.next_id()

+ 134
- 0
Python/source/idregister.py View File

@@ -0,0 +1,134 @@
"""
worker id generator
"""

# !/usr/bin/python
# coding=UTF-8


from threading import Thread
import time
import logging
import redis


class Register:
"""
redis封装
- host 代表redis ip
- port 代表redis端口
- max_worker_id worker_id的最大值, 默认为100
- password redis的密码, 默认为空
"""

def __init__(self, host, port, max_worker_id=100, password=None):
self.redis_impl = redis.StrictRedis(host=host, port=port, db=0, password=password)
self.loop_count = 0
self.max_loop_count = 10
self.worker_id_expire_time = 15
self.max_worker_id = max_worker_id
self.worker_id = -1
self.is_stop = False

def get_lock(self, key):
"""
获取分布式全局锁,并设置过期时间为30秒
"""

if self.redis_impl.setnx(key, 1):
self.redis_impl.expire(key, 30)
return True
if self.redis_impl.ttl(key) < 0:
self.redis_impl.expire(key, 30)
return False

def stop(self):
"""
退出注册器的线程
"""

self.is_stop = True

def get_worker_id(self):
"""
获取全局唯一worker_id, 会创建一个线程给worker id续期
失败返回-1
"""

self.loop_count = 0

def extern_life(my_id):
while 1:
time.sleep(self.worker_id_expire_time / 3)
# 是否关闭了
if self.is_stop:
return
# 更新生命周期
if self.worker_id != my_id:
break
try:
self.redis_impl.expire(
f"IdGen:WorkerId:Value:{my_id}",
self.worker_id_expire_time)
except Exception as exe:
logging.error(exe)
continue

self.worker_id = self.__get_next_worker_id()
if self.worker_id > -1:
Thread(target=extern_life, args=[self.worker_id]).start()
return self.worker_id

def __get_next_worker_id(self):
"""
获取全局唯一worker id内部实现
"""

cur = self.redis_impl.incrby("IdGen:WorkerId:Index", 1)

def can_reset():
try:
reset_value = self.redis_impl.incr("IdGen:WorkerId:Value:Edit")
return reset_value != 1
except Exception as ept:
logging.error(ept)
return False

def end_reset():
try:
self.redis_impl.set("IdGen:WorkerId:Value:Edit", 0)
except Exception as ept:
logging.error(ept)

def is_available(worker_id: int):
try:
rst = self.redis_impl.get(f"IdGen:WorkerId:Value:{worker_id}")
return rst != "Y"
except Exception as ept:
logging.error(ept)
return False

if cur > self.max_worker_id:
if can_reset():
self.redis_impl.set("IdGen:WorkerId:Index", -1)
end_reset()
self.loop_count += 1

if self.loop_count > self.max_loop_count:
self.loop_count = 0
return -1

time.sleep(0.2 * self.loop_count)
return self.__get_next_worker_id()
time.sleep(0.2)
return self.__get_next_worker_id()
if is_available(cur):
self.redis_impl.setex(
f"IdGen:WorkerId:Value:{cur}",
self.worker_id_expire_time,
"Y"
)
self.loop_count = 0
return cur

return self.__get_next_worker_id()

+ 43
- 0
Python/source/options.py View File

@@ -0,0 +1,43 @@
"""
生成器IdGenerator配置选项
"""

# !/usr/bin/python
# coding=UTF-8


class IdGeneratorOptions:
"""
ID生成器配置
- worker_id 全局唯一id, 区分不同uuid生成器实例
- worker_id_bit_length 生成的uuid中worker_id占用的位数
- seq_bit_length 生成的uuid中序列号占用的位数
"""

def __init__(self, worker_id=0, worker_id_bit_length=6, seq_bit_length=6):

# 雪花计算方法,(1-漂移算法|2-传统算法), 默认1。目前只实现了1。
self.method = 1

# 基础时间(ms单位), 不能超过当前系统时间
self.base_time = 1288834974657

# 机器码, 必须由外部设定, 最大值 2^worker_id_bit_length-1
self.worker_id = worker_id

# 机器码位长, 默认值6, 取值范围 [1, 15](要求:序列数位长+机器码位长不超过22)
self.worker_id_bit_length = worker_id_bit_length

# 序列数位长, 默认值6, 取值范围 [3, 21](要求:序列数位长+机器码位长不超过22)
self.seq_bit_length = seq_bit_length

# 最大序列数(含), 设置范围 [max_seq_number, 2^seq_bit_length-1]
# 默认值0, 表示最大序列数取最大值(2^seq_bit_length-1])
self.max_seq_number = 0

# 最小序列数(含), 默认值5, 取值范围 [5, max_seq_number], 每毫秒的前5个序列数对应编号0-4是保留位
# 其中1-4是时间回拨相应预留位, 0是手工新值预留位
self.min_seq_number = 5

# 最大漂移次数(含), 默认2000, 推荐范围500-10000(与计算能力有关)
self.top_over_cost_count = 2000

+ 20
- 0
Python/source/snowflake.py View File

@@ -0,0 +1,20 @@

"""
雪花算法生成器接口声明
"""

# !/usr/bin/python
# coding=UTF-8


class SnowFlake():

def __init__(self, options):
self.options = options

def next_id(self) -> int:
"""
获取新的UUID
"""
return 0

+ 147
- 0
Python/source/snowflake_m1.py View File

@@ -0,0 +1,147 @@
"""
M1生成器
"""

# !/usr/bin/python
# coding=UTF-8

import threading
import time
from .snowflake import SnowFlake
from .options import IdGeneratorOptions


class SnowFlakeM1(SnowFlake):
"""
M1规则ID生成器配置
"""

def __init__(self, options: IdGeneratorOptions):
# 1.base_time
self.base_time = 1582136402000
if options.base_time != 0:
self.base_time = int(options.base_time)

# 2.worker_id_bit_length
self.worker_id_bit_length = 6
if options.worker_id_bit_length != 0:
self.worker_id_bit_length = int(options.worker_id_bit_length)

# 3.worker_id
self.worker_id = options.worker_id

# 4.seq_bit_length
self.seq_bit_length = 6
if options.seq_bit_length != 0:
self.seq_bit_length = int(options.seq_bit_length)

# 5.max_seq_number
self.max_seq_number = int(options.max_seq_number)
if options.max_seq_number <= 0:
self.max_seq_number = (1 << self.seq_bit_length) - 1

# 6.min_seq_number
self.min_seq_number = int(options.min_seq_number)

# 7.top_over_cost_count
self.top_over_cost_count = int(options.top_over_cost_count)

# 8.Others
self.__timestamp_shift = self.worker_id_bit_length + self.seq_bit_length
self.__current_seq_number = self.min_seq_number
self.__last_time_tick: int = 0
self.__turn_back_time_tick: int = 0
self.__turn_back_index: int = 0
self.__is_over_cost = False
self.___over_cost_count_in_one_term: int = 0
self.__id_lock = threading.Lock()

def __next_over_cost_id(self) -> int:
current_time_tick = self.__get_current_time_tick()
if current_time_tick > self.__last_time_tick:
self.__last_time_tick = current_time_tick
self.__current_seq_number = self.min_seq_number
self.__is_over_cost = False
self.___over_cost_count_in_one_term = 0
return self.__calc_id(self.__last_time_tick)

if self.___over_cost_count_in_one_term >= self.top_over_cost_count:
self.__last_time_tick = self.__get_next_time_tick()
self.__current_seq_number = self.min_seq_number
self.__is_over_cost = False
self.___over_cost_count_in_one_term = 0
return self.__calc_id(self.__last_time_tick)

if self.__current_seq_number > self.max_seq_number:
self.__last_time_tick += 1
self.__current_seq_number = self.min_seq_number
self.__is_over_cost = True
self.___over_cost_count_in_one_term += 1
return self.__calc_id(self.__last_time_tick)

return self.__calc_id(self.__last_time_tick)

def __next_normal_id(self) -> int:
current_time_tick = self.__get_current_time_tick()
if current_time_tick < self.__last_time_tick:
if self.__turn_back_time_tick < 1:
self.__turn_back_time_tick = self.__last_time_tick - 1
self.__turn_back_index += 1
# 每毫秒序列数的前5位是预留位, 0用于手工新值, 1-4是时间回拨次序
# 支持4次回拨次序(避免回拨重叠导致ID重复), 可无限次回拨(次序循环使用)。
if self.__turn_back_index > 4:
self.__turn_back_index = 1

return self.__calc_turn_back_id(self.__turn_back_time_tick)

# 时间追平时, _TurnBackTimeTick清零
self.__turn_back_time_tick = min(self.__turn_back_time_tick, 0)

if current_time_tick > self.__last_time_tick:
self.__last_time_tick = current_time_tick
self.__current_seq_number = self.min_seq_number
return self.__calc_id(self.__last_time_tick)

if self.__current_seq_number > self.max_seq_number:
self.__last_time_tick += 1
self.__current_seq_number = self.min_seq_number
self.__is_over_cost = True
self.___over_cost_count_in_one_term = 1
return self.__calc_id(self.__last_time_tick)

return self.__calc_id(self.__last_time_tick)

def __calc_id(self, use_time_tick) -> int:
self.__current_seq_number += 1
return (
(use_time_tick << self.__timestamp_shift) +
(self.worker_id << self.seq_bit_length) +
self.__current_seq_number
) % int(1e64)

def __calc_turn_back_id(self, use_time_tick) -> int:
self.__turn_back_time_tick -= 1
return (
(use_time_tick << self.__timestamp_shift) +
(self.worker_id << self.seq_bit_length) +
self.__turn_back_index
) % int(1e64)

def __get_current_time_tick(self) -> int:
return int((time.time_ns() / 1e6) - self.base_time)

def __get_next_time_tick(self) -> int:
temp_time_ticker = self.__get_current_time_tick()
while temp_time_ticker <= self.__last_time_tick:
# 0.001 = 1 mili sec
time.sleep(0.001)
temp_time_ticker = self.__get_current_time_tick()
return temp_time_ticker

def next_id(self) -> int:
with self.__id_lock:
if self.__is_over_cost:
nextid = self.__next_over_cost_id()
else:
nextid = self.__next_normal_id()
return nextid

+ 0
- 18
Python/test.py View File

@@ -1,18 +0,0 @@
from source import Options,Generator

if __name__ == '__main__':
try:
options = Options.IdGeneratorOptions(workerId=23,seqBitLength=10)
options.BaseTime = 1231111111
idgen = Generator.DefaultIdGenerator()
idgen.SetIdGernerator(options)

uid = idgen.NextId()

print(uid)
print(options.__dict__)
except ValueError as e:
print(e)



Loading…
Cancel
Save