Browse Source

恢复中期检查接口

gitlink
Sydonian 10 months ago
parent
commit
9b78962ebe
3 changed files with 71 additions and 63 deletions
  1. +1
    -1
      client/internal/http/server.go
  2. +54
    -62
      client/internal/http/temp.go
  3. +16
    -0
      client/internal/services/storage.go

+ 1
- 1
client/internal/http/server.go View File

@@ -43,7 +43,7 @@ func (s *Server) Serve() error {
func (s *Server) initRouters() { func (s *Server) initRouters() {
rt := s.engine.Use() rt := s.engine.Use()


// initTemp(rt, s)
initTemp(rt, s)


s.routeV1(s.engine) s.routeV1(s.engine)




+ 54
- 62
client/internal/http/temp.go View File

@@ -1,6 +1,5 @@
package http package http


/*
import ( import (
"net/http" "net/http"


@@ -96,7 +95,7 @@ type TempGetObjectDetailResp struct {
type ObjectBlockDetail struct { type ObjectBlockDetail struct {
ObjectID cdssdk.ObjectID `json:"objectID"` ObjectID cdssdk.ObjectID `json:"objectID"`
Type string `json:"type"` Type string `json:"type"`
FileHash string `json:"fileHash"`
FileHash cdssdk.FileHash `json:"fileHash"`
LocationType string `json:"locationType"` LocationType string `json:"locationType"`
LocationName string `json:"locationName"` LocationName string `json:"locationName"`
} }
@@ -123,43 +122,36 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
return return
} }


loadedHubIDs, err := s.svc.PackageSvc().GetLoadedNodes(1, details.Object.PackageID)
if err != nil {
log.Warnf("getting loaded nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
return
}

var allHubIDs []cdssdk.HubID
allHubIDs = append(allHubIDs, details.PinnedAt...)
var allStgIDs []cdssdk.StorageID
allStgIDs = append(allStgIDs, details.PinnedAt...)
for _, b := range details.Blocks { for _, b := range details.Blocks {
allHubIDs = append(allHubIDs, b.StorageID)
allStgIDs = append(allStgIDs, b.StorageID)
} }
allHubIDs = append(allHubIDs, loadedHubIDs...)


allHubIDs = lo.Uniq(allHubIDs)
allStgIDs = lo.Uniq(allStgIDs)


getNodes, err := s.svc.NodeSvc().GetNodes(allHubIDs)
getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs)
if err != nil { if err != nil {
log.Warnf("getting nodes: %s", err.Error()) log.Warnf("getting nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed"))
return return
} }


allNodes := make(map[cdssdk.HubID]*cdssdk.Node)
for _, n := range getNodes {
n2 := n
allNodes[n.HubID] = &n2
allStgs := make(map[cdssdk.StorageID]cdssdk.Storage)
for _, n := range getStgs {
if n != nil {
allStgs[n.Storage.StorageID] = n.Storage
}
} }


var blocks []ObjectBlockDetail var blocks []ObjectBlockDetail


for _, hubID := range details.PinnedAt {
for _, stgID := range details.PinnedAt {
blocks = append(blocks, ObjectBlockDetail{ blocks = append(blocks, ObjectBlockDetail{
Type: "Rep", Type: "Rep",
FileHash: details.Object.FileHash, FileHash: details.Object.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[hubID].Name,
LocationName: allStgs[stgID].Name,
}) })
} }


@@ -171,7 +163,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
Type: "Rep", Type: "Rep",
FileHash: blk.FileHash, FileHash: blk.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
}) })
} }
} }
@@ -182,7 +174,7 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
Type: "Rep", Type: "Rep",
FileHash: blk.FileHash, FileHash: blk.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
}) })
} }
} }
@@ -193,20 +185,11 @@ func (s *TempService) GetObjectDetail(ctx *gin.Context) {
Type: "Block", Type: "Block",
FileHash: blk.FileHash, FileHash: blk.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
}) })
} }
} }


for _, hubID := range loadedHubIDs {
blocks = append(blocks, ObjectBlockDetail{
Type: "Rep",
FileHash: details.Object.FileHash,
LocationType: "Storage",
LocationName: allNodes[hubID].Name,
})
}

ctx.JSON(http.StatusOK, OK(TempGetObjectDetailResp{ ctx.JSON(http.StatusOK, OK(TempGetObjectDetailResp{
Blocks: blocks, Blocks: blocks,
})) }))
@@ -252,15 +235,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
return return
} }


nodes, err := s.svc.NodeSvc().GetNodes(nil)
var allStgIDs []cdssdk.StorageID
for _, obj := range db.Objects {
allStgIDs = append(allStgIDs, obj.PinnedAt...)
for _, blk := range obj.Blocks {
allStgIDs = append(allStgIDs, blk.StorageID)
}
}

getStgs, err := s.svc.StorageSvc().GetDetails(allStgIDs)
if err != nil { if err != nil {
log.Warnf("getting nodes: %s", err.Error()) log.Warnf("getting nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed")) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed"))
return return
} }
allNodes := make(map[cdssdk.HubID]cdssdk.Node)
for _, n := range nodes {
allNodes[n.HubID] = n
allStgs := make(map[cdssdk.StorageID]cdssdk.Storage)
for _, n := range getStgs {
if n != nil {
allStgs[n.Storage.StorageID] = n.Storage
}
} }


bkts := make(map[cdssdk.BucketID]*BucketDetail) bkts := make(map[cdssdk.BucketID]*BucketDetail)
@@ -274,25 +267,25 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {


type PackageDetail struct { type PackageDetail struct {
Package cdssdk.Package Package cdssdk.Package
Loaded []cdssdk.Node
// Loaded []cdssdk.Node
} }
pkgs := make(map[cdssdk.PackageID]*PackageDetail) pkgs := make(map[cdssdk.PackageID]*PackageDetail)
for _, pkg := range db.Packages { for _, pkg := range db.Packages {
p := PackageDetail{ p := PackageDetail{
Package: pkg, Package: pkg,
Loaded: make([]cdssdk.Node, 0),
// Loaded: make([]cdssdk.Node, 0),
} }


loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID)
if err != nil {
log.Warnf("getting loaded nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
return
}
// loaded, err := s.svc.PackageSvc().GetLoadedNodes(1, pkg.PackageID)
// if err != nil {
// log.Warnf("getting loaded nodes: %s", err.Error())
// ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get loaded nodes failed"))
// return
// }


for _, hubID := range loaded {
p.Loaded = append(p.Loaded, allNodes[hubID])
}
// for _, hubID := range loaded {
// p.Loaded = append(p.Loaded, allNodes[hubID])
// }


pkgs[pkg.PackageID] = &p pkgs[pkg.PackageID] = &p
} }
@@ -316,7 +309,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Rep", Type: "Rep",
FileHash: obj.Object.FileHash, FileHash: obj.Object.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[hubID].Name,
LocationName: allStgs[hubID].Name,
}) })
} }


@@ -329,7 +322,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Rep", Type: "Rep",
FileHash: blk.FileHash, FileHash: blk.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
}) })
} }
} }
@@ -341,7 +334,7 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Rep", Type: "Rep",
FileHash: blk.FileHash, FileHash: blk.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
}) })
} }
} }
@@ -353,20 +346,20 @@ func (s *TempService) GetDatabaseAll(ctx *gin.Context) {
Type: "Block", Type: "Block",
FileHash: blk.FileHash, FileHash: blk.FileHash,
LocationType: "Agent", LocationType: "Agent",
LocationName: allNodes[blk.StorageID].Name,
LocationName: allStgs[blk.StorageID].Name,
}) })
} }
} }


for _, node := range pkgs[obj.Object.PackageID].Loaded {
blocks = append(blocks, ObjectBlockDetail{
ObjectID: obj.Object.ObjectID,
Type: "Rep",
FileHash: obj.Object.FileHash,
LocationType: "Storage",
LocationName: allNodes[node.HubID].Name,
})
}
// for _, node := range pkgs[obj.Object.PackageID].Loaded {
// blocks = append(blocks, ObjectBlockDetail{
// ObjectID: obj.Object.ObjectID,
// Type: "Rep",
// FileHash: obj.Object.FileHash,
// LocationType: "Storage",
// LocationName: allNodes[node.HubID].Name,
// })
// }


} }


@@ -390,4 +383,3 @@ func auth(ctx *gin.Context) {
ctx.AbortWithStatus(http.StatusUnauthorized) ctx.AbortWithStatus(http.StatusUnauthorized)
} }
} }
*/

+ 16
- 0
client/internal/services/storage.go View File

@@ -10,6 +10,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"


stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
@@ -58,6 +59,21 @@ func (svc *StorageService) GetByName(userID cdssdk.UserID, name string) (*model.
return &getResp.Storage, nil return &getResp.Storage, nil
} }


func (svc *StorageService) GetDetails(stgIDs []cdssdk.StorageID) ([]*stgmod.StorageDetail, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails(stgIDs))
if err != nil {
return nil, fmt.Errorf("request to coordinator: %w", err)
}

return getResp.Storages, nil
}

func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID, rootPath string) error { func (svc *StorageService) LoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID, rootPath string) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire() coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil { if err != nil {


Loading…
Cancel
Save