
Split MainActor

pull/29/head
Sydonian 2 years ago
parent
commit
0eb03a6f2a
11 changed files with 472 additions and 520 deletions
  1. +2
    -5
      pkgs/distlock/config.go
  2. +233
    -0
      pkgs/distlock/service/internal/acquire_actor.go
  3. +4
    -12
      pkgs/distlock/service/internal/lease_actor.go
  4. +0
    -313
      pkgs/distlock/service/internal/main_actor.go
  5. +14
    -12
      pkgs/distlock/service/internal/providers_actor.go
  6. +144
    -0
      pkgs/distlock/service/internal/release_actor.go
  7. +0
    -128
      pkgs/distlock/service/internal/retry_actor.go
  8. +2
    -2
      pkgs/distlock/service/internal/watch_etcd_actor.go
  9. +2
    -2
      pkgs/distlock/service/mutex.go
  10. +69
    -46
      pkgs/distlock/service/service.go
  11. +2
    -0
      pkgs/future/set_value_future.go

+ 2
- 5
pkgs/distlock/config.go

@@ -5,9 +5,6 @@ type Config struct {
EtcdUsername string `json:"etcdUsername"`
EtcdPassword string `json:"etcdPassword"`

EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // Lease time of the global lock. The lock service keeps renewing the lock within this period, but if the service crashes, other services can re-acquire the lock once the lease expires.

// When writing lock request data to etcd, do not attach a lease. With this option enabled,
// the lock request data stays in etcd even if the requesting service crashes. For debugging only.
SubmitLockRequestWithoutLease bool `json:"submitLockRequestWithoutLease"`
EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // Lease time of the global lock. The lock service keeps renewing the lock within this period, but if the service crashes, other services can re-acquire the lock once the lease expires.
RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"` // If releasing a lock fails, retry after a random delay. Delay = random(0, RandomReleasingDelayMs) + minimum delay (1000ms)
}
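
For reference, a sketch of how this configuration might be populated from Go; the field names come from the struct above, while the concrete values are purely illustrative.

cfg := distlock.Config{
	EtcdUsername:           "cloudream", // illustrative credentials
	EtcdPassword:           "secret",
	EtcdLockLeaseTimeSec:   5,    // lease is renewed while this service lives; re-acquirable about 5s after a crash
	RandomReleasingDelayMs: 4000, // failed releases retry after random(0, 4000) + 1000 ms, i.e. within 1-5 s
}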

+ 233
- 0
pkgs/distlock/service/internal/acquire_actor.go

@@ -0,0 +1,233 @@
package internal

import (
"context"
"errors"
"fmt"
"strconv"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

var ErrAcquiringTimeout = errors.New("acquiring timeout")

const (
EtcdLockRequestData = "/distlock/lockRequest/data"
EtcdLockRequestIndex = "/distlock/lockRequest/index"
EtcdLockRequestLock = "/distlock/lockRequest/lock"
)

type lockData struct {
Path []string `json:"path"`
Name string `json:"name"`
Target string `json:"target"`
}
type LockRequestData struct {
ID string `json:"id"`
Locks []lockData `json:"locks"`
}

type acquireInfo struct {
Request distlock.LockRequest
Callback *future.SetValueFuture[string]
LastErr error
}

type AcquireActor struct {
cfg *distlock.Config
etcdCli *clientv3.Client
providersActor *ProvidersActor

acquirings []*acquireInfo
lock sync.Mutex
}

func NewAcquireActor(cfg *distlock.Config, etcdCli *clientv3.Client) *AcquireActor {
return &AcquireActor{
cfg: cfg,
etcdCli: etcdCli,
}
}

func (a *AcquireActor) Init(providersActor *ProvidersActor) {
a.providersActor = providersActor
}

// Acquire requests a batch of locks. Returns the lock request ID on success
func (a *AcquireActor) Acquire(ctx context.Context, req distlock.LockRequest) (string, error) {
info := &acquireInfo{
Request: req,
Callback: future.NewSetValue[string](),
}

func() {
a.lock.Lock()
defer a.lock.Unlock()

a.acquirings = append(a.acquirings, info)
// TODO Handle the error
err := a.doAcquiring()
if err != nil {
logger.Std.Debugf("doing acquiring: %s", err.Error())
}
}()

go func() {
info.Callback.Wait(ctx)

a.lock.Lock()
defer a.lock.Unlock()

// The lock is held whenever Callback is set, so the IsComplete check here is a reliable basis for the operations that follow
if info.Callback.IsComplete() {
return
}

a.acquirings = mylo.Remove(a.acquirings, info)
if info.LastErr != nil {
info.Callback.SetError(info.LastErr)
} else {
info.Callback.SetError(ErrAcquiringTimeout)
}
}()

// Do not wait on Callback with ctx directly here: a Wait timeout does not mean the lock was not acquired, and that would leak the lock.
return info.Callback.WaitValue(context.Background())
}

// TryAcquireNow retries the pending lock requests that have not yet succeeded. It does not block the caller
func (a *AcquireActor) TryAcquireNow() {
go func() {
a.lock.Lock()
defer a.lock.Unlock()

err := a.doAcquiring()
if err != nil {
logger.Std.Debugf("doing acquiring: %s", err.Error())
}
}()
}

func (a *AcquireActor) doAcquiring() error {
ctx := context.Background()

if len(a.acquirings) == 0 {
return nil
}

// TODO Set a distinct error type for each kind of error so that upper layers can handle them accordingly
unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec)
if err != nil {
return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
}
defer unlock()

index, err := getEtcdLockRequestIndex(ctx, a.etcdCli)
if err != nil {
return err
}

// Wait for the local state to catch up to the latest
// TODO Make the wait time configurable
err = a.providersActor.WaitIndexUpdated(ctx, index)
if err != nil {
return err
}

// TODO Consider acquiring multiple locks at once
for i := 0; i < len(a.acquirings); i++ {
// Test the locks and build the lock data
reqData, err := a.providersActor.TestLockRequestAndMakeData(a.acquirings[i].Request)
if err != nil {
a.acquirings[i].LastErr = err
continue
}

nextIndexStr := strconv.FormatInt(index+1, 10)
reqData.ID = nextIndexStr

// Locking succeeded; submit the lock data
err = a.submitLockRequest(ctx, nextIndexStr, reqData)
if err != nil {
a.acquirings[i].LastErr = err
continue
}

a.acquirings[i].Callback.SetValue(reqData.ID)
a.acquirings = mylo.RemoveAt(a.acquirings, i)
break
}

return nil
}

func (a *AcquireActor) submitLockRequest(ctx context.Context, index string, reqData LockRequestData) error {
reqBytes, err := serder.ObjectToJSON(reqData)
if err != nil {
return fmt.Errorf("serialize lock request data failed, err: %w", err)
}

etcdOps := []clientv3.Op{
clientv3.OpPut(EtcdLockRequestIndex, index),
clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)),
}
txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit()
if err != nil {
return fmt.Errorf("submit lock request data failed, err: %w", err)
}
if !txResp.Succeeded {
return fmt.Errorf("submit lock request data failed for lock request data index changed")
}

return nil
}

func acquireEtcdRequestDataLock(ctx context.Context, etcdCli *clientv3.Client, etcdLockLeaseTimeSec int64) (unlock func(), err error) {
lease, err := etcdCli.Grant(context.Background(), etcdLockLeaseTimeSec)
if err != nil {
return nil, fmt.Errorf("grant lease failed, err: %w", err)
}

session, err := concurrency.NewSession(etcdCli, concurrency.WithLease(lease.ID))
if err != nil {
return nil, fmt.Errorf("new session failed, err: %w", err)
}

mutex := concurrency.NewMutex(session, EtcdLockRequestLock)

err = mutex.Lock(ctx)
if err != nil {
session.Close()
return nil, fmt.Errorf("acquire lock failed, err: %w", err)
}

return func() {
mutex.Unlock(context.Background())
session.Close()
}, nil
}

func getEtcdLockRequestIndex(ctx context.Context, etcdCli *clientv3.Client) (int64, error) {
indexKv, err := etcdCli.Get(ctx, EtcdLockRequestIndex)
if err != nil {
return 0, fmt.Errorf("get lock request index failed, err: %w", err)
}

if len(indexKv.Kvs) == 0 {
return 0, nil
}

index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64)
if err != nil {
return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
}

return index, nil
}
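
A minimal usage sketch of the new actor, mirroring how service.go below wires it up; it assumes an already-connected etcd client, an initialized ProvidersActor, and a distlock.LockRequest `req` built by the caller.

acquireActor := internal.NewAcquireActor(cfg, etcdCli)
acquireActor.Init(providersActor)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Blocks until the request is granted, ctx expires, or the last recorded error is returned.
reqID, err := acquireActor.Acquire(ctx, req)
if errors.Is(err, internal.ErrAcquiringTimeout) {
	// Nothing was granted within ctx and the request has been removed from the pending queue.
}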

+ 4
- 12
pkgs/distlock/service/internal/lease_actor.go

@@ -21,7 +21,7 @@ type LeaseActor struct {

commandChan *actor.CommandChannel

mainActor *MainActor
releaseActor *ReleaseActor
}

func NewLeaseActor() *LeaseActor {
@@ -31,8 +31,8 @@ func NewLeaseActor() *LeaseActor {
}
}

func (a *LeaseActor) Init(mainActor *MainActor) {
a.mainActor = mainActor
func (a *LeaseActor) Init(releaseActor *ReleaseActor) {
a.releaseActor = releaseActor
}

func (a *LeaseActor) StartChecking() error {
@@ -113,15 +113,7 @@ func (a *LeaseActor) Serve() error {
// TODO Consider logging this
logger.Std.Infof("lock request %s is timeout, will release it", reqID)

// TODO Consider making the timeout configurable
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
err := a.mainActor.Release(ctx, reqID)
cancel()
if err == nil {
delete(a.leases, reqID)
} else {
logger.Std.Warnf("releasing lock request: %s", err.Error())
}
a.releaseActor.DelayRelease([]string{reqID})
}
}



+ 0
- 313
pkgs/distlock/service/internal/main_actor.go

@@ -1,313 +0,0 @@
package internal

import (
"context"
"fmt"
"strconv"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
"go.etcd.io/etcd/client/v3/concurrency"
)

const (
EtcdLockRequestData = "/distlock/lockRequest/data"
EtcdLockRequestIndex = "/distlock/lockRequest/index"
EtcdLockRequestLock = "/distlock/lockRequest/lock"
)

type lockData struct {
Path []string `json:"path"`
Name string `json:"name"`
Target string `json:"target"`
}

type acquireManyResult struct {
IsTried bool
RequestID string
Err error
}

type LockRequestData struct {
ID string `json:"id"`
Locks []lockData `json:"locks"`
}

type MainActor struct {
cfg *distlock.Config
etcdCli *clientv3.Client

commandChan *actor.CommandChannel

providersActor *ProvidersActor

lockRequestLeaseID clientv3.LeaseID
}

func NewMainActor(cfg *distlock.Config, etcdCli *clientv3.Client) *MainActor {
return &MainActor{
cfg: cfg,
etcdCli: etcdCli,
commandChan: actor.NewCommandChannel(),
}
}

func (a *MainActor) Init(providersActor *ProvidersActor) {
a.providersActor = providersActor
}

// Acquire requests a batch of locks. Returns the lock request ID on success
func (a *MainActor) Acquire(ctx context.Context, req distlock.LockRequest) (reqID string, err error) {
rets, err := a.AcquireMany(ctx, []distlock.LockRequest{req})
if err != nil {
return "", err
}

if rets[0].Err != nil {
return "", rets[0].Err
}

return rets[0].RequestID, nil
}

// AcquireMany tries multiple lock requests. The current implementation returns as soon as the first one is acquired successfully
func (a *MainActor) AcquireMany(ctx context.Context, reqs []distlock.LockRequest) (rets []acquireManyResult, err error) {
return actor.WaitValue(context.TODO(), a.commandChan, func() ([]acquireManyResult, error) {
// TODO Set a distinct error type for each kind of error so that upper layers can handle them accordingly
unlock, err := a.acquireEtcdRequestDataLock(ctx)
if err != nil {
return nil, fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
}
defer unlock()

index, err := a.getEtcdLockRequestIndex(ctx)
if err != nil {
return nil, err
}

// Wait for the local state to catch up to the latest
// TODO Make the wait time configurable
err = a.providersActor.WaitIndexUpdated(ctx, index)
if err != nil {
return nil, err
}

rets := make([]acquireManyResult, len(reqs))
for i := 0; i < len(reqs); i++ {
// Test the locks and build the lock data
reqData, err := a.providersActor.TestLockRequestAndMakeData(reqs[i])
if err == nil {
nextIndexStr := strconv.FormatInt(index+1, 10)
reqData.ID = nextIndexStr

// Locking succeeded; submit the lock data
err := a.submitLockRequest(ctx, reqData)

rets[i] = acquireManyResult{
IsTried: true,
RequestID: nextIndexStr,
Err: err,
}

break

} else {
rets[i] = acquireManyResult{
IsTried: true,
Err: err,
}
}
}

return rets, nil
})
}

func (a *MainActor) submitLockRequest(ctx context.Context, reqData LockRequestData) error {
reqBytes, err := serder.ObjectToJSON(reqData)
if err != nil {
return fmt.Errorf("serialize lock request data failed, err: %w", err)
}

var etcdOps []clientv3.Op
if a.cfg.SubmitLockRequestWithoutLease {
etcdOps = []clientv3.Op{
clientv3.OpPut(EtcdLockRequestIndex, reqData.ID),
clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)),
}

} else {
etcdOps = []clientv3.Op{
clientv3.OpPut(EtcdLockRequestIndex, reqData.ID),
// Attach to the lease of the current connection so that the lock is released automatically once the connection drops
// TODO The lease cannot be put on the RequestData directly: if another service has already acquired the lock
// and the current service crashes, the delete event would be sent out immediately, breaking the lock contract (other services must not modify the data while it is locked)
clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), //, clientv3.WithLease(a.lockRequestLeaseID)),
}
}
txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit()
if err != nil {
return fmt.Errorf("submit lock request data failed, err: %w", err)
}
if !txResp.Succeeded {
return fmt.Errorf("submit lock request data failed for lock request data index changed")
}

return nil
}

// Release releases the lock
func (a *MainActor) Release(ctx context.Context, reqID string) error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
// TODO Set a distinct error type for each kind of error so that upper layers can handle them accordingly
unlock, err := a.acquireEtcdRequestDataLock(ctx)
if err != nil {
return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
}
defer unlock()

index, err := a.getEtcdLockRequestIndex(ctx)
if err != nil {
return err
}

lockReqKey := makeEtcdLockRequestKey(reqID)

txResp, err := a.etcdCli.Txn(ctx).
If(clientv3util.KeyExists(lockReqKey)).
Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit()
if err != nil {
return fmt.Errorf("updating lock request data index: %w", err)
}
if !txResp.Succeeded {
return fmt.Errorf("updating lock request data failed")
}

return nil
})
}

func (a *MainActor) acquireEtcdRequestDataLock(ctx context.Context) (unlock func(), err error) {
lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec)
if err != nil {
return nil, fmt.Errorf("grant lease failed, err: %w", err)
}

session, err := concurrency.NewSession(a.etcdCli, concurrency.WithLease(lease.ID))
if err != nil {
return nil, fmt.Errorf("new session failed, err: %w", err)
}

mutex := concurrency.NewMutex(session, EtcdLockRequestLock)

err = mutex.Lock(ctx)
if err != nil {
session.Close()
return nil, fmt.Errorf("acquire lock failed, err: %w", err)
}

return func() {
mutex.Unlock(context.Background())
session.Close()
}, nil
}

func (a *MainActor) getEtcdLockRequestIndex(ctx context.Context) (int64, error) {
indexKv, err := a.etcdCli.Get(ctx, EtcdLockRequestIndex)
if err != nil {
return 0, fmt.Errorf("get lock request index failed, err: %w", err)
}

if len(indexKv.Kvs) == 0 {
return 0, nil
}

index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 0, 64)
if err != nil {
return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
}

return index, nil
}

func (a *MainActor) ReloadEtcdData() error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
// Fetch the index and the lock data in one transaction so that no global lock is needed
txResp, err := a.etcdCli.Txn(context.Background()).
Then(
clientv3.OpGet(EtcdLockRequestIndex),
clientv3.OpGet(EtcdLockRequestData, clientv3.WithPrefix()),
).
Commit()
if err != nil {
return fmt.Errorf("get etcd data failed, err: %w", err)
}
if !txResp.Succeeded {
return fmt.Errorf("get etcd data failed")
}

indexKvs := txResp.Responses[0].GetResponseRange().Kvs
lockKvs := txResp.Responses[1].GetResponseRange().Kvs

var index int64
var reqData []LockRequestData

// Parse the index
if len(indexKvs) > 0 {
val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64)
if err != nil {
return fmt.Errorf("parse lock request index failed, err: %w", err)
}
index = val

} else {
index = 0
}

// Parse the lock request data
for _, kv := range lockKvs {
var req LockRequestData
err := serder.JSONToObject(kv.Value, &req)
if err != nil {
return fmt.Errorf("parse lock request data failed, err: %w", err)
}

reqData = append(reqData, req)
}

err = a.providersActor.ResetState(index, reqData)
if err != nil {
return fmt.Errorf("reset lock providers state failed, err: %w", err)
}

return nil
})
}

func (a *MainActor) Serve() error {
lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec)
if err != nil {
return fmt.Errorf("grant lease failed, err: %w", err)
}
a.lockRequestLeaseID = lease.ID

cmdChan := a.commandChan.BeginChanReceive()
defer a.commandChan.CloseChanReceive()

for {
select {
case cmd, ok := <-cmdChan:
if !ok {
return fmt.Errorf("command channel closed")
}

// TODO If Acquire is the first call after the actor starts, it waits inside Acquire for the local lock data to catch up to the latest.
// Command execution is then blocked as well, so the ReloadEtcdData command can never run, causing a deadlock that ends with the Acquire timing out.
// For now, run each command in its own goroutine to avoid the blocking.
go cmd()
}
}
}

+ 14
- 12
pkgs/distlock/service/internal/providers_actor.go

@@ -7,6 +7,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/trie"
)

@@ -56,30 +57,31 @@ func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) erro
return fut.Wait(ctx)
}

func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) {
a.commandChan.Send(func() {
for _, op := range events {
if op.IsLocking {
err := a.lockLockRequest(op.Data)
if err != nil {
return fmt.Errorf("lock by lock request data failed, err: %w", err)
// TODO An error here requires reloading the full state; same below
logger.Std.Warnf("applying locking event: %s", err.Error())
return
}

} else {
err := a.unlockLockRequest(op.Data)
if err != nil {
return fmt.Errorf("unlock by lock request data failed, err: %w", err)
logger.Std.Warnf("applying unlocking event: %s", err.Error())
return
}
}
}

// The index advances by one for each event processed
a.localLockReqIndex += int64(len(events))
// The index advances by one for each event processed
a.localLockReqIndex++
}

// Check whether anyone is waiting for the sync progress
a.checkIndexWaiter()

return nil
a.wakeUpIndexWaiter()
})
}

@@ -181,13 +183,13 @@ func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestDa
a.localLockReqIndex = index

// Check whether anyone is waiting for the sync progress
a.checkIndexWaiter()
a.wakeUpIndexWaiter()

return nil
})
}

func (a *ProvidersActor) checkIndexWaiter() {
func (a *ProvidersActor) wakeUpIndexWaiter() {
var resetWaiters []indexWaiter
for _, waiter := range a.indexWaiters {
if waiter.Index <= a.localLockReqIndex {


+ 144
- 0
pkgs/distlock/service/internal/release_actor.go

@@ -0,0 +1,144 @@
package internal

import (
"context"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
)

const (
DefaultMaxReleaseingDelayMs = 4000
BaseReleaseingDelayMs = 1000
)

type ReleaseActor struct {
cfg *distlock.Config
etcdCli *clientv3.Client

releasingLockRequestIDs map[string]bool
timer *time.Timer
timerSetuped bool
lock sync.Mutex
}

func NewReleaseActor(cfg *distlock.Config, etcdCli *clientv3.Client) *ReleaseActor {
return &ReleaseActor{
cfg: cfg,
etcdCli: etcdCli,
releasingLockRequestIDs: make(map[string]bool),
}
}

// Release tries to release these locks immediately. Typically used right after a user actively releases a lock
func (a *ReleaseActor) Release(reqIDs []string) {
a.lock.Lock()
defer a.lock.Unlock()

for _, id := range reqIDs {
a.releasingLockRequestIDs[id] = true
}

// TODO Handle the error
err := a.doReleasing()
if err != nil {
logger.Std.Debugf("doing releasing: %s", err.Error())
}

a.setupTimer()
}

// DelayRelease releases these locks after a delay. Typically used to clean up locks left behind by a crashed lock service
func (a *ReleaseActor) DelayRelease(reqIDs []string) {
a.lock.Lock()
defer a.lock.Unlock()

for _, id := range reqIDs {
a.releasingLockRequestIDs[id] = true
}

a.setupTimer()
}

func (a *ReleaseActor) doReleasing() error {
ctx := context.TODO()

if len(a.releasingLockRequestIDs) == 0 {
return nil
}

// TODO Set a distinct error type for each kind of error so that upper layers can handle them accordingly
unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec)
if err != nil {
return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
}
defer unlock()

index, err := getEtcdLockRequestIndex(ctx, a.etcdCli)
if err != nil {
return err
}

// TODO Consider optimizing this to delete multiple locks in one go
for id := range a.releasingLockRequestIDs {
lockReqKey := makeEtcdLockRequestKey(id)

txResp, err := a.etcdCli.Txn(ctx).
If(clientv3util.KeyExists(lockReqKey)).
Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit()
if err != nil {
return fmt.Errorf("updating lock request data: %w", err)
}
// Only advance the index if the lock data was actually deleted
if txResp.Succeeded {
index++
}
delete(a.releasingLockRequestIDs, id)
}

return nil
}

func (a *ReleaseActor) setupTimer() {
if len(a.releasingLockRequestIDs) == 0 {
return
}

if a.timerSetuped {
return
}
a.timerSetuped = true

delay := int64(0)
if a.cfg.RandomReleasingDelayMs == 0 {
delay = rand.Int63n(DefaultMaxReleaseingDelayMs)
} else {
delay = rand.Int63n(a.cfg.RandomReleasingDelayMs)
}

if a.timer == nil {
a.timer = time.NewTimer(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond)
} else {
a.timer.Reset(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond)
}

go func() {
<-a.timer.C
a.timerSetuped = false

// TODO Handle the error
err := a.doReleasing()
if err != nil {
logger.Std.Debugf("doing releasing: %s", err.Error())
}

a.setupTimer()
}()
}
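
The two entry points serve different callers elsewhere in this commit; roughly:

// User-initiated unlock (Service.Release): try immediately, then fall back to the delayed retry timer on failure.
releaseActor.Release([]string{reqID})

// Lease expiry / crash cleanup (LeaseActor.Serve): only schedule the timer,
// which fires after random(0, RandomReleasingDelayMs or 4000) + 1000 ms.
releaseActor.DelayRelease([]string{expiredReqID})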

+ 0
- 128
pkgs/distlock/service/internal/retry_actor.go

@@ -1,128 +0,0 @@
package internal

import (
"context"
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
)

type retryInfo struct {
Callback *future.SetValueFuture[string]
LastErr error
}

type RetryActor struct {
retrys []distlock.LockRequest
retryInfos []*retryInfo

commandChan *actor.CommandChannel

mainActor *MainActor
}

func NewRetryActor() *RetryActor {
return &RetryActor{
commandChan: actor.NewCommandChannel(),
}
}

func (a *RetryActor) Init(mainActor *MainActor) {
a.mainActor = mainActor
}

func (a *RetryActor) Retry(ctx context.Context, req distlock.LockRequest, lastErr error) (future.ValueFuture[string], error) {
fut := future.NewSetValue[string]()

var info *retryInfo
err := actor.Wait(ctx, a.commandChan, func() error {
a.retrys = append(a.retrys, req)
info = &retryInfo{
Callback: fut,
LastErr: lastErr,
}
a.retryInfos = append(a.retryInfos, info)
return nil
})
if err != nil {
return nil, err
}

go func() {
<-ctx.Done()
a.commandChan.Send(func() {
// Since the future's state is only ever modified inside a cmd, the IsComplete check here is a reliable basis for the operations that follow
if fut.IsComplete() {
return
}

index := lo.IndexOf(a.retryInfos, info)
if index == -1 {
return
}

a.retryInfos[index].Callback.SetError(a.retryInfos[index].LastErr)

a.retrys = mylo.RemoveAt(a.retrys, index)
a.retryInfos = mylo.RemoveAt(a.retryInfos, index)
})
}()

return fut, nil
}

func (a *RetryActor) OnLocalStateUpdated() {
a.commandChan.Send(func() {
if len(a.retrys) == 0 {
return
}

rets, err := a.mainActor.AcquireMany(context.Background(), a.retrys)
if err != nil {
// TODO Handle the error
logger.Std.Warnf("acquire many lock requests failed, err: %s", err.Error())
return
}

// Update the state according to the results of the attempts
delCnt := 0
for i, ret := range rets {
a.retrys[i-delCnt] = a.retrys[i]
a.retryInfos[i-delCnt] = a.retryInfos[i]

if !ret.IsTried {
continue
}

if ret.Err != nil {
a.retryInfos[i].LastErr = ret.Err
} else {
a.retryInfos[i].Callback.SetValue(ret.RequestID)
delCnt++
}
}
a.retrys = a.retrys[:len(a.retrys)-delCnt]
a.retryInfos = a.retryInfos[:len(a.retryInfos)-delCnt]
})
}

func (a *RetryActor) Serve() error {
cmdChan := a.commandChan.BeginChanReceive()
defer a.commandChan.CloseChanReceive()

for {
select {
case cmd, ok := <-cmdChan:
if !ok {
return fmt.Errorf("command channel closed")
}

cmd()
}
}
}

+ 2
- 2
pkgs/distlock/service/internal/watch_etcd_actor.go

@@ -37,9 +37,9 @@ func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor {
func (a *WatchEtcdActor) Init() {
}

func (a *WatchEtcdActor) StartWatching() error {
func (a *WatchEtcdActor) StartWatching(revision int64) error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV())
a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
return nil
})
}


+ 2
- 2
pkgs/distlock/service/mutex.go

@@ -25,6 +25,6 @@ func (m *Mutex) Lock() error {
return nil
}

func (m *Mutex) Unlock() error {
return m.svc.Release(m.lockReqID)
func (m *Mutex) Unlock() {
m.svc.Release(m.lockReqID)
}
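
With Unlock no longer returning an error, a typical caller reduces to the sketch below; how the Mutex is obtained is not part of this diff and is left abstract.

var mutex *Mutex // obtained from the service; its constructor is not shown in this commit
if err := mutex.Lock(); err != nil {
	return fmt.Errorf("locking: %w", err)
}
// The release is handed to the ReleaseActor, which retries failed releases after a random delay,
// so there is no error left for the caller to handle.
defer mutex.Unlock()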

+ 69
- 46
pkgs/distlock/service/service.go

@@ -3,11 +3,13 @@ package service
import (
"context"
"fmt"
"strconv"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/distlock/service/internal"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)

@@ -46,11 +48,11 @@ type Service struct {
cfg *distlock.Config
etcdCli *clientv3.Client

mainActor *internal.MainActor
acquireActor *internal.AcquireActor
releaseActor *internal.ReleaseActor
providersActor *internal.ProvidersActor
watchEtcdActor *internal.WatchEtcdActor
leaseActor *internal.LeaseActor
retryActor *internal.RetryActor

lockReqEventWatcher internal.LockRequestEventWatcher
}
@@ -66,17 +68,16 @@ func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error
return nil, fmt.Errorf("new etcd client failed, err: %w", err)
}

mainActor := internal.NewMainActor(cfg, etcdCli)
acquireActor := internal.NewAcquireActor(cfg, etcdCli)
releaseActor := internal.NewReleaseActor(cfg, etcdCli)
providersActor := internal.NewProvidersActor()
watchEtcdActor := internal.NewWatchEtcdActor(etcdCli)
leaseActor := internal.NewLeaseActor()
retryActor := internal.NewRetryActor()

mainActor.Init(providersActor)
acquireActor.Init(providersActor)
providersActor.Init()
watchEtcdActor.Init()
leaseActor.Init(mainActor)
retryActor.Init(mainActor)
leaseActor.Init(releaseActor)

for _, prov := range initProvs {
providersActor.AddProvider(prov.Provider, prov.Path...)
@@ -85,11 +86,11 @@ func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error
return &Service{
cfg: cfg,
etcdCli: etcdCli,
mainActor: mainActor,
acquireActor: acquireActor,
releaseActor: releaseActor,
providersActor: providersActor,
watchEtcdActor: watchEtcdActor,
leaseActor: leaseActor,
retryActor: retryActor,
}, nil
}

@@ -109,18 +110,9 @@ func (svc *Service) Acquire(req distlock.LockRequest, opts ...AcquireOptionFn) (
defer cancel()
}

reqID, err := svc.mainActor.Acquire(ctx, req)
reqID, err := svc.acquireActor.Acquire(ctx, req)
if err != nil {
fut, err := svc.retryActor.Retry(ctx, req, err)
if err != nil {
return "", fmt.Errorf("retrying failed, err: %w", err)
}

// If Retry times out, it sets fut to failed internally, so waiting indefinitely with Background here is safe
reqID, err = fut.WaitValue(context.Background())
if err != nil {
return "", err
}
return "", err
}

if opt.Lease > 0 {
@@ -140,16 +132,14 @@ func (svc *Service) Renew(reqID string) error {
}

// Release releases the lock
func (svc *Service) Release(reqID string) error {
err := svc.mainActor.Release(context.TODO(), reqID)
func (svc *Service) Release(reqID string) {
svc.releaseActor.Release([]string{reqID})

// TODO Does not affect the result, but consider logging it
err2 := svc.leaseActor.Remove(reqID)
if err2 != nil {
logger.Std.Warnf("removing lease: %s", err2.Error())
err := svc.leaseActor.Remove(reqID)
if err != nil {
logger.Std.Warnf("removing lease: %s", err.Error())
}

return err
}

func (svc *Service) Serve() error {
@@ -174,14 +164,6 @@ func (svc *Service) Serve() error {
}
}()

go func() {
// TODO Handle the error
err := svc.mainActor.Serve()
if err != nil {
logger.Std.Warnf("serving main actor failed, err: %s", err.Error())
}
}()

go func() {
// TODO Handle the error
err := svc.leaseActor.Serve()
@@ -190,23 +172,15 @@ func (svc *Service) Serve() error {
}
}()

go func() {
// TODO Handle the error
err := svc.retryActor.Serve()
if err != nil {
logger.Std.Warnf("serving retry actor failed, err: %s", err.Error())
}
}()

err := svc.mainActor.ReloadEtcdData()
revision, err := svc.loadState()
if err != nil {
// TODO Shut down the other actors, or find a better way to handle the error
return fmt.Errorf("init data failed, err: %w", err)
}

svc.lockReqEventWatcher.OnEvent = func(events []internal.LockRequestEvent) {
svc.acquireActor.TryAcquireNow()
svc.providersActor.ApplyLockRequestEvents(events)
svc.retryActor.OnLocalStateUpdated()
}
err = svc.watchEtcdActor.AddEventWatcher(&svc.lockReqEventWatcher)
if err != nil {
@@ -214,7 +188,7 @@ func (svc *Service) Serve() error {
return fmt.Errorf("add lock request event watcher failed, err: %w", err)
}

err = svc.watchEtcdActor.StartWatching()
err = svc.watchEtcdActor.StartWatching(revision)
if err != nil {
// TODO Shut down the other actors, or find a better way to handle the error
return fmt.Errorf("start watching etcd failed, err: %w", err)
@@ -232,3 +206,52 @@ func (svc *Service) Serve() error {

return nil
}

func (svc *Service) loadState() (int64, error) {
// Fetch the index and the lock data in one transaction so that no global lock is needed
txResp, err := svc.etcdCli.Txn(context.Background()).
Then(
clientv3.OpGet(internal.EtcdLockRequestIndex),
clientv3.OpGet(internal.EtcdLockRequestData, clientv3.WithPrefix()),
).
Commit()
if err != nil {
return 0, fmt.Errorf("get etcd data failed, err: %w", err)
}

indexKvs := txResp.Responses[0].GetResponseRange().Kvs
lockKvs := txResp.Responses[1].GetResponseRange().Kvs

var index int64
var reqData []internal.LockRequestData

// Parse the index
if len(indexKvs) > 0 {
val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64)
if err != nil {
return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
}
index = val

} else {
index = 0
}

// Parse the lock request data
for _, kv := range lockKvs {
var req internal.LockRequestData
err := serder.JSONToObject(kv.Value, &req)
if err != nil {
return 0, fmt.Errorf("parse lock request data failed, err: %w", err)
}

reqData = append(reqData, req)
}

err = svc.providersActor.ResetState(index, reqData)
if err != nil {
return 0, fmt.Errorf("reset lock providers state failed, err: %w", err)
}

return txResp.Header.Revision, nil
}
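
Putting the reworked wiring together, an end-to-end caller might look like the sketch below; building the PathProvider list and the LockRequest is outside this commit and assumed.

func run(cfg *distlock.Config, provs []service.PathProvider, req distlock.LockRequest) error {
	svc, err := service.NewService(cfg, provs)
	if err != nil {
		return err
	}
	// Serve loads the etcd snapshot via loadState, resets the providers' state,
	// then starts watching from the returned revision.
	go svc.Serve()

	reqID, err := svc.Acquire(req)
	if err != nil {
		return err
	}
	defer svc.Release(reqID) // fire-and-forget: failed releases are retried by the ReleaseActor

	// ... work under the lock ...
	return nil
}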

+ 2
- 0
pkgs/future/set_value_future.go

@@ -48,6 +48,8 @@ func (f *SetValueFuture[T]) IsComplete() bool {
return f.isCompleted
}

// Wait blocks until the future completes or ctx is cancelled.
// Note: a returned ErrContextCancelled does not mean the result-producing work never ran, nor even that the future has not completed.
func (f *SetValueFuture[T]) Wait(ctx context.Context) error {
select {
case <-f.completeChan:


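The note above matters for callers such as AcquireActor: Wait returning because ctx was cancelled does not prove the future never completed. A minimal sketch of the resulting pattern, using only methods that appear in this commit; the producer goroutine is hypothetical, and a real caller (as AcquireActor does) also needs its own lock to make the completion check race-free.

fut := future.NewSetValue[string]()

go func() {
	// hypothetical producer: eventually completes the future
	fut.SetValue("lock-request-id")
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

if err := fut.Wait(ctx); err != nil {
	// ctx expired, but the producer may already have completed (or may still complete).
	if !fut.IsComplete() {
		// Genuinely pending: decide whether to keep waiting or to fail it via SetError.
	}
}
// Waiting with a background context never returns early due to cancellation.
reqID, err := fut.WaitValue(context.Background())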