You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

cache.go 5.4 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package mq
  2. import (
  3. "time"
  4. shell "github.com/ipfs/go-ipfs-api"
  5. "gitlink.org.cn/cloudream/common/consts/errorcode"
  6. "gitlink.org.cn/cloudream/common/pkgs/ipfs"
  7. "gitlink.org.cn/cloudream/common/pkgs/logger"
  8. "gitlink.org.cn/cloudream/common/pkgs/mq"
  9. "gitlink.org.cn/cloudream/storage/agent/internal/config"
  10. "gitlink.org.cn/cloudream/storage/agent/internal/task"
  11. mytask "gitlink.org.cn/cloudream/storage/agent/internal/task"
  12. "gitlink.org.cn/cloudream/storage/common/consts"
  13. "gitlink.org.cn/cloudream/storage/common/globals"
  14. agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
  15. )
  16. func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
  17. ipfsCli, err := globals.IPFSPool.Acquire()
  18. if err != nil {
  19. logger.Warnf("new ipfs client: %s", err.Error())
  20. return nil, mq.Failed(errorcode.OperationFailed, "new ipfs client failed")
  21. }
  22. defer ipfsCli.Close()
  23. filesMap, err := ipfsCli.GetPinnedFiles()
  24. if err != nil {
  25. logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error())
  26. return nil, mq.Failed(errorcode.OperationFailed, "get pinned files from ipfs failed")
  27. }
  28. // TODO 根据锁定清单过滤被锁定的文件的记录
  29. if msg.IsComplete {
  30. return svc.checkComplete(msg, filesMap, ipfsCli)
  31. } else {
  32. return svc.checkIncrement(msg, filesMap, ipfsCli)
  33. }
  34. }
  35. func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
  36. var entries []agtmq.CheckIPFSRespEntry
  37. for _, cache := range msg.Caches {
  38. _, ok := filesMap[cache.FileHash]
  39. if ok {
  40. if cache.State == consts.CacheStatePinned {
  41. // 不处理
  42. } else if cache.State == consts.CacheStateTemp {
  43. logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp")
  44. err := ipfsCli.Unpin(cache.FileHash)
  45. if err != nil {
  46. logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error())
  47. }
  48. }
  49. // 删除map中的记录,表示此记录已被检查过
  50. delete(filesMap, cache.FileHash)
  51. } else {
  52. if cache.State == consts.CacheStatePinned {
  53. svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash))
  54. } else if cache.State == consts.CacheStateTemp {
  55. if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
  56. entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
  57. }
  58. }
  59. }
  60. }
  61. // 增量情况下,不需要对filesMap中没检查的记录进行处理
  62. return mq.ReplyOK(agtmq.NewCheckCacheResp(entries))
  63. }
  64. func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
  65. var entries []agtmq.CheckIPFSRespEntry
  66. for _, cache := range msg.Caches {
  67. _, ok := filesMap[cache.FileHash]
  68. if ok {
  69. if cache.State == consts.CacheStatePinned {
  70. // 不处理
  71. } else if cache.State == consts.CacheStateTemp {
  72. logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp")
  73. err := ipfsCli.Unpin(cache.FileHash)
  74. if err != nil {
  75. logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error())
  76. }
  77. }
  78. // 删除map中的记录,表示此记录已被检查过
  79. delete(filesMap, cache.FileHash)
  80. } else {
  81. if cache.State == consts.CacheStatePinned {
  82. svc.taskManager.StartComparable(task.NewIPFSPin(cache.FileHash))
  83. } else if cache.State == consts.CacheStateTemp {
  84. if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
  85. entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
  86. }
  87. }
  88. }
  89. }
  90. // map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录
  91. for hash := range filesMap {
  92. logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry")
  93. err := ipfsCli.Unpin(hash)
  94. if err != nil {
  95. logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error())
  96. }
  97. entries = append(entries, agtmq.NewCheckCacheRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP))
  98. }
  99. return mq.ReplyOK(agtmq.NewCheckCacheResp(entries))
  100. }
  101. func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) {
  102. tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID))
  103. return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID()))
  104. }
  105. func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) {
  106. tsk := svc.taskManager.FindByID(msg.TaskID)
  107. if tsk == nil {
  108. return nil, mq.Failed(errorcode.TaskNotFound, "task not found")
  109. }
  110. if msg.WaitTimeoutMs == 0 {
  111. tsk.Wait()
  112. errMsg := ""
  113. if tsk.Error() != nil {
  114. errMsg = tsk.Error().Error()
  115. }
  116. return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))
  117. } else {
  118. if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
  119. errMsg := ""
  120. if tsk.Error() != nil {
  121. errMsg = tsk.Error().Error()
  122. }
  123. return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))
  124. }
  125. return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, ""))
  126. }
  127. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。