You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

cache.go 23 kB

8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
7 months ago
7 months ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979
  1. package cache
  2. import (
  3. "errors"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "syscall"
  9. "time"
  10. "github.com/inhies/go-bytesize"
  11. "github.com/samber/lo"
  12. "gitlink.org.cn/cloudream/common/pkgs/logger"
  13. "gitlink.org.cn/cloudream/common/pkgs/trie"
  14. "gitlink.org.cn/cloudream/common/utils/io2"
  15. "gitlink.org.cn/cloudream/common/utils/lo2"
  16. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  17. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
  18. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
  19. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/fuse"
  20. "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader"
  21. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  22. )
const (
	// SyncFilterConfigName is the file name of the sync-filter configuration,
	// looked up at the root of the cache data directory.
	SyncFilterConfigName = ".cds.sync.filter"
)
// CacheEntry is the common interface of cache files and cache directories
// exposed through the FUSE filesystem.
type CacheEntry interface {
	fuse.FsEntry
	// PathComps returns the entry's path components inside the virtual
	// filesystem, i.e. the path without the cache directory prefix.
	PathComps() []string
}
// CacheEntryInfo describes one cache entry (file or directory).
type CacheEntryInfo struct {
	// Path components inside the virtual filesystem.
	PathComps []string
	Size      int64
	Perm      os.FileMode
	ModTime   time.Time
	IsDir     bool
	// Metadata revision number.
	MetaRevision int
	// File data revision number.
	DataRevision int
	// Reference count.
	RefCount int
	// Time when the reference count last dropped to zero.
	FreeTime time.Time
	// Cache level.
	Level CacheLevel
	// Time when the cache level last changed.
	ChangeLevelTime time.Time
}
// Cache manages the local cache of remote objects. Actively used cache files
// are tracked in an in-memory trie keyed by path components; their meta and
// data files live on disk under the configured meta/data directories.
type Cache struct {
	cfg        *config.Config
	db         *db.DB
	uploader   *uploader.Uploader
	downloader *downloader.Downloader
	// lock guards activeCache and the uploading state of cache files.
	lock *sync.RWMutex
	// cacheDone is closed by Stop to terminate the background scanners.
	cacheDone chan any
	// activeCache holds the cache files currently in use.
	activeCache *trie.Trie[*CacheFile]
	// syncFilter decides which files should be synced to the remote.
	syncFilter *SyncFilter
}
  60. func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
  61. return &Cache{
  62. cfg: cfg,
  63. db: db,
  64. uploader: uploader,
  65. downloader: downloader,
  66. lock: &sync.RWMutex{},
  67. cacheDone: make(chan any),
  68. activeCache: trie.NewTrie[*CacheFile](),
  69. syncFilter: NewSyncFilter(),
  70. }
  71. }
// Start loads the sync-filter configuration from the data directory and
// launches the background goroutines that scan the active cache and the
// on-disk data directory. Stop terminates them.
func (c *Cache) Start() {
	c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName))
	go c.scanningCache()
	go c.scanningData()
}
// Stop signals the background scanning goroutines started by Start to exit.
func (c *Cache) Stop() {
	close(c.cacheDone)
}
  80. func (c *Cache) GetCacheDataPath(comps ...string) string {
  81. comps2 := make([]string, len(comps)+1)
  82. comps2[0] = c.cfg.DataDir
  83. copy(comps2[1:], comps)
  84. return filepath.Join(comps2...)
  85. }
  86. func (c *Cache) GetCacheMetaPath(comps ...string) string {
  87. comps2 := make([]string, len(comps)+1)
  88. comps2[0] = c.cfg.MetaDir
  89. copy(comps2[1:], comps)
  90. return filepath.Join(comps2...)
  91. }
