You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

cache.go 27 kB

8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
7 months ago
7 months ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122
  1. package cache
  2. import (
  3. "errors"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "syscall"
  9. "time"
  10. "github.com/inhies/go-bytesize"
  11. "github.com/samber/lo"
  12. "gitlink.org.cn/cloudream/common/pkgs/logger"
  13. "gitlink.org.cn/cloudream/common/pkgs/trie"
  14. "gitlink.org.cn/cloudream/common/utils/io2"
  15. "gitlink.org.cn/cloudream/common/utils/lo2"
  16. "gitlink.org.cn/cloudream/common/utils/sort2"
  17. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  18. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
  19. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
  20. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/fuse"
  21. "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader"
  22. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  23. )
const (
	// SyncFilterConfigName is the file name of the sync-filter configuration,
	// looked up in the root of the cache data directory.
	SyncFilterConfigName = ".cds.sync.filter"
)
// CacheEntry is a cache entry (file or directory) exposed through the FUSE
// virtual file system.
type CacheEntry interface {
	fuse.FsEntry
	// PathComps returns the path components of the entry inside the virtual
	// file system, i.e. the path without the cache directory prefix.
	PathComps() []string
}
// CacheEntryInfo is a point-in-time snapshot of a cache entry's metadata.
type CacheEntryInfo struct {
	// Path components inside the virtual file system.
	PathComps []string
	// File size in bytes.
	Size int64
	// Permission bits.
	Perm os.FileMode
	// Last modification time.
	ModTime time.Time
	// Whether the entry is a directory.
	IsDir bool
	// Metadata revision number.
	MetaRevision int
	// File data revision number.
	DataRevision int
	// Reference count.
	RefCount int
	// Time the reference count last dropped to zero, i.e. the last use time.
	FreeTime time.Time
	// Cache level.
	Level CacheLevel
	// Time the cache level last changed.
	ChangeLevelTime time.Time
}
// Cache manages locally cached files and directories and keeps them in sync
// with remote storage via the uploader and downloader.
type Cache struct {
	cfg        *config.Config
	db         *db.DB
	uploader   *uploader.Uploader
	downloader *downloader.Downloader
	// lock guards activeCache and the mutable state of its entries.
	lock *sync.RWMutex
	// cacheDone is closed by Stop to terminate background goroutines.
	cacheDone chan any
	// doFullScan receives non-blocking signals requesting a full scan.
	doFullScan chan any
	// activeCache holds in-memory cache entries keyed by path components.
	activeCache *trie.Trie[*CacheFile]
	// syncFilter decides which files are synchronized to the remote.
	syncFilter *SyncFilter
}
  62. func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
  63. return &Cache{
  64. cfg: cfg,
  65. db: db,
  66. uploader: uploader,
  67. downloader: downloader,
  68. lock: &sync.RWMutex{},
  69. cacheDone: make(chan any, 1),
  70. doFullScan: make(chan any, 1),
  71. activeCache: trie.NewTrie[*CacheFile](),
  72. syncFilter: NewSyncFilter(),
  73. }
  74. }
  75. func (c *Cache) Start() {
  76. c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName))
  77. go c.scanningCache()
  78. go c.scanningData()
  79. }
// Stop terminates the background goroutines started by Start by closing the
// done channel. Must be called at most once: closing an already-closed
// channel panics.
func (c *Cache) Stop() {
	close(c.cacheDone)
}
  83. func (c *Cache) GetCacheDataPath(comps ...string) string {
  84. comps2 := make([]string, len(comps)+1)
  85. comps2[0] = c.cfg.DataDir
  86. copy(comps2[1:], comps)
  87. return filepath.Join(comps2...)
  88. }
  89. func (c *Cache) GetCacheMetaPath(comps ...string) string {
  90. comps2 := make([]string, len(comps)+1)
  91. comps2[0] = c.cfg.MetaDir
  92. copy(comps2[1:], comps)
  93. return filepath.Join(comps2...)
  94. }
  95. func (c *Cache) ReloadSyncFilter() {
  96. c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName))
  97. }
  98. func (c *Cache) Dump() CacheStatus {
  99. c.lock.RLock()
  100. defer c.lock.RUnlock()
  101. var activeFiles []CacheFileStatus
  102. c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
  103. if node.Value == nil {
  104. return trie.VisitContinue
  105. }
  106. info := node.Value.Info()
  107. activeFiles = append(activeFiles, CacheFileStatus{
  108. Path: filepath.Join(path...),
  109. RefCount: info.RefCount,
  110. Level: info.Level.String(),
  111. IsUploading: node.Value.state.uploading != nil,
  112. })
  113. return trie.VisitContinue
  114. })
  115. return CacheStatus{
  116. ActiveFiles: activeFiles,
  117. }
  118. }
