
Split up and fix the lock service code; change the term for scheduling a Package to "Load"

Sydonian · 2 years ago
parent commit de513799a1
46 changed files with 2205 additions and 276 deletions
1. assets/confs/agent.config.json (+9 -4)
2. assets/confs/client.config.json (+7 -4)
3. assets/scripts/create_database.sql (+3 -3)
4. pkgs/cmd/create_ec_package.go (+39 -49)
5. pkgs/cmd/create_rep_package.go (+27 -32)
6. pkgs/cmd/download_package.go (+8 -1)
7. pkgs/cmd/update_ec_package.go (+36 -57)
8. pkgs/cmd/update_rep_package.go (+25 -32)
9. pkgs/db/bucket.go (+3 -3)
10. pkgs/db/cache.go (+3 -3)
11. pkgs/db/object.go (+5 -0)
12. pkgs/db/object_block.go (+14 -9)
13. pkgs/db/object_rep.go (+13 -8)
14. pkgs/db/package.go (+16 -13)
15. pkgs/db/storage.go (+4 -4)
16. pkgs/db/storage_package.go (+3 -3)
17. pkgs/distlock/lockprovider/ipfs_lock.go (+217 -0)
18. pkgs/distlock/lockprovider/ipfs_lock_test.go (+113 -0)
19. pkgs/distlock/lockprovider/lock_compatibility_table.go (+123 -0)
20. pkgs/distlock/lockprovider/lock_compatibility_table_test.go (+41 -0)
21. pkgs/distlock/lockprovider/metadata_lock.go (+184 -0)
22. pkgs/distlock/lockprovider/storage_lock.go (+226 -0)
23. pkgs/distlock/lockprovider/string_lock_target.go (+78 -0)
24. pkgs/distlock/lockprovider/string_lock_target_test.go (+60 -0)
25. pkgs/distlock/reqbuilder/ipfs.go (+64 -0)
26. pkgs/distlock/reqbuilder/lock_request_builder.go (+31 -0)
27. pkgs/distlock/reqbuilder/metadata.go (+17 -0)
28. pkgs/distlock/reqbuilder/metadata_bucket.go (+63 -0)
29. pkgs/distlock/reqbuilder/metadata_cache.go (+63 -0)
30. pkgs/distlock/reqbuilder/metadata_node.go (+63 -0)
31. pkgs/distlock/reqbuilder/metadata_object.go (+65 -0)
32. pkgs/distlock/reqbuilder/metadata_object_block.go (+63 -0)
33. pkgs/distlock/reqbuilder/metadata_object_rep.go (+63 -0)
34. pkgs/distlock/reqbuilder/metadata_package.go (+63 -0)
35. pkgs/distlock/reqbuilder/metadata_storage_package.go (+63 -0)
36. pkgs/distlock/reqbuilder/metadata_user_bucket.go (+63 -0)
37. pkgs/distlock/reqbuilder/metadata_user_storage.go (+63 -0)
38. pkgs/distlock/reqbuilder/storage.go (+74 -0)
39. pkgs/distlock/service.go (+62 -0)
40. pkgs/grpc/agent/pool.go (+1 -1)
41. pkgs/iterator/ec_object_iterator.go (+6 -1)
42. pkgs/iterator/local_uploading_iterator.go (+1 -1)
43. pkgs/iterator/rep_object_iterator.go (+27 -4)
44. pkgs/mq/agent/storage.go (+20 -20)
45. pkgs/mq/coordinator/storage.go (+10 -10)
46. utils/utils.go (+3 -14)

assets/confs/agent.config.json (+9 -4)

@@ -1,10 +1,15 @@
 {
   "id": 1,
-  "grpcListenAddress": "127.0.0.1:5010",
-  "grpcPort": 5010,
+  "local": {
+    "nodeID": 1,
+    "localIP": "127.0.0.1",
+    "externalIP": "127.0.0.1"
+  },
+  "grpc": {
+    "ip": "127.0.0.1",
+    "port": 5010
+  },
   "ecPacketSize": 10,
-  "localIP": "127.0.0.1",
-  "externalIP": "127.0.0.1",
   "storageBaseDir": ".",
   "tempFileLifetime": 3600,
   "logger": {


assets/confs/client.config.json (+7 -4)

@@ -1,10 +1,13 @@
 {
-  "grpcPort": 5010,
+  "local": {
+    "localIP": "127.0.0.1",
+    "externalIP": "127.0.0.1"
+  },
+  "agentGRPC": {
+    "port": 5010
+  },
   "ecPacketSize": 10,
-  "ipfsPort": 10,
   "maxRepCount": 10,
-  "localIP": "127.0.0.1",
-  "externalIP": "127.0.0.1",
   "logger": {
     "output": "stdout",
     "level": "debug"


assets/scripts/create_database.sql (+3 -3)

@@ -106,7 +106,7 @@ create table Package (
 create table Object (
   ObjectID int not null auto_increment primary key comment '对象ID',
   PackageID int not null comment '包ID',
-  Path varchar(1000) not null comment '对象路径',
+  Path varchar(500) not null comment '对象路径',
   Size bigint not null comment '对象大小(Byte)',
   UNIQUE KEY PackagePath (PackageID, Path)
 ) comment = '对象表';

@@ -118,9 +118,9 @@ create table ObjectRep (

 create table ObjectBlock (
   ObjectID int not null comment '对象ID',
-  Index int not null comment '编码块在条带内的排序',
+  `Index` int not null comment '编码块在条带内的排序',
   FileHash varchar(100) not null comment '编码块哈希值',
-  primary key(ObjectID, Index)
+  primary key(ObjectID, `Index`)
 ) comment = '对象编码块表';

 create table Cache (


pkgs/cmd/create_ec_package.go (+39 -49)

@@ -13,6 +13,7 @@ import (

 	"gitlink.org.cn/cloudream/storage-common/globals"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
+	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/ec"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
 	coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"

@@ -60,36 +61,25 @@ func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackage
 		return nil, fmt.Errorf("new coordinator client: %w", err)
 	}

-	// TODO2
-	/*
-	reqBlder := reqbuilder.NewBuilder()
-	for _, uploadObject := range t.Objects {
-		reqBlder.Metadata().
-			// 用于防止创建了多个同名对象
-			Object().CreateOne(t.bucketID, uploadObject.ObjectName)
-	}
-
-	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
-	if t.uploadConfig.LocalNodeID != nil {
-		reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID)
-	}
-
-	mutex, err := reqBlder.
-		Metadata().
-		// 用于判断用户是否有桶的权限
-		UserBucket().ReadOne(t.userID, t.bucketID).
-		// 用于查询可用的上传节点
-		Node().ReadAny().
-		// 用于设置Rep配置
-		ObjectRep().CreateAny().
-		// 用于创建Cache记录
-		Cache().CreateAny().
-		MutexLock(ctx.DistLock())
-	if err != nil {
-		return fmt.Errorf("acquire locks failed, err: %w", err)
-	}
-	defer mutex.Unlock()
-	*/
+	mutex, err := reqbuilder.NewBuilder().
+		Metadata().
+		// 用于判断用户是否有桶的权限
+		UserBucket().ReadOne(t.userID, t.bucketID).
+		// 用于查询可用的上传节点
+		Node().ReadAny().
+		// 用于创建包信息
+		Package().CreateOne(t.bucketID, t.name).
+		// 用于创建包中的文件的信息
+		Object().CreateAny().
+		// 用于设置EC配置
+		ObjectBlock().CreateAny().
+		// 用于创建Cache记录
+		Cache().CreateAny().
+		MutexLock(ctx.Distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
+	}
+	defer mutex.Unlock()

 	createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
 		models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy)))

@@ -119,17 +109,25 @@ func (t *CreateECPackage) Execute(ctx *UpdateECPackageContext) (*CreateECPackage
 		return nil, fmt.Errorf("getting ec: %w", err)
 	}

-	/*
-	TODO2
-	// 防止上传的副本被清除
-	mutex2, err := reqbuilder.NewBuilder().
-		IPFS().CreateAnyRep(uploadNode.Node.NodeID).
-		MutexLock(ctx.DistLock())
-	if err != nil {
-		return fmt.Errorf("acquire locks failed, err: %w", err)
-	}
-	defer mutex2.Unlock()
-	*/
+	// 给上传节点的IPFS加锁
+	ipfsReqBlder := reqbuilder.NewBuilder()
+	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
+	if globals.Local.NodeID != nil {
+		ipfsReqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID)
+	}
+	for _, node := range uploadNodeInfos {
+		if globals.Local.NodeID != nil && node.Node.NodeID == *globals.Local.NodeID {
+			continue
+		}
+
+		ipfsReqBlder.IPFS().CreateAnyRep(node.Node.NodeID)
+	}
+	// 防止上传的副本被清除
+	ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
+	}
+	defer ipfsMutex.Unlock()

 	rets, err := uploadAndUpdateECPackage(ctx, createPkgResp.PackageID, t.objectIter, uploadNodeInfos, getECResp.Config)
 	if err != nil {

@@ -239,15 +237,7 @@ func ecWrite(ctx *UpdateECPackageContext, file io.ReadCloser, fileSize int64, ec

 	var wg sync.WaitGroup
 	wg.Add(ecN)
-	/*mutex, err := reqbuilder.NewBuilder().
-		// 防止上传的副本被清除
-		IPFS().CreateAnyRep(node.ID).
-		MutexLock(svc.distlock)
-	if err != nil {
-		return fmt.Errorf("acquire locks failed, err: %w", err)
-	}
-	defer mutex.Unlock()
-	*/

 	for idx := 0; idx < ecN; idx++ {
 		i := idx
 		reader := channelBytesReader{


pkgs/cmd/create_rep_package.go (+27 -32)

@@ -8,9 +8,9 @@ import (

 	"github.com/samber/lo"
 	"gitlink.org.cn/cloudream/common/models"
-	"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
 	distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"

 	"gitlink.org.cn/cloudream/storage-common/globals"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"

@@ -66,36 +66,31 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage
 		return nil, fmt.Errorf("new coordinator client: %w", err)
 	}

-	/*
-	// TODO2
-	reqBlder := reqbuilder.NewBuilder()
-	for _, uploadObject := range t.Objects {
-		reqBlder.Metadata().
-			// 用于防止创建了多个同名对象
-			Object().CreateOne(t.bucketID, uploadObject.ObjectName)
-	}
-
-	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
-	if t.uploadConfig.LocalNodeID != nil {
-		reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID)
-	}
+	reqBlder := reqbuilder.NewBuilder()
+	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
+	if globals.Local.NodeID != nil {
+		reqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID)
+	}
+	mutex, err := reqBlder.
+		Metadata().
+		// 用于判断用户是否有桶的权限
+		UserBucket().ReadOne(t.userID, t.bucketID).
+		// 用于查询可用的上传节点
+		Node().ReadAny().
+		// 用于创建包信息
+		Package().CreateOne(t.bucketID, t.name).
+		// 用于创建包中的文件的信息
+		Object().CreateAny().
+		// 用于设置EC配置
+		ObjectBlock().CreateAny().
+		// 用于创建Cache记录
+		Cache().CreateAny().
+		MutexLock(ctx.Distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
+	}
+	defer mutex.Unlock()

-	mutex, err := reqBlder.
-		Metadata().
-		// 用于判断用户是否有桶的权限
-		UserBucket().ReadOne(t.userID, t.bucketID).
-		// 用于查询可用的上传节点
-		Node().ReadAny().
-		// 用于设置Rep配置
-		ObjectRep().CreateAny().
-		// 用于创建Cache记录
-		Cache().CreateAny().
-		MutexLock(ctx.DistLock())
-	if err != nil {
-		return fmt.Errorf("acquire locks failed, err: %w", err)
-	}
-	defer mutex.Unlock()
-	*/
 	createPkgResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(t.userID, t.bucketID, t.name,
 		models.NewTypedRedundancyInfo(models.RedundancyRep, t.redundancy)))
 	if err != nil {

@@ -121,13 +116,13 @@ func (t *CreateRepPackage) Execute(ctx *UpdatePackageContext) (*CreateRepPackage
 	uploadNode := t.chooseUploadNode(nodeInfos)

 	// 防止上传的副本被清除
-	mutex2, err := reqbuilder.NewBuilder().
+	ipfsMutex, err := reqbuilder.NewBuilder().
 		IPFS().CreateAnyRep(uploadNode.Node.NodeID).
 		MutexLock(ctx.Distlock)
 	if err != nil {
 		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
 	}
-	defer mutex2.Unlock()
+	defer ipfsMutex.Unlock()

 	rets, err := uploadAndUpdateRepPackage(createPkgResp.PackageID, t.objectIter, uploadNode)
 	if err != nil {


pkgs/cmd/download_package.go (+8 -1)

@@ -133,7 +133,14 @@ func (t *DownloadPackage) writeObject(objIter iterator.DownloadingObjectIterator
 	}
 	defer objInfo.File.Close()

-	outputFile, err := os.Create(filepath.Join(t.outputPath, objInfo.Object.Path))
+	fullPath := filepath.Join(t.outputPath, objInfo.Object.Path)
+
+	dirPath := filepath.Dir(fullPath)
+	if err := os.MkdirAll(dirPath, 0755); err != nil {
+		return fmt.Errorf("creating object dir: %w", err)
+	}
+
+	outputFile, err := os.Create(fullPath)
 	if err != nil {
 		return fmt.Errorf("creating object file: %w", err)
 	}


pkgs/cmd/update_ec_package.go (+36 -57)

@@ -6,10 +6,10 @@ import (
 	"github.com/samber/lo"
 	"gitlink.org.cn/cloudream/common/models"
 	"gitlink.org.cn/cloudream/common/utils/serder"
-	mysort "gitlink.org.cn/cloudream/common/utils/sort"

 	"gitlink.org.cn/cloudream/storage-common/globals"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
+	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
 	coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
 )

@@ -40,36 +40,23 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage
 		return nil, fmt.Errorf("new coordinator client: %w", err)
 	}

-	/*
-	TODO2
-	reqBlder := reqbuilder.NewBuilder()
-
-	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
-	if t.uploadConfig.LocalNodeID != nil {
-		reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID)
-	}
-
-	// TODO2
-	mutex, err := reqBlder.
-		Metadata().
-		// 用于判断用户是否有对象的权限
-		UserBucket().ReadAny().
-		// 用于读取、修改对象信息
-		Object().WriteOne(t.objectID).
-		// 用于更新Rep配置
-		ObjectRep().WriteOne(t.objectID).
-		// 用于查询可用的上传节点
-		Node().ReadAny().
-		// 用于创建Cache记录
-		Cache().CreateAny().
-		// 用于修改Move此Object的记录的状态
-		StorageObject().WriteAny().
-		MutexLock(ctx.DistLock())
-	if err != nil {
-		return fmt.Errorf("acquire locks failed, err: %w", err)
-	}
-	defer mutex.Unlock()
-	*/
+	mutex, err := reqbuilder.NewBuilder().
+		Metadata().
+		// 用于查询可用的上传节点
+		Node().ReadAny().
+		// 用于创建包信息
+		Package().WriteOne(t.packageID).
+		// 用于创建包中的文件的信息
+		Object().CreateAny().
+		// 用于设置EC配置
+		ObjectBlock().CreateAny().
+		// 用于创建Cache记录
+		Cache().CreateAny().
+		MutexLock(ctx.Distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
+	}
+	defer mutex.Unlock()

 	getPkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
 	if err != nil {

@@ -103,17 +90,25 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage
 		return nil, fmt.Errorf("getting ec: %w", err)
 	}

-	/*
-	TODO2
-	// 防止上传的副本被清除
-	mutex2, err := reqbuilder.NewBuilder().
-		IPFS().CreateAnyRep(uploadNode.Node.NodeID).
-		MutexLock(ctx.DistLock())
-	if err != nil {
-		return fmt.Errorf("acquire locks failed, err: %w", err)
-	}
-	defer mutex2.Unlock()
-	*/
+	// 给上传节点的IPFS加锁
+	ipfsReqBlder := reqbuilder.NewBuilder()
+	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
+	if globals.Local.NodeID != nil {
+		ipfsReqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID)
+	}
+	for _, node := range nodeInfos {
+		if globals.Local.NodeID != nil && node.Node.NodeID == *globals.Local.NodeID {
+			continue
+		}
+
+		ipfsReqBlder.IPFS().CreateAnyRep(node.Node.NodeID)
+	}
+	// 防止上传的副本被清除
+	ipfsMutex, err := ipfsReqBlder.MutexLock(ctx.Distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
+	}
+	defer ipfsMutex.Unlock()

 	rets, err := uploadAndUpdateECPackage(ctx, t.packageID, t.objectIter, nodeInfos, getECResp.Config)
 	if err != nil {

@@ -124,19 +119,3 @@ func (t *UpdateECPackage) Execute(ctx *UpdateECPackageContext) (*UpdateECPackage
 		ObjectResults: rets,
 	}, nil
 }
-
-// chooseUploadNode 选择一个上传文件的节点
-// 1. 从与当前客户端相同地域的节点中随机选一个
-// 2. 没有用的话从所有节点中随机选一个
-func (t *UpdateECPackage) chooseUploadNode(nodes []UpdateNodeInfo) UpdateNodeInfo {
-	mysort.Sort(nodes, func(left, right UpdateNodeInfo) int {
-		v := -mysort.CmpBool(left.HasOldObject, right.HasOldObject)
-		if v != 0 {
-			return v
-		}
-
-		return -mysort.CmpBool(left.IsSameLocation, right.IsSameLocation)
-	})
-
-	return nodes[0]
-}

pkgs/cmd/update_rep_package.go (+25 -32)

@@ -4,8 +4,8 @@ import (
 	"fmt"

 	"github.com/samber/lo"
-	"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
 	mysort "gitlink.org.cn/cloudream/common/utils/sort"
+	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"

 	"gitlink.org.cn/cloudream/storage-common/globals"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"

@@ -43,36 +43,29 @@ func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackage
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
 	}
-	/*
-	TODO2
-	reqBlder := reqbuilder.NewBuilder()
-
-	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
-	if t.uploadConfig.LocalNodeID != nil {
-		reqBlder.IPFS().CreateAnyRep(*t.uploadConfig.LocalNodeID)
-	}
-
-	// TODO2
-	mutex, err := reqBlder.
-		Metadata().
-		// 用于判断用户是否有对象的权限
-		UserBucket().ReadAny().
-		// 用于读取、修改对象信息
-		Object().WriteOne(t.objectID).
-		// 用于更新Rep配置
-		ObjectRep().WriteOne(t.objectID).
-		// 用于查询可用的上传节点
-		Node().ReadAny().
-		// 用于创建Cache记录
-		Cache().CreateAny().
-		// 用于修改Move此Object的记录的状态
-		StorageObject().WriteAny().
-		MutexLock(ctx.DistLock())
-	if err != nil {
-		return fmt.Errorf("acquire locks failed, err: %w", err)
-	}
-	defer mutex.Unlock()
-	*/
+	reqBlder := reqbuilder.NewBuilder()
+	// 如果本地的IPFS也是存储系统的一个节点,那么从本地上传时,需要加锁
+	if globals.Local.NodeID != nil {
+		reqBlder.IPFS().CreateAnyRep(*globals.Local.NodeID)
+	}
+	mutex, err := reqBlder.
+		Metadata().
+		// 用于查询可用的上传节点
+		Node().ReadAny().
+		// 用于创建包信息
+		Package().WriteOne(t.packageID).
+		// 用于创建包中的文件的信息
+		Object().CreateAny().
+		// 用于设置EC配置
+		ObjectBlock().CreateAny().
+		// 用于创建Cache记录
+		Cache().CreateAny().
+		MutexLock(ctx.Distlock)
+	if err != nil {
+		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
+	}
+	defer mutex.Unlock()
 	getUserNodesResp, err := coorCli.GetUserNodes(coormq.NewGetUserNodes(t.userID))
 	if err != nil {
 		return nil, fmt.Errorf("getting user nodes: %w", err)

@@ -100,13 +93,13 @@ func (t *UpdateRepPackage) Execute(ctx *UpdatePackageContext) (*UpdateRepPackage
 	uploadNode := t.chooseUploadNode(nodeInfos)

 	// 防止上传的副本被清除
-	mutex2, err := reqbuilder.NewBuilder().
+	ipfsMutex, err := reqbuilder.NewBuilder().
 		IPFS().CreateAnyRep(uploadNode.Node.NodeID).
 		MutexLock(ctx.Distlock)
 	if err != nil {
 		return nil, fmt.Errorf("acquire locks failed, err: %w", err)
 	}
-	defer mutex2.Unlock()
+	defer ipfsMutex.Unlock()

 	rets, err := uploadAndUpdateRepPackage(t.packageID, t.objectIter, uploadNode.UploadNodeInfo)
 	if err != nil {


pkgs/db/bucket.go (+3 -3)

@@ -50,9 +50,9 @@ func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID int64, userID int64) (b
 func (*BucketDB) GetUserBucket(ctx SQLContext, userID int64, bucketID int64) (model.Bucket, error) {
 	var ret model.Bucket
 	err := sqlx.Get(ctx, &ret,
-		"select Bucket.* from UserBucket, Bucket where UserID = ? and "+
-			"UserBucket.BucketID = Bucket.BucketID and "+
-			"Bucket.BucketID = ?", userID, bucketID)
+		"select Bucket.* from UserBucket, Bucket where UserID = ? and"+
+			" UserBucket.BucketID = Bucket.BucketID and"+
+			" Bucket.BucketID = ?", userID, bucketID)
 	return ret, err
 }




pkgs/db/cache.go (+3 -3)

@@ -80,9 +80,9 @@ func (*CacheDB) DeleteNodeAll(ctx SQLContext, nodeID int64) error {
 func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID int64, fileHash string) ([]model.Node, error) {
 	var x []model.Node
 	err := sqlx.Select(ctx, &x,
-		"select Node.* from Cache, UserNode, Node where "+
-			"Cache.FileHash=? and Cache.NodeID = UserNode.NodeID and "+
-			"UserNode.UserID = ? and UserNode.NodeID = Node.NodeID", fileHash, userID)
+		"select Node.* from Cache, UserNode, Node where"+
+			" Cache.FileHash=? and Cache.NodeID = UserNode.NodeID and"+
+			" UserNode.UserID = ? and UserNode.NodeID = Node.NodeID", fileHash, userID)
 	return x, err
 }




pkgs/db/object.go (+5 -0)

@@ -273,3 +273,8 @@ func (*ObjectDB) BatchDelete(ctx SQLContext, ids []int64) error {
 	_, err := ctx.Exec("delete from Object where ObjectID in (?)", ids)
 	return err
 }
+
+func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID int64) error {
+	_, err := ctx.Exec("delete from Object where PackageID = ?", packageID)
+	return err
+}

pkgs/db/object_block.go (+14 -9)

@@ -28,13 +28,18 @@ func (db *ObjectBlockDB) DeleteObjectAll(ctx SQLContext, objectID int64) error {
 	return err
 }

+func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID int64) error {
+	_, err := ctx.Exec("delete ObjectBlock from ObjectBlock inner join Object on ObjectBlock.ObjectID = Object.ObjectID where PackageID = ?", packageID)
+	return err
+}
+
 func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) {
 	var cnt int
 	err := sqlx.Get(ctx, &cnt,
-		"select count(FileHash) from ObjectBlock, Object, Package where FileHash = ? and "+
-			"ObjectBlock.ObjectID = Object.ObjectID and "+
-			"Object.PackageID = Package.PackageID and "+
-			"Package.State = ?", fileHash, consts.PackageStateNormal)
+		"select count(FileHash) from ObjectBlock, Object, Package where FileHash = ? and"+
+			" ObjectBlock.ObjectID = Object.ObjectID and"+
+			" Object.PackageID = Package.PackageID and"+
+			" Package.State = ?", fileHash, consts.PackageStateNormal)
 	if err == sql.ErrNoRows {
 		return 0, nil
 	}

@@ -66,8 +71,8 @@ func (db *ObjectBlockDB) GetBatchBlocksNodes(ctx SQLContext, hashs [][]string) (
 		for j, h := range hs {
 			var x []model.Node
 			err = sqlx.Select(ctx, &x,
-				"select Node.* from Cache, Node where "+
-					"Cache.FileHash=? and Cache.NodeID = Node.NodeID and Cache.State=?", h, consts.CacheStatePinned)
+				"select Node.* from Cache, Node where"+
+					" Cache.FileHash=? and Cache.NodeID = Node.NodeID and Cache.State=?", h, consts.CacheStatePinned)
 			xx := make([]int64, len(x))
 			for ii := 0; ii < len(x); ii++ {
 				xx[ii] = x[ii].NodeID

@@ -97,9 +102,9 @@ func (db *ObjectBlockDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64)

 	err := sqlx.Select(ctx,
 		&tmpRets,
-		"select ObjectBlock.Index, ObjectBlock.FileHash, group_concat(NodeID) as NodeIDs from ObjectBlock "+
-			"left join Cache on ObjectBlock.FileHash = Cache.FileHash"+
-			"where ObjectID = ? group by ObjectBlock.Index, ObjectBlock.FileHash",
+		"select ObjectBlock.Index, ObjectBlock.FileHash, group_concat(NodeID) as NodeIDs from ObjectBlock"+
+			" left join Cache on ObjectBlock.FileHash = Cache.FileHash"+
+			" where ObjectID = ? group by ObjectBlock.Index, ObjectBlock.FileHash",
 		objID,
 	)
 	if err != nil {


pkgs/db/object_rep.go (+13 -8)

@@ -51,13 +51,18 @@ func (db *ObjectRepDB) Delete(ctx SQLContext, objectID int64) error {
 	return err
 }

+func (db *ObjectRepDB) DeleteInPackage(ctx SQLContext, packageID int64) error {
+	_, err := ctx.Exec("delete ObjectRep from ObjectRep inner join Object on ObjectRep.ObjectID = Object.ObjectID where PackageID = ?", packageID)
+	return err
+}
+
 func (db *ObjectRepDB) GetFileMaxRepCount(ctx SQLContext, fileHash string) (int, error) {
 	var maxRepCnt *int
 	err := sqlx.Get(ctx, &maxRepCnt,
-		"select max(RepCount) from ObjectRep, Object, Package where FileHash = ? and "+
-			"ObjectRep.ObjectID = Object.ObjectID and "+
-			"Object.PackageID = Package.PackageID and "+
-			"Package.State = ?", fileHash, consts.PackageStateNormal)
+		"select json_extract(Redundancy, '$.info.repCount') from ObjectRep, Object, Package where FileHash = ? and"+
+			" ObjectRep.ObjectID = Object.ObjectID and"+
+			" Object.PackageID = Package.PackageID and"+
+			" Package.State = ?", fileHash, consts.PackageStateNormal)

 	if err == sql.ErrNoRows {
 		return 0, nil

@@ -83,10 +88,10 @@ func (db *ObjectRepDB) GetWithNodeIDInPackage(ctx SQLContext, packageID int64) (

 	err := sqlx.Select(ctx,
 		&tmpRets,
-		"select Object.ObjectID, ObjectRep.FileHash, group_concat(NodeID) as NodeIDs from Object "+
-			"left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID "+
-			"left join Cache on ObjectRep.FileHash = Cache.FileHash"+
-			"where PackageID = ? group by Object.ObjectID order by Object.ObjectID asc",
+		"select Object.ObjectID, ObjectRep.FileHash, group_concat(NodeID) as NodeIDs from Object"+
+			" left join ObjectRep on Object.ObjectID = ObjectRep.ObjectID"+
+			" left join Cache on ObjectRep.FileHash = Cache.FileHash"+
+			" where PackageID = ? group by Object.ObjectID order by Object.ObjectID asc",
 		packageID,
 	)
 	if err != nil {


pkgs/db/package.go (+16 -13)

@@ -70,10 +70,10 @@ func (db *PackageDB) IsAvailable(ctx SQLContext, userID int64, packageID int64)
 func (db *PackageDB) GetUserPackage(ctx SQLContext, userID int64, packageID int64) (model.Package, error) {
 	var ret model.Package
 	err := sqlx.Get(ctx, &ret,
-		"select Package.* from Package, UserBucket where "+
-			"Package.PackageID = ? and "+
-			"Package.BucketID = UserBucket.BucketID and "+
-			"UserBucket.UserID = ?",
+		"select Package.* from Package, UserBucket where"+
+			" Package.PackageID = ? and"+
+			" Package.BucketID = UserBucket.BucketID and"+
+			" UserBucket.UserID = ?",
 		packageID, userID)
 	return ret, err
 }

@@ -129,16 +129,19 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID int64) error {
 	}

 	if obj.Redundancy.Type == models.RedundancyRep {
-		// TODO2
-		//err = db.ObjectRep().Delete(ctx, objectID)
-		//if err != nil {
-		//	return fmt.Errorf("delete from object rep failed, err: %w", err)
-		//}
+		err = db.ObjectRep().DeleteInPackage(ctx, packageID)
+		if err != nil {
+			return fmt.Errorf("delete from object rep failed, err: %w", err)
+		}
 	} else {
-		//err = db.ObjectBlock().Delete(ctx, objectID)
-		//if err != nil {
-		//	return fmt.Errorf("delete from object rep failed, err: %w", err)
-		//}
+		err = db.ObjectBlock().DeleteInPackage(ctx, packageID)
+		if err != nil {
+			return fmt.Errorf("delete from object rep failed, err: %w", err)
+		}
+	}
+
+	if err := db.Object().DeleteInPackage(ctx, packageID); err != nil {
+		return fmt.Errorf("deleting objects in package: %w", err)
 	}

 	_, err = db.StoragePackage().SetAllPackageDeleted(ctx, packageID)


pkgs/db/storage.go (+4 -4)

@@ -31,10 +31,10 @@ func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int)
 func (db *StorageDB) IsAvailable(ctx SQLContext, userID int64, storageID int64) (bool, error) {
 	var stgID int64
 	err := sqlx.Get(ctx, &stgID,
-		"select Storage.StorageID from Storage, UserStorage where "+
-			"Storage.StorageID = ? and "+
-			"Storage.StorageID = UserStorage.StorageID and "+
-			"UserStorage.UserID = ?",
+		"select Storage.StorageID from Storage, UserStorage where"+
+			" Storage.StorageID = ? and"+
+			" Storage.StorageID = UserStorage.StorageID and"+
+			" UserStorage.UserID = ?",
 		storageID, userID)

 	if err == sql.ErrNoRows {


pkgs/db/storage_package.go (+3 -3)

@@ -34,7 +34,7 @@ func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID int64) ([]m
 	return ret, err
 }

-func (*StoragePackageDB) MovePackageTo(ctx SQLContext, packageID int64, storageID int64, userID int64) error {
+func (*StoragePackageDB) LoadPackage(ctx SQLContext, packageID int64, storageID int64, userID int64) error {
 	_, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)", packageID, storageID, userID, consts.StoragePackageStateNormal)
 	return err
 }

@@ -108,8 +108,8 @@ func (*StoragePackageDB) Delete(ctx SQLContext, storageID int64, packageID int64
 func (*StoragePackageDB) FindPackageStorages(ctx SQLContext, packageID int64) ([]model.Storage, error) {
 	var ret []model.Storage
 	err := sqlx.Select(ctx, &ret,
-		"select Storage.* from StoragePackage, Storage where PackageID = ? and "+
-			"StoragePackage.StorageID = Storage.StorageID",
+		"select Storage.* from StoragePackage, Storage where PackageID = ? and"+
+			" StoragePackage.StorageID = Storage.StorageID",
 		packageID,
 	)
 	return ret, err


pkgs/distlock/lockprovider/ipfs_lock.go (+217 -0)

@@ -0,0 +1,217 @@
package lockprovider

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
)

const (
IPFSLockPathPrefix = "IPFS"

IPFS_SET_READ_LOCK = "SetRead"
IPFS_SET_WRITE_LOCK = "SetWrite"
IPFS_SET_CREATE_LOCK = "SetCreate"

IPFS_ELEMENT_READ_LOCK = "ElementRead"
IPFS_ELEMENT_WRITE_LOCK = "ElementWrite"

IPFS_NODE_ID_PATH_INDEX = 1
)

type IPFSLock struct {
nodeLocks map[string]*IPFSNodeLock
dummyLock *IPFSNodeLock
}

func NewIPFSLock() *IPFSLock {
return &IPFSLock{
nodeLocks: make(map[string]*IPFSNodeLock),
dummyLock: NewIPFSNodeLock(),
}
}

// CanLock 判断这个锁能否锁定成功
func (l *IPFSLock) CanLock(lock distlock.Lock) error {
nodeLock, ok := l.nodeLocks[lock.Path[IPFS_NODE_ID_PATH_INDEX]]
if !ok {
// 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。
// 这里使用一个空Provider来进行检查。
return l.dummyLock.CanLock(lock)
}

return nodeLock.CanLock(lock)
}

// 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查
func (l *IPFSLock) Lock(reqID string, lock distlock.Lock) error {
nodeID := lock.Path[IPFS_NODE_ID_PATH_INDEX]

nodeLock, ok := l.nodeLocks[nodeID]
if !ok {
nodeLock = NewIPFSNodeLock()
l.nodeLocks[nodeID] = nodeLock
}

return nodeLock.Lock(reqID, lock)
}

// 解锁
func (l *IPFSLock) Unlock(reqID string, lock distlock.Lock) error {
nodeID := lock.Path[IPFS_NODE_ID_PATH_INDEX]

nodeLock, ok := l.nodeLocks[nodeID]
if !ok {
return nil
}

return nodeLock.Unlock(reqID, lock)
}

// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD
func (l *IPFSLock) GetTargetString(target any) (string, error) {
tar := target.(StringLockTarget)
return StringLockTargetToString(&tar)
}

// ParseTargetString 解析字符串格式的锁对象数据
func (l *IPFSLock) ParseTargetString(targetStr string) (any, error) {
return StringLockTargetFromString(targetStr)
}

// Clear 清除内部所有状态
func (l *IPFSLock) Clear() {
l.nodeLocks = make(map[string]*IPFSNodeLock)
}

type ipfsElementLock struct {
target StringLockTarget
requestIDs []string
}

type IPFSNodeLock struct {
setReadReqIDs []string
setWriteReqIDs []string
setCreateReqIDs []string

elementReadLocks []*ipfsElementLock
elementWriteLocks []*ipfsElementLock

lockCompatibilityTable *LockCompatibilityTable
}

func NewIPFSNodeLock() *IPFSNodeLock {
compTable := &LockCompatibilityTable{}

ipfsLock := IPFSNodeLock{
lockCompatibilityTable: compTable,
}

compTable.
Column(IPFS_ELEMENT_READ_LOCK, func() bool { return len(ipfsLock.elementReadLocks) > 0 }).
Column(IPFS_ELEMENT_WRITE_LOCK, func() bool { return len(ipfsLock.elementWriteLocks) > 0 }).
Column(IPFS_SET_READ_LOCK, func() bool { return len(ipfsLock.setReadReqIDs) > 0 }).
Column(IPFS_SET_WRITE_LOCK, func() bool { return len(ipfsLock.setWriteReqIDs) > 0 }).
Column(IPFS_SET_CREATE_LOCK, func() bool { return len(ipfsLock.setCreateReqIDs) > 0 })

comp := LockCompatible()
uncp := LockUncompatible()
trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool {
strTar := lock.Target.(StringLockTarget)
if testLockName == IPFS_ELEMENT_READ_LOCK {
// 如果没有任何锁的锁对象与当前的锁对象冲突,那么这个锁可以加
return lo.NoneBy(ipfsLock.elementReadLocks, func(other *ipfsElementLock) bool { return strTar.IsConflict(&other.target) })
}

return lo.NoneBy(ipfsLock.elementWriteLocks, func(other *ipfsElementLock) bool { return strTar.IsConflict(&other.target) })
})

compTable.MustRow(comp, trgt, comp, uncp, comp)
compTable.MustRow(trgt, trgt, uncp, uncp, uncp)
compTable.MustRow(comp, uncp, comp, uncp, uncp)
compTable.MustRow(uncp, uncp, uncp, uncp, uncp)
compTable.MustRow(comp, uncp, uncp, uncp, comp)

return &ipfsLock
}

// CanLock 判断这个锁能否锁定成功
func (l *IPFSNodeLock) CanLock(lock distlock.Lock) error {
return l.lockCompatibilityTable.Test(lock)
}

// 锁定
func (l *IPFSNodeLock) Lock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case IPFS_SET_READ_LOCK:
l.setReadReqIDs = append(l.setReadReqIDs, reqID)
case IPFS_SET_WRITE_LOCK:
l.setWriteReqIDs = append(l.setWriteReqIDs, reqID)
case IPFS_SET_CREATE_LOCK:
l.setCreateReqIDs = append(l.setCreateReqIDs, reqID)

case IPFS_ELEMENT_READ_LOCK:
l.elementReadLocks = l.addElementLock(lock, l.elementReadLocks, reqID)
case IPFS_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.addElementLock(lock, l.elementWriteLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *IPFSNodeLock) addElementLock(lock distlock.Lock, locks []*ipfsElementLock, reqID string) []*ipfsElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, ok := lo.Find(locks, func(l *ipfsElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
lck = &ipfsElementLock{
target: strTarget,
}
locks = append(locks, lck)
}

lck.requestIDs = append(lck.requestIDs, reqID)
return locks
}

// 解锁
func (l *IPFSNodeLock) Unlock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case IPFS_SET_READ_LOCK:
l.setReadReqIDs = mylo.Remove(l.setReadReqIDs, reqID)
case IPFS_SET_WRITE_LOCK:
l.setWriteReqIDs = mylo.Remove(l.setWriteReqIDs, reqID)
case IPFS_SET_CREATE_LOCK:
l.setCreateReqIDs = mylo.Remove(l.setCreateReqIDs, reqID)

case IPFS_ELEMENT_READ_LOCK:
l.elementReadLocks = l.removeElementLock(lock, l.elementReadLocks, reqID)
case IPFS_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.removeElementLock(lock, l.elementWriteLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *IPFSNodeLock) removeElementLock(lock distlock.Lock, locks []*ipfsElementLock, reqID string) []*ipfsElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, index, ok := lo.FindIndexOf(locks, func(l *ipfsElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
return locks
}

lck.requestIDs = mylo.Remove(lck.requestIDs, reqID)

if len(lck.requestIDs) == 0 {
locks = mylo.RemoveAt(locks, index)
}

return locks
}

pkgs/distlock/lockprovider/ipfs_lock_test.go (+113 -0)

@@ -0,0 +1,113 @@
package lockprovider

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
)

func Test_IPFSLock(t *testing.T) {
cases := []struct {
title string
initLocks []distlock.Lock
doLock distlock.Lock
wantOK bool
}{
{
title: "同节点,同一个Read锁",
initLocks: []distlock.Lock{
{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_SET_READ_LOCK,
},
},
doLock: distlock.Lock{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_SET_READ_LOCK,
},
wantOK: true,
},
{
title: "同节点,同一个Write锁",
initLocks: []distlock.Lock{
{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_SET_WRITE_LOCK,
},
},
doLock: distlock.Lock{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_SET_WRITE_LOCK,
},
wantOK: false,
},
{
title: "不同节点,同一个Write锁",
initLocks: []distlock.Lock{
{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_SET_WRITE_LOCK,
},
},
doLock: distlock.Lock{
Path: []string{IPFSLockPathPrefix, "node2"},
Name: IPFS_SET_WRITE_LOCK,
},
wantOK: true,
},
{
title: "相同对象的Read、Write锁",
initLocks: []distlock.Lock{
{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_ELEMENT_WRITE_LOCK,
Target: *NewStringLockTarget(),
},
},
doLock: distlock.Lock{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_ELEMENT_WRITE_LOCK,
Target: *NewStringLockTarget(),
},
wantOK: false,
},
}

for _, ca := range cases {
Convey(ca.title, t, func() {
ipfsLock := NewIPFSLock()

for _, l := range ca.initLocks {
ipfsLock.Lock("req1", l)
}

err := ipfsLock.CanLock(ca.doLock)
if ca.wantOK {
So(err, ShouldBeNil)
} else {
So(err, ShouldNotBeNil)
}
})
}

Convey("解锁", t, func() {
ipfsLock := NewIPFSLock()

lock := distlock.Lock{
Path: []string{IPFSLockPathPrefix, "node1"},
Name: IPFS_SET_WRITE_LOCK,
}

ipfsLock.Lock("req1", lock)

err := ipfsLock.CanLock(lock)
So(err, ShouldNotBeNil)

ipfsLock.Unlock("req1", lock)

err = ipfsLock.CanLock(lock)
So(err, ShouldBeNil)
})

}

pkgs/distlock/lockprovider/lock_compatibility_table.go (+123 -0)

@@ -0,0 +1,123 @@
package lockprovider

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
)

const (
LOCK_COMPATIBILITY_COMPATIBLE LockCompatibilityType = "Compatible"
LOCK_COMPATIBILITY_UNCOMPATIBLE LockCompatibilityType = "Uncompatible"
LOCK_COMPATIBILITY_SPECIAL LockCompatibilityType = "Special"
)

type HasSuchLockFn = func() bool

// LockCompatibilitySpecialFn 判断锁与指定的锁名是否兼容
type LockCompatibilitySpecialFn func(lock distlock.Lock, testLockName string) bool

type LockCompatibilityType string

type LockCompatibility struct {
Type LockCompatibilityType
SpecialFn LockCompatibilitySpecialFn
}

func LockCompatible() LockCompatibility {
return LockCompatibility{
Type: LOCK_COMPATIBILITY_COMPATIBLE,
}
}

func LockUncompatible() LockCompatibility {
return LockCompatibility{
Type: LOCK_COMPATIBILITY_UNCOMPATIBLE,
}
}

func LockSpecial(specialFn LockCompatibilitySpecialFn) LockCompatibility {
return LockCompatibility{
Type: LOCK_COMPATIBILITY_SPECIAL,
SpecialFn: specialFn,
}
}

type LockCompatibilityTableRow struct {
LockName string
HasSuchLockFn HasSuchLockFn
Compatibilities []LockCompatibility
}

type LockCompatibilityTable struct {
rows []LockCompatibilityTableRow
rowIndex int
}

func (t *LockCompatibilityTable) Column(lockName string, hasSuchLock HasSuchLockFn) *LockCompatibilityTable {
t.rows = append(t.rows, LockCompatibilityTableRow{
LockName: lockName,
HasSuchLockFn: hasSuchLock,
})

return t
}
func (t *LockCompatibilityTable) MustRow(comps ...LockCompatibility) {
err := t.Row(comps...)
if err != nil {
panic(fmt.Sprintf("build lock compatibility table failed, err: %s", err.Error()))
}
}

func (t *LockCompatibilityTable) Row(comps ...LockCompatibility) error {
if t.rowIndex >= len(t.rows) {
return fmt.Errorf("there should be no more rows in the table")
}

if len(comps) < len(t.rows) {
return fmt.Errorf("the columns should equals the rows")
}

t.rows[t.rowIndex].Compatibilities = comps

for i := 0; i < t.rowIndex-1; i++ {
chkRowCeil := t.rows[t.rowIndex].Compatibilities[i]
chkColCeil := t.rows[i].Compatibilities[t.rowIndex]

if chkRowCeil.Type != chkColCeil.Type {
return fmt.Errorf("value at %d, %d is not equals to at %d, %d", t.rowIndex, i, i, t.rowIndex)
}
}

t.rowIndex++

return nil
}

func (t *LockCompatibilityTable) Test(lock distlock.Lock) error {
row, ok := lo.Find(t.rows, func(row LockCompatibilityTableRow) bool { return lock.Name == row.LockName })
if !ok {
return fmt.Errorf("unknow lock name %s", lock.Name)
}

for i, c := range row.Compatibilities {
if c.Type == LOCK_COMPATIBILITY_COMPATIBLE {
continue
}

if c.Type == LOCK_COMPATIBILITY_UNCOMPATIBLE {
if t.rows[i].HasSuchLockFn() {
return distlock.NewLockTargetBusyError(t.rows[i].LockName)
}
}

if c.Type == LOCK_COMPATIBILITY_SPECIAL {
if !c.SpecialFn(lock, t.rows[i].LockName) {
return distlock.NewLockTargetBusyError(t.rows[i].LockName)
}
}
}

return nil
}

pkgs/distlock/lockprovider/lock_compatibility_table_test.go (+41 -0)

@@ -0,0 +1,41 @@
package lockprovider

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
)

func Test_LockCompatibilityTable(t *testing.T) {
Convey("兼容,互斥,特殊比较", t, func() {
table := LockCompatibilityTable{}

table.
Column("l1", func() bool { return true }).
Column("l2", func() bool { return true }).
Column("l3", func() bool { return false })

comp := LockCompatible()
uncp := LockUncompatible()
spcl := LockSpecial(func(lock distlock.Lock, testLockName string) bool { return true })
table.Row(comp, comp, comp)
table.Row(comp, uncp, comp)
table.Row(comp, comp, spcl)

err := table.Test(distlock.Lock{
Name: "l1",
})
So(err, ShouldBeNil)

err = table.Test(distlock.Lock{
Name: "l2",
})
So(err, ShouldNotBeNil)

err = table.Test(distlock.Lock{
Name: "l3",
})
So(err, ShouldBeNil)
})
}

pkgs/distlock/lockprovider/metadata_lock.go (+184 -0)

@@ -0,0 +1,184 @@
package lockprovider

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
)

const (
MetadataLockPathPrefix = "Metadata"

METADATA_SET_READ_LOCK = "SetRead"
METADATA_SET_WRITE_LOCK = "SetWrite"
METADATA_SET_CREATE_LOCK = "SetCreate"

METADATA_ELEMENT_READ_LOCK = "ElementRead"
METADATA_ELEMENT_WRITE_LOCK = "ElementWrite"
METADATA_ELEMENT_CREATE_LOCK = "ElementCreate"
)

type metadataElementLock struct {
target StringLockTarget
requestIDs []string
}

type MetadataLock struct {
setReadReqIDs []string
setWriteReqIDs []string
setCreateReqIDs []string

elementReadLocks []*metadataElementLock
elementWriteLocks []*metadataElementLock
elementCreateLocks []*metadataElementLock

lockCompatibilityTable LockCompatibilityTable
}

func NewMetadataLock() *MetadataLock {

metadataLock := MetadataLock{
lockCompatibilityTable: LockCompatibilityTable{},
}

compTable := &metadataLock.lockCompatibilityTable

compTable.
Column(METADATA_ELEMENT_READ_LOCK, func() bool { return len(metadataLock.elementReadLocks) > 0 }).
Column(METADATA_ELEMENT_WRITE_LOCK, func() bool { return len(metadataLock.elementWriteLocks) > 0 }).
Column(METADATA_ELEMENT_CREATE_LOCK, func() bool { return len(metadataLock.elementCreateLocks) > 0 }).
Column(METADATA_SET_READ_LOCK, func() bool { return len(metadataLock.setReadReqIDs) > 0 }).
Column(METADATA_SET_WRITE_LOCK, func() bool { return len(metadataLock.setWriteReqIDs) > 0 }).
Column(METADATA_SET_CREATE_LOCK, func() bool { return len(metadataLock.setCreateReqIDs) > 0 })

comp := LockCompatible()
uncp := LockUncompatible()
trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool {
strTar := lock.Target.(StringLockTarget)
if testLockName == METADATA_ELEMENT_READ_LOCK {
// 如果没有任何锁的锁对象与当前的锁对象冲突,那么这个锁可以加
return lo.NoneBy(metadataLock.elementReadLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) })
}

if testLockName == METADATA_ELEMENT_WRITE_LOCK {
return lo.NoneBy(metadataLock.elementWriteLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) })
}

return lo.NoneBy(metadataLock.elementCreateLocks, func(other *metadataElementLock) bool { return strTar.IsConflict(&other.target) })
})

compTable.MustRow(comp, trgt, comp, comp, uncp, comp)
compTable.MustRow(trgt, trgt, comp, uncp, uncp, comp)
compTable.MustRow(comp, comp, trgt, uncp, uncp, uncp)
compTable.MustRow(comp, uncp, uncp, comp, uncp, uncp)
compTable.MustRow(uncp, uncp, uncp, uncp, uncp, uncp)
compTable.MustRow(comp, comp, uncp, uncp, uncp, uncp)

return &metadataLock
}

// CanLock 判断这个锁能否锁定成功
func (l *MetadataLock) CanLock(lock distlock.Lock) error {
return l.lockCompatibilityTable.Test(lock)
}

// 锁定
func (l *MetadataLock) Lock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case METADATA_SET_READ_LOCK:
l.setReadReqIDs = append(l.setReadReqIDs, reqID)
case METADATA_SET_WRITE_LOCK:
l.setWriteReqIDs = append(l.setWriteReqIDs, reqID)
case METADATA_SET_CREATE_LOCK:
l.setCreateReqIDs = append(l.setCreateReqIDs, reqID)

case METADATA_ELEMENT_READ_LOCK:
l.elementReadLocks = l.addElementLock(lock, l.elementReadLocks, reqID)
case METADATA_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.addElementLock(lock, l.elementWriteLocks, reqID)
case METADATA_ELEMENT_CREATE_LOCK:
l.elementCreateLocks = l.addElementLock(lock, l.elementCreateLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *MetadataLock) addElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, ok := lo.Find(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
lck = &metadataElementLock{
target: strTarget,
}
locks = append(locks, lck)
}

lck.requestIDs = append(lck.requestIDs, reqID)
return locks
}

// 解锁
func (l *MetadataLock) Unlock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case METADATA_SET_READ_LOCK:
l.setReadReqIDs = mylo.Remove(l.setReadReqIDs, reqID)
case METADATA_SET_WRITE_LOCK:
l.setWriteReqIDs = mylo.Remove(l.setWriteReqIDs, reqID)
case METADATA_SET_CREATE_LOCK:
l.setCreateReqIDs = mylo.Remove(l.setCreateReqIDs, reqID)

case METADATA_ELEMENT_READ_LOCK:
l.elementReadLocks = l.removeElementLock(lock, l.elementReadLocks, reqID)
case METADATA_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.removeElementLock(lock, l.elementWriteLocks, reqID)
case METADATA_ELEMENT_CREATE_LOCK:
l.elementCreateLocks = l.removeElementLock(lock, l.elementCreateLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *MetadataLock) removeElementLock(lock distlock.Lock, locks []*metadataElementLock, reqID string) []*metadataElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, index, ok := lo.FindIndexOf(locks, func(l *metadataElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
return locks
}

lck.requestIDs = mylo.Remove(lck.requestIDs, reqID)

if len(lck.requestIDs) == 0 {
locks = mylo.RemoveAt(locks, index)
}

return locks
}

// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD
func (l *MetadataLock) GetTargetString(target any) (string, error) {
tar := target.(StringLockTarget)
return StringLockTargetToString(&tar)
}

// ParseTargetString 解析字符串格式的锁对象数据
func (l *MetadataLock) ParseTargetString(targetStr string) (any, error) {
return StringLockTargetFromString(targetStr)
}

// Clear 清除内部所有状态
func (l *MetadataLock) Clear() {
l.setReadReqIDs = nil
l.setWriteReqIDs = nil
l.setCreateReqIDs = nil
l.elementReadLocks = nil
l.elementWriteLocks = nil
l.elementCreateLocks = nil
}
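Pieced together from the command changes earlier in this commit, a metadata lock request against this provider is normally assembled through the new reqbuilder package. The sketch below reuses the builder chain that appears in create_ec_package.go; only the wrapper function is hypothetical, and the *distsvc.Service parameter type is an assumption inferred from the distlock/service import shown in create_rep_package.go.

package example

import (
	"fmt"

	distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
)

// lockForCreatePackage sketches how the upload commands take their metadata
// locks before creating a package; the chained calls are copied from the diff.
func lockForCreatePackage(svc *distsvc.Service, userID, bucketID int64, name string) error {
	mutex, err := reqbuilder.NewBuilder().
		Metadata().
		UserBucket().ReadOne(userID, bucketID). // verify the user's access to the bucket
		Node().ReadAny().                       // read the set of candidate upload nodes
		Package().CreateOne(bucketID, name).    // reserve the new package record
		Object().CreateAny().                   // allow creating object records in it
		ObjectBlock().CreateAny().              // allow creating EC block records
		Cache().CreateAny().                    // allow creating cache records
		MutexLock(svc)
	if err != nil {
		return fmt.Errorf("acquire locks failed, err: %w", err)
	}
	defer mutex.Unlock()

	// ... perform the metadata writes while the locks are held ...
	return nil
}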

pkgs/distlock/lockprovider/storage_lock.go (+226 -0)

@@ -0,0 +1,226 @@
package lockprovider

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/distlock"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
)

const (
StorageLockPathPrefix = "Storage"

STORAGE_SET_READ_LOCK = "SetRead"
STORAGE_SET_WRITE_LOCK = "SetWrite"
STORAGE_SET_CREATE_LOCK = "SetCreate"

STORAGE_ELEMENT_READ_LOCK = "ElementRead"
STORAGE_ELEMENT_WRITE_LOCK = "ElementWrite"
STORAGE_ELEMENT_CREATE_LOCK = "ElementCreate"

STORAGE_STORAGE_ID_PATH_INDEX = 1
)

type StorageLock struct {
nodeLocks map[string]*StorageNodeLock
dummyLock *StorageNodeLock
}

func NewStorageLock() *StorageLock {
return &StorageLock{
nodeLocks: make(map[string]*StorageNodeLock),
dummyLock: NewStorageNodeLock(),
}
}

// CanLock 判断这个锁能否锁定成功
func (l *StorageLock) CanLock(lock distlock.Lock) error {
nodeLock, ok := l.nodeLocks[lock.Path[STORAGE_STORAGE_ID_PATH_INDEX]]
if !ok {
// 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。
// 这里使用一个空Provider来进行检查。
return l.dummyLock.CanLock(lock)
}

return nodeLock.CanLock(lock)
}

// 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查
func (l *StorageLock) Lock(reqID string, lock distlock.Lock) error {
nodeID := lock.Path[STORAGE_STORAGE_ID_PATH_INDEX]

nodeLock, ok := l.nodeLocks[nodeID]
if !ok {
nodeLock = NewStorageNodeLock()
l.nodeLocks[nodeID] = nodeLock
}

return nodeLock.Lock(reqID, lock)
}

// 解锁
func (l *StorageLock) Unlock(reqID string, lock distlock.Lock) error {
nodeID := lock.Path[STORAGE_STORAGE_ID_PATH_INDEX]

nodeLock, ok := l.nodeLocks[nodeID]
if !ok {
return nil
}

return nodeLock.Unlock(reqID, lock)
}

// GetTargetString 将锁对象序列化为字符串,方便存储到ETCD
func (l *StorageLock) GetTargetString(target any) (string, error) {
tar := target.(StringLockTarget)
return StringLockTargetToString(&tar)
}

// ParseTargetString 解析字符串格式的锁对象数据
func (l *StorageLock) ParseTargetString(targetStr string) (any, error) {
return StringLockTargetFromString(targetStr)
}

// Clear 清除内部所有状态
func (l *StorageLock) Clear() {
l.nodeLocks = make(map[string]*StorageNodeLock)
}

type storageElementLock struct {
target StringLockTarget
requestIDs []string
}

type StorageNodeLock struct {
setReadReqIDs []string
setWriteReqIDs []string
setCreateReqIDs []string

elementReadLocks []*storageElementLock
elementWriteLocks []*storageElementLock
elementCreateLocks []*storageElementLock

lockCompatibilityTable LockCompatibilityTable
}

func NewStorageNodeLock() *StorageNodeLock {

storageLock := StorageNodeLock{
lockCompatibilityTable: LockCompatibilityTable{},
}

compTable := &storageLock.lockCompatibilityTable

compTable.
Column(STORAGE_ELEMENT_READ_LOCK, func() bool { return len(storageLock.elementReadLocks) > 0 }).
Column(STORAGE_ELEMENT_WRITE_LOCK, func() bool { return len(storageLock.elementWriteLocks) > 0 }).
Column(STORAGE_ELEMENT_CREATE_LOCK, func() bool { return len(storageLock.elementCreateLocks) > 0 }).
Column(STORAGE_SET_READ_LOCK, func() bool { return len(storageLock.setReadReqIDs) > 0 }).
Column(STORAGE_SET_WRITE_LOCK, func() bool { return len(storageLock.setWriteReqIDs) > 0 }).
Column(STORAGE_SET_CREATE_LOCK, func() bool { return len(storageLock.setCreateReqIDs) > 0 })

comp := LockCompatible()
uncp := LockUncompatible()
trgt := LockSpecial(func(lock distlock.Lock, testLockName string) bool {
strTar := lock.Target.(StringLockTarget)
if testLockName == STORAGE_ELEMENT_READ_LOCK {
// 如果没有任何锁的锁对象与当前的锁对象冲突,那么这个锁可以加
return lo.NoneBy(storageLock.elementReadLocks, func(other *storageElementLock) bool { return strTar.IsConflict(&other.target) })
}

if testLockName == STORAGE_ELEMENT_WRITE_LOCK {
return lo.NoneBy(storageLock.elementWriteLocks, func(other *storageElementLock) bool { return strTar.IsConflict(&other.target) })
}

return lo.NoneBy(storageLock.elementCreateLocks, func(other *storageElementLock) bool { return strTar.IsConflict(&other.target) })
})

compTable.MustRow(comp, trgt, comp, comp, uncp, comp)
compTable.MustRow(trgt, trgt, comp, uncp, uncp, comp)
compTable.MustRow(comp, comp, trgt, uncp, uncp, uncp)
compTable.MustRow(comp, uncp, uncp, comp, uncp, uncp)
compTable.MustRow(uncp, uncp, uncp, uncp, uncp, uncp)
compTable.MustRow(comp, comp, uncp, uncp, uncp, uncp)

return &storageLock
}

// CanLock 判断这个锁能否锁定成功
func (l *StorageNodeLock) CanLock(lock distlock.Lock) error {
return l.lockCompatibilityTable.Test(lock)
}

// 锁定
func (l *StorageNodeLock) Lock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case STORAGE_SET_READ_LOCK:
l.setReadReqIDs = append(l.setReadReqIDs, reqID)
case STORAGE_SET_WRITE_LOCK:
l.setWriteReqIDs = append(l.setWriteReqIDs, reqID)
case STORAGE_SET_CREATE_LOCK:
l.setCreateReqIDs = append(l.setCreateReqIDs, reqID)

case STORAGE_ELEMENT_READ_LOCK:
l.elementReadLocks = l.addElementLock(lock, l.elementReadLocks, reqID)
case STORAGE_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.addElementLock(lock, l.elementWriteLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *StorageNodeLock) addElementLock(lock distlock.Lock, locks []*storageElementLock, reqID string) []*storageElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, ok := lo.Find(locks, func(l *storageElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
lck = &storageElementLock{
target: strTarget,
}
locks = append(locks, lck)
}

lck.requestIDs = append(lck.requestIDs, reqID)
return locks
}

// 解锁
func (l *StorageNodeLock) Unlock(reqID string, lock distlock.Lock) error {
switch lock.Name {
case STORAGE_SET_READ_LOCK:
l.setReadReqIDs = mylo.Remove(l.setReadReqIDs, reqID)
case STORAGE_SET_WRITE_LOCK:
l.setWriteReqIDs = mylo.Remove(l.setWriteReqIDs, reqID)
case STORAGE_SET_CREATE_LOCK:
l.setCreateReqIDs = mylo.Remove(l.setCreateReqIDs, reqID)

case STORAGE_ELEMENT_READ_LOCK:
l.elementReadLocks = l.removeElementLock(lock, l.elementReadLocks, reqID)
case STORAGE_ELEMENT_WRITE_LOCK:
l.elementWriteLocks = l.removeElementLock(lock, l.elementWriteLocks, reqID)

default:
return fmt.Errorf("unknow lock name: %s", lock.Name)
}

return nil
}

func (l *StorageNodeLock) removeElementLock(lock distlock.Lock, locks []*storageElementLock, reqID string) []*storageElementLock {
strTarget := lock.Target.(StringLockTarget)
lck, index, ok := lo.FindIndexOf(locks, func(l *storageElementLock) bool { return strTarget.IsConflict(&l.target) })
if !ok {
return locks
}

lck.requestIDs = mylo.Remove(lck.requestIDs, reqID)

if len(lck.requestIDs) == 0 {
locks = mylo.RemoveAt(locks, index)
}

return locks
}
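The Path/Name/Target convention here is the same one exercised by the IPFS lock test above, with Path[1] carrying the storage ID instead of a node ID. A minimal illustration follows; the storage ID and target component values are arbitrary placeholders and the wrapper function is not part of the commit.

package example

import (
	"gitlink.org.cn/cloudream/common/pkgs/distlock"
	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

// exampleStorageLocks builds two lock descriptions that StorageLock.CanLock /
// Lock would accept: a set-level write lock over storage "1", and an
// element-level read lock on one element of the same storage.
func exampleStorageLocks() []distlock.Lock {
	return []distlock.Lock{
		{
			Path: []string{lockprovider.StorageLockPathPrefix, "1"},
			Name: lockprovider.STORAGE_SET_WRITE_LOCK,
		},
		{
			Path:   []string{lockprovider.StorageLockPathPrefix, "1"},
			Name:   lockprovider.STORAGE_ELEMENT_READ_LOCK,
			Target: *lockprovider.NewStringLockTarget().Add("some", "element"),
		},
	}
}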

pkgs/distlock/lockprovider/string_lock_target.go (+78 -0)

@@ -0,0 +1,78 @@
package lockprovider

import (
"fmt"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type StringLockTarget struct {
Components []StringLockTargetComponet `json:"components"`
}

func NewStringLockTarget() *StringLockTarget {
return &StringLockTarget{}
}

// Add 添加一个Component,并将其内容设置为compValues
func (t *StringLockTarget) Add(compValues ...any) *StringLockTarget {
t.Components = append(t.Components, StringLockTargetComponet{
Values: lo.Map(compValues, func(val any, index int) string { return fmt.Sprintf("%v", val) }),
})

return t
}

// IsConflict 判断两个锁对象是否冲突。注:只有相同的结构的Target才有意义
func (t *StringLockTarget) IsConflict(other *StringLockTarget) bool {
if len(t.Components) != len(other.Components) {
return false
}

if len(t.Components) == 0 {
return true
}

for i := 0; i < len(t.Components); i++ {
if t.Components[i].IsEquals(&other.Components[i]) {
return true
}
}

return false
}

type StringLockTargetComponet struct {
Values []string `json:"values"`
}

// IsEquals 判断两个Component是否相同。注:只有相同的结构的Component才有意义
func (t *StringLockTargetComponet) IsEquals(other *StringLockTargetComponet) bool {
if len(t.Values) != len(other.Values) {
return false
}

for i := 0; i < len(t.Values); i++ {
if t.Values[i] != other.Values[i] {
return false
}
}

return true
}

func StringLockTargetToString(target *StringLockTarget) (string, error) {
data, err := serder.ObjectToJSON(target)
if err != nil {
return "", err
}

return string(data), nil
}

func StringLockTargetFromString(str string) (StringLockTarget, error) {
var ret StringLockTarget
err := serder.JSONToObject([]byte(str), &ret)
return ret, err
}

+ 60
- 0
pkgs/distlock/lockprovider/string_lock_target_test.go View File

@@ -0,0 +1,60 @@
package lockprovider

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func Test_StringLockTarget(t *testing.T) {
cases := []struct {
title string
target1 *StringLockTarget
target2 *StringLockTarget
wantIsConflict bool
}{
{
title: "no components at all counts as a conflict",
target1: NewStringLockTarget(),
target2: NewStringLockTarget(),
wantIsConflict: true,
},
{
title: "has components, but they are empty: counts as a conflict",
target1: NewStringLockTarget().Add(),
target2: NewStringLockTarget().Add(),
wantIsConflict: true,
},
{
title: "no conflict only when every component is different",
target1: NewStringLockTarget().Add("a").Add("b"),
target2: NewStringLockTarget().Add("b").Add("c"),
wantIsConflict: false,
},
{
title: "conflicts as soon as any one component is the same",
target1: NewStringLockTarget().Add("a").Add("b"),
target2: NewStringLockTarget().Add("a").Add("c"),
wantIsConflict: true,
},
{
title: "within the same component, a single differing value means no conflict",
target1: NewStringLockTarget().Add("a", "b"),
target2: NewStringLockTarget().Add("b", "b"),
wantIsConflict: false,
},
{
title: "within the same component, it conflicts only when every value is the same",
target1: NewStringLockTarget().Add("a", "b"),
target2: NewStringLockTarget().Add("a", "b"),
wantIsConflict: true,
},
}

for _, ca := range cases {
Convey(ca.title, t, func() {
ret := ca.target1.IsConflict(ca.target2)
So(ret, ShouldEqual, ca.wantIsConflict)
})
}
}
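
As a quick illustration of the conflict semantics exercised by the test above (a minimal sketch, not part of this commit): only targets with the same number of components can conflict at all, and a component matches only when every one of its values is equal.

package main

import (
	"fmt"

	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

func main() {
	a := lockprovider.NewStringLockTarget().Add(1, "pkgA")
	b := lockprovider.NewStringLockTarget().Add(1, "pkgA")
	c := lockprovider.NewStringLockTarget().Add(2, "pkgA")

	fmt.Println(a.IsConflict(b)) // true: the single component matches in every value
	fmt.Println(a.IsConflict(c)) // false: the component differs in one value

	// Targets with a different number of components never conflict.
	fmt.Println(a.IsConflict(lockprovider.NewStringLockTarget())) // false
}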

+ 64
- 0
pkgs/distlock/reqbuilder/ipfs.go View File

@@ -0,0 +1,64 @@
package reqbuilder

import (
"strconv"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type IPFSLockReqBuilder struct {
*LockRequestBuilder
}

func (b *LockRequestBuilder) IPFS() *IPFSLockReqBuilder {
return &IPFSLockReqBuilder{LockRequestBuilder: b}
}
func (b *IPFSLockReqBuilder) ReadOneRep(nodeID int64, fileHash string) *IPFSLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(nodeID),
Name: lockprovider.IPFS_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(fileHash),
})
return b
}

func (b *IPFSLockReqBuilder) WriteOneRep(nodeID int64, fileHash string) *IPFSLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(nodeID),
Name: lockprovider.IPFS_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(fileHash),
})
return b
}

func (b *IPFSLockReqBuilder) ReadAnyRep(nodeID int64) *IPFSLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(nodeID),
Name: lockprovider.IPFS_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

func (b *IPFSLockReqBuilder) WriteAnyRep(nodeID int64) *IPFSLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(nodeID),
Name: lockprovider.IPFS_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

func (b *IPFSLockReqBuilder) CreateAnyRep(nodeID int64) *IPFSLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(nodeID),
Name: lockprovider.IPFS_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

func (b *IPFSLockReqBuilder) makePath(nodeID int64) []string {
return []string{lockprovider.IPFSLockPathPrefix, strconv.FormatInt(nodeID, 10)}
}

+ 31
- 0
pkgs/distlock/reqbuilder/lock_request_builder.go View File

@@ -0,0 +1,31 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/distlock/service"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
)

type LockRequestBuilder struct {
locks []distlock.Lock
}

func NewBuilder() *LockRequestBuilder {
return &LockRequestBuilder{}
}

func (b *LockRequestBuilder) Build() distlock.LockRequest {
return distlock.LockRequest{
Locks: mylo.ArrayClone(b.locks),
}
}

func (b *LockRequestBuilder) MutexLock(svc *service.Service) (*service.Mutex, error) {
mutex := service.NewMutex(svc, b.Build())
err := mutex.Lock()
if err != nil {
return nil, err
}

return mutex, nil
}
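
For orientation (not part of the diff): every category-specific builder below embeds *LockRequestBuilder, so locks of different categories can be chained into one request before calling MutexLock. A minimal usage sketch, assuming an already-initialized and running distlock service; the function name, IDs, and file hash are placeholders.

package example

import (
	"fmt"

	"gitlink.org.cn/cloudream/common/pkgs/distlock/service"
	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
)

func lockForRepDownload(svc *service.Service, nodeID int64, fileHash string, bucketID int64) error {
	// Acquire a Bucket element read lock and an IPFS per-file read lock in one request.
	mutex, err := reqbuilder.NewBuilder().
		Metadata().Bucket().ReadOne(bucketID).
		IPFS().ReadOneRep(nodeID, fileHash).
		MutexLock(svc)
	if err != nil {
		return fmt.Errorf("acquire locks failed, err: %w", err)
	}
	defer mutex.Unlock()

	// ... download the replica while both locks are held ...
	return nil
}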

+ 17
- 0
pkgs/distlock/reqbuilder/metadata.go View File

@@ -0,0 +1,17 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataLockReqBuilder struct {
*LockRequestBuilder
}

func (b *LockRequestBuilder) Metadata() *MetadataLockReqBuilder {
return &MetadataLockReqBuilder{LockRequestBuilder: b}
}

func (b *MetadataLockReqBuilder) makePath(tableName string) []string {
return []string{lockprovider.MetadataLockPathPrefix, tableName}
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_bucket.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataBucketLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) Bucket() *MetadataBucketLockReqBuilder {
return &MetadataBucketLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataBucketLockReqBuilder) ReadOne(bucketID int64) *MetadataBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Bucket"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(bucketID),
})
return b
}
func (b *MetadataBucketLockReqBuilder) WriteOne(bucketID int64) *MetadataBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Bucket"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(bucketID),
})
return b
}
func (b *MetadataBucketLockReqBuilder) CreateOne(userID int64, bucketName string) *MetadataBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Bucket"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, bucketName),
})
return b
}
func (b *MetadataBucketLockReqBuilder) ReadAny() *MetadataBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Bucket"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataBucketLockReqBuilder) WriteAny() *MetadataBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Bucket"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataBucketLockReqBuilder) CreateAny() *MetadataBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Bucket"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_cache.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataCacheLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) Cache() *MetadataCacheLockReqBuilder {
return &MetadataCacheLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataCacheLockReqBuilder) ReadOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Cache"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(nodeID, fileHash),
})
return b
}
func (b *MetadataCacheLockReqBuilder) WriteOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Cache"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(nodeID, fileHash),
})
return b
}
func (b *MetadataCacheLockReqBuilder) CreateOne(nodeID int64, fileHash string) *MetadataCacheLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Cache"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(nodeID, fileHash),
})
return b
}
func (b *MetadataCacheLockReqBuilder) ReadAny() *MetadataCacheLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Cache"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataCacheLockReqBuilder) WriteAny() *MetadataCacheLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Cache"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataCacheLockReqBuilder) CreateAny() *MetadataCacheLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Cache"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_node.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataNodeLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) Node() *MetadataNodeLockReqBuilder {
return &MetadataNodeLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataNodeLockReqBuilder) ReadOne(nodeID int64) *MetadataNodeLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Node"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(nodeID),
})
return b
}
func (b *MetadataNodeLockReqBuilder) WriteOne(nodeID int64) *MetadataNodeLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Node"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(nodeID),
})
return b
}
func (b *MetadataNodeLockReqBuilder) CreateOne() *MetadataNodeLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Node"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataNodeLockReqBuilder) ReadAny() *MetadataNodeLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Node"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataNodeLockReqBuilder) WriteAny() *MetadataNodeLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Node"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataNodeLockReqBuilder) CreateAny() *MetadataNodeLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Node"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 65
- 0
pkgs/distlock/reqbuilder/metadata_object.go View File

@@ -0,0 +1,65 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

// TODO consider adding a PackageID-based lock so that operations on Objects of different Packages can run in parallel

type MetadataObjectLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) Object() *MetadataObjectLockReqBuilder {
return &MetadataObjectLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataObjectLockReqBuilder) ReadOne(objectID int64) *MetadataObjectLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Object"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(objectID),
})
return b
}
func (b *MetadataObjectLockReqBuilder) WriteOne(objectID int64) *MetadataObjectLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Object"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(objectID),
})
return b
}
func (b *MetadataObjectLockReqBuilder) CreateOne(bucketID int64, objectName string) *MetadataObjectLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Object"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(bucketID, objectName),
})
return b
}
func (b *MetadataObjectLockReqBuilder) ReadAny() *MetadataObjectLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Object"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectLockReqBuilder) WriteAny() *MetadataObjectLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Object"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectLockReqBuilder) CreateAny() *MetadataObjectLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Object"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_object_block.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataObjectBlockLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) ObjectBlock() *MetadataObjectBlockLockReqBuilder {
return &MetadataObjectBlockLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataObjectBlockLockReqBuilder) ReadOne(objectID int) *MetadataObjectBlockLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectBlock"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(objectID),
})
return b
}
func (b *MetadataObjectBlockLockReqBuilder) WriteOne(objectID int) *MetadataObjectBlockLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectBlock"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(objectID),
})
return b
}
func (b *MetadataObjectBlockLockReqBuilder) CreateOne() *MetadataObjectBlockLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectBlock"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectBlockLockReqBuilder) ReadAny() *MetadataObjectBlockLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectBlock"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectBlockLockReqBuilder) WriteAny() *MetadataObjectBlockLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectBlock"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectBlockLockReqBuilder) CreateAny() *MetadataObjectBlockLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectBlock"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_object_rep.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataObjectRepLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) ObjectRep() *MetadataObjectRepLockReqBuilder {
return &MetadataObjectRepLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataObjectRepLockReqBuilder) ReadOne(objectID int64) *MetadataObjectRepLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectRep"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(objectID),
})
return b
}
func (b *MetadataObjectRepLockReqBuilder) WriteOne(objectID int64) *MetadataObjectRepLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectRep"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(objectID),
})
return b
}
func (b *MetadataObjectRepLockReqBuilder) CreateOne() *MetadataObjectRepLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectRep"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectRepLockReqBuilder) ReadAny() *MetadataObjectRepLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectRep"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectRepLockReqBuilder) WriteAny() *MetadataObjectRepLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectRep"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataObjectRepLockReqBuilder) CreateAny() *MetadataObjectRepLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("ObjectRep"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_package.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataPackageLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) Package() *MetadataPackageLockReqBuilder {
return &MetadataPackageLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataPackageLockReqBuilder) ReadOne(packageID int64) *MetadataPackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Package"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(packageID),
})
return b
}
func (b *MetadataPackageLockReqBuilder) WriteOne(packageID int64) *MetadataPackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Package"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(packageID),
})
return b
}
func (b *MetadataPackageLockReqBuilder) CreateOne(bucketID int64, packageName string) *MetadataPackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Package"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(bucketID, packageName),
})
return b
}
func (b *MetadataPackageLockReqBuilder) ReadAny() *MetadataPackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Package"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataPackageLockReqBuilder) WriteAny() *MetadataPackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Package"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataPackageLockReqBuilder) CreateAny() *MetadataPackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("Package"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_storage_package.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataStoragePackageLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) StoragePackage() *MetadataStoragePackageLockReqBuilder {
return &MetadataStoragePackageLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataStoragePackageLockReqBuilder) ReadOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("StoragePackage"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(storageID, userID, packageID),
})
return b
}
func (b *MetadataStoragePackageLockReqBuilder) WriteOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("StoragePackage"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(storageID, userID, packageID),
})
return b
}
func (b *MetadataStoragePackageLockReqBuilder) CreateOne(storageID int64, userID int64, packageID int64) *MetadataStoragePackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("StoragePackage"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(storageID, userID, packageID),
})
return b
}
func (b *MetadataStoragePackageLockReqBuilder) ReadAny() *MetadataStoragePackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("StoragePackage"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataStoragePackageLockReqBuilder) WriteAny() *MetadataStoragePackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("StoragePackage"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataStoragePackageLockReqBuilder) CreateAny() *MetadataStoragePackageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("StoragePackage"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_user_bucket.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataUserBucketLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) UserBucket() *MetadataUserBucketLockReqBuilder {
return &MetadataUserBucketLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataUserBucketLockReqBuilder) ReadOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserBucket"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, bucketID),
})
return b
}
func (b *MetadataUserBucketLockReqBuilder) WriteOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserBucket"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, bucketID),
})
return b
}
func (b *MetadataUserBucketLockReqBuilder) CreateOne(userID int64, bucketID int64) *MetadataUserBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserBucket"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, bucketID),
})
return b
}
func (b *MetadataUserBucketLockReqBuilder) ReadAny() *MetadataUserBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserBucket"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataUserBucketLockReqBuilder) WriteAny() *MetadataUserBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserBucket"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataUserBucketLockReqBuilder) CreateAny() *MetadataUserBucketLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserBucket"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 63
- 0
pkgs/distlock/reqbuilder/metadata_user_storage.go View File

@@ -0,0 +1,63 @@
package reqbuilder

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type MetadataUserStorageLockReqBuilder struct {
*MetadataLockReqBuilder
}

func (b *MetadataLockReqBuilder) UserStorage() *MetadataUserStorageLockReqBuilder {
return &MetadataUserStorageLockReqBuilder{MetadataLockReqBuilder: b}
}

func (b *MetadataUserStorageLockReqBuilder) ReadOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserStorage"),
Name: lockprovider.METADATA_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, storageID),
})
return b
}
func (b *MetadataUserStorageLockReqBuilder) WriteOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserStorage"),
Name: lockprovider.METADATA_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, storageID),
})
return b
}
func (b *MetadataUserStorageLockReqBuilder) CreateOne(userID int64, storageID int64) *MetadataUserStorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserStorage"),
Name: lockprovider.METADATA_ELEMENT_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, storageID),
})
return b
}
func (b *MetadataUserStorageLockReqBuilder) ReadAny() *MetadataUserStorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserStorage"),
Name: lockprovider.METADATA_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataUserStorageLockReqBuilder) WriteAny() *MetadataUserStorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserStorage"),
Name: lockprovider.METADATA_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}
func (b *MetadataUserStorageLockReqBuilder) CreateAny() *MetadataUserStorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath("UserStorage"),
Name: lockprovider.METADATA_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

+ 74
- 0
pkgs/distlock/reqbuilder/storage.go View File

@@ -0,0 +1,74 @@
package reqbuilder

import (
"strconv"

"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type StorageLockReqBuilder struct {
*LockRequestBuilder
}

func (b *LockRequestBuilder) Storage() *StorageLockReqBuilder {
return &StorageLockReqBuilder{LockRequestBuilder: b}
}

func (b *StorageLockReqBuilder) ReadOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(storageID),
Name: lockprovider.STORAGE_ELEMENT_READ_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, packageID),
})
return b
}

func (b *StorageLockReqBuilder) WriteOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(storageID),
Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, packageID),
})
return b
}

func (b *StorageLockReqBuilder) CreateOnePackage(storageID int64, userID int64, packageID int64) *StorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(storageID),
Name: lockprovider.STORAGE_ELEMENT_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget().Add(userID, packageID),
})
return b
}

func (b *StorageLockReqBuilder) ReadAnyPackage(storageID int64) *StorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(storageID),
Name: lockprovider.STORAGE_SET_READ_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

func (b *StorageLockReqBuilder) WriteAnyPackage(storageID int64) *StorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(storageID),
Name: lockprovider.STORAGE_SET_WRITE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

func (b *StorageLockReqBuilder) CreateAnyPackage(storageID int64) *StorageLockReqBuilder {
b.locks = append(b.locks, distlock.Lock{
Path: b.makePath(storageID),
Name: lockprovider.STORAGE_SET_CREATE_LOCK,
Target: *lockprovider.NewStringLockTarget(),
})
return b
}

func (b *StorageLockReqBuilder) makePath(storageID int64) []string {
return []string{lockprovider.StorageLockPathPrefix, strconv.FormatInt(storageID, 10)}
}

+ 62
- 0
pkgs/distlock/service.go View File

@@ -0,0 +1,62 @@
package distlock

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/trie"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/lockprovider"
)

type Service = service.Service

func NewService(cfg *distlock.Config) (*service.Service, error) {
srv, err := service.NewService(cfg, initProviders())
if err != nil {
return nil, err
}

return srv, nil
}

func initProviders() []service.PathProvider {
var provs []service.PathProvider

provs = append(provs, initMetadataLockProviders()...)

provs = append(provs, initIPFSLockProviders()...)

provs = append(provs, initStorageLockProviders()...)

return provs
}

func initMetadataLockProviders() []service.PathProvider {
return []service.PathProvider{
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Node"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Storage"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "User"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserBucket"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserNode"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "UserStorage"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Bucket"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Object"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Package"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "ObjectRep"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "ObjectBlock"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Cache"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "StoragePackage"),
service.NewPathProvider(lockprovider.NewMetadataLock(), lockprovider.MetadataLockPathPrefix, "Location"),
}
}

func initIPFSLockProviders() []service.PathProvider {
return []service.PathProvider{
service.NewPathProvider(lockprovider.NewIPFSLock(), lockprovider.IPFSLockPathPrefix, trie.WORD_ANY),
}
}

func initStorageLockProviders() []service.PathProvider {
return []service.PathProvider{
service.NewPathProvider(lockprovider.NewStorageLock(), lockprovider.StorageLockPathPrefix, trie.WORD_ANY),
}
}
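
A rough wiring sketch for how this constructor might be used (not part of the commit; it assumes the distlock config comes from the application's own configuration, that the returned service's background machinery is started elsewhere before locks are requested, and the function name and storageID are placeholders):

package example

import (
	"fmt"

	cmndistlock "gitlink.org.cn/cloudream/common/pkgs/distlock"
	stgdistlock "gitlink.org.cn/cloudream/storage-common/pkgs/distlock"
	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
)

func initDistlock(cfg *cmndistlock.Config) (*stgdistlock.Service, error) {
	// NewService registers the Metadata/IPFS/Storage path providers shown above.
	svc, err := stgdistlock.NewService(cfg)
	if err != nil {
		return nil, fmt.Errorf("new distlock service: %w", err)
	}

	// Any lock built by reqbuilder resolves to one of the registered providers,
	// e.g. a whole-set read lock on a Storage.
	mutex, err := reqbuilder.NewBuilder().
		Storage().ReadAnyPackage(1).
		MutexLock(svc)
	if err != nil {
		return nil, fmt.Errorf("acquire storage lock: %w", err)
	}
	mutex.Unlock()

	return svc, nil
}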

+ 1
- 1
pkgs/grpc/agent/pool.go View File

@@ -39,5 +39,5 @@ func (p *Pool) Acquire(ip string) (*PoolClient, error) {
 }

 func (p *Pool) Release(cli *PoolClient) {
-	cli.Close()
+	cli.Client.Close()
 }

+ 6
- 1
pkgs/iterator/ec_object_iterator.go View File

@@ -16,6 +16,8 @@ import (
 )

 type ECObjectIterator struct {
+	OnClosing func()
+
 	objects []model.Object
 	objectECData []models.ObjectECData
 	currentIndex int
@@ -41,6 +43,7 @@ func NewECObjectIterator(objects []model.Object, objectECData []models.ObjectECD
 }

 func (i *ECObjectIterator) MoveNext() (*IterDownloadingObject, error) {
+	// TODO acquire locks
 	coorCli, err := globals.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -123,7 +126,9 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloadi
 }

 func (i *ECObjectIterator) Close() {
-
+	if i.OnClosing != nil {
+		i.OnClosing()
+	}
 }

 // chooseDownloadNode selects a node to download from


+ 1
- 1
pkgs/iterator/local_uploading_iterator.go View File

@@ -52,7 +52,7 @@ func (i *LocalUploadingIterator) doMove() (*IterUploadingObject, error) {
 }

 	return &IterUploadingObject{
-		Path: strings.TrimPrefix(filepath.ToSlash(path), i.pathRoot),
+		Path: strings.TrimPrefix(filepath.ToSlash(path), i.pathRoot+"/"),
 		Size: info.Size(),
 		File: file,
 	}, nil


+ 27
- 4
pkgs/iterator/rep_object_iterator.go View File

@@ -6,19 +6,21 @@ import (
 	"math/rand"

 	"github.com/samber/lo"
-	"gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder"
 	distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	myio "gitlink.org.cn/cloudream/common/utils/io"
 	"gitlink.org.cn/cloudream/storage-common/globals"
 	"gitlink.org.cn/cloudream/storage-common/models"
 	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
+	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
 	coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
 )

 type DownloadingObjectIterator = Iterator[*IterDownloadingObject]

 type RepObjectIterator struct {
+	OnClosing func()
+
 	objects []model.Object
 	objectRepData []models.ObjectRepData
 	currentIndex int
@@ -51,6 +53,7 @@ func NewRepObjectIterator(objects []model.Object, objectRepData []models.ObjectR
 }

 func (i *RepObjectIterator) MoveNext() (*IterDownloadingObject, error) {
+	// TODO acquire locks
 	coorCli, err := globals.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -116,7 +119,9 @@ func (i *RepObjectIterator) doMove(coorCli *coormq.PoolClient) (*IterDownloading
 }

 func (i *RepObjectIterator) Close() {
-
+	if i.OnClosing != nil {
+		i.OnClosing()
+	}
 }

 // chooseDownloadNode selects a node to download from
@@ -135,7 +140,7 @@ func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, fileHash st
 	if globals.IPFSPool != nil {
 		logger.Infof("try to use local IPFS to download file")

-		reader, err := downloadFromLocalIPFS(fileHash)
+		reader, err := downloadFromLocalIPFS(ctx, fileHash)
 		if err == nil {
 			return reader, nil
 		}
@@ -173,7 +178,22 @@ func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, fileHas
 	return reader, nil
 }

-func downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) {
+func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
+	onClosed := func() {}
+	if globals.Local.NodeID != nil {
+		// acquire the lock a second time
+		mutex, err := reqbuilder.NewBuilder().
+			// for downloading the file from IPFS
+			IPFS().ReadOneRep(*globals.Local.NodeID, fileHash).
+			MutexLock(ctx.Distlock)
+		if err != nil {
+			return nil, fmt.Errorf("acquire locks failed, err: %w", err)
+		}
+		onClosed = func() {
+			mutex.Unlock()
+		}
+	}
+
 	ipfsCli, err := globals.IPFSPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new ipfs client: %w", err)
@@ -184,5 +204,8 @@ func downloadFromLocalIPFS(fileHash string) (io.ReadCloser, error) {
 		return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
 	}

+	reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
+		onClosed()
+	})
 	return reader, nil
 }

+ 20
- 20
pkgs/mq/agent/storage.go View File

@@ -7,9 +7,9 @@ import (
 )

 type StorageService interface {
-	StartStorageMovePackage(msg *StartStorageMovePackage) (*StartStorageMovePackageResp, *mq.CodeMessage)
+	StartStorageLoadPackage(msg *StartStorageLoadPackage) (*StartStorageLoadPackageResp, *mq.CodeMessage)

-	WaitStorageMovePackage(msg *WaitStorageMovePackage) (*WaitStorageMovePackageResp, *mq.CodeMessage)
+	WaitStorageLoadPackage(msg *WaitStorageLoadPackage) (*WaitStorageLoadPackageResp, *mq.CodeMessage)

 	StorageCheck(msg *StorageCheck) (*StorageCheckResp, *mq.CodeMessage)

@@ -19,59 +19,59 @@ type StorageService interface {
 }

 // Start the task of loading a Package
-var _ = Register(StorageService.StartStorageMovePackage)
+var _ = Register(StorageService.StartStorageLoadPackage)

-type StartStorageMovePackage struct {
+type StartStorageLoadPackage struct {
 	UserID int64 `json:"userID"`
 	PackageID int64 `json:"packageID"`
 	StorageID int64 `json:"storageID"`
 }
-type StartStorageMovePackageResp struct {
+type StartStorageLoadPackageResp struct {
 	TaskID string `json:"taskID"`
 }

-func NewStartStorageMovePackage(userID int64, packageID int64, storageID int64) StartStorageMovePackage {
-	return StartStorageMovePackage{
+func NewStartStorageLoadPackage(userID int64, packageID int64, storageID int64) StartStorageLoadPackage {
+	return StartStorageLoadPackage{
 		UserID: userID,
 		PackageID: packageID,
 		StorageID: storageID,
 	}
 }
-func NewStartStorageMovePackageResp(taskID string) StartStorageMovePackageResp {
-	return StartStorageMovePackageResp{
+func NewStartStorageLoadPackageResp(taskID string) StartStorageLoadPackageResp {
+	return StartStorageLoadPackageResp{
 		TaskID: taskID,
 	}
 }
-func (client *Client) StartStorageMovePackage(msg StartStorageMovePackage, opts ...mq.RequestOption) (*StartStorageMovePackageResp, error) {
-	return mq.Request[StartStorageMovePackageResp](client.rabbitCli, msg, opts...)
+func (client *Client) StartStorageLoadPackage(msg StartStorageLoadPackage, opts ...mq.RequestOption) (*StartStorageLoadPackageResp, error) {
+	return mq.Request[StartStorageLoadPackageResp](client.rabbitCli, msg, opts...)
 }

 // Wait for the task of loading a Package
-var _ = Register(StorageService.WaitStorageMovePackage)
+var _ = Register(StorageService.WaitStorageLoadPackage)

-type WaitStorageMovePackage struct {
+type WaitStorageLoadPackage struct {
 	TaskID string `json:"taskID"`
 	WaitTimeoutMs int64 `json:"waitTimeout"`
 }
-type WaitStorageMovePackageResp struct {
+type WaitStorageLoadPackageResp struct {
 	IsComplete bool `json:"isComplete"`
 	Error string `json:"error"`
 }

-func NewWaitStorageMovePackage(taskID string, waitTimeoutMs int64) WaitStorageMovePackage {
-	return WaitStorageMovePackage{
+func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) WaitStorageLoadPackage {
+	return WaitStorageLoadPackage{
 		TaskID: taskID,
 		WaitTimeoutMs: waitTimeoutMs,
 	}
 }
-func NewWaitStorageMovePackageResp(isComplete bool, err string) WaitStorageMovePackageResp {
-	return WaitStorageMovePackageResp{
+func NewWaitStorageLoadPackageResp(isComplete bool, err string) WaitStorageLoadPackageResp {
+	return WaitStorageLoadPackageResp{
 		IsComplete: isComplete,
 		Error: err,
 	}
 }
-func (client *Client) WaitStorageMovePackage(msg WaitStorageMovePackage, opts ...mq.RequestOption) (*WaitStorageMovePackageResp, error) {
-	return mq.Request[WaitStorageMovePackageResp](client.rabbitCli, msg, opts...)
+func (client *Client) WaitStorageLoadPackage(msg WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) {
+	return mq.Request[WaitStorageLoadPackageResp](client.rabbitCli, msg, opts...)
 }

 // Check Storage


+ 10
- 10
pkgs/mq/coordinator/storage.go View File

@@ -8,7 +8,7 @@ import (
 type StorageService interface {
 	GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, *mq.CodeMessage)

-	PackageMovedToStorage(msg *PackageMovedToStorage) (*PackageMovedToStorageResp, *mq.CodeMessage)
+	StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage)
 }

 // Get Storage info
@@ -44,25 +44,25 @@ func (client *Client) GetStorageInfo(msg GetStorageInfo) (*GetStorageInfoResp, e
 }

 // Submit the record of a Package load
-var _ = Register(StorageService.PackageMovedToStorage)
+var _ = Register(StorageService.StoragePackageLoaded)

-type PackageMovedToStorage struct {
+type StoragePackageLoaded struct {
 	UserID int64 `json:"userID"`
 	PackageID int64 `json:"packageID"`
 	StorageID int64 `json:"storageID"`
 }
-type PackageMovedToStorageResp struct{}
+type StoragePackageLoadedResp struct{}

-func NewPackageMovedToStorage(userID int64, packageID int64, stgID int64) PackageMovedToStorage {
-	return PackageMovedToStorage{
+func NewStoragePackageLoaded(userID int64, packageID int64, stgID int64) StoragePackageLoaded {
+	return StoragePackageLoaded{
 		UserID: userID,
 		PackageID: packageID,
 		StorageID: stgID,
 	}
 }
-func NewPackageMovedToStorageResp() PackageMovedToStorageResp {
-	return PackageMovedToStorageResp{}
+func NewStoragePackageLoadedResp() StoragePackageLoadedResp {
+	return StoragePackageLoadedResp{}
 }
-func (client *Client) PackageMovedToStorage(msg PackageMovedToStorage) (*PackageMovedToStorageResp, error) {
-	return mq.Request[PackageMovedToStorageResp](client.rabbitCli, msg)
+func (client *Client) StoragePackageLoaded(msg StoragePackageLoaded) (*StoragePackageLoadedResp, error) {
+	return mq.Request[StoragePackageLoadedResp](client.rabbitCli, msg)
 }

+ 3
- 14
utils/utils.go View File

@@ -2,20 +2,9 @@ package utils

 import (
 	"fmt"
-	"strings"
 )

-// MakeStorageMovePackageDirName the name of the file written during a Move operation
-func MakeStorageMovePackageDirName(objectID int64, userID int64) string {
-	return fmt.Sprintf("%d-%d", objectID, userID)
-}
-
-// GetDirectoryName gets the name of the directory that objectName belongs to
-func GetDirectoryName(objectName string) string {
-	parts := strings.Split(objectName, "/")
-	// if it is a plain file, dirName is left empty
-	if len(parts) == 1 {
-		return ""
-	}
-	return parts[0]
+// MakeStorageLoadPackageDirName the name of the directory written during a Load operation
+func MakeStorageLoadPackageDirName(packageID int64, userID int64) string {
+	return fmt.Sprintf("%d-%d", packageID, userID)
 }
