diff --git a/client/internal/http/object.go b/client/internal/http/object.go index ae25c53..3865013 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -358,6 +358,26 @@ func (s *ObjectService) DeleteByPath(ctx *gin.Context) { ctx.JSON(http.StatusOK, OK(nil)) } +func (s *ObjectService) Clone(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.Clone") + + var req cdsapi.ObjectClone + if err := ctx.ShouldBindJSON(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + objs, err := s.svc.ObjectSvc().Clone(req.UserID, req.Clonings) + if err != nil { + log.Warnf("cloning object: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "clone object failed")) + return + } + + ctx.JSON(http.StatusOK, OK(cdsapi.ObjectCloneResp{Objects: objs})) +} + func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { log := logger.WithField("HTTP", "Object.GetPackageObjects") diff --git a/client/internal/http/server.go b/client/internal/http/server.go index 5590b01..821db1c 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -54,6 +54,7 @@ func (s *Server) initRouters() { rt.POST(cdsapi.ObjectMovePath, s.Object().Move) rt.POST(cdsapi.ObjectDeletePath, s.Object().Delete) rt.POST(cdsapi.ObjectDeleteByPathPath, s.Object().DeleteByPath) + rt.POST(cdsapi.ObjectClonePath, s.Object().Clone) rt.GET(cdsapi.PackageGetPath, s.Package().Get) rt.GET(cdsapi.PackageGetByNamePath, s.Package().GetByName) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 2f8e4b7..b155ece 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -113,6 +113,21 @@ func (svc *ObjectService) Delete(userID cdssdk.UserID, objectIDs []cdssdk.Object return nil } +func (svc *ObjectService) Clone(userID cdssdk.UserID, clonings []cdsapi.CloningObject) ([]*cdssdk.Object, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.CloneObjects(coormq.ReqCloneObjects(userID, clonings)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return resp.Objects, nil +} + // GetPackageObjects 获取包中的对象列表。 // userID: 用户ID。 // packageID: 包ID。 diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index c876c4a..00a7069 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -85,6 +85,41 @@ func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID return objs, nil } +// 仅返回查询到的对象 +func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) { + var objs []cdssdk.Object + + err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error + if err != nil { + return nil, err + } + + // 获取所有的 ObjectBlock + var allBlocks []stgmod.ObjectBlock + err = ctx.Table("ObjectBlock").Where("ObjectID IN ?", objectIDs).Order("ObjectID, `Index` ASC").Find(&allBlocks).Error + if err != nil { + return nil, err + } + + // 获取所有的 PinnedObject + var allPinnedObjs []cdssdk.PinnedObject + err = ctx.Table("PinnedObject").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&allPinnedObjs).Error + if err != nil { + return nil, err + } + + details := make([]stgmod.ObjectDetail, len(objs)) + for i, obj := range objs { + details[i] = stgmod.ObjectDetail{ + Object: obj, + } + } + + stgmod.DetailsFillObjectBlocks(details, allBlocks) + stgmod.DetailsFillPinnedAt(details, allPinnedObjs) + return details, nil +} + func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { err := ctx.Table("Object").Create(&obj).Error if err != nil { diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index ef14872..c90712d 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -28,6 +28,8 @@ type ObjectService interface { DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) + CloneObjects(msg *CloneObjects) (*CloneObjectsResp, *mq.CodeMessage) + GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, *mq.CodeMessage) AddAccessStat(msg *AddAccessStat) @@ -285,6 +287,34 @@ func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, err return mq.Request(Service.DeleteObjects, client.rabbitCli, msg) } +// 克隆Object +var _ = Register(Service.CloneObjects) + +type CloneObjects struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` + Clonings []cdsapi.CloningObject `json:"clonings"` +} +type CloneObjectsResp struct { + mq.MessageBodyBase + Objects []*cdssdk.Object `json:"objects"` +} + +func ReqCloneObjects(userID cdssdk.UserID, clonings []cdsapi.CloningObject) *CloneObjects { + return &CloneObjects{ + UserID: userID, + Clonings: clonings, + } +} +func RespCloneObjects(objects []*cdssdk.Object) *CloneObjectsResp { + return &CloneObjectsResp{ + Objects: objects, + } +} +func (client *Client) CloneObjects(msg *CloneObjects) (*CloneObjectsResp, error) { + return mq.Request(Service.CloneObjects, client.rabbitCli, msg) +} + // 增加访问计数 var _ = RegisterNoReply(Service.AddAccessStat) diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index bdea776..4431e16 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -483,3 +483,130 @@ func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObje return mq.ReplyOK(coormq.RespDeleteObjects()) } + +func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjectsResp, *mq.CodeMessage) { + type CloningObject struct { + Cloning cdsapi.CloningObject + OrgIndex int + } + type PackageClonings struct { + PackageID cdssdk.PackageID + Clonings map[string]CloningObject + } + + // TODO 要检查用户是否有Object、Package的权限 + clonings := make(map[cdssdk.PackageID]*PackageClonings) + for i, cloning := range msg.Clonings { + pkg, ok := clonings[cloning.NewPackageID] + if !ok { + pkg = &PackageClonings{ + PackageID: cloning.NewPackageID, + Clonings: make(map[string]CloningObject), + } + clonings[cloning.NewPackageID] = pkg + } + pkg.Clonings[cloning.NewPath] = CloningObject{ + Cloning: cloning, + OrgIndex: i, + } + } + + ret := make([]*cdssdk.Object, len(msg.Clonings)) + err := svc.db2.DoTx(func(tx db2.SQLContext) error { + // 剔除掉新路径已经存在的对象 + for _, pkg := range clonings { + exists, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.Clonings)) + if err != nil { + return fmt.Errorf("batch getting objects by package path: %w", err) + } + + for _, obj := range exists { + delete(pkg.Clonings, obj.Path) + } + } + + // 删除目的Package不存在的对象 + newPkg, err := svc.db2.Package().BatchTestPackageID(tx, lo.Keys(clonings)) + if err != nil { + return fmt.Errorf("batch testing package id: %w", err) + } + for _, pkg := range clonings { + if !newPkg[pkg.PackageID] { + delete(clonings, pkg.PackageID) + } + } + + var avaiClonings []CloningObject + var avaiObjIDs []cdssdk.ObjectID + for _, pkg := range clonings { + for _, cloning := range pkg.Clonings { + avaiClonings = append(avaiClonings, cloning) + avaiObjIDs = append(avaiObjIDs, cloning.Cloning.ObjectID) + } + } + + avaiDetails, err := svc.db2.Object().BatchGetDetails(tx, avaiObjIDs) + if err != nil { + return fmt.Errorf("batch getting object details: %w", err) + } + + avaiDetailsMap := make(map[cdssdk.ObjectID]stgmod.ObjectDetail) + for _, detail := range avaiDetails { + avaiDetailsMap[detail.Object.ObjectID] = detail + } + + oldAvaiClonings := avaiClonings + avaiClonings = nil + + var newObjs []cdssdk.Object + for _, cloning := range oldAvaiClonings { + // 进一步剔除原始对象不存在的情况 + detail, ok := avaiDetailsMap[cloning.Cloning.ObjectID] + if !ok { + continue + } + + avaiClonings = append(avaiClonings, cloning) + + newObj := detail.Object + newObj.ObjectID = 0 + newObj.Path = cloning.Cloning.NewPath + newObj.PackageID = cloning.Cloning.NewPackageID + newObjs = append(newObjs, newObj) + } + + // 先创建出新对象 + err = svc.db2.Object().BatchCreate(tx, &newObjs) + if err != nil { + return fmt.Errorf("batch creating objects: %w", err) + } + + // 创建了新对象就能拿到新对象ID,再创建新对象块 + var newBlks []stgmod.ObjectBlock + for i, cloning := range avaiClonings { + oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks + for _, blk := range oldBlks { + newBlk := blk + newBlk.ObjectID = newObjs[i].ObjectID + newBlks = append(newBlks, newBlk) + } + } + + err = svc.db2.ObjectBlock().BatchCreate(tx, newBlks) + if err != nil { + return fmt.Errorf("batch creating object blocks: %w", err) + } + + for i, cloning := range avaiClonings { + ret[cloning.OrgIndex] = &newObjs[i] + } + return nil + }) + + if err != nil { + logger.Warnf("cloning objects: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + + return mq.ReplyOK(coormq.RespCloneObjects(ret)) +}