From 9c9803e449265e2dff4746c86256e30cea8b57e8 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 5 Nov 2024 15:26:07 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E8=B0=83=E8=AF=95=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/cache_move_package.go | 2 +- agent/internal/task/storage_load_package.go | 2 +- client/internal/cmdline/getp.go | 6 +- client/internal/config/config.go | 2 + client/internal/http/object.go | 6 +- common/assets/confs/client.config.json | 4 +- common/pkgs/accessstat/access_stat.go | 4 +- common/pkgs/db2/object.go | 34 +++++++----- common/pkgs/db2/object_access_stat.go | 15 ++++- common/pkgs/db2/package_access_stat.go | 11 +++- common/pkgs/downloader/iterator.go | 2 +- common/pkgs/downloader/lrc.go | 2 +- common/pkgs/ioswitch2/ops2/ec.go | 2 +- common/pkgs/ioswitchlrc/ops2/ec.go | 2 +- common/pkgs/mq/coordinator/object.go | 12 ++-- common/pkgs/mq/coordinator/package.go | 4 +- common/pkgs/storage/local/shard_store.go | 55 +++++++++++++++---- coordinator/internal/mq/object.go | 17 +++--- .../event/check_package_redundancy.go | 7 +-- 19 files changed, 121 insertions(+), 68 deletions(-) diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 98dad4a..e46da63 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -84,7 +84,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { return fmt.Errorf("finishing store: %w", err) } - ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, *stgglb.Local.NodeID, 1) + ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, t.storageID, 1) } _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, t.storageID)) diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index 2911444..90e6f67 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -117,7 +117,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e if err != nil { return err } - ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, *stgglb.Local.NodeID, 1) + ctx.accessStat.AddAccessCounter(obj.Object.ObjectID, t.packageID, t.storageID, 1) } _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks)) diff --git a/client/internal/cmdline/getp.go b/client/internal/cmdline/getp.go index af14784..d071373 100644 --- a/client/internal/cmdline/getp.go +++ b/client/internal/cmdline/getp.go @@ -12,7 +12,7 @@ import ( "github.com/inhies/go-bytesize" "github.com/spf13/cobra" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/client/internal/config" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -118,8 +118,8 @@ func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) { return fmt.Errorf("copy object data to local file failed, err: %w", err) } - if stgglb.Local.NodeID != nil { - cmdCtx.Cmdline.Svc.AccessStat.AddAccessCounter(objInfo.Object.ObjectID, id, *stgglb.Local.NodeID, 1) + if config.Cfg().StorageID > 0 { + cmdCtx.Cmdline.Svc.AccessStat.AddAccessCounter(objInfo.Object.ObjectID, id, config.Cfg().StorageID, 1) } return nil }() diff --git a/client/internal/config/config.go b/client/internal/config/config.go index c915ec4..b0cbb24 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" @@ -19,6 +20,7 @@ type Config struct { DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` Downloader downloader.Config `json:"downloader"` + StorageID cdssdk.StorageID `json:"storageID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。 } var cfg Config diff --git a/client/internal/http/object.go b/client/internal/http/object.go index e47668f..381b218 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/client/internal/config" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) @@ -125,8 +125,8 @@ func (s *ObjectService) Download(ctx *gin.Context) { } // TODO 当client不在某个代理节点上时如何处理? - if stgglb.Local.NodeID != nil { - s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, *stgglb.Local.NodeID, float64(n)/float64(file.Object.Size)) + if config.Cfg().StorageID > 0 { + s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().StorageID, float64(n)/float64(file.Object.Size)) } } diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index ed23a56..c50190b 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -21,7 +21,6 @@ "retryInterval": 5000 } }, - "ipfs": null, "distlock": { "etcdAddress": "127.0.0.1:2379", "etcdUsername": "", @@ -37,5 +36,6 @@ "maxStripCacheCount": 100, "highLatencyNode": 35, "ecStripPrefetchCount": 1 - } + }, + "storageID": 0 } \ No newline at end of file diff --git a/common/pkgs/accessstat/access_stat.go b/common/pkgs/accessstat/access_stat.go index ae2684e..7820626 100644 --- a/common/pkgs/accessstat/access_stat.go +++ b/common/pkgs/accessstat/access_stat.go @@ -26,14 +26,14 @@ func NewAccessStat(cfg Config) *AccessStat { } } -func (p *AccessStat) AddAccessCounter(objID cdssdk.ObjectID, pkgID cdssdk.PackageID, nodeID cdssdk.NodeID, value float64) { +func (p *AccessStat) AddAccessCounter(objID cdssdk.ObjectID, pkgID cdssdk.PackageID, stgID cdssdk.StorageID, value float64) { p.lock.Lock() defer p.lock.Unlock() p.stats = append(p.stats, coormq.AddAccessStatEntry{ ObjectID: objID, PackageID: pkgID, - NodeID: nodeID, + StorageID: stgID, Counter: value, }) } diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 395a8c2..94a4a77 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -90,22 +90,30 @@ func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error { return ctx.Table("Object").Create(objs).Error } -func (db *ObjectDB) BatchUpsertByPackagePath(ctx SQLContext, objs []cdssdk.Object) error { +// 批量更新对象所有属性,objs中的对象必须包含ObjectID +func (db *ObjectDB) BatchUpdate(ctx SQLContext, objs []cdssdk.Object) error { if len(objs) == 0 { return nil } - // 使用 GORM 的 Save 方法,插入或更新对象 - return ctx.Table("Object").Save(&objs).Error + return ctx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "ObjectID"}}, + UpdateAll: true, + }).Create(objs).Error } -func (db *ObjectDB) BatchUpert(ctx SQLContext, objs []cdssdk.Object) error { +// 批量更新对象指定属性,objs中的对象只需设置需要更新的属性即可,但: +// 1. 必须包含ObjectID +// 2. 日期类型属性不能设置为0值 +func (db *ObjectDB) BatchUpdateColumns(ctx SQLContext, objs []cdssdk.Object, columns []string) error { if len(objs) == 0 { return nil } - // 直接更新或插入 - return ctx.Table("Object").Save(&objs).Error + return ctx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "ObjectID"}}, + DoUpdates: clause.AssignmentColumns(columns), + }).Create(objs).Error } func (db *ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) { @@ -214,7 +222,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] } // 先进行更新 - err = db.BatchUpert(ctx, updatingObjs) + err = db.BatchUpdate(ctx, updatingObjs) if err != nil { return nil, fmt.Errorf("batch update objects: %w", err) } @@ -297,16 +305,12 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.Updating dummyObjs = append(dummyObjs, cdssdk.Object{ ObjectID: obj.ObjectID, Redundancy: obj.Redundancy, - CreateTime: nowTime, + CreateTime: nowTime, // 实际不会更新,只因为不能是0值 UpdateTime: nowTime, }) } - // 目前只能使用这种方式来同时更新大量数据 - err := ctx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "ObjectID"}}, - DoUpdates: clause.AssignmentColumns([]string{"Redundancy", "UpdateTime"})}, - ).Create(&dummyObjs).Error + err := db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"Redundancy", "UpdateTime"}) if err != nil { return fmt.Errorf("batch update object redundancy: %w", err) } @@ -338,7 +342,7 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.Updating caches = append(caches, model.Cache{ FileHash: blk.FileHash, StorageID: blk.StorageID, - CreateTime: time.Now(), + CreateTime: nowTime, Priority: 0, }) } @@ -354,7 +358,7 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.Updating pinneds = append(pinneds, cdssdk.PinnedObject{ ObjectID: obj.ObjectID, StorageID: p, - CreateTime: time.Now(), + CreateTime: nowTime, }) } } diff --git a/common/pkgs/db2/object_access_stat.go b/common/pkgs/db2/object_access_stat.go index 6c8ebbe..52e79d5 100644 --- a/common/pkgs/db2/object_access_stat.go +++ b/common/pkgs/db2/object_access_stat.go @@ -4,6 +4,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -61,11 +62,19 @@ func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddA } for _, entry := range entries { + acc := stgmod.ObjectAccessStat{ + ObjectID: entry.ObjectID, + StorageID: entry.StorageID, + Counter: entry.Counter, + } + err := ctx.Table("ObjectAccessStat"). Clauses(clause.OnConflict{ - UpdateAll: true, - }). - Create(&entry).Error + Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}}, + DoUpdates: clause.Assignments(map[string]any{ + "Counter": gorm.Expr("Counter + values(Counter)"), + }), + }).Create(&acc).Error if err != nil { return err } diff --git a/common/pkgs/db2/package_access_stat.go b/common/pkgs/db2/package_access_stat.go index f5949b9..5e93bfc 100644 --- a/common/pkgs/db2/package_access_stat.go +++ b/common/pkgs/db2/package_access_stat.go @@ -43,12 +43,21 @@ func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.Add return nil } + accs := make([]stgmod.PackageAccessStat, len(entries)) + for i, e := range entries { + accs[i] = stgmod.PackageAccessStat{ + PackageID: e.PackageID, + StorageID: e.StorageID, + Counter: e.Counter, + } + } + return ctx.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "PackageID"}, {Name: "StorageID"}}, DoUpdates: clause.Assignments(map[string]any{ "Counter": gorm.Expr("Counter + values(Counter)"), }), - }).Table("PackageAccessStat").Create(&entries).Error + }).Table("PackageAccessStat").Create(&accs).Error } func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error { diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 8ca630f..460fcab 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -224,7 +224,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed if i > 0 { logStrs = append(logStrs, ", ") } - logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Block.Index, b.Storage)) + logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Block.Index, b.Storage.Storage.String())) } logger.Debug(logStrs...) diff --git a/common/pkgs/downloader/lrc.go b/common/pkgs/downloader/lrc.go index 67fbac0..0d1fc0e 100644 --- a/common/pkgs/downloader/lrc.go +++ b/common/pkgs/downloader/lrc.go @@ -40,7 +40,7 @@ func (iter *DownloadObjectIterator) downloadLRCObject(req downloadReqeust2, red if i > 0 { logStrs = append(logStrs, ", ") } - logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Block.Index, b.Storage)) + logStrs = append(logStrs, fmt.Sprintf("%v@%v", b.Block.Index, b.Storage.Storage.String())) } logger.Debug(logStrs...) diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 9ffa468..8f983d0 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -132,7 +132,7 @@ func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { for i := range o.Outputs { rd, wr := io.Pipe() - outputVars[i].Stream = rd + outputVars[i] = &exec.StreamValue{Stream: rd} outputWrs[i] = wr } diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 0bced37..f330028 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -42,7 +42,7 @@ func (o *GalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { outputVars := make([]*exec.StreamValue, len(o.Outputs)) for i := range o.Outputs { rd, wr := io.Pipe() - outputVars[i].Stream = rd + outputVars[i] = &exec.StreamValue{Stream: rd} outputWrs[i] = wr } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index af5072d..102a382 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -120,8 +120,8 @@ type UpdateObjectRedundancyResp struct { mq.MessageBodyBase } type UpdatingObjectRedundancy struct { - ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"` - Redundancy cdssdk.Redundancy `json:"redundancy" db:"Redundancy"` + ObjectID cdssdk.ObjectID `json:"objectID"` + Redundancy cdssdk.Redundancy `json:"redundancy"` PinnedAt []cdssdk.StorageID `json:"pinnedAt"` Blocks []stgmod.ObjectBlock `json:"blocks"` } @@ -231,10 +231,10 @@ type AddAccessStat struct { } type AddAccessStatEntry struct { - ObjectID cdssdk.ObjectID `json:"objectID" db:"ObjectID"` - PackageID cdssdk.PackageID `json:"packageID" db:"PackageID"` - NodeID cdssdk.NodeID `json:"nodeID" db:"NodeID"` - Counter float64 `json:"counter" db:"Counter"` + ObjectID cdssdk.ObjectID `json:"objectID"` + PackageID cdssdk.PackageID `json:"packageID"` + StorageID cdssdk.StorageID `json:"storageID"` + Counter float64 `json:"counter"` } func ReqAddAccessStat(entries []AddAccessStatEntry) *AddAccessStat { diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index 893d40a..5df5edf 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -166,8 +166,8 @@ var _ = Register(Service.DeletePackage) type DeletePackage struct { mq.MessageBodyBase - UserID cdssdk.UserID `db:"userID"` - PackageID cdssdk.PackageID `db:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type DeletePackageResp struct { mq.MessageBodyBase diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index d68db15..baa2cc9 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -8,6 +8,7 @@ import ( "io/fs" "os" "path/filepath" + "sync" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -23,8 +24,9 @@ const ( ) type ShardStore struct { - stg cdssdk.Storage - cfg cdssdk.LocalShardStorage + stg cdssdk.Storage + cfg cdssdk.LocalShardStorage + lock sync.Mutex } func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { @@ -48,6 +50,9 @@ func (s *ShardStore) Stop() { } func (s *ShardStore) New() types.ShardWriter { + s.lock.Lock() + defer s.lock.Unlock() + tmpDir := filepath.Join(s.cfg.Root, TempDir) err := os.MkdirAll(tmpDir, 0755) @@ -70,6 +75,9 @@ func (s *ShardStore) New() types.ShardWriter { // 使用F函数创建Option对象 func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { + s.lock.Lock() + defer s.lock.Unlock() + fileName := string(opt.FileHash) if len(fileName) < 2 { return nil, fmt.Errorf("invalid file name") @@ -97,6 +105,9 @@ func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { } func (s *ShardStore) ListAll() ([]types.FileInfo, error) { + s.lock.Lock() + defer s.lock.Unlock() + var infos []types.FileInfo blockDir := filepath.Join(s.cfg.Root, BlocksDir) @@ -109,11 +120,10 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { return nil } - info, ok := d.(fs.FileInfo) - if !ok { - return nil + info, err := d.Info() + if err != nil { + return err } - // TODO 简单检查一下文件名是否合法 infos = append(infos, types.FileInfo{ @@ -131,6 +141,11 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { } func (s *ShardStore) Purge(removes []cdssdk.FileHash) error { + s.lock.Lock() + defer s.lock.Unlock() + + cnt := 0 + for _, hash := range removes { fileName := string(hash) @@ -138,9 +153,13 @@ func (s *ShardStore) Purge(removes []cdssdk.FileHash) error { err := os.Remove(path) if err != nil { s.getLogger().Warnf("remove file %v: %v", path, err) + } else { + cnt++ } } + s.getLogger().Infof("purge %d files", cnt) + // TODO 无法保证原子性,所以删除失败只打日志 return nil } @@ -153,11 +172,17 @@ func (s *ShardStore) Stats() types.Stats { } func (s *ShardStore) onWritterAbort(w *ShardWriter) { + s.lock.Lock() + defer s.lock.Unlock() + s.getLogger().Debugf("writting file %v aborted", w.path) s.removeTempFile(w.path) } func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) { + s.lock.Lock() + defer s.lock.Unlock() + log := s.getLogger() log.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash) @@ -170,12 +195,20 @@ func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (type return types.FileInfo{}, fmt.Errorf("making block dir: %w", err) } - name := filepath.Join(blockDir, string(hash)) - err = os.Rename(w.path, name) - if err != nil { + newPath := filepath.Join(blockDir, string(hash)) + _, err = os.Stat(newPath) + if os.IsNotExist(err) { + err = os.Rename(w.path, newPath) + if err != nil { + s.removeTempFile(w.path) + log.Warnf("rename %v to %v: %v", w.path, newPath, err) + return types.FileInfo{}, fmt.Errorf("rename file: %w", err) + } + + } else if err != nil { s.removeTempFile(w.path) - log.Warnf("rename %v to %v: %v", w.path, name, err) - return types.FileInfo{}, fmt.Errorf("rename file: %w", err) + log.Warnf("get file %v stat: %v", newPath, err) + return types.FileInfo{}, fmt.Errorf("get file stat: %w", err) } return types.FileInfo{ diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 5747409..af71966 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -167,7 +167,7 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up avaiUpdatings[i].ApplyTo(&newObjs[i]) } - err = svc.db2.Object().BatchUpsertByPackagePath(tx, newObjs) + err = svc.db2.Object().BatchUpdate(tx, newObjs) if err != nil { return fmt.Errorf("batch create or update: %w", err) } @@ -226,6 +226,7 @@ func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsRes oldObjIDs[i] = obj.ObjectID } + // 找出仍在数据库的Object avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdsapi.MovingObject) cdssdk.ObjectID { return obj.ObjectID }) if len(notExistsObjs) > 0 { // TODO 部分对象已经不存在 @@ -248,20 +249,20 @@ func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsRes var newObjs []cdssdk.Object // 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象 - ensuredObjs, err := svc.ensurePackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs) + checkedObjs, err := svc.checkPackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs) if err != nil { return err } - newObjs = append(newObjs, ensuredObjs...) + newObjs = append(newObjs, checkedObjs...) // 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象 - ensuredObjs, err = svc.ensurePathChangedObjects(tx, msg.UserID, pathChangedObjs) + checkedObjs, err = svc.checkPathChangedObjects(tx, msg.UserID, pathChangedObjs) if err != nil { return err } - newObjs = append(newObjs, ensuredObjs...) + newObjs = append(newObjs, checkedObjs...) - err = svc.db2.Object().BatchUpert(tx, newObjs) + err = svc.db2.Object().BatchUpdate(tx, newObjs) if err != nil { return fmt.Errorf("batch create or update: %w", err) } @@ -277,7 +278,7 @@ func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsRes return mq.ReplyOK(coormq.RespMoveObjects(sucs)) } -func (svc *Service) ensurePackageChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { +func (svc *Service) checkPackageChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { if len(objs) == 0 { return nil, nil } @@ -338,7 +339,7 @@ func (svc *Service) ensurePackageChangedObjects(tx db2.SQLContext, userID cdssdk return willUpdateObjs, nil } -func (svc *Service) ensurePathChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { +func (svc *Service) checkPathChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) { if len(objs) == 0 { return nil, nil } diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index d47040f..936c8e1 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -15,20 +15,15 @@ import ( stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2" lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" "gitlink.org.cn/cloudream/storage/scanner/internal/config" ) -const ( - monthHours = 30 * 24 - yearHours = 365 * 24 -) - type CheckPackageRedundancy struct { *scevt.CheckPackageRedundancy }