Browse Source

优化ioswitch的代码

gitlink
Sydonian 1 year ago
parent
commit
bb0c70cf5d
13 changed files with 646 additions and 384 deletions
  1. +32
    -18
      pkgs/ioswitch/dag/graph.go
  2. +137
    -20
      pkgs/ioswitch/dag/node.go
  3. +0
    -13
      pkgs/ioswitch/dag/utils.go
  4. +84
    -83
      pkgs/ioswitch/dag/var.go
  5. +46
    -0
      pkgs/ioswitch/exec/config.go
  6. +67
    -57
      pkgs/ioswitch/plan/generate.go
  7. +49
    -19
      pkgs/ioswitch/plan/ops/driver.go
  8. +17
    -8
      pkgs/ioswitch/plan/ops/drop.go
  9. +13
    -0
      pkgs/ioswitch/plan/ops/ops.go
  10. +88
    -62
      pkgs/ioswitch/plan/ops/send.go
  11. +17
    -12
      pkgs/ioswitch/plan/ops/store.go
  12. +36
    -32
      pkgs/ioswitch/plan/ops/sync.go
  13. +60
    -60
      pkgs/ioswitch/plan/ops/utils.go

+ 32
- 18
pkgs/ioswitch/dag/graph.go View File

@@ -5,7 +5,7 @@ import (
) )


type Graph struct { type Graph struct {
Nodes []*Node
Nodes []Node
isWalking bool isWalking bool
nextVarID int nextVarID int
} }
@@ -14,18 +14,12 @@ func NewGraph() *Graph {
return &Graph{} return &Graph{}
} }


func (g *Graph) NewNode(typ NodeType, props any) *Node {
n := &Node{
Type: typ,
Props: props,
Graph: g,
}
typ.InitNode(n)
g.Nodes = append(g.Nodes, n)
return n
func (g *Graph) AddNode(node Node) {
g.Nodes = append(g.Nodes, node)
node.SetGraph(g)
} }


