From 4a39bb22a371adb41f1c206f628fedd613080ac7 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 8 Aug 2024 15:01:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B7=A5=E5=85=B7=E5=87=BD?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/io2/clone.go | 20 ++++++++++++------- utils/io2/io.go | 7 ++----- utils/io2/join.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/utils/io2/clone.go b/utils/io2/clone.go index 3faacaf..cec7d80 100644 --- a/utils/io2/clone.go +++ b/utils/io2/clone.go @@ -2,6 +2,7 @@ package io2 import ( "io" + "sync" ) // 复制一个流。注:返回的多个流的读取不能在同一个线程,且如果不再需要读取返回的某个流,那么必须关闭这个流,否则会阻塞其他流的读取。 @@ -15,7 +16,7 @@ func Clone(str io.Reader, count int) []io.ReadCloser { go func() { pwCount := count - buf := make([]byte, 4096) + buf := make([]byte, 1024*16) var closeErr error for { if pwCount == 0 { @@ -23,17 +24,22 @@ func Clone(str io.Reader, count int) []io.ReadCloser { } rd, err := str.Read(buf) + wg := sync.WaitGroup{} for i := 0; i < count; i++ { if pws[i] == nil { continue } - - err := WriteAll(pws[i], buf[:rd]) - if err != nil { - pws[i] = nil - pwCount-- - } + wg.Add(1) + go func(i int) { + defer wg.Done() + err := WriteAll(pws[i], buf[:rd]) + if err != nil { + pws[i] = nil + pwCount-- + } + }(i) } + wg.Wait() if err == nil { continue diff --git a/utils/io2/io.go b/utils/io2/io.go index 4228e38..8a0d113 100644 --- a/utils/io2/io.go +++ b/utils/io2/io.go @@ -188,14 +188,11 @@ func ToReaders(strs []io.ReadCloser) ([]io.Reader, func()) { } } -func DropWithBuf(str io.Reader, buf []byte) error { +func DropWithBuf(str io.Reader, buf []byte) { for { _, err := str.Read(buf) - if err == io.EOF { - return nil - } if err != nil { - return err + break } } } diff --git a/utils/io2/join.go b/utils/io2/join.go index c7d46ef..0448471 100644 --- a/utils/io2/join.go +++ b/utils/io2/join.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/common/utils/sync2" ) func Join(strs []io.Reader) io.ReadCloser { @@ -105,3 +106,51 @@ func ChunkedJoin(inputs []io.Reader, chunkSize int) io.ReadCloser { chunkSize: chunkSize, } } + +type bufferedChunkedJoin struct { + inputs []io.Reader + buffer []byte + chunkSize int + currentRead int + err error +} + +func (s *bufferedChunkedJoin) Read(buf []byte) (int, error) { + if s.err != nil { + return 0, s.err + } + + if s.currentRead == len(s.buffer) { + err := sync2.ParallelDo(s.inputs, func(input io.Reader, i int) error { + bufStart := i * s.chunkSize + _, err := io.ReadFull(input, s.buffer[bufStart:bufStart+s.chunkSize]) + return err + }) + if err == io.EOF { + return 0, io.EOF + } + if err != nil { + return 0, err + } + s.currentRead = 0 + } + + n := copy(buf, s.buffer[s.currentRead:]) + s.currentRead += n + return n, nil +} + +func (s *bufferedChunkedJoin) Close() error { + s.err = io.ErrClosedPipe + return nil +} + +func BufferedChunkedJoin(inputs []io.Reader, chunkSize int) io.ReadCloser { + buffer := make([]byte, len(inputs)*chunkSize) + return &bufferedChunkedJoin{ + inputs: inputs, + buffer: buffer, + chunkSize: chunkSize, + currentRead: len(buffer), + } +}