Browse Source

简化分段缓存机制;挂载支持显示目录结构

gitlink
Sydonian 8 months ago
parent
commit
416c43f87b
34 changed files with 2553 additions and 553 deletions
  1. +5
    -0
      client2/internal/cmd/cmd.go
  2. +105
    -0
      client2/internal/cmd/mount.go
  3. +43
    -0
      client2/internal/config/config.go
  4. +1
    -0
      client2/internal/mount/config/config.go
  5. +25
    -4
      client2/internal/mount/fuse/dir_node.go
  6. +7
    -0
      client2/internal/mount/fuse/file_node.go
  7. +8
    -6
      client2/internal/mount/fuse/fuse.go
  8. +1
    -0
      client2/internal/mount/fuse/types.go
  9. +62
    -3
      client2/internal/mount/mount.go
  10. +53
    -0
      client2/internal/mount/mount_win.go
  11. +135
    -23
      client2/internal/mount/vfs/cache/cache.go
  12. +0
    -37
      client2/internal/mount/vfs/cache/data_loader.go
  13. +98
    -5
      client2/internal/mount/vfs/cache/dir.go
  14. +385
    -192
      client2/internal/mount/vfs/cache/file.go
  15. +0
    -12
      client2/internal/mount/vfs/cache/file_saver.go
  16. +0
    -117
      client2/internal/mount/vfs/cache/file_segment.go
  17. +0
    -36
      client2/internal/mount/vfs/cache/file_segment_test.go
  18. +332
    -0
      client2/internal/mount/vfs/cache/range_test.go
  19. +95
    -0
      client2/internal/mount/vfs/cache/remote.go
  20. +143
    -12
      client2/internal/mount/vfs/cache/utils.go
  21. +231
    -3
      client2/internal/mount/vfs/fuse_bucket.go
  22. +301
    -0
      client2/internal/mount/vfs/fuse_dir.go
  23. +37
    -26
      client2/internal/mount/vfs/fuse_file.go
  24. +282
    -0
      client2/internal/mount/vfs/fuse_package.go
  25. +71
    -57
      client2/internal/mount/vfs/fuse_root.go
  26. +17
    -1
      client2/internal/mount/vfs/vfs.go
  27. +7
    -0
      client2/main.go
  28. +8
    -7
      common/pkgs/db2/bucket.go
  29. +61
    -0
      common/pkgs/db2/object.go
  30. +12
    -1
      common/pkgs/db2/package.go
  31. +12
    -2
      common/pkgs/downloader/downloader.go
  32. +2
    -5
      coordinator/internal/mq/bucket.go
  33. +5
    -4
      coordinator/internal/mq/object.go
  34. +9
    -0
      magefiles/main.go

+ 5
- 0
client2/internal/cmd/cmd.go View File

@@ -0,0 +1,5 @@
package cmd

import "github.com/spf13/cobra"

var RootCmd = cobra.Command{}

+ 105
- 0
client2/internal/cmd/mount.go View File

@@ -0,0 +1,105 @@
package cmd

import (
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage/client2/internal/config"
"gitlink.org.cn/cloudream/storage/client2/internal/mount"
mntcfg "gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/metacache"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

func init() {
var configPath string

cmd := &cobra.Command{
Use: "mount",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
mountCmd(args[0], configPath)
},
}
cmd.Flags().StringVarP(&configPath, "config", "c", "", "path to config file")

RootCmd.AddCommand(cmd)
}

func mountCmd(mountPoint string, configPath string) {
err := config.Init(configPath)
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

stgglb.InitLocal(&config.Cfg().Local)
stgglb.InitMQPool(config.Cfg().RabbitMQ)
stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})
stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID)
stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID)

// 初始化存储服务管理器
stgAgts := agtpool.NewPool()

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil)
// conCol.CollectInPlace()

// 初始化元数据缓存服务
metacacheHost := metacache.NewHost()
go metacacheHost.Serve()
stgMeta := metacacheHost.AddStorageMeta()
hubMeta := metacacheHost.AddHubMeta()
conMeta := metacacheHost.AddConnectivity()

// 初始化下载策略选择器
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)

// 初始化下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel)

db, err := db2.NewDB(&config.Cfg().DB)
if err != nil {
logger.Fatalf("new db2 failed, err: %s", err.Error())
}

mnt := mount.NewMount(&mntcfg.Config{
CacheDir: "./cache",
MountPoint: mountPoint,
AttrTimeout: time.Second * 5,
}, db, &dlder)

ch := mnt.Start()
for {
evt, err := ch.Receive()
if err != nil {
break
}

switch e := evt.(type) {
case mount.MountingFailedEvent:
fmt.Println("mounting failed:", e.Err)
return

case mount.MountExitEvent:
fmt.Printf("mount exit\n")
return
}
}
}

+ 43
- 0
client2/internal/config/config.go View File

@@ -0,0 +1,43 @@
package config

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
c "gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
db "gitlink.org.cn/cloudream/storage/common/pkgs/db2/config"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
"gitlink.org.cn/cloudream/storage/common/pkgs/grpc"
)

type Config struct {
ID cdssdk.HubID `json:"id"`
ListenAddr string `json:"listenAddr"`
Local stgmodels.LocalMachineInfo `json:"local"`
GRPC *grpc.Config `json:"grpc"`
Logger log.Config `json:"logger"`
RabbitMQ mq.Config `json:"rabbitMQ"`
DistLock distlock.Config `json:"distlock"`
Connectivity connectivity.Config `json:"connectivity"`
Downloader downloader.Config `json:"downloader"`
DownloadStrategy strategy.Config `json:"downloadStrategy"`
DB db.Config `json:"db"`
}

var cfg Config

func Init(path string) error {
if path == "" {
return c.DefaultLoad("client2", &cfg)
}

return c.Load(path, &cfg)
}

func Cfg() *Config {
return &cfg
}

+ 1
- 0
client2/internal/mount/config/config.go View File

@@ -4,6 +4,7 @@ import "time"


