Browse Source

修复单机模式崩溃的问题

feature_gxh
Sydonian 5 months ago
parent
commit
af8fa2f9fa
4 changed files with 170 additions and 69 deletions
  1. +30
    -3
      client/internal/cmdline/serve.go
  2. +30
    -4
      client/internal/cmdline/test.go
  3. +29
    -3
      client/internal/cmdline/vfstest.go
  4. +81
    -59
      common/pkgs/connectivity/collector.go

+ 30
- 3
client/internal/cmdline/serve.go View File

@@ -125,7 +125,14 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
defer evtPub.Stop() defer evtPub.Stop()


// 连接性信息收集 // 连接性信息收集
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil)
var conCol *connectivity.Collector
if stgglb.StandaloneMode {
conCol = connectivity.NewDisabled()
} else {
conCol = connectivity.NewEnabled(config.Cfg().Connectivity)
}
conColChan := conCol.Start()
defer conCol.Stop()
conCol.CollectInPlace() conCol.CollectInPlace()


// 元数据缓存 // 元数据缓存
@@ -153,10 +160,10 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)


// 下载器 // 下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db)
dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db)


// 上传器 // 上传器
uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db)
uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db)


// 定时任务 // 定时任务
tktk := ticktock.New(config.Cfg().TickTock, db, stgMeta, stgPool, evtPub, publock) tktk := ticktock.New(config.Cfg().TickTock, db, stgMeta, stgPool, evtPub, publock)
@@ -199,6 +206,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {


accTokenEvt := accTokenChan.Receive() accTokenEvt := accTokenChan.Receive()
evtPubEvt := evtPubChan.Receive() evtPubEvt := evtPubChan.Receive()
conColEvt := conColChan.Receive()
acStatEvt := acStatChan.Receive() acStatEvt := acStatChan.Receive()
replEvt := replCh.Receive() replEvt := replCh.Receive()
httpEvt := httpChan.Receive() httpEvt := httpChan.Receive()
@@ -247,6 +255,25 @@ loop:
} }
evtPubEvt = evtPubChan.Receive() evtPubEvt = evtPubChan.Receive()


case e := <-conColEvt.Chan():
if e.Err != nil {
logger.Errorf("receive connectivity event: %v", err)
break loop
}

switch e := e.Value.(type) {
case connectivity.ExitEvent:
if e.Err != nil {
logger.Errorf("connectivity collector exited with error: %v", e.Err)
} else {
logger.Info("connectivity collector exited")
}
break loop

case connectivity.CollectedEvent:
}
conColEvt = conColChan.Receive()

case e := <-acStatEvt.Chan(): case e := <-acStatEvt.Chan():
if e.Err != nil { if e.Err != nil {
logger.Errorf("receive access stat event: %v", err) logger.Errorf("receive access stat event: %v", err)


+ 30
- 4
client/internal/cmdline/test.go View File

@@ -136,8 +136,15 @@ func test(configPath string) {
defer evtPub.Stop() defer evtPub.Stop()


// 连接性信息收集 // 连接性信息收集
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil)
conCol.CollecNow()
var conCol *connectivity.Collector
if stgglb.StandaloneMode {
conCol = connectivity.NewDisabled()
} else {
conCol = connectivity.NewEnabled(config.Cfg().Connectivity)
}
conColChan := conCol.Start()
defer conCol.Stop()
conCol.CollectInPlace()


// 元数据缓存 // 元数据缓存
metaCacheHost := metacache.NewHost(db) metaCacheHost := metacache.NewHost(db)
@@ -164,10 +171,10 @@ func test(configPath string) {
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)


// 下载器 // 下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db)
dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db)


// 上传器 // 上传器
uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db)
uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db)


svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool) svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool)


@@ -178,6 +185,7 @@ func test(configPath string) {
/// 开始监听各个模块的事件 /// 开始监听各个模块的事件
accTokenEvt := accTokenChan.Receive() accTokenEvt := accTokenChan.Receive()
evtPubEvt := evtPubChan.Receive() evtPubEvt := evtPubChan.Receive()
conColEvt := conColChan.Receive()
acStatEvt := acStatChan.Receive() acStatEvt := acStatChan.Receive()


loop: loop:
@@ -223,6 +231,24 @@ loop:
} }
evtPubEvt = evtPubChan.Receive() evtPubEvt = evtPubChan.Receive()


