From af8fa2f9fa29ba4f2715f46674f020c44b66ae78 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 30 May 2025 17:04:17 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=8D=95=E6=9C=BA=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E5=B4=A9=E6=BA=83=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/serve.go | 33 +++++- client/internal/cmdline/test.go | 34 ++++++- client/internal/cmdline/vfstest.go | 32 +++++- common/pkgs/connectivity/collector.go | 140 +++++++++++++++----------- 4 files changed, 170 insertions(+), 69 deletions(-) diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 6e738a4..be083b6 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -125,7 +125,14 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { defer evtPub.Stop() // 连接性信息收集 - conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) + var conCol *connectivity.Collector + if stgglb.StandaloneMode { + conCol = connectivity.NewDisabled() + } else { + conCol = connectivity.NewEnabled(config.Cfg().Connectivity) + } + conColChan := conCol.Start() + defer conCol.Stop() conCol.CollectInPlace() // 元数据缓存 @@ -153,10 +160,10 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db) + uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) // 定时任务 tktk := ticktock.New(config.Cfg().TickTock, db, stgMeta, stgPool, evtPub, publock) @@ -199,6 +206,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { accTokenEvt := accTokenChan.Receive() evtPubEvt := evtPubChan.Receive() + conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() replEvt := replCh.Receive() httpEvt := httpChan.Receive() @@ -247,6 +255,25 @@ loop: } evtPubEvt = evtPubChan.Receive() + case e := <-conColEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive connectivity event: %v", err) + break loop + } + + switch e := e.Value.(type) { + case connectivity.ExitEvent: + if e.Err != nil { + logger.Errorf("connectivity collector exited with error: %v", e.Err) + } else { + logger.Info("connectivity collector exited") + } + break loop + + case connectivity.CollectedEvent: + } + conColEvt = conColChan.Receive() + case e := <-acStatEvt.Chan(): if e.Err != nil { logger.Errorf("receive access stat event: %v", err) diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 5bd87fd..d394438 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -136,8 +136,15 @@ func test(configPath string) { defer evtPub.Stop() // 连接性信息收集 - conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) - conCol.CollecNow() + var conCol *connectivity.Collector + if stgglb.StandaloneMode { + conCol = connectivity.NewDisabled() + } else { + conCol = connectivity.NewEnabled(config.Cfg().Connectivity) + } + conColChan := conCol.Start() + defer conCol.Stop() + conCol.CollectInPlace() // 元数据缓存 metaCacheHost := metacache.NewHost(db) @@ -164,10 +171,10 @@ func test(configPath string) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db) + uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool) @@ -178,6 +185,7 @@ func test(configPath string) { /// 开始监听各个模块的事件 accTokenEvt := accTokenChan.Receive() evtPubEvt := evtPubChan.Receive() + conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() loop: @@ -223,6 +231,24 @@ loop: } evtPubEvt = evtPubChan.Receive() + case e := <-conColEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive connectivity event: %v", err) + break loop + } + switch e := e.Value.(type) { + case connectivity.ExitEvent: + if e.Err != nil { + logger.Errorf("connectivity collector exited with error: %v", e.Err) + } else { + logger.Info("connectivity collector exited") + } + break loop + + case connectivity.CollectedEvent: + } + conColEvt = conColChan.Receive() + case e := <-acStatEvt.Chan(): if e.Err != nil { logger.Errorf("receive access stat event: %v", err) diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 21ec6c8..5745838 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -116,7 +116,14 @@ func vfsTest(configPath string, opts serveHTTPOptions) { defer evtPub.Stop() // 连接性信息收集 - conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) + var conCol *connectivity.Collector + if stgglb.StandaloneMode { + conCol = connectivity.NewDisabled() + } else { + conCol = connectivity.NewEnabled(config.Cfg().Connectivity) + } + conColChan := conCol.Start() + defer conCol.Stop() conCol.CollectInPlace() // 元数据缓存 @@ -144,10 +151,10 @@ func vfsTest(configPath string, opts serveHTTPOptions) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db) + uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) // 挂载 mntCfg := config.Cfg().Mount @@ -190,6 +197,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { accTokenEvt := accTokenChan.Receive() evtPubEvt := evtPubChan.Receive() + conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() httpEvt := httpChan.Receive() mntEvt := mntChan.Receive() @@ -237,6 +245,24 @@ loop: } evtPubEvt = evtPubChan.Receive() + case e := <-conColEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive connectivity event: %v", err) + break loop + } + + switch e := e.Value.(type) { + case connectivity.ExitEvent: + if e.Err != nil { + logger.Errorf("connectivity collector exited with error: %v", e.Err) + } else { + logger.Info("connectivity collector exited") + } + break loop + + case connectivity.CollectedEvent: + } + case e := <-acStatEvt.Chan(): if e.Err != nil { logger.Errorf("receive access stat event: %v", err) diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index e184420..99bb321 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "gitlink.org.cn/cloudream/common/pkgs/async" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" @@ -13,6 +14,19 @@ import ( cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) +type CollectorEvent interface { + IsCollectorEvent() bool +} + +type ExitEvent struct { + CollectorEvent + Err error +} + +type CollectedEvent struct { + CollectorEvent +} + type Connectivity struct { ToHubID cortypes.HubID Latency *time.Duration @@ -20,38 +34,33 @@ type Connectivity struct { } type Collector struct { - cfg *Config - onCollected func(collector *Collector) + cfg Config + enabled bool collectNow chan any - close chan any + done chan any connectivities map[cortypes.HubID]Connectivity lock *sync.RWMutex } -func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector { +func NewEnabled(cfg Config) *Collector { rpt := Collector{ cfg: cfg, - collectNow: make(chan any), - close: make(chan any), + enabled: true, + collectNow: make(chan any, 1), + done: make(chan any, 1), connectivities: make(map[cortypes.HubID]Connectivity), lock: &sync.RWMutex{}, - onCollected: onCollected, } - go rpt.serve() - return rpt + return &rpt } - -func NewCollectorWithInitData(cfg *Config, onCollected func(collector *Collector), initData map[cortypes.HubID]Connectivity) Collector { - rpt := Collector{ - cfg: cfg, - collectNow: make(chan any), - close: make(chan any), - connectivities: initData, +func NewDisabled() *Collector { + return &Collector{ + enabled: false, + collectNow: make(chan any, 1), + done: make(chan any, 1), + connectivities: make(map[cortypes.HubID]Connectivity), lock: &sync.RWMutex{}, - onCollected: onCollected, } - go rpt.serve() - return rpt } func (r *Collector) GetAll() map[cortypes.HubID]Connectivity { @@ -74,59 +83,74 @@ func (r *Collector) CollecNow() { } } -// 就地进行收集,会阻塞当前线程 +// 就地进行收集,会阻塞当前线程。如果模块未启用,则不会有任何效果 func (r *Collector) CollectInPlace() { - r.testing() -} - -func (r *Collector) Close() { - select { - case r.close <- nil: - default: + if !r.enabled { + return } + + r.testing() } -func (r *Collector) serve() { - log := logger.WithType[Collector]("") - log.Info("start connectivity reporter") - - // 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机 - startup := true - firstReportLatency := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64()) - ticker := time.NewTicker(firstReportLatency) - -loop: - for { - select { - case <-ticker.C: - r.testing() - if startup { - startup = false - ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second) - } +func (r *Collector) Start() *async.UnboundChannel[CollectorEvent] { + log := logger.WithField("Mod", "Collector") - case <-r.collectNow: - r.testing() + ch := async.NewUnboundChannel[CollectorEvent]() + go func() { + if !r.enabled { + return + } - case <-r.close: - ticker.Stop() - break loop + // 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机 + startup := true + firstReportLatency := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64()) + ticker := time.NewTicker(firstReportLatency) + + loop: + for { + select { + case <-ticker.C: + log.Infof("collecting...") + if r.testing() { + ch.Send(CollectedEvent{}) + } + if startup { + startup = false + ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second) + } + + case <-r.collectNow: + log.Infof("collecting...") + if r.testing() { + ch.Send(CollectedEvent{}) + } + + case <-r.done: + ticker.Stop() + break loop + } } - } - log.Info("stop connectivity reporter") + ch.Send(ExitEvent{}) + }() + + return ch } -func (r *Collector) testing() { - log := logger.WithType[Collector]("") - log.Debug("do testing") +func (r *Collector) Stop() { + select { + case r.done <- nil: + default: + } +} +func (r *Collector) testing() bool { coorCli := stgglb.CoordinatorRPCPool.Get() defer coorCli.Release() getHubResp, cerr := coorCli.GetHubs(context.Background(), corrpc.NewGetHubs(nil)) if cerr != nil { - return + return false } wg := sync.WaitGroup{} @@ -152,9 +176,7 @@ func (r *Collector) testing() { } r.lock.Unlock() - if r.onCollected != nil { - r.onCollected(r) - } + return true } func (r *Collector) ping(hub cortypes.Hub) Connectivity {