type Config struct { type Config struct {
CacheDir string `json:"cacheDir"` CacheDir string `json:"cacheDir"`
MountPoint string `json:"mountPoint"`
GID uint32 `json:"gid"` GID uint32 `json:"gid"`
UID uint32 `json:"uid"` UID uint32 `json:"uid"`
AttrTimeout time.Duration `json:"attrTimeout"` AttrTimeout time.Duration `json:"attrTimeout"`


+ 25
- 4
client2/internal/mount/fuse/dir_node.go View File

@@ -9,6 +9,7 @@ import (


fusefs "github.com/hanwen/go-fuse/v2/fs" fusefs "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"gitlink.org.cn/cloudream/common/pkgs/logger"
) )


type DirNode struct { type DirNode struct {
@@ -21,12 +22,16 @@ func newDirNode(fs *Fuse, dir FsDir) *DirNode {
} }


func (n *DirNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno { func (n *DirNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno {
logger.Tracef("DirNode.Getattr: %v", n.dir.Name())

n.fs.fillAttrOut(n.dir, out) n.fs.fillAttrOut(n.dir, out)
return 0 return 0
} }


// Setattr sets attributes for an Inode. // Setattr sets attributes for an Inode.
func (n *DirNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) { func (n *DirNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) {
logger.Tracef("DirNode.Setattr: %v", n.dir.Name())

n.fs.fillAttrOut(n.dir, out) n.fs.fillAttrOut(n.dir, out)


_, ok := in.GetSize() _, ok := in.GetSize()
@@ -55,6 +60,8 @@ const accessModeMask = (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
// Open opens an Inode (of regular file type) for reading. It // Open opens an Inode (of regular file type) for reading. It
// is optional but recommended to return a FileHandle. // is optional but recommended to return a FileHandle.
func (n *DirNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) { func (n *DirNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
logger.Tracef("DirNode.Open: %v, %#o", n.dir.Name(), flags)

rdwrMode := int(flags) & accessModeMask rdwrMode := int(flags) & accessModeMask
if rdwrMode != os.O_RDONLY { if rdwrMode != os.O_RDONLY {
return nil, 0, syscall.EPERM return nil, 0, syscall.EPERM
@@ -71,6 +78,8 @@ func (n *DirNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle,
var _ = (fusefs.NodeOpener)((*DirNode)(nil)) var _ = (fusefs.NodeOpener)((*DirNode)(nil))


func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (inode *fusefs.Inode, errno syscall.Errno) { func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (inode *fusefs.Inode, errno syscall.Errno) {
logger.Tracef("DirNode.Lookup: %v, %v", n.dir.Name(), name)

child, err := n.dir.Child(ctx, name) child, err := n.dir.Child(ctx, name)
if err != nil { if err != nil {
return nil, translateError(err) return nil, translateError(err)
@@ -81,7 +90,7 @@ func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (
node := newDirNode(n.fs, child) node := newDirNode(n.fs, child)
n.fs.fillEntryOut(child, out) n.fs.fillEntryOut(child, out)


return node.NewInode(ctx, node, fusefs.StableAttr{
return n.NewInode(ctx, node, fusefs.StableAttr{
Mode: out.Attr.Mode, Mode: out.Attr.Mode,
}), 0 }), 0


@@ -89,7 +98,7 @@ func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (
node := newFileNode(n.fs, child) node := newFileNode(n.fs, child)
n.fs.fillEntryOut(child, out) n.fs.fillEntryOut(child, out)


return node.NewInode(ctx, node, fusefs.StableAttr{
return n.NewInode(ctx, node, fusefs.StableAttr{
Mode: out.Attr.Mode, Mode: out.Attr.Mode,
}), 0 }), 0


@@ -133,6 +142,8 @@ func (s *dirStream) Close() {
} }


func (n *DirNode) Readdir(ctx context.Context) (ds fusefs.DirStream, errno syscall.Errno) { func (n *DirNode) Readdir(ctx context.Context) (ds fusefs.DirStream, errno syscall.Errno) {
logger.Tracef("DirNode.Readdir: %v", n.dir.Name())

reader, err := n.dir.ReadChildren() reader, err := n.dir.ReadChildren()
if err != nil { if err != nil {
return nil, translateError(err) return nil, translateError(err)
@@ -144,6 +155,8 @@ func (n *DirNode) Readdir(ctx context.Context) (ds fusefs.DirStream, errno sysca
var _ = (fusefs.NodeReaddirer)((*DirNode)(nil)) var _ = (fusefs.NodeReaddirer)((*DirNode)(nil))


func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (inode *fusefs.Inode, errno syscall.Errno) { func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (inode *fusefs.Inode, errno syscall.Errno) {
logger.Tracef("DirNode.Mkdir: %v, %v, %#o", n.dir.Name(), name, mode)

newDir, err := n.dir.NewDir(ctx, name) newDir, err := n.dir.NewDir(ctx, name)
if err != nil { if err != nil {
return nil, translateError(err) return nil, translateError(err)
@@ -152,7 +165,7 @@ func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse
node := newDirNode(n.fs, newDir) node := newDirNode(n.fs, newDir)
n.fs.fillEntryOut(newDir, out) n.fs.fillEntryOut(newDir, out)


return node.NewInode(ctx, node, fusefs.StableAttr{
return n.NewInode(ctx, node, fusefs.StableAttr{
Mode: out.Attr.Mode, Mode: out.Attr.Mode,
}), 0 }), 0
} }
@@ -160,6 +173,8 @@ func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse
var _ = (fusefs.NodeMkdirer)((*DirNode)(nil)) var _ = (fusefs.NodeMkdirer)((*DirNode)(nil))


func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (node *fusefs.Inode, fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) { func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (node *fusefs.Inode, fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
logger.Tracef("DirNode.Create: %v, %v, %#o, %#o", n.dir.Name(), name, flags, mode)

hd, err := n.dir.NewFile(ctx, name, flags) hd, err := n.dir.NewFile(ctx, name, flags)
if err != nil { if err != nil {
return nil, nil, 0, translateError(err) return nil, nil, 0, translateError(err)
@@ -168,7 +183,7 @@ func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode ui
n.fs.fillEntryOut(hd.Entry(), out) n.fs.fillEntryOut(hd.Entry(), out)


fileNode := newFileNode(n.fs, hd.Entry()) fileNode := newFileNode(n.fs, hd.Entry())
return fileNode.NewInode(ctx, fileNode, fusefs.StableAttr{
return n.NewInode(ctx, fileNode, fusefs.StableAttr{
Mode: out.Attr.Mode, Mode: out.Attr.Mode,
}), hd, 0, 0 }), hd, 0, 0
} }
@@ -179,6 +194,8 @@ var _ = (fusefs.NodeCreater)((*DirNode)(nil))
// return status is OK, the Inode is removed as child in the // return status is OK, the Inode is removed as child in the
// FS tree automatically. Default is to return EROFS. // FS tree automatically. Default is to return EROFS.
func (n *DirNode) Unlink(ctx context.Context, name string) (errno syscall.Errno) { func (n *DirNode) Unlink(ctx context.Context, name string) (errno syscall.Errno) {
logger.Tracef("DirNode.Unlink: %v, %v", n.dir.Name(), name)

return translateError(n.dir.RemoveChild(ctx, name)) return translateError(n.dir.RemoveChild(ctx, name))
} }


@@ -187,12 +204,16 @@ var _ = (fusefs.NodeUnlinker)((*DirNode)(nil))
// Rmdir is like Unlink but for directories. // Rmdir is like Unlink but for directories.
// Default is to return EROFS. // Default is to return EROFS.
func (n *DirNode) Rmdir(ctx context.Context, name string) (errno syscall.Errno) { func (n *DirNode) Rmdir(ctx context.Context, name string) (errno syscall.Errno) {
logger.Tracef("DirNode.Rmdir: %v, %v", n.dir.Name(), name)

return translateError(n.dir.RemoveChild(ctx, name)) return translateError(n.dir.RemoveChild(ctx, name))
} }


var _ = (fusefs.NodeRmdirer)((*DirNode)(nil)) var _ = (fusefs.NodeRmdirer)((*DirNode)(nil))


func (n *DirNode) Rename(ctx context.Context, oldName string, newParent fusefs.InodeEmbedder, newName string, flags uint32) (errno syscall.Errno) { func (n *DirNode) Rename(ctx context.Context, oldName string, newParent fusefs.InodeEmbedder, newName string, flags uint32) (errno syscall.Errno) {
logger.Tracef("DirNode.Rename: %v/%v->%v, %#o", n.dir.Name(), oldName, newName, flags)

newParentNode, ok := newParent.(*DirNode) newParentNode, ok := newParent.(*DirNode)
if !ok { if !ok {
return syscall.ENOTDIR return syscall.ENOTDIR


+ 7
- 0
client2/internal/mount/fuse/file_node.go View File

@@ -8,6 +8,7 @@ import (


fusefs "github.com/hanwen/go-fuse/v2/fs" fusefs "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"gitlink.org.cn/cloudream/common/pkgs/logger"
) )


type FileNode struct { type FileNode struct {
@@ -23,11 +24,15 @@ func newFileNode(fs *Fuse, file FsFile) *FileNode {
} }


func (n *FileNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno { func (n *FileNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno {
logger.Tracef("FileNode.Getattr: %v", n.file.Name())

n.fs.fillAttrOut(n.file, out) n.fs.fillAttrOut(n.file, out)
return 0 return 0
} }


func (n *FileNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) { func (n *FileNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) {
logger.Tracef("FileNode.Setattr: %v", n.file.Name())

n.fs.fillAttrOut(n.file, out) n.fs.fillAttrOut(n.file, out)


size, ok := in.GetSize() size, ok := in.GetSize()
@@ -55,6 +60,8 @@ func (n *FileNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.Se
var _ = (fusefs.NodeSetattrer)((*FileNode)(nil)) var _ = (fusefs.NodeSetattrer)((*FileNode)(nil))


func (n *FileNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) { func (n *FileNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
logger.Tracef("FileNode.Open: %v, %#o", n.file.Name(), flags)

// 只支持以下标志: // 只支持以下标志:
// os.O_RDONLY // os.O_RDONLY
// os.O_WRONLY // os.O_WRONLY


+ 8
- 6
client2/internal/mount/fuse/fuse.go View File

@@ -12,17 +12,19 @@ import (
) )


type Fuse struct { type Fuse struct {
fs Fs
rootDir FsDir
config *config.Config
config *config.Config
fs Fs
} }


func NewFuse(fs Fs, rootDir FsDir, config *config.Config) *Fuse {
return &Fuse{fs: fs, rootDir: rootDir, config: config}
func NewFuse(config *config.Config, fs Fs) *Fuse {
return &Fuse{
config: config,
fs: fs,
}
} }


func (v *Fuse) Root() fusefs.InodeEmbedder { func (v *Fuse) Root() fusefs.InodeEmbedder {
return newDirNode(v, v.rootDir)
return newDirNode(v, v.fs.Root())
} }


func translateError(err error) syscall.Errno { func translateError(err error) syscall.Errno {


+ 1
- 0
client2/internal/mount/fuse/types.go View File

@@ -16,6 +16,7 @@ var (


type Fs interface { type Fs interface {
Stats() FsStats Stats() FsStats
Root() FsDir
} }


type FsStats struct { type FsStats struct {


+ 62
- 3
client2/internal/mount/mount.go View File

@@ -1,11 +1,70 @@
//go:build linux || (darwin && amd64)

package mount package mount


import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
import (
fusefs "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
fuse2 "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)

type MountEvent interface {
IsMountEvent() bool
}

type MountExitEvent struct {
MountEvent
}

type MountingFailedEvent struct {
MountEvent
Err error
}

type Mount struct {
cfg *config.Config
vfs *vfs.Vfs
fuse *fuse2.Fuse
}

func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount {
vfs := vfs.NewVfs(cfg, db, downloader)
fuse := fuse2.NewFuse(cfg, vfs)

return &Mount{
cfg: cfg,
vfs: vfs,
fuse: fuse,
}
}

func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] {
ch := sync2.NewUnboundChannel[MountEvent]()
go func() {


type Mount struct{}
nodeFsOpt := &fusefs.Options{
MountOptions: fuse.MountOptions{
FsName: "CDS",
},
}
rawFs := fusefs.NewNodeFS(m.fuse.Root(), nodeFsOpt)


func (m *Mount) Start() {
svr, err := fuse.NewServer(rawFs, m.cfg.MountPoint, &nodeFsOpt.MountOptions)
if err != nil {
ch.Send(MountingFailedEvent{Err: err})
return
}


svr.Serve()
ch.Send(MountExitEvent{})
}()
return ch
} }


func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) { func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) {


+ 53
- 0
client2/internal/mount/mount_win.go View File

@@ -0,0 +1,53 @@
//go:build windows

package mount

import (
"fmt"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)

type MountEvent interface {
IsMountEvent() bool
}

type MountExitEvent struct {
MountEvent
}

type MountingFailedEvent struct {
MountEvent
Err error
}

type Mount struct {
}

func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount {
return &Mount{}
}

func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] {
ch := sync2.NewUnboundChannel[MountEvent]()
go func() {
ch.Send(MountingFailedEvent{Err: fmt.Errorf("not implemented")})
}()
return ch
}

func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) {

}

func (m *Mount) NotifyPackageInvalid(pkg cdssdk.Package) {

}

func (m *Mount) NotifyBucketInvalid(bkt cdssdk.Bucket) {

}

+ 135
- 23
client2/internal/mount/vfs/cache/cache.go View File

@@ -1,11 +1,17 @@
package cache package cache


import ( import (
"fmt"
"os" "os"
"path/filepath" "path/filepath"
"time"


"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
) )


type CacheEntry interface { type CacheEntry interface {
@@ -13,14 +19,33 @@ type CacheEntry interface {
// 在虚拟文件系统中的路径,即不包含缓存目录的路径 // 在虚拟文件系统中的路径,即不包含缓存目录的路径
PathComps() []string PathComps() []string
// 不再使用本缓存条目 // 不再使用本缓存条目
Release()
// Release()
}

type CacheEntryInfo struct {
PathComps []string
Size int64
Mode os.FileMode
ModTime time.Time
IsDir bool
} }


type Cache struct { type Cache struct {
db *db2.DB
downloader *downloader.Downloader
cacheDataDir string cacheDataDir string
cacheMetaDir string cacheMetaDir string
} }


func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache {
return &Cache{
db: db,
downloader: downloader,
cacheDataDir: cacheDataDir,
cacheMetaDir: cacheMetaDir,
}
}

func (c *Cache) GetCacheDataPath(comps ...string) string { func (c *Cache) GetCacheDataPath(comps ...string) string {
comps2 := make([]string, len(comps)+1) comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheDataDir comps2[0] = c.cacheDataDir
@@ -49,59 +74,146 @@ func (c *Cache) GetCacheMetaPathComps(comps ...string) []string {
return comps2 return comps2
} }


// 加载指定路径的缓存文件或者目录,如果路径不存在,则返回nil。
func (c *Cache) LoadAny(pathComps []string) CacheEntry {
pat := c.GetCacheMetaPath(pathComps...)
info, err := os.Stat(pat)
// 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。
func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
metaPath := c.GetCacheMetaPath(pathComps...)
stat, err := os.Stat(metaPath)
if err != nil { if err != nil {
// TODO 日志记录 // TODO 日志记录
return nil return nil
} }


if info.IsDir() {
return newCacheDir(pathComps, info)
if stat.IsDir() {
info, err := loadCacheDirInfo(c, pathComps)
if err != nil {
return nil
}

return info
}

info, err := loadCacheFileInfo(c, pathComps)
if err != nil {
return nil
} }


file, err := loadCacheFile(pathComps, pat)
return info
}

// 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。
func (c *Cache) CreateFile(pathComps []string) *CacheFile {
ch, err := createNewCacheFile(c, pathComps)
if err != nil { if err != nil {
// TODO 日志记录 // TODO 日志记录
return nil return nil
} }
return file
return ch
} }


// 加载指定缓存文件,如果文件不存在,则根据create参数决定是否创建文件。
//
// 如果create为false,且文件不存在,则返回nil。如果目标位置是一个目录,则也会返回nil。
func (c *Cache) LoadFile(pathComps []string, create bool) *CacheFile {
// 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。
func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile {
ch, err := loadCacheFile(c, pathComps)
if err == nil {
ch.remoteObj = obj
return ch
}


if !os.IsNotExist(err) {
// TODO 日志记录
logger.Tracef("load cache file: %v", err)
return nil
}

if obj == nil {
return nil
}

ch, err = makeCacheFileFromObject(c, pathComps, obj)
if err != nil {
// TODO 日志记录
logger.Tracef("make cache file from object: %v", err)
return nil
}
return ch
} }


// 加载指定缓存文件,如果文件不存在,则根据obj参数创建缓存文件。
func (c *Cache) LoadOrCreateFile(pathComps []string, obj cdssdk.Object) *CacheFile {
// 创建一个缓存目录。如果目录已经存在,则会重置目录属性。如果加载过程中发生了错误,或者目标位置是一个文件,则会返回nil
func (c *Cache) CreateDir(pathComps []string) *CacheDir {
ch, err := createNewCacheDir(c, pathComps)
if err != nil {
// TODO 日志记录
return nil
}
return ch
}


type CreateDirOption struct {
ModTime time.Time
} }


// 加载指定缓存目录,如果目录不存在,则根据create参数决定是否创建目录。
//
// 如果create为false,且目录不存在,则返回nil。如果目标位置是一个文件,则也会返回nil。
func (c *Cache) LoadDir(pathComps []string, create bool) *CacheDir {
// 加载指定缓存目录,如果目录不存在,则使用createOpt选项创建目录,而如果createOpt为nil,那么会返回nil。
func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir {
ch, err := loadCacheDir(c, pathComps)
if err == nil {
return ch
}


if !os.IsNotExist(err) {
// TODO 日志记录
return nil
}

if createOpt == nil {
return nil
}

// 创建目录
ch, err = makeCacheDirFromOption(c, pathComps, *createOpt)
if err != nil {
// TODO 日志记录
return nil
}
return ch
} }


// 加载指定路径下所有的缓存文件或者目录,如果路径不存在,则返回nil。
func (c *Cache) LoadMany(pathComps []string) []CacheEntry {
// 加载指定路径下的所有缓存条目信息
func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
osEns, err := os.ReadDir(c.GetCacheMetaPath(pathComps...))
if err != nil {
return nil
}

var infos []CacheEntryInfo
for _, e := range osEns {


if e.IsDir() {
info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()))
if err != nil {
continue
}

infos = append(infos, *info)
} else {
info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()))
if err != nil {
continue
}

infos = append(infos, *info)
}
}

return infos
} }


// 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。 // 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。
func (c *Cache) Remove(pathComps []string) error { func (c *Cache) Remove(pathComps []string) error {

return fmt.Errorf("not implemented")
} }


// 移动指定路径的缓存文件或目录到新的路径。如果目标路径已经存在,则会报错。 // 移动指定路径的缓存文件或目录到新的路径。如果目标路径已经存在,则会报错。
// //
// 如果移动成功,则返回移动后的缓存文件或目录。如果文件或目录不存在,则返回nil。 // 如果移动成功,则返回移动后的缓存文件或目录。如果文件或目录不存在,则返回nil。
func (c *Cache) Move(pathComps []string, newPathComps []string) (CacheEntry, error) { func (c *Cache) Move(pathComps []string, newPathComps []string) (CacheEntry, error) {

return nil, fmt.Errorf("not implemented")
} }

+ 0
- 37
client2/internal/mount/vfs/cache/data_loader.go View File

@@ -1,37 +0,0 @@
package cache

import (
"io"
"os"

stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

type LocalLoader struct {
cache *CacheFile
file *os.File
offset int64
// 当前正在加载的数据段。为nil表示没有加载任何数据。
loading *FileSegment
// 最多加载到哪个位置。-1代表无限制。
limit int64
}

// 启动数据加载。通过设置seg的Position和Length来指定要加载的数据范围。
func (l *LocalLoader) BeginLoading(seg *FileSegment) {

}

type RemoteLoader struct {
cache *CacheFile
object stgmod.ObjectDetail
reader io.ReadCloser
offset int64
// 当前正在加载的数据段。为nil表示没有加载任何数据。
loading *FileSegment
// 最多加载到哪个位置。-1代表无限制。
limit int64
}

func (l *RemoteLoader) BeginLoading(seg *FileSegment) {
}

+ 98
- 5
client2/internal/mount/vfs/cache/dir.go View File

@@ -1,6 +1,7 @@
package cache package cache


import ( import (
"fmt"
"os" "os"
"time" "time"
) )
@@ -12,13 +13,90 @@ type CacheDir struct {
perm os.FileMode perm os.FileMode
} }


func newCacheDir(pathComps []string, info os.FileInfo) *CacheDir {
func createNewCacheDir(c *Cache, pathComps []string) (*CacheDir, error) {
metaPath := c.GetCacheMetaPath(pathComps...)
dataPath := c.GetCacheDataPath(pathComps...)

err := os.MkdirAll(metaPath, 0755)
if err != nil {
return nil, err
}

err = os.MkdirAll(dataPath, 0755)
if err != nil {
return nil, err
}

// 修改文件夹的修改时间
modTime := time.Now()
os.Chtimes(metaPath, modTime, modTime)

return &CacheDir{
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
modTime: modTime,
perm: 0755,
}, nil
}

func loadCacheDir(c *Cache, pathComps []string) (*CacheDir, error) {
stat, err := os.Stat(c.GetCacheMetaPath(pathComps...))
if err != nil {
return nil, err
}

if !stat.IsDir() {
return nil, fmt.Errorf("not a directory")
}

return &CacheDir{
pathComps: pathComps,
name: stat.Name(),
modTime: stat.ModTime(),
perm: stat.Mode().Perm(),
}, nil
}

func makeCacheDirFromOption(c *Cache, pathComps []string, opt CreateDirOption) (*CacheDir, error) {
metaPath := c.GetCacheMetaPath(pathComps...)
dataPath := c.GetCacheDataPath(pathComps...)

err := os.MkdirAll(metaPath, 0755)
if err != nil {
return nil, err
}

err = os.MkdirAll(dataPath, 0755)
if err != nil {
return nil, err
}

os.Chtimes(metaPath, opt.ModTime, opt.ModTime)
return &CacheDir{ return &CacheDir{
pathComps: pathComps, pathComps: pathComps,
name: info.Name(),
modTime: info.ModTime(),
perm: info.Mode().Perm(),
name: pathComps[len(pathComps)-1],
modTime: opt.ModTime,
perm: 0755,
}, nil
}

func loadCacheDirInfo(c *Cache, pathComps []string) (*CacheEntryInfo, error) {
stat, err := os.Stat(c.GetCacheMetaPath(pathComps...))
if err != nil {
return nil, err
}

if !stat.IsDir() {
return nil, fmt.Errorf("not a directory")
} }

return &CacheEntryInfo{
PathComps: pathComps,
Size: 0,
Mode: stat.Mode(),
ModTime: stat.ModTime(),
IsDir: true,
}, nil
} }


func (f *CacheDir) PathComps() []string { func (f *CacheDir) PathComps() []string {
@@ -45,6 +123,21 @@ func (f *CacheDir) IsDir() bool {
return true return true
} }


func (f *CacheDir) Release() {
func (f *CacheDir) Info() CacheEntryInfo {
return CacheEntryInfo{
PathComps: f.pathComps,
Size: 0,
Mode: f.perm,
ModTime: f.modTime,
IsDir: true,
}
}


func (f *CacheDir) SetModTime(modTime time.Time) error {
// TODO 修改文件夹的修改时间
return nil
} }

// func (f *CacheDir) Release() {

// }

+ 385
- 192
client2/internal/mount/vfs/cache/file.go View File

@@ -1,50 +1,51 @@
package cache package cache


import ( import (
"context"
"io"
"fmt"
"os" "os"
"path/filepath"
"sync" "sync"
"time" "time"


"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
) )


type FileInfo struct { type FileInfo struct {
// 文件总大小。可能会超过对应的远端文件的大小。 // 文件总大小。可能会超过对应的远端文件的大小。
// 此大小可能与本地缓存文件大小也不,需要定时将本地缓存文件大小修正到与这个值相同。
// 此大小可能与本地缓存文件大小也不,需要定时将本地缓存文件大小修正到与这个值相同。
Size int64 Size int64
// 本文件是否有未提交的修改 // 本文件是否有未提交的修改
Dirty bool Dirty bool
// 数据段列表,按照段开始位置从小到大排列 // 数据段列表,按照段开始位置从小到大排列
Segments []Range
Segments []*Range
// 文件对应的对象ID,仅在文件是一个缓存文件时才有值 // 文件对应的对象ID,仅在文件是一个缓存文件时才有值
ObjectID cdssdk.ObjectID
// ObjectID cdssdk.ObjectID
// 文件对应的对象大小,仅在文件是一个缓存文件时才有值。 // 文件对应的对象大小,仅在文件是一个缓存文件时才有值。
// 此值代表有多少数据应该从远端加载,所以可能会小于远端实际大小 // 此值代表有多少数据应该从远端加载,所以可能会小于远端实际大小
ObjectSize int64 ObjectSize int64
// 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过 // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过
Hash string
// Hash cdssdk.FileHash
// 文件的最后修改时间 // 文件的最后修改时间
ModTime time.Time ModTime time.Time
// 文件的权限 // 文件的权限
Perm os.FileMode Perm os.FileMode
} }


// 返回值代表文件大小是否发生了变化
func (f *FileInfo) EnlargeTo(size int64) bool {
if size > f.Size {
f.Size = size
return true
func (f *FileInfo) Clone() FileInfo {
n := *f
n.Segments = make([]*Range, len(f.Segments))
for i, seg := range f.Segments {
n.Segments[i] = &Range{
Position: seg.Position,
Length: seg.Length,
}
} }
return false

return n
} }


type Range struct { type Range struct {
@@ -56,69 +57,180 @@ func (r *Range) GetPosition() int64 {
return r.Position return r.Position
} }


func (r *Range) SetPosition(pos int64) {
r.Position = pos
}

func (r *Range) GetLength() int64 { func (r *Range) GetLength() int64 {
return r.Length return r.Length
} }


type ReadRequest struct {
Position int64
Length int64
Callback *future.SetVoidFuture
Zeros int64 // 如果>0,那么说明读到了一块全0的数据,值的大小代表这块数据的长度,此时Segment字段为nil
Segment *FileSegment
func (r *Range) SetLength(length int64) {
r.Length = length
} }


func (r *ReadRequest) GetPosition() int64 {
return r.Position
func (r *Range) End() int64 {
return r.Position + r.Length
} }


func (r *ReadRequest) GetLength() int64 {
return r.Length
// 所有读写过程共用同一个CacheFile对象。
// 不应该将此结构体保存到对象中
type CacheFile struct {
cache *Cache
pathComps []string
name string
info FileInfo
remoteObj *cdssdk.Object
infoRev int64
rwLock *sync.RWMutex
readers []*CacheFileReadWriter
writers []*CacheFileReadWriter
backgroundChan chan any

localFile *os.File
writeLock *sync.RWMutex
} }


type WriteReqeust struct {
Position int64
Length int64
Callback *future.SetVoidFuture
Segment *FileSegment
func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
metaPath := cache.GetCacheMetaPath(pathComps...)
dataPath := cache.GetCacheDataPath(pathComps...)

info := FileInfo{
Dirty: true,
ModTime: time.Now(),
Perm: 0644,
}

infoData, err := serder.ObjectToJSON(info)
if err != nil {
return nil, err
}

err = os.WriteFile(metaPath, infoData, 0644)
if err != nil {
return nil, fmt.Errorf("save cache file: %w", err)
}

localFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
return nil, fmt.Errorf("create cache file: %w", err)
}

ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
rwLock: &sync.RWMutex{},
backgroundChan: make(chan any, 1),
localFile: localFile,
writeLock: &sync.RWMutex{},
}

go ch.background()

return ch, nil
} }


// 所有读写过程共用同一个CacheFile对象。
// 不应该将此结构体保存到对象中
type CacheFile struct {
objDownloader *downloader.Downloader
pathComps []string
name string
info FileInfo
lock sync.Mutex
segBuffer SegmentBuffer
readers int
writers int

localLoaders []*LocalLoader
remoteLoaders []*RemoteLoader
pendingReadings []*ReadRequest
pendingWritings []*WriteReqeust
managerChan chan any
func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
metaPath := cache.GetCacheMetaPath(pathComps...)
dataPath := cache.GetCacheDataPath(pathComps...)

metaData, err := os.ReadFile(metaPath)
if err != nil {
return nil, err
}

info := &FileInfo{}
err = serder.JSONToObject(metaData, info)
if err != nil {
return nil, err
}

localFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, fmt.Errorf("create cache file: %w", err)
}

ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: *info,
rwLock: &sync.RWMutex{},
backgroundChan: make(chan any, 1),
localFile: localFile,
writeLock: &sync.RWMutex{},
}

go ch.background()

return ch, nil
} }


func loadCacheFile(pathComps []string, infoPath string) (*CacheFile, error) {
f, err := os.Open(infoPath)
func makeCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object) (*CacheFile, error) {
metaPath := cache.GetCacheMetaPath(pathComps...)
dataPath := cache.GetCacheDataPath(pathComps...)

info := FileInfo{
Size: obj.Size,
ObjectSize: obj.Size,
ModTime: obj.UpdateTime,
Perm: 0755,
}

infoData, err := serder.ObjectToJSON(info)
if err != nil {
return nil, err
}

err = os.WriteFile(metaPath, infoData, 0644)
if err != nil {
return nil, fmt.Errorf("save cache file: %w", err)
}

localFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
if err != nil {
return nil, fmt.Errorf("create cache file: %w", err)
}

ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
remoteObj: obj,
rwLock: &sync.RWMutex{},
backgroundChan: make(chan any, 1),
localFile: localFile,
writeLock: &sync.RWMutex{},
}

go ch.background()

return ch, nil
}

func loadCacheFileInfo(cache *Cache, pathComps []string) (*CacheEntryInfo, error) {
metaPath := cache.GetCacheMetaPath(pathComps...)

metaData, err := os.ReadFile(metaPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer f.Close()


info := &FileInfo{} info := &FileInfo{}
err = serder.JSONToObjectStream(f, info)
err = serder.JSONToObject(metaData, info)
if err != nil { if err != nil {
return nil, err return nil, err
} }


return &CacheFile{
pathComps: pathComps,
name: filepath.Base(infoPath),
info: *info,
return &CacheEntryInfo{
PathComps: pathComps,
Size: info.Size,
Mode: info.Perm,
ModTime: info.ModTime,
IsDir: false,
}, nil }, nil
} }


@@ -146,207 +258,288 @@ func (f *CacheFile) IsDir() bool {
return false return false
} }


func (f *CacheFile) Info() CacheEntryInfo {
return CacheEntryInfo{
PathComps: f.pathComps,
Size: f.info.Size,
Mode: f.info.Perm,
ModTime: f.info.ModTime,
IsDir: false,
}
}

func (f *CacheFile) SetRemoteObject(obj *cdssdk.Object) {
f.remoteObj = obj
}

// 打开一个写入句柄,同时支持读取 // 打开一个写入句柄,同时支持读取
func (f *CacheFile) Open(readOnly bool) *CacheFileReadWriter { func (f *CacheFile) Open(readOnly bool) *CacheFileReadWriter {
f.lock.Lock()
defer f.lock.Unlock()
logger.Tracef("CacheFile.Open: %v, %v", f.name, readOnly)

f.rwLock.Lock()
defer f.rwLock.Unlock()

h := &CacheFileReadWriter{
file: f,
readOnly: readOnly,
remoteLock: &sync.Mutex{},
}

if f.remoteObj != nil {
h.remote = newRemoteLoader(f)
}


if readOnly { if readOnly {
f.readers++
f.readers = append(f.readers, h)
} else { } else {
f.writers++
f.writers = append(f.writers, h)
} }


return &CacheFileReadWriter{
file: f,
readOnly: readOnly,
}
return h
} }


func (f *CacheFile) SetModTime(modTime time.Time) error { func (f *CacheFile) SetModTime(modTime time.Time) error {
f.rwLock.Lock()
f.info.ModTime = modTime
f.infoRev++
f.rwLock.Unlock()

f.notifyBackground()
return nil
} }


func (f *CacheFile) Truncate(size int64) error { func (f *CacheFile) Truncate(size int64) error {
// 修改文件大小前不允许写入
f.writeLock.Lock()
defer f.writeLock.Unlock()

err := f.localFile.Truncate(size)
if err != nil {
return err
}

f.rwLock.Lock()
defer f.rwLock.Unlock()

// 调整能从远端下载的大小
f.info.ObjectSize = math2.Min(f.info.ObjectSize, size)

// 调整本地缓存文件里的有效数据大小
if size < f.info.Size {
f.info.Segments = TruncateRange(f.info.Segments, size)
} else {
f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
}
f.info.Size = size
f.infoRev++

f.notifyBackground()
return nil
} }


// 不再使用缓存文件 // 不再使用缓存文件
func (f *CacheFile) Release() {
// func (f *CacheFile) Release() {


}
// }


// 一个文件段的数据被写入到本地了。err为nil表示成功,否则表示写入失败。
func (f *CacheFile) OnSaved(seg *FileSegment, err error) {
func (f *CacheFile) background() {
savedInfoRev := int64(0)
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()


}
for {
select {
case _, ok := <-f.backgroundChan:
if !ok {
return
}


// 一个文件段的数据被加载到内存了。err为nil表示成功,否则表示加载失败。
func (f *CacheFile) OnLoaded(seg *FileSegment, err error) {
case <-ticker.C:
}


}
f.rwLock.Lock()


func (f *CacheFile) notifyManager() {
select {
case f.managerChan <- nil:
break
default:
for {
if f.infoRev == savedInfoRev {
break
}

jsonData, err := serder.ObjectToJSON(f.info)
if err != nil {
// TODO 日志
break
}

err = os.WriteFile(f.cache.GetCacheMetaPath(f.pathComps...), jsonData, 0644)
if err != nil {
// TODO 日志
break
}

savedInfoRev = f.infoRev
break
}

f.rwLock.Unlock()
} }
} }


func (f *CacheFile) managing() {
for {
<-f.managerChan

func (f *CacheFile) notifyBackground() {
select {
case f.backgroundChan <- nil:
default:
} }
} }


type CacheFileReadWriter struct { type CacheFileReadWriter struct {
file *CacheFile
readOnly bool
file *CacheFile
readOnly bool
remote *RemoteLoader
remoteLock *sync.Mutex
} }


func (f *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
f.file.lock.Lock()
func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.name, off, len(buf))


if off >= f.file.info.Size {
return 0, io.EOF
}
totalReadLen := 0
for totalReadLen < len(buf) {
curBuf := buf[totalReadLen:]
curOff := off + int64(totalReadLen)

h.file.rwLock.RLock()


segIdx := f.file.segBuffer.FirstContains(off)
if curOff >= h.file.info.Size {
h.file.rwLock.RUnlock()
break
}


if segIdx >= 0 {
seg := f.file.segBuffer.Segment(segIdx)
// 先尝试从本地缓存文件里读取
rngIdx := FirstContainsIndex(h.file.info.Segments, curOff)
if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff {
readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff)
realReadLen, err := h.file.localFile.ReadAt(curBuf[:readLen], curOff)
totalReadLen += realReadLen
h.file.rwLock.RUnlock()

logger.Tracef("read from local cache, n: %v, err: %v", realReadLen, err)
if err != nil {
return totalReadLen, err
}
continue
}


// 读取的数据在当前段内
if off >= seg.Position && off < seg.Position+seg.Length {
readLen := math2.Min(int64(len(buf)), seg.Position+seg.Length-off)
seg.RefCount++
f.file.lock.Unlock()
// 否则从远端下载
loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff)
if rngIdx+1 < len(h.file.info.Segments) {
// 最多加载到下一个段的开头
loadLen = math2.Min(loadLen, h.file.info.Segments[rngIdx+1].Position-curOff)
}


copy(buf[:readLen], seg.SubSliceAbs(off, readLen))
h.file.rwLock.RUnlock()


f.file.lock.Lock()
seg.RefCount--
seg.Type = SegmentDirty
f.file.lock.Unlock()
return int(readLen), nil
if h.remote == nil {
return totalReadLen, fmt.Errorf("no remote file")
} }


// if off >= f.file.info.ObjectSize {
// readLen := int64(len(buf))
fmt.Printf("load from remote\n")


// if segIdx < f.file.segBuffer.BuzyCount()-1 {
// nextSeg := f.file.segBuffer.Segment(segIdx + 1)
// readLen = math2.Min(readLen, nextSeg.Position-off)
// } else {
// readLen = math2.Min(readLen, f.file.info.Size-off)
// }
// f.file.lock.Unlock()
// 加锁,防止并发Seek
h.remoteLock.Lock()
realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff)
totalReadLen += realLoadLen
if err != nil {
h.remoteLock.Unlock()
return totalReadLen, err
}
h.remoteLock.Unlock()


// clear(buf[:readLen])
// return int(readLen), nil
// }
}
logger.Tracef("load from remote: %v", realLoadLen)


// 没有被缓存的数据,则需要通知加载器去加载
// 在写入到本地之前,先停止其他的写入,防止冲突
h.file.writeLock.Lock()


fut := future.NewSetVoid()
req := &ReadRequest{
Position: off,
Callback: fut,
}
insertIdx := FirstContains(f.file.pendingReadings, off)
f.file.pendingReadings = lo2.Insert(f.file.pendingReadings, insertIdx+1, req)
f.file.lock.Unlock()
// 停止其他写入后,就可以计算一下实际要写回的长度。
h.file.rwLock.RLock()
loadRng := &Range{Position: curOff, Length: int64(realLoadLen)}
DifferentRange(loadRng, h.file.info.Segments)
h.file.rwLock.RUnlock()


f.file.notifyManager()
if loadRng.Length == 0 {
h.file.writeLock.Unlock()
continue
}


err := fut.Wait(context.Background())
if err != nil {
return 0, err
}
// 写入到本地缓存文件
writeStart := loadRng.Position - curOff
_, err = h.file.localFile.WriteAt(curBuf[writeStart:writeStart+loadRng.Length], curOff)
if err != nil {
h.file.writeLock.Unlock()


if req.Segment != nil {
// 这里不加锁,因为在引用计数大于0期间,Position不变,而Length虽然可能发生变化,但并发读写是未定义行为,所以不管读到多少数据都可以
readLen := math2.Min(int64(len(buf)), req.Segment.Position+req.Segment.Length-off)
copy(buf[:readLen], req.Segment.SubSliceAbs(off, readLen))

// bufferManager线程会在调用回调之前,给引用计数+1
// TODO 这种做法容易粗心产生遗漏,且不利于实现超时机制
f.file.lock.Lock()
req.Segment.RefCount--
f.file.lock.Unlock()
return int(readLen), nil
}
logger.Tracef("save to local file: %v", err)
return totalReadLen, fmt.Errorf("save to local file: %w", err)
}

logger.Tracef("save to local: %v", loadRng.Length)


if req.Zeros > 0 {
clear(buf[:req.Zeros])
return int(req.Zeros), nil
h.file.writeLock.Unlock()

// 提交到段列表里
h.file.rwLock.Lock()
h.file.info.Segments = AddRange(h.file.info.Segments, loadRng)
h.file.infoRev++
h.file.notifyBackground()
h.file.rwLock.Unlock()
} }


return 0, io.EOF
return totalReadLen, nil
} }


func (f *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {
if f.readOnly {
func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {
if h.readOnly {
return 0, fuse.ErrPermission return 0, fuse.ErrPermission
} }


f.file.lock.Lock()
// 如果找到一个包含off位置的段,那么就先写满这个段
segIdx := f.file.segBuffer.FirstContains(off)
if segIdx >= 0 {
seg := f.file.segBuffer.Segment(segIdx)

if off >= seg.Position && off < seg.Position+seg.Length {
writeLen := math2.Min(int64(len(buf)), seg.BufferEnd()-off)
seg.RefCount++
f.file.lock.Unlock()

// 不管此时是不是有其他线程在读取数据,因为我们约定并发读写就是未定义行为。
copy(seg.SubSliceAbs(off, writeLen), buf[:writeLen])

f.file.lock.Lock()
seg.RefCount--
seg.EnlargeTo(off + writeLen)
seg.Type = SegmentDirty
f.file.info.EnlargeTo(seg.AvailableEnd())
f.file.lock.Unlock()
f.file.notifyManager()
return int(writeLen), nil
}
}
logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.name, off, len(buf))


fut := future.NewSetVoid()
req := &WriteReqeust{
Position: off,
Callback: fut,
}
f.file.pendingWritings = append(f.file.pendingWritings, req)
f.file.lock.Unlock()
// 允许多线程并行写入,但在数据加载期间不能写入
h.file.writeLock.RLock()
defer h.file.writeLock.RUnlock()


err := fut.Wait(context.Background())
// 写入到本地缓存文件
writeLen, err := h.file.localFile.WriteAt(buf, off)
if err != nil { if err != nil {
return 0, err
return writeLen, fmt.Errorf("save to local file: %w", err)
} }


// 一些说明参考ReadAt函数
// 由managing线程保证文件的同一个位置只会有一个段,因此这里不考虑在copy期间又有其他线程在写同一个段导致的产生新的重复段
writeLen := math2.Min(int64(len(buf)), req.Segment.BufferEnd()-off)
copy(req.Segment.SubSliceAbs(off, writeLen), buf[:writeLen])

f.file.lock.Lock()
req.Segment.RefCount--
req.Segment.EnlargeTo(off + writeLen)
req.Segment.Type = SegmentDirty
f.file.info.EnlargeTo(req.Segment.AvailableEnd())
f.file.lock.Unlock()
f.file.notifyManager()
return int(writeLen), nil
// 提交到段列表里
h.file.rwLock.Lock()
defer h.file.rwLock.Unlock()

h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)})
h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen))
h.file.info.Dirty = true
h.file.infoRev++

h.file.notifyBackground()

return writeLen, nil
} }


func (f *CacheFileReadWriter) Sync() error { func (f *CacheFileReadWriter) Sync() error {
return f.file.localFile.Sync()
} }


func (f *CacheFileReadWriter) Close() error { func (f *CacheFileReadWriter) Close() error {
f.Sync()


f.remote.Close()

f.file.rwLock.Lock()
if f.readOnly {
f.file.readers = lo2.Remove(f.file.readers, f)
} else {
f.file.writers = lo2.Remove(f.file.writers, f)
}
f.file.rwLock.Unlock()
return nil
} }

+ 0
- 12
client2/internal/mount/vfs/cache/file_saver.go View File

@@ -1,12 +0,0 @@
package cache

import "os"

type FileSaver struct {
owner *CacheFile
fileHd *os.File
}

func (s *FileSaver) BeginSaving(seg *FileSegment) {

}

+ 0
- 117
client2/internal/mount/vfs/cache/file_segment.go View File

@@ -1,117 +0,0 @@
package cache

import (
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
)

type SegmentType int

const (
// 数据段刚被初始化
SegmentInit = iota
// 数据来自本地文件
SegmentLocal
// 数据来自远端文件,还未写入到本地文件
SegmentRemote
// 数据由用户写入,还未写入到本地文件
SegmentDirty
)

type FileSegment struct {
Position int64
// 有效数据的长度。不一定等于Buffer的长度
Length int64
Type SegmentType
// 文件数据缓冲区。不可对此缓冲区进行append操作!
Buffer []byte
// 当前段是否正在被保存到本地文件中
IsSaving bool
// 引用计数。当引用计数为0时,可以安全地删除此段
RefCount int
}

func (s *FileSegment) GetPosition() int64 {
return s.Position
}

func (s *FileSegment) GetLength() int64 {
return s.Length
}

func (s *FileSegment) SubSliceAbs(pos int64, len int64) []byte {
start := pos - s.Position
return s.Buffer[start : start+len]
}

// 将当前段拆分为两个段。当前段将持有第一个段,返回值持有第二个段
func (s *FileSegment) SplitAbs(pos int64) *FileSegment {
s2 := s.Buffer[pos-s.Position:]
s2Len := math2.Max(s.Position+s.Length-pos, 0)

s.Buffer = s.Buffer[:pos-s.Position]
s.Length = math2.Min(int64(len(s.Buffer)), s.Length)

return &FileSegment{
Position: pos,
Length: s2Len,
Type: s.Type,
Buffer: s2,
}
}

func (s *FileSegment) AvailableEnd() int64 {
return s.Position + s.Length
}

func (s *FileSegment) BufferEnd() int64 {
return s.Position + int64(len(s.Buffer))
}

func (s *FileSegment) EnlargeTo(pos int64) {
if pos > s.Position+s.Length {
s.Length = pos - s.Position
}
}

type SegmentBuffer struct {
// 正在使用的文件段缓冲区
buzys []*FileSegment
}

func (s *SegmentBuffer) BuzyCount() int {
return len(s.buzys)
}

func (s *SegmentBuffer) Segment(idx int) *FileSegment {
return s.buzys[idx]
}

func (s *SegmentBuffer) FirstContains(pos int64) int {
return FirstContains(s.buzys, pos)
}

// 将指定段插入到段缓存的恰当位置
func (s *SegmentBuffer) Insert(seg *FileSegment) {
index := s.FirstContains(seg.Position)

if index == -1 {
s.buzys = append([]*FileSegment{seg}, s.buzys...)
} else {
// index是最后一个小于Position的位置,所以要加1
s.buzys = lo2.Insert(s.buzys, index+1, seg)
}
}

// 插入一个段到指定索引位置。不会检查插入后Segments是否依然保持有序。
func (s *SegmentBuffer) InsertAt(index int, seg *FileSegment) {
s.buzys = lo2.Insert(s.buzys, index, seg)
}

func (s *SegmentBuffer) RemoveAt(index int) {
s.buzys = lo2.RemoveAt(s.buzys, index)
}

func (s *SegmentBuffer) Remove(seg *FileSegment) {
s.buzys = lo2.Remove(s.buzys, seg)
}

+ 0
- 36
client2/internal/mount/vfs/cache/file_segment_test.go View File

@@ -1,36 +0,0 @@
package cache

import (
"fmt"
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func Test_FindSegmentIndex(t *testing.T) {
cases := []struct {
title string
buffer SegmentBuffer
postions []int64
wants []int
}{
{
buffer: SegmentBuffer{
buzys: []*FileSegment{
{Position: 0}, {Position: 10},
},
},
postions: []int64{0, 2, 10, 11},
wants: []int{0, 0, 1, 1},
},
}

for i, c := range cases {
for j, _ := range c.postions {
Convey(fmt.Sprintf("%d.%d. %s", i, j, c.title), t, func() {
got := c.buffer.FirstContains(c.postions[j])
So(got, ShouldEqual, c.wants[j])
})
}
}
}

+ 332
- 0
client2/internal/mount/vfs/cache/range_test.go View File

@@ -0,0 +1,332 @@
package cache

import (
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func Test_FirstContainsIndex(t *testing.T) {
cases := []struct {
title string
arr []*Range
pos int64
want int
}{
{
"空数组",
[]*Range{},
100,
-1,
},
{
"只有一个元素,pos在范围内",
[]*Range{{100, 100}},
100,
0,
},
{
"区间不连续,但在范围内-奇数区间数",
[]*Range{{100, 100}, {300, 100}, {500, 100}},
350,
1,
},
{
"区间不连续,但在范围内-偶数区间数",
[]*Range{{100, 100}, {300, 100}},
350,
1,
},
{
"区间不连续,pos在最左边",
[]*Range{{100, 100}, {300, 100}, {500, 100}},
10,
-1,
},
{
"区间不连续,pos在靠左的第一个空洞中",
[]*Range{{100, 100}, {300, 100}, {500, 100}},
200,
0,
},
{
"区间不连续,pos在靠左的第一个空洞中-偶数区间数",
[]*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}},
200,
0,
},
{
"区间不连续,pos在最右边",
[]*Range{{100, 100}, {300, 100}, {500, 100}},
601,
2,
},
}

for _, c := range cases {
Convey(c.title, t, func() {
got := FirstContainsIndex(c.arr, c.pos)
So(got, ShouldEqual, c.want)
})
}
}

func Test_AddRange(t *testing.T) {
cases := []struct {
title string
arr []*Range
r *Range
want []*Range
}{
{
"空数组",
[]*Range{},
&Range{100, 100},
[]*Range{{100, 100}},
},
{
"单个区间,连接在左侧",
[]*Range{{100, 100}},
&Range{0, 100},
[]*Range{{0, 200}},
},
{
"单个区间,连接在右侧",
[]*Range{{100, 100}},
&Range{200, 100},
[]*Range{{100, 200}},
},
{
"单个区间,左边重叠了一部分",
[]*Range{{100, 100}},
&Range{50, 100},
[]*Range{{50, 150}},
},
{
"单个区间,右边重叠了一部分",
[]*Range{{100, 100}},
&Range{150, 100},
[]*Range{{100, 150}},
},
{
"单个区间,被新区间完全覆盖",
[]*Range{{100, 100}},
&Range{50, 150},
[]*Range{{50, 150}},
},
{
"单个区间,在左边但有空洞",
[]*Range{{100, 100}},
&Range{0, 50},
[]*Range{{0, 50}, {100, 100}},
},
{
"单个区间,在右边但有空洞",
[]*Range{{100, 100}},
&Range{250, 50},
[]*Range{{100, 100}, {250, 50}},
},
{
"恰好连接了两个区间",
[]*Range{{100, 100}, {300, 100}},
&Range{200, 100},
[]*Range{{100, 300}},
},
{
"连接了两个区间,但在左边有重叠",
[]*Range{{100, 100}, {300, 100}},
&Range{150, 150},
[]*Range{{100, 300}},
},
{
"连接了两个区间,但在右边有重叠",
[]*Range{{100, 100}, {300, 100}},
&Range{150, 200},
[]*Range{{100, 300}},
},
{
"覆盖了多个区间,但不是完全连接",
[]*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}},
&Range{150, 400},
[]*Range{{100, 500}, {700, 100}},
},
{
"完全覆盖所有区间",
[]*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}},
&Range{0, 900},
[]*Range{{0, 900}},
},
}

for _, c := range cases {
Convey(c.title, t, func() {
got := AddRange(c.arr, c.r)
So(got, ShouldResemble, c.want)
})
}
}

func Test_MergeRanges(t *testing.T) {
cases := []struct {
title string
arr1 []*Range
arr2 []*Range
want []*Range
}{
{
"两个都是空数组",
[]*Range{},
[]*Range{},
nil,
},
{
"其中一个是空数组",
[]*Range{{100, 100}},
[]*Range{},
[]*Range{{100, 100}},
},
{
"两个都是单个区间,没有重叠",
[]*Range{{100, 100}},
[]*Range{{300, 100}},
[]*Range{{100, 100}, {300, 100}},
},
{
"两个都是单个区间,恰好连接",
[]*Range{{100, 100}},
[]*Range{{200, 100}},
[]*Range{{100, 200}},
},
{
"两个都是单个区间,有重叠",
[]*Range{{100, 100}},
[]*Range{{150, 100}},
[]*Range{{100, 150}},
},
{
"多区间恰好连接",
[]*Range{{100, 100}, {300, 100}, {500, 100}},
[]*Range{{200, 100}, {400, 100}},
[]*Range{{100, 500}},
},
{
"多区间各种情况",
[]*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}},
[]*Range{{150, 100}, {260, 90}, {400, 100}, {550, 250}},
[]*Range{{100, 150}, {260, 540}},
},
}

for _, c := range cases {
Convey(c.title, t, func() {
got := MergeRanges(c.arr1, c.arr2)
So(got, ShouldResemble, c.want)
})
}
}

func Test_TruncateRange(t *testing.T) {
cases := []struct {
title string
arr []*Range
pos int64
want []*Range
}{
{
"空数组",
[]*Range{},
100,
[]*Range{},
},
{
"截断的长度比现有长",
[]*Range{{100, 100}},
300,
[]*Range{{100, 100}},
},
{
"截断的长度等于现有长",
[]*Range{{100, 100}},
200,
[]*Range{{100, 100}},
},
{
"截断的长度小于现有长",
[]*Range{{100, 100}, {300, 100}},
150,
[]*Range{{100, 50}},
},
{
"截断了所有",
[]*Range{{100, 100}, {300, 100}},
0,
[]*Range{},
},
{
"截断的位置在空洞内",
[]*Range{{100, 100}, {300, 100}},
250,
[]*Range{{100, 100}},
},
}

for _, c := range cases {
Convey(c.title, t, func() {
got := TruncateRange(c.arr, c.pos)
So(got, ShouldResemble, c.want)
})
}
}

func Test_DifferentRange(t *testing.T) {
cases := []struct {
title string
rng *Range
arr []*Range
want *Range
}{
{
"空数组",
&Range{100, 100},
[]*Range{},
&Range{100, 100},
},
{
"区间的右边被截断",
&Range{50, 150},
[]*Range{{100, 100}, {300, 100}},
&Range{50, 50},
},
{
"区间的左边被截断",
&Range{150, 150},
[]*Range{{100, 100}, {300, 100}},
&Range{200, 100},
},
{
"区间被彻底截断",
&Range{100, 100},
[]*Range{{100, 100}, {300, 100}},
&Range{200, 0},
},
{
"区间被截断成多份1",
&Range{0, 400},
[]*Range{{100, 100}, {300, 100}},
&Range{0, 100},
},
{
"区间被截断成多份2",
&Range{100, 300},
[]*Range{{100, 100}, {300, 100}},
&Range{200, 100},
},
}

for _, c := range cases {
Convey(c.title, t, func() {
DifferentRange(c.rng, c.arr)
So(c.rng, ShouldResemble, c.want)
})
}
}

+ 95
- 0
client2/internal/mount/vfs/cache/remote.go View File

@@ -0,0 +1,95 @@
package cache

import (
"io"
"time"
)

type RemoteLoader struct {
file *CacheFile
loaders []*OpenLoader
}

type OpenLoader struct {
reader io.ReadCloser
pos int64
lastUsedTime time.Time
}

func newRemoteLoader(file *CacheFile) *RemoteLoader {
return &RemoteLoader{
file: file,
loaders: make([]*OpenLoader, 2),
}
}

func (r *RemoteLoader) Load(p []byte, pos int64) (n int, err error) {
replaceIdx := -1
for i := 0; i < len(r.loaders); i++ {
loader := r.loaders[i]
if loader == nil {
replaceIdx = i
continue
}

if loader.pos == pos {
loader.lastUsedTime = time.Now()
n, err = io.ReadFull(loader.reader, p)
loader.pos += int64(n)
if err != nil {
loader.reader.Close()
r.loaders[i] = nil
}
return
}

if replaceIdx == -1 || r.loaders[replaceIdx] != nil && r.loaders[replaceIdx].lastUsedTime.After(loader.lastUsedTime) {
replaceIdx = i
}
}

if r.loaders[replaceIdx] != nil {
r.loaders[replaceIdx].reader.Close()
r.loaders[replaceIdx] = nil
}

loader, err := r.newLoader(pos)
if err != nil {
return 0, err
}
loader.lastUsedTime = time.Now()

r.loaders[replaceIdx] = loader
n, err = io.ReadFull(loader.reader, p)
loader.pos += int64(n)
if err != nil {
loader.reader.Close()
r.loaders[replaceIdx] = nil
}
return
}

func (r *RemoteLoader) Close() {
for i := 0; i < len(r.loaders); i++ {
if r.loaders[i] != nil {
r.loaders[i].reader.Close()
}
}
}

func (r *RemoteLoader) newLoader(pos int64) (*OpenLoader, error) {
detail, err := r.file.cache.db.Object().GetDetail(r.file.cache.db.DefCtx(), r.file.remoteObj.ObjectID)
if err != nil {
return nil, err
}

down, err := r.file.cache.downloader.DownloadObjectByDetail(detail, pos, -1)
if err != nil {
return nil, err
}

return &OpenLoader{
reader: down.File,
pos: pos,
}, nil
}

+ 143
- 12
client2/internal/mount/vfs/cache/utils.go View File

@@ -1,9 +1,11 @@
package cache package cache


type Ranger interface {
GetPosition() int64
GetLength() int64
}
import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sort2"
)


// 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况: // 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况:
// //
@@ -14,7 +16,7 @@ type Ranger interface {
// 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段 // 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段
// //
// 注:2、3情况返回的结果是相同的 // 注:2、3情况返回的结果是相同的
func FirstContains[T Ranger](arr []T, pos int64) int {
func FirstContainsIndex(arr []*Range, pos int64) int {
low, high := 0, len(arr)-1 low, high := 0, len(arr)-1


for low <= high { for low <= high {
@@ -22,17 +24,146 @@ func FirstContains[T Ranger](arr []T, pos int64) int {


if arr[mid].GetPosition() > pos { if arr[mid].GetPosition() > pos {
high = mid - 1 high = mid - 1
} else if arr[mid].GetPosition() < pos {
if mid == len(arr)-1 {
return mid
}
} else {
low = mid + 1
}
}

return low - 1
}


low = mid
func FirstContains(arr []*Range, pos int64) (*Range, bool) {
index := FirstContainsIndex(arr, pos)

var ret *Range
if index < 0 {
return nil, false
}

ret = arr[index]

if ret.End() <= pos {
return nil, false
}

return ret, true
}

func AddRange(arr []*Range, rng *Range) []*Range {
if len(arr) == 0 {
return []*Range{rng}
}

idx := FirstContainsIndex(arr, rng.GetPosition())
arr = lo2.Insert(arr, idx+1, rng)

lowerIdx := math2.Max(0, idx)
lowerRng := arr[lowerIdx]

for i := lowerIdx + 1; i < len(arr); i++ {
nextRng := arr[i]
if nextRng.GetPosition() > lowerRng.End() {
break
}

curRngEnd := lowerRng.End()
nextRngEnd := nextRng.End()
lowerRng.SetPosition(math2.Min(lowerRng.GetPosition(), nextRng.GetPosition()))
lowerRng.SetLength(math2.Max(curRngEnd, nextRngEnd) - lowerRng.GetPosition())

nextRng.SetLength(0)
}

return lo.Reject(arr, func(i *Range, idx int) bool {
return i.GetLength() == 0
})
}

func MergeRanges(arr []*Range, rngs []*Range) []*Range {
type Point struct {
pos int64
isStart bool
}


points := make([]Point, 0, (len(arr)+len(rngs))*2)

for _, rng := range arr {
points = append(points, Point{rng.GetPosition(), true})
points = append(points, Point{rng.GetPosition() + rng.GetLength(), false})
}

for _, rng := range rngs {
points = append(points, Point{rng.GetPosition(), true})
points = append(points, Point{rng.GetPosition() + rng.GetLength(), false})
}

points = sort2.Sort(points, func(a, b Point) int {
if a.pos == b.pos {
// 位置相同,则endpoint在后
return sort2.CmpBool(b.isStart, a.isStart)
}
return math2.Sign(a.pos - b.pos)
})

var merged []*Range
depth := 0
for _, p := range points {
if p.isStart {
depth++
if depth == 1 {
merged = append(merged, &Range{p.pos, 0})
}
} else { } else {
return mid
depth--
if depth == 0 {
lastIdx := len(merged) - 1
merged[lastIdx].Length = p.pos - merged[lastIdx].Position
}
} }
} }


return high
return merged
}

func TruncateRange(arr []*Range, length int64) []*Range {
if len(arr) == 0 {
return arr
}

idx := FirstContainsIndex(arr, length)
if idx < 0 {
return arr[:0]
}

rng := arr[idx]
if rng.End() > length {
rng.Length = length - rng.Position
}

if rng.Length <= 0 {
return arr[:idx]
}

return arr[:idx+1]
}

// 修剪指定的范围,使其仅包含在arr的空洞中。
// 如果范围会被切分成多个,那么就只会返回第一个切分段。
func DifferentRange(rng *Range, arr []*Range) {
rngStart := rng.Position
rngEnd := rng.End()

idx := FirstContainsIndex(arr, rng.Position)
if idx >= 0 {
leftRng := arr[idx]
rngStart = math2.Max(rng.Position, leftRng.End())
}

if idx+1 < len(arr) {
rightRng := arr[idx+1]
rngEnd = math2.Min(rngEnd, rightRng.Position)
}

rng.Position = rngStart
rng.Length = rngEnd - rngStart
} }

+ 231
- 3
client2/internal/mount/vfs/fuse_bucket.go View File

@@ -1,22 +1,250 @@
package vfs package vfs


import ( import (
"context"
"fmt"
"os"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gorm.io/gorm"
) )


type FuseBucket struct { type FuseBucket struct {
vfs *Vfs
name string
modTime time.Time
}

func newBucketFromCache(c cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir {
return &FuseBucket{
vfs: vfs,
name: c.PathComps[len(c.PathComps)-1],
modTime: c.ModTime,
}
}

func (r *FuseBucket) PathComps() []string {
return []string{r.name}
}

func (r *FuseBucket) Name() string {
return r.name
}

func (r *FuseBucket) Size() int64 {
return 0
}

func (r *FuseBucket) Mode() os.FileMode {
return os.ModeDir | 0755
}

func (r *FuseBucket) ModTime() time.Time {
return r.modTime
}

func (r *FuseBucket) IsDir() bool {
return true
}

func (r *FuseBucket) SetModTime(time time.Time) error {
dir := r.loadCacheDir()
if dir == nil {
return fuse.ErrNotExists
}

return dir.SetModTime(time)
}

// 如果不存在,应该返回ErrNotExists
func (r *FuseBucket) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
childPathComps := []string{r.name, name}
ca := r.vfs.cache.Stat(childPathComps)

if ca == nil {
// TODO UserID

pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.name, name)
if err == nil {
dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: pkg.CreateTime,
})
if dir == nil {
return nil, fuse.ErrNotExists
}

return newPackageFromCache(dir.Info(), r.vfs), nil
}

if err == gorm.ErrRecordNotFound {
return nil, fuse.ErrNotExists
}

return nil, err
}

if ca.IsDir {
return newPackageFromCache(*ca, r.vfs), nil
}

return newFileFromCache(*ca, r.vfs), nil
}

func (r *FuseBucket) Children(ctx context.Context) ([]fuse.FsEntry, error) {
return r.listChildren()
}

func (r *FuseBucket) ReadChildren() (fuse.DirReader, error) {
ens, err := r.listChildren()
if err != nil {
return nil, err
}

return newFuseDirReader(ens), nil
}

func (r *FuseBucket) listChildren() ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

infos := r.vfs.cache.StatMany([]string{r.name})

pkgs, err := r.vfs.db.Package().GetBucketPackagesByName(r.vfs.db.DefCtx(), r.name)
if err != nil {
return nil, err
}

pkgMap := make(map[string]*cdssdk.Package)
for _, pkg := range pkgs {
p := pkg
pkgMap[pkg.Name] = &p
}

for _, c := range infos {
delete(pkgMap, c.PathComps[len(c.PathComps)-1])

if c.IsDir {
ens = append(ens, newPackageFromCache(c, r.vfs))
} else {
ens = append(ens, newFileFromCache(c, r.vfs))
}
}

for _, pkg := range pkgMap {
dir := r.vfs.cache.LoadDir([]string{r.name, pkg.Name}, &cache.CreateDirOption{
ModTime: pkg.CreateTime,
})
if dir == nil {
continue
}

ens = append(ens, newPackageFromCache(dir.Info(), r.vfs))
}

return ens, nil
}

func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.CreateDir([]string{r.name, name})
if cache == nil {
return nil, fuse.ErrPermission
}

// TODO 用户ID,失败了可以打个日志
// TODO 生成系统事件
// 不关注创建是否成功,仅尝试一下
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
db := r.vfs.db
bkt, err := db.Bucket().GetByName(tx, name)
if err != nil {
return fmt.Errorf("get bucket: %v", err)
}

_, err = db.Package().Create(tx, bkt.BucketID, name)
if err != nil {
return fmt.Errorf("create package: %v", err)
}

return err
})

return newPackageFromCache(cache.Info(), r.vfs), nil
} }


func newBucketOrFileFromCache(c cache.CacheEntry, vfs *Vfs) fuse.FsEntry {
func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
cache := r.vfs.cache.CreateFile([]string{r.name, name})
if cache == nil {
return nil, fuse.ErrPermission
}
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)


if flags&uint32(os.O_WRONLY) != 0 {
hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
} }


func newBucketFromCache(c *cache.CacheDir, vfs *Vfs) fuse.FsDir {
func (r *FuseBucket) RemoveChild(ctx context.Context, name string) error {
err := r.vfs.cache.Remove([]string{r.name, name})
if err != nil {
return err
}

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db

pkg, err := d.Package().GetUserPackageByName(tx, 1, r.name, name)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil
}
}


return d.Package().DeleteComplete(tx, pkg.PackageID)
})

return nil
} }


func newBucketFromDB(bkt cdssdk.Bucket, vfs *Vfs) fuse.FsEntry {
func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {

// TODO 有问题


newParentNode := newParent.(FuseNode)
_, err := r.vfs.cache.Move([]string{r.name, oldName}, append(newParentNode.PathComps(), newName))
if err != nil {
return err
}

return nil
} }

func (r *FuseBucket) loadCacheDir() *cache.CacheDir {
var createOpt *cache.CreateDirOption
bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), r.name)
if err == nil {
createOpt = &cache.CreateDirOption{
ModTime: bkt.CreateTime,
}
}

return r.vfs.cache.LoadDir([]string{r.name}, createOpt)
}

