Browse Source

Merge pull request '自适应冗余方式基础机制' (#33) from feature_gxh into master

pull/36/head
baohan 1 year ago
parent
commit
18a5fbbd5c
13 changed files with 287 additions and 233 deletions
  1. +1
    -1
      sdks/imfs/imfs_test.go
  2. +8
    -8
      sdks/imfs/package.go
  3. +5
    -4
      sdks/scheduler/jobset.go
  4. +7
    -8
      sdks/scheduler/models.go
  5. +7
    -40
      sdks/storage/cache.go
  6. +65
    -115
      sdks/storage/models.go
  7. +36
    -0
      sdks/storage/object.go
  8. +13
    -14
      sdks/storage/package.go
  9. +12
    -13
      sdks/storage/storage.go
  10. +5
    -29
      sdks/storage/storage_test.go
  11. +61
    -0
      utils/io/clone.go
  12. +66
    -0
      utils/io/io_test.go
  13. +1
    -1
      utils/serder/union_handler.go

+ 1
- 1
sdks/imfs/imfs_test.go View File

@@ -32,7 +32,7 @@ func Test_Package(t *testing.T) {
URL: "http://localhost:7893",
})

_, err := cli.PackageGetWithObjectCacheInfos(PackageGetWithObjectCacheInfos{UserID: 0, PackageID: 13})
_, err := cli.PackageGetWithObjects(PackageGetWithObjectsInfos{UserID: 0, PackageID: 13})
So(err, ShouldBeNil)
})
}

+ 8
- 8
sdks/imfs/package.go View File

@@ -8,19 +8,19 @@ import (
myhttp "gitlink.org.cn/cloudream/common/utils/http"
)

const PackageGetWithObjectCacheInfosPath = "/package/getWithObjectCacheInfos"
const PackageGetWithObjectsPath = "/package/getWithObjects"

