You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

download_object_iterator.go 9.0 kB

2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package iterator
  2. import (
  3. "fmt"
  4. "io"
  5. "math/rand"
  6. "reflect"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  10. myio "gitlink.org.cn/cloudream/common/utils/io"
  11. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  12. stgmodels "gitlink.org.cn/cloudream/storage/common/models"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
  15. "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
  16. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  17. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  18. )
  19. type DownloadingObjectIterator = Iterator[*IterDownloadingObject]
  20. type IterDownloadingObject struct {
  21. Object model.Object
  22. File io.ReadCloser
  23. }
  24. type DownloadNodeInfo struct {
  25. Node model.Node
  26. IsSameLocation bool
  27. }
  28. type DownloadContext struct {
  29. Distlock *distlock.Service
  30. }
  31. type DownloadObjectIterator struct {
  32. OnClosing func()
  33. objectDetails []stgmodels.ObjectDetail
  34. currentIndex int
  35. downloadCtx *DownloadContext
  36. }
  37. func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator {
  38. return &DownloadObjectIterator{
  39. objectDetails: objectDetails,
  40. downloadCtx: downloadCtx,
  41. }
  42. }
  43. func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) {
  44. coorCli, err := stgglb.CoordinatorMQPool.Acquire()
  45. if err != nil {
  46. return nil, fmt.Errorf("new coordinator client: %w", err)
  47. }
  48. defer stgglb.CoordinatorMQPool.Release(coorCli)
  49. if i.currentIndex >= len(i.objectDetails) {
  50. return nil, ErrNoMoreItem
  51. }
  52. item, err := i.doMove(coorCli)
  53. i.currentIndex++
  54. return item, err
  55. }
  56. func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) {
  57. obj := iter.objectDetails[iter.currentIndex]
  58. switch red := obj.Object.Redundancy.(type) {
  59. case *cdssdk.RepRedundancy:
  60. reader, err := iter.downloadRepObject(coorCli, iter.downloadCtx, obj, red)
  61. if err != nil {
  62. return nil, fmt.Errorf("downloading rep object: %w", err)
  63. }
  64. return &IterDownloadingObject{
  65. Object: obj.Object,
  66. File: reader,
  67. }, nil
  68. case *cdssdk.ECRedundancy:
  69. reader, err := iter.downloadECObject(coorCli, iter.downloadCtx, obj, red)
  70. if err != nil {
  71. return nil, fmt.Errorf("downloading ec object: %w", err)
  72. }
  73. return &IterDownloadingObject{
  74. Object: obj.Object,
  75. File: reader,
  76. }, nil
  77. }
  78. return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy))
  79. }
  80. func (i *DownloadObjectIterator) Close() {
  81. if i.OnClosing != nil {
  82. i.OnClosing()
  83. }
  84. }
  85. // chooseDownloadNode 选择一个下载节点
  86. // 1. 从与当前客户端相同地域的节点中随机选一个
  87. // 2. 没有用的话从所有节点中随机选一个
  88. func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo {
  89. sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation })
  90. if len(sameLocationEntries) > 0 {
  91. return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
  92. }
  93. return entries[rand.Intn(len(entries))]
  94. }
  95. func (iter *DownloadObjectIterator) downloadRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, repRed *cdssdk.RepRedundancy) (io.ReadCloser, error) {
  96. //采取直接读,优先选内网节点
  97. var chosenNodes []DownloadNodeInfo
  98. for i := range obj.Blocks {
  99. if len(obj.Blocks[i].CachedNodeIDs) == 0 {
  100. return nil, fmt.Errorf("no node has block %d", obj.Blocks[i].Index)
  101. }
  102. getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs))
  103. if err != nil {
  104. continue
  105. }
  106. downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
  107. return DownloadNodeInfo{
  108. Node: node,
  109. IsSameLocation: node.LocationID == stgglb.Local.LocationID,
  110. }
  111. })
  112. chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))
  113. }
  114. var fileStrs []io.ReadCloser
  115. for i := range obj.Blocks {
  116. str, err := downloadFile(ctx, chosenNodes[i], obj.Blocks[i].FileHash)
  117. if err != nil {
  118. for i -= 1; i >= 0; i-- {
  119. fileStrs[i].Close()
  120. }
  121. return nil, fmt.Errorf("donwloading file: %w", err)
  122. }
  123. fileStrs = append(fileStrs, str)
  124. }
  125. fileReaders, filesCloser := myio.ToReaders(fileStrs)
  126. return myio.AfterReadClosed(myio.Length(myio.Join(fileReaders), obj.Object.Size), func(c io.ReadCloser) {
  127. filesCloser()
  128. }), nil
  129. }
  130. func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
  131. //采取直接读,优先选内网节点
  132. var chosenNodes []DownloadNodeInfo
  133. var chosenBlocks []stgmodels.ObjectBlockDetail
  134. for i := range obj.Blocks {
  135. if len(chosenBlocks) == ecRed.K {
  136. break
  137. }
  138. // 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行
  139. if len(obj.Blocks[i].CachedNodeIDs) == 0 {
  140. continue
  141. }
  142. getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(obj.Blocks[i].CachedNodeIDs))
  143. if err != nil {
  144. continue
  145. }
  146. downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
  147. return DownloadNodeInfo{
  148. Node: node,
  149. IsSameLocation: node.LocationID == stgglb.Local.LocationID,
  150. }
  151. })
  152. chosenBlocks = append(chosenBlocks, obj.Blocks[i])
  153. chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))
  154. }
  155. if len(chosenBlocks) < ecRed.K {
  156. return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks))
  157. }
  158. var fileStrs []io.ReadCloser
  159. rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
  160. if err != nil {
  161. return nil, fmt.Errorf("new rs: %w", err)
  162. }
  163. for i := range chosenBlocks {
  164. str, err := downloadFile(ctx, chosenNodes[i], chosenBlocks[i].FileHash)
  165. if err != nil {
  166. for i -= 1; i >= 0; i-- {
  167. fileStrs[i].Close()
  168. }
  169. return nil, fmt.Errorf("donwloading file: %w", err)
  170. }
  171. fileStrs = append(fileStrs, str)
  172. }
  173. fileReaders, filesCloser := myio.ToReaders(fileStrs)
  174. var indexes []int
  175. for _, b := range chosenBlocks {
  176. indexes = append(indexes, b.Index)
  177. }
  178. outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
  179. return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
  180. filesCloser()
  181. outputsCloser()
  182. }), nil
  183. }
  184. func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) {
  185. // 如果客户端与节点在同一个地域,则使用内网地址连接节点
  186. nodeIP := node.Node.ExternalIP
  187. grpcPort := node.Node.ExternalGRPCPort
  188. if node.IsSameLocation {
  189. nodeIP = node.Node.LocalIP
  190. grpcPort = node.Node.LocalGRPCPort
  191. logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID)
  192. }
  193. if stgglb.IPFSPool != nil {
  194. logger.Infof("try to use local IPFS to download file")
  195. reader, err := downloadFromLocalIPFS(ctx, fileHash)
  196. if err == nil {
  197. return reader, nil
  198. }
  199. logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
  200. }
  201. return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash)
  202. }
  203. func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {
  204. // 二次获取锁
  205. mutex, err := reqbuilder.NewBuilder().
  206. // 用于从IPFS下载文件
  207. IPFS().ReadOneRep(nodeID, fileHash).
  208. MutexLock(ctx.Distlock)
  209. if err != nil {
  210. return nil, fmt.Errorf("acquire locks failed, err: %w", err)
  211. }
  212. // 连接grpc
  213. agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
  214. if err != nil {
  215. return nil, fmt.Errorf("new agent grpc client: %w", err)
  216. }
  217. reader, err := agtCli.GetIPFSFile(fileHash)
  218. if err != nil {
  219. return nil, fmt.Errorf("getting ipfs file: %w", err)
  220. }
  221. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  222. mutex.Unlock()
  223. })
  224. return reader, nil
  225. }
  226. func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) {
  227. onClosed := func() {}
  228. if stgglb.Local.NodeID != nil {
  229. // 二次获取锁
  230. mutex, err := reqbuilder.NewBuilder().
  231. // 用于从IPFS下载文件
  232. IPFS().ReadOneRep(*stgglb.Local.NodeID, fileHash).
  233. MutexLock(ctx.Distlock)
  234. if err != nil {
  235. return nil, fmt.Errorf("acquire locks failed, err: %w", err)
  236. }
  237. onClosed = func() {
  238. mutex.Unlock()
  239. }
  240. }
  241. ipfsCli, err := stgglb.IPFSPool.Acquire()
  242. if err != nil {
  243. return nil, fmt.Errorf("new ipfs client: %w", err)
  244. }
  245. reader, err := ipfsCli.OpenRead(fileHash)
  246. if err != nil {
  247. return nil, fmt.Errorf("read ipfs file failed, err: %w", err)
  248. }
  249. reader = myio.AfterReadClosed(reader, func(io.ReadCloser) {
  250. onClosed()
  251. })
  252. return reader, nil
  253. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。