var _ fuse.FsDir = (*FuseBucket)(nil)
var _ FuseNode = (*FuseBucket)(nil)

+ 301
- 0
client2/internal/mount/vfs/fuse_dir.go View File

@@ -0,0 +1,301 @@
package vfs

import (
"context"
"fmt"
"os"
"strings"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gorm.io/gorm"
)

type FuseDir struct {
vfs *Vfs
pathComps []string
modTime time.Time
}

func newDirFromCache(ch cache.CacheEntryInfo, vfs *Vfs) *FuseDir {
return &FuseDir{
vfs: vfs,
pathComps: ch.PathComps,
modTime: ch.ModTime,
}
}

func (r *FuseDir) PathComps() []string {
return r.pathComps
}

func (r *FuseDir) Name() string {
return r.pathComps[len(r.pathComps)-1]
}

func (r *FuseDir) Size() int64 {
return 0
}

func (r *FuseDir) Mode() os.FileMode {
return os.ModeDir | 0755
}

func (r *FuseDir) ModTime() time.Time {
return r.modTime
}

func (r *FuseDir) IsDir() bool {
return true
}

func (r *FuseDir) SetModTime(time time.Time) error {
dir := r.loadCacheDir()
if dir == nil {
return fuse.ErrNotExists
}

return dir.SetModTime(time)
}

