You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

check_rep_count.go 6.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package task
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "math"
  6. "github.com/jmoiron/sqlx"
  7. "github.com/samber/lo"
  8. "gitlink.org.cn/cloudream/common/consts"
  9. "gitlink.org.cn/cloudream/scanner/internal/config"
  10. log "gitlink.org.cn/cloudream/utils/logger"
  11. mymath "gitlink.org.cn/cloudream/utils/math"
  12. mysort "gitlink.org.cn/cloudream/utils/sort"
  13. "gitlink.org.cn/cloudream/db/model"
  14. mysql "gitlink.org.cn/cloudream/db/sql"
  15. )
  // CheckRepCountTask is a task that checks the replica count of a set of files,
  // each identified by its hash.
  type CheckRepCountTask struct {
  	// FileHashes lists the hashes of the files this task has to check.
  	FileHashes []string
  }
  19. func NewCheckRepCountTask(fileHashes []string) *CheckRepCountTask {
  20. return &CheckRepCountTask{
  21. FileHashes: fileHashes,
  22. }
  23. }
  24. func (t *CheckRepCountTask) TryMerge(other Task) bool {
  25. chkTask, ok := other.(*CheckRepCountTask)
  26. if !ok {
  27. return false
  28. }
  29. t.FileHashes = lo.Union(t.FileHashes, chkTask.FileHashes)
  30. return true
  31. }
  32. func (t *CheckRepCountTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOption) {
  33. updatedNodeAndHashes := make(map[int][]string)
  34. for _, fileHash := range t.FileHashes {
  35. updatedNodeIDs, err := t.checkOneRepCount(fileHash, execCtx)
  36. if err != nil {
  37. log.WithField("FileHash", fileHash).Warnf("check file rep count failed, err: %s", err.Error())
  38. continue
  39. }
  40. for _, id := range updatedNodeIDs {
  41. hashes := updatedNodeAndHashes[id]
  42. updatedNodeAndHashes[id] = append(hashes, fileHash)
  43. }
  44. }
  45. var agtChkEntries []AgentCheckCacheTaskEntry
  46. for nodeID, hashes := range updatedNodeAndHashes {
  47. agtChkEntries = append(agtChkEntries, NewAgentCheckCacheTaskEntry(nodeID, hashes))
  48. }
  49. // 新任务继承本任务的执行设定(紧急任务依然保持紧急任务)
  50. execCtx.Executor.Post(NewAgentCheckCacheTask(agtChkEntries), myOpts)
  51. }
  52. func (t *CheckRepCountTask) checkOneRepCount(fileHash string, execCtx *ExecuteContext) ([]int, error) {
  53. var updatedNodeIDs []int
  54. err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
  55. repMaxCnt, err := mysql.ObjectRep.GetFileMaxRepCount(tx, fileHash)
  56. if err != nil {
  57. return fmt.Errorf("get file max rep count failed, err: %w", err)
  58. }
  59. blkCnt, err := mysql.ObjectBlock.CountBlockWithHash(tx, fileHash)
  60. if err != nil {
  61. return fmt.Errorf("count block with hash failed, err: %w", err)
  62. }
  63. // 计算所需的最少备份数:
  64. // ObjectRep中期望备份数的最大值
  65. // 如果ObjectBlock存在对此文件的引用,则至少为1
  66. needRepCount := mymath.Max(repMaxCnt, mymath.Max(1, blkCnt))
  67. repNodes, err := mysql.Cache.GetCachingFileNodes(tx, fileHash)
  68. if err != nil {
  69. return fmt.Errorf("get caching file nodes failed, err: %w", err)
  70. }
  71. allNodes, err := mysql.Node.GetAllNodes(tx)
  72. if err != nil {
  73. return fmt.Errorf("get all nodes failed, err: %w", err)
  74. }
  75. var normalNodes, unavaiNodes []model.Node
  76. for _, node := range repNodes {
  77. if node.State == consts.NODE_STATE_NORMAL {
  78. normalNodes = append(normalNodes, node)
  79. } else if node.State == consts.NODE_STATE_UNAVAILABLE {
  80. unavaiNodes = append(unavaiNodes, node)
  81. }
  82. }
  83. // 如果Available的备份数超过期望备份数,则让一些节点退出
  84. if len(normalNodes) > needRepCount {
  85. delNodes := chooseDeleteAvaiRepNodes(allNodes, normalNodes, needRepCount-len(normalNodes))
  86. for _, node := range delNodes {
  87. err := mysql.Cache.ChangeState(tx, fileHash, node.NodeID, consts.CACHE_STATE_TEMP)
  88. if err != nil {
  89. return fmt.Errorf("change cache state failed, err: %w", err)
  90. }
  91. updatedNodeIDs = append(updatedNodeIDs, node.NodeID)
  92. }
  93. return nil
  94. }
  95. minAvaiNodeCnt := int(math.Ceil(float64(config.Cfg().MinAvailableRepProportion) * float64(needRepCount)))
  96. // 因为总备份数不够,而需要增加的备份数
  97. add1 := mymath.Max(0, needRepCount-len(repNodes))
  98. // 因为Available的备份数占比过少,而需要增加的备份数
  99. add2 := mymath.Max(0, minAvaiNodeCnt-len(normalNodes))
  100. // 最终需要增加的备份数,是以上两种情况的最大值
  101. finalAddCount := mymath.Max(add1, add2)
  102. if finalAddCount > 0 {
  103. newNodes := chooseNewRepNodes(allNodes, repNodes, finalAddCount)
  104. if len(newNodes) < finalAddCount {
  105. log.WithField("FileHash", fileHash).Warnf("need %d more rep nodes, but get only %d nodes", finalAddCount, len(newNodes))
  106. // TODO 节点数不够,进行一个告警
  107. }
  108. for _, node := range newNodes {
  109. err := mysql.Cache.Create(tx, fileHash, node.NodeID)
  110. if err != nil {
  111. return fmt.Errorf("create cache failed, err: %w", err)
  112. }
  113. updatedNodeIDs = append(updatedNodeIDs, node.NodeID)
  114. }
  115. }
  116. return nil
  117. })
  118. if err != nil {
  119. return nil, err
  120. }
  121. return updatedNodeIDs, nil
  122. }
  123. func chooseNewRepNodes(allNodes []model.Node, curRepNodes []model.Node, newCount int) []model.Node {
  124. noRepNodes := lo.Reject(allNodes, func(node model.Node, index int) bool {
  125. return lo.ContainsBy(curRepNodes, func(n model.Node) bool { return node.NodeID == n.NodeID }) ||
  126. node.State != consts.NODE_STATE_NORMAL
  127. })
  128. repNodeLocationIDs := make(map[int]bool)
  129. for _, node := range curRepNodes {
  130. repNodeLocationIDs[node.LocationID] = true
  131. }
  132. mysort.Sort(noRepNodes, func(l, r model.Node) int {
  133. // LocationID不存在时为false,false - true < 0,所以LocationID不存在的会排在前面
  134. return mysort.CmpBool(repNodeLocationIDs[l.LocationID], repNodeLocationIDs[r.LocationID])
  135. })
  136. return noRepNodes[:mymath.Min(newCount, len(noRepNodes))]
  137. }
  138. func chooseDeleteAvaiRepNodes(allNodes []model.Node, curAvaiRepNodes []model.Node, delCount int) []model.Node {
  139. // 按照地域ID分组
  140. locationGroupedNodes := make(map[int][]model.Node)
  141. for _, node := range curAvaiRepNodes {
  142. nodes := locationGroupedNodes[node.LocationID]
  143. nodes = append(nodes, node)
  144. locationGroupedNodes[node.LocationID] = nodes
  145. }
  146. // 每次从每个分组中取出一个元素放入结果数组,并将这个元素从分组中删除
  147. // 最后结果数组中的元素会按照地域交错循环排列,比如:ABCABCBCC。同时还有一个特征:靠后的循环节中的元素都来自于元素数多的分组
  148. // 将结果数组反转(此处是用存放时就逆序的形式实现),就把元素数多的分组提前了,此时从头部取出要删除的节点即可
  149. alternatedNodes := make([]model.Node, len(curAvaiRepNodes))
  150. for i := len(curAvaiRepNodes) - 1; i >= 0; {
  151. for id, nodes := range locationGroupedNodes {
  152. alternatedNodes[i] = nodes[0]
  153. if len(nodes) == 1 {
  154. delete(locationGroupedNodes, id)
  155. } else {
  156. locationGroupedNodes[id] = nodes[1:]
  157. }
  158. // 放置一个元素就移动一下下一个存放点
  159. i--
  160. }
  161. }
  162. return alternatedNodes[:mymath.Min(delCount, len(alternatedNodes))]
  163. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。