Browse Source

Merge pull request '修复锁服务遗留问题' (#29) from feature_gxh into master

pull/36/head
baohan 2 years ago
parent
commit
dcf9f74d8e
21 changed files with 1508 additions and 1293 deletions
  1. +0
    -13
      pkgs/distlock/config.go
  2. +8
    -30
      pkgs/distlock/distlock.go
  3. +282
    -0
      pkgs/distlock/internal/acquire_actor.go
  4. +11
    -0
      pkgs/distlock/internal/config.go
  5. +9
    -26
      pkgs/distlock/internal/lease_actor.go
  6. +77
    -0
      pkgs/distlock/internal/models.go
  7. +202
    -0
      pkgs/distlock/internal/providers_actor.go
  8. +223
    -0
      pkgs/distlock/internal/release_actor.go
  9. +196
    -0
      pkgs/distlock/internal/service_info_actor.go
  10. +190
    -0
      pkgs/distlock/internal/watch_etcd_actor.go
  11. +6
    -6
      pkgs/distlock/mutex.go
  12. +300
    -0
      pkgs/distlock/service.go
  13. +0
    -313
      pkgs/distlock/service/internal/main_actor.go
  14. +0
    -216
      pkgs/distlock/service/internal/providers_actor.go
  15. +0
    -128
      pkgs/distlock/service/internal/retry_actor.go
  16. +0
    -116
      pkgs/distlock/service/internal/utils.go
  17. +0
    -61
      pkgs/distlock/service/internal/utils_test.go
  18. +0
    -149
      pkgs/distlock/service/internal/watch_etcd_actor.go
  19. +0
    -234
      pkgs/distlock/service/service.go
  20. +2
    -0
      pkgs/future/set_value_future.go
  21. +2
    -1
      sdks/imfs/models.go

+ 0
- 13
pkgs/distlock/config.go View File

@@ -1,13 +0,0 @@
package distlock

type Config struct {
EtcdAddress string `json:"etcdAddress"`
EtcdUsername string `json:"etcdUsername"`
EtcdPassword string `json:"etcdPassword"`

EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"` // 全局锁的租约时间。锁服务会在这个时间内自动续约锁,但如果服务崩溃,则其他服务在租约到期后能重新获得锁。

// 写入锁请求数据到的ETCD的时候,不设置租约。开启此选项之后,请求锁的服务崩溃,
// 锁请求数据会依然留在ETCD中。仅供调试使用。
SubmitLockRequestWithoutLease bool `json:"submitLockRequestWithoutLease"`
}

+ 8
- 30
pkgs/distlock/distlock.go View File

@@ -1,40 +1,18 @@
package distlock

import "fmt"
import (
"fmt"

type Lock struct {
Path []string // 锁路径,存储的是路径的每一部分
Name string // 锁名
Target any // 锁对象,由具体的Provider去解析
}

type LockRequest struct {
Locks []Lock
}

func (b *LockRequest) Add(lock Lock) {
b.Locks = append(b.Locks, lock)
}

type LockProvider interface {
// CanLock 判断这个锁能否锁定成功
CanLock(lock Lock) error
"gitlink.org.cn/cloudream/common/pkgs/distlock/internal"
)

// Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。
Lock(reqID string, lock Lock) error
type Lock = internal.Lock

// 解锁
Unlock(reqID string, lock Lock) error
type LockRequest = internal.LockRequest

// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD
GetTargetString(target any) (string, error)
type LockProvider = internal.LockProvider

// ParseTargetString 解析字符串格式的锁对象数据
ParseTargetString(targetStr string) (any, error)

// Clear 清除内部所有状态
Clear()
}
type Config = internal.Config

type LockTargetBusyError struct {
lockName string


+ 282
- 0
pkgs/distlock/internal/acquire_actor.go View File

@@ -0,0 +1,282 @@
package internal

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

// ErrAcquiringTimeout is reported to a caller whose lock request was not
// satisfied before its context expired.
var ErrAcquiringTimeout = errors.New("acquiring timeout")

// acquireInfo tracks one pending Acquire call.
type acquireInfo struct {
	Request  LockRequest                    // the locks being requested
	Callback *future.SetValueFuture[string] // completed with the request ID on success
	LastErr  error                          // last failure while retrying; reported on timeout
}

// AcquireActor queues lock requests and tries to acquire them against etcd,
// serialized through the global request-data lock.
type AcquireActor struct {
	cfg            *Config
	etcdCli        *clientv3.Client
	providersActor *ProvidersActor

	isMaintenance   bool           // while true, requests are queued but not processed
	serviceID       string         // this service's ID, stamped into submitted lock data
	acquirings      []*acquireInfo // pending lock requests
	lock            sync.Mutex     // guards the mutable fields above
	doAcquiringChan chan any       // non-blocking wake-up signal for Serve
}

// NewAcquireActor creates an actor that starts in maintenance mode; call
// LeaveMaintenance once state is synced to begin processing.
func NewAcquireActor(cfg *Config, etcdCli *clientv3.Client) *AcquireActor {
	return &AcquireActor{
		cfg:             cfg,
		etcdCli:         etcdCli,
		isMaintenance:   true,
		doAcquiringChan: make(chan any),
	}
}

// Init wires in the ProvidersActor dependency (kept out of the constructor
// to break the construction cycle between actors).
func (a *AcquireActor) Init(providersActor *ProvidersActor) {
	a.providersActor = providersActor
}

// Acquire requests a batch of locks. On success it returns the lock request ID.
func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, error) {
	info := &acquireInfo{
		Request:  req,
		Callback: future.NewSetValue[string](),
	}

	func() {
		a.lock.Lock()
		defer a.lock.Unlock()

		a.acquirings = append(a.acquirings, info)

		// In maintenance mode we only accept the request; it is not processed yet.
		if a.isMaintenance {
			return
		}

		// Non-blocking wake-up of Serve.
		select {
		case a.doAcquiringChan <- nil:
		default:
		}
	}()

	go func() {
		// Returns when ctx expires or the callback completes.
		info.Callback.Wait(ctx)

		a.lock.Lock()
		defer a.lock.Unlock()

		// The callback is always completed while holding the lock, so the
		// IsComplete check here is a reliable basis for what follows.
		if info.Callback.IsComplete() {
			return
		}

		a.acquirings = mylo.Remove(a.acquirings, info)
		if info.LastErr != nil {
			info.Callback.SetError(info.LastErr)
		} else {
			info.Callback.SetError(ErrAcquiringTimeout)
		}
	}()

	// Do not wait on the callback with ctx directly: a Wait timeout does not
	// mean the lock was not acquired, and doing so could leak the lock.
	return info.Callback.WaitValue(context.Background())
}

// TryAcquireNow retries the lock requests that have not yet succeeded.
// It does not block the caller.
func (a *AcquireActor) TryAcquireNow() {
	go func() {
		a.lock.Lock()
		defer a.lock.Unlock()

		// While in maintenance mode, even an explicit acquire trigger is ignored.
		if a.isMaintenance {
			return
		}

		// Non-blocking wake-up of Serve.
		select {
		case a.doAcquiringChan <- nil:
		default:
		}
	}()
}

// EnterMaintenance puts the actor into maintenance mode: requests are only
// queued, not processed.
func (a *AcquireActor) EnterMaintenance() {
	a.lock.Lock()
	defer a.lock.Unlock()

	a.isMaintenance = true
}

// LeaveMaintenance exits maintenance mode. Calling TryAcquireNow afterwards
// is recommended so queued requests get processed.
func (a *AcquireActor) LeaveMaintenance() {
	a.lock.Lock()
	defer a.lock.Unlock()

	a.isMaintenance = false
}

// ResetState installs the service ID that will be stamped into newly
// submitted lock request data.
func (a *AcquireActor) ResetState(serviceID string) {
	a.lock.Lock()
	defer a.lock.Unlock()

	a.serviceID = serviceID
}

// Serve runs the actor's processing loop: it drains wake-up signals and
// attempts to acquire the queued lock requests. It never returns (the
// wake-up channel is never closed).
func (a *AcquireActor) Serve() {
	// A for/select with a single case is equivalent to ranging the channel.
	for range a.doAcquiringChan {
		if err := a.doAcquiring(); err != nil {
			logger.Std.Debugf("doing acquiring: %s", err.Error())
		}
	}
}

// doAcquiring makes one pass over the queued lock requests: it takes the
// global etcd request-data lock, waits for the local state to catch up to
// the global index, then tries the queued requests in order and submits the
// first one that can be locked. A request that fails keeps its last error
// for reporting on timeout.
func (a *AcquireActor) doAcquiring() error {
	ctx := context.Background()

	// Fast path: nothing queued, skip the global-lock round trip entirely.
	a.lock.Lock()
	if len(a.acquirings) == 0 {
		a.lock.Unlock()
		return nil
	}
	a.lock.Unlock()

	// Do not hold the actor lock while acquiring the global lock; re-lock the
	// actor only after the global lock is held.
	// TODO use distinct error types so callers can react appropriately
	unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec)
	if err != nil {
		return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
	}
	defer unlock()

	index, err := getEtcdLockRequestIndex(ctx, a.etcdCli)
	if err != nil {
		return err
	}

	// Demoted from Infof: this is per-attempt tracing, not operator-level info.
	logger.Std.Debugf("waiting local index to reach: %d", index)

	// Wait until the local state has been synced up to the global index.
	// TODO make the wait duration configurable
	err = a.providersActor.WaitLocalIndexTo(ctx, index)
	if err != nil {
		return err
	}

	a.lock.Lock()
	defer a.lock.Unlock()
	// TODO could acquire several lock requests in one pass
	for i := 0; i < len(a.acquirings); i++ {
		req := a.acquirings[i]

		// Test the locks and build the data to store into etcd.
		// (A leftover debug statement `logger.Std.Infof("6")` was removed here.)
		reqData, err := a.providersActor.TestLockRequestAndMakeData(req.Request)
		if err != nil {
			req.LastErr = err
			continue
		}

		nextIndexStr := strconv.FormatInt(index+1, 10)
		reqData.ID = nextIndexStr
		reqData.SerivceID = a.serviceID
		reqData.Reason = req.Request.Reason
		reqData.Timestamp = time.Now().Unix()

		// The locks are available; submit the lock data.
		err = a.submitLockRequest(ctx, nextIndexStr, reqData)
		if err != nil {
			req.LastErr = err
			continue
		}

		req.Callback.SetValue(reqData.ID)
		a.acquirings = mylo.RemoveAt(a.acquirings, i)
		break
	}

	return nil
}

// submitLockRequest writes the lock request data and the bumped request
// index to etcd in a single transaction.
// NOTE(review): the transaction has no If condition, so txResp.Succeeded
// should always be true and the "index changed" branch looks unreachable —
// confirm whether a Compare on EtcdLockRequestIndex was intended (the caller
// already holds the global request-data mutex, which may make it redundant).
func (a *AcquireActor) submitLockRequest(ctx context.Context, index string, reqData LockRequestData) error {
	reqBytes, err := serder.ObjectToJSON(reqData)
	if err != nil {
		return fmt.Errorf("serialize lock request data failed, err: %w", err)
	}

	etcdOps := []clientv3.Op{
		clientv3.OpPut(EtcdLockRequestIndex, index),
		clientv3.OpPut(MakeEtcdLockRequestKey(reqData.ID), string(reqBytes)),
	}
	txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit()
	if err != nil {
		return fmt.Errorf("submit lock request data failed, err: %w", err)
	}
	if !txResp.Succeeded {
		return fmt.Errorf("submit lock request data failed for lock request data index changed")
	}

	return nil
}

// acquireEtcdRequestDataLock takes the global etcd mutex guarding the lock
// request data. The mutex is backed by a lease so it is released
// automatically if this process dies. On success it returns an unlock
// function that releases the mutex and closes the underlying session.
func acquireEtcdRequestDataLock(ctx context.Context, etcdCli *clientv3.Client, etcdLockLeaseTimeSec int64) (unlock func(), err error) {
	// Fix: use the caller's ctx (the original passed context.Background(),
	// silently ignoring the caller's cancellation during Grant).
	lease, err := etcdCli.Grant(ctx, etcdLockLeaseTimeSec)
	if err != nil {
		return nil, fmt.Errorf("grant lease failed, err: %w", err)
	}

	session, err := concurrency.NewSession(etcdCli, concurrency.WithLease(lease.ID))
	if err != nil {
		return nil, fmt.Errorf("new session failed, err: %w", err)
	}

	mutex := concurrency.NewMutex(session, EtcdLockRequestLock)

	if err := mutex.Lock(ctx); err != nil {
		session.Close()
		return nil, fmt.Errorf("acquire lock failed, err: %w", err)
	}

	return func() {
		// Unlock with a fresh context: the caller's ctx may already be done,
		// but the mutex must still be released.
		mutex.Unlock(context.Background())
		session.Close()
	}, nil
}

