You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

join.go 2.9 kB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package io2
  2. import (
  3. "io"
  4. "gitlink.org.cn/cloudream/common/utils/lo2"
  5. "gitlink.org.cn/cloudream/common/utils/math2"
  6. "gitlink.org.cn/cloudream/common/utils/sync2"
  7. )
  8. func Join(strs []io.Reader) io.ReadCloser {
  9. pr, pw := io.Pipe()
  10. go func() {
  11. var closeErr error
  12. buf := make([]byte, 4096)
  13. outer:
  14. for _, str := range strs {
  15. for {
  16. bufLen := len(buf)
  17. if bufLen == 0 {
  18. break outer
  19. }
  20. rd, err := str.Read(buf[:bufLen])
  21. if err != nil {
  22. if err != io.EOF {
  23. closeErr = err
  24. break outer
  25. }
  26. err = WriteAll(pw, buf[:rd])
  27. if err != nil {
  28. closeErr = err
  29. break outer
  30. }
  31. break
  32. }
  33. err = WriteAll(pw, buf[:rd])
  34. if err != nil {
  35. closeErr = err
  36. break outer
  37. }
  38. }
  39. }
  40. pw.CloseWithError(closeErr)
  41. }()
  42. return pr
  43. }
  44. type chunkedJoin struct {
  45. inputs []io.Reader
  46. chunkSize int
  47. currentInput int
  48. currentRead int
  49. err error
  50. }
  51. func (s *chunkedJoin) Read(buf []byte) (int, error) {
  52. if s.err != nil {
  53. return 0, s.err
  54. }
  55. if len(s.inputs) == 0 {
  56. return 0, io.EOF
  57. }
  58. bufLen := math2.Min(math2.Min(s.chunkSize, len(buf)), s.chunkSize-s.currentRead)
  59. rd, err := s.inputs[s.currentInput].Read(buf[:bufLen])
  60. if err == nil {
  61. s.currentRead += rd
  62. if s.currentRead == s.chunkSize {
  63. s.currentInput = (s.currentInput + 1) % len(s.inputs)
  64. s.currentRead = 0
  65. }
  66. return rd, nil
  67. }
  68. if err == io.EOF {
  69. s.inputs = lo2.RemoveAt(s.inputs, s.currentInput)
  70. // 此处不需要+1
  71. if len(s.inputs) > 0 {
  72. s.currentInput = s.currentInput % len(s.inputs)
  73. s.currentRead = 0
  74. }
  75. return rd, nil
  76. }
  77. s.err = err
  78. return rd, err
  79. }
  80. func (s *chunkedJoin) Close() error {
  81. s.err = io.ErrClosedPipe
  82. return nil
  83. }
  84. func ChunkedJoin(inputs []io.Reader, chunkSize int) io.ReadCloser {
  85. return &chunkedJoin{
  86. inputs: inputs,
  87. chunkSize: chunkSize,
  88. }
  89. }
  90. type bufferedChunkedJoin struct {
  91. inputs []io.Reader
  92. buffer []byte
  93. chunkSize int
  94. currentRead int
  95. err error
  96. }
  97. func (s *bufferedChunkedJoin) Read(buf []byte) (int, error) {
  98. if s.err != nil {
  99. return 0, s.err
  100. }
  101. if s.currentRead == len(s.buffer) {
  102. err := sync2.ParallelDo(s.inputs, func(input io.Reader, i int) error {
  103. bufStart := i * s.chunkSize
  104. _, err := io.ReadFull(input, s.buffer[bufStart:bufStart+s.chunkSize])
  105. return err
  106. })
  107. if err == io.EOF {
  108. return 0, io.EOF
  109. }
  110. if err != nil {
  111. return 0, err
  112. }
  113. s.currentRead = 0
  114. }
  115. n := copy(buf, s.buffer[s.currentRead:])
  116. s.currentRead += n
  117. return n, nil
  118. }
  119. func (s *bufferedChunkedJoin) Close() error {
  120. s.err = io.ErrClosedPipe
  121. return nil
  122. }
  123. func BufferedChunkedJoin(inputs []io.Reader, chunkSize int) io.ReadCloser {
  124. buffer := make([]byte, len(inputs)*chunkSize)
  125. return &bufferedChunkedJoin{
  126. inputs: inputs,
  127. buffer: buffer,
  128. chunkSize: chunkSize,
  129. currentRead: len(buffer),
  130. }
  131. }