package cache

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"gitlink.org.cn/cloudream/common/utils/io2"
	"gitlink.org.cn/cloudream/common/utils/lo2"
	"gitlink.org.cn/cloudream/common/utils/math2"
	"gitlink.org.cn/cloudream/common/utils/serder"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/fuse"
	jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
)

// CacheLevel describes how much of a cache file is currently loaded.
type CacheLevel int

const (
	// LevelNotLoaded: nothing is loaded.
	LevelNotLoaded CacheLevel = iota
	// LevelReadOnly: cache data is complete, but metadata was loaded only once;
	// the file can be read but not modified.
	LevelReadOnly
	// LevelMetaLoaded: cache data is complete on disk, but only the metadata
	// file is open; the data file is not.
	LevelMetaLoaded
	// LevelComplete: cache data is complete and both meta and data files are open.
	LevelComplete
)

// String returns a human-readable name for the level.
// Unknown values are reported instead of panicking.
func (l CacheLevel) String() string {
	levels := []string{"NotLoaded", "ReadOnly", "MetaLoaded", "Complete"}
	if l < 0 || int(l) >= len(levels) {
		return fmt.Sprintf("Unknown(%d)", int(l))
	}
	return levels[l]
}

// FileInfo is the persisted metadata of a cache file.
type FileInfo struct {
	// Total file size. May exceed the size of the corresponding remote file.
	// It may also differ from the local cache file's on-disk size; the local
	// file is periodically corrected to match this value.
	Size int64
	// Revision of the file data. A value greater than 0 means there are
	// uncommitted modifications.
	DataRevision int
	// Revision of the file metadata.
	MetaRevision int
	// Segments of valid data, sorted by start position ascending.
	Segments []*Range
	// Size of the corresponding remote object; only meaningful when this file
	// caches a remote object. It is the amount of data that should be loaded
	// from the remote side, so it may be smaller than the actual remote size.
	ObjectSize int64
	// Last modification time of the file.
	ModTime time.Time
	// Permission bits of the file.
	Perm os.FileMode
}

// Clone returns a deep copy of f (Segments are copied, not shared).
func (f *FileInfo) Clone() FileInfo {
	n := *f
	n.Segments = make([]*Range, len(f.Segments))
	for i, seg := range f.Segments {
		n.Segments[i] = &Range{
			Position: seg.Position,
			Length:   seg.Length,
		}
	}
	return n
}

// Range is a half-open byte range [Position, Position+Length).
type Range struct {
	Position int64
	Length   int64
}

func (r *Range) GetPosition() int64 {
	return r.Position
}

func (r *Range) SetPosition(pos int64) {
	r.Position = pos
}

func (r *Range) GetLength() int64 {
	return r.Length
}

func (r *Range) SetLength(length int64) {
	r.Length = length
}

// End returns the exclusive end position of the range.
func (r *Range) End() int64 {
	return r.Position + r.Length
}

// CacheFile is shared by all concurrent read/write operations on one file.
// Do not store this struct inside another object.
type CacheFile struct {
	cache           *Cache
	pathComps       []string
	info            FileInfo
	remoteObj       *jcstypes.Object
	rwLock          *sync.RWMutex
	readers         []*CacheFileHandle
	writers         []*CacheFileHandle
	saveMetaChan    chan any
	saveMetaLock    *sync.Mutex
	stopSaveMeta    *bool
	isDeleted       bool
	level           CacheLevel
	refCount        int
	freeTime        time.Time
	changeLevelTime time.Time
	metaFile        *os.File
	dataFile        *os.File
	writeLock       *sync.RWMutex
	// Lifecycle state of the cache file. Not protected by rwLock; managed by
	// the owning Cache.
	state cacheState
}

type cacheState struct {
	uploading *uploadingObject
}

