You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

strip_iterator.go 5.1 kB


  1. package downloader
  2. import (
  3. "io"
  4. "sync"
  5. "gitlink.org.cn/cloudream/common/pkgs/iterator"
  6. "gitlink.org.cn/cloudream/common/pkgs/logger"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. "gitlink.org.cn/cloudream/common/utils/sync2"
  9. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  10. "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
  11. )
  12. type downloadBlock struct {
  13. Node cdssdk.Node
  14. Block stgmod.ObjectBlock
  15. }
  // Strip is one decoded EC strip as handed out by StripIterator.MoveNext.
  type Strip struct {
  	// Data holds the data bytes of the strip.
  	Data []byte
  	// Position is copied from the producer's entry: the strip's position in
  	// the file, in bytes.
  	Position int64
  }
  20. type StripIterator struct {
  21. object cdssdk.Object
  22. blocks []downloadBlock
  23. red *cdssdk.ECRedundancy
  24. curStripPos int64
  25. cache *StripCache
  26. dataChan chan dataChanEntry
  27. downloadingDone chan any
  28. downloadingDoneOnce sync.Once
  29. inited bool
  30. }
  // dataChanEntry is the message passed from the downloading goroutine to
  // MoveNext: either a strip (Data + Position) or a terminal Error.
  type dataChanEntry struct {
  	// Data holds the data bytes of the strip.
  	Data []byte
  	// Position is the position of the strip within the file, in bytes.
  	Position int64
  	// Error, when non-nil, ends the iteration; io.EOF marks a normal end.
  	Error error
  }
  36. func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripPos int64, cache *StripCache, maxPrefetch int) *StripIterator {
  37. if maxPrefetch <= 0 {
  38. maxPrefetch = 1
  39. }
  40. iter := &StripIterator{
  41. object: object,
  42. blocks: blocks,
  43. red: red,
  44. curStripPos: beginStripPos,
  45. cache: cache,
  46. dataChan: make(chan dataChanEntry, maxPrefetch-1),
  47. downloadingDone: make(chan any),
  48. }
  49. return iter
  50. }
  51. func (s *StripIterator) MoveNext() (Strip, error) {
  52. if !s.inited {
  53. go s.downloading()
  54. s.inited = true
  55. }
  56. // 先尝试获取一下,用于判断本次获取是否发生了等待
  57. select {
  58. case entry, ok := <-s.dataChan:
  59. if !ok || entry.Error == io.EOF {
  60. return Strip{}, iterator.ErrNoMoreItem
  61. }
  62. if entry.Error != nil {
  63. return Strip{}, entry.Error
  64. }
  65. s.curStripPos++
  66. return Strip{Data: entry.Data, Position: entry.Position}, nil
  67. default:
  68. logger.Debugf("waitting for ec strip %v for object %v", s.curStripPos, s.object.ObjectID)
  69. }
  70. // 发生了等待
  71. select {
  72. case entry, ok := <-s.dataChan:
  73. if !ok || entry.Error == io.EOF {
  74. return Strip{}, iterator.ErrNoMoreItem
  75. }
  76. if entry.Error != nil {
  77. return Strip{}, entry.Error
  78. }
  79. s.curStripPos++
  80. return Strip{Data: entry.Data, Position: entry.Position}, nil
  81. case <-s.downloadingDone:
  82. return Strip{}, iterator.ErrNoMoreItem
  83. }
  84. }
  85. func (s *StripIterator) Close() {
  86. s.downloadingDoneOnce.Do(func() {
  87. close(s.downloadingDone)
  88. })
  89. }
  90. func (s *StripIterator) downloading() {
  91. rs, err := ec.NewRs(s.red.K, s.red.N)
  92. if err != nil {
  93. s.sendToDataChan(dataChanEntry{Error: err})
  94. return
  95. }
  96. var blockStrs []*IPFSReader
  97. for _, b := range s.blocks {
  98. blockStrs = append(blockStrs, NewIPFSReader(b.Node, b.Block.FileHash))
  99. }
  100. curStripPos := s.curStripPos
  101. loop:
  102. for {
  103. stripBytesPos := curStripPos * int64(s.red.ChunkSize)
  104. if stripBytesPos >= s.object.Size {
  105. s.sendToDataChan(dataChanEntry{Error: io.EOF})
  106. break
  107. }
  108. stripKey := ECStripKey{
  109. ObjectID: s.object.ObjectID,
  110. StripPosition: curStripPos,
  111. }
  112. item, ok := s.cache.Get(stripKey)
  113. if ok {
  114. if item.ObjectFileHash == s.object.FileHash {
  115. if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) {
  116. break loop
  117. }
  118. curStripPos++
  119. continue
  120. } else {
  121. // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载
  122. s.cache.Remove(stripKey)
  123. }
  124. }
  125. for _, str := range blockStrs {
  126. _, err := str.Seek(stripBytesPos*int64(s.red.ChunkSize), io.SeekStart)
  127. if err != nil {
  128. s.sendToDataChan(dataChanEntry{Error: err})
  129. break loop
  130. }
  131. }
  132. dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
  133. blockArrs := make([][]byte, s.red.N)
  134. for i := 0; i < s.red.K; i++ {
  135. // 放入的slice长度为0,但容量为ChunkSize,EC库发现长度为0的块后才会认为是待恢复块
  136. blockArrs[i] = dataBuf[i*s.red.ChunkSize : i*s.red.ChunkSize]
  137. }
  138. for _, b := range s.blocks {
  139. // 用于恢复的块则要将其长度变回ChunkSize,用于后续读取块数据
  140. if b.Block.Index < s.red.K {
  141. // 此处扩容不会导致slice指向一个新内存
  142. blockArrs[b.Block.Index] = blockArrs[b.Block.Index][0:s.red.ChunkSize]
  143. } else {
  144. blockArrs[b.Block.Index] = make([]byte, s.red.ChunkSize)
  145. }
  146. }
  147. err := sync2.ParallelDo(s.blocks, func(b downloadBlock, idx int) error {
  148. _, err := io.ReadFull(blockStrs[idx], blockArrs[b.Block.Index])
  149. return err
  150. })
  151. if err != nil {
  152. s.sendToDataChan(dataChanEntry{Error: err})
  153. break loop
  154. }
  155. err = rs.ReconstructData(blockArrs)
  156. if err != nil {
  157. s.sendToDataChan(dataChanEntry{Error: err})
  158. break loop
  159. }
  160. s.cache.Add(stripKey, ObjectECStrip{
  161. Data: dataBuf,
  162. ObjectFileHash: s.object.FileHash,
  163. })
  164. if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) {
  165. break loop
  166. }
  167. }
  168. for _, str := range blockStrs {
  169. str.Close()
  170. }
  171. close(s.dataChan)
  172. }
  173. func (s *StripIterator) sendToDataChan(entry dataChanEntry) bool {
  174. select {
  175. case s.dataChan <- entry:
  176. return true
  177. case <-s.downloadingDone:
  178. return false
  179. }
  180. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。