Reviewed-by: palytoxin <sxty32@126.com>tags/v1.21.12.1
@@ -27,11 +27,9 @@ type ErrorResponse struct { | |||
func (e ErrorResponse) Error() string { | |||
return e.Message | |||
} | |||
const ( | |||
reportIssue = "Please report this issue at https://github.com/minio/minio/issues." | |||
) | |||
// httpRespToErrorResponse returns a new encoded ErrorResponse | |||
// structure as error. | |||
func httpRespToErrorResponse(resp *http.Response, bucketName, objectName string) error { | |||
@@ -124,7 +122,6 @@ func ToErrorResponse(err error) ErrorResponse { | |||
return ErrorResponse{} | |||
} | |||
} | |||
// ErrInvalidArgument - Invalid argument response. | |||
func ErrInvalidArgument(message string) error { | |||
return ErrorResponse{ | |||
@@ -0,0 +1,90 @@ | |||
package minio_ext | |||
import ( | |||
"context" | |||
"fmt" | |||
"net/http" | |||
"net/url" | |||
"strings" | |||
) | |||
// ListObjectParts list all object parts recursively. | |||
func (c Client) ListObjectParts(bucketName, objectName, uploadID string) (partsInfo map[int]ObjectPart, err error) { | |||
// Part number marker for the next batch of request. | |||
var nextPartNumberMarker int | |||
partsInfo = make(map[int]ObjectPart) | |||
for { | |||
// Get list of uploaded parts a maximum of 1000 per request. | |||
listObjPartsResult, err := c.listObjectPartsQuery(bucketName, objectName, uploadID, nextPartNumberMarker, 1000) | |||
if err != nil { | |||
return nil, err | |||
} | |||
// Append to parts info. | |||
for _, part := range listObjPartsResult.ObjectParts { | |||
// Trim off the odd double quotes from ETag in the beginning and end. | |||
part.ETag = strings.TrimPrefix(part.ETag, "\"") | |||
part.ETag = strings.TrimSuffix(part.ETag, "\"") | |||
partsInfo[part.PartNumber] = part | |||
} | |||
// Keep part number marker, for the next iteration. | |||
nextPartNumberMarker = listObjPartsResult.NextPartNumberMarker | |||
// Listing ends result is not truncated, return right here. | |||
if !listObjPartsResult.IsTruncated { | |||
break | |||
} | |||
} | |||
// Return all the parts. | |||
return partsInfo, nil | |||
} | |||
// listObjectPartsQuery (List Parts query) | |||
// - lists some or all (up to 1000) parts that have been uploaded | |||
// for a specific multipart upload | |||
// | |||
// You can use the request parameters as selection criteria to return | |||
// a subset of the uploads in a bucket, request parameters :- | |||
// --------- | |||
// ?part-number-marker - Specifies the part after which listing should | |||
// begin. | |||
// ?max-parts - Maximum parts to be listed per request. | |||
func (c Client) listObjectPartsQuery(bucketName, objectName, uploadID string, partNumberMarker, maxParts int) (ListObjectPartsResult, error) { | |||
// Get resources properly escaped and lined up before using them in http request. | |||
urlValues := make(url.Values) | |||
// Set part number marker. | |||
urlValues.Set("part-number-marker", fmt.Sprintf("%d", partNumberMarker)) | |||
// Set upload id. | |||
urlValues.Set("uploadId", uploadID) | |||
// maxParts should be 1000 or less. | |||
if maxParts == 0 || maxParts > 1000 { | |||
maxParts = 1000 | |||
} | |||
// Set max parts. | |||
urlValues.Set("max-parts", fmt.Sprintf("%d", maxParts)) | |||
// Execute GET on objectName to get list of parts. | |||
resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{ | |||
bucketName: bucketName, | |||
objectName: objectName, | |||
queryValues: urlValues, | |||
contentSHA256Hex: emptySHA256Hex, | |||
}) | |||
defer closeResponse(resp) | |||
if err != nil { | |||
return ListObjectPartsResult{}, err | |||
} | |||
if resp != nil { | |||
if resp.StatusCode != http.StatusOK { | |||
return ListObjectPartsResult{}, httpRespToErrorResponse(resp, bucketName, objectName) | |||
} | |||
} | |||
// Decode list object parts XML. | |||
listObjectPartsResult := ListObjectPartsResult{} | |||
err = xmlDecoder(resp.Body, &listObjectPartsResult) | |||
if err != nil { | |||
return listObjectPartsResult, err | |||
} | |||
return listObjectPartsResult, nil | |||
} |
@@ -0,0 +1,71 @@ | |||
/* | |||
* Minio Go Library for Amazon S3 Compatible Cloud Storage | |||
* Copyright 2015-2017 Minio, Inc. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package minio_ext | |||
import ( | |||
"time" | |||
) | |||
// owner container for bucket owner information. | |||
type owner struct { | |||
DisplayName string | |||
ID string | |||
} | |||
// initiator container for who initiated multipart upload. | |||
type initiator struct { | |||
ID string | |||
DisplayName string | |||
} | |||
// ObjectPart container for particular part of an object. | |||
type ObjectPart struct { | |||
// Part number identifies the part. | |||
PartNumber int | |||
// Date and time the part was uploaded. | |||
LastModified time.Time | |||
// Entity tag returned when the part was uploaded, usually md5sum | |||
// of the part. | |||
ETag string | |||
// Size of the uploaded part data. | |||
Size int64 | |||
} | |||
// ListObjectPartsResult container for ListObjectParts response. | |||
type ListObjectPartsResult struct { | |||
Bucket string | |||
Key string | |||
UploadID string `xml:"UploadId"` | |||
Initiator initiator | |||
Owner owner | |||
StorageClass string | |||
PartNumberMarker int | |||
NextPartNumberMarker int | |||
MaxParts int | |||
// Indicates whether the returned list of parts is truncated. | |||
IsTruncated bool | |||
ObjectParts []ObjectPart `xml:"Part"` | |||
EncodingType string | |||
} |
@@ -1,6 +1,8 @@ | |||
package minio_ext | |||
import ( | |||
"bytes" | |||
"context" | |||
"errors" | |||
"fmt" | |||
"io" | |||
@@ -12,6 +14,7 @@ import ( | |||
"net/http/cookiejar" | |||
"net/http/httputil" | |||
"net/url" | |||
"os" | |||
"path" | |||
"runtime" | |||
"strconv" | |||
@@ -25,23 +28,6 @@ import ( | |||
"golang.org/x/net/publicsuffix" | |||
) | |||
// MaxRetry is the maximum number of retries before stopping. | |||
var MaxRetry = 10 | |||
// MaxJitter will randomize over the full exponential backoff time | |||
const MaxJitter = 1.0 | |||
// NoJitter disables the use of jitter for randomizing the exponential backoff time | |||
const NoJitter = 0.0 | |||
// DefaultRetryUnit - default unit multiplicative per retry. | |||
// defaults to 1 second. | |||
const DefaultRetryUnit = time.Second | |||
// DefaultRetryCap - Each retry attempt never waits no longer than | |||
// this maximum time duration. | |||
const DefaultRetryCap = time.Second * 30 | |||
// Global constants. | |||
const ( | |||
libraryName = "minio-go" | |||
@@ -156,6 +142,7 @@ func (r *lockedRandSource) Seed(seed int64) { | |||
r.lk.Unlock() | |||
} | |||
// Different types of url lookup supported by the server.Initialized to BucketLookupAuto | |||
const ( | |||
BucketLookupAuto BucketLookupType = iota | |||
@@ -163,21 +150,6 @@ const ( | |||
BucketLookupPath | |||
) | |||
// List of AWS S3 error codes which are retryable. | |||
var retryableS3Codes = map[string]struct{}{ | |||
"RequestError": {}, | |||
"RequestTimeout": {}, | |||
"Throttling": {}, | |||
"ThrottlingException": {}, | |||
"RequestLimitExceeded": {}, | |||
"RequestThrottled": {}, | |||
"InternalError": {}, | |||
"ExpiredToken": {}, | |||
"ExpiredTokenException": {}, | |||
"SlowDown": {}, | |||
// Add more AWS S3 codes here. | |||
} | |||
// awsS3EndpointMap Amazon S3 endpoint map. | |||
var awsS3EndpointMap = map[string]string{ | |||
"us-east-1": "s3.dualstack.us-east-1.amazonaws.com", | |||
@@ -253,16 +225,6 @@ var successStatus = []int{ | |||
http.StatusPartialContent, | |||
} | |||
// List of HTTP status codes which are retryable. | |||
var retryableHTTPStatusCodes = map[int]struct{}{ | |||
429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet | |||
http.StatusInternalServerError: {}, | |||
http.StatusBadGateway: {}, | |||
http.StatusServiceUnavailable: {}, | |||
http.StatusGatewayTimeout: {}, | |||
// Add more HTTP status codes here. | |||
} | |||
// newBucketLocationCache - Provides a new bucket location cache to be | |||
// used internally with the client object. | |||
func newBucketLocationCache() *bucketLocationCache { | |||
@@ -406,55 +368,6 @@ func New(endpoint, accessKeyID, secretAccessKey string, secure bool) (*Client, e | |||
return clnt, nil | |||
} | |||
// isHTTPStatusRetryable - is HTTP error code retryable. | |||
func isHTTPStatusRetryable(httpStatusCode int) (ok bool) { | |||
_, ok = retryableHTTPStatusCodes[httpStatusCode] | |||
return ok | |||
} | |||
// newRetryTimer creates a timer with exponentially increasing | |||
// delays until the maximum retry attempts are reached. | |||
func (c Client) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { | |||
attemptCh := make(chan int) | |||
// computes the exponential backoff duration according to | |||
// https://www.awsarchitectureblog.com/2015/03/backoff.html | |||
exponentialBackoffWait := func(attempt int) time.Duration { | |||
// normalize jitter to the range [0, 1.0] | |||
if jitter < NoJitter { | |||
jitter = NoJitter | |||
} | |||
if jitter > MaxJitter { | |||
jitter = MaxJitter | |||
} | |||
//sleep = random_between(0, min(cap, base * 2 ** attempt)) | |||
sleep := unit * time.Duration(1<<uint(attempt)) | |||
if sleep > cap { | |||
sleep = cap | |||
} | |||
if jitter != NoJitter { | |||
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter) | |||
} | |||
return sleep | |||
} | |||
go func() { | |||
defer close(attemptCh) | |||
for i := 0; i < maxRetry; i++ { | |||
select { | |||
// Attempts start from 1. | |||
case attemptCh <- i + 1: | |||
case <-doneCh: | |||
// Stop the routine. | |||
return | |||
} | |||
time.Sleep(exponentialBackoffWait(i)) | |||
} | |||
}() | |||
return attemptCh | |||
} | |||
// Get - Returns a value of a given key if it exists. | |||
func (r *bucketLocationCache) Get(bucketName string) (location string, ok bool) { | |||
r.RLock() | |||
@@ -991,7 +904,8 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R | |||
return req, nil | |||
} | |||
func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objectName string, partNumber int, size int64, expires time.Duration, bucketLocation string) (string, error) { | |||
func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objectName string, partNumber int, size int64, expires time.Duration, bucketLocation string) (string, error){ | |||
signedUrl := "" | |||
// Input validation. | |||
@@ -1025,17 +939,17 @@ func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objec | |||
customHeader := make(http.Header) | |||
reqMetadata := requestMetadata{ | |||
presignURL: true, | |||
bucketName: bucketName, | |||
objectName: objectName, | |||
queryValues: urlValues, | |||
customHeader: customHeader, | |||
presignURL: true, | |||
bucketName: bucketName, | |||
objectName: objectName, | |||
queryValues: urlValues, | |||
customHeader: customHeader, | |||
//contentBody: reader, | |||
contentLength: size, | |||
contentLength: size, | |||
//contentMD5Base64: md5Base64, | |||
//contentSHA256Hex: sha256Hex, | |||
expires: int64(expires / time.Second), | |||
bucketLocation: bucketLocation, | |||
expires: int64(expires/time.Second), | |||
bucketLocation: bucketLocation, | |||
} | |||
req, err := c.newRequest("PUT", reqMetadata) | |||
@@ -1045,5 +959,142 @@ func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objec | |||
} | |||
signedUrl = req.URL.String() | |||
return signedUrl, nil | |||
return signedUrl,nil | |||
} | |||
// executeMethod - instantiates a given method, and retries the | |||
// request upon any error up to maxRetries attempts in a binomially | |||
// delayed manner using a standard back off algorithm. | |||
func (c Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) { | |||
var isRetryable bool // Indicates if request can be retried. | |||
var bodySeeker io.Seeker // Extracted seeker from io.Reader. | |||
var reqRetry = MaxRetry // Indicates how many times we can retry the request | |||
if metadata.contentBody != nil { | |||
// Check if body is seekable then it is retryable. | |||
bodySeeker, isRetryable = metadata.contentBody.(io.Seeker) | |||
switch bodySeeker { | |||
case os.Stdin, os.Stdout, os.Stderr: | |||
isRetryable = false | |||
} | |||
// Retry only when reader is seekable | |||
if !isRetryable { | |||
reqRetry = 1 | |||
} | |||
// Figure out if the body can be closed - if yes | |||
// we will definitely close it upon the function | |||
// return. | |||
bodyCloser, ok := metadata.contentBody.(io.Closer) | |||
if ok { | |||
defer bodyCloser.Close() | |||
} | |||
} | |||
// Create a done channel to control 'newRetryTimer' go routine. | |||
doneCh := make(chan struct{}, 1) | |||
// Indicate to our routine to exit cleanly upon return. | |||
defer close(doneCh) | |||
// Blank indentifier is kept here on purpose since 'range' without | |||
// blank identifiers is only supported since go1.4 | |||
// https://golang.org/doc/go1.4#forrange. | |||
for range c.newRetryTimer(reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter, doneCh) { | |||
// Retry executes the following function body if request has an | |||
// error until maxRetries have been exhausted, retry attempts are | |||
// performed after waiting for a given period of time in a | |||
// binomial fashion. | |||
if isRetryable { | |||
// Seek back to beginning for each attempt. | |||
if _, err = bodySeeker.Seek(0, 0); err != nil { | |||
// If seek failed, no need to retry. | |||
return nil, err | |||
} | |||
} | |||
// Instantiate a new request. | |||
var req *http.Request | |||
req, err = c.newRequest(method, metadata) | |||
if err != nil { | |||
errResponse := ToErrorResponse(err) | |||
if isS3CodeRetryable(errResponse.Code) { | |||
continue // Retry. | |||
} | |||
return nil, err | |||
} | |||
// Add context to request | |||
req = req.WithContext(ctx) | |||
// Initiate the request. | |||
res, err = c.do(req) | |||
if err != nil { | |||
// For supported http requests errors verify. | |||
if isHTTPReqErrorRetryable(err) { | |||
continue // Retry. | |||
} | |||
// For other errors, return here no need to retry. | |||
return nil, err | |||
} | |||
// For any known successful http status, return quickly. | |||
for _, httpStatus := range successStatus { | |||
if httpStatus == res.StatusCode { | |||
return res, nil | |||
} | |||
} | |||
// Read the body to be saved later. | |||
errBodyBytes, err := ioutil.ReadAll(res.Body) | |||
// res.Body should be closed | |||
closeResponse(res) | |||
if err != nil { | |||
return nil, err | |||
} | |||
// Save the body. | |||
errBodySeeker := bytes.NewReader(errBodyBytes) | |||
res.Body = ioutil.NopCloser(errBodySeeker) | |||
// For errors verify if its retryable otherwise fail quickly. | |||
errResponse := ToErrorResponse(httpRespToErrorResponse(res, metadata.bucketName, metadata.objectName)) | |||
// Save the body back again. | |||
errBodySeeker.Seek(0, 0) // Seek back to starting point. | |||
res.Body = ioutil.NopCloser(errBodySeeker) | |||
// Bucket region if set in error response and the error | |||
// code dictates invalid region, we can retry the request | |||
// with the new region. | |||
// | |||
// Additionally we should only retry if bucketLocation and custom | |||
// region is empty. | |||
if metadata.bucketLocation == "" && c.region == "" { | |||
if errResponse.Code == "AuthorizationHeaderMalformed" || errResponse.Code == "InvalidRegion" { | |||
if metadata.bucketName != "" && errResponse.Region != "" { | |||
// Gather Cached location only if bucketName is present. | |||
if _, cachedLocationError := c.bucketLocCache.Get(metadata.bucketName); cachedLocationError != false { | |||
c.bucketLocCache.Set(metadata.bucketName, errResponse.Region) | |||
continue // Retry. | |||
} | |||
} | |||
} | |||
} | |||
// Verify if error response code is retryable. | |||
if isS3CodeRetryable(errResponse.Code) { | |||
continue // Retry. | |||
} | |||
// Verify if http status code is retryable. | |||
if isHTTPStatusRetryable(res.StatusCode) { | |||
continue // Retry. | |||
} | |||
// For all other cases break out of the retry loop. | |||
break | |||
} | |||
return res, err | |||
} |
@@ -8,12 +8,10 @@ import ( | |||
// StringMap represents map with custom UnmarshalXML | |||
type StringMap map[string]string | |||
// CommonPrefix container for prefix response. | |||
type CommonPrefix struct { | |||
Prefix string | |||
} | |||
// ObjectInfo container for object metadata. | |||
type ObjectInfo struct { | |||
// An ETag is optionally set to md5sum of an object. In case of multipart objects, | |||
@@ -46,7 +44,6 @@ type ObjectInfo struct { | |||
// Error | |||
Err error `json:"-"` | |||
} | |||
// ListBucketResult container for listObjects response. | |||
type ListBucketResult struct { | |||
// A response can contain CommonPrefixes only if you have | |||
@@ -0,0 +1,153 @@ | |||
/* | |||
* Minio Go Library for Amazon S3 Compatible Cloud Storage | |||
* Copyright 2015-2017 Minio, Inc. | |||
* | |||
* Licensed under the Apache License, Version 2.0 (the "License"); | |||
* you may not use this file except in compliance with the License. | |||
* You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package minio_ext | |||
import ( | |||
"net" | |||
"net/http" | |||
"net/url" | |||
"strings" | |||
"time" | |||
) | |||
// MaxRetry is the maximum number of retries before stopping. | |||
var MaxRetry = 10 | |||
// MaxJitter will randomize over the full exponential backoff time | |||
const MaxJitter = 1.0 | |||
// NoJitter disables the use of jitter for randomizing the exponential backoff time | |||
const NoJitter = 0.0 | |||
// DefaultRetryUnit - default unit multiplicative per retry. | |||
// defaults to 1 second. | |||
const DefaultRetryUnit = time.Second | |||
// DefaultRetryCap - Each retry attempt never waits no longer than | |||
// this maximum time duration. | |||
const DefaultRetryCap = time.Second * 30 | |||
// newRetryTimer creates a timer with exponentially increasing | |||
// delays until the maximum retry attempts are reached. | |||
func (c Client) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { | |||
attemptCh := make(chan int) | |||
// computes the exponential backoff duration according to | |||
// https://www.awsarchitectureblog.com/2015/03/backoff.html | |||
exponentialBackoffWait := func(attempt int) time.Duration { | |||
// normalize jitter to the range [0, 1.0] | |||
if jitter < NoJitter { | |||
jitter = NoJitter | |||
} | |||
if jitter > MaxJitter { | |||
jitter = MaxJitter | |||
} | |||
//sleep = random_between(0, min(cap, base * 2 ** attempt)) | |||
sleep := unit * time.Duration(1<<uint(attempt)) | |||
if sleep > cap { | |||
sleep = cap | |||
} | |||
if jitter != NoJitter { | |||
sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter) | |||
} | |||
return sleep | |||
} | |||
go func() { | |||
defer close(attemptCh) | |||
for i := 0; i < maxRetry; i++ { | |||
select { | |||
// Attempts start from 1. | |||
case attemptCh <- i + 1: | |||
case <-doneCh: | |||
// Stop the routine. | |||
return | |||
} | |||
time.Sleep(exponentialBackoffWait(i)) | |||
} | |||
}() | |||
return attemptCh | |||
} | |||
// isHTTPReqErrorRetryable - is http requests error retryable, such | |||
// as i/o timeout, connection broken etc.. | |||
func isHTTPReqErrorRetryable(err error) bool { | |||
if err == nil { | |||
return false | |||
} | |||
switch e := err.(type) { | |||
case *url.Error: | |||
switch e.Err.(type) { | |||
case *net.DNSError, *net.OpError, net.UnknownNetworkError: | |||
return true | |||
} | |||
if strings.Contains(err.Error(), "Connection closed by foreign host") { | |||
return true | |||
} else if strings.Contains(err.Error(), "net/http: TLS handshake timeout") { | |||
// If error is - tlsHandshakeTimeoutError, retry. | |||
return true | |||
} else if strings.Contains(err.Error(), "i/o timeout") { | |||
// If error is - tcp timeoutError, retry. | |||
return true | |||
} else if strings.Contains(err.Error(), "connection timed out") { | |||
// If err is a net.Dial timeout, retry. | |||
return true | |||
} else if strings.Contains(err.Error(), "net/http: HTTP/1.x transport connection broken") { | |||
// If error is transport connection broken, retry. | |||
return true | |||
} | |||
} | |||
return false | |||
} | |||
// List of AWS S3 error codes which are retryable. | |||
var retryableS3Codes = map[string]struct{}{ | |||
"RequestError": {}, | |||
"RequestTimeout": {}, | |||
"Throttling": {}, | |||
"ThrottlingException": {}, | |||
"RequestLimitExceeded": {}, | |||
"RequestThrottled": {}, | |||
"InternalError": {}, | |||
"ExpiredToken": {}, | |||
"ExpiredTokenException": {}, | |||
"SlowDown": {}, | |||
// Add more AWS S3 codes here. | |||
} | |||
// isS3CodeRetryable - is s3 error code retryable. | |||
func isS3CodeRetryable(s3Code string) (ok bool) { | |||
_, ok = retryableS3Codes[s3Code] | |||
return ok | |||
} | |||
// List of HTTP status codes which are retryable. | |||
var retryableHTTPStatusCodes = map[int]struct{}{ | |||
429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet | |||
http.StatusInternalServerError: {}, | |||
http.StatusBadGateway: {}, | |||
http.StatusServiceUnavailable: {}, | |||
// Add more HTTP status codes here. | |||
} | |||
// isHTTPStatusRetryable - is HTTP error code retryable. | |||
func isHTTPStatusRetryable(httpStatusCode int) (ok bool) { | |||
_, ok = retryableHTTPStatusCodes[httpStatusCode] | |||
return ok | |||
} |
@@ -129,8 +129,8 @@ func NewMultiPartUpload(uuid string) (string, error) { | |||
return core.NewMultipartUpload(bucketName, objectName, miniov6.PutObjectOptions{}) | |||
} | |||
func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) (string, error) { | |||
_, core, err := getClients() | |||
func CompleteMultiPartUpload(uuid string, uploadID string) (string, error) { | |||
client, core, err := getClients() | |||
if err != nil { | |||
log.Error("getClients failed:", err.Error()) | |||
return "", err | |||
@@ -140,16 +140,17 @@ func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) | |||
bucketName := minio.Bucket | |||
objectName := strings.TrimPrefix(path.Join(minio.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | |||
partInfos, err := client.ListObjectParts(bucketName, objectName, uploadID) | |||
if err != nil { | |||
log.Error("ListObjectParts failed:", err.Error()) | |||
return "", err | |||
} | |||
var complMultipartUpload completeMultipartUpload | |||
for _, part := range complParts { | |||
partNumber, err := strconv.Atoi(strings.Split(part, "-")[0]) | |||
if err != nil { | |||
log.Error(err.Error()) | |||
return "", err | |||
} | |||
for _, partInfo := range partInfos { | |||
complMultipartUpload.Parts = append(complMultipartUpload.Parts, miniov6.CompletePart{ | |||
PartNumber: partNumber, | |||
ETag: strings.Split(part, "-")[1], | |||
PartNumber: partInfo.PartNumber, | |||
ETag: partInfo.ETag, | |||
}) | |||
} | |||
@@ -158,3 +159,28 @@ func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) | |||
return core.CompleteMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload.Parts) | |||
} | |||
func GetPartInfos(uuid string, uploadID string) (string, error) { | |||
minioClient, _, err := getClients() | |||
if err != nil { | |||
log.Error("getClients failed:", err.Error()) | |||
return "", err | |||
} | |||
minio := setting.Attachment.Minio | |||
bucketName := minio.Bucket | |||
objectName := strings.TrimPrefix(path.Join(minio.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | |||
partInfos, err := minioClient.ListObjectParts(bucketName, objectName, uploadID) | |||
if err != nil { | |||
log.Error("ListObjectParts failed:", err.Error()) | |||
return "", err | |||
} | |||
var chunks string | |||
for _, partInfo := range partInfos { | |||
chunks += strconv.Itoa(partInfo.PartNumber) + "-" + partInfo.ETag + "," | |||
} | |||
return chunks, nil | |||
} |
@@ -6,7 +6,6 @@ package repo | |||
import ( | |||
contexExt "context" | |||
"encoding/json" | |||
"fmt" | |||
"net/http" | |||
"strconv" | |||
@@ -333,6 +332,7 @@ func UpdateAttachmentDecompressState(ctx *context.Context) { | |||
func GetSuccessChunks(ctx *context.Context) { | |||
fileMD5 := ctx.Query("md5") | |||
var chunks string | |||
fileChunk, err := models.GetFileChunkByMD5AndUser(fileMD5, ctx.User.ID) | |||
if err != nil { | |||
@@ -349,12 +349,36 @@ func GetSuccessChunks(ctx *context.Context) { | |||
return | |||
} | |||
chunks, err := json.Marshal(fileChunk.CompletedParts) | |||
isExist, err := storage.Attachments.HasObject(models.AttachmentRelativePath(fileChunk.UUID)) | |||
if err != nil { | |||
ctx.ServerError("json.Marshal failed", err) | |||
ctx.ServerError("HasObject failed", err) | |||
return | |||
} | |||
if isExist { | |||
if fileChunk.IsUploaded == models.FileNotUploaded { | |||
log.Info("the file has been uploaded but not recorded") | |||
fileChunk.IsUploaded = models.FileUploaded | |||
if err = models.UpdateFileChunk(fileChunk); err != nil { | |||
log.Error("UpdateFileChunk failed:", err.Error()) | |||
} | |||
} | |||
} else { | |||
if fileChunk.IsUploaded == models.FileUploaded { | |||
log.Info("the file has been recorded but not uploaded") | |||
fileChunk.IsUploaded = models.FileNotUploaded | |||
if err = models.UpdateFileChunk(fileChunk); err != nil { | |||
log.Error("UpdateFileChunk failed:", err.Error()) | |||
} | |||
} | |||
chunks, err = storage.GetPartInfos(fileChunk.UUID, fileChunk.UploadID) | |||
if err != nil { | |||
ctx.ServerError("json.Marshal failed", err) | |||
return | |||
} | |||
} | |||
var attachID int64 | |||
attach, err := models.GetAttachmentByUUID(fileChunk.UUID) | |||
if err != nil { | |||
@@ -479,7 +503,7 @@ func CompleteMultipart(ctx *context.Context) { | |||
return | |||
} | |||
_, err = storage.CompleteMultiPartUpload(uuid, uploadID, fileChunk.CompletedParts) | |||
_, err = storage.CompleteMultiPartUpload(uuid, uploadID) | |||
if err != nil { | |||
ctx.Error(500, fmt.Sprintf("CompleteMultiPartUpload failed: %v", err)) | |||
return | |||
@@ -358,7 +358,7 @@ export default { | |||
await uploadMinio(urls[currentChunk], e); | |||
if (etags[currentChunk] != '') { | |||
// 更新数据库:分片上传结果 | |||
await updateChunk(currentChunk); | |||
//await updateChunk(currentChunk); | |||
} else { | |||
console.log("上传到minio uploadChunk etags[currentChunk] == ''");// TODO | |||
} | |||
@@ -390,7 +390,7 @@ export default { | |||
let successParts = []; | |||
successParts = file.chunks.split(','); | |||
for (let i = 0; i < successParts.length; i++) { | |||
successChunks[i] = successParts[i].split('-')[0].split('"')[1]; | |||
successChunks[i] = successParts[i].split('-')[0]; | |||
} | |||
const urls = []; // TODO const ? | |||
const etags = []; | |||