diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go index 28e2bb6..ce161b1 100644 --- a/pkgs/distlock/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -29,15 +29,17 @@ type AcquireActor struct { etcdCli *clientv3.Client providersActor *ProvidersActor - serviceID string - acquirings []*acquireInfo - lock sync.Mutex + isMaintenance bool + serviceID string + acquirings []*acquireInfo + lock sync.Mutex } func NewAcquireActor(cfg *Config, etcdCli *clientv3.Client) *AcquireActor { return &AcquireActor{ - cfg: cfg, - etcdCli: etcdCli, + cfg: cfg, + etcdCli: etcdCli, + isMaintenance: true, } } @@ -57,6 +59,12 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er defer a.lock.Unlock() a.acquirings = append(a.acquirings, info) + + // 如果处于维护模式,那么只接受请求,不实际去处理 + if a.isMaintenance { + return + } + // TODO 处理错误 err := a.doAcquiring() if err != nil { @@ -93,6 +101,11 @@ func (a *AcquireActor) TryAcquireNow() { a.lock.Lock() defer a.lock.Unlock() + // 处于维护模式中时,即使是主动触发Acqurire也不予理会 + if a.isMaintenance { + return + } + err := a.doAcquiring() if err != nil { logger.Std.Debugf("doing acquiring: %s", err.Error()) @@ -100,19 +113,27 @@ func (a *AcquireActor) TryAcquireNow() { }() } +// 进入维护模式。维护模式期间只接受请求,不处理请求。 +func (a *AcquireActor) EnterMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = true +} + +// 退出维护模式。退出之后建议调用一下TryAcquireNow。 +func (a *AcquireActor) LeaveMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = false +} + func (a *AcquireActor) ResetState(serviceID string) { a.lock.Lock() defer a.lock.Unlock() a.serviceID = serviceID - for _, info := range a.acquirings { - if info.LastErr != nil { - info.Callback.SetError(info.LastErr) - } else { - info.Callback.SetError(ErrAcquiringTimeout) - } - } - a.acquirings = nil } func (a *AcquireActor) doAcquiring() error { @@ -136,7 +157,7 @@ func (a *AcquireActor) doAcquiring() error { // 等待本地状态同步到最新 // TODO 配置等待时间 - err = a.providersActor.WaitIndexUpdated(ctx, index) + err = a.providersActor.WaitLocalIndexTo(ctx, index) if err != nil { return err } diff --git a/pkgs/distlock/internal/config.go b/pkgs/distlock/internal/config.go index b6219f3..cf97cad 100644 --- a/pkgs/distlock/internal/config.go +++ b/pkgs/distlock/internal/config.go @@ -5,6 +5,7 @@ type Config struct { EtcdUsername string `json:"etcdUsername"` EtcdPassword string `json:"etcdPassword"` - EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。 - RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"` // 释放锁失败,随机延迟之后再次尝试。延迟时间=random(0, RandomReleasingDelayMs) + 最少延迟时间(1000ms) + EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。 + RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"` // 释放锁失败,随机延迟之后再次尝试。延迟时间=random(0, RandomReleasingDelayMs) + 最少延迟时间(1000ms) + ServiceDescription string `json:"serviceDescription"` // 锁服务描述信息,锁服务启动后会注册到Etcd中 } diff --git a/pkgs/distlock/internal/lease_actor.go b/pkgs/distlock/internal/lease_actor.go index 5b6045f..41c6800 100644 --- a/pkgs/distlock/internal/lease_actor.go +++ b/pkgs/distlock/internal/lease_actor.go @@ -91,26 +91,14 @@ func (a *LeaseActor) Remove(reqID string) error { }) } -func (a *LeaseActor) ResetState() { - actor.Wait(context.Background(), a.commandChan, func() error { - a.leases = make(map[string]*lockRequestLease) - return nil - }) -} - -func (a *LeaseActor) Serve() error { +func (a *LeaseActor) Serve() { cmdChan := a.commandChan.BeginChanReceive() defer a.commandChan.CloseChanReceive() for { if a.ticker != nil { select { - case cmd, ok := <-cmdChan: - if !ok { - a.ticker.Stop() - return fmt.Errorf("command chan closed") - } - + case cmd := <-cmdChan: cmd() case now := <-a.ticker.C: @@ -127,11 +115,7 @@ func (a *LeaseActor) Serve() error { } } else { select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command chan closed") - } - + case cmd := <-cmdChan: cmd() } } diff --git a/pkgs/distlock/internal/models.go b/pkgs/distlock/internal/models.go index 08eb593..808cbad 100644 --- a/pkgs/distlock/internal/models.go +++ b/pkgs/distlock/internal/models.go @@ -72,5 +72,6 @@ func MakeServiceInfoKey(svcID string) string { } type ServiceInfo struct { - ID string `json:"id"` + ID string `json:"id"` + Description string `json:"description"` } diff --git a/pkgs/distlock/internal/providers_actor.go b/pkgs/distlock/internal/providers_actor.go index c9f426c..149a842 100644 --- a/pkgs/distlock/internal/providers_actor.go +++ b/pkgs/distlock/internal/providers_actor.go @@ -7,7 +7,6 @@ import ( "sync" "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" ) @@ -39,7 +38,7 @@ func (a *ProvidersActor) AddProvider(prov LockProvider, path ...any) { func (a *ProvidersActor) Init() { } -func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) error { +func (a *ProvidersActor) WaitLocalIndexTo(ctx context.Context, index int64) error { fut := future.NewSetVoid() a.lock.Lock() @@ -56,32 +55,34 @@ func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) erro return fut.Wait(ctx) } -func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) { - func() { +func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) error { + err := func() error { a.lock.Lock() defer a.lock.Unlock() if evt.IsLocking { err := a.lockLockRequest(evt.Data) if err != nil { - // TODO 发生这种错误需要重新加载全量状态,下同 - logger.Std.Warnf("applying locking event: %s", err.Error()) - return + return fmt.Errorf("applying locking event: %w", err) } } else { err := a.unlockLockRequest(evt.Data) if err != nil { - logger.Std.Warnf("applying unlocking event: %s", err.Error()) - return + return fmt.Errorf("applying unlocking event: %w", err) } } a.localLockReqIndex++ + return nil }() + if err != nil { + return err + } // 检查是否有等待同步进度的需求 a.wakeUpIndexWaiter() + return nil } func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error { diff --git a/pkgs/distlock/internal/release_actor.go b/pkgs/distlock/internal/release_actor.go index fea93a9..7cac546 100644 --- a/pkgs/distlock/internal/release_actor.go +++ b/pkgs/distlock/internal/release_actor.go @@ -22,16 +22,18 @@ type ReleaseActor struct { cfg *Config etcdCli *clientv3.Client + lock sync.Mutex + isMaintenance bool releasingLockRequestIDs map[string]bool timer *time.Timer timerSetup bool - lock sync.Mutex } func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor { return &ReleaseActor{ cfg: cfg, etcdCli: etcdCli, + isMaintenance: true, releasingLockRequestIDs: make(map[string]bool), } } @@ -45,6 +47,10 @@ func (a *ReleaseActor) Release(reqIDs []string) { a.releasingLockRequestIDs[id] = true } + if a.isMaintenance { + return + } + // TODO 处理错误 err := a.doReleasing() if err != nil { @@ -63,21 +69,48 @@ func (a *ReleaseActor) DelayRelease(reqIDs []string) { a.releasingLockRequestIDs[id] = true } + if a.isMaintenance { + return + } + a.setupTimer() } -func (a *ReleaseActor) ResetState(reqIDs []string) { +// 重试一下内部的解锁请求。不会阻塞调用者 +func (a *ReleaseActor) TryReleaseNow() { a.lock.Lock() defer a.lock.Unlock() - a.releasingLockRequestIDs = make(map[string]bool) - for _, id := range reqIDs { - a.releasingLockRequestIDs[id] = true + // 如果处于维护模式,那么即使主动进行释放操作,也不予理会 + if a.isMaintenance { + return + } + + // TODO 处理错误 + err := a.doReleasing() + if err != nil { + logger.Std.Debugf("doing releasing: %s", err.Error()) } a.setupTimer() } +// 进入维护模式。在维护模式期间只接受请求,不处理请求,包括延迟释放请求。 +func (a *ReleaseActor) EnterMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = true +} + +// 退出维护模式。退出之后建议调用一下TryReleaseNow。 +func (a *ReleaseActor) LeaveMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = false +} + func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) { a.lock.Lock() defer a.lock.Unlock() @@ -157,6 +190,11 @@ func (a *ReleaseActor) setupTimer() { a.timerSetup = false + // 如果处于维护模式,那么即使是定时器要求的释放操作,也不予理会 + if a.isMaintenance { + return + } + // TODO 处理错误 err := a.doReleasing() if err != nil { diff --git a/pkgs/distlock/internal/service_info_actor.go b/pkgs/distlock/internal/service_info_actor.go index baad890..c00354d 100644 --- a/pkgs/distlock/internal/service_info_actor.go +++ b/pkgs/distlock/internal/service_info_actor.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "fmt" "sync" @@ -11,6 +12,8 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) +var ErrSelfServiceDown = errors.New("self service is down, need to restart") + type serviceStatus struct { Info ServiceInfo LockRequestIDs []string @@ -27,10 +30,11 @@ type ServiceInfoActor struct { services map[string]*serviceStatus } -func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client) *ServiceInfoActor { +func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo ServiceInfo) *ServiceInfoActor { return &ServiceInfoActor{ - cfg: cfg, - etcdCli: etcdCli, + cfg: cfg, + etcdCli: etcdCli, + selfInfo: baseSelfInfo, } } @@ -91,7 +95,7 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser return willReleaseIDs, nil } -func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) { +func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) error { a.lock.Lock() defer a.lock.Unlock() @@ -104,15 +108,20 @@ func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) { } else { status, ok := a.services[evt.Info.ID] if !ok { - return + return nil } a.releaseActor.DelayRelease(status.LockRequestIDs) delete(a.services, evt.Info.ID) - // TODO 处理收到自己崩溃的消息 + // 如果收到的被删除服务信息是自己的,那么自己要重启,重新获取全量数据 + if evt.Info.ID == a.selfInfo.ID { + return ErrSelfServiceDown + } } + + return nil } func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) { @@ -121,8 +130,9 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) { status, ok := a.services[evt.Data.SerivceID] if !ok { - // 加锁的是一个没有注册过的锁服务,大概率是因为这个锁服务之前网络发生了波动, - // 在波动期间它注册的信息过期,于是被当前的服务删除了。 + // 加锁的是一个没有注册过的锁服务,可能是因为这个锁服务之前网络发生了波动, + // 在波动期间它注册的信息过期,于是被大家认为服务下线,清理掉了它管理的锁, + // 而在网络恢复回来之后,它还没有意识到自己被认为下线了,于是还在提交锁请求。 // 为了防止它加了这个锁之后又崩溃,导致的无限锁定,它加的锁我们都直接释放。 a.releaseActor.Release([]string{evt.Data.ID}) return diff --git a/pkgs/distlock/internal/watch_etcd_actor.go b/pkgs/distlock/internal/watch_etcd_actor.go index 5928da6..254ff17 100644 --- a/pkgs/distlock/internal/watch_etcd_actor.go +++ b/pkgs/distlock/internal/watch_etcd_actor.go @@ -24,6 +24,8 @@ type OnLockRequestEventFn func(event LockRequestEvent) type OnServiceEventFn func(event ServiceEvent) +type OnWatchFailedFn func(err error) + type WatchEtcdActor struct { etcdCli *clientv3.Client @@ -31,6 +33,7 @@ type WatchEtcdActor struct { watchChanCancel func() onLockRequestEventFn OnLockRequestEventFn onServiceEventFn OnServiceEventFn + onWatchFailedFn OnWatchFailedFn commandChan *actor.CommandChannel } @@ -41,9 +44,10 @@ func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor { } } -func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn) { +func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn, onWatchFailed OnWatchFailedFn) { a.onLockRequestEventFn = onLockRequestEvent a.onServiceEventFn = onServiceDown + a.onWatchFailedFn = onWatchFailed } func (a *WatchEtcdActor) Start(revision int64) { @@ -71,40 +75,37 @@ func (a *WatchEtcdActor) Stop() { }) } -func (a *WatchEtcdActor) Serve() error { +func (a *WatchEtcdActor) Serve() { cmdChan := a.commandChan.BeginChanReceive() defer a.commandChan.CloseChanReceive() for { if a.watchChan != nil { select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - + case cmd := <-cmdChan: cmd() case msg := <-a.watchChan: + // 只要发生错误,就停止监听,通知外部处理 if msg.Canceled { - // TODO 更好的错误处理 - return fmt.Errorf("watch etcd channel closed") + a.onWatchFailedFn(fmt.Errorf("watch etcd channel closed")) + a.watchChanCancel() + a.watchChan = nil + continue } err := a.dispatchEtcdEvent(msg) if err != nil { - // TODO 更好的错误处理 - return err + a.onWatchFailedFn(err) + a.watchChanCancel() + a.watchChan = nil + continue } } } else { select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - + case cmd := <-cmdChan: cmd() } } diff --git a/pkgs/distlock/service.go b/pkgs/distlock/service.go index ec08875..4c36bac 100644 --- a/pkgs/distlock/service.go +++ b/pkgs/distlock/service.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + "gitlink.org.cn/cloudream/common/pkgs/actor" "gitlink.org.cn/cloudream/common/pkgs/distlock/internal" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/serder" @@ -47,6 +48,7 @@ type Service struct { cfg *internal.Config etcdCli *clientv3.Client + cmdChan *actor.CommandChannel acquireActor *internal.AcquireActor releaseActor *internal.ReleaseActor providersActor *internal.ProvidersActor @@ -66,42 +68,55 @@ func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error return nil, fmt.Errorf("new etcd client failed, err: %w", err) } - acquireActor := internal.NewAcquireActor(cfg, etcdCli) - releaseActor := internal.NewReleaseActor(cfg, etcdCli) - providersActor := internal.NewProvidersActor() - watchEtcdActor := internal.NewWatchEtcdActor(etcdCli) - leaseActor := internal.NewLeaseActor() - serviceInfoActor := internal.NewServiceInfoActor(cfg, etcdCli) - - acquireActor.Init(providersActor) - leaseActor.Init(releaseActor) - providersActor.Init() - watchEtcdActor.Init( + svc := &Service{ + cfg: cfg, + etcdCli: etcdCli, + cmdChan: actor.NewCommandChannel(), + } + + svc.acquireActor = internal.NewAcquireActor(cfg, etcdCli) + svc.releaseActor = internal.NewReleaseActor(cfg, etcdCli) + svc.providersActor = internal.NewProvidersActor() + svc.watchEtcdActor = internal.NewWatchEtcdActor(etcdCli) + svc.leaseActor = internal.NewLeaseActor() + svc.serviceInfoActor = internal.NewServiceInfoActor(cfg, etcdCli, internal.ServiceInfo{ + Description: cfg.ServiceDescription, + }) + + svc.acquireActor.Init(svc.providersActor) + svc.leaseActor.Init(svc.releaseActor) + svc.providersActor.Init() + svc.watchEtcdActor.Init( func(event internal.LockRequestEvent) { - providersActor.OnLockRequestEvent(event) - acquireActor.TryAcquireNow() - releaseActor.OnLockRequestEvent(event) - serviceInfoActor.OnLockRequestEvent(event) + err := svc.providersActor.OnLockRequestEvent(event) + if err != nil { + logger.Std.Warnf("%s, will reset service state", err.Error()) + svc.cmdChan.Send(func() { svc.doResetState() }) + return + } + + svc.acquireActor.TryAcquireNow() + svc.releaseActor.OnLockRequestEvent(event) + svc.serviceInfoActor.OnLockRequestEvent(event) }, func(event internal.ServiceEvent) { - serviceInfoActor.OnServiceEvent(event) + err := svc.serviceInfoActor.OnServiceEvent(event) + if err != nil { + logger.Std.Warnf("%s, will reset service state", err.Error()) + svc.cmdChan.Send(func() { svc.doResetState() }) + } + }, + func(err error) { + logger.Std.Warnf("%s, will reset service state", err.Error()) + svc.cmdChan.Send(func() { svc.doResetState() }) }, ) for _, prov := range initProvs { - providersActor.AddProvider(prov.Provider, prov.Path...) + svc.providersActor.AddProvider(prov.Provider, prov.Path...) } - return &Service{ - cfg: cfg, - etcdCli: etcdCli, - acquireActor: acquireActor, - releaseActor: releaseActor, - providersActor: providersActor, - watchEtcdActor: watchEtcdActor, - leaseActor: leaseActor, - serviceInfoActor: serviceInfoActor, - }, nil + return svc, nil } // Acquire 请求一批锁。成功后返回锁请求ID @@ -158,39 +173,48 @@ func (svc *Service) Serve() error { // 1. client退出时直接中断进程,此时AcquireActor可能正在进行重试,于是导致Etcd锁没有解除就退出了进程。 // 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用 - go func() { - // TODO 处理错误 - err := svc.watchEtcdActor.Serve() - if err != nil { - logger.Std.Warnf("serving watch etcd actor actor failed, err: %s", err.Error()) - } - }() + go svc.watchEtcdActor.Serve() - go func() { - // TODO 处理错误 - err := svc.leaseActor.Serve() - if err != nil { - logger.Std.Warnf("serving lease actor failed, err: %s", err.Error()) + go svc.leaseActor.Serve() + + svc.cmdChan.Send(func() { svc.doResetState() }) + + cmdChan := svc.cmdChan.BeginChanReceive() + defer svc.cmdChan.CloseChanReceive() + + for { + select { + case cmd := <-cmdChan: + cmd() } - }() + } + return nil +} + +func (svc *Service) doResetState() { + logger.Std.Infof("start reset state") // TODO context err := svc.resetState(context.Background()) if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("init data failed, err: %w", err) + logger.Std.Warnf("reseting state: %s, will try again after 3 seconds", err.Error()) + <-time.After(time.Second * 3) + svc.cmdChan.Send(func() { svc.doResetState() }) + return } - - // TODO 防止退出的临时解决办法 - ch := make(chan any) - <-ch - - return nil + logger.Std.Infof("reset state success") } // ResetState 重置内部状态。注:只要调用到了此函数,无论在哪一步出的错, -// 都要将内部状态视为已被破坏,直到成功调用了此函数才能继续后面的步骤 +// 都要将内部状态视为已被破坏,直到成功调用了此函数才能继续后面的步骤。 +// 如果调用失败,服务将进入维护模式,届时可以接受请求,但不会处理请求,直到调用成功为止。 func (svc *Service) resetState(ctx context.Context) error { + // 让服务都进入维护模式 + svc.watchEtcdActor.Stop() + svc.leaseActor.Stop() + svc.acquireActor.EnterMaintenance() + svc.releaseActor.EnterMaintenance() + // 必须使用事务一次性获取所有数据 txResp, err := svc.etcdCli.Txn(ctx). Then( @@ -240,29 +264,31 @@ func (svc *Service) resetState(ctx context.Context) error { svcInfo = append(svcInfo, info) } - // 先停止监听等定时事件 - svc.watchEtcdActor.Stop() - svc.leaseActor.Stop() - - // 然后将新获取到的状态装填到Actor中。注:执行顺序需要考虑Actor会被谁调用,不会被调用的优先Reset。 + // 然后将新获取到的状态装填到Actor中 releasingIDs, err := svc.serviceInfoActor.ResetState(ctx, svcInfo, reqData) if err != nil { return fmt.Errorf("reseting service info actor: %w", err) } - svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID) - - svc.leaseActor.ResetState() - + // 要在acquireActor之前,因为acquireActor会调用它的WaitLocalIndexTo err = svc.providersActor.ResetState(index, reqData) if err != nil { return fmt.Errorf("reseting providers actor: %w", err) } - svc.releaseActor.ResetState(releasingIDs) + svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID) - // 重置完了之后再启动监听 + // ReleaseActor没有什么需要Reset的状态 + svc.releaseActor.Release(releasingIDs) + + // 重置完了之后再退出维护模式 svc.watchEtcdActor.Start(txResp.Header.Revision) svc.leaseActor.Start() + svc.acquireActor.LeaveMaintenance() + svc.releaseActor.LeaveMaintenance() + + svc.acquireActor.TryAcquireNow() + svc.releaseActor.TryReleaseNow() + return nil }