// createNewCacheFile creates a brand-new cache file (meta + data) on disk and
// returns a CacheFile at LevelComplete with its metadata-saving goroutine started.
func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	info := FileInfo{
		DataRevision: 1,
		ModTime:      time.Now(),
		Perm:         0755,
	}
	infoData, err := serder.ObjectToJSON(info)
	if err != nil {
		return nil, err
	}

	err = os.MkdirAll(filepath.Dir(metaPath), 0755)
	if err != nil {
		return nil, err
	}

	metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		return nil, fmt.Errorf("create cache meta file: %w", err)
	}

	err = io2.WriteAll(metaFile, infoData)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("save cache meta file: %w", err)
	}

	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("create cache data file: %w", err)
	}

	ch := &CacheFile{
		cache:           cache,
		pathComps:       pathComps,
		info:            info,
		rwLock:          &sync.RWMutex{},
		saveMetaChan:    make(chan any, 1),
		saveMetaLock:    &sync.Mutex{},
		stopSaveMeta:    new(bool),
		level:           LevelComplete,
		freeTime:        time.Now(),
		changeLevelTime: time.Now(),
		metaFile:        metaFile,
		dataFile:        dataFile,
		writeLock:       &sync.RWMutex{},
		state:           cacheState{},
	}
	go ch.serving(ch.saveMetaChan, ch.stopSaveMeta)
	return ch, nil
}

// loadCacheFile opens an existing cache file from disk. If a data file exists
// without a meta file, a meta file is synthesized from the data file's stat
// info and the file is treated as modified (DataRevision=1).
func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
	if err != nil {
		// Do not wrap this error: callers check it with os.IsNotExist.
		return nil, err
	}

	info := &FileInfo{}
	metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
	if err != nil {
		// If there is a data file but no meta file, create a meta file.
		if !os.IsNotExist(err) {
			dataFile.Close()
			return nil, err
		}

		stat, err := dataFile.Stat()
		if err != nil {
			dataFile.Close()
			return nil, err
		}

		err = os.MkdirAll(filepath.Dir(metaPath), 0755)
		if err != nil {
			dataFile.Close()
			return nil, err
		}

		metaFile, err = os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
		if err != nil {
			dataFile.Close()
			return nil, err
		}

		info.Size = stat.Size()
		info.ModTime = stat.ModTime()
		info.Perm = stat.Mode().Perm()
		info.Segments = []*Range{{Position: 0, Length: info.Size}}
		info.MetaRevision = 1
		// A file that has never been synced is considered modified.
		info.DataRevision = 1
	} else {
		err = serder.JSONToObjectStream(metaFile, info)
		if err != nil {
			dataFile.Close()
			return nil, err
		}
	}

	ch := &CacheFile{
		cache:           cache,
		pathComps:       pathComps,
		info:            *info,
		rwLock:          &sync.RWMutex{},
		saveMetaChan:    make(chan any, 1),
		saveMetaLock:    &sync.Mutex{},
		stopSaveMeta:    new(bool),
		level:           LevelComplete,
		freeTime:        time.Now(),
		changeLevelTime: time.Now(),
		metaFile:        metaFile,
		dataFile:        dataFile,
		writeLock:       &sync.RWMutex{},
		state:           cacheState{},
	}
	go ch.serving(ch.saveMetaChan, ch.stopSaveMeta)
	return ch, nil
}

// newCacheFileFromObject creates an empty local cache file that mirrors the
// remote object obj; its data will be pulled lazily on read.
func newCacheFileFromObject(cache *Cache, pathComps []string, obj *jcstypes.Object) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	info := FileInfo{
		Size:       obj.Size,
		ObjectSize: obj.Size,
		ModTime:    obj.UpdateTime,
		Perm:       0755,
	}
	infoData, err := serder.ObjectToJSON(info)
	if err != nil {
		return nil, err
	}

	err = os.MkdirAll(filepath.Dir(metaPath), 0755)
	if err != nil {
		return nil, err
	}

	metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		return nil, fmt.Errorf("create cache meta file: %w", err)
	}

	err = io2.WriteAll(metaFile, infoData)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("save cache meta file: %w", err)
	}

	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("create cache file: %w", err)
	}

	ch := &CacheFile{
		cache:           cache,
		pathComps:       pathComps,
		info:            info,
		remoteObj:       obj,
		rwLock:          &sync.RWMutex{},
		saveMetaChan:    make(chan any, 1),
		saveMetaLock:    &sync.Mutex{},
		stopSaveMeta:    new(bool),
		level:           LevelComplete,
		freeTime:        time.Now(),
		changeLevelTime: time.Now(),
		metaFile:        metaFile,
		dataFile:        dataFile,
		writeLock:       &sync.RWMutex{},
		state:           cacheState{},
	}
	go ch.serving(ch.saveMetaChan, ch.stopSaveMeta)
	return ch, nil
}

