| @@ -27,11 +27,9 @@ type ErrorResponse struct { | |||||
| func (e ErrorResponse) Error() string { | func (e ErrorResponse) Error() string { | ||||
| return e.Message | return e.Message | ||||
| } | } | ||||
| const ( | const ( | ||||
| reportIssue = "Please report this issue at https://github.com/minio/minio/issues." | reportIssue = "Please report this issue at https://github.com/minio/minio/issues." | ||||
| ) | ) | ||||
| // httpRespToErrorResponse returns a new encoded ErrorResponse | // httpRespToErrorResponse returns a new encoded ErrorResponse | ||||
| // structure as error. | // structure as error. | ||||
| func httpRespToErrorResponse(resp *http.Response, bucketName, objectName string) error { | func httpRespToErrorResponse(resp *http.Response, bucketName, objectName string) error { | ||||
| @@ -124,7 +122,6 @@ func ToErrorResponse(err error) ErrorResponse { | |||||
| return ErrorResponse{} | return ErrorResponse{} | ||||
| } | } | ||||
| } | } | ||||
| // ErrInvalidArgument - Invalid argument response. | // ErrInvalidArgument - Invalid argument response. | ||||
| func ErrInvalidArgument(message string) error { | func ErrInvalidArgument(message string) error { | ||||
| return ErrorResponse{ | return ErrorResponse{ | ||||
| @@ -0,0 +1,90 @@ | |||||
| package minio_ext | |||||
| import ( | |||||
| "context" | |||||
| "fmt" | |||||
| "net/http" | |||||
| "net/url" | |||||
| "strings" | |||||
| ) | |||||
| // ListObjectParts list all object parts recursively. | |||||
| func (c Client) ListObjectParts(bucketName, objectName, uploadID string) (partsInfo map[int]ObjectPart, err error) { | |||||
| // Part number marker for the next batch of request. | |||||
| var nextPartNumberMarker int | |||||
| partsInfo = make(map[int]ObjectPart) | |||||
| for { | |||||
| // Get list of uploaded parts a maximum of 1000 per request. | |||||
| listObjPartsResult, err := c.listObjectPartsQuery(bucketName, objectName, uploadID, nextPartNumberMarker, 1000) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| // Append to parts info. | |||||
| for _, part := range listObjPartsResult.ObjectParts { | |||||
| // Trim off the odd double quotes from ETag in the beginning and end. | |||||
| part.ETag = strings.TrimPrefix(part.ETag, "\"") | |||||
| part.ETag = strings.TrimSuffix(part.ETag, "\"") | |||||
| partsInfo[part.PartNumber] = part | |||||
| } | |||||
| // Keep part number marker, for the next iteration. | |||||
| nextPartNumberMarker = listObjPartsResult.NextPartNumberMarker | |||||
| // Listing ends result is not truncated, return right here. | |||||
| if !listObjPartsResult.IsTruncated { | |||||
| break | |||||
| } | |||||
| } | |||||
| // Return all the parts. | |||||
| return partsInfo, nil | |||||
| } | |||||
| // listObjectPartsQuery (List Parts query) | |||||
| // - lists some or all (up to 1000) parts that have been uploaded | |||||
| // for a specific multipart upload | |||||
| // | |||||
| // You can use the request parameters as selection criteria to return | |||||
| // a subset of the uploads in a bucket, request parameters :- | |||||
| // --------- | |||||
| // ?part-number-marker - Specifies the part after which listing should | |||||
| // begin. | |||||
| // ?max-parts - Maximum parts to be listed per request. | |||||
| func (c Client) listObjectPartsQuery(bucketName, objectName, uploadID string, partNumberMarker, maxParts int) (ListObjectPartsResult, error) { | |||||
| // Get resources properly escaped and lined up before using them in http request. | |||||
| urlValues := make(url.Values) | |||||
| // Set part number marker. | |||||
| urlValues.Set("part-number-marker", fmt.Sprintf("%d", partNumberMarker)) | |||||
| // Set upload id. | |||||
| urlValues.Set("uploadId", uploadID) | |||||
| // maxParts should be 1000 or less. | |||||
| if maxParts == 0 || maxParts > 1000 { | |||||
| maxParts = 1000 | |||||
| } | |||||
| // Set max parts. | |||||
| urlValues.Set("max-parts", fmt.Sprintf("%d", maxParts)) | |||||
| // Execute GET on objectName to get list of parts. | |||||
| resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{ | |||||
| bucketName: bucketName, | |||||
| objectName: objectName, | |||||
| queryValues: urlValues, | |||||
| contentSHA256Hex: emptySHA256Hex, | |||||
| }) | |||||
| defer closeResponse(resp) | |||||
| if err != nil { | |||||
| return ListObjectPartsResult{}, err | |||||
| } | |||||
| if resp != nil { | |||||
| if resp.StatusCode != http.StatusOK { | |||||
| return ListObjectPartsResult{}, httpRespToErrorResponse(resp, bucketName, objectName) | |||||
| } | |||||
| } | |||||
| // Decode list object parts XML. | |||||
| listObjectPartsResult := ListObjectPartsResult{} | |||||
| err = xmlDecoder(resp.Body, &listObjectPartsResult) | |||||
| if err != nil { | |||||
| return listObjectPartsResult, err | |||||
| } | |||||
| return listObjectPartsResult, nil | |||||
| } | |||||
| @@ -0,0 +1,71 @@ | |||||
| /* | |||||
| * Minio Go Library for Amazon S3 Compatible Cloud Storage | |||||
| * Copyright 2015-2017 Minio, Inc. | |||||
| * | |||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||||
| * you may not use this file except in compliance with the License. | |||||
| * You may obtain a copy of the License at | |||||
| * | |||||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||||
| * | |||||
| * Unless required by applicable law or agreed to in writing, software | |||||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
| * See the License for the specific language governing permissions and | |||||
| * limitations under the License. | |||||
| */ | |||||
| package minio_ext | |||||
| import ( | |||||
| "time" | |||||
| ) | |||||
| // owner container for bucket owner information. | |||||
| type owner struct { | |||||
| DisplayName string | |||||
| ID string | |||||
| } | |||||
| // initiator container for who initiated multipart upload. | |||||
| type initiator struct { | |||||
| ID string | |||||
| DisplayName string | |||||
| } | |||||
| // ObjectPart container for particular part of an object. | |||||
| type ObjectPart struct { | |||||
| // Part number identifies the part. | |||||
| PartNumber int | |||||
| // Date and time the part was uploaded. | |||||
| LastModified time.Time | |||||
| // Entity tag returned when the part was uploaded, usually md5sum | |||||
| // of the part. | |||||
| ETag string | |||||
| // Size of the uploaded part data. | |||||
| Size int64 | |||||
| } | |||||
| // ListObjectPartsResult container for ListObjectParts response. | |||||
| type ListObjectPartsResult struct { | |||||
| Bucket string | |||||
| Key string | |||||
| UploadID string `xml:"UploadId"` | |||||
| Initiator initiator | |||||
| Owner owner | |||||
| StorageClass string | |||||
| PartNumberMarker int | |||||
| NextPartNumberMarker int | |||||
| MaxParts int | |||||
| // Indicates whether the returned list of parts is truncated. | |||||
| IsTruncated bool | |||||
| ObjectParts []ObjectPart `xml:"Part"` | |||||
| EncodingType string | |||||
| } | |||||
| @@ -1,6 +1,8 @@ | |||||
| package minio_ext | package minio_ext | ||||
| import ( | import ( | ||||
| "bytes" | |||||
| "context" | |||||
| "errors" | "errors" | ||||
| "fmt" | "fmt" | ||||
| "io" | "io" | ||||
| @@ -12,6 +14,7 @@ import ( | |||||
| "net/http/cookiejar" | "net/http/cookiejar" | ||||
| "net/http/httputil" | "net/http/httputil" | ||||
| "net/url" | "net/url" | ||||
| "os" | |||||
| "path" | "path" | ||||
| "runtime" | "runtime" | ||||
| "strconv" | "strconv" | ||||
| @@ -25,23 +28,6 @@ import ( | |||||
| "golang.org/x/net/publicsuffix" | "golang.org/x/net/publicsuffix" | ||||
| ) | ) | ||||
| // MaxRetry is the maximum number of retries before stopping. | |||||
| var MaxRetry = 10 | |||||
| // MaxJitter will randomize over the full exponential backoff time | |||||
| const MaxJitter = 1.0 | |||||
| // NoJitter disables the use of jitter for randomizing the exponential backoff time | |||||
| const NoJitter = 0.0 | |||||
| // DefaultRetryUnit - default unit multiplicative per retry. | |||||
| // defaults to 1 second. | |||||
| const DefaultRetryUnit = time.Second | |||||
| // DefaultRetryCap - Each retry attempt never waits no longer than | |||||
| // this maximum time duration. | |||||
| const DefaultRetryCap = time.Second * 30 | |||||
| // Global constants. | // Global constants. | ||||
| const ( | const ( | ||||
| libraryName = "minio-go" | libraryName = "minio-go" | ||||
| @@ -156,6 +142,7 @@ func (r *lockedRandSource) Seed(seed int64) { | |||||
| r.lk.Unlock() | r.lk.Unlock() | ||||
| } | } | ||||
| // Different types of url lookup supported by the server.Initialized to BucketLookupAuto | // Different types of url lookup supported by the server.Initialized to BucketLookupAuto | ||||
| const ( | const ( | ||||
| BucketLookupAuto BucketLookupType = iota | BucketLookupAuto BucketLookupType = iota | ||||
| @@ -163,21 +150,6 @@ const ( | |||||
| BucketLookupPath | BucketLookupPath | ||||
| ) | ) | ||||
| // List of AWS S3 error codes which are retryable. | |||||
| var retryableS3Codes = map[string]struct{}{ | |||||
| "RequestError": {}, | |||||
| "RequestTimeout": {}, | |||||
| "Throttling": {}, | |||||
| "ThrottlingException": {}, | |||||
| "RequestLimitExceeded": {}, | |||||
| "RequestThrottled": {}, | |||||
| "InternalError": {}, | |||||
| "ExpiredToken": {}, | |||||
| "ExpiredTokenException": {}, | |||||
| "SlowDown": {}, | |||||
| // Add more AWS S3 codes here. | |||||
| } | |||||
| // awsS3EndpointMap Amazon S3 endpoint map. | // awsS3EndpointMap Amazon S3 endpoint map. | ||||
| var awsS3EndpointMap = map[string]string{ | var awsS3EndpointMap = map[string]string{ | ||||
| "us-east-1": "s3.dualstack.us-east-1.amazonaws.com", | "us-east-1": "s3.dualstack.us-east-1.amazonaws.com", | ||||
| @@ -253,16 +225,6 @@ var successStatus = []int{ | |||||
| http.StatusPartialContent, | http.StatusPartialContent, | ||||
| } | } | ||||
| // List of HTTP status codes which are retryable. | |||||
| var retryableHTTPStatusCodes = map[int]struct{}{ | |||||
| 429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet | |||||
| http.StatusInternalServerError: {}, | |||||
| http.StatusBadGateway: {}, | |||||
| http.StatusServiceUnavailable: {}, | |||||
| http.StatusGatewayTimeout: {}, | |||||
| // Add more HTTP status codes here. | |||||
| } | |||||
| // newBucketLocationCache - Provides a new bucket location cache to be | // newBucketLocationCache - Provides a new bucket location cache to be | ||||
| // used internally with the client object. | // used internally with the client object. | ||||
| func newBucketLocationCache() *bucketLocationCache { | func newBucketLocationCache() *bucketLocationCache { | ||||
| @@ -406,55 +368,6 @@ func New(endpoint, accessKeyID, secretAccessKey string, secure bool) (*Client, e | |||||
| return clnt, nil | return clnt, nil | ||||
| } | } | ||||
| // isHTTPStatusRetryable - is HTTP error code retryable. | |||||
| func isHTTPStatusRetryable(httpStatusCode int) (ok bool) { | |||||
| _, ok = retryableHTTPStatusCodes[httpStatusCode] | |||||
| return ok | |||||
| } | |||||
| // newRetryTimer creates a timer with exponentially increasing | |||||
| // delays until the maximum retry attempts are reached. | |||||
| func (c Client) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { | |||||
| attemptCh := make(chan int) | |||||
| // computes the exponential backoff duration according to | |||||
| // https://www.awsarchitectureblog.com/2015/03/backoff.html | |||||
| exponentialBackoffWait := func(attempt int) time.Duration { | |||||
| // normalize jitter to the range [0, 1.0] | |||||
| if jitter < NoJitter { | |||||
| jitter = NoJitter | |||||
| } | |||||
| if jitter > MaxJitter { | |||||
| jitter = MaxJitter | |||||
| } | |||||
| //sleep = random_between(0, min(cap, base * 2 ** attempt)) | |||||
| sleep := unit * time.Duration(1<<uint(attempt)) | |||||
| if sleep > cap { | |||||
| sleep = cap | |||||
| } | |||||
| if jitter != NoJitter { | |||||
| sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter) | |||||
| } | |||||
| return sleep | |||||
| } | |||||
| go func() { | |||||
| defer close(attemptCh) | |||||
| for i := 0; i < maxRetry; i++ { | |||||
| select { | |||||
| // Attempts start from 1. | |||||
| case attemptCh <- i + 1: | |||||
| case <-doneCh: | |||||
| // Stop the routine. | |||||
| return | |||||
| } | |||||
| time.Sleep(exponentialBackoffWait(i)) | |||||
| } | |||||
| }() | |||||
| return attemptCh | |||||
| } | |||||
| // Get - Returns a value of a given key if it exists. | // Get - Returns a value of a given key if it exists. | ||||
| func (r *bucketLocationCache) Get(bucketName string) (location string, ok bool) { | func (r *bucketLocationCache) Get(bucketName string) (location string, ok bool) { | ||||
| r.RLock() | r.RLock() | ||||
| @@ -991,7 +904,8 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R | |||||
| return req, nil | return req, nil | ||||
| } | } | ||||
| func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objectName string, partNumber int, size int64, expires time.Duration, bucketLocation string) (string, error) { | |||||
| func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objectName string, partNumber int, size int64, expires time.Duration, bucketLocation string) (string, error){ | |||||
| signedUrl := "" | signedUrl := "" | ||||
| // Input validation. | // Input validation. | ||||
| @@ -1025,17 +939,17 @@ func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objec | |||||
| customHeader := make(http.Header) | customHeader := make(http.Header) | ||||
| reqMetadata := requestMetadata{ | reqMetadata := requestMetadata{ | ||||
| presignURL: true, | |||||
| bucketName: bucketName, | |||||
| objectName: objectName, | |||||
| queryValues: urlValues, | |||||
| customHeader: customHeader, | |||||
| presignURL: true, | |||||
| bucketName: bucketName, | |||||
| objectName: objectName, | |||||
| queryValues: urlValues, | |||||
| customHeader: customHeader, | |||||
| //contentBody: reader, | //contentBody: reader, | ||||
| contentLength: size, | |||||
| contentLength: size, | |||||
| //contentMD5Base64: md5Base64, | //contentMD5Base64: md5Base64, | ||||
| //contentSHA256Hex: sha256Hex, | //contentSHA256Hex: sha256Hex, | ||||
| expires: int64(expires / time.Second), | |||||
| bucketLocation: bucketLocation, | |||||
| expires: int64(expires/time.Second), | |||||
| bucketLocation: bucketLocation, | |||||
| } | } | ||||
| req, err := c.newRequest("PUT", reqMetadata) | req, err := c.newRequest("PUT", reqMetadata) | ||||
| @@ -1045,5 +959,142 @@ func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objec | |||||
| } | } | ||||
| signedUrl = req.URL.String() | signedUrl = req.URL.String() | ||||
| return signedUrl, nil | |||||
| return signedUrl,nil | |||||
| } | |||||
| // executeMethod - instantiates a given method, and retries the | |||||
| // request upon any error up to maxRetries attempts in a binomially | |||||
| // delayed manner using a standard back off algorithm. | |||||
| func (c Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) { | |||||
| var isRetryable bool // Indicates if request can be retried. | |||||
| var bodySeeker io.Seeker // Extracted seeker from io.Reader. | |||||
| var reqRetry = MaxRetry // Indicates how many times we can retry the request | |||||
| if metadata.contentBody != nil { | |||||
| // Check if body is seekable then it is retryable. | |||||
| bodySeeker, isRetryable = metadata.contentBody.(io.Seeker) | |||||
| switch bodySeeker { | |||||
| case os.Stdin, os.Stdout, os.Stderr: | |||||
| isRetryable = false | |||||
| } | |||||
| // Retry only when reader is seekable | |||||
| if !isRetryable { | |||||
| reqRetry = 1 | |||||
| } | |||||
| // Figure out if the body can be closed - if yes | |||||
| // we will definitely close it upon the function | |||||
| // return. | |||||
| bodyCloser, ok := metadata.contentBody.(io.Closer) | |||||
| if ok { | |||||
| defer bodyCloser.Close() | |||||
| } | |||||
| } | |||||
| // Create a done channel to control 'newRetryTimer' go routine. | |||||
| doneCh := make(chan struct{}, 1) | |||||
| // Indicate to our routine to exit cleanly upon return. | |||||
| defer close(doneCh) | |||||
| // Blank indentifier is kept here on purpose since 'range' without | |||||
| // blank identifiers is only supported since go1.4 | |||||
| // https://golang.org/doc/go1.4#forrange. | |||||
| for range c.newRetryTimer(reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter, doneCh) { | |||||
| // Retry executes the following function body if request has an | |||||
| // error until maxRetries have been exhausted, retry attempts are | |||||
| // performed after waiting for a given period of time in a | |||||
| // binomial fashion. | |||||
| if isRetryable { | |||||
| // Seek back to beginning for each attempt. | |||||
| if _, err = bodySeeker.Seek(0, 0); err != nil { | |||||
| // If seek failed, no need to retry. | |||||
| return nil, err | |||||
| } | |||||
| } | |||||
| // Instantiate a new request. | |||||
| var req *http.Request | |||||
| req, err = c.newRequest(method, metadata) | |||||
| if err != nil { | |||||
| errResponse := ToErrorResponse(err) | |||||
| if isS3CodeRetryable(errResponse.Code) { | |||||
| continue // Retry. | |||||
| } | |||||
| return nil, err | |||||
| } | |||||
| // Add context to request | |||||
| req = req.WithContext(ctx) | |||||
| // Initiate the request. | |||||
| res, err = c.do(req) | |||||
| if err != nil { | |||||
| // For supported http requests errors verify. | |||||
| if isHTTPReqErrorRetryable(err) { | |||||
| continue // Retry. | |||||
| } | |||||
| // For other errors, return here no need to retry. | |||||
| return nil, err | |||||
| } | |||||
| // For any known successful http status, return quickly. | |||||
| for _, httpStatus := range successStatus { | |||||
| if httpStatus == res.StatusCode { | |||||
| return res, nil | |||||
| } | |||||
| } | |||||
| // Read the body to be saved later. | |||||
| errBodyBytes, err := ioutil.ReadAll(res.Body) | |||||
| // res.Body should be closed | |||||
| closeResponse(res) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| // Save the body. | |||||
| errBodySeeker := bytes.NewReader(errBodyBytes) | |||||
| res.Body = ioutil.NopCloser(errBodySeeker) | |||||
| // For errors verify if its retryable otherwise fail quickly. | |||||
| errResponse := ToErrorResponse(httpRespToErrorResponse(res, metadata.bucketName, metadata.objectName)) | |||||
| // Save the body back again. | |||||
| errBodySeeker.Seek(0, 0) // Seek back to starting point. | |||||
| res.Body = ioutil.NopCloser(errBodySeeker) | |||||
| // Bucket region if set in error response and the error | |||||
| // code dictates invalid region, we can retry the request | |||||
| // with the new region. | |||||
| // | |||||
| // Additionally we should only retry if bucketLocation and custom | |||||
| // region is empty. | |||||
| if metadata.bucketLocation == "" && c.region == "" { | |||||
| if errResponse.Code == "AuthorizationHeaderMalformed" || errResponse.Code == "InvalidRegion" { | |||||
| if metadata.bucketName != "" && errResponse.Region != "" { | |||||
| // Gather Cached location only if bucketName is present. | |||||
| if _, cachedLocationError := c.bucketLocCache.Get(metadata.bucketName); cachedLocationError != false { | |||||
| c.bucketLocCache.Set(metadata.bucketName, errResponse.Region) | |||||
| continue // Retry. | |||||
| } | |||||
| } | |||||
| } | |||||
| } | |||||
| // Verify if error response code is retryable. | |||||
| if isS3CodeRetryable(errResponse.Code) { | |||||
| continue // Retry. | |||||
| } | |||||
| // Verify if http status code is retryable. | |||||
| if isHTTPStatusRetryable(res.StatusCode) { | |||||
| continue // Retry. | |||||
| } | |||||
| // For all other cases break out of the retry loop. | |||||
| break | |||||
| } | |||||
| return res, err | |||||
| } | } | ||||
| @@ -8,12 +8,10 @@ import ( | |||||
| // StringMap represents map with custom UnmarshalXML | // StringMap represents map with custom UnmarshalXML | ||||
| type StringMap map[string]string | type StringMap map[string]string | ||||
| // CommonPrefix container for prefix response. | // CommonPrefix container for prefix response. | ||||
| type CommonPrefix struct { | type CommonPrefix struct { | ||||
| Prefix string | Prefix string | ||||
| } | } | ||||
| // ObjectInfo container for object metadata. | // ObjectInfo container for object metadata. | ||||
| type ObjectInfo struct { | type ObjectInfo struct { | ||||
| // An ETag is optionally set to md5sum of an object. In case of multipart objects, | // An ETag is optionally set to md5sum of an object. In case of multipart objects, | ||||
| @@ -46,7 +44,6 @@ type ObjectInfo struct { | |||||
| // Error | // Error | ||||
| Err error `json:"-"` | Err error `json:"-"` | ||||
| } | } | ||||
| // ListBucketResult container for listObjects response. | // ListBucketResult container for listObjects response. | ||||
| type ListBucketResult struct { | type ListBucketResult struct { | ||||
| // A response can contain CommonPrefixes only if you have | // A response can contain CommonPrefixes only if you have | ||||
| @@ -0,0 +1,153 @@ | |||||
| /* | |||||
| * Minio Go Library for Amazon S3 Compatible Cloud Storage | |||||
| * Copyright 2015-2017 Minio, Inc. | |||||
| * | |||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||||
| * you may not use this file except in compliance with the License. | |||||
| * You may obtain a copy of the License at | |||||
| * | |||||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||||
| * | |||||
| * Unless required by applicable law or agreed to in writing, software | |||||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||||
| * See the License for the specific language governing permissions and | |||||
| * limitations under the License. | |||||
| */ | |||||
| package minio_ext | |||||
| import ( | |||||
| "net" | |||||
| "net/http" | |||||
| "net/url" | |||||
| "strings" | |||||
| "time" | |||||
| ) | |||||
| // MaxRetry is the maximum number of retries before stopping. | |||||
| var MaxRetry = 10 | |||||
| // MaxJitter will randomize over the full exponential backoff time | |||||
| const MaxJitter = 1.0 | |||||
| // NoJitter disables the use of jitter for randomizing the exponential backoff time | |||||
| const NoJitter = 0.0 | |||||
| // DefaultRetryUnit - default unit multiplicative per retry. | |||||
| // defaults to 1 second. | |||||
| const DefaultRetryUnit = time.Second | |||||
| // DefaultRetryCap - Each retry attempt never waits no longer than | |||||
| // this maximum time duration. | |||||
| const DefaultRetryCap = time.Second * 30 | |||||
| // newRetryTimer creates a timer with exponentially increasing | |||||
| // delays until the maximum retry attempts are reached. | |||||
| func (c Client) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { | |||||
| attemptCh := make(chan int) | |||||
| // computes the exponential backoff duration according to | |||||
| // https://www.awsarchitectureblog.com/2015/03/backoff.html | |||||
| exponentialBackoffWait := func(attempt int) time.Duration { | |||||
| // normalize jitter to the range [0, 1.0] | |||||
| if jitter < NoJitter { | |||||
| jitter = NoJitter | |||||
| } | |||||
| if jitter > MaxJitter { | |||||
| jitter = MaxJitter | |||||
| } | |||||
| //sleep = random_between(0, min(cap, base * 2 ** attempt)) | |||||
| sleep := unit * time.Duration(1<<uint(attempt)) | |||||
| if sleep > cap { | |||||
| sleep = cap | |||||
| } | |||||
| if jitter != NoJitter { | |||||
| sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter) | |||||
| } | |||||
| return sleep | |||||
| } | |||||
| go func() { | |||||
| defer close(attemptCh) | |||||
| for i := 0; i < maxRetry; i++ { | |||||
| select { | |||||
| // Attempts start from 1. | |||||
| case attemptCh <- i + 1: | |||||
| case <-doneCh: | |||||
| // Stop the routine. | |||||
| return | |||||
| } | |||||
| time.Sleep(exponentialBackoffWait(i)) | |||||
| } | |||||
| }() | |||||
| return attemptCh | |||||
| } | |||||
| // isHTTPReqErrorRetryable - is http requests error retryable, such | |||||
| // as i/o timeout, connection broken etc.. | |||||
| func isHTTPReqErrorRetryable(err error) bool { | |||||
| if err == nil { | |||||
| return false | |||||
| } | |||||
| switch e := err.(type) { | |||||
| case *url.Error: | |||||
| switch e.Err.(type) { | |||||
| case *net.DNSError, *net.OpError, net.UnknownNetworkError: | |||||
| return true | |||||
| } | |||||
| if strings.Contains(err.Error(), "Connection closed by foreign host") { | |||||
| return true | |||||
| } else if strings.Contains(err.Error(), "net/http: TLS handshake timeout") { | |||||
| // If error is - tlsHandshakeTimeoutError, retry. | |||||
| return true | |||||
| } else if strings.Contains(err.Error(), "i/o timeout") { | |||||
| // If error is - tcp timeoutError, retry. | |||||
| return true | |||||
| } else if strings.Contains(err.Error(), "connection timed out") { | |||||
| // If err is a net.Dial timeout, retry. | |||||
| return true | |||||
| } else if strings.Contains(err.Error(), "net/http: HTTP/1.x transport connection broken") { | |||||
| // If error is transport connection broken, retry. | |||||
| return true | |||||
| } | |||||
| } | |||||
| return false | |||||
| } | |||||
| // List of AWS S3 error codes which are retryable. | |||||
| var retryableS3Codes = map[string]struct{}{ | |||||
| "RequestError": {}, | |||||
| "RequestTimeout": {}, | |||||
| "Throttling": {}, | |||||
| "ThrottlingException": {}, | |||||
| "RequestLimitExceeded": {}, | |||||
| "RequestThrottled": {}, | |||||
| "InternalError": {}, | |||||
| "ExpiredToken": {}, | |||||
| "ExpiredTokenException": {}, | |||||
| "SlowDown": {}, | |||||
| // Add more AWS S3 codes here. | |||||
| } | |||||
| // isS3CodeRetryable - is s3 error code retryable. | |||||
| func isS3CodeRetryable(s3Code string) (ok bool) { | |||||
| _, ok = retryableS3Codes[s3Code] | |||||
| return ok | |||||
| } | |||||
| // List of HTTP status codes which are retryable. | |||||
| var retryableHTTPStatusCodes = map[int]struct{}{ | |||||
| 429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet | |||||
| http.StatusInternalServerError: {}, | |||||
| http.StatusBadGateway: {}, | |||||
| http.StatusServiceUnavailable: {}, | |||||
| // Add more HTTP status codes here. | |||||
| } | |||||
| // isHTTPStatusRetryable - is HTTP error code retryable. | |||||
| func isHTTPStatusRetryable(httpStatusCode int) (ok bool) { | |||||
| _, ok = retryableHTTPStatusCodes[httpStatusCode] | |||||
| return ok | |||||
| } | |||||
| @@ -129,8 +129,8 @@ func NewMultiPartUpload(uuid string) (string, error) { | |||||
| return core.NewMultipartUpload(bucketName, objectName, miniov6.PutObjectOptions{}) | return core.NewMultipartUpload(bucketName, objectName, miniov6.PutObjectOptions{}) | ||||
| } | } | ||||
| func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) (string, error) { | |||||
| _, core, err := getClients() | |||||
| func CompleteMultiPartUpload(uuid string, uploadID string) (string, error) { | |||||
| client, core, err := getClients() | |||||
| if err != nil { | if err != nil { | ||||
| log.Error("getClients failed:", err.Error()) | log.Error("getClients failed:", err.Error()) | ||||
| return "", err | return "", err | ||||
| @@ -140,16 +140,17 @@ func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) | |||||
| bucketName := minio.Bucket | bucketName := minio.Bucket | ||||
| objectName := strings.TrimPrefix(path.Join(minio.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | objectName := strings.TrimPrefix(path.Join(minio.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | ||||
| partInfos, err := client.ListObjectParts(bucketName, objectName, uploadID) | |||||
| if err != nil { | |||||
| log.Error("ListObjectParts failed:", err.Error()) | |||||
| return "", err | |||||
| } | |||||
| var complMultipartUpload completeMultipartUpload | var complMultipartUpload completeMultipartUpload | ||||
| for _, part := range complParts { | |||||
| partNumber, err := strconv.Atoi(strings.Split(part, "-")[0]) | |||||
| if err != nil { | |||||
| log.Error(err.Error()) | |||||
| return "", err | |||||
| } | |||||
| for _, partInfo := range partInfos { | |||||
| complMultipartUpload.Parts = append(complMultipartUpload.Parts, miniov6.CompletePart{ | complMultipartUpload.Parts = append(complMultipartUpload.Parts, miniov6.CompletePart{ | ||||
| PartNumber: partNumber, | |||||
| ETag: strings.Split(part, "-")[1], | |||||
| PartNumber: partInfo.PartNumber, | |||||
| ETag: partInfo.ETag, | |||||
| }) | }) | ||||
| } | } | ||||
| @@ -158,3 +159,28 @@ func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) | |||||
| return core.CompleteMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload.Parts) | return core.CompleteMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload.Parts) | ||||
| } | } | ||||
| func GetPartInfos(uuid string, uploadID string) (string, error) { | |||||
| minioClient, _, err := getClients() | |||||
| if err != nil { | |||||
| log.Error("getClients failed:", err.Error()) | |||||
| return "", err | |||||
| } | |||||
| minio := setting.Attachment.Minio | |||||
| bucketName := minio.Bucket | |||||
| objectName := strings.TrimPrefix(path.Join(minio.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") | |||||
| partInfos, err := minioClient.ListObjectParts(bucketName, objectName, uploadID) | |||||
| if err != nil { | |||||
| log.Error("ListObjectParts failed:", err.Error()) | |||||
| return "", err | |||||
| } | |||||
| var chunks string | |||||
| for _, partInfo := range partInfos { | |||||
| chunks += strconv.Itoa(partInfo.PartNumber) + "-" + partInfo.ETag + "," | |||||
| } | |||||
| return chunks, nil | |||||
| } | |||||
| @@ -6,7 +6,6 @@ package repo | |||||
| import ( | import ( | ||||
| contexExt "context" | contexExt "context" | ||||
| "encoding/json" | |||||
| "fmt" | "fmt" | ||||
| "net/http" | "net/http" | ||||
| "strconv" | "strconv" | ||||
| @@ -333,6 +332,7 @@ func UpdateAttachmentDecompressState(ctx *context.Context) { | |||||
| func GetSuccessChunks(ctx *context.Context) { | func GetSuccessChunks(ctx *context.Context) { | ||||
| fileMD5 := ctx.Query("md5") | fileMD5 := ctx.Query("md5") | ||||
| var chunks string | |||||
| fileChunk, err := models.GetFileChunkByMD5AndUser(fileMD5, ctx.User.ID) | fileChunk, err := models.GetFileChunkByMD5AndUser(fileMD5, ctx.User.ID) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -349,12 +349,36 @@ func GetSuccessChunks(ctx *context.Context) { | |||||
| return | return | ||||
| } | } | ||||
| chunks, err := json.Marshal(fileChunk.CompletedParts) | |||||
| isExist, err := storage.Attachments.HasObject(models.AttachmentRelativePath(fileChunk.UUID)) | |||||
| if err != nil { | if err != nil { | ||||
| ctx.ServerError("json.Marshal failed", err) | |||||
| ctx.ServerError("HasObject failed", err) | |||||
| return | return | ||||
| } | } | ||||
| if isExist { | |||||
| if fileChunk.IsUploaded == models.FileNotUploaded { | |||||
| log.Info("the file has been uploaded but not recorded") | |||||
| fileChunk.IsUploaded = models.FileUploaded | |||||
| if err = models.UpdateFileChunk(fileChunk); err != nil { | |||||
| log.Error("UpdateFileChunk failed:", err.Error()) | |||||
| } | |||||
| } | |||||
| } else { | |||||
| if fileChunk.IsUploaded == models.FileUploaded { | |||||
| log.Info("the file has been recorded but not uploaded") | |||||
| fileChunk.IsUploaded = models.FileNotUploaded | |||||
| if err = models.UpdateFileChunk(fileChunk); err != nil { | |||||
| log.Error("UpdateFileChunk failed:", err.Error()) | |||||
| } | |||||
| } | |||||
| chunks, err = storage.GetPartInfos(fileChunk.UUID, fileChunk.UploadID) | |||||
| if err != nil { | |||||
| ctx.ServerError("json.Marshal failed", err) | |||||
| return | |||||
| } | |||||
| } | |||||
| var attachID int64 | var attachID int64 | ||||
| attach, err := models.GetAttachmentByUUID(fileChunk.UUID) | attach, err := models.GetAttachmentByUUID(fileChunk.UUID) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -479,7 +503,7 @@ func CompleteMultipart(ctx *context.Context) { | |||||
| return | return | ||||
| } | } | ||||
| _, err = storage.CompleteMultiPartUpload(uuid, uploadID, fileChunk.CompletedParts) | |||||
| _, err = storage.CompleteMultiPartUpload(uuid, uploadID) | |||||
| if err != nil { | if err != nil { | ||||
| ctx.Error(500, fmt.Sprintf("CompleteMultiPartUpload failed: %v", err)) | ctx.Error(500, fmt.Sprintf("CompleteMultiPartUpload failed: %v", err)) | ||||
| return | return | ||||
| @@ -358,7 +358,7 @@ export default { | |||||
| await uploadMinio(urls[currentChunk], e); | await uploadMinio(urls[currentChunk], e); | ||||
| if (etags[currentChunk] != '') { | if (etags[currentChunk] != '') { | ||||
| // 更新数据库:分片上传结果 | // 更新数据库:分片上传结果 | ||||
| await updateChunk(currentChunk); | |||||
| //await updateChunk(currentChunk); | |||||
| } else { | } else { | ||||
| console.log("上传到minio uploadChunk etags[currentChunk] == ''");// TODO | console.log("上传到minio uploadChunk etags[currentChunk] == ''");// TODO | ||||
| } | } | ||||
| @@ -390,7 +390,7 @@ export default { | |||||
| let successParts = []; | let successParts = []; | ||||
| successParts = file.chunks.split(','); | successParts = file.chunks.split(','); | ||||
| for (let i = 0; i < successParts.length; i++) { | for (let i = 0; i < successParts.length; i++) { | ||||
| successChunks[i] = successParts[i].split('-')[0].split('"')[1]; | |||||
| successChunks[i] = successParts[i].split('-')[0]; | |||||
| } | } | ||||
| const urls = []; // TODO const ? | const urls = []; // TODO const ? | ||||
| const etags = []; | const etags = []; | ||||