You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

change_redundancy.go 4.9 kB

6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package ticktock
  2. import (
  3. "fmt"
  4. "time"
  5. "gitlink.org.cn/cloudream/common/pkgs/logger"
  6. "gitlink.org.cn/cloudream/common/utils/reflect2"
  7. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  8. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  9. "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap"
  10. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder"
  11. )
  // Batch sizes used by the ChangeRedundancy job when it pages through the
  // database.
  const (
  	// BatchGetPackageDetailCount is the count passed to
  	// Package().BatchGetDetailPaged for each page of package details.
  	BatchGetPackageDetailCount = 100

  	// BatchGetObjectDetailCount is the count passed to
  	// Object().BatchGetDetailsPaged for each page of object details.
  	BatchGetObjectDetailCount = 1000
  )
  // ChangeRedundancy is a TickTock job that walks all packages and re-evaluates
  // the redundancy of their objects. The type itself holds no fields; per-run
  // state lives in changeRedundancyContext.
  type ChangeRedundancy struct{}
  18. func (j *ChangeRedundancy) Name() string {
  19. return reflect2.TypeNameOf[ChangeRedundancy]()
  20. }
  21. func (j *ChangeRedundancy) Execute(t *TickTock) {
  22. log := logger.WithType[ChangeRedundancy]("TickTock")
  23. startTime := time.Now()
  24. log.Infof("job start")
  25. defer func() {
  26. log.Infof("job end, time: %v", time.Since(startTime))
  27. }()
  28. ctx := &changeRedundancyContext{
  29. ticktock: t,
  30. allUserSpaces: make(map[clitypes.UserSpaceID]*userSpaceLoadInfo),
  31. }
  32. spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx())
  33. if err != nil {
  34. log.Warnf("get user space ids: %v", err)
  35. return
  36. }
  37. spaces := t.spaceMeta.GetMany(spaceIDs)
  38. for _, space := range spaces {
  39. if space == nil {
  40. continue
  41. }
  42. if space.MasterHub == nil {
  43. continue
  44. }
  45. ctx.allUserSpaces[space.UserSpace.UserSpaceID] = &userSpaceLoadInfo{
  46. UserSpace: space,
  47. }
  48. }
  49. if len(ctx.allUserSpaces) == 0 {
  50. log.Warnf("no user space found")
  51. return
  52. }
  53. lastPkgID := clitypes.PackageID(0)
  54. for {
  55. pkgs, err := db.DoTx21(t.db, t.db.Package().BatchGetDetailPaged, lastPkgID, BatchGetPackageDetailCount)
  56. if err != nil {
  57. log.Warnf("get package details: %v", err)
  58. return
  59. }
  60. if len(pkgs) == 0 {
  61. break
  62. }
  63. lastPkgID = pkgs[len(pkgs)-1].Package.PackageID
  64. for _, p := range pkgs {
  65. err := j.changeOne(ctx, p)
  66. if err != nil {
  67. log.Warnf("change redundancy: %v", err)
  68. return
  69. }
  70. }
  71. }
  72. }
  73. type changeRedundancyContext struct {
  74. ticktock *TickTock
  75. allUserSpaces map[clitypes.UserSpaceID]*userSpaceLoadInfo
  76. mostBlockStgIDs []clitypes.UserSpaceID
  77. }
  78. type userSpaceLoadInfo struct {
  79. UserSpace *clitypes.UserSpaceDetail
  80. AccessAmount float64
  81. }
  82. func (j *ChangeRedundancy) changeOne(ctx *changeRedundancyContext, pkg clitypes.PackageDetail) error {
  83. log := logger.WithType[ChangeRedundancy]("TickTock")
  84. db2 := ctx.ticktock.db
  85. // allUserSpaces是复用的,所以需要先清空
  86. for _, space := range ctx.allUserSpaces {
  87. space.AccessAmount = 0
  88. }
  89. pkgAccessStats, err := db2.PackageAccessStat().GetByPackageID(db2.DefCtx(), pkg.Package.PackageID)
  90. if err != nil {
  91. return fmt.Errorf("get package access stats: %w", err)
  92. }
  93. for _, stat := range pkgAccessStats {
  94. info, ok := ctx.allUserSpaces[stat.UserSpaceID]
  95. if !ok {
  96. continue
  97. }
  98. info.AccessAmount = stat.Amount
  99. }
  100. lastObjID := clitypes.ObjectID(0)
  101. for {
  102. objs, err := db.DoTx31(db2, db2.Object().BatchGetDetailsPaged, pkg.Package.PackageID, lastObjID, BatchGetObjectDetailCount)
  103. if err != nil {
  104. return fmt.Errorf("get object details: %w", err)
  105. }
  106. if len(objs) == 0 {
  107. break
  108. }
  109. lastObjID = objs[len(objs)-1].Object.ObjectID
  110. reen := ctx.ticktock.pubLock.BeginReentrant()
  111. var allUpdatings []db.UpdatingObjectRedundancy
  112. var allSysEvts []datamap.SysEventBody
  113. ctx.mostBlockStgIDs = j.summaryRepObjectBlockUserSpaces(ctx, objs, 2)
  114. var willShrinks []clitypes.ObjectDetail
  115. for _, obj := range objs {
  116. newRed, selectedSpaces := j.chooseRedundancy(ctx, obj)
  117. // 冗余策略不需要调整,就检查是否需要收缩
  118. if newRed == nil {
  119. willShrinks = append(willShrinks, obj)
  120. continue
  121. }
  122. reqBlder := reqbuilder.NewBuilder()
  123. for _, space := range selectedSpaces {
  124. reqBlder.Shard().Buzy(space.UserSpace.UserSpace.UserSpaceID)
  125. }
  126. err := reen.Lock(reqBlder.Build())
  127. if err != nil {
  128. log.WithField("ObjectID", obj.Object.ObjectID).Warnf("acquire lock: %s", err.Error())
  129. continue
  130. }
  131. updating, evt, err := j.doChangeRedundancy(ctx, obj, newRed, selectedSpaces)
  132. if updating != nil {
  133. allUpdatings = append(allUpdatings, *updating)
  134. }
  135. if evt != nil {
  136. allSysEvts = append(allSysEvts, evt)
  137. }
  138. if err != nil {
  139. log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error())
  140. continue
  141. }
  142. }
  143. udpatings, sysEvts, err := j.doRedundancyShrink(ctx, pkg, willShrinks, reen)
  144. if err != nil {
  145. log.Warnf("redundancy shrink: %s", err.Error())
  146. } else {
  147. allUpdatings = append(allUpdatings, udpatings...)
  148. allSysEvts = append(allSysEvts, sysEvts...)
  149. }
  150. if len(allUpdatings) > 0 {
  151. err := db.DoTx10(db2, db2.Object().BatchUpdateRedundancy, allUpdatings)
  152. if err != nil {
  153. reen.Unlock()
  154. log.Warnf("update object redundancy: %s", err.Error())
  155. return err
  156. }
  157. }
  158. reen.Unlock()
  159. for _, e := range allSysEvts {
  160. ctx.ticktock.evtPub.Publish(e)
  161. }
  162. }
  163. return nil
  164. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。