diff --git a/client2/internal/mount/mount.go b/client2/internal/mount/mount.go index 79261be..0abfccc 100644 --- a/client2/internal/mount/mount.go +++ b/client2/internal/mount/mount.go @@ -46,7 +46,9 @@ func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] { ch := sync2.NewUnboundChannel[MountEvent]() + m.vfs.Start() go func() { + defer m.vfs.Stop() nodeFsOpt := &fusefs.Options{ MountOptions: fuse.MountOptions{ diff --git a/client2/internal/mount/vfs/cache/cache.go b/client2/internal/mount/vfs/cache/cache.go index 8130a68..6b1ef4b 100644 --- a/client2/internal/mount/vfs/cache/cache.go +++ b/client2/internal/mount/vfs/cache/cache.go @@ -21,8 +21,6 @@ type CacheEntry interface { fuse.FsEntry // 在虚拟文件系统中的路径,即不包含缓存目录的路径 PathComps() []string - // 不再使用本缓存条目 - // Release() } type CacheEntryInfo struct { @@ -38,8 +36,10 @@ type Cache struct { downloader *downloader.Downloader cacheDataDir string cacheMetaDir string - activeCache *trie.Trie[*CacheFile] lock *sync.RWMutex + cacheDone chan any + activeCache *trie.Trie[*CacheFile] + freeCache []*CacheFile } func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache { @@ -48,11 +48,20 @@ func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cache downloader: downloader, cacheDataDir: cacheDataDir, cacheMetaDir: cacheMetaDir, - activeCache: trie.NewTrie[*CacheFile](), lock: &sync.RWMutex{}, + cacheDone: make(chan any), + activeCache: trie.NewTrie[*CacheFile](), } } +func (c *Cache) Start() { + go c.clearFreeCache() +} + +func (c *Cache) Stop() { + close(c.cacheDone) +} + func (c *Cache) GetCacheDataPath(comps ...string) string { comps2 := make([]string, len(comps)+1) comps2[0] = c.cacheDataDir @@ -123,11 +132,14 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile { ch, err := createNewCacheFile(c, pathComps) if err != nil { - // TODO 日志记录 + logger.Warnf("create new cache file %v: %v", pathComps, err) return nil } + ch.refCount++ c.activeCache.CreateWords(pathComps).Value = ch + + logger.Debugf("create new cache file %v", pathComps) return ch } @@ -144,13 +156,16 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { ch, err := loadCacheFile(c, pathComps) if err == nil { ch.remoteObj = obj + ch.refCount++ c.activeCache.CreateWords(pathComps).Value = ch + + logger.Debugf("load cache %v", pathComps) return ch } if !os.IsNotExist(err) { // TODO 日志记录 - logger.Tracef("load cache file: %v", err) + logger.Warnf("load cache %v: %v", pathComps, err) return nil } @@ -160,12 +175,14 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { ch, err = newCacheFileFromObject(c, pathComps, obj) if err != nil { - // TODO 日志记录 - logger.Tracef("make cache file from object: %v", err) + logger.Warnf("create cache %v from object: %v", pathComps, err) return nil } + ch.refCount++ c.activeCache.CreateWords(pathComps).Value = ch + + logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID) return ch } @@ -176,7 +193,7 @@ func (c *Cache) CreateDir(pathComps []string) *CacheDir { ch, err := createNewCacheDir(c, pathComps) if err != nil { - // TODO 日志记录 + logger.Warnf("create cache dir: %v", err) return nil } return ch @@ -276,15 +293,19 @@ func (c *Cache) Remove(pathComps []string) error { if node.Value != nil { node.Value.Delete() + c.freeCache = lo2.Remove(c.freeCache, node.Value) } node.RemoveSelf(true) + + logger.Debugf("active cache %v removed", pathComps) return nil } metaPath := c.GetCacheMetaPath(pathComps...) err := os.Remove(metaPath) if err == nil || os.IsNotExist(err) { + logger.Debugf("local cache %v removed", pathComps) return nil } @@ -344,5 +365,37 @@ func (c *Cache) Move(pathComps []string, newPathComps []string) error { }) } + logger.Debugf("cache moved: %v -> %v", pathComps, newPathComps) return nil } + +func (c *Cache) clearFreeCache() { + ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() + + for { + + select { + case _, ok := <-c.cacheDone: + if !ok { + return + } + + case <-ticker.C: + } + + c.lock.Lock() + for i, ch := range c.freeCache { + if time.Since(ch.freeTime) > time.Second*30 { + ch.Free() + node, _ := c.activeCache.WalkEnd(ch.PathComps()) + node.RemoveSelf(true) + c.freeCache[i] = nil + + logger.Debugf("cache %v freed", ch.PathComps()) + } + } + c.freeCache = lo2.RemoveAllDefault(c.freeCache) + c.lock.Unlock() + } +} diff --git a/client2/internal/mount/vfs/cache/dir.go b/client2/internal/mount/vfs/cache/dir.go index 4467b2f..18c5987 100644 --- a/client2/internal/mount/vfs/cache/dir.go +++ b/client2/internal/mount/vfs/cache/dir.go @@ -7,6 +7,7 @@ import ( ) type CacheDir struct { + cache *Cache pathComps []string name string modTime time.Time @@ -32,6 +33,7 @@ func createNewCacheDir(c *Cache, pathComps []string) (*CacheDir, error) { os.Chtimes(metaPath, modTime, modTime) return &CacheDir{ + cache: c, pathComps: pathComps, name: pathComps[len(pathComps)-1], modTime: modTime, @@ -134,10 +136,6 @@ func (f *CacheDir) Info() CacheEntryInfo { } func (f *CacheDir) SetModTime(modTime time.Time) error { - // TODO 修改文件夹的修改时间 - return nil + metaPath := f.cache.GetCacheMetaPath(f.pathComps...) + return os.Chtimes(metaPath, modTime, modTime) } - -// func (f *CacheDir) Release() { - -// } diff --git a/client2/internal/mount/vfs/cache/file.go b/client2/internal/mount/vfs/cache/file.go index 56c9d2d..b7e8751 100644 --- a/client2/internal/mount/vfs/cache/file.go +++ b/client2/internal/mount/vfs/cache/file.go @@ -78,21 +78,26 @@ func (r *Range) End() int64 { // 所有读写过程共用同一个CacheFile对象。 // 不应该将此结构体保存到对象中 type CacheFile struct { - cache *Cache - pathComps []string - name string - info FileInfo - remoteObj *cdssdk.Object - infoRev int64 - rwLock *sync.RWMutex - readers []*CacheFileReadWriter - writers []*CacheFileReadWriter - savingMetaChan chan any - isDeleted bool + cache *Cache + pathComps []string + name string + info FileInfo + remoteObj *cdssdk.Object + infoRev int64 + rwLock *sync.RWMutex + readers []*CacheFileReadWriter + writers []*CacheFileReadWriter + saveMetaChan chan any + isDeleted bool + isFreed bool metaFile *os.File dataFile *os.File writeLock *sync.RWMutex + + // 下面的字段不受rwLock保护! + refCount int + freeTime time.Time } func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { @@ -128,18 +133,18 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { } ch := &CacheFile{ - cache: cache, - pathComps: pathComps, - name: pathComps[len(pathComps)-1], - info: info, - rwLock: &sync.RWMutex{}, - savingMetaChan: make(chan any, 1), - metaFile: metaFile, - dataFile: dataFile, - writeLock: &sync.RWMutex{}, + cache: cache, + pathComps: pathComps, + name: pathComps[len(pathComps)-1], + info: info, + rwLock: &sync.RWMutex{}, + saveMetaChan: make(chan any, 1), + metaFile: metaFile, + dataFile: dataFile, + writeLock: &sync.RWMutex{}, } - go ch.savingMeta() + go ch.serving() return ch, nil } @@ -169,18 +174,18 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { } ch := &CacheFile{ - cache: cache, - pathComps: pathComps, - name: pathComps[len(pathComps)-1], - info: *info, - rwLock: &sync.RWMutex{}, - savingMetaChan: make(chan any, 1), - metaFile: metaFile, - dataFile: dataFile, - writeLock: &sync.RWMutex{}, + cache: cache, + pathComps: pathComps, + name: pathComps[len(pathComps)-1], + info: *info, + rwLock: &sync.RWMutex{}, + saveMetaChan: make(chan any, 1), + metaFile: metaFile, + dataFile: dataFile, + writeLock: &sync.RWMutex{}, } - go ch.savingMeta() + go ch.serving() return ch, nil } @@ -219,19 +224,19 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object } ch := &CacheFile{ - cache: cache, - pathComps: pathComps, - name: pathComps[len(pathComps)-1], - info: info, - remoteObj: obj, - rwLock: &sync.RWMutex{}, - savingMetaChan: make(chan any, 1), - metaFile: metaFile, - dataFile: dataFile, - writeLock: &sync.RWMutex{}, + cache: cache, + pathComps: pathComps, + name: pathComps[len(pathComps)-1], + info: info, + remoteObj: obj, + rwLock: &sync.RWMutex{}, + saveMetaChan: make(chan any, 1), + metaFile: metaFile, + dataFile: dataFile, + writeLock: &sync.RWMutex{}, } - go ch.savingMeta() + go ch.serving() return ch, nil } @@ -322,17 +327,23 @@ func (f *CacheFile) Move(newPathComps []string) { f.letSave() } -func (f *CacheFile) SetRemoteObject(obj *cdssdk.Object) { - f.remoteObj = obj -} - // 打开一个写入句柄,同时支持读取 func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter { logger.Tracef("CacheFile.Open: %v, %#x", f.name, flags) + f.cache.lock.Lock() + f.rwLock.Lock() defer f.rwLock.Unlock() + f.refCount++ + if f.refCount == 1 && !f.isDeleted { + f.cache.freeCache = lo2.Remove(f.cache.freeCache, f) + } + + // 提前释放Cache的锁 + f.cache.lock.Unlock() + h := &CacheFileReadWriter{ file: f, remoteLock: &sync.Mutex{}, @@ -407,18 +418,45 @@ func (f *CacheFile) Truncate(size int64) error { } // 不再使用缓存文件 -// func (f *CacheFile) Release() { +func (f *CacheFile) Release() { + f.cache.lock.Lock() + defer f.cache.lock.Unlock() + + f.refCount-- + f.freeTime = time.Now() -// } + f.rwLock.RLock() + defer f.rwLock.RUnlock() + + if f.refCount == 0 && !f.isDeleted { + f.cache.freeCache = append(f.cache.freeCache, f) + } +} + +func (f *CacheFile) Free() { + f.rwLock.Lock() + defer f.rwLock.Unlock() + + if !f.isDeleted { + // TODO 日志 + f.saveMeta() + } -func (f *CacheFile) savingMeta() { + // 防止在关闭缓存后又保存了文件 + f.isFreed = true + f.metaFile.Close() + f.dataFile.Close() + close(f.saveMetaChan) +} + +func (f *CacheFile) serving() { savedInfoRev := int64(0) ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() for { select { - case _, ok := <-f.savingMetaChan: + case _, ok := <-f.saveMetaChan: if !ok { return } @@ -434,35 +472,19 @@ func (f *CacheFile) savingMeta() { break } + // 如果缓存已经被释放,就不要再保存元数据了 + if f.isFreed { + f.rwLock.Unlock() + break + } + for { if f.infoRev == savedInfoRev { break } - jsonData, err := serder.ObjectToJSON(f.info) - if err != nil { - // TODO 日志 - break - } - - err = f.metaFile.Truncate(0) - if err != nil { - // TODO 日志 - break - } - - _, err = f.metaFile.Seek(0, io.SeekStart) - if err != nil { - // TODO 日志 - break - } - - err = io2.WriteAll(f.metaFile, jsonData) - if err != nil { - // TODO 日志 - break - } - + // TODO 错误日志 + f.saveMeta() f.metaFile.Sync() savedInfoRev = f.infoRev @@ -473,9 +495,33 @@ func (f *CacheFile) savingMeta() { } } +func (f *CacheFile) saveMeta() error { + jsonData, err := serder.ObjectToJSON(f.info) + if err != nil { + return err + } + + err = f.metaFile.Truncate(0) + if err != nil { + return err + } + + _, err = f.metaFile.Seek(0, io.SeekStart) + if err != nil { + return err + } + + err = io2.WriteAll(f.metaFile, jsonData) + if err != nil { + return err + } + + return nil +} + func (f *CacheFile) letSave() { select { - case f.savingMetaChan <- nil: + case f.saveMetaChan <- nil: default: } } @@ -630,12 +676,22 @@ func (f *CacheFileReadWriter) Close() error { f.remote.Close() } + f.file.cache.lock.Lock() + defer f.file.cache.lock.Unlock() + f.file.rwLock.Lock() + defer f.file.rwLock.Unlock() + + f.file.refCount-- + if f.file.refCount == 0 && !f.file.isDeleted { + f.file.cache.freeCache = append(f.file.cache.freeCache, f.file) + f.file.freeTime = time.Now() + } + if f.writeable { f.file.writers = lo2.Remove(f.file.writers, f) } else if f.readable { f.file.readers = lo2.Remove(f.file.readers, f) } - f.file.rwLock.Unlock() return nil } diff --git a/client2/internal/mount/vfs/fuse_bucket.go b/client2/internal/mount/vfs/fuse_bucket.go index 2c4a4cb..6dea9d9 100644 --- a/client2/internal/mount/vfs/fuse_bucket.go +++ b/client2/internal/mount/vfs/fuse_bucket.go @@ -181,6 +181,7 @@ func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fu if cache == nil { return nil, 0, fuse.ErrPermission } + defer cache.Release() // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, // 也有有FileHandle的计数保持cache的有效性 diff --git a/client2/internal/mount/vfs/fuse_dir.go b/client2/internal/mount/vfs/fuse_dir.go index 851bbb5..5eb5bfe 100644 --- a/client2/internal/mount/vfs/fuse_dir.go +++ b/client2/internal/mount/vfs/fuse_dir.go @@ -2,7 +2,6 @@ package vfs import ( "context" - "fmt" "os" "strings" "time" @@ -154,16 +153,10 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { objPath := cdssdk.JoinObjectPath(r.pathComps[2:]...) - coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) + objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) if err != nil { - return fmt.Errorf("getting common prefixes: %w", err) - } - - objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) - if err != nil { - return fmt.Errorf("getting direct children: %w", err) + return err } - for _, dir := range coms { dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator) pathComps := lo2.AppendNew(r.pathComps, cdssdk.BaseName(dir)) @@ -218,6 +211,7 @@ func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse. if cache == nil { return nil, 0, fuse.ErrPermission } + defer cache.Release() // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, // 也有有FileHandle的计数保持cache的有效性 diff --git a/client2/internal/mount/vfs/fuse_file.go b/client2/internal/mount/vfs/fuse_file.go index c71208d..912d4e1 100644 --- a/client2/internal/mount/vfs/fuse_file.go +++ b/client2/internal/mount/vfs/fuse_file.go @@ -63,6 +63,7 @@ func (n *FuseFileNode) Truncate(size uint64) error { if cacheFile == nil { return fuse.ErrNotExists } + defer cacheFile.Release() return cacheFile.Truncate(int64(size)) } @@ -72,6 +73,7 @@ func (n *FuseFileNode) SetModTime(time time.Time) error { if cacheFile == nil { return fuse.ErrNotExists } + defer cacheFile.Release() return cacheFile.SetModTime(time) } @@ -82,6 +84,7 @@ func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, uint32, error) { // 如果文件不存在,也不进行创建,因为创建不应该调用这个接口 return nil, 0, fuse.ErrNotExists } + defer cacheFile.Release() hd := cacheFile.Open(flags) return newFileHandle(n, hd), flags, nil diff --git a/client2/internal/mount/vfs/fuse_package.go b/client2/internal/mount/vfs/fuse_package.go index 2fa67f9..b574893 100644 --- a/client2/internal/mount/vfs/fuse_package.go +++ b/client2/internal/mount/vfs/fuse_package.go @@ -2,7 +2,6 @@ package vfs import ( "context" - "fmt" "os" "strings" "time" @@ -154,14 +153,9 @@ func (r *FusePackage) listChildren() ([]fuse.FsEntry, error) { return err } - coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, "") + objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, "") if err != nil { - return fmt.Errorf("getting common prefixes: %w", err) - } - - objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, "") - if err != nil { - return fmt.Errorf("getting direct children: %w", err) + return err } for _, dir := range coms { @@ -216,6 +210,7 @@ func (r *FusePackage) NewFile(ctx context.Context, name string, flags uint32) (f if cache == nil { return nil, 0, fuse.ErrPermission } + defer cache.Release() // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, // 也有有FileHandle的计数保持cache的有效性 diff --git a/client2/internal/mount/vfs/fuse_root.go b/client2/internal/mount/vfs/fuse_root.go index a772f85..649fd75 100644 --- a/client2/internal/mount/vfs/fuse_root.go +++ b/client2/internal/mount/vfs/fuse_root.go @@ -155,6 +155,7 @@ func (r *FuseRoot) NewFile(ctx context.Context, name string, flags uint32) (fuse if cache == nil { return nil, 0, fuse.ErrPermission } + defer cache.Release() // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, // 也有有FileHandle的计数保持cache的有效性 diff --git a/client2/internal/mount/vfs/vfs.go b/client2/internal/mount/vfs/vfs.go index ff84861..cf385bb 100644 --- a/client2/internal/mount/vfs/vfs.go +++ b/client2/internal/mount/vfs/vfs.go @@ -24,6 +24,14 @@ func NewVfs(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) * } } +func (v *Vfs) Start() { + v.cache.Start() +} + +func (v *Vfs) Stop() { + v.cache.Stop() +} + func (v *Vfs) Root() fuse.FsDir { return newRoot(v) } diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 8c6b2d9..d97f264 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -56,6 +56,43 @@ func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.Pack return ret, err } +func (db *ObjectDB) GetByPrefixGrouped(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (objs []cdssdk.Object, commonPrefixes []string, err error) { + type ObjectOrDir struct { + cdssdk.Object + IsObject bool `gorm:"IsObject"` + Prefix string `gorm:"Prefix"` + } + + sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1 + + prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt) + grouping := ctx.Table("Object"). + Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)). + Where("PackageID = ?", packageID). + Where("Path like ?", pathPrefix+"%"). + Group("Prefix, IsObject"). + Order("Prefix ASC") + + var ret []ObjectOrDir + err = ctx.Table("Object"). + Select("Grouped.IsObject, Grouped.Prefix, Object.*"). + Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping). + Find(&ret).Error + if err != nil { + return + } + + for _, o := range ret { + if o.IsObject { + objs = append(objs, o.Object) + } else { + commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator) + } + } + + return +} + func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) { type ObjectOrDir struct { cdssdk.Object