diff --git a/pkgs/distlock/config.go b/pkgs/distlock/config.go deleted file mode 100644 index 65ffe67..0000000 --- a/pkgs/distlock/config.go +++ /dev/null @@ -1,13 +0,0 @@ -package distlock - -type Config struct { - EtcdAddress string `json:"etcdAddress"` - EtcdUsername string `json:"etcdUsername"` - EtcdPassword string `json:"etcdPassword"` - - EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。 - - // 写入锁请求数据到的ETCD的时候,不设置租约。开启此选项之后,请求锁的服务崩溃, - // 锁请求数据会依然留在ETCD中。仅供调试使用。 - SubmitLockRequestWithoutLease bool `json:"submitLockRequestWithoutLease"` -} diff --git a/pkgs/distlock/distlock.go b/pkgs/distlock/distlock.go index 93091bb..2972c74 100644 --- a/pkgs/distlock/distlock.go +++ b/pkgs/distlock/distlock.go @@ -1,40 +1,18 @@ package distlock -import "fmt" +import ( + "fmt" -type Lock struct { - Path []string // 锁路径,存储的是路径的每一部分 - Name string // 锁名 - Target any // 锁对象,由具体的Provider去解析 -} - -type LockRequest struct { - Locks []Lock -} - -func (b *LockRequest) Add(lock Lock) { - b.Locks = append(b.Locks, lock) -} - -type LockProvider interface { - // CanLock 判断这个锁能否锁定成功 - CanLock(lock Lock) error + "gitlink.org.cn/cloudream/common/pkgs/distlock/internal" +) - // Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。 - Lock(reqID string, lock Lock) error +type Lock = internal.Lock - // 解锁 - Unlock(reqID string, lock Lock) error +type LockRequest = internal.LockRequest - // GetTargetString 将锁对象序列化为字符串,方便存储到ETCD - GetTargetString(target any) (string, error) +type LockProvider = internal.LockProvider - // ParseTargetString 解析字符串格式的锁对象数据 - ParseTargetString(targetStr string) (any, error) - - // Clear 清除内部所有状态 - Clear() -} +type Config = internal.Config type LockTargetBusyError struct { lockName string diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go new file mode 100644 index 0000000..35fd1a0 --- /dev/null +++ b/pkgs/distlock/internal/acquire_actor.go @@ -0,0 +1,282 @@ +package 
// Acquire requests a batch of locks and blocks until the request is granted
// or abandoned. On success it returns the ID of the lock request.
//
// The request is appended to a.acquirings; outside maintenance mode the Serve
// loop is additionally signalled through doAcquiringChan.
//
// ctx only limits how long the request may stay queued. When the wait on ctx
// returns before the request completed, the request is removed from the queue
// and the call fails with the last error recorded for it, or with
// ErrAcquiringTimeout when no error was recorded.
func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, error) {
	info := &acquireInfo{
		Request:  req,
		Callback: future.NewSetValue[string](),
	}

	func() {
		a.lock.Lock()
		defer a.lock.Unlock()

		a.acquirings = append(a.acquirings, info)

		// In maintenance mode requests are only accepted, not processed.
		if a.isMaintenance {
			return
		}

		// Non-blocking wake-up of the Serve loop.
		// NOTE(review): NewAcquireActor creates doAcquiringChan without a buffer,
		// so this signal is dropped whenever Serve is not receiving at this very
		// moment; the request then waits for a later trigger (TryAcquireNow).
		select {
		case a.doAcquiringChan <- nil:
		default:
		}
	}()

	go func() {
		// Presumably returns once the future completes or ctx ends; the result
		// is intentionally ignored and re-checked under the lock below.
		info.Callback.Wait(ctx)

		a.lock.Lock()
		defer a.lock.Unlock()

		// Every completion of Callback happens while a.lock is held, so the
		// IsComplete check here is a reliable basis for what follows.
		if info.Callback.IsComplete() {
			return
		}

		// Not completed: give up on the request and report why.
		a.acquirings = mylo.Remove(a.acquirings, info)
		if info.LastErr != nil {
			info.Callback.SetError(info.LastErr)
		} else {
			info.Callback.SetError(ErrAcquiringTimeout)
		}
	}()

	// Do not wait on Callback with ctx directly: a timed-out Wait does not mean
	// the lock was not acquired, and returning early would leak that lock.
	return info.Callback.WaitValue(context.Background())
}
TryAcquireNow 重试一下内部还没有成功的锁请求。不会阻塞调用者 +func (a *AcquireActor) TryAcquireNow() { + go func() { + a.lock.Lock() + defer a.lock.Unlock() + + // 处于维护模式中时,即使是主动触发Acqurire也不予理会 + if a.isMaintenance { + return + } + + select { + case a.doAcquiringChan <- nil: + default: + } + }() +} + +// 进入维护模式。维护模式期间只接受请求,不处理请求。 +func (a *AcquireActor) EnterMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = true +} + +// 退出维护模式。退出之后建议调用一下TryAcquireNow。 +func (a *AcquireActor) LeaveMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = false +} + +func (a *AcquireActor) ResetState(serviceID string) { + a.lock.Lock() + defer a.lock.Unlock() + + a.serviceID = serviceID +} + +func (a *AcquireActor) Serve() { + for { + select { + case <-a.doAcquiringChan: + err := a.doAcquiring() + if err != nil { + logger.Std.Debugf("doing acquiring: %s", err.Error()) + } + } + } +} + +func (a *AcquireActor) doAcquiring() error { + ctx := context.Background() + + // 先看一眼,如果没有需要请求的锁,就不用走后面的流程了 + a.lock.Lock() + if len(a.acquirings) == 0 { + a.lock.Unlock() + return nil + } + a.lock.Unlock() + + // 在获取全局锁的时候不用锁Actor,只有获取成功了,才加锁 + // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 + unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return fmt.Errorf("acquire etcd request data lock failed, err: %w", err) + } + defer unlock() + + index, err := getEtcdLockRequestIndex(ctx, a.etcdCli) + if err != nil { + return err + } + + logger.Std.Infof("wait to: %d", index) + + // 等待本地状态同步到最新 + // TODO 配置等待时间 + err = a.providersActor.WaitLocalIndexTo(ctx, index) + if err != nil { + return err + } + + a.lock.Lock() + defer a.lock.Unlock() + // TODO 可以考虑一次性获得多个锁 + for i := 0; i < len(a.acquirings); i++ { + req := a.acquirings[i] + + // 测试锁,并获得锁数据 + reqData, err := a.providersActor.TestLockRequestAndMakeData(req.Request) + logger.Std.Infof("6") + if err != nil { + req.LastErr = err + continue + } + + nextIndexStr := 
strconv.FormatInt(index+1, 10) + reqData.ID = nextIndexStr + reqData.SerivceID = a.serviceID + reqData.Reason = req.Request.Reason + reqData.Timestamp = time.Now().Unix() + + // 锁成功,提交锁数据 + err = a.submitLockRequest(ctx, nextIndexStr, reqData) + if err != nil { + req.LastErr = err + continue + } + + req.Callback.SetValue(reqData.ID) + a.acquirings = mylo.RemoveAt(a.acquirings, i) + break + } + + return nil +} + +func (a *AcquireActor) submitLockRequest(ctx context.Context, index string, reqData LockRequestData) error { + reqBytes, err := serder.ObjectToJSON(reqData) + if err != nil { + return fmt.Errorf("serialize lock request data failed, err: %w", err) + } + + etcdOps := []clientv3.Op{ + clientv3.OpPut(EtcdLockRequestIndex, index), + clientv3.OpPut(MakeEtcdLockRequestKey(reqData.ID), string(reqBytes)), + } + txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit() + if err != nil { + return fmt.Errorf("submit lock request data failed, err: %w", err) + } + if !txResp.Succeeded { + return fmt.Errorf("submit lock request data failed for lock request data index changed") + } + + return nil +} + +func acquireEtcdRequestDataLock(ctx context.Context, etcdCli *clientv3.Client, etcdLockLeaseTimeSec int64) (unlock func(), err error) { + lease, err := etcdCli.Grant(context.Background(), etcdLockLeaseTimeSec) + if err != nil { + return nil, fmt.Errorf("grant lease failed, err: %w", err) + } + + session, err := concurrency.NewSession(etcdCli, concurrency.WithLease(lease.ID)) + if err != nil { + return nil, fmt.Errorf("new session failed, err: %w", err) + } + + mutex := concurrency.NewMutex(session, EtcdLockRequestLock) + + err = mutex.Lock(ctx) + if err != nil { + session.Close() + return nil, fmt.Errorf("acquire lock failed, err: %w", err) + } + + return func() { + mutex.Unlock(context.Background()) + session.Close() + }, nil +} + +func getEtcdLockRequestIndex(ctx context.Context, etcdCli *clientv3.Client) (int64, error) { + indexKv, err := etcdCli.Get(ctx, 
// Config holds the settings of the distributed lock service.
type Config struct {
	EtcdAddress  string `json:"etcdAddress"`
	EtcdUsername string `json:"etcdUsername"`
	EtcdPassword string `json:"etcdPassword"`

	// Lease time of the global lock, in seconds. The lock service renews the
	// lease automatically within this time; if the service crashes, other
	// services can obtain the lock again once the lease has expired.
	// The same value is used for the lease of the registered service info.
	EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"`
	// When releasing a lock fails, it is retried after a random delay:
	// delay = random(0, RandomReleasingDelayMs) + minimum delay (1000ms).
	// A value of 0 selects the built-in default maximum.
	RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"`
	// Description of this lock service. It is registered in etcd after the
	// lock service has started.
	ServiceDescription string `json:"serviceDescription"`
}
return nil }) } -func (a *LeaseActor) StopChecking() error { +func (a *LeaseActor) Stop() error { return actor.Wait(context.TODO(), a.commandChan, func() error { if a.ticker != nil { a.ticker.Stop() @@ -91,19 +91,14 @@ func (a *LeaseActor) Remove(reqID string) error { }) } -func (a *LeaseActor) Serve() error { +func (a *LeaseActor) Serve() { cmdChan := a.commandChan.BeginChanReceive() defer a.commandChan.CloseChanReceive() for { if a.ticker != nil { select { - case cmd, ok := <-cmdChan: - if !ok { - a.ticker.Stop() - return fmt.Errorf("command chan closed") - } - + case cmd := <-cmdChan: cmd() case now := <-a.ticker.C: @@ -113,26 +108,14 @@ func (a *LeaseActor) Serve() error { // TODO 可以考虑打个日志 logger.Std.Infof("lock request %s is timeout, will release it", reqID) - // TODO 可以考虑让超时时间可配置 - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - err := a.mainActor.Release(ctx, reqID) - cancel() - if err == nil { - delete(a.leases, reqID) - } else { - logger.Std.Warnf("releasing lock request: %s", err.Error()) - } + a.releaseActor.DelayRelease([]string{reqID}) } } } } else { select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command chan closed") - } - + case cmd := <-cmdChan: cmd() } } diff --git a/pkgs/distlock/internal/models.go b/pkgs/distlock/internal/models.go new file mode 100644 index 0000000..808cbad --- /dev/null +++ b/pkgs/distlock/internal/models.go @@ -0,0 +1,77 @@ +package internal + +import "strings" + +const ( + EtcdLockRequestDataPrefix = "/distlock/lockRequest/data" + EtcdLockRequestIndex = "/distlock/lockRequest/index" + EtcdLockRequestLock = "/distlock/lockRequest/lock" + EtcdServiceInfoPrefix = "/distlock/services" + EtcdWatchPrefix = "/distlock" +) + +type Lock struct { + Path []string // 锁路径,存储的是路径的每一部分 + Name string // 锁名 + Target any // 锁对象,由具体的Provider去解析 +} + +type LockRequest struct { + Reason string + Locks []Lock +} + +func (b *LockRequest) Add(lock Lock) { + b.Locks = append(b.Locks, lock) +} + +type 
LockProvider interface { + // CanLock 判断这个锁能否锁定成功 + CanLock(lock Lock) error + + // Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。 + Lock(reqID string, lock Lock) error + + // 解锁 + Unlock(reqID string, lock Lock) error + + // GetTargetString 将锁对象序列化为字符串,方便存储到ETCD + GetTargetString(target any) (string, error) + + // ParseTargetString 解析字符串格式的锁对象数据 + ParseTargetString(targetStr string) (any, error) + + // Clear 清除内部所有状态 + Clear() +} + +type lockData struct { + Path []string `json:"path"` + Name string `json:"name"` + Target string `json:"target"` +} + +type LockRequestData struct { + ID string `json:"id"` + SerivceID string `json:"serviceID"` + Reason string `json:"reason"` + Timestamp int64 `json:"timestamp"` + Locks []lockData `json:"locks"` +} + +func MakeEtcdLockRequestKey(reqID string) string { + return EtcdLockRequestDataPrefix + "/" + reqID +} + +func GetLockRequestID(key string) string { + return strings.TrimPrefix(key, EtcdLockRequestDataPrefix+"/") +} + +func MakeServiceInfoKey(svcID string) string { + return EtcdServiceInfoPrefix + "/" + svcID +} + +type ServiceInfo struct { + ID string `json:"id"` + Description string `json:"description"` +} diff --git a/pkgs/distlock/internal/providers_actor.go b/pkgs/distlock/internal/providers_actor.go new file mode 100644 index 0000000..c2cd6d6 --- /dev/null +++ b/pkgs/distlock/internal/providers_actor.go @@ -0,0 +1,202 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/trie" +) + +var ErrWaitIndexUpdateTimeout = errors.New("waitting local index updating timeout") + +type indexWaiter struct { + Index int64 + Callback *future.SetVoidFuture +} + +type ProvidersActor struct { + localLockReqIndex int64 + provdersTrie trie.Trie[LockProvider] + allProviders []LockProvider + + indexWaiters []indexWaiter + lock sync.Mutex +} + +func NewProvidersActor() *ProvidersActor { + return &ProvidersActor{} +} + +func (a 
*ProvidersActor) AddProvider(prov LockProvider, path ...any) { + a.provdersTrie.Create(path).Value = prov + a.allProviders = append(a.allProviders, prov) +} + +func (a *ProvidersActor) Init() { +} + +func (a *ProvidersActor) WaitLocalIndexTo(ctx context.Context, index int64) error { + fut := future.NewSetVoid() + + a.lock.Lock() + if index <= a.localLockReqIndex { + fut.SetVoid() + } else { + a.indexWaiters = append(a.indexWaiters, indexWaiter{ + Index: index, + Callback: fut, + }) + } + a.lock.Unlock() + + return fut.Wait(ctx) +} + +func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) error { + a.lock.Lock() + defer a.lock.Unlock() + + if evt.IsLocking { + err := a.lockLockRequest(evt.Data) + if err != nil { + return fmt.Errorf("applying locking event: %w", err) + } + + } else { + err := a.unlockLockRequest(evt.Data) + if err != nil { + return fmt.Errorf("applying unlocking event: %w", err) + } + } + + a.localLockReqIndex++ + // 检查是否有等待同步进度的需求 + a.wakeUpIndexWaiter() + return nil +} + +func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error { + for _, lockData := range reqData.Locks { + node, ok := svc.provdersTrie.WalkEnd(lockData.Path) + if !ok || node.Value == nil { + return fmt.Errorf("lock provider not found for path %v", lockData.Path) + } + + target, err := node.Value.ParseTargetString(lockData.Target) + if err != nil { + return fmt.Errorf("parse target data failed, err: %w", err) + } + + err = node.Value.Lock(reqData.ID, Lock{ + Path: lockData.Path, + Name: lockData.Name, + Target: target, + }) + if err != nil { + return fmt.Errorf("locking failed, err: %w", err) + } + } + return nil +} + +func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error { + for _, lockData := range reqData.Locks { + node, ok := svc.provdersTrie.WalkEnd(lockData.Path) + if !ok || node.Value == nil { + return fmt.Errorf("lock provider not found for path %v", lockData.Path) + } + + target, err := 
node.Value.ParseTargetString(lockData.Target) + if err != nil { + return fmt.Errorf("parse target data failed, err: %w", err) + } + + err = node.Value.Unlock(reqData.ID, Lock{ + Path: lockData.Path, + Name: lockData.Name, + Target: target, + }) + if err != nil { + return fmt.Errorf("unlocking failed, err: %w", err) + } + } + return nil +} + +// TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID。 +// 在检查单个锁是否能上锁时,不会考虑同一个锁请求中的其他的锁影响。简单来说,就是同一个请求中的锁可以互相冲突。 +func (a *ProvidersActor) TestLockRequestAndMakeData(req LockRequest) (LockRequestData, error) { + a.lock.Lock() + defer a.lock.Unlock() + + reqData := LockRequestData{} + + for _, lock := range req.Locks { + n, ok := a.provdersTrie.WalkEnd(lock.Path) + if !ok || n.Value == nil { + return LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path) + } + + err := n.Value.CanLock(lock) + if err != nil { + return LockRequestData{}, err + } + + targetStr, err := n.Value.GetTargetString(lock.Target) + if err != nil { + return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err) + } + + reqData.Locks = append(reqData.Locks, lockData{ + Path: lock.Path, + Name: lock.Name, + Target: targetStr, + }) + } + + return reqData, nil +} + +func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestData) error { + a.lock.Lock() + defer a.lock.Unlock() + + var err error + + for _, p := range a.allProviders { + p.Clear() + } + + for _, reqData := range lockRequestData { + err = a.lockLockRequest(reqData) + if err != nil { + err = fmt.Errorf("applying lock request data: %w", err) + break + } + } + + a.localLockReqIndex = index + + // 内部状态已被破坏,停止所有监听器 + for _, w := range a.indexWaiters { + w.Callback.SetError(ErrWaitIndexUpdateTimeout) + } + a.indexWaiters = nil + + return err +} + +func (a *ProvidersActor) wakeUpIndexWaiter() { + var resetWaiters []indexWaiter + for _, waiter := range a.indexWaiters { + if waiter.Index <= a.localLockReqIndex { + 
waiter.Callback.SetVoid() + } else { + resetWaiters = append(resetWaiters, waiter) + } + } + a.indexWaiters = resetWaiters +} diff --git a/pkgs/distlock/internal/release_actor.go b/pkgs/distlock/internal/release_actor.go new file mode 100644 index 0000000..72e9145 --- /dev/null +++ b/pkgs/distlock/internal/release_actor.go @@ -0,0 +1,223 @@ +package internal + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/clientv3util" +) + +const ( + DefaultMaxReleaseingDelayMs = 4000 + BaseReleaseingDelayMs = 1000 +) + +type ReleaseActor struct { + cfg *Config + etcdCli *clientv3.Client + + lock sync.Mutex + isMaintenance bool + releasingLockRequestIDs map[string]bool + timer *time.Timer + timerSetup bool + doReleasingChan chan any +} + +func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor { + return &ReleaseActor{ + cfg: cfg, + etcdCli: etcdCli, + isMaintenance: true, + releasingLockRequestIDs: make(map[string]bool), + doReleasingChan: make(chan any), + } +} + +// 立刻尝试释放这些锁。一般用于在用户主动释放了一个锁之后 +func (a *ReleaseActor) Release(reqIDs []string) { + a.lock.Lock() + defer a.lock.Unlock() + + for _, id := range reqIDs { + a.releasingLockRequestIDs[id] = true + } + + if a.isMaintenance { + return + } + + select { + case a.doReleasingChan <- nil: + default: + } +} + +// 延迟释放锁。一般用于清理崩溃的锁服务遗留下来的锁 +func (a *ReleaseActor) DelayRelease(reqIDs []string) { + a.lock.Lock() + defer a.lock.Unlock() + + for _, id := range reqIDs { + a.releasingLockRequestIDs[id] = true + } + + if a.isMaintenance { + return + } + + a.setupTimer() +} + +// 重试一下内部的解锁请求。不会阻塞调用者 +func (a *ReleaseActor) TryReleaseNow() { + a.lock.Lock() + defer a.lock.Unlock() + + // 如果处于维护模式,那么即使主动进行释放操作,也不予理会 + if a.isMaintenance { + return + } + + select { + case a.doReleasingChan <- nil: + default: + } +} + +// 进入维护模式。在维护模式期间只接受请求,不处理请求,包括延迟释放请求。 +func (a *ReleaseActor) 
EnterMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = true +} + +// 退出维护模式。退出之后建议调用一下TryReleaseNow。 +func (a *ReleaseActor) LeaveMaintenance() { + a.lock.Lock() + defer a.lock.Unlock() + + a.isMaintenance = false +} + +func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) { + if event.IsLocking { + return + } + + a.lock.Lock() + defer a.lock.Unlock() + + delete(a.releasingLockRequestIDs, event.Data.ID) +} + +func (a *ReleaseActor) Serve() { + for { + select { + case <-a.doReleasingChan: + err := a.doReleasing() + if err != nil { + logger.Std.Debugf("doing releasing: %s", err.Error()) + } + } + } +} + +func (a *ReleaseActor) doReleasing() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + // 先看一眼,如果没有需要释放的锁,就不用走后面的流程了 + a.lock.Lock() + if len(a.releasingLockRequestIDs) == 0 { + a.lock.Unlock() + return nil + } + a.lock.Unlock() + + // 在获取全局锁的时候不用锁Actor,只有获取成功了,才加锁 + // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 + unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return fmt.Errorf("acquire etcd request data lock failed, err: %w", err) + } + defer unlock() + + index, err := getEtcdLockRequestIndex(ctx, a.etcdCli) + if err != nil { + return err + } + + a.lock.Lock() + defer a.lock.Unlock() + defer a.setupTimer() + + // TODO 可以考虑优化成一次性删除多个锁 + for id := range a.releasingLockRequestIDs { + lockReqKey := MakeEtcdLockRequestKey(id) + + txResp, err := a.etcdCli.Txn(ctx). + If(clientv3util.KeyExists(lockReqKey)). 
+ Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit() + if err != nil { + return fmt.Errorf("updating lock request data: %w", err) + } + // 只有确实删除了锁数据,才更新index + if txResp.Succeeded { + index++ + } + delete(a.releasingLockRequestIDs, id) + } + + return nil +} + +func (a *ReleaseActor) setupTimer() { + if len(a.releasingLockRequestIDs) == 0 { + return + } + + if a.timerSetup { + return + } + a.timerSetup = true + + delay := int64(0) + if a.cfg.RandomReleasingDelayMs == 0 { + delay = rand.Int63n(DefaultMaxReleaseingDelayMs) + } else { + delay = rand.Int63n(a.cfg.RandomReleasingDelayMs) + } + + if a.timer == nil { + a.timer = time.NewTimer(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond) + } else { + a.timer.Reset(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond) + } + + go func() { + <-a.timer.C + + a.lock.Lock() + defer a.lock.Unlock() + + a.timerSetup = false + + // 如果处于维护模式,那么即使是定时器要求的释放操作,也不予理会 + if a.isMaintenance { + return + } + + select { + case a.doReleasingChan <- nil: + default: + } + }() +} diff --git a/pkgs/distlock/internal/service_info_actor.go b/pkgs/distlock/internal/service_info_actor.go new file mode 100644 index 0000000..a940574 --- /dev/null +++ b/pkgs/distlock/internal/service_info_actor.go @@ -0,0 +1,196 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/google/uuid" + "gitlink.org.cn/cloudream/common/pkgs/logger" + mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/serder" + clientv3 "go.etcd.io/etcd/client/v3" +) + +var ErrSelfServiceDown = errors.New("self service is down, need to restart") + +type serviceStatus struct { + Info ServiceInfo + LockRequestIDs []string +} + +type ServiceInfoActor struct { + cfg *Config + etcdCli *clientv3.Client + + lock sync.Mutex + selfInfo ServiceInfo + leaseID *clientv3.LeaseID + leaseKeepAlive chan any + services map[string]*serviceStatus + 
releaseActor *ReleaseActor +} + +func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo ServiceInfo) *ServiceInfoActor { + return &ServiceInfoActor{ + cfg: cfg, + etcdCli: etcdCli, + selfInfo: baseSelfInfo, + } +} + +func (a *ServiceInfoActor) Init(releaseActor *ReleaseActor) { + a.releaseActor = releaseActor +} + +func (a *ServiceInfoActor) GetSelfInfo() *ServiceInfo { + return &a.selfInfo +} + +func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []ServiceInfo, currentLocks []LockRequestData) ([]string, error) { + a.lock.Lock() + defer a.lock.Unlock() + + if a.leaseID != nil { + a.etcdCli.Revoke(ctx, *a.leaseID) + close(a.leaseKeepAlive) + a.leaseID = nil + } + + // 生成并注册服务信息 + a.selfInfo.ID = uuid.NewString() + + infoData, err := serder.ObjectToJSON(a.selfInfo) + if err != nil { + return nil, fmt.Errorf("service info to json: %w", err) + } + + lease, err := a.etcdCli.Grant(ctx, a.cfg.EtcdLockLeaseTimeSec) + if err != nil { + return nil, fmt.Errorf("granting lease: %w", err) + } + a.leaseID = &lease.ID + + keepAliveChan, err := a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID) + if err != nil { + a.etcdCli.Revoke(ctx, lease.ID) + return nil, fmt.Errorf("starting keep lease alive: %w", err) + } + a.leaseKeepAlive = make(chan any) + + go func() { + for { + select { + case _, ok := <-keepAliveChan: + if !ok { + logger.Std.Warnf("lease keep alive channel closed, will try to open again") + + var err error + keepAliveChan, err = a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID) + if err != nil { + logger.Std.Warnf("starting keep lease alive: %s", err.Error()) + return + } + } + + case <-a.leaseKeepAlive: + return + } + } + }() + + _, err = a.etcdCli.Put(ctx, MakeServiceInfoKey(a.selfInfo.ID), string(infoData), clientv3.WithLease(lease.ID)) + if err != nil { + a.etcdCli.Revoke(ctx, lease.ID) + return nil, fmt.Errorf("putting service info to etcd: %w", err) + } + + // 导入当前已有的服务信息和锁信息 + a.services = 
make(map[string]*serviceStatus) + for _, svc := range currentServices { + a.services[svc.ID] = &serviceStatus{ + Info: svc, + } + } + // 直接添加自己的信息 + a.services[a.selfInfo.ID] = &serviceStatus{ + Info: a.selfInfo, + } + + // 导入锁信息的过程中可能会发现未注册信息的锁服务的锁,把他们挑出来释放掉 + var willReleaseIDs []string + for _, lock := range currentLocks { + svc, ok := a.services[lock.SerivceID] + if !ok { + willReleaseIDs = append(willReleaseIDs, lock.ID) + continue + } + + svc.LockRequestIDs = append(svc.LockRequestIDs, lock.ID) + } + + return willReleaseIDs, nil +} + +func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) error { + a.lock.Lock() + defer a.lock.Unlock() + + // TODO 可以考虑打印一点日志 + + if evt.IsNew { + if evt.Info.ID != a.selfInfo.ID { + logger.Std.WithField("ID", evt.Info.ID).Infof("new service up") + a.services[evt.Info.ID] = &serviceStatus{ + Info: evt.Info, + } + } + + } else { + logger.Std.WithField("ID", evt.Info.ID).Infof("service down, will release all its locks") + + status, ok := a.services[evt.Info.ID] + if !ok { + return nil + } + + a.releaseActor.DelayRelease(status.LockRequestIDs) + + delete(a.services, evt.Info.ID) + + // 如果收到的被删除服务信息是自己的,那么自己要重启,重新获取全量数据 + if evt.Info.ID == a.selfInfo.ID { + return ErrSelfServiceDown + } + } + + return nil +} + +func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) { + a.lock.Lock() + defer a.lock.Unlock() + + status, ok := a.services[evt.Data.SerivceID] + if !ok { + if evt.IsLocking { + // 加锁的是一个没有注册过的锁服务,可能是因为这个锁服务之前网络发生了波动, + // 在波动期间它注册的信息过期,于是被大家认为服务下线,清理掉了它管理的锁, + // 而在网络恢复回来之后,它还没有意识到自己被认为下线了,于是还在提交锁请求。 + // 为了防止它加了这个锁之后又崩溃,导致的无限锁定,它加的锁我们都直接释放。 + logger.Std.WithField("RequestID", evt.Data.ID). + WithField("ServiceID", evt.Data.SerivceID). 
+ Warnf("the lock request is from an unknow service, will release it") + + a.releaseActor.Release([]string{evt.Data.ID}) + } + return + } + + if evt.IsLocking { + status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID) + } else { + status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID) + } +} diff --git a/pkgs/distlock/internal/watch_etcd_actor.go b/pkgs/distlock/internal/watch_etcd_actor.go new file mode 100644 index 0000000..254ff17 --- /dev/null +++ b/pkgs/distlock/internal/watch_etcd_actor.go @@ -0,0 +1,190 @@ +package internal + +import ( + "context" + "fmt" + "strings" + + "gitlink.org.cn/cloudream/common/pkgs/actor" + "gitlink.org.cn/cloudream/common/utils/serder" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type LockRequestEvent struct { + IsLocking bool + Data LockRequestData +} + +type ServiceEvent struct { + IsNew bool + Info ServiceInfo +} + +type OnLockRequestEventFn func(event LockRequestEvent) + +type OnServiceEventFn func(event ServiceEvent) + +type OnWatchFailedFn func(err error) + +type WatchEtcdActor struct { + etcdCli *clientv3.Client + + watchChan clientv3.WatchChan + watchChanCancel func() + onLockRequestEventFn OnLockRequestEventFn + onServiceEventFn OnServiceEventFn + onWatchFailedFn OnWatchFailedFn + commandChan *actor.CommandChannel +} + +func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor { + return &WatchEtcdActor{ + etcdCli: etcdCli, + commandChan: actor.NewCommandChannel(), + } +} + +func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn, onWatchFailed OnWatchFailedFn) { + a.onLockRequestEventFn = onLockRequestEvent + a.onServiceEventFn = onServiceDown + a.onWatchFailedFn = onWatchFailed +} + +func (a *WatchEtcdActor) Start(revision int64) { + actor.Wait(context.Background(), a.commandChan, func() error { + if a.watchChanCancel != nil { + a.watchChanCancel() + a.watchChanCancel = nil + } + + ctx, cancel := context.WithCancel(context.Background()) + 
a.watchChan = a.etcdCli.Watch(ctx, EtcdWatchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision)) + a.watchChanCancel = cancel + return nil + }) +} + +func (a *WatchEtcdActor) Stop() { + actor.Wait(context.Background(), a.commandChan, func() error { + if a.watchChanCancel != nil { + a.watchChanCancel() + a.watchChanCancel = nil + } + a.watchChan = nil + return nil + }) +} + +func (a *WatchEtcdActor) Serve() { + cmdChan := a.commandChan.BeginChanReceive() + defer a.commandChan.CloseChanReceive() + + for { + if a.watchChan != nil { + select { + case cmd := <-cmdChan: + cmd() + + case msg := <-a.watchChan: + // 只要发生错误,就停止监听,通知外部处理 + if msg.Canceled { + a.onWatchFailedFn(fmt.Errorf("watch etcd channel closed")) + a.watchChanCancel() + a.watchChan = nil + continue + } + + err := a.dispatchEtcdEvent(msg) + if err != nil { + a.onWatchFailedFn(err) + a.watchChanCancel() + a.watchChan = nil + continue + } + } + + } else { + select { + case cmd := <-cmdChan: + cmd() + } + } + } +} + +func (a *WatchEtcdActor) dispatchEtcdEvent(watchResp clientv3.WatchResponse) error { + for _, e := range watchResp.Events { + key := string(e.Kv.Key) + + if strings.HasPrefix(key, EtcdLockRequestDataPrefix) { + if err := a.applyLockRequestEvent(e); err != nil { + return fmt.Errorf("parsing lock request event: %w", err) + } + + } else if strings.HasPrefix(key, EtcdServiceInfoPrefix) { + if err := a.applyServiceEvent(e); err != nil { + return fmt.Errorf("parsing service event: %w", err) + } + } + } + + return nil +} + +func (a *WatchEtcdActor) applyLockRequestEvent(evt *clientv3.Event) error { + isLocking := true + var valueData []byte + + // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index + if evt.Type == clientv3.EventTypeDelete { + isLocking = false + valueData = evt.PrevKv.Value + } else if evt.IsCreate() { + isLocking = true + valueData = evt.Kv.Value + } else { + return nil + } + + var reqData LockRequestData + err := serder.JSONToObject(valueData, &reqData) + if err != nil 
{ + return fmt.Errorf("parse lock request data failed, err: %w", err) + } + + a.onLockRequestEventFn(LockRequestEvent{ + IsLocking: isLocking, + Data: reqData, + }) + + return nil +} + +func (a *WatchEtcdActor) applyServiceEvent(evt *clientv3.Event) error { + isNew := true + var valueData []byte + + // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index + if evt.Type == clientv3.EventTypeDelete { + isNew = false + valueData = evt.PrevKv.Value + } else if evt.IsCreate() { + isNew = true + valueData = evt.Kv.Value + } else { + return nil + } + + var svcInfo ServiceInfo + err := serder.JSONToObject(valueData, &svcInfo) + if err != nil { + return fmt.Errorf("parsing service info: %w", err) + } + + a.onServiceEventFn(ServiceEvent{ + IsNew: isNew, + Info: svcInfo, + }) + + return nil +} diff --git a/pkgs/distlock/service/mutex.go b/pkgs/distlock/mutex.go similarity index 53% rename from pkgs/distlock/service/mutex.go rename to pkgs/distlock/mutex.go index b386ff1..a7c4d23 100644 --- a/pkgs/distlock/service/mutex.go +++ b/pkgs/distlock/mutex.go @@ -1,14 +1,14 @@ -package service +package distlock -import "gitlink.org.cn/cloudream/common/pkgs/distlock" +import "gitlink.org.cn/cloudream/common/pkgs/distlock/internal" type Mutex struct { svc *Service - lockReq distlock.LockRequest + lockReq internal.LockRequest lockReqID string } -func NewMutex(svc *Service, lockReq distlock.LockRequest) *Mutex { +func NewMutex(svc *Service, lockReq internal.LockRequest) *Mutex { return &Mutex{ svc: svc, lockReq: lockReq, @@ -25,6 +25,6 @@ func (m *Mutex) Lock() error { return nil } -func (m *Mutex) Unlock() error { - return m.svc.Release(m.lockReqID) +func (m *Mutex) Unlock() { + m.svc.Release(m.lockReqID) } diff --git a/pkgs/distlock/service.go b/pkgs/distlock/service.go new file mode 100644 index 0000000..354373b --- /dev/null +++ b/pkgs/distlock/service.go @@ -0,0 +1,300 @@ +package distlock + +import ( + "context" + "fmt" + "strconv" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/actor" + 
"gitlink.org.cn/cloudream/common/pkgs/distlock/internal" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/serder" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type AcquireOption struct { + Timeout time.Duration + Lease time.Duration +} + +type AcquireOptionFn func(opt *AcquireOption) + +func WithTimeout(timeout time.Duration) AcquireOptionFn { + return func(opt *AcquireOption) { + opt.Timeout = timeout + } +} + +func WithLease(time time.Duration) AcquireOptionFn { + return func(opt *AcquireOption) { + opt.Lease = time + } +} + +type PathProvider struct { + Path []any + Provider internal.LockProvider +} + +func NewPathProvider(prov internal.LockProvider, path ...any) PathProvider { + return PathProvider{ + Path: path, + Provider: prov, + } +} + +type Service struct { + cfg *internal.Config + etcdCli *clientv3.Client + + cmdChan *actor.CommandChannel + acquireActor *internal.AcquireActor + releaseActor *internal.ReleaseActor + providersActor *internal.ProvidersActor + watchEtcdActor *internal.WatchEtcdActor + leaseActor *internal.LeaseActor + serviceInfoActor *internal.ServiceInfoActor +} + +func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error) { + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{cfg.EtcdAddress}, + Username: cfg.EtcdUsername, + Password: cfg.EtcdPassword, + DialTimeout: time.Second * 5, + }) + if err != nil { + return nil, fmt.Errorf("new etcd client failed, err: %w", err) + } + + svc := &Service{ + cfg: cfg, + etcdCli: etcdCli, + cmdChan: actor.NewCommandChannel(), + } + + svc.acquireActor = internal.NewAcquireActor(cfg, etcdCli) + svc.releaseActor = internal.NewReleaseActor(cfg, etcdCli) + svc.providersActor = internal.NewProvidersActor() + svc.watchEtcdActor = internal.NewWatchEtcdActor(etcdCli) + svc.leaseActor = internal.NewLeaseActor() + svc.serviceInfoActor = internal.NewServiceInfoActor(cfg, etcdCli, internal.ServiceInfo{ + Description: 
cfg.ServiceDescription, + }) + + svc.acquireActor.Init(svc.providersActor) + svc.leaseActor.Init(svc.releaseActor) + svc.providersActor.Init() + svc.watchEtcdActor.Init( + func(event internal.LockRequestEvent) { + err := svc.providersActor.OnLockRequestEvent(event) + if err != nil { + logger.Std.Warnf("%s, will reset service state", err.Error()) + svc.cmdChan.Send(func() { svc.doResetState() }) + return + } + + svc.acquireActor.TryAcquireNow() + svc.releaseActor.OnLockRequestEvent(event) + svc.serviceInfoActor.OnLockRequestEvent(event) + }, + func(event internal.ServiceEvent) { + err := svc.serviceInfoActor.OnServiceEvent(event) + if err != nil { + logger.Std.Warnf("%s, will reset service state", err.Error()) + svc.cmdChan.Send(func() { svc.doResetState() }) + } + }, + func(err error) { + logger.Std.Warnf("%s, will reset service state", err.Error()) + svc.cmdChan.Send(func() { svc.doResetState() }) + }, + ) + svc.serviceInfoActor.Init(svc.releaseActor) + + for _, prov := range initProvs { + svc.providersActor.AddProvider(prov.Provider, prov.Path...) 
+ } + + return svc, nil +} + +// Acquire 请求一批锁。成功后返回锁请求ID +func (svc *Service) Acquire(req internal.LockRequest, opts ...AcquireOptionFn) (string, error) { + var opt = AcquireOption{ + Timeout: time.Second * 10, + } + for _, fn := range opts { + fn(&opt) + } + + ctx := context.Background() + if opt.Timeout != 0 { + var cancel func() + ctx, cancel = context.WithTimeout(ctx, opt.Timeout) + defer cancel() + } + + reqID, err := svc.acquireActor.Acquire(ctx, req) + if err != nil { + return "", err + } + + if opt.Lease > 0 { + // TODO 不影响结果,但考虑打日志 + err := svc.leaseActor.Add(reqID, opt.Lease) + if err != nil { + logger.Std.Warnf("adding lease: %s", err.Error()) + } + } + + return reqID, nil +} + +// Renew 续约锁。只有在加锁时设置了续约时间才有意义 +func (svc *Service) Renew(reqID string) error { + return svc.leaseActor.Renew(reqID) +} + +// Release 释放锁 +func (svc *Service) Release(reqID string) { + svc.releaseActor.Release([]string{reqID}) + + // TODO 不影响结果,但考虑打日志 + err := svc.leaseActor.Remove(reqID) + if err != nil { + logger.Std.Warnf("removing lease: %s", err.Error()) + } +} + +func (svc *Service) Serve() error { + // TODO 需要停止service的方法 + // 目前已知问题: + // 1. 
client退出时直接中断进程,此时AcquireActor可能正在进行重试,于是导致Etcd锁没有解除就退出了进程。 + // 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用 + + go svc.watchEtcdActor.Serve() + + go svc.leaseActor.Serve() + + go svc.acquireActor.Serve() + + go svc.releaseActor.Serve() + + svc.cmdChan.Send(func() { svc.doResetState() }) + + cmdChan := svc.cmdChan.BeginChanReceive() + defer svc.cmdChan.CloseChanReceive() + + for { + select { + case cmd := <-cmdChan: + cmd() + } + } + + return nil +} + +func (svc *Service) doResetState() { + logger.Std.Infof("start reset state") + // TODO context + err := svc.resetState(context.Background()) + if err != nil { + logger.Std.Warnf("reseting state: %s, will try again after 3 seconds", err.Error()) + <-time.After(time.Second * 3) + svc.cmdChan.Send(func() { svc.doResetState() }) + return + } + logger.Std.WithField("ID", svc.serviceInfoActor.GetSelfInfo().ID). + Infof("reset state success") +} + +// ResetState 重置内部状态。注:只要调用到了此函数,无论在哪一步出的错, +// 都要将内部状态视为已被破坏,直到成功调用了此函数才能继续后面的步骤。 +// 如果调用失败,服务将进入维护模式,届时可以接受请求,但不会处理请求,直到调用成功为止。 +func (svc *Service) resetState(ctx context.Context) error { + // 让服务都进入维护模式 + svc.watchEtcdActor.Stop() + svc.leaseActor.Stop() + svc.acquireActor.EnterMaintenance() + svc.releaseActor.EnterMaintenance() + + // 必须使用事务一次性获取所有数据 + txResp, err := svc.etcdCli.Txn(ctx). + Then( + clientv3.OpGet(internal.EtcdLockRequestIndex), + clientv3.OpGet(internal.EtcdLockRequestDataPrefix, clientv3.WithPrefix()), + clientv3.OpGet(internal.EtcdServiceInfoPrefix, clientv3.WithPrefix()), + ). 
+ Commit() + if err != nil { + return fmt.Errorf("getting etcd data: %w", err) + } + + // 解析Index + var index int64 = 0 + indexKvs := txResp.Responses[0].GetResponseRange().Kvs + if len(indexKvs) > 0 { + val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64) + if err != nil { + return fmt.Errorf("parsing lock request index: %w", err) + } + index = val + } + + // 解析锁请求数据 + var reqData []internal.LockRequestData + lockKvs := txResp.Responses[1].GetResponseRange().Kvs + for _, kv := range lockKvs { + var req internal.LockRequestData + err := serder.JSONToObject(kv.Value, &req) + if err != nil { + return fmt.Errorf("parsing lock request data: %w", err) + } + + reqData = append(reqData, req) + } + + // 解析服务信息数据 + var svcInfo []internal.ServiceInfo + svcInfoKvs := txResp.Responses[2].GetResponseRange().Kvs + for _, kv := range svcInfoKvs { + var info internal.ServiceInfo + err := serder.JSONToObject(kv.Value, &info) + if err != nil { + return fmt.Errorf("parsing service info data: %w", err) + } + + svcInfo = append(svcInfo, info) + } + + // 然后将新获取到的状态装填到Actor中 + releasingIDs, err := svc.serviceInfoActor.ResetState(ctx, svcInfo, reqData) + if err != nil { + return fmt.Errorf("reseting service info actor: %w", err) + } + + // 要在acquireActor之前,因为acquireActor会调用它的WaitLocalIndexTo + err = svc.providersActor.ResetState(index, reqData) + if err != nil { + return fmt.Errorf("reseting providers actor: %w", err) + } + + svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID) + + // ReleaseActor没有什么需要Reset的状态 + svc.releaseActor.DelayRelease(releasingIDs) + + // 重置完了之后再退出维护模式 + svc.watchEtcdActor.Start(txResp.Header.Revision) + svc.leaseActor.Start() + svc.acquireActor.LeaveMaintenance() + svc.releaseActor.LeaveMaintenance() + + svc.acquireActor.TryAcquireNow() + svc.releaseActor.TryReleaseNow() + + return nil +} diff --git a/pkgs/distlock/service/internal/main_actor.go b/pkgs/distlock/service/internal/main_actor.go deleted file mode 100644 index c8fbff7..0000000 
--- a/pkgs/distlock/service/internal/main_actor.go +++ /dev/null @@ -1,313 +0,0 @@ -package internal - -import ( - "context" - "fmt" - "strconv" - - "gitlink.org.cn/cloudream/common/pkgs/actor" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/utils/serder" - clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/clientv3util" - "go.etcd.io/etcd/client/v3/concurrency" -) - -const ( - EtcdLockRequestData = "/distlock/lockRequest/data" - EtcdLockRequestIndex = "/distlock/lockRequest/index" - EtcdLockRequestLock = "/distlock/lockRequest/lock" -) - -type lockData struct { - Path []string `json:"path"` - Name string `json:"name"` - Target string `json:"target"` -} - -type acquireManyResult struct { - IsTried bool - RequestID string - Err error -} - -type LockRequestData struct { - ID string `json:"id"` - Locks []lockData `json:"locks"` -} - -type MainActor struct { - cfg *distlock.Config - etcdCli *clientv3.Client - - commandChan *actor.CommandChannel - - providersActor *ProvidersActor - - lockRequestLeaseID clientv3.LeaseID -} - -func NewMainActor(cfg *distlock.Config, etcdCli *clientv3.Client) *MainActor { - return &MainActor{ - cfg: cfg, - etcdCli: etcdCli, - commandChan: actor.NewCommandChannel(), - } -} - -func (a *MainActor) Init(providersActor *ProvidersActor) { - a.providersActor = providersActor -} - -// Acquire 请求一批锁。成功后返回锁请求ID -func (a *MainActor) Acquire(ctx context.Context, req distlock.LockRequest) (reqID string, err error) { - rets, err := a.AcquireMany(ctx, []distlock.LockRequest{req}) - if err != nil { - return "", err - } - - if rets[0].Err != nil { - return "", rets[0].Err - } - - return rets[0].RequestID, nil -} - -// AcquireAny 尝试多个锁请求。目前的实现会在第一个获取成功后就直接返回 -func (a *MainActor) AcquireMany(ctx context.Context, reqs []distlock.LockRequest) (rets []acquireManyResult, err error) { - return actor.WaitValue(context.TODO(), a.commandChan, func() ([]acquireManyResult, error) { - // TODO 
根据不同的错误设置不同的错误类型,方便上层进行后续处理 - unlock, err := a.acquireEtcdRequestDataLock(ctx) - if err != nil { - return nil, fmt.Errorf("acquire etcd request data lock failed, err: %w", err) - } - defer unlock() - - index, err := a.getEtcdLockRequestIndex(ctx) - if err != nil { - return nil, err - } - - // 等待本地状态同步到最新 - // TODO 配置等待时间 - err = a.providersActor.WaitIndexUpdated(ctx, index) - if err != nil { - return nil, err - } - - rets := make([]acquireManyResult, len(reqs)) - for i := 0; i < len(reqs); i++ { - // 测试锁,并获得锁数据 - reqData, err := a.providersActor.TestLockRequestAndMakeData(reqs[i]) - if err == nil { - nextIndexStr := strconv.FormatInt(index+1, 10) - reqData.ID = nextIndexStr - - // 锁成功,提交锁数据 - err := a.submitLockRequest(ctx, reqData) - - rets[i] = acquireManyResult{ - IsTried: true, - RequestID: nextIndexStr, - Err: err, - } - - break - - } else { - rets[i] = acquireManyResult{ - IsTried: true, - Err: err, - } - } - } - - return rets, nil - }) -} - -func (a *MainActor) submitLockRequest(ctx context.Context, reqData LockRequestData) error { - reqBytes, err := serder.ObjectToJSON(reqData) - if err != nil { - return fmt.Errorf("serialize lock request data failed, err: %w", err) - } - - var etcdOps []clientv3.Op - if a.cfg.SubmitLockRequestWithoutLease { - etcdOps = []clientv3.Op{ - clientv3.OpPut(EtcdLockRequestIndex, reqData.ID), - clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), - } - - } else { - etcdOps = []clientv3.Op{ - clientv3.OpPut(EtcdLockRequestIndex, reqData.ID), - // 归属到当前连接的租约,在当前连接断开后,能自动解锁 - // TODO 不能直接给RequestData上租约,因为如果在别的服务已经获取到锁的情况下, - // 如果当前服务崩溃,删除消息会立刻发送出去,这就破坏了锁的约定(在锁定期间其他服务不能修改数据) - clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), //, clientv3.WithLease(a.lockRequestLeaseID)), - } - } - txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit() - if err != nil { - return fmt.Errorf("submit lock request data failed, err: %w", err) - } - if !txResp.Succeeded { - return fmt.Errorf("submit lock request 
data failed for lock request data index changed") - } - - return nil -} - -// Release 释放锁 -func (a *MainActor) Release(ctx context.Context, reqID string) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 - unlock, err := a.acquireEtcdRequestDataLock(ctx) - if err != nil { - return fmt.Errorf("acquire etcd request data lock failed, err: %w", err) - } - defer unlock() - - index, err := a.getEtcdLockRequestIndex(ctx) - if err != nil { - return err - } - - lockReqKey := makeEtcdLockRequestKey(reqID) - - txResp, err := a.etcdCli.Txn(ctx). - If(clientv3util.KeyExists(lockReqKey)). - Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit() - if err != nil { - return fmt.Errorf("updating lock request data index: %w", err) - } - if !txResp.Succeeded { - return fmt.Errorf("updating lock request data failed") - } - - return nil - }) -} - -func (a *MainActor) acquireEtcdRequestDataLock(ctx context.Context) (unlock func(), err error) { - lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) - if err != nil { - return nil, fmt.Errorf("grant lease failed, err: %w", err) - } - - session, err := concurrency.NewSession(a.etcdCli, concurrency.WithLease(lease.ID)) - if err != nil { - return nil, fmt.Errorf("new session failed, err: %w", err) - } - - mutex := concurrency.NewMutex(session, EtcdLockRequestLock) - - err = mutex.Lock(ctx) - if err != nil { - session.Close() - return nil, fmt.Errorf("acquire lock failed, err: %w", err) - } - - return func() { - mutex.Unlock(context.Background()) - session.Close() - }, nil -} - -func (a *MainActor) getEtcdLockRequestIndex(ctx context.Context) (int64, error) { - indexKv, err := a.etcdCli.Get(ctx, EtcdLockRequestIndex) - if err != nil { - return 0, fmt.Errorf("get lock request index failed, err: %w", err) - } - - if len(indexKv.Kvs) == 0 { - return 0, nil - } - - index, err := 
strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64) - if err != nil { - return 0, fmt.Errorf("parse lock request index failed, err: %w", err) - } - - return index, nil -} - -func (a *MainActor) ReloadEtcdData() error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - // 使用事务一次性获取index和锁数据,就不需要加全局锁了 - txResp, err := a.etcdCli.Txn(context.Background()). - Then( - clientv3.OpGet(EtcdLockRequestIndex), - clientv3.OpGet(EtcdLockRequestData, clientv3.WithPrefix()), - ). - Commit() - if err != nil { - return fmt.Errorf("get etcd data failed, err: %w", err) - } - if !txResp.Succeeded { - return fmt.Errorf("get etcd data failed") - } - - indexKvs := txResp.Responses[0].GetResponseRange().Kvs - lockKvs := txResp.Responses[1].GetResponseRange().Kvs - - var index int64 - var reqData []LockRequestData - - // 解析Index - if len(indexKvs) > 0 { - val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64) - if err != nil { - return fmt.Errorf("parse lock request index failed, err: %w", err) - } - index = val - - } else { - index = 0 - } - - // 解析锁请求数据 - for _, kv := range lockKvs { - var req LockRequestData - err := serder.JSONToObject(kv.Value, &req) - if err != nil { - return fmt.Errorf("parse lock request data failed, err: %w", err) - } - - reqData = append(reqData, req) - } - - err = a.providersActor.ResetState(index, reqData) - if err != nil { - return fmt.Errorf("reset lock providers state failed, err: %w", err) - } - - return nil - }) -} - -func (a *MainActor) Serve() error { - lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec) - if err != nil { - return fmt.Errorf("grant lease failed, err: %w", err) - } - a.lockRequestLeaseID = lease.ID - - cmdChan := a.commandChan.BeginChanReceive() - defer a.commandChan.CloseChanReceive() - - for { - select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - - // TODO Actor启动时,如果第一个调用的是Acquire,那么就会在Acquire中等待本地锁数据同步到最新。 - // 
此时命令的执行也会被阻塞,导致ReloadEtcdData命令无法执行,因此产生死锁,最后Acquire超时失败。 - // 此处暂时使用单独的goroutine的来执行命令,避免阻塞。 - go cmd() - } - } -} diff --git a/pkgs/distlock/service/internal/providers_actor.go b/pkgs/distlock/service/internal/providers_actor.go deleted file mode 100644 index fa56a5b..0000000 --- a/pkgs/distlock/service/internal/providers_actor.go +++ /dev/null @@ -1,216 +0,0 @@ -package internal - -import ( - "context" - "fmt" - - "gitlink.org.cn/cloudream/common/pkgs/actor" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/trie" -) - -type indexWaiter struct { - Index int64 - Future *future.SetVoidFuture -} - -type ProvidersActor struct { - localLockReqIndex int64 - provdersTrie trie.Trie[distlock.LockProvider] - allProviders []distlock.LockProvider - - indexWaiters []indexWaiter - - commandChan *actor.CommandChannel -} - -func NewProvidersActor() *ProvidersActor { - return &ProvidersActor{ - commandChan: actor.NewCommandChannel(), - } -} - -func (a *ProvidersActor) AddProvider(prov distlock.LockProvider, path ...any) { - a.provdersTrie.Create(path).Value = prov - a.allProviders = append(a.allProviders, prov) -} - -func (a *ProvidersActor) Init() { -} - -func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) error { - fut := future.NewSetVoid() - - a.commandChan.Send(func() { - if index <= a.localLockReqIndex { - fut.SetVoid() - } else { - a.indexWaiters = append(a.indexWaiters, indexWaiter{ - Index: index, - Future: fut, - }) - } - }) - - return fut.Wait(ctx) -} - -func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - for _, op := range events { - if op.IsLocking { - err := a.lockLockRequest(op.Data) - if err != nil { - return fmt.Errorf("lock by lock request data failed, err: %w", err) - } - - } else { - err := a.unlockLockRequest(op.Data) - if err != nil { - return 
fmt.Errorf("unlock by lock request data failed, err: %w", err) - } - } - } - - // 处理了多少事件,Index就往后移动多少个 - a.localLockReqIndex += int64(len(events)) - - // 检查是否有等待同步进度的需求 - a.checkIndexWaiter() - - return nil - }) -} - -func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error { - for _, lockData := range reqData.Locks { - node, ok := svc.provdersTrie.WalkEnd(lockData.Path) - if !ok || node.Value == nil { - return fmt.Errorf("lock provider not found for path %v", lockData.Path) - } - - target, err := node.Value.ParseTargetString(lockData.Target) - if err != nil { - return fmt.Errorf("parse target data failed, err: %w", err) - } - - err = node.Value.Lock(reqData.ID, distlock.Lock{ - Path: lockData.Path, - Name: lockData.Name, - Target: target, - }) - if err != nil { - return fmt.Errorf("locking failed, err: %w", err) - } - } - return nil -} - -func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error { - for _, lockData := range reqData.Locks { - node, ok := svc.provdersTrie.WalkEnd(lockData.Path) - if !ok || node.Value == nil { - return fmt.Errorf("lock provider not found for path %v", lockData.Path) - } - - target, err := node.Value.ParseTargetString(lockData.Target) - if err != nil { - return fmt.Errorf("parse target data failed, err: %w", err) - } - - err = node.Value.Unlock(reqData.ID, distlock.Lock{ - Path: lockData.Path, - Name: lockData.Name, - Target: target, - }) - if err != nil { - return fmt.Errorf("unlocking failed, err: %w", err) - } - } - return nil -} - -// TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID。 -// 在检查单个锁是否能上锁时,不会考虑同一个锁请求中的其他的锁影响。简单来说,就是同一个请求中的锁可以互相冲突。 -func (a *ProvidersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (LockRequestData, error) { - return actor.WaitValue(context.TODO(), a.commandChan, func() (LockRequestData, error) { - reqData := LockRequestData{} - - for _, lock := range req.Locks { - n, ok := a.provdersTrie.WalkEnd(lock.Path) - if !ok || n.Value == nil { - return 
LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path) - } - - err := n.Value.CanLock(lock) - if err != nil { - return LockRequestData{}, err - } - - targetStr, err := n.Value.GetTargetString(lock.Target) - if err != nil { - return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err) - } - - reqData.Locks = append(reqData.Locks, lockData{ - Path: lock.Path, - Name: lock.Name, - Target: targetStr, - }) - } - - return reqData, nil - }) -} - -// ResetState 重置内部状态 -func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestData) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - for _, p := range a.allProviders { - p.Clear() - } - - for _, reqData := range lockRequestData { - err := a.lockLockRequest(reqData) - if err != nil { - return fmt.Errorf("lock by lock request data failed, err: %w", err) - } - } - - a.localLockReqIndex = index - - // 检查是否有等待同步进度的需求 - a.checkIndexWaiter() - - return nil - }) -} - -func (a *ProvidersActor) checkIndexWaiter() { - var resetWaiters []indexWaiter - for _, waiter := range a.indexWaiters { - if waiter.Index <= a.localLockReqIndex { - waiter.Future.SetVoid() - } else { - resetWaiters = append(resetWaiters, waiter) - } - } - a.indexWaiters = resetWaiters -} - -func (a *ProvidersActor) Serve() error { - cmdChan := a.commandChan.BeginChanReceive() - defer a.commandChan.CloseChanReceive() - - for { - select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - - cmd() - } - } -} diff --git a/pkgs/distlock/service/internal/retry_actor.go b/pkgs/distlock/service/internal/retry_actor.go deleted file mode 100644 index 6f6e304..0000000 --- a/pkgs/distlock/service/internal/retry_actor.go +++ /dev/null @@ -1,128 +0,0 @@ -package internal - -import ( - "context" - "fmt" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/actor" - "gitlink.org.cn/cloudream/common/pkgs/distlock" - 
"gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/logger" - mylo "gitlink.org.cn/cloudream/common/utils/lo" -) - -type retryInfo struct { - Callback *future.SetValueFuture[string] - LastErr error -} - -type RetryActor struct { - retrys []distlock.LockRequest - retryInfos []*retryInfo - - commandChan *actor.CommandChannel - - mainActor *MainActor -} - -func NewRetryActor() *RetryActor { - return &RetryActor{ - commandChan: actor.NewCommandChannel(), - } -} - -func (a *RetryActor) Init(mainActor *MainActor) { - a.mainActor = mainActor -} - -func (a *RetryActor) Retry(ctx context.Context, req distlock.LockRequest, lastErr error) (future.ValueFuture[string], error) { - fut := future.NewSetValue[string]() - - var info *retryInfo - err := actor.Wait(ctx, a.commandChan, func() error { - a.retrys = append(a.retrys, req) - info = &retryInfo{ - Callback: fut, - LastErr: lastErr, - } - a.retryInfos = append(a.retryInfos, info) - return nil - }) - if err != nil { - return nil, err - } - - go func() { - <-ctx.Done() - a.commandChan.Send(func() { - // 由于只可能在cmd中修改future状态,所以此处的IsComplete判断可以作为后续操作的依据 - if fut.IsComplete() { - return - } - - index := lo.IndexOf(a.retryInfos, info) - if index == -1 { - return - } - - a.retryInfos[index].Callback.SetError(a.retryInfos[index].LastErr) - - a.retrys = mylo.RemoveAt(a.retrys, index) - a.retryInfos = mylo.RemoveAt(a.retryInfos, index) - }) - }() - - return fut, nil -} - -func (a *RetryActor) OnLocalStateUpdated() { - a.commandChan.Send(func() { - if len(a.retrys) == 0 { - return - } - - rets, err := a.mainActor.AcquireMany(context.Background(), a.retrys) - if err != nil { - // TODO 处理错误 - logger.Std.Warnf("acquire many lock requests failed, err: %s", err.Error()) - return - } - - // 根据尝试的结果更新状态 - delCnt := 0 - for i, ret := range rets { - a.retrys[i-delCnt] = a.retrys[i] - a.retryInfos[i-delCnt] = a.retryInfos[i] - - if !ret.IsTried { - continue - } - - if ret.Err != nil { - a.retryInfos[i].LastErr = 
ret.Err - } else { - a.retryInfos[i].Callback.SetValue(ret.RequestID) - delCnt++ - } - } - a.retrys = a.retrys[:len(a.retrys)-delCnt] - a.retryInfos = a.retryInfos[:len(a.retryInfos)-delCnt] - }) -} - -func (a *RetryActor) Serve() error { - cmdChan := a.commandChan.BeginChanReceive() - defer a.commandChan.CloseChanReceive() - - for { - select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - - cmd() - } - } -} diff --git a/pkgs/distlock/service/internal/utils.go b/pkgs/distlock/service/internal/utils.go deleted file mode 100644 index d77e5e3..0000000 --- a/pkgs/distlock/service/internal/utils.go +++ /dev/null @@ -1,116 +0,0 @@ -package internal - -import ( - "strings" -) - -func makeEtcdLockRequestKey(reqID string) string { - return EtcdLockRequestData + "/" + reqID -} - -func getLockRequestID(key string) string { - return strings.TrimPrefix(key, EtcdLockRequestData+"/") -} - -/* -func parseLockData(str string) (lock lockData, err error) { - sb := strings.Builder{} - var comps []string - - escaping := false - for _, ch := range strings.TrimSpace(str) { - if escaping { - if ch == 'n' { - sb.WriteRune('\n') - } else { - sb.WriteRune(ch) - } - - escaping = false - continue - } - - if ch == '/' { - comps = append(comps, sb.String()) - sb.Reset() - } else if ch == '\\' { - escaping = true - } else { - sb.WriteRune(ch) - } - } - - comps = append(comps, sb.String()) - - if len(comps) < 3 { - return lockData{}, fmt.Errorf("string must includes 3 components devided by /") - } - - return lockData{ - Path: comps[0 : len(comps)-2], - Name: comps[len(comps)-2], - Target: comps[len(comps)-1], - }, nil -} - -func lockDataToString(lock lockData) string { - sb := strings.Builder{} - - for _, s := range lock.Path { - sb.WriteString(lockDataEncoding(s)) - sb.WriteRune('/') - } - - sb.WriteString(lockDataEncoding(lock.Name)) - sb.WriteRune('/') - - sb.WriteString(lockDataEncoding(lock.Target)) - - return sb.String() -} - -func 
lockDataEncoding(str string) string { - sb := strings.Builder{} - - for _, ch := range str { - if ch == '\\' { - sb.WriteString("\\\\") - } else if ch == '/' { - sb.WriteString("\\/") - } else if ch == '\n' { - sb.WriteString("\\n") - } else { - sb.WriteRune(ch) - } - } - - return sb.String() -} - -func lockDataDecoding(str string) string { - sb := strings.Builder{} - - escaping := false - for _, ch := range str { - if escaping { - if ch == 'n' { - sb.WriteRune('\n') - } else { - sb.WriteRune(ch) - } - - escaping = false - continue - } - - if ch == '\\' { - escaping = true - - } else { - sb.WriteRune(ch) - } - } - - return sb.String() -} -*/ diff --git a/pkgs/distlock/service/internal/utils_test.go b/pkgs/distlock/service/internal/utils_test.go deleted file mode 100644 index 6c9f95c..0000000 --- a/pkgs/distlock/service/internal/utils_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package internal - -/* -import ( - . "github.com/smartystreets/goconvey/convey" -) - -func Test_parseLockData_lockDataToString(t *testing.T) { - cases := []struct { - title string - data lockData - }{ - { - title: "多段路径", - data: lockData{ - Path: []string{"a", "b", "c"}, - Name: "d", - Target: "e", - }, - }, - - { - title: "包含分隔符", - data: lockData{ - Path: []string{"a/", "b", "c/c"}, - Name: "/d", - Target: "///e//d/", - }, - }, - - { - title: "包含转义符", - data: lockData{ - Path: []string{"a\\/", "b", "\\c/c"}, - Name: "/d", - Target: "///e\\//d/\\", - }, - }, - - { - title: "包含换行符", - data: lockData{ - Path: []string{"a\n", "\nb", "c\nc"}, - Name: "/d", - Target: "e\nd\n", - }, - }, - } - - for _, ca := range cases { - Convey(ca.title, t, func() { - str := lockDataToString(ca.data) - - data, err := parseLockData(str) - - So(err, ShouldBeNil) - So(data, ShouldResemble, ca.data) - }) - } -} -*/ diff --git a/pkgs/distlock/service/internal/watch_etcd_actor.go b/pkgs/distlock/service/internal/watch_etcd_actor.go deleted file mode 100644 index 5d0530c..0000000 --- 
a/pkgs/distlock/service/internal/watch_etcd_actor.go +++ /dev/null @@ -1,149 +0,0 @@ -package internal - -import ( - "context" - "fmt" - - "gitlink.org.cn/cloudream/common/pkgs/actor" - mylo "gitlink.org.cn/cloudream/common/utils/lo" - "gitlink.org.cn/cloudream/common/utils/serder" - clientv3 "go.etcd.io/etcd/client/v3" -) - -type LockRequestEvent struct { - IsLocking bool - Data LockRequestData -} - -type LockRequestEventWatcher struct { - OnEvent func(events []LockRequestEvent) -} - -type WatchEtcdActor struct { - etcdCli *clientv3.Client - watchChan clientv3.WatchChan - lockReqWatchers []*LockRequestEventWatcher - - commandChan *actor.CommandChannel -} - -func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor { - return &WatchEtcdActor{ - etcdCli: etcdCli, - commandChan: actor.NewCommandChannel(), - } -} - -func (a *WatchEtcdActor) Init() { -} - -func (a *WatchEtcdActor) StartWatching() error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV()) - return nil - }) -} - -func (a *WatchEtcdActor) StopWatching() error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.watchChan = nil - return nil - }) -} - -func (a *WatchEtcdActor) AddEventWatcher(watcher *LockRequestEventWatcher) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.lockReqWatchers = append(a.lockReqWatchers, watcher) - return nil - }) -} - -func (a *WatchEtcdActor) RemoveEventWatcher(watcher *LockRequestEventWatcher) error { - return actor.Wait(context.TODO(), a.commandChan, func() error { - a.lockReqWatchers = mylo.Remove(a.lockReqWatchers, watcher) - return nil - }) -} - -func (a *WatchEtcdActor) Serve() error { - cmdChan := a.commandChan.BeginChanReceive() - defer a.commandChan.CloseChanReceive() - - for { - if a.watchChan != nil { - select { - case cmd, ok := <-cmdChan: - if !ok { - return 
fmt.Errorf("command channel closed") - } - - cmd() - - case msg := <-a.watchChan: - if msg.Canceled { - // TODO 更好的错误处理 - return fmt.Errorf("watch etcd channel closed") - } - - events, err := a.parseEvents(msg) - if err != nil { - // TODO 更好的错误处理 - return fmt.Errorf("parse etcd lock request data failed, err: %w", err) - } - - for _, w := range a.lockReqWatchers { - w.OnEvent(events) - } - } - - } else { - select { - case cmd, ok := <-cmdChan: - if !ok { - return fmt.Errorf("command channel closed") - } - - cmd() - } - } - } -} - -func (a *WatchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]LockRequestEvent, error) { - var events []LockRequestEvent - - for _, e := range watchResp.Events { - - shouldParseData := false - isLocking := true - var valueData []byte - - // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index - if e.Type == clientv3.EventTypeDelete { - shouldParseData = true - isLocking = false - valueData = e.PrevKv.Value - } else if e.IsCreate() { - shouldParseData = true - isLocking = true - valueData = e.Kv.Value - } - - if !shouldParseData { - continue - } - - var reqData LockRequestData - err := serder.JSONToObject(valueData, &reqData) - if err != nil { - return nil, fmt.Errorf("parse lock request data failed, err: %w", err) - } - - events = append(events, LockRequestEvent{ - IsLocking: isLocking, - Data: reqData, - }) - } - - return events, nil -} diff --git a/pkgs/distlock/service/service.go b/pkgs/distlock/service/service.go deleted file mode 100644 index a655a7d..0000000 --- a/pkgs/distlock/service/service.go +++ /dev/null @@ -1,234 +0,0 @@ -package service - -import ( - "context" - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/distlock/service/internal" - "gitlink.org.cn/cloudream/common/pkgs/logger" - clientv3 "go.etcd.io/etcd/client/v3" -) - -type AcquireOption struct { - Timeout time.Duration - Lease time.Duration -} - -type AcquireOptionFn func(opt *AcquireOption) - -func 
WithTimeout(timeout time.Duration) AcquireOptionFn { - return func(opt *AcquireOption) { - opt.Timeout = timeout - } -} - -func WithLease(time time.Duration) AcquireOptionFn { - return func(opt *AcquireOption) { - opt.Lease = time - } -} - -type PathProvider struct { - Path []any - Provider distlock.LockProvider -} - -func NewPathProvider(prov distlock.LockProvider, path ...any) PathProvider { - return PathProvider{ - Path: path, - Provider: prov, - } -} - -type Service struct { - cfg *distlock.Config - etcdCli *clientv3.Client - - mainActor *internal.MainActor - providersActor *internal.ProvidersActor - watchEtcdActor *internal.WatchEtcdActor - leaseActor *internal.LeaseActor - retryActor *internal.RetryActor - - lockReqEventWatcher internal.LockRequestEventWatcher -} - -func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error) { - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{cfg.EtcdAddress}, - Username: cfg.EtcdUsername, - Password: cfg.EtcdPassword, - DialTimeout: time.Second * 5, - }) - if err != nil { - return nil, fmt.Errorf("new etcd client failed, err: %w", err) - } - - mainActor := internal.NewMainActor(cfg, etcdCli) - providersActor := internal.NewProvidersActor() - watchEtcdActor := internal.NewWatchEtcdActor(etcdCli) - leaseActor := internal.NewLeaseActor() - retryActor := internal.NewRetryActor() - - mainActor.Init(providersActor) - providersActor.Init() - watchEtcdActor.Init() - leaseActor.Init(mainActor) - retryActor.Init(mainActor) - - for _, prov := range initProvs { - providersActor.AddProvider(prov.Provider, prov.Path...) 
- } - - return &Service{ - cfg: cfg, - etcdCli: etcdCli, - mainActor: mainActor, - providersActor: providersActor, - watchEtcdActor: watchEtcdActor, - leaseActor: leaseActor, - retryActor: retryActor, - }, nil -} - -// Acquire 请求一批锁。成功后返回锁请求ID -func (svc *Service) Acquire(req distlock.LockRequest, opts ...AcquireOptionFn) (string, error) { - var opt = AcquireOption{ - Timeout: time.Second * 10, - } - for _, fn := range opts { - fn(&opt) - } - - ctx := context.Background() - if opt.Timeout != 0 { - var cancel func() - ctx, cancel = context.WithTimeout(ctx, opt.Timeout) - defer cancel() - } - - reqID, err := svc.mainActor.Acquire(ctx, req) - if err != nil { - fut, err := svc.retryActor.Retry(ctx, req, err) - if err != nil { - return "", fmt.Errorf("retrying failed, err: %w", err) - } - - // Retry 如果超时,Retry内部会设置fut为Failed,所以这里可以用Background无限等待 - reqID, err = fut.WaitValue(context.Background()) - if err != nil { - return "", err - } - } - - if opt.Lease > 0 { - // TODO 不影响结果,但考虑打日志 - err := svc.leaseActor.Add(reqID, opt.Lease) - if err != nil { - logger.Std.Warnf("adding lease: %s", err.Error()) - } - } - - return reqID, nil -} - -// Renew 续约锁。只有在加锁时设置了续约时间才有意义 -func (svc *Service) Renew(reqID string) error { - return svc.leaseActor.Renew(reqID) -} - -// Release 释放锁 -func (svc *Service) Release(reqID string) error { - err := svc.mainActor.Release(context.TODO(), reqID) - - // TODO 不影响结果,但考虑打日志 - err2 := svc.leaseActor.Remove(reqID) - if err2 != nil { - logger.Std.Warnf("removing lease: %s", err2.Error()) - } - - return err -} - -func (svc *Service) Serve() error { - // TODO 需要停止service的方法 - // 目前已知问题: - // 1. 
client退出时直接中断进程,此时RetryActor可能正在进行Retry,于是导致Etcd锁没有解除就退出了进程。 - // 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用 - - go func() { - // TODO 处理错误 - err := svc.providersActor.Serve() - if err != nil { - logger.Std.Warnf("serving providers actor failed, err: %s", err.Error()) - } - }() - - go func() { - // TODO 处理错误 - err := svc.watchEtcdActor.Serve() - if err != nil { - logger.Std.Warnf("serving watch etcd actor actor failed, err: %s", err.Error()) - } - }() - - go func() { - // TODO 处理错误 - err := svc.mainActor.Serve() - if err != nil { - logger.Std.Warnf("serving main actor failed, err: %s", err.Error()) - } - }() - - go func() { - // TODO 处理错误 - err := svc.leaseActor.Serve() - if err != nil { - logger.Std.Warnf("serving lease actor failed, err: %s", err.Error()) - } - }() - - go func() { - // TODO 处理错误 - err := svc.retryActor.Serve() - if err != nil { - logger.Std.Warnf("serving retry actor failed, err: %s", err.Error()) - } - }() - - err := svc.mainActor.ReloadEtcdData() - if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("init data failed, err: %w", err) - } - - svc.lockReqEventWatcher.OnEvent = func(events []internal.LockRequestEvent) { - svc.providersActor.ApplyLockRequestEvents(events) - svc.retryActor.OnLocalStateUpdated() - } - err = svc.watchEtcdActor.AddEventWatcher(&svc.lockReqEventWatcher) - if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("add lock request event watcher failed, err: %w", err) - } - - err = svc.watchEtcdActor.StartWatching() - if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("start watching etcd failed, err: %w", err) - } - - err = svc.leaseActor.StartChecking() - if err != nil { - // TODO 关闭其他的Actor,或者更好的错误处理方式 - return fmt.Errorf("start checking lease failed, err: %w", err) - } - - // TODO 防止退出的临时解决办法 - ch := make(chan any) - <-ch - - return nil -} diff --git a/pkgs/future/set_value_future.go b/pkgs/future/set_value_future.go index fad7d36..4aea9e8 100644 --- 
a/pkgs/future/set_value_future.go +++ b/pkgs/future/set_value_future.go @@ -48,6 +48,8 @@ func (f *SetValueFuture[T]) IsComplete() bool { return f.isCompleted } +// 等待直到Complete或者ctx被取消。 +// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete func (f *SetValueFuture[T]) Wait(ctx context.Context) error { select { case <-f.completeChan: diff --git a/sdks/imfs/models.go b/sdks/imfs/models.go index ac53719..8565719 100644 --- a/sdks/imfs/models.go +++ b/sdks/imfs/models.go @@ -1,5 +1,6 @@ package imsdk const ( - EnvPackageList = "IMFS_PACKAGE_LIST" + EnvPackageList = "IMFS_PACKAGE_LIST" + EnvServiceAddress = "IMFS_SERVICE_ADDRESS" )