Browse Source

调整代码结构

pull/29/head
Sydonian 2 years ago
parent
commit
ede0aa5f14
12 changed files with 85 additions and 75 deletions
  1. +8
    -30
      pkgs/distlock/distlock.go
  2. +4
    -21
      pkgs/distlock/internal/acquire_actor.go
  3. +1
    -1
      pkgs/distlock/internal/config.go
  4. +0
    -0
      pkgs/distlock/internal/lease_actor.go
  5. +52
    -0
      pkgs/distlock/internal/models.go
  6. +6
    -7
      pkgs/distlock/internal/providers_actor.go
  7. +2
    -3
      pkgs/distlock/internal/release_actor.go
  8. +0
    -0
      pkgs/distlock/internal/utils.go
  9. +0
    -0
      pkgs/distlock/internal/utils_test.go
  10. +0
    -0
      pkgs/distlock/internal/watch_etcd_actor.go
  11. +4
    -4
      pkgs/distlock/mutex.go
  12. +8
    -9
      pkgs/distlock/service.go

+ 8
- 30
pkgs/distlock/distlock.go View File

@@ -1,40 +1,18 @@
package distlock

import "fmt"
import (
"fmt"

type Lock struct {
Path []string // 锁路径,存储的是路径的每一部分
Name string // 锁名
Target any // 锁对象,由具体的Provider去解析
}

type LockRequest struct {
Locks []Lock
}

func (b *LockRequest) Add(lock Lock) {
b.Locks = append(b.Locks, lock)
}

type LockProvider interface {
// CanLock 判断这个锁能否锁定成功
CanLock(lock Lock) error
"gitlink.org.cn/cloudream/common/pkgs/distlock/internal"
)

// Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。
Lock(reqID string, lock Lock) error
type Lock = internal.Lock

// 解锁
Unlock(reqID string, lock Lock) error
type LockRequest = internal.LockRequest

// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD
GetTargetString(target any) (string, error)
type LockProvider = internal.LockProvider

// ParseTargetString 解析字符串格式的锁对象数据
ParseTargetString(targetStr string) (any, error)

// Clear 清除内部所有状态
Clear()
}
type Config = internal.Config

