diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 9050a5d..1ce6104 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -2,6 +2,7 @@ package task import ( "fmt" + "path/filepath" "time" "github.com/samber/lo" @@ -109,9 +110,10 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c }) return } + path := filepath.ToSlash(obj.Path) // 上传对象 - err = up.Upload(obj.Path, obj.Size, obj.File) + err = up.Upload(path, obj.Size, obj.File) if err != nil { err = fmt.Errorf("uploading object: %w", err) log.Error(err.Error()) diff --git a/client/internal/http/object.go b/client/internal/http/object.go index bded3c6..c36a48e 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -7,6 +7,7 @@ import ( "net/http" "net/url" "path" + "path/filepath" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -27,6 +28,26 @@ func (s *Server) Object() *ObjectService { } } +func (s *ObjectService) List(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.List") + + var req cdsapi.ObjectList + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + objs, err := s.svc.ObjectSvc().List(req.UserID, req.PackageID, req.PathPrefix) + if err != nil { + log.Warnf("listing objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("listing objects: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectListResp{Objects: objs})) +} + type ObjectUploadReq struct { Info cdsapi.ObjectUploadInfo `form:"info" binding:"required"` Files []*multipart.FileHeader `form:"files"` @@ -65,6 +86,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("unescape filename %v: %v", file.Filename, err))) return } + path = filepath.ToSlash(path) err = up.Upload(path, file.Size, f) if err != nil { diff --git a/client/internal/http/package.go b/client/internal/http/package.go index 306d40b..c196e8a 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -5,6 +5,7 @@ import ( "mime/multipart" "net/http" "net/url" + "path/filepath" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -125,6 +126,7 @@ func (s *PackageService) CreateLoad(ctx *gin.Context) { ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("unescape filename %v: %v", file.Filename, err))) return } + path = filepath.ToSlash(path) err = up.Upload(path, file.Size, f) if err != nil { diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 40d61d1..c1e70d1 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -43,6 +43,7 @@ func (s *Server) initRouters() { // initTemp(rt, s) + rt.GET(cdsapi.ObjectListPath, s.Object().List) rt.GET(cdsapi.ObjectDownloadPath, s.Object().Download) rt.POST(cdsapi.ObjectUploadPath, s.Object().Upload) rt.GET(cdsapi.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 4dc05ec..051ede7 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -22,6 +22,21 @@ func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } +func (svc *ObjectService) List(userID cdssdk.UserID, pkgID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + listResp, err := coorCli.GetObjectsWithPrefix(coormq.ReqGetObjectsWithPrefix(userID, pkgID, pathPrefix)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return listResp.Objects, nil +} + func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdsapi.UpdatingObject) ([]cdssdk.ObjectID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -94,7 +109,7 @@ func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdss } defer stgglb.CoordinatorMQPool.Release(coorCli) // 释放协调器客户端资源 - getResp, err := coorCli.GetPackageObjects(coormq.NewGetPackageObjects(userID, packageID)) // 请求协调器获取套餐对象 + getResp, err := coorCli.GetPackageObjects(coormq.ReqGetPackageObjects(userID, packageID)) // 请求协调器获取套餐对象 if err != nil { return nil, fmt.Errorf("requesting to coordinator: %w", err) } diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index a8bb0bd..96d2042 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -26,6 +26,12 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret, err } +func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { + var ret []cdssdk.Object + err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").Order("ObjectID ASC").Find(&ret).Error + return ret, err +} + func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { if len(objectIDs) == 0 { return make(map[cdssdk.ObjectID]bool), nil diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 102a382..2c224df 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -10,6 +10,8 @@ import ( ) type ObjectService interface { + GetObjectsWithPrefix(msg *GetObjectsWithPrefix) (*GetObjectsWithPrefixResp, *mq.CodeMessage) + GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) @@ -29,6 +31,36 @@ type ObjectService interface { AddAccessStat(msg *AddAccessStat) } +// 查询指定前缀的Object,返回的Objects会按照ObjectID升序 +var _ = Register(Service.GetObjectsWithPrefix) + +type GetObjectsWithPrefix struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + PathPrefix string `json:"pathPrefix"` +} +type GetObjectsWithPrefixResp struct { + mq.MessageBodyBase + Objects []model.Object `json:"objects"` +} + +func ReqGetObjectsWithPrefix(userID cdssdk.UserID, packageID cdssdk.PackageID, pathPrefix string) *GetObjectsWithPrefix { + return &GetObjectsWithPrefix{ + UserID: userID, + PackageID: packageID, + PathPrefix: pathPrefix, + } +} +func RespGetObjectsWithPrefix(objects []model.Object) *GetObjectsWithPrefixResp { + return &GetObjectsWithPrefixResp{ + Objects: objects, + } +} +func (client *Client) GetObjectsWithPrefix(msg *GetObjectsWithPrefix) (*GetObjectsWithPrefixResp, error) { + return mq.Request(Service.GetObjectsWithPrefix, client.rabbitCli, msg) +} + // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 var _ = Register(Service.GetPackageObjects) @@ -42,13 +74,13 @@ type GetPackageObjectsResp struct { Objects []model.Object `json:"objects"` } -func NewGetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageObjects { +func ReqGetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageObjects { return &GetPackageObjects{ UserID: userID, PackageID: packageID, } } -func NewGetPackageObjectsResp(objects []model.Object) *GetPackageObjectsResp { +func RespGetPackageObjects(objects []model.Object) *GetPackageObjectsResp { return &GetPackageObjectsResp{ Objects: objects, } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index d694e13..38aa271 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -104,6 +104,10 @@ func (s *ShardStore) removeUnusedTempFiles() { marker = resp.NextMarker } + if len(deletes) == 0 { + return + } + resp, err := s.cli.DeleteObjects(context.Background(), &s3.DeleteObjectsInput{ Bucket: aws.String(s.bucket), Delete: &s3types.Delete{ diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index af71966..f22b563 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -17,6 +17,31 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) +func (svc *Service) GetObjectsWithPrefix(msg *coormq.GetObjectsWithPrefix) (*coormq.GetObjectsWithPrefixResp, *mq.CodeMessage) { + var objs []cdssdk.Object + err := svc.db2.DoTx(func(tx db2.SQLContext) error { + var err error + + _, err = svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID) + if err != nil { + return fmt.Errorf("getting package by id: %w", err) + } + + objs, err = svc.db2.Object().GetWithPathPrefix(tx, msg.PackageID, msg.PathPrefix) + if err != nil { + return fmt.Errorf("getting objects with prefix: %w", err) + } + + return nil + }) + if err != nil { + logger.WithField("PathPrefix", msg.PathPrefix).Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed") + } + + return mq.ReplyOK(coormq.RespGetObjectsWithPrefix(objs)) +} + func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { var objs []cdssdk.Object err := svc.db2.DoTx(func(tx db2.SQLContext) error { @@ -39,7 +64,7 @@ func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.Ge return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") } - return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs)) + return mq.ReplyOK(coormq.RespGetPackageObjects(objs)) } func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) {