diff --git a/pkgs/distlock/config.go b/pkgs/distlock/config.go index 65ffe67..ef2acb1 100644 --- a/pkgs/distlock/config.go +++ b/pkgs/distlock/config.go @@ -5,9 +5,6 @@ type Config struct { EtcdUsername string `json:"etcdUsername"` EtcdPassword string `json:"etcdPassword"` - EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。 - - // 写入锁请求数据到的ETCD的时候,不设置租约。开启此选项之后,请求锁的服务崩溃, - // 锁请求数据会依然留在ETCD中。仅供调试使用。 - SubmitLockRequestWithoutLease bool `json:"submitLockRequestWithoutLease"` + EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。 + RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"` // 释放锁失败,随机延迟之后再次尝试。延迟时间=random(0, RandomReleasingDelayMs) + 最少延迟时间(1000ms) } diff --git a/pkgs/distlock/service/internal/acquire_actor.go b/pkgs/distlock/service/internal/acquire_actor.go new file mode 100644 index 0000000..e7a2a39 --- /dev/null +++ b/pkgs/distlock/service/internal/acquire_actor.go @@ -0,0 +1,233 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "strconv" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/serder" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" +) + +var ErrAcquiringTimeout = errors.New("acquiring timeout") + +const ( + EtcdLockRequestData = "/distlock/lockRequest/data" + EtcdLockRequestIndex = "/distlock/lockRequest/index" + EtcdLockRequestLock = "/distlock/lockRequest/lock" +) + +type lockData struct { + Path []string `json:"path"` + Name string `json:"name"` + Target string `json:"target"` +} +type LockRequestData struct { + ID string `json:"id"` + Locks []lockData `json:"locks"` +} + +type acquireInfo struct { + Request distlock.LockRequest + Callback *future.SetValueFuture[string] + LastErr error +} + +type AcquireActor struct { + cfg *distlock.Config + etcdCli *clientv3.Client + providersActor *ProvidersActor + + acquirings []*acquireInfo + lock sync.Mutex +} + +func NewAcquireActor(cfg *distlock.Config, etcdCli *clientv3.Client) *AcquireActor { + return &AcquireActor{ + cfg: cfg, + etcdCli: etcdCli, + } +} + +func (a *AcquireActor) Init(providersActor *ProvidersActor) { + a.providersActor = providersActor +} + +// Acquire 请求一批锁。成功后返回锁请求ID +func (a *AcquireActor) Acquire(ctx context.Context, req distlock.LockRequest) (string, error) { + info := &acquireInfo{ + Request: req, + Callback: future.NewSetValue[string](), + } + + func() { + a.lock.Lock() + defer a.lock.Unlock() + + a.acquirings = append(a.acquirings, info) + // TODO 处理错误 + err := a.doAcquiring() + if err != nil { + logger.Std.Debugf("doing acquiring: %s", err.Error()) + } + }() + + go func() { + info.Callback.Wait(ctx) + + a.lock.Lock() + defer a.lock.Unlock() + + // 调用Callback时都加了锁,所以此处的IsComplete判断可以作为后续操作的依据 + if info.Callback.IsComplete() { + return + } + + a.acquirings = mylo.Remove(a.acquirings, info) + if info.LastErr != nil { + info.Callback.SetError(info.LastErr) + } else { + info.Callback.SetError(ErrAcquiringTimeout) + } + }() + + // 此处不能直接用ctx去等Callback,原因是Wait超时不代表锁没有获取到,这会导致锁泄露。 + return info.Callback.WaitValue(context.Background()) +} + +// TryAcquireNow 重试一下内部还没有成功的锁请求。不会阻塞调用者 +func (a *AcquireActor) TryAcquireNow() { + go func() { + a.lock.Lock() + defer a.lock.Unlock() + + err := a.doAcquiring() + if err != nil { + logger.Std.Debugf("doing acquiring: %s", err.Error()) + } + }() +} + +func (a *AcquireActor) doAcquiring() error { + ctx := context.Background() + + if len(a.acquirings) == 0 { + return nil + } + + // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 + unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return fmt.Errorf("acquire etcd request data lock failed, err: %w", err) + } + defer unlock() + + index, err := getEtcdLockRequestIndex(ctx, a.etcdCli) + if err != nil { + return err + } + + // 等待本地状态同步到最新 + // TODO 配置等待时间 + err = a.providersActor.WaitIndexUpdated(ctx, index) + if err != nil { + return err + } + + // TODO 可以考虑一次性获得多个锁 + for i := 0; i < len(a.acquirings); i++ { + // 测试锁,并获得锁数据 + reqData, err := a.providersActor.TestLockRequestAndMakeData(a.acquirings[i].Request) + if err != nil { + a.acquirings[i].LastErr = err + continue + } + + nextIndexStr := strconv.FormatInt(index+1, 10) + reqData.ID = nextIndexStr + + // 锁成功,提交锁数据 + err = a.submitLockRequest(ctx, nextIndexStr, reqData) + if err != nil { + a.acquirings[i].LastErr = err + continue + } + + a.acquirings[i].Callback.SetValue(reqData.ID) + a.acquirings = mylo.RemoveAt(a.acquirings, i) + break + } + + return nil +} + +func (a *AcquireActor) submitLockRequest(ctx context.Context, index string, reqData LockRequestData) error { + reqBytes, err := serder.ObjectToJSON(reqData) + if err != nil { + return fmt.Errorf("serialize lock request data failed, err: %w", err) + } + + etcdOps := []clientv3.Op{ + clientv3.OpPut(EtcdLockRequestIndex, index), + clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), + } + txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit() + if err != nil { + return fmt.Errorf("submit lock request data failed, err: %w", err) + } + if !txResp.Succeeded { + return fmt.Errorf("submit lock request data failed for lock request data index changed") + } + + return nil +} + +func acquireEtcdRequestDataLock(ctx context.Context, etcdCli *clientv3.Client, etcdLockLeaseTimeSec int64) (unlock func(), err error) { + lease, err := etcdCli.Grant(context.Background(), etcdLockLeaseTimeSec) + if err != nil { + return nil, fmt.Errorf("grant lease failed, err: %w", err) + } + + session, err := concurrency.NewSession(etcdCli, concurrency.WithLease(lease.ID)) + if err != nil { + return nil, fmt.Errorf("new session failed, err: %w", err) + } + + mutex := concurrency.NewMutex(session, EtcdLockRequestLock) + + err = mutex.Lock(ctx) + if err != nil { + session.Close() + return nil, fmt.Errorf("acquire lock failed, err: %w", err) + } + + return func() { + mutex.Unlock(context.Background()) + session.Close() + }, nil +} + +func getEtcdLockRequestIndex(ctx context.Context, etcdCli *clientv3.Client) (int64, error) { + indexKv, err := etcdCli.Get(ctx, EtcdLockRequestIndex) + if err != nil { + return 0, fmt.Errorf("get lock request index failed, err: %w", err) + } + + if len(indexKv.Kvs) == 0 { + return 0, nil + } + + index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64) + if err != nil { + return 0, fmt.Errorf("parse lock request index failed, err: %w", err) + } + + return index, nil +} diff --git a/pkgs/distlock/service/internal/lease_actor.go b/pkgs/distlock/service/internal/lease_actor.go index 98eecf0..132430a 100644 --- a/pkgs/distlock/service/internal/lease_actor.go +++ b/pkgs/distlock/service/internal/lease_actor.go @@ -21,7 +21,7 @@ type LeaseActor struct { commandChan *actor.CommandChannel - mainActor *MainActor + releaseActor *ReleaseActor } func NewLeaseActor() *LeaseActor { @@ -31,8 +31,8 @@ func NewLeaseActor() *LeaseActor { } } -func (a *LeaseActor) Init(mainActor *MainActor) { - a.mainActor = mainActor +func (a *LeaseActor) Init(releaseActor *ReleaseActor) { + a.releaseActor = releaseActor } func (a *LeaseActor) StartChecking() error { @@ -113,15 +113,7 @@ func (a *LeaseActor) Serve() error { // TODO 可以考虑打个日志 logger.Std.Infof("lock request %s is timeout, will release it", reqID) - // TODO 可以考虑让超时时间可配置 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - err := a.mainActor.Release(ctx, reqID) - cancel() - if err == nil { - delete(a.leases, reqID) - } else { - logger.Std.Warnf("releasing lock request: %s", err.Error()) - } + a.releaseActor.DelayRelease([]string{reqID}) } } diff --git a/pkgs/distlock/service/internal/main_actor.go b/pkgs/distlock/service/internal/main_actor.go deleted file mode 100644 index c8fbff7..0000000 --- a/pkgs/distlock/service/internal/main_actor.go +++ /dev/null @@ -1,313 +0,0 @@ -package internal - -import ( - "context" - "fmt" - "strconv" - - "gitlink.org.cn/cloudream/common/pkgs/actor" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/utils/serder" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/clientv3util" - "go.etcd.io/etcd/client/v3/concurrency" -) - -const ( - EtcdLockRequestData = "/distlock/lockRequest/data" - EtcdLockRequestIndex = "/distlock/lockRequest/index" - EtcdLockRequestLock = "/distlock/lockRequest/lock" -) - -type lockData struct { - Path []string `json:"path"` - Name string `json:"name"` - Target string `json:"target"` -} - -type acquireManyResult struct { - IsTried bool - RequestID string - Err error -} - -type LockRequestData struct { - ID string `json:"id"` - Locks []lockData `json:"locks"` -} - -type MainActor struct { - cfg *distlock.Config - etcdCli *clientv3.Client - - commandChan *actor.CommandChannel - - providersActor *ProvidersActor - - lockRequestLeaseID clientv3.LeaseID -} - -func NewMainActor(cfg *distlock.Config, etcdCli *clientv3.Client) *MainActor { - return &MainActor{ - cfg: cfg, - etcdCli: etcdCli, - commandChan: actor.NewCommandChannel(), - } -} - -func (a *MainActor) Init(providersActor *ProvidersActor) { - a.providersActor = providersActor -} - -// Acquire 请求一批锁。成功后返回锁请求ID -func (a *MainActor) Acquire(ctx context.Context, req distlock.LockRequest) (reqID string, err error) { - rets, err := a.AcquireMany(ctx, []distlock.LockRequest{req}) - if err != nil { - return "", err - } - - if rets[0].Err != nil { - return "", rets[0].Err - } - - return rets[0].RequestID, nil -} - -// AcquireAny 尝试多个锁请求。目前的实现会在第一个获取成功后就直接返回 -func (a *MainActor) AcquireMany(ctx context.Context, reqs []distlock.LockRequest) (rets []acquireManyResult, err error) { - return actor.WaitValue(context.TODO(), a.commandChan, func() ([]acquireManyResult, error) { - // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 - unlock, err := a.acquireEtcdRequestDataLock(ctx) - if err != nil { - return nil, fmt.Errorf("acquire etcd request data lock failed, err: %w", err) - } - defer unlock() - - index, err := a.getEtcdLockRequestIndex(ctx) - if err != nil { - return nil, err - } - - // 等待本地状态同步到最新 - // TODO 配置等待时间 - err = a.providersActor.WaitIndexUpdated(ctx, index) - if err != nil { - return nil, err - } - - rets := make([]acquireManyResult, len(reqs)) - for i := 0; i < len(reqs); i++ { - // 测试锁,并获得锁数据 - reqData, err := a.providersActor.TestLockRequestAndMakeData(reqs[i]) - if err == nil { - nextIndexStr := strconv.FormatInt(index+1, 10) - reqData.ID = nextIndexStr - - // 锁成功,提交锁数据 - err := a.submitLockRequest(ctx, reqData) - - rets[i] = acquireManyResult{ - IsTried: true, - RequestID: nextIndexStr, - Err: err, - } - - break - - } else { - rets[i] = acquireManyResult{ - IsTried: true, - Err: err, - } - } - } - - return rets, nil - }) -} - -func (a *MainActor) submitLockRequest(ctx context.Context, reqData LockRequestData) error { - reqBytes, err := serder.ObjectToJSON(reqData) - if err != nil { - return fmt.Errorf("serialize lock request data failed, err: %w", err) - } - - var etcdOps []clientv3.Op - if a.cfg.SubmitLockRequestWithoutLease { - etcdOps = []clientv3.Op{ - clientv3.OpPut(EtcdLockRequestIndex, reqData.ID), - clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), - } - - } else { - etcdOps = []clientv3.Op{ - clientv3.OpPut(EtcdLockRequestIndex, reqData.ID), - // 归属到当前连接的租约,在当前连接断开后,能自动解锁 - // TODO 不能直接给RequestData上租约,因为如果在别的服务已经获取到锁的情况下, - // 如果当前服务崩溃,删除消息会立刻发送出去,这就破坏了锁的约定(在锁定期间其他服务不能修改数据) - clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), //, clientv3.WithLease(a.lockRequestLeaseID)), - } - } - txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit() - if err != nil { - return fmt.Errorf("submit lock request data failed, err: %w", err) - } - if !txResp.Succeeded { - return fmt.Errorf("submit lock request data failed for lock request data index changed") - } - - return nil -} - -// Release 释放锁 -func (a *MainActor) Release(ctx context.Context, reqID string) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 - unlock, err := a.acquireEtcdRequestDataLock(ctx) - if err != nil { - return fmt.Errorf("acquire etcd request data lock failed, err: %w", err) - } - defer unlock() - - index, err := a.getEtcdLockRequestIndex(ctx) - if err != nil { - return err - } - - lockReqKey := makeEtcdLockRequestKey(reqID) - - txResp, err := a.etcdCli.Txn(ctx). - If(clientv3util.KeyExists(lockReqKey)). - Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit() - if err != nil { - return fmt.Errorf("updating lock request data index: %w", err) - } - if !txResp.Succeeded { - return fmt.Errorf("updating lock request data failed") - } - - return nil - }) -} - -func (a *MainActor) acquireEtcdRequestDataLock(ctx context.Context) (unlock func(), err error) { - lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) - if err != nil { - return nil, fmt.Errorf("grant lease failed, err: %w", err) - } - - session, err := concurrency.NewSession(a.etcdCli, concurrency.WithLease(lease.ID)) - if err != nil { - return nil, fmt.Errorf("new session failed, err: %w", err) - } - - mutex := concurrency.NewMutex(session, EtcdLockRequestLock) - - err = mutex.Lock(ctx) - if err != nil { - session.Close() - return nil, fmt.Errorf("acquire lock failed, err: %w", err) - } - - return func() { - mutex.Unlock(context.Background()) - session.Close() - }, nil -} - -func (a *MainActor) getEtcdLockRequestIndex(ctx context.Context) (int64, error) { - indexKv, err := a.etcdCli.Get(ctx, EtcdLockRequestIndex) - if err != nil { - return 0, fmt.Errorf("get lock request index failed, err: %w", err) - } - - if len(indexKv.Kvs) == 0 { - return 0, nil - } - - index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64) - if err != nil { - return 0, fmt.Errorf("parse lock request index failed, err: %w", err) - } - - return index, nil -} - -func (a *MainActor) ReloadEtcdData() error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - // 使用事务一次性获取index和锁数据,就不需要加全局锁了 - txResp, err := a.etcdCli.Txn(context.Background()). - Then( - clientv3.OpGet(EtcdLockRequestIndex), - clientv3.OpGet(EtcdLockRequestData, clientv3.WithPrefix()), - ). - Commit() - if err != nil { - return fmt.Errorf("get etcd data failed, err: %w", err) - } - if !txResp.Succeeded { - return fmt.Errorf("get etcd data failed") - } - - indexKvs := txResp.Responses[0].GetResponseRange().Kvs - lockKvs := txResp.Responses[1].GetResponseRange().Kvs - - var index int64 - var reqData []LockRequestData - - // 解析Index - if len(indexKvs) > 0 { - val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64) - if err != nil { - return fmt.Errorf("parse lock request index failed, err: %w", err) - } - index = val - - } else { - index = 0 - } - - // 解析锁请求数据 - for _, kv := range lockKvs { - var req LockRequestData - err := serder.JSONToObject(kv.Value, &req) - if err != nil { - return fmt.Errorf("parse lock request data failed, err: %w", err) - } - - reqData = append(reqData, req) - } - - err = a.providersActor.ResetState(index, reqData) - if err != nil { - return fmt.Errorf("reset lock providers state failed, err: %w", err) - } - - return nil - }) -} - -func (a *MainActor) Serve() error { - lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) - if err != nil { - return fmt.Errorf("grant lease failed, err: %w", err) - } - a.lockRequestLeaseID = lease.ID - - cmdChan := a.commandChan.BeginChanReceive() - defer a.commandChan.CloseChanReceive() - - for { - select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - - // TODO Actor启动时,如果第一个调用的是Acquire,那么就会在Acquire中等待本地锁数据同步到最新。 - // 此时命令的执行也会被阻塞,导致ReloadEtcdData命令无法执行,因此产生死锁,最后Acquire超时失败。 - // 此处暂时使用单独的goroutine的来执行命令,避免阻塞。 - go cmd() - } - } -} diff --git a/pkgs/distlock/service/internal/providers_actor.go b/pkgs/distlock/service/internal/providers_actor.go index fa56a5b..c81d75c 100644 --- a/pkgs/distlock/service/internal/providers_actor.go +++ b/pkgs/distlock/service/internal/providers_actor.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/actor" "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" ) @@ -56,30 +57,31 @@ func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) erro return fut.Wait(ctx) } -func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { +func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) { + a.commandChan.Send(func() { for _, op := range events { if op.IsLocking { err := a.lockLockRequest(op.Data) if err != nil { - return fmt.Errorf("lock by lock request data failed, err: %w", err) + // TODO 发生这种错误需要重新加载全量状态,下同 + logger.Std.Warnf("applying locking event: %s", err.Error()) + return } } else { err := a.unlockLockRequest(op.Data) if err != nil { - return fmt.Errorf("unlock by lock request data failed, err: %w", err) + logger.Std.Warnf("applying unlocking event: %s", err.Error()) + return } } - } - // 处理了多少事件,Index就往后移动多少个 - a.localLockReqIndex += int64(len(events)) + // 处理了多少事件,Index就往后移动多少个 + a.localLockReqIndex++ + } // 检查是否有等待同步进度的需求 - a.checkIndexWaiter() - - return nil + a.wakeUpIndexWaiter() }) } @@ -181,13 +183,13 @@ func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestDa a.localLockReqIndex = index // 检查是否有等待同步进度的需求 - a.checkIndexWaiter() + a.wakeUpIndexWaiter() return nil }) } -func (a *ProvidersActor) checkIndexWaiter() { +func (a *ProvidersActor) wakeUpIndexWaiter() { var resetWaiters []indexWaiter for _, waiter := range a.indexWaiters { if waiter.Index <= a.localLockReqIndex { diff --git a/pkgs/distlock/service/internal/release_actor.go b/pkgs/distlock/service/internal/release_actor.go new file mode 100644 index 0000000..c633d53 --- /dev/null +++ b/pkgs/distlock/service/internal/release_actor.go @@ -0,0 +1,144 @@ +package internal + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/logger" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/clientv3util" +) + +const ( + DefaultMaxReleaseingDelayMs = 4000 + BaseReleaseingDelayMs = 1000 +) + +type ReleaseActor struct { + cfg *distlock.Config + etcdCli *clientv3.Client + + releasingLockRequestIDs map[string]bool + timer *time.Timer + timerSetuped bool + lock sync.Mutex +} + +func NewReleaseActor(cfg *distlock.Config, etcdCli *clientv3.Client) *ReleaseActor { + return &ReleaseActor{ + cfg: cfg, + etcdCli: etcdCli, + releasingLockRequestIDs: make(map[string]bool), + } +} + +// 立刻尝试释放这些锁。一般用于在用户主动释放了一个锁之后 +func (a *ReleaseActor) Release(reqIDs []string) { + a.lock.Lock() + defer a.lock.Unlock() + + for _, id := range reqIDs { + a.releasingLockRequestIDs[id] = true + } + + // TODO 处理错误 + err := a.doReleasing() + if err != nil { + logger.Std.Debugf("doing releasing: %s", err.Error()) + } + + a.setupTimer() +} + +// 延迟释放锁。一般用于清理崩溃的锁服务遗留下来的锁 +func (a *ReleaseActor) DelayRelease(reqIDs []string) { + a.lock.Lock() + defer a.lock.Unlock() + + for _, id := range reqIDs { + a.releasingLockRequestIDs[id] = true + } + + a.setupTimer() +} + +func (a *ReleaseActor) doReleasing() error { + ctx := context.TODO() + + if len(a.releasingLockRequestIDs) == 0 { + return nil + } + + // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 + unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return fmt.Errorf("acquire etcd request data lock failed, err: %w", err) + } + defer unlock() + + index, err := getEtcdLockRequestIndex(ctx, a.etcdCli) + if err != nil { + return err + } + + // TODO 可以考虑优化成一次性删除多个锁 + for id := range a.releasingLockRequestIDs { + lockReqKey := makeEtcdLockRequestKey(id) + + txResp, err := a.etcdCli.Txn(ctx). + If(clientv3util.KeyExists(lockReqKey)). + Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit() + if err != nil { + return fmt.Errorf("updating lock request data: %w", err) + } + // 只有确实删除了锁数据,才更新index + if txResp.Succeeded { + index++ + } + delete(a.releasingLockRequestIDs, id) + } + + return nil +} + +func (a *ReleaseActor) setupTimer() { + if len(a.releasingLockRequestIDs) == 0 { + return + } + + if a.timerSetuped { + return + } + a.timerSetuped = true + + delay := int64(0) + if a.cfg.RandomReleasingDelayMs == 0 { + delay = rand.Int63n(DefaultMaxReleaseingDelayMs) + } else { + delay = rand.Int63n(a.cfg.RandomReleasingDelayMs) + } + + if a.timer == nil { + a.timer = time.NewTimer(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond) + } else { + a.timer.Reset(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond) + } + + go func() { + <-a.timer.C + a.timerSetuped = false + + // TODO 处理错误 + err := a.doReleasing() + if err != nil { + logger.Std.Debugf("doing releasing: %s", err.Error()) + } + + a.setupTimer() + }() +} diff --git a/pkgs/distlock/service/internal/retry_actor.go b/pkgs/distlock/service/internal/retry_actor.go deleted file mode 100644 index 6f6e304..0000000 --- a/pkgs/distlock/service/internal/retry_actor.go +++ /dev/null @@ -1,128 +0,0 @@ -package internal - -import ( - "context" - "fmt" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/actor" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/logger" - mylo "gitlink.org.cn/cloudream/common/utils/lo" -) - -type retryInfo struct { - Callback *future.SetValueFuture[string] - LastErr error -} - -type RetryActor struct { - retrys []distlock.LockRequest - retryInfos []*retryInfo - - commandChan *actor.CommandChannel - - mainActor *MainActor -} - -func NewRetryActor() *RetryActor { - return &RetryActor{ - commandChan: actor.NewCommandChannel(), - } -} - -func (a *RetryActor) Init(mainActor *MainActor) { - a.mainActor = mainActor -} - -func (a *RetryActor) Retry(ctx context.Context, req distlock.LockRequest, lastErr error) (future.ValueFuture[string], error) { - fut := future.NewSetValue[string]() - - var info *retryInfo - err := actor.Wait(ctx, a.commandChan, func() error { - a.retrys = append(a.retrys, req) - info = &retryInfo{ - Callback: fut, - LastErr: lastErr, - } - a.retryInfos = append(a.retryInfos, info) - return nil - }) - if err != nil { - return nil, err - } - - go func() { - <-ctx.Done() - a.commandChan.Send(func() { - // 由于只可能在cmd中修改future状态,所以此处的IsComplete判断可以作为后续操作的依据 - if fut.IsComplete() { - return - } - - index := lo.IndexOf(a.retryInfos, info) - if index == -1 { - return - } - - a.retryInfos[index].Callback.SetError(a.retryInfos[index].LastErr) - - a.retrys = mylo.RemoveAt(a.retrys, index) - a.retryInfos = mylo.RemoveAt(a.retryInfos, index) - }) - }() - - return fut, nil -} - -func (a *RetryActor) OnLocalStateUpdated() { - a.commandChan.Send(func() { - if len(a.retrys) == 0 { - return - } - - rets, err := a.mainActor.AcquireMany(context.Background(), a.retrys) - if err != nil { - // TODO 处理错误 - logger.Std.Warnf("acquire many lock requests failed, err: %s", err.Error()) - return - } - - // 根据尝试的结果更新状态 - delCnt := 0 - for i, ret := range rets { - a.retrys[i-delCnt] = a.retrys[i] - a.retryInfos[i-delCnt] = a.retryInfos[i] - - if !ret.IsTried { - continue - } - - if ret.Err != nil { - a.retryInfos[i].LastErr = ret.Err - } else { - a.retryInfos[i].Callback.SetValue(ret.RequestID) - delCnt++ - } - } - a.retrys = a.retrys[:len(a.retrys)-delCnt] - a.retryInfos = a.retryInfos[:len(a.retryInfos)-delCnt] - }) -} - -func (a *RetryActor) Serve() error { - cmdChan := a.commandChan.BeginChanReceive() - defer a.commandChan.CloseChanReceive() - - for { - select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - - cmd() - } - } -} diff --git a/pkgs/distlock/service/internal/watch_etcd_actor.go b/pkgs/distlock/service/internal/watch_etcd_actor.go index 5d0530c..2d0b791 100644 --- a/pkgs/distlock/service/internal/watch_etcd_actor.go +++ b/pkgs/distlock/service/internal/watch_etcd_actor.go @@ -37,9 +37,9 @@ func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor { func (a *WatchEtcdActor) Init() { } -func (a *WatchEtcdActor) StartWatching() error { +func (a *WatchEtcdActor) StartWatching(revision int64) error { return actor.Wait(context.TODO(), a.commandChan, func() error { - a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV()) + a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) return nil }) } diff --git a/pkgs/distlock/service/mutex.go b/pkgs/distlock/service/mutex.go index b386ff1..abba728 100644 --- a/pkgs/distlock/service/mutex.go +++ b/pkgs/distlock/service/mutex.go @@ -25,6 +25,6 @@ func (m *Mutex) Lock() error { return nil } -func (m *Mutex) Unlock() error { - return m.svc.Release(m.lockReqID) +func (m *Mutex) Unlock() { + m.svc.Release(m.lockReqID) } diff --git a/pkgs/distlock/service/service.go b/pkgs/distlock/service/service.go index a655a7d..8cb8e34 100644 --- a/pkgs/distlock/service/service.go +++ b/pkgs/distlock/service/service.go @@ -3,11 +3,13 @@ package service import ( "context" "fmt" + "strconv" "time" "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/distlock/service/internal" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -46,11 +48,11 @@ type Service struct { cfg *distlock.Config etcdCli *clientv3.Client - mainActor *internal.MainActor + acquireActor *internal.AcquireActor + releaseActor *internal.ReleaseActor providersActor *internal.ProvidersActor watchEtcdActor *internal.WatchEtcdActor leaseActor *internal.LeaseActor - retryActor *internal.RetryActor lockReqEventWatcher internal.LockRequestEventWatcher } @@ -66,17 +68,16 @@ func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error return nil, fmt.Errorf("new etcd client failed, err: %w", err) } - mainActor := internal.NewMainActor(cfg, etcdCli) + acquireActor := internal.NewAcquireActor(cfg, etcdCli) + releaseActor := internal.NewReleaseActor(cfg, etcdCli) providersActor := internal.NewProvidersActor() watchEtcdActor := internal.NewWatchEtcdActor(etcdCli) leaseActor := internal.NewLeaseActor() - retryActor := internal.NewRetryActor() - mainActor.Init(providersActor) + acquireActor.Init(providersActor) providersActor.Init() watchEtcdActor.Init() - leaseActor.Init(mainActor) - retryActor.Init(mainActor) + leaseActor.Init(releaseActor) for _, prov := range initProvs { providersActor.AddProvider(prov.Provider, prov.Path...) @@ -85,11 +86,11 @@ func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error return &Service{ cfg: cfg, etcdCli: etcdCli, - mainActor: mainActor, + acquireActor: acquireActor, + releaseActor: releaseActor, providersActor: providersActor, watchEtcdActor: watchEtcdActor, leaseActor: leaseActor, - retryActor: retryActor, }, nil } @@ -109,18 +110,9 @@ func (svc *Service) Acquire(req distlock.LockRequest, opts ...AcquireOptionFn) ( defer cancel() } - reqID, err := svc.mainActor.Acquire(ctx, req) + reqID, err := svc.acquireActor.Acquire(ctx, req) if err != nil { - fut, err := svc.retryActor.Retry(ctx, req, err) - if err != nil { - return "", fmt.Errorf("retrying failed, err: %w", err) - } - - // Retry 如果超时,Retry内部会设置fut为Failed,所以这里可以用Background无限等待 - reqID, err = fut.WaitValue(context.Background()) - if err != nil { - return "", err - } + return "", err } if opt.Lease > 0 { @@ -140,16 +132,14 @@ func (svc *Service) Renew(reqID string) error { } // Release 释放锁 -func (svc *Service) Release(reqID string) error { - err := svc.mainActor.Release(context.TODO(), reqID) +func (svc *Service) Release(reqID string) { + svc.releaseActor.Release([]string{reqID}) // TODO 不影响结果,但考虑打日志 - err2 := svc.leaseActor.Remove(reqID) - if err2 != nil { - logger.Std.Warnf("removing lease: %s", err2.Error()) + err := svc.leaseActor.Remove(reqID) + if err != nil { + logger.Std.Warnf("removing lease: %s", err.Error()) } - - return err } func (svc *Service) Serve() error { @@ -174,14 +164,6 @@ func (svc *Service) Serve() error { } }() - go func() { - // TODO 处理错误 - err := svc.mainActor.Serve() - if err != nil { - logger.Std.Warnf("serving main actor failed, err: %s", err.Error()) - } - }() - go func() { // TODO 处理错误 err := svc.leaseActor.Serve() @@ -190,23 +172,15 @@ func (svc *Service) Serve() error { } }() - go func() { - // TODO 处理错误 - err := svc.retryActor.Serve() - if err != nil { - logger.Std.Warnf("serving retry actor failed, err: %s", err.Error()) - } - }() - - err := svc.mainActor.ReloadEtcdData() + revision, err := svc.loadState() if err != nil { // TODO 关闭其他的Actor,或者更好的错误处理方式 return fmt.Errorf("init data failed, err: %w", err) } svc.lockReqEventWatcher.OnEvent = func(events []internal.LockRequestEvent) { + svc.acquireActor.TryAcquireNow() svc.providersActor.ApplyLockRequestEvents(events) - svc.retryActor.OnLocalStateUpdated() } err = svc.watchEtcdActor.AddEventWatcher(&svc.lockReqEventWatcher) if err != nil { @@ -214,7 +188,7 @@ func (svc *Service) Serve() error { return fmt.Errorf("add lock request event watcher failed, err: %w", err) } - err = svc.watchEtcdActor.StartWatching() + err = svc.watchEtcdActor.StartWatching(revision) if err != nil { // TODO 关闭其他的Actor,或者更好的错误处理方式 return fmt.Errorf("start watching etcd failed, err: %w", err) @@ -232,3 +206,52 @@ func (svc *Service) Serve() error { return nil } + +func (svc *Service) loadState() (int64, error) { + // 使用事务一次性获取index和锁数据,就不需要加全局锁了 + txResp, err := svc.etcdCli.Txn(context.Background()). + Then( + clientv3.OpGet(internal.EtcdLockRequestIndex), + clientv3.OpGet(internal.EtcdLockRequestData, clientv3.WithPrefix()), + ). + Commit() + if err != nil { + return 0, fmt.Errorf("get etcd data failed, err: %w", err) + } + + indexKvs := txResp.Responses[0].GetResponseRange().Kvs + lockKvs := txResp.Responses[1].GetResponseRange().Kvs + + var index int64 + var reqData []internal.LockRequestData + + // 解析Index + if len(indexKvs) > 0 { + val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64) + if err != nil { + return 0, fmt.Errorf("parse lock request index failed, err: %w", err) + } + index = val + + } else { + index = 0 + } + + // 解析锁请求数据 + for _, kv := range lockKvs { + var req internal.LockRequestData + err := serder.JSONToObject(kv.Value, &req) + if err != nil { + return 0, fmt.Errorf("parse lock request data failed, err: %w", err) + } + + reqData = append(reqData, req) + } + + err = svc.providersActor.ResetState(index, reqData) + if err != nil { + return 0, fmt.Errorf("reset lock providers state failed, err: %w", err) + } + + return txResp.Header.Revision, nil +} diff --git a/pkgs/future/set_value_future.go b/pkgs/future/set_value_future.go index fad7d36..4aea9e8 100644 --- a/pkgs/future/set_value_future.go +++ b/pkgs/future/set_value_future.go @@ -48,6 +48,8 @@ func (f *SetValueFuture[T]) IsComplete() bool { return f.isCompleted } +// 等待直到Complete或者ctx被取消。 +// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete func (f *SetValueFuture[T]) Wait(ctx context.Context) error { select { case <-f.completeChan: