From 5f578c004dea4d05d1ce4223a4336c0457fa5ff4 Mon Sep 17 00:00:00 2001 From: yuyuanshifu <747342561@qq.com> Date: Wed, 16 Sep 2020 14:47:33 +0800 Subject: [PATCH] no more record chunk upload result in mysql --- modules/minio_ext/api-error-response.go | 3 - modules/minio_ext/api-list.go | 90 +++++++++ modules/minio_ext/api-s3-datatypes.go | 71 +++++++ modules/minio_ext/api.go | 253 ++++++++++++++---------- modules/minio_ext/object.go | 3 - modules/minio_ext/retry.go | 153 ++++++++++++++ modules/storage/minio_ext.go | 46 ++++- routers/repo/attachment.go | 32 ++- web_src/js/components/MinioUploader.vue | 4 +- 9 files changed, 532 insertions(+), 123 deletions(-) create mode 100755 modules/minio_ext/api-list.go create mode 100755 modules/minio_ext/api-s3-datatypes.go create mode 100755 modules/minio_ext/retry.go diff --git a/modules/minio_ext/api-error-response.go b/modules/minio_ext/api-error-response.go index 5dee08500..b515c829c 100755 --- a/modules/minio_ext/api-error-response.go +++ b/modules/minio_ext/api-error-response.go @@ -27,11 +27,9 @@ type ErrorResponse struct { func (e ErrorResponse) Error() string { return e.Message } - const ( reportIssue = "Please report this issue at https://github.com/minio/minio/issues." ) - // httpRespToErrorResponse returns a new encoded ErrorResponse // structure as error. func httpRespToErrorResponse(resp *http.Response, bucketName, objectName string) error { @@ -124,7 +122,6 @@ func ToErrorResponse(err error) ErrorResponse { return ErrorResponse{} } } - // ErrInvalidArgument - Invalid argument response. func ErrInvalidArgument(message string) error { return ErrorResponse{ diff --git a/modules/minio_ext/api-list.go b/modules/minio_ext/api-list.go new file mode 100755 index 000000000..5904431e7 --- /dev/null +++ b/modules/minio_ext/api-list.go @@ -0,0 +1,90 @@ +package minio_ext + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strings" +) + +// ListObjectParts list all object parts recursively. +func (c Client) ListObjectParts(bucketName, objectName, uploadID string) (partsInfo map[int]ObjectPart, err error) { + // Part number marker for the next batch of request. + var nextPartNumberMarker int + partsInfo = make(map[int]ObjectPart) + for { + // Get list of uploaded parts a maximum of 1000 per request. + listObjPartsResult, err := c.listObjectPartsQuery(bucketName, objectName, uploadID, nextPartNumberMarker, 1000) + if err != nil { + return nil, err + } + // Append to parts info. + for _, part := range listObjPartsResult.ObjectParts { + // Trim off the odd double quotes from ETag in the beginning and end. + part.ETag = strings.TrimPrefix(part.ETag, "\"") + part.ETag = strings.TrimSuffix(part.ETag, "\"") + partsInfo[part.PartNumber] = part + } + // Keep part number marker, for the next iteration. + nextPartNumberMarker = listObjPartsResult.NextPartNumberMarker + // Listing ends result is not truncated, return right here. + if !listObjPartsResult.IsTruncated { + break + } + } + + // Return all the parts. + return partsInfo, nil +} + + +// listObjectPartsQuery (List Parts query) +// - lists some or all (up to 1000) parts that have been uploaded +// for a specific multipart upload +// +// You can use the request parameters as selection criteria to return +// a subset of the uploads in a bucket, request parameters :- +// --------- +// ?part-number-marker - Specifies the part after which listing should +// begin. +// ?max-parts - Maximum parts to be listed per request. +func (c Client) listObjectPartsQuery(bucketName, objectName, uploadID string, partNumberMarker, maxParts int) (ListObjectPartsResult, error) { + // Get resources properly escaped and lined up before using them in http request. + urlValues := make(url.Values) + // Set part number marker. + urlValues.Set("part-number-marker", fmt.Sprintf("%d", partNumberMarker)) + // Set upload id. + urlValues.Set("uploadId", uploadID) + + // maxParts should be 1000 or less. + if maxParts == 0 || maxParts > 1000 { + maxParts = 1000 + } + // Set max parts. + urlValues.Set("max-parts", fmt.Sprintf("%d", maxParts)) + + // Execute GET on objectName to get list of parts. + resp, err := c.executeMethod(context.Background(), "GET", requestMetadata{ + bucketName: bucketName, + objectName: objectName, + queryValues: urlValues, + contentSHA256Hex: emptySHA256Hex, + }) + defer closeResponse(resp) + if err != nil { + return ListObjectPartsResult{}, err + } + if resp != nil { + if resp.StatusCode != http.StatusOK { + return ListObjectPartsResult{}, httpRespToErrorResponse(resp, bucketName, objectName) + } + } + // Decode list object parts XML. + listObjectPartsResult := ListObjectPartsResult{} + err = xmlDecoder(resp.Body, &listObjectPartsResult) + if err != nil { + return listObjectPartsResult, err + } + return listObjectPartsResult, nil +} diff --git a/modules/minio_ext/api-s3-datatypes.go b/modules/minio_ext/api-s3-datatypes.go new file mode 100755 index 000000000..df13ca380 --- /dev/null +++ b/modules/minio_ext/api-s3-datatypes.go @@ -0,0 +1,71 @@ +/* + * Minio Go Library for Amazon S3 Compatible Cloud Storage + * Copyright 2015-2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package minio_ext + +import ( + "time" +) + +// owner container for bucket owner information. +type owner struct { + DisplayName string + ID string +} + +// initiator container for who initiated multipart upload. +type initiator struct { + ID string + DisplayName string +} + +// ObjectPart container for particular part of an object. +type ObjectPart struct { + // Part number identifies the part. + PartNumber int + + // Date and time the part was uploaded. + LastModified time.Time + + // Entity tag returned when the part was uploaded, usually md5sum + // of the part. + ETag string + + // Size of the uploaded part data. + Size int64 +} + +// ListObjectPartsResult container for ListObjectParts response. +type ListObjectPartsResult struct { + Bucket string + Key string + UploadID string `xml:"UploadId"` + + Initiator initiator + Owner owner + + StorageClass string + PartNumberMarker int + NextPartNumberMarker int + MaxParts int + + // Indicates whether the returned list of parts is truncated. + IsTruncated bool + ObjectParts []ObjectPart `xml:"Part"` + + EncodingType string +} \ No newline at end of file diff --git a/modules/minio_ext/api.go b/modules/minio_ext/api.go index 3847b30e7..d3d7272e5 100755 --- a/modules/minio_ext/api.go +++ b/modules/minio_ext/api.go @@ -1,6 +1,8 @@ package minio_ext import ( + "bytes" + "context" "errors" "fmt" "io" @@ -12,6 +14,7 @@ import ( "net/http/cookiejar" "net/http/httputil" "net/url" + "os" "path" "runtime" "strconv" @@ -25,23 +28,6 @@ import ( "golang.org/x/net/publicsuffix" ) -// MaxRetry is the maximum number of retries before stopping. -var MaxRetry = 10 - -// MaxJitter will randomize over the full exponential backoff time -const MaxJitter = 1.0 - -// NoJitter disables the use of jitter for randomizing the exponential backoff time -const NoJitter = 0.0 - -// DefaultRetryUnit - default unit multiplicative per retry. -// defaults to 1 second. -const DefaultRetryUnit = time.Second - -// DefaultRetryCap - Each retry attempt never waits no longer than -// this maximum time duration. -const DefaultRetryCap = time.Second * 30 - // Global constants. const ( libraryName = "minio-go" @@ -156,6 +142,7 @@ func (r *lockedRandSource) Seed(seed int64) { r.lk.Unlock() } + // Different types of url lookup supported by the server.Initialized to BucketLookupAuto const ( BucketLookupAuto BucketLookupType = iota @@ -163,21 +150,6 @@ const ( BucketLookupPath ) -// List of AWS S3 error codes which are retryable. -var retryableS3Codes = map[string]struct{}{ - "RequestError": {}, - "RequestTimeout": {}, - "Throttling": {}, - "ThrottlingException": {}, - "RequestLimitExceeded": {}, - "RequestThrottled": {}, - "InternalError": {}, - "ExpiredToken": {}, - "ExpiredTokenException": {}, - "SlowDown": {}, - // Add more AWS S3 codes here. -} - // awsS3EndpointMap Amazon S3 endpoint map. var awsS3EndpointMap = map[string]string{ "us-east-1": "s3.dualstack.us-east-1.amazonaws.com", @@ -253,16 +225,6 @@ var successStatus = []int{ http.StatusPartialContent, } -// List of HTTP status codes which are retryable. -var retryableHTTPStatusCodes = map[int]struct{}{ - 429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet - http.StatusInternalServerError: {}, - http.StatusBadGateway: {}, - http.StatusServiceUnavailable: {}, - http.StatusGatewayTimeout: {}, - // Add more HTTP status codes here. -} - // newBucketLocationCache - Provides a new bucket location cache to be // used internally with the client object. func newBucketLocationCache() *bucketLocationCache { @@ -406,55 +368,6 @@ func New(endpoint, accessKeyID, secretAccessKey string, secure bool) (*Client, e return clnt, nil } -// isHTTPStatusRetryable - is HTTP error code retryable. -func isHTTPStatusRetryable(httpStatusCode int) (ok bool) { - _, ok = retryableHTTPStatusCodes[httpStatusCode] - return ok -} - -// newRetryTimer creates a timer with exponentially increasing -// delays until the maximum retry attempts are reached. -func (c Client) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { - attemptCh := make(chan int) - - // computes the exponential backoff duration according to - // https://www.awsarchitectureblog.com/2015/03/backoff.html - exponentialBackoffWait := func(attempt int) time.Duration { - // normalize jitter to the range [0, 1.0] - if jitter < NoJitter { - jitter = NoJitter - } - if jitter > MaxJitter { - jitter = MaxJitter - } - - //sleep = random_between(0, min(cap, base * 2 ** attempt)) - sleep := unit * time.Duration(1< cap { - sleep = cap - } - if jitter != NoJitter { - sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter) - } - return sleep - } - - go func() { - defer close(attemptCh) - for i := 0; i < maxRetry; i++ { - select { - // Attempts start from 1. - case attemptCh <- i + 1: - case <-doneCh: - // Stop the routine. - return - } - time.Sleep(exponentialBackoffWait(i)) - } - }() - return attemptCh -} - // Get - Returns a value of a given key if it exists. func (r *bucketLocationCache) Get(bucketName string) (location string, ok bool) { r.RLock() @@ -991,7 +904,8 @@ func (c Client) newRequest(method string, metadata requestMetadata) (req *http.R return req, nil } -func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objectName string, partNumber int, size int64, expires time.Duration, bucketLocation string) (string, error) { + +func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objectName string, partNumber int, size int64, expires time.Duration, bucketLocation string) (string, error){ signedUrl := "" // Input validation. @@ -1025,17 +939,17 @@ func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objec customHeader := make(http.Header) reqMetadata := requestMetadata{ - presignURL: true, - bucketName: bucketName, - objectName: objectName, - queryValues: urlValues, - customHeader: customHeader, + presignURL: true, + bucketName: bucketName, + objectName: objectName, + queryValues: urlValues, + customHeader: customHeader, //contentBody: reader, - contentLength: size, + contentLength: size, //contentMD5Base64: md5Base64, //contentSHA256Hex: sha256Hex, - expires: int64(expires / time.Second), - bucketLocation: bucketLocation, + expires: int64(expires/time.Second), + bucketLocation: bucketLocation, } req, err := c.newRequest("PUT", reqMetadata) @@ -1045,5 +959,142 @@ func (c Client) GenUploadPartSignedUrl(uploadID string, bucketName string, objec } signedUrl = req.URL.String() - return signedUrl, nil + return signedUrl,nil +} + + +// executeMethod - instantiates a given method, and retries the +// request upon any error up to maxRetries attempts in a binomially +// delayed manner using a standard back off algorithm. +func (c Client) executeMethod(ctx context.Context, method string, metadata requestMetadata) (res *http.Response, err error) { + var isRetryable bool // Indicates if request can be retried. + var bodySeeker io.Seeker // Extracted seeker from io.Reader. + var reqRetry = MaxRetry // Indicates how many times we can retry the request + + if metadata.contentBody != nil { + // Check if body is seekable then it is retryable. + bodySeeker, isRetryable = metadata.contentBody.(io.Seeker) + switch bodySeeker { + case os.Stdin, os.Stdout, os.Stderr: + isRetryable = false + } + // Retry only when reader is seekable + if !isRetryable { + reqRetry = 1 + } + + // Figure out if the body can be closed - if yes + // we will definitely close it upon the function + // return. + bodyCloser, ok := metadata.contentBody.(io.Closer) + if ok { + defer bodyCloser.Close() + } + } + + // Create a done channel to control 'newRetryTimer' go routine. + doneCh := make(chan struct{}, 1) + + // Indicate to our routine to exit cleanly upon return. + defer close(doneCh) + + // Blank indentifier is kept here on purpose since 'range' without + // blank identifiers is only supported since go1.4 + // https://golang.org/doc/go1.4#forrange. + for range c.newRetryTimer(reqRetry, DefaultRetryUnit, DefaultRetryCap, MaxJitter, doneCh) { + // Retry executes the following function body if request has an + // error until maxRetries have been exhausted, retry attempts are + // performed after waiting for a given period of time in a + // binomial fashion. + if isRetryable { + // Seek back to beginning for each attempt. + if _, err = bodySeeker.Seek(0, 0); err != nil { + // If seek failed, no need to retry. + return nil, err + } + } + + // Instantiate a new request. + var req *http.Request + req, err = c.newRequest(method, metadata) + if err != nil { + errResponse := ToErrorResponse(err) + if isS3CodeRetryable(errResponse.Code) { + continue // Retry. + } + return nil, err + } + + // Add context to request + req = req.WithContext(ctx) + + // Initiate the request. + res, err = c.do(req) + if err != nil { + // For supported http requests errors verify. + if isHTTPReqErrorRetryable(err) { + continue // Retry. + } + // For other errors, return here no need to retry. + return nil, err + } + + // For any known successful http status, return quickly. + for _, httpStatus := range successStatus { + if httpStatus == res.StatusCode { + return res, nil + } + } + + // Read the body to be saved later. + errBodyBytes, err := ioutil.ReadAll(res.Body) + // res.Body should be closed + closeResponse(res) + if err != nil { + return nil, err + } + + // Save the body. + errBodySeeker := bytes.NewReader(errBodyBytes) + res.Body = ioutil.NopCloser(errBodySeeker) + + // For errors verify if its retryable otherwise fail quickly. + errResponse := ToErrorResponse(httpRespToErrorResponse(res, metadata.bucketName, metadata.objectName)) + + // Save the body back again. + errBodySeeker.Seek(0, 0) // Seek back to starting point. + res.Body = ioutil.NopCloser(errBodySeeker) + + // Bucket region if set in error response and the error + // code dictates invalid region, we can retry the request + // with the new region. + // + // Additionally we should only retry if bucketLocation and custom + // region is empty. + if metadata.bucketLocation == "" && c.region == "" { + if errResponse.Code == "AuthorizationHeaderMalformed" || errResponse.Code == "InvalidRegion" { + if metadata.bucketName != "" && errResponse.Region != "" { + // Gather Cached location only if bucketName is present. + if _, cachedLocationError := c.bucketLocCache.Get(metadata.bucketName); cachedLocationError != false { + c.bucketLocCache.Set(metadata.bucketName, errResponse.Region) + continue // Retry. + } + } + } + } + + // Verify if error response code is retryable. + if isS3CodeRetryable(errResponse.Code) { + continue // Retry. + } + + // Verify if http status code is retryable. + if isHTTPStatusRetryable(res.StatusCode) { + continue // Retry. + } + + // For all other cases break out of the retry loop. + break + } + return res, err } diff --git a/modules/minio_ext/object.go b/modules/minio_ext/object.go index 12fed7d5c..3a075e457 100755 --- a/modules/minio_ext/object.go +++ b/modules/minio_ext/object.go @@ -8,12 +8,10 @@ import ( // StringMap represents map with custom UnmarshalXML type StringMap map[string]string - // CommonPrefix container for prefix response. type CommonPrefix struct { Prefix string } - // ObjectInfo container for object metadata. type ObjectInfo struct { // An ETag is optionally set to md5sum of an object. In case of multipart objects, @@ -46,7 +44,6 @@ type ObjectInfo struct { // Error Err error `json:"-"` } - // ListBucketResult container for listObjects response. type ListBucketResult struct { // A response can contain CommonPrefixes only if you have diff --git a/modules/minio_ext/retry.go b/modules/minio_ext/retry.go new file mode 100755 index 000000000..df4eeec85 --- /dev/null +++ b/modules/minio_ext/retry.go @@ -0,0 +1,153 @@ +/* + * Minio Go Library for Amazon S3 Compatible Cloud Storage + * Copyright 2015-2017 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package minio_ext + +import ( + "net" + "net/http" + "net/url" + "strings" + "time" +) + +// MaxRetry is the maximum number of retries before stopping. +var MaxRetry = 10 + +// MaxJitter will randomize over the full exponential backoff time +const MaxJitter = 1.0 + +// NoJitter disables the use of jitter for randomizing the exponential backoff time +const NoJitter = 0.0 + +// DefaultRetryUnit - default unit multiplicative per retry. +// defaults to 1 second. +const DefaultRetryUnit = time.Second + +// DefaultRetryCap - Each retry attempt never waits no longer than +// this maximum time duration. +const DefaultRetryCap = time.Second * 30 + +// newRetryTimer creates a timer with exponentially increasing +// delays until the maximum retry attempts are reached. +func (c Client) newRetryTimer(maxRetry int, unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int { + attemptCh := make(chan int) + + // computes the exponential backoff duration according to + // https://www.awsarchitectureblog.com/2015/03/backoff.html + exponentialBackoffWait := func(attempt int) time.Duration { + // normalize jitter to the range [0, 1.0] + if jitter < NoJitter { + jitter = NoJitter + } + if jitter > MaxJitter { + jitter = MaxJitter + } + + //sleep = random_between(0, min(cap, base * 2 ** attempt)) + sleep := unit * time.Duration(1< cap { + sleep = cap + } + if jitter != NoJitter { + sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter) + } + return sleep + } + + go func() { + defer close(attemptCh) + for i := 0; i < maxRetry; i++ { + select { + // Attempts start from 1. + case attemptCh <- i + 1: + case <-doneCh: + // Stop the routine. + return + } + time.Sleep(exponentialBackoffWait(i)) + } + }() + return attemptCh +} + +// isHTTPReqErrorRetryable - is http requests error retryable, such +// as i/o timeout, connection broken etc.. +func isHTTPReqErrorRetryable(err error) bool { + if err == nil { + return false + } + switch e := err.(type) { + case *url.Error: + switch e.Err.(type) { + case *net.DNSError, *net.OpError, net.UnknownNetworkError: + return true + } + if strings.Contains(err.Error(), "Connection closed by foreign host") { + return true + } else if strings.Contains(err.Error(), "net/http: TLS handshake timeout") { + // If error is - tlsHandshakeTimeoutError, retry. + return true + } else if strings.Contains(err.Error(), "i/o timeout") { + // If error is - tcp timeoutError, retry. + return true + } else if strings.Contains(err.Error(), "connection timed out") { + // If err is a net.Dial timeout, retry. + return true + } else if strings.Contains(err.Error(), "net/http: HTTP/1.x transport connection broken") { + // If error is transport connection broken, retry. + return true + } + } + return false +} + +// List of AWS S3 error codes which are retryable. +var retryableS3Codes = map[string]struct{}{ + "RequestError": {}, + "RequestTimeout": {}, + "Throttling": {}, + "ThrottlingException": {}, + "RequestLimitExceeded": {}, + "RequestThrottled": {}, + "InternalError": {}, + "ExpiredToken": {}, + "ExpiredTokenException": {}, + "SlowDown": {}, + // Add more AWS S3 codes here. +} + +// isS3CodeRetryable - is s3 error code retryable. +func isS3CodeRetryable(s3Code string) (ok bool) { + _, ok = retryableS3Codes[s3Code] + return ok +} + +// List of HTTP status codes which are retryable. +var retryableHTTPStatusCodes = map[int]struct{}{ + 429: {}, // http.StatusTooManyRequests is not part of the Go 1.5 library, yet + http.StatusInternalServerError: {}, + http.StatusBadGateway: {}, + http.StatusServiceUnavailable: {}, + // Add more HTTP status codes here. +} + +// isHTTPStatusRetryable - is HTTP error code retryable. +func isHTTPStatusRetryable(httpStatusCode int) (ok bool) { + _, ok = retryableHTTPStatusCodes[httpStatusCode] + return ok +} diff --git a/modules/storage/minio_ext.go b/modules/storage/minio_ext.go index 4f1567ff1..e110138f7 100755 --- a/modules/storage/minio_ext.go +++ b/modules/storage/minio_ext.go @@ -129,8 +129,8 @@ func NewMultiPartUpload(uuid string) (string, error) { return core.NewMultipartUpload(bucketName, objectName, miniov6.PutObjectOptions{}) } -func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) (string, error) { - _, core, err := getClients() +func CompleteMultiPartUpload(uuid string, uploadID string) (string, error) { + client, core, err := getClients() if err != nil { log.Error("getClients failed:", err.Error()) return "", err @@ -140,16 +140,17 @@ func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) bucketName := minio.Bucket objectName := strings.TrimPrefix(path.Join(minio.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") + partInfos, err := client.ListObjectParts(bucketName, objectName, uploadID) + if err != nil { + log.Error("ListObjectParts failed:", err.Error()) + return "", err + } + var complMultipartUpload completeMultipartUpload - for _, part := range complParts { - partNumber, err := strconv.Atoi(strings.Split(part, "-")[0]) - if err != nil { - log.Error(err.Error()) - return "", err - } + for _, partInfo := range partInfos { complMultipartUpload.Parts = append(complMultipartUpload.Parts, miniov6.CompletePart{ - PartNumber: partNumber, - ETag: strings.Split(part, "-")[1], + PartNumber: partInfo.PartNumber, + ETag: partInfo.ETag, }) } @@ -158,3 +159,28 @@ func CompleteMultiPartUpload(uuid string, uploadID string, complParts []string) return core.CompleteMultipartUpload(bucketName, objectName, uploadID, complMultipartUpload.Parts) } + +func GetPartInfos(uuid string, uploadID string) (string, error) { + minioClient, _, err := getClients() + if err != nil { + log.Error("getClients failed:", err.Error()) + return "", err + } + + minio := setting.Attachment.Minio + bucketName := minio.Bucket + objectName := strings.TrimPrefix(path.Join(minio.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/") + + partInfos, err := minioClient.ListObjectParts(bucketName, objectName, uploadID) + if err != nil { + log.Error("ListObjectParts failed:", err.Error()) + return "", err + } + + var chunks string + for _, partInfo := range partInfos { + chunks += strconv.Itoa(partInfo.PartNumber) + "-" + partInfo.ETag + "," + } + + return chunks, nil +} diff --git a/routers/repo/attachment.go b/routers/repo/attachment.go index 43b5db07f..6c9817360 100755 --- a/routers/repo/attachment.go +++ b/routers/repo/attachment.go @@ -6,7 +6,6 @@ package repo import ( contexExt "context" - "encoding/json" "fmt" "net/http" "strconv" @@ -333,6 +332,7 @@ func UpdateAttachmentDecompressState(ctx *context.Context) { func GetSuccessChunks(ctx *context.Context) { fileMD5 := ctx.Query("md5") + var chunks string fileChunk, err := models.GetFileChunkByMD5AndUser(fileMD5, ctx.User.ID) if err != nil { @@ -349,12 +349,36 @@ func GetSuccessChunks(ctx *context.Context) { return } - chunks, err := json.Marshal(fileChunk.CompletedParts) + isExist, err := storage.Attachments.HasObject(models.AttachmentRelativePath(fileChunk.UUID)) if err != nil { - ctx.ServerError("json.Marshal failed", err) + ctx.ServerError("HasObject failed", err) return } + if isExist { + if fileChunk.IsUploaded == models.FileNotUploaded { + log.Info("the file has been uploaded but not recorded") + fileChunk.IsUploaded = models.FileUploaded + if err = models.UpdateFileChunk(fileChunk); err != nil { + log.Error("UpdateFileChunk failed:", err.Error()) + } + } + } else { + if fileChunk.IsUploaded == models.FileUploaded { + log.Info("the file has been recorded but not uploaded") + fileChunk.IsUploaded = models.FileNotUploaded + if err = models.UpdateFileChunk(fileChunk); err != nil { + log.Error("UpdateFileChunk failed:", err.Error()) + } + } + + chunks, err = storage.GetPartInfos(fileChunk.UUID, fileChunk.UploadID) + if err != nil { + ctx.ServerError("json.Marshal failed", err) + return + } + } + var attachID int64 attach, err := models.GetAttachmentByUUID(fileChunk.UUID) if err != nil { @@ -479,7 +503,7 @@ func CompleteMultipart(ctx *context.Context) { return } - _, err = storage.CompleteMultiPartUpload(uuid, uploadID, fileChunk.CompletedParts) + _, err = storage.CompleteMultiPartUpload(uuid, uploadID) if err != nil { ctx.Error(500, fmt.Sprintf("CompleteMultiPartUpload failed: %v", err)) return diff --git a/web_src/js/components/MinioUploader.vue b/web_src/js/components/MinioUploader.vue index c35f1c87c..2e7bb9b25 100755 --- a/web_src/js/components/MinioUploader.vue +++ b/web_src/js/components/MinioUploader.vue @@ -358,7 +358,7 @@ export default { await uploadMinio(urls[currentChunk], e); if (etags[currentChunk] != '') { // 更新数据库:分片上传结果 - await updateChunk(currentChunk); + //await updateChunk(currentChunk); } else { console.log("上传到minio uploadChunk etags[currentChunk] == ''");// TODO } @@ -390,7 +390,7 @@ export default { let successParts = []; successParts = file.chunks.split(','); for (let i = 0; i < successParts.length; i++) { - successChunks[i] = successParts[i].split('-')[0].split('"')[1]; + successChunks[i] = successParts[i].split('-')[0]; } const urls = []; // TODO const ? const etags = [];