type PackageGetWithObjectCacheInfos struct {
type PackageGetWithObjectsInfos struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type PackageGetWithObjectCacheInfosResp struct {
Package cdssdk.Package `json:"package"`
ObjectCacheInfos []cdssdk.ObjectCacheInfo `json:"objectCacheInfos"`
type PackageGetWithObjectsResp struct {
Package cdssdk.Package `json:"package"`
Objects []cdssdk.Object `json:"objects"`
}

func (c *Client) PackageGetWithObjectCacheInfos(req PackageGetWithObjectCacheInfos) (*PackageGetWithObjectCacheInfosResp, error) {
url, err := url.JoinPath(c.baseURL, PackageGetWithObjectCacheInfosPath)
func (c *Client) PackageGetWithObjects(req PackageGetWithObjectsInfos) (*PackageGetWithObjectsResp, error) {
url, err := url.JoinPath(c.baseURL, PackageGetWithObjectsPath)
if err != nil {
return nil, err
}
@@ -32,7 +32,7 @@ func (c *Client) PackageGetWithObjectCacheInfos(req PackageGetWithObjectCacheInf
return nil, err
}

jsonResp, err := myhttp.ParseJSONResponse[response[PackageGetWithObjectCacheInfosResp]](resp)
jsonResp, err := myhttp.ParseJSONResponse[response[PackageGetWithObjectsResp]](resp)
if err != nil {
return nil, err
}


+ 5
- 4
sdks/scheduler/jobset.go View File

@@ -6,6 +6,7 @@ import (
"strings"

"gitlink.org.cn/cloudream/common/consts/errorcode"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)
@@ -50,10 +51,10 @@ func (c *Client) JobSetSumbit(req JobSetSumbitReq) (*JobSetSumbitResp, error) {
}

type JobSetLocalFileUploadedReq struct {
JobSetID string `json:"jobSetID"`
LocalPath string `json:"localPath"`
Error string `json:"error"`
PackageID int64 `json:"packageID"`
JobSetID string `json:"jobSetID"`
LocalPath string `json:"localPath"`
Error string `json:"error"`
PackageID cdssdk.PackageID `json:"packageID"`
}

func (c *Client) JobSetLocalFileUploaded(req JobSetLocalFileUploadedReq) error {


+ 7
- 8
sdks/scheduler/models.go View File

@@ -59,10 +59,9 @@ type NormalJobInfo struct {
type ResourceJobInfo struct {
serder.Metadata `union:"Resource"`
JobInfoBase
Type string `json:"type"`
BucketID int64 `json:"bucketID"`
Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"`
TargetLocalJobID string `json:"targetLocalJobID"`
Type string `json:"type"`
BucketID cdssdk.BucketID `json:"bucketID"`
TargetLocalJobID string `json:"targetLocalJobID"`
}

type JobFilesInfo struct {
@@ -90,8 +89,8 @@ func (i *JobFileInfoBase) Noop() {}
type PackageJobFileInfo struct {
serder.Metadata `union:"Package"`
JobFileInfoBase
Type string `json:"type"`
PackageID int64 `json:"packageID"`
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type LocalJobFileInfo struct {
@@ -141,6 +140,6 @@ type JobSetFilesUploadScheme struct {
}

type LocalFileUploadScheme struct {
LocalPath string `json:"localPath"`
UploadToCDSNodeID *int64 `json:"uploadToCDSNodeID"`
LocalPath string `json:"localPath"`
UploadToCDSNodeID *cdssdk.NodeID `json:"uploadToCDSNodeID"`
}

+ 7
- 40
sdks/storage/cache.go View File

@@ -7,17 +7,17 @@ import (
myhttp "gitlink.org.cn/cloudream/common/utils/http"
)

var CacheMovePackagePath = "/cache/movePackage"

type CacheMovePackageReq struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
NodeID int64 `json:"nodeID"`
}
type CacheMovePackageResp struct {
CacheInfos []ObjectCacheInfo `json:"cacheInfos"`
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
NodeID NodeID `json:"nodeID"`
}
type CacheMovePackageResp struct{}

func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageResp, error) {
url, err := url.JoinPath(c.baseURL, "/cache/movePackage")
url, err := url.JoinPath(c.baseURL, CacheMovePackagePath)
if err != nil {
return nil, err
}
@@ -40,36 +40,3 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes

return nil, jsonResp.ToError()
}

type CacheGetPackageObjectCacheInfosReq struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type CacheGetPackageObjectCacheInfosResp struct {
Infos []ObjectCacheInfo `json:"cacheInfos"`
}

func (c *Client) CacheGetPackageObjectCacheInfos(req CacheGetPackageObjectCacheInfosReq) (*CacheGetPackageObjectCacheInfosResp, error) {
url, err := url.JoinPath(c.baseURL, "/cache/getPackageObjectCacheInfos")
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

jsonResp, err := myhttp.ParseJSONResponse[response[CacheGetPackageObjectCacheInfosResp]](resp)
if err != nil {
return nil, err
}

if jsonResp.Code == errorcode.OK {
return &jsonResp.Data, nil
}

return nil, jsonResp.ToError()
}

+ 65
- 115
sdks/storage/models.go View File

@@ -1,9 +1,10 @@
package cdssdk

import (
"database/sql/driver"
"fmt"

myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/serder"
)

@@ -11,152 +12,101 @@ const (
ObjectPathSeperator = "/"
)

/// TODO 将分散在各处的公共结构体定义集中到这里来
type NodeID int64

const (
RedundancyRep = "rep"
RedundancyEC = "ec"
)
type PackageID int64

// 冗余模式的描述信息。
// 注:如果在mq中的消息结构体使用了此类型,记得使用RegisterTypeSet注册相关的类型。
type RedundancyInfo interface{}
type RedundancyInfoConst interface {
RepRedundancyInfo | ECRedundancyInfo
}
type RepRedundancyInfo struct {
RepCount int `json:"repCount"`
}
type ObjectID int64

func NewRepRedundancyInfo(repCount int) RepRedundancyInfo {
return RepRedundancyInfo{
RepCount: repCount,
}
}
type UserID int64

type ECRedundancyInfo struct {
ECName string `json:"ecName"`
ChunkSize int `json:"chunkSize"`
}
type BucketID int64

func NewECRedundancyInfo(ecName string, chunkSize int) ECRedundancyInfo {
return ECRedundancyInfo{
ECName: ecName,
ChunkSize: chunkSize,
}
}

type TypedRedundancyInfo struct {
Type string `json:"type"`
Info RedundancyInfo `json:"info"`
}
type StorageID int64

func NewTypedRedundancyInfo[T RedundancyInfoConst](info T) TypedRedundancyInfo {
var typ string
type LocationID int64

if myreflect.TypeOf[T]() == myreflect.TypeOf[RepRedundancyInfo]() {
typ = RedundancyRep
} else if myreflect.TypeOf[T]() == myreflect.TypeOf[ECRedundancyInfo]() {
typ = RedundancyEC
}
/// TODO 将分散在各处的公共结构体定义集中到这里来

return TypedRedundancyInfo{
Type: typ,
Info: info,
}
}
func NewTypedRepRedundancyInfo(repCount int) TypedRedundancyInfo {
return TypedRedundancyInfo{
Type: RedundancyRep,
Info: RepRedundancyInfo{
RepCount: repCount,
},
}
type Redundancy interface {
driver.Valuer
}

func NewTypedECRedundancyInfo(ecName string, chunkSize int) TypedRedundancyInfo {
return TypedRedundancyInfo{
Type: RedundancyRep,
Info: ECRedundancyInfo{
ECName: ecName,
ChunkSize: chunkSize,
},
}
}
var RedundancyUnion = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Redundancy](
(*RepRedundancy)(nil),
(*ECRedundancy)(nil),
)), "type")

func (i *TypedRedundancyInfo) IsRepInfo() bool {
return i.Type == RedundancyRep
type RepRedundancy struct {
serder.Metadata `union:"rep"`
Type string `json:"type"`
}

func (i *TypedRedundancyInfo) IsECInfo() bool {
return i.Type == RedundancyEC
func NewRepRedundancy() *RepRedundancy {
return &RepRedundancy{
Type: "rep",
}
}

func (i *TypedRedundancyInfo) ToRepInfo() (RepRedundancyInfo, error) {
var info RepRedundancyInfo
err := serder.AnyToAny(i.Info, &info)
return info, err
func (b *RepRedundancy) Value() (driver.Value, error) {
return serder.ObjectToJSONEx[Redundancy](b)
}

func (i *TypedRedundancyInfo) ToECInfo() (ECRedundancyInfo, error) {
var info ECRedundancyInfo
err := serder.AnyToAny(i.Info, &info)
return info, err
type ECRedundancy struct {
serder.Metadata `union:"ec"`
Type string `json:"type"`
K int `json:"k"`
N int `json:"n"`
ChunkSize int `json:"chunkSize"`
}

func (i *TypedRedundancyInfo) Scan(src interface{}) error {
data, ok := src.([]uint8)
if !ok {
return fmt.Errorf("unknow src type: %v", myreflect.TypeOfValue(data))
func NewECRedundancy(k int, n int, chunkSize int) *ECRedundancy {
return &ECRedundancy{
Type: "ec",
K: k,
N: n,
ChunkSize: chunkSize,
}

return serder.JSONToObject(data, i)
}

type NodePackageCachingInfo struct {
NodeID int64 `json:"nodeID"`
FileSize int64 `json:"fileSize"`
ObjectCount int64 `json:"objectCount"`
func (b *ECRedundancy) Value() (driver.Value, error) {
return serder.ObjectToJSONEx[Redundancy](b)
}

type PackageCachingInfo struct {
NodeInfos []NodePackageCachingInfo `json:"nodeInfos"`
PackageSize int64 `json:"packageSize"`
RedunancyType string `json:"redunancyType"`
}
const (
PackageStateNormal = "Normal"
PackageStateDeleted = "Deleted"
)

func NewPackageCachingInfo(nodeInfos []NodePackageCachingInfo, packageSize int64, redunancyType string) PackageCachingInfo {
return PackageCachingInfo{
NodeInfos: nodeInfos,
PackageSize: packageSize,
RedunancyType: redunancyType,
}
type Package struct {
PackageID PackageID `db:"PackageID" json:"packageID"`
Name string `db:"Name" json:"name"`
BucketID BucketID `db:"BucketID" json:"bucketID"`
State string `db:"State" json:"state"`
}

type Object struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
PackageID int64 `db:"PackageID" json:"packageID"`
Path string `db:"Path" json:"path"`
Size int64 `db:"Size" json:"size,string"`
ObjectID ObjectID `db:"ObjectID" json:"objectID"`
PackageID PackageID `db:"PackageID" json:"packageID"`
Path string `db:"Path" json:"path"`
Size int64 `db:"Size" json:"size,string"`
FileHash string `db:"FileHash" json:"fileHash"`
Redundancy Redundancy `db:"Redundancy" json:"redundancy"`
}

type Package struct {
PackageID int64 `db:"PackageID" json:"packageID"`
Name string `db:"Name" json:"name"`
BucketID int64 `db:"BucketID" json:"bucketID"`
State string `db:"State" json:"state"`
Redundancy TypedRedundancyInfo `db:"Redundancy" json:"redundancy"`
type NodePackageCachingInfo struct {
NodeID NodeID `json:"nodeID"`
FileSize int64 `json:"fileSize"`
ObjectCount int64 `json:"objectCount"`
}

type ObjectCacheInfo struct {
Object Object `json:"object"`
FileHash string `json:"fileHash"`
type PackageCachingInfo struct {
NodeInfos []NodePackageCachingInfo `json:"nodeInfos"`
PackageSize int64 `json:"packageSize"`
}

func NewObjectCacheInfo(object Object, fileHash string) ObjectCacheInfo {
return ObjectCacheInfo{
Object: object,
FileHash: fileHash,
func NewPackageCachingInfo(nodeInfos []NodePackageCachingInfo, packageSize int64) PackageCachingInfo {
return PackageCachingInfo{
NodeInfos: nodeInfos,
PackageSize: packageSize,
}
}



+ 36
- 0
sdks/storage/object.go View File

@@ -6,6 +6,7 @@ import (
"net/url"
"strings"

"gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)
@@ -45,3 +46,38 @@ func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) {

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

var ObjectGetPackageObjectsPath = "/object/getPackageObjects"

type ObjectGetPackageObjectsReq struct {
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
}
type ObjectGetPackageObjectsResp struct {
Objects []Object `json:"objects"`
}

func (c *Client) ObjectGetPackageObjects(req ObjectGetPackageObjectsReq) (*ObjectGetPackageObjectsResp, error) {
url, err := url.JoinPath(c.baseURL, ObjectGetPackageObjectsPath)
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

jsonResp, err := myhttp.ParseJSONResponse[response[ObjectGetPackageObjectsResp]](resp)
if err != nil {
return nil, err
}

if jsonResp.Code == errorcode.OK {
return &jsonResp.Data, nil
}

return nil, jsonResp.ToError()
}

+ 13
- 14
sdks/storage/package.go View File

@@ -13,8 +13,8 @@ import (
)

type PackageGetReq struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
}
type PackageGetResp struct {
Package
@@ -46,11 +46,10 @@ func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) {
}

type PackageUploadReq struct {
UserID int64 `json:"userID"`
BucketID int64 `json:"bucketID"`
UserID UserID `json:"userID"`
BucketID BucketID `json:"bucketID"`
Name string `json:"name"`
Redundancy TypedRedundancyInfo `json:"redundancy"`
NodeAffinity *int64 `json:"nodeAffinity"`
NodeAffinity *NodeID `json:"nodeAffinity"`
Files PackageUploadFileIterator `json:"-"`
}

@@ -62,7 +61,7 @@ type IterPackageUploadFile struct {
type PackageUploadFileIterator = iterator.Iterator[*IterPackageUploadFile]

type PackageUploadResp struct {
PackageID int64 `json:"packageID,string"`
PackageID PackageID `json:"packageID,string"`
}

func (c *Client) PackageUpload(req PackageUploadReq) (*PackageUploadResp, error) {
@@ -109,8 +108,8 @@ func (c *Client) PackageUpload(req PackageUploadReq) (*PackageUploadResp, error)
}

type PackageDeleteReq struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
}

func (c *Client) PackageDelete(req PackageDeleteReq) error {
@@ -145,8 +144,8 @@ func (c *Client) PackageDelete(req PackageDeleteReq) error {
}

type PackageGetCachedNodesReq struct {
PackageID int64 `json:"packageID"`
UserID int64 `json:"userID"`
PackageID PackageID `json:"packageID"`
UserID UserID `json:"userID"`
}

type PackageGetCachedNodesResp struct {
@@ -183,12 +182,12 @@ func (c *Client) PackageGetCachedNodes(req PackageGetCachedNodesReq) (*PackageGe
}

type PackageGetLoadedNodesReq struct {
PackageID int64 `json:"packageID"`
UserID int64 `json:"userID"`
PackageID PackageID `json:"packageID"`
UserID UserID `json:"userID"`
}

type PackageGetLoadedNodesResp struct {
NodeIDs []int64 `json:"nodeIDs"`
NodeIDs []NodeID `json:"nodeIDs"`
}

func (c *Client) PackageGetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageGetLoadedNodesResp, error) {


+ 12
- 13
sdks/storage/storage.go View File

@@ -11,9 +11,9 @@ import (
)

type StorageLoadPackageReq struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
StorageID int64 `json:"storageID"`
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
StorageID StorageID `json:"storageID"`
}
type StorageLoadPackageResp struct {
FullPath string `json:"fullPath"`
@@ -45,16 +45,15 @@ func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPack
}

type StorageCreatePackageReq struct {
UserID int64 `json:"userID"`
StorageID int64 `json:"storageID"`
Path string `json:"path"`
BucketID int64 `json:"bucketID"`
Name string `json:"name"`
Redundancy TypedRedundancyInfo `json:"redundancy"`
UserID UserID `json:"userID"`
StorageID StorageID `json:"storageID"`
Path string `json:"path"`
BucketID BucketID `json:"bucketID"`
Name string `json:"name"`
}

type StorageCreatePackageResp struct {
PackageID int64 `json:"packageID"`
PackageID PackageID `json:"packageID"`
}

func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCreatePackageResp, error) {
@@ -88,12 +87,12 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea
}

type StorageGetInfoReq struct {
UserID int64 `json:"userID"`
StorageID int64 `json:"storageID"`
UserID UserID `json:"userID"`
StorageID StorageID `json:"storageID"`
}
type StorageGetInfoResp struct {
Name string `json:"name"`
NodeID int64 `json:"nodeID"`
NodeID NodeID `json:"nodeID"`
Directory string `json:"directory"`
}



+ 5
- 29
sdks/storage/storage_test.go View File

@@ -27,10 +27,6 @@ func Test_PackageGet(t *testing.T) {
UserID: 0,
BucketID: 1,
Name: pkgName,
Redundancy: TypedRedundancyInfo{
Type: RedundancyRep,
Info: NewRepRedundancyInfo(1),
},
Files: iterator.Array(
&IterPackageUploadFile{
Path: "test",
@@ -72,15 +68,11 @@ func Test_Object(t *testing.T) {
fileData[i] = byte(i)
}

nodeAff := int64(2)
nodeAff := NodeID(2)
upResp, err := cli.PackageUpload(PackageUploadReq{
UserID: 0,
BucketID: 1,
Name: uuid.NewString(),
Redundancy: TypedRedundancyInfo{
Type: RedundancyRep,
Info: NewRepRedundancyInfo(1),
},
UserID: 0,
BucketID: 1,
Name: uuid.NewString(),
NodeAffinity: &nodeAff,
Files: iterator.Array(
&IterPackageUploadFile{
@@ -129,10 +121,6 @@ func Test_Storage(t *testing.T) {
UserID: 0,
BucketID: 1,
Name: uuid.NewString(),
Redundancy: TypedRedundancyInfo{
Type: RedundancyRep,
Info: NewRepRedundancyInfo(1),
},
Files: iterator.Array(
&IterPackageUploadFile{
Path: "test",
@@ -176,10 +164,6 @@ func Test_Cache(t *testing.T) {
UserID: 0,
BucketID: 1,
Name: uuid.NewString(),
Redundancy: TypedRedundancyInfo{
Type: RedundancyRep,
Info: NewRepRedundancyInfo(1),
},
Files: iterator.Array(
&IterPackageUploadFile{
Path: "test.txt",
@@ -193,21 +177,13 @@ func Test_Cache(t *testing.T) {
})
So(err, ShouldBeNil)

cacheMoveResp, err := cli.CacheMovePackage(CacheMovePackageReq{
_, err = cli.CacheMovePackage(CacheMovePackageReq{
UserID: 0,
PackageID: upResp.PackageID,
NodeID: 1,
})
So(err, ShouldBeNil)

cacheInfoResp, err := cli.CacheGetPackageObjectCacheInfos(CacheGetPackageObjectCacheInfosReq{
UserID: 0,
PackageID: upResp.PackageID,
})
So(err, ShouldBeNil)

So(cacheInfoResp.Infos, ShouldResemble, cacheMoveResp.CacheInfos)

err = cli.PackageDelete(PackageDeleteReq{
UserID: 0,
PackageID: upResp.PackageID,


+ 61
- 0
utils/io/clone.go View File

@@ -0,0 +1,61 @@
package io

import (
"io"
)

// 复制一个流。注:返回的多个流的读取不能在同一个线程,且如果不再需要读取返回的某个流,那么必须关闭这个流,否则会阻塞其他流的读取。
func Clone(str io.Reader, count int) []io.ReadCloser {
prs := make([]io.ReadCloser, count)
pws := make([]*io.PipeWriter, count)

for i := 0; i < count; i++ {
prs[i], pws[i] = io.Pipe()
}

go func() {
pwCount := count
buf := make([]byte, 4096)
var closeErr error
for {
if pwCount == 0 {
return
}

rd, err := str.Read(buf)
for i := 0; i < count; i++ {
if pws[i] == nil {
continue
}

err := WriteAll(pws[i], buf[:rd])
if err != nil {
pws[i] = nil
pwCount--
}
}

if err == nil {
continue
}

closeErr = err
break
}

for i := 0; i < count; i++ {
if pws[i] == nil {
continue
}
pws[i].CloseWithError(closeErr)
}
}()

return prs
}

/*
func BufferedClone(str io.Reader, count int, bufSize int) []io.ReadCloser {

}
*/

+ 66
- 0
utils/io/io_test.go View File

@@ -3,6 +3,7 @@ package io
import (
"bytes"
"io"
"sync"
"testing"

. "github.com/smartystreets/goconvey/convey"
@@ -115,3 +116,68 @@ func Test_Length(t *testing.T) {
So(err, ShouldEqual, io.ErrUnexpectedEOF)
})
}

func Test_Clone(t *testing.T) {
Convey("所有输出流都会被读取完", t, func() {
data := []byte{1, 2, 3, 4, 5}
str := bytes.NewReader(data)

cloneds := Clone(str, 3)
reads := make([][]byte, 3)
errs := make([]error, 3)

wg := sync.WaitGroup{}
wg.Add(3)

go func() {
reads[0], errs[0] = io.ReadAll(cloneds[0])
wg.Done()
}()
go func() {
reads[1], errs[1] = io.ReadAll(cloneds[1])
wg.Done()
}()
go func() {
reads[2], errs[2] = io.ReadAll(cloneds[2])
wg.Done()
}()

wg.Wait()

So(reads, ShouldResemble, [][]byte{data, data, data})
So(errs, ShouldResemble, []error{nil, nil, nil})
})

Convey("其中一个流读到一半就停止读取", t, func() {
data := []byte{1, 2, 3, 4, 5}
str := bytes.NewReader(data)

cloneds := Clone(str, 3)
reads := make([][]byte, 3)
errs := make([]error, 3)

wg := sync.WaitGroup{}
wg.Add(3)

go func() {
reads[0], errs[0] = io.ReadAll(cloneds[0])
wg.Done()
}()
go func() {
buf := make([]byte, 3)
_, errs[1] = io.ReadFull(cloneds[1], buf)
reads[1] = buf
cloneds[1].Close()
wg.Done()
}()
go func() {
reads[2], errs[2] = io.ReadAll(cloneds[2])
wg.Done()
}()

wg.Wait()

So(reads, ShouldResemble, [][]byte{data, {1, 2, 3}, data})
So(errs, ShouldResemble, []error{nil, nil, nil})
})
}

+ 1
- 1
utils/serder/union_handler.go View File

@@ -22,7 +22,7 @@ type TypeUnionExternallyTagged[T any] struct {
TUnion *types.TypeUnion[T]
}

// 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。
// 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会根据解析出类型信息,还原出真实的类型。
// Externally Tagged的格式是:{ "类型名": {...对象内容...} }
//
// 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。


Loading…
Cancel
Save