case e := <-conColEvt.Chan():
if e.Err != nil {
logger.Errorf("receive connectivity event: %v", err)
break loop
}
switch e := e.Value.(type) {
case connectivity.ExitEvent:
if e.Err != nil {
logger.Errorf("connectivity collector exited with error: %v", e.Err)
} else {
logger.Info("connectivity collector exited")
}
break loop

case connectivity.CollectedEvent:
}
conColEvt = conColChan.Receive()

case e := <-acStatEvt.Chan(): case e := <-acStatEvt.Chan():
if e.Err != nil { if e.Err != nil {
logger.Errorf("receive access stat event: %v", err) logger.Errorf("receive access stat event: %v", err)


+ 29
- 3
client/internal/cmdline/vfstest.go View File

@@ -116,7 +116,14 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
defer evtPub.Stop() defer evtPub.Stop()


// 连接性信息收集 // 连接性信息收集
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil)
var conCol *connectivity.Collector
if stgglb.StandaloneMode {
conCol = connectivity.NewDisabled()
} else {
conCol = connectivity.NewEnabled(config.Cfg().Connectivity)
}
conColChan := conCol.Start()
defer conCol.Stop()
conCol.CollectInPlace() conCol.CollectInPlace()


// 元数据缓存 // 元数据缓存
@@ -144,10 +151,10 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)


// 下载器 // 下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db)
dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db)


// 上传器 // 上传器
uploader := uploader.NewUploader(publock, &conCol, stgPool, stgMeta, db)
uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db)


// 挂载 // 挂载
mntCfg := config.Cfg().Mount mntCfg := config.Cfg().Mount
@@ -190,6 +197,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) {


accTokenEvt := accTokenChan.Receive() accTokenEvt := accTokenChan.Receive()
evtPubEvt := evtPubChan.Receive() evtPubEvt := evtPubChan.Receive()
conColEvt := conColChan.Receive()
acStatEvt := acStatChan.Receive() acStatEvt := acStatChan.Receive()
httpEvt := httpChan.Receive() httpEvt := httpChan.Receive()
mntEvt := mntChan.Receive() mntEvt := mntChan.Receive()
@@ -237,6 +245,24 @@ loop:
} }
evtPubEvt = evtPubChan.Receive() evtPubEvt = evtPubChan.Receive()


case e := <-conColEvt.Chan():
if e.Err != nil {
logger.Errorf("receive connectivity event: %v", err)
break loop
}

switch e := e.Value.(type) {
case connectivity.ExitEvent:
if e.Err != nil {
logger.Errorf("connectivity collector exited with error: %v", e.Err)
} else {
logger.Info("connectivity collector exited")
}
break loop

case connectivity.CollectedEvent:
}

