diff --git a/agent/internal/services/mq/cache.go b/agent/internal/services/mq/cache.go index 0a36ff5..fbdf9b4 100644 --- a/agent/internal/services/mq/cache.go +++ b/agent/internal/services/mq/cache.go @@ -61,7 +61,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]sh svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash)) } else if cache.State == consts.CacheStateTemp { - if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { + if time.Since(cache.CreateTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } @@ -96,7 +96,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]she svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash)) } else if cache.State == consts.CacheStateTemp { - if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { + if time.Since(cache.CreateTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second { entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP)) } } @@ -127,8 +127,6 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm return nil, mq.Failed(errorcode.TaskNotFound, "task not found") } - mvPkgTask := tsk.Body().(*mytask.CacheMovePackage) - if msg.WaitTimeoutMs == 0 { tsk.Wait() @@ -137,7 +135,7 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, mvPkgTask.ResultCacheInfos)) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { @@ -147,9 +145,9 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm errMsg = tsk.Error().Error() } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, mvPkgTask.ResultCacheInfos)) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) } - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "", nil)) + return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) } } diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 4042a2d..ce63435 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -8,19 +8,17 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) type CacheMovePackage struct { - userID int64 - packageID int64 - - ResultCacheInfos []cdssdk.ObjectCacheInfo + userID cdssdk.UserID + packageID cdssdk.PackageID } -func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage { +func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID) *CacheMovePackage { return &CacheMovePackage{ userID: userID, packageID: packageID, @@ -63,24 +61,9 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { } defer stgglb.CoordinatorMQPool.Release(coorCli) - pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID)) + getResp, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID)) if err != nil { - return fmt.Errorf("getting package: %w", err) - } - - if pkgResp.Redundancy.IsRepInfo() { - return t.moveRep(ctx, coorCli, pkgResp.Package) - } else { - return fmt.Errorf("not implement yet!") - // TODO EC的CacheMove逻辑 - } - - return nil -} -func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.Client, pkg model.Package) error { - getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID)) - if err != nil { - return fmt.Errorf("getting package object rep data: %w", err) + return fmt.Errorf("getting package object details: %w", err) } ipfsCli, err := stgglb.IPFSPool.Acquire() @@ -89,19 +72,26 @@ func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.Client, pkg } defer ipfsCli.Close() - var fileHashes []string - for _, rep := range getRepResp.Data { - if err := ipfsCli.Pin(rep.FileHash); err != nil { - return fmt.Errorf("pinning file %s: %w", rep.FileHash, err) + // TODO 可以考虑优化,比如rep类型的直接pin就可以 + objIter := iterator.NewDownloadObjectIterator(getResp.Objects, &iterator.DownloadContext{ + Distlock: ctx.distlock, + }) + defer objIter.Close() + + for { + obj, err := objIter.MoveNext() + if err != nil { + if err == iterator.ErrNoMoreItem { + break + } + return err } + defer obj.File.Close() - fileHashes = append(fileHashes, rep.FileHash) - t.ResultCacheInfos = append(t.ResultCacheInfos, cdssdk.NewObjectCacheInfo(rep.Object, rep.FileHash)) - } - - _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *stgglb.Local.NodeID, fileHashes)) - if err != nil { - return fmt.Errorf("reporting cache package moved: %w", err) + _, err = ipfsCli.CreateFile(obj.File) + if err != nil { + return fmt.Errorf("creating ipfs file: %w", err) + } } return nil diff --git a/agent/internal/task/create_ec_package.go b/agent/internal/task/create_package.go similarity index 100% rename from agent/internal/task/create_ec_package.go rename to agent/internal/task/create_package.go diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index d4e358c..7a65745 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -14,7 +14,7 @@ func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cds } for { - complete, _, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10) + complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) diff --git a/client/internal/http/cacah.go b/client/internal/http/cacah.go index 1435239..e1f1210 100644 --- a/client/internal/http/cacah.go +++ b/client/internal/http/cacah.go @@ -25,9 +25,7 @@ type CacheMovePackageReq struct { PackageID *cdssdk.PackageID `json:"packageID" binding:"required"` NodeID *cdssdk.NodeID `json:"nodeID" binding:"required"` } -type CacheMovePackageResp struct { - CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"` -} +type CacheMovePackageResp = cdssdk.CacheMovePackageResp func (s *CacheService) MovePackage(ctx *gin.Context) { log := logger.WithField("HTTP", "Cache.LoadPackage") @@ -47,7 +45,7 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { } for { - complete, cacheInfos, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) + complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10) if complete { if err != nil { log.Warnf("moving complete with: %s", err.Error()) @@ -55,9 +53,7 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, OK(CacheMovePackageResp{ - CacheInfos: cacheInfos, - })) + ctx.JSON(http.StatusOK, OK(CacheMovePackageResp{})) return } @@ -68,30 +64,3 @@ func (s *CacheService) MovePackage(ctx *gin.Context) { } } } - -type CacheGetPackageObjectCacheInfosReq struct { - UserID *cdssdk.UserID `form:"userID" binding:"required"` - PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` -} - -type CacheGetPackageObjectCacheInfosResp = cdssdk.CacheGetPackageObjectCacheInfosResp - -func (s *CacheService) GetPackageObjectCacheInfos(ctx *gin.Context) { - log := logger.WithField("HTTP", "Cache.GetPackageObjectCacheInfos") - - var req CacheGetPackageObjectCacheInfosReq - if err := ctx.ShouldBindQuery(&req); err != nil { - log.Warnf("binding body: %s", err.Error()) - ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - return - } - - infos, err := s.svc.CacheSvc().GetPackageObjectCacheInfos(*req.UserID, *req.PackageID) - if err != nil { - log.Warnf("getting package object cache infos: %s", err.Error()) - ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object cache infos failed")) - return - } - - ctx.JSON(http.StatusOK, OK(CacheGetPackageObjectCacheInfosResp{Infos: infos})) -} diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 458b609..9b330ef 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -73,3 +73,29 @@ func (s *ObjectService) Download(ctx *gin.Context) { return true }) } + +type GetPackageObjectsReq struct { + UserID *cdssdk.UserID `form:"userID" binding:"required"` + PackageID *cdssdk.PackageID `form:"packageID" binding:"required"` +} +type GetPackageObjectsResp = cdssdk.ObjectGetPackageObjectsResp + +func (s *ObjectService) GetPackageObjects(ctx *gin.Context) { + log := logger.WithField("HTTP", "Object.GetPackageObjects") + + var req GetPackageObjectsReq + if err := ctx.ShouldBindQuery(&req); err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) + return + } + + objs, err := s.svc.ObjectSvc().GetPackageObjects(*req.UserID, *req.PackageID) + if err != nil { + log.Warnf("getting package objects: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package object failed")) + return + } + + ctx.JSON(http.StatusOK, OK(GetPackageObjectsResp{Objects: objs})) +} diff --git a/client/internal/http/server.go b/client/internal/http/server.go index aeda782..ee1954f 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -3,6 +3,7 @@ package http import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/client/internal/services" ) @@ -39,6 +40,7 @@ func (s *Server) Serve() error { func (s *Server) initRouters() { s.engine.GET("/object/download", s.ObjectSvc().Download) + s.engine.GET(cdssdk.ObjectGetPackageObjectsPath, s.ObjectSvc().GetPackageObjects) s.engine.GET("/package/get", s.PackageSvc().Get) s.engine.POST("/package/upload", s.PackageSvc().Upload) @@ -50,6 +52,5 @@ func (s *Server) initRouters() { s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) s.engine.GET("/storage/getInfo", s.StorageSvc().GetInfo) - s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage) - s.engine.GET("/cache/getPackageObjectCacheInfos", s.CacheSvc().GetPackageObjectCacheInfos) + s.engine.POST(cdssdk.CacheMovePackagePath, s.CacheSvc().MovePackage) } diff --git a/client/internal/services/cacah.go b/client/internal/services/cacah.go index 1e199c0..ac37be9 100644 --- a/client/internal/services/cacah.go +++ b/client/internal/services/cacah.go @@ -8,7 +8,6 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) type CacheService struct { @@ -34,40 +33,25 @@ func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID c return startResp.TaskID, nil } -func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, []cdssdk.ObjectCacheInfo, error) { +func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, error) { agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil { - return true, nil, fmt.Errorf("new agent client: %w", err) + return true, fmt.Errorf("new agent client: %w", err) } defer stgglb.AgentMQPool.Release(agentCli) waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) if err != nil { - return true, nil, fmt.Errorf("wait cache move package: %w", err) + return true, fmt.Errorf("wait cache move package: %w", err) } if !waitResp.IsComplete { - return false, nil, nil + return false, nil } if waitResp.Error != "" { - return true, nil, fmt.Errorf("%s", waitResp.Error) + return true, fmt.Errorf("%s", waitResp.Error) } - return true, waitResp.CacheInfos, nil -} - -func (svc *CacheService) GetPackageObjectCacheInfos(userID cdssdk.UserID, packageID cdssdk.PackageID) ([]cdssdk.ObjectCacheInfo, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getResp, err := coorCli.GetPackageObjectCacheInfos(coormq.NewGetPackageObjectCacheInfos(userID, packageID)) - if err != nil { - return nil, fmt.Errorf("requesting to coodinator: %w", err) - } - - return getResp.Infos, nil + return true, nil } diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 54e9a49..51e5d41 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -70,7 +70,7 @@ func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssd return nil, fmt.Errorf("getting package object details: %w", err) } - iter := iterator.NewObjectIterator(getObjsResp.Objects, &iterator.DownloadContext{ + iter := iterator.NewDownloadObjectIterator(getObjsResp.Objects, &iterator.DownloadContext{ Distlock: svc.DistLock, }) @@ -156,9 +156,8 @@ func (svc *PackageService) GetCachedNodes(userID cdssdk.UserID, packageID cdssdk } tmp := cdssdk.PackageCachingInfo{ - NodeInfos: resp.NodeInfos, - PackageSize: resp.PackageSize, - RedunancyType: resp.RedunancyType, + NodeInfos: resp.NodeInfos, + PackageSize: resp.PackageSize, } return tmp, nil } diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index 3f1bd2e..35f25cb 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -121,6 +121,7 @@ create table Object ( PackageID int not null comment '包ID', Path varchar(500) not null comment '对象路径', Size bigint not null comment '对象大小(Byte)', + FileHash varchar(100) not null comment '完整对象的FileHash', UNIQUE KEY PackagePath (PackageID, Path) ) comment = '对象表'; @@ -136,7 +137,8 @@ create table Cache ( FileHash varchar(100) not null comment '编码块块ID', NodeID int not null comment '节点ID', State varchar(100) not null comment '状态', - CacheTime timestamp not null comment '缓存时间', + FrozenTime timestamp comment '文件被冻结的时间', + CreateTime timestamp not null comment '缓存时间', Priority int not null comment '编码块优先级', primary key(FileHash, NodeID) ) comment = '缓存表'; diff --git a/common/models/models.go b/common/models/models.go index 64098ab..e6d7988 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -30,14 +30,16 @@ func NewObjectBlockDetail(objID cdssdk.ObjectID, index int, fileHash string, nod } type ObjectDetail struct { - Object cdssdk.Object `json:"object"` - Blocks []ObjectBlockDetail `json:"blocks"` + Object cdssdk.Object `json:"object"` + CachedNodeIDs []cdssdk.NodeID `json:"cachedNodeIDs"` // 文件的完整数据在哪些节点上缓存 + Blocks []ObjectBlockDetail `json:"blocks"` } -func NewObjectDetail(object cdssdk.Object, blocks []ObjectBlockDetail) ObjectDetail { +func NewObjectDetail(object cdssdk.Object, cachedNodeIDs []cdssdk.NodeID, blocks []ObjectBlockDetail) ObjectDetail { return ObjectDetail{ - Object: object, - Blocks: blocks, + Object: object, + CachedNodeIDs: cachedNodeIDs, + Blocks: blocks, } } diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go index c94da80..78af19c 100644 --- a/common/pkgs/cmd/create_ec_package.go +++ b/common/pkgs/cmd/create_ec_package.go @@ -1,11 +1,9 @@ package cmd import ( - "errors" "fmt" "io" "math/rand" - "sync" "time" "github.com/samber/lo" @@ -14,12 +12,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - myio "gitlink.org.cn/cloudream/common/utils/io" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -160,26 +155,6 @@ func chooseUploadNode(nodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) Uploa return nodes[rand.Intn(len(nodes))] } -func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo { - for i := len(uploadNodes); i < extendTo; i++ { - uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))]) - } - - // 随机排列上传节点 - rand.Shuffle(len(uploadNodes), func(i, j int) { - uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i] - }) - - return uploadNodes -} - -func chooseRedundancy(obj *iterator.IterUploadingObject, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) (cdssdk.Redundancy, []UploadNodeInfo, error) { - // TODO 更好的算法 - // uploadNodes = shuffleNodes(uploadNodes, ecRed.N) - uploadNode := chooseUploadNode(userNodes, nodeAffinity) - return cdssdk.NewRepRedundancy(), []UploadNodeInfo{uploadNode}, nil -} - func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadNodeInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -200,17 +175,11 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo err = func() error { defer objInfo.File.Close() - red, uploadNodes, err := chooseRedundancy(objInfo, userNodes, nodeAffinity) - if err != nil { - return fmt.Errorf("choosing redundancy: %w", err) - } + uploadNode := chooseUploadNode(userNodes, nodeAffinity) - var addInfo *coormq.AddObjectInfo - switch r := red.(type) { - case *cdssdk.RepRedundancy: - addInfo, err = uploadRepObject(objInfo, uploadNodes, r) - case *cdssdk.ECRedundancy: - addInfo, err = uploadECObject(objInfo, uploadNodes, r) + fileHash, err := uploadFile(objInfo.File, uploadNode) + if err != nil { + return fmt.Errorf("uploading file: %w", err) } uploadRets = append(uploadRets, ObjectUploadResult{ @@ -221,7 +190,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo return fmt.Errorf("uploading object: %w", err) } - adds = append(adds, *addInfo) + adds = append(adds, coormq.NewAddObjectInfo(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) return nil }() if err != nil { @@ -237,98 +206,6 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo return uploadRets, nil } -func uploadRepObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, repRed *cdssdk.RepRedundancy) (*coormq.AddObjectInfo, error) { - clonedStrs := myio.Clone(obj.File, len(uploadNodes)) - - fileHashes := make([]string, len(uploadNodes)) - anyErrs := make([]error, len(uploadNodes)) - wg := sync.WaitGroup{} - for i := range uploadNodes { - idx := i - wg.Add(1) - - go func() { - defer wg.Done() - fileHashes[idx], anyErrs[idx] = uploadFile(clonedStrs[idx], uploadNodes[idx]) - }() - } - - wg.Wait() - - var uploadedNodeIDs []cdssdk.NodeID - var fileHash string - var errs []error - for i, e := range anyErrs { - if e != nil { - errs[i] = e - continue - } - - uploadedNodeIDs = append(uploadedNodeIDs, uploadNodes[i].Node.NodeID) - fileHash = fileHashes[i] - } - - if len(uploadedNodeIDs) == 0 { - return nil, fmt.Errorf("uploading file: %w", errors.Join(errs...)) - } - - info := coormq.NewAddObjectInfo(obj.Path, obj.Size, repRed, - []stgmod.ObjectBlockDetail{ - stgmod.NewObjectBlockDetail(0, 0, fileHash, uploadedNodeIDs, uploadedNodeIDs), - }) - return &info, nil -} - -func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecRed *cdssdk.ECRedundancy) (*coormq.AddObjectInfo, error) { - rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) - if err != nil { - return nil, err - } - - outputs := myio.ChunkedSplit(obj.File, ecRed.ChunkSize, ecRed.K, myio.ChunkedSplitOption{ - PaddingZeros: true, - }) - var readers []io.Reader - for _, o := range outputs { - readers = append(readers, o) - } - defer func() { - for _, o := range outputs { - o.Close() - } - }() - - encStrs := rs.EncodeAll(readers) - - wg := sync.WaitGroup{} - - blocks := make([]stgmod.ObjectBlockDetail, ecRed.N) - anyErrs := make([]error, ecRed.N) - - for i := range encStrs { - idx := i - wg.Add(1) - blocks[idx].Index = idx - blocks[idx].NodeIDs = []cdssdk.NodeID{uploadNodes[idx].Node.NodeID} - blocks[idx].CachedNodeIDs = []cdssdk.NodeID{uploadNodes[idx].Node.NodeID} - go func() { - defer wg.Done() - blocks[idx].FileHash, anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx]) - }() - } - - wg.Wait() - - for i, e := range anyErrs { - if e != nil { - return nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e) - } - } - - info := coormq.NewAddObjectInfo(obj.Path, obj.Size, ecRed, blocks) - return &info, nil -} - func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { // 本地有IPFS,则直接从本地IPFS上传 if stgglb.IPFSPool != nil { diff --git a/common/pkgs/cmd/download_package.go b/common/pkgs/cmd/download_package.go index 1abcba5..a8e5914 100644 --- a/common/pkgs/cmd/download_package.go +++ b/common/pkgs/cmd/download_package.go @@ -44,7 +44,7 @@ func (t *DownloadPackage) Execute(ctx *DownloadPackageContext) error { return fmt.Errorf("getting package object details: %w", err) } - objIter := iterator.NewObjectIterator(getObjectDetails.Objects, &iterator.DownloadContext{ + objIter := iterator.NewDownloadObjectIterator(getObjectDetails.Objects, &iterator.DownloadContext{ Distlock: ctx.Distlock, }) defer objIter.Close() diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index c9bef4a..1c7301d 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -37,7 +37,7 @@ func (*CacheDB) GetNodeCaches(ctx SQLContext, nodeID cdssdk.NodeID) ([]model.Cac // CreateNew 创建一条新的缓存记录 func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec("insert into Cache values(?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, time.Now()) + _, err := ctx.Exec("insert into Cache values(?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, nil, time.Now()) if err != nil { return err } @@ -45,9 +45,26 @@ func (*CacheDB) CreateNew(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) return nil } +func (*CacheDB) SetPackageObjectFrozen(ctx SQLContext, pkgID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + var nowTime = time.Now() + _, err := ctx.Exec( + "insert into Cache(FileHash,NodeID,State,FrozenTime,CreateTime,Priority)"+ + " select FileHash, ?, ?, ?, ?, ? from Object where PackageID = ?"+ + " on duplicate key update State = ?, FrozenTime = ?", + nodeID, consts.CacheStatePinned, &nowTime, &nowTime, 0, + pkgID, + consts.CacheStatePinned, &nowTime, + ) + + return err +} + // CreatePinned 创建一条缓存记录,如果已存在,但不是pinned状态,则将其设置为pin状态 func (*CacheDB) CreatePinned(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, priority int) error { - _, err := ctx.Exec("replace into Cache values(?,?,?,?,?)", fileHash, nodeID, consts.CacheStatePinned, time.Now(), priority) + _, err := ctx.Exec("insert into Cache values(?,?,?,?,?,?) on duplicate key update State = ?, CreateTime = ?, Priority = ?", + fileHash, nodeID, consts.CacheStatePinned, nil, time.Now(), priority, + consts.CacheStatePinned, time.Now(), priority, + ) return err } @@ -56,16 +73,17 @@ func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID cd var nowTime = time.Now() for _, hash := range fileHashes { caches = append(caches, model.Cache{ - FileHash: hash, - NodeID: nodeID, - State: consts.CacheStatePinned, - CacheTime: nowTime, - Priority: priority, + FileHash: hash, + NodeID: nodeID, + State: consts.CacheStatePinned, + FrozenTime: nil, + CreateTime: nowTime, + Priority: priority, }) } - _, err := sqlx.NamedExec(ctx, "insert into Cache(FileHash,NodeID,State,CacheTime,Priority) values(:FileHash,:NodeID,:State,:CacheTime,:Priority)"+ - " on duplicate key update State=values(State), CacheTime=values(CacheTime), Priority=values(Priority)", + _, err := sqlx.NamedExec(ctx, "insert into Cache(FileHash,NodeID,State,FrozenTime,CreateTime,Priority) values(:FileHash,:NodeID,:State,:FrozenTime,:CreateTime,:Priority)"+ + " on duplicate key update State=values(State), CreateTime=values(CreateTime), Priority=values(Priority)", caches, ) return err @@ -73,7 +91,7 @@ func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID cd // Create 创建一条Temp状态的缓存记录,如果已存在则不产生效果 func (*CacheDB) CreateTemp(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec("insert ignore into Cache values(?,?,?,?)", fileHash, nodeID, consts.CacheStateTemp, time.Now()) + _, err := ctx.Exec("insert ignore into Cache values(?,?,?,?,?)", fileHash, nodeID, nil, consts.CacheStateTemp, time.Now()) return err } @@ -107,8 +125,9 @@ func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID cdssdk.NodeID, f return x, err } +// 设置一条记录为Temp,对Frozen的记录无效 func (*CacheDB) SetTemp(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID) error { - _, err := ctx.Exec("update Cache set State = ?, CacheTime = ? where FileHash = ? and NodeID = ?", + _, err := ctx.Exec("update Cache set State = ?, CreateTime = ? where FileHash = ? and NodeID = ? and FrozenTime = null", consts.CacheStateTemp, time.Now(), fileHash, diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index bb0c13e..1bf8cc0 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -68,11 +68,12 @@ type Object = cdssdk.Object type ObjectBlock = stgmod.ObjectBlock type Cache struct { - FileHash string `db:"FileHash" json:"fileHash"` - NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` - State string `db:"State" json:"state"` - CacheTime time.Time `db:"CacheTime" json:"cacheTime"` - Priority int `db:"Priority" json:"priority"` + FileHash string `db:"FileHash" json:"fileHash"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` + State string `db:"State" json:"state"` + FrozenTime *time.Time `db:"FrozenTime" json:"frozenTime"` + CreateTime time.Time `db:"CreateTime" json:"createTime"` + Priority int `db:"Priority" json:"priority"` } const ( diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 8c84632..0fbeb23 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -23,8 +23,8 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret, err } -func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, redundancy cdssdk.Redundancy) (int64, error) { - sql := "insert into Object(PackageID, Path, Size, Redundancy) values(?,?,?,?)" +func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string, redundancy cdssdk.Redundancy) (int64, error) { + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?)" ret, err := ctx.Exec(sql, packageID, path, size, redundancy) if err != nil { @@ -40,10 +40,13 @@ func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path stri } // 创建或者更新记录,返回值true代表是创建,false代表是更新 -func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, redundancy cdssdk.Redundancy) (cdssdk.ObjectID, bool, error) { - sql := "insert into Object(PackageID, Path, Size, Redundancy) values(?,?,?,?) on duplicate key update Size = ?, Redundancy = ?" +func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string) (cdssdk.ObjectID, bool, error) { + // 首次上传Object时,默认使用Rep模式,即使是在更新一个已有的Object也是如此 + defRed := cdssdk.NewRepRedundancy() - ret, err := ctx.Exec(sql, packageID, path, size, redundancy, size, redundancy) + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?) on duplicate key update Size = ?, FileHash = ?, Redundancy = ?" + + ret, err := ctx.Exec(sql, packageID, path, size, fileHash, defRed, size, fileHash, defRed) if err != nil { return 0, false, fmt.Errorf("insert object failed, err: %w", err) } @@ -94,7 +97,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs [] objIDs := make([]cdssdk.ObjectID, 0, len(objs)) for _, obj := range objs { // 创建对象的记录 - objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size, obj.Redundancy) + objID, isCreate, err := db.CreateOrUpdate(ctx, packageID, obj.Path, obj.Size, obj.FileHash) if err != nil { return nil, fmt.Errorf("creating object: %w", err) } @@ -108,23 +111,16 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs [] } } - // 创建编码块的记录 - for _, b := range obj.Blocks { - for _, n := range b.NodeIDs { - err := db.ObjectBlock().Create(ctx, objID, b.Index, b.FileHash, n) - if err != nil { - return nil, fmt.Errorf("creating object block: %w", err) - } - } + // 首次上传默认使用不分块的rep模式 + err = db.ObjectBlock().Create(ctx, objID, 0, obj.FileHash, obj.NodeID) + if err != nil { + return nil, fmt.Errorf("creating object block: %w", err) + } - // 创建缓存记录 - priority := 0 //优先级暂时设置为0 - for _, nodeID := range b.CachedNodeIDs { - err = db.Cache().CreatePinned(ctx, b.FileHash, nodeID, priority) - if err != nil { - return nil, fmt.Errorf("creating cache: %w", err) - } - } + // 创建缓存记录 + err = db.Cache().CreatePinned(ctx, obj.FileHash, obj.NodeID, 0) + if err != nil { + return nil, fmt.Errorf("creating cache: %w", err) } } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index f1a6f7d..5473b0d 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -59,15 +59,25 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk rets := make([]stgmod.ObjectDetail, 0, len(objs)) for _, obj := range objs { - var tmpRets []struct { + var cachedObjectNodeIDs []cdssdk.NodeID + err := sqlx.Select(ctx, &cachedObjectNodeIDs, + "select NodeID from Object, Cache where"+ + " ObjectID = ? and Object.FileHash = Cache.FileHash", + obj.ObjectID, + ) + if err != nil { + return nil, err + } + + var blockTmpRets []struct { Index int `db:"Index"` FileHashes string `db:"FileHashes"` NodeIDs string `db:"NodeIDs"` CachedNodeIDs *string `db:"CachedNodeIDs"` } - err := sqlx.Select(ctx, - &tmpRets, + err = sqlx.Select(ctx, + &blockTmpRets, "select ObjectBlock.Index, group_concat(distinct ObjectBlock.FileHash) as FileHashes, group_concat(distinct ObjectBlock.NodeID) as NodeIDs, group_concat(distinct Cache.NodeID) as CachedNodeIDs"+ " from ObjectBlock left join Cache on ObjectBlock.FileHash = Cache.FileHash"+ " where ObjectID = ? group by ObjectBlock.Index", @@ -77,21 +87,21 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk return nil, err } - blocks := make([]stgmod.ObjectBlockDetail, 0, len(tmpRets)) - for _, tmp := range tmpRets { + blocks := make([]stgmod.ObjectBlockDetail, 0, len(blockTmpRets)) + for _, tmp := range blockTmpRets { var block stgmod.ObjectBlockDetail block.Index = tmp.Index - block.FileHash = splitConcatedToString(tmp.FileHashes)[0] - block.NodeIDs = splitConcatedToNodeID(tmp.NodeIDs) + block.FileHash = splitConcatedFileHash(tmp.FileHashes)[0] + block.NodeIDs = splitConcatedNodeID(tmp.NodeIDs) if tmp.CachedNodeIDs != nil { - block.CachedNodeIDs = splitConcatedToNodeID(*tmp.CachedNodeIDs) + block.CachedNodeIDs = splitConcatedNodeID(*tmp.CachedNodeIDs) } blocks = append(blocks, block) } - rets = append(rets, stgmod.NewObjectDetail(obj, blocks)) + rets = append(rets, stgmod.NewObjectDetail(obj, cachedObjectNodeIDs, blocks)) } return rets, nil @@ -99,7 +109,7 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk // 按逗号切割字符串,并将每一个部分解析为一个int64的ID。 // 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式 -func splitConcatedToNodeID(idStr string) []cdssdk.NodeID { +func splitConcatedNodeID(idStr string) []cdssdk.NodeID { idStrs := strings.Split(idStr, ",") ids := make([]cdssdk.NodeID, 0, len(idStrs)) @@ -113,7 +123,7 @@ func splitConcatedToNodeID(idStr string) []cdssdk.NodeID { } // 按逗号切割字符串 -func splitConcatedToString(idStr string) []string { +func splitConcatedFileHash(idStr string) []string { idStrs := strings.Split(idStr, ",") return idStrs } diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go similarity index 90% rename from common/pkgs/iterator/ec_object_iterator.go rename to common/pkgs/iterator/download_object_iterator.go index 9cbafc2..7683bd8 100644 --- a/common/pkgs/iterator/ec_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -36,7 +36,7 @@ type DownloadNodeInfo struct { type DownloadContext struct { Distlock *distlock.Service } -type ObjectIterator struct { +type DownloadObjectIterator struct { OnClosing func() objectDetails []stgmodels.ObjectDetail @@ -45,14 +45,14 @@ type ObjectIterator struct { downloadCtx *DownloadContext } -func NewObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *ObjectIterator { - return &ObjectIterator{ +func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator { + return &DownloadObjectIterator{ objectDetails: objectDetails, downloadCtx: downloadCtx, } } -func (i *ObjectIterator) MoveNext() (*IterDownloadingObject, error) { +func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new coordinator client: %w", err) @@ -68,7 +68,7 @@ func (i *ObjectIterator) MoveNext() (*IterDownloadingObject, error) { return item, err } -func (iter *ObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { +func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObject, error) { obj := iter.objectDetails[iter.currentIndex] switch red := obj.Object.Redundancy.(type) { @@ -98,7 +98,7 @@ func (iter *ObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObje return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy)) } -func (i *ObjectIterator) Close() { +func (i *DownloadObjectIterator) Close() { if i.OnClosing != nil { i.OnClosing() } @@ -107,7 +107,7 @@ func (i *ObjectIterator) Close() { // chooseDownloadNode 选择一个下载节点 // 1. 从与当前客户端相同地域的节点中随机选一个 // 2. 没有用的话从所有节点中随机选一个 -func (i *ObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo { +func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) DownloadNodeInfo { sameLocationEntries := lo.Filter(entries, func(e DownloadNodeInfo, i int) bool { return e.IsSameLocation }) if len(sameLocationEntries) > 0 { return sameLocationEntries[rand.Intn(len(sameLocationEntries))] @@ -116,7 +116,7 @@ func (i *ObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Download return entries[rand.Intn(len(entries))] } -func (iter *ObjectIterator) downloadRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, repRed *cdssdk.RepRedundancy) (io.ReadCloser, error) { +func (iter *DownloadObjectIterator) downloadRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, repRed *cdssdk.RepRedundancy) (io.ReadCloser, error) { //采取直接读,优先选内网节点 var chosenNodes []DownloadNodeInfo for i := range obj.Blocks { @@ -159,7 +159,7 @@ func (iter *ObjectIterator) downloadRepObject(coorCli *coormq.Client, ctx *Downl }), nil } -func (iter *ObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { +func (iter *DownloadObjectIterator) downloadECObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { //采取直接读,优先选内网节点 var chosenNodes []DownloadNodeInfo var chosenBlocks []stgmodels.ObjectBlockDetail diff --git a/common/pkgs/mq/agent/cache.go b/common/pkgs/mq/agent/cache.go index 6976807..dd0d3cb 100644 --- a/common/pkgs/mq/agent/cache.go +++ b/common/pkgs/mq/agent/cache.go @@ -94,9 +94,8 @@ type WaitCacheMovePackage struct { } type WaitCacheMovePackageResp struct { mq.MessageBodyBase - IsComplete bool `json:"isComplete"` - Error string `json:"error"` - CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"` + IsComplete bool `json:"isComplete"` + Error string `json:"error"` } func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage { @@ -105,11 +104,10 @@ func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMoveP WaitTimeoutMs: waitTimeoutMs, } } -func NewWaitCacheMovePackageResp(isComplete bool, err string, cacheInfos []cdssdk.ObjectCacheInfo) *WaitCacheMovePackageResp { +func NewWaitCacheMovePackageResp(isComplete bool, err string) *WaitCacheMovePackageResp { return &WaitCacheMovePackageResp{ IsComplete: isComplete, Error: err, - CacheInfos: cacheInfos, } } func (client *Client) WaitCacheMovePackage(msg *WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) { diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go index facda76..92acbf6 100644 --- a/common/pkgs/mq/coordinator/cache.go +++ b/common/pkgs/mq/coordinator/cache.go @@ -7,8 +7,6 @@ import ( type CacheService interface { CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) - - GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, *mq.CodeMessage) } // Package的Object移动到了节点的Cache中 @@ -16,19 +14,17 @@ var _ = Register(Service.CachePackageMoved) type CachePackageMoved struct { mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` - NodeID cdssdk.NodeID `json:"nodeID"` - FileHashes []string `json:"fileHashes"` + PackageID cdssdk.PackageID `json:"packageID"` + NodeID cdssdk.NodeID `json:"nodeID"` } type CachePackageMovedResp struct { mq.MessageBodyBase } -func NewCachePackageMoved(packageID cdssdk.PackageID, nodeID cdssdk.NodeID, fileHashes []string) *CachePackageMoved { +func NewCachePackageMoved(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) *CachePackageMoved { return &CachePackageMoved{ - PackageID: packageID, - NodeID: nodeID, - FileHashes: fileHashes, + PackageID: packageID, + NodeID: nodeID, } } func NewCachePackageMovedResp() *CachePackageMovedResp { @@ -37,31 +33,3 @@ func NewCachePackageMovedResp() *CachePackageMovedResp { func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) } - -// 获取Package中所有Object的FileHash -var _ = Register(Service.GetPackageObjectCacheInfos) - -type GetPackageObjectCacheInfos struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` -} -type GetPackageObjectCacheInfosResp struct { - mq.MessageBodyBase - Infos []cdssdk.ObjectCacheInfo -} - -func NewGetPackageObjectCacheInfos(userID cdssdk.UserID, packageID cdssdk.PackageID) *GetPackageObjectCacheInfos { - return &GetPackageObjectCacheInfos{ - UserID: userID, - PackageID: packageID, - } -} -func NewGetPackageObjectCacheInfosResp(infos []cdssdk.ObjectCacheInfo) *GetPackageObjectCacheInfosResp { - return &GetPackageObjectCacheInfosResp{ - Infos: infos, - } -} -func (client *Client) GetPackageObjectCacheInfos(msg *GetPackageObjectCacheInfos) (*GetPackageObjectCacheInfosResp, error) { - return mq.Request(Service.GetPackageObjectCacheInfos, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 7962f97..a662edb 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -4,7 +4,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -93,10 +92,10 @@ type UpdatePackageResp struct { mq.MessageBodyBase } type AddObjectInfo struct { - Path string `json:"path"` - Size int64 `json:"size,string"` - Redundancy cdssdk.Redundancy `json:"redundancy"` - Blocks []stgmod.ObjectBlockDetail `json:"blocks"` + Path string `json:"path"` + Size int64 `json:"size,string"` + FileHash string `json:"fileHash"` + NodeID cdssdk.NodeID `json:"nodeID"` } func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes []cdssdk.ObjectID) *UpdatePackage { @@ -109,12 +108,12 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes func NewUpdatePackageResp() *UpdatePackageResp { return &UpdatePackageResp{} } -func NewAddObjectInfo(path string, size int64, redundancy cdssdk.Redundancy, blocks []stgmod.ObjectBlockDetail) AddObjectInfo { +func NewAddObjectInfo(path string, size int64, fileHash string, nodeIDs cdssdk.NodeID) AddObjectInfo { return AddObjectInfo{ - Path: path, - Size: size, - Redundancy: redundancy, - Blocks: blocks, + Path: path, + Size: size, + FileHash: fileHash, + NodeID: nodeIDs, } } func (client *Client) UpdateECPackage(msg *UpdatePackage) (*UpdatePackageResp, error) { @@ -173,12 +172,11 @@ func NewGetPackageCachedNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) } } -func NewGetPackageCachedNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *GetPackageCachedNodesResp { +func NewGetPackageCachedNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64) *GetPackageCachedNodesResp { return &GetPackageCachedNodesResp{ PackageCachingInfo: cdssdk.PackageCachingInfo{ - NodeInfos: nodeInfos, - PackageSize: packageSize, - RedunancyType: redunancyType, + NodeInfos: nodeInfos, + PackageSize: packageSize, }, } } diff --git a/coordinator/internal/services/cache.go b/coordinator/internal/services/cache.go index bb40484..04ff586 100644 --- a/coordinator/internal/services/cache.go +++ b/coordinator/internal/services/cache.go @@ -8,22 +8,10 @@ import ( ) func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.CachePackageMovedResp, *mq.CodeMessage) { - pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("getting package: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + if err := svc.db.Cache().SetPackageObjectFrozen(svc.db.SQLCtx(), msg.PackageID, msg.NodeID); err != nil { + logger.Warnf("setting package object frozen: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "set package object frozen failed") } - if pkg.Redundancy.IsRepInfo() { - // TODO 优先级 - if err := svc.db.Cache().BatchCreatePinned(svc.db.SQLCtx(), msg.FileHashes, msg.NodeID, 0); err != nil { - logger.Warnf("batch creating pinned cache: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "batch create pinned cache failed") - } - } - - // TODO EC的逻辑 - return mq.ReplyOK(coormq.NewCachePackageMovedResp()) } diff --git a/coordinator/internal/services/object.go b/coordinator/internal/services/object.go index 30a46bc..9b87316 100644 --- a/coordinator/internal/services/object.go +++ b/coordinator/internal/services/object.go @@ -7,45 +7,21 @@ import ( coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -func (svc *Service) GetPackageObjectCacheInfos(msg *coormq.GetPackageObjectCacheInfos) (*coormq.GetPackageObjectCacheInfosResp, *mq.CodeMessage) { - pkg, err := svc.db.Package().GetUserPackage(svc.db.SQLCtx(), msg.UserID, msg.PackageID) +func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { + // TODO 检查用户是否有权限 + objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). - Warnf("getting package: %s", err.Error()) + Warnf("get package objects: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") } - if pkg.Redundancy.IsRepInfo() { - infos, err := svc.db.ObjectRep().GetPackageObjectCacheInfos(svc.db.SQLCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("getting rep package object cache infos: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get rep package object cache infos failed") - } - - return mq.ReplyOK(coormq.NewGetPackageObjectCacheInfosResp(infos)) - } - // TODO EC - - return nil, mq.Failed(errorcode.OperationFailed, "not implement yet") -} - -func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) { - data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("query object rep and node id in package: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "query object rep and node id in package failed") - } - - return mq.ReplyOK(coormq.NewGetPackageObjectRepDataResp(data)) + return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs)) } -func (svc *Service) GetPackageObjectECData(msg *coormq.GetPackageObjectECData) (*coormq.GetPackageObjectECDataResp, *mq.CodeMessage) { - data, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) +func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) { + data, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). Warnf("query object ec and node id in package: %s", err.Error()) @@ -53,5 +29,5 @@ func (svc *Service) GetPackageObjectECData(msg *coormq.GetPackageObjectECData) ( return nil, mq.Failed(errorcode.OperationFailed, "query object ec and node id in package failed") } - return mq.ReplyOK(coormq.NewGetPackageObjectECDataResp(data)) + return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(data)) } diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index 6a30669..bc9724a 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -27,24 +27,11 @@ func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, return mq.ReplyOK(coormq.NewGetPackageResp(pkg)) } -func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { - // TODO 检查用户是否有权限 - objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("get package objects: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") - } - - return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs)) -} - func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { - var pkgID int64 + var pkgID cdssdk.PackageID err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { var err error - pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name, msg.Redundancy) + pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name) return err }) if err != nil { @@ -58,7 +45,7 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID)) } -func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.UpdateRepPackageResp, *mq.CodeMessage) { +func (svc *Service) UpdateECPackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). @@ -77,52 +64,7 @@ func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.Upda // 再执行添加操作 if len(msg.Adds) > 0 { - if _, err := svc.db.Object().BatchAddRep(tx, msg.PackageID, msg.Adds); err != nil { - return fmt.Errorf("adding objects: %w", err) - } - } - - return nil - }) - if err != nil { - logger.Warn(err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "update rep package failed") - } - - // 紧急任务 - var affectFileHashes []string - for _, add := range msg.Adds { - affectFileHashes = append(affectFileHashes, add.FileHash) - } - - err = svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true)) - if err != nil { - logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) - } - - return mq.ReplyOK(coormq.NewUpdateRepPackageResp()) -} - -func (svc *Service) UpdateECPackage(msg *coormq.UpdateECPackage) (*coormq.UpdateECPackageResp, *mq.CodeMessage) { - _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("get package: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get package failed") - } - - err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - // 先执行删除操作 - if len(msg.Deletes) > 0 { - if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil { - return fmt.Errorf("deleting objects: %w", err) - } - } - - // 再执行添加操作 - if len(msg.Adds) > 0 { - if _, err := svc.db.Object().BatchAddEC(tx, msg.PackageID, msg.Adds); err != nil { + if _, err := svc.db.Object().BatchAdd(tx, msg.PackageID, msg.Adds); err != nil { return fmt.Errorf("adding objects: %w", err) } } @@ -134,7 +76,7 @@ func (svc *Service) UpdateECPackage(msg *coormq.UpdateECPackage) (*coormq.Update return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed") } - return mq.ReplyOK(coormq.NewUpdateECPackageResp()) + return mq.ReplyOK(coormq.NewUpdatePackageResp()) } func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) { @@ -171,7 +113,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack // 不追求及时、准确 if len(stgs) == 0 { // 如果没有被引用,直接投递CheckPackage的任务 - err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false)) + err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckPackage([]cdssdk.PackageID{msg.PackageID}), false, false)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } @@ -180,7 +122,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack } else { // 有引用则让Agent去检查StoragePackage for _, stg := range stgs { - err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false)) + err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []cdssdk.PackageID{msg.PackageID}), false, false)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } @@ -206,76 +148,33 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c return nil, mq.Failed(errorcode.OperationFailed, "package is not available to the user") } - pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) + objDetails, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). - Warnf("get package: %s", err.Error()) + Warnf("get package block details: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get package failed") + return nil, mq.Failed(errorcode.OperationFailed, "get package block details failed") } var packageSize int64 - nodeInfoMap := make(map[int64]*cdssdk.NodePackageCachingInfo) - if pkg.Redundancy.IsRepInfo() { - // 备份方式为rep - objectRepDatas, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("get objectRepDatas by packageID failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get objectRepDatas by packageID failed") - } - - for _, data := range objectRepDatas { - packageSize += data.Object.Size - for _, nodeID := range data.NodeIDs { - - nodeInfo, exists := nodeInfoMap[nodeID] - if !exists { - nodeInfo = &cdssdk.NodePackageCachingInfo{ - NodeID: nodeID, - FileSize: data.Object.Size, - ObjectCount: 1, + nodeInfoMap := make(map[cdssdk.NodeID]*cdssdk.NodePackageCachingInfo) + for _, obj := range objDetails { + // 只要存了文件的一个块,就认为此节点存了整个文件 + for _, block := range obj.Blocks { + for _, nodeID := range block.CachedNodeIDs { + info, ok := nodeInfoMap[nodeID] + if !ok { + info = &cdssdk.NodePackageCachingInfo{ + NodeID: nodeID, } - } else { - nodeInfo.FileSize += data.Object.Size - nodeInfo.ObjectCount++ - } - nodeInfoMap[nodeID] = nodeInfo - } - } - } else if pkg.Redundancy.IsECInfo() { - // 备份方式为ec - objectECDatas, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("get objectECDatas by packageID failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get objectECDatas by packageID failed") - } + nodeInfoMap[nodeID] = info - for _, ecData := range objectECDatas { - packageSize += ecData.Object.Size - for _, block := range ecData.Blocks { - for _, nodeID := range block.NodeIDs { - - nodeInfo, exists := nodeInfoMap[nodeID] - if !exists { - nodeInfo = &cdssdk.NodePackageCachingInfo{ - NodeID: nodeID, - FileSize: ecData.Object.Size, - ObjectCount: 1, - } - } else { - nodeInfo.FileSize += ecData.Object.Size - nodeInfo.ObjectCount++ - } - nodeInfoMap[nodeID] = nodeInfo } + + info.FileSize += obj.Object.Size + info.ObjectCount++ } } - } else { - logger.WithField("PackageID", msg.PackageID). - Warnf("Redundancy type %s is wrong", pkg.Redundancy.Type) - return nil, mq.Failed(errorcode.OperationFailed, "redundancy type is wrong") } var nodeInfos []cdssdk.NodePackageCachingInfo @@ -286,7 +185,7 @@ func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*c sort.Slice(nodeInfos, func(i, j int) bool { return nodeInfos[i].NodeID < nodeInfos[j].NodeID }) - return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize, pkg.Redundancy.Type)) + return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize)) } func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) { @@ -297,8 +196,8 @@ func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*c return nil, mq.Failed(errorcode.OperationFailed, "get storages by packageID failed") } - uniqueNodeIDs := make(map[int64]bool) - var nodeIDs []int64 + uniqueNodeIDs := make(map[cdssdk.NodeID]bool) + var nodeIDs []cdssdk.NodeID for _, stg := range storages { if !uniqueNodeIDs[stg.NodeID] { uniqueNodeIDs[stg.NodeID] = true diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index e16f091..721f3ca 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -4,13 +4,11 @@ import ( "database/sql" "time" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" @@ -81,15 +79,16 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error()) return } - - caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) - return - } - - // 补充备份数 - execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash }))) + /* + caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) + if err != nil { + log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) + return + } + + // 补充备份数 + execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash }))) + */ return } return diff --git a/scanner/internal/event/check_cache.go b/scanner/internal/event/check_cache.go index ee94132..87928ca 100644 --- a/scanner/internal/event/check_cache.go +++ b/scanner/internal/event/check_cache.go @@ -3,11 +3,9 @@ package event import ( "database/sql" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/consts" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -64,20 +62,20 @@ func (t *CheckCache) Execute(execCtx ExecuteContext) { if node.State != consts.NodeStateUnavailable { return } + /* + caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) + if err != nil { + log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) + return + } - caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) - return - } - - err = execCtx.Args.DB.Cache().DeleteNodeAll(execCtx.Args.DB.SQLCtx(), t.NodeID) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("delete node all caches failed, err: %s", err.Error()) - return - } - - execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash }))) + err = execCtx.Args.DB.Cache().DeleteNodeAll(execCtx.Args.DB.SQLCtx(), t.NodeID) + if err != nil { + log.WithField("NodeID", t.NodeID).Warnf("delete node all caches failed, err: %s", err.Error()) + return + } + */ + //execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash }))) } func init() { diff --git a/scanner/internal/event/check_rep_count.go b/scanner/internal/event/check_rep_count.go index f5fe8ec..11adeaa 100644 --- a/scanner/internal/event/check_rep_count.go +++ b/scanner/internal/event/check_rep_count.go @@ -1,5 +1,7 @@ package event +/* +// TODO 可作为新逻辑的参考 import ( "fmt" "math" @@ -213,3 +215,4 @@ func chooseDeleteAvaiRepNodes(allNodes []model.Node, curAvaiRepNodes []model.Nod func init() { RegisterMessageConvertor(func(msg *scevt.CheckRepCount) Event { return NewCheckRepCount(msg.FileHashes) }) } +*/ diff --git a/scanner/internal/event/check_rep_count_test.go b/scanner/internal/event/check_rep_count_test.go index e4be2b8..f6d9c3e 100644 --- a/scanner/internal/event/check_rep_count_test.go +++ b/scanner/internal/event/check_rep_count_test.go @@ -1,10 +1,12 @@ package event +/* import ( "testing" "github.com/samber/lo" . "github.com/smartystreets/goconvey/convey" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sort" "gitlink.org.cn/cloudream/storage/common/consts" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -16,7 +18,7 @@ func Test_chooseNewRepNodes(t *testing.T) { allNodes []model.Node curRepNodes []model.Node newCount int - wantNodeIDs []int + wantNodeIDs []cdssdk.NodeID }{ { title: "优先选择不同地域的节点", @@ -49,7 +51,7 @@ func Test_chooseNewRepNodes(t *testing.T) { }, }, newCount: 2, - wantNodeIDs: []int{3, 4}, + wantNodeIDs: []cdssdk.NodeID{3, 4}, }, { title: "就算节点数不足,也不能选择重复节点", @@ -72,7 +74,7 @@ func Test_chooseNewRepNodes(t *testing.T) { }, }, newCount: 2, - wantNodeIDs: []int{2}, + wantNodeIDs: []cdssdk.NodeID{2}, }, { title: "就算节点数不足,也不能选择状态unavailable的节点", @@ -95,16 +97,16 @@ func Test_chooseNewRepNodes(t *testing.T) { }, }, newCount: 2, - wantNodeIDs: []int{2}, + wantNodeIDs: []cdssdk.NodeID{2}, }, } for _, test := range testcases { Convey(test.title, t, func() { chooseNodes := chooseNewRepNodes(test.allNodes, test.curRepNodes, test.newCount) - chooseNodeIDs := lo.Map(chooseNodes, func(node model.Node, index int) int64 { return node.NodeID }) + chooseNodeIDs := lo.Map(chooseNodes, func(node model.Node, index int) cdssdk.NodeID { return node.NodeID }) - sort.Sort(chooseNodeIDs, sort.Cmp[int64]) + sort.Sort(chooseNodeIDs, sort.Cmp[cdssdk.NodeID]) So(chooseNodeIDs, ShouldResemble, test.wantNodeIDs) }) @@ -117,7 +119,7 @@ func Test_chooseDeleteAvaiRepNodes(t *testing.T) { allNodes []model.Node curRepNodes []model.Node delCount int - wantNodeLocationIDs []int + wantNodeLocationIDs []cdssdk.LocationID }{ { title: "优先选择地域重复的节点", @@ -129,7 +131,7 @@ func Test_chooseDeleteAvaiRepNodes(t *testing.T) { {NodeID: 8, LocationID: 4}, }, delCount: 4, - wantNodeLocationIDs: []int{1, 2, 3, 3}, + wantNodeLocationIDs: []cdssdk.LocationID{1, 2, 3, 3}, }, { title: "节点不够删", @@ -138,18 +140,19 @@ func Test_chooseDeleteAvaiRepNodes(t *testing.T) { {NodeID: 1, LocationID: 1}, }, delCount: 2, - wantNodeLocationIDs: []int{1}, + wantNodeLocationIDs: []cdssdk.LocationID{1}, }, } for _, test := range testcases { Convey(test.title, t, func() { chooseNodes := chooseDeleteAvaiRepNodes(test.allNodes, test.curRepNodes, test.delCount) - chooseNodeLocationIDs := lo.Map(chooseNodes, func(node model.Node, index int) int64 { return node.LocationID }) + chooseNodeLocationIDs := lo.Map(chooseNodes, func(node model.Node, index int) cdssdk.LocationID { return node.LocationID }) - sort.Sort(chooseNodeLocationIDs, sort.Cmp[int64]) + sort.Sort(chooseNodeLocationIDs, sort.Cmp[cdssdk.LocationID]) So(chooseNodeLocationIDs, ShouldResemble, test.wantNodeLocationIDs) }) } } +*/ diff --git a/scanner/internal/tickevent/batch_check_all_rep_count.go b/scanner/internal/tickevent/batch_check_all_rep_count.go index 58cef58..bfe1a73 100644 --- a/scanner/internal/tickevent/batch_check_all_rep_count.go +++ b/scanner/internal/tickevent/batch_check_all_rep_count.go @@ -1,5 +1,6 @@ package tickevent +/* import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/storage/scanner/internal/event" @@ -37,3 +38,4 @@ func (e *BatchCheckAllRepCount) Execute(ctx ExecuteContext) { e.lastCheckStart += CHECK_CACHE_BATCH_SIZE } } +*/ diff --git a/scanner/main.go b/scanner/main.go index ca659b4..2b01002 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -119,7 +119,7 @@ func startTickEvent(tickExecutor *tickevent.Executor) { tickExecutor.Start(tickevent.NewBatchCheckAllPackage(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) - tickExecutor.Start(tickevent.NewBatchCheckAllRepCount(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) + // tickExecutor.Start(tickevent.NewBatchCheckAllRepCount(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) tickExecutor.Start(tickevent.NewBatchCheckAllStorage(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})