Browse Source

移植更新访问量的定时事件

feature_gxh
Sydonian 7 months ago
parent
commit
5c91ff1ac3
8 changed files with 47 additions and 205 deletions
  1. +2
    -1
      client/internal/ticktock/config.go
  2. +6
    -1
      client/internal/ticktock/ticktock.go
  3. +37
    -0
      client/internal/ticktock/update_package_access_stat_amount.go
  4. +2
    -1
      common/assets/confs/client.config.json
  5. +0
    -80
      scanner/internal/event/update_package_access_stat_amount.go
  6. +0
    -42
      scanner/internal/tickevent/batch_all_agent_check_shardstore.go
  7. +0
    -45
      scanner/internal/tickevent/storage_gc.go
  8. +0
    -35
      scanner/internal/tickevent/update_all_package_access_stat_amount.go

+ 2
- 1
client/internal/ticktock/config.go View File

@@ -1,5 +1,6 @@
package ticktock

type Config struct {
ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"`
ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"`
AccessStatHistoryWeight float64 `json:"accessStatHistoryWeight"`
}

+ 6
- 1
client/internal/ticktock/ticktock.go View File

@@ -77,10 +77,14 @@ func (t *TickTock) addJob(job Job, duration gocron.JobDefinition) {
}

func (t *TickTock) initJobs() {
t.addJob(&ChangeRedundancy{}, gocron.DailyJob(1, gocron.NewAtTimes(
t.addJob(&UpdatePackageAccessStatAmount{}, gocron.DailyJob(1, gocron.NewAtTimes(
gocron.NewAtTime(0, 0, 0),
)))

t.addJob(&ChangeRedundancy{}, gocron.DailyJob(1, gocron.NewAtTimes(
gocron.NewAtTime(0, 5, 0),
)))

t.addJob(&CheckShardStore{}, gocron.DailyJob(1, gocron.NewAtTimes(
gocron.NewAtTime(1, 0, 0),
)))
@@ -88,4 +92,5 @@ func (t *TickTock) initJobs() {
t.addJob(&ShardStoreGC{}, gocron.DailyJob(1, gocron.NewAtTimes(
gocron.NewAtTime(2, 0, 0),
)))

}

+ 37
- 0
client/internal/ticktock/update_package_access_stat_amount.go View File

@@ -0,0 +1,37 @@
package ticktock

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/reflect2"
scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event"
)

type UpdatePackageAccessStatAmount struct {
*scevt.UpdatePackageAccessStatAmount
}

func (j *UpdatePackageAccessStatAmount) Name() string {
return reflect2.TypeNameOf[UpdatePackageAccessStatAmount]()
}

func (j *UpdatePackageAccessStatAmount) Execute(t *TickTock) {
log := logger.WithType[UpdatePackageAccessStatAmount]("TickTock")
startTime := time.Now()
log.Debugf("job start")
defer func() {
log.Debugf("job end, time: %v", time.Since(startTime))
}()

err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight)
if err != nil {
log.Warnf("update all package access stat amount: %v", err)
return
}
err = t.db.ObjectAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight)
if err != nil {
log.Warnf("update all object access stat amount: %v", err)
return
}
}

+ 2
- 1
common/assets/confs/client.config.json View File

@@ -47,7 +47,8 @@
"highLatencyHub": 35
},
"tickTock": {
"ecFileSizeThreshold": 5242880
"ecFileSizeThreshold": 5242880,
"accessStatHistoryWeight": 0.8
},
"http": {
"enabled": true,


+ 0
- 80
scanner/internal/event/update_package_access_stat_amount.go View File

@@ -1,80 +0,0 @@
package event

import (
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/logger"
scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/jcs-pub/scanner/internal/config"
)

type UpdatePackageAccessStatAmount struct {
*scevt.UpdatePackageAccessStatAmount
}

func NewUpdatePackageAccessStatAmount(evt *scevt.UpdatePackageAccessStatAmount) *UpdatePackageAccessStatAmount {
return &UpdatePackageAccessStatAmount{
UpdatePackageAccessStatAmount: evt,
}
}

func (t *UpdatePackageAccessStatAmount) TryMerge(other Event) bool {
event, ok := other.(*UpdatePackageAccessStatAmount)
if !ok {
return false
}

if t.PackageIDs == nil {
return true
}

if event.PackageIDs == nil {
t.PackageIDs = nil
return true
}

t.PackageIDs = append(t.PackageIDs, event.PackageIDs...)
t.PackageIDs = lo.Uniq(t.PackageIDs)
return true
}

func (t *UpdatePackageAccessStatAmount) Execute(execCtx ExecuteContext) {
log := logger.WithType[UpdatePackageAccessStatAmount]("Event")
startTime := time.Now()
log.Debugf("begin with %v", logger.FormatStruct(t.UpdatePackageAccessStatAmount))
defer func() {
log.Debugf("end, time: %v", time.Since(startTime))
}()

if t.PackageIDs == nil {
err := execCtx.Args.DB.PackageAccessStat().UpdateAllAmount(execCtx.Args.DB.DefCtx(), config.Cfg().AccessStatHistoryAmount)
if err != nil {
log.Warnf("update all package access stat amount: %v", err)
return
}

err = execCtx.Args.DB.ObjectAccessStat().UpdateAllAmount(execCtx.Args.DB.DefCtx(), config.Cfg().AccessStatHistoryAmount)
if err != nil {
log.Warnf("update all object access stat amount: %v", err)
return
}

} else {
err := execCtx.Args.DB.PackageAccessStat().BatchUpdateAmount(execCtx.Args.DB.DefCtx(), t.PackageIDs, config.Cfg().AccessStatHistoryAmount)
if err != nil {
log.Warnf("batch update package access stat amount: %v", err)
return
}

err = execCtx.Args.DB.ObjectAccessStat().BatchUpdateAmountInPackage(execCtx.Args.DB.DefCtx(), t.PackageIDs, config.Cfg().AccessStatHistoryAmount)
if err != nil {
log.Warnf("batch update object access stat amount in package: %v", err)
return
}
}
}

func init() {
RegisterMessageConvertor(NewUpdatePackageAccessStatAmount)
}

+ 0
- 42
scanner/internal/tickevent/batch_all_agent_check_shardstore.go View File

@@ -1,42 +0,0 @@
package tickevent

import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event"
)

