From f990e5fce8de2fa1eb897fc9f3b7f83d55b56eb2 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 27 Nov 2023 15:56:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96TypeUnion=EF=BC=9B=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0io=E5=B7=A5=E5=85=B7=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/ipfs/config.go | 2 +- pkgs/ipfs/ipfs.go | 5 +-- pkgs/types/union.go | 48 ++++++++++++++-------- sdks/storage/models.go | 6 +-- utils/io/chunked_split.go | 6 +-- utils/io/chunked_split_test.go | 6 +-- utils/io/io.go | 27 +++++++++++++ utils/serder/serder.go | 6 +-- utils/serder/union_handler.go | 74 +++++++++++++++++++++++----------- 9 files changed, 123 insertions(+), 57 deletions(-) diff --git a/pkgs/ipfs/config.go b/pkgs/ipfs/config.go index 540c388..8a61bd9 100644 --- a/pkgs/ipfs/config.go +++ b/pkgs/ipfs/config.go @@ -1,5 +1,5 @@ package ipfs type Config struct { - Port int `json:"port"` + Address string `json:"address"` } diff --git a/pkgs/ipfs/ipfs.go b/pkgs/ipfs/ipfs.go index 9e560b7..1822ce9 100644 --- a/pkgs/ipfs/ipfs.go +++ b/pkgs/ipfs/ipfs.go @@ -19,12 +19,11 @@ type Client struct { } func NewClient(cfg *Config) (*Client, error) { - ipfsAddr := fmt.Sprintf("localhost:%d", cfg.Port) - sh := shell.NewShell(ipfsAddr) + sh := shell.NewShell(cfg.Address) // 检测连通性 if !sh.IsUp() { - return nil, fmt.Errorf("cannot connect to %s", ipfsAddr) + return nil, fmt.Errorf("cannot connect to %s", cfg.Address) } return &Client{ diff --git a/pkgs/types/union.go b/pkgs/types/union.go index ec4ecce..639e25c 100644 --- a/pkgs/types/union.go +++ b/pkgs/types/union.go @@ -7,28 +7,14 @@ import ( myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) -// 描述一个类型集合 -type TypeUnion struct { +type AnyTypeUnion struct { // 这个集合的类型 UnionType myreflect.Type // 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围 ElementTypes []myreflect.Type } -// 创建一个TypeUnion。泛型参数为Union的类型,形参为Union中包含的类型的一个实例,无实际用途,仅用于获取类型。 -func NewTypeUnion[TU any](eleValues ...TU) TypeUnion { - var eleTypes []reflect.Type - for _, v := range eleValues { - eleTypes = append(eleTypes, reflect.TypeOf(v)) - } - - return TypeUnion{ - UnionType: myreflect.TypeOf[TU](), - ElementTypes: eleTypes, - } -} - -func (u *TypeUnion) Include(typ myreflect.Type) bool { +func (u *AnyTypeUnion) Include(typ myreflect.Type) bool { for _, t := range u.ElementTypes { if t == typ { return true @@ -38,7 +24,7 @@ func (u *TypeUnion) Include(typ myreflect.Type) bool { return false } -func (u *TypeUnion) Add(typ myreflect.Type) error { +func (u *AnyTypeUnion) Add(typ myreflect.Type) error { if !typ.AssignableTo(u.UnionType) { return fmt.Errorf("type is not assignable to union type") } @@ -46,3 +32,31 @@ func (u *TypeUnion) Add(typ myreflect.Type) error { u.ElementTypes = append(u.ElementTypes, typ) return nil } + +// 描述一个类型集合 +type TypeUnion[T any] struct { + AnyTypeUnion +} + +func (u *TypeUnion[T]) AddT(nilValue T) { + u.ElementTypes = append(u.ElementTypes, reflect.TypeOf(nilValue)) +} + +func (u *TypeUnion[T]) ToAny() *AnyTypeUnion { + return &u.AnyTypeUnion +} + +// 创建一个TypeUnion。泛型参数为Union的类型,形参为Union中包含的类型的一个实例,无实际用途,仅用于获取类型。 +func NewTypeUnion[TU any](eleValues ...TU) TypeUnion[TU] { + var eleTypes []reflect.Type + for _, v := range eleValues { + eleTypes = append(eleTypes, reflect.TypeOf(v)) + } + + return TypeUnion[TU]{ + AnyTypeUnion{ + UnionType: myreflect.TypeOf[TU](), + ElementTypes: eleTypes, + }, + } +} diff --git a/sdks/storage/models.go b/sdks/storage/models.go index b0600a9..6286843 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -36,10 +36,10 @@ func NewRepRedundancyInfo(repCount int) RepRedundancyInfo { type ECRedundancyInfo struct { ECName string `json:"ecName"` - ChunkSize int64 `json:"chunkSize"` + ChunkSize int `json:"chunkSize"` } -func NewECRedundancyInfo(ecName string, chunkSize int64) ECRedundancyInfo { +func NewECRedundancyInfo(ecName string, chunkSize int) ECRedundancyInfo { return ECRedundancyInfo{ ECName: ecName, ChunkSize: chunkSize, @@ -74,7 +74,7 @@ func NewTypedRepRedundancyInfo(repCount int) TypedRedundancyInfo { } } -func NewTypedECRedundancyInfo(ecName string, chunkSize int64) TypedRedundancyInfo { +func NewTypedECRedundancyInfo(ecName string, chunkSize int) TypedRedundancyInfo { return TypedRedundancyInfo{ Type: RedundancyRep, Info: ECRedundancyInfo{ diff --git a/utils/io/chunked_split.go b/utils/io/chunked_split.go index 97c68d4..ba5a7e1 100644 --- a/utils/io/chunked_split.go +++ b/utils/io/chunked_split.go @@ -7,11 +7,11 @@ import ( type ChunkedSplitOption struct { // 如果流的长度不是chunkSize * streamCount的整数倍,启用此参数后,会在输出流里填充0直到满足长度 - FillZeros bool + PaddingZeros bool } // 每次读取一个chunkSize大小的数据,然后轮流写入到返回的流中。注:读取不同流的操作必须在不同的goroutine中进行,或者按顺序读取,每次精确读取一个chunkSize大小 -func ChunkedSplit(stream io.Reader, chunkSize int64, streamCount int, opts ...ChunkedSplitOption) []io.ReadCloser { +func ChunkedSplit(stream io.Reader, chunkSize int, streamCount int, opts ...ChunkedSplitOption) []io.ReadCloser { var opt ChunkedSplitOption if len(opts) > 0 { opt = opts[0] @@ -49,7 +49,7 @@ func ChunkedSplit(stream io.Reader, chunkSize int64, streamCount int, opts ...Ch break } - if opt.FillZeros { + if opt.PaddingZeros { Zero(buf[rd:]) err := WriteAll(pws[i], buf) if err != nil { diff --git a/utils/io/chunked_split_test.go b/utils/io/chunked_split_test.go index 0e35574..8be2d3f 100644 --- a/utils/io/chunked_split_test.go +++ b/utils/io/chunked_split_test.go @@ -157,7 +157,7 @@ func Test_RoundRobin(t *testing.T) { } outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{ - FillZeros: true, + PaddingZeros: true, }) wg := sync.WaitGroup{} @@ -206,7 +206,7 @@ func Test_RoundRobin(t *testing.T) { } outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{ - FillZeros: true, + PaddingZeros: true, }) wg := sync.WaitGroup{} wg.Add(3) @@ -254,7 +254,7 @@ func Test_RoundRobin(t *testing.T) { } outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{ - FillZeros: true, + PaddingZeros: true, }) wg := sync.WaitGroup{} diff --git a/utils/io/io.go b/utils/io/io.go index af8c639..186c70b 100644 --- a/utils/io/io.go +++ b/utils/io/io.go @@ -24,9 +24,16 @@ func WriteAll(writer io.Writer, data []byte) error { return nil } +const ( + onceDisabled = 0 + onceEnabled = 1 + onceDone = 2 +) + type readCloserHook struct { readCloser io.ReadCloser callback func(closer io.ReadCloser) + once int isBefore bool // callback调用时机,true则在closer的Close之前调用 } @@ -35,6 +42,10 @@ func (hook *readCloserHook) Read(buf []byte) (n int, err error) { } func (hook *readCloserHook) Close() error { + if hook.once == onceDone { + return hook.readCloser.Close() + } + if hook.isBefore { hook.callback(hook.readCloser) } @@ -44,6 +55,11 @@ func (hook *readCloserHook) Close() error { if !hook.isBefore { hook.callback(hook.readCloser) } + + if hook.once == onceEnabled { + hook.once = onceDone + } + return err } @@ -51,6 +67,7 @@ func BeforeReadClosing(closer io.ReadCloser, callback func(closer io.ReadCloser) return &readCloserHook{ readCloser: closer, callback: callback, + once: onceDisabled, isBefore: true, } } @@ -59,6 +76,16 @@ func AfterReadClosed(closer io.ReadCloser, callback func(closer io.ReadCloser)) return &readCloserHook{ readCloser: closer, callback: callback, + once: onceDisabled, + isBefore: false, + } +} + +func AfterReadClosedOnce(closer io.ReadCloser, callback func(closer io.ReadCloser)) io.ReadCloser { + return &readCloserHook{ + readCloser: closer, + callback: callback, + once: onceEnabled, isBefore: false, } } diff --git a/utils/serder/serder.go b/utils/serder/serder.go index 81b36b5..911e8ae 100644 --- a/utils/serder/serder.go +++ b/utils/serder/serder.go @@ -12,8 +12,8 @@ import ( ) var unionHandler = UnionHandler{ - internallyTagged: make(map[reflect.Type]*TypeUnionInternallyTagged), - externallyTagged: make(map[reflect.Type]*TypeUnionExternallyTagged), + internallyTagged: make(map[reflect.Type]*anyTypeUnionInternallyTagged), + externallyTagged: make(map[reflect.Type]*anyTypeUnionExternallyTagged), } var defaultAPI = func() jsoniter.API { @@ -107,7 +107,7 @@ func MapToObject(m map[string]any, obj any, opt ...MapToObjectOption) error { op = opt[0] } - unionTypeMapping := make(map[reflect.Type]*TypeUnionInternallyTagged) + unionTypeMapping := make(map[reflect.Type]*anyTypeUnionInternallyTagged) if !op.NoRegisteredUnionTypes { for _, u := range unionHandler.internallyTagged { diff --git a/utils/serder/union_handler.go b/utils/serder/union_handler.go index bd4df5b..8b4f0f8 100644 --- a/utils/serder/union_handler.go +++ b/utils/serder/union_handler.go @@ -12,32 +12,40 @@ import ( myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) -type TypeUnionExternallyTagged struct { - Union *types.TypeUnion +type anyTypeUnionExternallyTagged struct { + Union *types.AnyTypeUnion TypeNameToType map[string]reflect.Type } +type TypeUnionExternallyTagged[T any] struct { + anyTypeUnionExternallyTagged + TUnion *types.TypeUnion[T] +} + // 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。 // Externally Tagged的格式是:{ "类型名": {...对象内容...} } // // 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。 -func UseTypeUnionExternallyTagged(union *types.TypeUnion) *TypeUnionExternallyTagged { - eu := &TypeUnionExternallyTagged{ - Union: union, - TypeNameToType: make(map[string]reflect.Type), +func UseTypeUnionExternallyTagged[T any](union *types.TypeUnion[T]) *TypeUnionExternallyTagged[T] { + eu := &TypeUnionExternallyTagged[T]{ + anyTypeUnionExternallyTagged: anyTypeUnionExternallyTagged{ + Union: union.ToAny(), + TypeNameToType: make(map[string]reflect.Type), + }, + TUnion: union, } for _, eleType := range union.ElementTypes { eu.Add(eleType) } - unionHandler.externallyTagged[union.UnionType] = eu + unionHandler.externallyTagged[union.UnionType] = &eu.anyTypeUnionExternallyTagged return eu } -func (u *TypeUnionExternallyTagged) Add(typ reflect.Type) error { - err := u.Union.Add(typ) +func (u *TypeUnionExternallyTagged[T]) Add(typ reflect.Type) error { + err := u.TUnion.Add(typ) if err != nil { return nil } @@ -46,33 +54,46 @@ func (u *TypeUnionExternallyTagged) Add(typ reflect.Type) error { return nil } -type TypeUnionInternallyTagged struct { - Union *types.TypeUnion +func (u *TypeUnionExternallyTagged[T]) AddT(nilValue T) error { + u.Add(reflect.TypeOf(nilValue)) + return nil +} + +type anyTypeUnionInternallyTagged struct { + Union *types.AnyTypeUnion TagField string TagToType map[string]reflect.Type } +type TypeUnionInternallyTagged[T any] struct { + anyTypeUnionInternallyTagged + TUnion *types.TypeUnion[T] +} + // 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。 // Internally Tagged的格式是:{ "类型字段": "类型名", ...对象内容...},JSON中的类型字段名需要指定。 // 注:对象定义需要包含类型字段,而且在序列化之前需要手动赋值,目前不支持自动设置。 // // 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。 -func UseTypeUnionInternallyTagged(union *types.TypeUnion, tagField string) *TypeUnionInternallyTagged { - iu := &TypeUnionInternallyTagged{ - Union: union, - TagField: tagField, - TagToType: make(map[string]reflect.Type), +func UseTypeUnionInternallyTagged[T any](union *types.TypeUnion[T], tagField string) *TypeUnionInternallyTagged[T] { + iu := &TypeUnionInternallyTagged[T]{ + anyTypeUnionInternallyTagged: anyTypeUnionInternallyTagged{ + Union: union.ToAny(), + TagField: tagField, + TagToType: make(map[string]reflect.Type), + }, + TUnion: union, } for _, eleType := range union.ElementTypes { iu.Add(eleType) } - unionHandler.internallyTagged[union.UnionType] = iu + unionHandler.internallyTagged[union.UnionType] = &iu.anyTypeUnionInternallyTagged return iu } -func (u *TypeUnionInternallyTagged) Add(typ reflect.Type) error { +func (u *TypeUnionInternallyTagged[T]) Add(typ reflect.Type) error { err := u.Union.Add(typ) if err != nil { return nil @@ -107,9 +128,14 @@ func (u *TypeUnionInternallyTagged) Add(typ reflect.Type) error { return nil } +func (u *TypeUnionInternallyTagged[T]) AddT(nilValue T) error { + u.Add(reflect.TypeOf(nilValue)) + return nil +} + type UnionHandler struct { - internallyTagged map[reflect.Type]*TypeUnionInternallyTagged - externallyTagged map[reflect.Type]*TypeUnionExternallyTagged + internallyTagged map[reflect.Type]*anyTypeUnionInternallyTagged + externallyTagged map[reflect.Type]*anyTypeUnionExternallyTagged } func (h *UnionHandler) UpdateStructDescriptor(structDescriptor *jsoniter.StructDescriptor) { @@ -167,7 +193,7 @@ func (h *UnionHandler) DecorateEncoder(typ reflect2.Type, encoder jsoniter.ValEn // 以下Encoder/Decoder都是在传入类型/目标类型是TypeUnion的基类(UnionType)时使用 type InternallyTaggedEncoder struct { - union *TypeUnionInternallyTagged + union *anyTypeUnionInternallyTagged } func (e *InternallyTaggedEncoder) IsEmpty(ptr unsafe.Pointer) bool { @@ -190,7 +216,7 @@ func (e *InternallyTaggedEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.St } type InternallyTaggedDecoder struct { - union *TypeUnionInternallyTagged + union *anyTypeUnionInternallyTagged } func (e *InternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { @@ -243,7 +269,7 @@ func (e *InternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iter } type ExternallyTaggedEncoder struct { - union *TypeUnionExternallyTagged + union *anyTypeUnionExternallyTagged } func (e *ExternallyTaggedEncoder) IsEmpty(ptr unsafe.Pointer) bool { @@ -278,7 +304,7 @@ func (e *ExternallyTaggedEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.St } type ExternallyTaggedDecoder struct { - union *TypeUnionExternallyTagged + union *anyTypeUnionExternallyTagged } func (e *ExternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {