From 5c91ff1ac3558a5893795e1f35d00446326f5319 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 21 Apr 2025 10:36:16 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E6=A4=8D=E6=9B=B4=E6=96=B0=E8=AE=BF?= =?UTF-8?q?=E9=97=AE=E9=87=8F=E7=9A=84=E5=AE=9A=E6=97=B6=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/ticktock/config.go | 3 +- client/internal/ticktock/ticktock.go | 7 +- .../update_package_access_stat_amount.go | 37 +++++++++ common/assets/confs/client.config.json | 3 +- .../update_package_access_stat_amount.go | 80 ------------------- .../batch_all_agent_check_shardstore.go | 42 ---------- scanner/internal/tickevent/storage_gc.go | 45 ----------- .../update_all_package_access_stat_amount.go | 35 -------- 8 files changed, 47 insertions(+), 205 deletions(-) create mode 100644 client/internal/ticktock/update_package_access_stat_amount.go delete mode 100644 scanner/internal/event/update_package_access_stat_amount.go delete mode 100644 scanner/internal/tickevent/batch_all_agent_check_shardstore.go delete mode 100644 scanner/internal/tickevent/storage_gc.go delete mode 100644 scanner/internal/tickevent/update_all_package_access_stat_amount.go diff --git a/client/internal/ticktock/config.go b/client/internal/ticktock/config.go index 6093501..6abb0ed 100644 --- a/client/internal/ticktock/config.go +++ b/client/internal/ticktock/config.go @@ -1,5 +1,6 @@ package ticktock type Config struct { - ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` + ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` + AccessStatHistoryWeight float64 `json:"accessStatHistoryWeight"` } diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index aa49ba8..46db68d 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -77,10 +77,14 @@ func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) { } func (t *TickTock) initJobs() { - t.addJob(&ChangeRedundancy{}, gocron.DailyJob(1, gocron.NewAtTimes( + t.addJob(&UpdatePackageAccessStatAmount{}, gocron.DailyJob(1, gocron.NewAtTimes( gocron.NewAtTime(0, 0, 0), ))) + t.addJob(&ChangeRedundancy{}, gocron.DailyJob(1, gocron.NewAtTimes( + gocron.NewAtTime(0, 5, 0), + ))) + t.addJob(&CheckShardStore{}, gocron.DailyJob(1, gocron.NewAtTimes( gocron.NewAtTime(1, 0, 0), ))) @@ -88,4 +92,5 @@ func (t *TickTock) initJobs() { t.addJob(&ShardStoreGC{}, gocron.DailyJob(1, gocron.NewAtTimes( gocron.NewAtTime(2, 0, 0), ))) + } diff --git a/client/internal/ticktock/update_package_access_stat_amount.go b/client/internal/ticktock/update_package_access_stat_amount.go new file mode 100644 index 0000000..3e8de65 --- /dev/null +++ b/client/internal/ticktock/update_package_access_stat_amount.go @@ -0,0 +1,37 @@ +package ticktock + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/reflect2" + scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" +) + +type UpdatePackageAccessStatAmount struct { + *scevt.UpdatePackageAccessStatAmount +} + +func (j *UpdatePackageAccessStatAmount) Name() string { + return reflect2.TypeNameOf[UpdatePackageAccessStatAmount]() +} + +func (j *UpdatePackageAccessStatAmount) Execute(t *TickTock) { + log := logger.WithType[UpdatePackageAccessStatAmount]("TickTock") + startTime := time.Now() + log.Debugf("job start") + defer func() { + log.Debugf("job end, time: %v", time.Since(startTime)) + }() + + err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight) + if err != nil { + log.Warnf("update all package access stat amount: %v", err) + return + } + err = t.db.ObjectAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight) + if err != nil { + log.Warnf("update all object access stat amount: %v", err) + return + } +} diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 3bc8f5d..7bb8027 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -47,7 +47,8 @@ "highLatencyHub": 35 }, "tickTock": { - "ecFileSizeThreshold": 5242880 + "ecFileSizeThreshold": 5242880, + "accessStatHistoryWeight": 0.8 }, "http": { "enabled": true, diff --git a/scanner/internal/event/update_package_access_stat_amount.go b/scanner/internal/event/update_package_access_stat_amount.go deleted file mode 100644 index 667a48f..0000000 --- a/scanner/internal/event/update_package_access_stat_amount.go +++ /dev/null @@ -1,80 +0,0 @@ -package event - -import ( - "time" - - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/logger" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/config" -) - -type UpdatePackageAccessStatAmount struct { - *scevt.UpdatePackageAccessStatAmount -} - -func NewUpdatePackageAccessStatAmount(evt *scevt.UpdatePackageAccessStatAmount) *UpdatePackageAccessStatAmount { - return &UpdatePackageAccessStatAmount{ - UpdatePackageAccessStatAmount: evt, - } -} - -func (t *UpdatePackageAccessStatAmount) TryMerge(other Event) bool { - event, ok := other.(*UpdatePackageAccessStatAmount) - if !ok { - return false - } - - if t.PackageIDs == nil { - return true - } - - if event.PackageIDs == nil { - t.PackageIDs = nil - return true - } - - t.PackageIDs = append(t.PackageIDs, event.PackageIDs...) - t.PackageIDs = lo.Uniq(t.PackageIDs) - return true -} - -func (t *UpdatePackageAccessStatAmount) Execute(execCtx ExecuteContext) { - log := logger.WithType[UpdatePackageAccessStatAmount]("Event") - startTime := time.Now() - log.Debugf("begin with %v", logger.FormatStruct(t.UpdatePackageAccessStatAmount)) - defer func() { - log.Debugf("end, time: %v", time.Since(startTime)) - }() - - if t.PackageIDs == nil { - err := execCtx.Args.DB.PackageAccessStat().UpdateAllAmount(execCtx.Args.DB.DefCtx(), config.Cfg().AccessStatHistoryAmount) - if err != nil { - log.Warnf("update all package access stat amount: %v", err) - return - } - - err = execCtx.Args.DB.ObjectAccessStat().UpdateAllAmount(execCtx.Args.DB.DefCtx(), config.Cfg().AccessStatHistoryAmount) - if err != nil { - log.Warnf("update all object access stat amount: %v", err) - return - } - - } else { - err := execCtx.Args.DB.PackageAccessStat().BatchUpdateAmount(execCtx.Args.DB.DefCtx(), t.PackageIDs, config.Cfg().AccessStatHistoryAmount) - if err != nil { - log.Warnf("batch update package access stat amount: %v", err) - return - } - - err = execCtx.Args.DB.ObjectAccessStat().BatchUpdateAmountInPackage(execCtx.Args.DB.DefCtx(), t.PackageIDs, config.Cfg().AccessStatHistoryAmount) - if err != nil { - log.Warnf("batch update object access stat amount in package: %v", err) - return - } - } -} - -func init() { - RegisterMessageConvertor(NewUpdatePackageAccessStatAmount) -} diff --git a/scanner/internal/tickevent/batch_all_agent_check_shardstore.go b/scanner/internal/tickevent/batch_all_agent_check_shardstore.go deleted file mode 100644 index 3c43f7f..0000000 --- a/scanner/internal/tickevent/batch_all_agent_check_shardstore.go +++ /dev/null @@ -1,42 +0,0 @@ -package tickevent - -import ( - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -const HUB_CHECK_CACHE_BATCH_SIZE = 2 - -type BatchAllHubCheckShardStore struct { - stgIDs []cdssdk.StorageID -} - -func NewBatchAllHubCheckShardStore() *BatchAllHubCheckShardStore { - return &BatchAllHubCheckShardStore{} -} - -func (e *BatchAllHubCheckShardStore) Execute(ctx ExecuteContext) { - log := logger.WithType[BatchAllHubCheckShardStore]("TickEvent") - log.Debugf("begin") - defer log.Debugf("end") - - if len(e.stgIDs) == 0 { - ids, err := ctx.Args.DB.Storage().GetAllIDs(ctx.Args.DB.DefCtx()) - if err != nil { - log.Warnf("get all storages failed, err: %s", err.Error()) - return - } - - log.Debugf("new check start, get all storages") - e.stgIDs = ids - } - - checkedCnt := 0 - for ; checkedCnt < len(e.stgIDs) && checkedCnt < HUB_CHECK_CACHE_BATCH_SIZE; checkedCnt++ { - // nil代表进行全量检查 - ctx.Args.EventExecutor.Post(event.NewHubCheckShardStore(scevt.NewHubCheckShardStore(e.stgIDs[checkedCnt]))) - } - e.stgIDs = e.stgIDs[checkedCnt:] -} diff --git a/scanner/internal/tickevent/storage_gc.go b/scanner/internal/tickevent/storage_gc.go deleted file mode 100644 index 7d9031f..0000000 --- a/scanner/internal/tickevent/storage_gc.go +++ /dev/null @@ -1,45 +0,0 @@ -package tickevent - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -type StorageGC struct { - storageIDs []cdssdk.StorageID -} - -func NewStorageGC() *StorageGC { - return &StorageGC{} -} - -func (e *StorageGC) Execute(ctx ExecuteContext) { - log := logger.WithType[StorageGC]("TickEvent") - log.Debugf("begin") - defer log.Debugf("end") - - if len(e.storageIDs) == 0 { - // 0点开始检查 - if time.Now().Hour() > 0 { - return - } - - stgIDs, err := ctx.Args.DB.Storage().GetAllIDs(ctx.Args.DB.DefCtx()) - if err != nil { - log.Warnf("get all storage ids: %v", err) - return - } - e.storageIDs = stgIDs - } - - if len(e.storageIDs) == 0 { - return - } - - ctx.Args.EventExecutor.Post(event.NewHubShardStoreGC(scevt.NewHubShardStoreGC(e.storageIDs[0]))) - e.storageIDs = e.storageIDs[1:] -} diff --git a/scanner/internal/tickevent/update_all_package_access_stat_amount.go b/scanner/internal/tickevent/update_all_package_access_stat_amount.go deleted file mode 100644 index 9221112..0000000 --- a/scanner/internal/tickevent/update_all_package_access_stat_amount.go +++ /dev/null @@ -1,35 +0,0 @@ -package tickevent - -import ( - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - evt "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -type UpdateAllPackageAccessStatAmount struct { - todayUpdated bool -} - -func NewUpdateAllPackageAccessStatAmount() *UpdateAllPackageAccessStatAmount { - return &UpdateAllPackageAccessStatAmount{} -} - -func (e *UpdateAllPackageAccessStatAmount) Execute(ctx ExecuteContext) { - log := logger.WithType[UpdateAllPackageAccessStatAmount]("TickEvent") - log.Debugf("begin") - defer log.Debugf("end") - - nowHour := time.Now().Hour() - if nowHour != 0 { - e.todayUpdated = false - return - } - if e.todayUpdated { - return - } - e.todayUpdated = true - - ctx.Args.EventExecutor.Post(evt.NewUpdatePackageAccessStatAmount(event.NewUpdatePackageAccessStatAmount(nil))) -}