Browse Source

重命名http包;优化工具函数

gitlink
Sydonian 1 year ago
parent
commit
10be4808c4
16 changed files with 111 additions and 107 deletions
  1. +4
    -4
      sdks/imfs/ipfs.go
  2. +3
    -3
      sdks/imfs/package.go
  3. +3
    -3
      sdks/imfs/proxy.go
  4. +17
    -17
      sdks/pcm/pcm.go
  5. +7
    -7
      sdks/scheduler/jobset.go
  6. +5
    -5
      sdks/storage/bucket.go
  7. +2
    -2
      sdks/storage/cache.go
  8. +2
    -2
      sdks/storage/node.go
  9. +16
    -23
      sdks/storage/object.go
  10. +9
    -9
      sdks/storage/package.go
  11. +5
    -5
      sdks/storage/storage.go
  12. +2
    -2
      sdks/storage/utils.go
  13. +15
    -15
      sdks/unifyops/unifyops.go
  14. +1
    -1
      utils/http2/http.go
  15. +1
    -1
      utils/http2/http_test.go
  16. +19
    -8
      utils/io2/ring.go

+ 4
- 4
sdks/imfs/ipfs.go View File

@@ -6,7 +6,7 @@ import (
"net/url" "net/url"
"strings" "strings"


myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -24,7 +24,7 @@ func (c *Client) IPFSRead(req IPFSRead) (io.ReadCloser, error) {
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
@@ -33,7 +33,7 @@ func (c *Client) IPFSRead(req IPFSRead) (io.ReadCloser, error) {


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")


if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[any] var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err) return nil, fmt.Errorf("parsing response: %w", err)
@@ -42,7 +42,7 @@ func (c *Client) IPFSRead(req IPFSRead) (io.ReadCloser, error) {
return nil, codeResp.ToError() return nil, codeResp.ToError()
} }


if strings.Contains(contType, myhttp.ContentTypeOctetStream) {
if strings.Contains(contType, http2.ContentTypeOctetStream) {
return resp.Body, nil return resp.Body, nil
} }




+ 3
- 3
sdks/imfs/package.go View File

@@ -5,7 +5,7 @@ import (


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
) )


const PackageGetWithObjectsPath = "/package/getWithObjects" const PackageGetWithObjectsPath = "/package/getWithObjects"
@@ -25,14 +25,14 @@ func (c *Client) PackageGetWithObjects(req PackageGetWithObjectsInfos) (*Package
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }


