diff --git a/consts/errorcode/error_code.go b/consts/errorcode/error_code.go index 4c7bc59..fc9e00c 100644 --- a/consts/errorcode/error_code.go +++ b/consts/errorcode/error_code.go @@ -3,6 +3,7 @@ package errorcode const ( OK = "OK" OperationFailed = "OperationFailed" + DataNotFound = "DataNotFound" BadArgument = "BadArgument" TaskNotFound = "TaskNotFound" ) diff --git a/go.mod b/go.mod index c8621aa..8207adb 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/magefile/mage v1.15.0 github.com/mitchellh/mapstructure v1.5.0 + github.com/modern-go/reflect2 v1.0.2 github.com/otiai10/copy v1.12.0 github.com/samber/lo v1.36.0 github.com/sirupsen/logrus v1.9.2 @@ -41,7 +42,6 @@ require ( github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect - github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect diff --git a/pkgs/actor/actor.go b/pkgs/actor/actor.go index 08e6f93..2df0b71 100644 --- a/pkgs/actor/actor.go +++ b/pkgs/actor/actor.go @@ -8,7 +8,7 @@ import ( "github.com/zyedidia/generic/list" "gitlink.org.cn/cloudream/common/pkgs/future" - mysync "gitlink.org.cn/cloudream/common/utils/sync" + "gitlink.org.cn/cloudream/common/utils/sync2" ) type CommandFn func() @@ -16,7 +16,7 @@ type CommandFn func() type CommandChannel struct { cmds *list.List[CommandFn] cmdsLock sync.Mutex - cmdsCounter *mysync.CounterCond + cmdsCounter *sync2.CounterCond chanReceive chan CommandFn chanReceiveDone atomic.Bool @@ -28,7 +28,7 @@ type CommandChannel struct { func NewCommandChannel() *CommandChannel { return &CommandChannel{ cmds: list.New[CommandFn](), - cmdsCounter: mysync.NewCounterCond(0), + cmdsCounter: sync2.NewCounterCond(0), cmdChan: make(chan CommandFn), } } diff --git a/pkgs/cmdtrie/command_trie.go b/pkgs/cmdtrie/command_trie.go index e2d3c05..5f86407 100644 --- a/pkgs/cmdtrie/command_trie.go +++ b/pkgs/cmdtrie/command_trie.go @@ -7,9 +7,11 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/trie" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) +var ErrCommandNotFound = fmt.Errorf("command not found") + type ExecuteOption struct { ReplaceEmptyArrayWithNil bool // 如果最后一个参数是空数组,则调用命令的时候传递nil参数 } @@ -178,7 +180,7 @@ func (t *anyCommandTrie) findCommand(cmdWords []string, argWords []string) (*com }) if cmd == nil { - return nil, nil, fmt.Errorf("command not found") + return nil, nil, ErrCommandNotFound } return cmd, argWords, nil } @@ -296,7 +298,7 @@ type CommandTrie[TCtx any, TRet any] struct { func NewCommandTrie[TCtx any, TRet any]() CommandTrie[TCtx, TRet] { return CommandTrie[TCtx, TRet]{ - anyTrie: newAnyCommandTrie(myreflect.TypeOf[TCtx](), myreflect.TypeOf[TRet]()), + anyTrie: newAnyCommandTrie(reflect2.TypeOf[TCtx](), reflect2.TypeOf[TRet]()), } } @@ -337,7 +339,7 @@ type VoidCommandTrie[TCtx any] struct { func NewVoidCommandTrie[TCtx any]() VoidCommandTrie[TCtx] { return VoidCommandTrie[TCtx]{ - anyTrie: newAnyCommandTrie(myreflect.TypeOf[TCtx](), nil), + anyTrie: newAnyCommandTrie(reflect2.TypeOf[TCtx](), nil), } } @@ -368,7 +370,7 @@ type StaticCommandTrie[TRet any] struct { func NewStaticCommandTrie[TRet any]() StaticCommandTrie[TRet] { return StaticCommandTrie[TRet]{ - anyTrie: newAnyCommandTrie(nil, myreflect.TypeOf[TRet]()), + anyTrie: newAnyCommandTrie(nil, reflect2.TypeOf[TRet]()), } } diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go index 78589b8..d53f199 100644 --- a/pkgs/distlock/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -84,7 +84,7 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er return } - a.acquirings = mylo.Remove(a.acquirings, info) + a.acquirings = lo2.Remove(a.acquirings, info) if info.LastErr != nil { info.Callback.SetError(info.LastErr) } else { @@ -213,7 +213,7 @@ func (a *AcquireActor) doAcquiring() error { } req.Callback.SetValue(reqData.ID) - a.acquirings = mylo.RemoveAt(a.acquirings, i) + a.acquirings = lo2.RemoveAt(a.acquirings, i) break } diff --git a/pkgs/distlock/internal/service_info_actor.go b/pkgs/distlock/internal/service_info_actor.go index a940574..dcbac71 100644 --- a/pkgs/distlock/internal/service_info_actor.go +++ b/pkgs/distlock/internal/service_info_actor.go @@ -8,7 +8,7 @@ import ( "github.com/google/uuid" "gitlink.org.cn/cloudream/common/pkgs/logger" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -191,6 +191,6 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) { if evt.IsLocking { status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID) } else { - status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID) + status.LockRequestIDs = lo2.Remove(status.LockRequestIDs, evt.Data.ID) } } diff --git a/pkgs/event/executor.go b/pkgs/event/executor.go index afd8a82..3536826 100644 --- a/pkgs/event/executor.go +++ b/pkgs/event/executor.go @@ -4,7 +4,7 @@ import ( "sync" "github.com/zyedidia/generic/list" - mysync "gitlink.org.cn/cloudream/common/utils/sync" + "gitlink.org.cn/cloudream/common/utils/sync2" ) type ExecuteOption struct { @@ -26,7 +26,7 @@ type postedEvent[TArgs any] struct { type Executor[TArgs any] struct { events *list.List[postedEvent[TArgs]] locker sync.Mutex - eventCond *mysync.CounterCond + eventCond *sync2.CounterCond execArgs TArgs } @@ -34,7 +34,7 @@ func NewExecutor[TArgs any](args TArgs) Executor[TArgs] { return Executor[TArgs]{ events: list.New[postedEvent[TArgs]](), locker: sync.Mutex{}, - eventCond: mysync.NewCounterCond(0), + eventCond: sync2.NewCounterCond(0), execArgs: args, } diff --git a/pkgs/ipfs/ipfs.go b/pkgs/ipfs/ipfs.go index 1822ce9..3f4e09e 100644 --- a/pkgs/ipfs/ipfs.go +++ b/pkgs/ipfs/ipfs.go @@ -6,12 +6,12 @@ import ( "io" shell "github.com/ipfs/go-ipfs-api" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" ) type ReadOption struct { - Offset int64 // 从指定位置开始读取,为-1时代表不设置,从头开始读 - Length int64 // 读取长度,为-1时代表不设置,读取Offset之后的所有内容 + Offset int64 `json:"offset,string"` // 从指定位置开始读取,为-1时代表不设置,从头开始读 + Length int64 `json:"length,string"` // 读取长度,为-1时代表不设置,读取Offset之后的所有内容 } type Client struct { @@ -35,7 +35,7 @@ func (fs *Client) IsUp() bool { return fs.shell.IsUp() } -func (fs *Client) CreateFileStream() (myio.PromiseWriteCloser[string], error) { +func (fs *Client) CreateFileStream() (io2.PromiseWriteCloser[string], error) { pr, pw := io.Pipe() ipfsWriter := ipfsWriter{ diff --git a/pkgs/iterator/empty.go b/pkgs/iterator/empty.go new file mode 100644 index 0000000..0fea9cc --- /dev/null +++ b/pkgs/iterator/empty.go @@ -0,0 +1,15 @@ +package iterator + +type emptyIterator[T any] struct{} + +func (i *emptyIterator[T]) MoveNext() (T, error) { + var ret T + return ret, ErrNoMoreItem +} +func (i *emptyIterator[T]) Close() { + +} + +func Empty[T any]() Iterator[T] { + return &emptyIterator[T]{} +} diff --git a/pkgs/iterator/fuse.go b/pkgs/iterator/fuse.go new file mode 100644 index 0000000..7174ce4 --- /dev/null +++ b/pkgs/iterator/fuse.go @@ -0,0 +1,18 @@ +package iterator + +type fuseError[T any] struct { + err error +} + +func (i *fuseError[T]) MoveNext() (T, error) { + var ret T + return ret, i.err +} +func (i *fuseError[T]) Close() { + +} +func FuseError[T any](err error) Iterator[T] { + return &fuseError[T]{ + err: err, + } +} diff --git a/pkgs/logger/global_logger.go b/pkgs/logger/global_logger.go index 793f1fc..3a469fc 100644 --- a/pkgs/logger/global_logger.go +++ b/pkgs/logger/global_logger.go @@ -8,7 +8,7 @@ import ( nested "github.com/antonfisher/nested-logrus-formatter" "github.com/sirupsen/logrus" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) // 输出日志到标准输出。适用于没有设计好日志输出方案时临时使用。 @@ -123,6 +123,6 @@ func WithField(key string, val any) Logger { func WithType[T any](key string) Logger { return &logrusLogger{ - entry: logrus.WithField(key, myreflect.TypeOf[T]().Name()), + entry: logrus.WithField(key, reflect2.TypeOf[T]().Name()), } } diff --git a/pkgs/mq/client.go b/pkgs/mq/client.go index 078cdc2..8d6f090 100644 --- a/pkgs/mq/client.go +++ b/pkgs/mq/client.go @@ -10,7 +10,7 @@ import ( "github.com/streadway/amqp" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) const ( @@ -22,12 +22,12 @@ const ( var ErrWaitResponseTimeout = fmt.Errorf("wait response timeout") type CodeMessageError struct { - code string - message string + Code string + Message string } func (e *CodeMessageError) Error() string { - return fmt.Sprintf("code: %s, message: %s", e.code, e.message) + return fmt.Sprintf("code: %s, message: %s", e.Code, e.Message) } type SendOption struct { @@ -300,7 +300,7 @@ func (c *RabbitMQTransport) Close() error { // 发送消息并等待回应。因为无法自动推断出TResp的类型,所以将其放在第一个手工填写,之后的TBody可以自动推断出来 func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli RoundTripper, req TReq, opts ...RequestOption) (TResp, error) { - opt := RequestOption{Timeout: time.Second * 15} + opt := RequestOption{Timeout: time.Second * 15, KeepAlive: true} if len(opts) > 0 { opt = opts[0] } @@ -315,16 +315,16 @@ func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg errCode, errMsg := resp.GetCodeMessage() if errCode != errorcode.OK { return defRet, &CodeMessageError{ - code: errCode, - message: errMsg, + Code: errCode, + Message: errMsg, } } respBody, ok := resp.Body.(TResp) if !ok { return defRet, fmt.Errorf("expect a %s body, but got %s", - myreflect.ElemTypeOf[TResp]().Name(), - myreflect.TypeOfValue(resp.Body).Name()) + reflect2.ElemTypeOf[TResp]().Name(), + reflect2.TypeOfValue(resp.Body).Name()) } return respBody, nil diff --git a/pkgs/mq/message.go b/pkgs/mq/message.go index 36a7ad3..452716a 100644 --- a/pkgs/mq/message.go +++ b/pkgs/mq/message.go @@ -2,7 +2,7 @@ package mq import ( "gitlink.org.cn/cloudream/common/pkgs/types" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -83,7 +83,7 @@ var msgBodyTypeUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTy // 所有新定义的Message都需要在init中调用此函数 func RegisterMessage[T MessageBody]() { - err := msgBodyTypeUnion.Add(myreflect.TypeOf[T]()) + err := msgBodyTypeUnion.Add(reflect2.TypeOf[T]()) if err != nil { panic(err) } diff --git a/pkgs/mq/message_dispatcher.go b/pkgs/mq/message_dispatcher.go index 666e872..629fbf7 100644 --- a/pkgs/mq/message_dispatcher.go +++ b/pkgs/mq/message_dispatcher.go @@ -3,27 +3,27 @@ package mq import ( "fmt" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) type HandlerFn func(svcBase any, msg *Message) (*Message, error) type MessageDispatcher struct { - Handlers map[myreflect.Type]HandlerFn + Handlers map[reflect2.Type]HandlerFn } func NewMessageDispatcher() MessageDispatcher { return MessageDispatcher{ - Handlers: make(map[myreflect.Type]HandlerFn), + Handlers: make(map[reflect2.Type]HandlerFn), } } -func (h *MessageDispatcher) Add(typ myreflect.Type, handler HandlerFn) { +func (h *MessageDispatcher) Add(typ reflect2.Type, handler HandlerFn) { h.Handlers[typ] = handler } func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) { - typ := myreflect.TypeOfValue(msg.Body) + typ := reflect2.TypeOfValue(msg.Body) fn, ok := h.Handlers[typ] if !ok { return nil, fmt.Errorf("unsupported message type: %s", typ.String()) @@ -34,7 +34,7 @@ func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) // 将Service中的一个接口函数作为指定类型消息的处理函数 func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq) (TResp, *CodeMessage)) { - dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { + dispatcher.Add(reflect2.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { reqMsgBody := reqMsg.Body.(TReq) ret, codeMsg := svcFn(svcBase.(TSvc), reqMsgBody) @@ -47,7 +47,7 @@ func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *Mes // 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数 func AddNoRespServiceFn[TSvc any, TReq MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq)) { - dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { + dispatcher.Add(reflect2.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { reqMsgBody := reqMsg.Body.(TReq) svcFn(svcBase.(TSvc), reqMsgBody) diff --git a/pkgs/mq/message_test.go b/pkgs/mq/message_test.go index b2482fa..03be582 100644 --- a/pkgs/mq/message_test.go +++ b/pkgs/mq/message_test.go @@ -9,7 +9,7 @@ import ( jsoniter "github.com/json-iterator/go" . "github.com/smartystreets/goconvey/convey" "gitlink.org.cn/cloudream/common/pkgs/types" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -42,13 +42,13 @@ func TestMessage(t *testing.T) { Nil: nil, } - jsoniter.RegisterTypeEncoderFunc(myreflect.TypeOf[MyAny]().String(), + jsoniter.RegisterTypeEncoderFunc(reflect2.TypeOf[MyAny]().String(), func(ptr unsafe.Pointer, stream *jsoniter.Stream) { val := *((*MyAny)(ptr)) stream.WriteArrayStart() if val != nil { - stream.WriteString(myreflect.TypeOfValue(val).String()) + stream.WriteString(reflect2.TypeOfValue(val).String()) stream.WriteRaw(",") stream.WriteVal(val) } @@ -58,7 +58,7 @@ func TestMessage(t *testing.T) { return false }) - jsoniter.RegisterTypeDecoderFunc(myreflect.TypeOf[MyAny]().String(), + jsoniter.RegisterTypeDecoderFunc(reflect2.TypeOf[MyAny]().String(), func(ptr unsafe.Pointer, iter *jsoniter.Iterator) { vp := (*MyAny)(ptr) diff --git a/pkgs/task/manager.go b/pkgs/task/manager.go index 9f7b276..b925e18 100644 --- a/pkgs/task/manager.go +++ b/pkgs/task/manager.go @@ -5,7 +5,7 @@ import ( "sync" "time" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) type Manager[TCtx any] struct { @@ -108,12 +108,12 @@ func (m *Manager[TCtx]) executeTask(task *Task[TCtx]) { // 立刻删除任务,或者延迟一段时间再删除 if opt.RemovingDelay == 0 { - m.tasks = mylo.Remove(m.tasks, task) + m.tasks = lo2.Remove(m.tasks, task) } else { go func() { <-time.After(opt.RemovingDelay) m.lock.Lock() - m.tasks = mylo.Remove(m.tasks, task) + m.tasks = lo2.Remove(m.tasks, task) m.lock.Unlock() }() } diff --git a/pkgs/task/task.go b/pkgs/task/task.go index 7f6ff35..2210f86 100644 --- a/pkgs/task/task.go +++ b/pkgs/task/task.go @@ -5,7 +5,7 @@ import ( "sync/atomic" "time" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) type CompleteOption struct { @@ -82,7 +82,7 @@ func (t *Task[TCtx]) WaitTimeout(timeout time.Duration) bool { select { case <-time.After(timeout): t.waiterLock.Lock() - t.waiters = mylo.Remove(t.waiters, waiter) + t.waiters = lo2.Remove(t.waiters, waiter) t.waiterLock.Unlock() return false diff --git a/pkgs/typedispatcher/type_dispatcher.go b/pkgs/typedispatcher/type_dispatcher.go index 4270244..57f046a 100644 --- a/pkgs/typedispatcher/type_dispatcher.go +++ b/pkgs/typedispatcher/type_dispatcher.go @@ -1,28 +1,28 @@ package typedispatcher import ( - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) type HandlerFn[TRet any] func(val any) TRet type TypeDispatcher[TRet any] struct { - handlers map[myreflect.Type]HandlerFn[TRet] + handlers map[reflect2.Type]HandlerFn[TRet] } func NewTypeDispatcher[TRet any]() TypeDispatcher[TRet] { return TypeDispatcher[TRet]{ - handlers: make(map[myreflect.Type]HandlerFn[TRet]), + handlers: make(map[reflect2.Type]HandlerFn[TRet]), } } -func (t *TypeDispatcher[TRet]) Add(typ myreflect.Type, fn HandlerFn[TRet]) { +func (t *TypeDispatcher[TRet]) Add(typ reflect2.Type, fn HandlerFn[TRet]) { t.handlers[typ] = fn } func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) { var ret TRet - typ := myreflect.TypeOfValue(val) + typ := reflect2.TypeOfValue(val) handler, ok := t.handlers[typ] if !ok { return ret, false @@ -32,7 +32,7 @@ func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) { } func Add[T any, TRet any](dispatcher TypeDispatcher[TRet], handler func(val T) TRet) { - dispatcher.Add(myreflect.TypeOf[T](), func(val any) TRet { + dispatcher.Add(reflect2.TypeOf[T](), func(val any) TRet { return handler(val.(T)) }) } diff --git a/pkgs/types/union.go b/pkgs/types/union.go index 639e25c..8d0d73c 100644 --- a/pkgs/types/union.go +++ b/pkgs/types/union.go @@ -4,17 +4,17 @@ import ( "fmt" "reflect" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) type AnyTypeUnion struct { // 这个集合的类型 - UnionType myreflect.Type + UnionType reflect2.Type // 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围 - ElementTypes []myreflect.Type + ElementTypes []reflect2.Type } -func (u *AnyTypeUnion) Include(typ myreflect.Type) bool { +func (u *AnyTypeUnion) Include(typ reflect2.Type) bool { for _, t := range u.ElementTypes { if t == typ { return true @@ -24,7 +24,7 @@ func (u *AnyTypeUnion) Include(typ myreflect.Type) bool { return false } -func (u *AnyTypeUnion) Add(typ myreflect.Type) error { +func (u *AnyTypeUnion) Add(typ reflect2.Type) error { if !typ.AssignableTo(u.UnionType) { return fmt.Errorf("type is not assignable to union type") } @@ -55,7 +55,7 @@ func NewTypeUnion[TU any](eleValues ...TU) TypeUnion[TU] { return TypeUnion[TU]{ AnyTypeUnion{ - UnionType: myreflect.TypeOf[TU](), + UnionType: reflect2.TypeOf[TU](), ElementTypes: eleTypes, }, } diff --git a/sdks/imfs/imfs_test.go b/sdks/imfs/imfs_test.go index 22011a1..257d006 100644 --- a/sdks/imfs/imfs_test.go +++ b/sdks/imfs/imfs_test.go @@ -32,7 +32,7 @@ func Test_Package(t *testing.T) { URL: "http://localhost:7893", }) - _, err := cli.PackageGetWithObjects(PackageGetWithObjectsInfos{UserID: 0, PackageID: 13}) + _, err := cli.PackageGetWithObjects(PackageGetWithObjectsInfos{UserID: 1, PackageID: 13}) So(err, ShouldBeNil) }) } diff --git a/sdks/pcm/models.go b/sdks/pcm/models.go index 0b6acb3..a225375 100644 --- a/sdks/pcm/models.go +++ b/sdks/pcm/models.go @@ -34,5 +34,5 @@ const ( TaskStatusPending TaskStatus = "Pending" TaskStatusRunning TaskStatus = "Running" TaskStatusSuccess TaskStatus = "succeeded" - TaskStatuFailed TaskStatus = "failed" + TaskStatusFailed TaskStatus = "failed" ) diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 2748c55..0b059b1 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -9,6 +9,7 @@ import ( const ( JobTypeNormal = "Normal" JobTypeResource = "Resource" + JobTypeInstance = "Instance" FileInfoTypePackage = "Package" FileInfoTypeLocalFile = "LocalFile" @@ -35,7 +36,9 @@ type JobInfo interface { var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*NormalJobInfo)(nil), - (*ResourceJobInfo)(nil), + (*DataReturnJobInfo)(nil), + (*MultiInstanceJobInfo)(nil), + (*InstanceJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -57,14 +60,33 @@ type NormalJobInfo struct { Services JobServicesInfo `json:"services"` } -type ResourceJobInfo struct { - serder.Metadata `union:"Resource"` +type DataReturnJobInfo struct { + serder.Metadata `union:"DataReturn"` JobInfoBase Type string `json:"type"` BucketID cdssdk.BucketID `json:"bucketID"` TargetLocalJobID string `json:"targetLocalJobID"` } +type MultiInstanceJobInfo struct { + serder.Metadata `union:"MultiInstance"` + JobInfoBase + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` +} + +type InstanceJobInfo struct { + serder.Metadata `union:"Instance"` + JobInfoBase + Type string `json:"type"` + LocalJobID string `json:"multiInstJobID"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` +} + type JobFilesInfo struct { Dataset JobFileInfo `json:"dataset"` Code JobFileInfo `json:"code"` @@ -78,7 +100,7 @@ type JobFileInfo interface { var FileInfoTypeUnion = types.NewTypeUnion[JobFileInfo]( (*PackageJobFileInfo)(nil), (*LocalJobFileInfo)(nil), - (*ResourceJobFileInfo)(nil), + (*DataReturnJobFileInfo)(nil), (*ImageJobFileInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&FileInfoTypeUnion, "type") @@ -101,11 +123,11 @@ type LocalJobFileInfo struct { LocalPath string `json:"localPath"` } -type ResourceJobFileInfo struct { - serder.Metadata `union:"Resource"` +type DataReturnJobFileInfo struct { + serder.Metadata `union:"DataReturn"` JobFileInfoBase - Type string `json:"type"` - ResourceLocalJobID string `json:"resourceLocalJobID"` + Type string `json:"type"` + DataReturnLocalJobID string `json:"dataReturnLocalJobID"` } type ImageJobFileInfo struct { @@ -140,6 +162,10 @@ type JobSetFilesUploadScheme struct { LocalFileSchemes []LocalFileUploadScheme `json:"localFileUploadSchemes"` } +type JobFilesUploadScheme struct { + LocalFileSchemes []LocalFileUploadScheme `json:"localFileUploadSchemes"` +} + type LocalFileUploadScheme struct { LocalPath string `json:"localPath"` UploadToCDSNodeID *cdssdk.NodeID `json:"uploadToCDSNodeID"` diff --git a/sdks/scheduler/scheduler_test.go b/sdks/scheduler/scheduler_test.go index 25a2977..9b22a18 100644 --- a/sdks/scheduler/scheduler_test.go +++ b/sdks/scheduler/scheduler_test.go @@ -15,7 +15,7 @@ func Test_JobSet(t *testing.T) { id, err := cli.JobSetSumbit(JobSetSumbitReq{ JobSetInfo: JobSetInfo{ Jobs: []JobInfo{ - &ResourceJobInfo{ + &DataReturnJobInfo{ Type: JobTypeResource, }, &NormalJobInfo{ diff --git a/sdks/storage/bucket.go b/sdks/storage/bucket.go new file mode 100644 index 0000000..15bfac8 --- /dev/null +++ b/sdks/storage/bucket.go @@ -0,0 +1,156 @@ +package cdssdk + +import ( + "net/url" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + myhttp "gitlink.org.cn/cloudream/common/utils/http" +) + +type BucketService struct { + *Client +} + +func (c *Client) Bucket() *BucketService { + return &BucketService{c} +} + +const BucketGetByNamePath = "/bucket/getByName" + +type BucketGetByName struct { + UserID UserID `json:"userID" form:"userID" binding:"required"` + Name string `json:"name" form:"name" binding:"required"` +} +type BucketGetByNameResp struct { + Bucket Bucket `json:"bucket"` +} + +func (c *BucketService) GetByName(req BucketGetByName) (*BucketGetByNameResp, error) { + url, err := url.JoinPath(c.baseURL, BucketGetByNamePath) + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + codeResp, err := ParseJSONResponse[response[BucketGetByNameResp]](resp) + if err != nil { + return nil, err + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() +} + +const BucketCreatePath = "/bucket/create" + +type BucketCreate struct { + UserID UserID `json:"userID" binding:"required"` + Name string `json:"name" binding:"required"` +} + +type BucketCreateResp struct { + Bucket Bucket `json:"bucket"` +} + +func (c *BucketService) Create(req BucketCreate) (*BucketCreateResp, error) { + url, err := url.JoinPath(c.baseURL, BucketCreatePath) + if err != nil { + return nil, err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + codeResp, err := ParseJSONResponse[response[BucketCreateResp]](resp) + if err != nil { + return nil, err + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() +} + +const BucketDeletePath = "/bucket/delete" + +type BucketDelete struct { + UserID UserID `json:"userID" binding:"required"` + BucketID BucketID `json:"bucketID" binding:"required"` +} + +type BucketDeleteResp struct{} + +func (c *BucketService) Delete(req BucketDelete) error { + url, err := url.JoinPath(c.baseURL, BucketDeletePath) + if err != nil { + return err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return err + } + + codeResp, err := ParseJSONResponse[response[BucketDeleteResp]](resp) + if err != nil { + return err + } + + if codeResp.Code == errorcode.OK { + return nil + } + + return codeResp.ToError() +} + +const BucketListUserBucketsPath = "/bucket/listUserBuckets" + +type BucketListUserBucketsReq struct { + UserID UserID `form:"userID" json:"userID" binding:"required"` +} + +type BucketListUserBucketsResp struct { + Buckets []Bucket `json:"buckets"` +} + +func (c *BucketService) ListUserBuckets(req BucketListUserBucketsReq) (*BucketListUserBucketsResp, error) { + url, err := url.JoinPath(c.baseURL, BucketListUserBucketsPath) + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + codeResp, err := ParseJSONResponse[response[BucketListUserBucketsResp]](resp) + if err != nil { + return nil, err + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() +} diff --git a/sdks/storage/cache.go b/sdks/storage/cache.go index a6866a1..74d02b7 100644 --- a/sdks/storage/cache.go +++ b/sdks/storage/cache.go @@ -7,7 +7,7 @@ import ( myhttp "gitlink.org.cn/cloudream/common/utils/http" ) -var CacheMovePackagePath = "/cache/movePackage" +const CacheMovePackagePath = "/cache/movePackage" type CacheMovePackageReq struct { UserID UserID `json:"userID"` @@ -29,7 +29,7 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes return nil, err } - jsonResp, err := myhttp.ParseJSONResponse[response[CacheMovePackageResp]](resp) + jsonResp, err := ParseJSONResponse[response[CacheMovePackageResp]](resp) if err != nil { return nil, err } diff --git a/sdks/storage/models.go b/sdks/storage/models.go index 9c60ce1..2597cae 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -10,7 +10,7 @@ import ( ) const ( - ObjectPathSeperator = "/" + ObjectPathSeparator = "/" ) type NodeID int64 @@ -112,6 +112,8 @@ type Object struct { Size int64 `db:"Size" json:"size,string"` FileHash string `db:"FileHash" json:"fileHash"` Redundancy Redundancy `db:"Redundancy" json:"redundancy"` + CreateTime time.Time `db:"CreateTime" json:"createTime"` + UpdateTime time.Time `db:"UpdateTime" json:"updateTime"` } type Node struct { @@ -132,6 +134,19 @@ type PinnedObject struct { CreateTime time.Time `db:"CreateTime" json:"createTime"` } +type Bucket struct { + BucketID BucketID `db:"BucketID" json:"bucketID"` + Name string `db:"Name" json:"name"` + CreatorID UserID `db:"CreatorID" json:"creatorID"` +} + +type NodeConnectivity struct { + FromNodeID NodeID `db:"FromNodeID" json:"fromNodeID"` + ToNodeID NodeID `db:"ToNodeID" json:"ToNodeID"` + Delay *float32 `db:"Delay" json:"delay"` + TestTime time.Time `db:"TestTime" json:"testTime"` +} + type NodePackageCachingInfo struct { NodeID NodeID `json:"nodeID"` FileSize int64 `json:"fileSize"` diff --git a/sdks/storage/node.go b/sdks/storage/node.go index 050e859..111135f 100644 --- a/sdks/storage/node.go +++ b/sdks/storage/node.go @@ -30,7 +30,7 @@ func (c *Client) NodeGetNodes(req NodeGetNodesReq) (*NodeGetNodesResp, error) { return nil, err } - jsonResp, err := myhttp.ParseJSONResponse[response[NodeGetNodesResp]](resp) + jsonResp, err := ParseJSONResponse[response[NodeGetNodesResp]](resp) if err != nil { return nil, err } diff --git a/sdks/storage/object.go b/sdks/storage/object.go index 81fb5d7..fd82606 100644 --- a/sdks/storage/object.go +++ b/sdks/storage/object.go @@ -5,19 +5,111 @@ import ( "io" "net/url" "strings" + "time" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/iterator" myhttp "gitlink.org.cn/cloudream/common/utils/http" "gitlink.org.cn/cloudream/common/utils/serder" ) -type ObjectDownloadReq struct { - UserID int64 `json:"userID"` - ObjectID int64 `json:"objectID"` +type ObjectService struct { + *Client } -func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) { - url, err := url.JoinPath(c.baseURL, "/object/download") +func (c *Client) Object() *ObjectService { + return &ObjectService{ + Client: c, + } +} + +const ObjectUploadPath = "/object/upload" + +type ObjectUpload struct { + ObjectUploadInfo + Files UploadObjectIterator `json:"-"` +} + +type ObjectUploadInfo struct { + UserID UserID `json:"userID" binding:"required"` + PackageID PackageID `json:"packageID" binding:"required"` + NodeAffinity *NodeID `json:"nodeAffinity"` +} + +type UploadingObject struct { + Path string + File io.ReadCloser +} + +type UploadObjectIterator = iterator.Iterator[*UploadingObject] + +type ObjectUploadResp struct { + Uploadeds []UploadedObject `json:"uploadeds"` +} +type UploadedObject struct { + Object *Object `json:"object"` + Error string `json:"error"` +} + +func (c *ObjectService) Upload(req ObjectUpload) (*ObjectUploadResp, error) { + url, err := url.JoinPath(c.baseURL, ObjectUploadPath) + if err != nil { + return nil, err + } + + infoJSON, err := serder.ObjectToJSON(req) + if err != nil { + return nil, fmt.Errorf("upload info to json: %w", err) + } + + resp, err := myhttp.PostMultiPart(url, myhttp.MultiPartRequestParam{ + Form: map[string]string{"info": string(infoJSON)}, + Files: iterator.Map(req.Files, func(src *UploadingObject) (*myhttp.IterMultiPartFile, error) { + return &myhttp.IterMultiPartFile{ + FieldName: "files", + FileName: src.Path, + File: src.File, + }, nil + }), + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var err error + var codeResp response[ObjectUploadResp] + if codeResp, err = serder.JSONToObjectStreamEx[response[ObjectUploadResp]](resp.Body); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) + +} + +const ObjectDownloadPath = "/object/download" + +type ObjectDownload struct { + UserID UserID `form:"userID" json:"userID" binding:"required"` + ObjectID ObjectID `form:"objectID" json:"objectID" binding:"required"` + Offset int64 `form:"offset" json:"offset,omitempty"` + Length *int64 `form:"length" json:"length,omitempty"` +} +type DownloadingObject struct { + Path string + File io.ReadCloser +} + +func (c *ObjectService) Download(req ObjectDownload) (*DownloadingObject, error) { + url, err := url.JoinPath(c.baseURL, ObjectDownloadPath) if err != nil { return nil, err } @@ -40,24 +132,162 @@ func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) { return nil, codeResp.ToError() } - if strings.Contains(contType, myhttp.ContentTypeOctetStream) { - return resp.Body, nil + _, files, err := myhttp.ParseMultiPartResponse(resp) + if err != nil { + return nil, err } - return nil, fmt.Errorf("unknow response content type: %s", contType) + file, err := files.MoveNext() + if err == iterator.ErrNoMoreItem { + return nil, fmt.Errorf("no file found in response") + } + if err != nil { + return nil, err + } + + return &DownloadingObject{ + Path: file.FileName, + File: file.File, + }, nil +} + +const ObjectUpdateInfoPath = "/object/updateInfo" + +type UpdatingObject struct { + ObjectID ObjectID `json:"objectID" binding:"required"` + UpdateTime time.Time `json:"updateTime" binding:"required"` +} + +func (u *UpdatingObject) ApplyTo(obj *Object) { + obj.UpdateTime = u.UpdateTime +} + +type ObjectUpdateInfo struct { + UserID UserID `json:"userID" binding:"required"` + Updatings []UpdatingObject `json:"updatings" binding:"required"` +} + +type ObjectUpdateInfoResp struct { + Successes []ObjectID `json:"successes"` +} + +func (c *ObjectService) UpdateInfo(req ObjectUpdateInfo) (*ObjectUpdateInfoResp, error) { + url, err := url.JoinPath(c.baseURL, ObjectUpdateInfoPath) + if err != nil { + return nil, err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + jsonResp, err := ParseJSONResponse[response[ObjectUpdateInfoResp]](resp) + if err != nil { + return nil, err + } + + if jsonResp.Code == errorcode.OK { + return &jsonResp.Data, nil + } + + return nil, jsonResp.ToError() +} + +const ObjectMovePath = "/object/move" + +type MovingObject struct { + ObjectID ObjectID `json:"objectID" binding:"required"` + PackageID PackageID `json:"packageID" binding:"required"` + Path string `json:"path" binding:"required"` +} + +func (m *MovingObject) ApplyTo(obj *Object) { + obj.PackageID = m.PackageID + obj.Path = m.Path +} + +type ObjectMove struct { + UserID UserID `json:"userID" binding:"required"` + Movings []MovingObject `json:"movings" binding:"required"` +} + +type ObjectMoveResp struct { + Successes []ObjectID `json:"successes"` +} + +func (c *ObjectService) Move(req ObjectMove) (*ObjectMoveResp, error) { + url, err := url.JoinPath(c.baseURL, ObjectMovePath) + if err != nil { + return nil, err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + jsonResp, err := ParseJSONResponse[response[ObjectMoveResp]](resp) + if err != nil { + return nil, err + } + + if jsonResp.Code == errorcode.OK { + return &jsonResp.Data, nil + } + + return nil, jsonResp.ToError() +} + +const ObjectDeletePath = "/object/delete" + +type ObjectDelete struct { + UserID UserID `json:"userID" binding:"required"` + ObjectIDs []ObjectID `json:"objectIDs" binding:"required"` +} + +type ObjectDeleteResp struct{} + +func (c *ObjectService) Delete(req ObjectDelete) error { + url, err := url.JoinPath(c.baseURL, ObjectDeletePath) + if err != nil { + return err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return err + } + + jsonResp, err := ParseJSONResponse[response[ObjectDeleteResp]](resp) + if err != nil { + return err + } + + if jsonResp.Code == errorcode.OK { + return nil + } + + return jsonResp.ToError() } -var ObjectGetPackageObjectsPath = "/object/getPackageObjects" +const ObjectGetPackageObjectsPath = "/object/getPackageObjects" -type ObjectGetPackageObjectsReq struct { - UserID UserID `json:"userID"` - PackageID PackageID `json:"packageID"` +type ObjectGetPackageObjects struct { + UserID UserID `form:"userID" json:"userID" binding:"required"` + PackageID PackageID `form:"packageID" json:"packageID" binding:"required"` } type ObjectGetPackageObjectsResp struct { Objects []Object `json:"objects"` } -func (c *Client) ObjectGetPackageObjects(req ObjectGetPackageObjectsReq) (*ObjectGetPackageObjectsResp, error) { +func (c *ObjectService) GetPackageObjects(req ObjectGetPackageObjects) (*ObjectGetPackageObjectsResp, error) { url, err := url.JoinPath(c.baseURL, ObjectGetPackageObjectsPath) if err != nil { return nil, err @@ -70,7 +300,7 @@ func (c *Client) ObjectGetPackageObjects(req ObjectGetPackageObjectsReq) (*Objec return nil, err } - jsonResp, err := myhttp.ParseJSONResponse[response[ObjectGetPackageObjectsResp]](resp) + jsonResp, err := ParseJSONResponse[response[ObjectGetPackageObjectsResp]](resp) if err != nil { return nil, err } diff --git a/sdks/storage/package.go b/sdks/storage/package.go index c264a21..22b6aa2 100644 --- a/sdks/storage/package.go +++ b/sdks/storage/package.go @@ -2,26 +2,34 @@ package cdssdk import ( "fmt" - "io" "net/url" "strings" "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/iterator" myhttp "gitlink.org.cn/cloudream/common/utils/http" "gitlink.org.cn/cloudream/common/utils/serder" ) +type PackageService struct { + *Client +} + +func (c *Client) Package() *PackageService { + return &PackageService{c} +} + +const PackageGetPath = "/package/get" + type PackageGetReq struct { - UserID UserID `json:"userID"` - PackageID PackageID `json:"packageID"` + UserID UserID `form:"userID" json:"userID" binding:"required"` + PackageID PackageID `form:"packageID" json:"packageID" binding:"required"` } type PackageGetResp struct { Package } -func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) { - url, err := url.JoinPath(c.baseURL, "/package/get") +func (c *PackageService) Get(req PackageGetReq) (*PackageGetResp, error) { + url, err := url.JoinPath(c.baseURL, PackageGetPath) if err != nil { return nil, err } @@ -33,7 +41,7 @@ func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) { return nil, err } - codeResp, err := myhttp.ParseJSONResponse[response[PackageGetResp]](resp) + codeResp, err := ParseJSONResponse[response[PackageGetResp]](resp) if err != nil { return nil, err } @@ -45,75 +53,88 @@ func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) { return nil, codeResp.ToError() } -type PackageUploadReq struct { - UserID UserID `json:"userID"` - BucketID BucketID `json:"bucketID"` - Name string `json:"name"` - NodeAffinity *NodeID `json:"nodeAffinity"` - Files PackageUploadFileIterator `json:"-"` -} +const PackageGetByNamePath = "/package/getByName" -type IterPackageUploadFile struct { - Path string - File io.ReadCloser +type PackageGetByName struct { + UserID UserID `form:"userID" json:"userID" binding:"required"` + BucketName string `form:"bucketName" json:"bucketName" binding:"required"` + PackageName string `form:"packageName" json:"packageName" binding:"required"` } - -type PackageUploadFileIterator = iterator.Iterator[*IterPackageUploadFile] - -type PackageUploadResp struct { - PackageID PackageID `json:"packageID,string"` +type PackageGetByNameResp struct { + Package Package `json:"package"` } -func (c *Client) PackageUpload(req PackageUploadReq) (*PackageUploadResp, error) { - url, err := url.JoinPath(c.baseURL, "/package/upload") +func (c *PackageService) GetByName(req PackageGetByName) (*PackageGetByNameResp, error) { + url, err := url.JoinPath(c.baseURL, PackageGetByNamePath) if err != nil { return nil, err } - infoJSON, err := serder.ObjectToJSON(req) + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) if err != nil { - return nil, fmt.Errorf("package info to json: %w", err) + return nil, err } - resp, err := myhttp.PostMultiPart(url, myhttp.MultiPartRequestParam{ - Form: map[string]string{"info": string(infoJSON)}, - Files: iterator.Map(req.Files, func(src *IterPackageUploadFile) (*myhttp.IterMultiPartFile, error) { - return &myhttp.IterMultiPartFile{ - FieldName: "files", - FileName: src.Path, - File: src.File, - }, nil - }), - }) + codeResp, err := ParseJSONResponse[response[PackageGetByNameResp]](resp) if err != nil { return nil, err } - contType := resp.Header.Get("Content-Type") - if strings.Contains(contType, myhttp.ContentTypeJSON) { - var codeResp response[PackageUploadResp] - if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) - } + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } - if codeResp.Code == errorcode.OK { - return &codeResp.Data, nil - } + return nil, codeResp.ToError() +} + +const PackageCreatePath = "/package/create" + +type PackageCreate struct { + UserID UserID `json:"userID"` + BucketID BucketID `json:"bucketID"` + Name string `json:"name"` +} - return nil, codeResp.ToError() +type PackageCreateResp struct { + Package Package `json:"package"` +} + +func (s *PackageService) Create(req PackageCreate) (*PackageCreateResp, error) { + url, err := url.JoinPath(s.baseURL, PackageCreatePath) + if err != nil { + return nil, err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err } - return nil, fmt.Errorf("unknow response content type: %s", contType) + codeResp, err := ParseJSONResponse[response[PackageCreateResp]](resp) + if err != nil { + return nil, err + } + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() } -type PackageDeleteReq struct { - UserID UserID `json:"userID"` - PackageID PackageID `json:"packageID"` +const PackageDeletePath = "/package/delete" + +type PackageDelete struct { + UserID UserID `json:"userID" binding:"required"` + PackageID PackageID `json:"packageID" binding:"required"` } -func (c *Client) PackageDelete(req PackageDeleteReq) error { - url, err := url.JoinPath(c.baseURL, "/package/delete") +func (c *PackageService) Delete(req PackageDelete) error { + url, err := url.JoinPath(c.baseURL, PackageDeletePath) if err != nil { return err } @@ -143,79 +164,108 @@ func (c *Client) PackageDelete(req PackageDeleteReq) error { return fmt.Errorf("unknow response content type: %s", contType) } +const PackageListBucketPackagesPath = "/package/listBucketPackages" + +type PackageListBucketPackages struct { + UserID UserID `form:"userID" json:"userID" binding:"required"` + BucketID BucketID `form:"bucketID" json:"bucketID" binding:"required"` +} + +type PackageListBucketPackagesResp struct { + Packages []Package `json:"packages"` +} + +func (c *PackageService) ListBucketPackages(req PackageListBucketPackages) (*PackageListBucketPackagesResp, error) { + url, err := url.JoinPath(c.baseURL, PackageListBucketPackagesPath) + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + codeResp, err := ParseJSONResponse[response[PackageListBucketPackagesResp]](resp) + if err != nil { + return nil, err + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() +} + +const PackageGetCachedNodesPath = "/package/getCachedNodes" + type PackageGetCachedNodesReq struct { - PackageID PackageID `json:"packageID"` - UserID UserID `json:"userID"` + PackageID PackageID `form:"packageID" json:"packageID" binding:"required"` + UserID UserID `form:"userID" json:"userID" binding:"required"` } type PackageGetCachedNodesResp struct { PackageCachingInfo } -func (c *Client) PackageGetCachedNodes(req PackageGetCachedNodesReq) (*PackageGetCachedNodesResp, error) { - url, err := url.JoinPath(c.baseURL, "/package/getCachedNodes") +func (c *PackageService) GetCachedNodes(req PackageGetCachedNodesReq) (*PackageGetCachedNodesResp, error) { + url, err := url.JoinPath(c.baseURL, PackageGetCachedNodesPath) if err != nil { return nil, err } resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ - Body: req, + Query: req, }) if err != nil { return nil, err } - contType := resp.Header.Get("Content-Type") - if strings.Contains(contType, myhttp.ContentTypeJSON) { - var codeResp response[PackageGetCachedNodesResp] - if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) - } - - if codeResp.Code == errorcode.OK { - return &codeResp.Data, nil - } + codeResp, err := ParseJSONResponse[response[PackageGetCachedNodesResp]](resp) + if err != nil { + return nil, err + } - return nil, codeResp.ToError() + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil } - return nil, fmt.Errorf("unknow response content type: %s", contType) + return nil, codeResp.ToError() } +const PackageGetLoadedNodesPath = "/package/getLoadedNodes" + type PackageGetLoadedNodesReq struct { - PackageID PackageID `json:"packageID"` - UserID UserID `json:"userID"` + PackageID PackageID `form:"packageID" json:"packageID" binding:"required"` + UserID UserID `form:"userID" json:"userID" binding:"required"` } type PackageGetLoadedNodesResp struct { NodeIDs []NodeID `json:"nodeIDs"` } -func (c *Client) PackageGetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageGetLoadedNodesResp, error) { - url, err := url.JoinPath(c.baseURL, "/package/getLoadedNodes") +func (c *PackageService) GetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageGetLoadedNodesResp, error) { + url, err := url.JoinPath(c.baseURL, PackageGetLoadedNodesPath) if err != nil { return nil, err } resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ - Body: req, + Query: req, }) if err != nil { return nil, err } - contType := resp.Header.Get("Content-Type") - if strings.Contains(contType, myhttp.ContentTypeJSON) { - - var codeResp response[PackageGetLoadedNodesResp] - if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) - } - - if codeResp.Code == errorcode.OK { - return &codeResp.Data, nil - } + codeResp, err := ParseJSONResponse[response[PackageGetLoadedNodesResp]](resp) + if err != nil { + return nil, err + } - return nil, codeResp.ToError() + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil } - return nil, fmt.Errorf("unknow response content type: %s", contType) + return nil, codeResp.ToError() } diff --git a/sdks/storage/storage.go b/sdks/storage/storage.go index 827d06e..439119d 100644 --- a/sdks/storage/storage.go +++ b/sdks/storage/storage.go @@ -10,17 +10,19 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) +const StorageLoadPackagePath = "/storage/loadPackage" + type StorageLoadPackageReq struct { - UserID UserID `json:"userID"` - PackageID PackageID `json:"packageID"` - StorageID StorageID `json:"storageID"` + UserID UserID `json:"userID" binding:"required"` + PackageID PackageID `json:"packageID" binding:"required"` + StorageID StorageID `json:"storageID" binding:"required"` } type StorageLoadPackageResp struct { FullPath string `json:"fullPath"` } func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPackageResp, error) { - url, err := url.JoinPath(c.baseURL, "/storage/loadPackage") + url, err := url.JoinPath(c.baseURL, StorageLoadPackagePath) if err != nil { return nil, err } @@ -32,7 +34,7 @@ func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPack return nil, err } - codeResp, err := myhttp.ParseJSONResponse[response[StorageLoadPackageResp]](resp) + codeResp, err := ParseJSONResponse[response[StorageLoadPackageResp]](resp) if err != nil { return nil, err } @@ -44,12 +46,15 @@ func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPack return nil, codeResp.ToError() } +const StorageCreatePackagePath = "/storage/createPackage" + type StorageCreatePackageReq struct { - UserID UserID `json:"userID"` - StorageID StorageID `json:"storageID"` - Path string `json:"path"` - BucketID BucketID `json:"bucketID"` - Name string `json:"name"` + UserID UserID `json:"userID" binding:"required"` + StorageID StorageID `json:"storageID" binding:"required"` + Path string `json:"path" binding:"required"` + BucketID BucketID `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + NodeAffinity *NodeID `json:"nodeAffinity"` } type StorageCreatePackageResp struct { @@ -57,7 +62,7 @@ type StorageCreatePackageResp struct { } func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCreatePackageResp, error) { - url, err := url.JoinPath(c.baseURL, "/storage/createPackage") + url, err := url.JoinPath(c.baseURL, StorageCreatePackagePath) if err != nil { return nil, err } @@ -86,9 +91,11 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea return nil, fmt.Errorf("unknow response content type: %s", contType) } +const StorageGetInfoPath = "/storage/getInfo" + type StorageGetInfoReq struct { - UserID UserID `json:"userID"` - StorageID StorageID `json:"storageID"` + UserID UserID `form:"userID" json:"userID" binding:"required"` + StorageID StorageID `form:"storageID" json:"storageID" binding:"required"` } type StorageGetInfoResp struct { Name string `json:"name"` @@ -97,7 +104,7 @@ type StorageGetInfoResp struct { } func (c *Client) StorageGetInfo(req StorageGetInfoReq) (*StorageGetInfoResp, error) { - url, err := url.JoinPath(c.baseURL, "/storage/getInfo") + url, err := url.JoinPath(c.baseURL, StorageGetInfoPath) if err != nil { return nil, err } @@ -109,7 +116,7 @@ func (c *Client) StorageGetInfo(req StorageGetInfoReq) (*StorageGetInfoResp, err return nil, err } - codeResp, err := myhttp.ParseJSONResponse[response[StorageGetInfoResp]](resp) + codeResp, err := ParseJSONResponse[response[StorageGetInfoResp]](resp) if err != nil { return nil, err } diff --git a/sdks/storage/storage_test.go b/sdks/storage/storage_test.go index 11e7115..a5a7d6c 100644 --- a/sdks/storage/storage_test.go +++ b/sdks/storage/storage_test.go @@ -23,16 +23,24 @@ func Test_PackageGet(t *testing.T) { } pkgName := uuid.NewString() - upResp, err := cli.PackageUpload(PackageUploadReq{ - UserID: 0, + createResp, err := cli.Package().Create(PackageCreate{ + UserID: 1, BucketID: 1, Name: pkgName, + }) + So(err, ShouldBeNil) + + _, err = cli.Object().Upload(ObjectUpload{ + ObjectUploadInfo: ObjectUploadInfo{ + UserID: 1, + PackageID: createResp.Package.PackageID, + }, Files: iterator.Array( - &IterPackageUploadFile{ - Path: "test", + &UploadingObject{ + Path: "abc/test", File: io.NopCloser(bytes.NewBuffer(fileData)), }, - &IterPackageUploadFile{ + &UploadingObject{ Path: "test2", File: io.NopCloser(bytes.NewBuffer(fileData)), }, @@ -40,18 +48,18 @@ func Test_PackageGet(t *testing.T) { }) So(err, ShouldBeNil) - getResp, err := cli.PackageGet(PackageGetReq{ - UserID: 0, - PackageID: upResp.PackageID, + getResp, err := cli.Package().Get(PackageGetReq{ + UserID: 1, + PackageID: createResp.Package.PackageID, }) So(err, ShouldBeNil) - So(getResp.PackageID, ShouldEqual, upResp.PackageID) + So(getResp.PackageID, ShouldEqual, createResp.Package.PackageID) So(getResp.Package.Name, ShouldEqual, pkgName) - err = cli.PackageDelete(PackageDeleteReq{ - UserID: 0, - PackageID: upResp.PackageID, + err = cli.Package().Delete(PackageDelete{ + UserID: 1, + PackageID: createResp.Package.PackageID, }) So(err, ShouldBeNil) }) @@ -69,17 +77,27 @@ func Test_Object(t *testing.T) { } nodeAff := NodeID(2) - upResp, err := cli.PackageUpload(PackageUploadReq{ - UserID: 0, - BucketID: 1, - Name: uuid.NewString(), - NodeAffinity: &nodeAff, + + pkgName := uuid.NewString() + createResp, err := cli.Package().Create(PackageCreate{ + UserID: 1, + BucketID: 1, + Name: pkgName, + }) + So(err, ShouldBeNil) + + _, err = cli.Object().Upload(ObjectUpload{ + ObjectUploadInfo: ObjectUploadInfo{ + UserID: 1, + PackageID: createResp.Package.PackageID, + NodeAffinity: &nodeAff, + }, Files: iterator.Array( - &IterPackageUploadFile{ + &UploadingObject{ Path: "test", File: io.NopCloser(bytes.NewBuffer(fileData)), }, - &IterPackageUploadFile{ + &UploadingObject{ Path: "test2", File: io.NopCloser(bytes.NewBuffer(fileData)), }, @@ -88,7 +106,7 @@ func Test_Object(t *testing.T) { So(err, ShouldBeNil) // downFs, err := cli.ObjectDownload(ObjectDownloadReq{ - // UserID: 0, + // UserID: 1, // ObjectID: upResp.ObjectID, // }) // So(err, ShouldBeNil) @@ -98,9 +116,9 @@ func Test_Object(t *testing.T) { // So(downFileData, ShouldResemble, fileData) // downFs.Close() - err = cli.PackageDelete(PackageDeleteReq{ - UserID: 0, - PackageID: upResp.PackageID, + err = cli.Package().Delete(PackageDelete{ + UserID: 1, + PackageID: createResp.Package.PackageID, }) So(err, ShouldBeNil) }) @@ -117,16 +135,25 @@ func Test_Storage(t *testing.T) { fileData[i] = byte(i) } - upResp, err := cli.PackageUpload(PackageUploadReq{ - UserID: 0, + pkgName := uuid.NewString() + createResp, err := cli.Package().Create(PackageCreate{ + UserID: 1, BucketID: 1, - Name: uuid.NewString(), + Name: pkgName, + }) + So(err, ShouldBeNil) + + _, err = cli.Object().Upload(ObjectUpload{ + ObjectUploadInfo: ObjectUploadInfo{ + UserID: 1, + PackageID: createResp.Package.PackageID, + }, Files: iterator.Array( - &IterPackageUploadFile{ + &UploadingObject{ Path: "test", File: io.NopCloser(bytes.NewBuffer(fileData)), }, - &IterPackageUploadFile{ + &UploadingObject{ Path: "test2", File: io.NopCloser(bytes.NewBuffer(fileData)), }, @@ -135,15 +162,15 @@ func Test_Storage(t *testing.T) { So(err, ShouldBeNil) _, err = cli.StorageLoadPackage(StorageLoadPackageReq{ - UserID: 0, - PackageID: upResp.PackageID, + UserID: 1, + PackageID: createResp.Package.PackageID, StorageID: 1, }) So(err, ShouldBeNil) - err = cli.PackageDelete(PackageDeleteReq{ - UserID: 0, - PackageID: upResp.PackageID, + err = cli.Package().Delete(PackageDelete{ + UserID: 1, + PackageID: createResp.Package.PackageID, }) So(err, ShouldBeNil) }) @@ -160,16 +187,25 @@ func Test_Cache(t *testing.T) { fileData[i] = byte(i) } - upResp, err := cli.PackageUpload(PackageUploadReq{ - UserID: 0, + pkgName := uuid.NewString() + createResp, err := cli.Package().Create(PackageCreate{ + UserID: 1, BucketID: 1, - Name: uuid.NewString(), + Name: pkgName, + }) + So(err, ShouldBeNil) + + _, err = cli.Object().Upload(ObjectUpload{ + ObjectUploadInfo: ObjectUploadInfo{ + UserID: 1, + PackageID: createResp.Package.PackageID, + }, Files: iterator.Array( - &IterPackageUploadFile{ + &UploadingObject{ Path: "test.txt", File: io.NopCloser(bytes.NewBuffer(fileData)), }, - &IterPackageUploadFile{ + &UploadingObject{ Path: "test2.txt", File: io.NopCloser(bytes.NewBuffer(fileData)), }, @@ -178,15 +214,15 @@ func Test_Cache(t *testing.T) { So(err, ShouldBeNil) _, err = cli.CacheMovePackage(CacheMovePackageReq{ - UserID: 0, - PackageID: upResp.PackageID, + UserID: 1, + PackageID: createResp.Package.PackageID, NodeID: 1, }) So(err, ShouldBeNil) - err = cli.PackageDelete(PackageDeleteReq{ - UserID: 0, - PackageID: upResp.PackageID, + err = cli.Package().Delete(PackageDelete{ + UserID: 1, + PackageID: createResp.Package.PackageID, }) So(err, ShouldBeNil) }) @@ -197,16 +233,16 @@ func Test_GetNodeInfos(t *testing.T) { cli := NewClient(&Config{ URL: "http://localhost:7890", }) - resp1, err := cli.PackageGetCachedNodes(PackageGetCachedNodesReq{ + resp1, err := cli.Package().GetCachedNodes(PackageGetCachedNodesReq{ PackageID: 11, - UserID: 0, + UserID: 1, }) So(err, ShouldBeNil) fmt.Printf("resp1: %v\n", resp1) - resp2, err := cli.PackageGetLoadedNodes(PackageGetLoadedNodesReq{ + resp2, err := cli.Package().GetLoadedNodes(PackageGetLoadedNodesReq{ PackageID: 11, - UserID: 0, + UserID: 1, }) So(err, ShouldBeNil) fmt.Printf("resp2: %v\n", resp2) diff --git a/sdks/storage/utils.go b/sdks/storage/utils.go index 89cb5a2..b8f0fa0 100644 --- a/sdks/storage/utils.go +++ b/sdks/storage/utils.go @@ -1,7 +1,38 @@ package cdssdk -import "path/filepath" +import ( + "fmt" + "io" + "net/http" + "path/filepath" + "strings" + + myhttp "gitlink.org.cn/cloudream/common/utils/http" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/common/utils/serder" +) func MakeIPFSFilePath(fileHash string) string { return filepath.Join("ipfs", fileHash) } + +func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) { + var ret TBody + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var err error + if ret, err = serder.JSONToObjectStreamEx[TBody](resp.Body); err != nil { + return ret, fmt.Errorf("parsing response: %w", err) + } + + return ret, nil + } + + cont, err := io.ReadAll(resp.Body) + if err != nil { + return ret, fmt.Errorf("unknow response content type: %s, status: %d", contType, resp.StatusCode) + } + strCont := string(cont) + + return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math2.Min(len(strCont), 200)]) +} diff --git a/sdks/unifyops/unifyops.go b/sdks/unifyops/unifyops.go index 97e64a7..64bb7ad 100644 --- a/sdks/unifyops/unifyops.go +++ b/sdks/unifyops/unifyops.go @@ -223,41 +223,127 @@ func (c *Client) GetMemoryData(node GetOneResourceDataReq) (*MemoryResourceData, } func (c *Client) GetIndicatorData(node GetOneResourceDataReq) (*[]ResourceData, error) { - url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData") - if err != nil { - return nil, err + //url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData") + //if err != nil { + // return nil, err + //} + //resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + // Body: node, + //}) + //if err != nil { + // return nil, err + //} + // + //contType := resp.Header.Get("Content-Type") + //if strings.Contains(contType, myhttp.ContentTypeJSON) { + // + // var codeResp response[[]map[string]any] + // if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + // return nil, fmt.Errorf("parsing response: %w", err) + // } + // + // if codeResp.Code != CORRECT_CODE { + // return nil, codeResp.ToError() + // } + // + // var ret []ResourceData + // for _, mp := range codeResp.Data { + // var data ResourceData + // err := serder.MapToObject(mp, &data) + // if err != nil { + // return nil, err + // } + // ret = append(ret, data) + // } + // + // return &ret, nil + //} + // + //return nil, fmt.Errorf("unknow response content type: %s", contType) + + return mockData() +} + +func mockData() (*[]ResourceData, error) { + var ret []ResourceData + + cpuResourceData := CPUResourceData{ + Name: ResourceTypeCPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, } - resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ - Body: node, - }) - if err != nil { - return nil, err + ret = append(ret, &cpuResourceData) + + npuResourceData := NPUResourceData{ + Name: ResourceTypeNPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, } - - contType := resp.Header.Get("Content-Type") - if strings.Contains(contType, myhttp.ContentTypeJSON) { - - var codeResp response[[]map[string]any] - if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) - } - - if codeResp.Code != CORRECT_CODE { - return nil, codeResp.ToError() - } - - var ret []ResourceData - for _, mp := range codeResp.Data { - var data ResourceData - err := serder.MapToObject(mp, &data) - if err != nil { - return nil, err - } - ret = append(ret, data) - } - - return &ret, nil + ret = append(ret, &npuResourceData) + + gpuResourceData := GPUResourceData{ + Name: ResourceTypeGPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &gpuResourceData) + + mluResourceData := MLUResourceData{ + Name: ResourceTypeMLU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, } + ret = append(ret, &mluResourceData) + + storageResourceData := StorageResourceData{ + Name: ResourceTypeStorage, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &storageResourceData) + + memoryResourceData := MemoryResourceData{ + Name: ResourceTypeMemory, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &memoryResourceData) - return nil, fmt.Errorf("unknow response content type: %s", contType) + return &ret, nil } diff --git a/utils/config/config.go b/utils/config/config.go index 7a9890b..0f4cb04 100644 --- a/utils/config/config.go +++ b/utils/config/config.go @@ -3,10 +3,9 @@ package config import ( "encoding/json" "fmt" + "github.com/imdario/mergo" "os" "path/filepath" - - "github.com/imdario/mergo" ) // Load 加载配置文件 diff --git a/utils/http/http.go b/utils/http/http.go index 463bd37..d496daa 100644 --- a/utils/http/http.go +++ b/utils/http/http.go @@ -4,13 +4,17 @@ import ( "bytes" "fmt" "io" + "mime" "mime/multipart" "net/http" + "net/textproto" ul "net/url" + "reflect" "strings" + "github.com/mitchellh/mapstructure" "gitlink.org.cn/cloudream/common/pkgs/iterator" - "gitlink.org.cn/cloudream/common/utils/math" + "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -128,7 +132,119 @@ func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) { } strCont := string(cont) - return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math.Min(len(strCont), 200)]) + return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math2.Min(len(strCont), 200)]) +} + +type MultiPartFile struct { + FieldName string + FileName string + File io.ReadCloser + Header textproto.MIMEHeader +} + +type multiPartFileIterator struct { + mr *multipart.Reader + firstFile *multipart.Part +} + +func (m *multiPartFileIterator) MoveNext() (*MultiPartFile, error) { + if m.firstFile != nil { + f := m.firstFile + m.firstFile = nil + + fileName, err := ul.PathUnescape(f.FileName()) + if err != nil { + return nil, fmt.Errorf("unescape file name: %w", err) + } + + return &MultiPartFile{ + FieldName: f.FormName(), + FileName: fileName, + File: f, + Header: f.Header, + }, nil + } + + for { + part, err := m.mr.NextPart() + if err == io.EOF { + return nil, iterator.ErrNoMoreItem + } + if err != nil { + return nil, err + } + + fileName, err := ul.PathUnescape(part.FileName()) + if err != nil { + return nil, fmt.Errorf("unescape file name: %w", err) + } + + if part.FileName() != "" { + return &MultiPartFile{ + FieldName: part.FormName(), + FileName: fileName, + File: part, + Header: part.Header, + }, nil + } + } +} + +func (m *multiPartFileIterator) Close() {} + +// 解析multipart/form-data响应,只支持form参数在头部的情况 +func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.Iterator[*MultiPartFile], error) { + mtype, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type")) + if err != nil { + return nil, nil, fmt.Errorf("parse content type: %w", err) + } + + if mtype != ContentTypeMultiPart { + return nil, nil, fmt.Errorf("unknow content type: %s", mtype) + } + + boundary := params["boundary"] + if boundary == "" { + return nil, nil, fmt.Errorf("no boundary in content type: %s", mtype) + } + + formValues := make(map[string]string) + rd := multipart.NewReader(resp.Body, boundary) + + var firstFile *multipart.Part + for { + part, err := rd.NextPart() + if err == io.EOF { + return formValues, iterator.Empty[*MultiPartFile](), nil + } + if err != nil { + return nil, nil, err + } + + formName := part.FormName() + fileName := part.FileName() + + if formName == "" { + continue + } + + if fileName != "" { + firstFile = part + break + } + + data, err := io.ReadAll(part) + if err != nil { + return nil, nil, err + } + + formValues[formName] = string(data) + } + + return formValues, &multiPartFileIterator{ + mr: rd, + firstFile: firstFile, + }, nil } type MultiPartRequestParam struct { @@ -171,13 +287,13 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err defer muWriter.Close() if param.Form != nil { - mp, err := serder.ObjectToMap(param.Form) + mp, err := objectToStringMap(param.Form) if err != nil { return fmt.Errorf("formValues object to map failed, err: %w", err) } for k, v := range mp { - err := muWriter.WriteField(k, fmt.Sprintf("%v", v)) + err := muWriter.WriteField(k, v) if err != nil { return fmt.Errorf("write form field failed, err: %w", err) } @@ -196,7 +312,7 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err err = func() error { defer file.File.Close() - w, err := muWriter.CreateFormFile(file.FieldName, file.FileName) + w, err := muWriter.CreateFormFile(file.FieldName, ul.PathEscape(file.FileName)) if err != nil { return fmt.Errorf("create form file failed, err: %w", err) } @@ -237,17 +353,17 @@ func prepareQuery(req *http.Request, query any) error { return nil } - mp, ok := query.(map[string]any) + mp, ok := query.(map[string]string) if !ok { var err error - if mp, err = serder.ObjectToMap(query); err != nil { + if mp, err = objectToStringMap(query); err != nil { return fmt.Errorf("query object to map: %w", err) } } values := make(ul.Values) for k, v := range mp { - values.Add(k, fmt.Sprintf("%v", v)) + values.Add(k, v) } req.URL.RawQuery = values.Encode() @@ -259,17 +375,17 @@ func prepareHeader(req *http.Request, header any) error { return nil } - mp, ok := header.(map[string]any) + mp, ok := header.(map[string]string) if !ok { var err error - if mp, err = serder.ObjectToMap(header); err != nil { + if mp, err = objectToStringMap(header); err != nil { return fmt.Errorf("header object to map: %w", err) } } req.Header = make(http.Header) for k, v := range mp { - req.Header.Set(k, fmt.Sprintf("%v", v)) + req.Header.Set(k, v) } return nil } @@ -298,17 +414,17 @@ func prepareFormBody(req *http.Request, body any) error { return nil } - mp, ok := body.(map[string]any) + mp, ok := body.(map[string]string) if !ok { var err error - if mp, err = serder.ObjectToMap(body); err != nil { + if mp, err = objectToStringMap(body); err != nil { return fmt.Errorf("body object to map: %w", err) } } values := make(ul.Values) for k, v := range mp { - values.Add(k, fmt.Sprintf("%v", v)) + values.Add(k, v) } data := values.Encode() @@ -334,3 +450,40 @@ func setValue(values ul.Values, key, value string) ul.Values { values.Add(key, value) return values } + +func objectToStringMap(obj any) (map[string]string, error) { + anyMap := make(map[string]any) + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + TagName: "json", + Result: &anyMap, + WeaklyTypedInput: true, + }) + if err != nil { + return nil, err + } + + err = dec.Decode(obj) + if err != nil { + return nil, err + } + + ret := make(map[string]string) + for k, v := range anyMap { + val := reflect.ValueOf(v) + for val.Kind() == reflect.Ptr { + if val.IsNil() { + break + } else { + val = val.Elem() + } + } + + if val.Kind() == reflect.Pointer { + ret[k] = "" + } else { + ret[k] = fmt.Sprintf("%v", val) + } + } + + return ret, nil +} diff --git a/utils/http/http_test.go b/utils/http/http_test.go new file mode 100644 index 0000000..2776275 --- /dev/null +++ b/utils/http/http_test.go @@ -0,0 +1,32 @@ +package http + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_objectToStringMap(t *testing.T) { + Convey("包含指针", t, func() { + type A struct { + Val *int `json:"Val,omitempty"` + Nil *int `json:"Nil,omitempty"` + Omit *int `json:"Omit"` + } + + v := 10 + a := A{ + Val: &v, + Nil: nil, + Omit: nil, + } + + mp, err := objectToStringMap(a) + So(err, ShouldBeNil) + + So(mp, ShouldResemble, map[string]string{ + "Val": "10", + "Omit": "", + }) + }) +} diff --git a/utils/io/binary.go b/utils/io2/binary.go similarity index 99% rename from utils/io/binary.go rename to utils/io2/binary.go index a5a400d..51372cd 100644 --- a/utils/io/binary.go +++ b/utils/io2/binary.go @@ -1,4 +1,4 @@ -package io +package io2 import ( "bufio" diff --git a/utils/io/chunked_split.go b/utils/io2/chunked_split.go similarity index 99% rename from utils/io/chunked_split.go rename to utils/io2/chunked_split.go index ba5a7e1..b6606c5 100644 --- a/utils/io/chunked_split.go +++ b/utils/io2/chunked_split.go @@ -1,4 +1,4 @@ -package io +package io2 import ( "fmt" diff --git a/utils/io/chunked_split_test.go b/utils/io2/chunked_split_test.go similarity index 99% rename from utils/io/chunked_split_test.go rename to utils/io2/chunked_split_test.go index 8be2d3f..437c67b 100644 --- a/utils/io/chunked_split_test.go +++ b/utils/io2/chunked_split_test.go @@ -1,4 +1,4 @@ -package io +package io2 import ( "bytes" diff --git a/utils/io/clone.go b/utils/io2/clone.go similarity index 98% rename from utils/io/clone.go rename to utils/io2/clone.go index b1cf556..3faacaf 100644 --- a/utils/io/clone.go +++ b/utils/io2/clone.go @@ -1,4 +1,4 @@ -package io +package io2 import ( "io" diff --git a/utils/io/io.go b/utils/io2/io.go similarity index 99% rename from utils/io/io.go rename to utils/io2/io.go index 186c70b..89a95e8 100644 --- a/utils/io/io.go +++ b/utils/io2/io.go @@ -1,4 +1,4 @@ -package io +package io2 import "io" diff --git a/utils/io/io_test.go b/utils/io2/io_test.go similarity index 99% rename from utils/io/io_test.go rename to utils/io2/io_test.go index a4eab09..279dbaf 100644 --- a/utils/io/io_test.go +++ b/utils/io2/io_test.go @@ -1,4 +1,4 @@ -package io +package io2 import ( "bytes" diff --git a/utils/io/join.go b/utils/io2/join.go similarity index 86% rename from utils/io/join.go rename to utils/io2/join.go index 09c294e..c7d46ef 100644 --- a/utils/io/join.go +++ b/utils/io2/join.go @@ -1,10 +1,10 @@ -package io +package io2 import ( "io" - "gitlink.org.cn/cloudream/common/utils/lo" - "gitlink.org.cn/cloudream/common/utils/math" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/math2" ) func Join(strs []io.Reader) io.ReadCloser { @@ -69,7 +69,7 @@ func (s *chunkedJoin) Read(buf []byte) (int, error) { return 0, io.EOF } - bufLen := math.Min(math.Min(s.chunkSize, len(buf)), s.chunkSize-s.currentRead) + bufLen := math2.Min(math2.Min(s.chunkSize, len(buf)), s.chunkSize-s.currentRead) rd, err := s.inputs[s.currentInput].Read(buf[:bufLen]) if err == nil { s.currentRead += rd @@ -81,7 +81,7 @@ func (s *chunkedJoin) Read(buf []byte) (int, error) { } if err == io.EOF { - s.inputs = lo.RemoveAt(s.inputs, s.currentInput) + s.inputs = lo2.RemoveAt(s.inputs, s.currentInput) // 此处不需要+1 if len(s.inputs) > 0 { s.currentInput = s.currentInput % len(s.inputs) diff --git a/utils/io/length.go b/utils/io2/length.go similarity index 88% rename from utils/io/length.go rename to utils/io2/length.go index d86781c..9cb0696 100644 --- a/utils/io/length.go +++ b/utils/io2/length.go @@ -1,9 +1,9 @@ -package io +package io2 import ( "io" - "gitlink.org.cn/cloudream/common/utils/math" + "gitlink.org.cn/cloudream/common/utils/math2" ) type lengthStream struct { @@ -19,7 +19,7 @@ func (s *lengthStream) Read(buf []byte) (int, error) { return 0, s.err } - bufLen := math.Min(s.length-s.readLength, int64(len(buf))) + bufLen := math2.Min(s.length-s.readLength, int64(len(buf))) rd, err := s.src.Read(buf[:bufLen]) if err == nil { s.readLength += int64(rd) diff --git a/utils/io/zero.go b/utils/io2/zero.go similarity index 95% rename from utils/io/zero.go rename to utils/io2/zero.go index ddd3a3f..39bea3a 100644 --- a/utils/io/zero.go +++ b/utils/io2/zero.go @@ -1,4 +1,4 @@ -package io +package io2 import "io" diff --git a/utils/lo/lo.go b/utils/lo2/lo.go similarity index 96% rename from utils/lo/lo.go rename to utils/lo2/lo.go index 1852af3..f399ed7 100644 --- a/utils/lo/lo.go +++ b/utils/lo2/lo.go @@ -1,4 +1,4 @@ -package lo +package lo2 import "github.com/samber/lo" diff --git a/utils/lo/lo_test.go b/utils/lo2/lo_test.go similarity index 99% rename from utils/lo/lo_test.go rename to utils/lo2/lo_test.go index b6ff303..9182cb7 100644 --- a/utils/lo/lo_test.go +++ b/utils/lo2/lo_test.go @@ -1,4 +1,4 @@ -package lo +package lo2 import ( "testing" diff --git a/utils/math/math.go b/utils/math2/math.go similarity index 93% rename from utils/math/math.go rename to utils/math2/math.go index ca04819..2ee7c4f 100644 --- a/utils/math/math.go +++ b/utils/math2/math.go @@ -1,4 +1,4 @@ -package math +package math2 import "golang.org/x/exp/constraints" diff --git a/utils/reflect/reflect.go b/utils/reflect2/reflect.go similarity index 96% rename from utils/reflect/reflect.go rename to utils/reflect2/reflect.go index cf98e68..92237c2 100644 --- a/utils/reflect/reflect.go +++ b/utils/reflect2/reflect.go @@ -1,4 +1,4 @@ -package reflect +package reflect2 import "reflect" diff --git a/utils/serder/any_to_any.go b/utils/serder/any_to_any.go index 8fe3889..85ea4fe 100644 --- a/utils/serder/any_to_any.go +++ b/utils/serder/any_to_any.go @@ -4,7 +4,7 @@ import ( "reflect" mp "github.com/mitchellh/mapstructure" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) type Converter func(from reflect.Value, to reflect.Value) (interface{}, error) @@ -68,11 +68,11 @@ func AnyToAny(src any, dst any, opts ...AnyToAnyOption) error { // fromAny 如果目的字段实现的FromAny接口,那么通过此接口实现字段类型转换 func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) { - if myreflect.TypeOfValue(data) == targetType { + if reflect2.TypeOfValue(data) == targetType { return data, nil } - if targetType.Implements(myreflect.TypeOf[FromAny]()) { + if targetType.Implements(reflect2.TypeOf[FromAny]()) { // 非pointer receiver的FromAny没有意义,因为修改不了receiver的内容,所以这里只支持指针类型 if targetType.Kind() == reflect.Pointer { val := reflect.New(targetType.Elem()) @@ -88,7 +88,7 @@ func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (i return val.Interface(), nil } - } else if reflect.PointerTo(targetType).Implements(myreflect.TypeOf[FromAny]()) { + } else if reflect.PointerTo(targetType).Implements(reflect2.TypeOf[FromAny]()) { val := reflect.New(targetType) anyIf := val.Interface().(FromAny) ok, err := anyIf.FromAny(data) @@ -107,12 +107,12 @@ func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (i // 如果源字段实现了ToAny接口,那么通过此接口实现字段类型转换 func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) { - dataType := myreflect.TypeOfValue(data) + dataType := reflect2.TypeOfValue(data) if dataType == targetType { return data, nil } - if dataType.Implements(myreflect.TypeOf[ToAny]()) { + if dataType.Implements(reflect2.TypeOf[ToAny]()) { anyIf := data.(ToAny) dstVal, ok, err := anyIf.ToAny(targetType) if err != nil { @@ -123,7 +123,7 @@ func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (int } return dstVal, nil - } else if reflect.PointerTo(dataType).Implements(myreflect.TypeOf[ToAny]()) { + } else if reflect.PointerTo(dataType).Implements(reflect2.TypeOf[ToAny]()) { dataVal := reflect.ValueOf(data) dataPtrVal := reflect.New(dataType) diff --git a/utils/serder/serder.go b/utils/serder/serder.go index 911e8ae..6c7b72a 100644 --- a/utils/serder/serder.go +++ b/utils/serder/serder.go @@ -6,9 +6,9 @@ import ( "fmt" "io" "reflect" - "strings" jsoniter "github.com/json-iterator/go" + "github.com/mitchellh/mapstructure" ) var unionHandler = UnionHandler{ @@ -53,6 +53,18 @@ func JSONToObjectEx[T any](data []byte) (T, error) { return ret, nil } +// 将JSON字符串转为对象。支持TypeUnion。 +func JSONToObjectStreamEx[T any](stream io.Reader) (T, error) { + var ret T + dec := defaultAPI.NewDecoder(stream) + err := dec.Decode(&ret) + if err != nil { + return ret, err + } + + return ret, nil +} + // 将对象转为JSON字符串。如果需要支持解析TypeUnion类型,则使用"Ex"结尾的同名函数。 func ObjectToJSON(obj any) ([]byte, error) { return json.Marshal(obj) @@ -151,78 +163,13 @@ func MapToObject(m map[string]any, obj any, opt ...MapToObjectOption) error { } func ObjectToMap(obj any) (map[string]any, error) { - ctx := WalkValue(obj, func(ctx *WalkContext, event WalkEvent) WalkingOp { - switch e := event.(type) { - case StructBeginEvent: - mp := make(map[string]any) - ctx.StackPush(mp) - - case StructArriveFieldEvent: - if !WillWalkInto(e.Value) { - ctx.StackPush(e.Value.Interface()) - } - case StructLeaveFieldEvent: - val := ctx.StackPop() - mp := ctx.StackPeek().(map[string]any) - jsonTag := e.Info.Tag.Get("json") - if jsonTag == "-" { - break - } - - opts := strings.Split(jsonTag, ",") - keyName := opts[0] - if keyName == "" { - keyName = e.Info.Name - } - - if contains(opts, "string", 1) { - val = fmt.Sprintf("%v", val) - } - - mp[keyName] = val - - case StructEndEvent: - - case MapBeginEvent: - ctx.StackPush(make(map[string]any)) - case MapArriveEntryEvent: - if !WillWalkInto(e.Value) { - ctx.StackPush(e.Value.Interface()) - } - case MapLeaveEntryEvent: - val := ctx.StackPop() - mp := ctx.StackPeek().(map[string]any) - mp[fmt.Sprintf("%v", e.Key)] = val - case MapEndEvent: - - case ArrayBeginEvent: - ctx.StackPush(make([]any, e.Value.Len())) - case ArrayArriveElementEvent: - if !WillWalkInto(e.Value) { - ctx.StackPush(e.Value.Interface()) - } - case ArrayLeaveElementEvent: - val := ctx.StackPop() - arr := ctx.StackPeek().([]any) - arr[e.Index] = val - case ArrayEndEvent: - } - - return Next - - }, WalkOption{ - StackValues: []any{make(map[string]any)}, + mp := make(map[string]any) + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + TagName: "json", + Result: &mp, }) - - return ctx.StackPop().(map[string]any), nil -} - -func contains(arr []string, ele string, startIndex int) bool { - for i := startIndex; i < len(arr); i++ { - if arr[i] == ele { - return true - } + if err != nil { + return nil, err } - - return false + return mp, dec.Decode(obj) } diff --git a/utils/serder/serder_test.go b/utils/serder/serder_test.go index e2c4b8c..4914531 100644 --- a/utils/serder/serder_test.go +++ b/utils/serder/serder_test.go @@ -7,7 +7,7 @@ import ( . "github.com/smartystreets/goconvey/convey" "gitlink.org.cn/cloudream/common/pkgs/types" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) type FromAnyString struct { @@ -28,7 +28,7 @@ type ToAnyString struct { } func (a *ToAnyString) ToAny(typ reflect.Type) (val any, ok bool, err error) { - if typ == myreflect.TypeOf[map[string]any]() { + if typ == reflect2.TypeOf[map[string]any]() { return map[string]any{ "str": "@" + a.Str, }, true, nil @@ -55,7 +55,7 @@ type ToAnySt struct { } func (a *ToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) { - if typ == myreflect.TypeOf[FromAnySt]() { + if typ == reflect2.TypeOf[FromAnySt]() { return FromAnySt{ Value: "To:" + a.Value, }, true, nil @@ -69,7 +69,7 @@ type DirToAnySt struct { } func (a DirToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) { - if typ == myreflect.TypeOf[FromAnySt]() { + if typ == reflect2.TypeOf[FromAnySt]() { return FromAnySt{ Value: "DirTo:" + a.Value, }, true, nil @@ -181,7 +181,7 @@ func Test_AnyToAny(t *testing.T) { err := AnyToAny(st1, &st2, AnyToAnyOption{ Converters: []Converter{func(from reflect.Value, to reflect.Value) (interface{}, error) { - if from.Type() == myreflect.TypeOf[Struct1]() && to.Type() == myreflect.TypeOf[Struct2]() { + if from.Type() == reflect2.TypeOf[Struct1]() && to.Type() == reflect2.TypeOf[Struct2]() { s1 := from.Interface().(Struct1) return Struct2{ Value: "@" + s1.Value, diff --git a/utils/serder/union_handler.go b/utils/serder/union_handler.go index bef1dd0..ce4ac3e 100644 --- a/utils/serder/union_handler.go +++ b/utils/serder/union_handler.go @@ -9,7 +9,7 @@ import ( "github.com/modern-go/reflect2" "gitlink.org.cn/cloudream/common/pkgs/types" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + ref2 "gitlink.org.cn/cloudream/common/utils/reflect2" ) type anyTypeUnionExternallyTagged struct { @@ -106,14 +106,14 @@ func (u *TypeUnionInternallyTagged[T]) Add(typ reflect.Type) error { } // 要求内嵌Metadata结构体,那么结构体中的字段名就会是Metadata, - field, ok := structType.FieldByName(myreflect.TypeNameOf[Metadata]()) + field, ok := structType.FieldByName(ref2.TypeNameOf[Metadata]()) if !ok { u.TagToType[makeDerefFullTypeName(structType)] = typ return nil } // 为防同名,检查类型是不是也是Metadata - if field.Type != myreflect.TypeOf[Metadata]() { + if field.Type != ref2.TypeOf[Metadata]() { u.TagToType[makeDerefFullTypeName(structType)] = typ return nil } @@ -293,7 +293,7 @@ func (e *ExternallyTaggedEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.St } stream.WriteObjectStart() - valType := myreflect.TypeOfValue(val) + valType := ref2.TypeOfValue(val) if !e.union.Union.Include(valType) { stream.Error = fmt.Errorf("type %v is not in union %v", valType, e.union.Union.UnionType) return diff --git a/utils/serder/walk_test.go b/utils/serder/walk_test.go index 91f0671..de926ed 100644 --- a/utils/serder/walk_test.go +++ b/utils/serder/walk_test.go @@ -6,7 +6,7 @@ import ( "testing" . "github.com/smartystreets/goconvey/convey" - myreflect "gitlink.org.cn/cloudream/common/utils/reflect" + "gitlink.org.cn/cloudream/common/utils/reflect2" ) func Test_WalkValue(t *testing.T) { @@ -38,8 +38,8 @@ func Test_WalkValue(t *testing.T) { isBaseDataType := func(val reflect.Value) bool { typ := val.Type() - return typ == myreflect.TypeOf[int]() || typ == myreflect.TypeOf[bool]() || - typ == myreflect.TypeOf[string]() || typ == myreflect.TypeOf[float32]() || val.IsZero() + return typ == reflect2.TypeOf[int]() || typ == reflect2.TypeOf[bool]() || + typ == reflect2.TypeOf[string]() || typ == reflect2.TypeOf[float32]() || val.IsZero() } toString := func(val any) string { diff --git a/utils/sort/sort.go b/utils/sort2/sort.go similarity index 78% rename from utils/sort/sort.go rename to utils/sort2/sort.go index f469e0b..9967fa8 100644 --- a/utils/sort/sort.go +++ b/utils/sort2/sort.go @@ -1,4 +1,4 @@ -package sort +package sort2 import ( "sort" @@ -36,6 +36,14 @@ func Sort[T any](arr []T, cmp Comparer[T]) []T { return arr } +func SortAsc[T constraints.Ordered](arr []T) []T { + return Sort(arr, Cmp[T]) +} + +func SortDesc[T constraints.Ordered](arr []T) []T { + return Sort(arr, func(left, right T) int { return Cmp(right, left) }) +} + // false < true func CmpBool(left, right bool) int { leftVal := 0 diff --git a/utils/sync2/channel.go b/utils/sync2/channel.go new file mode 100644 index 0000000..50b3aa8 --- /dev/null +++ b/utils/sync2/channel.go @@ -0,0 +1,80 @@ +package sync2 + +import ( + "context" + "errors" + "sync" +) + +var ErrChannelClosed = errors.New("channel is closed") + +type Channel[T any] struct { + ch chan T + closed chan any + closeOnce sync.Once + err error +} + +func NewChannel[T any]() *Channel[T] { + return &Channel[T]{ + ch: make(chan T), + closed: make(chan any), + } +} + +func (c *Channel[T]) Error() error { + return c.err +} + +func (c *Channel[T]) Send(val T) error { + select { + case c.ch <- val: + return nil + case <-c.closed: + return c.err + } +} + +func (c *Channel[T]) Receive(ctx context.Context) (T, error) { + select { + case val := <-c.ch: + return val, nil + case <-c.closed: + var t T + return t, c.err + case <-ctx.Done(): + var t T + return t, ctx.Err() + } +} + +// 获取channel的发送端,需要与Closed一起使用,防止错过关闭信号 +func (c *Channel[T]) Sender() chan<- T { + return c.ch +} + +// 获取channel的接收端,需要与Closed一起使用,防止错过关闭信号 +func (c *Channel[T]) Receiver() <-chan T { + return c.ch +} + +// 获取channel的关闭信号,用于通知接收端和发送端关闭 +func (c *Channel[T]) Closed() <-chan any { + return c.closed +} + +// 关闭channel。注:此操作不会关闭Sender和Receiver返回的channel +func (c *Channel[T]) Close() { + c.closeOnce.Do(func() { + close(c.closed) + c.err = ErrChannelClosed + }) +} + +// 关闭channel并设置error。注:此操作不会关闭Sender和Receiver返回的channel +func (c *Channel[T]) CloseWithError(err error) { + c.closeOnce.Do(func() { + close(c.closed) + c.err = err + }) +} diff --git a/utils/sync/counter_cond.go b/utils/sync2/counter_cond.go similarity index 97% rename from utils/sync/counter_cond.go rename to utils/sync2/counter_cond.go index e0995d2..8da29e2 100644 --- a/utils/sync/counter_cond.go +++ b/utils/sync2/counter_cond.go @@ -1,4 +1,4 @@ -package sync +package sync2 import "sync" diff --git a/utils/sync/safe_channel.go b/utils/sync2/safe_channel.go similarity index 98% rename from utils/sync/safe_channel.go rename to utils/sync2/safe_channel.go index 04a791e..e1edbff 100644 --- a/utils/sync/safe_channel.go +++ b/utils/sync2/safe_channel.go @@ -1,4 +1,4 @@ -package sync +package sync2 import "context" diff --git a/utils/sync2/select_set.go b/utils/sync2/select_set.go new file mode 100644 index 0000000..592c28b --- /dev/null +++ b/utils/sync2/select_set.go @@ -0,0 +1,48 @@ +package sync2 + +import ( + "reflect" + + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type SelectCase int + +type SelectSet[T any, C any] struct { + cases []reflect.SelectCase + tags []T +} + +func (s *SelectSet[T, C]) Add(tag T, ch <-chan C) SelectCase { + s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}) + s.tags = append(s.tags, tag) + + return SelectCase(len(s.cases) - 1) +} + +func (s *SelectSet[T, C]) AddDefault(tag T, ch <-chan C) SelectCase { + s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault, Chan: reflect.ValueOf(ch)}) + s.tags = append(s.tags, tag) + + return SelectCase(len(s.cases) - 1) +} + +func (s *SelectSet[T, C]) Remove(caze SelectCase) { + s.cases = lo2.RemoveAt(s.cases, int(caze)) + s.tags = lo2.RemoveAt(s.tags, int(caze)) +} + +func (s *SelectSet[T, C]) Select() (T, C, bool) { + chosen, recv, ok := reflect.Select(s.cases) + if !ok { + var t T + var c C + return t, c, false + } + + return s.tags[chosen], recv.Interface().(C), true +} + +func (s *SelectSet[T, C]) Count() int { + return len(s.cases) +} diff --git a/utils/sync2/sync2.go b/utils/sync2/sync2.go new file mode 100644 index 0000000..c6ab32f --- /dev/null +++ b/utils/sync2/sync2.go @@ -0,0 +1,20 @@ +package sync2 + +import "sync" + +func ParallelDo[T any](args []T, fn func(val T, index int) error) error { + var err error + var wg sync.WaitGroup + wg.Add(len(args)) + for i, arg := range args { + go func(arg T, index int) { + defer wg.Done() + + if e := fn(arg, index); e != nil { + err = e + } + }(arg, i) + } + wg.Wait() + return err +} diff --git a/utils/time2/measurement.go b/utils/time2/measurement.go new file mode 100644 index 0000000..f5f77c9 --- /dev/null +++ b/utils/time2/measurement.go @@ -0,0 +1,113 @@ +package time2 + +import ( + "fmt" + "path" + "runtime" + "strings" + "time" +) + +type Measurement struct { + startTime time.Time + lastPointTime time.Time + printer func(string) + on bool + title string +} + +func NewMeasurement(printer func(string)) Measurement { + return Measurement{ + printer: printer, + } +} + +func (m *Measurement) Begin(on bool, title ...string) { + if m == nil { + return + } + + m.on = on + m.title = strings.Join(title, ".") + + if on { + m.startTime = time.Now() + m.lastPointTime = m.startTime + + _, file, line, ok := runtime.Caller(1) + + titlePart := "" + if m.title != "" { + titlePart = fmt.Sprintf(":%s", m.title) + } + + if ok { + m.printer(fmt.Sprintf("[BEGIN%v]<%v:%v>", titlePart, path.Base(file), line)) + } else { + m.printer(fmt.Sprintf("[BEGIN%v]", titlePart)) + } + } +} + +func (m *Measurement) Point(desc ...string) { + if m == nil { + return + } + + if m.on { + m.printer(m.makePointString(strings.Join(desc, "."))) + } +} + +func (m *Measurement) makePointString(desc string) string { + last := m.lastPointTime + now := time.Now() + m.lastPointTime = now + + _, file, line, ok := runtime.Caller(2) + + titlePart := "" + if m.title != "" { + titlePart = fmt.Sprintf("(%s)", m.title) + } + + if desc != "" { + desc = fmt.Sprintf("@%s", desc) + } + + if ok { + return fmt.Sprintf("%v {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line) + } + + return fmt.Sprintf("{%v/%v}%v", now.Sub(last), now.Sub(m.startTime), desc) +} + +func (m *Measurement) End(descs ...string) { + if m == nil { + return + } + + if m.on { + last := m.lastPointTime + now := time.Now() + m.lastPointTime = now + + _, file, line, ok := runtime.Caller(1) + + titlePart := "" + if m.title != "" { + titlePart = fmt.Sprintf(":%s", m.title) + } + + desc := strings.Join(descs, ".") + if desc != "" { + desc = fmt.Sprintf("@%s", desc) + } + + if ok { + m.printer(fmt.Sprintf("[END%v] {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line)) + } else { + m.printer(fmt.Sprintf("[END%v] {%v/%v} %v", titlePart, now.Sub(last), now.Sub(m.startTime), desc)) + } + } +}