diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index e0808b4..e2544a7 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -164,6 +164,15 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { evtPubChan := evtPub.Start() defer evtPub.Stop() + // 系统事件监听器 + evtWtchr, err := sysevent.NewWatcherHost(config.Cfg().SysEvent) + if err != nil { + logger.Errorf("new sysevent watcher host: %v", err) + os.Exit(1) + } + evtWtchrChan := evtWtchr.Start() + defer evtWtchr.Stop() + // 连接性信息收集 var conCol *connectivity.Collector if stgglb.StandaloneMode { @@ -198,7 +207,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, spaceMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats, evtPub) // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, spaceMeta, db) @@ -230,7 +239,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, spaceMeta, db, evtPub, mnt, stgPool, spaceSync, tktk, spdStats, accToken) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, spaceMeta, db, evtPub, evtWtchr, mnt, stgPool, spaceSync, tktk, spdStats, accToken) // HTTP接口 httpCfgJSON := config.Cfg().HTTP @@ -261,6 +270,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { accTokenEvt := accTokenChan.Receive() evtPubEvt := evtPubChan.Receive() + evtWtchrEvt := evtWtchrChan.Receive() conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() spaceSyncEvt := spaceSyncChan.Receive() @@ -312,6 +322,26 @@ loop: } evtPubEvt = evtPubChan.Receive() + case e := <-evtWtchrEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive watcher event: %v", err) + break loop + } + + switch val := e.Value.(type) { + case sysevent.WatcherExited: + if val.Err != nil { + logger.Errorf("sysevent watcher exited with error: %v", val.Err) + } else { + logger.Info("sysevent watcher exited") + } + break loop + + case sysevent.OtherError: + logger.Errorf("sysevent watcher: %v", val) + } + evtWtchrEvt = evtWtchrChan.Receive() + case e := <-conColEvt.Chan(): if e.Err != nil { logger.Errorf("receive connectivity event: %v", err) diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 31447b5..2def5bb 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -172,6 +172,15 @@ func test(configPath string) { evtPubChan := evtPub.Start() defer evtPub.Stop() + // 系统事件监听器 + evtWtchr, err := sysevent.NewWatcherHost(config.Cfg().SysEvent) + if err != nil { + logger.Errorf("new sysevent watcher host: %v", err) + os.Exit(1) + } + evtWtchrChan := evtWtchr.Start() + defer evtWtchr.Stop() + // 连接性信息收集 var conCol *connectivity.Collector if stgglb.StandaloneMode { @@ -206,7 +215,7 @@ func test(configPath string) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats, evtPub) // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) @@ -216,7 +225,7 @@ func test(configPath string) { spaceSyncChan := spaceSync.Start() defer spaceSync.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool, spaceSync, nil, spdStats, accToken) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, evtWtchr, nil, stgPool, spaceSync, nil, spdStats, accToken) go func() { doTest(svc) @@ -225,6 +234,7 @@ func test(configPath string) { /// 开始监听各个模块的事件 accTokenEvt := accTokenChan.Receive() evtPubEvt := evtPubChan.Receive() + evtWtchrEvt := evtWtchrChan.Receive() conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() spaceSyncEvt := spaceSyncChan.Receive() @@ -272,6 +282,26 @@ loop: } evtPubEvt = evtPubChan.Receive() + case e := <-evtWtchrEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive watcher event: %v", err) + break loop + } + + switch val := e.Value.(type) { + case sysevent.WatcherExited: + if val.Err != nil { + logger.Errorf("sysevent watcher exited with error: %v", val.Err) + } else { + logger.Info("sysevent watcher exited") + } + break loop + + case sysevent.OtherError: + logger.Errorf("sysevent watcher: %v", val) + } + evtWtchrEvt = evtWtchrChan.Receive() + case e := <-conColEvt.Chan(): if e.Err != nil { logger.Errorf("receive connectivity event: %v", err) diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 88f2237..da20ad5 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -152,6 +152,15 @@ func vfsTest(configPath string, opts serveHTTPOptions) { evtPubChan := evtPub.Start() defer evtPub.Stop() + // 系统事件监听器 + evtWtchr, err := sysevent.NewWatcherHost(config.Cfg().SysEvent) + if err != nil { + logger.Errorf("new sysevent watcher host: %v", err) + os.Exit(1) + } + evtWtchrChan := evtWtchr.Start() + defer evtWtchr.Stop() + // 连接性信息收集 var conCol *connectivity.Collector if stgglb.StandaloneMode { @@ -186,7 +195,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats, evtPub) // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) @@ -209,7 +218,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool, spaceSync, nil, spdStats, accToken) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, evtWtchr, mnt, stgPool, spaceSync, nil, spdStats, accToken) // HTTP接口 httpCfgJSON := config.Cfg().HTTP @@ -244,6 +253,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { accTokenEvt := accTokenChan.Receive() evtPubEvt := evtPubChan.Receive() + evtWtchrEvt := evtWtchrChan.Receive() conColEvt := conColChan.Receive() acStatEvt := acStatChan.Receive() spaceSyncEvt := spaceSyncChan.Receive() @@ -293,6 +303,26 @@ loop: } evtPubEvt = evtPubChan.Receive() + case e := <-evtWtchrEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive watcher event: %v", err) + break loop + } + + switch val := e.Value.(type) { + case sysevent.WatcherExited: + if val.Err != nil { + logger.Errorf("sysevent watcher exited with error: %v", val.Err) + } else { + logger.Info("sysevent watcher exited") + } + break loop + + case sysevent.OtherError: + logger.Errorf("sysevent watcher: %v", val) + } + evtWtchrEvt = evtWtchrChan.Receive() + case e := <-conColEvt.Chan(): if e.Err != nil { logger.Errorf("receive connectivity event: %v", err) diff --git a/client/internal/db/object.go b/client/internal/db/object.go index 2ab5a26..d8f37c5 100644 --- a/client/internal/db/object.go +++ b/client/internal/db/object.go @@ -814,3 +814,17 @@ func (db *ObjectDB) BatchCreateByDetails(ctx SQLContext, pkgID jcstypes.PackageI return affectedObjs, nil } + +func (*ObjectDB) Summary(ctx SQLContext) (cnt int64, size int64, err error) { + var stats struct { + Cnt int64 `gorm:"column:Cnt; type:bigint;"` + Size int64 `gorm:"column:Size; type:bigint;"` + } + + err = ctx.Table("Object").Select("Count(Size) as Cnt, Sum(Size) as Size").Find(&stats).Error + if err != nil { + return 0, 0, err + } + + return stats.Cnt, stats.Size, nil +} diff --git a/client/internal/downloader/downloader.go b/client/internal/downloader/downloader.go index 2af4066..dd8d8e6 100644 --- a/client/internal/downloader/downloader.go +++ b/client/internal/downloader/downloader.go @@ -11,6 +11,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" ) @@ -45,9 +46,10 @@ type Downloader struct { selector *strategy.Selector db *db.DB speedStats *speedstats.SpeedStats + evtPub *sysevent.Publisher } -func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB, speedStats *speedstats.SpeedStats) *Downloader { +func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB, speedStats *speedstats.SpeedStats, evtPub *sysevent.Publisher) *Downloader { if cfg.MaxStripCacheCount == 0 { cfg.MaxStripCacheCount = DefaultMaxStripCacheCount } @@ -61,6 +63,7 @@ func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, selector: sel, db: db, speedStats: speedStats, + evtPub: evtPub, } } diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index 646fa74..a13fd3c 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -5,9 +5,12 @@ import ( "fmt" "io" "reflect" + "time" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan/ops" + "gitlink.org.cn/cloudream/jcs-pub/common/types/datamap" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" @@ -140,19 +143,42 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat exeCtx := exec.NewExecContext() exec.SetValueByType(exeCtx, i.downloader.stgPool) exec := plans.Execute(exeCtx) + + rd, err := exec.BeginRead(strHandle) + if err != nil { + return nil, err + } + + counter := io2.CounterCloser(rd, nil) go func() { + startTime := time.Now() + ret, err := exec.Wait(context.TODO()) if err != nil { logger.Warnf("downloading object %v: %v", req.Raw.ObjectID, err) } + transBytes := int64(0) for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { v2 := v.(*ops2.BaseReadStatsValue) - i.downloader.speedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + i.downloader.speedStats.Record(v2.Length, v2.ElapsedTime, v2.Location.IsDriver) + transBytes += v2.Length } + + for _, v := range ret.GetArray(ops.SendStreamStatsStoreKey) { + v2 := v.(*ops.SendStreamStatsValue) + transBytes += v2.Length + } + + i.downloader.evtPub.Publish(&datamap.BodyObjectAccessStats{ + ObjectID: req.Raw.ObjectID, + RequestSize: len, + TransferAmount: transBytes, + ElapsedTime: time.Since(startTime), + }) }() - return exec.BeginRead(strHandle) + return counter, nil } func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, strg strategy.ECReconstructStrategy) (io.ReadCloser, error) { @@ -187,7 +213,10 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str } pr, pw := io.Pipe() + counter := io2.CounterCloser(pr, nil) go func() { + startTime := time.Now() + readPos := req.Raw.Offset totalReadLen := req.Detail.Object.Size - req.Raw.Offset if req.Raw.Length >= 0 { @@ -196,6 +225,17 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str firstStripIndex := readPos / strg.Redundancy.StripSize() stripIter := NewStripIterator(i.downloader, req.Detail.Object, downloadBlks, strg.Redundancy, firstStripIndex, i.downloader.strips, i.downloader.cfg.ECStripPrefetchCount) + + // defer顺序不能改,因为CollectStats需要在Close之后调用 + defer func() { + stats := stripIter.CollectStats() + i.downloader.evtPub.Publish(&datamap.BodyObjectAccessStats{ + ObjectID: req.Raw.ObjectID, + RequestSize: length, + TransferAmount: stats.TransferredBytes, + ElapsedTime: time.Since(startTime), + }) + }() defer stripIter.Close() for totalReadLen > 0 { @@ -224,5 +264,5 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str pw.Close() }() - return pr, nil + return counter, nil } diff --git a/client/internal/downloader/lrc_strip_iterator.go b/client/internal/downloader/lrc_strip_iterator.go index 9c9428f..85e590e 100644 --- a/client/internal/downloader/lrc_strip_iterator.go +++ b/client/internal/downloader/lrc_strip_iterator.go @@ -129,7 +129,7 @@ func (s *LRCStripIterator) downloading() { for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { v2 := v.(*ops2.BaseReadStatsValue) - s.downloader.speedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + s.downloader.speedStats.Record(v2.Length, v2.ElapsedTime, v2.Location.IsDriver) } }() defer cancel() diff --git a/client/internal/downloader/strip_iterator.go b/client/internal/downloader/strip_iterator.go index 68b7b67..6645342 100644 --- a/client/internal/downloader/strip_iterator.go +++ b/client/internal/downloader/strip_iterator.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" @@ -26,19 +27,25 @@ type Strip struct { } type StripIterator struct { - downloader *Downloader - object jcstypes.Object - blocks []downloadBlock - red jcstypes.ECRedundancy - curStripIndex int64 - cache *StripCache - dataChan chan dataChanEntry - downloadingDone chan any - downloadingDoneOnce sync.Once - inited bool - downloadingStream io.ReadCloser - downloadingStripIndex int64 - downloadingPlanCtxCancel func() + downloader *Downloader + object jcstypes.Object + blocks []downloadBlock + red jcstypes.ECRedundancy + curStripIndex int64 + cache *StripCache + dataChan chan dataChanEntry + downloadingDone chan any + downloadingDoneOnce sync.Once + inited bool + downloadingStream io.ReadCloser + downloadingStripIndex int64 + statsLock *sync.Cond + downloadingStrips int + transferredBytes int64 // 总传输量 +} + +type StripDownloadStats struct { + TransferredBytes int64 // 总传输量 } type dataChanEntry struct { @@ -61,6 +68,7 @@ func NewStripIterator(downloader *Downloader, object jcstypes.Object, blocks []d cache: cache, dataChan: make(chan dataChanEntry, maxPrefetch-1), downloadingDone: make(chan any), + statsLock: sync.NewCond(&sync.Mutex{}), } return iter @@ -115,6 +123,19 @@ func (s *StripIterator) Close() { }) } +// 必须在Close之后调用! +func (s *StripIterator) CollectStats() StripDownloadStats { + s.statsLock.L.Lock() + for s.downloadingStrips > 0 { + s.statsLock.Wait() + } + s.statsLock.L.Unlock() + + return StripDownloadStats{ + TransferredBytes: s.transferredBytes, + } +} + func (s *StripIterator) downloading(startStripIndex int64) { curStripIndex := startStripIndex loop: @@ -177,7 +198,6 @@ loop: if s.downloadingStream != nil { s.downloadingStream.Close() - s.downloadingPlanCtxCancel() } close(s.dataChan) } @@ -196,7 +216,6 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { if s.downloadingStream == nil || s.downloadingStripIndex != stripIndex { if s.downloadingStream != nil { s.downloadingStream.Close() - s.downloadingPlanCtxCancel() } ft := ioswitch2.NewFromTo() @@ -221,29 +240,44 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { exec.SetValueByType(exeCtx, s.downloader.stgPool) exec := plans.Execute(exeCtx) - ctx, cancel := context.WithCancel(context.Background()) + s.statsLock.L.Lock() + s.downloadingStrips += 1 + s.statsLock.L.Unlock() + s.statsLock.Broadcast() + go func() { - ret, err := exec.Wait(ctx) + ret, err := exec.Wait(context.Background()) if err != nil { logger.Warnf("downloading strip: %v", err) return } + transBytes := int64(0) for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { v2 := v.(*ops2.BaseReadStatsValue) - s.downloader.speedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + s.downloader.speedStats.Record(v2.Length, v2.ElapsedTime, v2.Location.IsDriver) + transBytes += v2.Length } + + for _, v := range ret.GetArray(ops.SendStreamStatsStoreKey) { + v2 := v.(*ops.SendStreamStatsValue) + transBytes += v2.Length + } + + s.statsLock.L.Lock() + s.downloadingStrips -= 1 + s.transferredBytes += transBytes + s.statsLock.L.Unlock() + s.statsLock.Broadcast() }() str, err := exec.BeginRead(hd) if err != nil { - cancel() return 0, err } s.downloadingStream = str s.downloadingStripIndex = stripIndex - s.downloadingPlanCtxCancel = cancel } n, err := io.ReadFull(s.downloadingStream, buf) diff --git a/client/internal/http/v1/server.go b/client/internal/http/v1/server.go index 1b6f28a..5c13221 100644 --- a/client/internal/http/v1/server.go +++ b/client/internal/http/v1/server.go @@ -95,4 +95,6 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth, proxy *proxy.Cluster rt.GET(cliapi.PubShardsGetPath, certAuth, s.PubShards().Get) rt.GET(cliapi.PubShardsExportPackagePath, certAuth, s.PubShards().ExportPackage) rt.POST(cliapi.PubShardsImportPackagePath, certAuth, s.PubShards().ImportPackage) + + rt.POST(cliapi.SysEventWatchPath, certAuth, s.SysEvent().Watch) } diff --git a/client/internal/http/v1/sysevent.go b/client/internal/http/v1/sysevent.go new file mode 100644 index 0000000..ece8d7d --- /dev/null +++ b/client/internal/http/v1/sysevent.go @@ -0,0 +1,57 @@ +package http + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/serder" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + "gitlink.org.cn/cloudream/jcs-pub/common/ecode" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" +) + +type SysEventService struct { + *Server +} + +func (s *Server) SysEvent() *SysEventService { + return &SysEventService{s} +} + +func (s *SysEventService) Watch(ctx *gin.Context) { + log := logger.WithField("HTTP", "SysEvent.Watch") + + _, err := types.ShouldBindJSONEx[cliapi.SysEventWatch](ctx) + if err != nil { + log.Warnf("binding body: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, types.Failed(ecode.BadArgument, "%v", err)) + return + } + + if s.svc.EvtWatcher == nil { + ctx.JSON(http.StatusOK, types.Failed(ecode.OperationFailed, "event watcher not configured")) + return + } + + ctx.Writer.Header().Set("Content-Type", "text/event-stream") + ctx.Writer.Header().Set("Cache-Control", "no-cache") + ctx.Writer.Header().Set("Connection", "keep-alive") + + wtchr := s.svc.EvtWatcher.AddWatcherFn(func(event sysevent.SysEvent) { + data, err := serder.ObjectToJSONEx(event) + if err != nil { + log.Warnf("serializing event: %s", err.Error()) + return + } + + ctx.SSEvent("message", string(data)) + ctx.Writer.Flush() + }) + + <-ctx.Request.Context().Done() + s.svc.EvtWatcher.RemoveWatcher(wtchr) + + log.Debugf("event watcher stopped") +} diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 17d9350..bc39b7a 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -27,6 +27,7 @@ type Service struct { UserSpaceMeta *metacache.UserSpaceMeta DB *db.DB EvtPub *sysevent.Publisher + EvtWatcher *sysevent.WatcherHost // 为nil表示没有配置Watcher Mount *mount.Mount StgPool *pool.Pool SpaceSyncer *spacesyncer.SpaceSyncer @@ -44,6 +45,7 @@ func NewService( userSpaceMeta *metacache.UserSpaceMeta, db *db.DB, evtPub *sysevent.Publisher, + evtWatcher *sysevent.WatcherHost, mount *mount.Mount, stgPool *pool.Pool, spaceSyncer *spacesyncer.SpaceSyncer, @@ -60,6 +62,7 @@ func NewService( UserSpaceMeta: userSpaceMeta, DB: db, EvtPub: evtPub, + EvtWatcher: evtWatcher, Mount: mount, StgPool: stgPool, SpaceSyncer: spaceSyncer, diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 8cc65d4..a188fff 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -298,7 +298,7 @@ func (svc *UserSpaceService) DownloadPackage(req cliapi.UserSpaceDownloadPackage for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { v2 := v.(*ops2.BaseReadStatsValue) - svc.SpeedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + svc.SpeedStats.Record(v2.Length, v2.ElapsedTime, v2.Location.IsDriver) } err = svc.DB.DoTx(func(tx db.SQLContext) error { diff --git a/client/internal/speedstats/speedstats.go b/client/internal/speedstats/speedstats.go index 01f3cbc..712a75b 100644 --- a/client/internal/speedstats/speedstats.go +++ b/client/internal/speedstats/speedstats.go @@ -55,27 +55,27 @@ func New() *SpeedStats { } } -func (p *SpeedStats) Record(size int64, time time.Duration, happenedAtClient bool) { +func (p *SpeedStats) Record(size int64, elapsedTime time.Duration, happenedAtClient bool) { p.lock.Lock() defer p.lock.Unlock() if size < 100*1024*1024 { if happenedAtClient { - p.stats100M[0].Record(size, time) + p.stats100M[0].Record(size, elapsedTime) } else { - p.stats100M[1].Record(size, time) + p.stats100M[1].Record(size, elapsedTime) } } else if size < 1024*1024*1024 { if happenedAtClient { - p.stats1G[0].Record(size, time) + p.stats1G[0].Record(size, elapsedTime) } else { - p.stats1G[1].Record(size, time) + p.stats1G[1].Record(size, elapsedTime) } } else { if happenedAtClient { - p.statsAbove1G[0].Record(size, time) + p.statsAbove1G[0].Record(size, elapsedTime) } else { - p.statsAbove1G[1].Record(size, time) + p.statsAbove1G[1].Record(size, elapsedTime) } } } diff --git a/client/internal/ticktock/stats_log.go b/client/internal/ticktock/stats_log.go new file mode 100644 index 0000000..235e819 --- /dev/null +++ b/client/internal/ticktock/stats_log.go @@ -0,0 +1,62 @@ +package ticktock + +import ( + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/reflect2" + stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" + jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" + "gitlink.org.cn/cloudream/jcs-pub/common/types/datamap" +) + +type StatsLog struct { +} + +func (j *StatsLog) Name() string { + return reflect2.TypeNameOf[StatsLog]() +} + +func (j *StatsLog) Execute(t *TickTock) { + j.logGlobalObjectStats(t) +} + +func (j *StatsLog) logGlobalObjectStats(t *TickTock) { + log := logger.WithType[StatsLog]("Event") + + cnt, size, err := t.db.Object().Summary(t.db.DefCtx()) + if err != nil { + log.Warnf("get object summary: %v", err) + return + } + + usIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) + if err != nil { + log.Warnf("get user space ids: %v", err) + return + } + + var logUsIDs []jcstypes.UserSpaceID + var usStats []stgtypes.ShardStoreStats + + details := t.spaceMeta.GetMany(usIDs) + for _, d := range details { + if d.UserSpace.ShardStore == nil { + continue + } + + store, err := t.stgPool.GetShardStore(d) + if err != nil { + log.Warnf("get shard store %v: %v", d, err) + continue + } + + logUsIDs = append(logUsIDs, d.UserSpace.UserSpaceID) + usStats = append(usStats, store.Stats()) + } + + t.evtPub.Publish(&datamap.BodyGlobalObjectStats{ + TotalObjectCount: cnt, + TotalDataSize: size, + UserSpaceIDs: logUsIDs, + UserSpaceStats: usStats, + }) +} diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index bde3547..7c8a512 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -114,4 +114,8 @@ func (t *TickTock) initJobs() { gocron.NewAtTime(12, 0, 0), gocron.NewAtTime(18, 0, 0), ))) + + t.addJob(&StatsLog{}, gocron.DailyJob(1, gocron.NewAtTimes( + gocron.NewAtTime(0, 0, 0), + ))) } diff --git a/client/sdk/api/v1/sysevent.go b/client/sdk/api/v1/sysevent.go new file mode 100644 index 0000000..9204b6c --- /dev/null +++ b/client/sdk/api/v1/sysevent.go @@ -0,0 +1,55 @@ +package api + +import ( + "fmt" + "io" + "net/http" + "strings" + + "gitlink.org.cn/cloudream/common/sdks" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type SysEventService struct { + *Client +} + +func (c *Client) SysEvent() *SysEventService { + return &SysEventService{ + Client: c, + } +} + +const SysEventWatchPath = "/sysEvent/watch" + +type SysEventWatch struct{} + +func (r *SysEventWatch) MakeParam() *sdks.RequestParam { + return sdks.MakeJSONParam(http.MethodPost, SysEventWatchPath, r) +} + +type SysEventWatchResp struct { + Stream io.ReadCloser +} + +func (r *SysEventWatchResp) ParseResponse(resp *http.Response) error { + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var err error + var r sdks.CodeDataResponse[any] + + if err = serder.JSONToObjectStreamExRaw(resp.Body, &r); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + return &sdks.CodeMessageError{Code: r.Code, Message: r.Message} + } + + r.Stream = resp.Body + return nil +} + +func (c *SysEventService) Watch(req SysEventWatch) (*SysEventWatchResp, error) { + return JSONAPI(&c.cfg, c.httpCli, &req, &SysEventWatchResp{}) +} diff --git a/common/pkgs/ioswitch/exec/executor.go b/common/pkgs/ioswitch/exec/executor.go index 7db9801..2d81629 100644 --- a/common/pkgs/ioswitch/exec/executor.go +++ b/common/pkgs/ioswitch/exec/executor.go @@ -63,6 +63,9 @@ func (s *Executor) Run(ctx *ExecContext) (ExecutorResult, error) { return ExecutorResult{}, err } + s.lock.Lock() + defer s.lock.Unlock() + return ExecutorResult{Stored: s.store}, nil } diff --git a/common/pkgs/ioswitch/exec/worker.go b/common/pkgs/ioswitch/exec/worker.go index 2091563..b49251e 100644 --- a/common/pkgs/ioswitch/exec/worker.go +++ b/common/pkgs/ioswitch/exec/worker.go @@ -86,6 +86,8 @@ func (s *Worker) FindByIDContexted(ctx context.Context, id PlanID) *Executor { } type WorkerInfo interface { + // Worker名称 + Name() string NewClient() (WorkerClient, error) // 判断两个worker是否相同 Equals(worker WorkerInfo) bool diff --git a/common/pkgs/ioswitch/plan/ops/send.go b/common/pkgs/ioswitch/plan/ops/send.go index 28c05b0..209f688 100644 --- a/common/pkgs/ioswitch/plan/ops/send.go +++ b/common/pkgs/ioswitch/plan/ops/send.go @@ -3,6 +3,7 @@ package ops import ( "fmt" "io" + "time" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/io2" @@ -10,6 +11,10 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" ) +const ( + SendStreamStatsStoreKey = "Stats.SendStream" +) + func init() { exec.UseOp[*SendStream]() exec.UseOp[*GetStream]() @@ -17,10 +22,23 @@ func init() { exec.UseOp[*GetVar]() } +type SendStreamStatsValue struct { + IsSend bool + Length int64 + Time time.Duration + Src exec.Location + Dst exec.Location +} + +func (v *SendStreamStatsValue) Clone() exec.VarValue { + v2 := *v + return &v2 +} + type SendStream struct { - Input exec.VarID `json:"input"` - Send exec.VarID `json:"send"` - Worker exec.WorkerInfo `json:"worker"` + Input exec.VarID + Send exec.VarID + Worker exec.WorkerInfo } func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -36,12 +54,27 @@ func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer cli.Close() + counter := io2.CounterCloser(inputStr.Stream, nil) + + startTime := time.Now() + // 发送后流的ID不同 - err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, inputStr.Stream) + err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, counter) if err != nil { return fmt.Errorf("sending stream: %w", err) } + e.Store(SendStreamStatsStoreKey, &SendStreamStatsValue{ + IsSend: true, + Length: counter.Count(), + Time: time.Since(startTime), + Src: e.Location(), + Dst: exec.Location{ + IsDriver: false, + WorkerName: o.Worker.Name(), + }, + }) + return nil } @@ -68,14 +101,29 @@ func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("getting stream: %w", err) } + startTime := time.Now() + + counter := io2.CounterCloser(str, nil) fut := future.NewSetVoid() // 获取后送到本地的流ID是不同的 - str = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { + str = io2.AfterReadClosedOnce(counter, func(closer io.ReadCloser) { fut.SetVoid() }) e.PutVar(o.Output, &exec.StreamValue{Stream: str}) - return fut.Wait(ctx.Context) + err = fut.Wait(ctx.Context) + e.Store(SendStreamStatsStoreKey, &SendStreamStatsValue{ + IsSend: false, + Length: counter.Count(), + Time: time.Since(startTime), + Src: exec.Location{ + IsDriver: false, + WorkerName: o.Worker.Name(), + }, + Dst: e.Location(), + }) + + return err } func (o *GetStream) String() string { diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index 31721d3..f43bf83 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -17,6 +17,10 @@ type HttpHubWorker struct { Hub jcstypes.Hub } +func (w *HttpHubWorker) Name() string { + return fmt.Sprintf("%v", w.Hub.HubID) +} + func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { addressInfo := w.Hub.Address.(*jcstypes.HttpAddressInfo) baseUrl := "http://" + addressInfo.ExternalIP + ":" + strconv.Itoa(addressInfo.Port) @@ -30,7 +34,7 @@ func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { return nil, err } - return &HttpHubWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil + return &HttpHubWorkerClient{workerName: w.Name(), hubID: w.Hub.HubID, cli: cli}, nil } func (w *HttpHubWorker) String() string { @@ -47,14 +51,15 @@ func (w *HttpHubWorker) Equals(worker exec.WorkerInfo) bool { } type HttpHubWorkerClient struct { - hubID jcstypes.HubID - cli *hubapi.Client + workerName string + hubID jcstypes.HubID + cli *hubapi.Client } func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) { resp, err := c.cli.ExecuteIOPlan(hubapi.ExecuteIOPlanReq{ Plan: plan, - WorkerName: fmt.Sprintf("%v", c.hubID), + WorkerName: c.workerName, }) if err != nil { return exec.ExecutorResult{}, err diff --git a/common/pkgs/ioswitch2/hub_worker.go b/common/pkgs/ioswitch2/hub_worker.go index 44db251..719d891 100644 --- a/common/pkgs/ioswitch2/hub_worker.go +++ b/common/pkgs/ioswitch2/hub_worker.go @@ -24,9 +24,13 @@ type HubWorker struct { Address jcstypes.GRPCAddressInfo } +func (w *HubWorker) Name() string { + return fmt.Sprintf("%v", w.Hub.HubID) +} + func (w *HubWorker) NewClient() (exec.WorkerClient, error) { cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&w.Hub, &w.Address)) - return &HubWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil + return &HubWorkerClient{workerName: w.Name(), hubID: w.Hub.HubID, cli: cli}, nil } func (w *HubWorker) String() string { @@ -43,12 +47,13 @@ func (w *HubWorker) Equals(worker exec.WorkerInfo) bool { } type HubWorkerClient struct { - hubID jcstypes.HubID - cli *hubrpc.Client + workerName string + hubID jcstypes.HubID + cli *hubrpc.Client } func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) { - resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan, WorkerName: fmt.Sprintf("%v", c.hubID)}) + resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan, WorkerName: c.workerName}) if err != nil { return exec.ExecutorResult{}, err.ToError() } diff --git a/common/pkgs/ioswitch2/ops2/base_store.go b/common/pkgs/ioswitch2/ops2/base_store.go index b0802f8..4d01210 100644 --- a/common/pkgs/ioswitch2/ops2/base_store.go +++ b/common/pkgs/ioswitch2/ops2/base_store.go @@ -17,7 +17,7 @@ import ( ) const ( - BaseReadStatsStoreKey = "BaseReadSpeed" + BaseReadStatsStoreKey = "Stats.BaseRead" ) func init() { @@ -28,16 +28,16 @@ func init() { } type BaseReadStatsValue struct { - Size int64 - Time time.Duration - Location exec.Location + Length int64 + ElapsedTime time.Duration + Location exec.Location } func (v *BaseReadStatsValue) Clone() exec.VarValue { return &BaseReadStatsValue{ - Size: v.Size, - Time: v.Time, - Location: v.Location, + Length: v.Length, + ElapsedTime: v.ElapsedTime, + Location: v.Location, } } @@ -72,18 +72,7 @@ func (o *BaseRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } startTime := time.Now() - counter := io2.CounterCloser(stream, func(cnt int64, err error) { - if err != nil && err != io.EOF { - return - } - - // 要注意这个回调一定要在return之前调用 - e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{ - Size: cnt, - Time: time.Since(startTime), - Location: e.Location(), - }) - }) + counter := io2.CounterCloser(stream, nil) fut := future.NewSetVoid() output := &exec.StreamValue{ @@ -93,7 +82,13 @@ func (o *BaseRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } e.PutVar(o.Output, output) - return fut.Wait(ctx.Context) + err = fut.Wait(ctx.Context) + e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{ + Length: counter.Count(), + ElapsedTime: time.Since(startTime), + Location: e.Location(), + }) + return err } func (o *BaseRead) String() string { @@ -137,18 +132,7 @@ func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } startTime := time.Now() - counter := io2.CounterCloser(stream, func(cnt int64, err error) { - if err != nil && err != io.EOF { - return - } - - // 要注意这个回调一定要在return之前调用 - e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{ - Size: cnt, - Time: time.Since(startTime), - Location: e.Location(), - }) - }) + counter := io2.CounterCloser(stream, nil) fut := future.NewSetVoid() output := &exec.StreamValue{ @@ -158,7 +142,13 @@ func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } e.PutVar(o.Output, output) - return fut.Wait(ctx.Context) + err = fut.Wait(ctx.Context) + e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{ + Length: counter.Count(), + ElapsedTime: time.Since(startTime), + Location: e.Location(), + }) + return err } func (o *BaseReadDyn) String() string { diff --git a/common/pkgs/ioswitchlrc/hub_worker.go b/common/pkgs/ioswitchlrc/hub_worker.go index f3b1bd8..1e2bd76 100644 --- a/common/pkgs/ioswitchlrc/hub_worker.go +++ b/common/pkgs/ioswitchlrc/hub_worker.go @@ -2,6 +2,7 @@ package ioswitchlrc import ( "context" + "fmt" "io" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" @@ -19,6 +20,10 @@ type HubWorker struct { Address jcstypes.GRPCAddressInfo } +func (w *HubWorker) Name() string { + return fmt.Sprintf("%v", w.Hub.HubID) +} + func (w *HubWorker) NewClient() (exec.WorkerClient, error) { cli := stgglb.HubRPCPool.Get(stgglb.SelectGRPCAddress(&w.Hub, &w.Address)) return &HubWorkerClient{cli: cli}, nil @@ -38,11 +43,12 @@ func (w *HubWorker) Equals(worker exec.WorkerInfo) bool { } type HubWorkerClient struct { - cli *hubrpc.Client + workerName string + cli *hubrpc.Client } func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) { - resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan}) + resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan, WorkerName: c.workerName}) if err != nil { return exec.ExecutorResult{}, err.ToError() } diff --git a/common/pkgs/rpc/hub/pub_shards.go b/common/pkgs/rpc/hub/pub_shards.go index 0d6dc35..c24c848 100644 --- a/common/pkgs/rpc/hub/pub_shards.go +++ b/common/pkgs/rpc/hub/pub_shards.go @@ -112,7 +112,7 @@ type PubShardsStats struct { Password string } type PubShardsStatsResp struct { - Stats stgtypes.Stats + Stats stgtypes.ShardStoreStats } var _ = TokenAuth(Hub_PubShardsStats_FullMethodName) diff --git a/common/pkgs/rpc/utils.go b/common/pkgs/rpc/utils.go index 834f128..36669a4 100644 --- a/common/pkgs/rpc/utils.go +++ b/common/pkgs/rpc/utils.go @@ -219,6 +219,7 @@ func DownloadStreamServer[Resp DownloadStreamResp, Req any, APIRet DownloadStrea if cerr != nil { return WrapCodeError(cerr) } + defer resp.GetStream().Close() cw := NewChunkedWriter(ret) data, err := serder.ObjectToJSONEx(resp) diff --git a/common/pkgs/servicestats/hub_strorage_transfer.go b/common/pkgs/servicestats/hub_strorage_transfer.go index 9a7d79a..8eedba4 100644 --- a/common/pkgs/servicestats/hub_strorage_transfer.go +++ b/common/pkgs/servicestats/hub_strorage_transfer.go @@ -18,22 +18,23 @@ type HubStorageTransferStats struct { type HubStorageTransferStatsData struct { Entries map[jcstypes.StorageID]*HubStorageTransferStatsEntry StartTime time.Time + EndTime time.Time } type HubStorageTransferStatsEntry struct { DestStorageID jcstypes.StorageID - OutputBytes int64 - MaxOutputBytes int64 - MinOutputBytes int64 - TotalOutput int64 - SuccessOutput int64 - - InputBytes int64 - MaxInputBytes int64 - MinInputBytes int64 - TotalInput int64 - SuccessInput int64 + OutputBytes int64 + MaxOutputBytes int64 + MinOutputBytes int64 + TotalOutputReqs int64 + SuccessOutputReqs int64 + + InputBytes int64 + MaxInputBytes int64 + MinInputBytes int64 + TotalInputReqs int64 + SuccessInputReqs int64 } func (s *HubStorageTransferStats) RecordUpload(dstStorageID jcstypes.StorageID, transferBytes int64, isSuccess bool) { @@ -53,9 +54,9 @@ func (s *HubStorageTransferStats) RecordUpload(dstStorageID jcstypes.StorageID, e.MaxOutputBytes = math2.Max(e.MaxOutputBytes, transferBytes) e.MinOutputBytes = math2.Min(e.MinOutputBytes, transferBytes) if isSuccess { - e.SuccessOutput++ + e.SuccessOutputReqs++ } - e.TotalOutput++ + e.TotalOutputReqs++ } func (s *HubStorageTransferStats) RecordDownload(dstStorageID jcstypes.StorageID, transferBytes int64, isSuccess bool) { @@ -75,20 +76,11 @@ func (s *HubStorageTransferStats) RecordDownload(dstStorageID jcstypes.StorageID e.MaxInputBytes = math2.Max(e.MaxInputBytes, transferBytes) e.MinInputBytes = math2.Min(e.MinInputBytes, transferBytes) if isSuccess { - e.SuccessInput++ + e.SuccessInputReqs++ } } -func (s *HubStorageTransferStats) Reset() time.Time { - s.lock.Lock() - defer s.lock.Unlock() - - s.data.Entries = make(map[jcstypes.StorageID]*HubStorageTransferStatsEntry) - s.data.StartTime = time.Now() - return s.data.StartTime -} - -func (s *HubStorageTransferStats) DumpData() HubStorageTransferStatsData { +func (s *HubStorageTransferStats) DumpData(reset bool) HubStorageTransferStatsData { s.lock.Lock() defer s.lock.Unlock() @@ -98,6 +90,12 @@ func (s *HubStorageTransferStats) DumpData() HubStorageTransferStatsData { v2 := *v data.Entries[k] = &v2 } + data.EndTime = time.Now() + + if reset { + s.data.Entries = make(map[jcstypes.StorageID]*HubStorageTransferStatsEntry) + s.data.StartTime = time.Now() + } return data } diff --git a/common/pkgs/servicestats/hub_transfter.go b/common/pkgs/servicestats/hub_transfter.go index b5a447a..9b09d49 100644 --- a/common/pkgs/servicestats/hub_transfter.go +++ b/common/pkgs/servicestats/hub_transfter.go @@ -18,22 +18,23 @@ type HubTransferStats struct { type HubTransferStatsData struct { Entries map[jcstypes.HubID]*HubTransferStatsEntry StartTime time.Time + EndTime time.Time } type HubTransferStatsEntry struct { DestHubID jcstypes.HubID - OutputBytes int64 - MaxOutputBytes int64 - MinOutputBytes int64 - TotalOutput int64 - SuccessOutput int64 + OutputBytes int64 + MaxOutputBytes int64 + MinOutputBytes int64 + OutputReqs int64 // 发送数据的请求数量 + SuccessOutputReqs int64 // 发送成功的请求数量 - InputBytes int64 - MaxInputBytes int64 - MinInputBytes int64 - TotalInput int64 - SuccessInput int64 + InputBytes int64 + MaxInputBytes int64 + MinInputBytes int64 + TotalInputReqs int64 // 下载数据的请求数量 + SuccessInputReqs int64 // 下载成功的请求数量 } func (s *HubTransferStats) RecordOutput(dstHubID jcstypes.HubID, transferBytes int64, isSuccess bool) { @@ -53,9 +54,9 @@ func (s *HubTransferStats) RecordOutput(dstHubID jcstypes.HubID, transferBytes i e.MaxOutputBytes = math2.Max(e.MaxOutputBytes, transferBytes) e.MinOutputBytes = math2.Min(e.MinOutputBytes, transferBytes) if isSuccess { - e.SuccessOutput++ + e.SuccessOutputReqs++ } - e.TotalOutput++ + e.OutputReqs++ } func (s *HubTransferStats) RecordInput(dstHubID jcstypes.HubID, transferBytes int64, isSuccess bool) { @@ -75,21 +76,12 @@ func (s *HubTransferStats) RecordInput(dstHubID jcstypes.HubID, transferBytes in e.MaxInputBytes = math2.Max(e.MaxInputBytes, transferBytes) e.MinInputBytes = math2.Min(e.MinInputBytes, transferBytes) if isSuccess { - e.SuccessInput++ + e.SuccessInputReqs++ } - e.TotalInput++ + e.TotalInputReqs++ } -func (s *HubTransferStats) Reset() time.Time { - s.lock.Lock() - defer s.lock.Unlock() - - s.data.StartTime = time.Now() - s.data.Entries = make(map[jcstypes.HubID]*HubTransferStatsEntry) - return s.data.StartTime -} - -func (s *HubTransferStats) DumpData() HubTransferStatsData { +func (s *HubTransferStats) DumpData(reset bool) HubTransferStatsData { s.lock.Lock() defer s.lock.Unlock() @@ -99,5 +91,12 @@ func (s *HubTransferStats) DumpData() HubTransferStatsData { v2 := *v data.Entries[k] = &v2 } + data.EndTime = time.Now() + + if reset { + s.data.StartTime = time.Now() + s.data.Entries = make(map[jcstypes.HubID]*HubTransferStatsEntry) + } + return data } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 564169b..1f3751f 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -207,11 +207,11 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { return nil } -func (s *ShardStore) Stats() stgtypes.Stats { +func (s *ShardStore) Stats() stgtypes.ShardStoreStats { s.lock.Lock() defer s.lock.Unlock() - return stgtypes.Stats{ + return stgtypes.ShardStoreStats{ Status: stgtypes.StatusOK, FileCount: s.totalShardsCnt, TotalSize: s.detail.UserSpace.ShardStore.MaxSize, diff --git a/common/pkgs/storage/pubshards/shard_store.go b/common/pkgs/storage/pubshards/shard_store.go index 5152f34..7e292ea 100644 --- a/common/pkgs/storage/pubshards/shard_store.go +++ b/common/pkgs/storage/pubshards/shard_store.go @@ -88,13 +88,13 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { return nil } -func (s *ShardStore) Stats() stgtypes.Stats { +func (s *ShardStore) Stats() stgtypes.ShardStoreStats { resp, cerr := s.hubCli.PubShardsStats(context.Background(), &hubrpc.PubShardsStats{ PubShardsID: s.stgType.PubShardsID, Password: s.stgType.Password, }) if cerr != nil { - return stgtypes.Stats{} + return stgtypes.ShardStoreStats{} } return resp.Stats } diff --git a/common/pkgs/storage/rclone/shard_store.go b/common/pkgs/storage/rclone/shard_store.go index 64d00ce..959fed4 100644 --- a/common/pkgs/storage/rclone/shard_store.go +++ b/common/pkgs/storage/rclone/shard_store.go @@ -195,11 +195,11 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { return nil } -func (s *ShardStore) Stats() stgtypes.Stats { +func (s *ShardStore) Stats() stgtypes.ShardStoreStats { s.lock.Lock() defer s.lock.Unlock() - return stgtypes.Stats{ + return stgtypes.ShardStoreStats{ Status: stgtypes.StatusOK, FileCount: s.totalShardsCnt, TotalSize: s.Detail.UserSpace.ShardStore.MaxSize, diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 713b18c..62f4a16 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -244,11 +244,11 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { return nil } -func (s *ShardStore) Stats() stgtypes.Stats { +func (s *ShardStore) Stats() stgtypes.ShardStoreStats { s.lock.Lock() defer s.lock.Unlock() - return stgtypes.Stats{ + return stgtypes.ShardStoreStats{ Status: stgtypes.StatusOK, FileCount: s.totalShardsCnt, TotalSize: s.Detail.UserSpace.ShardStore.MaxSize, diff --git a/common/pkgs/storage/types/shard_store.go b/common/pkgs/storage/types/shard_store.go index 1f9d4f0..b44c17e 100644 --- a/common/pkgs/storage/types/shard_store.go +++ b/common/pkgs/storage/types/shard_store.go @@ -18,30 +18,22 @@ type ShardStore interface { // 垃圾清理。只保留availables中的文件,删除其他文件 GC(avaiables []jcstypes.FileHash) error // 获得存储系统信息 - Stats() Stats + Stats() ShardStoreStats } -type Stats struct { +type ShardStoreStats struct { // 存储服务状态,如果状态正常,此值应该是StatusOK - Status Status + Status ShardStoreStatus `json:"status"` // 文件总数 - FileCount int64 + FileCount int64 `json:"fileCount"` // 存储空间总大小 - TotalSize int64 + TotalSize int64 `json:"totalSize"` // 已使用的存储空间大小,可以超过存储空间总大小 - UsedSize int64 + UsedSize int64 `json:"usedSize"` // 描述信息,用于调试 - Description string + Description string `json:"description"` } -type Status interface { - String() string -} - -type OKStatus struct{} - -func (s *OKStatus) String() string { - return "OK" -} +type ShardStoreStatus string -var StatusOK = &OKStatus{} +var StatusOK ShardStoreStatus = "OK" diff --git a/common/pkgs/sysevent/publisher.go b/common/pkgs/sysevent/publisher.go index 45cf83e..57edda0 100644 --- a/common/pkgs/sysevent/publisher.go +++ b/common/pkgs/sysevent/publisher.go @@ -70,6 +70,7 @@ func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) { } pub := &Publisher{ + cfg: cfg, connection: connection, channel: channel, eventChan: async.NewUnboundChannel[SysEvent](), diff --git a/common/pkgs/sysevent/watcher.go b/common/pkgs/sysevent/watcher.go index 5484d8c..9190672 100644 --- a/common/pkgs/sysevent/watcher.go +++ b/common/pkgs/sysevent/watcher.go @@ -21,6 +21,7 @@ type WatcherExited struct { } type WatcherHost struct { + cfg Config watchers []Watcher lock sync.Mutex connection *amqp.Connection @@ -29,6 +30,12 @@ type WatcherHost struct { } func NewWatcherHost(cfg Config) (*WatcherHost, error) { + if !cfg.Enabled { + return &WatcherHost{ + cfg: cfg, + }, nil + } + config := amqp.Config{ Vhost: cfg.VHost, } @@ -80,6 +87,7 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) { } wat := &WatcherHost{ + cfg: cfg, connection: connection, channel: channel, recvChan: recvChan, @@ -92,6 +100,10 @@ func (w *WatcherHost) Start() *async.UnboundChannel[WatcherEvent] { ch := async.NewUnboundChannel[WatcherEvent]() go func() { + if !w.cfg.Enabled { + return + } + defer ch.Close() defer w.channel.Close() defer w.connection.Close() @@ -119,6 +131,19 @@ func (w *WatcherHost) Start() *async.UnboundChannel[WatcherEvent] { return ch } +func (w *WatcherHost) Stop() { + if !w.cfg.Enabled { + return + } + + w.channel.Close() + w.connection.Close() +} + +func (w *WatcherHost) Enabled() bool { + return w.cfg.Enabled +} + func (w *WatcherHost) AddWatcher(watcher Watcher) { w.lock.Lock() defer w.lock.Unlock() diff --git a/common/types/datamap/datamap.go b/common/types/datamap/datamap.go index 52b4365..b1ca4db 100644 --- a/common/types/datamap/datamap.go +++ b/common/types/datamap/datamap.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/utils/serder" + stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" ) @@ -98,10 +99,12 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEven (*BodyNewUserSpace)(nil), (*BodyUserSpaceUpdated)(nil), (*BodyUserSpaceDeleted)(nil), + (*BodyGlobalObjectStats)(nil), // (*BodyStorageStats)(nil), - // (*BodyHubTransferStats)(nil), - // (*BodyHubStorageTransferStats)(nil), + (*BodyHubTransferStats)(nil), + (*BodyHubStorageTransferStats)(nil), + (*BodyObjectAccessStats)(nil), (*BodyBlockTransfer)(nil), (*BodyBlockDistribution)(nil), @@ -225,6 +228,25 @@ func (b *BodyStorageStats) GetBodyType() string { func (b *BodyStorageStats) OnUnionSerializing() { b.Type = b.GetBodyType() } +*/ + +// 全局总数据量的统计信息 +type BodyGlobalObjectStats struct { + serder.Metadata `union:"GlobalObjectStats"` + Type string `json:"type"` + TotalObjectCount int64 `json:"totalObjectCount"` + TotalDataSize int64 `json:"totalDataSize"` + UserSpaceIDs []jcstypes.UserSpaceID `json:"userSpaceIDs"` + UserSpaceStats []stgtypes.ShardStoreStats `json:"userSpaceStats"` +} + +func (b *BodyGlobalObjectStats) GetBodyType() string { + return "GlobalObjectStats" +} + +func (b *BodyGlobalObjectStats) OnUnionSerializing() { + b.Type = b.GetBodyType() +} // Hub数据传输统计信息的事件 type BodyHubTransferStats struct { @@ -232,7 +254,7 @@ type BodyHubTransferStats struct { Type string `json:"type"` SourceHubID jcstypes.HubID `json:"sourceHubID"` TargetHubID jcstypes.HubID `json:"targetHubID"` - Send DataTrans `json:"send"` + Send TransferStats `json:"send"` StartTimestamp time.Time `json:"startTimestamp"` EndTimestamp time.Time `json:"endTimestamp"` } @@ -245,13 +267,13 @@ func (b *BodyHubTransferStats) OnUnionSerializing() { b.Type = b.GetBodyType() } -type DataTrans struct { - TotalTransfer int64 `json:"totalTransfer"` +type TransferStats struct { + Amount int64 `json:"amount"` RequestCount int64 `json:"requestCount"` FailedRequestCount int64 `json:"failedRequestCount"` - AvgTransfer int64 `json:"avgTransfer"` - MaxTransfer int64 `json:"maxTransfer"` - MinTransfer int64 `json:"minTransfer"` + RequestAverage int64 `json:"requestAverage"` + RequestMax int64 `json:"requestMax"` + RequestMin int64 `json:"requestMin"` } // Hub和Storage数据传输统计信息的事件 @@ -260,8 +282,8 @@ type BodyHubStorageTransferStats struct { Type string `json:"type"` HubID jcstypes.HubID `json:"hubID"` StorageID jcstypes.StorageID `json:"storageID"` - Send DataTrans `json:"send"` - Receive DataTrans `json:"receive"` + Send TransferStats `json:"send"` + Receive TransferStats `json:"receive"` StartTimestamp time.Time `json:"startTimestamp"` EndTimestamp time.Time `json:"endTimestamp"` } @@ -273,7 +295,24 @@ func (b *BodyHubStorageTransferStats) GetBodyType() string { func (b *BodyHubStorageTransferStats) OnUnionSerializing() { b.Type = b.GetBodyType() } -*/ + +// 对象读取或调度时产生的传输事件 +type BodyObjectAccessStats struct { + serder.Metadata `union:"ObjectAccessStats"` + Type string `json:"type"` + ObjectID jcstypes.ObjectID `json:"objectID"` + RequestSize int64 `json:"requestSize"` + TransferAmount int64 `json:"transferAmount"` + ElapsedTime time.Duration `json:"elapsedTime"` +} + +func (b *BodyObjectAccessStats) GetBodyType() string { + return "ObjectAccessStats" +} + +func (b *BodyObjectAccessStats) OnUnionSerializing() { + b.Type = b.GetBodyType() +} // 块传输的事件 type BodyBlockTransfer struct { diff --git a/go.mod b/go.mod index 4b7f55c..5a4d101 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/tjfoc/gmsm v1.4.1 // indirect + github.com/tmaxmax/go-sse v0.11.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect go.mongodb.org/mongo-driver v1.12.0 // indirect golang.org/x/arch v0.8.0 // indirect diff --git a/go.sum b/go.sum index 352a68a..3848c6b 100644 --- a/go.sum +++ b/go.sum @@ -329,6 +329,8 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tjfoc/gmsm v1.4.1 h1:aMe1GlZb+0bLjn+cKTPEvvn9oUEBlJitaZiiBwsbgho= github.com/tjfoc/gmsm v1.4.1/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE= +github.com/tmaxmax/go-sse v0.11.0 h1:nogmJM6rJUoOLoAwEKeQe5XlVpt9l7N82SS1jI7lWFg= +github.com/tmaxmax/go-sse v0.11.0/go.mod h1:u/2kZQR1tyngo1lKaNCj1mJmhXGZWS1Zs5yiSOD+Eg8= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index a4217f8..7226e1f 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -121,7 +121,7 @@ func serve(configPath string, opts serveOptions) { defer evtPub.Stop() // 初始化定时任务执行器 - tktk := ticktock.New(config.Cfg().TickTock, config.Cfg().ID, stgPool) + tktk := ticktock.New(config.Cfg().TickTock, config.Cfg().ID, stgPool, evtPub) tktk.Start() defer tktk.Stop() diff --git a/hub/internal/pubshards/pub_shards.go b/hub/internal/pubshards/pub_shards.go index 4a7e56a..8caeffc 100644 --- a/hub/internal/pubshards/pub_shards.go +++ b/hub/internal/pubshards/pub_shards.go @@ -90,16 +90,19 @@ func (s *LoadedStore) GC(userID jcstypes.UserID, fileHashes []jcstypes.FileHash) }) } -func (s *LoadedStore) GetUserStats(userID jcstypes.UserID) stgtypes.Stats { - cnt := int64(0) - size := int64(0) - s.ClientFileHashDB.Table("Shard").Select("Count(Hash), Sum(Size)").Find(&cnt) +func (s *LoadedStore) GetUserStats(userID jcstypes.UserID) stgtypes.ShardStoreStats { + var stats struct { + Cnt int64 `gorm:"column:Cnt; type:bigint;"` + Size int64 `gorm:"column:Size; type:bigint;"` + } + + s.ClientFileHashDB.Table("Shard").Select("Count(Hash) as Cnt, Sum(Size) as Size").Find(&stats) - return stgtypes.Stats{ + return stgtypes.ShardStoreStats{ Status: stgtypes.StatusOK, - FileCount: cnt, + FileCount: stats.Cnt, TotalSize: s.Config.ShardStore.MaxSize, - UsedSize: size, + UsedSize: stats.Size, } } diff --git a/hub/internal/ticktock/stats_log.go b/hub/internal/ticktock/stats_log.go new file mode 100644 index 0000000..2b7ab27 --- /dev/null +++ b/hub/internal/ticktock/stats_log.go @@ -0,0 +1,69 @@ +package ticktock + +import ( + "gitlink.org.cn/cloudream/common/utils/reflect2" + stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/types/datamap" +) + +type StatsLog struct { +} + +func (j *StatsLog) Name() string { + return reflect2.TypeNameOf[StatsLog]() +} + +func (j *StatsLog) Execute(t *TickTock) { + j.logTransferStats(t) +} + +func (j *StatsLog) logTransferStats(t *TickTock) { + // log := logger.WithType[StatsLog]("Event") + if stgglb.Stats.HubTransfer != nil { + data := stgglb.Stats.HubTransfer.DumpData(true) + for k, v := range data.Entries { + t.evtPub.Publish(&datamap.BodyHubTransferStats{ + SourceHubID: t.myHubID, + TargetHubID: k, + Send: datamap.TransferStats{ + Amount: v.OutputBytes, + RequestCount: v.OutputReqs, + FailedRequestCount: v.OutputReqs - v.SuccessOutputReqs, + RequestAverage: v.OutputBytes / v.OutputReqs, + RequestMax: v.MaxOutputBytes, + RequestMin: v.MinOutputBytes, + }, + StartTimestamp: data.StartTime, + EndTimestamp: data.EndTime, + }) + } + } + + if stgglb.Stats.HubStorageTransfer != nil { + data := stgglb.Stats.HubStorageTransfer.DumpData(true) + for k, v := range data.Entries { + t.evtPub.Publish(&datamap.BodyHubStorageTransferStats{ + HubID: t.myHubID, + StorageID: k, + Send: datamap.TransferStats{ + Amount: v.OutputBytes, + RequestCount: v.TotalOutputReqs, + FailedRequestCount: v.TotalOutputReqs - v.SuccessOutputReqs, + RequestAverage: v.OutputBytes / v.TotalOutputReqs, + RequestMax: v.MaxOutputBytes, + RequestMin: v.MinOutputBytes, + }, + Receive: datamap.TransferStats{ + Amount: v.InputBytes, + RequestCount: v.TotalInputReqs, + FailedRequestCount: v.TotalInputReqs - v.SuccessInputReqs, + RequestAverage: v.InputBytes / v.TotalInputReqs, + RequestMax: v.MaxInputBytes, + RequestMin: v.MinInputBytes, + }, + StartTimestamp: data.StartTime, + EndTimestamp: data.EndTime, + }) + } + } +} diff --git a/hub/internal/ticktock/ticktock.go b/hub/internal/ticktock/ticktock.go index 67df20a..a4f24bc 100644 --- a/hub/internal/ticktock/ticktock.go +++ b/hub/internal/ticktock/ticktock.go @@ -6,6 +6,7 @@ import ( "github.com/go-co-op/gocron/v2" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" ) @@ -26,9 +27,10 @@ type TickTock struct { myHubID jcstypes.HubID stgPool *pool.Pool + evtPub *sysevent.Publisher } -func New(cfg Config, myHubID jcstypes.HubID, stgPool *pool.Pool) *TickTock { +func New(cfg Config, myHubID jcstypes.HubID, stgPool *pool.Pool, evtPub *sysevent.Publisher) *TickTock { sch, _ := gocron.NewScheduler() t := &TickTock{ cfg: cfg, @@ -37,6 +39,7 @@ func New(cfg Config, myHubID jcstypes.HubID, stgPool *pool.Pool) *TickTock { myHubID: myHubID, stgPool: stgPool, + evtPub: evtPub, } t.initJobs() return t @@ -74,4 +77,8 @@ func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) { func (t *TickTock) initJobs() { t.addJob(&TestHubConnectivities{myHubID: t.myHubID}, gocron.DurationJob(t.cfg.TestHubConnectivitiesInterval)) + + t.addJob(&StatsLog{}, gocron.DailyJob(1, gocron.NewAtTimes( + gocron.NewAtTime(0, 0, 0), + ))) } diff --git a/jcsctl/cmd/admin/sysevent/sysevent.go b/jcsctl/cmd/admin/sysevent/sysevent.go new file mode 100644 index 0000000..12e3c28 --- /dev/null +++ b/jcsctl/cmd/admin/sysevent/sysevent.go @@ -0,0 +1,15 @@ +package sysevent + +import ( + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/admin" +) + +var SysEventCmd = &cobra.Command{ + Use: "sysevent", + Aliases: []string{"sevt"}, +} + +func init() { + admin.AdminCmd.AddCommand(SysEventCmd) +} diff --git a/jcsctl/cmd/admin/sysevent/watch.go b/jcsctl/cmd/admin/sysevent/watch.go new file mode 100644 index 0000000..e0101d0 --- /dev/null +++ b/jcsctl/cmd/admin/sysevent/watch.go @@ -0,0 +1,45 @@ +package sysevent + +import ( + "bytes" + "encoding/json" + "fmt" + + "github.com/spf13/cobra" + se "github.com/tmaxmax/go-sse" + cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" + "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd" +) + +func init() { + var opt watchOpt + cmd := cobra.Command{ + Use: "watch", + Args: cobra.ExactArgs(0), + RunE: func(c *cobra.Command, args []string) error { + ctx := cmd.GetCmdCtx(c) + return watch(c, ctx, opt, args) + }, + } + SysEventCmd.AddCommand(&cmd) +} + +type watchOpt struct { +} + +func watch(c *cobra.Command, ctx *cmd.CommandContext, opt watchOpt, args []string) error { + resp, err := ctx.Client.SysEvent().Watch(cliapi.SysEventWatch{}) + if err != nil { + return fmt.Errorf("begin watch : %v", err) + } + defer resp.Stream.Close() + + se.Read(resp.Stream, nil)(func(e se.Event, err error) bool { + buf := bytes.NewBuffer(nil) + json.Indent(buf, []byte(e.Data), "", " ") + fmt.Println(buf.String()) + return true + }) + + return nil +} diff --git a/jcsctl/cmd/all/all.go b/jcsctl/cmd/all/all.go index 8aa9b87..a088e59 100644 --- a/jcsctl/cmd/all/all.go +++ b/jcsctl/cmd/all/all.go @@ -2,6 +2,7 @@ package all import ( _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/admin" + _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/admin/sysevent" _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/admin/ticktock" _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/bucket" _ "gitlink.org.cn/cloudream/jcs-pub/jcsctl/cmd/geto"