From 97979df7a4e5ccd6befcd79fdd83a7de5c96ee0f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 20 Aug 2025 16:17:47 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E5=88=86=E7=89=87=E6=95=B0?= =?UTF-8?q?=E9=87=8F=E5=92=8C=E6=80=BB=E5=A4=A7=E5=B0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/storage/local/dir_reader.go | 9 -- common/pkgs/storage/local/shard_store.go | 87 ++++++++++++++--- common/pkgs/storage/rclone/shard_store.go | 59 ++++++++++-- common/pkgs/storage/s3/shard_store.go | 111 +++++++++++++++++++--- hub/internal/pubshards/pub_shards.go | 12 ++- 5 files changed, 236 insertions(+), 42 deletions(-) diff --git a/common/pkgs/storage/local/dir_reader.go b/common/pkgs/storage/local/dir_reader.go index 53999b7..0a74a2e 100644 --- a/common/pkgs/storage/local/dir_reader.go +++ b/common/pkgs/storage/local/dir_reader.go @@ -106,12 +106,3 @@ type dirEntry struct { dir jcstypes.JPath entry os.DirEntry } - -type fileInfoDirEntry struct { - info os.FileInfo -} - -func (d fileInfoDirEntry) Name() string { return d.info.Name() } -func (d fileInfoDirEntry) IsDir() bool { return d.info.IsDir() } -func (d fileInfoDirEntry) Type() os.FileMode { return d.info.Mode().Type() } -func (d fileInfoDirEntry) Info() (os.FileInfo, error) { return d.info, nil } diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 29599ec..564169b 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -7,18 +7,22 @@ import ( "os" "path/filepath" "sync" + "time" + "github.com/inhies/go-bytesize" "gitlink.org.cn/cloudream/common/pkgs/logger" stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" ) type ShardStore struct { - detail *jcstypes.UserSpaceDetail - stgRoot string - storeAbsRoot string - lock sync.Mutex - done chan any + detail *jcstypes.UserSpaceDetail + stgRoot string + storeAbsRoot string + lock sync.Mutex + done chan any + totalShardsCnt int64 + totalShardsSize int64 } func NewShardStore(root string, detail *jcstypes.UserSpaceDetail) (*ShardStore, error) { @@ -36,7 +40,10 @@ func NewShardStore(root string, detail *jcstypes.UserSpaceDetail) (*ShardStore, } func (s *ShardStore) Start(ch *stgtypes.StorageEventChan) { - s.getLogger().Infof("component start, root: %v, max size: %v", s.storeAbsRoot, s.detail.UserSpace.ShardStore.MaxSize) + s.updateStats() + s.getLogger().Infof("component start, root: %v, max size: %v, shards count: %v, total size: %v", + s.storeAbsRoot, s.detail.UserSpace.ShardStore.MaxSize, s.totalShardsCnt, s.totalShardsSize, + ) } func (s *ShardStore) Stop() { @@ -69,6 +76,9 @@ func (s *ShardStore) Store(path jcstypes.JPath, hash jcstypes.FileHash, size int return stgtypes.FileInfo{}, fmt.Errorf("rename file: %w", err) } + s.totalShardsCnt++ + s.totalShardsSize += size + } else if err != nil { log.Warnf("get file %v stat: %v", newPath, err) return stgtypes.FileInfo{}, fmt.Errorf("get file stat: %w", err) @@ -138,6 +148,8 @@ func (s *ShardStore) ListAll() ([]stgtypes.FileInfo, error) { } func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { + startTime := time.Now() + s.lock.Lock() defer s.lock.Unlock() @@ -146,7 +158,8 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { avais[hash] = true } - cnt := 0 + totalCnt := 0 + totalSize := int64(0) err := filepath.WalkDir(s.storeAbsRoot, func(path string, d fs.DirEntry, err error) error { if err != nil { @@ -172,7 +185,10 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { if err != nil { s.getLogger().Warnf("remove file %v: %v", path, err) } else { - cnt++ + totalCnt++ + totalSize += info.Size() + s.totalShardsCnt-- + s.totalShardsSize -= info.Size() } } @@ -182,16 +198,24 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { return err } - s.getLogger().Infof("purge %d files", cnt) + s.getLogger().Infof( + "gc %v(size: %v), time: %v, remains: %v(size: %v)", + totalCnt, bytesize.ByteSize(totalSize), time.Since(startTime), s.totalShardsCnt, bytesize.ByteSize(s.totalShardsSize), + ) // TODO 无法保证原子性,所以删除失败只打日志 return nil } func (s *ShardStore) Stats() stgtypes.Stats { - // TODO 统计本地存储的相关信息 + s.lock.Lock() + defer s.lock.Unlock() + return stgtypes.Stats{ - Status: stgtypes.StatusOK, + Status: stgtypes.StatusOK, + FileCount: s.totalShardsCnt, + TotalSize: s.detail.UserSpace.ShardStore.MaxSize, + UsedSize: s.totalShardsSize, } } @@ -210,3 +234,44 @@ func (s *ShardStore) getFilePathFromHash(hash jcstypes.FileHash) string { func (s *ShardStore) getJPathFromHash(hash jcstypes.FileHash) jcstypes.JPath { return s.detail.UserSpace.WorkingDir.PushNew(stgtypes.ShardStoreWorkingDir, hash.GetHashPrefix(2), string(hash)) } + +func (s *ShardStore) updateStats() { + log := s.getLogger() + + totalCnt := int64(0) + totalSize := int64(0) + + err := filepath.WalkDir(s.storeAbsRoot, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return nil + } + + info, err := d.Info() + if err != nil { + return err + } + + _, err = jcstypes.ParseHash(filepath.Base(info.Name())) + if err != nil { + return nil + } + + totalCnt++ + totalSize += info.Size() + return nil + }) + if err != nil && !errors.Is(err, os.ErrNotExist) { + log.Warnf("walk dir %v: %v", s.storeAbsRoot, err) + return + } + + s.lock.Lock() + defer s.lock.Unlock() + + s.totalShardsCnt = totalCnt + s.totalShardsSize = totalSize +} diff --git a/common/pkgs/storage/rclone/shard_store.go b/common/pkgs/storage/rclone/shard_store.go index 2dbc32a..64d00ce 100644 --- a/common/pkgs/storage/rclone/shard_store.go +++ b/common/pkgs/storage/rclone/shard_store.go @@ -16,9 +16,11 @@ import ( ) type ShardStore struct { - Fs fs.Fs - Detail *jcstypes.UserSpaceDetail - lock sync.Mutex + Fs fs.Fs + Detail *jcstypes.UserSpaceDetail + lock sync.Mutex + totalShardsCnt int64 + totalShardsSize int64 } func NewShardStore(fs fs.Fs, detail *jcstypes.UserSpaceDetail) *ShardStore { @@ -29,11 +31,12 @@ func NewShardStore(fs fs.Fs, detail *jcstypes.UserSpaceDetail) *ShardStore { } func (s *ShardStore) Start(ch *stgtypes.StorageEventChan) { - s.getLogger().Infof("component start, max size: %v", s.Detail.UserSpace.ShardStore.MaxSize) + s.updateStats() + s.getLogger().Infof("start, max size: %v, shards count: %v, total size: %v", s.Detail.UserSpace.ShardStore.MaxSize, s.totalShardsCnt, s.totalShardsSize) } func (s *ShardStore) Stop() { - s.getLogger().Infof("component stop") + s.getLogger().Infof("stop") } func (s *ShardStore) Store(path jcstypes.JPath, hash jcstypes.FileHash, size int64) (stgtypes.FileInfo, error) { @@ -64,6 +67,9 @@ func (s *ShardStore) Store(path jcstypes.JPath, hash jcstypes.FileHash, size int return stgtypes.FileInfo{}, fmt.Errorf("rename file: %w", err) } + s.totalShardsCnt++ + s.totalShardsSize += size + } else if err != nil { log.Warnf("get file %v stat: %v", newPath, err) return stgtypes.FileInfo{}, fmt.Errorf("get file stat: %w", err) @@ -170,12 +176,17 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { } else { cnt += 1 size += obj.Size() + s.totalShardsCnt-- + s.totalShardsSize -= obj.Size() } } return nil }) - s.getLogger().Infof("gc remove %d files, size: %v, time: %v", cnt, bytesize.ByteSize(size), time.Since(startTime)) + s.getLogger().Infof( + "gc %v(size: %v), time: %v, remains: %v(size: %v)", + cnt, bytesize.ByteSize(size), time.Since(startTime), s.totalShardsCnt, bytesize.ByteSize(s.totalShardsSize), + ) if err != nil { return err @@ -185,9 +196,14 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { } func (s *ShardStore) Stats() stgtypes.Stats { - // TODO 统计本地存储的相关信息 + s.lock.Lock() + defer s.lock.Unlock() + return stgtypes.Stats{ - Status: stgtypes.StatusOK, + Status: stgtypes.StatusOK, + FileCount: s.totalShardsCnt, + TotalSize: s.Detail.UserSpace.ShardStore.MaxSize, + UsedSize: s.totalShardsSize, } } @@ -245,6 +261,33 @@ func (s *ShardStore) listAllCallback(callback func(fs.Object) error) error { return nil } +func (s *ShardStore) updateStats() { + log := s.getLogger() + + totalCnt := int64(0) + totalSize := int64(0) + + err := s.listAllCallback(func(obj fs.Object) error { + _, err := jcstypes.ParseHash(BaseName(obj.Remote())) + if err != nil { + return nil + } + totalCnt++ + totalSize += obj.Size() + return nil + }) + if err != nil { + log.Warnf("update stats: %v", err) + return + } + + s.lock.Lock() + defer s.lock.Unlock() + + s.totalShardsCnt = totalCnt + s.totalShardsSize = totalSize +} + func (s *ShardStore) getLogger() logger.Logger { return logger.WithField("ShardStore", "Local").WithField("Storage", s.Detail.UserSpace.Storage.String()) } diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 8c61d35..713b18c 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -2,11 +2,14 @@ package s3 import ( "context" + "errors" "sync" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/inhies/go-bytesize" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/math2" stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" @@ -18,12 +21,14 @@ type ShardStoreOption struct { } type ShardStore struct { - Detail *jcstypes.UserSpaceDetail - Bucket string - workingDir jcstypes.JPath - cli *s3.Client - opt ShardStoreOption - lock sync.Mutex + Detail *jcstypes.UserSpaceDetail + Bucket string + workingDir jcstypes.JPath + cli *s3.Client + opt ShardStoreOption + lock sync.Mutex + totalShardsCnt int64 + totalShardsSize int64 } func NewShardStore(detail *jcstypes.UserSpaceDetail, cli *s3.Client, bkt string, opt ShardStoreOption) (*ShardStore, error) { @@ -39,7 +44,11 @@ func NewShardStore(detail *jcstypes.UserSpaceDetail, cli *s3.Client, bkt string, } func (s *ShardStore) Start(ch *stgtypes.StorageEventChan) { - s.getLogger().Infof("start, root: %v", s.workingDir) + s.updateStats() + s.getLogger().Infof( + "start, root: %v, shards count: %v, total size: %v", + s.workingDir, s.totalShardsCnt, s.totalShardsSize, + ) } func (s *ShardStore) Stop() { @@ -56,7 +65,19 @@ func (s *ShardStore) Store(path jcstypes.JPath, hash jcstypes.FileHash, size int newPath := s.GetFilePathFromHash(hash) - _, err := s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{ + isNewShard := false + _, err := s.cli.HeadObject(context.Background(), &s3.HeadObjectInput{ + Bucket: aws.String(s.Bucket), + Key: aws.String(newPath.String()), + }) + if err != nil { + var e *s3types.NotFound + if errors.As(err, &e) { + isNewShard = true + } + } + + _, err = s.cli.CopyObject(context.Background(), &s3.CopyObjectInput{ Bucket: aws.String(s.Bucket), CopySource: aws.String(JoinKey(s.Bucket, path.String())), Key: aws.String(newPath.String()), @@ -66,6 +87,11 @@ func (s *ShardStore) Store(path jcstypes.JPath, hash jcstypes.FileHash, size int return stgtypes.FileInfo{}, err } + if isNewShard { + s.totalShardsCnt++ + s.totalShardsSize += size + } + return stgtypes.FileInfo{ Hash: hash, Size: size, @@ -139,6 +165,8 @@ func (s *ShardStore) ListAll() ([]stgtypes.FileInfo, error) { } func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { + startTime := time.Now() + s.lock.Lock() defer s.lock.Unlock() @@ -170,7 +198,8 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { if !avais[fileHash] { deletes = append(deletes, s3types.ObjectIdentifier{ - Key: obj.Key, + Key: obj.Key, + Size: obj.Size, }) } } @@ -183,6 +212,7 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { } totalCnt := len(deletes) + totalSize := int64(0) for len(deletes) > 0 { cnt := math2.Min(500, len(deletes)) @@ -197,18 +227,32 @@ func (s *ShardStore) GC(avaiables []jcstypes.FileHash) error { return err } + s.totalShardsCnt -= int64(cnt) + for i := 0; i < cnt; i++ { + s.totalShardsSize -= *deletes[i].Size + totalSize += *deletes[i].Size + } + deletes = deletes[cnt:] } - s.getLogger().Infof("purge %d files", totalCnt) + s.getLogger().Infof( + "gc %v(size: %v), time: %v, remains: %v(size: %v)", + totalCnt, bytesize.ByteSize(totalSize), time.Since(startTime), s.totalShardsCnt, bytesize.ByteSize(s.totalShardsSize), + ) // TODO 无法保证原子性,所以删除失败只打日志 return nil } func (s *ShardStore) Stats() stgtypes.Stats { - // TODO 统计本地存储的相关信息 + s.lock.Lock() + defer s.lock.Unlock() + return stgtypes.Stats{ - Status: stgtypes.StatusOK, + Status: stgtypes.StatusOK, + FileCount: s.totalShardsCnt, + TotalSize: s.Detail.UserSpace.ShardStore.MaxSize, + UsedSize: s.totalShardsSize, } } @@ -216,6 +260,49 @@ func (s *ShardStore) getLogger() logger.Logger { return logger.WithField("ShardStore", "S3").WithField("UserSpace", s.Detail) } +func (s *ShardStore) updateStats() { + totalCnt := int64(0) + totalSize := int64(0) + + var marker *string + for { + resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ + Bucket: aws.String(s.Bucket), + Prefix: aws.String(s.workingDir.String()), + Marker: marker, + }) + + if err != nil { + s.getLogger().Warnf("list objects: %v", err) + return + } + + for _, obj := range resp.Contents { + key := BaseKey(*obj.Key) + + _, err := jcstypes.ParseHash(key) + if err != nil { + continue + } + + totalCnt++ + totalSize += *obj.Size + } + + if !*resp.IsTruncated { + break + } + + marker = resp.NextMarker + } + + s.lock.Lock() + defer s.lock.Unlock() + + s.totalShardsCnt = totalCnt + s.totalShardsSize = totalSize +} + func (s *ShardStore) GetFileDirFromHash(hash jcstypes.FileHash) jcstypes.JPath { p := s.workingDir.Clone() p.Push(hash.GetHashPrefix(2)) diff --git a/hub/internal/pubshards/pub_shards.go b/hub/internal/pubshards/pub_shards.go index e6eff8a..4a7e56a 100644 --- a/hub/internal/pubshards/pub_shards.go +++ b/hub/internal/pubshards/pub_shards.go @@ -91,8 +91,16 @@ func (s *LoadedStore) GC(userID jcstypes.UserID, fileHashes []jcstypes.FileHash) } func (s *LoadedStore) GetUserStats(userID jcstypes.UserID) stgtypes.Stats { - // TODO 实现 - return stgtypes.Stats{} + cnt := int64(0) + size := int64(0) + s.ClientFileHashDB.Table("Shard").Select("Count(Hash), Sum(Size)").Find(&cnt) + + return stgtypes.Stats{ + Status: stgtypes.StatusOK, + FileCount: cnt, + TotalSize: s.Config.ShardStore.MaxSize, + UsedSize: size, + } } func (s *LoadedStore) CreateRefs(userID jcstypes.UserID, refs []jcstypes.FileHash) ([]jcstypes.FileHash, error) {