diff --git a/pkg/distlock/distlock.go b/pkg/distlock/distlock.go index 65755a7..d11f5b0 100644 --- a/pkg/distlock/distlock.go +++ b/pkg/distlock/distlock.go @@ -21,6 +21,7 @@ func (b *LockRequest) Add(lock Lock) { } type lockRequestData struct { + ID string `json:"id"` Locks []lockData `json:"locks"` } diff --git a/pkg/distlock/main_actor.go b/pkg/distlock/main_actor.go new file mode 100644 index 0000000..260527d --- /dev/null +++ b/pkg/distlock/main_actor.go @@ -0,0 +1,251 @@ +package distlock + +import ( + "context" + "fmt" + "strconv" + "time" + + "gitlink.org.cn/cloudream/common/pkg/actor" + "gitlink.org.cn/cloudream/common/utils/serder" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" +) + +type mainActor struct { + cfg *Config + etcdCli *clientv3.Client + + commandChan *actor.CommandChannel + + watchEtcd *watchEtcdActor + providers *providersActor +} + +func newMainActor() *mainActor { + return &mainActor{ + commandChan: actor.NewCommandChannel(), + } +} + +func (a *mainActor) Init(watchEtcd *watchEtcdActor, providers *providersActor) { + a.watchEtcd = watchEtcd + a.providers = providers +} + +// Acquire 请求一批锁。成功后返回锁请求ID +func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) { + return actor.WaitValue[string](a.commandChan, func() (string, error) { + // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 + unlock, err := a.acquireEtcdRequestDataLock() + if err != nil { + return "", fmt.Errorf("acquire etcd request data lock failed, err: %w", err) + } + defer unlock() + + index, err := a.getEtcdLockRequestIndex() + if err != nil { + return "", err + } + + // 等待本地状态同步到最新 + err = a.providers.WaitIndexUpdated(index) + if err != nil { + return "", err + } + + // 测试锁,并获得锁数据 + reqData, err := a.providers.TestLockRequestAndMakeData(req) + if err != nil { + return "", err + } + + // 锁成功,提交锁数据 + + nextIndexStr := strconv.FormatInt(index+1, 10) + + reqData.ID = nextIndexStr + + reqBytes, err := serder.ObjectToJSON(reqData) + if err != nil { + return "", fmt.Errorf("serialize lock request data failed, err: %w", err) + } + + txResp, err := a.etcdCli.Txn(context.Background()). + Then( + clientv3.OpPut(LOCK_REQUEST_INDEX, nextIndexStr), + clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes)), + ). 
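+			// The index bump and the new lock request data are committed in a single transaction,
+			// so watchers always observe them together. Since the global etcd mutex is still held,
+			// LOCK_REQUEST_INDEX cannot have changed since it was read above.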
+			Commit()
+		if err != nil {
+			return "", fmt.Errorf("submit lock request data failed, err: %w", err)
+		}
+		if !txResp.Succeeded {
+			return "", fmt.Errorf("submit lock request data failed for lock request data index changed")
+		}
+
+		return nextIndexStr, nil
+	})
+}
+
+// Release releases all locks held by the given lock request.
+func (a *mainActor) Release(reqID string) error {
+	return actor.Wait(a.commandChan, func() error {
+		// TODO map different failures to distinct error types so that callers can handle them separately
+		unlock, err := a.acquireEtcdRequestDataLock()
+		if err != nil {
+			return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
+		}
+		defer unlock()
+
+		index, err := a.getEtcdLockRequestIndex()
+		if err != nil {
+			return err
+		}
+
+		lockReqKey := makeEtcdLockRequestKey(reqID)
+		delResp, err := a.etcdCli.Delete(context.Background(), lockReqKey)
+		if err != nil {
+			return fmt.Errorf("delete lock request data failed, err: %w", err)
+		}
+
+		if delResp.Deleted == 0 {
+			// TODO consider returning a more distinguishable error
+			return fmt.Errorf("lock request data not found")
+		}
+
+		nextIndexStr := strconv.FormatInt(index+1, 10)
+		_, err = a.etcdCli.Put(context.Background(), LOCK_REQUEST_INDEX, nextIndexStr)
+		if err != nil {
+			return fmt.Errorf("update lock request data index failed, err: %w", err)
+		}
+
+		return nil
+	})
+}
+
+func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) {
+	lease, err := a.etcdCli.Grant(context.Background(), a.cfg.LockRequestDataConfig.LeaseTimeSec)
+	if err != nil {
+		return nil, fmt.Errorf("grant lease failed, err: %w", err)
+	}
+
+	session, err := concurrency.NewSession(a.etcdCli, concurrency.WithLease(lease.ID))
+	if err != nil {
+		return nil, fmt.Errorf("new session failed, err: %w", err)
+	}
+
+	mutex := concurrency.NewMutex(session, LOCK_REQUEST_LOCK_NAME)
+
+	timeout, cancelFunc := context.WithTimeout(context.Background(),
+		time.Duration(a.cfg.LockRequestDataConfig.AcquireTimeoutMs)*time.Millisecond)
+	defer cancelFunc()
+
+	err = mutex.Lock(timeout)
+	if err != nil {
+		session.Close()
+		return nil, fmt.Errorf("acquire lock failed, err: %w", err)
+	}
+
+	return func() {
+		mutex.Unlock(context.Background())
+		session.Close()
+	}, nil
+}
+
+func (a *mainActor) getEtcdLockRequestIndex() (int64, error) {
+	indexKv, err := a.etcdCli.Get(context.Background(), LOCK_REQUEST_INDEX)
+	if err != nil {
+		return 0, fmt.Errorf("get lock request index failed, err: %w", err)
+	}
+
+	if len(indexKv.Kvs) == 0 {
+		return 0, fmt.Errorf("lock request index not found in etcd")
+	}
+
+	index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64)
+	if err != nil {
+		return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
+	}
+
+	return index, nil
+}
+
+func (a *mainActor) ReloadEtcdData() error {
+	return actor.Wait(a.commandChan, func() error {
+		// Fetch the index and all lock request data in one transaction, so the global lock is not needed
+		txResp, err := a.etcdCli.Txn(context.Background()).
+			Then(
+				clientv3.OpGet(LOCK_REQUEST_INDEX),
+				clientv3.OpGet(LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix()),
+			).
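+			// Responses[0] holds the index key, Responses[1] the lock request data; they are unpacked in that order below.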
+			Commit()
+		if err != nil {
+			return fmt.Errorf("get etcd data failed, err: %w", err)
+		}
+		if !txResp.Succeeded {
+			return fmt.Errorf("get etcd data failed")
+		}
+
+		indexKvs := txResp.Responses[0].GetResponseRange().Kvs
+		lockKvs := txResp.Responses[1].GetResponseRange().Kvs
+
+		var index int64
+		var reqData []lockRequestData
+
+		// Parse the index
+		if len(indexKvs) > 0 {
+			val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64)
+			if err != nil {
+				return fmt.Errorf("parse lock request index failed, err: %w", err)
+			}
+			index = val
+
+		} else {
+			index = 0
+		}
+
+		// Parse the lock request data
+		for _, kv := range lockKvs {
+			var req lockRequestData
+			err := serder.JSONToObject(kv.Value, &req)
+			if err != nil {
+				return fmt.Errorf("parse lock request data failed, err: %w", err)
+			}
+
+			reqData = append(reqData, req)
+		}
+
+		// Stop watching first, then reset the lock state, and finally resume watching
+
+		err = a.watchEtcd.StopWatching()
+		if err != nil {
+			return fmt.Errorf("stop watching etcd failed, err: %w", err)
+		}
+
+		err = a.providers.ResetState(index, reqData)
+		if err != nil {
+			return fmt.Errorf("reset lock providers state failed, err: %w", err)
+		}
+
+		err = a.watchEtcd.StartWatching()
+		if err != nil {
+			return fmt.Errorf("start watching etcd failed, err: %w", err)
+		}
+
+		return nil
+	})
+}
+
+func (a *mainActor) Serve() error {
+	for {
+		select {
+		case cmd, ok := <-a.commandChan.ChanReceive():
+			if !ok {
+				return fmt.Errorf("command channel closed")
+			}
+
+			cmd()
+		}
+	}
+}
diff --git a/pkg/distlock/providers_actor.go b/pkg/distlock/providers_actor.go
new file mode 100644
index 0000000..913c38a
--- /dev/null
+++ b/pkg/distlock/providers_actor.go
@@ -0,0 +1,210 @@
+package distlock
+
+import (
+	"fmt"
+
+	"gitlink.org.cn/cloudream/common/pkg/actor"
+	"gitlink.org.cn/cloudream/common/pkg/future"
+	"gitlink.org.cn/cloudream/common/pkg/trie"
+)
+
+type indexWaiter struct {
+	Index  int64
+	Future *future.SetVoidFuture
+}
+
+type lockRequestDataUpdateOp struct {
+	Data   lockRequestData
+	IsLock bool
+}
+
+type providersActor struct {
+	localLockReqIndex int64
+	provdersTrie      trie.Trie[LockProvider]
+	allProviders      []LockProvider
+
+	indexWaiters []indexWaiter
+
+	commandChan *actor.CommandChannel
+}
+
+func newProvidersActor() *providersActor {
+	return &providersActor{
+		commandChan: actor.NewCommandChannel(),
+	}
+}
+
+func (a *providersActor) Init() {
+}
+
+func (a *providersActor) WaitIndexUpdated(index int64) error {
+	fut := future.NewSetVoid()
+
+	a.commandChan.Send(func() {
+		if a.localLockReqIndex >= index {
+			fut.SetVoid()
+		} else {
+			a.indexWaiters = append(a.indexWaiters, indexWaiter{
+				Index:  index,
+				Future: fut,
+			})
+		}
+	})
+
+	return fut.Wait()
+}
+
+func (a *providersActor) BatchUpdateByLockRequestData(ops []lockRequestDataUpdateOp) error {
+	return actor.Wait(a.commandChan, func() error {
+		for _, op := range ops {
+			if op.IsLock {
+				err := a.lockLockRequest(op.Data)
+				if err != nil {
+					return fmt.Errorf("lock by lock request data failed, err: %w", err)
+				}
+
+			} else {
+				err := a.unlockLockRequest(op.Data)
+				if err != nil {
+					return fmt.Errorf("unlock by lock request data failed, err: %w", err)
+				}
+			}
+		}
+
+		// Advance the local index by the number of events handled
+		a.localLockReqIndex += int64(len(ops))
+
+		// Wake up any waiters whose target index has been reached
+		a.checkIndexWaiter()
+
+		return nil
+	})
+}
+
+func (svc *providersActor) lockLockRequest(reqData lockRequestData) error {
+	for _, lockData := range reqData.Locks {
+		node, ok := svc.provdersTrie.WalkEnd(lockData.Path)
+		if !ok || node.Value == nil {
+			return fmt.Errorf("lock provider not found for path %v", lockData.Path)
+		}
+
+		
target, err := node.Value.ParseTargetString(lockData.Target) + if err != nil { + return fmt.Errorf("parse target data failed, err: %w", err) + } + + err = node.Value.Lock(reqData.ID, Lock{ + Path: lockData.Path, + Name: lockData.Name, + Target: target, + }) + if err != nil { + return fmt.Errorf("locking failed, err: %w", err) + } + } + return nil +} + +func (svc *providersActor) unlockLockRequest(reqData lockRequestData) error { + for _, lockData := range reqData.Locks { + node, ok := svc.provdersTrie.WalkEnd(lockData.Path) + if !ok || node.Value == nil { + return fmt.Errorf("lock provider not found for path %v", lockData.Path) + } + + target, err := node.Value.ParseTargetString(lockData.Target) + if err != nil { + return fmt.Errorf("parse target data failed, err: %w", err) + } + + err = node.Value.Unlock(reqData.ID, Lock{ + Path: lockData.Path, + Name: lockData.Name, + Target: target, + }) + if err != nil { + return fmt.Errorf("unlocking failed, err: %w", err) + } + } + return nil +} + +// TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID +func (a *providersActor) TestLockRequestAndMakeData(req LockRequest) (lockRequestData, error) { + return actor.WaitValue[lockRequestData](a.commandChan, func() (lockRequestData, error) { + reqData := lockRequestData{} + + for _, lock := range req.Locks { + n, ok := a.provdersTrie.WalkEnd(lock.Path) + if !ok || n.Value == nil { + return lockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path) + } + + err := n.Value.CanLock(lock) + if err != nil { + return lockRequestData{}, err + } + + targetStr, err := n.Value.GetTargetString(lock.Target) + if err != nil { + return lockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err) + } + + reqData.Locks = append(reqData.Locks, lockData{ + Path: lock.Path, + Name: lock.Name, + Target: targetStr, + }) + } + + return reqData, nil + }) +} + +// ResetState 重置内部状态 +func (a *providersActor) ResetState(index int64, lockRequestData []lockRequestData) error { + return actor.Wait(a.commandChan, func() error { + for _, p := range a.allProviders { + p.Clear() + } + + for _, reqData := range lockRequestData { + err := a.lockLockRequest(reqData) + if err != nil { + return fmt.Errorf("lock by lock request data failed, err: %w", err) + } + } + + a.localLockReqIndex = index + + // 检查是否有等待同步进度的需求 + a.checkIndexWaiter() + + return nil + }) +} + +func (a *providersActor) checkIndexWaiter() { + var resetWaiters []indexWaiter + for _, waiter := range a.indexWaiters { + if waiter.Index <= a.localLockReqIndex { + waiter.Future.SetVoid() + } else { + resetWaiters = append(resetWaiters, waiter) + } + } + a.indexWaiters = resetWaiters +} + +func (a *providersActor) Serve() error { + for { + select { + case cmd, ok := <-a.commandChan.ChanReceive(): + if !ok { + return fmt.Errorf("command channel closed") + } + + cmd() + } + } +} diff --git a/pkg/distlock/service.go b/pkg/distlock/service.go index 966de99..f08a253 100644 --- a/pkg/distlock/service.go +++ b/pkg/distlock/service.go @@ -1,17 +1,9 @@ package distlock import ( - "context" "fmt" - "strconv" - "sync" - "time" - "gitlink.org.cn/cloudream/common/pkg/trie" - "gitlink.org.cn/cloudream/common/utils/serder" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/client/v3/concurrency" ) const ( @@ -24,12 +16,9 @@ type Service struct { cfg *Config etcdCli *clientv3.Client - providersLock sync.Mutex - provdersTrie trie.Trie[LockProvider] - allProviders []LockProvider - localLockReqIndex int64 - 
waitLocalLockReqIndex int64 - waitLocalLockReqIndexChan chan any + main *mainActor + providers *providersActor + watchEtcd *watchEtcdActor } func NewService(cfg *Config) (*Service, error) { @@ -43,367 +32,50 @@ func NewService(cfg *Config) (*Service, error) { return nil, fmt.Errorf("new etcd client failed, err: %w", err) } + mainActor := newMainActor() + providersActor := newProvidersActor() + watchEtcdActor := newWatchEtcdActor() + + mainActor.Init(watchEtcdActor, providersActor) + providersActor.Init() + watchEtcdActor.Init(providersActor) + return &Service{ - cfg: cfg, - etcdCli: etcdCli, + cfg: cfg, + etcdCli: etcdCli, + main: mainActor, + providers: providersActor, + watchEtcd: watchEtcdActor, }, nil } // Acquire 请求一批锁。成功后返回锁请求ID func (svc *Service) Acquire(req LockRequest) (reqID string, err error) { - // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 - unlock, err := svc.lockEtcdRequestData() - if err != nil { - return "", fmt.Errorf("acquire etcd request data lock failed, err: %w", err) - } - defer unlock() - - index, err := svc.getEtcdLockRequestIndex() - if err != nil { - return "", err - } - - // 测试锁,并获得锁数据 - reqData, err := svc.testLockRequestAndMakeData(index, req) - if err != nil { - return "", err - } - - // 锁成功,提交锁数据 - - nextIndexStr := strconv.FormatInt(index+1, 10) - - reqBytes, err := serder.ObjectToJSON(reqData) - if err != nil { - return "", fmt.Errorf("serialize lock request data failed, err: %w", err) - } - - txResp, err := svc.etcdCli.Txn(context.Background()). - // 文档上没有说明如果If为空,会执行Then还是Else,所以为了避免问题,设定一个恒成立的条件。 - // 正常情况下,锁定全局锁期间index是不可能变化的,所以下面这个条件一定成立。 - // 注:由于是字符串比较,所以修改此值的时候,必须保证是10进制,且无前后空格。 - If(clientv3.Compare(clientv3.Value(LOCK_REQUEST_INDEX), "=", strconv.FormatInt(index, 10))). - Then( - clientv3.OpPut(LOCK_REQUEST_INDEX, nextIndexStr), - clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes)), - ). 
- Commit() - if err != nil { - return "", fmt.Errorf("submit lock request data failed, err: %w", err) - } - if !txResp.Succeeded { - return "", fmt.Errorf("submit lock request data failed for lock request data index changed") - } - - return nextIndexStr, nil -} - -func (svc *Service) getEtcdLockRequestIndex() (int64, error) { - indexKv, err := svc.etcdCli.Get(context.Background(), LOCK_REQUEST_INDEX) - if err != nil { - return 0, fmt.Errorf("get lock request index failed, err: %w", err) - } - - if len(indexKv.Kvs) == 0 { - return 0, fmt.Errorf("lock request index not found in etcd") - } - - index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64) - if err != nil { - return 0, fmt.Errorf("parse lock request index failed, err: %w", err) - } - - return index, nil -} - -func (svc *Service) testLockRequestAndMakeData(latestIndex int64, req LockRequest) (lockRequestData, error) { - svc.providersLock.Lock() - defer svc.providersLock.Unlock() - - // 等待本地状态同步到最新 - if svc.localLockReqIndex < latestIndex { - ch := make(chan any, 1) - svc.waitLocalLockReqIndex = latestIndex - svc.waitLocalLockReqIndexChan = ch - - svc.providersLock.Unlock() - - // TODO 超时 - <-ch - - // 等待完全同步完成,那么再次加锁,防止本地状态被更改。 - // 设计上来说,锁定了etcd中的全局锁之后,不可能再有更改的事件发生,因此只要本地状态同步到了最新, - // watch协程就不会再收到事件,然后更改本地状态,但跨协程修改本地状态存在内存可见性问题,所以还是需要加锁来同步一下 - svc.providersLock.Lock() - } - - // 判断锁能否锁成功,并生成锁数据的字符串表示 - reqData := lockRequestData{} - - for _, lock := range req.Locks { - n, ok := svc.provdersTrie.WalkEnd(lock.Path) - if !ok || n.Value == nil { - return lockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path) - } - - err := n.Value.CanLock(lock) - if err != nil { - return lockRequestData{}, err - } - - targetStr, err := n.Value.GetTargetString(lock.Target) - if err != nil { - return lockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err) - } - - reqData.Locks = append(reqData.Locks, lockData{ - Path: lock.Path, - Name: lock.Name, - Target: targetStr, - }) - } - - return reqData, nil + return svc.main.Acquire(req) } // Renew 续约锁 -func (svc *Service) Renew(lockReqID string) error { +func (svc *Service) Renew(reqID string) error { panic("todo") } // Release 释放锁 -func (svc *Service) Release(lockReqID string) error { - panic("todo") - +func (svc *Service) Release(reqID string) error { + return svc.main.Release(reqID) } func (svc *Service) Serve() error { - return svc.watchRequestData() -} - -func (svc *Service) lockEtcdRequestData() (unlock func(), err error) { - lease, err := svc.etcdCli.Grant(context.Background(), svc.cfg.LockRequestDataConfig.LeaseTimeSec) - if err != nil { - return nil, fmt.Errorf("grant lease failed, err: %w", err) - } - - session, err := concurrency.NewSession(svc.etcdCli, concurrency.WithLease(lease.ID)) - if err != nil { - return nil, fmt.Errorf("new session failed, err: %w", err) - } - defer session.Close() - - mutex := concurrency.NewMutex(session, LOCK_REQUEST_LOCK_NAME) - - timeout, cancelFunc := context.WithTimeout(context.Background(), - time.Duration(svc.cfg.LockRequestDataConfig.AcquireTimeoutMs)*time.Millisecond) - defer cancelFunc() - - err = mutex.Lock(timeout) - if err != nil { - return nil, fmt.Errorf("acquire lock failed, err: %w", err) - } - - return func() { - mutex.Unlock(context.Background()) - session.Close() - }, nil -} - -func (svc *Service) watchRequestData() error { - // TODO 考虑增加状态字段,调用API时根据状态字段来判断能不能调用成功 - err := svc.loadInitData() - if err != nil { - return fmt.Errorf("load init data failed, err: %w", err) - } - - dataWatchChan := 
svc.etcdCli.Watch(context.Background(), LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix()) - - for { - select { - case msg := <-dataWatchChan: - if msg.Canceled { - return fmt.Errorf("watch canceled, err: %w", msg.Err()) - } - - err := svc.applyEvents(msg) - if err != nil { - return err - } - } - } -} - -func (svc *Service) loadInitData() error { - index, locks, err := svc.getInitDataFromEtcd() - if err != nil { - return fmt.Errorf("get init data from etcd failed, err: %w", err) - } - - err = svc.resetLocalLockRequestData(index, locks) - if err != nil { - return fmt.Errorf("reset local lock request data failed, err: %w", err) - } - - return nil -} - -func (svc *Service) getInitDataFromEtcd() ([]*mvccpb.KeyValue, []*mvccpb.KeyValue, error) { - unlock, err := svc.lockEtcdRequestData() - if err != nil { - return nil, nil, fmt.Errorf("try lock request data failed, err: %w", err) - } - defer unlock() - - index, err := svc.etcdCli.Get(context.Background(), LOCK_REQUEST_INDEX) - if err != nil { - return nil, nil, fmt.Errorf("get lock request index failed, err: %w", err) - } - - data, err := svc.etcdCli.Get(context.Background(), LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix()) - if err != nil { - return nil, nil, fmt.Errorf("get lock request data failed, err: %w", err) - } - - return index.Kvs, data.Kvs, nil -} - -func (svc *Service) resetLocalLockRequestData(index []*mvccpb.KeyValue, locks []*mvccpb.KeyValue) error { - svc.providersLock.Lock() - defer svc.providersLock.Unlock() - - // 先清空所有的锁数据 - for _, p := range svc.allProviders { - p.Clear() - } - - // 然后再导入全量数据 - for _, kv := range locks { - err := svc.lockLockRequest(kv) - if err != nil { - return err - } - } - - // 更新本地index - if len(index) == 0 { - svc.localLockReqIndex = 0 - - } else { - val, err := strconv.ParseInt(string(index[0].Value), 0, 64) - if err != nil { - return fmt.Errorf("parse lock request index failed, err: %w", err) - } - - svc.localLockReqIndex = val - } - - // 检查是否有等待同步进度的需求 - if svc.waitLocalLockReqIndexChan != nil && svc.waitLocalLockReqIndex <= svc.localLockReqIndex { - close(svc.waitLocalLockReqIndexChan) - svc.waitLocalLockReqIndexChan = nil - } - - return nil -} - -func (svc *Service) applyEvents(watchResp clientv3.WatchResponse) error { - handledCnt := 0 - - svc.providersLock.Lock() - defer svc.providersLock.Unlock() - - for _, e := range watchResp.Events { - var err error - - // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index - if e.Type == clientv3.EventTypeDelete { - err = svc.unlockLockRequest(e.Kv) - handledCnt++ - - } else if e.IsCreate() { - err = svc.lockLockRequest(e.Kv) - handledCnt++ - } - - if err != nil { - return fmt.Errorf("apply event failed, err: %w", err) - } - } - - // 处理了多少事件,Index就往后移动多少个 - svc.localLockReqIndex += int64(handledCnt) - - // 检查是否有等待同步进度的需求 - if svc.waitLocalLockReqIndexChan != nil && svc.waitLocalLockReqIndex <= svc.localLockReqIndex { - close(svc.waitLocalLockReqIndexChan) - svc.waitLocalLockReqIndexChan = nil - } - - return nil -} - -func (svc *Service) lockLockRequest(kv *mvccpb.KeyValue) error { - reqID := getLockRequestID(string(kv.Key)) - - var req lockRequestData - err := serder.JSONToObject(kv.Value, &req) - if err != nil { - return fmt.Errorf("parse lock request data") - } - - for _, lockData := range req.Locks { - node, ok := svc.provdersTrie.WalkEnd(lockData.Path) - if !ok || node.Value == nil { - return fmt.Errorf("lock provider not found for path %v", lockData.Path) - } - - target, err := node.Value.ParseTargetString(lockData.Target) - if err != nil { - return fmt.Errorf("parse 
target data failed, err: %w", err) - } - - err = node.Value.Lock(reqID, Lock{ - Path: lockData.Path, - Name: lockData.Name, - Target: target, - }) - if err != nil { - return fmt.Errorf("locking failed, err: %w", err) - } - } - return nil -} - -func (svc *Service) unlockLockRequest(kv *mvccpb.KeyValue) error { - reqID := getLockRequestID(string(kv.Key)) - - var req lockRequestData - err := serder.JSONToObject(kv.Value, &req) - if err != nil { - return fmt.Errorf("parse lock request data") - } - - for _, lockData := range req.Locks { - node, ok := svc.provdersTrie.WalkEnd(lockData.Path) - if !ok || node.Value == nil { - return fmt.Errorf("lock provider not found for path %v", lockData.Path) - } - - target, err := node.Value.ParseTargetString(lockData.Target) - if err != nil { - return fmt.Errorf("parse target data failed, err: %w", err) - } - - err = node.Value.Unlock(reqID, Lock{ - Path: lockData.Path, - Name: lockData.Name, - Target: target, - }) - if err != nil { - return fmt.Errorf("unlocking failed, err: %w", err) - } - } - return nil + go func() { + // TODO 处理错误 + svc.providers.Serve() + }() + + go func() { + // TODO 处理错误 + svc.watchEtcd.Serve() + }() + + // 考虑更好的错误处理方式 + return svc.main.Serve() } diff --git a/pkg/distlock/watch_etcd_actor.go b/pkg/distlock/watch_etcd_actor.go new file mode 100644 index 0000000..5972406 --- /dev/null +++ b/pkg/distlock/watch_etcd_actor.go @@ -0,0 +1,122 @@ +package distlock + +import ( + "context" + "fmt" + + "gitlink.org.cn/cloudream/common/pkg/actor" + "gitlink.org.cn/cloudream/common/utils/serder" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type watchEtcdActor struct { + etcdCli *clientv3.Client + watchChan clientv3.WatchChan + + commandChan *actor.CommandChannel + + providers *providersActor +} + +func newWatchEtcdActor() *watchEtcdActor { + return &watchEtcdActor{ + commandChan: actor.NewCommandChannel(), + } +} + +func (a *watchEtcdActor) Init(providers *providersActor) { + a.providers = providers +} + +func (a *watchEtcdActor) StartWatching() error { + return actor.Wait(a.commandChan, func() error { + a.watchChan = a.etcdCli.Watch(context.Background(), LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix()) + return nil + }) +} + +func (a *watchEtcdActor) StopWatching() error { + return actor.Wait(a.commandChan, func() error { + a.watchChan = nil + return nil + }) +} + +func (a *watchEtcdActor) Serve() error { + for { + if a.watchChan != nil { + select { + case cmd, ok := <-a.commandChan.ChanReceive(): + if !ok { + return fmt.Errorf("command channel closed") + } + + cmd() + + case msg := <-a.watchChan: + if msg.Canceled { + // TODO 更好的错误处理 + return fmt.Errorf("watch etcd channel closed") + } + + ops, err := a.parseEvents(msg) + if err != nil { + // TODO 更好的错误处理 + return fmt.Errorf("parse etcd lock request data failed, err: %w", err) + } + + err = a.providers.BatchUpdateByLockRequestData(ops) + if err != nil { + // TODO 更好的错误处理 + return fmt.Errorf("update local lock request data failed, err: %w", err) + } + } + + } else { + select { + case cmd, ok := <-a.commandChan.ChanReceive(): + if !ok { + return fmt.Errorf("command channel closed") + } + + cmd() + } + } + } +} + +func (a *watchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]lockRequestDataUpdateOp, error) { + var ops []lockRequestDataUpdateOp + + for _, e := range watchResp.Events { + + shouldParseData := false + isLock := true + + // 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index + if e.Type == clientv3.EventTypeDelete { + shouldParseData = true + isLock = false + } else if e.IsCreate() { + 
shouldParseData = true + isLock = true + } + + if !shouldParseData { + continue + } + + var reqData lockRequestData + err := serder.JSONToObject(e.Kv.Value, &reqData) + if err != nil { + return nil, fmt.Errorf("parse lock request data failed, err: %w", err) + } + + ops = append(ops, lockRequestDataUpdateOp{ + IsLock: isLock, + Data: reqData, + }) + } + + return ops, nil +}
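A minimal usage sketch of the refactored API, for reference only. It assumes the package lives in the same module as pkg/actor (import path gitlink.org.cn/cloudream/common/pkg/distlock), that Config carries the etcd endpoint plus the LockRequestDataConfig settings referenced above, and that at least one LockProvider has been registered for the requested path; provider registration is not part of this diff, so the Lock fields are left empty here.

package main

import (
	"log"

	"gitlink.org.cn/cloudream/common/pkg/distlock"
)

func main() {
	// Config contents (etcd endpoints, lease time, acquire timeout) are assumptions here.
	svc, err := distlock.NewService(&distlock.Config{})
	if err != nil {
		log.Fatalf("new distlock service: %v", err)
	}

	// Serve drives the main actor; the providers and watch-etcd actors are started
	// internally as goroutines.
	go func() {
		if err := svc.Serve(); err != nil {
			log.Printf("distlock service stopped: %v", err)
		}
	}()

	var req distlock.LockRequest
	// Path, Name and Target depend on the LockProvider registered for the path.
	req.Add(distlock.Lock{})

	reqID, err := svc.Acquire(req)
	if err != nil {
		log.Fatalf("acquire locks: %v", err)
	}
	defer svc.Release(reqID)

	// ... critical section protected by the acquired locks ...
}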