From 24856fd23fdb9205b79b00c480caebcc32a133e2 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 28 Mar 2024 09:35:52 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=9B=B4=E6=96=B0Object?= =?UTF-8?q?=E5=85=83=E6=95=B0=E6=8D=AE=E3=80=81=E8=8E=B7=E5=8F=96Package?= =?UTF-8?q?=E5=88=97=E8=A1=A8=E7=AD=89=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/bucket.go | 22 +++ client/internal/http/object.go | 57 +++++-- client/internal/http/package.go | 79 ++++----- client/internal/http/server.go | 10 +- client/internal/services/bucket.go | 2 - client/internal/services/object.go | 30 ++++ client/internal/services/package.go | 15 ++ common/pkgs/db/object.go | 52 ++++-- common/pkgs/mq/coordinator/object.go | 82 +++++++-- coordinator/internal/mq/object.go | 157 +++++++++++++++++- .../event/check_package_redundancy.go | 44 ++--- scanner/internal/event/clean_pinned.go | 26 +-- 12 files changed, 449 insertions(+), 127 deletions(-) diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go index 7e749b0..ed944f2 100644 --- a/client/internal/http/bucket.go +++ b/client/internal/http/bucket.go @@ -59,3 +59,25 @@ func (s *BucketService) Delete(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } + +func (s *BucketService) ListUserBuckets(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.ListUserBuckets") + + var req cdssdk.BucketListUserBucketsReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + buckets, err := s.svc.BucketSvc().GetUserBuckets(req.UserID) + if err != nil { + log.Warnf("getting user buckets: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get user buckets failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.BucketListUserBucketsResp{ + Buckets: buckets, + })) +} diff --git a/client/internal/http/object.go b/client/internal/http/object.go index d7dba18..fb2c1f1 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -71,22 +71,17 @@ func (s *ObjectService) Upload(ctx *gin.Context) { } } -type ObjectDownloadReq struct { - UserID *cdssdk.UserID `form:"userID" binding:"required"` - ObjectID *cdssdk.ObjectID `form:"objectID" binding:"required"` -} - func (s *ObjectService) Download(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Download") - var req ObjectDownloadReq + var req cdssdk.ObjectDownloadReq if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - file, err := s.svc.ObjectSvc().Download(*req.UserID, *req.ObjectID) + file, err := s.svc.ObjectSvc().Download(req.UserID, req.ObjectID) if err != nil { log.Warnf("downloading object: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) @@ -124,28 +119,62 @@ func (s *ObjectService) Download(ctx *gin.Context) { }) } -type GetPackageObjectsReq struct { - UserID *cdssdk.UserID `form:"userID" binding:"required"` - PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` +func (s *ObjectService) UpdateInfo(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.UpdateInfo") + + var req cdssdk.ObjectUpdateInfoReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings) + if err != nil { + log.Warnf("updating objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "update objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) +} + +func (s *ObjectService) Delete(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Delete") + + var req cdssdk.ObjectDeleteReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + err := s.svc.ObjectSvc().Delete(req.UserID, req.ObjectIDs) + if err != nil { + log.Warnf("deleting objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) } -type GetPackageObjectsResp = cdssdk.ObjectGetPackageObjectsResp func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.GetPackageObjects") - var req GetPackageObjectsReq + var req cdssdk.ObjectGetPackageObjectsReq if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - objs, err := s.svc.ObjectSvc().GetPackageObjects(*req.UserID, *req.PackageID) + objs, err := s.svc.ObjectSvc().GetPackageObjects(req.UserID, req.PackageID) if err != nil { log.Warnf("getting package objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object failed")) return } - ctx.JSON(http.StatusOK, OK(GetPackageObjectsResp{Objects: objs})) + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectGetPackageObjectsResp{Objects: objs})) } diff --git a/client/internal/http/package.go b/client/internal/http/package.go index cdaf3e0..c18559d 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" stgiter "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -24,32 +23,24 @@ func (s *Server) Package() *PackageService { } } -type PackageGetReq struct { - UserID *cdssdk.UserID `form:"userID" binding:"required"` - PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` -} -type PackageGetResp struct { - model.Package -} - func (s *PackageService) Get(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Get") - var req PackageGetReq + var req cdssdk.PackageGetReq if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - pkg, err := s.svc.PackageSvc().Get(*req.UserID, *req.PackageID) + pkg, err := s.svc.PackageSvc().Get(req.UserID, req.PackageID) if err != nil { log.Warnf("getting package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package failed")) return } - ctx.JSON(http.StatusOK, OK(PackageGetResp{Package: *pkg})) + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetResp{Package: *pkg})) } func (s *PackageService) Create(ctx *gin.Context) { @@ -73,22 +64,17 @@ func (s *PackageService) Create(ctx *gin.Context) { })) } -type PackageDeleteReq struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` -} - func (s *PackageService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Delete") - var req PackageDeleteReq + var req cdssdk.PackageDeleteReq if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - err := s.svc.PackageSvc().DeletePackage(*req.UserID, *req.PackageID) + err := s.svc.PackageSvc().DeletePackage(req.UserID, req.PackageID) if err != nil { log.Warnf("deleting package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "delete package failed")) @@ -98,61 +84,66 @@ func (s *PackageService) Delete(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } -type GetCachedNodesReq struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` -} -type GetCachedNodesResp struct { - cdssdk.PackageCachingInfo +func (s *PackageService) ListBucketPackages(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.ListBucketPackages") + + var req cdssdk.PackageListBucketPackagesReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + pkgs, err := s.svc.PackageSvc().GetBucketPackages(req.UserID, req.BucketID) + if err != nil { + log.Warnf("getting bucket packages: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket packages failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.PackageListBucketPackagesResp{ + Packages: pkgs, + })) } func (s *PackageService) GetCachedNodes(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.GetCachedNodes") - var req GetCachedNodesReq - if err := ctx.ShouldBindJSON(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) + var req cdssdk.PackageGetCachedNodesReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - resp, err := s.svc.PackageSvc().GetCachedNodes(*req.UserID, *req.PackageID) + resp, err := s.svc.PackageSvc().GetCachedNodes(req.UserID, req.PackageID) if err != nil { log.Warnf("get package cached nodes failed: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package cached nodes failed")) return } - ctx.JSON(http.StatusOK, OK(GetCachedNodesResp{resp})) -} - -type GetLoadedNodesReq struct { - UserID *cdssdk.UserID `json:"userID" binding:"required"` - PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` -} - -type GetLoadedNodesResp struct { - NodeIDs []cdssdk.NodeID `json:"nodeIDs"` + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetCachedNodesResp{PackageCachingInfo: resp})) } func (s *PackageService) GetLoadedNodes(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.GetLoadedNodes") - var req GetLoadedNodesReq - if err := ctx.ShouldBindJSON(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) + var req cdssdk.PackageGetLoadedNodesReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(*req.UserID, *req.PackageID) + nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(req.UserID, req.PackageID) if err != nil { log.Warnf("get package loaded nodes failed: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package loaded nodes failed")) return } - ctx.JSON(http.StatusOK, OK(GetLoadedNodesResp{ + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetLoadedNodesResp{ NodeIDs: nodeIDs, })) } diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 462f76a..ce4e276 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -42,12 +42,15 @@ func (s *Server) initRouters() { s.engine.GET(cdssdk.ObjectDownloadPath, s.Object().Download) s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) + s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo) + s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete) s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) - s.engine.POST("/package/delete", s.Package().Delete) - s.engine.GET("/package/getCachedNodes", s.Package().GetCachedNodes) - s.engine.GET("/package/getLoadedNodes", s.Package().GetLoadedNodes) + s.engine.POST(cdssdk.PackageDeletePath, s.Package().Delete) + s.engine.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages) + s.engine.GET(cdssdk.PackageGetCachedNodesPath, s.Package().GetCachedNodes) + s.engine.GET(cdssdk.PackageGetLoadedNodesPath, s.Package().GetLoadedNodes) s.engine.POST("/storage/loadPackage", s.Storage().LoadPackage) s.engine.POST("/storage/createPackage", s.Storage().CreatePackage) @@ -57,4 +60,5 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) + s.engine.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) } diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index 4e297ce..d402b54 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -74,8 +74,6 @@ func (svc *BucketService) DeleteBucket(userID cdssdk.UserID, bucketID cdssdk.Buc } defer stgglb.CoordinatorMQPool.Release(coorCli) - // TODO 检查用户是否有删除这个Bucket的权限。检查的时候可以只上UserBucket的Read锁 - _, err = coorCli.DeleteBucket(coormq.NewDeleteBucket(userID, bucketID)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 0dc7374..f4f24ab 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -35,10 +35,40 @@ func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration return false, nil, nil } +func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + _, err = coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings)) + if err != nil { + return fmt.Errorf("requsting to coodinator: %w", err) + } + + return nil +} + func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { panic("not implement yet!") } +func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + _, err = coorCli.DeleteObjects(coormq.ReqDeleteObjects(userID, objectIDs)) + if err != nil { + return fmt.Errorf("requsting to coodinator: %w", err) + } + + return nil +} + func (svc *ObjectService) GetPackageObjects(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]model.Object, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 9667356..405d53f 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -34,6 +34,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) return &getResp.Package, nil } +func (svc *PackageService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]cdssdk.Package, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetBucketPackages(coormq.NewGetBucketPackages(userID, bucketID)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return getResp.Packages, nil +} + func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 3655c98..ec9a669 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -26,25 +26,46 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret.ToObject(), err } -func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) { +func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]model.Object, error) { + if len(objectIDs) == 0 { + return nil, nil + } + + // TODO In语句 + stmt, args, err := sqlx.In("select * from Object where ObjectID in (?) order by ObjectID asc", objectIDs) + if err != nil { + return nil, err + } + stmt = ctx.Rebind(stmt) + + objs := make([]model.TempObject, 0, len(objectIDs)) + err = sqlx.Select(ctx, &objs, stmt, args...) + if err != nil { + return nil, err + } + + return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil +} + +func (db *ObjectDB) BatchByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { if len(pathes) == 0 { return nil, nil } // TODO In语句 - stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) + stmt, args, err := sqlx.In("select * from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) if err != nil { return nil, err } stmt = ctx.Rebind(stmt) - objIDs := make([]cdssdk.ObjectID, 0, len(pathes)) - err = sqlx.Select(ctx, &objIDs, stmt, args...) + objs := make([]model.TempObject, 0, len(pathes)) + err = sqlx.Select(ctx, &objs, stmt, args...) if err != nil { return nil, err } - return objIDs, nil + return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil } func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { @@ -63,8 +84,8 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, return cdssdk.ObjectID(objectID), nil } -// 可以用于批量创建或者更新记录 -// 用于创建时,需要额外检查PackageID+Path的唯一性 +// 可以用于批量创建或者更新记录。 +// 用于创建时,需要额外检查PackageID+Path的唯一性。 // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { if len(objs) == 0 { @@ -163,17 +184,22 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] for _, add := range adds { pathes = append(pathes, add.Path) } - objIDs, err := db.BatchGetPackageObjectIDs(ctx, packageID, pathes) + // 这里可以不用检查查询结果是否与pathes的数量相同 + addedObjs, err := db.BatchByPackagePath(ctx, packageID, pathes) if err != nil { return nil, fmt.Errorf("batch get object ids: %w", err) } + addedObjIDs := make([]cdssdk.ObjectID, len(addedObjs)) + for i := range addedObjs { + addedObjIDs[i] = addedObjs[i].ObjectID + } - err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) + err = db.ObjectBlock().BatchDeleteByObjectID(ctx, addedObjIDs) if err != nil { return nil, fmt.Errorf("batch delete object blocks: %w", err) } - err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) + err = db.PinnedObject().BatchDeleteByObjectID(ctx, addedObjIDs) if err != nil { return nil, fmt.Errorf("batch delete pinned objects: %w", err) } @@ -181,7 +207,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] objBlocks := make([]stgmod.ObjectBlock, 0, len(adds)) for i, add := range adds { objBlocks = append(objBlocks, stgmod.ObjectBlock{ - ObjectID: objIDs[i], + ObjectID: addedObjIDs[i], Index: 0, NodeID: add.NodeID, FileHash: add.FileHash, @@ -206,10 +232,10 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] return nil, fmt.Errorf("batch create caches: %w", err) } - return objIDs, nil + return addedObjIDs, nil } -func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { +func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error { if len(objs) == 0 { return nil } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 45b5b24..5ec380d 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -13,7 +13,11 @@ type ObjectService interface { GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) - ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, *mq.CodeMessage) + UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, *mq.CodeMessage) + + UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage) + + DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 @@ -71,30 +75,82 @@ func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*Ge } // 更新Object的冗余方式 -var _ = Register(Service.ChangeObjectRedundancy) +var _ = Register(Service.UpdateObjectRedundancy) -type ChangeObjectRedundancy struct { +type UpdateObjectRedundancy struct { mq.MessageBodyBase - Entries []ChangeObjectRedundancyEntry `json:"entries"` + Updatings []UpdatingObjectRedundancy `json:"updatings"` } -type ChangeObjectRedundancyResp struct { +type UpdateObjectRedundancyResp struct { mq.MessageBodyBase } -type ChangeObjectRedundancyEntry struct { +type UpdatingObjectRedundancy struct { ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"` Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"` PinnedAt []cdssdk.NodeID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` } -func ReqChangeObjectRedundancy(entries []ChangeObjectRedundancyEntry) *ChangeObjectRedundancy { - return &ChangeObjectRedundancy{ - Entries: entries, +func ReqUpdateObjectRedundancy(updatings []UpdatingObjectRedundancy) *UpdateObjectRedundancy { + return &UpdateObjectRedundancy{ + Updatings: updatings, + } +} +func RespUpdateObjectRedundancy() *UpdateObjectRedundancyResp { + return &UpdateObjectRedundancyResp{} +} +func (client *Client) UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, error) { + return mq.Request(Service.UpdateObjectRedundancy, client.rabbitCli, msg) +} + +// 更新Object元数据 +var _ = Register(Service.UpdateObjectInfos) + +type UpdateObjectInfos struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Updatings []cdssdk.UpdatingObject `json:"updatings"` +} + +type UpdateObjectInfosResp struct { + mq.MessageBodyBase +} + +func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) *UpdateObjectInfos { + return &UpdateObjectInfos{ + UserID: userID, + Updatings: updatings, + } +} +func RespUpdateObjectInfos() *UpdateObjectInfosResp { + return &UpdateObjectInfosResp{} +} +func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) { + return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg) +} + +// 删除Object +var _ = Register(Service.DeleteObjects) + +type DeleteObjects struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` +} + +type DeleteObjectsResp struct { + mq.MessageBodyBase +} + +func ReqDeleteObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *DeleteObjects { + return &DeleteObjects{ + UserID: userID, + ObjectIDs: objectIDs, } } -func RespChangeObjectRedundancy() *ChangeObjectRedundancyResp { - return &ChangeObjectRedundancyResp{} +func RespDeleteObjects() *DeleteObjectsResp { + return &DeleteObjectsResp{} } -func (client *Client) ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, error) { - return mq.Request(Service.ChangeObjectRedundancy, client.rabbitCli, msg) +func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, error) { + return mq.Request(Service.DeleteObjects, client.rabbitCli, msg) } diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 2af1ccc..b3171eb 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -5,9 +5,12 @@ import ( "fmt" "github.com/jmoiron/sqlx" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sort2" stgmod "gitlink.org.cn/cloudream/storage/common/models" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -51,14 +54,162 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details)) } -func (svc *Service) ChangeObjectRedundancy(msg *coormq.ChangeObjectRedundancy) (*coormq.ChangeObjectRedundancyResp, *mq.CodeMessage) { +func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { - return svc.db.Object().BatchUpdateRedundancy(tx, msg.Entries) + return svc.db.Object().BatchUpdateRedundancy(tx, msg.Updatings) }) if err != nil { logger.Warnf("batch updating redundancy: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed") } - return mq.ReplyOK(coormq.RespChangeObjectRedundancy()) + return mq.ReplyOK(coormq.RespUpdateObjectRedundancy()) +} + +func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int { + return sort2.Cmp(o1.ObjectID, o2.ObjectID) + }) + + objIDs := make([]cdssdk.ObjectID, len(msg.Updatings)) + for i, obj := range msg.Updatings { + objIDs[i] = obj.ObjectID + } + + oldObjs, err := svc.db.Object().BatchGet(tx, objIDs) + if err != nil { + return fmt.Errorf("batch getting objects: %w", err) + } + oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs)) + for i, obj := range oldObjs { + oldObjIDs[i] = obj.ObjectID + } + + avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs) + if len(notExistsObjs) > 0 { + // TODO 部分对象已经不存在 + } + + // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突 + // 否则,直接更新即可 + //var pkgIDChangedObjs []cdssdk.Object + //var pathChangedObjs []cdssdk.Object + //var infoChangedObjs []cdssdk.Object + //for i := range willUpdateObjs { + // if willUpdateObjs[i].PackageID != oldObjs[i].PackageID { + // newObj := oldObjs[i] + // willUpdateObjs[i].ApplyTo(&newObj) + // pkgIDChangedObjs = append(pkgIDChangedObjs, newObj) + // } else if willUpdateObjs[i].Path != oldObjs[i].Path { + // newObj := oldObjs[i] + // willUpdateObjs[i].ApplyTo(&newObj) + // pathChangedObjs = append(pathChangedObjs, newObj) + // } else { + // newObj := oldObjs[i] + // willUpdateObjs[i].ApplyTo(&newObj) + // infoChangedObjs = append(infoChangedObjs, newObj) + // } + //} + + newObjs := make([]cdssdk.Object, len(avaiUpdatings)) + for i := range newObjs { + newObj := oldObjs[i] + avaiUpdatings[i].ApplyTo(&newObj) + } + + err = svc.db.Object().BatchCreateOrUpdate(tx, newObjs) + if err != nil { + return fmt.Errorf("batch create or update: %w", err) + } + + return nil + }) + + if err != nil { + logger.Warnf("batch updating objects: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed") + } + + return mq.ReplyOK(coormq.RespUpdateObjectInfos()) +} + +func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.ObjectID) { + objIdx := 0 + IDIdx := 0 + + for IDIdx < len(objIDs) { + if objs[objIdx].ObjectID == objIDs[IDIdx] { + pickedObjs = append(pickedObjs, objs[objIdx]) + IDIdx++ + objIdx++ + } else if objs[objIdx].ObjectID < objIDs[IDIdx] { + objIdx++ + } else { + notFoundObjs = append(notFoundObjs, objIDs[IDIdx]) + IDIdx++ + } + } + + return +} + +func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Object) ([]cdssdk.Object, error) { + type PackageObjects struct { + PackageID cdssdk.PackageID + ObjectByPath map[string]*cdssdk.Object + } + + packages := make(map[cdssdk.PackageID]*PackageObjects) + for _, obj := range objs { + pkg, ok := packages[obj.PackageID] + if !ok { + pkg = &PackageObjects{ + PackageID: obj.PackageID, + ObjectByPath: make(map[string]*cdssdk.Object), + } + packages[obj.PackageID] = pkg + } + + if pkg.ObjectByPath[obj.Path] == nil { + o := obj + pkg.ObjectByPath[obj.Path] = &o + } else { + // TODO 有冲突 + } + } + + var willUpdateObjs []cdssdk.Object + for _, pkg := range packages { + existsObjs, err := svc.db.Object().BatchByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath)) + if err != nil { + return nil, fmt.Errorf("batch getting objects by package path: %w", err) + } + + for _, obj := range existsObjs { + pkg.ObjectByPath[obj.Path] = nil + } + + for _, obj := range pkg.ObjectByPath { + if obj == nil { + continue + } + willUpdateObjs = append(willUpdateObjs, *obj) + } + + } + + return willUpdateObjs, nil +} + +func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + return svc.db.Object().BatchDelete(tx, msg.ObjectIDs) + }) + if err != nil { + logger.Warnf("batch deleting objects: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed") + } + + return mq.ReplyOK(coormq.RespDeleteObjects()) } diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 0a6eee0..880aa26 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -109,7 +109,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } } - var changedObjects []coormq.ChangeObjectRedundancyEntry + var changedObjects []coormq.UpdatingObjectRedundancy defRep := cdssdk.DefaultRepRedundancy defEC := cdssdk.DefaultECRedundancy @@ -136,7 +136,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { defer mutex.Unlock() for _, obj := range getObjs.Objects { - var entry *coormq.ChangeObjectRedundancyEntry + var updating *coormq.UpdatingObjectRedundancy var err error shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold @@ -145,32 +145,32 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { case *cdssdk.NoneRedundancy: if shouldUseEC { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec") - entry, err = t.noneToEC(obj, &defEC, newECNodes) + updating, err = t.noneToEC(obj, &defEC, newECNodes) } else { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep") - entry, err = t.noneToRep(obj, &defRep, newRepNodes) + updating, err = t.noneToRep(obj, &defRep, newRepNodes) } case *cdssdk.RepRedundancy: if shouldUseEC { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") - entry, err = t.repToEC(obj, &defEC, newECNodes) + updating, err = t.repToEC(obj, &defEC, newECNodes) } else { - entry, err = t.repToRep(obj, &defRep, rechoosedRepNodes) + updating, err = t.repToRep(obj, &defRep, rechoosedRepNodes) } case *cdssdk.ECRedundancy: if shouldUseEC { uploadNodes := t.rechooseNodesForEC(obj, red, allNodes) - entry, err = t.ecToEC(obj, red, &defEC, uploadNodes) + updating, err = t.ecToEC(obj, red, &defEC, uploadNodes) } else { log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep") - entry, err = t.ecToRep(obj, red, &defRep, newRepNodes) + updating, err = t.ecToRep(obj, red, &defRep, newRepNodes) } } - if entry != nil { - changedObjects = append(changedObjects, *entry) + if updating != nil { + changedObjects = append(changedObjects, *updating) } if err != nil { @@ -182,7 +182,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { return } - _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changedObjects)) + _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(changedObjects)) if err != nil { log.Warnf("requesting to change object redundancy: %s", err.Error()) return @@ -367,7 +367,7 @@ func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadI return chosen } -func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") } @@ -389,14 +389,14 @@ func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk. }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -443,14 +443,14 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") } @@ -479,18 +479,18 @@ func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.R }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { return t.noneToEC(obj, red, uploadNodes) } -func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -556,14 +556,14 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk }) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: tarRed, Blocks: blocks, }, nil } -func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { +func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -654,7 +654,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. newBlocks[idx].FileHash = v.(string) } - return &coormq.ChangeObjectRedundancyEntry{ + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: tarRed, Blocks: newBlocks, diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 67e4599..9b30fc0 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -109,7 +109,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { pinPlans := make(map[cdssdk.NodeID]*[]string) // 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法 - var repObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + var repObjectsUpdating []coormq.UpdatingObjectRedundancy repMostNodeIDs := t.summaryRepObjectBlockNodes(repObjects) solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ totalBlockCount: 1, @@ -118,11 +118,11 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { blocks: nil, }) for _, obj := range repObjects { - repObjectsUpdateEntries = append(repObjectsUpdateEntries, t.makePlansForRepObject(solu, obj, pinPlans)) + repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(solu, obj, pinPlans)) } // 对于ec对象,则每个对象单独进行退火算法 - var ecObjectsUpdateEntries []coormq.ChangeObjectRedundancyEntry + var ecObjectsUpdating []coormq.UpdatingObjectRedundancy for _, obj := range ecObjects { ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) solu := t.startAnnealing(allNodeInfos, readerNodeIDs, annealingObject{ @@ -131,7 +131,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { pinnedAt: obj.PinnedAt, blocks: obj.Blocks, }) - ecObjectsUpdateEntries = append(ecObjectsUpdateEntries, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld)) + ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allNodeInfos, solu, obj, &planBld)) } ioSwRets, err := t.executePlans(execCtx, pinPlans, &planBld) @@ -141,13 +141,13 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } // 根据按照方案进行调整的结果,填充更新元数据的命令 - for i := range ecObjectsUpdateEntries { - t.populateECObjectEntry(&ecObjectsUpdateEntries[i], ecObjects[i], ioSwRets) + for i := range ecObjectsUpdating { + t.populateECObjectEntry(&ecObjectsUpdating[i], ecObjects[i], ioSwRets) } - finalEntries := append(repObjectsUpdateEntries, ecObjectsUpdateEntries...) + finalEntries := append(repObjectsUpdating, ecObjectsUpdating...) if len(finalEntries) > 0 { - _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(finalEntries)) + _, err = coorCli.UpdateObjectRedundancy(coormq.ReqUpdateObjectRedundancy(finalEntries)) if err != nil { log.Warnf("changing object redundancy: %s", err.Error()) return @@ -715,8 +715,8 @@ func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate return v > rand.Float64() } -func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.ChangeObjectRedundancyEntry { - entry := coormq.ChangeObjectRedundancyEntry{ +func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.ObjectDetail, pinPlans map[cdssdk.NodeID]*[]string) coormq.UpdatingObjectRedundancy { + entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: obj.Object.Redundancy, } @@ -748,8 +748,8 @@ func (t *CleanPinned) makePlansForRepObject(solu annealingSolution, obj stgmod.O return entry } -func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.ChangeObjectRedundancyEntry { - entry := coormq.ChangeObjectRedundancyEntry{ +func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssdk.Node, solu annealingSolution, obj stgmod.ObjectDetail, planBld *plans.PlanBuilder) coormq.UpdatingObjectRedundancy { + entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: obj.Object.Redundancy, } @@ -871,7 +871,7 @@ func (t *CleanPinned) executePlans(execCtx ExecuteContext, pinPlans map[cdssdk.N return ioSwRets, nil } -func (t *CleanPinned) populateECObjectEntry(entry *coormq.ChangeObjectRedundancyEntry, obj stgmod.ObjectDetail, ioRets map[string]any) { +func (t *CleanPinned) populateECObjectEntry(entry *coormq.UpdatingObjectRedundancy, obj stgmod.ObjectDetail, ioRets map[string]any) { for i := range entry.Blocks { if entry.Blocks[i].FileHash != "" { continue From dbf33cd7f2550c95ae34f7289ef393f133fff9a5 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 28 Mar 2024 16:45:37 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=AE=8C=E5=96=84=E4=B8=8B=E8=BD=BDObject?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/services/object.go | 28 ++++++- common/pkgs/db/object_block.go | 20 +++++ common/pkgs/db/pinned_object.go | 20 +++++ common/pkgs/mq/coordinator/object.go | 28 +++++++ coordinator/internal/mq/object.go | 106 ++++++++++++++++++++++++--- 5 files changed, 190 insertions(+), 12 deletions(-) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index f4f24ab..5e3e568 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -8,6 +8,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -51,7 +52,32 @@ func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.Up } func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { - panic("not implement yet!") + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objectID})) + if err != nil { + return nil, fmt.Errorf("requesting to coordinator") + } + + if resp.Objects[0] == nil { + return nil, fmt.Errorf("object not found") + } + + iter := iterator.NewDownloadObjectIterator([]stgmod.ObjectDetail{*resp.Objects[0]}, &iterator.DownloadContext{ + Distlock: svc.DistLock, + }) + defer iter.Close() + + downloading, err := iter.MoveNext() + if err != nil { + return nil, err + } + + return downloading.File, nil } func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) error { diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index a0638e8..3863ab4 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -24,6 +24,26 @@ func (db *ObjectBlockDB) GetByNodeID(ctx SQLContext, nodeID cdssdk.NodeID) ([]st return rets, err } +func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectBlock, error) { + if len(objectIDs) == 0 { + return nil, nil + } + + stmt, args, err := sqlx.In("select * from ObjectBlock where ObjectID in (?) order by ObjectID, `Index` asc", objectIDs) + if err != nil { + return nil, err + } + stmt = ctx.Rebind(stmt) + + var blocks []stgmod.ObjectBlock + err = sqlx.Select(ctx, &blocks, stmt, args...) + if err != nil { + return nil, err + } + + return blocks, nil +} + func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, nodeID cdssdk.NodeID, fileHash string) error { _, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, nodeID, fileHash) return err diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index d3c7d7b..79f0236 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -34,6 +34,26 @@ func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cds return err } +func (*PinnedObjectDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.PinnedObject, error) { + if len(objectIDs) == 0 { + return nil, nil + } + + stmt, args, err := sqlx.In("select * from PinnedObject where ObjectID in (?) order by ObjectID asc", objectIDs) + if err != nil { + return nil, err + } + stmt = ctx.Rebind(stmt) + + var pinneds []cdssdk.PinnedObject + err = sqlx.Select(ctx, &pinneds, stmt, args...) + if err != nil { + return nil, err + } + + return pinneds, nil +} + func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error { _, err := ctx.Exec("insert ignore into PinnedObject values(?,?,?)", nodeID, objectID, createTime) return err diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 5ec380d..12fcac5 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -13,6 +13,8 @@ type ObjectService interface { GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) + GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, *mq.CodeMessage) + UpdateObjectRedundancy(msg *UpdateObjectRedundancy) (*UpdateObjectRedundancyResp, *mq.CodeMessage) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage) @@ -74,6 +76,32 @@ func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*Ge return mq.Request(Service.GetPackageObjectDetails, client.rabbitCli, msg) } +// 获取多个Object以及它们的分块详细信息,返回的Objects会按照ObjectID升序。 +var _ = Register(Service.GetObjectDetails) + +type GetObjectDetails struct { + mq.MessageBodyBase + ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` +} +type GetObjectDetailsResp struct { + mq.MessageBodyBase + Objects []*stgmod.ObjectDetail `json:"objects"` // 如果没有查询到某个ID对应的信息,则此数组对应位置为nil +} + +func ReqGetObjectDetails(objectIDs []cdssdk.ObjectID) *GetObjectDetails { + return &GetObjectDetails{ + ObjectIDs: objectIDs, + } +} +func RespGetObjectDetails(objects []*stgmod.ObjectDetail) *GetObjectDetailsResp { + return &GetObjectDetailsResp{ + Objects: objects, + } +} +func (client *Client) GetObjectDetails(msg *GetObjectDetails) (*GetObjectDetailsResp, error) { + return mq.Request(Service.GetObjectDetails, client.rabbitCli, msg) +} + // 更新Object的冗余方式 var _ = Register(Service.UpdateObjectRedundancy) diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index b3171eb..bf4820e 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -54,6 +54,92 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details)) } +func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) { + details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs)) + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + var err error + + msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs) + + // 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并 + objs, err := svc.db.Object().BatchGet(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch get objects: %w", err) + } + + objIDIdx := 0 + objIdx := 0 + for objIDIdx < len(msg.ObjectIDs) && objIdx < len(objs) { + if msg.ObjectIDs[objIDIdx] < objs[objIdx].ObjectID { + objIDIdx++ + continue + } + + // 由于是使用msg.ObjectIDs去查询Object,因此不存在msg.ObjectIDs > Object.ObjectID的情况, + // 下面同理 + obj := stgmod.ObjectDetail{ + Object: objs[objIDIdx], + } + details[objIDIdx] = &obj + objIdx++ + } + + // 查询合并 + blocks, err := svc.db.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch get object blocks: %w", err) + } + + objIDIdx = 0 + blkIdx := 0 + for objIDIdx < len(msg.ObjectIDs) && blkIdx < len(blocks) { + if details[objIDIdx] == nil { + objIDIdx++ + continue + } + + if msg.ObjectIDs[objIDIdx] < blocks[blkIdx].ObjectID { + objIDIdx++ + continue + } + + details[objIDIdx].Blocks = append(details[objIDIdx].Blocks, blocks[blkIdx]) + blkIdx++ + } + + // 查询合并 + pinneds, err := svc.db.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch get pinned objects: %w", err) + } + + objIDIdx = 0 + pinIdx := 0 + for objIDIdx < len(msg.ObjectIDs) && pinIdx < len(pinneds) { + if details[objIDIdx] == nil { + objIDIdx++ + continue + } + + if msg.ObjectIDs[objIDIdx] < pinneds[pinIdx].ObjectID { + objIDIdx++ + continue + } + + details[objIDIdx].PinnedAt = append(details[objIDIdx].PinnedAt, pinneds[pinIdx].NodeID) + pinIdx++ + } + return nil + }) + + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get object details failed") + } + + return mq.ReplyOK(coormq.RespGetObjectDetails(details)) +} + func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { return svc.db.Object().BatchUpdateRedundancy(tx, msg.Updatings) @@ -136,19 +222,17 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.ObjectID) { objIdx := 0 - IDIdx := 0 + idIdx := 0 - for IDIdx < len(objIDs) { - if objs[objIdx].ObjectID == objIDs[IDIdx] { - pickedObjs = append(pickedObjs, objs[objIdx]) - IDIdx++ - objIdx++ - } else if objs[objIdx].ObjectID < objIDs[IDIdx] { - objIdx++ - } else { - notFoundObjs = append(notFoundObjs, objIDs[IDIdx]) - IDIdx++ + for idIdx < len(objIDs) && objIdx < len(objs) { + if objIDs[idIdx] < objs[objIdx].ObjectID { + notFoundObjs = append(notFoundObjs, objIDs[idIdx]) + idIdx++ + continue } + + pickedObjs = append(pickedObjs, objs[objIdx]) + objIdx++ } return From 0fc50364a62c7f8fc08e5b669f52bc2d4e9ae60f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 29 Mar 2024 10:40:57 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/bucket.go | 2 +- common/pkgs/db/model/model.go | 2 +- coordinator/internal/mq/object.go | 52 ++++++++++++++++++++++++------- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go index ed944f2..7dcfe50 100644 --- a/client/internal/http/bucket.go +++ b/client/internal/http/bucket.go @@ -29,7 +29,7 @@ func (s *BucketService) Create(ctx *gin.Context) { return } - bucketID, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.BucketName) + bucketID, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.Name) if err != nil { log.Warnf("creating bucket: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create bucket failed")) diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index a48946f..8044dcb 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -22,7 +22,7 @@ type Storage struct { type User struct { UserID cdssdk.UserID `db:"UserID" json:"userID"` - Password string `db:"PassWord" json:"password"` + Password string `db:"Password" json:"password"` } type UserBucket struct { diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index bf4820e..fd53755 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -16,11 +16,23 @@ import ( ) func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { - // TODO 检查用户是否有权限 - objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID) + var objs []cdssdk.Object + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + _, err := svc.db.Package().GetUserPackage(tx, msg.UserID, msg.PackageID) + if err != nil { + return fmt.Errorf("getting package by id: %w", err) + } + + objs, err = svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + return fmt.Errorf("getting package objects: %w", err) + } + + return nil + }) if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("get package objects: %s", err.Error()) + logger.WithField("UserID", msg.UserID).WithField("PackageID", msg.PackageID). + Warn(err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") } @@ -200,8 +212,8 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up newObjs := make([]cdssdk.Object, len(avaiUpdatings)) for i := range newObjs { - newObj := oldObjs[i] - avaiUpdatings[i].ApplyTo(&newObj) + newObjs[i] = oldObjs[i] + avaiUpdatings[i].ApplyTo(&newObjs[i]) } err = svc.db.Object().BatchCreateOrUpdate(tx, newObjs) @@ -220,19 +232,22 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up return mq.ReplyOK(coormq.RespUpdateObjectInfos()) } -func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.ObjectID) { +// 根据objIDs从objs中挑选Object。 +// len(objs) >= len(objIDs) +func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.UpdatingObject) { objIdx := 0 idIdx := 0 for idIdx < len(objIDs) && objIdx < len(objs) { - if objIDs[idIdx] < objs[objIdx].ObjectID { - notFoundObjs = append(notFoundObjs, objIDs[idIdx]) - idIdx++ + if objs[objIdx].ObjectID < objIDs[idIdx] { + notFoundObjs = append(notFoundObjs, objs[objIdx]) + objIdx++ continue } pickedObjs = append(pickedObjs, objs[objIdx]) objIdx++ + idIdx++ } return @@ -288,7 +303,22 @@ func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Objec func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { - return svc.db.Object().BatchDelete(tx, msg.ObjectIDs) + err := svc.db.Object().BatchDelete(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch deleting objects: %w", err) + } + + err = svc.db.ObjectBlock().BatchDeleteByObjectID(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch deleting object blocks: %w", err) + } + + err = svc.db.PinnedObject().BatchDeleteByObjectID(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch deleting pinned objects: %w", err) + } + + return nil }) if err != nil { logger.Warnf("batch deleting objects: %s", err.Error()) From 786a65429330e2829d05fd0fe533b40ba3ade1e7 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 29 Mar 2024 16:26:11 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=A7=BB=E5=8A=A8?= =?UTF-8?q?=E5=AF=B9=E8=B1=A1=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/object.go | 24 +++- client/internal/http/server.go | 1 + client/internal/services/object.go | 25 +++- common/pkgs/db/object.go | 20 ++- common/pkgs/mq/coordinator/object.go | 38 +++++- coordinator/internal/mq/object.go | 175 ++++++++++++++++++++++----- 6 files changed, 238 insertions(+), 45 deletions(-) diff --git a/client/internal/http/object.go b/client/internal/http/object.go index fb2c1f1..ae2eb6d 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -129,14 +129,34 @@ func (s *ObjectService) UpdateInfo(ctx *gin.Context) { return } - err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings) + sucs, err := s.svc.ObjectSvc().UpdateInfo(req.UserID, req.Updatings) if err != nil { log.Warnf("updating objects: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "update objects failed")) return } - ctx.JSON(http.StatusOK, OK(nil)) + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectUpdateInfoResp{Successes: sucs})) +} + +func (s *ObjectService) Move(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Move") + + var req cdssdk.ObjectMoveReq + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + sucs, err := s.svc.ObjectSvc().Move(req.UserID, req.Movings) + if err != nil { + log.Warnf("moving objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "move objects failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectMoveResp{Successes: sucs})) } func (s *ObjectService) Delete(ctx *gin.Context) { diff --git a/client/internal/http/server.go b/client/internal/http/server.go index ce4e276..d3c8fa9 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -43,6 +43,7 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.ObjectUploadPath, s.Object().Upload) s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.Object().GetPackageObjects) s.engine.POST(cdssdk.ObjectUpdateInfoPath, s.Object().UpdateInfo) + s.engine.POST(cdssdk.ObjectMovePath, s.Object().Move) s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete) s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 5e3e568..620ce35 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -36,19 +36,34 @@ func (svc *ObjectService) WaitUploading(taskID string, waitTimeout time.Duration return false, nil, nil } -func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) error { +func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) ([]cdssdk.ObjectID, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return fmt.Errorf("new coordinator client: %w", err) + return nil, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) - _, err = coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings)) + resp, err := coorCli.UpdateObjectInfos(coormq.ReqUpdateObjectInfos(userID, updatings)) if err != nil { - return fmt.Errorf("requsting to coodinator: %w", err) + return nil, fmt.Errorf("requsting to coodinator: %w", err) } - return nil + return resp.Successes, nil +} + +func (svc *ObjectService) Move(userID cdssdk.UserID, movings []cdssdk.MovingObject) ([]cdssdk.ObjectID, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.MoveObjects(coormq.ReqMoveObjects(userID, movings)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return resp.Successes, nil } func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index ec9a669..7fd20ef 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -47,7 +47,7 @@ func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]mod return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil } -func (db *ObjectDB) BatchByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { +func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { if len(pathes) == 0 { return nil, nil } @@ -87,7 +87,7 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, // 可以用于批量创建或者更新记录。 // 用于创建时,需要额外检查PackageID+Path的唯一性。 // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 -func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { +func (db *ObjectDB) BatchUpsertByPackagePath(ctx SQLContext, objs []cdssdk.Object) error { if len(objs) == 0 { return nil } @@ -99,6 +99,18 @@ func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) er return BatchNamedExec(ctx, sql, 7, objs, nil) } +func (db *ObjectDB) BatchUpert(ctx SQLContext, objs []cdssdk.Object) error { + if len(objs) == 0 { + return nil + } + + sql := "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + + " values(:ObjectID, :PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + + " on duplicate key update PackageID = new.PackageID, Path = new.Path, Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" + + return BatchNamedExec(ctx, sql, 8, objs, nil) +} + func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { var ret []model.TempObject err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID) @@ -175,7 +187,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] }) } - err := db.BatchCreateOrUpdate(ctx, objs) + err := db.BatchUpsertByPackagePath(ctx, objs) if err != nil { return nil, fmt.Errorf("batch create or update objects: %w", err) } @@ -185,7 +197,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] pathes = append(pathes, add.Path) } // 这里可以不用检查查询结果是否与pathes的数量相同 - addedObjs, err := db.BatchByPackagePath(ctx, packageID, pathes) + addedObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes) if err != nil { return nil, fmt.Errorf("batch get object ids: %w", err) } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 12fcac5..7b3c90f 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -19,6 +19,8 @@ type ObjectService interface { UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, *mq.CodeMessage) + MoveObjects(msg *MoveObjects) (*MoveObjectsResp, *mq.CodeMessage) + DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) } @@ -142,6 +144,7 @@ type UpdateObjectInfos struct { type UpdateObjectInfosResp struct { mq.MessageBodyBase + Successes []cdssdk.ObjectID `json:"successes"` } func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) *UpdateObjectInfos { @@ -150,13 +153,44 @@ func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObjec Updatings: updatings, } } -func RespUpdateObjectInfos() *UpdateObjectInfosResp { - return &UpdateObjectInfosResp{} +func RespUpdateObjectInfos(successes []cdssdk.ObjectID) *UpdateObjectInfosResp { + return &UpdateObjectInfosResp{ + Successes: successes, + } } func (client *Client) UpdateObjectInfos(msg *UpdateObjectInfos) (*UpdateObjectInfosResp, error) { return mq.Request(Service.UpdateObjectInfos, client.rabbitCli, msg) } +// 移动Object +var _ = Register(Service.MoveObjects) + +type MoveObjects struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Movings []cdssdk.MovingObject `json:"movings"` +} + +type MoveObjectsResp struct { + mq.MessageBodyBase + Successes []cdssdk.ObjectID `json:"successes"` +} + +func ReqMoveObjects(userID cdssdk.UserID, movings []cdssdk.MovingObject) *MoveObjects { + return &MoveObjects{ + UserID: userID, + Movings: movings, + } +} +func RespMoveObjects(successes []cdssdk.ObjectID) *MoveObjectsResp { + return &MoveObjectsResp{ + Successes: successes, + } +} +func (client *Client) MoveObjects(msg *MoveObjects) (*MoveObjectsResp, error) { + return mq.Request(Service.MoveObjects, client.rabbitCli, msg) +} + // 删除Object var _ = Register(Service.DeleteObjects) diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index fd53755..a5dd496 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -165,6 +165,7 @@ func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) ( } func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) { + var sucs []cdssdk.ObjectID err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int { return sort2.Cmp(o1.ObjectID, o2.ObjectID) @@ -184,43 +185,23 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up oldObjIDs[i] = obj.ObjectID } - avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs) + avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdssdk.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID }) if len(notExistsObjs) > 0 { // TODO 部分对象已经不存在 } - // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突 - // 否则,直接更新即可 - //var pkgIDChangedObjs []cdssdk.Object - //var pathChangedObjs []cdssdk.Object - //var infoChangedObjs []cdssdk.Object - //for i := range willUpdateObjs { - // if willUpdateObjs[i].PackageID != oldObjs[i].PackageID { - // newObj := oldObjs[i] - // willUpdateObjs[i].ApplyTo(&newObj) - // pkgIDChangedObjs = append(pkgIDChangedObjs, newObj) - // } else if willUpdateObjs[i].Path != oldObjs[i].Path { - // newObj := oldObjs[i] - // willUpdateObjs[i].ApplyTo(&newObj) - // pathChangedObjs = append(pathChangedObjs, newObj) - // } else { - // newObj := oldObjs[i] - // willUpdateObjs[i].ApplyTo(&newObj) - // infoChangedObjs = append(infoChangedObjs, newObj) - // } - //} - newObjs := make([]cdssdk.Object, len(avaiUpdatings)) for i := range newObjs { newObjs[i] = oldObjs[i] avaiUpdatings[i].ApplyTo(&newObjs[i]) } - err = svc.db.Object().BatchCreateOrUpdate(tx, newObjs) + err = svc.db.Object().BatchUpsertByPackagePath(tx, newObjs) if err != nil { return fmt.Errorf("batch create or update: %w", err) } + sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID }) return nil }) @@ -229,23 +210,23 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed") } - return mq.ReplyOK(coormq.RespUpdateObjectInfos()) + return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs)) } // 根据objIDs从objs中挑选Object。 // len(objs) >= len(objIDs) -func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pickedObjs []cdssdk.UpdatingObject, notFoundObjs []cdssdk.UpdatingObject) { +func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) { objIdx := 0 idIdx := 0 for idIdx < len(objIDs) && objIdx < len(objs) { - if objs[objIdx].ObjectID < objIDs[idIdx] { - notFoundObjs = append(notFoundObjs, objs[objIdx]) + if getID(objs[objIdx]) < objIDs[idIdx] { + notFound = append(notFound, objs[objIdx]) objIdx++ continue } - pickedObjs = append(pickedObjs, objs[objIdx]) + picked = append(picked, objs[objIdx]) objIdx++ idIdx++ } @@ -253,7 +234,83 @@ func pickByObjectIDs(objs []cdssdk.UpdatingObject, objIDs []cdssdk.ObjectID) (pi return } -func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Object) ([]cdssdk.Object, error) { +func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) { + var sucs []cdssdk.ObjectID + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdssdk.MovingObject) int { + return sort2.Cmp(o1.ObjectID, o2.ObjectID) + }) + + objIDs := make([]cdssdk.ObjectID, len(msg.Movings)) + for i, obj := range msg.Movings { + objIDs[i] = obj.ObjectID + } + + oldObjs, err := svc.db.Object().BatchGet(tx, objIDs) + if err != nil { + return fmt.Errorf("batch getting objects: %w", err) + } + oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs)) + for i, obj := range oldObjs { + oldObjIDs[i] = obj.ObjectID + } + + avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdssdk.MovingObject) cdssdk.ObjectID { return obj.ObjectID }) + if len(notExistsObjs) > 0 { + // TODO 部分对象已经不存在 + } + + // 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突 + var pkgIDChangedObjs []cdssdk.Object + var pathChangedObjs []cdssdk.Object + for i := range avaiMovings { + if avaiMovings[i].PackageID != oldObjs[i].PackageID { + newObj := oldObjs[i] + avaiMovings[i].ApplyTo(&newObj) + pkgIDChangedObjs = append(pkgIDChangedObjs, newObj) + } else if avaiMovings[i].Path != oldObjs[i].Path { + newObj := oldObjs[i] + avaiMovings[i].ApplyTo(&newObj) + pathChangedObjs = append(pathChangedObjs, newObj) + } + } + + var newObjs []cdssdk.Object + // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象 + ensuredObjs, err := svc.ensurePackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs) + if err != nil { + return err + } + newObjs = append(newObjs, ensuredObjs...) + + // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象 + ensuredObjs, err = svc.ensurePathChangedObjects(tx, msg.UserID, pathChangedObjs) + if err != nil { + return err + } + newObjs = append(newObjs, ensuredObjs...) + + err = svc.db.Object().BatchUpert(tx, newObjs) + if err != nil { + return fmt.Errorf("batch create or update: %w", err) + } + + sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID }) + return nil + }) + if err != nil { + logger.Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "move objects failed") + } + + return mq.ReplyOK(coormq.RespMoveObjects(sucs)) +} + +func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { + if len(objs) == 0 { + return nil, nil + } + type PackageObjects struct { PackageID cdssdk.PackageID ObjectByPath map[string]*cdssdk.Object @@ -274,19 +331,29 @@ func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Objec o := obj pkg.ObjectByPath[obj.Path] = &o } else { - // TODO 有冲突 + // TODO 有两个对象移动到同一个路径,有冲突 } } var willUpdateObjs []cdssdk.Object for _, pkg := range packages { - existsObjs, err := svc.db.Object().BatchByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath)) + _, err := svc.db.Package().GetUserPackage(tx, userID, pkg.PackageID) + if err == sql.ErrNoRows { + continue + } + if err != nil { + return nil, fmt.Errorf("getting user package by id: %w", err) + } + + existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath)) if err != nil { return nil, fmt.Errorf("batch getting objects by package path: %w", err) } + // 标记冲突的对象 for _, obj := range existsObjs { pkg.ObjectByPath[obj.Path] = nil + // TODO 目标Package内有冲突的对象 } for _, obj := range pkg.ObjectByPath { @@ -295,12 +362,56 @@ func (svc *Service) ensurePackageChangedObjects(tx *sqlx.Tx, objs []cdssdk.Objec } willUpdateObjs = append(willUpdateObjs, *obj) } - } return willUpdateObjs, nil } +func (svc *Service) ensurePathChangedObjects(tx *sqlx.Tx, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { + if len(objs) == 0 { + return nil, nil + } + + objByPath := make(map[string]*cdssdk.Object) + for _, obj := range objs { + if objByPath[obj.Path] == nil { + o := obj + objByPath[obj.Path] = &o + } else { + // TODO 有两个对象移动到同一个路径,有冲突 + } + + } + + _, err := svc.db.Package().GetUserPackage(tx, userID, objs[0].PackageID) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("getting user package by id: %w", err) + } + + existsObjs, err := svc.db.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path })) + if err != nil { + return nil, fmt.Errorf("batch getting objects by package path: %w", err) + } + + // 不支持两个对象交换位置的情况,因为数据库不支持 + for _, obj := range existsObjs { + objByPath[obj.Path] = nil + } + + var willMoveObjs []cdssdk.Object + for _, obj := range objByPath { + if obj == nil { + continue + } + willMoveObjs = append(willMoveObjs, *obj) + } + + return willMoveObjs, nil +} + func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { err := svc.db.Object().BatchDelete(tx, msg.ObjectIDs) From 33b1a4ea2daa612156a5188adbbf793afa454e37 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 9 Apr 2024 16:54:41 +0800 Subject: [PATCH 5/5] =?UTF-8?q?cds=E6=94=AF=E6=8C=81rclone=E6=8C=82?= =?UTF-8?q?=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/create_package.go | 4 +- client/internal/http/bucket.go | 30 +++++++-- client/internal/http/object.go | 67 +++++++++---------- client/internal/http/package.go | 38 +++++++++-- client/internal/http/server.go | 2 + client/internal/services/bucket.go | 23 +++++-- client/internal/services/object.go | 5 +- client/internal/services/package.go | 26 +++++-- common/pkgs/cmd/upload_objects.go | 24 +++++-- common/pkgs/db/bucket.go | 15 +++++ common/pkgs/db/object.go | 4 +- common/pkgs/db/package.go | 14 ++++ .../pkgs/iterator/download_object_iterator.go | 3 +- common/pkgs/mq/coordinator/bucket.go | 36 +++++++++- common/pkgs/mq/coordinator/package.go | 45 +++++++++++-- coordinator/internal/mq/bucket.go | 22 +++++- coordinator/internal/mq/package.go | 32 +++++++-- 17 files changed, 306 insertions(+), 84 deletions(-) diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 1ddd848..9bf7684 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -63,7 +63,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c return } - uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ + uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.Package.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{ Distlock: ctx.distlock, Connectivity: ctx.connectivity, }) @@ -76,7 +76,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c return } - t.Result.PackageID = createResp.PackageID + t.Result.PackageID = createResp.Package.PackageID t.Result.Objects = uploadRet.Objects complete(nil, CompleteOption{ diff --git a/client/internal/http/bucket.go b/client/internal/http/bucket.go index 7dcfe50..515bc20 100644 --- a/client/internal/http/bucket.go +++ b/client/internal/http/bucket.go @@ -19,17 +19,39 @@ func (s *Server) Bucket() *BucketService { } } +func (s *BucketService) GetByName(ctx *gin.Context) { + log := logger.WithField("HTTP", "Bucket.GetByName") + + var req cdssdk.BucketGetByName + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + bucket, err := s.svc.BucketSvc().GetBucketByName(req.UserID, req.Name) + if err != nil { + log.Warnf("getting bucket by name: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get bucket by name failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.BucketGetByNameResp{ + Bucket: bucket, + })) +} + func (s *BucketService) Create(ctx *gin.Context) { log := logger.WithField("HTTP", "Bucket.Create") - var req cdssdk.BucketCreateReq + var req cdssdk.BucketCreate if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - bucketID, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.Name) + bucket, err := s.svc.BucketSvc().CreateBucket(req.UserID, req.Name) if err != nil { log.Warnf("creating bucket: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create bucket failed")) @@ -37,14 +59,14 @@ func (s *BucketService) Create(ctx *gin.Context) { } ctx.JSON(http.StatusOK, OK(cdssdk.BucketCreateResp{ - BucketID: bucketID, + Bucket: bucket, })) } func (s *BucketService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Bucket.Delete") - var req cdssdk.BucketDeleteReq + var req cdssdk.BucketDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) diff --git a/client/internal/http/object.go b/client/internal/http/object.go index ae2eb6d..9c1428e 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -1,16 +1,18 @@ package http import ( + "fmt" "io" "mime/multipart" "net/http" + "path" "time" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - myio "gitlink.org.cn/cloudream/common/utils/io" + myhttp "gitlink.org.cn/cloudream/common/utils/http" ) type ObjectService struct { @@ -51,7 +53,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { } for { - complete, _, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) + complete, objs, err := s.svc.ObjectSvc().WaitUploading(taskID, time.Second*5) if complete { if err != nil { log.Warnf("uploading object: %s", err.Error()) @@ -59,7 +61,20 @@ func (s *ObjectService) Upload(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(nil)) + uploadeds := make([]cdssdk.UploadedObject, len(objs.Objects)) + for i, obj := range objs.Objects { + err := "" + if obj.Error != nil { + err = obj.Error.Error() + } + o := obj.Object + uploadeds[i] = cdssdk.UploadedObject{ + Object: &o, + Error: err, + } + } + + ctx.JSON(http.StatusOK, OK(cdssdk.ObjectUploadResp{Uploadeds: uploadeds})) return } @@ -74,7 +89,7 @@ func (s *ObjectService) Upload(ctx *gin.Context) { func (s *ObjectService) Download(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Download") - var req cdssdk.ObjectDownloadReq + var req cdssdk.ObjectDownload if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -88,41 +103,25 @@ func (s *ObjectService) Download(ctx *gin.Context) { return } - ctx.Writer.WriteHeader(http.StatusOK) - // TODO 需要设置FileName - ctx.Header("Content-Disposition", "attachment; filename=filename") - ctx.Header("Content-Type", "application/octet-stream") - - buf := make([]byte, 4096) - ctx.Stream(func(w io.Writer) bool { - rd, err := file.Read(buf) - if err == io.EOF { - err = myio.WriteAll(w, buf[:rd]) - if err != nil { - log.Warnf("writing data to response: %s", err.Error()) - } - return false - } + mw := multipart.NewWriter(ctx.Writer) + defer mw.Close() - if err != nil { - log.Warnf("reading file data: %s", err.Error()) - return false - } + ctx.Writer.Header().Set("Content-Type", fmt.Sprintf("%s;boundary=%s", myhttp.ContentTypeMultiPart, mw.Boundary())) + ctx.Writer.WriteHeader(http.StatusOK) - err = myio.WriteAll(w, buf[:rd]) - if err != nil { - log.Warnf("writing data to response: %s", err.Error()) - return false - } + fw, err := mw.CreateFormFile("file", path.Base(file.Object.Path)) + if err != nil { + log.Warnf("creating form file: %s", err.Error()) + return + } - return true - }) + io.Copy(fw, file.File) } func (s *ObjectService) UpdateInfo(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.UpdateInfo") - var req cdssdk.ObjectUpdateInfoReq + var req cdssdk.ObjectUpdateInfo if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -142,7 +141,7 @@ func (s *ObjectService) UpdateInfo(ctx *gin.Context) { func (s *ObjectService) Move(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Move") - var req cdssdk.ObjectMoveReq + var req cdssdk.ObjectMove if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -162,7 +161,7 @@ func (s *ObjectService) Move(ctx *gin.Context) { func (s *ObjectService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.Delete") - var req cdssdk.ObjectDeleteReq + var req cdssdk.ObjectDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -182,7 +181,7 @@ func (s *ObjectService) Delete(ctx *gin.Context) { func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.GetPackageObjects") - var req cdssdk.ObjectGetPackageObjectsReq + var req cdssdk.ObjectGetPackageObjects if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) diff --git a/client/internal/http/package.go b/client/internal/http/package.go index c18559d..b549dd5 100644 --- a/client/internal/http/package.go +++ b/client/internal/http/package.go @@ -3,6 +3,7 @@ package http import ( "mime/multipart" "net/http" + "net/url" "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -43,16 +44,36 @@ func (s *PackageService) Get(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetResp{Package: *pkg})) } +func (s *PackageService) GetByName(ctx *gin.Context) { + log := logger.WithField("HTTP", "Package.GetByName") + + var req cdssdk.PackageGetByName + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding query: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + pkg, err := s.svc.PackageSvc().GetByName(req.UserID, req.BucketName, req.PackageName) + if err != nil { + log.Warnf("getting package by name: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package by name failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdssdk.PackageGetByNameResp{Package: *pkg})) +} + func (s *PackageService) Create(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Create") - var req cdssdk.PackageCreateReq + var req cdssdk.PackageCreate if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) return } - pkgID, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) + pkg, err := s.svc.PackageSvc().Create(req.UserID, req.BucketID, req.Name) if err != nil { log.Warnf("creating package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "create package failed")) @@ -60,14 +81,14 @@ func (s *PackageService) Create(ctx *gin.Context) { } ctx.JSON(http.StatusOK, OK(cdssdk.PackageCreateResp{ - PackageID: pkgID, + Package: pkg, })) } func (s *PackageService) Delete(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.Delete") - var req cdssdk.PackageDeleteReq + var req cdssdk.PackageDelete if err := ctx.ShouldBindJSON(&req); err != nil { log.Warnf("binding body: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -87,7 +108,7 @@ func (s *PackageService) Delete(ctx *gin.Context) { func (s *PackageService) ListBucketPackages(ctx *gin.Context) { log := logger.WithField("HTTP", "Package.ListBucketPackages") - var req cdssdk.PackageListBucketPackagesReq + var req cdssdk.PackageListBucketPackages if err := ctx.ShouldBindQuery(&req); err != nil { log.Warnf("binding query: %s", err.Error()) ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) @@ -157,8 +178,13 @@ func mapMultiPartFileToUploadingObject(files []*multipart.FileHeader) stgiter.Up return nil, err } + fileName, err := url.PathUnescape(file.Filename) + if err != nil { + return nil, err + } + return &stgiter.IterUploadingObject{ - Path: file.Filename, + Path: fileName, Size: file.Size, File: stream, }, nil diff --git a/client/internal/http/server.go b/client/internal/http/server.go index d3c8fa9..44823ba 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -47,6 +47,7 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.ObjectDeletePath, s.Object().Delete) s.engine.GET(cdssdk.PackageGetPath, s.Package().Get) + s.engine.GET(cdssdk.PackageGetByNamePath, s.Package().GetByName) s.engine.POST(cdssdk.PackageCreatePath, s.Package().Create) s.engine.POST(cdssdk.PackageDeletePath, s.Package().Delete) s.engine.GET(cdssdk.PackageListBucketPackagesPath, s.Package().ListBucketPackages) @@ -59,6 +60,7 @@ func (s *Server) initRouters() { s.engine.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage) + s.engine.GET(cdssdk.BucketGetByNamePath, s.Bucket().GetByName) s.engine.POST(cdssdk.BucketCreatePath, s.Bucket().Create) s.engine.POST(cdssdk.BucketDeletePath, s.Bucket().Delete) s.engine.GET(cdssdk.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) diff --git a/client/internal/services/bucket.go b/client/internal/services/bucket.go index d402b54..57bc466 100644 --- a/client/internal/services/bucket.go +++ b/client/internal/services/bucket.go @@ -22,6 +22,21 @@ func (svc *BucketService) GetBucket(userID cdssdk.UserID, bucketID cdssdk.Bucket panic("not implement yet") } +func (svc *BucketService) GetBucketByName(userID cdssdk.UserID, bucketName string) (model.Bucket, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return model.Bucket{}, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.GetBucketByName(coormq.ReqGetBucketByName(userID, bucketName)) + if err != nil { + return model.Bucket{}, fmt.Errorf("get bucket by name failed, err: %w", err) + } + + return resp.Bucket, nil +} + func (svc *BucketService) GetUserBuckets(userID cdssdk.UserID) ([]model.Bucket, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -52,19 +67,19 @@ func (svc *BucketService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssd return resp.Packages, nil } -func (svc *BucketService) CreateBucket(userID cdssdk.UserID, bucketName string) (cdssdk.BucketID, error) { +func (svc *BucketService) CreateBucket(userID cdssdk.UserID, bucketName string) (cdssdk.Bucket, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return 0, fmt.Errorf("new coordinator client: %w", err) + return cdssdk.Bucket{}, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.CreateBucket(coormq.NewCreateBucket(userID, bucketName)) if err != nil { - return 0, fmt.Errorf("creating bucket: %w", err) + return cdssdk.Bucket{}, fmt.Errorf("creating bucket: %w", err) } - return resp.BucketID, nil + return resp.Bucket, nil } func (svc *BucketService) DeleteBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) error { diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 620ce35..b351d04 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -2,7 +2,6 @@ package services import ( "fmt" - "io" "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -66,7 +65,7 @@ func (svc *ObjectService) Move(userID cdssdk.UserID, movings []cdssdk.MovingObje return resp.Successes, nil } -func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (io.ReadCloser, error) { +func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (*iterator.IterDownloadingObject, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -92,7 +91,7 @@ func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectI return nil, err } - return downloading.File, nil + return downloading, nil } func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) error { diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 405d53f..5b002ab 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -6,7 +6,6 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -19,7 +18,7 @@ func (svc *Service) PackageSvc() *PackageService { return &PackageService{Service: svc} } -func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) (*model.Package, error) { +func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) (*cdssdk.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -34,6 +33,21 @@ func (svc *PackageService) Get(userID cdssdk.UserID, packageID cdssdk.PackageID) return &getResp.Package, nil } +func (svc *PackageService) GetByName(userID cdssdk.UserID, bucketName string, packageName string) (*cdssdk.Package, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetPackageByName(coormq.ReqGetPackageByName(userID, bucketName, packageName)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return &getResp.Package, nil +} + func (svc *PackageService) GetBucketPackages(userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]cdssdk.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -49,19 +63,19 @@ func (svc *PackageService) GetBucketPackages(userID cdssdk.UserID, bucketID cdss return getResp.Packages, nil } -func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { +func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { - return 0, fmt.Errorf("new coordinator client: %w", err) + return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) } defer stgglb.CoordinatorMQPool.Release(coorCli) resp, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bucketID, name)) if err != nil { - return 0, fmt.Errorf("creating package: %w", err) + return cdssdk.Package{}, fmt.Errorf("creating package: %w", err) } - return resp.PackageID, nil + return resp.Package, nil } func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index 59a66bc..96c53d1 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -34,10 +34,9 @@ type UploadObjectsResult struct { } type ObjectUploadResult struct { - Info *iterator.IterUploadingObject - Error error - // TODO 这个字段没有被赋值 - ObjectID cdssdk.ObjectID + Info *iterator.IterUploadingObject + Error error + Object cdssdk.Object } type UploadNodeInfo struct { @@ -189,11 +188,26 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo } } - _, err = coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil)) + updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(packageID, adds, nil)) if err != nil { return nil, fmt.Errorf("updating package: %w", err) } + updatedObjs := make(map[string]*cdssdk.Object) + for _, obj := range updateResp.Added { + o := obj + updatedObjs[obj.Path] = &o + } + + for i := range uploadRets { + obj := updatedObjs[uploadRets[i].Info.Path] + if obj == nil { + uploadRets[i].Error = fmt.Errorf("object %s not found in package", uploadRets[i].Info.Path) + continue + } + uploadRets[i].Object = *obj + } + return uploadRets, nil } diff --git a/common/pkgs/db/bucket.go b/common/pkgs/db/bucket.go index 5fdda6a..50d290a 100644 --- a/common/pkgs/db/bucket.go +++ b/common/pkgs/db/bucket.go @@ -18,6 +18,12 @@ func (db *DB) Bucket() *BucketDB { return &BucketDB{DB: db} } +func (db *BucketDB) GetByID(ctx SQLContext, bucketID cdssdk.BucketID) (cdssdk.Bucket, error) { + var ret cdssdk.Bucket + err := sqlx.Get(ctx, &ret, "select * from Bucket where BucketID = ?", bucketID) + return ret, err +} + // GetIDByName 根据BucketName查询BucketID func (db *BucketDB) GetIDByName(bucketName string) (int64, error) { //桶结构体 @@ -57,6 +63,15 @@ func (*BucketDB) GetUserBucket(ctx SQLContext, userID cdssdk.UserID, bucketID cd return ret, err } +func (*BucketDB) GetUserBucketByName(ctx SQLContext, userID cdssdk.UserID, bucketName string) (model.Bucket, error) { + var ret model.Bucket + err := sqlx.Get(ctx, &ret, + "select Bucket.* from UserBucket, Bucket where UserID = ? and"+ + " UserBucket.BucketID = Bucket.BucketID and"+ + " Bucket.Name = ?", userID, bucketName) + return ret, err +} + func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.Bucket, error) { var ret []model.Bucket err := sqlx.Select(ctx, &ret, "select Bucket.* from UserBucket, Bucket where UserID = ? and UserBucket.BucketID = Bucket.BucketID", userID) diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 7fd20ef..036620d 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -169,7 +169,7 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac return rets, nil } -func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { +func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) { if len(adds) == 0 { return nil, nil } @@ -244,7 +244,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] return nil, fmt.Errorf("batch create caches: %w", err) } - return addedObjIDs, nil + return addedObjs, nil } func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error { diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index 4310ced..8c506ea 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -77,6 +77,20 @@ func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packag return ret, err } +// 在指定名称的Bucket中查找指定名称的Package +func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, bucketName string, packageName string) (cdssdk.Package, error) { + var ret model.Package + err := sqlx.Get(ctx, &ret, + "select Package.* from Package, Bucket, UserBucket where"+ + " Package.Name = ? and"+ + " Package.BucketID = Bucket.BucketID and"+ + " Bucket.Name = ? and"+ + " UserBucket.UserID = ? and"+ + " UserBucket.BucketID = Bucket.BucketID", + packageName, bucketName, userID) + return ret, err +} + func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) { // 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误 var packageID int64 diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 792f9cc..cd58dc4 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -18,7 +18,6 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmodels "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -27,7 +26,7 @@ import ( type DownloadingObjectIterator = Iterator[*IterDownloadingObject] type IterDownloadingObject struct { - Object model.Object + Object cdssdk.Object File io.ReadCloser } diff --git a/common/pkgs/mq/coordinator/bucket.go b/common/pkgs/mq/coordinator/bucket.go index a129bac..2dc7c7a 100644 --- a/common/pkgs/mq/coordinator/bucket.go +++ b/common/pkgs/mq/coordinator/bucket.go @@ -7,6 +7,8 @@ import ( ) type BucketService interface { + GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, *mq.CodeMessage) + GetUserBuckets(msg *GetUserBuckets) (*GetUserBucketsResp, *mq.CodeMessage) GetBucketPackages(msg *GetBucketPackages) (*GetBucketPackagesResp, *mq.CodeMessage) @@ -16,6 +18,34 @@ type BucketService interface { DeleteBucket(msg *DeleteBucket) (*DeleteBucketResp, *mq.CodeMessage) } +// 根据桶名获取桶 +var _ = Register(Service.GetBucketByName) + +type GetBucketByName struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Name string `json:"name"` +} +type GetBucketByNameResp struct { + mq.MessageBodyBase + Bucket cdssdk.Bucket `json:"bucket"` +} + +func ReqGetBucketByName(userID cdssdk.UserID, name string) *GetBucketByName { + return &GetBucketByName{ + UserID: userID, + Name: name, + } +} +func RespGetBucketByName(bucket cdssdk.Bucket) *GetBucketByNameResp { + return &GetBucketByNameResp{ + Bucket: bucket, + } +} +func (client *Client) GetBucketByName(msg *GetBucketByName) (*GetBucketByNameResp, error) { + return mq.Request(Service.GetBucketByName, client.rabbitCli, msg) +} + // 获取用户所有的桶 var _ = Register(Service.GetUserBuckets) @@ -80,7 +110,7 @@ type CreateBucket struct { } type CreateBucketResp struct { mq.MessageBodyBase - BucketID cdssdk.BucketID `json:"bucketID"` + Bucket cdssdk.Bucket `json:"bucket"` } func NewCreateBucket(userID cdssdk.UserID, bucketName string) *CreateBucket { @@ -89,9 +119,9 @@ func NewCreateBucket(userID cdssdk.UserID, bucketName string) *CreateBucket { BucketName: bucketName, } } -func NewCreateBucketResp(bucketID cdssdk.BucketID) *CreateBucketResp { +func NewCreateBucketResp(bucket cdssdk.Bucket) *CreateBucketResp { return &CreateBucketResp{ - BucketID: bucketID, + Bucket: bucket, } } func (client *Client) CreateBucket(msg *CreateBucket) (*CreateBucketResp, error) { diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 3d01e5d..d80cc37 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -12,6 +12,8 @@ import ( type PackageService interface { GetPackage(msg *GetPackage) (*GetPackageResp, *mq.CodeMessage) + GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, *mq.CodeMessage) + CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage) UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, *mq.CodeMessage) @@ -51,6 +53,36 @@ func (client *Client) GetPackage(msg *GetPackage) (*GetPackageResp, error) { return mq.Request(Service.GetPackage, client.rabbitCli, msg) } +// 根据名称获取Package +var _ = Register(Service.GetPackageByName) + +type GetPackageByName struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + BucketName string `json:"bucketName"` + PackageName string `json:"packageName"` +} +type GetPackageByNameResp struct { + mq.MessageBodyBase + Package cdssdk.Package `json:"package"` +} + +func ReqGetPackageByName(userID cdssdk.UserID, bucketName string, packageName string) *GetPackageByName { + return &GetPackageByName{ + UserID: userID, + BucketName: bucketName, + PackageName: packageName, + } +} +func NewGetPackageByNameResp(pkg cdssdk.Package) *GetPackageByNameResp { + return &GetPackageByNameResp{ + Package: pkg, + } +} +func (client *Client) GetPackageByName(msg *GetPackageByName) (*GetPackageByNameResp, error) { + return mq.Request(Service.GetPackageByName, client.rabbitCli, msg) +} + // 创建一个Package var _ = Register(Service.CreatePackage) @@ -62,7 +94,7 @@ type CreatePackage struct { } type CreatePackageResp struct { mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` + Package cdssdk.Package `json:"package"` } func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string) *CreatePackage { @@ -72,9 +104,9 @@ func NewCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name strin Name: name, } } -func NewCreatePackageResp(packageID cdssdk.PackageID) *CreatePackageResp { +func NewCreatePackageResp(pkg cdssdk.Package) *CreatePackageResp { return &CreatePackageResp{ - PackageID: packageID, + Package: pkg, } } func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, error) { @@ -92,6 +124,7 @@ type UpdatePackage struct { } type UpdatePackageResp struct { mq.MessageBodyBase + Added []cdssdk.Object `json:"added"` } type AddObjectEntry struct { Path string `json:"path"` @@ -108,8 +141,10 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes Deletes: deletes, } } -func NewUpdatePackageResp() *UpdatePackageResp { - return &UpdatePackageResp{} +func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp { + return &UpdatePackageResp{ + Added: added, + } } func NewAddObjectEntry(path string, size int64, fileHash string, uploadTime time.Time, nodeID cdssdk.NodeID) AddObjectEntry { return AddObjectEntry{ diff --git a/coordinator/internal/mq/bucket.go b/coordinator/internal/mq/bucket.go index 78e5b41..e8278e9 100644 --- a/coordinator/internal/mq/bucket.go +++ b/coordinator/internal/mq/bucket.go @@ -18,6 +18,18 @@ func (svc *Service) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (m panic("not implement yet") } +func (svc *Service) GetBucketByName(msg *coormq.GetBucketByName) (*coormq.GetBucketByNameResp, *mq.CodeMessage) { + bucket, err := svc.db.Bucket().GetUserBucketByName(svc.db.SQLCtx(), msg.UserID, msg.Name) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("Name", msg.Name). + Warnf("getting bucket by name: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get bucket by name failed") + } + + return mq.ReplyOK(coormq.RespGetBucketByName(bucket)) +} + func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) { buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID) @@ -44,18 +56,22 @@ func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.Ge } func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { - var bucketID cdssdk.BucketID + var bucket cdssdk.Bucket err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { _, err := svc.db.User().GetByID(tx, msg.UserID) if err != nil { return fmt.Errorf("getting user by id: %w", err) } - bucketID, err = svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName) + bucketID, err := svc.db.Bucket().Create(tx, msg.UserID, msg.BucketName) if err != nil { return fmt.Errorf("creating bucket: %w", err) } + bucket, err = svc.db.Bucket().GetByID(tx, bucketID) + if err != nil { + return fmt.Errorf("getting bucket by id: %w", err) + } return nil }) if err != nil { @@ -65,7 +81,7 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket return nil, mq.Failed(errorcode.OperationFailed, "create bucket failed") } - return mq.ReplyOK(coormq.NewCreateBucketResp(bucketID)) + return mq.ReplyOK(coormq.NewCreateBucketResp(bucket)) } func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index ddab4ee..73acd1c 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -26,8 +26,22 @@ func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, return mq.ReplyOK(coormq.NewGetPackageResp(pkg)) } +func (svc *Service) GetPackageByName(msg *coormq.GetPackageByName) (*coormq.GetPackageByNameResp, *mq.CodeMessage) { + pkg, err := svc.db.Package().GetUserPackageByName(svc.db.SQLCtx(), msg.UserID, msg.BucketName, msg.PackageName) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("BucketName", msg.BucketName). + WithField("PackageName", msg.PackageName). + Warnf("get package by name: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "get package by name failed") + } + + return mq.ReplyOK(coormq.NewGetPackageByNameResp(pkg)) +} + func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { - var pkgID cdssdk.PackageID + var pkg cdssdk.Package err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { var err error @@ -36,11 +50,16 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return fmt.Errorf("bucket is not avaiable to the user") } - pkgID, err = svc.db.Package().Create(tx, msg.BucketID, msg.Name) + pkgID, err := svc.db.Package().Create(tx, msg.BucketID, msg.Name) if err != nil { return fmt.Errorf("creating package: %w", err) } + pkg, err = svc.db.Package().GetByID(tx, pkgID) + if err != nil { + return fmt.Errorf("getting package by id: %w", err) + } + return nil }) if err != nil { @@ -50,10 +69,11 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return nil, mq.Failed(errorcode.OperationFailed, "creating package failed") } - return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID)) + return mq.ReplyOK(coormq.NewCreatePackageResp(pkg)) } func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { + var added []cdssdk.Object err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { _, err := svc.db.Package().GetByID(tx, msg.PackageID) if err != nil { @@ -69,9 +89,11 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack // 再执行添加操作 if len(msg.Adds) > 0 { - if _, err := svc.db.Object().BatchAdd(tx, msg.PackageID, msg.Adds); err != nil { + ad, err := svc.db.Object().BatchAdd(tx, msg.PackageID, msg.Adds) + if err != nil { return fmt.Errorf("adding objects: %w", err) } + added = ad } return nil @@ -81,7 +103,7 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack return nil, mq.Failed(errorcode.OperationFailed, "update package failed") } - return mq.ReplyOK(coormq.NewUpdatePackageResp()) + return mq.ReplyOK(coormq.NewUpdatePackageResp(added)) } func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) {