// getEtcdLockRequestIndex reads the global lock-request index from etcd.
// A missing key is treated as index 0 (nothing has been submitted yet).
func getEtcdLockRequestIndex(ctx context.Context, etcdCli *clientv3.Client) (int64, error) {
	indexKv, err := etcdCli.Get(ctx, EtcdLockRequestIndex)
	if err != nil {
		return 0, fmt.Errorf("get lock request index failed, err: %w", err)
	}

	if len(indexKv.Kvs) == 0 {
		return 0, nil
	}

	// The index is always written with strconv.FormatInt(..., 10), so parse
	// it explicitly as base 10. The previous base 0 would misread a value
	// with a leading zero as octal.
	index, err := strconv.ParseInt(string(indexKv.Kvs[0].Value), 10, 64)
	if err != nil {
		return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
	}

	return index, nil
}

+ 11
- 0
pkgs/distlock/internal/config.go View File

@@ -0,0 +1,11 @@
package internal

// Config holds the distlock service's etcd connection and behavior settings.
type Config struct {
	EtcdAddress  string `json:"etcdAddress"`  // etcd endpoint address
	EtcdUsername string `json:"etcdUsername"` // etcd auth user name
	EtcdPassword string `json:"etcdPassword"` // etcd auth password

	// Lease time of the global lock. The service renews the lease within this
	// window; if it crashes, other services can re-acquire the lock once the
	// lease expires.
	EtcdLockLeaseTimeSec int64 `json:"etcdLockLeaseTimeSec"`
	// On a failed release, retry after a random delay:
	// delay = random(0, RandomReleasingDelayMs) + minimum delay (1000ms).
	RandomReleasingDelayMs int64 `json:"randomReleasingDelayMs"`
	// Human-readable description registered into etcd when the service starts.
	ServiceDescription string `json:"serviceDescription"`
}

pkgs/distlock/service/internal/lease_actor.go → pkgs/distlock/internal/lease_actor.go View File

@@ -21,7 +21,7 @@ type LeaseActor struct {

commandChan *actor.CommandChannel

mainActor *MainActor
releaseActor *ReleaseActor
}

func NewLeaseActor() *LeaseActor {
@@ -31,18 +31,18 @@ func NewLeaseActor() *LeaseActor {
}
}

func (a *LeaseActor) Init(mainActor *MainActor) {
a.mainActor = mainActor
func (a *LeaseActor) Init(releaseActor *ReleaseActor) {
a.releaseActor = releaseActor
}

func (a *LeaseActor) StartChecking() error {
func (a *LeaseActor) Start() error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
a.ticker = time.NewTicker(time.Second)
return nil
})
}

func (a *LeaseActor) StopChecking() error {
func (a *LeaseActor) Stop() error {
return actor.Wait(context.TODO(), a.commandChan, func() error {
if a.ticker != nil {
a.ticker.Stop()
@@ -91,19 +91,14 @@ func (a *LeaseActor) Remove(reqID string) error {
})
}

func (a *LeaseActor) Serve() error {
func (a *LeaseActor) Serve() {
cmdChan := a.commandChan.BeginChanReceive()
defer a.commandChan.CloseChanReceive()

for {
if a.ticker != nil {
select {
case cmd, ok := <-cmdChan:
if !ok {
a.ticker.Stop()
return fmt.Errorf("command chan closed")
}

case cmd := <-cmdChan:
cmd()

case now := <-a.ticker.C:
@@ -113,26 +108,14 @@ func (a *LeaseActor) Serve() error {
// TODO 可以考虑打个日志
logger.Std.Infof("lock request %s is timeout, will release it", reqID)

// TODO 可以考虑让超时时间可配置
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
err := a.mainActor.Release(ctx, reqID)
cancel()
if err == nil {
delete(a.leases, reqID)
} else {
logger.Std.Warnf("releasing lock request: %s", err.Error())
}
a.releaseActor.DelayRelease([]string{reqID})
}
}

}
} else {
select {
case cmd, ok := <-cmdChan:
if !ok {
return fmt.Errorf("command chan closed")
}

case cmd := <-cmdChan:
cmd()
}
}

+ 77
- 0
pkgs/distlock/internal/models.go View File

@@ -0,0 +1,77 @@
package internal

import "strings"

// Etcd key layout used by the distlock service.
const (
	EtcdLockRequestDataPrefix = "/distlock/lockRequest/data"  // one key per submitted lock request
	EtcdLockRequestIndex      = "/distlock/lockRequest/index" // global lock-request index
	EtcdLockRequestLock       = "/distlock/lockRequest/lock"  // global mutex guarding request data
	EtcdServiceInfoPrefix     = "/distlock/services"          // one key per registered service
	EtcdWatchPrefix           = "/distlock"                   // watching this prefix covers all of the above
)

// Lock is a single lock inside a lock request.
type Lock struct {
	Path   []string // lock path, stored as its individual segments
	Name   string   // lock name
	Target any      // lock target, interpreted by the specific Provider
}

// LockRequest is a batch of locks acquired and released together.
type LockRequest struct {
	Reason string // human-readable reason, stored alongside the lock data
	Locks  []Lock
}

// Add appends a lock to the request.
func (b *LockRequest) Add(lock Lock) {
	b.Locks = append(b.Locks, lock)
}

// LockProvider implements the semantics of one family of locks.
type LockProvider interface {
	// CanLock reports whether this lock could currently be acquired.
	CanLock(lock Lock) error

	// Lock acquires the lock. Locks within one request are not checked for
	// conflicts against each other, so this must support locking conflicting
	// locks.
	Lock(reqID string, lock Lock) error

	// Unlock releases the lock.
	Unlock(reqID string, lock Lock) error

	// GetTargetString serializes the lock target to a string for storage in etcd.
	GetTargetString(target any) (string, error)

	// ParseTargetString parses the string form of a lock target.
	ParseTargetString(targetStr string) (any, error)

	// Clear wipes all internal state.
	Clear()
}

// lockData is the etcd (JSON) form of one Lock.
type lockData struct {
	Path   []string `json:"path"`
	Name   string   `json:"name"`
	Target string   `json:"target"`
}

// LockRequestData is the etcd (JSON) form of one LockRequest.
type LockRequestData struct {
	ID string `json:"id"`
	// NOTE(review): "SerivceID" is a misspelling of ServiceID, but the name is
	// referenced throughout the package; the JSON tag is spelled correctly, so
	// renaming the Go field is a separate cross-file refactor.
	SerivceID string     `json:"serviceID"`
	Reason    string     `json:"reason"`
	Timestamp int64      `json:"timestamp"`
	Locks     []lockData `json:"locks"`
}

// MakeEtcdLockRequestKey builds the etcd key for a lock request's data.
func MakeEtcdLockRequestKey(reqID string) string {
	return EtcdLockRequestDataPrefix + "/" + reqID
}

// GetLockRequestID extracts the request ID back out of a lock request key.
func GetLockRequestID(key string) string {
	return strings.TrimPrefix(key, EtcdLockRequestDataPrefix+"/")
}

// MakeServiceInfoKey builds the etcd key for a service's registration info.
func MakeServiceInfoKey(svcID string) string {
	return EtcdServiceInfoPrefix + "/" + svcID
}

// ServiceInfo is a service's registration record stored in etcd.
type ServiceInfo struct {
	ID          string `json:"id"`
	Description string `json:"description"`
}

+ 202
- 0
pkgs/distlock/internal/providers_actor.go View File

@@ -0,0 +1,202 @@
package internal

import (
"context"
"errors"
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/trie"
)

// ErrWaitIndexUpdateTimeout is delivered to waiters when the local
// lock-request index cannot catch up (e.g. the actor's state is reset while
// they wait). Fixed the "waitting" typo in the message.
var ErrWaitIndexUpdateTimeout = errors.New("waiting local index updating timeout")

// indexWaiter is one caller blocked in WaitLocalIndexTo.
type indexWaiter struct {
	Index    int64                 // wake when the local index reaches this value
	Callback *future.SetVoidFuture // completed (or failed) to release the waiter
}

// ProvidersActor owns the LockProviders and mirrors the global lock state
// locally by applying lock/unlock events in order.
type ProvidersActor struct {
	localLockReqIndex int64                   // index of the last applied lock-request event
	provdersTrie      trie.Trie[LockProvider] // providers keyed by lock path (NOTE: "provders" is a historical misspelling)
	allProviders      []LockProvider          // flat list, used for bulk Clear in ResetState

	indexWaiters []indexWaiter // callers waiting for the local index to advance
	lock         sync.Mutex    // guards all fields above
}

// NewProvidersActor creates an empty actor; register providers with AddProvider.
func NewProvidersActor() *ProvidersActor {
	return &ProvidersActor{}
}

// AddProvider registers a provider under the given lock path.
// No locking here — intended for setup, before event handling starts.
func (a *ProvidersActor) AddProvider(prov LockProvider, path ...any) {
	a.provdersTrie.Create(path).Value = prov
	a.allProviders = append(a.allProviders, prov)
}

// Init is part of the actor wiring protocol; nothing to do for this actor.
func (a *ProvidersActor) Init() {
}

// WaitLocalIndexTo blocks until the locally applied lock-request index
// reaches index, or ctx expires. Returns immediately if already reached.
func (a *ProvidersActor) WaitLocalIndexTo(ctx context.Context, index int64) error {
	fut := future.NewSetVoid()

	a.lock.Lock()
	if index <= a.localLockReqIndex {
		// Already there; complete the future before anyone waits on it.
		fut.SetVoid()
	} else {
		a.indexWaiters = append(a.indexWaiters, indexWaiter{
			Index:    index,
			Callback: fut,
		})
	}
	a.lock.Unlock()

	return fut.Wait(ctx)
}

// OnLockRequestEvent applies a lock/unlock event from the etcd watcher to
// the local providers, then advances the local index and wakes any waiters
// whose target index has been reached.
func (a *ProvidersActor) OnLockRequestEvent(evt LockRequestEvent) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	if evt.IsLocking {
		err := a.lockLockRequest(evt.Data)
		if err != nil {
			return fmt.Errorf("applying locking event: %w", err)
		}

	} else {
		err := a.unlockLockRequest(evt.Data)
		if err != nil {
			return fmt.Errorf("applying unlocking event: %w", err)
		}
	}

	a.localLockReqIndex++
	// Check whether anyone is waiting for the index to advance.
	a.wakeUpIndexWaiter()
	return nil
}

// lockLockRequest applies every lock in reqData to its provider. Caller must
// hold a.lock. On error, locks applied in earlier iterations are not rolled
// back here; callers handle that by resetting state.
// (Receiver renamed from "svc" to "a" for consistency with the type's other
// methods.)
func (a *ProvidersActor) lockLockRequest(reqData LockRequestData) error {
	for _, lockData := range reqData.Locks {
		node, ok := a.provdersTrie.WalkEnd(lockData.Path)
		if !ok || node.Value == nil {
			return fmt.Errorf("lock provider not found for path %v", lockData.Path)
		}

		target, err := node.Value.ParseTargetString(lockData.Target)
		if err != nil {
			return fmt.Errorf("parse target data failed, err: %w", err)
		}

		err = node.Value.Lock(reqData.ID, Lock{
			Path:   lockData.Path,
			Name:   lockData.Name,
			Target: target,
		})
		if err != nil {
			return fmt.Errorf("locking failed, err: %w", err)
		}
	}
	return nil
}

// unlockLockRequest releases every lock in reqData via its provider. Caller
// must hold a.lock.
// (Receiver renamed from "svc" to "a" for consistency with the type's other
// methods.)
func (a *ProvidersActor) unlockLockRequest(reqData LockRequestData) error {
	for _, lockData := range reqData.Locks {
		node, ok := a.provdersTrie.WalkEnd(lockData.Path)
		if !ok || node.Value == nil {
			return fmt.Errorf("lock provider not found for path %v", lockData.Path)
		}

		target, err := node.Value.ParseTargetString(lockData.Target)
		if err != nil {
			return fmt.Errorf("parse target data failed, err: %w", err)
		}

		err = node.Value.Unlock(reqData.ID, Lock{
			Path:   lockData.Path,
			Name:   lockData.Name,
			Target: target,
		})
		if err != nil {
			return fmt.Errorf("unlocking failed, err: %w", err)
		}
	}
	return nil
}

// TestLockRequestAndMakeData checks whether the whole request can be locked
// and builds the string form of the lock data. Note: it does not assign a
// request ID.
// When checking an individual lock, the other locks in the same request are
// not considered — locks within one request may conflict with each other.
func (a *ProvidersActor) TestLockRequestAndMakeData(req LockRequest) (LockRequestData, error) {
	a.lock.Lock()
	defer a.lock.Unlock()

	reqData := LockRequestData{}

	for _, lock := range req.Locks {
		n, ok := a.provdersTrie.WalkEnd(lock.Path)
		if !ok || n.Value == nil {
			return LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path)
		}

		err := n.Value.CanLock(lock)
		if err != nil {
			return LockRequestData{}, err
		}

		targetStr, err := n.Value.GetTargetString(lock.Target)
		if err != nil {
			return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err)
		}

		reqData.Locks = append(reqData.Locks, lockData{
			Path:   lock.Path,
			Name:   lock.Name,
			Target: targetStr,
		})
	}

	return reqData, nil
}

// ResetState clears every provider and re-applies the given lock request
// data as the new authoritative local state, setting the local index to
// index. All pending index waiters are failed, since the baseline they were
// waiting against has just been replaced.
func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestData) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	var err error

	for _, p := range a.allProviders {
		p.Clear()
	}

	for _, reqData := range lockRequestData {
		err = a.lockLockRequest(reqData)
		if err != nil {
			err = fmt.Errorf("applying lock request data: %w", err)
			break
		}
	}

	a.localLockReqIndex = index

	// Internal state has been replaced (and may be broken on error above);
	// fail every waiter rather than leaving them blocked.
	for _, w := range a.indexWaiters {
		w.Callback.SetError(ErrWaitIndexUpdateTimeout)
	}
	a.indexWaiters = nil

	return err
}

