Browse Source

挂载功能以data目录里的文件结构为准

gitlink
Sydonian 7 months ago
parent
commit
f2f96f0d77
11 changed files with 471 additions and 605 deletions
  1. +3
    -11
      client/internal/db/object.go
  2. +4
    -10
      client/internal/db/package.go
  3. +49
    -16
      client/internal/mount/vfs/cache/cache.go
  4. +8
    -17
      client/internal/mount/vfs/cache/dir.go
  5. +68
    -19
      client/internal/mount/vfs/cache/file.go
  6. +286
    -0
      client/internal/mount/vfs/fuse.go
  7. +17
    -29
      client/internal/mount/vfs/fuse_bucket.go
  8. +17
    -243
      client/internal/mount/vfs/fuse_dir.go
  9. +8
    -232
      client/internal/mount/vfs/fuse_package.go
  10. +10
    -27
      client/internal/mount/vfs/fuse_root.go
  11. +1
    -1
      client/internal/services/package.go

+ 3
- 11
client/internal/db/object.go View File

@@ -132,18 +132,10 @@ func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID types.Pack
return
}

func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID types.PackageID, pathPrefix string) (bool, error) {
// 返回gorm.ErrRecordNotFound表示没有对象,nil表示有对象
func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID types.PackageID, pathPrefix string) error {
var obj types.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).First(&obj).Error
if err == nil {
return true, nil
}

if err == gorm.ErrRecordNotFound {
return false, nil
}

return false, err
return ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).First(&obj).Error
}

func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []types.ObjectID) (map[types.ObjectID]bool, error) {


+ 4
- 10
client/internal/db/package.go View File

@@ -76,7 +76,7 @@ func (db *PackageDB) GetBucketPackagesByName(ctx SQLContext, bucketName string)
}

// 在指定名称的Bucket中查找指定名称的Package
func (*PackageDB) GetUserPackageByName(ctx SQLContext, bucketName string, packageName string) (types.Package, error) {
func (*PackageDB) GetByFullName(ctx SQLContext, bucketName string, packageName string) (types.Package, error) {
var ret types.Package
err := ctx.Table("Package").
Select("Package.*").
@@ -147,16 +147,10 @@ func (*PackageDB) ChangeState(ctx SQLContext, packageID types.PackageID, state s
return err
}

func (*PackageDB) HasPackageIn(ctx SQLContext, bucketID types.BucketID) (bool, error) {
// 返回ErrRecordNotFound表示没有找到指定名称的Bucket,nil表示找到了
func (*PackageDB) HasPackageIn(ctx SQLContext, bucketID types.BucketID) error {
var pkg types.Package
err := ctx.Table("Package").Where("BucketID = ?", bucketID).First(&pkg).Error
if err == gorm.ErrRecordNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
return ctx.Table("Package").Where("BucketID = ?", bucketID).First(&pkg).Error
}

func (*PackageDB) Move(ctx SQLContext, packageID types.PackageID, newBktID types.BucketID, newName string) error {


+ 49
- 16
client/internal/mount/vfs/cache/cache.go View File

@@ -110,15 +110,16 @@ func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
return &info
}

metaPath := c.GetCacheMetaPath(pathComps...)
stat, err := os.Stat(metaPath)
dataPath := c.GetCacheDataPath(pathComps...)
stat, err := os.Stat(dataPath)
if err != nil {
// logger.Warnf("")
// TODO 日志记录
return nil
}

if stat.IsDir() {
info, err := loadCacheDirInfo(c, pathComps)
info, err := loadCacheDirInfo(c, pathComps, stat)
if err != nil {
return nil
}
@@ -126,7 +127,7 @@ func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
return info
}

info, err := loadCacheFileInfo(c, pathComps)
info, err := loadCacheFileInfo(c, pathComps, stat)
if err != nil {
return nil
}
@@ -275,7 +276,7 @@ func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
}
}

osEns, err := os.ReadDir(c.GetCacheMetaPath(pathComps...))
osEns, err := os.ReadDir(c.GetCacheDataPath(pathComps...))
if err != nil {
return nil
}
@@ -284,16 +285,20 @@ func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
if exists[e.Name()] {
continue
}
info, err := e.Info()
if err != nil {
continue
}

if e.IsDir() {
info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()))
info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
if err != nil {
continue
}

infos = append(infos, *info)
} else {
info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()))
info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
if err != nil {
continue
}
@@ -330,7 +335,10 @@ func (c *Cache) Remove(pathComps []string) error {
}

metaPath := c.GetCacheMetaPath(pathComps...)
err := os.Remove(metaPath)
dataPath := c.GetCacheDataPath(pathComps...)

