You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

ring.go 4.2 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package io2
  2. import (
  3. "io"
  4. "sync"
  5. "time"
  6. "gitlink.org.cn/cloudream/common/utils/math2"
  7. )
  8. type RingBufferStats struct {
  9. MaxWaitDataTime time.Duration // 外部读取数据时的最长等待时间
  10. MaxWaitFreeSpaceTime time.Duration // 从数据源读取数据之前,等待空闲空间的最长时间
  11. TotalWaitDataTime time.Duration // 总等待读取数据的时间
  12. TotalWaitFreeSpaceTime time.Duration // 总等待空闲空间的时间
  13. }
  14. type RingBuffer struct {
  15. buf []byte
  16. src io.ReadCloser
  17. maxPreRead int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降
  18. err error
  19. isReading bool
  20. writePos int // 指向下一次写入的位置,应该是一个空位
  21. readPos int // 执行下一次读取的位置,应该是有效数据
  22. waitReading *sync.Cond
  23. waitComsuming *sync.Cond
  24. stats RingBufferStats
  25. }
  26. type RingBufferConfig struct {
  27. MaxPreReading int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降
  28. }
  29. func Ring(src io.ReadCloser, size int, cfg ...RingBufferConfig) *RingBuffer {
  30. c := RingBufferConfig{
  31. MaxPreReading: size / 4,
  32. }
  33. if len(cfg) > 0 {
  34. c = cfg[0]
  35. }
  36. lk := &sync.Mutex{}
  37. return &RingBuffer{
  38. buf: make([]byte, size),
  39. src: src,
  40. maxPreRead: c.MaxPreReading,
  41. waitReading: sync.NewCond(lk),
  42. waitComsuming: sync.NewCond(lk),
  43. }
  44. }
  45. func (r *RingBuffer) Stats() RingBufferStats {
  46. return r.stats
  47. }
  48. func (r *RingBuffer) Read(p []byte) (n int, err error) {
  49. r.waitReading.L.Lock()
  50. if !r.isReading {
  51. go r.reading()
  52. r.isReading = true
  53. }
  54. for r.writePos == r.readPos {
  55. if r.err != nil {
  56. r.waitReading.L.Unlock()
  57. return 0, r.err
  58. }
  59. startTime := time.Now()
  60. r.waitReading.Wait()
  61. dt := time.Since(startTime)
  62. r.stats.MaxWaitDataTime = math2.Max(r.stats.MaxWaitDataTime, dt)
  63. r.stats.TotalWaitDataTime += dt
  64. }
  65. writePos := r.writePos
  66. readPos := r.readPos
  67. r.waitReading.L.Unlock()
  68. if readPos < writePos {
  69. maxRead := math2.Min(r.maxPreRead, writePos-readPos)
  70. n = copy(p, r.buf[readPos:readPos+maxRead])
  71. } else {
  72. maxRead := math2.Min(r.maxPreRead, len(r.buf)-readPos)
  73. n = copy(p, r.buf[readPos:readPos+maxRead])
  74. }
  75. r.waitComsuming.L.Lock()
  76. r.readPos = (r.readPos + n) % len(r.buf)
  77. r.waitComsuming.L.Unlock()
  78. r.waitComsuming.Broadcast()
  79. err = nil
  80. return
  81. }
  82. func (r *RingBuffer) Close() error {
  83. r.src.Close()
  84. r.waitComsuming.L.Lock()
  85. r.err = io.ErrClosedPipe
  86. r.waitComsuming.L.Unlock()
  87. r.waitComsuming.Broadcast()
  88. r.waitReading.Broadcast()
  89. return nil
  90. }
  91. func (r *RingBuffer) reading() {
  92. defer r.src.Close()
  93. for {
  94. r.waitComsuming.L.Lock()
  95. // writePos不能和readPos重合,因为无法区分缓冲区是已经满了,还是完全是空的
  96. // 所以writePos最多能到readPos的前一格
  97. for (r.writePos+1)%len(r.buf) == r.readPos {
  98. startTime := time.Now()
  99. r.waitComsuming.Wait()
  100. dt := time.Since(startTime)
  101. r.stats.MaxWaitFreeSpaceTime = math2.Max(r.stats.MaxWaitFreeSpaceTime, dt)
  102. r.stats.TotalWaitFreeSpaceTime += dt
  103. if r.err != nil {
  104. return
  105. }
  106. }
  107. writePos := r.writePos
  108. readPos := r.readPos
  109. r.waitComsuming.L.Unlock()
  110. var n int
  111. var err error
  112. if readPos <= writePos {
  113. // 同上理,写入数据的时候如果readPos为0,则它的前一格是底层缓冲区的最后一格
  114. // 那就不能写入到这一格
  115. if readPos == 0 {
  116. maxWrite := math2.Min(r.maxPreRead, len(r.buf)-1-writePos)
  117. n, err = r.src.Read(r.buf[writePos : writePos+maxWrite])
  118. } else {
  119. maxWrite := math2.Min(r.maxPreRead, len(r.buf)-writePos)
  120. n, err = r.src.Read(r.buf[writePos : writePos+maxWrite])
  121. }
  122. } else {
  123. maxWrite := math2.Min(r.maxPreRead, readPos-1-writePos)
  124. n, err = r.src.Read(r.buf[writePos : writePos+maxWrite])
  125. }
  126. // 无论成功还是失败,都发送一下信号通知读取端
  127. r.waitReading.L.Lock()
  128. r.err = err
  129. r.writePos = (r.writePos + n) % len(r.buf)
  130. r.waitReading.L.Unlock()
  131. r.waitReading.Broadcast()
  132. if err != nil {
  133. break
  134. }
  135. }
  136. }