diff --git a/agent/internal/mq/storage.go b/agent/internal/mq/storage.go index b87f1d4..bdc006e 100644 --- a/agent/internal/mq/storage.go +++ b/agent/internal/mq/storage.go @@ -24,31 +24,7 @@ import ( ) func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("new coordinator client: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) - if err != nil { - logger.WithField("StorageID", msg.StorageID). - Warnf("getting storage info: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") - } - - outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID) - if err = os.MkdirAll(outputDirPath, 0755); err != nil { - logger.WithField("StorageID", msg.StorageID). 
- Warnf("creating output directory: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") - } - - tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath)) + tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID)) return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) } @@ -70,7 +46,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* loadTsk := tsk.Body().(*mytask.StorageLoadPackage) - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { @@ -82,7 +58,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* loadTsk := tsk.Body().(*mytask.StorageLoadPackage) - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath)) } return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "")) diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index 79e0b70..2f88b0b 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -1,30 +1,352 @@ package task import ( + "fmt" + "io" + "math" + "os" + "path/filepath" "time" + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" + myio "gitlink.org.cn/cloudream/common/utils/io" + myref "gitlink.org.cn/cloudream/common/utils/reflect" + mysort 
"gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage/common/consts" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/common/utils" ) type StorageLoadPackage struct { - cmd *cmd.DownloadPackage - FullPath string + FullOutputPath string + + userID cdssdk.UserID + packageID cdssdk.PackageID + storageID cdssdk.StorageID + pinnedBlocks []stgmod.ObjectBlock } -func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *StorageLoadPackage { +func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage { return &StorageLoadPackage{ - cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), - FullPath: outputPath, + userID: userID, + packageID: packageID, + storageID: storageID, } } func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - err := t.cmd.Execute(&cmd.DownloadPackageContext{ - Distlock: ctx.distlock, - }) + err := t.do(task, ctx) complete(err, CompleteOption{ RemovingDelay: time.Minute, }) } + +func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new IPFS client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) + if err != nil { + return 
fmt.Errorf("request to coordinator: %w", err) + } + + outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, t.userID, t.packageID) + if err = os.MkdirAll(outputDirPath, 0755); err != nil { + return fmt.Errorf("creating output directory: %w", err) + } + t.FullOutputPath = outputDirPath + + getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID)) + if err != nil { + return fmt.Errorf("getting package object details: %w", err) + } + + mutex, err := reqbuilder.NewBuilder(). + // 提前占位 + Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID). + // 保护在storage目录中下载的文件 + Storage().Buzy(t.storageID). + // 保护下载文件时同时保存到IPFS的文件 + IPFS().Buzy(getStgResp.NodeID). + MutexLock(ctx.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + + for _, obj := range getObjectDetails.Objects { + err := t.downloadOne(coorCli, ipfsCli, outputDirPath, obj) + if err != nil { + return err + } + } + + _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks)) + if err != nil { + return fmt.Errorf("loading package to storage: %w", err) + } + + return err +} + +func (t *StorageLoadPackage) downloadOne(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, dir string, obj stgmod.ObjectDetail) error { + var file io.ReadCloser + + switch red := obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + reader, err := t.downloadNoneOrRepObject(ipfsCli, obj) + if err != nil { + return fmt.Errorf("downloading object: %w", err) + } + file = reader + + case *cdssdk.RepRedundancy: + reader, err := t.downloadNoneOrRepObject(ipfsCli, obj) + if err != nil { + return fmt.Errorf("downloading rep object: %w", err) + } + file = reader + + case *cdssdk.ECRedundancy: + reader, pinnedBlocks, err := t.downloadECObject(coorCli, ipfsCli, obj, red) + if err != nil { + return fmt.Errorf("downloading ec object: %w", err) 
+ } + file = reader + t.pinnedBlocks = append(t.pinnedBlocks, pinnedBlocks...) + + default: + return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy)) + } + defer file.Close() + + fullPath := filepath.Join(dir, obj.Object.Path) + + lastDirPath := filepath.Dir(fullPath) + if err := os.MkdirAll(lastDirPath, 0755); err != nil { + return fmt.Errorf("creating object last dir: %w", err) + } + + outputFile, err := os.Create(fullPath) + if err != nil { + return fmt.Errorf("creating object file: %w", err) + } + defer outputFile.Close() + + if _, err := io.Copy(outputFile, file); err != nil { + return fmt.Errorf("writting object to file: %w", err) + } + + return nil +} + +func (t *StorageLoadPackage) downloadNoneOrRepObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail) (io.ReadCloser, error) { + if len(obj.Blocks) == 0 && len(obj.PinnedAt) == 0 { + return nil, fmt.Errorf("no node has this object") + } + + // 不管实际有没有成功 + ipfsCli.Pin(obj.Object.FileHash) + + file, err := ipfsCli.OpenRead(obj.Object.FileHash) + if err != nil { + return nil, err + } + + return file, nil +} + +func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) { + allNodes, err := t.sortDownloadNodes(coorCli, obj) + if err != nil { + return nil, nil, err + } + bsc, blocks := t.getMinReadingBlockSolution(allNodes, ecRed.K) + osc, _ := t.getMinReadingObjectSolution(allNodes, ecRed.K) + if bsc < osc { + var fileStrs []io.ReadCloser + + rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) + if err != nil { + return nil, nil, fmt.Errorf("new rs: %w", err) + } + + for i := range blocks { + // 不管实际有没有成功 + ipfsCli.Pin(blocks[i].Block.FileHash) + + str, err := ipfsCli.OpenRead(blocks[i].Block.FileHash) + if err != nil { + for i -= 1; i >= 0; i-- { + fileStrs[i].Close() + } + return nil, nil, fmt.Errorf("donwloading file: %w", err) + } + + 
fileStrs = append(fileStrs, str) + } + + fileReaders, filesCloser := myio.ToReaders(fileStrs) + + var indexes []int + var pinnedBlocks []stgmod.ObjectBlock + for _, b := range blocks { + indexes = append(indexes, b.Block.Index) + pinnedBlocks = append(pinnedBlocks, stgmod.ObjectBlock{ + ObjectID: b.Block.ObjectID, + Index: b.Block.Index, + NodeID: *stgglb.Local.NodeID, + FileHash: b.Block.FileHash, + }) + } + + outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) + return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { + filesCloser() + outputsCloser() + }), pinnedBlocks, nil + } + + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 + if osc == math.MaxFloat64 { + return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) + } + + // 如果是直接读取的文件,那么就不需要Pin文件块 + str, err := ipfsCli.OpenRead(obj.Object.FileHash) + return str, nil, err +} + +type downloadNodeInfo struct { + Node model.Node + ObjectPinned bool + Blocks []stgmod.ObjectBlock + Distance float64 +} + +func (t *StorageLoadPackage) sortDownloadNodes(coorCli *coormq.Client, obj stgmod.ObjectDetail) ([]*downloadNodeInfo, error) { + var nodeIDs []cdssdk.NodeID + for _, id := range obj.PinnedAt { + if !lo.Contains(nodeIDs, id) { + nodeIDs = append(nodeIDs, id) + } + } + for _, b := range obj.Blocks { + if !lo.Contains(nodeIDs, b.NodeID) { + nodeIDs = append(nodeIDs, b.NodeID) + } + } + + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) + if err != nil { + return nil, fmt.Errorf("getting nodes: %w", err) + } + + downloadNodeMap := make(map[cdssdk.NodeID]*downloadNodeInfo) + for _, id := range obj.PinnedAt { + node, ok := downloadNodeMap[id] + if !ok { + mod := *getNodes.GetNode(id) + node = &downloadNodeInfo{ + Node: mod, + ObjectPinned: true, + Distance: t.getNodeDistance(mod), + } + downloadNodeMap[id] = node + } + + 
node.ObjectPinned = true + } + + for _, b := range obj.Blocks { + node, ok := downloadNodeMap[b.NodeID] + if !ok { + mod := *getNodes.GetNode(b.NodeID) + node = &downloadNodeInfo{ + Node: mod, + Distance: t.getNodeDistance(mod), + } + downloadNodeMap[b.NodeID] = node + } + + node.Blocks = append(node.Blocks, b) + } + + return mysort.Sort(lo.Values(downloadNodeMap), func(left, right *downloadNodeInfo) int { + return mysort.Cmp(left.Distance, right.Distance) + }), nil +} + +type downloadBlock struct { + Node model.Node + Block stgmod.ObjectBlock +} + +func (t *StorageLoadPackage) getMinReadingBlockSolution(sortedNodes []*downloadNodeInfo, k int) (float64, []downloadBlock) { + gotBlocksMap := bitmap.Bitmap64(0) + var gotBlocks []downloadBlock + dist := float64(0.0) + for _, n := range sortedNodes { + for _, b := range n.Blocks { + if !gotBlocksMap.Get(b.Index) { + gotBlocks = append(gotBlocks, downloadBlock{ + Node: n.Node, + Block: b, + }) + gotBlocksMap.Set(b.Index, true) + dist += n.Distance + } + + if len(gotBlocks) >= k { + return dist, gotBlocks + } + } + } + + return math.MaxFloat64, gotBlocks +} + +func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedNodes []*downloadNodeInfo, k int) (float64, *model.Node) { + dist := math.MaxFloat64 + var downloadNode *model.Node + for _, n := range sortedNodes { + if n.ObjectPinned && float64(k)*n.Distance < dist { + dist = float64(k) * n.Distance + downloadNode = &n.Node + } + } + + return dist, downloadNode +} + +func (t *StorageLoadPackage) getNodeDistance(node model.Node) float64 { + if stgglb.Local.NodeID != nil { + if node.NodeID == *stgglb.Local.NodeID { + return consts.NodeDistanceSameNode + } + } + + if node.LocationID == stgglb.Local.LocationID { + return consts.NodeDistanceSameLocation + } + + return consts.NodeDistanceOther +} diff --git a/client/internal/cmdline/scanner.go b/client/internal/cmdline/scanner.go index cbbfcbf..e5aa0a8 100644 --- a/client/internal/cmdline/scanner.go +++ 
b/client/internal/cmdline/scanner.go @@ -39,5 +39,7 @@ func init() { parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackageRedundancy, myreflect.TypeNameOf[scevt.CheckPackageRedundancy]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewCleanPinned, myreflect.TypeNameOf[scevt.CleanPinned]()) + commands.MustAdd(ScannerPostEvent, "scanner", "event") } diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index b99bd4a..355e85c 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -8,13 +8,13 @@ import ( ) func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error { - taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID) + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID) if err != nil { return fmt.Errorf("start loading package to storage: %w", err) } for { - complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index d3d90e5..1b30f1c 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -40,7 +40,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { return } - taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID) + nodeID, taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID) if err != nil { log.Warnf("start storage load package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed")) @@ -48,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } for { 
- complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10) if complete { if err != nil { log.Warnf("loading complete with: %s", err.Error()) diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 8977f92..d5671a4 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -6,7 +6,6 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -21,18 +20,55 @@ func (svc *Service) StorageSvc() *StorageService { return &StorageService{Service: svc} } -func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (string, error) { - tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID)) - return tsk.ID(), nil +func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (cdssdk.NodeID, string, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return 0, "", fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + if err != nil { + return 0, "", fmt.Errorf("getting storage info: %w", err) + } + + agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID) + if err != nil { + return 0, "", fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agentCli) + + startResp, err := agentCli.StartStorageLoadPackage(agtmq.NewStartStorageLoadPackage(userID, packageID, storageID)) + if err != 
nil { + return 0, "", fmt.Errorf("start storage load package: %w", err) + } + + return stgResp.NodeID, startResp.TaskID, nil } -func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - loadTsk := tsk.Body().(*task.StorageLoadPackage) - return true, loadTsk.ResultFullPath, tsk.Error() +func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) { + agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) + if err != nil { + // TODO 失败是否要当做任务已经结束? + return true, "", fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agentCli) + + waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(taskID, waitTimeout.Milliseconds())) + if err != nil { + // TODO 请求失败是否要当做任务已经结束? + return true, "", fmt.Errorf("wait storage load package: %w", err) + } + + if !waitResp.IsComplete { + return false, "", nil } - return false, "", nil + + if waitResp.Error != "" { + return true, "", fmt.Errorf("%s", waitResp.Error) + } + + return true, waitResp.FullPath, nil } func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go deleted file mode 100644 index 644c407..0000000 --- a/client/internal/task/storage_load_package.go +++ /dev/null @@ -1,100 +0,0 @@ -package task - -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -// TODO 可以考虑不用Task来实现这些逻辑 -type 
StorageLoadPackage struct { - userID cdssdk.UserID - packageID cdssdk.PackageID - storageID cdssdk.StorageID - - ResultFullPath string -} - -func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage { - return &StorageLoadPackage{ - userID: userID, - packageID: packageID, - storageID: storageID, - } -} - -func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *StorageLoadPackage) do(ctx TaskContext) error { - mutex, err := reqbuilder.NewBuilder(). - // 提前占位 - Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID). - // 保护在storage目录中下载的文件 - Storage().Buzy(t.storageID). - MutexLock(ctx.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) - if err != nil { - return fmt.Errorf("getting storage info: %w", err) - } - - // 然后向代理端发送移动文件的请求 - agentCli, err := stgglb.AgentMQPool.Acquire(getStgResp.NodeID) - if err != nil { - return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) - } - defer stgglb.AgentMQPool.Release(agentCli) - - agentMoveResp, err := agentCli.StartStorageLoadPackage( - agtmq.NewStartStorageLoadPackage( - t.userID, - t.packageID, - t.storageID, - )) - if err != nil { - return fmt.Errorf("start loading package to storage: %w", err) - } - - for { - waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5)) - if err != nil { - return fmt.Errorf("wait loading package: %w", err) - } - - if 
waitResp.IsComplete { - if waitResp.Error != "" { - return fmt.Errorf("agent loading package: %s", waitResp.Error) - } - - t.ResultFullPath = waitResp.FullPath - break - } - } - - _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID)) - if err != nil { - return fmt.Errorf("loading package to storage: %w", err) - } - return nil -} diff --git a/common/consts/consts.go b/common/consts/consts.go index 0e5c658..b7f849d 100644 --- a/common/consts/consts.go +++ b/common/consts/consts.go @@ -9,3 +9,9 @@ const ( NodeStateNormal = "Normal" NodeStateUnavailable = "Unavailable" ) + +const ( + NodeDistanceSameNode = 0.1 + NodeDistanceSameLocation = 1 + NodeDistanceOther = 5 +) diff --git a/common/models/models.go b/common/models/models.go index 6430e7b..f3f56bb 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -13,14 +13,16 @@ type ObjectBlock struct { } type ObjectDetail struct { - Object cdssdk.Object `json:"object"` - Blocks []ObjectBlock `json:"blocks"` + Object cdssdk.Object `json:"object"` + PinnedAt []cdssdk.NodeID `json:"pinnedAt"` + Blocks []ObjectBlock `json:"blocks"` } -func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlock) ObjectDetail { +func NewObjectDetail(object cdssdk.Object, pinnedAt []cdssdk.NodeID, blocks []ObjectBlock) ObjectDetail { return ObjectDetail{ - Object: object, - Blocks: blocks, + Object: object, + PinnedAt: pinnedAt, + Blocks: blocks, } } diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 9816f78..af4369e 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -6,6 +6,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -94,6 +95,38 @@ func (*ObjectDB) GetPackageObjects(ctx 
SQLContext, packageID cdssdk.PackageID) ( return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err } +func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) { + var objs []model.TempObject + err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) + if err != nil { + return nil, fmt.Errorf("getting objects: %w", err) + } + + rets := make([]stgmod.ObjectDetail, 0, len(objs)) + + for _, obj := range objs { + var blocks []stgmod.ObjectBlock + err = sqlx.Select(ctx, + &blocks, + "select * from ObjectBlock where ObjectID = ? order by `Index`", + obj.ObjectID, + ) + if err != nil { + return nil, err + } + + var pinnedAt []cdssdk.NodeID + err = sqlx.Select(ctx, &pinnedAt, "select NodeID from PinnedObject where ObjectID = ?", obj.ObjectID) + if err != nil { + return nil, err + } + + rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), pinnedAt, blocks)) + } + + return rets, nil +} + func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { objIDs := make([]cdssdk.ObjectID, 0, len(objs)) for _, obj := range objs { @@ -151,7 +184,6 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb } for _, block := range obj.Blocks { - // 首次上传默认使用不分块的rep模式 err = db.ObjectBlock().Create(ctx, obj.ObjectID, block.Index, block.NodeID, block.FileHash) if err != nil { return fmt.Errorf("creating object block: %w", err) @@ -163,6 +195,11 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb return fmt.Errorf("creating cache: %w", err) } } + + err = db.PinnedObject().ObjectBatchCreate(ctx, obj.ObjectID, obj.PinnedAt) + if err != nil { + return fmt.Errorf("creating pinned object: %w", err) + } } return nil diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index 93c706f..e560e03 
100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -2,14 +2,12 @@ package db import ( "database/sql" - "fmt" "strconv" "strings" "github.com/jmoiron/sqlx" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) type ObjectBlockDB struct { @@ -31,6 +29,14 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index return err } +func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { + _, err := sqlx.NamedExec(ctx, + "insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)", + blocks, + ) + return err +} + func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { _, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID) return err @@ -65,32 +71,6 @@ func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (in return cnt, err } -func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) { - var objs []model.TempObject - err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID) - if err != nil { - return nil, fmt.Errorf("getting objects: %w", err) - } - - rets := make([]stgmod.ObjectDetail, 0, len(objs)) - - for _, obj := range objs { - var blocks []stgmod.ObjectBlock - err = sqlx.Select(ctx, - &blocks, - "select * from ObjectBlock where ObjectID = ? 
order by Index", - obj.ObjectID, - ) - if err != nil { - return nil, err - } - - rets = append(rets, stgmod.NewObjectDetail(obj.ToObject(), blocks)) - } - - return rets, nil -} - // 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 // 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 func splitConcatedNodeID(idStr string) []cdssdk.NodeID { diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 1e71872..026e5ca 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -34,6 +34,11 @@ func (*PinnedObjectDB) Create(ctx SQLContext, nodeID cdssdk.NodeID, objectID cds return err } +func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID, createTime time.Time) error { + _, err := ctx.Exec("insert ignore into PinnedObject values(?,?,?)", nodeID, objectID, createTime) + return err +} + func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { _, err := ctx.Exec( "insert ignore into PinnedObject(NodeID, ObjectID, CreateTime) select ? as NodeID, ObjectID, ? as CreateTime from Object where PackageID = ?", @@ -44,6 +49,16 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag return err } +func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, nodeIDs []cdssdk.NodeID) error { + for _, id := range nodeIDs { + err := db.TryCreate(ctx, id, objectID, time.Now()) + if err != nil { + return err + } + } + return nil +} + func (*PinnedObjectDB) Delete(ctx SQLContext, nodeID cdssdk.NodeID, objectID cdssdk.ObjectID) error { _, err := ctx.Exec("delete from PinnedObject where NodeID = ? 
and ObjectID = ?", nodeID, objectID) return err diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 5cb1ef4..a219f76 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -3,16 +3,20 @@ package iterator import ( "fmt" "io" - "math/rand" + "math" "reflect" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myio "gitlink.org.cn/cloudream/common/utils/io" + mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" @@ -28,8 +32,10 @@ type IterDownloadingObject struct { } type DownloadNodeInfo struct { - Node model.Node - IsSameLocation bool + Node model.Node + ObjectPinned bool + Blocks []stgmod.ObjectBlock + Distance float64 } type DownloadContext struct { @@ -114,136 +120,192 @@ func (i *DownloadObjectIterator) Close() { } } -// chooseDownloadNode 选择一个下载节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 
没有用的话从所有节点中随机选一个 -func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo { - sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation }) - if len(sameLocationEntries) > 0 { - return sameLocationEntries[rand.Intn(len(sameLocationEntries))] +func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) + if err != nil { + return nil, err + } + bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) + osc, node := iter.getMinReadingObjectSolution(allNodes, 1) + if bsc < osc { + return downloadFile(ctx, blocks[0].Node, blocks[0].Block.FileHash) } - return entries[rand.Intn(len(entries))] -} - -func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { - if len(obj.Blocks) == 0 { + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 + if osc == math.MaxFloat64 { return nil, fmt.Errorf("no node has this object") } - //采取直接读,优先选内网节点 - var chosenNodes []DownloadNodeInfo + return downloadFile(ctx, *node, obj.Object.FileHash) +} + +func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(coorCli, ctx, obj) + if err != nil { + return nil, err + } + bsc, blocks := iter.getMinReadingBlockSolution(allNodes, ecRed.K) + osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K) + if bsc < osc { + var fileStrs []io.ReadCloser - grpBlocks := obj.GroupBlocks() - for _, grp := range grpBlocks { - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grp.NodeIDs)) + rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) if err != nil { - continue + return nil, 
fmt.Errorf("new rs: %w", err) } - downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { - return DownloadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == stgglb.Local.LocationID, + for i, b := range blocks { + str, err := downloadFile(ctx, b.Node, b.Block.FileHash) + if err != nil { + for i -= 1; i >= 0; i-- { + fileStrs[i].Close() + } + return nil, fmt.Errorf("donwloading file: %w", err) } - }) - chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) - } + fileStrs = append(fileStrs, str) + } - var fileStrs []io.ReadCloser + fileReaders, filesCloser := myio.ToReaders(fileStrs) - for i := range grpBlocks { - str, err := downloadFile(ctx, chosenNodes[i], grpBlocks[i].FileHash) - if err != nil { - for i -= 1; i >= 0; i-- { - fileStrs[i].Close() - } - return nil, fmt.Errorf("donwloading file: %w", err) + var indexes []int + for _, b := range blocks { + indexes = append(indexes, b.Block.Index) } - fileStrs = append(fileStrs, str) + outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) + return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { + filesCloser() + outputsCloser() + }), nil } - fileReaders, filesCloser := myio.ToReaders(fileStrs) - return myio.AfterReadClosed(myio.Length(myio.Join(fileReaders), obj.Object.Size), func(c io.ReadCloser) { - filesCloser() - }), nil + // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 + if osc == math.MaxFloat64 { + return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) + } + + return downloadFile(ctx, *node, obj.Object.FileHash) } -func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { - //采取直接读,优先选内网节点 - var chosenNodes []DownloadNodeInfo - var chosenBlocks 
[]stgmodels.GrouppedObjectBlock - grpBlocks := obj.GroupBlocks() - for i := range grpBlocks { - if len(chosenBlocks) == ecRed.K { - break +func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) { + var nodeIDs []cdssdk.NodeID + for _, id := range obj.PinnedAt { + if !lo.Contains(nodeIDs, id) { + nodeIDs = append(nodeIDs, id) } - - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(grpBlocks[i].NodeIDs)) - if err != nil { - continue + } + for _, b := range obj.Blocks { + if !lo.Contains(nodeIDs, b.NodeID) { + nodeIDs = append(nodeIDs, b.NodeID) } + } - downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { - return DownloadNodeInfo{ - Node: node, - IsSameLocation: node.LocationID == stgglb.Local.LocationID, - } - }) + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs)) + if err != nil { + return nil, fmt.Errorf("getting nodes: %w", err) + } - chosenBlocks = append(chosenBlocks, grpBlocks[i]) - chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) + downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo) + for _, id := range obj.PinnedAt { + node, ok := downloadNodeMap[id] + if !ok { + mod := *getNodes.GetNode(id) + node = &DownloadNodeInfo{ + Node: mod, + ObjectPinned: true, + Distance: iter.getNodeDistance(mod), + } + downloadNodeMap[id] = node + } + node.ObjectPinned = true } - if len(chosenBlocks) < ecRed.K { - return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks)) + for _, b := range obj.Blocks { + node, ok := downloadNodeMap[b.NodeID] + if !ok { + mod := *getNodes.GetNode(b.NodeID) + node = &DownloadNodeInfo{ + Node: mod, + Distance: iter.getNodeDistance(mod), + } + downloadNodeMap[b.NodeID] = node + } + + node.Blocks = append(node.Blocks, b) } - var fileStrs []io.ReadCloser + return 
mysort.Sort(lo.Values(downloadNodeMap), func(left, right *DownloadNodeInfo) int { + return mysort.Cmp(left.Distance, right.Distance) + }), nil +} - rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) - if err != nil { - return nil, fmt.Errorf("new rs: %w", err) - } +type downloadBlock struct { + Node model.Node + Block stgmod.ObjectBlock +} - for i := range chosenBlocks { - str, err := downloadFile(ctx, chosenNodes[i], chosenBlocks[i].FileHash) - if err != nil { - for i -= 1; i >= 0; i-- { - fileStrs[i].Close() +func (iter *DownloadObjectIterator) getMinReadingBlockSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, []downloadBlock) { + gotBlocksMap := bitmap.Bitmap64(0) + var gotBlocks []downloadBlock + dist := float64(0.0) + for _, n := range sortedNodes { + for _, b := range n.Blocks { + if !gotBlocksMap.Get(b.Index) { + gotBlocks = append(gotBlocks, downloadBlock{ + Node: n.Node, + Block: b, + }) + gotBlocksMap.Set(b.Index, true) + dist += n.Distance + } + + if len(gotBlocks) >= k { + return dist, gotBlocks } - return nil, fmt.Errorf("donwloading file: %w", err) } + } + + return math.MaxFloat64, gotBlocks +} - fileStrs = append(fileStrs, str) +func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, *model.Node) { + dist := math.MaxFloat64 + var downloadNode *model.Node + for _, n := range sortedNodes { + if n.ObjectPinned && float64(k)*n.Distance < dist { + dist = float64(k) * n.Distance + downloadNode = &n.Node + } } - fileReaders, filesCloser := myio.ToReaders(fileStrs) + return dist, downloadNode +} + +func (iter *DownloadObjectIterator) getNodeDistance(node model.Node) float64 { + if stgglb.Local.NodeID != nil { + if node.NodeID == *stgglb.Local.NodeID { + return consts.NodeDistanceSameNode + } + } - var indexes []int - for _, b := range chosenBlocks { - indexes = append(indexes, b.Index) + if node.LocationID == stgglb.Local.LocationID { + return consts.NodeDistanceSameLocation } - outputs, 
outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) - return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { - filesCloser() - outputsCloser() - }), nil + return consts.NodeDistanceOther } -func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) { +func downloadFile(ctx *DownloadContext, node model.Node, fileHash string) (io.ReadCloser, error) { // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := node.Node.ExternalIP - grpcPort := node.Node.ExternalGRPCPort - if node.IsSameLocation { - nodeIP = node.Node.LocalIP - grpcPort = node.Node.LocalGRPCPort + nodeIP := node.ExternalIP + grpcPort := node.ExternalGRPCPort + if node.LocationID == stgglb.Local.LocationID { + nodeIP = node.LocalIP + grpcPort = node.LocalGRPCPort - logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID) + logger.Infof("client and node %d are at the same location, use local ip", node.NodeID) } if stgglb.IPFSPool != nil { @@ -257,7 +319,7 @@ func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) } - return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash) + return downloadFromNode(ctx, node.NodeID, nodeIP, grpcPort, fileHash) } func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/node.go index 7285f83..eb307a2 100644 --- a/common/pkgs/mq/coordinator/node.go +++ b/common/pkgs/mq/coordinator/node.go @@ -60,6 +60,15 @@ func NewGetNodesResp(nodes []model.Node) *GetNodesResp { Nodes: nodes, } } +func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *model.Node { + for _, n := range r.Nodes { + if n.NodeID == id { + return &n + } + 
} + + return nil +} func (client *Client) GetNodes(msg *GetNodes) (*GetNodesResp, error) { return mq.Request(Service.GetNodes, client.rabbitCli, msg) } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 543d4f4..6c665d6 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -83,6 +83,7 @@ type ChangeObjectRedundancyResp struct { type ChangeObjectRedundancyEntry struct { ObjectID cdssdk.ObjectID `json:"objectID"` Redundancy cdssdk.Redundancy `json:"redundancy"` + PinnedAt []cdssdk.NodeID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` } diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index a92169a..f12ce94 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -55,19 +56,21 @@ var _ = Register(Service.StoragePackageLoaded) type StoragePackageLoaded struct { mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - StorageID cdssdk.StorageID `json:"storageID"` - PackageID cdssdk.PackageID `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + StorageID cdssdk.StorageID `json:"storageID"` + PackageID cdssdk.PackageID `json:"packageID"` + PinnedBlocks []stgmod.ObjectBlock `json:"pinnedBlocks"` } type StoragePackageLoadedResp struct { mq.MessageBodyBase } -func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID) *StoragePackageLoaded { +func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID, pinnedBlocks []stgmod.ObjectBlock) *StoragePackageLoaded { return &StoragePackageLoaded{ - UserID: userID, - PackageID: packageID, - StorageID: stgID, + UserID: userID, 
+ PackageID: packageID, + StorageID: stgID, + PinnedBlocks: pinnedBlocks, } } func NewStoragePackageLoadedResp() *StoragePackageLoadedResp { diff --git a/common/pkgs/mq/scanner/event/clean_pinned.go b/common/pkgs/mq/scanner/event/clean_pinned.go new file mode 100644 index 0000000..a2ce976 --- /dev/null +++ b/common/pkgs/mq/scanner/event/clean_pinned.go @@ -0,0 +1,18 @@ +package event + +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + +type CleanPinned struct { + EventBase + PackageID cdssdk.PackageID `json:"packageID"` +} + +func NewCleanPinned(packageID cdssdk.PackageID) *CleanPinned { + return &CleanPinned{ + PackageID: packageID, + } +} + +func init() { + Register[*CleanPinned]() +} diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 876644c..2af1ccc 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -35,7 +35,7 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) return fmt.Errorf("getting package by id: %w", err) } - details, err = svc.db.ObjectBlock().GetPackageBlockDetails(tx, msg.PackageID) + details, err = svc.db.Object().GetPackageObjectDetails(tx, msg.PackageID) if err != nil { return fmt.Errorf("getting package block details: %w", err) } diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index d7bf332..ddab4ee 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -131,7 +131,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c } // 这个函数只是统计哪些节点缓存了Package中的数据,不需要多么精确,所以可以不用事务 - objDetails, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID) + objDetails, err := svc.db.Object().GetPackageObjectDetails(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID).
Warnf("get package block details: %s", err.Error()) diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 08fe3e4..06ec0f6 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -25,6 +25,15 @@ func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStora func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + // 可以不用检查用户是否存在 + if ok, _ := svc.db.Package().IsAvailable(tx, msg.UserID, msg.PackageID); !ok { + return fmt.Errorf("package is not available to user") + } + + if ok, _ := svc.db.Storage().IsAvailable(tx, msg.UserID, msg.StorageID); !ok { + return fmt.Errorf("storage is not available to user") + } + err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID) if err != nil { return fmt.Errorf("creating storage package: %w", err) @@ -35,6 +44,23 @@ func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coo return fmt.Errorf("creating storage package log: %w", err) } + stg, err := svc.db.Storage().GetByID(tx, msg.StorageID) + if err != nil { + return fmt.Errorf("getting storage: %w", err) + } + + err = svc.db.PinnedObject().CreateFromPackage(tx, msg.PackageID, stg.NodeID) + if err != nil { + return fmt.Errorf("creating pinned object from package: %w", err) + } + + if len(msg.PinnedBlocks) > 0 { + err = svc.db.ObjectBlock().BatchCreate(tx, msg.PinnedBlocks) + if err != nil { + return fmt.Errorf("batch creating object block: %w", err) + } + } + return nil }) if err != nil { diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go new file mode 100644 index 0000000..15a9ee1 --- /dev/null +++ b/scanner/internal/event/clean_pinned.go @@ -0,0 +1,732 @@ +package event + +import ( + "fmt" + "math" + "math/rand" + "strconv" + + "github.com/samber/lo" + 
"gitlink.org.cn/cloudream/common/pkgs/bitmap" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + mylo "gitlink.org.cn/cloudream/common/utils/lo" + mymath "gitlink.org.cn/cloudream/common/utils/math" + myref "gitlink.org.cn/cloudream/common/utils/reflect" + mysort "gitlink.org.cn/cloudream/common/utils/sort" + "gitlink.org.cn/cloudream/storage/common/consts" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" +) + +type CleanPinned struct { + *scevt.CleanPinned +} + +func NewCleanPinned(evt *scevt.CleanPinned) *CleanPinned { + return &CleanPinned{ + CleanPinned: evt, + } +} + +func (t *CleanPinned) TryMerge(other Event) bool { + event, ok := other.(*CleanPinned) + if !ok { + return false + } + + return t.PackageID == event.PackageID +} + +func (t *CleanPinned) Execute(execCtx ExecuteContext) { + log := logger.WithType[CleanPinned]("Event") + log.Debugf("begin with %v", logger.FormatStruct(t.CleanPinned)) + defer log.Debugf("end") + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + log.Warnf("new coordinator client: %s", err.Error()) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID)) + if err != nil { + log.Warnf("getting package objects: %s", err.Error()) + return + } + + getLoadLog, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID)) + if err != nil { + log.Warnf("getting package load log details: %s", err.Error()) + return + } + readerNodeIDs := 
lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID }) + + var changeRedEntries []coormq.ChangeObjectRedundancyEntry + for _, obj := range getObjs.Objects { + entry, err := t.doOne(execCtx, readerNodeIDs, coorCli, obj) + if err != nil { + log.WithField("PackageID", obj).Warn(err.Error()) + continue + } + if entry != nil { + changeRedEntries = append(changeRedEntries, *entry) + } + } + + if len(changeRedEntries) > 0 { + _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changeRedEntries)) + if err != nil { + log.Warnf("changing object redundancy: %s", err.Error()) + return + } + } +} + +type doingContext struct { + execCtx ExecuteContext + readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点 + nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序 + nodeInfos map[cdssdk.NodeID]*model.Node + blockList []objectBlock // 排序后的块分布情况 + nodeBlockBitmaps map[cdssdk.NodeID]*bitmap.Bitmap64 // 用位图的形式表示每一个节点上有哪些块 + allBlockTypeCount int // object总共被分成了几块 + minBlockTypeCount int // 最少要几块才能恢复出完整的object + nodeCombTree combinatorialTree // 节点组合树,用于加速计算容灾度 + + maxScore float64 // 搜索过程中得到过的最大分数 + maxScoreRmBlocks []bool // 最大分数对应的删除方案 + + rmBlocks []bool // 当前删除方案 + inversedIndex int // 当前删除方案是从上一次的方案改动哪个flag而来的 + lastScore float64 // 上一次方案的分数 +} + +type objectBlock struct { + Index int + NodeID cdssdk.NodeID + HasEntity bool // 节点拥有实际的文件数据块 + HasShadow bool // 如果节点拥有完整文件数据,那么认为这个节点拥有所有块,这些块被称为影子块 + FileHash string // 只有在拥有实际文件数据块时,这个字段才有值 +} + +type nodeDist struct { + NodeID cdssdk.NodeID + Distance float64 +} + +type combinatorialTree struct { + nodes []combinatorialTreeNode + blocksMaps map[int]bitmap.Bitmap64 + nodeIDToLocalNodeID map[cdssdk.NodeID]int + localNodeIDToNodeID []cdssdk.NodeID +} + +const ( + iterActionNone = 0 + iterActionSkip = 1 + iterActionBreak = 2 +) + +func newCombinatorialTree(nodeBlocksMaps map[cdssdk.NodeID]*bitmap.Bitmap64) combinatorialTree { + tree := 
combinatorialTree{ + blocksMaps: make(map[int]bitmap.Bitmap64), + nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int), + } + + tree.nodes = make([]combinatorialTreeNode, (1 << len(nodeBlocksMaps))) + for id, mp := range nodeBlocksMaps { + tree.nodeIDToLocalNodeID[id] = len(tree.localNodeIDToNodeID) + tree.blocksMaps[len(tree.localNodeIDToNodeID)] = *mp + tree.localNodeIDToNodeID = append(tree.localNodeIDToNodeID, id) + } + + tree.nodes[0].localNodeID = -1 + index := 1 + tree.initNode(0, &tree.nodes[0], &index) + + return tree +} + +func (t *combinatorialTree) initNode(minAvaiLocalNodeID int, parent *combinatorialTreeNode, index *int) { + for i := minAvaiLocalNodeID; i < len(t.nodeIDToLocalNodeID); i++ { + curIndex := *index + *index++ + bitMp := t.blocksMaps[i] + bitMp.Or(&parent.blocksBitmap) + + t.nodes[curIndex] = combinatorialTreeNode{ + localNodeID: i, + parent: parent, + blocksBitmap: bitMp, + } + t.initNode(i+1, &t.nodes[curIndex], index) + } +} + +// 获得索引指定的节点所在的层 +func (t *combinatorialTree) GetDepth(index int) int { + depth := 0 + + // 反复判断节点在哪个子树。从左到右,子树节点的数量呈现8 4 2的变化,由此可以得到每个子树的索引值的范围 + subTreeCount := 1 << len(t.nodeIDToLocalNodeID) + for index > 0 { + if index < subTreeCount { + // 定位到一个子树后,深度+1,然后进入这个子树,使用同样的方法再进行定位。 + // 进入子树后需要将索引值-1,因为要去掉子树的根节点 + index-- + depth++ + } else { + // 如果索引值不在这个子树范围内,则将值减去子树的节点数量, + // 这样每一次都可以视为使用同样的逻辑对不同大小的树进行判断。 + index -= subTreeCount + } + subTreeCount >>= 1 + } + + return depth +} + +// 更新某一个算力中心节点的块分布位图,同时更新它对应组合树节点的所有子节点。 +// 如果更新到某个节点时,已有K个块,那么就不会再更新它的子节点 +func (t *combinatorialTree) UpdateBitmap(nodeID cdssdk.NodeID, mp bitmap.Bitmap64, k int) { + t.blocksMaps[t.nodeIDToLocalNodeID[nodeID]] = mp + // 首先定义两种遍历树节点时的移动方式: + // 1. 竖直移动(深度增加):从一个节点移动到它最左边的子节点。每移动一步,index+1 + // 2. 
水平移动:从一个节点移动到它右边的兄弟节点。每移动一步,根据它所在的深度,index+8,+4,+2 + // LocalNodeID从0开始,将其+1后得到移动步数steps。 + // 将移动步数拆成多部分,分配到上述的两种移动方式上,并进行任意组合,且保证第一次为至少进行一次的竖直移动,移动之后的节点都会是同一个计算中心节点。 + steps := t.nodeIDToLocalNodeID[nodeID] + 1 + for d := 1; d <= steps; d++ { + t.iterCombBits(len(t.nodeIDToLocalNodeID)-1, steps-d, 0, func(i int) { + index := d + i + node := &t.nodes[index] + + newMp := t.blocksMaps[node.localNodeID] + newMp.Or(&node.parent.blocksBitmap) + node.blocksBitmap = newMp + if newMp.Weight() >= k { + return + } + + t.iterChildren(index, func(index, parentIndex, depth int) int { + curNode := &t.nodes[index] + parentNode := t.nodes[parentIndex] + + newMp := t.blocksMaps[curNode.localNodeID] + newMp.Or(&parentNode.blocksBitmap) + curNode.blocksBitmap = newMp + if newMp.Weight() >= k { + return iterActionSkip + } + + return iterActionNone + }) + }) + } +} + +// 遍历树,找到至少拥有K个块的树节点的最大深度 +func (t *combinatorialTree) FindKBlocksMaxDepth(k int) int { + maxDepth := -1 + t.iterChildren(0, func(index, parentIndex, depth int) int { + if t.nodes[index].blocksBitmap.Weight() >= k { + if maxDepth < depth { + maxDepth = depth + } + return iterActionSkip + } + // 如果到了叶子节点,还没有找到K个块,那就认为要满足K个块,至少需要再多一个节点,即深度+1。 + // 由于遍历时采用的是深度优先的算法,因此遍历到这个叶子节点时,叶子节点再加一个节点的组合已经在前面搜索过, + // 所以用当前叶子节点深度+1来作为当前分支的结果就可以,即使当前情况下增加任意一个节点依然不够K块, + // 可以使用同样的思路去递推到当前叶子节点增加两个块的情况。 + if t.nodes[index].localNodeID == len(t.nodeIDToLocalNodeID)-1 { + if maxDepth < depth+1 { + maxDepth = depth + 1 + } + } + + return iterActionNone + }) + + if maxDepth == -1 || maxDepth > len(t.nodeIDToLocalNodeID) { + return len(t.nodeIDToLocalNodeID) + } + + return maxDepth +} + +func (t *combinatorialTree) iterCombBits(width int, count int, offset int, callback func(int)) { + if count == 0 { + callback(offset) + return + } + + for b := width; b >= count; b-- { + t.iterCombBits(b-1, count-1, offset+(1<>= 1 + } +} + +func (t *combinatorialTree) itering(index int, parentIndex int, depth int, do func(index int, parentIndex int, depth int) 
int) int { + act := do(index, parentIndex, depth) + if act == iterActionBreak { + return act + } + if act == iterActionSkip { + return iterActionNone + } + + curNode := &t.nodes[index] + childIndex := index + 1 + + childCounts := len(t.nodeIDToLocalNodeID) - 1 - curNode.localNodeID + if childCounts == 0 { + return iterActionNone + } + + childTreeNodeCnt := 1 << (childCounts - 1) + for c := 0; c < childCounts; c++ { + act = t.itering(childIndex, index, depth+1, do) + if act == iterActionBreak { + return act + } + + childIndex += childTreeNodeCnt + childTreeNodeCnt >>= 1 + } + + return iterActionNone +} + +type combinatorialTreeNode struct { + localNodeID int + parent *combinatorialTreeNode + blocksBitmap bitmap.Bitmap64 // 选择了这个中心之后,所有中心一共包含多少种块 +} + +func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeID, coorCli *coormq.Client, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { + if len(obj.PinnedAt) == 0 && len(obj.Blocks) == 0 { + return nil, nil + } + + ctx := doingContext{ + execCtx: execCtx, + readerNodeIDs: readerNodeIDs, + nodesSortedByReader: make(map[cdssdk.NodeID][]nodeDist), + nodeInfos: make(map[cdssdk.NodeID]*model.Node), + nodeBlockBitmaps: make(map[cdssdk.NodeID]*bitmap.Bitmap64), + } + + err := t.getNodeInfos(&ctx, coorCli, obj) + if err != nil { + return nil, err + } + + err = t.makeBlockList(&ctx, obj) + if err != nil { + return nil, err + } + + if ctx.blockList == nil { + return nil, nil + } + + t.makeNodeBlockBitmap(&ctx) + + t.sortNodeByReaderDistance(&ctx) + + ctx.rmBlocks = make([]bool, len(ctx.blockList)) + ctx.inversedIndex = -1 + ctx.nodeCombTree = newCombinatorialTree(ctx.nodeBlockBitmaps) + + ctx.lastScore = t.calcScore(&ctx) + ctx.maxScore = ctx.lastScore + ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + + // 模拟退火算法的温度 + curTemp := ctx.lastScore + // 结束温度 + finalTemp := curTemp * 0.2 + // 冷却率 + coolingRate := 0.95 + + for curTemp > finalTemp { + ctx.inversedIndex = 
rand.Intn(len(ctx.rmBlocks)) + block := ctx.blockList[ctx.inversedIndex] + ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex] + ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) + ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) + + curScore := t.calcScore(&ctx) + + dScore := curScore - ctx.lastScore + // 如果新方案比旧方案得分低,且没有要求强制接受新方案,那么就将变化改回去 + if curScore == 0 || (dScore < 0 && !t.alwaysAccept(curTemp, dScore, coolingRate)) { + ctx.rmBlocks[ctx.inversedIndex] = !ctx.rmBlocks[ctx.inversedIndex] + ctx.nodeBlockBitmaps[block.NodeID].Set(block.Index, !ctx.rmBlocks[ctx.inversedIndex]) + ctx.nodeCombTree.UpdateBitmap(block.NodeID, *ctx.nodeBlockBitmaps[block.NodeID], ctx.minBlockTypeCount) + fmt.Printf("\n") + } else { + fmt.Printf(" accept!\n") + ctx.lastScore = curScore + if ctx.maxScore < curScore { + ctx.maxScore = ctx.lastScore + ctx.maxScoreRmBlocks = mylo.ArrayClone(ctx.rmBlocks) + } + } + curTemp *= coolingRate + } + + return t.applySolution(ctx, obj) +} + +func (t *CleanPinned) getNodeInfos(ctx *doingContext, coorCli *coormq.Client, obj stgmod.ObjectDetail) error { + var nodeIDs []cdssdk.NodeID + for _, b := range obj.Blocks { + nodeIDs = append(nodeIDs, b.NodeID) + } + nodeIDs = append(nodeIDs, obj.PinnedAt...) + + nodeIDs = append(nodeIDs, ctx.readerNodeIDs...) 
+ + getNode, err := coorCli.GetNodes(coormq.NewGetNodes(lo.Uniq(nodeIDs))) + if err != nil { + return fmt.Errorf("requesting to coordinator: %w", err) + } + + for i, n := range getNode.Nodes { + ctx.nodeInfos[n.NodeID] = &getNode.Nodes[i] + } + + return nil +} + +func (t *CleanPinned) makeBlockList(ctx *doingContext, obj stgmod.ObjectDetail) error { + blockCnt := 1 + minBlockCnt := 1 + switch red := obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + return nil + case *cdssdk.RepRedundancy: + blockCnt = 1 + minBlockCnt = 1 + case *cdssdk.ECRedundancy: + blockCnt = red.N + minBlockCnt = red.K + default: + return fmt.Errorf("unknown redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy)) + } + + blocksMap := make(map[cdssdk.NodeID][]objectBlock) + + // 先生成所有的影子块 + for _, pinned := range obj.PinnedAt { + blocks := make([]objectBlock, 0, blockCnt) + for i := 0; i < blockCnt; i++ { + blocks = append(blocks, objectBlock{ + Index: i, + NodeID: pinned, + HasShadow: true, + }) + } + blocksMap[pinned] = blocks + } + + // 再填充实际块 + for _, b := range obj.Blocks { + blocks := blocksMap[b.NodeID] + + has := false + for i := range blocks { + if blocks[i].Index == b.Index { + blocks[i].HasEntity = true + blocks[i].FileHash = b.FileHash + has = true + break + } + } + + if has { + continue + } + + blocks = append(blocks, objectBlock{ + Index: b.Index, + NodeID: b.NodeID, + HasEntity: true, + FileHash: b.FileHash, + }) + blocksMap[b.NodeID] = blocks + } + + var sortedBlocks []objectBlock + for _, bs := range blocksMap { + sortedBlocks = append(sortedBlocks, bs...)
+ } + sortedBlocks = mysort.Sort(sortedBlocks, func(left objectBlock, right objectBlock) int { + d := left.NodeID - right.NodeID + if d != 0 { + return int(d) + } + + return left.Index - right.Index + }) + + ctx.allBlockTypeCount = blockCnt + ctx.minBlockTypeCount = minBlockCnt + ctx.blockList = sortedBlocks + return nil +} + +func (t *CleanPinned) makeNodeBlockBitmap(ctx *doingContext) { + for _, b := range ctx.blockList { + mp, ok := ctx.nodeBlockBitmaps[b.NodeID] + if !ok { + nb := bitmap.Bitmap64(0) + mp = &nb + ctx.nodeBlockBitmaps[b.NodeID] = mp + } + mp.Set(b.Index, true) + } +} + +func (t *CleanPinned) sortNodeByReaderDistance(ctx *doingContext) { + for _, r := range ctx.readerNodeIDs { + var nodeDists []nodeDist + + for n := range ctx.nodeBlockBitmaps { + if r == n { + // 同节点时距离视为0.1 + nodeDists = append(nodeDists, nodeDist{ + NodeID: n, + Distance: consts.NodeDistanceSameNode, + }) + } else if ctx.nodeInfos[r].LocationID == ctx.nodeInfos[n].LocationID { + // 同地区时距离视为1 + nodeDists = append(nodeDists, nodeDist{ + NodeID: n, + Distance: consts.NodeDistanceSameLocation, + }) + } else { + // 不同地区时距离视为5 + nodeDists = append(nodeDists, nodeDist{ + NodeID: n, + Distance: consts.NodeDistanceOther, + }) + } + } + + ctx.nodesSortedByReader[r] = mysort.Sort(nodeDists, func(left, right nodeDist) int { return mysort.Cmp(left.Distance, right.Distance) }) + } +} + +func (t *CleanPinned) calcScore(ctx *doingContext) float64 { + dt := t.calcDisasterTolerance(ctx) + ac := t.calcMinAccessCost(ctx) + sc := t.calcSpaceCost(ctx) + + dtSc := 1.0 + if dt < 1 { + dtSc = 0 + } else if dt >= 2 { + dtSc = 1.5 + } + + newSc := 0.0 + if dt == 0 || ac == 0 { + newSc = 0 + } else { + newSc = dtSc / (sc * ac) + } + + fmt.Printf("solu: %v, cur: %v, dt: %v, ac: %v, sc: %v ", ctx.rmBlocks, newSc, dt, ac, sc) + return newSc +} + +// 计算容灾度 +func (t *CleanPinned) calcDisasterTolerance(ctx *doingContext) float64 { + if ctx.inversedIndex != -1 { + node := ctx.blockList[ctx.inversedIndex] + 
ctx.nodeCombTree.UpdateBitmap(node.NodeID, *ctx.nodeBlockBitmaps[node.NodeID], ctx.minBlockTypeCount) + } + return float64(len(ctx.nodeBlockBitmaps) - ctx.nodeCombTree.FindKBlocksMaxDepth(ctx.minBlockTypeCount)) +} + +// 计算最小访问数据的代价 +func (t *CleanPinned) calcMinAccessCost(ctx *doingContext) float64 { + cost := math.MaxFloat64 + for _, reader := range ctx.readerNodeIDs { + tarNodes := ctx.nodesSortedByReader[reader] + gotBlocks := bitmap.Bitmap64(0) + thisCost := 0.0 + + for _, tar := range tarNodes { + tarNodeMp := ctx.nodeBlockBitmaps[tar.NodeID] + + // 只需要从目的节点上获得缺少的块 + curWeigth := gotBlocks.Weight() + // 下面的if会在拿到k个块之后跳出循环,所以or多了块也没关系 + gotBlocks.Or(tarNodeMp) + willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, ctx.minBlockTypeCount-curWeigth) + thisCost += float64(willGetBlocks) * float64(tar.Distance) + + if gotBlocks.Weight() >= ctx.minBlockTypeCount { + break + } + } + if gotBlocks.Weight() >= ctx.minBlockTypeCount { + cost = math.Min(cost, thisCost) + } + } + + return cost +} + +// 计算冗余度 +func (t *CleanPinned) calcSpaceCost(ctx *doingContext) float64 { + blockCount := 0 + for i, b := range ctx.blockList { + if ctx.rmBlocks[i] { + continue + } + + if b.HasEntity { + blockCount++ + } + if b.HasShadow { + blockCount++ + } + } + // 所有算力中心上拥有的块的总数 / 一个对象被分成了几个块 + return float64(blockCount) / float64(ctx.minBlockTypeCount) +} + +// 如果新方案得分比旧方案小,那么在一定概率内也接受新方案 +func (t *CleanPinned) alwaysAccept(curTemp float64, dScore float64, coolingRate float64) bool { + v := math.Exp(dScore / curTemp / coolingRate) + fmt.Printf(" -- chance: %v, temp: %v", v, curTemp) + return v > rand.Float64() +} + +func (t *CleanPinned) applySolution(ctx doingContext, obj stgmod.ObjectDetail) (*coormq.ChangeObjectRedundancyEntry, error) { + entry := coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: obj.Object.Redundancy, + } + fmt.Printf("final solu: %v, score: %v\n", ctx.maxScoreRmBlocks, ctx.maxScore) + + reconstrct := 
make(map[cdssdk.NodeID]*[]int) + for i, f := range ctx.maxScoreRmBlocks { + block := ctx.blockList[i] + if !f { + entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: block.Index, + NodeID: block.NodeID, + FileHash: block.FileHash, + }) + + // 如果这个块是影子块,那么就要从完整对象里重建这个块 + if !block.HasEntity { + re, ok := reconstrct[block.NodeID] + if !ok { + re = &[]int{} + reconstrct[block.NodeID] = re + } + + *re = append(*re, block.Index) + } + } + } + + bld := reqbuilder.NewBuilder() + for id := range reconstrct { + bld.IPFS().Buzy(id) + } + + mutex, err := bld.MutexLock(ctx.execCtx.Args.DistLock) + if err != nil { + return nil, fmt.Errorf("acquiring distlock: %w", err) + } + defer mutex.Unlock() + + if ecRed, ok := obj.Object.Redundancy.(*cdssdk.ECRedundancy); ok { + for id, idxs := range reconstrct { + bld := plans.NewPlanBuilder() + agt := bld.AtAgent(*ctx.nodeInfos[id]) + + strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true) + ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs.Streams...) 
+ for i, s := range ss.Streams { + s.IPFSWrite(fmt.Sprintf("%d", (*idxs)[i])) + } + + plan, err := bld.Build() + if err != nil { + return nil, fmt.Errorf("building io switch plan: %w", err) + } + + exec, err := plans.Execute(*plan) + if err != nil { + return nil, fmt.Errorf("executing io switch plan: %w", err) + } + ret, err := exec.Wait() + if err != nil { + return nil, fmt.Errorf("executing io switch plan: %w", err) + } + + for k, v := range ret.ResultValues { + idx, err := strconv.ParseInt(k, 10, 32) + if err != nil { + return nil, fmt.Errorf("parsing plan result: %w", err) + } + + for i := range entry.Blocks { + if entry.Blocks[i].NodeID == id && entry.Blocks[i].Index == int(idx) { + entry.Blocks[i].FileHash = v.(string) + } + } + } + + } + } else if _, ok := obj.Object.Redundancy.(*cdssdk.RepRedundancy); ok { + // rep模式不分块,所以每一个Block的FileHash就是完整文件的FileHash + for i := range entry.Blocks { + entry.Blocks[i].FileHash = obj.Object.FileHash + } + } + + return &entry, nil +} + +func init() { + RegisterMessageConvertor(NewCleanPinned) +} diff --git a/scanner/internal/event/clean_pinned_test.go b/scanner/internal/event/clean_pinned_test.go new file mode 100644 index 0000000..0f167f8 --- /dev/null +++ b/scanner/internal/event/clean_pinned_test.go @@ -0,0 +1,264 @@ +package event + +import ( + "testing" + + . 
"github.com/smartystreets/goconvey/convey" + "gitlink.org.cn/cloudream/common/pkgs/bitmap" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func newTreeTest(nodeBlocksMap []bitmap.Bitmap64) combinatorialTree { + tree := combinatorialTree{ + blocksMaps: make(map[int]bitmap.Bitmap64), + nodeIDToLocalNodeID: make(map[cdssdk.NodeID]int), + } + + tree.nodes = make([]combinatorialTreeNode, (1 << len(nodeBlocksMap))) + for id, mp := range nodeBlocksMap { + tree.nodeIDToLocalNodeID[cdssdk.NodeID(id)] = len(tree.localNodeIDToNodeID) + tree.blocksMaps[len(tree.localNodeIDToNodeID)] = mp + tree.localNodeIDToNodeID = append(tree.localNodeIDToNodeID, cdssdk.NodeID(id)) + } + + tree.nodes[0].localNodeID = -1 + index := 1 + tree.initNode(0, &tree.nodes[0], &index) + + return tree +} + +func Test_iterCombBits(t *testing.T) { + testcases := []struct { + title string + width int + count int + expectedValues []int + }{ + { + title: "1 of 4", + width: 4, + count: 1, + expectedValues: []int{16, 8, 4, 2}, + }, + + { + title: "2 of 4", + width: 4, + count: 2, + expectedValues: []int{24, 20, 18, 12, 10, 6}, + }, + + { + title: "3 of 4", + width: 4, + count: 3, + expectedValues: []int{28, 26, 22, 14}, + }, + + { + title: "4 of 4", + width: 4, + count: 4, + expectedValues: []int{30}, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + var ret []int + var t combinatorialTree + t.iterCombBits(test.width, test.count, 0, func(i int) { + ret = append(ret, i) + }) + + So(ret, ShouldResemble, test.expectedValues) + }) + } +} + +func Test_newCombinatorialTree(t *testing.T) { + testcases := []struct { + title string + nodeBlocks []bitmap.Bitmap64 + expectedTreeNodeLocalIDs []int + expectedTreeNodeBitmaps []int + }{ + { + title: "1个节点", + nodeBlocks: []bitmap.Bitmap64{1}, + expectedTreeNodeLocalIDs: []int{-1, 0}, + expectedTreeNodeBitmaps: []int{0, 1}, + }, + { + title: "2个节点", + nodeBlocks: []bitmap.Bitmap64{1, 0}, + expectedTreeNodeLocalIDs: []int{-1, 0, 1, 
1}, + expectedTreeNodeBitmaps: []int{0, 1, 1, 0}, + }, + { + title: "4个节点", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + expectedTreeNodeLocalIDs: []int{-1, 0, 1, 2, 3, 3, 2, 3, 3, 1, 2, 3, 3, 2, 3, 3}, + expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + t := newTreeTest(test.nodeBlocks) + + var localIDs []int + var bitmaps []int + for _, n := range t.nodes { + localIDs = append(localIDs, n.localNodeID) + bitmaps = append(bitmaps, int(n.blocksBitmap)) + } + + So(localIDs, ShouldResemble, test.expectedTreeNodeLocalIDs) + So(bitmaps, ShouldResemble, test.expectedTreeNodeBitmaps) + }) + } +} + +func Test_UpdateBitmap(t *testing.T) { + testcases := []struct { + title string + nodeBlocks []bitmap.Bitmap64 + updatedNodeID cdssdk.NodeID + updatedBitmap bitmap.Bitmap64 + k int + expectedTreeNodeBitmaps []int + }{ + + { + title: "4个节点,更新但值不变", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap.Bitmap64(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, + }, + + { + title: "4个节点,更新0", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap.Bitmap64(2), + k: 4, + expectedTreeNodeBitmaps: []int{0, 2, 2, 6, 14, 10, 6, 14, 10, 2, 6, 14, 10, 4, 12, 8}, + }, + + { + title: "4个节点,更新1", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(1), + updatedBitmap: bitmap.Bitmap64(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 1, 5, 13, 9, 5, 13, 9, 1, 5, 13, 9, 4, 12, 8}, + }, + + { + title: "4个节点,更新2", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(2), + updatedBitmap: bitmap.Bitmap64(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 3, 3, 11, 11, 1, 9, 9, 2, 3, 11, 10, 1, 9, 8}, + }, + + { + title: "4个节点,更新3", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + 
updatedNodeID: cdssdk.NodeID(3), + updatedBitmap: bitmap.Bitmap64(1), + k: 4, + expectedTreeNodeBitmaps: []int{0, 1, 3, 7, 7, 3, 5, 5, 1, 2, 6, 7, 3, 4, 5, 1}, + }, + + { + title: "4个节点,k<4,更新0,0之前没有k个块,现在拥有", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap.Bitmap64(3), + k: 2, + expectedTreeNodeBitmaps: []int{0, 3, 3, 7, 15, 11, 5, 13, 9, 2, 6, 14, 10, 4, 12, 8}, + }, + { + title: "4个节点,k<4,更新0,0之前有k个块,现在没有", + nodeBlocks: []bitmap.Bitmap64{3, 4, 0, 0}, + updatedNodeID: cdssdk.NodeID(0), + updatedBitmap: bitmap.Bitmap64(0), + k: 2, + expectedTreeNodeBitmaps: []int{0, 0, 4, 4, 4, 4, 0, 0, 0, 4, 4, 4, 4, 0, 0, 0}, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + t := newTreeTest(test.nodeBlocks) + t.UpdateBitmap(test.updatedNodeID, test.updatedBitmap, test.k) + + var bitmaps []int + for _, n := range t.nodes { + bitmaps = append(bitmaps, int(n.blocksBitmap)) + } + + So(bitmaps, ShouldResemble, test.expectedTreeNodeBitmaps) + }) + } +} + +func Test_FindKBlocksMaxDepth(t *testing.T) { + testcases := []struct { + title string + nodeBlocks []bitmap.Bitmap64 + k int + expected int + }{ + { + title: "每个节点各有一个块", + nodeBlocks: []bitmap.Bitmap64{1, 2, 4, 8}, + k: 2, + expected: 2, + }, + { + title: "所有节点加起来块数不足", + nodeBlocks: []bitmap.Bitmap64{1, 1, 1, 1}, + k: 2, + expected: 4, + }, + { + title: "不同节点有相同块", + nodeBlocks: []bitmap.Bitmap64{1, 1, 2, 4}, + k: 2, + expected: 3, + }, + { + title: "一个节点就拥有所有块", + nodeBlocks: []bitmap.Bitmap64{3, 6, 12, 24}, + k: 2, + expected: 1, + }, + { + title: "只有一块,且只在某一个节点1", + nodeBlocks: []bitmap.Bitmap64{1, 0}, + k: 1, + expected: 2, + }, + { + title: "只有一块,且只在某一个节点2", + nodeBlocks: []bitmap.Bitmap64{0, 1}, + k: 1, + expected: 2, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + t := newTreeTest(test.nodeBlocks) + ret := t.FindKBlocksMaxDepth(test.k) + So(ret, ShouldResemble, test.expected) + }) + } +} diff --git 
a/scanner/internal/tickevent/batch_clean_pinned.go b/scanner/internal/tickevent/batch_clean_pinned.go new file mode 100644 index 0000000..00e2f5c --- /dev/null +++ b/scanner/internal/tickevent/batch_clean_pinned.go @@ -0,0 +1,48 @@ +package tickevent + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" + evt "gitlink.org.cn/cloudream/storage/scanner/internal/event" +) + +type BatchCleanPinned struct { + lastCheckStart int +} + +func NewBatchCleanPinned() *BatchCleanPinned { + return &BatchCleanPinned{} +} + +func (e *BatchCleanPinned) Execute(ctx ExecuteContext) { + log := logger.WithType[BatchCleanPinned]("TickEvent") + log.Debugf("begin") + defer log.Debugf("end") + + // TODO 更好的策略 + nowHour := time.Now().Hour() + if nowHour > 6 { + return + } + + packageIDs, err := ctx.Args.DB.Package().BatchGetAllPackageIDs(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CheckPackageBatchSize) + if err != nil { + log.Warnf("batch get package ids failed, err: %s", err.Error()) + return + } + + for _, id := range packageIDs { + ctx.Args.EventExecutor.Post(evt.NewCleanPinned(event.NewCleanPinned(id))) + } + + // 如果结果的长度小于预期的长度,则认为已经查询了所有,下次从头再来 + if len(packageIDs) < CheckPackageBatchSize { + e.lastCheckStart = 0 + log.Debugf("all package clean pinned, next time will start check at offset 0") + + } else { + e.lastCheckStart += CheckPackageBatchSize + } +} diff --git a/scanner/main.go b/scanner/main.go index f3aee3d..88d6a5a 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -125,5 +125,7 @@ func startTickEvent(tickExecutor *tickevent.Executor) { tickExecutor.Start(tickevent.NewCheckAgentState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) - tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 10 * 60 * 1000}) + tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, 
tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) + + tickExecutor.Start(tickevent.NewBatchCleanPinned(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) }