From 06338c884fdd0553057d22fc9009e0c1e50bd8f4 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sun, 8 Oct 2023 16:22:21 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96Storage?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/storage/storage.go | 62 +++++++++++++++++++++++++++--------- sdks/storage/storage_test.go | 2 +- utils/http/http.go | 14 ++++++++ 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/sdks/storage/storage.go b/sdks/storage/storage.go index a663741..f8c7a5f 100644 --- a/sdks/storage/storage.go +++ b/sdks/storage/storage.go @@ -15,35 +15,33 @@ type StorageLoadPackageReq struct { PackageID int64 `json:"packageID"` StorageID int64 `json:"storageID"` } +type StorageLoadPackageResp struct { + FullPath string `json:"fullPath"` +} -func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) error { +func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPackageResp, error) { url, err := url.JoinPath(c.baseURL, "/storage/loadPackage") if err != nil { - return err + return nil, err } resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ Body: req, }) if err != nil { - return err + return nil, err } - contType := resp.Header.Get("Content-Type") - if strings.Contains(contType, myhttp.ContentTypeJSON) { - var codeResp response[any] - if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - return fmt.Errorf("parsing response: %w", err) - } - - if codeResp.Code == errorcode.OK { - return nil - } + codeResp, err := myhttp.ParseJSONResponse[response[StorageLoadPackageResp]](resp) + if err != nil { + return nil, err + } - return codeResp.ToError() + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil } - return fmt.Errorf("unknow response content type: %s", contType) + return nil, codeResp.ToError() } type StorageCreatePackageReq struct { @@ -88,3 +86,37 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea return nil, fmt.Errorf("unknow response content type: %s", contType) } + +type StorageGetInfoReq struct { + StorageID int64 `json:"storageID"` +} +type StorageGetInfoResp struct { + Name string `json:"name"` + NodeID int64 `json:"nodeID"` + Directory string `json:"directory"` +} + +func (c *Client) StorageGetInfo(req StorageGetInfoReq) (*StorageGetInfoResp, error) { + url, err := url.JoinPath(c.baseURL, "/storage/getInfo") + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + codeResp, err := myhttp.ParseJSONResponse[response[StorageGetInfoResp]](resp) + if err != nil { + return nil, err + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() +} diff --git a/sdks/storage/storage_test.go b/sdks/storage/storage_test.go index deace22..68fbc45 100644 --- a/sdks/storage/storage_test.go +++ b/sdks/storage/storage_test.go @@ -96,7 +96,7 @@ func Test_Storage(t *testing.T) { }) So(err, ShouldBeNil) - err = cli.StorageLoadPackage(StorageLoadPackageReq{ + _, err = cli.StorageLoadPackage(StorageLoadPackageReq{ UserID: 0, PackageID: upResp.PackageID, StorageID: 1, diff --git a/utils/http/http.go b/utils/http/http.go index ff3dc77..48824a9 100644 --- a/utils/http/http.go +++ b/utils/http/http.go @@ -109,6 +109,20 @@ func PostForm(url string, param RequestParam) (*http.Response, error) { return http.DefaultClient.Do(req) } +func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) { + var ret TBody + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, ContentTypeJSON) { + if err := serder.JSONToObjectStream(resp.Body, &ret); err != nil { + return ret, fmt.Errorf("parsing response: %w", err) + } + + return ret, nil + } + + return ret, fmt.Errorf("unknow response content type: %s", contType) +} + type MultiPartRequestParam struct { Header any Query any From 2f701891475a51435bc05f95f6b66c97daee2631 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 10 Oct 2023 17:15:46 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=AD=98=E5=82=A8=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96package=E4=B8=ADobject?= =?UTF-8?q?=E7=9A=84filehash=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/storage/cache.go | 53 +++++++++++++++++++++++--------- sdks/storage/models.go | 21 +++++++++++-- sdks/storage/package.go | 33 ++++++++++++++++++++ sdks/storage/storage_test.go | 59 +++++++++++++++++++++++++++++++++++- 4 files changed, 148 insertions(+), 18 deletions(-) diff --git a/sdks/storage/cache.go b/sdks/storage/cache.go index e2e3229..f0b8d46 100644 --- a/sdks/storage/cache.go +++ b/sdks/storage/cache.go @@ -1,13 +1,10 @@ package stgsdk import ( - "fmt" "net/url" - "strings" "gitlink.org.cn/cloudream/common/consts/errorcode" myhttp "gitlink.org.cn/cloudream/common/utils/http" - "gitlink.org.cn/cloudream/common/utils/serder" ) type CacheMovePackageReq struct { @@ -32,19 +29,47 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes return nil, err } - contType := resp.Header.Get("Content-Type") - if strings.Contains(contType, myhttp.ContentTypeJSON) { - var codeResp response[CacheMovePackageResp] - if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - return nil, fmt.Errorf("parsing response: %w", err) - } + jsonResp, err := myhttp.ParseJSONResponse[response[CacheMovePackageResp]](resp) + if err != nil { + return nil, err + } + + if jsonResp.Code == errorcode.OK { + return &jsonResp.Data, nil + } + + return nil, jsonResp.ToError() +} + +type GetPackageObjectCacheInfosReq struct { + UserID int64 `json:"userID"` + PackageID int64 `json:"packageID"` +} +type GetPackageObjectCacheInfosResp struct { + Infos []ObjectCacheInfo `json:"cacheInfos"` +} - if codeResp.Code == errorcode.OK { - return &codeResp.Data, nil - } +func (c *Client) GetPackageObjectCacheInfos(req GetPackageObjectCacheInfosReq) (*GetPackageObjectCacheInfosResp, error) { + url, err := url.JoinPath(c.baseURL, "/cache/getPackageObjectCacheInfos") + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + jsonResp, err := myhttp.ParseJSONResponse[response[GetPackageObjectCacheInfosResp]](resp) + if err != nil { + return nil, err + } - return nil, codeResp.ToError() + if jsonResp.Code == errorcode.OK { + return &jsonResp.Data, nil } - return nil, fmt.Errorf("unknow response content type: %s", contType) + return nil, jsonResp.ToError() } diff --git a/sdks/storage/models.go b/sdks/storage/models.go index 91ff60a..cdd0afe 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -129,14 +129,29 @@ func NewPackageCachingInfo(nodeInfos []NodePackageCachingInfo, packageSize int64 } } +type Object struct { + ObjectID int64 `db:"ObjectID" json:"objectID"` + PackageID int64 `db:"PackageID" json:"packageID"` + Path string `db:"Path" json:"path"` + Size int64 `db:"Size" json:"size,string"` +} + +type Package struct { + PackageID int64 `db:"PackageID" json:"packageID"` + Name string `db:"Name" json:"name"` + BucketID int64 `db:"BucketID" json:"bucketID"` + State string `db:"State" json:"state"` + Redundancy TypedRedundancyInfo `db:"Redundancy" json:"redundancy"` +} + type ObjectCacheInfo struct { - ObjectID int64 `json:"objectID"` + Object Object `json:"object"` FileHash string `json:"fileHash"` } -func NewObjectCacheInfo(objectID int64, fileHash string) ObjectCacheInfo { +func NewObjectCacheInfo(object Object, fileHash string) ObjectCacheInfo { return ObjectCacheInfo{ - ObjectID: objectID, + Object: object, FileHash: fileHash, } } diff --git a/sdks/storage/package.go b/sdks/storage/package.go index d111a65..4462cb0 100644 --- a/sdks/storage/package.go +++ b/sdks/storage/package.go @@ -12,6 +12,39 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) +type PackageGetReq struct { + UserID int64 `json:"userID"` + PackageID int64 `json:"packageID"` +} +type PackageGetResp struct { + Package +} + +func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) { + url, err := url.JoinPath(c.baseURL, "/package/get") + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + codeResp, err := myhttp.ParseJSONResponse[response[PackageGetResp]](resp) + if err != nil { + return nil, err + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() +} + type PackageUploadReq struct { UserID int64 `json:"userID"` BucketID int64 `json:"bucketID"` diff --git a/sdks/storage/storage_test.go b/sdks/storage/storage_test.go index 68fbc45..2b6d3dc 100644 --- a/sdks/storage/storage_test.go +++ b/sdks/storage/storage_test.go @@ -11,6 +11,56 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/iterator" ) +func Test_PackageGet(t *testing.T) { + Convey("上传后获取Package信息", t, func() { + cli := NewClient(&Config{ + URL: "http://localhost:7890", + }) + + fileData := make([]byte, 4096) + for i := 0; i < len(fileData); i++ { + fileData[i] = byte(i) + } + + pkgName := uuid.NewString() + upResp, err := cli.PackageUpload(PackageUploadReq{ + UserID: 0, + BucketID: 1, + Name: pkgName, + Redundancy: TypedRedundancyInfo{ + Type: RedundancyRep, + Info: NewRepRedundancyInfo(1), + }, + Files: iterator.Array( + &IterPackageUploadFile{ + Path: "test", + File: io.NopCloser(bytes.NewBuffer(fileData)), + }, + &IterPackageUploadFile{ + Path: "test2", + File: io.NopCloser(bytes.NewBuffer(fileData)), + }, + ), + }) + So(err, ShouldBeNil) + + getResp, err := cli.PackageGet(PackageGetReq{ + UserID: 0, + PackageID: upResp.PackageID, + }) + So(err, ShouldBeNil) + + So(getResp.PackageID, ShouldEqual, upResp.PackageID) + So(getResp.Package.Name, ShouldEqual, pkgName) + + err = cli.PackageDelete(PackageDeleteReq{ + UserID: 0, + PackageID: upResp.PackageID, + }) + So(err, ShouldBeNil) + }) +} + func Test_Object(t *testing.T) { Convey("上传,下载,删除", t, func() { cli := NewClient(&Config{ @@ -149,7 +199,14 @@ func Test_Cache(t *testing.T) { NodeID: 1, }) So(err, ShouldBeNil) - So(len(cacheMoveResp.CacheInfos), ShouldEqual, 2) + + cacheInfoResp, err := cli.GetPackageObjectCacheInfos(GetPackageObjectCacheInfosReq{ + UserID: 0, + PackageID: upResp.PackageID, + }) + So(err, ShouldBeNil) + + So(cacheInfoResp.Infos, ShouldResemble, cacheMoveResp.CacheInfos) err = cli.PackageDelete(PackageDeleteReq{ UserID: 0, From 71df48ef6c70c5ee0c09d770a9209739629cb7ec Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 11 Oct 2023 14:59:19 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90IMFS?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/ipfs/ipfs.go | 34 ++++++++++++++++++++++-- sdks/imfs/client.go | 51 ++++++++++++++++++++++++++++++++++++ sdks/imfs/config.go | 5 ++++ sdks/imfs/imfs_test.go | 38 +++++++++++++++++++++++++++ sdks/imfs/ipfs.go | 50 +++++++++++++++++++++++++++++++++++ sdks/imfs/package.go | 45 +++++++++++++++++++++++++++++++ sdks/storage/cache.go | 8 +++--- sdks/storage/storage_test.go | 2 +- 8 files changed, 226 insertions(+), 7 deletions(-) create mode 100644 sdks/imfs/client.go create mode 100644 sdks/imfs/config.go create mode 100644 sdks/imfs/imfs_test.go create mode 100644 sdks/imfs/ipfs.go create mode 100644 sdks/imfs/package.go diff --git a/pkgs/ipfs/ipfs.go b/pkgs/ipfs/ipfs.go index 142b09a..9e560b7 100644 --- a/pkgs/ipfs/ipfs.go +++ b/pkgs/ipfs/ipfs.go @@ -9,6 +9,11 @@ import ( myio "gitlink.org.cn/cloudream/common/utils/io" ) +type ReadOption struct { + Offset int64 // 从指定位置开始读取,为-1时代表不设置,从头开始读 + Length int64 // 读取长度,为-1时代表不设置,读取Offset之后的所有内容 +} + type Client struct { shell *shell.Shell } @@ -54,8 +59,33 @@ func (fs *Client) CreateFile(file io.Reader) (string, error) { return fs.shell.Add(file) } -func (fs *Client) OpenRead(hash string) (io.ReadCloser, error) { - return fs.shell.Cat(hash) +func (fs *Client) OpenRead(hash string, opts ...ReadOption) (io.ReadCloser, error) { + opt := ReadOption{ + Offset: 0, + Length: -1, + } + if len(opts) > 0 { + opt = opts[0] + } + + req := fs.shell.Request("cat", hash) + if opt.Offset >= 0 { + req.Option("offset", opt.Offset) + } + + if opt.Length >= 0 { + req.Option("length", opt.Length) + } + + resp, err := req.Send(context.Background()) + if err != nil { + return nil, err + } + if resp.Error != nil { + return nil, resp.Error + } + + return resp.Output, nil } func (fs *Client) Pin(hash string) error { diff --git a/sdks/imfs/client.go b/sdks/imfs/client.go new file mode 100644 index 0000000..4759232 --- /dev/null +++ b/sdks/imfs/client.go @@ -0,0 +1,51 @@ +package imsdk + +import ( + "gitlink.org.cn/cloudream/common/sdks" +) + +type response[T any] struct { + Code string `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +func (r *response[T]) ToError() *sdks.CodeMessageError { + return &sdks.CodeMessageError{ + Code: r.Code, + Message: r.Message, + } +} + +type Client struct { + baseURL string +} + +func NewClient(cfg *Config) *Client { + return &Client{ + baseURL: cfg.URL, + } +} + +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) +} + +type pool struct { + cfg *Config +} + +func NewPool(cfg *Config) Pool { + return &pool{ + cfg: cfg, + } +} +func (p *pool) Acquire() (*Client, error) { + cli := NewClient(p.cfg) + return cli, nil +} + +func (p *pool) Release(cli *Client) { + +} diff --git a/sdks/imfs/config.go b/sdks/imfs/config.go new file mode 100644 index 0000000..2f0adce --- /dev/null +++ b/sdks/imfs/config.go @@ -0,0 +1,5 @@ +package imsdk + +type Config struct { + URL string `json:"url"` +} diff --git a/sdks/imfs/imfs_test.go b/sdks/imfs/imfs_test.go new file mode 100644 index 0000000..3610cfc --- /dev/null +++ b/sdks/imfs/imfs_test.go @@ -0,0 +1,38 @@ +package imsdk + +import ( + "io" + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_IPFSRead(t *testing.T) { + Convey("读取IPFS文件", t, func() { + cli := NewClient(&Config{ + URL: "http://localhost:7893", + }) + + file, err := cli.IPFSRead(IPFSRead{ + FileHash: "QmcYsRZxmYGgSaydEiJwJRMsD8uWzS2x8gCt1iGMtsZKsU", + Length: 2, + }) + So(err, ShouldBeNil) + defer file.Close() + + data, err := io.ReadAll(file) + So(err, ShouldBeNil) + So(len(data), ShouldEqual, 2) + }) +} + +func Test_Package(t *testing.T) { + Convey("获取Package文件列表", t, func() { + cli := NewClient(&Config{ + URL: "http://localhost:7893", + }) + + _, err := cli.PackageGetWithObjectCacheInfos(PackageGetWithObjectCacheInfos{UserID: 0, PackageID: 13}) + So(err, ShouldBeNil) + }) +} diff --git a/sdks/imfs/ipfs.go b/sdks/imfs/ipfs.go new file mode 100644 index 0000000..62d7061 --- /dev/null +++ b/sdks/imfs/ipfs.go @@ -0,0 +1,50 @@ +package imsdk + +import ( + "fmt" + "io" + "net/url" + "strings" + + myhttp "gitlink.org.cn/cloudream/common/utils/http" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +const IPFSReadPath = "/ipfs/read" + +type IPFSRead struct { + FileHash string `json:"fileHash"` + Offset int64 `json:"offset"` + Length int64 `json:"length,omitempty"` // 接口允许设置Length为0,所以这里只能omitempty +} + +func (c *Client) IPFSRead(req IPFSRead) (io.ReadCloser, error) { + url, err := url.JoinPath(c.baseURL, IPFSReadPath) + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[any] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + return nil, codeResp.ToError() + } + + if strings.Contains(contType, myhttp.ContentTypeOctetStream) { + return resp.Body, nil + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} diff --git a/sdks/imfs/package.go b/sdks/imfs/package.go new file mode 100644 index 0000000..5eb59c5 --- /dev/null +++ b/sdks/imfs/package.go @@ -0,0 +1,45 @@ +package imsdk + +import ( + "net/url" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + myhttp "gitlink.org.cn/cloudream/common/utils/http" +) + +const PackageGetWithObjectCacheInfosPath = "/package/getWithObjectCacheInfos" + +type PackageGetWithObjectCacheInfos struct { + UserID int64 `json:"userID"` + PackageID int64 `json:"packageID"` +} +type PackageGetWithObjectCacheInfosResp struct { + Package stgsdk.Package `json:"package"` + ObjectCacheInfos []stgsdk.ObjectCacheInfo `json:"objectCacheInfos"` +} + +func (c *Client) PackageGetWithObjectCacheInfos(req PackageGetWithObjectCacheInfos) (*PackageGetWithObjectCacheInfosResp, error) { + url, err := url.JoinPath(c.baseURL, PackageGetWithObjectCacheInfosPath) + if err != nil { + return nil, err + } + + resp, err := myhttp.GetForm(url, myhttp.RequestParam{ + Query: req, + }) + if err != nil { + return nil, err + } + + jsonResp, err := myhttp.ParseJSONResponse[response[PackageGetWithObjectCacheInfosResp]](resp) + if err != nil { + return nil, err + } + + if jsonResp.Code == errorcode.OK { + return &jsonResp.Data, nil + } + + return nil, jsonResp.ToError() +} diff --git a/sdks/storage/cache.go b/sdks/storage/cache.go index f0b8d46..18bc5f7 100644 --- a/sdks/storage/cache.go +++ b/sdks/storage/cache.go @@ -41,15 +41,15 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes return nil, jsonResp.ToError() } -type GetPackageObjectCacheInfosReq struct { +type CacheGetPackageObjectCacheInfosReq struct { UserID int64 `json:"userID"` PackageID int64 `json:"packageID"` } -type GetPackageObjectCacheInfosResp struct { +type CacheGetPackageObjectCacheInfosResp struct { Infos []ObjectCacheInfo `json:"cacheInfos"` } -func (c *Client) GetPackageObjectCacheInfos(req GetPackageObjectCacheInfosReq) (*GetPackageObjectCacheInfosResp, error) { +func (c *Client) CacheGetPackageObjectCacheInfos(req CacheGetPackageObjectCacheInfosReq) (*CacheGetPackageObjectCacheInfosResp, error) { url, err := url.JoinPath(c.baseURL, "/cache/getPackageObjectCacheInfos") if err != nil { return nil, err @@ -62,7 +62,7 @@ func (c *Client) GetPackageObjectCacheInfos(req GetPackageObjectCacheInfosReq) ( return nil, err } - jsonResp, err := myhttp.ParseJSONResponse[response[GetPackageObjectCacheInfosResp]](resp) + jsonResp, err := myhttp.ParseJSONResponse[response[CacheGetPackageObjectCacheInfosResp]](resp) if err != nil { return nil, err } diff --git a/sdks/storage/storage_test.go b/sdks/storage/storage_test.go index 2b6d3dc..b6a9e8f 100644 --- a/sdks/storage/storage_test.go +++ b/sdks/storage/storage_test.go @@ -200,7 +200,7 @@ func Test_Cache(t *testing.T) { }) So(err, ShouldBeNil) - cacheInfoResp, err := cli.GetPackageObjectCacheInfos(GetPackageObjectCacheInfosReq{ + cacheInfoResp, err := cli.CacheGetPackageObjectCacheInfos(CacheGetPackageObjectCacheInfosReq{ UserID: 0, PackageID: upResp.PackageID, })