Browse Source

Merge pull request '利用退火算法来清除Pinned数据' (#20) from feature_gxh into master

gitlink
baohan 1 year ago
parent
commit
b90c9ca307
24 changed files with 1727 additions and 286 deletions
  1. +3
    -27
      agent/internal/mq/storage.go
  2. +331
    -9
      agent/internal/task/storage_load_package.go
  3. +2
    -0
      client/internal/cmdline/scanner.go
  4. +2
    -2
      client/internal/cmdline/storage.go
  5. +2
    -2
      client/internal/http/storage.go
  6. +46
    -10
      client/internal/services/storage.go
  7. +0
    -100
      client/internal/task/storage_load_package.go
  8. +6
    -0
      common/consts/consts.go
  9. +7
    -5
      common/models/models.go
  10. +38
    -1
      common/pkgs/db/object.go
  11. +8
    -28
      common/pkgs/db/object_block.go
  12. +15
    -0
      common/pkgs/db/pinned_object.go
  13. +154
    -92
      common/pkgs/iterator/download_object_iterator.go
  14. +9
    -0
      common/pkgs/mq/coordinator/node.go
  15. +1
    -0
      common/pkgs/mq/coordinator/object.go
  16. +10
    -7
      common/pkgs/mq/coordinator/storage.go
  17. +18
    -0
      common/pkgs/mq/scanner/event/clean_pinned.go
  18. +1
    -1
      coordinator/internal/mq/object.go
  19. +1
    -1
      coordinator/internal/mq/package.go
  20. +26
    -0
      coordinator/internal/mq/storage.go
  21. +732
    -0
      scanner/internal/event/clean_pinned.go
  22. +264
    -0
      scanner/internal/event/clean_pinned_test.go
  23. +48
    -0
      scanner/internal/tickevent/batch_clean_pinned.go
  24. +3
    -1
      scanner/main.go

+ 3
- 27
agent/internal/mq/storage.go View File

@@ -24,31 +24,7 @@ import (
)

func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
}

outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID)
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("creating output directory: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
}

tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath))
tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID))
return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID()))
}

@@ -70,7 +46,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*

loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath))

} else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
@@ -82,7 +58,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*

loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath))
}

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", ""))


+ 331
- 9
agent/internal/task/storage_load_package.go View File

@@ -1,30 +1,352 @@
package task

import (
"fmt"
"io"
"math"
"os"
"path/filepath"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/bitmap"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/task"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/cmd"
myio "gitlink.org.cn/cloudream/common/utils/io"
myref "gitlink.org.cn/cloudream/common/utils/reflect"
mysort "gitlink.org.cn/cloudream/common/utils/sort"
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/utils"
)

type StorageLoadPackage struct {
cmd *cmd.DownloadPackage
FullPath string
FullOutputPath string

userID cdssdk.UserID
packageID cdssdk.PackageID
storageID cdssdk.StorageID
pinnedBlocks []stgmod.ObjectBlock
}

func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *StorageLoadPackage {
func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage {
return &StorageLoadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
FullPath: outputPath,
userID: userID,
packageID: packageID,
storageID: storageID,
}
}
func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})
err := t.do(task, ctx)

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) error {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new IPFS client: %w", err)
}
defer stgglb.IPFSPool.Release(ipfsCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("request to coordinator: %w", err)
}

outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, t.userID, t.packageID)
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
return fmt.Errorf("creating output directory: %w", err)
}
t.FullOutputPath = outputDirPath

getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID))
if err != nil {
return fmt.Errorf("getting package object details: %w", err)
}

mutex, err := reqbuilder.NewBuilder().
// 提前占位
Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID).
// 保护在storage目录中下载的文件
Storage().Buzy(t.storageID).
// 保护下载文件时同时保存到IPFS的文件
IPFS().Buzy(getStgResp.NodeID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

for _, obj := range getObjectDetails.Objects {
err := t.downloadOne(coorCli, ipfsCli, outputDirPath, obj)
if err != nil {
return err
}
}

_, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks))
if err != nil {
return fmt.Errorf("loading package to storage: %w", err)
}

return err
}

func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, dir string, obj stgmod.ObjectDetail) error {
var file io.ReadCloser

switch red := obj.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
reader, err := t.downloadNoneOrRepObject(ipfsCli, obj)
if err != nil {
return fmt.Errorf("downloading object: %w", err)
}
file = reader

case *cdssdk.RepRedundancy:
reader, err := t.downloadNoneOrRepObject(ipfsCli, obj)
if err != nil {
return fmt.Errorf("downloading rep object: %w", err)
}
file = reader

case *cdssdk.ECRedundancy:
reader, pinnedBlocks, err := t.downloadECObject(coorCli, ipfsCli, obj, red)
if err != nil {
return fmt.Errorf("downloading ec object: %w", err)
}
file = reader
t.pinnedBlocks = append(t.pinnedBlocks, pinnedBlocks...)

default:
return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy))
}
defer file.Close()

fullPath := filepath.Join(dir, obj.Object.Path)

lastDirPath := filepath.Dir(fullPath)
if err := os.MkdirAll(lastDirPath, 0755); err != nil {
return fmt.Errorf("creating object last dir: %w", err)
}

outputFile, err := os.Create(fullPath)
if err != nil {
return fmt.Errorf("creating object file: %w", err)
}
defer outputFile.Close()

if _, err := io.Copy(outputFile, file); err != nil {
return fmt.Errorf("writting object to file: %w", err)
}

return nil
}

func (t *StorageLoadPackage) downloadNoneOrRepObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail) (io.ReadCloser, error) {
if len(obj.Blocks) == 0 && len(obj.PinnedAt) == 0 {
return nil, fmt.Errorf("no node has this object")
}

// 不管实际有没有成功
ipfsCli.Pin(obj.Object.FileHash)

file, err := ipfsCli.OpenRead(obj.Object.FileHash)
if err != nil {
return nil, err
}

return file, nil
}

func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) {
allNodes, err := t.sortDownloadNodes(coorCli, obj)
if err != nil {
return nil, nil, err
}
bsc, blocks := t.getMinReadingBlockSolution(allNodes, ecRed.K)
osc, _ := t.getMinReadingObjectSolution(allNodes, ecRed.K)
if bsc < osc {
var fileStrs []io.ReadCloser

rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
return nil, nil, fmt.Errorf("new rs: %w", err)
}

for i := range blocks {
// 不管实际有没有成功
ipfsCli.Pin(blocks[i].Block.FileHash)

str, err := ipfsCli.OpenRead(blocks[i].Block.FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
}
return nil, nil, fmt.Errorf("donwloading file: %w", err)
}

fileStrs = append(fileStrs, str)
}

fileReaders, filesCloser := myio.ToReaders(fileStrs)

var indexes []int
var pinnedBlocks []stgmod.ObjectBlock
for _, b := range blocks {
indexes = append(indexes, b.Block.Index)
pinnedBlocks = append(pinnedBlocks, stgmod.ObjectBlock{
ObjectID: b.Block.ObjectID,
Index: b.Block.Index,
NodeID: *stgglb.Local.NodeID,
FileHash: b.Block.FileHash,
})
}

outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
outputsCloser()
}), pinnedBlocks, nil
}

// bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
if osc == math.MaxFloat64 {
return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
}

// 如果是直接读取的文件,那么就不需要Pin文件块
str, err := ipfsCli.OpenRead(obj.Object.FileHash)
return str, nil, err
}

type downloadNodeInfo struct {
Node model.Node
ObjectPinned bool
Blocks []stgmod.ObjectBlock
Distance float64
}

func (t *StorageLoadPackage) sortDownloadNodes(coorCli *coormq.Client, obj stgmod.ObjectDetail) ([]*downloadNodeInfo, error) {
var nodeIDs []cdssdk.NodeID
for _, id := range obj.PinnedAt {
if !lo.Contains(nodeIDs, id) {
nodeIDs = append(nodeIDs, id)
}
}
for _, b := range obj.Blocks {
if !lo.Contains(nodeIDs, b.NodeID) {
nodeIDs = append(nodeIDs, b.NodeID)
}
}

getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs))
if err != nil {
return nil, fmt.Errorf("getting nodes: %w", err)
}

downloadNodeMap := make(map[cdssdk.NodeID]*downloadNodeInfo)
for _, id := range obj.PinnedAt {
node, ok := downloadNodeMap[id]
if !ok {
mod := *getNodes.GetNode(id)
node = &downloadNodeInfo{
Node: mod,
ObjectPinned: true,
Distance: t.getNodeDistance(mod),
}
downloadNodeMap[id] = node
}

node.ObjectPinned = true
}

for _, b := range obj.Blocks {
node, ok := downloadNodeMap[b.NodeID]
if !ok {
mod := *getNodes.GetNode(b.NodeID)
node = &downloadNodeInfo{
Node: mod,
Distance: t.getNodeDistance(mod),
}
downloadNodeMap[b.NodeID] = node
}

node.Blocks = append(node.Blocks, b)
}

return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *downloadNodeInfo) int {
return mysort.Cmp(left.Distance, right.Distance)
}), nil
}

type downloadBlock struct {
Node model.Node
Block stgmod.ObjectBlock
}

