From 4cd940fc803eabba2955f89a05848cf4eba811a4 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 23 Apr 2024 16:43:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=8E=B7=E5=8F=96=E5=85=A8?= =?UTF-8?q?=E9=87=8F=E6=95=B0=E6=8D=AE=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/http/server.go | 1 + client/internal/http/temp.go | 156 ++++++++++++++++++++++++++- client/internal/services/temp.go | 23 ++++ common/pkgs/mq/coordinator/object.go | 2 + common/pkgs/mq/coordinator/temp.go | 38 +++++++ coordinator/internal/mq/temp.go | 52 +++++++++ 6 files changed, 268 insertions(+), 4 deletions(-) create mode 100644 client/internal/services/temp.go create mode 100644 common/pkgs/mq/coordinator/temp.go create mode 100644 coordinator/internal/mq/temp.go diff --git a/client/internal/http/server.go b/client/internal/http/server.go index c4a1b91..de978d0 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -70,4 +70,5 @@ func (s *Server) initRouters() { rt.GET("/bucket/listDetails", s.Temp().ListDetails) rt.GET("/bucket/getObjects", s.Temp().GetObjects) rt.GET("/object/getDetail", s.Temp().GetObjectDetail) + rt.GET("/temp/getDatabaseAll", s.Temp().GetDatabaseAll) } diff --git a/client/internal/http/temp.go b/client/internal/http/temp.go index 6977a29..58c88ad 100644 --- a/client/internal/http/temp.go +++ b/client/internal/http/temp.go @@ -93,10 +93,11 @@ type TempGetObjectDetailResp struct { Blocks []ObjectBlockDetail `json:"blocks"` } type ObjectBlockDetail struct { - Type string `json:"type"` - FileHash string `json:"fileHash"` - LocationType string `json:"locationType"` - LocationName string `json:"locationName"` + ObjectID cdssdk.ObjectID `json:"objectID"` + Type string `json:"type"` + FileHash string `json:"fileHash"` + LocationType string `json:"locationType"` + LocationName string `json:"locationName"` } func (s *TempService) GetObjectDetail(ctx *gin.Context) { @@ -228,6 +229,153 @@ func (s *TempService) getBucketObjects(bktID cdssdk.BucketID) ([]cdssdk.Object, return allObjs, nil } +type TempGetDatabaseAll struct { +} +type TempGetDatabaseAllResp struct { + Buckets []BucketDetail `json:"buckets"` + Objects []BucketObject `json:"objects"` + Blocks []ObjectBlockDetail `json:"blocks"` +} +type BucketObject struct { + cdssdk.Object + BucketID cdssdk.BucketID `json:"bucketID"` +} + +func (s *TempService) GetDatabaseAll(ctx *gin.Context) { + log := logger.WithField("HTTP", "Temp.GetDatabaseAll") + + db, err := s.svc.ObjectSvc().GetDatabaseAll() + if err != nil { + log.Warnf("getting database all: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get database all failed")) + return + } + + nodes, err := s.svc.NodeSvc().GetNodes(nil) + if err != nil { + log.Warnf("getting nodes: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) + return + } + allNodes := make(map[cdssdk.NodeID]cdssdk.Node) + for _, n := range nodes { + allNodes[n.NodeID] = n + } + + bkts := make(map[cdssdk.BucketID]*BucketDetail) + for _, bkt := range db.Buckets { + bkts[bkt.BucketID] = &BucketDetail{ + BucketID: bkt.BucketID, + Name: bkt.Name, + ObjectCount: 0, + } + } + + type PackageDetail struct { + Package cdssdk.Package + Loaded []cdssdk.Node + } + pkgs := make(map[cdssdk.PackageID]*PackageDetail) + for _, pkg := range db.Packages { + p := PackageDetail{ + Package: pkg, + Loaded: make([]cdssdk.Node, 0), + } + + loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID) + if err != nil { + log.Warnf("getting loaded nodes: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed")) + return + } + + for _, nodeID := range loaded { + p.Loaded = append(p.Loaded, allNodes[nodeID]) + } + + pkgs[pkg.PackageID] = &p + } + + var objs []BucketObject + for _, obj := range db.Objects { + o := BucketObject{ + Object: obj.Object, + BucketID: pkgs[obj.Object.PackageID].Package.BucketID, + } + objs = append(objs, o) + } + + var blocks []ObjectBlockDetail + for _, obj := range db.Objects { + bkts[pkgs[obj.Object.PackageID].Package.BucketID].ObjectCount++ + + for _, nodeID := range obj.PinnedAt { + blocks = append(blocks, ObjectBlockDetail{ + ObjectID: obj.Object.ObjectID, + Type: "Rep", + FileHash: obj.Object.FileHash, + LocationType: "Agent", + LocationName: allNodes[nodeID].Name, + }) + } + + switch obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + for _, blk := range obj.Blocks { + if !lo.Contains(obj.PinnedAt, blk.NodeID) { + blocks = append(blocks, ObjectBlockDetail{ + ObjectID: obj.Object.ObjectID, + Type: "Rep", + FileHash: blk.FileHash, + LocationType: "Agent", + LocationName: allNodes[blk.NodeID].Name, + }) + } + } + case *cdssdk.RepRedundancy: + for _, blk := range obj.Blocks { + if !lo.Contains(obj.PinnedAt, blk.NodeID) { + blocks = append(blocks, ObjectBlockDetail{ + ObjectID: obj.Object.ObjectID, + Type: "Rep", + FileHash: blk.FileHash, + LocationType: "Agent", + LocationName: allNodes[blk.NodeID].Name, + }) + } + } + + case *cdssdk.ECRedundancy: + for _, blk := range obj.Blocks { + blocks = append(blocks, ObjectBlockDetail{ + ObjectID: obj.Object.ObjectID, + Type: "Block", + FileHash: blk.FileHash, + LocationType: "Agent", + LocationName: allNodes[blk.NodeID].Name, + }) + } + } + + for _, node := range pkgs[obj.Object.PackageID].Loaded { + blocks = append(blocks, ObjectBlockDetail{ + ObjectID: obj.Object.ObjectID, + Type: "Rep", + FileHash: obj.Object.FileHash, + LocationType: "Storage", + LocationName: allNodes[node.NodeID].Name, + }) + } + + } + + ctx.JSON(http.StatusOK, OK(TempGetDatabaseAllResp{ + Buckets: lo.Map(lo.Values(bkts), func(b *BucketDetail, _ int) BucketDetail { return *b }), + Objects: objs, + Blocks: blocks, + })) +} + func auth(ctx *gin.Context) { token := ctx.Request.Header.Get("X-CDS-Auth") if token != "cloudream@123" { diff --git a/client/internal/services/temp.go b/client/internal/services/temp.go new file mode 100644 index 0000000..08d72aa --- /dev/null +++ b/client/internal/services/temp.go @@ -0,0 +1,23 @@ +package services + +import ( + "fmt" + + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +func (svc *ObjectService) GetDatabaseAll() (*coormq.GetDatabaseAllResp, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getResp, err := coorCli.GetDatabaseAll(coormq.ReqGetDatabaseAll(1)) + if err != nil { + return nil, fmt.Errorf("requsting to coodinator: %w", err) + } + + return getResp, nil +} diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index a112365..d09303a 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -22,6 +22,8 @@ type ObjectService interface { MoveObjects(msg *MoveObjects) (*MoveObjectsResp, *mq.CodeMessage) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) + + GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, *mq.CodeMessage) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 diff --git a/common/pkgs/mq/coordinator/temp.go b/common/pkgs/mq/coordinator/temp.go new file mode 100644 index 0000000..3dda538 --- /dev/null +++ b/common/pkgs/mq/coordinator/temp.go @@ -0,0 +1,38 @@ +package coordinator + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" +) + +// 删除Object +var _ = Register(Service.GetDatabaseAll) + +type GetDatabaseAll struct { + mq.MessageBodyBase + UserID cdssdk.UserID `json:"userID"` +} + +type GetDatabaseAllResp struct { + mq.MessageBodyBase + Buckets []cdssdk.Bucket `json:"buckets"` + Packages []cdssdk.Package `json:"packages"` + Objects []stgmod.ObjectDetail `json:"objects"` +} + +func ReqGetDatabaseAll(userID cdssdk.UserID) *GetDatabaseAll { + return &GetDatabaseAll{ + UserID: userID, + } +} +func RespGetDatabaseAll(buckets []cdssdk.Bucket, packages []cdssdk.Package, objects []stgmod.ObjectDetail) *GetDatabaseAllResp { + return &GetDatabaseAllResp{ + Buckets: buckets, + Packages: packages, + Objects: objects, + } +} +func (client *Client) GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, error) { + return mq.Request(Service.GetDatabaseAll, client.rabbitCli, msg) +} diff --git a/coordinator/internal/mq/temp.go b/coordinator/internal/mq/temp.go new file mode 100644 index 0000000..abe21f7 --- /dev/null +++ b/coordinator/internal/mq/temp.go @@ -0,0 +1,52 @@ +package mq + +import ( + "database/sql" + "fmt" + + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatabaseAllResp, *mq.CodeMessage) { + var bkts []cdssdk.Bucket + var pkgs []cdssdk.Package + var objs []stgmod.ObjectDetail + + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + var err error + bkts, err = svc.db.Bucket().GetUserBuckets(tx, msg.UserID) + if err != nil { + return fmt.Errorf("get user buckets: %w", err) + } + + for _, bkt := range bkts { + ps, err := svc.db.Package().GetBucketPackages(tx, msg.UserID, bkt.BucketID) + if err != nil { + return fmt.Errorf("get bucket packages: %w", err) + } + pkgs = append(pkgs, ps...) + } + + for _, pkg := range pkgs { + os, err := svc.db.Object().GetPackageObjectDetails(tx, pkg.PackageID) + if err != nil { + return fmt.Errorf("get package object details: %w", err) + } + objs = append(objs, os...) + } + + return nil + }) + if err != nil { + logger.Warnf("batch deleting objects: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed") + } + + return mq.ReplyOK(coormq.RespGetDatabaseAll(bkts, pkgs, objs)) +}