Browse Source

将service的逻辑拆分成多个actor

pull/1/head
Sydonian 2 years ago
parent
commit
5eddbda982
5 changed files with 616 additions and 360 deletions
  1. +1
    -0
      pkg/distlock/distlock.go
  2. +251
    -0
      pkg/distlock/main_actor.go
  3. +210
    -0
      pkg/distlock/providers_actor.go
  4. +32
    -360
      pkg/distlock/service.go
  5. +122
    -0
      pkg/distlock/watch_etcd_actor.go

+ 1
- 0
pkg/distlock/distlock.go View File

@@ -21,6 +21,7 @@ func (b *LockRequest) Add(lock Lock) {
}

// lockRequestData is the JSON form of a lock request as it is stored in etcd.
type lockRequestData struct {
	// ID is the lock request ID; it is serialized under the "id" key.
	ID string `json:"id"`
	// Locks lists the individual lock entries of the request ("locks" key).
	Locks []lockData `json:"locks"`
}



+ 251
- 0
pkg/distlock/main_actor.go View File

@@ -0,0 +1,251 @@
package distlock

import (
"context"
"fmt"
"strconv"
"time"

"gitlink.org.cn/cloudream/common/pkg/actor"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

// mainActor runs the Acquire / Release / ReloadEtcdData operations. Each
// operation is submitted as a closure on commandChan and executed by Serve,
// one command at a time.
type mainActor struct {
	// cfg and etcdCli are read by the etcd helpers of this actor.
	// NOTE(review): neither newMainActor nor Init assigns these two fields in
	// the code shown here — confirm they are populated before use.
	cfg *Config
	etcdCli *clientv3.Client

	// commandChan carries the closures that Serve executes.
	commandChan *actor.CommandChannel

	// Sibling actors, set by Init.
	watchEtcd *watchEtcdActor
	providers *providersActor
}

// newMainActor creates a mainActor that owns a fresh command channel.
// Every other field is left at its zero value.
func newMainActor() *mainActor {
	a := new(mainActor)
	a.commandChan = actor.NewCommandChannel()
	return a
}

// Init stores the sibling actors that mainActor delegates to.
func (a *mainActor) Init(watchEtcd *watchEtcdActor, providers *providersActor) {
	a.watchEtcd, a.providers = watchEtcd, providers
}

// Acquire requests a batch of locks. On success it returns the ID of the newly
// created lock request.
//
// The work runs as a command on the main actor. While holding the etcd
// request-data lock it reads the current lock request index, waits for the
// providers actor to reach that index, asks the providers actor whether the
// locks can be taken, and finally commits the incremented index together with
// the serialized request in one etcd transaction.
//
// The transaction is guarded by a compare on the index value that was read
// earlier, so the write is rejected (instead of silently overwriting) if the
// index was changed by someone else in the meantime. The compare is a string
// compare: the index must always be stored in base 10 without surrounding
// whitespace.
func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) {
	return actor.WaitValue[string](a.commandChan, func() (string, error) {
		// TODO: use distinct error types per failure so callers can react accordingly.
		unlock, err := a.acquireEtcdRequestDataLock()
		if err != nil {
			return "", fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
		}
		defer unlock()

		index, err := a.getEtcdLockRequestIndex()
		if err != nil {
			return "", err
		}

		// Wait until the local state has caught up with the index stored in etcd.
		err = a.providers.WaitIndexUpdated(index)
		if err != nil {
			return "", err
		}

		// Test whether the locks can be taken and build the lock data.
		reqData, err := a.providers.TestLockRequestAndMakeData(req)
		if err != nil {
			return "", err
		}

		// The locks can be taken: submit the lock data.
		curIndexStr := strconv.FormatInt(index, 10)
		nextIndexStr := strconv.FormatInt(index+1, 10)

		// The new index doubles as the request ID.
		reqData.ID = nextIndexStr

		reqBytes, err := serder.ObjectToJSON(reqData)
		if err != nil {
			return "", fmt.Errorf("serialize lock request data failed, err: %w", err)
		}

		txResp, err := a.etcdCli.Txn(context.Background()).
			If(clientv3.Compare(clientv3.Value(LOCK_REQUEST_INDEX), "=", curIndexStr)).
			Then(
				clientv3.OpPut(LOCK_REQUEST_INDEX, nextIndexStr),
				clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes)),
			).
			Commit()
		if err != nil {
			return "", fmt.Errorf("submit lock request data failed, err: %w", err)
		}
		if !txResp.Succeeded {
			return "", fmt.Errorf("submit lock request data failed for lock request data index changed")
		}

		return nextIndexStr, nil
	})
}