// ReclaimSpace requests an immediate full scan, which evicts clean read-only
// cache entries when the total cache size exceeds the configured limit.
// The request is dropped if a scan is already pending (non-blocking send).
func (c *Cache) ReclaimSpace() {
	select {
	case c.doFullScan <- nil:
	default:
	}
}
  125. // 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。
  126. func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
  127. c.lock.RLock()
  128. defer c.lock.RUnlock()
  129. node, ok := c.activeCache.WalkEnd(pathComps)
  130. if ok && node.Value != nil {
  131. info := node.Value.Info()
  132. return &info
  133. }
  134. dataPath := c.GetCacheDataPath(pathComps...)
  135. stat, err := os.Stat(dataPath)
  136. if err != nil {
  137. // TODO 日志记录
  138. return nil
  139. }
  140. if stat.IsDir() {
  141. info, err := loadCacheDirInfo(c, pathComps, stat)
  142. if err != nil {
  143. return nil
  144. }
  145. return info
  146. }
  147. info, err := loadCacheFileInfo(c, pathComps, stat)
  148. if err != nil {
  149. return nil
  150. }
  151. return info
  152. }
// CreateFile creates a cache file at the given path. If a file already exists
// there it is overwritten. Returns nil if creation fails or the target is a
// directory.
//
// Remember to call Release to decrease the reference count.
func (c *Cache) CreateFile(pathComps []string) *CacheFile {
	c.lock.Lock()
	defer c.lock.Unlock()
	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok && node.Value != nil {
		// An active entry already occupies this path: delete it and flag any
		// in-flight upload so its result is discarded on commit.
		node.Value.Delete()
		if node.Value.state.uploading != nil {
			node.Value.state.uploading.isDeleted = true
		}
	}
	ch, err := createNewCacheFile(c, pathComps)
	if err != nil {
		logger.Warnf("create new cache file %v: %v", pathComps, err)
		return nil
	}
	// The returned handle starts with one reference held by the caller.
	ch.IncRef()
	c.activeCache.CreateWords(pathComps).Value = ch
	logger.Debugf("create new cache file %v", pathComps)
	return ch
}
// LoadFile loads the cache file at the given path. If it does not exist, a
// new cache file is created from obj's information; if obj is also nil, nil
// is returned.
//
// Remember to call Release to decrease the reference count.
func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile {
	c.lock.Lock()
	defer c.lock.Unlock()
	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok && node.Value != nil {
		// Already active: just take another reference.
		node.Value.IncRef()
		return node.Value
	}
	ch, err := loadCacheFile(c, pathComps)
	if err == nil {
		ch.remoteObj = obj
		ch.IncRef()
		c.activeCache.CreateWords(pathComps).Value = ch
		logger.Debugf("load cache %v", pathComps)
		return ch
	}
	if !os.IsNotExist(err) {
		// TODO log the error
		logger.Warnf("load cache %v: %v", pathComps, err)
		return nil
	}
	if obj == nil {
		return nil
	}
	// Not cached locally: materialize a cache entry from the remote object.
	ch, err = newCacheFileFromObject(c, pathComps, obj)
	if err != nil {
		logger.Warnf("create cache %v from object: %v", pathComps, err)
		return nil
	}
	ch.IncRef()
	c.activeCache.CreateWords(pathComps).Value = ch
	logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID)
	return ch
}
  213. // 仅加载文件的元数据,如果文件不存在,则返回nil
  214. //
  215. // 记得使用Release减少引用计数
  216. func (c *Cache) LoadReadOnlyFile(pathComps []string) *CacheFile {
  217. c.lock.Lock()
  218. defer c.lock.Unlock()
  219. node, ok := c.activeCache.WalkEnd(pathComps)
  220. if ok && node.Value != nil {
  221. node.Value.IncRef()
  222. return node.Value
  223. }
  224. ch, err := loadReadOnlyCacheFile(c, pathComps)
  225. if err == nil {
  226. ch.IncRef()
  227. c.activeCache.CreateWords(pathComps).Value = ch
  228. logger.Debugf("load cache %v", pathComps)
  229. return ch
  230. }
  231. if !os.IsNotExist(err) {
  232. // TODO 日志记录
  233. logger.Warnf("load cache %v: %v", pathComps, err)
  234. return nil
  235. }
  236. return nil
  237. }
  238. // 创建一个缓存目录。如果目录已经存在,则会重置目录属性。如果加载过程中发生了错误,或者目标位置是一个文件,则会返回nil
  239. func (c *Cache) CreateDir(pathComps []string) *CacheDir {
  240. c.lock.Lock()
  241. defer c.lock.Unlock()
  242. ch, err := createNewCacheDir(c, pathComps)
  243. if err != nil {
  244. logger.Warnf("create cache dir: %v", err)
  245. return nil
  246. }
  247. return ch
  248. }
