You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

change_redundancy.go 4.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package ticktock
  2. import (
  3. "fmt"
  4. "time"
  5. "gitlink.org.cn/cloudream/common/pkgs/logger"
  6. "gitlink.org.cn/cloudream/common/utils/reflect2"
  7. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  8. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  9. "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap"
  10. )
  11. const (
  12. BatchGetPackageDetailCount = 100
  13. BatchGetObjectDetailCount = 1000
  14. )
  15. type ChangeRedundancy struct {
  16. }
  17. func (j *ChangeRedundancy) Name() string {
  18. return reflect2.TypeNameOf[ChangeRedundancy]()
  19. }
  20. func (j *ChangeRedundancy) Execute(t *TickTock) {
  21. log := logger.WithType[ChangeRedundancy]("TickTock")
  22. startTime := time.Now()
  23. log.Debugf("job start")
  24. defer func() {
  25. log.Debugf("job end, time: %v", time.Since(startTime))
  26. }()
  27. ctx := &changeRedundancyContext{
  28. ticktock: t,
  29. allUserSpaces: make(map[clitypes.UserSpaceID]*userSpaceLoadInfo),
  30. }
  31. spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx())
  32. if err != nil {
  33. log.Warnf("get user space ids: %v", err)
  34. return
  35. }
  36. spaces := t.spaceMeta.GetMany(spaceIDs)
  37. for _, space := range spaces {
  38. if space == nil {
  39. continue
  40. }
  41. ctx.allUserSpaces[space.UserSpace.UserSpaceID] = &userSpaceLoadInfo{
  42. UserSpace: space,
  43. }
  44. }
  45. lastPkgID := clitypes.PackageID(0)
  46. for {
  47. pkgs, err := db.DoTx21(t.db, t.db.Package().BatchGetDetailPaged, lastPkgID, BatchGetPackageDetailCount)
  48. if err != nil {
  49. log.Warnf("get package details: %v", err)
  50. return
  51. }
  52. if len(pkgs) == 0 {
  53. break
  54. }
  55. lastPkgID = pkgs[len(pkgs)-1].Package.PackageID
  56. for _, p := range pkgs {
  57. err := j.changeOne(ctx, p)
  58. if err != nil {
  59. log.Warnf("change redundancy: %v", err)
  60. return
  61. }
  62. }
  63. }
  64. }
  65. type changeRedundancyContext struct {
  66. ticktock *TickTock
  67. allUserSpaces map[clitypes.UserSpaceID]*userSpaceLoadInfo
  68. mostBlockStgIDs []clitypes.UserSpaceID
  69. }
  70. type userSpaceLoadInfo struct {
  71. UserSpace *clitypes.UserSpaceDetail
  72. AccessAmount float64
  73. }
  74. func (j *ChangeRedundancy) changeOne(ctx *changeRedundancyContext, pkg clitypes.PackageDetail) error {
  75. log := logger.WithType[ChangeRedundancy]("TickTock")
  76. db2 := ctx.ticktock.db
  77. // allUserSpaces是复用的,所以需要先清空
  78. for _, space := range ctx.allUserSpaces {
  79. space.AccessAmount = 0
  80. }
  81. pkgAccessStats, err := db2.PackageAccessStat().GetByPackageID(db2.DefCtx(), pkg.Package.PackageID)
  82. if err != nil {
  83. return fmt.Errorf("get package access stats: %w", err)
  84. }
  85. for _, stat := range pkgAccessStats {
  86. info, ok := ctx.allUserSpaces[stat.UserSpaceID]
  87. if !ok {
  88. continue
  89. }
  90. info.AccessAmount = stat.Amount
  91. }
  92. lastObjID := clitypes.ObjectID(0)
  93. for {
  94. objs, err := db.DoTx31(db2, db2.Object().BatchGetDetailsPaged, pkg.Package.PackageID, lastObjID, BatchGetObjectDetailCount)
  95. if err != nil {
  96. return fmt.Errorf("get object details: %w", err)
  97. }
  98. if len(objs) == 0 {
  99. break
  100. }
  101. lastObjID = objs[len(objs)-1].Object.ObjectID
  102. var allUpdatings []db.UpdatingObjectRedundancy
  103. var allSysEvts []datamap.SysEventBody
  104. ctx.mostBlockStgIDs = j.summaryRepObjectBlockUserSpaces(ctx, objs, 2)
  105. // // TODO 加锁
  106. // builder := reqbuilder.NewBuilder()
  107. // for _, storage := range newRepStgs {
  108. // builder.Shard().Buzy(storage.Storage.Storage.StorageID)
  109. // }
  110. // for _, storage := range newECStgs {
  111. // builder.Shard().Buzy(storage.Storage.Storage.StorageID)
  112. // }
  113. // mutex, err := builder.MutexLock(execCtx.Args.DistLock)
  114. // if err != nil {
  115. // log.Warnf("acquiring dist lock: %s", err.Error())
  116. // return
  117. // }
  118. // defer mutex.Unlock()
  119. var willShrinks []clitypes.ObjectDetail
  120. for _, obj := range objs {
  121. newRed, selectedStorages := j.chooseRedundancy(ctx, obj)
  122. // 冗余策略不需要调整,就检查是否需要收缩
  123. if newRed == nil {
  124. willShrinks = append(willShrinks, obj)
  125. continue
  126. }
  127. updating, evt, err := j.doChangeRedundancy(ctx, obj, newRed, selectedStorages)
  128. if updating != nil {
  129. allUpdatings = append(allUpdatings, *updating)
  130. }
  131. if evt != nil {
  132. allSysEvts = append(allSysEvts, evt)
  133. }
  134. if err != nil {
  135. log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error())
  136. }
  137. }
  138. udpatings, sysEvts, err := j.doRedundancyShrink(ctx, pkg, willShrinks)
  139. if err != nil {
  140. log.Warnf("redundancy shrink: %s", err.Error())
  141. return err
  142. }
  143. allUpdatings = append(allUpdatings, udpatings...)
  144. allSysEvts = append(allSysEvts, sysEvts...)
  145. if len(allUpdatings) > 0 {
  146. err := db.DoTx10(db2, db2.Object().BatchUpdateRedundancy, allUpdatings)
  147. if err != nil {
  148. log.Warnf("update object redundancy: %s", err.Error())
  149. return err
  150. }
  151. }
  152. for _, e := range allSysEvts {
  153. ctx.ticktock.evtPub.Publish(e)
  154. }
  155. }
  156. return nil
  157. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。