// 如果不存在,应该返回ErrNotExists
func (r *FuseDir) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
childPathComps := append(lo2.ArrayClone(r.pathComps), name)
ca := r.vfs.cache.Stat(childPathComps)
if ca == nil {
var ret fuse.FsEntry

db := r.vfs.db
err := db.DoTx(func(tx db2.SQLContext) error {
pkg, err := db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1])
if err != nil {
return err
}

objPath := cdssdk.JoinObjectPath(childPathComps[2:]...)
obj, err := db.Object().GetByPath(tx, pkg.PackageID, objPath)
if err == nil {
ret = newFileFromObject(r.vfs, childPathComps, obj)
return nil
}
if err != gorm.ErrRecordNotFound {
return err
}

has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator)
if err != nil {
return err
}

if has {
dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if dir == nil {
return nil
}

ret = newDirFromCache(dir.Info(), r.vfs)
}

return nil
})
if err != nil {
return nil, err
}

if ret == nil {
return nil, fuse.ErrNotExists
}

return ret, nil
}

if ca.IsDir {
return newDirFromCache(*ca, r.vfs), nil
}

return newFileFromCache(*ca, r.vfs), nil
}

func (r *FuseDir) Children(ctx context.Context) ([]fuse.FsEntry, error) {
return r.listChildren()
}

