You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

agent_check_cache.go 5.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package event
  import (
  	"database/sql"
  	"errors"
  	"time"

  	"github.com/samber/lo"
  	"gitlink.org.cn/cloudream/common/pkgs/logger"
  	"gitlink.org.cn/cloudream/common/pkgs/mq"
  	"gitlink.org.cn/cloudream/storage-common/globals"
  	"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
  	"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
  	agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
  	scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event"
  )
  // AgentCheckCache is the event type in this package that embeds the
  // scevt.AgentCheckCache message. The methods in this file read its NodeID and
  // FileHashes fields; a nil FileHashes is treated as a request for a full
  // check of the node, a non-nil one as a check limited to the listed hashes.
  14. type AgentCheckCache struct {
  15. scevt.AgentCheckCache
  16. }
  17. func NewAgentCheckCache(nodeID int64, fileHashes []string) *AgentCheckCache {
  18. return &AgentCheckCache{
  19. AgentCheckCache: scevt.NewAgentCheckCache(nodeID, fileHashes),
  20. }
  21. }
  22. func (t *AgentCheckCache) TryMerge(other Event) bool {
  23. event, ok := other.(*AgentCheckCache)
  24. if !ok {
  25. return false
  26. }
  27. if event.NodeID != t.NodeID {
  28. return false
  29. }
  30. // FileHashes为nil时代表全量检查
  31. if event.FileHashes == nil {
  32. t.FileHashes = nil
  33. } else if t.FileHashes != nil {
  34. t.FileHashes = lo.Union(t.FileHashes, event.FileHashes)
  35. }
  36. return true
  37. }
  38. func (t *AgentCheckCache) Execute(execCtx ExecuteContext) {
  39. log := logger.WithType[AgentCheckCache]("Event")
  40. log.Debugf("begin with %v", logger.FormatStruct(t))
  41. defer log.Debugf("end")
  42. // TODO unavailable的节点需不需要发送任务?
  43. if t.FileHashes == nil {
  44. t.checkComplete(execCtx)
  45. } else {
  46. t.checkIncrement(execCtx)
  47. }
  48. }
  49. func (t *AgentCheckCache) checkComplete(execCtx ExecuteContext) {
  50. log := logger.WithType[AgentCheckCache]("Event")
  51. mutex, err := reqbuilder.NewBuilder().
  52. Metadata().
  53. // 全量模式下修改某个节点所有的Cache记录
  54. Cache().WriteAny().
  55. IPFS().
  56. // 全量模式下修改某个节点所有的副本数据
  57. WriteAnyRep(t.NodeID).
  58. MutexLock(execCtx.Args.DistLock)
  59. if err != nil {
  60. log.Warnf("acquire locks failed, err: %s", err.Error())
  61. return
  62. }
  63. defer mutex.Unlock()
  64. caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID)
  65. if err != nil {
  66. log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error())
  67. return
  68. }
  69. t.startCheck(execCtx, true, caches)
  70. }
  71. func (t *AgentCheckCache) checkIncrement(execCtx ExecuteContext) {
  72. log := logger.WithType[AgentCheckCache]("Event")
  73. builder := reqbuilder.NewBuilder()
  74. for _, hash := range t.FileHashes {
  75. builder.
  76. // 增量模式下,不会有改动到Cache记录的操作
  77. Metadata().Cache().ReadOne(t.NodeID, hash).
  78. // 由于副本Write锁的特点,Pin文件(创建文件)不需要Create锁
  79. IPFS().WriteOneRep(t.NodeID, hash)
  80. }
  81. mutex, err := builder.MutexLock(execCtx.Args.DistLock)
  82. if err != nil {
  83. log.Warnf("acquire locks failed, err: %s", err.Error())
  84. return
  85. }
  86. defer mutex.Unlock()
  87. var caches []model.Cache
  88. for _, hash := range t.FileHashes {
  89. ch, err := execCtx.Args.DB.Cache().Get(execCtx.Args.DB.SQLCtx(), hash, t.NodeID)
  90. // 记录不存在则跳过
  91. if err == sql.ErrNoRows {
  92. continue
  93. }
  94. if err != nil {
  95. log.WithField("FileHash", hash).WithField("NodeID", t.NodeID).Warnf("get cache failed, err: %s", err.Error())
  96. return
  97. }
  98. caches = append(caches, ch)
  99. }
  100. t.startCheck(execCtx, false, caches)
  101. }
  102. func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, caches []model.Cache) {
  103. log := logger.WithType[AgentCheckCache]("Event")
  104. // 然后向代理端发送移动文件的请求
  105. agentClient, err := globals.AgentMQPool.Acquire(t.NodeID)
  106. if err != nil {
  107. log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error())
  108. return
  109. }
  110. defer agentClient.Close()
  111. checkResp, err := agentClient.CheckCache(agtmq.NewCheckCache(isComplete, caches), mq.RequestOption{Timeout: time.Minute})
  112. if err != nil {
  113. log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error())
  114. return
  115. }
  116. // 根据返回结果修改数据库
  117. for _, entry := range checkResp.Entries {
  118. switch entry.Operation {
  119. case agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP:
  120. err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
  121. if err != nil {
  122. log.WithField("FileHash", entry.FileHash).
  123. WithField("NodeID", t.NodeID).
  124. Warnf("delete temp cache failed, err: %s", err.Error())
  125. }
  126. log.WithField("FileHash", entry.FileHash).
  127. WithField("NodeID", t.NodeID).
  128. Debugf("delete temp cache")
  129. case agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP:
  130. err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
  131. if err != nil {
  132. log.WithField("FileHash", entry.FileHash).
  133. WithField("NodeID", t.NodeID).
  134. Warnf("create temp cache failed, err: %s", err.Error())
  135. }
  136. log.WithField("FileHash", entry.FileHash).
  137. WithField("NodeID", t.NodeID).
  138. Debugf("create temp cache")
  139. }
  140. }
  141. }
  142. func init() {
  143. RegisterMessageConvertor(func(msg scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) })
  144. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。