// CreateDirOption holds the attributes applied when LoadDir has to create a
// missing directory.
type CreateDirOption struct {
	// Modification time to set on the newly created directory.
	ModTime time.Time
}
  252. // 加载指定缓存目录,如果目录不存在,则使用createOpt选项创建目录,而如果createOpt为nil,那么会返回nil。
  253. func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir {
  254. c.lock.Lock()
  255. defer c.lock.Unlock()
  256. ch, err := loadCacheDir(c, pathComps)
  257. if err == nil {
  258. return ch
  259. }
  260. if !os.IsNotExist(err) {
  261. // TODO 日志记录
  262. return nil
  263. }
  264. if createOpt == nil {
  265. return nil
  266. }
  267. // 创建目录
  268. ch, err = makeCacheDirFromOption(c, pathComps, *createOpt)
  269. if err != nil {
  270. // TODO 日志记录
  271. return nil
  272. }
  273. return ch
  274. }
  275. // 加载指定路径下的所有缓存条目信息
  276. func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
  277. c.lock.RLock()
  278. defer c.lock.RUnlock()
  279. var infos []CacheEntryInfo
  280. exists := make(map[string]bool)
  281. node, ok := c.activeCache.WalkEnd(pathComps)
  282. if ok {
  283. for name, child := range node.WordNexts {
  284. if child.Value != nil {
  285. infos = append(infos, child.Value.Info())
  286. exists[name] = true
  287. }
  288. }
  289. }
  290. osEns, err := os.ReadDir(c.GetCacheDataPath(pathComps...))
  291. if err != nil {
  292. return nil
  293. }
  294. for _, e := range osEns {
  295. if exists[e.Name()] {
  296. continue
  297. }
  298. info, err := e.Info()
  299. if err != nil {
  300. continue
  301. }
  302. if e.IsDir() {
  303. info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
  304. if err != nil {
  305. continue
  306. }
  307. infos = append(infos, *info)
  308. } else {
  309. info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
  310. if err != nil {
  311. continue
  312. }
  313. infos = append(infos, *info)
  314. }
  315. }
  316. return infos
  317. }