// wakeUpIndexWaiter completes every waiter whose target index has been
// reached and keeps the rest queued. Caller must hold a.lock.
func (a *ProvidersActor) wakeUpIndexWaiter() {
	var pending []indexWaiter
	for _, w := range a.indexWaiters {
		if w.Index > a.localLockReqIndex {
			pending = append(pending, w)
			continue
		}
		w.Callback.SetVoid()
	}
	a.indexWaiters = pending
}

+ 223
- 0
pkgs/distlock/internal/release_actor.go View File

@@ -0,0 +1,223 @@
package internal

import (
"context"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
)

// Delay bounds for retried releases, in milliseconds.
// NOTE(review): "Releaseing" is a misspelling of "Releasing"; the names are
// referenced by setupTimer, so renaming is a cross-block refactor.
const (
	DefaultMaxReleaseingDelayMs = 4000 // used when Config.RandomReleasingDelayMs is 0
	BaseReleaseingDelayMs       = 1000 // minimum delay always added to the random part
)

// ReleaseActor collects lock request IDs that should be released and deletes
// their data from etcd, either immediately or after a randomized delay.
type ReleaseActor struct {
	cfg     *Config
	etcdCli *clientv3.Client

	lock                    sync.Mutex      // guards all fields below
	isMaintenance           bool            // while true, requests are queued but not processed
	releasingLockRequestIDs map[string]bool // set of request IDs queued for release
	timer                   *time.Timer     // delayed-release timer, allocated lazily
	timerSetup              bool            // true while a timer goroutine is pending
	doReleasingChan         chan any        // non-blocking wake-up signal for Serve
}

// NewReleaseActor creates an actor that starts in maintenance mode; call
// LeaveMaintenance once state is synced to begin processing.
func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor {
	return &ReleaseActor{
		cfg:                     cfg,
		etcdCli:                 etcdCli,
		isMaintenance:           true,
		releasingLockRequestIDs: make(map[string]bool),
		doReleasingChan:         make(chan any),
	}
}

// Release queues these lock requests for immediate release. Typically used
// after the user has actively released a lock.
func (a *ReleaseActor) Release(reqIDs []string) {
	a.lock.Lock()
	defer a.lock.Unlock()

	for _, id := range reqIDs {
		a.releasingLockRequestIDs[id] = true
	}

	// In maintenance mode the IDs are only queued; Serve is not woken.
	if a.isMaintenance {
		return
	}

	// Non-blocking wake-up of Serve.
	select {
	case a.doReleasingChan <- nil:
	default:
	}
}

// DelayRelease queues these lock requests for release after a randomized
// delay. Typically used to clean up locks left behind by a crashed service.
func (a *ReleaseActor) DelayRelease(reqIDs []string) {
	a.lock.Lock()
	defer a.lock.Unlock()

	for _, id := range reqIDs {
		a.releasingLockRequestIDs[id] = true
	}

	// In maintenance mode the IDs are only queued; the timer is not armed.
	if a.isMaintenance {
		return
	}

	a.setupTimer()
}

// TryReleaseNow retries the queued unlock requests. It does not block the
// caller. While in maintenance mode, even an explicit trigger is ignored.
func (a *ReleaseActor) TryReleaseNow() {
	a.lock.Lock()
	if !a.isMaintenance {
		// Non-blocking wake-up of Serve.
		select {
		case a.doReleasingChan <- nil:
		default:
		}
	}
	a.lock.Unlock()
}

// EnterMaintenance puts the actor into maintenance mode: requests (including
// delayed-release requests) are accepted but not processed.
func (a *ReleaseActor) EnterMaintenance() {
	a.lock.Lock()
	a.isMaintenance = true
	a.lock.Unlock()
}

// LeaveMaintenance exits maintenance mode. Calling TryReleaseNow afterwards
// is recommended so queued releases get processed.
func (a *ReleaseActor) LeaveMaintenance() {
	a.lock.Lock()
	a.isMaintenance = false
	a.lock.Unlock()
}

// OnLockRequestEvent drops an unlocking event's request ID from the
// releasing set — its data is already gone from etcd. Locking events are
// ignored.
func (a *ReleaseActor) OnLockRequestEvent(event LockRequestEvent) {
	if !event.IsLocking {
		a.lock.Lock()
		delete(a.releasingLockRequestIDs, event.Data.ID)
		a.lock.Unlock()
	}
}

// Serve runs the actor's processing loop: it drains wake-up signals and
// attempts to release the queued lock requests. It never returns (the
// wake-up channel is never closed).
func (a *ReleaseActor) Serve() {
	// A for/select with a single case is equivalent to ranging the channel.
	for range a.doReleasingChan {
		if err := a.doReleasing(); err != nil {
			logger.Std.Debugf("doing releasing: %s", err.Error())
		}
	}
}

// doReleasing makes one pass over the queued release IDs: it takes the
// global etcd request-data lock, then deletes each lock-request key that
// still exists, bumping the global index only for actual deletions. The
// retry timer is re-armed on exit if anything remains queued.
func (a *ReleaseActor) doReleasing() error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Fast path: nothing queued, skip the global-lock round trip entirely.
	a.lock.Lock()
	if len(a.releasingLockRequestIDs) == 0 {
		a.lock.Unlock()
		return nil
	}
	a.lock.Unlock()

	// Do not hold the actor lock while acquiring the global lock; re-lock the
	// actor only after the global lock is held.
	// TODO use distinct error types so callers can react appropriately
	unlock, err := acquireEtcdRequestDataLock(ctx, a.etcdCli, a.cfg.EtcdLockLeaseTimeSec)
	if err != nil {
		return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
	}
	defer unlock()

	index, err := getEtcdLockRequestIndex(ctx, a.etcdCli)
	if err != nil {
		return err
	}

	a.lock.Lock()
	defer a.lock.Unlock()
	defer a.setupTimer()

	// TODO could delete several lock requests in one transaction
	for id := range a.releasingLockRequestIDs {
		lockReqKey := MakeEtcdLockRequestKey(id)

		txResp, err := a.etcdCli.Txn(ctx).
			If(clientv3util.KeyExists(lockReqKey)).
			Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit()
		if err != nil {
			return fmt.Errorf("updating lock request data: %w", err)
		}
		// Only bump the index if the lock data was actually deleted.
		if txResp.Succeeded {
			index++
		}
		// Deleting from the map while ranging over it is well-defined in Go.
		delete(a.releasingLockRequestIDs, id)
	}

	return nil
}

// setupTimer arms (or re-arms) the delayed-release timer with a randomized
// delay, unless it is already armed or nothing is queued. Caller must hold
// a.lock. When the timer fires, Serve is woken unless maintenance mode is on.
func (a *ReleaseActor) setupTimer() {
	if len(a.releasingLockRequestIDs) == 0 {
		return
	}

	// Already armed; the pending goroutine will fire.
	if a.timerSetup {
		return
	}
	a.timerSetup = true

	// delay = random(0, RandomReleasingDelayMs) + base delay (1000ms),
	// with a default random range when the config value is 0.
	delay := int64(0)
	if a.cfg.RandomReleasingDelayMs == 0 {
		delay = rand.Int63n(DefaultMaxReleaseingDelayMs)
	} else {
		delay = rand.Int63n(a.cfg.RandomReleasingDelayMs)
	}

	// The timer is allocated once and reused via Reset afterwards.
	if a.timer == nil {
		a.timer = time.NewTimer(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond)
	} else {
		a.timer.Reset(time.Duration(delay+BaseReleaseingDelayMs) * time.Millisecond)
	}

	go func() {
		<-a.timer.C

		a.lock.Lock()
		defer a.lock.Unlock()

		a.timerSetup = false

		// In maintenance mode, even a timer-driven release is ignored.
		if a.isMaintenance {
			return
		}

		// Non-blocking wake-up of Serve.
		select {
		case a.doReleasingChan <- nil:
		default:
		}
	}()
}

+ 196
- 0
pkgs/distlock/internal/service_info_actor.go View File

@@ -0,0 +1,196 @@
package internal

import (
"context"
"errors"
"fmt"
"sync"

"github.com/google/uuid"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)

// ErrSelfServiceDown signals that this service's own registration has
// disappeared; the owner must reset state and re-register.
var ErrSelfServiceDown = errors.New("self service is down, need to restart")

// serviceStatus is the locally-tracked state of one registered service.
type serviceStatus struct {
	Info           ServiceInfo // the service's registration record
	LockRequestIDs []string    // lock requests currently attributed to this service
}

// ServiceInfoActor registers this service in etcd (kept alive through a
// lease) and tracks all live services together with the lock requests they
// hold, so a crashed service's locks can be released.
type ServiceInfoActor struct {
	cfg     *Config
	etcdCli *clientv3.Client

	lock           sync.Mutex
	selfInfo       ServiceInfo               // this service's own registration record
	leaseID        *clientv3.LeaseID         // lease backing our registration key; nil when unregistered
	leaseKeepAlive chan any                  // closed to stop the keep-alive goroutine
	services       map[string]*serviceStatus // known live services, keyed by service ID
	releaseActor   *ReleaseActor
}

// NewServiceInfoActor creates the actor. baseSelfInfo's ID is generated
// later, in ResetState.
func NewServiceInfoActor(cfg *Config, etcdCli *clientv3.Client, baseSelfInfo ServiceInfo) *ServiceInfoActor {
	return &ServiceInfoActor{
		cfg:      cfg,
		etcdCli:  etcdCli,
		selfInfo: baseSelfInfo,
	}
}

// Init wires in the ReleaseActor dependency (kept out of the constructor to
// break the construction cycle between actors).
func (a *ServiceInfoActor) Init(releaseActor *ReleaseActor) {
	a.releaseActor = releaseActor
}

// GetSelfInfo returns this service's own registration record.
// NOTE(review): returns a pointer to internal state without holding the
// lock; callers should treat the result as read-only.
func (a *ServiceInfoActor) GetSelfInfo() *ServiceInfo {
	return &a.selfInfo
}