jsonResp, err := myhttp.ParseJSONResponse[response[PackageGetWithObjectsResp]](resp)
jsonResp, err := http2.ParseJSONResponse[response[PackageGetWithObjectsResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 3
- 3
sdks/imfs/proxy.go View File

@@ -5,7 +5,7 @@ import (


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
) )


const ProxyGetServiceInfoPath = "/proxy/getServiceInfo" const ProxyGetServiceInfoPath = "/proxy/getServiceInfo"
@@ -25,14 +25,14 @@ func (c *Client) ProxyGetServiceInfo(req ProxyGetServiceInfo) (*ProxyGetServiceI
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }


jsonResp, err := myhttp.ParseJSONResponse[response[ProxyGetServiceInfoResp]](resp)
jsonResp, err := http2.ParseJSONResponse[response[ProxyGetServiceInfoResp]](resp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 17
- 17
sdks/pcm/pcm.go View File

@@ -8,7 +8,7 @@ import (


"gitlink.org.cn/cloudream/common/sdks" "gitlink.org.cn/cloudream/common/sdks"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -31,7 +31,7 @@ func (c *Client) UploadImage(req UploadImageReq) (*UploadImageResp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -39,7 +39,7 @@ func (c *Client) UploadImage(req UploadImageReq) (*UploadImageResp, error) {
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[UploadImageResp] var codeResp response[UploadImageResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
@@ -71,12 +71,12 @@ func (c *Client) GetParticipants() (*GetParticipantsResp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{})
rawResp, err := http2.GetJSON(url, http2.RequestParam{})
if err != nil { if err != nil {
return nil, err return nil, err
} }


resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
resp, err := http2.ParseJSONResponse[Resp](rawResp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -112,14 +112,14 @@ func (c *Client) GetImageList(req GetImageListReq) (*GetImageListResp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{
rawResp, err := http2.GetJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }


resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
resp, err := http2.ParseJSONResponse[Resp](rawResp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -148,14 +148,14 @@ func (c *Client) DeleteImage(req DeleteImageReq) error {
if err != nil { if err != nil {
return err return err
} }
rawResp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
return err return err
} }


resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
resp, err := http2.ParseJSONResponse[Resp](rawResp)
if err != nil { if err != nil {
return err return err
} }
@@ -191,14 +191,14 @@ func (c *Client) SubmitTask(req SubmitTaskReq) (*SubmitTaskResp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rawResp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }


resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
resp, err := http2.ParseJSONResponse[Resp](rawResp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -241,14 +241,14 @@ func (c *Client) GetTask(req GetTaskReq) (*GetTaskResp, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{
rawResp, err := http2.GetJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }


resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
resp, err := http2.ParseJSONResponse[Resp](rawResp)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -280,14 +280,14 @@ func (c *Client) DeleteTask(req DeleteTaskReq) error {
if err != nil { if err != nil {
return err return err
} }
rawResp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
return err return err
} }


resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
resp, err := http2.ParseJSONResponse[Resp](rawResp)
if err != nil { if err != nil {
return err return err
} }
@@ -318,14 +318,14 @@ func (c *Client) GetResourceSpecs(req GetImageListReq) (*GetResourceSpecsResp, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{
rawResp, err := http2.GetJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }


resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
resp, err := http2.ParseJSONResponse[Resp](rawResp)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 7
- 7
sdks/scheduler/jobset.go View File

@@ -7,7 +7,7 @@ import (


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -26,7 +26,7 @@ func (c *Client) JobSetSumbit(req JobSetSumbitReq) (*JobSetSumbitResp, error) {
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -34,7 +34,7 @@ func (c *Client) JobSetSumbit(req JobSetSumbitReq) (*JobSetSumbitResp, error) {
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[JobSetSumbitResp] var codeResp response[JobSetSumbitResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err) return nil, fmt.Errorf("parsing response: %w", err)
@@ -63,7 +63,7 @@ func (c *Client) JobSetLocalFileUploaded(req JobSetLocalFileUploadedReq) error {
return err return err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -71,7 +71,7 @@ func (c *Client) JobSetLocalFileUploaded(req JobSetLocalFileUploadedReq) error {
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[any] var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err) return fmt.Errorf("parsing response: %w", err)
@@ -101,7 +101,7 @@ func (c *Client) JobSetGetServiceList(req JobSetGetServiceListReq) (*JobSetGetSe
return nil, err return nil, err
} }


resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
resp, err := http2.GetJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -109,7 +109,7 @@ func (c *Client) JobSetGetServiceList(req JobSetGetServiceListReq) (*JobSetGetSe
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[JobSetGetServiceListResp] var codeResp response[JobSetGetServiceListResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err) return nil, fmt.Errorf("parsing response: %w", err)


+ 5
- 5
sdks/storage/bucket.go View File

@@ -4,7 +4,7 @@ import (
"net/url" "net/url"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
) )


type BucketService struct { type BucketService struct {
@@ -31,7 +31,7 @@ func (c *BucketService) GetByName(req BucketGetByName) (*BucketGetByNameResp, er
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
@@ -67,7 +67,7 @@ func (c *BucketService) Create(req BucketCreate) (*BucketCreateResp, error) {
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -101,7 +101,7 @@ func (c *BucketService) Delete(req BucketDelete) error {
return err return err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -136,7 +136,7 @@ func (c *BucketService) ListUserBuckets(req BucketListUserBucketsReq) (*BucketLi
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {


+ 2
- 2
sdks/storage/cache.go View File

@@ -4,7 +4,7 @@ import (
"net/url" "net/url"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
) )


const CacheMovePackagePath = "/cache/movePackage" const CacheMovePackagePath = "/cache/movePackage"
@@ -22,7 +22,7 @@ func (c *Client) CacheMovePackage(req CacheMovePackageReq) (*CacheMovePackageRes
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {


+ 2
- 2
sdks/storage/node.go View File

@@ -4,7 +4,7 @@ import (
"net/url" "net/url"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
) )


var NodeGetNodesPath = "/node/getNodes" var NodeGetNodesPath = "/node/getNodes"
@@ -23,7 +23,7 @@ func (c *Client) NodeGetNodes(req NodeGetNodesReq) (*NodeGetNodesResp, error) {
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {


+ 16
- 23
sdks/storage/object.go View File

@@ -3,13 +3,14 @@ package cdssdk
import ( import (
"fmt" "fmt"
"io" "io"
"mime"
"net/url" "net/url"
"strings" "strings"
"time" "time"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/iterator"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -62,10 +63,10 @@ func (c *ObjectService) Upload(req ObjectUpload) (*ObjectUploadResp, error) {
return nil, fmt.Errorf("upload info to json: %w", err) return nil, fmt.Errorf("upload info to json: %w", err)
} }


resp, err := myhttp.PostMultiPart(url, myhttp.MultiPartRequestParam{
resp, err := http2.PostMultiPart(url, http2.MultiPartRequestParam{
Form: map[string]string{"info": string(infoJSON)}, Form: map[string]string{"info": string(infoJSON)},
Files: iterator.Map(req.Files, func(src *UploadingObject) (*myhttp.IterMultiPartFile, error) {
return &myhttp.IterMultiPartFile{
Files: iterator.Map(req.Files, func(src *UploadingObject) (*http2.IterMultiPartFile, error) {
return &http2.IterMultiPartFile{
FieldName: "files", FieldName: "files",
FileName: src.Path, FileName: src.Path,
File: src.File, File: src.File,
@@ -77,7 +78,7 @@ func (c *ObjectService) Upload(req ObjectUpload) (*ObjectUploadResp, error) {
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var err error var err error
var codeResp response[ObjectUploadResp] var codeResp response[ObjectUploadResp]
if codeResp, err = serder.JSONToObjectStreamEx[response[ObjectUploadResp]](resp.Body); err != nil { if codeResp, err = serder.JSONToObjectStreamEx[response[ObjectUploadResp]](resp.Body); err != nil {
@@ -115,7 +116,7 @@ func (c *ObjectService) Download(req ObjectDownload) (*DownloadingObject, error)
return nil, err return nil, err
} }


resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
resp, err := http2.GetJSON(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
@@ -124,7 +125,7 @@ func (c *ObjectService) Download(req ObjectDownload) (*DownloadingObject, error)


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")


if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[any] var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err) return nil, fmt.Errorf("parsing response: %w", err)
@@ -133,22 +134,14 @@ func (c *ObjectService) Download(req ObjectDownload) (*DownloadingObject, error)
return nil, codeResp.ToError() return nil, codeResp.ToError()
} }


_, files, err := myhttp.ParseMultiPartResponse(resp)
_, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition"))
if err != nil { if err != nil {
return nil, err
}

file, err := files.MoveNext()
if err == iterator.ErrNoMoreItem {
return nil, fmt.Errorf("no file found in response")
}
if err != nil {
return nil, err
return nil, fmt.Errorf("parsing content disposition: %w", err)
} }


return &DownloadingObject{ return &DownloadingObject{
Path: file.FileName,
File: file.File,
Path: params["filename"],
File: resp.Body,
}, nil }, nil
} }


@@ -178,7 +171,7 @@ func (c *ObjectService) UpdateInfo(req ObjectUpdateInfo) (*ObjectUpdateInfoResp,
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -225,7 +218,7 @@ func (c *ObjectService) Move(req ObjectMove) (*ObjectMoveResp, error) {
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -259,7 +252,7 @@ func (c *ObjectService) Delete(req ObjectDelete) error {
return err return err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -294,7 +287,7 @@ func (c *ObjectService) GetPackageObjects(req ObjectGetPackageObjects) (*ObjectG
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {


+ 9
- 9
sdks/storage/package.go View File

@@ -6,7 +6,7 @@ import (
"strings" "strings"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -34,7 +34,7 @@ func (c *PackageService) Get(req PackageGetReq) (*PackageGetResp, error) {
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
@@ -70,7 +70,7 @@ func (c *PackageService) GetByName(req PackageGetByName) (*PackageGetByNameResp,
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
@@ -107,7 +107,7 @@ func (s *PackageService) Create(req PackageCreate) (*PackageCreateResp, error) {
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -139,7 +139,7 @@ func (c *PackageService) Delete(req PackageDelete) error {
return err return err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -148,7 +148,7 @@ func (c *PackageService) Delete(req PackageDelete) error {


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")


if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[any] var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err) return fmt.Errorf("parsing response: %w", err)
@@ -181,7 +181,7 @@ func (c *PackageService) ListBucketPackages(req PackageListBucketPackages) (*Pac
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
@@ -216,7 +216,7 @@ func (c *PackageService) GetCachedNodes(req PackageGetCachedNodesReq) (*PackageG
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
resp, err := http2.GetJSON(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {
@@ -251,7 +251,7 @@ func (c *PackageService) GetLoadedNodes(req PackageGetLoadedNodesReq) (*PackageG
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.GetJSON(url, myhttp.RequestParam{
resp, err := http2.GetJSON(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {


+ 5
- 5
sdks/storage/storage.go View File

@@ -6,7 +6,7 @@ import (
"strings" "strings"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -30,7 +30,7 @@ func (c *Client) StorageLoadPackage(req StorageLoadPackageReq) (*StorageLoadPack
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -70,7 +70,7 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea
return nil, err return nil, err
} }


resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
@@ -78,7 +78,7 @@ func (c *Client) StorageCreatePackage(req StorageCreatePackageReq) (*StorageCrea
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[StorageCreatePackageResp] var codeResp response[StorageCreatePackageResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err) return nil, fmt.Errorf("parsing response: %w", err)
@@ -110,7 +110,7 @@ func (c *Client) StorageGet(req StorageGet) (*StorageGetResp, error) {
return nil, err return nil, err
} }


resp, err := myhttp.GetForm(url, myhttp.RequestParam{
resp, err := http2.GetForm(url, http2.RequestParam{
Query: req, Query: req,
}) })
if err != nil { if err != nil {


+ 2
- 2
sdks/storage/utils.go View File

@@ -7,7 +7,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"


myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )
@@ -19,7 +19,7 @@ func MakeIPFSFilePath(fileHash string) string {
func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) { func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) {
var ret TBody var ret TBody
contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {
var err error var err error
if ret, err = serder.JSONToObjectStreamEx[TBody](resp.Body); err != nil { if ret, err = serder.JSONToObjectStreamEx[TBody](resp.Body); err != nil {
return ret, fmt.Errorf("parsing response: %w", err) return ret, fmt.Errorf("parsing response: %w", err)


+ 15
- 15
sdks/unifyops/unifyops.go View File

@@ -5,7 +5,7 @@ import (
"net/url" "net/url"
"strings" "strings"


myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -16,12 +16,12 @@ func (c *Client) GetAllSlwNodeInfo() ([]SlwNode, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.GetJSON(url, myhttp.RequestParam{})
resp, err := http2.GetJSON(url, http2.RequestParam{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[[]SlwNode] var codeResp response[[]SlwNode]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
@@ -47,7 +47,7 @@ func (c *Client) GetCPUData(node GetOneResourceDataReq) (*CPUResourceData, error
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: node, Body: node,
}) })
if err != nil { if err != nil {
@@ -55,7 +55,7 @@ func (c *Client) GetCPUData(node GetOneResourceDataReq) (*CPUResourceData, error
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[CPUResourceData] var codeResp response[CPUResourceData]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
@@ -77,7 +77,7 @@ func (c *Client) GetNPUData(node GetOneResourceDataReq) (*NPUResourceData, error
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: node, Body: node,
}) })
if err != nil { if err != nil {
@@ -85,7 +85,7 @@ func (c *Client) GetNPUData(node GetOneResourceDataReq) (*NPUResourceData, error
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[NPUResourceData] var codeResp response[NPUResourceData]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
@@ -107,7 +107,7 @@ func (c *Client) GetGPUData(node GetOneResourceDataReq) (*GPUResourceData, error
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: node, Body: node,
}) })
if err != nil { if err != nil {
@@ -115,7 +115,7 @@ func (c *Client) GetGPUData(node GetOneResourceDataReq) (*GPUResourceData, error
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[GPUResourceData] var codeResp response[GPUResourceData]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
@@ -137,7 +137,7 @@ func (c *Client) GetMLUData(node GetOneResourceDataReq) (*MLUResourceData, error
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: node, Body: node,
}) })
if err != nil { if err != nil {
@@ -145,7 +145,7 @@ func (c *Client) GetMLUData(node GetOneResourceDataReq) (*MLUResourceData, error
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[MLUResourceData] var codeResp response[MLUResourceData]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
@@ -167,7 +167,7 @@ func (c *Client) GetStorageData(node GetOneResourceDataReq) (*StorageResourceDat
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: node, Body: node,
}) })
if err != nil { if err != nil {
@@ -175,7 +175,7 @@ func (c *Client) GetStorageData(node GetOneResourceDataReq) (*StorageResourceDat
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[StorageResourceData] var codeResp response[StorageResourceData]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
@@ -197,7 +197,7 @@ func (c *Client) GetMemoryData(node GetOneResourceDataReq) (*MemoryResourceData,
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
resp, err := http2.PostJSON(url, http2.RequestParam{
Body: node, Body: node,
}) })
if err != nil { if err != nil {
@@ -205,7 +205,7 @@ func (c *Client) GetMemoryData(node GetOneResourceDataReq) (*MemoryResourceData,
} }


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
if strings.Contains(contType, http2.ContentTypeJSON) {


var codeResp response[MemoryResourceData] var codeResp response[MemoryResourceData]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {


utils/http/http.go → utils/http2/http.go View File

@@ -1,4 +1,4 @@
package http
package http2


import ( import (
"bytes" "bytes"

utils/http/http_test.go → utils/http2/http_test.go View File

@@ -1,4 +1,4 @@
package http
package http2


import ( import (
"testing" "testing"

+ 19
- 8
utils/io2/ring.go View File

@@ -18,7 +18,7 @@ type RingBufferStats struct {
type RingBuffer struct { type RingBuffer struct {
buf []byte buf []byte
src io.ReadCloser src io.ReadCloser
maxPerRead int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降
maxPreRead int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降
err error err error
isReading bool isReading bool
writePos int // 指向下一次写入的位置,应该是一个空位 writePos int // 指向下一次写入的位置,应该是一个空位
@@ -28,12 +28,23 @@ type RingBuffer struct {
stats RingBufferStats stats RingBufferStats
} }


func Ring(src io.ReadCloser, size int) *RingBuffer {
type RingBufferConfig struct {
MaxPreReading int // 后台读取线程每次读取的最大字节数,太小会导致IO次数增多,太大会导致读、写并行性下降
}

func Ring(src io.ReadCloser, size int, cfg ...RingBufferConfig) *RingBuffer {
c := RingBufferConfig{
MaxPreReading: size / 4,
}
if len(cfg) > 0 {
c = cfg[0]
}

lk := &sync.Mutex{} lk := &sync.Mutex{}
return &RingBuffer{ return &RingBuffer{
buf: make([]byte, size), buf: make([]byte, size),
src: src, src: src,
maxPerRead: size / 4,
maxPreRead: c.MaxPreReading,
waitReading: sync.NewCond(lk), waitReading: sync.NewCond(lk),
waitComsuming: sync.NewCond(lk), waitComsuming: sync.NewCond(lk),
} }
@@ -67,10 +78,10 @@ func (r *RingBuffer) Read(p []byte) (n int, err error) {
r.waitReading.L.Unlock() r.waitReading.L.Unlock()


if readPos < writePos { if readPos < writePos {
maxRead := math2.Min(r.maxPerRead, writePos-readPos)
maxRead := math2.Min(r.maxPreRead, writePos-readPos)
n = copy(p, r.buf[readPos:readPos+maxRead]) n = copy(p, r.buf[readPos:readPos+maxRead])
} else { } else {
maxRead := math2.Min(r.maxPerRead, len(r.buf)-readPos)
maxRead := math2.Min(r.maxPreRead, len(r.buf)-readPos)
n = copy(p, r.buf[readPos:readPos+maxRead]) n = copy(p, r.buf[readPos:readPos+maxRead])
} }


@@ -121,14 +132,14 @@ func (r *RingBuffer) reading() {
// 同上理,写入数据的时候如果readPos为0,则它的前一格是底层缓冲区的最后一格 // 同上理,写入数据的时候如果readPos为0,则它的前一格是底层缓冲区的最后一格
// 那就不能写入到这一格 // 那就不能写入到这一格
if readPos == 0 { if readPos == 0 {
maxWrite := math2.Min(r.maxPerRead, len(r.buf)-1-writePos)
maxWrite := math2.Min(r.maxPreRead, len(r.buf)-1-writePos)
n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) n, err = r.src.Read(r.buf[writePos : writePos+maxWrite])
} else { } else {
maxWrite := math2.Min(r.maxPerRead, len(r.buf)-writePos)
maxWrite := math2.Min(r.maxPreRead, len(r.buf)-writePos)
n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) n, err = r.src.Read(r.buf[writePos : writePos+maxWrite])
} }
} else { } else {
maxWrite := math2.Min(r.maxPerRead, readPos-1-writePos)
maxWrite := math2.Min(r.maxPreRead, readPos-1-writePos)
n, err = r.src.Read(r.buf[writePos : writePos+maxWrite]) n, err = r.src.Read(r.buf[writePos : writePos+maxWrite])
} }




Loading…
Cancel
Save