os.Remove(metaPath)
err := os.Remove(dataPath)
if err == nil || os.IsNotExist(err) {
logger.Debugf("local cache %v removed", pathComps)
return nil
@@ -358,20 +366,45 @@ func (c *Cache) Move(pathComps []string, newPathComps []string) error {
newMetaPath := c.GetCacheMetaPath(newPathComps...)
newDataPath := c.GetCacheDataPath(newPathComps...)

_, err := os.Stat(newMetaPath)
_, err := os.Stat(newDataPath)
if err == nil {
return fuse.ErrExists
} else if !os.IsNotExist(err) {
}

if !os.IsNotExist(err) {
return err
}

metaPath := c.GetCacheMetaPath(pathComps...)
dataPath := c.GetCacheDataPath(pathComps...)
oldMetaPath := c.GetCacheMetaPath(pathComps...)
oldDataPath := c.GetCacheDataPath(pathComps...)

// 确定源文件存在,再进行后面的操作
_, err = os.Stat(oldDataPath)
if err != nil {
if os.IsNotExist(err) {
return fuse.ErrNotExists
}
return err
}

// 创建父目录是为了解决被移动的文件不在本地的问题。
// 但同时也导致了如果目的路径的父目录确实不存在,这里会意外的创建了这个目录
newMetaDir := filepath.Dir(newMetaPath)
err = os.MkdirAll(newMetaDir, 0755)
if err != nil {
return err
}

newDataDir := filepath.Dir(newDataPath)
err = os.MkdirAll(newDataDir, 0755)
if err != nil {
return err
}

// 每个缓存文件持有meta文件和data文件的句柄,所以这里移动文件,不影响句柄的使用。
// 只能忽略这里的错误
os.Rename(metaPath, newMetaPath)
os.Rename(dataPath, newDataPath)
os.Rename(oldMetaPath, newMetaPath)
os.Rename(oldDataPath, newDataPath)

// 更新缓存
oldNode, ok := c.activeCache.WalkEnd(pathComps)
@@ -521,9 +554,9 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
var sucPkgs []*uploadingPackage
var failedPkgs []*uploadingPackage
for _, pkg := range pkgs {
p, err := c.db.Package().GetUserPackageByName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
p, err := c.db.Package().GetByFullName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
if err != nil {
logger.Warnf("get user package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
logger.Warnf("get package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
failedPkgs = append(failedPkgs, pkg)
continue
}


+ 8
- 17
client/internal/mount/vfs/cache/dir.go View File

@@ -30,7 +30,7 @@ func createNewCacheDir(c *Cache, pathComps []string) (*CacheDir, error) {

// 修改文件夹的修改时间
modTime := time.Now()
os.Chtimes(metaPath, modTime, modTime)
os.Chtimes(dataPath, modTime, modTime)

return &CacheDir{
cache: c,
@@ -42,7 +42,7 @@ func createNewCacheDir(c *Cache, pathComps []string) (*CacheDir, error) {
}

func loadCacheDir(c *Cache, pathComps []string) (*CacheDir, error) {
stat, err := os.Stat(c.GetCacheMetaPath(pathComps...))
stat, err := os.Stat(c.GetCacheDataPath(pathComps...))
if err != nil {
return nil, err
}
@@ -73,7 +73,7 @@ func makeCacheDirFromOption(c *Cache, pathComps []string, opt CreateDirOption) (
return nil, err
}

os.Chtimes(metaPath, opt.ModTime, opt.ModTime)
os.Chtimes(dataPath, opt.ModTime, opt.ModTime)
return &CacheDir{
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
@@ -82,21 +82,12 @@ func makeCacheDirFromOption(c *Cache, pathComps []string, opt CreateDirOption) (
}, nil
}

func loadCacheDirInfo(c *Cache, pathComps []string) (*CacheEntryInfo, error) {
stat, err := os.Stat(c.GetCacheMetaPath(pathComps...))
if err != nil {
return nil, err
}

if !stat.IsDir() {
return nil, fmt.Errorf("not a directory")
}

func loadCacheDirInfo(c *Cache, pathComps []string, dataFileInfo os.FileInfo) (*CacheEntryInfo, error) {
return &CacheEntryInfo{
PathComps: pathComps,
Size: 0,
Mode: stat.Mode(),
ModTime: stat.ModTime(),
Mode: dataFileInfo.Mode(),
ModTime: dataFileInfo.ModTime(),
IsDir: true,
}, nil
}
@@ -136,6 +127,6 @@ func (f *CacheDir) Info() CacheEntryInfo {
}

func (f *CacheDir) SetModTime(modTime time.Time) error {
metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
return os.Chtimes(metaPath, modTime, modTime)
dataPath := f.cache.GetCacheDataPath(f.pathComps...)
return os.Chtimes(dataPath, modTime, modTime)
}

+ 68
- 19
client/internal/mount/vfs/cache/file.go View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"

@@ -120,6 +121,11 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
return nil, err
}

err = os.MkdirAll(filepath.Dir(metaPath), 0755)
if err != nil {
return nil, err
}

metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
return nil, fmt.Errorf("create cache meta file: %w", err)
@@ -160,24 +166,52 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
metaPath := cache.GetCacheMetaPath(pathComps...)
dataPath := cache.GetCacheDataPath(pathComps...)

metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
if err != nil {
// 不要包装这里的err
return nil, err
}

info := &FileInfo{}
err = serder.JSONToObjectStream(metaFile, info)
if err != nil {
metaFile.Close()
return nil, err
}

dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
if err != nil {
metaFile.Close()
// 不要包装这里的err
return nil, err
// 如果有数据文件,而没有元数据文件,则创建一个元数据文件
if !os.IsNotExist(err) {
dataFile.Close()
return nil, err
}

stat, err := dataFile.Stat()
if err != nil {
dataFile.Close()
return nil, err
}

err = os.MkdirAll(filepath.Dir(metaPath), 0755)
if err != nil {
dataFile.Close()
return nil, err
}

metaFile, err = os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
dataFile.Close()
return nil, err
}

info.Size = stat.Size()
info.ModTime = stat.ModTime()
info.Perm = stat.Mode().Perm()
info.Segments = []*Range{{Position: 0, Length: info.Size}}
info.Revision = 1 // 未同步的文件视为已修改

} else {
err = serder.JSONToObjectStream(metaFile, info)
if err != nil {
dataFile.Close()
return nil, err
}
}

ch := &CacheFile{
@@ -215,6 +249,11 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *clitypes.Obje
return nil, err
}

err = os.MkdirAll(filepath.Dir(metaPath), 0755)
if err != nil {
return nil, err
}

metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
return nil, fmt.Errorf("create cache meta file: %w", err)
@@ -252,25 +291,35 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *clitypes.Obje
return ch, nil
}

func loadCacheFileInfo(cache *Cache, pathComps []string) (*CacheEntryInfo, error) {
func loadCacheFileInfo(cache *Cache, pathComps []string, dataFileInfo os.FileInfo) (*CacheEntryInfo, error) {
metaPath := cache.GetCacheMetaPath(pathComps...)

metaData, err := os.ReadFile(metaPath)
if err != nil {
return nil, err
if err == nil {
info := &FileInfo{}
err = serder.JSONToObject(metaData, info)
if err != nil {
return nil, err
}

return &CacheEntryInfo{
PathComps: pathComps,
Size: info.Size,
Mode: info.Perm,
ModTime: info.ModTime,
IsDir: false,
}, nil
}

info := &FileInfo{}
err = serder.JSONToObject(metaData, info)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}

return &CacheEntryInfo{
PathComps: pathComps,
Size: info.Size,
Mode: info.Perm,
ModTime: info.ModTime,
Size: dataFileInfo.Size(),
Mode: dataFileInfo.Mode(),
ModTime: dataFileInfo.ModTime(),
IsDir: false,
}, nil
}


+ 286
- 0
client/internal/mount/vfs/fuse.go View File

@@ -1,5 +1,291 @@
package vfs

import (
"context"
"strings"
"time"

"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
"gorm.io/gorm"
)

type FuseNode interface {
PathComps() []string
}

func child(vfs *Vfs, ctx context.Context, parent FuseNode, name string) (fuse.FsEntry, error) {
parentPathComps := parent.PathComps()

childPathComps := lo2.AppendNew(parentPathComps, name)
ca := vfs.cache.Stat(childPathComps)
if ca == nil {
var ret fuse.FsEntry

d := vfs.db
err := d.DoTx(func(tx db.SQLContext) error {
pkg, err := d.Package().GetByFullName(tx, childPathComps[0], childPathComps[1])
if err != nil {
if err != gorm.ErrRecordNotFound {
return err
}
return nil
}

objPath := clitypes.JoinObjectPath(childPathComps[2:]...)
obj, err := d.Object().GetByPath(tx, pkg.PackageID, objPath)
if err == nil {
ret = newFileFromObject(vfs, childPathComps, obj)
return nil
}
if err != gorm.ErrRecordNotFound {
return err
}

err = d.Object().HasObjectWithPrefix(tx, pkg.PackageID, objPath+clitypes.ObjectPathSeparator)
if err == nil {
dir := vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if dir == nil {
return nil
}

ret = newDirFromCache(dir.Info(), vfs)
return nil
}

if err == gorm.ErrRecordNotFound {
return nil
}

return err
})
if err != nil {
return nil, err
}

if ret == nil {
return nil, fuse.ErrNotExists
}

return ret, nil
}

if ca.IsDir {
return newDirFromCache(*ca, vfs), nil
}

return newFileFromCache(*ca, vfs), nil
}

func listChildren(vfs *Vfs, ctx context.Context, parent FuseNode) ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

myPathComps := parent.PathComps()

infos := vfs.cache.StatMany(myPathComps)

dbEntries := make(map[string]fuse.FsEntry)

d := vfs.db
d.DoTx(func(tx db.SQLContext) error {
pkg, err := d.Package().GetByFullName(tx, myPathComps[0], myPathComps[1])
if err != nil {
return err
}

objPath := clitypes.JoinObjectPath(myPathComps[2:]...)

objs, coms, err := d.Object().GetByPrefixGrouped(tx, pkg.PackageID, objPath+clitypes.ObjectPathSeparator)
if err != nil {
return err
}

for _, dir := range coms {
dir = strings.TrimSuffix(dir, clitypes.ObjectPathSeparator)
pathComps := lo2.AppendNew(myPathComps, clitypes.BaseName(dir))

cd := vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if cd == nil {
continue
}

dbEntries[dir] = newDirFromCache(cd.Info(), vfs)
}

for _, obj := range objs {
pathComps := lo2.AppendNew(myPathComps, clitypes.BaseName(obj.Path))
file := newFileFromObject(vfs, pathComps, obj)
dbEntries[file.Name()] = file
}

return nil
})

for _, c := range infos {
delete(dbEntries, c.PathComps[len(c.PathComps)-1])

if c.IsDir {
ens = append(ens, newDirFromCache(c, vfs))
} else {
ens = append(ens, newFileFromCache(c, vfs))
}
}

for _, e := range dbEntries {
ens = append(ens, e)
}

return ens, nil
}

func newDir(vfs *Vfs, ctx context.Context, name string, parent FuseNode) (fuse.FsDir, error) {
cache := vfs.cache.CreateDir(lo2.AppendNew(parent.PathComps(), name))
if cache == nil {
return nil, fuse.ErrPermission
}

return newDirFromCache(cache.Info(), vfs), nil
}

func newFile(vfs *Vfs, ctx context.Context, name string, parent FuseNode, flags uint32) (fuse.FileHandle, uint32, error) {
cache := vfs.cache.CreateFile(lo2.AppendNew(parent.PathComps(), name))
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), vfs)
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
}

func removeChild(vfs *Vfs, ctx context.Context, name string, parent FuseNode) error {
pathComps := lo2.AppendNew(parent.PathComps(), name)
joinedPath := clitypes.JoinObjectPath(pathComps[2:]...)
d := vfs.db

// TODO 生成系统事件
return vfs.db.DoTx(func(tx db.SQLContext) error {
pkg, err := d.Package().GetByFullName(tx, pathComps[0], pathComps[1])
if err == nil {
err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, joinedPath+clitypes.ObjectPathSeparator)
if err == nil {
return fuse.ErrNotEmpty
}
if err != gorm.ErrRecordNotFound {
return err
}

// 存储系统不会保存目录结构,所以这里是尝试删除同名文件
err = d.Object().DeleteByPath(tx, pkg.PackageID, joinedPath)
if err != nil {
return err
}

} else if err != gorm.ErrRecordNotFound {
return err
}

return vfs.cache.Remove(pathComps)
})
}

func moveChild(vfs *Vfs, ctx context.Context, oldName string, oldParent FuseNode, newName string, newParent FuseNode) error {
newParentPath := newParent.PathComps()
newChildPath := lo2.AppendNew(newParentPath, newName)
newChildPathJoined := clitypes.JoinObjectPath(newChildPath[2:]...)

// 不允许移动任何内容到Package层级以上
if len(newParentPath) < 2 {
return fuse.ErrNotSupported
}

oldChildPath := lo2.AppendNew(oldParent.PathComps(), oldName)
oldChildPathJoined := clitypes.JoinObjectPath(oldChildPath[2:]...)

// 先更新远程,再更新本地,因为远程使用事务更新,可以回滚,而本地不行
return vfs.db.DoTx(func(tx db.SQLContext) error {
err := moveRemote(vfs, tx, oldChildPath, newParentPath, oldChildPathJoined, newChildPathJoined)
if err == fuse.ErrExists {
return err
}
if err != nil && err != fuse.ErrNotExists {
return err
}

err2 := vfs.cache.Move(oldChildPath, newChildPath)
if err == fuse.ErrNotExists && err2 == fuse.ErrNotExists {
return fuse.ErrNotExists
}

return err2
})
}

func moveRemote(vfs *Vfs, tx db.SQLContext, oldChildPath []string, newParentPath []string, oldChildPathJoined string, newChildPathJoined string) error {
d := vfs.db
newPkg, err := d.Package().GetByFullName(tx, newParentPath[0], newParentPath[1])
if err != nil {
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}
return err
}

// 检查目的文件或文件夹是否已经存在
_, err = d.Object().GetByPath(tx, newPkg.PackageID, newChildPathJoined)
if err == nil {
return fuse.ErrExists
}

err = d.Object().HasObjectWithPrefix(tx, newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator)
if err == nil {
return fuse.ErrExists
}
if err != gorm.ErrRecordNotFound {
return err
}

// 按理来说还需要检查远程文件所在的文件夹是否存在,但对象存储是不存文件夹的,所以不检查,导致的后果就是移动时会创建不存在的文件夹

oldPkg, err := d.Package().GetByFullName(tx, oldChildPath[0], oldChildPath[1])
if err != nil {
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}
return err
}

// 都不存在,就开始移动文件
oldObj, err := d.Object().GetByPath(tx, oldPkg.PackageID, oldChildPathJoined)
if err == nil {
oldObj.PackageID = newPkg.PackageID
oldObj.Path = newChildPathJoined

return d.Object().BatchUpdate(tx, []clitypes.Object{oldObj})
}
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}

err = d.Object().HasObjectWithPrefix(tx, oldObj.PackageID, oldChildPathJoined+clitypes.ObjectPathSeparator)
if err == nil {
return d.Object().MoveByPrefix(tx,
oldPkg.PackageID, oldChildPathJoined+clitypes.ObjectPathSeparator,
newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator,
)
}
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}
return err
}