func (t *StorageLoadPackage) getMinReadingBlockSolution(sortedNodes []*downloadNodeInfo, k int) (float64, []downloadBlock) {
gotBlocksMap := bitmap.Bitmap64(0)
var gotBlocks []downloadBlock
dist := float64(0.0)
for _, n := range sortedNodes {
for _, b := range n.Blocks {
if !gotBlocksMap.Get(b.Index) {
gotBlocks = append(gotBlocks, downloadBlock{
Node: n.Node,
Block: b,
})
gotBlocksMap.Set(b.Index, true)
dist += n.Distance
}

if len(gotBlocks) >= k {
return dist, gotBlocks
}
}
}

return math.MaxFloat64, gotBlocks
}

func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedNodes []*downloadNodeInfo, k int) (float64, *model.Node) {
dist := math.MaxFloat64
var downloadNode *model.Node
for _, n := range sortedNodes {
if n.ObjectPinned && float64(k)*n.Distance < dist {
dist = float64(k) * n.Distance
downloadNode = &n.Node
}
}

return dist, downloadNode
}

func (t *StorageLoadPackage) getNodeDistance(node model.Node) float64 {
if stgglb.Local.NodeID != nil {
if node.NodeID == *stgglb.Local.NodeID {
return consts.NodeDistanceSameNode
}
}

if node.LocationID == stgglb.Local.LocationID {
return consts.NodeDistanceSameLocation
}

return consts.NodeDistanceOther
}

+ 2
- 0
client/internal/cmdline/scanner.go View File

@@ -39,5 +39,7 @@ func init() {

parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackageRedundancy, myreflect.TypeNameOf[scevt.CheckPackageRedundancy]())

parseScannerEventCmdTrie.MustAdd(scevt.NewCleanPinned, myreflect.TypeNameOf[scevt.CleanPinned]())

commands.MustAdd(ScannerPostEvent, "scanner", "event")
}

+ 2
- 2
client/internal/cmdline/storage.go View File

@@ -8,13 +8,13 @@ import (
)

func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error {
taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID)
nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID)
if err != nil {
return fmt.Errorf("start loading package to storage: %w", err)
}

for {
complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)


+ 2
- 2
client/internal/http/storage.go View File

@@ -40,7 +40,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
return
}

taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID)
nodeID, taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID)
if err != nil {
log.Warnf("start storage load package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed"))
@@ -48,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
}

for {
complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10)
complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("loading complete with: %s", err.Error())


+ 46
- 10
client/internal/services/storage.go View File

@@ -6,7 +6,6 @@ import (

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

"gitlink.org.cn/cloudream/storage/client/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@@ -21,18 +20,55 @@ func (svc *Service) StorageSvc() *StorageService {
return &StorageService{Service: svc}
}

func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (string, error) {
tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID))
return tsk.ID(), nil
func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (cdssdk.NodeID, string, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return 0, "", fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID))
if err != nil {
return 0, "", fmt.Errorf("getting storage info: %w", err)
}

agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID)
if err != nil {
return 0, "", fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agentCli)

startResp, err := agentCli.StartStorageLoadPackage(agtmq.NewStartStorageLoadPackage(userID, packageID, storageID))
if err != nil {
return 0, "", fmt.Errorf("start storage load package: %w", err)
}

return stgResp.NodeID, startResp.TaskID, nil
}

func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) {
tsk := svc.TaskMgr.FindByID(taskID)
if tsk.WaitTimeout(waitTimeout) {
loadTsk := tsk.Body().(*task.StorageLoadPackage)
return true, loadTsk.ResultFullPath, tsk.Error()
func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) {
agentCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {
// TODO 失败是否要当做任务已经结束?
return true, "", fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agentCli)

waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
// TODO 请求失败是否要当做任务已经结束?
return true, "", fmt.Errorf("wait storage load package: %w", err)
}

if !waitResp.IsComplete {
return false, "", nil
}
return false, "", nil

if waitResp.Error != "" {
return true, "", fmt.Errorf("%s", waitResp.Error)
}

return true, waitResp.FullPath, nil
}

func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {


+ 0
- 100
client/internal/task/storage_load_package.go View File

@@ -1,100 +0,0 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/task"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

// TODO 可以考虑不用Task来实现这些逻辑
type StorageLoadPackage struct {
userID cdssdk.UserID
packageID cdssdk.PackageID
storageID cdssdk.StorageID

ResultFullPath string
}

func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage {
return &StorageLoadPackage{
userID: userID,
packageID: packageID,
storageID: storageID,
}
}

func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *StorageLoadPackage) do(ctx TaskContext) error {
mutex, err := reqbuilder.NewBuilder().
// 提前占位
Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID).
// 保护在storage目录中下载的文件
Storage().Buzy(t.storageID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquire locks failed, err: %w", err)
}
defer mutex.Unlock()

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID))
if err != nil {
return fmt.Errorf("getting storage info: %w", err)
}

// 然后向代理端发送移动文件的请求
agentCli, err := stgglb.AgentMQPool.Acquire(getStgResp.NodeID)
if err != nil {
return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err)
}
defer stgglb.AgentMQPool.Release(agentCli)

agentMoveResp, err := agentCli.StartStorageLoadPackage(
agtmq.NewStartStorageLoadPackage(
t.userID,
t.packageID,
t.storageID,
))
if err != nil {
return fmt.Errorf("start loading package to storage: %w", err)
}

for {
waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5))
if err != nil {
return fmt.Errorf("wait loading package: %w", err)
}

if waitResp.IsComplete {
if waitResp.Error != "" {
return fmt.Errorf("agent loading package: %s", waitResp.Error)
}

t.ResultFullPath = waitResp.FullPath
break
}
}

_, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID))
if err != nil {
return fmt.Errorf("loading package to storage: %w", err)
}
return nil
}

+ 6
- 0
common/consts/consts.go View File

@@ -9,3 +9,9 @@ const (
NodeStateNormal = "Normal"
NodeStateUnavailable = "Unavailable"
)

const (
NodeDistanceSameNode = 0.1
NodeDistanceSameLocation = 1
NodeDistanceOther = 5
)

+ 7
- 5
common/models/models.go View File

@@ -13,14 +13,16 @@ type ObjectBlock struct {
}

type ObjectDetail struct {
Object cdssdk.Object `json:"object"`
Blocks []ObjectBlock `json:"blocks"`
Object cdssdk.Object `json:"object"`
PinnedAt []cdssdk.NodeID `json:"pinnedAt"`
Blocks []ObjectBlock `json:"blocks"`
}

func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlock) ObjectDetail {
func NewObjectDetail(object cdssdk.Object, pinnedAt []cdssdk.NodeID, blocks []ObjectBlock) ObjectDetail {
return ObjectDetail{
Object: object,
Blocks: blocks,
Object: object,
PinnedAt: pinnedAt,
Blocks: blocks,
}
}



+ 38
- 1
common/pkgs/db/object.go View File

@@ -6,6 +6,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)
@@ -94,6 +95,38 @@ func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) (
return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err
}

func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
var objs []model.TempObject
err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

rets := make([]stgmod.ObjectDetail, 0, len(objs))

for _, obj := range objs {
var blocks []stgmod.ObjectBlock
err = sqlx.Select(ctx,
&blocks,
"select * from ObjectBlock where ObjectID = ? order by `Index`",
obj.ObjectID,
)
if err != nil {
return nil, err
}

var pinnedAt []cdssdk.NodeID
err = sqlx.Select(ctx, &pinnedAt, "select NodeID from PinnedObject where ObjectID = ?", obj.ObjectID)
if err != nil {
return nil, err
}

rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), pinnedAt, blocks))
}

return rets, nil
}

func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) {
objIDs := make([]cdssdk.ObjectID, 0, len(objs))
for _, obj := range objs {
@@ -151,7 +184,6 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb
}

for _, block := range obj.Blocks {
// 首次上传默认使用不分块的rep模式
err = db.ObjectBlock().Create(ctx, obj.ObjectID, block.Index, block.NodeID, block.FileHash)
if err != nil {
return fmt.Errorf("creating object block: %w", err)
@@ -163,6 +195,11 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb
return fmt.Errorf("creating cache: %w", err)
}
}

err = db.PinnedObject().ObjectBatchCreate(ctx, obj.ObjectID, obj.PinnedAt)
if err != nil {
return fmt.Errorf("creating pinned object: %w", err)
}
}

return nil


+ 8
- 28
common/pkgs/db/object_block.go View File

@@ -2,14 +2,12 @@ package db

import (
"database/sql"
"fmt"
"strconv"
"strings"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type ObjectBlockDB struct {
@@ -31,6 +29,14 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index
return err
}

func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error {
_, err := sqlx.NamedExec(ctx,
"insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)",
blocks,
)
return err
}

func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
_, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID)
return err
@@ -65,32 +71,6 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in
return cnt, err
}

func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
var objs []model.TempObject
err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

rets := make([]stgmod.ObjectDetail, 0, len(objs))

for _, obj := range objs {
var blocks []stgmod.ObjectBlock
err = sqlx.Select(ctx,
&blocks,
"select * from ObjectBlock where ObjectID = ? order by Index",
obj.ObjectID,
)
if err != nil {
return nil, err
}

rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), blocks))
}

return rets, nil
}

// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
func splitConcatedNodeID(idStr string) []cdssdk.NodeID {


+ 15
- 0
common/pkgs/db/pinned_object.go View File

@@ -34,6 +34,11 @@ func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cds
return err
}

func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error {
_, err := ctx.Exec("insert ignore into PinnedObject values(?,?,?)", nodeID, objectID, createTime)
return err
}