// loadReadOnlyCacheFile loads only the metadata of a cache file, producing a
// CacheFile at LevelReadOnly. No file handles are opened and no saving
// goroutine is started; the real meta file (if missing) is created on LevelUp.
func loadReadOnlyCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	dataStat, err := os.Stat(dataPath)
	if err != nil {
		// Do not wrap this error: callers check it with os.IsNotExist.
		return nil, err
	}
	if dataStat.IsDir() {
		return nil, fmt.Errorf("target is a directory")
	}

	info := &FileInfo{}
	metaData, err := os.ReadFile(metaPath)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}

		err = os.MkdirAll(filepath.Dir(metaPath), 0755)
		if err != nil {
			return nil, err
		}

		info.Size = dataStat.Size()
		info.ModTime = dataStat.ModTime()
		info.Perm = dataStat.Mode().Perm()
		info.Segments = []*Range{{Position: 0, Length: info.Size}}
		info.MetaRevision = 1
		// A file that has never been synced is considered modified.
		info.DataRevision = 1
		// The actual meta file is created only on LevelUp.
	} else {
		err = serder.JSONToObject(metaData, info)
		if err != nil {
			return nil, err
		}
	}

	ch := &CacheFile{
		cache:           cache,
		pathComps:       pathComps,
		info:            *info,
		rwLock:          &sync.RWMutex{},
		saveMetaChan:    nil,
		saveMetaLock:    &sync.Mutex{},
		stopSaveMeta:    nil,
		level:           LevelReadOnly,
		freeTime:        time.Now(),
		changeLevelTime: time.Now(),
		metaFile:        nil,
		dataFile:        nil,
		writeLock:       &sync.RWMutex{},
		state:           cacheState{},
	}
	return ch, nil
}

// loadCacheFileInfo builds a CacheEntryInfo from the meta file if it exists,
// otherwise from the provided data file's stat info.
func loadCacheFileInfo(cache *Cache, pathComps []string, dataFileInfo os.FileInfo) (*CacheEntryInfo, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)

	metaData, err := os.ReadFile(metaPath)
	if err == nil {
		info := &FileInfo{}
		err = serder.JSONToObject(metaData, info)
		if err != nil {
			return nil, err
		}

		return &CacheEntryInfo{
			PathComps: pathComps,
			Size:      info.Size,
			Perm:      info.Perm,
			ModTime:   info.ModTime,
			IsDir:     false,
		}, nil
	}

	if !os.IsNotExist(err) {
		return nil, err
	}

	return &CacheEntryInfo{
		PathComps: pathComps,
		Size:      dataFileInfo.Size(),
		Perm:      dataFileInfo.Mode(),
		ModTime:   dataFileInfo.ModTime(),
		IsDir:     false,
	}, nil
}

// IncRef increases the reference count. Must not be called by code outside Cache.
func (f *CacheFile) IncRef() {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	f.refCount++
}

// Release decreases the reference count, recording the time the file became free.
func (f *CacheFile) Release() {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	f.refCount--
	if f.refCount == 0 {
		f.freeTime = time.Now()
	}
}

