From de513799a10e3c1a81cb3542fd37546dfc450184 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Aug 2023 16:52:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86=E5=92=8C=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E9=94=81=E6=9C=8D=E5=8A=A1=E4=BB=A3=E7=A0=81=EF=BC=9B=E5=B0=86?= =?UTF-8?q?=E8=B0=83=E5=BA=A6Package=E7=9A=84=E6=9C=AF=E8=AF=AD=E6=94=B9?= =?UTF-8?q?=E4=B8=BALoad?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- assets/confs/agent.config.json | 13 +- assets/confs/client.config.json | 11 +- assets/scripts/create_database.sql | 6 +- pkgs/cmd/create_ec_package.go | 88 +++---- pkgs/cmd/create_rep_package.go | 59 +++-- pkgs/cmd/download_package.go | 9 +- pkgs/cmd/update_ec_package.go | 93 +++---- pkgs/cmd/update_rep_package.go | 57 ++--- pkgs/db/bucket.go | 6 +- pkgs/db/cache.go | 6 +- pkgs/db/object.go | 5 + pkgs/db/object_block.go | 23 +- pkgs/db/object_rep.go | 21 +- pkgs/db/package.go | 29 ++- pkgs/db/storage.go | 8 +- pkgs/db/storage_package.go | 6 +- pkgs/distlock/lockprovider/ipfs_lock.go | 217 +++++++++++++++++ pkgs/distlock/lockprovider/ipfs_lock_test.go | 113 +++++++++ .../lockprovider/lock_compatibility_table.go | 123 ++++++++++ .../lock_compatibility_table_test.go | 41 ++++ pkgs/distlock/lockprovider/metadata_lock.go | 184 ++++++++++++++ pkgs/distlock/lockprovider/storage_lock.go | 226 ++++++++++++++++++ .../lockprovider/string_lock_target.go | 78 ++++++ .../lockprovider/string_lock_target_test.go | 60 +++++ pkgs/distlock/reqbuilder/ipfs.go | 64 +++++ .../reqbuilder/lock_request_builder.go | 31 +++ pkgs/distlock/reqbuilder/metadata.go | 17 ++ pkgs/distlock/reqbuilder/metadata_bucket.go | 63 +++++ pkgs/distlock/reqbuilder/metadata_cache.go | 63 +++++ pkgs/distlock/reqbuilder/metadata_node.go | 63 +++++ pkgs/distlock/reqbuilder/metadata_object.go | 65 +++++ .../reqbuilder/metadata_object_block.go | 63 +++++ .../reqbuilder/metadata_object_rep.go | 63 +++++ pkgs/distlock/reqbuilder/metadata_package.go | 63 +++++ .../reqbuilder/metadata_storage_package.go | 63 +++++ .../reqbuilder/metadata_user_bucket.go | 63 +++++ .../reqbuilder/metadata_user_storage.go | 63 +++++ pkgs/distlock/reqbuilder/storage.go | 74 ++++++ pkgs/distlock/service.go | 62 +++++ pkgs/grpc/agent/pool.go | 2 +- pkgs/iterator/ec_object_iterator.go | 7 +- pkgs/iterator/local_uploading_iterator.go | 2 +- pkgs/iterator/rep_object_iterator.go | 31 ++- pkgs/mq/agent/storage.go | 40 ++-- pkgs/mq/coordinator/storage.go | 20 +- utils/utils.go | 17 +- 46 files changed, 2205 insertions(+), 276 deletions(-) create mode 100644 pkgs/distlock/lockprovider/ipfs_lock.go create mode 100644 pkgs/distlock/lockprovider/ipfs_lock_test.go create mode 100644 pkgs/distlock/lockprovider/lock_compatibility_table.go create mode 100644 pkgs/distlock/lockprovider/lock_compatibility_table_test.go create mode 100644 pkgs/distlock/lockprovider/metadata_lock.go create mode 100644 pkgs/distlock/lockprovider/storage_lock.go create mode 100644 pkgs/distlock/lockprovider/string_lock_target.go create mode 100644 pkgs/distlock/lockprovider/string_lock_target_test.go create mode 100644 pkgs/distlock/reqbuilder/ipfs.go create mode 100644 pkgs/distlock/reqbuilder/lock_request_builder.go create mode 100644 pkgs/distlock/reqbuilder/metadata.go create mode 100644 pkgs/distlock/reqbuilder/metadata_bucket.go create mode 100644 pkgs/distlock/reqbuilder/metadata_cache.go create mode 100644 pkgs/distlock/reqbuilder/metadata_node.go create mode 100644 pkgs/distlock/reqbuilder/metadata_object.go create mode 100644 pkgs/distlock/reqbuilder/metadata_object_block.go create mode 100644 pkgs/distlock/reqbuilder/metadata_object_rep.go create mode 100644 pkgs/distlock/reqbuilder/metadata_package.go create mode 100644 pkgs/distlock/reqbuilder/metadata_storage_package.go create mode 100644 pkgs/distlock/reqbuilder/metadata_user_bucket.go create mode 100644 pkgs/distlock/reqbuilder/metadata_user_storage.go create mode 100644 pkgs/distlock/reqbuilder/storage.go create mode 100644 pkgs/distlock/service.go diff --git a/assets/confs/agent.config.json b/assets/confs/agent.config.json index dc28967..34200e9 100644 --- a/assets/confs/agent.config.json +++ b/assets/confs/agent.config.json @@ -1,10 +1,15 @@ { "id": 1, - "grpcListenAddress": "127.0.0.1:5010", - "grpcPort": 5010, + "local": { + "nodeID": 1, + "localIP": "127.0.0.1", + "externalIP": "127.0.0.1" + }, + "grpc": { + "ip": "127.0.0.1", + "port": 5010 + }, "ecPacketSize": 10, - "localIP": "127.0.0.1", - "externalIP": "127.0.0.1", "storageBaseDir": ".", "tempFileLifetime": 3600, "logger": { diff --git a/assets/confs/client.config.json b/assets/confs/client.config.json index 5afdd35..f9d97e6 100644 --- a/assets/confs/client.config.json +++ b/assets/confs/client.config.json @@ -1,10 +1,13 @@ { - "grpcPort": 5010, + "local": { + "localIP": "127.0.0.1", + "externalIP": "127.0.0.1" + }, + "agentGRPC": { + "port": 5010 + }, "ecPacketSize": 10, - "ipfsPort": 10, "maxRepCount": 10, - "localIP": "127.0.0.1", - "externalIP": "127.0.0.1", "logger": { "output": "stdout", "level": "debug" diff --git a/assets/scripts/create_database.sql b/assets/scripts/create_database.sql index 8e916e1..0a1c9d4 100644 --- a/assets/scripts/create_database.sql +++ b/assets/scripts/create_database.sql @@ -106,7 +106,7 @@ create table Package ( create table Object ( ObjectID int not null auto_increment primary key comment '对象ID', PackageID int not null comment '包ID', - Path varchar(1000) not null comment '对象路径', + Path varchar(500) not null comment '对象路径', Size bigint not null comment '对象大小(Byte)', UNIQUE KEY PackagePath (PackageID, Path) ) comment = '对象表'; @@ -118,9 +118,9 @@ create table ObjectRep ( create table ObjectBlock ( ObjectID int not null comment '对象ID', - Index int not null comment '编码块在条带内的排序', + `Index` int not null comment '编码块在条带内的排序', FileHash varchar(100) not null comment '编码块哈希值', - primary key(ObjectID, Index) + primary key(ObjectID, `Index`) ) comment = '对象编码块表'; create table Cache ( diff --git a/pkgs/cmd/create_ec_package.go b/pkgs/cmd/create_ec_package.go index 3eb6e5b..be4c3fa 100644 --- a/pkgs/cmd/create_ec_package.go +++ b/pkgs/cmd/create_ec_package.go @@ -13,6 +13,7 @@ import ( "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage-common/pkgs/ec" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" @@ -60,36 +61,25 @@ func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackage return nil, fmt.Errorf("new coordinator client: %w", err) } - // TODO2 - /* - reqBlder := reqbuilder.NewBuilder() - for _, uploadObject := range t.Objects { - reqBlder.Metadata(). - // 用于防止创建了多个同名对象 - Object().CreateOne(t.bucketID, uploadObject.ObjectName) - } - - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if t.uploadConfig.LocalNodeID != nil { - reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID) - } - - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有桶的权限 - UserBucket().ReadOne(t.userID, t.bucketID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于设置Rep配置 - ObjectRep().CreateAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - MutexLock(ctx.DistLock()) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于判断用户是否有桶的权限 + UserBucket().ReadOne(t.userID, t.bucketID). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于创建包信息 + Package().CreateOne(t.bucketID, t.name). + // 用于创建包中的文件的信息 + Object().CreateAny(). + // 用于设置EC配置 + ObjectBlock().CreateAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy))) @@ -119,17 +109,25 @@ func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackage return nil, fmt.Errorf("getting ec: %w", err) } - /* - TODO2 - // 防止上传的副本被清除 - mutex2, err := reqbuilder.NewBuilder(). - IPFS().CreateAnyRep(uploadNode.Node.NodeID). - MutexLock(ctx.DistLock()) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) + // 给上传节点的IPFS加锁 + ipfsReqBlder := reqbuilder.NewBuilder() + // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 + if globals.Local.NodeID != nil { + ipfsReqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + } + for _, node := range uploadNodeInfos { + if globals.Local.NodeID != nil && node.Node.NodeID == *globals.Local.NodeID { + continue } - defer mutex2.Unlock() - */ + + ipfsReqBlder.IPFS().CreateAnyRep(node.Node.NodeID) + } + // 防止上传的副本被清除 + ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + defer ipfsMutex.Unlock() rets, err := uploadAndUpdateECPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNodeInfos, getECResp.Config) if err != nil { @@ -239,15 +237,7 @@ func ecWrite(ctx *UpdateECPackageContext, file io.ReadCloser, fileSize int64, ec var wg sync.WaitGroup wg.Add(ecN) - /*mutex, err := reqbuilder.NewBuilder(). - // 防止上传的副本被清除 - IPFS().CreateAnyRep(node.ID). - MutexLock(svc.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ + for idx := 0; idx < ecN; idx++ { i := idx reader := channelBytesReader{ diff --git a/pkgs/cmd/create_rep_package.go b/pkgs/cmd/create_rep_package.go index e9e77bc..51ab9bc 100644 --- a/pkgs/cmd/create_rep_package.go +++ b/pkgs/cmd/create_rep_package.go @@ -8,9 +8,9 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/models" - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" @@ -66,36 +66,31 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage return nil, fmt.Errorf("new coordinator client: %w", err) } - /* - // TODO2 - reqBlder := reqbuilder.NewBuilder() - for _, uploadObject := range t.Objects { - reqBlder.Metadata(). - // 用于防止创建了多个同名对象 - Object().CreateOne(t.bucketID, uploadObject.ObjectName) - } - - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if t.uploadConfig.LocalNodeID != nil { - reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID) - } + reqBlder := reqbuilder.NewBuilder() + // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 + if globals.Local.NodeID != nil { + reqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + } + mutex, err := reqBlder. + Metadata(). + // 用于判断用户是否有桶的权限 + UserBucket().ReadOne(t.userID, t.bucketID). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于创建包信息 + Package().CreateOne(t.bucketID, t.name). + // 用于创建包中的文件的信息 + Object().CreateAny(). + // 用于设置EC配置 + ObjectBlock().CreateAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有桶的权限 - UserBucket().ReadOne(t.userID, t.bucketID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于设置Rep配置 - ObjectRep().CreateAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - MutexLock(ctx.DistLock()) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name, models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy))) if err != nil { @@ -121,13 +116,13 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage uploadNode := t.chooseUploadNode(nodeInfos) // 防止上传的副本被清除 - mutex2, err := reqbuilder.NewBuilder(). + ipfsMutex, err := reqbuilder.NewBuilder(). IPFS().CreateAnyRep(uploadNode.Node.NodeID). MutexLock(ctx.Distlock) if err != nil { return nil, fmt.Errorf("acquire locks failed, err: %w", err) } - defer mutex2.Unlock() + defer ipfsMutex.Unlock() rets, err := uploadAndUpdateRepPackage(createPkgResp.PackageID, t.objectIter, uploadNode) if err != nil { diff --git a/pkgs/cmd/download_package.go b/pkgs/cmd/download_package.go index 4bd60db..f20c83c 100644 --- a/pkgs/cmd/download_package.go +++ b/pkgs/cmd/download_package.go @@ -133,7 +133,14 @@ func (t *DownloadPackage) writeObject(objIter iterator.DownloadingObjectIterator } defer objInfo.File.Close() - outputFile, err := os.Create(filepath.Join(t.outputPath, objInfo.Object.Path)) + fullPath := filepath.Join(t.outputPath, objInfo.Object.Path) + + dirPath := filepath.Dir(fullPath) + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("creating object dir: %w", err) + } + + outputFile, err := os.Create(fullPath) if err != nil { return fmt.Errorf("creating object file: %w", err) } diff --git a/pkgs/cmd/update_ec_package.go b/pkgs/cmd/update_ec_package.go index 2b6b55f..83f195b 100644 --- a/pkgs/cmd/update_ec_package.go +++ b/pkgs/cmd/update_ec_package.go @@ -6,10 +6,10 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/utils/serder" - mysort "gitlink.org.cn/cloudream/common/utils/sort" "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage-common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) @@ -40,36 +40,23 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage return nil, fmt.Errorf("new coordinator client: %w", err) } - /* - TODO2 - reqBlder := reqbuilder.NewBuilder() - - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if t.uploadConfig.LocalNodeID != nil { - reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID) - } - - // TODO2 - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有对象的权限 - UserBucket().ReadAny(). - // 用于读取、修改对象信息 - Object().WriteOne(t.objectID). - // 用于更新Rep配置 - ObjectRep().WriteOne(t.objectID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - // 用于修改Move此Object的记录的状态 - StorageObject().WriteAny(). - MutexLock(ctx.DistLock()) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ + mutex, err := reqbuilder.NewBuilder(). + Metadata(). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于创建包信息 + Package().WriteOne(t.packageID). + // 用于创建包中的文件的信息 + Object().CreateAny(). + // 用于设置EC配置 + ObjectBlock().CreateAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) if err != nil { @@ -103,17 +90,25 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage return nil, fmt.Errorf("getting ec: %w", err) } - /* - TODO2 - // 防止上传的副本被清除 - mutex2, err := reqbuilder.NewBuilder(). - IPFS().CreateAnyRep(uploadNode.Node.NodeID). - MutexLock(ctx.DistLock()) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) + // 给上传节点的IPFS加锁 + ipfsReqBlder := reqbuilder.NewBuilder() + // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 + if globals.Local.NodeID != nil { + ipfsReqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + } + for _, node := range nodeInfos { + if globals.Local.NodeID != nil && node.Node.NodeID == *globals.Local.NodeID { + continue } - defer mutex2.Unlock() - */ + + ipfsReqBlder.IPFS().CreateAnyRep(node.Node.NodeID) + } + // 防止上传的副本被清除 + ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + defer ipfsMutex.Unlock() rets, err := uploadAndUpdateECPackage(ctx, t.packageID, t.objectIter, nodeInfos, getECResp.Config) if err != nil { @@ -124,19 +119,3 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage ObjectResults: rets, }, nil } - -// chooseUploadNode 选择一个上传文件的节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (t *UpdateECPackage) chooseUploadNode(nodes []UpdateNodeInfo) UpdateNodeInfo { - mysort.Sort(nodes, func(left, right UpdateNodeInfo) int { - v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject) - if v != 0 { - return v - } - - return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation) - }) - - return nodes[0] -} diff --git a/pkgs/cmd/update_rep_package.go b/pkgs/cmd/update_rep_package.go index df27f7c..130ec44 100644 --- a/pkgs/cmd/update_rep_package.go +++ b/pkgs/cmd/update_rep_package.go @@ -4,8 +4,8 @@ import ( "fmt" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" @@ -43,36 +43,29 @@ func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackage if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } - /* - TODO2 - reqBlder := reqbuilder.NewBuilder() - // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 - if t.uploadConfig.LocalNodeID != nil { - reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID) - } - - // TODO2 - mutex, err := reqBlder. - Metadata(). - // 用于判断用户是否有对象的权限 - UserBucket().ReadAny(). - // 用于读取、修改对象信息 - Object().WriteOne(t.objectID). - // 用于更新Rep配置 - ObjectRep().WriteOne(t.objectID). - // 用于查询可用的上传节点 - Node().ReadAny(). - // 用于创建Cache记录 - Cache().CreateAny(). - // 用于修改Move此Object的记录的状态 - StorageObject().WriteAny(). - MutexLock(ctx.DistLock()) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - */ + reqBlder := reqbuilder.NewBuilder() + // 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁 + if globals.Local.NodeID != nil { + reqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID) + } + mutex, err := reqBlder. + Metadata(). + // 用于查询可用的上传节点 + Node().ReadAny(). + // 用于创建包信息 + Package().WriteOne(t.packageID). + // 用于创建包中的文件的信息 + Object().CreateAny(). + // 用于设置EC配置 + ObjectBlock().CreateAny(). + // 用于创建Cache记录 + Cache().CreateAny(). + MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID)) if err != nil { return nil, fmt.Errorf("getting user nodes: %w", err) @@ -100,13 +93,13 @@ func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackage uploadNode := t.chooseUploadNode(nodeInfos) // 防止上传的副本被清除 - mutex2, err := reqbuilder.NewBuilder(). + ipfsMutex, err := reqbuilder.NewBuilder(). IPFS().CreateAnyRep(uploadNode.Node.NodeID). MutexLock(ctx.Distlock) if err != nil { return nil, fmt.Errorf("acquire locks failed, err: %w", err) } - defer mutex2.Unlock() + defer ipfsMutex.Unlock() rets, err := uploadAndUpdateRepPackage(t.packageID, t.objectIter, uploadNode.UploadNodeInfo) if err != nil { diff --git a/pkgs/db/bucket.go b/pkgs/db/bucket.go index e8f2eac..742cf53 100644 --- a/pkgs/db/bucket.go +++ b/pkgs/db/bucket.go @@ -50,9 +50,9 @@ func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID int64, userID int64) (b func (*BucketDB) GetUserBucket(ctx SQLContext, userID int64, bucketID int64) (model.Bucket, error) { var ret model.Bucket err := sqlx.Get(ctx, &ret, - "select Bucket.* from UserBucket, Bucket where UserID = ? and "+ - "UserBucket.BucketID = Bucket.BucketID and "+ - "Bucket.BucketID = ?", userID, bucketID) + "select Bucket.* from UserBucket, Bucket where UserID = ? and"+ + " UserBucket.BucketID = Bucket.BucketID and"+ + " Bucket.BucketID = ?", userID, bucketID) return ret, err } diff --git a/pkgs/db/cache.go b/pkgs/db/cache.go index 6f0bdd3..0122bf5 100644 --- a/pkgs/db/cache.go +++ b/pkgs/db/cache.go @@ -80,9 +80,9 @@ func (*CacheDB) DeleteNodeAll(ctx SQLContext, nodeID int64) error { func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID int64, fileHash string) ([]model.Node, error) { var x []model.Node err := sqlx.Select(ctx, &x, - "select Node.* from Cache, UserNode, Node where "+ - "Cache.FileHash=? and Cache.NodeID = UserNode.NodeID and "+ - "UserNode.UserID = ? and UserNode.NodeID = Node.NodeID", fileHash, userID) + "select Node.* from Cache, UserNode, Node where"+ + " Cache.FileHash=? and Cache.NodeID = UserNode.NodeID and"+ + " UserNode.UserID = ? and UserNode.NodeID = Node.NodeID", fileHash, userID) return x, err } diff --git a/pkgs/db/object.go b/pkgs/db/object.go index 1bdf9c5..c008c5c 100644 --- a/pkgs/db/object.go +++ b/pkgs/db/object.go @@ -273,3 +273,8 @@ func (*ObjectDB) BatchDelete(ctx SQLContext, ids []int64) error { _, err := ctx.Exec("delete from Object where ObjectID in (?)", ids) return err } + +func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID int64) error { + _, err := ctx.Exec("delete from Object where PackageID = ?", packageID) + return err +} diff --git a/pkgs/db/object_block.go b/pkgs/db/object_block.go index 4fded12..7b4dd66 100644 --- a/pkgs/db/object_block.go +++ b/pkgs/db/object_block.go @@ -28,13 +28,18 @@ func (db *ObjectBlockDB) DeleteObjectAll(ctx SQLContext, objectID int64) error { return err } +func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID int64) error { + _, err := ctx.Exec("delete ObjectBlock from ObjectBlock inner join Object on ObjectBlock.ObjectID = Object.ObjectID where PackageID = ?", packageID) + return err +} + func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) { var cnt int err := sqlx.Get(ctx, &cnt, - "select count(FileHash) from ObjectBlock, Object, Package where FileHash = ? and "+ - "ObjectBlock.ObjectID = Object.ObjectID and "+ - "Object.PackageID = Package.PackageID and "+ - "Package.State = ?", fileHash, consts.PackageStateNormal) + "select count(FileHash) from ObjectBlock, Object, Package where FileHash = ? and"+ + " ObjectBlock.ObjectID = Object.ObjectID and"+ + " Object.PackageID = Package.PackageID and"+ + " Package.State = ?", fileHash, consts.PackageStateNormal) if err == sql.ErrNoRows { return 0, nil } @@ -66,8 +71,8 @@ func (db *ObjectBlockDB) GetBatchBlocksNodes(ctx SQLContext, hashs [][]string) ( for j, h := range hs { var x []model.Node err = sqlx.Select(ctx, &x, - "select Node.* from Cache, Node where "+ - "Cache.FileHash=? and Cache.NodeID = Node.NodeID and Cache.State=?", h, consts.CacheStatePinned) + "select Node.* from Cache, Node where"+ + " Cache.FileHash=? and Cache.NodeID = Node.NodeID and Cache.State=?", h, consts.CacheStatePinned) xx := make([]int64, len(x)) for ii := 0; ii < len(x); ii++ { xx[ii] = x[ii].NodeID @@ -97,9 +102,9 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) err := sqlx.Select(ctx, &tmpRets, - "select ObjectBlock.Index, ObjectBlock.FileHash, group_concat(NodeID) as NodeIDs from ObjectBlock "+ - "left join Cache on ObjectBlock.FileHash = Cache.FileHash"+ - "where ObjectID = ? group by ObjectBlock.Index, ObjectBlock.FileHash", + "select ObjectBlock.Index, ObjectBlock.FileHash, group_concat(NodeID) as NodeIDs from ObjectBlock"+ + " left join Cache on ObjectBlock.FileHash = Cache.FileHash"+ + " where ObjectID = ? group by ObjectBlock.Index, ObjectBlock.FileHash", objID, ) if err != nil { diff --git a/pkgs/db/object_rep.go b/pkgs/db/object_rep.go index 77265b6..6514021 100644 --- a/pkgs/db/object_rep.go +++ b/pkgs/db/object_rep.go @@ -51,13 +51,18 @@ func (db *ObjectRepDB) Delete(ctx SQLContext, objectID int64) error { return err } +func (db *ObjectRepDB) DeleteInPackage(ctx SQLContext, packageID int64) error { + _, err := ctx.Exec("delete ObjectRep from ObjectRep inner join Object on ObjectRep.ObjectID = Object.ObjectID where PackageID = ?", packageID) + return err +} + func (db *ObjectRepDB) GetFileMaxRepCount(ctx SQLContext, fileHash string) (int, error) { var maxRepCnt *int err := sqlx.Get(ctx, &maxRepCnt, - "select max(RepCount) from ObjectRep, Object, Package where FileHash = ? and "+ - "ObjectRep.ObjectID = Object.ObjectID and "+ - "Object.PackageID = Package.PackageID and "+ - "Package.State = ?", fileHash, consts.PackageStateNormal) + "select json_extract(Redundancy, '$.info.repCount') from ObjectRep, Object, Package where FileHash = ? and"+ + " ObjectRep.ObjectID = Object.ObjectID and"+ + " Object.PackageID = Package.PackageID and"+ + " Package.State = ?", fileHash, consts.PackageStateNormal) if err == sql.ErrNoRows { return 0, nil @@ -83,10 +88,10 @@ func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) ( err := sqlx.Select(ctx, &tmpRets, - "select Object.ObjectID, ObjectRep.FileHash, group_concat(NodeID) as NodeIDs from Object "+ - "left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID "+ - "left join Cache on ObjectRep.FileHash = Cache.FileHash"+ - "where PackageID = ? group by Object.ObjectID order by Object.ObjectID asc", + "select Object.ObjectID, ObjectRep.FileHash, group_concat(NodeID) as NodeIDs from Object"+ + " left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+ + " left join Cache on ObjectRep.FileHash = Cache.FileHash"+ + " where PackageID = ? group by Object.ObjectID order by Object.ObjectID asc", packageID, ) if err != nil { diff --git a/pkgs/db/package.go b/pkgs/db/package.go index 1337508..76d4014 100644 --- a/pkgs/db/package.go +++ b/pkgs/db/package.go @@ -70,10 +70,10 @@ func (db *PackageDB) IsAvailable(ctx SQLContext, userID int64, packageID int64) func (db *PackageDB) GetUserPackage(ctx SQLContext, userID int64, packageID int64) (model.Package, error) { var ret model.Package err := sqlx.Get(ctx, &ret, - "select Package.* from Package, UserBucket where "+ - "Package.PackageID = ? and "+ - "Package.BucketID = UserBucket.BucketID and "+ - "UserBucket.UserID = ?", + "select Package.* from Package, UserBucket where"+ + " Package.PackageID = ? and"+ + " Package.BucketID = UserBucket.BucketID and"+ + " UserBucket.UserID = ?", packageID, userID) return ret, err } @@ -129,16 +129,19 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID int64) error { } if obj.Redundancy.Type == models.RedundancyRep { - // TODO2 - //err = db.ObjectRep().Delete(ctx, objectID) - //if err != nil { - // return fmt.Errorf("delete from object rep failed, err: %w", err) - //} + err = db.ObjectRep().DeleteInPackage(ctx, packageID) + if err != nil { + return fmt.Errorf("delete from object rep failed, err: %w", err) + } } else { - //err = db.ObjectBlock().Delete(ctx, objectID) - //if err != nil { - // return fmt.Errorf("delete from object rep failed, err: %w", err) - //} + err = db.ObjectBlock().DeleteInPackage(ctx, packageID) + if err != nil { + return fmt.Errorf("delete from object rep failed, err: %w", err) + } + } + + if err := db.Object().DeleteInPackage(ctx, packageID); err != nil { + return fmt.Errorf("deleting objects in package: %w", err) } _, err = db.StoragePackage().SetAllPackageDeleted(ctx, packageID) diff --git a/pkgs/db/storage.go b/pkgs/db/storage.go index a363369..e6466ce 100644 --- a/pkgs/db/storage.go +++ b/pkgs/db/storage.go @@ -31,10 +31,10 @@ func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) func (db *StorageDB) IsAvailable(ctx SQLContext, userID int64, storageID int64) (bool, error) { var stgID int64 err := sqlx.Get(ctx, &stgID, - "select Storage.StorageID from Storage, UserStorage where "+ - "Storage.StorageID = ? and "+ - "Storage.StorageID = UserStorage.StorageID and "+ - "UserStorage.UserID = ?", + "select Storage.StorageID from Storage, UserStorage where"+ + " Storage.StorageID = ? and"+ + " Storage.StorageID = UserStorage.StorageID and"+ + " UserStorage.UserID = ?", storageID, userID) if err == sql.ErrNoRows { diff --git a/pkgs/db/storage_package.go b/pkgs/db/storage_package.go index 5f09aff..e38d8db 100644 --- a/pkgs/db/storage_package.go +++ b/pkgs/db/storage_package.go @@ -34,7 +34,7 @@ func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID int64) ([]m return ret, err } -func (*StoragePackageDB) MovePackageTo(ctx SQLContext, packageID int64, storageID int64, userID int64) error { +func (*StoragePackageDB) LoadPackage(ctx SQLContext, packageID int64, storageID int64, userID int64) error { _, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, consts.StoragePackageStateNormal) return err } @@ -108,8 +108,8 @@ func (*StoragePackageDB) Delete(ctx SQLContext, storageID int64, packageID int64 func (*StoragePackageDB) FindPackageStorages(ctx SQLContext, packageID int64) ([]model.Storage, error) { var ret []model.Storage err := sqlx.Select(ctx, &ret, - "select Storage.* from StoragePackage, Storage where PackageID = ? and "+ - "StoragePackage.StorageID = Storage.StorageID", + "select Storage.* from StoragePackage, Storage where PackageID = ? and"+ + " StoragePackage.StorageID = Storage.StorageID", packageID, ) return ret, err diff --git a/pkgs/distlock/lockprovider/ipfs_lock.go b/pkgs/distlock/lockprovider/ipfs_lock.go new file mode 100644 index 0000000..ca1c482 --- /dev/null +++ b/pkgs/distlock/lockprovider/ipfs_lock.go @@ -0,0 +1,217 @@ +package lockprovider + +import ( + "fmt" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/distlock" + mylo "gitlink.org.cn/cloudream/common/utils/lo" +) + +const ( + IPFSLockPathPrefix = "IPFS" + + IPFS_SET_READ_LOCK = "SetRead" + IPFS_SET_WRITE_LOCK = "SetWrite" + IPFS_SET_CREATE_LOCK = "SetCreate" + + IPFS_ELEMENT_READ_LOCK = "ElementRead" + IPFS_ELEMENT_WRITE_LOCK = "ElementWrite" + + IPFS_NODE_ID_PATH_INDEX = 1 +) + +type IPFSLock struct { + nodeLocks map[string]*IPFSNodeLock + dummyLock *IPFSNodeLock +} + +func NewIPFSLock() *IPFSLock { + return &IPFSLock{ + nodeLocks: make(map[string]*IPFSNodeLock), + dummyLock: NewIPFSNodeLock(), + } +} + +// CanLock 判断这个锁能否锁定成功 +func (l *IPFSLock) CanLock(lock distlock.Lock) error { + nodeLock, ok := l.nodeLocks[lock.Path[IPFS_NODE_ID_PATH_INDEX]] + if !ok { + // 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。 + // 这里使用一个空Provider来进行检查。 + return l.dummyLock.CanLock(lock) + } + + return nodeLock.CanLock(lock) +} + +// 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 +func (l *IPFSLock) Lock(reqID string, lock distlock.Lock) error { + nodeID := lock.Path[IPFS_NODE_ID_PATH_INDEX] + + nodeLock, ok := l.nodeLocks[nodeID] + if !ok { + nodeLock = NewIPFSNodeLock() + l.nodeLocks[nodeID] = nodeLock + } + + return nodeLock.Lock(reqID, lock) +} + +// 解锁 +func (l *IPFSLock) Unlock(reqID string, lock distlock.Lock) error { + nodeID := lock.Path[IPFS_NODE_ID_PATH_INDEX] + + nodeLock, ok := l.nodeLocks[nodeID] + if !ok { + return nil + } + + return nodeLock.Unlock(reqID, lock) +} + +// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD +func (l *IPFSLock) GetTargetString(target any) (string, error) { + tar := target.(StringLockTarget) + return StringLockTargetToString(&tar) +} + +// ParseTargetString 解析字符串格式的锁对象数据 +func (l *IPFSLock) ParseTargetString(targetStr string) (any, error) { + return StringLockTargetFromString(targetStr) +} + +// Clear 清除内部所有状态 +func (l *IPFSLock) Clear() { + l.nodeLocks = make(map[string]*IPFSNodeLock) +} + +type ipfsElementLock struct { + target StringLockTarget + requestIDs []string +} + +type IPFSNodeLock struct { + setReadReqIDs []string + setWriteReqIDs []string + setCreateReqIDs []string + + elementReadLocks []*ipfsElementLock + elementWriteLocks []*ipfsElementLock + + lockCompatibilityTable *LockCompatibilityTable +} + +func NewIPFSNodeLock() *IPFSNodeLock { + compTable := &LockCompatibilityTable{} + + ipfsLock := IPFSNodeLock{ + lockCompatibilityTable: compTable, + } + + compTable. + Column(IPFS_ELEMENT_READ_LOCK, func() bool { return len(ipfsLock.elementReadLocks) > 0 }). + Column(IPFS_ELEMENT_WRITE_LOCK, func() bool { return len(ipfsLock.elementWriteLocks) > 0 }). + Column(IPFS_SET_READ_LOCK, func() bool { return len(ipfsLock.setReadReqIDs) > 0 }). + Column(IPFS_SET_WRITE_LOCK, func() bool { return len(ipfsLock.setWriteReqIDs) > 0 }). + Column(IPFS_SET_CREATE_LOCK, func() bool { return len(ipfsLock.setCreateReqIDs) > 0 }) + + comp := LockCompatible() + uncp := LockUncompatible() + trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool { + strTar := lock.Target.(StringLockTarget) + if testLockName == IPFS_ELEMENT_READ_LOCK { + // 如果没有任何锁的锁对象与当前的锁对象冲突,那么这个锁可以加 + return lo.NoneBy(ipfsLock.elementReadLocks, func(other *ipfsElementLock) bool { return strTar.IsConflict(&other.target) }) + } + + return lo.NoneBy(ipfsLock.elementWriteLocks, func(other *ipfsElementLock) bool { return strTar.IsConflict(&other.target) }) + }) + + compTable.MustRow(comp, trgt, comp, uncp, comp) + compTable.MustRow(trgt, trgt, uncp, uncp, uncp) + compTable.MustRow(comp, uncp, comp, uncp, uncp) + compTable.MustRow(uncp, uncp, uncp, uncp, uncp) + compTable.MustRow(comp, uncp, uncp, uncp, comp) + + return &ipfsLock +} + +// CanLock 判断这个锁能否锁定成功 +func (l *IPFSNodeLock) CanLock(lock distlock.Lock) error { + return l.lockCompatibilityTable.Test(lock) +} + +// 锁定 +func (l *IPFSNodeLock) Lock(reqID string, lock distlock.Lock) error { + switch lock.Name { + case IPFS_SET_READ_LOCK: + l.setReadReqIDs = append(l.setReadReqIDs, reqID) + case IPFS_SET_WRITE_LOCK: + l.setWriteReqIDs = append(l.setWriteReqIDs, reqID) + case IPFS_SET_CREATE_LOCK: + l.setCreateReqIDs = append(l.setCreateReqIDs, reqID) + + case IPFS_ELEMENT_READ_LOCK: + l.elementReadLocks = l.addElementLock(lock, l.elementReadLocks, reqID) + case IPFS_ELEMENT_WRITE_LOCK: + l.elementWriteLocks = l.addElementLock(lock, l.elementWriteLocks, reqID) + + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} + +func (l *IPFSNodeLock) addElementLock(lock distlock.Lock, locks []*ipfsElementLock, reqID string) []*ipfsElementLock { + strTarget := lock.Target.(StringLockTarget) + lck, ok := lo.Find(locks, func(l *ipfsElementLock) bool { return strTarget.IsConflict(&l.target) }) + if !ok { + lck = &ipfsElementLock{ + target: strTarget, + } + locks = append(locks, lck) + } + + lck.requestIDs = append(lck.requestIDs, reqID) + return locks +} + +// 解锁 +func (l *IPFSNodeLock) Unlock(reqID string, lock distlock.Lock) error { + switch lock.Name { + case IPFS_SET_READ_LOCK: + l.setReadReqIDs = mylo.Remove(l.setReadReqIDs, reqID) + case IPFS_SET_WRITE_LOCK: + l.setWriteReqIDs = mylo.Remove(l.setWriteReqIDs, reqID) + case IPFS_SET_CREATE_LOCK: + l.setCreateReqIDs = mylo.Remove(l.setCreateReqIDs, reqID) + + case IPFS_ELEMENT_READ_LOCK: + l.elementReadLocks = l.removeElementLock(lock, l.elementReadLocks, reqID) + case IPFS_ELEMENT_WRITE_LOCK: + l.elementWriteLocks = l.removeElementLock(lock, l.elementWriteLocks, reqID) + + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} + +func (l *IPFSNodeLock) removeElementLock(lock distlock.Lock, locks []*ipfsElementLock, reqID string) []*ipfsElementLock { + strTarget := lock.Target.(StringLockTarget) + lck, index, ok := lo.FindIndexOf(locks, func(l *ipfsElementLock) bool { return strTarget.IsConflict(&l.target) }) + if !ok { + return locks + } + + lck.requestIDs = mylo.Remove(lck.requestIDs, reqID) + + if len(lck.requestIDs) == 0 { + locks = mylo.RemoveAt(locks, index) + } + + return locks +} diff --git a/pkgs/distlock/lockprovider/ipfs_lock_test.go b/pkgs/distlock/lockprovider/ipfs_lock_test.go new file mode 100644 index 0000000..ff8139d --- /dev/null +++ b/pkgs/distlock/lockprovider/ipfs_lock_test.go @@ -0,0 +1,113 @@ +package lockprovider + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + "gitlink.org.cn/cloudream/common/pkgs/distlock" +) + +func Test_IPFSLock(t *testing.T) { + cases := []struct { + title string + initLocks []distlock.Lock + doLock distlock.Lock + wantOK bool + }{ + { + title: "同节点,同一个Read锁", + initLocks: []distlock.Lock{ + { + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_SET_READ_LOCK, + }, + }, + doLock: distlock.Lock{ + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_SET_READ_LOCK, + }, + wantOK: true, + }, + { + title: "同节点,同一个Write锁", + initLocks: []distlock.Lock{ + { + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_SET_WRITE_LOCK, + }, + }, + doLock: distlock.Lock{ + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_SET_WRITE_LOCK, + }, + wantOK: false, + }, + { + title: "不同节点,同一个Write锁", + initLocks: []distlock.Lock{ + { + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_SET_WRITE_LOCK, + }, + }, + doLock: distlock.Lock{ + Path: []string{IPFSLockPathPrefix, "node2"}, + Name: IPFS_SET_WRITE_LOCK, + }, + wantOK: true, + }, + { + title: "相同对象的Read、Write锁", + initLocks: []distlock.Lock{ + { + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_ELEMENT_WRITE_LOCK, + Target: *NewStringLockTarget(), + }, + }, + doLock: distlock.Lock{ + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_ELEMENT_WRITE_LOCK, + Target: *NewStringLockTarget(), + }, + wantOK: false, + }, + } + + for _, ca := range cases { + Convey(ca.title, t, func() { + ipfsLock := NewIPFSLock() + + for _, l := range ca.initLocks { + ipfsLock.Lock("req1", l) + } + + err := ipfsLock.CanLock(ca.doLock) + if ca.wantOK { + So(err, ShouldBeNil) + } else { + So(err, ShouldNotBeNil) + } + }) + } + + Convey("解锁", t, func() { + ipfsLock := NewIPFSLock() + + lock := distlock.Lock{ + Path: []string{IPFSLockPathPrefix, "node1"}, + Name: IPFS_SET_WRITE_LOCK, + } + + ipfsLock.Lock("req1", lock) + + err := ipfsLock.CanLock(lock) + So(err, ShouldNotBeNil) + + ipfsLock.Unlock("req1", lock) + + err = ipfsLock.CanLock(lock) + So(err, ShouldBeNil) + }) + +} diff --git a/pkgs/distlock/lockprovider/lock_compatibility_table.go b/pkgs/distlock/lockprovider/lock_compatibility_table.go new file mode 100644 index 0000000..951e708 --- /dev/null +++ b/pkgs/distlock/lockprovider/lock_compatibility_table.go @@ -0,0 +1,123 @@ +package lockprovider + +import ( + "fmt" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/distlock" +) + +const ( + LOCK_COMPATIBILITY_COMPATIBLE LockCompatibilityType = "Compatible" + LOCK_COMPATIBILITY_UNCOMPATIBLE LockCompatibilityType = "Uncompatible" + LOCK_COMPATIBILITY_SPECIAL LockCompatibilityType = "Special" +) + +type HasSuchLockFn = func() bool + +// LockCompatibilitySpecialFn 判断锁与指定的锁名是否兼容 +type LockCompatibilitySpecialFn func(lock distlock.Lock, testLockName string) bool + +type LockCompatibilityType string + +type LockCompatibility struct { + Type LockCompatibilityType + SpecialFn LockCompatibilitySpecialFn +} + +func LockCompatible() LockCompatibility { + return LockCompatibility{ + Type: LOCK_COMPATIBILITY_COMPATIBLE, + } +} + +func LockUncompatible() LockCompatibility { + return LockCompatibility{ + Type: LOCK_COMPATIBILITY_UNCOMPATIBLE, + } +} + +func LockSpecial(specialFn LockCompatibilitySpecialFn) LockCompatibility { + return LockCompatibility{ + Type: LOCK_COMPATIBILITY_SPECIAL, + SpecialFn: specialFn, + } +} + +type LockCompatibilityTableRow struct { + LockName string + HasSuchLockFn HasSuchLockFn + Compatibilities []LockCompatibility +} + +type LockCompatibilityTable struct { + rows []LockCompatibilityTableRow + rowIndex int +} + +func (t *LockCompatibilityTable) Column(lockName string, hasSuchLock HasSuchLockFn) *LockCompatibilityTable { + t.rows = append(t.rows, LockCompatibilityTableRow{ + LockName: lockName, + HasSuchLockFn: hasSuchLock, + }) + + return t +} +func (t *LockCompatibilityTable) MustRow(comps ...LockCompatibility) { + err := t.Row(comps...) + if err != nil { + panic(fmt.Sprintf("build lock compatibility table failed, err: %s", err.Error())) + } +} + +func (t *LockCompatibilityTable) Row(comps ...LockCompatibility) error { + if t.rowIndex >= len(t.rows) { + return fmt.Errorf("there should be no more rows in the table") + } + + if len(comps) < len(t.rows) { + return fmt.Errorf("the columns should equals the rows") + } + + t.rows[t.rowIndex].Compatibilities = comps + + for i := 0; i < t.rowIndex-1; i++ { + chkRowCeil := t.rows[t.rowIndex].Compatibilities[i] + chkColCeil := t.rows[i].Compatibilities[t.rowIndex] + + if chkRowCeil.Type != chkColCeil.Type { + return fmt.Errorf("value at %d, %d is not equals to at %d, %d", t.rowIndex, i, i, t.rowIndex) + } + } + + t.rowIndex++ + + return nil +} + +func (t *LockCompatibilityTable) Test(lock distlock.Lock) error { + row, ok := lo.Find(t.rows, func(row LockCompatibilityTableRow) bool { return lock.Name == row.LockName }) + if !ok { + return fmt.Errorf("unknow lock name %s", lock.Name) + } + + for i, c := range row.Compatibilities { + if c.Type == LOCK_COMPATIBILITY_COMPATIBLE { + continue + } + + if c.Type == LOCK_COMPATIBILITY_UNCOMPATIBLE { + if t.rows[i].HasSuchLockFn() { + return distlock.NewLockTargetBusyError(t.rows[i].LockName) + } + } + + if c.Type == LOCK_COMPATIBILITY_SPECIAL { + if !c.SpecialFn(lock, t.rows[i].LockName) { + return distlock.NewLockTargetBusyError(t.rows[i].LockName) + } + } + } + + return nil +} diff --git a/pkgs/distlock/lockprovider/lock_compatibility_table_test.go b/pkgs/distlock/lockprovider/lock_compatibility_table_test.go new file mode 100644 index 0000000..052be72 --- /dev/null +++ b/pkgs/distlock/lockprovider/lock_compatibility_table_test.go @@ -0,0 +1,41 @@ +package lockprovider + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + "gitlink.org.cn/cloudream/common/pkgs/distlock" +) + +func Test_LockCompatibilityTable(t *testing.T) { + Convey("兼容,互斥,特殊比较", t, func() { + table := LockCompatibilityTable{} + + table. + Column("l1", func() bool { return true }). + Column("l2", func() bool { return true }). + Column("l3", func() bool { return false }) + + comp := LockCompatible() + uncp := LockUncompatible() + spcl := LockSpecial(func(lock distlock.Lock, testLockName string) bool { return true }) + table.Row(comp, comp, comp) + table.Row(comp, uncp, comp) + table.Row(comp, comp, spcl) + + err := table.Test(distlock.Lock{ + Name: "l1", + }) + So(err, ShouldBeNil) + + err = table.Test(distlock.Lock{ + Name: "l2", + }) + So(err, ShouldNotBeNil) + + err = table.Test(distlock.Lock{ + Name: "l3", + }) + So(err, ShouldBeNil) + }) +} diff --git a/pkgs/distlock/lockprovider/metadata_lock.go b/pkgs/distlock/lockprovider/metadata_lock.go new file mode 100644 index 0000000..e26f37e --- /dev/null +++ b/pkgs/distlock/lockprovider/metadata_lock.go @@ -0,0 +1,184 @@ +package lockprovider + +import ( + "fmt" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/distlock" + mylo "gitlink.org.cn/cloudream/common/utils/lo" +) + +const ( + MetadataLockPathPrefix = "Metadata" + + METADATA_SET_READ_LOCK = "SetRead" + METADATA_SET_WRITE_LOCK = "SetWrite" + METADATA_SET_CREATE_LOCK = "SetCreate" + + METADATA_ELEMENT_READ_LOCK = "ElementRead" + METADATA_ELEMENT_WRITE_LOCK = "ElementWrite" + METADATA_ELEMENT_CREATE_LOCK = "ElementCreate" +) + +type metadataElementLock struct { + target StringLockTarget + requestIDs []string +} + +type MetadataLock struct { + setReadReqIDs []string + setWriteReqIDs []string + setCreateReqIDs []string + + elementReadLocks []*metadataElementLock + elementWriteLocks []*metadataElementLock + elementCreateLocks []*metadataElementLock + + lockCompatibilityTable LockCompatibilityTable +} + +func NewMetadataLock() *MetadataLock { + + metadataLock := MetadataLock{ + lockCompatibilityTable: LockCompatibilityTable{}, + } + + compTable := &metadataLock.lockCompatibilityTable + + compTable. + Column(METADATA_ELEMENT_READ_LOCK, func() bool { return len(metadataLock.elementReadLocks) > 0 }). + Column(METADATA_ELEMENT_WRITE_LOCK, func() bool { return len(metadataLock.elementWriteLocks) > 0 }). + Column(METADATA_ELEMENT_CREATE_LOCK, func() bool { return len(metadataLock.elementCreateLocks) > 0 }). + Column(METADATA_SET_READ_LOCK, func() bool { return len(metadataLock.setReadReqIDs) > 0 }). + Column(METADATA_SET_WRITE_LOCK, func() bool { return len(metadataLock.setWriteReqIDs) > 0 }). + Column(METADATA_SET_CREATE_LOCK, func() bool { return len(metadataLock.setCreateReqIDs) > 0 }) + + comp := LockCompatible() + uncp := LockUncompatible() + trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool { + strTar := lock.Target.(StringLockTarget) + if testLockName == METADATA_ELEMENT_READ_LOCK { + // 如果没有任何锁的锁对象与当前的锁对象冲突,那么这个锁可以加 + return lo.NoneBy(metadataLock.elementReadLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) }) + } + + if testLockName == METADATA_ELEMENT_WRITE_LOCK { + return lo.NoneBy(metadataLock.elementWriteLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) }) + } + + return lo.NoneBy(metadataLock.elementCreateLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) }) + }) + + compTable.MustRow(comp, trgt, comp, comp, uncp, comp) + compTable.MustRow(trgt, trgt, comp, uncp, uncp, comp) + compTable.MustRow(comp, comp, trgt, uncp, uncp, uncp) + compTable.MustRow(comp, uncp, uncp, comp, uncp, uncp) + compTable.MustRow(uncp, uncp, uncp, uncp, uncp, uncp) + compTable.MustRow(comp, comp, uncp, uncp, uncp, uncp) + + return &metadataLock +} + +// CanLock 判断这个锁能否锁定成功 +func (l *MetadataLock) CanLock(lock distlock.Lock) error { + return l.lockCompatibilityTable.Test(lock) +} + +// 锁定 +func (l *MetadataLock) Lock(reqID string, lock distlock.Lock) error { + switch lock.Name { + case METADATA_SET_READ_LOCK: + l.setReadReqIDs = append(l.setReadReqIDs, reqID) + case METADATA_SET_WRITE_LOCK: + l.setWriteReqIDs = append(l.setWriteReqIDs, reqID) + case METADATA_SET_CREATE_LOCK: + l.setCreateReqIDs = append(l.setCreateReqIDs, reqID) + + case METADATA_ELEMENT_READ_LOCK: + l.elementReadLocks = l.addElementLock(lock, l.elementReadLocks, reqID) + case METADATA_ELEMENT_WRITE_LOCK: + l.elementWriteLocks = l.addElementLock(lock, l.elementWriteLocks, reqID) + case METADATA_ELEMENT_CREATE_LOCK: + l.elementCreateLocks = l.addElementLock(lock, l.elementCreateLocks, reqID) + + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} + +func (l *MetadataLock) addElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock { + strTarget := lock.Target.(StringLockTarget) + lck, ok := lo.Find(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) }) + if !ok { + lck = &metadataElementLock{ + target: strTarget, + } + locks = append(locks, lck) + } + + lck.requestIDs = append(lck.requestIDs, reqID) + return locks +} + +// 解锁 +func (l *MetadataLock) Unlock(reqID string, lock distlock.Lock) error { + switch lock.Name { + case METADATA_SET_READ_LOCK: + l.setReadReqIDs = mylo.Remove(l.setReadReqIDs, reqID) + case METADATA_SET_WRITE_LOCK: + l.setWriteReqIDs = mylo.Remove(l.setWriteReqIDs, reqID) + case METADATA_SET_CREATE_LOCK: + l.setCreateReqIDs = mylo.Remove(l.setCreateReqIDs, reqID) + + case METADATA_ELEMENT_READ_LOCK: + l.elementReadLocks = l.removeElementLock(lock, l.elementReadLocks, reqID) + case METADATA_ELEMENT_WRITE_LOCK: + l.elementWriteLocks = l.removeElementLock(lock, l.elementWriteLocks, reqID) + case METADATA_ELEMENT_CREATE_LOCK: + l.elementCreateLocks = l.removeElementLock(lock, l.elementCreateLocks, reqID) + + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} + +func (l *MetadataLock) removeElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock { + strTarget := lock.Target.(StringLockTarget) + lck, index, ok := lo.FindIndexOf(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) }) + if !ok { + return locks + } + + lck.requestIDs = mylo.Remove(lck.requestIDs, reqID) + + if len(lck.requestIDs) == 0 { + locks = mylo.RemoveAt(locks, index) + } + + return locks +} + +// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD +func (l *MetadataLock) GetTargetString(target any) (string, error) { + tar := target.(StringLockTarget) + return StringLockTargetToString(&tar) +} + +// ParseTargetString 解析字符串格式的锁对象数据 +func (l *MetadataLock) ParseTargetString(targetStr string) (any, error) { + return StringLockTargetFromString(targetStr) +} + +// Clear 清除内部所有状态 +func (l *MetadataLock) Clear() { + l.setReadReqIDs = nil + l.setWriteReqIDs = nil + l.setCreateReqIDs = nil + l.elementReadLocks = nil + l.elementWriteLocks = nil + l.elementCreateLocks = nil +} diff --git a/pkgs/distlock/lockprovider/storage_lock.go b/pkgs/distlock/lockprovider/storage_lock.go new file mode 100644 index 0000000..83a22b6 --- /dev/null +++ b/pkgs/distlock/lockprovider/storage_lock.go @@ -0,0 +1,226 @@ +package lockprovider + +import ( + "fmt" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/distlock" + mylo "gitlink.org.cn/cloudream/common/utils/lo" +) + +const ( + StorageLockPathPrefix = "Storage" + + STORAGE_SET_READ_LOCK = "SetRead" + STORAGE_SET_WRITE_LOCK = "SetWrite" + STORAGE_SET_CREATE_LOCK = "SetCreate" + + STORAGE_ELEMENT_READ_LOCK = "ElementRead" + STORAGE_ELEMENT_WRITE_LOCK = "ElementWrite" + STORAGE_ELEMENT_CREATE_LOCK = "ElementCreate" + + STORAGE_STORAGE_ID_PATH_INDEX = 1 +) + +type StorageLock struct { + nodeLocks map[string]*StorageNodeLock + dummyLock *StorageNodeLock +} + +func NewStorageLock() *StorageLock { + return &StorageLock{ + nodeLocks: make(map[string]*StorageNodeLock), + dummyLock: NewStorageNodeLock(), + } +} + +// CanLock 判断这个锁能否锁定成功 +func (l *StorageLock) CanLock(lock distlock.Lock) error { + nodeLock, ok := l.nodeLocks[lock.Path[STORAGE_STORAGE_ID_PATH_INDEX]] + if !ok { + // 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。 + // 这里使用一个空Provider来进行检查。 + return l.dummyLock.CanLock(lock) + } + + return nodeLock.CanLock(lock) +} + +// 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 +func (l *StorageLock) Lock(reqID string, lock distlock.Lock) error { + nodeID := lock.Path[STORAGE_STORAGE_ID_PATH_INDEX] + + nodeLock, ok := l.nodeLocks[nodeID] + if !ok { + nodeLock = NewStorageNodeLock() + l.nodeLocks[nodeID] = nodeLock + } + + return nodeLock.Lock(reqID, lock) +} + +// 解锁 +func (l *StorageLock) Unlock(reqID string, lock distlock.Lock) error { + nodeID := lock.Path[STORAGE_STORAGE_ID_PATH_INDEX] + + nodeLock, ok := l.nodeLocks[nodeID] + if !ok { + return nil + } + + return nodeLock.Unlock(reqID, lock) +} + +// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD +func (l *StorageLock) GetTargetString(target any) (string, error) { + tar := target.(StringLockTarget) + return StringLockTargetToString(&tar) +} + +// ParseTargetString 解析字符串格式的锁对象数据 +func (l *StorageLock) ParseTargetString(targetStr string) (any, error) { + return StringLockTargetFromString(targetStr) +} + +// Clear 清除内部所有状态 +func (l *StorageLock) Clear() { + l.nodeLocks = make(map[string]*StorageNodeLock) +} + +type storageElementLock struct { + target StringLockTarget + requestIDs []string +} + +type StorageNodeLock struct { + setReadReqIDs []string + setWriteReqIDs []string + setCreateReqIDs []string + + elementReadLocks []*storageElementLock + elementWriteLocks []*storageElementLock + elementCreateLocks []*storageElementLock + + lockCompatibilityTable LockCompatibilityTable +} + +func NewStorageNodeLock() *StorageNodeLock { + + storageLock := StorageNodeLock{ + lockCompatibilityTable: LockCompatibilityTable{}, + } + + compTable := &storageLock.lockCompatibilityTable + + compTable. + Column(STORAGE_ELEMENT_READ_LOCK, func() bool { return len(storageLock.elementReadLocks) > 0 }). + Column(STORAGE_ELEMENT_WRITE_LOCK, func() bool { return len(storageLock.elementWriteLocks) > 0 }). + Column(STORAGE_ELEMENT_CREATE_LOCK, func() bool { return len(storageLock.elementCreateLocks) > 0 }). + Column(STORAGE_SET_READ_LOCK, func() bool { return len(storageLock.setReadReqIDs) > 0 }). + Column(STORAGE_SET_WRITE_LOCK, func() bool { return len(storageLock.setWriteReqIDs) > 0 }). + Column(STORAGE_SET_CREATE_LOCK, func() bool { return len(storageLock.setCreateReqIDs) > 0 }) + + comp := LockCompatible() + uncp := LockUncompatible() + trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool { + strTar := lock.Target.(StringLockTarget) + if testLockName == STORAGE_ELEMENT_READ_LOCK { + // 如果没有任何锁的锁对象与当前的锁对象冲突,那么这个锁可以加 + return lo.NoneBy(storageLock.elementReadLocks, func(other *storageElementLock) bool { return strTar.IsConflict(&other.target) }) + } + + if testLockName == STORAGE_ELEMENT_WRITE_LOCK { + return lo.NoneBy(storageLock.elementWriteLocks, func(other *storageElementLock) bool { return strTar.IsConflict(&other.target) }) + } + + return lo.NoneBy(storageLock.elementCreateLocks, func(other *storageElementLock) bool { return strTar.IsConflict(&other.target) }) + }) + + compTable.MustRow(comp, trgt, comp, comp, uncp, comp) + compTable.MustRow(trgt, trgt, comp, uncp, uncp, comp) + compTable.MustRow(comp, comp, trgt, uncp, uncp, uncp) + compTable.MustRow(comp, uncp, uncp, comp, uncp, uncp) + compTable.MustRow(uncp, uncp, uncp, uncp, uncp, uncp) + compTable.MustRow(comp, comp, uncp, uncp, uncp, uncp) + + return &storageLock +} + +// CanLock 判断这个锁能否锁定成功 +func (l *StorageNodeLock) CanLock(lock distlock.Lock) error { + return l.lockCompatibilityTable.Test(lock) +} + +// 锁定 +func (l *StorageNodeLock) Lock(reqID string, lock distlock.Lock) error { + switch lock.Name { + case STORAGE_SET_READ_LOCK: + l.setReadReqIDs = append(l.setReadReqIDs, reqID) + case STORAGE_SET_WRITE_LOCK: + l.setWriteReqIDs = append(l.setWriteReqIDs, reqID) + case STORAGE_SET_CREATE_LOCK: + l.setCreateReqIDs = append(l.setCreateReqIDs, reqID) + + case STORAGE_ELEMENT_READ_LOCK: + l.elementReadLocks = l.addElementLock(lock, l.elementReadLocks, reqID) + case STORAGE_ELEMENT_WRITE_LOCK: + l.elementWriteLocks = l.addElementLock(lock, l.elementWriteLocks, reqID) + + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} + +func (l *StorageNodeLock) addElementLock(lock distlock.Lock, locks []*storageElementLock, reqID string) []*storageElementLock { + strTarget := lock.Target.(StringLockTarget) + lck, ok := lo.Find(locks, func(l *storageElementLock) bool { return strTarget.IsConflict(&l.target) }) + if !ok { + lck = &storageElementLock{ + target: strTarget, + } + locks = append(locks, lck) + } + + lck.requestIDs = append(lck.requestIDs, reqID) + return locks +} + +// 解锁 +func (l *StorageNodeLock) Unlock(reqID string, lock distlock.Lock) error { + switch lock.Name { + case STORAGE_SET_READ_LOCK: + l.setReadReqIDs = mylo.Remove(l.setReadReqIDs, reqID) + case STORAGE_SET_WRITE_LOCK: + l.setWriteReqIDs = mylo.Remove(l.setWriteReqIDs, reqID) + case STORAGE_SET_CREATE_LOCK: + l.setCreateReqIDs = mylo.Remove(l.setCreateReqIDs, reqID) + + case STORAGE_ELEMENT_READ_LOCK: + l.elementReadLocks = l.removeElementLock(lock, l.elementReadLocks, reqID) + case STORAGE_ELEMENT_WRITE_LOCK: + l.elementWriteLocks = l.removeElementLock(lock, l.elementWriteLocks, reqID) + + default: + return fmt.Errorf("unknow lock name: %s", lock.Name) + } + + return nil +} + +func (l *StorageNodeLock) removeElementLock(lock distlock.Lock, locks []*storageElementLock, reqID string) []*storageElementLock { + strTarget := lock.Target.(StringLockTarget) + lck, index, ok := lo.FindIndexOf(locks, func(l *storageElementLock) bool { return strTarget.IsConflict(&l.target) }) + if !ok { + return locks + } + + lck.requestIDs = mylo.Remove(lck.requestIDs, reqID) + + if len(lck.requestIDs) == 0 { + locks = mylo.RemoveAt(locks, index) + } + + return locks +} diff --git a/pkgs/distlock/lockprovider/string_lock_target.go b/pkgs/distlock/lockprovider/string_lock_target.go new file mode 100644 index 0000000..729dc85 --- /dev/null +++ b/pkgs/distlock/lockprovider/string_lock_target.go @@ -0,0 +1,78 @@ +package lockprovider + +import ( + "fmt" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type StringLockTarget struct { + Components []StringLockTargetComponet `json:"components"` +} + +func NewStringLockTarget() *StringLockTarget { + return &StringLockTarget{} +} + +// Add 添加一个Component,并将其内容设置为compValues +func (t *StringLockTarget) Add(compValues ...any) *StringLockTarget { + t.Components = append(t.Components, StringLockTargetComponet{ + Values: lo.Map(compValues, func(val any, index int) string { return fmt.Sprintf("%v", val) }), + }) + + return t +} + +// IsConflict 判断两个锁对象是否冲突。注:只有相同的结构的Target才有意义 +func (t *StringLockTarget) IsConflict(other *StringLockTarget) bool { + if len(t.Components) != len(other.Components) { + return false + } + + if len(t.Components) == 0 { + return true + } + + for i := 0; i < len(t.Components); i++ { + if t.Components[i].IsEquals(&other.Components[i]) { + return true + } + } + + return false +} + +type StringLockTargetComponet struct { + Values []string `json:"values"` +} + +// IsEquals 判断两个Component是否相同。注:只有相同的结构的Component才有意义 +func (t *StringLockTargetComponet) IsEquals(other *StringLockTargetComponet) bool { + if len(t.Values) != len(other.Values) { + return false + } + + for i := 0; i < len(t.Values); i++ { + if t.Values[i] != other.Values[i] { + return false + } + } + + return true +} + +func StringLockTargetToString(target *StringLockTarget) (string, error) { + data, err := serder.ObjectToJSON(target) + if err != nil { + return "", err + } + + return string(data), nil +} + +func StringLockTargetFromString(str string) (StringLockTarget, error) { + var ret StringLockTarget + err := serder.JSONToObject([]byte(str), &ret) + return ret, err +} diff --git a/pkgs/distlock/lockprovider/string_lock_target_test.go b/pkgs/distlock/lockprovider/string_lock_target_test.go new file mode 100644 index 0000000..9cc5a73 --- /dev/null +++ b/pkgs/distlock/lockprovider/string_lock_target_test.go @@ -0,0 +1,60 @@ +package lockprovider + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_StringLockTarget(t *testing.T) { + cases := []struct { + title string + target1 *StringLockTarget + target2 *StringLockTarget + wantIsConflict bool + }{ + { + title: "没有任何段算冲突", + target1: NewStringLockTarget(), + target2: NewStringLockTarget(), + wantIsConflict: true, + }, + { + title: "有段,但段内为空,算冲突", + target1: NewStringLockTarget().Add(), + target2: NewStringLockTarget().Add(), + wantIsConflict: true, + }, + { + title: "每一段不同才不冲突", + target1: NewStringLockTarget().Add("a").Add("b"), + target2: NewStringLockTarget().Add("b").Add("c"), + wantIsConflict: false, + }, + { + title: "只要有一段相同就冲突", + target1: NewStringLockTarget().Add("a").Add("b"), + target2: NewStringLockTarget().Add("a").Add("c"), + wantIsConflict: true, + }, + { + title: "同段内,只要有一个数据不同就不冲突", + target1: NewStringLockTarget().Add("a", "b"), + target2: NewStringLockTarget().Add("b", "b"), + wantIsConflict: false, + }, + { + title: "同段内,只要每个数据都相同才不冲突", + target1: NewStringLockTarget().Add("a", "b"), + target2: NewStringLockTarget().Add("a", "b"), + wantIsConflict: true, + }, + } + + for _, ca := range cases { + Convey(ca.title, t, func() { + ret := ca.target1.IsConflict(ca.target2) + So(ret, ShouldEqual, ca.wantIsConflict) + }) + } +} diff --git a/pkgs/distlock/reqbuilder/ipfs.go b/pkgs/distlock/reqbuilder/ipfs.go new file mode 100644 index 0000000..91907d1 --- /dev/null +++ b/pkgs/distlock/reqbuilder/ipfs.go @@ -0,0 +1,64 @@ +package reqbuilder + +import ( + "strconv" + + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type IPFSLockReqBuilder struct { + *LockRequestBuilder +} + +func (b *LockRequestBuilder) IPFS() *IPFSLockReqBuilder { + return &IPFSLockReqBuilder{LockRequestBuilder: b} +} +func (b *IPFSLockReqBuilder) ReadOneRep(nodeID int64, fileHash string) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(fileHash), + }) + return b +} + +func (b *IPFSLockReqBuilder) WriteOneRep(nodeID int64, fileHash string) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(fileHash), + }) + return b +} + +func (b *IPFSLockReqBuilder) ReadAnyRep(nodeID int64) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *IPFSLockReqBuilder) WriteAnyRep(nodeID int64) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *IPFSLockReqBuilder) CreateAnyRep(nodeID int64) *IPFSLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(nodeID), + Name: lockprovider.IPFS_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *IPFSLockReqBuilder) makePath(nodeID int64) []string { + return []string{lockprovider.IPFSLockPathPrefix, strconv.FormatInt(nodeID, 10)} +} diff --git a/pkgs/distlock/reqbuilder/lock_request_builder.go b/pkgs/distlock/reqbuilder/lock_request_builder.go new file mode 100644 index 0000000..a6af4ed --- /dev/null +++ b/pkgs/distlock/reqbuilder/lock_request_builder.go @@ -0,0 +1,31 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/distlock/service" + mylo "gitlink.org.cn/cloudream/common/utils/lo" +) + +type LockRequestBuilder struct { + locks []distlock.Lock +} + +func NewBuilder() *LockRequestBuilder { + return &LockRequestBuilder{} +} + +func (b *LockRequestBuilder) Build() distlock.LockRequest { + return distlock.LockRequest{ + Locks: mylo.ArrayClone(b.locks), + } +} + +func (b *LockRequestBuilder) MutexLock(svc *service.Service) (*service.Mutex, error) { + mutex := service.NewMutex(svc, b.Build()) + err := mutex.Lock() + if err != nil { + return nil, err + } + + return mutex, nil +} diff --git a/pkgs/distlock/reqbuilder/metadata.go b/pkgs/distlock/reqbuilder/metadata.go new file mode 100644 index 0000000..ae9243b --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata.go @@ -0,0 +1,17 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataLockReqBuilder struct { + *LockRequestBuilder +} + +func (b *LockRequestBuilder) Metadata() *MetadataLockReqBuilder { + return &MetadataLockReqBuilder{LockRequestBuilder: b} +} + +func (b *MetadataLockReqBuilder) makePath(tableName string) []string { + return []string{lockprovider.MetadataLockPathPrefix, tableName} +} diff --git a/pkgs/distlock/reqbuilder/metadata_bucket.go b/pkgs/distlock/reqbuilder/metadata_bucket.go new file mode 100644 index 0000000..fd81248 --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_bucket.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataBucketLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) Bucket() *MetadataBucketLockReqBuilder { + return &MetadataBucketLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataBucketLockReqBuilder) ReadOne(bucketID int64) *MetadataBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Bucket"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(bucketID), + }) + return b +} +func (b *MetadataBucketLockReqBuilder) WriteOne(bucketID int64) *MetadataBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Bucket"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(bucketID), + }) + return b +} +func (b *MetadataBucketLockReqBuilder) CreateOne(userID int64, bucketName string) *MetadataBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Bucket"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, bucketName), + }) + return b +} +func (b *MetadataBucketLockReqBuilder) ReadAny() *MetadataBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Bucket"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataBucketLockReqBuilder) WriteAny() *MetadataBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Bucket"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataBucketLockReqBuilder) CreateAny() *MetadataBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Bucket"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_cache.go b/pkgs/distlock/reqbuilder/metadata_cache.go new file mode 100644 index 0000000..4347ff7 --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_cache.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataCacheLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) Cache() *MetadataCacheLockReqBuilder { + return &MetadataCacheLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataCacheLockReqBuilder) ReadOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Cache"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(nodeID, fileHash), + }) + return b +} +func (b *MetadataCacheLockReqBuilder) WriteOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Cache"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(nodeID, fileHash), + }) + return b +} +func (b *MetadataCacheLockReqBuilder) CreateOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Cache"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(nodeID, fileHash), + }) + return b +} +func (b *MetadataCacheLockReqBuilder) ReadAny() *MetadataCacheLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Cache"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataCacheLockReqBuilder) WriteAny() *MetadataCacheLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Cache"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataCacheLockReqBuilder) CreateAny() *MetadataCacheLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Cache"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_node.go b/pkgs/distlock/reqbuilder/metadata_node.go new file mode 100644 index 0000000..7893b89 --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_node.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataNodeLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) Node() *MetadataNodeLockReqBuilder { + return &MetadataNodeLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataNodeLockReqBuilder) ReadOne(nodeID int64) *MetadataNodeLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(nodeID), + }) + return b +} +func (b *MetadataNodeLockReqBuilder) WriteOne(nodeID int64) *MetadataNodeLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(nodeID), + }) + return b +} +func (b *MetadataNodeLockReqBuilder) CreateOne() *MetadataNodeLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataNodeLockReqBuilder) ReadAny() *MetadataNodeLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataNodeLockReqBuilder) WriteAny() *MetadataNodeLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataNodeLockReqBuilder) CreateAny() *MetadataNodeLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Node"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_object.go b/pkgs/distlock/reqbuilder/metadata_object.go new file mode 100644 index 0000000..be18a9a --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_object.go @@ -0,0 +1,65 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +// TODO 可以考虑增加基于PackageID的锁,让访问不同Package的Object的操作能并行 + +type MetadataObjectLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) Object() *MetadataObjectLockReqBuilder { + return &MetadataObjectLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataObjectLockReqBuilder) ReadOne(objectID int64) *MetadataObjectLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Object"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(objectID), + }) + return b +} +func (b *MetadataObjectLockReqBuilder) WriteOne(objectID int64) *MetadataObjectLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Object"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(objectID), + }) + return b +} +func (b *MetadataObjectLockReqBuilder) CreateOne(bucketID int64, objectName string) *MetadataObjectLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Object"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(bucketID, objectName), + }) + return b +} +func (b *MetadataObjectLockReqBuilder) ReadAny() *MetadataObjectLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Object"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectLockReqBuilder) WriteAny() *MetadataObjectLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Object"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectLockReqBuilder) CreateAny() *MetadataObjectLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Object"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_object_block.go b/pkgs/distlock/reqbuilder/metadata_object_block.go new file mode 100644 index 0000000..550bf55 --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_object_block.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataObjectBlockLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) ObjectBlock() *MetadataObjectBlockLockReqBuilder { + return &MetadataObjectBlockLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataObjectBlockLockReqBuilder) ReadOne(objectID int) *MetadataObjectBlockLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectBlock"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(objectID), + }) + return b +} +func (b *MetadataObjectBlockLockReqBuilder) WriteOne(objectID int) *MetadataObjectBlockLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectBlock"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(objectID), + }) + return b +} +func (b *MetadataObjectBlockLockReqBuilder) CreateOne() *MetadataObjectBlockLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectBlock"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectBlockLockReqBuilder) ReadAny() *MetadataObjectBlockLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectBlock"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectBlockLockReqBuilder) WriteAny() *MetadataObjectBlockLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectBlock"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectBlockLockReqBuilder) CreateAny() *MetadataObjectBlockLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectBlock"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_object_rep.go b/pkgs/distlock/reqbuilder/metadata_object_rep.go new file mode 100644 index 0000000..6b54a15 --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_object_rep.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataObjectRepLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) ObjectRep() *MetadataObjectRepLockReqBuilder { + return &MetadataObjectRepLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataObjectRepLockReqBuilder) ReadOne(objectID int64) *MetadataObjectRepLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectRep"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(objectID), + }) + return b +} +func (b *MetadataObjectRepLockReqBuilder) WriteOne(objectID int64) *MetadataObjectRepLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectRep"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(objectID), + }) + return b +} +func (b *MetadataObjectRepLockReqBuilder) CreateOne() *MetadataObjectRepLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectRep"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectRepLockReqBuilder) ReadAny() *MetadataObjectRepLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectRep"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectRepLockReqBuilder) WriteAny() *MetadataObjectRepLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectRep"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataObjectRepLockReqBuilder) CreateAny() *MetadataObjectRepLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("ObjectRep"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_package.go b/pkgs/distlock/reqbuilder/metadata_package.go new file mode 100644 index 0000000..92f05af --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_package.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataPackageLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) Package() *MetadataPackageLockReqBuilder { + return &MetadataPackageLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataPackageLockReqBuilder) ReadOne(packageID int64) *MetadataPackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Package"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(packageID), + }) + return b +} +func (b *MetadataPackageLockReqBuilder) WriteOne(packageID int64) *MetadataPackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Package"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(packageID), + }) + return b +} +func (b *MetadataPackageLockReqBuilder) CreateOne(bucketID int64, packageName string) *MetadataPackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Package"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(bucketID, packageName), + }) + return b +} +func (b *MetadataPackageLockReqBuilder) ReadAny() *MetadataPackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Package"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataPackageLockReqBuilder) WriteAny() *MetadataPackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Package"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataPackageLockReqBuilder) CreateAny() *MetadataPackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("Package"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_storage_package.go b/pkgs/distlock/reqbuilder/metadata_storage_package.go new file mode 100644 index 0000000..1ad0472 --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_storage_package.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataStoragePackageLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) StoragePackage() *MetadataStoragePackageLockReqBuilder { + return &MetadataStoragePackageLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataStoragePackageLockReqBuilder) ReadOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("StoragePackage"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(storageID, userID, packageID), + }) + return b +} +func (b *MetadataStoragePackageLockReqBuilder) WriteOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("StoragePackage"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(storageID, userID, packageID), + }) + return b +} +func (b *MetadataStoragePackageLockReqBuilder) CreateOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("StoragePackage"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(storageID, userID, packageID), + }) + return b +} +func (b *MetadataStoragePackageLockReqBuilder) ReadAny() *MetadataStoragePackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("StoragePackage"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataStoragePackageLockReqBuilder) WriteAny() *MetadataStoragePackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("StoragePackage"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataStoragePackageLockReqBuilder) CreateAny() *MetadataStoragePackageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("StoragePackage"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_user_bucket.go b/pkgs/distlock/reqbuilder/metadata_user_bucket.go new file mode 100644 index 0000000..6abd6ab --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_user_bucket.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataUserBucketLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) UserBucket() *MetadataUserBucketLockReqBuilder { + return &MetadataUserBucketLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataUserBucketLockReqBuilder) ReadOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserBucket"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, bucketID), + }) + return b +} +func (b *MetadataUserBucketLockReqBuilder) WriteOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserBucket"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, bucketID), + }) + return b +} +func (b *MetadataUserBucketLockReqBuilder) CreateOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserBucket"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, bucketID), + }) + return b +} +func (b *MetadataUserBucketLockReqBuilder) ReadAny() *MetadataUserBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserBucket"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataUserBucketLockReqBuilder) WriteAny() *MetadataUserBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserBucket"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataUserBucketLockReqBuilder) CreateAny() *MetadataUserBucketLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserBucket"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/metadata_user_storage.go b/pkgs/distlock/reqbuilder/metadata_user_storage.go new file mode 100644 index 0000000..afc28d8 --- /dev/null +++ b/pkgs/distlock/reqbuilder/metadata_user_storage.go @@ -0,0 +1,63 @@ +package reqbuilder + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type MetadataUserStorageLockReqBuilder struct { + *MetadataLockReqBuilder +} + +func (b *MetadataLockReqBuilder) UserStorage() *MetadataUserStorageLockReqBuilder { + return &MetadataUserStorageLockReqBuilder{MetadataLockReqBuilder: b} +} + +func (b *MetadataUserStorageLockReqBuilder) ReadOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserStorage"), + Name: lockprovider.METADATA_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, storageID), + }) + return b +} +func (b *MetadataUserStorageLockReqBuilder) WriteOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserStorage"), + Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, storageID), + }) + return b +} +func (b *MetadataUserStorageLockReqBuilder) CreateOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserStorage"), + Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, storageID), + }) + return b +} +func (b *MetadataUserStorageLockReqBuilder) ReadAny() *MetadataUserStorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserStorage"), + Name: lockprovider.METADATA_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataUserStorageLockReqBuilder) WriteAny() *MetadataUserStorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserStorage"), + Name: lockprovider.METADATA_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} +func (b *MetadataUserStorageLockReqBuilder) CreateAny() *MetadataUserStorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath("UserStorage"), + Name: lockprovider.METADATA_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} diff --git a/pkgs/distlock/reqbuilder/storage.go b/pkgs/distlock/reqbuilder/storage.go new file mode 100644 index 0000000..f074abe --- /dev/null +++ b/pkgs/distlock/reqbuilder/storage.go @@ -0,0 +1,74 @@ +package reqbuilder + +import ( + "strconv" + + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type StorageLockReqBuilder struct { + *LockRequestBuilder +} + +func (b *LockRequestBuilder) Storage() *StorageLockReqBuilder { + return &StorageLockReqBuilder{LockRequestBuilder: b} +} + +func (b *StorageLockReqBuilder) ReadOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_ELEMENT_READ_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, packageID), + }) + return b +} + +func (b *StorageLockReqBuilder) WriteOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, packageID), + }) + return b +} + +func (b *StorageLockReqBuilder) CreateOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget().Add(userID, packageID), + }) + return b +} + +func (b *StorageLockReqBuilder) ReadAnyPackage(storageID int64) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_SET_READ_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *StorageLockReqBuilder) WriteAnyPackage(storageID int64) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_SET_WRITE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *StorageLockReqBuilder) CreateAnyPackage(storageID int64) *StorageLockReqBuilder { + b.locks = append(b.locks, distlock.Lock{ + Path: b.makePath(storageID), + Name: lockprovider.STORAGE_SET_CREATE_LOCK, + Target: *lockprovider.NewStringLockTarget(), + }) + return b +} + +func (b *StorageLockReqBuilder) makePath(storageID int64) []string { + return []string{lockprovider.StorageLockPathPrefix, strconv.FormatInt(storageID, 10)} +} diff --git a/pkgs/distlock/service.go b/pkgs/distlock/service.go new file mode 100644 index 0000000..119ce78 --- /dev/null +++ b/pkgs/distlock/service.go @@ -0,0 +1,62 @@ +package distlock + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/distlock/service" + "gitlink.org.cn/cloudream/common/pkgs/trie" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider" +) + +type Service = service.Service + +func NewService(cfg *distlock.Config) (*service.Service, error) { + srv, err := service.NewService(cfg, initProviders()) + if err != nil { + return nil, err + } + + return srv, nil +} + +func initProviders() []service.PathProvider { + var provs []service.PathProvider + + provs = append(provs, initMetadataLockProviders()...) + + provs = append(provs, initIPFSLockProviders()...) + + provs = append(provs, initStorageLockProviders()...) + + return provs +} + +func initMetadataLockProviders() []service.PathProvider { + return []service.PathProvider{ + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Node"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Storage"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "User"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserBucket"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserNode"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserStorage"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Bucket"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Object"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Package"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "ObjectRep"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "ObjectBlock"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Cache"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "StoragePackage"), + service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Location"), + } +} + +func initIPFSLockProviders() []service.PathProvider { + return []service.PathProvider{ + service.NewPathProvider(lockprovider.NewIPFSLock(), lockprovider.IPFSLockPathPrefix, trie.WORD_ANY), + } +} + +func initStorageLockProviders() []service.PathProvider { + return []service.PathProvider{ + service.NewPathProvider(lockprovider.NewStorageLock(), lockprovider.StorageLockPathPrefix, trie.WORD_ANY), + } +} diff --git a/pkgs/grpc/agent/pool.go b/pkgs/grpc/agent/pool.go index 342fb93..9e6b2da 100644 --- a/pkgs/grpc/agent/pool.go +++ b/pkgs/grpc/agent/pool.go @@ -39,5 +39,5 @@ func (p *Pool) Acquire(ip string) (*PoolClient, error) { } func (p *Pool) Release(cli *PoolClient) { - cli.Close() + cli.Client.Close() } diff --git a/pkgs/iterator/ec_object_iterator.go b/pkgs/iterator/ec_object_iterator.go index 3b489e4..85e2641 100644 --- a/pkgs/iterator/ec_object_iterator.go +++ b/pkgs/iterator/ec_object_iterator.go @@ -16,6 +16,8 @@ import ( ) type ECObjectIterator struct { + OnClosing func() + objects []model.Object objectECData []models.ObjectECData currentIndex int @@ -41,6 +43,7 @@ func NewECObjectIterator(objects []model.Object, objectECData []models.ObjectECD } func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) { + // TODO 加锁 coorCli, err := globals.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -123,7 +126,9 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadi } func (i *ECObjectIterator) Close() { - + if i.OnClosing != nil { + i.OnClosing() + } } // chooseDownloadNode 选择一个下载节点 diff --git a/pkgs/iterator/local_uploading_iterator.go b/pkgs/iterator/local_uploading_iterator.go index 0e4ca33..cf2daea 100644 --- a/pkgs/iterator/local_uploading_iterator.go +++ b/pkgs/iterator/local_uploading_iterator.go @@ -52,7 +52,7 @@ func (i *LocalUploadingIterator) doMove() (*IterUploadingObject, error) { } return &IterUploadingObject{ - Path: strings.TrimPrefix(filepath.ToSlash(path), i.pathRoot), + Path: strings.TrimPrefix(filepath.ToSlash(path), i.pathRoot+"/"), Size: info.Size(), File: file, }, nil diff --git a/pkgs/iterator/rep_object_iterator.go b/pkgs/iterator/rep_object_iterator.go index c316ee0..ea5e18b 100644 --- a/pkgs/iterator/rep_object_iterator.go +++ b/pkgs/iterator/rep_object_iterator.go @@ -6,19 +6,21 @@ import ( "math/rand" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" "gitlink.org.cn/cloudream/common/pkgs/logger" myio "gitlink.org.cn/cloudream/common/utils/io" "gitlink.org.cn/cloudream/storage-common/globals" "gitlink.org.cn/cloudream/storage-common/models" "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder" coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" ) type DownloadingObjectIterator = Iterator[*IterDownloadingObject] type RepObjectIterator struct { + OnClosing func() + objects []model.Object objectRepData []models.ObjectRepData currentIndex int @@ -51,6 +53,7 @@ func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectR } func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) { + // TODO 加锁 coorCli, err := globals.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -116,7 +119,9 @@ func (i *RepObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloading } func (i *RepObjectIterator) Close() { - + if i.OnClosing != nil { + i.OnClosing() + } } // chooseDownloadNode 选择一个下载节点 @@ -135,7 +140,7 @@ func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash st if globals.IPFSPool != nil { logger.Infof("try to use local IPFS to download file") - reader, err := downloadFromLocalIPFS(fileHash) + reader, err := downloadFromLocalIPFS(ctx, fileHash) if err == nil { return reader, nil } @@ -173,7 +178,22 @@ func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHas return reader, nil } -func downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) { +func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) { + onClosed := func() {} + if globals.Local.NodeID != nil { + // 二次获取锁 + mutex, err := reqbuilder.NewBuilder(). + // 用于从IPFS下载文件 + IPFS().ReadOneRep(*globals.Local.NodeID, fileHash). + MutexLock(ctx.Distlock) + if err != nil { + return nil, fmt.Errorf("acquire locks failed, err: %w", err) + } + onClosed = func() { + mutex.Unlock() + } + } + ipfsCli, err := globals.IPFSPool.Acquire() if err != nil { return nil, fmt.Errorf("new ipfs client: %w", err) @@ -184,5 +204,8 @@ func downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) { return nil, fmt.Errorf("read ipfs file failed, err: %w", err) } + reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { + onClosed() + }) return reader, nil } diff --git a/pkgs/mq/agent/storage.go b/pkgs/mq/agent/storage.go index dea4a3b..2be6055 100644 --- a/pkgs/mq/agent/storage.go +++ b/pkgs/mq/agent/storage.go @@ -7,9 +7,9 @@ import ( ) type StorageService interface { - StartStorageMovePackage(msg *StartStorageMovePackage) (*StartStorageMovePackageResp, *mq.CodeMessage) + StartStorageLoadPackage(msg *StartStorageLoadPackage) (*StartStorageLoadPackageResp, *mq.CodeMessage) - WaitStorageMovePackage(msg *WaitStorageMovePackage) (*WaitStorageMovePackageResp, *mq.CodeMessage) + WaitStorageLoadPackage(msg *WaitStorageLoadPackage) (*WaitStorageLoadPackageResp, *mq.CodeMessage) StorageCheck(msg *StorageCheck) (*StorageCheckResp, *mq.CodeMessage) @@ -19,59 +19,59 @@ type StorageService interface { } // 启动调度Package的任务 -var _ = Register(StorageService.StartStorageMovePackage) +var _ = Register(StorageService.StartStorageLoadPackage) -type StartStorageMovePackage struct { +type StartStorageLoadPackage struct { UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` StorageID int64 `json:"storageID"` } -type StartStorageMovePackageResp struct { +type StartStorageLoadPackageResp struct { TaskID string `json:"taskID"` } -func NewStartStorageMovePackage(userID int64, packageID int64, storageID int64) StartStorageMovePackage { - return StartStorageMovePackage{ +func NewStartStorageLoadPackage(userID int64, packageID int64, storageID int64) StartStorageLoadPackage { + return StartStorageLoadPackage{ UserID: userID, PackageID: packageID, StorageID: storageID, } } -func NewStartStorageMovePackageResp(taskID string) StartStorageMovePackageResp { - return StartStorageMovePackageResp{ +func NewStartStorageLoadPackageResp(taskID string) StartStorageLoadPackageResp { + return StartStorageLoadPackageResp{ TaskID: taskID, } } -func (client *Client) StartStorageMovePackage(msg StartStorageMovePackage, opts ...mq.RequestOption) (*StartStorageMovePackageResp, error) { - return mq.Request[StartStorageMovePackageResp](client.rabbitCli, msg, opts...) +func (client *Client) StartStorageLoadPackage(msg StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) { + return mq.Request[StartStorageLoadPackageResp](client.rabbitCli, msg, opts...) } // 等待调度Package的任务 -var _ = Register(StorageService.WaitStorageMovePackage) +var _ = Register(StorageService.WaitStorageLoadPackage) -type WaitStorageMovePackage struct { +type WaitStorageLoadPackage struct { TaskID string `json:"taskID"` WaitTimeoutMs int64 `json:"waitTimeout"` } -type WaitStorageMovePackageResp struct { +type WaitStorageLoadPackageResp struct { IsComplete bool `json:"isComplete"` Error string `json:"error"` } -func NewWaitStorageMovePackage(taskID string, waitTimeoutMs int64) WaitStorageMovePackage { - return WaitStorageMovePackage{ +func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) WaitStorageLoadPackage { + return WaitStorageLoadPackage{ TaskID: taskID, WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitStorageMovePackageResp(isComplete bool, err string) WaitStorageMovePackageResp { - return WaitStorageMovePackageResp{ +func NewWaitStorageLoadPackageResp(isComplete bool, err string) WaitStorageLoadPackageResp { + return WaitStorageLoadPackageResp{ IsComplete: isComplete, Error: err, } } -func (client *Client) WaitStorageMovePackage(msg WaitStorageMovePackage, opts ...mq.RequestOption) (*WaitStorageMovePackageResp, error) { - return mq.Request[WaitStorageMovePackageResp](client.rabbitCli, msg, opts...) +func (client *Client) WaitStorageLoadPackage(msg WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) { + return mq.Request[WaitStorageLoadPackageResp](client.rabbitCli, msg, opts...) } // 检查Storage diff --git a/pkgs/mq/coordinator/storage.go b/pkgs/mq/coordinator/storage.go index 140b888..6debd54 100644 --- a/pkgs/mq/coordinator/storage.go +++ b/pkgs/mq/coordinator/storage.go @@ -8,7 +8,7 @@ import ( type StorageService interface { GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, *mq.CodeMessage) - PackageMovedToStorage(msg *PackageMovedToStorage) (*PackageMovedToStorageResp, *mq.CodeMessage) + StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage) } // 获取Storage信息 @@ -44,25 +44,25 @@ func (client *Client) GetStorageInfo(msg GetStorageInfo) (*GetStorageInfoResp, e } // 提交调度记录 -var _ = Register(StorageService.PackageMovedToStorage) +var _ = Register(StorageService.StoragePackageLoaded) -type PackageMovedToStorage struct { +type StoragePackageLoaded struct { UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` StorageID int64 `json:"storageID"` } -type PackageMovedToStorageResp struct{} +type StoragePackageLoadedResp struct{} -func NewPackageMovedToStorage(userID int64, packageID int64, stgID int64) PackageMovedToStorage { - return PackageMovedToStorage{ +func NewStoragePackageLoaded(userID int64, packageID int64, stgID int64) StoragePackageLoaded { + return StoragePackageLoaded{ UserID: userID, PackageID: packageID, StorageID: stgID, } } -func NewPackageMovedToStorageResp() PackageMovedToStorageResp { - return PackageMovedToStorageResp{} +func NewStoragePackageLoadedResp() StoragePackageLoadedResp { + return StoragePackageLoadedResp{} } -func (client *Client) PackageMovedToStorage(msg PackageMovedToStorage) (*PackageMovedToStorageResp, error) { - return mq.Request[PackageMovedToStorageResp](client.rabbitCli, msg) +func (client *Client) StoragePackageLoaded(msg StoragePackageLoaded) (*StoragePackageLoadedResp, error) { + return mq.Request[StoragePackageLoadedResp](client.rabbitCli, msg) } diff --git a/utils/utils.go b/utils/utils.go index 7996b78..3639aa9 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -2,20 +2,9 @@ package utils import ( "fmt" - "strings" ) -// MakeStorageMovePackageDirName Move操作时,写入的文件的名称 -func MakeStorageMovePackageDirName(objectID int64, userID int64) string { - return fmt.Sprintf("%d-%d", objectID, userID) -} - -// GetDirectoryName 根据objectName获取所属的文件夹名 -func GetDirectoryName(objectName string) string { - parts := strings.Split(objectName, "/") - //若为文件,dirName设置为空 - if len(parts) == 1 { - return "" - } - return parts[0] +// MakeStorageLoadPackageDirName Load操作时,写入的文件夹的名称 +func MakeStorageLoadPackageDirName(packageID int64, userID int64) string { + return fmt.Sprintf("%d-%d", packageID, userID) }