You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

uploader.go 7.9 kB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package uploader
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "math/rand"
  8. "time"
  9. "github.com/samber/lo"
  10. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  11. "gitlink.org.cn/cloudream/common/utils/lo2"
  12. "gitlink.org.cn/cloudream/common/utils/sort2"
  13. "gitlink.org.cn/cloudream/storage2/client/internal/db"
  14. "gitlink.org.cn/cloudream/storage2/client/internal/metacache"
  15. "gitlink.org.cn/cloudream/storage2/client/types"
  16. stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
  17. "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
  18. "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
  19. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
  20. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2"
  21. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
  22. "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
  23. )
  24. type Uploader struct {
  25. distlock *distlock.Service
  26. connectivity *connectivity.Collector
  27. stgPool *pool.Pool
  28. spaceMeta *metacache.UserSpaceMeta
  29. db *db.DB
  30. }
  31. func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgPool *pool.Pool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader {
  32. return &Uploader{
  33. distlock: distlock,
  34. connectivity: connectivity,
  35. stgPool: stgPool,
  36. spaceMeta: spaceMeta,
  37. }
  38. }
  39. func (u *Uploader) BeginUpdate(pkgID types.PackageID, affinity types.UserSpaceID, loadTo []types.UserSpaceID, loadToPath []string) (*UpdateUploader, error) {
  40. spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
  41. if err != nil {
  42. return nil, fmt.Errorf("getting user space ids: %w", err)
  43. }
  44. spaceDetails := u.spaceMeta.GetMany(spaceIDs)
  45. spaceDetails = lo2.RemoveAllDefault(spaceDetails)
  46. cons := u.connectivity.GetAll()
  47. var uploadSpaces []UploadSpaceInfo
  48. for _, space := range spaceDetails {
  49. if space.MasterHub == nil {
  50. continue
  51. }
  52. latency := time.Duration(math.MaxInt64)
  53. con, ok := cons[space.MasterHub.HubID]
  54. if ok && con.Latency != nil {
  55. latency = *con.Latency
  56. }
  57. uploadSpaces = append(uploadSpaces, UploadSpaceInfo{
  58. Space: *space,
  59. Delay: latency,
  60. IsSameLocation: space.MasterHub.LocationID == stgglb.Local.LocationID,
  61. })
  62. }
  63. if len(uploadSpaces) == 0 {
  64. return nil, fmt.Errorf("user no available storages")
  65. }
  66. loadToSpaces := make([]types.UserSpaceDetail, len(loadTo))
  67. for i, spaceID := range loadTo {
  68. space, ok := lo.Find(spaceDetails, func(space *types.UserSpaceDetail) bool {
  69. return space.UserSpace.UserSpaceID == spaceID
  70. })
  71. if !ok {
  72. return nil, fmt.Errorf("load to storage %v not found", spaceID)
  73. }
  74. if space.MasterHub == nil {
  75. return nil, fmt.Errorf("load to storage %v has no master hub", spaceID)
  76. }
  77. loadToSpaces[i] = *space
  78. }
  79. target := u.chooseUploadStorage(uploadSpaces, affinity)
  80. // TODO2 加锁
  81. // 给上传节点的IPFS加锁
  82. // TODO 考虑加Object的Create锁
  83. // 防止上传的副本被清除
  84. // distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.Storage.StorageID).MutexLock(u.distlock)
  85. // if err != nil {
  86. // return nil, fmt.Errorf("acquire distlock: %w", err)
  87. // }
  88. return &UpdateUploader{
  89. uploader: u,
  90. pkgID: pkgID,
  91. targetSpace: target.Space,
  92. // distMutex: distMutex,
  93. loadToSpaces: loadToSpaces,
  94. loadToPath: loadToPath,
  95. }, nil
  96. }
  97. // chooseUploadStorage 选择一个上传文件的节点
  98. // 1. 选择设置了亲和性的节点
  99. // 2. 从与当前客户端相同地域的节点中随机选一个
  100. // 3. 没有的话从所有节点选择延迟最低的节点
  101. func (w *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity types.UserSpaceID) UploadSpaceInfo {
  102. if spaceAffinity > 0 {
  103. aff, ok := lo.Find(spaces, func(space UploadSpaceInfo) bool { return space.Space.UserSpace.UserSpaceID == spaceAffinity })
  104. if ok {
  105. return aff
  106. }
  107. }
  108. sameLocationStorages := lo.Filter(spaces, func(e UploadSpaceInfo, i int) bool { return e.IsSameLocation })
  109. if len(sameLocationStorages) > 0 {
  110. return sameLocationStorages[rand.Intn(len(sameLocationStorages))]
  111. }
  112. // 选择延迟最低的节点
  113. spaces = sort2.Sort(spaces, func(e1, e2 UploadSpaceInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) })
  114. return spaces[0]
  115. }
  116. func (u *Uploader) BeginCreateLoad(bktID types.BucketID, pkgName string, loadTo []types.UserSpaceID, loadToPath []string) (*CreateLoadUploader, error) {
  117. getSpaces := u.spaceMeta.GetMany(loadTo)
  118. spacesStgs := make([]types.UserSpaceDetail, len(loadTo))
  119. for i, stg := range getSpaces {
  120. if stg == nil {
  121. return nil, fmt.Errorf("storage %v not found", loadTo[i])
  122. }
  123. spacesStgs[i] = *stg
  124. }
  125. pkg, err := u.db.Package().Create(u.db.DefCtx(), bktID, pkgName)
  126. if err != nil {
  127. return nil, fmt.Errorf("create package: %w", err)
  128. }
  129. // TODO2 加锁
  130. // reqBld := reqbuilder.NewBuilder()
  131. // for _, stg := range spacesStgs {
  132. // reqBld.Shard().Buzy(stg.Storage.StorageID)
  133. // reqBld.Storage().Buzy(stg.Storage.StorageID)
  134. // }
  135. // lock, err := reqBld.MutexLock(u.distlock)
  136. // if err != nil {
  137. // return nil, fmt.Errorf("acquire distlock: %w", err)
  138. // }
  139. return &CreateLoadUploader{
  140. pkg: pkg,
  141. targetSpaces: spacesStgs,
  142. loadRoots: loadToPath,
  143. uploader: u,
  144. // distlock: lock,
  145. }, nil
  146. }
  147. func (u *Uploader) UploadPart(objID types.ObjectID, index int, stream io.Reader) error {
  148. detail, err := u.db.Object().GetDetail(u.db.DefCtx(), objID)
  149. if err != nil {
  150. return fmt.Errorf("getting object detail: %w", err)
  151. }
  152. objDe := detail
  153. _, ok := objDe.Object.Redundancy.(*types.MultipartUploadRedundancy)
  154. if !ok {
  155. return fmt.Errorf("object %v is not a multipart upload", objID)
  156. }
  157. var space types.UserSpaceDetail
  158. if len(objDe.Blocks) > 0 {
  159. cstg := u.spaceMeta.Get(objDe.Blocks[0].UserSpaceID)
  160. if cstg == nil {
  161. return fmt.Errorf("space %v not found", objDe.Blocks[0].UserSpaceID)
  162. }
  163. space = *cstg
  164. } else {
  165. spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx())
  166. if err != nil {
  167. return fmt.Errorf("getting user space ids: %w", err)
  168. }
  169. spaces := u.spaceMeta.GetMany(spaceIDs)
  170. spaces = lo2.RemoveAllDefault(spaces)
  171. cons := u.connectivity.GetAll()
  172. var userStgs []UploadSpaceInfo
  173. for _, space := range spaces {
  174. if space.MasterHub == nil {
  175. continue
  176. }
  177. delay := time.Duration(math.MaxInt64)
  178. con, ok := cons[space.MasterHub.HubID]
  179. if ok && con.Latency != nil {
  180. delay = *con.Latency
  181. }
  182. userStgs = append(userStgs, UploadSpaceInfo{
  183. Space: *space,
  184. Delay: delay,
  185. IsSameLocation: space.MasterHub.LocationID == stgglb.Local.LocationID,
  186. })
  187. }
  188. if len(userStgs) == 0 {
  189. return fmt.Errorf("user no available storages")
  190. }
  191. space = u.chooseUploadStorage(userStgs, 0).Space
  192. }
  193. // TODO2 加锁
  194. // lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.Storage.StorageID).MutexLock(u.distlock)
  195. // if err != nil {
  196. // return fmt.Errorf("acquire distlock: %w", err)
  197. // }
  198. // defer lock.Unlock()
  199. ft := ioswitch2.NewFromTo()
  200. fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
  201. ft.AddFrom(fromDrv).
  202. AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shard"))
  203. plans := exec.NewPlanBuilder()
  204. err = parser.Parse(ft, plans)
  205. if err != nil {
  206. return fmt.Errorf("parse fromto: %w", err)
  207. }
  208. exeCtx := exec.NewExecContext()
  209. exec.SetValueByType(exeCtx, u.stgPool)
  210. exec := plans.Execute(exeCtx)
  211. exec.BeginWrite(io.NopCloser(stream), hd)
  212. ret, err := exec.Wait(context.TODO())
  213. if err != nil {
  214. return fmt.Errorf("executing plan: %w", err)
  215. }
  216. shardInfo := ret["shard"].(*ops2.ShardInfoValue)
  217. err = u.db.DoTx(func(tx db.SQLContext) error {
  218. return u.db.Object().AppendPart(tx, types.ObjectBlock{
  219. ObjectID: objID,
  220. Index: index,
  221. UserSpaceID: space.UserSpace.UserSpaceID,
  222. FileHash: shardInfo.Hash,
  223. Size: shardInfo.Size,
  224. })
  225. })
  226. return err
  227. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。