From 3694ed824ff36998564302b37d0572d6defcdb09 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 24 Mar 2025 17:20:07 +0800 Subject: [PATCH] =?UTF-8?q?cache=E5=A2=9E=E5=8A=A0=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client2/internal/mount/fuse/fuse.go | 3 ++ client2/internal/mount/fuse/types.go | 1 + client2/internal/mount/vfs/cache/cache.go | 24 +++++++++++++++ client2/internal/mount/vfs/cache/file.go | 34 ++++++++++++---------- client2/internal/mount/vfs/cache/remote.go | 1 + client2/internal/mount/vfs/fuse_bucket.go | 4 ++- client2/internal/mount/vfs/fuse_dir.go | 17 ++++++----- client2/internal/mount/vfs/fuse_package.go | 4 ++- 8 files changed, 63 insertions(+), 25 deletions(-) diff --git a/client2/internal/mount/fuse/fuse.go b/client2/internal/mount/fuse/fuse.go index 1d45b1e..78431b6 100644 --- a/client2/internal/mount/fuse/fuse.go +++ b/client2/internal/mount/fuse/fuse.go @@ -44,6 +44,9 @@ func translateError(err error) syscall.Errno { case ErrNotEmpty: return syscall.ENOTEMPTY + case ErrIOError: + return syscall.EIO + default: return syscall.EIO } diff --git a/client2/internal/mount/fuse/types.go b/client2/internal/mount/fuse/types.go index 58f803c..c579462 100644 --- a/client2/internal/mount/fuse/types.go +++ b/client2/internal/mount/fuse/types.go @@ -12,6 +12,7 @@ var ( ErrExists = os.ErrExist ErrNotEmpty = errors.New("directory not empty") ErrPermission = os.ErrPermission + ErrIOError = errors.New("I/O error") ) type Fs interface { diff --git a/client2/internal/mount/vfs/cache/cache.go b/client2/internal/mount/vfs/cache/cache.go index 0a916a5..c4787e8 100644 --- a/client2/internal/mount/vfs/cache/cache.go +++ b/client2/internal/mount/vfs/cache/cache.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "syscall" "time" @@ -39,6 +40,7 @@ type Cache struct { cacheDataDir string cacheMetaDir string activeCache *trie.Trie[*CacheFile] + lock *sync.RWMutex } func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache { @@ -48,6 +50,7 @@ func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cache cacheDataDir: cacheDataDir, cacheMetaDir: cacheMetaDir, activeCache: trie.NewTrie[*CacheFile](), + lock: &sync.RWMutex{}, } } @@ -81,6 +84,9 @@ func (c *Cache) GetCacheMetaPathComps(comps ...string) []string { // 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。 func (c *Cache) Stat(pathComps []string) *CacheEntryInfo { + c.lock.RLock() + defer c.lock.RUnlock() + node, ok := c.activeCache.WalkEnd(pathComps) if ok && node.Value != nil { info := node.Value.Info() @@ -113,6 +119,9 @@ func (c *Cache) Stat(pathComps []string) *CacheEntryInfo { // 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。 func (c *Cache) CreateFile(pathComps []string) *CacheFile { + c.lock.Lock() + defer c.lock.Unlock() + ch, err := createNewCacheFile(c, pathComps) if err != nil { // TODO 日志记录 @@ -125,6 +134,9 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile { // 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。 func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { + c.lock.Lock() + defer c.lock.Unlock() + node, ok := c.activeCache.WalkEnd(pathComps) if ok && node.Value != nil { return node.Value @@ -160,6 +172,9 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { // 创建一个缓存目录。如果目录已经存在,则会重置目录属性。如果加载过程中发生了错误,或者目标位置是一个文件,则会返回nil func (c *Cache) CreateDir(pathComps []string) *CacheDir { + c.lock.Lock() + defer c.lock.Unlock() + ch, err := createNewCacheDir(c, pathComps) if err != nil { // TODO 日志记录 @@ -174,6 +189,9 @@ type CreateDirOption struct { // 加载指定缓存目录,如果目录不存在,则使用createOpt选项创建目录,而如果createOpt为nil,那么会返回nil。 func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir { + c.lock.Lock() + defer c.lock.Unlock() + ch, err := loadCacheDir(c, pathComps) if err == nil { return ch @@ -199,6 +217,9 @@ func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDi // 加载指定路径下的所有缓存条目信息 func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo { + c.lock.RLock() + defer c.lock.RUnlock() + var infos []CacheEntryInfo exists := make(map[string]bool) @@ -245,6 +266,9 @@ func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo { // 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。 func (c *Cache) Remove(pathComps []string) error { + c.lock.Lock() + defer c.lock.Unlock() + node, ok := c.activeCache.WalkEnd(pathComps) if ok { if len(node.WordNexts) > 0 { diff --git a/client2/internal/mount/vfs/cache/file.go b/client2/internal/mount/vfs/cache/file.go index 3896494..953c44d 100644 --- a/client2/internal/mount/vfs/cache/file.go +++ b/client2/internal/mount/vfs/cache/file.go @@ -85,7 +85,7 @@ type CacheFile struct { rwLock *sync.RWMutex readers []*CacheFileReadWriter writers []*CacheFileReadWriter - backgroundChan chan any + savingMetaChan chan any isDeleted bool localFile *os.File @@ -123,12 +123,12 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { name: pathComps[len(pathComps)-1], info: info, rwLock: &sync.RWMutex{}, - backgroundChan: make(chan any, 1), + savingMetaChan: make(chan any, 1), localFile: localFile, writeLock: &sync.RWMutex{}, } - go ch.background() + go ch.savingMeta() return ch, nil } @@ -159,12 +159,12 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { name: pathComps[len(pathComps)-1], info: *info, rwLock: &sync.RWMutex{}, - backgroundChan: make(chan any, 1), + savingMetaChan: make(chan any, 1), localFile: localFile, writeLock: &sync.RWMutex{}, } - go ch.background() + go ch.savingMeta() return ch, nil } @@ -202,12 +202,12 @@ func makeCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Objec info: info, remoteObj: obj, rwLock: &sync.RWMutex{}, - backgroundChan: make(chan any, 1), + savingMetaChan: make(chan any, 1), localFile: localFile, writeLock: &sync.RWMutex{}, } - go ch.background() + go ch.savingMeta() return ch, nil } @@ -281,6 +281,8 @@ func (f *CacheFile) Delete() { os.Remove(metaPath) os.Remove(dataPath) f.isDeleted = true + + f.letSave() } func (f *CacheFile) SetRemoteObject(obj *cdssdk.Object) { @@ -329,7 +331,7 @@ func (f *CacheFile) SetModTime(modTime time.Time) error { f.infoRev++ f.rwLock.Unlock() - f.notifyBackground() + f.letSave() return nil } @@ -363,7 +365,7 @@ func (f *CacheFile) Truncate(size int64) error { f.info.Size = size f.infoRev++ - f.notifyBackground() + f.letSave() return nil } @@ -372,14 +374,14 @@ func (f *CacheFile) Truncate(size int64) error { // } -func (f *CacheFile) background() { +func (f *CacheFile) savingMeta() { savedInfoRev := int64(0) ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() for { select { - case _, ok := <-f.backgroundChan: + case _, ok := <-f.savingMetaChan: if !ok { return } @@ -392,7 +394,7 @@ func (f *CacheFile) background() { // 如果文件已被删除,则不能再保存元数据,防止覆盖掉新创建的同名文件 if f.isDeleted { f.rwLock.Unlock() - continue + break } for { @@ -420,9 +422,9 @@ func (f *CacheFile) background() { } } -func (f *CacheFile) notifyBackground() { +func (f *CacheFile) letSave() { select { - case f.backgroundChan <- nil: + case f.savingMetaChan <- nil: default: } } @@ -528,7 +530,7 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { h.file.rwLock.Lock() h.file.info.Segments = AddRange(h.file.info.Segments, loadRng) h.file.infoRev++ - h.file.notifyBackground() + h.file.letSave() h.file.rwLock.Unlock() } @@ -561,7 +563,7 @@ func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) { h.file.info.Dirty = true h.file.infoRev++ - h.file.notifyBackground() + h.file.letSave() return writeLen, nil } diff --git a/client2/internal/mount/vfs/cache/remote.go b/client2/internal/mount/vfs/cache/remote.go index 50d1780..27828c3 100644 --- a/client2/internal/mount/vfs/cache/remote.go +++ b/client2/internal/mount/vfs/cache/remote.go @@ -32,6 +32,7 @@ func (r *RemoteLoader) Load(p []byte, pos int64) (n int, err error) { continue } + // 找到一个position刚好等于off的loader if loader.pos == pos { loader.lastUsedTime = time.Now() n, err = io.ReadFull(loader.reader, p) diff --git a/client2/internal/mount/vfs/fuse_bucket.go b/client2/internal/mount/vfs/fuse_bucket.go index 1856eba..9650458 100644 --- a/client2/internal/mount/vfs/fuse_bucket.go +++ b/client2/internal/mount/vfs/fuse_bucket.go @@ -17,6 +17,7 @@ type FuseBucket struct { vfs *Vfs bktName string modTime time.Time + mode os.FileMode } func newBucketFromCache(c cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir { @@ -24,6 +25,7 @@ func newBucketFromCache(c cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir { vfs: vfs, bktName: c.PathComps[len(c.PathComps)-1], modTime: c.ModTime, + mode: c.Mode, } } @@ -40,7 +42,7 @@ func (r *FuseBucket) Size() int64 { } func (r *FuseBucket) Mode() os.FileMode { - return os.ModeDir | 0755 + return os.ModeDir | r.mode } func (r *FuseBucket) ModTime() time.Time { diff --git a/client2/internal/mount/vfs/fuse_dir.go b/client2/internal/mount/vfs/fuse_dir.go index 29aee33..f61ab80 100644 --- a/client2/internal/mount/vfs/fuse_dir.go +++ b/client2/internal/mount/vfs/fuse_dir.go @@ -19,6 +19,7 @@ type FuseDir struct { vfs *Vfs pathComps []string modTime time.Time + mode os.FileMode } func newDirFromCache(ch cache.CacheEntryInfo, vfs *Vfs) *FuseDir { @@ -26,6 +27,7 @@ func newDirFromCache(ch cache.CacheEntryInfo, vfs *Vfs) *FuseDir { vfs: vfs, pathComps: ch.PathComps, modTime: ch.ModTime, + mode: ch.Mode, } } @@ -42,7 +44,7 @@ func (r *FuseDir) Size() int64 { } func (r *FuseDir) Mode() os.FileMode { - return os.ModeDir | 0755 + return os.ModeDir | r.mode } func (r *FuseDir) ModTime() time.Time { @@ -64,7 +66,7 @@ func (r *FuseDir) SetModTime(time time.Time) error { // 如果不存在,应该返回ErrNotExists func (r *FuseDir) Child(ctx context.Context, name string) (fuse.FsEntry, error) { - childPathComps := append(lo2.ArrayClone(r.pathComps), name) + childPathComps := lo2.AppendNew(r.pathComps, name) ca := r.vfs.cache.Stat(childPathComps) if ca == nil { var ret fuse.FsEntry @@ -144,6 +146,7 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { db := r.vfs.db db.DoTx(func(tx db2.SQLContext) error { + // TODO UserID pkg, err := db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1]) if err != nil { return err @@ -163,7 +166,7 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { for _, dir := range coms { dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator) - pathComps := append(lo2.ArrayClone(r.pathComps), cdssdk.BaseName(dir)) + pathComps := lo2.AppendNew(r.pathComps, cdssdk.BaseName(dir)) cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{ ModTime: time.Now(), @@ -176,7 +179,7 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { } for _, obj := range objs { - pathComps := append(lo2.ArrayClone(r.pathComps), cdssdk.BaseName(obj.Path)) + pathComps := lo2.AppendNew(r.pathComps, cdssdk.BaseName(obj.Path)) file := newFileFromObject(r.vfs, pathComps, obj) dbEntries[file.Name()] = file } @@ -202,7 +205,7 @@ func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { } func (r *FuseDir) NewDir(ctx context.Context, name string) (fuse.FsDir, error) { - cache := r.vfs.cache.CreateDir(append(lo2.ArrayClone(r.pathComps), name)) + cache := r.vfs.cache.CreateDir(lo2.AppendNew(r.pathComps, name)) if cache == nil { return nil, fuse.ErrPermission } @@ -211,7 +214,7 @@ func (r *FuseDir) NewDir(ctx context.Context, name string) (fuse.FsDir, error) { } func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) { - cache := r.vfs.cache.CreateFile(append(lo2.ArrayClone(r.pathComps), name)) + cache := r.vfs.cache.CreateFile(lo2.AppendNew(r.pathComps, name)) if cache == nil { return nil, 0, fuse.ErrPermission } @@ -224,7 +227,7 @@ func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse. } func (r *FuseDir) RemoveChild(ctx context.Context, name string) error { - pathComps := append(lo2.ArrayClone(r.pathComps), name) + pathComps := lo2.AppendNew(r.pathComps, name) joinedPath := cdssdk.JoinObjectPath(pathComps[2:]...) d := r.vfs.db diff --git a/client2/internal/mount/vfs/fuse_package.go b/client2/internal/mount/vfs/fuse_package.go index 192c94e..e45a069 100644 --- a/client2/internal/mount/vfs/fuse_package.go +++ b/client2/internal/mount/vfs/fuse_package.go @@ -19,6 +19,7 @@ type FusePackage struct { bktName string pkgName string modTime time.Time + mode os.FileMode } func newPackageFromCache(cache cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir { @@ -28,6 +29,7 @@ func newPackageFromCache(cache cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir { bktName: pathComps[0], pkgName: pathComps[1], modTime: cache.ModTime, + mode: cache.Mode, } } @@ -44,7 +46,7 @@ func (r *FusePackage) Size() int64 { } func (r *FusePackage) Mode() os.FileMode { - return os.ModeDir | 0755 + return os.ModeDir | r.mode } func (r *FusePackage) ModTime() time.Time {