package cache import ( "errors" "io" "os" "path/filepath" "sync" "syscall" "time" "github.com/inhies/go-bytesize" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/internal/downloader" "gitlink.org.cn/cloudream/storage2/client/internal/mount/config" "gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse" "gitlink.org.cn/cloudream/storage2/client/internal/uploader" clitypes "gitlink.org.cn/cloudream/storage2/client/types" ) type CacheEntry interface { fuse.FsEntry // 在虚拟文件系统中的路径,即不包含缓存目录的路径 PathComps() []string } type CacheEntryInfo struct { PathComps []string Size int64 Mode os.FileMode ModTime time.Time IsDir bool } type Cache struct { cfg *config.Config db *db.DB uploader *uploader.Uploader downloader *downloader.Downloader lock *sync.RWMutex cacheDone chan any activeCache *trie.Trie[*CacheFile] } func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache { return &Cache{ cfg: cfg, db: db, uploader: uploader, downloader: downloader, lock: &sync.RWMutex{}, cacheDone: make(chan any), activeCache: trie.NewTrie[*CacheFile](), } } func (c *Cache) Start() { go c.scanningCache() go c.scanningData() } func (c *Cache) Stop() { close(c.cacheDone) } func (c *Cache) GetCacheDataPath(comps ...string) string { comps2 := make([]string, len(comps)+1) comps2[0] = c.cfg.DataDir copy(comps2[1:], comps) return filepath.Join(comps2...) } func (c *Cache) GetCacheMetaPath(comps ...string) string { comps2 := make([]string, len(comps)+1) comps2[0] = c.cfg.MetaDir copy(comps2[1:], comps) return filepath.Join(comps2...) } // 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。 func (c *Cache) Stat(pathComps []string) *CacheEntryInfo { c.lock.RLock() defer c.lock.RUnlock() node, ok := c.activeCache.WalkEnd(pathComps) if ok && node.Value != nil { info := node.Value.Info() return &info } dataPath := c.GetCacheDataPath(pathComps...) stat, err := os.Stat(dataPath) if err != nil { // TODO 日志记录 return nil } if stat.IsDir() { info, err := loadCacheDirInfo(c, pathComps, stat) if err != nil { return nil } return info } info, err := loadCacheFileInfo(c, pathComps, stat) if err != nil { return nil } return info } // 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。 // // 记得使用Release减少引用计数 func (c *Cache) CreateFile(pathComps []string) *CacheFile { c.lock.Lock() defer c.lock.Unlock() node, ok := c.activeCache.WalkEnd(pathComps) if ok && node.Value != nil { node.Value.Delete() if node.Value.state.uploading != nil { node.Value.state.uploading.isDeleted = true } } ch, err := createNewCacheFile(c, pathComps) if err != nil { logger.Warnf("create new cache file %v: %v", pathComps, err) return nil } ch.state.refCount++ c.activeCache.CreateWords(pathComps).Value = ch logger.Debugf("create new cache file %v", pathComps) return ch } // 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。 // // 记得使用Release减少引用计数 func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile { c.lock.Lock() defer c.lock.Unlock() node, ok := c.activeCache.WalkEnd(pathComps) if ok && node.Value != nil { if !node.Value.state.isLoaded { err := node.Value.Load() if err != nil { logger.Warnf("load cache %v: %v", pathComps, err) return nil } } return node.Value } ch, err := loadCacheFile(c, pathComps) if err == nil { ch.remoteObj = obj ch.state.refCount++ c.activeCache.CreateWords(pathComps).Value = ch logger.Debugf("load cache %v", pathComps) return ch } if !os.IsNotExist(err) { // TODO 日志记录 logger.Warnf("load cache %v: %v", pathComps, err) return nil } if obj == nil { return nil } ch, err = newCacheFileFromObject(c, pathComps, obj) if err != nil { logger.Warnf("create cache %v from object: %v", pathComps, err) return nil } ch.state.refCount++ c.activeCache.CreateWords(pathComps).Value = ch logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID) return ch } // 创建一个缓存目录。如果目录已经存在,则会重置目录属性。如果加载过程中发生了错误,或者目标位置是一个文件,则会返回nil func (c *Cache) CreateDir(pathComps []string) *CacheDir { c.lock.Lock() defer c.lock.Unlock() ch, err := createNewCacheDir(c, pathComps) if err != nil { logger.Warnf("create cache dir: %v", err) return nil } return ch } type CreateDirOption struct { ModTime time.Time } // 加载指定缓存目录,如果目录不存在,则使用createOpt选项创建目录,而如果createOpt为nil,那么会返回nil。 func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir { c.lock.Lock() defer c.lock.Unlock() ch, err := loadCacheDir(c, pathComps) if err == nil { return ch } if !os.IsNotExist(err) { // TODO 日志记录 return nil } if createOpt == nil { return nil } // 创建目录 ch, err = makeCacheDirFromOption(c, pathComps, *createOpt) if err != nil { // TODO 日志记录 return nil } return ch } // 加载指定路径下的所有缓存条目信息 func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo { c.lock.RLock() defer c.lock.RUnlock() var infos []CacheEntryInfo exists := make(map[string]bool) node, ok := c.activeCache.WalkEnd(pathComps) if ok { for name, child := range node.WordNexts { if child.Value != nil { infos = append(infos, child.Value.Info()) exists[name] = true } } } osEns, err := os.ReadDir(c.GetCacheDataPath(pathComps...)) if err != nil { return nil } for _, e := range osEns { if exists[e.Name()] { continue } info, err := e.Info() if err != nil { continue } if e.IsDir() { info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info) if err != nil { continue } infos = append(infos, *info) } else { info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info) if err != nil { continue } infos = append(infos, *info) } } return infos } // 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。 func (c *Cache) Remove(pathComps []string) error { c.lock.Lock() defer c.lock.Unlock() node, ok := c.activeCache.WalkEnd(pathComps) if ok { if len(node.WordNexts) > 0 { return fuse.ErrNotEmpty } if node.Value != nil { node.Value.Delete() if node.Value.state.uploading != nil { node.Value.state.uploading.isDeleted = true } } node.RemoveSelf(true) logger.Debugf("active cache %v removed", pathComps) return nil } metaPath := c.GetCacheMetaPath(pathComps...) dataPath := c.GetCacheDataPath(pathComps...) os.Remove(metaPath) err := os.Remove(dataPath) if err == nil || os.IsNotExist(err) { logger.Debugf("local cache %v removed", pathComps) return nil } if errors.Is(err, syscall.ENOTEMPTY) { return fuse.ErrNotEmpty } return err } // 移动指定路径的缓存文件或目录到新的路径。如果目标路径已经存在,则会报错。 // // 如果移动成功,则返回移动后的缓存文件或目录。如果文件或目录不存在,则返回nil。 func (c *Cache) Move(pathComps []string, newPathComps []string) error { c.lock.Lock() defer c.lock.Unlock() _, ok := c.activeCache.WalkEnd(newPathComps) if ok { return fuse.ErrExists } newMetaPath := c.GetCacheMetaPath(newPathComps...) newDataPath := c.GetCacheDataPath(newPathComps...) _, err := os.Stat(newDataPath) if err == nil { return fuse.ErrExists } if !os.IsNotExist(err) { return err } oldMetaPath := c.GetCacheMetaPath(pathComps...) oldDataPath := c.GetCacheDataPath(pathComps...) // 确定源文件存在,再进行后面的操作 _, err = os.Stat(oldDataPath) if err != nil { if os.IsNotExist(err) { return fuse.ErrNotExists } return err } // 创建父目录是为了解决被移动的文件不在本地的问题。 // 但同时也导致了如果目的路径的父目录确实不存在,这里会意外的创建了这个目录 newMetaDir := filepath.Dir(newMetaPath) err = os.MkdirAll(newMetaDir, 0755) if err != nil { return err } newDataDir := filepath.Dir(newDataPath) err = os.MkdirAll(newDataDir, 0755) if err != nil { return err } // 每个缓存文件持有meta文件和data文件的句柄,所以这里移动文件,不影响句柄的使用。 // 只能忽略这里的错误 os.Rename(oldMetaPath, newMetaPath) os.Rename(oldDataPath, newDataPath) // 更新缓存 oldNode, ok := c.activeCache.WalkEnd(pathComps) if ok { newNode := c.activeCache.CreateWords(newPathComps) newNode.Value = oldNode.Value newNode.WordNexts = oldNode.WordNexts oldNode.RemoveSelf(false) if newNode.Value != nil { newNode.Value.Move(newPathComps) } newNode.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl { if node.Value != nil { node.Value.Move(lo2.AppendNew(newPathComps, path...)) } return trie.VisitContinue }) } logger.Debugf("cache moved: %v -> %v", pathComps, newPathComps) return nil } type uploadingPackage struct { bktName string pkgName string pkg clitypes.Package upObjs []*uploadingObject } type uploadingObject struct { pathComps []string cache *CacheFile reader *CacheFileHandle isDeleted bool isSuccess bool } func (c *Cache) scanningCache() { ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() lastScanPath := []string{} for { select { case _, ok := <-c.cacheDone: if !ok { return } case <-ticker.C: } c.lock.Lock() type packageFullName struct { bktName string pkgName string } uploadingPkgs := make(map[packageFullName]*uploadingPackage) visitCnt := 0 visitBreak := false node, _ := c.activeCache.WalkEnd(lastScanPath) node.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl { ch := node.Value if ch == nil { return trie.VisitContinue } if ch.state.refCount > 0 { logger.Debugf("skip cache %v, refCount: %v", path, ch.state.refCount) return trie.VisitContinue } visitCnt++ shouldUpload := true // 不存放在Package里的文件,不需要上传 if len(ch.pathComps) <= 2 { shouldUpload = false } if ch.Revision() > 0 && shouldUpload { // 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传 if time.Since(ch.state.freeTime) > c.cfg.UploadPendingTime && ch.state.uploading == nil { fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]} pkg, ok := uploadingPkgs[fullName] if !ok { pkg = &uploadingPackage{ bktName: ch.pathComps[0], pkgName: ch.pathComps[1], } uploadingPkgs[fullName] = pkg } obj := &uploadingObject{ pathComps: lo2.ArrayClone(ch.pathComps), cache: ch, reader: ch.OpenReadWhenScanning(), } pkg.upObjs = append(pkg.upObjs, obj) ch.state.uploading = obj } } else if ch.state.isLoaded { // 2. 本地缓存没有被修改,如果一段时间内没有被使用,则进行卸载 if time.Since(ch.state.freeTime) > c.cfg.CacheActiveTime { ch.Unload() ch.state.isLoaded = false ch.state.unloadTime = time.Now() } } else { // 3. 卸载后的缓存,如果一段时间内没有被使用,则进行删除。 if time.Since(ch.state.unloadTime) > c.cfg.CacheExpireTime { // 如果文件已经同步到远端,则可以直接删除本地缓存 if ch.Revision() == 0 { ch.Delete() } node.RemoveSelf(true) } } // 每次最多遍历500个节点,防止占用锁太久 if visitCnt > 500 { lastScanPath = lo2.ArrayClone(path) visitBreak = true return trie.VisitBreak } return trie.VisitContinue }) if !visitBreak { lastScanPath = []string{} } c.lock.Unlock() if len(uploadingPkgs) > 0 { go c.doUploading(lo.Values(uploadingPkgs)) } } } func (c *Cache) scanningData() { ticker := time.NewTicker(c.cfg.ScanDataDirInterval) defer ticker.Stop() var walkTrace []*os.File var walkTraceComps []string for { select { case <-ticker.C: case <-c.cacheDone: return } logger.Infof("begin scanning data dir") if len(walkTrace) == 0 { dir, err := os.Open(c.cfg.DataDir) if err != nil { logger.Warnf("open data dir: %v", err) continue } walkTrace = []*os.File{dir} walkTraceComps = []string{c.cfg.MetaDir} } const maxVisitCnt = 5000 const maxUntrackedFiles = 500 var untrackedFiles [][]string visitCnt := 0 // 一次最多遍历5000个文件(包括路径上的文件夹),一次最多添加500个未跟踪文件 for len(walkTrace) > 0 && visitCnt < maxVisitCnt && len(untrackedFiles) < maxUntrackedFiles { lastNode := walkTrace[len(walkTrace)-1] visitCnt++ e, err := lastNode.ReadDir(1) if err == io.EOF { lastNode.Close() walkTrace = walkTrace[:len(walkTrace)-1] walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] continue } if err != nil { logger.Warnf("read dir %v: %v", lastNode.Name(), err) lastNode.Close() walkTrace = walkTrace[:len(walkTrace)-1] walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] continue } if e[0].IsDir() { child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name())) if err != nil { logger.Warnf("open dir %v: %v", e[0].Name(), err) continue } walkTrace = append(walkTrace, child) walkTraceComps = append(walkTraceComps, e[0].Name()) continue } // 对于不在Package层级的文件,不跟踪 if len(walkTrace) <= 2 { continue } walkTraceComps = append(walkTraceComps, e[0].Name()) fileMetaPath := filepath.Join(walkTraceComps...) _, err = os.Stat(fileMetaPath) if err == nil || !os.IsNotExist(err) { walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] continue } untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:])) walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] } if len(untrackedFiles) > 0 { for _, comps := range untrackedFiles { ch := c.LoadFile(comps, nil) if ch != nil { ch.Release() } } } logger.Infof("%v file visited, %v untracked files found", visitCnt, len(untrackedFiles)) } } func (c *Cache) doUploading(pkgs []*uploadingPackage) { /// 1. 先尝试创建Package var sucPkgs []*uploadingPackage var failedPkgs []*uploadingPackage for _, pkg := range pkgs { p, err := db.DoTx21(c.db, c.db.Package().TryCreateAll, pkg.bktName, pkg.pkgName) if err != nil { logger.Warnf("try create package %v/%v: %v", pkg.bktName, pkg.pkgName, err) failedPkgs = append(failedPkgs, pkg) continue } pkg.pkg = p sucPkgs = append(sucPkgs, pkg) } /// 2. 对于创建失败的Package,直接关闭文件,不进行上传 // 在锁的保护下取消上传状态 c.lock.Lock() for _, pkg := range failedPkgs { for _, obj := range pkg.upObjs { obj.cache.state.uploading = nil } } c.lock.Unlock() // 关闭文件必须在锁外 for _, pkg := range failedPkgs { for _, obj := range pkg.upObjs { obj.reader.Close() } } /// 3. 开始上传每个Package for _, p := range sucPkgs { uploader, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil) if err != nil { logger.Warnf("begin update package %v/%v: %v", p.bktName, p.pkgName, err) // 取消上传状态 c.lock.Lock() for _, obj := range p.upObjs { obj.cache.state.uploading = nil } c.lock.Unlock() for _, obj := range p.upObjs { obj.reader.Close() } continue } upSuc := 0 upSucAmt := int64(0) upFailed := 0 upStartTime := time.Now() logger.Infof("begin uploading %v objects to package %v/%v", len(p.upObjs), p.bktName, p.pkgName) for _, o := range p.upObjs { rd := cacheFileReader{ rw: o.reader, } counter := io2.Counter(&rd) err = uploader.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter) if err != nil { logger.Warnf("upload object %v: %v", o.pathComps, err) upFailed++ continue } o.isSuccess = true upSuc++ upSucAmt += counter.Count() } // 在锁保护下登记上传结果 c.lock.Lock() upCancel := 0 upRename := 0 // 检查是否有文件在上传期间发生了变化 var sucObjs []*uploadingObject for _, o := range p.upObjs { o.cache.state.uploading = nil if !o.isSuccess { continue } oldPath := clitypes.JoinObjectPath(o.pathComps[2:]...) newPath := clitypes.JoinObjectPath(o.cache.pathComps[2:]...) if o.isDeleted { uploader.CancelObject(oldPath) upCancel++ continue } // 如果对象移动到了另一个Package,那么也要取消上传 if !lo2.ArrayEquals(o.pathComps[:2], o.cache.pathComps[:2]) { uploader.CancelObject(oldPath) upCancel++ continue } // 只有仍在同Package内移动的对象才能直接重命名 if newPath != oldPath { uploader.RenameObject(oldPath, newPath) upRename++ } sucObjs = append(sucObjs, o) } _, err = uploader.Commit() if err != nil { logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err) } else { for _, obj := range sucObjs { obj.cache.RevisionUploaded(obj.reader.revision) } upTime := time.Since(upStartTime) logger.Infof("upload package %v/%v in %v, upload: %v, size: %v, speed: %v/s, cancel: %v, rename: %v", p.bktName, p.pkgName, upTime, upSuc, upSucAmt, bytesize.New(float64(upSucAmt)/upTime.Seconds()), upCancel, upRename) } c.lock.Unlock() // 在Cache锁以外关闭文件。 // 关闭文件会影响refCount,所以无论是上传失败还是上传成功,都会在等待一段时间后才进行下一阶段的操作 for _, obj := range p.upObjs { obj.reader.Close() } } } type cacheFileReader struct { rw *CacheFileHandle pos int64 } func (r *cacheFileReader) Read(p []byte) (int, error) { n, err := r.rw.ReadAt(p, r.pos) r.pos += int64(n) if err != nil { return n, err } if n != len(p) { return n, io.EOF } return n, nil }