Browse Source

增加预签名接口

gitlink
Sydonian 8 months ago
parent
commit
97347f760b
8 changed files with 277 additions and 15 deletions
  1. +1
    -1
      agent/internal/task/create_package.go
  2. +1
    -5
      client/internal/cmdline/object.go
  3. +1
    -1
      client/internal/cmdline/put.go
  4. +81
    -3
      client/internal/http/aws_auth.go
  5. +1
    -1
      client/internal/http/object.go
  6. +180
    -0
      client/internal/http/presigned.go
  7. +7
    -0
      client/internal/http/server.go
  8. +5
    -4
      common/pkgs/uploader/update.go

+ 1
- 1
agent/internal/task/create_package.go View File

@@ -113,7 +113,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c
path := filepath.ToSlash(obj.Path)

// 上传对象
err = up.Upload(path, obj.Size, obj.File)
err = up.Upload(path, obj.File)
if err != nil {
err = fmt.Errorf("uploading object: %w", err)
log.Error(err.Error())


+ 1
- 5
client/internal/cmdline/object.go View File

@@ -48,17 +48,13 @@ var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath
return nil
}

info, err := fi.Info()
if err != nil {
return err
}
file, err := os.Open(fname)
if err != nil {
return err
}
defer file.Close()

return up.Upload(fname, info.Size(), file)
return up.Upload(fname, file)
})
if err != nil {
return err


+ 1
- 1
client/internal/cmdline/put.go View File

@@ -100,7 +100,7 @@ func init() {
}
defer file.Close()

return up.Upload(fname, info.Size(), file)
return up.Upload(fname, file)
})
if err != nil {
fmt.Println(err.Error())


+ 81
- 3
client/internal/http/aws_auth.go View File

@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"

@@ -93,7 +94,7 @@ func (a *AWSAuth) Auth(c *gin.Context) {
return
}

verifySig := a.getSignature(verifyReq)
verifySig := getSignatureFromAWSHeader(verifyReq)
if !strings.EqualFold(verifySig, reqSig) {
c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.Unauthorized, "signature mismatch"))
return
@@ -135,13 +136,14 @@ func (a *AWSAuth) AuthWithoutBody(c *gin.Context) {
}

err = a.signer.SignHTTP(context.TODO(), a.cred, verifyReq, "", AuthService, AuthRegion, timestamp)

if err != nil {
logger.Warnf("sign request: %v", err)
c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.OperationFailed, "sign request failed"))
return
}

verifySig := a.getSignature(verifyReq)
verifySig := getSignatureFromAWSHeader(verifyReq)
if strings.EqualFold(verifySig, reqSig) {
c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.Unauthorized, "signature mismatch"))
return
@@ -150,6 +152,68 @@ func (a *AWSAuth) AuthWithoutBody(c *gin.Context) {
c.Next()
}

func (a *AWSAuth) PresignedAuth(c *gin.Context) {
query := c.Request.URL.Query()

signature := query.Get("X-Amz-Signature")
query.Del("X-Amz-Signature")
if signature == "" {
c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing X-Amz-Signature query parameter"))
return
}

// alg := c.Request.URL.Query().Get("X-Amz-Algorithm")
// cred := c.Request.URL.Query().Get("X-Amz-Credential")

date := query.Get("X-Amz-Date")
expiresStr := query.Get("X-Expires")
expires, err := strconv.ParseInt(expiresStr, 10, 64)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid X-Expires format"))
return
}

signedHeaders := strings.Split(query.Get("X-Amz-SignedHeaders"), ";")

c.Request.URL.RawQuery = query.Encode()

verifyReq, err := http.NewRequest(c.Request.Method, c.Request.URL.String(), nil)
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error()))
return
}
for _, h := range signedHeaders {
verifyReq.Header.Add(h, c.Request.Header.Get(h))
}
verifyReq.Host = c.Request.Host

timestamp, err := time.Parse("20060102T150405Z", date)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid X-Amz-Date format"))
return
}

