You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

file.go 20 kB

8 months ago
8 months ago
8 months ago
8 months ago
8 months ago
8 months ago

  1. package cache
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "path/filepath"
  7. "sync"
  8. "time"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/utils/io2"
  11. "gitlink.org.cn/cloudream/common/utils/lo2"
  12. "gitlink.org.cn/cloudream/common/utils/math2"
  13. "gitlink.org.cn/cloudream/common/utils/serder"
  14. "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/fuse"
  15. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  16. )
  // CacheLevel describes how much of a cache file is currently loaded.
  // The values are consecutive integers starting at LevelNotLoaded, so levels
  // can be compared with the ordinary ordering operators.
  type CacheLevel int

  const (
  	// LevelNotLoaded means nothing is loaded.
  	LevelNotLoaded CacheLevel = iota
  	// LevelReadOnly means the cached data is complete but the metadata was
  	// only loaded once; the file can be read but not modified.
  	LevelReadOnly
  	// LevelMetaLoaded means the cached data is complete but only the
  	// metadata is loaded; the file data is not.
  	LevelMetaLoaded
  	// LevelComplete means the cached data is complete and loaded in memory.
  	LevelComplete
  )

  // String returns the name of the level. Values outside the declared
  // constants are rendered as "CacheLevel(n)" instead of panicking, so the
  // method is always safe to use in log statements.
  func (l CacheLevel) String() string {
  	switch l {
  	case LevelNotLoaded:
  		return "NotLoaded"
  	case LevelReadOnly:
  		return "ReadOnly"
  	case LevelMetaLoaded:
  		return "MetaLoaded"
  	case LevelComplete:
  		return "Complete"
  	default:
  		return fmt.Sprintf("CacheLevel(%d)", int(l))
  	}
  }
  32. type FileInfo struct {
  33. // 文件总大小。可能会超过对应的远端文件的大小。
  34. // 此大小可能与本地缓存文件大小也不同,需要定时将本地缓存文件大小修正到与这个值相同。
  35. Size int64
  36. // 文件数据的版本号。如果大于0,则代表有未提交的修改
  37. DataRevision int
  38. // 文件元数据的版本号
  39. MetaRevision int
  40. // 数据段列表,按照段开始位置从小到大排列
  41. Segments []*Range
  42. // 文件对应的对象ID,仅在文件是一个缓存文件时才有值
  43. // ObjectID cdssdk.ObjectID
  44. // 文件对应的对象大小,仅在文件是一个缓存文件时才有值。
  45. // 此值代表有多少数据应该从远端加载,所以可能会小于远端实际大小
  46. ObjectSize int64
  47. // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过
  48. // Hash cdssdk.FileHash
  49. // 文件的最后修改时间
  50. ModTime time.Time
  51. // 文件的权限
  52. Perm os.FileMode
  53. }
  54. func (f *FileInfo) Clone() FileInfo {
  55. n := *f
  56. n.Segments = make([]*Range, len(f.Segments))
  57. for i, seg := range f.Segments {
  58. n.Segments[i] = &Range{
  59. Position: seg.Position,
  60. Length: seg.Length,
  61. }
  62. }
  63. return n
  64. }
  // Range is a contiguous span of bytes described by its start offset and its
  // length.
  type Range struct {
  	// Position is the offset of the first byte of the span.
  	Position int64
  	// Length is the number of bytes in the span.
  	Length int64
  }

  // GetPosition returns the start offset of the range.
  func (r *Range) GetPosition() int64 { return r.Position }

  // SetPosition replaces the start offset of the range.
  func (r *Range) SetPosition(pos int64) { r.Position = pos }

  // GetLength returns the number of bytes in the range.
  func (r *Range) GetLength() int64 { return r.Length }

  // SetLength replaces the number of bytes in the range.
  func (r *Range) SetLength(length int64) { r.Length = length }

  // End returns the offset just past the last byte, i.e. Position + Length.
  func (r *Range) End() int64 {
  	start, size := r.Position, r.Length
  	return start + size
  }
  84. // 所有读写过程共用同一个CacheFile对象。
  85. // 不应该将此结构体保存到对象中
  86. type CacheFile struct {
  87. cache *Cache
  88. pathComps []string
  89. info FileInfo
  90. remoteObj *clitypes.Object
  91. rwLock *sync.RWMutex
  92. readers []*CacheFileHandle
  93. writers []*CacheFileHandle
  94. saveMetaChan chan any
  95. stopSaveChan chan any
  96. isDeleted bool
  97. level CacheLevel
  98. refCount int
  99. freeTime time.Time
  100. changeLevelTime time.Time
  101. metaFile *os.File
  102. dataFile *os.File
  103. writeLock *sync.RWMutex
  104. // 缓存文件的状态,用于管理缓存文件的生命周期。不受rwLock保护,而是由Cache管理
  105. state cacheState
  106. }
  107. type cacheState struct {
  108. uploading *uploadingObject
  109. }
  110. func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
  111. metaPath := cache.GetCacheMetaPath(pathComps...)
  112. dataPath := cache.GetCacheDataPath(pathComps...)
  113. info := FileInfo{
  114. DataRevision: 1,
  115. ModTime: time.Now(),
  116. Perm: 0755,
  117. }
  118. infoData, err := serder.ObjectToJSON(info)
  119. if err != nil {
  120. return nil, err
  121. }
  122. err = os.MkdirAll(filepath.Dir(metaPath), 0755)
  123. if err != nil {
  124. return nil, err
  125. }
  126. metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  127. if err != nil {
  128. return nil, fmt.Errorf("create cache meta file: %w", err)
  129. }
  130. err = io2.WriteAll(metaFile, infoData)
  131. if err != nil {
  132. metaFile.Close()
  133. return nil, fmt.Errorf("save cache meta file: %w", err)
  134. }
  135. dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  136. if err != nil {
  137. metaFile.Close()
  138. return nil, fmt.Errorf("create cache data file: %w", err)
  139. }
  140. ch := &CacheFile{
  141. cache: cache,
  142. pathComps: pathComps,
  143. info: info,
  144. rwLock: &sync.RWMutex{},
  145. saveMetaChan: make(chan any, 1),
  146. stopSaveChan: make(chan any),
  147. level: LevelComplete,
  148. metaFile: metaFile,
  149. dataFile: dataFile,
  150. writeLock: &sync.RWMutex{},
  151. state: cacheState{},
  152. }
  153. go ch.serving(ch.saveMetaChan, ch.stopSaveChan)
  154. return ch, nil
  155. }
  156. func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
  157. metaPath := cache.GetCacheMetaPath(pathComps...)
  158. dataPath := cache.GetCacheDataPath(pathComps...)
  159. dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
  160. if err != nil {
  161. // 不要包装这里的err
  162. return nil, err
  163. }
  164. info := &FileInfo{}
  165. metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
  166. if err != nil {
  167. // 如果有数据文件,而没有元数据文件,则创建一个元数据文件
  168. if !os.IsNotExist(err) {
  169. dataFile.Close()
  170. return nil, err
  171. }
  172. stat, err := dataFile.Stat()
  173. if err != nil {
  174. dataFile.Close()
  175. return nil, err
  176. }
  177. err = os.MkdirAll(filepath.Dir(metaPath), 0755)
  178. if err != nil {
  179. dataFile.Close()
  180. return nil, err
  181. }
  182. metaFile, err = os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  183. if err != nil {
  184. dataFile.Close()
  185. return nil, err
  186. }
  187. info.Size = stat.Size()
  188. info.ModTime = stat.ModTime()
  189. info.Perm = stat.Mode().Perm()
  190. info.Segments = []*Range{{Position: 0, Length: info.Size}}
  191. info.MetaRevision = 1 // 未同步的文件视为已修改
  192. info.DataRevision = 1
  193. } else {
  194. err = serder.JSONToObjectStream(metaFile, info)
  195. if err != nil {
  196. dataFile.Close()
  197. return nil, err
  198. }
  199. }
  200. ch := &CacheFile{
  201. cache: cache,
  202. pathComps: pathComps,
  203. info: *info,
  204. rwLock: &sync.RWMutex{},
  205. saveMetaChan: make(chan any, 1),
  206. stopSaveChan: make(chan any),
  207. level: LevelComplete,
  208. metaFile: metaFile,
  209. dataFile: dataFile,
  210. writeLock: &sync.RWMutex{},
  211. state: cacheState{},
  212. }
  213. go ch.serving(ch.saveMetaChan, ch.stopSaveChan)
  214. return ch, nil
  215. }
  216. func newCacheFileFromObject(cache *Cache, pathComps []string, obj *clitypes.Object) (*CacheFile, error) {
  217. metaPath := cache.GetCacheMetaPath(pathComps...)
  218. dataPath := cache.GetCacheDataPath(pathComps...)
  219. info := FileInfo{
  220. Size: obj.Size,
  221. ObjectSize: obj.Size,
  222. ModTime: obj.UpdateTime,
  223. Perm: 0755,
  224. }
  225. infoData, err := serder.ObjectToJSON(info)
  226. if err != nil {
  227. return nil, err
  228. }
  229. err = os.MkdirAll(filepath.Dir(metaPath), 0755)
  230. if err != nil {
  231. return nil, err
  232. }
  233. metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  234. if err != nil {
  235. return nil, fmt.Errorf("create cache meta file: %w", err)
  236. }
  237. err = io2.WriteAll(metaFile, infoData)
  238. if err != nil {
  239. metaFile.Close()
  240. return nil, fmt.Errorf("save cache meta file: %w", err)
  241. }
  242. dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
  243. if err != nil {
  244. metaFile.Close()
  245. return nil, fmt.Errorf("create cache file: %w", err)
  246. }
  247. ch := &CacheFile{
  248. cache: cache,
  249. pathComps: pathComps,
  250. info: info,
  251. remoteObj: obj,
  252. rwLock: &sync.RWMutex{},
  253. saveMetaChan: make(chan any, 1),
  254. stopSaveChan: make(chan any),
  255. level: LevelComplete,
  256. metaFile: metaFile,
  257. dataFile: dataFile,
  258. writeLock: &sync.RWMutex{},
  259. state: cacheState{},
  260. }
  261. go ch.serving(ch.saveMetaChan, ch.stopSaveChan)
  262. return ch, nil
  263. }
  264. func loadCacheFileInfo(cache *Cache, pathComps []string, dataFileInfo os.FileInfo) (*CacheEntryInfo, error) {
  265. metaPath := cache.GetCacheMetaPath(pathComps...)
  266. metaData, err := os.ReadFile(metaPath)
  267. if err == nil {
  268. info := &FileInfo{}
  269. err = serder.JSONToObject(metaData, info)
  270. if err != nil {
  271. return nil, err
  272. }
  273. return &CacheEntryInfo{
  274. PathComps: pathComps,
  275. Size: info.Size,
  276. Perm: info.Perm,
  277. ModTime: info.ModTime,
  278. IsDir: false,
  279. }, nil
  280. }
  281. if !os.IsNotExist(err) {
  282. return nil, err
  283. }
  284. return &CacheEntryInfo{
  285. PathComps: pathComps,
  286. Size: dataFileInfo.Size(),
  287. Perm: dataFileInfo.Mode(),
  288. ModTime: dataFileInfo.ModTime(),
  289. IsDir: false,
  290. }, nil
  291. }
  292. // 增加一个引用计数。不应该被Cache之外的代码调用。
  293. func (f *CacheFile) IncRef() {
  294. f.rwLock.Lock()
  295. defer f.rwLock.Unlock()
  296. f.refCount++
  297. }
  298. // 减少一个引用计数
  299. func (f *CacheFile) Release() {
  300. f.rwLock.Lock()
  301. defer f.rwLock.Unlock()
  302. f.refCount--
  303. if f.refCount == 0 {
  304. f.freeTime = time.Now()
  305. }
  306. }
  307. func (f *CacheFile) LevelDown(level CacheLevel) bool {
  308. f.rwLock.Lock()
  309. defer f.rwLock.Unlock()
  310. if level >= f.level {
  311. return true
  312. }
  313. // 缓存正在被使用时,不能降级
  314. if f.refCount > 0 {
  315. return false
  316. }
  317. switch f.level {
  318. case LevelComplete:
  319. f.dataFile.Close()
  320. f.level = LevelMetaLoaded
  321. if level >= f.level {
  322. break
  323. }
  324. fallthrough
  325. case LevelMetaLoaded:
  326. if !f.isDeleted {
  327. // TODO 日志
  328. f.saveMeta(f.info)
  329. }
  330. // 这里会等待直到saveMeta线程退出
  331. f.stopSaveChan <- nil
  332. f.saveMetaChan = nil
  333. f.stopSaveChan = nil
  334. f.metaFile.Close()
  335. f.level = LevelReadOnly
  336. if level >= f.level {
  337. break
  338. }
  339. fallthrough
  340. case LevelReadOnly:
  341. f.level = LevelNotLoaded
  342. if level >= f.level {
  343. break
  344. }
  345. fallthrough
  346. case LevelNotLoaded:
  347. }
  348. f.changeLevelTime = time.Now()
  349. return true
  350. }
  351. func (f *CacheFile) LevelUp(level CacheLevel) bool {
  352. f.rwLock.Lock()
  353. defer f.rwLock.Unlock()
  354. if level <= f.level {
  355. return true
  356. }
  357. // 缓存正在使用时,可以升级
  358. switch f.level {
  359. case LevelNotLoaded:
  360. f.level = LevelReadOnly
  361. if level <= f.level {
  362. break
  363. }
  364. fallthrough
  365. case LevelReadOnly:
  366. metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
  367. metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
  368. if err != nil {
  369. logger.Warnf("open meta file %v: %v", metaPath, err)
  370. return false
  371. }
  372. f.saveMetaChan = make(chan any, 1)
  373. f.stopSaveChan = make(chan any)
  374. f.metaFile = metaFile
  375. f.level = LevelMetaLoaded
  376. go f.serving(f.saveMetaChan, f.stopSaveChan)
  377. if level <= f.level {
  378. break
  379. }
  380. fallthrough
  381. case LevelMetaLoaded:
  382. dataPath := f.cache.GetCacheDataPath(f.pathComps...)
  383. dataFile, err := os.OpenFile(dataPath, os.O_RDWR|os.O_CREATE, 0644)
  384. if err != nil {
  385. logger.Warnf("open data file %v: %v", dataPath, err)
  386. return false
  387. }
  388. f.dataFile = dataFile
  389. f.level = LevelComplete
  390. if level <= f.level {
  391. break
  392. }
  393. fallthrough
  394. case LevelComplete:
  395. }
  396. f.changeLevelTime = time.Now()
  397. return true
  398. }
  399. func (f *CacheFile) RevisionUploaded(dataRev int, metaRev int) {
  400. f.rwLock.Lock()
  401. defer f.rwLock.Unlock()
  402. if dataRev != 0 && f.info.DataRevision == dataRev {
  403. f.info.DataRevision = 0
  404. }
  405. if metaRev != 0 && f.info.MetaRevision == metaRev {
  406. f.info.MetaRevision = 0
  407. }
  408. f.letSave()
  409. }
  410. func (f *CacheFile) Info() CacheEntryInfo {
  411. f.rwLock.RLock()
  412. defer f.rwLock.RUnlock()
  413. return CacheEntryInfo{
  414. PathComps: f.pathComps,
  415. Size: f.info.Size,
  416. Perm: f.info.Perm,
  417. ModTime: f.info.ModTime,
  418. IsDir: false,
  419. MetaRevision: f.info.MetaRevision,
  420. DataRevision: f.info.DataRevision,
  421. RefCount: f.refCount,
  422. FreeTime: f.freeTime,
  423. Level: f.level,
  424. ChangeLevelTime: f.changeLevelTime,
  425. }
  426. }
  427. func (f *CacheFile) Delete() {
  428. f.writeLock.Lock()
  429. defer f.writeLock.Unlock()
  430. f.rwLock.Lock()
  431. defer f.rwLock.Unlock()
  432. metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
  433. dataPath := f.cache.GetCacheDataPath(f.pathComps...)
  434. os.Remove(metaPath)
  435. os.Remove(dataPath)
  436. // 不可能将isDeleted从true改为false,所以这里不需要使用stopSaveChan来等待saveMeta线程退出
  437. f.isDeleted = true
  438. if f.saveMetaChan != nil {
  439. f.letSave()
  440. }
  441. }
  442. func (f *CacheFile) Move(newPathComps []string) {
  443. f.writeLock.Lock()
  444. defer f.writeLock.Unlock()
  445. f.rwLock.Lock()
  446. defer f.rwLock.Unlock()
  447. f.pathComps = newPathComps
  448. if f.saveMetaChan != nil {
  449. f.letSave()
  450. }
  451. }
  452. // 打开一个写入句柄,同时支持读取
  453. func (f *CacheFile) Open(flags uint32) *CacheFileHandle {
  454. logger.Tracef("CacheFile.Open: %v, %#x", f.pathComps, flags)
  455. f.rwLock.Lock()
  456. defer f.rwLock.Unlock()
  457. f.refCount++
  458. h := &CacheFileHandle{
  459. file: f,
  460. remoteLock: &sync.Mutex{},
  461. revision: f.info.DataRevision,
  462. }
  463. if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) {
  464. h.readable = true
  465. h.writeable = true
  466. } else if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) {
  467. h.writeable = true
  468. } else if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) {
  469. h.readable = true
  470. }
  471. if f.remoteObj != nil {
  472. h.remote = newRemoteLoader(f)
  473. }
  474. if h.writeable {
  475. f.writers = append(f.writers, h)
  476. } else {
  477. f.readers = append(f.readers, h)
  478. }
  479. return h
  480. }
  481. // 打开一个读取句柄,用于同步本地文件到远端
  482. func (f *CacheFile) OpenReadWhenScanning() *CacheFileHandle {
  483. f.rwLock.Lock()
  484. defer f.rwLock.Unlock()
  485. f.refCount++
  486. h := &CacheFileHandle{
  487. file: f,
  488. remoteLock: &sync.Mutex{},
  489. revision: f.info.DataRevision,
  490. readable: true,
  491. }
  492. if f.remoteObj != nil {
  493. h.remote = newRemoteLoader(f)
  494. }
  495. f.readers = append(f.readers, h)
  496. return h
  497. }
  498. func (f *CacheFile) SetModTime(modTime time.Time) error {
  499. logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime)
  500. f.rwLock.Lock()
  501. f.info.ModTime = modTime
  502. f.info.MetaRevision++
  503. f.rwLock.Unlock()
  504. f.letSave()
  505. return nil
  506. }
  507. func (f *CacheFile) Truncate(size int64) error {
  508. logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size)
  509. // 修改文件大小前不允许写入
  510. f.writeLock.Lock()
  511. defer f.writeLock.Unlock()
  512. err := f.dataFile.Truncate(size)
  513. if err != nil {
  514. return err
  515. }
  516. f.rwLock.Lock()
  517. defer f.rwLock.Unlock()
  518. // 调整能从远端下载的大小
  519. f.info.ObjectSize = math2.Min(f.info.ObjectSize, size)
  520. // 调整本地缓存文件里的有效数据大小
  521. if size < f.info.Size {
  522. f.info.Segments = TruncateRange(f.info.Segments, size)
  523. } else if size > f.info.Size {
  524. f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
  525. }
  526. if f.info.Size != size {
  527. f.info.DataRevision++
  528. }
  529. f.info.Size = size
  530. f.letSave()
  531. return nil
  532. }
  533. func (f *CacheFile) serving(saveMetaChan chan any, stopSaveChan chan any) {
  534. ticker := time.NewTicker(time.Second * 5)
  535. defer ticker.Stop()
  536. for {
  537. select {
  538. case _, ok := <-saveMetaChan:
  539. if !ok {
  540. return
  541. }
  542. case <-stopSaveChan:
  543. return
  544. case <-ticker.C:
  545. }
  546. f.rwLock.RLock()
  547. info := f.info.Clone()
  548. // 如果文件已被删除,则不能再保存元数据,防止覆盖掉新创建的同名文件
  549. if f.isDeleted {
  550. f.rwLock.RUnlock()
  551. break
  552. }
  553. f.rwLock.RUnlock()
  554. // TODO 错误日志
  555. f.saveMeta(info)
  556. f.metaFile.Sync()
  557. }
  558. }
  559. func (f *CacheFile) saveMeta(info FileInfo) error {
  560. jsonData, err := serder.ObjectToJSON(info)
  561. if err != nil {
  562. return err
  563. }
  564. err = f.metaFile.Truncate(0)
  565. if err != nil {
  566. return err
  567. }
  568. _, err = f.metaFile.Seek(0, io.SeekStart)
  569. if err != nil {
  570. return err
  571. }
  572. err = io2.WriteAll(f.metaFile, jsonData)
  573. if err != nil {
  574. return err
  575. }
  576. return nil
  577. }
  578. func (f *CacheFile) letSave() {
  579. select {
  580. case f.saveMetaChan <- nil:
  581. default:
  582. }
  583. }
  584. type CacheFileHandle struct {
  585. file *CacheFile
  586. readable bool
  587. writeable bool
  588. remote *RemoteLoader
  589. remoteLock *sync.Mutex
  590. revision int // 打开文件时,文件的版本号
  591. }
  592. func (h *CacheFileHandle) ReadAt(buf []byte, off int64) (int, error) {
  593. log := logger.WithField("F", "CacheFileHandle.ReadAt").
  594. WithField("Path", h.file.pathComps)
  595. log.Tracef("buf: %v, off: %v", len(buf), off)
  596. if !h.readable {
  597. return 0, fuse.ErrPermission
  598. }
  599. // 读取数据必须读满整个buf,否则就会被认为是文件已经结束了
  600. totalReadLen := 0
  601. for totalReadLen < len(buf) {
  602. curBuf := buf[totalReadLen:]
  603. curOff := off + int64(totalReadLen)
  604. h.file.rwLock.RLock()
  605. if curOff >= h.file.info.Size {
  606. h.file.rwLock.RUnlock()
  607. break
  608. }
  609. /// 1. 先尝试从本地缓存文件里读取
  610. rngIdx := FirstContainsIndex(h.file.info.Segments, curOff)
  611. if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff {
  612. readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff)
  613. realReadLen, err := h.file.dataFile.ReadAt(curBuf[:readLen], curOff)
  614. totalReadLen += realReadLen
  615. h.file.rwLock.RUnlock()
  616. if err != nil {
  617. log.Tracef("read from local cache: %v", err)
  618. return totalReadLen, err
  619. }
  620. continue
  621. }
  622. // 否则从远端下载,计算一下要加载的长度
  623. loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff)
  624. if rngIdx+1 < len(h.file.info.Segments) {
  625. // 最多加载到下一个段的开头
  626. loadLen = math2.Min(loadLen, h.file.info.Segments[rngIdx+1].Position-curOff)
  627. }
  628. h.file.rwLock.RUnlock()
  629. /// 2. 开始从远端下载数据
  630. if h.remote == nil {
  631. log.Warnf("no remote file")
  632. return totalReadLen, fmt.Errorf("no remote file")
  633. }
  634. // 由于RemoteLoader的Load方法没有加锁,所以这里要加锁,防止并发Seek导致的问题
  635. // 可以考虑在RemoteLoader里加锁,这样可以实现跨Writer共用Loader
  636. h.remoteLock.Lock()
  637. realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff)
  638. totalReadLen += realLoadLen
  639. if err != nil {
  640. h.remoteLock.Unlock()
  641. return totalReadLen, err
  642. }
  643. h.remoteLock.Unlock()
  644. log.Tracef("load from remote: %v", realLoadLen)
  645. /// 3. 数据加载完毕,写入到本地文件
  646. // 在写入到本地之前,先停止其他的写入,防止冲突
  647. h.file.writeLock.Lock()
  648. // 停止其他写入后,就可以计算一下实际要写回的长度。
  649. h.file.rwLock.RLock()
  650. loadRng := &Range{Position: curOff, Length: int64(realLoadLen)}
  651. DifferentRange(loadRng, h.file.info.Segments)
  652. h.file.rwLock.RUnlock()
  653. if loadRng.Length == 0 {
  654. h.file.writeLock.Unlock()
  655. continue
  656. }
  657. // 写入到本地缓存文件
  658. writeStart := loadRng.Position - curOff
  659. _, err = h.file.dataFile.WriteAt(curBuf[writeStart:writeStart+loadRng.Length], curOff)
  660. if err != nil {
  661. h.file.writeLock.Unlock()
  662. log.Warnf("save to local file: %v", err)
  663. return totalReadLen, err
  664. }
  665. log.Tracef("save to local: %v", loadRng.Length)
  666. h.file.writeLock.Unlock()
  667. // 提交到段列表里
  668. h.file.rwLock.Lock()
  669. h.file.info.Segments = AddRange(h.file.info.Segments, loadRng)
  670. h.file.rwLock.Unlock()
  671. h.file.letSave()
  672. }
  673. return totalReadLen, nil
  674. }
  675. func (h *CacheFileHandle) WriteAt(buf []byte, off int64) (int, error) {
  676. log := logger.WithField("F", "CacheFileHandle.WriteAt").WithField("Path", h.file.pathComps)
  677. log.Tracef("buf: %v, off: %v", len(buf), off)
  678. if !h.writeable {
  679. return 0, fuse.ErrPermission
  680. }
  681. // 允许多线程并行写入,但在数据加载期间不能写入
  682. h.file.writeLock.RLock()
  683. defer h.file.writeLock.RUnlock()
  684. // 写入到本地缓存文件
  685. writeLen, err := h.file.dataFile.WriteAt(buf, off)
  686. if err != nil {
  687. log.Tracef("save to local file: %v", err)
  688. return writeLen, err
  689. }
  690. // 提交到段列表里
  691. h.file.rwLock.Lock()
  692. defer h.file.rwLock.Unlock()
  693. h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)})
  694. h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen))
  695. h.file.info.DataRevision++
  696. h.file.letSave()
  697. return writeLen, nil
  698. }
  699. func (f *CacheFileHandle) Sync() error {
  700. log := logger.WithField("Path", f.file.pathComps)
  701. err := f.file.dataFile.Sync()
  702. if err != nil {
  703. log.Tracef("sync local file: %v", err)
  704. return err
  705. }
  706. return nil
  707. }
  708. func (f *CacheFileHandle) Close() error {
  709. f.Sync()
  710. if f.remote != nil {
  711. f.remote.Close()
  712. }
  713. f.file.rwLock.Lock()
  714. defer f.file.rwLock.Unlock()
  715. f.file.refCount--
  716. if f.file.refCount == 0 {
  717. f.file.freeTime = time.Now()
  718. }
  719. if f.writeable {
  720. f.file.writers = lo2.Remove(f.file.writers, f)
  721. } else if f.readable {
  722. f.file.readers = lo2.Remove(f.file.readers, f)
  723. }
  724. return nil
  725. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。