// ResetState re-registers this service in etcd under a fresh ID and rebuilds
// the live-service table from the given snapshot. It returns the IDs of lock
// requests that belong to no known service; the caller should release them.
func (a *ServiceInfoActor) ResetState(ctx context.Context, currentServices []ServiceInfo, currentLocks []LockRequestData) ([]string, error) {
	a.lock.Lock()
	defer a.lock.Unlock()

	// Tear down any previous registration: revoke its lease and stop the
	// old keep-alive goroutine.
	if a.leaseID != nil {
		a.etcdCli.Revoke(ctx, *a.leaseID)
		close(a.leaseKeepAlive)
		a.leaseID = nil
	}

	// Generate and register this service's info.
	a.selfInfo.ID = uuid.NewString()

	infoData, err := serder.ObjectToJSON(a.selfInfo)
	if err != nil {
		return nil, fmt.Errorf("service info to json: %w", err)
	}

	lease, err := a.etcdCli.Grant(ctx, a.cfg.EtcdLockLeaseTimeSec)
	if err != nil {
		return nil, fmt.Errorf("granting lease: %w", err)
	}
	a.leaseID = &lease.ID

	keepAliveChan, err := a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		a.etcdCli.Revoke(ctx, lease.ID)
		return nil, fmt.Errorf("starting keep lease alive: %w", err)
	}
	a.leaseKeepAlive = make(chan any)

	// Keep-alive goroutine: reopens the keep-alive stream if it closes, and
	// exits when a.leaseKeepAlive is closed by the next ResetState.
	go func() {
		for {
			select {
			case _, ok := <-keepAliveChan:
				if !ok {
					logger.Std.Warnf("lease keep alive channel closed, will try to open again")

					var err error
					keepAliveChan, err = a.etcdCli.Lease.KeepAlive(context.Background(), lease.ID)
					if err != nil {
						logger.Std.Warnf("starting keep lease alive: %s", err.Error())
						return
					}
				}

			case <-a.leaseKeepAlive:
				return
			}
		}
	}()

	_, err = a.etcdCli.Put(ctx, MakeServiceInfoKey(a.selfInfo.ID), string(infoData), clientv3.WithLease(lease.ID))
	if err != nil {
		a.etcdCli.Revoke(ctx, lease.ID)
		return nil, fmt.Errorf("putting service info to etcd: %w", err)
	}

	// Import the snapshot of currently known services and locks.
	a.services = make(map[string]*serviceStatus)
	for _, svc := range currentServices {
		a.services[svc.ID] = &serviceStatus{
			Info: svc,
		}
	}
	// Add our own entry directly.
	a.services[a.selfInfo.ID] = &serviceStatus{
		Info: a.selfInfo,
	}

	// While importing the locks we may find some that belong to services with
	// no registration; collect them so they can be released.
	var willReleaseIDs []string
	for _, lock := range currentLocks {
		svc, ok := a.services[lock.SerivceID]
		if !ok {
			willReleaseIDs = append(willReleaseIDs, lock.ID)
			continue
		}

		svc.LockRequestIDs = append(svc.LockRequestIDs, lock.ID)
	}

	return willReleaseIDs, nil
}

// OnServiceEvent updates the live-service table from a watch event. When a
// service goes down, all of its tracked lock requests are scheduled for
// delayed release. Returns ErrSelfServiceDown if the removed registration is
// our own, so the owner can restart and re-fetch full state.
func (a *ServiceInfoActor) OnServiceEvent(evt ServiceEvent) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	// TODO could log a bit more here

	if evt.IsNew {
		if evt.Info.ID != a.selfInfo.ID {
			logger.Std.WithField("ID", evt.Info.ID).Infof("new service up")
			a.services[evt.Info.ID] = &serviceStatus{
				Info: evt.Info,
			}
		}

	} else {
		logger.Std.WithField("ID", evt.Info.ID).Infof("service down, will release all its locks")

		status, ok := a.services[evt.Info.ID]
		if !ok {
			return nil
		}

		a.releaseActor.DelayRelease(status.LockRequestIDs)

		delete(a.services, evt.Info.ID)

		// If the removed registration is our own, we must restart and
		// re-fetch the full state.
		if evt.Info.ID == a.selfInfo.ID {
			return ErrSelfServiceDown
		}
	}

	return nil
}

// OnLockRequestEvent maintains the per-service list of held lock requests.
// A locking event from a service we have never seen registered is released
// immediately (see comment below); events from known services update that
// service's lock-request list.
func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) {
	a.lock.Lock()
	defer a.lock.Unlock()

	status, ok := a.services[evt.Data.SerivceID]
	if !ok {
		if evt.IsLocking {
			// The lock comes from an unregistered service. Most likely that
			// service had a network hiccup, its registration expired, everyone
			// considered it down and cleaned up its locks — but it has not yet
			// realized this and is still submitting lock requests. To prevent
			// an eternal lock should it crash right after locking, release
			// such locks right away.
			logger.Std.WithField("RequestID", evt.Data.ID).
				WithField("ServiceID", evt.Data.SerivceID).
				// Fixed the "unknow" typo in the log message.
				Warnf("the lock request is from an unknown service, will release it")

			a.releaseActor.Release([]string{evt.Data.ID})
		}
		return
	}

	if evt.IsLocking {
		status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID)
	} else {
		status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID)
	}
}

+ 190
- 0
pkgs/distlock/internal/watch_etcd_actor.go View File

@@ -0,0 +1,190 @@
package internal

import (
"context"
"fmt"
"strings"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)

// LockRequestEvent is a lock-request key change seen by the watcher:
// creation (IsLocking=true) or deletion (release).
type LockRequestEvent struct {
	IsLocking bool
	Data      LockRequestData
}

// ServiceEvent is a service registration appearing or disappearing.
type ServiceEvent struct {
	IsNew bool
	Info  ServiceInfo
}

// OnLockRequestEventFn handles a lock-request event.
type OnLockRequestEventFn func(event LockRequestEvent)

// OnServiceEventFn handles a service up/down event.
type OnServiceEventFn func(event ServiceEvent)

// OnWatchFailedFn is notified when the watch breaks and needs recovery.
type OnWatchFailedFn func(err error)

// WatchEtcdActor watches the distlock key prefix in etcd and dispatches
// key-change events to the registered callbacks.
type WatchEtcdActor struct {
	etcdCli *clientv3.Client

	watchChan            clientv3.WatchChan // nil while no watch is active
	watchChanCancel      func()             // cancels the active watch's context
	onLockRequestEventFn OnLockRequestEventFn
	onServiceEventFn     OnServiceEventFn
	onWatchFailedFn      OnWatchFailedFn
	commandChan          *actor.CommandChannel
}

// NewWatchEtcdActor creates the actor; call Init to register callbacks and
// then Start to begin watching.
func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor {
	return &WatchEtcdActor{
		etcdCli:     etcdCli,
		commandChan: actor.NewCommandChannel(),
	}
}

// Init registers the event callbacks (kept out of the constructor to break
// the construction cycle between actors).
func (a *WatchEtcdActor) Init(onLockRequestEvent OnLockRequestEventFn, onServiceDown OnServiceEventFn, onWatchFailed OnWatchFailedFn) {
	a.onLockRequestEventFn = onLockRequestEvent
	a.onServiceEventFn = onServiceDown
	a.onWatchFailedFn = onWatchFailed
}

// Start begins watching the distlock key prefix from the given etcd
// revision, cancelling any previous watch first.
// NOTE(review): the error returned by actor.Wait is discarded here and in Stop.
func (a *WatchEtcdActor) Start(revision int64) {
	actor.Wait(context.Background(), a.commandChan, func() error {
		if a.watchChanCancel != nil {
			a.watchChanCancel()
			a.watchChanCancel = nil
		}

		// WithPrevKV is required so delete events still carry the old value.
		ctx, cancel := context.WithCancel(context.Background())
		a.watchChan = a.etcdCli.Watch(ctx, EtcdWatchPrefix, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithRev(revision))
		a.watchChanCancel = cancel
		return nil
	})
}

// Stop cancels the active watch, if any.
func (a *WatchEtcdActor) Stop() {
	actor.Wait(context.Background(), a.commandChan, func() error {
		if a.watchChanCancel != nil {
			a.watchChanCancel()
			a.watchChanCancel = nil
		}
		a.watchChan = nil
		return nil
	})
}

// Serve runs the event loop: it executes queued commands and dispatches etcd
// watch events. On any watch error it stops the watch and notifies the
// failure callback; the loop itself never returns.
func (a *WatchEtcdActor) Serve() {
	cmdChan := a.commandChan.BeginChanReceive()
	defer a.commandChan.CloseChanReceive()

	for {
		// With no active watch, only commands can make progress — a plain
		// receive replaces the previous single-case select.
		if a.watchChan == nil {
			cmd := <-cmdChan
			cmd()
			continue
		}

		select {
		case cmd := <-cmdChan:
			cmd()

		case msg := <-a.watchChan:
			// On any failure, stop watching and let the owner recover.
			if msg.Canceled {
				a.onWatchFailedFn(fmt.Errorf("watch etcd channel closed"))
				a.watchChanCancel()
				a.watchChan = nil
				continue
			}

			if err := a.dispatchEtcdEvent(msg); err != nil {
				a.onWatchFailedFn(err)
				a.watchChanCancel()
				a.watchChan = nil
			}
		}
	}
}

// dispatchEtcdEvent routes every event in the watch response to the handler
// matching its key prefix; keys under neither prefix are ignored.
func (a *WatchEtcdActor) dispatchEtcdEvent(watchResp clientv3.WatchResponse) error {
	for _, evt := range watchResp.Events {
		switch key := string(evt.Kv.Key); {
		case strings.HasPrefix(key, EtcdLockRequestDataPrefix):
			if err := a.applyLockRequestEvent(evt); err != nil {
				return fmt.Errorf("parsing lock request event: %w", err)
			}

		case strings.HasPrefix(key, EtcdServiceInfoPrefix):
			if err := a.applyServiceEvent(evt); err != nil {
				return fmt.Errorf("parsing service event: %w", err)
			}
		}
	}

	return nil
}

// applyLockRequestEvent decodes a lock-request key change and forwards it to
// the lock-request callback. Only creations and deletions are handled: by
// design, only those two kinds of writes affect the request index.
func (a *WatchEtcdActor) applyLockRequestEvent(evt *clientv3.Event) error {
	var isLocking bool
	var valueData []byte

	switch {
	case evt.Type == clientv3.EventTypeDelete:
		// Deletions carry the data in the previous KV (watch uses WithPrevKV).
		isLocking = false
		valueData = evt.PrevKv.Value
	case evt.IsCreate():
		isLocking = true
		valueData = evt.Kv.Value
	default:
		return nil
	}

	var reqData LockRequestData
	if err := serder.JSONToObject(valueData, &reqData); err != nil {
		return fmt.Errorf("parse lock request data failed, err: %w", err)
	}

	a.onLockRequestEventFn(LockRequestEvent{
		IsLocking: isLocking,
		Data:      reqData,
	})

	return nil
}

// applyServiceEvent decodes a service-registration key change and forwards
// it to the service callback. Only creations and deletions are handled: by
// design, only those two kinds of writes matter here.
func (a *WatchEtcdActor) applyServiceEvent(evt *clientv3.Event) error {
	var isNew bool
	var valueData []byte

	switch {
	case evt.Type == clientv3.EventTypeDelete:
		// Deletions carry the data in the previous KV (watch uses WithPrevKV).
		isNew = false
		valueData = evt.PrevKv.Value
	case evt.IsCreate():
		isNew = true
		valueData = evt.Kv.Value
	default:
		return nil
	}

	var svcInfo ServiceInfo
	if err := serder.JSONToObject(valueData, &svcInfo); err != nil {
		return fmt.Errorf("parsing service info: %w", err)
	}

	a.onServiceEventFn(ServiceEvent{
		IsNew: isNew,
		Info:  svcInfo,
	})

	return nil
}

pkgs/distlock/service/mutex.go → pkgs/distlock/mutex.go View File

@@ -1,14 +1,14 @@
package service
package distlock

import "gitlink.org.cn/cloudream/common/pkgs/distlock"
import "gitlink.org.cn/cloudream/common/pkgs/distlock/internal"

type Mutex struct {
svc *Service
lockReq distlock.LockRequest
lockReq internal.LockRequest
lockReqID string
}

