diff --git a/pkg/distlock/distlock.go b/pkg/distlock/distlock.go index d506f61..bd8b300 100644 --- a/pkg/distlock/distlock.go +++ b/pkg/distlock/distlock.go @@ -8,12 +8,6 @@ type Lock struct { Target any // 锁对象,由具体的Provider去解析 } -type lockData struct { - Path []string `json:"path"` - Name string `json:"name"` - Target string `json:"target"` -} - type LockRequest struct { Locks []Lock } @@ -22,16 +16,11 @@ func (b *LockRequest) Add(lock Lock) { b.Locks = append(b.Locks, lock) } -type lockRequestData struct { - ID string `json:"id"` - Locks []lockData `json:"locks"` -} - type LockProvider interface { // CanLock 判断这个锁能否锁定成功 CanLock(lock Lock) error - // 锁定 + // 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 Lock(reqID string, lock Lock) error // 解锁 @@ -55,7 +44,7 @@ func (e *LockTargetBusyError) Error() string { return fmt.Sprintf("the lock object is locked by %s", e.lockName) } -func newLockTargetBusyError(lockName string) *LockTargetBusyError { +func NewLockTargetBusyError(lockName string) *LockTargetBusyError { return &LockTargetBusyError{ lockName: lockName, } diff --git a/pkg/distlock/lockprovider/string_lock_target.go b/pkg/distlock/lockprovider/string_lock_target.go index f5313c2..7288956 100644 --- a/pkg/distlock/lockprovider/string_lock_target.go +++ b/pkg/distlock/lockprovider/string_lock_target.go @@ -1,7 +1,9 @@ package lockprovider +import "gitlink.org.cn/cloudream/common/utils/serder" + type StringLockTarget struct { - Components []StringLockTargetComponet + Components []StringLockTargetComponet `json:"components"` } // IsConflict 判断两个锁对象是否冲突。注:只有相同的结构的Target才有意义 @@ -20,7 +22,7 @@ func (t *StringLockTarget) IsConflict(other *StringLockTarget) bool { } type StringLockTargetComponet struct { - Values []string + Values []string `json:"values"` } // IsEquals 判断两个Component是否相同。注:只有相同的结构的Component才有意义 @@ -37,3 +39,18 @@ func (t *StringLockTargetComponet) IsEquals(other *StringLockTargetComponet) boo return true } + +func StringLockTargetToString(target *StringLockTarget) (string, error) { + data, err := serder.ObjectToJSON(target) + if err != nil { + return "", err + } + + return string(data), nil +} + +func StringLockTargetFromString(str string) (StringLockTarget, error) { + var ret StringLockTarget + err := serder.JSONToObject([]byte(str), &ret) + return ret, err +} diff --git a/pkg/distlock/lease_actor.go b/pkg/distlock/service/lease_actor.go similarity index 99% rename from pkg/distlock/lease_actor.go rename to pkg/distlock/service/lease_actor.go index b3cd1df..3864475 100644 --- a/pkg/distlock/lease_actor.go +++ b/pkg/distlock/service/lease_actor.go @@ -1,4 +1,4 @@ -package distlock +package service import ( "fmt" diff --git a/pkg/distlock/main_actor.go b/pkg/distlock/service/main_actor.go similarity index 97% rename from pkg/distlock/main_actor.go rename to pkg/distlock/service/main_actor.go index 8ec80b9..3f48aa4 100644 --- a/pkg/distlock/main_actor.go +++ b/pkg/distlock/service/main_actor.go @@ -1,4 +1,4 @@ -package distlock +package service import ( "context" @@ -7,13 +7,14 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkg/actor" + "gitlink.org.cn/cloudream/common/pkg/distlock" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" ) type mainActor struct { - cfg *Config + cfg *distlock.Config etcdCli *clientv3.Client commandChan *actor.CommandChannel @@ -36,7 +37,7 @@ func (a *mainActor) Init(watchEtcdActor *watchEtcdActor, providersActor *provide } // Acquire 请求一批锁。成功后返回锁请求ID -func (a *mainActor) Acquire(req LockRequest) (reqID string, err error) { +func (a *mainActor) Acquire(req distlock.LockRequest) (reqID string, err error) { return actor.WaitValue[string](a.commandChan, func() (string, error) { // TODO 根据不同的错误设置不同的错误类型,方便上层进行后续处理 unlock, err := a.acquireEtcdRequestDataLock() diff --git a/pkg/distlock/providers_actor.go b/pkg/distlock/service/providers_actor.go similarity index 92% rename from pkg/distlock/providers_actor.go rename to pkg/distlock/service/providers_actor.go index 913c38a..6d19da5 100644 --- a/pkg/distlock/providers_actor.go +++ b/pkg/distlock/service/providers_actor.go @@ -1,9 +1,10 @@ -package distlock +package service import ( "fmt" "gitlink.org.cn/cloudream/common/pkg/actor" + "gitlink.org.cn/cloudream/common/pkg/distlock" "gitlink.org.cn/cloudream/common/pkg/future" "gitlink.org.cn/cloudream/common/pkg/trie" ) @@ -20,8 +21,8 @@ type lockRequestDataUpdateOp struct { type providersActor struct { localLockReqIndex int64 - provdersTrie trie.Trie[LockProvider] - allProviders []LockProvider + provdersTrie trie.Trie[distlock.LockProvider] + allProviders []distlock.LockProvider indexWaiters []indexWaiter @@ -93,7 +94,7 @@ func (svc *providersActor) lockLockRequest(reqData lockRequestData) error { return fmt.Errorf("parse target data failed, err: %w", err) } - err = node.Value.Lock(reqData.ID, Lock{ + err = node.Value.Lock(reqData.ID, distlock.Lock{ Path: lockData.Path, Name: lockData.Name, Target: target, @@ -117,7 +118,7 @@ func (svc *providersActor) unlockLockRequest(reqData lockRequestData) error { return fmt.Errorf("parse target data failed, err: %w", err) } - err = node.Value.Unlock(reqData.ID, Lock{ + err = node.Value.Unlock(reqData.ID, distlock.Lock{ Path: lockData.Path, Name: lockData.Name, Target: target, @@ -130,7 +131,7 @@ func (svc *providersActor) unlockLockRequest(reqData lockRequestData) error { } // TestLockRequestAndMakeData 判断锁能否锁成功,并生成锁数据的字符串表示。注:不会生成请求ID -func (a *providersActor) TestLockRequestAndMakeData(req LockRequest) (lockRequestData, error) { +func (a *providersActor) TestLockRequestAndMakeData(req distlock.LockRequest) (lockRequestData, error) { return actor.WaitValue[lockRequestData](a.commandChan, func() (lockRequestData, error) { reqData := lockRequestData{} diff --git a/pkg/distlock/service.go b/pkg/distlock/service/service.go similarity index 86% rename from pkg/distlock/service.go rename to pkg/distlock/service/service.go index 7b6272f..ea91562 100644 --- a/pkg/distlock/service.go +++ b/pkg/distlock/service/service.go @@ -1,9 +1,10 @@ -package distlock +package service import ( "fmt" "time" + "gitlink.org.cn/cloudream/common/pkg/distlock" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -13,8 +14,19 @@ const ( LOCK_REQUEST_LOCK_NAME = "/distlock/lockRequest/lock" ) +type lockData struct { + Path []string `json:"path"` + Name string `json:"name"` + Target string `json:"target"` +} + +type lockRequestData struct { + ID string `json:"id"` + Locks []lockData `json:"locks"` +} + type Service struct { - cfg *Config + cfg *distlock.Config etcdCli *clientv3.Client mainActor *mainActor @@ -23,7 +35,7 @@ type Service struct { leaseActor *leaseActor } -func NewService(cfg *Config) (*Service, error) { +func NewService(cfg *distlock.Config) (*Service, error) { etcdCli, err := clientv3.New(clientv3.Config{ Endpoints: []string{cfg.EtcdAddress}, Username: cfg.EtcdUsername, @@ -55,7 +67,7 @@ func NewService(cfg *Config) (*Service, error) { } // Acquire 请求一批锁。成功后返回锁请求ID -func (svc *Service) Acquire(req LockRequest) (string, error) { +func (svc *Service) Acquire(req distlock.LockRequest) (string, error) { reqID, err := svc.mainActor.Acquire(req) if err != nil { return "", err diff --git a/pkg/distlock/utils.go b/pkg/distlock/service/utils.go similarity index 99% rename from pkg/distlock/utils.go rename to pkg/distlock/service/utils.go index d4cc110..6fbbdcb 100644 --- a/pkg/distlock/utils.go +++ b/pkg/distlock/service/utils.go @@ -1,4 +1,4 @@ -package distlock +package service import ( "strings" diff --git a/pkg/distlock/utils_test.go b/pkg/distlock/service/utils_test.go similarity index 98% rename from pkg/distlock/utils_test.go rename to pkg/distlock/service/utils_test.go index 3e292c8..e2ac54b 100644 --- a/pkg/distlock/utils_test.go +++ b/pkg/distlock/service/utils_test.go @@ -1,4 +1,4 @@ -package distlock +package service /* import ( diff --git a/pkg/distlock/watch_etcd_actor.go b/pkg/distlock/service/watch_etcd_actor.go similarity index 99% rename from pkg/distlock/watch_etcd_actor.go rename to pkg/distlock/service/watch_etcd_actor.go index 5972406..55f560f 100644 --- a/pkg/distlock/watch_etcd_actor.go +++ b/pkg/distlock/service/watch_etcd_actor.go @@ -1,4 +1,4 @@ -package distlock +package service import ( "context" diff --git a/pkg/event/event.go b/pkg/event/event.go deleted file mode 100644 index cc4d853..0000000 --- a/pkg/event/event.go +++ /dev/null @@ -1,6 +0,0 @@ -package event - -type Event[TArgs any] interface { - TryMerge(other Event[TArgs]) bool // 尝试将other任务与自身合并,如果成功返回true - Execute(ctx ExecuteContext[TArgs]) -} diff --git a/pkg/event/executor.go b/pkg/event/executor.go deleted file mode 100644 index a73e470..0000000 --- a/pkg/event/executor.go +++ /dev/null @@ -1,116 +0,0 @@ -package event - -import ( - "sync" - - "github.com/zyedidia/generic/list" - mysync "gitlink.org.cn/cloudream/common/utils/sync" -) - -type ExecuteOption struct { - IsEmergency bool - DontMerge bool -} -type ExecuteContext[TArgs any] struct { - Executor *Executor[TArgs] - Option ExecuteOption - Args TArgs -} -type postedEvent[TArgs any] struct { - Event Event[TArgs] - Option ExecuteOption -} - -type Executor[TArgs any] struct { - events *list.List[postedEvent[TArgs]] - locker sync.Mutex - eventCond *mysync.CounterCond - execArgs TArgs -} - -func NewExecutor[TArgs any](args TArgs) Executor[TArgs] { - return Executor[TArgs]{ - events: list.New[postedEvent[TArgs]](), - locker: sync.Mutex{}, - eventCond: mysync.NewCounterCond(0), - execArgs: args, - } -} - -func (e *Executor[TArgs]) Post(event Event[TArgs], opts ...ExecuteOption) { - opt := ExecuteOption{ - IsEmergency: false, - DontMerge: false, - } - - if len(opts) > 0 { - opt = opts[0] - } - - e.locker.Lock() - defer e.locker.Unlock() - - // 紧急任务直接插入到队头,不进行合并 - if opt.IsEmergency { - e.events.PushFront(postedEvent[TArgs]{ - Event: event, - Option: opt, - }) - e.eventCond.Release() - return - } - - // 合并任务 - if opt.DontMerge { - ptr := e.events.Front - for ptr != nil { - // 只与非紧急任务,且允许合并的任务进行合并 - if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge { - if ptr.Value.Event.TryMerge(event) { - return - } - } - - ptr = ptr.Next - } - } - - e.events.PushBack(postedEvent[TArgs]{ - Event: event, - Option: opt, - }) - e.eventCond.Release() -} - -// Execute 开始执行任务 -func (e *Executor[TArgs]) Execute() error { - for { - e.eventCond.Wait() - - event := e.popFrontEvent() - if event == nil { - continue - } - - ctx := ExecuteContext[TArgs]{ - Executor: e, - Option: event.Option, - Args: e.execArgs, - } - - event.Event.Execute(ctx) - } -} - -func (e *Executor[TArgs]) popFrontEvent() *postedEvent[TArgs] { - e.locker.Lock() - defer e.locker.Unlock() - - if e.events.Front == nil { - return nil - } - - val := &e.events.Front.Value - e.events.Remove(e.events.Front) - return val -} diff --git a/utils/lo/lo.go b/utils/lo/lo.go new file mode 100644 index 0000000..bbc3461 --- /dev/null +++ b/utils/lo/lo.go @@ -0,0 +1,20 @@ +package lo + +import "github.com/samber/lo" + +func Remove[T comparable](arr []T, item T) []T { + index := lo.IndexOf(arr, item) + if index == -1 { + return arr + } + + return RemoveAt(arr, index) +} + +func RemoveAt[T any](arr []T, index int) []T { + if index >= len(arr) { + return arr + } + + return append(arr[:index], arr[:index+1]...) +}