// ReloadSyncFilter re-reads the sync-filter configuration file from the cache
// data directory, picking up any changes made since the last load.
func (c *Cache) ReloadSyncFilter() {
	c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName))
}
  95. func (c *Cache) Dump() CacheStatus {
  96. c.lock.RLock()
  97. defer c.lock.RUnlock()
  98. var activeFiles []CacheFileStatus
  99. c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
  100. if node.Value == nil {
  101. return trie.VisitContinue
  102. }
  103. info := node.Value.Info()
  104. activeFiles = append(activeFiles, CacheFileStatus{
  105. Path: filepath.Join(path...),
  106. RefCount: info.RefCount,
  107. Level: info.Level.String(),
  108. IsUploading: node.Value.state.uploading != nil,
  109. })
  110. return trie.VisitContinue
  111. })
  112. return CacheStatus{
  113. ActiveFiles: activeFiles,
  114. }
  115. }
  116. // 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。
  117. func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
  118. c.lock.RLock()
  119. defer c.lock.RUnlock()
  120. node, ok := c.activeCache.WalkEnd(pathComps)
  121. if ok && node.Value != nil {
  122. info := node.Value.Info()
  123. return &info
  124. }
  125. dataPath := c.GetCacheDataPath(pathComps...)
  126. stat, err := os.Stat(dataPath)
  127. if err != nil {
  128. // TODO 日志记录
  129. return nil
  130. }
  131. if stat.IsDir() {
  132. info, err := loadCacheDirInfo(c, pathComps, stat)
  133. if err != nil {
  134. return nil
  135. }
  136. return info
  137. }
  138. info, err := loadCacheFileInfo(c, pathComps, stat)
  139. if err != nil {
  140. return nil
  141. }
  142. return info
  143. }
  144. // 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。
  145. //
  146. // 记得使用Release减少引用计数
  147. func (c *Cache) CreateFile(pathComps []string) *CacheFile {
  148. c.lock.Lock()
  149. defer c.lock.Unlock()
  150. node, ok := c.activeCache.WalkEnd(pathComps)
  151. if ok && node.Value != nil {
  152. node.Value.Delete()
  153. if node.Value.state.uploading != nil {
  154. node.Value.state.uploading.isDeleted = true
  155. }
  156. }
  157. ch, err := createNewCacheFile(c, pathComps)
  158. if err != nil {
  159. logger.Warnf("create new cache file %v: %v", pathComps, err)
  160. return nil
  161. }
  162. ch.IncRef()
  163. c.activeCache.CreateWords(pathComps).Value = ch
  164. logger.Debugf("create new cache file %v", pathComps)
  165. return ch
  166. }
  167. // 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。
  168. //
  169. // 记得使用Release减少引用计数
  170. func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile {
  171. c.lock.Lock()
  172. defer c.lock.Unlock()
  173. node, ok := c.activeCache.WalkEnd(pathComps)
  174. if ok && node.Value != nil {
  175. node.Value.IncRef()
  176. return node.Value
  177. }
  178. ch, err := loadCacheFile(c, pathComps)
  179. if err == nil {
  180. ch.remoteObj = obj
  181. ch.IncRef()
  182. c.activeCache.CreateWords(pathComps).Value = ch
  183. logger.Debugf("load cache %v", pathComps)
  184. return ch
  185. }
  186. if !os.IsNotExist(err) {
  187. // TODO 日志记录
  188. logger.Warnf("load cache %v: %v", pathComps, err)
  189. return nil
  190. }
  191. if obj == nil {
  192. return nil
  193. }
  194. ch, err = newCacheFileFromObject(c, pathComps, obj)
  195. if err != nil {
  196. logger.Warnf("create cache %v from object: %v", pathComps, err)
  197. return nil
  198. }
  199. ch.IncRef()
  200. c.activeCache.CreateWords(pathComps).Value = ch
  201. logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID)
  202. return ch
  203. }
  204. // 创建一个缓存目录。如果目录已经存在,则会重置目录属性。如果加载过程中发生了错误,或者目标位置是一个文件,则会返回nil
  205. func (c *Cache) CreateDir(pathComps []string) *CacheDir {
  206. c.lock.Lock()
  207. defer c.lock.Unlock()
  208. ch, err := createNewCacheDir(c, pathComps)
  209. if err != nil {
  210. logger.Warnf("create cache dir: %v", err)
  211. return nil
  212. }
  213. return ch
  214. }