func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error {
_, err := ctx.Exec(
"insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?",
@@ -44,6 +49,16 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag
return err
}

func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, nodeIDs []cdssdk.NodeID) error {
for _, id := range nodeIDs {
err := db.TryCreate(ctx, id, objectID, time.Now())
if err != nil {
return err
}
}
return nil
}

func (*PinnedObjectDB) Delete(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID) error {
_, err := ctx.Exec("delete from PinnedObject where NodeID = ? and ObjectID = ?", nodeID, objectID)
return err


+ 154
- 92
common/pkgs/iterator/download_object_iterator.go View File

@@ -3,16 +3,20 @@ package iterator
import (
"fmt"
"io"
"math/rand"
"math"
"reflect"

"github.com/samber/lo"

"gitlink.org.cn/cloudream/common/pkgs/bitmap"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

myio "gitlink.org.cn/cloudream/common/utils/io"
mysort "gitlink.org.cn/cloudream/common/utils/sort"
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
@@ -28,8 +32,10 @@ type IterDownloadingObject struct {
}

type DownloadNodeInfo struct {
Node model.Node
IsSameLocation bool
Node model.Node
ObjectPinned bool
Blocks []stgmod.ObjectBlock
Distance float64
}

type DownloadContext struct {
@@ -114,136 +120,192 @@ func (i *DownloadObjectIterator) Close() {
}
}

// chooseDownloadNode 选择一个下载节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo {
sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation })
if len(sameLocationEntries) > 0 {
return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj)
if err != nil {
return nil, err
}
bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1)
osc, node := iter.getMinReadingObjectSolution(allNodes, 1)
if bsc < osc {
return downloadFile(ctx, blocks[0].Node, blocks[0].Block.FileHash)
}

return entries[rand.Intn(len(entries))]
}

func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) {
if len(obj.Blocks) == 0 {
// bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
if osc == math.MaxFloat64 {
return nil, fmt.Errorf("no node has this object")
}

//采取直接读,优先选内网节点
var chosenNodes []DownloadNodeInfo
return downloadFile(ctx, *node, obj.Object.FileHash)
}

func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj)
if err != nil {
return nil, err
}
bsc, blocks := iter.getMinReadingBlockSolution(allNodes, ecRed.K)
osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K)
if bsc < osc {
var fileStrs []io.ReadCloser

grpBlocks := obj.GroupBlocks()
for _, grp := range grpBlocks {
getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grp.NodeIDs))
rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
continue
return nil, fmt.Errorf("new rs: %w", err)
}

downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
return DownloadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,
for i, b := range blocks {
str, err := downloadFile(ctx, b.Node, b.Block.FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
}
return nil, fmt.Errorf("donwloading file: %w", err)
}
})

chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))
}
fileStrs = append(fileStrs, str)
}

var fileStrs []io.ReadCloser
fileReaders, filesCloser := myio.ToReaders(fileStrs)

for i := range grpBlocks {
str, err := downloadFile(ctx, chosenNodes[i], grpBlocks[i].FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
}
return nil, fmt.Errorf("donwloading file: %w", err)
var indexes []int
for _, b := range blocks {
indexes = append(indexes, b.Block.Index)
}

fileStrs = append(fileStrs, str)
outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
outputsCloser()
}), nil
}

fileReaders, filesCloser := myio.ToReaders(fileStrs)
return myio.AfterReadClosed(myio.Length(myio.Join(fileReaders), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
}), nil
// bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件
if osc == math.MaxFloat64 {
return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks))
}

return downloadFile(ctx, *node, obj.Object.FileHash)
}

func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
//采取直接读,优先选内网节点
var chosenNodes []DownloadNodeInfo
var chosenBlocks []stgmodels.GrouppedObjectBlock
grpBlocks := obj.GroupBlocks()
for i := range grpBlocks {
if len(chosenBlocks) == ecRed.K {
break
func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) {
var nodeIDs []cdssdk.NodeID
for _, id := range obj.PinnedAt {
if !lo.Contains(nodeIDs, id) {
nodeIDs = append(nodeIDs, id)
}
getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grpBlocks[i].NodeIDs))
if err != nil {
continue
}
for _, b := range obj.Blocks {
if !lo.Contains(nodeIDs, b.NodeID) {
nodeIDs = append(nodeIDs, b.NodeID)
}
}

downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo {
return DownloadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,
}
})
getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs))
if err != nil {
return nil, fmt.Errorf("getting nodes: %w", err)
}

chosenBlocks = append(chosenBlocks, grpBlocks[i])
chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes))
downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo)
for _, id := range obj.PinnedAt {
node, ok := downloadNodeMap[id]
if !ok {
mod := *getNodes.GetNode(id)
node = &DownloadNodeInfo{
Node: mod,
ObjectPinned: true,
Distance: iter.getNodeDistance(mod),
}
downloadNodeMap[id] = node
}

node.ObjectPinned = true
}

if len(chosenBlocks) < ecRed.K {
return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks))
for _, b := range obj.Blocks {
node, ok := downloadNodeMap[b.NodeID]
if !ok {
mod := *getNodes.GetNode(b.NodeID)
node = &DownloadNodeInfo{
Node: mod,
Distance: iter.getNodeDistance(mod),
}
downloadNodeMap[b.NodeID] = node
}

node.Blocks = append(node.Blocks, b)
}

var fileStrs []io.ReadCloser
return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *DownloadNodeInfo) int {
return mysort.Cmp(left.Distance, right.Distance)
}), nil
}

rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize)
if err != nil {
return nil, fmt.Errorf("new rs: %w", err)
}
type downloadBlock struct {
Node model.Node
Block stgmod.ObjectBlock
}

for i := range chosenBlocks {
str, err := downloadFile(ctx, chosenNodes[i], chosenBlocks[i].FileHash)
if err != nil {
for i -= 1; i >= 0; i-- {
fileStrs[i].Close()
func (iter *DownloadObjectIterator) getMinReadingBlockSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, []downloadBlock) {
gotBlocksMap := bitmap.Bitmap64(0)
var gotBlocks []downloadBlock
dist := float64(0.0)
for _, n := range sortedNodes {
for _, b := range n.Blocks {
if !gotBlocksMap.Get(b.Index) {
gotBlocks = append(gotBlocks, downloadBlock{
Node: n.Node,
Block: b,
})
gotBlocksMap.Set(b.Index, true)
dist += n.Distance
}

if len(gotBlocks) >= k {
return dist, gotBlocks
}
return nil, fmt.Errorf("donwloading file: %w", err)
}
}

return math.MaxFloat64, gotBlocks
}

fileStrs = append(fileStrs, str)
func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, *model.Node) {
dist := math.MaxFloat64
var downloadNode *model.Node
for _, n := range sortedNodes {
if n.ObjectPinned && float64(k)*n.Distance < dist {
dist = float64(k) * n.Distance
downloadNode = &n.Node
}
}

fileReaders, filesCloser := myio.ToReaders(fileStrs)
return dist, downloadNode
}

func (iter *DownloadObjectIterator) getNodeDistance(node model.Node) float64 {
if stgglb.Local.NodeID != nil {
if node.NodeID == *stgglb.Local.NodeID {
return consts.NodeDistanceSameNode
}
}

var indexes []int
for _, b := range chosenBlocks {
indexes = append(indexes, b.Index)
if node.LocationID == stgglb.Local.LocationID {
return consts.NodeDistanceSameLocation
}

outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes))
return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) {
filesCloser()
outputsCloser()
}), nil
return consts.NodeDistanceOther
}

func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) {
func downloadFile(ctx *DownloadContext, node model.Node, fileHash string) (io.ReadCloser, error) {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := node.Node.ExternalIP
grpcPort := node.Node.ExternalGRPCPort
if node.IsSameLocation {
nodeIP = node.Node.LocalIP
grpcPort = node.Node.LocalGRPCPort
nodeIP := node.ExternalIP
grpcPort := node.ExternalGRPCPort
if node.LocationID == stgglb.Local.LocationID {
nodeIP = node.LocalIP
grpcPort = node.LocalGRPCPort

logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID)
logger.Infof("client and node %d are at the same location, use local ip", node.NodeID)
}

if stgglb.IPFSPool != nil {
@@ -257,7 +319,7 @@ func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string)
logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error())
}

return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash)
return downloadFromNode(ctx, node.NodeID, nodeIP, grpcPort, fileHash)
}

func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) {


+ 9
- 0
common/pkgs/mq/coordinator/node.go View File

@@ -60,6 +60,15 @@ func NewGetNodesResp(nodes []model.Node) *GetNodesResp {
Nodes: nodes,
}
}
// GetNode returns a copy of the node record with the given ID, or nil when
// the response does not contain that node.
func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *model.Node {
	for idx := range r.Nodes {
		if r.Nodes[idx].NodeID != id {
			continue
		}

		// Return a pointer to a copy, preserving the original's semantics
		// (callers cannot mutate the response slice through it).
		node := r.Nodes[idx]
		return &node
	}

	return nil
}
// GetNodes asks the coordinator service for the node records of the IDs in msg.
func (client *Client) GetNodes(msg *GetNodes) (*GetNodesResp, error) {
	return mq.Request(Service.GetNodes, client.rabbitCli, msg)
}

+ 1
- 0
common/pkgs/mq/coordinator/object.go View File

