From 31e0f44b2a61a12e2eb9812db3a4c1168821e6a5 Mon Sep 17 00:00:00 2001 From: yitter Date: Sun, 4 Apr 2021 02:02:06 +0800 Subject: [PATCH] auto commit --- .../source/YitIdGen.WinFormApp/StartForm.cs | 14 +- C#.NET/source/Yitter.IdGenTest/Program.cs | 54 ++-- Go/source/idgen/SnowWorkerM2.go | 2 +- Go/source/main.go | 77 ++++-- Go/source/regworkerid/reghelper.go | 258 +++++++++++++----- 5 files changed, 287 insertions(+), 118 deletions(-) diff --git a/C#.NET/source/YitIdGen.WinFormApp/StartForm.cs b/C#.NET/source/YitIdGen.WinFormApp/StartForm.cs index f80a332..ac8b287 100644 --- a/C#.NET/source/YitIdGen.WinFormApp/StartForm.cs +++ b/C#.NET/source/YitIdGen.WinFormApp/StartForm.cs @@ -25,12 +25,11 @@ namespace WInFormApp public static extern long NextId2(); [DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)] - public static extern long RegisterWorkerId(string ip, int port, string password, int maxWorkerIdNumber); + public static extern IntPtr RegisterMany(string ip, int port, string password, int maxWorkerIdNumber, int idCount); //public static extern ulong RegisterWorkerId2(); [DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)] - public static extern void UnRegisterWorkerId(); - + public static extern void UnRegister(); [DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)] public static extern void SetWorkerId(uint workerId); @@ -54,7 +53,12 @@ namespace WInFormApp var ip = "localhost"; //txtIdList.Text += RegisterWorkerId(Encoding.UTF8.GetBytes(ip), 6379) + "\r\n"; - txtIdList.Text += RegisterWorkerId(ip, 6379, "", 4) + "\r\n"; + var ids = RegisterMany(ip, 6379, "", 4, 3); + //foreach (var id in ids) + //{ + // txtIdList.Text += id; + //} + //txtIdList.Text += RegisterWorkerId() + "\r\n"; //txtIdList.Text += Test() + "\r\n"; @@ -69,7 +73,7 @@ namespace WInFormApp { try { - UnRegisterWorkerId(); + UnRegister(); txtIdList.Text += "LogOff"; } catch (Exception ex) diff --git a/C#.NET/source/Yitter.IdGenTest/Program.cs b/C#.NET/source/Yitter.IdGenTest/Program.cs index 73e5905..8aa02e9 100644 --- a/C#.NET/source/Yitter.IdGenTest/Program.cs +++ b/C#.NET/source/Yitter.IdGenTest/Program.cs @@ -10,7 +10,6 @@ namespace Yitter.OrgSystem.TestA { class Program { - // 测试参数(默认配置下,最佳性能是10W/s) static int genIdCount = 50000; // 计算ID数量(如果要验证50W效率,请将TopOverCostCount设置为20000或适当增加SeqBitLength) static short method = 1; // 1-漂移算法,2-传统算法 @@ -25,10 +24,39 @@ namespace Yitter.OrgSystem.TestA static int workerCount = 1; + //[DllImport("yitidgenc.dll", CallingConvention = CallingConvention.StdCall)] + //public static extern long NextId(); + + [DllImport("yitidgengo.dll", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)] + public static extern long NextId2(); + + [DllImport("yitidgengo.so", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)] + public static extern long NextId(); + + [DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)] + public static extern void SetWorkerId(uint workerId); + + [DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)] + public static extern int TestId(); + + [DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)] + public static extern long RegisterWorkerId(string ip, int port, string password, int maxWorkerIdNumber); + //public static extern ulong RegisterWorkerId2(); + + [DllImport("yitidgengo.dll", CallingConvention = CallingConvention.StdCall)] + public static extern void UnRegisterWorkerId(); + + static void Main(string[] args) { Console.WriteLine("Hello World! C#"); + RegisterWorkerId(); + while (true) + { + Thread.Sleep(20000); + } + var options = new IdGeneratorOptions() { Method = method, @@ -55,27 +83,19 @@ namespace Yitter.OrgSystem.TestA while (true) { - //RunSingle(); - CallDll(); + RunSingle(); + //CallDll(); //Go(options); Thread.Sleep(1000); // 每隔1秒执行一次Go } } - //[DllImport("yitidgenc.dll", CallingConvention = CallingConvention.StdCall)] - //public static extern long NextId(); - - [DllImport("yitidgengo.dll", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)] - public static extern long NextId2(); - - [DllImport("yitidgengo.so", EntryPoint = "NextId", CallingConvention = CallingConvention.StdCall)] - public static extern long NextId(); - - [DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)] - public static extern void SetWorkerId(uint workerId); - - [DllImport("yitidgen.dll", CallingConvention = CallingConvention.StdCall)] - public static extern int TestId(); + private static void RegisterWorkerId() + { + var workerId = RegisterWorkerId("118.178.140.203", 4037, "zhou@@myredis", 4); + Console.WriteLine("workerId:" + workerId); + return; + } private static void CallDll() { diff --git a/Go/source/idgen/SnowWorkerM2.go b/Go/source/idgen/SnowWorkerM2.go index 2f513e3..0e5f7bf 100644 --- a/Go/source/idgen/SnowWorkerM2.go +++ b/Go/source/idgen/SnowWorkerM2.go @@ -38,6 +38,6 @@ func (m2 SnowWorkerM2) NextId() uint64 { fmt.Println("Time error for {0} milliseconds", strconv.FormatInt(m2._LastTimeTick-currentTimeTick, 10)) } m2._LastTimeTick = currentTimeTick - result := uint64((currentTimeTick << m2._TimestampShift)) + uint64(m2.WorkerId< -1 { + _client.Del(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(value))) + } + } + _workerIdList = []int32{} + _workerIdLock.Unlock() } -func RegisterWorkerId(ip string, port int, password string, maxWorkerId int) int { - // maxWorkerId不能小于0 +func autoUnRegister() { + // 如果当前已注册过 WorkerId,则先注销,并终止先前的自动续期线程 + if len(_workerIdList) > 0 { + UnRegister() + } +} + +func RegisterMany(ip string, port int32, password string, maxWorkerId int32, totalCount int32) []int32 { if maxWorkerId < 0 { - return -2 + return []int32{-2} } - // 如果当前已注册过 WorkerId,则先注销,并终止先前的自动续期线程 - if _usingWorkerId > -1 { - UnRegisterWorkerId() + if totalCount < 1 { + return []int32{-1} } + autoUnRegister() + _MaxWorkerId = maxWorkerId - _ConnString = ip + ":" + strconv.Itoa(port) - _Password = password + _RedisConnString = ip + ":" + strconv.Itoa(int(port)) + _RedisPassword = password _client = newRedisClient() - defer _client.Close() + if _client == nil { + return []int32{-1} + } + defer func() { + if _client != nil { + _ = _client.Close() + } + }() + //_, err := _client.Ping(_ctx).Result() + //if err != nil { + // //panic("init redis error") + // return []int{-3} + //} else { + // if _Log { + // fmt.Println("init redis ok") + // } + //} + + _lifeIndex++ + _workerIdList = make([]int32, totalCount) + for key := range _workerIdList { + _workerIdList[key] = -1 // 全部初始化-1 + } - _, err := _client.Ping(_ctx).Result() - if err != nil { - panic("init redis error") - } else { - if _Log { - fmt.Println("init redis ok") + useExtendFunc := false + for key := range _workerIdList { + id := register(_lifeIndex) + if id > -1 { + useExtendFunc = true + _workerIdList[key] = id //= append(_workerIdList, id) + } else { + break } } + if useExtendFunc { + go extendLifeTime(_lifeIndex) + } + + return _workerIdList +} + +func RegisterOne(ip string, port int32, password string, maxWorkerId int32) int32 { + if maxWorkerId < 0 { + return -2 + } + + autoUnRegister() + + _MaxWorkerId = maxWorkerId + _RedisConnString = ip + ":" + strconv.Itoa(int(port)) + _RedisPassword = password _loopCount = 0 - return getNextWorkerId() + _client = newRedisClient() + if _client == nil { + return -3 + } + defer func() { + if _client != nil { + _ = _client.Close() + } + }() + //_, err := _client.Ping(_ctx).Result() + //if err != nil { + // // panic("init redis error") + // return -3 + //} else { + // if _Log { + // fmt.Println("init redis ok") + // } + //} + + _lifeIndex++ + var id = register(_lifeIndex) + if id > -1 { + _workerIdList = []int32{id} + go extendLifeTime(_lifeIndex) + } + + return id +} + +func register(lifeTime int) int32 { + _loopCount = 0 + return getNextWorkerId(lifeTime) } func newRedisClient() *redis.Client { return redis.NewClient(&redis.Options{ - Addr: _ConnString, - Password: _Password, + Addr: _RedisConnString, + Password: _RedisPassword, DB: 0, //PoolSize: 1000, //ReadTimeout: time.Millisecond * time.Duration(100), @@ -93,16 +182,16 @@ func newRedisClient() *redis.Client { }) } -func getNextWorkerId() int { +func getNextWorkerId(lifeTime int) int32 { // 获取当前 WorkerIdIndex r, err := _client.Incr(_ctx, _WorkerIdIndexKey).Result() if err != nil { - return 0 + return -1 } - candidateId := int(r) + candidateId := int32(r) if _Log { - fmt.Println("Begin candidateId:" + strconv.Itoa(candidateId)) + fmt.Println("Begin candidateId:" + strconv.Itoa(int(candidateId))) } // 如果 candidateId 大于最大值,则重置 @@ -128,7 +217,7 @@ func getNextWorkerId() int { fmt.Println("canReset loop") } - return getNextWorkerId() + return getNextWorkerId(lifeTime) } else { // 如果有其它应用正在编辑,则本应用暂停200ms后,再继续 time.Sleep(time.Duration(200) * time.Millisecond) @@ -137,46 +226,45 @@ func getNextWorkerId() int { fmt.Println("not canReset loop") } - return getNextWorkerId() + return getNextWorkerId(lifeTime) } } if _Log { - fmt.Println("candidateId:" + strconv.Itoa(candidateId)) + fmt.Println("candidateId:" + strconv.Itoa(int(candidateId))) } if isAvailable(candidateId) { if _Log { - fmt.Println("AA: isAvailable:" + strconv.Itoa(candidateId)) + fmt.Println("AA: isAvailable:" + strconv.Itoa(int(candidateId))) } // 最新获得的 WorkerIdIndex,在 redis 中是可用状态 setWorkerIdFlag(candidateId) - _usingWorkerId = candidateId _loopCount = 0 // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime) - _lifeIndex++ - go extendWorkerIdLifeTime(_lifeIndex) + // go extendWorkerIdLifeTime(lifeTime, candidateId) return candidateId } else { if _Log { - fmt.Println("BB: not isAvailable:" + strconv.Itoa(candidateId)) + fmt.Println("BB: not isAvailable:" + strconv.Itoa(int(candidateId))) } // 最新获得的 WorkerIdIndex,在 redis 中是不可用状态,则继续下一个 WorkerIdIndex - return getNextWorkerId() + return getNextWorkerId(lifeTime) } } -func extendWorkerIdLifeTime(lifeIndex int) { +func extendLifeTime(lifeIndex int) { + // 获取到可用 WorkerId 后,启用新线程,每隔 1/3个 _WorkerIdLifeTimeSeconds 时间,向服务器续期(延长一次 LifeTime) var myLifeIndex = lifeIndex // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。 for { time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second) - // 上锁操作,防止跟 UnRegisterWorkerId 操作重叠 + // 上锁操作,防止跟 UnRegister 操作重叠 _workerIdLock.Lock() // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis @@ -185,12 +273,44 @@ func extendWorkerIdLifeTime(lifeIndex int) { } // 已经被注销,则终止(此步是上一步的二次验证) - if _usingWorkerId < 0 { + if len(_workerIdList) < 1 { + break + } + + // 延长 redis 数据有效期 + for _, value := range _workerIdList { + if value > -1 { + extendWorkerIdFlag(value) + } + } + + _workerIdLock.Unlock() + } +} + +func extendWorkerIdLifeTime(lifeIndex int, workerId int32) { + var myLifeIndex = lifeIndex + var myWorkerId = workerId + + // 循环操作:间隔一定时间,刷新 WorkerId 在 redis 中的有效时间。 + for { + time.Sleep(time.Duration(_WorkerIdLifeTimeSeconds/3) * time.Second) + + // 上锁操作,防止跟 UnRegister 操作重叠 + _workerIdLock.Lock() + + // 如果临时变量 myLifeIndex 不等于 全局变量 _lifeIndex,表明全局状态被修改,当前线程可终止,不应继续操作 redis + if myLifeIndex != _lifeIndex { break } + // 已经被注销,则终止(此步是上一步的二次验证) + //if _usingWorkerId < 0 { + // break + //} + // 延长 redis 数据有效期 - extendWorkerIdFlag(_usingWorkerId) + extendWorkerIdFlag(myWorkerId) _workerIdLock.Unlock() } @@ -212,14 +332,22 @@ func setWorkerIdIndex(val int) { _client.Set(_ctx, _WorkerIdIndexKey, val, 0) } -func setWorkerIdFlag(index int) { - _client.Set(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index), _WorkerIdFlag, time.Duration(_WorkerIdLifeTimeSeconds)*time.Second) +func setWorkerIdFlag(workerId int32) { + _client.Set(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), _WorkerIdFlag, time.Duration(_WorkerIdLifeTimeSeconds)*time.Second) } -func extendWorkerIdFlag(index int) { +func extendWorkerIdFlag(workerId int32) { var client = newRedisClient() - defer client.Close() - client.Expire(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index), time.Duration(_WorkerIdLifeTimeSeconds)*time.Second) + if client == nil { + return + } + defer func() { + if client != nil { + _ = client.Close() + } + }() + + client.Expire(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId)), time.Duration(_WorkerIdLifeTimeSeconds)*time.Second) } func canReset() bool { @@ -229,7 +357,7 @@ func canReset() bool { } if _Log { - fmt.Println("canReset:" + string(r)) + fmt.Println("canReset:" + strconv.Itoa(int(r))) } return r != 1 @@ -240,16 +368,16 @@ func endReset() { _client.Set(_ctx, _WorkerIdValueKeyPrefix+"Edit", 0, 0) } -func getWorkerIdFlag(index int) (string, bool) { - r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index)).Result() +func getWorkerIdFlag(workerId int32) (string, bool) { + r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result() if err != nil { return "", false } return r, true } -func isAvailable(index int) bool { - r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(index)).Result() +func isAvailable(workerId int32) bool { + r, err := _client.Get(_ctx, _WorkerIdValueKeyPrefix+strconv.Itoa(int(workerId))).Result() if _Log { fmt.Println("XX isAvailable:" + r)