func NewMutex(svc *Service, lockReq distlock.LockRequest) *Mutex {
func NewMutex(svc *Service, lockReq internal.LockRequest) *Mutex {
return &Mutex{
svc: svc,
lockReq: lockReq,
@@ -25,6 +25,6 @@ func (m *Mutex) Lock() error {
return nil
}

func (m *Mutex) Unlock() error {
return m.svc.Release(m.lockReqID)
func (m *Mutex) Unlock() {
m.svc.Release(m.lockReqID)
}

+ 300
- 0
pkgs/distlock/service.go View File

@@ -0,0 +1,300 @@
package distlock

import (
"context"
"fmt"
"strconv"
"time"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock/internal"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)

// AcquireOption carries per-call settings for Service.Acquire.
type AcquireOption struct {
	Timeout time.Duration // how long to wait for the locks
	Lease   time.Duration // lease duration for the acquired locks
}

// AcquireOptionFn mutates an AcquireOption (functional-option pattern).
type AcquireOptionFn func(opt *AcquireOption)

// WithTimeout sets the maximum time to wait for the locks.
func WithTimeout(timeout time.Duration) AcquireOptionFn {
	return func(opt *AcquireOption) {
		opt.Timeout = timeout
	}
}

// WithLease sets the lease duration for the acquired locks.
// (Parameter renamed from "time", which shadowed the time package.)
func WithLease(lease time.Duration) AcquireOptionFn {
	return func(opt *AcquireOption) {
		opt.Lease = lease
	}
}

// PathProvider pairs a LockProvider with the lock path it serves.
type PathProvider struct {
	Path     []any
	Provider internal.LockProvider
}

// NewPathProvider builds a PathProvider from a provider and its path parts.
func NewPathProvider(prov internal.LockProvider, path ...any) PathProvider {
	return PathProvider{
		Path:     path,
		Provider: prov,
	}
}

// Service is the distributed-lock service facade, wiring together the
// internal actors that talk to etcd.
type Service struct {
	cfg     *internal.Config
	etcdCli *clientv3.Client

	cmdChan          *actor.CommandChannel      // serializes service-level commands (e.g. state resets)
	acquireActor     *internal.AcquireActor     // acquires queued lock requests
	releaseActor     *internal.ReleaseActor     // releases lock requests (immediately or delayed)
	providersActor   *internal.ProvidersActor   // mirrors global lock state via providers
	watchEtcdActor   *internal.WatchEtcdActor   // watches etcd and dispatches events
	leaseActor       *internal.LeaseActor       // expires leased lock requests
	serviceInfoActor *internal.ServiceInfoActor // tracks live services and their locks
}

// NewService dials etcd using the addresses/credentials in cfg, constructs
// the internal actors, wires their cross-references and event callbacks,
// and registers the initial lock providers. No background work starts
// here; call Serve for that.
func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error) {
	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{cfg.EtcdAddress},
		Username:    cfg.EtcdUsername,
		Password:    cfg.EtcdPassword,
		DialTimeout: time.Second * 5,
	})
	if err != nil {
		return nil, fmt.Errorf("new etcd client failed, err: %w", err)
	}

	svc := &Service{
		cfg:     cfg,
		etcdCli: etcdCli,
		cmdChan: actor.NewCommandChannel(),
	}

	svc.acquireActor = internal.NewAcquireActor(cfg, etcdCli)
	svc.releaseActor = internal.NewReleaseActor(cfg, etcdCli)
	svc.providersActor = internal.NewProvidersActor()
	svc.watchEtcdActor = internal.NewWatchEtcdActor(etcdCli)
	svc.leaseActor = internal.NewLeaseActor()
	svc.serviceInfoActor = internal.NewServiceInfoActor(cfg, etcdCli, internal.ServiceInfo{
		Description: cfg.ServiceDescription,
	})

	svc.acquireActor.Init(svc.providersActor)
	svc.leaseActor.Init(svc.releaseActor)
	svc.providersActor.Init()
	svc.watchEtcdActor.Init(
		// Lock-request events: apply to the local mirror first; on failure
		// the local state is considered broken and a full reset is queued.
		func(event internal.LockRequestEvent) {
			err := svc.providersActor.OnLockRequestEvent(event)
			if err != nil {
				logger.Std.Warnf("%s, will reset service state", err.Error())
				svc.cmdChan.Send(func() { svc.doResetState() })
				return
			}

			// State advanced: pending acquires may now succeed; release and
			// service-info actors also track request events.
			svc.acquireActor.TryAcquireNow()
			svc.releaseActor.OnLockRequestEvent(event)
			svc.serviceInfoActor.OnLockRequestEvent(event)
		},
		// Service (membership) events; failures also trigger a reset.
		func(event internal.ServiceEvent) {
			err := svc.serviceInfoActor.OnServiceEvent(event)
			if err != nil {
				logger.Std.Warnf("%s, will reset service state", err.Error())
				svc.cmdChan.Send(func() { svc.doResetState() })
			}
		},
		// Watch failure: resync everything from etcd.
		func(err error) {
			logger.Std.Warnf("%s, will reset service state", err.Error())
			svc.cmdChan.Send(func() { svc.doResetState() })
		},
	)
	svc.serviceInfoActor.Init(svc.releaseActor)

	for _, prov := range initProvs {
		svc.providersActor.AddProvider(prov.Provider, prov.Path...)
	}

	return svc, nil
}

// Acquire requests a batch of locks and returns the lock request ID on
// success. By default it waits up to 10 seconds; WithTimeout changes that
// (0 disables the timeout) and WithLease arms auto-release.
func (svc *Service) Acquire(req internal.LockRequest, opts ...AcquireOptionFn) (string, error) {
	var opt = AcquireOption{
		Timeout: time.Second * 10,
	}
	for _, fn := range opts {
		fn(&opt)
	}

	ctx := context.Background()
	if opt.Timeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, opt.Timeout)
		defer cancel()
	}

	reqID, err := svc.acquireActor.Acquire(ctx, req)
	if err != nil {
		return "", err
	}

	if opt.Lease > 0 {
		// Lease bookkeeping failure does not affect the acquire result;
		// log only (original TODO).
		err := svc.leaseActor.Add(reqID, opt.Lease)
		if err != nil {
			logger.Std.Warnf("adding lease: %s", err.Error())
		}
	}

	return reqID, nil
}

// Renew extends the lease of lock request reqID. Only meaningful when the
// lock was acquired with WithLease.
func (svc *Service) Renew(reqID string) error {
	return svc.leaseActor.Renew(reqID)
}

// Release hands the lock request reqID to the release actor and removes
// its lease entry, if any. Lease-removal failure is only logged.
func (svc *Service) Release(reqID string) {
	svc.releaseActor.Release([]string{reqID})

	// Does not affect the release itself; log only (original TODO).
	err := svc.leaseActor.Remove(reqID)
	if err != nil {
		logger.Std.Warnf("removing lease: %s", err.Error())
	}
}

// Serve starts every actor goroutine, queues the initial state reset, and
// then executes reset commands on this goroutine until the command channel
// is closed.
//
// TODO: there is no way to stop the service yet. Known issue: if the
// client process exits abruptly while AcquireActor is retrying, the etcd
// lock may be left held; the lease eventually expires, but clients are
// affected until then.
func (svc *Service) Serve() error {
	go svc.watchEtcdActor.Serve()
	go svc.leaseActor.Serve()
	go svc.acquireActor.Serve()
	go svc.releaseActor.Serve()

	// Populate local state from etcd before handling anything else.
	svc.cmdChan.Send(func() { svc.doResetState() })

	cmdChan := svc.cmdChan.BeginChanReceive()
	defer svc.cmdChan.CloseChanReceive()

	// Drain commands until the channel is closed. The previous
	// for/select-with-one-case form was equivalent (staticcheck S1000) but
	// made the final return unreachable and would invoke a nil command if
	// the channel ever closed.
	for cmd := range cmdChan {
		cmd()
	}

	return nil
}

// doResetState runs one state-reset attempt on the Serve goroutine. On
// failure it waits 3 seconds (blocking the command loop) and re-queues
// itself, retrying until resetState succeeds.
func (svc *Service) doResetState() {
	logger.Std.Infof("start reset state")
	// TODO: should honor a cancellable context.
	err := svc.resetState(context.Background())
	if err != nil {
		logger.Std.Warnf("reseting state: %s, will try again after 3 seconds", err.Error())
		<-time.After(time.Second * 3)
		svc.cmdChan.Send(func() { svc.doResetState() })
		return
	}
	logger.Std.WithField("ID", svc.serviceInfoActor.GetSelfInfo().ID).
		Infof("reset state success")
}

// resetState rebuilds the whole in-memory state from etcd. Note: once this
// function is entered, internal state must be treated as corrupted no
// matter where an error occurs, until a later call succeeds. While resets
// keep failing the service stays in maintenance mode: requests are
// accepted but not processed.
func (svc *Service) resetState(ctx context.Context) error {
	// Put every actor into maintenance mode first.
	svc.watchEtcdActor.Stop()
	svc.leaseActor.Stop()
	svc.acquireActor.EnterMaintenance()
	svc.releaseActor.EnterMaintenance()

	// All data must be fetched atomically in a single transaction.
	txResp, err := svc.etcdCli.Txn(ctx).
		Then(
			clientv3.OpGet(internal.EtcdLockRequestIndex),
			clientv3.OpGet(internal.EtcdLockRequestDataPrefix, clientv3.WithPrefix()),
			clientv3.OpGet(internal.EtcdServiceInfoPrefix, clientv3.WithPrefix()),
		).
		Commit()
	if err != nil {
		return fmt.Errorf("getting etcd data: %w", err)
	}

	// Parse the global lock-request index; a missing key means 0.
	var index int64 = 0
	indexKvs := txResp.Responses[0].GetResponseRange().Kvs
	if len(indexKvs) > 0 {
		val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64)
		if err != nil {
			return fmt.Errorf("parsing lock request index: %w", err)
		}
		index = val
	}

	// Parse all stored lock requests.
	var reqData []internal.LockRequestData
	lockKvs := txResp.Responses[1].GetResponseRange().Kvs
	for _, kv := range lockKvs {
		var req internal.LockRequestData
		err := serder.JSONToObject(kv.Value, &req)
		if err != nil {
			return fmt.Errorf("parsing lock request data: %w", err)
		}

		reqData = append(reqData, req)
	}

	// Parse all registered service infos.
	var svcInfo []internal.ServiceInfo
	svcInfoKvs := txResp.Responses[2].GetResponseRange().Kvs
	for _, kv := range svcInfoKvs {
		var info internal.ServiceInfo
		err := serder.JSONToObject(kv.Value, &info)
		if err != nil {
			return fmt.Errorf("parsing service info data: %w", err)
		}

		svcInfo = append(svcInfo, info)
	}

	// Load the freshly fetched state into the actors.
	releasingIDs, err := svc.serviceInfoActor.ResetState(ctx, svcInfo, reqData)
	if err != nil {
		return fmt.Errorf("reseting service info actor: %w", err)
	}

	// Must run before acquireActor's reset, because acquireActor calls
	// this actor's WaitLocalIndexTo.
	err = svc.providersActor.ResetState(index, reqData)
	if err != nil {
		return fmt.Errorf("reseting providers actor: %w", err)
	}

	svc.acquireActor.ResetState(svc.serviceInfoActor.GetSelfInfo().ID)

	// ReleaseActor has no state to reset; just hand it the IDs the
	// service-info actor decided should be released.
	svc.releaseActor.DelayRelease(releasingIDs)

	// Leave maintenance mode only after the reset completed.
	svc.watchEtcdActor.Start(txResp.Header.Revision)
	svc.leaseActor.Start()
	svc.acquireActor.LeaveMaintenance()
	svc.releaseActor.LeaveMaintenance()

	svc.acquireActor.TryAcquireNow()
	svc.releaseActor.TryReleaseNow()

	return nil
}

+ 0
- 313
pkgs/distlock/service/internal/main_actor.go View File

@@ -1,313 +0,0 @@
package internal

import (
"context"
"fmt"
"strconv"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
"go.etcd.io/etcd/client/v3/concurrency"
)

// etcd keys used by the lock service.
const (
	EtcdLockRequestData  = "/distlock/lockRequest/data"  // prefix for per-request data entries
	EtcdLockRequestIndex = "/distlock/lockRequest/index" // global lock-request index
	EtcdLockRequestLock  = "/distlock/lockRequest/lock"  // mutex key guarding index+data updates
)

// lockData is the serialized form of one lock inside a request.
type lockData struct {
	Path   []string `json:"path"`
	Name   string   `json:"name"`
	Target string   `json:"target"`
}

// acquireManyResult is the per-request outcome of AcquireMany.
type acquireManyResult struct {
	IsTried   bool   // whether this request was actually attempted
	RequestID string // assigned ID when the acquire succeeded
	Err       error  // acquire/submit error, if any
}

// LockRequestData is the serialized form of a whole lock request as stored
// in etcd.
type LockRequestData struct {
	ID    string     `json:"id"`
	Locks []lockData `json:"locks"`
}

// MainActor serializes all etcd lock-request mutations (acquire/release)
// through a single command channel.
type MainActor struct {
	cfg     *distlock.Config
	etcdCli *clientv3.Client

	commandChan *actor.CommandChannel

	providersActor *ProvidersActor

	// lease owned by this connection; granted in Serve.
	lockRequestLeaseID clientv3.LeaseID
}

// NewMainActor creates a MainActor; call Init and Serve before use.
func NewMainActor(cfg *distlock.Config, etcdCli *clientv3.Client) *MainActor {
	return &MainActor{
		cfg:         cfg,
		etcdCli:     etcdCli,
		commandChan: actor.NewCommandChannel(),
	}
}

// Init injects the ProvidersActor dependency (set after construction).
func (a *MainActor) Init(providersActor *ProvidersActor) {
	a.providersActor = providersActor
}

