You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

agent_check_cache.go 5.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package event
  2. import (
  3. "database/sql"
  4. "time"
  5. "github.com/samber/lo"
  6. "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
  7. "gitlink.org.cn/cloudream/common/pkg/logger"
  8. "gitlink.org.cn/cloudream/db/model"
  9. "gitlink.org.cn/cloudream/rabbitmq"
  10. "gitlink.org.cn/cloudream/scanner/internal/config"
  11. agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
  12. agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
  13. scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
  14. )
  15. type AgentCheckCache struct {
  16. scevt.AgentCheckCache
  17. }
  18. func NewAgentCheckCache(nodeID int, fileHashes []string) *AgentCheckCache {
  19. return &AgentCheckCache{
  20. AgentCheckCache: scevt.NewAgentCheckCache(nodeID, fileHashes),
  21. }
  22. }
  23. func (t *AgentCheckCache) TryMerge(other Event) bool {
  24. event, ok := other.(*AgentCheckCache)
  25. if !ok {
  26. return false
  27. }
  28. if event.NodeID != t.NodeID {
  29. return false
  30. }
  31. // FileHashes为nil时代表全量检查
  32. if event.FileHashes == nil {
  33. t.FileHashes = nil
  34. } else if t.FileHashes != nil {
  35. t.FileHashes = lo.Union(t.FileHashes, event.FileHashes)
  36. }
  37. return true
  38. }
  39. func (t *AgentCheckCache) Execute(execCtx ExecuteContext) {
  40. log := logger.WithType[AgentCheckCache]("Event")
  41. log.Debugf("begin with %v", logger.FormatStruct(t))
  42. defer log.Debugf("end")
  43. // TODO unavailable的节点需不需要发送任务?
  44. if t.FileHashes == nil {
  45. t.checkComplete(execCtx)
  46. } else {
  47. t.checkIncrement(execCtx)
  48. }
  49. }
  50. func (t *AgentCheckCache) checkComplete(execCtx ExecuteContext) {
  51. log := logger.WithType[AgentCheckCache]("Event")
  52. mutex, err := reqbuilder.NewBuilder().
  53. Metadata().
  54. // 全量模式下修改某个节点所有的Cache记录
  55. Cache().WriteAny().
  56. IPFS().
  57. // 全量模式下修改某个节点所有的副本数据
  58. WriteAnyRep(t.NodeID).
  59. MutexLock(execCtx.Args.DistLock)
  60. if err != nil {
  61. log.Warnf("acquire locks failed, err: %s", err.Error())
  62. return
  63. }
  64. defer mutex.Unlock()
  65. caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID)
  66. if err != nil {
  67. log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error())
  68. return
  69. }
  70. t.startCheck(execCtx, true, caches)
  71. }
  72. func (t *AgentCheckCache) checkIncrement(execCtx ExecuteContext) {
  73. log := logger.WithType[AgentCheckCache]("Event")
  74. builder := reqbuilder.NewBuilder()
  75. for _, hash := range t.FileHashes {
  76. builder.
  77. // 增量模式下,不会有改动到Cache记录的操作
  78. Metadata().Cache().ReadOne(t.NodeID, hash).
  79. // 由于副本Write锁的特点,Pin文件(创建文件)不需要Create锁
  80. IPFS().WriteOneRep(t.NodeID, hash)
  81. }
  82. mutex, err := builder.MutexLock(execCtx.Args.DistLock)
  83. if err != nil {
  84. log.Warnf("acquire locks failed, err: %s", err.Error())
  85. return
  86. }
  87. defer mutex.Unlock()
  88. var caches []model.Cache
  89. for _, hash := range t.FileHashes {
  90. ch, err := execCtx.Args.DB.Cache().Get(execCtx.Args.DB.SQLCtx(), hash, t.NodeID)
  91. // 记录不存在则跳过
  92. if err == sql.ErrNoRows {
  93. continue
  94. }
  95. if err != nil {
  96. log.WithField("FileHash", hash).WithField("NodeID", t.NodeID).Warnf("get cache failed, err: %s", err.Error())
  97. return
  98. }
  99. caches = append(caches, ch)
  100. }
  101. t.startCheck(execCtx, true, caches)
  102. }
  103. func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, caches []model.Cache) {
  104. log := logger.WithType[AgentCheckCache]("Event")
  105. // 然后向代理端发送移动文件的请求
  106. agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ)
  107. if err != nil {
  108. log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error())
  109. return
  110. }
  111. defer agentClient.Close()
  112. checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFS(isComplete, caches), rabbitmq.RequestOption{Timeout: time.Minute})
  113. if err != nil {
  114. log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error())
  115. return
  116. }
  117. // 根据返回结果修改数据库
  118. for _, entry := range checkResp.Entries {
  119. switch entry.Operation {
  120. case agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP:
  121. err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
  122. if err != nil {
  123. log.WithField("FileHash", entry.FileHash).
  124. WithField("NodeID", t.NodeID).
  125. Warnf("delete temp cache failed, err: %s", err.Error())
  126. }
  127. log.WithField("FileHash", entry.FileHash).
  128. WithField("NodeID", t.NodeID).
  129. Debugf("delete temp cache")
  130. case agtmsg.CHECK_IPFS_RESP_OP_CREATE_TEMP:
  131. err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
  132. if err != nil {
  133. log.WithField("FileHash", entry.FileHash).
  134. WithField("NodeID", t.NodeID).
  135. Warnf("create temp cache failed, err: %s", err.Error())
  136. }
  137. log.WithField("FileHash", entry.FileHash).
  138. WithField("NodeID", t.NodeID).
  139. Debugf("create temp cache")
  140. }
  141. }
  142. }
  143. func init() {
  144. RegisterMessageConvertor(func(msg scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) })
  145. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。