You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

check_cache.go 2.1 kB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package event
  2. import (
  3. "database/sql"
  4. "github.com/samber/lo"
  5. "gitlink.org.cn/cloudream/common/pkgs/logger"
  6. "gitlink.org.cn/cloudream/storage-common/consts"
  7. "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
  8. "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
  9. scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event"
  10. )
  11. type CheckCache struct {
  12. scevt.CheckCache
  13. }
  14. func NewCheckCache(nodeID int64) *CheckCache {
  15. return &CheckCache{
  16. CheckCache: scevt.NewCheckCache(nodeID),
  17. }
  18. }
  19. func (t *CheckCache) TryMerge(other Event) bool {
  20. event, ok := other.(*CheckCache)
  21. if !ok {
  22. return false
  23. }
  24. if event.NodeID != t.NodeID {
  25. return false
  26. }
  27. return true
  28. }
  29. func (t *CheckCache) Execute(execCtx ExecuteContext) {
  30. log := logger.WithType[AgentCheckStorage]("Event")
  31. log.Debugf("begin with %v", logger.FormatStruct(t))
  32. defer log.Debugf("end")
  33. mutex, err := reqbuilder.NewBuilder().
  34. Metadata().
  35. // 查询节点状态
  36. Node().ReadOne(t.NodeID).
  37. // 删除节点所有的Cache记录
  38. Cache().WriteAny().
  39. MutexLock(execCtx.Args.DistLock)
  40. if err != nil {
  41. log.Warnf("acquire locks failed, err: %s", err.Error())
  42. return
  43. }
  44. defer mutex.Unlock()
  45. node, err := execCtx.Args.DB.Node().GetByID(execCtx.Args.DB.SQLCtx(), t.NodeID)
  46. if err == sql.ErrNoRows {
  47. return
  48. }
  49. if err != nil {
  50. log.WithField("NodeID", t.NodeID).Warnf("get node failed, err: %s", err.Error())
  51. return
  52. }
  53. if node.State != consts.NodeStateUnavailable {
  54. return
  55. }
  56. caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID)
  57. if err != nil {
  58. log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error())
  59. return
  60. }
  61. err = execCtx.Args.DB.Cache().DeleteNodeAll(execCtx.Args.DB.SQLCtx(), t.NodeID)
  62. if err != nil {
  63. log.WithField("NodeID", t.NodeID).Warnf("delete node all caches failed, err: %s", err.Error())
  64. return
  65. }
  66. execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash })))
  67. }
  68. func init() {
  69. RegisterMessageConvertor(func(msg scevt.CheckCache) Event { return NewCheckCache(msg.NodeID) })
  70. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。