You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

redundancy_recover.go 40 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223
  1. package ticktock
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/samber/lo"
  6. "gitlink.org.cn/cloudream/common/pkgs/logger"
  7. "gitlink.org.cn/cloudream/common/utils/math2"
  8. "gitlink.org.cn/cloudream/common/utils/sort2"
  9. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  10. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
  11. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  12. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
  13. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  14. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc"
  15. lrcparser "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitchlrc/parser"
  16. jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
  17. "gitlink.org.cn/cloudream/jcs-pub/common/types/datamap"
  18. )
  19. func (t *ChangeRedundancy) chooseRedundancy(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail) (jcstypes.Redundancy, []*userSpaceUsageInfo) {
  20. switch obj.Object.Redundancy.(type) {
  21. case *jcstypes.NoneRedundancy:
  22. if obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold {
  23. newStgs := t.chooseNewUserSpacesForEC(ctx, &jcstypes.DefaultECRedundancy)
  24. return &jcstypes.DefaultECRedundancy, newStgs
  25. }
  26. return &jcstypes.DefaultRepRedundancy, t.chooseNewUserSpacesForRep(ctx, &jcstypes.DefaultRepRedundancy)
  27. case *jcstypes.RepRedundancy:
  28. if obj.Object.Size >= ctx.ticktock.cfg.ECFileSizeThreshold {
  29. newStgs := t.chooseNewUserSpacesForEC(ctx, &jcstypes.DefaultECRedundancy)
  30. return &jcstypes.DefaultECRedundancy, newStgs
  31. }
  32. newSpaces := t.rechooseUserSpacesForRep(ctx, &jcstypes.DefaultRepRedundancy)
  33. for _, s := range newSpaces {
  34. if !obj.ContainsBlock(0, s.UserSpace.UserSpace.UserSpaceID) && !obj.ContainsPinned(s.UserSpace.UserSpace.UserSpaceID) {
  35. return &jcstypes.DefaultRepRedundancy, newSpaces
  36. }
  37. }
  38. return nil, nil
  39. case *jcstypes.ECRedundancy:
  40. if obj.Object.Size < ctx.ticktock.cfg.ECFileSizeThreshold {
  41. return &jcstypes.DefaultRepRedundancy, t.chooseNewUserSpacesForRep(ctx, &jcstypes.DefaultRepRedundancy)
  42. }
  43. newSpaces := t.rechooseUserSpacesForEC(ctx, obj, &jcstypes.DefaultECRedundancy)
  44. for i, s := range newSpaces {
  45. if !obj.ContainsBlock(i, s.UserSpace.UserSpace.UserSpaceID) {
  46. return &jcstypes.DefaultECRedundancy, newSpaces
  47. }
  48. }
  49. return nil, nil
  50. case *jcstypes.LRCRedundancy:
  51. newLRCStgs := t.rechooseUserSpacesForLRC(ctx, obj, &jcstypes.DefaultLRCRedundancy)
  52. for i, s := range newLRCStgs {
  53. if !obj.ContainsBlock(i, s.UserSpace.UserSpace.UserSpaceID) {
  54. return &jcstypes.DefaultLRCRedundancy, newLRCStgs
  55. }
  56. }
  57. return nil, nil
  58. }
  59. return nil, nil
  60. }
  61. func (t *ChangeRedundancy) doChangeRedundancy(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, newRed jcstypes.Redundancy, selectedUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  62. log := logger.WithType[ChangeRedundancy]("TickTock")
  63. var updating *db.UpdatingObjectRedundancy
  64. var evt datamap.SysEventBody
  65. var err error
  66. switch srcRed := obj.Object.Redundancy.(type) {
  67. case *jcstypes.NoneRedundancy:
  68. switch newRed := newRed.(type) {
  69. case *jcstypes.RepRedundancy:
  70. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep")
  71. updating, evt, err = t.noneToRep(ctx, obj, newRed, selectedUserSpaces)
  72. case *jcstypes.ECRedundancy:
  73. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec")
  74. updating, evt, err = t.noneToEC(ctx, obj, newRed, selectedUserSpaces)
  75. case *jcstypes.LRCRedundancy:
  76. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc")
  77. updating, evt, err = t.noneToLRC(ctx, obj, newRed, selectedUserSpaces)
  78. case *jcstypes.SegmentRedundancy:
  79. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> segment")
  80. updating, evt, err = t.noneToSeg(ctx, obj, newRed, selectedUserSpaces)
  81. }
  82. case *jcstypes.RepRedundancy:
  83. switch newRed := newRed.(type) {
  84. case *jcstypes.RepRedundancy:
  85. updating, evt, err = t.repToRep(ctx, obj, srcRed, selectedUserSpaces)
  86. case *jcstypes.ECRedundancy:
  87. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec")
  88. updating, evt, err = t.repToEC(ctx, obj, newRed, selectedUserSpaces)
  89. }
  90. case *jcstypes.ECRedundancy:
  91. switch newRed := newRed.(type) {
  92. case *jcstypes.RepRedundancy:
  93. log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep")
  94. updating, evt, err = t.ecToRep(ctx, obj, srcRed, newRed, selectedUserSpaces)
  95. case *jcstypes.ECRedundancy:
  96. updating, evt, err = t.ecToEC(ctx, obj, srcRed, newRed, selectedUserSpaces)
  97. }
  98. case *jcstypes.LRCRedundancy:
  99. switch newRed := newRed.(type) {
  100. case *jcstypes.LRCRedundancy:
  101. updating, evt, err = t.lrcToLRC(ctx, obj, srcRed, newRed, selectedUserSpaces)
  102. }
  103. }
  104. return updating, evt, err
  105. }
  106. // 统计每个对象块所在的节点,选出块最多的不超过userspaceCnt个节点
  107. func (t *ChangeRedundancy) summaryRepObjectBlockUserSpaces(ctx *changeRedundancyContext, objs []jcstypes.ObjectDetail, userspaceCnt int) []jcstypes.UserSpaceID {
  108. type stgBlocks struct {
  109. UserSpaceID jcstypes.UserSpaceID
  110. Count int
  111. }
  112. stgBlocksMap := make(map[jcstypes.UserSpaceID]*stgBlocks)
  113. for _, obj := range objs {
  114. shouldUseEC := obj.Object.Size > ctx.ticktock.cfg.ECFileSizeThreshold
  115. if _, ok := obj.Object.Redundancy.(*jcstypes.RepRedundancy); ok && !shouldUseEC {
  116. for _, block := range obj.Blocks {
  117. if _, ok := stgBlocksMap[block.UserSpaceID]; !ok {
  118. stgBlocksMap[block.UserSpaceID] = &stgBlocks{
  119. UserSpaceID: block.UserSpaceID,
  120. Count: 0,
  121. }
  122. }
  123. stgBlocksMap[block.UserSpaceID].Count++
  124. }
  125. }
  126. }
  127. userspaces := lo.Values(stgBlocksMap)
  128. sort2.Sort(userspaces, func(left *stgBlocks, right *stgBlocks) int {
  129. return right.Count - left.Count
  130. })
  131. ids := lo.Map(userspaces, func(item *stgBlocks, idx int) jcstypes.UserSpaceID { return item.UserSpaceID })
  132. if len(ids) > userspaceCnt {
  133. ids = ids[:userspaceCnt]
  134. }
  135. return ids
  136. }
  137. func (t *ChangeRedundancy) chooseNewUserSpacesForRep(ctx *changeRedundancyContext, red *jcstypes.RepRedundancy) []*userSpaceUsageInfo {
  138. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  139. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  140. })
  141. return t.chooseSoManyUserSpaces(red.RepCount, sortedUserSpaces)
  142. }
  143. func (t *ChangeRedundancy) chooseNewUserSpacesForEC(ctx *changeRedundancyContext, red *jcstypes.ECRedundancy) []*userSpaceUsageInfo {
  144. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  145. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  146. })
  147. return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
  148. }
  149. func (t *ChangeRedundancy) chooseNewUserSpacesForLRC(ctx *changeRedundancyContext, red *jcstypes.LRCRedundancy) []*userSpaceUsageInfo {
  150. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  151. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  152. })
  153. return t.chooseSoManyUserSpaces(red.N, sortedUserSpaces)
  154. }
  155. func (t *ChangeRedundancy) chooseNewUserSpacesForSeg(ctx *changeRedundancyContext, segCount int) []*userSpaceUsageInfo {
  156. sortedUserSpaces := sort2.Sort(lo.Values(ctx.allUserSpaces), func(left *userSpaceUsageInfo, right *userSpaceUsageInfo) int {
  157. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  158. })
  159. return t.chooseSoManyUserSpaces(segCount, sortedUserSpaces)
  160. }
  161. func (t *ChangeRedundancy) rechooseUserSpacesForRep(ctx *changeRedundancyContext, red *jcstypes.RepRedundancy) []*userSpaceUsageInfo {
  162. type rechooseUserSpace struct {
  163. *userSpaceUsageInfo
  164. HasBlock bool
  165. }
  166. var rechooseStgs []*rechooseUserSpace
  167. for _, stg := range ctx.allUserSpaces {
  168. hasBlock := false
  169. for _, id := range ctx.mostBlockStgIDs {
  170. if id == stg.UserSpace.UserSpace.UserSpaceID {
  171. hasBlock = true
  172. break
  173. }
  174. }
  175. rechooseStgs = append(rechooseStgs, &rechooseUserSpace{
  176. userSpaceUsageInfo: stg,
  177. HasBlock: hasBlock,
  178. })
  179. }
  180. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseUserSpace, right *rechooseUserSpace) int {
  181. // 已经缓存了文件块的节点优先选择
  182. v := sort2.CmpBool(right.HasBlock, left.HasBlock)
  183. if v != 0 {
  184. return v
  185. }
  186. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  187. })
  188. return t.chooseSoManyUserSpaces(red.RepCount, lo.Map(sortedStgs, func(userspace *rechooseUserSpace, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  189. }
  190. func (t *ChangeRedundancy) rechooseUserSpacesForEC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.ECRedundancy) []*userSpaceUsageInfo {
  191. type rechooseStg struct {
  192. *userSpaceUsageInfo
  193. CachedBlockIndex int
  194. }
  195. var rechooseStgs []*rechooseStg
  196. for _, stg := range ctx.allUserSpaces {
  197. cachedBlockIndex := -1
  198. for _, block := range obj.Blocks {
  199. if block.UserSpaceID == stg.UserSpace.UserSpace.UserSpaceID {
  200. cachedBlockIndex = block.Index
  201. break
  202. }
  203. }
  204. rechooseStgs = append(rechooseStgs, &rechooseStg{
  205. userSpaceUsageInfo: stg,
  206. CachedBlockIndex: cachedBlockIndex,
  207. })
  208. }
  209. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  210. // 已经缓存了文件块的节点优先选择
  211. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  212. if v != 0 {
  213. return v
  214. }
  215. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  216. })
  217. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  218. return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  219. }
  220. func (t *ChangeRedundancy) rechooseUserSpacesForLRC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.LRCRedundancy) []*userSpaceUsageInfo {
  221. type rechooseStg struct {
  222. *userSpaceUsageInfo
  223. CachedBlockIndex int
  224. }
  225. var rechooseStgs []*rechooseStg
  226. for _, stg := range ctx.allUserSpaces {
  227. cachedBlockIndex := -1
  228. for _, block := range obj.Blocks {
  229. if block.UserSpaceID == stg.UserSpace.UserSpace.UserSpaceID {
  230. cachedBlockIndex = block.Index
  231. break
  232. }
  233. }
  234. rechooseStgs = append(rechooseStgs, &rechooseStg{
  235. userSpaceUsageInfo: stg,
  236. CachedBlockIndex: cachedBlockIndex,
  237. })
  238. }
  239. sortedStgs := sort2.Sort(rechooseStgs, func(left *rechooseStg, right *rechooseStg) int {
  240. // 已经缓存了文件块的节点优先选择
  241. v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
  242. if v != 0 {
  243. return v
  244. }
  245. return sort2.Cmp(right.AccessAmount, left.AccessAmount)
  246. })
  247. // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
  248. return t.chooseSoManyUserSpaces(red.N, lo.Map(sortedStgs, func(userspace *rechooseStg, idx int) *userSpaceUsageInfo { return userspace.userSpaceUsageInfo }))
  249. }
  250. func (t *ChangeRedundancy) chooseSoManyUserSpaces(count int, stgs []*userSpaceUsageInfo) []*userSpaceUsageInfo {
  251. repeateCount := (count + len(stgs) - 1) / len(stgs)
  252. extendStgs := make([]*userSpaceUsageInfo, repeateCount*len(stgs))
  253. // 使用复制的方式将节点数扩充到要求的数量
  254. // 复制之后的结构:ABCD -> AAABBBCCCDDD
  255. for p := 0; p < repeateCount; p++ {
  256. for i, userspace := range stgs {
  257. putIdx := i*repeateCount + p
  258. extendStgs[putIdx] = userspace
  259. }
  260. }
  261. extendStgs = extendStgs[:count]
  262. var chosen []*userSpaceUsageInfo
  263. for len(chosen) < count {
  264. // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮
  265. chosenLocations := make(map[jcstypes.Location]bool)
  266. for i, stg := range extendStgs {
  267. if stg == nil {
  268. continue
  269. }
  270. if chosenLocations[stg.UserSpace.UserSpace.Storage.GetLocation()] {
  271. continue
  272. }
  273. chosen = append(chosen, stg)
  274. chosenLocations[stg.UserSpace.UserSpace.Storage.GetLocation()] = true
  275. extendStgs[i] = nil
  276. }
  277. }
  278. return chosen
  279. }
  280. func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  281. if len(obj.Blocks) == 0 {
  282. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  283. }
  284. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  285. if !ok {
  286. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  287. }
  288. // 如果选择的备份节点都是同一个,那么就只要上传一次
  289. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) jcstypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  290. ft := ioswitch2.NewFromTo()
  291. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  292. for i, stg := range uploadStgs {
  293. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  294. }
  295. plans := exec.NewPlanBuilder()
  296. err := parser.Parse(ft, plans)
  297. if err != nil {
  298. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  299. }
  300. // TODO 添加依赖
  301. execCtx := exec.NewExecContext()
  302. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  303. ret, err := plans.Execute(execCtx).Wait(context.Background())
  304. if err != nil {
  305. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  306. }
  307. var blocks []jcstypes.ObjectBlock
  308. var blockChgs []datamap.BlockChange
  309. for i, stg := range uploadStgs {
  310. r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
  311. blocks = append(blocks, jcstypes.ObjectBlock{
  312. ObjectID: obj.Object.ObjectID,
  313. Index: 0,
  314. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  315. FileHash: r.Hash,
  316. Size: r.Size,
  317. })
  318. blockChgs = append(blockChgs, &datamap.BlockChangeClone{
  319. BlockType: datamap.BlockTypeRaw,
  320. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  321. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  322. TransferBytes: 1,
  323. })
  324. }
  325. // 删除原本的文件块
  326. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  327. Index: 0,
  328. UserSpaceID: obj.Blocks[0].UserSpaceID,
  329. })
  330. return &db.UpdatingObjectRedundancy{
  331. ObjectID: obj.Object.ObjectID,
  332. FileHash: obj.Object.FileHash,
  333. Size: obj.Object.Size,
  334. Redundancy: red,
  335. Blocks: blocks,
  336. }, &datamap.BodyBlockTransfer{
  337. ObjectID: obj.Object.ObjectID,
  338. PackageID: obj.Object.PackageID,
  339. BlockChanges: blockChgs,
  340. }, nil
  341. }
  342. func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.ECRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  343. if len(obj.Blocks) == 0 {
  344. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
  345. }
  346. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  347. if !ok {
  348. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  349. }
  350. ft := ioswitch2.NewFromTo()
  351. ft.ECParam = red
  352. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  353. for i := 0; i < red.N; i++ {
  354. ft.AddTo(ioswitch2.NewToShardStore(*uploadStgs[i].UserSpace, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  355. }
  356. plans := exec.NewPlanBuilder()
  357. err := parser.Parse(ft, plans)
  358. if err != nil {
  359. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  360. }
  361. execCtx := exec.NewExecContext()
  362. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  363. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  364. if err != nil {
  365. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  366. }
  367. var blocks []jcstypes.ObjectBlock
  368. var evtTargetBlocks []datamap.Block
  369. var evtBlockTrans []datamap.DataTransfer
  370. for i := 0; i < red.N; i++ {
  371. r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
  372. blocks = append(blocks, jcstypes.ObjectBlock{
  373. ObjectID: obj.Object.ObjectID,
  374. Index: i,
  375. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  376. FileHash: r.Hash,
  377. Size: r.Size,
  378. })
  379. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  380. BlockType: datamap.BlockTypeEC,
  381. Index: i,
  382. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  383. })
  384. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  385. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  386. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  387. TransferBytes: 1,
  388. })
  389. }
  390. return &db.UpdatingObjectRedundancy{
  391. ObjectID: obj.Object.ObjectID,
  392. FileHash: obj.Object.FileHash,
  393. Size: obj.Object.Size,
  394. Redundancy: red,
  395. Blocks: blocks,
  396. },
  397. &datamap.BodyBlockTransfer{
  398. ObjectID: obj.Object.ObjectID,
  399. PackageID: obj.Object.PackageID,
  400. BlockChanges: []datamap.BlockChange{
  401. &datamap.BlockChangeEnDecode{
  402. SourceBlocks: []datamap.Block{{
  403. BlockType: datamap.BlockTypeRaw,
  404. UserSpaceID: obj.Blocks[0].UserSpaceID,
  405. }},
  406. TargetBlocks: evtTargetBlocks,
  407. DataTransfers: evtBlockTrans,
  408. },
  409. // 删除原本的文件块
  410. &datamap.BlockChangeDeleted{
  411. Index: 0,
  412. UserSpaceID: obj.Blocks[0].UserSpaceID,
  413. },
  414. },
  415. }, nil
  416. }
  417. func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.LRCRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  418. if len(obj.Blocks) == 0 {
  419. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to ec")
  420. }
  421. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  422. if !ok {
  423. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  424. }
  425. var toes []ioswitchlrc.To
  426. for i := 0; i < red.N; i++ {
  427. toes = append(toes, ioswitchlrc.NewToStorage(*uploadStgs[i].UserSpace, i, fmt.Sprintf("%d", i)))
  428. }
  429. plans := exec.NewPlanBuilder()
  430. err := lrcparser.Encode(ioswitchlrc.NewFromStorage(obj.Object.FileHash, *srcStg.UserSpace, -1), toes, plans)
  431. if err != nil {
  432. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  433. }
  434. execCtx := exec.NewExecContext()
  435. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  436. ioRet, err := plans.Execute(execCtx).Wait(context.Background())
  437. if err != nil {
  438. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  439. }
  440. var blocks []jcstypes.ObjectBlock
  441. var evtTargetBlocks []datamap.Block
  442. var evtBlockTrans []datamap.DataTransfer
  443. for i := 0; i < red.N; i++ {
  444. r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
  445. blocks = append(blocks, jcstypes.ObjectBlock{
  446. ObjectID: obj.Object.ObjectID,
  447. Index: i,
  448. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  449. FileHash: r.Hash,
  450. Size: r.Size,
  451. })
  452. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  453. BlockType: datamap.BlockTypeEC,
  454. Index: i,
  455. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  456. })
  457. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  458. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  459. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  460. TransferBytes: 1,
  461. })
  462. }
  463. return &db.UpdatingObjectRedundancy{
  464. ObjectID: obj.Object.ObjectID,
  465. FileHash: obj.Object.FileHash,
  466. Size: obj.Object.Size,
  467. Redundancy: red,
  468. Blocks: blocks,
  469. },
  470. &datamap.BodyBlockTransfer{
  471. ObjectID: obj.Object.ObjectID,
  472. PackageID: obj.Object.PackageID,
  473. BlockChanges: []datamap.BlockChange{
  474. &datamap.BlockChangeEnDecode{
  475. SourceBlocks: []datamap.Block{{
  476. BlockType: datamap.BlockTypeRaw,
  477. UserSpaceID: obj.Blocks[0].UserSpaceID,
  478. }},
  479. TargetBlocks: evtTargetBlocks,
  480. DataTransfers: evtBlockTrans,
  481. },
  482. // 删除原本的文件块
  483. &datamap.BlockChangeDeleted{
  484. Index: 0,
  485. UserSpaceID: obj.Blocks[0].UserSpaceID,
  486. },
  487. },
  488. },
  489. nil
  490. }
  491. func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.SegmentRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  492. if len(obj.Blocks) == 0 {
  493. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  494. }
  495. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  496. if !ok {
  497. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  498. }
  499. // 如果选择的备份节点都是同一个,那么就只要上传一次
  500. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) jcstypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  501. ft := ioswitch2.NewFromTo()
  502. ft.SegmentParam = red
  503. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  504. for i, stg := range uploadStgs {
  505. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.SegmentStream(i), fmt.Sprintf("%d", i)))
  506. }
  507. plans := exec.NewPlanBuilder()
  508. err := parser.Parse(ft, plans)
  509. if err != nil {
  510. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  511. }
  512. // TODO 添加依赖
  513. execCtx := exec.NewExecContext()
  514. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  515. ret, err := plans.Execute(execCtx).Wait(context.Background())
  516. if err != nil {
  517. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  518. }
  519. var blocks []jcstypes.ObjectBlock
  520. var evtTargetBlocks []datamap.Block
  521. var evtBlockTrans []datamap.DataTransfer
  522. for i, stg := range uploadStgs {
  523. r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
  524. blocks = append(blocks, jcstypes.ObjectBlock{
  525. ObjectID: obj.Object.ObjectID,
  526. Index: i,
  527. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  528. FileHash: r.Hash,
  529. Size: r.Size,
  530. })
  531. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  532. BlockType: datamap.BlockTypeSegment,
  533. Index: i,
  534. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  535. })
  536. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  537. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  538. TargetUserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  539. TransferBytes: 1,
  540. })
  541. }
  542. return &db.UpdatingObjectRedundancy{
  543. ObjectID: obj.Object.ObjectID,
  544. FileHash: obj.Object.FileHash,
  545. Size: obj.Object.Size,
  546. Redundancy: red,
  547. Blocks: blocks,
  548. },
  549. &datamap.BodyBlockTransfer{
  550. ObjectID: obj.Object.ObjectID,
  551. PackageID: obj.Object.PackageID,
  552. BlockChanges: []datamap.BlockChange{
  553. &datamap.BlockChangeEnDecode{
  554. SourceBlocks: []datamap.Block{{
  555. BlockType: datamap.BlockTypeRaw,
  556. UserSpaceID: obj.Blocks[0].UserSpaceID,
  557. }},
  558. TargetBlocks: evtTargetBlocks,
  559. DataTransfers: evtBlockTrans,
  560. },
  561. // 删除原本的文件块
  562. &datamap.BlockChangeDeleted{
  563. Index: 0,
  564. UserSpaceID: obj.Blocks[0].UserSpaceID,
  565. },
  566. },
  567. },
  568. nil
  569. }
  570. func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  571. if len(obj.Blocks) == 0 {
  572. return nil, nil, fmt.Errorf("object is not cached on any userspaces, cannot change its redundancy to rep")
  573. }
  574. srcStg, ok := ctx.allUserSpaces[obj.Blocks[0].UserSpaceID]
  575. if !ok {
  576. return nil, nil, fmt.Errorf("userspace %v not found", obj.Blocks[0].UserSpaceID)
  577. }
  578. // 如果选择的备份节点都是同一个,那么就只要上传一次
  579. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) jcstypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  580. ft := ioswitch2.NewFromTo()
  581. ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *srcStg.UserSpace, ioswitch2.RawStream()))
  582. for i, stg := range uploadStgs {
  583. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i)))
  584. }
  585. plans := exec.NewPlanBuilder()
  586. err := parser.Parse(ft, plans)
  587. if err != nil {
  588. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  589. }
  590. // TODO 添加依赖
  591. execCtx := exec.NewExecContext()
  592. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  593. ret, err := plans.Execute(execCtx).Wait(context.Background())
  594. if err != nil {
  595. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  596. }
  597. var blocks []jcstypes.ObjectBlock
  598. var blockChgs []datamap.BlockChange
  599. for i, stg := range uploadStgs {
  600. r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
  601. blocks = append(blocks, jcstypes.ObjectBlock{
  602. ObjectID: obj.Object.ObjectID,
  603. Index: 0,
  604. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  605. FileHash: r.Hash,
  606. Size: r.Size,
  607. })
  608. blockChgs = append(blockChgs, &datamap.BlockChangeClone{
  609. BlockType: datamap.BlockTypeRaw,
  610. SourceUserSpaceID: obj.Blocks[0].UserSpaceID,
  611. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  612. TransferBytes: 1,
  613. })
  614. }
  615. // 删除原本的文件块
  616. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  617. Index: 0,
  618. UserSpaceID: obj.Blocks[0].UserSpaceID,
  619. })
  620. return &db.UpdatingObjectRedundancy{
  621. ObjectID: obj.Object.ObjectID,
  622. FileHash: obj.Object.FileHash,
  623. Size: obj.Object.Size,
  624. Redundancy: red,
  625. Blocks: blocks,
  626. },
  627. &datamap.BodyBlockTransfer{
  628. ObjectID: obj.Object.ObjectID,
  629. PackageID: obj.Object.PackageID,
  630. BlockChanges: blockChgs,
  631. },
  632. nil
  633. }
  634. func (t *ChangeRedundancy) repToEC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, red *jcstypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  635. return t.noneToEC(ctx, obj, red, uploadUserSpaces)
  636. }
  637. func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, srcRed *jcstypes.ECRedundancy, tarRed *jcstypes.RepRedundancy, uploadStgs []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  638. var chosenBlocks []jcstypes.GrouppedObjectBlock
  639. var chosenBlockIndexes []int
  640. var chosenBlockStg []jcstypes.UserSpaceDetail
  641. for _, block := range obj.GroupBlocks() {
  642. if len(block.UserSpaceIDs) > 0 {
  643. // TODO 考虑选择最优的节点
  644. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  645. if !ok {
  646. continue
  647. }
  648. chosenBlocks = append(chosenBlocks, block)
  649. chosenBlockIndexes = append(chosenBlockIndexes, block.Index)
  650. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  651. }
  652. if len(chosenBlocks) == srcRed.K {
  653. break
  654. }
  655. }
  656. if len(chosenBlocks) < srcRed.K {
  657. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  658. }
  659. // 如果选择的备份节点都是同一个,那么就只要上传一次
  660. uploadStgs = lo.UniqBy(uploadStgs, func(item *userSpaceUsageInfo) jcstypes.UserSpaceID { return item.UserSpace.UserSpace.UserSpaceID })
  661. planBlder := exec.NewPlanBuilder()
  662. ft := ioswitch2.NewFromTo()
  663. ft.ECParam = srcRed
  664. for i, block := range chosenBlocks {
  665. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  666. }
  667. for i := range uploadStgs {
  668. ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].UserSpace, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.NewRange(0, obj.Object.Size)))
  669. }
  670. err := parser.Parse(ft, planBlder)
  671. if err != nil {
  672. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  673. }
  674. // TODO 添加依赖
  675. execCtx := exec.NewExecContext()
  676. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  677. ioRet, err := planBlder.Execute(execCtx).Wait(context.Background())
  678. if err != nil {
  679. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  680. }
  681. var blocks []jcstypes.ObjectBlock
  682. for i := range uploadStgs {
  683. r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
  684. blocks = append(blocks, jcstypes.ObjectBlock{
  685. ObjectID: obj.Object.ObjectID,
  686. Index: 0,
  687. UserSpaceID: uploadStgs[i].UserSpace.UserSpace.UserSpaceID,
  688. FileHash: r.Hash,
  689. Size: r.Size,
  690. })
  691. }
  692. var evtSrcBlocks []datamap.Block
  693. var evtTargetBlocks []datamap.Block
  694. for i2, block := range chosenBlocks {
  695. evtSrcBlocks = append(evtSrcBlocks, datamap.Block{
  696. BlockType: datamap.BlockTypeEC,
  697. Index: block.Index,
  698. UserSpaceID: chosenBlockStg[i2].UserSpace.UserSpaceID,
  699. })
  700. }
  701. for _, stg := range uploadStgs {
  702. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  703. BlockType: datamap.BlockTypeRaw,
  704. Index: 0,
  705. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  706. })
  707. }
  708. var evtBlockTrans []datamap.DataTransfer
  709. for _, stg := range uploadStgs {
  710. for i2 := range chosenBlocks {
  711. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  712. SourceUserSpaceID: chosenBlockStg[i2].UserSpace.UserSpaceID,
  713. TargetUserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  714. TransferBytes: 1,
  715. })
  716. }
  717. }
  718. var blockChgs []datamap.BlockChange
  719. blockChgs = append(blockChgs, &datamap.BlockChangeEnDecode{
  720. SourceBlocks: evtSrcBlocks,
  721. TargetBlocks: evtTargetBlocks,
  722. DataTransfers: evtBlockTrans,
  723. })
  724. for _, block := range obj.Blocks {
  725. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  726. Index: block.Index,
  727. UserSpaceID: block.UserSpaceID,
  728. })
  729. }
  730. return &db.UpdatingObjectRedundancy{
  731. ObjectID: obj.Object.ObjectID,
  732. FileHash: obj.Object.FileHash,
  733. Size: obj.Object.Size,
  734. Redundancy: tarRed,
  735. Blocks: blocks,
  736. },
  737. &datamap.BodyBlockTransfer{
  738. ObjectID: obj.Object.ObjectID,
  739. PackageID: obj.Object.PackageID,
  740. BlockChanges: blockChgs,
  741. },
  742. nil
  743. }
  744. func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, srcRed *jcstypes.ECRedundancy, tarRed *jcstypes.ECRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  745. grpBlocks := obj.GroupBlocks()
  746. var chosenBlocks []jcstypes.GrouppedObjectBlock
  747. var chosenBlockStg []jcstypes.UserSpaceDetail
  748. for _, block := range grpBlocks {
  749. if len(block.UserSpaceIDs) > 0 {
  750. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  751. if !ok {
  752. continue
  753. }
  754. chosenBlocks = append(chosenBlocks, block)
  755. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  756. }
  757. if len(chosenBlocks) == srcRed.K {
  758. break
  759. }
  760. }
  761. if len(chosenBlocks) < srcRed.K {
  762. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  763. }
  764. // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  765. planBlder := exec.NewPlanBuilder()
  766. var evtSrcBlocks []datamap.Block
  767. var evtTargetBlocks []datamap.Block
  768. ft := ioswitch2.NewFromTo()
  769. ft.ECParam = srcRed
  770. for i, block := range chosenBlocks {
  771. ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, chosenBlockStg[i], ioswitch2.ECStream(block.Index)))
  772. evtSrcBlocks = append(evtSrcBlocks, datamap.Block{
  773. BlockType: datamap.BlockTypeEC,
  774. Index: block.Index,
  775. UserSpaceID: chosenBlockStg[i].UserSpace.UserSpaceID,
  776. })
  777. }
  778. var newBlocks []jcstypes.ObjectBlock
  779. shouldUpdateBlocks := false
  780. for i, stg := range uploadUserSpaces {
  781. newBlock := jcstypes.ObjectBlock{
  782. ObjectID: obj.Object.ObjectID,
  783. Index: i,
  784. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  785. }
  786. grp, ok := lo.Find(grpBlocks, func(grp jcstypes.GrouppedObjectBlock) bool { return grp.Index == i })
  787. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  788. if ok && lo.Contains(grp.UserSpaceIDs, stg.UserSpace.UserSpace.UserSpaceID) {
  789. newBlock.FileHash = grp.FileHash
  790. newBlock.Size = grp.Size
  791. newBlocks = append(newBlocks, newBlock)
  792. continue
  793. }
  794. shouldUpdateBlocks = true
  795. // 否则就要重建出这个节点需要的块
  796. // 输出只需要自己要保存的那一块
  797. ft.AddTo(ioswitch2.NewToShardStore(*stg.UserSpace, ioswitch2.ECStream(i), fmt.Sprintf("%d", i)))
  798. evtTargetBlocks = append(evtTargetBlocks, datamap.Block{
  799. BlockType: datamap.BlockTypeEC,
  800. Index: i,
  801. UserSpaceID: stg.UserSpace.UserSpace.UserSpaceID,
  802. })
  803. newBlocks = append(newBlocks, newBlock)
  804. }
  805. err := parser.Parse(ft, planBlder)
  806. if err != nil {
  807. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  808. }
  809. // 如果没有任何Plan,Wait会直接返回成功
  810. execCtx := exec.NewExecContext()
  811. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  812. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  813. if err != nil {
  814. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  815. }
  816. if !shouldUpdateBlocks {
  817. return nil, nil, nil
  818. }
  819. for i := range newBlocks {
  820. v := ret.Get(fmt.Sprintf("%v", i))
  821. if v == nil {
  822. // 有一些块已经存在,所以不会在计划执行结果中
  823. continue
  824. }
  825. r := v.(*ops2.FileInfoValue)
  826. newBlocks[i].FileHash = r.Hash
  827. newBlocks[i].Size = r.Size
  828. }
  829. var evtBlockTrans []datamap.DataTransfer
  830. for _, src := range evtSrcBlocks {
  831. for _, tar := range evtTargetBlocks {
  832. evtBlockTrans = append(evtBlockTrans, datamap.DataTransfer{
  833. SourceUserSpaceID: src.UserSpaceID,
  834. TargetUserSpaceID: tar.UserSpaceID,
  835. TransferBytes: 1,
  836. })
  837. }
  838. }
  839. var blockChgs []datamap.BlockChange
  840. for _, block := range obj.Blocks {
  841. keep := lo.ContainsBy(newBlocks, func(newBlock jcstypes.ObjectBlock) bool {
  842. return newBlock.Index == block.Index && newBlock.UserSpaceID == block.UserSpaceID
  843. })
  844. if !keep {
  845. blockChgs = append(blockChgs, &datamap.BlockChangeDeleted{
  846. Index: block.Index,
  847. UserSpaceID: block.UserSpaceID,
  848. })
  849. }
  850. }
  851. blockChgs = append(blockChgs, &datamap.BlockChangeEnDecode{
  852. SourceBlocks: evtSrcBlocks,
  853. TargetBlocks: evtTargetBlocks,
  854. DataTransfers: evtBlockTrans,
  855. })
  856. return &db.UpdatingObjectRedundancy{
  857. ObjectID: obj.Object.ObjectID,
  858. FileHash: obj.Object.FileHash,
  859. Size: obj.Object.Size,
  860. Redundancy: tarRed,
  861. Blocks: newBlocks,
  862. },
  863. &datamap.BodyBlockTransfer{
  864. ObjectID: obj.Object.ObjectID,
  865. PackageID: obj.Object.PackageID,
  866. BlockChanges: blockChgs,
  867. },
  868. nil
  869. }
  870. func (t *ChangeRedundancy) lrcToLRC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, srcRed *jcstypes.LRCRedundancy, tarRed *jcstypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  871. blocksGrpByIndex := obj.GroupBlocks()
  872. var lostBlocks []int
  873. var lostBlockGrps []int
  874. canGroupReconstruct := true
  875. allBlockFlags := make([]bool, srcRed.N)
  876. for _, block := range blocksGrpByIndex {
  877. allBlockFlags[block.Index] = true
  878. }
  879. for i, ok := range allBlockFlags {
  880. grpID := srcRed.FindGroup(i)
  881. if !ok {
  882. if grpID == -1 {
  883. canGroupReconstruct = false
  884. break
  885. }
  886. if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID {
  887. canGroupReconstruct = false
  888. break
  889. }
  890. lostBlocks = append(lostBlocks, i)
  891. lostBlockGrps = append(lostBlockGrps, grpID)
  892. }
  893. }
  894. // TODO 产生BlockTransfer事件
  895. if canGroupReconstruct {
  896. // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadUserSpaces)
  897. }
  898. return t.reconstructLRC(ctx, obj, blocksGrpByIndex, srcRed, uploadUserSpaces)
  899. }
  900. /*
  901. TODO2 修复这一块的代码
  902. func (t *ChangeRedundancy) groupReconstructLRC(obj jcstypes.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []jcstypes.GrouppedObjectBlock, red *jcstypes.LRCRedundancy, uploadUserSpaces []*UserSpaceLoadInfo) (*db.UpdatingObjectRedundancy, error) {
  903. grped := make(map[int]jcstypes.GrouppedObjectBlock)
  904. for _, b := range grpedBlocks {
  905. grped[b.Index] = b
  906. }
  907. plans := exec.NewPlanBuilder()
  908. for i := 0; i < len(lostBlocks); i++ {
  909. var froms []ioswitchlrc.From
  910. grpEles := red.GetGroupElements(lostBlockGrps[i])
  911. for _, ele := range grpEles {
  912. if ele == lostBlocks[i] {
  913. continue
  914. }
  915. froms = append(froms, ioswitchlrc.NewFromUserSpace(grped[ele].FileHash, nil, ele))
  916. }
  917. err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{
  918. ioswitchlrc.NewToUserSpace(uploadUserSpaces[i].UserSpace, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])),
  919. }, plans)
  920. if err != nil {
  921. return nil, fmt.Errorf("parsing plan: %w", err)
  922. }
  923. }
  924. fmt.Printf("plans: %v\n", plans)
  925. // 如果没有任何Plan,Wait会直接返回成功
  926. // TODO 添加依赖
  927. ret, err := plans.Execute(exec.NewExecContext()).Wait(context.TODO())
  928. if err != nil {
  929. return nil, fmt.Errorf("executing io plan: %w", err)
  930. }
  931. var newBlocks []jcstypes.ObjectBlock
  932. for _, i := range lostBlocks {
  933. newBlocks = append(newBlocks, jcstypes.ObjectBlock{
  934. ObjectID: obj.Object.ObjectID,
  935. Index: i,
  936. UserSpaceID: uploadUserSpaces[i].UserSpace.UserSpace.UserSpaceID,
  937. FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
  938. })
  939. }
  940. for _, b := range grpedBlocks {
  941. for _, hubID := range b.UserSpaceIDs {
  942. newBlocks = append(newBlocks, jcstypes.ObjectBlock{
  943. ObjectID: obj.Object.ObjectID,
  944. Index: b.Index,
  945. UserSpaceID: hubID,
  946. FileHash: b.FileHash,
  947. })
  948. }
  949. }
  950. return &db.UpdatingObjectRedundancy{
  951. ObjectID: obj.Object.ObjectID,
  952. Redundancy: red,
  953. Blocks: newBlocks,
  954. }, nil
  955. }
  956. */
  957. func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj jcstypes.ObjectDetail, grpBlocks []jcstypes.GrouppedObjectBlock, red *jcstypes.LRCRedundancy, uploadUserSpaces []*userSpaceUsageInfo) (*db.UpdatingObjectRedundancy, datamap.SysEventBody, error) {
  958. var chosenBlocks []jcstypes.GrouppedObjectBlock
  959. var chosenBlockStg []jcstypes.UserSpaceDetail
  960. for _, block := range grpBlocks {
  961. if len(block.UserSpaceIDs) > 0 && block.Index < red.M() {
  962. stg, ok := ctx.allUserSpaces[block.UserSpaceIDs[0]]
  963. if !ok {
  964. continue
  965. }
  966. chosenBlocks = append(chosenBlocks, block)
  967. chosenBlockStg = append(chosenBlockStg, *stg.UserSpace)
  968. }
  969. if len(chosenBlocks) == red.K {
  970. break
  971. }
  972. }
  973. if len(chosenBlocks) < red.K {
  974. return nil, nil, fmt.Errorf("no enough blocks to reconstruct the original file data")
  975. }
  976. // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块
  977. planBlder := exec.NewPlanBuilder()
  978. var froms []ioswitchlrc.From
  979. var toes []ioswitchlrc.To
  980. var newBlocks []jcstypes.ObjectBlock
  981. shouldUpdateBlocks := false
  982. for i, userspace := range uploadUserSpaces {
  983. newBlock := jcstypes.ObjectBlock{
  984. ObjectID: obj.Object.ObjectID,
  985. Index: i,
  986. UserSpaceID: userspace.UserSpace.UserSpace.UserSpaceID,
  987. }
  988. grp, ok := lo.Find(grpBlocks, func(grp jcstypes.GrouppedObjectBlock) bool { return grp.Index == i })
  989. // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更
  990. if ok && lo.Contains(grp.UserSpaceIDs, userspace.UserSpace.UserSpace.UserSpaceID) {
  991. newBlock.FileHash = grp.FileHash
  992. newBlock.Size = grp.Size
  993. newBlocks = append(newBlocks, newBlock)
  994. continue
  995. }
  996. shouldUpdateBlocks = true
  997. // 否则就要重建出这个节点需要的块
  998. for i2, block := range chosenBlocks {
  999. froms = append(froms, ioswitchlrc.NewFromStorage(block.FileHash, chosenBlockStg[i2], block.Index))
  1000. }
  1001. // 输出只需要自己要保存的那一块
  1002. toes = append(toes, ioswitchlrc.NewToStorage(*userspace.UserSpace, i, fmt.Sprintf("%d", i)))
  1003. newBlocks = append(newBlocks, newBlock)
  1004. }
  1005. err := lrcparser.ReconstructAny(froms, toes, planBlder)
  1006. if err != nil {
  1007. return nil, nil, fmt.Errorf("parsing plan: %w", err)
  1008. }
  1009. fmt.Printf("plans: %v\n", planBlder)
  1010. // 如果没有任何Plan,Wait会直接返回成功
  1011. execCtx := exec.NewExecContext()
  1012. exec.SetValueByType(execCtx, ctx.ticktock.stgPool)
  1013. ret, err := planBlder.Execute(execCtx).Wait(context.Background())
  1014. if err != nil {
  1015. return nil, nil, fmt.Errorf("executing io plan: %w", err)
  1016. }
  1017. if !shouldUpdateBlocks {
  1018. return nil, nil, nil
  1019. }
  1020. for i := range newBlocks {
  1021. v := ret.Get(fmt.Sprintf("%v", i))
  1022. if v == nil {
  1023. // 有一些块已经存在,所以不会在计划执行结果中
  1024. continue
  1025. }
  1026. r := v.(*ops2.FileInfoValue)
  1027. newBlocks[i].FileHash = r.Hash
  1028. newBlocks[i].Size = r.Size
  1029. }
  1030. // TODO 产生系统事件
  1031. return &db.UpdatingObjectRedundancy{
  1032. ObjectID: obj.Object.ObjectID,
  1033. FileHash: obj.Object.FileHash,
  1034. Size: obj.Object.Size,
  1035. Redundancy: red,
  1036. Blocks: newBlocks,
  1037. }, nil, nil
  1038. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。