// CreateDirOption carries the attributes used when LoadDir has to create a
// missing cache directory.
type CreateDirOption struct {
	ModTime time.Time
}
  218. // 加载指定缓存目录,如果目录不存在,则使用createOpt选项创建目录,而如果createOpt为nil,那么会返回nil。
  219. func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDir {
  220. c.lock.Lock()
  221. defer c.lock.Unlock()
  222. ch, err := loadCacheDir(c, pathComps)
  223. if err == nil {
  224. return ch
  225. }
  226. if !os.IsNotExist(err) {
  227. // TODO 日志记录
  228. return nil
  229. }
  230. if createOpt == nil {
  231. return nil
  232. }
  233. // 创建目录
  234. ch, err = makeCacheDirFromOption(c, pathComps, *createOpt)
  235. if err != nil {
  236. // TODO 日志记录
  237. return nil
  238. }
  239. return ch
  240. }
  241. // 加载指定路径下的所有缓存条目信息
  242. func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
  243. c.lock.RLock()
  244. defer c.lock.RUnlock()
  245. var infos []CacheEntryInfo
  246. exists := make(map[string]bool)
  247. node, ok := c.activeCache.WalkEnd(pathComps)
  248. if ok {
  249. for name, child := range node.WordNexts {
  250. if child.Value != nil {
  251. infos = append(infos, child.Value.Info())
  252. exists[name] = true
  253. }
  254. }
  255. }
  256. osEns, err := os.ReadDir(c.GetCacheDataPath(pathComps...))
  257. if err != nil {
  258. return nil
  259. }
  260. for _, e := range osEns {
  261. if exists[e.Name()] {
  262. continue
  263. }
  264. info, err := e.Info()
  265. if err != nil {
  266. continue
  267. }
  268. if e.IsDir() {
  269. info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
  270. if err != nil {
  271. continue
  272. }
  273. infos = append(infos, *info)
  274. } else {
  275. info, err := loadCacheFileInfo(c, append(lo2.ArrayClone(pathComps), e.Name()), info)
  276. if err != nil {
  277. continue
  278. }
  279. infos = append(infos, *info)
  280. }
  281. }
  282. return infos
  283. }
  284. // 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。
  285. func (c *Cache) Remove(pathComps []string) error {
  286. c.lock.Lock()
  287. defer c.lock.Unlock()
  288. node, ok := c.activeCache.WalkEnd(pathComps)
  289. if ok {
  290. if len(node.WordNexts) > 0 {
  291. return fuse.ErrNotEmpty
  292. }
  293. if node.Value != nil {
  294. node.Value.Delete()
  295. if node.Value.state.uploading != nil {
  296. node.Value.state.uploading.isDeleted = true
  297. }
  298. }
  299. node.RemoveSelf(true)
  300. logger.Debugf("active cache %v removed", pathComps)
  301. return nil
  302. }
  303. metaPath := c.GetCacheMetaPath(pathComps...)
  304. dataPath := c.GetCacheDataPath(pathComps...)
  305. os.Remove(metaPath)
  306. err := os.Remove(dataPath)
  307. if err == nil || os.IsNotExist(err) {
  308. logger.Debugf("local cache %v removed", pathComps)
  309. return nil
  310. }
  311. if errors.Is(err, syscall.ENOTEMPTY) {
  312. return fuse.ErrNotEmpty
  313. }
  314. return err
  315. }
