diff --git a/sdks/imfs/imfs_test.go b/sdks/imfs/imfs_test.go index 3610cfc..22011a1 100644 --- a/sdks/imfs/imfs_test.go +++ b/sdks/imfs/imfs_test.go @@ -32,7 +32,7 @@ func Test_Package(t *testing.T) { URL: "http://localhost:7893", }) - _, err := cli.PackageGetWithObjectCacheInfos(PackageGetWithObjectCacheInfos{UserID: 0, PackageID: 13}) + _, err := cli.PackageGetWithObjects(PackageGetWithObjectsInfos{UserID: 0, PackageID: 13}) So(err, ShouldBeNil) }) } diff --git a/sdks/imfs/package.go b/sdks/imfs/package.go index ffbab30..f59a04b 100644 --- a/sdks/imfs/package.go +++ b/sdks/imfs/package.go @@ -8,19 +8,19 @@ import ( myhttp "gitlink.org.cn/cloudream/common/utils/http" ) -const PackageGetWithObjectCacheInfosPath = "/package/getWithObjectCacheInfos" +const PackageGetWithObjectsPath = "/package/getWithObjects" -type PackageGetWithObjectCacheInfos struct { +type PackageGetWithObjectsInfos struct { UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` } -type PackageGetWithObjectCacheInfosResp struct { - Package cdssdk.Package `json:"package"` - ObjectCacheInfos []cdssdk.ObjectCacheInfo `json:"objectCacheInfos"` +type PackageGetWithObjectsResp struct { + Package cdssdk.Package `json:"package"` + Objects []cdssdk.Object `json:"objects"` } -func (c *Client) PackageGetWithObjectCacheInfos(req PackageGetWithObjectCacheInfos) (*PackageGetWithObjectCacheInfosResp, error) { - url, err := url.JoinPath(c.baseURL, PackageGetWithObjectCacheInfosPath) +func (c *Client) PackageGetWithObjects(req PackageGetWithObjectsInfos) (*PackageGetWithObjectsResp, error) { + url, err := url.JoinPath(c.baseURL, PackageGetWithObjectsPath) if err != nil { return nil, err } @@ -32,7 +32,7 @@ func (c *Client) PackageGetWithObjectCacheInfos(req PackageGetWithObjectCacheInf return nil, err } - jsonResp, err := myhttp.ParseJSONResponse[response[PackageGetWithObjectCacheInfosResp]](resp) + jsonResp, err := myhttp.ParseJSONResponse[response[PackageGetWithObjectsResp]](resp) if err != nil { return nil, err } diff --git a/sdks/scheduler/jobset.go b/sdks/scheduler/jobset.go index 6d232f1..b266d68 100644 --- a/sdks/scheduler/jobset.go +++ b/sdks/scheduler/jobset.go @@ -6,6 +6,7 @@ import ( "strings" "gitlink.org.cn/cloudream/common/consts/errorcode" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myhttp "gitlink.org.cn/cloudream/common/utils/http" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -50,10 +51,10 @@ func (c *Client) JobSetSumbit(req JobSetSumbitReq) (*JobSetSumbitResp, error) { } type JobSetLocalFileUploadedReq struct { - JobSetID string `json:"jobSetID"` - LocalPath string `json:"localPath"` - Error string `json:"error"` - PackageID int64 `json:"packageID"` + JobSetID string `json:"jobSetID"` + LocalPath string `json:"localPath"` + Error string `json:"error"` + PackageID cdssdk.PackageID `json:"packageID"` } func (c *Client) JobSetLocalFileUploaded(req JobSetLocalFileUploadedReq) error { diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 29a2594..031a0ce 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -59,10 +59,9 @@ type NormalJobInfo struct { type ResourceJobInfo struct { serder.Metadata `union:"Resource"` JobInfoBase - Type string `json:"type"` - BucketID int64 `json:"bucketID"` - Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"` - TargetLocalJobID string `json:"targetLocalJobID"` + Type string `json:"type"` + BucketID cdssdk.BucketID `json:"bucketID"` + TargetLocalJobID string `json:"targetLocalJobID"` } type JobFilesInfo struct { @@ -90,8 +89,8 @@ func (i *JobFileInfoBase) Noop() {} type PackageJobFileInfo struct { serder.Metadata `union:"Package"` JobFileInfoBase - Type string `json:"type"` - PackageID int64 `json:"packageID"` + Type string `json:"type"` + PackageID cdssdk.PackageID `json:"packageID"` } type LocalJobFileInfo struct { @@ -141,6 +140,6 @@ type JobSetFilesUploadScheme struct { } type LocalFileUploadScheme struct { - LocalPath string `json:"localPath"` - UploadToCDSNodeID *int64 `json:"uploadToCDSNodeID"` + LocalPath string `json:"localPath"` + UploadToCDSNodeID *cdssdk.NodeID `json:"uploadToCDSNodeID"` } diff --git a/sdks/storage/cache.go b/sdks/storage/cache.go index 3deab7f..a6866a1 100644 --- a/sdks/storage/cache.go +++ b/sdks/storage/cache.go @@ -7,17 +7,17 @@ import ( myhttp "gitlink.org.cn/cloudream/common/utils/http" ) +var CacheMovePackagePath = "/cache/movePackage" + type CacheMovePackageReq struct { - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - NodeID int64 `json:"nodeID"` -} -type CacheMovePackageResp struct { - CacheInfos []ObjectCacheInfo `json:"cacheInfos"` + UserID UserID `json:"userID"` + PackageID PackageID `json:"packageID"` + NodeID NodeID `json:"nodeID"` } +type CacheMovePackageResp struct{} func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageResp, error) { - url, err := url.JoinPath(c.baseURL, "/cache/movePackage") + url, err := url.JoinPath(c.baseURL, CacheMovePackagePath) if err != nil { return nil, err } @@ -40,36 +40,3 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes return nil, jsonResp.ToError() } - -type CacheGetPackageObjectCacheInfosReq struct { - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` -} -type CacheGetPackageObjectCacheInfosResp struct { - Infos []ObjectCacheInfo `json:"cacheInfos"` -} - -func (c *Client) CacheGetPackageObjectCacheInfos(req CacheGetPackageObjectCacheInfosReq) (*CacheGetPackageObjectCacheInfosResp, error) { - url, err := url.JoinPath(c.baseURL, "/cache/getPackageObjectCacheInfos") - if err != nil { - return nil, err - } - - resp, err := myhttp.GetForm(url, myhttp.RequestParam{ - Query: req, - }) - if err != nil { - return nil, err - } - - jsonResp, err := myhttp.ParseJSONResponse[response[CacheGetPackageObjectCacheInfosResp]](resp) - if err != nil { - return nil, err - } - - if jsonResp.Code == errorcode.OK { - return &jsonResp.Data, nil - } - - return nil, jsonResp.ToError() -} diff --git a/sdks/storage/models.go b/sdks/storage/models.go index 6286843..12647bf 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -1,9 +1,10 @@ package cdssdk import ( + "database/sql/driver" "fmt" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -11,152 +12,101 @@ const ( ObjectPathSeperator = "/" ) -/// TODO 将分散在各处的公共结构体定义集中到这里来 +type NodeID int64 -const ( - RedundancyRep = "rep" - RedundancyEC = "ec" -) +type PackageID int64 -// 冗余模式的描述信息。 -// 注:如果在mq中的消息结构体使用了此类型,记得使用RegisterTypeSet注册相关的类型。 -type RedundancyInfo interface{} -type RedundancyInfoConst interface { - RepRedundancyInfo | ECRedundancyInfo -} -type RepRedundancyInfo struct { - RepCount int `json:"repCount"` -} +type ObjectID int64 -func NewRepRedundancyInfo(repCount int) RepRedundancyInfo { - return RepRedundancyInfo{ - RepCount: repCount, - } -} +type UserID int64 -type ECRedundancyInfo struct { - ECName string `json:"ecName"` - ChunkSize int `json:"chunkSize"` -} +type BucketID int64 -func NewECRedundancyInfo(ecName string, chunkSize int) ECRedundancyInfo { - return ECRedundancyInfo{ - ECName: ecName, - ChunkSize: chunkSize, - } -} - -type TypedRedundancyInfo struct { - Type string `json:"type"` - Info RedundancyInfo `json:"info"` -} +type StorageID int64 -func NewTypedRedundancyInfo[T RedundancyInfoConst](info T) TypedRedundancyInfo { - var typ string +type LocationID int64 - if myreflect.TypeOf[T]() == myreflect.TypeOf[RepRedundancyInfo]() { - typ = RedundancyRep - } else if myreflect.TypeOf[T]() == myreflect.TypeOf[ECRedundancyInfo]() { - typ = RedundancyEC - } +/// TODO 将分散在各处的公共结构体定义集中到这里来 - return TypedRedundancyInfo{ - Type: typ, - Info: info, - } -} -func NewTypedRepRedundancyInfo(repCount int) TypedRedundancyInfo { - return TypedRedundancyInfo{ - Type: RedundancyRep, - Info: RepRedundancyInfo{ - RepCount: repCount, - }, - } +type Redundancy interface { + driver.Valuer } -func NewTypedECRedundancyInfo(ecName string, chunkSize int) TypedRedundancyInfo { - return TypedRedundancyInfo{ - Type: RedundancyRep, - Info: ECRedundancyInfo{ - ECName: ecName, - ChunkSize: chunkSize, - }, - } -} +var RedundancyUnion = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Redundancy]( + (*RepRedundancy)(nil), + (*ECRedundancy)(nil), +)), "type") -func (i *TypedRedundancyInfo) IsRepInfo() bool { - return i.Type == RedundancyRep +type RepRedundancy struct { + serder.Metadata `union:"rep"` + Type string `json:"type"` } -func (i *TypedRedundancyInfo) IsECInfo() bool { - return i.Type == RedundancyEC +func NewRepRedundancy() *RepRedundancy { + return &RepRedundancy{ + Type: "rep", + } } - -func (i *TypedRedundancyInfo) ToRepInfo() (RepRedundancyInfo, error) { - var info RepRedundancyInfo - err := serder.AnyToAny(i.Info, &info) - return info, err +func (b *RepRedundancy) Value() (driver.Value, error) { + return serder.ObjectToJSONEx[Redundancy](b) } -func (i *TypedRedundancyInfo) ToECInfo() (ECRedundancyInfo, error) { - var info ECRedundancyInfo - err := serder.AnyToAny(i.Info, &info) - return info, err +type ECRedundancy struct { + serder.Metadata `union:"ec"` + Type string `json:"type"` + K int `json:"k"` + N int `json:"n"` + ChunkSize int `json:"chunkSize"` } -func (i *TypedRedundancyInfo) Scan(src interface{}) error { - data, ok := src.([]uint8) - if !ok { - return fmt.Errorf("unknow src type: %v", myreflect.TypeOfValue(data)) +func NewECRedundancy(k int, n int, chunkSize int) *ECRedundancy { + return &ECRedundancy{ + Type: "ec", + K: k, + N: n, + ChunkSize: chunkSize, } - - return serder.JSONToObject(data, i) } - -type NodePackageCachingInfo struct { - NodeID int64 `json:"nodeID"` - FileSize int64 `json:"fileSize"` - ObjectCount int64 `json:"objectCount"` +func (b *ECRedundancy) Value() (driver.Value, error) { + return serder.ObjectToJSONEx[Redundancy](b) } -type PackageCachingInfo struct { - NodeInfos []NodePackageCachingInfo `json:"nodeInfos"` - PackageSize int64 `json:"packageSize"` - RedunancyType string `json:"redunancyType"` -} +const ( + PackageStateNormal = "Normal" + PackageStateDeleted = "Deleted" +) -func NewPackageCachingInfo(nodeInfos []NodePackageCachingInfo, packageSize int64, redunancyType string) PackageCachingInfo { - return PackageCachingInfo{ - NodeInfos: nodeInfos, - PackageSize: packageSize, - RedunancyType: redunancyType, - } +type Package struct { + PackageID PackageID `db:"PackageID" json:"packageID"` + Name string `db:"Name" json:"name"` + BucketID BucketID `db:"BucketID" json:"bucketID"` + State string `db:"State" json:"state"` } type Object struct { - ObjectID int64 `db:"ObjectID" json:"objectID"` - PackageID int64 `db:"PackageID" json:"packageID"` - Path string `db:"Path" json:"path"` - Size int64 `db:"Size" json:"size,string"` + ObjectID ObjectID `db:"ObjectID" json:"objectID"` + PackageID PackageID `db:"PackageID" json:"packageID"` + Path string `db:"Path" json:"path"` + Size int64 `db:"Size" json:"size,string"` + FileHash string `db:"FileHash" json:"fileHash"` + Redundancy Redundancy `db:"Redundancy" json:"redundancy"` } -type Package struct { - PackageID int64 `db:"PackageID" json:"packageID"` - Name string `db:"Name" json:"name"` - BucketID int64 `db:"BucketID" json:"bucketID"` - State string `db:"State" json:"state"` - Redundancy TypedRedundancyInfo `db:"Redundancy" json:"redundancy"` +type NodePackageCachingInfo struct { + NodeID NodeID `json:"nodeID"` + FileSize int64 `json:"fileSize"` + ObjectCount int64 `json:"objectCount"` } -type ObjectCacheInfo struct { - Object Object `json:"object"` - FileHash string `json:"fileHash"` +type PackageCachingInfo struct { + NodeInfos []NodePackageCachingInfo `json:"nodeInfos"` + PackageSize int64 `json:"packageSize"` } -func NewObjectCacheInfo(object Object, fileHash string) ObjectCacheInfo { - return ObjectCacheInfo{ - Object: object, - FileHash: fileHash, +func NewPackageCachingInfo(nodeInfos []NodePackageCachingInfo, packageSize int64) PackageCachingInfo { + return PackageCachingInfo{ + NodeInfos: nodeInfos, + PackageSize: packageSize, } } diff --git a/sdks/storage/object.go b/sdks/storage/object.go index c7152bb..81fb5d7 100644 --- a/sdks/storage/object.go +++ b/sdks/storage/object.go @@ -6,6 +6,7 @@ import ( "net/url" "strings" + "gitlink.org.cn/cloudream/common/consts/errorcode" myhttp "gitlink.org.cn/cloudream/common/utils/http" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -45,3 +46,38 @@ func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) { return nil, fmt.Errorf("unknow response content type: %s", contType) } + +var ObjectGetPackageObjectsPath = "/object/getPackageObjects" + +type ObjectGetPackageObjectsReq struct { + UserID UserID `json:"userID"` + PackageID PackageID `json:"packageID"` +} +type ObjectGetPackageObjectsResp struct { + Objects []Object `json:"objects"` +} + +func (c *Client) ObjectGetPackageObjects(req ObjectGetPackageObjectsReq) (*ObjectGetPackageObjectsResp, error) { + url, err := url.JoinPath(c.baseURL, ObjectGetPackageObjectsPath) + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + jsonResp, err := myhttp.ParseJSONResponse[response[ObjectGetPackageObjectsResp]](resp) + if err != nil { + return nil, err + } + + if jsonResp.Code == errorcode.OK { + return &jsonResp.Data, nil + } + + return nil, jsonResp.ToError() +} diff --git a/sdks/storage/package.go b/sdks/storage/package.go index fd11d84..c264a21 100644 --- a/sdks/storage/package.go +++ b/sdks/storage/package.go @@ -13,8 +13,8 @@ import ( ) type PackageGetReq struct { - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID UserID `json:"userID"` + PackageID PackageID `json:"packageID"` } type PackageGetResp struct { Package @@ -46,11 +46,10 @@ func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) { } type PackageUploadReq struct { - UserID int64 `json:"userID"` - BucketID int64 `json:"bucketID"` + UserID UserID `json:"userID"` + BucketID BucketID `json:"bucketID"` Name string `json:"name"` - Redundancy TypedRedundancyInfo `json:"redundancy"` - NodeAffinity *int64 `json:"nodeAffinity"` + NodeAffinity *NodeID `json:"nodeAffinity"` Files PackageUploadFileIterator `json:"-"` } @@ -62,7 +61,7 @@ type IterPackageUploadFile struct { type PackageUploadFileIterator = iterator.Iterator[*IterPackageUploadFile] type PackageUploadResp struct { - PackageID int64 `json:"packageID,string"` + PackageID PackageID `json:"packageID,string"` } func (c *Client) PackageUpload(req PackageUploadReq) (*PackageUploadResp, error) { @@ -109,8 +108,8 @@ func (c *Client) PackageUpload(req PackageUploadReq) (*PackageUploadResp, error) } type PackageDeleteReq struct { - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID UserID `json:"userID"` + PackageID PackageID `json:"packageID"` } func (c *Client) PackageDelete(req PackageDeleteReq) error { @@ -145,8 +144,8 @@ func (c *Client) PackageDelete(req PackageDeleteReq) error { } type PackageGetCachedNodesReq struct { - PackageID int64 `json:"packageID"` - UserID int64 `json:"userID"` + PackageID PackageID `json:"packageID"` + UserID UserID `json:"userID"` } type PackageGetCachedNodesResp struct { @@ -183,12 +182,12 @@ func (c *Client) PackageGetCachedNodes(req PackageGetCachedNodesReq) (*PackageGe } type PackageGetLoadedNodesReq struct { - PackageID int64 `json:"packageID"` - UserID int64 `json:"userID"` + PackageID PackageID `json:"packageID"` + UserID UserID `json:"userID"` } type PackageGetLoadedNodesResp struct { - NodeIDs []int64 `json:"nodeIDs"` + NodeIDs []NodeID `json:"nodeIDs"` } func (c *Client) PackageGetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageGetLoadedNodesResp, error) { diff --git a/sdks/storage/storage.go b/sdks/storage/storage.go index 376aecd..827d06e 100644 --- a/sdks/storage/storage.go +++ b/sdks/storage/storage.go @@ -11,9 +11,9 @@ import ( ) type StorageLoadPackageReq struct { - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - StorageID int64 `json:"storageID"` + UserID UserID `json:"userID"` + PackageID PackageID `json:"packageID"` + StorageID StorageID `json:"storageID"` } type StorageLoadPackageResp struct { FullPath string `json:"fullPath"` @@ -45,16 +45,15 @@ func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPack } type StorageCreatePackageReq struct { - UserID int64 `json:"userID"` - StorageID int64 `json:"storageID"` - Path string `json:"path"` - BucketID int64 `json:"bucketID"` - Name string `json:"name"` - Redundancy TypedRedundancyInfo `json:"redundancy"` + UserID UserID `json:"userID"` + StorageID StorageID `json:"storageID"` + Path string `json:"path"` + BucketID BucketID `json:"bucketID"` + Name string `json:"name"` } type StorageCreatePackageResp struct { - PackageID int64 `json:"packageID"` + PackageID PackageID `json:"packageID"` } func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCreatePackageResp, error) { @@ -88,12 +87,12 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea } type StorageGetInfoReq struct { - UserID int64 `json:"userID"` - StorageID int64 `json:"storageID"` + UserID UserID `json:"userID"` + StorageID StorageID `json:"storageID"` } type StorageGetInfoResp struct { Name string `json:"name"` - NodeID int64 `json:"nodeID"` + NodeID NodeID `json:"nodeID"` Directory string `json:"directory"` } diff --git a/sdks/storage/storage_test.go b/sdks/storage/storage_test.go index a41732b..11e7115 100644 --- a/sdks/storage/storage_test.go +++ b/sdks/storage/storage_test.go @@ -27,10 +27,6 @@ func Test_PackageGet(t *testing.T) { UserID: 0, BucketID: 1, Name: pkgName, - Redundancy: TypedRedundancyInfo{ - Type: RedundancyRep, - Info: NewRepRedundancyInfo(1), - }, Files: iterator.Array( &IterPackageUploadFile{ Path: "test", @@ -72,15 +68,11 @@ func Test_Object(t *testing.T) { fileData[i] = byte(i) } - nodeAff := int64(2) + nodeAff := NodeID(2) upResp, err := cli.PackageUpload(PackageUploadReq{ - UserID: 0, - BucketID: 1, - Name: uuid.NewString(), - Redundancy: TypedRedundancyInfo{ - Type: RedundancyRep, - Info: NewRepRedundancyInfo(1), - }, + UserID: 0, + BucketID: 1, + Name: uuid.NewString(), NodeAffinity: &nodeAff, Files: iterator.Array( &IterPackageUploadFile{ @@ -129,10 +121,6 @@ func Test_Storage(t *testing.T) { UserID: 0, BucketID: 1, Name: uuid.NewString(), - Redundancy: TypedRedundancyInfo{ - Type: RedundancyRep, - Info: NewRepRedundancyInfo(1), - }, Files: iterator.Array( &IterPackageUploadFile{ Path: "test", @@ -176,10 +164,6 @@ func Test_Cache(t *testing.T) { UserID: 0, BucketID: 1, Name: uuid.NewString(), - Redundancy: TypedRedundancyInfo{ - Type: RedundancyRep, - Info: NewRepRedundancyInfo(1), - }, Files: iterator.Array( &IterPackageUploadFile{ Path: "test.txt", @@ -193,21 +177,13 @@ func Test_Cache(t *testing.T) { }) So(err, ShouldBeNil) - cacheMoveResp, err := cli.CacheMovePackage(CacheMovePackageReq{ + _, err = cli.CacheMovePackage(CacheMovePackageReq{ UserID: 0, PackageID: upResp.PackageID, NodeID: 1, }) So(err, ShouldBeNil) - cacheInfoResp, err := cli.CacheGetPackageObjectCacheInfos(CacheGetPackageObjectCacheInfosReq{ - UserID: 0, - PackageID: upResp.PackageID, - }) - So(err, ShouldBeNil) - - So(cacheInfoResp.Infos, ShouldResemble, cacheMoveResp.CacheInfos) - err = cli.PackageDelete(PackageDeleteReq{ UserID: 0, PackageID: upResp.PackageID, diff --git a/utils/io/clone.go b/utils/io/clone.go new file mode 100644 index 0000000..b1cf556 --- /dev/null +++ b/utils/io/clone.go @@ -0,0 +1,61 @@ +package io + +import ( + "io" +) + +// 复制一个流。注:返回的多个流的读取不能在同一个线程,且如果不再需要读取返回的某个流,那么必须关闭这个流,否则会阻塞其他流的读取。 +func Clone(str io.Reader, count int) []io.ReadCloser { + prs := make([]io.ReadCloser, count) + pws := make([]*io.PipeWriter, count) + + for i := 0; i < count; i++ { + prs[i], pws[i] = io.Pipe() + } + + go func() { + pwCount := count + buf := make([]byte, 4096) + var closeErr error + for { + if pwCount == 0 { + return + } + + rd, err := str.Read(buf) + for i := 0; i < count; i++ { + if pws[i] == nil { + continue + } + + err := WriteAll(pws[i], buf[:rd]) + if err != nil { + pws[i] = nil + pwCount-- + } + } + + if err == nil { + continue + } + + closeErr = err + break + } + + for i := 0; i < count; i++ { + if pws[i] == nil { + continue + } + pws[i].CloseWithError(closeErr) + } + }() + + return prs +} + +/* +func BufferedClone(str io.Reader, count int, bufSize int) []io.ReadCloser { + +} +*/ diff --git a/utils/io/io_test.go b/utils/io/io_test.go index 3066aa9..a4eab09 100644 --- a/utils/io/io_test.go +++ b/utils/io/io_test.go @@ -3,6 +3,7 @@ package io import ( "bytes" "io" + "sync" "testing" . "github.com/smartystreets/goconvey/convey" @@ -115,3 +116,68 @@ func Test_Length(t *testing.T) { So(err, ShouldEqual, io.ErrUnexpectedEOF) }) } + +func Test_Clone(t *testing.T) { + Convey("所有输出流都会被读取完", t, func() { + data := []byte{1, 2, 3, 4, 5} + str := bytes.NewReader(data) + + cloneds := Clone(str, 3) + reads := make([][]byte, 3) + errs := make([]error, 3) + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + reads[0], errs[0] = io.ReadAll(cloneds[0]) + wg.Done() + }() + go func() { + reads[1], errs[1] = io.ReadAll(cloneds[1]) + wg.Done() + }() + go func() { + reads[2], errs[2] = io.ReadAll(cloneds[2]) + wg.Done() + }() + + wg.Wait() + + So(reads, ShouldResemble, [][]byte{data, data, data}) + So(errs, ShouldResemble, []error{nil, nil, nil}) + }) + + Convey("其中一个流读到一半就停止读取", t, func() { + data := []byte{1, 2, 3, 4, 5} + str := bytes.NewReader(data) + + cloneds := Clone(str, 3) + reads := make([][]byte, 3) + errs := make([]error, 3) + + wg := sync.WaitGroup{} + wg.Add(3) + + go func() { + reads[0], errs[0] = io.ReadAll(cloneds[0]) + wg.Done() + }() + go func() { + buf := make([]byte, 3) + _, errs[1] = io.ReadFull(cloneds[1], buf) + reads[1] = buf + cloneds[1].Close() + wg.Done() + }() + go func() { + reads[2], errs[2] = io.ReadAll(cloneds[2]) + wg.Done() + }() + + wg.Wait() + + So(reads, ShouldResemble, [][]byte{data, {1, 2, 3}, data}) + So(errs, ShouldResemble, []error{nil, nil, nil}) + }) +} diff --git a/utils/serder/union_handler.go b/utils/serder/union_handler.go index 8b4f0f8..bef1dd0 100644 --- a/utils/serder/union_handler.go +++ b/utils/serder/union_handler.go @@ -22,7 +22,7 @@ type TypeUnionExternallyTagged[T any] struct { TUnion *types.TypeUnion[T] } -// 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。 +// 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会根据解析出类型信息,还原出真实的类型。 // Externally Tagged的格式是:{ "类型名": {...对象内容...} } // // 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。