Browse Source

Merge branch 'master' into feature_sjc_debug

feature_sjc_debug
Sydonian 1 year ago
parent
commit
3a29857996
61 changed files with 1477 additions and 404 deletions
  1. +1
    -0
      consts/errorcode/error_code.go
  2. +1
    -1
      go.mod
  3. +3
    -3
      pkgs/actor/actor.go
  4. +7
    -5
      pkgs/cmdtrie/command_trie.go
  5. +3
    -3
      pkgs/distlock/internal/acquire_actor.go
  6. +2
    -2
      pkgs/distlock/internal/service_info_actor.go
  7. +3
    -3
      pkgs/event/executor.go
  8. +4
    -4
      pkgs/ipfs/ipfs.go
  9. +15
    -0
      pkgs/iterator/empty.go
  10. +18
    -0
      pkgs/iterator/fuse.go
  11. +2
    -2
      pkgs/logger/global_logger.go
  12. +9
    -9
      pkgs/mq/client.go
  13. +2
    -2
      pkgs/mq/message.go
  14. +7
    -7
      pkgs/mq/message_dispatcher.go
  15. +4
    -4
      pkgs/mq/message_test.go
  16. +3
    -3
      pkgs/task/manager.go
  17. +2
    -2
      pkgs/task/task.go
  18. +6
    -6
      pkgs/typedispatcher/type_dispatcher.go
  19. +6
    -6
      pkgs/types/union.go
  20. +1
    -1
      sdks/imfs/imfs_test.go
  21. +1
    -1
      sdks/pcm/models.go
  22. +34
    -8
      sdks/scheduler/models.go
  23. +1
    -1
      sdks/scheduler/scheduler_test.go
  24. +156
    -0
      sdks/storage/bucket.go
  25. +2
    -2
      sdks/storage/cache.go
  26. +16
    -1
      sdks/storage/models.go
  27. +1
    -1
      sdks/storage/node.go
  28. +244
    -14
      sdks/storage/object.go
  29. +137
    -87
      sdks/storage/package.go
  30. +22
    -15
      sdks/storage/storage.go
  31. +83
    -47
      sdks/storage/storage_test.go
  32. +32
    -1
      sdks/storage/utils.go
  33. +119
    -33
      sdks/unifyops/unifyops.go
  34. +1
    -2
      utils/config/config.go
  35. +167
    -14
      utils/http/http.go
  36. +32
    -0
      utils/http/http_test.go
  37. +1
    -1
      utils/io2/binary.go
  38. +1
    -1
      utils/io2/chunked_split.go
  39. +1
    -1
      utils/io2/chunked_split_test.go
  40. +1
    -1
      utils/io2/clone.go
  41. +1
    -1
      utils/io2/io.go
  42. +1
    -1
      utils/io2/io_test.go
  43. +5
    -5
      utils/io2/join.go
  44. +3
    -3
      utils/io2/length.go
  45. +1
    -1
      utils/io2/zero.go
  46. +1
    -1
      utils/lo2/lo.go
  47. +1
    -1
      utils/lo2/lo_test.go
  48. +1
    -1
      utils/math2/math.go
  49. +1
    -1
      utils/reflect2/reflect.go
  50. +7
    -7
      utils/serder/any_to_any.go
  51. +20
    -73
      utils/serder/serder.go
  52. +5
    -5
      utils/serder/serder_test.go
  53. +4
    -4
      utils/serder/union_handler.go
  54. +3
    -3
      utils/serder/walk_test.go
  55. +9
    -1
      utils/sort2/sort.go
  56. +80
    -0
      utils/sync2/channel.go
  57. +1
    -1
      utils/sync2/counter_cond.go
  58. +1
    -1
      utils/sync2/safe_channel.go
  59. +48
    -0
      utils/sync2/select_set.go
  60. +20
    -0
      utils/sync2/sync2.go
  61. +113
    -0
      utils/time2/measurement.go

+ 1
- 0
consts/errorcode/error_code.go View File

@@ -3,6 +3,7 @@ package errorcode
const ( const (
OK = "OK" OK = "OK"
OperationFailed = "OperationFailed" OperationFailed = "OperationFailed"
DataNotFound = "DataNotFound"
BadArgument = "BadArgument" BadArgument = "BadArgument"
TaskNotFound = "TaskNotFound" TaskNotFound = "TaskNotFound"
) )

+ 1
- 1
go.mod View File

@@ -11,6 +11,7 @@ require (
github.com/json-iterator/go v1.1.12 github.com/json-iterator/go v1.1.12
github.com/magefile/mage v1.15.0 github.com/magefile/mage v1.15.0
github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/mapstructure v1.5.0
github.com/modern-go/reflect2 v1.0.2
github.com/otiai10/copy v1.12.0 github.com/otiai10/copy v1.12.0
github.com/samber/lo v1.36.0 github.com/samber/lo v1.36.0
github.com/sirupsen/logrus v1.9.2 github.com/sirupsen/logrus v1.9.2
@@ -41,7 +42,6 @@ require (
github.com/minio/sha256-simd v1.0.0 // indirect github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect


+ 3
- 3
pkgs/actor/actor.go View File

@@ -8,7 +8,7 @@ import (
"github.com/zyedidia/generic/list" "github.com/zyedidia/generic/list"


"gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/future"
mysync "gitlink.org.cn/cloudream/common/utils/sync"
"gitlink.org.cn/cloudream/common/utils/sync2"
) )


type CommandFn func() type CommandFn func()
@@ -16,7 +16,7 @@ type CommandFn func()
type CommandChannel struct { type CommandChannel struct {
cmds *list.List[CommandFn] cmds *list.List[CommandFn]
cmdsLock sync.Mutex cmdsLock sync.Mutex
cmdsCounter *mysync.CounterCond
cmdsCounter *sync2.CounterCond


chanReceive chan CommandFn chanReceive chan CommandFn
chanReceiveDone atomic.Bool chanReceiveDone atomic.Bool
@@ -28,7 +28,7 @@ type CommandChannel struct {
func NewCommandChannel() *CommandChannel { func NewCommandChannel() *CommandChannel {
return &CommandChannel{ return &CommandChannel{
cmds: list.New[CommandFn](), cmds: list.New[CommandFn](),
cmdsCounter: mysync.NewCounterCond(0),
cmdsCounter: sync2.NewCounterCond(0),
cmdChan: make(chan CommandFn), cmdChan: make(chan CommandFn),
} }
} }


+ 7
- 5
pkgs/cmdtrie/command_trie.go View File

@@ -7,9 +7,11 @@ import (


"github.com/samber/lo" "github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/trie" "gitlink.org.cn/cloudream/common/pkgs/trie"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


var ErrCommandNotFound = fmt.Errorf("command not found")

type ExecuteOption struct { type ExecuteOption struct {
ReplaceEmptyArrayWithNil bool // 如果最后一个参数是空数组,则调用命令的时候传递nil参数 ReplaceEmptyArrayWithNil bool // 如果最后一个参数是空数组,则调用命令的时候传递nil参数
} }
@@ -178,7 +180,7 @@ func (t *anyCommandTrie) findCommand(cmdWords []string, argWords []string) (*com
}) })


if cmd == nil { if cmd == nil {
return nil, nil, fmt.Errorf("command not found")
return nil, nil, ErrCommandNotFound
} }
return cmd, argWords, nil return cmd, argWords, nil
} }
@@ -296,7 +298,7 @@ type CommandTrie[TCtx any, TRet any] struct {


func NewCommandTrie[TCtx any, TRet any]() CommandTrie[TCtx, TRet] { func NewCommandTrie[TCtx any, TRet any]() CommandTrie[TCtx, TRet] {
return CommandTrie[TCtx, TRet]{ return CommandTrie[TCtx, TRet]{
anyTrie: newAnyCommandTrie(myreflect.TypeOf[TCtx](), myreflect.TypeOf[TRet]()),
anyTrie: newAnyCommandTrie(reflect2.TypeOf[TCtx](), reflect2.TypeOf[TRet]()),
} }
} }


@@ -337,7 +339,7 @@ type VoidCommandTrie[TCtx any] struct {


func NewVoidCommandTrie[TCtx any]() VoidCommandTrie[TCtx] { func NewVoidCommandTrie[TCtx any]() VoidCommandTrie[TCtx] {
return VoidCommandTrie[TCtx]{ return VoidCommandTrie[TCtx]{
anyTrie: newAnyCommandTrie(myreflect.TypeOf[TCtx](), nil),
anyTrie: newAnyCommandTrie(reflect2.TypeOf[TCtx](), nil),
} }
} }


@@ -368,7 +370,7 @@ type StaticCommandTrie[TRet any] struct {


func NewStaticCommandTrie[TRet any]() StaticCommandTrie[TRet] { func NewStaticCommandTrie[TRet any]() StaticCommandTrie[TRet] {
return StaticCommandTrie[TRet]{ return StaticCommandTrie[TRet]{
anyTrie: newAnyCommandTrie(nil, myreflect.TypeOf[TRet]()),
anyTrie: newAnyCommandTrie(nil, reflect2.TypeOf[TRet]()),
} }
} }




+ 3
- 3
pkgs/distlock/internal/acquire_actor.go View File

@@ -10,7 +10,7 @@ import (


"gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/client/v3/concurrency"
@@ -84,7 +84,7 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er
return return
} }


a.acquirings = mylo.Remove(a.acquirings, info)
a.acquirings = lo2.Remove(a.acquirings, info)
if info.LastErr != nil { if info.LastErr != nil {
info.Callback.SetError(info.LastErr) info.Callback.SetError(info.LastErr)
} else { } else {
@@ -213,7 +213,7 @@ func (a *AcquireActor) doAcquiring() error {
} }


req.Callback.SetValue(reqData.ID) req.Callback.SetValue(reqData.ID)
a.acquirings = mylo.RemoveAt(a.acquirings, i)
a.acquirings = lo2.RemoveAt(a.acquirings, i)
break break
} }




+ 2
- 2
pkgs/distlock/internal/service_info_actor.go View File

@@ -8,7 +8,7 @@ import (


"github.com/google/uuid" "github.com/google/uuid"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )
@@ -191,6 +191,6 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) {
if evt.IsLocking { if evt.IsLocking {
status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID) status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID)
} else { } else {
status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID)
status.LockRequestIDs = lo2.Remove(status.LockRequestIDs, evt.Data.ID)
} }
} }

+ 3
- 3
pkgs/event/executor.go View File

@@ -4,7 +4,7 @@ import (
"sync" "sync"


"github.com/zyedidia/generic/list" "github.com/zyedidia/generic/list"
mysync "gitlink.org.cn/cloudream/common/utils/sync"
"gitlink.org.cn/cloudream/common/utils/sync2"
) )


