Browse Source

自适应冗余方式基础机制

pull/33/head
Sydonian 1 year ago
parent
commit
b2228ce0fb
3 changed files with 137 additions and 7 deletions
  1. +10
    -7
      sdks/storage/models.go
  2. +61
    -0
      utils/io/clone.go
  3. +66
    -0
      utils/io/io_test.go

+ 10
- 7
sdks/storage/models.go View File

@@ -48,13 +48,11 @@ type RepRedundancy struct {
RedundancyBase
serder.Metadata `union:"rep"`
Type string `json:"type"`
RepCount int `json:"repCount"`
}

func NewRepRedundancy(repCount int) RepRedundancy {
return RepRedundancy{
Type: "rep",
RepCount: repCount,
func NewRepRedundancy() *RepRedundancy {
return &RepRedundancy{
Type: "rep",
}
}

@@ -67,8 +65,8 @@ type ECRedundancy struct {
ChunkSize int `json:"chunkSize"`
}

func NewECRedundancy(k int, n int, chunkSize int) ECRedundancy {
return ECRedundancy{
func NewECRedundancy(k int, n int, chunkSize int) *ECRedundancy {
return &ECRedundancy{
Type: "ec",
K: k,
N: n,
@@ -76,6 +74,11 @@ func NewECRedundancy(k int, n int, chunkSize int) ECRedundancy {
}
}

const (
PackageStateNormal = "Normal"
PackageStateDeleted = "Deleted"
)

type Package struct {
PackageID PackageID `db:"PackageID" json:"packageID"`
Name string `db:"Name" json:"name"`


+ 61
- 0
utils/io/clone.go View File

@@ -0,0 +1,61 @@
package io

import (
"io"
)

// 复制一个流。注:返回的多个流的读取不能在同一个线程,且如果不再需要读取返回的某个流,那么必须关闭这个流,否则会阻塞其他流的读取。
func Clone(str io.Reader, count int) []io.ReadCloser {
prs := make([]io.ReadCloser, count)
pws := make([]*io.PipeWriter, count)

for i := 0; i < count; i++ {
prs[i], pws[i] = io.Pipe()
}

go func() {
pwCount := count
buf := make([]byte, 4096)
var closeErr error
for {
if pwCount == 0 {
return
}

rd, err := str.Read(buf)
for i := 0; i < count; i++ {
if pws[i] == nil {
continue
}

err := WriteAll(pws[i], buf[:rd])
if err != nil {
pws[i] = nil
pwCount--
}
}

if err == nil {
continue
}

closeErr = err
break
}

for i := 0; i < count; i++ {
if pws[i] == nil {
continue
}
pws[i].CloseWithError(closeErr)
}
}()

return prs
}

/*
func BufferedClone(str io.Reader, count int, bufSize int) []io.ReadCloser {

}
*/

+ 66
- 0
utils/io/io_test.go View File

@@ -3,6 +3,7 @@ package io
import (
"bytes"
"io"
"sync"
"testing"

. "github.com/smartystreets/goconvey/convey"
@@ -115,3 +116,68 @@ func Test_Length(t *testing.T) {
So(err, ShouldEqual, io.ErrUnexpectedEOF)
})
}

func Test_Clone(t *testing.T) {
Convey("所有输出流都会被读取完", t, func() {
data := []byte{1, 2, 3, 4, 5}
str := bytes.NewReader(data)

cloneds := Clone(str, 3)
reads := make([][]byte, 3)
errs := make([]error, 3)

wg := sync.WaitGroup{}
wg.Add(3)

go func() {
reads[0], errs[0] = io.ReadAll(cloneds[0])
wg.Done()
}()
go func() {
reads[1], errs[1] = io.ReadAll(cloneds[1])
wg.Done()
}()
go func() {
reads[2], errs[2] = io.ReadAll(cloneds[2])
wg.Done()
}()

wg.Wait()

So(reads, ShouldResemble, [][]byte{data, data, data})
So(errs, ShouldResemble, []error{nil, nil, nil})
})

Convey("其中一个流读到一半就停止读取", t, func() {
data := []byte{1, 2, 3, 4, 5}
str := bytes.NewReader(data)

cloneds := Clone(str, 3)
reads := make([][]byte, 3)
errs := make([]error, 3)

wg := sync.WaitGroup{}
wg.Add(3)

go func() {
reads[0], errs[0] = io.ReadAll(cloneds[0])
wg.Done()
}()
go func() {
buf := make([]byte, 3)
_, errs[1] = io.ReadFull(cloneds[1], buf)
reads[1] = buf
cloneds[1].Close()
wg.Done()
}()
go func() {
reads[2], errs[2] = io.ReadAll(cloneds[2])
wg.Done()
}()

wg.Wait()

So(reads, ShouldResemble, [][]byte{data, {1, 2, 3}, data})
So(errs, ShouldResemble, []error{nil, nil, nil})
})
}

Loading…
Cancel
Save