type LockTargetBusyError struct {
lockName string


pkgs/distlock/service/internal/acquire_actor.go → pkgs/distlock/internal/acquire_actor.go View File

@@ -7,7 +7,6 @@ import (
"strconv"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
@@ -18,30 +17,14 @@ import (

var ErrAcquiringTimeout = errors.New("acquiring timeout")

const (
EtcdLockRequestData = "/distlock/lockRequest/data"
EtcdLockRequestIndex = "/distlock/lockRequest/index"
EtcdLockRequestLock = "/distlock/lockRequest/lock"
)

type lockData struct {
Path []string `json:"path"`
Name string `json:"name"`
Target string `json:"target"`
}
type LockRequestData struct {
ID string `json:"id"`
Locks []lockData `json:"locks"`
}

type acquireInfo struct {
Request distlock.LockRequest
Request LockRequest
Callback *future.SetValueFuture[string]
LastErr error
}

type AcquireActor struct {
cfg *distlock.Config
cfg *Config
etcdCli *clientv3.Client
providersActor *ProvidersActor

@@ -49,7 +32,7 @@ type AcquireActor struct {
lock sync.Mutex
}

func NewAcquireActor(cfg *distlock.Config, etcdCli *clientv3.Client) *AcquireActor {
func NewAcquireActor(cfg *Config, etcdCli *clientv3.Client) *AcquireActor {
return &AcquireActor{
cfg: cfg,
etcdCli: etcdCli,
@@ -61,7 +44,7 @@ func (a *AcquireActor) Init(providersActor *ProvidersActor) {
}

// Acquire 请求一批锁。成功后返回锁请求ID
func (a *AcquireActor) Acquire(ctx context.Context, req distlock.LockRequest) (string, error) {
func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, error) {
info := &acquireInfo{
Request: req,
Callback: future.NewSetValue[string](),

pkgs/distlock/config.go → pkgs/distlock/internal/config.go View File

@@ -1,4 +1,4 @@
package distlock
package internal

type Config struct {
EtcdAddress string `json:"etcdAddress"`

pkgs/distlock/service/internal/lease_actor.go → pkgs/distlock/internal/lease_actor.go View File


+ 52
- 0
pkgs/distlock/internal/models.go View File

@@ -0,0 +1,52 @@
package internal

const (
EtcdLockRequestData = "/distlock/lockRequest/data"
EtcdLockRequestIndex = "/distlock/lockRequest/index"
EtcdLockRequestLock = "/distlock/lockRequest/lock"
)

type Lock struct {
Path []string // 锁路径,存储的是路径的每一部分
Name string // 锁名
Target any // 锁对象,由具体的Provider去解析
}

type LockRequest struct {
Locks []Lock
}

func (b *LockRequest) Add(lock Lock) {
b.Locks = append(b.Locks, lock)
}

type LockProvider interface {
// CanLock 判断这个锁能否锁定成功
CanLock(lock Lock) error

// Lock 锁定。由于同一个锁请求内的锁不检查冲突,因此这个函数必须支持有冲突的锁进行锁定。
Lock(reqID string, lock Lock) error

// 解锁
Unlock(reqID string, lock Lock) error

// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD
GetTargetString(target any) (string, error)

// ParseTargetString 解析字符串格式的锁对象数据
ParseTargetString(targetStr string) (any, error)

// Clear 清除内部所有状态
Clear()
}

type lockData struct {
Path []string `json:"path"`
Name string `json:"name"`
Target string `json:"target"`
}

type LockRequestData struct {
ID string `json:"id"`
Locks []lockData `json:"locks"`
}

pkgs/distlock/service/internal/providers_actor.go → pkgs/distlock/internal/providers_actor.go View File

@@ -5,7 +5,6 @@ import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/actor"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/trie"
@@ -18,8 +17,8 @@ type indexWaiter struct {

type ProvidersActor struct {
localLockReqIndex int64
provdersTrie trie.Trie[distlock.LockProvider]
allProviders []distlock.LockProvider
provdersTrie trie.Trie[LockProvider]
allProviders []LockProvider

indexWaiters []indexWaiter

@@ -32,7 +31,7 @@ func NewProvidersActor() *ProvidersActor {
}
}

func (a *ProvidersActor) AddProvider(prov distlock.LockProvider, path ...any) {
func (a *ProvidersActor) AddProvider(prov LockProvider, path ...any) {
a.provdersTrie.Create(path).Value = prov
a.allProviders = append(a.allProviders, prov)
}
@@ -97,7 +96,7 @@ func (svc *ProvidersActor) lockLockRequest(reqData LockRequestData) error {
return fmt.Errorf("parse target data failed, err: %w", err)
}

err = node.Value.Lock(reqData.ID, distlock.Lock{
err = node.Value.Lock(reqData.ID, Lock{
Path: lockData.Path,
Name: lockData.Name,
Target: target,
@@ -121,7 +120,7 @@ func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error {
return fmt.Errorf("parse target data failed, err: %w", err)
}

err = node.Value.Unlock(reqData.ID, distlock.Lock{
err = node.Value.Unlock(reqData.ID, Lock{
Path: lockData.Path,
Name: lockData.Name,
Target: target,
@@ -135,7 +134,7 @@ func (svc *ProvidersActor) unlockLockRequest(reqData LockRequestData) error {

// TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID。
// 在检查单个锁是否能上锁时,不会考虑同一个锁请求中的其他的锁影响。简单来说,就是同一个请求中的锁可以互相冲突。
func (a *ProvidersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (LockRequestData, error) {
func (a *ProvidersActor) TestLockRequestAndMakeData(req LockRequest) (LockRequestData, error) {
return actor.WaitValue(context.TODO(), a.commandChan, func() (LockRequestData, error) {
reqData := LockRequestData{}


pkgs/distlock/service/internal/release_actor.go → pkgs/distlock/internal/release_actor.go View File

@@ -8,7 +8,6 @@ import (
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/logger"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/clientv3util"
@@ -20,7 +19,7 @@ const (
)

type ReleaseActor struct {
cfg *distlock.Config
cfg *Config
etcdCli *clientv3.Client

releasingLockRequestIDs map[string]bool
@@ -29,7 +28,7 @@ type ReleaseActor struct {
lock sync.Mutex
}

func NewReleaseActor(cfg *distlock.Config, etcdCli *clientv3.Client) *ReleaseActor {
func NewReleaseActor(cfg *Config, etcdCli *clientv3.Client) *ReleaseActor {
return &ReleaseActor{
cfg: cfg,
etcdCli: etcdCli,

pkgs/distlock/service/internal/utils.go → pkgs/distlock/internal/utils.go View File


pkgs/distlock/service/internal/utils_test.go → pkgs/distlock/internal/utils_test.go View File


pkgs/distlock/service/internal/watch_etcd_actor.go → pkgs/distlock/internal/watch_etcd_actor.go View File


pkgs/distlock/service/mutex.go → pkgs/distlock/mutex.go View File

@@ -1,14 +1,14 @@
package service
package distlock

import "gitlink.org.cn/cloudream/common/pkgs/distlock"
import "gitlink.org.cn/cloudream/common/pkgs/distlock/internal"

type Mutex struct {
svc *Service
lockReq distlock.LockRequest
lockReq internal.LockRequest
lockReqID string
}

func NewMutex(svc *Service, lockReq distlock.LockRequest) *Mutex {
func NewMutex(svc *Service, lockReq internal.LockRequest) *Mutex {
return &Mutex{
svc: svc,
lockReq: lockReq,

pkgs/distlock/service/service.go → pkgs/distlock/service.go View File

@@ -1,4 +1,4 @@
package service
package distlock

import (
"context"
@@ -6,8 +6,7 @@ import (
"strconv"
"time"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/distlock/service/internal"
"gitlink.org.cn/cloudream/common/pkgs/distlock/internal"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -34,10 +33,10 @@ func WithLease(time time.Duration) AcquireOptionFn {

type PathProvider struct {
Path []any
Provider distlock.LockProvider
Provider internal.LockProvider
}

func NewPathProvider(prov distlock.LockProvider, path ...any) PathProvider {
func NewPathProvider(prov internal.LockProvider, path ...any) PathProvider {
return PathProvider{
Path: path,
Provider: prov,
@@ -45,7 +44,7 @@ func NewPathProvider(prov distlock.LockProvider, path ...any) PathProvider {
}

type Service struct {
cfg *distlock.Config
cfg *internal.Config
etcdCli *clientv3.Client

acquireActor *internal.AcquireActor
@@ -57,7 +56,7 @@ type Service struct {
lockReqEventWatcher internal.LockRequestEventWatcher
}

func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error) {
func NewService(cfg *internal.Config, initProvs []PathProvider) (*Service, error) {
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cfg.EtcdAddress},
Username: cfg.EtcdUsername,
@@ -95,7 +94,7 @@ func NewService(cfg *distlock.Config, initProvs []PathProvider) (*Service, error
}

// Acquire 请求一批锁。成功后返回锁请求ID
func (svc *Service) Acquire(req distlock.LockRequest, opts ...AcquireOptionFn) (string, error) {
func (svc *Service) Acquire(req internal.LockRequest, opts ...AcquireOptionFn) (string, error) {
var opt = AcquireOption{
Timeout: time.Second * 10,
}
@@ -145,7 +144,7 @@ func (svc *Service) Release(reqID string) {
func (svc *Service) Serve() error {
// TODO 需要停止service的方法
// 目前已知问题:
// 1. client退出时直接中断进程,此时RetryActor可能正在进行Retry,于是导致Etcd锁没有解除就退出了进程。
// 1. client退出时直接中断进程,此时AcquireActor可能正在进行重试,于是导致Etcd锁没有解除就退出了进程。
// 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用

go func() {

Loading…
Cancel
Save