Browse Source

修复调试问题

pull/29/head
Sydonian 2 years ago
parent
commit
d775fd3113
5 changed files with 161 additions and 72 deletions
  1. +37
    -14
      pkgs/distlock/internal/acquire_actor.go
  2. +12
    -19
      pkgs/distlock/internal/providers_actor.go
  3. +38
    -21
      pkgs/distlock/internal/release_actor.go
  4. +66
    -16
      pkgs/distlock/internal/service_info_actor.go
  5. +8
    -2
      pkgs/distlock/service.go

+ 37
- 14
pkgs/distlock/internal/acquire_actor.go View File

@@ -29,17 +29,19 @@ type AcquireActor struct {
etcdCli *clientv3.Client
providersActor *ProvidersActor

isMaintenance bool
serviceID string
acquirings []*acquireInfo
lock sync.Mutex
isMaintenance bool
serviceID string
acquirings []*acquireInfo
lock sync.Mutex
doAcquiringChan chan any
}

func NewAcquireActor(cfg *Config, etcdCli *clientv3.Client) *AcquireActor {
return &AcquireActor{
cfg: cfg,
etcdCli: etcdCli,
isMaintenance: true,
cfg: cfg,
etcdCli: etcdCli,
isMaintenance: true,
doAcquiringChan: make(chan any),
}
}

@@ -65,10 +67,9 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er
return
}

// TODO 处理错误
err := a.doAcquiring()
if err != nil {
logger.Std.Debugf("doing acquiring: %s", err.Error())
select {
case a.doAcquiringChan <- nil:
default:
}
}()

@@ -106,9 +107,9 @@ func (a *AcquireActor) TryAcquireNow() {
return
}

err := a.doAcquiring()
if err != nil {
logger.Std.Debugf("doing acquiring: %s", err.Error())
select {
case a.doAcquiringChan <- nil:
default:
}
}()
}
@@ -136,13 +137,30 @@ func (a *AcquireActor) ResetState(serviceID string) {
a.serviceID = serviceID
}

func (a *AcquireActor) Serve() {
for {
select {
case <-a.doAcquiringChan:
err := a.doAcquiring()
if err != nil {
logger.Std.Debugf("doing acquiring: %s", err.Error())
}
}
}
}

