You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-'), and can be up to 35 characters long.

cache.go 26 kB

8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
7 months ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097
  1. package cache
  2. import (
  3. "errors"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "syscall"
  9. "time"
  10. "github.com/inhies/go-bytesize"
  11. "github.com/samber/lo"
  12. "gitlink.org.cn/cloudream/common/pkgs/logger"
  13. "gitlink.org.cn/cloudream/common/pkgs/trie"
  14. "gitlink.org.cn/cloudream/common/utils/io2"
  15. "gitlink.org.cn/cloudream/common/utils/lo2"
  16. "gitlink.org.cn/cloudream/common/utils/sort2"
  17. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  18. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
  19. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
  20. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/fuse"
  21. "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader"
  22. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  23. )
  // CacheEntry is a file system entry held by the cache. In addition to
  // fuse.FsEntry it exposes the entry's path in the virtual file system.
  type CacheEntry interface {
  	fuse.FsEntry
  	// PathComps returns the path in the virtual file system, i.e. the path
  	// without the cache directory prefix, split into components.
  	PathComps() []string
  }
  // CacheEntryInfo is a snapshot of the information of a cache entry
  // (a file or, when IsDir is set, a directory).
  type CacheEntryInfo struct {
  	// Path in the virtual file system, split into components.
  	PathComps []string
  	Size      int64
  	Perm      os.FileMode
  	ModTime   time.Time
  	IsDir     bool
  	// Metadata revision. A value > 0 means the metadata has local changes
  	// that have not been synced yet.
  	MetaRevision int
  	// File data revision. A value > 0 means the data has local changes
  	// that have not been synced yet.
  	DataRevision int
  	// Reference count.
  	RefCount int
  	// Time the reference count last dropped to zero, i.e. the last time the
  	// entry was used.
  	FreeTime time.Time
  	// Cache level.
  	Level CacheLevel
  	// Time the cache level last changed.
  	ChangeLevelTime time.Time
  }
  // Cache manages the local cache used by the mount. File data is stored under
  // cfg.DataDir and metadata under cfg.MetaDir, mirroring the virtual paths.
  type Cache struct {
  	cfg        *config.Config
  	db         *db.DB
  	uploader   *uploader.Uploader
  	downloader *downloader.Downloader
  	// Protects activeCache and the per-file state (e.g. state.uploading) that
  	// the scanning and sync code reads and writes.
  	lock *sync.RWMutex
  	// Closed by Stop to make the background scanning goroutines exit.
  	cacheDone chan any
  	// Requests an immediate full scan from scanningCache (see ReclaimSpace).
  	doFullScan chan any
  	// Cache files currently loaded in memory, indexed by path components.
  	activeCache *trie.Trie[*CacheFile]
  }
  58. func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
  59. return &Cache{
  60. cfg: cfg,
  61. db: db,
  62. uploader: uploader,
  63. downloader: downloader,
  64. lock: &sync.RWMutex{},
  65. cacheDone: make(chan any, 1),
  66. doFullScan: make(chan any, 1),
  67. activeCache: trie.NewTrie[*CacheFile](),
  68. }
  69. }
  70. func (c *Cache) Start() {
  71. go c.scanningCache()
  72. go c.scanningData()
  73. }
  74. func (c *Cache) Stop() {
  75. close(c.cacheDone)
  76. }
  77. func (c *Cache) GetCacheDataPath(comps ...string) string {
  78. comps2 := make([]string, len(comps)+1)
  79. comps2[0] = c.cfg.DataDir
  80. copy(comps2[1:], comps)
  81. return filepath.Join(comps2...)
  82. }
  83. func (c *Cache) GetCacheMetaPath(comps ...string) string {
  84. comps2 := make([]string, len(comps)+1)
  85. comps2[0] = c.cfg.MetaDir
  86. copy(comps2[1:], comps)
  87. return filepath.Join(comps2...)
  88. }
  89. func (c *Cache) Dump() CacheStatus {
  90. c.lock.RLock()
  91. defer c.lock.RUnlock()
  92. var activeFiles []CacheFileStatus
  93. c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
  94. if node.Value == nil {
  95. return trie.VisitContinue
  96. }
  97. info := node.Value.Info()
  98. activeFiles = append(activeFiles, CacheFileStatus{
  99. Path: filepath.Join(path...),
  100. RefCount: info.RefCount,
  101. Level: info.Level.String(),
  102. IsUploading: node.Value.state.uploading != nil,
  103. })
  104. return trie.VisitContinue
  105. })
  106. return CacheStatus{
  107. ActiveFiles: activeFiles,
  108. }
  109. }
  110. func (c *Cache) ReclaimSpace() {
  111. select {
  112. case c.doFullScan <- nil:
  113. default:
  114. }
  115. }
  116. // 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。
  117. func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
  118. c.lock.RLock()
  119. defer c.lock.RUnlock()
  120. node, ok := c.activeCache.WalkEnd(pathComps)
  121. if ok && node.Value != nil {
  122. info := node.Value.Info()
  123. return &info
  124. }
  125. dataPath := c.GetCacheDataPath(pathComps...)
  126. stat, err := os.Stat(dataPath)
  127. if err != nil {
  128. // TODO 日志记录
  129. return nil
  130. }
  131. if stat.IsDir() {
  132. info, err := loadCacheDirInfo(c, pathComps, stat)
  133. if err != nil {
  134. return nil
  135. }
  136. return info
  137. }
  138. info, err := loadCacheFileInfo(c, pathComps, stat)
  139. if err != nil {
  140. return nil
  141. }
  142. return info
  143. }
  144. // 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。
  145. //
  146. // 记得使用Release减少引用计数
  147. func (c *Cache) CreateFile(pathComps []string) *CacheFile {
  148. c.lock.Lock()
  149. defer c.lock.Unlock()
  150. node, ok := c.activeCache.WalkEnd(pathComps)
  151. if ok && node.Value != nil {
  152. node.Value.Delete()
  153. if node.Value.state.uploading != nil {
  154. node.Value.state.uploading.isDeleted = true
  155. }
  156. }
  157. ch, err := createNewCacheFile(c, pathComps)
  158. if err != nil {
  159. logger.Warnf("create new cache file %v: %v", pathComps, err)
  160. return nil
  161. }
  162. ch.IncRef()
  163. c.activeCache.CreateWords(pathComps).Value = ch
  164. logger.Debugf("create new cache file %v", pathComps)
  165. return ch
  166. }
  167. // 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。
  168. //
  169. // 记得使用Release减少引用计数
  170. func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile {
  171. c.lock.Lock()
  172. defer c.lock.Unlock()
  173. node, ok := c.activeCache.WalkEnd(pathComps)
  174. if ok && node.Value != nil {
  175. node.Value.IncRef()
  176. return node.Value
  177. }
  178. ch, err := loadCacheFile(c, pathComps)
  179. if err == nil {
  180. ch.remoteObj = obj
  181. ch.IncRef()
  182. c.activeCache.CreateWords(pathComps).Value = ch
  183. logger.Debugf("load cache %v", pathComps)
  184. return ch
  185. }
  186. if !os.IsNotExist(err) {
  187. // TODO 日志记录
  188. logger.Warnf("load cache %v: %v", pathComps, err)
  189. return nil
  190. }
  191. if obj == nil {
  192. return nil
  193. }
  194. ch, err = newCacheFileFromObject(c, pathComps, obj)
  195. if err != nil {
  196. logger.Warnf("create cache %v from object: %v", pathComps, err)
  197. return nil
  198. }
  199. ch.IncRef()
  200. c.activeCache.CreateWords(pathComps).Value = ch
  201. logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID)
  202. return ch
  203. }
  204. // 仅加载文件的元数据,如果文件不存在,则返回nil
  205. //
  206. // 记得使用Release减少引用计数
  207. func (c *Cache) LoadReadOnlyFile(pathComps []string) *CacheFile {
  208. c.lock.Lock()
  209. defer c.lock.Unlock()
  210. node, ok := c.activeCache.WalkEnd(pathComps)
  211. if ok && node.Value != nil {
  212. node.Value.IncRef()
  213. return node.Value
  214. }
  215. ch, err := loadReadOnlyCacheFile(c, pathComps)
  216. if err == nil {
  217. ch.IncRef()
  218. c.activeCache.CreateWords(pathComps).Value = ch
  219. logger.Debugf("load cache %v", pathComps)
  220. return ch
  221. }
  222. if !os.IsNotExist(err) {
  223. // TODO 日志记录
  224. logger.Warnf("load cache %v: %v", pathComps, err)
  225. return nil
  226. }
  227. return nil
  228. }
  229. // 创建一个缓存目录。如果目录已经存在,则会重置目录属性。如果加载过程中发生了错误,或者目标位置是一个文件,则会返回nil
  230. func (c *Cache) CreateDir(pathComps []string) *CacheDir {
  231. c.lock.Lock()
  232. defer c.lock.Unlock()
  233. ch, err := createNewCacheDir(c, pathComps)
  234. if err != nil {
  235. logger.Warnf("create cache dir: %v", err)
  236. return nil
  237. }
  238. return ch
  239. }
  // CreateDirOption holds the attributes LoadDir uses when it has to create a
  // directory that does not exist yet.
  type CreateDirOption struct {
  	ModTime time.Time
  }
  243. // 加载指定缓存目录,如果目录不存在,则使用createOpt选项创建目录,而如果createOpt为nil,那么会返回nil。
  244. func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir {
  245. c.lock.Lock()
  246. defer c.lock.Unlock()
  247. ch, err := loadCacheDir(c, pathComps)
  248. if err == nil {
  249. return ch
  250. }
  251. if !os.IsNotExist(err) {
  252. // TODO 日志记录
  253. return nil
  254. }
  255. if createOpt == nil {
  256. return nil
  257. }
  258. // 创建目录
  259. ch, err = makeCacheDirFromOption(c, pathComps, *createOpt)
  260. if err != nil {
  261. // TODO 日志记录
  262. return nil
  263. }
  264. return ch
  265. }
  266. // 加载指定路径下的所有缓存条目信息
  267. func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
  268. c.lock.RLock()
  269. defer c.lock.RUnlock()
  270. var infos []CacheEntryInfo
  271. exists := make(map[string]bool)
  272. node, ok := c.activeCache.WalkEnd(pathComps)
  273. if ok {
  274. for name, child := range node.WordNexts {
  275. if child.Value != nil {
  276. infos = append(infos, child.Value.Info())
  277. exists[name] = true
  278. }
  279. }
  280. }
  281. osEns, err := os.ReadDir(c.GetCacheDataPath(pathComps...))
  282. if err != nil {
  283. return nil
  284. }
  285. for _, e := range osEns {
  286. if exists[e.Name()] {
  287. continue
  288. }
  289. info, err := e.Info()
  290. if err != nil {
  291. continue
  292. }
  293. if e.IsDir() {
  294. info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
  295. if err != nil {
  296. continue
  297. }
  298. infos = append(infos, *info)
  299. } else {
  300. info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
  301. if err != nil {
  302. continue
  303. }
  304. infos = append(infos, *info)
  305. }
  306. }
  307. return infos
  308. }
  309. // 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。
  310. func (c *Cache) Remove(pathComps []string) error {
  311. c.lock.Lock()
  312. defer c.lock.Unlock()
  313. node, ok := c.activeCache.WalkEnd(pathComps)
  314. if ok {
  315. if len(node.WordNexts) > 0 {
  316. return fuse.ErrNotEmpty
  317. }
  318. if node.Value != nil {
  319. node.Value.Delete()
  320. if node.Value.state.uploading != nil {
  321. node.Value.state.uploading.isDeleted = true
  322. }
  323. }
  324. node.RemoveSelf(true)
  325. logger.Debugf("active cache %v removed", pathComps)
  326. return nil
  327. }
  328. metaPath := c.GetCacheMetaPath(pathComps...)
  329. dataPath := c.GetCacheDataPath(pathComps...)
  330. os.Remove(metaPath)
  331. err := os.Remove(dataPath)
  332. if err == nil || os.IsNotExist(err) {
  333. logger.Debugf("local cache %v removed", pathComps)
  334. return nil
  335. }
  336. if errors.Is(err, syscall.ENOTEMPTY) {
  337. return fuse.ErrNotEmpty
  338. }
  339. return err
  340. }
  // Move moves the cache file or directory at pathComps to newPathComps.
  //
  // It returns fuse.ErrExists if the destination already exists (in the active
  // cache or on disk), fuse.ErrNotExists if the source data path does not exist,
  // or the underlying error if a stat or MkdirAll fails. On success it returns nil.
  func (c *Cache) Move(pathComps []string, newPathComps []string) error {
  	c.lock.Lock()
  	defer c.lock.Unlock()
  	_, ok := c.activeCache.WalkEnd(newPathComps)
  	if ok {
  		return fuse.ErrExists
  	}
  	newMetaPath := c.GetCacheMetaPath(newPathComps...)
  	newDataPath := c.GetCacheDataPath(newPathComps...)
  	_, err := os.Stat(newDataPath)
  	if err == nil {
  		return fuse.ErrExists
  	}
  	if !os.IsNotExist(err) {
  		return err
  	}
  	oldMetaPath := c.GetCacheMetaPath(pathComps...)
  	oldDataPath := c.GetCacheDataPath(pathComps...)
  	// Make sure the source exists before doing anything else.
  	_, err = os.Stat(oldDataPath)
  	if err != nil {
  		if os.IsNotExist(err) {
  			return fuse.ErrNotExists
  		}
  		return err
  	}
  	// The parent directories are created to handle the case where the moved file
  	// is not present locally. A side effect is that if the destination's parent
  	// directory really does not exist, it gets created here unintentionally.
  	newMetaDir := filepath.Dir(newMetaPath)
  	err = os.MkdirAll(newMetaDir, 0755)
  	if err != nil {
  		return err
  	}
  	newDataDir := filepath.Dir(newDataPath)
  	err = os.MkdirAll(newDataDir, 0755)
  	if err != nil {
  		return err
  	}
  	// Each cache file holds handles to its meta and data files, so renaming the
  	// files here does not affect those handles. Errors here can only be ignored.
  	os.Rename(oldMetaPath, newMetaPath)
  	os.Rename(oldDataPath, newDataPath)
  	// Update the in-memory cache: attach the old node's value and children at the
  	// new path, detach the old node, then give every cache file in the moved
  	// subtree its new path (Iterate's path is relative to newNode).
  	// NOTE(review): RemoveSelf(false) is used here while other callers pass true;
  	// presumably false skips pruning of empty ancestors — confirm with the trie package.
  	oldNode, ok := c.activeCache.WalkEnd(pathComps)
  	if ok {
  		newNode := c.activeCache.CreateWords(newPathComps)
  		newNode.Value = oldNode.Value
  		newNode.WordNexts = oldNode.WordNexts
  		oldNode.RemoveSelf(false)
  		if newNode.Value != nil {
  			newNode.Value.Move(newPathComps)
  		}
  		newNode.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
  			if node.Value != nil {
  				node.Value.Move(lo2.AppendNew(newPathComps, path...))
  			}
  			return trie.VisitContinue
  		})
  	}
  	logger.Debugf("cache moved: %v -> %v", pathComps, newPathComps)
  	return nil
  }
  // syncPackage groups the objects of one package (identified by bucket name and
  // package name) that are synced together in one batch.
  type syncPackage struct {
  	bktName string
  	pkgName string
  	// Filled in once the package has been looked up or created in the database.
  	pkg    clitypes.Package
  	upObjs []*uploadingObject
  }
  // uploadingObject tracks one cache file that visitNode scheduled for syncing.
  type uploadingObject struct {
  	// Path of the file when it was scheduled. After the upload it is compared with
  	// cache.pathComps to detect moves.
  	pathComps []string
  	cache     *CacheFile
  	// Read handle used to upload the data; nil when only the metadata changed.
  	reader *CacheFileHandle
  	// Modification time to sync; set when the metadata changed.
  	modTime time.Time
  	// Metadata revision captured when the object was scheduled.
  	metaRevision int
  	// Set under Cache.lock when the file is removed or replaced during the upload.
  	isDeleted bool
  	// Set when the object's data was uploaded successfully.
  	isSuccess bool
  }
  // packageFullName identifies a package by bucket name and package name. It is
  // used as a map key to group objects that belong to the same package.
  type packageFullName struct {
  	bktName string
  	pkgName string
  }
  425. func (c *Cache) scanningCache() {
  426. ticker := time.NewTicker(time.Second * 5)
  427. defer ticker.Stop()
  428. lastScanPath := []string{}
  429. nextFullScan := false
  430. for {
  431. select {
  432. case _, ok := <-c.cacheDone:
  433. if !ok {
  434. return
  435. }
  436. case <-ticker.C:
  437. case <-c.doFullScan:
  438. nextFullScan = true
  439. }
  440. // 每完成一轮快速的渐进全量扫描,就进行一次即时全量扫描
  441. if nextFullScan {
  442. c.fullScan()
  443. nextFullScan = false
  444. continue
  445. }
  446. lastScanPath = c.fastScan(lastScanPath)
  447. if len(lastScanPath) == 0 {
  448. nextFullScan = true
  449. }
  450. }
  451. }
  452. // 全量扫描,主要是检查总缓存大小是否超标
  453. func (c *Cache) fullScan() {
  454. log := logger.WithField("Mod", "Mount")
  455. startTime := time.Now()
  456. log.Debug("begin full scan")
  457. defer func() {
  458. log.Debugf("full scan done, time: %v", time.Since(startTime))
  459. }()
  460. c.lock.Lock()
  461. defer c.lock.Unlock()
  462. totalCacheSize := int64(0)
  463. type readOnlyCache struct {
  464. Info CacheEntryInfo
  465. Node *trie.Node[*CacheFile]
  466. }
  467. var readOnlyCaches []readOnlyCache
  468. c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
  469. ch := node.Value
  470. if ch == nil {
  471. return trie.VisitContinue
  472. }
  473. info := ch.Info()
  474. if info.Level > LevelReadOnly {
  475. return trie.VisitContinue
  476. }
  477. if info.DataRevision > 0 || info.MetaRevision > 0 {
  478. return trie.VisitContinue
  479. }
  480. readOnlyCaches = append(readOnlyCaches, readOnlyCache{
  481. Info: info,
  482. Node: node,
  483. })
  484. totalCacheSize += info.Size
  485. return trie.VisitContinue
  486. })
  487. // 如果总缓存文件大小超过限制,那么就从最早被使用的开始删除
  488. if c.cfg.MaxCacheSize > 0 {
  489. needReclaim := totalCacheSize - c.cfg.MaxCacheSize
  490. if needReclaim > 0 {
  491. readOnlyCaches = sort2.Sort(readOnlyCaches, func(left, right readOnlyCache) int {
  492. return left.Info.FreeTime.Compare(right.Info.FreeTime)
  493. })
  494. reclaimed := int64(0)
  495. rmCnt := 0
  496. for _, rc := range readOnlyCaches {
  497. rc.Node.Value.Delete()
  498. rc.Node.RemoveSelf(true)
  499. needReclaim -= rc.Info.Size
  500. reclaimed += rc.Info.Size
  501. rmCnt += 1
  502. if needReclaim <= 0 {
  503. break
  504. }
  505. }
  506. log.Infof("%v cache file removed, reclaimed %v bytes, total cache size: %v", rmCnt, reclaimed, totalCacheSize-reclaimed)
  507. }
  508. }
  509. // TODO 还可以做点其他的检查,比如文件句柄数
  510. }
  511. // 快速扫描,每次只扫描一部分节点,做的事情会繁重一点
  512. func (c *Cache) fastScan(lastScanPath []string) []string {
  513. c.lock.Lock()
  514. uploadingPkgs := make(map[packageFullName]*syncPackage)
  515. visitCnt := 0
  516. visitBreak := false
  517. node, _ := c.activeCache.WalkEnd(lastScanPath)
  518. node.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
  519. ch := node.Value
  520. if ch == nil {
  521. return trie.VisitContinue
  522. }
  523. info := ch.Info()
  524. if info.RefCount > 0 {
  525. logger.Debugf("skip cache %v, refCount: %v", path, info.RefCount)
  526. return trie.VisitContinue
  527. }
  528. visitCnt++
  529. c.visitNode(path, node, ch, info, uploadingPkgs)
  530. // 每次最多遍历500个节点,防止占用锁太久
  531. if visitCnt > 500 {
  532. lastScanPath = lo2.ArrayClone(path)
  533. visitBreak = true
  534. return trie.VisitBreak
  535. }
  536. return trie.VisitContinue
  537. })
  538. if !visitBreak {
  539. lastScanPath = []string{}
  540. }
  541. c.lock.Unlock()
  542. if len(uploadingPkgs) > 0 {
  543. go c.doSync(lo.Values(uploadingPkgs))
  544. }
  545. return lastScanPath
  546. }
  547. func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheFile, info CacheEntryInfo, uploadingPkgs map[packageFullName]*syncPackage) {
  548. shouldUpload := true
  549. // 不存放在Package里的文件,不需要上传
  550. if len(ch.pathComps) <= 2 {
  551. shouldUpload = false
  552. }
  553. // 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传
  554. if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) {
  555. if time.Since(info.FreeTime) < c.cfg.UploadPendingTime {
  556. return
  557. }
  558. if ch.state.uploading != nil {
  559. return
  560. }
  561. // 上传文件需要完全加载级别的缓存等级
  562. ch.LevelUp(LevelComplete)
  563. fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]}
  564. pkg, ok := uploadingPkgs[fullName]
  565. if !ok {
  566. pkg = &syncPackage{
  567. bktName: ch.pathComps[0],
  568. pkgName: ch.pathComps[1],
  569. }
  570. uploadingPkgs[fullName] = pkg
  571. }
  572. obj := &uploadingObject{
  573. pathComps: lo2.ArrayClone(ch.pathComps),
  574. cache: ch,
  575. }
  576. pkg.upObjs = append(pkg.upObjs, obj)
  577. ch.state.uploading = obj
  578. if info.DataRevision > 0 {
  579. obj.reader = ch.OpenReadWhenScanning()
  580. }
  581. if info.MetaRevision > 0 {
  582. obj.modTime = info.ModTime
  583. obj.metaRevision = info.MetaRevision
  584. }
  585. return
  586. }
  587. // 2. 本地缓存没有被修改,如果一段时间内没有被使用,则进行卸载
  588. if info.Level > LevelReadOnly {
  589. if time.Since(info.FreeTime) > c.cfg.CacheActiveTime {
  590. ch.LevelDown(LevelReadOnly)
  591. }
  592. return
  593. }
  594. // 3. 卸载后的缓存,如果一段时间内没有被使用,则进行删除。
  595. if info.Level <= LevelReadOnly {
  596. // 需要同时满足距上次使用时间和距上次卸载时间超过配置的时间,才可以删除
  597. if time.Since(info.FreeTime) > c.cfg.CacheExpireTime && time.Since(info.ChangeLevelTime) > c.cfg.CacheExpireTime {
  598. // 如果文件已经同步到远端,则可以直接删除本地缓存
  599. if info.MetaRevision == 0 && info.DataRevision == 0 {
  600. ch.Delete()
  601. }
  602. // 文件数据或者元数据有修改,但缓存等级是ReadOnly以下,意味着在之前检查是否需要上传时被判定为不需要上传
  603. // 这种文件删除缓存记录即可(但会在扫描数据目录时再次被加载进来)
  604. node.RemoveSelf(true)
  605. }
  606. return
  607. }
  608. }
  609. // 扫描文件数据目录
  610. func (c *Cache) scanningData() {
  611. ticker := time.NewTicker(c.cfg.ScanDataDirInterval)
  612. defer ticker.Stop()
  613. var walkTrace []*os.File
  614. var walkTraceComps []string
  615. for {
  616. select {
  617. case <-ticker.C:
  618. case <-c.cacheDone:
  619. return
  620. }
  621. startTime := time.Now()
  622. logger.Infof("begin scanning data dir")
  623. if len(walkTrace) == 0 {
  624. dir, err := os.Open(c.cfg.DataDir)
  625. if err != nil {
  626. logger.Warnf("open data dir: %v", err)
  627. continue
  628. }
  629. walkTrace = []*os.File{dir}
  630. walkTraceComps = []string{c.cfg.MetaDir}
  631. }
  632. const maxVisitCnt = 5000
  633. const maxUntrackedFiles = 500
  634. var untrackedFiles [][]string
  635. visitCnt := 0
  636. // 一次最多遍历5000个文件(包括路径上的文件夹),一次最多添加500个未跟踪文件
  637. for len(walkTrace) > 0 && visitCnt < maxVisitCnt && len(untrackedFiles) < maxUntrackedFiles {
  638. lastNode := walkTrace[len(walkTrace)-1]
  639. visitCnt++
  640. e, err := lastNode.Readdir(1)
  641. if err == io.EOF {
  642. lastNode.Close()
  643. walkTrace = walkTrace[:len(walkTrace)-1]
  644. walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
  645. continue
  646. }
  647. if err != nil {
  648. logger.Warnf("read dir %v: %v", lastNode.Name(), err)
  649. lastNode.Close()
  650. walkTrace = walkTrace[:len(walkTrace)-1]
  651. walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
  652. continue
  653. }
  654. if e[0].IsDir() {
  655. child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name()))
  656. if err != nil {
  657. logger.Warnf("open dir %v: %v", e[0].Name(), err)
  658. continue
  659. }
  660. walkTrace = append(walkTrace, child)
  661. walkTraceComps = append(walkTraceComps, e[0].Name())
  662. continue
  663. }
  664. // 对于不在Package层级的文件,不跟踪
  665. if len(walkTrace) <= 2 {
  666. continue
  667. }
  668. // 无条件加载缓存,可能会导致一些不需要被同步到云端的文件在缓存等级降到最低取消跟踪后,又重新被加载进来
  669. // 不过由于扫描频率不高,所以问题不大
  670. walkTraceComps = append(walkTraceComps, e[0].Name())
  671. untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:]))
  672. walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
  673. }
  674. if len(untrackedFiles) > 0 {
  675. for _, comps := range untrackedFiles {
  676. ch := c.LoadReadOnlyFile(comps)
  677. if ch != nil {
  678. ch.Release()
  679. }
  680. }
  681. }
  682. logger.Infof("%v file visited, %v untracked files found, time: %v", visitCnt, len(untrackedFiles), time.Since(startTime))
  683. }
  684. }
  685. func (c *Cache) doSync(pkgs []*syncPackage) {
  686. var uploadPkgs []*syncPackage
  687. var updateOnlyPkgs []*syncPackage
  688. for _, p := range pkgs {
  689. var updateOnly *syncPackage
  690. var upload *syncPackage
  691. for _, o := range p.upObjs {
  692. if o.reader != nil {
  693. if upload == nil {
  694. upload = &syncPackage{
  695. bktName: p.bktName,
  696. pkgName: p.pkgName,
  697. }
  698. }
  699. upload.upObjs = append(upload.upObjs, o)
  700. } else {
  701. if updateOnly == nil {
  702. updateOnly = &syncPackage{
  703. bktName: p.bktName,
  704. pkgName: p.pkgName,
  705. }
  706. }
  707. updateOnly.upObjs = append(updateOnly.upObjs, o)
  708. }
  709. }
  710. if upload != nil {
  711. uploadPkgs = append(uploadPkgs, upload)
  712. }
  713. if updateOnly != nil {
  714. updateOnlyPkgs = append(updateOnlyPkgs, updateOnly)
  715. }
  716. }
  717. // 先上传文件,再更新文件元数据。上传文件时会创建Package,这样后续更新元数据时就能查到Package。
  718. if len(uploadPkgs) > 0 {
  719. c.doUploading(uploadPkgs)
  720. }
  721. if len(updateOnlyPkgs) > 0 {
  722. c.doUpdatingOnly(updateOnlyPkgs)
  723. }
  724. }
  725. func (c *Cache) doUpdatingOnly(pkgs []*syncPackage) {
  726. /// 1. 只是更新元数据,那么就只尝试查询Package
  727. var sucPkgs []*syncPackage
  728. var failedPkgs []*syncPackage
  729. for _, pkg := range pkgs {
  730. p, err := c.db.Package().GetByFullName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
  731. if err != nil {
  732. logger.Warnf("get package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
  733. failedPkgs = append(failedPkgs, pkg)
  734. continue
  735. }
  736. pkg.pkg = p
  737. sucPkgs = append(sucPkgs, pkg)
  738. }
  739. /// 2. 对于创建失败的Package, 在锁的保护下取消上传状态
  740. c.lock.Lock()
  741. for _, pkg := range failedPkgs {
  742. for _, obj := range pkg.upObjs {
  743. obj.cache.state.uploading = nil
  744. }
  745. }
  746. c.lock.Unlock()
  747. /// 3. 开始更新每个Package
  748. for _, p := range sucPkgs {
  749. pathes := make([]string, 0, len(p.upObjs))
  750. modTimes := make([]time.Time, 0, len(p.upObjs))
  751. for _, obj := range p.upObjs {
  752. pathes = append(pathes, clitypes.JoinObjectPath(obj.pathComps[2:]...))
  753. modTimes = append(modTimes, obj.modTime)
  754. }
  755. err := c.db.Object().BatchUpdateUpdateTimeByPath(c.db.DefCtx(), p.pkg.PackageID, pathes, modTimes)
  756. if err != nil {
  757. logger.Warnf("batch update package %v/%v: %v", p.bktName, p.pkgName, err)
  758. c.lock.Lock()
  759. for _, obj := range p.upObjs {
  760. obj.cache.state.uploading = nil
  761. }
  762. c.lock.Unlock()
  763. continue
  764. }
  765. logger.Infof("update %v object in package %v/%v", len(p.upObjs), p.bktName, p.pkgName)
  766. // 登记上传结果
  767. c.lock.Lock()
  768. for _, obj := range p.upObjs {
  769. obj.cache.state.uploading = nil
  770. obj.cache.RevisionUploaded(0, obj.metaRevision)
  771. }
  772. c.lock.Unlock()
  773. }
  774. }
  775. func (c *Cache) doUploading(pkgs []*syncPackage) {
  776. /// 1. 先尝试创建Package
  777. var sucPkgs []*syncPackage
  778. var failedPkgs []*syncPackage
  779. for _, pkg := range pkgs {
  780. p, err := db.DoTx21(c.db, c.db.Package().TryCreateAll, pkg.bktName, pkg.pkgName)
  781. if err != nil {
  782. logger.Warnf("try create package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
  783. failedPkgs = append(failedPkgs, pkg)
  784. continue
  785. }
  786. pkg.pkg = p
  787. sucPkgs = append(sucPkgs, pkg)
  788. }
  789. /// 2. 对于创建失败的Package,直接关闭文件,不进行上传
  790. // 在锁的保护下取消上传状态
  791. c.lock.Lock()
  792. for _, pkg := range failedPkgs {
  793. for _, obj := range pkg.upObjs {
  794. obj.cache.state.uploading = nil
  795. }
  796. }
  797. c.lock.Unlock()
  798. for _, pkg := range failedPkgs {
  799. for _, obj := range pkg.upObjs {
  800. obj.reader.Close()
  801. }
  802. }
  803. /// 3. 开始上传每个Package
  804. for _, p := range sucPkgs {
  805. upder, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil)
  806. if err != nil {
  807. logger.Warnf("begin upload package %v/%v: %v", p.bktName, p.pkgName, err)
  808. // 取消上传状态
  809. c.lock.Lock()
  810. for _, obj := range p.upObjs {
  811. obj.cache.state.uploading = nil
  812. }
  813. c.lock.Unlock()
  814. for _, obj := range p.upObjs {
  815. obj.reader.Close()
  816. }
  817. continue
  818. }
  819. upSuc := 0
  820. upSucAmt := int64(0)
  821. upFailed := 0
  822. upStartTime := time.Now()
  823. logger.Infof("begin uploading %v objects to package %v/%v", len(p.upObjs), p.bktName, p.pkgName)
  824. for _, o := range p.upObjs {
  825. rd := cacheFileReader{
  826. rw: o.reader,
  827. }
  828. counter := io2.Counter(&rd)
  829. err = upder.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter, uploader.UploadOption{
  830. CreateTime: o.modTime,
  831. })
  832. if err != nil {
  833. logger.Warnf("upload object %v: %v", o.pathComps, err)
  834. upFailed++
  835. continue
  836. }
  837. o.isSuccess = true
  838. upSuc++
  839. upSucAmt += counter.Count()
  840. }
  841. // 在锁保护下登记上传结果
  842. c.lock.Lock()
  843. upCancel := 0
  844. upRename := 0
  845. // 检查是否有文件在上传期间发生了变化
  846. var sucObjs []*uploadingObject
  847. for _, o := range p.upObjs {
  848. o.cache.state.uploading = nil
  849. if !o.isSuccess {
  850. continue
  851. }
  852. oldPath := clitypes.JoinObjectPath(o.pathComps[2:]...)
  853. newPath := clitypes.JoinObjectPath(o.cache.pathComps[2:]...)
  854. if o.isDeleted {
  855. upder.CancelObject(oldPath)
  856. upCancel++
  857. continue
  858. }
  859. // 如果对象移动到了另一个Package,那么也要取消上传
  860. if !lo2.ArrayEquals(o.pathComps[:2], o.cache.pathComps[:2]) {
  861. upder.CancelObject(oldPath)
  862. upCancel++
  863. continue
  864. }
  865. // 只有仍在同Package内移动的对象才能直接重命名
  866. if newPath != oldPath {
  867. upder.RenameObject(oldPath, newPath)
  868. upRename++
  869. }
  870. sucObjs = append(sucObjs, o)
  871. }
  872. _, err = upder.Commit()
  873. if err != nil {
  874. logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err)
  875. } else {
  876. for _, obj := range sucObjs {
  877. obj.cache.RevisionUploaded(obj.reader.revision, obj.metaRevision)
  878. }
  879. upTime := time.Since(upStartTime)
  880. logger.Infof("upload package %v/%v in %v, upload: %v, size: %v, speed: %v/s, cancel: %v, rename: %v",
  881. p.bktName, p.pkgName, upTime, upSuc, upSucAmt, bytesize.New(float64(upSucAmt)/upTime.Seconds()), upCancel, upRename)
  882. }
  883. c.lock.Unlock()
  884. // 关闭文件会影响refCount,所以无论是上传失败还是上传成功,都会在等待一段时间后才进行下一阶段的操作
  885. for _, obj := range p.upObjs {
  886. obj.reader.Close()
  887. }
  888. }
  889. }
  // cacheFileReader adapts a CacheFileHandle to io.Reader by reading sequentially
  // with ReadAt.
  type cacheFileReader struct {
  	rw *CacheFileHandle
  	// Offset of the next byte to read.
  	pos int64
  }
  894. func (r *cacheFileReader) Read(p []byte) (int, error) {
  895. n, err := r.rw.ReadAt(p, r.pos)
  896. r.pos += int64(n)
  897. if err != nil {
  898. return n, err
  899. }
  900. if n != len(p) {
  901. return n, io.EOF
  902. }
  903. return n, nil
  904. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。