// Release releases the lock request identified by reqID.
//
// The work runs as a command on the main actor while holding the etcd
// request-data lock. The request key is deleted and the lock request index is
// incremented in a single etcd transaction, so the index can never get out of
// step with the delete if one of the two writes fails. The transaction only
// runs when the request key exists; otherwise nothing is changed and an error
// is returned.
func (a *mainActor) Release(reqID string) error {
	return actor.Wait(a.commandChan, func() error {
		// TODO: use distinct error types per failure so callers can react accordingly.
		unlock, err := a.acquireEtcdRequestDataLock()
		if err != nil {
			return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
		}
		defer unlock()

		index, err := a.getEtcdLockRequestIndex()
		if err != nil {
			return err
		}

		lockReqKey := makeEtcdLockRequestKey(reqID)
		nextIndexStr := strconv.FormatInt(index+1, 10)

		// A key that does not exist has create revision 0, so this compare is
		// an existence check.
		txResp, err := a.etcdCli.Txn(context.Background()).
			If(clientv3.Compare(clientv3.CreateRevision(lockReqKey), ">", 0)).
			Then(
				clientv3.OpDelete(lockReqKey),
				clientv3.OpPut(LOCK_REQUEST_INDEX, nextIndexStr),
			).
			Commit()
		if err != nil {
			return fmt.Errorf("delete lock request data failed, err: %w", err)
		}

		if !txResp.Succeeded {
			// TODO: consider returning a more recognizable error here.
			return fmt.Errorf("lock request data not found")
		}

		return nil
	})
}

// acquireEtcdRequestDataLock takes the etcd mutex named LOCK_REQUEST_LOCK_NAME
// and returns a function that releases it.
//
// A lease with the configured TTL backs the session that owns the mutex, and
// the lock attempt itself is bounded by the configured acquire timeout. On
// success the session stays open until the returned unlock function is called:
// closing the session revokes its lease, which would drop the mutex key and
// release the lock early. On failure everything created so far is cleaned up
// before returning.
func (a *mainActor) acquireEtcdRequestDataLock() (unlock func(), err error) {
	lease, err := a.etcdCli.Grant(context.Background(), a.cfg.LockRequestDataConfig.LeaseTimeSec)
	if err != nil {
		return nil, fmt.Errorf("grant lease failed, err: %w", err)
	}

	session, err := concurrency.NewSession(a.etcdCli, concurrency.WithLease(lease.ID))
	if err != nil {
		// Best effort: do not leave the unused lease around until its TTL
		// expires. A failure here is not actionable, the lease expires anyway.
		_, _ = a.etcdCli.Revoke(context.Background(), lease.ID)
		return nil, fmt.Errorf("new session failed, err: %w", err)
	}

	mutex := concurrency.NewMutex(session, LOCK_REQUEST_LOCK_NAME)

	timeout, cancelFunc := context.WithTimeout(context.Background(),
		time.Duration(a.cfg.LockRequestDataConfig.AcquireTimeoutMs)*time.Millisecond)
	defer cancelFunc()

	err = mutex.Lock(timeout)
	if err != nil {
		// The lock was not obtained, so the session is no longer needed.
		session.Close()
		return nil, fmt.Errorf("acquire lock failed, err: %w", err)
	}

	return func() {
		mutex.Unlock(context.Background())
		session.Close()
	}, nil
}

// getEtcdLockRequestIndex reads the value stored under LOCK_REQUEST_INDEX in
// etcd and parses it as a 64-bit integer. It returns an error when the read
// fails, when the key has no value, or when the value does not parse.
func (a *mainActor) getEtcdLockRequestIndex() (int64, error) {
	resp, err := a.etcdCli.Get(context.Background(), LOCK_REQUEST_INDEX)
	if err != nil {
		return 0, fmt.Errorf("get lock request index failed, err: %w", err)
	}

	kvs := resp.Kvs
	if len(kvs) < 1 {
		return 0, fmt.Errorf("lock request index not found in etcd")
	}

	raw := string(kvs[0].Value)
	parsed, parseErr := strconv.ParseInt(raw, 0, 64)
	if parseErr != nil {
		return 0, fmt.Errorf("parse lock request index failed, err: %w", parseErr)
	}

	return parsed, nil
}

