package io2 import ( "io" "sync" "time" "gitlink.org.cn/cloudream/common/utils/math2" ) type RingBufferStats struct { MaxWaitDataTime time.Duration // 外部读取数据时的最长等待时间 MaxWaitFreeSpaceTime time.Duration // 从数据源读取数据之前,等待空闲空间的最长时间 TotalWaitDataTime time.Duration // 总等待读取数据的时间 TotalWaitFreeSpaceTime time.Duration // 总等待空闲空间的时间 } type RingBuffer struct { buf []byte src io.ReadCloser maxPreRead int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降 err error isReading bool writePos int // 指向下一次写入的位置,应该是一个空位 readPos int // 执行下一次读取的位置,应该是有效数据 waitReading *sync.Cond waitComsuming *sync.Cond stats RingBufferStats } type RingBufferConfig struct { MaxPreReading int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降 } func Ring(src io.ReadCloser, size int, cfg ...RingBufferConfig) *RingBuffer { c := RingBufferConfig{ MaxPreReading: size / 4, } if len(cfg) > 0 { c = cfg[0] } lk := &sync.Mutex{} return &RingBuffer{ buf: make([]byte, size), src: src, maxPreRead: c.MaxPreReading, waitReading: sync.NewCond(lk), waitComsuming: sync.NewCond(lk), } } func (r *RingBuffer) Stats() RingBufferStats { return r.stats } func (r *RingBuffer) Read(p []byte) (n int, err error) { r.waitReading.L.Lock() if !r.isReading { go r.reading() r.isReading = true } for r.writePos == r.readPos { if r.err != nil { r.waitReading.L.Unlock() return 0, r.err } startTime := time.Now() r.waitReading.Wait() dt := time.Since(startTime) r.stats.MaxWaitDataTime = math2.Max(r.stats.MaxWaitDataTime, dt) r.stats.TotalWaitDataTime += dt } writePos := r.writePos readPos := r.readPos r.waitReading.L.Unlock() if readPos < writePos { maxRead := math2.Min(r.maxPreRead, writePos-readPos) n = copy(p, r.buf[readPos:readPos+maxRead]) } else { maxRead := math2.Min(r.maxPreRead, len(r.buf)-readPos) n = copy(p, r.buf[readPos:readPos+maxRead]) } r.waitComsuming.L.Lock() r.readPos = (r.readPos + n) % len(r.buf) r.waitComsuming.L.Unlock() r.waitComsuming.Broadcast() err = nil return } func (r *RingBuffer) Close() error { r.src.Close() r.waitComsuming.L.Lock() r.err = io.ErrClosedPipe r.waitComsuming.L.Unlock() r.waitComsuming.Broadcast() r.waitReading.Broadcast() return nil } func (r *RingBuffer) reading() { defer r.src.Close() for { r.waitComsuming.L.Lock() // writePos不能和readPos重合,因为无法区分缓冲区是已经满了,还是完全是空的 // 所以writePos最多能到readPos的前一格 for (r.writePos+1)%len(r.buf) == r.readPos { startTime := time.Now() r.waitComsuming.Wait() dt := time.Since(startTime) r.stats.MaxWaitFreeSpaceTime = math2.Max(r.stats.MaxWaitFreeSpaceTime, dt) r.stats.TotalWaitFreeSpaceTime += dt if r.err != nil { return } } writePos := r.writePos readPos := r.readPos r.waitComsuming.L.Unlock() var n int var err error if readPos <= writePos { // 同上理,写入数据的时候如果readPos为0,则它的前一格是底层缓冲区的最后一格 // 那就不能写入到这一格 if readPos == 0 { maxWrite := math2.Min(r.maxPreRead, len(r.buf)-1-writePos) n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) } else { maxWrite := math2.Min(r.maxPreRead, len(r.buf)-writePos) n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) } } else { maxWrite := math2.Min(r.maxPreRead, readPos-1-writePos) n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) } // 无论成功还是失败,都发送一下信号通知读取端 r.waitReading.L.Lock() r.err = err r.writePos = (r.writePos + n) % len(r.buf) r.waitReading.L.Unlock() r.waitReading.Broadcast() if err != nil { break } } }