func (g *Graph) RemoveNode(node *Node) {
func (g *Graph) RemoveNode(node Node) {
for i, n := range g.Nodes { for i, n := range g.Nodes {
if n == node { if n == node {
if g.isWalking { if g.isWalking {
@@ -38,7 +32,7 @@ func (g *Graph) RemoveNode(node *Node) {
} }
} }


func (g *Graph) Walk(cb func(node *Node) bool) {
func (g *Graph) Walk(cb func(node Node) bool) {
g.isWalking = true g.isWalking = true
for i := 0; i < len(g.Nodes); i++ { for i := 0; i < len(g.Nodes); i++ {
if g.Nodes[i] == nil { if g.Nodes[i] == nil {
@@ -54,20 +48,40 @@ func (g *Graph) Walk(cb func(node *Node) bool) {
g.Nodes = lo2.RemoveAllDefault(g.Nodes) g.Nodes = lo2.RemoveAllDefault(g.Nodes)
} }


func (g *Graph) NewStreamVar() *StreamVar {
str := &StreamVar{
VarBase: VarBase{
id: g.genVarID(),
},
}
return str
}

func (g *Graph) NewValueVar(valType ValueVarType) *ValueVar {
val := &ValueVar{
VarBase: VarBase{
id: g.genVarID(),
},
Type: valType,
}
return val
}

func (g *Graph) genVarID() int { func (g *Graph) genVarID() int {
g.nextVarID++ g.nextVarID++
return g.nextVarID return g.nextVarID
} }


func NewNode[N NodeType](graph *Graph, typ N, props any) (*Node, N) {
return graph.NewNode(typ, props), typ
func AddNode[N Node](graph *Graph, typ N) N {
graph.AddNode(typ)
return typ
} }


func WalkOnlyType[N NodeType](g *Graph, cb func(node *Node, typ N) bool) {
g.Walk(func(node *Node) bool {
typ, ok := node.Type.(N)
func WalkOnlyType[N Node](g *Graph, cb func(node N) bool) {
g.Walk(func(n Node) bool {
node, ok := n.(N)
if ok { if ok {
return cb(node, typ)
return cb(node)
} }
return true return true
}) })


+ 137
- 20
pkgs/ioswitch/dag/node.go View File

@@ -1,17 +1,10 @@
package dag package dag


import ( import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/lo2"
) )


type NodeType interface {
InitNode(node *Node)
String(node *Node) string
GenerateOp(node *Node) (exec.Op, error)
}

type NodeEnvType string type NodeEnvType string


const ( const (
@@ -41,7 +34,7 @@ func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) {
e.Worker = worker e.Worker = worker
} }


func (e *NodeEnv) Equals(other NodeEnv) bool {
func (e *NodeEnv) Equals(other *NodeEnv) bool {
if e.Type != other.Type { if e.Type != other.Type {
return false return false
} }
@@ -53,17 +46,141 @@ func (e *NodeEnv) Equals(other NodeEnv) bool {
return e.Worker.Equals(other.Worker) return e.Worker.Equals(other.Worker)
} }


type Node struct {
Type NodeType
Env NodeEnv
Props any
InputStreams []*StreamVar
OutputStreams []*StreamVar
InputValues []*ValueVar
OutputValues []*ValueVar
Graph *Graph
type Node interface {
Graph() *Graph
SetGraph(graph *Graph)
Env() *NodeEnv
InputStreams() *InputSlots[*StreamVar]
OutputStreams() *OutputSlots[*StreamVar]
InputValues() *InputSlots[*ValueVar]
OutputValues() *OutputSlots[*ValueVar]
GenerateOp() (exec.Op, error)
// String() string
}

type VarSlots[T Var] []T

func (s *VarSlots[T]) Len() int {
return len(*s)
}

func (s *VarSlots[T]) Get(idx int) T {
return (*s)[idx]
}

func (s *VarSlots[T]) Set(idx int, val T) T {
old := (*s)[idx]
(*s)[idx] = val
return old
}

func (s *VarSlots[T]) Append(val T) int {
*s = append(*s, val)
return s.Len() - 1
}

func (s *VarSlots[T]) RemoveAt(idx int) {
(*s) = lo2.RemoveAt(*s, idx)
}

func (s *VarSlots[T]) Resize(size int) {
if s.Len() < size {
*s = append(*s, make([]T, size-s.Len())...)
} else if s.Len() > size {
*s = (*s)[:size]
}
}

func (s *VarSlots[T]) SetRawArray(arr []T) {
*s = arr
}

func (s *VarSlots[T]) RawArray() []T {
return *s
}

type InputSlots[T Var] struct {
VarSlots[T]
}

func (s *InputSlots[T]) EnsureSize(cnt int) {
if s.Len() < cnt {
s.VarSlots = append(s.VarSlots, make([]T, cnt-s.Len())...)
}
}

func (s *InputSlots[T]) EnlargeOne() int {
var t T
s.Append(t)
return s.Len() - 1
}

type OutputSlots[T Var] struct {
VarSlots[T]
}

func (s *OutputSlots[T]) Setup(my Node, v T, slotIdx int) {
if s.Len() <= slotIdx {
s.VarSlots = append(s.VarSlots, make([]T, slotIdx-s.Len()+1)...)
}

s.Set(slotIdx, v)
*v.From() = EndPoint{
Node: my,
SlotIndex: slotIdx,
}
}

func (s *OutputSlots[T]) SetupNew(my Node, v T) {
s.Append(v)
*v.From() = EndPoint{
Node: my,
SlotIndex: s.Len() - 1,
}
}

type Slot[T Var] struct {
Var T
Index int
}

type StreamSlot = Slot[*StreamVar]

type ValueSlot = Slot[*ValueVar]

type NodeBase struct {
env NodeEnv
inputStreams InputSlots[*StreamVar]
outputStreams OutputSlots[*StreamVar]
inputValues InputSlots[*ValueVar]
outputValues OutputSlots[*ValueVar]
graph *Graph
}

func (n *NodeBase) Graph() *Graph {
return n.graph
}

func (n *NodeBase) SetGraph(graph *Graph) {
n.graph = graph
}

func (n *NodeBase) Env() *NodeEnv {
return &n.env
}

func (n *NodeBase) InputStreams() *InputSlots[*StreamVar] {
return &n.inputStreams
}

func (n *NodeBase) OutputStreams() *OutputSlots[*StreamVar] {
return &n.outputStreams
}

func (n *NodeBase) InputValues() *InputSlots[*ValueVar] {
return &n.inputValues
} }


func (n *Node) String() string {
return fmt.Sprintf("%v", n.Type.String(n))
func (n *NodeBase) OutputValues() *OutputSlots[*ValueVar] {
return &n.outputValues
} }

+ 0
- 13
pkgs/ioswitch/dag/utils.go View File

@@ -1,13 +0,0 @@
package dag

func NProps[T any](n *Node) T {
return n.Props.(T)
}

func SProps[T any](str *StreamVar) T {
return str.Props.(T)
}

func VProps[T any](v *ValueVar) T {
return v.Props.(T)
}

+ 84
- 83
pkgs/ioswitch/dag/var.go View File

@@ -5,80 +5,95 @@ import (
"gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/lo2"
) )


type Var interface {
ID() int
From() *EndPoint
To() *EndPointSlots
}

type EndPoint struct { type EndPoint struct {
Node *Node
Node Node
SlotIndex int // 所连接的Node的Output或Input数组的索引 SlotIndex int // 所连接的Node的Output或Input数组的索引
} }


type StreamVar struct {
ID int
From EndPoint
Toes []EndPoint
Props any
Var *exec.StreamVar
}

func (v *StreamVar) To(to *Node, slotIdx int) int {
v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx})
to.InputStreams[slotIdx] = v
return len(v.Toes) - 1
}

// func (v *StreamVar) NotTo(toIdx int) EndPoint {
// ed := v.Toes[toIdx]
// lo2.RemoveAt(v.Toes, toIdx)
// ed.Node.InputStreams[ed.SlotIndex] = nil
// return ed
// }

func (v *StreamVar) NotTo(node *Node) (EndPoint, bool) {
for i, ed := range v.Toes {
if ed.Node == node {
v.Toes = lo2.RemoveAt(v.Toes, i)
ed.Node.InputStreams[ed.SlotIndex] = nil
return ed, true
}
}
type EndPointSlots []EndPoint

func (s *EndPointSlots) Len() int {
return len(*s)
}

func (s *EndPointSlots) Get(idx int) *EndPoint {
return &(*s)[idx]
}


return EndPoint{}, false
func (s *EndPointSlots) Add(ed EndPoint) int {
(*s) = append((*s), ed)
return len(*s) - 1
} }


func (v *StreamVar) NotToWhere(pred func(to EndPoint) bool) []EndPoint {
var newToes []EndPoint
var rmed []EndPoint
for _, ed := range v.Toes {
if pred(ed) {
ed.Node.InputStreams[ed.SlotIndex] = nil
rmed = append(rmed, ed)
} else {
newToes = append(newToes, ed)
func (s *EndPointSlots) Remove(ed EndPoint) {
for i, e := range *s {
if e == ed {
(*s) = lo2.RemoveAt((*s), i)
return
} }
} }
v.Toes = newToes
return rmed
} }


func (v *StreamVar) NotToAll() []EndPoint {
for _, ed := range v.Toes {
ed.Node.InputStreams[ed.SlotIndex] = nil
}
toes := v.Toes
v.Toes = nil
return toes
func (s *EndPointSlots) RemoveAt(idx int) {
lo2.RemoveAt((*s), idx)
} }


func NodeNewOutputStream(node *Node, props any) *StreamVar {
str := &StreamVar{
ID: node.Graph.genVarID(),
From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)},
Props: props,
func (s *EndPointSlots) Resize(size int) {
if s.Len() < size {
(*s) = append((*s), make([]EndPoint, size-s.Len())...)
} else if s.Len() > size {
(*s) = (*s)[:size]
} }
node.OutputStreams = append(node.OutputStreams, str)
return str
} }


func NodeDeclareInputStream(node *Node, cnt int) {
node.InputStreams = make([]*StreamVar, cnt)
func (s *EndPointSlots) RawArray() []EndPoint {
return *s
}

type VarBase struct {
id int
from EndPoint
to EndPointSlots
}

func (v *VarBase) ID() int {
return v.id
}

func (v *VarBase) From() *EndPoint {
return &v.from
}

func (v *VarBase) To() *EndPointSlots {
return &v.to
}

type StreamVar struct {
VarBase
Var *exec.StreamVar
}

func (v *StreamVar) Connect(to Node, slotIdx int) {
v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx})
to.InputStreams().Set(slotIdx, v)
}

func (v *StreamVar) Disconnect(node Node, slotIdx int) {
v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx})
node.InputStreams().Set(slotIdx, nil)
}

func (v *StreamVar) DisconnectAll() {
for _, ed := range v.to {
ed.Node.InputStreams().Set(ed.SlotIndex, nil)
}
v.to = nil
} }


type ValueVarType int type ValueVarType int
@@ -90,31 +105,17 @@ const (
) )


type ValueVar struct { type ValueVar struct {
ID int
Type ValueVarType
From EndPoint
Toes []EndPoint
Props any
Var exec.Var
}

func (v *ValueVar) To(to *Node, slotIdx int) int {
v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx})
to.InputValues[slotIdx] = v
return len(v.Toes) - 1
}

func NodeNewOutputValue(node *Node, typ ValueVarType, props any) *ValueVar {
val := &ValueVar{
ID: node.Graph.genVarID(),
Type: typ,
From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)},
Props: props,
}
node.OutputValues = append(node.OutputValues, val)
return val
VarBase
Type ValueVarType
Var exec.Var
}