// Acquire requests one batch of locks and returns the lock request ID on
// success. It is a single-request convenience wrapper over AcquireMany.
func (a *MainActor) Acquire(ctx context.Context, req distlock.LockRequest) (reqID string, err error) {
	results, err := a.AcquireMany(ctx, []distlock.LockRequest{req})
	if err != nil {
		return "", err
	}

	first := results[0]
	if first.Err != nil {
		return "", first.Err
	}
	return first.RequestID, nil
}

// AcquireMany tries a list of lock requests while holding the global etcd
// request lock. The current implementation stops after the first request
// that is submitted successfully (note the break below); remaining entries
// keep their zero acquireManyResult (IsTried == false).
func (a *MainActor) AcquireMany(ctx context.Context, reqs []distlock.LockRequest) (rets []acquireManyResult, err error) {
	return actor.WaitValue(context.TODO(), a.commandChan, func() ([]acquireManyResult, error) {
		// TODO: classify errors so callers can react appropriately.
		unlock, err := a.acquireEtcdRequestDataLock(ctx)
		if err != nil {
			return nil, fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
		}
		defer unlock()

		index, err := a.getEtcdLockRequestIndex(ctx)
		if err != nil {
			return nil, err
		}

		// Wait until the local mirror has caught up with the etcd index.
		// TODO: make the wait time configurable.
		err = a.providersActor.WaitIndexUpdated(ctx, index)
		if err != nil {
			return nil, err
		}

		rets := make([]acquireManyResult, len(reqs))
		for i := 0; i < len(reqs); i++ {
			// Test the locks and build the serialized request data.
			reqData, err := a.providersActor.TestLockRequestAndMakeData(reqs[i])
			if err == nil {
				nextIndexStr := strconv.FormatInt(index+1, 10)
				reqData.ID = nextIndexStr

				// Lock test passed: submit the request data to etcd.
				err := a.submitLockRequest(ctx, reqData)

				rets[i] = acquireManyResult{
					IsTried:   true,
					RequestID: nextIndexStr,
					Err:       err,
				}

				break

			} else {
				rets[i] = acquireManyResult{
					IsTried: true,
					Err:     err,
				}
			}
		}

		return rets, nil
	})
}

// submitLockRequest writes the request data and bumps the request index in
// one etcd transaction. reqData.ID must already hold the new index value.
func (a *MainActor) submitLockRequest(ctx context.Context, reqData LockRequestData) error {
	reqBytes, err := serder.ObjectToJSON(reqData)
	if err != nil {
		return fmt.Errorf("serialize lock request data failed, err: %w", err)
	}

	var etcdOps []clientv3.Op
	if a.cfg.SubmitLockRequestWithoutLease {
		// Debug mode: no lease, so the request data survives a crash of
		// the submitting service.
		etcdOps = []clientv3.Op{
			clientv3.OpPut(EtcdLockRequestIndex, reqData.ID),
			clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)),
		}

	} else {
		etcdOps = []clientv3.Op{
			clientv3.OpPut(EtcdLockRequestIndex, reqData.ID),
			// Intended to be tied to this connection's lease so a crash
			// auto-unlocks. However the lease cannot be attached directly
			// to the request data: if this service crashed while another
			// service held the lock, the delete event would fire at once,
			// breaking the contract that nobody else modifies the data
			// during the lock — hence WithLease stays commented out.
			clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), //, clientv3.WithLease(a.lockRequestLeaseID)),
		}
	}
	txResp, err := a.etcdCli.Txn(ctx).Then(etcdOps...).Commit()
	if err != nil {
		return fmt.Errorf("submit lock request data failed, err: %w", err)
	}
	// NOTE(review): this Txn has no If condition, so Succeeded should
	// always be true here; this branch looks unreachable — confirm.
	if !txResp.Succeeded {
		return fmt.Errorf("submit lock request data failed for lock request data index changed")
	}

	return nil
}

// Release deletes lock request reqID from etcd and bumps the request
// index, holding the global request-data lock for the duration.
func (a *MainActor) Release(ctx context.Context, reqID string) error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		// TODO: classify errors so callers can react appropriately.
		unlock, err := a.acquireEtcdRequestDataLock(ctx)
		if err != nil {
			return fmt.Errorf("acquire etcd request data lock failed, err: %w", err)
		}
		defer unlock()

		index, err := a.getEtcdLockRequestIndex(ctx)
		if err != nil {
			return err
		}

		lockReqKey := makeEtcdLockRequestKey(reqID)

		// Delete + index bump only succeed if the request still exists.
		txResp, err := a.etcdCli.Txn(ctx).
			If(clientv3util.KeyExists(lockReqKey)).
			Then(clientv3.OpDelete(lockReqKey), clientv3.OpPut(EtcdLockRequestIndex, strconv.FormatInt(index+1, 10))).Commit()
		if err != nil {
			return fmt.Errorf("updating lock request data index: %w", err)
		}
		if !txResp.Succeeded {
			return fmt.Errorf("updating lock request data failed")
		}

		return nil
	})
}

// acquireEtcdRequestDataLock takes the global etcd mutex guarding the
// lock-request index/data, backed by a fresh lease-bound session. The
// returned unlock func releases the mutex and closes the session.
func (a *MainActor) acquireEtcdRequestDataLock(ctx context.Context) (unlock func(), err error) {
	lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec)
	if err != nil {
		return nil, fmt.Errorf("grant lease failed, err: %w", err)
	}

	session, err := concurrency.NewSession(a.etcdCli, concurrency.WithLease(lease.ID))
	if err != nil {
		return nil, fmt.Errorf("new session failed, err: %w", err)
	}

	mutex := concurrency.NewMutex(session, EtcdLockRequestLock)

	err = mutex.Lock(ctx)
	if err != nil {
		session.Close()
		return nil, fmt.Errorf("acquire lock failed, err: %w", err)
	}

	return func() {
		mutex.Unlock(context.Background())
		session.Close()
	}, nil
}

// getEtcdLockRequestIndex reads the current lock-request index from etcd;
// a missing key counts as index 0.
func (a *MainActor) getEtcdLockRequestIndex(ctx context.Context) (int64, error) {
	resp, err := a.etcdCli.Get(ctx, EtcdLockRequestIndex)
	if err != nil {
		return 0, fmt.Errorf("get lock request index failed, err: %w", err)
	}

	if len(resp.Kvs) == 0 {
		return 0, nil
	}

	idx, err := strconv.ParseInt(string(resp.Kvs[0].Value), 0, 64)
	if err != nil {
		return 0, fmt.Errorf("parse lock request index failed, err: %w", err)
	}
	return idx, nil
}

// ReloadEtcdData re-reads the request index and all lock-request data from
// etcd in one transaction (so no global lock is needed) and resets the
// ProvidersActor state from the result.
func (a *MainActor) ReloadEtcdData() error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		txResp, err := a.etcdCli.Txn(context.Background()).
			Then(
				clientv3.OpGet(EtcdLockRequestIndex),
				clientv3.OpGet(EtcdLockRequestData, clientv3.WithPrefix()),
			).
			Commit()
		if err != nil {
			return fmt.Errorf("get etcd data failed, err: %w", err)
		}
		if !txResp.Succeeded {
			return fmt.Errorf("get etcd data failed")
		}

		indexKvs := txResp.Responses[0].GetResponseRange().Kvs
		lockKvs := txResp.Responses[1].GetResponseRange().Kvs

		var index int64
		var reqData []LockRequestData

		// Parse the index; a missing key means 0.
		if len(indexKvs) > 0 {
			val, err := strconv.ParseInt(string(indexKvs[0].Value), 0, 64)
			if err != nil {
				return fmt.Errorf("parse lock request index failed, err: %w", err)
			}
			index = val

		} else {
			index = 0
		}

		// Parse every stored lock request.
		for _, kv := range lockKvs {
			var req LockRequestData
			err := serder.JSONToObject(kv.Value, &req)
			if err != nil {
				return fmt.Errorf("parse lock request data failed, err: %w", err)
			}

			reqData = append(reqData, req)
		}

		err = a.providersActor.ResetState(index, reqData)
		if err != nil {
			return fmt.Errorf("reset lock providers state failed, err: %w", err)
		}

		return nil
	})
}

// Serve grants this connection's lease, then executes queued commands
// until the command channel closes, at which point it returns an error.
//
// TODO (original): if the first command is an Acquire it may block waiting
// for local state to sync while ReloadEtcdData is still queued, causing a
// deadlock until the Acquire times out; commands therefore run on their
// own goroutines for now.
func (a *MainActor) Serve() error {
	lease, err := a.etcdCli.Grant(context.Background(), a.cfg.EtcdLockLeaseTimeSec)
	if err != nil {
		return fmt.Errorf("grant lease failed, err: %w", err)
	}
	a.lockRequestLeaseID = lease.ID

	cmdChan := a.commandChan.BeginChanReceive()
	defer a.commandChan.CloseChanReceive()

	// Range replaces the previous single-case select (staticcheck S1000)
	// and exits cleanly when the channel is closed.
	for cmd := range cmdChan {
		go cmd()
	}

	return fmt.Errorf("command channel closed")
}

+ 0
- 216
pkgs/distlock/service/internal/providers_actor.go View File

@@ -1,216 +0,0 @@
package internal

import (
"context"
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/trie"
)

// indexWaiter pairs a target lock-request index with the future to
// complete once the local index reaches it.
type indexWaiter struct {
	Index  int64
	Future *future.SetVoidFuture
}

// ProvidersActor maintains the local mirror of lock state: a trie of
// LockProviders keyed by lock path plus the index of the last applied
// lock-request event.
type ProvidersActor struct {
	localLockReqIndex int64                            // index of the last applied event
	provdersTrie      trie.Trie[distlock.LockProvider] // NOTE: field name typo ("providers") kept as-is
	allProviders      []distlock.LockProvider          // flat list, used for bulk Clear in ResetState

	indexWaiters []indexWaiter // callers blocked in WaitIndexUpdated

	commandChan *actor.CommandChannel
}

// NewProvidersActor creates an empty ProvidersActor; start it with Serve.
func NewProvidersActor() *ProvidersActor {
	return &ProvidersActor{
		commandChan: actor.NewCommandChannel(),
	}
}

// AddProvider registers prov at the given trie path. Note: it mutates the
// actor's state directly, bypassing the command channel.
func (a *ProvidersActor) AddProvider(prov distlock.LockProvider, path ...any) {
	a.provdersTrie.Create(path).Value = prov
	a.allProviders = append(a.allProviders, prov)
}

// Init is a no-op; present for symmetry with the other actors.
func (a *ProvidersActor) Init() {
}

// WaitIndexUpdated blocks until the local lock-request index reaches index
// or ctx is cancelled. The check and the waiter registration both run on
// the actor goroutine, so there is no race with event application.
func (a *ProvidersActor) WaitIndexUpdated(ctx context.Context, index int64) error {
	fut := future.NewSetVoid()

	a.commandChan.Send(func() {
		if index <= a.localLockReqIndex {
			fut.SetVoid()
		} else {
			a.indexWaiters = append(a.indexWaiters, indexWaiter{
				Index:  index,
				Future: fut,
			})
		}
	})

	return fut.Wait(ctx)
}

// ApplyLockRequestEvents applies a batch of lock/unlock events to the
// local providers, advances the local index by the number of events, and
// wakes any index waiters that are now satisfied.
func (a *ProvidersActor) ApplyLockRequestEvents(events []LockRequestEvent) error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		for _, op := range events {
			if op.IsLocking {
				err := a.lockLockRequest(op.Data)
				if err != nil {
					return fmt.Errorf("lock by lock request data failed, err: %w", err)
				}

			} else {
				err := a.unlockLockRequest(op.Data)
				if err != nil {
					return fmt.Errorf("unlock by lock request data failed, err: %w", err)
				}
			}
		}

		// The index advances by exactly one per processed event.
		a.localLockReqIndex += int64(len(events))

		// Wake waiters whose target index has been reached.
		a.checkIndexWaiter()

		return nil
	})
}

// lockLockRequest applies every lock in reqData to its provider (looked up
// by path in the trie). Must run on the actor goroutine.
// Receiver renamed svc -> a for consistency with the other ProvidersActor
// methods.
func (a *ProvidersActor) lockLockRequest(reqData LockRequestData) error {
	for _, lockData := range reqData.Locks {
		node, ok := a.provdersTrie.WalkEnd(lockData.Path)
		if !ok || node.Value == nil {
			return fmt.Errorf("lock provider not found for path %v", lockData.Path)
		}

		target, err := node.Value.ParseTargetString(lockData.Target)
		if err != nil {
			return fmt.Errorf("parse target data failed, err: %w", err)
		}

		err = node.Value.Lock(reqData.ID, distlock.Lock{
			Path:   lockData.Path,
			Name:   lockData.Name,
			Target: target,
		})
		if err != nil {
			return fmt.Errorf("locking failed, err: %w", err)
		}
	}
	return nil
}

