Browse Source

支持分片下载文件

pull/47/head
Sydonian 1 year ago
parent
commit
4b4bde7dbe
3 changed files with 151 additions and 66 deletions
  1. +3
    -3
      sdks/imfs/models.go
  2. +1
    -0
      sdks/storage/object.go
  3. +147
    -63
      utils/http/http.go

+ 3
- 3
sdks/imfs/models.go View File

@@ -16,10 +16,10 @@ type ClientService struct {
//代表任务给提供各服务的端口
type ServerService struct {
Name string `json:"name"`
Port string `json:"port"`
Port int `json:"port"`
}

type FullJobID struct {
JobSetID schsdk.JobSetID
LocalJobID string
JobSetID schsdk.JobSetID `json:"jobSetID"`
LocalJobID string `json:"localJobID"`
}

+ 1
- 0
sdks/storage/object.go View File

@@ -102,6 +102,7 @@ type ObjectDownload struct {
ObjectID ObjectID `form:"objectID" json:"objectID" binding:"required"`
Offset int64 `form:"offset" json:"offset,omitempty"`
Length *int64 `form:"length" json:"length,omitempty"`
PartSize int64 `form:"partSize" json:"partSize,omitempty"`
}
type DownloadingObject struct {
Path string


+ 147
- 63
utils/http/http.go View File

@@ -6,14 +6,17 @@ import (
"io"
"mime"
"mime/multipart"
"net"
"net/http"
"net/textproto"
ul "net/url"
"reflect"
"strings"
"time"

"github.com/mitchellh/mapstructure"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/serder"
)
@@ -25,6 +28,16 @@ const (
ContentTypeOctetStream = "application/octet-stream"
)

var defaultClient = http.Client{
Timeout: time.Second * 5,
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: time.Second * 5,
KeepAlive: 0, // 修改为 0 可以生效
}).DialContext,
},
}

type RequestParam struct {
Header any
Query any
@@ -49,7 +62,7 @@ func GetJSON(url string, param RequestParam) (*http.Response, error) {
return nil, err
}

return http.DefaultClient.Do(req)
return defaultClient.Do(req)
}

func GetForm(url string, param RequestParam) (*http.Response, error) {
@@ -70,7 +83,7 @@ func GetForm(url string, param RequestParam) (*http.Response, error) {
return nil, err
}

return http.DefaultClient.Do(req)
return defaultClient.Do(req)
}

func PostJSON(url string, param RequestParam) (*http.Response, error) {
@@ -91,7 +104,7 @@ func PostJSON(url string, param RequestParam) (*http.Response, error) {
return nil, err
}

return http.DefaultClient.Do(req)
return defaultClient.Do(req)
}

func PostForm(url string, param RequestParam) (*http.Response, error) {
@@ -112,7 +125,7 @@ func PostForm(url string, param RequestParam) (*http.Response, error) {
return nil, err
}

return http.DefaultClient.Do(req)
return defaultClient.Do(req)
}

