diff --git a/pkg/distlock/config.go b/pkg/distlock/config.go index 3111904..c8d6d41 100644 --- a/pkg/distlock/config.go +++ b/pkg/distlock/config.go @@ -5,10 +5,8 @@ type Config struct { EtcdUsername string `json:"etcdUsername"` EtcdPassword string `json:"etcdPassword"` - LockRequestDataConfig LockRequestDataConfig `json:"lockRequestDataConfig"` -} + EtcdLockAcquireTimeoutMs int `json:"etcdLockAcquireTimeoutMs"` // 获取Etcd全局锁的超时时间 + EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。 -type LockRequestDataConfig struct { - AcquireTimeoutMs int `json:"acquireTimeoutMs"` // 获取Etcd全局锁的超时时间 - LeaseTimeSec int64 `json:"leaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。 + LockRequestLeaseTimeSec int64 `json:"lockRequestLeaseTimeSec"` // 锁请求的租约时间。调用方必须在这个时间内调用Renew续约。 } diff --git a/pkg/distlock/lease_actor.go b/pkg/distlock/lease_actor.go new file mode 100644 index 0000000..b3cd1df --- /dev/null +++ b/pkg/distlock/lease_actor.go @@ -0,0 +1,125 @@ +package distlock + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/pkg/actor" +) + +type lockRequestLease struct { + RequestID string + Deadline time.Time +} + +type leaseActor struct { + leases map[string]*lockRequestLease + ticker *time.Ticker + + commandChan *actor.CommandChannel + + mainActor *mainActor +} + +func newLeaseActor() *leaseActor { + return &leaseActor{ + leases: make(map[string]*lockRequestLease), + commandChan: actor.NewCommandChannel(), + } +} + +func (a *leaseActor) Init(mainActor *mainActor) { + a.mainActor = mainActor +} + +func (a *leaseActor) StartChecking() error { + return actor.Wait(a.commandChan, func() error { + a.ticker = time.NewTicker(time.Second) + return nil + }) +} + +func (a *leaseActor) StopChecking() error { + return actor.Wait(a.commandChan, func() error { + if a.ticker != nil { + a.ticker.Stop() + } + a.ticker = nil + return nil + }) +} + +func (a *leaseActor) Add(reqID string, leaseTime time.Duration) error { + return actor.Wait(a.commandChan, func() error { + lease, ok := a.leases[reqID] + if !ok { + lease = &lockRequestLease{ + RequestID: reqID, + Deadline: time.Now().Add(leaseTime), + } + a.leases[reqID] = lease + } else { + lease.Deadline = time.Now().Add(leaseTime) + } + + return nil + }) +} + +func (a *leaseActor) Renew(reqID string, leaseTime time.Duration) error { + return actor.Wait(a.commandChan, func() error { + lease, ok := a.leases[reqID] + if !ok { + return fmt.Errorf("lease not found for this lock request") + + } else { + lease.Deadline = time.Now().Add(leaseTime) + } + + return nil + }) +} + +func (a *leaseActor) Remove(reqID string) error { + return actor.Wait(a.commandChan, func() error { + delete(a.leases, reqID) + return nil + }) +} + +func (a *leaseActor) Server() error { + for { + if a.ticker != nil { + select { + case cmd, ok := <-a.commandChan.ChanReceive(): + if !ok { + a.ticker.Stop() + return fmt.Errorf("command chan closed") + } + + cmd() + + case now := <-a.ticker.C: + for reqID, lease := range a.leases { + if now.After(lease.Deadline) { + delete(a.leases, reqID) + + // TODO 可以考虑打个日志 + + a.mainActor.Release(reqID) + } + } + + } + } else { + select { + case cmd, ok := <-a.commandChan.ChanReceive(): + if !ok { + return fmt.Errorf("command chan closed") + } + + cmd() + } + } + } +} diff --git a/pkg/distlock/main_actor.go b/pkg/distlock/main_actor.go index 260527d..e76af9c 100644 --- a/pkg/distlock/main_actor.go +++ b/pkg/distlock/main_actor.go @@ -18,8 +18,8 @@ type mainActor struct { commandChan *actor.CommandChannel - watchEtcd *watchEtcdActor - providers *providersActor + watchEtcdActor *watchEtcdActor + providersActor *providersActor } func newMainActor() *mainActor { @@ -28,9 +28,9 @@ func newMainActor() *mainActor { } } -func (a *mainActor) Init(watchEtcd *watchEtcdActor, providers *providersActor) { - a.watchEtcd = watchEtcd - a.providers = providers +func (a *mainActor) Init(watchEtcdActor *watchEtcdActor, providersActor *providersActor) { + a.watchEtcdActor = watchEtcdActor + a.providersActor = providersActor } // Acquire 请求一批锁。成功后返回锁请求ID @@ -49,13 +49,13 @@ func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) { } // 等待本地状态同步到最新 - err = a.providers.WaitIndexUpdated(index) + err = a.providersActor.WaitIndexUpdated(index) if err != nil { return "", err } // 测试锁,并获得锁数据 - reqData, err := a.providers.TestLockRequestAndMakeData(req) + reqData, err := a.providersActor.TestLockRequestAndMakeData(req) if err != nil { return "", err } @@ -125,7 +125,7 @@ func (a *mainActor) Release(reqID string) error { } func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) { - lease, err := a.etcdCli.Grant(context.Background(), a.cfg.LockRequestDataConfig.LeaseTimeSec) + lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) if err != nil { return nil, fmt.Errorf("grant lease failed, err: %w", err) } @@ -139,7 +139,7 @@ func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) { mutex := concurrency.NewMutex(session, LOCK_REQUEST_LOCK_NAME) timeout, cancelFunc := context.WithTimeout(context.Background(), - time.Duration(a.cfg.LockRequestDataConfig.AcquireTimeoutMs)*time.Millisecond) + time.Duration(a.cfg.EtcdLockAcquireTimeoutMs)*time.Millisecond) defer cancelFunc() err = mutex.Lock(timeout) @@ -218,17 +218,17 @@ func (a *mainActor) ReloadEtcdData() error { // 先停止监听,再重置锁状态,最后恢复监听 - err = a.watchEtcd.StopWatching() + err = a.watchEtcdActor.StopWatching() if err != nil { return fmt.Errorf("stop watching etcd failed, err: %w", err) } - err = a.providers.ResetState(index, reqData) + err = a.providersActor.ResetState(index, reqData) if err != nil { return fmt.Errorf("reset lock providers state failed, err: %w", err) } - err = a.watchEtcd.StartWatching() + err = a.watchEtcdActor.StartWatching() if err != nil { return fmt.Errorf("start watching etcd failed, err: %w", err) } diff --git a/pkg/distlock/service.go b/pkg/distlock/service.go index f08a253..7b6272f 100644 --- a/pkg/distlock/service.go +++ b/pkg/distlock/service.go @@ -2,6 +2,7 @@ package distlock import ( "fmt" + "time" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -16,9 +17,10 @@ type Service struct { cfg *Config etcdCli *clientv3.Client - main *mainActor - providers *providersActor - watchEtcd *watchEtcdActor + mainActor *mainActor + providersActor *providersActor + watchEtcdActor *watchEtcdActor + leaseActor *leaseActor } func NewService(cfg *Config) (*Service, error) { @@ -35,47 +37,93 @@ func NewService(cfg *Config) (*Service, error) { mainActor := newMainActor() providersActor := newProvidersActor() watchEtcdActor := newWatchEtcdActor() + leaseActor := newLeaseActor() mainActor.Init(watchEtcdActor, providersActor) providersActor.Init() watchEtcdActor.Init(providersActor) + leaseActor.Init(mainActor) return &Service{ - cfg: cfg, - etcdCli: etcdCli, - main: mainActor, - providers: providersActor, - watchEtcd: watchEtcdActor, + cfg: cfg, + etcdCli: etcdCli, + mainActor: mainActor, + providersActor: providersActor, + watchEtcdActor: watchEtcdActor, + leaseActor: leaseActor, }, nil } // Acquire 请求一批锁。成功后返回锁请求ID -func (svc *Service) Acquire(req LockRequest) (reqID string, err error) { - return svc.main.Acquire(req) +func (svc *Service) Acquire(req LockRequest) (string, error) { + reqID, err := svc.mainActor.Acquire(req) + if err != nil { + return "", err + } + + // TODO 不影响结果,但考虑打日志 + svc.leaseActor.Add(reqID, time.Duration(svc.cfg.LockRequestLeaseTimeSec)*time.Second) + + return reqID, nil } // Renew 续约锁 func (svc *Service) Renew(reqID string) error { - panic("todo") - + return svc.leaseActor.Renew(reqID, time.Duration(svc.cfg.LockRequestLeaseTimeSec)*time.Second) } // Release 释放锁 func (svc *Service) Release(reqID string) error { - return svc.main.Release(reqID) + err := svc.mainActor.Release(reqID) + + // TODO 不影响结果,但考虑打日志 + svc.leaseActor.Remove(reqID) + + return err } func (svc *Service) Serve() error { go func() { // TODO 处理错误 - svc.providers.Serve() + svc.providersActor.Serve() }() go func() { // TODO 处理错误 - svc.watchEtcd.Serve() + svc.watchEtcdActor.Serve() }() - // 考虑更好的错误处理方式 - return svc.main.Serve() + go func() { + // TODO 处理错误 + svc.mainActor.Serve() + }() + + go func() { + // TODO 处理错误 + svc.leaseActor.Server() + }() + + err := svc.mainActor.ReloadEtcdData() + if err != nil { + // TODO 关闭其他的Actor,或者更好的错误处理方式 + return fmt.Errorf("init data failed, err: %w", err) + } + + err = svc.watchEtcdActor.StartWatching() + if err != nil { + // TODO 关闭其他的Actor,或者更好的错误处理方式 + return fmt.Errorf("start watching etcd failed, err: %w", err) + } + + err = svc.leaseActor.StartChecking() + if err != nil { + // TODO 关闭其他的Actor,或者更好的错误处理方式 + return fmt.Errorf("start checking lease failed, err: %w", err) + } + + // TODO 临时解决办法 + ch := make(chan any) + <-ch + + return nil }