func (r *FuseDir) ReadChildren() (fuse.DirReader, error) {
ens, err := r.listChildren()
if err != nil {
return nil, err
}

return newFuseDirReader(ens), nil
}

func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

infos := r.vfs.cache.StatMany(r.pathComps)

dbEntries := make(map[string]fuse.FsEntry)

db := r.vfs.db
db.DoTx(func(tx db2.SQLContext) error {
pkg, err := db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1])
if err != nil {
return err
}

objPath := cdssdk.JoinObjectPath(r.pathComps[2:]...)

coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator)
if err != nil {
return fmt.Errorf("getting common prefixes: %w", err)
}

objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator)
if err != nil {
return fmt.Errorf("getting direct children: %w", err)
}

for _, dir := range coms {
dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator)
pathComps := append(lo2.ArrayClone(r.pathComps), cdssdk.BaseName(dir))

cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if cd == nil {
continue
}

dbEntries[dir] = newDirFromCache(cd.Info(), r.vfs)
}

for _, obj := range objs {
pathComps := append(lo2.ArrayClone(r.pathComps), cdssdk.BaseName(obj.Path))
file := newFileFromObject(r.vfs, pathComps, obj)
dbEntries[file.Name()] = file
}

return nil
})

for _, c := range infos {
delete(dbEntries, c.PathComps[len(c.PathComps)-1])

if c.IsDir {
ens = append(ens, newDirFromCache(c, r.vfs))
} else {
ens = append(ens, newFileFromCache(c, r.vfs))
}
}

