Browse Source

Merge branch 'feature_gxh'

pull/36/head
Sydonian 2 years ago
parent
commit
0282545241
9 changed files with 123 additions and 57 deletions
  1. +1
    -1
      pkgs/ipfs/config.go
  2. +2
    -3
      pkgs/ipfs/ipfs.go
  3. +31
    -17
      pkgs/types/union.go
  4. +3
    -3
      sdks/storage/models.go
  5. +3
    -3
      utils/io/chunked_split.go
  6. +3
    -3
      utils/io/chunked_split_test.go
  7. +27
    -0
      utils/io/io.go
  8. +3
    -3
      utils/serder/serder.go
  9. +50
    -24
      utils/serder/union_handler.go

+ 1
- 1
pkgs/ipfs/config.go View File

@@ -1,5 +1,5 @@
package ipfs package ipfs


type Config struct { type Config struct {
Port int `json:"port"`
Address string `json:"address"`
} }

+ 2
- 3
pkgs/ipfs/ipfs.go View File

@@ -19,12 +19,11 @@ type Client struct {
} }


func NewClient(cfg *Config) (*Client, error) { func NewClient(cfg *Config) (*Client, error) {
ipfsAddr := fmt.Sprintf("localhost:%d", cfg.Port)
sh := shell.NewShell(ipfsAddr)
sh := shell.NewShell(cfg.Address)


// 检测连通性 // 检测连通性
if !sh.IsUp() { if !sh.IsUp() {
return nil, fmt.Errorf("cannot connect to %s", ipfsAddr)
return nil, fmt.Errorf("cannot connect to %s", cfg.Address)
} }


return &Client{ return &Client{


+ 31
- 17
pkgs/types/union.go View File

@@ -7,28 +7,14 @@ import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect" myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
) )


// 描述一个类型集合
type TypeUnion struct {
type AnyTypeUnion struct {
// 这个集合的类型 // 这个集合的类型
UnionType myreflect.Type UnionType myreflect.Type
// 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围 // 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围
ElementTypes []myreflect.Type ElementTypes []myreflect.Type
} }


// 创建一个TypeUnion。泛型参数为Union的类型,形参为Union中包含的类型的一个实例,无实际用途,仅用于获取类型。
func NewTypeUnion[TU any](eleValues ...TU) TypeUnion {
var eleTypes []reflect.Type
for _, v := range eleValues {
eleTypes = append(eleTypes, reflect.TypeOf(v))
}

return TypeUnion{
UnionType: myreflect.TypeOf[TU](),
ElementTypes: eleTypes,
}
}

func (u *TypeUnion) Include(typ myreflect.Type) bool {
func (u *AnyTypeUnion) Include(typ myreflect.Type) bool {
for _, t := range u.ElementTypes { for _, t := range u.ElementTypes {
if t == typ { if t == typ {
return true return true
@@ -38,7 +24,7 @@ func (u *TypeUnion) Include(typ myreflect.Type) bool {
return false return false
} }


func (u *TypeUnion) Add(typ myreflect.Type) error {
func (u *AnyTypeUnion) Add(typ myreflect.Type) error {
if !typ.AssignableTo(u.UnionType) { if !typ.AssignableTo(u.UnionType) {
return fmt.Errorf("type is not assignable to union type") return fmt.Errorf("type is not assignable to union type")
} }
@@ -46,3 +32,31 @@ func (u *TypeUnion) Add(typ myreflect.Type) error {
u.ElementTypes = append(u.ElementTypes, typ) u.ElementTypes = append(u.ElementTypes, typ)
return nil return nil
} }

// 描述一个类型集合
type TypeUnion[T any] struct {
AnyTypeUnion
}

func (u *TypeUnion[T]) AddT(nilValue T) {
u.ElementTypes = append(u.ElementTypes, reflect.TypeOf(nilValue))
}

func (u *TypeUnion[T]) ToAny() *AnyTypeUnion {
return &u.AnyTypeUnion
}

// 创建一个TypeUnion。泛型参数为Union的类型,形参为Union中包含的类型的一个实例,无实际用途,仅用于获取类型。
func NewTypeUnion[TU any](eleValues ...TU) TypeUnion[TU] {
var eleTypes []reflect.Type
for _, v := range eleValues {
eleTypes = append(eleTypes, reflect.TypeOf(v))
}

return TypeUnion[TU]{
AnyTypeUnion{
UnionType: myreflect.TypeOf[TU](),
ElementTypes: eleTypes,
},
}
}

+ 3
- 3
sdks/storage/models.go View File

@@ -36,10 +36,10 @@ func NewRepRedundancyInfo(repCount int) RepRedundancyInfo {


type ECRedundancyInfo struct { type ECRedundancyInfo struct {
ECName string `json:"ecName"` ECName string `json:"ecName"`
ChunkSize int64 `json:"chunkSize"`
ChunkSize int `json:"chunkSize"`
} }


func NewECRedundancyInfo(ecName string, chunkSize int64) ECRedundancyInfo {
func NewECRedundancyInfo(ecName string, chunkSize int) ECRedundancyInfo {
return ECRedundancyInfo{ return ECRedundancyInfo{
ECName: ecName, ECName: ecName,
ChunkSize: chunkSize, ChunkSize: chunkSize,
@@ -74,7 +74,7 @@ func NewTypedRepRedundancyInfo(repCount int) TypedRedundancyInfo {
} }
} }


func NewTypedECRedundancyInfo(ecName string, chunkSize int64) TypedRedundancyInfo {
func NewTypedECRedundancyInfo(ecName string, chunkSize int) TypedRedundancyInfo {
return TypedRedundancyInfo{ return TypedRedundancyInfo{
Type: RedundancyRep, Type: RedundancyRep,
Info: ECRedundancyInfo{ Info: ECRedundancyInfo{


+ 3
- 3
utils/io/chunked_split.go View File

@@ -7,11 +7,11 @@ import (


type ChunkedSplitOption struct { type ChunkedSplitOption struct {
// 如果流的长度不是chunkSize * streamCount的整数倍,启用此参数后,会在输出流里填充0直到满足长度 // 如果流的长度不是chunkSize * streamCount的整数倍,启用此参数后,会在输出流里填充0直到满足长度
FillZeros bool
PaddingZeros bool
} }


// 每次读取一个chunkSize大小的数据,然后轮流写入到返回的流中。注:读取不同流的操作必须在不同的goroutine中进行,或者按顺序读取,每次精确读取一个chunkSize大小 // 每次读取一个chunkSize大小的数据,然后轮流写入到返回的流中。注:读取不同流的操作必须在不同的goroutine中进行,或者按顺序读取,每次精确读取一个chunkSize大小
func ChunkedSplit(stream io.Reader, chunkSize int64, streamCount int, opts ...ChunkedSplitOption) []io.ReadCloser {
func ChunkedSplit(stream io.Reader, chunkSize int, streamCount int, opts ...ChunkedSplitOption) []io.ReadCloser {
var opt ChunkedSplitOption var opt ChunkedSplitOption
if len(opts) > 0 { if len(opts) > 0 {
opt = opts[0] opt = opts[0]
@@ -49,7 +49,7 @@ func ChunkedSplit(stream io.Reader, chunkSize int64, streamCount int, opts ...Ch
break break
} }


if opt.FillZeros {
if opt.PaddingZeros {
Zero(buf[rd:]) Zero(buf[rd:])
err := WriteAll(pws[i], buf) err := WriteAll(pws[i], buf)
if err != nil { if err != nil {


+ 3
- 3
utils/io/chunked_split_test.go View File

@@ -157,7 +157,7 @@ func Test_RoundRobin(t *testing.T) {
} }


outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{ outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{
FillZeros: true,
PaddingZeros: true,
}) })


wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@@ -206,7 +206,7 @@ func Test_RoundRobin(t *testing.T) {
} }


outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{ outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{
FillZeros: true,
PaddingZeros: true,
}) })
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(3) wg.Add(3)
@@ -254,7 +254,7 @@ func Test_RoundRobin(t *testing.T) {
} }


outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{ outputs := ChunkedSplit(bytes.NewReader(input), 3, 3, ChunkedSplitOption{
FillZeros: true,
PaddingZeros: true,
}) })


wg := sync.WaitGroup{} wg := sync.WaitGroup{}


+ 27
- 0
utils/io/io.go View File

@@ -24,9 +24,16 @@ func WriteAll(writer io.Writer, data []byte) error {
return nil return nil
} }


const (
onceDisabled = 0
onceEnabled = 1
onceDone = 2
)

type readCloserHook struct { type readCloserHook struct {
readCloser io.ReadCloser readCloser io.ReadCloser
callback func(closer io.ReadCloser) callback func(closer io.ReadCloser)
once int
isBefore bool // callback调用时机,true则在closer的Close之前调用 isBefore bool // callback调用时机,true则在closer的Close之前调用
} }


@@ -35,6 +42,10 @@ func (hook *readCloserHook) Read(buf []byte) (n int, err error) {
} }


func (hook *readCloserHook) Close() error { func (hook *readCloserHook) Close() error {
if hook.once == onceDone {
return hook.readCloser.Close()
}

if hook.isBefore { if hook.isBefore {
hook.callback(hook.readCloser) hook.callback(hook.readCloser)
} }
@@ -44,6 +55,11 @@ func (hook *readCloserHook) Close() error {
if !hook.isBefore { if !hook.isBefore {
hook.callback(hook.readCloser) hook.callback(hook.readCloser)
} }

if hook.once == onceEnabled {
hook.once = onceDone
}

return err return err
} }


@@ -51,6 +67,7 @@ func BeforeReadClosing(closer io.ReadCloser, callback func(closer io.ReadCloser)
return &readCloserHook{ return &readCloserHook{
readCloser: closer, readCloser: closer,
callback: callback, callback: callback,
once: onceDisabled,
isBefore: true, isBefore: true,
} }
} }
@@ -59,6 +76,16 @@ func AfterReadClosed(closer io.ReadCloser, callback func(closer io.ReadCloser))
return &readCloserHook{ return &readCloserHook{
readCloser: closer, readCloser: closer,
callback: callback, callback: callback,
once: onceDisabled,
isBefore: false,
}
}

func AfterReadClosedOnce(closer io.ReadCloser, callback func(closer io.ReadCloser)) io.ReadCloser {
return &readCloserHook{
readCloser: closer,
callback: callback,
once: onceEnabled,
isBefore: false, isBefore: false,
} }
} }


+ 3
- 3
utils/serder/serder.go View File

@@ -12,8 +12,8 @@ import (
) )


var unionHandler = UnionHandler{ var unionHandler = UnionHandler{
internallyTagged: make(map[reflect.Type]*TypeUnionInternallyTagged),
externallyTagged: make(map[reflect.Type]*TypeUnionExternallyTagged),
internallyTagged: make(map[reflect.Type]*anyTypeUnionInternallyTagged),
externallyTagged: make(map[reflect.Type]*anyTypeUnionExternallyTagged),
} }


var defaultAPI = func() jsoniter.API { var defaultAPI = func() jsoniter.API {
@@ -107,7 +107,7 @@ func MapToObject(m map[string]any, obj any, opt ...MapToObjectOption) error {
op = opt[0] op = opt[0]
} }


unionTypeMapping := make(map[reflect.Type]*TypeUnionInternallyTagged)
unionTypeMapping := make(map[reflect.Type]*anyTypeUnionInternallyTagged)


if !op.NoRegisteredUnionTypes { if !op.NoRegisteredUnionTypes {
for _, u := range unionHandler.internallyTagged { for _, u := range unionHandler.internallyTagged {


+ 50
- 24
utils/serder/union_handler.go View File

@@ -12,32 +12,40 @@ import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect" myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
) )


type TypeUnionExternallyTagged struct {
Union *types.TypeUnion
type anyTypeUnionExternallyTagged struct {
Union *types.AnyTypeUnion
TypeNameToType map[string]reflect.Type TypeNameToType map[string]reflect.Type
} }


type TypeUnionExternallyTagged[T any] struct {
anyTypeUnionExternallyTagged
TUnion *types.TypeUnion[T]
}

// 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。 // 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。
// Externally Tagged的格式是:{ "类型名": {...对象内容...} } // Externally Tagged的格式是:{ "类型名": {...对象内容...} }
// //
// 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。 // 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。
func UseTypeUnionExternallyTagged(union *types.TypeUnion) *TypeUnionExternallyTagged {
eu := &TypeUnionExternallyTagged{
Union: union,
TypeNameToType: make(map[string]reflect.Type),
func UseTypeUnionExternallyTagged[T any](union *types.TypeUnion[T]) *TypeUnionExternallyTagged[T] {
eu := &TypeUnionExternallyTagged[T]{
anyTypeUnionExternallyTagged: anyTypeUnionExternallyTagged{
Union: union.ToAny(),
TypeNameToType: make(map[string]reflect.Type),
},
TUnion: union,
} }


for _, eleType := range union.ElementTypes { for _, eleType := range union.ElementTypes {
eu.Add(eleType) eu.Add(eleType)
} }


unionHandler.externallyTagged[union.UnionType] = eu
unionHandler.externallyTagged[union.UnionType] = &eu.anyTypeUnionExternallyTagged


return eu return eu
} }


func (u *TypeUnionExternallyTagged) Add(typ reflect.Type) error {
err := u.Union.Add(typ)
func (u *TypeUnionExternallyTagged[T]) Add(typ reflect.Type) error {
err := u.TUnion.Add(typ)
if err != nil { if err != nil {
return nil return nil
} }
@@ -46,33 +54,46 @@ func (u *TypeUnionExternallyTagged) Add(typ reflect.Type) error {
return nil return nil
} }


type TypeUnionInternallyTagged struct {
Union *types.TypeUnion
func (u *TypeUnionExternallyTagged[T]) AddT(nilValue T) error {
u.Add(reflect.TypeOf(nilValue))
return nil
}

type anyTypeUnionInternallyTagged struct {
Union *types.AnyTypeUnion
TagField string TagField string
TagToType map[string]reflect.Type TagToType map[string]reflect.Type
} }


type TypeUnionInternallyTagged[T any] struct {
anyTypeUnionInternallyTagged
TUnion *types.TypeUnion[T]
}

// 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。 // 遇到TypeUnion的基类(UnionType)的字段时,将其实际值的类型信息也编码到JSON中,反序列化时也会解析出类型信息,还原出真实的类型。
// Internally Tagged的格式是:{ "类型字段": "类型名", ...对象内容...},JSON中的类型字段名需要指定。 // Internally Tagged的格式是:{ "类型字段": "类型名", ...对象内容...},JSON中的类型字段名需要指定。
// 注:对象定义需要包含类型字段,而且在序列化之前需要手动赋值,目前不支持自动设置。 // 注:对象定义需要包含类型字段,而且在序列化之前需要手动赋值,目前不支持自动设置。
// //
// 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。 // 可以通过内嵌Metadata结构体,并在它身上增加"union"Tag来指定类型名称,如果没有指定,则默认使用系统类型名(包括包路径)。
func UseTypeUnionInternallyTagged(union *types.TypeUnion, tagField string) *TypeUnionInternallyTagged {
iu := &TypeUnionInternallyTagged{
Union: union,
TagField: tagField,
TagToType: make(map[string]reflect.Type),
func UseTypeUnionInternallyTagged[T any](union *types.TypeUnion[T], tagField string) *TypeUnionInternallyTagged[T] {
iu := &TypeUnionInternallyTagged[T]{
anyTypeUnionInternallyTagged: anyTypeUnionInternallyTagged{
Union: union.ToAny(),
TagField: tagField,
TagToType: make(map[string]reflect.Type),
},
TUnion: union,
} }


for _, eleType := range union.ElementTypes { for _, eleType := range union.ElementTypes {
iu.Add(eleType) iu.Add(eleType)
} }


unionHandler.internallyTagged[union.UnionType] = iu
unionHandler.internallyTagged[union.UnionType] = &iu.anyTypeUnionInternallyTagged
return iu return iu
} }


func (u *TypeUnionInternallyTagged) Add(typ reflect.Type) error {
func (u *TypeUnionInternallyTagged[T]) Add(typ reflect.Type) error {
err := u.Union.Add(typ) err := u.Union.Add(typ)
if err != nil { if err != nil {
return nil return nil
@@ -107,9 +128,14 @@ func (u *TypeUnionInternallyTagged) Add(typ reflect.Type) error {
return nil return nil
} }


func (u *TypeUnionInternallyTagged[T]) AddT(nilValue T) error {
u.Add(reflect.TypeOf(nilValue))
return nil
}

type UnionHandler struct { type UnionHandler struct {
internallyTagged map[reflect.Type]*TypeUnionInternallyTagged
externallyTagged map[reflect.Type]*TypeUnionExternallyTagged
internallyTagged map[reflect.Type]*anyTypeUnionInternallyTagged
externallyTagged map[reflect.Type]*anyTypeUnionExternallyTagged
} }


func (h *UnionHandler) UpdateStructDescriptor(structDescriptor *jsoniter.StructDescriptor) { func (h *UnionHandler) UpdateStructDescriptor(structDescriptor *jsoniter.StructDescriptor) {
@@ -167,7 +193,7 @@ func (h *UnionHandler) DecorateEncoder(typ reflect2.Type, encoder jsoniter.ValEn


// 以下Encoder/Decoder都是在传入类型/目标类型是TypeUnion的基类(UnionType)时使用 // 以下Encoder/Decoder都是在传入类型/目标类型是TypeUnion的基类(UnionType)时使用
type InternallyTaggedEncoder struct { type InternallyTaggedEncoder struct {
union *TypeUnionInternallyTagged
union *anyTypeUnionInternallyTagged
} }


func (e *InternallyTaggedEncoder) IsEmpty(ptr unsafe.Pointer) bool { func (e *InternallyTaggedEncoder) IsEmpty(ptr unsafe.Pointer) bool {
@@ -190,7 +216,7 @@ func (e *InternallyTaggedEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.St
} }


type InternallyTaggedDecoder struct { type InternallyTaggedDecoder struct {
union *TypeUnionInternallyTagged
union *anyTypeUnionInternallyTagged
} }


func (e *InternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { func (e *InternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
@@ -243,7 +269,7 @@ func (e *InternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iter
} }


type ExternallyTaggedEncoder struct { type ExternallyTaggedEncoder struct {
union *TypeUnionExternallyTagged
union *anyTypeUnionExternallyTagged
} }


func (e *ExternallyTaggedEncoder) IsEmpty(ptr unsafe.Pointer) bool { func (e *ExternallyTaggedEncoder) IsEmpty(ptr unsafe.Pointer) bool {
@@ -278,7 +304,7 @@ func (e *ExternallyTaggedEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.St
} }


type ExternallyTaggedDecoder struct { type ExternallyTaggedDecoder struct {
union *TypeUnionExternallyTagged
union *anyTypeUnionExternallyTagged
} }


func (e *ExternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { func (e *ExternallyTaggedDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {


Loading…
Cancel
Save