You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

file.go 21 kB

8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916
  1. package cache
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "time"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/utils/io2"
  11. "gitlink.org.cn/cloudream/common/utils/lo2"
  12. "gitlink.org.cn/cloudream/common/utils/math2"
  13. "gitlink.org.cn/cloudream/common/utils/serder"
  14. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/fuse"
  15. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  16. )
  17. type CacheLevel int
  18. const (
  19. // 未加载
  20. LevelNotLoaded CacheLevel = iota
  21. // 缓存数据都完整,但仅加载了一次元数据,只能读取,不能修改。
  22. LevelReadOnly
  23. // 缓存数据都完整存在,但仅加载了元数据,没有加载文件数据
  24. LevelMetaLoaded
  25. // 缓存数据都完整存在,且已加载到内存中
  26. LevelComplete
  27. )
  28. func (l CacheLevel) String() string {
  29. var levels = []string{"NotLoaded", "ReadOnly", "MetaLoaded", "Complete"}
  30. return levels[l]
  31. }
  32. type FileInfo struct {
  33. // 文件总大小。可能会超过对应的远端文件的大小。
  34. // 此大小可能与本地缓存文件大小也不同,需要定时将本地缓存文件大小修正到与这个值相同。
  35. Size int64
  36. // 文件数据的版本号。如果大于0,则代表有未提交的修改
  37. DataRevision int
  38. // 文件元数据的版本号
  39. MetaRevision int
  40. // 数据段列表,按照段开始位置从小到大排列
  41. Segments []*Range
  42. // 文件对应的对象ID,仅在文件是一个缓存文件时才有值
  43. // ObjectID cdssdk.ObjectID
  44. // 文件对应的对象大小,仅在文件是一个缓存文件时才有值。
  45. // 此值代表有多少数据应该从远端加载,所以可能会小于远端实际大小
  46. ObjectSize int64
  47. // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过
  48. // Hash cdssdk.FileHash
  49. // 文件的最后修改时间
  50. ModTime time.Time
  51. // 文件的权限
  52. Perm os.FileMode
  53. }
  54. func (f *FileInfo) Clone() FileInfo {
  55. n := *f
  56. n.Segments = make([]*Range, len(f.Segments))
  57. for i, seg := range f.Segments {
  58. n.Segments[i] = &Range{
  59. Position: seg.Position,
  60. Length: seg.Length,
  61. }
  62. }
  63. return n
  64. }
  65. type Range struct {
  66. Position int64
  67. Length int64
  68. }
  69. func (r *Range) GetPosition() int64 {
  70. return r.Position
  71. }
  72. func (r *Range) SetPosition(pos int64) {
  73. r.Position = pos
  74. }
  75. func (r *Range) GetLength() int64 {
  76. return r.Length
  77. }
  78. func (r *Range) SetLength(length int64) {
  79. r.Length = length
  80. }
  81. func (r *Range) End() int64 {
  82. return r.Position + r.Length
  83. }
  84. // 所有读写过程共用同一个CacheFile对象。
  85. // 不应该将此结构体保存到对象中
  86. type CacheFile struct {
  87. cache *Cache
  88. pathComps []string
  89. info FileInfo
  90. remoteObj *clitypes.Object
  91. rwLock *sync.RWMutex
  92. readers []*CacheFileHandle
  93. writers []*CacheFileHandle
  94. saveMetaChan chan any
  95. saveMetaLock *sync.Mutex
  96. stopSaveMeta *bool
  97. isDeleted bool
  98. level CacheLevel
  99. refCount int
  100. freeTime time.Time
  101. changeLevelTime time.Time
  102. metaFile *os.File
  103. dataFile *os.File
  104. writeLock *sync.RWMutex
  105. // 缓存文件的状态,用于管理缓存文件的生命周期。不受rwLock保护,而是由Cache管理
  106. state cacheState
  107. }
  108. type cacheState struct {
  109. uploading *uploadingObject
  110. }
  111. func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
  112. metaPath := cache.GetCacheMetaPath(pathComps...)
  113. dataPath := cache.GetCacheDataPath(pathComps...)
  114. info := FileInfo{
  115. DataRevision: 1,
  116. ModTime: time.Now(),
  117. Perm: 0755,
  118. }
  119. infoData, err := serder.ObjectToJSON(info)
  120. if err != nil {
  121. return nil, err
  122. }
  123. err = os.MkdirAll(filepath.Dir(metaPath), 0755)
  124. if err != nil {
  125. return nil, err
  126. }
  127. metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  128. if err != nil {
  129. return nil, fmt.Errorf("create cache meta file: %w", err)
  130. }
  131. err = io2.WriteAll(metaFile, infoData)
  132. if err != nil {
  133. metaFile.Close()
  134. return nil, fmt.Errorf("save cache meta file: %w", err)
  135. }
  136. dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  137. if err != nil {
  138. metaFile.Close()
  139. return nil, fmt.Errorf("create cache data file: %w", err)
  140. }
  141. ch := &CacheFile{
  142. cache: cache,
  143. pathComps: pathComps,
  144. info: info,
  145. rwLock: &sync.RWMutex{},
  146. saveMetaChan: make(chan any, 1),
  147. saveMetaLock: &sync.Mutex{},
  148. stopSaveMeta: new(bool),
  149. level: LevelComplete,
  150. metaFile: metaFile,
  151. dataFile: dataFile,
  152. writeLock: &sync.RWMutex{},
  153. state: cacheState{},
  154. }
  155. go ch.serving(ch.saveMetaChan, ch.stopSaveMeta)
  156. return ch, nil
  157. }
  158. func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
  159. metaPath := cache.GetCacheMetaPath(pathComps...)
  160. dataPath := cache.GetCacheDataPath(pathComps...)
  161. dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
  162. if err != nil {
  163. // 不要包装这里的err
  164. return nil, err
  165. }
  166. info := &FileInfo{}
  167. metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
  168. if err != nil {
  169. // 如果有数据文件,而没有元数据文件,则创建一个元数据文件
  170. if !os.IsNotExist(err) {
  171. dataFile.Close()
  172. return nil, err
  173. }
  174. stat, err := dataFile.Stat()
  175. if err != nil {
  176. dataFile.Close()
  177. return nil, err
  178. }
  179. err = os.MkdirAll(filepath.Dir(metaPath), 0755)
  180. if err != nil {
  181. dataFile.Close()
  182. return nil, err
  183. }
  184. metaFile, err = os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  185. if err != nil {
  186. dataFile.Close()
  187. return nil, err
  188. }
  189. info.Size = stat.Size()
  190. info.ModTime = stat.ModTime()
  191. info.Perm = stat.Mode().Perm()
  192. info.Segments = []*Range{{Position: 0, Length: info.Size}}
  193. info.MetaRevision = 1 // 未同步的文件视为已修改
  194. info.DataRevision = 1
  195. } else {
  196. err = serder.JSONToObjectStream(metaFile, info)
  197. if err != nil {
  198. dataFile.Close()
  199. return nil, err
  200. }
  201. }
  202. ch := &CacheFile{
  203. cache: cache,
  204. pathComps: pathComps,
  205. info: *info,
  206. rwLock: &sync.RWMutex{},
  207. saveMetaChan: make(chan any, 1),
  208. saveMetaLock: &sync.Mutex{},
  209. stopSaveMeta: new(bool),
  210. level: LevelComplete,
  211. metaFile: metaFile,
  212. dataFile: dataFile,
  213. writeLock: &sync.RWMutex{},
  214. state: cacheState{},
  215. }
  216. go ch.serving(ch.saveMetaChan, ch.stopSaveMeta)
  217. return ch, nil
  218. }
  219. func newCacheFileFromObject(cache *Cache, pathComps []string, obj *clitypes.Object) (*CacheFile, error) {
  220. metaPath := cache.GetCacheMetaPath(pathComps...)
  221. dataPath := cache.GetCacheDataPath(pathComps...)
  222. info := FileInfo{
  223. Size: obj.Size,
  224. ObjectSize: obj.Size,
  225. ModTime: obj.UpdateTime,
  226. Perm: 0755,
  227. }
  228. infoData, err := serder.ObjectToJSON(info)
  229. if err != nil {
  230. return nil, err
  231. }
  232. err = os.MkdirAll(filepath.Dir(metaPath), 0755)
  233. if err != nil {
  234. return nil, err
  235. }
  236. metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  237. if err != nil {
  238. return nil, fmt.Errorf("create cache meta file: %w", err)
  239. }
  240. err = io2.WriteAll(metaFile, infoData)
  241. if err != nil {
  242. metaFile.Close()
  243. return nil, fmt.Errorf("save cache meta file: %w", err)
  244. }
  245. dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  246. if err != nil {
  247. metaFile.Close()
  248. return nil, fmt.Errorf("create cache file: %w", err)
  249. }
  250. ch := &CacheFile{
  251. cache: cache,
  252. pathComps: pathComps,
  253. info: info,
  254. remoteObj: obj,
  255. rwLock: &sync.RWMutex{},
  256. saveMetaChan: make(chan any, 1),
  257. saveMetaLock: &sync.Mutex{},
  258. stopSaveMeta: new(bool),
  259. level: LevelComplete,
  260. metaFile: metaFile,
  261. dataFile: dataFile,
  262. writeLock: &sync.RWMutex{},
  263. state: cacheState{},
  264. }
  265. go ch.serving(ch.saveMetaChan, ch.stopSaveMeta)
  266. return ch, nil
  267. }
  268. func loadCacheFileInfo(cache *Cache, pathComps []string, dataFileInfo os.FileInfo) (*CacheEntryInfo, error) {
  269. metaPath := cache.GetCacheMetaPath(pathComps...)
  270. metaData, err := os.ReadFile(metaPath)
  271. if err == nil {
  272. info := &FileInfo{}
  273. err = serder.JSONToObject(metaData, info)
  274. if err != nil {
  275. return nil, err
  276. }
  277. return &CacheEntryInfo{
  278. PathComps: pathComps,
  279. Size: info.Size,
  280. Perm: info.Perm,
  281. ModTime: info.ModTime,
  282. IsDir: false,
  283. }, nil
  284. }
  285. if !os.IsNotExist(err) {
  286. return nil, err
  287. }
  288. return &CacheEntryInfo{
  289. PathComps: pathComps,
  290. Size: dataFileInfo.Size(),
  291. Perm: dataFileInfo.Mode(),
  292. ModTime: dataFileInfo.ModTime(),
  293. IsDir: false,
  294. }, nil
  295. }
  296. // 增加一个引用计数。不应该被Cache之外的代码调用。
  297. func (f *CacheFile) IncRef() {
  298. f.rwLock.Lock()
  299. defer f.rwLock.Unlock()
  300. f.refCount++
  301. }
  302. // 减少一个引用计数
  303. func (f *CacheFile) Release() {
  304. f.rwLock.Lock()
  305. defer f.rwLock.Unlock()
  306. f.refCount--
  307. if f.refCount == 0 {
  308. f.freeTime = time.Now()
  309. }
  310. }
  311. func (f *CacheFile) LevelDown(level CacheLevel) bool {
  312. if level <= LevelReadOnly {
  313. // 如果降级到不需要保存元数据的级别,就要先暂停保存元数据的操作
  314. f.saveMetaLock.Lock()
  315. defer f.saveMetaLock.Unlock()
  316. }
  317. f.rwLock.Lock()
  318. defer f.rwLock.Unlock()
  319. if level >= f.level {
  320. return true
  321. }
  322. // 缓存正在被使用时,不能降级
  323. if f.refCount > 0 {
  324. return false
  325. }
  326. switch f.level {
  327. case LevelComplete:
  328. f.dataFile.Close()
  329. f.level = LevelMetaLoaded
  330. if level >= f.level {
  331. break
  332. }
  333. fallthrough
  334. case LevelMetaLoaded:
  335. if !f.isDeleted {
  336. // TODO 日志
  337. f.saveMeta(f.info)
  338. }
  339. f.saveMetaChan = nil
  340. // 由于已经获取了saveMetaLock,所以这里设置true之后调用metaFile.Close不会导致saveMeta线程的保存失败
  341. // 因为saveMeta线程会先检查stopSaveMete的值
  342. *f.stopSaveMeta = true
  343. f.stopSaveMeta = nil
  344. f.metaFile.Close()
  345. f.level = LevelReadOnly
  346. if level >= f.level {
  347. break
  348. }
  349. fallthrough
  350. case LevelReadOnly:
  351. f.level = LevelNotLoaded
  352. if level >= f.level {
  353. break
  354. }
  355. fallthrough
  356. case LevelNotLoaded:
  357. }
  358. f.changeLevelTime = time.Now()
  359. return true
  360. }
  361. func (f *CacheFile) LevelUp(level CacheLevel) bool {
  362. f.rwLock.Lock()
  363. defer f.rwLock.Unlock()
  364. if level <= f.level {
  365. return true
  366. }
  367. // 缓存正在使用时,可以升级
  368. switch f.level {
  369. case LevelNotLoaded:
  370. f.level = LevelReadOnly
  371. if level <= f.level {
  372. break
  373. }
  374. fallthrough
  375. case LevelReadOnly:
  376. metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
  377. metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
  378. if err != nil {
  379. logger.Warnf("open meta file %v: %v", metaPath, err)
  380. return false
  381. }
  382. f.saveMetaChan = make(chan any, 1)
  383. f.stopSaveMeta = new(bool)
  384. f.metaFile = metaFile
  385. f.level = LevelMetaLoaded
  386. go f.serving(f.saveMetaChan, f.stopSaveMeta)
  387. if level <= f.level {
  388. break
  389. }
  390. fallthrough
  391. case LevelMetaLoaded:
  392. dataPath := f.cache.GetCacheDataPath(f.pathComps...)
  393. dataFile, err := os.OpenFile(dataPath, os.O_RDWR|os.O_CREATE, 0644)
  394. if err != nil {
  395. logger.Warnf("open data file %v: %v", dataPath, err)
  396. return false
  397. }
  398. f.dataFile = dataFile
  399. f.level = LevelComplete
  400. if level <= f.level {
  401. break
  402. }
  403. fallthrough
  404. case LevelComplete:
  405. }
  406. f.changeLevelTime = time.Now()
  407. return true
  408. }
  409. func (f *CacheFile) RevisionUploaded(dataRev int, metaRev int) {
  410. f.rwLock.Lock()
  411. defer f.rwLock.Unlock()
  412. if dataRev != 0 && f.info.DataRevision == dataRev {
  413. f.info.DataRevision = 0
  414. }
  415. if metaRev != 0 && f.info.MetaRevision == metaRev {
  416. f.info.MetaRevision = 0
  417. }
  418. f.letSave()
  419. }
  420. func (f *CacheFile) Info() CacheEntryInfo {
  421. f.rwLock.RLock()
  422. defer f.rwLock.RUnlock()
  423. return CacheEntryInfo{
  424. PathComps: f.pathComps,
  425. Size: f.info.Size,
  426. Perm: f.info.Perm,
  427. ModTime: f.info.ModTime,
  428. IsDir: false,
  429. MetaRevision: f.info.MetaRevision,
  430. DataRevision: f.info.DataRevision,
  431. RefCount: f.refCount,
  432. FreeTime: f.freeTime,
  433. Level: f.level,
  434. ChangeLevelTime: f.changeLevelTime,
  435. }
  436. }
  437. func (f *CacheFile) Delete() {
  438. f.writeLock.Lock()
  439. defer f.writeLock.Unlock()
  440. f.rwLock.Lock()
  441. defer f.rwLock.Unlock()
  442. metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
  443. dataPath := f.cache.GetCacheDataPath(f.pathComps...)
  444. os.Remove(metaPath)
  445. os.Remove(dataPath)
  446. // 不可能将isDeleted从true改为false,所以这里不需要使用stopSaveChan来等待saveMeta线程退出
  447. f.isDeleted = true
  448. if f.saveMetaChan != nil {
  449. f.letSave()
  450. }
  451. }
  452. func (f *CacheFile) Move(newPathComps []string) {
  453. f.writeLock.Lock()
  454. defer f.writeLock.Unlock()
  455. f.rwLock.Lock()
  456. defer f.rwLock.Unlock()
  457. f.pathComps = newPathComps
  458. if f.saveMetaChan != nil {
  459. f.letSave()
  460. }
  461. }
  462. // 打开一个写入句柄,同时支持读取
  463. func (f *CacheFile) Open(flags uint32) *CacheFileHandle {
  464. logger.Tracef("CacheFile.Open: %v, %#x", f.pathComps, flags)
  465. f.rwLock.Lock()
  466. defer f.rwLock.Unlock()
  467. f.refCount++
  468. h := &CacheFileHandle{
  469. file: f,
  470. remoteLock: &sync.Mutex{},
  471. revision: f.info.DataRevision,
  472. }
  473. if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) {
  474. h.readable = true
  475. h.writeable = true
  476. } else if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) {
  477. h.writeable = true
  478. } else if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) {
  479. h.readable = true
  480. }
  481. if f.remoteObj != nil {
  482. h.remote = newRemoteLoader(f)
  483. }
  484. if h.writeable {
  485. f.writers = append(f.writers, h)
  486. } else {
  487. f.readers = append(f.readers, h)
  488. }
  489. return h
  490. }
  491. // 打开一个读取句柄,用于同步本地文件到远端
  492. func (f *CacheFile) OpenReadWhenScanning() *CacheFileHandle {
  493. f.rwLock.Lock()
  494. defer f.rwLock.Unlock()
  495. f.refCount++
  496. h := &CacheFileHandle{
  497. file: f,
  498. remoteLock: &sync.Mutex{},
  499. revision: f.info.DataRevision,
  500. readable: true,
  501. }
  502. if f.remoteObj != nil {
  503. h.remote = newRemoteLoader(f)
  504. }
  505. f.readers = append(f.readers, h)
  506. return h
  507. }
  508. func (f *CacheFile) SetModTime(modTime time.Time) error {
  509. logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime)
  510. f.rwLock.Lock()
  511. f.info.ModTime = modTime
  512. f.info.MetaRevision++
  513. f.rwLock.Unlock()
  514. f.letSave()
  515. return nil
  516. }
  517. func (f *CacheFile) Truncate(size int64) error {
  518. logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size)
  519. // 修改文件大小前不允许写入
  520. f.writeLock.Lock()
  521. defer f.writeLock.Unlock()
  522. err := f.dataFile.Truncate(size)
  523. if err != nil {
  524. return err
  525. }
  526. f.rwLock.Lock()
  527. defer f.rwLock.Unlock()
  528. // 调整能从远端下载的大小
  529. f.info.ObjectSize = math2.Min(f.info.ObjectSize, size)
  530. // 调整本地缓存文件里的有效数据大小
  531. if size < f.info.Size {
  532. f.info.Segments = TruncateRange(f.info.Segments, size)
  533. } else if size > f.info.Size {
  534. f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
  535. }
  536. if f.info.Size != size {
  537. f.info.DataRevision++
  538. }
  539. f.info.Size = size
  540. f.letSave()
  541. return nil
  542. }
  543. func (f *CacheFile) serving(saveMetaChan chan any, stopSaveMeta *bool) {
  544. ticker := time.NewTicker(time.Second * 30)
  545. defer ticker.Stop()
  546. for {
  547. select {
  548. case _, ok := <-saveMetaChan:
  549. if !ok {
  550. return
  551. }
  552. case <-ticker.C:
  553. }
  554. f.saveMetaLock.Lock()
  555. if *stopSaveMeta {
  556. f.saveMetaLock.Unlock()
  557. break
  558. }
  559. f.rwLock.RLock()
  560. info := f.info.Clone()
  561. // 如果文件已被删除,则不能再保存元数据,防止覆盖掉新创建的同名文件
  562. if f.isDeleted {
  563. f.rwLock.RUnlock()
  564. f.saveMetaLock.Unlock()
  565. break
  566. }
  567. f.rwLock.RUnlock()
  568. // TODO 错误日志
  569. f.saveMeta(info)
  570. f.metaFile.Sync()
  571. f.saveMetaLock.Unlock()
  572. }
  573. }
  574. func (f *CacheFile) saveMeta(info FileInfo) error {
  575. jsonData, err := serder.ObjectToJSON(info)
  576. if err != nil {
  577. return err
  578. }
  579. err = f.metaFile.Truncate(0)
  580. if err != nil {
  581. return err
  582. }
  583. _, err = f.metaFile.Seek(0, io.SeekStart)
  584. if err != nil {
  585. return err
  586. }
  587. err = io2.WriteAll(f.metaFile, jsonData)
  588. if err != nil {
  589. return err
  590. }
  591. return nil
  592. }
  593. func (f *CacheFile) letSave() {
  594. select {
  595. case f.saveMetaChan <- nil:
  596. default:
  597. }
  598. }
  599. type CacheFileHandle struct {
  600. file *CacheFile
  601. readable bool
  602. writeable bool
  603. remote *RemoteLoader
  604. remoteLock *sync.Mutex
  605. revision int // 打开文件时,文件的版本号
  606. }
  607. func (h *CacheFileHandle) ReadAt(buf []byte, off int64) (int, error) {
  608. log := logger.WithField("F", "CacheFileHandle.ReadAt").
  609. WithField("Path", h.file.pathComps)
  610. log.Tracef("buf: %v, off: %v", len(buf), off)
  611. if !h.readable {
  612. return 0, fuse.ErrPermission
  613. }
  614. // 读取数据必须读满整个buf,否则就会被认为是文件已经结束了
  615. totalReadLen := 0
  616. for totalReadLen < len(buf) {
  617. curBuf := buf[totalReadLen:]
  618. curOff := off + int64(totalReadLen)
  619. h.file.rwLock.RLock()
  620. if curOff >= h.file.info.Size {
  621. h.file.rwLock.RUnlock()
  622. break
  623. }
  624. /// 1. 先尝试从本地缓存文件里读取
  625. rngIdx := FirstContainsIndex(h.file.info.Segments, curOff)
  626. if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff {
  627. readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff)
  628. realReadLen, err := h.file.dataFile.ReadAt(curBuf[:readLen], curOff)
  629. totalReadLen += realReadLen
  630. h.file.rwLock.RUnlock()
  631. if err != nil {
  632. log.Tracef("read from local cache: %v", err)
  633. return totalReadLen, err
  634. }
  635. continue
  636. }
  637. // 否则从远端下载,计算一下要加载的长度
  638. loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff)
  639. if rngIdx+1 < len(h.file.info.Segments) {
  640. // 最多加载到下一个段的开头
  641. loadLen = math2.Min(loadLen, h.file.info.Segments[rngIdx+1].Position-curOff)
  642. }
  643. h.file.rwLock.RUnlock()
  644. /// 2. 开始从远端下载数据
  645. if h.remote == nil {
  646. log.Warnf("no remote file")
  647. return totalReadLen, fmt.Errorf("no remote file")
  648. }
  649. // 由于RemoteLoader的Load方法没有加锁,所以这里要加锁,防止并发Seek导致的问题
  650. // 可以考虑在RemoteLoader里加锁,这样可以实现跨Writer共用Loader
  651. h.remoteLock.Lock()
  652. realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff)
  653. totalReadLen += realLoadLen
  654. if err != nil {
  655. h.remoteLock.Unlock()
  656. return totalReadLen, err
  657. }
  658. h.remoteLock.Unlock()
  659. log.Tracef("load from remote: %v", realLoadLen)
  660. /// 3. 数据加载完毕,写入到本地文件
  661. // 在写入到本地之前,先停止其他的写入,防止冲突
  662. h.file.writeLock.Lock()
  663. // 停止其他写入后,就可以计算一下实际要写回的长度。
  664. h.file.rwLock.RLock()
  665. loadRng := &Range{Position: curOff, Length: int64(realLoadLen)}
  666. DifferentRange(loadRng, h.file.info.Segments)
  667. h.file.rwLock.RUnlock()
  668. if loadRng.Length == 0 {
  669. h.file.writeLock.Unlock()
  670. continue
  671. }
  672. // 写入到本地缓存文件
  673. writeStart := loadRng.Position - curOff
  674. _, err = h.file.dataFile.WriteAt(curBuf[writeStart:writeStart+loadRng.Length], curOff)
  675. if err != nil {
  676. h.file.writeLock.Unlock()
  677. log.Warnf("save to local file: %v", err)
  678. return totalReadLen, err
  679. }
  680. log.Tracef("save to local: %v", loadRng.Length)
  681. h.file.writeLock.Unlock()
  682. // 提交到段列表里
  683. h.file.rwLock.Lock()
  684. h.file.info.Segments = AddRange(h.file.info.Segments, loadRng)
  685. h.file.rwLock.Unlock()
  686. h.file.letSave()
  687. }
  688. return totalReadLen, nil
  689. }
  690. func (h *CacheFileHandle) WriteAt(buf []byte, off int64) (int, error) {
  691. log := logger.WithField("F", "CacheFileHandle.WriteAt").WithField("Path", h.file.pathComps)
  692. log.Tracef("buf: %v, off: %v", len(buf), off)
  693. if !h.writeable {
  694. return 0, fuse.ErrPermission
  695. }
  696. // 允许多线程并行写入,但在数据加载期间不能写入
  697. h.file.writeLock.RLock()
  698. defer h.file.writeLock.RUnlock()
  699. // 写入到本地缓存文件
  700. writeLen, err := h.file.dataFile.WriteAt(buf, off)
  701. if err != nil {
  702. log.Tracef("save to local file: %v", err)
  703. return writeLen, err
  704. }
  705. // 提交到段列表里
  706. h.file.rwLock.Lock()
  707. defer h.file.rwLock.Unlock()
  708. h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)})
  709. h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen))
  710. h.file.info.DataRevision++
  711. h.file.letSave()
  712. return writeLen, nil
  713. }
  714. func (f *CacheFileHandle) Sync() error {
  715. log := logger.WithField("Path", f.file.pathComps)
  716. err := f.file.dataFile.Sync()
  717. if err != nil {
  718. log.Tracef("sync local file: %v", err)
  719. return err
  720. }
  721. return nil
  722. }
  723. func (f *CacheFileHandle) Close() error {
  724. f.Sync()
  725. if f.remote != nil {
  726. f.remote.Close()
  727. }
  728. f.file.rwLock.Lock()
  729. defer f.file.rwLock.Unlock()
  730. f.file.refCount--
  731. if f.file.refCount == 0 {
  732. f.file.freeTime = time.Now()
  733. }
  734. if f.writeable {
  735. f.file.writers = lo2.Remove(f.file.writers, f)
  736. } else if f.readable {
  737. f.file.readers = lo2.Remove(f.file.readers, f)
  738. }
  739. return nil
  740. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。