const HUB_CHECK_CACHE_BATCH_SIZE = 2

type BatchAllHubCheckShardStore struct {
stgIDs []cdssdk.StorageID
}

func NewBatchAllHubCheckShardStore() *BatchAllHubCheckShardStore {
return &BatchAllHubCheckShardStore{}
}

func (e *BatchAllHubCheckShardStore) Execute(ctx ExecuteContext) {
log := logger.WithType[BatchAllHubCheckShardStore]("TickEvent")
log.Debugf("begin")
defer log.Debugf("end")

if len(e.stgIDs) == 0 {
ids, err := ctx.Args.DB.Storage().GetAllIDs(ctx.Args.DB.DefCtx())
if err != nil {
log.Warnf("get all storages failed, err: %s", err.Error())
return
}

log.Debugf("new check start, get all storages")
e.stgIDs = ids
}

checkedCnt := 0
for ; checkedCnt < len(e.stgIDs) && checkedCnt < HUB_CHECK_CACHE_BATCH_SIZE; checkedCnt++ {
// nil代表进行全量检查
ctx.Args.EventExecutor.Post(event.NewHubCheckShardStore(scevt.NewHubCheckShardStore(e.stgIDs[checkedCnt])))
}
e.stgIDs = e.stgIDs[checkedCnt:]
}

+ 0
- 45
scanner/internal/tickevent/storage_gc.go View File

@@ -1,45 +0,0 @@
package tickevent

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event"
)

type StorageGC struct {
storageIDs []cdssdk.StorageID
}

func NewStorageGC() *StorageGC {
return &StorageGC{}
}

func (e *StorageGC) Execute(ctx ExecuteContext) {
log := logger.WithType[StorageGC]("TickEvent")
log.Debugf("begin")
defer log.Debugf("end")

if len(e.storageIDs) == 0 {
// 0点开始检查
if time.Now().Hour() > 0 {
return
}

stgIDs, err := ctx.Args.DB.Storage().GetAllIDs(ctx.Args.DB.DefCtx())
if err != nil {
log.Warnf("get all storage ids: %v", err)
return
}
e.storageIDs = stgIDs
}

if len(e.storageIDs) == 0 {
return
}

ctx.Args.EventExecutor.Post(event.NewHubShardStoreGC(scevt.NewHubShardStoreGC(e.storageIDs[0])))
e.storageIDs = e.storageIDs[1:]
}

+ 0
- 35
scanner/internal/tickevent/update_all_package_access_stat_amount.go View File

@@ -1,35 +0,0 @@
package tickevent

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event"
evt "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event"
)

type UpdateAllPackageAccessStatAmount struct {
todayUpdated bool
}

func NewUpdateAllPackageAccessStatAmount() *UpdateAllPackageAccessStatAmount {
return &UpdateAllPackageAccessStatAmount{}
}

func (e *UpdateAllPackageAccessStatAmount) Execute(ctx ExecuteContext) {
log := logger.WithType[UpdateAllPackageAccessStatAmount]("TickEvent")
log.Debugf("begin")
defer log.Debugf("end")

nowHour := time.Now().Hour()
if nowHour != 0 {
e.todayUpdated = false
return
}
if e.todayUpdated {
return
}
e.todayUpdated = true

ctx.Args.EventExecutor.Post(evt.NewUpdatePackageAccessStatAmount(event.NewUpdatePackageAccessStatAmount(nil)))
}

Loading…
Cancel
Save