func (v *ValueVar) Connect(to Node, slotIdx int) {
v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx})
to.InputValues().Set(slotIdx, v)
} }


func NodeDeclareInputValue(node *Node, cnt int) {
node.InputValues = make([]*ValueVar, cnt)
func (v *ValueVar) Disconnect(node Node, slotIdx int) {
v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx})
node.InputValues().Set(slotIdx, nil)
} }

+ 46
- 0
pkgs/ioswitch/exec/config.go View File

@@ -0,0 +1,46 @@
package exec

import (
"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/common/utils/serder/json"
)

type ConfigBuilder struct {
unions []*types.AnyTypeUnion
opUnion types.TypeUnion[Op]
workerInfoType reflect2.Type
}

func (c *ConfigBuilder) UseUnion(u *types.AnyTypeUnion) *ConfigBuilder {
c.unions = append(c.unions, u)
return c
}

func (c *ConfigBuilder) UseOpType(nilValue Op) *ConfigBuilder {
c.opUnion.Add(reflect2.TypeOfValue(nilValue))
return c
}

func (c *ConfigBuilder) UseWorkerInfoType(nilValue WorkerInfo) *ConfigBuilder {
c.workerInfoType = reflect2.TypeOfValue(nilValue)
return c
}

func (c *ConfigBuilder) Build() Config {
b := json.New().UseUnionExternallyTagged(c.opUnion.ToAny())
for _, u := range c.unions {
b.UseUnionExternallyTagged(u)
}

// b.UseExtension(&workerInfoJSONExt{workerInfoType: c.workerInfoType})

ser := b.Build()
return Config{
Serder: ser,
}
}

