Browse Source

增加工具函数

pull/48/head
Sydonian 1 year ago
parent
commit
4a39bb22a3
3 changed files with 64 additions and 12 deletions
  1. +13
    -7
      utils/io2/clone.go
  2. +2
    -5
      utils/io2/io.go
  3. +49
    -0
      utils/io2/join.go

+ 13
- 7
utils/io2/clone.go View File

@@ -2,6 +2,7 @@ package io2

import (
"io"
"sync"
)

// 复制一个流。注:返回的多个流的读取不能在同一个线程,且如果不再需要读取返回的某个流,那么必须关闭这个流,否则会阻塞其他流的读取。
@@ -15,7 +16,7 @@ func Clone(str io.Reader, count int) []io.ReadCloser {

go func() {
pwCount := count
buf := make([]byte, 4096)
buf := make([]byte, 1024*16)
var closeErr error
for {
if pwCount == 0 {
@@ -23,17 +24,22 @@ func Clone(str io.Reader, count int) []io.ReadCloser {
}

rd, err := str.Read(buf)
wg := sync.WaitGroup{}
for i := 0; i < count; i++ {
if pws[i] == nil {
continue
}

err := WriteAll(pws[i], buf[:rd])
if err != nil {
pws[i] = nil
pwCount--
}
wg.Add(1)
go func(i int) {
defer wg.Done()
err := WriteAll(pws[i], buf[:rd])
if err != nil {
pws[i] = nil
pwCount--
}
}(i)
}
wg.Wait()

if err == nil {
continue


+ 2
- 5
utils/io2/io.go View File

@@ -188,14 +188,11 @@ func ToReaders(strs []io.ReadCloser) ([]io.Reader, func()) {
}
}

func DropWithBuf(str io.Reader, buf []byte) error {
func DropWithBuf(str io.Reader, buf []byte) {
for {
_, err := str.Read(buf)
if err == io.EOF {
return nil
}
if err != nil {
return err
break
}
}
}

+ 49
- 0
utils/io2/join.go View File

@@ -5,6 +5,7 @@ import (

"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sync2"
)

func Join(strs []io.Reader) io.ReadCloser {
@@ -105,3 +106,51 @@ func ChunkedJoin(inputs []io.Reader, chunkSize int) io.ReadCloser {
chunkSize: chunkSize,
}
}

type bufferedChunkedJoin struct {
inputs []io.Reader
buffer []byte
chunkSize int
currentRead int
err error
}

func (s *bufferedChunkedJoin) Read(buf []byte) (int, error) {
if s.err != nil {
return 0, s.err
}

if s.currentRead == len(s.buffer) {
err := sync2.ParallelDo(s.inputs, func(input io.Reader, i int) error {
bufStart := i * s.chunkSize
_, err := io.ReadFull(input, s.buffer[bufStart:bufStart+s.chunkSize])
return err
})
if err == io.EOF {
return 0, io.EOF
}
if err != nil {
return 0, err
}
s.currentRead = 0
}

n := copy(buf, s.buffer[s.currentRead:])
s.currentRead += n
return n, nil
}

func (s *bufferedChunkedJoin) Close() error {
s.err = io.ErrClosedPipe
return nil
}

func BufferedChunkedJoin(inputs []io.Reader, chunkSize int) io.ReadCloser {
buffer := make([]byte, len(inputs)*chunkSize)
return &bufferedChunkedJoin{
inputs: inputs,
buffer: buffer,
chunkSize: chunkSize,
currentRead: len(buffer),
}
}

Loading…
Cancel
Save