From 15f7e130a8e0243a5c4802c610facede825054f7 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 8 Apr 2025 16:47:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=89=80=E6=9C=89=E7=BC=96?= =?UTF-8?q?=E8=AF=91=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/cmd/serve.go | 213 ++++++++---------- agent/internal/config/config.go | 22 +- agent/internal/grpc/service.go | 6 +- agent/internal/http/service.go | 6 +- agent/internal/mq/cache.go | 81 ------- agent/internal/mq/service.go | 14 +- agent/internal/mq/storage.go | 2 + agent/internal/task/cache_move_package.go | 2 + agent/internal/task/create_package.go | 2 + agent/internal/task/task.go | 2 + agent/internal/tickevent/report_hub_stats.go | 2 + .../tickevent/report_storage_stats.go | 2 + common/pkgs/mq/agent/cache.go | 124 ---------- common/pkgs/mq/agent/server.go | 2 +- common/pkgs/mq/coordinator/hub.go | 27 +++ coordinator/internal/mq/hub.go | 12 + 16 files changed, 160 insertions(+), 359 deletions(-) delete mode 100644 agent/internal/mq/cache.go delete mode 100644 common/pkgs/mq/agent/cache.go diff --git a/agent/internal/cmd/serve.go b/agent/internal/cmd/serve.go index 45604f2..5a3417a 100644 --- a/agent/internal/cmd/serve.go +++ b/agent/internal/cmd/serve.go @@ -5,30 +5,21 @@ import ( "fmt" "net" "os" - "time" "github.com/go-co-op/gocron/v2" "gitlink.org.cn/cloudream/storage2/agent/internal/http" - "gitlink.org.cn/cloudream/storage2/agent/internal/tickevent" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage2/agent/internal/config" - "gitlink.org.cn/cloudream/storage2/agent/internal/task" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat" - "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/storage2/common/models/datamap" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" - "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" + cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" "google.golang.org/grpc" @@ -52,83 +43,75 @@ func serve(configPath string) { os.Exit(1) } - stgglb.InitLocal(&config.Cfg().Local) + stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) - stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) - stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) - + // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) + // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 获取Hub配置 hubCfg := downloadHubConfig() // 初始化存储服务管理器 - stgAgts := agtpool.NewPool() - for _, stg := range hubCfg.Storages { - err := stgAgts.SetupAgent(stg) - if err != nil { - fmt.Printf("init storage %v: %v", stg.Storage.String(), err) - os.Exit(1) - } - } + stgPool := pool.NewPool() // 初始化执行器 worker := exec.NewWorker() // 初始化HTTP服务 - httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgAgts)) + httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgPool)) if err != nil { logger.Fatalf("new http server failed, err: %s", err.Error()) } go serveHTTP(httpSvr) // 启动网络连通性检测,并就地检测一次 - conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { - log := logger.WithField("Connectivity", "") - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - log.Warnf("acquire coordinator mq failed, err: %s", err.Error()) - return - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - cons := collector.GetAll() - hubCons := make([]cdssdk.HubConnectivity, 0, len(cons)) - for _, con := range cons { - var delay *float32 - if con.Latency != nil { - v := float32(con.Latency.Microseconds()) / 1000 - delay = &v - } - - hubCons = append(hubCons, cdssdk.HubConnectivity{ - FromHubID: *stgglb.Local.HubID, - ToHubID: con.ToHubID, - Latency: delay, - TestTime: con.TestTime, - }) - } - - _, err = coorCli.UpdateHubConnectivities(coormq.ReqUpdateHubConnectivities(hubCons)) - if err != nil { - log.Warnf("update hub connectivities: %v", err) - } - }) - conCol.CollectInPlace() + // conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { + // log := logger.WithField("Connectivity", "") + + // coorCli, err := stgglb.CoordinatorMQPool.Acquire() + // if err != nil { + // log.Warnf("acquire coordinator mq failed, err: %s", err.Error()) + // return + // } + // defer stgglb.CoordinatorMQPool.Release(coorCli) + + // cons := collector.GetAll() + // hubCons := make([]cortypes.HubConnectivity, 0, len(cons)) + // for _, con := range cons { + // var delay *float32 + // if con.Latency != nil { + // v := float32(con.Latency.Microseconds()) / 1000 + // delay = &v + // } + + // hubCons = append(hubCons, cortypes.HubConnectivity{ + // FromHubID: *stgglb.Local.HubID, + // ToHubID: con.ToHubID, + // Latency: delay, + // TestTime: con.TestTime, + // }) + // } + + // _, err = coorCli.UpdateHubConnectivities(coormq.ReqUpdateHubConnectivities(hubCons)) + // if err != nil { + // log.Warnf("update hub connectivities: %v", err) + // } + // }) + // conCol.CollectInPlace() // 初始化元数据缓存服务 - metacacheHost := metacache.NewHost() - go metacacheHost.Serve() - stgMeta := metacacheHost.AddStorageMeta() - hubMeta := metacacheHost.AddHubMeta() - conMeta := metacacheHost.AddConnectivity() + // metacacheHost := metacache.NewHost() + // go metacacheHost.Serve() + // stgMeta := metacacheHost.AddStorageMeta() + // hubMeta := metacacheHost.AddHubMeta() + // conMeta := metacacheHost.AddConnectivity() // 启动访问统计服务 - acStat := accessstat.NewAccessStat(accessstat.Config{ - // TODO 考虑放到配置里 - ReportInterval: time.Second * 10, - }) - go serveAccessStat(acStat) + // acStat := accessstat.NewAccessStat(accessstat.Config{ + // // TODO 考虑放到配置里 + // ReportInterval: time.Second * 10, + // }) + // go serveAccessStat(acStat) // 初始化分布式锁服务 distlock, err := distlock.NewService(&config.Cfg().DistLock) @@ -136,20 +119,8 @@ func serve(configPath string) { logger.Fatalf("new ipfs failed, err: %s", err.Error()) } - // 初始化下载策略选择器 - strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) - - // 初始化下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) - - // 初始化上传器 - uploader := uploader.NewUploader(distlock, &conCol, stgAgts, stgMeta) - - // 初始化任务管理器 - taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgAgts, uploader) - // 初始化系统事件发布器 - evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceHub{ + evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceHub{ HubID: hubCfg.Hub.HubID, HubName: hubCfg.Hub.Name, }) @@ -160,13 +131,13 @@ func serve(configPath string) { go servePublisher(evtPub) // 初始化定时任务执行器 - sch := setupTickTask(stgAgts, evtPub) + sch := setupTickTask(stgPool, evtPub) sch.Start() defer sch.Shutdown() // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgAgts, uploader), config.Cfg().ID, config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(stgPool), config.Cfg().ID, config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -182,7 +153,7 @@ func serve(configPath string) { logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } s := grpc.NewServer() - agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgAgts)) + agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgPool)) go serveGRPC(s, lis) go serveDistLock(distlock) @@ -199,7 +170,7 @@ func downloadHubConfig() coormq.GetHubConfigResp { } defer stgglb.CoordinatorMQPool.Release(coorCli) - cfgResp, err := coorCli.GetHubConfig(coormq.ReqGetHubConfig(cdssdk.HubID(config.Cfg().ID))) + cfgResp, err := coorCli.GetHubConfig(coormq.ReqGetHubConfig(cortypes.HubID(config.Cfg().ID))) if err != nil { logger.Errorf("getting hub config: %v", err) os.Exit(1) @@ -243,30 +214,24 @@ loop: os.Exit(1) } -func setupTickTask(agtPool *agtpool.AgentPool, evtPub *sysevent.Publisher) gocron.Scheduler { +func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler { sch, err := gocron.NewScheduler() if err != nil { logger.Errorf("new cron scheduler: %s", err.Error()) os.Exit(1) } - sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( - gocron.NewAtTime(0, 0, 0), - )), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) - - sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( - gocron.NewAtTime(0, 0, 1), - )), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) + // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( + // gocron.NewAtTime(0, 0, 0), + // )), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) - sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( - gocron.NewAtTime(0, 0, 2), - )), gocron.NewTask(tickevent.ReportHubStorageTransferStats, evtPub)) + // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( + // gocron.NewAtTime(0, 0, 1), + // )), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) - // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) - - // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) - - // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportHubStorageTransferStats, agtPool, evtPub)) + // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( + // gocron.NewAtTime(0, 0, 2), + // )), gocron.NewTask(tickevent.ReportHubStorageTransferStats, evtPub)) return sch } @@ -346,26 +311,26 @@ func serveDistLock(svc *distlock.Service) { os.Exit(1) } -func serveAccessStat(svc *accessstat.AccessStat) { - logger.Info("start serving access stat") - - ch := svc.Start() -loop: - for { - val, err := ch.Receive() - if err != nil { - logger.Errorf("access stat stopped with error: %v", err) - break - } - - switch val := val.(type) { - case error: - logger.Errorf("access stat stopped with error: %v", val) - break loop - } - } - logger.Info("access stat stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} +// func serveAccessStat(svc *accessstat.AccessStat) { +// logger.Info("start serving access stat") + +// ch := svc.Start() +// loop: +// for { +// val, err := ch.Receive() +// if err != nil { +// logger.Errorf("access stat stopped with error: %v", err) +// break +// } + +// switch val := val.(type) { +// case error: +// logger.Errorf("access stat stopped with error: %v", val) +// break loop +// } +// } +// logger.Info("access stat stopped") + +// // TODO 仅简单结束了程序 +// os.Exit(1) +// } diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index ff8ef99..a24ec0e 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -6,24 +6,20 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" c "gitlink.org.cn/cloudream/common/utils/config" - stgmodels "gitlink.org.cn/cloudream/storage2/common/models" + stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc" ) type Config struct { - ID cdssdk.HubID `json:"id"` - ListenAddr string `json:"listenAddr"` - Local stgmodels.LocalMachineInfo `json:"local"` - GRPC *grpc.Config `json:"grpc"` - Logger log.Config `json:"logger"` - RabbitMQ mq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` - Connectivity connectivity.Config `json:"connectivity"` - Downloader downloader.Config `json:"downloader"` - DownloadStrategy strategy.Config `json:"downloadStrategy"` + ID cdssdk.HubID `json:"id"` + ListenAddr string `json:"listenAddr"` + Local stgglb.LocalMachineInfo `json:"local"` + GRPC *grpc.Config `json:"grpc"` + Logger log.Config `json:"logger"` + RabbitMQ mq.Config `json:"rabbitMQ"` + DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` } var cfg Config diff --git a/agent/internal/grpc/service.go b/agent/internal/grpc/service.go index 1b63b9e..3cee6f8 100644 --- a/agent/internal/grpc/service.go +++ b/agent/internal/grpc/service.go @@ -3,16 +3,16 @@ package grpc import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" agentserver "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) type Service struct { agentserver.AgentServer swWorker *exec.Worker - stgAgts *agtpool.AgentPool + stgAgts *pool.Pool } -func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service { +func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service { return &Service{ swWorker: swWorker, stgAgts: stgAgts, diff --git a/agent/internal/http/service.go b/agent/internal/http/service.go index 875ef37..e7f413c 100644 --- a/agent/internal/http/service.go +++ b/agent/internal/http/service.go @@ -2,15 +2,15 @@ package http import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) type Service struct { swWorker *exec.Worker - stgAgts *agtpool.AgentPool + stgAgts *pool.Pool } -func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service { +func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service { return &Service{ swWorker: swWorker, stgAgts: stgAgts, diff --git a/agent/internal/mq/cache.go b/agent/internal/mq/cache.go deleted file mode 100644 index d7de1d2..0000000 --- a/agent/internal/mq/cache.go +++ /dev/null @@ -1,81 +0,0 @@ -package mq - -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - mytask "gitlink.org.cn/cloudream/storage2/agent/internal/task" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" -) - -func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { - store, err := svc.stgAgts.GetShardStore(msg.StorageID) - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err)) - } - - infos, err := store.ListAll() - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("listting file in shard store: %v", err)) - } - - var fileHashes []cdssdk.FileHash - for _, info := range infos { - fileHashes = append(fileHashes, info.Hash) - } - - return mq.ReplyOK(agtmq.NewCheckCacheResp(fileHashes)) -} - -func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) { - store, err := svc.stgAgts.GetShardStore(msg.StorageID) - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err)) - } - - err = store.GC(msg.Avaiables) - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("purging cache: %v", err)) - } - - return mq.ReplyOK(agtmq.RespCacheGC()) -} - -func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) { - tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.StorageID)) - return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID())) -} - -func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) { - tsk := svc.taskManager.FindByID(msg.TaskID) - if tsk == nil { - return nil, mq.Failed(errorcode.TaskNotFound, "task not found") - } - - if msg.WaitTimeoutMs == 0 { - tsk.Wait() - - errMsg := "" - if tsk.Error() != nil { - errMsg = tsk.Error().Error() - } - - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) - - } else { - if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { - - errMsg := "" - if tsk.Error() != nil { - errMsg = tsk.Error().Error() - } - - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) - } - - return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) - } -} diff --git a/agent/internal/mq/service.go b/agent/internal/mq/service.go index f55caa7..eb0ab15 100644 --- a/agent/internal/mq/service.go +++ b/agent/internal/mq/service.go @@ -1,21 +1,15 @@ package mq import ( - "gitlink.org.cn/cloudream/storage2/agent/internal/task" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" - "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) type Service struct { - taskManager *task.Manager - stgAgts *agtpool.AgentPool - uploader *uploader.Uploader + stgAgts *pool.Pool } -func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool, uplodaer *uploader.Uploader) *Service { +func NewService(stgAgts *pool.Pool) *Service { return &Service{ - taskManager: taskMgr, - stgAgts: stgAgts, - uploader: uplodaer, + stgAgts: stgAgts, } } diff --git a/agent/internal/mq/storage.go b/agent/internal/mq/storage.go index a3c01bb..908bd08 100644 --- a/agent/internal/mq/storage.go +++ b/agent/internal/mq/storage.go @@ -1,5 +1,6 @@ package mq +/* import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -55,3 +56,4 @@ func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtm return mq.ReplyOK(agtmq.RespStorageCreatePackage(createResp.Package)) } +*/ diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index bb5bf85..8e3843b 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -1,5 +1,6 @@ package task +/* import ( "fmt" "time" @@ -88,3 +89,4 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { return nil } +*/ diff --git a/agent/internal/task/create_package.go b/agent/internal/task/create_package.go index 507d3b7..568ffcd 100644 --- a/agent/internal/task/create_package.go +++ b/agent/internal/task/create_package.go @@ -1,5 +1,6 @@ package task +/* import ( "fmt" "path/filepath" @@ -146,3 +147,4 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c RemovingDelay: time.Minute, }) } +*/ diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index 9def3b5..379346e 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -1,5 +1,6 @@ package task +/* import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" @@ -45,3 +46,4 @@ func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector uploader: uploader, }) } +*/ diff --git a/agent/internal/tickevent/report_hub_stats.go b/agent/internal/tickevent/report_hub_stats.go index e24859f..60d67d3 100644 --- a/agent/internal/tickevent/report_hub_stats.go +++ b/agent/internal/tickevent/report_hub_stats.go @@ -1,5 +1,6 @@ package tickevent +/* import ( "gitlink.org.cn/cloudream/common/utils/math2" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" @@ -59,3 +60,4 @@ func ReportHubStorageTransferStats(stgAgts *agtpool.AgentPool, evtPub *sysevent. }) } } +*/ diff --git a/agent/internal/tickevent/report_storage_stats.go b/agent/internal/tickevent/report_storage_stats.go index 35b2f65..1ea1974 100644 --- a/agent/internal/tickevent/report_storage_stats.go +++ b/agent/internal/tickevent/report_storage_stats.go @@ -1,5 +1,6 @@ package tickevent +/* import ( stgmod "gitlink.org.cn/cloudream/storage2/common/models" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" @@ -21,3 +22,4 @@ func ReportStorageStats(agtPool *agtpool.AgentPool, evtPub *sysevent.Publisher) }) } } +*/ diff --git a/common/pkgs/mq/agent/cache.go b/common/pkgs/mq/agent/cache.go deleted file mode 100644 index 5182c22..0000000 --- a/common/pkgs/mq/agent/cache.go +++ /dev/null @@ -1,124 +0,0 @@ -package agent - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -type CacheService interface { - CheckCache(msg *CheckCache) (*CheckCacheResp, *mq.CodeMessage) - - CacheGC(msg *CacheGC) (*CacheGCResp, *mq.CodeMessage) - - StartCacheMovePackage(msg *StartCacheMovePackage) (*StartCacheMovePackageResp, *mq.CodeMessage) - WaitCacheMovePackage(msg *WaitCacheMovePackage) (*WaitCacheMovePackageResp, *mq.CodeMessage) -} - -// 检查节点上的IPFS -var _ = Register(Service.CheckCache) - -type CheckCache struct { - mq.MessageBodyBase - StorageID cdssdk.StorageID `json:"storageID"` -} -type CheckCacheResp struct { - mq.MessageBodyBase - FileHashes []cdssdk.FileHash `json:"fileHashes"` -} - -func NewCheckCache(stgID cdssdk.StorageID) *CheckCache { - return &CheckCache{StorageID: stgID} -} -func NewCheckCacheResp(fileHashes []cdssdk.FileHash) *CheckCacheResp { - return &CheckCacheResp{ - FileHashes: fileHashes, - } -} -func (client *Client) CheckCache(msg *CheckCache, opts ...mq.RequestOption) (*CheckCacheResp, error) { - return mq.Request(Service.CheckCache, client.rabbitCli, msg, opts...) -} - -// 清理Cache中不用的文件 -var _ = Register(Service.CacheGC) - -type CacheGC struct { - mq.MessageBodyBase - StorageID cdssdk.StorageID `json:"storageID"` - Avaiables []cdssdk.FileHash `json:"avaiables"` -} -type CacheGCResp struct { - mq.MessageBodyBase -} - -func ReqCacheGC(stgID cdssdk.StorageID, avaiables []cdssdk.FileHash) *CacheGC { - return &CacheGC{ - StorageID: stgID, - Avaiables: avaiables, - } -} -func RespCacheGC() *CacheGCResp { - return &CacheGCResp{} -} -func (client *Client) CacheGC(msg *CacheGC, opts ...mq.RequestOption) (*CacheGCResp, error) { - return mq.Request(Service.CacheGC, client.rabbitCli, msg, opts...) -} - -// 将Package的缓存移动到这个节点 -var _ = Register(Service.StartCacheMovePackage) - -type StartCacheMovePackage struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - PackageID cdssdk.PackageID `json:"packageID"` - StorageID cdssdk.StorageID `json:"storageID"` -} -type StartCacheMovePackageResp struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` -} - -func NewStartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *StartCacheMovePackage { - return &StartCacheMovePackage{ - UserID: userID, - PackageID: packageID, - StorageID: stgID, - } -} -func NewStartCacheMovePackageResp(taskID string) *StartCacheMovePackageResp { - return &StartCacheMovePackageResp{ - TaskID: taskID, - } -} -func (client *Client) StartCacheMovePackage(msg *StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) { - return mq.Request(Service.StartCacheMovePackage, client.rabbitCli, msg, opts...) -} - -// 将Package的缓存移动到这个节点 -var _ = Register(Service.WaitCacheMovePackage) - -type WaitCacheMovePackage struct { - mq.MessageBodyBase - TaskID string `json:"taskID"` - WaitTimeoutMs int64 `json:"waitTimeout"` -} -type WaitCacheMovePackageResp struct { - mq.MessageBodyBase - IsComplete bool `json:"isComplete"` - Error string `json:"error"` -} - -func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage { - return &WaitCacheMovePackage{ - TaskID: taskID, - WaitTimeoutMs: waitTimeoutMs, - } -} -func NewWaitCacheMovePackageResp(isComplete bool, err string) *WaitCacheMovePackageResp { - return &WaitCacheMovePackageResp{ - IsComplete: isComplete, - Error: err, - } -} -func (client *Client) WaitCacheMovePackage(msg *WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) { - return mq.Request(Service.WaitCacheMovePackage, client.rabbitCli, msg, opts...) -} diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index 09b7eda..9a8b090 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -10,7 +10,7 @@ import ( type Service interface { // UserSpaceService - CacheService + // CacheService AgentService } diff --git a/common/pkgs/mq/coordinator/hub.go b/common/pkgs/mq/coordinator/hub.go index 5c8505e..d634c0b 100644 --- a/common/pkgs/mq/coordinator/hub.go +++ b/common/pkgs/mq/coordinator/hub.go @@ -6,11 +6,38 @@ import ( ) type HubService interface { + GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, *mq.CodeMessage) + GetHubs(msg *GetHubs) (*GetHubsResp, *mq.CodeMessage) GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *mq.CodeMessage) } +var _ = Register(Service.GetHubConfig) + +type GetHubConfig struct { + mq.MessageBodyBase + HubID cortypes.HubID `json:"hubID"` +} +type GetHubConfigResp struct { + mq.MessageBodyBase + Hub cortypes.Hub `json:"hub"` +} + +func ReqGetHubConfig(hubID cortypes.HubID) *GetHubConfig { + return &GetHubConfig{ + HubID: hubID, + } +} +func RespGetHubConfig(hub cortypes.Hub) *GetHubConfigResp { + return &GetHubConfigResp{ + Hub: hub, + } +} +func (client *Client) GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, error) { + return mq.Request(Service.GetHubConfig, client.rabbitCli, msg) +} + // 获取指定节点的信息。如果HubIDs为nil,则返回所有Hub var _ = Register(Service.GetHubs) diff --git a/coordinator/internal/mq/hub.go b/coordinator/internal/mq/hub.go index e7b1d8d..a91f7ed 100644 --- a/coordinator/internal/mq/hub.go +++ b/coordinator/internal/mq/hub.go @@ -10,6 +10,18 @@ import ( cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) +func (svc *Service) GetHubConfig(msg *coormq.GetHubConfig) (*coormq.GetHubConfigResp, *mq.CodeMessage) { + log := logger.WithField("HubID", msg.HubID) + + hub, err := svc.db.Hub().GetByID(svc.db.DefCtx(), msg.HubID) + if err != nil { + log.Warnf("getting hub: %v", err) + return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err)) + } + + return mq.ReplyOK(coormq.RespGetHubConfig(hub)) +} + func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeMessage) { var hubs []*cortypes.Hub