// Move moves the cache file or directory at pathComps to newPathComps. It
// fails with fuse.ErrExists if the destination already exists (in the active
// cache or on disk) and with fuse.ErrNotExists if the source is missing
// locally. Both the on-disk meta/data files and the active-cache trie nodes
// are moved, and every active cache file under the moved subtree is notified
// of its new path.
func (c *Cache) Move(pathComps []string, newPathComps []string) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	// The destination must not already exist in the active cache...
	_, ok := c.activeCache.WalkEnd(newPathComps)
	if ok {
		return fuse.ErrExists
	}
	newMetaPath := c.GetCacheMetaPath(newPathComps...)
	newDataPath := c.GetCacheDataPath(newPathComps...)
	// ...nor on the local disk.
	_, err := os.Stat(newDataPath)
	if err == nil {
		return fuse.ErrExists
	}
	if !os.IsNotExist(err) {
		return err
	}
	oldMetaPath := c.GetCacheMetaPath(pathComps...)
	oldDataPath := c.GetCacheDataPath(pathComps...)
	// Make sure the source exists before touching anything.
	_, err = os.Stat(oldDataPath)
	if err != nil {
		if os.IsNotExist(err) {
			return fuse.ErrNotExists
		}
		return err
	}
	// Creating the parent directories handles the case where the moved file
	// is not present locally. The downside: if the destination's parent
	// directory genuinely does not exist, it gets created unexpectedly here.
	newMetaDir := filepath.Dir(newMetaPath)
	err = os.MkdirAll(newMetaDir, 0755)
	if err != nil {
		return err
	}
	newDataDir := filepath.Dir(newDataPath)
	err = os.MkdirAll(newDataDir, 0755)
	if err != nil {
		return err
	}
	// Each cache file holds open handles to its meta and data files, so
	// renaming the files does not invalidate those handles. Errors here can
	// only be ignored.
	os.Rename(oldMetaPath, newMetaPath)
	os.Rename(oldDataPath, newDataPath)
	// Update the active cache: graft the old trie node onto the new path and
	// notify every cache file in the subtree of its new location.
	oldNode, ok := c.activeCache.WalkEnd(pathComps)
	if ok {
		newNode := c.activeCache.CreateWords(newPathComps)
		newNode.Value = oldNode.Value
		newNode.WordNexts = oldNode.WordNexts
		oldNode.RemoveSelf(false)
		if newNode.Value != nil {
			newNode.Value.Move(newPathComps)
		}
		newNode.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
			if node.Value != nil {
				node.Value.Move(lo2.AppendNew(newPathComps, path...))
			}
			return trie.VisitContinue
		})
	}
	logger.Debugf("cache moved: %v -> %v", pathComps, newPathComps)
	return nil
}
// syncPackage groups the objects of one package that are synchronized to the
// remote in a single round.
type syncPackage struct {
	bktName string
	pkgName string
	// pkg is filled in once the package has been created or queried.
	pkg    clitypes.Package
	upObjs []*uploadingObject
}

// uploadingObject tracks the state of a single cache file while it is being
// uploaded or having its metadata updated.
type uploadingObject struct {
	// pathComps is a snapshot of the file's path at scheduling time; the
	// file may move or be deleted while the upload is in flight.
	pathComps []string
	cache     *CacheFile
	// reader is non-nil when the file's data (not just metadata) changed and
	// must be uploaded.
	reader       *CacheFileHandle
	modTime      time.Time
	metaRevision int
	// isDeleted is set when the file is deleted while uploading.
	isDeleted bool
	isSuccess bool
}