type Config struct {
Serder json.Serder
}

+ 67
- 57
pkgs/ioswitch/plan/generate.go View File

@@ -9,103 +9,105 @@ import (
) )


func Generate(graph *dag.Graph, planBld *exec.PlanBuilder) error { func Generate(graph *dag.Graph, planBld *exec.PlanBuilder) error {
generateSend(graph)
myGraph := &ops.GraphNodeBuilder{graph}
generateSend(myGraph)
return buildPlan(graph, planBld) return buildPlan(graph, planBld)
} }


// 生成Send指令 // 生成Send指令
func generateSend(graph *dag.Graph) {
graph.Walk(func(node *dag.Node) bool {
switch node.Type.(type) {
case *ops.SendStreamType:
func generateSend(graph *ops.GraphNodeBuilder) {
graph.Walk(func(node dag.Node) bool {
switch node.(type) {
case *ops.SendStreamNode:
return true return true
case *ops.SendVarType:
case *ops.SendValueNode:
return true return true
case *ops.GetStreamType:
case *ops.GetStreamNode:
return true return true
case *ops.GetVaType:
case *ops.GetValueNode:
return true return true
case *ops.HoldUntilType:
case *ops.HoldUntilNode:
return true return true
} }


for _, out := range node.OutputStreams {
to := out.Toes[0]
if to.Node.Env.Equals(node.Env) {
for i := 0; i < node.OutputStreams().Len(); i++ {
out := node.OutputStreams().Get(i)
to := out.To().Get(0)
if to.Node.Env().Equals(node.Env()) {
continue continue
} }


switch to.Node.Env.Type {
switch to.Node.Env().Type {
case dag.EnvDriver: case dag.EnvDriver:

// // 如果是要送到Driver,则只能由Driver主动去拉取 // // 如果是要送到Driver,则只能由Driver主动去拉取
getNode, getType := dag.NewNode(graph, &ops.GetStreamType{
FromWorker: node.Env.Worker,
}, nil)
getNode.Env.ToEnvDriver()
getNode := graph.NewGetStream(node.Env().Worker)
getNode.Env().ToEnvDriver()


// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode, holdType := dag.NewNode(graph, &ops.HoldUntilType{}, nil)
holdNode.Env = node.Env
holdType := graph.NewHoldUntil() //dag.NewNode(graph, &ops.HoldUntilNode{}, nil)
*holdType.Env() = *node.Env()


// 将Get指令的信号送到Hold指令 // 将Get指令的信号送到Hold指令
holdType.Signal(holdNode, getType.SignalVar(getNode))
holdType.SetSignal(getNode.SignalVar())


out.Toes = nil
out.To().RemoveAt(0)


// 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令
getType.Get(getNode, holdType.HoldStream(holdNode, out)).
getNode.Get(holdType.HoldStream(out)).
// 将Get指令的输出送到目的地 // 将Get指令的输出送到目的地
To(to.Node, to.SlotIndex)
Connect(to.Node, to.SlotIndex)


case dag.EnvWorker: case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送 // 如果是要送到Agent,则可以直接发送
n, t := dag.NewNode(graph, &ops.SendStreamType{
ToWorker: to.Node.Env.Worker,
}, nil)
n.Env = node.Env
n := graph.NewSendStream(to.Node.Env().Worker)
*n.Env() = *node.Env()


out.Toes = nil
t.Send(n, out).To(to.Node, to.SlotIndex)
out.To().RemoveAt(0)
n.Send(out).Connect(to.Node, to.SlotIndex)
} }
} }


for _, out := range node.OutputValues {
to := out.Toes[0]
if to.Node.Env.Equals(node.Env) {
for i := 0; i < node.OutputValues().Len(); i++ {
out := node.OutputValues().Get(i)
// 允许Value变量不被使用
if out.To().Len() == 0 {
continue continue
} }


switch to.Node.Env.Type {
to := out.To().Get(0)
if to.Node.Env().Equals(node.Env()) {
continue
}

switch to.Node.Env().Type {
case dag.EnvDriver: case dag.EnvDriver:
// // 如果是要送到Driver,则只能由Driver主动去拉取 // // 如果是要送到Driver,则只能由Driver主动去拉取
getNode, getType := dag.NewNode(graph, &ops.GetVaType{
FromWorker: node.Env.Worker,
}, nil)
getNode.Env.ToEnvDriver()
getNode := graph.NewGetValue(node.Env().Worker)
getNode.Env().ToEnvDriver()


// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode, holdType := dag.NewNode(graph, &ops.HoldUntilType{}, nil)
holdNode.Env = node.Env
holdNode := graph.NewHoldUntil()
*holdNode.Env() = *node.Env()


// 将Get指令的信号送到Hold指令 // 将Get指令的信号送到Hold指令
holdType.Signal(holdNode, getType.SignalVar(getNode))
holdNode.SetSignal(getNode.SignalVar())


out.Toes = nil
out.To().RemoveAt(0)


// 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令
getType.Get(getNode, holdType.HoldVar(holdNode, out)).
getNode.Get(holdNode.HoldVar(out)).
// 将Get指令的输出送到目的地 // 将Get指令的输出送到目的地
To(to.Node, to.SlotIndex)
Connect(to.Node, to.SlotIndex)


case dag.EnvWorker: case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送 // 如果是要送到Agent,则可以直接发送
n, t := dag.NewNode(graph, &ops.SendVarType{
ToWorker: to.Node.Env.Worker,
}, nil)
n.Env = node.Env
t := graph.NewSendValue(to.Node.Env().Worker)
*t.Env() = *node.Env()
out.To().RemoveAt(0)


out.Toes = nil
t.Send(n, out).To(to.Node, to.SlotIndex)
t.Send(out).Connect(to.Node, to.SlotIndex)
} }
} }


@@ -116,8 +118,10 @@ func generateSend(graph *dag.Graph) {
// 生成Plan // 生成Plan
func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
var retErr error var retErr error
graph.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
graph.Walk(func(node dag.Node) bool {
for i := 0; i < node.OutputStreams().Len(); i++ {
out := node.OutputStreams().Get(i)

if out.Var != nil { if out.Var != nil {
continue continue
} }
@@ -125,7 +129,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
out.Var = blder.NewStreamVar() out.Var = blder.NewStreamVar()
} }


for _, in := range node.InputStreams {
for i := 0; i < node.InputStreams().Len(); i++ {
in := node.InputStreams().Get(i)

if in.Var != nil { if in.Var != nil {
continue continue
} }
@@ -133,7 +139,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
in.Var = blder.NewStreamVar() in.Var = blder.NewStreamVar()
} }


for _, out := range node.OutputValues {
for i := 0; i < node.OutputValues().Len(); i++ {
out := node.OutputValues().Get(i)

if out.Var != nil { if out.Var != nil {
continue continue
} }
@@ -149,7 +157,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
} }
} }


for _, in := range node.InputValues {
for i := 0; i < node.InputValues().Len(); i++ {
in := node.InputValues().Get(i)

if in.Var != nil { if in.Var != nil {
continue continue
} }
@@ -165,7 +175,7 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
} }
} }


op, err := node.Type.GenerateOp(node)
op, err := node.GenerateOp()
if err != nil { if err != nil {
retErr = err retErr = err
return false return false
@@ -176,11 +186,11 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
return true return true
} }


switch node.Env.Type {
switch node.Env().Type {
case dag.EnvDriver: case dag.EnvDriver:
blder.AtDriver().AddOp(op) blder.AtDriver().AddOp(op)
case dag.EnvWorker: case dag.EnvWorker:
blder.AtWorker(node.Env.Worker).AddOp(op)
blder.AtWorker(node.Env().Worker).AddOp(op)
} }


return true return true


+ 49
- 19
pkgs/ioswitch/plan/ops/driver.go View File

@@ -1,44 +1,74 @@
package ops package ops


import ( import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
) )


type FromDriverType struct {
type FromDriverNode struct {
dag.NodeBase
Handle *exec.DriverWriteStream Handle *exec.DriverWriteStream
} }


func (t *FromDriverType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{})
func (b *GraphNodeBuilder) NewFromDriver(handle *exec.DriverWriteStream) *FromDriverNode {
node := &FromDriverNode{
Handle: handle,
}
b.AddNode(node)

node.OutputStreams().SetupNew(node, b.NewStreamVar())

return node
}

func (t *FromDriverNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
Index: 0,
}
} }


func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) {
t.Handle.Var = op.OutputStreams[0].Var
func (t *FromDriverNode) GenerateOp() (exec.Op, error) {
t.Handle.Var = t.OutputStreams().Get(0).Var
return nil, nil return nil, nil
} }


