From ee124cadab87f0bab0728f7518a62d4c5a302e9f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 12 Jun 2023 15:33:34 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=99=E6=8F=90=E4=BA=A4=E7=9A=84=E9=94=81?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=AE=BE=E7=BD=AE=E7=A7=9F=E7=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/distlock/distlock.go | 16 ++++++++++++++++ pkg/distlock/main_actor.go | 11 ++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/distlock/distlock.go b/pkg/distlock/distlock.go index d11f5b0..d506f61 100644 --- a/pkg/distlock/distlock.go +++ b/pkg/distlock/distlock.go @@ -1,5 +1,7 @@ package distlock +import "fmt" + type Lock struct { Path []string // 锁路径,存储的是路径的每一部分 Name string // 锁名 @@ -44,3 +46,17 @@ type LockProvider interface { // Clear 清除内部所有状态 Clear() } + +type LockTargetBusyError struct { + lockName string +} + +func (e *LockTargetBusyError) Error() string { + return fmt.Sprintf("the lock object is locked by %s", e.lockName) +} + +func newLockTargetBusyError(lockName string) *LockTargetBusyError { + return &LockTargetBusyError{ + lockName: lockName, + } +} diff --git a/pkg/distlock/main_actor.go b/pkg/distlock/main_actor.go index e76af9c..8ec80b9 100644 --- a/pkg/distlock/main_actor.go +++ b/pkg/distlock/main_actor.go @@ -20,6 +20,8 @@ type mainActor struct { watchEtcdActor *watchEtcdActor providersActor *providersActor + + lockRequestLeaseID clientv3.LeaseID } func newMainActor() *mainActor { @@ -74,7 +76,8 @@ func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) { txResp, err := a.etcdCli.Txn(context.Background()). Then( clientv3.OpPut(LOCK_REQUEST_INDEX, nextIndexStr), - clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes)), + // 归属到当前连接的租约,在当前连接断开后,能自动解锁 + clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes), clientv3.WithLease(a.lockRequestLeaseID)), ). Commit() if err != nil { @@ -238,6 +241,12 @@ func (a *mainActor) ReloadEtcdData() error { } func (a *mainActor) Serve() error { + lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return fmt.Errorf("grant lease failed, err: %w", err) + } + a.lockRequestLeaseID = lease.ID + for { select { case cmd, ok := <-a.commandChan.ChanReceive():