Browse Source

增加获取全量数据的接口

gitlink
Sydonian 1 year ago
parent
commit
4cd940fc80
6 changed files with 268 additions and 4 deletions
  1. +1
    -0
      client/internal/http/server.go
  2. +152
    -4
      client/internal/http/temp.go
  3. +23
    -0
      client/internal/services/temp.go
  4. +2
    -0
      common/pkgs/mq/coordinator/object.go
  5. +38
    -0
      common/pkgs/mq/coordinator/temp.go
  6. +52
    -0
      coordinator/internal/mq/temp.go

+ 1
- 0
client/internal/http/server.go View File

@@ -70,4 +70,5 @@ func (s *Server) initRouters() {
rt.GET("/bucket/listDetails", s.Temp().ListDetails)
rt.GET("/bucket/getObjects", s.Temp().GetObjects)
rt.GET("/object/getDetail", s.Temp().GetObjectDetail)
rt.GET("/temp/getDatabaseAll", s.Temp().GetDatabaseAll)
}

+ 152
- 4
client/internal/http/temp.go View File

@@ -93,10 +93,11 @@ type TempGetObjectDetailResp struct {
Blocks []ObjectBlockDetail `json:"blocks"`
}
type ObjectBlockDetail struct {
Type string `json:"type"`
FileHash string `json:"fileHash"`
LocationType string `json:"locationType"`
LocationName string `json:"locationName"`
ObjectID cdssdk.ObjectID `json:"objectID"`
Type string `json:"type"`
FileHash string `json:"fileHash"`
LocationType string `json:"locationType"`
LocationName string `json:"locationName"`
}

func (s *TempService) GetObjectDetail(ctx *gin.Context) {
@@ -228,6 +229,153 @@ func (s *TempService) getBucketObjects(bktID cdssdk.BucketID) ([]cdssdk.Object,
return allObjs, nil
}

type TempGetDatabaseAll struct {
}
type TempGetDatabaseAllResp struct {
Buckets []BucketDetail `json:"buckets"`
Objects []BucketObject `json:"objects"`
Blocks []ObjectBlockDetail `json:"blocks"`
}
type BucketObject struct {
cdssdk.Object
BucketID cdssdk.BucketID `json:"bucketID"`
}

func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
log := logger.WithField("HTTP", "Temp.GetDatabaseAll")

db, err := s.svc.ObjectSvc().GetDatabaseAll()
if err != nil {
log.Warnf("getting database all: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get database all failed"))
return
}

nodes, err := s.svc.NodeSvc().GetNodes(nil)
if err != nil {
log.Warnf("getting nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed"))
return
}
allNodes := make(map[cdssdk.NodeID]cdssdk.Node)
for _, n := range nodes {
allNodes[n.NodeID] = n
}

bkts := make(map[cdssdk.BucketID]*BucketDetail)
for _, bkt := range db.Buckets {
bkts[bkt.BucketID] = &BucketDetail{
BucketID: bkt.BucketID,
Name: bkt.Name,
ObjectCount: 0,
}
}

type PackageDetail struct {
Package cdssdk.Package
Loaded []cdssdk.Node
}
pkgs := make(map[cdssdk.PackageID]*PackageDetail)
for _, pkg := range db.Packages {
p := PackageDetail{
Package: pkg,
Loaded: make([]cdssdk.Node, 0),
}

loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID)
if err != nil {
log.Warnf("getting loaded nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
return
}

for _, nodeID := range loaded {
p.Loaded = append(p.Loaded, allNodes[nodeID])
}

pkgs[pkg.PackageID] = &p
}

var objs []BucketObject
for _, obj := range db.Objects {
o := BucketObject{
Object: obj.Object,
BucketID: pkgs[obj.Object.PackageID].Package.BucketID,
}
objs = append(objs, o)
}

var blocks []ObjectBlockDetail
for _, obj := range db.Objects {
bkts[pkgs[obj.Object.PackageID].Package.BucketID].ObjectCount++

for _, nodeID := range obj.PinnedAt {
blocks = append(blocks, ObjectBlockDetail{
ObjectID: obj.Object.ObjectID,
Type: "Rep",
FileHash: obj.Object.FileHash,
LocationType: "Agent",
LocationName: allNodes[nodeID].Name,
})
}

switch obj.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
for _, blk := range obj.Blocks {
if !lo.Contains(obj.PinnedAt, blk.NodeID) {
blocks = append(blocks, ObjectBlockDetail{
ObjectID: obj.Object.ObjectID,
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.NodeID].Name,
})
}
}
case *cdssdk.RepRedundancy:
for _, blk := range obj.Blocks {
if !lo.Contains(obj.PinnedAt, blk.NodeID) {
blocks = append(blocks, ObjectBlockDetail{
ObjectID: obj.Object.ObjectID,
Type: "Rep",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.NodeID].Name,
})
}
}

