You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

storage_load_package.go 10 kB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. package task
  2. import (
  3. "fmt"
  4. "io"
  5. "math"
  6. "time"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/bitmap"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/pkgs/task"
  11. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  12. "gitlink.org.cn/cloudream/common/utils/io2"
  13. "gitlink.org.cn/cloudream/common/utils/reflect2"
  14. "gitlink.org.cn/cloudream/common/utils/sort2"
  15. "gitlink.org.cn/cloudream/storage/common/consts"
  16. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  17. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  18. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  19. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  20. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  21. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
  22. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
  23. )
  24. type StorageLoadPackage struct {
  25. PackagePath string
  26. LocalBase string
  27. RemoteBase string
  28. userID cdssdk.UserID
  29. packageID cdssdk.PackageID
  30. storageID cdssdk.StorageID
  31. pinnedBlocks []stgmod.ObjectBlock
  32. }
  33. func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage {
  34. return &StorageLoadPackage{
  35. userID: userID,
  36. packageID: packageID,
  37. storageID: storageID,
  38. }
  39. }
  40. func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
  41. startTime := time.Now()
  42. log := logger.WithType[StorageLoadPackage]("Task")
  43. log.WithField("TaskID", task.ID()).
  44. Infof("begin to load package %v to %v", t.packageID, t.storageID)
  45. err := t.do(task, ctx)
  46. if err == nil {
  47. log.WithField("TaskID", task.ID()).
  48. Infof("loading success, cost: %v", time.Since(startTime))
  49. } else {
  50. log.WithField("TaskID", task.ID()).
  51. Warnf("loading package: %v, cost: %v", err, time.Since(startTime))
  52. }
  53. complete(err, CompleteOption{
  54. RemovingDelay: time.Minute,
  55. })
  56. }
  57. func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) error {
  58. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  59. if err != nil {
  60. return fmt.Errorf("new coordinator client: %w", err)
  61. }
  62. defer stgglb.CoordinatorMQPool.Release(coorCli)
  63. shared, err := ctx.stgMgr.GetSharedStore(t.storageID)
  64. if err != nil {
  65. return fmt.Errorf("get shared store of storage %v: %w", t.storageID, err)
  66. }
  67. t.PackagePath = utils.MakeLoadedPackagePath(t.userID, t.packageID)
  68. getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.packageID))
  69. if err != nil {
  70. return fmt.Errorf("getting package object details: %w", err)
  71. }
  72. shardstore, err := ctx.stgMgr.GetShardStore(t.storageID)
  73. if err != nil {
  74. return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
  75. }
  76. mutex, err := reqbuilder.NewBuilder().
  77. // 提前占位
  78. Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID).
  79. // 保护在storage目录中下载的文件
  80. Storage().Buzy(t.storageID).
  81. // 保护下载文件时同时保存到IPFS的文件
  82. Shard().Buzy(t.storageID).
  83. MutexLock(ctx.distlock)
  84. if err != nil {
  85. return fmt.Errorf("acquire locks failed, err: %w", err)
  86. }
  87. defer mutex.Unlock()
  88. for _, obj := range getObjectDetails.Objects {
  89. err := t.downloadOne(coorCli, shardstore, shared, obj)
  90. if err != nil {
  91. return err
  92. }
  93. ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, t.storageID, 1)
  94. }
  95. _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks))
  96. if err != nil {
  97. return fmt.Errorf("loading package to storage: %w", err)
  98. }
  99. // TODO 要防止下载的临时文件被删除
  100. return err
  101. }
  102. func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, shardStore types.ShardStore, shared types.SharedStore, obj stgmod.ObjectDetail) error {
  103. var file io.ReadCloser
  104. switch red := obj.Object.Redundancy.(type) {
  105. case *cdssdk.NoneRedundancy:
  106. reader, err := t.downloadNoneOrRepObject(shardStore, obj)
  107. if err != nil {
  108. return fmt.Errorf("downloading object: %w", err)
  109. }
  110. file = reader
  111. case *cdssdk.RepRedundancy:
  112. reader, err := t.downloadNoneOrRepObject(shardStore, obj)
  113. if err != nil {
  114. return fmt.Errorf("downloading rep object: %w", err)
  115. }
  116. file = reader
  117. case *cdssdk.ECRedundancy:
  118. reader, pinnedBlocks, err := t.downloadECObject(coorCli, shardStore, obj, red)
  119. if err != nil {
  120. return fmt.Errorf("downloading ec object: %w", err)
  121. }
  122. file = reader
  123. t.pinnedBlocks = append(t.pinnedBlocks, pinnedBlocks...)
  124. default:
  125. return fmt.Errorf("unknow redundancy type: %v", reflect2.TypeOfValue(obj.Object.Redundancy))
  126. }
  127. defer file.Close()
  128. if _, err := shared.WritePackageObject(t.userID, t.packageID, obj.Object.Path, file); err != nil {
  129. return fmt.Errorf("writting object to file: %w", err)
  130. }
  131. return nil
  132. }
  133. func (t *StorageLoadPackage) downloadNoneOrRepObject(shardStore types.ShardStore, obj stgmod.ObjectDetail) (io.ReadCloser, error) {
  134. if len(obj.Blocks) == 0 && len(obj.PinnedAt) == 0 {
  135. return nil, fmt.Errorf("no storage has this object")
  136. }
  137. file, err := shardStore.Open(types.NewOpen(obj.Object.FileHash))
  138. if err != nil {
  139. return nil, err
  140. }
  141. return file, nil
  142. }
  143. func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, shardStore types.ShardStore, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) {
  144. allStorages, err := t.sortDownloadStorages(coorCli, obj)
  145. if err != nil {
  146. return nil, nil, err
  147. }
  148. bsc, blocks := t.getMinReadingBlockSolution(allStorages, ecRed.K)
  149. osc, _ := t.getMinReadingObjectSolution(allStorages, ecRed.K)
  150. if bsc < osc {
  151. var fileStrs []io.ReadCloser
  152. rs, err := ec.NewStreamRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
  153. if err != nil {
  154. return nil, nil, fmt.Errorf("new rs: %w", err)
  155. }
  156. for i := range blocks {
  157. str, err := shardStore.Open(types.NewOpen(blocks[i].Block.FileHash))
  158. if err != nil {
  159. for i -= 1; i >= 0; i-- {
  160. fileStrs[i].Close()
  161. }
  162. return nil, nil, fmt.Errorf("donwloading file: %w", err)
  163. }
  164. fileStrs = append(fileStrs, str)
  165. }
  166. fileReaders, filesCloser := io2.ToReaders(fileStrs)
  167. var indexes []int
  168. for _, b := range blocks {
  169. indexes = append(indexes, b.Block.Index)
  170. }
  171. outputs, outputsCloser := io2.ToReaders(rs.ReconstructData(fileReaders, indexes))
  172. return io2.AfterReadClosed(io2.Length(io2.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
  173. filesCloser()
  174. outputsCloser()
  175. }), nil, nil
  176. }
  177. // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
  178. if osc == math.MaxFloat64 {
  179. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
  180. }
  181. // 如果是直接读取的文件,那么就不需要Pin文件块
  182. str, err := shardStore.Open(types.NewOpen(obj.Object.FileHash))
  183. return str, nil, err
  184. }
  185. type downloadStorageInfo struct {
  186. Storage stgmod.StorageDetail
  187. ObjectPinned bool
  188. Blocks []stgmod.ObjectBlock
  189. Distance float64
  190. }
  191. func (t *StorageLoadPackage) sortDownloadStorages(coorCli *coormq.Client, obj stgmod.ObjectDetail) ([]*downloadStorageInfo, error) {
  192. var stgIDs []cdssdk.StorageID
  193. for _, id := range obj.PinnedAt {
  194. if !lo.Contains(stgIDs, id) {
  195. stgIDs = append(stgIDs, id)
  196. }
  197. }
  198. for _, b := range obj.Blocks {
  199. if !lo.Contains(stgIDs, b.StorageID) {
  200. stgIDs = append(stgIDs, b.StorageID)
  201. }
  202. }
  203. getStgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(stgIDs))
  204. if err != nil {
  205. return nil, fmt.Errorf("getting storage details: %w", err)
  206. }
  207. allStgs := make(map[cdssdk.StorageID]stgmod.StorageDetail)
  208. for _, stg := range getStgs.Storages {
  209. allStgs[stg.Storage.StorageID] = *stg
  210. }
  211. downloadStorageMap := make(map[cdssdk.StorageID]*downloadStorageInfo)
  212. for _, id := range obj.PinnedAt {
  213. storage, ok := downloadStorageMap[id]
  214. if !ok {
  215. mod := allStgs[id]
  216. storage = &downloadStorageInfo{
  217. Storage: mod,
  218. ObjectPinned: true,
  219. Distance: t.getStorageDistance(mod),
  220. }
  221. downloadStorageMap[id] = storage
  222. }
  223. storage.ObjectPinned = true
  224. }
  225. for _, b := range obj.Blocks {
  226. storage, ok := downloadStorageMap[b.StorageID]
  227. if !ok {
  228. mod := allStgs[b.StorageID]
  229. storage = &downloadStorageInfo{
  230. Storage: mod,
  231. Distance: t.getStorageDistance(mod),
  232. }
  233. downloadStorageMap[b.StorageID] = storage
  234. }
  235. storage.Blocks = append(storage.Blocks, b)
  236. }
  237. return sort2.Sort(lo.Values(downloadStorageMap), func(left, right *downloadStorageInfo) int {
  238. return sort2.Cmp(left.Distance, right.Distance)
  239. }), nil
  240. }
  241. type downloadBlock struct {
  242. Storage stgmod.StorageDetail
  243. Block stgmod.ObjectBlock
  244. }
  245. func (t *StorageLoadPackage) getMinReadingBlockSolution(sortedStorages []*downloadStorageInfo, k int) (float64, []downloadBlock) {
  246. gotBlocksMap := bitmap.Bitmap64(0)
  247. var gotBlocks []downloadBlock
  248. dist := float64(0.0)
  249. for _, n := range sortedStorages {
  250. for _, b := range n.Blocks {
  251. if !gotBlocksMap.Get(b.Index) {
  252. gotBlocks = append(gotBlocks, downloadBlock{
  253. Storage: n.Storage,
  254. Block: b,
  255. })
  256. gotBlocksMap.Set(b.Index, true)
  257. dist += n.Distance
  258. }
  259. if len(gotBlocks) >= k {
  260. return dist, gotBlocks
  261. }
  262. }
  263. }
  264. return math.MaxFloat64, gotBlocks
  265. }
  266. func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedStorages []*downloadStorageInfo, k int) (float64, *stgmod.StorageDetail) {
  267. dist := math.MaxFloat64
  268. var downloadStg *stgmod.StorageDetail
  269. for _, n := range sortedStorages {
  270. if n.ObjectPinned && float64(k)*n.Distance < dist {
  271. dist = float64(k) * n.Distance
  272. stg := n.Storage
  273. downloadStg = &stg
  274. }
  275. }
  276. return dist, downloadStg
  277. }
  278. func (t *StorageLoadPackage) getStorageDistance(stg stgmod.StorageDetail) float64 {
  279. if stgglb.Local.HubID != nil {
  280. if stg.MasterHub.HubID == *stgglb.Local.HubID {
  281. return consts.StorageDistanceSameStorage
  282. }
  283. }
  284. if stg.MasterHub.LocationID == stgglb.Local.LocationID {
  285. return consts.StorageDistanceSameLocation
  286. }
  287. return consts.StorageDistanceOther
  288. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。