You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

user_space.go 8.3 kB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package services
  2. import (
  3. "context"
  4. "fmt"
  5. "path"
  6. "strings"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/trie"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  12. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  13. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
  14. stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
  15. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  17. hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub"
  18. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
  19. )
  20. type UserSpaceService struct {
  21. *Service
  22. }
  23. func (svc *Service) UserSpaceSvc() *UserSpaceService {
  24. return &UserSpaceService{Service: svc}
  25. }
  26. func (svc *UserSpaceService) Get(userspaceID clitypes.UserSpaceID) (clitypes.UserSpace, error) {
  27. return svc.DB.UserSpace().GetByID(svc.DB.DefCtx(), userspaceID)
  28. }
  29. func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error) {
  30. return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
  31. }
  32. func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
  33. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  34. if err != nil {
  35. return fmt.Errorf("new coordinator client: %w", err)
  36. }
  37. defer stgglb.CoordinatorMQPool.Release(coorCli)
  38. destStg := svc.UserSpaceMeta.Get(userspaceID)
  39. if destStg == nil {
  40. return fmt.Errorf("userspace not found: %d", userspaceID)
  41. }
  42. if destStg.MasterHub == nil {
  43. return fmt.Errorf("userspace %v has no master hub", userspaceID)
  44. }
  45. details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, packageID)
  46. if err != nil {
  47. return err
  48. }
  49. var pinned []clitypes.ObjectID
  50. plans := exec.NewPlanBuilder()
  51. for _, obj := range details {
  52. strg, err := svc.StrategySelector.Select(strategy.Request{
  53. Detail: obj,
  54. DestHub: destStg.MasterHub.HubID,
  55. })
  56. if err != nil {
  57. return fmt.Errorf("select download strategy: %w", err)
  58. }
  59. ft := ioswitch2.NewFromTo()
  60. switch strg := strg.(type) {
  61. case *strategy.DirectStrategy:
  62. ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, *strg.UserSpace.MasterHub, strg.UserSpace, ioswitch2.RawStream()))
  63. case *strategy.ECReconstructStrategy:
  64. for i, b := range strg.Blocks {
  65. ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, *strg.UserSpaces[i].MasterHub, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
  66. ft.ECParam = &strg.Redundancy
  67. }
  68. default:
  69. return fmt.Errorf("unsupported download strategy: %T", strg)
  70. }
  71. ft.AddTo(ioswitch2.NewToPublicStore(*destStg.MasterHub, *destStg, path.Join(rootPath, obj.Object.Path)))
  72. // 顺便保存到同存储服务的分片存储中
  73. if destStg.UserSpace.ShardStore != nil {
  74. ft.AddTo(ioswitch2.NewToShardStore(*destStg.MasterHub, *destStg, ioswitch2.RawStream(), ""))
  75. pinned = append(pinned, obj.Object.ObjectID)
  76. }
  77. err = parser.Parse(ft, plans)
  78. if err != nil {
  79. return fmt.Errorf("parse plan: %w", err)
  80. }
  81. }
  82. // TODO2 加锁
  83. // mutex, err := reqbuilder.NewBuilder().
  84. // // 保护在userspace目录中下载的文件
  85. // UserSpace().Buzy(userspaceID).
  86. // // 保护下载文件时同时保存到IPFS的文件
  87. // Shard().Buzy(userspaceID).
  88. // MutexLock(svc.DistLock)
  89. // if err != nil {
  90. // return fmt.Errorf("acquire locks failed, err: %w", err)
  91. // }
  92. // defer mutex.Unlock()
  93. // 记录访问统计
  94. for _, obj := range details {
  95. svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
  96. }
  97. drv := plans.Execute(exec.NewExecContext())
  98. _, err = drv.Wait(context.Background())
  99. if err != nil {
  100. return err
  101. }
  102. return nil
  103. }
  104. func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPath string, dstSpaceID clitypes.UserSpaceID, dstPath string) (clitypes.SpaceToSpaceResult, error) {
  105. srcSpace := svc.UserSpaceMeta.Get(srcSpaceID)
  106. if srcSpace == nil {
  107. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace not found: %d", srcSpaceID)
  108. }
  109. if srcSpace.MasterHub == nil {
  110. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace %v has no master hub", srcSpaceID)
  111. }
  112. srcSpaceCli, err := stgglb.HubMQPool.Acquire(srcSpace.MasterHub.HubID)
  113. if err != nil {
  114. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("new source userspace client: %w", err)
  115. }
  116. defer stgglb.HubMQPool.Release(srcSpaceCli)
  117. dstSpace := svc.UserSpaceMeta.Get(dstSpaceID)
  118. if dstSpace == nil {
  119. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace not found: %d", dstSpaceID)
  120. }
  121. if dstSpace.MasterHub == nil {
  122. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace %v has no master hub", dstSpaceID)
  123. }
  124. dstSpaceCli, err := stgglb.HubMQPool.Acquire(dstSpace.MasterHub.HubID)
  125. if err != nil {
  126. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("new destination userspace client: %w", err)
  127. }
  128. defer stgglb.HubMQPool.Release(dstSpaceCli)
  129. srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator)
  130. dstPath = strings.Trim(dstPath, cdssdk.ObjectPathSeparator)
  131. if srcPath == "" {
  132. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source path is empty")
  133. }
  134. if dstPath == "" {
  135. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination path is empty")
  136. }
  137. listAllResp, err := srcSpaceCli.PublicStoreListAll(&hubmq.PublicStoreListAll{
  138. UserSpace: *srcSpace,
  139. Path: srcPath,
  140. })
  141. if err != nil {
  142. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", err)
  143. }
  144. srcPathComps := clitypes.SplitObjectPath(srcPath)
  145. srcDirCompLen := len(srcPathComps) - 1
  146. entryTree := trie.NewTrie[*types.PublicStoreEntry]()
  147. for _, e := range listAllResp.Entries {
  148. pa, ok := strings.CutSuffix(e.Path, clitypes.ObjectPathSeparator)
  149. comps := clitypes.SplitObjectPath(pa)
  150. e.Path = pa
  151. e2 := e
  152. entryTree.CreateWords(comps[srcDirCompLen:]).Value = &e2
  153. e2.IsDir = e2.IsDir || ok
  154. }
  155. entryTree.Iterate(func(path []string, node *trie.Node[*types.PublicStoreEntry], isWordNode bool) trie.VisitCtrl {
  156. if node.Value == nil {
  157. return trie.VisitContinue
  158. }
  159. if node.Value.IsDir && len(node.WordNexts) > 0 {
  160. node.Value = nil
  161. return trie.VisitContinue
  162. }
  163. if !node.Value.IsDir && len(node.WordNexts) == 0 {
  164. node.WordNexts = nil
  165. }
  166. return trie.VisitContinue
  167. })
  168. var filePathes []string
  169. var dirPathes []string
  170. entryTree.Iterate(func(path []string, node *trie.Node[*types.PublicStoreEntry], isWordNode bool) trie.VisitCtrl {
  171. if node.Value == nil {
  172. return trie.VisitContinue
  173. }
  174. if node.Value.IsDir {
  175. dirPathes = append(dirPathes, node.Value.Path)
  176. } else {
  177. filePathes = append(filePathes, node.Value.Path)
  178. }
  179. return trie.VisitContinue
  180. })
  181. var success []string
  182. var failed []string
  183. for _, f := range filePathes {
  184. newPath := strings.Replace(f, srcPath, dstPath, 1)
  185. ft := ioswitch2.NewFromTo()
  186. ft.AddFrom(ioswitch2.NewFromPublicStore(*srcSpace.MasterHub, *srcSpace, f))
  187. ft.AddTo(ioswitch2.NewToPublicStore(*dstSpace.MasterHub, *dstSpace, newPath))
  188. plans := exec.NewPlanBuilder()
  189. err = parser.Parse(ft, plans)
  190. if err != nil {
  191. failed = append(failed, f)
  192. logger.Warnf("s2s: parse plan of file %v: %v", f, err)
  193. continue
  194. }
  195. _, err = plans.Execute(exec.NewExecContext()).Wait(context.Background())
  196. if err != nil {
  197. failed = append(failed, f)
  198. logger.Warnf("s2s: execute plan of file %v: %v", f, err)
  199. continue
  200. }
  201. success = append(success, f)
  202. }
  203. newDirPathes := make([]string, 0, len(dirPathes))
  204. for i := range dirPathes {
  205. newDirPathes = append(newDirPathes, strings.Replace(dirPathes[i], srcPath, dstPath, 1))
  206. }
  207. mkdirResp, err := dstSpaceCli.PublicStoreMkdirs(&hubmq.PublicStoreMkdirs{
  208. UserSpace: *dstSpace,
  209. Pathes: newDirPathes,
  210. })
  211. if err != nil {
  212. failed = append(failed, dirPathes...)
  213. logger.Warnf("s2s: mkdirs to destination userspace: %v", err)
  214. } else {
  215. for i := range dirPathes {
  216. if mkdirResp.Successes[i] {
  217. success = append(success, dirPathes[i])
  218. } else {
  219. failed = append(failed, dirPathes[i])
  220. }
  221. }
  222. }
  223. return clitypes.SpaceToSpaceResult{
  224. Success: success,
  225. Failed: failed,
  226. }, nil
  227. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。