Browse Source

重构manager模块

pull/45/head
Sydonian 1 year ago
parent
commit
c484262e95
19 changed files with 186 additions and 61 deletions
  1. +4
    -4
      pkgs/cmdtrie/command_trie.go
  2. +2
    -2
      pkgs/logger/global_logger.go
  3. +3
    -3
      pkgs/mq/client.go
  4. +2
    -2
      pkgs/mq/message.go
  5. +7
    -7
      pkgs/mq/message_dispatcher.go
  6. +4
    -4
      pkgs/mq/message_test.go
  7. +6
    -6
      pkgs/typedispatcher/type_dispatcher.go
  8. +6
    -6
      pkgs/types/union.go
  9. +1
    -1
      sdks/pcm/models.go
  10. +3
    -3
      sdks/scheduler/models.go
  11. +1
    -1
      sdks/scheduler/scheduler_test.go
  12. +2
    -2
      sdks/storage/package.go
  13. +1
    -1
      utils/reflect2/reflect.go
  14. +7
    -7
      utils/serder/any_to_any.go
  15. +5
    -5
      utils/serder/serder_test.go
  16. +4
    -4
      utils/serder/union_handler.go
  17. +3
    -3
      utils/serder/walk_test.go
  18. +77
    -0
      utils/sync2/channel.go
  19. +48
    -0
      utils/sync2/select_set.go

+ 4
- 4
pkgs/cmdtrie/command_trie.go View File