// packageFullName identifies a package by bucket name and package name.
type packageFullName struct {
	bktName string
	pkgName string
}
// scanningCache periodically sweeps the active cache trie, scheduling
// modified idle files for upload and downgrading/evicting unused entries
// (see visitNode). It runs until Stop closes cacheDone.
func (c *Cache) scanningCache() {
	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()
	// Resume position of the previous sweep, so large tries are scanned
	// incrementally across ticks.
	lastScanPath := []string{}
	for {
		select {
		case _, ok := <-c.cacheDone:
			if !ok {
				return
			}
		case <-ticker.C:
		}
		c.lock.Lock()
		// Files to upload in this round, grouped by bucket/package.
		uploadingPkgs := make(map[packageFullName]*syncPackage)
		visitCnt := 0
		visitBreak := false
		node, _ := c.activeCache.WalkEnd(lastScanPath)
		node.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
			ch := node.Value
			if ch == nil {
				return trie.VisitContinue
			}
			info := ch.Info()
			// Entries still referenced by open handles are left alone.
			if info.RefCount > 0 {
				logger.Debugf("skip cache %v, refCount: %v", path, info.RefCount)
				return trie.VisitContinue
			}
			visitCnt++
			c.visitNode(path, node, ch, info, uploadingPkgs)
			// Visit at most 500 nodes per tick, so the lock is not held for
			// too long; remember where to resume.
			if visitCnt > 500 {
				lastScanPath = lo2.ArrayClone(path)
				visitBreak = true
				return trie.VisitBreak
			}
			return trie.VisitContinue
		})
		if !visitBreak {
			lastScanPath = []string{}
		}
		c.lock.Unlock()
		// Do the actual synchronization outside the lock.
		if len(uploadingPkgs) > 0 {
			go c.doSync(lo.Values(uploadingPkgs))
		}
	}
}
// visitNode decides the fate of one idle (RefCount == 0) cache file during a
// scan: schedule it for upload, downgrade its cache level, or delete it.
// Files scheduled for upload are recorded in uploadingPkgs, keyed by
// bucket/package name. Must be called with c.lock held.
func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheFile, info CacheEntryInfo, uploadingPkgs map[packageFullName]*syncPackage) {
	shouldUpload := true
	// Files not stored inside a Package (shallower than bucket/package/...)
	// are never uploaded.
	if len(ch.pathComps) <= 2 {
		shouldUpload = false
	}
	if !c.syncFilter.ShouldSync(ch.pathComps, info.Size) {
		shouldUpload = false
	}
	// 1. The local cache was modified: upload it once it has been unused for
	// the configured pending time.
	if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) {
		if time.Since(info.FreeTime) < c.cfg.UploadPendingTime {
			return
		}
		// Already being uploaded by a previous round.
		if ch.state.uploading != nil {
			return
		}
		fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]}
		pkg, ok := uploadingPkgs[fullName]
		if !ok {
			pkg = &syncPackage{
				bktName: ch.pathComps[0],
				pkgName: ch.pathComps[1],
			}
			uploadingPkgs[fullName] = pkg
		}
		obj := &uploadingObject{
			pathComps: lo2.ArrayClone(ch.pathComps),
			cache:     ch,
		}
		pkg.upObjs = append(pkg.upObjs, obj)
		ch.state.uploading = obj
		// Only open a data reader when the file content itself changed.
		if info.DataRevision > 0 {
			obj.reader = ch.OpenReadWhenScanning()
		}
		if info.MetaRevision > 0 {
			obj.modTime = info.ModTime
			obj.metaRevision = info.MetaRevision
		}
		return
	}
	// 2. The local cache is unmodified: unload it once it has been unused
	// for the configured active time.
	if info.Level > LevelReadOnly {
		if time.Since(info.FreeTime) > c.cfg.CacheActiveTime {
			ch.LevelDown(LevelReadOnly)
		}
		return
	}
	// 3. Unloaded caches are deleted after a further idle period.
	if info.Level <= LevelReadOnly {
		// Both the time since last use and the time since the last level
		// change must exceed the configured expiry before deletion.
		if time.Since(info.FreeTime) > c.cfg.CacheExpireTime && time.Since(info.ChangeLevelTime) > c.cfg.CacheExpireTime {
			// Only remove the local files when everything has been synced to
			// the remote.
			if info.MetaRevision == 0 && info.DataRevision == 0 {
				ch.Delete()
			}
			node.RemoveSelf(true)
		}
		return
	}
}
// scanningData periodically walks the on-disk data directory looking for
// data files that have no corresponding meta file, and registers each one as
// a cache file via LoadFile. The walk is incremental: its position survives
// in walkTrace/walkTraceComps across ticks, so each round does a bounded
// amount of work. Runs until cacheDone is closed.
func (c *Cache) scanningData() {
	ticker := time.NewTicker(c.cfg.ScanDataDirInterval)
	defer ticker.Stop()
	// Stack of open directory handles for the in-progress walk, plus the
	// matching path components. walkTraceComps[0] is the META directory, so
	// joining the components yields the meta path of the visited data file.
	var walkTrace []*os.File
	var walkTraceComps []string
	for {
		select {
		case <-ticker.C:
		case <-c.cacheDone:
			return
		}
		logger.Infof("begin scanning data dir")
		// Start a fresh walk if the previous one has finished.
		if len(walkTrace) == 0 {
			dir, err := os.Open(c.cfg.DataDir)
			if err != nil {
				logger.Warnf("open data dir: %v", err)
				continue
			}
			walkTrace = []*os.File{dir}
			walkTraceComps = []string{c.cfg.MetaDir}
		}
		const maxVisitCnt = 5000
		const maxUntrackedFiles = 500
		var untrackedFiles [][]string
		visitCnt := 0
		// Visit at most 5000 entries (directories on the path included) and
		// collect at most 500 untracked files per round.
		for len(walkTrace) > 0 && visitCnt < maxVisitCnt && len(untrackedFiles) < maxUntrackedFiles {
			lastNode := walkTrace[len(walkTrace)-1]
			visitCnt++
			e, err := lastNode.Readdir(1)
			if err == io.EOF {
				// Directory exhausted; pop it from the stack.
				lastNode.Close()
				walkTrace = walkTrace[:len(walkTrace)-1]
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}
			if err != nil {
				logger.Warnf("read dir %v: %v", lastNode.Name(), err)
				lastNode.Close()
				walkTrace = walkTrace[:len(walkTrace)-1]
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}
			if e[0].IsDir() {
				// Descend into the subdirectory.
				child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name()))
				if err != nil {
					logger.Warnf("open dir %v: %v", e[0].Name(), err)
					continue
				}
				walkTrace = append(walkTrace, child)
				walkTraceComps = append(walkTraceComps, e[0].Name())
				continue
			}
			// Files not at Package depth are not tracked.
			if len(walkTrace) <= 2 {
				continue
			}
			walkTraceComps = append(walkTraceComps, e[0].Name())
			if !c.syncFilter.ShouldSync(walkTraceComps, e[0].Size()) {
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}
			// A data file whose meta file is missing is untracked.
			fileMetaPath := filepath.Join(walkTraceComps...)
			_, err = os.Stat(fileMetaPath)
			if err == nil || !os.IsNotExist(err) {
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}
			// Drop the meta-dir component: LoadFile expects virtual path comps.
			untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:]))
			walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
		}
		if len(untrackedFiles) > 0 {
			for _, comps := range untrackedFiles {
				// Register each untracked file; Release drops the temporary
				// reference LoadFile took for us.
				ch := c.LoadFile(comps, nil)
				if ch != nil {
					ch.Release()
				}
			}
		}
		logger.Infof("%v file visited, %v untracked files found", visitCnt, len(untrackedFiles))
	}
}
  589. func (c *Cache) doSync(pkgs []*syncPackage) {
  590. var uploadPkgs []*syncPackage
  591. var updateOnlyPkgs []*syncPackage
  592. for _, p := range pkgs {
  593. var updateOnly *syncPackage
  594. var upload *syncPackage
  595. for _, o := range p.upObjs {
  596. if o.reader != nil {
  597. if upload == nil {
  598. upload = &syncPackage{
  599. bktName: p.bktName,
  600. pkgName: p.pkgName,
  601. }
  602. }
  603. upload.upObjs = append(upload.upObjs, o)
  604. } else {
  605. if updateOnly == nil {
  606. updateOnly = &syncPackage{
  607. bktName: p.bktName,
  608. pkgName: p.pkgName,
  609. }
  610. }
  611. updateOnly.upObjs = append(updateOnly.upObjs, o)
  612. }
  613. }
  614. if upload != nil {
  615. uploadPkgs = append(uploadPkgs, upload)
  616. }
  617. if updateOnly != nil {
  618. updateOnlyPkgs = append(updateOnlyPkgs, updateOnly)
  619. }
  620. }
  621. // 先上传文件,再更新文件元数据。上传文件时会创建Package,这样后续更新元数据时就能查到Package。
  622. if len(uploadPkgs) > 0 {
  623. c.doUploading(uploadPkgs)
  624. }
  625. if len(updateOnlyPkgs) > 0 {
  626. c.doUpdatingOnly(updateOnlyPkgs)
  627. }
  628. }
