You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

storage_load_package.go 10 kB


  1. package task
  2. import (
  3. "fmt"
  4. "io"
  5. "math"
  6. "os"
  7. "path/filepath"
  8. "time"
  9. "github.com/samber/lo"
  10. "gitlink.org.cn/cloudream/common/pkgs/bitmap"
  11. "gitlink.org.cn/cloudream/common/pkgs/ipfs"
  12. "gitlink.org.cn/cloudream/common/pkgs/task"
  13. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  14. myio "gitlink.org.cn/cloudream/common/utils/io"
  15. myref "gitlink.org.cn/cloudream/common/utils/reflect"
  16. mysort "gitlink.org.cn/cloudream/common/utils/sort"
  17. "gitlink.org.cn/cloudream/storage/common/consts"
  18. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  19. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  20. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  21. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  22. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  23. "gitlink.org.cn/cloudream/storage/common/utils"
  24. )
  25. type StorageLoadPackage struct {
  26. FullOutputPath string
  27. userID cdssdk.UserID
  28. packageID cdssdk.PackageID
  29. storageID cdssdk.StorageID
  30. pinnedBlocks []stgmod.ObjectBlock
  31. }
  32. func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage {
  33. return &StorageLoadPackage{
  34. userID: userID,
  35. packageID: packageID,
  36. storageID: storageID,
  37. }
  38. }
  39. func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
  40. err := t.do(task, ctx)
  41. complete(err, CompleteOption{
  42. RemovingDelay: time.Minute,
  43. })
  44. }
  45. func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) error {
  46. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  47. if err != nil {
  48. return fmt.Errorf("new coordinator client: %w", err)
  49. }
  50. defer stgglb.CoordinatorMQPool.Release(coorCli)
  51. ipfsCli, err := stgglb.IPFSPool.Acquire()
  52. if err != nil {
  53. return fmt.Errorf("new IPFS client: %w", err)
  54. }
  55. defer stgglb.IPFSPool.Release(ipfsCli)
  56. getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
  57. if err != nil {
  58. return fmt.Errorf("request to coordinator: %w", err)
  59. }
  60. outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, t.userID, t.packageID)
  61. if err = os.MkdirAll(outputDirPath, 0755); err != nil {
  62. return fmt.Errorf("creating output directory: %w", err)
  63. }
  64. t.FullOutputPath = outputDirPath
  65. getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID))
  66. if err != nil {
  67. return fmt.Errorf("getting package object details: %w", err)
  68. }
  69. mutex, err := reqbuilder.NewBuilder().
  70. // 提前占位
  71. Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID).
  72. // 保护在storage目录中下载的文件
  73. Storage().Buzy(t.storageID).
  74. // 保护下载文件时同时保存到IPFS的文件
  75. IPFS().Buzy(getStgResp.NodeID).
  76. MutexLock(ctx.distlock)
  77. if err != nil {
  78. return fmt.Errorf("acquire locks failed, err: %w", err)
  79. }
  80. defer mutex.Unlock()
  81. for _, obj := range getObjectDetails.Objects {
  82. err := t.downloadOne(coorCli, ipfsCli, outputDirPath, obj)
  83. if err != nil {
  84. return err
  85. }
  86. }
  87. _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks))
  88. if err != nil {
  89. return fmt.Errorf("loading package to storage: %w", err)
  90. }
  91. // TODO 要防止下载的临时文件被删除
  92. return err
  93. }
  94. func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, dir string, obj stgmod.ObjectDetail) error {
  95. var file io.ReadCloser
  96. switch red := obj.Object.Redundancy.(type) {
  97. case *cdssdk.NoneRedundancy:
  98. reader, err := t.downloadNoneOrRepObject(ipfsCli, obj)
  99. if err != nil {
  100. return fmt.Errorf("downloading object: %w", err)
  101. }
  102. file = reader
  103. case *cdssdk.RepRedundancy:
  104. reader, err := t.downloadNoneOrRepObject(ipfsCli, obj)
  105. if err != nil {
  106. return fmt.Errorf("downloading rep object: %w", err)
  107. }
  108. file = reader
  109. case *cdssdk.ECRedundancy:
  110. reader, pinnedBlocks, err := t.downloadECObject(coorCli, ipfsCli, obj, red)
  111. if err != nil {
  112. return fmt.Errorf("downloading ec object: %w", err)
  113. }
  114. file = reader
  115. t.pinnedBlocks = append(t.pinnedBlocks, pinnedBlocks...)
  116. default:
  117. return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy))
  118. }
  119. defer file.Close()
  120. fullPath := filepath.Join(dir, obj.Object.Path)
  121. lastDirPath := filepath.Dir(fullPath)
  122. if err := os.MkdirAll(lastDirPath, 0755); err != nil {
  123. return fmt.Errorf("creating object last dir: %w", err)
  124. }
  125. outputFile, err := os.Create(fullPath)
  126. if err != nil {
  127. return fmt.Errorf("creating object file: %w", err)
  128. }
  129. defer outputFile.Close()
  130. if _, err := io.Copy(outputFile, file); err != nil {
  131. return fmt.Errorf("writting object to file: %w", err)
  132. }
  133. return nil
  134. }
  135. func (t *StorageLoadPackage) downloadNoneOrRepObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail) (io.ReadCloser, error) {
  136. if len(obj.Blocks) == 0 && len(obj.PinnedAt) == 0 {
  137. return nil, fmt.Errorf("no node has this object")
  138. }
  139. // 不管实际有没有成功
  140. ipfsCli.Pin(obj.Object.FileHash)
  141. file, err := ipfsCli.OpenRead(obj.Object.FileHash)
  142. if err != nil {
  143. return nil, err
  144. }
  145. return file, nil
  146. }
  147. func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) {
  148. allNodes, err := t.sortDownloadNodes(coorCli, obj)
  149. if err != nil {
  150. return nil, nil, err
  151. }
  152. bsc, blocks := t.getMinReadingBlockSolution(allNodes, ecRed.K)
  153. osc, _ := t.getMinReadingObjectSolution(allNodes, ecRed.K)
  154. if bsc < osc {
  155. var fileStrs []io.ReadCloser
  156. rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
  157. if err != nil {
  158. return nil, nil, fmt.Errorf("new rs: %w", err)
  159. }
  160. for i := range blocks {
  161. // 不管实际有没有成功
  162. ipfsCli.Pin(blocks[i].Block.FileHash)
  163. str, err := ipfsCli.OpenRead(blocks[i].Block.FileHash)
  164. if err != nil {
  165. for i -= 1; i >= 0; i-- {
  166. fileStrs[i].Close()
  167. }
  168. return nil, nil, fmt.Errorf("donwloading file: %w", err)
  169. }
  170. fileStrs = append(fileStrs, str)
  171. }
  172. fileReaders, filesCloser := myio.ToReaders(fileStrs)
  173. var indexes []int
  174. var pinnedBlocks []stgmod.ObjectBlock
  175. for _, b := range blocks {
  176. indexes = append(indexes, b.Block.Index)
  177. pinnedBlocks = append(pinnedBlocks, stgmod.ObjectBlock{
  178. ObjectID: b.Block.ObjectID,
  179. Index: b.Block.Index,
  180. NodeID: *stgglb.Local.NodeID,
  181. FileHash: b.Block.FileHash,
  182. })
  183. }
  184. outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
  185. return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
  186. filesCloser()
  187. outputsCloser()
  188. }), pinnedBlocks, nil
  189. }
  190. // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
  191. if osc == math.MaxFloat64 {
  192. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
  193. }
  194. // 如果是直接读取的文件,那么就不需要Pin文件块
  195. str, err := ipfsCli.OpenRead(obj.Object.FileHash)
  196. return str, nil, err
  197. }
  198. type downloadNodeInfo struct {
  199. Node cdssdk.Node
  200. ObjectPinned bool
  201. Blocks []stgmod.ObjectBlock
  202. Distance float64
  203. }
  204. func (t *StorageLoadPackage) sortDownloadNodes(coorCli *coormq.Client, obj stgmod.ObjectDetail) ([]*downloadNodeInfo, error) {
  205. var nodeIDs []cdssdk.NodeID
  206. for _, id := range obj.PinnedAt {
  207. if !lo.Contains(nodeIDs, id) {
  208. nodeIDs = append(nodeIDs, id)
  209. }
  210. }
  211. for _, b := range obj.Blocks {
  212. if !lo.Contains(nodeIDs, b.NodeID) {
  213. nodeIDs = append(nodeIDs, b.NodeID)
  214. }
  215. }
  216. getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs))
  217. if err != nil {
  218. return nil, fmt.Errorf("getting nodes: %w", err)
  219. }
  220. downloadNodeMap := make(map[cdssdk.NodeID]*downloadNodeInfo)
  221. for _, id := range obj.PinnedAt {
  222. node, ok := downloadNodeMap[id]
  223. if !ok {
  224. mod := *getNodes.GetNode(id)
  225. node = &downloadNodeInfo{
  226. Node: mod,
  227. ObjectPinned: true,
  228. Distance: t.getNodeDistance(mod),
  229. }
  230. downloadNodeMap[id] = node
  231. }
  232. node.ObjectPinned = true
  233. }
  234. for _, b := range obj.Blocks {
  235. node, ok := downloadNodeMap[b.NodeID]
  236. if !ok {
  237. mod := *getNodes.GetNode(b.NodeID)
  238. node = &downloadNodeInfo{
  239. Node: mod,
  240. Distance: t.getNodeDistance(mod),
  241. }
  242. downloadNodeMap[b.NodeID] = node
  243. }
  244. node.Blocks = append(node.Blocks, b)
  245. }
  246. return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *downloadNodeInfo) int {
  247. return mysort.Cmp(left.Distance, right.Distance)
  248. }), nil
  249. }
  250. type downloadBlock struct {
  251. Node cdssdk.Node
  252. Block stgmod.ObjectBlock
  253. }
  254. func (t *StorageLoadPackage) getMinReadingBlockSolution(sortedNodes []*downloadNodeInfo, k int) (float64, []downloadBlock) {
  255. gotBlocksMap := bitmap.Bitmap64(0)
  256. var gotBlocks []downloadBlock
  257. dist := float64(0.0)
  258. for _, n := range sortedNodes {
  259. for _, b := range n.Blocks {
  260. if !gotBlocksMap.Get(b.Index) {
  261. gotBlocks = append(gotBlocks, downloadBlock{
  262. Node: n.Node,
  263. Block: b,
  264. })
  265. gotBlocksMap.Set(b.Index, true)
  266. dist += n.Distance
  267. }
  268. if len(gotBlocks) >= k {
  269. return dist, gotBlocks
  270. }
  271. }
  272. }
  273. return math.MaxFloat64, gotBlocks
  274. }
  275. func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedNodes []*downloadNodeInfo, k int) (float64, *cdssdk.Node) {
  276. dist := math.MaxFloat64
  277. var downloadNode *cdssdk.Node
  278. for _, n := range sortedNodes {
  279. if n.ObjectPinned && float64(k)*n.Distance < dist {
  280. dist = float64(k) * n.Distance
  281. downloadNode = &n.Node
  282. }
  283. }
  284. return dist, downloadNode
  285. }
  286. func (t *StorageLoadPackage) getNodeDistance(node cdssdk.Node) float64 {
  287. if stgglb.Local.NodeID != nil {
  288. if node.NodeID == *stgglb.Local.NodeID {
  289. return consts.NodeDistanceSameNode
  290. }
  291. }
  292. if node.LocationID == stgglb.Local.LocationID {
  293. return consts.NodeDistanceSameLocation
  294. }
  295. return consts.NodeDistanceOther
  296. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。