Browse Source

优化serder;增加job结构体定义

pull/12/head
Sydonian 2 years ago
parent
commit
be4e8b99e1
9 changed files with 390 additions and 130 deletions
  1. +117
    -0
      models/job.go
  2. +15
    -0
      models/unifyops.go
  3. +96
    -33
      pkgs/mq/message.go
  4. +3
    -1
      pkgs/mq/message_test.go
  5. +1
    -1
      utils/serder/any_to_any.go
  6. +61
    -52
      utils/serder/serder.go
  7. +52
    -41
      utils/serder/serder_test.go
  8. +43
    -0
      utils/serder/string_type_resolver.go
  9. +2
    -2
      utils/serder/type_name_resolver.go

+ 117
- 0
models/job.go View File

@@ -0,0 +1,117 @@
package models

import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/serder"
)

const (
JobTypeNormal = "Normal"
JobTypeResource = "Resource"

FileInfoTypePackage = "Package"
FileInfoTypeLocalFile = "LocalFile"
FileInfoTypeResource = "Resource"
FileInfoTypeImage = "Image"
)

type JobSetInfo struct {
Jobs []JobInfo `json:"jobs"`
}

type JobInfo interface{}

var JobInfoTypeUnion = serder.NewTypeUnion[JobInfo]("type",
serder.NewStringTypeResolver().
Add(JobTypeNormal, myreflect.TypeOf[NormalJobInfo]()).
Add(JobTypeResource, myreflect.TypeOf[ResourceJobInfo]()),
)

type NormalJobInfo struct {
LocalJobID string `json:"localJobID"`
Type string `json:"type"`
Files JobFilesInfo `json:"files"`
Runtime JobRuntimeInfo `json:"runtime"`
Resources JobResourcesInfo `json:"resources"`
}

type ResourceJobInfo struct {
LocalJobID string `json:"localJobID"`
Type string `json:"type"`
TargetLocalJobID string `json:"targetLocalJobID"`
}

type JobFilesInfo struct {
Dateset FileInfo `json:"dataset"`
Code FileInfo `json:"code"`
Image FileInfo `json:"image"`
}

type FileInfo interface{}

var FileInfoTypeUnion = serder.NewTypeUnion[JobInfo]("type",
serder.NewStringTypeResolver().
Add(FileInfoTypePackage, myreflect.TypeOf[PackageFileInfo]()).
Add(FileInfoTypeLocalFile, myreflect.TypeOf[LocalFileInfo]()).
Add(FileInfoTypeResource, myreflect.TypeOf[ResourceFileInfo]()).
Add(FileInfoTypeImage, myreflect.TypeOf[ImageFileInfo]()),
)

type PackageFileInfo struct {
Type string `json:"type"`
PackageID int64 `json:"packageID"`
}

type LocalFileInfo struct {
Type string `json:"type"`
LocalPath string `json:"localPath"`
}

type ResourceFileInfo struct {
Type string `json:"type"`
ResourceLocalJobID string `json:"resourceLocalJobID"`
}

type ImageFileInfo struct {
Type string `json:"type"`
ImageID string `json:"imageID"`
}

type JobRuntimeInfo struct {
Command string `json:"command"`
Envs []EnvVar `json:"envs"`
}

type EnvVar struct {
Var string `json:"var"`
Value string `json:"value"`
}

type JobResourcesInfo struct {
CPU float64 `json:"cpu"`
GPU float64 `json:"gpu"`
NPU float64 `json:"npu"`
MLU float64 `json:"mlu"`
Storage int64 `json:"storage"`
Memory int64 `json:"memory"`
}

func JobSetInfoFromJSON(data []byte) (*JobSetInfo, error) {
mp := make(map[string]any)
if err := serder.JSONToObject(data, &mp); err != nil {
return nil, err
}

var ret JobSetInfo
err := serder.MapToObject(mp, &ret, serder.MapToObjectOption{
UnionTypes: []serder.UnionTypeInfo{
JobInfoTypeUnion,
FileInfoTypeUnion,
},
})
if err != nil {
return nil, err
}

return &ret, nil
}

+ 15
- 0
models/unifyops.go View File