// ReloadEtcdData rebuilds the local lock state from etcd. It runs as a command
// on the main actor: it reads the lock request index and all lock request data,
// stops the etcd watch, resets the providers actor with the loaded data and
// then starts the watch again.
func (a *mainActor) ReloadEtcdData() error {
	reload := func() error {
		// One transaction fetches the index and the lock data together, so the
		// global lock is not needed here.
		txResp, err := a.etcdCli.Txn(context.Background()).
			Then(
				clientv3.OpGet(LOCK_REQUEST_INDEX),
				clientv3.OpGet(LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix()),
			).
			Commit()
		if err != nil {
			return fmt.Errorf("get etcd data failed, err: %w", err)
		}
		if !txResp.Succeeded {
			return fmt.Errorf("get etcd data failed")
		}

		indexKvs := txResp.Responses[0].GetResponseRange().Kvs
		lockKvs := txResp.Responses[1].GetResponseRange().Kvs

		// Parse the index; it stays 0 when the key is absent.
		var index int64
		if len(indexKvs) > 0 {
			index, err = strconv.ParseInt(string(indexKvs[0].Value), 0, 64)
			if err != nil {
				return fmt.Errorf("parse lock request index failed, err: %w", err)
			}
		}

		// Parse the lock request data.
		var reqData []lockRequestData
		for i := range lockKvs {
			var req lockRequestData
			if err := serder.JSONToObject(lockKvs[i].Value, &req); err != nil {
				return fmt.Errorf("parse lock request data failed, err: %w", err)
			}

			reqData = append(reqData, req)
		}

		// Stop watching first, then reset the lock state, and finally resume watching.
		// NOTE(review): the watch is restarted without a start revision, so
		// changes made between the read above and StartWatching may be missed —
		// confirm whether that matters.
		if err := a.watchEtcd.StopWatching(); err != nil {
			return fmt.Errorf("stop watching etcd failed, err: %w", err)
		}

		if err := a.providers.ResetState(index, reqData); err != nil {
			return fmt.Errorf("reset lock providers state failed, err: %w", err)
		}

		if err := a.watchEtcd.StartWatching(); err != nil {
			return fmt.Errorf("start watching etcd failed, err: %w", err)
		}

		return nil
	}

	return actor.Wait(a.commandChan, reload)
}

// Serve executes the commands received on the command channel, one after
// another. It only returns, with an error, once that channel is closed.
func (a *mainActor) Serve() error {
	for {
		cmd, ok := <-a.commandChan.ChanReceive()
		if !ok {
			return fmt.Errorf("command channel closed")
		}

		cmd()
	}
}

+ 210
- 0
pkg/distlock/providers_actor.go View File

@@ -0,0 +1,210 @@
package distlock

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkg/actor"
"gitlink.org.cn/cloudream/common/pkg/future"
"gitlink.org.cn/cloudream/common/pkg/trie"
)

// indexWaiter is a pending WaitIndexUpdated call: Future is completed by
// checkIndexWaiter once the local lock request index has reached Index.
type indexWaiter struct {
	// Index is the lock request index being waited for.
	Index int64
	// Future is completed when the wait is over.
	Future *future.SetVoidFuture
}

// lockRequestDataUpdateOp is one change to apply to the local lock state.
type lockRequestDataUpdateOp struct {
	// Data is the lock request the operation refers to.
	Data lockRequestData
	// IsLock selects the operation: true locks the request, false unlocks it.
	IsLock bool
}

// providersActor owns the local lock state: the lock providers and the index
// of the last lock request change that has been applied locally. Its state is
// modified by closures submitted on commandChan and executed by Serve.
type providersActor struct {
	// localLockReqIndex is the lock request index the local state corresponds to.
	localLockReqIndex int64
	// provdersTrie maps a lock path to the provider responsible for it.
	provdersTrie trie.Trie[LockProvider]
	// allProviders lists every provider; ResetState clears each of them.
	allProviders []LockProvider

	// indexWaiters holds the WaitIndexUpdated calls that are still pending.
	indexWaiters []indexWaiter

	// commandChan carries the closures that Serve executes.
	commandChan *actor.CommandChannel
}

// newProvidersActor creates a providersActor that owns a fresh command
// channel. Every other field is left at its zero value.
func newProvidersActor() *providersActor {
	a := new(providersActor)
	a.commandChan = actor.NewCommandChannel()
	return a
}

// Init is a no-op: providersActor has no dependencies to wire in.
func (a *providersActor) Init() {}

// WaitIndexUpdated blocks until the local lock request index has reached at
// least index, and returns the result of waiting on the underlying future.
//
// The check runs as a command on the providers actor. If the local index is
// already at or beyond index the wait completes immediately; otherwise a
// waiter is queued and completed later by checkIndexWaiter, which uses the
// same "waiter.Index <= localLockReqIndex" condition.
func (a *providersActor) WaitIndexUpdated(index int64) error {
	fut := future.NewSetVoid()

	a.commandChan.Send(func() {
		if a.localLockReqIndex >= index {
			// Already caught up, nothing to wait for.
			fut.SetVoid()
			return
		}

		a.indexWaiters = append(a.indexWaiters, indexWaiter{
			Index:  index,
			Future: fut,
		})
	})

	return fut.Wait()
}