for _, e := range dbEntries {
ens = append(ens, e)
}

return ens, nil
}

func (r *FuseDir) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.CreateDir(append(lo2.ArrayClone(r.pathComps), name))
if cache == nil {
return nil, fuse.ErrPermission
}

return newDirFromCache(cache.Info(), r.vfs), nil
}

func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
cache := r.vfs.cache.CreateFile(append(lo2.ArrayClone(r.pathComps), name))
if cache == nil {
return nil, fuse.ErrPermission
}
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)

if flags&uint32(os.O_WRONLY) != 0 {
hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
}

func (r *FuseDir) RemoveChild(ctx context.Context, name string) error {
pathComps := append(lo2.ArrayClone(r.pathComps), name)
err := r.vfs.cache.Remove(pathComps)
if err != nil {
return err
}

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db
pkg, err := d.Package().GetUserPackageByName(tx, 1, pathComps[0], pathComps[1])
if err != nil {
return err
}

return d.Object().DeleteByPath(tx, pkg.PackageID, cdssdk.JoinObjectPath(pathComps[2:]...))
})

return nil
}

func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {

// TODO 有问题
oldPathComps := append(lo2.ArrayClone(r.pathComps), oldName)

newParentNode := newParent.(FuseNode)
_, err := r.vfs.cache.Move(oldPathComps, append(newParentNode.PathComps(), newName))
if err != nil {
return err
}

return nil
}

func (r *FuseDir) loadCacheDir() *cache.CacheDir {
var createOpt *cache.CreateDirOption

err := r.vfs.db.DoTx(func(tx db2.SQLContext) error {
pkg, err := r.vfs.db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1])
if err != nil {
return err
}

has, err := r.vfs.db.Object().HasObjectWithPrefix(tx, pkg.PackageID, cdssdk.JoinObjectPath(r.pathComps[2:]...))
if err != nil {
return err
}

if has {
createOpt = &cache.CreateDirOption{
ModTime: time.Now(),
}
}
return nil
})
if err != nil {
return nil
}

return r.vfs.cache.LoadDir(r.pathComps, createOpt)
}

var _ fuse.FsDir = (*FuseDir)(nil)
var _ FuseNode = (*FuseDir)(nil)

+ 37
- 26
client2/internal/mount/vfs/fuse_file.go View File

@@ -1,12 +1,14 @@
package vfs package vfs


import ( import (
"fmt"
"os" "os"
"time" "time"


cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gorm.io/gorm"
) )


