From ef0b05608b8bd5e9a2c60da0361fc77795abe566 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 13 Jun 2023 10:34:22 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=9B=AE=E5=BD=95=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 2 +- .../service/{ => internal}/lease_actor.go | 24 +++++------ .../service/{ => internal}/main_actor.go | 43 +++++++++++++------ .../service/{ => internal}/providers_actor.go | 26 +++++------ pkg/distlock/service/{ => internal}/utils.go | 2 +- .../{ => internal}/watch_etcd_actor.go | 20 ++++----- pkg/distlock/service/service.go | 34 ++++----------- 7 files changed, 76 insertions(+), 75 deletions(-) rename pkg/distlock/service/{ => internal}/lease_actor.go (79%) rename pkg/distlock/service/{ => internal}/main_actor.go (86%) rename pkg/distlock/service/{ => internal}/providers_actor.go (87%) rename pkg/distlock/service/{ => internal}/utils.go (99%) rename pkg/distlock/service/{ => internal}/watch_etcd_actor.go (84%) diff --git a/go.mod b/go.mod index 17f3f47..883d95b 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/smartystreets/goconvey v1.8.0 github.com/zyedidia/generic v1.2.1 gitlink.org.cn/cloudream/proto v0.0.0 - go.etcd.io/etcd/api/v3 v3.5.9 go.etcd.io/etcd/client/v3 v3.5.9 golang.org/x/exp v0.0.0-20230519143937-03e91628a987 ) @@ -50,6 +49,7 @@ require ( github.com/smartystreets/assertions v1.13.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c // indirect + go.etcd.io/etcd/api/v3 v3.5.9 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect diff --git a/pkg/distlock/service/lease_actor.go b/pkg/distlock/service/internal/lease_actor.go similarity index 79% rename from pkg/distlock/service/lease_actor.go rename to pkg/distlock/service/internal/lease_actor.go index 3864475..889101b 100644 --- a/pkg/distlock/service/lease_actor.go +++ b/pkg/distlock/service/internal/lease_actor.go @@ -1,4 +1,4 @@ -package service +package internal import ( "fmt" @@ -12,34 +12,34 @@ type lockRequestLease struct { Deadline time.Time } -type leaseActor struct { +type LeaseActor struct { leases map[string]*lockRequestLease ticker *time.Ticker commandChan *actor.CommandChannel - mainActor *mainActor + mainActor *MainActor } -func newLeaseActor() *leaseActor { - return &leaseActor{ +func NewLeaseActor() *LeaseActor { + return &LeaseActor{ leases: make(map[string]*lockRequestLease), commandChan: actor.NewCommandChannel(), } } -func (a *leaseActor) Init(mainActor *mainActor) { +func (a *LeaseActor) Init(mainActor *MainActor) { a.mainActor = mainActor } -func (a *leaseActor) StartChecking() error { +func (a *LeaseActor) StartChecking() error { return actor.Wait(a.commandChan, func() error { a.ticker = time.NewTicker(time.Second) return nil }) } -func (a *leaseActor) StopChecking() error { +func (a *LeaseActor) StopChecking() error { return actor.Wait(a.commandChan, func() error { if a.ticker != nil { a.ticker.Stop() @@ -49,7 +49,7 @@ func (a *leaseActor) StopChecking() error { }) } -func (a *leaseActor) Add(reqID string, leaseTime time.Duration) error { +func (a *LeaseActor) Add(reqID string, leaseTime time.Duration) error { return actor.Wait(a.commandChan, func() error { lease, ok := a.leases[reqID] if !ok { @@ -66,7 +66,7 @@ func (a *leaseActor) Add(reqID string, leaseTime time.Duration) error { }) } -func (a *leaseActor) Renew(reqID string, leaseTime time.Duration) error { +func (a *LeaseActor) Renew(reqID string, leaseTime time.Duration) error { return actor.Wait(a.commandChan, func() error { lease, ok := a.leases[reqID] if !ok { @@ -80,14 +80,14 @@ func (a *leaseActor) Renew(reqID string, leaseTime time.Duration) error { }) } -func (a *leaseActor) Remove(reqID string) error { +func (a *LeaseActor) Remove(reqID string) error { return actor.Wait(a.commandChan, func() error { delete(a.leases, reqID) return nil }) } -func (a *leaseActor) Server() error { +func (a *LeaseActor) Server() error { for { if a.ticker != nil { select { diff --git a/pkg/distlock/service/main_actor.go b/pkg/distlock/service/internal/main_actor.go similarity index 86% rename from pkg/distlock/service/main_actor.go rename to pkg/distlock/service/internal/main_actor.go index 3f48aa4..91fac82 100644 --- a/pkg/distlock/service/main_actor.go +++ b/pkg/distlock/service/internal/main_actor.go @@ -1,4 +1,4 @@ -package service +package internal import ( "context" @@ -13,31 +13,48 @@ import ( "go.etcd.io/etcd/client/v3/concurrency" ) -type mainActor struct { +const ( + LOCK_REQUEST_DATA_PREFIX = "/distlock/lockRequest/data" + LOCK_REQUEST_INDEX = "/distlock/lockRequest/index" + LOCK_REQUEST_LOCK_NAME = "/distlock/lockRequest/lock" +) + +type lockData struct { + Path []string `json:"path"` + Name string `json:"name"` + Target string `json:"target"` +} + +type lockRequestData struct { + ID string `json:"id"` + Locks []lockData `json:"locks"` +} + +type MainActor struct { cfg *distlock.Config etcdCli *clientv3.Client commandChan *actor.CommandChannel - watchEtcdActor *watchEtcdActor - providersActor *providersActor + watchEtcdActor *WatchEtcdActor + providersActor *ProvidersActor lockRequestLeaseID clientv3.LeaseID } -func newMainActor() *mainActor { - return &mainActor{ +func NewMainActor() *MainActor { + return &MainActor{ commandChan: actor.NewCommandChannel(), } } -func (a *mainActor) Init(watchEtcdActor *watchEtcdActor, providersActor *providersActor) { +func (a *MainActor) Init(watchEtcdActor *WatchEtcdActor, providersActor *ProvidersActor) { a.watchEtcdActor = watchEtcdActor a.providersActor = providersActor } // Acquire 请求一批锁。成功后返回锁请求ID -func (a *mainActor) Acquire(req distlock.LockRequest) (reqID string, err error) { +func (a *MainActor) Acquire(req distlock.LockRequest) (reqID string, err error) { return actor.WaitValue[string](a.commandChan, func() (string, error) { // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 unlock, err := a.acquireEtcdRequestDataLock() @@ -93,7 +110,7 @@ func (a *mainActor) Acquire(req distlock.LockRequest) (reqID string, err error) } // Release 释放锁 -func (a *mainActor) Release(reqID string) error { +func (a *MainActor) Release(reqID string) error { return actor.Wait(a.commandChan, func() error { // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 unlock, err := a.acquireEtcdRequestDataLock() @@ -128,7 +145,7 @@ func (a *mainActor) Release(reqID string) error { }) } -func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) { +func (a *MainActor) acquireEtcdRequestDataLock() (unlock func(), err error) { lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) if err != nil { return nil, fmt.Errorf("grant lease failed, err: %w", err) @@ -157,7 +174,7 @@ func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) { }, nil } -func (a *mainActor) getEtcdLockRequestIndex() (int64, error) { +func (a *MainActor) getEtcdLockRequestIndex() (int64, error) { indexKv, err := a.etcdCli.Get(context.Background(), LOCK_REQUEST_INDEX) if err != nil { return 0, fmt.Errorf("get lock request index failed, err: %w", err) @@ -175,7 +192,7 @@ func (a *mainActor) getEtcdLockRequestIndex() (int64, error) { return index, nil } -func (a *mainActor) ReloadEtcdData() error { +func (a *MainActor) ReloadEtcdData() error { return actor.Wait(a.commandChan, func() error { // 使用事务一次性获取index和锁数据,就不需要加全局锁了 txResp, err := a.etcdCli.Txn(context.Background()). @@ -241,7 +258,7 @@ func (a *mainActor) ReloadEtcdData() error { }) } -func (a *mainActor) Serve() error { +func (a *MainActor) Serve() error { lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) if err != nil { return fmt.Errorf("grant lease failed, err: %w", err) diff --git a/pkg/distlock/service/providers_actor.go b/pkg/distlock/service/internal/providers_actor.go similarity index 87% rename from pkg/distlock/service/providers_actor.go rename to pkg/distlock/service/internal/providers_actor.go index 6d19da5..36d44bb 100644 --- a/pkg/distlock/service/providers_actor.go +++ b/pkg/distlock/service/internal/providers_actor.go @@ -1,4 +1,4 @@ -package service +package internal import ( "fmt" @@ -19,7 +19,7 @@ type lockRequestDataUpdateOp struct { IsLock bool } -type providersActor struct { +type ProvidersActor struct { localLockReqIndex int64 provdersTrie trie.Trie[distlock.LockProvider] allProviders []distlock.LockProvider @@ -29,16 +29,16 @@ type providersActor struct { commandChan *actor.CommandChannel } -func newProvidersActor() *providersActor { - return &providersActor{ +func NewProvidersActor() *ProvidersActor { + return &ProvidersActor{ commandChan: actor.NewCommandChannel(), } } -func (a *providersActor) Init() { +func (a *ProvidersActor) Init() { } -func (a *providersActor) WaitIndexUpdated(index int64) error { +func (a *ProvidersActor) WaitIndexUpdated(index int64) error { fut := future.NewSetVoid() a.commandChan.Send(func() { @@ -55,7 +55,7 @@ func (a *providersActor) WaitIndexUpdated(index int64) error { return fut.Wait() } -func (a *providersActor) BatchUpdateByLockRequestData(ops []lockRequestDataUpdateOp) error { +func (a *ProvidersActor) BatchUpdateByLockRequestData(ops []lockRequestDataUpdateOp) error { return actor.Wait(a.commandChan, func() error { for _, op := range ops { if op.IsLock { @@ -82,7 +82,7 @@ func (a *providersActor) BatchUpdateByLockRequestData(ops []lockRequestDataUpdat }) } -func (svc *providersActor) lockLockRequest(reqData lockRequestData) error { +func (svc *ProvidersActor) lockLockRequest(reqData lockRequestData) error { for _, lockData := range reqData.Locks { node, ok := svc.provdersTrie.WalkEnd(lockData.Path) if !ok || node.Value == nil { @@ -106,7 +106,7 @@ func (svc *providersActor) lockLockRequest(reqData lockRequestData) error { return nil } -func (svc *providersActor) unlockLockRequest(reqData lockRequestData) error { +func (svc *ProvidersActor) unlockLockRequest(reqData lockRequestData) error { for _, lockData := range reqData.Locks { node, ok := svc.provdersTrie.WalkEnd(lockData.Path) if !ok || node.Value == nil { @@ -131,7 +131,7 @@ func (svc *providersActor) unlockLockRequest(reqData lockRequestData) error { } // TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID -func (a *providersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (lockRequestData, error) { +func (a *ProvidersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (lockRequestData, error) { return actor.WaitValue[lockRequestData](a.commandChan, func() (lockRequestData, error) { reqData := lockRequestData{} @@ -163,7 +163,7 @@ func (a *providersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (l } // ResetState 重置内部状态 -func (a *providersActor) ResetState(index int64, lockRequestData []lockRequestData) error { +func (a *ProvidersActor) ResetState(index int64, lockRequestData []lockRequestData) error { return actor.Wait(a.commandChan, func() error { for _, p := range a.allProviders { p.Clear() @@ -185,7 +185,7 @@ func (a *providersActor) ResetState(index int64, lockRequestData []lockRequestDa }) } -func (a *providersActor) checkIndexWaiter() { +func (a *ProvidersActor) checkIndexWaiter() { var resetWaiters []indexWaiter for _, waiter := range a.indexWaiters { if waiter.Index <= a.localLockReqIndex { @@ -197,7 +197,7 @@ func (a *providersActor) checkIndexWaiter() { a.indexWaiters = resetWaiters } -func (a *providersActor) Serve() error { +func (a *ProvidersActor) Serve() error { for { select { case cmd, ok := <-a.commandChan.ChanReceive(): diff --git a/pkg/distlock/service/utils.go b/pkg/distlock/service/internal/utils.go similarity index 99% rename from pkg/distlock/service/utils.go rename to pkg/distlock/service/internal/utils.go index 6fbbdcb..9c029bc 100644 --- a/pkg/distlock/service/utils.go +++ b/pkg/distlock/service/internal/utils.go @@ -1,4 +1,4 @@ -package service +package internal import ( "strings" diff --git a/pkg/distlock/service/watch_etcd_actor.go b/pkg/distlock/service/internal/watch_etcd_actor.go similarity index 84% rename from pkg/distlock/service/watch_etcd_actor.go rename to pkg/distlock/service/internal/watch_etcd_actor.go index 55f560f..db852f5 100644 --- a/pkg/distlock/service/watch_etcd_actor.go +++ b/pkg/distlock/service/internal/watch_etcd_actor.go @@ -1,4 +1,4 @@ -package service +package internal import ( "context" @@ -9,40 +9,40 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" ) -type watchEtcdActor struct { +type WatchEtcdActor struct { etcdCli *clientv3.Client watchChan clientv3.WatchChan commandChan *actor.CommandChannel - providers *providersActor + providers *ProvidersActor } -func newWatchEtcdActor() *watchEtcdActor { - return &watchEtcdActor{ +func NewWatchEtcdActor() *WatchEtcdActor { + return &WatchEtcdActor{ commandChan: actor.NewCommandChannel(), } } -func (a *watchEtcdActor) Init(providers *providersActor) { +func (a *WatchEtcdActor) Init(providers *ProvidersActor) { a.providers = providers } -func (a *watchEtcdActor) StartWatching() error { +func (a *WatchEtcdActor) StartWatching() error { return actor.Wait(a.commandChan, func() error { a.watchChan = a.etcdCli.Watch(context.Background(), LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix()) return nil }) } -func (a *watchEtcdActor) StopWatching() error { +func (a *WatchEtcdActor) StopWatching() error { return actor.Wait(a.commandChan, func() error { a.watchChan = nil return nil }) } -func (a *watchEtcdActor) Serve() error { +func (a *WatchEtcdActor) Serve() error { for { if a.watchChan != nil { select { @@ -85,7 +85,7 @@ func (a *watchEtcdActor) Serve() error { } } -func (a *watchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]lockRequestDataUpdateOp, error) { +func (a *WatchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]lockRequestDataUpdateOp, error) { var ops []lockRequestDataUpdateOp for _, e := range watchResp.Events { diff --git a/pkg/distlock/service/service.go b/pkg/distlock/service/service.go index ea91562..0abd633 100644 --- a/pkg/distlock/service/service.go +++ b/pkg/distlock/service/service.go @@ -5,34 +5,18 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkg/distlock" + "gitlink.org.cn/cloudream/common/pkg/distlock/service/internal" clientv3 "go.etcd.io/etcd/client/v3" ) -const ( - LOCK_REQUEST_DATA_PREFIX = "/distlock/lockRequest/data" - LOCK_REQUEST_INDEX = "/distlock/lockRequest/index" - LOCK_REQUEST_LOCK_NAME = "/distlock/lockRequest/lock" -) - -type lockData struct { - Path []string `json:"path"` - Name string `json:"name"` - Target string `json:"target"` -} - -type lockRequestData struct { - ID string `json:"id"` - Locks []lockData `json:"locks"` -} - type Service struct { cfg *distlock.Config etcdCli *clientv3.Client - mainActor *mainActor - providersActor *providersActor - watchEtcdActor *watchEtcdActor - leaseActor *leaseActor + mainActor *internal.MainActor + providersActor *internal.ProvidersActor + watchEtcdActor *internal.WatchEtcdActor + leaseActor *internal.LeaseActor } func NewService(cfg *distlock.Config) (*Service, error) { @@ -46,10 +30,10 @@ func NewService(cfg *distlock.Config) (*Service, error) { return nil, fmt.Errorf("new etcd client failed, err: %w", err) } - mainActor := newMainActor() - providersActor := newProvidersActor() - watchEtcdActor := newWatchEtcdActor() - leaseActor := newLeaseActor() + mainActor := internal.NewMainActor() + providersActor := internal.NewProvidersActor() + watchEtcdActor := internal.NewWatchEtcdActor() + leaseActor := internal.NewLeaseActor() mainActor.Init(watchEtcdActor, providersActor) providersActor.Init()