From 29cf25d14276fca14efbfb69a48dc0ffdabbf2bc Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 12 Sep 2024 09:39:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=B0=83=E8=AF=95=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/accessstat/access_stat.go | 4 +++ common/pkgs/db/object.go | 4 +++ common/pkgs/db/object_access_stat.go | 31 +++++++++++++++++-- common/pkgs/db/package.go | 4 +++ common/pkgs/db/package_access_stat.go | 23 ++++++++++++++ .../update_package_access_stat_amount.go | 2 +- 6 files changed, 65 insertions(+), 3 deletions(-) diff --git a/common/pkgs/accessstat/access_stat.go b/common/pkgs/accessstat/access_stat.go index c402dd5..ae2684e 100644 --- a/common/pkgs/accessstat/access_stat.go +++ b/common/pkgs/accessstat/access_stat.go @@ -57,6 +57,10 @@ func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] { p.stats = nil p.lock.Unlock() + if len(st) == 0 { + continue + } + err := coorCli.AddAccessStat(coormq.ReqAddAccessStat(st)) if err != nil { logger.Errorf("add all package access stat counter: %v", err) diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 2df97b6..19edacd 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -29,6 +29,10 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj } func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { + if len(objectIDs) == 0 { + return make(map[cdssdk.ObjectID]bool), nil + } + stmt, args, err := sqlx.In("select ObjectID from Object where ObjectID in (?)", lo.Uniq(objectIDs)) if err != nil { return nil, err diff --git a/common/pkgs/db/object_access_stat.go b/common/pkgs/db/object_access_stat.go index 46f9544..26f5922 100644 --- a/common/pkgs/db/object_access_stat.go +++ b/common/pkgs/db/object_access_stat.go @@ -27,15 +27,38 @@ func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) return ret, err } +func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) ([]model.ObjectAccessStat, error) { + if len(objIDs) == 0 { + return nil, nil + } + + var ret []model.ObjectAccessStat + stmt, args, err := sqlx.In("select * from ObjectAccessStat where ObjectID in (?)", objIDs) + if err != nil { + return ret, err + } + + err = sqlx.Select(ctx, &ret, stmt, args...) + return ret, err +} + func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error { + if len(entries) == 0 { + return nil + } + sql := "insert into ObjectAccessStat(ObjectID, NodeID, Counter, Amount) " + - "values(:ObjectID, :NodeID, :Counter, 0) as new" + - "on duplicate key update Counter=Counter+new.Counter" + " values(:ObjectID, :NodeID, :Counter, 0) as new" + + " on duplicate key update ObjectAccessStat.Counter=ObjectAccessStat.Counter+new.Counter" err := BatchNamedExec(ctx, sql, 4, entries, nil) return err } func (*ObjectAccessStatDB) BatchUpdateAmountInPackage(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error { + if len(pkgIDs) == 0 { + return nil + } + stmt, args, err := sqlx.In("update ObjectAccessStat inner join Object"+ " on ObjectAccessStat.ObjectID = Object.ObjectID"+ " set Amount=Amount*?+Counter*(1-?), Counter = 0"+ @@ -64,6 +87,10 @@ func (*ObjectAccessStatDB) DeleteByObjectID(ctx SQLContext, objID cdssdk.ObjectI } func (*ObjectAccessStatDB) BatchDeleteByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) error { + if len(objIDs) == 0 { + return nil + } + stmt, args, err := sqlx.In("delete from ObjectAccessStat where ObjectID in (?)", objIDs) if err != nil { return err diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index d88d0d6..eaa7dae 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -33,6 +33,10 @@ func (db *PackageDB) GetByName(ctx SQLContext, bucketID cdssdk.BucketID, name st } func (db *PackageDB) BatchTestPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) (map[cdssdk.PackageID]bool, error) { + if len(pkgIDs) == 0 { + return make(map[cdssdk.PackageID]bool), nil + } + stmt, args, err := sqlx.In("select PackageID from Package where PackageID in (?)", lo.Uniq(pkgIDs)) if err != nil { return nil, err diff --git a/common/pkgs/db/package_access_stat.go b/common/pkgs/db/package_access_stat.go index 1e34b6c..aabed44 100644 --- a/common/pkgs/db/package_access_stat.go +++ b/common/pkgs/db/package_access_stat.go @@ -27,7 +27,26 @@ func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageI return ret, err } +func (*PackageAccessStatDB) BatchGetByPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) ([]model.PackageAccessStat, error) { + if len(pkgIDs) == 0 { + return nil, nil + } + + var ret []model.PackageAccessStat + stmt, args, err := sqlx.In("select * from PackageAccessStat where PackageID in (?)", pkgIDs) + if err != nil { + return nil, err + } + + err = sqlx.Select(ctx, &ret, stmt, args...) + return ret, err +} + func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error { + if len(entries) == 0 { + return nil + } + sql := "insert into PackageAccessStat(PackageID, NodeID, Counter, Amount)" + " values(:PackageID, :NodeID, :Counter, 0) as new" + " on duplicate key update Counter=Counter+new.Counter" @@ -36,6 +55,10 @@ func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.Add } func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error { + if len(pkgIDs) == 0 { + return nil + } + stmt, args, err := sqlx.In("update PackageAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0 where PackageID in (?)", historyWeight, historyWeight, pkgIDs) if err != nil { return err diff --git a/scanner/internal/event/update_package_access_stat_amount.go b/scanner/internal/event/update_package_access_stat_amount.go index 072e9c6..b3f2729 100644 --- a/scanner/internal/event/update_package_access_stat_amount.go +++ b/scanner/internal/event/update_package_access_stat_amount.go @@ -40,7 +40,7 @@ func (t *UpdatePackageAccessStatAmount) TryMerge(other Event) bool { } func (t *UpdatePackageAccessStatAmount) Execute(execCtx ExecuteContext) { - log := logger.WithType[AgentCacheGC]("Event") + log := logger.WithType[UpdatePackageAccessStatAmount]("Event") startTime := time.Now() log.Debugf("begin with %v", logger.FormatStruct(t.UpdatePackageAccessStatAmount)) defer func() {