@@ -83,6 +83,7 @@ type ChangeObjectRedundancyResp struct {
type ChangeObjectRedundancyEntry struct {
ObjectID cdssdk.ObjectID `json:"objectID"`
Redundancy cdssdk.Redundancy `json:"redundancy"`
PinnedAt []cdssdk.NodeID `json:"pinnedAt"`
Blocks []stgmod.ObjectBlock `json:"blocks"`
}



+ 10
- 7
common/pkgs/mq/coordinator/storage.go View File

@@ -5,6 +5,7 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

@@ -55,19 +56,21 @@ var _ = Register(Service.StoragePackageLoaded)

type StoragePackageLoaded struct {
mq.MessageBodyBase
UserID cdssdk.UserID `json:"userID"`
StorageID cdssdk.StorageID `json:"storageID"`
PackageID cdssdk.PackageID `json:"packageID"`
UserID cdssdk.UserID `json:"userID"`
StorageID cdssdk.StorageID `json:"storageID"`
PackageID cdssdk.PackageID `json:"packageID"`
PinnedBlocks []stgmod.ObjectBlock `json:"pinnedBlocks"`
}
type StoragePackageLoadedResp struct {
mq.MessageBodyBase
}

func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID) *StoragePackageLoaded {
func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID, pinnedBlocks []stgmod.ObjectBlock) *StoragePackageLoaded {
return &StoragePackageLoaded{
UserID: userID,
PackageID: packageID,
StorageID: stgID,
UserID: userID,
PackageID: packageID,
StorageID: stgID,
PinnedBlocks: pinnedBlocks,
}
}
func NewStoragePackageLoadedResp() *StoragePackageLoadedResp {


+ 18
- 0
common/pkgs/mq/scanner/event/clean_pinned.go View File

@@ -0,0 +1,18 @@
package event

import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

// CleanPinned asks the scanner to re-evaluate and trim the pinned copies of a
// package's objects.
type CleanPinned struct {
	EventBase
	// Fixed: the JSON tag previously read "nodeID" — a copy-paste slip that
	// serialized the package ID under the wrong field name.
	PackageID cdssdk.PackageID `json:"packageID"`
}

// NewCleanPinned creates a CleanPinned event for the given package.
func NewCleanPinned(packageID cdssdk.PackageID) *CleanPinned {
	return &CleanPinned{
		PackageID: packageID,
	}
}

// Register the event type so it can be decoded from scanner MQ messages.
func init() {
	Register[*CleanPinned]()
}

+ 1
- 1
coordinator/internal/mq/object.go View File

@@ -35,7 +35,7 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails)
return fmt.Errorf("getting package by id: %w", err)
}

details, err = svc.db.ObjectBlock().GetPackageBlockDetails(tx, msg.PackageID)
details, err = svc.db.Object().GetPackageObjectDetails(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package block details: %w", err)
}


+ 1
- 1
coordinator/internal/mq/package.go View File

@@ -131,7 +131,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c
}

// 这个函数只是统计哪些节点缓存了Package中的数据,不需要多么精确,所以可以不用事务
objDetails, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID)
objDetails, err := svc.db.Object().GetPackageObjectDetails(svc.db.SQLCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package block details: %s", err.Error())


+ 26
- 0
coordinator/internal/mq/storage.go View File

@@ -25,6 +25,15 @@ func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStora

func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) {
err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
// 可以不用检查用户是否存在
if ok, _ := svc.db.Package().IsAvailable(tx, msg.UserID, msg.PackageID); !ok {
return fmt.Errorf("package is not available to user")
}

if ok, _ := svc.db.Storage().IsAvailable(tx, msg.UserID, msg.StorageID); !ok {
return fmt.Errorf("storage is not available to user")
}

err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID)
if err != nil {
return fmt.Errorf("creating storage package: %w", err)
@@ -35,6 +44,23 @@ func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coo
return fmt.Errorf("creating storage package log: %w", err)
}

stg, err := svc.db.Storage().GetByID(tx, msg.StorageID)
if err != nil {
return fmt.Errorf("getting storage: %w", err)
}

err = svc.db.PinnedObject().CreateFromPackage(tx, msg.PackageID, stg.NodeID)
if err != nil {
return fmt.Errorf("creating pinned object from package: %w", err)
}

if len(msg.PinnedBlocks) > 0 {
err = svc.db.ObjectBlock().BatchCreate(tx, msg.PinnedBlocks)
if err != nil {
return fmt.Errorf("batch creating object block: %w", err)
}
}

return nil
})
if err != nil {


+ 732
- 0
scanner/internal/event/clean_pinned.go View File

@@ -0,0 +1,732 @@
package event

import (
"fmt"
"math"
"math/rand"
"strconv"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/bitmap"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
mymath "gitlink.org.cn/cloudream/common/utils/math"
myref "gitlink.org.cn/cloudream/common/utils/reflect"
mysort "gitlink.org.cn/cloudream/common/utils/sort"
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
)

// CleanPinned is the scanner-side executable wrapper of a scevt.CleanPinned
// message: it searches for a cheaper placement of a package's pinned copies
// and object blocks via simulated annealing (see doOne) and submits the
// resulting redundancy changes to the coordinator.
type CleanPinned struct {
	*scevt.CleanPinned
}

// NewCleanPinned wraps a raw scevt.CleanPinned message into an executable event.
func NewCleanPinned(evt *scevt.CleanPinned) *CleanPinned {
	return &CleanPinned{
		CleanPinned: evt,
	}
}

// TryMerge reports whether other is also a CleanPinned event targeting the
// same package, in which case the two events can be collapsed into one.
func (t *CleanPinned) TryMerge(other Event) bool {
	if evt, ok := other.(*CleanPinned); ok {
		return t.PackageID == evt.PackageID
	}

	return false
}

// Execute runs the pinned-data cleanup for the event's package: it fetches the
// package's object details and load logs from the coordinator, computes a
// (possibly reduced) block layout for each object via doOne, and submits all
// accumulated changes in a single ChangeObjectRedundancy request.
func (t *CleanPinned) Execute(execCtx ExecuteContext) {
	log := logger.WithType[CleanPinned]("Event")
	log.Debugf("begin with %v", logger.FormatStruct(t.CleanPinned))
	defer log.Debugf("end")

	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
	if err != nil {
		log.Warnf("new coordinator client: %s", err.Error())
		return
	}
	defer stgglb.CoordinatorMQPool.Release(coorCli)

	getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID))
	if err != nil {
		log.Warnf("getting package objects: %s", err.Error())
		return
	}

	getLoadLog, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID))
	if err != nil {
		log.Warnf("getting package load log details: %s", err.Error())
		return
	}
	// Nodes that recently loaded this package are treated as likely future readers.
	readerNodeIDs := lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID })

	var changeRedEntries []coormq.ChangeObjectRedundancyEntry
	for _, obj := range getObjs.Objects {
		entry, err := t.doOne(execCtx, readerNodeIDs, coorCli, obj)
		if err != nil {
			// NOTE(review): the log field is named "PackageID" but carries the whole object detail — confirm intent.
			log.WithField("PackageID", obj).Warn(err.Error())
			continue
		}
		if entry != nil {
			changeRedEntries = append(changeRedEntries, *entry)
		}
	}

	if len(changeRedEntries) > 0 {
		_, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changeRedEntries))
		if err != nil {
			log.Warnf("changing object redundancy: %s", err.Error())
			return
		}
	}
}

// doingContext carries all intermediate state of the simulated-annealing
// search over block-removal plans for a single object (see doOne).
type doingContext struct {
	execCtx             ExecuteContext
	readerNodeIDs       []cdssdk.NodeID                    // nodes that may access this object in the near future
	nodesSortedByReader map[cdssdk.NodeID][]nodeDist       // for each potential reader, block-holding nodes sorted by distance
	nodeInfos           map[cdssdk.NodeID]*model.Node
	blockList           []objectBlock                      // sorted distribution of blocks across nodes
	nodeBlockBitmaps    map[cdssdk.NodeID]*bitmap.Bitmap64 // bitmap of which block indexes each node holds
	allBlockTypeCount   int                                // total number of blocks the object was split into
	minBlockTypeCount   int                                // minimum number of blocks needed to reconstruct the object
	nodeCombTree        combinatorialTree                  // node-combination tree used to speed up disaster-tolerance queries

	maxScore         float64 // best score seen during the search
	maxScoreRmBlocks []bool  // removal plan that produced the best score

	rmBlocks      []bool  // current removal plan (true = remove the block)
	inversedIndex int     // index of the flag flipped to derive the current plan from the previous one
	lastScore     float64 // score of the previous plan
}

// objectBlock is one (block index, node) occurrence considered by the search.
type objectBlock struct {
	Index     int
	NodeID    cdssdk.NodeID
	HasEntity bool   // the node holds the actual file data of this block
	HasShadow bool   // the node holds the complete file, so it implicitly holds every block; such blocks are called shadow blocks
	FileHash  string // only set when the node holds the actual block data
}

// nodeDist pairs a node with its distance from some reader node.
type nodeDist struct {
	NodeID   cdssdk.NodeID
	Distance float64
}

// combinatorialTree enumerates every subset of storage nodes; each tree node
// caches the union bitmap of blocks held along its root path, which makes
// "how many nodes are needed for K distinct blocks" queries fast.
type combinatorialTree struct {
	nodes               []combinatorialTreeNode
	blocksMaps          map[int]bitmap.Bitmap64 // per local node ID, the blocks that node currently holds
	nodeIDToLocalNodeID map[cdssdk.NodeID]int
	localNodeIDToNodeID []cdssdk.NodeID
}