+ 17
- 29
client/internal/mount/vfs/fuse_bucket.go View File

@@ -70,7 +70,7 @@ func (r *FuseBucket) Child(ctx context.Context, name string) (fuse.FsEntry, erro
if ca == nil {
// TODO UserID

pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), r.bktName, name)
pkg, err := r.vfs.db.Package().GetByFullName(r.vfs.db.DefCtx(), r.bktName, name)
if err == nil {
dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: pkg.CreateTime,
@@ -135,6 +135,7 @@ func (r *FuseBucket) listChildren() ([]fuse.FsEntry, error) {
}
}

// 顺便创建一下文件夹
for _, pkg := range pkgMap {
dir := r.vfs.cache.LoadDir([]string{r.bktName, pkg.Name}, &cache.CreateDirOption{
ModTime: pkg.CreateTime,
@@ -177,17 +178,7 @@ func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error
}

func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile([]string{r.bktName, name})
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
return newFile(r.vfs, ctx, name, r, flags)
}

func (r *FuseBucket) RemoveChild(ctx context.Context, name string) error {
@@ -195,29 +186,26 @@ func (r *FuseBucket) RemoveChild(ctx context.Context, name string) error {
return r.vfs.db.DoTx(func(tx db.SQLContext) error {
d := r.vfs.db

pkg, err := d.Package().GetUserPackageByName(tx, r.bktName, name)
pkg, err := d.Package().GetByFullName(tx, r.bktName, name)
if err == nil {
has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, "")
if err != nil {
err = d.Object().HasObjectWithPrefix(tx, pkg.PackageID, "")
if err == nil {
return fuse.ErrNotEmpty
}
if err != gorm.ErrRecordNotFound {
return err
}
if has {
return fuse.ErrNotEmpty

err = d.Package().DeleteComplete(tx, pkg.PackageID)
if err != nil {
return err
}
} else if err != gorm.ErrRecordNotFound {
return err
}

err = r.vfs.cache.Remove([]string{r.bktName, name})
if err != nil {
} else if err != gorm.ErrRecordNotFound {
return err
}

if pkg.PackageID != 0 {
d.Package().DeleteComplete(tx, pkg.PackageID)
}

return nil
return r.vfs.cache.Remove([]string{r.bktName, name})
})
}

@@ -232,7 +220,7 @@ func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName stri

d := r.vfs.db
return d.DoTx(func(tx db.SQLContext) error {
_, err := d.Package().GetUserPackageByName(tx, newParentPath[0], newName)
_, err := d.Package().GetByFullName(tx, newParentPath[0], newName)
if err == nil {
// 目标节点已经存在,不能重命名,直接退出
return fuse.ErrExists
@@ -242,7 +230,7 @@ func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName stri

newBkt, err := d.Bucket().GetByName(tx, newParentPath[0])
if err == nil {
oldPkg, err := d.Package().GetUserPackageByName(tx, r.bktName, oldName)
oldPkg, err := d.Package().GetByFullName(tx, r.bktName, oldName)
if err == nil {
err = d.Package().Move(tx, oldPkg.PackageID, newBkt.BucketID, newName)
if err != nil {


+ 17
- 243
client/internal/mount/vfs/fuse_dir.go View File

@@ -3,10 +3,8 @@ package vfs
import (
"context"
"os"
"strings"
"time"

"gitlink.org.cn/cloudream/common/utils/lo2"
db2 "gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache"
@@ -65,70 +63,15 @@ func (r *FuseDir) SetModTime(time time.Time) error {

// 如果不存在,应该返回ErrNotExists
func (r *FuseDir) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
childPathComps := lo2.AppendNew(r.pathComps, name)
ca := r.vfs.cache.Stat(childPathComps)
if ca == nil {
var ret fuse.FsEntry

db := r.vfs.db
err := db.DoTx(func(tx db2.SQLContext) error {
pkg, err := db.Package().GetUserPackageByName(tx, r.pathComps[0], r.pathComps[1])
if err != nil {
return err
}

objPath := clitypes.JoinObjectPath(childPathComps[2:]...)
obj, err := db.Object().GetByPath(tx, pkg.PackageID, objPath)
if err == nil {
ret = newFileFromObject(r.vfs, childPathComps, obj)
return nil
}
if err != gorm.ErrRecordNotFound {
return err
}

has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, objPath+clitypes.ObjectPathSeparator)
if err != nil {
return err
}

if has {
dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if dir == nil {
return nil
}

ret = newDirFromCache(dir.Info(), r.vfs)
}

return nil
})
if err != nil {
return nil, err
}

if ret == nil {
return nil, fuse.ErrNotExists
}

return ret, nil
}

if ca.IsDir {
return newDirFromCache(*ca, r.vfs), nil
}

return newFileFromCache(*ca, r.vfs), nil
return child(r.vfs, ctx, r, name)
}

func (r *FuseDir) Children(ctx context.Context) ([]fuse.FsEntry, error) {
return r.listChildren()
return listChildren(r.vfs, ctx, r)
}

func (r *FuseDir) ReadChildren() (fuse.DirReader, error) {
ens, err := r.listChildren()
ens, err := listChildren(r.vfs, context.Background(), r)
if err != nil {
return nil, err
}
@@ -136,213 +79,44 @@ func (r *FuseDir) ReadChildren() (fuse.DirReader, error) {
return newFuseDirReader(ens), nil
}

func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

infos := r.vfs.cache.StatMany(r.pathComps)

dbEntries := make(map[string]fuse.FsEntry)

db := r.vfs.db
db.DoTx(func(tx db2.SQLContext) error {
// TODO UserID
pkg, err := db.Package().GetUserPackageByName(tx, r.pathComps[0], r.pathComps[1])
if err != nil {
return err
}

objPath := clitypes.JoinObjectPath(r.pathComps[2:]...)

objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, objPath+clitypes.ObjectPathSeparator)
if err != nil {
return err
}
for _, dir := range coms {
dir = strings.TrimSuffix(dir, clitypes.ObjectPathSeparator)
pathComps := lo2.AppendNew(r.pathComps, clitypes.BaseName(dir))

cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if cd == nil {
continue
}

dbEntries[dir] = newDirFromCache(cd.Info(), r.vfs)
}

for _, obj := range objs {
pathComps := lo2.AppendNew(r.pathComps, clitypes.BaseName(obj.Path))
file := newFileFromObject(r.vfs, pathComps, obj)
dbEntries[file.Name()] = file
}

return nil
})

for _, c := range infos {
delete(dbEntries, c.PathComps[len(c.PathComps)-1])

if c.IsDir {
ens = append(ens, newDirFromCache(c, r.vfs))
} else {
ens = append(ens, newFileFromCache(c, r.vfs))
}
}

for _, e := range dbEntries {
ens = append(ens, e)
}

return ens, nil
}

func (r *FuseDir) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.CreateDir(lo2.AppendNew(r.pathComps, name))
if cache == nil {
return nil, fuse.ErrPermission
}

return newDirFromCache(cache.Info(), r.vfs), nil
return newDir(r.vfs, ctx, name, r)
}

func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile(lo2.AppendNew(r.pathComps, name))
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
return newFile(r.vfs, ctx, name, r, flags)
}

func (r *FuseDir) RemoveChild(ctx context.Context, name string) error {
pathComps := lo2.AppendNew(r.pathComps, name)
joinedPath := clitypes.JoinObjectPath(pathComps[2:]...)
d := r.vfs.db

// TODO 生成系统事件
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {
pkg, err := d.Package().GetUserPackageByName(tx, pathComps[0], pathComps[1])
if err == nil {
has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, joinedPath+clitypes.ObjectPathSeparator)
if err != nil {
return err
}
if has {
return fuse.ErrNotEmpty
}
}

err = r.vfs.cache.Remove(pathComps)
if err != nil {
return err
}

if pkg.PackageID > 0 {
// 存储系统不会保存目录结构,所以这里是尝试删除同名文件
d.Object().DeleteByPath(tx, pkg.PackageID, joinedPath)
}

return nil
})
return removeChild(r.vfs, ctx, name, r)
}

func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {
newParentNode := newParent.(FuseNode)
newParentPath := newParentNode.PathComps()
newChildPath := lo2.AppendNew(newParentPath, newName)
newChildPathJoined := clitypes.JoinObjectPath(newChildPath[2:]...)

// 不允许移动任何内容到Package层级以上
if len(newParentPath) < 2 {
return fuse.ErrNotSupported
}

oldChildPath := lo2.AppendNew(r.PathComps(), oldName)
oldChildPathJoined := clitypes.JoinObjectPath(oldChildPath[2:]...)

// 先更新远程,再更新本地,因为远程使用事务更新,可以回滚,而本地不行
d := r.vfs.db
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {
newPkg, err := d.Package().GetUserPackageByName(tx, newParentPath[0], newParentPath[1])
if err != nil {
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}
return err
}

oldPkg, err := d.Package().GetUserPackageByName(tx, oldChildPath[0], oldChildPath[1])
if err != nil {
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}
return err
}

// 检查目的文件或文件夹是否已经存在
_, err = d.Object().GetByPath(tx, newPkg.PackageID, newChildPathJoined)
if err == nil {
return fuse.ErrExists
}

has, err := d.Object().HasObjectWithPrefix(tx, newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator)
if err != nil {
return err
}
if has {
return fuse.ErrExists
}

// 都不存在,就开始移动文件
oldObj, err := d.Object().GetByPath(tx, oldPkg.PackageID, oldChildPathJoined)
if err == nil {
oldObj.PackageID = newPkg.PackageID
oldObj.Path = newChildPathJoined

err = d.Object().BatchUpdate(tx, []clitypes.Object{oldObj})
if err != nil {
return err
}
}

err = d.Object().MoveByPrefix(tx,
oldPkg.PackageID, oldChildPathJoined+clitypes.ObjectPathSeparator,
newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator,
)
if err != nil {
return err
}

return r.vfs.cache.Move(oldChildPath, newChildPath)
})
return moveChild(r.vfs, ctx, oldName, r, newName, newParent.(FuseNode))
}

func (r *FuseDir) loadCacheDir() *cache.CacheDir {
var createOpt *cache.CreateDirOption

err := r.vfs.db.DoTx(func(tx db2.SQLContext) error {
pkg, err := r.vfs.db.Package().GetUserPackageByName(tx, r.pathComps[0], r.pathComps[1])
if err != nil {
return err
}

has, err := r.vfs.db.Object().HasObjectWithPrefix(tx, pkg.PackageID, clitypes.JoinObjectPath(r.pathComps[2:]...))
pkg, err := r.vfs.db.Package().GetByFullName(tx, r.pathComps[0], r.pathComps[1])
if err != nil {
return err
}

if has {
err = r.vfs.db.Object().HasObjectWithPrefix(tx, pkg.PackageID, clitypes.JoinObjectPath(r.pathComps[2:]...))
if err == nil {
createOpt = &cache.CreateDirOption{
ModTime: time.Now(),
}
return nil
}
return nil

if err == gorm.ErrRecordNotFound {
return nil
}

return err
})
if err != nil {
return nil


+ 8
- 232
client/internal/mount/vfs/fuse_package.go View File

@@ -3,15 +3,10 @@ package vfs
import (
"context"
"os"
"strings"
"time"

"gitlink.org.cn/cloudream/common/utils/lo2"
db2 "gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
"gorm.io/gorm"
)

type FusePackage struct {
@@ -68,70 +63,15 @@ func (r *FusePackage) SetModTime(time time.Time) error {

// 如果不存在,应该返回ErrNotExists
func (r *FusePackage) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
childPathComps := []string{r.bktName, r.pkgName, name}
ca := r.vfs.cache.Stat(childPathComps)

if ca == nil {
var ret fuse.FsEntry

db := r.vfs.db
err := db.DoTx(func(tx db2.SQLContext) error {
pkg, err := db.Package().GetUserPackageByName(tx, r.bktName, r.pkgName)
if err != nil {
return err
}

obj, err := db.Object().GetByPath(tx, pkg.PackageID, name)
if err == nil {
ret = newFileFromObject(r.vfs, childPathComps, obj)
return nil
}
if err != gorm.ErrRecordNotFound {
return err
}

has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+clitypes.ObjectPathSeparator)
if err != nil {
return err
}

if has {
dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if dir == nil {
return nil
}

ret = newDirFromCache(dir.Info(), r.vfs)
}

return nil
})
if err != nil {
return nil, err
}

if ret == nil {
return nil, fuse.ErrNotExists
}

return ret, nil
}

if ca.IsDir {
return newDirFromCache(*ca, r.vfs), nil
}

return newFileFromCache(*ca, r.vfs), nil
return child(r.vfs, ctx, r, name)
}

func (r *FusePackage) Children(ctx context.Context) ([]fuse.FsEntry, error) {
return r.listChildren()
return listChildren(r.vfs, ctx, r)
}

func (r *FusePackage) ReadChildren() (fuse.DirReader, error) {
ens, err := r.listChildren()
ens, err := listChildren(r.vfs, context.Background(), r)
if err != nil {
return nil, err
}
@@ -139,189 +79,25 @@ func (r *FusePackage) ReadChildren() (fuse.DirReader, error) {
return newFuseDirReader(ens), nil
}

func (r *FusePackage) listChildren() ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

infos := r.vfs.cache.StatMany([]string{r.bktName, r.pkgName})

dbEntries := make(map[string]fuse.FsEntry)

db := r.vfs.db
db.DoTx(func(tx db2.SQLContext) error {
pkg, err := db.Package().GetUserPackageByName(tx, r.bktName, r.pkgName)
if err != nil {
return err
}

objs, coms, err := db.Object().GetByPrefixGrouped(tx, pkg.PackageID, "")
if err != nil {
return err
}

for _, dir := range coms {
dir = strings.TrimSuffix(dir, clitypes.ObjectPathSeparator)
pathComps := []string{r.bktName, r.pkgName, dir}
cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if cd == nil {
continue
}

dbEntries[dir] = newDirFromCache(cd.Info(), r.vfs)
}

for _, obj := range objs {
file := newFileFromObject(r.vfs, []string{r.bktName, r.pkgName, obj.Path}, obj)
dbEntries[file.Name()] = file
}

return nil
})

for _, c := range infos {
delete(dbEntries, c.PathComps[len(c.PathComps)-1])

if c.IsDir {
ens = append(ens, newDirFromCache(c, r.vfs))
} else {
ens = append(ens, newFileFromCache(c, r.vfs))
}
}

for _, e := range dbEntries {
ens = append(ens, e)
}

return ens, nil
}

func (r *FusePackage) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.CreateDir([]string{r.bktName, r.pkgName, name})
if cache == nil {
return nil, fuse.ErrPermission
}

return newDirFromCache(cache.Info(), r.vfs), nil
return newDir(r.vfs, ctx, name, r)
}

func (r *FusePackage) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile([]string{r.bktName, r.pkgName, name})
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
return newFile(r.vfs, ctx, name, r, flags)
}

func (r *FusePackage) RemoveChild(ctx context.Context, name string) error {
// TODO 生成系统事件
d := r.vfs.db
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {
pkg, err := d.Package().GetUserPackageByName(tx, r.bktName, r.pkgName)
if err == nil {
has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+clitypes.ObjectPathSeparator)
if err != nil {
return err
}
if has {
return fuse.ErrNotEmpty
}
}

err = r.vfs.cache.Remove([]string{r.bktName, r.pkgName, name})
if err != nil {
return err
}

if pkg.PackageID > 0 {
// 存储系统不会保存目录结构,所以这里是尝试删除同名文件
d.Object().DeleteByPath(tx, pkg.PackageID, name)
}

return nil
})
return removeChild(r.vfs, ctx, name, r)
}

