You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

multipart.go 3.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package opt
  2. import (
  3. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  4. "gitlink.org.cn/cloudream/common/utils/math2"
  5. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
  6. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state"
  7. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory"
  8. )
  9. // 将SegmentJoin指令替换成分片上传指令
  10. func UseMultipartUploadToShardStore(ctx *state.GenerateState) {
  11. dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(joinNode *ops2.SegmentJoinNode) bool {
  12. if joinNode.Joined().Dst.Len() != 1 {
  13. return true
  14. }
  15. joinDst := joinNode.Joined().Dst.Get(0)
  16. bwNode, ok := joinDst.(*ops2.BaseWriteNode)
  17. if !ok {
  18. return true
  19. }
  20. // SegmentJoin的输出流的范围必须与ToShardStore的输入流的范围相同,
  21. // 虽然可以通过调整SegmentJoin的输入流来调整范围,但太复杂,暂不支持
  22. toStrIdx := bwNode.GetTo().GetStreamIndex()
  23. toStrRng := bwNode.GetTo().GetRange()
  24. if toStrIdx.IsRaw() {
  25. if !toStrRng.Equals(ctx.StreamRange) {
  26. return true
  27. }
  28. } else {
  29. return true
  30. }
  31. // Join的目的地必须支持MultipartUpload功能才能替换成分片上传
  32. multiUpload, err := factory.GetBuilder(&bwNode.UserSpace).CreateMultiparter(true)
  33. if err != nil {
  34. return true
  35. }
  36. // Join的每一个段的大小必须超过最小分片大小。
  37. // 目前只支持拆分超过最大分片的流,不支持合并多个小段流以达到最小分片大小。
  38. for _, size := range joinNode.Segments {
  39. if size < multiUpload.MinPartSize() {
  40. return true
  41. }
  42. }
  43. initNode := ctx.DAG.NewMultipartInitiator(bwNode.UserSpace)
  44. initNode.Env().CopyFrom(bwNode.Env())
  45. partNumber := 1
  46. for i, size := range joinNode.Segments {
  47. joinInput := joinNode.InputSlot(i)
  48. if size > multiUpload.MaxPartSize() {
  49. // 如果一个分段的大小大于最大分片大小,则需要拆分为多个小段上传
  50. // 拆分以及上传指令直接在流的产生节点执行
  51. splits := math2.SplitLessThan(size, multiUpload.MaxPartSize())
  52. splitNode := ctx.DAG.NewSegmentSplit(splits)
  53. splitNode.Env().CopyFrom(joinInput.Var().Src.Env())
  54. joinInput.Var().ToSlot(splitNode.InputSlot())
  55. for i2 := 0; i2 < len(splits); i2++ {
  56. uploadNode := ctx.DAG.NewMultipartUpload(bwNode.UserSpace, partNumber, splits[i2])
  57. uploadNode.Env().CopyFrom(joinInput.Var().Src.Env())
  58. initNode.UploadArgsVar().ToSlot(uploadNode.UploadArgsSlot())
  59. splitNode.SegmentVar(i2).ToSlot(uploadNode.PartStreamSlot())
  60. uploadNode.UploadResultVar().ToSlot(initNode.PartInfoSlot())
  61. partNumber++
  62. }
  63. } else {
  64. // 否则直接上传整个分段
  65. uploadNode := ctx.DAG.NewMultipartUpload(bwNode.UserSpace, partNumber, size)
  66. // 上传指令直接在流的产生节点执行
  67. uploadNode.Env().CopyFrom(joinInput.Var().Src.Env())
  68. initNode.UploadArgsVar().ToSlot(uploadNode.UploadArgsSlot())
  69. joinInput.Var().ToSlot(uploadNode.PartStreamSlot())
  70. uploadNode.UploadResultVar().ToSlot(initNode.PartInfoSlot())
  71. partNumber++
  72. }
  73. joinInput.Var().NotTo(joinNode)
  74. }
  75. // BaseWriteNode的FileInfoVar替换为MultipartInitiatorNode的FileInfoVar
  76. for _, dstSlot := range bwNode.FileInfoVar().ListDstSlots() {
  77. initNode.FileInfoVar().ToSlot(dstSlot)
  78. }
  79. // 最后删除Join指令和ToShardStore指令
  80. ctx.DAG.RemoveNode(joinNode)
  81. ctx.DAG.RemoveNode(bwNode)
  82. delete(ctx.ToNodes, bwNode.GetTo())
  83. return true
  84. })
  85. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。