diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index f826862..0aaa6b9 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -17,8 +17,8 @@ import ( func init() { rootCmd.AddCommand(&cobra.Command{ - Use: "test2", - Short: "test2", + Use: "test", + Short: "test", Run: func(cmd *cobra.Command, args []string) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -63,8 +63,8 @@ func init() { }, }) rootCmd.AddCommand(&cobra.Command{ - Use: "test", - Short: "test", + Use: "test3", + Short: "test3", Run: func(cmd *cobra.Command, args []string) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index 3c75f6f..515033e 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -131,7 +131,7 @@ func (t *ChunkedSplitNode) SplitCount() int { return t.OutputStreams().Len() } -func (t *ChunkedSplitNode) Clear() { +func (t *ChunkedSplitNode) RemoveAllStream() { if t.InputStreams().Len() == 0 { return } diff --git a/common/pkgs/ioswitch2/ops2/segment.go b/common/pkgs/ioswitch2/ops2/segment.go index fa274ea..4d1327d 100644 --- a/common/pkgs/ioswitch2/ops2/segment.go +++ b/common/pkgs/ioswitch2/ops2/segment.go @@ -139,6 +139,20 @@ func (n *SegmentSplitNode) SetInput(input *dag.Var) { input.StreamTo(n, 0) } +func (t *SegmentSplitNode) RemoveAllStream() { + if t.InputStreams().Len() == 0 { + return + } + + t.InputStreams().Get(0).StreamNotTo(t, 0) + t.InputStreams().Resize(0) + + for _, out := range t.OutputStreams().RawArray() { + out.NoInputAllStream() + } + t.OutputStreams().Resize(0) +} + func (n *SegmentSplitNode) Segment(index int) *dag.Var { // 必须连续消耗流 for i := 0; i <= index; i++ { @@ -183,6 +197,13 @@ func (n *SegmentJoinNode) SetInput(index int, input *dag.Var) { input.StreamTo(n, index) } +func (n *SegmentJoinNode) RemoveAllInputs() { + for i, in := range n.InputStreams().RawArray() { + in.StreamNotTo(n, i) + } + n.InputStreams().Resize(0) +} + // 记录本计划中实际要使用的分段的范围,范围外的分段流都会取消输入 func (n *SegmentJoinNode) MarkUsed(start, cnt int) { n.UsedStart = start diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index da357ac..f7462d7 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -78,6 +78,15 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { if omitSplitJoin(&ctx) { opted = true } + if removeUnusedSegmentJoin(&ctx) { + opted = true + } + if removeUnusedSegmentSplit(&ctx) { + opted = true + } + if omitSegmentSplitJoin(&ctx) { + opted = true + } if !opted { break @@ -513,12 +522,103 @@ func removeUnusedSegment(ctx *ParseContext) error { return err } +// 删除未使用的SegmentJoin +func removeUnusedSegmentJoin(ctx *ParseContext) bool { + changed := false + + dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool { + if node.Joined().To().Len() > 0 { + return true + } + + node.RemoveAllInputs() + ctx.DAG.RemoveNode(node) + return true + }) + + return changed +} + +// 删除未使用的SegmentSplit +func removeUnusedSegmentSplit(ctx *ParseContext) bool { + changed := false + dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(typ *ops2.SegmentSplitNode) bool { + // Split出来的每一个流都没有被使用,才能删除这个指令 + for _, out := range typ.OutputStreams().RawArray() { + if out.To().Len() > 0 { + return true + } + } + + typ.RemoveAllStream() + ctx.DAG.RemoveNode(typ) + changed = true + return true + }) + + return changed +} + +// 如果Split的结果被完全用于Join,则省略Split和Join指令 +func omitSegmentSplitJoin(ctx *ParseContext) bool { + changed := false + + dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(splitNode *ops2.SegmentSplitNode) bool { + // Split指令的每一个输出都有且只有一个目的地 + var dstNode dag.Node + for _, out := range splitNode.OutputStreams().RawArray() { + if out.To().Len() != 1 { + return true + } + + if dstNode == nil { + dstNode = out.To().Get(0).Node + } else if dstNode != out.To().Get(0).Node { + return true + } + } + + if dstNode == nil { + return true + } + + // 且这个目的地要是一个Join指令 + joinNode, ok := dstNode.(*ops2.SegmentJoinNode) + if !ok { + return true + } + + // 同时这个Join指令的输入也必须全部来自Split指令的输出。 + // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可 + if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() { + return true + } + + // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: + // F->Split->Join->T 变换为:F->T + splitInput := splitNode.InputStreams().Get(0) + for _, to := range joinNode.Joined().To().RawArray() { + splitInput.StreamTo(to.Node, to.SlotIndex) + } + splitInput.StreamNotTo(splitNode, 0) + + // 并删除这两个指令 + ctx.DAG.RemoveNode(joinNode) + ctx.DAG.RemoveNode(splitNode) + + changed = true + return true + }) + + return changed +} + // 删除输出流未被使用的Join指令 func removeUnusedJoin(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool { - if node.InputStreams().Len() > 0 { + if node.Joined().To().Len() > 0 { return true } @@ -572,7 +672,7 @@ func removeUnusedSplit(ctx *ParseContext) bool { } } - typ.Clear() + typ.RemoveAllStream() ctx.DAG.RemoveNode(typ) changed = true return true