@@ -1,5 +1,10 @@
package models

import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/serder"
)

const (
ResourceTypeCPU = "CPU"
ResourceTypeNPU = "NPU"
@@ -20,6 +25,16 @@ type ResourceDataConst interface {
ResourceData | CPUResourceData | NPUResourceData | GPUResourceData | MLUResourceData | StorageResourceData | MemoryResourceData
}

var ResourceDataTypeUnion = serder.NewTypeUnion[ResourceData]("name",
serder.NewStringTypeResolver().
Add(ResourceTypeCPU, myreflect.TypeOf[CPUResourceData]()).
Add(ResourceTypeNPU, myreflect.TypeOf[NPUResourceData]()).
Add(ResourceTypeGPU, myreflect.TypeOf[GPUResourceData]()).
Add(ResourceTypeMLU, myreflect.TypeOf[MLUResourceData]()).
Add(ResourceTypeStorage, myreflect.TypeOf[StorageResourceData]()).
Add(ResourceTypeMemory, myreflect.TypeOf[MemoryResourceData]()),
)

type DetailType[T any] struct {
Unit string `json:"unit"`
Value T `json:"value"`


+ 96
- 33
pkgs/mq/message.go View File

@@ -51,42 +51,26 @@ func MakeMessage(body MessageBodyTypes) Message {
return msg
}

type typeSet struct {
TopType myreflect.Type
ElementTypes serder.TypeNameResolver
}

var typeSets map[myreflect.Type]typeSet = make(map[reflect.Type]typeSet)
var messageTypeSet *typeSet
var unionTypes map[myreflect.Type]serder.UnionTypeInfo = make(map[reflect.Type]serder.UnionTypeInfo)
var messageTypeUnionEles *serder.TypeNameResolver

// 所有新定义的Message都需要在init中调用此函数
func RegisterMessage[T any]() {
messageTypeSet.ElementTypes.Register(myreflect.TypeOf[T]())
messageTypeUnionEles.Register(myreflect.TypeOf[T]())
}

// 如果对一个类型T调用了此函数,那么在序列化结构体中包含的T类型字段时,
// 会将字段值的实际类型保存在序列化后的结果中(作为一个字段@type),
// 在序列化结构体中包含的UnionType类型字段时,会将字段值的实际类型保存在序列化后的结果中。
// 在反序列化时,会根据类型信息重建原本的字段值。
//
// 只会处理types指定的类型。
func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet {
set := typeSet{
TopType: myreflect.TypeOf[T](),
ElementTypes: serder.NewTypeNameResolver(true),
}
// 注:不是采用在序列化后的数据中增加TypeFieldName指名的字段数据,因此会无视UnionTypeInfo中的这个字段的设定
func RegisterUnionType(set serder.UnionTypeInfo) {
unionTypes[set.UnionType] = set

for _, t := range types {
set.ElementTypes.Register(t)
}

typeSets[set.TopType] = set

jsoniter.RegisterTypeEncoderFunc(myreflect.TypeOf[T]().String(),
jsoniter.RegisterTypeEncoderFunc(set.UnionType.String(),
func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
val := *((*T)(ptr))
var ifVal any = val

if ifVal != nil {
// 此处无法变成*UnionType,只能强转为*any
val := *(*any)(ptr)
if val != nil {
stream.WriteArrayStart()
typeStr, err := set.ElementTypes.TypeToString(myreflect.TypeOfValue(val))
if err != nil {
@@ -105,15 +89,16 @@ func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet {
return false
})

jsoniter.RegisterTypeDecoderFunc(myreflect.TypeOf[T]().String(),
jsoniter.RegisterTypeDecoderFunc(set.UnionType.String(),
func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
vp := (*T)(ptr)
// 此处无法变成*UnionType,只能强转为*any
vp := (*any)(ptr)

nextTkType := iter.WhatIsNext()
if nextTkType == jsoniter.NilValue {
iter.ReadNil()
var zero T
*vp = zero
*vp = nil
} else if nextTkType == jsoniter.ArrayValue {
iter.ReadArray()
typeStr := iter.ReadString()
@@ -127,7 +112,7 @@ func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet {

val := reflect.New(typ)
iter.ReadVal(val.Interface())
*vp = val.Elem().Interface().(T)
*vp = val.Elem().Interface()

iter.ReadArray()
} else {
@@ -135,7 +120,84 @@ func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet {
return
}
})
}

// 如果对一个类型T调用了此函数,那么在序列化结构体中包含的T类型字段时,
// 会将字段值的实际类型保存在序列化后的结果中
// 在反序列化时,会根据类型信息重建原本的字段值。
//
// 只会处理types指定的类型。
func RegisterTypeSet[T any](types ...myreflect.Type) *serder.UnionTypeInfo {
eleTypes := serder.NewTypeNameResolver(true)
set := serder.UnionTypeInfo{
UnionType: myreflect.TypeOf[T](),
ElementTypes: eleTypes,
}

for _, t := range types {
eleTypes.Register(t)
}

/*
TODO 暂时保留这一段代码,如果RegisterUnionType中的非泛型版本出了问题,则重新使用这一部分的代码
unionTypes[set.UnionType] = set

jsoniter.RegisterTypeEncoderFunc(myreflect.TypeOf[T]().String(),
func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
val := *((*T)(ptr))
var ifVal any = val

if ifVal != nil {
stream.WriteArrayStart()
typeStr, err := set.ElementTypes.TypeToString(myreflect.TypeOfValue(val))
if err != nil {
stream.Error = err
return
}
stream.WriteString(typeStr)
stream.WriteRaw(",")
stream.WriteVal(val)
stream.WriteArrayEnd()
} else {
stream.WriteNil()
}
},
func(p unsafe.Pointer) bool {
return false
})

jsoniter.RegisterTypeDecoderFunc(myreflect.TypeOf[T]().String(),
func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
vp := (*T)(ptr)

nextTkType := iter.WhatIsNext()
if nextTkType == jsoniter.NilValue {
iter.ReadNil()
var zero T
*vp = zero
} else if nextTkType == jsoniter.ArrayValue {
iter.ReadArray()
typeStr := iter.ReadString()
iter.ReadArray()

typ, err := set.ElementTypes.StringToType(typeStr)
if err != nil {
iter.ReportError("get type from string", err.Error())
return
}

val := reflect.New(typ)
iter.ReadVal(val.Interface())
*vp = val.Elem().Interface().(T)

iter.ReadArray()
} else {
iter.ReportError("parse TypeSet field", fmt.Sprintf("unknow next token type %v", nextTkType))
return
}
})
*/
RegisterUnionType(serder.NewTypeUnion[T]("", serder.NewTypeNameResolver(true)))
return &set
}

@@ -163,5 +225,6 @@ func Deserialize(data []byte) (*Message, error) {
}

func init() {
messageTypeSet = RegisterTypeSet[MessageBodyTypes]()
messageTypeUnionEles = serder.NewTypeNameResolver(true)
RegisterUnionType(serder.NewTypeUnion[MessageBodyTypes]("", messageTypeUnionEles))
}

+ 3
- 1
pkgs/mq/message_test.go View File

@@ -131,7 +131,9 @@ func TestMessage(t *testing.T) {
})

Convey("使用TypeSet类型,但字段值为nil", t, func() {
type MyTypeSet interface{}
type MyTypeSet interface {
Test()
}

type Body struct {
Value MyTypeSet


+ 1
- 1
utils/serder/any_to_any.go View File

@@ -7,7 +7,7 @@ import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)

type Converter func(srcType reflect.Type, dstType reflect.Type, data interface{}) (interface{}, error)
type Converter func(from reflect.Value, to reflect.Value) (interface{}, error)

type AnyToAnyOption struct {
NoFromAny bool // 不判断目的字段是否实现了FromAny接口


+ 61
- 52
utils/serder/serder.go View File

@@ -6,6 +6,8 @@ import (
"io"
"reflect"
"strings"

myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)

func ObjectToJSON(obj any) ([]byte, error) {
@@ -47,13 +49,68 @@ type TypeResolver interface {
StringToType(typeStr string) (reflect.Type, error)
}

type TypedSerderOption struct {
TypeResolver TypeResolver
type UnionTypeInfo struct {
UnionType reflect.Type
TypeFieldName string
ElementTypes TypeResolver
}

func NewTypeUnion[TU any](typeField string, eleTypes TypeResolver) UnionTypeInfo {
return UnionTypeInfo{
UnionType: myreflect.TypeOf[TU](),
TypeFieldName: typeField,
ElementTypes: eleTypes,
}
}

func MapToObject(m map[string]any, obj any) error {
return AnyToAny(m, obj)
type MapToObjectOption struct {
UnionTypes []UnionTypeInfo // 转换过程中遇到这些类型时,会依据指定的字段的值,来决定转换后的实际类型
}

func MapToObject(m map[string]any, obj any, opt ...MapToObjectOption) error {
var op MapToObjectOption
if len(opt) > 0 {
op = opt[0]
}

unionTypeMapping := make(map[reflect.Type]*UnionTypeInfo)

for _, u := range op.UnionTypes {
unionTypeMapping[u.UnionType] = &u
}

convs := []Converter{
func(from reflect.Value, to reflect.Value) (interface{}, error) {
info, ok := unionTypeMapping[to.Type()]
if !ok {
return from.Interface(), nil
}

mp := from.Interface().(map[string]any)
tag, ok := mp[info.TypeFieldName]
if !ok {
return nil, fmt.Errorf("converting to %v: no tag field %s in map", to.Type(), info.TypeFieldName)
}

tagStr, ok := tag.(string)
if !ok {
return nil, fmt.Errorf("converting to %v: tag field %s value is %v, which is not a string", to.Type(), info.TypeFieldName, tag)
}

eleType, err := info.ElementTypes.StringToType(tagStr)
if err != nil {
return nil, fmt.Errorf("converting to %v: %w", to.Type(), err)
}

to.Set(reflect.Indirect(reflect.New(eleType)))

return from.Interface(), nil
},
}

return AnyToAny(m, obj, AnyToAnyOption{
Converters: convs,
})
}

func ObjectToMap(obj any) (map[string]any, error) {
@@ -132,51 +189,3 @@ func contains(arr []string, ele string, startIndex int) bool {

return false
}

func TypedMapToObject(m map[string]any, opt TypedSerderOption) (any, error) {

typeVal, ok := m[opt.TypeFieldName]
if !ok {
return nil, fmt.Errorf("no type field in the map")
}

typeStr, ok := typeVal.(string)
if !ok {
return nil, fmt.Errorf("type is not a string")
}

typ, err := opt.TypeResolver.StringToType(typeStr)
if err != nil {
return nil, fmt.Errorf("get type from string failed, err: %w", err)
}

val := reflect.New(typ)

valPtr := val.Interface()
err = AnyToAny(m, valPtr)
if err != nil {
return nil, err
}

return val.Elem().Interface(), nil
}

func ObjectToTypedMap(obj any, opt TypedSerderOption) (map[string]any, error) {
var mp map[string]any
err := AnyToAny(obj, &mp)
if err != nil {
return nil, err
}

_, ok := mp[opt.TypeFieldName]
if ok {
return nil, fmt.Errorf("object has the same field as the type field")
}

mp[opt.TypeFieldName], err = opt.TypeResolver.TypeToString(reflect.TypeOf(obj))
if err != nil {
return nil, fmt.Errorf("get string from type failed, err: %w", err)
}

return mp, nil
}

+ 52
- 41
utils/serder/serder_test.go View File

@@ -179,9 +179,9 @@ func Test_AnyToAny(t *testing.T) {
st2 := Struct2{}

err := AnyToAny(st1, &st2, AnyToAnyOption{
Converters: []Converter{func(srcType reflect.Type, dstType reflect.Type, data interface{}) (interface{}, error) {
if srcType == myreflect.TypeOf[Struct1]() && dstType == myreflect.TypeOf[Struct2]() {
s1 := data.(Struct1)
Converters: []Converter{func(from reflect.Value, to reflect.Value) (interface{}, error) {
if from.Type() == myreflect.TypeOf[Struct1]() && to.Type() == myreflect.TypeOf[Struct2]() {
s1 := from.Interface().(Struct1)
return Struct2{
Value: "@" + s1.Value,
}, nil
@@ -196,44 +196,6 @@ func Test_AnyToAny(t *testing.T) {
})
}

func Test_TypedMapToObject(t *testing.T) {
type Struct struct {
A string `json:"a"`
B int `json:"b"`
C int64 `json:"c,string"`
}

nameResovler := NewTypeNameResolver(true)
nameResovler.Register(myreflect.TypeOf[Struct]())

Convey("结构体", t, func() {
st := Struct{
A: "a",
B: 1,
C: 2,
}

mp, err := ObjectToTypedMap(st, TypedSerderOption{
TypeResolver: &nameResovler,
TypeFieldName: "@type",
})

So(err, ShouldBeNil)

st2Ptr, err := TypedMapToObject(mp, TypedSerderOption{
TypeResolver: &nameResovler,
TypeFieldName: "@type",
})
So(err, ShouldBeNil)

st2, ok := st2Ptr.(Struct)
So(ok, ShouldBeTrue)
So(st2, ShouldHaveSameTypeAs, st)
So(st2, ShouldResemble, st)
})

}

func Test_MapToObject(t *testing.T) {
type Base struct {
Int int
@@ -394,4 +356,53 @@ func Test_MapToObject(t *testing.T) {

So(string(mpRetJson), ShouldEqualJSON, string(exceptMapJson))
})

Convey("包含UnionType", t, func() {
type UnionType interface{}

type EleType1 struct {
Value1 string `json:"value1"`
}

type EleType2 struct {
Value2 int `json:"value2"`
}

type St struct {
Us []UnionType `json:"us"`
}

mp := map[string]any{
"us": []map[string]any{
{
"type": "1",
"value1": "1",
},
{
"type": "2",
"value2": 2,
},
},
}

var ret St
err := MapToObject(mp, &ret, MapToObjectOption{
UnionTypes: []UnionTypeInfo{
{
UnionType: myreflect.TypeOf[UnionType](),
TypeFieldName: "type",
ElementTypes: NewStringTypeResolver().
Add("1", myreflect.TypeOf[EleType1]()).
Add("2", myreflect.TypeOf[EleType2]()),
},
},
})

So(err, ShouldBeNil)

So(ret.Us, ShouldResemble, []UnionType{
EleType1{Value1: "1"},
EleType2{Value2: 2},
})
})
}

+ 43
- 0
utils/serder/string_type_resolver.go View File

@@ -0,0 +1,43 @@
package serder

import (
"fmt"
"reflect"
)

type StringTypeResolver struct {
strToType map[string]reflect.Type
typeToStr map[reflect.Type]string
}

func NewStringTypeResolver() *StringTypeResolver {
return &StringTypeResolver{
strToType: make(map[string]reflect.Type),
typeToStr: make(map[reflect.Type]string),
}
}

func (r *StringTypeResolver) Add(str string, typ reflect.Type) *StringTypeResolver {
r.strToType[str] = typ
r.typeToStr[typ] = str
return r
}

func (r *StringTypeResolver) TypeToString(typ reflect.Type) (string, error) {
var typeStr string
var ok bool
if typeStr, ok = r.typeToStr[typ]; !ok {
return "", fmt.Errorf("type %s is not registered before", typ)
}

return typeStr, nil
}

func (r *StringTypeResolver) StringToType(typeStr string) (reflect.Type, error) {
typ, ok := r.strToType[typeStr]
if !ok {
return nil, fmt.Errorf("unknow type string %s", typeStr)
}

return typ, nil
}

+ 2
- 2
utils/serder/type_name_resolver.go View File

@@ -10,8 +10,8 @@ type TypeNameResolver struct {
types map[string]reflect.Type
}

func NewTypeNameResolver(includePackagePath bool) TypeNameResolver {
return TypeNameResolver{
func NewTypeNameResolver(includePackagePath bool) *TypeNameResolver {
return &TypeNameResolver{
includePackagePath: includePackagePath,
types: make(map[string]reflect.Type),
}


Loading…
Cancel
Save