// unlockLockRequest reverts every lock in reqData on its provider. Must
// run on the actor goroutine.
// Receiver renamed svc -> a for consistency with the other ProvidersActor
// methods.
func (a *ProvidersActor) unlockLockRequest(reqData LockRequestData) error {
	for _, lockData := range reqData.Locks {
		node, ok := a.provdersTrie.WalkEnd(lockData.Path)
		if !ok || node.Value == nil {
			return fmt.Errorf("lock provider not found for path %v", lockData.Path)
		}

		target, err := node.Value.ParseTargetString(lockData.Target)
		if err != nil {
			return fmt.Errorf("parse target data failed, err: %w", err)
		}

		err = node.Value.Unlock(reqData.ID, distlock.Lock{
			Path:   lockData.Path,
			Name:   lockData.Name,
			Target: target,
		})
		if err != nil {
			return fmt.Errorf("unlocking failed, err: %w", err)
		}
	}
	return nil
}

// TestLockRequestAndMakeData checks whether req could be locked right now
// and builds its serialized representation. It does NOT assign a request
// ID. Each lock is tested in isolation, so locks within the same request
// are allowed to conflict with each other.
func (a *ProvidersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (LockRequestData, error) {
	return actor.WaitValue(context.TODO(), a.commandChan, func() (LockRequestData, error) {
		reqData := LockRequestData{}

		for _, lock := range req.Locks {
			n, ok := a.provdersTrie.WalkEnd(lock.Path)
			if !ok || n.Value == nil {
				return LockRequestData{}, fmt.Errorf("lock provider not found for path %v", lock.Path)
			}

			err := n.Value.CanLock(lock)
			if err != nil {
				return LockRequestData{}, err
			}

			targetStr, err := n.Value.GetTargetString(lock.Target)
			if err != nil {
				return LockRequestData{}, fmt.Errorf("get lock target string failed, err: %w", err)
			}

			reqData.Locks = append(reqData.Locks, lockData{
				Path:   lock.Path,
				Name:   lock.Name,
				Target: targetStr,
			})
		}

		return reqData, nil
	})
}

// ResetState clears every provider and replays the given lock requests,
// then sets the local index and wakes satisfied index waiters.
func (a *ProvidersActor) ResetState(index int64, lockRequestData []LockRequestData) error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		for _, p := range a.allProviders {
			p.Clear()
		}

		for _, reqData := range lockRequestData {
			err := a.lockLockRequest(reqData)
			if err != nil {
				return fmt.Errorf("lock by lock request data failed, err: %w", err)
			}
		}

		a.localLockReqIndex = index

		// Wake waiters whose target index has been reached.
		a.checkIndexWaiter()

		return nil
	})
}

// checkIndexWaiter completes every waiter whose target index has been
// reached and keeps the remaining ones registered.
func (a *ProvidersActor) checkIndexWaiter() {
	var pending []indexWaiter
	for _, w := range a.indexWaiters {
		if w.Index > a.localLockReqIndex {
			pending = append(pending, w)
			continue
		}
		w.Future.SetVoid()
	}
	a.indexWaiters = pending
}

// Serve runs the actor loop, executing queued commands until the command
// channel is closed; it then returns an error.
func (a *ProvidersActor) Serve() error {
	cmdChan := a.commandChan.BeginChanReceive()
	defer a.commandChan.CloseChanReceive()

	// Range replaces the previous single-case select (staticcheck S1000)
	// and exits cleanly when the channel is closed.
	for cmd := range cmdChan {
		cmd()
	}

	return fmt.Errorf("command channel closed")
}

+ 0
- 128
pkgs/distlock/service/internal/retry_actor.go View File

@@ -1,128 +0,0 @@
package internal

import (
"context"
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
)

// retryInfo tracks one pending retry: the future the caller waits on plus
// the most recent acquire error (reported if the retry eventually expires).
type retryInfo struct {
	Callback *future.SetValueFuture[string]
	LastErr  error
}

// RetryActor queues failed lock requests and retries them whenever the
// local lock state changes. retrys and retryInfos are parallel slices.
type RetryActor struct {
	retrys     []distlock.LockRequest
	retryInfos []*retryInfo

	commandChan *actor.CommandChannel

	mainActor *MainActor
}

// NewRetryActor creates a RetryActor; call Init and Serve before use.
func NewRetryActor() *RetryActor {
	return &RetryActor{
		commandChan: actor.NewCommandChannel(),
	}
}

// Init injects the MainActor used to re-attempt acquires.
func (a *RetryActor) Init(mainActor *MainActor) {
	a.mainActor = mainActor
}

// Retry enqueues req for retrying after a failed acquire; lastErr is the
// error to report if no retry succeeds. The returned future completes with
// the request ID on success, or with the last recorded error once ctx is
// done.
func (a *RetryActor) Retry(ctx context.Context, req distlock.LockRequest, lastErr error) (future.ValueFuture[string], error) {
	fut := future.NewSetValue[string]()

	var info *retryInfo
	err := actor.Wait(ctx, a.commandChan, func() error {
		a.retrys = append(a.retrys, req)
		info = &retryInfo{
			Callback: fut,
			LastErr:  lastErr,
		}
		a.retryInfos = append(a.retryInfos, info)
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Watch for ctx cancellation and fail the pending retry with its last
	// recorded error.
	go func() {
		<-ctx.Done()
		a.commandChan.Send(func() {
			// Futures are only completed from within commands, so
			// IsComplete is a reliable guard for the steps below.
			if fut.IsComplete() {
				return
			}

			index := lo.IndexOf(a.retryInfos, info)
			if index == -1 {
				return
			}

			a.retryInfos[index].Callback.SetError(a.retryInfos[index].LastErr)

			a.retrys = mylo.RemoveAt(a.retrys, index)
			a.retryInfos = mylo.RemoveAt(a.retryInfos, index)
		})
	}()

	return fut, nil
}

// OnLocalStateUpdated re-attempts all queued lock requests after the local
// lock state changed. Successful entries are completed and compacted out
// of the parallel slices; failed ones stay queued with an updated LastErr.
func (a *RetryActor) OnLocalStateUpdated() {
	a.commandChan.Send(func() {
		if len(a.retrys) == 0 {
			return
		}

		rets, err := a.mainActor.AcquireMany(context.Background(), a.retrys)
		if err != nil {
			// TODO: real error handling.
			logger.Std.Warnf("acquire many lock requests failed, err: %s", err.Error())
			return
		}

		// Compact in place: entry i is kept at position i-delCnt, where
		// delCnt counts the entries completed so far.
		delCnt := 0
		for i, ret := range rets {
			a.retrys[i-delCnt] = a.retrys[i]
			a.retryInfos[i-delCnt] = a.retryInfos[i]

			if !ret.IsTried {
				continue
			}

			if ret.Err != nil {
				// retryInfos holds pointers, so updating index i also
				// updates the compacted slot.
				a.retryInfos[i].LastErr = ret.Err
			} else {
				a.retryInfos[i].Callback.SetValue(ret.RequestID)
				delCnt++
			}
		}
		a.retrys = a.retrys[:len(a.retrys)-delCnt]
		a.retryInfos = a.retryInfos[:len(a.retryInfos)-delCnt]
	})
}

// Serve runs the actor loop, executing queued commands until the command
// channel is closed; it then returns an error.
func (a *RetryActor) Serve() error {
	cmdChan := a.commandChan.BeginChanReceive()
	defer a.commandChan.CloseChanReceive()

	// Range replaces the previous single-case select (staticcheck S1000)
	// and exits cleanly when the channel is closed.
	for cmd := range cmdChan {
		cmd()
	}

	return fmt.Errorf("command channel closed")
}

+ 0
- 116
pkgs/distlock/service/internal/utils.go View File

@@ -1,116 +0,0 @@
package internal

import (
"strings"
)

// makeEtcdLockRequestKey builds the etcd key that stores the data of the
// lock request reqID.
func makeEtcdLockRequestKey(reqID string) string {
	return strings.Join([]string{EtcdLockRequestData, reqID}, "/")
}

// getLockRequestID extracts the request ID from a lock-request etcd key.
// Keys without the expected prefix are returned unchanged.
func getLockRequestID(key string) string {
	prefix := EtcdLockRequestData + "/"
	return strings.TrimPrefix(key, prefix)
}

/*
func parseLockData(str string) (lock lockData, err error) {
sb := strings.Builder{}
var comps []string

escaping := false
for _, ch := range strings.TrimSpace(str) {
if escaping {
if ch == 'n' {
sb.WriteRune('\n')
} else {
sb.WriteRune(ch)
}

escaping = false
continue
}

if ch == '/' {
comps = append(comps, sb.String())
sb.Reset()
} else if ch == '\\' {
escaping = true
} else {
sb.WriteRune(ch)
}
}

comps = append(comps, sb.String())

if len(comps) < 3 {
return lockData{}, fmt.Errorf("string must includes 3 components devided by /")
}

return lockData{
Path: comps[0 : len(comps)-2],
Name: comps[len(comps)-2],
Target: comps[len(comps)-1],
}, nil
}

func lockDataToString(lock lockData) string {
sb := strings.Builder{}

for _, s := range lock.Path {
sb.WriteString(lockDataEncoding(s))
sb.WriteRune('/')
}

sb.WriteString(lockDataEncoding(lock.Name))
sb.WriteRune('/')

sb.WriteString(lockDataEncoding(lock.Target))

return sb.String()
}

func lockDataEncoding(str string) string {
sb := strings.Builder{}

for _, ch := range str {
if ch == '\\' {
sb.WriteString("\\\\")
} else if ch == '/' {
sb.WriteString("\\/")
} else if ch == '\n' {
sb.WriteString("\\n")
} else {
sb.WriteRune(ch)
}
}

return sb.String()
}

func lockDataDecoding(str string) string {
sb := strings.Builder{}

escaping := false
for _, ch := range str {
if escaping {
if ch == 'n' {
sb.WriteRune('\n')
} else {
sb.WriteRune(ch)
}

escaping = false
continue
}

if ch == '\\' {
escaping = true

} else {
sb.WriteRune(ch)
}
}

return sb.String()
}
*/

+ 0
- 61
pkgs/distlock/service/internal/utils_test.go View File

@@ -1,61 +0,0 @@
package internal

/*
import (
. "github.com/smartystreets/goconvey/convey"
)

func Test_parseLockData_lockDataToString(t *testing.T) {
cases := []struct {
title string
data lockData
}{
{
title: "多段路径",
data: lockData{
Path: []string{"a", "b", "c"},
Name: "d",
Target: "e",
},
},

{
title: "包含分隔符",
data: lockData{
Path: []string{"a/", "b", "c/c"},
Name: "/d",
Target: "///e//d/",
},
},

{
title: "包含转义符",
data: lockData{
Path: []string{"a\\/", "b", "\\c/c"},
Name: "/d",
Target: "///e\\//d/\\",
},
},

{
title: "包含换行符",
data: lockData{
Path: []string{"a\n", "\nb", "c\nc"},
Name: "/d",
Target: "e\nd\n",
},
},
}

for _, ca := range cases {
Convey(ca.title, t, func() {
str := lockDataToString(ca.data)

data, err := parseLockData(str)

So(err, ShouldBeNil)
So(data, ShouldResemble, ca.data)
})
}
}
*/

+ 0
- 149
pkgs/distlock/service/internal/watch_etcd_actor.go View File

@@ -1,149 +0,0 @@
package internal

import (
"context"
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/actor"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)

// LockRequestEvent is one applied change to the etcd lock-request data: a
// request was created (IsLocking) or deleted (!IsLocking).
type LockRequestEvent struct {
	IsLocking bool
	Data      LockRequestData
}

// LockRequestEventWatcher receives batches of parsed lock-request events.
type LockRequestEventWatcher struct {
	OnEvent func(events []LockRequestEvent)
}

// WatchEtcdActor watches the etcd lock-request prefix and fans parsed
// events out to registered watchers.
type WatchEtcdActor struct {
	etcdCli         *clientv3.Client
	watchChan       clientv3.WatchChan // nil while watching is stopped
	lockReqWatchers []*LockRequestEventWatcher

	commandChan *actor.CommandChannel
}

// NewWatchEtcdActor creates a WatchEtcdActor; watching starts only after
// StartWatching is called.
func NewWatchEtcdActor(etcdCli *clientv3.Client) *WatchEtcdActor {
	return &WatchEtcdActor{
		etcdCli:     etcdCli,
		commandChan: actor.NewCommandChannel(),
	}
}

// Init is a no-op; present for symmetry with the other actors.
func (a *WatchEtcdActor) Init() {
}

// StartWatching opens the etcd watch on the lock-request prefix (with
// previous values, so deletions can be parsed too).
func (a *WatchEtcdActor) StartWatching() error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		a.watchChan = a.etcdCli.Watch(context.Background(), EtcdLockRequestData, clientv3.WithPrefix(), clientv3.WithPrevKV())
		return nil
	})
}

