You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

storage.go 6.4 kB

10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package services
  2. import (
  3. "context"
  4. "fmt"
  5. "path"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
  9. stgmod "gitlink.org.cn/cloudream/storage2/common/models"
  10. "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
  11. "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/reqbuilder"
  12. "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy"
  13. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
  14. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
  15. agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent"
  16. coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
  17. "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory"
  18. )
  19. type StorageService struct {
  20. *Service
  21. }
  22. func (svc *Service) StorageSvc() *StorageService {
  23. return &StorageService{Service: svc}
  24. }
  25. func (svc *StorageService) Get(userID cdssdk.UserID, storageID cdssdk.StorageID) (*model.Storage, error) {
  26. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  27. if err != nil {
  28. return nil, fmt.Errorf("new coordinator client: %w", err)
  29. }
  30. defer stgglb.CoordinatorMQPool.Release(coorCli)
  31. getResp, err := coorCli.GetStorage(coormq.ReqGetStorage(userID, storageID))
  32. if err != nil {
  33. return nil, fmt.Errorf("request to coordinator: %w", err)
  34. }
  35. return &getResp.Storage, nil
  36. }
  37. func (svc *StorageService) GetByName(userID cdssdk.UserID, name string) (*model.Storage, error) {
  38. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  39. if err != nil {
  40. return nil, fmt.Errorf("new coordinator client: %w", err)
  41. }
  42. defer stgglb.CoordinatorMQPool.Release(coorCli)
  43. getResp, err := coorCli.GetStorageByName(coormq.ReqGetStorageByName(userID, name))
  44. if err != nil {
  45. return nil, fmt.Errorf("request to coordinator: %w", err)
  46. }
  47. return &getResp.Storage, nil
  48. }
  49. func (svc *StorageService) GetDetails(stgIDs []cdssdk.StorageID) ([]*stgmod.StorageDetail, error) {
  50. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  51. if err != nil {
  52. return nil, fmt.Errorf("new coordinator client: %w", err)
  53. }
  54. defer stgglb.CoordinatorMQPool.Release(coorCli)
  55. getResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(stgIDs))
  56. if err != nil {
  57. return nil, fmt.Errorf("request to coordinator: %w", err)
  58. }
  59. return getResp.Storages, nil
  60. }
  61. func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID, rootPath string) error {
  62. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  63. if err != nil {
  64. return fmt.Errorf("new coordinator client: %w", err)
  65. }
  66. defer stgglb.CoordinatorMQPool.Release(coorCli)
  67. destStg := svc.StorageMeta.Get(storageID)
  68. if destStg == nil {
  69. return fmt.Errorf("storage not found: %d", storageID)
  70. }
  71. if destStg.MasterHub == nil {
  72. return fmt.Errorf("storage %v has no master hub", storageID)
  73. }
  74. details, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(packageID))
  75. if err != nil {
  76. return err
  77. }
  78. var pinned []cdssdk.ObjectID
  79. plans := exec.NewPlanBuilder()
  80. for _, obj := range details.Objects {
  81. strg, err := svc.StrategySelector.Select(strategy.Request{
  82. Detail: obj,
  83. DestHub: destStg.MasterHub.HubID,
  84. })
  85. if err != nil {
  86. return fmt.Errorf("select download strategy: %w", err)
  87. }
  88. ft := ioswitch2.NewFromTo()
  89. switch strg := strg.(type) {
  90. case *strategy.DirectStrategy:
  91. ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, *strg.Storage.MasterHub, strg.Storage, ioswitch2.RawStream()))
  92. case *strategy.ECReconstructStrategy:
  93. for i, b := range strg.Blocks {
  94. ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, *strg.Storages[i].MasterHub, strg.Storages[i], ioswitch2.ECStream(b.Index)))
  95. ft.ECParam = &strg.Redundancy
  96. }
  97. default:
  98. return fmt.Errorf("unsupported download strategy: %T", strg)
  99. }
  100. ft.AddTo(ioswitch2.NewLoadToPublic(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
  101. // 顺便保存到同存储服务的分片存储中
  102. if factory.GetBuilder(*destStg).ShardStoreDesc().Enabled() {
  103. ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), ""))
  104. pinned = append(pinned, obj.Object.ObjectID)
  105. }
  106. err = parser.Parse(ft, plans)
  107. if err != nil {
  108. return fmt.Errorf("parse plan: %w", err)
  109. }
  110. }
  111. mutex, err := reqbuilder.NewBuilder().
  112. // 保护在storage目录中下载的文件
  113. Storage().Buzy(storageID).
  114. // 保护下载文件时同时保存到IPFS的文件
  115. Shard().Buzy(storageID).
  116. MutexLock(svc.DistLock)
  117. if err != nil {
  118. return fmt.Errorf("acquire locks failed, err: %w", err)
  119. }
  120. // 记录访问统计
  121. for _, obj := range details.Objects {
  122. svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, storageID, 1)
  123. }
  124. defer mutex.Unlock()
  125. drv := plans.Execute(exec.NewExecContext())
  126. _, err = drv.Wait(context.Background())
  127. if err != nil {
  128. return err
  129. }
  130. // 失败也没关系
  131. coorCli.StoragePackageLoaded(coormq.ReqStoragePackageLoaded(userID, storageID, packageID, rootPath, pinned))
  132. return nil
  133. }
  134. // 请求节点启动从Storage中上传文件的任务。会返回节点ID和任务ID
  135. func (svc *StorageService) StorageCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, storageID cdssdk.StorageID, path string, storageAffinity cdssdk.StorageID) (cdssdk.Package, error) {
  136. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  137. if err != nil {
  138. return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err)
  139. }
  140. defer stgglb.CoordinatorMQPool.Release(coorCli)
  141. stgResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{storageID}))
  142. if err != nil {
  143. return cdssdk.Package{}, fmt.Errorf("getting storage info: %w", err)
  144. }
  145. if stgResp.Storages[0].Storage.ShardStore == nil {
  146. return cdssdk.Package{}, fmt.Errorf("shard storage is not enabled")
  147. }
  148. agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.Storages[0].MasterHub.HubID)
  149. if err != nil {
  150. return cdssdk.Package{}, fmt.Errorf("new agent client: %w", err)
  151. }
  152. defer stgglb.AgentMQPool.Release(agentCli)
  153. createResp, err := agentCli.StorageCreatePackage(agtmq.ReqStorageCreatePackage(userID, bucketID, name, storageID, path, storageAffinity))
  154. if err != nil {
  155. return cdssdk.Package{}, err
  156. }
  157. return createResp.Package, nil
  158. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。