func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {
newParentNode := newParent.(FuseNode)
newParentPath := newParentNode.PathComps()
newChildPath := lo2.AppendNew(newParentPath, newName)
newChildPathJoined := clitypes.JoinObjectPath(newChildPath[2:]...)

// 不允许移动任何内容到Package层级以上
if len(newParentPath) < 2 {
return fuse.ErrNotSupported
}

oldChildPath := lo2.AppendNew(r.PathComps(), oldName)
oldChildPathJoined := clitypes.JoinObjectPath(oldChildPath[2:]...)

// 先更新远程,再更新本地,因为远程使用事务更新,可以回滚,而本地不行
d := r.vfs.db
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {
newPkg, err := d.Package().GetUserPackageByName(tx, newParentPath[0], newParentPath[1])
if err != nil {
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}
return err
}

oldPkg, err := d.Package().GetUserPackageByName(tx, r.bktName, r.pkgName)
if err != nil {
if err == gorm.ErrRecordNotFound {
return fuse.ErrNotExists
}
return err
}

// 检查目的地是否已经存在
_, err = d.Object().GetByPath(tx, newPkg.PackageID, newChildPathJoined)
if err == nil {
return fuse.ErrExists
}

has, err := d.Object().HasObjectWithPrefix(tx, newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator)
if err != nil {
return err
}
if has {
return fuse.ErrExists
}

// 都不存在,就开始移动文件
oldObj, err := d.Object().GetByPath(tx, oldPkg.PackageID, oldChildPathJoined)
if err == nil {
oldObj.PackageID = newPkg.PackageID
oldObj.Path = newChildPathJoined

err = d.Object().BatchUpdate(tx, []clitypes.Object{oldObj})
if err != nil {
return err
}
}

err = d.Object().MoveByPrefix(tx,
oldPkg.PackageID, oldChildPathJoined+clitypes.ObjectPathSeparator,
newPkg.PackageID, newChildPathJoined+clitypes.ObjectPathSeparator,
)
if err != nil {
return err
}

return r.vfs.cache.Move(oldChildPath, newChildPath)
})
return moveChild(r.vfs, ctx, oldName, r, newName, newParent.(FuseNode))
}