// LevelDown lowers the cache level to the given level, closing file handles
// and stopping the metadata-saving goroutine as needed. Returns false if the
// file is still referenced and cannot be downgraded.
func (f *CacheFile) LevelDown(level CacheLevel) bool {
	if level <= LevelReadOnly {
		// When downgrading below the level that saves metadata, pause the
		// metadata-saving goroutine first.
		f.saveMetaLock.Lock()
		defer f.saveMetaLock.Unlock()
	}

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if level >= f.level {
		return true
	}

	// A cache file in use cannot be downgraded.
	if f.refCount > 0 {
		return false
	}

	switch f.level {
	case LevelComplete:
		f.dataFile.Close()
		f.level = LevelMetaLoaded
		if level >= f.level {
			break
		}
		fallthrough

	case LevelMetaLoaded:
		if !f.isDeleted {
			// Best-effort final save; log instead of failing the downgrade.
			if err := f.saveMeta(f.info); err != nil {
				logger.Warnf("save meta file of %v: %v", f.pathComps, err)
			}
		}
		f.saveMetaChan = nil
		// saveMetaLock is already held here, so setting stopSaveMeta to true
		// before closing metaFile cannot make the saving goroutine fail a
		// save: it checks stopSaveMeta before touching the file.
		*f.stopSaveMeta = true
		f.stopSaveMeta = nil
		f.metaFile.Close()
		f.level = LevelReadOnly
		if level >= f.level {
			break
		}
		fallthrough

	case LevelReadOnly:
		f.level = LevelNotLoaded
		if level >= f.level {
			break
		}
		fallthrough

	case LevelNotLoaded:
	}

	f.changeLevelTime = time.Now()
	return true
}

// LevelUp raises the cache level to the given level, opening file handles and
// starting the metadata-saving goroutine as needed. Returns false if a file
// could not be opened. A file in use may still be upgraded.
func (f *CacheFile) LevelUp(level CacheLevel) bool {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if level <= f.level {
		return true
	}

	switch f.level {
	case LevelNotLoaded:
		f.level = LevelReadOnly
		if level <= f.level {
			break
		}
		fallthrough

	case LevelReadOnly:
		metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
		metaFile, err := os.OpenFile(metaPath, os.O_RDWR|os.O_CREATE, 0644)
		if err != nil {
			logger.Warnf("open meta file %v: %v", metaPath, err)
			return false
		}
		f.saveMetaChan = make(chan any, 1)
		f.stopSaveMeta = new(bool)
		f.metaFile = metaFile
		f.level = LevelMetaLoaded
		go f.serving(f.saveMetaChan, f.stopSaveMeta)
		if level <= f.level {
			break
		}
		fallthrough

	case LevelMetaLoaded:
		dataPath := f.cache.GetCacheDataPath(f.pathComps...)
		dataFile, err := os.OpenFile(dataPath, os.O_RDWR|os.O_CREATE, 0644)
		if err != nil {
			logger.Warnf("open data file %v: %v", dataPath, err)
			return false
		}
		f.dataFile = dataFile
		f.level = LevelComplete
		if level <= f.level {
			break
		}
		fallthrough

	case LevelComplete:
	}

	f.changeLevelTime = time.Now()
	return true
}

// RevisionUploaded marks the given data/meta revisions as uploaded: if the
// current revision still equals the uploaded one, it is reset to 0 (clean).
func (f *CacheFile) RevisionUploaded(dataRev int, metaRev int) {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if dataRev != 0 && f.info.DataRevision == dataRev {
		f.info.DataRevision = 0
	}
	if metaRev != 0 && f.info.MetaRevision == metaRev {
		f.info.MetaRevision = 0
	}
	f.letSave()
}

// Info returns a snapshot of the cache entry's state.
func (f *CacheFile) Info() CacheEntryInfo {
	f.rwLock.RLock()
	defer f.rwLock.RUnlock()

	return CacheEntryInfo{
		PathComps:       f.pathComps,
		Size:            f.info.Size,
		Perm:            f.info.Perm,
		ModTime:         f.info.ModTime,
		IsDir:           false,
		MetaRevision:    f.info.MetaRevision,
		DataRevision:    f.info.DataRevision,
		RefCount:        f.refCount,
		FreeTime:        f.freeTime,
		Level:           f.level,
		ChangeLevelTime: f.changeLevelTime,
	}
}

