You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

user_space_upload.go 5.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package uploader
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "strings"
  7. "time"
  8. "github.com/samber/lo"
  9. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  10. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  11. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  12. stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
  13. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  14. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
  15. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator"
  17. hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
  18. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
  19. cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
  20. )
  21. func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath string, targetBktID clitypes.BucketID, newPkgName string, uploadAffinity clitypes.UserSpaceID) (*clitypes.Package, error) {
  22. srcSpace := u.spaceMeta.Get(userSpaceID)
  23. if srcSpace == nil {
  24. return nil, fmt.Errorf("user space %d not found", userSpaceID)
  25. }
  26. if srcSpace.MasterHub == nil {
  27. return nil, fmt.Errorf("master hub not found for user space %d", userSpaceID)
  28. }
  29. pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (clitypes.Package, error) {
  30. _, err := u.db.Bucket().GetByID(tx, targetBktID)
  31. if err != nil {
  32. return clitypes.Package{}, err
  33. }
  34. return u.db.Package().Create(tx, targetBktID, newPkgName, time.Now())
  35. })
  36. if err != nil {
  37. return nil, fmt.Errorf("creating package: %w", err)
  38. }
  39. delPkg := func() {
  40. u.db.Package().Delete(u.db.DefCtx(), pkg.PackageID)
  41. }
  42. spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
  43. if err != nil {
  44. delPkg()
  45. return nil, fmt.Errorf("getting user space ids: %w", err)
  46. }
  47. spaceDetails := u.spaceMeta.GetMany(spaceIDs)
  48. spaceDetails = lo.Filter(spaceDetails, func(e *clitypes.UserSpaceDetail, i int) bool {
  49. return e != nil && e.MasterHub != nil && e.UserSpace.ShardStore != nil
  50. })
  51. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  52. if err != nil {
  53. delPkg()
  54. return nil, fmt.Errorf("acquiring coordinator mq client: %w", err)
  55. }
  56. defer stgglb.CoordinatorMQPool.Release(coorCli)
  57. resp, err := coorCli.GetHubConnectivities(coordinator.ReqGetHubConnectivities([]cortypes.HubID{srcSpace.MasterHub.HubID}))
  58. if err != nil {
  59. delPkg()
  60. return nil, fmt.Errorf("getting hub connectivities: %w", err)
  61. }
  62. cons := make(map[cortypes.HubID]cortypes.HubConnectivity)
  63. for _, c := range resp.Connectivities {
  64. cons[c.ToHubID] = c
  65. }
  66. var uploadSpaces []UploadSpaceInfo
  67. for _, space := range spaceDetails {
  68. if space.MasterHub == nil {
  69. continue
  70. }
  71. latency := time.Duration(math.MaxInt64)
  72. con, ok := cons[space.MasterHub.HubID]
  73. if ok && con.Latency != nil {
  74. latency = time.Duration(*con.Latency * float32(time.Millisecond))
  75. }
  76. uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
  77. Space: *space,
  78. Delay: latency,
  79. IsSameLocation: space.MasterHub.LocationID == srcSpace.MasterHub.LocationID,
  80. })
  81. }
  82. if len(uploadSpaces) == 0 {
  83. delPkg()
  84. return nil, fmt.Errorf("user no available userspaces")
  85. }
  86. targetSapce := u.chooseUploadStorage(uploadSpaces, uploadAffinity)
  87. addr, ok := srcSpace.MasterHub.Address.(*cortypes.GRPCAddressInfo)
  88. if !ok {
  89. delPkg()
  90. return nil, fmt.Errorf("master of user space %v has no grpc address", srcSpace.UserSpace)
  91. }
  92. srcHubCli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(*srcSpace.MasterHub, *addr))
  93. defer srcHubCli.Release()
  94. listAllResp, err := srcHubCli.PublicStoreListAll(context.Background(), &hubrpc.PublicStoreListAll{
  95. UserSpace: *srcSpace,
  96. Path: rootPath,
  97. })
  98. if err != nil {
  99. delPkg()
  100. return nil, fmt.Errorf("listing public store: %w", err)
  101. }
  102. adds, err := u.uploadFromPublicStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath)
  103. if err != nil {
  104. delPkg()
  105. return nil, fmt.Errorf("uploading from public store: %w", err)
  106. }
  107. _, err = db.DoTx21(u.db, u.db.Object().BatchAdd, pkg.PackageID, adds)
  108. if err != nil {
  109. delPkg()
  110. return nil, fmt.Errorf("adding objects: %w", err)
  111. }
  112. return &pkg, nil
  113. }
  114. func (u *Uploader) uploadFromPublicStore(srcSpace *clitypes.UserSpaceDetail, targetSpace *clitypes.UserSpaceDetail, entries []types.PublicStoreEntry, rootPath string) ([]db.AddObjectEntry, error) {
  115. ft := ioswitch2.FromTo{}
  116. for _, e := range entries {
  117. // 可以考虑增加一个配置项来控制是否上传空目录
  118. if e.IsDir {
  119. continue
  120. }
  121. ft.AddFrom(ioswitch2.NewFromPublicStore(*srcSpace.MasterHub, *srcSpace, e.Path))
  122. ft.AddTo(ioswitch2.NewToShardStore(*targetSpace.MasterHub, *targetSpace, ioswitch2.RawStream(), e.Path))
  123. }
  124. plans := exec.NewPlanBuilder()
  125. err := parser.Parse(ft, plans)
  126. if err != nil {
  127. return nil, fmt.Errorf("parsing plan: %w", err)
  128. }
  129. exeCtx := exec.NewExecContext()
  130. exec.SetValueByType(exeCtx, u.stgPool)
  131. ret, err := plans.Execute(exeCtx).Wait(context.Background())
  132. if err != nil {
  133. return nil, fmt.Errorf("executing plan: %w", err)
  134. }
  135. cleanRoot := strings.TrimSuffix(rootPath, clitypes.ObjectPathSeparator)
  136. adds := make([]db.AddObjectEntry, 0, len(ret))
  137. for _, e := range entries {
  138. if e.IsDir {
  139. continue
  140. }
  141. pat := strings.TrimPrefix(e.Path, cleanRoot+clitypes.ObjectPathSeparator)
  142. if pat == cleanRoot {
  143. pat = clitypes.BaseName(e.Path)
  144. }
  145. info := ret[e.Path].(*ops2.ShardInfoValue)
  146. adds = append(adds, db.AddObjectEntry{
  147. Path: pat,
  148. Size: info.Size,
  149. FileHash: info.Hash,
  150. CreateTime: time.Now(),
  151. UserSpaceIDs: []clitypes.UserSpaceID{targetSpace.UserSpace.UserSpaceID},
  152. })
  153. }
  154. return adds, nil
  155. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。