// StopWatching drops the watch channel; the Serve loop stops consuming
// watch events until StartWatching is called again.
func (a *WatchEtcdActor) StopWatching() error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		a.watchChan = nil
		return nil
	})
}

// AddEventWatcher registers watcher to receive future event batches.
func (a *WatchEtcdActor) AddEventWatcher(watcher *LockRequestEventWatcher) error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		a.lockReqWatchers = append(a.lockReqWatchers, watcher)
		return nil
	})
}

// RemoveEventWatcher unregisters a previously added watcher.
func (a *WatchEtcdActor) RemoveEventWatcher(watcher *LockRequestEventWatcher) error {
	return actor.Wait(context.TODO(), a.commandChan, func() error {
		a.lockReqWatchers = mylo.Remove(a.lockReqWatchers, watcher)
		return nil
	})
}

// Serve runs the actor loop. While a watch channel is active it
// multiplexes commands and watch messages; otherwise it only executes
// commands. Any watch failure terminates the loop with an error.
func (a *WatchEtcdActor) Serve() error {
	cmdChan := a.commandChan.BeginChanReceive()
	defer a.commandChan.CloseChanReceive()

	for {
		if a.watchChan == nil {
			// Not watching: just execute commands. A single-case select
			// is equivalent to a direct receive (staticcheck S1000).
			cmd, ok := <-cmdChan
			if !ok {
				return fmt.Errorf("command channel closed")
			}
			cmd()
			continue
		}

		select {
		case cmd, ok := <-cmdChan:
			if !ok {
				return fmt.Errorf("command channel closed")
			}

			cmd()

		case msg := <-a.watchChan:
			if msg.Canceled {
				// TODO: better error handling.
				return fmt.Errorf("watch etcd channel closed")
			}

			events, err := a.parseEvents(msg)
			if err != nil {
				// TODO: better error handling.
				return fmt.Errorf("parse etcd lock request data failed, err: %w", err)
			}

			// Fan the batch out to all registered watchers.
			for _, w := range a.lockReqWatchers {
				w.OnEvent(events)
			}
		}
	}
}

// parseEvents converts an etcd watch response into lock-request events.
// Only creations and deletions are considered — by design only those two
// event types affect the request index. Deletions are parsed from the
// previous value (requires WithPrevKV on the watch).
func (a *WatchEtcdActor) parseEvents(watchResp clientv3.WatchResponse) ([]LockRequestEvent, error) {
	var events []LockRequestEvent

	for _, e := range watchResp.Events {

		shouldParseData := false
		isLocking := true
		var valueData []byte

		if e.Type == clientv3.EventTypeDelete {
			shouldParseData = true
			isLocking = false
			valueData = e.PrevKv.Value
		} else if e.IsCreate() {
			shouldParseData = true
			isLocking = true
			valueData = e.Kv.Value
		}

		// Modifications (non-create puts) are ignored.
		if !shouldParseData {
			continue
		}

		var reqData LockRequestData
		err := serder.JSONToObject(valueData, &reqData)
		if err != nil {
			return nil, fmt.Errorf("parse lock request data failed, err: %w", err)
		}

		events = append(events, LockRequestEvent{
			IsLocking: isLocking,
			Data:      reqData,
		})
	}

	return events, nil
}

+ 0
- 234
pkgs/distlock/service/service.go View File

@@ -1,234 +0,0 @@
package service

import (
"context"
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/distlock/service/internal"
"gitlink.org.cn/cloudream/common/pkgs/logger"
clientv3 "go.etcd.io/etcd/client/v3"
)

// AcquireOption configures a single Acquire call.
type AcquireOption struct {
	Timeout time.Duration // how long Acquire may wait; 0 disables the timeout
	Lease   time.Duration // if >0, the lock auto-expires unless renewed within this duration
}

// AcquireOptionFn mutates an AcquireOption; used as a functional option for Acquire.
type AcquireOptionFn func(opt *AcquireOption)

// WithTimeout sets how long Acquire may wait for the locks before failing.
func WithTimeout(timeout time.Duration) AcquireOptionFn {
	return func(opt *AcquireOption) {
		opt.Timeout = timeout
	}
}

// WithLease makes the acquired lock auto-release unless it is renewed
// within d. The parameter was renamed from `time`, which shadowed the
// standard time package inside the closure.
func WithLease(d time.Duration) AcquireOptionFn {
	return func(opt *AcquireOption) {
		opt.Lease = d
	}
}

// PathProvider binds a LockProvider to the lock-path it serves.
type PathProvider struct {
	Path     []any                 // path components under which the provider is registered
	Provider distlock.LockProvider // provider handling locks under Path
}

// NewPathProvider constructs a PathProvider for prov rooted at path.
func NewPathProvider(prov distlock.LockProvider, path ...any) PathProvider {
	return PathProvider{
		Path:     path,
		Provider: prov,
	}
}

// Service is the distributed-lock service facade built around a MainActor
// (etcd mutations), ProvidersActor (local state), WatchEtcdActor,
// LeaseActor and RetryActor.
type Service struct {
	cfg     *distlock.Config
	etcdCli *clientv3.Client

	mainActor      *internal.MainActor
	providersActor *internal.ProvidersActor
	watchEtcdActor *internal.WatchEtcdActor
	leaseActor     *internal.LeaseActor
	retryActor     *internal.RetryActor

	// lockReqEventWatcher forwards etcd events to the providers and retry
	// actors; its callback is assigned in Serve.
	lockReqEventWatcher internal.LockRequestEventWatcher
}

// NewService dials etcd with the settings in cfg, builds and cross-wires
// the internal actors, and registers the initial lock providers.
// Background work starts in Serve.
func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error) {
	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{cfg.EtcdAddress},
		Username:    cfg.EtcdUsername,
		Password:    cfg.EtcdPassword,
		DialTimeout: time.Second * 5,
	})
	if err != nil {
		return nil, fmt.Errorf("new etcd client failed, err: %w", err)
	}

	mainActor := internal.NewMainActor(cfg, etcdCli)
	providersActor := internal.NewProvidersActor()
	watchEtcdActor := internal.NewWatchEtcdActor(etcdCli)
	leaseActor := internal.NewLeaseActor()
	retryActor := internal.NewRetryActor()

	// Cross-wire the actors after construction.
	mainActor.Init(providersActor)
	providersActor.Init()
	watchEtcdActor.Init()
	leaseActor.Init(mainActor)
	retryActor.Init(mainActor)

	for _, prov := range initProvs {
		providersActor.AddProvider(prov.Provider, prov.Path...)
	}

	return &Service{
		cfg:            cfg,
		etcdCli:        etcdCli,
		mainActor:      mainActor,
		providersActor: providersActor,
		watchEtcdActor: watchEtcdActor,
		leaseActor:     leaseActor,
		retryActor:     retryActor,
	}, nil
}

// Acquire requests a batch of locks and returns the lock request ID. On
// first failure the request is handed to the RetryActor, which keeps
// retrying until success or until the (default 10s) timeout expires.
func (svc *Service) Acquire(req distlock.LockRequest, opts ...AcquireOptionFn) (string, error) {
	var opt = AcquireOption{
		Timeout: time.Second * 10,
	}
	for _, fn := range opts {
		fn(&opt)
	}

	ctx := context.Background()
	if opt.Timeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, opt.Timeout)
		defer cancel()
	}

	reqID, err := svc.mainActor.Acquire(ctx, req)
	if err != nil {
		fut, err := svc.retryActor.Retry(ctx, req, err)
		if err != nil {
			return "", fmt.Errorf("retrying failed, err: %w", err)
		}

		// If the retry times out, Retry fails the future internally, so an
		// unbounded wait here is safe.
		reqID, err = fut.WaitValue(context.Background())
		if err != nil {
			return "", err
		}
	}

	if opt.Lease > 0 {
		// Lease bookkeeping failure does not affect the acquire result;
		// log only (original TODO).
		err := svc.leaseActor.Add(reqID, opt.Lease)
		if err != nil {
			logger.Std.Warnf("adding lease: %s", err.Error())
		}
	}

	return reqID, nil
}

// Renew extends the lease of lock request reqID. Only meaningful when the
// lock was acquired with WithLease.
func (svc *Service) Renew(reqID string) error {
	return svc.leaseActor.Renew(reqID)
}

// Release releases lock request reqID via the MainActor and removes its
// lease entry, if any. Lease-removal failure is only logged; the release
// error is returned.
func (svc *Service) Release(reqID string) error {
	err := svc.mainActor.Release(context.TODO(), reqID)

	// Does not affect the release result; log only (original TODO).
	err2 := svc.leaseActor.Remove(reqID)
	if err2 != nil {
		logger.Std.Warnf("removing lease: %s", err2.Error())
	}

	return err
}

// Serve starts every actor goroutine, loads the initial state from etcd,
// wires the event watcher, starts watching and lease checking, and then
// blocks forever.
//
// TODO: there is no way to stop the service. Known issue: if the client
// process exits abruptly while RetryActor is retrying, the etcd lock may
// be left held until its lease expires, affecting other clients meanwhile.
func (svc *Service) Serve() error {
	go func() {
		// TODO: propagate errors.
		err := svc.providersActor.Serve()
		if err != nil {
			logger.Std.Warnf("serving providers actor failed, err: %s", err.Error())
		}
	}()

	go func() {
		// TODO: propagate errors.
		err := svc.watchEtcdActor.Serve()
		if err != nil {
			logger.Std.Warnf("serving watch etcd actor actor failed, err: %s", err.Error())
		}
	}()

	go func() {
		// TODO: propagate errors.
		err := svc.mainActor.Serve()
		if err != nil {
			logger.Std.Warnf("serving main actor failed, err: %s", err.Error())
		}
	}()

	go func() {
		// TODO: propagate errors.
		err := svc.leaseActor.Serve()
		if err != nil {
			logger.Std.Warnf("serving lease actor failed, err: %s", err.Error())
		}
	}()

	go func() {
		// TODO: propagate errors.
		err := svc.retryActor.Serve()
		if err != nil {
			logger.Std.Warnf("serving retry actor failed, err: %s", err.Error())
		}
	}()

	err := svc.mainActor.ReloadEtcdData()
	if err != nil {
		// TODO: shut down the other actors, or handle this better.
		return fmt.Errorf("init data failed, err: %w", err)
	}

	// Forward parsed etcd events to the local mirror, then let the retry
	// actor re-attempt queued requests against the new state.
	svc.lockReqEventWatcher.OnEvent = func(events []internal.LockRequestEvent) {
		svc.providersActor.ApplyLockRequestEvents(events)
		svc.retryActor.OnLocalStateUpdated()
	}
	err = svc.watchEtcdActor.AddEventWatcher(&svc.lockReqEventWatcher)
	if err != nil {
		// TODO: shut down the other actors, or handle this better.
		return fmt.Errorf("add lock request event watcher failed, err: %w", err)
	}

	err = svc.watchEtcdActor.StartWatching()
	if err != nil {
		// TODO: shut down the other actors, or handle this better.
		return fmt.Errorf("start watching etcd failed, err: %w", err)
	}

	err = svc.leaseActor.StartChecking()
	if err != nil {
		// TODO: shut down the other actors, or handle this better.
		return fmt.Errorf("start checking lease failed, err: %w", err)
	}

	// Temporary way to keep Serve from returning.
	ch := make(chan any)
	<-ch

	return nil
}

+ 2
- 0
pkgs/future/set_value_future.go View File

@@ -48,6 +48,8 @@ func (f *SetValueFuture[T]) IsComplete() bool {
return f.isCompleted
}

// 等待直到Complete或者ctx被取消。
// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete
func (f *SetValueFuture[T]) Wait(ctx context.Context) error {
select {
case <-f.completeChan:


+ 2
- 1
sdks/imfs/models.go View File

@@ -1,5 +1,6 @@
package imsdk

const (
EnvPackageList = "IMFS_PACKAGE_LIST"
EnvPackageList = "IMFS_PACKAGE_LIST"
EnvServiceAddress = "IMFS_SERVICE_ADDRESS"
)

Loading…
Cancel
Save