diff --git a/utils/io2/ring.go b/utils/io2/ring.go index 8f0ea5f..ca89bea 100644 --- a/utils/io2/ring.go +++ b/utils/io2/ring.go @@ -1,36 +1,49 @@ package io2 import ( - "fmt" "io" "sync" "time" + + "gitlink.org.cn/cloudream/common/utils/math2" ) -type RingBuffer2 struct { - buf []byte - src io.ReadCloser - err error - isReading bool - writePos int // 指向下一次写入的位置,应该是一个空位 - readPos int // 执行下一次读取的位置,应该是有效数据 - waitReading *sync.Cond - waitComsuming *sync.Cond - UpstreamName string - DownstreamName string +type RingBufferStats struct { + MaxWaitDataTime time.Duration // 外部读取数据时的最长等待时间 + MaxWaitFreeSpaceTime time.Duration // 从数据源读取数据之前,等待空闲空间的最长时间 + TotalWaitDataTime time.Duration // 总等待读取数据的时间 + TotalWaitFreeSpaceTime time.Duration // 总等待空闲空间的时间 +} + +type RingBuffer struct { + buf []byte + src io.ReadCloser + maxPerRead int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降 + err error + isReading bool + writePos int // 指向下一次写入的位置,应该是一个空位 + readPos int // 执行下一次读取的位置,应该是有效数据 + waitReading *sync.Cond + waitComsuming *sync.Cond + stats RingBufferStats } -func RingBuffer(src io.ReadCloser, size int) *RingBuffer2 { +func Ring(src io.ReadCloser, size int) *RingBuffer { lk := &sync.Mutex{} - return &RingBuffer2{ + return &RingBuffer{ buf: make([]byte, size), src: src, + maxPerRead: size / 4, waitReading: sync.NewCond(lk), waitComsuming: sync.NewCond(lk), } } -func (r *RingBuffer2) Read(p []byte) (n int, err error) { +func (r *RingBuffer) Stats() RingBufferStats { + return r.stats +} + +func (r *RingBuffer) Read(p []byte) (n int, err error) { r.waitReading.L.Lock() if !r.isReading { go r.reading() @@ -45,16 +58,20 @@ func (r *RingBuffer2) Read(p []byte) (n int, err error) { startTime := time.Now() r.waitReading.Wait() - fmt.Printf("%s wait data for %v\n", r.DownstreamName, time.Since(startTime)) + dt := time.Since(startTime) + r.stats.MaxWaitDataTime = math2.Max(r.stats.MaxWaitDataTime, dt) + r.stats.TotalWaitDataTime += dt } writePos := r.writePos readPos := r.readPos r.waitReading.L.Unlock() if readPos < writePos { - n = copy(p, r.buf[readPos:writePos]) + maxRead := math2.Min(r.maxPerRead, writePos-readPos) + n = copy(p, r.buf[readPos:readPos+maxRead]) } else { - n = copy(p, r.buf[readPos:]) + maxRead := math2.Min(r.maxPerRead, len(r.buf)-readPos) + n = copy(p, r.buf[readPos:readPos+maxRead]) } r.waitComsuming.L.Lock() @@ -66,7 +83,7 @@ func (r *RingBuffer2) Read(p []byte) (n int, err error) { return } -func (r *RingBuffer2) Close() error { +func (r *RingBuffer) Close() error { r.src.Close() r.waitComsuming.L.Lock() r.err = io.ErrClosedPipe @@ -76,7 +93,7 @@ func (r *RingBuffer2) Close() error { return nil } -func (r *RingBuffer2) reading() { +func (r *RingBuffer) reading() { defer r.src.Close() for { @@ -86,7 +103,9 @@ func (r *RingBuffer2) reading() { for (r.writePos+1)%len(r.buf) == r.readPos { startTime := time.Now() r.waitComsuming.Wait() - fmt.Printf("%s wait free space for %v\n", r.UpstreamName, time.Since(startTime)) + dt := time.Since(startTime) + r.stats.MaxWaitFreeSpaceTime = math2.Max(r.stats.MaxWaitFreeSpaceTime, dt) + r.stats.TotalWaitFreeSpaceTime += dt if r.err != nil { return @@ -102,12 +121,15 @@ func (r *RingBuffer2) reading() { // 同上理,写入数据的时候如果readPos为0,则它的前一格是底层缓冲区的最后一格 // 那就不能写入到这一格 if readPos == 0 { - n, err = r.src.Read(r.buf[writePos : len(r.buf)-1]) + maxWrite := math2.Min(r.maxPerRead, len(r.buf)-1-writePos) + n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) } else { - n, err = r.src.Read(r.buf[writePos:]) + maxWrite := math2.Min(r.maxPerRead, len(r.buf)-writePos) + n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) } } else { - n, err = r.src.Read(r.buf[writePos : readPos-1]) + maxWrite := math2.Min(r.maxPerRead, readPos-1-writePos) + n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) } // 无论成功还是失败,都发送一下信号通知读取端 diff --git a/utils/io2/ring_test.go b/utils/io2/ring_test.go index 3c50fd4..5a8cbb7 100644 --- a/utils/io2/ring_test.go +++ b/utils/io2/ring_test.go @@ -41,7 +41,7 @@ func (r *syncReader) Close() error { func Test_RingBuffer(t *testing.T) { Convey("写满读满", t, func() { - b := RingBuffer(io.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})), 4) + b := Ring(io.NopCloser(bytes.NewBuffer([]byte{1, 2, 3})), 4) ret := make([]byte, 3) n, err := b.Read(ret) @@ -53,7 +53,7 @@ func Test_RingBuffer(t *testing.T) { Convey("1+3+1", t, func() { sy := sync2.NewCounterCond(0) - b := RingBuffer(&syncReader{ + b := Ring(&syncReader{ data: [][]byte{ {1}, nil, @@ -84,7 +84,7 @@ func Test_RingBuffer(t *testing.T) { Convey("3+1+2", t, func() { sy := sync2.NewCounterCond(0) - b := RingBuffer(&syncReader{ + b := Ring(&syncReader{ data: [][]byte{ {1, 2, 3, 4, 5, 6}, },