// Delete removes the on-disk meta and data files and marks this CacheFile as
// deleted so the saving goroutine will not overwrite a newly created file of
// the same name.
func (f *CacheFile) Delete() {
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
	dataPath := f.cache.GetCacheDataPath(f.pathComps...)
	os.Remove(metaPath)
	os.Remove(dataPath)

	// isDeleted can never go back from true to false, so there is no need to
	// wait for the saving goroutine to exit via a stop channel.
	f.isDeleted = true
	if f.saveMetaChan != nil {
		f.letSave()
	}
}

// Move updates the logical path of this cache file.
// NOTE(review): only pathComps is updated here — presumably the Cache moves
// the on-disk files and open handles stay valid; verify against callers.
func (f *CacheFile) Move(newPathComps []string) {
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	f.pathComps = newPathComps
	if f.saveMetaChan != nil {
		f.letSave()
	}
}

// Open opens a handle with the given open flags; a writable handle also
// supports reading when O_RDWR is set.
func (f *CacheFile) Open(flags uint32) *CacheFileHandle {
	logger.Tracef("CacheFile.Open: %v, %#x", f.pathComps, flags)

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	f.refCount++

	h := &CacheFileHandle{
		file:       f,
		remoteLock: &sync.Mutex{},
		revision:   f.info.DataRevision,
	}
	if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) {
		h.readable = true
		h.writeable = true
	} else if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) {
		h.writeable = true
	} else if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) {
		// os.O_RDONLY is 0, so this branch matches whenever neither of the
		// above did — i.e. plain read-only opens.
		h.readable = true
	}

	if f.remoteObj != nil {
		h.remote = newRemoteLoader(f)
	}

	if h.writeable {
		f.writers = append(f.writers, h)
	} else {
		f.readers = append(f.readers, h)
	}
	return h
}

// OpenReadWhenScanning opens a read-only handle used to sync the local file to
// the remote side.
func (f *CacheFile) OpenReadWhenScanning() *CacheFileHandle {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	f.refCount++

	h := &CacheFileHandle{
		file:       f,
		remoteLock: &sync.Mutex{},
		revision:   f.info.DataRevision,
		readable:   true,
	}
	if f.remoteObj != nil {
		h.remote = newRemoteLoader(f)
	}
	f.readers = append(f.readers, h)
	return h
}

// SetModTime updates the modification time and bumps the meta revision.
func (f *CacheFile) SetModTime(modTime time.Time) error {
	logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime)

	f.rwLock.Lock()
	f.info.ModTime = modTime
	f.info.MetaRevision++
	f.rwLock.Unlock()

	f.letSave()
	return nil
}

// Truncate changes the file size, adjusting the downloadable object size and
// the valid-data segments accordingly.
func (f *CacheFile) Truncate(size int64) error {
	logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size)

	// No writes are allowed while the file size is being changed.
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	err := f.dataFile.Truncate(size)
	if err != nil {
		return err
	}

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	// Shrink the amount of data that may be downloaded from the remote side.
	f.info.ObjectSize = math2.Min(f.info.ObjectSize, size)

	// Adjust the valid-data segments of the local cache file.
	if size < f.info.Size {
		f.info.Segments = TruncateRange(f.info.Segments, size)
	} else if size > f.info.Size {
		// The extended region is all zeros and counts as valid local data.
		f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
	}

	if f.info.Size != size {
		f.info.DataRevision++
	}
	f.info.Size = size
	f.letSave()
	return nil
}

