package cache

import (
	"errors"
	"io"
	"os"
	"path/filepath"
	"sync"
	"syscall"
	"time"

	"github.com/inhies/go-bytesize"
	"github.com/samber/lo"
	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"gitlink.org.cn/cloudream/common/pkgs/trie"
	"gitlink.org.cn/cloudream/common/utils/io2"
	"gitlink.org.cn/cloudream/common/utils/lo2"
	"gitlink.org.cn/cloudream/common/utils/sort2"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/fuse"
	"gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader"
	clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
)

type CacheEntry interface {
	fuse.FsEntry
	// Path within the virtual file system, i.e. the path without the cache directory prefix.
	PathComps() []string
}

type CacheEntryInfo struct {
	PathComps []string
	Size      int64
	Perm      os.FileMode
	ModTime   time.Time
	IsDir     bool
	// Metadata revision number.
	MetaRevision int
	// File data revision number.
	DataRevision int
	// Reference count.
	RefCount int
	// Time the reference count last dropped to zero, i.e. the last time the entry was used.
	FreeTime time.Time
	// Cache level.
	Level CacheLevel
	// Time the cache level last changed.
	ChangeLevelTime time.Time
}

type Cache struct {
	cfg         *config.Config
	db          *db.DB
	uploader    *uploader.Uploader
	downloader  *downloader.Downloader
	lock        *sync.RWMutex
	cacheDone   chan any
	doFullScan  chan any
	activeCache *trie.Trie[*CacheFile]
}

func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
	return &Cache{
		cfg:         cfg,
		db:          db,
		uploader:    uploader,
		downloader:  downloader,
		lock:        &sync.RWMutex{},
		cacheDone:   make(chan any, 1),
		doFullScan:  make(chan any, 1),
		activeCache: trie.NewTrie[*CacheFile](),
	}
}

func (c *Cache) Start() {
	go c.scanningCache()
	go c.scanningData()
}

func (c *Cache) Stop() {
	close(c.cacheDone)
}
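// Illustrative lifecycle sketch (not part of the original file; the wiring
// shown is an assumption): the cache is constructed with its dependencies,
// Start launches the two background scanner goroutines, and Stop shuts them
// down by closing cacheDone.
func cacheLifecycleSketch(cfg *config.Config, database *db.DB, up *uploader.Uploader, down *downloader.Downloader) {
	c := NewCache(cfg, database, up, down)
	c.Start() // runs scanningCache and scanningData in the background
	defer c.Stop()

	// ... serve file system requests through c ...
}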
func (c *Cache) GetCacheDataPath(comps ...string) string {
	comps2 := make([]string, len(comps)+1)
	comps2[0] = c.cfg.DataDir
	copy(comps2[1:], comps)
	return filepath.Join(comps2...)
}

func (c *Cache) GetCacheMetaPath(comps ...string) string {
	comps2 := make([]string, len(comps)+1)
	comps2[0] = c.cfg.MetaDir
	copy(comps2[1:], comps)
	return filepath.Join(comps2...)
}

func (c *Cache) Dump() CacheStatus {
	c.lock.RLock()
	defer c.lock.RUnlock()

	var activeFiles []CacheFileStatus
	c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
		if node.Value == nil {
			return trie.VisitContinue
		}

		info := node.Value.Info()
		activeFiles = append(activeFiles, CacheFileStatus{
			Path:        filepath.Join(path...),
			RefCount:    info.RefCount,
			Level:       info.Level.String(),
			IsUploading: node.Value.state.uploading != nil,
		})
		return trie.VisitContinue
	})

	return CacheStatus{
		ActiveFiles: activeFiles,
	}
}

func (c *Cache) ReclaimSpace() {
	select {
	case c.doFullScan <- nil:
	default:
	}
}

// Stat returns information about the cache entry at the given path, or nil if the path does not exist.
func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
	c.lock.RLock()
	defer c.lock.RUnlock()

	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok && node.Value != nil {
		info := node.Value.Info()
		return &info
	}

	dataPath := c.GetCacheDataPath(pathComps...)
	stat, err := os.Stat(dataPath)
	if err != nil {
		// TODO: log the error
		return nil
	}

	if stat.IsDir() {
		info, err := loadCacheDirInfo(c, pathComps, stat)
		if err != nil {
			return nil
		}
		return info
	}

	info, err := loadCacheFileInfo(c, pathComps, stat)
	if err != nil {
		return nil
	}
	return info
}

// CreateFile creates a cache file, overwriting any existing file at the same path.
// Returns nil if an error occurs during loading or if the target path is a directory.
//
// Remember to call Release to decrease the reference count.
func (c *Cache) CreateFile(pathComps []string) *CacheFile {
	c.lock.Lock()
	defer c.lock.Unlock()

	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok && node.Value != nil {
		node.Value.Delete()
		if node.Value.state.uploading != nil {
			node.Value.state.uploading.isDeleted = true
		}
	}

	ch, err := createNewCacheFile(c, pathComps)
	if err != nil {
		logger.Warnf("create new cache file %v: %v", pathComps, err)
		return nil
	}

	ch.IncRef()
	c.activeCache.CreateWords(pathComps).Value = ch
	logger.Debugf("create new cache file %v", pathComps)
	return ch
}

// LoadFile tries to load a cache file. If the file does not exist, a new cache file
// is created from obj's information; if obj is nil, it returns nil.
//
// Remember to call Release to decrease the reference count.
func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile {
	c.lock.Lock()
	defer c.lock.Unlock()

	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok && node.Value != nil {
		node.Value.IncRef()
		return node.Value
	}

	ch, err := loadCacheFile(c, pathComps)
	if err == nil {
		ch.remoteObj = obj
		ch.IncRef()
		c.activeCache.CreateWords(pathComps).Value = ch
		logger.Debugf("load cache %v", pathComps)
		return ch
	}

	if !os.IsNotExist(err) {
		logger.Warnf("load cache %v: %v", pathComps, err)
		return nil
	}

	if obj == nil {
		return nil
	}

	ch, err = newCacheFileFromObject(c, pathComps, obj)
	if err != nil {
		logger.Warnf("create cache %v from object: %v", pathComps, err)
		return nil
	}

	ch.IncRef()
	c.activeCache.CreateWords(pathComps).Value = ch
	logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID)
	return ch
}

// LoadReadOnlyFile loads only the file's metadata. Returns nil if the file does not exist.
//
// Remember to call Release to decrease the reference count.
func (c *Cache) LoadReadOnlyFile(pathComps []string) *CacheFile {
	c.lock.Lock()
	defer c.lock.Unlock()

	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok && node.Value != nil {
		node.Value.IncRef()
		return node.Value
	}

	ch, err := loadReadOnlyCacheFile(c, pathComps)
	if err == nil {
		ch.IncRef()
		c.activeCache.CreateWords(pathComps).Value = ch
		logger.Debugf("load cache %v", pathComps)
		return ch
	}

	if !os.IsNotExist(err) {
		logger.Warnf("load cache %v: %v", pathComps, err)
		return nil
	}

	return nil
}

// CreateDir creates a cache directory. If the directory already exists, its attributes
// are reset. Returns nil if an error occurs during loading or if the target path is a file.
func (c *Cache) CreateDir(pathComps []string) *CacheDir {
	c.lock.Lock()
	defer c.lock.Unlock()

	ch, err := createNewCacheDir(c, pathComps)
	if err != nil {
		logger.Warnf("create cache dir: %v", err)
		return nil
	}
	return ch
}

type CreateDirOption struct {
	ModTime time.Time
}

// LoadDir loads the given cache directory. If the directory does not exist, it is
// created using the createOpt options; if createOpt is nil, it returns nil.
func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir {
	c.lock.Lock()
	defer c.lock.Unlock()

	ch, err := loadCacheDir(c, pathComps)
	if err == nil {
		return ch
	}

	if !os.IsNotExist(err) {
		// TODO: log the error
		return nil
	}

	if createOpt == nil {
		return nil
	}

	// Create the directory.
	ch, err = makeCacheDirFromOption(c, pathComps, *createOpt)
	if err != nil {
		// TODO: log the error
		return nil
	}
	return ch
}
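// Illustrative sketch (not part of the original file; the caller shown is an
// assumption): CreateFile/LoadFile/LoadReadOnlyFile return entries whose
// reference count has already been incremented, so each successful call must
// be paired with Release, as the doc comments above require.
func withCachedFileSketch(c *Cache, pathComps []string, obj *clitypes.Object) bool {
	ch := c.LoadFile(pathComps, obj)
	if ch == nil {
		return false // not cached locally, and obj was nil or failed to load
	}
	defer ch.Release() // drop the reference taken by LoadFile

	// ... read or write through ch while the reference is held ...
	return true
}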
// StatMany loads information about all cache entries under the given path.
func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
	c.lock.RLock()
	defer c.lock.RUnlock()

	var infos []CacheEntryInfo
	exists := make(map[string]bool)

	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok {
		for name, child := range node.WordNexts {
			if child.Value != nil {
				infos = append(infos, child.Value.Info())
				exists[name] = true
			}
		}
	}

	osEns, err := os.ReadDir(c.GetCacheDataPath(pathComps...))
	if err != nil {
		return nil
	}

	for _, e := range osEns {
		if exists[e.Name()] {
			continue
		}

		info, err := e.Info()
		if err != nil {
			continue
		}

		if e.IsDir() {
			info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
			if err != nil {
				continue
			}
			infos = append(infos, *info)
		} else {
			info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
			if err != nil {
				continue
			}
			infos = append(infos, *info)
		}
	}

	return infos
}

// Remove deletes the cache file or directory at the given path. Removing a
// non-empty directory is an error.
func (c *Cache) Remove(pathComps []string) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok {
		if len(node.WordNexts) > 0 {
			return fuse.ErrNotEmpty
		}

		if node.Value != nil {
			node.Value.Delete()
			if node.Value.state.uploading != nil {
				node.Value.state.uploading.isDeleted = true
			}
		}
		node.RemoveSelf(true)
		logger.Debugf("active cache %v removed", pathComps)
		return nil
	}

	metaPath := c.GetCacheMetaPath(pathComps...)
	dataPath := c.GetCacheDataPath(pathComps...)

	os.Remove(metaPath)
	err := os.Remove(dataPath)
	if err == nil || os.IsNotExist(err) {
		logger.Debugf("local cache %v removed", pathComps)
		return nil
	}

	if errors.Is(err, syscall.ENOTEMPTY) {
		return fuse.ErrNotEmpty
	}
	return err
}
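// Illustrative sketch (not part of the original file; the caller is an
// assumption): StatMany merges in-memory active entries with on-disk ones,
// so a directory listing can be derived from the last component of each
// entry's PathComps.
func listDirNamesSketch(c *Cache, dirComps []string) []string {
	var names []string
	for _, info := range c.StatMany(dirComps) {
		names = append(names, info.PathComps[len(info.PathComps)-1])
	}
	return names
}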
// Move moves the cache file or directory at pathComps to newPathComps.
// fuse.ErrExists is returned if the destination path already exists, and
// fuse.ErrNotExists if the source does not exist.
func (c *Cache) Move(pathComps []string, newPathComps []string) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	_, ok := c.activeCache.WalkEnd(newPathComps)
	if ok {
		return fuse.ErrExists
	}

	newMetaPath := c.GetCacheMetaPath(newPathComps...)
	newDataPath := c.GetCacheDataPath(newPathComps...)

	_, err := os.Stat(newDataPath)
	if err == nil {
		return fuse.ErrExists
	}
	if !os.IsNotExist(err) {
		return err
	}

	oldMetaPath := c.GetCacheMetaPath(pathComps...)
	oldDataPath := c.GetCacheDataPath(pathComps...)

	// Make sure the source exists before doing anything else.
	_, err = os.Stat(oldDataPath)
	if err != nil {
		if os.IsNotExist(err) {
			return fuse.ErrNotExists
		}
		return err
	}

	// Creating the parent directories handles the case where the file being moved is
	// not present locally. The downside is that if the destination's parent directory
	// really does not exist, it gets created here unexpectedly.
	newMetaDir := filepath.Dir(newMetaPath)
	err = os.MkdirAll(newMetaDir, 0755)
	if err != nil {
		return err
	}

	newDataDir := filepath.Dir(newDataPath)
	err = os.MkdirAll(newDataDir, 0755)
	if err != nil {
		return err
	}

	// Each cache file holds open handles to its meta and data files, so renaming the
	// files here does not invalidate those handles. Errors here can only be ignored.
	os.Rename(oldMetaPath, newMetaPath)
	os.Rename(oldDataPath, newDataPath)

	// Update the in-memory cache.
	oldNode, ok := c.activeCache.WalkEnd(pathComps)
	if ok {
		newNode := c.activeCache.CreateWords(newPathComps)
		newNode.Value = oldNode.Value
		newNode.WordNexts = oldNode.WordNexts
		oldNode.RemoveSelf(false)

		if newNode.Value != nil {
			newNode.Value.Move(newPathComps)
		}
		newNode.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
			if node.Value != nil {
				node.Value.Move(lo2.AppendNew(newPathComps, path...))
			}
			return trie.VisitContinue
		})
	}

	logger.Debugf("cache moved: %v -> %v", pathComps, newPathComps)
	return nil
}

type syncPackage struct {
	bktName string
	pkgName string
	pkg     clitypes.Package
	upObjs  []*uploadingObject
}

type uploadingObject struct {
	pathComps    []string
	cache        *CacheFile
	reader       *CacheFileHandle
	modTime      time.Time
	metaRevision int
	isDeleted    bool
	isSuccess    bool
}

type packageFullName struct {
	bktName string
	pkgName string
}

func (c *Cache) scanningCache() {
	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()

	lastScanPath := []string{}
	nextFullScan := false
	for {
		select {
		case _, ok := <-c.cacheDone:
			if !ok {
				return
			}
		case <-ticker.C:
		case <-c.doFullScan:
			nextFullScan = true
		}

		// After each complete round of fast incremental scanning, run one immediate full scan.
		if nextFullScan {
			c.fullScan()
			nextFullScan = false
			continue
		}

		lastScanPath = c.fastScan(lastScanPath)
		if len(lastScanPath) == 0 {
			nextFullScan = true
		}
	}
}

// fullScan mainly checks whether the total cache size exceeds the limit.
func (c *Cache) fullScan() {
	log := logger.WithField("Mod", "Mount")
	startTime := time.Now()
	log.Debug("begin full scan")
	defer func() {
		log.Debugf("full scan done, time: %v", time.Since(startTime))
	}()

	c.lock.Lock()
	defer c.lock.Unlock()

	totalCacheSize := int64(0)

	type readOnlyCache struct {
		Info CacheEntryInfo
		Node *trie.Node[*CacheFile]
	}
	var readOnlyCaches []readOnlyCache

	c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
		ch := node.Value
		if ch == nil {
			return trie.VisitContinue
		}

		info := ch.Info()
		if info.Level > LevelReadOnly {
			return trie.VisitContinue
		}
		if info.DataRevision > 0 || info.MetaRevision > 0 {
			return trie.VisitContinue
		}

		readOnlyCaches = append(readOnlyCaches, readOnlyCache{
			Info: info,
			Node: node,
		})
		totalCacheSize += info.Size
		return trie.VisitContinue
	})

	// If the total size of cache files exceeds the limit, delete entries starting
	// from the least recently used.
	if c.cfg.MaxCacheSize > 0 {
		needReclaim := totalCacheSize - c.cfg.MaxCacheSize
		if needReclaim > 0 {
			readOnlyCaches = sort2.Sort(readOnlyCaches, func(left, right readOnlyCache) int {
				return left.Info.FreeTime.Compare(right.Info.FreeTime)
			})

			reclaimed := int64(0)
			rmCnt := 0
			for _, rc := range readOnlyCaches {
				rc.Node.Value.Delete()
				rc.Node.RemoveSelf(true)
				needReclaim -= rc.Info.Size
				reclaimed += rc.Info.Size
				rmCnt += 1
				if needReclaim <= 0 {
					break
				}
			}
			log.Infof("%v cache file removed, reclaimed %v bytes, total cache size: %v", rmCnt, reclaimed, totalCacheSize-reclaimed)
		}
	}

	// TODO: other checks could be added here, e.g. the number of open file handles.
}
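// Worked example of the reclamation arithmetic above (numbers are assumptions):
// with MaxCacheSize = 10 GiB and totalCacheSize = 12 GiB, needReclaim starts at
// 2 GiB. Eviction candidates are sorted by FreeTime ascending (least recently
// freed first) and deleted until the deficit is covered, so the loop
// over-reclaims by at most one file.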
// fastScan scans only a portion of the nodes on each call, but does somewhat
// heavier work per node.
func (c *Cache) fastScan(lastScanPath []string) []string {
	c.lock.Lock()

	uploadingPkgs := make(map[packageFullName]*syncPackage)
	visitCnt := 0
	visitBreak := false

	node, _ := c.activeCache.WalkEnd(lastScanPath)
	node.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
		ch := node.Value
		if ch == nil {
			return trie.VisitContinue
		}

		info := ch.Info()
		if info.RefCount > 0 {
			// logger.Debugf("skip cache %v, refCount: %v", path, info.RefCount)
			return trie.VisitContinue
		}

		visitCnt++
		c.visitNode(path, node, ch, info, uploadingPkgs)

		// Visit at most 500 nodes per call to avoid holding the lock for too long.
		if visitCnt > 500 {
			lastScanPath = lo2.ArrayClone(path)
			visitBreak = true
			return trie.VisitBreak
		}
		return trie.VisitContinue
	})

	if !visitBreak {
		lastScanPath = []string{}
	}
	c.lock.Unlock()

	if len(uploadingPkgs) > 0 {
		go c.doSync(lo.Values(uploadingPkgs))
	}

	return lastScanPath
}
func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheFile, info CacheEntryInfo, uploadingPkgs map[packageFullName]*syncPackage) {
	shouldUpload := true
	// Files not stored inside a Package do not need to be uploaded.
	if len(ch.pathComps) <= 2 {
		shouldUpload = false
	}

	// 1. The local cache has been modified: upload it if it has not been used for a while.
	if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) {
		if time.Since(info.FreeTime) < c.cfg.UploadPendingTime {
			return
		}
		if ch.state.uploading != nil {
			return
		}

		// Uploading a file requires the fully-loaded cache level.
		if !ch.LevelUp(LevelComplete) {
			return
		}

		fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]}
		pkg, ok := uploadingPkgs[fullName]
		if !ok {
			pkg = &syncPackage{
				bktName: ch.pathComps[0],
				pkgName: ch.pathComps[1],
			}
			uploadingPkgs[fullName] = pkg
		}

		up := &uploadingObject{
			pathComps: lo2.ArrayClone(ch.pathComps),
			cache:     ch,
		}
		pkg.upObjs = append(pkg.upObjs, up)
		ch.state.uploading = up

		if info.DataRevision > 0 {
			up.reader = ch.OpenReadWhenScanning()
		}
		if info.MetaRevision > 0 {
			up.modTime = info.ModTime
			up.metaRevision = info.MetaRevision
		}
		return
	}

	// 2. The local cache has not been modified: unload it if it has not been used for a while.
	if info.Level > LevelReadOnly {
		if time.Since(info.FreeTime) > c.cfg.CacheActiveTime {
			ch.LevelDown(LevelReadOnly)
		}
		return
	}

	// 3. An unloaded cache entry is deleted if it has not been used for a while.
	if info.Level <= LevelReadOnly {
		// Both the time since last use and the time since the last level change must
		// exceed the configured expiry before the entry can be deleted.
		if time.Since(info.FreeTime) > c.cfg.CacheExpireTime && time.Since(info.ChangeLevelTime) > c.cfg.CacheExpireTime {
			// If the file has already been synced to the remote, the local cache can be
			// deleted directly.
			if info.MetaRevision == 0 && info.DataRevision == 0 {
				ch.Delete()
			}
			// If the file's data or metadata was modified but the cache level is at or
			// below ReadOnly, the earlier upload check decided it did not need uploading.
			// For such files it is enough to drop the cache record (it will be loaded
			// again when the data directory is scanned).
			node.RemoveSelf(true)
		}
		return
	}
}

// scanningData scans the file data directory.
func (c *Cache) scanningData() {
	ticker := time.NewTicker(c.cfg.ScanDataDirInterval)
	defer ticker.Stop()

	var walkTrace []*os.File
	var walkTraceComps []string

	for {
		select {
		case <-ticker.C:
		case <-c.cacheDone:
			return
		}

		startTime := time.Now()
		logger.Infof("begin scanning data dir")

		if len(walkTrace) == 0 {
			dir, err := os.Open(c.cfg.DataDir)
			if err != nil {
				logger.Warnf("open data dir: %v", err)
				continue
			}
			walkTrace = []*os.File{dir}
			walkTraceComps = []string{c.cfg.MetaDir}
		}

		const maxVisitCnt = 5000
		const maxUntrackedFiles = 500
		var untrackedFiles [][]string
		visitCnt := 0

		// Visit at most 5000 files per round (including directories along the path),
		// and pick up at most 500 untracked files per round.
		for len(walkTrace) > 0 && visitCnt < maxVisitCnt && len(untrackedFiles) < maxUntrackedFiles {
			lastNode := walkTrace[len(walkTrace)-1]
			visitCnt++

			e, err := lastNode.Readdir(1)
			if err == io.EOF {
				lastNode.Close()
				walkTrace = walkTrace[:len(walkTrace)-1]
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}
			if err != nil {
				logger.Warnf("read dir %v: %v", lastNode.Name(), err)
				lastNode.Close()
				walkTrace = walkTrace[:len(walkTrace)-1]
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}

			// According to Readdir's documentation, len(e) == 0 with err == nil should
			// not happen, but it has been observed in practice.
			if len(e) == 0 {
				continue
			}

			if e[0].IsDir() {
				child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name()))
				if err != nil {
					logger.Warnf("open dir %v: %v", e[0].Name(), err)
					continue
				}
				walkTrace = append(walkTrace, child)
				walkTraceComps = append(walkTraceComps, e[0].Name())
				continue
			}

			// Files not at the Package level are not tracked.
			if len(walkTrace) <= 2 {
				continue
			}

			// Loading the cache unconditionally may cause files that do not need to be
			// synced to the cloud to be loaded again after their cache level dropped to
			// the minimum and tracking was cancelled. Since the scan frequency is low,
			// this is acceptable. Note that only walkTraceComps[1:] is ever consumed;
			// the first element is a placeholder.
			walkTraceComps = append(walkTraceComps, e[0].Name())
			untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:]))
			walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
		}

		if len(untrackedFiles) > 0 {
			for _, comps := range untrackedFiles {
				ch := c.LoadReadOnlyFile(comps)
				if ch != nil {
					ch.Release()
				}
			}
		}

		logger.Infof("%v file visited, %v untracked files found, time: %v", visitCnt, len(untrackedFiles), time.Since(startTime))
	}
}

func (c *Cache) doSync(pkgs []*syncPackage) {
	var uploadPkgs []*syncPackage
	var updateOnlyPkgs []*syncPackage

	for _, p := range pkgs {
		var updateOnly *syncPackage
		var upload *syncPackage

		for _, o := range p.upObjs {
			if o.reader != nil {
				if upload == nil {
					upload = &syncPackage{
						bktName: p.bktName,
						pkgName: p.pkgName,
					}
				}
				upload.upObjs = append(upload.upObjs, o)
			} else {
				if updateOnly == nil {
					updateOnly = &syncPackage{
						bktName: p.bktName,
						pkgName: p.pkgName,
					}
				}
				updateOnly.upObjs = append(updateOnly.upObjs, o)
			}
		}

		if upload != nil {
			uploadPkgs = append(uploadPkgs, upload)
		}
		if updateOnly != nil {
			updateOnlyPkgs = append(updateOnlyPkgs, updateOnly)
		}
	}

	// Upload file data first, then update file metadata. Uploading creates the Package,
	// so the later metadata update can find it.
	if len(uploadPkgs) > 0 {
		c.doUploading(uploadPkgs)
	}
	if len(updateOnlyPkgs) > 0 {
		c.doUpdatingOnly(updateOnlyPkgs)
	}
}
func (c *Cache) doUpdatingOnly(pkgs []*syncPackage) {
	/// 1. Since only metadata is being updated, just look up the Package.
	var sucPkgs []*syncPackage
	var failedPkgs []*syncPackage
	for _, pkg := range pkgs {
		p, err := c.db.Package().GetByFullName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
		if err != nil {
			logger.Warnf("get package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
			failedPkgs = append(failedPkgs, pkg)
			continue
		}

		pkg.pkg = p
		sucPkgs = append(sucPkgs, pkg)
	}

	/// 2. For Packages that failed the lookup, clear the uploading state under the lock.
	c.lock.Lock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.cache.state.uploading = nil
		}
	}
	c.lock.Unlock()

	/// 3. Update each Package.
	for _, p := range sucPkgs {
		paths := make([]string, 0, len(p.upObjs))
		modTimes := make([]time.Time, 0, len(p.upObjs))
		for _, obj := range p.upObjs {
			paths = append(paths, clitypes.JoinObjectPath(obj.pathComps[2:]...))
			modTimes = append(modTimes, obj.modTime)
		}

		err := c.db.Object().BatchUpdateUpdateTimeByPath(c.db.DefCtx(), p.pkg.PackageID, paths, modTimes)
		if err != nil {
			logger.Warnf("batch update package %v/%v: %v", p.bktName, p.pkgName, err)
			c.lock.Lock()
			for _, obj := range p.upObjs {
				obj.cache.state.uploading = nil
			}
			c.lock.Unlock()
			continue
		}

		logger.Infof("update %v object in package %v/%v", len(p.upObjs), p.bktName, p.pkgName)

		// Record the upload result.
		c.lock.Lock()
		for _, obj := range p.upObjs {
			obj.cache.state.uploading = nil
			obj.cache.RevisionUploaded(0, obj.metaRevision)
		}
		c.lock.Unlock()
	}
}
func (c *Cache) doUploading(pkgs []*syncPackage) {
	/// 1. Try to create each Package first.
	var sucPkgs []*syncPackage
	var failedPkgs []*syncPackage
	for _, pkg := range pkgs {
		p, err := db.DoTx21(c.db, c.db.Package().TryCreateAll, pkg.bktName, pkg.pkgName)
		if err != nil {
			logger.Warnf("try create package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
			failedPkgs = append(failedPkgs, pkg)
			continue
		}

		pkg.pkg = p
		sucPkgs = append(sucPkgs, pkg)
	}

	/// 2. For Packages that failed to be created, just close the files without uploading.
	// Clear the uploading state under the lock.
	c.lock.Lock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.cache.state.uploading = nil
		}
	}
	c.lock.Unlock()

	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.reader.Close()
		}
	}

	/// 3. Upload each Package.
	for _, p := range sucPkgs {
		upder, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil)
		if err != nil {
			logger.Warnf("begin upload package %v/%v: %v", p.bktName, p.pkgName, err)

			// Clear the uploading state.
			c.lock.Lock()
			for _, obj := range p.upObjs {
				obj.cache.state.uploading = nil
			}
			c.lock.Unlock()

			for _, obj := range p.upObjs {
				obj.reader.Close()
			}
			continue
		}

		upSuc := 0
		upSucAmt := int64(0)
		upFailed := 0
		upStartTime := time.Now()
		logger.Infof("begin uploading %v objects to package %v/%v", len(p.upObjs), p.bktName, p.pkgName)

		for _, o := range p.upObjs {
			rd := cacheFileReader{
				rw: o.reader,
			}
			counter := io2.Counter(&rd)
			err = upder.Upload(clitypes.PathFromComps(o.pathComps[2:]...), counter, uploader.UploadOption{
				CreateTime: o.modTime,
			})
			if err != nil {
				logger.Warnf("upload object %v: %v", o.pathComps, err)
				upFailed++
				continue
			}

			o.isSuccess = true
			upSuc++
			upSucAmt += counter.Count()
		}

		// Record the upload results under the lock.
		c.lock.Lock()
		upCancel := 0
		upRename := 0
		// Check whether any file changed while it was being uploaded.
		var sucObjs []*uploadingObject
		for _, o := range p.upObjs {
			o.cache.state.uploading = nil
			if !o.isSuccess {
				continue
			}

			oldPath := clitypes.JoinObjectPath(o.pathComps[2:]...)
			newPath := clitypes.JoinObjectPath(o.cache.pathComps[2:]...)

			if o.isDeleted {
				upder.CancelObject(oldPath)
				upCancel++
				continue
			}

			// If the object was moved to another Package, the upload must also be cancelled.
			if !lo2.ArrayEquals(o.pathComps[:2], o.cache.pathComps[:2]) {
				upder.CancelObject(oldPath)
				upCancel++
				continue
			}

			// Only objects that moved within the same Package can simply be renamed.
			if newPath != oldPath {
				upder.RenameObject(oldPath, newPath)
				upRename++
			}

			sucObjs = append(sucObjs, o)
		}

		_, err = upder.Commit()
		if err != nil {
			logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err)
		} else {
			for _, obj := range sucObjs {
				obj.cache.RevisionUploaded(obj.reader.revision, obj.metaRevision)
			}

			upTime := time.Since(upStartTime)
			logger.Infof("upload package %v/%v in %v, upload: %v, size: %v, speed: %v/s, cancel: %v, rename: %v",
				p.bktName, p.pkgName, upTime, upSuc, upSucAmt, bytesize.New(float64(upSucAmt)/upTime.Seconds()), upCancel, upRename)
		}
		c.lock.Unlock()

		// Closing the files affects refCount, so whether the upload failed or succeeded,
		// the next stage of processing only happens after a waiting period.
		for _, obj := range p.upObjs {
			obj.reader.Close()
		}
	}
}

type cacheFileReader struct {
	rw  *CacheFileHandle
	pos int64
}

// Read adapts CacheFileHandle's positional ReadAt to the io.Reader interface.
// A short read with a nil error is treated as end-of-file.
func (r *cacheFileReader) Read(p []byte) (int, error) {
	n, err := r.rw.ReadAt(p, r.pos)
	r.pos += int64(n)
	if err != nil {
		return n, err
	}
	if n != len(p) {
		return n, io.EOF
	}
	return n, nil
}
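// Illustrative helper (not part of the original file): draining a
// cacheFileReader shows that it satisfies io.Reader and can feed stream
// consumers such as io2.Counter above.
func readAllCachedSketch(h *CacheFileHandle) ([]byte, error) {
	return io.ReadAll(&cacheFileReader{rw: h})
}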