diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 8f9204a..5bfbbe9 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -15,6 +15,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/plans" "gorm.io/gorm" @@ -681,6 +682,8 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index objBlkMap[blk.Index] = blk } + lockBld := reqbuilder.NewBuilder() + var compBlks []types.ObjectBlock var compBlkSpaces []types.UserSpaceDetail var targetSpace types.UserSpaceDetail @@ -700,7 +703,14 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index if i == 0 { targetSpace = *stg } + lockBld.UserSpace().Buzy(stg.UserSpace.UserSpaceID) + } + + mutex, err := lockBld.MutexLock(svc.PubLock) + if err != nil { + return types.Object{}, fmt.Errorf("acquire lock: %w", err) } + defer mutex.Unlock() bld := exec.NewPlanBuilder() err = plans.CompleteMultipart(compBlks, compBlkSpaces, targetSpace, "shard", bld) diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 799e4c0..65c9838 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -90,7 +90,7 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users } mutex, err := reqbuilder.NewBuilder(). - Shard().Buzy(userspaceID). + UserSpace().Buzy(userspaceID). MutexLock(svc.PubLock) if err != nil { return fmt.Errorf("acquire locks failed, err: %w", err) @@ -122,12 +122,6 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa if err != nil { return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get source userspace store: %w", err) } - // srcAddr, ok := srcSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - // if !ok { - // return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no grpc address", srcSpaceID) - // } - // srcSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&srcSpace.RecommendHub, srcAddr)) - // defer srcSpaceCli.Release() dstSpace := svc.UserSpaceMeta.Get(dstSpaceID) if dstSpace == nil { @@ -138,12 +132,6 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa if err != nil { return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get destination userspace store: %w", err) } - // dstAddr, ok := dstSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - // if !ok { - // return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no grpc address", srcSpaceID) - // } - // dstSpaceCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&dstSpace.RecommendHub, dstAddr)) - // defer dstSpaceCli.Release() srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator) dstPath = strings.Trim(dstPath, cdssdk.ObjectPathSeparator) @@ -208,6 +196,12 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa return trie.VisitContinue }) + mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpaceID).Buzy(dstSpaceID).MutexLock(svc.PubLock) + if err != nil { + return clitypes.SpaceToSpaceResult{}, fmt.Errorf("acquire lock: %w", err) + } + defer mutex.Unlock() + var success []string var failed []string @@ -243,10 +237,6 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa } for _, d := range newDirPathes { - // mkdirResp, err := dstStore.Mkdir(context.Background(), &hubrpc.BaseStoreMkdirs{ - // UserSpace: *dstSpace, - // Pathes: newDirPathes, - // }) err := dstStore.Mkdir(d) if err != nil { failed = append(failed, d) @@ -254,18 +244,6 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa success = append(success, d) } } - // if err != nil { - // failed = append(failed, dirPathes...) - // logger.Warnf("s2s: mkdirs to destination userspace: %v", err) - // } else { - // for i := range dirPathes { - // if mkdirResp.Successes[i] { - // success = append(success, dirPathes[i]) - // } else { - // failed = append(failed, dirPathes[i]) - // } - // } - // } return clitypes.SpaceToSpaceResult{ Success: success, diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index 6797f58..f599e20 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -143,7 +143,7 @@ func (j *ChangeRedundancy) changeOne(ctx *changeRedundancyContext, pkg clitypes. reqBlder := reqbuilder.NewBuilder() for _, space := range selectedSpaces { - reqBlder.Shard().Buzy(space.UserSpace.UserSpace.UserSpaceID) + reqBlder.UserSpace().Buzy(space.UserSpace.UserSpace.UserSpaceID) } err := reen.Lock(reqBlder.Build()) if err != nil { diff --git a/client/internal/ticktock/redundancy_shrink.go b/client/internal/ticktock/redundancy_shrink.go index 73a363b..c5624ea 100644 --- a/client/internal/ticktock/redundancy_shrink.go +++ b/client/internal/ticktock/redundancy_shrink.go @@ -909,7 +909,7 @@ func (t *ChangeRedundancy) generateSysEventForECObject(solu annealingSolution, o func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[clitypes.UserSpaceID]bool, reen *distlock.Reentrant) (map[string]exec.VarValue, error) { reqBlder := reqbuilder.NewBuilder() for id, _ := range planningSpaceIDs { - reqBlder.Shard().Buzy(id) + reqBlder.UserSpace().Buzy(id) } err := reen.Lock(reqBlder.Build()) if err != nil { diff --git a/client/internal/ticktock/shardstore_gc.go b/client/internal/ticktock/shardstore_gc.go index 31441b6..919c9f9 100644 --- a/client/internal/ticktock/shardstore_gc.go +++ b/client/internal/ticktock/shardstore_gc.go @@ -12,16 +12,16 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" ) -type ShardStoreGC struct { +type UserSpaceGC struct { } -func (j *ShardStoreGC) Name() string { - return reflect2.TypeNameOf[ShardStoreGC]() +func (j *UserSpaceGC) Name() string { + return reflect2.TypeNameOf[UserSpaceGC]() } // Execute 执行垃圾回收操作。 -func (j *ShardStoreGC) Execute(t *TickTock) { - log := logger.WithType[ShardStoreGC]("Event") +func (j *UserSpaceGC) Execute(t *TickTock) { + log := logger.WithType[UserSpaceGC]("Event") startTime := time.Now() log.Infof("job start") defer func() { @@ -40,26 +40,35 @@ func (j *ShardStoreGC) Execute(t *TickTock) { continue } - err := j.gcOne(t, detail) - if err != nil { - log.Warnf("gc one user space: %v: %v", spaceID, err) - continue - } + j.gcOne(t, detail) } } -func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { - mutex, err := reqbuilder.NewBuilder().Shard().GC(space.UserSpace.UserSpaceID).MutexLock(t.pubLock) +func (j *UserSpaceGC) gcOne(t *TickTock, space *types.UserSpaceDetail) { + log := logger.WithType[UserSpaceGC]("Event") + + mutex, err := reqbuilder.NewBuilder().UserSpace().GC(space.UserSpace.UserSpaceID).MutexLock(t.pubLock) if err != nil { - return fmt.Errorf("acquire lock: %w", err) + log.Warnf("acquire lock: %v", err) + return } defer mutex.Unlock() + if err := j.gcShards(t, space); err != nil { + log.Warnf("gc shard store of %v: %v", space.UserSpace.UserSpaceID, err) + } + + if err := j.gcTemps(t, space); err != nil { + log.Warnf("gc base store of %v: %v", space.UserSpace.UserSpaceID, err) + } +} + +func (j *UserSpaceGC) gcShards(t *TickTock, space *types.UserSpaceDetail) error { db2 := t.db // 收集需要进行垃圾回收的文件哈希值 var allFileHashes []types.FileHash - err = db2.DoTx(func(tx db.SQLContext) error { + err := db2.DoTx(func(tx db.SQLContext) error { blocks, err := db2.ObjectBlock().GetByUserSpaceID(tx, space.UserSpace.UserSpaceID) if err != nil { return fmt.Errorf("getting object blocks by hub id: %w", err) @@ -93,25 +102,14 @@ func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { } return nil +} - // // 获取与节点通信的代理客户端 - // addr, ok := space.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - // if !ok { - // return fmt.Errorf("master of user space %v has no grpc address", space.UserSpace) - // } - // agtCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&space.RecommendHub, addr)) - // defer agtCli.Release() - - // // 向代理发送垃圾回收请求 - // ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute)) - // defer cancel() - - // _, cerr := agtCli.CacheGC(ctx, &hubrpc.CacheGC{ - // UserSpace: *space, - // Availables: allFileHashes, - // }) - // if cerr != nil { - // return fmt.Errorf("request to cache gc: %w", cerr.ToError()) - // } - // return nil +func (j *UserSpaceGC) gcTemps(t *TickTock, space *types.UserSpaceDetail) error { + store, err := t.stgPool.GetBaseStore(space) + if err != nil { + return fmt.Errorf("getting base store: %w", err) + } + + store.CleanTemps() + return nil } diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index d0c726f..39f0aa4 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -97,7 +97,7 @@ func (t *TickTock) initJobs() { gocron.NewAtTime(1, 0, 0), ))) - t.addJob(&ShardStoreGC{}, gocron.DailyJob(1, gocron.NewAtTimes( + t.addJob(&UserSpaceGC{}, gocron.DailyJob(1, gocron.NewAtTimes( gocron.NewAtTime(2, 0, 0), ))) diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index 012179f..632bdbd 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -97,7 +97,7 @@ func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserS target := u.chooseUploadStorage(uploadSpaces, affinity) // 防止上传的副本被清除 - pubLock, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + pubLock, err := reqbuilder.NewBuilder().UserSpace().Buzy(target.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock) if err != nil { return nil, fmt.Errorf("acquire lock: %w", err) } @@ -160,7 +160,7 @@ func (u *Uploader) BeginCreateUpload(bktID clitypes.BucketID, pkgName string, co reqBld := reqbuilder.NewBuilder() for _, stg := range spacesStgs { - reqBld.Shard().Buzy(stg.UserSpace.UserSpaceID) + reqBld.UserSpace().Buzy(stg.UserSpace.UserSpaceID) } lock, err := reqBld.MutexLock(u.pubLock) if err != nil { @@ -242,7 +242,7 @@ func (u *Uploader) UploadPart(objID clitypes.ObjectID, index int, stream io.Read space = u.chooseUploadStorage(userStgs, 0).Space } - lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + lock, err := reqbuilder.NewBuilder().UserSpace().Buzy(space.UserSpace.UserSpaceID).MutexLock(u.pubLock) if err != nil { return fmt.Errorf("acquire lock: %w", err) } diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index 9c4f8a0..cf7248a 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -12,6 +12,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" @@ -98,23 +99,6 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st targetSapce := u.chooseUploadStorage(uploadSpaces, uploadAffinity) - // addr, ok := srcSpace.RecommendHub.Address.(*cortypes.GRPCAddressInfo) - // if !ok { - // delPkg() - // return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace) - // } - // srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&srcSpace.RecommendHub, addr)) - // defer srcHubCli.Release() - - // listAllResp, cerr := srcHubCli.BaseStoreListAll(context.Background(), &hubrpc.BaseStoreListAll{ - // UserSpace: *srcSpace, - // Path: rootPath, - // }) - // if cerr != nil { - // delPkg() - // return nil, fmt.Errorf("listing base store: %w", cerr.ToError()) - // } - store, err := u.stgPool.GetBaseStore(srcSpace) if err != nil { delPkg() @@ -126,6 +110,13 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st return nil, fmt.Errorf("listing base store: %w", err) } + mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpace.UserSpace.UserSpaceID).Buzy(targetSapce.Space.UserSpace.UserSpaceID).MutexLock(u.pubLock) + if err != nil { + delPkg() + return nil, fmt.Errorf("acquire lock: %w", err) + } + defer mutex.Unlock() + adds, err := u.uploadFromBaseStore(srcSpace, &targetSapce.Space, entries, rootPath) if err != nil { delPkg() diff --git a/common/pkgs/distlock/lockprovider/shard_store.go b/common/pkgs/distlock/lockprovider/shard_store.go index 740d05c..1cb4b44 100644 --- a/common/pkgs/distlock/lockprovider/shard_store.go +++ b/common/pkgs/distlock/lockprovider/shard_store.go @@ -8,27 +8,27 @@ import ( ) const ( - ShardStoreLockPathPrefix = "ShardStore" - ShardStoreStorageIDPathIndex = 1 - ShardStoreBuzyLock = "Buzy" - ShardStoreGCLock = "GC" + UserSpaceLockPathPrefix = "UserSpace" + UserSpaceStorageIDPathIndex = 1 + UserSpaceBuzyLock = "Buzy" + UserSpaceGCLock = "GC" ) -type ShardStoreLock struct { - stgLocks map[string]*ShardStoreStorageLock - dummyLock *ShardStoreStorageLock +type UserSpaceLock struct { + stgLocks map[string]*UserSpaceStorageLock + dummyLock *UserSpaceStorageLock } -func NewShardStoreLock() *ShardStoreLock { - return &ShardStoreLock{ - stgLocks: make(map[string]*ShardStoreStorageLock), - dummyLock: NewShardStoreStorageLock(), +func NewUserSpaceLock() *UserSpaceLock { + return &UserSpaceLock{ + stgLocks: make(map[string]*UserSpaceStorageLock), + dummyLock: NewUserSpaceStorageLock(), } } // CanLock 判断这个锁能否锁定成功 -func (l *ShardStoreLock) CanLock(lock types.Lock) error { - nodeLock, ok := l.stgLocks[lock.Path[ShardStoreStorageIDPathIndex]] +func (l *UserSpaceLock) CanLock(lock types.Lock) error { + nodeLock, ok := l.stgLocks[lock.Path[UserSpaceStorageIDPathIndex]] if !ok { // 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。 // 这里使用一个空Provider来进行检查。 @@ -39,12 +39,12 @@ func (l *ShardStoreLock) CanLock(lock types.Lock) error { } // 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 -func (l *ShardStoreLock) Lock(reqID types.RequestID, lock types.Lock) error { - stgID := lock.Path[ShardStoreStorageIDPathIndex] +func (l *UserSpaceLock) Lock(reqID types.RequestID, lock types.Lock) error { + stgID := lock.Path[UserSpaceStorageIDPathIndex] nodeLock, ok := l.stgLocks[stgID] if !ok { - nodeLock = NewShardStoreStorageLock() + nodeLock = NewUserSpaceStorageLock() l.stgLocks[stgID] = nodeLock } @@ -52,8 +52,8 @@ func (l *ShardStoreLock) Lock(reqID types.RequestID, lock types.Lock) error { } // 解锁 -func (l *ShardStoreLock) Unlock(reqID types.RequestID, lock types.Lock) error { - stgID := lock.Path[ShardStoreStorageIDPathIndex] +func (l *UserSpaceLock) Unlock(reqID types.RequestID, lock types.Lock) error { + stgID := lock.Path[UserSpaceStorageIDPathIndex] nodeLock, ok := l.stgLocks[stgID] if !ok { @@ -64,27 +64,27 @@ func (l *ShardStoreLock) Unlock(reqID types.RequestID, lock types.Lock) error { } // Clear 清除内部所有状态 -func (l *ShardStoreLock) Clear() { - l.stgLocks = make(map[string]*ShardStoreStorageLock) +func (l *UserSpaceLock) Clear() { + l.stgLocks = make(map[string]*UserSpaceStorageLock) } -type ShardStoreStorageLock struct { +type UserSpaceStorageLock struct { buzyReqIDs []types.RequestID gcReqIDs []types.RequestID lockCompatibilityTable *LockCompatibilityTable } -func NewShardStoreStorageLock() *ShardStoreStorageLock { +func NewUserSpaceStorageLock() *UserSpaceStorageLock { compTable := &LockCompatibilityTable{} - sdLock := ShardStoreStorageLock{ + sdLock := UserSpaceStorageLock{ lockCompatibilityTable: compTable, } compTable. - Column(ShardStoreBuzyLock, func() bool { return len(sdLock.buzyReqIDs) > 0 }). - Column(ShardStoreGCLock, func() bool { return len(sdLock.gcReqIDs) > 0 }) + Column(UserSpaceBuzyLock, func() bool { return len(sdLock.buzyReqIDs) > 0 }). + Column(UserSpaceGCLock, func() bool { return len(sdLock.gcReqIDs) > 0 }) comp := LockCompatible() uncp := LockUncompatible() @@ -96,16 +96,16 @@ func NewShardStoreStorageLock() *ShardStoreStorageLock { } // CanLock 判断这个锁能否锁定成功 -func (l *ShardStoreStorageLock) CanLock(lock types.Lock) error { +func (l *UserSpaceStorageLock) CanLock(lock types.Lock) error { return l.lockCompatibilityTable.Test(lock) } // 锁定 -func (l *ShardStoreStorageLock) Lock(reqID types.RequestID, lock types.Lock) error { +func (l *UserSpaceStorageLock) Lock(reqID types.RequestID, lock types.Lock) error { switch lock.Name { - case ShardStoreBuzyLock: + case UserSpaceBuzyLock: l.buzyReqIDs = append(l.buzyReqIDs, reqID) - case ShardStoreGCLock: + case UserSpaceGCLock: l.gcReqIDs = append(l.gcReqIDs, reqID) default: return fmt.Errorf("unknow lock name: %s", lock.Name) @@ -115,11 +115,11 @@ func (l *ShardStoreStorageLock) Lock(reqID types.RequestID, lock types.Lock) err } // 解锁 -func (l *ShardStoreStorageLock) Unlock(reqID types.RequestID, lock types.Lock) error { +func (l *UserSpaceStorageLock) Unlock(reqID types.RequestID, lock types.Lock) error { switch lock.Name { - case ShardStoreBuzyLock: + case UserSpaceBuzyLock: l.buzyReqIDs = lo2.Remove(l.buzyReqIDs, reqID) - case ShardStoreGCLock: + case UserSpaceGCLock: l.gcReqIDs = lo2.Remove(l.gcReqIDs, reqID) default: return fmt.Errorf("unknow lock name: %s", lock.Name) diff --git a/common/pkgs/distlock/lockprovider/shard_store_test.go b/common/pkgs/distlock/lockprovider/shard_store_test.go index cd88d8f..9d4464e 100644 --- a/common/pkgs/distlock/lockprovider/shard_store_test.go +++ b/common/pkgs/distlock/lockprovider/shard_store_test.go @@ -18,13 +18,13 @@ func Test_ShardStoreLock(t *testing.T) { title: "同节点,同一个Buzy锁", initLocks: []types.Lock{ { - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreBuzyLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceBuzyLock, }, }, doLock: types.Lock{ - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreBuzyLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceBuzyLock, }, wantOK: true, }, @@ -32,13 +32,13 @@ func Test_ShardStoreLock(t *testing.T) { title: "同节点,同一个GC锁", initLocks: []types.Lock{ { - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreGCLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceGCLock, }, }, doLock: types.Lock{ - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreGCLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceGCLock, }, wantOK: true, }, @@ -46,14 +46,14 @@ func Test_ShardStoreLock(t *testing.T) { title: "同时设置Buzy和GC", initLocks: []types.Lock{ { - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreBuzyLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceBuzyLock, Target: NewStringLockTarget(), }, }, doLock: types.Lock{ - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreGCLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceGCLock, Target: NewStringLockTarget(), }, wantOK: false, @@ -62,7 +62,7 @@ func Test_ShardStoreLock(t *testing.T) { for _, ca := range cases { Convey(ca.title, t, func() { - ipfsLock := NewShardStoreLock() + ipfsLock := NewUserSpaceLock() for _, l := range ca.initLocks { ipfsLock.Lock("req1", l) @@ -78,11 +78,11 @@ func Test_ShardStoreLock(t *testing.T) { } Convey("解锁", t, func() { - ipfsLock := NewShardStoreLock() + ipfsLock := NewUserSpaceLock() lock := types.Lock{ - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreBuzyLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceBuzyLock, } ipfsLock.Lock("req1", lock) @@ -93,8 +93,8 @@ func Test_ShardStoreLock(t *testing.T) { ipfsLock.Unlock("req1", lock) lock = types.Lock{ - Path: []string{ShardStoreLockPathPrefix, "hub1"}, - Name: ShardStoreGCLock, + Path: []string{UserSpaceLockPathPrefix, "hub1"}, + Name: UserSpaceGCLock, } err = ipfsLock.CanLock(lock) So(err, ShouldBeNil) diff --git a/common/pkgs/distlock/reqbuilder/shard_store.go b/common/pkgs/distlock/reqbuilder/shard_store.go index a6b5c06..3852b21 100644 --- a/common/pkgs/distlock/reqbuilder/shard_store.go +++ b/common/pkgs/distlock/reqbuilder/shard_store.go @@ -8,31 +8,31 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/types" ) -type ShardStoreLockReqBuilder struct { +type UserSpaceLockReqBuilder struct { *LockRequestBuilder } -func (b *LockRequestBuilder) Shard() *ShardStoreLockReqBuilder { - return &ShardStoreLockReqBuilder{LockRequestBuilder: b} +func (b *LockRequestBuilder) UserSpace() *UserSpaceLockReqBuilder { + return &UserSpaceLockReqBuilder{LockRequestBuilder: b} } -func (b *ShardStoreLockReqBuilder) Buzy(spaceID clitypes.UserSpaceID) *ShardStoreLockReqBuilder { +func (b *UserSpaceLockReqBuilder) Buzy(spaceID clitypes.UserSpaceID) *UserSpaceLockReqBuilder { b.locks = append(b.locks, types.Lock{ Path: b.makePath(spaceID), - Name: lockprovider.ShardStoreBuzyLock, + Name: lockprovider.UserSpaceBuzyLock, Target: lockprovider.NewEmptyTarget(), }) return b } -func (b *ShardStoreLockReqBuilder) GC(spaceID clitypes.UserSpaceID) *ShardStoreLockReqBuilder { +func (b *UserSpaceLockReqBuilder) GC(spaceID clitypes.UserSpaceID) *UserSpaceLockReqBuilder { b.locks = append(b.locks, types.Lock{ Path: b.makePath(spaceID), - Name: lockprovider.ShardStoreGCLock, + Name: lockprovider.UserSpaceGCLock, Target: lockprovider.NewEmptyTarget(), }) return b } -func (b *ShardStoreLockReqBuilder) makePath(hubID clitypes.UserSpaceID) []string { - return []string{lockprovider.ShardStoreLockPathPrefix, strconv.FormatInt(int64(hubID), 10)} +func (b *UserSpaceLockReqBuilder) makePath(hubID clitypes.UserSpaceID) []string { + return []string{lockprovider.UserSpaceLockPathPrefix, strconv.FormatInt(int64(hubID), 10)} } diff --git a/common/pkgs/distlock/service.go b/common/pkgs/distlock/service.go index bb922c4..7bfa53f 100644 --- a/common/pkgs/distlock/service.go +++ b/common/pkgs/distlock/service.go @@ -38,7 +38,7 @@ func NewService() *Service { provdersTrie: trie.NewTrie[types.LockProvider](), } - svc.provdersTrie.Create([]any{lockprovider.ShardStoreLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewShardStoreLock() + svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock() return svc }