You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

parser.go 22 kB

1 year ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836
  1. package parser
  2. import (
  3. "fmt"
  4. "math"
  5. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  6. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
  8. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  9. "gitlink.org.cn/cloudream/common/utils/lo2"
  10. "gitlink.org.cn/cloudream/common/utils/math2"
  11. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
  12. "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2"
  13. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
  14. )
  15. type IndexedStream struct {
  16. Stream *dag.Var
  17. StreamIndex ioswitch2.StreamIndex
  18. }
  19. type ParseContext struct {
  20. Ft ioswitch2.FromTo
  21. DAG *ops2.GraphNodeBuilder
  22. // 为了产生所有To所需的数据范围,而需要From打开的范围。
  23. // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。
  24. ToNodes map[ioswitch2.To]ops2.ToNode
  25. IndexedStreams []IndexedStream
  26. StreamRange exec.Range
  27. UseEC bool // 是否使用纠删码
  28. UseSegment bool // 是否使用分段
  29. }
  30. func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
  31. ctx := ParseContext{
  32. Ft: ft,
  33. DAG: ops2.NewGraphNodeBuilder(),
  34. ToNodes: make(map[ioswitch2.To]ops2.ToNode),
  35. }
  36. // 分成两个阶段:
  37. // 1. 基于From和To生成更多指令,初步匹配to的需求
  38. err := checkEncodingParams(&ctx)
  39. if err != nil {
  40. return err
  41. }
  42. // 计算一下打开流的范围
  43. calcStreamRange(&ctx)
  44. err = extend(&ctx)
  45. if err != nil {
  46. return err
  47. }
  48. // 2. 优化上一步生成的指令
  49. err = removeUnusedSegment(&ctx)
  50. if err != nil {
  51. return err
  52. }
  53. // 对于删除指令的优化,需要反复进行,直到没有变化为止。
  54. // 从目前实现上来说不会死循环
  55. for {
  56. opted := false
  57. if removeUnusedJoin(&ctx) {
  58. opted = true
  59. }
  60. if removeUnusedMultiplyOutput(&ctx) {
  61. opted = true
  62. }
  63. if removeUnusedSplit(&ctx) {
  64. opted = true
  65. }
  66. if omitSplitJoin(&ctx) {
  67. opted = true
  68. }
  69. if !opted {
  70. break
  71. }
  72. }
  73. // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。
  74. for pin(&ctx) {
  75. }
  76. // 下面这些只需要执行一次,但需要按顺序
  77. removeUnusedFromNode(&ctx)
  78. dropUnused(&ctx)
  79. storeIPFSWriteResult(&ctx)
  80. generateRange(&ctx)
  81. generateClone(&ctx)
  82. return plan.Generate(ctx.DAG.Graph, blder)
  83. }
  84. func findOutputStream(ctx *ParseContext, streamIndex ioswitch2.StreamIndex) *dag.Var {
  85. var ret *dag.Var
  86. for _, s := range ctx.IndexedStreams {
  87. if s.StreamIndex == streamIndex {
  88. ret = s.Stream
  89. break
  90. }
  91. }
  92. return ret
  93. }
  94. // 检查使用不同编码时参数是否设置到位
  95. func checkEncodingParams(ctx *ParseContext) error {
  96. for _, f := range ctx.Ft.Froms {
  97. if f.GetStreamIndex().IsEC() {
  98. ctx.UseEC = true
  99. if ctx.Ft.ECParam == nil {
  100. return fmt.Errorf("EC encoding parameters not set")
  101. }
  102. }
  103. if f.GetStreamIndex().IsSegment() {
  104. ctx.UseSegment = true
  105. if ctx.Ft.SegmentParam == nil {
  106. return fmt.Errorf("segment parameters not set")
  107. }
  108. }
  109. }
  110. for _, t := range ctx.Ft.Toes {
  111. if t.GetStreamIndex().IsEC() {
  112. ctx.UseEC = true
  113. if ctx.Ft.ECParam == nil {
  114. return fmt.Errorf("EC encoding parameters not set")
  115. }
  116. }
  117. if t.GetStreamIndex().IsSegment() {
  118. ctx.UseSegment = true
  119. if ctx.Ft.SegmentParam == nil {
  120. return fmt.Errorf("segment parameters not set")
  121. }
  122. }
  123. }
  124. return nil
  125. }
  126. // 计算输入流的打开范围。如果From或者To中包含EC的流,则会将打开范围扩大到条带大小的整数倍。
  127. func calcStreamRange(ctx *ParseContext) {
  128. rng := exec.NewRange(math.MaxInt64, 0)
  129. for _, to := range ctx.Ft.Toes {
  130. strIdx := to.GetStreamIndex()
  131. if strIdx.IsRaw() {
  132. toRng := to.GetRange()
  133. rng.ExtendStart(toRng.Offset)
  134. if toRng.Length != nil {
  135. rng.ExtendEnd(toRng.Offset + *toRng.Length)
  136. } else {
  137. rng.Length = nil
  138. }
  139. } else if strIdx.IsEC() {
  140. toRng := to.GetRange()
  141. stripSize := ctx.Ft.ECParam.StripSize()
  142. blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.Ft.ECParam.ChunkSize))
  143. rng.ExtendStart(blkStartIndex * stripSize)
  144. if toRng.Length != nil {
  145. blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.Ft.ECParam.ChunkSize))
  146. rng.ExtendEnd(blkEndIndex * stripSize)
  147. } else {
  148. rng.Length = nil
  149. }
  150. } else if strIdx.IsSegment() {
  151. // Segment节点的Range是相对于本段的,需要加上本段的起始位置
  152. toRng := to.GetRange()
  153. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(strIdx.Index)
  154. offset := toRng.Offset + segStart
  155. rng.ExtendStart(offset)
  156. if toRng.Length != nil {
  157. rng.ExtendEnd(offset + *toRng.Length)
  158. } else {
  159. rng.Length = nil
  160. }
  161. }
  162. }
  163. if ctx.UseEC {
  164. stripSize := ctx.Ft.ECParam.StripSize()
  165. rng.ExtendStart(math2.Floor(rng.Offset, stripSize))
  166. if rng.Length != nil {
  167. rng.ExtendEnd(math2.Ceil(rng.Offset+*rng.Length, stripSize))
  168. }
  169. }
  170. ctx.StreamRange = rng
  171. }
  172. func extend(ctx *ParseContext) error {
  173. for _, fr := range ctx.Ft.Froms {
  174. frNode, err := buildFromNode(ctx, fr)
  175. if err != nil {
  176. return err
  177. }
  178. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  179. Stream: frNode.Output().Var,
  180. StreamIndex: fr.GetStreamIndex(),
  181. })
  182. // 对于完整文件的From,生成Split指令
  183. if fr.GetStreamIndex().IsRaw() {
  184. // 只有输入输出需要EC编码的块时,才生成相关指令
  185. if ctx.UseEC {
  186. splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize)
  187. splitNode.Split(frNode.Output().Var, ctx.Ft.ECParam.K)
  188. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  189. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  190. Stream: splitNode.SubStream(i),
  191. StreamIndex: ioswitch2.ECSrteam(i),
  192. })
  193. }
  194. }
  195. // 同上
  196. if ctx.UseSegment {
  197. splitNode := ctx.DAG.NewSegmentSplit(ctx.Ft.SegmentParam.Segments)
  198. splitNode.SetInput(frNode.Output().Var)
  199. for i := 0; i < len(ctx.Ft.SegmentParam.Segments); i++ {
  200. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  201. Stream: splitNode.Segment(i),
  202. StreamIndex: ioswitch2.SegmentStream(i),
  203. })
  204. }
  205. }
  206. }
  207. }
  208. if ctx.UseEC {
  209. // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令
  210. ecInputStrs := make(map[int]*dag.Var)
  211. for _, s := range ctx.IndexedStreams {
  212. if s.StreamIndex.IsEC() && ecInputStrs[s.StreamIndex.Index] == nil {
  213. ecInputStrs[s.StreamIndex.Index] = s.Stream
  214. if len(ecInputStrs) == ctx.Ft.ECParam.K {
  215. break
  216. }
  217. }
  218. }
  219. if len(ecInputStrs) == ctx.Ft.ECParam.K {
  220. mulNode := ctx.DAG.NewECMultiply(*ctx.Ft.ECParam)
  221. for i, s := range ecInputStrs {
  222. mulNode.AddInput(s, i)
  223. }
  224. for i := 0; i < ctx.Ft.ECParam.N; i++ {
  225. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  226. Stream: mulNode.NewOutput(i),
  227. StreamIndex: ioswitch2.ECSrteam(i),
  228. })
  229. }
  230. joinNode := ctx.DAG.NewChunkedJoin(ctx.Ft.ECParam.ChunkSize)
  231. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  232. // 不可能找不到流
  233. joinNode.AddInput(findOutputStream(ctx, ioswitch2.ECSrteam(i)))
  234. }
  235. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  236. Stream: joinNode.Joined(),
  237. StreamIndex: ioswitch2.RawStream(),
  238. })
  239. }
  240. }
  241. if ctx.UseSegment {
  242. // 先假设有所有的顺序分段,生成Join指令,后续根据Range再实际计算是否缺少流
  243. joinNode := ctx.DAG.NewSegmentJoin(ctx.Ft.SegmentParam.Segments)
  244. for i := 0; i < ctx.Ft.SegmentParam.SegmentCount(); i++ {
  245. str := findOutputStream(ctx, ioswitch2.SegmentStream(i))
  246. if str != nil {
  247. joinNode.SetInput(i, str)
  248. }
  249. }
  250. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  251. Stream: joinNode.Joined(),
  252. StreamIndex: ioswitch2.RawStream(),
  253. })
  254. // SegmentJoin生成的Join指令可以用来生成EC块
  255. if ctx.UseEC {
  256. splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize)
  257. splitNode.Split(joinNode.Joined(), ctx.Ft.ECParam.K)
  258. mulNode := ctx.DAG.NewECMultiply(*ctx.Ft.ECParam)
  259. for i := 0; i < ctx.Ft.ECParam.K; i++ {
  260. mulNode.AddInput(splitNode.SubStream(i), i)
  261. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  262. Stream: splitNode.SubStream(i),
  263. StreamIndex: ioswitch2.ECSrteam(i),
  264. })
  265. }
  266. for i := 0; i < ctx.Ft.ECParam.N; i++ {
  267. ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
  268. Stream: mulNode.NewOutput(i),
  269. StreamIndex: ioswitch2.ECSrteam(i),
  270. })
  271. }
  272. }
  273. }
  274. // 为每一个To找到一个输入流
  275. for _, to := range ctx.Ft.Toes {
  276. toNode, err := buildToNode(ctx, to)
  277. if err != nil {
  278. return err
  279. }
  280. ctx.ToNodes[to] = toNode
  281. str := findOutputStream(ctx, to.GetStreamIndex())
  282. if str == nil {
  283. return fmt.Errorf("no output stream found for data index %d", to.GetStreamIndex())
  284. }
  285. toNode.SetInput(str)
  286. }
  287. return nil
  288. }
  289. func buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) {
  290. var repRange exec.Range
  291. repRange.Offset = ctx.StreamRange.Offset
  292. if ctx.StreamRange.Length != nil {
  293. repRngLen := *ctx.StreamRange.Length
  294. repRange.Length = &repRngLen
  295. }
  296. var blkRange exec.Range
  297. if ctx.UseEC {
  298. blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.Ft.ECParam.ChunkSize*ctx.Ft.ECParam.K) * int64(ctx.Ft.ECParam.ChunkSize)
  299. if ctx.StreamRange.Length != nil {
  300. blkRngLen := *ctx.StreamRange.Length / int64(ctx.Ft.ECParam.ChunkSize*ctx.Ft.ECParam.K) * int64(ctx.Ft.ECParam.ChunkSize)
  301. blkRange.Length = &blkRngLen
  302. }
  303. }
  304. switch f := f.(type) {
  305. case *ioswitch2.FromShardstore:
  306. t := ctx.DAG.NewShardRead(f, f.Storage.StorageID, types.NewOpen(f.FileHash))
  307. if f.StreamIndex.IsRaw() {
  308. t.Open.WithNullableLength(repRange.Offset, repRange.Length)
  309. } else if f.StreamIndex.IsEC() {
  310. t.Open.WithNullableLength(blkRange.Offset, blkRange.Length)
  311. } else if f.StreamIndex.IsSegment() {
  312. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(f.StreamIndex.Index)
  313. segLen := ctx.Ft.SegmentParam.Segments[f.StreamIndex.Index]
  314. segEnd := segStart + segLen
  315. // 打开的范围不超过本段的范围
  316. openOff := ctx.StreamRange.Offset - segStart
  317. openOff = math2.Clamp(openOff, 0, segLen)
  318. openLen := segLen
  319. if ctx.StreamRange.Length != nil {
  320. strEnd := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  321. openEnd := math2.Min(strEnd, segEnd)
  322. openLen = openEnd - segStart - openOff
  323. }
  324. t.Open.WithNullableLength(openOff, &openLen)
  325. }
  326. switch addr := f.Hub.Address.(type) {
  327. case *cdssdk.HttpAddressInfo:
  328. t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.Hub})
  329. t.Env().Pinned = true
  330. case *cdssdk.GRPCAddressInfo:
  331. t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: f.Hub, Address: *addr})
  332. t.Env().Pinned = true
  333. default:
  334. return nil, fmt.Errorf("unsupported node address type %T", addr)
  335. }
  336. return t, nil
  337. case *ioswitch2.FromDriver:
  338. n := ctx.DAG.NewFromDriver(f, f.Handle)
  339. n.Env().ToEnvDriver()
  340. n.Env().Pinned = true
  341. if f.StreamIndex.IsRaw() {
  342. f.Handle.RangeHint.Offset = repRange.Offset
  343. f.Handle.RangeHint.Length = repRange.Length
  344. } else if f.StreamIndex.IsEC() {
  345. f.Handle.RangeHint.Offset = blkRange.Offset
  346. f.Handle.RangeHint.Length = blkRange.Length
  347. } else if f.StreamIndex.IsSegment() {
  348. segStart := ctx.Ft.SegmentParam.CalcSegmentStart(f.StreamIndex.Index)
  349. segLen := ctx.Ft.SegmentParam.Segments[f.StreamIndex.Index]
  350. segEnd := segStart + segLen
  351. // 打开的范围不超过本段的范围
  352. openOff := repRange.Offset - segStart
  353. openOff = math2.Clamp(openOff, 0, segLen)
  354. openLen := segLen
  355. if repRange.Length != nil {
  356. repEnd := repRange.Offset + *repRange.Length
  357. openEnd := math2.Min(repEnd, segEnd)
  358. openLen = openEnd - openOff
  359. }
  360. f.Handle.RangeHint.Offset = openOff
  361. f.Handle.RangeHint.Length = &openLen
  362. }
  363. return n, nil
  364. default:
  365. return nil, fmt.Errorf("unsupported from type %T", f)
  366. }
  367. }
  368. func buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) {
  369. switch t := t.(type) {
  370. case *ioswitch2.ToShardStore:
  371. n := ctx.DAG.NewShardWrite(t, t.Storage.StorageID, t.FileHashStoreKey)
  372. if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
  373. return nil, err
  374. }
  375. n.Env().Pinned = true
  376. return n, nil
  377. case *ioswitch2.ToDriver:
  378. n := ctx.DAG.NewToDriver(t, t.Handle)
  379. n.Env().ToEnvDriver()
  380. n.Env().Pinned = true
  381. return n, nil
  382. case *ioswitch2.LoadToShared:
  383. n := ctx.DAG.NewSharedLoad(t, t.Storage.StorageID, t.UserID, t.PackageID, t.Path)
  384. if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
  385. return nil, err
  386. }
  387. n.Env().Pinned = true
  388. return n, nil
  389. default:
  390. return nil, fmt.Errorf("unsupported to type %T", t)
  391. }
  392. }
  393. func setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error {
  394. switch addr := addr.(type) {
  395. case *cdssdk.HttpAddressInfo:
  396. n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub})
  397. case *cdssdk.GRPCAddressInfo:
  398. n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: hub, Address: *addr})
  399. default:
  400. return fmt.Errorf("unsupported node address type %T", addr)
  401. }
  402. return nil
  403. }
  404. // 从SegmentJoin中删除未使用的分段
  405. func removeUnusedSegment(ctx *ParseContext) error {
  406. var err error
  407. dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool {
  408. start := ctx.StreamRange.Offset
  409. var end *int64
  410. if ctx.StreamRange.Length != nil {
  411. e := ctx.StreamRange.Offset + *ctx.StreamRange.Length
  412. end = &e
  413. }
  414. segStart, segEnd := ctx.Ft.SegmentParam.CalcSegmentRange(start, end)
  415. node.MarkUsed(segStart, segEnd)
  416. for i := segStart; i < segEnd; i++ {
  417. if node.InputStreams().Get(i) == nil {
  418. err = fmt.Errorf("segment %v missed to join an raw stream", i)
  419. return false
  420. }
  421. }
  422. return true
  423. })
  424. return err
  425. }
  426. // 删除输出流未被使用的Join指令
  427. func removeUnusedJoin(ctx *ParseContext) bool {
  428. changed := false
  429. dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool {
  430. if node.InputStreams().Len() > 0 {
  431. return true
  432. }
  433. node.RemoveAllInputs()
  434. ctx.DAG.RemoveNode(node)
  435. return true
  436. })
  437. return changed
  438. }
  439. // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令
  440. func removeUnusedMultiplyOutput(ctx *ParseContext) bool {
  441. changed := false
  442. dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool {
  443. outArr := node.OutputStreams().RawArray()
  444. for i2, out := range outArr {
  445. if out.To().Len() > 0 {
  446. continue
  447. }
  448. outArr[i2] = nil
  449. node.OutputIndexes[i2] = -2
  450. changed = true
  451. }
  452. // TODO2 没有修改SlotIndex
  453. node.OutputStreams().SetRawArray(lo2.RemoveAllDefault(outArr))
  454. node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2)
  455. // 如果所有输出流都被删除,则删除该指令
  456. if node.OutputStreams().Len() == 0 {
  457. node.RemoveAllInputs()
  458. ctx.DAG.RemoveNode(node)
  459. changed = true
  460. }
  461. return true
  462. })
  463. return changed
  464. }
  465. // 删除未使用的Split指令
  466. func removeUnusedSplit(ctx *ParseContext) bool {
  467. changed := false
  468. dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool {
  469. // Split出来的每一个流都没有被使用,才能删除这个指令
  470. for _, out := range typ.OutputStreams().RawArray() {
  471. if out.To().Len() > 0 {
  472. return true
  473. }
  474. }
  475. typ.Clear()
  476. ctx.DAG.RemoveNode(typ)
  477. changed = true
  478. return true
  479. })
  480. return changed
  481. }
  482. // 如果Split的结果被完全用于Join,则省略Split和Join指令
  483. func omitSplitJoin(ctx *ParseContext) bool {
  484. changed := false
  485. dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool {
  486. // Split指令的每一个输出都有且只有一个目的地
  487. var dstNode dag.Node
  488. for _, out := range splitNode.OutputStreams().RawArray() {
  489. if out.To().Len() != 1 {
  490. return true
  491. }
  492. if dstNode == nil {
  493. dstNode = out.To().Get(0).Node
  494. } else if dstNode != out.To().Get(0).Node {
  495. return true
  496. }
  497. }
  498. if dstNode == nil {
  499. return true
  500. }
  501. // 且这个目的地要是一个Join指令
  502. joinNode, ok := dstNode.(*ops2.ChunkedJoinNode)
  503. if !ok {
  504. return true
  505. }
  506. // 同时这个Join指令的输入也必须全部来自Split指令的输出。
  507. // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可
  508. if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() {
  509. return true
  510. }
  511. // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流:
  512. // F->Split->Join->T 变换为:F->T
  513. splitInput := splitNode.InputStreams().Get(0)
  514. for _, to := range joinNode.Joined().To().RawArray() {
  515. splitInput.StreamTo(to.Node, to.SlotIndex)
  516. }
  517. splitInput.StreamNotTo(splitNode, 0)
  518. // 并删除这两个指令
  519. ctx.DAG.RemoveNode(joinNode)
  520. ctx.DAG.RemoveNode(splitNode)
  521. changed = true
  522. return true
  523. })
  524. return changed
  525. }
  526. // 通过流的输入输出位置来确定指令的执行位置。
  527. // To系列的指令都会有固定的执行位置,这些位置会随着pin操作逐步扩散到整个DAG,
  528. // 所以理论上不会出现有指令的位置始终无法确定的情况。
  529. func pin(ctx *ParseContext) bool {
  530. changed := false
  531. ctx.DAG.Walk(func(node dag.Node) bool {
  532. if node.Env().Pinned {
  533. return true
  534. }
  535. var toEnv *dag.NodeEnv
  536. for _, out := range node.OutputStreams().RawArray() {
  537. for _, to := range out.To().RawArray() {
  538. if to.Node.Env().Type == dag.EnvUnknown {
  539. continue
  540. }
  541. if toEnv == nil {
  542. toEnv = to.Node.Env()
  543. } else if !toEnv.Equals(to.Node.Env()) {
  544. toEnv = nil
  545. break
  546. }
  547. }
  548. }
  549. if toEnv != nil {
  550. if !node.Env().Equals(toEnv) {
  551. changed = true
  552. }
  553. *node.Env() = *toEnv
  554. return true
  555. }
  556. // 否则根据输入流的始发地来固定
  557. var fromEnv *dag.NodeEnv
  558. for _, in := range node.InputStreams().RawArray() {
  559. if in.From().Node.Env().Type == dag.EnvUnknown {
  560. continue
  561. }
  562. if fromEnv == nil {
  563. fromEnv = in.From().Node.Env()
  564. } else if !fromEnv.Equals(in.From().Node.Env()) {
  565. fromEnv = nil
  566. break
  567. }
  568. }
  569. if fromEnv != nil {
  570. if !node.Env().Equals(fromEnv) {
  571. changed = true
  572. }
  573. *node.Env() = *fromEnv
  574. }
  575. return true
  576. })
  577. return changed
  578. }
  579. // 删除未使用的From流,不会删除FromDriver
  580. func removeUnusedFromNode(ctx *ParseContext) {
  581. dag.WalkOnlyType[ops2.FromNode](ctx.DAG.Graph, func(node ops2.FromNode) bool {
  582. if _, ok := node.(*ops2.FromDriverNode); ok {
  583. return true
  584. }
  585. if node.Output().Var == nil {
  586. ctx.DAG.RemoveNode(node)
  587. }
  588. return true
  589. })
  590. }
  591. // 对于所有未使用的流,增加Drop指令
  592. func dropUnused(ctx *ParseContext) {
  593. ctx.DAG.Walk(func(node dag.Node) bool {
  594. for _, out := range node.OutputStreams().RawArray() {
  595. if out.To().Len() == 0 {
  596. n := ctx.DAG.NewDropStream()
  597. *n.Env() = *node.Env()
  598. n.SetInput(out)
  599. }
  600. }
  601. return true
  602. })
  603. }
  604. // 为IPFS写入指令存储结果
  605. func storeIPFSWriteResult(ctx *ParseContext) {
  606. dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool {
  607. if n.FileHashStoreKey == "" {
  608. return true
  609. }
  610. storeNode := ctx.DAG.NewStore()
  611. storeNode.Env().ToEnvDriver()
  612. storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
  613. return true
  614. })
  615. }
  616. // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
  617. func generateRange(ctx *ParseContext) {
  618. for i := 0; i < len(ctx.Ft.Toes); i++ {
  619. to := ctx.Ft.Toes[i]
  620. toNode := ctx.ToNodes[to]
  621. toStrIdx := to.GetStreamIndex()
  622. toRng := to.GetRange()
  623. if toStrIdx.IsRaw() {
  624. n := ctx.DAG.NewRange()
  625. toInput := toNode.Input()
  626. *n.Env() = *toInput.Var.From().Node.Env()
  627. rnged := n.RangeStream(toInput.Var, exec.Range{
  628. Offset: toRng.Offset - ctx.StreamRange.Offset,
  629. Length: toRng.Length,
  630. })
  631. toInput.Var.StreamNotTo(toNode, toInput.Index)
  632. toNode.SetInput(rnged)
  633. } else if toStrIdx.IsEC() {
  634. stripSize := int64(ctx.Ft.ECParam.ChunkSize * ctx.Ft.ECParam.K)
  635. blkStartIdx := ctx.StreamRange.Offset / stripSize
  636. blkStart := blkStartIdx * int64(ctx.Ft.ECParam.ChunkSize)
  637. n := ctx.DAG.NewRange()
  638. toInput := toNode.Input()
  639. *n.Env() = *toInput.Var.From().Node.Env()
  640. rnged := n.RangeStream(toInput.Var, exec.Range{
  641. Offset: toRng.Offset - blkStart,
  642. Length: toRng.Length,
  643. })
  644. toInput.Var.StreamNotTo(toNode, toInput.Index)
  645. toNode.SetInput(rnged)
  646. } else if toStrIdx.IsSegment() {
  647. // if frNode, ok := toNode.Input().Var.From().Node.(ops2.FromNode); ok {
  648. // // 目前只有To也是分段时,才可能对接一个提供分段的From,此时不需要再生成Range指令
  649. // if frNode.GetFrom().GetStreamIndex().IsSegment() {
  650. // continue
  651. // }
  652. // }
  653. // segStart := ctx.Ft.SegmentParam.CalcSegmentStart(toStrIdx.Index)
  654. // strStart := segStart + toRng.Offset
  655. // n := ctx.DAG.NewRange()
  656. // toInput := toNode.Input()
  657. // *n.Env() = *toInput.Var.From().Node.Env()
  658. // rnged := n.RangeStream(toInput.Var, exec.Range{
  659. // Offset: strStart - ctx.StreamRange.Offset,
  660. // Length: toRng.Length,
  661. // })
  662. // toInput.Var.StreamNotTo(toNode, toInput.Index)
  663. // toNode.SetInput(rnged)
  664. }
  665. }
  666. }
  667. // 生成Clone指令
  668. func generateClone(ctx *ParseContext) {
  669. ctx.DAG.Walk(func(node dag.Node) bool {
  670. for _, out := range node.OutputStreams().RawArray() {
  671. if out.To().Len() <= 1 {
  672. continue
  673. }
  674. c := ctx.DAG.NewCloneStream()
  675. *c.Env() = *node.Env()
  676. for _, to := range out.To().RawArray() {
  677. c.NewOutput().StreamTo(to.Node, to.SlotIndex)
  678. }
  679. out.To().Resize(0)
  680. c.SetInput(out)
  681. }
  682. for _, out := range node.OutputValues().RawArray() {
  683. if out.To().Len() <= 1 {
  684. continue
  685. }
  686. t := ctx.DAG.NewCloneValue()
  687. *t.Env() = *node.Env()
  688. for _, to := range out.To().RawArray() {
  689. t.NewOutput().ValueTo(to.Node, to.SlotIndex)
  690. }
  691. out.To().Resize(0)
  692. t.SetInput(out)
  693. }
  694. return true
  695. })
  696. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。