You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

strip_iterator.go 6.1 kB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package downloader
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. "gitlink.org.cn/cloudream/common/pkgs/iterator"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/utils/math2"
  10. "gitlink.org.cn/cloudream/storage2/client/types"
  11. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2"
  12. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser"
  13. )
  14. type downloadBlock struct {
  15. Space types.UserSpaceDetail
  16. Block types.ObjectBlock
  17. }
  // Strip is a single strip of object data as returned by
  // StripIterator.MoveNext.
  type Strip struct {
  	// Data holds the bytes of the strip.
  	Data []byte
  	// Position is the byte offset of the strip within the object.
  	Position int64
  }
  22. type StripIterator struct {
  23. downloader *Downloader
  24. object types.Object
  25. blocks []downloadBlock
  26. red types.ECRedundancy
  27. curStripIndex int64
  28. cache *StripCache
  29. dataChan chan dataChanEntry
  30. downloadingDone chan any
  31. downloadingDoneOnce sync.Once
  32. inited bool
  33. downloadingStream io.ReadCloser
  34. downloadingStripIndex int64
  35. downloadingPlanCtxCancel func()
  36. }
  // dataChanEntry is the message sent from the downloading goroutine to
  // MoveNext: either a strip (Data and Position) or a terminal Error.
  type dataChanEntry struct {
  	// Data holds the bytes of the strip.
  	Data []byte
  	// Position is the location of the strip within the file, in bytes.
  	Position int64
  	// Error, when non-nil, ends the iteration; io.EOF marks a normal end.
  	Error error
  }
  42. func NewStripIterator(downloader *Downloader, object types.Object, blocks []downloadBlock, red types.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
  43. if maxPrefetch <= 0 {
  44. maxPrefetch = 1
  45. }
  46. iter := &StripIterator{
  47. downloader: downloader,
  48. object: object,
  49. blocks: blocks,
  50. red: red,
  51. curStripIndex: beginStripIndex,
  52. cache: cache,
  53. dataChan: make(chan dataChanEntry, maxPrefetch-1),
  54. downloadingDone: make(chan any),
  55. }
  56. return iter
  57. }
  58. func (s *StripIterator) MoveNext() (Strip, error) {
  59. if !s.inited {
  60. go s.downloading(s.curStripIndex)
  61. s.inited = true
  62. }
  63. // 先尝试获取一下,用于判断本次获取是否发生了等待
  64. select {
  65. case entry, ok := <-s.dataChan:
  66. if !ok || entry.Error == io.EOF {
  67. return Strip{}, iterator.ErrNoMoreItem
  68. }
  69. if entry.Error != nil {
  70. return Strip{}, entry.Error
  71. }
  72. s.curStripIndex++
  73. return Strip{Data: entry.Data, Position: entry.Position}, nil
  74. default:
  75. logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID)
  76. }
  77. // 发生了等待
  78. select {
  79. case entry, ok := <-s.dataChan:
  80. if !ok || entry.Error == io.EOF {
  81. return Strip{}, iterator.ErrNoMoreItem
  82. }
  83. if entry.Error != nil {
  84. return Strip{}, entry.Error
  85. }
  86. s.curStripIndex++
  87. return Strip{Data: entry.Data, Position: entry.Position}, nil
  88. case <-s.downloadingDone:
  89. return Strip{}, iterator.ErrNoMoreItem
  90. }
  91. }
  92. func (s *StripIterator) Close() {
  93. s.downloadingDoneOnce.Do(func() {
  94. close(s.downloadingDone)
  95. })
  96. }
  97. func (s *StripIterator) downloading(startStripIndex int64) {
  98. curStripIndex := startStripIndex
  99. loop:
  100. for {
  101. stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize)
  102. if stripBytesPos >= s.object.Size {
  103. s.sendToDataChan(dataChanEntry{Error: io.EOF})
  104. break
  105. }
  106. stripKey := ECStripKey{
  107. ObjectID: s.object.ObjectID,
  108. StripIndex: curStripIndex,
  109. }
  110. item, ok := s.cache.Get(stripKey)
  111. if ok {
  112. if item.ObjectFileHash == s.object.FileHash {
  113. if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) {
  114. break loop
  115. }
  116. curStripIndex++
  117. continue
  118. } else {
  119. // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载
  120. s.cache.Remove(stripKey)
  121. }
  122. }
  123. dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
  124. n, err := s.readStrip(curStripIndex, dataBuf)
  125. if err == io.ErrUnexpectedEOF {
  126. // dataBuf中的内容可能不足一个条带,但仍然将其完整放入cache中,外部应该自行计算该从这个buffer中读多少数据
  127. s.cache.Add(stripKey, ObjectECStrip{
  128. Data: dataBuf,
  129. ObjectFileHash: s.object.FileHash,
  130. })
  131. s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos})
  132. s.sendToDataChan(dataChanEntry{Error: io.EOF})
  133. break loop
  134. }
  135. if err != nil {
  136. s.sendToDataChan(dataChanEntry{Error: err})
  137. break loop
  138. }
  139. s.cache.Add(stripKey, ObjectECStrip{
  140. Data: dataBuf,
  141. ObjectFileHash: s.object.FileHash,
  142. })
  143. if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) {
  144. break loop
  145. }
  146. curStripIndex++
  147. }
  148. if s.downloadingStream != nil {
  149. s.downloadingStream.Close()
  150. s.downloadingPlanCtxCancel()
  151. }
  152. close(s.dataChan)
  153. }
  154. func (s *StripIterator) sendToDataChan(entry dataChanEntry) bool {
  155. select {
  156. case s.dataChan <- entry:
  157. return true
  158. case <-s.downloadingDone:
  159. return false
  160. }
  161. }
  162. func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
  163. // 如果需求的条带不当前正在下载的条带的位置不符合,则需要重新打开下载流
  164. if s.downloadingStream == nil || s.downloadingStripIndex != stripIndex {
  165. if s.downloadingStream != nil {
  166. s.downloadingStream.Close()
  167. s.downloadingPlanCtxCancel()
  168. }
  169. ft := ioswitch2.NewFromTo()
  170. ft.ECParam = &s.red
  171. for _, b := range s.blocks {
  172. space := b.Space
  173. ft.AddFrom(ioswitch2.NewFromShardstore(b.Block.FileHash, *space.MasterHub, space, ioswitch2.ECStream(b.Block.Index)))
  174. }
  175. toExec, hd := ioswitch2.NewToDriverWithRange(ioswitch2.RawStream(), math2.Range{
  176. Offset: stripIndex * s.red.StripSize(),
  177. })
  178. ft.AddTo(toExec)
  179. plans := exec.NewPlanBuilder()
  180. err := parser.Parse(ft, plans)
  181. if err != nil {
  182. return 0, err
  183. }
  184. exeCtx := exec.NewExecContext()
  185. exec.SetValueByType(exeCtx, s.downloader.stgPool)
  186. exec := plans.Execute(exeCtx)
  187. ctx, cancel := context.WithCancel(context.Background())
  188. go exec.Wait(ctx)
  189. str, err := exec.BeginRead(hd)
  190. if err != nil {
  191. cancel()
  192. return 0, err
  193. }
  194. s.downloadingStream = str
  195. s.downloadingStripIndex = stripIndex
  196. s.downloadingPlanCtxCancel = cancel
  197. }
  198. n, err := io.ReadFull(s.downloadingStream, buf)
  199. s.downloadingStripIndex += 1
  200. return n, err
  201. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。