From a991b08cd6a08160c4cfaadaec58dfc37ed93dad Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 11 Aug 2023 15:53:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AD=98=E5=82=A8=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E7=9A=84API=E5=AE=A2=E6=88=B7=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/api.go | 12 ++ api/storage/client.go | 26 ++++ api/storage/object.go | 140 +++++++++++++++++ api/storage/storage.go | 47 ++++++ api/storage/storage_test.go | 89 +++++++++++ models/models.go | 8 + pkg/mq/message_dispatcher.go | 63 ++++++++ utils/http/http.go | 282 +++++++++++++++++++++++++++++++++++ utils/serder/serder.go | 27 ++++ 9 files changed, 694 insertions(+) create mode 100644 api/api.go create mode 100644 api/storage/client.go create mode 100644 api/storage/object.go create mode 100644 api/storage/storage.go create mode 100644 api/storage/storage_test.go create mode 100644 pkg/mq/message_dispatcher.go create mode 100644 utils/http/http.go diff --git a/api/api.go b/api/api.go new file mode 100644 index 0000000..7c19a8e --- /dev/null +++ b/api/api.go @@ -0,0 +1,12 @@ +package api + +import "fmt" + +type CodeMessageError struct { + Code string + Message string +} + +func (e *CodeMessageError) Error() string { + return fmt.Sprintf("code: %s, message: %s", e.Code, e.Message) +} diff --git a/api/storage/client.go b/api/storage/client.go new file mode 100644 index 0000000..3af6114 --- /dev/null +++ b/api/storage/client.go @@ -0,0 +1,26 @@ +package storage + +import "gitlink.org.cn/cloudream/common/api" + +type response[T any] struct { + Code string `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +func (r *response[T]) ToError() *api.CodeMessageError { + return &api.CodeMessageError{ + Code: r.Code, + Message: r.Message, + } +} + +type Client struct { + baseURL string +} + +func NewClient(baseURL string) Client { + return Client{ + baseURL: baseURL, + } +} diff --git a/api/storage/object.go b/api/storage/object.go new file mode 100644 index 0000000..5ae8b84 --- /dev/null +++ b/api/storage/object.go @@ -0,0 +1,140 @@ +package storage + +import ( + "fmt" + "io" + "net/url" + "strings" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + myhttp "gitlink.org.cn/cloudream/common/utils/http" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type ObjectDownloadReq struct { + UserID int64 `json:"userID"` + ObjectID int64 `json:"objectID"` +} + +func (c *Client) ObjectDownload(req ObjectDownloadReq) (io.ReadCloser, error) { + url, err := url.JoinPath(c.baseURL, "/object/download") + if err != nil { + return nil, err + } + + resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[any] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + return nil, codeResp.ToError() + } + + if strings.Contains(contType, myhttp.ContentTypeOctetStream) { + return resp.Body, nil + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type ObjectUploadReq struct { + UserID int64 `json:"userID"` + BucketID int64 `json:"bucketID"` + FileSize int64 `json:"fileSize"` + ObjectName string `json:"objectName"` + Redundancy RedundancyConfig `json:"redundancy"` + File io.Reader `json:"-"` +} + +type RedundancyConfig struct { + Type string `json:"type"` + Config any `json:"config"` +} + +type ObjectUploadResp struct { + ObjectID int64 `json:"objectID,string"` +} + +func (c *Client) ObjectUpload(req ObjectUploadReq) (*ObjectUploadResp, error) { + url, err := url.JoinPath(c.baseURL, "/object/upload") + if err != nil { + return nil, err + } + + infoJSON, err := serder.ObjectToJSON(req) + if err != nil { + return nil, fmt.Errorf("object info to json: %w", err) + } + + resp, err := myhttp.PostMultiPart(url, myhttp.MultiPartRequestParam{ + Form: map[string]string{"info": string(infoJSON)}, + DataName: req.ObjectName, + Data: req.File, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[ObjectUploadResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) + +} + +type ObjectDeleteReq struct { + UserID int64 `json:"userID"` + ObjectID int64 `json:"objectID"` +} + +func (c *Client) ObjectDelete(req ObjectDeleteReq) error { + url, err := url.JoinPath(c.baseURL, "/object/delete") + if err != nil { + return err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return err + } + + contType := resp.Header.Get("Content-Type") + + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[any] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return nil + } + + return codeResp.ToError() + } + + return fmt.Errorf("unknow response content type: %s", contType) +} diff --git a/api/storage/storage.go b/api/storage/storage.go new file mode 100644 index 0000000..cdf17c5 --- /dev/null +++ b/api/storage/storage.go @@ -0,0 +1,47 @@ +package storage + +import ( + "fmt" + "net/url" + "strings" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + myhttp "gitlink.org.cn/cloudream/common/utils/http" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type StorageMoveObjectReq struct { + UserID int64 `json:"userID"` + ObjectID int64 `json:"objectID"` + StorageID int64 `json:"storageID"` +} + +func (c *Client) StorageMoveObject(req StorageMoveObjectReq) error { + url, err := url.JoinPath(c.baseURL, "/storage/moveObject") + if err != nil { + return err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[any] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return nil + } + + return codeResp.ToError() + } + + return fmt.Errorf("unknow response content type: %s", contType) +} diff --git a/api/storage/storage_test.go b/api/storage/storage_test.go new file mode 100644 index 0000000..005d62f --- /dev/null +++ b/api/storage/storage_test.go @@ -0,0 +1,89 @@ +package storage + +import ( + "bytes" + "io" + "testing" + + "github.com/google/uuid" + . "github.com/smartystreets/goconvey/convey" + "gitlink.org.cn/cloudream/common/models" +) + +func Test_Object(t *testing.T) { + Convey("上传,下载,删除", t, func() { + cli := NewClient("http://localhost:7890") + + fileData := make([]byte, 4096) + for i := 0; i < len(fileData); i++ { + fileData[i] = byte(i) + } + + upResp, err := cli.ObjectUpload(ObjectUploadReq{ + UserID: 0, + BucketID: 1, + FileSize: 4096, + ObjectName: uuid.NewString(), + Redundancy: RedundancyConfig{ + Type: models.RedundancyRep, + Config: models.NewRepRedundancyConfig(1), + }, + File: bytes.NewBuffer(fileData), + }) + So(err, ShouldBeNil) + + downFs, err := cli.ObjectDownload(ObjectDownloadReq{ + UserID: 0, + ObjectID: upResp.ObjectID, + }) + So(err, ShouldBeNil) + + downFileData, err := io.ReadAll(downFs) + So(err, ShouldBeNil) + So(downFileData, ShouldResemble, fileData) + downFs.Close() + + err = cli.ObjectDelete(ObjectDeleteReq{ + UserID: 0, + ObjectID: upResp.ObjectID, + }) + So(err, ShouldBeNil) + }) +} + +func Test_Storage(t *testing.T) { + Convey("上传后调度文件", t, func() { + cli := NewClient("http://localhost:7890") + + fileData := make([]byte, 4096) + for i := 0; i < len(fileData); i++ { + fileData[i] = byte(i) + } + + upResp, err := cli.ObjectUpload(ObjectUploadReq{ + UserID: 0, + BucketID: 1, + FileSize: 4096, + ObjectName: uuid.NewString(), + Redundancy: RedundancyConfig{ + Type: models.RedundancyRep, + Config: models.NewRepRedundancyConfig(1), + }, + File: bytes.NewBuffer(fileData), + }) + So(err, ShouldBeNil) + + err = cli.StorageMoveObject(StorageMoveObjectReq{ + UserID: 0, + ObjectID: upResp.ObjectID, + StorageID: 1, + }) + So(err, ShouldBeNil) + + err = cli.ObjectDelete(ObjectDeleteReq{ + UserID: 0, + ObjectID: upResp.ObjectID, + }) + So(err, ShouldBeNil) + }) +} diff --git a/models/models.go b/models/models.go index 10d8a61..f2ee3f0 100644 --- a/models/models.go +++ b/models/models.go @@ -7,6 +7,7 @@ const ( RedundancyEC = "ec" ) +// 冗余模式的描述信息 type RedundancyConfigTypes interface{} type RedundancyConfigTypesConst interface { RepRedundancyConfig | ECRedundancyConfig @@ -15,9 +16,16 @@ type RepRedundancyConfig struct { RepCount int `json:"repCount"` } +func NewRepRedundancyConfig(repCount int) RepRedundancyConfig { + return RepRedundancyConfig{ + RepCount: repCount, + } +} + type ECRedundancyConfig struct { } +// 冗余模式的具体配置 type RedundancyDataTypes interface{} type RedundancyDataTypesConst interface { RepRedundancyData | ECRedundancyData diff --git a/pkg/mq/message_dispatcher.go b/pkg/mq/message_dispatcher.go new file mode 100644 index 0000000..0a78c6d --- /dev/null +++ b/pkg/mq/message_dispatcher.go @@ -0,0 +1,63 @@ +package mq + +import ( + "fmt" + + myreflect "gitlink.org.cn/cloudream/common/utils/reflect" +) + +type HandlerFn func(svcBase any, msg *Message) (*Message, error) + +type MessageDispatcher struct { + Handlers map[myreflect.Type]HandlerFn +} + +func NewMessageDispatcher() MessageDispatcher { + return MessageDispatcher{ + Handlers: make(map[myreflect.Type]HandlerFn), + } +} + +func (h *MessageDispatcher) Add(typ myreflect.Type, handler HandlerFn) { + h.Handlers[typ] = handler +} + +func (h *MessageDispatcher) Handle(svcBase any, msg *Message) (*Message, error) { + typ := myreflect.TypeOfValue(msg.Body) + fn, ok := h.Handlers[typ] + if !ok { + return nil, fmt.Errorf("unsupported message type: %s", typ.Name()) + } + + return fn(svcBase, msg) +} + +// 将Service中的一个接口函数作为指定类型消息的处理函数 +func AddServiceFn[TSvc any, TReq any, TResp any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq) (*TResp, *CodeMessage)) { + dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { + + reqMsgBody := reqMsg.Body.(TReq) + ret, codeMsg := svcFn(svcBase.(TSvc), &reqMsgBody) + + var body MessageBodyTypes + if ret != nil { + body = *ret + } + + respMsg := MakeMessage(body) + respMsg.SetCodeMessage(codeMsg.Code, codeMsg.Message) + + return &respMsg, nil + }) +} + +// 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数 +func AddNoRespServiceFn[TSvc any, TReq any](dispatcher *MessageDispatcher, svcFn func(svc TSvc, msg *TReq)) { + dispatcher.Add(myreflect.TypeOf[TReq](), func(svcBase any, reqMsg *Message) (*Message, error) { + + reqMsgBody := reqMsg.Body.(TReq) + svcFn(svcBase.(TSvc), &reqMsgBody) + + return nil, nil + }) +} diff --git a/utils/http/http.go b/utils/http/http.go new file mode 100644 index 0000000..56b7666 --- /dev/null +++ b/utils/http/http.go @@ -0,0 +1,282 @@ +package http + +import ( + "fmt" + "io" + "mime/multipart" + "net/http" + ul "net/url" + "strings" + + "gitlink.org.cn/cloudream/common/utils/serder" +) + +const ( + ContentTypeJSON = "application/json" + ContentTypeForm = "application/x-www-form-urlencoded" + ContentTypeMultiPart = "multipart/form-data" + ContentTypeOctetStream = "application/octet-stream" +) + +type RequestParam struct { + Header any + Query any + Body any +} + +func GetJSON(url string, param RequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + if err = prepareJSONBody(req, param.Body); err != nil { + return nil, err + } + + return http.DefaultClient.Do(req) +} + +func GetForm(url string, param RequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + if err = prepareFormBody(req, param.Body); err != nil { + return nil, err + } + + return http.DefaultClient.Do(req) +} + +func PostJSON(url string, param RequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + if err = prepareJSONBody(req, param.Body); err != nil { + return nil, err + } + + return http.DefaultClient.Do(req) +} + +func PostForm(url string, param RequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + if err = prepareFormBody(req, param.Body); err != nil { + return nil, err + } + + return http.DefaultClient.Do(req) +} + +type MultiPartRequestParam struct { + Header any + Query any + Form any + DataName string + Data io.Reader +} + +func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + pr, pw := io.Pipe() + muWriter := multipart.NewWriter(pw) + + setHeader(req.Header, "Content-Type", fmt.Sprintf("%s;boundary=%s", ContentTypeMultiPart, muWriter.Boundary())) + + writeResult := make(chan error, 1) + go func() { + writeResult <- func() error { + defer pw.Close() + defer muWriter.Close() + + if param.Form != nil { + mp, err := serder.ObjectToMap(param.Form) + if err != nil { + return fmt.Errorf("formValues object to map failed, err: %w", err) + } + + for k, v := range mp { + err := muWriter.WriteField(k, fmt.Sprintf("%v", v)) + if err != nil { + return fmt.Errorf("write form field failed, err: %w", err) + } + } + } + + if param.Data != nil { + w, err := muWriter.CreateFormFile("file", param.DataName) + if err != nil { + return fmt.Errorf("create form file failed, err: %w", err) + } + + _, err = io.Copy(w, param.Data) + if err != nil { + return err + } + } + return nil + }() + }() + + req.Body = pr + + cli := http.Client{} + resp, err := cli.Do(req) + if err != nil { + return nil, err + } + + writeErr := <-writeResult + if writeErr != nil { + return nil, writeErr + } + + return resp, nil +} + +func prepareQuery(req *http.Request, query any) error { + if query == nil { + return nil + } + + mp, ok := query.(map[string]any) + if !ok { + var err error + if mp, err = serder.ObjectToMap(query); err != nil { + return fmt.Errorf("query object to map: %w", err) + } + } + + values := make(ul.Values) + for k, v := range mp { + values.Add(k, fmt.Sprintf("%v", v)) + } + + req.URL.RawQuery = values.Encode() + return nil +} + +func prepareHeader(req *http.Request, header any) error { + if header == nil { + return nil + } + + mp, ok := header.(map[string]any) + if !ok { + var err error + if mp, err = serder.ObjectToMap(header); err != nil { + return fmt.Errorf("header object to map: %w", err) + } + } + + req.Header = make(http.Header) + for k, v := range mp { + req.Header.Set(k, fmt.Sprintf("%v", v)) + } + return nil +} + +func prepareJSONBody(req *http.Request, body any) error { + setHeader(req.Header, "Content-Type", ContentTypeJSON) + + if body == nil { + return nil + } + + req.Body = serder.ObjectToJSONStream(body) + return nil +} + +func prepareFormBody(req *http.Request, body any) error { + setHeader(req.Header, "Content-Type", ContentTypeForm) + + if body == nil { + return nil + } + + mp, ok := body.(map[string]any) + if !ok { + var err error + if mp, err = serder.ObjectToMap(body); err != nil { + return fmt.Errorf("body object to map: %w", err) + } + } + + values := make(ul.Values) + for k, v := range mp { + values.Add(k, fmt.Sprintf("%v", v)) + } + + req.Body = io.NopCloser(strings.NewReader(values.Encode())) + return nil +} + +func setHeader(mp http.Header, key, value string) http.Header { + if mp == nil { + mp = make(http.Header) + } + + mp.Set(key, value) + return mp +} + +func setValue(values ul.Values, key, value string) ul.Values { + if values == nil { + values = make(ul.Values) + } + + values.Add(key, value) + return values +} diff --git a/utils/serder/serder.go b/utils/serder/serder.go index 7efab53..abf0fed 100644 --- a/utils/serder/serder.go +++ b/utils/serder/serder.go @@ -3,6 +3,7 @@ package serder import ( "encoding/json" "fmt" + "io" "reflect" "strings" ) @@ -11,10 +12,36 @@ func ObjectToJSON(obj any) ([]byte, error) { return json.Marshal(obj) } +func ObjectToJSONStream(obj any) io.ReadCloser { + pr, pw := io.Pipe() + enc := json.NewEncoder(pw) + + go func() { + err := enc.Encode(obj) + if err != nil && err != io.EOF { + pw.CloseWithError(err) + } else { + pw.Close() + } + }() + + return pr +} + func JSONToObject(data []byte, obj any) error { return json.Unmarshal(data, obj) } +func JSONToObjectStream(str io.Reader, obj any) error { + dec := json.NewDecoder(str) + err := dec.Decode(obj) + if err != io.EOF { + return err + } + + return nil +} + type TypeResolver interface { TypeToString(typ reflect.Type) (string, error) StringToType(typeStr string) (reflect.Type, error)