// BatchUpdateByLockRequestData applies ops in order as one command on the
// providers actor. After all of them succeeded the local index is advanced by
// len(ops) and pending index waiters are re-checked. The first failing
// operation aborts the batch and its error is returned.
func (a *providersActor) BatchUpdateByLockRequestData(ops []lockRequestDataUpdateOp) error {
	apply := func() error {
		for i := range ops {
			op := ops[i]

			if !op.IsLock {
				if err := a.unlockLockRequest(op.Data); err != nil {
					return fmt.Errorf("unlock by lock request data failed, err: %w", err)
				}
				continue
			}

			if err := a.lockLockRequest(op.Data); err != nil {
				return fmt.Errorf("lock by lock request data failed, err: %w", err)
			}
		}

		// The index moves forward by the number of handled events.
		a.localLockReqIndex += int64(len(ops))

		// Complete any waiter whose target index has now been reached.
		a.checkIndexWaiter()

		return nil
	}

	return actor.Wait(a.commandChan, apply)
}

// lockLockRequest applies every lock entry of reqData to the provider found
// for the entry's path, using reqData.ID as the request ID. It stops at the
// first entry that has no provider, whose target cannot be parsed, or whose
// Lock call fails.
func (a *providersActor) lockLockRequest(reqData lockRequestData) error {
	for i := range reqData.Locks {
		entry := reqData.Locks[i]

		node, found := a.provdersTrie.WalkEnd(entry.Path)
		if !found || node.Value == nil {
			return fmt.Errorf("lock provider not found for path %v", entry.Path)
		}
		provider := node.Value

		target, err := provider.ParseTargetString(entry.Target)
		if err != nil {
			return fmt.Errorf("parse target data failed, err: %w", err)
		}

		lock := Lock{
			Path:   entry.Path,
			Name:   entry.Name,
			Target: target,
		}
		if err := provider.Lock(reqData.ID, lock); err != nil {
			return fmt.Errorf("locking failed, err: %w", err)
		}
	}

	return nil
}

// unlockLockRequest removes every lock entry of reqData from the provider
// found for the entry's path, using reqData.ID as the request ID. It stops at
// the first entry that has no provider, whose target cannot be parsed, or
// whose Unlock call fails.
func (a *providersActor) unlockLockRequest(reqData lockRequestData) error {
	for i := range reqData.Locks {
		entry := reqData.Locks[i]

		node, found := a.provdersTrie.WalkEnd(entry.Path)
		if !found || node.Value == nil {
			return fmt.Errorf("lock provider not found for path %v", entry.Path)
		}
		provider := node.Value

		target, err := provider.ParseTargetString(entry.Target)
		if err != nil {
			return fmt.Errorf("parse target data failed, err: %w", err)
		}

		lock := Lock{
			Path:   entry.Path,
			Name:   entry.Name,
			Target: target,
		}
		if err := provider.Unlock(reqData.ID, lock); err != nil {
			return fmt.Errorf("unlocking failed, err: %w", err)
		}
	}

	return nil
}

// TestLockRequestAndMakeData checks, as a command on the providers actor,
// whether every lock in req can be taken, and builds the serializable lock
// data for the request. The request ID is not filled in. The first lock that
// has no provider, cannot be taken, or whose target cannot be converted to a
// string aborts the check with an error.
func (a *providersActor) TestLockRequestAndMakeData(req LockRequest) (lockRequestData, error) {
	return actor.WaitValue[lockRequestData](a.commandChan, func() (lockRequestData, error) {
		var result lockRequestData

		for i := range req.Locks {
			lock := req.Locks[i]

			node, found := a.provdersTrie.WalkEnd(lock.Path)
			if !found || node.Value == nil {
				return lockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path)
			}
			provider := node.Value

			if err := provider.CanLock(lock); err != nil {
				return lockRequestData{}, err
			}

			targetStr, err := provider.GetTargetString(lock.Target)
			if err != nil {
				return lockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err)
			}

			entry := lockData{
				Path:   lock.Path,
				Name:   lock.Name,
				Target: targetStr,
			}
			result.Locks = append(result.Locks, entry)
		}

		return result, nil
	})
}