// serving is the metadata-saving goroutine: it persists FileInfo when
// signalled via saveMetaChan or every 30 seconds, and exits once stopSaveMeta
// becomes true or the file is deleted. The channel and stop flag are passed as
// parameters so the goroutine keeps valid references even after LevelDown
// clears the struct fields.
func (f *CacheFile) serving(saveMetaChan chan any, stopSaveMeta *bool) {
	ticker := time.NewTicker(time.Second * 30)
	defer ticker.Stop()

	for {
		select {
		case _, ok := <-saveMetaChan:
			if !ok {
				return
			}
		case <-ticker.C:
		}

		f.saveMetaLock.Lock()
		if *stopSaveMeta {
			f.saveMetaLock.Unlock()
			break
		}

		f.rwLock.RLock()
		// If the file has been deleted, stop saving metadata so we do not
		// overwrite a newly created file with the same name.
		if f.isDeleted {
			f.rwLock.RUnlock()
			f.saveMetaLock.Unlock()
			break
		}
		info := f.info.Clone()
		f.rwLock.RUnlock()

		if err := f.saveMeta(info); err != nil {
			logger.Warnf("save meta file of %v: %v", f.pathComps, err)
		}
		// Best-effort flush; errors are intentionally ignored.
		f.metaFile.Sync()
		f.saveMetaLock.Unlock()
	}
}

// saveMeta serializes info and rewrites the meta file from the beginning.
// Callers must hold saveMetaLock.
func (f *CacheFile) saveMeta(info FileInfo) error {
	jsonData, err := serder.ObjectToJSON(info)
	if err != nil {
		return err
	}

	err = f.metaFile.Truncate(0)
	if err != nil {
		return err
	}

	_, err = f.metaFile.Seek(0, io.SeekStart)
	if err != nil {
		return err
	}

	err = io2.WriteAll(f.metaFile, jsonData)
	if err != nil {
		return err
	}
	return nil
}

// letSave signals the saving goroutine without blocking; a send on a nil
// channel is never ready, so this is also safe when no goroutine is running.
func (f *CacheFile) letSave() {
	select {
	case f.saveMetaChan <- nil:
	default:
	}
}

// CacheFileHandle is one opened read/write handle on a CacheFile.
type CacheFileHandle struct {
	file       *CacheFile
	readable   bool
	writeable  bool
	remote     *RemoteLoader
	remoteLock *sync.Mutex
	revision   int // DataRevision of the file at the time this handle was opened
}

