Browse Source

增加优化代码

gitlink
Sydonian 11 months ago
parent
commit
3623a73605
4 changed files with 128 additions and 7 deletions
  1. +4
    -4
      client/internal/cmdline/test.go
  2. +1
    -1
      common/pkgs/ioswitch2/ops2/chunked.go
  3. +21
    -0
      common/pkgs/ioswitch2/ops2/segment.go
  4. +102
    -2
      common/pkgs/ioswitch2/parser/parser.go

+ 4
- 4
client/internal/cmdline/test.go View File

@@ -17,8 +17,8 @@ import (

func init() {
rootCmd.AddCommand(&cobra.Command{
Use: "test2",
Short: "test2",
Use: "test",
Short: "test",
Run: func(cmd *cobra.Command, args []string) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
@@ -63,8 +63,8 @@ func init() {
},
})
rootCmd.AddCommand(&cobra.Command{
Use: "test",
Short: "test",
Use: "test3",
Short: "test3",
Run: func(cmd *cobra.Command, args []string) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {


+ 1
- 1
common/pkgs/ioswitch2/ops2/chunked.go View File

@@ -131,7 +131,7 @@ func (t *ChunkedSplitNode) SplitCount() int {
return t.OutputStreams().Len()
}

func (t *ChunkedSplitNode) Clear() {
func (t *ChunkedSplitNode) RemoveAllStream() {
if t.InputStreams().Len() == 0 {
return
}


+ 21
- 0
common/pkgs/ioswitch2/ops2/segment.go View File

@@ -139,6 +139,20 @@ func (n *SegmentSplitNode) SetInput(input *dag.Var) {
input.StreamTo(n, 0)
}

func (t *SegmentSplitNode) RemoveAllStream() {
if t.InputStreams().Len() == 0 {
return
}

t.InputStreams().Get(0).StreamNotTo(t, 0)
t.InputStreams().Resize(0)

for _, out := range t.OutputStreams().RawArray() {
out.NoInputAllStream()
}
t.OutputStreams().Resize(0)
}

func (n *SegmentSplitNode) Segment(index int) *dag.Var {
// 必须连续消耗流
for i := 0; i <= index; i++ {
@@ -183,6 +197,13 @@ func (n *SegmentJoinNode) SetInput(index int, input *dag.Var) {
input.StreamTo(n, index)
}

func (n *SegmentJoinNode) RemoveAllInputs() {
for i, in := range n.InputStreams().RawArray() {
in.StreamNotTo(n, i)
}
n.InputStreams().Resize(0)
}

// 记录本计划中实际要使用的分段的范围,范围外的分段流都会取消输入
func (n *SegmentJoinNode) MarkUsed(start, cnt int) {
n.UsedStart = start


+ 102
- 2
common/pkgs/ioswitch2/parser/parser.go View File

@@ -78,6 +78,15 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
if omitSplitJoin(&ctx) {
opted = true
}
if removeUnusedSegmentJoin(&ctx) {
opted = true
}
if removeUnusedSegmentSplit(&ctx) {
opted = true
}
if omitSegmentSplitJoin(&ctx) {
opted = true
}

if !opted {
break
@@ -513,12 +522,103 @@ func removeUnusedSegment(ctx *ParseContext) error {
return err
}

// 删除未使用的SegmentJoin
func removeUnusedSegmentJoin(ctx *ParseContext) bool {
changed := false

dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool {
if node.Joined().To().Len() > 0 {
return true
}

node.RemoveAllInputs()
ctx.DAG.RemoveNode(node)
return true
})

return changed
}

// 删除未使用的SegmentSplit
func removeUnusedSegmentSplit(ctx *ParseContext) bool {
changed := false
dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(typ *ops2.SegmentSplitNode) bool {
// Split出来的每一个流都没有被使用,才能删除这个指令
for _, out := range typ.OutputStreams().RawArray() {
if out.To().Len() > 0 {
return true
}
}

typ.RemoveAllStream()
ctx.DAG.RemoveNode(typ)
changed = true
return true
})

return changed
}

// 如果Split的结果被完全用于Join,则省略Split和Join指令
func omitSegmentSplitJoin(ctx *ParseContext) bool {
changed := false

dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(splitNode *ops2.SegmentSplitNode) bool {
// Split指令的每一个输出都有且只有一个目的地
var dstNode dag.Node
for _, out := range splitNode.OutputStreams().RawArray() {
if out.To().Len() != 1 {
return true
}

if dstNode == nil {
dstNode = out.To().Get(0).Node
} else if dstNode != out.To().Get(0).Node {
return true
}
}

if dstNode == nil {
return true
}

// 且这个目的地要是一个Join指令
joinNode, ok := dstNode.(*ops2.SegmentJoinNode)
if !ok {
return true
}

// 同时这个Join指令的输入也必须全部来自Split指令的输出。
// 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可
if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() {
return true
}

// 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流:
// F->Split->Join->T 变换为:F->T
splitInput := splitNode.InputStreams().Get(0)
for _, to := range joinNode.Joined().To().RawArray() {
splitInput.StreamTo(to.Node, to.SlotIndex)
}
splitInput.StreamNotTo(splitNode, 0)

// 并删除这两个指令
ctx.DAG.RemoveNode(joinNode)
ctx.DAG.RemoveNode(splitNode)

changed = true
return true
})

return changed
}

// 删除输出流未被使用的Join指令
func removeUnusedJoin(ctx *ParseContext) bool {
changed := false

dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool {
if node.InputStreams().Len() > 0 {
if node.Joined().To().Len() > 0 {
return true
}

@@ -572,7 +672,7 @@ func removeUnusedSplit(ctx *ParseContext) bool {
}
}

typ.Clear()
typ.RemoveAllStream()
ctx.DAG.RemoveNode(typ)
changed = true
return true


Loading…
Cancel
Save