type FuseFileNode struct { type FuseFileNode struct {
@@ -17,13 +19,23 @@ type FuseFileNode struct {
mode os.FileMode mode os.FileMode
} }


func newFile(vfs *Vfs, cache *cache.CacheFile) *FuseFileNode {
func newFileFromCache(info cache.CacheEntryInfo, vfs *Vfs) *FuseFileNode {
return &FuseFileNode{ return &FuseFileNode{
vfs: vfs, vfs: vfs,
pathComps: cache.PathComps(),
size: cache.Size(),
modTime: cache.ModTime(),
mode: cache.Mode(),
pathComps: info.PathComps,
size: info.Size,
modTime: info.ModTime,
mode: info.Mode,
}
}

func newFileFromObject(vfs *Vfs, pathComps []string, obj cdssdk.Object) *FuseFileNode {
return &FuseFileNode{
vfs: vfs,
pathComps: pathComps,
size: obj.Size,
modTime: obj.UpdateTime,
mode: os.FileMode(0755), // TODO Object元数据中是没有保存权限的
} }
} }


@@ -61,7 +73,6 @@ func (n *FuseFileNode) SetModTime(time time.Time) error {
if cacheFile == nil { if cacheFile == nil {
return fuse.ErrNotExists return fuse.ErrNotExists
} }
defer cacheFile.Release()


return cacheFile.SetModTime(time) return cacheFile.SetModTime(time)
} }
@@ -72,15 +83,14 @@ func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, error) {
// 如果文件不存在,也不进行创建,因为创建不应该调用这个接口 // 如果文件不存在,也不进行创建,因为创建不应该调用这个接口
return nil, fuse.ErrNotExists return nil, fuse.ErrNotExists
} }
defer cacheFile.Release()


if flags&uint32(os.O_WRONLY) != 0 {
hd := cacheFile.Open(false)
if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) {
hd := cacheFile.Open(true)
return newFileHandle(n, hd), nil return newFileHandle(n, hd), nil
} }


if flags&uint32(os.O_RDONLY) != 0 {
hd := cacheFile.Open(true)
if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) {
hd := cacheFile.Open(false)
return newFileHandle(n, hd), nil return newFileHandle(n, hd), nil
} }


@@ -88,24 +98,25 @@ func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, error) {
} }


func (n *FuseFileNode) loadCacheFile() *cache.CacheFile { func (n *FuseFileNode) loadCacheFile() *cache.CacheFile {
cdsObj := n.loadCDSObject()

if cdsObj != nil {
return n.vfs.cache.LoadOrCreateFile(n.pathComps, *cdsObj)
fmt.Printf("path: %v\n", n.pathComps)
if len(n.pathComps) <= 2 {
return n.vfs.cache.LoadFile(n.pathComps, nil)
} }


return n.vfs.cache.LoadFile(n.pathComps, false)
}

func (n *FuseFileNode) loadCDSObject() *cdssdk.Object {
if len(n.pathComps) >= 3 {
pkg, err := n.vfs.db.Package().GetUserPackageByName(n.vfs.db.DefCtx(), 1, n.pathComps[0], n.pathComps[1])
if err == nil {
obj, err := n.vfs.db.Object().GetByPath(n.vfs.db.DefCtx(), pkg.PackageID, cdssdk.JoinObjectPath(n.pathComps[2:]...))
if err == nil {
return &obj
}
cdsObj, err := n.vfs.db.Object().GetByFullPath(n.vfs.db.DefCtx(), n.pathComps[0], n.pathComps[1], cdssdk.JoinObjectPath(n.pathComps[2:]...))
if err == nil {
fmt.Printf("obj: %v\n", cdsObj)
file := n.vfs.cache.LoadFile(n.pathComps, &cdsObj)
if file == nil {
return nil
} }

return file
}

if err == gorm.ErrRecordNotFound {
fmt.Printf("not found\n")
return n.vfs.cache.LoadFile(n.pathComps, nil)
} }


return nil return nil


+ 282
- 0
client2/internal/mount/vfs/fuse_package.go View File

@@ -0,0 +1,282 @@
package vfs

import (
"context"
"fmt"
"os"
"strings"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gorm.io/gorm"
)

type FusePackage struct {
vfs *Vfs
bktName string
pkgName string
modTime time.Time
}

func newPackageFromCache(cache cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir {
pathComps := cache.PathComps
return &FusePackage{
vfs: vfs,
bktName: pathComps[0],
pkgName: pathComps[1],
modTime: cache.ModTime,
}
}

func (r *FusePackage) PathComps() []string {
return []string{r.bktName, r.pkgName}
}

func (r *FusePackage) Name() string {
return r.pkgName
}

func (r *FusePackage) Size() int64 {
return 0
}

func (r *FusePackage) Mode() os.FileMode {
return os.ModeDir | 0755
}

func (r *FusePackage) ModTime() time.Time {
return r.modTime
}

func (r *FusePackage) IsDir() bool {
return true
}

func (r *FusePackage) SetModTime(time time.Time) error {
dir := r.loadCacheDir()
if dir == nil {
return fuse.ErrNotExists
}

return dir.SetModTime(time)
}

// 如果不存在,应该返回ErrNotExists
func (r *FusePackage) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
childPathComps := []string{r.bktName, r.pkgName, name}
ca := r.vfs.cache.Stat(childPathComps)

if ca == nil {
var ret fuse.FsEntry

db := r.vfs.db
err := db.DoTx(func(tx db2.SQLContext) error {
pkg, err := db.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName)
if err != nil {
return err
}

obj, err := db.Object().GetByPath(tx, pkg.PackageID, name)
if err == nil {
ret = newFileFromObject(r.vfs, childPathComps, obj)
return nil
}
if err != gorm.ErrRecordNotFound {
return err
}

has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+cdssdk.ObjectPathSeparator)
if err != nil {
return err
}

if has {
dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if dir == nil {
return nil
}

ret = newDirFromCache(dir.Info(), r.vfs)
}

return nil
})
if err != nil {
return nil, err
}

if ret == nil {
return nil, fuse.ErrNotExists
}

return ret, nil
}

if ca.IsDir {
return newDirFromCache(*ca, r.vfs), nil
}

return newFileFromCache(*ca, r.vfs), nil
}

func (r *FusePackage) Children(ctx context.Context) ([]fuse.FsEntry, error) {
return r.listChildren()
}

func (r *FusePackage) ReadChildren() (fuse.DirReader, error) {
ens, err := r.listChildren()
if err != nil {
return nil, err
}

return newFuseDirReader(ens), nil
}

func (r *FusePackage) listChildren() ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

infos := r.vfs.cache.StatMany([]string{r.bktName, r.pkgName})

dbEntries := make(map[string]fuse.FsEntry)

db := r.vfs.db
db.DoTx(func(tx db2.SQLContext) error {
pkg, err := db.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName)
if err != nil {
return err
}

coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, "")
if err != nil {
return fmt.Errorf("getting common prefixes: %w", err)
}

objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, "")
if err != nil {
return fmt.Errorf("getting direct children: %w", err)
}

for _, dir := range coms {
dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator)
pathComps := []string{r.bktName, r.pkgName, dir}
cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{
ModTime: time.Now(),
})
if cd == nil {
continue
}

dbEntries[dir] = newDirFromCache(cd.Info(), r.vfs)
}

for _, obj := range objs {
file := newFileFromObject(r.vfs, []string{r.bktName, r.pkgName, obj.Path}, obj)
dbEntries[file.Name()] = file
}

return nil
})

for _, c := range infos {
delete(dbEntries, c.PathComps[len(c.PathComps)-1])

if c.IsDir {
ens = append(ens, newDirFromCache(c, r.vfs))
} else {
ens = append(ens, newFileFromCache(c, r.vfs))
}
}

for _, e := range dbEntries {
ens = append(ens, e)
}

return ens, nil
}

func (r *FusePackage) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.CreateDir([]string{r.bktName, r.pkgName, name})
if cache == nil {
return nil, fuse.ErrPermission
}

return newPackageFromCache(cache.Info(), r.vfs), nil
}

func (r *FusePackage) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
cache := r.vfs.cache.CreateFile([]string{r.bktName, r.pkgName, name})
if cache == nil {
return nil, fuse.ErrPermission
}
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)

if flags&uint32(os.O_WRONLY) != 0 {
hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
}

func (r *FusePackage) RemoveChild(ctx context.Context, name string) error {
err := r.vfs.cache.Remove([]string{r.bktName, r.pkgName, name})
if err != nil {
return err
}

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db

pkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName)
if err != nil {
return err
}

return d.Object().DeleteByPath(tx, pkg.PackageID, name)
})

return nil
}

func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {

// TODO 有问题

newParentNode := newParent.(FuseNode)
_, err := r.vfs.cache.Move([]string{r.bktName, r.pkgName, oldName}, append(newParentNode.PathComps(), newName))
if err != nil {
return err
}

return nil
}

func (r *FusePackage) loadCacheDir() *cache.CacheDir {
var createOpt *cache.CreateDirOption
pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.bktName, r.pkgName)
if err == nil {
createOpt = &cache.CreateDirOption{
ModTime: pkg.CreateTime,
}
}

return r.vfs.cache.LoadDir([]string{r.bktName, r.pkgName}, createOpt)
}

var _ fuse.FsDir = (*FusePackage)(nil)
var _ FuseNode = (*FusePackage)(nil)

+ 71
- 57
client2/internal/mount/vfs/fuse_root.go View File

@@ -5,8 +5,9 @@ import (
"os" "os"
"time" "time"


"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gorm.io/gorm" "gorm.io/gorm"
) )
@@ -15,6 +16,12 @@ type Root struct {
vfs *Vfs vfs *Vfs
} }


func newRoot(vfs *Vfs) *Root {
return &Root{
vfs: vfs,
}
}

func (r *Root) PathComps() []string { func (r *Root) PathComps() []string {
return []string{} return []string{}
} }
@@ -35,10 +42,6 @@ func (r *Root) ModTime() time.Time {
return time.Now() return time.Now()
} }


func (r *Root) CreateTime() time.Time {
return time.Now()
}

func (r *Root) IsDir() bool { func (r *Root) IsDir() bool {
return true return true
} }
@@ -49,15 +52,21 @@ func (r *Root) SetModTime(time time.Time) error {


// 如果不存在,应该返回ErrNotExists // 如果不存在,应该返回ErrNotExists
func (r *Root) Child(ctx context.Context, name string) (fuse.FsEntry, error) { func (r *Root) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
cache := r.vfs.cache.LoadAny([]string{name})
if cache != nil {
defer cache.Release()
ca := r.vfs.cache.Stat([]string{name})

if ca == nil {
bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), name)
if err == nil {
dir := r.vfs.cache.LoadDir([]string{name}, &cache.CreateDirOption{
ModTime: bkt.CreateTime,
})
if dir == nil {
return nil, fuse.ErrNotExists
}


return newBucketOrFileFromCache(cache, r.vfs), nil
}
return newBucketFromCache(dir.Info(), r.vfs), nil
}


bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), name)
if err != nil {
if err == gorm.ErrRecordNotFound { if err == gorm.ErrRecordNotFound {
return nil, fuse.ErrNotExists return nil, fuse.ErrNotExists
} }
@@ -65,97 +74,99 @@ func (r *Root) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
return nil, err return nil, err
} }


return newBucketFromDB(bkt, r.vfs), nil
if ca.IsDir {
return newBucketFromCache(*ca, r.vfs), nil
}

return newFileFromCache(*ca, r.vfs), nil
} }


func (r *Root) Children(ctx context.Context) ([]fuse.FsEntry, error) { func (r *Root) Children(ctx context.Context) ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

exists := make(map[string]bool)

caches := r.vfs.cache.LoadMany([]string{})
for _, c := range caches {
exists[c.Name()] = true
ens = append(ens, newBucketOrFileFromCache(c, r.vfs))
c.Release()
}
return r.listChildren()
}


remotes, err := r.vfs.db.Bucket().GetAll(r.vfs.db.DefCtx())
func (r *Root) ReadChildren() (fuse.DirReader, error) {
ens, err := r.listChildren()
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, remote := range remotes {
if !exists[remote.Name] {
ens = append(ens, newBucketFromDB(remote, r.vfs))
}
}


return ens, nil
return newFuseDirReader(ens), nil
} }


func (r *Root) ReadChildren() (fuse.DirReader, error) {
caches := r.vfs.cache.LoadMany([]string{})
func (r *Root) listChildren() ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

infos := r.vfs.cache.StatMany([]string{})

bkts, err := r.vfs.db.Bucket().GetAll(r.vfs.db.DefCtx()) bkts, err := r.vfs.db.Bucket().GetAll(r.vfs.db.DefCtx())
if err != nil { if err != nil {
return nil, err return nil, err
} }


exists := make(map[string]fuse.FsEntry)
for _, c := range caches {
exists[c.Name()] = newBucketOrFileFromCache(c, r.vfs)
c.Release()
bktMap := make(map[string]*cdssdk.Bucket)
for _, bkt := range bkts {
b := bkt
bktMap[bkt.Name] = &b
} }


for _, bkt := range bkts {
if _, ok := exists[bkt.Name]; !ok {
exists[bkt.Name] = newBucketFromDB(bkt, r.vfs)
for _, c := range infos {
delete(bktMap, c.PathComps[len(c.PathComps)-1])

if c.IsDir {
ens = append(ens, newBucketFromCache(c, r.vfs))
} else {
ens = append(ens, newFileFromCache(c, r.vfs))
} }
} }


return newFuseDirReader(lo.Values(exists)), nil
// 将远端目录同步到本地缓存中,防止在给目录中的远端对象创建本地缓存时,顺便创建的目录的元数据不对的情况
for _, bkt := range bktMap {
dir := r.vfs.cache.LoadDir([]string{bkt.Name}, &cache.CreateDirOption{
ModTime: bkt.CreateTime,
})

if dir == nil {
continue
}

ens = append(ens, newBucketFromCache(dir.Info(), r.vfs))
}

return ens, nil
} }


