diff --git a/agent/internal/cmd/serve.go b/agent/internal/cmd/serve.go
index aa62710..ea65883 100644
--- a/agent/internal/cmd/serve.go
+++ b/agent/internal/cmd/serve.go
@@ -47,8 +47,10 @@ func serve(configPath string) {
 	stgglb.InitMQPool(&config.Cfg().RabbitMQ)
 	stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})
+	// Fetch the hub configuration
 	hubCfg := downloadHubConfig()
+	// Initialize the storage service manager
 	stgMgr := mgr.NewManager()
 	for _, stg := range hubCfg.Storages {
 		err := stgMgr.InitStorage(stg)
 	}
 }
@@ -58,9 +60,11 @@ func serve(configPath string) {
 
-	sw := exec.NewWorker()
+	// Initialize the executor
+	worker := exec.NewWorker()
 
-	httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&sw))
+	// Initialize the HTTP service
+	httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgMgr))
 	if err != nil {
 		logger.Fatalf("new http server failed, err: %s", err.Error())
 	}
@@ -101,19 +105,23 @@ func serve(configPath string) {
 	})
 	conCol.CollectInPlace()
 
+	// Start the access statistics service
 	acStat := accessstat.NewAccessStat(accessstat.Config{
 		// TODO: consider moving this into the config
 		ReportInterval: time.Second * 10,
 	})
 	go serveAccessStat(acStat)
 
+	// Initialize the distributed lock service
 	distlock, err := distlock.NewService(&config.Cfg().DistLock)
 	if err != nil {
 		logger.Fatalf("new distlock service failed, err: %s", err.Error())
 	}
 
-	dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)
+	// Initialize the downloader
+	dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr)
 
+	// Initialize the task manager
 	taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgMgr)
 
 	// Start the command server
@@ -127,14 +135,14 @@ func serve(configPath string) {
 	})
 	go serveAgentServer(agtSvr)
 
-	// Send and receive data for clients
+	// Start the gRPC service
 	listenAddr := config.Cfg().GRPC.MakeListenAddress()
 	lis, err := net.Listen("tcp", listenAddr)
 	if err != nil {
 		logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
 	}
 	s := grpc.NewServer()
-	agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw))
+	agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgMgr))
 	go serveGRPC(s, lis)
 
 	go serveDistLock(distlock)
diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go
index de593ec..83433c0 100644
--- a/agent/internal/grpc/io.go
+++ b/agent/internal/grpc/io.go
@@ -29,9 +29,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe
 	defer s.swWorker.Remove(sw)
 
 	execCtx := exec.NewWithContext(ctx)
-
-	// TODO2: inject dependencies
-
+	exec.SetValueByType(execCtx, s.stgMgr)
 	_, err = sw.Run(execCtx)
 	if err != nil {
 		return nil, fmt.Errorf("running io plan: %w", err)
diff --git a/agent/internal/grpc/service.go b/agent/internal/grpc/service.go
index 4936bae..9d76469 100644
--- a/agent/internal/grpc/service.go
+++ b/agent/internal/grpc/service.go
@@ -3,15 +3,18 @@ package grpc
 import (
 	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
 	agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
 )
 
 type Service struct {
 	agentserver.AgentServer
 	swWorker *exec.Worker
+	stgMgr   *mgr.Manager
 }
 
-func NewService(swWorker *exec.Worker) *Service {
+func NewService(swWorker *exec.Worker, stgMgr *mgr.Manager) *Service {
 	return &Service{
 		swWorker: swWorker,
+		stgMgr:   stgMgr,
 	}
 }
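What ties this whole patch together is type-keyed dependency injection: each entry point drops the shared *mgr.Manager into the exec context with exec.SetValueByType, and the ops executed by the plan later pull it back out by its static type. The snippet below is a self-contained sketch of how such a context presumably works; the names mirror the exec package, but the implementation details are assumptions, not the real code.

```go
package main

import (
	"fmt"
	"reflect"
)

// ExecContext is a stand-in for exec.ExecContext: a bag of values keyed by
// their static Go type (an assumption based on how this patch uses the API).
type ExecContext struct {
	values map[reflect.Type]any
}

func NewExecContext() *ExecContext {
	return &ExecContext{values: make(map[reflect.Type]any)}
}

// SetValueByType stores val under the type T it was instantiated with.
func SetValueByType[T any](ctx *ExecContext, val T) {
	ctx.values[reflect.TypeOf((*T)(nil)).Elem()] = val
}

// GetValueByType looks a value up by type, failing if nothing was injected.
func GetValueByType[T any](ctx *ExecContext) (T, error) {
	key := reflect.TypeOf((*T)(nil)).Elem()
	val, ok := ctx.values[key]
	if !ok {
		var zero T
		return zero, fmt.Errorf("no value of type %v in context", key)
	}
	return val.(T), nil
}

// Manager stands in for mgr.Manager.
type Manager struct{ storages int }

func main() {
	ctx := NewExecContext()
	SetValueByType(ctx, &Manager{storages: 3})

	m, err := GetValueByType[*Manager](ctx)
	fmt.Println(m.storages, err) // 3 <nil>
}
```

Keying values by reflect.Type means a context holds at most one value per type, which is exactly right for a single shared storage manager.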
diff --git a/agent/internal/http/hub_io.go b/agent/internal/http/hub_io.go
index f5bc37b..9fabf18 100644
--- a/agent/internal/http/hub_io.go
+++ b/agent/internal/http/hub_io.go
@@ -168,8 +168,7 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) {
 	defer s.svc.swWorker.Remove(sw)
 
 	execCtx := exec.NewWithContext(ctx.Request.Context())
-
-	// TODO: inject dependencies
+	exec.SetValueByType(execCtx, s.svc.stgMgr)
 	_, err = sw.Run(execCtx)
 	if err != nil {
 		ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("executing plan: %v", err)))
diff --git a/agent/internal/http/service.go b/agent/internal/http/service.go
index 7188b0c..401893f 100644
--- a/agent/internal/http/service.go
+++ b/agent/internal/http/service.go
@@ -1,13 +1,18 @@
 package http
 
-import "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
+import (
+	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
+)
 
 type Service struct {
 	swWorker *exec.Worker
+	stgMgr   *mgr.Manager
 }
 
-func NewService(swWorker *exec.Worker) *Service {
+func NewService(swWorker *exec.Worker, stgMgr *mgr.Manager) *Service {
 	return &Service{
 		swWorker: swWorker,
+		stgMgr:   stgMgr,
 	}
 }
diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go
index eaf6a58..779925f 100644
--- a/agent/internal/task/create_package.go
+++ b/agent/internal/task/create_package.go
@@ -87,6 +87,7 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c
 	uploadRet, err := cmd.NewUploadObjects(t.userID, createResp.Package.PackageID, t.objIter, t.nodeAffinity).Execute(&cmd.UploadObjectsContext{
 		Distlock:     ctx.distlock,
 		Connectivity: ctx.connectivity,
+		StgMgr:       ctx.stgMgr,
 	})
 	if err != nil {
 		err = fmt.Errorf("uploading objects: %w", err)
diff --git a/client/internal/task/task.go b/client/internal/task/task.go
index 7d25c40..de04dfc 100644
--- a/client/internal/task/task.go
+++ b/client/internal/task/task.go
@@ -4,12 +4,14 @@ import (
 	"gitlink.org.cn/cloudream/common/pkgs/distlock"             // distributed lock service
 	"gitlink.org.cn/cloudream/common/pkgs/task"                 // task-handling utilities
 	"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" // connectivity collector
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
 )
 
 // TaskContext defines the execution environment of a task, holding the distributed lock service and the connectivity collector
 type TaskContext struct {
 	distlock     *distlock.Service
 	connectivity *connectivity.Collector
+	stgMgr       *mgr.Manager
 }
 
 // CompleteFn is the callback invoked when a task completes, used to set its result
@@ -29,9 +31,10 @@ type CompleteOption = task.CompleteOption
 
 // NewManager creates a new task manager from a distributed lock service and a connectivity collector,
 // returning an initialized manager instance
-func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector) Manager {
+func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *mgr.Manager) Manager {
 	return task.NewManager(TaskContext{
 		distlock:     distlock,
 		connectivity: connectivity,
+		stgMgr:       stgMgr,
 	})
 }
diff --git a/client/internal/task/upload_objects.go b/client/internal/task/upload_objects.go
index 2f8fae6..b9ca29a 100644
--- a/client/internal/task/upload_objects.go
+++ b/client/internal/task/upload_objects.go
@@ -41,6 +41,7 @@ func (t *UploadObjects) Execute(task *task.Task[TaskContext], ctx TaskContext, c
 	ret, err := t.cmd.Execute(&cmd.UploadObjectsContext{
 		Distlock:     ctx.distlock,     // Use the distributed lock from the task context.
 		Connectivity: ctx.connectivity, // Use the connectivity info from the task context.
+		StgMgr:       ctx.stgMgr,
 	})
 
 	t.Result = ret // Store the upload result.
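The client-side TaskContext now carries the storage manager next to the distlock service and the connectivity collector, and main wires all three in once. A minimal sketch of that constructor-injection pattern with stand-in types (the real ones are distlock.Service, connectivity.Collector, and mgr.Manager):

```go
package main

import "fmt"

// Stand-in dependency types.
type (
	DistLock     struct{}
	Connectivity struct{}
	StgManager   struct{}
)

// TaskContext bundles every shared service a task may need.
type TaskContext struct {
	distlock     *DistLock
	connectivity *Connectivity
	stgMgr       *StgManager
}

type UploadObjects struct{}

func (t *UploadObjects) Execute(ctx TaskContext) {
	// Tasks pull shared services from their context instead of globals,
	// which is what lets main thread a single manager through everything.
	fmt.Println(ctx.distlock, ctx.connectivity, ctx.stgMgr)
}

func main() {
	ctx := TaskContext{&DistLock{}, &Connectivity{}, &StgManager{}}
	(&UploadObjects{}).Execute(ctx)
}
```

Because tasks only ever see their TaskContext, swapping or mocking the storage manager comes down to changing a single NewManager call.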
diff --git a/client/main.go b/client/main.go
index ea172d1..c549a39 100644
--- a/client/main.go
+++ b/client/main.go
@@ -19,6 +19,7 @@ import (
 	"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
 	coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
 )
 
 func main() {
@@ -85,9 +86,11 @@ func main() {
 	})
 	go serveAccessStat(acStat)
 
-	taskMgr := task.NewManager(distlockSvc, &conCol)
+	stgMgr := mgr.NewManager()
 
-	dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)
+	taskMgr := task.NewManager(distlockSvc, &conCol, stgMgr)
+
+	dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr)
 
 	svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat)
 	if err != nil {
diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go
index 2ef5f30..4938c29 100644
--- a/common/pkgs/cmd/upload_objects.go
+++ b/common/pkgs/cmd/upload_objects.go
@@ -24,6 +24,7 @@ import (
 	"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
 	coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
 )
 
 type UploadObjects struct {
@@ -52,6 +53,7 @@ type UploadStorageInfo struct {
 type UploadObjectsContext struct {
 	Distlock     *distlock.Service
 	Connectivity *connectivity.Collector
+	StgMgr       *mgr.Manager
 }
 
 func NewUploadObjects(userID cdssdk.UserID, packageID cdssdk.PackageID, objIter iterator.UploadingObjectIterator, nodeAffinity *cdssdk.NodeID) *UploadObjects {
@@ -114,7 +116,7 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult
 	}
 	defer ipfsMutex.Unlock()
 
-	rets, err := uploadAndUpdatePackage(t.packageID, t.objectIter, userStgs, t.nodeAffinity)
+	rets, err := uploadAndUpdatePackage(ctx, t.packageID, t.objectIter, userStgs, t.nodeAffinity)
 	if err != nil {
 		return nil, err
 	}
@@ -147,7 +149,7 @@ func chooseUploadNode(nodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) Up
 	return nodes[0]
 }
 
-func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {
+func uploadAndUpdatePackage(ctx *UploadObjectsContext, packageID cdssdk.PackageID, objectIter iterator.UploadingObjectIterator, userNodes []UploadStorageInfo, nodeAffinity *cdssdk.NodeID) ([]ObjectUploadResult, error) {
 	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -172,7 +174,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo
 		defer objInfo.File.Close()
 
 		uploadTime := time.Now()
-		fileHash, err := uploadFile(objInfo.File, uploadNode)
+		fileHash, err := uploadFile(ctx, objInfo.File, uploadNode)
 		if err != nil {
 			return fmt.Errorf("uploading file: %w", err)
 		}
@@ -213,7 +215,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo
 	return uploadRets, nil
 }
 
-func uploadFile(file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, error) {
+func uploadFile(ctx *UploadObjectsContext, file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, error) {
 	ft := ioswitch2.NewFromTo()
 	fromExec, hd := ioswitch2.NewFromDriver(-1)
 	ft.AddFrom(fromExec).AddTo(ioswitch2.NewToShardStore(*uploadStg.Storage.MasterHub, uploadStg.Storage.Storage, -1, "fileHash"))
@@ -225,9 +227,8 @@ func uploadFile(file io.Reader, uploadStg UploadStorageInfo) (cdssdk.FileHash, e
 		return "", fmt.Errorf("parsing plan: %w", err)
 	}
 
-	// TODO2: inject dependencies
 	exeCtx := exec.NewExecContext()
-
+	exec.SetValueByType(exeCtx, ctx.StgMgr)
 	exec := plans.Execute(exeCtx)
 	exec.BeginWrite(io.NopCloser(file), hd)
 	ret, err := exec.Wait(context.TODO())
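uploadFile drives the plan from the client: it begins an asynchronous write of the file body and then blocks on Wait for the resulting file hash. Below is a rough, self-contained sketch of that BeginWrite/Wait shape, under the assumption that BeginWrite feeds the plan without blocking while Wait collects the plan's output; the real executor lives in the ioswitch exec package.

```go
package main

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

type planExec struct {
	pw   *io.PipeWriter
	done chan string
}

func newPlanExec() *planExec {
	pr, pw := io.Pipe()
	e := &planExec{pw: pw, done: make(chan string, 1)}
	go func() {
		// The "plan" side: consume the stream (e.g. a shard-store write)
		// and produce a result standing in for cdssdk.FileHash.
		h := sha256.New()
		io.Copy(h, pr)
		e.done <- hex.EncodeToString(h.Sum(nil))
	}()
	return e
}

// BeginWrite feeds the plan from the driver without blocking the caller.
func (e *planExec) BeginWrite(r io.Reader) {
	go func() {
		io.Copy(e.pw, r)
		e.pw.Close()
	}()
}

// Wait blocks until the plan finishes or the context is cancelled.
func (e *planExec) Wait(ctx context.Context) (string, error) {
	select {
	case h := <-e.done:
		return h, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	e := newPlanExec()
	e.BeginWrite(strings.NewReader("object bytes"))
	fmt.Println(e.Wait(context.Background()))
}
```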
diff --git a/common/pkgs/downloader/downloader.go b/common/pkgs/downloader/downloader.go
index 7c97d36..8d6fcc5 100644
--- a/common/pkgs/downloader/downloader.go
+++ b/common/pkgs/downloader/downloader.go
@@ -11,6 +11,7 @@ import (
 	stgmod "gitlink.org.cn/cloudream/storage/common/models"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
 	coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
 )
 
 const (
@@ -38,11 +39,12 @@ type Downloading struct {
 
 type Downloader struct {
 	strips *StripCache
-	conn   *connectivity.Collector
-	cfg    Config
+	cfg    Config
+	conn   *connectivity.Collector
+	stgMgr *mgr.Manager
 }
 
-func NewDownloader(cfg Config, conn *connectivity.Collector) Downloader {
+func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *mgr.Manager) Downloader {
 	if cfg.MaxStripCacheCount == 0 {
 		cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
 	}
@@ -50,8 +52,9 @@ func NewDownloader(cfg Config, conn *connectivity.Collector) Downloader {
 	ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount)
 	return Downloader{
 		strips: ch,
-		conn:   conn,
-		cfg:    cfg,
+		cfg:    cfg,
+		conn:   conn,
+		stgMgr: stgMgr,
 	}
 }
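NewDownloader sizes its strip cache with lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount), which matches the generic LRU from hashicorp/golang-lru/v2. Isolated, that cache looks like the following; the key and value types here are simplified stand-ins for the real ones:

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// ECStripKey identifies one strip of one object (simplified stand-in).
type ECStripKey struct {
	ObjectID   int64
	StripIndex int64
}

// ObjectECStrip holds the decoded strip bytes (simplified stand-in).
type ObjectECStrip struct {
	Data []byte
}

func main() {
	// Bounded at MaxStripCacheCount entries; least-recently-used strips
	// are evicted first.
	cache, err := lru.New[ECStripKey, ObjectECStrip](128)
	if err != nil {
		panic(err)
	}

	key := ECStripKey{ObjectID: 1, StripIndex: 0}
	cache.Add(key, ObjectECStrip{Data: []byte("strip bytes")})

	if strip, ok := cache.Get(key); ok {
		fmt.Printf("cache hit: %d bytes\n", len(strip.Data))
	}
}
```

Bounding the cache is what keeps strip prefetching from growing memory without limit, which is why the config falls back to DefaultMaxStripCacheCount when unset.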
diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go
index acd1856..8ca630f 100644
--- a/common/pkgs/downloader/iterator.go
+++ b/common/pkgs/downloader/iterator.go
@@ -205,7 +205,7 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2
 		return nil, fmt.Errorf("no storage has this object")
 	}
 
-	logger.Debugf("downloading object %v from storage %v(%v)", obj.Raw.ObjectID, stg)
+	logger.Debugf("downloading object %v from storage %v", obj.Raw.ObjectID, stg)
 
 	return iter.downloadFromStorage(stg, obj)
 }
@@ -224,7 +224,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
 		if i > 0 {
 			logStrs = append(logStrs, ", ")
 		}
-		logStrs = append(logStrs, fmt.Sprintf("%v@%v(%v)", b.Block.Index, b.Storage))
+		logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Block.Index, b.Storage))
 	}
 	logger.Debug(logStrs...)
 
@@ -237,7 +237,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
 	}
 
 	firstStripIndex := readPos / ecRed.StripSize()
-	stripIter := NewStripIterator(req.Detail.Object, blocks, ecRed, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
+	stripIter := NewStripIterator(iter.downloader, req.Detail.Object, blocks, ecRed, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
 	defer stripIter.Close()
 
 	for totalReadLen > 0 {
@@ -274,7 +274,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed
 		return nil, fmt.Errorf("not enough blocks to reconstruct the object %v, want %d, got only %d", req.Raw.ObjectID, ecRed.K, len(blocks))
 	}
 
-	logger.Debugf("downloading ec object %v from storage %v(%v)", req.Raw.ObjectID, stg)
+	logger.Debugf("downloading ec object %v from storage %v", req.Raw.ObjectID, stg)
 
 	return iter.downloadFromStorage(stg, req)
 }
@@ -395,7 +395,7 @@ func (iter *DownloadObjectIterator) downloadFromStorage(stg *stgmod.StorageDetai
 		len := req.Raw.Length
 		toExec.Range.Length = &len
 	}
-	// TODO: FileHash should be of type FileHash
+
 	ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, *stg.MasterHub, stg.Storage, -1)).AddTo(toExec)
 	strHandle = handle
 
@@ -405,9 +405,8 @@ func (iter *DownloadObjectIterator) downloadFromStorage(stg *stgmod.StorageDetai
 		return nil, fmt.Errorf("parsing plan: %w", err)
 	}
 
-	// TODO2: inject dependencies
 	exeCtx := exec.NewExecContext()
-
+	exec.SetValueByType(exeCtx, iter.downloader.stgMgr)
 	exec := plans.Execute(exeCtx)
 	go exec.Wait(context.TODO())
 
diff --git a/common/pkgs/downloader/lrc.go b/common/pkgs/downloader/lrc.go
index d795cbb..67fbac0 100644
--- a/common/pkgs/downloader/lrc.go
+++ b/common/pkgs/downloader/lrc.go
@@ -53,7 +53,7 @@ func (iter *DownloadObjectIterator) downloadLRCObject(req downloadReqeust2, red
 	}
 
 	firstStripIndex := readPos / int64(red.K) / int64(red.ChunkSize)
-	stripIter := NewLRCStripIterator(req.Detail.Object, blocks, red, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
+	stripIter := NewLRCStripIterator(iter.downloader, req.Detail.Object, blocks, red, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount)
 	defer stripIter.Close()
 
 	for totalReadLen > 0 {
diff --git a/common/pkgs/downloader/lrc_strip_iterator.go b/common/pkgs/downloader/lrc_strip_iterator.go
index 99e22ae..d11847f 100644
--- a/common/pkgs/downloader/lrc_strip_iterator.go
+++ b/common/pkgs/downloader/lrc_strip_iterator.go
@@ -14,6 +14,7 @@ import (
 )
 
 type LRCStripIterator struct {
+	downloader      *Downloader
 	object          cdssdk.Object
 	blocks          []downloadBlock
 	red             *cdssdk.LRCRedundancy
@@ -25,12 +26,13 @@ type LRCStripIterator struct {
 	inited          bool
 }
 
-func NewLRCStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator {
+func NewLRCStripIterator(downloader *Downloader, object cdssdk.Object, blocks []downloadBlock, red *cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator {
 	if maxPrefetch <= 0 {
 		maxPrefetch = 1
 	}
 
 	iter := &LRCStripIterator{
+		downloader:      downloader,
 		object:          object,
 		blocks:          blocks,
 		red:             red,
@@ -110,8 +112,8 @@ func (s *LRCStripIterator) downloading() {
 		return
 	}
 
-	// TODO2: inject dependencies
 	exeCtx := exec.NewExecContext()
+	exec.SetValueByType(exeCtx, s.downloader.stgMgr)
 
 	exec := plans.Execute(exeCtx)
 
diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go
index f4ae4bd..d593815 100644
--- a/common/pkgs/downloader/strip_iterator.go
+++ b/common/pkgs/downloader/strip_iterator.go
@@ -25,6 +25,7 @@ type Strip struct {
 }
 
 type StripIterator struct {
+	downloader      *Downloader
 	object          cdssdk.Object
 	blocks          []downloadBlock
 	red             *cdssdk.ECRedundancy
@@ -45,12 +46,13 @@ type dataChanEntry struct {
 	Error error
 }
 
-func NewStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
+func NewStripIterator(downloader *Downloader, object cdssdk.Object, blocks []downloadBlock, red *cdssdk.ECRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *StripIterator {
 	if maxPrefetch <= 0 {
 		maxPrefetch = 1
 	}
 
 	iter := &StripIterator{
+		downloader:      downloader,
 		object:          object,
 		blocks:          blocks,
 		red:             red,
@@ -210,9 +212,8 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
 		return 0, err
 	}
 
-	// TODO2: inject dependencies
 	exeCtx := exec.NewExecContext()
-
+	exec.SetValueByType(exeCtx, s.downloader.stgMgr)
 	exec := plans.Execute(exeCtx)
 
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go
index 610e325..6e98c98 100644
--- a/common/pkgs/ioswitch2/ops2/shard_store.go
+++ b/common/pkgs/ioswitch2/ops2/shard_store.go
@@ -40,7 +40,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
 		Debugf("reading from shard store")
 	defer logger.Debugf("reading from shard store finished")
 
-	stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
+	stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
 	if err != nil {
 		return fmt.Errorf("getting storage manager: %w", err)
 	}
@@ -82,7 +82,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
 		Debugf("writing file to shard store")
 	defer logger.Debugf("write to shard store finished")
 
-	stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
+	stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
 	if err != nil {
 		return fmt.Errorf("getting storage manager: %w", err)
 	}
diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go
index f21bebb..daae238 100644
--- a/common/pkgs/ioswitchlrc/ops2/shard_store.go
+++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go
@@ -40,7 +40,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
 		Debugf("reading from shard store")
 	defer logger.Debugf("reading from shard store finished")
 
-	stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
+	stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
 	if err != nil {
 		return fmt.Errorf("getting storage manager: %w", err)
 	}
@@ -82,7 +82,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
 		Debugf("writing file to shard store")
 	defer logger.Debugf("write to shard store finished")
 
-	stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
+	stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
 	if err != nil {
 		return fmt.Errorf("getting storage manager: %w", err)
 	}
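Both ops2 packages now call exec.GetValueByType and fail fast when nothing of the requested type was injected. The stand-in below reproduces that failure path, which is the reason every Run/Execute call site touched by this patch calls exec.SetValueByType first (stand-in types only, not the real exec package):

```go
package main

import (
	"fmt"
	"reflect"
)

type ExecContext struct{ values map[reflect.Type]any }

func GetValueByType[T any](ctx *ExecContext) (T, error) {
	key := reflect.TypeOf((*T)(nil)).Elem()
	if v, ok := ctx.values[key]; ok {
		return v.(T), nil
	}
	var zero T
	return zero, fmt.Errorf("no value of type %v", key)
}

// Manager stands in for mgr.Manager.
type Manager struct{}

type ShardRead struct{}

func (o *ShardRead) Execute(ctx *ExecContext) error {
	stgMgr, err := GetValueByType[*Manager](ctx)
	if err != nil {
		return fmt.Errorf("getting storage manager: %w", err)
	}
	_ = stgMgr // ...open the shard store and stream the file...
	return nil
}

func main() {
	// An empty context reproduces the error that every caller in this
	// patch now avoids by injecting the manager up front.
	err := (&ShardRead{}).Execute(&ExecContext{values: map[reflect.Type]any{}})
	fmt.Println(err) // getting storage manager: no value of type *main.Manager
}
```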
diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go
index eb2f7e3..11321ca 100644
--- a/common/pkgs/storage/local/shard_store.go
+++ b/common/pkgs/storage/local/shard_store.go
@@ -36,10 +36,11 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor
 }
 
 func (s *ShardStore) Start(ch *types.StorageEventChan) {
-
+	logger.Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize)
 }
 
 func (s *ShardStore) Stop() {
+	logger.Infof("local shard store stop")
 }
 
 func (s *ShardStore) New() types.ShardWriter {
diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go
index d5db4c0..d47040f 100644
--- a/scanner/internal/event/check_package_redundancy.go
+++ b/scanner/internal/event/check_package_redundancy.go
@@ -147,43 +147,43 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
 			switch newRed := newRed.(type) {
 			case *cdssdk.RepRedundancy:
 				log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
-				updating, err = t.noneToRep(obj, newRed, newRepStgs)
+				updating, err = t.noneToRep(execCtx, obj, newRed, newRepStgs)
 
 			case *cdssdk.ECRedundancy:
 				log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
-				updating, err = t.noneToEC(obj, newRed, newECStgs)
+				updating, err = t.noneToEC(execCtx, obj, newRed, newECStgs)
 
 			case *cdssdk.LRCRedundancy:
 				log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
-				updating, err = t.noneToLRC(obj, newRed, selectedNodes)
+				updating, err = t.noneToLRC(execCtx, obj, newRed, selectedNodes)
 			}
 
 		case *cdssdk.RepRedundancy:
 			switch newRed := newRed.(type) {
 			case *cdssdk.RepRedundancy:
-				updating, err = t.repToRep(obj, srcRed, rechoosedRepStgs)
+				updating, err = t.repToRep(execCtx, obj, srcRed, rechoosedRepStgs)
 
 			case *cdssdk.ECRedundancy:
 				log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
-				updating, err = t.repToEC(obj, newRed, newECStgs)
+				updating, err = t.repToEC(execCtx, obj, newRed, newECStgs)
 			}
 
 		case *cdssdk.ECRedundancy:
 			switch newRed := newRed.(type) {
 			case *cdssdk.RepRedundancy:
 				log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
-				updating, err = t.ecToRep(obj, srcRed, newRed, newRepStgs)
+				updating, err = t.ecToRep(execCtx, obj, srcRed, newRed, newRepStgs)
 
 			case *cdssdk.ECRedundancy:
 				uploadNodes := t.rechooseNodesForEC(obj, srcRed, userAllStorages)
-				updating, err = t.ecToEC(obj, srcRed, newRed, uploadNodes)
+				updating, err = t.ecToEC(execCtx, obj, srcRed, newRed, uploadNodes)
 			}
 
 		case *cdssdk.LRCRedundancy:
 			switch newRed := newRed.(type) {
 			case *cdssdk.LRCRedundancy:
 				uploadNodes := t.rechooseNodesForLRC(obj, srcRed, userAllStorages)
-				updating, err = t.lrcToLRC(obj, srcRed, newRed, uploadNodes)
+				updating, err = t.lrcToLRC(execCtx, obj, srcRed, newRed, uploadNodes)
 			}
 		}
 
@@ -426,7 +426,7 @@ func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, stgs []*StorageLoa
 	return chosen
 }
 
-func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	if len(obj.Blocks) == 0 {
 		return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
 	}
@@ -465,7 +465,9 @@ func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.
 	}
 
 	// TODO: inject dependencies
-	ret, err := plans.Execute(exec.NewExecContext()).Wait(context.Background())
+	execCtx := exec.NewExecContext()
+	exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+	ret, err := plans.Execute(execCtx).Wait(context.Background())
 	if err != nil {
 		return nil, fmt.Errorf("executing io plan: %w", err)
 	}
@@ -487,7 +489,7 @@ func (t *CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.
 	}, nil
 }
 
-func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -521,8 +523,9 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E
 		return nil, fmt.Errorf("parsing plan: %w", err)
 	}
 
-	// TODO: inject dependencies
-	ioRet, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
+	execCtx := exec.NewExecContext()
+	exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+	ioRet, err := plans.Execute(execCtx).Wait(context.Background())
 	if err != nil {
 		return nil, fmt.Errorf("executing io plan: %w", err)
 	}
@@ -544,7 +547,7 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E
 	}, nil
 }
 
-func (t *CheckPackageRedundancy) noneToLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -577,8 +580,9 @@ func (t *CheckPackageRedundancy) noneToLRC(obj stgmod.ObjectDetail, red *cdssdk.
 		return nil, fmt.Errorf("parsing plan: %w", err)
 	}
 
-	// TODO: inject dependencies
-	ioRet, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
+	execCtx := exec.NewExecContext()
+	exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+	ioRet, err := plans.Execute(execCtx).Wait(context.Background())
 	if err != nil {
 		return nil, fmt.Errorf("executing io plan: %w", err)
 	}
@@ -600,7 +604,7 @@
 	}, nil
 }
 
-func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	if len(obj.Blocks) == 0 {
 		return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep")
 	}
@@ -639,7 +643,9 @@ func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.R
 	}
 
 	// TODO: inject dependencies
-	ret, err := plans.Execute(exec.NewExecContext()).Wait(context.Background())
+	execCtx := exec.NewExecContext()
+	exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+	ret, err := plans.Execute(execCtx).Wait(context.Background())
 	if err != nil {
 		return nil, fmt.Errorf("executing io plan: %w", err)
 	}
@@ -661,11 +667,11 @@ func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.R
 	}, nil
 }
 
-func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
-	return t.noneToEC(obj, red, uploadNodes)
+func (t *CheckPackageRedundancy) repToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+	return t.noneToEC(ctx, obj, red, uploadNodes)
 }
 
-func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadStgs []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -715,7 +721,9 @@
 	}
 
 	// TODO: inject dependencies
-	ioRet, err := planBlder.Execute(exec.NewExecContext()).Wait(context.TODO())
+	execCtx := exec.NewExecContext()
+	exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+	ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
 	if err != nil {
 		return nil, fmt.Errorf("executing io plan: %w", err)
 	}
@@ -737,7 +745,7 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk
 	}, nil
 }
 
-func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -805,8 +813,9 @@
 	}
 
 	// If no plan was generated, Wait returns success immediately
-	// TODO: inject dependencies
-	ret, err := planBlder.Execute(exec.NewExecContext()).Wait(context.TODO())
+	execCtx := exec.NewExecContext()
+	exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+	ret, err := planBlder.Execute(execCtx).Wait(context.Background())
 	if err != nil {
 		return nil, fmt.Errorf("executing io plan: %w", err)
 	}
@@ -831,7 +840,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.
 	}, nil
 }
 
-func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) lrcToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	coorCli, err := stgglb.CoordinatorMQPool.Acquire()
 	if err != nil {
 		return nil, fmt.Errorf("new coordinator client: %w", err)
@@ -871,10 +880,12 @@ func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssd
 		// return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadNodes)
 	}
 
-	return t.reconstructLRC(obj, blocksGrpByIndex, srcRed, uploadNodes)
+	return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadNodes)
 }
 
 /*
+TODO2: fix this block of code
+
 func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	grped := make(map[int]stgmod.GrouppedObjectBlock)
 	for _, b := range grpedBlocks {
@@ -938,7 +949,7 @@ func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssd
 	}, nil
 }
 */
-func (t *CheckPackageRedundancy) reconstructLRC(obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
+func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) {
 	var chosenBlocks []stgmod.GrouppedObjectBlock
 	for _, block := range grpBlocks {
 		if len(block.StorageIDs) > 0 && block.Index < red.M() {
@@ -1001,8 +1012,9 @@ func (t *CheckPackageRedundancy) reconstructLRC(obj stgmod.ObjectDetail, grpBloc
 	fmt.Printf("plans: %v\n", planBlder)
 
 	// If no plan was generated, Wait returns success immediately
-	// TODO: inject dependencies
-	ret, err := planBlder.Execute(exec.NewExecContext()).Wait(context.TODO())
+	execCtx := exec.NewExecContext()
+	exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+	ret, err := planBlder.Execute(execCtx).Wait(context.Background())
 	if err != nil {
 		return nil, fmt.Errorf("executing io plan: %w", err)
 	}
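The bulk of the scanner change above is mechanical: every redundancy-transition helper gains the event's ExecuteContext as its first parameter so it can reach ctx.Args.StgMgr when building an exec context. A stand-in sketch of that threading (simplified types, not the real scanner event package):

```go
package main

import "fmt"

// Stand-ins for the scanner event types.
type StgManager struct{}

type ExecuteArgs struct {
	StgMgr *StgManager
}

type ExecuteContext struct {
	Args *ExecuteArgs
}

type CheckPackageRedundancy struct{}

// Each transition helper now receives ctx explicitly instead of building an
// empty exec context, so the manager can be injected before Wait is called.
func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext) error {
	fmt.Println("injecting manager:", ctx.Args.StgMgr)
	return nil
}

func main() {
	ctx := ExecuteContext{Args: &ExecuteArgs{StgMgr: &StgManager{}}}
	_ = (&CheckPackageRedundancy{}).noneToRep(ctx)
}
```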
diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go
index 7035411..a52e81e 100644
--- a/scanner/internal/event/clean_pinned.go
+++ b/scanner/internal/event/clean_pinned.go
@@ -817,13 +817,13 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg
 	return entry
 }
 
-func (t *CleanPinned) executePlans(execCtx ExecuteContext, planBld *exec.PlanBuilder, planningStgIDs map[cdssdk.StorageID]bool) (map[string]exec.VarValue, error) {
+func (t *CleanPinned) executePlans(ctx ExecuteContext, planBld *exec.PlanBuilder, planningStgIDs map[cdssdk.StorageID]bool) (map[string]exec.VarValue, error) {
 	// Acquire all the locks at once; duplicates are harmless
 	lockBld := reqbuilder.NewBuilder()
 	for id := range planningStgIDs {
 		lockBld.Shard().Buzy(id)
 	}
-	lock, err := lockBld.MutexLock(execCtx.Args.DistLock)
+	lock, err := lockBld.MutexLock(ctx.Args.DistLock)
 	if err != nil {
 		return nil, fmt.Errorf("acquiring distlock: %w", err)
 	}
@@ -838,8 +838,9 @@ func (t *CleanPinned) executePlans(execCtx ExecuteContext, planBld *exec.PlanBui
 	go func() {
 		defer wg.Done()
 
-		// TODO: inject dependencies
-		ret, err := planBld.Execute(exec.NewExecContext()).Wait(context.TODO())
+		execCtx := exec.NewExecContext()
+		exec.SetValueByType(execCtx, ctx.Args.StgMgr)
+		ret, err := planBld.Execute(execCtx).Wait(context.TODO())
 		if err != nil {
 			ioSwErr = fmt.Errorf("executing io switch plan: %w", err)
 			return
diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go
index 23a2d3d..9d643f3 100644
--- a/scanner/internal/event/event.go
+++ b/scanner/internal/event/event.go
@@ -9,11 +9,13 @@ import (
 	"gitlink.org.cn/cloudream/common/pkgs/typedispatcher"
 	"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
 	scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
 )
 
 type ExecuteArgs struct {
 	DB       *db2.DB
 	DistLock *distlock.Service
+	StgMgr   *mgr.Manager
 }
 
 type Executor = event.Executor[ExecuteArgs]
@@ -24,10 +26,11 @@ type Event = event.Event[ExecuteArgs]
 
 type ExecuteOption = event.ExecuteOption
 
-func NewExecutor(db *db2.DB, distLock *distlock.Service) Executor {
+func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *mgr.Manager) Executor {
 	return event.NewExecutor(ExecuteArgs{
 		DB:       db,
 		DistLock: distLock,
+		StgMgr:   stgMgr,
 	})
 }
diff --git a/scanner/main.go b/scanner/main.go
index 68c87e1..b68d17e 100644
--- a/scanner/main.go
+++ b/scanner/main.go
@@ -10,6 +10,7 @@ import (
 	"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
 	agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
 	scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
+	"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
 	"gitlink.org.cn/cloudream/storage/scanner/internal/config"
 	"gitlink.org.cn/cloudream/storage/scanner/internal/event"
 	"gitlink.org.cn/cloudream/storage/scanner/internal/mq"
@@ -38,6 +39,7 @@ func main() {
 
 	stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})
 
+	// Start the distributed lock service
 	distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
 	if err != nil {
 		logger.Warnf("new distlock service failed, err: %s", err.Error())
@@ -45,7 +47,11 @@ func main() {
 	}
 	go serveDistLock(distlockSvc)
 
-	eventExecutor := event.NewExecutor(db, distlockSvc)
+	// Start the storage service manager
+	stgMgr := mgr.NewManager()
+
+	// Start the event executor
+	eventExecutor := event.NewExecutor(db, distlockSvc, stgMgr)
 	go serveEventExecutor(&eventExecutor)
 
 	agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
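For clean_pinned.go's executePlans above, the concurrency shape is: take the shard locks, run the IO plan in a goroutine, and join on a WaitGroup before returning its result. A simplified, self-contained sketch with stand-ins for the distlock and plan machinery:

```go
package main

import (
	"fmt"
	"sync"
)

// executePlans' shape: hold the lock for the duration, run the plan in a
// goroutine, and join on a WaitGroup before surfacing its error.
func executePlans(lock sync.Locker, run func() error) error {
	lock.Lock()
	defer lock.Unlock()

	var (
		wg     sync.WaitGroup
		runErr error
	)
	wg.Add(1)
	go func() {
		defer wg.Done()
		runErr = run()
	}()

	wg.Wait()
	return runErr
}

func main() {
	var mu sync.Mutex // stands in for the reqbuilder mutex lock
	err := executePlans(&mu, func() error {
		fmt.Println("executing io switch plan")
		return nil
	})
	fmt.Println("done, err =", err)
}
```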