Browse Source

解决缓存bug

gitlink
Sydonian 1 year ago
parent
commit
61eb94a875
1 changed files with 59 additions and 40 deletions
  1. +59
    -40
      common/pkgs/downloader/strip_iterator.go

+ 59
- 40
common/pkgs/downloader/strip_iterator.go View File

@@ -25,15 +25,18 @@ type Strip struct {
} }


type StripIterator struct { type StripIterator struct {
object cdssdk.Object
blocks []downloadBlock
red *cdssdk.ECRedundancy
curStripIndex int64
cache *StripCache
dataChan chan dataChanEntry
downloadingDone chan any
downloadingDoneOnce sync.Once
inited bool
object cdssdk.Object
blocks []downloadBlock
red *cdssdk.ECRedundancy
curStripIndex int64
cache *StripCache
dataChan chan dataChanEntry
downloadingDone chan any
downloadingDoneOnce sync.Once
inited bool
downloadingStream io.ReadCloser
downloadingStripIndex int64
downloadingPlanCtxCancel func()
} }


type dataChanEntry struct { type dataChanEntry struct {
@@ -110,36 +113,6 @@ func (s *StripIterator) Close() {
} }


func (s *StripIterator) downloading() { func (s *StripIterator) downloading() {
ft := ioswitch2.NewFromTo()
for _, b := range s.blocks {
node := b.Node
ft.AddFrom(ioswitch2.NewFromNode(b.Block.FileHash, &node, b.Block.Index))
}

toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{
Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K),
})
ft.AddTo(toExec)

parser := parser.NewParser(*s.red)
plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
return
}
exec := plans.Execute()

ctx, cancel := context.WithCancel(context.Background())
go exec.Wait(ctx)
defer cancel()

str, err := exec.BeginRead(hd)
if err != nil {
s.sendToDataChan(dataChanEntry{Error: err})
return
}

curStripIndex := s.curStripIndex curStripIndex := s.curStripIndex
loop: loop:
for { for {
@@ -170,7 +143,7 @@ loop:
} }


dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize)) dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize))
n, err := io.ReadFull(str, dataBuf)
n, err := s.readStrip(curStripIndex, dataBuf)
if err == io.ErrUnexpectedEOF { if err == io.ErrUnexpectedEOF {
// dataBuf中的内容可能不足一个条带,但仍然将其完整放入cache中,外部应该自行计算该从这个buffer中读多少数据 // dataBuf中的内容可能不足一个条带,但仍然将其完整放入cache中,外部应该自行计算该从这个buffer中读多少数据
s.cache.Add(stripKey, ObjectECStrip{ s.cache.Add(stripKey, ObjectECStrip{
@@ -210,3 +183,49 @@ func (s *StripIterator) sendToDataChan(entry dataChanEntry) bool {
return false return false
} }
} }

func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
// 如果需求的条带不当前正在下载的条带的位置不符合,则需要重新打开下载流
if s.downloadingStream == nil || s.downloadingStripIndex != stripIndex {
if s.downloadingStream != nil {
s.downloadingStream.Close()
s.downloadingPlanCtxCancel()
}

ft := ioswitch2.NewFromTo()
for _, b := range s.blocks {
node := b.Node
ft.AddFrom(ioswitch2.NewFromNode(b.Block.FileHash, &node, b.Block.Index))
}

toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{
Offset: s.curStripIndex * s.red.StripSize(),
})
ft.AddTo(toExec)

parser := parser.NewParser(*s.red)
plans := exec.NewPlanBuilder()
err := parser.Parse(ft, plans)
if err != nil {
return 0, err
}
exec := plans.Execute()

ctx, cancel := context.WithCancel(context.Background())
go exec.Wait(ctx)

str, err := exec.BeginRead(hd)
if err != nil {
cancel()
return 0, err
}

s.downloadingStream = str
s.downloadingStripIndex = stripIndex
s.downloadingPlanCtxCancel = cancel
}

n, err := io.ReadFull(s.downloadingStream, buf)
s.downloadingStripIndex += 1
return n, err
}

Loading…
Cancel
Save