You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

redundancy_recover.go 40 kB


  1. package ticktock
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "github.com/samber/lo"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/utils/math2"
  10. "gitlink.org.cn/cloudream/common/utils/sort2"
  11. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  12. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  13. "gitlink.org.cn/cloudream/jcs-pub/common/models/datamap"
  14. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  15. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  17. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc"
  18. lrcparser "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/parser"
  19. cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types"
  20. )
  21. func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail) (clitypes.Redundancy, []*userSpaceUsageInfo) {
  22. switch obj.Object.Redundancy.(type) {
  23. case *clitypes.NoneRedundancy:
  24. if obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold {
  25. newStgs := t.chooseNewUserSpacesForEC(ctx, &clitypes.DefaultECRedundancy)
  26. return &clitypes.DefaultECRedundancy, newStgs
  27. }
  28. return &clitypes.DefaultRepRedundancy, t.chooseNewUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy)
  29. case *clitypes.RepRedundancy:
  30. if obj.Object.Size >= ctx.ticktock.cfg.ECFileSizeThreshold {
  31. newStgs := t.chooseNewUserSpacesForEC(ctx, &clitypes.DefaultECRedundancy)
  32. return &clitypes.DefaultECRedundancy, newStgs
  33. }
  34. return clitypes.DefaultRepRedundancy, t.rechooseUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy)
  35. case *clitypes.ECRedundancy:
  36. if obj.Object.Size < ctx.ticktock.cfg.ECFileSizeThreshold {
  37. return &clitypes.DefaultRepRedundancy, t.chooseNewUserSpacesForRep(ctx, &clitypes.DefaultRepRedundancy)
  38. }
  39. return clitypes.DefaultECRedundancy, t.rechooseUserSpacesForEC(ctx, obj, &clitypes.DefaultECRedundancy)
  40. case *clitypes.LRCRedundancy:
  41. newLRCStgs := t.rechooseUserSpacesForLRC(ctx, obj, &clitypes.DefaultLRCRedundancy)
  42. return &clitypes.DefaultLRCRedundancy, newLRCStgs
  43. }
  44. return nil, nil
  45. }
  46. func (t *ChangeRedundancy) doChangeRedundancy(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, newRed clitypes.Redundancy, selectedUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  47. log := logger.WithType[ChangeRedundancy]("TickTock")
  48. var updating *db.UpdatingObjectRedundancy
  49. var evt datamap.SysEventBody
  50. var err error
  51. switch srcRed := obj.Object.Redundancy.(type) {
  52. case *clitypes.NoneRedundancy:
  53. switch newRed := newRed.(type) {
  54. case *clitypes.RepRedundancy:
  55. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
  56. updating, evt, err = t.noneToRep(ctx, obj, newRed, selectedUserSpaces)
  57. case *clitypes.ECRedundancy:
  58. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
  59. updating, evt, err = t.noneToEC(ctx, obj, newRed, selectedUserSpaces)
  60. case *clitypes.LRCRedundancy:
  61. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
  62. updating, evt, err = t.noneToLRC(ctx, obj, newRed, selectedUserSpaces)
  63. case *clitypes.SegmentRedundancy:
  64. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> segment")
  65. updating, evt, err = t.noneToSeg(ctx, obj, newRed, selectedUserSpaces)
  66. }
  67. case *clitypes.RepRedundancy:
  68. switch newRed := newRed.(type) {
  69. case *clitypes.RepRedundancy:
  70. updating, evt, err = t.repToRep(ctx, obj, srcRed, selectedUserSpaces)
  71. case *clitypes.ECRedundancy:
  72. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
  73. updating, evt, err = t.repToEC(ctx, obj, newRed, selectedUserSpaces)
  74. }
  75. case *clitypes.ECRedundancy:
  76. switch newRed := newRed.(type) {
  77. case *clitypes.RepRedundancy:
  78. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
  79. updating, evt, err = t.ecToRep(ctx, obj, srcRed, newRed, selectedUserSpaces)
  80. case *clitypes.ECRedundancy:
  81. updating, evt, err = t.ecToEC(ctx, obj, srcRed, newRed, selectedUserSpaces)
  82. }
  83. case *clitypes.LRCRedundancy:
  84. switch newRed := newRed.(type) {
  85. case *clitypes.LRCRedundancy:
  86. updating, evt, err = t.lrcToLRC(ctx, obj, srcRed, newRed, selectedUserSpaces)
  87. }
  88. }
  89. return updating, evt, err
  90. }
  91. // 统计每个对象块所在的节点,选出块最多的不超过userspaceCnt个节点
  92. func (t *ChangeRedundancy) summaryRepObjectBlockUserSpaces(ctx *changeRedundancyContext, objs []clitypes.ObjectDetail, userspaceCnt int) []clitypes.UserSpaceID {
  93. type stgBlocks struct {
  94. UserSpaceID clitypes.UserSpaceID
  95. Count int
  96. }
  97. stgBlocksMap := make(map[clitypes.UserSpaceID]*stgBlocks)
  98. for _, obj := range objs {
  99. shouldUseEC := obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold
  100. if _, ok := obj.Object.Redundancy.(*clitypes.RepRedundancy); ok && !shouldUseEC {
  101. for _, block := range obj.Blocks {
  102. if _, ok := stgBlocksMap[block.UserSpaceID]; !ok {
  103. stgBlocksMap[block.UserSpaceID] = &stgBlocks{
  104. UserSpaceID: block.UserSpaceID,
  105. Count: 0,
  106. }
  107. }
  108. stgBlocksMap[block.UserSpaceID].Count++
  109. }
  110. }
  111. }
  112. userspaces := lo.Values(stgBlocksMap)
  113. sort2.Sort(userspaces, func(left *stgBlocks, right *stgBlocks) int {
  114. return right.Count - left.Count
  115. })
  116. ids := lo.Map(userspaces, func(item *stgBlocks, idx int) clitypes.UserSpaceID { return item.UserSpaceID })
  117. if len(ids) > userspaceCnt {
  118. ids = ids[:userspaceCnt]
  119. }
  120. return ids
  121. }
  122. func (t *ChangeRedundancy) chooseNewUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo {
  123. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  124. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  125. })
  126. return t.chooseSoManyUserSpaces(red.RepCount, sortedUserSpaces)
  127. }
  128. func (t *ChangeRedundancy) chooseNewUserSpacesForEC(ctx *changeRedundancyContext, red *clitypes.ECRedundancy) []*userSpaceUsageInfo {
  129. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  130. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  131. })
  132. return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
  133. }
  134. func (t *ChangeRedundancy) chooseNewUserSpacesForLRC(ctx *changeRedundancyContext, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo {
  135. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  136. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  137. })
  138. return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
  139. }
  140. func (t *ChangeRedundancy) chooseNewUserSpacesForSeg(ctx *changeRedundancyContext, segCount int) []*userSpaceUsageInfo {
  141. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  142. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  143. })
  144. return t.chooseSoManyUserSpaces(segCount, sortedUserSpaces)
  145. }
  146. func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext, red *clitypes.RepRedundancy) []*userSpaceUsageInfo {
  147. type rechooseUserSpace struct {
  148. *userSpaceUsageInfo
  149. HasBlock bool
  150. }
  151. var rechooseStgs []*rechooseUserSpace
  152. for _, stg := range ctx.allUserSpaces {
  153. hasBlock := false
  154. for _, id := range ctx.mostBlockStgIDs {
  155. if id == stg.UserSpace.UserSpace.UserSpaceID {
  156. hasBlock = true
  157. break
  158. }
  159. }
  160. rechooseStgs = append(rechooseStgs, &rechooseUserSpace{
  161. userSpaceUsageInfo: stg,
  162. HasBlock: hasBlock,
  163. })
  164. }
  165. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseUserSpace, right *rechooseUserSpace) int {
  166. // 已经缓存了文件块的节点优先选择
  167. v := sort2.CmpBool(right.HasBlock, left.HasBlock)
  168. if v != 0 {
  169. return v
  170. }
  171. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  172. })
  173. return t.chooseSoManyUserSpaces(red.RepCount, lo.Map(sortedStgs, func(userspace *rechooseUserSpace, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  174. }
  175. func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy) []*userSpaceUsageInfo {
  176. type rechooseStg struct {
  177. *userSpaceUsageInfo
  178. CachedBlockIndex int
  179. }
  180. var rechooseStgs []*rechooseStg
  181. for _, stg := range ctx.allUserSpaces {
  182. cachedBlockIndex := -1
  183. for _, block := range obj.Blocks {
  184. if block.UserSpaceID == stg.UserSpace.UserSpace.UserSpaceID {
  185. cachedBlockIndex = block.Index
  186. break
  187. }
  188. }
  189. rechooseStgs = append(rechooseStgs, &rechooseStg{
  190. userSpaceUsageInfo: stg,
  191. CachedBlockIndex: cachedBlockIndex,
  192. })
  193. }
  194. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  195. // 已经缓存了文件块的节点优先选择
  196. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  197. if v != 0 {
  198. return v
  199. }
  200. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  201. })
  202. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  203. return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  204. }
  205. func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy) []*userSpaceUsageInfo {
  206. type rechooseStg struct {
  207. *userSpaceUsageInfo
  208. CachedBlockIndex int
  209. }
  210. var rechooseStgs []*rechooseStg
  211. for _, stg := range ctx.allUserSpaces {
  212. cachedBlockIndex := -1
  213. for _, block := range obj.Blocks {
  214. if block.UserSpaceID == stg.UserSpace.UserSpace.UserSpaceID {
  215. cachedBlockIndex = block.Index
  216. break
  217. }
  218. }
  219. rechooseStgs = append(rechooseStgs, &rechooseStg{
  220. userSpaceUsageInfo: stg,
  221. CachedBlockIndex: cachedBlockIndex,
  222. })
  223. }
  224. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  225. // 已经缓存了文件块的节点优先选择
  226. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  227. if v != 0 {
  228. return v
  229. }
  230. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  231. })
  232. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  233. return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  234. }
  235. func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceUsageInfo) []*userSpaceUsageInfo {
  236. repeateCount := (count + len(stgs) - 1) / len(stgs)
  237. extendStgs := make([]*userSpaceUsageInfo, repeateCount*len(stgs))
  238. // 使用复制的方式将节点数扩充到要求的数量
  239. // 复制之后的结构:ABCD -> AAABBBCCCDDD
  240. for p := 0; p < repeateCount; p++ {
  241. for i, userspace := range stgs {
  242. putIdx := i*repeateCount + p
  243. extendStgs[putIdx] = userspace
  244. }
  245. }
  246. extendStgs = extendStgs[:count]
  247. var chosen []*userSpaceUsageInfo
  248. for len(chosen) < count {
  249. // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
  250. chosenLocations := make(map[cortypes.Location]bool)
  251. for i, stg := range extendStgs {
  252. if stg == nil {
  253. continue
  254. }
  255. if chosenLocations[stg.UserSpace.UserSpace.Storage.GetLocation()] {
  256. continue
  257. }
  258. chosen = append(chosen, stg)
  259. chosenLocations[stg.UserSpace.UserSpace.Storage.GetLocation()] = true
  260. extendStgs[i] = nil
  261. }
  262. }
  263. return chosen
  264. }
  265. func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  266. if len(obj.Blocks) == 0 {
  267. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  268. }
  269. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  270. if !ok {
  271. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  272. }
  273. // 如果选择的备份节点都是同一个,那么就只要上传一次
  274. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  275. ft := ioswitch2.NewFromTo()
  276. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  277. for i, stg := range uploadStgs {
  278. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  279. }
  280. plans := exec.NewPlanBuilder()
  281. err := parser.Parse(ft, plans)
  282. if err != nil {
  283. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  284. }
  285. // TODO 添加依赖
  286. execCtx := exec.NewExecContext()
  287. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  288. ret, err := plans.Execute(execCtx).Wait(context.Background())
  289. if err != nil {
  290. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  291. }
  292. var blocks []clitypes.ObjectBlock
  293. var blockChgs []datamap.BlockChange
  294. for i, stg := range uploadStgs {
  295. r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  296. blocks = append(blocks, clitypes.ObjectBlock{
  297. ObjectID: obj.Object.ObjectID,
  298. Index: 0,
  299. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  300. FileHash: r.Hash,
  301. Size: r.Size,
  302. })
  303. blockChgs = append(blockChgs, &datamap.BlockChangeClone{
  304. BlockType: datamap.BlockTypeRaw,
  305. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  306. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  307. TransferBytes: 1,
  308. })
  309. }
  310. // 删除原本的文件块
  311. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  312. Index: 0,
  313. UserSpaceID: obj.Blocks[0].UserSpaceID,
  314. })
  315. return &db.UpdatingObjectRedundancy{
  316. ObjectID: obj.Object.ObjectID,
  317. FileHash: obj.Object.FileHash,
  318. Size: obj.Object.Size,
  319. Redundancy: red,
  320. Blocks: blocks,
  321. }, &datamap.BodyBlockTransfer{
  322. ObjectID: obj.Object.ObjectID,
  323. PackageID: obj.Object.PackageID,
  324. BlockChanges: blockChgs,
  325. }, nil
  326. }
  327. func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  328. if len(obj.Blocks) == 0 {
  329. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
  330. }
  331. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  332. if !ok {
  333. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  334. }
  335. ft := ioswitch2.NewFromTo()
  336. ft.ECParam = red
  337. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  338. for i := 0; i < red.N; i++ {
  339. ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].UserSpace, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  340. }
  341. plans := exec.NewPlanBuilder()
  342. err := parser.Parse(ft, plans)
  343. if err != nil {
  344. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  345. }
  346. execCtx := exec.NewExecContext()
  347. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  348. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  349. if err != nil {
  350. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  351. }
  352. var blocks []clitypes.ObjectBlock
  353. var evtTargetBlocks []datamap.Block
  354. var evtBlockTrans []datamap.DataTransfer
  355. for i := 0; i < red.N; i++ {
  356. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  357. blocks = append(blocks, clitypes.ObjectBlock{
  358. ObjectID: obj.Object.ObjectID,
  359. Index: i,
  360. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  361. FileHash: r.Hash,
  362. Size: r.Size,
  363. })
  364. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  365. BlockType: datamap.BlockTypeEC,
  366. Index: i,
  367. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  368. })
  369. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  370. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  371. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  372. TransferBytes: 1,
  373. })
  374. }
  375. return &db.UpdatingObjectRedundancy{
  376. ObjectID: obj.Object.ObjectID,
  377. FileHash: obj.Object.FileHash,
  378. Size: obj.Object.Size,
  379. Redundancy: red,
  380. Blocks: blocks,
  381. },
  382. &datamap.BodyBlockTransfer{
  383. ObjectID: obj.Object.ObjectID,
  384. PackageID: obj.Object.PackageID,
  385. BlockChanges: []datamap.BlockChange{
  386. &datamap.BlockChangeEnDecode{
  387. SourceBlocks: []datamap.Block{{
  388. BlockType: datamap.BlockTypeRaw,
  389. UserSpaceID: obj.Blocks[0].UserSpaceID,
  390. }},
  391. TargetBlocks: evtTargetBlocks,
  392. DataTransfers: evtBlockTrans,
  393. },
  394. // 删除原本的文件块
  395. &datamap.BlockChangeDeleted{
  396. Index: 0,
  397. UserSpaceID: obj.Blocks[0].UserSpaceID,
  398. },
  399. },
  400. }, nil
  401. }
  402. func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.LRCRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  403. if len(obj.Blocks) == 0 {
  404. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
  405. }
  406. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  407. if !ok {
  408. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  409. }
  410. var toes []ioswitchlrc.To
  411. for i := 0; i < red.N; i++ {
  412. toes = append(toes, ioswitchlrc.NewToStorage(*uploadStgs[i].UserSpace, i, fmt.Sprintf("%d", i)))
  413. }
  414. plans := exec.NewPlanBuilder()
  415. err := lrcparser.Encode(ioswitchlrc.NewFromStorage(obj.Object.FileHash, *srcStg.UserSpace, -1), toes, plans)
  416. if err != nil {
  417. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  418. }
  419. execCtx := exec.NewExecContext()
  420. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  421. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  422. if err != nil {
  423. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  424. }
  425. var blocks []clitypes.ObjectBlock
  426. var evtTargetBlocks []datamap.Block
  427. var evtBlockTrans []datamap.DataTransfer
  428. for i := 0; i < red.N; i++ {
  429. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  430. blocks = append(blocks, clitypes.ObjectBlock{
  431. ObjectID: obj.Object.ObjectID,
  432. Index: i,
  433. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  434. FileHash: r.Hash,
  435. Size: r.Size,
  436. })
  437. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  438. BlockType: datamap.BlockTypeEC,
  439. Index: i,
  440. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  441. })
  442. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  443. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  444. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  445. TransferBytes: 1,
  446. })
  447. }
  448. return &db.UpdatingObjectRedundancy{
  449. ObjectID: obj.Object.ObjectID,
  450. FileHash: obj.Object.FileHash,
  451. Size: obj.Object.Size,
  452. Redundancy: red,
  453. Blocks: blocks,
  454. },
  455. &datamap.BodyBlockTransfer{
  456. ObjectID: obj.Object.ObjectID,
  457. PackageID: obj.Object.PackageID,
  458. BlockChanges: []datamap.BlockChange{
  459. &datamap.BlockChangeEnDecode{
  460. SourceBlocks: []datamap.Block{{
  461. BlockType: datamap.BlockTypeRaw,
  462. UserSpaceID: obj.Blocks[0].UserSpaceID,
  463. }},
  464. TargetBlocks: evtTargetBlocks,
  465. DataTransfers: evtBlockTrans,
  466. },
  467. // 删除原本的文件块
  468. &datamap.BlockChangeDeleted{
  469. Index: 0,
  470. UserSpaceID: obj.Blocks[0].UserSpaceID,
  471. },
  472. },
  473. },
  474. nil
  475. }
  476. func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.SegmentRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  477. if len(obj.Blocks) == 0 {
  478. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  479. }
  480. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  481. if !ok {
  482. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  483. }
  484. // 如果选择的备份节点都是同一个,那么就只要上传一次
  485. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  486. ft := ioswitch2.NewFromTo()
  487. ft.SegmentParam = red
  488. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  489. for i, stg := range uploadStgs {
  490. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i)))
  491. }
  492. plans := exec.NewPlanBuilder()
  493. err := parser.Parse(ft, plans)
  494. if err != nil {
  495. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  496. }
  497. // TODO 添加依赖
  498. execCtx := exec.NewExecContext()
  499. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  500. ret, err := plans.Execute(execCtx).Wait(context.Background())
  501. if err != nil {
  502. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  503. }
  504. var blocks []clitypes.ObjectBlock
  505. var evtTargetBlocks []datamap.Block
  506. var evtBlockTrans []datamap.DataTransfer
  507. for i, stg := range uploadStgs {
  508. r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  509. blocks = append(blocks, clitypes.ObjectBlock{
  510. ObjectID: obj.Object.ObjectID,
  511. Index: i,
  512. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  513. FileHash: r.Hash,
  514. Size: r.Size,
  515. })
  516. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  517. BlockType: datamap.BlockTypeSegment,
  518. Index: i,
  519. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  520. })
  521. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  522. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  523. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  524. TransferBytes: 1,
  525. })
  526. }
  527. return &db.UpdatingObjectRedundancy{
  528. ObjectID: obj.Object.ObjectID,
  529. FileHash: obj.Object.FileHash,
  530. Size: obj.Object.Size,
  531. Redundancy: red,
  532. Blocks: blocks,
  533. },
  534. &datamap.BodyBlockTransfer{
  535. ObjectID: obj.Object.ObjectID,
  536. PackageID: obj.Object.PackageID,
  537. BlockChanges: []datamap.BlockChange{
  538. &datamap.BlockChangeEnDecode{
  539. SourceBlocks: []datamap.Block{{
  540. BlockType: datamap.BlockTypeRaw,
  541. UserSpaceID: obj.Blocks[0].UserSpaceID,
  542. }},
  543. TargetBlocks: evtTargetBlocks,
  544. DataTransfers: evtBlockTrans,
  545. },
  546. // 删除原本的文件块
  547. &datamap.BlockChangeDeleted{
  548. Index: 0,
  549. UserSpaceID: obj.Blocks[0].UserSpaceID,
  550. },
  551. },
  552. },
  553. nil
  554. }
  555. func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  556. if len(obj.Blocks) == 0 {
  557. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  558. }
  559. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  560. if !ok {
  561. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  562. }
  563. // 如果选择的备份节点都是同一个,那么就只要上传一次
  564. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  565. ft := ioswitch2.NewFromTo()
  566. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  567. for i, stg := range uploadStgs {
  568. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  569. }
  570. plans := exec.NewPlanBuilder()
  571. err := parser.Parse(ft, plans)
  572. if err != nil {
  573. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  574. }
  575. // TODO 添加依赖
  576. execCtx := exec.NewExecContext()
  577. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  578. ret, err := plans.Execute(execCtx).Wait(context.Background())
  579. if err != nil {
  580. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  581. }
  582. var blocks []clitypes.ObjectBlock
  583. var blockChgs []datamap.BlockChange
  584. for i, stg := range uploadStgs {
  585. r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  586. blocks = append(blocks, clitypes.ObjectBlock{
  587. ObjectID: obj.Object.ObjectID,
  588. Index: 0,
  589. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  590. FileHash: r.Hash,
  591. Size: r.Size,
  592. })
  593. blockChgs = append(blockChgs, &datamap.BlockChangeClone{
  594. BlockType: datamap.BlockTypeRaw,
  595. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  596. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  597. TransferBytes: 1,
  598. })
  599. }
  600. // 删除原本的文件块
  601. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  602. Index: 0,
  603. UserSpaceID: obj.Blocks[0].UserSpaceID,
  604. })
  605. return &db.UpdatingObjectRedundancy{
  606. ObjectID: obj.Object.ObjectID,
  607. FileHash: obj.Object.FileHash,
  608. Size: obj.Object.Size,
  609. Redundancy: red,
  610. Blocks: blocks,
  611. },
  612. &datamap.BodyBlockTransfer{
  613. ObjectID: obj.Object.ObjectID,
  614. PackageID: obj.Object.PackageID,
  615. BlockChanges: blockChgs,
  616. },
  617. nil
  618. }
  619. func (t *ChangeRedundancy) repToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, red *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  620. return t.noneToEC(ctx, obj, red, uploadUserSpaces)
  621. }
  622. func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  623. var chosenBlocks []clitypes.GrouppedObjectBlock
  624. var chosenBlockIndexes []int
  625. var chosenBlockStg []clitypes.UserSpaceDetail
  626. for _, block := range obj.GroupBlocks() {
  627. if len(block.UserSpaceIDs) > 0 {
  628. // TODO 考虑选择最优的节点
  629. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  630. if !ok {
  631. continue
  632. }
  633. chosenBlocks = append(chosenBlocks, block)
  634. chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
  635. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  636. }
  637. if len(chosenBlocks) == srcRed.K {
  638. break
  639. }
  640. }
  641. if len(chosenBlocks) < srcRed.K {
  642. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  643. }
  644. // 如果选择的备份节点都是同一个,那么就只要上传一次
  645. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) clitypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  646. planBlder := exec.NewPlanBuilder()
  647. ft := ioswitch2.NewFromTo()
  648. ft.ECParam = srcRed
  649. for i, block := range chosenBlocks {
  650. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  651. }
  652. for i := range uploadStgs {
  653. ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.NewRange(0, obj.Object.Size)))
  654. }
  655. err := parser.Parse(ft, planBlder)
  656. if err != nil {
  657. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  658. }
  659. // TODO 添加依赖
  660. execCtx := exec.NewExecContext()
  661. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  662. ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
  663. if err != nil {
  664. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  665. }
  666. var blocks []clitypes.ObjectBlock
  667. for i := range uploadStgs {
  668. r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
  669. blocks = append(blocks, clitypes.ObjectBlock{
  670. ObjectID: obj.Object.ObjectID,
  671. Index: 0,
  672. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  673. FileHash: r.Hash,
  674. Size: r.Size,
  675. })
  676. }
  677. var evtSrcBlocks []datamap.Block
  678. var evtTargetBlocks []datamap.Block
  679. for i2, block := range chosenBlocks {
  680. evtSrcBlocks = append(evtSrcBlocks, datamap.Block{
  681. BlockType: datamap.BlockTypeEC,
  682. Index: block.Index,
  683. UserSpaceID: chosenBlockStg[i2].UserSpace.UserSpaceID,
  684. })
  685. }
  686. for _, stg := range uploadStgs {
  687. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  688. BlockType: datamap.BlockTypeRaw,
  689. Index: 0,
  690. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  691. })
  692. }
  693. var evtBlockTrans []datamap.DataTransfer
  694. for _, stg := range uploadStgs {
  695. for i2 := range chosenBlocks {
  696. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  697. SourceUserSpaceID: chosenBlockStg[i2].UserSpace.UserSpaceID,
  698. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  699. TransferBytes: 1,
  700. })
  701. }
  702. }
  703. var blockChgs []datamap.BlockChange
  704. blockChgs = append(blockChgs, &datamap.BlockChangeEnDecode{
  705. SourceBlocks: evtSrcBlocks,
  706. TargetBlocks: evtTargetBlocks,
  707. DataTransfers: evtBlockTrans,
  708. })
  709. for _, block := range obj.Blocks {
  710. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  711. Index: block.Index,
  712. UserSpaceID: block.UserSpaceID,
  713. })
  714. }
  715. return &db.UpdatingObjectRedundancy{
  716. ObjectID: obj.Object.ObjectID,
  717. FileHash: obj.Object.FileHash,
  718. Size: obj.Object.Size,
  719. Redundancy: tarRed,
  720. Blocks: blocks,
  721. },
  722. &datamap.BodyBlockTransfer{
  723. ObjectID: obj.Object.ObjectID,
  724. PackageID: obj.Object.PackageID,
  725. BlockChanges: blockChgs,
  726. },
  727. nil
  728. }
  729. func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.ECRedundancy, tarRed *clitypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  730. grpBlocks := obj.GroupBlocks()
  731. var chosenBlocks []clitypes.GrouppedObjectBlock
  732. var chosenBlockStg []clitypes.UserSpaceDetail
  733. for _, block := range grpBlocks {
  734. if len(block.UserSpaceIDs) > 0 {
  735. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  736. if !ok {
  737. continue
  738. }
  739. chosenBlocks = append(chosenBlocks, block)
  740. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  741. }
  742. if len(chosenBlocks) == srcRed.K {
  743. break
  744. }
  745. }
  746. if len(chosenBlocks) < srcRed.K {
  747. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  748. }
  749. // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  750. planBlder := exec.NewPlanBuilder()
  751. var evtSrcBlocks []datamap.Block
  752. var evtTargetBlocks []datamap.Block
  753. ft := ioswitch2.NewFromTo()
  754. ft.ECParam = srcRed
  755. for i, block := range chosenBlocks {
  756. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  757. evtSrcBlocks = append(evtSrcBlocks, datamap.Block{
  758. BlockType: datamap.BlockTypeEC,
  759. Index: block.Index,
  760. UserSpaceID: chosenBlockStg[i].UserSpace.UserSpaceID,
  761. })
  762. }
  763. var newBlocks []clitypes.ObjectBlock
  764. shouldUpdateBlocks := false
  765. for i, stg := range uploadUserSpaces {
  766. newBlock := clitypes.ObjectBlock{
  767. ObjectID: obj.Object.ObjectID,
  768. Index: i,
  769. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  770. }
  771. grp, ok := lo.Find(grpBlocks, func(grp clitypes.GrouppedObjectBlock) bool { return grp.Index == i })
  772. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  773. if ok && lo.Contains(grp.UserSpaceIDs, stg.UserSpace.UserSpace.UserSpaceID) {
  774. newBlock.FileHash = grp.FileHash
  775. newBlock.Size = grp.Size
  776. newBlocks = append(newBlocks, newBlock)
  777. continue
  778. }
  779. shouldUpdateBlocks = true
  780. // 否则就要重建出这个节点需要的块
  781. // 输出只需要自己要保存的那一块
  782. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  783. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  784. BlockType: datamap.BlockTypeEC,
  785. Index: i,
  786. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  787. })
  788. newBlocks = append(newBlocks, newBlock)
  789. }
  790. err := parser.Parse(ft, planBlder)
  791. if err != nil {
  792. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  793. }
  794. // 如果没有任何Plan,Wait会直接返回成功
  795. execCtx := exec.NewExecContext()
  796. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  797. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  798. if err != nil {
  799. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  800. }
  801. if !shouldUpdateBlocks {
  802. return nil, nil, nil
  803. }
  804. for k, v := range ret {
  805. idx, err := strconv.ParseInt(k, 10, 64)
  806. if err != nil {
  807. return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  808. }
  809. r := v.(*ops2.FileInfoValue)
  810. newBlocks[idx].FileHash = r.Hash
  811. newBlocks[idx].Size = r.Size
  812. }
  813. var evtBlockTrans []datamap.DataTransfer
  814. for _, src := range evtSrcBlocks {
  815. for _, tar := range evtTargetBlocks {
  816. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  817. SourceUserSpaceID: src.UserSpaceID,
  818. TargetUserSpaceID: tar.UserSpaceID,
  819. TransferBytes: 1,
  820. })
  821. }
  822. }
  823. var blockChgs []datamap.BlockChange
  824. for _, block := range obj.Blocks {
  825. keep := lo.ContainsBy(newBlocks, func(newBlock clitypes.ObjectBlock) bool {
  826. return newBlock.Index == block.Index && newBlock.UserSpaceID == block.UserSpaceID
  827. })
  828. if !keep {
  829. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  830. Index: block.Index,
  831. UserSpaceID: block.UserSpaceID,
  832. })
  833. }
  834. }
  835. blockChgs = append(blockChgs, &datamap.BlockChangeEnDecode{
  836. SourceBlocks: evtSrcBlocks,
  837. TargetBlocks: evtTargetBlocks,
  838. DataTransfers: evtBlockTrans,
  839. })
  840. return &db.UpdatingObjectRedundancy{
  841. ObjectID: obj.Object.ObjectID,
  842. FileHash: obj.Object.FileHash,
  843. Size: obj.Object.Size,
  844. Redundancy: tarRed,
  845. Blocks: newBlocks,
  846. },
  847. &datamap.BodyBlockTransfer{
  848. ObjectID: obj.Object.ObjectID,
  849. PackageID: obj.Object.PackageID,
  850. BlockChanges: blockChgs,
  851. },
  852. nil
  853. }
  854. func (t *ChangeRedundancy) lrcToLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, srcRed *clitypes.LRCRedundancy, tarRed *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  855. blocksGrpByIndex := obj.GroupBlocks()
  856. var lostBlocks []int
  857. var lostBlockGrps []int
  858. canGroupReconstruct := true
  859. allBlockFlags := make([]bool, srcRed.N)
  860. for _, block := range blocksGrpByIndex {
  861. allBlockFlags[block.Index] = true
  862. }
  863. for i, ok := range allBlockFlags {
  864. grpID := srcRed.FindGroup(i)
  865. if !ok {
  866. if grpID == -1 {
  867. canGroupReconstruct = false
  868. break
  869. }
  870. if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID {
  871. canGroupReconstruct = false
  872. break
  873. }
  874. lostBlocks = append(lostBlocks, i)
  875. lostBlockGrps = append(lostBlockGrps, grpID)
  876. }
  877. }
  878. // TODO 产生BlockTransfer事件
  879. if canGroupReconstruct {
  880. // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadUserSpaces)
  881. }
  882. return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadUserSpaces)
  883. }
  884. /*
  885. TODO2 修复这一块的代码
  886. func (t *ChangeRedundancy) groupReconstructLRC(obj clitypes.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*UserSpaceLoadInfo) (*db.UpdatingObjectRedundancy, error) {
  887. grped := make(map[int]clitypes.GrouppedObjectBlock)
  888. for _, b := range grpedBlocks {
  889. grped[b.Index] = b
  890. }
  891. plans := exec.NewPlanBuilder()
  892. for i := 0; i < len(lostBlocks); i++ {
  893. var froms []ioswitchlrc.From
  894. grpEles := red.GetGroupElements(lostBlockGrps[i])
  895. for _, ele := range grpEles {
  896. if ele == lostBlocks[i] {
  897. continue
  898. }
  899. froms = append(froms, ioswitchlrc.NewFromUserSpace(grped[ele].FileHash, nil, ele))
  900. }
  901. err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{
  902. ioswitchlrc.NewToUserSpace(uploadUserSpaces[i].UserSpace, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])),
  903. }, plans)
  904. if err != nil {
  905. return nil, fmt.Errorf("parsing plan: %w", err)
  906. }
  907. }
  908. fmt.Printf("plans: %v\n", plans)
  909. // 如果没有任何Plan,Wait会直接返回成功
  910. // TODO 添加依赖
  911. ret, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
  912. if err != nil {
  913. return nil, fmt.Errorf("executing io plan: %w", err)
  914. }
  915. var newBlocks []clitypes.ObjectBlock
  916. for _, i := range lostBlocks {
  917. newBlocks = append(newBlocks, clitypes.ObjectBlock{
  918. ObjectID: obj.Object.ObjectID,
  919. Index: i,
  920. UserSpaceID: uploadUserSpaces[i].UserSpace.UserSpace.UserSpaceID,
  921. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  922. })
  923. }
  924. for _, b := range grpedBlocks {
  925. for _, hubID := range b.UserSpaceIDs {
  926. newBlocks = append(newBlocks, clitypes.ObjectBlock{
  927. ObjectID: obj.Object.ObjectID,
  928. Index: b.Index,
  929. UserSpaceID: hubID,
  930. FileHash: b.FileHash,
  931. })
  932. }
  933. }
  934. return &db.UpdatingObjectRedundancy{
  935. ObjectID: obj.Object.ObjectID,
  936. Redundancy: red,
  937. Blocks: newBlocks,
  938. }, nil
  939. }
  940. */
  941. func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clitypes.ObjectDetail, grpBlocks []clitypes.GrouppedObjectBlock, red *clitypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  942. var chosenBlocks []clitypes.GrouppedObjectBlock
  943. var chosenBlockStg []clitypes.UserSpaceDetail
  944. for _, block := range grpBlocks {
  945. if len(block.UserSpaceIDs) > 0 && block.Index < red.M() {
  946. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  947. if !ok {
  948. continue
  949. }
  950. chosenBlocks = append(chosenBlocks, block)
  951. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  952. }
  953. if len(chosenBlocks) == red.K {
  954. break
  955. }
  956. }
  957. if len(chosenBlocks) < red.K {
  958. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  959. }
  960. // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  961. planBlder := exec.NewPlanBuilder()
  962. var froms []ioswitchlrc.From
  963. var toes []ioswitchlrc.To
  964. var newBlocks []clitypes.ObjectBlock
  965. shouldUpdateBlocks := false
  966. for i, userspace := range uploadUserSpaces {
  967. newBlock := clitypes.ObjectBlock{
  968. ObjectID: obj.Object.ObjectID,
  969. Index: i,
  970. UserSpaceID: userspace.UserSpace.UserSpace.UserSpaceID,
  971. }
  972. grp, ok := lo.Find(grpBlocks, func(grp clitypes.GrouppedObjectBlock) bool { return grp.Index == i })
  973. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  974. if ok && lo.Contains(grp.UserSpaceIDs, userspace.UserSpace.UserSpace.UserSpaceID) {
  975. newBlock.FileHash = grp.FileHash
  976. newBlock.Size = grp.Size
  977. newBlocks = append(newBlocks, newBlock)
  978. continue
  979. }
  980. shouldUpdateBlocks = true
  981. // 否则就要重建出这个节点需要的块
  982. for i2, block := range chosenBlocks {
  983. froms = append(froms, ioswitchlrc.NewFromStorage(block.FileHash, chosenBlockStg[i2], block.Index))
  984. }
  985. // 输出只需要自己要保存的那一块
  986. toes = append(toes, ioswitchlrc.NewToStorage(*userspace.UserSpace, i, fmt.Sprintf("%d", i)))
  987. newBlocks = append(newBlocks, newBlock)
  988. }
  989. err := lrcparser.ReconstructAny(froms, toes, planBlder)
  990. if err != nil {
  991. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  992. }
  993. fmt.Printf("plans: %v\n", planBlder)
  994. // 如果没有任何Plan,Wait会直接返回成功
  995. execCtx := exec.NewExecContext()
  996. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  997. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  998. if err != nil {
  999. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  1000. }
  1001. if !shouldUpdateBlocks {
  1002. return nil, nil, nil
  1003. }
  1004. for k, v := range ret {
  1005. idx, err := strconv.ParseInt(k, 10, 64)
  1006. if err != nil {
  1007. return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
  1008. }
  1009. r := v.(*ops2.FileInfoValue)
  1010. newBlocks[idx].FileHash = r.Hash
  1011. newBlocks[idx].Size = r.Size
  1012. }
  1013. // TODO 产生系统事件
  1014. return &db.UpdatingObjectRedundancy{
  1015. ObjectID: obj.Object.ObjectID,
  1016. FileHash: obj.Object.FileHash,
  1017. Size: obj.Object.Size,
  1018. Redundancy: red,
  1019. Blocks: newBlocks,
  1020. }, nil, nil
  1021. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。