Browse Source

优化代码

master
Sydonian 5 months ago
parent
commit
a59532e284
5 changed files with 36 additions and 7 deletions
  1. +27
    -3
      pkgs/ioswitch/dag/node.go
  2. +5
    -0
      pkgs/ioswitch/dag/var.go
  3. +2
    -2
      pkgs/ioswitch/plan/compile.go
  4. +1
    -1
      pkgs/ioswitch/plan/ops/store.go
  5. +1
    -1
      pkgs/ioswitch/plan/ops/sync.go

+ 27
- 3
pkgs/ioswitch/dag/node.go View File

@@ -20,24 +20,28 @@ type NodeEnv struct {
Pinned bool // 如果为true,则不应该改变这个节点的执行环境
}

func (e *NodeEnv) ToEnvUnknown() {
func (e *NodeEnv) ToEnvUnknown(pinned bool) {
e.Type = EnvUnknown
e.Worker = nil
e.Pinned = pinned
}

func (e *NodeEnv) ToEnvDriver() {
func (e *NodeEnv) ToEnvDriver(pinned bool) {
e.Type = EnvDriver
e.Worker = nil
e.Pinned = pinned
}

func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) {
func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo, pinned bool) {
e.Type = EnvWorker
e.Worker = worker
e.Pinned = pinned
}

func (e *NodeEnv) CopyFrom(other *NodeEnv) {
e.Type = other.Type
e.Worker = other.Worker
e.Pinned = other.Pinned
}

func (e *NodeEnv) Equals(other *NodeEnv) bool {
@@ -461,6 +465,16 @@ func (s StreamOutputSlot) ToSlot(slot StreamInputSlot) {
s.Var().To(slot.Node, slot.Index)
}

// 查询所有输出的连接的输入槽位
func (s StreamOutputSlot) ListDstSlots() []StreamInputSlot {
slots := make([]StreamInputSlot, s.Var().Dst.Len())
myVar := s.Var()
for i, dst := range s.Var().Dst {
slots[i] = StreamInputSlot{Node: dst, Index: dst.InputStreams().IndexOf(myVar)}
}
return slots
}

type StreamInputSlot struct {
Node Node
Index int
@@ -483,6 +497,16 @@ func (s ValueOutputSlot) ToSlot(slot ValueInputSlot) {
s.Var().To(slot.Node, slot.Index)
}

// 查询所有输出的连接的输入槽位
func (s ValueOutputSlot) ListDstSlots() []ValueInputSlot {
slots := make([]ValueInputSlot, s.Var().Dst.Len())
myVar := s.Var()
for i, dst := range s.Var().Dst {
slots[i] = ValueInputSlot{Node: dst, Index: dst.InputValues().IndexOf(myVar)}
}
return slots
}

type ValueInputSlot struct {
Node Node
Index int


+ 5
- 0
pkgs/ioswitch/dag/var.go View File

@@ -64,6 +64,11 @@ func (v *ValueVar) To(to Node, slotIdx int) {
to.InputValues().Slots.Set(slotIdx, v)
}

func (v *ValueVar) ToSlot(slot ValueInputSlot) {
v.Dst.Add(slot.Node)
slot.Node.InputValues().Slots.Set(slot.Index, v)
}

func (v *ValueVar) NotTo(node Node) {
v.Dst.Remove(node)
node.InputValues().Slots.Clear(v)


+ 2
- 2
pkgs/ioswitch/plan/compile.go View File

@@ -42,7 +42,7 @@ func generateSend(graph *ops.GraphNodeBuilder) {
dstNode := out.Dst.Get(0)

getNode := graph.NewGetStream(node.Env().Worker)
getNode.Env().ToEnvDriver()
getNode.Env().ToEnvDriver(true)

// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode := graph.NewHoldUntil()
@@ -86,7 +86,7 @@ func generateSend(graph *ops.GraphNodeBuilder) {
// // 如果是要送到Driver,则只能由Driver主动去拉取
dstNode := out.Dst.Get(0)
getNode := graph.NewGetValue(node.Env().Worker)
getNode.Env().ToEnvDriver()
getNode.Env().ToEnvDriver(true)

// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode := graph.NewHoldUntil()


+ 1
- 1
pkgs/ioswitch/plan/ops/store.go View File

@@ -23,7 +23,7 @@ func (o *Store) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
}

func (o *Store) String() string {
return fmt.Sprintf("Store %v: %v", o.Key, o.Var)
return fmt.Sprintf("Store %v as \"%v\"", o.Var, o.Key)
}

type StoreConst struct {


+ 1
- 1
pkgs/ioswitch/plan/ops/sync.go View File

@@ -113,7 +113,7 @@ func (w *HoldUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
}

func (w *HoldUntil) String() string {
return fmt.Sprintf("HoldUntil Waits: %v, (%v) -> (%v)", utils.FormatVarIDs(w.Waits), utils.FormatVarIDs(w.Holds), utils.FormatVarIDs(w.Emits))
return fmt.Sprintf("HoldUntil(waits=%v): %v -> %v", utils.FormatVarIDs(w.Waits), utils.FormatVarIDs(w.Holds), utils.FormatVarIDs(w.Emits))
}

type HangUntil struct {


Loading…
Cancel
Save