diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index 24cbf71..828dcdf 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -25,15 +25,18 @@ type Strip struct { } type StripIterator struct { - object cdssdk.Object - blocks []downloadBlock - red *cdssdk.ECRedundancy - curStripIndex int64 - cache *StripCache - dataChan chan dataChanEntry - downloadingDone chan any - downloadingDoneOnce sync.Once - inited bool + object cdssdk.Object + blocks []downloadBlock + red *cdssdk.ECRedundancy + curStripIndex int64 + cache *StripCache + dataChan chan dataChanEntry + downloadingDone chan any + downloadingDoneOnce sync.Once + inited bool + downloadingStream io.ReadCloser + downloadingStripIndex int64 + downloadingPlanCtxCancel func() } type dataChanEntry struct { @@ -110,36 +113,6 @@ func (s *StripIterator) Close() { } func (s *StripIterator) downloading() { - ft := ioswitch2.NewFromTo() - for _, b := range s.blocks { - node := b.Node - ft.AddFrom(ioswitch2.NewFromNode(b.Block.FileHash, &node, b.Block.Index)) - } - - toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{ - Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K), - }) - ft.AddTo(toExec) - - parser := parser.NewParser(*s.red) - plans := exec.NewPlanBuilder() - err := parser.Parse(ft, plans) - if err != nil { - s.sendToDataChan(dataChanEntry{Error: err}) - return - } - exec := plans.Execute() - - ctx, cancel := context.WithCancel(context.Background()) - go exec.Wait(ctx) - defer cancel() - - str, err := exec.BeginRead(hd) - if err != nil { - s.sendToDataChan(dataChanEntry{Error: err}) - return - } - curStripIndex := s.curStripIndex loop: for { @@ -170,7 +143,7 @@ loop: } dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize)) - n, err := io.ReadFull(str, dataBuf) + n, err := s.readStrip(curStripIndex, dataBuf) if err == io.ErrUnexpectedEOF { // dataBuf中的内容可能不足一个条带,但仍然将其完整放入cache中,外部应该自行计算该从这个buffer中读多少数据 s.cache.Add(stripKey, ObjectECStrip{ @@ -210,3 +183,49 @@ func (s *StripIterator) sendToDataChan(entry dataChanEntry) bool { return false } } + +func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { + // 如果需求的条带不当前正在下载的条带的位置不符合,则需要重新打开下载流 + if s.downloadingStream == nil || s.downloadingStripIndex != stripIndex { + if s.downloadingStream != nil { + s.downloadingStream.Close() + s.downloadingPlanCtxCancel() + } + + ft := ioswitch2.NewFromTo() + for _, b := range s.blocks { + node := b.Node + ft.AddFrom(ioswitch2.NewFromNode(b.Block.FileHash, &node, b.Block.Index)) + } + + toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{ + Offset: s.curStripIndex * s.red.StripSize(), + }) + ft.AddTo(toExec) + + parser := parser.NewParser(*s.red) + plans := exec.NewPlanBuilder() + err := parser.Parse(ft, plans) + if err != nil { + return 0, err + } + exec := plans.Execute() + + ctx, cancel := context.WithCancel(context.Background()) + go exec.Wait(ctx) + + str, err := exec.BeginRead(hd) + if err != nil { + cancel() + return 0, err + } + + s.downloadingStream = str + s.downloadingStripIndex = stripIndex + s.downloadingPlanCtxCancel = cancel + } + + n, err := io.ReadFull(s.downloadingStream, buf) + s.downloadingStripIndex += 1 + return n, err +}