func (t *FromDriverType) String(node *dag.Node) string {
return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *FromDriverType) String() string {
// return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node))
// }


type ToDriverType struct {
type ToDriverNode struct {
dag.NodeBase
Handle *exec.DriverReadStream Handle *exec.DriverReadStream
Range exec.Range Range exec.Range
} }


func (t *ToDriverType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverNode {
node := &ToDriverNode{
Handle: handle,
}
b.AddNode(node)

return node
} }


func (t *ToDriverType) GenerateOp(op *dag.Node) (exec.Op, error) {
t.Handle.Var = op.InputStreams[0].Var
return nil, nil
func (t *ToDriverNode) SetInput(v *dag.StreamVar) {
t.InputStreams().EnsureSize(1)
v.Connect(t, 0)
}

func (t *ToDriverNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
Index: 0,
}
} }


func (t *ToDriverType) String(node *dag.Node) string {
return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
func (t *ToDriverNode) GenerateOp() (exec.Op, error) {
t.Handle.Var = t.InputStreams().Get(0).Var
return nil, nil
} }

// func (t *ToDriverType) String() string {
// return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
// }

+ 17
- 8
pkgs/ioswitch/plan/ops/drop.go View File

@@ -40,18 +40,27 @@ func (o *DropStream) String() string {
return fmt.Sprintf("DropStream %v", o.Input.ID) return fmt.Sprintf("DropStream %v", o.Input.ID)
} }


