You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

ring.go 2.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package io2
  2. import (
  3. "io"
  4. "sync"
  5. )
  6. type RingBuffer2 struct {
  7. buf []byte
  8. src io.ReadCloser
  9. err error
  10. isReading bool
  11. writePos int // 指向下一次写入的位置,应该是一个空位
  12. readPos int // 执行下一次读取的位置,应该是有效数据
  13. waitReading *sync.Cond
  14. waitComsuming *sync.Cond
  15. UpstreamName string
  16. DownstreamName string
  17. }
  18. func RingBuffer(src io.ReadCloser, size int) io.ReadCloser {
  19. lk := &sync.Mutex{}
  20. return &RingBuffer2{
  21. buf: make([]byte, size),
  22. src: src,
  23. waitReading: sync.NewCond(lk),
  24. waitComsuming: sync.NewCond(lk),
  25. }
  26. }
  27. func (r *RingBuffer2) Read(p []byte) (n int, err error) {
  28. r.waitReading.L.Lock()
  29. if !r.isReading {
  30. go r.reading()
  31. r.isReading = true
  32. }
  33. for r.writePos == r.readPos {
  34. if r.err != nil {
  35. r.waitReading.L.Unlock()
  36. return 0, r.err
  37. }
  38. // startTime := time.Now()
  39. r.waitReading.Wait()
  40. // fmt.Printf("%s wait data for %v\n", r.DownstreamName, time.Since(startTime))
  41. }
  42. writePos := r.writePos
  43. readPos := r.readPos
  44. r.waitReading.L.Unlock()
  45. if readPos < writePos {
  46. n = copy(p, r.buf[readPos:writePos])
  47. } else {
  48. n = copy(p, r.buf[readPos:])
  49. }
  50. r.waitComsuming.L.Lock()
  51. r.readPos = (r.readPos + n) % len(r.buf)
  52. r.waitComsuming.L.Unlock()
  53. r.waitComsuming.Broadcast()
  54. err = nil
  55. return
  56. }
  57. func (r *RingBuffer2) Close() error {
  58. r.src.Close()
  59. r.waitComsuming.Broadcast()
  60. r.waitReading.Broadcast()
  61. return nil
  62. }
  63. func (r *RingBuffer2) reading() {
  64. defer r.src.Close()
  65. for {
  66. r.waitComsuming.L.Lock()
  67. // writePos不能和readPos重合,因为无法区分缓冲区是已经满了,还是完全是空的
  68. // 所以writePos最多能到readPos的前一格
  69. for r.writePos+1 == r.readPos {
  70. r.waitComsuming.Wait()
  71. if r.err != nil {
  72. return
  73. }
  74. }
  75. writePos := r.writePos
  76. readPos := r.readPos
  77. r.waitComsuming.L.Unlock()
  78. var n int
  79. var err error
  80. if readPos <= writePos {
  81. // 同上理,写入数据的时候如果readPos为0,则它的前一格是底层缓冲区的最后一格
  82. // 那就不能写入到这一格
  83. if readPos == 0 {
  84. n, err = r.src.Read(r.buf[writePos : len(r.buf)-1])
  85. } else {
  86. n, err = r.src.Read(r.buf[writePos:])
  87. }
  88. } else if readPos > writePos {
  89. n, err = r.src.Read(r.buf[writePos:readPos])
  90. }
  91. // 无论成功还是失败,都发送一下信号通知读取端
  92. r.waitReading.L.Lock()
  93. r.err = err
  94. r.writePos = (r.writePos + n) % len(r.buf)
  95. r.waitReading.L.Unlock()
  96. r.waitReading.Broadcast()
  97. if err != nil {
  98. break
  99. }
  100. }
  101. }