Browse Source

auto commit

tags/v1.0.0
yitter 4 years ago
parent
commit
31e0f44b2a
5 changed files with 287 additions and 118 deletions
  1. +9
    -5
      C#.NET/source/YitIdGen.WinFormApp/StartForm.cs
  2. +37
    -17
      C#.NET/source/Yitter.IdGenTest/Program.cs
  3. +1
    -1
      Go/source/idgen/SnowWorkerM2.go
  4. +47
    -30
      Go/source/main.go
  5. +193
    -65
      Go/source/regworkerid/reghelper.go

+ 9
- 5
C#.NET/source/YitIdGen.WinFormApp/StartForm.cs View File

@@ -25,12 +25,11 @@ namespace WInFormApp
public static extern long NextId2();
[DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)]
public static extern long RegisterWorkerId(string ip, int port, string password, int maxWorkerIdNumber);
public static extern IntPtr RegisterMany(string ip, int port, string password, int maxWorkerIdNumber, int idCount);
//public static extern ulong RegisterWorkerId2();
[DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)]
public static extern void UnRegisterWorkerId();
public static extern void UnRegister();
[DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)]
public static extern void SetWorkerId(uint workerId);
@@ -54,7 +53,12 @@ namespace WInFormApp
var ip = "localhost";
//txtIdList.Text += RegisterWorkerId(Encoding.UTF8.GetBytes(ip), 6379) + "\r\n";
txtIdList.Text += RegisterWorkerId(ip, 6379, "", 4) + "\r\n";
var ids = RegisterMany(ip, 6379, "", 4, 3);
//foreach (var id in ids)
//{
// txtIdList.Text += id;
//}
//txtIdList.Text += RegisterWorkerId() + "\r\n";
//txtIdList.Text += Test() + "\r\n";
@@ -69,7 +73,7 @@ namespace WInFormApp
{
try
{
UnRegisterWorkerId();
UnRegister();
txtIdList.Text += "LogOff";
}
catch (Exception ex)


+ 37
- 17
C#.NET/source/Yitter.IdGenTest/Program.cs View File

@@ -10,7 +10,6 @@ namespace Yitter.OrgSystem.TestA
{
class Program
{
// 测试参数(默认配置下,最佳性能是10W/s)
static int genIdCount = 50000; // 计算ID数量(如果要验证50W效率,请将TopOverCostCount设置为20000或适当增加SeqBitLength)
static short method = 1; // 1-漂移算法,2-传统算法
@@ -25,10 +24,39 @@ namespace Yitter.OrgSystem.TestA
static int workerCount = 1;
//[DllImport("yitidgenc.dll", CallingConvention = CallingConvention.StdCall)]
//public static extern long NextId();
[DllImport("yitidgengo.dll", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)]
public static extern long NextId2();
[DllImport("yitidgengo.so", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)]
public static extern long NextId();
[DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)]
public static extern void SetWorkerId(uint workerId);
[DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)]
public static extern int TestId();
[DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)]
public static extern long RegisterWorkerId(string ip, int port, string password, int maxWorkerIdNumber);
//public static extern ulong RegisterWorkerId2();
[DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)]
public static extern void UnRegisterWorkerId();
static void Main(string[] args)
{
Console.WriteLine("Hello World! C#");
RegisterWorkerId();
while (true)
{
Thread.Sleep(20000);
}
var options = new IdGeneratorOptions()
{
Method = method,
@@ -55,27 +83,19 @@ namespace Yitter.OrgSystem.TestA
while (true)
{
//RunSingle();
CallDll();
RunSingle();
//CallDll();
//Go(options);
Thread.Sleep(1000); // 每隔1秒执行一次Go
}
}
//[DllImport("yitidgenc.dll", CallingConvention = CallingConvention.StdCall)]
//public static extern long NextId();
[DllImport("yitidgengo.dll", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)]
public static extern long NextId2();
[DllImport("yitidgengo.so", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)]
public static extern long NextId();
[DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)]
public static extern void SetWorkerId(uint workerId);
[DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)]
public static extern int TestId();
private static void RegisterWorkerId()
{
var workerId = RegisterWorkerId("118.178.140.203", 4037, "zhou@@myredis", 4);
Console.WriteLine("workerId:" + workerId);
return;
}
private static void CallDll()
{


+ 1
- 1
Go/source/idgen/SnowWorkerM2.go View File

@@ -38,6 +38,6 @@ func (m2 SnowWorkerM2) NextId() uint64 {
fmt.Println("Time error for {0} milliseconds", strconv.FormatInt(m2._LastTimeTick-currentTimeTick, 10))
}
m2._LastTimeTick = currentTimeTick
result := uint64((currentTimeTick << m2._TimestampShift)) + uint64(m2.WorkerId<<m2.SeqBitLength) + uint64(m2._CurrentSeqNumber)
result := uint64(currentTimeTick << m2._TimestampShift) + uint64(m2.WorkerId<<m2.SeqBitLength) + uint64(m2._CurrentSeqNumber)
return result
}

