diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go index 1fb319d..4cab67f 100644 --- a/client/internal/http/bucket.go +++ b/client/internal/http/bucket.go @@ -32,7 +32,7 @@ func (s *BucketService) GetByName(ctx *gin.Context) { bucket, err := s.svc.BucketSvc().GetBucketByName(req.UserID, req.Name) if err != nil { log.Warnf("getting bucket by name: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket by name failed")) + ctx.JSON(http.StatusOK, FailedError(err)) return } @@ -54,7 +54,7 @@ func (s *BucketService) Create(ctx *gin.Context) { bucket, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.Name) if err != nil { log.Warnf("creating bucket: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create bucket failed")) + ctx.JSON(http.StatusOK, FailedError(err)) return } diff --git a/client/internal/http/http.go b/client/internal/http/http.go index 20b1394..3df07b6 100644 --- a/client/internal/http/http.go +++ b/client/internal/http/http.go @@ -1,6 +1,9 @@ package http -import "gitlink.org.cn/cloudream/common/consts/errorcode" +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/mq" +) type Response struct { Code string `json:"code"` @@ -22,3 +25,11 @@ func Failed(code string, msg string) Response { Message: msg, } } + +func FailedError(err error) Response { + if codeErr, ok := err.(*mq.CodeMessageError); ok { + return Failed(codeErr.Code, codeErr.Message) + } + + return Failed(errorcode.OperationFailed, err.Error()) +} diff --git a/client/internal/http/object.go b/client/internal/http/object.go index c36a48e..c535d5c 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -38,7 +38,7 @@ func (s *ObjectService) List(ctx *gin.Context) { return } - objs, err := s.svc.ObjectSvc().List(req.UserID, req.PackageID, req.PathPrefix) + objs, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, req.IsPrefix) if err != nil { log.Warnf("listing objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("listing objects: %v", err))) @@ -155,6 +155,61 @@ func (s *ObjectService) Download(ctx *gin.Context) { } } +func (s *ObjectService) DownloadByPath(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.DownloadByPath") + + var req cdsapi.ObjectDownloadByPath + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false) + if err != nil { + log.Warnf("getting object by path: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) + return + } + + if len(obj) == 0 { + log.Warnf("object not found: %s", req.Path) + ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) + return + } + + off := req.Offset + len := int64(-1) + if req.Length != nil { + len = *req.Length + } + + file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{ + ObjectID: obj[0].ObjectID, + Offset: off, + Length: len, + }) + if err != nil { + log.Warnf("downloading object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) + return + } + defer file.File.Close() + + ctx.Header("Content-Disposition", "attachment; filename="+url.PathEscape(path.Base(file.Object.Path))) + ctx.Header("Content-Type", "application/octet-stream") + ctx.Header("Content-Transfer-Encoding", "binary") + + n, err := io.Copy(ctx.Writer, file.File) + if err != nil { + log.Warnf("copying file: %s", err.Error()) + } + + if config.Cfg().StorageID > 0 { + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, float64(n)/float64(file.Object.Size)) + } +} + func (s *ObjectService) UpdateInfo(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.UpdateInfo") @@ -175,6 +230,43 @@ func (s *ObjectService) UpdateInfo(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUpdateInfoResp{Successes: sucs})) } +func (s *ObjectService) UpdateInfoByPath(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.UpdateInfoByPath") + + var req cdsapi.ObjectUpdateInfoByPath + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, true) + if err != nil { + log.Warnf("getting object by path: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) + return + } + if len(obj) == 0 { + log.Warnf("object not found: %s", req.Path) + ctx.JSON(http.StatusOK, Failed(errorcode.DataNotFound, "object not found")) + return + } + + sucs, err := s.svc.ObjectSvc().UpdateInfo(req.UserID, []cdsapi.UpdatingObject{{ + ObjectID: obj[0].ObjectID, + UpdateTime: req.UpdateTime, + }}) + if err != nil { + log.Warnf("updating objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "update objects failed")) + return + } + if len(sucs) == 0 { + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectUpdateInfoByPathResp{})) +} + func (s *ObjectService) Move(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Move") @@ -215,6 +307,37 @@ func (s *ObjectService) Delete(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } +func (s *ObjectService) DeleteByPath(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.DeleteByPath") + + var req cdsapi.ObjectDeleteByPath + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + obj, err := s.svc.ObjectSvc().GetByPath(req.UserID, req.PackageID, req.Path, false) + if err != nil { + log.Warnf("getting object by path: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get object by path failed")) + return + } + if len(obj) == 0 { + ctx.JSON(http.StatusOK, OK(nil)) + return + } + + err = s.svc.ObjectSvc().Delete(req.UserID, []cdssdk.ObjectID{obj[0].ObjectID}) + if err != nil { + log.Warnf("deleting objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) +} + func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.GetPackageObjects") diff --git a/client/internal/http/package.go b/client/internal/http/package.go index c196e8a..5b9aaf0 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -39,7 +39,7 @@ func (s *PackageService) Get(ctx *gin.Context) { pkg, err := s.svc.PackageSvc().Get(req.UserID, req.PackageID) if err != nil { log.Warnf("getting package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed")) + ctx.JSON(http.StatusOK, FailedError(err)) return } @@ -59,7 +59,7 @@ func (s *PackageService) GetByName(ctx *gin.Context) { pkg, err := s.svc.PackageSvc().GetByName(req.UserID, req.BucketName, req.PackageName) if err != nil { log.Warnf("getting package by name: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package by name failed")) + ctx.JSON(http.StatusOK, FailedError(err)) return } @@ -79,7 +79,7 @@ func (s *PackageService) Create(ctx *gin.Context) { pkg, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) if err != nil { log.Warnf("creating package: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create package failed")) + ctx.JSON(http.StatusOK, FailedError(err)) return } diff --git a/client/internal/http/server.go b/client/internal/http/server.go index c1e70d1..de73e52 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -45,11 +45,14 @@ func (s *Server) initRouters() { rt.GET(cdsapi.ObjectListPath, s.Object().List) rt.GET(cdsapi.ObjectDownloadPath, s.Object().Download) + rt.GET(cdsapi.ObjectDownloadByPathPath, s.Object().DownloadByPath) rt.POST(cdsapi.ObjectUploadPath, s.Object().Upload) rt.GET(cdsapi.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) rt.POST(cdsapi.ObjectUpdateInfoPath, s.Object().UpdateInfo) + rt.POST(cdsapi.ObjectUpdateInfoByPathPath, s.Object().UpdateInfoByPath) rt.POST(cdsapi.ObjectMovePath, s.Object().Move) rt.POST(cdsapi.ObjectDeletePath, s.Object().Delete) + rt.POST(cdsapi.ObjectDeleteByPathPath, s.Object().DeleteByPath) rt.GET(cdsapi.PackageGetPath, s.Package().Get) rt.GET(cdsapi.PackageGetByNamePath, s.Package().GetByName) diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index fef9223..5a8e694 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -5,7 +5,6 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -23,27 +22,27 @@ func (svc *Service) BucketSvc() *BucketService { // userID: 用户的唯一标识 // bucketID: 桶的唯一标识 // 返回值: 桶的信息和可能发生的错误 -func (svc *BucketService) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) { +func (svc *BucketService) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (cdssdk.Bucket, error) { // TODO: 此函数尚未实现 panic("not implement yet") } -func (svc *BucketService) GetBucketByName(userID cdssdk.UserID, bucketName string) (model.Bucket, error) { +func (svc *BucketService) GetBucketByName(userID cdssdk.UserID, bucketName string) (cdssdk.Bucket, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return model.Bucket{}, fmt.Errorf("new coordinator client: %w", err) + return cdssdk.Bucket{}, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.GetBucketByName(coormq.ReqGetBucketByName(userID, bucketName)) if err != nil { - return model.Bucket{}, fmt.Errorf("get bucket by name failed, err: %w", err) + return cdssdk.Bucket{}, err } return resp.Bucket, nil } -func (svc *BucketService) GetUserBuckets(userID cdssdk.UserID) ([]model.Bucket, error) { +func (svc *BucketService) GetUserBuckets(userID cdssdk.UserID) ([]cdssdk.Bucket, error) { // 从CoordinatorMQPool中获取Coordinator客户端 coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -64,7 +63,7 @@ func (svc *BucketService) GetUserBuckets(userID cdssdk.UserID) ([]model.Bucket, // userID: 用户的唯一标识 // bucketID: 桶的唯一标识 // 返回值: 桶的所有包列表和可能发生的错误 -func (svc *BucketService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]model.Package, error) { +func (svc *BucketService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]cdssdk.Package, error) { // 获取Coordinator客户端 coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -91,7 +90,7 @@ func (svc *BucketService) CreateBucket(userID cdssdk.UserID, bucketName string) // 请求Coordinator创建新桶 resp, err := coorCli.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) if err != nil { - return cdssdk.Bucket{}, fmt.Errorf("creating bucket: %w", err) + return cdssdk.Bucket{}, err } return resp.Bucket, nil diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 051ede7..d5f2208 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -22,14 +22,14 @@ func (svc *Service) ObjectSvc() *ObjectService { return &ObjectService{Service: svc} } -func (svc *ObjectService) List(userID cdssdk.UserID, pkgID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { +func (svc *ObjectService) GetByPath(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, isPrefix bool) ([]cdssdk.Object, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) - listResp, err := coorCli.GetObjectsWithPrefix(coormq.ReqGetObjectsWithPrefix(userID, pkgID, pathPrefix)) + listResp, err := coorCli.GetObjectsByPath(coormq.ReqGetObjectsByPath(userID, pkgID, path, isPrefix)) if err != nil { return nil, fmt.Errorf("requsting to coodinator: %w", err) } diff --git a/client/internal/services/package.go b/client/internal/services/package.go index d1fdf5a..4991565 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -30,7 +30,7 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) // 向协调器请求获取包信息 getResp, err := coorCli.GetPackage(coormq.NewGetPackage(userID, packageID)) if err != nil { - return nil, fmt.Errorf("requsting to coodinator: %w", err) + return nil, err } return &getResp.Package, nil @@ -77,7 +77,7 @@ func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID // 向协调器发送创建包的请求 resp, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bucketID, name)) if err != nil { - return cdssdk.Package{}, fmt.Errorf("creating package: %w", err) + return cdssdk.Package{}, err } return resp.Package, nil diff --git a/common/pkgs/db2/bucket.go b/common/pkgs/db2/bucket.go index cd5f691..d3ea7d5 100644 --- a/common/pkgs/db2/bucket.go +++ b/common/pkgs/db2/bucket.go @@ -96,7 +96,7 @@ func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName stri } if bucketID > 0 { - return 0, fmt.Errorf("bucket name exists") + return 0, gorm.ErrDuplicatedKey } newBucket := cdssdk.Bucket{Name: bucketName, CreatorID: userID} diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 96d2042..7a7f02c 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -26,6 +26,12 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret, err } +func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) ([]cdssdk.Object, error) { + var ret []cdssdk.Object + err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Find(&ret).Error + return ret, err +} + func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { var ret []cdssdk.Object err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").Order("ObjectID ASC").Find(&ret).Error diff --git a/common/pkgs/db2/package.go b/common/pkgs/db2/package.go index ee6dec5..37a7012 100644 --- a/common/pkgs/db2/package.go +++ b/common/pkgs/db2/package.go @@ -1,7 +1,6 @@ package db2 import ( - "errors" "fmt" "gorm.io/gorm" @@ -122,7 +121,7 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin return 0, err } if packageID != 0 { - return 0, errors.New("package already exists") + return 0, gorm.ErrDuplicatedKey } newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal} diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 2c224df..a792d13 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -10,7 +10,7 @@ import ( ) type ObjectService interface { - GetObjectsWithPrefix(msg *GetObjectsWithPrefix) (*GetObjectsWithPrefixResp, *mq.CodeMessage) + GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, *mq.CodeMessage) GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) @@ -32,33 +32,35 @@ type ObjectService interface { } // 查询指定前缀的Object,返回的Objects会按照ObjectID升序 -var _ = Register(Service.GetObjectsWithPrefix) +var _ = Register(Service.GetObjectsByPath) -type GetObjectsWithPrefix struct { +type GetObjectsByPath struct { mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` - PathPrefix string `json:"pathPrefix"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + Path string `json:"path"` + IsPrefix bool `json:"isPrefix"` } -type GetObjectsWithPrefixResp struct { +type GetObjectsByPathResp struct { mq.MessageBodyBase Objects []model.Object `json:"objects"` } -func ReqGetObjectsWithPrefix(userID cdssdk.UserID, packageID cdssdk.PackageID, pathPrefix string) *GetObjectsWithPrefix { - return &GetObjectsWithPrefix{ - UserID: userID, - PackageID: packageID, - PathPrefix: pathPrefix, +func ReqGetObjectsByPath(userID cdssdk.UserID, packageID cdssdk.PackageID, path string, isPrefix bool) *GetObjectsByPath { + return &GetObjectsByPath{ + UserID: userID, + PackageID: packageID, + Path: path, + IsPrefix: isPrefix, } } -func RespGetObjectsWithPrefix(objects []model.Object) *GetObjectsWithPrefixResp { - return &GetObjectsWithPrefixResp{ +func RespGetObjectsByPath(objects []model.Object) *GetObjectsByPathResp { + return &GetObjectsByPathResp{ Objects: objects, } } -func (client *Client) GetObjectsWithPrefix(msg *GetObjectsWithPrefix) (*GetObjectsWithPrefixResp, error) { - return mq.Request(Service.GetObjectsWithPrefix, client.rabbitCli, msg) +func (client *Client) GetObjectsByPath(msg *GetObjectsByPath) (*GetObjectsByPathResp, error) { + return mq.Request(Service.GetObjectsByPath, client.rabbitCli, msg) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 diff --git a/coordinator/internal/mq/bucket.go b/coordinator/internal/mq/bucket.go index f4bd7e0..965640d 100644 --- a/coordinator/internal/mq/bucket.go +++ b/coordinator/internal/mq/bucket.go @@ -1,9 +1,11 @@ package mq import ( + "errors" "fmt" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gorm.io/gorm" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -24,7 +26,12 @@ func (svc *Service) GetBucketByName(msg *coormq.GetBucketByName) (*coormq.GetBuc logger.WithField("UserID", msg.UserID). WithField("Name", msg.Name). Warnf("getting bucket by name: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get bucket by name failed") + + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, mq.Failed(errorcode.DataNotFound, "bucket not found") + } + + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } return mq.ReplyOK(coormq.RespGetBucketByName(bucket)) @@ -36,7 +43,7 @@ func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserB if err != nil { logger.WithField("UserID", msg.UserID). Warnf("get user buckets failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get all buckets failed") + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets)) @@ -78,7 +85,12 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket logger.WithField("UserID", msg.UserID). WithField("BucketName", msg.BucketName). Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "create bucket failed") + + if errors.Is(err, gorm.ErrDuplicatedKey) { + return nil, mq.Failed(errorcode.DataExists, "bucket name already exists") + } + + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } return mq.ReplyOK(coormq.NewCreateBucketResp(bucket)) diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index f22b563..ad02731 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -1,10 +1,11 @@ package mq import ( - "database/sql" + "errors" "fmt" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gorm.io/gorm" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -17,7 +18,7 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -func (svc *Service) GetObjectsWithPrefix(msg *coormq.GetObjectsWithPrefix) (*coormq.GetObjectsWithPrefixResp, *mq.CodeMessage) { +func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetObjectsByPathResp, *mq.CodeMessage) { var objs []cdssdk.Object err := svc.db2.DoTx(func(tx db2.SQLContext) error { var err error @@ -27,19 +28,26 @@ func (svc *Service) GetObjectsWithPrefix(msg *coormq.GetObjectsWithPrefix) (*coo return fmt.Errorf("getting package by id: %w", err) } - objs, err = svc.db2.Object().GetWithPathPrefix(tx, msg.PackageID, msg.PathPrefix) - if err != nil { - return fmt.Errorf("getting objects with prefix: %w", err) + if msg.IsPrefix { + objs, err = svc.db2.Object().GetWithPathPrefix(tx, msg.PackageID, msg.Path) + if err != nil { + return fmt.Errorf("getting objects with prefix: %w", err) + } + } else { + objs, err = svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) + if err != nil { + return fmt.Errorf("getting object by path: %w", err) + } } return nil }) if err != nil { - logger.WithField("PathPrefix", msg.PathPrefix).Warn(err.Error()) + logger.WithField("PathPrefix", msg.Path).Warn(err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed") } - return mq.ReplyOK(coormq.RespGetObjectsWithPrefix(objs)) + return mq.ReplyOK(coormq.RespGetObjectsByPath(objs)) } func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { @@ -335,7 +343,7 @@ func (svc *Service) checkPackageChangedObjects(tx db2.SQLContext, userID cdssdk. var willUpdateObjs []cdssdk.Object for _, pkg := range packages { _, err := svc.db2.Package().GetUserPackage(tx, userID, pkg.PackageID) - if err == sql.ErrNoRows { + if errors.Is(err, gorm.ErrRecordNotFound) { continue } if err != nil { @@ -381,7 +389,7 @@ func (svc *Service) checkPathChangedObjects(tx db2.SQLContext, userID cdssdk.Use } _, err := svc.db2.Package().GetUserPackage(tx, userID, objs[0].PackageID) - if err == sql.ErrNoRows { + if errors.Is(err, gorm.ErrRecordNotFound) { return nil, nil } if err != nil { diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index a399339..6ad4ba6 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -1,11 +1,12 @@ package mq import ( - "database/sql" + "errors" "fmt" "sort" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gorm.io/gorm" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -20,6 +21,10 @@ func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, logger.WithField("PackageID", msg.PackageID). Warnf("get package: %s", err.Error()) + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, mq.Failed(errorcode.DataNotFound, "package not found") + } + return nil, mq.Failed(errorcode.OperationFailed, "get package failed") } @@ -34,7 +39,7 @@ func (svc *Service) GetPackageByName(msg *coormq.GetPackageByName) (*coormq.GetP WithField("PackageName", msg.PackageName). Warnf("get package by name: %s", err.Error()) - if err == sql.ErrNoRows { + if errors.Is(err, gorm.ErrRecordNotFound) { return nil, mq.Failed(errorcode.DataNotFound, "package not found") } @@ -70,6 +75,11 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack logger.WithField("BucketID", msg.BucketID). WithField("Name", msg.Name). Warn(err.Error()) + + if errors.Is(err, gorm.ErrDuplicatedKey) { + return nil, mq.Failed(errorcode.DataExists, "package already exists") + } + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index f903e5d..61ad71c 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -1,7 +1,7 @@ package mq import ( - "database/sql" + "errors" "fmt" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -92,7 +92,7 @@ func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetS if err != nil { logger.Warnf("getting user storage by name: %s", err.Error()) - if err == sql.ErrNoRows { + if errors.Is(err, gorm.ErrRecordNotFound) { return nil, mq.Failed(errorcode.DataNotFound, "storage not found") }