You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

generator.go 15 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. package gen
  2. import (
  3. "fmt"
  4. "math"
  5. "gitlink.org.cn/cloudream/common/utils/lo2"
  6. "gitlink.org.cn/cloudream/common/utils/math2"
  7. "gitlink.org.cn/cloudream/common/utils/os2"
  8. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag"
  9. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  10. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2"
  11. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser/state"
  12. stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
  13. jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types"
  14. )
  15. // 检查使用不同编码时参数是否设置到位
  16. func CheckEncodingParams(ctx *state.GenerateState) error {
  17. for _, f := range ctx.Ft.Froms {
  18. if f.GetStreamIndex().IsEC() {
  19. ctx.UseEC = true
  20. if ctx.Ft.ECParam == nil {
  21. return fmt.Errorf("EC encoding parameters not set")
  22. }
  23. }
  24. if f.GetStreamIndex().IsSegment() {
  25. ctx.UseSegment = true
  26. if ctx.Ft.SegmentParam == nil {
  27. return fmt.Errorf("segment parameters not set")
  28. }
  29. }
  30. }
  31. for _, t := range ctx.Ft.Toes {
  32. if t.GetStreamIndex().IsEC() {
  33. ctx.UseEC = true
  34. if ctx.Ft.ECParam == nil {
  35. return fmt.Errorf("EC encoding parameters not set")
  36. }
  37. }
  38. if t.GetStreamIndex().IsSegment() {
  39. ctx.UseSegment = true
  40. if ctx.Ft.SegmentParam == nil {
  41. return fmt.Errorf("segment parameters not set")
  42. }
  43. }
  44. }
  45. return nil
  46. }
  47. // 计算输入流的打开范围。如果From或者To中包含EC的流,则会将打开范围扩大到条带大小的整数倍。
  48. func CalcStreamRange(ctx *state.GenerateState) {
  49. rng := math2.NewRange(math.MaxInt64, 0)
  50. for _, to := range ctx.Ft.Toes {
  51. strIdx := to.GetStreamIndex()
  52. if strIdx.IsRaw() {
  53. toRng := to.GetRange()
  54. rng.ExtendStart(toRng.Offset)
  55. if toRng.Length != nil {
  56. rng.ExtendEnd(toRng.Offset + *toRng.Length)
  57. } else {
  58. rng.Length = nil
  59. }
  60. } else if strIdx.IsEC() {
  61. toRng := to.GetRange()
  62. stripSize := ctx.Ft.ECParam.StripSize()
  63. blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.Ft.ECParam.ChunkSize))
  64. rng.ExtendStart(blkStartIndex * stripSize)
  65. if toRng.Length != nil {
  66. blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.Ft.ECParam.ChunkSize))
  67. rng.ExtendEnd(blkEndIndex * stripSize)
  68. } else {
  69. rng.Length = nil
  70. }
  71. } else if strIdx.IsSegment() {
  72. // Segment节点的Range是相对于本段的,需要加上本段的起始位置
  73. toRng := to.GetRange()
  74. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(strIdx.Index)
  75. offset := toRng.Offset + segStart
  76. rng.ExtendStart(offset)
  77. if toRng.Length != nil {
  78. rng.ExtendEnd(offset + *toRng.Length)
  79. } else {
  80. rng.Length = nil
  81. }
  82. }
  83. }
  84. if ctx.UseEC {
  85. stripSize := ctx.Ft.ECParam.StripSize()
  86. rng.ExtendStart(math2.Floor(rng.Offset, stripSize))
  87. if rng.Length != nil {
  88. rng.ExtendEnd(math2.Ceil(rng.Offset+*rng.Length, stripSize))
  89. }
  90. }
  91. ctx.StreamRange = rng
  92. }
  93. func Extend(ctx *state.GenerateState) error {
  94. for _, fr := range ctx.Ft.Froms {
  95. frNode, err := buildFromNode(ctx, fr)
  96. if err != nil {
  97. return err
  98. }
  99. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  100. Stream: frNode.Output().Var(),
  101. StreamIndex: fr.GetStreamIndex(),
  102. })
  103. // 对于完整文件的From,生成Split指令
  104. if fr.GetStreamIndex().IsRaw() {
  105. // 只有输入输出需要EC编码的块时,才生成相关指令
  106. if ctx.UseEC {
  107. splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K)
  108. splitNode.Split(frNode.Output().Var())
  109. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  110. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  111. Stream: splitNode.SubStream(i),
  112. StreamIndex: ioswitch2.ECStream(i),
  113. })
  114. }
  115. }
  116. // 同上
  117. if ctx.UseSegment {
  118. splitNode := ctx.DAG.NewSegmentSplit(ctx.Ft.SegmentParam.Segments)
  119. frNode.Output().Var().ToSlot(splitNode.InputSlot())
  120. for i := 0; i < len(ctx.Ft.SegmentParam.Segments); i++ {
  121. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  122. Stream: splitNode.Segment(i),
  123. StreamIndex: ioswitch2.SegmentStream(i),
  124. })
  125. }
  126. }
  127. }
  128. }
  129. if ctx.UseEC {
  130. // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令
  131. ecInputStrs := make(map[int]*dag.StreamVar)
  132. for _, s := range ctx.IndexedStreams {
  133. if s.StreamIndex.IsEC() && ecInputStrs[s.StreamIndex.Index] == nil {
  134. ecInputStrs[s.StreamIndex.Index] = s.Stream
  135. if len(ecInputStrs) == ctx.Ft.ECParam.K {
  136. break
  137. }
  138. }
  139. }
  140. if len(ecInputStrs) == ctx.Ft.ECParam.K {
  141. mulNode := ctx.DAG.NewECMultiply(*ctx.Ft.ECParam)
  142. for i, s := range ecInputStrs {
  143. mulNode.AddInput(s, i)
  144. }
  145. for i := 0; i < ctx.Ft.ECParam.N; i++ {
  146. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  147. Stream: mulNode.NewOutput(i),
  148. StreamIndex: ioswitch2.ECStream(i),
  149. })
  150. }
  151. joinNode := ctx.DAG.NewChunkedJoin(ctx.Ft.ECParam.ChunkSize)
  152. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  153. // 不可能找不到流
  154. joinNode.AddInput(findOutputStream(ctx, ioswitch2.ECStream(i)))
  155. }
  156. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  157. Stream: joinNode.Joined(),
  158. StreamIndex: ioswitch2.RawStream(),
  159. })
  160. }
  161. }
  162. if ctx.UseSegment {
  163. // 先假设有所有的顺序分段,生成Join指令,后续根据Range再实际计算是否缺少流
  164. joinNode := ctx.DAG.NewSegmentJoin(ctx.Ft.SegmentParam.Segments)
  165. for i := 0; i < ctx.Ft.SegmentParam.SegmentCount(); i++ {
  166. str := findOutputStream(ctx, ioswitch2.SegmentStream(i))
  167. if str != nil {
  168. str.ToSlot(joinNode.InputSlot(i))
  169. }
  170. }
  171. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  172. Stream: joinNode.Joined(),
  173. StreamIndex: ioswitch2.RawStream(),
  174. })
  175. // SegmentJoin生成的Join指令可以用来生成EC块
  176. if ctx.UseEC {
  177. splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K)
  178. splitNode.Split(joinNode.Joined())
  179. mulNode := ctx.DAG.NewECMultiply(*ctx.Ft.ECParam)
  180. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  181. mulNode.AddInput(splitNode.SubStream(i), i)
  182. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  183. Stream: splitNode.SubStream(i),
  184. StreamIndex: ioswitch2.ECStream(i),
  185. })
  186. }
  187. for i := 0; i < ctx.Ft.ECParam.N; i++ {
  188. ctx.IndexedStreams = append(ctx.IndexedStreams, state.IndexedStream{
  189. Stream: mulNode.NewOutput(i),
  190. StreamIndex: ioswitch2.ECStream(i),
  191. })
  192. }
  193. }
  194. }
  195. // 为每一个To找到一个输入流
  196. for _, to := range ctx.Ft.Toes {
  197. toNode, err := buildToNode(ctx, to)
  198. if err != nil {
  199. return err
  200. }
  201. str := findOutputStream(ctx, to.GetStreamIndex())
  202. if str == nil {
  203. return fmt.Errorf("no output stream found for data index %d", to.GetStreamIndex())
  204. }
  205. str.ToSlot(toNode.Input())
  206. }
  207. return nil
  208. }
  209. func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, error) {
  210. var repRange math2.Range
  211. repRange.Offset = ctx.StreamRange.Offset
  212. if ctx.StreamRange.Length != nil {
  213. repRngLen := *ctx.StreamRange.Length
  214. repRange.Length = &repRngLen
  215. }
  216. var blkRange math2.Range
  217. if ctx.UseEC {
  218. blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.Ft.ECParam.ChunkSize*ctx.Ft.ECParam.K) * int64(ctx.Ft.ECParam.ChunkSize)
  219. if ctx.StreamRange.Length != nil {
  220. blkRngLen := *ctx.StreamRange.Length / int64(ctx.Ft.ECParam.ChunkSize*ctx.Ft.ECParam.K) * int64(ctx.Ft.ECParam.ChunkSize)
  221. blkRange.Length = &blkRngLen
  222. }
  223. }
  224. switch f := f.(type) {
  225. case *ioswitch2.FromShardStore:
  226. getShard := ctx.DAG.NewGetShardInfo(f.UserSpace, f.FileHash)
  227. getShard.Env().ToEnvDriver(true)
  228. read := ctx.DAG.NewBaseReadDyn(f, f.UserSpace, stgtypes.DefaultOpen())
  229. getShard.FileInfoVar().ToSlot(read.FileInfoSlot())
  230. if f.StreamIndex.IsRaw() {
  231. read.Option.WithNullableLength(repRange.Offset, repRange.Length)
  232. } else if f.StreamIndex.IsEC() {
  233. read.Option.WithNullableLength(blkRange.Offset, blkRange.Length)
  234. } else if f.StreamIndex.IsSegment() {
  235. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(f.StreamIndex.Index)
  236. segLen := ctx.Ft.SegmentParam.Segments[f.StreamIndex.Index]
  237. segEnd := segStart + segLen
  238. // 打开的范围不超过本段的范围
  239. openOff := ctx.StreamRange.Offset - segStart
  240. openOff = math2.Clamp(openOff, 0, segLen)
  241. openLen := segLen
  242. if ctx.StreamRange.Length != nil {
  243. strEnd := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  244. openEnd := math2.Min(strEnd, segEnd)
  245. openLen = openEnd - segStart - openOff
  246. }
  247. read.Option.WithNullableLength(openOff, &openLen)
  248. }
  249. if err := setEnvBySpace(read, &f.UserSpace); err != nil {
  250. return nil, fmt.Errorf("set node env by user space: %w", err)
  251. }
  252. return read, nil
  253. case *ioswitch2.FromDriver:
  254. n := ctx.DAG.NewFromDriver(f, f.Handle)
  255. n.Env().ToEnvDriver(true)
  256. if f.StreamIndex.IsRaw() {
  257. f.Handle.RangeHint.Offset = repRange.Offset
  258. f.Handle.RangeHint.Length = repRange.Length
  259. } else if f.StreamIndex.IsEC() {
  260. f.Handle.RangeHint.Offset = blkRange.Offset
  261. f.Handle.RangeHint.Length = blkRange.Length
  262. } else if f.StreamIndex.IsSegment() {
  263. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(f.StreamIndex.Index)
  264. segLen := ctx.Ft.SegmentParam.Segments[f.StreamIndex.Index]
  265. segEnd := segStart + segLen
  266. // 打开的范围不超过本段的范围
  267. openOff := repRange.Offset - segStart
  268. openOff = math2.Clamp(openOff, 0, segLen)
  269. openLen := segLen
  270. if repRange.Length != nil {
  271. repEnd := repRange.Offset + *repRange.Length
  272. openEnd := math2.Min(repEnd, segEnd)
  273. openLen = openEnd - openOff
  274. }
  275. f.Handle.RangeHint.Offset = openOff
  276. f.Handle.RangeHint.Length = &openLen
  277. }
  278. return n, nil
  279. case *ioswitch2.FromBaseStore:
  280. // TODO 可以考虑支持设置读取范围
  281. n := ctx.DAG.NewBaseRead(f, f.UserSpace, f.Path, stgtypes.DefaultOpen())
  282. if err := setEnvBySpace(n, &f.UserSpace); err != nil {
  283. return nil, fmt.Errorf("set node env by user space: %w", err)
  284. }
  285. return n, nil
  286. default:
  287. return nil, fmt.Errorf("unsupported from type %T", f)
  288. }
  289. }
  290. func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) {
  291. switch t := t.(type) {
  292. case *ioswitch2.ToShardStore:
  293. tempFileName := stgtypes.MakeTempDirPath(&t.UserSpace, os2.GenerateRandomFileName(20))
  294. write := ctx.DAG.NewBaseWrite(t, t.UserSpace, tempFileName, stgtypes.WriteOption{})
  295. if err := setEnvBySpace(write, &t.UserSpace); err != nil {
  296. return nil, fmt.Errorf("set node env by user space: %w", err)
  297. }
  298. write.Env().Pinned = true
  299. add := ctx.DAG.NewStoreShard(t.UserSpace, t.ResultStoreKey)
  300. add.Env().ToEnvDriver(true)
  301. write.FileInfoVar().ToSlot(add.FileInfoSlot())
  302. return write, nil
  303. case *ioswitch2.ToDriver:
  304. n := ctx.DAG.NewToDriver(t, t.Handle)
  305. n.Env().ToEnvDriver(true)
  306. return n, nil
  307. case *ioswitch2.ToBaseStore:
  308. n := ctx.DAG.NewBaseWrite(t, t.UserSpace, t.ObjectPath, t.Option)
  309. if err := setEnvBySpace(n, &t.UserSpace); err != nil {
  310. return nil, fmt.Errorf("set node env by user space: %w", err)
  311. }
  312. n.Env().Pinned = true
  313. return n, nil
  314. default:
  315. return nil, fmt.Errorf("unsupported to type %T", t)
  316. }
  317. }
  318. func setEnvBySpace(n dag.Node, space *jcstypes.UserSpaceDetail) error {
  319. if space.RecommendHub == nil {
  320. n.Env().ToEnvDriver(true)
  321. return nil
  322. }
  323. switch addr := space.RecommendHub.Address.(type) {
  324. case *jcstypes.HttpAddressInfo:
  325. n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: *space.RecommendHub}, true)
  326. case *jcstypes.GRPCAddressInfo:
  327. n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: *space.RecommendHub, Address: *addr}, true)
  328. default:
  329. return fmt.Errorf("unsupported node address type %T", addr)
  330. }
  331. return nil
  332. }
  333. func findOutputStream(ctx *state.GenerateState, streamIndex ioswitch2.StreamIndex) *dag.StreamVar {
  334. var ret *dag.StreamVar
  335. for _, s := range ctx.IndexedStreams {
  336. if s.StreamIndex == streamIndex {
  337. ret = s.Stream
  338. break
  339. }
  340. }
  341. return ret
  342. }
  343. // 根据StreamRange,调整SegmentSplit中分段的个数和每段的大小
  344. func FixSegmentSplit(ctx *state.GenerateState) error {
  345. var err error
  346. dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(node *ops2.SegmentSplitNode) bool {
  347. var strEnd *int64
  348. if ctx.StreamRange.Length != nil {
  349. e := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  350. strEnd = &e
  351. }
  352. startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(ctx.StreamRange.Offset, strEnd)
  353. // 关闭超出范围的分段
  354. for i := endSeg; i < len(node.Segments); i++ {
  355. node.OutputStreams().Get(i).ClearAllDst()
  356. }
  357. node.OutputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  358. node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  359. for i := 0; i < startSeg; i++ {
  360. node.OutputStreams().Get(i).ClearAllDst()
  361. }
  362. node.OutputStreams().Slots.RemoveRange(0, startSeg)
  363. node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg)
  364. // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离
  365. startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg)
  366. node.Segments[0] -= ctx.StreamRange.Offset - startSegStart
  367. // StreamRange结束的位置可能在某个分段的中间,此时这个分段的大小就等于流结束位置到分段起始位置的距离
  368. if strEnd != nil {
  369. endSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(endSeg - 1)
  370. node.Segments[len(node.Segments)-1] = *strEnd - endSegStart
  371. }
  372. return true
  373. })
  374. return err
  375. }
  376. // 从SegmentJoin中删除未使用的分段
  377. func FixSegmentJoin(ctx *state.GenerateState) error {
  378. var err error
  379. dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool {
  380. start := ctx.StreamRange.Offset
  381. var end *int64
  382. if ctx.StreamRange.Length != nil {
  383. e := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  384. end = &e
  385. }
  386. startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end)
  387. // 关闭超出范围的分段
  388. for i := endSeg; i < len(node.Segments); i++ {
  389. node.InputStreams().Get(i).NotTo(node)
  390. }
  391. node.InputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  392. node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
  393. for i := 0; i < startSeg; i++ {
  394. node.InputStreams().Get(i).NotTo(node)
  395. }
  396. node.InputStreams().Slots.RemoveRange(0, startSeg)
  397. node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg)
  398. // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离
  399. startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg)
  400. node.Segments[0] -= ctx.StreamRange.Offset - startSegStart
  401. // 检查一下必须的分段是否都被加入到Join中
  402. for i := 0; i < node.InputStreams().Len(); i++ {
  403. if node.InputStreams().Get(i) == nil {
  404. err = fmt.Errorf("segment %v missed to join an raw stream", i+startSeg)
  405. return false
  406. }
  407. }
  408. return true
  409. })
  410. return err
  411. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。