case e := <-acStatEvt.Chan(): case e := <-acStatEvt.Chan():
if e.Err != nil { if e.Err != nil {
logger.Errorf("receive access stat event: %v", err) logger.Errorf("receive access stat event: %v", err)


+ 81
- 59
common/pkgs/connectivity/collector.go View File

@@ -6,6 +6,7 @@ import (
"sync" "sync"
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
@@ -13,6 +14,19 @@ import (
cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
) )


type CollectorEvent interface {
IsCollectorEvent() bool
}

type ExitEvent struct {
CollectorEvent
Err error
}

type CollectedEvent struct {
CollectorEvent
}

type Connectivity struct { type Connectivity struct {
ToHubID cortypes.HubID ToHubID cortypes.HubID
Latency *time.Duration Latency *time.Duration
@@ -20,38 +34,33 @@ type Connectivity struct {
} }


type Collector struct { type Collector struct {
cfg *Config
onCollected func(collector *Collector)
cfg Config
enabled bool
collectNow chan any collectNow chan any
close chan any
done chan any
connectivities map[cortypes.HubID]Connectivity connectivities map[cortypes.HubID]Connectivity
lock *sync.RWMutex lock *sync.RWMutex
} }


func NewCollector(cfg *Config, onCollected func(collector *Collector)) Collector {
func NewEnabled(cfg Config) *Collector {
rpt := Collector{ rpt := Collector{
cfg: cfg, cfg: cfg,
collectNow: make(chan any),
close: make(chan any),
enabled: true,
collectNow: make(chan any, 1),
done: make(chan any, 1),
connectivities: make(map[cortypes.HubID]Connectivity), connectivities: make(map[cortypes.HubID]Connectivity),
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
onCollected: onCollected,
} }
go rpt.serve()
return rpt
return &rpt
} }

func NewCollectorWithInitData(cfg *Config, onCollected func(collector *Collector), initData map[cortypes.HubID]Connectivity) Collector {
rpt := Collector{
cfg: cfg,
collectNow: make(chan any),
close: make(chan any),
connectivities: initData,
func NewDisabled() *Collector {
return &Collector{
enabled: false,
collectNow: make(chan any, 1),
done: make(chan any, 1),
connectivities: make(map[cortypes.HubID]Connectivity),
lock: &sync.RWMutex{}, lock: &sync.RWMutex{},
onCollected: onCollected,
} }
go rpt.serve()
return rpt
} }


func (r *Collector) GetAll() map[cortypes.HubID]Connectivity { func (r *Collector) GetAll() map[cortypes.HubID]Connectivity {
@@ -74,59 +83,74 @@ func (r *Collector) CollecNow() {
} }
} }


// 就地进行收集,会阻塞当前线程
// 就地进行收集,会阻塞当前线程。如果模块未启用,则不会有任何效果
func (r *Collector) CollectInPlace() { func (r *Collector) CollectInPlace() {
r.testing()
}

func (r *Collector) Close() {
select {
case r.close <- nil:
default:
if !r.enabled {
return
} }

r.testing()
} }


func (r *Collector) serve() {
log := logger.WithType[Collector]("")
log.Info("start connectivity reporter")

// 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机
startup := true
firstReportLatency := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64())
ticker := time.NewTicker(firstReportLatency)

loop:
for {
select {
case <-ticker.C:
r.testing()
if startup {
startup = false
ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second)
}
func (r *Collector) Start() *async.UnboundChannel[CollectorEvent] {
log := logger.WithField("Mod", "Collector")


case <-r.collectNow:
r.testing()
ch := async.NewUnboundChannel[CollectorEvent]()
go func() {
if !r.enabled {
return
}


case <-r.close:
ticker.Stop()
break loop
// 为了防止同时启动的节点会集中进行Ping,所以第一次上报间隔为0-TestInterval秒之间随机
startup := true
firstReportLatency := time.Duration(float64(r.cfg.TestInterval) * float64(time.Second) * rand.Float64())
ticker := time.NewTicker(firstReportLatency)

loop:
for {
select {
case <-ticker.C:
log.Infof("collecting...")
if r.testing() {
ch.Send(CollectedEvent{})
}
if startup {
startup = false
ticker.Reset(time.Duration(r.cfg.TestInterval) * time.Second)
}

case <-r.collectNow:
log.Infof("collecting...")
if r.testing() {
ch.Send(CollectedEvent{})
}

case <-r.done:
ticker.Stop()
break loop
}
} }
}


log.Info("stop connectivity reporter")
ch.Send(ExitEvent{})
}()

return ch
} }


func (r *Collector) testing() {
log := logger.WithType[Collector]("")
log.Debug("do testing")
func (r *Collector) Stop() {
select {
case r.done <- nil:
default:
}
}


func (r *Collector) testing() bool {
coorCli := stgglb.CoordinatorRPCPool.Get() coorCli := stgglb.CoordinatorRPCPool.Get()
defer coorCli.Release() defer coorCli.Release()


getHubResp, cerr := coorCli.GetHubs(context.Background(), corrpc.NewGetHubs(nil)) getHubResp, cerr := coorCli.GetHubs(context.Background(), corrpc.NewGetHubs(nil))
if cerr != nil { if cerr != nil {
return
return false
} }


wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@@ -152,9 +176,7 @@ func (r *Collector) testing() {
} }
r.lock.Unlock() r.lock.Unlock()


if r.onCollected != nil {
r.onCollected(r)
}
return true
} }


func (r *Collector) ping(hub cortypes.Hub) Connectivity { func (r *Collector) ping(hub cortypes.Hub) Connectivity {


Loading…
Cancel
Save