diff --git a/pkg/distlock/const.go b/pkg/distlock/const.go new file mode 100644 index 0000000..9cb56e7 --- /dev/null +++ b/pkg/distlock/const.go @@ -0,0 +1,7 @@ +package distlock + +const ( + METADATA_LOCK_PATH_PREFIX = "Metadata" + IPFS_LOCK_PATH_PREFIX = "IPFS" + STORAGE_LOCK_PATH_PREFIX = "Storage" +) diff --git a/pkg/distlock/lockprovider/string_lock_target.go b/pkg/distlock/lockprovider/string_lock_target.go index 7288956..afc6ceb 100644 --- a/pkg/distlock/lockprovider/string_lock_target.go +++ b/pkg/distlock/lockprovider/string_lock_target.go @@ -1,11 +1,28 @@ package lockprovider -import "gitlink.org.cn/cloudream/common/utils/serder" +import ( + "fmt" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/utils/serder" +) type StringLockTarget struct { Components []StringLockTargetComponet `json:"components"` } +func NewStringLockTarget() *StringLockTarget { + return &StringLockTarget{} +} + +func (t *StringLockTarget) AddComponent(values ...any) *StringLockTarget { + t.Components = append(t.Components, StringLockTargetComponet{ + Values: lo.Map(values, func(val any, index int) string { return fmt.Sprintf("%v", val) }), + }) + + return t +} + // IsConflict 判断两个锁对象是否冲突。注:只有相同的结构的Target才有意义 func (t *StringLockTarget) IsConflict(other *StringLockTarget) bool { if len(t.Components) != len(other.Components) { diff --git a/pkg/distlock/reqbuilder/ipfs.go b/pkg/distlock/reqbuilder/ipfs.go new file mode 100644 index 0000000..2fee8e4 --- /dev/null +++ b/pkg/distlock/reqbuilder/ipfs.go @@ -0,0 +1,73 @@ +package reqbuilder + +import ( + "strconv" + + "gitlink.org.cn/cloudream/common/pkg/distlock" + "gitlink.org.cn/cloudream/common/pkg/distlock/lockprovider" +) + +type IPFSLockReqBuilder struct { + *LockRequestBuilder +} + +func (b *LockRequestBuilder) IPFS() *IPFSLockReqBuilder { + return &IPFSLockReqBuilder{LockRequestBuilder: b} +} + +func (b *MetadataLockReqBuilder) IPFS() *IPFSLockReqBuilder { + return &IPFSLockReqBuilder{LockRequestBuilder: b.LockRequestBuilder} +} + +func (b *StorageLockReqBuilder) IPFS() *IPFSLockReqBuilder { + return &IPFSLockReqBuilder{LockRequestBuilder: b.LockRequestBuilder} +} + +func (b *IPFSLockReqBuilder) ReadOneRep(nodeID int, fileHash string) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().AddComponent(fileHash), + }) + return b +} + +func (b *IPFSLockReqBuilder) WriteOneRep(nodeID int, fileHash string) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().AddComponent(fileHash), + }) + return b +} + +func (b *IPFSLockReqBuilder) ReadAnyRep(nodeID int) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *IPFSLockReqBuilder) WriteAnyRep(nodeID int) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *IPFSLockReqBuilder) CreateAnyRep(nodeID int) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *IPFSLockReqBuilder) makePath(nodeID int) []string { + return []string{distlock.IPFS_LOCK_PATH_PREFIX, strconv.Itoa(nodeID)} +} diff --git a/pkg/distlock/reqbuilder/lock_request_builder.go b/pkg/distlock/reqbuilder/lock_request_builder.go new file mode 100644 index 0000000..b475992 --- /dev/null +++ b/pkg/distlock/reqbuilder/lock_request_builder.go @@ -0,0 +1,16 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkg/distlock" + mylo "gitlink.org.cn/cloudream/common/utils/lo" +) + +type LockRequestBuilder struct { + locks []distlock.Lock +} + +func (b *LockRequestBuilder) Build() distlock.LockRequest { + return distlock.LockRequest{ + Locks: mylo.ArrayClone(b.locks), + } +} diff --git a/pkg/distlock/reqbuilder/metadata.go b/pkg/distlock/reqbuilder/metadata.go new file mode 100644 index 0000000..089d945 --- /dev/null +++ b/pkg/distlock/reqbuilder/metadata.go @@ -0,0 +1,75 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkg/distlock" + "gitlink.org.cn/cloudream/common/pkg/distlock/lockprovider" +) + +type MetadataLockReqBuilder struct { + *LockRequestBuilder +} + +func (b *LockRequestBuilder) Metadata() *MetadataLockReqBuilder { + return &MetadataLockReqBuilder{LockRequestBuilder: b} +} + +func (b *IPFSLockReqBuilder) Metadata() *MetadataLockReqBuilder { + return &MetadataLockReqBuilder{LockRequestBuilder: b.LockRequestBuilder} +} + +func (b *StorageLockReqBuilder) Metadata() *MetadataLockReqBuilder { + return &MetadataLockReqBuilder{LockRequestBuilder: b.LockRequestBuilder} +} + +func (b *MetadataLockReqBuilder) ReadOneNode(nodeID int) *MetadataLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().AddComponent(nodeID), + }) + return b +} +func (b *MetadataLockReqBuilder) WriteOneNode(nodeID int) *MetadataLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().AddComponent(nodeID), + }) + return b +} +func (b *MetadataLockReqBuilder) CreateOneNode() *MetadataLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataLockReqBuilder) ReadAnyNode() *MetadataLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataLockReqBuilder) WriteAnyNode(nodeID int) *MetadataLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataLockReqBuilder) CreateAnyNode() *MetadataLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *MetadataLockReqBuilder) makePath(tableName string) []string { + return []string{distlock.METADATA_LOCK_PATH_PREFIX, tableName} +} diff --git a/pkg/distlock/reqbuilder/storage.go b/pkg/distlock/reqbuilder/storage.go new file mode 100644 index 0000000..6e55039 --- /dev/null +++ b/pkg/distlock/reqbuilder/storage.go @@ -0,0 +1,82 @@ +package reqbuilder + +import ( + "strconv" + + "gitlink.org.cn/cloudream/common/pkg/distlock" + "gitlink.org.cn/cloudream/common/pkg/distlock/lockprovider" +) + +type StorageLockReqBuilder struct { + *LockRequestBuilder +} + +func (b *LockRequestBuilder) Storage() *StorageLockReqBuilder { + return &StorageLockReqBuilder{LockRequestBuilder: b} +} + +func (b *MetadataLockReqBuilder) Storage() *StorageLockReqBuilder { + return &StorageLockReqBuilder{LockRequestBuilder: b.LockRequestBuilder} +} + +func (b *IPFSLockReqBuilder) Storage() *StorageLockReqBuilder { + return &StorageLockReqBuilder{LockRequestBuilder: b.LockRequestBuilder} +} + +func (b *StorageLockReqBuilder) ReadOneObject(storageID int, fileHash string) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().AddComponent(fileHash), + }) + return b +} + +func (b *StorageLockReqBuilder) WriteOneObject(storageID int, fileHash string) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().AddComponent(fileHash), + }) + return b +} + +func (b *StorageLockReqBuilder) CreateOneObject(storageID int, fileHash string) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().AddComponent(fileHash), + }) + return b +} + +func (b *StorageLockReqBuilder) ReadAnyObject(storageID int) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *StorageLockReqBuilder) WriteAnyObject(storageID int) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *StorageLockReqBuilder) CreateAnyObject(storageID int) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *StorageLockReqBuilder) makePath(storageID int) []string { + return []string{distlock.STORAGE_LOCK_PATH_PREFIX, strconv.Itoa(storageID)} +} diff --git a/pkg/distlock/service/init_providers.go b/pkg/distlock/service/init_providers.go index 8218e6a..c1168ce 100644 --- a/pkg/distlock/service/init_providers.go +++ b/pkg/distlock/service/init_providers.go @@ -1,6 +1,7 @@ package service import ( + "gitlink.org.cn/cloudream/common/pkg/distlock" "gitlink.org.cn/cloudream/common/pkg/distlock/lockprovider" "gitlink.org.cn/cloudream/common/pkg/distlock/service/internal" "gitlink.org.cn/cloudream/common/pkg/trie" @@ -15,25 +16,25 @@ func initProviders(providers *internal.ProvidersActor) { } func initMetadataLockProviders(providers *internal.ProvidersActor) { - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "Node") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "Storage") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "User") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "UserBucket") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "UserNode") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "UserStorage") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "Bucket") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "Object") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "ObjectRep") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "ObjectBlock") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "Cache") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "StorageObject") - providers.AddProvider(lockprovider.NewMetadataLock(), "Metadata", "Location") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "Node") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "Storage") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "User") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "UserBucket") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "UserNode") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "UserStorage") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "Bucket") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "Object") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "ObjectRep") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "ObjectBlock") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "Cache") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "StorageObject") + providers.AddProvider(lockprovider.NewMetadataLock(), distlock.METADATA_LOCK_PATH_PREFIX, "Location") } func initIPFSLockProviders(providers *internal.ProvidersActor) { - providers.AddProvider(lockprovider.NewIPFSLock(), "IPFS", trie.WORD_ANY) + providers.AddProvider(lockprovider.NewIPFSLock(), distlock.IPFS_LOCK_PATH_PREFIX, trie.WORD_ANY) } func initStorageLockProviders(providers *internal.ProvidersActor) { - providers.AddProvider(lockprovider.NewStorageLock(), "Storage", trie.WORD_ANY) + providers.AddProvider(lockprovider.NewStorageLock(), distlock.STORAGE_LOCK_PATH_PREFIX, trie.WORD_ANY) } diff --git a/utils/lo/lo.go b/utils/lo/lo.go index bbc3461..85f3dc7 100644 --- a/utils/lo/lo.go +++ b/utils/lo/lo.go @@ -18,3 +18,7 @@ func RemoveAt[T any](arr []T, index int) []T { return append(arr[:index], arr[:index+1]...) } + +func ArrayClone[T any](arr []T) []T { + return append([]T{}, arr...) +}