Browse Source

调整目录结构

pull/1/head
Sydonian 2 years ago
parent
commit
dbdd613c70
12 changed files with 72 additions and 154 deletions
  1. +2
    -13
      pkg/distlock/distlock.go
  2. +19
    -2
      pkg/distlock/lockprovider/string_lock_target.go
  3. +1
    -1
      pkg/distlock/service/lease_actor.go
  4. +4
    -3
      pkg/distlock/service/main_actor.go
  5. +7
    -6
      pkg/distlock/service/providers_actor.go
  6. +16
    -4
      pkg/distlock/service/service.go
  7. +1
    -1
      pkg/distlock/service/utils.go
  8. +1
    -1
      pkg/distlock/service/utils_test.go
  9. +1
    -1
      pkg/distlock/service/watch_etcd_actor.go
  10. +0
    -6
      pkg/event/event.go
  11. +0
    -116
      pkg/event/executor.go
  12. +20
    -0
      utils/lo/lo.go

+ 2
- 13
pkg/distlock/distlock.go View File

@@ -8,12 +8,6 @@ type Lock struct {
Target any // 锁对象,由具体的Provider去解析 Target any // 锁对象,由具体的Provider去解析
} }


type lockData struct {
Path []string `json:"path"`
Name string `json:"name"`
Target string `json:"target"`
}

type LockRequest struct { type LockRequest struct {
Locks []Lock Locks []Lock
} }
@@ -22,16 +16,11 @@ func (b *LockRequest) Add(lock Lock) {
b.Locks = append(b.Locks, lock) b.Locks = append(b.Locks, lock)
} }


type lockRequestData struct {
ID string `json:"id"`
Locks []lockData `json:"locks"`
}

type LockProvider interface { type LockProvider interface {
// CanLock 判断这个锁能否锁定成功 // CanLock 判断这个锁能否锁定成功
CanLock(lock Lock) error CanLock(lock Lock) error


// 锁定
// 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查
Lock(reqID string, lock Lock) error Lock(reqID string, lock Lock) error


// 解锁 // 解锁
@@ -55,7 +44,7 @@ func (e *LockTargetBusyError) Error() string {
return fmt.Sprintf("the lock object is locked by %s", e.lockName) return fmt.Sprintf("the lock object is locked by %s", e.lockName)
} }


func newLockTargetBusyError(lockName string) *LockTargetBusyError {
func NewLockTargetBusyError(lockName string) *LockTargetBusyError {
return &LockTargetBusyError{ return &LockTargetBusyError{
lockName: lockName, lockName: lockName,
} }


+ 19
- 2
pkg/distlock/lockprovider/string_lock_target.go View File

@@ -1,7 +1,9 @@
package lockprovider package lockprovider


import "gitlink.org.cn/cloudream/common/utils/serder"

type StringLockTarget struct { type StringLockTarget struct {
Components []StringLockTargetComponet
Components []StringLockTargetComponet `json:"components"`
} }


// IsConflict 判断两个锁对象是否冲突。注:只有相同的结构的Target才有意义 // IsConflict 判断两个锁对象是否冲突。注:只有相同的结构的Target才有意义
@@ -20,7 +22,7 @@ func (t *StringLockTarget) IsConflict(other *StringLockTarget) bool {
} }


type StringLockTargetComponet struct { type StringLockTargetComponet struct {
Values []string
Values []string `json:"values"`
} }


// IsEquals 判断两个Component是否相同。注:只有相同的结构的Component才有意义 // IsEquals 判断两个Component是否相同。注:只有相同的结构的Component才有意义
@@ -37,3 +39,18 @@ func (t *StringLockTargetComponet) IsEquals(other *StringLockTargetComponet) boo


return true return true
} }

func StringLockTargetToString(target *StringLockTarget) (string, error) {
data, err := serder.ObjectToJSON(target)
if err != nil {
return "", err
}

return string(data), nil
}

func StringLockTargetFromString(str string) (StringLockTarget, error) {
var ret StringLockTarget
err := serder.JSONToObject([]byte(str), &ret)
return ret, err
}

pkg/distlock/lease_actor.go → pkg/distlock/service/lease_actor.go View File

@@ -1,4 +1,4 @@
package distlock
package service


