Browse Source

增加存储系统的API客户端

pull/1/head
Sydonian 2 years ago
parent
commit
a991b08cd6
9 changed files with 694 additions and 0 deletions
  1. +12
    -0
      api/api.go
  2. +26
    -0
      api/storage/client.go
  3. +140
    -0
      api/storage/object.go
  4. +47
    -0
      api/storage/storage.go
  5. +89
    -0
      api/storage/storage_test.go
  6. +8
    -0
      models/models.go
  7. +63
    -0
      pkg/mq/message_dispatcher.go
  8. +282
    -0
      utils/http/http.go
  9. +27
    -0
      utils/serder/serder.go

+ 12
- 0
api/api.go View File

@@ -0,0 +1,12 @@
package api

import "fmt"

type CodeMessageError struct {
Code string
Message string
}

func (e *CodeMessageError) Error() string {
return fmt.Sprintf("code: %s, message: %s", e.Code, e.Message)
}

+ 26
- 0
api/storage/client.go View File

@@ -0,0 +1,26 @@
package storage

import "gitlink.org.cn/cloudream/common/api"

type response[T any] struct {
Code string `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}

func (r *response[T]) ToError() *api.CodeMessageError {
return &api.CodeMessageError{
Code: r.Code,
Message: r.Message,
}
}

type Client struct {
baseURL string
}

func NewClient(baseURL string) Client {
return Client{
baseURL: baseURL,
}
}

+ 140
- 0
api/storage/object.go View File

@@ -0,0 +1,140 @@
package storage

import (
"fmt"
"io"
"net/url"
"strings"

"gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type ObjectDownloadReq struct {
UserID int64 `json:"userID"`
ObjectID int64 `json:"objectID"`
}

func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) {
url, err := url.JoinPath(c.baseURL, "/object/download")
if err != nil {
return nil, err
}

resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")

if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

return nil, codeResp.ToError()
}

if strings.Contains(contType, myhttp.ContentTypeOctetStream) {
return resp.Body, nil
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type ObjectUploadReq struct {
UserID int64 `json:"userID"`
BucketID int64 `json:"bucketID"`
FileSize int64 `json:"fileSize"`
ObjectName string `json:"objectName"`
Redundancy RedundancyConfig `json:"redundancy"`
File io.Reader `json:"-"`
}

type RedundancyConfig struct {
Type string `json:"type"`
Config any `json:"config"`
}

type ObjectUploadResp struct {
ObjectID int64 `json:"objectID,string"`
}

func (c *Client) ObjectUpload(req ObjectUploadReq) (*ObjectUploadResp, error) {
url, err := url.JoinPath(c.baseURL, "/object/upload")
if err != nil {
return nil, err
}

infoJSON, err := serder.ObjectToJSON(req)
if err != nil {
return nil, fmt.Errorf("object info to json: %w", err)
}

resp, err := myhttp.PostMultiPart(url, myhttp.MultiPartRequestParam{
Form: map[string]string{"info": string(infoJSON)},
DataName: req.ObjectName,
Data: req.File,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[ObjectUploadResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

return nil, fmt.Errorf("unknow response content type: %s", contType)

}

type ObjectDeleteReq struct {
UserID int64 `json:"userID"`
ObjectID int64 `json:"objectID"`
}

func (c *Client) ObjectDelete(req ObjectDeleteReq) error {
url, err := url.JoinPath(c.baseURL, "/object/delete")
if err != nil {
return err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return err
}

contType := resp.Header.Get("Content-Type")

if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return nil
}

return codeResp.ToError()
}

return fmt.Errorf("unknow response content type: %s", contType)
}

+ 47
- 0
api/storage/storage.go View File

@@ -0,0 +1,47 @@
package storage

import (
"fmt"
"net/url"
"strings"

"gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type StorageMoveObjectReq struct {
UserID int64 `json:"userID"`
ObjectID int64 `json:"objectID"`
StorageID int64 `json:"storageID"`
}

func (c *Client) StorageMoveObject(req StorageMoveObjectReq) error {
url, err := url.JoinPath(c.baseURL, "/storage/moveObject")
if err != nil {
return err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return nil
}

return codeResp.ToError()
}

return fmt.Errorf("unknow response content type: %s", contType)
}

+ 89
- 0
api/storage/storage_test.go View File

@@ -0,0 +1,89 @@
package storage

import (
"bytes"
"io"
"testing"

"github.com/google/uuid"
. "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/models"
)

func Test_Object(t *testing.T) {
Convey("上传,下载,删除", t, func() {
cli := NewClient("http://localhost:7890")

fileData := make([]byte, 4096)
for i := 0; i < len(fileData); i++ {
fileData[i] = byte(i)
}

upResp, err := cli.ObjectUpload(ObjectUploadReq{
UserID: 0,
BucketID: 1,
FileSize: 4096,
ObjectName: uuid.NewString(),
Redundancy: RedundancyConfig{
Type: models.RedundancyRep,
Config: models.NewRepRedundancyConfig(1),
},
File: bytes.NewBuffer(fileData),
})
So(err, ShouldBeNil)

downFs, err := cli.ObjectDownload(ObjectDownloadReq{
UserID: 0,
ObjectID: upResp.ObjectID,
})
So(err, ShouldBeNil)

downFileData, err := io.ReadAll(downFs)
So(err, ShouldBeNil)
So(downFileData, ShouldResemble, fileData)
downFs.Close()

err = cli.ObjectDelete(ObjectDeleteReq{
UserID: 0,
ObjectID: upResp.ObjectID,
})
So(err, ShouldBeNil)
})
}

func Test_Storage(t *testing.T) {
Convey("上传后调度文件", t, func() {
cli := NewClient("http://localhost:7890")

fileData := make([]byte, 4096)
for i := 0; i < len(fileData); i++ {
fileData[i] = byte(i)
}

upResp, err := cli.ObjectUpload(ObjectUploadReq{
UserID: 0,
BucketID: 1,
FileSize: 4096,
ObjectName: uuid.NewString(),
Redundancy: RedundancyConfig{
Type: models.RedundancyRep,
Config: models.NewRepRedundancyConfig(1),
},
File: bytes.NewBuffer(fileData),
})
So(err, ShouldBeNil)

err = cli.StorageMoveObject(StorageMoveObjectReq{
UserID: 0,
ObjectID: upResp.ObjectID,
StorageID: 1,
})
So(err, ShouldBeNil)

err = cli.ObjectDelete(ObjectDeleteReq{
UserID: 0,
ObjectID: upResp.ObjectID,
})
So(err, ShouldBeNil)
})
}

+ 8
- 0
models/models.go View File

@@ -7,6 +7,7 @@ const (
RedundancyEC = "ec" RedundancyEC = "ec"
) )


// 冗余模式的描述信息
type RedundancyConfigTypes interface{} type RedundancyConfigTypes interface{}
type RedundancyConfigTypesConst interface { type RedundancyConfigTypesConst interface {
RepRedundancyConfig | ECRedundancyConfig RepRedundancyConfig | ECRedundancyConfig
@@ -15,9 +16,16 @@ type RepRedundancyConfig struct {
RepCount int `json:"repCount"` RepCount int `json:"repCount"`
} }


func NewRepRedundancyConfig(repCount int) RepRedundancyConfig {
return RepRedundancyConfig{
RepCount: repCount,
}
}

type ECRedundancyConfig struct { type ECRedundancyConfig struct {
} }


// 冗余模式的具体配置
type RedundancyDataTypes interface{} type RedundancyDataTypes interface{}
type RedundancyDataTypesConst interface { type RedundancyDataTypesConst interface {
RepRedundancyData | ECRedundancyData RepRedundancyData | ECRedundancyData


+ 63
- 0
pkg/mq/message_dispatcher.go View File

@@ -0,0 +1,63 @@
package mq

import (
"fmt"

myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
)

type HandlerFn func(svcBase any, msg *Message) (*Message, error)

type MessageDispatcher struct {
Handlers map[myreflect.Type]HandlerFn
}

func NewMessageDispatcher() MessageDispatcher {
return MessageDispatcher{
Handlers: make(map[myreflect.Type]HandlerFn),
}
}

func (h *MessageDispatcher) Add(typ myreflect.Type, handler HandlerFn) {
h.Handlers[typ] = handler
}

func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) {
typ := myreflect.TypeOfValue(msg.Body)
fn, ok := h.Handlers[typ]
if !ok {
return nil, fmt.Errorf("unsupported message type: %s", typ.Name())
}

return fn(svcBase, msg)
}

// 将Service中的一个接口函数作为指定类型消息的处理函数
func AddServiceFn[TSvc any, TReq any, TResp any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq) (*TResp, *CodeMessage)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {

reqMsgBody := reqMsg.Body.(TReq)
ret, codeMsg := svcFn(svcBase.(TSvc), &reqMsgBody)

var body MessageBodyTypes
if ret != nil {
body = *ret
}

respMsg := MakeMessage(body)
respMsg.SetCodeMessage(codeMsg.Code, codeMsg.Message)

return &respMsg, nil
})
}

// 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数
func AddNoRespServiceFn[TSvc any, TReq any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq)) {
dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) {

reqMsgBody := reqMsg.Body.(TReq)
svcFn(svcBase.(TSvc), &reqMsgBody)

return nil, nil
})
}

+ 282
- 0
utils/http/http.go View File

@@ -0,0 +1,282 @@
package http

import (
"fmt"
"io"
"mime/multipart"
"net/http"
ul "net/url"
"strings"

"gitlink.org.cn/cloudream/common/utils/serder"
)

const (
ContentTypeJSON = "application/json"
ContentTypeForm = "application/x-www-form-urlencoded"
ContentTypeMultiPart = "multipart/form-data"
ContentTypeOctetStream = "application/octet-stream"
)

type RequestParam struct {
Header any
Query any
Body any
}

func GetJSON(url string, param RequestParam) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}

if err = prepareQuery(req, param.Query); err != nil {
return nil, err
}

if err = prepareHeader(req, param.Header); err != nil {
return nil, err
}

if err = prepareJSONBody(req, param.Body); err != nil {
return nil, err
}

return http.DefaultClient.Do(req)
}

func GetForm(url string, param RequestParam) (*http.Response, error) {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}

if err = prepareQuery(req, param.Query); err != nil {
return nil, err
}

if err = prepareHeader(req, param.Header); err != nil {
return nil, err
}

if err = prepareFormBody(req, param.Body); err != nil {
return nil, err
}

return http.DefaultClient.Do(req)
}

func PostJSON(url string, param RequestParam) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return nil, err
}

if err = prepareQuery(req, param.Query); err != nil {
return nil, err
}

if err = prepareHeader(req, param.Header); err != nil {
return nil, err
}

if err = prepareJSONBody(req, param.Body); err != nil {
return nil, err
}

return http.DefaultClient.Do(req)
}

func PostForm(url string, param RequestParam) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return nil, err
}

if err = prepareQuery(req, param.Query); err != nil {
return nil, err
}

if err = prepareHeader(req, param.Header); err != nil {
return nil, err
}

if err = prepareFormBody(req, param.Body); err != nil {
return nil, err
}

return http.DefaultClient.Do(req)
}

type MultiPartRequestParam struct {
Header any
Query any
Form any
DataName string
Data io.Reader
}

func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
return nil, err
}

if err = prepareQuery(req, param.Query); err != nil {
return nil, err
}

if err = prepareHeader(req, param.Header); err != nil {
return nil, err
}

pr, pw := io.Pipe()
muWriter := multipart.NewWriter(pw)

setHeader(req.Header, "Content-Type", fmt.Sprintf("%s;boundary=%s", ContentTypeMultiPart, muWriter.Boundary()))

writeResult := make(chan error, 1)
go func() {
writeResult <- func() error {
defer pw.Close()
defer muWriter.Close()

if param.Form != nil {
mp, err := serder.ObjectToMap(param.Form)
if err != nil {
return fmt.Errorf("formValues object to map failed, err: %w", err)
}

for k, v := range mp {
err := muWriter.WriteField(k, fmt.Sprintf("%v", v))
if err != nil {
return fmt.Errorf("write form field failed, err: %w", err)
}
}
}

if param.Data != nil {
w, err := muWriter.CreateFormFile("file", param.DataName)
if err != nil {
return fmt.Errorf("create form file failed, err: %w", err)
}

_, err = io.Copy(w, param.Data)
if err != nil {
return err
}
}
return nil
}()
}()

req.Body = pr

cli := http.Client{}
resp, err := cli.Do(req)
if err != nil {
return nil, err
}

writeErr := <-writeResult
if writeErr != nil {
return nil, writeErr
}

return resp, nil
}

func prepareQuery(req *http.Request, query any) error {
if query == nil {
return nil
}

mp, ok := query.(map[string]any)
if !ok {
var err error
if mp, err = serder.ObjectToMap(query); err != nil {
return fmt.Errorf("query object to map: %w", err)
}
}

values := make(ul.Values)
for k, v := range mp {
values.Add(k, fmt.Sprintf("%v", v))
}

req.URL.RawQuery = values.Encode()
return nil
}

func prepareHeader(req *http.Request, header any) error {
if header == nil {
return nil
}

mp, ok := header.(map[string]any)
if !ok {
var err error
if mp, err = serder.ObjectToMap(header); err != nil {
return fmt.Errorf("header object to map: %w", err)
}
}

req.Header = make(http.Header)
for k, v := range mp {
req.Header.Set(k, fmt.Sprintf("%v", v))
}
return nil
}

func prepareJSONBody(req *http.Request, body any) error {
setHeader(req.Header, "Content-Type", ContentTypeJSON)

if body == nil {
return nil
}

req.Body = serder.ObjectToJSONStream(body)
return nil
}

func prepareFormBody(req *http.Request, body any) error {
setHeader(req.Header, "Content-Type", ContentTypeForm)

if body == nil {
return nil
}

mp, ok := body.(map[string]any)
if !ok {
var err error
if mp, err = serder.ObjectToMap(body); err != nil {
return fmt.Errorf("body object to map: %w", err)
}
}

values := make(ul.Values)
for k, v := range mp {
values.Add(k, fmt.Sprintf("%v", v))
}

req.Body = io.NopCloser(strings.NewReader(values.Encode()))
return nil
}

func setHeader(mp http.Header, key, value string) http.Header {
if mp == nil {
mp = make(http.Header)
}

mp.Set(key, value)
return mp
}

func setValue(values ul.Values, key, value string) ul.Values {
if values == nil {
values = make(ul.Values)
}

values.Add(key, value)
return values
}

+ 27
- 0
utils/serder/serder.go View File

@@ -3,6 +3,7 @@ package serder
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"reflect" "reflect"
"strings" "strings"
) )
@@ -11,10 +12,36 @@ func ObjectToJSON(obj any) ([]byte, error) {
return json.Marshal(obj) return json.Marshal(obj)
} }


func ObjectToJSONStream(obj any) io.ReadCloser {
pr, pw := io.Pipe()
enc := json.NewEncoder(pw)

go func() {
err := enc.Encode(obj)
if err != nil && err != io.EOF {
pw.CloseWithError(err)
} else {
pw.Close()
}
}()

return pr
}

func JSONToObject(data []byte, obj any) error { func JSONToObject(data []byte, obj any) error {
return json.Unmarshal(data, obj) return json.Unmarshal(data, obj)
} }


func JSONToObjectStream(str io.Reader, obj any) error {
dec := json.NewDecoder(str)
err := dec.Decode(obj)
if err != io.EOF {
return err
}

return nil
}

type TypeResolver interface { type TypeResolver interface {
TypeToString(typ reflect.Type) (string, error) TypeToString(typ reflect.Type) (string, error)
StringToType(typeStr string) (reflect.Type, error) StringToType(typeStr string) (reflect.Type, error)


Loading…
Cancel
Save