From e2fa2a507ccccfec6f8b92e56a15dea07d2b0bde Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 11 Sep 2024 15:49:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Object=E7=9A=84=E8=AE=BF?= =?UTF-8?q?=E9=97=AE=E9=87=8F=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/cache_move_package.go | 2 +- agent/internal/task/storage_load_package.go | 2 +- agent/internal/task/task.go | 8 +- agent/main.go | 18 ++-- client/internal/cmdline/getp.go | 2 +- client/internal/http/object.go | 2 +- client/internal/services/service.go | 20 ++-- client/main.go | 18 ++-- common/assets/scripts/create_database.sql | 10 +- common/pkgs/accessstat/access_stat.go | 72 ++++++++++++++ .../{package_stat => accessstat}/config.go | 2 +- common/pkgs/db/model/model.go | 7 ++ common/pkgs/db/object.go | 20 ++++ common/pkgs/db/object_access_stat.go | 76 ++++++++++++++ common/pkgs/db/package.go | 26 +++++ common/pkgs/db/package_access_stat.go | 8 +- common/pkgs/mq/coordinator/object.go | 27 +++++ common/pkgs/mq/coordinator/package.go | 33 ------- common/pkgs/package_stat/package_stat.go | 98 ------------------- coordinator/internal/mq/object.go | 5 + coordinator/internal/mq/package.go | 49 ++++++++-- 21 files changed, 325 insertions(+), 180 deletions(-) create mode 100644 common/pkgs/accessstat/access_stat.go rename common/pkgs/{package_stat => accessstat}/config.go (77%) create mode 100644 common/pkgs/db/object_access_stat.go delete mode 100644 common/pkgs/package_stat/package_stat.go diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 3708499..4be9bf3 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -77,7 +77,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { return fmt.Errorf("creating ipfs file: %w", err) } - ctx.packageStat.AddAccessCounter(t.packageID, *stgglb.Local.NodeID, 1) + ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, *stgglb.Local.NodeID, 1) } _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, *stgglb.Local.NodeID)) diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index eda7bd3..be68aa6 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -99,7 +99,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e if err != nil { return err } - ctx.packageStat.AddAccessCounter(t.packageID, *stgglb.Local.NodeID, 1) + ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, *stgglb.Local.NodeID, 1) } _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks)) diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index b174495..31c27f8 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -3,16 +3,16 @@ package task import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" - packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" ) type TaskContext struct { distlock *distlock.Service connectivity *connectivity.Collector downloader *downloader.Downloader - packageStat *packagestat.PackageStat + accessStat *accessstat.AccessStat } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -27,11 +27,11 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, packageStat *packagestat.PackageStat) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat) Manager { return task.NewManager(TaskContext{ distlock: distlock, connectivity: connectivity, downloader: downloader, - packageStat: packageStat, + accessStat: accessStat, }) } diff --git a/agent/main.go b/agent/main.go index 314f781..17d3ac7 100644 --- a/agent/main.go +++ b/agent/main.go @@ -12,11 +12,11 @@ import ( "gitlink.org.cn/cloudream/storage/agent/internal/config" "gitlink.org.cn/cloudream/storage/agent/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" - packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" "google.golang.org/grpc" @@ -86,11 +86,11 @@ func main() { }) conCol.CollectInPlace() - pkgStat := packagestat.NewPackageStat(packagestat.Config{ + acStat := accessstat.NewAccessStat(accessstat.Config{ // TODO 考虑放到配置里 ReportInterval: time.Second * 10, }) - go servePackageStat(pkgStat) + go serveAccessStat(acStat) distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { @@ -101,7 +101,7 @@ func main() { dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) - taskMgr := task.NewManager(distlock, &conCol, &dlder, pkgStat) + taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 @@ -175,25 +175,25 @@ func serveDistLock(svc *distlock.Service) { os.Exit(1) } -func servePackageStat(svc *packagestat.PackageStat) { - logger.Info("start serving package stat") +func serveAccessStat(svc *accessstat.AccessStat) { + logger.Info("start serving access stat") ch := svc.Start() loop: for { val, err := ch.Receive() if err != nil { - logger.Errorf("package stat stopped with error: %v", err) + logger.Errorf("access stat stopped with error: %v", err) break } switch val := val.(type) { case error: - logger.Errorf("package stat stopped with error: %v", val) + logger.Errorf("access stat stopped with error: %v", val) break loop } } - logger.Info("package stat stopped") + logger.Info("access stat stopped") // TODO 仅简单结束了程序 os.Exit(1) diff --git a/client/internal/cmdline/getp.go b/client/internal/cmdline/getp.go index f053bf1..af14784 100644 --- a/client/internal/cmdline/getp.go +++ b/client/internal/cmdline/getp.go @@ -119,7 +119,7 @@ func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) { } if stgglb.Local.NodeID != nil { - cmdCtx.Cmdline.Svc.PackageStat.AddAccessCounter(id, *stgglb.Local.NodeID, 1) + cmdCtx.Cmdline.Svc.AccessStat.AddAccessCounter(objInfo.Object.ObjectID, id, *stgglb.Local.NodeID, 1) } return nil }() diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 0c402f1..06c986a 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -131,7 +131,7 @@ func (s *ObjectService) Download(ctx *gin.Context) { // TODO 当client不在某个代理节点上时如何处理? if stgglb.Local.NodeID != nil { - s.svc.PackageStat.AddAccessCounter(file.Object.PackageID, *stgglb.Local.NodeID, float64(sendSize)/float64(file.Object.Size)) + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, *stgglb.Local.NodeID, float64(sendSize)/float64(file.Object.Size)) } if err != nil { diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 7e13bab..1bcd284 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -3,22 +3,22 @@ package services import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/client/internal/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" - packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" ) type Service struct { - DistLock *distlock.Service - TaskMgr *task.Manager - Downloader *downloader.Downloader - PackageStat *packagestat.PackageStat + DistLock *distlock.Service + TaskMgr *task.Manager + Downloader *downloader.Downloader + AccessStat *accessstat.AccessStat } -func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader, pkgStat *packagestat.PackageStat) (*Service, error) { +func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader, accStat *accessstat.AccessStat) (*Service, error) { return &Service{ - DistLock: distlock, - TaskMgr: taskMgr, - Downloader: downloader, - PackageStat: pkgStat, + DistLock: distlock, + TaskMgr: taskMgr, + Downloader: downloader, + AccessStat: accStat, }, nil } diff --git a/client/main.go b/client/main.go index f1b9898..7b90e81 100644 --- a/client/main.go +++ b/client/main.go @@ -14,11 +14,11 @@ import ( "gitlink.org.cn/cloudream/storage/client/internal/services" "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" ) func main() { @@ -84,17 +84,17 @@ func main() { } go serveDistLock(distlockSvc) - pkgStat := packagestat.NewPackageStat(packagestat.Config{ + acStat := accessstat.NewAccessStat(accessstat.Config{ // TODO 考虑放到配置里 ReportInterval: time.Second * 10, }) - go servePackageStat(pkgStat) + go serveAccessStat(acStat) taskMgr := task.NewManager(distlockSvc, &conCol) dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) - svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, pkgStat) + svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat) if err != nil { logger.Warnf("new services failed, err: %s", err.Error()) os.Exit(1) @@ -124,25 +124,25 @@ func serveDistLock(svc *distlock.Service) { os.Exit(1) } -func servePackageStat(svc *packagestat.PackageStat) { - logger.Info("start serving package stat") +func serveAccessStat(svc *accessstat.AccessStat) { + logger.Info("start serving access stat") ch := svc.Start() loop: for { val, err := ch.Receive() if err != nil { - logger.Errorf("package stat stopped with error: %v", err) + logger.Errorf("access stat stopped with error: %v", err) break } switch val := val.(type) { case error: - logger.Errorf("package stat stopped with error: %v", val) + logger.Errorf("access stat stopped with error: %v", val) break loop } } - logger.Info("package stat stopped") + logger.Info("access stat stopped") // TODO 仅简单结束了程序 os.Exit(1) diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index b78ef70..02457a0 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -176,4 +176,12 @@ create table Location ( insert into Location (LocationID, Name) values - (1, "Local"); \ No newline at end of file + (1, "Local"); + +create table ObjectAccessStat ( + ObjectID int not null comment '对象ID', + NodeID int not null comment '节点ID', + Amount float not null comment '前一日流量的滑动平均值', + Counter float not null comment '本日的流量', + primary key(ObjectID, NodeID) +); \ No newline at end of file diff --git a/common/pkgs/accessstat/access_stat.go b/common/pkgs/accessstat/access_stat.go new file mode 100644 index 0000000..c402dd5 --- /dev/null +++ b/common/pkgs/accessstat/access_stat.go @@ -0,0 +1,72 @@ +package accessstat + +import ( + "fmt" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sync2" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type AccessStatEvent interface{} + +type AccessStat struct { + cfg Config + stats []coormq.AddAccessStatEntry + lock sync.Mutex +} + +func NewAccessStat(cfg Config) *AccessStat { + return &AccessStat{ + cfg: cfg, + } +} + +func (p *AccessStat) AddAccessCounter(objID cdssdk.ObjectID, pkgID cdssdk.PackageID, nodeID cdssdk.NodeID, value float64) { + p.lock.Lock() + defer p.lock.Unlock() + + p.stats = append(p.stats, coormq.AddAccessStatEntry{ + ObjectID: objID, + PackageID: pkgID, + NodeID: nodeID, + Counter: value, + }) +} + +func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] { + ch := sync2.NewUnboundChannel[AccessStatEvent]() + + go func() { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + ch.Send(fmt.Errorf("new coordinator client: %w", err)) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + ticker := time.NewTicker(p.cfg.ReportInterval) + for { + <-ticker.C + + p.lock.Lock() + st := p.stats + p.stats = nil + p.lock.Unlock() + + err := coorCli.AddAccessStat(coormq.ReqAddAccessStat(st)) + if err != nil { + logger.Errorf("add all package access stat counter: %v", err) + + p.lock.Lock() + p.stats = append(p.stats, st...) + p.lock.Unlock() + continue + } + } + }() + return ch +} diff --git a/common/pkgs/package_stat/config.go b/common/pkgs/accessstat/config.go similarity index 77% rename from common/pkgs/package_stat/config.go rename to common/pkgs/accessstat/config.go index 8d3f6d8..b67e8e6 100644 --- a/common/pkgs/package_stat/config.go +++ b/common/pkgs/accessstat/config.go @@ -1,4 +1,4 @@ -package packagestat +package accessstat import "time" diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 60a11ed..4504b82 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -107,3 +107,10 @@ type PackageAccessStat struct { Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 } + +type ObjectAccessStat struct { + ObjectID cdssdk.ObjectID `db:"ObjectID" json:"objectID"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` + Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 + Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 +} diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 83b30ac..2df97b6 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -28,6 +28,26 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj return ret.ToObject(), err } +func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { + stmt, args, err := sqlx.In("select ObjectID from Object where ObjectID in (?)", lo.Uniq(objectIDs)) + if err != nil { + return nil, err + } + + var avaiIDs []cdssdk.ObjectID + err = sqlx.Select(ctx, &avaiIDs, stmt, args...) + if err != nil { + return nil, err + } + + avaiIDMap := make(map[cdssdk.ObjectID]bool) + for _, pkgID := range avaiIDs { + avaiIDMap[pkgID] = true + } + + return avaiIDMap, nil +} + func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]model.Object, error) { if len(objectIDs) == 0 { return nil, nil diff --git a/common/pkgs/db/object_access_stat.go b/common/pkgs/db/object_access_stat.go new file mode 100644 index 0000000..3fbd0cc --- /dev/null +++ b/common/pkgs/db/object_access_stat.go @@ -0,0 +1,76 @@ +package db + +import ( + "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type ObjectAccessStatDB struct { + *DB +} + +func (db *DB) ObjectAccessStat() *ObjectAccessStatDB { + return &ObjectAccessStatDB{db} +} + +func (*ObjectAccessStatDB) Get(ctx SQLContext, objID cdssdk.ObjectID, nodeID cdssdk.NodeID) (model.ObjectAccessStat, error) { + var ret model.ObjectAccessStat + err := sqlx.Get(ctx, &ret, "select * from ObjectAccessStat where ObjectID=? and NodeID=?", objID, nodeID) + return ret, err +} + +func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) ([]model.ObjectAccessStat, error) { + var ret []model.ObjectAccessStat + err := sqlx.Select(ctx, &ret, "select * from ObjectAccessStat where ObjectID=?", objID) + return ret, err +} + +func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error { + sql := "insert into ObjectAccessStat(ObjectID, NodeID, Counter, Amount) " + + "values(:ObjectID, :NodeID, :Counter, 0) as new" + + "on duplicate key update Counter=Counter+new.Counter" + err := BatchNamedExec(ctx, sql, 4, entries, nil) + return err +} + +func (*ObjectAccessStatDB) BatchUpdateAmount(ctx SQLContext, objIDs []cdssdk.ObjectID, historyWeight float64) error { + stmt, args, err := sqlx.In("update ObjectAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0 where ObjectID in (?)", historyWeight, historyWeight, objIDs) + if err != nil { + return err + } + + _, err = ctx.Exec(stmt, args...) + return err +} + +func (*ObjectAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error { + stmt, args, err := sqlx.In("update ObjectAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0", historyWeight, historyWeight) + if err != nil { + return err + } + + _, err = ctx.Exec(stmt, args...) + return err +} + +func (*ObjectAccessStatDB) DeleteByObjectID(ctx SQLContext, objID cdssdk.ObjectID) error { + _, err := ctx.Exec("delete from ObjectAccessStat where ObjectID=?", objID) + return err +} + +func (*ObjectAccessStatDB) BatchDeleteByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) error { + stmt, args, err := sqlx.In("delete from ObjectAccessStat where ObjectID in (?)", objIDs) + if err != nil { + return err + } + + _, err = ctx.Exec(stmt, args...) + return err +} + +func (*ObjectAccessStatDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { + _, err := ctx.Exec("delete ObjectAccessStat from ObjectAccessStat inner join Object on ObjectAccessStat.ObjectID = Object.ObjectID where PackageID = ?", packageID) + return err +} diff --git a/common/pkgs/db/package.go b/common/pkgs/db/package.go index 8c506ea..d88d0d6 100644 --- a/common/pkgs/db/package.go +++ b/common/pkgs/db/package.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/jmoiron/sqlx" + "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -31,6 +32,26 @@ func (db *PackageDB) GetByName(ctx SQLContext, bucketID cdssdk.BucketID, name st return ret, err } +func (db *PackageDB) BatchTestPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) (map[cdssdk.PackageID]bool, error) { + stmt, args, err := sqlx.In("select PackageID from Package where PackageID in (?)", lo.Uniq(pkgIDs)) + if err != nil { + return nil, err + } + + var avaiIDs []cdssdk.PackageID + err = sqlx.Select(ctx, &avaiIDs, stmt, args...) + if err != nil { + return nil, err + } + + avaiIDMap := make(map[cdssdk.PackageID]bool) + for _, pkgID := range avaiIDs { + avaiIDMap[pkgID] = true + } + + return avaiIDMap, nil +} + func (*PackageDB) BatchGetAllPackageIDs(ctx SQLContext, start int, count int) ([]cdssdk.PackageID, error) { var ret []cdssdk.PackageID err := sqlx.Select(ctx, &ret, "select PackageID from Package limit ?, ?", start, count) @@ -136,6 +157,11 @@ func (db *PackageDB) SoftDelete(ctx SQLContext, packageID cdssdk.PackageID) erro return fmt.Errorf("change package state failed, err: %w", err) } + err = db.ObjectAccessStat().DeleteInPackage(ctx, packageID) + if err != nil { + return fmt.Errorf("delete from object access stat: %w", err) + } + err = db.ObjectBlock().DeleteInPackage(ctx, packageID) if err != nil { return fmt.Errorf("delete from object rep failed, err: %w", err) diff --git a/common/pkgs/db/package_access_stat.go b/common/pkgs/db/package_access_stat.go index 3a5cae0..1e34b6c 100644 --- a/common/pkgs/db/package_access_stat.go +++ b/common/pkgs/db/package_access_stat.go @@ -27,10 +27,10 @@ func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageI return ret, err } -func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddPackageAccessStatCounterEntry) error { - sql := "insert into PackageAccessStat(PackageID, NodeID, Counter, Amount) " + - "values(:PackageID, :NodeID, :Value, 0)" + - "on duplicate key update Counter=Counter+:Value" +func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error { + sql := "insert into PackageAccessStat(PackageID, NodeID, Counter, Amount)" + + " values(:PackageID, :NodeID, :Counter, 0) as new" + + " on duplicate key update Counter=Counter+new.Counter" err := BatchNamedExec(ctx, sql, 4, entries, nil) return err } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index d09303a..197507b 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -24,6 +24,8 @@ type ObjectService interface { DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, *mq.CodeMessage) GetDatabaseAll(msg *GetDatabaseAll) (*GetDatabaseAllResp, *mq.CodeMessage) + + AddAccessStat(msg *AddAccessStat) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 @@ -218,3 +220,28 @@ func RespDeleteObjects() *DeleteObjectsResp { func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, error) { return mq.Request(Service.DeleteObjects, client.rabbitCli, msg) } + +// 增加访问计数 +var _ = RegisterNoReply(Service.AddAccessStat) + +type AddAccessStat struct { + mq.MessageBodyBase + Entries []AddAccessStatEntry `json:"entries"` +} + +type AddAccessStatEntry struct { + ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"` + PackageID cdssdk.PackageID `json:"packageID" db:"PackageID"` + NodeID cdssdk.NodeID `json:"nodeID" db:"NodeID"` + Counter float64 `json:"counter" db:"Counter"` +} + +func ReqAddAccessStat(entries []AddAccessStatEntry) *AddAccessStat { + return &AddAccessStat{ + Entries: entries, + } +} + +func (client *Client) AddAccessStat(msg *AddAccessStat) error { + return mq.Send(Service.AddAccessStat, client.rabbitCli, msg) +} diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index b4820ca..d80cc37 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -23,8 +23,6 @@ type PackageService interface { GetPackageCachedNodes(msg *GetPackageCachedNodes) (*GetPackageCachedNodesResp, *mq.CodeMessage) GetPackageLoadedNodes(msg *GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, *mq.CodeMessage) - - AddPackageAccessStatCounter(msg *AddPackageAccessStatCounter) (*AddPackageAccessStatCounterResp, *mq.CodeMessage) } // 获取Package基本信息 @@ -256,34 +254,3 @@ func NewGetPackageLoadedNodesResp(nodeIDs []cdssdk.NodeID) *GetPackageLoadedNode func (client *Client) GetPackageLoadedNodes(msg *GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, error) { return mq.Request(Service.GetPackageLoadedNodes, client.rabbitCli, msg) } - -// 更新Pacakge访问统计中的计数值 -var _ = Register(Service.AddPackageAccessStatCounter) - -type AddPackageAccessStatCounter struct { - mq.MessageBodyBase - Entries []AddPackageAccessStatCounterEntry `json:"entries"` -} -type AddPackageAccessStatCounterEntry struct { - PackageID cdssdk.PackageID `json:"packageID" db:"PackageID"` - NodeID cdssdk.NodeID `json:"nodeID" db:"NodeID"` - Value float64 `json:"value" db:"Value"` -} - -type AddPackageAccessStatCounterResp struct { - mq.MessageBodyBase -} - -func NewAddPackageAccessStatCounter(entries []AddPackageAccessStatCounterEntry) *AddPackageAccessStatCounter { - return &AddPackageAccessStatCounter{ - Entries: entries, - } -} - -func NewAddPackageAccessStatCounterResp() *AddPackageAccessStatCounterResp { - return &AddPackageAccessStatCounterResp{} -} - -func (client *Client) AddPackageAccessStatCounter(msg *AddPackageAccessStatCounter) (*AddPackageAccessStatCounterResp, error) { - return mq.Request(Service.AddPackageAccessStatCounter, client.rabbitCli, msg) -} diff --git a/common/pkgs/package_stat/package_stat.go b/common/pkgs/package_stat/package_stat.go deleted file mode 100644 index c430325..0000000 --- a/common/pkgs/package_stat/package_stat.go +++ /dev/null @@ -1,98 +0,0 @@ -package packagestat - -import ( - "fmt" - "sync" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/sync2" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -type PackageStatEvent interface{} - -type amountKey struct { - PackageID cdssdk.PackageID - NodeID cdssdk.NodeID -} - -type amount struct { - Counter float64 -} - -type PackageStat struct { - cfg Config - amounts map[amountKey]*amount - lock sync.Mutex -} - -func NewPackageStat(cfg Config) *PackageStat { - return &PackageStat{ - cfg: cfg, - amounts: make(map[amountKey]*amount), - } -} - -func (p *PackageStat) AddAccessCounter(pkgID cdssdk.PackageID, nodeID cdssdk.NodeID, value float64) { - p.lock.Lock() - defer p.lock.Unlock() - - key := amountKey{ - PackageID: pkgID, - NodeID: nodeID, - } - if _, ok := p.amounts[key]; !ok { - p.amounts[key] = &amount{} - } - p.amounts[key].Counter += value -} - -func (p *PackageStat) Start() *sync2.UnboundChannel[PackageStatEvent] { - ch := sync2.NewUnboundChannel[PackageStatEvent]() - - go func() { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - ch.Send(fmt.Errorf("new coordinator client: %w", err)) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - ticker := time.NewTicker(p.cfg.ReportInterval) - for { - <-ticker.C - - p.lock.Lock() - amts := p.amounts - p.amounts = make(map[amountKey]*amount) - - var addEntries []coormq.AddPackageAccessStatCounterEntry - for key, amount := range amts { - addEntries = append(addEntries, coormq.AddPackageAccessStatCounterEntry{ - PackageID: key.PackageID, - NodeID: key.NodeID, - Value: amount.Counter, - }) - } - p.lock.Unlock() - - _, err := coorCli.AddPackageAccessStatCounter(coormq.NewAddPackageAccessStatCounter(addEntries)) - if err != nil { - logger.Errorf("add all package access stat counter: %v", err) - - p.lock.Lock() - for key, a := range amts { - if _, ok := p.amounts[key]; !ok { - p.amounts[key] = &amount{} - } - p.amounts[key].Counter += a.Counter - } - p.lock.Unlock() - continue - } - } - }() - return ch -} diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 3b6e66c..4890bb3 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -429,6 +429,11 @@ func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObje return fmt.Errorf("batch deleting pinned objects: %w", err) } + err = svc.db.ObjectAccessStat().BatchDeleteByObjectID(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch deleting object access stats: %w", err) + } + return nil }) if err != nil { diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index 9ed43c9..0a466c1 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -221,13 +221,48 @@ func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*c return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs)) } -func (svc *Service) AddPackageAccessStatCounter(msg *coormq.AddPackageAccessStatCounter) (*coormq.AddPackageAccessStatCounterResp, *mq.CodeMessage) { - err := svc.db.PackageAccessStat().BatchAddCounter(svc.db.SQLCtx(), msg.Entries) - if err != nil { - errMsg := fmt.Sprintf("batch add package access stat counter: %s", err.Error()) - logger.Error(errMsg) - return nil, mq.Failed(errorcode.OperationFailed, errMsg) +func (svc *Service) AddAccessStat(msg *coormq.AddAccessStat) { + pkgIDs := make([]cdssdk.PackageID, len(msg.Entries)) + objIDs := make([]cdssdk.ObjectID, len(msg.Entries)) + for i, e := range msg.Entries { + pkgIDs[i] = e.PackageID + objIDs[i] = e.ObjectID } - return mq.ReplyOK(coormq.NewAddPackageAccessStatCounterResp()) + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + avaiPkgIDs, err := svc.db.Package().BatchTestPackageID(tx, pkgIDs) + if err != nil { + return fmt.Errorf("batch test package id: %w", err) + } + + avaiObjIDs, err := svc.db.Object().BatchTestObjectID(tx, objIDs) + if err != nil { + return fmt.Errorf("batch test object id: %w", err) + } + + var willAdds []coormq.AddAccessStatEntry + for _, e := range msg.Entries { + if avaiPkgIDs[e.PackageID] && avaiObjIDs[e.ObjectID] { + willAdds = append(willAdds, e) + } + } + + if len(willAdds) > 0 { + err := svc.db.PackageAccessStat().BatchAddCounter(tx, willAdds) + if err != nil { + return fmt.Errorf("batch add package access stat counter: %w", err) + } + + err = svc.db.ObjectAccessStat().BatchAddCounter(tx, willAdds) + if err != nil { + return fmt.Errorf("batch add object access stat counter: %w", err) + } + } + + return nil + }) + + if err != nil { + logger.Warn(err.Error()) + } }