From 97347f760b695d0a789bf3de1d47ab50d1a82b95 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 6 Mar 2025 14:33:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=A2=84=E7=AD=BE=E5=90=8D?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/create_package.go | 2 +- client/internal/cmdline/object.go | 6 +- client/internal/cmdline/put.go | 2 +- client/internal/http/aws_auth.go | 84 +++++++++++- client/internal/http/object.go | 2 +- client/internal/http/presigned.go | 180 ++++++++++++++++++++++++++ client/internal/http/server.go | 7 + common/pkgs/uploader/update.go | 9 +- 8 files changed, 277 insertions(+), 15 deletions(-) create mode 100644 client/internal/http/presigned.go diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 6354938..f5c8ae3 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -113,7 +113,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c path := filepath.ToSlash(obj.Path) // 上传对象 - err = up.Upload(path, obj.Size, obj.File) + err = up.Upload(path, obj.File) if err != nil { err = fmt.Errorf("uploading object: %w", err) log.Error(err.Error()) diff --git a/client/internal/cmdline/object.go b/client/internal/cmdline/object.go index c3791d5..8267162 100644 --- a/client/internal/cmdline/object.go +++ b/client/internal/cmdline/object.go @@ -48,17 +48,13 @@ var _ = MustAddCmd(func(ctx CommandContext, packageID cdssdk.PackageID, rootPath return nil } - info, err := fi.Info() - if err != nil { - return err - } file, err := os.Open(fname) if err != nil { return err } defer file.Close() - return up.Upload(fname, info.Size(), file) + return up.Upload(fname, file) }) if err != nil { return err diff --git a/client/internal/cmdline/put.go b/client/internal/cmdline/put.go index a540518..a8f9b82 100644 --- a/client/internal/cmdline/put.go +++ b/client/internal/cmdline/put.go @@ -100,7 +100,7 @@ func init() { } defer file.Close() - return up.Upload(fname, info.Size(), file) + return up.Upload(fname, file) }) if err != nil { fmt.Println(err.Error()) diff --git a/client/internal/http/aws_auth.go b/client/internal/http/aws_auth.go index 4a6d283..71a4a95 100644 --- a/client/internal/http/aws_auth.go +++ b/client/internal/http/aws_auth.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "time" @@ -93,7 +94,7 @@ func (a *AWSAuth) Auth(c *gin.Context) { return } - verifySig := a.getSignature(verifyReq) + verifySig := getSignatureFromAWSHeader(verifyReq) if !strings.EqualFold(verifySig, reqSig) { c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.Unauthorized, "signature mismatch")) return @@ -135,13 +136,14 @@ func (a *AWSAuth) AuthWithoutBody(c *gin.Context) { } err = a.signer.SignHTTP(context.TODO(), a.cred, verifyReq, "", AuthService, AuthRegion, timestamp) + if err != nil { logger.Warnf("sign request: %v", err) c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.OperationFailed, "sign request failed")) return } - verifySig := a.getSignature(verifyReq) + verifySig := getSignatureFromAWSHeader(verifyReq) if strings.EqualFold(verifySig, reqSig) { c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.Unauthorized, "signature mismatch")) return @@ -150,6 +152,68 @@ func (a *AWSAuth) AuthWithoutBody(c *gin.Context) { c.Next() } +func (a *AWSAuth) PresignedAuth(c *gin.Context) { + query := c.Request.URL.Query() + + signature := query.Get("X-Amz-Signature") + query.Del("X-Amz-Signature") + if signature == "" { + c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing X-Amz-Signature query parameter")) + return + } + + // alg := c.Request.URL.Query().Get("X-Amz-Algorithm") + // cred := c.Request.URL.Query().Get("X-Amz-Credential") + + date := query.Get("X-Amz-Date") + expiresStr := query.Get("X-Expires") + expires, err := strconv.ParseInt(expiresStr, 10, 64) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid X-Expires format")) + return + } + + signedHeaders := strings.Split(query.Get("X-Amz-SignedHeaders"), ";") + + c.Request.URL.RawQuery = query.Encode() + + verifyReq, err := http.NewRequest(c.Request.Method, c.Request.URL.String(), nil) + if err != nil { + c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error())) + return + } + for _, h := range signedHeaders { + verifyReq.Header.Add(h, c.Request.Header.Get(h)) + } + verifyReq.Host = c.Request.Host + + timestamp, err := time.Parse("20060102T150405Z", date) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "invalid X-Amz-Date format")) + return + } + + if time.Now().After(timestamp.Add(time.Duration(expires) * time.Second)) { + c.AbortWithStatusJSON(http.StatusUnauthorized, Failed(errorcode.Unauthorized, "request expired")) + return + } + + signer := v4.NewSigner() + uri, _, err := signer.PresignHTTP(context.TODO(), a.cred, verifyReq, "", AuthService, AuthRegion, timestamp) + if err != nil { + c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.OperationFailed, "sign request failed")) + return + } + + verifySig := getSignatureFromAWSQuery(uri) + if !strings.EqualFold(verifySig, signature) { + c.AbortWithStatusJSON(http.StatusOK, Failed(errorcode.Unauthorized, "signature mismatch")) + return + } + + c.Next() +} + // 解析 Authorization 头部 func parseAuthorizationHeader(authorizationHeader string) (string, []string, string, error) { if !strings.HasPrefix(authorizationHeader, "AWS4-HMAC-SHA256 ") { @@ -186,7 +250,7 @@ func parseAuthorizationHeader(authorizationHeader string) (string, []string, str return credential, headers, signature, nil } -func (a *AWSAuth) getSignature(req *http.Request) string { +func getSignatureFromAWSHeader(req *http.Request) string { auth := req.Header.Get(AuthorizationHeader) idx := strings.Index(auth, "Signature=") if idx == -1 { @@ -195,3 +259,17 @@ func (a *AWSAuth) getSignature(req *http.Request) string { return auth[idx+len("Signature="):] } + +func getSignatureFromAWSQuery(uri string) string { + idx := strings.Index(uri, "X-Amz-Signature=") + if idx == -1 { + return "" + } + + andIdx := strings.Index(uri[idx:], "&") + if andIdx == -1 { + return uri[idx+len("X-Amz-Signature="):] + } + + return uri[idx+len("X-Amz-Signature=") : andIdx] +} diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 0fc8593..4e44eac 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -109,7 +109,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { } path = filepath.ToSlash(path) - err = up.Upload(path, file.Size, f) + err = up.Upload(path, f) if err != nil { log.Warnf("uploading file: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("uploading file %v: %v", file.Filename, err))) diff --git a/client/internal/http/presigned.go b/client/internal/http/presigned.go new file mode 100644 index 0000000..27cf389 --- /dev/null +++ b/client/internal/http/presigned.go @@ -0,0 +1,180 @@ +package http + +import ( + "fmt" + "io" + "net/http" + "net/url" + "path" + "path/filepath" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage/client/internal/config" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" +) + +type PresignedService struct { + *Server +} + +func (s *Server) Presigned() *PresignedService { + return &PresignedService{ + Server: s, + } +} + +func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) { + log := logger.WithField("HTTP", "Presigned.ObjectDownloadByPath") + + var req cdsapi.PresignedObjectDownloadByPath + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + _, obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false, false) + if err != nil { + log.Warnf("getting object by path: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) + return + } + + if len(obj) == 0 { + log.Warnf("object not found: %s", req.Path) + ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) + return + } + + off := req.Offset + len := int64(-1) + if req.Length != nil { + len = *req.Length + } + + file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{ + ObjectID: obj[0].ObjectID, + Offset: off, + Length: len, + }) + if err != nil { + log.Warnf("downloading object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) + return + } + defer file.File.Close() + + ctx.Header("Content-Disposition", "attachment; filename="+url.PathEscape(path.Base(file.Object.Path))) + ctx.Header("Content-Type", "application/octet-stream") + ctx.Header("Content-Transfer-Encoding", "binary") + + n, err := io.Copy(ctx.Writer, file.File) + if err != nil { + log.Warnf("copying file: %s", err.Error()) + } + + if config.Cfg().StorageID > 0 { + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1)) + } +} + +func (s *PresignedService) ObjectUpload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Presigned.ObjectUpload") + + var req cdsapi.PresignedObjectUpload + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + up, err := s.svc.Uploader.BeginUpdate(req.UserID, req.PackageID, req.Affinity, req.LoadTo, req.LoadToPath) + if err != nil { + log.Warnf("begin update: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("begin update: %v", err))) + return + } + defer up.Abort() + + path := filepath.ToSlash(req.Path) + + err = up.Upload(path, ctx.Request.Body) + if err != nil { + log.Warnf("uploading file: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("uploading file %v: %v", req.Path, err))) + return + } + + ret, err := up.Commit() + if err != nil { + log.Warnf("commit update: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("commit update: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.PresignedObjectUploadResp{Object: ret.Objects[path]})) +} + +func (s *PresignedService) ObjectNewMultipartUpload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Presigned.ObjectNewMultipartUpload") + + var req cdsapi.PresignedObjectNewMultipartUpload + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + obj, err := s.svc.ObjectSvc().NewMultipartUploadObject(req.UserID, req.PackageID, req.Path) + if err != nil { + log.Warnf("new multipart upload: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("new multipart upload: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.PresignedObjectUploadResp{Object: obj})) +} + +func (s *PresignedService) ObjectUploadPart(ctx *gin.Context) { + log := logger.WithField("HTTP", "Presigned.ObjectUploadPart") + + var req cdsapi.PresignedObjectUploadPart + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + err := s.svc.Uploader.UploadPart(req.UserID, req.ObjectID, req.Index, ctx.Request.Body) + if err != nil { + log.Warnf("uploading part: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("upload part: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUploadPartResp{})) +} + +func (s *PresignedService) ObjectCompleteMultipartUpload(ctx *gin.Context) { + log := logger.WithField("HTTP", "Presigned.ObjectCompleteMultipartUpload") + + var req cdsapi.PresignedObjectCompleteMultipartUpload + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + obj, err := s.svc.ObjectSvc().CompleteMultipartUpload(req.UserID, req.ObjectID, req.Indexes) + if err != nil { + log.Warnf("completing multipart upload: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("complete multipart upload: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectCompleteMultipartUploadResp{Object: obj})) +} diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 4597dca..123e66a 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -123,4 +123,11 @@ func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) { rt.POST(cdsapi.ObjectNewMultipartUploadPath, s.Object().NewMultipartUpload) rt.POST(cdsapi.ObjectUploadPartPath, s.Object().UploadPart) rt.POST(cdsapi.ObjectCompleteMultipartUploadPath, s.Object().CompleteMultipartUpload) + + rt.GET(cdsapi.PresignedObjectDownloadByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownloadByPath) + rt.POST(cdsapi.PresignedObjectUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUpload) + + rt.POST(cdsapi.PresignedObjectNewMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectNewMultipartUpload) + rt.POST(cdsapi.PresignedObjectUploadPartPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUploadPart) + rt.POST(cdsapi.PresignedObjectCompleteMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectCompleteMultipartUpload) } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 3514876..4000c6e 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -42,13 +42,13 @@ type UpdateResult struct { Objects map[string]cdssdk.Object } -func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error { +func (w *UpdateUploader) Upload(pat string, stream io.Reader) error { uploadTime := time.Now() ft := ioswitch2.NewFromTo() fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) ft.AddFrom(fromExec). - AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "fileHash")) + AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "shardInfo")) for i, stg := range w.loadToStgs { ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat))) @@ -73,10 +73,11 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error defer w.lock.Unlock() // 记录上传结果 + shardInfo := ret["shardInfo"].(*ops2.ShardInfoValue) w.successes = append(w.successes, coormq.AddObjectEntry{ Path: pat, - Size: size, - FileHash: ret["fileHash"].(*ops2.ShardInfoValue).Hash, + Size: shardInfo.Size, + FileHash: shardInfo.Hash, UploadTime: uploadTime, StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID}, })