if time.Now().After(timestamp.Add(time.Duration(expires) * time.Second)) {
c.AbortWithStatusJSON(http.StatusUnauthorized, Failed(errorcode.Unauthorized, "request expired"))
return
}

signer := v4.NewSigner()
uri, _, err := signer.PresignHTTP(context.TODO(), a.cred, verifyReq, "", AuthService, AuthRegion, timestamp)
if err != nil {
c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.OperationFailed, "sign request failed"))
return
}

verifySig := getSignatureFromAWSQuery(uri)
if !strings.EqualFold(verifySig, signature) {
c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.Unauthorized, "signature mismatch"))
return
}

c.Next()
}

// 解析 Authorization 头部
func parseAuthorizationHeader(authorizationHeader string) (string, []string, string, error) {
if !strings.HasPrefix(authorizationHeader, "AWS4-HMAC-SHA256 ") {
@@ -186,7 +250,7 @@ func parseAuthorizationHeader(authorizationHeader string) (string, []string, str
return credential, headers, signature, nil
}

func (a *AWSAuth) getSignature(req *http.Request) string {
func getSignatureFromAWSHeader(req *http.Request) string {
auth := req.Header.Get(AuthorizationHeader)
idx := strings.Index(auth, "Signature=")
if idx == -1 {
@@ -195,3 +259,17 @@ func (a *AWSAuth) getSignature(req *http.Request) string {

return auth[idx+len("Signature="):]
}

func getSignatureFromAWSQuery(uri string) string {
idx := strings.Index(uri, "X-Amz-Signature=")
if idx == -1 {
return ""
}

andIdx := strings.Index(uri[idx:], "&")
if andIdx == -1 {
return uri[idx+len("X-Amz-Signature="):]
}

return uri[idx+len("X-Amz-Signature=") : andIdx]
}

+ 1
- 1
client/internal/http/object.go View File

@@ -109,7 +109,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) {
}
path = filepath.ToSlash(path)

err = up.Upload(path, file.Size, f)
err = up.Upload(path, f)
if err != nil {
log.Warnf("uploading file: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("uploading file %v: %v", file.Filename, err)))


+ 180
- 0
client/internal/http/presigned.go View File

@@ -0,0 +1,180 @@
package http

import (
"fmt"
"io"
"net/http"
"net/url"
"path"
"path/filepath"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/client/internal/config"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)

type PresignedService struct {
*Server
}

func (s *Server) Presigned() *PresignedService {
return &PresignedService{
Server: s,
}
}

func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) {
log := logger.WithField("HTTP", "Presigned.ObjectDownloadByPath")

var req cdsapi.PresignedObjectDownloadByPath
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

_, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false)
if err != nil {
log.Warnf("getting object by path: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed"))
return
}

if len(obj) == 0 {
log.Warnf("object not found: %s", req.Path)
ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found"))
return
}

off := req.Offset
len := int64(-1)
if req.Length != nil {
len = *req.Length
}

file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{
ObjectID: obj[0].ObjectID,
Offset: off,
Length: len,
})
if err != nil {
log.Warnf("downloading object: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed"))
return
}
defer file.File.Close()

ctx.Header("Content-Disposition", "attachment; filename="+url.PathEscape(path.Base(file.Object.Path)))
ctx.Header("Content-Type", "application/octet-stream")
ctx.Header("Content-Transfer-Encoding", "binary")

n, err := io.Copy(ctx.Writer, file.File)
if err != nil {
log.Warnf("copying file: %s", err.Error())
}

if config.Cfg().StorageID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
}
}

func (s *PresignedService) ObjectUpload(ctx *gin.Context) {
log := logger.WithField("HTTP", "Presigned.ObjectUpload")

var req cdsapi.PresignedObjectUpload
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

up, err := s.svc.Uploader.BeginUpdate(req.UserID, req.PackageID, req.Affinity, req.LoadTo, req.LoadToPath)
if err != nil {
log.Warnf("begin update: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err)))
return
}
defer up.Abort()

path := filepath.ToSlash(req.Path)

err = up.Upload(path, ctx.Request.Body)
if err != nil {
log.Warnf("uploading file: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("uploading file %v: %v", req.Path, err)))
return
}

ret, err := up.Commit()
if err != nil {
log.Warnf("commit update: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit update: %v", err)))
return
}

ctx.JSON(http.StatusOK, OK(cdsapi.PresignedObjectUploadResp{Object: ret.Objects[path]}))
}

func (s *PresignedService) ObjectNewMultipartUpload(ctx *gin.Context) {
log := logger.WithField("HTTP", "Presigned.ObjectNewMultipartUpload")

var req cdsapi.PresignedObjectNewMultipartUpload
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

obj, err := s.svc.ObjectSvc().NewMultipartUploadObject(req.UserID, req.PackageID, req.Path)
if err != nil {
log.Warnf("new multipart upload: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("new multipart upload: %v", err)))
return
}

ctx.JSON(http.StatusOK, OK(cdsapi.PresignedObjectUploadResp{Object: obj}))
}

func (s *PresignedService) ObjectUploadPart(ctx *gin.Context) {
log := logger.WithField("HTTP", "Presigned.ObjectUploadPart")

var req cdsapi.PresignedObjectUploadPart
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

err := s.svc.Uploader.UploadPart(req.UserID, req.ObjectID, req.Index, ctx.Request.Body)
if err != nil {
log.Warnf("uploading part: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("upload part: %v", err)))
return
}

ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadPartResp{}))
}

func (s *PresignedService) ObjectCompleteMultipartUpload(ctx *gin.Context) {
log := logger.WithField("HTTP", "Presigned.ObjectCompleteMultipartUpload")

var req cdsapi.PresignedObjectCompleteMultipartUpload
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

obj, err := s.svc.ObjectSvc().CompleteMultipartUpload(req.UserID, req.ObjectID, req.Indexes)
if err != nil {
log.Warnf("completing multipart upload: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("complete multipart upload: %v", err)))
return
}

ctx.JSON(http.StatusOK, OK(cdsapi.ObjectCompleteMultipartUploadResp{Object: obj}))
}

+ 7
- 0
client/internal/http/server.go View File

@@ -123,4 +123,11 @@ func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) {
rt.POST(cdsapi.ObjectNewMultipartUploadPath, s.Object().NewMultipartUpload)
rt.POST(cdsapi.ObjectUploadPartPath, s.Object().UploadPart)
rt.POST(cdsapi.ObjectCompleteMultipartUploadPath, s.Object().CompleteMultipartUpload)

rt.GET(cdsapi.PresignedObjectDownloadByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownloadByPath)
rt.POST(cdsapi.PresignedObjectUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUpload)

rt.POST(cdsapi.PresignedObjectNewMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectNewMultipartUpload)
rt.POST(cdsapi.PresignedObjectUploadPartPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUploadPart)
rt.POST(cdsapi.PresignedObjectCompleteMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectCompleteMultipartUpload)
}

+ 5
- 4
common/pkgs/uploader/update.go View File

@@ -42,13 +42,13 @@ type UpdateResult struct {
Objects map[string]cdssdk.Object
}

func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error {
func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
uploadTime := time.Now()

ft := ioswitch2.NewFromTo()
fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream())
ft.AddFrom(fromExec).
AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash"))
AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "shardInfo"))

for i, stg := range w.loadToStgs {
ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat)))
@@ -73,10 +73,11 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error
defer w.lock.Unlock()

// 记录上传结果
shardInfo := ret["shardInfo"].(*ops2.ShardInfoValue)
w.successes = append(w.successes, coormq.AddObjectEntry{
Path: pat,
Size: size,
FileHash: ret["fileHash"].(*ops2.ShardInfoValue).Hash,
Size: shardInfo.Size,
FileHash: shardInfo.Hash,
UploadTime: uploadTime,
StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID},
})


Loading…
Cancel
Save