func (r *Root) NewDir(ctx context.Context, name string) (fuse.FsDir, error) { func (r *Root) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.LoadDir([]string{name}, true)
cache := r.vfs.cache.CreateDir([]string{name})
if cache == nil { if cache == nil {
return nil, fuse.ErrPermission return nil, fuse.ErrPermission
} }
defer cache.Release()


// TODO 用户ID,失败了可以打个日志 // TODO 用户ID,失败了可以打个日志
// TODO 生成系统事件 // TODO 生成系统事件
// 不关注创建是否成功,仅尝试一下 // 不关注创建是否成功,仅尝试一下
r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), 1, name)
r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), 1, name, cache.ModTime())


return newBucketFromCache(cache, r.vfs), nil
return newBucketFromCache(cache.Info(), r.vfs), nil
} }


func (r *Root) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) { func (r *Root) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
cache := r.vfs.cache.LoadFile([]string{name}, true)
cache := r.vfs.cache.CreateFile([]string{name})
if cache == nil { if cache == nil {
return nil, fuse.ErrPermission return nil, fuse.ErrPermission
} }
defer cache.Release()
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性 // 也有有FileHandle的计数保持cache的有效性


fileNode := newFile(r.vfs, cache)
fileNode := newFileFromCache(cache.Info(), r.vfs)


if flags&uint32(os.O_WRONLY) != 0 { if flags&uint32(os.O_WRONLY) != 0 {
hd, err := cache.Open(false)
if err != nil {
return nil, err
}

hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil return newFileHandle(fileNode, hd), nil
} }


if flags&uint32(os.O_RDONLY) != 0 { if flags&uint32(os.O_RDONLY) != 0 {
hd, err := cache.Open(true)
if err != nil {
return nil, err
}

hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil return newFileHandle(fileNode, hd), nil
} }


@@ -187,6 +198,9 @@ func (r *Root) RemoveChild(ctx context.Context, name string) error {
} }


func (r *Root) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error { func (r *Root) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {

// TODO 有问题

newParentNode := newParent.(FuseNode) newParentNode := newParent.(FuseNode)
_, err := r.vfs.cache.Move([]string{oldName}, append(newParentNode.PathComps(), newName)) _, err := r.vfs.cache.Move([]string{oldName}, append(newParentNode.PathComps(), newName))
if err != nil { if err != nil {


+ 17
- 1
client2/internal/mount/vfs/vfs.go View File

@@ -1,9 +1,13 @@
package vfs package vfs


import ( import (
"path/filepath"

"gitlink.org.cn/cloudream/storage/client2/internal/mount/config" "gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
) )


type Vfs struct { type Vfs struct {
@@ -12,6 +16,18 @@ type Vfs struct {
cache *cache.Cache cache *cache.Cache
} }


func (v *Vfs) Root() {
func NewVfs(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Vfs {
return &Vfs{
db: db,
config: cfg,
cache: cache.NewCache(db, downloader, filepath.Join(cfg.CacheDir, "data"), filepath.Join(cfg.CacheDir, "meta")),
}
}

func (v *Vfs) Root() fuse.FsDir {
return newRoot(v)
}


func (v *Vfs) Stats() fuse.FsStats {
return fuse.FsStats{}
} }

+ 7
- 0
client2/main.go View File

@@ -0,0 +1,7 @@
package main

import "gitlink.org.cn/cloudream/storage/client2/internal/cmd"

func main() {
cmd.RootCmd.Execute()
}

+ 8
- 7
common/pkgs/db2/bucket.go View File

@@ -3,6 +3,7 @@ package db2
import ( import (
"errors" "errors"
"fmt" "fmt"
"time"


"gorm.io/gorm" "gorm.io/gorm"


@@ -95,7 +96,7 @@ func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.B
return ret, err return ret, err
} }


func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string) (cdssdk.BucketID, error) {
func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string, createTime time.Time) (cdssdk.Bucket, error) {
var bucketID int64 var bucketID int64
err := ctx.Table("UserBucket"). err := ctx.Table("UserBucket").
Select("Bucket.BucketID"). Select("Bucket.BucketID").
@@ -104,24 +105,24 @@ func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName stri
Scan(&bucketID).Error Scan(&bucketID).Error


if err != nil { if err != nil {
return 0, err
return cdssdk.Bucket{}, err
} }


if bucketID > 0 { if bucketID > 0 {
return 0, gorm.ErrDuplicatedKey
return cdssdk.Bucket{}, gorm.ErrDuplicatedKey
} }


newBucket := cdssdk.Bucket{Name: bucketName, CreatorID: userID}
newBucket := cdssdk.Bucket{Name: bucketName, CreateTime: createTime, CreatorID: userID}
if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil { if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil {
return 0, fmt.Errorf("insert bucket failed, err: %w", err)
return cdssdk.Bucket{}, fmt.Errorf("insert bucket failed, err: %w", err)
} }


err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error
if err != nil { if err != nil {
return 0, fmt.Errorf("insert user bucket: %w", err)
return cdssdk.Bucket{}, fmt.Errorf("insert user bucket: %w", err)
} }


return newBucket.BucketID, nil
return newBucket, nil
} }


func (db *BucketDB) Rename(ctx SQLContext, bucketID cdssdk.BucketID, bucketName string) error { func (db *BucketDB) Rename(ctx SQLContext, bucketID cdssdk.BucketID, bucketName string) error {


+ 61
- 0
common/pkgs/db2/object.go View File

@@ -5,6 +5,7 @@ import (
"strings" "strings"
"time" "time"


"gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"


cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -33,6 +34,15 @@ func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path s
return ret, err return ret, err
} }


func (db *ObjectDB) GetByFullPath(ctx SQLContext, bktName string, pkgName string, path string) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").
Joins("join Package on Package.PackageID = Object.PackageID and Package.Name = ?", pkgName).
Joins("join Bucket on Bucket.BucketID = Package.BucketID and Bucket.Name = ?", bktName).
Where("Object.Path = ?", path).First(&ret).Error
return ret, err
}

func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) {
var ret []cdssdk.Object var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").Order("ObjectID ASC").Find(&ret).Error err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").Order("ObjectID ASC").Find(&ret).Error
@@ -77,6 +87,20 @@ func (db *ObjectDB) GetDirectChildren(ctx SQLContext, packageID cdssdk.PackageID
return ret, err return ret, err
} }


func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (bool, error) {
var obj cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").First(&obj).Error
if err == nil {
return true, nil
}

if err == gorm.ErrRecordNotFound {
return false, nil
}

return false, err
}

func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
if len(objectIDs) == 0 { if len(objectIDs) == 0 {
return make(map[cdssdk.ObjectID]bool), nil return make(map[cdssdk.ObjectID]bool), nil
@@ -124,6 +148,39 @@ func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID
return objs, nil return objs, nil
} }


func (db *ObjectDB) GetDetail(ctx SQLContext, objectID cdssdk.ObjectID) (stgmod.ObjectDetail, error) {
var obj cdssdk.Object
err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&obj).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting object: %w", err)
}

// 获取所有的 ObjectBlock
var allBlocks []stgmod.ObjectBlock
err = ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Order("`Index` ASC").Find(&allBlocks).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting all object blocks: %w", err)
}

// 获取所有的 PinnedObject
var allPinnedObjs []cdssdk.PinnedObject
err = ctx.Table("PinnedObject").Where("ObjectID = ?", objectID).Order("ObjectID ASC").Find(&allPinnedObjs).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting all pinned objects: %w", err)
}

pinnedAt := make([]cdssdk.StorageID, len(allPinnedObjs))
for i, po := range allPinnedObjs {
pinnedAt[i] = po.StorageID
}

return stgmod.ObjectDetail{
Object: obj,
Blocks: allBlocks,
PinnedAt: pinnedAt,
}, nil
}

// 仅返回查询到的对象 // 仅返回查询到的对象
func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) { func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) {
var objs []cdssdk.Object var objs []cdssdk.Object
@@ -394,3 +451,7 @@ func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error
} }

func (db *ObjectDB) DeleteByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) error {
return ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Delete(&cdssdk.Object{}).Error
}

+ 12
- 1
common/pkgs/db2/package.go View File

@@ -2,6 +2,7 @@ package db2


import ( import (
"fmt" "fmt"
"time"


"gorm.io/gorm" "gorm.io/gorm"


@@ -76,6 +77,16 @@ func (db *PackageDB) GetBucketPackages(ctx SQLContext, bucketID cdssdk.BucketID)
return ret, err return ret, err
} }


func (db *PackageDB) GetBucketPackagesByName(ctx SQLContext, bucketName string) ([]model.Package, error) {
var ret []model.Package
err := ctx.Table("Package").
Select("Package.*").
Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID").
Where("Bucket.Name = ?", bucketName).
Find(&ret).Error
return ret, err
}

// IsAvailable 判断一个用户是否拥有指定对象 // IsAvailable 判断一个用户是否拥有指定对象
func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) { func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) {
var pkgID cdssdk.PackageID var pkgID cdssdk.PackageID
@@ -133,7 +144,7 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin
return cdssdk.Package{}, gorm.ErrDuplicatedKey return cdssdk.Package{}, gorm.ErrDuplicatedKey
} }


newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal}
newPackage := cdssdk.Package{Name: name, BucketID: bucketID, CreateTime: time.Now(), State: cdssdk.PackageStateNormal}
if err := ctx.Create(&newPackage).Error; err != nil { if err := ctx.Create(&newPackage).Error; err != nil {
return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err) return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err)
} }


+ 12
- 2
common/pkgs/downloader/downloader.go View File

@@ -93,8 +93,18 @@ func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator {
return NewDownloadObjectIterator(d, req2s) return NewDownloadObjectIterator(d, req2s)
} }


func (d *Downloader) DownloadObjectByDetail(detail stgmod.ObjectDetail, off int64, length int64) (Downloading, error) {

func (d *Downloader) DownloadObjectByDetail(detail stgmod.ObjectDetail, off int64, length int64) (*Downloading, error) {
req2s := []downloadReqeust2{{
Detail: &detail,
Raw: DownloadReqeust{
ObjectID: detail.Object.ObjectID,
Offset: off,
Length: length,
},
}}

iter := NewDownloadObjectIterator(d, req2s)
return iter.MoveNext()
} }


func (d *Downloader) DownloadPackage(pkgID cdssdk.PackageID) DownloadIterator { func (d *Downloader) DownloadPackage(pkgID cdssdk.PackageID) DownloadIterator {


+ 2
- 5
coordinator/internal/mq/bucket.go View File

@@ -3,6 +3,7 @@ package mq
import ( import (
"errors" "errors"
"fmt" "fmt"
"time"


stgmod "gitlink.org.cn/cloudream/storage/common/models" stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/db2"
@@ -71,15 +72,11 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket
return fmt.Errorf("getting user by id: %w", err) return fmt.Errorf("getting user by id: %w", err)
} }


bucketID, err := svc.db2.Bucket().Create(tx, msg.UserID, msg.BucketName)
bucket, err = svc.db2.Bucket().Create(tx, msg.UserID, msg.BucketName, time.Now())
if err != nil { if err != nil {
return fmt.Errorf("creating bucket: %w", err) return fmt.Errorf("creating bucket: %w", err)
} }


bucket, err = svc.db2.Bucket().GetByID(tx, bucketID)
if err != nil {
return fmt.Errorf("getting bucket by id: %w", err)
}
return nil return nil
}) })
if err != nil { if err != nil {


+ 5
- 4
coordinator/internal/mq/object.go View File

@@ -67,10 +67,11 @@ func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetO
} }


if !msg.IsPrefix { if !msg.IsPrefix {
objs, err = svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
obj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
if err != nil { if err != nil {
return fmt.Errorf("getting object by path: %w", err) return fmt.Errorf("getting object by path: %w", err)
} }
objs = append(objs, obj)


return nil return nil
} }
@@ -770,9 +771,9 @@ func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjects
func (svc *Service) NewMultipartUploadObject(msg *coormq.NewMultipartUploadObject) (*coormq.NewMultipartUploadObjectResp, *mq.CodeMessage) { func (svc *Service) NewMultipartUploadObject(msg *coormq.NewMultipartUploadObject) (*coormq.NewMultipartUploadObjectResp, *mq.CodeMessage) {
var obj cdssdk.Object var obj cdssdk.Object
err := svc.db2.DoTx(func(tx db2.SQLContext) error { err := svc.db2.DoTx(func(tx db2.SQLContext) error {
oldObjs, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
if err == nil && len(oldObjs) > 0 {
obj = oldObjs[0]
oldObj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
if err == nil {
obj = oldObj
err := svc.db2.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID) err := svc.db2.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID)
if err != nil { if err != nil {
return fmt.Errorf("delete object blocks: %w", err) return fmt.Errorf("delete object blocks: %w", err)


+ 9
- 0
magefiles/main.go View File

@@ -117,6 +117,15 @@ func Client() error {
}) })
} }


func Client2() error {
return magefiles.Build(magefiles.BuildArgs{
OutputName: "client2",
OutputDir: "client2",
AssetsDir: "assets",
EntryFile: "client2/main.go",
})
}

func Coordinator() error { func Coordinator() error {
return magefiles.Build(magefiles.BuildArgs{ return magefiles.Build(magefiles.BuildArgs{
OutputName: "coordinator", OutputName: "coordinator",


Loading…
Cancel
Save