// Remove deletes the cache file or directory at the given path. Removing a
// non-empty directory returns fuse.ErrNotEmpty.
func (c *Cache) Remove(pathComps []string) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	node, ok := c.activeCache.WalkEnd(pathComps)
	if ok {
		// Children in the trie mean the directory is not empty.
		if len(node.WordNexts) > 0 {
			return fuse.ErrNotEmpty
		}
		if node.Value != nil {
			node.Value.Delete()
			// Flag any in-flight upload so its result is discarded on commit.
			if node.Value.state.uploading != nil {
				node.Value.state.uploading.isDeleted = true
			}
		}
		node.RemoveSelf(true)
		logger.Debugf("active cache %v removed", pathComps)
		return nil
	}
	// Not in the active cache: remove the on-disk meta and data files.
	metaPath := c.GetCacheMetaPath(pathComps...)
	dataPath := c.GetCacheDataPath(pathComps...)
	// Best-effort: the meta file may legitimately not exist.
	os.Remove(metaPath)
	err := os.Remove(dataPath)
	if err == nil || os.IsNotExist(err) {
		logger.Debugf("local cache %v removed", pathComps)
		return nil
	}
	if errors.Is(err, syscall.ENOTEMPTY) {
		return fuse.ErrNotEmpty
	}
	return err
}
// Move moves the cache file or directory at pathComps to newPathComps.
// It returns fuse.ErrExists when the destination already exists and
// fuse.ErrNotExists when the source does not exist.
func (c *Cache) Move(pathComps []string, newPathComps []string) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Refuse to overwrite an active cache entry at the destination.
	_, ok := c.activeCache.WalkEnd(newPathComps)
	if ok {
		return fuse.ErrExists
	}
	newMetaPath := c.GetCacheMetaPath(newPathComps...)
	newDataPath := c.GetCacheDataPath(newPathComps...)
	// Refuse to overwrite an on-disk entry at the destination.
	_, err := os.Stat(newDataPath)
	if err == nil {
		return fuse.ErrExists
	}
	if !os.IsNotExist(err) {
		return err
	}
	oldMetaPath := c.GetCacheMetaPath(pathComps...)
	oldDataPath := c.GetCacheDataPath(pathComps...)
	// Confirm the source exists before doing anything else.
	_, err = os.Stat(oldDataPath)
	if err != nil {
		if os.IsNotExist(err) {
			return fuse.ErrNotExists
		}
		return err
	}
	// Creating the parent directories handles the case where the moved file
	// is not present locally. The downside: if the destination's parent
	// genuinely does not exist, it gets created here unintentionally.
	newMetaDir := filepath.Dir(newMetaPath)
	err = os.MkdirAll(newMetaDir, 0755)
	if err != nil {
		return err
	}
	newDataDir := filepath.Dir(newDataPath)
	err = os.MkdirAll(newDataDir, 0755)
	if err != nil {
		return err
	}
	// Each cache file holds open handles to its meta and data files, so
	// renaming does not invalidate those handles. Errors here can only be
	// ignored.
	os.Rename(oldMetaPath, newMetaPath)
	os.Rename(oldDataPath, newDataPath)
	// Re-home the moved subtree inside the active cache trie.
	oldNode, ok := c.activeCache.WalkEnd(pathComps)
	if ok {
		newNode := c.activeCache.CreateWords(newPathComps)
		newNode.Value = oldNode.Value
		newNode.WordNexts = oldNode.WordNexts
		oldNode.RemoveSelf(false)
		if newNode.Value != nil {
			newNode.Value.Move(newPathComps)
		}
		// Update the recorded path of every entry under the moved node.
		newNode.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
			if node.Value != nil {
				node.Value.Move(lo2.AppendNew(newPathComps, path...))
			}
			return trie.VisitContinue
		})
	}
	logger.Debugf("cache moved: %v -> %v", pathComps, newPathComps)
	return nil
}
// syncPackage groups the objects of one remote Package that are due to be
// synchronized.
type syncPackage struct {
	bktName string
	pkgName string
	// Resolved remote package; filled in by doUploading/doUpdatingOnly.
	pkg    clitypes.Package
	upObjs []*uploadingObject
}
// uploadingObject tracks the state of a single cache file while it is being
// uploaded or having its metadata updated.
type uploadingObject struct {
	// Path components captured when the upload started.
	pathComps []string
	cache     *CacheFile
	// Open reader for the file data; nil when only metadata changed.
	reader  *CacheFileHandle
	modTime time.Time
	// Metadata revision being uploaded; 0 when only data changed.
	metaRevision int
	// Set when the file was deleted while the upload was in flight.
	isDeleted bool
	// Set when the object's data upload succeeded.
	isSuccess bool
}
// packageFullName uniquely identifies a remote Package by bucket name and
// package name.
type packageFullName struct {
	bktName string
	pkgName string
}
// scanningCache is the background loop driving cache maintenance: it
// alternates incremental fast scans of the active cache with full scans that
// enforce the total cache size limit. It exits when Stop closes cacheDone.
func (c *Cache) scanningCache() {
	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()
	// Position where the previous incremental fast scan stopped.
	lastScanPath := []string{}
	nextFullScan := false
	for {
		select {
		case _, ok := <-c.cacheDone:
			if !ok {
				return
			}
		case <-ticker.C:
		case <-c.doFullScan:
			// Explicit request (e.g. ReclaimSpace): full scan next round.
			nextFullScan = true
		}
		// After each completed round of incremental fast scanning, run one
		// immediate full scan.
		if nextFullScan {
			c.fullScan()
			nextFullScan = false
			continue
		}
		lastScanPath = c.fastScan(lastScanPath)
		// An empty resume path means the fast scan finished a full round.
		if len(lastScanPath) == 0 {
			nextFullScan = true
		}
	}
}
// fullScan checks whether the total size of clean, read-only cache files
// exceeds the configured limit, and if so evicts the least recently used
// ones until the total is back under the limit.
func (c *Cache) fullScan() {
	log := logger.WithField("Mod", "Mount")
	startTime := time.Now()
	log.Debug("begin full scan")
	defer func() {
		log.Debugf("full scan done, time: %v", time.Since(startTime))
	}()
	c.lock.Lock()
	defer c.lock.Unlock()
	totalCacheSize := int64(0)
	type readOnlyCache struct {
		Info CacheEntryInfo
		Node *trie.Node[*CacheFile]
	}
	// Collect eviction candidates: entries at or below the read-only level
	// with no unsynchronized data or metadata changes.
	var readOnlyCaches []readOnlyCache
	c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
		ch := node.Value
		if ch == nil {
			return trie.VisitContinue
		}
		info := ch.Info()
		if info.Level > LevelReadOnly {
			return trie.VisitContinue
		}
		if info.DataRevision > 0 || info.MetaRevision > 0 {
			return trie.VisitContinue
		}
		readOnlyCaches = append(readOnlyCaches, readOnlyCache{
			Info: info,
			Node: node,
		})
		totalCacheSize += info.Size
		return trie.VisitContinue
	})
	// If the total cached size exceeds the limit, delete candidates starting
	// from the least recently used (earliest FreeTime).
	if c.cfg.MaxCacheSize > 0 {
		needReclaim := totalCacheSize - c.cfg.MaxCacheSize
		if needReclaim > 0 {
			readOnlyCaches = sort2.Sort(readOnlyCaches, func(left, right readOnlyCache) int {
				return left.Info.FreeTime.Compare(right.Info.FreeTime)
			})
			reclaimed := int64(0)
			rmCnt := 0
			for _, rc := range readOnlyCaches {
				rc.Node.Value.Delete()
				rc.Node.RemoveSelf(true)
				needReclaim -= rc.Info.Size
				reclaimed += rc.Info.Size
				rmCnt += 1
				if needReclaim <= 0 {
					break
				}
			}
			log.Infof("%v cache file removed, reclaimed %v bytes, total cache size: %v", rmCnt, reclaimed, totalCacheSize-reclaimed)
		}
	}
	// TODO other checks could go here, e.g. the number of open file handles
}
// fastScan incrementally scans the active cache starting at lastScanPath,
// visiting at most 500 unreferenced entries while holding the lock. Entries
// that need syncing are grouped per package and handed to doSync on a
// separate goroutine. It returns the path to resume from next time, or an
// empty slice once a full round completes.
func (c *Cache) fastScan(lastScanPath []string) []string {
	c.lock.Lock()
	uploadingPkgs := make(map[packageFullName]*syncPackage)
	visitCnt := 0
	visitBreak := false
	node, _ := c.activeCache.WalkEnd(lastScanPath)
	node.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
		ch := node.Value
		if ch == nil {
			return trie.VisitContinue
		}
		info := ch.Info()
		// Referenced entries are in use and must not be touched.
		if info.RefCount > 0 {
			logger.Debugf("skip cache %v, refCount: %v", path, info.RefCount)
			return trie.VisitContinue
		}
		visitCnt++
		c.visitNode(path, node, ch, info, uploadingPkgs)
		// Visit at most 500 nodes per scan to avoid holding the lock too long.
		if visitCnt > 500 {
			lastScanPath = lo2.ArrayClone(path)
			visitBreak = true
			return trie.VisitBreak
		}
		return trie.VisitContinue
	})
	if !visitBreak {
		// Reached the end: the next round starts over from the root.
		lastScanPath = []string{}
	}
	c.lock.Unlock()
	if len(uploadingPkgs) > 0 {
		go c.doSync(lo.Values(uploadingPkgs))
	}
	return lastScanPath
}
// visitNode applies the maintenance policy to one unreferenced cache entry:
// schedule modified entries for upload, downgrade idle entries to read-only,
// and eventually delete long-unused read-only entries. Called by fastScan
// with the cache lock held.
func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheFile, info CacheEntryInfo, uploadingPkgs map[packageFullName]*syncPackage) {
	shouldUpload := true
	// Files not stored inside a Package (fewer than bucket/package levels)
	// are never uploaded.
	if len(ch.pathComps) <= 2 {
		shouldUpload = false
	}
	if !c.syncFilter.ShouldSync(ch.pathComps, info.Size) {
		shouldUpload = false
	}
	// 1. The local cache was modified: upload it once it has been idle for a
	// while.
	if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) {
		if time.Since(info.FreeTime) < c.cfg.UploadPendingTime {
			return
		}
		if ch.state.uploading != nil {
			return
		}
		// Uploading requires the fully-loaded cache level.
		if !ch.LevelUp(LevelComplete) {
			return
		}
		// Group objects by their owning package (components 0 and 1 are the
		// bucket and package names).
		fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]}
		pkg, ok := uploadingPkgs[fullName]
		if !ok {
			pkg = &syncPackage{
				bktName: ch.pathComps[0],
				pkgName: ch.pathComps[1],
			}
			uploadingPkgs[fullName] = pkg
		}
		up := &uploadingObject{
			pathComps: lo2.ArrayClone(ch.pathComps),
			cache:     ch,
		}
		pkg.upObjs = append(pkg.upObjs, up)
		ch.state.uploading = up
		if info.DataRevision > 0 {
			up.reader = ch.OpenReadWhenScanning()
		}
		if info.MetaRevision > 0 {
			up.modTime = info.ModTime
			up.metaRevision = info.MetaRevision
		}
		return
	}
	// 2. The local cache is unmodified: unload it once it has been idle for a
	// while.
	if info.Level > LevelReadOnly {
		if time.Since(info.FreeTime) > c.cfg.CacheActiveTime {
			ch.LevelDown(LevelReadOnly)
		}
		return
	}
	// 3. An unloaded cache entry is deleted once it has been idle for a while.
	if info.Level <= LevelReadOnly {
		// Both the time since last use and the time since the last level
		// change must exceed the configured expiry before deletion.
		if time.Since(info.FreeTime) > c.cfg.CacheExpireTime && time.Since(info.ChangeLevelTime) > c.cfg.CacheExpireTime {
			// Fully synchronized to the remote: the local copy can go.
			if info.MetaRevision == 0 && info.DataRevision == 0 {
				ch.Delete()
			}
			// Data or metadata changed but the level is at/below ReadOnly,
			// meaning the earlier upload check decided not to upload it.
			// Just drop the cache record (the data-dir scan will pick the
			// file up again later).
			node.RemoveSelf(true)
		}
		return
	}
}
  623. // 扫描文件数据目录
  624. func (c *Cache) scanningData() {
  625. ticker := time.NewTicker(c.cfg.ScanDataDirInterval)
  626. defer ticker.Stop()
  627. var walkTrace []*os.File
  628. var walkTraceComps []string
  629. for {
  630. select {
  631. case <-ticker.C:
  632. case <-c.cacheDone:
  633. return
  634. }
  635. startTime := time.Now()
  636. logger.Infof("begin scanning data dir")
  637. if len(walkTrace) == 0 {
  638. dir, err := os.Open(c.cfg.DataDir)
  639. if err != nil {
  640. logger.Warnf("open data dir: %v", err)
  641. continue
  642. }
  643. walkTrace = []*os.File{dir}
  644. walkTraceComps = []string{c.cfg.MetaDir}
  645. }
  646. const maxVisitCnt = 5000
  647. const maxUntrackedFiles = 500
  648. var untrackedFiles [][]string
  649. visitCnt := 0
  650. // 一次最多遍历5000个文件(包括路径上的文件夹),一次最多添加500个未跟踪文件
  651. for len(walkTrace) > 0 && visitCnt < maxVisitCnt && len(untrackedFiles) < maxUntrackedFiles {
  652. lastNode := walkTrace[len(walkTrace)-1]
  653. visitCnt++
  654. e, err := lastNode.Readdir(1)
  655. if err == io.EOF {
  656. lastNode.Close()
  657. walkTrace = walkTrace[:len(walkTrace)-1]
  658. walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
  659. continue
  660. }
  661. if err != nil {
  662. logger.Warnf("read dir %v: %v", lastNode.Name(), err)
  663. lastNode.Close()
  664. walkTrace = walkTrace[:len(walkTrace)-1]
  665. walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
  666. continue
  667. }
  668. if e[0].IsDir() {
  669. child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name()))
  670. if err != nil {
  671. logger.Warnf("open dir %v: %v", e[0].Name(), err)
  672. continue
  673. }
  674. walkTrace = append(walkTrace, child)
  675. walkTraceComps = append(walkTraceComps, e[0].Name())
  676. continue
  677. }
  678. // 对于不在Package层级的文件,不跟踪
  679. if len(walkTrace) <= 2 {
  680. continue
  681. }
  682. // 无条件加载缓存,可能会导致一些不需要被同步到云端的文件在缓存等级降到最低取消跟踪后,又重新被加载进来
  683. // 不过由于扫描频率不高,所以问题不大
  684. walkTraceComps = append(walkTraceComps, e[0].Name())
  685. // 第一个元素是data目录,所以要从第二个元素开始
  686. if !c.syncFilter.ShouldSync(walkTraceComps[1:], e[0].Size()) {
  687. walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
  688. continue
  689. }
  690. untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:]))
  691. walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
  692. }
  693. if len(untrackedFiles) > 0 {
  694. for _, comps := range untrackedFiles {
  695. ch := c.LoadReadOnlyFile(comps)
  696. if ch != nil {
  697. ch.Release()
  698. }
  699. }
  700. }
  701. logger.Infof("%v file visited, %v untracked files found, time: %v", visitCnt, len(untrackedFiles), time.Since(startTime))
  702. }
  703. }
  704. func (c *Cache) doSync(pkgs []*syncPackage) {
  705. var uploadPkgs []*syncPackage
  706. var updateOnlyPkgs []*syncPackage
  707. for _, p := range pkgs {
  708. var updateOnly *syncPackage
  709. var upload *syncPackage
  710. for _, o := range p.upObjs {
  711. if o.reader != nil {
  712. if upload == nil {
  713. upload = &syncPackage{
  714. bktName: p.bktName,
  715. pkgName: p.pkgName,
  716. }
  717. }
  718. upload.upObjs = append(upload.upObjs, o)
  719. } else {
  720. if updateOnly == nil {
  721. updateOnly = &syncPackage{
  722. bktName: p.bktName,
  723. pkgName: p.pkgName,
  724. }
  725. }
  726. updateOnly.upObjs = append(updateOnly.upObjs, o)
  727. }
  728. }
  729. if upload != nil {
  730. uploadPkgs = append(uploadPkgs, upload)
  731. }
  732. if updateOnly != nil {
  733. updateOnlyPkgs = append(updateOnlyPkgs, updateOnly)
  734. }
  735. }
  736. // 先上传文件,再更新文件元数据。上传文件时会创建Package,这样后续更新元数据时就能查到Package。
  737. if len(uploadPkgs) > 0 {
  738. c.doUploading(uploadPkgs)
  739. }
  740. if len(updateOnlyPkgs) > 0 {
  741. c.doUpdatingOnly(updateOnlyPkgs)
  742. }
  743. }
