
Improve error handling

pull/29/head
Sydonian, 2 years ago
commit b41724c9a6
9 changed files with 217 additions and 134 deletions
  1. pkgs/distlock/internal/acquire_actor.go (+35 -14)
  2. pkgs/distlock/internal/config.go (+3 -2)
  3. pkgs/distlock/internal/lease_actor.go (+3 -19)
  4. pkgs/distlock/internal/models.go (+2 -1)
  5. pkgs/distlock/internal/providers_actor.go (+10 -9)
  6. pkgs/distlock/internal/release_actor.go (+43 -5)
  7. pkgs/distlock/internal/service_info_actor.go (+18 -8)
  8. pkgs/distlock/internal/watch_etcd_actor.go (+17 -16)
  9. pkgs/distlock/service.go (+86 -60)

pkgs/distlock/internal/acquire_actor.go (+35 -14)

@@ -29,15 +29,17 @@ type AcquireActor struct {
etcdCli *clientv3.Client
providersActor *ProvidersActor

serviceID string
acquirings []*acquireInfo
lock sync.Mutex
isMaintenance bool
serviceID string
acquirings []*acquireInfo
lock sync.Mutex
}

func NewAcquireActor(cfg *Config, etcdCli *clientv3.Client) *AcquireActor {
return &AcquireActor{
cfg: cfg,
etcdCli: etcdCli,
cfg: cfg,
etcdCli: etcdCli,
isMaintenance: true,
}
}

@@ -57,6 +59,12 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er
defer a.lock.Unlock()

a.acquirings = append(a.acquirings, info)

// While in maintenance mode, only accept the request without actually processing it
if a.isMaintenance {
return
}

// TODO handle the error
err := a.doAcquiring()
if err != nil {
@@ -93,6 +101,11 @@ func (a *AcquireActor) TryAcquireNow() {
a.lock.Lock()
defer a.lock.Unlock()

// While in maintenance mode, ignore even an explicitly triggered Acquire
if a.isMaintenance {
return
}

err := a.doAcquiring()
if err != nil {
logger.Std.Debugf("doing acquiring: %s", err.Error())
@@ -100,19 +113,27 @@ func (a *AcquireActor) TryAcquireNow() {
}()
}

// Enter maintenance mode. While in maintenance mode, requests are only accepted, not processed.
func (a *AcquireActor) EnterMaintenance() {
a.lock.Lock()
defer a.lock.Unlock()

a.isMaintenance = true
}

// Leave maintenance mode. Calling TryAcquireNow afterwards is recommended.
func (a *AcquireActor) LeaveMaintenance() {
a.lock.Lock()
defer a.lock.Unlock()

a.isMaintenance = false
}

func (a *AcquireActor) ResetState(serviceID string) {
a.lock.Lock()
defer a.lock.Unlock()

a.serviceID = serviceID
for _, info := range a.acquirings {
if info.LastErr != nil {
info.Callback.SetError(info.LastErr)
} else {
info.Callback.SetError(ErrAcquiringTimeout)
}
}
a.acquirings = nil
}

func (a *AcquireActor) doAcquiring() error {
@@ -136,7 +157,7 @@ func (a *AcquireActor) doAcquiring() error {

// Wait until the local state has been synced up to date
// TODO make the wait time configurable
err = a.providersActor.WaitIndexUpdated(ctx, index)
err = a.providersActor.WaitLocalIndexTo(ctx, index)
if err != nil {
return err
}
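
Aside (not part of this commit): the behaviour added to AcquireActor above — start in maintenance mode, queue incoming requests, and only process them once maintenance is left and a retry is triggered — can be sketched in isolation. The queueGate type and its methods below are invented for illustration; only the pattern mirrors the actor.

package main

import (
	"fmt"
	"sync"
)

// queueGate queues work while in maintenance mode and only drains it once
// maintenance is left and a flush is explicitly triggered.
type queueGate struct {
	mu            sync.Mutex
	isMaintenance bool
	pending       []string
}

func newQueueGate() *queueGate {
	// Start in maintenance mode, like the actor, so nothing is processed
	// before the first successful state reset.
	return &queueGate{isMaintenance: true}
}

func (g *queueGate) Submit(item string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.pending = append(g.pending, item)
	if g.isMaintenance {
		return // only accept, do not process
	}
	g.drainLocked()
}

func (g *queueGate) EnterMaintenance() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.isMaintenance = true
}

// LeaveMaintenance ends maintenance; callers are expected to follow up with
// Flush, just as the service calls TryAcquireNow after LeaveMaintenance.
func (g *queueGate) LeaveMaintenance() {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.isMaintenance = false
}

func (g *queueGate) Flush() {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.isMaintenance {
		return
	}
	g.drainLocked()
}

func (g *queueGate) drainLocked() {
	for _, item := range g.pending {
		fmt.Println("processing", item)
	}
	g.pending = nil
}

func main() {
	g := newQueueGate()
	g.Submit("lock-request-1") // queued, not processed
	g.LeaveMaintenance()
	g.Flush() // now processed
}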


pkgs/distlock/internal/config.go (+3 -2)

@@ -5,6 +5,7 @@ type Config struct {
EtcdUsername string `json:"etcdUsername"`
EtcdPassword string `json:"etcdPassword"`

EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // Lease time of the global lock. The lock service renews the lock automatically within this period, but if the service crashes, other services can re-acquire the lock after the lease expires.
RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"` // If releasing a lock fails, retry after a random delay. Delay = random(0, RandomReleasingDelayMs) + a minimum delay (1000ms).
EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // Lease time of the global lock. The lock service renews the lock automatically within this period, but if the service crashes, other services can re-acquire the lock after the lease expires.
RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"` // If releasing a lock fails, retry after a random delay. Delay = random(0, RandomReleasingDelayMs) + a minimum delay (1000ms).
ServiceDescription string `json:"serviceDescription"` // Description of the lock service; it is registered in etcd when the lock service starts
}
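
Aside (not part of this commit): the delay rule documented on RandomReleasingDelayMs — random(0, RandomReleasingDelayMs) plus a fixed 1000ms floor — in a minimal sketch. releasingDelay and minReleasingDelay are hypothetical names, not part of the package; a non-negative config value is assumed.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// minReleasingDelay is the fixed minimum mentioned in the config comment (1000ms).
const minReleasingDelay = 1000 * time.Millisecond

// releasingDelay returns a retry delay of random(0, randomReleasingDelayMs)
// milliseconds on top of the 1000ms floor.
func releasingDelay(randomReleasingDelayMs int64) time.Duration {
	jitter := time.Duration(rand.Int63n(randomReleasingDelayMs+1)) * time.Millisecond
	return minReleasingDelay + jitter
}

func main() {
	// With RandomReleasingDelayMs = 2000 the delay falls somewhere in [1s, 3s].
	fmt.Println(releasingDelay(2000))
}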

pkgs/distlock/internal/lease_actor.go (+3 -19)

@@ -91,26 +91,14 @@ func (a *LeaseActor) Remove(reqID string) error {
})
}

func (a *LeaseActor) ResetState() {
actor.Wait(context.Background(), a.commandChan, func() error {
a.leases = make(map[string]*lockRequestLease)
return nil
})
}

func (a *LeaseActor) Serve() error {
func (a *LeaseActor) Serve() {
cmdChan := a.commandChan.BeginChanReceive()
defer a.commandChan.CloseChanReceive()

for {
if a.ticker != nil {
select {
case cmd, ok := <-cmdChan:
if !ok {
a.ticker.Stop()
return fmt.Errorf("command chan closed")
}

case cmd := <-cmdChan:
cmd()

case now := <-a.ticker.C:
@@ -127,11 +115,7 @@ func (a *LeaseActor) Serve() error {
}
} else {
select {
case cmd, ok := <-cmdChan:
if !ok {
return fmt.Errorf("command chan closed")
}

case cmd := <-cmdChan:
cmd()
}
}
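
Aside (not part of this commit): a minimal sketch of the loop shape LeaseActor.Serve now has — commands run on the actor goroutine, a ticker drives periodic renewal, and the loop no longer returns an error when the command channel misbehaves. miniActor and its plain channels are invented; the real code uses actor.CommandChannel.

package main

import (
	"fmt"
	"time"
)

type miniActor struct {
	cmds   chan func()
	ticker *time.Ticker
}

func newMiniActor() *miniActor {
	return &miniActor{
		cmds:   make(chan func(), 16),
		ticker: time.NewTicker(500 * time.Millisecond),
	}
}

// Serve never returns an error: failures are reported through callbacks or
// commands instead of tearing the loop down, matching the signature change
// from Serve() error to Serve().
func (a *miniActor) Serve() {
	for {
		select {
		case cmd := <-a.cmds:
			cmd()
		case now := <-a.ticker.C:
			fmt.Println("periodic renewal at", now.Format(time.RFC3339))
		}
	}
}

func (a *miniActor) Do(cmd func()) { a.cmds <- cmd }

func main() {
	a := newMiniActor()
	go a.Serve()
	a.Do(func() { fmt.Println("command executed inside the actor") })
	time.Sleep(1200 * time.Millisecond)
}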


pkgs/distlock/internal/models.go (+2 -1)

@@ -72,5 +72,6 @@ func MakeServiceInfoKey(svcID string) string {
}

type ServiceInfo struct {
ID string `json:"id"`
ID string `json:"id"`
Description string `json:"description"`
}

pkgs/distlock/internal/providers_actor.go (+10 -9)

@@ -7,7 +7,6 @@ import (
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/trie"
)

@@ -39,7 +38,7 @@ func (a *ProvidersActor) AddProvider(prov LockProvider, path ...any) {
func (a *ProvidersActor) Init() {
}

func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) error {
func (a *ProvidersActor) WaitLocalIndexTo(ctx context.Context, index int64) error {
fut := future.NewSetVoid()

a.lock.Lock()
@@ -56,32 +55,34 @@ func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) erro
return fut.Wait(ctx)
}

func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) {
func() {
func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) error {
err := func() error {
a.lock.Lock()
defer a.lock.Unlock()

if evt.IsLocking {
err := a.lockLockRequest(evt.Data)
if err != nil {
// TODO an error like this requires reloading the full state; same below
logger.Std.Warnf("applying locking event: %s", err.Error())
return
return fmt.Errorf("applying locking event: %w", err)
}

} else {
err := a.unlockLockRequest(evt.Data)
if err != nil {
logger.Std.Warnf("applying unlocking event: %s", err.Error())
return
return fmt.Errorf("applying unlocking event: %w", err)
}
}

a.localLockReqIndex++
return nil
}()
if err != nil {
return err
}

// Check whether anyone is waiting for the sync progress
a.wakeUpIndexWaiter()
return nil
}

func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error {
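
Aside (not part of this commit): the idea behind WaitLocalIndexTo (renamed from WaitIndexUpdated) is to block callers until the locally applied event index reaches the index they observed in etcd. The sketch below is a simplified stand-in using an invented indexWaiter type; the real actor uses the future package.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type indexWaiter struct {
	mu      sync.Mutex
	index   int64
	waiters map[int64][]chan struct{} // target index -> waiters
}

func newIndexWaiter() *indexWaiter {
	return &indexWaiter{waiters: make(map[int64][]chan struct{})}
}

// Advance applies one event and wakes everyone whose target has been reached.
func (w *indexWaiter) Advance() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.index++
	for target, chans := range w.waiters {
		if target <= w.index {
			for _, ch := range chans {
				close(ch)
			}
			delete(w.waiters, target)
		}
	}
}

// WaitIndexTo blocks until the local index reaches target or ctx expires.
func (w *indexWaiter) WaitIndexTo(ctx context.Context, target int64) error {
	w.mu.Lock()
	if w.index >= target {
		w.mu.Unlock()
		return nil
	}
	ch := make(chan struct{})
	w.waiters[target] = append(w.waiters[target], ch)
	w.mu.Unlock()

	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	w := newIndexWaiter()
	go func() {
		for i := 0; i < 3; i++ {
			time.Sleep(100 * time.Millisecond)
			w.Advance()
		}
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(w.WaitIndexTo(ctx, 3)) // <nil> once three events have applied
}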


pkgs/distlock/internal/release_actor.go (+43 -5)

@@ -22,16 +22,18 @@ type ReleaseActor struct {
cfg *Config
etcdCli *clientv3.Client

lock sync.Mutex
isMaintenance bool
releasingLockRequestIDs map[string]bool
timer *time.Timer
timerSetup bool
lock sync.Mutex
}

func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor {
return &ReleaseActor{
cfg: cfg,
etcdCli: etcdCli,
isMaintenance: true,
releasingLockRequestIDs: make(map[string]bool),
}
}
@@ -45,6 +47,10 @@ func (a *ReleaseActor) Release(reqIDs []string) {
a.releasingLockRequestIDs[id] = true
}

if a.isMaintenance {
return
}

// TODO handle the error
err := a.doReleasing()
if err != nil {
@@ -63,21 +69,48 @@ func (a *ReleaseActor) DelayRelease(reqIDs []string) {
a.releasingLockRequestIDs[id] = true
}

if a.isMaintenance {
return
}

a.setupTimer()
}

func (a *ReleaseActor) ResetState(reqIDs []string) {
// Retry the pending internal unlock requests. Does not block the caller
func (a *ReleaseActor) TryReleaseNow() {
a.lock.Lock()
defer a.lock.Unlock()

a.releasingLockRequestIDs = make(map[string]bool)
for _, id := range reqIDs {
a.releasingLockRequestIDs[id] = true
// While in maintenance mode, ignore even an explicitly triggered release
if a.isMaintenance {
return
}

// TODO handle the error
err := a.doReleasing()
if err != nil {
logger.Std.Debugf("doing releasing: %s", err.Error())
}

a.setupTimer()
}

// Enter maintenance mode. While in maintenance mode, requests are only accepted, not processed, including delayed release requests.
func (a *ReleaseActor) EnterMaintenance() {
a.lock.Lock()
defer a.lock.Unlock()

a.isMaintenance = true
}

// Leave maintenance mode. Calling TryReleaseNow afterwards is recommended.
func (a *ReleaseActor) LeaveMaintenance() {
a.lock.Lock()
defer a.lock.Unlock()

a.isMaintenance = false
}

func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) {
a.lock.Lock()
defer a.lock.Unlock()
@@ -157,6 +190,11 @@ func (a *ReleaseActor) setupTimer() {

a.timerSetup = false

// While in maintenance mode, ignore even a timer-driven release
if a.isMaintenance {
return
}

// TODO handle the error
err := a.doReleasing()
if err != nil {
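
Aside (not part of this commit): a sketch of the DelayRelease/TryReleaseNow shape — release IDs are merged into a set, flushed later by a timer, and maintenance mode suppresses the flush until the service leaves maintenance and retries. delayedBatch and its fields are invented for illustration; the real actor re-arms a single timer via timerSetup rather than scheduling one per call.

package main

import (
	"fmt"
	"sync"
	"time"
)

type delayedBatch struct {
	mu            sync.Mutex
	isMaintenance bool
	pending       map[string]bool
	flush         func(ids []string)
}

// DelayRelease records the IDs and schedules a flush after the delay, so many
// near-simultaneous release requests collapse into one batch.
func (b *delayedBatch) DelayRelease(ids []string, delay time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, id := range ids {
		b.pending[id] = true
	}
	if b.isMaintenance {
		return // accept only; a later TryReleaseNow-style call will flush
	}
	time.AfterFunc(delay, b.flushNow)
}

func (b *delayedBatch) flushNow() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.isMaintenance || len(b.pending) == 0 {
		return
	}
	ids := make([]string, 0, len(b.pending))
	for id := range b.pending {
		ids = append(ids, id)
	}
	b.pending = make(map[string]bool)
	b.flush(ids)
}

func main() {
	b := &delayedBatch{
		pending: make(map[string]bool),
		flush:   func(ids []string) { fmt.Println("releasing", ids) },
	}
	b.DelayRelease([]string{"req-1", "req-2"}, 200*time.Millisecond)
	time.Sleep(400 * time.Millisecond)
}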


pkgs/distlock/internal/service_info_actor.go (+18 -8)

@@ -2,6 +2,7 @@ package internal

import (
"context"
"errors"
"fmt"
"sync"

@@ -11,6 +12,8 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
)

var ErrSelfServiceDown = errors.New("self service is down, need to restart")

type serviceStatus struct {
Info ServiceInfo
LockRequestIDs []string
@@ -27,10 +30,11 @@ type ServiceInfoActor struct {
services map[string]*serviceStatus
}

func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client) *ServiceInfoActor {
func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo ServiceInfo) *ServiceInfoActor {
return &ServiceInfoActor{
cfg: cfg,
etcdCli: etcdCli,
cfg: cfg,
etcdCli: etcdCli,
selfInfo: baseSelfInfo,
}
}

@@ -91,7 +95,7 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser
return willReleaseIDs, nil
}

func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) {
func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) error {
a.lock.Lock()
defer a.lock.Unlock()

@@ -104,15 +108,20 @@ func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) {
} else {
status, ok := a.services[evt.Info.ID]
if !ok {
return
return nil
}

a.releaseActor.DelayRelease(status.LockRequestIDs)

delete(a.services, evt.Info.ID)

// TODO handle receiving a message that we ourselves went down
// If the deleted service info is our own, we need to restart and re-fetch the full state
if evt.Info.ID == a.selfInfo.ID {
return ErrSelfServiceDown
}
}

return nil
}

func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) {
@@ -121,8 +130,9 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) {

status, ok := a.services[evt.Data.SerivceID]
if !ok {
// The lock was requested by a lock service that has never registered, most likely because that service hit a network disruption,
// during which its registration expired and was deleted by the current service.
// The lock was requested by a lock service that has never registered, possibly because that service hit a network disruption,
// during which its registration expired, so it was considered offline and the locks it managed were cleaned up,
// while after the network recovered it has not yet realized it was considered offline and is still submitting lock requests.
// To prevent an indefinite lock if it crashes after acquiring this lock, we simply release every lock it acquires.
a.releaseActor.Release([]string{evt.Data.ID})
return
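
Aside (not part of this commit): ErrSelfServiceDown is a sentinel error, which lets the event handler signal "our own registration disappeared" without deciding the recovery policy itself; the caller matches it and triggers a full state reset. A minimal sketch of that pattern with invented names (errSelfDown, onServiceDeleted):

package main

import (
	"errors"
	"fmt"
)

var errSelfDown = errors.New("self service is down, need to restart")

// onServiceDeleted reacts to a deleted service registration. Only the
// self-detection part is shown; forgetting the remote service and scheduling
// its locks for release is omitted.
func onServiceDeleted(deletedID, selfID string) error {
	if deletedID == selfID {
		return errSelfDown
	}
	return nil
}

func main() {
	err := onServiceDeleted("svc-42", "svc-42")
	if errors.Is(err, errSelfDown) {
		fmt.Println("our own registration expired; reset full state from etcd")
	}
}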


pkgs/distlock/internal/watch_etcd_actor.go (+17 -16)

@@ -24,6 +24,8 @@ type OnLockRequestEventFn func(event LockRequestEvent)

type OnServiceEventFn func(event ServiceEvent)

type OnWatchFailedFn func(err error)

type WatchEtcdActor struct {
etcdCli *clientv3.Client

@@ -31,6 +33,7 @@ type WatchEtcdActor struct {
watchChanCancel func()
onLockRequestEventFn OnLockRequestEventFn
onServiceEventFn OnServiceEventFn
onWatchFailedFn OnWatchFailedFn
commandChan *actor.CommandChannel
}

@@ -41,9 +44,10 @@ func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor {
}
}

func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn) {
func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn, onWatchFailed OnWatchFailedFn) {
a.onLockRequestEventFn = onLockRequestEvent
a.onServiceEventFn = onServiceDown
a.onWatchFailedFn = onWatchFailed
}

func (a *WatchEtcdActor) Start(revision int64) {
@@ -71,40 +75,37 @@ func (a *WatchEtcdActor) Stop() {
})
}

func (a *WatchEtcdActor) Serve() error {
func (a *WatchEtcdActor) Serve() {
cmdChan := a.commandChan.BeginChanReceive()
defer a.commandChan.CloseChanReceive()

for {
if a.watchChan != nil {
select {
case cmd, ok := <-cmdChan:
if !ok {
return fmt.Errorf("command channel closed")
}

case cmd := <-cmdChan:
cmd()

case msg := <-a.watchChan:
// On any error, stop watching and let the outside handle it
if msg.Canceled {
// TODO better error handling
return fmt.Errorf("watch etcd channel closed")
a.onWatchFailedFn(fmt.Errorf("watch etcd channel closed"))
a.watchChanCancel()
a.watchChan = nil
continue
}

err := a.dispatchEtcdEvent(msg)
if err != nil {
// TODO better error handling
return err
a.onWatchFailedFn(err)
a.watchChanCancel()
a.watchChan = nil
continue
}
}

} else {
select {
case cmd, ok := <-cmdChan:
if !ok {
return fmt.Errorf("command channel closed")
}

case cmd := <-cmdChan:
cmd()
}
}
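
Aside (not part of this commit): the new error path in WatchEtcdActor.Serve reports watch failures through a callback and keeps the command loop alive instead of returning an error that would kill the goroutine. A generic sketch of that shape with invented names (watchLoop, onFailed); it does not use the etcd client.

package main

import (
	"errors"
	"fmt"
	"time"
)

type watchLoop struct {
	events   chan string
	cmds     chan func()
	onFailed func(err error)
}

// Serve keeps accepting commands even after the event source fails; the
// failure is only reported through onFailed so the owner can restart the watch.
func (w *watchLoop) Serve() {
	for {
		if w.events != nil {
			select {
			case cmd := <-w.cmds:
				cmd()
			case evt, ok := <-w.events:
				if !ok {
					// Source failed: notify, drop it, keep the loop alive.
					w.onFailed(errors.New("event channel closed"))
					w.events = nil
					continue
				}
				fmt.Println("event:", evt)
			}
		} else {
			cmd := <-w.cmds
			cmd()
		}
	}
}

func main() {
	events := make(chan string, 1)
	w := &watchLoop{
		events: events,
		cmds:   make(chan func(), 1),
		onFailed: func(err error) {
			fmt.Println("watch failed:", err, "- scheduling a state reset")
		},
	}
	go w.Serve()
	events <- "lock-request put"
	close(events) // simulate the watch being cancelled
	w.cmds <- func() { fmt.Println("still accepting commands") }
	time.Sleep(200 * time.Millisecond)
}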


pkgs/distlock/service.go (+86 -60)

@@ -6,6 +6,7 @@ import (
"strconv"
"time"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock/internal"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/serder"
@@ -47,6 +48,7 @@ type Service struct {
cfg *internal.Config
etcdCli *clientv3.Client

cmdChan *actor.CommandChannel
acquireActor *internal.AcquireActor
releaseActor *internal.ReleaseActor
providersActor *internal.ProvidersActor
@@ -66,42 +68,55 @@ func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error
return nil, fmt.Errorf("new etcd client failed, err: %w", err)
}

acquireActor := internal.NewAcquireActor(cfg, etcdCli)
releaseActor := internal.NewReleaseActor(cfg, etcdCli)
providersActor := internal.NewProvidersActor()
watchEtcdActor := internal.NewWatchEtcdActor(etcdCli)
leaseActor := internal.NewLeaseActor()
serviceInfoActor := internal.NewServiceInfoActor(cfg, etcdCli)

acquireActor.Init(providersActor)
leaseActor.Init(releaseActor)
providersActor.Init()
watchEtcdActor.Init(
svc := &Service{
cfg: cfg,
etcdCli: etcdCli,
cmdChan: actor.NewCommandChannel(),
}

svc.acquireActor = internal.NewAcquireActor(cfg, etcdCli)
svc.releaseActor = internal.NewReleaseActor(cfg, etcdCli)
svc.providersActor = internal.NewProvidersActor()
svc.watchEtcdActor = internal.NewWatchEtcdActor(etcdCli)
svc.leaseActor = internal.NewLeaseActor()
svc.serviceInfoActor = internal.NewServiceInfoActor(cfg, etcdCli, internal.ServiceInfo{
Description: cfg.ServiceDescription,
})

svc.acquireActor.Init(svc.providersActor)
svc.leaseActor.Init(svc.releaseActor)
svc.providersActor.Init()
svc.watchEtcdActor.Init(
func(event internal.LockRequestEvent) {
providersActor.OnLockRequestEvent(event)
acquireActor.TryAcquireNow()
releaseActor.OnLockRequestEvent(event)
serviceInfoActor.OnLockRequestEvent(event)
err := svc.providersActor.OnLockRequestEvent(event)
if err != nil {
logger.Std.Warnf("%s, will reset service state", err.Error())
svc.cmdChan.Send(func() { svc.doResetState() })
return
}

svc.acquireActor.TryAcquireNow()
svc.releaseActor.OnLockRequestEvent(event)
svc.serviceInfoActor.OnLockRequestEvent(event)
},
func(event internal.ServiceEvent) {
serviceInfoActor.OnServiceEvent(event)
err := svc.serviceInfoActor.OnServiceEvent(event)
if err != nil {
logger.Std.Warnf("%s, will reset service state", err.Error())
svc.cmdChan.Send(func() { svc.doResetState() })
}
},
func(err error) {
logger.Std.Warnf("%s, will reset service state", err.Error())
svc.cmdChan.Send(func() { svc.doResetState() })
},
)

for _, prov := range initProvs {
providersActor.AddProvider(prov.Provider, prov.Path...)
svc.providersActor.AddProvider(prov.Provider, prov.Path...)
}

return &Service{
cfg: cfg,
etcdCli: etcdCli,
acquireActor: acquireActor,
releaseActor: releaseActor,
providersActor: providersActor,
watchEtcdActor: watchEtcdActor,
leaseActor: leaseActor,
serviceInfoActor: serviceInfoActor,
}, nil
return svc, nil
}

// Acquire requests a batch of locks. Returns the lock request ID on success
@@ -158,39 +173,48 @@ func (svc *Service) Serve() error {
// 1. If the client exits by killing the process while the AcquireActor may still be retrying, the process exits before the etcd lock has been released.
// Thanks to the lease this will not leave the system stuck forever, but it does affect the client's usage

go func() {
// TODO handle the error
err := svc.watchEtcdActor.Serve()
if err != nil {
logger.Std.Warnf("serving watch etcd actor actor failed, err: %s", err.Error())
}
}()
go svc.watchEtcdActor.Serve()

go func() {
// TODO handle the error
err := svc.leaseActor.Serve()
if err != nil {
logger.Std.Warnf("serving lease actor failed, err: %s", err.Error())
go svc.leaseActor.Serve()

svc.cmdChan.Send(func() { svc.doResetState() })

cmdChan := svc.cmdChan.BeginChanReceive()
defer svc.cmdChan.CloseChanReceive()

for {
select {
case cmd := <-cmdChan:
cmd()
}
}()
}

return nil
}

func (svc *Service) doResetState() {
logger.Std.Infof("start reset state")
// TODO context
err := svc.resetState(context.Background())
if err != nil {
// TODO shut down the other actors, or find a better way to handle the error
return fmt.Errorf("init data failed, err: %w", err)
logger.Std.Warnf("reseting state: %s, will try again after 3 seconds", err.Error())
<-time.After(time.Second * 3)
svc.cmdChan.Send(func() { svc.doResetState() })
return
}

// TODO temporary workaround to keep the process from exiting
ch := make(chan any)
<-ch

return nil
logger.Std.Infof("reset state success")
}

// ResetState resets the internal state. Note: once this function has been called, no matter at which step an error occurred,
// the internal state must be treated as corrupted until this function succeeds, and only then can the following steps continue
// the internal state must be treated as corrupted until this function succeeds, and only then can the following steps continue.
// If the call fails, the service enters maintenance mode, in which requests are accepted but not processed, until the call succeeds.
func (svc *Service) resetState(ctx context.Context) error {
// Put everything into maintenance mode
svc.watchEtcdActor.Stop()
svc.leaseActor.Stop()
svc.acquireActor.EnterMaintenance()
svc.releaseActor.EnterMaintenance()

// All data must be fetched at once in a single transaction
txResp, err := svc.etcdCli.Txn(ctx).
Then(
@@ -240,29 +264,31 @@ func (svc *Service) resetState(ctx context.Context) error {
svcInfo = append(svcInfo, info)
}

// First stop the watch and other timed events
svc.watchEtcdActor.Stop()
svc.leaseActor.Stop()

// Then load the freshly fetched state into the actors. Note: the order must consider who calls each actor; reset those that are not called by others first.
// Then load the freshly fetched state into the actors
releasingIDs, err := svc.serviceInfoActor.ResetState(ctx, svcInfo, reqData)
if err != nil {
return fmt.Errorf("reseting service info actor: %w", err)
}

svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID)

svc.leaseActor.ResetState()

// Must happen before acquireActor, because acquireActor calls its WaitLocalIndexTo
err = svc.providersActor.ResetState(index, reqData)
if err != nil {
return fmt.Errorf("reseting providers actor: %w", err)
}

svc.releaseActor.ResetState(releasingIDs)
svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID)

// Start the watch again only after the reset is done
// ReleaseActor has no state that needs resetting
svc.releaseActor.Release(releasingIDs)

// Leave maintenance mode only after the reset is done
svc.watchEtcdActor.Start(txResp.Header.Revision)
svc.leaseActor.Start()
svc.acquireActor.LeaveMaintenance()
svc.releaseActor.LeaveMaintenance()

svc.acquireActor.TryAcquireNow()
svc.releaseActor.TryReleaseNow()

return nil
}
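
Aside (not part of this commit): service.go now drives recovery through its own command channel — any failure detected by an actor schedules doResetState, and a failed reset re-schedules itself after a delay. A self-contained sketch of that retry loop with invented names (resetLoop); resetState here is a stub standing in for the real transactional reload from etcd.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type resetLoop struct {
	cmds       chan func()
	resetState func(ctx context.Context) error
}

// Serve schedules an initial reset (as Service.Serve does) and then executes
// commands one by one on this goroutine.
func (s *resetLoop) Serve() {
	s.cmds <- s.doResetState
	for cmd := range s.cmds {
		cmd()
	}
}

// doResetState retries itself after 3 seconds on failure, mirroring the
// behaviour added above.
func (s *resetLoop) doResetState() {
	if err := s.resetState(context.Background()); err != nil {
		fmt.Println("reset failed:", err, "- retrying in 3 seconds")
		<-time.After(3 * time.Second)
		s.cmds <- s.doResetState
		return
	}
	fmt.Println("reset state success")
}

func main() {
	attempts := 0
	s := &resetLoop{
		cmds: make(chan func(), 8),
		resetState: func(ctx context.Context) error {
			attempts++
			if attempts == 1 {
				return errors.New("etcd unavailable")
			}
			return nil
		},
	}
	go s.Serve()
	time.Sleep(4 * time.Second)
}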