func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) {
@@ -143,56 +156,96 @@ type MultiPartFile struct {
}

type multiPartFileIterator struct {
mr *multipart.Reader
firstFile *multipart.Part
iterErr error
mr *multipart.Reader
nextPart *multipart.Part
nextFileName string
}

// 不可异步调用
func (m *multiPartFileIterator) MoveNext() (*MultiPartFile, error) {
if m.firstFile != nil {
f := m.firstFile
m.firstFile = nil
if m.iterErr != nil {
return nil, m.iterErr
}

return &MultiPartFile{
FieldName: m.nextPart.FormName(),
FileName: m.nextFileName,
File: &multiPartFileReader{
iter: m,
curPart: m.nextPart,
fileName: m.nextFileName,
},
Header: m.nextPart.Header,
}, nil
}

fileName, err := ul.PathUnescape(f.FileName())
if err != nil {
return nil, fmt.Errorf("unescape file name: %w", err)
}
func (m *multiPartFileIterator) Close() {}

type multiPartFileReader struct {
iter *multiPartFileIterator
curPart *multipart.Part
fileName string
}

return &MultiPartFile{
FieldName: f.FormName(),
FileName: fileName,
File: f,
Header: f.Header,
}, nil
func (r *multiPartFileReader) Read(p []byte) (int, error) {
if r.curPart != nil {
return r.readFromCurPart(p)
}

for {
part, err := m.mr.NextPart()
if err == io.EOF {
return nil, iterator.ErrNoMoreItem
}
if err != nil {
return nil, err
}
part, err := r.iter.mr.NextPart()
if err == io.EOF {
r.iter.iterErr = iterator.ErrNoMoreItem
return 0, io.EOF
}
if err != nil {
r.iter.iterErr = err
return 0, err
}
r.curPart = part

fileName, err := ul.PathUnescape(part.FileName())
if err != nil {
return nil, fmt.Errorf("unescape file name: %w", err)
}
fileName, err := ul.PathUnescape(part.FileName())
if err != nil {
r.iter.iterErr = fmt.Errorf("unescape file name: %w", err)
return 0, r.iter.iterErr
}

if part.FileName() != "" {
return &MultiPartFile{
FieldName: part.FormName(),
FileName: fileName,
File: part,
Header: part.Header,
}, nil
}
if fileName == r.fileName {
return r.readFromCurPart(p)
}

// 如果新一个part的文件名不同,则开始一个新文件
r.iter.nextPart = part
r.iter.nextFileName = fileName
return 0, io.EOF
}

func (m *multiPartFileIterator) Close() {}
func (r *multiPartFileReader) readFromCurPart(p []byte) (int, error) {
rd, err := r.curPart.Read(p)
if err == io.EOF {
r.curPart = nil
return rd, nil
}
if err != nil {
return 0, err
}

return rd, nil
}

// 解析multipart/form-data响应,只支持form参数在头部的情况
func (m *multiPartFileReader) Close() error {
// 如果在文件还未读取完毕的时候关闭文件,则后续所有文件都无法再读取(暂不支持跳文件读取)
if m.curPart != nil {
m.curPart.Close()
m.iter.iterErr = fmt.Errorf("stream closed")
}

return nil
}

// 解析multipart/form-data响应。
// - 只支持form参数在头部的情况。
// - 支持文件分片(拆分成连续的同文件名的part)
func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.Iterator[*MultiPartFile], error) {
mtype, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
@@ -211,7 +264,8 @@ func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.It
formValues := make(map[string]string)
rd := multipart.NewReader(resp.Body, boundary)

var firstFile *multipart.Part
var firstFilePart *multipart.Part
var firstFileName string
for {
part, err := rd.NextPart()
if err == io.EOF {
@@ -229,7 +283,12 @@ func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.It
}

if fileName != "" {
firstFile = part
firstFilePart = part
firstFileName, err = ul.PathUnescape(part.FileName())
if err != nil {
return nil, nil, fmt.Errorf("unescape file name: %w", err)
}

break
}

@@ -242,16 +301,18 @@ func ParseMultiPartResponse(resp *http.Response) (map[string]string, iterator.It
}

return formValues, &multiPartFileIterator{
mr: rd,
firstFile: firstFile,
mr: rd,
nextPart: firstFilePart,
nextFileName: firstFileName,
}, nil
}

type MultiPartRequestParam struct {
Header any
Query any
Form any
Files MultiPartFileIterator
Header any
Query any
Form any
Files MultiPartFileIterator
PartSize int64 // 文件分片大小,如果为0则不分片
}

type MultiPartFileIterator = iterator.Iterator[*IterMultiPartFile]
@@ -309,20 +370,12 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err
return fmt.Errorf("opening file: %w", err)
}

err = func() error {
defer file.File.Close()

w, err := muWriter.CreateFormFile(file.FieldName, ul.PathEscape(file.FileName))
if err != nil {
return fmt.Errorf("create form file failed, err: %w", err)
}

_, err = io.Copy(w, file.File)
if err != nil {
return err
}
return nil
}()
if param.PartSize == 0 {
err = sendFileOnePart(muWriter, file.FieldName, file.FileName, file.File)
} else {
err = sendFileMultiPart(muWriter, file.FieldName, file.FileName, file.File, param.PartSize)
}
file.File.Close()
if err != nil {
return err
}
@@ -348,6 +401,37 @@ func PostMultiPart(url string, param MultiPartRequestParam) (*http.Response, err
return resp, nil
}

func sendFileMultiPart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser, partSize int64) error {
for {
w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName))
if err != nil {
return fmt.Errorf("create form file failed, err: %w", err)
}

_, err = io.Copy(w, io2.MustLength(file, partSize))
if err == io.EOF {
continue
}
if err == io.ErrUnexpectedEOF {
break
}
if err != nil {
return err
}
}
return nil
}

func sendFileOnePart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser) error {
w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName))
if err != nil {
return fmt.Errorf("create form file failed, err: %w", err)
}

_, err = io.Copy(w, file)
return err
}

func prepareQuery(req *http.Request, query any) error {
if query == nil {
return nil


Loading…
Cancel
Save