func (a *AcquireActor) doAcquiring() error {
ctx := context.Background()

// 先看一眼,如果没有需要请求的锁,就不用走后面的流程了
a.lock.Lock()
if len(a.acquirings) == 0 {
a.lock.Unlock()
return nil
}
a.lock.Unlock()

// 在获取全局锁的时候不用锁Actor,只有获取成功了,才加锁
// TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理
unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec)
if err != nil {
@@ -155,6 +173,8 @@ func (a *AcquireActor) doAcquiring() error {
return err
}

logger.Std.Infof("wait to: %d", index)

// 等待本地状态同步到最新
// TODO 配置等待时间
err = a.providersActor.WaitLocalIndexTo(ctx, index)
@@ -162,12 +182,15 @@ func (a *AcquireActor) doAcquiring() error {
return err
}

a.lock.Lock()
defer a.lock.Unlock()
// TODO 可以考虑一次性获得多个锁
for i := 0; i < len(a.acquirings); i++ {
req := a.acquirings[i]

// 测试锁,并获得锁数据
reqData, err := a.providersActor.TestLockRequestAndMakeData(req.Request)
logger.Std.Infof("6")
if err != nil {
req.LastErr = err
continue


+ 12
- 19
pkgs/distlock/internal/providers_actor.go View File

@@ -56,30 +56,23 @@ func (a *ProvidersActor) WaitLocalIndexTo(ctx context.Context, index int64) erro
}

func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) error {
err := func() error {
a.lock.Lock()
defer a.lock.Unlock()

if evt.IsLocking {
err := a.lockLockRequest(evt.Data)
if err != nil {
return fmt.Errorf("applying locking event: %w", err)
}
a.lock.Lock()
defer a.lock.Unlock()

} else {
err := a.unlockLockRequest(evt.Data)
if err != nil {
return fmt.Errorf("applying unlocking event: %w", err)
}
if evt.IsLocking {
err := a.lockLockRequest(evt.Data)
if err != nil {
return fmt.Errorf("applying locking event: %w", err)
}

a.localLockReqIndex++
return nil
}()
if err != nil {
return err
} else {
err := a.unlockLockRequest(evt.Data)
if err != nil {
return fmt.Errorf("applying unlocking event: %w", err)
}
}

a.localLockReqIndex++
// 检查是否有等待同步进度的需求
a.wakeUpIndexWaiter()
return nil


+ 38
- 21
pkgs/distlock/internal/release_actor.go View File

@@ -27,6 +27,7 @@ type ReleaseActor struct {
releasingLockRequestIDs map[string]bool
timer *time.Timer
timerSetup bool
doReleasingChan chan any
}

func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor {
@@ -35,6 +36,7 @@ func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor {
etcdCli: etcdCli,
isMaintenance: true,
releasingLockRequestIDs: make(map[string]bool),
doReleasingChan: make(chan any),
}
}

@@ -51,13 +53,10 @@ func (a *ReleaseActor) Release(reqIDs []string) {
return
}

// TODO 处理错误
err := a.doReleasing()
if err != nil {
logger.Std.Debugf("doing releasing: %s", err.Error())
select {
case a.doReleasingChan <- nil:
default:
}

a.setupTimer()
}

// 延迟释放锁。一般用于清理崩溃的锁服务遗留下来的锁
@@ -86,13 +85,10 @@ func (a *ReleaseActor) TryReleaseNow() {
return
}

// TODO 处理错误
err := a.doReleasing()
if err != nil {
logger.Std.Debugf("doing releasing: %s", err.Error())
select {
case a.doReleasingChan <- nil:
default:
}

a.setupTimer()
}

// 进入维护模式。在维护模式期间只接受请求,不处理请求,包括延迟释放请求。
@@ -112,21 +108,41 @@ func (a *ReleaseActor) LeaveMaintenance() {
}

func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) {
if event.IsLocking {
return
}

a.lock.Lock()
defer a.lock.Unlock()

if !event.IsLocking {
delete(a.releasingLockRequestIDs, event.Data.ID)
delete(a.releasingLockRequestIDs, event.Data.ID)
}

func (a *ReleaseActor) Serve() {
for {
select {
case <-a.doReleasingChan:
err := a.doReleasing()
if err != nil {
logger.Std.Debugf("doing releasing: %s", err.Error())
}
}
}
}

func (a *ReleaseActor) doReleasing() error {
ctx := context.TODO()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// 先看一眼,如果没有需要释放的锁,就不用走后面的流程了
a.lock.Lock()
if len(a.releasingLockRequestIDs) == 0 {
a.lock.Unlock()
return nil
}
a.lock.Unlock()

// 在获取全局锁的时候不用锁Actor,只有获取成功了,才加锁
// TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理
unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec)
if err != nil {
@@ -139,6 +155,10 @@ func (a *ReleaseActor) doReleasing() error {
return err
}

a.lock.Lock()
defer a.lock.Unlock()
defer a.setupTimer()

// TODO 可以考虑优化成一次性删除多个锁
for id := range a.releasingLockRequestIDs {
lockReqKey := MakeEtcdLockRequestKey(id)
@@ -195,12 +215,9 @@ func (a *ReleaseActor) setupTimer() {
return
}

// TODO 处理错误
err := a.doReleasing()
if err != nil {
logger.Std.Debugf("doing releasing: %s", err.Error())
select {
case a.doReleasingChan <- nil:
default:
}

a.setupTimer()
}()
}

+ 66
- 16
pkgs/distlock/internal/service_info_actor.go View File

@@ -7,6 +7,7 @@ import (
"sync"

"github.com/google/uuid"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -20,14 +21,15 @@ type serviceStatus struct {
}

type ServiceInfoActor struct {
cfg *Config
etcdCli *clientv3.Client
releaseActor *ReleaseActor

lock sync.Mutex
selfInfo ServiceInfo
leaseID *clientv3.LeaseID
services map[string]*serviceStatus
cfg *Config
etcdCli *clientv3.Client

lock sync.Mutex
selfInfo ServiceInfo
leaseID *clientv3.LeaseID
leaseKeepAlive chan any
services map[string]*serviceStatus
releaseActor *ReleaseActor
}

func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo ServiceInfo) *ServiceInfoActor {
@@ -38,6 +40,10 @@ func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo Ser
}
}

func (a *ServiceInfoActor) Init(releaseActor *ReleaseActor) {
a.releaseActor = releaseActor
}

func (a *ServiceInfoActor) GetSelfInfo() *ServiceInfo {
return &a.selfInfo
}
@@ -48,6 +54,7 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser

if a.leaseID != nil {
a.etcdCli.Revoke(ctx, *a.leaseID)
close(a.leaseKeepAlive)
a.leaseID = nil
}

@@ -63,9 +70,36 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser
if err != nil {
return nil, fmt.Errorf("granting lease: %w", err)
}

a.leaseID = &lease.ID

keepAliveChan, err := a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID)
if err != nil {
a.etcdCli.Revoke(ctx, lease.ID)
return nil, fmt.Errorf("starting keep lease alive: %w", err)
}
a.leaseKeepAlive = make(chan any)

go func() {
for {
select {
case _, ok := <-keepAliveChan:
if !ok {
logger.Std.Warnf("lease keep alive channel closed, will try to open again")

var err error
keepAliveChan, err = a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID)
if err != nil {
logger.Std.Warnf("starting keep lease alive: %s", err.Error())
return
}
}

case <-a.leaseKeepAlive:
return
}
}
}()

_, err = a.etcdCli.Put(ctx, MakeServiceInfoKey(a.selfInfo.ID), string(infoData), clientv3.WithLease(lease.ID))
if err != nil {
a.etcdCli.Revoke(ctx, lease.ID)
@@ -79,6 +113,10 @@ func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []Ser
Info: svc,
}
}
// 直接添加自己的信息
a.services[a.selfInfo.ID] = &serviceStatus{
Info: a.selfInfo,
}

// 导入锁信息的过程中可能会发现未注册信息的锁服务的锁,把他们挑出来释放掉
var willReleaseIDs []string
@@ -102,10 +140,16 @@ func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) error {
// TODO 可以考虑打印一点日志

if evt.IsNew {
a.services[evt.Info.ID] = &serviceStatus{
Info: evt.Info,
if evt.Info.ID != a.selfInfo.ID {
logger.Std.WithField("ID", evt.Info.ID).Infof("new service up")
a.services[evt.Info.ID] = &serviceStatus{
Info: evt.Info,
}
}

} else {
logger.Std.WithField("ID", evt.Info.ID).Infof("service down, will release all its locks")

status, ok := a.services[evt.Info.ID]
if !ok {
return nil
@@ -130,11 +174,17 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) {

status, ok := a.services[evt.Data.SerivceID]
if !ok {
// 加锁的是一个没有注册过的锁服务,可能是因为这个锁服务之前网络发生了波动,
// 在波动期间它注册的信息过期,于是被大家认为服务下线,清理掉了它管理的锁,
// 而在网络恢复回来之后,它还没有意识到自己被认为下线了,于是还在提交锁请求。
// 为了防止它加了这个锁之后又崩溃,导致的无限锁定,它加的锁我们都直接释放。
a.releaseActor.Release([]string{evt.Data.ID})
if evt.IsLocking {
// 加锁的是一个没有注册过的锁服务,可能是因为这个锁服务之前网络发生了波动,
// 在波动期间它注册的信息过期,于是被大家认为服务下线,清理掉了它管理的锁,
// 而在网络恢复回来之后,它还没有意识到自己被认为下线了,于是还在提交锁请求。
// 为了防止它加了这个锁之后又崩溃,导致的无限锁定,它加的锁我们都直接释放。
logger.Std.WithField("RequestID", evt.Data.ID).
WithField("ServiceID", evt.Data.SerivceID).
Warnf("the lock request is from an unknow service, will release it")

a.releaseActor.Release([]string{evt.Data.ID})
}
return
}



+ 8
- 2
pkgs/distlock/service.go View File

@@ -111,6 +111,7 @@ func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error
svc.cmdChan.Send(func() { svc.doResetState() })
},
)
svc.serviceInfoActor.Init(svc.releaseActor)

for _, prov := range initProvs {
svc.providersActor.AddProvider(prov.Provider, prov.Path...)
@@ -177,6 +178,10 @@ func (svc *Service) Serve() error {

go svc.leaseActor.Serve()

go svc.acquireActor.Serve()

go svc.releaseActor.Serve()

svc.cmdChan.Send(func() { svc.doResetState() })

cmdChan := svc.cmdChan.BeginChanReceive()
@@ -202,7 +207,8 @@ func (svc *Service) doResetState() {
svc.cmdChan.Send(func() { svc.doResetState() })
return
}
logger.Std.Infof("reset state success")
logger.Std.WithField("ID", svc.serviceInfoActor.GetSelfInfo().ID).
Infof("reset state success")
}

// ResetState 重置内部状态。注:只要调用到了此函数,无论在哪一步出的错,
@@ -279,7 +285,7 @@ func (svc *Service) resetState(ctx context.Context) error {
svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID)

// ReleaseActor没有什么需要Reset的状态
svc.releaseActor.Release(releasingIDs)
svc.releaseActor.DelayRelease(releasingIDs)

// 重置完了之后再退出维护模式
svc.watchEtcdActor.Start(txResp.Header.Revision)


Loading…
Cancel
Save