diff --git a/pkg/distlock/distlock.go b/pkg/distlock/distlock.go index d11f5b0..d506f61 100644 --- a/pkg/distlock/distlock.go +++ b/pkg/distlock/distlock.go @@ -1,5 +1,7 @@ package distlock +import "fmt" + type Lock struct { Path []string // 锁路径,存储的是路径的每一部分 Name string // 锁名 @@ -44,3 +46,17 @@ type LockProvider interface { // Clear 清除内部所有状态 Clear() } + +type LockTargetBusyError struct { + lockName string +} + +func (e *LockTargetBusyError) Error() string { + return fmt.Sprintf("the lock object is locked by %s", e.lockName) +} + +func newLockTargetBusyError(lockName string) *LockTargetBusyError { + return &LockTargetBusyError{ + lockName: lockName, + } +} diff --git a/pkg/distlock/main_actor.go b/pkg/distlock/main_actor.go index e76af9c..8ec80b9 100644 --- a/pkg/distlock/main_actor.go +++ b/pkg/distlock/main_actor.go @@ -20,6 +20,8 @@ type mainActor struct { watchEtcdActor *watchEtcdActor providersActor *providersActor + + lockRequestLeaseID clientv3.LeaseID } func newMainActor() *mainActor { @@ -74,7 +76,8 @@ func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) { txResp, err := a.etcdCli.Txn(context.Background()). Then( clientv3.OpPut(LOCK_REQUEST_INDEX, nextIndexStr), - clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes)), + // 归属到当前连接的租约,在当前连接断开后,能自动解锁 + clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes), clientv3.WithLease(a.lockRequestLeaseID)), ). Commit() if err != nil { @@ -238,6 +241,12 @@ func (a *mainActor) ReloadEtcdData() error { } func (a *mainActor) Serve() error { + lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return fmt.Errorf("grant lease failed, err: %w", err) + } + a.lockRequestLeaseID = lease.ID + for { select { case cmd, ok := <-a.commandChan.ChanReceive():