// doUpdatingOnly pushes metadata-only changes (modification times) for the
// given packages to the database. Objects whose Package cannot be resolved
// get their uploading state cleared so a later scan can retry them.
func (c *Cache) doUpdatingOnly(pkgs []*syncPackage) {
	/// 1. Metadata-only updates: just look up the existing Package.
	var sucPkgs []*syncPackage
	var failedPkgs []*syncPackage
	for _, pkg := range pkgs {
		p, err := c.db.Package().GetByFullName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
		if err != nil {
			logger.Warnf("get package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
			failedPkgs = append(failedPkgs, pkg)
			continue
		}
		pkg.pkg = p
		sucPkgs = append(sucPkgs, pkg)
	}
	/// 2. For Packages that failed to resolve, clear the uploading state
	/// under the lock so the objects can be retried later.
	c.lock.Lock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.cache.state.uploading = nil
		}
	}
	c.lock.Unlock()
	/// 3. Update each resolved Package.
	for _, p := range sucPkgs {
		pathes := make([]string, 0, len(p.upObjs))
		modTimes := make([]time.Time, 0, len(p.upObjs))
		for _, obj := range p.upObjs {
			// Components 0 and 1 are the bucket and package names; the
			// object path starts at component 2.
			pathes = append(pathes, clitypes.JoinObjectPath(obj.pathComps[2:]...))
			modTimes = append(modTimes, obj.modTime)
		}
		err := c.db.Object().BatchUpdateUpdateTimeByPath(c.db.DefCtx(), p.pkg.PackageID, pathes, modTimes)
		if err != nil {
			logger.Warnf("batch update package %v/%v: %v", p.bktName, p.pkgName, err)
			c.lock.Lock()
			for _, obj := range p.upObjs {
				obj.cache.state.uploading = nil
			}
			c.lock.Unlock()
			continue
		}
		logger.Infof("update %v object in package %v/%v", len(p.upObjs), p.bktName, p.pkgName)
		// Record the successful update under the lock.
		c.lock.Lock()
		for _, obj := range p.upObjs {
			obj.cache.state.uploading = nil
			obj.cache.RevisionUploaded(0, obj.metaRevision)
		}
		c.lock.Unlock()
	}
}
// doUploading uploads changed file data for the given packages. It creates
// each Package if needed, streams every object's data, then commits the
// update while handling files that were deleted or moved mid-upload.
func (c *Cache) doUploading(pkgs []*syncPackage) {
	/// 1. Try to create each Package first.
	var sucPkgs []*syncPackage
	var failedPkgs []*syncPackage
	for _, pkg := range pkgs {
		p, err := db.DoTx21(c.db, c.db.Package().TryCreateAll, pkg.bktName, pkg.pkgName)
		if err != nil {
			logger.Warnf("try create package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
			failedPkgs = append(failedPkgs, pkg)
			continue
		}
		pkg.pkg = p
		sucPkgs = append(sucPkgs, pkg)
	}
	/// 2. For Packages that could not be created, just close the files and
	/// skip the upload.
	// Clear the uploading state under the lock.
	c.lock.Lock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.cache.state.uploading = nil
		}
	}
	c.lock.Unlock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.reader.Close()
		}
	}
	/// 3. Upload each Package.
	for _, p := range sucPkgs {
		upder, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil)
		if err != nil {
			logger.Warnf("begin upload package %v/%v: %v", p.bktName, p.pkgName, err)
			// Cancel the uploading state.
			c.lock.Lock()
			for _, obj := range p.upObjs {
				obj.cache.state.uploading = nil
			}
			c.lock.Unlock()
			for _, obj := range p.upObjs {
				obj.reader.Close()
			}
			continue
		}
		upSuc := 0
		upSucAmt := int64(0)
		upFailed := 0
		upStartTime := time.Now()
		logger.Infof("begin uploading %v objects to package %v/%v", len(p.upObjs), p.bktName, p.pkgName)
		for _, o := range p.upObjs {
			rd := cacheFileReader{
				rw: o.reader,
			}
			// Count the bytes actually streamed for the throughput log below.
			counter := io2.Counter(&rd)
			err = upder.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter, uploader.UploadOption{
				CreateTime: o.modTime,
			})
			if err != nil {
				logger.Warnf("upload object %v: %v", o.pathComps, err)
				upFailed++
				continue
			}
			o.isSuccess = true
			upSuc++
			upSucAmt += counter.Count()
		}
		// Record the upload results under the lock.
		c.lock.Lock()
		upCancel := 0
		upRename := 0
		// Check whether any file changed while its upload was in flight.
		var sucObjs []*uploadingObject
		for _, o := range p.upObjs {
			o.cache.state.uploading = nil
			if !o.isSuccess {
				continue
			}
			oldPath := clitypes.JoinObjectPath(o.pathComps[2:]...)
			newPath := clitypes.JoinObjectPath(o.cache.pathComps[2:]...)
			if o.isDeleted {
				upder.CancelObject(oldPath)
				upCancel++
				continue
			}
			// If the object moved to another Package, the upload must also
			// be cancelled.
			if !lo2.ArrayEquals(o.pathComps[:2], o.cache.pathComps[:2]) {
				upder.CancelObject(oldPath)
				upCancel++
				continue
			}
			// Only objects that moved within the same Package can simply be
			// renamed.
			if newPath != oldPath {
				upder.RenameObject(oldPath, newPath)
				upRename++
			}
			sucObjs = append(sucObjs, o)
		}
		_, err = upder.Commit()
		if err != nil {
			logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err)
		} else {
			for _, obj := range sucObjs {
				obj.cache.RevisionUploaded(obj.reader.revision, obj.metaRevision)
			}
			upTime := time.Since(upStartTime)
			logger.Infof("upload package %v/%v in %v, upload: %v, size: %v, speed: %v/s, cancel: %v, rename: %v",
				p.bktName, p.pkgName, upTime, upSuc, upSucAmt, bytesize.New(float64(upSucAmt)/upTime.Seconds()), upCancel, upRename)
		}
		c.lock.Unlock()
		// Closing the files affects refCount, so regardless of success or
		// failure the next maintenance phase only happens after a waiting
		// period.
		for _, obj := range p.upObjs {
			obj.reader.Close()
		}
	}
}
// cacheFileReader adapts a CacheFileHandle's ReadAt to the io.Reader
// interface by tracking the current read position.
type cacheFileReader struct {
	rw *CacheFileHandle
	// Offset of the next Read.
	pos int64
}
  913. func (r *cacheFileReader) Read(p []byte) (int, error) {
  914. n, err := r.rw.ReadAt(p, r.pos)
  915. r.pos += int64(n)
  916. if err != nil {
  917. return n, err
  918. }
  919. if n != len(p) {
  920. return n, io.EOF
  921. }
  922. return n, nil
  923. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。