// ResetState replaces the local lock state, as a command on the providers
// actor: every provider is cleared, the given lock requests are applied again,
// the local index is set to index, and pending index waiters are re-checked.
// If applying a request fails the error is returned and the index is left
// unchanged.
func (a *providersActor) ResetState(index int64, lockRequestData []lockRequestData) error {
	reset := func() error {
		for i := range a.allProviders {
			a.allProviders[i].Clear()
		}

		for i := range lockRequestData {
			if err := a.lockLockRequest(lockRequestData[i]); err != nil {
				return fmt.Errorf("lock by lock request data failed, err: %w", err)
			}
		}

		a.localLockReqIndex = index

		// Complete any waiter whose target index has now been reached.
		a.checkIndexWaiter()

		return nil
	}

	return actor.Wait(a.commandChan, reset)
}

// checkIndexWaiter completes every waiter whose index is not beyond the
// current local index and keeps the remaining ones queued.
func (a *providersActor) checkIndexWaiter() {
	var pending []indexWaiter

	for i := range a.indexWaiters {
		w := a.indexWaiters[i]
		if w.Index > a.localLockReqIndex {
			pending = append(pending, w)
			continue
		}

		w.Future.SetVoid()
	}

	a.indexWaiters = pending
}

// Serve executes the commands received on the command channel, one after
// another. It only returns, with an error, once that channel is closed.
func (a *providersActor) Serve() error {
	for {
		cmd, ok := <-a.commandChan.ChanReceive()
		if !ok {
			return fmt.Errorf("command channel closed")
		}

		cmd()
	}
}

+ 32
- 360
pkg/distlock/service.go View File

@@ -1,17 +1,9 @@
package distlock

import (
"context"
"fmt"
"strconv"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkg/trie"
"gitlink.org.cn/cloudream/common/utils/serder"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

const (
@@ -24,12 +16,9 @@ type Service struct {
cfg *Config
etcdCli *clientv3.Client

providersLock sync.Mutex
provdersTrie trie.Trie[LockProvider]
allProviders []LockProvider
localLockReqIndex int64
waitLocalLockReqIndex int64
waitLocalLockReqIndexChan chan any
main *mainActor
providers *providersActor
watchEtcd *watchEtcdActor
}

func NewService(cfg *Config) (*Service, error) {
@@ -43,367 +32,50 @@ func NewService(cfg *Config) (*Service, error) {
return nil, fmt.Errorf("new etcd client failed, err: %w", err)
}

mainActor := newMainActor()
providersActor := newProvidersActor()
watchEtcdActor := newWatchEtcdActor()

mainActor.Init(watchEtcdActor, providersActor)
providersActor.Init()
watchEtcdActor.Init(providersActor)

return &Service{
cfg: cfg,
etcdCli: etcdCli,
cfg: cfg,
etcdCli: etcdCli,
main: mainActor,
providers: providersActor,
watchEtcd: watchEtcdActor,
}, nil
}

// Acquire 请求一批锁。成功后返回锁请求ID
func (svc *Service) Acquire(req LockRequest) (reqID string, err error) {
// TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理
unlock, err := svc.lockEtcdRequestData()
if err != nil {
return "", fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
}
defer unlock()

index, err := svc.getEtcdLockRequestIndex()
if err != nil {
return "", err
}

// 测试锁,并获得锁数据
reqData, err := svc.testLockRequestAndMakeData(index, req)
if err != nil {
return "", err
}

// 锁成功,提交锁数据

nextIndexStr := strconv.FormatInt(index+1, 10)

reqBytes, err := serder.ObjectToJSON(reqData)
if err != nil {
return "", fmt.Errorf("serialize lock request data failed, err: %w", err)
}

txResp, err := svc.etcdCli.Txn(context.Background()).
// 文档上没有说明如果If为空,会执行Then还是Else,所以为了避免问题,设定一个恒成立的条件。
// 正常情况下,锁定全局锁期间index是不可能变化的,所以下面这个条件一定成立。
// 注:由于是字符串比较,所以修改此值的时候,必须保证是10进制,且无前后空格。
If(clientv3.Compare(clientv3.Value(LOCK_REQUEST_INDEX), "=", strconv.FormatInt(index, 10))).
Then(
clientv3.OpPut(LOCK_REQUEST_INDEX, nextIndexStr),
clientv3.OpPut(makeEtcdLockRequestKey(nextIndexStr), string(reqBytes)),
).
Commit()
if err != nil {
return "", fmt.Errorf("submit lock request data failed, err: %w", err)
}
if !txResp.Succeeded {
return "", fmt.Errorf("submit lock request data failed for lock request data index changed")
}

return nextIndexStr, nil
}

func (svc *Service) getEtcdLockRequestIndex() (int64, error) {
indexKv, err := svc.etcdCli.Get(context.Background(), LOCK_REQUEST_INDEX)
if err != nil {
return 0, fmt.Errorf("get lock request index failed, err: %w", err)
}

if len(indexKv.Kvs) == 0 {
return 0, fmt.Errorf("lock request index not found in etcd")
}

index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64)
if err != nil {
return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
}

return index, nil
}

func (svc *Service) testLockRequestAndMakeData(latestIndex int64, req LockRequest) (lockRequestData, error) {
svc.providersLock.Lock()
defer svc.providersLock.Unlock()

// 等待本地状态同步到最新
if svc.localLockReqIndex < latestIndex {
ch := make(chan any, 1)
svc.waitLocalLockReqIndex = latestIndex
svc.waitLocalLockReqIndexChan = ch

svc.providersLock.Unlock()

// TODO 超时
<-ch

// 等待完全同步完成,那么再次加锁,防止本地状态被更改。
// 设计上来说,锁定了etcd中的全局锁之后,不可能再有更改的事件发生,因此只要本地状态同步到了最新,
// watch协程就不会再收到事件,然后更改本地状态,但跨协程修改本地状态存在内存可见性问题,所以还是需要加锁来同步一下
svc.providersLock.Lock()
}

// 判断锁能否锁成功,并生成锁数据的字符串表示
reqData := lockRequestData{}

for _, lock := range req.Locks {
n, ok := svc.provdersTrie.WalkEnd(lock.Path)
if !ok || n.Value == nil {
return lockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path)
}

err := n.Value.CanLock(lock)
if err != nil {
return lockRequestData{}, err
}

targetStr, err := n.Value.GetTargetString(lock.Target)
if err != nil {
return lockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err)
}

reqData.Locks = append(reqData.Locks, lockData{
Path: lock.Path,
Name: lock.Name,
Target: targetStr,
})
}

