From 416c43f87baa8046f3a4b8e7d7b74e418f299666 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 21 Mar 2025 16:17:26 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96=E5=88=86=E6=AE=B5=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=9C=BA=E5=88=B6=EF=BC=9B=E6=8C=82=E8=BD=BD=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=98=BE=E7=A4=BA=E7=9B=AE=E5=BD=95=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client2/internal/cmd/cmd.go | 5 + client2/internal/cmd/mount.go | 105 ++++ client2/internal/config/config.go | 43 ++ client2/internal/mount/config/config.go | 1 + client2/internal/mount/fuse/dir_node.go | 29 +- client2/internal/mount/fuse/file_node.go | 7 + client2/internal/mount/fuse/fuse.go | 14 +- client2/internal/mount/fuse/types.go | 1 + client2/internal/mount/mount.go | 65 +- client2/internal/mount/mount_win.go | 53 ++ client2/internal/mount/vfs/cache/cache.go | 158 ++++- .../internal/mount/vfs/cache/data_loader.go | 37 -- client2/internal/mount/vfs/cache/dir.go | 103 +++- client2/internal/mount/vfs/cache/file.go | 577 ++++++++++++------ .../internal/mount/vfs/cache/file_saver.go | 12 - .../internal/mount/vfs/cache/file_segment.go | 117 ---- .../mount/vfs/cache/file_segment_test.go | 36 -- .../internal/mount/vfs/cache/range_test.go | 332 ++++++++++ client2/internal/mount/vfs/cache/remote.go | 95 +++ client2/internal/mount/vfs/cache/utils.go | 155 ++++- client2/internal/mount/vfs/fuse_bucket.go | 234 ++++++- client2/internal/mount/vfs/fuse_dir.go | 301 +++++++++ client2/internal/mount/vfs/fuse_file.go | 63 +- client2/internal/mount/vfs/fuse_package.go | 282 +++++++++ client2/internal/mount/vfs/fuse_root.go | 128 ++-- client2/internal/mount/vfs/vfs.go | 18 +- client2/main.go | 7 + common/pkgs/db2/bucket.go | 15 +- common/pkgs/db2/object.go | 61 ++ common/pkgs/db2/package.go | 13 +- common/pkgs/downloader/downloader.go | 14 +- coordinator/internal/mq/bucket.go | 7 +- coordinator/internal/mq/object.go | 9 +- magefiles/main.go | 9 + 34 files changed, 2553 insertions(+), 553 deletions(-) create mode 100644 client2/internal/cmd/cmd.go create mode 100644 client2/internal/cmd/mount.go create mode 100644 client2/internal/config/config.go create mode 100644 client2/internal/mount/mount_win.go delete mode 100644 client2/internal/mount/vfs/cache/data_loader.go delete mode 100644 client2/internal/mount/vfs/cache/file_saver.go delete mode 100644 client2/internal/mount/vfs/cache/file_segment.go delete mode 100644 client2/internal/mount/vfs/cache/file_segment_test.go create mode 100644 client2/internal/mount/vfs/cache/range_test.go create mode 100644 client2/internal/mount/vfs/cache/remote.go create mode 100644 client2/internal/mount/vfs/fuse_dir.go create mode 100644 client2/internal/mount/vfs/fuse_package.go create mode 100644 client2/main.go diff --git a/client2/internal/cmd/cmd.go b/client2/internal/cmd/cmd.go new file mode 100644 index 0000000..d79aea8 --- /dev/null +++ b/client2/internal/cmd/cmd.go @@ -0,0 +1,5 @@ +package cmd + +import "github.com/spf13/cobra" + +var RootCmd = cobra.Command{} diff --git a/client2/internal/cmd/mount.go b/client2/internal/cmd/mount.go new file mode 100644 index 0000000..889ad3a --- /dev/null +++ b/client2/internal/cmd/mount.go @@ -0,0 +1,105 @@ +package cmd + +import ( + "fmt" + "os" + "time" + + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage/client2/internal/config" + "gitlink.org.cn/cloudream/storage/client2/internal/mount" + mntcfg "gitlink.org.cn/cloudream/storage/client2/internal/mount/config" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" + agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/metacache" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" +) + +func init() { + var configPath string + + cmd := &cobra.Command{ + Use: "mount", + Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + mountCmd(args[0], configPath) + }, + } + cmd.Flags().StringVarP(&configPath, "config", "c", "", "path to config file") + + RootCmd.AddCommand(cmd) +} + +func mountCmd(mountPoint string, configPath string) { + err := config.Init(configPath) + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = logger.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) + } + + stgglb.InitLocal(&config.Cfg().Local) + stgglb.InitMQPool(config.Cfg().RabbitMQ) + stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) + stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) + stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) + + // 初始化存储服务管理器 + stgAgts := agtpool.NewPool() + + // 启动网络连通性检测,并就地检测一次 + conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil) + // conCol.CollectInPlace() + + // 初始化元数据缓存服务 + metacacheHost := metacache.NewHost() + go metacacheHost.Serve() + stgMeta := metacacheHost.AddStorageMeta() + hubMeta := metacacheHost.AddHubMeta() + conMeta := metacacheHost.AddConnectivity() + + // 初始化下载策略选择器 + strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) + + // 初始化下载器 + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) + + db, err := db2.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db2 failed, err: %s", err.Error()) + } + + mnt := mount.NewMount(&mntcfg.Config{ + CacheDir: "./cache", + MountPoint: mountPoint, + AttrTimeout: time.Second * 5, + }, db, &dlder) + + ch := mnt.Start() + for { + evt, err := ch.Receive() + if err != nil { + break + } + + switch e := evt.(type) { + case mount.MountingFailedEvent: + fmt.Println("mounting failed:", e.Err) + return + + case mount.MountExitEvent: + fmt.Printf("mount exit\n") + return + } + } +} diff --git a/client2/internal/config/config.go b/client2/internal/config/config.go new file mode 100644 index 0000000..c40b118 --- /dev/null +++ b/client2/internal/config/config.go @@ -0,0 +1,43 @@ +package config + +import ( + "gitlink.org.cn/cloudream/common/pkgs/distlock" + log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + c "gitlink.org.cn/cloudream/common/utils/config" + stgmodels "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" + db "gitlink.org.cn/cloudream/storage/common/pkgs/db2/config" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" + "gitlink.org.cn/cloudream/storage/common/pkgs/grpc" +) + +type Config struct { + ID cdssdk.HubID `json:"id"` + ListenAddr string `json:"listenAddr"` + Local stgmodels.LocalMachineInfo `json:"local"` + GRPC *grpc.Config `json:"grpc"` + Logger log.Config `json:"logger"` + RabbitMQ mq.Config `json:"rabbitMQ"` + DistLock distlock.Config `json:"distlock"` + Connectivity connectivity.Config `json:"connectivity"` + Downloader downloader.Config `json:"downloader"` + DownloadStrategy strategy.Config `json:"downloadStrategy"` + DB db.Config `json:"db"` +} + +var cfg Config + +func Init(path string) error { + if path == "" { + return c.DefaultLoad("client2", &cfg) + } + + return c.Load(path, &cfg) +} + +func Cfg() *Config { + return &cfg +} diff --git a/client2/internal/mount/config/config.go b/client2/internal/mount/config/config.go index e4c4664..32a0bd6 100644 --- a/client2/internal/mount/config/config.go +++ b/client2/internal/mount/config/config.go @@ -4,6 +4,7 @@ import "time" type Config struct { CacheDir string `json:"cacheDir"` + MountPoint string `json:"mountPoint"` GID uint32 `json:"gid"` UID uint32 `json:"uid"` AttrTimeout time.Duration `json:"attrTimeout"` diff --git a/client2/internal/mount/fuse/dir_node.go b/client2/internal/mount/fuse/dir_node.go index 88654d1..b7d8d82 100644 --- a/client2/internal/mount/fuse/dir_node.go +++ b/client2/internal/mount/fuse/dir_node.go @@ -9,6 +9,7 @@ import ( fusefs "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" + "gitlink.org.cn/cloudream/common/pkgs/logger" ) type DirNode struct { @@ -21,12 +22,16 @@ func newDirNode(fs *Fuse, dir FsDir) *DirNode { } func (n *DirNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno { + logger.Tracef("DirNode.Getattr: %v", n.dir.Name()) + n.fs.fillAttrOut(n.dir, out) return 0 } // Setattr sets attributes for an Inode. func (n *DirNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) { + logger.Tracef("DirNode.Setattr: %v", n.dir.Name()) + n.fs.fillAttrOut(n.dir, out) _, ok := in.GetSize() @@ -55,6 +60,8 @@ const accessModeMask = (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) // Open opens an Inode (of regular file type) for reading. It // is optional but recommended to return a FileHandle. func (n *DirNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) { + logger.Tracef("DirNode.Open: %v, %#o", n.dir.Name(), flags) + rdwrMode := int(flags) & accessModeMask if rdwrMode != os.O_RDONLY { return nil, 0, syscall.EPERM @@ -71,6 +78,8 @@ func (n *DirNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, var _ = (fusefs.NodeOpener)((*DirNode)(nil)) func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (inode *fusefs.Inode, errno syscall.Errno) { + logger.Tracef("DirNode.Lookup: %v, %v", n.dir.Name(), name) + child, err := n.dir.Child(ctx, name) if err != nil { return nil, translateError(err) @@ -81,7 +90,7 @@ func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) ( node := newDirNode(n.fs, child) n.fs.fillEntryOut(child, out) - return node.NewInode(ctx, node, fusefs.StableAttr{ + return n.NewInode(ctx, node, fusefs.StableAttr{ Mode: out.Attr.Mode, }), 0 @@ -89,7 +98,7 @@ func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) ( node := newFileNode(n.fs, child) n.fs.fillEntryOut(child, out) - return node.NewInode(ctx, node, fusefs.StableAttr{ + return n.NewInode(ctx, node, fusefs.StableAttr{ Mode: out.Attr.Mode, }), 0 @@ -133,6 +142,8 @@ func (s *dirStream) Close() { } func (n *DirNode) Readdir(ctx context.Context) (ds fusefs.DirStream, errno syscall.Errno) { + logger.Tracef("DirNode.Readdir: %v", n.dir.Name()) + reader, err := n.dir.ReadChildren() if err != nil { return nil, translateError(err) @@ -144,6 +155,8 @@ func (n *DirNode) Readdir(ctx context.Context) (ds fusefs.DirStream, errno sysca var _ = (fusefs.NodeReaddirer)((*DirNode)(nil)) func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse.EntryOut) (inode *fusefs.Inode, errno syscall.Errno) { + logger.Tracef("DirNode.Mkdir: %v, %v, %#o", n.dir.Name(), name, mode) + newDir, err := n.dir.NewDir(ctx, name) if err != nil { return nil, translateError(err) @@ -152,7 +165,7 @@ func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse node := newDirNode(n.fs, newDir) n.fs.fillEntryOut(newDir, out) - return node.NewInode(ctx, node, fusefs.StableAttr{ + return n.NewInode(ctx, node, fusefs.StableAttr{ Mode: out.Attr.Mode, }), 0 } @@ -160,6 +173,8 @@ func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse var _ = (fusefs.NodeMkdirer)((*DirNode)(nil)) func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (node *fusefs.Inode, fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) { + logger.Tracef("DirNode.Create: %v, %v, %#o, %#o", n.dir.Name(), name, flags, mode) + hd, err := n.dir.NewFile(ctx, name, flags) if err != nil { return nil, nil, 0, translateError(err) @@ -168,7 +183,7 @@ func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode ui n.fs.fillEntryOut(hd.Entry(), out) fileNode := newFileNode(n.fs, hd.Entry()) - return fileNode.NewInode(ctx, fileNode, fusefs.StableAttr{ + return n.NewInode(ctx, fileNode, fusefs.StableAttr{ Mode: out.Attr.Mode, }), hd, 0, 0 } @@ -179,6 +194,8 @@ var _ = (fusefs.NodeCreater)((*DirNode)(nil)) // return status is OK, the Inode is removed as child in the // FS tree automatically. Default is to return EROFS. func (n *DirNode) Unlink(ctx context.Context, name string) (errno syscall.Errno) { + logger.Tracef("DirNode.Unlink: %v, %v", n.dir.Name(), name) + return translateError(n.dir.RemoveChild(ctx, name)) } @@ -187,12 +204,16 @@ var _ = (fusefs.NodeUnlinker)((*DirNode)(nil)) // Rmdir is like Unlink but for directories. // Default is to return EROFS. func (n *DirNode) Rmdir(ctx context.Context, name string) (errno syscall.Errno) { + logger.Tracef("DirNode.Rmdir: %v, %v", n.dir.Name(), name) + return translateError(n.dir.RemoveChild(ctx, name)) } var _ = (fusefs.NodeRmdirer)((*DirNode)(nil)) func (n *DirNode) Rename(ctx context.Context, oldName string, newParent fusefs.InodeEmbedder, newName string, flags uint32) (errno syscall.Errno) { + logger.Tracef("DirNode.Rename: %v/%v->%v, %#o", n.dir.Name(), oldName, newName, flags) + newParentNode, ok := newParent.(*DirNode) if !ok { return syscall.ENOTDIR diff --git a/client2/internal/mount/fuse/file_node.go b/client2/internal/mount/fuse/file_node.go index 21b3b2d..fa0bfb0 100644 --- a/client2/internal/mount/fuse/file_node.go +++ b/client2/internal/mount/fuse/file_node.go @@ -8,6 +8,7 @@ import ( fusefs "github.com/hanwen/go-fuse/v2/fs" "github.com/hanwen/go-fuse/v2/fuse" + "gitlink.org.cn/cloudream/common/pkgs/logger" ) type FileNode struct { @@ -23,11 +24,15 @@ func newFileNode(fs *Fuse, file FsFile) *FileNode { } func (n *FileNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno { + logger.Tracef("FileNode.Getattr: %v", n.file.Name()) + n.fs.fillAttrOut(n.file, out) return 0 } func (n *FileNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) { + logger.Tracef("FileNode.Setattr: %v", n.file.Name()) + n.fs.fillAttrOut(n.file, out) size, ok := in.GetSize() @@ -55,6 +60,8 @@ func (n *FileNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.Se var _ = (fusefs.NodeSetattrer)((*FileNode)(nil)) func (n *FileNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) { + logger.Tracef("FileNode.Open: %v, %#o", n.file.Name(), flags) + // 只支持以下标志: // os.O_RDONLY // os.O_WRONLY diff --git a/client2/internal/mount/fuse/fuse.go b/client2/internal/mount/fuse/fuse.go index 7d2bb92..1d45b1e 100644 --- a/client2/internal/mount/fuse/fuse.go +++ b/client2/internal/mount/fuse/fuse.go @@ -12,17 +12,19 @@ import ( ) type Fuse struct { - fs Fs - rootDir FsDir - config *config.Config + config *config.Config + fs Fs } -func NewFuse(fs Fs, rootDir FsDir, config *config.Config) *Fuse { - return &Fuse{fs: fs, rootDir: rootDir, config: config} +func NewFuse(config *config.Config, fs Fs) *Fuse { + return &Fuse{ + config: config, + fs: fs, + } } func (v *Fuse) Root() fusefs.InodeEmbedder { - return newDirNode(v, v.rootDir) + return newDirNode(v, v.fs.Root()) } func translateError(err error) syscall.Errno { diff --git a/client2/internal/mount/fuse/types.go b/client2/internal/mount/fuse/types.go index 8c987e3..28f46be 100644 --- a/client2/internal/mount/fuse/types.go +++ b/client2/internal/mount/fuse/types.go @@ -16,6 +16,7 @@ var ( type Fs interface { Stats() FsStats + Root() FsDir } type FsStats struct { diff --git a/client2/internal/mount/mount.go b/client2/internal/mount/mount.go index d217fbf..1f5dc46 100644 --- a/client2/internal/mount/mount.go +++ b/client2/internal/mount/mount.go @@ -1,11 +1,70 @@ +//go:build linux || (darwin && amd64) + package mount -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +import ( + fusefs "github.com/hanwen/go-fuse/v2/fs" + "github.com/hanwen/go-fuse/v2/fuse" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sync2" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/config" + fuse2 "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" +) + +type MountEvent interface { + IsMountEvent() bool +} + +type MountExitEvent struct { + MountEvent +} + +type MountingFailedEvent struct { + MountEvent + Err error +} + +type Mount struct { + cfg *config.Config + vfs *vfs.Vfs + fuse *fuse2.Fuse +} + +func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount { + vfs := vfs.NewVfs(cfg, db, downloader) + fuse := fuse2.NewFuse(cfg, vfs) + + return &Mount{ + cfg: cfg, + vfs: vfs, + fuse: fuse, + } +} + +func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] { + ch := sync2.NewUnboundChannel[MountEvent]() + go func() { -type Mount struct{} + nodeFsOpt := &fusefs.Options{ + MountOptions: fuse.MountOptions{ + FsName: "CDS", + }, + } + rawFs := fusefs.NewNodeFS(m.fuse.Root(), nodeFsOpt) -func (m *Mount) Start() { + svr, err := fuse.NewServer(rawFs, m.cfg.MountPoint, &nodeFsOpt.MountOptions) + if err != nil { + ch.Send(MountingFailedEvent{Err: err}) + return + } + svr.Serve() + ch.Send(MountExitEvent{}) + }() + return ch } func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) { diff --git a/client2/internal/mount/mount_win.go b/client2/internal/mount/mount_win.go new file mode 100644 index 0000000..18c2e87 --- /dev/null +++ b/client2/internal/mount/mount_win.go @@ -0,0 +1,53 @@ +//go:build windows + +package mount + +import ( + "fmt" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sync2" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/config" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" +) + +type MountEvent interface { + IsMountEvent() bool +} + +type MountExitEvent struct { + MountEvent +} + +type MountingFailedEvent struct { + MountEvent + Err error +} + +type Mount struct { +} + +func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount { + return &Mount{} +} + +func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] { + ch := sync2.NewUnboundChannel[MountEvent]() + go func() { + ch.Send(MountingFailedEvent{Err: fmt.Errorf("not implemented")}) + }() + return ch +} + +func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) { + +} + +func (m *Mount) NotifyPackageInvalid(pkg cdssdk.Package) { + +} + +func (m *Mount) NotifyBucketInvalid(bkt cdssdk.Bucket) { + +} diff --git a/client2/internal/mount/vfs/cache/cache.go b/client2/internal/mount/vfs/cache/cache.go index b7a11f7..2ad1113 100644 --- a/client2/internal/mount/vfs/cache/cache.go +++ b/client2/internal/mount/vfs/cache/cache.go @@ -1,11 +1,17 @@ package cache import ( + "fmt" "os" "path/filepath" + "time" + "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) type CacheEntry interface { @@ -13,14 +19,33 @@ type CacheEntry interface { // 在虚拟文件系统中的路径,即不包含缓存目录的路径 PathComps() []string // 不再使用本缓存条目 - Release() + // Release() +} + +type CacheEntryInfo struct { + PathComps []string + Size int64 + Mode os.FileMode + ModTime time.Time + IsDir bool } type Cache struct { + db *db2.DB + downloader *downloader.Downloader cacheDataDir string cacheMetaDir string } +func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache { + return &Cache{ + db: db, + downloader: downloader, + cacheDataDir: cacheDataDir, + cacheMetaDir: cacheMetaDir, + } +} + func (c *Cache) GetCacheDataPath(comps ...string) string { comps2 := make([]string, len(comps)+1) comps2[0] = c.cacheDataDir @@ -49,59 +74,146 @@ func (c *Cache) GetCacheMetaPathComps(comps ...string) []string { return comps2 } -// 加载指定路径的缓存文件或者目录,如果路径不存在,则返回nil。 -func (c *Cache) LoadAny(pathComps []string) CacheEntry { - pat := c.GetCacheMetaPath(pathComps...) - info, err := os.Stat(pat) +// 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。 +func (c *Cache) Stat(pathComps []string) *CacheEntryInfo { + metaPath := c.GetCacheMetaPath(pathComps...) + stat, err := os.Stat(metaPath) if err != nil { // TODO 日志记录 return nil } - if info.IsDir() { - return newCacheDir(pathComps, info) + if stat.IsDir() { + info, err := loadCacheDirInfo(c, pathComps) + if err != nil { + return nil + } + + return info + } + + info, err := loadCacheFileInfo(c, pathComps) + if err != nil { + return nil } - file, err := loadCacheFile(pathComps, pat) + return info +} + +// 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。 +func (c *Cache) CreateFile(pathComps []string) *CacheFile { + ch, err := createNewCacheFile(c, pathComps) if err != nil { // TODO 日志记录 return nil } - return file + return ch } -// 加载指定缓存文件,如果文件不存在,则根据create参数决定是否创建文件。 -// -// 如果create为false,且文件不存在,则返回nil。如果目标位置是一个目录,则也会返回nil。 -func (c *Cache) LoadFile(pathComps []string, create bool) *CacheFile { +// 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。 +func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { + ch, err := loadCacheFile(c, pathComps) + if err == nil { + ch.remoteObj = obj + return ch + } + if !os.IsNotExist(err) { + // TODO 日志记录 + logger.Tracef("load cache file: %v", err) + return nil + } + + if obj == nil { + return nil + } + + ch, err = makeCacheFileFromObject(c, pathComps, obj) + if err != nil { + // TODO 日志记录 + logger.Tracef("make cache file from object: %v", err) + return nil + } + return ch } -// 加载指定缓存文件,如果文件不存在,则根据obj参数创建缓存文件。 -func (c *Cache) LoadOrCreateFile(pathComps []string, obj cdssdk.Object) *CacheFile { +// 创建一个缓存目录。如果目录已经存在,则会重置目录属性。如果加载过程中发生了错误,或者目标位置是一个文件,则会返回nil +func (c *Cache) CreateDir(pathComps []string) *CacheDir { + ch, err := createNewCacheDir(c, pathComps) + if err != nil { + // TODO 日志记录 + return nil + } + return ch +} +type CreateDirOption struct { + ModTime time.Time } -// 加载指定缓存目录,如果目录不存在,则根据create参数决定是否创建目录。 -// -// 如果create为false,且目录不存在,则返回nil。如果目标位置是一个文件,则也会返回nil。 -func (c *Cache) LoadDir(pathComps []string, create bool) *CacheDir { +// 加载指定缓存目录,如果目录不存在,则使用createOpt选项创建目录,而如果createOpt为nil,那么会返回nil。 +func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir { + ch, err := loadCacheDir(c, pathComps) + if err == nil { + return ch + } + if !os.IsNotExist(err) { + // TODO 日志记录 + return nil + } + + if createOpt == nil { + return nil + } + + // 创建目录 + ch, err = makeCacheDirFromOption(c, pathComps, *createOpt) + if err != nil { + // TODO 日志记录 + return nil + } + return ch } -// 加载指定路径下所有的缓存文件或者目录,如果路径不存在,则返回nil。 -func (c *Cache) LoadMany(pathComps []string) []CacheEntry { +// 加载指定路径下的所有缓存条目信息 +func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo { + osEns, err := os.ReadDir(c.GetCacheMetaPath(pathComps...)) + if err != nil { + return nil + } + + var infos []CacheEntryInfo + for _, e := range osEns { + if e.IsDir() { + info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name())) + if err != nil { + continue + } + + infos = append(infos, *info) + } else { + info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name())) + if err != nil { + continue + } + + infos = append(infos, *info) + } + } + + return infos } // 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。 func (c *Cache) Remove(pathComps []string) error { - + return fmt.Errorf("not implemented") } // 移动指定路径的缓存文件或目录到新的路径。如果目标路径已经存在,则会报错。 // // 如果移动成功,则返回移动后的缓存文件或目录。如果文件或目录不存在,则返回nil。 func (c *Cache) Move(pathComps []string, newPathComps []string) (CacheEntry, error) { - + return nil, fmt.Errorf("not implemented") } diff --git a/client2/internal/mount/vfs/cache/data_loader.go b/client2/internal/mount/vfs/cache/data_loader.go deleted file mode 100644 index cd35c8c..0000000 --- a/client2/internal/mount/vfs/cache/data_loader.go +++ /dev/null @@ -1,37 +0,0 @@ -package cache - -import ( - "io" - "os" - - stgmod "gitlink.org.cn/cloudream/storage/common/models" -) - -type LocalLoader struct { - cache *CacheFile - file *os.File - offset int64 - // 当前正在加载的数据段。为nil表示没有加载任何数据。 - loading *FileSegment - // 最多加载到哪个位置。-1代表无限制。 - limit int64 -} - -// 启动数据加载。通过设置seg的Position和Length来指定要加载的数据范围。 -func (l *LocalLoader) BeginLoading(seg *FileSegment) { - -} - -type RemoteLoader struct { - cache *CacheFile - object stgmod.ObjectDetail - reader io.ReadCloser - offset int64 - // 当前正在加载的数据段。为nil表示没有加载任何数据。 - loading *FileSegment - // 最多加载到哪个位置。-1代表无限制。 - limit int64 -} - -func (l *RemoteLoader) BeginLoading(seg *FileSegment) { -} diff --git a/client2/internal/mount/vfs/cache/dir.go b/client2/internal/mount/vfs/cache/dir.go index 6e7788f..4467b2f 100644 --- a/client2/internal/mount/vfs/cache/dir.go +++ b/client2/internal/mount/vfs/cache/dir.go @@ -1,6 +1,7 @@ package cache import ( + "fmt" "os" "time" ) @@ -12,13 +13,90 @@ type CacheDir struct { perm os.FileMode } -func newCacheDir(pathComps []string, info os.FileInfo) *CacheDir { +func createNewCacheDir(c *Cache, pathComps []string) (*CacheDir, error) { + metaPath := c.GetCacheMetaPath(pathComps...) + dataPath := c.GetCacheDataPath(pathComps...) + + err := os.MkdirAll(metaPath, 0755) + if err != nil { + return nil, err + } + + err = os.MkdirAll(dataPath, 0755) + if err != nil { + return nil, err + } + + // 修改文件夹的修改时间 + modTime := time.Now() + os.Chtimes(metaPath, modTime, modTime) + + return &CacheDir{ + pathComps: pathComps, + name: pathComps[len(pathComps)-1], + modTime: modTime, + perm: 0755, + }, nil +} + +func loadCacheDir(c *Cache, pathComps []string) (*CacheDir, error) { + stat, err := os.Stat(c.GetCacheMetaPath(pathComps...)) + if err != nil { + return nil, err + } + + if !stat.IsDir() { + return nil, fmt.Errorf("not a directory") + } + + return &CacheDir{ + pathComps: pathComps, + name: stat.Name(), + modTime: stat.ModTime(), + perm: stat.Mode().Perm(), + }, nil +} + +func makeCacheDirFromOption(c *Cache, pathComps []string, opt CreateDirOption) (*CacheDir, error) { + metaPath := c.GetCacheMetaPath(pathComps...) + dataPath := c.GetCacheDataPath(pathComps...) + + err := os.MkdirAll(metaPath, 0755) + if err != nil { + return nil, err + } + + err = os.MkdirAll(dataPath, 0755) + if err != nil { + return nil, err + } + + os.Chtimes(metaPath, opt.ModTime, opt.ModTime) return &CacheDir{ pathComps: pathComps, - name: info.Name(), - modTime: info.ModTime(), - perm: info.Mode().Perm(), + name: pathComps[len(pathComps)-1], + modTime: opt.ModTime, + perm: 0755, + }, nil +} + +func loadCacheDirInfo(c *Cache, pathComps []string) (*CacheEntryInfo, error) { + stat, err := os.Stat(c.GetCacheMetaPath(pathComps...)) + if err != nil { + return nil, err + } + + if !stat.IsDir() { + return nil, fmt.Errorf("not a directory") } + + return &CacheEntryInfo{ + PathComps: pathComps, + Size: 0, + Mode: stat.Mode(), + ModTime: stat.ModTime(), + IsDir: true, + }, nil } func (f *CacheDir) PathComps() []string { @@ -45,6 +123,21 @@ func (f *CacheDir) IsDir() bool { return true } -func (f *CacheDir) Release() { +func (f *CacheDir) Info() CacheEntryInfo { + return CacheEntryInfo{ + PathComps: f.pathComps, + Size: 0, + Mode: f.perm, + ModTime: f.modTime, + IsDir: true, + } +} +func (f *CacheDir) SetModTime(modTime time.Time) error { + // TODO 修改文件夹的修改时间 + return nil } + +// func (f *CacheDir) Release() { + +// } diff --git a/client2/internal/mount/vfs/cache/file.go b/client2/internal/mount/vfs/cache/file.go index e574e01..0480200 100644 --- a/client2/internal/mount/vfs/cache/file.go +++ b/client2/internal/mount/vfs/cache/file.go @@ -1,50 +1,51 @@ package cache import ( - "context" - "io" + "fmt" "os" - "path/filepath" "sync" "time" - "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" - "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) type FileInfo struct { // 文件总大小。可能会超过对应的远端文件的大小。 - // 此大小可能与本地缓存文件大小也不用,需要定时将本地缓存文件大小修正到与这个值相同。 + // 此大小可能与本地缓存文件大小也不同,需要定时将本地缓存文件大小修正到与这个值相同。 Size int64 // 本文件是否有未提交的修改 Dirty bool // 数据段列表,按照段开始位置从小到大排列 - Segments []Range + Segments []*Range // 文件对应的对象ID,仅在文件是一个缓存文件时才有值 - ObjectID cdssdk.ObjectID + // ObjectID cdssdk.ObjectID // 文件对应的对象大小,仅在文件是一个缓存文件时才有值。 // 此值代表有多少数据应该从远端加载,所以可能会小于远端实际大小 ObjectSize int64 // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过 - Hash string + // Hash cdssdk.FileHash // 文件的最后修改时间 ModTime time.Time // 文件的权限 Perm os.FileMode } -// 返回值代表文件大小是否发生了变化 -func (f *FileInfo) EnlargeTo(size int64) bool { - if size > f.Size { - f.Size = size - return true +func (f *FileInfo) Clone() FileInfo { + n := *f + n.Segments = make([]*Range, len(f.Segments)) + for i, seg := range f.Segments { + n.Segments[i] = &Range{ + Position: seg.Position, + Length: seg.Length, + } } - return false + + return n } type Range struct { @@ -56,69 +57,180 @@ func (r *Range) GetPosition() int64 { return r.Position } +func (r *Range) SetPosition(pos int64) { + r.Position = pos +} + func (r *Range) GetLength() int64 { return r.Length } -type ReadRequest struct { - Position int64 - Length int64 - Callback *future.SetVoidFuture - Zeros int64 // 如果>0,那么说明读到了一块全0的数据,值的大小代表这块数据的长度,此时Segment字段为nil - Segment *FileSegment +func (r *Range) SetLength(length int64) { + r.Length = length } -func (r *ReadRequest) GetPosition() int64 { - return r.Position +func (r *Range) End() int64 { + return r.Position + r.Length } -func (r *ReadRequest) GetLength() int64 { - return r.Length +// 所有读写过程共用同一个CacheFile对象。 +// 不应该将此结构体保存到对象中 +type CacheFile struct { + cache *Cache + pathComps []string + name string + info FileInfo + remoteObj *cdssdk.Object + infoRev int64 + rwLock *sync.RWMutex + readers []*CacheFileReadWriter + writers []*CacheFileReadWriter + backgroundChan chan any + + localFile *os.File + writeLock *sync.RWMutex } -type WriteReqeust struct { - Position int64 - Length int64 - Callback *future.SetVoidFuture - Segment *FileSegment +func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { + metaPath := cache.GetCacheMetaPath(pathComps...) + dataPath := cache.GetCacheDataPath(pathComps...) + + info := FileInfo{ + Dirty: true, + ModTime: time.Now(), + Perm: 0644, + } + + infoData, err := serder.ObjectToJSON(info) + if err != nil { + return nil, err + } + + err = os.WriteFile(metaPath, infoData, 0644) + if err != nil { + return nil, fmt.Errorf("save cache file: %w", err) + } + + localFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) + if err != nil { + return nil, fmt.Errorf("create cache file: %w", err) + } + + ch := &CacheFile{ + cache: cache, + pathComps: pathComps, + name: pathComps[len(pathComps)-1], + info: info, + rwLock: &sync.RWMutex{}, + backgroundChan: make(chan any, 1), + localFile: localFile, + writeLock: &sync.RWMutex{}, + } + + go ch.background() + + return ch, nil } -// 所有读写过程共用同一个CacheFile对象。 -// 不应该将此结构体保存到对象中 -type CacheFile struct { - objDownloader *downloader.Downloader - pathComps []string - name string - info FileInfo - lock sync.Mutex - segBuffer SegmentBuffer - readers int - writers int - - localLoaders []*LocalLoader - remoteLoaders []*RemoteLoader - pendingReadings []*ReadRequest - pendingWritings []*WriteReqeust - managerChan chan any +func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { + metaPath := cache.GetCacheMetaPath(pathComps...) + dataPath := cache.GetCacheDataPath(pathComps...) + + metaData, err := os.ReadFile(metaPath) + if err != nil { + return nil, err + } + + info := &FileInfo{} + err = serder.JSONToObject(metaData, info) + if err != nil { + return nil, err + } + + localFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + return nil, fmt.Errorf("create cache file: %w", err) + } + + ch := &CacheFile{ + cache: cache, + pathComps: pathComps, + name: pathComps[len(pathComps)-1], + info: *info, + rwLock: &sync.RWMutex{}, + backgroundChan: make(chan any, 1), + localFile: localFile, + writeLock: &sync.RWMutex{}, + } + + go ch.background() + + return ch, nil } -func loadCacheFile(pathComps []string, infoPath string) (*CacheFile, error) { - f, err := os.Open(infoPath) +func makeCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object) (*CacheFile, error) { + metaPath := cache.GetCacheMetaPath(pathComps...) + dataPath := cache.GetCacheDataPath(pathComps...) + + info := FileInfo{ + Size: obj.Size, + ObjectSize: obj.Size, + ModTime: obj.UpdateTime, + Perm: 0755, + } + + infoData, err := serder.ObjectToJSON(info) + if err != nil { + return nil, err + } + + err = os.WriteFile(metaPath, infoData, 0644) + if err != nil { + return nil, fmt.Errorf("save cache file: %w", err) + } + + localFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) + if err != nil { + return nil, fmt.Errorf("create cache file: %w", err) + } + + ch := &CacheFile{ + cache: cache, + pathComps: pathComps, + name: pathComps[len(pathComps)-1], + info: info, + remoteObj: obj, + rwLock: &sync.RWMutex{}, + backgroundChan: make(chan any, 1), + localFile: localFile, + writeLock: &sync.RWMutex{}, + } + + go ch.background() + + return ch, nil +} + +func loadCacheFileInfo(cache *Cache, pathComps []string) (*CacheEntryInfo, error) { + metaPath := cache.GetCacheMetaPath(pathComps...) + + metaData, err := os.ReadFile(metaPath) if err != nil { return nil, err } - defer f.Close() info := &FileInfo{} - err = serder.JSONToObjectStream(f, info) + err = serder.JSONToObject(metaData, info) if err != nil { return nil, err } - return &CacheFile{ - pathComps: pathComps, - name: filepath.Base(infoPath), - info: *info, + return &CacheEntryInfo{ + PathComps: pathComps, + Size: info.Size, + Mode: info.Perm, + ModTime: info.ModTime, + IsDir: false, }, nil } @@ -146,207 +258,288 @@ func (f *CacheFile) IsDir() bool { return false } +func (f *CacheFile) Info() CacheEntryInfo { + return CacheEntryInfo{ + PathComps: f.pathComps, + Size: f.info.Size, + Mode: f.info.Perm, + ModTime: f.info.ModTime, + IsDir: false, + } +} + +func (f *CacheFile) SetRemoteObject(obj *cdssdk.Object) { + f.remoteObj = obj +} + // 打开一个写入句柄,同时支持读取 func (f *CacheFile) Open(readOnly bool) *CacheFileReadWriter { - f.lock.Lock() - defer f.lock.Unlock() + logger.Tracef("CacheFile.Open: %v, %v", f.name, readOnly) + + f.rwLock.Lock() + defer f.rwLock.Unlock() + + h := &CacheFileReadWriter{ + file: f, + readOnly: readOnly, + remoteLock: &sync.Mutex{}, + } + + if f.remoteObj != nil { + h.remote = newRemoteLoader(f) + } if readOnly { - f.readers++ + f.readers = append(f.readers, h) } else { - f.writers++ + f.writers = append(f.writers, h) } - return &CacheFileReadWriter{ - file: f, - readOnly: readOnly, - } + return h } func (f *CacheFile) SetModTime(modTime time.Time) error { + f.rwLock.Lock() + f.info.ModTime = modTime + f.infoRev++ + f.rwLock.Unlock() + + f.notifyBackground() + return nil } func (f *CacheFile) Truncate(size int64) error { + // 修改文件大小前不允许写入 + f.writeLock.Lock() + defer f.writeLock.Unlock() + + err := f.localFile.Truncate(size) + if err != nil { + return err + } + + f.rwLock.Lock() + defer f.rwLock.Unlock() + + // 调整能从远端下载的大小 + f.info.ObjectSize = math2.Min(f.info.ObjectSize, size) + + // 调整本地缓存文件里的有效数据大小 + if size < f.info.Size { + f.info.Segments = TruncateRange(f.info.Segments, size) + } else { + f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size}) + } + f.info.Size = size + f.infoRev++ + + f.notifyBackground() + return nil } // 不再使用缓存文件 -func (f *CacheFile) Release() { +// func (f *CacheFile) Release() { -} +// } -// 一个文件段的数据被写入到本地了。err为nil表示成功,否则表示写入失败。 -func (f *CacheFile) OnSaved(seg *FileSegment, err error) { +func (f *CacheFile) background() { + savedInfoRev := int64(0) + ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() -} + for { + select { + case _, ok := <-f.backgroundChan: + if !ok { + return + } -// 一个文件段的数据被加载到内存了。err为nil表示成功,否则表示加载失败。 -func (f *CacheFile) OnLoaded(seg *FileSegment, err error) { + case <-ticker.C: + } -} + f.rwLock.Lock() -func (f *CacheFile) notifyManager() { - select { - case f.managerChan <- nil: - break - default: + for { + if f.infoRev == savedInfoRev { + break + } + + jsonData, err := serder.ObjectToJSON(f.info) + if err != nil { + // TODO 日志 + break + } + + err = os.WriteFile(f.cache.GetCacheMetaPath(f.pathComps...), jsonData, 0644) + if err != nil { + // TODO 日志 + break + } + + savedInfoRev = f.infoRev + break + } + + f.rwLock.Unlock() } } -func (f *CacheFile) managing() { - for { - <-f.managerChan - +func (f *CacheFile) notifyBackground() { + select { + case f.backgroundChan <- nil: + default: } } type CacheFileReadWriter struct { - file *CacheFile - readOnly bool + file *CacheFile + readOnly bool + remote *RemoteLoader + remoteLock *sync.Mutex } -func (f *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { - f.file.lock.Lock() +func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { + logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.name, off, len(buf)) - if off >= f.file.info.Size { - return 0, io.EOF - } + totalReadLen := 0 + for totalReadLen < len(buf) { + curBuf := buf[totalReadLen:] + curOff := off + int64(totalReadLen) + + h.file.rwLock.RLock() - segIdx := f.file.segBuffer.FirstContains(off) + if curOff >= h.file.info.Size { + h.file.rwLock.RUnlock() + break + } - if segIdx >= 0 { - seg := f.file.segBuffer.Segment(segIdx) + // 先尝试从本地缓存文件里读取 + rngIdx := FirstContainsIndex(h.file.info.Segments, curOff) + if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff { + readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff) + realReadLen, err := h.file.localFile.ReadAt(curBuf[:readLen], curOff) + totalReadLen += realReadLen + h.file.rwLock.RUnlock() + + logger.Tracef("read from local cache, n: %v, err: %v", realReadLen, err) + if err != nil { + return totalReadLen, err + } + continue + } - // 读取的数据在当前段内 - if off >= seg.Position && off < seg.Position+seg.Length { - readLen := math2.Min(int64(len(buf)), seg.Position+seg.Length-off) - seg.RefCount++ - f.file.lock.Unlock() + // 否则从远端下载 + loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff) + if rngIdx+1 < len(h.file.info.Segments) { + // 最多加载到下一个段的开头 + loadLen = math2.Min(loadLen, h.file.info.Segments[rngIdx+1].Position-curOff) + } - copy(buf[:readLen], seg.SubSliceAbs(off, readLen)) + h.file.rwLock.RUnlock() - f.file.lock.Lock() - seg.RefCount-- - seg.Type = SegmentDirty - f.file.lock.Unlock() - return int(readLen), nil + if h.remote == nil { + return totalReadLen, fmt.Errorf("no remote file") } - // if off >= f.file.info.ObjectSize { - // readLen := int64(len(buf)) + fmt.Printf("load from remote\n") - // if segIdx < f.file.segBuffer.BuzyCount()-1 { - // nextSeg := f.file.segBuffer.Segment(segIdx + 1) - // readLen = math2.Min(readLen, nextSeg.Position-off) - // } else { - // readLen = math2.Min(readLen, f.file.info.Size-off) - // } - // f.file.lock.Unlock() + // 加锁,防止并发Seek + h.remoteLock.Lock() + realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff) + totalReadLen += realLoadLen + if err != nil { + h.remoteLock.Unlock() + return totalReadLen, err + } + h.remoteLock.Unlock() - // clear(buf[:readLen]) - // return int(readLen), nil - // } - } + logger.Tracef("load from remote: %v", realLoadLen) - // 没有被缓存的数据,则需要通知加载器去加载 + // 在写入到本地之前,先停止其他的写入,防止冲突 + h.file.writeLock.Lock() - fut := future.NewSetVoid() - req := &ReadRequest{ - Position: off, - Callback: fut, - } - insertIdx := FirstContains(f.file.pendingReadings, off) - f.file.pendingReadings = lo2.Insert(f.file.pendingReadings, insertIdx+1, req) - f.file.lock.Unlock() + // 停止其他写入后,就可以计算一下实际要写回的长度。 + h.file.rwLock.RLock() + loadRng := &Range{Position: curOff, Length: int64(realLoadLen)} + DifferentRange(loadRng, h.file.info.Segments) + h.file.rwLock.RUnlock() - f.file.notifyManager() + if loadRng.Length == 0 { + h.file.writeLock.Unlock() + continue + } - err := fut.Wait(context.Background()) - if err != nil { - return 0, err - } + // 写入到本地缓存文件 + writeStart := loadRng.Position - curOff + _, err = h.file.localFile.WriteAt(curBuf[writeStart:writeStart+loadRng.Length], curOff) + if err != nil { + h.file.writeLock.Unlock() - if req.Segment != nil { - // 这里不加锁,因为在引用计数大于0期间,Position不变,而Length虽然可能发生变化,但并发读写是未定义行为,所以不管读到多少数据都可以 - readLen := math2.Min(int64(len(buf)), req.Segment.Position+req.Segment.Length-off) - copy(buf[:readLen], req.Segment.SubSliceAbs(off, readLen)) - - // bufferManager线程会在调用回调之前,给引用计数+1 - // TODO 这种做法容易粗心产生遗漏,且不利于实现超时机制 - f.file.lock.Lock() - req.Segment.RefCount-- - f.file.lock.Unlock() - return int(readLen), nil - } + logger.Tracef("save to local file: %v", err) + return totalReadLen, fmt.Errorf("save to local file: %w", err) + } + + logger.Tracef("save to local: %v", loadRng.Length) - if req.Zeros > 0 { - clear(buf[:req.Zeros]) - return int(req.Zeros), nil + h.file.writeLock.Unlock() + + // 提交到段列表里 + h.file.rwLock.Lock() + h.file.info.Segments = AddRange(h.file.info.Segments, loadRng) + h.file.infoRev++ + h.file.notifyBackground() + h.file.rwLock.Unlock() } - return 0, io.EOF + return totalReadLen, nil } -func (f *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) { - if f.readOnly { +func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) { + if h.readOnly { return 0, fuse.ErrPermission } - f.file.lock.Lock() - // 如果找到一个包含off位置的段,那么就先写满这个段 - segIdx := f.file.segBuffer.FirstContains(off) - if segIdx >= 0 { - seg := f.file.segBuffer.Segment(segIdx) - - if off >= seg.Position && off < seg.Position+seg.Length { - writeLen := math2.Min(int64(len(buf)), seg.BufferEnd()-off) - seg.RefCount++ - f.file.lock.Unlock() - - // 不管此时是不是有其他线程在读取数据,因为我们约定并发读写就是未定义行为。 - copy(seg.SubSliceAbs(off, writeLen), buf[:writeLen]) - - f.file.lock.Lock() - seg.RefCount-- - seg.EnlargeTo(off + writeLen) - seg.Type = SegmentDirty - f.file.info.EnlargeTo(seg.AvailableEnd()) - f.file.lock.Unlock() - f.file.notifyManager() - return int(writeLen), nil - } - } + logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.name, off, len(buf)) - fut := future.NewSetVoid() - req := &WriteReqeust{ - Position: off, - Callback: fut, - } - f.file.pendingWritings = append(f.file.pendingWritings, req) - f.file.lock.Unlock() + // 允许多线程并行写入,但在数据加载期间不能写入 + h.file.writeLock.RLock() + defer h.file.writeLock.RUnlock() - err := fut.Wait(context.Background()) + // 写入到本地缓存文件 + writeLen, err := h.file.localFile.WriteAt(buf, off) if err != nil { - return 0, err + return writeLen, fmt.Errorf("save to local file: %w", err) } - // 一些说明参考ReadAt函数 - // 由managing线程保证文件的同一个位置只会有一个段,因此这里不考虑在copy期间又有其他线程在写同一个段导致的产生新的重复段 - writeLen := math2.Min(int64(len(buf)), req.Segment.BufferEnd()-off) - copy(req.Segment.SubSliceAbs(off, writeLen), buf[:writeLen]) - - f.file.lock.Lock() - req.Segment.RefCount-- - req.Segment.EnlargeTo(off + writeLen) - req.Segment.Type = SegmentDirty - f.file.info.EnlargeTo(req.Segment.AvailableEnd()) - f.file.lock.Unlock() - f.file.notifyManager() - return int(writeLen), nil + // 提交到段列表里 + h.file.rwLock.Lock() + defer h.file.rwLock.Unlock() + + h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)}) + h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen)) + h.file.info.Dirty = true + h.file.infoRev++ + + h.file.notifyBackground() + + return writeLen, nil } func (f *CacheFileReadWriter) Sync() error { + return f.file.localFile.Sync() } func (f *CacheFileReadWriter) Close() error { + f.Sync() + f.remote.Close() + + f.file.rwLock.Lock() + if f.readOnly { + f.file.readers = lo2.Remove(f.file.readers, f) + } else { + f.file.writers = lo2.Remove(f.file.writers, f) + } + f.file.rwLock.Unlock() + return nil } diff --git a/client2/internal/mount/vfs/cache/file_saver.go b/client2/internal/mount/vfs/cache/file_saver.go deleted file mode 100644 index 90770ec..0000000 --- a/client2/internal/mount/vfs/cache/file_saver.go +++ /dev/null @@ -1,12 +0,0 @@ -package cache - -import "os" - -type FileSaver struct { - owner *CacheFile - fileHd *os.File -} - -func (s *FileSaver) BeginSaving(seg *FileSegment) { - -} diff --git a/client2/internal/mount/vfs/cache/file_segment.go b/client2/internal/mount/vfs/cache/file_segment.go deleted file mode 100644 index 44d1236..0000000 --- a/client2/internal/mount/vfs/cache/file_segment.go +++ /dev/null @@ -1,117 +0,0 @@ -package cache - -import ( - "gitlink.org.cn/cloudream/common/utils/lo2" - "gitlink.org.cn/cloudream/common/utils/math2" -) - -type SegmentType int - -const ( - // 数据段刚被初始化 - SegmentInit = iota - // 数据来自本地文件 - SegmentLocal - // 数据来自远端文件,还未写入到本地文件 - SegmentRemote - // 数据由用户写入,还未写入到本地文件 - SegmentDirty -) - -type FileSegment struct { - Position int64 - // 有效数据的长度。不一定等于Buffer的长度 - Length int64 - Type SegmentType - // 文件数据缓冲区。不可对此缓冲区进行append操作! - Buffer []byte - // 当前段是否正在被保存到本地文件中 - IsSaving bool - // 引用计数。当引用计数为0时,可以安全地删除此段 - RefCount int -} - -func (s *FileSegment) GetPosition() int64 { - return s.Position -} - -func (s *FileSegment) GetLength() int64 { - return s.Length -} - -func (s *FileSegment) SubSliceAbs(pos int64, len int64) []byte { - start := pos - s.Position - return s.Buffer[start : start+len] -} - -// 将当前段拆分为两个段。当前段将持有第一个段,返回值持有第二个段 -func (s *FileSegment) SplitAbs(pos int64) *FileSegment { - s2 := s.Buffer[pos-s.Position:] - s2Len := math2.Max(s.Position+s.Length-pos, 0) - - s.Buffer = s.Buffer[:pos-s.Position] - s.Length = math2.Min(int64(len(s.Buffer)), s.Length) - - return &FileSegment{ - Position: pos, - Length: s2Len, - Type: s.Type, - Buffer: s2, - } -} - -func (s *FileSegment) AvailableEnd() int64 { - return s.Position + s.Length -} - -func (s *FileSegment) BufferEnd() int64 { - return s.Position + int64(len(s.Buffer)) -} - -func (s *FileSegment) EnlargeTo(pos int64) { - if pos > s.Position+s.Length { - s.Length = pos - s.Position - } -} - -type SegmentBuffer struct { - // 正在使用的文件段缓冲区 - buzys []*FileSegment -} - -func (s *SegmentBuffer) BuzyCount() int { - return len(s.buzys) -} - -func (s *SegmentBuffer) Segment(idx int) *FileSegment { - return s.buzys[idx] -} - -func (s *SegmentBuffer) FirstContains(pos int64) int { - return FirstContains(s.buzys, pos) -} - -// 将指定段插入到段缓存的恰当位置 -func (s *SegmentBuffer) Insert(seg *FileSegment) { - index := s.FirstContains(seg.Position) - - if index == -1 { - s.buzys = append([]*FileSegment{seg}, s.buzys...) - } else { - // index是最后一个小于Position的位置,所以要加1 - s.buzys = lo2.Insert(s.buzys, index+1, seg) - } -} - -// 插入一个段到指定索引位置。不会检查插入后Segments是否依然保持有序。 -func (s *SegmentBuffer) InsertAt(index int, seg *FileSegment) { - s.buzys = lo2.Insert(s.buzys, index, seg) -} - -func (s *SegmentBuffer) RemoveAt(index int) { - s.buzys = lo2.RemoveAt(s.buzys, index) -} - -func (s *SegmentBuffer) Remove(seg *FileSegment) { - s.buzys = lo2.Remove(s.buzys, seg) -} diff --git a/client2/internal/mount/vfs/cache/file_segment_test.go b/client2/internal/mount/vfs/cache/file_segment_test.go deleted file mode 100644 index a8947ef..0000000 --- a/client2/internal/mount/vfs/cache/file_segment_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package cache - -import ( - "fmt" - "testing" - - . "github.com/smartystreets/goconvey/convey" -) - -func Test_FindSegmentIndex(t *testing.T) { - cases := []struct { - title string - buffer SegmentBuffer - postions []int64 - wants []int - }{ - { - buffer: SegmentBuffer{ - buzys: []*FileSegment{ - {Position: 0}, {Position: 10}, - }, - }, - postions: []int64{0, 2, 10, 11}, - wants: []int{0, 0, 1, 1}, - }, - } - - for i, c := range cases { - for j, _ := range c.postions { - Convey(fmt.Sprintf("%d.%d. %s", i, j, c.title), t, func() { - got := c.buffer.FirstContains(c.postions[j]) - So(got, ShouldEqual, c.wants[j]) - }) - } - } -} diff --git a/client2/internal/mount/vfs/cache/range_test.go b/client2/internal/mount/vfs/cache/range_test.go new file mode 100644 index 0000000..dfd06b4 --- /dev/null +++ b/client2/internal/mount/vfs/cache/range_test.go @@ -0,0 +1,332 @@ +package cache + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_FirstContainsIndex(t *testing.T) { + cases := []struct { + title string + arr []*Range + pos int64 + want int + }{ + { + "空数组", + []*Range{}, + 100, + -1, + }, + { + "只有一个元素,pos在范围内", + []*Range{{100, 100}}, + 100, + 0, + }, + { + "区间不连续,但在范围内-奇数区间数", + []*Range{{100, 100}, {300, 100}, {500, 100}}, + 350, + 1, + }, + { + "区间不连续,但在范围内-偶数区间数", + []*Range{{100, 100}, {300, 100}}, + 350, + 1, + }, + { + "区间不连续,pos在最左边", + []*Range{{100, 100}, {300, 100}, {500, 100}}, + 10, + -1, + }, + { + "区间不连续,pos在靠左的第一个空洞中", + []*Range{{100, 100}, {300, 100}, {500, 100}}, + 200, + 0, + }, + { + "区间不连续,pos在靠左的第一个空洞中-偶数区间数", + []*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}}, + 200, + 0, + }, + { + "区间不连续,pos在最右边", + []*Range{{100, 100}, {300, 100}, {500, 100}}, + 601, + 2, + }, + } + + for _, c := range cases { + Convey(c.title, t, func() { + got := FirstContainsIndex(c.arr, c.pos) + So(got, ShouldEqual, c.want) + }) + } +} + +func Test_AddRange(t *testing.T) { + cases := []struct { + title string + arr []*Range + r *Range + want []*Range + }{ + { + "空数组", + []*Range{}, + &Range{100, 100}, + []*Range{{100, 100}}, + }, + { + "单个区间,连接在左侧", + []*Range{{100, 100}}, + &Range{0, 100}, + []*Range{{0, 200}}, + }, + { + "单个区间,连接在右侧", + []*Range{{100, 100}}, + &Range{200, 100}, + []*Range{{100, 200}}, + }, + { + "单个区间,左边重叠了一部分", + []*Range{{100, 100}}, + &Range{50, 100}, + []*Range{{50, 150}}, + }, + { + "单个区间,右边重叠了一部分", + []*Range{{100, 100}}, + &Range{150, 100}, + []*Range{{100, 150}}, + }, + { + "单个区间,被新区间完全覆盖", + []*Range{{100, 100}}, + &Range{50, 150}, + []*Range{{50, 150}}, + }, + { + "单个区间,在左边但有空洞", + []*Range{{100, 100}}, + &Range{0, 50}, + []*Range{{0, 50}, {100, 100}}, + }, + { + "单个区间,在右边但有空洞", + []*Range{{100, 100}}, + &Range{250, 50}, + []*Range{{100, 100}, {250, 50}}, + }, + { + "恰好连接了两个区间", + []*Range{{100, 100}, {300, 100}}, + &Range{200, 100}, + []*Range{{100, 300}}, + }, + { + "连接了两个区间,但在左边有重叠", + []*Range{{100, 100}, {300, 100}}, + &Range{150, 150}, + []*Range{{100, 300}}, + }, + { + "连接了两个区间,但在右边有重叠", + []*Range{{100, 100}, {300, 100}}, + &Range{150, 200}, + []*Range{{100, 300}}, + }, + { + "覆盖了多个区间,但不是完全连接", + []*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}}, + &Range{150, 400}, + []*Range{{100, 500}, {700, 100}}, + }, + { + "完全覆盖所有区间", + []*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}}, + &Range{0, 900}, + []*Range{{0, 900}}, + }, + } + + for _, c := range cases { + Convey(c.title, t, func() { + got := AddRange(c.arr, c.r) + So(got, ShouldResemble, c.want) + }) + } +} + +func Test_MergeRanges(t *testing.T) { + cases := []struct { + title string + arr1 []*Range + arr2 []*Range + want []*Range + }{ + { + "两个都是空数组", + []*Range{}, + []*Range{}, + nil, + }, + { + "其中一个是空数组", + []*Range{{100, 100}}, + []*Range{}, + []*Range{{100, 100}}, + }, + { + "两个都是单个区间,没有重叠", + []*Range{{100, 100}}, + []*Range{{300, 100}}, + []*Range{{100, 100}, {300, 100}}, + }, + { + "两个都是单个区间,恰好连接", + []*Range{{100, 100}}, + []*Range{{200, 100}}, + []*Range{{100, 200}}, + }, + { + "两个都是单个区间,有重叠", + []*Range{{100, 100}}, + []*Range{{150, 100}}, + []*Range{{100, 150}}, + }, + { + "多区间恰好连接", + []*Range{{100, 100}, {300, 100}, {500, 100}}, + []*Range{{200, 100}, {400, 100}}, + []*Range{{100, 500}}, + }, + { + "多区间各种情况", + []*Range{{100, 100}, {300, 100}, {500, 100}, {700, 100}}, + []*Range{{150, 100}, {260, 90}, {400, 100}, {550, 250}}, + []*Range{{100, 150}, {260, 540}}, + }, + } + + for _, c := range cases { + Convey(c.title, t, func() { + got := MergeRanges(c.arr1, c.arr2) + So(got, ShouldResemble, c.want) + }) + } +} + +func Test_TruncateRange(t *testing.T) { + cases := []struct { + title string + arr []*Range + pos int64 + want []*Range + }{ + { + "空数组", + []*Range{}, + 100, + []*Range{}, + }, + { + "截断的长度比现有长", + []*Range{{100, 100}}, + 300, + []*Range{{100, 100}}, + }, + { + "截断的长度等于现有长", + []*Range{{100, 100}}, + 200, + []*Range{{100, 100}}, + }, + { + "截断的长度小于现有长", + []*Range{{100, 100}, {300, 100}}, + 150, + []*Range{{100, 50}}, + }, + { + "截断了所有", + []*Range{{100, 100}, {300, 100}}, + 0, + []*Range{}, + }, + { + "截断的位置在空洞内", + []*Range{{100, 100}, {300, 100}}, + 250, + []*Range{{100, 100}}, + }, + } + + for _, c := range cases { + Convey(c.title, t, func() { + got := TruncateRange(c.arr, c.pos) + So(got, ShouldResemble, c.want) + }) + } +} + +func Test_DifferentRange(t *testing.T) { + cases := []struct { + title string + rng *Range + arr []*Range + want *Range + }{ + { + "空数组", + &Range{100, 100}, + []*Range{}, + &Range{100, 100}, + }, + { + "区间的右边被截断", + &Range{50, 150}, + []*Range{{100, 100}, {300, 100}}, + &Range{50, 50}, + }, + { + "区间的左边被截断", + &Range{150, 150}, + []*Range{{100, 100}, {300, 100}}, + &Range{200, 100}, + }, + { + "区间被彻底截断", + &Range{100, 100}, + []*Range{{100, 100}, {300, 100}}, + &Range{200, 0}, + }, + { + "区间被截断成多份1", + &Range{0, 400}, + []*Range{{100, 100}, {300, 100}}, + &Range{0, 100}, + }, + { + "区间被截断成多份2", + &Range{100, 300}, + []*Range{{100, 100}, {300, 100}}, + &Range{200, 100}, + }, + } + + for _, c := range cases { + Convey(c.title, t, func() { + DifferentRange(c.rng, c.arr) + So(c.rng, ShouldResemble, c.want) + }) + } +} diff --git a/client2/internal/mount/vfs/cache/remote.go b/client2/internal/mount/vfs/cache/remote.go new file mode 100644 index 0000000..50d1780 --- /dev/null +++ b/client2/internal/mount/vfs/cache/remote.go @@ -0,0 +1,95 @@ +package cache + +import ( + "io" + "time" +) + +type RemoteLoader struct { + file *CacheFile + loaders []*OpenLoader +} + +type OpenLoader struct { + reader io.ReadCloser + pos int64 + lastUsedTime time.Time +} + +func newRemoteLoader(file *CacheFile) *RemoteLoader { + return &RemoteLoader{ + file: file, + loaders: make([]*OpenLoader, 2), + } +} + +func (r *RemoteLoader) Load(p []byte, pos int64) (n int, err error) { + replaceIdx := -1 + for i := 0; i < len(r.loaders); i++ { + loader := r.loaders[i] + if loader == nil { + replaceIdx = i + continue + } + + if loader.pos == pos { + loader.lastUsedTime = time.Now() + n, err = io.ReadFull(loader.reader, p) + loader.pos += int64(n) + if err != nil { + loader.reader.Close() + r.loaders[i] = nil + } + return + } + + if replaceIdx == -1 || r.loaders[replaceIdx] != nil && r.loaders[replaceIdx].lastUsedTime.After(loader.lastUsedTime) { + replaceIdx = i + } + } + + if r.loaders[replaceIdx] != nil { + r.loaders[replaceIdx].reader.Close() + r.loaders[replaceIdx] = nil + } + + loader, err := r.newLoader(pos) + if err != nil { + return 0, err + } + loader.lastUsedTime = time.Now() + + r.loaders[replaceIdx] = loader + n, err = io.ReadFull(loader.reader, p) + loader.pos += int64(n) + if err != nil { + loader.reader.Close() + r.loaders[replaceIdx] = nil + } + return +} + +func (r *RemoteLoader) Close() { + for i := 0; i < len(r.loaders); i++ { + if r.loaders[i] != nil { + r.loaders[i].reader.Close() + } + } +} + +func (r *RemoteLoader) newLoader(pos int64) (*OpenLoader, error) { + detail, err := r.file.cache.db.Object().GetDetail(r.file.cache.db.DefCtx(), r.file.remoteObj.ObjectID) + if err != nil { + return nil, err + } + + down, err := r.file.cache.downloader.DownloadObjectByDetail(detail, pos, -1) + if err != nil { + return nil, err + } + + return &OpenLoader{ + reader: down.File, + pos: pos, + }, nil +} diff --git a/client2/internal/mount/vfs/cache/utils.go b/client2/internal/mount/vfs/cache/utils.go index fa6e0d1..bf749be 100644 --- a/client2/internal/mount/vfs/cache/utils.go +++ b/client2/internal/mount/vfs/cache/utils.go @@ -1,9 +1,11 @@ package cache -type Ranger interface { - GetPosition() int64 - GetLength() int64 -} +import ( + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/common/utils/sort2" +) // 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况: // @@ -14,7 +16,7 @@ type Ranger interface { // 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段 // // 注:2、3情况返回的结果是相同的 -func FirstContains[T Ranger](arr []T, pos int64) int { +func FirstContainsIndex(arr []*Range, pos int64) int { low, high := 0, len(arr)-1 for low <= high { @@ -22,17 +24,146 @@ func FirstContains[T Ranger](arr []T, pos int64) int { if arr[mid].GetPosition() > pos { high = mid - 1 - } else if arr[mid].GetPosition() < pos { - if mid == len(arr)-1 { - return mid - } + } else { + low = mid + 1 + } + } + + return low - 1 +} - low = mid +func FirstContains(arr []*Range, pos int64) (*Range, bool) { + index := FirstContainsIndex(arr, pos) + + var ret *Range + if index < 0 { + return nil, false + } + + ret = arr[index] + + if ret.End() <= pos { + return nil, false + } + + return ret, true +} + +func AddRange(arr []*Range, rng *Range) []*Range { + if len(arr) == 0 { + return []*Range{rng} + } + + idx := FirstContainsIndex(arr, rng.GetPosition()) + arr = lo2.Insert(arr, idx+1, rng) + + lowerIdx := math2.Max(0, idx) + lowerRng := arr[lowerIdx] + + for i := lowerIdx + 1; i < len(arr); i++ { + nextRng := arr[i] + if nextRng.GetPosition() > lowerRng.End() { + break + } + + curRngEnd := lowerRng.End() + nextRngEnd := nextRng.End() + lowerRng.SetPosition(math2.Min(lowerRng.GetPosition(), nextRng.GetPosition())) + lowerRng.SetLength(math2.Max(curRngEnd, nextRngEnd) - lowerRng.GetPosition()) + + nextRng.SetLength(0) + } + + return lo.Reject(arr, func(i *Range, idx int) bool { + return i.GetLength() == 0 + }) +} + +func MergeRanges(arr []*Range, rngs []*Range) []*Range { + type Point struct { + pos int64 + isStart bool + } + points := make([]Point, 0, (len(arr)+len(rngs))*2) + + for _, rng := range arr { + points = append(points, Point{rng.GetPosition(), true}) + points = append(points, Point{rng.GetPosition() + rng.GetLength(), false}) + } + + for _, rng := range rngs { + points = append(points, Point{rng.GetPosition(), true}) + points = append(points, Point{rng.GetPosition() + rng.GetLength(), false}) + } + + points = sort2.Sort(points, func(a, b Point) int { + if a.pos == b.pos { + // 位置相同,则endpoint在后 + return sort2.CmpBool(b.isStart, a.isStart) + } + return math2.Sign(a.pos - b.pos) + }) + + var merged []*Range + depth := 0 + for _, p := range points { + if p.isStart { + depth++ + if depth == 1 { + merged = append(merged, &Range{p.pos, 0}) + } } else { - return mid + depth-- + if depth == 0 { + lastIdx := len(merged) - 1 + merged[lastIdx].Length = p.pos - merged[lastIdx].Position + } } } - return high + return merged +} + +func TruncateRange(arr []*Range, length int64) []*Range { + if len(arr) == 0 { + return arr + } + + idx := FirstContainsIndex(arr, length) + if idx < 0 { + return arr[:0] + } + + rng := arr[idx] + if rng.End() > length { + rng.Length = length - rng.Position + } + + if rng.Length <= 0 { + return arr[:idx] + } + + return arr[:idx+1] +} + +// 修剪指定的范围,使其仅包含在arr的空洞中。 +// 如果范围会被切分成多个,那么就只会返回第一个切分段。 +func DifferentRange(rng *Range, arr []*Range) { + rngStart := rng.Position + rngEnd := rng.End() + + idx := FirstContainsIndex(arr, rng.Position) + if idx >= 0 { + leftRng := arr[idx] + rngStart = math2.Max(rng.Position, leftRng.End()) + } + + if idx+1 < len(arr) { + rightRng := arr[idx+1] + rngEnd = math2.Min(rngEnd, rightRng.Position) + } + + rng.Position = rngStart + rng.Length = rngEnd - rngStart } diff --git a/client2/internal/mount/vfs/fuse_bucket.go b/client2/internal/mount/vfs/fuse_bucket.go index 6e37a29..9d89225 100644 --- a/client2/internal/mount/vfs/fuse_bucket.go +++ b/client2/internal/mount/vfs/fuse_bucket.go @@ -1,22 +1,250 @@ package vfs import ( + "context" + "fmt" + "os" + "time" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gorm.io/gorm" ) type FuseBucket struct { + vfs *Vfs + name string + modTime time.Time +} + +func newBucketFromCache(c cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir { + return &FuseBucket{ + vfs: vfs, + name: c.PathComps[len(c.PathComps)-1], + modTime: c.ModTime, + } +} + +func (r *FuseBucket) PathComps() []string { + return []string{r.name} +} + +func (r *FuseBucket) Name() string { + return r.name +} + +func (r *FuseBucket) Size() int64 { + return 0 +} + +func (r *FuseBucket) Mode() os.FileMode { + return os.ModeDir | 0755 +} + +func (r *FuseBucket) ModTime() time.Time { + return r.modTime +} + +func (r *FuseBucket) IsDir() bool { + return true +} + +func (r *FuseBucket) SetModTime(time time.Time) error { + dir := r.loadCacheDir() + if dir == nil { + return fuse.ErrNotExists + } + + return dir.SetModTime(time) +} + +// 如果不存在,应该返回ErrNotExists +func (r *FuseBucket) Child(ctx context.Context, name string) (fuse.FsEntry, error) { + childPathComps := []string{r.name, name} + ca := r.vfs.cache.Stat(childPathComps) + + if ca == nil { + // TODO UserID + + pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.name, name) + if err == nil { + dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{ + ModTime: pkg.CreateTime, + }) + if dir == nil { + return nil, fuse.ErrNotExists + } + + return newPackageFromCache(dir.Info(), r.vfs), nil + } + + if err == gorm.ErrRecordNotFound { + return nil, fuse.ErrNotExists + } + + return nil, err + } + + if ca.IsDir { + return newPackageFromCache(*ca, r.vfs), nil + } + + return newFileFromCache(*ca, r.vfs), nil +} + +func (r *FuseBucket) Children(ctx context.Context) ([]fuse.FsEntry, error) { + return r.listChildren() +} + +func (r *FuseBucket) ReadChildren() (fuse.DirReader, error) { + ens, err := r.listChildren() + if err != nil { + return nil, err + } + + return newFuseDirReader(ens), nil +} + +func (r *FuseBucket) listChildren() ([]fuse.FsEntry, error) { + var ens []fuse.FsEntry + + infos := r.vfs.cache.StatMany([]string{r.name}) + + pkgs, err := r.vfs.db.Package().GetBucketPackagesByName(r.vfs.db.DefCtx(), r.name) + if err != nil { + return nil, err + } + + pkgMap := make(map[string]*cdssdk.Package) + for _, pkg := range pkgs { + p := pkg + pkgMap[pkg.Name] = &p + } + + for _, c := range infos { + delete(pkgMap, c.PathComps[len(c.PathComps)-1]) + + if c.IsDir { + ens = append(ens, newPackageFromCache(c, r.vfs)) + } else { + ens = append(ens, newFileFromCache(c, r.vfs)) + } + } + + for _, pkg := range pkgMap { + dir := r.vfs.cache.LoadDir([]string{r.name, pkg.Name}, &cache.CreateDirOption{ + ModTime: pkg.CreateTime, + }) + if dir == nil { + continue + } + + ens = append(ens, newPackageFromCache(dir.Info(), r.vfs)) + } + + return ens, nil +} + +func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error) { + cache := r.vfs.cache.CreateDir([]string{r.name, name}) + if cache == nil { + return nil, fuse.ErrPermission + } + + // TODO 用户ID,失败了可以打个日志 + // TODO 生成系统事件 + // 不关注创建是否成功,仅尝试一下 + r.vfs.db.DoTx(func(tx db2.SQLContext) error { + db := r.vfs.db + bkt, err := db.Bucket().GetByName(tx, name) + if err != nil { + return fmt.Errorf("get bucket: %v", err) + } + + _, err = db.Package().Create(tx, bkt.BucketID, name) + if err != nil { + return fmt.Errorf("create package: %v", err) + } + + return err + }) + + return newPackageFromCache(cache.Info(), r.vfs), nil } -func newBucketOrFileFromCache(c cache.CacheEntry, vfs *Vfs) fuse.FsEntry { +func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) { + cache := r.vfs.cache.CreateFile([]string{r.name, name}) + if cache == nil { + return nil, fuse.ErrPermission + } + // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, + // 也有有FileHandle的计数保持cache的有效性 + + fileNode := newFileFromCache(cache.Info(), r.vfs) + if flags&uint32(os.O_WRONLY) != 0 { + hd := cache.Open(false) + return newFileHandle(fileNode, hd), nil + } + + if flags&uint32(os.O_RDONLY) != 0 { + hd := cache.Open(true) + return newFileHandle(fileNode, hd), nil + } + + return nil, fuse.ErrPermission } -func newBucketFromCache(c *cache.CacheDir, vfs *Vfs) fuse.FsDir { +func (r *FuseBucket) RemoveChild(ctx context.Context, name string) error { + err := r.vfs.cache.Remove([]string{r.name, name}) + if err != nil { + return err + } + + // TODO 生成系统事件 + // 不关心是否成功 + r.vfs.db.DoTx(func(tx db2.SQLContext) error { + d := r.vfs.db + + pkg, err := d.Package().GetUserPackageByName(tx, 1, r.name, name) + if err != nil { + if err == gorm.ErrRecordNotFound { + return nil + } + } + return d.Package().DeleteComplete(tx, pkg.PackageID) + }) + + return nil } -func newBucketFromDB(bkt cdssdk.Bucket, vfs *Vfs) fuse.FsEntry { +func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error { + + // TODO 有问题 + newParentNode := newParent.(FuseNode) + _, err := r.vfs.cache.Move([]string{r.name, oldName}, append(newParentNode.PathComps(), newName)) + if err != nil { + return err + } + + return nil } + +func (r *FuseBucket) loadCacheDir() *cache.CacheDir { + var createOpt *cache.CreateDirOption + bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), r.name) + if err == nil { + createOpt = &cache.CreateDirOption{ + ModTime: bkt.CreateTime, + } + } + + return r.vfs.cache.LoadDir([]string{r.name}, createOpt) +} + +var _ fuse.FsDir = (*FuseBucket)(nil) +var _ FuseNode = (*FuseBucket)(nil) diff --git a/client2/internal/mount/vfs/fuse_dir.go b/client2/internal/mount/vfs/fuse_dir.go new file mode 100644 index 0000000..5b754b6 --- /dev/null +++ b/client2/internal/mount/vfs/fuse_dir.go @@ -0,0 +1,301 @@ +package vfs + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gorm.io/gorm" +) + +type FuseDir struct { + vfs *Vfs + pathComps []string + modTime time.Time +} + +func newDirFromCache(ch cache.CacheEntryInfo, vfs *Vfs) *FuseDir { + return &FuseDir{ + vfs: vfs, + pathComps: ch.PathComps, + modTime: ch.ModTime, + } +} + +func (r *FuseDir) PathComps() []string { + return r.pathComps +} + +func (r *FuseDir) Name() string { + return r.pathComps[len(r.pathComps)-1] +} + +func (r *FuseDir) Size() int64 { + return 0 +} + +func (r *FuseDir) Mode() os.FileMode { + return os.ModeDir | 0755 +} + +func (r *FuseDir) ModTime() time.Time { + return r.modTime +} + +func (r *FuseDir) IsDir() bool { + return true +} + +func (r *FuseDir) SetModTime(time time.Time) error { + dir := r.loadCacheDir() + if dir == nil { + return fuse.ErrNotExists + } + + return dir.SetModTime(time) +} + +// 如果不存在,应该返回ErrNotExists +func (r *FuseDir) Child(ctx context.Context, name string) (fuse.FsEntry, error) { + childPathComps := append(lo2.ArrayClone(r.pathComps), name) + ca := r.vfs.cache.Stat(childPathComps) + if ca == nil { + var ret fuse.FsEntry + + db := r.vfs.db + err := db.DoTx(func(tx db2.SQLContext) error { + pkg, err := db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1]) + if err != nil { + return err + } + + objPath := cdssdk.JoinObjectPath(childPathComps[2:]...) + obj, err := db.Object().GetByPath(tx, pkg.PackageID, objPath) + if err == nil { + ret = newFileFromObject(r.vfs, childPathComps, obj) + return nil + } + if err != gorm.ErrRecordNotFound { + return err + } + + has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) + if err != nil { + return err + } + + if has { + dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{ + ModTime: time.Now(), + }) + if dir == nil { + return nil + } + + ret = newDirFromCache(dir.Info(), r.vfs) + } + + return nil + }) + if err != nil { + return nil, err + } + + if ret == nil { + return nil, fuse.ErrNotExists + } + + return ret, nil + } + + if ca.IsDir { + return newDirFromCache(*ca, r.vfs), nil + } + + return newFileFromCache(*ca, r.vfs), nil +} + +func (r *FuseDir) Children(ctx context.Context) ([]fuse.FsEntry, error) { + return r.listChildren() +} + +func (r *FuseDir) ReadChildren() (fuse.DirReader, error) { + ens, err := r.listChildren() + if err != nil { + return nil, err + } + + return newFuseDirReader(ens), nil +} + +func (r *FuseDir) listChildren() ([]fuse.FsEntry, error) { + var ens []fuse.FsEntry + + infos := r.vfs.cache.StatMany(r.pathComps) + + dbEntries := make(map[string]fuse.FsEntry) + + db := r.vfs.db + db.DoTx(func(tx db2.SQLContext) error { + pkg, err := db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1]) + if err != nil { + return err + } + + objPath := cdssdk.JoinObjectPath(r.pathComps[2:]...) + + coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) + if err != nil { + return fmt.Errorf("getting common prefixes: %w", err) + } + + objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, objPath+cdssdk.ObjectPathSeparator) + if err != nil { + return fmt.Errorf("getting direct children: %w", err) + } + + for _, dir := range coms { + dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator) + pathComps := append(lo2.ArrayClone(r.pathComps), cdssdk.BaseName(dir)) + + cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{ + ModTime: time.Now(), + }) + if cd == nil { + continue + } + + dbEntries[dir] = newDirFromCache(cd.Info(), r.vfs) + } + + for _, obj := range objs { + pathComps := append(lo2.ArrayClone(r.pathComps), cdssdk.BaseName(obj.Path)) + file := newFileFromObject(r.vfs, pathComps, obj) + dbEntries[file.Name()] = file + } + + return nil + }) + + for _, c := range infos { + delete(dbEntries, c.PathComps[len(c.PathComps)-1]) + + if c.IsDir { + ens = append(ens, newDirFromCache(c, r.vfs)) + } else { + ens = append(ens, newFileFromCache(c, r.vfs)) + } + } + + for _, e := range dbEntries { + ens = append(ens, e) + } + + return ens, nil +} + +func (r *FuseDir) NewDir(ctx context.Context, name string) (fuse.FsDir, error) { + cache := r.vfs.cache.CreateDir(append(lo2.ArrayClone(r.pathComps), name)) + if cache == nil { + return nil, fuse.ErrPermission + } + + return newDirFromCache(cache.Info(), r.vfs), nil +} + +func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) { + cache := r.vfs.cache.CreateFile(append(lo2.ArrayClone(r.pathComps), name)) + if cache == nil { + return nil, fuse.ErrPermission + } + // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, + // 也有有FileHandle的计数保持cache的有效性 + + fileNode := newFileFromCache(cache.Info(), r.vfs) + + if flags&uint32(os.O_WRONLY) != 0 { + hd := cache.Open(false) + return newFileHandle(fileNode, hd), nil + } + + if flags&uint32(os.O_RDONLY) != 0 { + hd := cache.Open(true) + return newFileHandle(fileNode, hd), nil + } + + return nil, fuse.ErrPermission +} + +func (r *FuseDir) RemoveChild(ctx context.Context, name string) error { + pathComps := append(lo2.ArrayClone(r.pathComps), name) + err := r.vfs.cache.Remove(pathComps) + if err != nil { + return err + } + + // TODO 生成系统事件 + // 不关心是否成功 + r.vfs.db.DoTx(func(tx db2.SQLContext) error { + d := r.vfs.db + pkg, err := d.Package().GetUserPackageByName(tx, 1, pathComps[0], pathComps[1]) + if err != nil { + return err + } + + return d.Object().DeleteByPath(tx, pkg.PackageID, cdssdk.JoinObjectPath(pathComps[2:]...)) + }) + + return nil +} + +func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error { + + // TODO 有问题 + oldPathComps := append(lo2.ArrayClone(r.pathComps), oldName) + + newParentNode := newParent.(FuseNode) + _, err := r.vfs.cache.Move(oldPathComps, append(newParentNode.PathComps(), newName)) + if err != nil { + return err + } + + return nil +} + +func (r *FuseDir) loadCacheDir() *cache.CacheDir { + var createOpt *cache.CreateDirOption + + err := r.vfs.db.DoTx(func(tx db2.SQLContext) error { + pkg, err := r.vfs.db.Package().GetUserPackageByName(tx, 1, r.pathComps[0], r.pathComps[1]) + if err != nil { + return err + } + + has, err := r.vfs.db.Object().HasObjectWithPrefix(tx, pkg.PackageID, cdssdk.JoinObjectPath(r.pathComps[2:]...)) + if err != nil { + return err + } + + if has { + createOpt = &cache.CreateDirOption{ + ModTime: time.Now(), + } + } + return nil + }) + if err != nil { + return nil + } + + return r.vfs.cache.LoadDir(r.pathComps, createOpt) +} + +var _ fuse.FsDir = (*FuseDir)(nil) +var _ FuseNode = (*FuseDir)(nil) diff --git a/client2/internal/mount/vfs/fuse_file.go b/client2/internal/mount/vfs/fuse_file.go index 6b9fa5e..aaa9d97 100644 --- a/client2/internal/mount/vfs/fuse_file.go +++ b/client2/internal/mount/vfs/fuse_file.go @@ -1,12 +1,14 @@ package vfs import ( + "fmt" "os" "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" + "gorm.io/gorm" ) type FuseFileNode struct { @@ -17,13 +19,23 @@ type FuseFileNode struct { mode os.FileMode } -func newFile(vfs *Vfs, cache *cache.CacheFile) *FuseFileNode { +func newFileFromCache(info cache.CacheEntryInfo, vfs *Vfs) *FuseFileNode { return &FuseFileNode{ vfs: vfs, - pathComps: cache.PathComps(), - size: cache.Size(), - modTime: cache.ModTime(), - mode: cache.Mode(), + pathComps: info.PathComps, + size: info.Size, + modTime: info.ModTime, + mode: info.Mode, + } +} + +func newFileFromObject(vfs *Vfs, pathComps []string, obj cdssdk.Object) *FuseFileNode { + return &FuseFileNode{ + vfs: vfs, + pathComps: pathComps, + size: obj.Size, + modTime: obj.UpdateTime, + mode: os.FileMode(0755), // TODO Object元数据中是没有保存权限的 } } @@ -61,7 +73,6 @@ func (n *FuseFileNode) SetModTime(time time.Time) error { if cacheFile == nil { return fuse.ErrNotExists } - defer cacheFile.Release() return cacheFile.SetModTime(time) } @@ -72,15 +83,14 @@ func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, error) { // 如果文件不存在,也不进行创建,因为创建不应该调用这个接口 return nil, fuse.ErrNotExists } - defer cacheFile.Release() - if flags&uint32(os.O_WRONLY) != 0 { - hd := cacheFile.Open(false) + if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) { + hd := cacheFile.Open(true) return newFileHandle(n, hd), nil } - if flags&uint32(os.O_RDONLY) != 0 { - hd := cacheFile.Open(true) + if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) { + hd := cacheFile.Open(false) return newFileHandle(n, hd), nil } @@ -88,24 +98,25 @@ func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, error) { } func (n *FuseFileNode) loadCacheFile() *cache.CacheFile { - cdsObj := n.loadCDSObject() - - if cdsObj != nil { - return n.vfs.cache.LoadOrCreateFile(n.pathComps, *cdsObj) + fmt.Printf("path: %v\n", n.pathComps) + if len(n.pathComps) <= 2 { + return n.vfs.cache.LoadFile(n.pathComps, nil) } - return n.vfs.cache.LoadFile(n.pathComps, false) -} - -func (n *FuseFileNode) loadCDSObject() *cdssdk.Object { - if len(n.pathComps) >= 3 { - pkg, err := n.vfs.db.Package().GetUserPackageByName(n.vfs.db.DefCtx(), 1, n.pathComps[0], n.pathComps[1]) - if err == nil { - obj, err := n.vfs.db.Object().GetByPath(n.vfs.db.DefCtx(), pkg.PackageID, cdssdk.JoinObjectPath(n.pathComps[2:]...)) - if err == nil { - return &obj - } + cdsObj, err := n.vfs.db.Object().GetByFullPath(n.vfs.db.DefCtx(), n.pathComps[0], n.pathComps[1], cdssdk.JoinObjectPath(n.pathComps[2:]...)) + if err == nil { + fmt.Printf("obj: %v\n", cdsObj) + file := n.vfs.cache.LoadFile(n.pathComps, &cdsObj) + if file == nil { + return nil } + + return file + } + + if err == gorm.ErrRecordNotFound { + fmt.Printf("not found\n") + return n.vfs.cache.LoadFile(n.pathComps, nil) } return nil diff --git a/client2/internal/mount/vfs/fuse_package.go b/client2/internal/mount/vfs/fuse_package.go new file mode 100644 index 0000000..63482c9 --- /dev/null +++ b/client2/internal/mount/vfs/fuse_package.go @@ -0,0 +1,282 @@ +package vfs + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gorm.io/gorm" +) + +type FusePackage struct { + vfs *Vfs + bktName string + pkgName string + modTime time.Time +} + +func newPackageFromCache(cache cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir { + pathComps := cache.PathComps + return &FusePackage{ + vfs: vfs, + bktName: pathComps[0], + pkgName: pathComps[1], + modTime: cache.ModTime, + } +} + +func (r *FusePackage) PathComps() []string { + return []string{r.bktName, r.pkgName} +} + +func (r *FusePackage) Name() string { + return r.pkgName +} + +func (r *FusePackage) Size() int64 { + return 0 +} + +func (r *FusePackage) Mode() os.FileMode { + return os.ModeDir | 0755 +} + +func (r *FusePackage) ModTime() time.Time { + return r.modTime +} + +func (r *FusePackage) IsDir() bool { + return true +} + +func (r *FusePackage) SetModTime(time time.Time) error { + dir := r.loadCacheDir() + if dir == nil { + return fuse.ErrNotExists + } + + return dir.SetModTime(time) +} + +// 如果不存在,应该返回ErrNotExists +func (r *FusePackage) Child(ctx context.Context, name string) (fuse.FsEntry, error) { + childPathComps := []string{r.bktName, r.pkgName, name} + ca := r.vfs.cache.Stat(childPathComps) + + if ca == nil { + var ret fuse.FsEntry + + db := r.vfs.db + err := db.DoTx(func(tx db2.SQLContext) error { + pkg, err := db.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName) + if err != nil { + return err + } + + obj, err := db.Object().GetByPath(tx, pkg.PackageID, name) + if err == nil { + ret = newFileFromObject(r.vfs, childPathComps, obj) + return nil + } + if err != gorm.ErrRecordNotFound { + return err + } + + has, err := db.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+cdssdk.ObjectPathSeparator) + if err != nil { + return err + } + + if has { + dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{ + ModTime: time.Now(), + }) + if dir == nil { + return nil + } + + ret = newDirFromCache(dir.Info(), r.vfs) + } + + return nil + }) + if err != nil { + return nil, err + } + + if ret == nil { + return nil, fuse.ErrNotExists + } + + return ret, nil + } + + if ca.IsDir { + return newDirFromCache(*ca, r.vfs), nil + } + + return newFileFromCache(*ca, r.vfs), nil +} + +func (r *FusePackage) Children(ctx context.Context) ([]fuse.FsEntry, error) { + return r.listChildren() +} + +func (r *FusePackage) ReadChildren() (fuse.DirReader, error) { + ens, err := r.listChildren() + if err != nil { + return nil, err + } + + return newFuseDirReader(ens), nil +} + +func (r *FusePackage) listChildren() ([]fuse.FsEntry, error) { + var ens []fuse.FsEntry + + infos := r.vfs.cache.StatMany([]string{r.bktName, r.pkgName}) + + dbEntries := make(map[string]fuse.FsEntry) + + db := r.vfs.db + db.DoTx(func(tx db2.SQLContext) error { + pkg, err := db.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName) + if err != nil { + return err + } + + coms, err := db.Object().GetCommonPrefixes(tx, pkg.PackageID, "") + if err != nil { + return fmt.Errorf("getting common prefixes: %w", err) + } + + objs, err := db.Object().GetDirectChildren(tx, pkg.PackageID, "") + if err != nil { + return fmt.Errorf("getting direct children: %w", err) + } + + for _, dir := range coms { + dir = strings.TrimSuffix(dir, cdssdk.ObjectPathSeparator) + pathComps := []string{r.bktName, r.pkgName, dir} + cd := r.vfs.cache.LoadDir(pathComps, &cache.CreateDirOption{ + ModTime: time.Now(), + }) + if cd == nil { + continue + } + + dbEntries[dir] = newDirFromCache(cd.Info(), r.vfs) + } + + for _, obj := range objs { + file := newFileFromObject(r.vfs, []string{r.bktName, r.pkgName, obj.Path}, obj) + dbEntries[file.Name()] = file + } + + return nil + }) + + for _, c := range infos { + delete(dbEntries, c.PathComps[len(c.PathComps)-1]) + + if c.IsDir { + ens = append(ens, newDirFromCache(c, r.vfs)) + } else { + ens = append(ens, newFileFromCache(c, r.vfs)) + } + } + + for _, e := range dbEntries { + ens = append(ens, e) + } + + return ens, nil +} + +func (r *FusePackage) NewDir(ctx context.Context, name string) (fuse.FsDir, error) { + cache := r.vfs.cache.CreateDir([]string{r.bktName, r.pkgName, name}) + if cache == nil { + return nil, fuse.ErrPermission + } + + return newPackageFromCache(cache.Info(), r.vfs), nil +} + +func (r *FusePackage) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) { + cache := r.vfs.cache.CreateFile([]string{r.bktName, r.pkgName, name}) + if cache == nil { + return nil, fuse.ErrPermission + } + // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, + // 也有有FileHandle的计数保持cache的有效性 + + fileNode := newFileFromCache(cache.Info(), r.vfs) + + if flags&uint32(os.O_WRONLY) != 0 { + hd := cache.Open(false) + return newFileHandle(fileNode, hd), nil + } + + if flags&uint32(os.O_RDONLY) != 0 { + hd := cache.Open(true) + return newFileHandle(fileNode, hd), nil + } + + return nil, fuse.ErrPermission +} + +func (r *FusePackage) RemoveChild(ctx context.Context, name string) error { + err := r.vfs.cache.Remove([]string{r.bktName, r.pkgName, name}) + if err != nil { + return err + } + + // TODO 生成系统事件 + // 不关心是否成功 + r.vfs.db.DoTx(func(tx db2.SQLContext) error { + d := r.vfs.db + + pkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName) + if err != nil { + return err + } + + return d.Object().DeleteByPath(tx, pkg.PackageID, name) + }) + + return nil +} + +func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error { + + // TODO 有问题 + + newParentNode := newParent.(FuseNode) + _, err := r.vfs.cache.Move([]string{r.bktName, r.pkgName, oldName}, append(newParentNode.PathComps(), newName)) + if err != nil { + return err + } + + return nil +} + +func (r *FusePackage) loadCacheDir() *cache.CacheDir { + var createOpt *cache.CreateDirOption + pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.bktName, r.pkgName) + if err == nil { + createOpt = &cache.CreateDirOption{ + ModTime: pkg.CreateTime, + } + } + + return r.vfs.cache.LoadDir([]string{r.bktName, r.pkgName}, createOpt) +} + +var _ fuse.FsDir = (*FusePackage)(nil) +var _ FuseNode = (*FusePackage)(nil) diff --git a/client2/internal/mount/vfs/fuse_root.go b/client2/internal/mount/vfs/fuse_root.go index f2169e3..8b5d154 100644 --- a/client2/internal/mount/vfs/fuse_root.go +++ b/client2/internal/mount/vfs/fuse_root.go @@ -5,8 +5,9 @@ import ( "os" "time" - "github.com/samber/lo" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gorm.io/gorm" ) @@ -15,6 +16,12 @@ type Root struct { vfs *Vfs } +func newRoot(vfs *Vfs) *Root { + return &Root{ + vfs: vfs, + } +} + func (r *Root) PathComps() []string { return []string{} } @@ -35,10 +42,6 @@ func (r *Root) ModTime() time.Time { return time.Now() } -func (r *Root) CreateTime() time.Time { - return time.Now() -} - func (r *Root) IsDir() bool { return true } @@ -49,15 +52,21 @@ func (r *Root) SetModTime(time time.Time) error { // 如果不存在,应该返回ErrNotExists func (r *Root) Child(ctx context.Context, name string) (fuse.FsEntry, error) { - cache := r.vfs.cache.LoadAny([]string{name}) - if cache != nil { - defer cache.Release() + ca := r.vfs.cache.Stat([]string{name}) + + if ca == nil { + bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), name) + if err == nil { + dir := r.vfs.cache.LoadDir([]string{name}, &cache.CreateDirOption{ + ModTime: bkt.CreateTime, + }) + if dir == nil { + return nil, fuse.ErrNotExists + } - return newBucketOrFileFromCache(cache, r.vfs), nil - } + return newBucketFromCache(dir.Info(), r.vfs), nil + } - bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), name) - if err != nil { if err == gorm.ErrRecordNotFound { return nil, fuse.ErrNotExists } @@ -65,97 +74,99 @@ func (r *Root) Child(ctx context.Context, name string) (fuse.FsEntry, error) { return nil, err } - return newBucketFromDB(bkt, r.vfs), nil + if ca.IsDir { + return newBucketFromCache(*ca, r.vfs), nil + } + + return newFileFromCache(*ca, r.vfs), nil } func (r *Root) Children(ctx context.Context) ([]fuse.FsEntry, error) { - var ens []fuse.FsEntry - - exists := make(map[string]bool) - - caches := r.vfs.cache.LoadMany([]string{}) - for _, c := range caches { - exists[c.Name()] = true - ens = append(ens, newBucketOrFileFromCache(c, r.vfs)) - c.Release() - } + return r.listChildren() +} - remotes, err := r.vfs.db.Bucket().GetAll(r.vfs.db.DefCtx()) +func (r *Root) ReadChildren() (fuse.DirReader, error) { + ens, err := r.listChildren() if err != nil { return nil, err } - for _, remote := range remotes { - if !exists[remote.Name] { - ens = append(ens, newBucketFromDB(remote, r.vfs)) - } - } - return ens, nil + return newFuseDirReader(ens), nil } -func (r *Root) ReadChildren() (fuse.DirReader, error) { - caches := r.vfs.cache.LoadMany([]string{}) +func (r *Root) listChildren() ([]fuse.FsEntry, error) { + var ens []fuse.FsEntry + + infos := r.vfs.cache.StatMany([]string{}) + bkts, err := r.vfs.db.Bucket().GetAll(r.vfs.db.DefCtx()) if err != nil { return nil, err } - exists := make(map[string]fuse.FsEntry) - for _, c := range caches { - exists[c.Name()] = newBucketOrFileFromCache(c, r.vfs) - c.Release() + bktMap := make(map[string]*cdssdk.Bucket) + for _, bkt := range bkts { + b := bkt + bktMap[bkt.Name] = &b } - for _, bkt := range bkts { - if _, ok := exists[bkt.Name]; !ok { - exists[bkt.Name] = newBucketFromDB(bkt, r.vfs) + for _, c := range infos { + delete(bktMap, c.PathComps[len(c.PathComps)-1]) + + if c.IsDir { + ens = append(ens, newBucketFromCache(c, r.vfs)) + } else { + ens = append(ens, newFileFromCache(c, r.vfs)) } } - return newFuseDirReader(lo.Values(exists)), nil + // 将远端目录同步到本地缓存中,防止在给目录中的远端对象创建本地缓存时,顺便创建的目录的元数据不对的情况 + for _, bkt := range bktMap { + dir := r.vfs.cache.LoadDir([]string{bkt.Name}, &cache.CreateDirOption{ + ModTime: bkt.CreateTime, + }) + + if dir == nil { + continue + } + + ens = append(ens, newBucketFromCache(dir.Info(), r.vfs)) + } + + return ens, nil } func (r *Root) NewDir(ctx context.Context, name string) (fuse.FsDir, error) { - cache := r.vfs.cache.LoadDir([]string{name}, true) + cache := r.vfs.cache.CreateDir([]string{name}) if cache == nil { return nil, fuse.ErrPermission } - defer cache.Release() // TODO 用户ID,失败了可以打个日志 // TODO 生成系统事件 // 不关注创建是否成功,仅尝试一下 - r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), 1, name) + r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), 1, name, cache.ModTime()) - return newBucketFromCache(cache, r.vfs), nil + return newBucketFromCache(cache.Info(), r.vfs), nil } func (r *Root) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) { - cache := r.vfs.cache.LoadFile([]string{name}, true) + cache := r.vfs.cache.CreateFile([]string{name}) if cache == nil { return nil, fuse.ErrPermission } - defer cache.Release() // Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭, // 也有有FileHandle的计数保持cache的有效性 - fileNode := newFile(r.vfs, cache) + fileNode := newFileFromCache(cache.Info(), r.vfs) if flags&uint32(os.O_WRONLY) != 0 { - hd, err := cache.Open(false) - if err != nil { - return nil, err - } - + hd := cache.Open(false) return newFileHandle(fileNode, hd), nil } if flags&uint32(os.O_RDONLY) != 0 { - hd, err := cache.Open(true) - if err != nil { - return nil, err - } - + hd := cache.Open(true) return newFileHandle(fileNode, hd), nil } @@ -187,6 +198,9 @@ func (r *Root) RemoveChild(ctx context.Context, name string) error { } func (r *Root) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error { + + // TODO 有问题 + newParentNode := newParent.(FuseNode) _, err := r.vfs.cache.Move([]string{oldName}, append(newParentNode.PathComps(), newName)) if err != nil { diff --git a/client2/internal/mount/vfs/vfs.go b/client2/internal/mount/vfs/vfs.go index 95ee3c6..ff84861 100644 --- a/client2/internal/mount/vfs/vfs.go +++ b/client2/internal/mount/vfs/vfs.go @@ -1,9 +1,13 @@ package vfs import ( + "path/filepath" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/config" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) type Vfs struct { @@ -12,6 +16,18 @@ type Vfs struct { cache *cache.Cache } -func (v *Vfs) Root() { +func NewVfs(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Vfs { + return &Vfs{ + db: db, + config: cfg, + cache: cache.NewCache(db, downloader, filepath.Join(cfg.CacheDir, "data"), filepath.Join(cfg.CacheDir, "meta")), + } +} + +func (v *Vfs) Root() fuse.FsDir { + return newRoot(v) +} +func (v *Vfs) Stats() fuse.FsStats { + return fuse.FsStats{} } diff --git a/client2/main.go b/client2/main.go new file mode 100644 index 0000000..a315e94 --- /dev/null +++ b/client2/main.go @@ -0,0 +1,7 @@ +package main + +import "gitlink.org.cn/cloudream/storage/client2/internal/cmd" + +func main() { + cmd.RootCmd.Execute() +} diff --git a/common/pkgs/db2/bucket.go b/common/pkgs/db2/bucket.go index 7dff898..a0ba3c8 100644 --- a/common/pkgs/db2/bucket.go +++ b/common/pkgs/db2/bucket.go @@ -3,6 +3,7 @@ package db2 import ( "errors" "fmt" + "time" "gorm.io/gorm" @@ -95,7 +96,7 @@ func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.B return ret, err } -func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string) (cdssdk.BucketID, error) { +func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string, createTime time.Time) (cdssdk.Bucket, error) { var bucketID int64 err := ctx.Table("UserBucket"). Select("Bucket.BucketID"). @@ -104,24 +105,24 @@ func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName stri Scan(&bucketID).Error if err != nil { - return 0, err + return cdssdk.Bucket{}, err } if bucketID > 0 { - return 0, gorm.ErrDuplicatedKey + return cdssdk.Bucket{}, gorm.ErrDuplicatedKey } - newBucket := cdssdk.Bucket{Name: bucketName, CreatorID: userID} + newBucket := cdssdk.Bucket{Name: bucketName, CreateTime: createTime, CreatorID: userID} if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil { - return 0, fmt.Errorf("insert bucket failed, err: %w", err) + return cdssdk.Bucket{}, fmt.Errorf("insert bucket failed, err: %w", err) } err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error if err != nil { - return 0, fmt.Errorf("insert user bucket: %w", err) + return cdssdk.Bucket{}, fmt.Errorf("insert user bucket: %w", err) } - return newBucket.BucketID, nil + return newBucket, nil } func (db *BucketDB) Rename(ctx SQLContext, bucketID cdssdk.BucketID, bucketName string) error { diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 17de092..a88b5fc 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -5,6 +5,7 @@ import ( "strings" "time" + "gorm.io/gorm" "gorm.io/gorm/clause" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -33,6 +34,15 @@ func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path s return ret, err } +func (db *ObjectDB) GetByFullPath(ctx SQLContext, bktName string, pkgName string, path string) (cdssdk.Object, error) { + var ret cdssdk.Object + err := ctx.Table("Object"). + Joins("join Package on Package.PackageID = Object.PackageID and Package.Name = ?", pkgName). + Joins("join Bucket on Bucket.BucketID = Package.BucketID and Bucket.Name = ?", bktName). + Where("Object.Path = ?", path).First(&ret).Error + return ret, err +} + func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) { var ret []cdssdk.Object err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").Order("ObjectID ASC").Find(&ret).Error @@ -77,6 +87,20 @@ func (db *ObjectDB) GetDirectChildren(ctx SQLContext, packageID cdssdk.PackageID return ret, err } +func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (bool, error) { + var obj cdssdk.Object + err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, pathPrefix+"%").First(&obj).Error + if err == nil { + return true, nil + } + + if err == gorm.ErrRecordNotFound { + return false, nil + } + + return false, err +} + func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { if len(objectIDs) == 0 { return make(map[cdssdk.ObjectID]bool), nil @@ -124,6 +148,39 @@ func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID return objs, nil } +func (db *ObjectDB) GetDetail(ctx SQLContext, objectID cdssdk.ObjectID) (stgmod.ObjectDetail, error) { + var obj cdssdk.Object + err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&obj).Error + if err != nil { + return stgmod.ObjectDetail{}, fmt.Errorf("getting object: %w", err) + } + + // 获取所有的 ObjectBlock + var allBlocks []stgmod.ObjectBlock + err = ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Order("`Index` ASC").Find(&allBlocks).Error + if err != nil { + return stgmod.ObjectDetail{}, fmt.Errorf("getting all object blocks: %w", err) + } + + // 获取所有的 PinnedObject + var allPinnedObjs []cdssdk.PinnedObject + err = ctx.Table("PinnedObject").Where("ObjectID = ?", objectID).Order("ObjectID ASC").Find(&allPinnedObjs).Error + if err != nil { + return stgmod.ObjectDetail{}, fmt.Errorf("getting all pinned objects: %w", err) + } + + pinnedAt := make([]cdssdk.StorageID, len(allPinnedObjs)) + for i, po := range allPinnedObjs { + pinnedAt[i] = po.StorageID + } + + return stgmod.ObjectDetail{ + Object: obj, + Blocks: allBlocks, + PinnedAt: pinnedAt, + }, nil +} + // 仅返回查询到的对象 func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) { var objs []cdssdk.Object @@ -394,3 +451,7 @@ func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error } + +func (db *ObjectDB) DeleteByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) error { + return ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Delete(&cdssdk.Object{}).Error +} diff --git a/common/pkgs/db2/package.go b/common/pkgs/db2/package.go index 8c8812b..1f4a89c 100644 --- a/common/pkgs/db2/package.go +++ b/common/pkgs/db2/package.go @@ -2,6 +2,7 @@ package db2 import ( "fmt" + "time" "gorm.io/gorm" @@ -76,6 +77,16 @@ func (db *PackageDB) GetBucketPackages(ctx SQLContext, bucketID cdssdk.BucketID) return ret, err } +func (db *PackageDB) GetBucketPackagesByName(ctx SQLContext, bucketName string) ([]model.Package, error) { + var ret []model.Package + err := ctx.Table("Package"). + Select("Package.*"). + Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID"). + Where("Bucket.Name = ?", bucketName). + Find(&ret).Error + return ret, err +} + // IsAvailable 判断一个用户是否拥有指定对象 func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) { var pkgID cdssdk.PackageID @@ -133,7 +144,7 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin return cdssdk.Package{}, gorm.ErrDuplicatedKey } - newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal} + newPackage := cdssdk.Package{Name: name, BucketID: bucketID, CreateTime: time.Now(), State: cdssdk.PackageStateNormal} if err := ctx.Create(&newPackage).Error; err != nil { return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err) } diff --git a/common/pkgs/downloader/downloader.go b/common/pkgs/downloader/downloader.go index 66712e0..90d23e9 100644 --- a/common/pkgs/downloader/downloader.go +++ b/common/pkgs/downloader/downloader.go @@ -93,8 +93,18 @@ func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { return NewDownloadObjectIterator(d, req2s) } -func (d *Downloader) DownloadObjectByDetail(detail stgmod.ObjectDetail, off int64, length int64) (Downloading, error) { - +func (d *Downloader) DownloadObjectByDetail(detail stgmod.ObjectDetail, off int64, length int64) (*Downloading, error) { + req2s := []downloadReqeust2{{ + Detail: &detail, + Raw: DownloadReqeust{ + ObjectID: detail.Object.ObjectID, + Offset: off, + Length: length, + }, + }} + + iter := NewDownloadObjectIterator(d, req2s) + return iter.MoveNext() } func (d *Downloader) DownloadPackage(pkgID cdssdk.PackageID) DownloadIterator { diff --git a/coordinator/internal/mq/bucket.go b/coordinator/internal/mq/bucket.go index 3d79d0c..12f3c91 100644 --- a/coordinator/internal/mq/bucket.go +++ b/coordinator/internal/mq/bucket.go @@ -3,6 +3,7 @@ package mq import ( "errors" "fmt" + "time" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" @@ -71,15 +72,11 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket return fmt.Errorf("getting user by id: %w", err) } - bucketID, err := svc.db2.Bucket().Create(tx, msg.UserID, msg.BucketName) + bucket, err = svc.db2.Bucket().Create(tx, msg.UserID, msg.BucketName, time.Now()) if err != nil { return fmt.Errorf("creating bucket: %w", err) } - bucket, err = svc.db2.Bucket().GetByID(tx, bucketID) - if err != nil { - return fmt.Errorf("getting bucket by id: %w", err) - } return nil }) if err != nil { diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 9fb9ba2..5703a37 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -67,10 +67,11 @@ func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetO } if !msg.IsPrefix { - objs, err = svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) + obj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) if err != nil { return fmt.Errorf("getting object by path: %w", err) } + objs = append(objs, obj) return nil } @@ -770,9 +771,9 @@ func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjects func (svc *Service) NewMultipartUploadObject(msg *coormq.NewMultipartUploadObject) (*coormq.NewMultipartUploadObjectResp, *mq.CodeMessage) { var obj cdssdk.Object err := svc.db2.DoTx(func(tx db2.SQLContext) error { - oldObjs, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) - if err == nil && len(oldObjs) > 0 { - obj = oldObjs[0] + oldObj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path) + if err == nil { + obj = oldObj err := svc.db2.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID) if err != nil { return fmt.Errorf("delete object blocks: %w", err) diff --git a/magefiles/main.go b/magefiles/main.go index 2ee4010..9a47632 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -117,6 +117,15 @@ func Client() error { }) } +func Client2() error { + return magefiles.Build(magefiles.BuildArgs{ + OutputName: "client2", + OutputDir: "client2", + AssetsDir: "assets", + EntryFile: "client2/main.go", + }) +} + func Coordinator() error { return magefiles.Build(magefiles.BuildArgs{ OutputName: "coordinator",