// Actions a callback of iterChildren/itering may return.
const (
	iterActionNone  = 0 // continue the iteration normally
	iterActionSkip  = 1 // do not descend into the current subtree
	iterActionBreak = 2 // abort the whole iteration
)

// newCombinatorialTree builds the full combination tree over the given nodes'
// block bitmaps. The tree has 2^n entries (n = number of nodes); every tree
// node's bitmap is precomputed as the OR along its path from the root.
func newCombinatorialTree(nodeBlocksMaps map[cdssdk.NodeID]*bitmap.Bitmap64) combinatorialTree {
	tree := combinatorialTree{
		blocksMaps:          make(map[int]bitmap.Bitmap64),
		nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int),
	}

	tree.nodes = make([]combinatorialTreeNode, (1 << len(nodeBlocksMaps)))
	for id, mp := range nodeBlocksMaps {
		// Assign dense local IDs (0..n-1) in map-iteration order.
		tree.nodeIDToLocalNodeID[id] = len(tree.localNodeIDToNodeID)
		tree.blocksMaps[len(tree.localNodeIDToNodeID)] = *mp
		tree.localNodeIDToNodeID = append(tree.localNodeIDToNodeID, id)
	}

	// Root represents the empty selection.
	tree.nodes[0].localNodeID = -1
	index := 1
	tree.initNode(0, &tree.nodes[0], &index)

	return tree
}

// initNode recursively fills t.nodes in depth-first order: each child extends
// its parent's selection with one local node ID >= minAvaiLocalNodeID, and its
// bitmap is the parent's bitmap OR-ed with that node's blocks. index is the
// next free slot in t.nodes and is advanced as nodes are written.
func (t *combinatorialTree) initNode(minAvaiLocalNodeID int, parent *combinatorialTreeNode, index *int) {
	for i := minAvaiLocalNodeID; i < len(t.nodeIDToLocalNodeID); i++ {
		curIndex := *index
		*index++
		// Copy the node's own blocks, then merge in everything the parent path holds.
		bitMp := t.blocksMaps[i]
		bitMp.Or(&parent.blocksBitmap)

		t.nodes[curIndex] = combinatorialTreeNode{
			localNodeID:  i,
			parent:       parent,
			blocksBitmap: bitMp,
		}
		t.initNode(i+1, &t.nodes[curIndex], index)
	}
}

// GetDepth returns the depth (number of selected nodes) of the tree node at
// the given index.
func (t *combinatorialTree) GetDepth(index int) int {
	depth := 0

	// Repeatedly decide which subtree the index falls into. Going left to right
	// the subtree sizes halve (e.g. 8, 4, 2), which fixes each subtree's index range.
	subTreeCount := 1 << len(t.nodeIDToLocalNodeID)
	for index > 0 {
		if index < subTreeCount {
			// Located a subtree: depth+1, then descend and repeat the same logic.
			// Subtract 1 on entering to drop the subtree's own root.
			index--
			depth++
		} else {
			// Index is beyond this subtree: subtract its node count so the same
			// logic can be reapplied to trees of decreasing size.
			index -= subTreeCount
		}
		subTreeCount >>= 1
	}

	return depth
}

// UpdateBitmap replaces the block bitmap of one storage node and refreshes the
// cached bitmaps of every tree node representing that storage node, plus their
// children. Once a refreshed node already holds k blocks, its children are not
// refreshed (their exact values no longer affect FindKBlocksMaxDepth).
func (t *combinatorialTree) UpdateBitmap(nodeID cdssdk.NodeID, mp bitmap.Bitmap64, k int) {
	t.blocksMaps[t.nodeIDToLocalNodeID[nodeID]] = mp
	// Two kinds of moves between tree nodes:
	//   1. vertical (depth+1): from a node to its leftmost child; each step is index+1.
	//   2. horizontal: from a node to its right sibling; each step adds 8, 4, 2, ...
	//      depending on the current depth.
	// With steps = localNodeID+1, distributing the steps over these two move kinds
	// (always starting with at least one vertical move) enumerates exactly the tree
	// nodes that represent this storage node.
	steps := t.nodeIDToLocalNodeID[nodeID] + 1
	for d := 1; d <= steps; d++ {
		t.iterCombBits(len(t.nodeIDToLocalNodeID)-1, steps-d, 0, func(i int) {
			index := d + i
			node := &t.nodes[index]

			// Recompute this node's bitmap from its own blocks plus its parent path.
			newMp := t.blocksMaps[node.localNodeID]
			newMp.Or(&node.parent.blocksBitmap)
			node.blocksBitmap = newMp
			if newMp.Weight() >= k {
				return
			}

			// Propagate the change downward, pruning subtrees that already reach k blocks.
			t.iterChildren(index, func(index, parentIndex, depth int) int {
				curNode := &t.nodes[index]
				parentNode := t.nodes[parentIndex]

				newMp := t.blocksMaps[curNode.localNodeID]
				newMp.Or(&parentNode.blocksBitmap)
				curNode.blocksBitmap = newMp
				if newMp.Weight() >= k {
					return iterActionSkip
				}

				return iterActionNone
			})
		})
	}
}

// FindKBlocksMaxDepth walks the tree and returns the maximum depth at which a
// node combination first accumulates at least k distinct blocks — i.e. the
// largest number of nodes that must be combined before k blocks are guaranteed.
func (t *combinatorialTree) FindKBlocksMaxDepth(k int) int {
	maxDepth := -1
	t.iterChildren(0, func(index, parentIndex, depth int) int {
		if t.nodes[index].blocksBitmap.Weight() >= k {
			if maxDepth < depth {
				maxDepth = depth
			}
			// This combination already has k blocks; deeper extensions cannot raise the answer.
			return iterActionSkip
		}
		// If a leaf still lacks k blocks, assume at least one more node is needed, i.e. depth+1.
		// Because the walk is depth-first, every combination of this leaf plus one extra node
		// was already visited earlier, so depth+1 is a valid bound for this branch; the same
		// reasoning extends recursively to needing two or more extra nodes.
		if t.nodes[index].localNodeID == len(t.nodeIDToLocalNodeID)-1 {
			if maxDepth < depth+1 {
				maxDepth = depth + 1
			}
		}

		return iterActionNone
	})

	// No combination reaches k blocks (or the bound overflowed): clamp to the node count.
	if maxDepth == -1 || maxDepth > len(t.nodeIDToLocalNodeID) {
		return len(t.nodeIDToLocalNodeID)
	}

	return maxDepth
}

// iterCombBits invokes callback once for every way of OR-ing together `count`
// distinct bits chosen from positions count..width, passing offset plus each
// resulting mask. The descending emission order is relied upon by UpdateBitmap
// and asserted by unit tests, so it must not change.
func (t *combinatorialTree) iterCombBits(width int, count int, offset int, callback func(int)) {
	if count == 0 {
		callback(offset)
		return
	}

	for b := width; b >= count; b-- {
		t.iterCombBits(b-1, count-1, offset+(1<<b), callback)
	}
}

// iterChildren visits every child subtree of the tree node at index, invoking
// do for each visited node. do returns one of the iterAction* constants to
// continue, skip a subtree, or abort the whole walk.
func (t *combinatorialTree) iterChildren(index int, do func(index int, parentIndex int, depth int) int) {
	curNode := &t.nodes[index]
	childIndex := index + 1
	curDepth := t.GetDepth(index)

	childCounts := len(t.nodeIDToLocalNodeID) - 1 - curNode.localNodeID
	if childCounts == 0 {
		return
	}

	// The first child's subtree holds half the remaining nodes; each next sibling's
	// subtree is half the size of the previous one.
	childTreeNodeCnt := 1 << (childCounts - 1)
	for c := 0; c < childCounts; c++ {
		act := t.itering(childIndex, index, curDepth+1, do)
		if act == iterActionBreak {
			return
		}

		childIndex += childTreeNodeCnt
		childTreeNodeCnt >>= 1
	}
}

// itering visits the tree node at index (calling do), then — unless told to
// skip or break — recursively visits all of its child subtrees. It returns
// iterActionBreak to propagate an abort upward, iterActionNone otherwise.
func (t *combinatorialTree) itering(index int, parentIndex int, depth int, do func(index int, parentIndex int, depth int) int) int {
	act := do(index, parentIndex, depth)
	if act == iterActionBreak {
		return act
	}
	if act == iterActionSkip {
		// Skip only this subtree; siblings keep iterating.
		return iterActionNone
	}

	curNode := &t.nodes[index]
	childIndex := index + 1

	childCounts := len(t.nodeIDToLocalNodeID) - 1 - curNode.localNodeID
	if childCounts == 0 {
		return iterActionNone
	}

	// Same halving layout as in iterChildren.
	childTreeNodeCnt := 1 << (childCounts - 1)
	for c := 0; c < childCounts; c++ {
		act = t.itering(childIndex, index, depth+1, do)
		if act == iterActionBreak {
			return act
		}

		childIndex += childTreeNodeCnt
		childTreeNodeCnt >>= 1
	}

	return iterActionNone
}

// combinatorialTreeNode is one entry of the combinatorial tree; it represents
// selecting its localNodeID on top of its parent's selection.
type combinatorialTreeNode struct {
	localNodeID  int
	parent       *combinatorialTreeNode
	blocksBitmap bitmap.Bitmap64 // union of all blocks held by the centers selected along the root path
}

