diff --git a/models/job.go b/models/job.go new file mode 100644 index 0000000..55e42ff --- /dev/null +++ b/models/job.go @@ -0,0 +1,117 @@ +package models + +import ( + myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +const ( + JobTypeNormal = "Normal" + JobTypeResource = "Resource" + + FileInfoTypePackage = "Package" + FileInfoTypeLocalFile = "LocalFile" + FileInfoTypeResource = "Resource" + FileInfoTypeImage = "Image" +) + +type JobSetInfo struct { + Jobs []JobInfo `json:"jobs"` +} + +type JobInfo interface{} + +var JobInfoTypeUnion = serder.NewTypeUnion[JobInfo]("type", + serder.NewStringTypeResolver(). + Add(JobTypeNormal, myreflect.TypeOf[NormalJobInfo]()). + Add(JobTypeResource, myreflect.TypeOf[ResourceJobInfo]()), +) + +type NormalJobInfo struct { + LocalJobID string `json:"localJobID"` + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` +} + +type ResourceJobInfo struct { + LocalJobID string `json:"localJobID"` + Type string `json:"type"` + TargetLocalJobID string `json:"targetLocalJobID"` +} + +type JobFilesInfo struct { + Dateset FileInfo `json:"dataset"` + Code FileInfo `json:"code"` + Image FileInfo `json:"image"` +} + +type FileInfo interface{} + +var FileInfoTypeUnion = serder.NewTypeUnion[JobInfo]("type", + serder.NewStringTypeResolver(). + Add(FileInfoTypePackage, myreflect.TypeOf[PackageFileInfo]()). + Add(FileInfoTypeLocalFile, myreflect.TypeOf[LocalFileInfo]()). + Add(FileInfoTypeResource, myreflect.TypeOf[ResourceFileInfo]()). + Add(FileInfoTypeImage, myreflect.TypeOf[ImageFileInfo]()), +) + +type PackageFileInfo struct { + Type string `json:"type"` + PackageID int64 `json:"packageID"` +} + +type LocalFileInfo struct { + Type string `json:"type"` + LocalPath string `json:"localPath"` +} + +type ResourceFileInfo struct { + Type string `json:"type"` + ResourceLocalJobID string `json:"resourceLocalJobID"` +} + +type ImageFileInfo struct { + Type string `json:"type"` + ImageID string `json:"imageID"` +} + +type JobRuntimeInfo struct { + Command string `json:"command"` + Envs []EnvVar `json:"envs"` +} + +type EnvVar struct { + Var string `json:"var"` + Value string `json:"value"` +} + +type JobResourcesInfo struct { + CPU float64 `json:"cpu"` + GPU float64 `json:"gpu"` + NPU float64 `json:"npu"` + MLU float64 `json:"mlu"` + Storage int64 `json:"storage"` + Memory int64 `json:"memory"` +} + +func JobSetInfoFromJSON(data []byte) (*JobSetInfo, error) { + mp := make(map[string]any) + if err := serder.JSONToObject(data, &mp); err != nil { + return nil, err + } + + var ret JobSetInfo + err := serder.MapToObject(mp, &ret, serder.MapToObjectOption{ + UnionTypes: []serder.UnionTypeInfo{ + JobInfoTypeUnion, + FileInfoTypeUnion, + }, + }) + if err != nil { + return nil, err + } + + return &ret, nil +} diff --git a/models/unifyops.go b/models/unifyops.go index 7beed3f..bf717f3 100644 --- a/models/unifyops.go +++ b/models/unifyops.go @@ -1,5 +1,10 @@ package models +import ( + myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/serder" +) + const ( ResourceTypeCPU = "CPU" ResourceTypeNPU = "NPU" @@ -20,6 +25,16 @@ type ResourceDataConst interface { ResourceData | CPUResourceData | NPUResourceData | GPUResourceData | MLUResourceData | StorageResourceData | MemoryResourceData } +var ResourceDataTypeUnion = serder.NewTypeUnion[ResourceData]("name", + serder.NewStringTypeResolver(). + Add(ResourceTypeCPU, myreflect.TypeOf[CPUResourceData]()). + Add(ResourceTypeNPU, myreflect.TypeOf[NPUResourceData]()). + Add(ResourceTypeGPU, myreflect.TypeOf[GPUResourceData]()). + Add(ResourceTypeMLU, myreflect.TypeOf[MLUResourceData]()). + Add(ResourceTypeStorage, myreflect.TypeOf[StorageResourceData]()). + Add(ResourceTypeMemory, myreflect.TypeOf[MemoryResourceData]()), +) + type DetailType[T any] struct { Unit string `json:"unit"` Value T `json:"value"` diff --git a/pkgs/mq/message.go b/pkgs/mq/message.go index 8ef714c..ec04bbe 100644 --- a/pkgs/mq/message.go +++ b/pkgs/mq/message.go @@ -51,42 +51,26 @@ func MakeMessage(body MessageBodyTypes) Message { return msg } -type typeSet struct { - TopType myreflect.Type - ElementTypes serder.TypeNameResolver -} - -var typeSets map[myreflect.Type]typeSet = make(map[reflect.Type]typeSet) -var messageTypeSet *typeSet +var unionTypes map[myreflect.Type]serder.UnionTypeInfo = make(map[reflect.Type]serder.UnionTypeInfo) +var messageTypeUnionEles *serder.TypeNameResolver // 所有新定义的Message都需要在init中调用此函数 func RegisterMessage[T any]() { - messageTypeSet.ElementTypes.Register(myreflect.TypeOf[T]()) + messageTypeUnionEles.Register(myreflect.TypeOf[T]()) } -// 如果对一个类型T调用了此函数,那么在序列化结构体中包含的T类型字段时, -// 会将字段值的实际类型保存在序列化后的结果中(作为一个字段@type), +// 在序列化结构体中包含的UnionType类型字段时,会将字段值的实际类型保存在序列化后的结果中。 // 在反序列化时,会根据类型信息重建原本的字段值。 // -// 只会处理types指定的类型。 -func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet { - set := typeSet{ - TopType: myreflect.TypeOf[T](), - ElementTypes: serder.NewTypeNameResolver(true), - } +// 注:不是采用在序列化后的数据中增加TypeFieldName指名的字段数据,因此会无视UnionTypeInfo中的这个字段的设定 +func RegisterUnionType(set serder.UnionTypeInfo) { + unionTypes[set.UnionType] = set - for _, t := range types { - set.ElementTypes.Register(t) - } - - typeSets[set.TopType] = set - - jsoniter.RegisterTypeEncoderFunc(myreflect.TypeOf[T]().String(), + jsoniter.RegisterTypeEncoderFunc(set.UnionType.String(), func(ptr unsafe.Pointer, stream *jsoniter.Stream) { - val := *((*T)(ptr)) - var ifVal any = val - - if ifVal != nil { + // 此处无法变成*UnionType,只能强转为*any + val := *(*any)(ptr) + if val != nil { stream.WriteArrayStart() typeStr, err := set.ElementTypes.TypeToString(myreflect.TypeOfValue(val)) if err != nil { @@ -105,15 +89,16 @@ func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet { return false }) - jsoniter.RegisterTypeDecoderFunc(myreflect.TypeOf[T]().String(), + jsoniter.RegisterTypeDecoderFunc(set.UnionType.String(), func(ptr unsafe.Pointer, iter *jsoniter.Iterator) { - vp := (*T)(ptr) + // 此处无法变成*UnionType,只能强转为*any + vp := (*any)(ptr) nextTkType := iter.WhatIsNext() if nextTkType == jsoniter.NilValue { iter.ReadNil() - var zero T - *vp = zero + *vp = nil + } else if nextTkType == jsoniter.ArrayValue { iter.ReadArray() typeStr := iter.ReadString() @@ -127,7 +112,7 @@ func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet { val := reflect.New(typ) iter.ReadVal(val.Interface()) - *vp = val.Elem().Interface().(T) + *vp = val.Elem().Interface() iter.ReadArray() } else { @@ -135,7 +120,84 @@ func RegisterTypeSet[T any](types ...myreflect.Type) *typeSet { return } }) +} + +// 如果对一个类型T调用了此函数,那么在序列化结构体中包含的T类型字段时, +// 会将字段值的实际类型保存在序列化后的结果中 +// 在反序列化时,会根据类型信息重建原本的字段值。 +// +// 只会处理types指定的类型。 +func RegisterTypeSet[T any](types ...myreflect.Type) *serder.UnionTypeInfo { + eleTypes := serder.NewTypeNameResolver(true) + set := serder.UnionTypeInfo{ + UnionType: myreflect.TypeOf[T](), + ElementTypes: eleTypes, + } + + for _, t := range types { + eleTypes.Register(t) + } + /* + TODO 暂时保留这一段代码,如果RegisterUnionType中的非泛型版本出了问题,则重新使用这一部分的代码 + unionTypes[set.UnionType] = set + + jsoniter.RegisterTypeEncoderFunc(myreflect.TypeOf[T]().String(), + func(ptr unsafe.Pointer, stream *jsoniter.Stream) { + val := *((*T)(ptr)) + var ifVal any = val + + if ifVal != nil { + stream.WriteArrayStart() + typeStr, err := set.ElementTypes.TypeToString(myreflect.TypeOfValue(val)) + if err != nil { + stream.Error = err + return + } + stream.WriteString(typeStr) + stream.WriteRaw(",") + stream.WriteVal(val) + stream.WriteArrayEnd() + } else { + stream.WriteNil() + } + }, + func(p unsafe.Pointer) bool { + return false + }) + + jsoniter.RegisterTypeDecoderFunc(myreflect.TypeOf[T]().String(), + func(ptr unsafe.Pointer, iter *jsoniter.Iterator) { + vp := (*T)(ptr) + + nextTkType := iter.WhatIsNext() + if nextTkType == jsoniter.NilValue { + iter.ReadNil() + var zero T + *vp = zero + } else if nextTkType == jsoniter.ArrayValue { + iter.ReadArray() + typeStr := iter.ReadString() + iter.ReadArray() + + typ, err := set.ElementTypes.StringToType(typeStr) + if err != nil { + iter.ReportError("get type from string", err.Error()) + return + } + + val := reflect.New(typ) + iter.ReadVal(val.Interface()) + *vp = val.Elem().Interface().(T) + + iter.ReadArray() + } else { + iter.ReportError("parse TypeSet field", fmt.Sprintf("unknow next token type %v", nextTkType)) + return + } + }) + */ + RegisterUnionType(serder.NewTypeUnion[T]("", serder.NewTypeNameResolver(true))) return &set } @@ -163,5 +225,6 @@ func Deserialize(data []byte) (*Message, error) { } func init() { - messageTypeSet = RegisterTypeSet[MessageBodyTypes]() + messageTypeUnionEles = serder.NewTypeNameResolver(true) + RegisterUnionType(serder.NewTypeUnion[MessageBodyTypes]("", messageTypeUnionEles)) } diff --git a/pkgs/mq/message_test.go b/pkgs/mq/message_test.go index f64b02e..4610f20 100644 --- a/pkgs/mq/message_test.go +++ b/pkgs/mq/message_test.go @@ -131,7 +131,9 @@ func TestMessage(t *testing.T) { }) Convey("使用TypeSet类型,但字段值为nil", t, func() { - type MyTypeSet interface{} + type MyTypeSet interface { + Test() + } type Body struct { Value MyTypeSet diff --git a/utils/serder/any_to_any.go b/utils/serder/any_to_any.go index a4eba9e..8fe3889 100644 --- a/utils/serder/any_to_any.go +++ b/utils/serder/any_to_any.go @@ -7,7 +7,7 @@ import ( myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) -type Converter func(srcType reflect.Type, dstType reflect.Type, data interface{}) (interface{}, error) +type Converter func(from reflect.Value, to reflect.Value) (interface{}, error) type AnyToAnyOption struct { NoFromAny bool // 不判断目的字段是否实现了FromAny接口 diff --git a/utils/serder/serder.go b/utils/serder/serder.go index abf0fed..c089b09 100644 --- a/utils/serder/serder.go +++ b/utils/serder/serder.go @@ -6,6 +6,8 @@ import ( "io" "reflect" "strings" + + myreflect "gitlink.org.cn/cloudream/common/utils/reflect" ) func ObjectToJSON(obj any) ([]byte, error) { @@ -47,13 +49,68 @@ type TypeResolver interface { StringToType(typeStr string) (reflect.Type, error) } -type TypedSerderOption struct { - TypeResolver TypeResolver +type UnionTypeInfo struct { + UnionType reflect.Type TypeFieldName string + ElementTypes TypeResolver +} + +func NewTypeUnion[TU any](typeField string, eleTypes TypeResolver) UnionTypeInfo { + return UnionTypeInfo{ + UnionType: myreflect.TypeOf[TU](), + TypeFieldName: typeField, + ElementTypes: eleTypes, + } } -func MapToObject(m map[string]any, obj any) error { - return AnyToAny(m, obj) +type MapToObjectOption struct { + UnionTypes []UnionTypeInfo // 转换过程中遇到这些类型时,会依据指定的字段的值,来决定转换后的实际类型 +} + +func MapToObject(m map[string]any, obj any, opt ...MapToObjectOption) error { + var op MapToObjectOption + if len(opt) > 0 { + op = opt[0] + } + + unionTypeMapping := make(map[reflect.Type]*UnionTypeInfo) + + for _, u := range op.UnionTypes { + unionTypeMapping[u.UnionType] = &u + } + + convs := []Converter{ + func(from reflect.Value, to reflect.Value) (interface{}, error) { + info, ok := unionTypeMapping[to.Type()] + if !ok { + return from.Interface(), nil + } + + mp := from.Interface().(map[string]any) + tag, ok := mp[info.TypeFieldName] + if !ok { + return nil, fmt.Errorf("converting to %v: no tag field %s in map", to.Type(), info.TypeFieldName) + } + + tagStr, ok := tag.(string) + if !ok { + return nil, fmt.Errorf("converting to %v: tag field %s value is %v, which is not a string", to.Type(), info.TypeFieldName, tag) + } + + eleType, err := info.ElementTypes.StringToType(tagStr) + if err != nil { + return nil, fmt.Errorf("converting to %v: %w", to.Type(), err) + } + + to.Set(reflect.Indirect(reflect.New(eleType))) + + return from.Interface(), nil + }, + } + + return AnyToAny(m, obj, AnyToAnyOption{ + Converters: convs, + }) } func ObjectToMap(obj any) (map[string]any, error) { @@ -132,51 +189,3 @@ func contains(arr []string, ele string, startIndex int) bool { return false } - -func TypedMapToObject(m map[string]any, opt TypedSerderOption) (any, error) { - - typeVal, ok := m[opt.TypeFieldName] - if !ok { - return nil, fmt.Errorf("no type field in the map") - } - - typeStr, ok := typeVal.(string) - if !ok { - return nil, fmt.Errorf("type is not a string") - } - - typ, err := opt.TypeResolver.StringToType(typeStr) - if err != nil { - return nil, fmt.Errorf("get type from string failed, err: %w", err) - } - - val := reflect.New(typ) - - valPtr := val.Interface() - err = AnyToAny(m, valPtr) - if err != nil { - return nil, err - } - - return val.Elem().Interface(), nil -} - -func ObjectToTypedMap(obj any, opt TypedSerderOption) (map[string]any, error) { - var mp map[string]any - err := AnyToAny(obj, &mp) - if err != nil { - return nil, err - } - - _, ok := mp[opt.TypeFieldName] - if ok { - return nil, fmt.Errorf("object has the same field as the type field") - } - - mp[opt.TypeFieldName], err = opt.TypeResolver.TypeToString(reflect.TypeOf(obj)) - if err != nil { - return nil, fmt.Errorf("get string from type failed, err: %w", err) - } - - return mp, nil -} diff --git a/utils/serder/serder_test.go b/utils/serder/serder_test.go index 39558aa..72747fb 100644 --- a/utils/serder/serder_test.go +++ b/utils/serder/serder_test.go @@ -179,9 +179,9 @@ func Test_AnyToAny(t *testing.T) { st2 := Struct2{} err := AnyToAny(st1, &st2, AnyToAnyOption{ - Converters: []Converter{func(srcType reflect.Type, dstType reflect.Type, data interface{}) (interface{}, error) { - if srcType == myreflect.TypeOf[Struct1]() && dstType == myreflect.TypeOf[Struct2]() { - s1 := data.(Struct1) + Converters: []Converter{func(from reflect.Value, to reflect.Value) (interface{}, error) { + if from.Type() == myreflect.TypeOf[Struct1]() && to.Type() == myreflect.TypeOf[Struct2]() { + s1 := from.Interface().(Struct1) return Struct2{ Value: "@" + s1.Value, }, nil @@ -196,44 +196,6 @@ func Test_AnyToAny(t *testing.T) { }) } -func Test_TypedMapToObject(t *testing.T) { - type Struct struct { - A string `json:"a"` - B int `json:"b"` - C int64 `json:"c,string"` - } - - nameResovler := NewTypeNameResolver(true) - nameResovler.Register(myreflect.TypeOf[Struct]()) - - Convey("结构体", t, func() { - st := Struct{ - A: "a", - B: 1, - C: 2, - } - - mp, err := ObjectToTypedMap(st, TypedSerderOption{ - TypeResolver: &nameResovler, - TypeFieldName: "@type", - }) - - So(err, ShouldBeNil) - - st2Ptr, err := TypedMapToObject(mp, TypedSerderOption{ - TypeResolver: &nameResovler, - TypeFieldName: "@type", - }) - So(err, ShouldBeNil) - - st2, ok := st2Ptr.(Struct) - So(ok, ShouldBeTrue) - So(st2, ShouldHaveSameTypeAs, st) - So(st2, ShouldResemble, st) - }) - -} - func Test_MapToObject(t *testing.T) { type Base struct { Int int @@ -394,4 +356,53 @@ func Test_MapToObject(t *testing.T) { So(string(mpRetJson), ShouldEqualJSON, string(exceptMapJson)) }) + + Convey("包含UnionType", t, func() { + type UnionType interface{} + + type EleType1 struct { + Value1 string `json:"value1"` + } + + type EleType2 struct { + Value2 int `json:"value2"` + } + + type St struct { + Us []UnionType `json:"us"` + } + + mp := map[string]any{ + "us": []map[string]any{ + { + "type": "1", + "value1": "1", + }, + { + "type": "2", + "value2": 2, + }, + }, + } + + var ret St + err := MapToObject(mp, &ret, MapToObjectOption{ + UnionTypes: []UnionTypeInfo{ + { + UnionType: myreflect.TypeOf[UnionType](), + TypeFieldName: "type", + ElementTypes: NewStringTypeResolver(). + Add("1", myreflect.TypeOf[EleType1]()). + Add("2", myreflect.TypeOf[EleType2]()), + }, + }, + }) + + So(err, ShouldBeNil) + + So(ret.Us, ShouldResemble, []UnionType{ + EleType1{Value1: "1"}, + EleType2{Value2: 2}, + }) + }) } diff --git a/utils/serder/string_type_resolver.go b/utils/serder/string_type_resolver.go new file mode 100644 index 0000000..757c4ff --- /dev/null +++ b/utils/serder/string_type_resolver.go @@ -0,0 +1,43 @@ +package serder + +import ( + "fmt" + "reflect" +) + +type StringTypeResolver struct { + strToType map[string]reflect.Type + typeToStr map[reflect.Type]string +} + +func NewStringTypeResolver() *StringTypeResolver { + return &StringTypeResolver{ + strToType: make(map[string]reflect.Type), + typeToStr: make(map[reflect.Type]string), + } +} + +func (r *StringTypeResolver) Add(str string, typ reflect.Type) *StringTypeResolver { + r.strToType[str] = typ + r.typeToStr[typ] = str + return r +} + +func (r *StringTypeResolver) TypeToString(typ reflect.Type) (string, error) { + var typeStr string + var ok bool + if typeStr, ok = r.typeToStr[typ]; !ok { + return "", fmt.Errorf("type %s is not registered before", typ) + } + + return typeStr, nil +} + +func (r *StringTypeResolver) StringToType(typeStr string) (reflect.Type, error) { + typ, ok := r.strToType[typeStr] + if !ok { + return nil, fmt.Errorf("unknow type string %s", typeStr) + } + + return typ, nil +} diff --git a/utils/serder/type_name_resolver.go b/utils/serder/type_name_resolver.go index 99b2290..03a15c6 100644 --- a/utils/serder/type_name_resolver.go +++ b/utils/serder/type_name_resolver.go @@ -10,8 +10,8 @@ type TypeNameResolver struct { types map[string]reflect.Type } -func NewTypeNameResolver(includePackagePath bool) TypeNameResolver { - return TypeNameResolver{ +func NewTypeNameResolver(includePackagePath bool) *TypeNameResolver { + return &TypeNameResolver{ includePackagePath: includePackagePath, types: make(map[string]reflect.Type), }