case *cdssdk.ECRedundancy:
for _, blk := range obj.Blocks {
blocks = append(blocks, ObjectBlockDetail{
ObjectID: obj.Object.ObjectID,
Type: "Block",
FileHash: blk.FileHash,
LocationType: "Agent",
LocationName: allNodes[blk.NodeID].Name,
})
}
}

for _, node := range pkgs[obj.Object.PackageID].Loaded {
blocks = append(blocks, ObjectBlockDetail{
ObjectID: obj.Object.ObjectID,
Type: "Rep",
FileHash: obj.Object.FileHash,
LocationType: "Storage",
LocationName: allNodes[node.NodeID].Name,
})
}

}

ctx.JSON(http.StatusOK, OK(TempGetDatabaseAllResp{
Buckets: lo.Map(lo.Values(bkts), func(b *BucketDetail, _ int) BucketDetail { return *b }),
Objects: objs,
Blocks: blocks,
}))
}

func auth(ctx *gin.Context) {
token := ctx.Request.Header.Get("X-CDS-Auth")
if token != "cloudream@123" {


+ 23
- 0
client/internal/services/temp.go View File

@@ -0,0 +1,23 @@
package services

import (
"fmt"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *ObjectService) GetDatabaseAll() (*coormq.GetDatabaseAllResp, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetDatabaseAll(coormq.ReqGetDatabaseAll(1))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return getResp, nil
}

+ 2
- 0
common/pkgs/mq/coordinator/object.go View File

@@ -22,6 +22,8 @@ type ObjectService interface {
MoveObjects(msg *MoveObjects) (*MoveObjectsResp, *mq.CodeMessage)

DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage)

GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, *mq.CodeMessage)
}

// 查询Package中的所有Object,返回的Objects会按照ObjectID升序


+ 38
- 0
common/pkgs/mq/coordinator/temp.go View File

@@ -0,0 +1,38 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

// 删除Object
var _ = Register(Service.GetDatabaseAll)

type GetDatabaseAll struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
}

type GetDatabaseAllResp struct {
mq.MessageBodyBase
Buckets []cdssdk.Bucket `json:"buckets"`
Packages []cdssdk.Package `json:"packages"`
Objects []stgmod.ObjectDetail `json:"objects"`
}

func ReqGetDatabaseAll(userID cdssdk.UserID) *GetDatabaseAll {
return &GetDatabaseAll{
UserID: userID,
}
}
func RespGetDatabaseAll(buckets []cdssdk.Bucket, packages []cdssdk.Package, objects []stgmod.ObjectDetail) *GetDatabaseAllResp {
return &GetDatabaseAllResp{
Buckets: buckets,
Packages: packages,
Objects: objects,
}
}
func (client *Client) GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, error) {
return mq.Request(Service.GetDatabaseAll, client.rabbitCli, msg)
}

+ 52
- 0
coordinator/internal/mq/temp.go View File

@@ -0,0 +1,52 @@
package mq

import (
"database/sql"
"fmt"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatabaseAllResp, *mq.CodeMessage) {
var bkts []cdssdk.Bucket
var pkgs []cdssdk.Package
var objs []stgmod.ObjectDetail

err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
var err error
bkts, err = svc.db.Bucket().GetUserBuckets(tx, msg.UserID)
if err != nil {
return fmt.Errorf("get user buckets: %w", err)
}

for _, bkt := range bkts {
ps, err := svc.db.Package().GetBucketPackages(tx, msg.UserID, bkt.BucketID)
if err != nil {
return fmt.Errorf("get bucket packages: %w", err)
}
pkgs = append(pkgs, ps...)
}

for _, pkg := range pkgs {
os, err := svc.db.Object().GetPackageObjectDetails(tx, pkg.PackageID)
if err != nil {
return fmt.Errorf("get package object details: %w", err)
}
objs = append(objs, os...)
}

return nil
})
if err != nil {
logger.Warnf("batch deleting objects: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
}

return mq.ReplyOK(coormq.RespGetDatabaseAll(bkts, pkgs, objs))
}

Loading…
Cancel
Save