// doOne searches for a block-removal plan for a single object using simulated
// annealing and applies the best plan found. It returns (nil, nil) when the
// object has no pinned copies and no blocks, or when its redundancy type
// yields no block list; otherwise it returns the redundancy-change entry to
// send to the coordinator.
func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeID, coorCli *coormq.Client, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) {
	if len(obj.PinnedAt) == 0 && len(obj.Blocks) == 0 {
		return nil, nil
	}

	ctx := doingContext{
		execCtx:             execCtx,
		readerNodeIDs:       readerNodeIDs,
		nodesSortedByReader: make(map[cdssdk.NodeID][]nodeDist),
		nodeInfos:           make(map[cdssdk.NodeID]*model.Node),
		nodeBlockBitmaps:    make(map[cdssdk.NodeID]*bitmap.Bitmap64),
	}

	err := t.getNodeInfos(&ctx, coorCli, obj)
	if err != nil {
		return nil, err
	}

	err = t.makeBlockList(&ctx, obj)
	if err != nil {
		return nil, err
	}

	if ctx.blockList == nil {
		return nil, nil
	}

	t.makeNodeBlockBitmap(&ctx)

	t.sortNodeByReaderDistance(&ctx)

	ctx.rmBlocks = make([]bool, len(ctx.blockList))
	ctx.inversedIndex = -1
	ctx.nodeCombTree = newCombinatorialTree(ctx.nodeBlockBitmaps)

	ctx.lastScore = t.calcScore(&ctx)
	ctx.maxScore = ctx.lastScore

	ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks)

	// Temperature of the simulated annealing.
	curTemp := ctx.lastScore
	// Stop temperature.
	finalTemp := curTemp * 0.2
	// Cooling rate.
	coolingRate := 0.95

	for curTemp > finalTemp {
		// Flip one random block's keep/remove flag to derive a neighboring plan,
		// and keep the bitmaps and combination tree in sync with the flip.
		ctx.inversedIndex = rand.Intn(len(ctx.rmBlocks))
		block := ctx.blockList[ctx.inversedIndex]
		ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex]
		ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex])
		ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount)

		curScore := t.calcScore(&ctx)

		dScore := curScore - ctx.lastScore
		// If the new plan scores lower and is not probabilistically accepted, revert the change.
		if curScore == 0 || (dScore < 0 && !t.alwaysAccept(curTemp, dScore, coolingRate)) {
			ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex]
			ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex])
			ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount)
			fmt.Printf("\n") // NOTE(review): stray debug output to stdout; consider logger
		} else {
			fmt.Printf(" accept!\n") // NOTE(review): stray debug output to stdout; consider logger
			ctx.lastScore = curScore
			if ctx.maxScore < curScore {
				ctx.maxScore = ctx.lastScore
				ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks)
			}
		}
		curTemp *= coolingRate
	}

	return t.applySolution(ctx, obj)
}

// getNodeInfos collects every node relevant to the object — block holders,
// pinned locations and potential readers — and loads their records from the
// coordinator into ctx.nodeInfos.
func (t *CleanPinned) getNodeInfos(ctx *doingContext, coorCli *coormq.Client, obj stgmod.ObjectDetail) error {
	var nodeIDs []cdssdk.NodeID
	for _, b := range obj.Blocks {
		nodeIDs = append(nodeIDs, b.NodeID)
	}
	nodeIDs = append(nodeIDs, obj.PinnedAt...)

	nodeIDs = append(nodeIDs, ctx.readerNodeIDs...)

	getNode, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Uniq(nodeIDs)))
	if err != nil {
		return fmt.Errorf("requesting to coordinator: %w", err)
	}

	// Fixed: the previous code stored &n where n was the range variable, so
	// (pre-Go 1.22) every map entry aliased the same Node — the last one
	// iterated. Index into the slice so each entry points at its own element.
	for i := range getNode.Nodes {
		ctx.nodeInfos[getNode.Nodes[i].NodeID] = &getNode.Nodes[i]
	}

	return nil
}

// makeBlockList builds ctx.blockList: one entry per (node, block index) pair,
// including the shadow blocks implied by pinned complete copies. It also fills
// ctx.allBlockTypeCount / ctx.minBlockTypeCount from the object's redundancy.
// For NoneRedundancy the list stays nil, which makes doOne skip the object.
func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) error {
	blockCnt := 1
	minBlockCnt := 1
	switch red := obj.Object.Redundancy.(type) {
	case *cdssdk.NoneRedundancy:
		return nil
	case *cdssdk.RepRedundancy:
		blockCnt = 1
		minBlockCnt = 1
	case *cdssdk.ECRedundancy:
		blockCnt = red.N
		minBlockCnt = red.K
	default:
		return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy))
	}

	blocksMap := make(map[cdssdk.NodeID][]objectBlock)

	// Generate all shadow blocks first: a pinned node implicitly holds every block.
	for _, pinned := range obj.PinnedAt {
		blocks := make([]objectBlock, 0, blockCnt)
		for i := 0; i < blockCnt; i++ {
			blocks = append(blocks, objectBlock{
				Index:     i,
				NodeID:    pinned,
				HasShadow: true,
			})
		}
		blocksMap[pinned] = blocks
	}

	// Then fill in the actual blocks, merging with a shadow entry when one exists.
	for _, b := range obj.Blocks {
		blocks := blocksMap[b.NodeID]

		has := false
		for i := range blocks {
			if blocks[i].Index == b.Index {
				blocks[i].HasEntity = true
				blocks[i].FileHash = b.FileHash
				has = true
				break
			}
		}

		if has {
			continue
		}

		blocks = append(blocks, objectBlock{
			Index:     b.Index,
			NodeID:    b.NodeID,
			HasEntity: true,
			FileHash:  b.FileHash,
		})
		blocksMap[b.NodeID] = blocks
	}

	// Flatten and sort by (NodeID, Index) so the list order is deterministic.
	var sortedBlocks []objectBlock
	for _, bs := range blocksMap {
		sortedBlocks = append(sortedBlocks, bs...)
	}
	sortedBlocks = mysort.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int {
		d := left.NodeID - right.NodeID
		if d != 0 {
			return int(d)
		}

		return left.Index - right.Index
	})

	ctx.allBlockTypeCount = blockCnt
	ctx.minBlockTypeCount = minBlockCnt
	ctx.blockList = sortedBlocks
	return nil
}

// makeNodeBlockBitmap populates ctx.nodeBlockBitmaps from ctx.blockList,
// setting one bit per block index each node holds (entity or shadow alike).
func (t *CleanPinned) makeNodeBlockBitmap(ctx *doingContext) {
	for _, blk := range ctx.blockList {
		bm := ctx.nodeBlockBitmaps[blk.NodeID]
		if bm == nil {
			fresh := bitmap.Bitmap64(0)
			bm = &fresh
			ctx.nodeBlockBitmaps[blk.NodeID] = bm
		}
		bm.Set(blk.Index, true)
	}
}

// sortNodeByReaderDistance fills ctx.nodesSortedByReader: for each potential
// reader node, all block-holding nodes ordered by ascending distance
// (same node < same location < different location).
func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) {
	for _, reader := range ctx.readerNodeIDs {
		var dists []nodeDist

		for holder := range ctx.nodeBlockBitmaps {
			// Different locations count as distance 5 unless a closer case applies.
			dist := consts.NodeDistanceOther
			switch {
			case reader == holder:
				// Same node counts as distance 0.1.
				dist = consts.NodeDistanceSameNode
			case ctx.nodeInfos[reader].LocationID == ctx.nodeInfos[holder].LocationID:
				// Same location counts as distance 1.
				dist = consts.NodeDistanceSameLocation
			}

			dists = append(dists, nodeDist{
				NodeID:   holder,
				Distance: dist,
			})
		}

		ctx.nodesSortedByReader[reader] = mysort.Sort(dists, func(left, right nodeDist) int { return mysort.Cmp(left.Distance, right.Distance) })
	}
}

// calcScore rates the current removal plan; higher is better. It combines
// disaster tolerance (dt), minimum access cost (ac) and space cost (sc) as
// dtScore / (sc * ac); a plan that leaves the object unreadable (dt == 0 or
// ac == 0) scores 0.
func (t *CleanPinned) calcScore(ctx *doingContext) float64 {
	dt := t.calcDisasterTolerance(ctx)
	ac := t.calcMinAccessCost(ctx)
	sc := t.calcSpaceCost(ctx)

	// Step the tolerance contribution: <1 node of slack is unacceptable, >=2 is a bonus.
	dtSc := 1.0
	if dt < 1 {
		dtSc = 0
	} else if dt >= 2 {
		dtSc = 1.5
	}

	newSc := 0.0
	if dt == 0 || ac == 0 {
		newSc = 0
	} else {
		newSc = dtSc / (sc * ac)
	}

	// NOTE(review): debug print to stdout; consider routing through logger.
	fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v ", ctx.rmBlocks, newSc, dt, ac, sc)
	return newSc
}

// calcDisasterTolerance computes the disaster tolerance of the current plan:
// the number of nodes that can be lost while minBlockTypeCount distinct blocks
// remain reachable. It first refreshes the combination tree for the node whose
// bitmap was flipped by the latest move (ctx.inversedIndex).
func (t *CleanPinned) calcDisasterTolerance(ctx *doingContext) float64 {
	if ctx.inversedIndex != -1 {
		node := ctx.blockList[ctx.inversedIndex]
		ctx.nodeCombTree.UpdateBitmap(node.NodeID, *ctx.nodeBlockBitmaps[node.NodeID], ctx.minBlockTypeCount)
	}
	return float64(len(ctx.nodeBlockBitmaps) - ctx.nodeCombTree.FindKBlocksMaxDepth(ctx.minBlockTypeCount))
}

