You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

lrc_strip_iterator.go 4.9 kB

7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package downloader
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. "gitlink.org.cn/cloudream/common/pkgs/iterator"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/utils/math2"
  10. clitypes "gitlink.org.cn/cloudream/storage2/client/types"
  11. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc"
  12. "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitchlrc/parser"
  13. )
  14. type LRCStripIterator struct {
  15. downloder *Downloader
  16. object clitypes.Object
  17. blocks []downloadBlock
  18. red clitypes.LRCRedundancy
  19. curStripIndex int64
  20. cache *StripCache
  21. dataChan chan dataChanEntry
  22. downloadingDone chan any
  23. downloadingDoneOnce sync.Once
  24. inited bool
  25. }
  26. func NewLRCStripIterator(downloder *Downloader, object clitypes.Object, blocks []downloadBlock, red clitypes.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator {
  27. if maxPrefetch <= 0 {
  28. maxPrefetch = 1
  29. }
  30. iter := &LRCStripIterator{
  31. downloder: downloder,
  32. object: object,
  33. blocks: blocks,
  34. red: red,
  35. curStripIndex: beginStripIndex,
  36. cache: cache,
  37. dataChan: make(chan dataChanEntry, maxPrefetch-1),
  38. downloadingDone: make(chan any),
  39. }
  40. return iter
  41. }
  42. func (s *LRCStripIterator) MoveNext() (Strip, error) {
  43. if !s.inited {
  44. go s.downloading()
  45. s.inited = true
  46. }
  47. // 先尝试获取一下,用于判断本次获取是否发生了等待
  48. select {
  49. case entry, ok := <-s.dataChan:
  50. if !ok || entry.Error == io.EOF {
  51. return Strip{}, iterator.ErrNoMoreItem
  52. }
  53. if entry.Error != nil {
  54. return Strip{}, entry.Error
  55. }
  56. s.curStripIndex++
  57. return Strip{Data: entry.Data, Position: entry.Position}, nil
  58. default:
  59. logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID)
  60. }
  61. // 发生了等待
  62. select {
  63. case entry, ok := <-s.dataChan:
  64. if !ok || entry.Error == io.EOF {
  65. return Strip{}, iterator.ErrNoMoreItem
  66. }
  67. if entry.Error != nil {
  68. return Strip{}, entry.Error
  69. }
  70. s.curStripIndex++
  71. return Strip{Data: entry.Data, Position: entry.Position}, nil
  72. case <-s.downloadingDone:
  73. return Strip{}, iterator.ErrNoMoreItem
  74. }
  75. }
  76. func (s *LRCStripIterator) Close() {
  77. s.downloadingDoneOnce.Do(func() {
  78. close(s.downloadingDone)
  79. })
  80. }
  81. func (s *LRCStripIterator) downloading() {
  82. var froms []ioswitchlrc.From
  83. for _, b := range s.blocks {
  84. space := b.Space
  85. froms = append(froms, ioswitchlrc.NewFromStorage(b.Block.FileHash, *space.MasterHub, space, b.Block.Index))
  86. }
  87. toExec, hd := ioswitchlrc.NewToDriverWithRange(-1, math2.Range{
  88. Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K),
  89. })
  90. plans := exec.NewPlanBuilder()
  91. err := parser.ReconstructAny(froms, []ioswitchlrc.To{toExec}, plans)
  92. if err != nil {
  93. s.sendToDataChan(dataChanEntry{Error: err})
  94. return
  95. }
  96. exeCtx := exec.NewExecContext()
  97. exec.SetValueByType(exeCtx, s.downloder.stgPool)
  98. exec := plans.Execute(exeCtx)
  99. ctx, cancel := context.WithCancel(context.Background())
  100. go exec.Wait(ctx)
  101. defer cancel()
  102. str, err := exec.BeginRead(hd)
  103. if err != nil {
  104. s.sendToDataChan(dataChanEntry{Error: err})
  105. return
  106. }
  107. curStripIndex := s.curStripIndex
  108. loop:
  109. for {
  110. stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize)
  111. if stripBytesPos >= s.object.Size {
  112. s.sendToDataChan(dataChanEntry{Error: io.EOF})
  113. break
  114. }
  115. stripKey := ECStripKey{
  116. ObjectID: s.object.ObjectID,
  117. StripIndex: curStripIndex,
  118. }
  119. item, ok := s.cache.Get(stripKey)
  120. if ok {
  121. if item.ObjectFileHash == s.object.FileHash {
  122. if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) {
  123. break loop
  124. }
  125. curStripIndex++
  126. continue
  127. } else {
  128. // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载
  129. s.cache.Remove(stripKey)
  130. }
  131. }
  132. dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
  133. n, err := io.ReadFull(str, dataBuf)
  134. if err == io.ErrUnexpectedEOF {
  135. s.cache.Add(stripKey, ObjectECStrip{
  136. Data: dataBuf,
  137. ObjectFileHash: s.object.FileHash,
  138. })
  139. s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos})
  140. s.sendToDataChan(dataChanEntry{Error: io.EOF})
  141. break loop
  142. }
  143. if err != nil {
  144. s.sendToDataChan(dataChanEntry{Error: err})
  145. break loop
  146. }
  147. s.cache.Add(stripKey, ObjectECStrip{
  148. Data: dataBuf,
  149. ObjectFileHash: s.object.FileHash,
  150. })
  151. if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) {
  152. break loop
  153. }
  154. curStripIndex++
  155. }
  156. close(s.dataChan)
  157. }
  158. func (s *LRCStripIterator) sendToDataChan(entry dataChanEntry) bool {
  159. select {
  160. case s.dataChan <- entry:
  161. return true
  162. case <-s.downloadingDone:
  163. return false
  164. }
  165. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。