Browse Source

Merge pull request '增加获取Storage信息的接口' (#26) from feature_gxh into master

pull/28/head
baohan 2 years ago
parent
commit
2b124375a1
12 changed files with 431 additions and 36 deletions
  1. +32
    -2
      pkgs/ipfs/ipfs.go
  2. +51
    -0
      sdks/imfs/client.go
  3. +5
    -0
      sdks/imfs/config.go
  4. +38
    -0
      sdks/imfs/imfs_test.go
  5. +50
    -0
      sdks/imfs/ipfs.go
  6. +45
    -0
      sdks/imfs/package.go
  7. +39
    -14
      sdks/storage/cache.go
  8. +18
    -3
      sdks/storage/models.go
  9. +33
    -0
      sdks/storage/package.go
  10. +47
    -15
      sdks/storage/storage.go
  11. +59
    -2
      sdks/storage/storage_test.go
  12. +14
    -0
      utils/http/http.go

+ 32
- 2
pkgs/ipfs/ipfs.go View File

@@ -9,6 +9,11 @@ import (
myio "gitlink.org.cn/cloudream/common/utils/io"
)

type ReadOption struct {
Offset int64 // 从指定位置开始读取,为-1时代表不设置,从头开始读
Length int64 // 读取长度,为-1时代表不设置,读取Offset之后的所有内容
}

type Client struct {
shell *shell.Shell
}
@@ -54,8 +59,33 @@ func (fs *Client) CreateFile(file io.Reader) (string, error) {
return fs.shell.Add(file)
}

func (fs *Client) OpenRead(hash string) (io.ReadCloser, error) {
return fs.shell.Cat(hash)
func (fs *Client) OpenRead(hash string, opts ...ReadOption) (io.ReadCloser, error) {
opt := ReadOption{
Offset: 0,
Length: -1,
}
if len(opts) > 0 {
opt = opts[0]
}

req := fs.shell.Request("cat", hash)
if opt.Offset >= 0 {
req.Option("offset", opt.Offset)
}

if opt.Length >= 0 {
req.Option("length", opt.Length)
}

resp, err := req.Send(context.Background())
if err != nil {
return nil, err
}
if resp.Error != nil {
return nil, resp.Error
}

return resp.Output, nil
}

func (fs *Client) Pin(hash string) error {


+ 51
- 0
sdks/imfs/client.go View File

@@ -0,0 +1,51 @@
package imsdk

import (
"gitlink.org.cn/cloudream/common/sdks"
)

type response[T any] struct {
Code string `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}

func (r *response[T]) ToError() *sdks.CodeMessageError {
return &sdks.CodeMessageError{
Code: r.Code,
Message: r.Message,
}
}

type Client struct {
baseURL string
}

func NewClient(cfg *Config) *Client {
return &Client{
baseURL: cfg.URL,
}
}

type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

type pool struct {
cfg *Config
}

func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return cli, nil
}

func (p *pool) Release(cli *Client) {

}

+ 5
- 0
sdks/imfs/config.go View File

@@ -0,0 +1,5 @@
package imsdk

type Config struct {
URL string `json:"url"`
}

+ 38
- 0
sdks/imfs/imfs_test.go View File

@@ -0,0 +1,38 @@
package imsdk

import (
"io"
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func Test_IPFSRead(t *testing.T) {
Convey("读取IPFS文件", t, func() {
cli := NewClient(&Config{
URL: "http://localhost:7893",
})

file, err := cli.IPFSRead(IPFSRead{
FileHash: "QmcYsRZxmYGgSaydEiJwJRMsD8uWzS2x8gCt1iGMtsZKsU",
Length: 2,
})
So(err, ShouldBeNil)
defer file.Close()

data, err := io.ReadAll(file)
So(err, ShouldBeNil)
So(len(data), ShouldEqual, 2)
})
}

func Test_Package(t *testing.T) {
Convey("获取Package文件列表", t, func() {
cli := NewClient(&Config{
URL: "http://localhost:7893",
})

_, err := cli.PackageGetWithObjectCacheInfos(PackageGetWithObjectCacheInfos{UserID: 0, PackageID: 13})
So(err, ShouldBeNil)
})
}

+ 50
- 0
sdks/imfs/ipfs.go View File

@@ -0,0 +1,50 @@
package imsdk

import (
"fmt"
"io"
"net/url"
"strings"

myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)

const IPFSReadPath = "/ipfs/read"

type IPFSRead struct {
FileHash string `json:"fileHash"`
Offset int64 `json:"offset"`
Length int64 `json:"length,omitempty"` // 接口允许设置Length为0,所以这里只能omitempty
}

func (c *Client) IPFSRead(req IPFSRead) (io.ReadCloser, error) {
url, err := url.JoinPath(c.baseURL, IPFSReadPath)
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")

if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

return nil, codeResp.ToError()
}

if strings.Contains(contType, myhttp.ContentTypeOctetStream) {
return resp.Body, nil
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

+ 45
- 0
sdks/imfs/package.go View File

@@ -0,0 +1,45 @@
package imsdk

import (
"net/url"

"gitlink.org.cn/cloudream/common/consts/errorcode"
stgsdk "gitlink.org.cn/cloudream/common/sdks/storage"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
)

const PackageGetWithObjectCacheInfosPath = "/package/getWithObjectCacheInfos"

type PackageGetWithObjectCacheInfos struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type PackageGetWithObjectCacheInfosResp struct {
Package stgsdk.Package `json:"package"`
ObjectCacheInfos []stgsdk.ObjectCacheInfo `json:"objectCacheInfos"`
}

func (c *Client) PackageGetWithObjectCacheInfos(req PackageGetWithObjectCacheInfos) (*PackageGetWithObjectCacheInfosResp, error) {
url, err := url.JoinPath(c.baseURL, PackageGetWithObjectCacheInfosPath)
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

jsonResp, err := myhttp.ParseJSONResponse[response[PackageGetWithObjectCacheInfosResp]](resp)
if err != nil {
return nil, err
}

if jsonResp.Code == errorcode.OK {
return &jsonResp.Data, nil
}

return nil, jsonResp.ToError()
}

+ 39
- 14
sdks/storage/cache.go View File

@@ -1,13 +1,10 @@
package stgsdk

import (
"fmt"
"net/url"
"strings"

"gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type CacheMovePackageReq struct {
@@ -32,19 +29,47 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[CacheMovePackageResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}
jsonResp, err := myhttp.ParseJSONResponse[response[CacheMovePackageResp]](resp)
if err != nil {
return nil, err
}

if jsonResp.Code == errorcode.OK {
return &jsonResp.Data, nil
}

return nil, jsonResp.ToError()
}

type CacheGetPackageObjectCacheInfosReq struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type CacheGetPackageObjectCacheInfosResp struct {
Infos []ObjectCacheInfo `json:"cacheInfos"`
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}
func (c *Client) CacheGetPackageObjectCacheInfos(req CacheGetPackageObjectCacheInfosReq) (*CacheGetPackageObjectCacheInfosResp, error) {
url, err := url.JoinPath(c.baseURL, "/cache/getPackageObjectCacheInfos")
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

jsonResp, err := myhttp.ParseJSONResponse[response[CacheGetPackageObjectCacheInfosResp]](resp)
if err != nil {
return nil, err
}

return nil, codeResp.ToError()
if jsonResp.Code == errorcode.OK {
return &jsonResp.Data, nil
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
return nil, jsonResp.ToError()
}

+ 18
- 3
sdks/storage/models.go View File

@@ -129,14 +129,29 @@ func NewPackageCachingInfo(nodeInfos []NodePackageCachingInfo, packageSize int64
}
}

type Object struct {
ObjectID int64 `db:"ObjectID" json:"objectID"`
PackageID int64 `db:"PackageID" json:"packageID"`
Path string `db:"Path" json:"path"`
Size int64 `db:"Size" json:"size,string"`
}

type Package struct {
PackageID int64 `db:"PackageID" json:"packageID"`
Name string `db:"Name" json:"name"`
BucketID int64 `db:"BucketID" json:"bucketID"`
State string `db:"State" json:"state"`
Redundancy TypedRedundancyInfo `db:"Redundancy" json:"redundancy"`
}

type ObjectCacheInfo struct {
ObjectID int64 `json:"objectID"`
Object Object `json:"object"`
FileHash string `json:"fileHash"`
}

func NewObjectCacheInfo(objectID int64, fileHash string) ObjectCacheInfo {
func NewObjectCacheInfo(object Object, fileHash string) ObjectCacheInfo {
return ObjectCacheInfo{
ObjectID: objectID,
Object: object,
FileHash: fileHash,
}
}


+ 33
- 0
sdks/storage/package.go View File

@@ -12,6 +12,39 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder"
)

type PackageGetReq struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type PackageGetResp struct {
Package
}

func (c *Client) PackageGet(req PackageGetReq) (*PackageGetResp, error) {
url, err := url.JoinPath(c.baseURL, "/package/get")
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Query: req,
})
if err != nil {
return nil, err
}

codeResp, err := myhttp.ParseJSONResponse[response[PackageGetResp]](resp)
if err != nil {
return nil, err
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

type PackageUploadReq struct {
UserID int64 `json:"userID"`
BucketID int64 `json:"bucketID"`


+ 47
- 15
sdks/storage/storage.go View File

@@ -15,35 +15,33 @@ type StorageLoadPackageReq struct {
PackageID int64 `json:"packageID"`
StorageID int64 `json:"storageID"`
}
type StorageLoadPackageResp struct {
FullPath string `json:"fullPath"`
}

func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) error {
func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPackageResp, error) {
url, err := url.JoinPath(c.baseURL, "/storage/loadPackage")
if err != nil {
return err
return nil, err
}

resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return err
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return nil
}
codeResp, err := myhttp.ParseJSONResponse[response[StorageLoadPackageResp]](resp)
if err != nil {
return nil, err
}

return codeResp.ToError()
if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return fmt.Errorf("unknow response content type: %s", contType)
return nil, codeResp.ToError()
}

type StorageCreatePackageReq struct {
@@ -88,3 +86,37 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type StorageGetInfoReq struct {
StorageID int64 `json:"storageID"`
}
type StorageGetInfoResp struct {
Name string `json:"name"`
NodeID int64 `json:"nodeID"`
Directory string `json:"directory"`
}

func (c *Client) StorageGetInfo(req StorageGetInfoReq) (*StorageGetInfoResp, error) {
url, err := url.JoinPath(c.baseURL, "/storage/getInfo")
if err != nil {
return nil, err
}

resp, err := myhttp.GetForm(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

codeResp, err := myhttp.ParseJSONResponse[response[StorageGetInfoResp]](resp)
if err != nil {
return nil, err
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

+ 59
- 2
sdks/storage/storage_test.go View File

@@ -11,6 +11,56 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/iterator"
)

func Test_PackageGet(t *testing.T) {
Convey("上传后获取Package信息", t, func() {
cli := NewClient(&Config{
URL: "http://localhost:7890",
})

fileData := make([]byte, 4096)
for i := 0; i < len(fileData); i++ {
fileData[i] = byte(i)
}

pkgName := uuid.NewString()
upResp, err := cli.PackageUpload(PackageUploadReq{
UserID: 0,
BucketID: 1,
Name: pkgName,
Redundancy: TypedRedundancyInfo{
Type: RedundancyRep,
Info: NewRepRedundancyInfo(1),
},
Files: iterator.Array(
&IterPackageUploadFile{
Path: "test",
File: io.NopCloser(bytes.NewBuffer(fileData)),
},
&IterPackageUploadFile{
Path: "test2",
File: io.NopCloser(bytes.NewBuffer(fileData)),
},
),
})
So(err, ShouldBeNil)

getResp, err := cli.PackageGet(PackageGetReq{
UserID: 0,
PackageID: upResp.PackageID,
})
So(err, ShouldBeNil)

So(getResp.PackageID, ShouldEqual, upResp.PackageID)
So(getResp.Package.Name, ShouldEqual, pkgName)

err = cli.PackageDelete(PackageDeleteReq{
UserID: 0,
PackageID: upResp.PackageID,
})
So(err, ShouldBeNil)
})
}

func Test_Object(t *testing.T) {
Convey("上传,下载,删除", t, func() {
cli := NewClient(&Config{
@@ -96,7 +146,7 @@ func Test_Storage(t *testing.T) {
})
So(err, ShouldBeNil)

err = cli.StorageLoadPackage(StorageLoadPackageReq{
_, err = cli.StorageLoadPackage(StorageLoadPackageReq{
UserID: 0,
PackageID: upResp.PackageID,
StorageID: 1,
@@ -149,7 +199,14 @@ func Test_Cache(t *testing.T) {
NodeID: 1,
})
So(err, ShouldBeNil)
So(len(cacheMoveResp.CacheInfos), ShouldEqual, 2)

cacheInfoResp, err := cli.CacheGetPackageObjectCacheInfos(CacheGetPackageObjectCacheInfosReq{
UserID: 0,
PackageID: upResp.PackageID,
})
So(err, ShouldBeNil)

So(cacheInfoResp.Infos, ShouldResemble, cacheMoveResp.CacheInfos)

err = cli.PackageDelete(PackageDeleteReq{
UserID: 0,


+ 14
- 0
utils/http/http.go View File

@@ -109,6 +109,20 @@ func PostForm(url string, param RequestParam) (*http.Response, error) {
return http.DefaultClient.Do(req)
}

func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) {
var ret TBody
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, ContentTypeJSON) {
if err := serder.JSONToObjectStream(resp.Body, &ret); err != nil {
return ret, fmt.Errorf("parsing response: %w", err)
}

return ret, nil
}

return ret, fmt.Errorf("unknow response content type: %s", contType)
}

type MultiPartRequestParam struct {
Header any
Query any


Loading…
Cancel
Save