// calcMinAccessCost computes, over all potential reader nodes, the cheapest
// cost of gathering minBlockTypeCount distinct blocks, where fetching a block
// costs the distance to the node serving it. Readers that cannot gather enough
// blocks contribute nothing; if none can, the result stays math.MaxFloat64.
func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 {
	cost := math.MaxFloat64
	for _, reader := range ctx.readerNodeIDs {
		tarNodes := ctx.nodesSortedByReader[reader]
		gotBlocks := bitmap.Bitmap64(0)
		thisCost := 0.0

		for _, tar := range tarNodes {
			tarNodeMp := ctx.nodeBlockBitmaps[tar.NodeID]

			// Only the still-missing blocks need to be fetched from this node.
			curWeigth := gotBlocks.Weight()
			// The if below exits once k blocks are gathered, so OR-ing in extra blocks is harmless.
			gotBlocks.Or(tarNodeMp)
			willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, ctx.minBlockTypeCount-curWeigth)
			thisCost += float64(willGetBlocks) * float64(tar.Distance)

			if gotBlocks.Weight() >= ctx.minBlockTypeCount {
				break
			}
		}
		if gotBlocks.Weight() >= ctx.minBlockTypeCount {
			cost = math.Min(cost, thisCost)
		}
	}

	return cost
}

// calcSpaceCost measures the storage overhead of the current plan: the total
// number of kept block copies (an entity and a shadow each count once) divided
// by the minimum number of blocks needed to reconstruct the object.
func (t *CleanPinned) calcSpaceCost(ctx *doingContext) float64 {
	kept := 0
	for idx := range ctx.blockList {
		if ctx.rmBlocks[idx] {
			continue
		}

		blk := ctx.blockList[idx]
		if blk.HasEntity {
			kept++
		}
		if blk.HasShadow {
			kept++
		}
	}

	// Total copies kept across all centers / blocks required for recovery.
	return float64(kept) / float64(ctx.minBlockTypeCount)
}

// alwaysAccept is the simulated-annealing acceptance test: when the new plan
// scores lower than the old one (dScore < 0) it is still accepted with
// probability exp(dScore / curTemp / coolingRate), which shrinks as the
// temperature drops.
// NOTE(review): the name is misleading — acceptance is probabilistic — and the
// extra division by coolingRate is unusual for textbook SA; confirm intent.
func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate float64) bool {
	v := math.Exp(dScore / curTemp / coolingRate)
	fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) // NOTE(review): stray debug output to stdout
	return v > rand.Float64()
}

// applySolution turns the best plan found (ctx.maxScoreRmBlocks) into a
// ChangeObjectRedundancyEntry: it keeps the surviving blocks, reconstructs any
// kept shadow blocks from a pinned full copy via an io-switch plan (EC only),
// and fills in the resulting file hashes.
func (t *CleanPinned) applySolution(ctx doingContext, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) {
	entry := coormq.ChangeObjectRedundancyEntry{
		ObjectID:   obj.Object.ObjectID,
		Redundancy: obj.Object.Redundancy,
	}
	// NOTE(review): debug print to stdout; consider routing through logger.
	fmt.Printf("final solu: %v, score: %v\n", ctx.maxScoreRmBlocks, ctx.maxScore)

	// Per node, the block indexes that must be reconstructed there.
	reconstrct := make(map[cdssdk.NodeID]*[]int)
	for i, f := range ctx.maxScoreRmBlocks {
		block := ctx.blockList[i]
		if !f {
			entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{
				ObjectID: obj.Object.ObjectID,
				Index:    block.Index,
				NodeID:   block.NodeID,
				FileHash: block.FileHash,
			})

			// If this block is a shadow block, it must be rebuilt from the complete object data.
			if !block.HasEntity {
				re, ok := reconstrct[block.NodeID]
				if !ok {
					re = &[]int{}
					reconstrct[block.NodeID] = re
				}

				*re = append(*re, block.Index)
			}
		}
	}

	// Lock the IPFS service of every node that will reconstruct blocks.
	bld := reqbuilder.NewBuilder()
	for id := range reconstrct {
		bld.IPFS().Buzy(id)
	}

	mutex, err := bld.MutexLock(ctx.execCtx.Args.DistLock)
	if err != nil {
		return nil, fmt.Errorf("acquiring distlock: %w", err)
	}
	defer mutex.Unlock()

	if ecRed, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok {
		for id, idxs := range reconstrct {
			// Build a plan that reads the full file on the node, splits it into K
			// chunks, and EC-reconstructs exactly the needed block indexes.
			bld := plans.NewPlanBuilder()
			agt := bld.AtAgent(*ctx.nodeInfos[id])

			strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true)
			ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs.Streams...)
			for i, s := range ss.Streams {
				s.IPFSWrite(fmt.Sprintf("%d", (*idxs)[i]))
			}

			plan, err := bld.Build()
			if err != nil {
				return nil, fmt.Errorf("building io switch plan: %w", err)
			}

			exec, err := plans.Execute(*plan)
			if err != nil {
				return nil, fmt.Errorf("executing io switch plan: %w", err)
			}
			ret, err := exec.Wait()
			if err != nil {
				return nil, fmt.Errorf("executing io switch plan: %w", err)
			}

			// Result keys are block indexes; map the produced hashes back onto the kept blocks.
			for k, v := range ret.ResultValues {
				idx, err := strconv.ParseInt(k, 10, 32)
				if err != nil {
					return nil, fmt.Errorf("parsing plan result: %w", err)
				}

				for i := range entry.Blocks {
					if entry.Blocks[i].NodeID == id && entry.Blocks[i].Index == int(idx) {
						entry.Blocks[i].FileHash = v.(string)
					}
				}
			}

		}
	} else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok {
		// Rep mode does not split the object, so every block's FileHash is the complete file's FileHash.
		for i := range entry.Blocks {
			entry.Blocks[i].FileHash = obj.Object.FileHash
		}
	}

	return &entry, nil
}

// Register the converter so scanner-event messages of type scevt.CleanPinned
// are turned into executable CleanPinned events.
func init() {
	RegisterMessageConvertor(NewCleanPinned)
}

+ 264
- 0
scanner/internal/event/clean_pinned_test.go View File

@@ -0,0 +1,264 @@
package event

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
"gitlink.org.cn/cloudream/common/pkgs/bitmap"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// newTreeTest builds a combinatorialTree directly from a slice of bitmaps,
// using the slice index as the node ID. It mirrors newCombinatorialTree but
// takes plain values, which keeps test cases compact and deterministic.
func newTreeTest(nodeBlocksMap []bitmap.Bitmap64) combinatorialTree {
	tree := combinatorialTree{
		blocksMaps:          make(map[int]bitmap.Bitmap64),
		nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int),
	}

	tree.nodes = make([]combinatorialTreeNode, (1 << len(nodeBlocksMap)))
	for id, mp := range nodeBlocksMap {
		tree.nodeIDToLocalNodeID[cdssdk.NodeID(id)] = len(tree.localNodeIDToNodeID)
		tree.blocksMaps[len(tree.localNodeIDToNodeID)] = mp
		tree.localNodeIDToNodeID = append(tree.localNodeIDToNodeID, cdssdk.NodeID(id))
	}

	tree.nodes[0].localNodeID = -1
	index := 1
	tree.initNode(0, &tree.nodes[0], &index)

	return tree
}

// Test_iterCombBits checks the exact emission order of iterCombBits for
// several (width, count) combinations.
func Test_iterCombBits(t *testing.T) {
	testcases := []struct {
		title          string
		width          int
		count          int
		expectedValues []int
	}{
		{
			title:          "1 of 4",
			width:          4,
			count:          1,
			expectedValues: []int{16, 8, 4, 2},
		},

		{
			title:          "2 of 4",
			width:          4,
			count:          2,
			expectedValues: []int{24, 20, 18, 12, 10, 6},
		},

		{
			title:          "3 of 4",
			width:          4,
			count:          3,
			expectedValues: []int{28, 26, 22, 14},
		},

		{
			title:          "4 of 4",
			width:          4,
			count:          4,
			expectedValues: []int{30},
		},
	}

	for _, test := range testcases {
		Convey(test.title, t, func() {
			var ret []int
			// Fixed: the local was previously named t, shadowing *testing.T.
			var tree combinatorialTree
			tree.iterCombBits(test.width, test.count, 0, func(i int) {
				ret = append(ret, i)
			})

			So(ret, ShouldResemble, test.expectedValues)
		})
	}
}