import ( import (
"fmt" "fmt"

pkg/distlock/main_actor.go → pkg/distlock/service/main_actor.go View File

@@ -1,4 +1,4 @@
package distlock
package service


import ( import (
"context" "context"
@@ -7,13 +7,14 @@ import (
"time" "time"


"gitlink.org.cn/cloudream/common/pkg/actor" "gitlink.org.cn/cloudream/common/pkg/actor"
"gitlink.org.cn/cloudream/common/pkg/distlock"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/client/v3/concurrency"
) )


type mainActor struct { type mainActor struct {
cfg *Config
cfg *distlock.Config
etcdCli *clientv3.Client etcdCli *clientv3.Client


commandChan *actor.CommandChannel commandChan *actor.CommandChannel
@@ -36,7 +37,7 @@ func (a *mainActor) Init(watchEtcdActor *watchEtcdActor, providersActor *provide
} }


// Acquire 请求一批锁。成功后返回锁请求ID // Acquire 请求一批锁。成功后返回锁请求ID
func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) {
func (a *mainActor) Acquire(req distlock.LockRequest) (reqID string, err error) {
return actor.WaitValue[string](a.commandChan, func() (string, error) { return actor.WaitValue[string](a.commandChan, func() (string, error) {
// TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理
unlock, err := a.acquireEtcdRequestDataLock() unlock, err := a.acquireEtcdRequestDataLock()

pkg/distlock/providers_actor.go → pkg/distlock/service/providers_actor.go View File

@@ -1,9 +1,10 @@
package distlock
package service


import ( import (
"fmt" "fmt"


"gitlink.org.cn/cloudream/common/pkg/actor" "gitlink.org.cn/cloudream/common/pkg/actor"
"gitlink.org.cn/cloudream/common/pkg/distlock"
"gitlink.org.cn/cloudream/common/pkg/future" "gitlink.org.cn/cloudream/common/pkg/future"
"gitlink.org.cn/cloudream/common/pkg/trie" "gitlink.org.cn/cloudream/common/pkg/trie"
) )
@@ -20,8 +21,8 @@ type lockRequestDataUpdateOp struct {


type providersActor struct { type providersActor struct {
localLockReqIndex int64 localLockReqIndex int64
provdersTrie trie.Trie[LockProvider]
allProviders []LockProvider
provdersTrie trie.Trie[distlock.LockProvider]
allProviders []distlock.LockProvider


indexWaiters []indexWaiter indexWaiters []indexWaiter


@@ -93,7 +94,7 @@ func (svc *providersActor) lockLockRequest(reqData lockRequestData) error {
return fmt.Errorf("parse target data failed, err: %w", err) return fmt.Errorf("parse target data failed, err: %w", err)
} }


err = node.Value.Lock(reqData.ID, Lock{
err = node.Value.Lock(reqData.ID, distlock.Lock{
Path: lockData.Path, Path: lockData.Path,
Name: lockData.Name, Name: lockData.Name,
Target: target, Target: target,
@@ -117,7 +118,7 @@ func (svc *providersActor) unlockLockRequest(reqData lockRequestData) error {
return fmt.Errorf("parse target data failed, err: %w", err) return fmt.Errorf("parse target data failed, err: %w", err)
} }


err = node.Value.Unlock(reqData.ID, Lock{
err = node.Value.Unlock(reqData.ID, distlock.Lock{
Path: lockData.Path, Path: lockData.Path,
Name: lockData.Name, Name: lockData.Name,
Target: target, Target: target,
@@ -130,7 +131,7 @@ func (svc *providersActor) unlockLockRequest(reqData lockRequestData) error {
} }


// TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID // TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID
func (a *providersActor) TestLockRequestAndMakeData(req LockRequest) (lockRequestData, error) {
func (a *providersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (lockRequestData, error) {
return actor.WaitValue[lockRequestData](a.commandChan, func() (lockRequestData, error) { return actor.WaitValue[lockRequestData](a.commandChan, func() (lockRequestData, error) {
reqData := lockRequestData{} reqData := lockRequestData{}



pkg/distlock/service.go → pkg/distlock/service/service.go View File

@@ -1,9 +1,10 @@
package distlock
package service


import ( import (
"fmt" "fmt"
"time" "time"


"gitlink.org.cn/cloudream/common/pkg/distlock"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )


@@ -13,8 +14,19 @@ const (
LOCK_REQUEST_LOCK_NAME = "/distlock/lockRequest/lock" LOCK_REQUEST_LOCK_NAME = "/distlock/lockRequest/lock"
) )


type lockData struct {
Path []string `json:"path"`
Name string `json:"name"`
Target string `json:"target"`
}

type lockRequestData struct {
ID string `json:"id"`
Locks []lockData `json:"locks"`
}

type Service struct { type Service struct {
cfg *Config
cfg *distlock.Config
etcdCli *clientv3.Client etcdCli *clientv3.Client


mainActor *mainActor mainActor *mainActor
@@ -23,7 +35,7 @@ type Service struct {
leaseActor *leaseActor leaseActor *leaseActor
} }


func NewService(cfg *Config) (*Service, error) {
func NewService(cfg *distlock.Config) (*Service, error) {
etcdCli, err := clientv3.New(clientv3.Config{ etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cfg.EtcdAddress}, Endpoints: []string{cfg.EtcdAddress},
Username: cfg.EtcdUsername, Username: cfg.EtcdUsername,
@@ -55,7 +67,7 @@ func NewService(cfg *Config) (*Service, error) {
} }


// Acquire 请求一批锁。成功后返回锁请求ID // Acquire 请求一批锁。成功后返回锁请求ID
func (svc *Service) Acquire(req LockRequest) (string, error) {
func (svc *Service) Acquire(req distlock.LockRequest) (string, error) {
reqID, err := svc.mainActor.Acquire(req) reqID, err := svc.mainActor.Acquire(req)
if err != nil { if err != nil {
return "", err return "", err

pkg/distlock/utils.go → pkg/distlock/service/utils.go View File

@@ -1,4 +1,4 @@
package distlock
package service


import ( import (
"strings" "strings"

pkg/distlock/utils_test.go → pkg/distlock/service/utils_test.go View File

@@ -1,4 +1,4 @@
package distlock
package service


/* /*
import ( import (

pkg/distlock/watch_etcd_actor.go → pkg/distlock/service/watch_etcd_actor.go View File

@@ -1,4 +1,4 @@
package distlock
package service


import ( import (
"context" "context"

+ 0
- 6
pkg/event/event.go View File

@@ -1,6 +0,0 @@
package event

type Event[TArgs any] interface {
TryMerge(other Event[TArgs]) bool // 尝试将other任务与自身合并,如果成功返回true
Execute(ctx ExecuteContext[TArgs])
}

+ 0
- 116
pkg/event/executor.go View File

@@ -1,116 +0,0 @@
package event

import (
"sync"

"github.com/zyedidia/generic/list"
mysync "gitlink.org.cn/cloudream/common/utils/sync"
)

type ExecuteOption struct {
IsEmergency bool
DontMerge bool
}
type ExecuteContext[TArgs any] struct {
Executor *Executor[TArgs]
Option ExecuteOption
Args TArgs
}
type postedEvent[TArgs any] struct {
Event Event[TArgs]
Option ExecuteOption
}

type Executor[TArgs any] struct {
events *list.List[postedEvent[TArgs]]
locker sync.Mutex
eventCond *mysync.CounterCond
execArgs TArgs
}

func NewExecutor[TArgs any](args TArgs) Executor[TArgs] {
return Executor[TArgs]{
events: list.New[postedEvent[TArgs]](),
locker: sync.Mutex{},
eventCond: mysync.NewCounterCond(0),
execArgs: args,
}
}

func (e *Executor[TArgs]) Post(event Event[TArgs], opts ...ExecuteOption) {
opt := ExecuteOption{
IsEmergency: false,
DontMerge: false,
}

if len(opts) > 0 {
opt = opts[0]
}

e.locker.Lock()
defer e.locker.Unlock()

// 紧急任务直接插入到队头,不进行合并
if opt.IsEmergency {
e.events.PushFront(postedEvent[TArgs]{
Event: event,
Option: opt,
})
e.eventCond.Release()
return
}

// 合并任务
if opt.DontMerge {
ptr := e.events.Front
for ptr != nil {
// 只与非紧急任务,且允许合并的任务进行合并
if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge {
if ptr.Value.Event.TryMerge(event) {
return
}
}

ptr = ptr.Next
}
}

e.events.PushBack(postedEvent[TArgs]{
Event: event,
Option: opt,
})
e.eventCond.Release()
}

// Execute 开始执行任务
func (e *Executor[TArgs]) Execute() error {
for {
e.eventCond.Wait()

event := e.popFrontEvent()
if event == nil {
continue
}

ctx := ExecuteContext[TArgs]{
Executor: e,
Option: event.Option,
Args: e.execArgs,
}

event.Event.Execute(ctx)
}
}

func (e *Executor[TArgs]) popFrontEvent() *postedEvent[TArgs] {
e.locker.Lock()
defer e.locker.Unlock()

if e.events.Front == nil {
return nil
}

val := &e.events.Front.Value
e.events.Remove(e.events.Front)
return val
}

+ 20
- 0
utils/lo/lo.go View File

@@ -0,0 +1,20 @@
package lo

import "github.com/samber/lo"

func Remove[T comparable](arr []T, item T) []T {
index := lo.IndexOf(arr, item)
if index == -1 {
return arr
}

return RemoveAt(arr, index)
}

func RemoveAt[T any](arr []T, index int) []T {
if index >= len(arr) {
return arr
}

return append(arr[:index], arr[:index+1]...)
}

Loading…
Cancel
Save