type DropType struct{}
type DropNode struct {
dag.NodeBase
}

func (b *GraphNodeBuilder) NewDropStream() *DropNode {
node := &DropNode{}
b.AddNode(node)
return node
}


func (t *DropType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
func (t *DropNode) SetInput(v *dag.StreamVar) {
t.InputStreams().EnsureSize(1)
v.Connect(t, 0)
} }


func (t *DropType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *DropNode) GenerateOp() (exec.Op, error) {
return &DropStream{ return &DropStream{
Input: op.InputStreams[0].Var,
Input: t.InputStreams().Get(0).Var,
}, nil }, nil
} }


func (t *DropType) String(node *dag.Node) string {
return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *DropType) String(node *dag.Node) string {
// return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

+ 13
- 0
pkgs/ioswitch/plan/ops/ops.go View File

@@ -0,0 +1,13 @@
package ops

import "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"

type GraphNodeBuilder struct {
*dag.Graph
}

func NewGraphNodeBuilder() *GraphNodeBuilder {
return &GraphNodeBuilder{
Graph: dag.NewGraph(),
}
}

+ 88
- 62
pkgs/ioswitch/plan/ops/send.go View File

@@ -151,120 +151,146 @@ func (o *GetVar) String() string {
return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output.GetID(), o.Signal.ID, o.Target.GetID(), o.Worker) return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output.GetID(), o.Signal.ID, o.Target.GetID(), o.Worker)
} }


type SendStreamType struct {
type SendStreamNode struct {
dag.NodeBase
ToWorker exec.WorkerInfo ToWorker exec.WorkerInfo
} }


func (t *SendStreamType) Send(n *dag.Node, v *dag.StreamVar) *dag.StreamVar {
v.To(n, 0)
return n.OutputStreams[0]
func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode {
node := &SendStreamNode{
ToWorker: to,
}
b.AddNode(node)
return node
} }


func (t *SendStreamType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputStream(node, nil)
func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar {
t.InputStreams().EnsureSize(1)
v.Connect(t, 0)
output := t.Graph().NewStreamVar()
t.OutputStreams().Setup(t, output, 0)
return output
} }


func (t *SendStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *SendStreamNode) GenerateOp() (exec.Op, error) {
return &SendStream{ return &SendStream{
Input: op.InputStreams[0].Var,
Send: op.OutputStreams[0].Var,
Input: t.InputStreams().Get(0).Var,
Send: t.OutputStreams().Get(0).Var,
Worker: t.ToWorker, Worker: t.ToWorker,
}, nil }, nil
} }


func (t *SendStreamType) String(node *dag.Node) string {
return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *SendStreamType) String() string {
// return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node))
// }


type SendVarType struct {
type SendValueNode struct {
dag.NodeBase
ToWorker exec.WorkerInfo ToWorker exec.WorkerInfo
} }


func (t *SendVarType) Send(n *dag.Node, v *dag.ValueVar) *dag.ValueVar {
v.To(n, 0)
n.OutputValues[0].Type = v.Type
return n.OutputValues[0]
func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode {
node := &SendValueNode{
ToWorker: to,
}
b.AddNode(node)
return node
} }


func (t *SendVarType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
dag.NodeNewOutputValue(node, 0, nil)
func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar {
t.InputValues().EnsureSize(1)
v.Connect(t, 0)
output := t.Graph().NewValueVar(v.Type)
t.OutputValues().Setup(t, output, 0)
return output
} }


func (t *SendVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *SendValueNode) GenerateOp() (exec.Op, error) {
return &SendVar{ return &SendVar{
Input: op.InputValues[0].Var,
Send: op.OutputValues[0].Var,
Input: t.InputValues().Get(0).Var,
Send: t.OutputValues().Get(0).Var,
Worker: t.ToWorker, Worker: t.ToWorker,
}, nil }, nil
} }


func (t *SendVarType) String(node *dag.Node) string {
return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *SendVarType) String() string {
// return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node))
// }


type GetStreamType struct {
type GetStreamNode struct {
dag.NodeBase
FromWorker exec.WorkerInfo FromWorker exec.WorkerInfo
} }


func (t *GetStreamType) Get(n *dag.Node, v *dag.StreamVar) *dag.StreamVar {
v.To(n, 0)
return n.OutputStreams[0]
func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode {
node := &GetStreamNode{
FromWorker: from,
}
b.AddNode(node)
node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0)
return node
} }