@@ -7,7 +7,7 @@ import (


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/trie" "gitlink.org.cn/cloudream/common/pkgs/trie"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type ExecuteOption struct { type ExecuteOption struct {
@@ -296,7 +296,7 @@ type CommandTrie[TCtx any, TRet any] struct {


func NewCommandTrie[TCtx any, TRet any]() CommandTrie[TCtx, TRet] { func NewCommandTrie[TCtx any, TRet any]() CommandTrie[TCtx, TRet] {
return CommandTrie[TCtx, TRet]{ return CommandTrie[TCtx, TRet]{
anyTrie: newAnyCommandTrie(myreflect.TypeOf[TCtx](), myreflect.TypeOf[TRet]()),
anyTrie: newAnyCommandTrie(reflect2.TypeOf[TCtx](), reflect2.TypeOf[TRet]()),
} }
} }


@@ -337,7 +337,7 @@ type VoidCommandTrie[TCtx any] struct {


func NewVoidCommandTrie[TCtx any]() VoidCommandTrie[TCtx] { func NewVoidCommandTrie[TCtx any]() VoidCommandTrie[TCtx] {
return VoidCommandTrie[TCtx]{ return VoidCommandTrie[TCtx]{
anyTrie: newAnyCommandTrie(myreflect.TypeOf[TCtx](), nil),
anyTrie: newAnyCommandTrie(reflect2.TypeOf[TCtx](), nil),
} }
} }


@@ -368,7 +368,7 @@ type StaticCommandTrie[TRet any] struct {


func NewStaticCommandTrie[TRet any]() StaticCommandTrie[TRet] { func NewStaticCommandTrie[TRet any]() StaticCommandTrie[TRet] {
return StaticCommandTrie[TRet]{ return StaticCommandTrie[TRet]{
anyTrie: newAnyCommandTrie(nil, myreflect.TypeOf[TRet]()),
anyTrie: newAnyCommandTrie(nil, reflect2.TypeOf[TRet]()),
} }
} }




+ 2
- 2
pkgs/logger/global_logger.go View File

@@ -8,7 +8,7 @@ import (


nested "github.com/antonfisher/nested-logrus-formatter" nested "github.com/antonfisher/nested-logrus-formatter"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


// 输出日志到标准输出。适用于没有设计好日志输出方案时临时使用。 // 输出日志到标准输出。适用于没有设计好日志输出方案时临时使用。
@@ -123,6 +123,6 @@ func WithField(key string, val any) Logger {


func WithType[T any](key string) Logger { func WithType[T any](key string) Logger {
return &logrusLogger{ return &logrusLogger{
entry: logrus.WithField(key, myreflect.TypeOf[T]().Name()),
entry: logrus.WithField(key, reflect2.TypeOf[T]().Name()),
} }
} }

+ 3
- 3
pkgs/mq/client.go View File

@@ -10,7 +10,7 @@ import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


const ( const (
@@ -323,8 +323,8 @@ func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg
respBody, ok := resp.Body.(TResp) respBody, ok := resp.Body.(TResp)
if !ok { if !ok {
return defRet, fmt.Errorf("expect a %s body, but got %s", return defRet, fmt.Errorf("expect a %s body, but got %s",
myreflect.ElemTypeOf[TResp]().Name(),
myreflect.TypeOfValue(resp.Body).Name())
reflect2.ElemTypeOf[TResp]().Name(),
reflect2.TypeOfValue(resp.Body).Name())
} }


return respBody, nil return respBody, nil


+ 2
- 2
pkgs/mq/message.go View File

@@ -2,7 +2,7 @@ package mq


import ( import (
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -83,7 +83,7 @@ var msgBodyTypeUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTy


// 所有新定义的Message都需要在init中调用此函数 // 所有新定义的Message都需要在init中调用此函数
func RegisterMessage[T MessageBody]() { func RegisterMessage[T MessageBody]() {
err := msgBodyTypeUnion.Add(myreflect.TypeOf[T]())
err := msgBodyTypeUnion.Add(reflect2.TypeOf[T]())
if err != nil { if err != nil {
panic(err) panic(err)
} }


+ 7
- 7
pkgs/mq/message_dispatcher.go View File

@@ -3,27 +3,27 @@ package mq
import ( import (
"fmt" "fmt"


myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type HandlerFn func(svcBase any, msg *Message) (*Message, error) type HandlerFn func(svcBase any, msg *Message) (*Message, error)


type MessageDispatcher struct { type MessageDispatcher struct {
Handlers map[myreflect.Type]HandlerFn
Handlers map[reflect2.Type]HandlerFn
} }


func NewMessageDispatcher() MessageDispatcher { func NewMessageDispatcher() MessageDispatcher {
return MessageDispatcher{ return MessageDispatcher{
Handlers: make(map[myreflect.Type]HandlerFn),
Handlers: make(map[reflect2.Type]HandlerFn),
} }
} }


func (h *MessageDispatcher) Add(typ myreflect.Type, handler HandlerFn) {
func (h *MessageDispatcher) Add(typ reflect2.Type, handler HandlerFn) {
h.Handlers[typ] = handler h.Handlers[typ] = handler
} }


func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) { func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) {
typ := myreflect.TypeOfValue(msg.Body)
typ := reflect2.TypeOfValue(msg.Body)
fn, ok := h.Handlers[typ] fn, ok := h.Handlers[typ]
if !ok { if !ok {
return nil, fmt.Errorf("unsupported message type: %s", typ.String()) return nil, fmt.Errorf("unsupported message type: %s", typ.String())
@@ -34,7 +34,7 @@ func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error)


// 将Service中的一个接口函数作为指定类型消息的处理函数 // 将Service中的一个接口函数作为指定类型消息的处理函数
func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq) (TResp, *CodeMessage)) { func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq) (TResp, *CodeMessage)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {
dispatcher.Add(reflect2.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {


reqMsgBody := reqMsg.Body.(TReq) reqMsgBody := reqMsg.Body.(TReq)
ret, codeMsg := svcFn(svcBase.(TSvc), reqMsgBody) ret, codeMsg := svcFn(svcBase.(TSvc), reqMsgBody)
@@ -47,7 +47,7 @@ func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *Mes


// 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数 // 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数
func AddNoRespServiceFn[TSvc any, TReq MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq)) { func AddNoRespServiceFn[TSvc any, TReq MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {
dispatcher.Add(reflect2.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {


reqMsgBody := reqMsg.Body.(TReq) reqMsgBody := reqMsg.Body.(TReq)
svcFn(svcBase.(TSvc), reqMsgBody) svcFn(svcBase.(TSvc), reqMsgBody)


+ 4
- 4
pkgs/mq/message_test.go View File

@@ -9,7 +9,7 @@ import (
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -42,13 +42,13 @@ func TestMessage(t *testing.T) {
Nil: nil, Nil: nil,
} }


jsoniter.RegisterTypeEncoderFunc(myreflect.TypeOf[MyAny]().String(),
jsoniter.RegisterTypeEncoderFunc(reflect2.TypeOf[MyAny]().String(),
func(ptr unsafe.Pointer, stream *jsoniter.Stream) { func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
val := *((*MyAny)(ptr)) val := *((*MyAny)(ptr))


stream.WriteArrayStart() stream.WriteArrayStart()
if val != nil { if val != nil {
stream.WriteString(myreflect.TypeOfValue(val).String())
stream.WriteString(reflect2.TypeOfValue(val).String())
stream.WriteRaw(",") stream.WriteRaw(",")
stream.WriteVal(val) stream.WriteVal(val)
} }
@@ -58,7 +58,7 @@ func TestMessage(t *testing.T) {
return false return false
}) })


jsoniter.RegisterTypeDecoderFunc(myreflect.TypeOf[MyAny]().String(),
jsoniter.RegisterTypeDecoderFunc(reflect2.TypeOf[MyAny]().String(),
func(ptr unsafe.Pointer, iter *jsoniter.Iterator) { func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
vp := (*MyAny)(ptr) vp := (*MyAny)(ptr)




+ 6
- 6
pkgs/typedispatcher/type_dispatcher.go View File

@@ -1,28 +1,28 @@
package typedispatcher package typedispatcher


import ( import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type HandlerFn[TRet any] func(val any) TRet type HandlerFn[TRet any] func(val any) TRet


type TypeDispatcher[TRet any] struct { type TypeDispatcher[TRet any] struct {
handlers map[myreflect.Type]HandlerFn[TRet]
handlers map[reflect2.Type]HandlerFn[TRet]
} }


func NewTypeDispatcher[TRet any]() TypeDispatcher[TRet] { func NewTypeDispatcher[TRet any]() TypeDispatcher[TRet] {
return TypeDispatcher[TRet]{ return TypeDispatcher[TRet]{
handlers: make(map[myreflect.Type]HandlerFn[TRet]),
handlers: make(map[reflect2.Type]HandlerFn[TRet]),
} }
} }


func (t *TypeDispatcher[TRet]) Add(typ myreflect.Type, fn HandlerFn[TRet]) {
func (t *TypeDispatcher[TRet]) Add(typ reflect2.Type, fn HandlerFn[TRet]) {
t.handlers[typ] = fn t.handlers[typ] = fn
} }


func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) { func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) {
var ret TRet var ret TRet
typ := myreflect.TypeOfValue(val)
typ := reflect2.TypeOfValue(val)
handler, ok := t.handlers[typ] handler, ok := t.handlers[typ]
if !ok { if !ok {
return ret, false return ret, false
@@ -32,7 +32,7 @@ func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) {
} }


func Add[T any, TRet any](dispatcher TypeDispatcher[TRet], handler func(val T) TRet) { func Add[T any, TRet any](dispatcher TypeDispatcher[TRet], handler func(val T) TRet) {
dispatcher.Add(myreflect.TypeOf[T](), func(val any) TRet {
dispatcher.Add(reflect2.TypeOf[T](), func(val any) TRet {
return handler(val.(T)) return handler(val.(T))
}) })
} }

+ 6
- 6
pkgs/types/union.go View File

@@ -4,17 +4,17 @@ import (
"fmt" "fmt"
"reflect" "reflect"


myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type AnyTypeUnion struct { type AnyTypeUnion struct {
// 这个集合的类型 // 这个集合的类型
UnionType myreflect.Type
UnionType reflect2.Type
// 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围 // 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围
ElementTypes []myreflect.Type
ElementTypes []reflect2.Type
} }


func (u *AnyTypeUnion) Include(typ myreflect.Type) bool {
func (u *AnyTypeUnion) Include(typ reflect2.Type) bool {
for _, t := range u.ElementTypes { for _, t := range u.ElementTypes {
if t == typ { if t == typ {
return true return true
@@ -24,7 +24,7 @@ func (u *AnyTypeUnion) Include(typ myreflect.Type) bool {
return false return false
} }


func (u *AnyTypeUnion) Add(typ myreflect.Type) error {
func (u *AnyTypeUnion) Add(typ reflect2.Type) error {
if !typ.AssignableTo(u.UnionType) { if !typ.AssignableTo(u.UnionType) {
return fmt.Errorf("type is not assignable to union type") return fmt.Errorf("type is not assignable to union type")
} }
@@ -55,7 +55,7 @@ func NewTypeUnion[TU any](eleValues ...TU) TypeUnion[TU] {


return TypeUnion[TU]{ return TypeUnion[TU]{
AnyTypeUnion{ AnyTypeUnion{
UnionType: myreflect.TypeOf[TU](),
UnionType: reflect2.TypeOf[TU](),
ElementTypes: eleTypes, ElementTypes: eleTypes,
}, },
} }


+ 1
- 1
sdks/pcm/models.go View File

@@ -34,5 +34,5 @@ const (
TaskStatusPending TaskStatus = "Pending" TaskStatusPending TaskStatus = "Pending"
TaskStatusRunning TaskStatus = "Running" TaskStatusRunning TaskStatus = "Running"
TaskStatusSuccess TaskStatus = "succeeded" TaskStatusSuccess TaskStatus = "succeeded"
TaskStatuFailed TaskStatus = "failed"
TaskStatusFailed TaskStatus = "failed"
) )

+ 3
- 3
sdks/scheduler/models.go View File

@@ -35,7 +35,7 @@ type JobInfo interface {


var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*NormalJobInfo)(nil), (*NormalJobInfo)(nil),
(*ResourceJobInfo)(nil),
(*DataReturnJobInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")


@@ -57,8 +57,8 @@ type NormalJobInfo struct {
Services JobServicesInfo `json:"services"` Services JobServicesInfo `json:"services"`
} }


type ResourceJobInfo struct {
serder.Metadata `union:"Resource"`
type DataReturnJobInfo struct {
serder.Metadata `union:"DataReturn"`
JobInfoBase JobInfoBase
Type string `json:"type"` Type string `json:"type"`
BucketID cdssdk.BucketID `json:"bucketID"` BucketID cdssdk.BucketID `json:"bucketID"`


+ 1
- 1
sdks/scheduler/scheduler_test.go View File

@@ -15,7 +15,7 @@ func Test_JobSet(t *testing.T) {
id, err := cli.JobSetSumbit(JobSetSumbitReq{ id, err := cli.JobSetSumbit(JobSetSumbitReq{
JobSetInfo: JobSetInfo{ JobSetInfo: JobSetInfo{
Jobs: []JobInfo{ Jobs: []JobInfo{
&ResourceJobInfo{
&DataReturnJobInfo{
Type: JobTypeResource, Type: JobTypeResource,
}, },
&NormalJobInfo{ &NormalJobInfo{


+ 2
- 2
sdks/storage/package.go View File

@@ -217,7 +217,7 @@ func (c *PackageService) GetCachedNodes(req PackageGetCachedNodesReq) (*PackageG
return nil, err return nil, err
} }
resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Body: req,
Query: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -252,7 +252,7 @@ func (c *PackageService) GetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageG
return nil, err return nil, err
} }
resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Body: req,
Query: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err


utils/reflect/reflect.go → utils/reflect2/reflect.go View File

@@ -1,4 +1,4 @@
package reflect
package reflect2


import "reflect" import "reflect"



+ 7
- 7
utils/serder/any_to_any.go View File

@@ -4,7 +4,7 @@ import (
"reflect" "reflect"


mp "github.com/mitchellh/mapstructure" mp "github.com/mitchellh/mapstructure"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type Converter func(from reflect.Value, to reflect.Value) (interface{}, error) type Converter func(from reflect.Value, to reflect.Value) (interface{}, error)
@@ -68,11 +68,11 @@ func AnyToAny(src any, dst any, opts ...AnyToAnyOption) error {


// fromAny 如果目的字段实现的FromAny接口,那么通过此接口实现字段类型转换 // fromAny 如果目的字段实现的FromAny接口,那么通过此接口实现字段类型转换
func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) { func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) {
if myreflect.TypeOfValue(data) == targetType {
if reflect2.TypeOfValue(data) == targetType {
return data, nil return data, nil
} }


if targetType.Implements(myreflect.TypeOf[FromAny]()) {
if targetType.Implements(reflect2.TypeOf[FromAny]()) {
// 非pointer receiver的FromAny没有意义,因为修改不了receiver的内容,所以这里只支持指针类型 // 非pointer receiver的FromAny没有意义,因为修改不了receiver的内容,所以这里只支持指针类型
if targetType.Kind() == reflect.Pointer { if targetType.Kind() == reflect.Pointer {
val := reflect.New(targetType.Elem()) val := reflect.New(targetType.Elem())
@@ -88,7 +88,7 @@ func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (i
return val.Interface(), nil return val.Interface(), nil
} }


} else if reflect.PointerTo(targetType).Implements(myreflect.TypeOf[FromAny]()) {
} else if reflect.PointerTo(targetType).Implements(reflect2.TypeOf[FromAny]()) {
val := reflect.New(targetType) val := reflect.New(targetType)
anyIf := val.Interface().(FromAny) anyIf := val.Interface().(FromAny)
ok, err := anyIf.FromAny(data) ok, err := anyIf.FromAny(data)
@@ -107,12 +107,12 @@ func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (i


// 如果源字段实现了ToAny接口,那么通过此接口实现字段类型转换 // 如果源字段实现了ToAny接口,那么通过此接口实现字段类型转换
func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) { func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) {
dataType := myreflect.TypeOfValue(data)
dataType := reflect2.TypeOfValue(data)
if dataType == targetType { if dataType == targetType {
return data, nil return data, nil
} }


if dataType.Implements(myreflect.TypeOf[ToAny]()) {
if dataType.Implements(reflect2.TypeOf[ToAny]()) {
anyIf := data.(ToAny) anyIf := data.(ToAny)
dstVal, ok, err := anyIf.ToAny(targetType) dstVal, ok, err := anyIf.ToAny(targetType)
if err != nil { if err != nil {
@@ -123,7 +123,7 @@ func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (int
} }


return dstVal, nil return dstVal, nil
} else if reflect.PointerTo(dataType).Implements(myreflect.TypeOf[ToAny]()) {
} else if reflect.PointerTo(dataType).Implements(reflect2.TypeOf[ToAny]()) {
dataVal := reflect.ValueOf(data) dataVal := reflect.ValueOf(data)


dataPtrVal := reflect.New(dataType) dataPtrVal := reflect.New(dataType)


+ 5
- 5
utils/serder/serder_test.go View File

@@ -7,7 +7,7 @@ import (


. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type FromAnyString struct { type FromAnyString struct {
@@ -28,7 +28,7 @@ type ToAnyString struct {
} }


func (a *ToAnyString) ToAny(typ reflect.Type) (val any, ok bool, err error) { func (a *ToAnyString) ToAny(typ reflect.Type) (val any, ok bool, err error) {
if typ == myreflect.TypeOf[map[string]any]() {
if typ == reflect2.TypeOf[map[string]any]() {
return map[string]any{ return map[string]any{
"str": "@" + a.Str, "str": "@" + a.Str,
}, true, nil }, true, nil
@@ -55,7 +55,7 @@ type ToAnySt struct {
} }


func (a *ToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) { func (a *ToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) {
if typ == myreflect.TypeOf[FromAnySt]() {
if typ == reflect2.TypeOf[FromAnySt]() {
return FromAnySt{ return FromAnySt{
Value: "To:" + a.Value, Value: "To:" + a.Value,
}, true, nil }, true, nil
@@ -69,7 +69,7 @@ type DirToAnySt struct {
} }


func (a DirToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) { func (a DirToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) {
if typ == myreflect.TypeOf[FromAnySt]() {
if typ == reflect2.TypeOf[FromAnySt]() {
return FromAnySt{ return FromAnySt{
Value: "DirTo:" + a.Value, Value: "DirTo:" + a.Value,
}, true, nil }, true, nil
@@ -181,7 +181,7 @@ func Test_AnyToAny(t *testing.T) {


err := AnyToAny(st1, &st2, AnyToAnyOption{ err := AnyToAny(st1, &st2, AnyToAnyOption{
Converters: []Converter{func(from reflect.Value, to reflect.Value) (interface{}, error) { Converters: []Converter{func(from reflect.Value, to reflect.Value) (interface{}, error) {
if from.Type() == myreflect.TypeOf[Struct1]() && to.Type() == myreflect.TypeOf[Struct2]() {
if from.Type() == reflect2.TypeOf[Struct1]() && to.Type() == reflect2.TypeOf[Struct2]() {
s1 := from.Interface().(Struct1) s1 := from.Interface().(Struct1)
return Struct2{ return Struct2{
Value: "@" + s1.Value, Value: "@" + s1.Value,


+ 4
- 4
utils/serder/union_handler.go View File

@@ -9,7 +9,7 @@ import (
"github.com/modern-go/reflect2" "github.com/modern-go/reflect2"
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"


myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
ref2 "gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type anyTypeUnionExternallyTagged struct { type anyTypeUnionExternallyTagged struct {
@@ -106,14 +106,14 @@ func (u *TypeUnionInternallyTagged[T]) Add(typ reflect.Type) error {
} }


// 要求内嵌Metadata结构体,那么结构体中的字段名就会是Metadata, // 要求内嵌Metadata结构体,那么结构体中的字段名就会是Metadata,
field, ok := structType.FieldByName(myreflect.TypeNameOf[Metadata]())
field, ok := structType.FieldByName(ref2.TypeNameOf[Metadata]())
if !ok { if !ok {
u.TagToType[makeDerefFullTypeName(structType)] = typ u.TagToType[makeDerefFullTypeName(structType)] = typ
return nil return nil
} }


// 为防同名,检查类型是不是也是Metadata // 为防同名,检查类型是不是也是Metadata
if field.Type != myreflect.TypeOf[Metadata]() {
if field.Type != ref2.TypeOf[Metadata]() {
u.TagToType[makeDerefFullTypeName(structType)] = typ u.TagToType[makeDerefFullTypeName(structType)] = typ
return nil return nil
} }
@@ -293,7 +293,7 @@ func (e *ExternallyTaggedEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.St
} }


stream.WriteObjectStart() stream.WriteObjectStart()
valType := myreflect.TypeOfValue(val)
valType := ref2.TypeOfValue(val)
if !e.union.Union.Include(valType) { if !e.union.Union.Include(valType) {
stream.Error = fmt.Errorf("type %v is not in union %v", valType, e.union.Union.UnionType) stream.Error = fmt.Errorf("type %v is not in union %v", valType, e.union.Union.UnionType)
return return


+ 3
- 3
utils/serder/walk_test.go View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"


. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


func Test_WalkValue(t *testing.T) { func Test_WalkValue(t *testing.T) {
@@ -38,8 +38,8 @@ func Test_WalkValue(t *testing.T) {


isBaseDataType := func(val reflect.Value) bool { isBaseDataType := func(val reflect.Value) bool {
typ := val.Type() typ := val.Type()
return typ == myreflect.TypeOf[int]() || typ == myreflect.TypeOf[bool]() ||
typ == myreflect.TypeOf[string]() || typ == myreflect.TypeOf[float32]() || val.IsZero()
return typ == reflect2.TypeOf[int]() || typ == reflect2.TypeOf[bool]() ||
typ == reflect2.TypeOf[string]() || typ == reflect2.TypeOf[float32]() || val.IsZero()
} }


toString := func(val any) string { toString := func(val any) string {


+ 77
- 0
utils/sync2/channel.go View File

@@ -0,0 +1,77 @@
package sync2

import (
"context"
"errors"
"sync"
)

var ErrChannelClosed = errors.New("channel is closed")

type Channel[T any] struct {
ch chan T
closed chan any
closeOnce sync.Once
err error
}

func NewChannel[T any]() *Channel[T] {
return &Channel[T]{
ch: make(chan T),
closed: make(chan any),
}
}

func (c *Channel[T]) Error() error {
return c.err
}

func (c *Channel[T]) Send(val T) error {
select {
case c.ch <- val:
return nil
case <-c.closed:
return c.err
}
}

func (c *Channel[T]) Receive(ctx context.Context) (T, error) {
select {
case val := <-c.ch:
return val, nil
case <-c.closed:
var t T
return t, c.err
case <-ctx.Done():
var t T
return t, ctx.Err()
}
}

func (c *Channel[T]) Sender() chan<- T {
return c.ch
}

func (c *Channel[T]) Receiver() <-chan T {
return c.ch
}

func (c *Channel[T]) Close() {
c.closeOnce.Do(func() {
close(c.closed)
close(c.ch)
c.err = ErrChannelClosed
})
}

func (c *Channel[T]) CloseWithError(err error) {
c.closeOnce.Do(func() {
close(c.closed)
close(c.ch)
c.err = err
})
}

func (c *Channel[T]) Closed() <-chan any {
return c.closed
}

+ 48
- 0
utils/sync2/select_set.go View File

@@ -0,0 +1,48 @@
package sync2

import (
"reflect"

"gitlink.org.cn/cloudream/common/utils/lo2"
)

type SelectCase int

type SelectSet[T any, C any] struct {
cases []reflect.SelectCase
tags []T
}

func (s *SelectSet[T, C]) Add(tag T, ch <-chan C) SelectCase {
s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
s.tags = append(s.tags, tag)

return SelectCase(len(s.cases) - 1)
}

func (s *SelectSet[T, C]) AddDefault(tag T, ch <-chan C) SelectCase {
s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault, Chan: reflect.ValueOf(ch)})
s.tags = append(s.tags, tag)

return SelectCase(len(s.cases) - 1)
}

func (s *SelectSet[T, C]) Remove(caze SelectCase) {
s.cases = lo2.RemoveAt(s.cases, int(caze))
s.tags = lo2.RemoveAt(s.tags, int(caze))
}

func (s *SelectSet[T, C]) Select() (T, C, bool) {
chosen, recv, ok := reflect.Select(s.cases)
if !ok {
var t T
var c C
return t, c, false
}

return s.tags[chosen], recv.Interface().(C), true
}

func (s *SelectSet[T, C]) Count() int {
return len(s.cases)
}

Loading…
Cancel
Save