diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go index ce161b1..35fd1a0 100644 --- a/pkgs/distlock/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -29,17 +29,19 @@ type AcquireActor struct { etcdCli *clientv3.Client providersActor *ProvidersActor - isMaintenance bool - serviceID string - acquirings []*acquireInfo - lock sync.Mutex + isMaintenance bool + serviceID string + acquirings []*acquireInfo + lock sync.Mutex + doAcquiringChan chan any } func NewAcquireActor(cfg *Config, etcdCli *clientv3.Client) *AcquireActor { return &AcquireActor{ - cfg: cfg, - etcdCli: etcdCli, - isMaintenance: true, + cfg: cfg, + etcdCli: etcdCli, + isMaintenance: true, + doAcquiringChan: make(chan any), } } @@ -65,10 +67,9 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er return } - // TODO 处理错误 - err := a.doAcquiring() - if err != nil { - logger.Std.Debugf("doing acquiring: %s", err.Error()) + select { + case a.doAcquiringChan <- nil: + default: } }() @@ -106,9 +107,9 @@ func (a *AcquireActor) TryAcquireNow() { return } - err := a.doAcquiring() - if err != nil { - logger.Std.Debugf("doing acquiring: %s", err.Error()) + select { + case a.doAcquiringChan <- nil: + default: } }() } @@ -136,13 +137,30 @@ func (a *AcquireActor) ResetState(serviceID string) { a.serviceID = serviceID } +func (a *AcquireActor) Serve() { + for { + select { + case <-a.doAcquiringChan: + err := a.doAcquiring() + if err != nil { + logger.Std.Debugf("doing acquiring: %s", err.Error()) + } + } + } +} + func (a *AcquireActor) doAcquiring() error { ctx := context.Background() + // 先看一眼,如果没有需要请求的锁,就不用走后面的流程了 + a.lock.Lock() if len(a.acquirings) == 0 { + a.lock.Unlock() return nil } + a.lock.Unlock() + // 在获取全局锁的时候不用锁Actor,只有获取成功了,才加锁 // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec) if err != nil { @@ -155,6 +173,8 @@ func (a *AcquireActor) 
doAcquiring() error { return err } + logger.Std.Infof("waiting for local index to reach: %d", index) + // 等待本地状态同步到最新 // TODO 配置等待时间 err = a.providersActor.WaitLocalIndexTo(ctx, index) @@ -162,12 +182,15 @@ func (a *AcquireActor) doAcquiring() error { return err } + a.lock.Lock() + defer a.lock.Unlock() // TODO 可以考虑一次性获得多个锁 for i := 0; i < len(a.acquirings); i++ { req := a.acquirings[i] // 测试锁,并获得锁数据 reqData, err := a.providersActor.TestLockRequestAndMakeData(req.Request) + logger.Std.Infof("testing lock request and making lock data") if err != nil { req.LastErr = err continue diff --git a/pkgs/distlock/internal/providers_actor.go b/pkgs/distlock/internal/providers_actor.go index 149a842..c2cd6d6 100644 --- a/pkgs/distlock/internal/providers_actor.go +++ b/pkgs/distlock/internal/providers_actor.go @@ -56,30 +56,23 @@ func (a *ProvidersActor) WaitLocalIndexTo(ctx context.Context, index int64) erro } func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) error { - err := func() error { - a.lock.Lock() - defer a.lock.Unlock() - - if evt.IsLocking { - err := a.lockLockRequest(evt.Data) - if err != nil { - return fmt.Errorf("applying locking event: %w", err) - } + a.lock.Lock() + defer a.lock.Unlock() - } else { - err := a.unlockLockRequest(evt.Data) - if err != nil { - return fmt.Errorf("applying unlocking event: %w", err) - } + if evt.IsLocking { + err := a.lockLockRequest(evt.Data) + if err != nil { + return fmt.Errorf("applying locking event: %w", err) } - a.localLockReqIndex++ - return nil - }() - if err != nil { - return err + } else { + err := a.unlockLockRequest(evt.Data) + if err != nil { + return fmt.Errorf("applying unlocking event: %w", err) + } } + a.localLockReqIndex++ // 检查是否有等待同步进度的需求 a.wakeUpIndexWaiter() return nil diff --git a/pkgs/distlock/internal/release_actor.go b/pkgs/distlock/internal/release_actor.go index 7cac546..72e9145 100644 --- a/pkgs/distlock/internal/release_actor.go +++ b/pkgs/distlock/internal/release_actor.go @@ -27,6 +27,7 @@ type ReleaseActor struct { releasingLockRequestIDs 
map[string]bool timer *time.Timer timerSetup bool + doReleasingChan chan any } func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor { @@ -35,6 +36,7 @@ func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor { etcdCli: etcdCli, isMaintenance: true, releasingLockRequestIDs: make(map[string]bool), + doReleasingChan: make(chan any), } } @@ -51,13 +53,10 @@ func (a *ReleaseActor) Release(reqIDs []string) { return } - // TODO 处理错误 - err := a.doReleasing() - if err != nil { - logger.Std.Debugf("doing releasing: %s", err.Error()) + select { + case a.doReleasingChan <- nil: + default: } - - a.setupTimer() } // 延迟释放锁。一般用于清理崩溃的锁服务遗留下来的锁 @@ -86,13 +85,10 @@ func (a *ReleaseActor) TryReleaseNow() { return } - // TODO 处理错误 - err := a.doReleasing() - if err != nil { - logger.Std.Debugf("doing releasing: %s", err.Error()) + select { + case a.doReleasingChan <- nil: + default: } - - a.setupTimer() } // 进入维护模式。在维护模式期间只接受请求,不处理请求,包括延迟释放请求。 @@ -112,21 +108,41 @@ func (a *ReleaseActor) LeaveMaintenance() { } func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) { + if event.IsLocking { + return + } + a.lock.Lock() defer a.lock.Unlock() - if !event.IsLocking { - delete(a.releasingLockRequestIDs, event.Data.ID) + delete(a.releasingLockRequestIDs, event.Data.ID) +} + +func (a *ReleaseActor) Serve() { + for { + select { + case <-a.doReleasingChan: + err := a.doReleasing() + if err != nil { + logger.Std.Debugf("doing releasing: %s", err.Error()) + } + } } } func (a *ReleaseActor) doReleasing() error { - ctx := context.TODO() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + // 先看一眼,如果没有需要释放的锁,就不用走后面的流程了 + a.lock.Lock() if len(a.releasingLockRequestIDs) == 0 { + a.lock.Unlock() return nil } + a.lock.Unlock() + // 在获取全局锁的时候不用锁Actor,只有获取成功了,才加锁 // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec) if err != nil { @@ -139,6 +155,10 @@ 
func (a *ReleaseActor) doReleasing() error { return err } + a.lock.Lock() + defer a.lock.Unlock() + defer a.setupTimer() + // TODO 可以考虑优化成一次性删除多个锁 for id := range a.releasingLockRequestIDs { lockReqKey := MakeEtcdLockRequestKey(id) @@ -195,12 +215,9 @@ func (a *ReleaseActor) setupTimer() { return } - // TODO 处理错误 - err := a.doReleasing() - if err != nil { - logger.Std.Debugf("doing releasing: %s", err.Error()) + select { + case a.doReleasingChan <- nil: + default: } - - a.setupTimer() }() } diff --git a/pkgs/distlock/internal/service_info_actor.go b/pkgs/distlock/internal/service_info_actor.go index c00354d..a940574 100644 --- a/pkgs/distlock/internal/service_info_actor.go +++ b/pkgs/distlock/internal/service_info_actor.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/google/uuid" + "gitlink.org.cn/cloudream/common/pkgs/logger" mylo "gitlink.org.cn/cloudream/common/utils/lo" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" @@ -20,14 +21,15 @@ type serviceStatus struct { } type ServiceInfoActor struct { - cfg *Config - etcdCli *clientv3.Client - releaseActor *ReleaseActor - - lock sync.Mutex - selfInfo ServiceInfo - leaseID *clientv3.LeaseID - services map[string]*serviceStatus + cfg *Config + etcdCli *clientv3.Client + + lock sync.Mutex + selfInfo ServiceInfo + leaseID *clientv3.LeaseID + leaseKeepAlive chan any + services map[string]*serviceStatus + releaseActor *ReleaseActor } func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo ServiceInfo) *ServiceInfoActor { @@ -38,6 +40,10 @@ func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo Ser } } +func (a *ServiceInfoActor) Init(releaseActor *ReleaseActor) { + a.releaseActor = releaseActor +} + func (a *ServiceInfoActor) GetSelfInfo() *ServiceInfo { return &a.selfInfo } @@ -48,6 +54,7 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser if a.leaseID != nil { a.etcdCli.Revoke(ctx, *a.leaseID) + 
close(a.leaseKeepAlive) a.leaseID = nil } @@ -63,9 +70,36 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser if err != nil { return nil, fmt.Errorf("granting lease: %w", err) } - a.leaseID = &lease.ID + keepAliveChan, err := a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID) + if err != nil { + a.etcdCli.Revoke(ctx, lease.ID) + return nil, fmt.Errorf("starting keep lease alive: %w", err) + } + a.leaseKeepAlive = make(chan any) + + go func() { + for { + select { + case _, ok := <-keepAliveChan: + if !ok { + logger.Std.Warnf("lease keep alive channel closed, will try to open again") + + var err error + keepAliveChan, err = a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID) + if err != nil { + logger.Std.Warnf("starting keep lease alive: %s", err.Error()) + return + } + } + + case <-a.leaseKeepAlive: + return + } + } + }() + _, err = a.etcdCli.Put(ctx, MakeServiceInfoKey(a.selfInfo.ID), string(infoData), clientv3.WithLease(lease.ID)) if err != nil { a.etcdCli.Revoke(ctx, lease.ID) @@ -79,6 +113,10 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser Info: svc, } } + // 直接添加自己的信息 + a.services[a.selfInfo.ID] = &serviceStatus{ + Info: a.selfInfo, + } // 导入锁信息的过程中可能会发现未注册信息的锁服务的锁,把他们挑出来释放掉 var willReleaseIDs []string @@ -102,10 +140,16 @@ func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) error { // TODO 可以考虑打印一点日志 if evt.IsNew { - a.services[evt.Info.ID] = &serviceStatus{ - Info: evt.Info, + if evt.Info.ID != a.selfInfo.ID { + logger.Std.WithField("ID", evt.Info.ID).Infof("new service up") + a.services[evt.Info.ID] = &serviceStatus{ + Info: evt.Info, + } } + } else { + logger.Std.WithField("ID", evt.Info.ID).Infof("service down, will release all its locks") + status, ok := a.services[evt.Info.ID] if !ok { return nil @@ -130,11 +174,17 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) { status, ok := a.services[evt.Data.SerivceID] if !ok { - // 
加锁的是一个没有注册过的锁服务,可能是因为这个锁服务之前网络发生了波动, - // 在波动期间它注册的信息过期,于是被大家认为服务下线,清理掉了它管理的锁, - // 而在网络恢复回来之后,它还没有意识到自己被认为下线了,于是还在提交锁请求。 - // 为了防止它加了这个锁之后又崩溃,导致的无限锁定,它加的锁我们都直接释放。 - a.releaseActor.Release([]string{evt.Data.ID}) + if evt.IsLocking { + // 加锁的是一个没有注册过的锁服务,可能是因为这个锁服务之前网络发生了波动, + // 在波动期间它注册的信息过期,于是被大家认为服务下线,清理掉了它管理的锁, + // 而在网络恢复回来之后,它还没有意识到自己被认为下线了,于是还在提交锁请求。 + // 为了防止它加了这个锁之后又崩溃,导致的无限锁定,它加的锁我们都直接释放。 + logger.Std.WithField("RequestID", evt.Data.ID). + WithField("ServiceID", evt.Data.SerivceID). + Warnf("the lock request is from an unknown service, will release it") + + a.releaseActor.Release([]string{evt.Data.ID}) + } return } } diff --git a/pkgs/distlock/service.go b/pkgs/distlock/service.go index 4c36bac..354373b 100644 --- a/pkgs/distlock/service.go +++ b/pkgs/distlock/service.go @@ -111,6 +111,7 @@ func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error svc.cmdChan.Send(func() { svc.doResetState() }) }, ) + svc.serviceInfoActor.Init(svc.releaseActor) for _, prov := range initProvs { svc.providersActor.AddProvider(prov.Provider, prov.Path...) @@ -177,6 +178,10 @@ func (svc *Service) Serve() error { go svc.leaseActor.Serve() + go svc.acquireActor.Serve() + + go svc.releaseActor.Serve() + svc.cmdChan.Send(func() { svc.doResetState() }) cmdChan := svc.cmdChan.BeginChanReceive() @@ -202,7 +207,8 @@ func (svc *Service) doResetState() { svc.cmdChan.Send(func() { svc.doResetState() }) return } - logger.Std.Infof("reset state success") + logger.Std.WithField("ID", svc.serviceInfoActor.GetSelfInfo().ID). + Infof("reset state success") } // ResetState 重置内部状态。注:只要调用到了此函数,无论在哪一步出的错, @@ -279,7 +285,7 @@ func (svc *Service) resetState(ctx context.Context) error { svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID) // ReleaseActor没有什么需要Reset的状态 - svc.releaseActor.Release(releasingIDs) + svc.releaseActor.DelayRelease(releasingIDs) // 重置完了之后再退出维护模式 svc.watchEtcdActor.Start(txResp.Header.Revision)