+ 47
- 30
Go/source/main.go View File

@@ -4,6 +4,7 @@ import (
"C"
"fmt"
"time"
"unsafe"
"yitidgen/idgen"
"yitidgen/regworkerid"
)
@@ -19,50 +20,66 @@ func NextId() uint64 {
return idgen.NextId()
}

// 注册一个新的WorkerId
//export RegisterWorkerId
func RegisterWorkerId(ip *C.char, port int, password *C.char, maxWorkerId int) int {
return int(regworkerid.RegisterWorkerId(C.GoString(ip), port, C.GoString(password), maxWorkerId))
// 注册一个 WorkerId,会先注销所有本机已注册的记录
//export RegisterOne
func RegisterOne(ip *C.char, port int32, password *C.char, maxWorkerId int32) int32 {
return regworkerid.RegisterOne(C.GoString(ip), port, C.GoString(password), maxWorkerId)
}

// 注销WorkerId
//export UnRegisterWorkerId
func UnRegisterWorkerId() {
regworkerid.UnRegisterWorkerId()
// 注册多个 WorkerId,会先注销所有本机已注册的记录
//export RegisterMany
func RegisterMany(ip *C.char, port int32, password *C.char, maxWorkerId int32, totalCount int32) *C.int {
values := regworkerid.RegisterMany(C.GoString(ip), port, C.GoString(password), maxWorkerId, totalCount)
return (*C.int)(unsafe.Pointer(&values))
}

// 注销本机已注册的 WorkerId
//export UnRegister
func UnRegister() {
regworkerid.UnRegister()
}

// 检查本地WorkerId是否有效(0-有效,其它-无效)
//export ValidateLocalWorkerId
func ValidateLocalWorkerId(workerId int) int {
return regworkerid.ValidateLocalWorkerId(workerId)
//export Validate
func Validate(workerId int32) int32 {
return regworkerid.Validate(workerId)
}

func main() {
// 方法一:直接采用默认方法生成一个Id
fmt.Println("生成的Id:", idgen.NextId())
const testGenId = true // 测试设置

fmt.Println("注册的WorkerId:", regworkerid.RegisterWorkerId("localhost", 6379, "", 4))

// 方法二:自定义参数
var options = idgen.NewIdGeneratorOptions(1)
options.WorkerIdBitLength = 6
options.SeqBitLength = 6
options.BaseTime = time.Date(2020, 2, 20, 2, 20, 2, 20, time.UTC).UnixNano() / 1e6
idgen.SetIdGenerator(options)
if testGenId {
// 自定义参数
var options = idgen.NewIdGeneratorOptions(1)
options.WorkerIdBitLength = 6
options.SeqBitLength = 6
options.BaseTime = time.Date(2020, 2, 20, 2, 20, 2, 20, time.UTC).UnixNano() / 1e6
idgen.SetIdGenerator(options)

var genCount = 50000
var genCount = 50000
for {
var begin = time.Now().UnixNano() / 1e3
for i := 0; i < genCount; i++ {
// 生成ID
idgen.NextId()
}
var end = time.Now().UnixNano() / 1e3

for {
var begin = time.Now().UnixNano() / 1e3
for i := 0; i < genCount; i++ {
idgen.NextId()
fmt.Println(end - begin)
time.Sleep(time.Duration(1000) * time.Millisecond)
}
} else {
workerIdList := regworkerid.RegisterMany("localhost", 6379, "", 4, 3)
for _, value := range workerIdList {
fmt.Println("注册的WorkerId:", value)
}
var end = time.Now().UnixNano() / 1e3

fmt.Println(end - begin)
time.Sleep(time.Duration(1000) * time.Millisecond)
}
//var workerId = regworkerid.RegisterOne("localhost", 6379, "", 4)
//fmt.Println("注册的WorkerId:", workerId)

fmt.Println("end")
time.Sleep(time.Duration(300) * time.Second)
}
}

// go build -o ./target/yitidgengo.dll -buildmode=c-shared main.go


+ 193
- 65
Go/source/regworkerid/reghelper.go View File

@@ -13,78 +13,167 @@ var _client *redis.Client
var _ctx = context.Background()
var _workerIdLock sync.Mutex

var _usingWorkerId int = -1 // 当前已注册的WorkerId
var _loopCount int = 0 // 循环数量
var _lifeIndex int = -1 // WorkerId本地生命时序(本地多次注册时,生命时序会不同)
var _token int = -1 // WorkerId远程注册时用的token,将存储在 IdGen:WorkerId:Value:xx 的值中(本功能暂未启用)
var _workerIdList []int32 // 当前已注册的WorkerId
var _loopCount = 0 // 循环数量
var _lifeIndex = -1 // WorkerId本地生命时序(本地多次注册时,生命时序会不同)
var _token = -1 // WorkerId远程注册时用的token,将存储在 IdGen:WorkerId:Value:xx 的值中(本功能暂未启用)

var _WorkerIdLifeTimeSeconds = 15 // IdGen:WorkerId:Value:xx 的值在 redis 中的有效期(单位秒,最好是3的整数倍)
var _MaxLoopCount = 10 // 最大循环次数(无可用WorkerId时循环查找)
var _SleepMillisecondEveryLoop = 200 // 每次循环后,暂停时间
var _MaxWorkerId int = 0 // 最大WorkerId值,超过此值从0开始
var _MaxWorkerId int32 = 0 // 最大WorkerId值,超过此值从0开始

var _ConnString = ""
var _Password = ""
var _RedisConnString = ""
var _RedisPassword = ""

const _WorkerIdIndexKey string = "IdGen:WorkerId:Index" // redis 中的key
const _WorkerIdValueKeyPrefix string = "IdGen:WorkerId:Value:" // redis 中的key
const _WorkerIdFlag = "Y" // IdGen:WorkerId:Value:xx 的值(将来可用 _token 替代)
const _Log = false // 是否输出日志
const _Log = true // 是否输出日志

func ValidateLocalWorkerId(workerId int) int {
if workerId == _usingWorkerId {
return 0
} else {
return -1
func Validate(workerId int32) int32 {
for _, value := range _workerIdList {
if value == workerId {
return 1
}
}
}

func UnRegisterWorkerId() {
if _usingWorkerId < 0 {
return
}
return 0

//if workerId == _usingWorkerId {
// return 0
//} else {
// return -1
//}
}

func UnRegister() {
_workerIdLock.Lock()
_client.Del(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(_usingWorkerId))
_usingWorkerId = -1

_lifeIndex = -1
for _, value := range _workerIdList {
if value > -1 {
_client.Del(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(value)))
}
}
_workerIdList = []int32{}

_workerIdLock.Unlock()
}

func RegisterWorkerId(ip string, port int, password string, maxWorkerId int) int {
// maxWorkerId不能小于0
func autoUnRegister() {
// 如果当前已注册过 WorkerId,则先注销,并终止先前的自动续期线程
if len(_workerIdList) > 0 {
UnRegister()
}
}

func RegisterMany(ip string, port int32, password string, maxWorkerId int32, totalCount int32) []int32 {
if maxWorkerId < 0 {
return -2
return []int32{-2}
}

// 如果当前已注册过 WorkerId,则先注销,并终止先前的自动续期线程
if _usingWorkerId > -1 {
UnRegisterWorkerId()
if totalCount < 1 {
return []int32{-1}
}

autoUnRegister()

_MaxWorkerId = maxWorkerId
_ConnString = ip + ":" + strconv.Itoa(port)
_Password = password
_RedisConnString = ip + ":" + strconv.Itoa(int(port))
_RedisPassword = password
_client = newRedisClient()
defer _client.Close()
if _client == nil {
return []int32{-1}
}
defer func() {
if _client != nil {
_ = _client.Close()
}
}()
//_, err := _client.Ping(_ctx).Result()
//if err != nil {
// //panic("init redis error")
// return []int{-3}
//} else {
// if _Log {
// fmt.Println("init redis ok")
// }
//}

_lifeIndex++
_workerIdList = make([]int32, totalCount)
for key := range _workerIdList {
_workerIdList[key] = -1 // 全部初始化-1
}

_, err := _client.Ping(_ctx).Result()
if err != nil {
panic("init redis error")
} else {
if _Log {
fmt.Println("init redis ok")
useExtendFunc := false
for key := range _workerIdList {
id := register(_lifeIndex)
if id > -1 {
useExtendFunc = true
_workerIdList[key] = id //= append(_workerIdList, id)
} else {
break
}
}

if useExtendFunc {
go extendLifeTime(_lifeIndex)
}

return _workerIdList
}

func RegisterOne(ip string, port int32, password string, maxWorkerId int32) int32 {
if maxWorkerId < 0 {
return -2
}

autoUnRegister()

_MaxWorkerId = maxWorkerId
_RedisConnString = ip + ":" + strconv.Itoa(int(port))
_RedisPassword = password
_loopCount = 0
return getNextWorkerId()
_client = newRedisClient()
if _client == nil {
return -3
}
defer func() {
if _client != nil {
_ = _client.Close()
}
}()
//_, err := _client.Ping(_ctx).Result()
//if err != nil {
// // panic("init redis error")
// return -3
//} else {
// if _Log {
// fmt.Println("init redis ok")
// }
//}

_lifeIndex++
var id = register(_lifeIndex)
if id > -1 {
_workerIdList = []int32{id}
go extendLifeTime(_lifeIndex)
}

return id
}

func register(lifeTime int) int32 {
_loopCount = 0
return getNextWorkerId(lifeTime)
}

func newRedisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: _ConnString,
Password: _Password,
Addr: _RedisConnString,
Password: _RedisPassword,
DB: 0,
//PoolSize: 1000,
//ReadTimeout: time.Millisecond * time.Duration(100),
@@ -93,16 +182,16 @@ func newRedisClient() *redis.Client {
})
}

func getNextWorkerId() int {
func getNextWorkerId(lifeTime int) int32 {
// 获取当前 WorkerIdIndex
r, err := _client.Incr(_ctx, _WorkerIdIndexKey).Result()
if err != nil {
return 0
return -1
}

candidateId := int(r)
candidateId := int32(r)
if _Log {
fmt.Println("Begin candidateId:" + strconv.Itoa(candidateId))
fmt.Println("Begin candidateId:" + strconv.Itoa(int(candidateId)))
}

// 如果 candidateId 大于最大值,则重置
@@ -128,7 +217,7 @@ func getNextWorkerId() int {
fmt.Println("canReset loop")
}

return getNextWorkerId()
return getNextWorkerId(lifeTime)
} else {
// 如果有其它应用正在编辑,则本应用暂停200ms后,再继续
time.Sleep(time.Duration(200) * time.Millisecond)
@@ -137,46 +226,45 @@ func getNextWorkerId() int {
fmt.Println("not canReset loop")
}

return getNextWorkerId()
return getNextWorkerId(lifeTime)
}
}

if _Log {
fmt.Println("candidateId:" + strconv.Itoa(candidateId))
fmt.Println("candidateId:" + strconv.Itoa(int(candidateId)))
}

if isAvailable(candidateId) {
if _Log {
fmt.Println("AA: isAvailable:" + strconv.Itoa(candidateId))
fmt.Println("AA: isAvailable:" + strconv.Itoa(int(candidateId)))
}

// 最新获得的 WorkerIdIndex,在 redis 中是可用状态
setWorkerIdFlag(candidateId)
_usingWorkerId = candidateId
_loopCount = 0

// 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime)
_lifeIndex++
go extendWorkerIdLifeTime(_lifeIndex)
// go extendWorkerIdLifeTime(lifeTime, candidateId)

return candidateId
} else {
if _Log {
fmt.Println("BB: not isAvailable:" + strconv.Itoa(candidateId))
fmt.Println("BB: not isAvailable:" + strconv.Itoa(int(candidateId)))
}
// 最新获得的 WorkerIdIndex,在 redis 中是不可用状态,则继续下一个 WorkerIdIndex
return getNextWorkerId()
return getNextWorkerId(lifeTime)
}
}

func extendWorkerIdLifeTime(lifeIndex int) {
func extendLifeTime(lifeIndex int) {
// 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime)
var myLifeIndex = lifeIndex

// 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。
for {
time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second)

// 上锁操作,防止跟 UnRegisterWorkerId 操作重叠
// 上锁操作,防止跟 UnRegister 操作重叠
_workerIdLock.Lock()

// 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis
@@ -185,12 +273,44 @@ func extendWorkerIdLifeTime(lifeIndex int) {
}

// 已经被注销,则终止(此步是上一步的二次验证)
if _usingWorkerId < 0 {
if len(_workerIdList) < 1 {
break
}

// 延长 redis 数据有效期
for _, value := range _workerIdList {
if value > -1 {
extendWorkerIdFlag(value)
}
}

_workerIdLock.Unlock()
}
}

func extendWorkerIdLifeTime(lifeIndex int, workerId int32) {
var myLifeIndex = lifeIndex
var myWorkerId = workerId

// 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。
for {
time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second)

// 上锁操作,防止跟 UnRegister 操作重叠
_workerIdLock.Lock()

// 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis
if myLifeIndex != _lifeIndex {
break
}

// 已经被注销,则终止(此步是上一步的二次验证)
//if _usingWorkerId < 0 {
// break
//}

// 延长 redis 数据有效期
extendWorkerIdFlag(_usingWorkerId)
extendWorkerIdFlag(myWorkerId)

_workerIdLock.Unlock()
}
@@ -212,14 +332,22 @@ func setWorkerIdIndex(val int) {
_client.Set(_ctx, _WorkerIdIndexKey, val, 0)
}

func setWorkerIdFlag(index int) {
_client.Set(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index), _WorkerIdFlag, time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
func setWorkerIdFlag(workerId int32) {
_client.Set(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), _WorkerIdFlag, time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
}

func extendWorkerIdFlag(index int) {
func extendWorkerIdFlag(workerId int32) {
var client = newRedisClient()
defer client.Close()
client.Expire(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index), time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
if client == nil {
return
}
defer func() {
if client != nil {
_ = client.Close()
}
}()

client.Expire(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), time.Duration(_WorkerIdLifeTimeSeconds)*time.Second)
}

func canReset() bool {
@@ -229,7 +357,7 @@ func canReset() bool {
}

if _Log {
fmt.Println("canReset:" + string(r))
fmt.Println("canReset:" + strconv.Itoa(int(r)))
}

return r != 1
@@ -240,16 +368,16 @@ func endReset() {
_client.Set(_ctx, _WorkerIdValueKeyPrefix+"Edit", 0, 0)
}

func getWorkerIdFlag(index int) (string, bool) {
r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index)).Result()
func getWorkerIdFlag(workerId int32) (string, bool) {
r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result()
if err != nil {
return "", false
}
return r, true
}

func isAvailable(index int) bool {
r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index)).Result()
func isAvailable(workerId int32) bool {
r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result()

if _Log {
fmt.Println("XX isAvailable:" + r)


Loading…
Cancel
Save