// ReadAt reads into buf at offset off, serving from the local cache where
// possible and lazily fetching missing ranges from the remote object. FUSE
// treats a short read as end-of-file, so the loop fills buf completely unless
// EOF or an error occurs.
func (h *CacheFileHandle) ReadAt(buf []byte, off int64) (int, error) {
	log := logger.WithField("F", "CacheFileHandle.ReadAt").
		WithField("Path", h.file.pathComps)
	log.Tracef("buf: %v, off: %v", len(buf), off)

	if !h.readable {
		return 0, fuse.ErrPermission
	}

	totalReadLen := 0
	for totalReadLen < len(buf) {
		curBuf := buf[totalReadLen:]
		curOff := off + int64(totalReadLen)

		h.file.rwLock.RLock()
		if curOff >= h.file.info.Size {
			h.file.rwLock.RUnlock()
			break
		}

		/// 1. Try to read from the local cache file first.
		rngIdx := FirstContainsIndex(h.file.info.Segments, curOff)
		if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff {
			readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff)
			realReadLen, err := h.file.dataFile.ReadAt(curBuf[:readLen], curOff)
			totalReadLen += realReadLen
			h.file.rwLock.RUnlock()
			if err != nil {
				log.Tracef("read from local cache: %v", err)
				return totalReadLen, err
			}
			continue
		}

		// Otherwise download from the remote side; compute the length to load.
		loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff)
		if rngIdx+1 < len(h.file.info.Segments) {
			// Load at most up to the start of the next segment.
			loadLen = math2.Min(loadLen, h.file.info.Segments[rngIdx+1].Position-curOff)
		}
		h.file.rwLock.RUnlock()

		/// 2. Download the data from the remote side.
		if h.remote == nil {
			log.Warnf("no remote file")
			return totalReadLen, fmt.Errorf("no remote file")
		}

		// RemoteLoader.Load does not lock internally, so lock here to avoid
		// problems from concurrent Seeks. Locking inside RemoteLoader instead
		// would allow sharing one loader across writers.
		h.remoteLock.Lock()
		realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff)
		totalReadLen += realLoadLen
		if err != nil {
			h.remoteLock.Unlock()
			return totalReadLen, err
		}
		h.remoteLock.Unlock()
		log.Tracef("load from remote: %v", realLoadLen)

		/// 3. Data loaded; write it back to the local file.
		// Stop other writers first to avoid conflicts.
		h.file.writeLock.Lock()

		// With writers stopped, compute the part that actually needs writing
		// (another writer may have filled part of the range in the meantime).
		h.file.rwLock.RLock()
		loadRng := &Range{Position: curOff, Length: int64(realLoadLen)}
		DifferentRange(loadRng, h.file.info.Segments)
		h.file.rwLock.RUnlock()
		if loadRng.Length == 0 {
			h.file.writeLock.Unlock()
			continue
		}

		// Write back to the local cache file. The slice starting at writeStart
		// holds the data for file position loadRng.Position, so that is the
		// offset to write at (writing at curOff would misplace the data when
		// DifferentRange trimmed the front of the range).
		writeStart := loadRng.Position - curOff
		_, err = h.file.dataFile.WriteAt(curBuf[writeStart:writeStart+loadRng.Length], loadRng.Position)
		if err != nil {
			h.file.writeLock.Unlock()
			log.Warnf("save to local file: %v", err)
			return totalReadLen, err
		}
		log.Tracef("save to local: %v", loadRng.Length)
		h.file.writeLock.Unlock()

		// Commit the range to the segment list.
		h.file.rwLock.Lock()
		h.file.info.Segments = AddRange(h.file.info.Segments, loadRng)
		h.file.rwLock.Unlock()
		h.file.letSave()
	}

	return totalReadLen, nil
}

// WriteAt writes buf at offset off to the local cache file and records the
// written range as valid data. Multiple writers may run in parallel, but not
// while a remote load is writing back data (writeLock).
func (h *CacheFileHandle) WriteAt(buf []byte, off int64) (int, error) {
	log := logger.WithField("F", "CacheFileHandle.WriteAt").WithField("Path", h.file.pathComps)
	log.Tracef("buf: %v, off: %v", len(buf), off)

	if !h.writeable {
		return 0, fuse.ErrPermission
	}

	h.file.writeLock.RLock()
	defer h.file.writeLock.RUnlock()

	writeLen, err := h.file.dataFile.WriteAt(buf, off)
	if err != nil {
		log.Tracef("save to local file: %v", err)
		return writeLen, err
	}

	// Commit the range to the segment list.
	h.file.rwLock.Lock()
	defer h.file.rwLock.Unlock()
	h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)})
	h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen))
	h.file.info.DataRevision++
	h.file.letSave()
	return writeLen, nil
}

// Sync flushes the local data file to disk.
func (h *CacheFileHandle) Sync() error {
	log := logger.WithField("Path", h.file.pathComps)

	err := h.file.dataFile.Sync()
	if err != nil {
		log.Tracef("sync local file: %v", err)
		return err
	}
	return nil
}

// Close flushes the handle, closes the remote loader, drops the reference
// count and unregisters the handle from the file's reader/writer lists.
func (h *CacheFileHandle) Close() error {
	// Best-effort flush; Sync logs its own errors.
	h.Sync()

	if h.remote != nil {
		h.remote.Close()
	}

	h.file.rwLock.Lock()
	defer h.file.rwLock.Unlock()

	h.file.refCount--
	if h.file.refCount == 0 {
		h.file.freeTime = time.Now()
	}

	if h.writeable {
		h.file.writers = lo2.Remove(h.file.writers, h)
	} else if h.readable {
		h.file.readers = lo2.Remove(h.file.readers, h)
	}
	return nil
}