func (t *GetStreamType) SignalVar(n *dag.Node) *dag.ValueVar {
return n.OutputValues[0]
func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar {
t.InputStreams().EnsureSize(1)
v.Connect(t, 0)
output := t.Graph().NewStreamVar()
t.OutputStreams().Setup(t, output, 0)
return output
} }


func (t *GetStreamType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputValue(node, dag.SignalValueVar, nil)
dag.NodeNewOutputStream(node, nil)
func (t *GetStreamNode) SignalVar() *dag.ValueVar {
return t.OutputValues().Get(0)
} }


func (t *GetStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *GetStreamNode) GenerateOp() (exec.Op, error) {
return &GetStream{ return &GetStream{
Signal: op.OutputValues[0].Var.(*exec.SignalVar),
Output: op.OutputStreams[0].Var,
Target: op.InputStreams[0].Var,
Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar),
Output: t.OutputStreams().Get(0).Var,
Target: t.InputStreams().Get(0).Var,
Worker: t.FromWorker, Worker: t.FromWorker,
}, nil }, nil
} }


func (t *GetStreamType) String(node *dag.Node) string {
return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *GetStreamType) String() string {
// return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node))
// }


type GetVaType struct {
type GetValueNode struct {
dag.NodeBase
FromWorker exec.WorkerInfo FromWorker exec.WorkerInfo
} }


func (t *GetVaType) Get(n *dag.Node, v *dag.ValueVar) *dag.ValueVar {
v.To(n, 0)
n.OutputValues[1].Type = v.Type
return n.OutputValues[1]
func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode {
node := &GetValueNode{
FromWorker: from,
}
b.AddNode(node)
node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0)
return node
} }


func (t *GetVaType) SignalVar(n *dag.Node) *dag.ValueVar {
return n.OutputValues[0]
func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar {
t.InputValues().EnsureSize(1)
v.Connect(t, 0)
output := t.Graph().NewValueVar(v.Type)
t.OutputValues().Setup(t, output, 1)
return output
} }


func (t *GetVaType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
dag.NodeNewOutputValue(node, dag.SignalValueVar, nil)
dag.NodeNewOutputValue(node, 0, nil)
func (t *GetValueNode) SignalVar() *dag.ValueVar {
return t.OutputValues().Get(0)
} }


func (t *GetVaType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *GetValueNode) GenerateOp() (exec.Op, error) {
return &GetVar{ return &GetVar{
Signal: op.OutputValues[0].Var.(*exec.SignalVar),
Output: op.OutputValues[1].Var,
Target: op.InputValues[0].Var,
Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar),
Output: t.OutputValues().Get(1).Var,
Target: t.InputValues().Get(0).Var,
Worker: t.FromWorker, Worker: t.FromWorker,
}, nil }, nil
} }


func (t *GetVaType) String(node *dag.Node) string {
return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *GetVaType) String() string {
// return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

+ 17
- 12
pkgs/ioswitch/plan/ops/store.go View File

@@ -33,25 +33,30 @@ func (o *Store) String() string {
return fmt.Sprintf("Store %v: %v", o.Key, o.Var.GetID()) return fmt.Sprintf("Store %v: %v", o.Key, o.Var.GetID())
} }


type StoreType struct {
StoreKey string
type StoreNode struct {
dag.NodeBase
Key string
} }


func (t *StoreType) Store(node *dag.Node, v *dag.ValueVar) {
v.To(node, 0)
func (b *GraphNodeBuilder) NewStore() *StoreNode {
node := &StoreNode{}
b.AddNode(node)
return node
} }


func (t *StoreType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
func (t *StoreNode) Store(key string, v *dag.ValueVar) {
t.Key = key
t.InputValues().EnsureSize(1)
v.Connect(t, 0)
} }


func (t *StoreType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *StoreNode) GenerateOp() (exec.Op, error) {
return &Store{ return &Store{
Var: op.InputValues[0].Var,
Key: t.StoreKey,
Var: t.InputValues().Get(0).Var,
Key: t.Key,
}, nil }, nil
} }


func (t *StoreType) String(node *dag.Node) string {
return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node))
}
// func (t *StoreType) String() string {
// return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node))
// }

+ 36
- 32
pkgs/ioswitch/plan/ops/sync.go View File

@@ -163,49 +163,53 @@ func (b *Broadcast) String() string {
return "Broadcast" return "Broadcast"
} }


type HoldUntilType struct {
type HoldUntilNode struct {
dag.NodeBase
} }


func (t *HoldUntilType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode {
node := &HoldUntilNode{}
b.AddNode(node)
return node
} }


