You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

storage.go 5.4 kB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
10 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package services
  2. import (
  3. "context"
  4. "fmt"
  5. "path"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. cdssdk "gitlink.org.cn/cloudream/storage2/client/types"
  8. "gitlink.org.cn/cloudream/storage2/client/types"
  9. stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
  10. stgmod "gitlink.org.cn/cloudream/storage2/common/models"
  11. "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/reqbuilder"
  12. "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
  13. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
  14. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
  15. agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent"
  16. coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
  17. "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory"
  18. )
  19. type UserSpaceService struct {
  20. *Service
  21. }
  22. func (svc *Service) UserSpaceSvc() *UserSpaceService {
  23. return &UserSpaceService{Service: svc}
  24. }
  25. func (svc *UserSpaceService) Get(userspaceID cdssdk.UserSpaceID) (types.UserSpace, error) {
  26. return svc.DB.UserSpace().GetByID(svc.DB.DefCtx(), userspaceID)
  27. }
  28. func (svc *UserSpaceService) GetByName(name string) (types.UserSpace, error) {
  29. return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
  30. }
  31. func (svc *UserSpaceService) GetDetails(stgIDs []cdssdk.UserSpaceID) ([]*stgmod.UserSpaceDetail, error) {
  32. }
  33. func (svc *UserSpaceService) LoadPackage(packageID cdssdk.PackageID, userspaceID cdssdk.UserSpaceID, rootPath string) error {
  34. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  35. if err != nil {
  36. return fmt.Errorf("new coordinator client: %w", err)
  37. }
  38. defer stgglb.CoordinatorMQPool.Release(coorCli)
  39. destStg := svc.UserSpaceMeta.Get(userspaceID)
  40. if destStg == nil {
  41. return fmt.Errorf("userspace not found: %d", userspaceID)
  42. }
  43. if destStg.MasterHub == nil {
  44. return fmt.Errorf("userspace %v has no master hub", userspaceID)
  45. }
  46. details, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(packageID))
  47. if err != nil {
  48. return err
  49. }
  50. var pinned []cdssdk.ObjectID
  51. plans := exec.NewPlanBuilder()
  52. for _, obj := range details.Objects {
  53. strg, err := svc.StrategySelector.Select(strategy.Request{
  54. Detail: obj,
  55. DestHub: destStg.MasterHub.HubID,
  56. })
  57. if err != nil {
  58. return fmt.Errorf("select download strategy: %w", err)
  59. }
  60. ft := ioswitch2.NewFromTo()
  61. switch strg := strg.(type) {
  62. case *strategy.DirectStrategy:
  63. ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, *strg.UserSpace.MasterHub, strg.UserSpace, ioswitch2.RawStream()))
  64. case *strategy.ECReconstructStrategy:
  65. for i, b := range strg.Blocks {
  66. ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, *strg.UserSpaces[i].MasterHub, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
  67. ft.ECParam = &strg.Redundancy
  68. }
  69. default:
  70. return fmt.Errorf("unsupported download strategy: %T", strg)
  71. }
  72. ft.AddTo(ioswitch2.NewLoadToPublic(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
  73. // 顺便保存到同存储服务的分片存储中
  74. if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() {
  75. ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), ""))
  76. pinned = append(pinned, obj.Object.ObjectID)
  77. }
  78. err = parser.Parse(ft, plans)
  79. if err != nil {
  80. return fmt.Errorf("parse plan: %w", err)
  81. }
  82. }
  83. mutex, err := reqbuilder.NewBuilder().
  84. // 保护在userspace目录中下载的文件
  85. UserSpace().Buzy(userspaceID).
  86. // 保护下载文件时同时保存到IPFS的文件
  87. Shard().Buzy(userspaceID).
  88. MutexLock(svc.DistLock)
  89. if err != nil {
  90. return fmt.Errorf("acquire locks failed, err: %w", err)
  91. }
  92. // 记录访问统计
  93. for _, obj := range details.Objects {
  94. svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
  95. }
  96. defer mutex.Unlock()
  97. drv := plans.Execute(exec.NewExecContext())
  98. _, err = drv.Wait(context.Background())
  99. if err != nil {
  100. return err
  101. }
  102. // 失败也没关系
  103. coorCli.UserSpacePackageLoaded(coormq.ReqUserSpacePackageLoaded(userID, userspaceID, packageID, rootPath, pinned))
  104. return nil
  105. }
  106. // 请求节点启动从UserSpace中上传文件的任务。会返回节点ID和任务ID
  107. func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, name string, userspaceID cdssdk.UserSpaceID, path string, userspaceAffinity cdssdk.UserSpaceID) (cdssdk.Package, error) {
  108. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  109. if err != nil {
  110. return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err)
  111. }
  112. defer stgglb.CoordinatorMQPool.Release(coorCli)
  113. stgResp, err := coorCli.GetUserSpaceDetails(coormq.ReqGetUserSpaceDetails([]cdssdk.UserSpaceID{userspaceID}))
  114. if err != nil {
  115. return cdssdk.Package{}, fmt.Errorf("getting userspace info: %w", err)
  116. }
  117. if stgResp.UserSpaces[0].UserSpace.ShardStore == nil {
  118. return cdssdk.Package{}, fmt.Errorf("shard userspace is not enabled")
  119. }
  120. agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.UserSpaces[0].MasterHub.HubID)
  121. if err != nil {
  122. return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err)
  123. }
  124. defer stgglb.AgentMQPool.Release(agentCli)
  125. createResp, err := agentCli.UserSpaceCreatePackage(agtmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity))
  126. if err != nil {
  127. return cdssdk.Package{}, err
  128. }
  129. return createResp.Package, nil
  130. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。