type ExecuteOption struct { type ExecuteOption struct {
@@ -26,7 +26,7 @@ type postedEvent[TArgs any] struct {
type Executor[TArgs any] struct { type Executor[TArgs any] struct {
events *list.List[postedEvent[TArgs]] events *list.List[postedEvent[TArgs]]
locker sync.Mutex locker sync.Mutex
eventCond *mysync.CounterCond
eventCond *sync2.CounterCond
execArgs TArgs execArgs TArgs
} }


@@ -34,7 +34,7 @@ func NewExecutor[TArgs any](args TArgs) Executor[TArgs] {
return Executor[TArgs]{ return Executor[TArgs]{
events: list.New[postedEvent[TArgs]](), events: list.New[postedEvent[TArgs]](),
locker: sync.Mutex{}, locker: sync.Mutex{},
eventCond: mysync.NewCounterCond(0),
eventCond: sync2.NewCounterCond(0),
execArgs: args, execArgs: args,
} }




+ 4
- 4
pkgs/ipfs/ipfs.go View File

@@ -6,12 +6,12 @@ import (
"io" "io"


shell "github.com/ipfs/go-ipfs-api" shell "github.com/ipfs/go-ipfs-api"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/io2"
) )


type ReadOption struct { type ReadOption struct {
Offset int64 // 从指定位置开始读取,为-1时代表不设置,从头开始读
Length int64 // 读取长度,为-1时代表不设置,读取Offset之后的所有内容
Offset int64 `json:"offset,string"` // 从指定位置开始读取,为-1时代表不设置,从头开始读
Length int64 `json:"length,string"` // 读取长度,为-1时代表不设置,读取Offset之后的所有内容
} }


type Client struct { type Client struct {
@@ -35,7 +35,7 @@ func (fs *Client) IsUp() bool {
return fs.shell.IsUp() return fs.shell.IsUp()
} }


func (fs *Client) CreateFileStream() (myio.PromiseWriteCloser[string], error) {
func (fs *Client) CreateFileStream() (io2.PromiseWriteCloser[string], error) {
pr, pw := io.Pipe() pr, pw := io.Pipe()


ipfsWriter := ipfsWriter{ ipfsWriter := ipfsWriter{


+ 15
- 0
pkgs/iterator/empty.go View File

@@ -0,0 +1,15 @@
package iterator

type emptyIterator[T any] struct{}

func (i *emptyIterator[T]) MoveNext() (T, error) {
var ret T
return ret, ErrNoMoreItem
}
func (i *emptyIterator[T]) Close() {

}

func Empty[T any]() Iterator[T] {
return &emptyIterator[T]{}
}

+ 18
- 0
pkgs/iterator/fuse.go View File

@@ -0,0 +1,18 @@
package iterator

type fuseError[T any] struct {
err error
}

func (i *fuseError[T]) MoveNext() (T, error) {
var ret T
return ret, i.err
}
func (i *fuseError[T]) Close() {

}
func FuseError[T any](err error) Iterator[T] {
return &fuseError[T]{
err: err,
}
}

+ 2
- 2
pkgs/logger/global_logger.go View File

@@ -8,7 +8,7 @@ import (


nested "github.com/antonfisher/nested-logrus-formatter" nested "github.com/antonfisher/nested-logrus-formatter"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


// 输出日志到标准输出。适用于没有设计好日志输出方案时临时使用。 // 输出日志到标准输出。适用于没有设计好日志输出方案时临时使用。
@@ -123,6 +123,6 @@ func WithField(key string, val any) Logger {


func WithType[T any](key string) Logger { func WithType[T any](key string) Logger {
return &logrusLogger{ return &logrusLogger{
entry: logrus.WithField(key, myreflect.TypeOf[T]().Name()),
entry: logrus.WithField(key, reflect2.TypeOf[T]().Name()),
} }
} }

+ 9
- 9
pkgs/mq/client.go View File

@@ -10,7 +10,7 @@ import (
"github.com/streadway/amqp" "github.com/streadway/amqp"
"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


const ( const (
@@ -22,12 +22,12 @@ const (
var ErrWaitResponseTimeout = fmt.Errorf("wait response timeout") var ErrWaitResponseTimeout = fmt.Errorf("wait response timeout")


type CodeMessageError struct { type CodeMessageError struct {
code string
message string
Code string
Message string
} }


func (e *CodeMessageError) Error() string { func (e *CodeMessageError) Error() string {
return fmt.Sprintf("code: %s, message: %s", e.code, e.message)
return fmt.Sprintf("code: %s, message: %s", e.Code, e.Message)
} }


type SendOption struct { type SendOption struct {
@@ -300,7 +300,7 @@ func (c *RabbitMQTransport) Close() error {


// 发送消息并等待回应。因为无法自动推断出TResp的类型,所以将其放在第一个手工填写,之后的TBody可以自动推断出来 // 发送消息并等待回应。因为无法自动推断出TResp的类型,所以将其放在第一个手工填写,之后的TBody可以自动推断出来
func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli RoundTripper, req TReq, opts ...RequestOption) (TResp, error) { func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli RoundTripper, req TReq, opts ...RequestOption) (TResp, error) {
opt := RequestOption{Timeout: time.Second * 15}
opt := RequestOption{Timeout: time.Second * 15, KeepAlive: true}
if len(opts) > 0 { if len(opts) > 0 {
opt = opts[0] opt = opts[0]
} }
@@ -315,16 +315,16 @@ func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg
errCode, errMsg := resp.GetCodeMessage() errCode, errMsg := resp.GetCodeMessage()
if errCode != errorcode.OK { if errCode != errorcode.OK {
return defRet, &CodeMessageError{ return defRet, &CodeMessageError{
code: errCode,
message: errMsg,
Code: errCode,
Message: errMsg,
} }
} }


respBody, ok := resp.Body.(TResp) respBody, ok := resp.Body.(TResp)
if !ok { if !ok {
return defRet, fmt.Errorf("expect a %s body, but got %s", return defRet, fmt.Errorf("expect a %s body, but got %s",
myreflect.ElemTypeOf[TResp]().Name(),
myreflect.TypeOfValue(resp.Body).Name())
reflect2.ElemTypeOf[TResp]().Name(),
reflect2.TypeOfValue(resp.Body).Name())
} }


return respBody, nil return respBody, nil


+ 2
- 2
pkgs/mq/message.go View File

@@ -2,7 +2,7 @@ package mq


import ( import (
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -83,7 +83,7 @@ var msgBodyTypeUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTy


// 所有新定义的Message都需要在init中调用此函数 // 所有新定义的Message都需要在init中调用此函数
func RegisterMessage[T MessageBody]() { func RegisterMessage[T MessageBody]() {
err := msgBodyTypeUnion.Add(myreflect.TypeOf[T]())
err := msgBodyTypeUnion.Add(reflect2.TypeOf[T]())
if err != nil { if err != nil {
panic(err) panic(err)
} }


+ 7
- 7
pkgs/mq/message_dispatcher.go View File

@@ -3,27 +3,27 @@ package mq
import ( import (
"fmt" "fmt"


myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type HandlerFn func(svcBase any, msg *Message) (*Message, error) type HandlerFn func(svcBase any, msg *Message) (*Message, error)


type MessageDispatcher struct { type MessageDispatcher struct {
Handlers map[myreflect.Type]HandlerFn
Handlers map[reflect2.Type]HandlerFn
} }


func NewMessageDispatcher() MessageDispatcher { func NewMessageDispatcher() MessageDispatcher {
return MessageDispatcher{ return MessageDispatcher{
Handlers: make(map[myreflect.Type]HandlerFn),
Handlers: make(map[reflect2.Type]HandlerFn),
} }
} }


func (h *MessageDispatcher) Add(typ myreflect.Type, handler HandlerFn) {
func (h *MessageDispatcher) Add(typ reflect2.Type, handler HandlerFn) {
h.Handlers[typ] = handler h.Handlers[typ] = handler
} }


func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) { func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) {
typ := myreflect.TypeOfValue(msg.Body)
typ := reflect2.TypeOfValue(msg.Body)
fn, ok := h.Handlers[typ] fn, ok := h.Handlers[typ]
if !ok { if !ok {
return nil, fmt.Errorf("unsupported message type: %s", typ.String()) return nil, fmt.Errorf("unsupported message type: %s", typ.String())
@@ -34,7 +34,7 @@ func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error)


// 将Service中的一个接口函数作为指定类型消息的处理函数 // 将Service中的一个接口函数作为指定类型消息的处理函数
func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq) (TResp, *CodeMessage)) { func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq) (TResp, *CodeMessage)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {
dispatcher.Add(reflect2.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {


reqMsgBody := reqMsg.Body.(TReq) reqMsgBody := reqMsg.Body.(TReq)
ret, codeMsg := svcFn(svcBase.(TSvc), reqMsgBody) ret, codeMsg := svcFn(svcBase.(TSvc), reqMsgBody)
@@ -47,7 +47,7 @@ func AddServiceFn[TSvc any, TReq MessageBody, TResp MessageBody](dispatcher *Mes


// 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数 // 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数
func AddNoRespServiceFn[TSvc any, TReq MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq)) { func AddNoRespServiceFn[TSvc any, TReq MessageBody](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg TReq)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {
dispatcher.Add(reflect2.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {


reqMsgBody := reqMsg.Body.(TReq) reqMsgBody := reqMsg.Body.(TReq)
svcFn(svcBase.(TSvc), reqMsgBody) svcFn(svcBase.(TSvc), reqMsgBody)


+ 4
- 4
pkgs/mq/message_test.go View File

@@ -9,7 +9,7 @@ import (
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -42,13 +42,13 @@ func TestMessage(t *testing.T) {
Nil: nil, Nil: nil,
} }


jsoniter.RegisterTypeEncoderFunc(myreflect.TypeOf[MyAny]().String(),
jsoniter.RegisterTypeEncoderFunc(reflect2.TypeOf[MyAny]().String(),
func(ptr unsafe.Pointer, stream *jsoniter.Stream) { func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
val := *((*MyAny)(ptr)) val := *((*MyAny)(ptr))


stream.WriteArrayStart() stream.WriteArrayStart()
if val != nil { if val != nil {
stream.WriteString(myreflect.TypeOfValue(val).String())
stream.WriteString(reflect2.TypeOfValue(val).String())
stream.WriteRaw(",") stream.WriteRaw(",")
stream.WriteVal(val) stream.WriteVal(val)
} }
@@ -58,7 +58,7 @@ func TestMessage(t *testing.T) {
return false return false
}) })


jsoniter.RegisterTypeDecoderFunc(myreflect.TypeOf[MyAny]().String(),
jsoniter.RegisterTypeDecoderFunc(reflect2.TypeOf[MyAny]().String(),
func(ptr unsafe.Pointer, iter *jsoniter.Iterator) { func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
vp := (*MyAny)(ptr) vp := (*MyAny)(ptr)




+ 3
- 3
pkgs/task/manager.go View File

@@ -5,7 +5,7 @@ import (
"sync" "sync"
"time" "time"


mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
) )


type Manager[TCtx any] struct { type Manager[TCtx any] struct {
@@ -108,12 +108,12 @@ func (m *Manager[TCtx]) executeTask(task *Task[TCtx]) {


// 立刻删除任务,或者延迟一段时间再删除 // 立刻删除任务,或者延迟一段时间再删除
if opt.RemovingDelay == 0 { if opt.RemovingDelay == 0 {
m.tasks = mylo.Remove(m.tasks, task)
m.tasks = lo2.Remove(m.tasks, task)
} else { } else {
go func() { go func() {
<-time.After(opt.RemovingDelay) <-time.After(opt.RemovingDelay)
m.lock.Lock() m.lock.Lock()
m.tasks = mylo.Remove(m.tasks, task)
m.tasks = lo2.Remove(m.tasks, task)
m.lock.Unlock() m.lock.Unlock()
}() }()
} }


+ 2
- 2
pkgs/task/task.go View File

@@ -5,7 +5,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"


mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
) )


type CompleteOption struct { type CompleteOption struct {
@@ -82,7 +82,7 @@ func (t *Task[TCtx]) WaitTimeout(timeout time.Duration) bool {
select { select {
case <-time.After(timeout): case <-time.After(timeout):
t.waiterLock.Lock() t.waiterLock.Lock()
t.waiters = mylo.Remove(t.waiters, waiter)
t.waiters = lo2.Remove(t.waiters, waiter)
t.waiterLock.Unlock() t.waiterLock.Unlock()


return false return false


+ 6
- 6
pkgs/typedispatcher/type_dispatcher.go View File

@@ -1,28 +1,28 @@
package typedispatcher package typedispatcher


import ( import (
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type HandlerFn[TRet any] func(val any) TRet type HandlerFn[TRet any] func(val any) TRet


type TypeDispatcher[TRet any] struct { type TypeDispatcher[TRet any] struct {
handlers map[myreflect.Type]HandlerFn[TRet]
handlers map[reflect2.Type]HandlerFn[TRet]
} }


func NewTypeDispatcher[TRet any]() TypeDispatcher[TRet] { func NewTypeDispatcher[TRet any]() TypeDispatcher[TRet] {
return TypeDispatcher[TRet]{ return TypeDispatcher[TRet]{
handlers: make(map[myreflect.Type]HandlerFn[TRet]),
handlers: make(map[reflect2.Type]HandlerFn[TRet]),
} }
} }


func (t *TypeDispatcher[TRet]) Add(typ myreflect.Type, fn HandlerFn[TRet]) {
func (t *TypeDispatcher[TRet]) Add(typ reflect2.Type, fn HandlerFn[TRet]) {
t.handlers[typ] = fn t.handlers[typ] = fn
} }


func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) { func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) {
var ret TRet var ret TRet
typ := myreflect.TypeOfValue(val)
typ := reflect2.TypeOfValue(val)
handler, ok := t.handlers[typ] handler, ok := t.handlers[typ]
if !ok { if !ok {
return ret, false return ret, false
@@ -32,7 +32,7 @@ func (t *TypeDispatcher[TRet]) Dispatch(val any) (TRet, bool) {
} }


func Add[T any, TRet any](dispatcher TypeDispatcher[TRet], handler func(val T) TRet) { func Add[T any, TRet any](dispatcher TypeDispatcher[TRet], handler func(val T) TRet) {
dispatcher.Add(myreflect.TypeOf[T](), func(val any) TRet {
dispatcher.Add(reflect2.TypeOf[T](), func(val any) TRet {
return handler(val.(T)) return handler(val.(T))
}) })
} }

+ 6
- 6
pkgs/types/union.go View File

@@ -4,17 +4,17 @@ import (
"fmt" "fmt"
"reflect" "reflect"


myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type AnyTypeUnion struct { type AnyTypeUnion struct {
// 这个集合的类型 // 这个集合的类型
UnionType myreflect.Type
UnionType reflect2.Type
// 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围 // 集合中包含的类型,即遇到UnionType类型的值时,它内部的实际类型的范围
ElementTypes []myreflect.Type
ElementTypes []reflect2.Type
} }


func (u *AnyTypeUnion) Include(typ myreflect.Type) bool {
func (u *AnyTypeUnion) Include(typ reflect2.Type) bool {
for _, t := range u.ElementTypes { for _, t := range u.ElementTypes {
if t == typ { if t == typ {
return true return true
@@ -24,7 +24,7 @@ func (u *AnyTypeUnion) Include(typ myreflect.Type) bool {
return false return false
} }


func (u *AnyTypeUnion) Add(typ myreflect.Type) error {
func (u *AnyTypeUnion) Add(typ reflect2.Type) error {
if !typ.AssignableTo(u.UnionType) { if !typ.AssignableTo(u.UnionType) {
return fmt.Errorf("type is not assignable to union type") return fmt.Errorf("type is not assignable to union type")
} }
@@ -55,7 +55,7 @@ func NewTypeUnion[TU any](eleValues ...TU) TypeUnion[TU] {


return TypeUnion[TU]{ return TypeUnion[TU]{
AnyTypeUnion{ AnyTypeUnion{
UnionType: myreflect.TypeOf[TU](),
UnionType: reflect2.TypeOf[TU](),
ElementTypes: eleTypes, ElementTypes: eleTypes,
}, },
} }


+ 1
- 1
sdks/imfs/imfs_test.go View File

@@ -32,7 +32,7 @@ func Test_Package(t *testing.T) {
URL: "http://localhost:7893", URL: "http://localhost:7893",
}) })


_, err := cli.PackageGetWithObjects(PackageGetWithObjectsInfos{UserID: 0, PackageID: 13})
_, err := cli.PackageGetWithObjects(PackageGetWithObjectsInfos{UserID: 1, PackageID: 13})
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
} }

+ 1
- 1
sdks/pcm/models.go View File

@@ -34,5 +34,5 @@ const (
TaskStatusPending TaskStatus = "Pending" TaskStatusPending TaskStatus = "Pending"
TaskStatusRunning TaskStatus = "Running" TaskStatusRunning TaskStatus = "Running"
TaskStatusSuccess TaskStatus = "succeeded" TaskStatusSuccess TaskStatus = "succeeded"
TaskStatuFailed TaskStatus = "failed"
TaskStatusFailed TaskStatus = "failed"
) )

+ 34
- 8
sdks/scheduler/models.go View File

@@ -9,6 +9,7 @@ import (
const ( const (
JobTypeNormal = "Normal" JobTypeNormal = "Normal"
JobTypeResource = "Resource" JobTypeResource = "Resource"
JobTypeInstance = "Instance"


FileInfoTypePackage = "Package" FileInfoTypePackage = "Package"
FileInfoTypeLocalFile = "LocalFile" FileInfoTypeLocalFile = "LocalFile"
@@ -35,7 +36,9 @@ type JobInfo interface {


var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*NormalJobInfo)(nil), (*NormalJobInfo)(nil),
(*ResourceJobInfo)(nil),
(*DataReturnJobInfo)(nil),
(*MultiInstanceJobInfo)(nil),
(*InstanceJobInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")


@@ -57,14 +60,33 @@ type NormalJobInfo struct {
Services JobServicesInfo `json:"services"` Services JobServicesInfo `json:"services"`
} }


type ResourceJobInfo struct {
serder.Metadata `union:"Resource"`
type DataReturnJobInfo struct {
serder.Metadata `union:"DataReturn"`
JobInfoBase JobInfoBase
Type string `json:"type"` Type string `json:"type"`
BucketID cdssdk.BucketID `json:"bucketID"` BucketID cdssdk.BucketID `json:"bucketID"`
TargetLocalJobID string `json:"targetLocalJobID"` TargetLocalJobID string `json:"targetLocalJobID"`
} }


type MultiInstanceJobInfo struct {
serder.Metadata `union:"MultiInstance"`
JobInfoBase
Type string `json:"type"`
Files JobFilesInfo `json:"files"`
Runtime JobRuntimeInfo `json:"runtime"`
Resources JobResourcesInfo `json:"resources"`
}

type InstanceJobInfo struct {
serder.Metadata `union:"Instance"`
JobInfoBase
Type string `json:"type"`
LocalJobID string `json:"multiInstJobID"`
Files JobFilesInfo `json:"files"`
Runtime JobRuntimeInfo `json:"runtime"`
Resources JobResourcesInfo `json:"resources"`
}

type JobFilesInfo struct { type JobFilesInfo struct {
Dataset JobFileInfo `json:"dataset"` Dataset JobFileInfo `json:"dataset"`
Code JobFileInfo `json:"code"` Code JobFileInfo `json:"code"`
@@ -78,7 +100,7 @@ type JobFileInfo interface {
var FileInfoTypeUnion = types.NewTypeUnion[JobFileInfo]( var FileInfoTypeUnion = types.NewTypeUnion[JobFileInfo](
(*PackageJobFileInfo)(nil), (*PackageJobFileInfo)(nil),
(*LocalJobFileInfo)(nil), (*LocalJobFileInfo)(nil),
(*ResourceJobFileInfo)(nil),
(*DataReturnJobFileInfo)(nil),
(*ImageJobFileInfo)(nil), (*ImageJobFileInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&FileInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&FileInfoTypeUnion, "type")
@@ -101,11 +123,11 @@ type LocalJobFileInfo struct {
LocalPath string `json:"localPath"` LocalPath string `json:"localPath"`
} }


type ResourceJobFileInfo struct {
serder.Metadata `union:"Resource"`
type DataReturnJobFileInfo struct {
serder.Metadata `union:"DataReturn"`
JobFileInfoBase JobFileInfoBase
Type string `json:"type"`
ResourceLocalJobID string `json:"resourceLocalJobID"`
Type string `json:"type"`
DataReturnLocalJobID string `json:"dataReturnLocalJobID"`
} }


type ImageJobFileInfo struct { type ImageJobFileInfo struct {
@@ -140,6 +162,10 @@ type JobSetFilesUploadScheme struct {
LocalFileSchemes []LocalFileUploadScheme `json:"localFileUploadSchemes"` LocalFileSchemes []LocalFileUploadScheme `json:"localFileUploadSchemes"`
} }


type JobFilesUploadScheme struct {
LocalFileSchemes []LocalFileUploadScheme `json:"localFileUploadSchemes"`
}

type LocalFileUploadScheme struct { type LocalFileUploadScheme struct {
LocalPath string `json:"localPath"` LocalPath string `json:"localPath"`
UploadToCDSNodeID *cdssdk.NodeID `json:"uploadToCDSNodeID"` UploadToCDSNodeID *cdssdk.NodeID `json:"uploadToCDSNodeID"`


+ 1
- 1
sdks/scheduler/scheduler_test.go View File

@@ -15,7 +15,7 @@ func Test_JobSet(t *testing.T) {
id, err := cli.JobSetSumbit(JobSetSumbitReq{ id, err := cli.JobSetSumbit(JobSetSumbitReq{
JobSetInfo: JobSetInfo{ JobSetInfo: JobSetInfo{
Jobs: []JobInfo{ Jobs: []JobInfo{
&ResourceJobInfo{
&DataReturnJobInfo{
Type: JobTypeResource, Type: JobTypeResource,
}, },
&NormalJobInfo{ &NormalJobInfo{


+ 156
- 0
sdks/storage/bucket.go View File

@@ -0,0 +1,156 @@
package cdssdk

import (
"net/url"

"gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
)

type BucketService struct {
*Client
}

func (c *Client) Bucket() *BucketService {
return &BucketService{c}
}

const BucketGetByNamePath = "/bucket/getByName"

type BucketGetByName struct {
UserID UserID `json:"userID" form:"userID" binding:"required"`
Name string `json:"name" form:"name" binding:"required"`
}
type BucketGetByNameResp struct {
Bucket Bucket `json:"bucket"`
}

func (c *BucketService) GetByName(req BucketGetByName) (*BucketGetByNameResp, error) {
url, err := url.JoinPath(c.baseURL, BucketGetByNamePath)
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

codeResp, err := ParseJSONResponse[response[BucketGetByNameResp]](resp)
if err != nil {
return nil, err
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

const BucketCreatePath = "/bucket/create"

type BucketCreate struct {
UserID UserID `json:"userID" binding:"required"`
Name string `json:"name" binding:"required"`
}

type BucketCreateResp struct {
Bucket Bucket `json:"bucket"`
}

func (c *BucketService) Create(req BucketCreate) (*BucketCreateResp, error) {
url, err := url.JoinPath(c.baseURL, BucketCreatePath)
if err != nil {
return nil, err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

codeResp, err := ParseJSONResponse[response[BucketCreateResp]](resp)
if err != nil {
return nil, err
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

const BucketDeletePath = "/bucket/delete"

type BucketDelete struct {
UserID UserID `json:"userID" binding:"required"`
BucketID BucketID `json:"bucketID" binding:"required"`
}

type BucketDeleteResp struct{}

func (c *BucketService) Delete(req BucketDelete) error {
url, err := url.JoinPath(c.baseURL, BucketDeletePath)
if err != nil {
return err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return err
}

codeResp, err := ParseJSONResponse[response[BucketDeleteResp]](resp)
if err != nil {
return err
}

if codeResp.Code == errorcode.OK {
return nil
}

return codeResp.ToError()
}

const BucketListUserBucketsPath = "/bucket/listUserBuckets"

type BucketListUserBucketsReq struct {
UserID UserID `form:"userID" json:"userID" binding:"required"`
}

type BucketListUserBucketsResp struct {
Buckets []Bucket `json:"buckets"`
}

func (c *BucketService) ListUserBuckets(req BucketListUserBucketsReq) (*BucketListUserBucketsResp, error) {
url, err := url.JoinPath(c.baseURL, BucketListUserBucketsPath)
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

codeResp, err := ParseJSONResponse[response[BucketListUserBucketsResp]](resp)
if err != nil {
return nil, err
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

+ 2
- 2
sdks/storage/cache.go View File

@@ -7,7 +7,7 @@ import (
myhttp "gitlink.org.cn/cloudream/common/utils/http" myhttp "gitlink.org.cn/cloudream/common/utils/http"
) )


var CacheMovePackagePath = "/cache/movePackage"
const CacheMovePackagePath = "/cache/movePackage"


type CacheMovePackageReq struct { type CacheMovePackageReq struct {
UserID UserID `json:"userID"` UserID UserID `json:"userID"`
@@ -29,7 +29,7 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes
return nil, err return nil, err
} }


jsonResp, err := myhttp.ParseJSONResponse[response[CacheMovePackageResp]](resp)
jsonResp, err := ParseJSONResponse[response[CacheMovePackageResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 16
- 1
sdks/storage/models.go View File

@@ -10,7 +10,7 @@ import (
) )


const ( const (
ObjectPathSeperator = "/"
ObjectPathSeparator = "/"
) )


type NodeID int64 type NodeID int64
@@ -112,6 +112,8 @@ type Object struct {
Size int64 `db:"Size" json:"size,string"` Size int64 `db:"Size" json:"size,string"`
FileHash string `db:"FileHash" json:"fileHash"` FileHash string `db:"FileHash" json:"fileHash"`
Redundancy Redundancy `db:"Redundancy" json:"redundancy"` Redundancy Redundancy `db:"Redundancy" json:"redundancy"`
CreateTime time.Time `db:"CreateTime" json:"createTime"`
UpdateTime time.Time `db:"UpdateTime" json:"updateTime"`
} }


type Node struct { type Node struct {
@@ -132,6 +134,19 @@ type PinnedObject struct {
CreateTime time.Time `db:"CreateTime" json:"createTime"` CreateTime time.Time `db:"CreateTime" json:"createTime"`
} }


type Bucket struct {
BucketID BucketID `db:"BucketID" json:"bucketID"`
Name string `db:"Name" json:"name"`
CreatorID UserID `db:"CreatorID" json:"creatorID"`
}

type NodeConnectivity struct {
FromNodeID NodeID `db:"FromNodeID" json:"fromNodeID"`
ToNodeID NodeID `db:"ToNodeID" json:"ToNodeID"`
Delay *float32 `db:"Delay" json:"delay"`
TestTime time.Time `db:"TestTime" json:"testTime"`
}

type NodePackageCachingInfo struct { type NodePackageCachingInfo struct {
NodeID NodeID `json:"nodeID"` NodeID NodeID `json:"nodeID"`
FileSize int64 `json:"fileSize"` FileSize int64 `json:"fileSize"`


+ 1
- 1
sdks/storage/node.go View File

@@ -30,7 +30,7 @@ func (c *Client) NodeGetNodes(req NodeGetNodesReq) (*NodeGetNodesResp, error) {
return nil, err return nil, err
} }


jsonResp, err := myhttp.ParseJSONResponse[response[NodeGetNodesResp]](resp)
jsonResp, err := ParseJSONResponse[response[NodeGetNodesResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 244
- 14
sdks/storage/object.go View File

@@ -5,19 +5,111 @@ import (
"io" "io"
"net/url" "net/url"
"strings" "strings"
"time"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
myhttp "gitlink.org.cn/cloudream/common/utils/http" myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


type ObjectDownloadReq struct {
UserID int64 `json:"userID"`
ObjectID int64 `json:"objectID"`
type ObjectService struct {
*Client
} }


func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) {
url, err := url.JoinPath(c.baseURL, "/object/download")
func (c *Client) Object() *ObjectService {
return &ObjectService{
Client: c,
}
}

const ObjectUploadPath = "/object/upload"

type ObjectUpload struct {
ObjectUploadInfo
Files UploadObjectIterator `json:"-"`
}

type ObjectUploadInfo struct {
UserID UserID `json:"userID" binding:"required"`
PackageID PackageID `json:"packageID" binding:"required"`
NodeAffinity *NodeID `json:"nodeAffinity"`
}

type UploadingObject struct {
Path string
File io.ReadCloser
}

type UploadObjectIterator = iterator.Iterator[*UploadingObject]

type ObjectUploadResp struct {
Uploadeds []UploadedObject `json:"uploadeds"`
}
type UploadedObject struct {
Object *Object `json:"object"`
Error string `json:"error"`
}

func (c *ObjectService) Upload(req ObjectUpload) (*ObjectUploadResp, error) {
url, err := url.JoinPath(c.baseURL, ObjectUploadPath)
if err != nil {
return nil, err
}

infoJSON, err := serder.ObjectToJSON(req)
if err != nil {
return nil, fmt.Errorf("upload info to json: %w", err)
}

resp, err := myhttp.PostMultiPart(url, myhttp.MultiPartRequestParam{
Form: map[string]string{"info": string(infoJSON)},
Files: iterator.Map(req.Files, func(src *UploadingObject) (*myhttp.IterMultiPartFile, error) {
return &myhttp.IterMultiPartFile{
FieldName: "files",
FileName: src.Path,
File: src.File,
}, nil
}),
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var err error
var codeResp response[ObjectUploadResp]
if codeResp, err = serder.JSONToObjectStreamEx[response[ObjectUploadResp]](resp.Body); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

return nil, fmt.Errorf("unknow response content type: %s", contType)

}

const ObjectDownloadPath = "/object/download"

type ObjectDownload struct {
UserID UserID `form:"userID" json:"userID" binding:"required"`
ObjectID ObjectID `form:"objectID" json:"objectID" binding:"required"`
Offset int64 `form:"offset" json:"offset,omitempty"`
Length *int64 `form:"length" json:"length,omitempty"`
}
type DownloadingObject struct {
Path string
File io.ReadCloser
}

func (c *ObjectService) Download(req ObjectDownload) (*DownloadingObject, error) {
url, err := url.JoinPath(c.baseURL, ObjectDownloadPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -40,24 +132,162 @@ func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) {
return nil, codeResp.ToError() return nil, codeResp.ToError()
} }


if strings.Contains(contType, myhttp.ContentTypeOctetStream) {
return resp.Body, nil
_, files, err := myhttp.ParseMultiPartResponse(resp)
if err != nil {
return nil, err
} }


return nil, fmt.Errorf("unknow response content type: %s", contType)
file, err := files.MoveNext()
if err == iterator.ErrNoMoreItem {
return nil, fmt.Errorf("no file found in response")
}
if err != nil {
return nil, err
}

return &DownloadingObject{
Path: file.FileName,
File: file.File,
}, nil
}

const ObjectUpdateInfoPath = "/object/updateInfo"

type UpdatingObject struct {
ObjectID ObjectID `json:"objectID" binding:"required"`
UpdateTime time.Time `json:"updateTime" binding:"required"`
}

func (u *UpdatingObject) ApplyTo(obj *Object) {
obj.UpdateTime = u.UpdateTime
}

type ObjectUpdateInfo struct {
UserID UserID `json:"userID" binding:"required"`
Updatings []UpdatingObject `json:"updatings" binding:"required"`
}

type ObjectUpdateInfoResp struct {
Successes []ObjectID `json:"successes"`
}

func (c *ObjectService) UpdateInfo(req ObjectUpdateInfo) (*ObjectUpdateInfoResp, error) {
url, err := url.JoinPath(c.baseURL, ObjectUpdateInfoPath)
if err != nil {
return nil, err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

jsonResp, err := ParseJSONResponse[response[ObjectUpdateInfoResp]](resp)
if err != nil {
return nil, err
}

if jsonResp.Code == errorcode.OK {
return &jsonResp.Data, nil
}

return nil, jsonResp.ToError()
}

const ObjectMovePath = "/object/move"

type MovingObject struct {
ObjectID ObjectID `json:"objectID" binding:"required"`
PackageID PackageID `json:"packageID" binding:"required"`
Path string `json:"path" binding:"required"`
}

func (m *MovingObject) ApplyTo(obj *Object) {
obj.PackageID = m.PackageID
obj.Path = m.Path
}

type ObjectMove struct {
UserID UserID `json:"userID" binding:"required"`
Movings []MovingObject `json:"movings" binding:"required"`
}

type ObjectMoveResp struct {
Successes []ObjectID `json:"successes"`
}

func (c *ObjectService) Move(req ObjectMove) (*ObjectMoveResp, error) {
url, err := url.JoinPath(c.baseURL, ObjectMovePath)
if err != nil {
return nil, err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

jsonResp, err := ParseJSONResponse[response[ObjectMoveResp]](resp)
if err != nil {
return nil, err
}

if jsonResp.Code == errorcode.OK {
return &jsonResp.Data, nil
}

return nil, jsonResp.ToError()
}

const ObjectDeletePath = "/object/delete"

type ObjectDelete struct {
UserID UserID `json:"userID" binding:"required"`
ObjectIDs []ObjectID `json:"objectIDs" binding:"required"`
}

type ObjectDeleteResp struct{}

func (c *ObjectService) Delete(req ObjectDelete) error {
url, err := url.JoinPath(c.baseURL, ObjectDeletePath)
if err != nil {
return err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return err
}

jsonResp, err := ParseJSONResponse[response[ObjectDeleteResp]](resp)
if err != nil {
return err
}

if jsonResp.Code == errorcode.OK {
return nil
}

return jsonResp.ToError()
} }


var ObjectGetPackageObjectsPath = "/object/getPackageObjects"
const ObjectGetPackageObjectsPath = "/object/getPackageObjects"


type ObjectGetPackageObjectsReq struct {
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
type ObjectGetPackageObjects struct {
UserID UserID `form:"userID" json:"userID" binding:"required"`
PackageID PackageID `form:"packageID" json:"packageID" binding:"required"`
} }
type ObjectGetPackageObjectsResp struct { type ObjectGetPackageObjectsResp struct {
Objects []Object `json:"objects"` Objects []Object `json:"objects"`
} }


func (c *Client) ObjectGetPackageObjects(req ObjectGetPackageObjectsReq) (*ObjectGetPackageObjectsResp, error) {
func (c *ObjectService) GetPackageObjects(req ObjectGetPackageObjects) (*ObjectGetPackageObjectsResp, error) {
url, err := url.JoinPath(c.baseURL, ObjectGetPackageObjectsPath) url, err := url.JoinPath(c.baseURL, ObjectGetPackageObjectsPath)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -70,7 +300,7 @@ func (c *Client) ObjectGetPackageObjects(req ObjectGetPackageObjectsReq) (*Objec
return nil, err return nil, err
} }


jsonResp, err := myhttp.ParseJSONResponse[response[ObjectGetPackageObjectsResp]](resp)
jsonResp, err := ParseJSONResponse[response[ObjectGetPackageObjectsResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 137
- 87
sdks/storage/package.go View File

@@ -2,26 +2,34 @@ package cdssdk


import ( import (
"fmt" "fmt"
"io"
"net/url" "net/url"
"strings" "strings"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
myhttp "gitlink.org.cn/cloudream/common/utils/http" myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


type PackageService struct {
*Client
}

func (c *Client) Package() *PackageService {
return &PackageService{c}
}

const PackageGetPath = "/package/get"

type PackageGetReq struct { type PackageGetReq struct {
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
UserID UserID `form:"userID" json:"userID" binding:"required"`
PackageID PackageID `form:"packageID" json:"packageID" binding:"required"`
} }
type PackageGetResp struct { type PackageGetResp struct {
Package Package
} }


func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) {
url, err := url.JoinPath(c.baseURL, "/package/get")
func (c *PackageService) Get(req PackageGetReq) (*PackageGetResp, error) {
url, err := url.JoinPath(c.baseURL, PackageGetPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -33,7 +41,7 @@ func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) {
return nil, err return nil, err
} }


codeResp, err := myhttp.ParseJSONResponse[response[PackageGetResp]](resp)
codeResp, err := ParseJSONResponse[response[PackageGetResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -45,75 +53,88 @@ func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) {
return nil, codeResp.ToError() return nil, codeResp.ToError()
} }


type PackageUploadReq struct {
UserID UserID `json:"userID"`
BucketID BucketID `json:"bucketID"`
Name string `json:"name"`
NodeAffinity *NodeID `json:"nodeAffinity"`
Files PackageUploadFileIterator `json:"-"`
}
const PackageGetByNamePath = "/package/getByName"


type IterPackageUploadFile struct {
Path string
File io.ReadCloser
type PackageGetByName struct {
UserID UserID `form:"userID" json:"userID" binding:"required"`
BucketName string `form:"bucketName" json:"bucketName" binding:"required"`
PackageName string `form:"packageName" json:"packageName" binding:"required"`
} }

type PackageUploadFileIterator = iterator.Iterator[*IterPackageUploadFile]

type PackageUploadResp struct {
PackageID PackageID `json:"packageID,string"`
type PackageGetByNameResp struct {
Package Package `json:"package"`
} }


func (c *Client) PackageUpload(req PackageUploadReq) (*PackageUploadResp, error) {
url, err := url.JoinPath(c.baseURL, "/package/upload")
func (c *PackageService) GetByName(req PackageGetByName) (*PackageGetByNameResp, error) {
url, err := url.JoinPath(c.baseURL, PackageGetByNamePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }


infoJSON, err := serder.ObjectToJSON(req)
resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil { if err != nil {
return nil, fmt.Errorf("package info to json: %w", err)
return nil, err
} }


resp, err := myhttp.PostMultiPart(url, myhttp.MultiPartRequestParam{
Form: map[string]string{"info": string(infoJSON)},
Files: iterator.Map(req.Files, func(src *IterPackageUploadFile) (*myhttp.IterMultiPartFile, error) {
return &myhttp.IterMultiPartFile{
FieldName: "files",
FileName: src.Path,
File: src.File,
}, nil
}),
})
codeResp, err := ParseJSONResponse[response[PackageGetByNameResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[PackageUploadResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}
if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}


if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}
return nil, codeResp.ToError()
}

const PackageCreatePath = "/package/create"

type PackageCreate struct {
UserID UserID `json:"userID"`
BucketID BucketID `json:"bucketID"`
Name string `json:"name"`
}


return nil, codeResp.ToError()
type PackageCreateResp struct {
Package Package `json:"package"`
}

func (s *PackageService) Create(req PackageCreate) (*PackageCreateResp, error) {
url, err := url.JoinPath(s.baseURL, PackageCreatePath)
if err != nil {
return nil, err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
} }


return nil, fmt.Errorf("unknow response content type: %s", contType)
codeResp, err := ParseJSONResponse[response[PackageCreateResp]](resp)
if err != nil {
return nil, err
}


if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
} }


type PackageDeleteReq struct {
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
const PackageDeletePath = "/package/delete"

type PackageDelete struct {
UserID UserID `json:"userID" binding:"required"`
PackageID PackageID `json:"packageID" binding:"required"`
} }


func (c *Client) PackageDelete(req PackageDeleteReq) error {
url, err := url.JoinPath(c.baseURL, "/package/delete")
func (c *PackageService) Delete(req PackageDelete) error {
url, err := url.JoinPath(c.baseURL, PackageDeletePath)
if err != nil { if err != nil {
return err return err
} }
@@ -143,79 +164,108 @@ func (c *Client) PackageDelete(req PackageDeleteReq) error {
return fmt.Errorf("unknow response content type: %s", contType) return fmt.Errorf("unknow response content type: %s", contType)
} }


const PackageListBucketPackagesPath = "/package/listBucketPackages"

type PackageListBucketPackages struct {
UserID UserID `form:"userID" json:"userID" binding:"required"`
BucketID BucketID `form:"bucketID" json:"bucketID" binding:"required"`
}

type PackageListBucketPackagesResp struct {
Packages []Package `json:"packages"`
}

func (c *PackageService) ListBucketPackages(req PackageListBucketPackages) (*PackageListBucketPackagesResp, error) {
url, err := url.JoinPath(c.baseURL, PackageListBucketPackagesPath)
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

codeResp, err := ParseJSONResponse[response[PackageListBucketPackagesResp]](resp)
if err != nil {
return nil, err
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

const PackageGetCachedNodesPath = "/package/getCachedNodes"

type PackageGetCachedNodesReq struct { type PackageGetCachedNodesReq struct {
PackageID PackageID `json:"packageID"`
UserID UserID `json:"userID"`
PackageID PackageID `form:"packageID" json:"packageID" binding:"required"`
UserID UserID `form:"userID" json:"userID" binding:"required"`
} }


type PackageGetCachedNodesResp struct { type PackageGetCachedNodesResp struct {
PackageCachingInfo PackageCachingInfo
} }


func (c *Client) PackageGetCachedNodes(req PackageGetCachedNodesReq) (*PackageGetCachedNodesResp, error) {
url, err := url.JoinPath(c.baseURL, "/package/getCachedNodes")
func (c *PackageService) GetCachedNodes(req PackageGetCachedNodesReq) (*PackageGetCachedNodesResp, error) {
url, err := url.JoinPath(c.baseURL, PackageGetCachedNodesPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Body: req,
Query: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {


var codeResp response[PackageGetCachedNodesResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}
codeResp, err := ParseJSONResponse[response[PackageGetCachedNodesResp]](resp)
if err != nil {
return nil, err
}


return nil, codeResp.ToError()
if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
} }


return nil, fmt.Errorf("unknow response content type: %s", contType)
return nil, codeResp.ToError()
} }


const PackageGetLoadedNodesPath = "/package/getLoadedNodes"

type PackageGetLoadedNodesReq struct { type PackageGetLoadedNodesReq struct {
PackageID PackageID `json:"packageID"`
UserID UserID `json:"userID"`
PackageID PackageID `form:"packageID" json:"packageID" binding:"required"`
UserID UserID `form:"userID" json:"userID" binding:"required"`
} }


type PackageGetLoadedNodesResp struct { type PackageGetLoadedNodesResp struct {
NodeIDs []NodeID `json:"nodeIDs"` NodeIDs []NodeID `json:"nodeIDs"`
} }


func (c *Client) PackageGetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageGetLoadedNodesResp, error) {
url, err := url.JoinPath(c.baseURL, "/package/getLoadedNodes")
func (c *PackageService) GetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageGetLoadedNodesResp, error) {
url, err := url.JoinPath(c.baseURL, PackageGetLoadedNodesPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Body: req,
Query: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }


contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {

var codeResp response[PackageGetLoadedNodesResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}
codeResp, err := ParseJSONResponse[response[PackageGetLoadedNodesResp]](resp)
if err != nil {
return nil, err
}


return nil, codeResp.ToError()
if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
} }


return nil, fmt.Errorf("unknow response content type: %s", contType)
return nil, codeResp.ToError()
} }

+ 22
- 15
sdks/storage/storage.go View File

@@ -10,17 +10,19 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


const StorageLoadPackagePath = "/storage/loadPackage"

type StorageLoadPackageReq struct { type StorageLoadPackageReq struct {
UserID UserID `json:"userID"`
PackageID PackageID `json:"packageID"`
StorageID StorageID `json:"storageID"`
UserID UserID `json:"userID" binding:"required"`
PackageID PackageID `json:"packageID" binding:"required"`
StorageID StorageID `json:"storageID" binding:"required"`
} }
type StorageLoadPackageResp struct { type StorageLoadPackageResp struct {
FullPath string `json:"fullPath"` FullPath string `json:"fullPath"`
} }


func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPackageResp, error) { func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPackageResp, error) {
url, err := url.JoinPath(c.baseURL, "/storage/loadPackage")
url, err := url.JoinPath(c.baseURL, StorageLoadPackagePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -32,7 +34,7 @@ func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPack
return nil, err return nil, err
} }


codeResp, err := myhttp.ParseJSONResponse[response[StorageLoadPackageResp]](resp)
codeResp, err := ParseJSONResponse[response[StorageLoadPackageResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -44,12 +46,15 @@ func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPack
return nil, codeResp.ToError() return nil, codeResp.ToError()
} }


const StorageCreatePackagePath = "/storage/createPackage"

type StorageCreatePackageReq struct { type StorageCreatePackageReq struct {
UserID UserID `json:"userID"`
StorageID StorageID `json:"storageID"`
Path string `json:"path"`
BucketID BucketID `json:"bucketID"`
Name string `json:"name"`
UserID UserID `json:"userID" binding:"required"`
StorageID StorageID `json:"storageID" binding:"required"`
Path string `json:"path" binding:"required"`
BucketID BucketID `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
NodeAffinity *NodeID `json:"nodeAffinity"`
} }


type StorageCreatePackageResp struct { type StorageCreatePackageResp struct {
@@ -57,7 +62,7 @@ type StorageCreatePackageResp struct {
} }


func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCreatePackageResp, error) { func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCreatePackageResp, error) {
url, err := url.JoinPath(c.baseURL, "/storage/createPackage")
url, err := url.JoinPath(c.baseURL, StorageCreatePackagePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -86,9 +91,11 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea
return nil, fmt.Errorf("unknow response content type: %s", contType) return nil, fmt.Errorf("unknow response content type: %s", contType)
} }


const StorageGetInfoPath = "/storage/getInfo"

type StorageGetInfoReq struct { type StorageGetInfoReq struct {
UserID UserID `json:"userID"`
StorageID StorageID `json:"storageID"`
UserID UserID `form:"userID" json:"userID" binding:"required"`
StorageID StorageID `form:"storageID" json:"storageID" binding:"required"`
} }
type StorageGetInfoResp struct { type StorageGetInfoResp struct {
Name string `json:"name"` Name string `json:"name"`
@@ -97,7 +104,7 @@ type StorageGetInfoResp struct {
} }


func (c *Client) StorageGetInfo(req StorageGetInfoReq) (*StorageGetInfoResp, error) { func (c *Client) StorageGetInfo(req StorageGetInfoReq) (*StorageGetInfoResp, error) {
url, err := url.JoinPath(c.baseURL, "/storage/getInfo")
url, err := url.JoinPath(c.baseURL, StorageGetInfoPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -109,7 +116,7 @@ func (c *Client) StorageGetInfo(req StorageGetInfoReq) (*StorageGetInfoResp, err
return nil, err return nil, err
} }


codeResp, err := myhttp.ParseJSONResponse[response[StorageGetInfoResp]](resp)
codeResp, err := ParseJSONResponse[response[StorageGetInfoResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 83
- 47
sdks/storage/storage_test.go View File

@@ -23,16 +23,24 @@ func Test_PackageGet(t *testing.T) {
} }


pkgName := uuid.NewString() pkgName := uuid.NewString()
upResp, err := cli.PackageUpload(PackageUploadReq{
UserID: 0,
createResp, err := cli.Package().Create(PackageCreate{
UserID: 1,
BucketID: 1, BucketID: 1,
Name: pkgName, Name: pkgName,
})
So(err, ShouldBeNil)

_, err = cli.Object().Upload(ObjectUpload{
ObjectUploadInfo: ObjectUploadInfo{
UserID: 1,
PackageID: createResp.Package.PackageID,
},
Files: iterator.Array( Files: iterator.Array(
&IterPackageUploadFile{
Path: "test",
&UploadingObject{
Path: "abc/test",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
&IterPackageUploadFile{
&UploadingObject{
Path: "test2", Path: "test2",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
@@ -40,18 +48,18 @@ func Test_PackageGet(t *testing.T) {
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)


getResp, err := cli.PackageGet(PackageGetReq{
UserID: 0,
PackageID: upResp.PackageID,
getResp, err := cli.Package().Get(PackageGetReq{
UserID: 1,
PackageID: createResp.Package.PackageID,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)


So(getResp.PackageID, ShouldEqual, upResp.PackageID)
So(getResp.PackageID, ShouldEqual, createResp.Package.PackageID)
So(getResp.Package.Name, ShouldEqual, pkgName) So(getResp.Package.Name, ShouldEqual, pkgName)


err = cli.PackageDelete(PackageDeleteReq{
UserID: 0,
PackageID: upResp.PackageID,
err = cli.Package().Delete(PackageDelete{
UserID: 1,
PackageID: createResp.Package.PackageID,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
@@ -69,17 +77,27 @@ func Test_Object(t *testing.T) {
} }


nodeAff := NodeID(2) nodeAff := NodeID(2)
upResp, err := cli.PackageUpload(PackageUploadReq{
UserID: 0,
BucketID: 1,
Name: uuid.NewString(),
NodeAffinity: &nodeAff,

pkgName := uuid.NewString()
createResp, err := cli.Package().Create(PackageCreate{
UserID: 1,
BucketID: 1,
Name: pkgName,
})
So(err, ShouldBeNil)

_, err = cli.Object().Upload(ObjectUpload{
ObjectUploadInfo: ObjectUploadInfo{
UserID: 1,
PackageID: createResp.Package.PackageID,
NodeAffinity: &nodeAff,
},
Files: iterator.Array( Files: iterator.Array(
&IterPackageUploadFile{
&UploadingObject{
Path: "test", Path: "test",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
&IterPackageUploadFile{
&UploadingObject{
Path: "test2", Path: "test2",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
@@ -88,7 +106,7 @@ func Test_Object(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)


// downFs, err := cli.ObjectDownload(ObjectDownloadReq{ // downFs, err := cli.ObjectDownload(ObjectDownloadReq{
// UserID: 0,
// UserID: 1,
// ObjectID: upResp.ObjectID, // ObjectID: upResp.ObjectID,
// }) // })
// So(err, ShouldBeNil) // So(err, ShouldBeNil)
@@ -98,9 +116,9 @@ func Test_Object(t *testing.T) {
// So(downFileData, ShouldResemble, fileData) // So(downFileData, ShouldResemble, fileData)
// downFs.Close() // downFs.Close()


err = cli.PackageDelete(PackageDeleteReq{
UserID: 0,
PackageID: upResp.PackageID,
err = cli.Package().Delete(PackageDelete{
UserID: 1,
PackageID: createResp.Package.PackageID,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
@@ -117,16 +135,25 @@ func Test_Storage(t *testing.T) {
fileData[i] = byte(i) fileData[i] = byte(i)
} }


upResp, err := cli.PackageUpload(PackageUploadReq{
UserID: 0,
pkgName := uuid.NewString()
createResp, err := cli.Package().Create(PackageCreate{
UserID: 1,
BucketID: 1, BucketID: 1,
Name: uuid.NewString(),
Name: pkgName,
})
So(err, ShouldBeNil)

_, err = cli.Object().Upload(ObjectUpload{
ObjectUploadInfo: ObjectUploadInfo{
UserID: 1,
PackageID: createResp.Package.PackageID,
},
Files: iterator.Array( Files: iterator.Array(
&IterPackageUploadFile{
&UploadingObject{
Path: "test", Path: "test",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
&IterPackageUploadFile{
&UploadingObject{
Path: "test2", Path: "test2",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
@@ -135,15 +162,15 @@ func Test_Storage(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)


_, err = cli.StorageLoadPackage(StorageLoadPackageReq{ _, err = cli.StorageLoadPackage(StorageLoadPackageReq{
UserID: 0,
PackageID: upResp.PackageID,
UserID: 1,
PackageID: createResp.Package.PackageID,
StorageID: 1, StorageID: 1,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)


err = cli.PackageDelete(PackageDeleteReq{
UserID: 0,
PackageID: upResp.PackageID,
err = cli.Package().Delete(PackageDelete{
UserID: 1,
PackageID: createResp.Package.PackageID,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
@@ -160,16 +187,25 @@ func Test_Cache(t *testing.T) {
fileData[i] = byte(i) fileData[i] = byte(i)
} }


upResp, err := cli.PackageUpload(PackageUploadReq{
UserID: 0,
pkgName := uuid.NewString()
createResp, err := cli.Package().Create(PackageCreate{
UserID: 1,
BucketID: 1, BucketID: 1,
Name: uuid.NewString(),
Name: pkgName,
})
So(err, ShouldBeNil)

_, err = cli.Object().Upload(ObjectUpload{
ObjectUploadInfo: ObjectUploadInfo{
UserID: 1,
PackageID: createResp.Package.PackageID,
},
Files: iterator.Array( Files: iterator.Array(
&IterPackageUploadFile{
&UploadingObject{
Path: "test.txt", Path: "test.txt",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
&IterPackageUploadFile{
&UploadingObject{
Path: "test2.txt", Path: "test2.txt",
File: io.NopCloser(bytes.NewBuffer(fileData)), File: io.NopCloser(bytes.NewBuffer(fileData)),
}, },
@@ -178,15 +214,15 @@ func Test_Cache(t *testing.T) {
So(err, ShouldBeNil) So(err, ShouldBeNil)


_, err = cli.CacheMovePackage(CacheMovePackageReq{ _, err = cli.CacheMovePackage(CacheMovePackageReq{
UserID: 0,
PackageID: upResp.PackageID,
UserID: 1,
PackageID: createResp.Package.PackageID,
NodeID: 1, NodeID: 1,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)


err = cli.PackageDelete(PackageDeleteReq{
UserID: 0,
PackageID: upResp.PackageID,
err = cli.Package().Delete(PackageDelete{
UserID: 1,
PackageID: createResp.Package.PackageID,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)
}) })
@@ -197,16 +233,16 @@ func Test_GetNodeInfos(t *testing.T) {
cli := NewClient(&Config{ cli := NewClient(&Config{
URL: "http://localhost:7890", URL: "http://localhost:7890",
}) })
resp1, err := cli.PackageGetCachedNodes(PackageGetCachedNodesReq{
resp1, err := cli.Package().GetCachedNodes(PackageGetCachedNodesReq{
PackageID: 11, PackageID: 11,
UserID: 0,
UserID: 1,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)
fmt.Printf("resp1: %v\n", resp1) fmt.Printf("resp1: %v\n", resp1)


resp2, err := cli.PackageGetLoadedNodes(PackageGetLoadedNodesReq{
resp2, err := cli.Package().GetLoadedNodes(PackageGetLoadedNodesReq{
PackageID: 11, PackageID: 11,
UserID: 0,
UserID: 1,
}) })
So(err, ShouldBeNil) So(err, ShouldBeNil)
fmt.Printf("resp2: %v\n", resp2) fmt.Printf("resp2: %v\n", resp2)


+ 32
- 1
sdks/storage/utils.go View File

@@ -1,7 +1,38 @@
package cdssdk package cdssdk


import "path/filepath"
import (
"fmt"
"io"
"net/http"
"path/filepath"
"strings"

myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/serder"
)


func MakeIPFSFilePath(fileHash string) string { func MakeIPFSFilePath(fileHash string) string {
return filepath.Join("ipfs", fileHash) return filepath.Join("ipfs", fileHash)
} }

func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) {
var ret TBody
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var err error
if ret, err = serder.JSONToObjectStreamEx[TBody](resp.Body); err != nil {
return ret, fmt.Errorf("parsing response: %w", err)
}

return ret, nil
}

cont, err := io.ReadAll(resp.Body)
if err != nil {
return ret, fmt.Errorf("unknow response content type: %s, status: %d", contType, resp.StatusCode)
}
strCont := string(cont)

return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math2.Min(len(strCont), 200)])
}

+ 119
- 33
sdks/unifyops/unifyops.go View File

@@ -223,41 +223,127 @@ func (c *Client) GetMemoryData(node GetOneResourceDataReq) (*MemoryResourceData,
} }


func (c *Client) GetIndicatorData(node GetOneResourceDataReq) (*[]ResourceData, error) { func (c *Client) GetIndicatorData(node GetOneResourceDataReq) (*[]ResourceData, error) {
url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData")
if err != nil {
return nil, err
//url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData")
//if err != nil {
// return nil, err
//}
//resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
// Body: node,
//})
//if err != nil {
// return nil, err
//}
//
//contType := resp.Header.Get("Content-Type")
//if strings.Contains(contType, myhttp.ContentTypeJSON) {
//
// var codeResp response[[]map[string]any]
// if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
// return nil, fmt.Errorf("parsing response: %w", err)
// }
//
// if codeResp.Code != CORRECT_CODE {
// return nil, codeResp.ToError()
// }
//
// var ret []ResourceData
// for _, mp := range codeResp.Data {
// var data ResourceData
// err := serder.MapToObject(mp, &data)
// if err != nil {
// return nil, err
// }
// ret = append(ret, data)
// }
//
// return &ret, nil
//}
//
//return nil, fmt.Errorf("unknow response content type: %s", contType)

return mockData()
}

func mockData() (*[]ResourceData, error) {
var ret []ResourceData

cpuResourceData := CPUResourceData{
Name: ResourceTypeCPU,
Total: UnitValue[int64]{
Value: 100,
Unit: "",
},
Available: UnitValue[int64]{
Value: 100,
Unit: "",
},
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: node,
})
if err != nil {
return nil, err
ret = append(ret, &cpuResourceData)

npuResourceData := NPUResourceData{
Name: ResourceTypeNPU,
Total: UnitValue[int64]{
Value: 100,
Unit: "",
},
Available: UnitValue[int64]{
Value: 100,
Unit: "",
},
} }

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {

var codeResp response[[]map[string]any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code != CORRECT_CODE {
return nil, codeResp.ToError()
}

var ret []ResourceData
for _, mp := range codeResp.Data {
var data ResourceData
err := serder.MapToObject(mp, &data)
if err != nil {
return nil, err
}
ret = append(ret, data)
}

return &ret, nil
ret = append(ret, &npuResourceData)

gpuResourceData := GPUResourceData{
Name: ResourceTypeGPU,
Total: UnitValue[int64]{
Value: 100,
Unit: "",
},
Available: UnitValue[int64]{
Value: 100,
Unit: "",
},
}
ret = append(ret, &gpuResourceData)

mluResourceData := MLUResourceData{
Name: ResourceTypeMLU,
Total: UnitValue[int64]{
Value: 100,
Unit: "",
},
Available: UnitValue[int64]{
Value: 100,
Unit: "",
},
} }
ret = append(ret, &mluResourceData)

storageResourceData := StorageResourceData{
Name: ResourceTypeStorage,
Total: UnitValue[float64]{
Value: 100,
Unit: "GB",
},
Available: UnitValue[float64]{
Value: 100,
Unit: "GB",
},
}
ret = append(ret, &storageResourceData)

memoryResourceData := MemoryResourceData{
Name: ResourceTypeMemory,
Total: UnitValue[float64]{
Value: 100,
Unit: "GB",
},
Available: UnitValue[float64]{
Value: 100,
Unit: "GB",
},
}
ret = append(ret, &memoryResourceData)


return nil, fmt.Errorf("unknow response content type: %s", contType)
return &ret, nil
} }

+ 1
- 2
utils/config/config.go View File

@@ -3,10 +3,9 @@ package config
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/imdario/mergo"
"os" "os"
"path/filepath" "path/filepath"

"github.com/imdario/mergo"
) )


// Load 加载配置文件 // Load 加载配置文件


+ 167
- 14
utils/http/http.go View File

@@ -4,13 +4,17 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"mime"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"net/textproto"
ul "net/url" ul "net/url"
"reflect"
"strings" "strings"


"github.com/mitchellh/mapstructure"
"gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/utils/math"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -128,7 +132,119 @@ func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) {
} }
strCont := string(cont) strCont := string(cont)


return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math.Min(len(strCont), 200)])
return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math2.Min(len(strCont), 200)])
}

type MultiPartFile struct {
FieldName string
FileName string
File io.ReadCloser
Header textproto.MIMEHeader
}

type multiPartFileIterator struct {
mr *multipart.Reader
firstFile *multipart.Part
}

func (m *multiPartFileIterator) MoveNext() (*MultiPartFile, error) {
if m.firstFile != nil {
f := m.firstFile
m.firstFile = nil

fileName, err := ul.PathUnescape(f.FileName())
if err != nil {
return nil, fmt.Errorf("unescape file name: %w", err)
}

return &MultiPartFile{
FieldName: f.FormName(),
FileName: fileName,
File: f,
Header: f.Header,
}, nil
}

for {
part, err := m.mr.NextPart()
if err == io.EOF {
return nil, iterator.ErrNoMoreItem
}
if err != nil {
return nil, err
}

fileName, err := ul.PathUnescape(part.FileName())
if err != nil {
return nil, fmt.Errorf("unescape file name: %w", err)
}

if part.FileName() != "" {
return &MultiPartFile{
FieldName: part.FormName(),
FileName: fileName,
File: part,
Header: part.Header,
}, nil
}
}
}

func (m *multiPartFileIterator) Close() {}

// 解析multipart/form-data响应,只支持form参数在头部的情况
func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.Iterator[*MultiPartFile], error) {
mtype, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return nil, nil, fmt.Errorf("parse content type: %w", err)
}

if mtype != ContentTypeMultiPart {
return nil, nil, fmt.Errorf("unknow content type: %s", mtype)
}

boundary := params["boundary"]
if boundary == "" {
return nil, nil, fmt.Errorf("no boundary in content type: %s", mtype)
}

formValues := make(map[string]string)
rd := multipart.NewReader(resp.Body, boundary)

var firstFile *multipart.Part
for {
part, err := rd.NextPart()
if err == io.EOF {
return formValues, iterator.Empty[*MultiPartFile](), nil
}
if err != nil {
return nil, nil, err
}

formName := part.FormName()
fileName := part.FileName()

if formName == "" {
continue
}

if fileName != "" {
firstFile = part
break
}

data, err := io.ReadAll(part)
if err != nil {
return nil, nil, err
}

formValues[formName] = string(data)
}

return formValues, &multiPartFileIterator{
mr: rd,
firstFile: firstFile,
}, nil
} }


type MultiPartRequestParam struct { type MultiPartRequestParam struct {
@@ -171,13 +287,13 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err
defer muWriter.Close() defer muWriter.Close()


if param.Form != nil { if param.Form != nil {
mp, err := serder.ObjectToMap(param.Form)
mp, err := objectToStringMap(param.Form)
if err != nil { if err != nil {
return fmt.Errorf("formValues object to map failed, err: %w", err) return fmt.Errorf("formValues object to map failed, err: %w", err)
} }


for k, v := range mp { for k, v := range mp {
err := muWriter.WriteField(k, fmt.Sprintf("%v", v))
err := muWriter.WriteField(k, v)
if err != nil { if err != nil {
return fmt.Errorf("write form field failed, err: %w", err) return fmt.Errorf("write form field failed, err: %w", err)
} }
@@ -196,7 +312,7 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err
err = func() error { err = func() error {
defer file.File.Close() defer file.File.Close()


w, err := muWriter.CreateFormFile(file.FieldName, file.FileName)
w, err := muWriter.CreateFormFile(file.FieldName, ul.PathEscape(file.FileName))
if err != nil { if err != nil {
return fmt.Errorf("create form file failed, err: %w", err) return fmt.Errorf("create form file failed, err: %w", err)
} }
@@ -237,17 +353,17 @@ func prepareQuery(req *http.Request, query any) error {
return nil return nil
} }


mp, ok := query.(map[string]any)
mp, ok := query.(map[string]string)
if !ok { if !ok {
var err error var err error
if mp, err = serder.ObjectToMap(query); err != nil {
if mp, err = objectToStringMap(query); err != nil {
return fmt.Errorf("query object to map: %w", err) return fmt.Errorf("query object to map: %w", err)
} }
} }


values := make(ul.Values) values := make(ul.Values)
for k, v := range mp { for k, v := range mp {
values.Add(k, fmt.Sprintf("%v", v))
values.Add(k, v)
} }


req.URL.RawQuery = values.Encode() req.URL.RawQuery = values.Encode()
@@ -259,17 +375,17 @@ func prepareHeader(req *http.Request, header any) error {
return nil return nil
} }


mp, ok := header.(map[string]any)
mp, ok := header.(map[string]string)
if !ok { if !ok {
var err error var err error
if mp, err = serder.ObjectToMap(header); err != nil {
if mp, err = objectToStringMap(header); err != nil {
return fmt.Errorf("header object to map: %w", err) return fmt.Errorf("header object to map: %w", err)
} }
} }


req.Header = make(http.Header) req.Header = make(http.Header)
for k, v := range mp { for k, v := range mp {
req.Header.Set(k, fmt.Sprintf("%v", v))
req.Header.Set(k, v)
} }
return nil return nil
} }
@@ -298,17 +414,17 @@ func prepareFormBody(req *http.Request, body any) error {
return nil return nil
} }


mp, ok := body.(map[string]any)
mp, ok := body.(map[string]string)
if !ok { if !ok {
var err error var err error
if mp, err = serder.ObjectToMap(body); err != nil {
if mp, err = objectToStringMap(body); err != nil {
return fmt.Errorf("body object to map: %w", err) return fmt.Errorf("body object to map: %w", err)
} }
} }


values := make(ul.Values) values := make(ul.Values)
for k, v := range mp { for k, v := range mp {
values.Add(k, fmt.Sprintf("%v", v))
values.Add(k, v)
} }


data := values.Encode() data := values.Encode()
@@ -334,3 +450,40 @@ func setValue(values ul.Values, key, value string) ul.Values {
values.Add(key, value) values.Add(key, value)
return values return values
} }

func objectToStringMap(obj any) (map[string]string, error) {
anyMap := make(map[string]any)
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
TagName: "json",
Result: &anyMap,
WeaklyTypedInput: true,
})
if err != nil {
return nil, err
}

err = dec.Decode(obj)
if err != nil {
return nil, err
}

ret := make(map[string]string)
for k, v := range anyMap {
val := reflect.ValueOf(v)
for val.Kind() == reflect.Ptr {
if val.IsNil() {
break
} else {
val = val.Elem()
}
}

if val.Kind() == reflect.Pointer {
ret[k] = ""
} else {
ret[k] = fmt.Sprintf("%v", val)
}
}

return ret, nil
}

+ 32
- 0
utils/http/http_test.go View File

@@ -0,0 +1,32 @@
package http

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func Test_objectToStringMap(t *testing.T) {
Convey("包含指针", t, func() {
type A struct {
Val *int `json:"Val,omitempty"`
Nil *int `json:"Nil,omitempty"`
Omit *int `json:"Omit"`
}

v := 10
a := A{
Val: &v,
Nil: nil,
Omit: nil,
}

mp, err := objectToStringMap(a)
So(err, ShouldBeNil)

So(mp, ShouldResemble, map[string]string{
"Val": "10",
"Omit": "",
})
})
}

utils/io/binary.go → utils/io2/binary.go View File

@@ -1,4 +1,4 @@
package io
package io2


import ( import (
"bufio" "bufio"

utils/io/chunked_split.go → utils/io2/chunked_split.go View File

@@ -1,4 +1,4 @@
package io
package io2


import ( import (
"fmt" "fmt"

utils/io/chunked_split_test.go → utils/io2/chunked_split_test.go View File

@@ -1,4 +1,4 @@
package io
package io2


import ( import (
"bytes" "bytes"

utils/io/clone.go → utils/io2/clone.go View File

@@ -1,4 +1,4 @@
package io
package io2


import ( import (
"io" "io"

utils/io/io.go → utils/io2/io.go View File

@@ -1,4 +1,4 @@
package io
package io2


import "io" import "io"



utils/io/io_test.go → utils/io2/io_test.go View File

@@ -1,4 +1,4 @@
package io
package io2


import ( import (
"bytes" "bytes"

utils/io/join.go → utils/io2/join.go View File

@@ -1,10 +1,10 @@
package io
package io2


import ( import (
"io" "io"


"gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/math"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
) )


func Join(strs []io.Reader) io.ReadCloser { func Join(strs []io.Reader) io.ReadCloser {
@@ -69,7 +69,7 @@ func (s *chunkedJoin) Read(buf []byte) (int, error) {
return 0, io.EOF return 0, io.EOF
} }


bufLen := math.Min(math.Min(s.chunkSize, len(buf)), s.chunkSize-s.currentRead)
bufLen := math2.Min(math2.Min(s.chunkSize, len(buf)), s.chunkSize-s.currentRead)
rd, err := s.inputs[s.currentInput].Read(buf[:bufLen]) rd, err := s.inputs[s.currentInput].Read(buf[:bufLen])
if err == nil { if err == nil {
s.currentRead += rd s.currentRead += rd
@@ -81,7 +81,7 @@ func (s *chunkedJoin) Read(buf []byte) (int, error) {
} }


if err == io.EOF { if err == io.EOF {
s.inputs = lo.RemoveAt(s.inputs, s.currentInput)
s.inputs = lo2.RemoveAt(s.inputs, s.currentInput)
// 此处不需要+1 // 此处不需要+1
if len(s.inputs) > 0 { if len(s.inputs) > 0 {
s.currentInput = s.currentInput % len(s.inputs) s.currentInput = s.currentInput % len(s.inputs)

utils/io/length.go → utils/io2/length.go View File

@@ -1,9 +1,9 @@
package io
package io2


import ( import (
"io" "io"


"gitlink.org.cn/cloudream/common/utils/math"
"gitlink.org.cn/cloudream/common/utils/math2"
) )


type lengthStream struct { type lengthStream struct {
@@ -19,7 +19,7 @@ func (s *lengthStream) Read(buf []byte) (int, error) {
return 0, s.err return 0, s.err
} }


bufLen := math.Min(s.length-s.readLength, int64(len(buf)))
bufLen := math2.Min(s.length-s.readLength, int64(len(buf)))
rd, err := s.src.Read(buf[:bufLen]) rd, err := s.src.Read(buf[:bufLen])
if err == nil { if err == nil {
s.readLength += int64(rd) s.readLength += int64(rd)

utils/io/zero.go → utils/io2/zero.go View File

@@ -1,4 +1,4 @@
package io
package io2


import "io" import "io"



utils/lo/lo.go → utils/lo2/lo.go View File

@@ -1,4 +1,4 @@
package lo
package lo2


import "github.com/samber/lo" import "github.com/samber/lo"



utils/lo/lo_test.go → utils/lo2/lo_test.go View File

@@ -1,4 +1,4 @@
package lo
package lo2


import ( import (
"testing" "testing"

utils/math/math.go → utils/math2/math.go View File

@@ -1,4 +1,4 @@
package math
package math2


import "golang.org/x/exp/constraints" import "golang.org/x/exp/constraints"



utils/reflect/reflect.go → utils/reflect2/reflect.go View File

@@ -1,4 +1,4 @@
package reflect
package reflect2


import "reflect" import "reflect"



+ 7
- 7
utils/serder/any_to_any.go View File

@@ -4,7 +4,7 @@ import (
"reflect" "reflect"


mp "github.com/mitchellh/mapstructure" mp "github.com/mitchellh/mapstructure"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type Converter func(from reflect.Value, to reflect.Value) (interface{}, error) type Converter func(from reflect.Value, to reflect.Value) (interface{}, error)
@@ -68,11 +68,11 @@ func AnyToAny(src any, dst any, opts ...AnyToAnyOption) error {


// fromAny 如果目的字段实现的FromAny接口,那么通过此接口实现字段类型转换 // fromAny 如果目的字段实现的FromAny接口,那么通过此接口实现字段类型转换
func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) { func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) {
if myreflect.TypeOfValue(data) == targetType {
if reflect2.TypeOfValue(data) == targetType {
return data, nil return data, nil
} }


if targetType.Implements(myreflect.TypeOf[FromAny]()) {
if targetType.Implements(reflect2.TypeOf[FromAny]()) {
// 非pointer receiver的FromAny没有意义,因为修改不了receiver的内容,所以这里只支持指针类型 // 非pointer receiver的FromAny没有意义,因为修改不了receiver的内容,所以这里只支持指针类型
if targetType.Kind() == reflect.Pointer { if targetType.Kind() == reflect.Pointer {
val := reflect.New(targetType.Elem()) val := reflect.New(targetType.Elem())
@@ -88,7 +88,7 @@ func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (i
return val.Interface(), nil return val.Interface(), nil
} }


} else if reflect.PointerTo(targetType).Implements(myreflect.TypeOf[FromAny]()) {
} else if reflect.PointerTo(targetType).Implements(reflect2.TypeOf[FromAny]()) {
val := reflect.New(targetType) val := reflect.New(targetType)
anyIf := val.Interface().(FromAny) anyIf := val.Interface().(FromAny)
ok, err := anyIf.FromAny(data) ok, err := anyIf.FromAny(data)
@@ -107,12 +107,12 @@ func fromAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (i


// 如果源字段实现了ToAny接口,那么通过此接口实现字段类型转换 // 如果源字段实现了ToAny接口,那么通过此接口实现字段类型转换
func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) { func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (interface{}, error) {
dataType := myreflect.TypeOfValue(data)
dataType := reflect2.TypeOfValue(data)
if dataType == targetType { if dataType == targetType {
return data, nil return data, nil
} }


if dataType.Implements(myreflect.TypeOf[ToAny]()) {
if dataType.Implements(reflect2.TypeOf[ToAny]()) {
anyIf := data.(ToAny) anyIf := data.(ToAny)
dstVal, ok, err := anyIf.ToAny(targetType) dstVal, ok, err := anyIf.ToAny(targetType)
if err != nil { if err != nil {
@@ -123,7 +123,7 @@ func toAny(srcType reflect.Type, targetType reflect.Type, data interface{}) (int
} }


return dstVal, nil return dstVal, nil
} else if reflect.PointerTo(dataType).Implements(myreflect.TypeOf[ToAny]()) {
} else if reflect.PointerTo(dataType).Implements(reflect2.TypeOf[ToAny]()) {
dataVal := reflect.ValueOf(data) dataVal := reflect.ValueOf(data)


dataPtrVal := reflect.New(dataType) dataPtrVal := reflect.New(dataType)


+ 20
- 73
utils/serder/serder.go View File

@@ -6,9 +6,9 @@ import (
"fmt" "fmt"
"io" "io"
"reflect" "reflect"
"strings"


jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/mitchellh/mapstructure"
) )


var unionHandler = UnionHandler{ var unionHandler = UnionHandler{
@@ -53,6 +53,18 @@ func JSONToObjectEx[T any](data []byte) (T, error) {
return ret, nil return ret, nil
} }


// 将JSON字符串转为对象。支持TypeUnion。
func JSONToObjectStreamEx[T any](stream io.Reader) (T, error) {
var ret T
dec := defaultAPI.NewDecoder(stream)
err := dec.Decode(&ret)
if err != nil {
return ret, err
}

return ret, nil
}

// 将对象转为JSON字符串。如果需要支持解析TypeUnion类型,则使用"Ex"结尾的同名函数。 // 将对象转为JSON字符串。如果需要支持解析TypeUnion类型,则使用"Ex"结尾的同名函数。
func ObjectToJSON(obj any) ([]byte, error) { func ObjectToJSON(obj any) ([]byte, error) {
return json.Marshal(obj) return json.Marshal(obj)
@@ -151,78 +163,13 @@ func MapToObject(m map[string]any, obj any, opt ...MapToObjectOption) error {
} }


func ObjectToMap(obj any) (map[string]any, error) { func ObjectToMap(obj any) (map[string]any, error) {
ctx := WalkValue(obj, func(ctx *WalkContext, event WalkEvent) WalkingOp {
switch e := event.(type) {
case StructBeginEvent:
mp := make(map[string]any)
ctx.StackPush(mp)

case StructArriveFieldEvent:
if !WillWalkInto(e.Value) {
ctx.StackPush(e.Value.Interface())
}
case StructLeaveFieldEvent:
val := ctx.StackPop()
mp := ctx.StackPeek().(map[string]any)
jsonTag := e.Info.Tag.Get("json")
if jsonTag == "-" {
break
}

opts := strings.Split(jsonTag, ",")
keyName := opts[0]
if keyName == "" {
keyName = e.Info.Name
}

if contains(opts, "string", 1) {
val = fmt.Sprintf("%v", val)
}

mp[keyName] = val

case StructEndEvent:

case MapBeginEvent:
ctx.StackPush(make(map[string]any))
case MapArriveEntryEvent:
if !WillWalkInto(e.Value) {
ctx.StackPush(e.Value.Interface())
}
case MapLeaveEntryEvent:
val := ctx.StackPop()
mp := ctx.StackPeek().(map[string]any)
mp[fmt.Sprintf("%v", e.Key)] = val
case MapEndEvent:

case ArrayBeginEvent:
ctx.StackPush(make([]any, e.Value.Len()))
case ArrayArriveElementEvent:
if !WillWalkInto(e.Value) {
ctx.StackPush(e.Value.Interface())
}
case ArrayLeaveElementEvent:
val := ctx.StackPop()
arr := ctx.StackPeek().([]any)
arr[e.Index] = val
case ArrayEndEvent:
}

return Next

}, WalkOption{
StackValues: []any{make(map[string]any)},
mp := make(map[string]any)
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
TagName: "json",
Result: &mp,
}) })

return ctx.StackPop().(map[string]any), nil
}

func contains(arr []string, ele string, startIndex int) bool {
for i := startIndex; i < len(arr); i++ {
if arr[i] == ele {
return true
}
if err != nil {
return nil, err
} }

return false
return mp, dec.Decode(obj)
} }

+ 5
- 5
utils/serder/serder_test.go View File

@@ -7,7 +7,7 @@ import (


. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type FromAnyString struct { type FromAnyString struct {
@@ -28,7 +28,7 @@ type ToAnyString struct {
} }


func (a *ToAnyString) ToAny(typ reflect.Type) (val any, ok bool, err error) { func (a *ToAnyString) ToAny(typ reflect.Type) (val any, ok bool, err error) {
if typ == myreflect.TypeOf[map[string]any]() {
if typ == reflect2.TypeOf[map[string]any]() {
return map[string]any{ return map[string]any{
"str": "@" + a.Str, "str": "@" + a.Str,
}, true, nil }, true, nil
@@ -55,7 +55,7 @@ type ToAnySt struct {
} }


func (a *ToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) { func (a *ToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) {
if typ == myreflect.TypeOf[FromAnySt]() {
if typ == reflect2.TypeOf[FromAnySt]() {
return FromAnySt{ return FromAnySt{
Value: "To:" + a.Value, Value: "To:" + a.Value,
}, true, nil }, true, nil
@@ -69,7 +69,7 @@ type DirToAnySt struct {
} }


func (a DirToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) { func (a DirToAnySt) ToAny(typ reflect.Type) (val any, ok bool, err error) {
if typ == myreflect.TypeOf[FromAnySt]() {
if typ == reflect2.TypeOf[FromAnySt]() {
return FromAnySt{ return FromAnySt{
Value: "DirTo:" + a.Value, Value: "DirTo:" + a.Value,
}, true, nil }, true, nil
@@ -181,7 +181,7 @@ func Test_AnyToAny(t *testing.T) {


err := AnyToAny(st1, &st2, AnyToAnyOption{ err := AnyToAny(st1, &st2, AnyToAnyOption{
Converters: []Converter{func(from reflect.Value, to reflect.Value) (interface{}, error) { Converters: []Converter{func(from reflect.Value, to reflect.Value) (interface{}, error) {
if from.Type() == myreflect.TypeOf[Struct1]() && to.Type() == myreflect.TypeOf[Struct2]() {
if from.Type() == reflect2.TypeOf[Struct1]() && to.Type() == reflect2.TypeOf[Struct2]() {
s1 := from.Interface().(Struct1) s1 := from.Interface().(Struct1)
return Struct2{ return Struct2{
Value: "@" + s1.Value, Value: "@" + s1.Value,


+ 4
- 4
utils/serder/union_handler.go View File

@@ -9,7 +9,7 @@ import (
"github.com/modern-go/reflect2" "github.com/modern-go/reflect2"
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"


myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
ref2 "gitlink.org.cn/cloudream/common/utils/reflect2"
) )


type anyTypeUnionExternallyTagged struct { type anyTypeUnionExternallyTagged struct {
@@ -106,14 +106,14 @@ func (u *TypeUnionInternallyTagged[T]) Add(typ reflect.Type) error {
} }


// 要求内嵌Metadata结构体,那么结构体中的字段名就会是Metadata, // 要求内嵌Metadata结构体,那么结构体中的字段名就会是Metadata,
field, ok := structType.FieldByName(myreflect.TypeNameOf[Metadata]())
field, ok := structType.FieldByName(ref2.TypeNameOf[Metadata]())
if !ok { if !ok {
u.TagToType[makeDerefFullTypeName(structType)] = typ u.TagToType[makeDerefFullTypeName(structType)] = typ
return nil return nil
} }


// 为防同名,检查类型是不是也是Metadata // 为防同名,检查类型是不是也是Metadata
if field.Type != myreflect.TypeOf[Metadata]() {
if field.Type != ref2.TypeOf[Metadata]() {
u.TagToType[makeDerefFullTypeName(structType)] = typ u.TagToType[makeDerefFullTypeName(structType)] = typ
return nil return nil
} }
@@ -293,7 +293,7 @@ func (e *ExternallyTaggedEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.St
} }


stream.WriteObjectStart() stream.WriteObjectStart()
valType := myreflect.TypeOfValue(val)
valType := ref2.TypeOfValue(val)
if !e.union.Union.Include(valType) { if !e.union.Union.Include(valType) {
stream.Error = fmt.Errorf("type %v is not in union %v", valType, e.union.Union.UnionType) stream.Error = fmt.Errorf("type %v is not in union %v", valType, e.union.Union.UnionType)
return return


+ 3
- 3
utils/serder/walk_test.go View File

@@ -6,7 +6,7 @@ import (
"testing" "testing"


. "github.com/smartystreets/goconvey/convey" . "github.com/smartystreets/goconvey/convey"
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
"gitlink.org.cn/cloudream/common/utils/reflect2"
) )


func Test_WalkValue(t *testing.T) { func Test_WalkValue(t *testing.T) {
@@ -38,8 +38,8 @@ func Test_WalkValue(t *testing.T) {


isBaseDataType := func(val reflect.Value) bool { isBaseDataType := func(val reflect.Value) bool {
typ := val.Type() typ := val.Type()
return typ == myreflect.TypeOf[int]() || typ == myreflect.TypeOf[bool]() ||
typ == myreflect.TypeOf[string]() || typ == myreflect.TypeOf[float32]() || val.IsZero()
return typ == reflect2.TypeOf[int]() || typ == reflect2.TypeOf[bool]() ||
typ == reflect2.TypeOf[string]() || typ == reflect2.TypeOf[float32]() || val.IsZero()
} }


toString := func(val any) string { toString := func(val any) string {


utils/sort/sort.go → utils/sort2/sort.go View File

@@ -1,4 +1,4 @@
package sort
package sort2


import ( import (
"sort" "sort"
@@ -36,6 +36,14 @@ func Sort[T any](arr []T, cmp Comparer[T]) []T {
return arr return arr
} }


func SortAsc[T constraints.Ordered](arr []T) []T {
return Sort(arr, Cmp[T])
}

func SortDesc[T constraints.Ordered](arr []T) []T {
return Sort(arr, func(left, right T) int { return Cmp(right, left) })
}

// false < true // false < true
func CmpBool(left, right bool) int { func CmpBool(left, right bool) int {
leftVal := 0 leftVal := 0

+ 80
- 0
utils/sync2/channel.go View File

@@ -0,0 +1,80 @@
package sync2

import (
"context"
"errors"
"sync"
)

var ErrChannelClosed = errors.New("channel is closed")

type Channel[T any] struct {
ch chan T
closed chan any
closeOnce sync.Once
err error
}

func NewChannel[T any]() *Channel[T] {
return &Channel[T]{
ch: make(chan T),
closed: make(chan any),
}
}

func (c *Channel[T]) Error() error {
return c.err
}

func (c *Channel[T]) Send(val T) error {
select {
case c.ch <- val:
return nil
case <-c.closed:
return c.err
}
}

func (c *Channel[T]) Receive(ctx context.Context) (T, error) {
select {
case val := <-c.ch:
return val, nil
case <-c.closed:
var t T
return t, c.err
case <-ctx.Done():
var t T
return t, ctx.Err()
}
}

// 获取channel的发送端,需要与Closed一起使用,防止错过关闭信号
func (c *Channel[T]) Sender() chan<- T {
return c.ch
}

// 获取channel的接收端,需要与Closed一起使用,防止错过关闭信号
func (c *Channel[T]) Receiver() <-chan T {
return c.ch
}

// 获取channel的关闭信号,用于通知接收端和发送端关闭
func (c *Channel[T]) Closed() <-chan any {
return c.closed
}

// 关闭channel。注:此操作不会关闭Sender和Receiver返回的channel
func (c *Channel[T]) Close() {
c.closeOnce.Do(func() {
close(c.closed)
c.err = ErrChannelClosed
})
}

// 关闭channel并设置error。注:此操作不会关闭Sender和Receiver返回的channel
func (c *Channel[T]) CloseWithError(err error) {
c.closeOnce.Do(func() {
close(c.closed)
c.err = err
})
}

utils/sync/counter_cond.go → utils/sync2/counter_cond.go View File

@@ -1,4 +1,4 @@
package sync
package sync2


import "sync" import "sync"



utils/sync/safe_channel.go → utils/sync2/safe_channel.go View File

@@ -1,4 +1,4 @@
package sync
package sync2


import "context" import "context"



+ 48
- 0
utils/sync2/select_set.go View File

@@ -0,0 +1,48 @@
package sync2

import (
"reflect"

"gitlink.org.cn/cloudream/common/utils/lo2"
)

type SelectCase int

type SelectSet[T any, C any] struct {
cases []reflect.SelectCase
tags []T
}

func (s *SelectSet[T, C]) Add(tag T, ch <-chan C) SelectCase {
s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
s.tags = append(s.tags, tag)

return SelectCase(len(s.cases) - 1)
}

func (s *SelectSet[T, C]) AddDefault(tag T, ch <-chan C) SelectCase {
s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault, Chan: reflect.ValueOf(ch)})
s.tags = append(s.tags, tag)

return SelectCase(len(s.cases) - 1)
}

func (s *SelectSet[T, C]) Remove(caze SelectCase) {
s.cases = lo2.RemoveAt(s.cases, int(caze))
s.tags = lo2.RemoveAt(s.tags, int(caze))
}

func (s *SelectSet[T, C]) Select() (T, C, bool) {
chosen, recv, ok := reflect.Select(s.cases)
if !ok {
var t T
var c C
return t, c, false
}

return s.tags[chosen], recv.Interface().(C), true
}

func (s *SelectSet[T, C]) Count() int {
return len(s.cases)
}

+ 20
- 0
utils/sync2/sync2.go View File

@@ -0,0 +1,20 @@
package sync2

import "sync"

func ParallelDo[T any](args []T, fn func(val T, index int) error) error {
var err error
var wg sync.WaitGroup
wg.Add(len(args))
for i, arg := range args {
go func(arg T, index int) {
defer wg.Done()

if e := fn(arg, index); e != nil {
err = e
}
}(arg, i)
}
wg.Wait()
return err
}

+ 113
- 0
utils/time2/measurement.go View File

@@ -0,0 +1,113 @@
package time2

import (
"fmt"
"path"
"runtime"
"strings"
"time"
)

type Measurement struct {
startTime time.Time
lastPointTime time.Time
printer func(string)
on bool
title string
}

func NewMeasurement(printer func(string)) Measurement {
return Measurement{
printer: printer,
}
}

func (m *Measurement) Begin(on bool, title ...string) {
if m == nil {
return
}

m.on = on
m.title = strings.Join(title, ".")

if on {
m.startTime = time.Now()
m.lastPointTime = m.startTime

_, file, line, ok := runtime.Caller(1)

titlePart := ""
if m.title != "" {
titlePart = fmt.Sprintf(":%s", m.title)
}

if ok {
m.printer(fmt.Sprintf("[BEGIN%v]<%v:%v>", titlePart, path.Base(file), line))
} else {
m.printer(fmt.Sprintf("[BEGIN%v]<UNKNOWN>", titlePart))
}
}
}

func (m *Measurement) Point(desc ...string) {
if m == nil {
return
}

if m.on {
m.printer(m.makePointString(strings.Join(desc, ".")))
}
}

func (m *Measurement) makePointString(desc string) string {
last := m.lastPointTime
now := time.Now()
m.lastPointTime = now

_, file, line, ok := runtime.Caller(2)

titlePart := ""
if m.title != "" {
titlePart = fmt.Sprintf("(%s)", m.title)
}

if desc != "" {
desc = fmt.Sprintf("@%s", desc)
}

if ok {
return fmt.Sprintf("%v {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line)
}

return fmt.Sprintf("{%v/%v}%v<UNKNOWN>", now.Sub(last), now.Sub(m.startTime), desc)
}

func (m *Measurement) End(descs ...string) {
if m == nil {
return
}

if m.on {
last := m.lastPointTime
now := time.Now()
m.lastPointTime = now

_, file, line, ok := runtime.Caller(1)

titlePart := ""
if m.title != "" {
titlePart = fmt.Sprintf(":%s", m.title)
}

desc := strings.Join(descs, ".")
if desc != "" {
desc = fmt.Sprintf("@%s", desc)
}

if ok {
m.printer(fmt.Sprintf("[END%v] {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line))
} else {
m.printer(fmt.Sprintf("[END%v] {%v/%v} %v<UNKNOWN>", titlePart, now.Sub(last), now.Sub(m.startTime), desc))
}
}
}

Loading…
Cancel
Save