return reqData, nil
return svc.main.Acquire(req)
}

// Renew 续约锁
func (svc *Service) Renew(lockReqID string) error {
func (svc *Service) Renew(reqID string) error {
panic("todo")

}

// Release 释放锁
func (svc *Service) Release(lockReqID string) error {
panic("todo")

func (svc *Service) Release(reqID string) error {
return svc.main.Release(reqID)
}

func (svc *Service) Serve() error {
return svc.watchRequestData()
}

func (svc *Service) lockEtcdRequestData() (unlock func(), err error) {
lease, err := svc.etcdCli.Grant(context.Background(), svc.cfg.LockRequestDataConfig.LeaseTimeSec)
if err != nil {
return nil, fmt.Errorf("grant lease failed, err: %w", err)
}

session, err := concurrency.NewSession(svc.etcdCli, concurrency.WithLease(lease.ID))
if err != nil {
return nil, fmt.Errorf("new session failed, err: %w", err)
}
defer session.Close()

mutex := concurrency.NewMutex(session, LOCK_REQUEST_LOCK_NAME)

timeout, cancelFunc := context.WithTimeout(context.Background(),
time.Duration(svc.cfg.LockRequestDataConfig.AcquireTimeoutMs)*time.Millisecond)
defer cancelFunc()

err = mutex.Lock(timeout)
if err != nil {
return nil, fmt.Errorf("acquire lock failed, err: %w", err)
}

return func() {
mutex.Unlock(context.Background())
session.Close()
}, nil
}

func (svc *Service) watchRequestData() error {
// TODO 考虑增加状态字段,调用API时根据状态字段来判断能不能调用成功
err := svc.loadInitData()
if err != nil {
return fmt.Errorf("load init data failed, err: %w", err)
}

dataWatchChan := svc.etcdCli.Watch(context.Background(), LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix())

for {
select {
case msg := <-dataWatchChan:
if msg.Canceled {
return fmt.Errorf("watch canceled, err: %w", msg.Err())
}

err := svc.applyEvents(msg)
if err != nil {
return err
}
}
}
}

func (svc *Service) loadInitData() error {
index, locks, err := svc.getInitDataFromEtcd()
if err != nil {
return fmt.Errorf("get init data from etcd failed, err: %w", err)
}

err = svc.resetLocalLockRequestData(index, locks)
if err != nil {
return fmt.Errorf("reset local lock request data failed, err: %w", err)
}

return nil
}

func (svc *Service) getInitDataFromEtcd() ([]*mvccpb.KeyValue, []*mvccpb.KeyValue, error) {
unlock, err := svc.lockEtcdRequestData()
if err != nil {
return nil, nil, fmt.Errorf("try lock request data failed, err: %w", err)
}
defer unlock()

index, err := svc.etcdCli.Get(context.Background(), LOCK_REQUEST_INDEX)
if err != nil {
return nil, nil, fmt.Errorf("get lock request index failed, err: %w", err)
}

data, err := svc.etcdCli.Get(context.Background(), LOCK_REQUEST_DATA_PREFIX, clientv3.WithPrefix())
if err != nil {
return nil, nil, fmt.Errorf("get lock request data failed, err: %w", err)
}

return index.Kvs, data.Kvs, nil
}

func (svc *Service) resetLocalLockRequestData(index []*mvccpb.KeyValue, locks []*mvccpb.KeyValue) error {
svc.providersLock.Lock()
defer svc.providersLock.Unlock()

// 先清空所有的锁数据
for _, p := range svc.allProviders {
p.Clear()
}

// 然后再导入全量数据
for _, kv := range locks {
err := svc.lockLockRequest(kv)
if err != nil {
return err
}
}

// 更新本地index
if len(index) == 0 {
svc.localLockReqIndex = 0

} else {
val, err := strconv.ParseInt(string(index[0].Value), 0, 64)
if err != nil {
return fmt.Errorf("parse lock request index failed, err: %w", err)
}

svc.localLockReqIndex = val
}

// 检查是否有等待同步进度的需求
if svc.waitLocalLockReqIndexChan != nil && svc.waitLocalLockReqIndex <= svc.localLockReqIndex {
close(svc.waitLocalLockReqIndexChan)
svc.waitLocalLockReqIndexChan = nil
}

return nil
}

func (svc *Service) applyEvents(watchResp clientv3.WatchResponse) error {
handledCnt := 0

svc.providersLock.Lock()
defer svc.providersLock.Unlock()

for _, e := range watchResp.Events {
var err error

// 只监听新建和删除的事件,因为在设计上约定只有这两种事件才会影响Index
if e.Type == clientv3.EventTypeDelete {
err = svc.unlockLockRequest(e.Kv)
handledCnt++

} else if e.IsCreate() {
err = svc.lockLockRequest(e.Kv)
handledCnt++
}

if err != nil {
return fmt.Errorf("apply event failed, err: %w", err)
}
}

// 处理了多少事件,Index就往后移动多少个
svc.localLockReqIndex += int64(handledCnt)

// 检查是否有等待同步进度的需求
if svc.waitLocalLockReqIndexChan != nil && svc.waitLocalLockReqIndex <= svc.localLockReqIndex {
close(svc.waitLocalLockReqIndexChan)
svc.waitLocalLockReqIndexChan = nil
}

return nil
}

func (svc *Service) lockLockRequest(kv *mvccpb.KeyValue) error {
reqID := getLockRequestID(string(kv.Key))

var req lockRequestData
err := serder.JSONToObject(kv.Value, &req)
if err != nil {
return fmt.Errorf("parse lock request data")
}

for _, lockData := range req.Locks {
node, ok := svc.provdersTrie.WalkEnd(lockData.Path)
if !ok || node.Value == nil {
return fmt.Errorf("lock provider not found for path %v", lockData.Path)
}

target, err := node.Value.ParseTargetString(lockData.Target)
if err != nil {
return fmt.Errorf("parse target data failed, err: %w", err)
}

err = node.Value.Lock(reqID, Lock{
Path: lockData.Path,
Name: lockData.Name,
Target: target,
})
if err != nil {
return fmt.Errorf("locking failed, err: %w", err)
}
}
return nil
}

func (svc *Service) unlockLockRequest(kv *mvccpb.KeyValue) error {
reqID := getLockRequestID(string(kv.Key))

var req lockRequestData
err := serder.JSONToObject(kv.Value, &req)
if err != nil {
return fmt.Errorf("parse lock request data")
}

for _, lockData := range req.Locks {
node, ok := svc.provdersTrie.WalkEnd(lockData.Path)
if !ok || node.Value == nil {
return fmt.Errorf("lock provider not found for path %v", lockData.Path)
}

target, err := node.Value.ParseTargetString(lockData.Target)
if err != nil {
return fmt.Errorf("parse target data failed, err: %w", err)
}

err = node.Value.Unlock(reqID, Lock{
Path: lockData.Path,
Name: lockData.Name,
Target: target,
})
if err != nil {
return fmt.Errorf("unlocking failed, err: %w", err)
}
}
return nil
go func() {
// TODO 处理错误
svc.providers.Serve()
}()

go func() {
// TODO 处理错误
svc.watchEtcd.Serve()
}()

// 考虑更好的错误处理方式
return svc.main.Serve()
}

+ 122
- 0
pkg/distlock/watch_etcd_actor.go View File

@@ -0,0 +1,122 @@
package distlock

import (
"context"
"fmt"

"gitlink.org.cn/cloudream/common/pkg/actor"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)

// watchEtcdActor watches the lock request data in etcd and forwards the
// changes to the providers actor. Its state is only touched by Serve and by
// the commands Serve executes.
type watchEtcdActor struct {
	// NOTE(review): etcdCli is not assigned by newWatchEtcdActor or Init in
	// the code shown here — confirm it is populated before StartWatching.
	etcdCli *clientv3.Client
	// watchChan is the active watch channel, nil while watching is stopped.
	watchChan clientv3.WatchChan
	// watchCancel cancels the context of the active watch, nil while stopped.
	watchCancel context.CancelFunc

	// commandChan carries the closures that Serve executes.
	commandChan *actor.CommandChannel

	// providers receives the parsed lock request changes.
	providers *providersActor
}

// newWatchEtcdActor creates a watchEtcdActor that owns a fresh command channel.
func newWatchEtcdActor() *watchEtcdActor {
	return &watchEtcdActor{
		commandChan: actor.NewCommandChannel(),
	}
}

// Init stores the providers actor that updates are forwarded to.
func (a *watchEtcdActor) Init(providers *providersActor) {
	a.providers = providers
}

// StartWatching opens a watch on every key below LOCK_REQUEST_DATA_PREFIX. A
// watch that is still active is cancelled first, so repeated calls do not
// accumulate watches.
func (a *watchEtcdActor) StartWatching() error {
	return actor.Wait(a.commandChan, func() error {
		a.cancelWatch()

		ctx, cancel := context.WithCancel(context.Background())
		a.watchCancel = cancel
		// WithPrevKV makes each event also carry the key-value pair from before
		// the change; for delete events that is where the removed value is.
		a.watchChan = a.etcdCli.Watch(ctx, LOCK_REQUEST_DATA_PREFIX,
			clientv3.WithPrefix(), clientv3.WithPrevKV())
		return nil
	})
}

// StopWatching cancels the active watch, if any, and clears the watch channel.
func (a *watchEtcdActor) StopWatching() error {
	return actor.Wait(a.commandChan, func() error {
		a.cancelWatch()
		return nil
	})
}

// cancelWatch releases the active watch by cancelling its context and resets
// the watch fields. It is a no-op when no watch is active.
func (a *watchEtcdActor) cancelWatch() {
	if a.watchCancel != nil {
		a.watchCancel()
		a.watchCancel = nil
	}
	a.watchChan = nil
}

// Serve runs the actor loop: it executes commands from the command channel
// and, while a watch is active, turns each watch response into update
// operations that are applied through the providers actor.
//
// It returns an error when the command channel is closed, when the watch
// channel is closed or reports cancellation, or when parsing or applying a
// watch response fails.
func (a *watchEtcdActor) Serve() error {
	for {
		// a.watchChan is nil while watching is stopped. Receiving from a nil
		// channel blocks forever, so in that state only commands are served.
		select {
		case cmd, ok := <-a.commandChan.ChanReceive():
			if !ok {
				return fmt.Errorf("command channel closed")
			}

			cmd()

		case msg, ok := <-a.watchChan:
			// A closed channel yields zero-value responses without blocking;
			// treat it like a cancelled watch instead of spinning on it.
			if !ok || msg.Canceled {
				// TODO: better error handling
				return fmt.Errorf("watch etcd channel closed")
			}

			ops, err := a.parseEvents(msg)
			if err != nil {
				// TODO: better error handling
				return fmt.Errorf("parse etcd lock request data failed, err: %w", err)
			}

			err = a.providers.BatchUpdateByLockRequestData(ops)
			if err != nil {
				// TODO: better error handling
				return fmt.Errorf("update local lock request data failed, err: %w", err)
			}
		}
	}
}

// parseEvents converts the events of a watch response into update operations.
//
// Only create and delete events are converted, because by design those are
// the only events that affect the index: a create becomes a lock operation, a
// delete becomes an unlock operation. All other events are skipped.
//
// For a delete event the request data is decoded from the previous key-value
// pair when the event carries one, since the removed value is not available on
// the event's own key-value pair. It returns an error as soon as one event's
// data cannot be decoded.
func (a *watchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]lockRequestDataUpdateOp, error) {
	var ops []lockRequestDataUpdateOp

	for _, e := range watchResp.Events {
		var isLock bool
		kv := e.Kv

		switch {
		case e.Type == clientv3.EventTypeDelete:
			isLock = false
			if e.PrevKv != nil {
				kv = e.PrevKv
			}

		case e.IsCreate():
			isLock = true

		default:
			continue
		}

		var reqData lockRequestData
		if err := serder.JSONToObject(kv.Value, &reqData); err != nil {
			return nil, fmt.Errorf("parse lock request data failed, err: %w", err)
		}

		ops = append(ops, lockRequestDataUpdateOp{
			IsLock: isLock,
			Data:   reqData,
		})
	}

	return ops, nil
}

Loading…
Cancel
Save