func (t *HoldUntilType) GenerateOp(op *dag.Node) (exec.Op, error) {
o := &HoldUntil{
Waits: []*exec.SignalVar{op.InputValues[0].Var.(*exec.SignalVar)},
}

for i := 0; i < len(op.OutputValues); i++ {
o.Holds = append(o.Holds, op.InputValues[i+1].Var)
o.Emits = append(o.Emits, op.OutputValues[i].Var)
}

for i := 0; i < len(op.OutputStreams); i++ {
o.Holds = append(o.Holds, op.InputStreams[i].Var)
o.Emits = append(o.Emits, op.OutputStreams[i].Var)
}
func (t *HoldUntilNode) SetSignal(s *dag.ValueVar) {
t.InputValues().EnsureSize(1)
s.Connect(t, 0)
}


return o, nil
func (t *HoldUntilNode) HoldStream(str *dag.StreamVar) *dag.StreamVar {
str.Connect(t, t.InputStreams().EnlargeOne())
output := t.Graph().NewStreamVar()
t.OutputStreams().SetupNew(t, output)
return output
} }


func (t *HoldUntilType) Signal(n *dag.Node, s *dag.ValueVar) {
s.To(n, 0)
func (t *HoldUntilNode) HoldVar(v *dag.ValueVar) *dag.ValueVar {
v.Connect(t, t.InputValues().EnlargeOne())
output := t.Graph().NewValueVar(v.Type)
t.OutputValues().SetupNew(t, output)
return output
} }


func (t *HoldUntilType) HoldStream(n *dag.Node, str *dag.StreamVar) *dag.StreamVar {
n.InputStreams = append(n.InputStreams, nil)
str.To(n, len(n.InputStreams)-1)
func (t *HoldUntilNode) GenerateOp() (exec.Op, error) {
o := &HoldUntil{
Waits: []*exec.SignalVar{t.InputValues().Get(0).Var.(*exec.SignalVar)},
}


return dag.NodeNewOutputStream(n, nil)
}
for i := 0; i < t.OutputValues().Len(); i++ {
o.Holds = append(o.Holds, t.InputValues().Get(i+1).Var)
o.Emits = append(o.Emits, t.OutputValues().Get(i).Var)
}


func (t *HoldUntilType) HoldVar(n *dag.Node, v *dag.ValueVar) *dag.ValueVar {
n.InputValues = append(n.InputValues, nil)
v.To(n, len(n.InputValues)-1)
for i := 0; i < t.OutputStreams().Len(); i++ {
o.Holds = append(o.Holds, t.InputStreams().Get(i).Var)
o.Emits = append(o.Emits, t.OutputStreams().Get(i).Var)
}


return dag.NodeNewOutputValue(n, v.Type, nil)
return o, nil
} }


func (t *HoldUntilType) String(node *dag.Node) string {
return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *HoldUntilType) String() string {
// return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

+ 60
- 60
pkgs/ioswitch/plan/ops/utils.go View File

@@ -1,75 +1,75 @@
package ops package ops


import (
"fmt"
// import (
// "fmt"


"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)
// "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
// )


func formatStreamIO(node *dag.Node) string {
is := ""
for i, in := range node.InputStreams {
if i > 0 {
is += ","
}
// func formatStreamIO(node *dag.Node) string {
// is := ""
// for i, in := range node.InputStreams {
// if i > 0 {
// is += ","
// }


if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}
// if in == nil {
// is += "."
// } else {
// is += fmt.Sprintf("%v", in.ID)
// }
// }


os := ""
for i, out := range node.OutputStreams {
if i > 0 {
os += ","
}
// os := ""
// for i, out := range node.OutputStreams {
// if i > 0 {
// os += ","
// }


if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}
// if out == nil {
// os += "."
// } else {
// os += fmt.Sprintf("%v", out.ID)
// }
// }


if is == "" && os == "" {
return ""
}
// if is == "" && os == "" {
// return ""
// }


return fmt.Sprintf("S{%s>%s}", is, os)
}
// return fmt.Sprintf("S{%s>%s}", is, os)
// }


func formatValueIO(node *dag.Node) string {
is := ""
for i, in := range node.InputValues {
if i > 0 {
is += ","
}
// func formatValueIO(node *dag.Node) string {
// is := ""
// for i, in := range node.InputValues {
// if i > 0 {
// is += ","
// }


if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}
// if in == nil {
// is += "."
// } else {
// is += fmt.Sprintf("%v", in.ID)
// }
// }


os := ""
for i, out := range node.OutputValues {
if i > 0 {
os += ","
}
// os := ""
// for i, out := range node.OutputValues {
// if i > 0 {
// os += ","
// }


if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}
// if out == nil {
// os += "."
// } else {
// os += fmt.Sprintf("%v", out.ID)
// }
// }


if is == "" && os == "" {
return ""
}
// if is == "" && os == "" {
// return ""
// }


return fmt.Sprintf("V{%s>%s}", is, os)
}
// return fmt.Sprintf("V{%s>%s}", is, os)
// }

Loading…
Cancel
Save