From 021131b7a6e53e769497f507d6137971098e257c Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 14 Jul 2025 09:07:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=BB=9F=E8=AE=A1=E4=BC=A0?= =?UTF-8?q?=E8=BE=93=E9=80=9F=E5=BA=A6=E7=9A=84=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/serve.go | 10 +- client/internal/cmdline/test.go | 8 +- client/internal/cmdline/vfstest.go | 8 +- client/internal/downloader/downloader.go | 29 ++-- client/internal/downloader/iterator.go | 34 ++++- client/internal/downloader/lrc.go | 14 +- .../internal/downloader/lrc_strip_iterator.go | 20 ++- client/internal/downloader/strip_iterator.go | 14 +- client/internal/services/service.go | 4 + client/internal/services/user_space.go | 48 +++++-- client/internal/speedstats/speedstats.go | 128 ++++++++++++++++++ client/internal/speedstats/speedstats_test.go | 25 ++++ client/internal/ticktock/speed_stats_step.go | 27 ++++ client/internal/ticktock/ticktock.go | 43 +++--- client/internal/uploader/create_load.go | 8 +- client/internal/uploader/update.go | 9 +- common/pkgs/ioswitch/exec/executor.go | 19 ++- common/pkgs/ioswitch/exec/plan_builder.go | 2 +- common/pkgs/ioswitch2/http_hub_worker.go | 4 +- common/pkgs/ioswitch2/hub_worker.go | 3 +- common/pkgs/ioswitch2/ops2/base_store.go | 52 ++++++- common/pkgs/rpc/hub/ioswitch.go | 3 +- hub/internal/http/hub_io.go | 11 +- hub/internal/rpc/ioswitch.go | 11 +- hub/sdk/api/hub_io.go | 3 +- 25 files changed, 458 insertions(+), 79 deletions(-) create mode 100644 client/internal/speedstats/speedstats.go create mode 100644 client/internal/speedstats/speedstats_test.go create mode 100644 client/internal/ticktock/speed_stats_step.go diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 99f1c01..0adb4c0 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -18,6 +18,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" @@ -158,17 +159,20 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { // 存储管理器 stgPool := pool.NewPool() + // 传输速度统计 + spdStats := speedstats.New() + // 下载策略 strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, spaceMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats) // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, spaceMeta, db) // 定时任务 - tktk := ticktock.New(config.Cfg().TickTock, db, spaceMeta, stgPool, evtPub, publock) + tktk := ticktock.New(config.Cfg().TickTock, db, spaceMeta, stgPool, evtPub, publock, spdStats) tktk.Start() defer tktk.Stop() @@ -194,7 +198,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, spaceMeta, db, evtPub, mnt, stgPool, spaceSync, tktk) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, spaceMeta, db, evtPub, mnt, stgPool, spaceSync, tktk, spdStats) // HTTP接口 httpCfgJSON := config.Cfg().HTTP diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index f6966fd..bd8593b 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -17,6 +17,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" @@ -171,11 +172,14 @@ func test(configPath string) { // 存储管理器 stgPool := pool.NewPool() + // 传输速度统计 + spdStats := speedstats.New() + // 下载策略 strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats) // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) @@ -185,7 +189,7 @@ func test(configPath string) { spaceSyncChan := spaceSync.Start() defer spaceSync.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool, spaceSync, nil) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil, stgPool, spaceSync, nil, spdStats) go func() { doTest(svc) diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index d4e85f4..bd9cd7d 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -20,6 +20,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/vfstest" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap" @@ -147,6 +148,9 @@ func vfsTest(configPath string, opts serveHTTPOptions) { acStatChan := acStat.Start() defer acStat.Stop() + // 传输速度统计 + spdStats := speedstats.New() + // 存储管理器 stgPool := pool.NewPool() @@ -154,7 +158,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db) + dlder := downloader.NewDownloader(config.Cfg().Downloader, conCol, stgPool, strgSel, db, spdStats) // 上传器 uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) @@ -177,7 +181,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { mntChan := mnt.Start() defer mnt.Stop() - svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool, spaceSync, nil) + svc := services.NewService(publock, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, mnt, stgPool, spaceSync, nil, spdStats) // HTTP接口 httpCfgJSON := config.Cfg().HTTP diff --git a/client/internal/downloader/downloader.go b/client/internal/downloader/downloader.go index 31c93ae..6e94eec 100644 --- a/client/internal/downloader/downloader.go +++ b/client/internal/downloader/downloader.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" @@ -37,27 +38,29 @@ type Downloading struct { } type Downloader struct { - strips *StripCache - cfg Config - conn *connectivity.Collector - stgPool *pool.Pool - selector *strategy.Selector - db *db.DB + strips *StripCache + cfg Config + conn *connectivity.Collector + stgPool *pool.Pool + selector *strategy.Selector + db *db.DB + speedStats *speedstats.SpeedStats } -func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB) *Downloader { +func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB, speedStats *speedstats.SpeedStats) *Downloader { if cfg.MaxStripCacheCount == 0 { cfg.MaxStripCacheCount = DefaultMaxStripCacheCount } ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount) return &Downloader{ - strips: ch, - cfg: cfg, - conn: conn, - stgPool: stgPool, - selector: sel, - db: db, + strips: ch, + cfg: cfg, + conn: conn, + stgPool: stgPool, + selector: sel, + db: db, + speedStats: speedStats, } } diff --git a/client/internal/downloader/iterator.go b/client/internal/downloader/iterator.go index 18a5d9b..dd22b24 100644 --- a/client/internal/downloader/iterator.go +++ b/client/internal/downloader/iterator.go @@ -15,6 +15,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/iterator" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock" @@ -115,12 +116,20 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat toExec.Range = math2.Range{ Offset: req.Raw.Offset, } + len := req.Detail.Object.Size - req.Raw.Offset if req.Raw.Length != -1 { - len := req.Raw.Length + len = req.Raw.Length toExec.Range.Length = &len } - ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream())).AddTo(toExec) + fromSpace := strg.UserSpace + + shouldAtClient := i.downloader.speedStats.ShouldAtClient(len) + if shouldAtClient { + fromSpace.RecommendHub = nil + } + + ft.AddFrom(ioswitch2.NewFromShardstore(req.Detail.Object.FileHash, fromSpace, ioswitch2.RawStream())).AddTo(toExec) strHandle = handle plans := exec.NewPlanBuilder() @@ -132,10 +141,15 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat exec.SetValueByType(exeCtx, i.downloader.stgPool) exec := plans.Execute(exeCtx) go func() { - _, err := exec.Wait(context.TODO()) + ret, err := exec.Wait(context.TODO()) if err != nil { logger.Warnf("downloading object %v: %v", req.Raw.ObjectID, err) } + + for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { + v2 := v.(*ops2.BaseReadStatsValue) + i.downloader.speedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + } }() return exec.BeginRead(strHandle) @@ -152,11 +166,23 @@ func (i *DownloadObjectIterator) downloadECReconstruct(req downloadReqeust2, str } logger.Debug(logStrs...) + length := req.Detail.Object.Size - req.Raw.Offset + if req.Raw.Length != -1 { + length = req.Raw.Length + } + + shouldAtClient := i.downloader.speedStats.ShouldAtClient(length) + downloadBlks := make([]downloadBlock, len(strg.Blocks)) for i, b := range strg.Blocks { + fromSpace := strg.UserSpaces[i] + if shouldAtClient { + fromSpace.RecommendHub = nil + } + downloadBlks[i] = downloadBlock{ Block: b, - Space: strg.UserSpaces[i], + Space: fromSpace, } } diff --git a/client/internal/downloader/lrc.go b/client/internal/downloader/lrc.go index 0d64e1d..876b856 100644 --- a/client/internal/downloader/lrc.go +++ b/client/internal/downloader/lrc.go @@ -22,11 +22,23 @@ func (iter *DownloadObjectIterator) downloadLRCReconstruct(req downloadReqeust2, } logger.Debug(logStrs...) + length := req.Detail.Object.Size - req.Raw.Offset + if req.Raw.Length != -1 { + length = req.Raw.Length + } + + shouldAtClient := iter.downloader.speedStats.ShouldAtClient(length) + downloadBlks := make([]downloadBlock, len(strg.Blocks)) for i, b := range strg.Blocks { + fromSpace := strg.Spaces[i] + if shouldAtClient { + fromSpace.RecommendHub = nil + } + downloadBlks[i] = downloadBlock{ Block: b, - Space: strg.Spaces[i], + Space: fromSpace, } } diff --git a/client/internal/downloader/lrc_strip_iterator.go b/client/internal/downloader/lrc_strip_iterator.go index 1d067d5..3b4ae7e 100644 --- a/client/internal/downloader/lrc_strip_iterator.go +++ b/client/internal/downloader/lrc_strip_iterator.go @@ -10,12 +10,13 @@ import ( "gitlink.org.cn/cloudream/common/utils/math2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/parser" ) type LRCStripIterator struct { - downloder *Downloader + downloader *Downloader object clitypes.Object blocks []downloadBlock red clitypes.LRCRedundancy @@ -33,7 +34,7 @@ func NewLRCStripIterator(downloder *Downloader, object clitypes.Object, blocks [ } iter := &LRCStripIterator{ - downloder: downloder, + downloader: downloder, object: object, blocks: blocks, red: red, @@ -114,12 +115,23 @@ func (s *LRCStripIterator) downloading() { } exeCtx := exec.NewExecContext() - exec.SetValueByType(exeCtx, s.downloder.stgPool) + exec.SetValueByType(exeCtx, s.downloader.stgPool) exec := plans.Execute(exeCtx) ctx, cancel := context.WithCancel(context.Background()) - go exec.Wait(ctx) + go func() { + ret, err := exec.Wait(ctx) + if err != nil { + logger.Warnf("downloading lrc strip: %v", err) + return + } + + for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { + v2 := v.(*ops2.BaseReadStatsValue) + s.downloader.speedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + } + }() defer cancel() str, err := exec.BeginRead(hd) diff --git a/client/internal/downloader/strip_iterator.go b/client/internal/downloader/strip_iterator.go index 7a57b6b..9c311b5 100644 --- a/client/internal/downloader/strip_iterator.go +++ b/client/internal/downloader/strip_iterator.go @@ -11,6 +11,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" ) @@ -221,7 +222,18 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) { exec := plans.Execute(exeCtx) ctx, cancel := context.WithCancel(context.Background()) - go exec.Wait(ctx) + go func() { + ret, err := exec.Wait(ctx) + if err != nil { + logger.Warnf("downloading strip: %v", err) + return + } + + for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { + v2 := v.(*ops2.BaseReadStatsValue) + s.downloader.speedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + } + }() str, err := exec.BeginRead(hd) if err != nil { diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 16cbf7b..1e6648d 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock" @@ -29,6 +30,7 @@ type Service struct { StgPool *pool.Pool SpaceSyncer *spacesyncer.SpaceSyncer TickTock *ticktock.TickTock + SpeedStats *speedstats.SpeedStats } func NewService( @@ -44,6 +46,7 @@ func NewService( stgPool *pool.Pool, spaceSyncer *spacesyncer.SpaceSyncer, tickTock *ticktock.TickTock, + speedStats *speedstats.SpeedStats, ) *Service { return &Service{ PubLock: publock, @@ -58,5 +61,6 @@ func NewService( StgPool: stgPool, SpaceSyncer: spaceSyncer, TickTock: tickTock, + SpeedStats: speedStats, } } diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 037c2c7..afec177 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -17,6 +17,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/ecode" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory" @@ -189,8 +190,8 @@ func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTe } func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error { - destStg := svc.UserSpaceMeta.Get(userspaceID) - if destStg == nil { + destSpace := svc.UserSpaceMeta.Get(userspaceID) + if destSpace == nil { return fmt.Errorf("userspace not found: %d", userspaceID) } @@ -216,20 +217,35 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users for i := 0; i < 10 && dIndex < len(details); i++ { strg, err := svc.StrategySelector.Select(strategy.Request{ Detail: details[dIndex], - DestLocation: destStg.UserSpace.Storage.GetLocation(), + DestLocation: destSpace.UserSpace.Storage.GetLocation(), }) if err != nil { return fmt.Errorf("select download strategy: %w", err) } + shouldAtClient := svc.SpeedStats.ShouldAtClient(details[dIndex].Object.Size) + ft := ioswitch2.NewFromTo() switch strg := strg.(type) { case *strategy.DirectStrategy: - ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream())) + if shouldAtClient && strg.UserSpace.RecommendHub != nil { + newSpace := strg.UserSpace + newSpace.RecommendHub = nil + ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, newSpace, ioswitch2.RawStream())) + } else { + ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream())) + } case *strategy.ECReconstructStrategy: for i, b := range strg.Blocks { - ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, strg.UserSpaces[i], ioswitch2.ECStream(b.Index))) + if shouldAtClient && strg.UserSpaces[i].RecommendHub != nil { + newSpace := strg.UserSpaces[i] + newSpace.RecommendHub = nil + ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, newSpace, ioswitch2.RawStream())) + } else { + ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, strg.UserSpaces[i], ioswitch2.RawStream())) + } + ft.ECParam = &strg.Redundancy } default: @@ -238,13 +254,20 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users objPath := clitypes.PathFromJcsPathString(details[dIndex].Object.Path) dstPath := rootJPath.ConcatNew(objPath) - ft.AddTo(ioswitch2.NewToBaseStore(*destStg, dstPath)) + + newDstSpace := *destSpace + if shouldAtClient { + newDstSpace.RecommendHub = nil + } + + ft.AddTo(ioswitch2.NewToBaseStore(newDstSpace, dstPath)) + // 顺便保存到同存储服务的分片存储中 - if destStg.UserSpace.ShardStore != nil { - ft.AddTo(ioswitch2.NewToShardStore(*destStg, ioswitch2.RawStream(), "")) + if destSpace.UserSpace.ShardStore != nil { + ft.AddTo(ioswitch2.NewToShardStore(newDstSpace, ioswitch2.RawStream(), "")) pinned = append(pinned, clitypes.PinnedObject{ ObjectID: details[dIndex].Object.ObjectID, - UserSpaceID: destStg.UserSpace.UserSpaceID, + UserSpaceID: destSpace.UserSpace.UserSpaceID, CreateTime: time.Now(), }) } @@ -263,11 +286,16 @@ func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, users exeCtx := exec.NewExecContext() exec.SetValueByType(exeCtx, svc.StgPool) drv := plans.Execute(exeCtx) - _, err = drv.Wait(context.Background()) + ret, err := drv.Wait(context.Background()) if err != nil { return err } + for _, v := range ret.GetArray(ops2.BaseReadStatsStoreKey) { + v2 := v.(*ops2.BaseReadStatsValue) + svc.SpeedStats.Record(v2.Size, v2.Time, v2.Location.IsDriver) + } + err = svc.DB.DoTx(func(tx db.SQLContext) error { objIDs := make([]clitypes.ObjectID, len(pinned)) for i, obj := range pinned { diff --git a/client/internal/speedstats/speedstats.go b/client/internal/speedstats/speedstats.go new file mode 100644 index 0000000..cf44df8 --- /dev/null +++ b/client/internal/speedstats/speedstats.go @@ -0,0 +1,128 @@ +package speedstats + +import ( + "fmt" + "math/rand/v2" + "sync" + "time" +) + +const ( + LastStatsWeigth = 0.8 + MinProb = 0.05 +) + +type Stats struct { + TotalSize int64 + TotalTime time.Duration + LastSpeed float64 + HasLastSpeed bool + AvarageSpeed float64 +} + +func (s *Stats) Record(size int64, time time.Duration) { + s.TotalSize += size + s.TotalTime += time + + if s.HasLastSpeed { + s.AvarageSpeed = float64(s.TotalSize)*(1-LastStatsWeigth)/s.TotalTime.Seconds() + s.LastSpeed*LastStatsWeigth + } else { + s.AvarageSpeed = float64(s.TotalSize) / s.TotalTime.Seconds() + } +} + +func (s *Stats) Step() { + s.TotalSize = 0 + s.TotalTime = 0 + s.LastSpeed = s.AvarageSpeed + s.AvarageSpeed = s.LastSpeed * LastStatsWeigth + s.HasLastSpeed = true +} + +type SpeedStats struct { + lock sync.RWMutex + stats100M []Stats + stats1G []Stats + statsAbove1G []Stats +} + +func New() *SpeedStats { + return &SpeedStats{ + stats100M: make([]Stats, 2), + stats1G: make([]Stats, 2), + statsAbove1G: make([]Stats, 2), + } +} + +func (p *SpeedStats) Record(size int64, time time.Duration, happenedAtClient bool) { + p.lock.Lock() + defer p.lock.Unlock() + + if size < 100*1024*1024 { + if happenedAtClient { + p.stats100M[0].Record(size, time) + } else { + p.stats100M[1].Record(size, time) + } + } else if size < 1024*1024*1024 { + if happenedAtClient { + p.stats1G[0].Record(size, time) + } else { + p.stats1G[1].Record(size, time) + } + } else { + if happenedAtClient { + p.statsAbove1G[0].Record(size, time) + } else { + p.statsAbove1G[1].Record(size, time) + } + } +} + +func (p *SpeedStats) Step() { + p.lock.Lock() + defer p.lock.Unlock() + + p.stats100M[0].Step() + p.stats100M[1].Step() + p.stats1G[0].Step() + p.stats1G[1].Step() + p.statsAbove1G[0].Step() + p.statsAbove1G[1].Step() +} + +func (p *SpeedStats) Dump() string { + p.lock.RLock() + defer p.lock.RUnlock() + + return fmt.Sprintf("100M: %v, %v\n1G: %v, %v\nAbove1G: %v, %v\n", p.stats100M[0].AvarageSpeed, p.stats100M[1].AvarageSpeed, p.stats1G[0].AvarageSpeed, p.stats1G[1].AvarageSpeed, p.statsAbove1G[0].AvarageSpeed, p.statsAbove1G[1].AvarageSpeed) +} + +func (p *SpeedStats) ShouldAtClient(size int64) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + var ss []Stats + + if size < 100*1024*1024 { + ss = p.stats100M + } else if size < 1024*1024*1024 { + ss = p.stats1G + } else { + ss = p.statsAbove1G + } + + prob := 0.0 + + if ss[0].AvarageSpeed == 0 || ss[1].AvarageSpeed == 0 { + prob = 0.5 + } else { + // 保证最小概率为0.05,最大概率为0.95 + totalSpeed := ss[0].AvarageSpeed + ss[1].AvarageSpeed + scale := 1 - 2*MinProb + prob = ss[0].AvarageSpeed/totalSpeed*scale + MinProb + } + + v := rand.Float64() + return v < prob +} diff --git a/client/internal/speedstats/speedstats_test.go b/client/internal/speedstats/speedstats_test.go new file mode 100644 index 0000000..639253d --- /dev/null +++ b/client/internal/speedstats/speedstats_test.go @@ -0,0 +1,25 @@ +package speedstats + +import ( + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_Stats(t *testing.T) { + Convey("Stats", t, func() { + stats := Stats{} + stats.Record(100, time.Second) + So(stats.AvarageSpeed, ShouldAlmostEqual, 100, 0.01) + + stats.Step() + So(stats.AvarageSpeed, ShouldAlmostEqual, 80, 0.01) + + stats.Record(200, time.Second) + So(stats.AvarageSpeed, ShouldAlmostEqual, 120, 0.01) + + stats.Step() + So(stats.AvarageSpeed, ShouldAlmostEqual, 96, 0.01) + }) +} diff --git a/client/internal/ticktock/speed_stats_step.go b/client/internal/ticktock/speed_stats_step.go new file mode 100644 index 0000000..91f8fd2 --- /dev/null +++ b/client/internal/ticktock/speed_stats_step.go @@ -0,0 +1,27 @@ +package ticktock + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/reflect2" +) + +type SpeedStatsStep struct { +} + +func (j *SpeedStatsStep) Name() string { + return reflect2.TypeNameOf[SpeedStatsStep]() +} + +func (j *SpeedStatsStep) Execute(t *TickTock) { + log := logger.WithType[SpeedStatsStep]("Event") + startTime := time.Now() + log.Infof("job start") + defer func() { + log.Infof("job end, time: %v", time.Since(startTime)) + }() + + t.speedStats.Step() + log.Info(t.speedStats.Dump()) +} diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index bd10e1a..3146832 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" @@ -24,27 +25,29 @@ type cronJob struct { } type TickTock struct { - cfg Config - sch gocron.Scheduler - jobs map[string]cronJob - db *db.DB - spaceMeta *metacache.UserSpaceMeta - stgPool *pool.Pool - evtPub *sysevent.Publisher - pubLock *publock.Service + cfg Config + sch gocron.Scheduler + jobs map[string]cronJob + db *db.DB + spaceMeta *metacache.UserSpaceMeta + stgPool *pool.Pool + evtPub *sysevent.Publisher + pubLock *publock.Service + speedStats *speedstats.SpeedStats } -func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.Service) *TickTock { +func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.Service, speedStats *speedstats.SpeedStats) *TickTock { sch, _ := gocron.NewScheduler() t := &TickTock{ - cfg: cfg, - sch: sch, - jobs: map[string]cronJob{}, - db: db, - spaceMeta: spaceMeta, - stgPool: stgPool, - evtPub: evtPub, - pubLock: pubLock, + cfg: cfg, + sch: sch, + jobs: map[string]cronJob{}, + db: db, + spaceMeta: spaceMeta, + stgPool: stgPool, + evtPub: evtPub, + pubLock: pubLock, + speedStats: speedStats, } t.initJobs() return t @@ -102,4 +105,10 @@ func (t *TickTock) initJobs() { gocron.NewAtTime(2, 0, 0), ))) + t.addJob(&SpeedStatsStep{}, gocron.DailyJob(1, gocron.NewAtTimes( + gocron.NewAtTime(0, 0, 0), + gocron.NewAtTime(6, 0, 0), + gocron.NewAtTime(12, 0, 0), + gocron.NewAtTime(18, 0, 0), + ))) } diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go index 27648fc..c078dc4 100644 --- a/client/internal/uploader/create_load.go +++ b/client/internal/uploader/create_load.go @@ -48,9 +48,11 @@ func (u *CreateUploader) Upload(pa types.JPath, stream io.Reader, opts ...Upload fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) ft.AddFrom(fromExec) for i, space := range u.targetSpaces { - ft.AddTo(ioswitch2.NewToShardStore(space, ioswitch2.RawStream(), "shardInfo")) - ft.AddTo(ioswitch2.NewToBaseStore(space, u.copyRoots[i].ConcatNew(pa))) - spaceIDs = append(spaceIDs, space.UserSpace.UserSpaceID) + space2 := space + space2.RecommendHub = nil + ft.AddTo(ioswitch2.NewToShardStore(space2, ioswitch2.RawStream(), "shardInfo")) + ft.AddTo(ioswitch2.NewToBaseStore(space2, u.copyRoots[i].ConcatNew(pa))) + spaceIDs = append(spaceIDs, space2.UserSpace.UserSpaceID) } plans := exec.NewPlanBuilder() diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go index ed68df1..d207e77 100644 --- a/client/internal/uploader/update.go +++ b/client/internal/uploader/update.go @@ -54,13 +54,18 @@ func (w *UpdateUploader) Upload(pat types.JPath, stream io.Reader, opts ...Uploa opt.CreateTime = time.Now() } + targetSpace := w.targetSpace + targetSpace.RecommendHub = nil + ft := ioswitch2.NewFromTo() fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) ft.AddFrom(fromExec). - AddTo(ioswitch2.NewToShardStore(w.targetSpace, ioswitch2.RawStream(), "shardInfo")) + AddTo(ioswitch2.NewToShardStore(targetSpace, ioswitch2.RawStream(), "shardInfo")) for i, space := range w.copyToSpaces { - ft.AddTo(ioswitch2.NewToBaseStore(space, w.copyToPath[i].ConcatNew(pat))) + toSpace := space + toSpace.RecommendHub = nil + ft.AddTo(ioswitch2.NewToBaseStore(toSpace, w.copyToPath[i].ConcatNew(pat))) } plans := exec.NewPlanBuilder() diff --git a/common/pkgs/ioswitch/exec/executor.go b/common/pkgs/ioswitch/exec/executor.go index 42cae9c..13fb7b5 100644 --- a/common/pkgs/ioswitch/exec/executor.go +++ b/common/pkgs/ioswitch/exec/executor.go @@ -23,17 +23,19 @@ type freeVar struct { type Executor struct { plan Plan + location Location vars map[VarID]freeVar bindings []*binding lock sync.Mutex store map[string][]VarValue } -func NewExecutor(plan Plan) *Executor { +func NewExecutor(plan Plan, loc Location) *Executor { planning := Executor{ - plan: plan, - vars: make(map[VarID]freeVar), - store: make(map[string][]VarValue), + plan: plan, + location: loc, + vars: make(map[VarID]freeVar), + store: make(map[string][]VarValue), } return &planning @@ -43,6 +45,10 @@ func (s *Executor) Plan() *Plan { return &s.plan } +func (s *Executor) Location() Location { + return s.location +} + func (s *Executor) Run(ctx *ExecContext) (ExecutorResult, error) { c, cancel := context.WithCancel(ctx.Context) ctx = &ExecContext{ @@ -183,3 +189,8 @@ func PutArray[T VarValue](e *Executor, ids []VarID, values []T) { e.PutVar(ids[i], values[i]) } } + +type Location struct { + IsDriver bool + WorkerName string +} diff --git a/common/pkgs/ioswitch/exec/plan_builder.go b/common/pkgs/ioswitch/exec/plan_builder.go index 21eae7d..17b0d53 100644 --- a/common/pkgs/ioswitch/exec/plan_builder.go +++ b/common/pkgs/ioswitch/exec/plan_builder.go @@ -66,7 +66,7 @@ func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver { callback: future.NewSetValue[PlanResult](), ctx: ctx, cancel: cancel, - driverExec: NewExecutor(execPlan), + driverExec: NewExecutor(execPlan, Location{IsDriver: true}), } go exec.execute() diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index 691ca67..c90b350 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -2,6 +2,7 @@ package ioswitch2 import ( "context" + "fmt" "io" "strconv" @@ -52,7 +53,8 @@ type HttpHubWorkerClient struct { func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) { resp, err := c.cli.ExecuteIOPlan(hubapi.ExecuteIOPlanReq{ - Plan: plan, + Plan: plan, + WorkerName: fmt.Sprintf("%v", c.hubID), }) if err != nil { return exec.ExecutorResult{}, err diff --git a/common/pkgs/ioswitch2/hub_worker.go b/common/pkgs/ioswitch2/hub_worker.go index ee35b04..ab819d8 100644 --- a/common/pkgs/ioswitch2/hub_worker.go +++ b/common/pkgs/ioswitch2/hub_worker.go @@ -2,6 +2,7 @@ package ioswitch2 import ( "context" + "fmt" "io" "gitlink.org.cn/cloudream/common/pkgs/types" @@ -47,7 +48,7 @@ type HubWorkerClient struct { } func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) { - resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan}) + resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan, WorkerName: fmt.Sprintf("%v", c.hubID)}) if err != nil { return exec.ExecutorResult{}, err.ToError() } diff --git a/common/pkgs/ioswitch2/ops2/base_store.go b/common/pkgs/ioswitch2/ops2/base_store.go index 669bb0a..84ddbc0 100644 --- a/common/pkgs/ioswitch2/ops2/base_store.go +++ b/common/pkgs/ioswitch2/ops2/base_store.go @@ -3,6 +3,7 @@ package ops2 import ( "fmt" "io" + "time" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -15,10 +16,29 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) +const ( + BaseReadStatsStoreKey = "BaseReadSpeed" +) + func init() { exec.UseOp[*BaseWrite]() exec.UseOp[*BaseRead]() exec.UseOp[*BaseReadDyn]() + exec.UseVarValue[*BaseReadStatsValue]() +} + +type BaseReadStatsValue struct { + Size int64 + Time time.Duration + Location exec.Location +} + +func (v *BaseReadStatsValue) Clone() exec.VarValue { + return &BaseReadStatsValue{ + Size: v.Size, + Time: v.Time, + Location: v.Location, + } } type BaseRead struct { @@ -51,9 +71,23 @@ func (o *BaseRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("reading object %v: %w", o.Path, err) } + startTime := time.Now() + counter := io2.CounterCloser(stream, func(cnt int64, err error) { + if err != nil && err != io.EOF { + return + } + + // 要注意这个回调一定要在return之前调用 + e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{ + Size: cnt, + Time: time.Since(startTime), + Location: e.Location(), + }) + }) + fut := future.NewSetVoid() output := &exec.StreamValue{ - Stream: io2.AfterReadClosed(stream, func(closer io.ReadCloser) { + Stream: io2.AfterReadClosed(counter, func(closer io.ReadCloser) { fut.SetVoid() }), } @@ -102,9 +136,23 @@ func (o *BaseReadDyn) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("reading object %v: %w", o.FileInfo, err) } + startTime := time.Now() + counter := io2.CounterCloser(stream, func(cnt int64, err error) { + if err != nil && err != io.EOF { + return + } + + // 要注意这个回调一定要在return之前调用 + e.Store(BaseReadStatsStoreKey, &BaseReadStatsValue{ + Size: cnt, + Time: time.Since(startTime), + Location: e.Location(), + }) + }) + fut := future.NewSetVoid() output := &exec.StreamValue{ - Stream: io2.AfterReadClosed(stream, func(closer io.ReadCloser) { + Stream: io2.AfterReadClosed(counter, func(closer io.ReadCloser) { fut.SetVoid() }), } diff --git a/common/pkgs/rpc/hub/ioswitch.go b/common/pkgs/rpc/hub/ioswitch.go index b761acc..b8097cd 100644 --- a/common/pkgs/rpc/hub/ioswitch.go +++ b/common/pkgs/rpc/hub/ioswitch.go @@ -18,7 +18,8 @@ type IOSwitchSvc interface { // 执行IO计划 type ExecuteIOPlan struct { - Plan exec.Plan + Plan exec.Plan + WorkerName string } type ExecuteIOPlanResp struct { Result exec.ExecutorResult diff --git a/hub/internal/http/hub_io.go b/hub/internal/http/hub_io.go index 2b08ce2..b8bb4c6 100644 --- a/hub/internal/http/hub_io.go +++ b/hub/internal/http/hub_io.go @@ -156,20 +156,25 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { log.Infof("begin execute io plan") - sw := exec.NewExecutor(req.Plan) + sw := exec.NewExecutor(req.Plan, exec.Location{ + IsDriver: false, + WorkerName: req.WorkerName, + }) s.svc.swWorker.Add(sw) defer s.svc.swWorker.Remove(sw) execCtx := exec.NewWithContext(ctx.Request.Context()) exec.SetValueByType(execCtx, s.svc.stgPool) - _, err = sw.Run(execCtx) + ret, err := sw.Run(execCtx) if err != nil { ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("executing plan: %v", err))) return } - ctx.JSON(http.StatusOK, OK(nil)) + ctx.JSON(http.StatusOK, OK(hubapi.ExecuteIOPlanResp{ + Result: ret, + })) } func (s *IOService) SendVar(ctx *gin.Context) { diff --git a/hub/internal/rpc/ioswitch.go b/hub/internal/rpc/ioswitch.go index 83257d1..d6e2137 100644 --- a/hub/internal/rpc/ioswitch.go +++ b/hub/internal/rpc/ioswitch.go @@ -17,21 +17,26 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *hubrpc.ExecuteIOPlan) log := logger.WithField("PlanID", req.Plan.ID) log.Infof("begin execute io plan") - sw := exec.NewExecutor(req.Plan) + sw := exec.NewExecutor(req.Plan, exec.Location{ + IsDriver: false, + WorkerName: req.WorkerName, + }) s.swWorker.Add(sw) defer s.swWorker.Remove(sw) execCtx := exec.NewWithContext(ctx) exec.SetValueByType(execCtx, s.stgPool) - _, err := sw.Run(execCtx) + ret, err := sw.Run(execCtx) if err != nil { log.Warnf("running io plan: %v", err) return nil, rpc.Failed(errorcode.OperationFailed, "%v", err) } log.Infof("plan finished") - return &hubrpc.ExecuteIOPlanResp{}, nil + return &hubrpc.ExecuteIOPlanResp{ + Result: ret, + }, nil } func (s *Service) SendIOStream(ctx context.Context, req *hubrpc.SendIOStream) (*hubrpc.SendIOStreamResp, *rpc.CodeError) { diff --git a/hub/sdk/api/hub_io.go b/hub/sdk/api/hub_io.go index 200f8b4..8327aa3 100644 --- a/hub/sdk/api/hub_io.go +++ b/hub/sdk/api/hub_io.go @@ -127,7 +127,8 @@ func (c *Client) SendStream(req SendStreamReq) error { const ExecuteIOPlanPath = "/hubIO/executeIOPlan" type ExecuteIOPlanReq struct { - Plan exec.Plan `json:"plan"` + Plan exec.Plan `json:"plan"` + WorkerName string `json:"workerName"` } type ExecuteIOPlanResp struct { Result exec.ExecutorResult `json:"result"`