func (r *FusePackage) loadCacheDir() *cache.CacheDir {
var createOpt *cache.CreateDirOption
pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), r.bktName, r.pkgName)
pkg, err := r.vfs.db.Package().GetByFullName(r.vfs.db.DefCtx(), r.bktName, r.pkgName)
if err == nil {
createOpt = &cache.CreateDirOption{
ModTime: pkg.CreateTime,


+ 10
- 27
client/internal/mount/vfs/fuse_root.go View File

@@ -151,17 +151,7 @@ func (r *FuseRoot) NewDir(ctx context.Context, name string) (fuse.FsDir, error)
}

func (r *FuseRoot) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile([]string{name})
if cache == nil {
return nil, 0, fuse.ErrPermission
}
defer cache.Release()
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
return newFile(r.vfs, ctx, name, r, flags)
}

func (r *FuseRoot) RemoveChild(ctx context.Context, name string) error {
@@ -171,29 +161,22 @@ func (r *FuseRoot) RemoveChild(ctx context.Context, name string) error {

bkt, err := db.Bucket().GetByName(tx, name)
if err == nil {
has, err := db.Package().HasPackageIn(tx, bkt.BucketID)
if err != nil {
return err
}
if has {
err := db.Package().HasPackageIn(tx, bkt.BucketID)
if err == nil {
return fuse.ErrNotEmpty
}
if err != gorm.ErrRecordNotFound {
return err
}

} else if err != gorm.ErrRecordNotFound {
return err
}

err = r.vfs.cache.Remove([]string{name})
if err != nil {
return err
}

if bkt.BucketID != 0 {
// 不管是否成功
db.Bucket().DeleteComplete(tx, bkt.BucketID)

} else if err != gorm.ErrRecordNotFound {
return err
}

return nil
return r.vfs.cache.Remove([]string{name})
})
}



+ 1
- 1
client/internal/services/package.go View File

@@ -26,7 +26,7 @@ func (svc *PackageService) Get(packageID types.PackageID) (types.Package, error)
}

func (svc *PackageService) GetByFullName(bucketName string, packageName string) (types.Package, error) {
return svc.DB.Package().GetUserPackageByName(svc.DB.DefCtx(), bucketName, packageName)
return svc.DB.Package().GetByFullName(svc.DB.DefCtx(), bucketName, packageName)
}

func (svc *PackageService) GetBucketPackages(bucketID types.BucketID) ([]types.Package, error) {


Loading…
Cancel
Save