diff --git a/client/internal/accessstat/access_stat.go b/client/internal/accessstat/access_stat.go index 5ce1b6b..782c2cd 100644 --- a/client/internal/accessstat/access_stat.go +++ b/client/internal/accessstat/access_stat.go @@ -24,16 +24,23 @@ type ExitEvent struct { type AccessStat struct { cfg Config done chan any - stats []db.AddAccessStatEntry + stats map[entryKey]float64 lock sync.Mutex db *db.DB } +type entryKey struct { + objID clitypes.ObjectID + pkgID clitypes.PackageID + spaceID clitypes.UserSpaceID +} + func NewAccessStat(cfg Config, db *db.DB) *AccessStat { return &AccessStat{ - cfg: cfg, - done: make(chan any), - db: db, + cfg: cfg, + done: make(chan any), + stats: make(map[entryKey]float64), + db: db, } } @@ -41,12 +48,12 @@ func (p *AccessStat) AddAccessCounter(objID clitypes.ObjectID, pkgID clitypes.Pa p.lock.Lock() defer p.lock.Unlock() - p.stats = append(p.stats, db.AddAccessStatEntry{ - ObjectID: objID, - PackageID: pkgID, - UserSpaceID: spaceID, - Counter: value, - }) + key := entryKey{ + objID: objID, + pkgID: pkgID, + spaceID: spaceID, + } + p.stats[key] += value } func (p *AccessStat) Start() *AccessStatEventChan { @@ -73,12 +80,24 @@ func (p *AccessStat) Start() *AccessStatEventChan { continue } - err := db.DoTx10(p.db, p.db.Package().BatchAddPackageAccessStat, st) + var entries []db.AddAccessStatEntry + for k, v := range st { + entries = append(entries, db.AddAccessStatEntry{ + ObjectID: k.objID, + PackageID: k.pkgID, + UserSpaceID: k.spaceID, + Counter: v, + }) + } + err := db.DoTx10(p.db, p.db.Package().BatchAddPackageAccessStat, entries) + if err != nil { logger.Errorf("add all package access stat counter: %v", err) p.lock.Lock() - p.stats = append(p.stats, st...) + for k, v := range st { + p.stats[k] += v + } p.lock.Unlock() continue } diff --git a/client/internal/db/package.go b/client/internal/db/package.go index 8181e05..25864af 100644 --- a/client/internal/db/package.go +++ b/client/internal/db/package.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/samber/lo" "gitlink.org.cn/cloudream/jcs-pub/client/types" "gorm.io/gorm" ) @@ -242,19 +243,19 @@ type AddAccessStatEntry struct { } func (db *PackageDB) BatchAddPackageAccessStat(ctx SQLContext, entries []AddAccessStatEntry) error { - pkgIDs := make([]types.PackageID, len(entries)) - objIDs := make([]types.ObjectID, len(entries)) - for i, e := range entries { - pkgIDs[i] = e.PackageID - objIDs[i] = e.ObjectID + pkgIDs := make(map[types.PackageID]bool) + objIDs := make(map[types.ObjectID]bool) + for _, e := range entries { + pkgIDs[e.PackageID] = true + objIDs[e.ObjectID] = true } - avaiPkgIDs, err := db.Package().BatchTestPackageID(ctx, pkgIDs) + avaiPkgIDs, err := db.Package().BatchTestPackageID(ctx, lo.Keys(pkgIDs)) if err != nil { return fmt.Errorf("batch test package id: %w", err) } - avaiObjIDs, err := db.Object().BatchTestObjectID(ctx, objIDs) + avaiObjIDs, err := db.Object().BatchTestObjectID(ctx, lo.Keys(objIDs)) if err != nil { return fmt.Errorf("batch test object id: %w", err) } diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index 2e47888..c57e824 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -77,6 +77,11 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { return } } + + // 如果执行超过两个小时,则停止 + if time.Since(startTime) > time.Hour*2 { + break + } } } diff --git a/client/internal/ticktock/redundancy_recover.go b/client/internal/ticktock/redundancy_recover.go index 426a840..14348ec 100644 --- a/client/internal/ticktock/redundancy_recover.go +++ b/client/internal/ticktock/redundancy_recover.go @@ -37,18 +37,39 @@ func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj cl return &clitypes.DefaultECRedundancy, newStgs } - return &clitypes.DefaultRepRedundancy, t.rechooseUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy) + newSpaces := t.rechooseUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy) + for _, s := range newSpaces { + if !obj.ContainsBlock(0, s.UserSpace.UserSpace.UserSpaceID) && !obj.ContainsPinned(s.UserSpace.UserSpace.UserSpaceID) { + return &clitypes.DefaultRepRedundancy, newSpaces + } + } + + return nil, nil case *clitypes.ECRedundancy: if obj.Object.Size < ctx.ticktock.cfg.ECFileSizeThreshold { return &clitypes.DefaultRepRedundancy, t.chooseNewUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy) } - return &clitypes.DefaultECRedundancy, t.rechooseUserSpacesForEC(ctx, obj, &clitypes.DefaultECRedundancy) + newSpaces := t.rechooseUserSpacesForEC(ctx, obj, &clitypes.DefaultECRedundancy) + for i, s := range newSpaces { + if !obj.ContainsBlock(i, s.UserSpace.UserSpace.UserSpaceID) { + return &clitypes.DefaultECRedundancy, newSpaces + } + } + + return nil, nil case *clitypes.LRCRedundancy: newLRCStgs := t.rechooseUserSpacesForLRC(ctx, obj, &clitypes.DefaultLRCRedundancy) - return &clitypes.DefaultLRCRedundancy, newLRCStgs + + for i, s := range newLRCStgs { + if !obj.ContainsBlock(i, s.UserSpace.UserSpace.UserSpaceID) { + return &clitypes.DefaultLRCRedundancy, newLRCStgs + } + } + + return nil, nil } return nil, nil } diff --git a/client/types/types.go b/client/types/types.go index 9e33d5a..deca657 100644 --- a/client/types/types.go +++ b/client/types/types.go @@ -219,6 +219,24 @@ func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock { return sort2.Sort(lo.Values(grps), func(l, r GrouppedObjectBlock) int { return l.Index - r.Index }) } +func (o *ObjectDetail) ContainsBlock(idx int, userSpaceID UserSpaceID) bool { + for _, block := range o.Blocks { + if block.Index == idx && block.UserSpaceID == userSpaceID { + return true + } + } + return false +} + +func (o *ObjectDetail) ContainsPinned(userSpaceID UserSpaceID) bool { + for _, spaceID := range o.PinnedAt { + if spaceID == userSpaceID { + return true + } + } + return false +} + type UserSpaceDetail struct { UserID cortypes.UserID UserSpace UserSpace