// Test_newCombinatorialTree verifies the node layout (localNodeID order) and
// the precomputed union bitmaps of freshly built trees of various sizes.
func Test_newCombinatorialTree(t *testing.T) {
	testcases := []struct {
		title                    string
		nodeBlocks               []bitmap.Bitmap64
		expectedTreeNodeLocalIDs []int
		expectedTreeNodeBitmaps  []int
	}{
		{
			title:                    "1个节点",
			nodeBlocks:               []bitmap.Bitmap64{1},
			expectedTreeNodeLocalIDs: []int{-1, 0},
			expectedTreeNodeBitmaps:  []int{0, 1},
		},
		{
			title:                    "2个节点",
			nodeBlocks:               []bitmap.Bitmap64{1, 0},
			expectedTreeNodeLocalIDs: []int{-1, 0, 1, 1},
			expectedTreeNodeBitmaps:  []int{0, 1, 1, 0},
		},
		{
			title:                    "4个节点",
			nodeBlocks:               []bitmap.Bitmap64{1, 2, 4, 8},
			expectedTreeNodeLocalIDs: []int{-1, 0, 1, 2, 3, 3, 2, 3, 3, 1, 2, 3, 3, 2, 3, 3},
			expectedTreeNodeBitmaps:  []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8},
		},
	}

	for _, test := range testcases {
		Convey(test.title, t, func() {
			// Fixed: the local was previously named t, shadowing *testing.T.
			tree := newTreeTest(test.nodeBlocks)

			var localIDs []int
			var bitmaps []int
			for _, n := range tree.nodes {
				localIDs = append(localIDs, n.localNodeID)
				bitmaps = append(bitmaps, int(n.blocksBitmap))
			}

			So(localIDs, ShouldResemble, test.expectedTreeNodeLocalIDs)
			So(bitmaps, ShouldResemble, test.expectedTreeNodeBitmaps)
		})
	}
}

// Test_UpdateBitmap verifies that UpdateBitmap correctly propagates a changed
// node bitmap through the combinatorial tree, including the k-threshold
// special cases (a node gaining or losing its k-th block).
func Test_UpdateBitmap(t *testing.T) {
	testcases := []struct {
		title                   string
		nodeBlocks              []bitmap.Bitmap64
		updatedNodeID           cdssdk.NodeID
		updatedBitmap           bitmap.Bitmap64
		k                       int
		expectedTreeNodeBitmaps []int
	}{

		{
			title:                   "4个节点,更新但值不变",
			nodeBlocks:              []bitmap.Bitmap64{1, 2, 4, 8},
			updatedNodeID:           cdssdk.NodeID(0),
			updatedBitmap:           bitmap.Bitmap64(1),
			k:                       4,
			expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8},
		},

		{
			title:                   "4个节点,更新0",
			nodeBlocks:              []bitmap.Bitmap64{1, 2, 4, 8},
			updatedNodeID:           cdssdk.NodeID(0),
			updatedBitmap:           bitmap.Bitmap64(2),
			k:                       4,
			expectedTreeNodeBitmaps: []int{0, 2, 2, 6, 14, 10, 6, 14, 10, 2, 6, 14, 10, 4, 12, 8},
		},

		{
			title:                   "4个节点,更新1",
			nodeBlocks:              []bitmap.Bitmap64{1, 2, 4, 8},
			updatedNodeID:           cdssdk.NodeID(1),
			updatedBitmap:           bitmap.Bitmap64(1),
			k:                       4,
			expectedTreeNodeBitmaps: []int{0, 1, 1, 5, 13, 9, 5, 13, 9, 1, 5, 13, 9, 4, 12, 8},
		},

		{
			title:                   "4个节点,更新2",
			nodeBlocks:              []bitmap.Bitmap64{1, 2, 4, 8},
			updatedNodeID:           cdssdk.NodeID(2),
			updatedBitmap:           bitmap.Bitmap64(1),
			k:                       4,
			expectedTreeNodeBitmaps: []int{0, 1, 3, 3, 11, 11, 1, 9, 9, 2, 3, 11, 10, 1, 9, 8},
		},

		{
			title:                   "4个节点,更新3",
			nodeBlocks:              []bitmap.Bitmap64{1, 2, 4, 8},
			updatedNodeID:           cdssdk.NodeID(3),
			updatedBitmap:           bitmap.Bitmap64(1),
			k:                       4,
			expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 7, 3, 5, 5, 1, 2, 6, 7, 3, 4, 5, 1},
		},

		{
			title:                   "4个节点,k<4,更新0,0之前没有k个块,现在拥有",
			nodeBlocks:              []bitmap.Bitmap64{1, 2, 4, 8},
			updatedNodeID:           cdssdk.NodeID(0),
			updatedBitmap:           bitmap.Bitmap64(3),
			k:                       2,
			expectedTreeNodeBitmaps: []int{0, 3, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8},
		},
		{
			title:                   "4个节点,k<4,更新0,0之前有k个块,现在没有",
			nodeBlocks:              []bitmap.Bitmap64{3, 4, 0, 0},
			updatedNodeID:           cdssdk.NodeID(0),
			updatedBitmap:           bitmap.Bitmap64(0),
			k:                       2,
			expectedTreeNodeBitmaps: []int{0, 0, 4, 4, 4, 4, 0, 0, 0, 4, 4, 4, 4, 0, 0, 0},
		},
	}

	for _, test := range testcases {
		Convey(test.title, t, func() {
			// Named `tree` (not `t`) so the *testing.T parameter is not shadowed.
			tree := newTreeTest(test.nodeBlocks)
			tree.UpdateBitmap(test.updatedNodeID, test.updatedBitmap, test.k)

			var bitmaps []int
			for _, n := range tree.nodes {
				bitmaps = append(bitmaps, int(n.blocksBitmap))
			}

			So(bitmaps, ShouldResemble, test.expectedTreeNodeBitmaps)
		})
	}
}

// Test_FindKBlocksMaxDepth verifies the maximum search depth needed to
// collect k distinct blocks across nodes, covering duplicated blocks,
// insufficient blocks, and single-node-holds-all cases.
func Test_FindKBlocksMaxDepth(t *testing.T) {
	testcases := []struct {
		title      string
		nodeBlocks []bitmap.Bitmap64
		k          int
		expected   int
	}{
		{
			title:      "每个节点各有一个块",
			nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8},
			k:          2,
			expected:   2,
		},
		{
			title:      "所有节点加起来块数不足",
			nodeBlocks: []bitmap.Bitmap64{1, 1, 1, 1},
			k:          2,
			expected:   4,
		},
		{
			title:      "不同节点有相同块",
			nodeBlocks: []bitmap.Bitmap64{1, 1, 2, 4},
			k:          2,
			expected:   3,
		},
		{
			title:      "一个节点就拥有所有块",
			nodeBlocks: []bitmap.Bitmap64{3, 6, 12, 24},
			k:          2,
			expected:   1,
		},
		{
			title:      "只有一块,且只在某一个节点1",
			nodeBlocks: []bitmap.Bitmap64{1, 0},
			k:          1,
			expected:   2,
		},
		{
			title:      "只有一块,且只在某一个节点2",
			nodeBlocks: []bitmap.Bitmap64{0, 1},
			k:          1,
			expected:   2,
		},
	}

	for _, test := range testcases {
		Convey(test.title, t, func() {
			// Named `tree` (not `t`) so the *testing.T parameter is not shadowed.
			tree := newTreeTest(test.nodeBlocks)
			ret := tree.FindKBlocksMaxDepth(test.k)
			So(ret, ShouldResemble, test.expected)
		})
	}
}

+ 48
- 0
scanner/internal/tickevent/batch_clean_pinned.go View File

@@ -0,0 +1,48 @@
package tickevent

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
evt "gitlink.org.cn/cloudream/storage/scanner/internal/event"
)

// BatchCleanPinned is a tick event that walks all packages in fixed-size
// batches and posts a CleanPinned event for each one.
type BatchCleanPinned struct {
	// lastCheckStart is the offset into the package list at which the
	// next batch begins; it is reset to 0 after a full pass.
	lastCheckStart int
}

// NewBatchCleanPinned creates a BatchCleanPinned that starts scanning
// from offset 0.
func NewBatchCleanPinned() *BatchCleanPinned {
	return new(BatchCleanPinned)
}

// Execute runs one round of the batched pinned-data cleanup: it fetches the
// next batch of package IDs from the database and posts a CleanPinned event
// for each, advancing the internal offset between invocations.
func (e *BatchCleanPinned) Execute(ctx ExecuteContext) {
	log := logger.WithType[BatchCleanPinned]("TickEvent")
	log.Debugf("begin")
	defer log.Debugf("end")

	// TODO a better scheduling strategy; for now only run during the early
	// morning hours (hour <= 6) to stay off peak load.
	nowHour := time.Now().Hour()
	if nowHour > 6 {
		return
	}

	// Fetch the next batch of package IDs starting at the saved offset.
	packageIDs, err := ctx.Args.DB.Package().BatchGetAllPackageIDs(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CheckPackageBatchSize)
	if err != nil {
		log.Warnf("batch get package ids failed, err: %s", err.Error())
		return
	}

	// Queue a CleanPinned event for every package in this batch.
	for _, id := range packageIDs {
		ctx.Args.EventExecutor.Post(evt.NewCleanPinned(event.NewCleanPinned(id)))
	}

	// If fewer IDs than requested came back, every package has been visited;
	// restart from offset 0 on the next tick. Otherwise advance the offset.
	if len(packageIDs) < CheckPackageBatchSize {
		e.lastCheckStart = 0
		log.Debugf("all package clean pinned, next time will start check at offset 0")

	} else {
		e.lastCheckStart += CheckPackageBatchSize
	}
}

+ 3
- 1
scanner/main.go View File

@@ -125,5 +125,7 @@ func startTickEvent(tickExecutor *tickevent.Executor) {

tickExecutor.Start(tickevent.NewCheckAgentState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})

tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 10 * 60 * 1000})
tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000})

tickExecutor.Start(tickevent.NewBatchCleanPinned(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000})
}

Loading…
Cancel
Save