You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

redundancy_recover.go 40 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224
  1. package ticktock
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/utils/math2"
  10. "gitlink.org.cn/cloudream/common/utils/sort2"
  11. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  12. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  13. "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap"
  14. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  15. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  17. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc"
  18. lrcparser "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/parser"
  19. cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
  20. )
  21. func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail) (clitypes.Redundancy, []*userSpaceUsageInfo) {
  22. switch obj.Object.Redundancy.(type) {
  23. case *clitypes.NoneRedundancy:
  24. if obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold {
  25. newStgs := t.chooseNewUserSpacesForEC(ctx, &clitypes.DefaultECRedundancy)
  26. return &clitypes.DefaultECRedundancy, newStgs
  27. }
  28. return &clitypes.DefaultRepRedundancy, t.chooseNewUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy)
  29. case *clitypes.RepRedundancy:
  30. if obj.Object.Size >= ctx.ticktock.cfg.ECFileSizeThreshold {
  31. newStgs := t.chooseNewUserSpacesForEC(ctx, &clitypes.DefaultECRedundancy)
  32. return &clitypes.DefaultECRedundancy, newStgs
  33. }
  34. newSpaces := t.rechooseUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy)
  35. for _, s := range newSpaces {
  36. if !obj.ContainsBlock(0, s.UserSpace.UserSpace.UserSpaceID) && !obj.ContainsPinned(s.UserSpace.UserSpace.UserSpaceID) {
  37. return &clitypes.DefaultRepRedundancy, newSpaces
  38. }
  39. }
  40. return nil, nil
  41. case *clitypes.ECRedundancy:
  42. if obj.Object.Size < ctx.ticktock.cfg.ECFileSizeThreshold {
  43. return &clitypes.DefaultRepRedundancy, t.chooseNewUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy)
  44. }
  45. newSpaces := t.rechooseUserSpacesForEC(ctx, obj, &clitypes.DefaultECRedundancy)
  46. for i, s := range newSpaces {
  47. if !obj.ContainsBlock(i, s.UserSpace.UserSpace.UserSpaceID) {
  48. return &clitypes.DefaultECRedundancy, newSpaces
  49. }
  50. }
  51. return nil, nil
  52. case *clitypes.LRCRedundancy:
  53. newLRCStgs := t.rechooseUserSpacesForLRC(ctx, obj, &clitypes.DefaultLRCRedundancy)
  54. for i, s := range newLRCStgs {
  55. if !obj.ContainsBlock(i, s.UserSpace.UserSpace.UserSpaceID) {
  56. return &clitypes.DefaultLRCRedundancy, newLRCStgs
  57. }
  58. }
  59. return nil, nil
  60. }
  61. return nil, nil
  62. }
  63. func (t *ChangeRedundancy) doChangeRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, newRed clitypes.Redundancy, selectedUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  64. log := logger.WithType[ChangeRedundancy]("TickTock")
  65. var updating *db.UpdatingObjectRedundancy
  66. var evt datamap.SysEventBody
  67. var err error
  68. switch srcRed := obj.Object.Redundancy.(type) {
  69. case *clitypes.NoneRedundancy:
  70. switch newRed := newRed.(type) {
  71. case *clitypes.RepRedundancy:
  72. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
  73. updating, evt, err = t.noneToRep(ctx, obj, newRed, selectedUserSpaces)
  74. case *clitypes.ECRedundancy:
  75. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
  76. updating, evt, err = t.noneToEC(ctx, obj, newRed, selectedUserSpaces)
  77. case *clitypes.LRCRedundancy:
  78. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
  79. updating, evt, err = t.noneToLRC(ctx, obj, newRed, selectedUserSpaces)
  80. case *clitypes.SegmentRedundancy:
  81. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> segment")
  82. updating, evt, err = t.noneToSeg(ctx, obj, newRed, selectedUserSpaces)
  83. }
  84. case *clitypes.RepRedundancy:
  85. switch newRed := newRed.(type) {
  86. case *clitypes.RepRedundancy:
  87. updating, evt, err = t.repToRep(ctx, obj, srcRed, selectedUserSpaces)
  88. case *clitypes.ECRedundancy:
  89. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
  90. updating, evt, err = t.repToEC(ctx, obj, newRed, selectedUserSpaces)
  91. }
  92. case *clitypes.ECRedundancy:
  93. switch newRed := newRed.(type) {
  94. case *clitypes.RepRedundancy:
  95. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
  96. updating, evt, err = t.ecToRep(ctx, obj, srcRed, newRed, selectedUserSpaces)
  97. case *clitypes.ECRedundancy:
  98. updating, evt, err = t.ecToEC(ctx, obj, srcRed, newRed, selectedUserSpaces)
  99. }
  100. case *clitypes.LRCRedundancy:
  101. switch newRed := newRed.(type) {
  102. case *clitypes.LRCRedundancy:
  103. updating, evt, err = t.lrcToLRC(ctx, obj, srcRed, newRed, selectedUserSpaces)
  104. }
  105. }
  106. return updating, evt, err
  107. }
  108. // 统计每个对象块所在的节点,选出块最多的不超过userspaceCnt个节点
  109. func (t *ChangeRedundancy) summaryRepObjectBlockUserSpaces(ctx *changeRedundancyContext, objs []clitypes.ObjectDetail, userspaceCnt int) []clitypes.UserSpaceID {
  110. type stgBlocks struct {
  111. UserSpaceID clitypes.UserSpaceID
  112. Count int
  113. }
  114. stgBlocksMap := make(map[clitypes.UserSpaceID]*stgBlocks)
  115. for _, obj := range objs {
  116. shouldUseEC := obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold
  117. if _, ok := obj.Object.Redundancy.(*clitypes.RepRedundancy); ok && !shouldUseEC {
  118. for _, block := range obj.Blocks {
  119. if _, ok := stgBlocksMap[block.UserSpaceID]; !ok {
  120. stgBlocksMap[block.UserSpaceID] = &stgBlocks{
  121. UserSpaceID: block.UserSpaceID,
  122. Count: 0,
  123. }
  124. }
  125. stgBlocksMap[block.UserSpaceID].Count++
  126. }
  127. }
  128. }
  129. userspaces := lo.Values(stgBlocksMap)
  130. sort2.Sort(userspaces, func(left *stgBlocks, right *stgBlocks) int {
  131. return right.Count - left.Count
  132. })
  133. ids := lo.Map(userspaces, func(item *stgBlocks, idx int) clitypes.UserSpaceID { return item.UserSpaceID })
  134. if len(ids) > userspaceCnt {
  135. ids = ids[:userspaceCnt]
  136. }
  137. return ids
  138. }
  139. func (t *ChangeRedundancy) chooseNewUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo {
  140. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  141. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  142. })
  143. return t.chooseSoManyUserSpaces(red.RepCount, sortedUserSpaces)
  144. }
  145. func (t *ChangeRedundancy) chooseNewUserSpacesForEC(ctx *changeRedundancyContext, red *clitypes.ECRedundancy) []*userSpaceUsageInfo {
  146. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  147. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  148. })
  149. return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
  150. }
  151. func (t *ChangeRedundancy) chooseNewUserSpacesForLRC(ctx *changeRedundancyContext, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo {
  152. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  153. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  154. })
  155. return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
  156. }
  157. func (t *ChangeRedundancy) chooseNewUserSpacesForSeg(ctx *changeRedundancyContext, segCount int) []*userSpaceUsageInfo {
  158. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  159. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  160. })
  161. return t.chooseSoManyUserSpaces(segCount, sortedUserSpaces)
  162. }
  163. func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo {
  164. type rechooseUserSpace struct {
  165. *userSpaceUsageInfo
  166. HasBlock bool
  167. }
  168. var rechooseStgs []*rechooseUserSpace
  169. for _, stg := range ctx.allUserSpaces {
  170. hasBlock := false
  171. for _, id := range ctx.mostBlockStgIDs {
  172. if id == stg.UserSpace.UserSpace.UserSpaceID {
  173. hasBlock = true
  174. break
  175. }
  176. }
  177. rechooseStgs = append(rechooseStgs, &rechooseUserSpace{
  178. userSpaceUsageInfo: stg,
  179. HasBlock: hasBlock,
  180. })
  181. }
  182. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseUserSpace, right *rechooseUserSpace) int {
  183. // 已经缓存了文件块的节点优先选择
  184. v := sort2.CmpBool(right.HasBlock, left.HasBlock)
  185. if v != 0 {
  186. return v
  187. }
  188. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  189. })
  190. return t.chooseSoManyUserSpaces(red.RepCount, lo.Map(sortedStgs, func(userspace *rechooseUserSpace, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  191. }
  192. func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy) []*userSpaceUsageInfo {
  193. type rechooseStg struct {
  194. *userSpaceUsageInfo
  195. CachedBlockIndex int
  196. }
  197. var rechooseStgs []*rechooseStg
  198. for _, stg := range ctx.allUserSpaces {
  199. cachedBlockIndex := -1
  200. for _, block := range obj.Blocks {
  201. if block.UserSpaceID == stg.UserSpace.UserSpace.UserSpaceID {
  202. cachedBlockIndex = block.Index
  203. break
  204. }
  205. }
  206. rechooseStgs = append(rechooseStgs, &rechooseStg{
  207. userSpaceUsageInfo: stg,
  208. CachedBlockIndex: cachedBlockIndex,
  209. })
  210. }
  211. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  212. // 已经缓存了文件块的节点优先选择
  213. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  214. if v != 0 {
  215. return v
  216. }
  217. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  218. })
  219. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  220. return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  221. }
  222. func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo {
  223. type rechooseStg struct {
  224. *userSpaceUsageInfo
  225. CachedBlockIndex int
  226. }
  227. var rechooseStgs []*rechooseStg
  228. for _, stg := range ctx.allUserSpaces {
  229. cachedBlockIndex := -1
  230. for _, block := range obj.Blocks {
  231. if block.UserSpaceID == stg.UserSpace.UserSpace.UserSpaceID {
  232. cachedBlockIndex = block.Index
  233. break
  234. }
  235. }
  236. rechooseStgs = append(rechooseStgs, &rechooseStg{
  237. userSpaceUsageInfo: stg,
  238. CachedBlockIndex: cachedBlockIndex,
  239. })
  240. }
  241. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  242. // 已经缓存了文件块的节点优先选择
  243. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  244. if v != 0 {
  245. return v
  246. }
  247. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  248. })
  249. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  250. return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  251. }
  252. func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceUsageInfo) []*userSpaceUsageInfo {
  253. repeateCount := (count + len(stgs) - 1) / len(stgs)
  254. extendStgs := make([]*userSpaceUsageInfo, repeateCount*len(stgs))
  255. // 使用复制的方式将节点数扩充到要求的数量
  256. // 复制之后的结构:ABCD -> AAABBBCCCDDD
  257. for p := 0; p < repeateCount; p++ {
  258. for i, userspace := range stgs {
  259. putIdx := i*repeateCount + p
  260. extendStgs[putIdx] = userspace
  261. }
  262. }
  263. extendStgs = extendStgs[:count]
  264. var chosen []*userSpaceUsageInfo
  265. for len(chosen) < count {
  266. // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
  267. chosenLocations := make(map[cortypes.Location]bool)
  268. for i, stg := range extendStgs {
  269. if stg == nil {
  270. continue
  271. }
  272. if chosenLocations[stg.UserSpace.UserSpace.Storage.GetLocation()] {
  273. continue
  274. }
  275. chosen = append(chosen, stg)
  276. chosenLocations[stg.UserSpace.UserSpace.Storage.GetLocation()] = true
  277. extendStgs[i] = nil
  278. }
  279. }
  280. return chosen
  281. }
  282. func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  283. if len(obj.Blocks) == 0 {
  284. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  285. }
  286. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  287. if !ok {
  288. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  289. }
  290. // 如果选择的备份节点都是同一个,那么就只要上传一次
  291. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  292. ft := ioswitch2.NewFromTo()
  293. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  294. for i, stg := range uploadStgs {
  295. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  296. }
  297. plans := exec.NewPlanBuilder()
  298. err := parser.Parse(ft, plans)
  299. if err != nil {
  300. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  301. }
  302. // TODO 添加依赖
  303. execCtx := exec.NewExecContext()
  304. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  305. ret, err := plans.Execute(execCtx).Wait(context.Background())
  306. if err != nil {
  307. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  308. }
  309. var blocks []clitypes.ObjectBlock
  310. var blockChgs []datamap.BlockChange
  311. for i, stg := range uploadStgs {
  312. r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  313. blocks = append(blocks, clitypes.ObjectBlock{
  314. ObjectID: obj.Object.ObjectID,
  315. Index: 0,
  316. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  317. FileHash: r.Hash,
  318. Size: r.Size,
  319. })
  320. blockChgs = append(blockChgs, &datamap.BlockChangeClone{
  321. BlockType: datamap.BlockTypeRaw,
  322. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  323. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  324. TransferBytes: 1,
  325. })
  326. }
  327. // 删除原本的文件块
  328. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  329. Index: 0,
  330. UserSpaceID: obj.Blocks[0].UserSpaceID,
  331. })
  332. return &db.UpdatingObjectRedundancy{
  333. ObjectID: obj.Object.ObjectID,
  334. FileHash: obj.Object.FileHash,
  335. Size: obj.Object.Size,
  336. Redundancy: red,
  337. Blocks: blocks,
  338. }, &datamap.BodyBlockTransfer{
  339. ObjectID: obj.Object.ObjectID,
  340. PackageID: obj.Object.PackageID,
  341. BlockChanges: blockChgs,
  342. }, nil
  343. }
  344. func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  345. if len(obj.Blocks) == 0 {
  346. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
  347. }
  348. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  349. if !ok {
  350. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  351. }
  352. ft := ioswitch2.NewFromTo()
  353. ft.ECParam = red
  354. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  355. for i := 0; i < red.N; i++ {
  356. ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].UserSpace, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  357. }
  358. plans := exec.NewPlanBuilder()
  359. err := parser.Parse(ft, plans)
  360. if err != nil {
  361. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  362. }
  363. execCtx := exec.NewExecContext()
  364. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  365. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  366. if err != nil {
  367. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  368. }
  369. var blocks []clitypes.ObjectBlock
  370. var evtTargetBlocks []datamap.Block
  371. var evtBlockTrans []datamap.DataTransfer
  372. for i := 0; i < red.N; i++ {
  373. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  374. blocks = append(blocks, clitypes.ObjectBlock{
  375. ObjectID: obj.Object.ObjectID,
  376. Index: i,
  377. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  378. FileHash: r.Hash,
  379. Size: r.Size,
  380. })
  381. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  382. BlockType: datamap.BlockTypeEC,
  383. Index: i,
  384. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  385. })
  386. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  387. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  388. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  389. TransferBytes: 1,
  390. })
  391. }
  392. return &db.UpdatingObjectRedundancy{
  393. ObjectID: obj.Object.ObjectID,
  394. FileHash: obj.Object.FileHash,
  395. Size: obj.Object.Size,
  396. Redundancy: red,
  397. Blocks: blocks,
  398. },
  399. &datamap.BodyBlockTransfer{
  400. ObjectID: obj.Object.ObjectID,
  401. PackageID: obj.Object.PackageID,
  402. BlockChanges: []datamap.BlockChange{
  403. &datamap.BlockChangeEnDecode{
  404. SourceBlocks: []datamap.Block{{
  405. BlockType: datamap.BlockTypeRaw,
  406. UserSpaceID: obj.Blocks[0].UserSpaceID,
  407. }},
  408. TargetBlocks: evtTargetBlocks,
  409. DataTransfers: evtBlockTrans,
  410. },
  411. // 删除原本的文件块
  412. &datamap.BlockChangeDeleted{
  413. Index: 0,
  414. UserSpaceID: obj.Blocks[0].UserSpaceID,
  415. },
  416. },
  417. }, nil
  418. }
  419. func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  420. if len(obj.Blocks) == 0 {
  421. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
  422. }
  423. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  424. if !ok {
  425. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  426. }
  427. var toes []ioswitchlrc.To
  428. for i := 0; i < red.N; i++ {
  429. toes = append(toes, ioswitchlrc.NewToStorage(*uploadStgs[i].UserSpace, i, fmt.Sprintf("%d", i)))
  430. }
  431. plans := exec.NewPlanBuilder()
  432. err := lrcparser.Encode(ioswitchlrc.NewFromStorage(obj.Object.FileHash, *srcStg.UserSpace, -1), toes, plans)
  433. if err != nil {
  434. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  435. }
  436. execCtx := exec.NewExecContext()
  437. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  438. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  439. if err != nil {
  440. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  441. }
  442. var blocks []clitypes.ObjectBlock
  443. var evtTargetBlocks []datamap.Block
  444. var evtBlockTrans []datamap.DataTransfer
  445. for i := 0; i < red.N; i++ {
  446. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  447. blocks = append(blocks, clitypes.ObjectBlock{
  448. ObjectID: obj.Object.ObjectID,
  449. Index: i,
  450. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  451. FileHash: r.Hash,
  452. Size: r.Size,
  453. })
  454. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  455. BlockType: datamap.BlockTypeEC,
  456. Index: i,
  457. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  458. })
  459. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  460. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  461. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  462. TransferBytes: 1,
  463. })
  464. }
  465. return &db.UpdatingObjectRedundancy{
  466. ObjectID: obj.Object.ObjectID,
  467. FileHash: obj.Object.FileHash,
  468. Size: obj.Object.Size,
  469. Redundancy: red,
  470. Blocks: blocks,
  471. },
  472. &datamap.BodyBlockTransfer{
  473. ObjectID: obj.Object.ObjectID,
  474. PackageID: obj.Object.PackageID,
  475. BlockChanges: []datamap.BlockChange{
  476. &datamap.BlockChangeEnDecode{
  477. SourceBlocks: []datamap.Block{{
  478. BlockType: datamap.BlockTypeRaw,
  479. UserSpaceID: obj.Blocks[0].UserSpaceID,
  480. }},
  481. TargetBlocks: evtTargetBlocks,
  482. DataTransfers: evtBlockTrans,
  483. },
  484. // 删除原本的文件块
  485. &datamap.BlockChangeDeleted{
  486. Index: 0,
  487. UserSpaceID: obj.Blocks[0].UserSpaceID,
  488. },
  489. },
  490. },
  491. nil
  492. }
  493. func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.SegmentRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  494. if len(obj.Blocks) == 0 {
  495. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  496. }
  497. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  498. if !ok {
  499. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  500. }
  501. // 如果选择的备份节点都是同一个,那么就只要上传一次
  502. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  503. ft := ioswitch2.NewFromTo()
  504. ft.SegmentParam = red
  505. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  506. for i, stg := range uploadStgs {
  507. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i)))
  508. }
  509. plans := exec.NewPlanBuilder()
  510. err := parser.Parse(ft, plans)
  511. if err != nil {
  512. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  513. }
  514. // TODO 添加依赖
  515. execCtx := exec.NewExecContext()
  516. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  517. ret, err := plans.Execute(execCtx).Wait(context.Background())
  518. if err != nil {
  519. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  520. }
  521. var blocks []clitypes.ObjectBlock
  522. var evtTargetBlocks []datamap.Block
  523. var evtBlockTrans []datamap.DataTransfer
  524. for i, stg := range uploadStgs {
  525. r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  526. blocks = append(blocks, clitypes.ObjectBlock{
  527. ObjectID: obj.Object.ObjectID,
  528. Index: i,
  529. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  530. FileHash: r.Hash,
  531. Size: r.Size,
  532. })
  533. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  534. BlockType: datamap.BlockTypeSegment,
  535. Index: i,
  536. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  537. })
  538. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  539. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  540. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  541. TransferBytes: 1,
  542. })
  543. }
  544. return &db.UpdatingObjectRedundancy{
  545. ObjectID: obj.Object.ObjectID,
  546. FileHash: obj.Object.FileHash,
  547. Size: obj.Object.Size,
  548. Redundancy: red,
  549. Blocks: blocks,
  550. },
  551. &datamap.BodyBlockTransfer{
  552. ObjectID: obj.Object.ObjectID,
  553. PackageID: obj.Object.PackageID,
  554. BlockChanges: []datamap.BlockChange{
  555. &datamap.BlockChangeEnDecode{
  556. SourceBlocks: []datamap.Block{{
  557. BlockType: datamap.BlockTypeRaw,
  558. UserSpaceID: obj.Blocks[0].UserSpaceID,
  559. }},
  560. TargetBlocks: evtTargetBlocks,
  561. DataTransfers: evtBlockTrans,
  562. },
  563. // 删除原本的文件块
  564. &datamap.BlockChangeDeleted{
  565. Index: 0,
  566. UserSpaceID: obj.Blocks[0].UserSpaceID,
  567. },
  568. },
  569. },
  570. nil
  571. }
  572. func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  573. if len(obj.Blocks) == 0 {
  574. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  575. }
  576. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  577. if !ok {
  578. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  579. }
  580. // 如果选择的备份节点都是同一个,那么就只要上传一次
  581. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  582. ft := ioswitch2.NewFromTo()
  583. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  584. for i, stg := range uploadStgs {
  585. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  586. }
  587. plans := exec.NewPlanBuilder()
  588. err := parser.Parse(ft, plans)
  589. if err != nil {
  590. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  591. }
  592. // TODO 添加依赖
  593. execCtx := exec.NewExecContext()
  594. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  595. ret, err := plans.Execute(execCtx).Wait(context.Background())
  596. if err != nil {
  597. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  598. }
  599. var blocks []clitypes.ObjectBlock
  600. var blockChgs []datamap.BlockChange
  601. for i, stg := range uploadStgs {
  602. r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  603. blocks = append(blocks, clitypes.ObjectBlock{
  604. ObjectID: obj.Object.ObjectID,
  605. Index: 0,
  606. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  607. FileHash: r.Hash,
  608. Size: r.Size,
  609. })
  610. blockChgs = append(blockChgs, &datamap.BlockChangeClone{
  611. BlockType: datamap.BlockTypeRaw,
  612. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  613. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  614. TransferBytes: 1,
  615. })
  616. }
  617. // 删除原本的文件块
  618. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  619. Index: 0,
  620. UserSpaceID: obj.Blocks[0].UserSpaceID,
  621. })
  622. return &db.UpdatingObjectRedundancy{
  623. ObjectID: obj.Object.ObjectID,
  624. FileHash: obj.Object.FileHash,
  625. Size: obj.Object.Size,
  626. Redundancy: red,
  627. Blocks: blocks,
  628. },
  629. &datamap.BodyBlockTransfer{
  630. ObjectID: obj.Object.ObjectID,
  631. PackageID: obj.Object.PackageID,
  632. BlockChanges: blockChgs,
  633. },
  634. nil
  635. }
  636. func (t *ChangeRedundancy) repToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  637. return t.noneToEC(ctx, obj, red, uploadUserSpaces)
  638. }
  639. func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  640. var chosenBlocks []clitypes.GrouppedObjectBlock
  641. var chosenBlockIndexes []int
  642. var chosenBlockStg []clitypes.UserSpaceDetail
  643. for _, block := range obj.GroupBlocks() {
  644. if len(block.UserSpaceIDs) > 0 {
  645. // TODO 考虑选择最优的节点
  646. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  647. if !ok {
  648. continue
  649. }
  650. chosenBlocks = append(chosenBlocks, block)
  651. chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
  652. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  653. }
  654. if len(chosenBlocks) == srcRed.K {
  655. break
  656. }
  657. }
  658. if len(chosenBlocks) < srcRed.K {
  659. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  660. }
  661. // 如果选择的备份节点都是同一个,那么就只要上传一次
  662. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  663. planBlder := exec.NewPlanBuilder()
  664. ft := ioswitch2.NewFromTo()
  665. ft.ECParam = srcRed
  666. for i, block := range chosenBlocks {
  667. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  668. }
  669. for i := range uploadStgs {
  670. ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.NewRange(0, obj.Object.Size)))
  671. }
  672. err := parser.Parse(ft, planBlder)
  673. if err != nil {
  674. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  675. }
  676. // TODO 添加依赖
  677. execCtx := exec.NewExecContext()
  678. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  679. ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
  680. if err != nil {
  681. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  682. }
  683. var blocks []clitypes.ObjectBlock
  684. for i := range uploadStgs {
  685. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  686. blocks = append(blocks, clitypes.ObjectBlock{
  687. ObjectID: obj.Object.ObjectID,
  688. Index: 0,
  689. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  690. FileHash: r.Hash,
  691. Size: r.Size,
  692. })
  693. }
  694. var evtSrcBlocks []datamap.Block
  695. var evtTargetBlocks []datamap.Block
  696. for i2, block := range chosenBlocks {
  697. evtSrcBlocks = append(evtSrcBlocks, datamap.Block{
  698. BlockType: datamap.BlockTypeEC,
  699. Index: block.Index,
  700. UserSpaceID: chosenBlockStg[i2].UserSpace.UserSpaceID,
  701. })
  702. }
  703. for _, stg := range uploadStgs {
  704. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  705. BlockType: datamap.BlockTypeRaw,
  706. Index: 0,
  707. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  708. })
  709. }
  710. var evtBlockTrans []datamap.DataTransfer
  711. for _, stg := range uploadStgs {
  712. for i2 := range chosenBlocks {
  713. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  714. SourceUserSpaceID: chosenBlockStg[i2].UserSpace.UserSpaceID,
  715. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  716. TransferBytes: 1,
  717. })
  718. }
  719. }
  720. var blockChgs []datamap.BlockChange
  721. blockChgs = append(blockChgs, &datamap.BlockChangeEnDecode{
  722. SourceBlocks: evtSrcBlocks,
  723. TargetBlocks: evtTargetBlocks,
  724. DataTransfers: evtBlockTrans,
  725. })
  726. for _, block := range obj.Blocks {
  727. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  728. Index: block.Index,
  729. UserSpaceID: block.UserSpaceID,
  730. })
  731. }
  732. return &db.UpdatingObjectRedundancy{
  733. ObjectID: obj.Object.ObjectID,
  734. FileHash: obj.Object.FileHash,
  735. Size: obj.Object.Size,
  736. Redundancy: tarRed,
  737. Blocks: blocks,
  738. },
  739. &datamap.BodyBlockTransfer{
  740. ObjectID: obj.Object.ObjectID,
  741. PackageID: obj.Object.PackageID,
  742. BlockChanges: blockChgs,
  743. },
  744. nil
  745. }
  746. func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  747. grpBlocks := obj.GroupBlocks()
  748. var chosenBlocks []clitypes.GrouppedObjectBlock
  749. var chosenBlockStg []clitypes.UserSpaceDetail
  750. for _, block := range grpBlocks {
  751. if len(block.UserSpaceIDs) > 0 {
  752. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  753. if !ok {
  754. continue
  755. }
  756. chosenBlocks = append(chosenBlocks, block)
  757. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  758. }
  759. if len(chosenBlocks) == srcRed.K {
  760. break
  761. }
  762. }
  763. if len(chosenBlocks) < srcRed.K {
  764. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  765. }
  766. // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  767. planBlder := exec.NewPlanBuilder()
  768. var evtSrcBlocks []datamap.Block
  769. var evtTargetBlocks []datamap.Block
  770. ft := ioswitch2.NewFromTo()
  771. ft.ECParam = srcRed
  772. for i, block := range chosenBlocks {
  773. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  774. evtSrcBlocks = append(evtSrcBlocks, datamap.Block{
  775. BlockType: datamap.BlockTypeEC,
  776. Index: block.Index,
  777. UserSpaceID: chosenBlockStg[i].UserSpace.UserSpaceID,
  778. })
  779. }
  780. var newBlocks []clitypes.ObjectBlock
  781. shouldUpdateBlocks := false
  782. for i, stg := range uploadUserSpaces {
  783. newBlock := clitypes.ObjectBlock{
  784. ObjectID: obj.Object.ObjectID,
  785. Index: i,
  786. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  787. }
  788. grp, ok := lo.Find(grpBlocks, func(grp clitypes.GrouppedObjectBlock) bool { return grp.Index == i })
  789. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  790. if ok && lo.Contains(grp.UserSpaceIDs, stg.UserSpace.UserSpace.UserSpaceID) {
  791. newBlock.FileHash = grp.FileHash
  792. newBlock.Size = grp.Size
  793. newBlocks = append(newBlocks, newBlock)
  794. continue
  795. }
  796. shouldUpdateBlocks = true
  797. // 否则就要重建出这个节点需要的块
  798. // 输出只需要自己要保存的那一块
  799. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  800. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  801. BlockType: datamap.BlockTypeEC,
  802. Index: i,
  803. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  804. })
  805. newBlocks = append(newBlocks, newBlock)
  806. }
  807. err := parser.Parse(ft, planBlder)
  808. if err != nil {
  809. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  810. }
  811. // 如果没有任何Plan,Wait会直接返回成功
  812. execCtx := exec.NewExecContext()
  813. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  814. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  815. if err != nil {
  816. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  817. }
  818. if !shouldUpdateBlocks {
  819. return nil, nil, nil
  820. }
  821. for k, v := range ret {
  822. idx, err := strconv.ParseInt(k, 10, 64)
  823. if err != nil {
  824. return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  825. }
  826. r := v.(*ops2.FileInfoValue)
  827. newBlocks[idx].FileHash = r.Hash
  828. newBlocks[idx].Size = r.Size
  829. }
  830. var evtBlockTrans []datamap.DataTransfer
  831. for _, src := range evtSrcBlocks {
  832. for _, tar := range evtTargetBlocks {
  833. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  834. SourceUserSpaceID: src.UserSpaceID,
  835. TargetUserSpaceID: tar.UserSpaceID,
  836. TransferBytes: 1,
  837. })
  838. }
  839. }
  840. var blockChgs []datamap.BlockChange
  841. for _, block := range obj.Blocks {
  842. keep := lo.ContainsBy(newBlocks, func(newBlock clitypes.ObjectBlock) bool {
  843. return newBlock.Index == block.Index && newBlock.UserSpaceID == block.UserSpaceID
  844. })
  845. if !keep {
  846. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  847. Index: block.Index,
  848. UserSpaceID: block.UserSpaceID,
  849. })
  850. }
  851. }
  852. blockChgs = append(blockChgs, &datamap.BlockChangeEnDecode{
  853. SourceBlocks: evtSrcBlocks,
  854. TargetBlocks: evtTargetBlocks,
  855. DataTransfers: evtBlockTrans,
  856. })
  857. return &db.UpdatingObjectRedundancy{
  858. ObjectID: obj.Object.ObjectID,
  859. FileHash: obj.Object.FileHash,
  860. Size: obj.Object.Size,
  861. Redundancy: tarRed,
  862. Blocks: newBlocks,
  863. },
  864. &datamap.BodyBlockTransfer{
  865. ObjectID: obj.Object.ObjectID,
  866. PackageID: obj.Object.PackageID,
  867. BlockChanges: blockChgs,
  868. },
  869. nil
  870. }
  871. func (t *ChangeRedundancy) lrcToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.LRCRedundancy, tarRed *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  872. blocksGrpByIndex := obj.GroupBlocks()
  873. var lostBlocks []int
  874. var lostBlockGrps []int
  875. canGroupReconstruct := true
  876. allBlockFlags := make([]bool, srcRed.N)
  877. for _, block := range blocksGrpByIndex {
  878. allBlockFlags[block.Index] = true
  879. }
  880. for i, ok := range allBlockFlags {
  881. grpID := srcRed.FindGroup(i)
  882. if !ok {
  883. if grpID == -1 {
  884. canGroupReconstruct = false
  885. break
  886. }
  887. if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID {
  888. canGroupReconstruct = false
  889. break
  890. }
  891. lostBlocks = append(lostBlocks, i)
  892. lostBlockGrps = append(lostBlockGrps, grpID)
  893. }
  894. }
  895. // TODO 产生BlockTransfer事件
  896. if canGroupReconstruct {
  897. // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadUserSpaces)
  898. }
  899. return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadUserSpaces)
  900. }
  901. /*
  902. TODO2 修复这一块的代码
  903. func (t *ChangeRedundancy) groupReconstructLRC(obj clitypes.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*UserSpaceLoadInfo) (*db.UpdatingObjectRedundancy, error) {
  904. grped := make(map[int]clitypes.GrouppedObjectBlock)
  905. for _, b := range grpedBlocks {
  906. grped[b.Index] = b
  907. }
  908. plans := exec.NewPlanBuilder()
  909. for i := 0; i < len(lostBlocks); i++ {
  910. var froms []ioswitchlrc.From
  911. grpEles := red.GetGroupElements(lostBlockGrps[i])
  912. for _, ele := range grpEles {
  913. if ele == lostBlocks[i] {
  914. continue
  915. }
  916. froms = append(froms, ioswitchlrc.NewFromUserSpace(grped[ele].FileHash, nil, ele))
  917. }
  918. err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{
  919. ioswitchlrc.NewToUserSpace(uploadUserSpaces[i].UserSpace, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])),
  920. }, plans)
  921. if err != nil {
  922. return nil, fmt.Errorf("parsing plan: %w", err)
  923. }
  924. }
  925. fmt.Printf("plans: %v\n", plans)
  926. // 如果没有任何Plan,Wait会直接返回成功
  927. // TODO 添加依赖
  928. ret, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
  929. if err != nil {
  930. return nil, fmt.Errorf("executing io plan: %w", err)
  931. }
  932. var newBlocks []clitypes.ObjectBlock
  933. for _, i := range lostBlocks {
  934. newBlocks = append(newBlocks, clitypes.ObjectBlock{
  935. ObjectID: obj.Object.ObjectID,
  936. Index: i,
  937. UserSpaceID: uploadUserSpaces[i].UserSpace.UserSpace.UserSpaceID,
  938. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  939. })
  940. }
  941. for _, b := range grpedBlocks {
  942. for _, hubID := range b.UserSpaceIDs {
  943. newBlocks = append(newBlocks, clitypes.ObjectBlock{
  944. ObjectID: obj.Object.ObjectID,
  945. Index: b.Index,
  946. UserSpaceID: hubID,
  947. FileHash: b.FileHash,
  948. })
  949. }
  950. }
  951. return &db.UpdatingObjectRedundancy{
  952. ObjectID: obj.Object.ObjectID,
  953. Redundancy: red,
  954. Blocks: newBlocks,
  955. }, nil
  956. }
  957. */
  958. func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, grpBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  959. var chosenBlocks []clitypes.GrouppedObjectBlock
  960. var chosenBlockStg []clitypes.UserSpaceDetail
  961. for _, block := range grpBlocks {
  962. if len(block.UserSpaceIDs) > 0 && block.Index < red.M() {
  963. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  964. if !ok {
  965. continue
  966. }
  967. chosenBlocks = append(chosenBlocks, block)
  968. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  969. }
  970. if len(chosenBlocks) == red.K {
  971. break
  972. }
  973. }
  974. if len(chosenBlocks) < red.K {
  975. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  976. }
  977. // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  978. planBlder := exec.NewPlanBuilder()
  979. var froms []ioswitchlrc.From
  980. var toes []ioswitchlrc.To
  981. var newBlocks []clitypes.ObjectBlock
  982. shouldUpdateBlocks := false
  983. for i, userspace := range uploadUserSpaces {
  984. newBlock := clitypes.ObjectBlock{
  985. ObjectID: obj.Object.ObjectID,
  986. Index: i,
  987. UserSpaceID: userspace.UserSpace.UserSpace.UserSpaceID,
  988. }
  989. grp, ok := lo.Find(grpBlocks, func(grp clitypes.GrouppedObjectBlock) bool { return grp.Index == i })
  990. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  991. if ok && lo.Contains(grp.UserSpaceIDs, userspace.UserSpace.UserSpace.UserSpaceID) {
  992. newBlock.FileHash = grp.FileHash
  993. newBlock.Size = grp.Size
  994. newBlocks = append(newBlocks, newBlock)
  995. continue
  996. }
  997. shouldUpdateBlocks = true
  998. // 否则就要重建出这个节点需要的块
  999. for i2, block := range chosenBlocks {
  1000. froms = append(froms, ioswitchlrc.NewFromStorage(block.FileHash, chosenBlockStg[i2], block.Index))
  1001. }
  1002. // 输出只需要自己要保存的那一块
  1003. toes = append(toes, ioswitchlrc.NewToStorage(*userspace.UserSpace, i, fmt.Sprintf("%d", i)))
  1004. newBlocks = append(newBlocks, newBlock)
  1005. }
  1006. err := lrcparser.ReconstructAny(froms, toes, planBlder)
  1007. if err != nil {
  1008. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  1009. }
  1010. fmt.Printf("plans: %v\n", planBlder)
  1011. // 如果没有任何Plan,Wait会直接返回成功
  1012. execCtx := exec.NewExecContext()
  1013. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  1014. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  1015. if err != nil {
  1016. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  1017. }
  1018. if !shouldUpdateBlocks {
  1019. return nil, nil, nil
  1020. }
  1021. for k, v := range ret {
  1022. idx, err := strconv.ParseInt(k, 10, 64)
  1023. if err != nil {
  1024. return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  1025. }
  1026. r := v.(*ops2.FileInfoValue)
  1027. newBlocks[idx].FileHash = r.Hash
  1028. newBlocks[idx].Size = r.Size
  1029. }
  1030. // TODO 产生系统事件
  1031. return &db.UpdatingObjectRedundancy{
  1032. ObjectID: obj.Object.ObjectID,
  1033. FileHash: obj.Object.FileHash,
  1034. Size: obj.Object.Size,
  1035. Redundancy: red,
  1036. Blocks: newBlocks,
  1037. }, nil, nil
  1038. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。