diff --git a/sdks/storage/models.go b/sdks/storage/models.go index 32b1572..a271990 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -48,13 +48,11 @@ type RepRedundancy struct { RedundancyBase serder.Metadata `union:"rep"` Type string `json:"type"` - RepCount int `json:"repCount"` } -func NewRepRedundancy(repCount int) RepRedundancy { - return RepRedundancy{ - Type: "rep", - RepCount: repCount, +func NewRepRedundancy() *RepRedundancy { + return &RepRedundancy{ + Type: "rep", } } @@ -67,8 +65,8 @@ type ECRedundancy struct { ChunkSize int `json:"chunkSize"` } -func NewECRedundancy(k int, n int, chunkSize int) ECRedundancy { - return ECRedundancy{ +func NewECRedundancy(k int, n int, chunkSize int) *ECRedundancy { + return &ECRedundancy{ Type: "ec", K: k, N: n, @@ -76,6 +74,11 @@ func NewECRedundancy(k int, n int, chunkSize int) ECRedundancy { } } +const ( + PackageStateNormal = "Normal" + PackageStateDeleted = "Deleted" +) + type Package struct { PackageID PackageID `db:"PackageID" json:"packageID"` Name string `db:"Name" json:"name"` diff --git a/utils/io/clone.go b/utils/io/clone.go new file mode 100644 index 0000000..b1cf556 --- /dev/null +++ b/utils/io/clone.go @@ -0,0 +1,61 @@ +package io + +import ( + "io" +) + +// 复制一个流。注:返回的多个流的读取不能在同一个线程,且如果不再需要读取返回的某个流,那么必须关闭这个流,否则会阻塞其他流的读取。 +func Clone(str io.Reader, count int) []io.ReadCloser { + prs := make([]io.ReadCloser, count) + pws := make([]*io.PipeWriter, count) + + for i := 0; i < count; i++ { + prs[i], pws[i] = io.Pipe() + } + + go func() { + pwCount := count + buf := make([]byte, 4096) + var closeErr error + for { + if pwCount == 0 { + return + } + + rd, err := str.Read(buf) + for i := 0; i < count; i++ { + if pws[i] == nil { + continue + } + + err := WriteAll(pws[i], buf[:rd]) + if err != nil { + pws[i] = nil + pwCount-- + } + } + + if err == nil { + continue + } + + closeErr = err + break + } + + for i := 0; i < count; i++ { + if pws[i] == nil { + continue + } + pws[i].CloseWithError(closeErr) + } + }() + + return prs +} + +/* +func BufferedClone(str io.Reader, count int, bufSize int) []io.ReadCloser { + +} +*/ diff --git a/utils/io/io_test.go b/utils/io/io_test.go index 3066aa9..a4eab09 100644 --- a/utils/io/io_test.go +++ b/utils/io/io_test.go @@ -3,6 +3,7 @@ package io import ( "bytes" "io" + "sync" "testing" . "github.com/smartystreets/goconvey/convey" @@ -115,3 +116,68 @@ func Test_Length(t *testing.T) { So(err, ShouldEqual, io.ErrUnexpectedEOF) }) } + +func Test_Clone(t *testing.T) { + Convey("所有输出流都会被读取完", t, func() { + data := []byte{1, 2, 3, 4, 5} + str := bytes.NewReader(data) + + cloneds := Clone(str, 3) + reads := make([][]byte, 3) + errs := make([]error, 3) + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + reads[0], errs[0] = io.ReadAll(cloneds[0]) + wg.Done() + }() + go func() { + reads[1], errs[1] = io.ReadAll(cloneds[1]) + wg.Done() + }() + go func() { + reads[2], errs[2] = io.ReadAll(cloneds[2]) + wg.Done() + }() + + wg.Wait() + + So(reads, ShouldResemble, [][]byte{data, data, data}) + So(errs, ShouldResemble, []error{nil, nil, nil}) + }) + + Convey("其中一个流读到一半就停止读取", t, func() { + data := []byte{1, 2, 3, 4, 5} + str := bytes.NewReader(data) + + cloneds := Clone(str, 3) + reads := make([][]byte, 3) + errs := make([]error, 3) + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + reads[0], errs[0] = io.ReadAll(cloneds[0]) + wg.Done() + }() + go func() { + buf := make([]byte, 3) + _, errs[1] = io.ReadFull(cloneds[1], buf) + reads[1] = buf + cloneds[1].Close() + wg.Done() + }() + go func() { + reads[2], errs[2] = io.ReadAll(cloneds[2]) + wg.Done() + }() + + wg.Wait() + + So(reads, ShouldResemble, [][]byte{data, {1, 2, 3}, data}) + So(errs, ShouldResemble, []error{nil, nil, nil}) + }) +}