
Add a timed release mechanism to the cache

Sydonian · 7 months ago · commit d5e4e06751
11 changed files with 256 additions and 108 deletions
  1. client2/internal/mount/mount.go  +2 -0
  2. client2/internal/mount/vfs/cache/cache.go  +62 -9
  3. client2/internal/mount/vfs/cache/dir.go  +4 -6
  4. client2/internal/mount/vfs/cache/file.go  +132 -76
  5. client2/internal/mount/vfs/fuse_bucket.go  +1 -0
  6. client2/internal/mount/vfs/fuse_dir.go  +3 -9
  7. client2/internal/mount/vfs/fuse_file.go  +3 -0
  8. client2/internal/mount/vfs/fuse_package.go  +3 -8
  9. client2/internal/mount/vfs/fuse_root.go  +1 -0
  10. client2/internal/mount/vfs/vfs.go  +8 -0
  11. common/pkgs/db2/object.go  +37 -0
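
For context on the change as a whole: each CacheFile now carries a reference count; when the count drops to zero the file is appended to Cache.freeCache together with a freeTime stamp, and a goroutine launched by Cache.Start sweeps that list every 5 seconds, freeing entries that have been idle for more than 30 seconds and exiting when Cache.Stop closes cacheDone. The following is a minimal, self-contained sketch of that pattern; the entry and reaper types are illustrative only, not the project's API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry is a hypothetical cached item with a reference count, loosely
// modelled on CacheFile's refCount/freeTime fields.
type entry struct {
	name     string
	refCount int
	freeTime time.Time
}

// reaper mirrors the idea behind Cache.clearFreeCache: entries whose
// refCount reaches zero wait in freeList until they have been idle for
// idleTTL, then a background sweep drops them.
type reaper struct {
	mu       sync.Mutex
	freeList []*entry
	done     chan struct{}
	idleTTL  time.Duration
}

func (r *reaper) release(e *entry) {
	r.mu.Lock()
	defer r.mu.Unlock()
	e.refCount--
	if e.refCount == 0 {
		e.freeTime = time.Now()
		r.freeList = append(r.freeList, e)
	}
}

func (r *reaper) run(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-r.done: // Stop() closes this channel, ending the sweep
			return
		case <-ticker.C:
		}
		r.mu.Lock()
		kept := r.freeList[:0]
		for _, e := range r.freeList {
			if time.Since(e.freeTime) > r.idleTTL {
				fmt.Println("freed:", e.name) // the real code closes meta/data files here
			} else {
				kept = append(kept, e)
			}
		}
		r.freeList = kept
		r.mu.Unlock()
	}
}

func main() {
	r := &reaper{done: make(chan struct{}), idleTTL: 100 * time.Millisecond}
	go r.run(50 * time.Millisecond)

	e := &entry{name: "demo", refCount: 1}
	r.release(e) // refCount hits zero, entry becomes eligible for release

	time.Sleep(300 * time.Millisecond)
	close(r.done)
}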

client2/internal/mount/mount.go  +2 -0

@@ -46,7 +46,9 @@ func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader)

func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] {
ch := sync2.NewUnboundChannel[MountEvent]()
m.vfs.Start()
go func() {
defer m.vfs.Stop()

nodeFsOpt := &fusefs.Options{
MountOptions: fuse.MountOptions{


client2/internal/mount/vfs/cache/cache.go  +62 -9

@@ -21,8 +21,6 @@ type CacheEntry interface {
fuse.FsEntry
// Path within the virtual file system, i.e. the path excluding the cache directory
PathComps() []string
// This cache entry is no longer in use
// Release()
}

type CacheEntryInfo struct {
@@ -38,8 +36,10 @@ type Cache struct {
downloader *downloader.Downloader
cacheDataDir string
cacheMetaDir string
activeCache *trie.Trie[*CacheFile]
lock *sync.RWMutex
cacheDone chan any
activeCache *trie.Trie[*CacheFile]
freeCache []*CacheFile
}

func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache {
@@ -48,11 +48,20 @@ func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cache
downloader: downloader,
cacheDataDir: cacheDataDir,
cacheMetaDir: cacheMetaDir,
activeCache: trie.NewTrie[*CacheFile](),
lock: &sync.RWMutex{},
cacheDone: make(chan any),
activeCache: trie.NewTrie[*CacheFile](),
}
}

func (c *Cache) Start() {
go c.clearFreeCache()
}

func (c *Cache) Stop() {
close(c.cacheDone)
}

func (c *Cache) GetCacheDataPath(comps ...string) string {
comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheDataDir
@@ -123,11 +132,14 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile {

ch, err := createNewCacheFile(c, pathComps)
if err != nil {
// TODO: logging
logger.Warnf("create new cache file %v: %v", pathComps, err)
return nil
}

ch.refCount++
c.activeCache.CreateWords(pathComps).Value = ch

logger.Debugf("create new cache file %v", pathComps)
return ch
}

@@ -144,13 +156,16 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile {
ch, err := loadCacheFile(c, pathComps)
if err == nil {
ch.remoteObj = obj
ch.refCount++
c.activeCache.CreateWords(pathComps).Value = ch

logger.Debugf("load cache %v", pathComps)
return ch
}

if !os.IsNotExist(err) {
// TODO: logging
logger.Tracef("load cache file: %v", err)
logger.Warnf("load cache %v: %v", pathComps, err)
return nil
}

@@ -160,12 +175,14 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile {

ch, err = newCacheFileFromObject(c, pathComps, obj)
if err != nil {
// TODO: logging
logger.Tracef("make cache file from object: %v", err)
logger.Warnf("create cache %v from object: %v", pathComps, err)
return nil
}

ch.refCount++
c.activeCache.CreateWords(pathComps).Value = ch

logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID)
return ch
}

@@ -176,7 +193,7 @@ func (c *Cache) CreateDir(pathComps []string) *CacheDir {

ch, err := createNewCacheDir(c, pathComps)
if err != nil {
// TODO: logging
logger.Warnf("create cache dir: %v", err)
return nil
}
return ch
@@ -276,15 +293,19 @@ func (c *Cache) Remove(pathComps []string) error {

if node.Value != nil {
node.Value.Delete()
c.freeCache = lo2.Remove(c.freeCache, node.Value)
}

node.RemoveSelf(true)

logger.Debugf("active cache %v removed", pathComps)
return nil
}

metaPath := c.GetCacheMetaPath(pathComps...)
err := os.Remove(metaPath)
if err == nil || os.IsNotExist(err) {
logger.Debugf("local cache %v removed", pathComps)
return nil
}

@@ -344,5 +365,37 @@ func (c *Cache) Move(pathComps []string, newPathComps []string) error {
})
}

logger.Debugf("cache moved: %v -> %v", pathComps, newPathComps)
return nil
}

func (c *Cache) clearFreeCache() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for {

select {
case _, ok := <-c.cacheDone:
if !ok {
return
}

case <-ticker.C:
}

c.lock.Lock()
for i, ch := range c.freeCache {
if time.Since(ch.freeTime) > time.Second*30 {
ch.Free()
node, _ := c.activeCache.WalkEnd(ch.PathComps())
node.RemoveSelf(true)
c.freeCache[i] = nil

logger.Debugf("cache %v freed", ch.PathComps())
}
}
c.freeCache = lo2.RemoveAllDefault(c.freeCache)
c.lock.Unlock()
}
}

client2/internal/mount/vfs/cache/dir.go  +4 -6

@@ -7,6 +7,7 @@ import (
)

type CacheDir struct {
cache *Cache
pathComps []string
name string
modTime time.Time
@@ -32,6 +33,7 @@ func createNewCacheDir(c *Cache, pathComps []string) (*CacheDir, error) {
os.Chtimes(metaPath, modTime, modTime)

return &CacheDir{
cache: c,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
modTime: modTime,
@@ -134,10 +136,6 @@ func (f *CacheDir) Info() CacheEntryInfo {
}

func (f *CacheDir) SetModTime(modTime time.Time) error {
// TODO: update the directory's modification time
return nil
metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
return os.Chtimes(metaPath, modTime, modTime)
}

// func (f *CacheDir) Release() {

// }

client2/internal/mount/vfs/cache/file.go  +132 -76

@@ -78,21 +78,26 @@ func (r *Range) End() int64 {
// All read and write operations share the same CacheFile object.
// Do not store this struct inside other objects
type CacheFile struct {
cache *Cache
pathComps []string
name string
info FileInfo
remoteObj *cdssdk.Object
infoRev int64
rwLock *sync.RWMutex
readers []*CacheFileReadWriter
writers []*CacheFileReadWriter
savingMetaChan chan any
isDeleted bool
cache *Cache
pathComps []string
name string
info FileInfo
remoteObj *cdssdk.Object
infoRev int64
rwLock *sync.RWMutex
readers []*CacheFileReadWriter
writers []*CacheFileReadWriter
saveMetaChan chan any
isDeleted bool
isFreed bool

metaFile *os.File
dataFile *os.File
writeLock *sync.RWMutex

// The fields below are NOT protected by rwLock!
refCount int
freeTime time.Time
}

func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
@@ -128,18 +133,18 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
}

ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
rwLock: &sync.RWMutex{},
savingMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
}

go ch.savingMeta()
go ch.serving()

return ch, nil
}
@@ -169,18 +174,18 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
}

ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: *info,
rwLock: &sync.RWMutex{},
savingMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: *info,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
}

go ch.savingMeta()
go ch.serving()

return ch, nil
}
@@ -219,19 +224,19 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object
}

ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
remoteObj: obj,
rwLock: &sync.RWMutex{},
savingMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
remoteObj: obj,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
}

go ch.savingMeta()
go ch.serving()

return ch, nil
}
@@ -322,17 +327,23 @@ func (f *CacheFile) Move(newPathComps []string) {
f.letSave()
}

func (f *CacheFile) SetRemoteObject(obj *cdssdk.Object) {
f.remoteObj = obj
}

// Open a writable handle that also supports reading
func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter {
logger.Tracef("CacheFile.Open: %v, %#x", f.name, flags)

f.cache.lock.Lock()

f.rwLock.Lock()
defer f.rwLock.Unlock()

f.refCount++
if f.refCount == 1 && !f.isDeleted {
f.cache.freeCache = lo2.Remove(f.cache.freeCache, f)
}

// Release the Cache lock early
f.cache.lock.Unlock()

h := &CacheFileReadWriter{
file: f,
remoteLock: &sync.Mutex{},
@@ -407,18 +418,45 @@ func (f *CacheFile) Truncate(size int64) error {
}

// The cache file is no longer in use
// func (f *CacheFile) Release() {
func (f *CacheFile) Release() {
f.cache.lock.Lock()
defer f.cache.lock.Unlock()

f.refCount--
f.freeTime = time.Now()

// }
f.rwLock.RLock()
defer f.rwLock.RUnlock()

if f.refCount == 0 && !f.isDeleted {
f.cache.freeCache = append(f.cache.freeCache, f)
}
}

func (f *CacheFile) Free() {
f.rwLock.Lock()
defer f.rwLock.Unlock()

if !f.isDeleted {
// TODO: logging
f.saveMeta()
}

func (f *CacheFile) savingMeta() {
// Prevent the file from being saved again after the cache has been closed
f.isFreed = true
f.metaFile.Close()
f.dataFile.Close()
close(f.saveMetaChan)
}

func (f *CacheFile) serving() {
savedInfoRev := int64(0)
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for {
select {
case _, ok := <-f.savingMetaChan:
case _, ok := <-f.saveMetaChan:
if !ok {
return
}
@@ -434,35 +472,19 @@ func (f *CacheFile) savingMeta() {
break
}

// If the cache has already been freed, stop saving metadata
if f.isFreed {
f.rwLock.Unlock()
break
}

for {
if f.infoRev == savedInfoRev {
break
}

jsonData, err := serder.ObjectToJSON(f.info)
if err != nil {
// TODO: logging
break
}

err = f.metaFile.Truncate(0)
if err != nil {
// TODO: logging
break
}

_, err = f.metaFile.Seek(0, io.SeekStart)
if err != nil {
// TODO: logging
break
}

err = io2.WriteAll(f.metaFile, jsonData)
if err != nil {
// TODO: logging
break
}

// TODO: error logging
f.saveMeta()
f.metaFile.Sync()

savedInfoRev = f.infoRev
@@ -473,9 +495,33 @@ func (f *CacheFile) savingMeta() {
}
}

func (f *CacheFile) saveMeta() error {
jsonData, err := serder.ObjectToJSON(f.info)
if err != nil {
return err
}

err = f.metaFile.Truncate(0)
if err != nil {
return err
}

_, err = f.metaFile.Seek(0, io.SeekStart)
if err != nil {
return err
}

err = io2.WriteAll(f.metaFile, jsonData)
if err != nil {
return err
}

return nil
}

func (f *CacheFile) letSave() {
select {
case f.savingMetaChan <- nil:
case f.saveMetaChan <- nil:
default:
}
}
@@ -630,12 +676,22 @@ func (f *CacheFileReadWriter) Close() error {
f.remote.Close()
}

f.file.cache.lock.Lock()
defer f.file.cache.lock.Unlock()

f.file.rwLock.Lock()
defer f.file.rwLock.Unlock()

f.file.refCount--
if f.file.refCount == 0 && !f.file.isDeleted {
f.file.cache.freeCache = append(f.file.cache.freeCache, f.file)
f.file.freeTime = time.Now()
}

if f.writeable {
f.file.writers = lo2.Remove(f.file.writers, f)
} else if f.readable {
f.file.readers = lo2.Remove(f.file.readers, f)
}
f.file.rwLock.Unlock()
return nil
}
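
A note on the metadata-saving goroutine in this file: serving combines two wake-up sources. letSave does a non-blocking send on the 1-slot saveMetaChan so that bursts of changes coalesce into a single wake-up, a 5-second ticker acts as a fallback, and metadata is written only when infoRev has advanced past the last saved revision. Below is a minimal, self-contained sketch of that coalescing pattern; the doc type and its fields are invented for the example, not the project's API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// doc is a hypothetical piece of state whose metadata is persisted lazily.
type doc struct {
	mu   sync.Mutex
	rev  int64         // bumped on every change, like CacheFile.infoRev
	wake chan struct{} // 1-slot channel used to coalesce save requests
}

// change mutates the state and nudges the saver without ever blocking.
func (d *doc) change() {
	d.mu.Lock()
	d.rev++
	d.mu.Unlock()
	select {
	case d.wake <- struct{}{}: // wake the saver if it is idle
	default: // a wake-up is already pending; coalesce
	}
}

// saver persists the state when woken or at the latest every interval,
// and only if the revision moved since the last successful save.
func (d *doc) saver(interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	var savedRev int64
	for {
		select {
		case <-done:
			return
		case <-d.wake:
		case <-ticker.C:
		}
		d.mu.Lock()
		if d.rev != savedRev {
			fmt.Println("persist metadata at rev", d.rev) // the real code rewrites the meta file
			savedRev = d.rev
		}
		d.mu.Unlock()
	}
}

func main() {
	d := &doc{wake: make(chan struct{}, 1)}
	done := make(chan struct{})
	go d.saver(50*time.Millisecond, done)
	for i := 0; i < 3; i++ {
		d.change() // three changes, at most one pending wake-up
	}
	time.Sleep(200 * time.Millisecond)
	close(done)
}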

client2/internal/mount/vfs/fuse_bucket.go  +1 -0

@@ -181,6 +181,7 @@ func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fu
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open adds an extra +1 to the cache's reference count, so even if the cache is released before the FileHandle is closed,
// the reference held by the FileHandle keeps the cache valid



client2/internal/mount/vfs/fuse_dir.go  +3 -9

@@ -2,7 +2,6 @@ package vfs

import (
"context"
"fmt"
"os"
"strings"
"time"
@@ -154,16 +153,10 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) {

objPath := cdssdk.JoinObjectPath(r.pathComps[2:]...)

coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator)
objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator)
if err != nil {
return fmt.Errorf("getting common prefixes: %w", err)
}

objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator)
if err != nil {
return fmt.Errorf("getting direct children: %w", err)
return err
}

for _, dir := range coms {
dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator)
pathComps := lo2.AppendNew(r.pathComps, cdssdk.BaseName(dir))
@@ -218,6 +211,7 @@ func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse.
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open adds an extra +1 to the cache's reference count, so even if the cache is released before the FileHandle is closed,
// the reference held by the FileHandle keeps the cache valid



client2/internal/mount/vfs/fuse_file.go  +3 -0

@@ -63,6 +63,7 @@ func (n *FuseFileNode) Truncate(size uint64) error {
if cacheFile == nil {
return fuse.ErrNotExists
}
defer cacheFile.Release()

return cacheFile.Truncate(int64(size))
}
@@ -72,6 +73,7 @@ func (n *FuseFileNode) SetModTime(time time.Time) error {
if cacheFile == nil {
return fuse.ErrNotExists
}
defer cacheFile.Release()

return cacheFile.SetModTime(time)
}
@@ -82,6 +84,7 @@ func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, uint32, error) {
// If the file does not exist, do not create it either; creation should not go through this interface
return nil, 0, fuse.ErrNotExists
}
defer cacheFile.Release()

hd := cacheFile.Open(flags)
return newFileHandle(n, hd), flags, nil
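
Each fuse node added here follows the same pairing: the cache lookup (LoadFile/CreateFile) returns with the reference count already incremented, so the node drops that lookup reference with defer cacheFile.Release(), while Open takes a second reference that belongs to the returned handle and is only released in Close. A tiny hedged illustration of this two-owner counting, with types invented for the example:

package main

import "fmt"

// ref is an invented stand-in for a reference-counted cache entry.
type ref struct{ count int }

func (r *ref) Acquire() { r.count++ } // e.g. LoadFile/CreateFile or Open
func (r *ref) Release() { r.count-- } // e.g. the deferred Release or handle Close

// openHandle takes the handle's own reference and returns its close function.
func openHandle(r *ref) (closeHandle func()) {
	r.Acquire()
	return r.Release
}

func main() {
	r := &ref{}
	r.Acquire()                  // lookup reference, as returned by LoadFile
	closeHandle := openHandle(r) // second reference, owned by the handle
	r.Release()                  // the node's deferred Release of the lookup reference

	fmt.Println("refs while the handle is open:", r.count) // 1: the cache stays valid
	closeHandle()
	fmt.Println("refs after Close:", r.count) // 0: eligible for timed release
}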


client2/internal/mount/vfs/fuse_package.go  +3 -8

@@ -2,7 +2,6 @@ package vfs

import (
"context"
"fmt"
"os"
"strings"
"time"
@@ -154,14 +153,9 @@ func (r *FusePackage) listChildren() ([]fuse.FsEntry, error) {
return err
}

coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, "")
objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, "")
if err != nil {
return fmt.Errorf("getting common prefixes: %w", err)
}

objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, "")
if err != nil {
return fmt.Errorf("getting direct children: %w", err)
return err
}

for _, dir := range coms {
@@ -216,6 +210,7 @@ func (r *FusePackage) NewFile(ctx context.Context, name string, flags uint32) (f
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open adds an extra +1 to the cache's reference count, so even if the cache is released before the FileHandle is closed,
// the reference held by the FileHandle keeps the cache valid



client2/internal/mount/vfs/fuse_root.go  +1 -0

@@ -155,6 +155,7 @@ func (r *FuseRoot) NewFile(ctx context.Context, name string, flags uint32) (fuse
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open adds an extra +1 to the cache's reference count, so even if the cache is released before the FileHandle is closed,
// the reference held by the FileHandle keeps the cache valid



client2/internal/mount/vfs/vfs.go  +8 -0

@@ -24,6 +24,14 @@ func NewVfs(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *
}
}

func (v *Vfs) Start() {
v.cache.Start()
}

func (v *Vfs) Stop() {
v.cache.Stop()
}

func (v *Vfs) Root() fuse.FsDir {
return newRoot(v)
}


common/pkgs/db2/object.go  +37 -0

@@ -56,6 +56,43 @@ func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.Pack
return ret, err
}

func (db *ObjectDB) GetByPrefixGrouped(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (objs []cdssdk.Object, commonPrefixes []string, err error) {
type ObjectOrDir struct {
cdssdk.Object
IsObject bool `gorm:"IsObject"`
Prefix string `gorm:"Prefix"`
}

sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1

prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)
grouping := ctx.Table("Object").
Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)).
Where("PackageID = ?", packageID).
Where("Path like ?", pathPrefix+"%").
Group("Prefix, IsObject").
Order("Prefix ASC")

var ret []ObjectOrDir
err = ctx.Table("Object").
Select("Grouped.IsObject, Grouped.Prefix, Object.*").
Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping).
Find(&ret).Error
if err != nil {
return
}

for _, o := range ret {
if o.IsObject {
objs = append(objs, o.Object)
} else {
commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator)
}
}

return
}
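
GetByPrefixGrouped is essentially an S3-style delimiter listing done in one query: SUBSTRING_INDEX truncates each Path at the first separator after the prefix, rows whose truncated path equals the full path are direct objects, and the remaining distinct truncations become common prefixes with a trailing separator. The in-memory sketch below only illustrates the intended result shape; it is not the database code and all names are invented.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// groupByPrefix mimics the result shape of GetByPrefixGrouped: paths with no
// further separator after the prefix are returned as objects, everything else
// collapses into common prefixes ending with the separator.
func groupByPrefix(paths []string, prefix, sep string) (objects, commonPrefixes []string) {
	seen := map[string]bool{}
	for _, p := range paths {
		if !strings.HasPrefix(p, prefix) {
			continue
		}
		rest := strings.TrimPrefix(p, prefix)
		if i := strings.Index(rest, sep); i >= 0 {
			cp := prefix + rest[:i+1] // keep the trailing separator, like the SQL version
			if !seen[cp] {
				seen[cp] = true
				commonPrefixes = append(commonPrefixes, cp)
			}
		} else {
			objects = append(objects, p)
		}
	}
	sort.Strings(commonPrefixes)
	return
}

func main() {
	paths := []string{"a/x.txt", "a/b/y.txt", "a/b/z.txt", "a/c/w.txt", "d/e.txt"}
	objs, dirs := groupByPrefix(paths, "a/", "/")
	fmt.Println(objs) // [a/x.txt]
	fmt.Println(dirs) // [a/b/ a/c/]
}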

func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) {
type ObjectOrDir struct {
cdssdk.Object

