You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

agent_check_cache.go 5.5 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package event
  2. import (
  3. "time"
  4. "github.com/samber/lo"
  5. "gitlink.org.cn/cloudream/common/pkgs/logger"
  6. "gitlink.org.cn/cloudream/common/pkgs/mq"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. stgglb "gitlink.org.cn/cloudream/storage/common/globals"
  9. "gitlink.org.cn/cloudream/storage/common/pkgs/db2"
  10. agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
  11. scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
  12. )
  13. // AgentCheckCache 代表一个用于处理代理缓存检查事件的结构体
  14. type AgentCheckCache struct {
  15. *scevt.AgentCheckCache
  16. }
  17. // NewAgentCheckCache 创建一个新的 AgentCheckCache 实例
  18. func NewAgentCheckCache(evt *scevt.AgentCheckCache) *AgentCheckCache {
  19. return &AgentCheckCache{
  20. AgentCheckCache: evt,
  21. }
  22. }
  23. // TryMerge 尝试合并当前事件与另一个事件
  24. // 如果另一个事件类型不匹配或节点ID不同,则不进行合并
  25. func (t *AgentCheckCache) TryMerge(other Event) bool {
  26. event, ok := other.(*AgentCheckCache)
  27. if !ok {
  28. return false
  29. }
  30. if event.StorageID != t.StorageID {
  31. return false
  32. }
  33. return true
  34. }
  35. // Execute 执行缓存检查操作,对比本地缓存与代理返回的缓存信息,更新数据库中的缓存记录
  36. func (t *AgentCheckCache) Execute(execCtx ExecuteContext) {
  37. log := logger.WithType[AgentCheckCache]("Event")
  38. startTime := time.Now()
  39. log.Debugf("begin with %v", logger.FormatStruct(t.AgentCheckCache))
  40. defer func() {
  41. log.Debugf("end, time: %v", time.Since(startTime))
  42. }()
  43. stg, err := execCtx.Args.DB.Storage().GetByID(execCtx.Args.DB.DefCtx(), t.StorageID)
  44. if err != nil {
  45. log.WithField("StorageID", t.StorageID).Warnf("getting shard storage by storage id: %s", err.Error())
  46. return
  47. }
  48. agtCli, err := stgglb.AgentMQPool.Acquire(stg.MasterHub)
  49. if err != nil {
  50. log.WithField("StorageID", t.StorageID).Warnf("create agent client failed, err: %s", err.Error())
  51. return
  52. }
  53. defer stgglb.AgentMQPool.Release(agtCli)
  54. checkResp, err := agtCli.CheckCache(agtmq.NewCheckCache(t.StorageID), mq.RequestOption{Timeout: time.Minute})
  55. if err != nil {
  56. log.WithField("StorageID", t.StorageID).Warnf("checking shard store: %s", err.Error())
  57. return
  58. }
  59. realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash cdssdk.FileHash) (cdssdk.FileHash, bool) { return hash, true })
  60. // 在事务中执行缓存更新操作
  61. execCtx.Args.DB.DoTx(func(tx db2.SQLContext) error {
  62. t.checkCache(execCtx, tx, realFileHashes)
  63. t.checkPinnedObject(execCtx, tx, realFileHashes)
  64. t.checkObjectBlock(execCtx, tx, realFileHashes)
  65. return nil
  66. })
  67. }
  68. // checkCache 对比Cache表中的记录,根据实际存在的文件哈希值,进行增加或删除操作
  69. func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) {
  70. log := logger.WithType[AgentCheckCache]("Event")
  71. caches, err := execCtx.Args.DB.Cache().GetByStorageID(tx, t.StorageID)
  72. if err != nil {
  73. log.WithField("StorageID", t.StorageID).Warnf("getting caches by storage id: %s", err.Error())
  74. return
  75. }
  76. realFileHashesCp := make(map[cdssdk.FileHash]bool)
  77. for k, v := range realFileHashes {
  78. realFileHashesCp[k] = v
  79. }
  80. var rms []cdssdk.FileHash
  81. for _, c := range caches {
  82. if realFileHashesCp[c.FileHash] {
  83. delete(realFileHashesCp, c.FileHash)
  84. continue
  85. }
  86. rms = append(rms, c.FileHash)
  87. }
  88. if len(rms) > 0 {
  89. err = execCtx.Args.DB.Cache().StorageBatchDelete(tx, t.StorageID, rms)
  90. if err != nil {
  91. log.Warnf("batch delete storage caches: %w", err.Error())
  92. }
  93. }
  94. if len(realFileHashesCp) > 0 {
  95. err = execCtx.Args.DB.Cache().BatchCreateOnSameStorage(tx, lo.Keys(realFileHashesCp), t.StorageID, 0)
  96. if err != nil {
  97. log.Warnf("batch create storage caches: %w", err)
  98. return
  99. }
  100. }
  101. }
  102. // checkPinnedObject 对比PinnedObject表,若实际文件不存在,则进行删除操作
  103. func (t *AgentCheckCache) checkPinnedObject(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) {
  104. log := logger.WithType[AgentCheckCache]("Event")
  105. objs, err := execCtx.Args.DB.PinnedObject().GetObjectsByStorageID(tx, t.StorageID)
  106. if err != nil {
  107. log.WithField("StorageID", t.StorageID).Warnf("getting pinned objects by storage id: %s", err.Error())
  108. return
  109. }
  110. var rms []cdssdk.ObjectID
  111. for _, c := range objs {
  112. if realFileHashes[c.FileHash] {
  113. continue
  114. }
  115. rms = append(rms, c.ObjectID)
  116. }
  117. if len(rms) > 0 {
  118. err = execCtx.Args.DB.PinnedObject().StorageBatchDelete(tx, t.StorageID, rms)
  119. if err != nil {
  120. log.Warnf("batch delete storage pinned objects: %s", err.Error())
  121. }
  122. }
  123. }
  124. // checkObjectBlock 对比ObjectBlock表,若实际文件不存在,则进行删除操作
  125. func (t *AgentCheckCache) checkObjectBlock(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) {
  126. log := logger.WithType[AgentCheckCache]("Event")
  127. blocks, err := execCtx.Args.DB.ObjectBlock().GetByStorageID(tx, t.StorageID)
  128. if err != nil {
  129. log.WithField("StorageID", t.StorageID).Warnf("getting object blocks by storage id: %s", err.Error())
  130. return
  131. }
  132. var rms []cdssdk.FileHash
  133. for _, b := range blocks {
  134. if realFileHashes[b.FileHash] {
  135. continue
  136. }
  137. rms = append(rms, b.FileHash)
  138. }
  139. if len(rms) > 0 {
  140. err = execCtx.Args.DB.ObjectBlock().StorageBatchDelete(tx, t.StorageID, rms)
  141. if err != nil {
  142. log.Warnf("batch delete storage object blocks: %s", err.Error())
  143. }
  144. }
  145. }
  146. // init 注册AgentCheckCache消息转换器
  147. func init() {
  148. RegisterMessageConvertor(NewAgentCheckCache)
  149. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。