@@ -41,6 +41,7 @@ type Attachment struct {
 	Size            int64              `xorm:"DEFAULT 0"`
 	IsPrivate       bool               `xorm:"DEFAULT false"`
 	DecompressState int32              `xorm:"DEFAULT 0"`
+	Type            int                `xorm:"DEFAULT 0"`
 	CreatedUnix     timeutil.TimeStamp `xorm:"created"`
 }
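The new `Type` column defaults to 0, so rows written before this change resolve to `TypeCloudBrainOne` (the existing MinIO path) once the schema syncs. A minimal sketch, assuming the stock xorm sync that Gitea runs at startup (`x` and the error wrapping are illustrative):

```go
// Hedged sketch: xorm's Sync2 adds the missing `type` column on startup.
// Pre-existing attachment rows pick up DEFAULT 0, i.e. TypeCloudBrainOne,
// so legacy data keeps pointing at the MinIO backend.
if err := x.Sync2(new(Attachment)); err != nil {
	return fmt.Errorf("sync attachment table: %v", err)
}
```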
@@ -350,7 +351,7 @@ func GetUnDecompressAttachments() ([]*Attachment, error) {
 func getUnDecompressAttachments(e Engine) ([]*Attachment, error) {
 	attachments := make([]*Attachment, 0, 10)
-	return attachments, e.Where("decompress_state = ? and dataset_id != 0 and name like '%.zip'", DecompressStateInit).Find(&attachments)
+	return attachments, e.Where("decompress_state = ? and dataset_id != 0 and type = ? and name like '%.zip'", DecompressStateInit, TypeCloudBrainOne).Find(&attachments)
 }
 
 func GetAllPublicAttachments() ([]*AttachmentUsername, error) {
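With the extra `type = ?` condition, the decompress scan only returns MinIO-backed zip attachments, so OBS uploads never enter the decompress path. A hedged sketch of a worker-side caller, using only the query above:

```go
// Hedged sketch: only CloudBrainOne zips come back from the query above,
// so attachments stored in OBS are skipped by the decompress worker.
attachments, err := models.GetUnDecompressAttachments()
if err != nil {
	log.Error("GetUnDecompressAttachments failed: %v", err)
	return
}
for _, attach := range attachments {
	// enqueue attach.UUID for decompression (illustrative placeholder)
	_ = attach
}
```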
@@ -360,7 +361,7 @@ func GetAllPublicAttachments() ([]*AttachmentUsername, error) {
 func getAllPublicAttachments(e Engine) ([]*AttachmentUsername, error) {
 	attachments := make([]*AttachmentUsername, 0, 10)
 	if err := e.Table("attachment").Join("LEFT", "`user`", "attachment.uploader_id "+
-		"= `user`.id").Where("decompress_state= ? and is_private= ?", DecompressStateDone, false).Find(&attachments); err != nil {
+		"= `user`.id").Where("decompress_state= ? and is_private= ? and type = ?", DecompressStateDone, false, TypeCloudBrainOne).Find(&attachments); err != nil {
 		return nil, err
 	}
 	return attachments, nil
@@ -378,7 +379,7 @@ func GetPrivateAttachments(username string) ([]*AttachmentUsername, error) {
 func getPrivateAttachments(e Engine, userID int64) ([]*AttachmentUsername, error) {
 	attachments := make([]*AttachmentUsername, 0, 10)
 	if err := e.Table("attachment").Join("LEFT", "`user`", "attachment.uploader_id "+
-		"= `user`.id").Where("decompress_state= ? and uploader_id= ?", DecompressStateDone, userID).Find(&attachments); err != nil {
+		"= `user`.id").Where("decompress_state= ? and uploader_id= ? and type = ?", DecompressStateDone, userID, TypeCloudBrainOne).Find(&attachments); err != nil {
 		return nil, err
 	}
 	return attachments, nil
@@ -406,7 +407,7 @@ func GetAllUserAttachments(userID int64) ([]*AttachmentUsername, error) {
 func getAllUserAttachments(e Engine, userID int64) ([]*AttachmentUsername, error) {
 	attachments := make([]*AttachmentUsername, 0, 10)
 	if err := e.Table("attachment").Join("LEFT", "`user`", "attachment.uploader_id "+
-		"= `user`.id").Where("decompress_state= ? and (uploader_id= ? or is_private = ?)", DecompressStateDone, userID, false).Find(&attachments); err != nil {
+		"= `user`.id").Where("decompress_state= ? and type = ? and (uploader_id= ? or is_private = ?)", DecompressStateDone, TypeCloudBrainOne, userID, false).Find(&attachments); err != nil {
 		return nil, err
 	}
 	return attachments, nil
@@ -10,6 +10,11 @@ const (
 	FileUploaded
 )
 
+const (
+	TypeCloudBrainOne = 0
+	TypeCloudBrainTwo = 1
+)
+
 type FileChunk struct {
 	ID   int64  `xorm:"pk autoincr"`
 	UUID string `xorm:"uuid UNIQUE"`
@@ -19,7 +24,8 @@ type FileChunk struct {
 	TotalChunks int
 	Size        int64
 	UserID      int64 `xorm:"INDEX"`
-	CompletedParts []string `xorm:"DEFAULT """` // chunkNumber+etag eg: ,1-asqwewqe21312312.2-123hjkas
+	Type           int      `xorm:"INDEX DEFAULT 0"`
+	CompletedParts []string `xorm:"DEFAULT ''"` // chunkNumber+etag eg: ,1-asqwewqe21312312.2-123hjkas
 	CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
 	UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
 }
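The `CompletedParts` comment documents the on-disk encoding: a run of `partNumber-etag` tokens. A hypothetical decoder, not part of the patch (the struct comment's example mixes `,` and `.` as separators; this sketch assumes commas, and needs `strings` and `strconv`):

```go
// parseCompletedParts is a hypothetical decoder for the encoding documented
// above: ",1-asqwewqe21312312,2-123hjkas" -> {1: "asqwewqe21312312", ...}.
func parseCompletedParts(completedParts []string) map[int]string {
	etags := make(map[int]string) // partNumber -> etag
	for _, entry := range completedParts {
		for _, token := range strings.Split(entry, ",") {
			dash := strings.Index(token, "-")
			if dash <= 0 {
				continue // skip empty tokens produced by leading commas
			}
			if n, err := strconv.Atoi(token[:dash]); err == nil {
				etags[n] = token[dash+1:]
			}
		}
	}
	return etags
}
```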
@@ -41,14 +47,14 @@ func getFileChunkByMD5(e Engine, md5 string) (*FileChunk, error) {
 }
 
 // GetFileChunkByMD5 returns fileChunk by given id
-func GetFileChunkByMD5AndUser(md5 string, userID int64) (*FileChunk, error) {
-	return getFileChunkByMD5AndUser(x, md5, userID)
+func GetFileChunkByMD5AndUser(md5 string, userID int64, typeCloudBrain int) (*FileChunk, error) {
+	return getFileChunkByMD5AndUser(x, md5, userID, typeCloudBrain)
 }
 
-func getFileChunkByMD5AndUser(e Engine, md5 string, userID int64) (*FileChunk, error) {
+func getFileChunkByMD5AndUser(e Engine, md5 string, userID int64, typeCloudBrain int) (*FileChunk, error) {
 	fileChunk := new(FileChunk)
 
-	if has, err := e.Where("md5 = ? and user_id = ?", md5, userID).Get(fileChunk); err != nil {
+	if has, err := e.Where("md5 = ? and user_id = ? and type = ?", md5, userID, typeCloudBrain).Get(fileChunk); err != nil {
 		return nil, err
 	} else if !has {
 		return nil, ErrFileChunkNotExist{md5, ""}
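Callers must now say which backend they mean: the lookup that used to be keyed on (md5, user) is keyed on (md5, user, type). A hedged usage sketch:

```go
// Hedged sketch: resume check for an OBS-backed upload; md5 and userID
// come from the request context.
fileChunk, err := models.GetFileChunkByMD5AndUser(md5, userID, models.TypeCloudBrainTwo)
if models.IsErrFileChunkNotExist(err) {
	// this user has not uploaded this file to this backend yet
} else if err != nil {
	return err
}
_ = fileChunk
```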
@@ -447,6 +447,15 @@ var (
 	//blockchain config
 	BlockChainHost  string
 	CommitValidDate string
+
+	//obs config
+	Endpoint        string
+	AccessKeyID     string
+	SecretAccessKey string
+	Bucket          string
+	Location        string
+	BasePath        string
+	//RealPath string
 )
 
 // DateLang transforms standard language locale name to corresponding value in datetime plugin.
@@ -1131,6 +1140,14 @@ func NewContext() {
 	sec = Cfg.Section("blockchain")
 	BlockChainHost = sec.Key("HOST").MustString("http://192.168.136.66:3302/")
 	CommitValidDate = sec.Key("COMMIT_VALID_DATE").MustString("2021-01-15")
+
+	sec = Cfg.Section("obs")
+	Endpoint = sec.Key("ENDPOINT").MustString("112.95.163.82")
+	AccessKeyID = sec.Key("ACCESS_KEY_ID").MustString("")
+	SecretAccessKey = sec.Key("SECRET_ACCESS_KEY").MustString("")
+	Bucket = sec.Key("BUCKET").MustString("testopendata")
+	Location = sec.Key("LOCATION").MustString("cn-south-222")
+	BasePath = sec.Key("BASE_PATH").MustString("attachment/")
 }
 
 func loadInternalToken(sec *ini.Section) string {
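Put together, the new keys map to an `[obs]` section in app.ini; a sketch with the defaults above (the credentials are placeholders, never real values):

```ini
; hedged example [obs] section; values mirror the MustString defaults above
[obs]
ENDPOINT = 112.95.163.82
ACCESS_KEY_ID = your-access-key-id
SECRET_ACCESS_KEY = your-secret-access-key
BUCKET = testopendata
LOCATION = cn-south-222
BASE_PATH = attachment/
```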
@@ -0,0 +1,119 @@
+// Copyright 2020 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package storage
+
+import (
+	"io"
+	"path"
+	"strconv"
+	"strings"
+
+	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/obs"
+	"code.gitea.io/gitea/modules/setting"
+)
+
+// ObsHasObject checks whether the bucket contains the given object key
+func ObsHasObject(path string) (bool, error) {
+	hasObject := false
+	output, err := ObsCli.ListObjects(&obs.ListObjectsInput{Bucket: setting.Bucket})
+	if err != nil {
+		log.Error("ListObjects failed: %v", err)
+		return hasObject, err
+	}
+
+	for _, obj := range output.Contents {
+		if path == obj.Key {
+			hasObject = true
+			break
+		}
+	}
+
+	return hasObject, nil
+}
+
+func GetObsPartInfos(uuid string, uploadID string) (string, error) {
+	key := strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/")
+	output, err := ObsCli.ListParts(&obs.ListPartsInput{
+		Bucket:   setting.Bucket,
+		Key:      key,
+		UploadId: uploadID,
+	})
+	if err != nil {
+		log.Error("ListParts failed: %v", err)
+		return "", err
+	}
+
+	var chunks string
+	for _, partInfo := range output.Parts {
+		chunks += strconv.Itoa(partInfo.PartNumber) + "-" + partInfo.ETag + ","
+	}
+
+	return chunks, nil
+}
+
+func NewObsMultiPartUpload(uuid string) (string, error) {
+	input := &obs.InitiateMultipartUploadInput{}
+	input.Bucket = setting.Bucket
+	input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/")
+
+	output, err := ObsCli.InitiateMultipartUpload(input)
+	if err != nil {
+		log.Error("InitiateMultipartUpload failed: %v", err)
+		return "", err
+	}
+
+	return output.UploadId, nil
+}
+
+func CompleteObsMultiPartUpload(uuid string, uploadID string) error {
+	input := &obs.CompleteMultipartUploadInput{}
+	input.Bucket = setting.Bucket
+	input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/")
+	input.UploadId = uploadID
+
+	output, err := ObsCli.ListParts(&obs.ListPartsInput{
+		Bucket:   setting.Bucket,
+		Key:      input.Key,
+		UploadId: uploadID,
+	})
+	if err != nil {
+		log.Error("ListParts failed: %v", err)
+		return err
+	}
+
+	for _, partInfo := range output.Parts {
+		input.Parts = append(input.Parts, obs.Part{
+			PartNumber: partInfo.PartNumber,
+			ETag:       partInfo.ETag,
+		})
+	}
+
+	_, err = ObsCli.CompleteMultipartUpload(input)
+	if err != nil {
+		log.Error("CompleteMultipartUpload failed: %v", err)
+		return err
+	}
+
+	return nil
+}
+
+func ObsUploadPart(uuid string, uploadId string, partNumber int, partSize int64, partReader io.Reader) error {
+	input := &obs.UploadPartInput{}
+	input.PartNumber = partNumber
+	input.Key = strings.TrimPrefix(path.Join(setting.BasePath, path.Join(uuid[0:1], uuid[1:2], uuid)), "/")
+	input.UploadId = uploadId
+	input.Bucket = setting.Bucket
+	input.PartSize = partSize
+	input.Body = partReader
+
+	_, err := ObsCli.UploadPart(input)
+	if err != nil {
+		log.Error("UploadPart failed: %v", err)
+		return err
+	}
+
+	return nil
+}
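Note that ObsHasObject answers a single-key question by listing the whole bucket, which grows with bucket size (and ListObjects responses are paginated). If the vendored OBS SDK exposes a metadata lookup — Huawei's esdk-obs-go does, but treat the exact names here as assumptions — a HEAD-style check would be constant-time:

```go
// Hedged alternative existence check; GetObjectMetadata/ObsError are
// assumptions about the vendored OBS SDK, not part of this patch.
func obsHasObjectByHead(key string) (bool, error) {
	_, err := ObsCli.GetObjectMetadata(&obs.GetObjectMetadataInput{
		Bucket: setting.Bucket,
		Key:    key,
	})
	if err == nil {
		return true, nil
	}
	if obsErr, ok := err.(obs.ObsError); ok && obsErr.StatusCode == 404 {
		return false, nil // object absent rather than a transport failure
	}
	return false, err
}
```

For orientation, the helpers compose into the usual multipart lifecycle; a hedged end-to-end sketch (error handling elided):

```go
// Hedged sketch of the full OBS multipart flow using the helpers above.
uploadID, _ := storage.NewObsMultiPartUpload(uuid)      // initiate
_ = storage.ObsUploadPart(uuid, uploadID, 1, size1, r1) // upload parts
_ = storage.ObsUploadPart(uuid, uploadID, 2, size2, r2)
chunks, _ := storage.GetObsPartInfos(uuid, uploadID)    // resume bookkeeping
_ = chunks
_ = storage.CompleteObsMultiPartUpload(uuid, uploadID)  // stitch parts together
```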
@@ -8,6 +8,8 @@ import (
 	"fmt"
 	"io"
 
 	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/obs"
+	"code.gitea.io/gitea/modules/setting"
 )
 
@@ -40,6 +42,7 @@ func Copy(dstStorage ObjectStorage, dstPath string, srcStorage ObjectStorage, sr
 var (
 	// Attachments represents attachments storage
 	Attachments ObjectStorage
+	ObsCli      *obs.ObsClient
 )
 
 // Init initializes the storage
@@ -63,6 +66,12 @@ func Init() error {
 		return fmt.Errorf("Unsupported attachment store type: %s", setting.Attachment.StoreType)
 	}
 
+	ObsCli, err = obs.New(setting.AccessKeyID, setting.SecretAccessKey, setting.Endpoint)
+	if err != nil {
+		log.Error("obs.New failed: %v", err)
+		return err
+	}
+
 	if err != nil {
 		return err
 	}
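As written, Init constructs the OBS client unconditionally, so a bad OBS configuration can fail startup even for deployments that never touch OBS. A hedged variant that only builds the client when credentials are configured (a sketch, not the patch's behavior):

```go
// Hedged sketch: construct the OBS client only when credentials are set.
if setting.AccessKeyID != "" && setting.SecretAccessKey != "" {
	ObsCli, err = obs.New(setting.AccessKeyID, setting.SecretAccessKey, setting.Endpoint)
	if err != nil {
		log.Error("obs.New failed: %v", err)
		return err
	}
}
```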
@@ -15,6 +15,7 @@ import (
 	"code.gitea.io/gitea/modules/worker"
 	contexExt "context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"net/http"
 	"strconv"
@@ -340,9 +341,16 @@ func UpdateAttachmentDecompressState(ctx *context.Context) {
 func GetSuccessChunks(ctx *context.Context) {
 	fileMD5 := ctx.Query("md5")
+	typeCloudBrain := ctx.QueryInt("type")
 	var chunks string
-	fileChunk, err := models.GetFileChunkByMD5AndUser(fileMD5, ctx.User.ID)
+
+	err := checkTypeCloudBrain(typeCloudBrain)
+	if err != nil {
+		ctx.ServerError("checkTypeCloudBrain failed", err)
+		return
+	}
+
+	fileChunk, err := models.GetFileChunkByMD5AndUser(fileMD5, ctx.User.ID, typeCloudBrain)
 	if err != nil {
 		if models.IsErrFileChunkNotExist(err) {
 			ctx.JSON(200, map[string]string{
@@ -357,12 +365,22 @@ func GetSuccessChunks(ctx *context.Context) {
 		return
 	}
 
-	isExist, err := storage.Attachments.HasObject(models.AttachmentRelativePath(fileChunk.UUID))
-	if err != nil {
-		ctx.ServerError("HasObject failed", err)
-		return
+	isExist := false
+	if typeCloudBrain == models.TypeCloudBrainOne {
+		isExist, err = storage.Attachments.HasObject(models.AttachmentRelativePath(fileChunk.UUID))
+		if err != nil {
+			ctx.ServerError("HasObject failed", err)
+			return
+		}
+	} else {
+		isExist, err = storage.ObsHasObject(models.AttachmentRelativePath(fileChunk.UUID))
+		if err != nil {
+			ctx.ServerError("ObsHasObject failed", err)
+			return
+		}
 	}
 
 	if isExist {
 		if fileChunk.IsUploaded == models.FileNotUploaded {
 			log.Info("the file has been uploaded but not recorded")
@@ -380,10 +398,18 @@ func GetSuccessChunks(ctx *context.Context) {
 			}
 		}
 
-		chunks, err = storage.GetPartInfos(fileChunk.UUID, fileChunk.UploadID)
-		if err != nil {
-			ctx.ServerError("GetPartInfos failed", err)
-			return
+		if typeCloudBrain == models.TypeCloudBrainOne {
+			chunks, err = storage.GetPartInfos(fileChunk.UUID, fileChunk.UploadID)
+			if err != nil {
+				ctx.ServerError("GetPartInfos failed", err)
+				return
+			}
+		} else {
+			chunks, err = storage.GetObsPartInfos(fileChunk.UUID, fileChunk.UploadID)
+			if err != nil {
+				ctx.ServerError("GetObsPartInfos failed", err)
+				return
+			}
 		}
 	}
@@ -445,6 +471,13 @@ func NewMultipart(ctx *context.Context) {
 		return
 	}
 
+	typeCloudBrain := ctx.QueryInt("type")
+	err = checkTypeCloudBrain(typeCloudBrain)
+	if err != nil {
+		ctx.ServerError("checkTypeCloudBrain failed", err)
+		return
+	}
+
 	if setting.Attachment.StoreType == storage.MinioStorageType {
 		totalChunkCounts := ctx.QueryInt("totalChunkCounts")
 		if totalChunkCounts > minio_ext.MaxPartsCount {
@@ -459,10 +492,19 @@ func NewMultipart(ctx *context.Context) {
 		}
 
 		uuid := gouuid.NewV4().String()
-		uploadID, err := storage.NewMultiPartUpload(uuid)
-		if err != nil {
-			ctx.ServerError("NewMultipart", err)
-			return
+		var uploadID string
+		if typeCloudBrain == models.TypeCloudBrainOne {
+			uploadID, err = storage.NewMultiPartUpload(uuid)
+			if err != nil {
+				ctx.ServerError("NewMultipart", err)
+				return
+			}
+		} else {
+			uploadID, err = storage.NewObsMultiPartUpload(uuid)
+			if err != nil {
+				ctx.ServerError("NewObsMultiPartUpload", err)
+				return
+			}
 		}
 
 		_, err = models.InsertFileChunk(&models.FileChunk{
@@ -472,6 +514,7 @@ func NewMultipart(ctx *context.Context) {
 			Md5:         ctx.Query("md5"),
 			Size:        fileSize,
 			TotalChunks: totalChunkCounts,
+			Type:        typeCloudBrain,
 		})
 
 		if err != nil {
@@ -511,9 +554,40 @@ func GetMultipartUploadUrl(ctx *context.Context) {
 	})
 }
 
+func UploadPart(ctx *context.Context) {
+	uuid := ctx.Query("uuid")
+	uploadID := ctx.Query("uploadID")
+	partNumber := ctx.QueryInt("chunkNumber")
+	size := ctx.QueryInt64("size")
+
+	if size > minio_ext.MinPartSize {
+		ctx.Error(400, fmt.Sprintf("chunk size(%d) is too big", size))
+		return
+	}
+
+	url, err := storage.GenMultiPartSignedUrl(uuid, uploadID, partNumber, size)
+	//todo: get file reader
+	//err := storage.ObsUploadPart(uuid, uploadID, partNumber, size, partReader)
+	if err != nil {
+		ctx.Error(500, fmt.Sprintf("GenMultiPartSignedUrl failed: %v", err))
+		return
+	}
+
+	ctx.JSON(200, map[string]string{
+		"url": url,
+	})
+}
+
 func CompleteMultipart(ctx *context.Context) {
 	uuid := ctx.Query("uuid")
 	uploadID := ctx.Query("uploadID")
+	typeCloudBrain := ctx.QueryInt("type")
+
+	err := checkTypeCloudBrain(typeCloudBrain)
+	if err != nil {
+		ctx.ServerError("checkTypeCloudBrain failed", err)
+		return
+	}
 
 	fileChunk, err := models.GetFileChunkByUUID(uuid)
 	if err != nil {
@@ -525,10 +599,18 @@ func CompleteMultipart(ctx *context.Context) {
 		return
 	}
 
-	_, err = storage.CompleteMultiPartUpload(uuid, uploadID)
-	if err != nil {
-		ctx.Error(500, fmt.Sprintf("CompleteMultiPartUpload failed: %v", err))
-		return
+	if typeCloudBrain == models.TypeCloudBrainOne {
+		_, err = storage.CompleteMultiPartUpload(uuid, uploadID)
+		if err != nil {
+			ctx.Error(500, fmt.Sprintf("CompleteMultiPartUpload failed: %v", err))
+			return
+		}
+	} else {
+		err = storage.CompleteObsMultiPartUpload(uuid, uploadID)
+		if err != nil {
+			ctx.Error(500, fmt.Sprintf("CompleteObsMultiPartUpload failed: %v", err))
+			return
+		}
 	}
 
 	fileChunk.IsUploaded = models.FileUploaded
@@ -546,6 +628,7 @@ func CompleteMultipart(ctx *context.Context) {
 		Name:      ctx.Query("file_name"),
 		Size:      ctx.QueryInt64("size"),
 		DatasetID: ctx.QueryInt64("dataset_id"),
+		Type:      typeCloudBrain,
 	})
 
 	if err != nil {
@@ -704,3 +787,11 @@ func queryDatasets(ctx *context.Context, attachs []*models.AttachmentUsername) {
 	})
 	return
 }
+
+func checkTypeCloudBrain(typeCloudBrain int) error {
+	if typeCloudBrain != models.TypeCloudBrainOne && typeCloudBrain != models.TypeCloudBrainTwo {
+		log.Error("type error: %d", typeCloudBrain)
+		return errors.New("type error")
+	}
+	return nil
+}
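checkTypeCloudBrain is the single gate for the `type` query parameter; a hedged table-style test (illustrative, not in the patch):

```go
// Hedged sketch of a unit test for checkTypeCloudBrain.
func TestCheckTypeCloudBrain(t *testing.T) {
	if err := checkTypeCloudBrain(models.TypeCloudBrainOne); err != nil {
		t.Errorf("TypeCloudBrainOne should pass: %v", err)
	}
	if err := checkTypeCloudBrain(models.TypeCloudBrainTwo); err != nil {
		t.Errorf("TypeCloudBrainTwo should pass: %v", err)
	}
	if err := checkTypeCloudBrain(2); err == nil {
		t.Error("out-of-range type should be rejected")
	}
}
```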
@@ -529,6 +529,7 @@ func RegisterRoutes(m *macaron.Macaron) {
 			m.Get("/get_multipart_url", repo.GetMultipartUploadUrl)
 			m.Post("/complete_multipart", repo.CompleteMultipart)
 			m.Post("/update_chunk", repo.UpdateMultipart)
+			m.Post("/upload_part", repo.UploadPart)
 		}, reqSignIn)
 
 		m.Group("/attachments", func() {
@@ -21,6 +21,7 @@ import qs from 'qs';
 import createDropzone from '../features/dropzone.js';
 
 const {_AppSubUrl, _StaticUrlPrefix, csrf} = window.config;
+const cloud_brain_type = 0;
 
 export default {
   data() {
@@ -255,6 +256,7 @@ export default {
       const params = {
         params: {
           md5: file.uniqueIdentifier,
+          type: cloud_brain_type,
           _csrf: csrf
         }
       };
@@ -283,6 +285,7 @@ export default {
           md5: file.uniqueIdentifier,
           size: file.size,
           fileType: file.type,
+          type: cloud_brain_type,
           _csrf: csrf
         }
       });
@@ -381,6 +384,7 @@ export default {
           file_name: file.name,
           size: file.size,
           dataset_id: file.datasetId,
+          type: cloud_brain_type,
           _csrf: csrf
         })
       );