Browse Source

增加管理租约的Actor

pull/1/head
Sydonian 2 years ago
parent
commit
0a7888900c
4 changed files with 205 additions and 34 deletions
  1. +3
    -5
      pkg/distlock/config.go
  2. +125
    -0
      pkg/distlock/lease_actor.go
  3. +12
    -12
      pkg/distlock/main_actor.go
  4. +65
    -17
      pkg/distlock/service.go

+ 3
- 5
pkg/distlock/config.go View File

@@ -5,10 +5,8 @@ type Config struct {
EtcdUsername string `json:"etcdUsername"` EtcdUsername string `json:"etcdUsername"`
EtcdPassword string `json:"etcdPassword"` EtcdPassword string `json:"etcdPassword"`


LockRequestDataConfig LockRequestDataConfig `json:"lockRequestDataConfig"`
}
EtcdLockAcquireTimeoutMs int `json:"etcdLockAcquireTimeoutMs"` // 获取Etcd全局锁的超时时间
EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。


type LockRequestDataConfig struct {
AcquireTimeoutMs int `json:"acquireTimeoutMs"` // 获取Etcd全局锁的超时时间
LeaseTimeSec int64 `json:"leaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。
LockRequestLeaseTimeSec int64 `json:"lockRequestLeaseTimeSec"` // 锁请求的租约时间。调用方必须在这个时间内调用Renew续约。
} }

+ 125
- 0
pkg/distlock/lease_actor.go View File

@@ -0,0 +1,125 @@
package distlock

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkg/actor"
)

type lockRequestLease struct {
RequestID string
Deadline time.Time
}

type leaseActor struct {
leases map[string]*lockRequestLease
ticker *time.Ticker

commandChan *actor.CommandChannel

mainActor *mainActor
}

func newLeaseActor() *leaseActor {
return &leaseActor{
leases: make(map[string]*lockRequestLease),
commandChan: actor.NewCommandChannel(),
}
}

func (a *leaseActor) Init(mainActor *mainActor) {
a.mainActor = mainActor
}

func (a *leaseActor) StartChecking() error {
return actor.Wait(a.commandChan, func() error {
a.ticker = time.NewTicker(time.Second)
return nil
})
}

func (a *leaseActor) StopChecking() error {
return actor.Wait(a.commandChan, func() error {
if a.ticker != nil {
a.ticker.Stop()
}
a.ticker = nil
return nil
})
}

func (a *leaseActor) Add(reqID string, leaseTime time.Duration) error {
return actor.Wait(a.commandChan, func() error {
lease, ok := a.leases[reqID]
if !ok {
lease = &lockRequestLease{
RequestID: reqID,
Deadline: time.Now().Add(leaseTime),
}
a.leases[reqID] = lease
} else {
lease.Deadline = time.Now().Add(leaseTime)
}

return nil
})
}

func (a *leaseActor) Renew(reqID string, leaseTime time.Duration) error {
return actor.Wait(a.commandChan, func() error {
lease, ok := a.leases[reqID]
if !ok {
return fmt.Errorf("lease not found for this lock request")

} else {
lease.Deadline = time.Now().Add(leaseTime)
}

return nil
})
}

func (a *leaseActor) Remove(reqID string) error {
return actor.Wait(a.commandChan, func() error {
delete(a.leases, reqID)
return nil
})
}

func (a *leaseActor) Server() error {
for {
if a.ticker != nil {
select {
case cmd, ok := <-a.commandChan.ChanReceive():
if !ok {
a.ticker.Stop()
return fmt.Errorf("command chan closed")
}

cmd()

case now := <-a.ticker.C:
for reqID, lease := range a.leases {
if now.After(lease.Deadline) {
delete(a.leases, reqID)

// TODO 可以考虑打个日志

a.mainActor.Release(reqID)
}
}

}
} else {
select {
case cmd, ok := <-a.commandChan.ChanReceive():
if !ok {
return fmt.Errorf("command chan closed")
}

cmd()
}
}
}
}

+ 12
- 12
pkg/distlock/main_actor.go View File

@@ -18,8 +18,8 @@ type mainActor struct {


commandChan *actor.CommandChannel commandChan *actor.CommandChannel


watchEtcd *watchEtcdActor
providers *providersActor
watchEtcdActor *watchEtcdActor
providersActor *providersActor
} }


func newMainActor() *mainActor { func newMainActor() *mainActor {
@@ -28,9 +28,9 @@ func newMainActor() *mainActor {
} }
} }


func (a *mainActor) Init(watchEtcd *watchEtcdActor, providers *providersActor) {
a.watchEtcd = watchEtcd
a.providers = providers
func (a *mainActor) Init(watchEtcdActor *watchEtcdActor, providersActor *providersActor) {
a.watchEtcdActor = watchEtcdActor
a.providersActor = providersActor
} }


// Acquire 请求一批锁。成功后返回锁请求ID // Acquire 请求一批锁。成功后返回锁请求ID
@@ -49,13 +49,13 @@ func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) {
} }


// 等待本地状态同步到最新 // 等待本地状态同步到最新
err = a.providers.WaitIndexUpdated(index)
err = a.providersActor.WaitIndexUpdated(index)
if err != nil { if err != nil {
return "", err return "", err
} }


// 测试锁,并获得锁数据 // 测试锁,并获得锁数据
reqData, err := a.providers.TestLockRequestAndMakeData(req)
reqData, err := a.providersActor.TestLockRequestAndMakeData(req)
if err != nil { if err != nil {
return "", err return "", err
} }
@@ -125,7 +125,7 @@ func (a *mainActor) Release(reqID string) error {
} }


func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) { func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) {
lease, err := a.etcdCli.Grant(context.Background(), a.cfg.LockRequestDataConfig.LeaseTimeSec)
lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec)
if err != nil { if err != nil {
return nil, fmt.Errorf("grant lease failed, err: %w", err) return nil, fmt.Errorf("grant lease failed, err: %w", err)
} }
@@ -139,7 +139,7 @@ func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) {
mutex := concurrency.NewMutex(session, LOCK_REQUEST_LOCK_NAME) mutex := concurrency.NewMutex(session, LOCK_REQUEST_LOCK_NAME)


timeout, cancelFunc := context.WithTimeout(context.Background(), timeout, cancelFunc := context.WithTimeout(context.Background(),
time.Duration(a.cfg.LockRequestDataConfig.AcquireTimeoutMs)*time.Millisecond)
time.Duration(a.cfg.EtcdLockAcquireTimeoutMs)*time.Millisecond)
defer cancelFunc() defer cancelFunc()


err = mutex.Lock(timeout) err = mutex.Lock(timeout)
@@ -218,17 +218,17 @@ func (a *mainActor) ReloadEtcdData() error {


// 先停止监听,再重置锁状态,最后恢复监听 // 先停止监听,再重置锁状态,最后恢复监听


err = a.watchEtcd.StopWatching()
err = a.watchEtcdActor.StopWatching()
if err != nil { if err != nil {
return fmt.Errorf("stop watching etcd failed, err: %w", err) return fmt.Errorf("stop watching etcd failed, err: %w", err)
} }


err = a.providers.ResetState(index, reqData)
err = a.providersActor.ResetState(index, reqData)
if err != nil { if err != nil {
return fmt.Errorf("reset lock providers state failed, err: %w", err) return fmt.Errorf("reset lock providers state failed, err: %w", err)
} }


err = a.watchEtcd.StartWatching()
err = a.watchEtcdActor.StartWatching()
if err != nil { if err != nil {
return fmt.Errorf("start watching etcd failed, err: %w", err) return fmt.Errorf("start watching etcd failed, err: %w", err)
} }


+ 65
- 17
pkg/distlock/service.go View File

@@ -2,6 +2,7 @@ package distlock


import ( import (
"fmt" "fmt"
"time"


clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )
@@ -16,9 +17,10 @@ type Service struct {
cfg *Config cfg *Config
etcdCli *clientv3.Client etcdCli *clientv3.Client


main *mainActor
providers *providersActor
watchEtcd *watchEtcdActor
mainActor *mainActor
providersActor *providersActor
watchEtcdActor *watchEtcdActor
leaseActor *leaseActor
} }


func NewService(cfg *Config) (*Service, error) { func NewService(cfg *Config) (*Service, error) {
@@ -35,47 +37,93 @@ func NewService(cfg *Config) (*Service, error) {
mainActor := newMainActor() mainActor := newMainActor()
providersActor := newProvidersActor() providersActor := newProvidersActor()
watchEtcdActor := newWatchEtcdActor() watchEtcdActor := newWatchEtcdActor()
leaseActor := newLeaseActor()


mainActor.Init(watchEtcdActor, providersActor) mainActor.Init(watchEtcdActor, providersActor)
providersActor.Init() providersActor.Init()
watchEtcdActor.Init(providersActor) watchEtcdActor.Init(providersActor)
leaseActor.Init(mainActor)


return &Service{ return &Service{
cfg: cfg,
etcdCli: etcdCli,
main: mainActor,
providers: providersActor,
watchEtcd: watchEtcdActor,
cfg: cfg,
etcdCli: etcdCli,
mainActor: mainActor,
providersActor: providersActor,
watchEtcdActor: watchEtcdActor,
leaseActor: leaseActor,
}, nil }, nil
} }


// Acquire 请求一批锁。成功后返回锁请求ID // Acquire 请求一批锁。成功后返回锁请求ID
func (svc *Service) Acquire(req LockRequest) (reqID string, err error) {
return svc.main.Acquire(req)
func (svc *Service) Acquire(req LockRequest) (string, error) {
reqID, err := svc.mainActor.Acquire(req)
if err != nil {
return "", err
}

// TODO 不影响结果,但考虑打日志
svc.leaseActor.Add(reqID, time.Duration(svc.cfg.LockRequestLeaseTimeSec)*time.Second)

return reqID, nil
} }


// Renew 续约锁 // Renew 续约锁
func (svc *Service) Renew(reqID string) error { func (svc *Service) Renew(reqID string) error {
panic("todo")

return svc.leaseActor.Renew(reqID, time.Duration(svc.cfg.LockRequestLeaseTimeSec)*time.Second)
} }


// Release 释放锁 // Release 释放锁
func (svc *Service) Release(reqID string) error { func (svc *Service) Release(reqID string) error {
return svc.main.Release(reqID)
err := svc.mainActor.Release(reqID)

// TODO 不影响结果,但考虑打日志
svc.leaseActor.Remove(reqID)

return err
} }


func (svc *Service) Serve() error { func (svc *Service) Serve() error {
go func() { go func() {
// TODO 处理错误 // TODO 处理错误
svc.providers.Serve()
svc.providersActor.Serve()
}() }()


go func() { go func() {
// TODO 处理错误 // TODO 处理错误
svc.watchEtcd.Serve()
svc.watchEtcdActor.Serve()
}() }()


// 考虑更好的错误处理方式
return svc.main.Serve()
go func() {
// TODO 处理错误
svc.mainActor.Serve()
}()

go func() {
// TODO 处理错误
svc.leaseActor.Server()
}()

err := svc.mainActor.ReloadEtcdData()
if err != nil {
// TODO 关闭其他的Actor,或者更好的错误处理方式
return fmt.Errorf("init data failed, err: %w", err)
}

err = svc.watchEtcdActor.StartWatching()
if err != nil {
// TODO 关闭其他的Actor,或者更好的错误处理方式
return fmt.Errorf("start watching etcd failed, err: %w", err)
}

err = svc.leaseActor.StartChecking()
if err != nil {
// TODO 关闭其他的Actor,或者更好的错误处理方式
return fmt.Errorf("start checking lease failed, err: %w", err)
}

// TODO 临时解决办法
ch := make(chan any)
<-ch

return nil
} }

Loading…
Cancel
Save