// doUpdatingOnly handles objects whose metadata changed but whose data did
// not: it queries each package and batch-updates the objects' update times.
// The uploading state of every involved cache file is cleared under the lock
// regardless of success or failure.
func (c *Cache) doUpdatingOnly(pkgs []*syncPackage) {
	/// 1. Metadata-only updates just query the existing Package.
	var sucPkgs []*syncPackage
	var failedPkgs []*syncPackage
	for _, pkg := range pkgs {
		p, err := c.db.Package().GetByFullName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
		if err != nil {
			logger.Warnf("get package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
			failedPkgs = append(failedPkgs, pkg)
			continue
		}
		pkg.pkg = p
		sucPkgs = append(sucPkgs, pkg)
	}
	/// 2. For packages that could not be queried, cancel the uploading state
	/// under the protection of the lock.
	c.lock.Lock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.cache.state.uploading = nil
		}
	}
	c.lock.Unlock()
	/// 3. Update each package.
	for _, p := range sucPkgs {
		pathes := make([]string, 0, len(p.upObjs))
		modTimes := make([]time.Time, 0, len(p.upObjs))
		for _, obj := range p.upObjs {
			// Components [0:2] are bucket/package; the rest is the object path.
			pathes = append(pathes, clitypes.JoinObjectPath(obj.pathComps[2:]...))
			modTimes = append(modTimes, obj.modTime)
		}
		err := c.db.Object().BatchUpdateUpdateTimeByPath(c.db.DefCtx(), p.pkg.PackageID, pathes, modTimes)
		if err != nil {
			logger.Warnf("batch update package %v/%v: %v", p.bktName, p.pkgName, err)
			c.lock.Lock()
			for _, obj := range p.upObjs {
				obj.cache.state.uploading = nil
			}
			c.lock.Unlock()
			continue
		}
		logger.Infof("update %v object in package %v/%v", len(p.upObjs), p.bktName, p.pkgName)
		// Record the successfully uploaded metadata revision.
		c.lock.Lock()
		for _, obj := range p.upObjs {
			obj.cache.state.uploading = nil
			obj.cache.RevisionUploaded(0, obj.metaRevision)
		}
		c.lock.Unlock()
	}
}
// doUploading uploads the data of modified cache files, package by package:
// it creates each Package if needed, streams every object's data through the
// uploader, reconciles files that were deleted or moved during the upload
// (cancelling or renaming their uploaded objects), commits, and finally
// closes all readers. Uploading state is always cleared under the lock.
func (c *Cache) doUploading(pkgs []*syncPackage) {
	/// 1. Try to create each Package first.
	var sucPkgs []*syncPackage
	var failedPkgs []*syncPackage
	for _, pkg := range pkgs {
		p, err := db.DoTx21(c.db, c.db.Package().TryCreateAll, pkg.bktName, pkg.pkgName)
		if err != nil {
			logger.Warnf("try create package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
			failedPkgs = append(failedPkgs, pkg)
			continue
		}
		pkg.pkg = p
		sucPkgs = append(sucPkgs, pkg)
	}
	/// 2. For packages that could not be created, just close the files and
	/// skip the upload.
	// Cancel the uploading state under the protection of the lock.
	c.lock.Lock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.cache.state.uploading = nil
		}
	}
	c.lock.Unlock()
	for _, pkg := range failedPkgs {
		for _, obj := range pkg.upObjs {
			obj.reader.Close()
		}
	}
	/// 3. Upload each package.
	for _, p := range sucPkgs {
		upder, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil)
		if err != nil {
			logger.Warnf("begin upload package %v/%v: %v", p.bktName, p.pkgName, err)
			// Cancel the uploading state.
			c.lock.Lock()
			for _, obj := range p.upObjs {
				obj.cache.state.uploading = nil
			}
			c.lock.Unlock()
			for _, obj := range p.upObjs {
				obj.reader.Close()
			}
			continue
		}
		upSuc := 0
		upSucAmt := int64(0)
		upFailed := 0
		upStartTime := time.Now()
		logger.Infof("begin uploading %v objects to package %v/%v", len(p.upObjs), p.bktName, p.pkgName)
		for _, o := range p.upObjs {
			rd := cacheFileReader{
				rw: o.reader,
			}
			// Count the uploaded bytes for the statistics logged below.
			counter := io2.Counter(&rd)
			err = upder.Upload(clitypes.JoinObjectPath(o.pathComps[2:]...), counter, uploader.UploadOption{
				CreateTime: o.modTime,
			})
			if err != nil {
				logger.Warnf("upload object %v: %v", o.pathComps, err)
				upFailed++
				continue
			}
			o.isSuccess = true
			upSuc++
			upSucAmt += counter.Count()
		}
		// Record the upload results under the protection of the lock.
		c.lock.Lock()
		upCancel := 0
		upRename := 0
		// Check whether any file changed while it was being uploaded.
		var sucObjs []*uploadingObject
		for _, o := range p.upObjs {
			o.cache.state.uploading = nil
			if !o.isSuccess {
				continue
			}
			oldPath := clitypes.JoinObjectPath(o.pathComps[2:]...)
			newPath := clitypes.JoinObjectPath(o.cache.pathComps[2:]...)
			if o.isDeleted {
				upder.CancelObject(oldPath)
				upCancel++
				continue
			}
			// An object moved to another Package must also be cancelled.
			if !lo2.ArrayEquals(o.pathComps[:2], o.cache.pathComps[:2]) {
				upder.CancelObject(oldPath)
				upCancel++
				continue
			}
			// Only objects that moved within the same Package can simply be
			// renamed.
			if newPath != oldPath {
				upder.RenameObject(oldPath, newPath)
				upRename++
			}
			sucObjs = append(sucObjs, o)
		}
		_, err = upder.Commit()
		if err != nil {
			logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err)
		} else {
			for _, obj := range sucObjs {
				obj.cache.RevisionUploaded(obj.reader.revision, obj.metaRevision)
			}
			upTime := time.Since(upStartTime)
			logger.Infof("upload package %v/%v in %v, upload: %v, size: %v, speed: %v/s, cancel: %v, rename: %v",
				p.bktName, p.pkgName, upTime, upSuc, upSucAmt, bytesize.New(float64(upSucAmt)/upTime.Seconds()), upCancel, upRename)
		}
		c.lock.Unlock()
		// Closing a file affects its refCount, so whether the upload failed
		// or succeeded, the next phase only happens after this delay point.
		for _, obj := range p.upObjs {
			obj.reader.Close()
		}
	}
}
// cacheFileReader adapts a CacheFileHandle (which exposes ReadAt) to the
// io.Reader interface by tracking the current read offset.
type cacheFileReader struct {
	rw  *CacheFileHandle
	pos int64 // next read offset
}
  798. func (r *cacheFileReader) Read(p []byte) (int, error) {
  799. n, err := r.rw.ReadAt(p, r.pos)
  800. r.pos += int64(n)
  801. if err != nil {
  802. return n, err
  803. }
  804. if n != len(p) {
  805. return n, io.EOF
  806. }
  807. return n, nil
  808. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。