From bb0c70cf5dc8c4c6e9bcb986f381ba36e02d33d6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 2 Sep 2024 11:20:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ioswitch=E7=9A=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/ioswitch/dag/graph.go | 50 +++++---- pkgs/ioswitch/dag/node.go | 157 +++++++++++++++++++++++++---- pkgs/ioswitch/dag/utils.go | 13 --- pkgs/ioswitch/dag/var.go | 167 ++++++++++++++++--------------- pkgs/ioswitch/exec/config.go | 46 +++++++++ pkgs/ioswitch/plan/generate.go | 124 ++++++++++++----------- pkgs/ioswitch/plan/ops/driver.go | 68 +++++++++---- pkgs/ioswitch/plan/ops/drop.go | 25 +++-- pkgs/ioswitch/plan/ops/ops.go | 13 +++ pkgs/ioswitch/plan/ops/send.go | 150 +++++++++++++++------------ pkgs/ioswitch/plan/ops/store.go | 29 +++--- pkgs/ioswitch/plan/ops/sync.go | 68 +++++++------ pkgs/ioswitch/plan/ops/utils.go | 120 +++++++++++----------- 13 files changed, 646 insertions(+), 384 deletions(-) delete mode 100644 pkgs/ioswitch/dag/utils.go create mode 100644 pkgs/ioswitch/exec/config.go create mode 100644 pkgs/ioswitch/plan/ops/ops.go diff --git a/pkgs/ioswitch/dag/graph.go b/pkgs/ioswitch/dag/graph.go index 995ebf2..5456250 100644 --- a/pkgs/ioswitch/dag/graph.go +++ b/pkgs/ioswitch/dag/graph.go @@ -5,7 +5,7 @@ import ( ) type Graph struct { - Nodes []*Node + Nodes []Node isWalking bool nextVarID int } @@ -14,18 +14,12 @@ func NewGraph() *Graph { return &Graph{} } -func (g *Graph) NewNode(typ NodeType, props any) *Node { - n := &Node{ - Type: typ, - Props: props, - Graph: g, - } - typ.InitNode(n) - g.Nodes = append(g.Nodes, n) - return n +func (g *Graph) AddNode(node Node) { + g.Nodes = append(g.Nodes, node) + node.SetGraph(g) } -func (g *Graph) RemoveNode(node *Node) { +func (g *Graph) RemoveNode(node Node) { for i, n := range g.Nodes { if n == node { if g.isWalking { @@ -38,7 +32,7 @@ func (g *Graph) RemoveNode(node *Node) { } } -func (g *Graph) Walk(cb func(node *Node) bool) { +func (g *Graph) Walk(cb func(node Node) bool) { g.isWalking = true for i := 0; i < len(g.Nodes); i++ { if g.Nodes[i] == nil { @@ -54,20 +48,40 @@ func (g *Graph) Walk(cb func(node *Node) bool) { g.Nodes = lo2.RemoveAllDefault(g.Nodes) } +func (g *Graph) NewStreamVar() *StreamVar { + str := &StreamVar{ + VarBase: VarBase{ + id: g.genVarID(), + }, + } + return str +} + +func (g *Graph) NewValueVar(valType ValueVarType) *ValueVar { + val := &ValueVar{ + VarBase: VarBase{ + id: g.genVarID(), + }, + Type: valType, + } + return val +} + func (g *Graph) genVarID() int { g.nextVarID++ return g.nextVarID } -func NewNode[N NodeType](graph *Graph, typ N, props any) (*Node, N) { - return graph.NewNode(typ, props), typ +func AddNode[N Node](graph *Graph, typ N) N { + graph.AddNode(typ) + return typ } -func WalkOnlyType[N NodeType](g *Graph, cb func(node *Node, typ N) bool) { - g.Walk(func(node *Node) bool { - typ, ok := node.Type.(N) +func WalkOnlyType[N Node](g *Graph, cb func(node N) bool) { + g.Walk(func(n Node) bool { + node, ok := n.(N) if ok { - return cb(node, typ) + return cb(node) } return true }) diff --git a/pkgs/ioswitch/dag/node.go b/pkgs/ioswitch/dag/node.go index 6f24072..c547011 100644 --- a/pkgs/ioswitch/dag/node.go +++ b/pkgs/ioswitch/dag/node.go @@ -1,17 +1,10 @@ package dag import ( - "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/utils/lo2" ) -type NodeType interface { - InitNode(node *Node) - String(node *Node) string - GenerateOp(node *Node) (exec.Op, error) -} - type NodeEnvType string const ( @@ -41,7 +34,7 @@ func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) { e.Worker = worker } -func (e *NodeEnv) Equals(other NodeEnv) bool { +func (e *NodeEnv) Equals(other *NodeEnv) bool { if e.Type != other.Type { return false } @@ -53,17 +46,141 @@ func (e *NodeEnv) Equals(other NodeEnv) bool { return e.Worker.Equals(other.Worker) } -type Node struct { - Type NodeType - Env NodeEnv - Props any - InputStreams []*StreamVar - OutputStreams []*StreamVar - InputValues []*ValueVar - OutputValues []*ValueVar - Graph *Graph +type Node interface { + Graph() *Graph + SetGraph(graph *Graph) + Env() *NodeEnv + InputStreams() *InputSlots[*StreamVar] + OutputStreams() *OutputSlots[*StreamVar] + InputValues() *InputSlots[*ValueVar] + OutputValues() *OutputSlots[*ValueVar] + GenerateOp() (exec.Op, error) + // String() string +} + +type VarSlots[T Var] []T + +func (s *VarSlots[T]) Len() int { + return len(*s) +} + +func (s *VarSlots[T]) Get(idx int) T { + return (*s)[idx] +} + +func (s *VarSlots[T]) Set(idx int, val T) T { + old := (*s)[idx] + (*s)[idx] = val + return old +} + +func (s *VarSlots[T]) Append(val T) int { + *s = append(*s, val) + return s.Len() - 1 +} + +func (s *VarSlots[T]) RemoveAt(idx int) { + (*s) = lo2.RemoveAt(*s, idx) +} + +func (s *VarSlots[T]) Resize(size int) { + if s.Len() < size { + *s = append(*s, make([]T, size-s.Len())...) + } else if s.Len() > size { + *s = (*s)[:size] + } +} + +func (s *VarSlots[T]) SetRawArray(arr []T) { + *s = arr +} + +func (s *VarSlots[T]) RawArray() []T { + return *s +} + +type InputSlots[T Var] struct { + VarSlots[T] +} + +func (s *InputSlots[T]) EnsureSize(cnt int) { + if s.Len() < cnt { + s.VarSlots = append(s.VarSlots, make([]T, cnt-s.Len())...) + } +} + +func (s *InputSlots[T]) EnlargeOne() int { + var t T + s.Append(t) + return s.Len() - 1 +} + +type OutputSlots[T Var] struct { + VarSlots[T] +} + +func (s *OutputSlots[T]) Setup(my Node, v T, slotIdx int) { + if s.Len() <= slotIdx { + s.VarSlots = append(s.VarSlots, make([]T, slotIdx-s.Len()+1)...) + } + + s.Set(slotIdx, v) + *v.From() = EndPoint{ + Node: my, + SlotIndex: slotIdx, + } +} + +func (s *OutputSlots[T]) SetupNew(my Node, v T) { + s.Append(v) + *v.From() = EndPoint{ + Node: my, + SlotIndex: s.Len() - 1, + } +} + +type Slot[T Var] struct { + Var T + Index int +} + +type StreamSlot = Slot[*StreamVar] + +type ValueSlot = Slot[*ValueVar] + +type NodeBase struct { + env NodeEnv + inputStreams InputSlots[*StreamVar] + outputStreams OutputSlots[*StreamVar] + inputValues InputSlots[*ValueVar] + outputValues OutputSlots[*ValueVar] + graph *Graph +} + +func (n *NodeBase) Graph() *Graph { + return n.graph +} + +func (n *NodeBase) SetGraph(graph *Graph) { + n.graph = graph +} + +func (n *NodeBase) Env() *NodeEnv { + return &n.env +} + +func (n *NodeBase) InputStreams() *InputSlots[*StreamVar] { + return &n.inputStreams +} + +func (n *NodeBase) OutputStreams() *OutputSlots[*StreamVar] { + return &n.outputStreams +} + +func (n *NodeBase) InputValues() *InputSlots[*ValueVar] { + return &n.inputValues } -func (n *Node) String() string { - return fmt.Sprintf("%v", n.Type.String(n)) +func (n *NodeBase) OutputValues() *OutputSlots[*ValueVar] { + return &n.outputValues } diff --git a/pkgs/ioswitch/dag/utils.go b/pkgs/ioswitch/dag/utils.go deleted file mode 100644 index cdee0ae..0000000 --- a/pkgs/ioswitch/dag/utils.go +++ /dev/null @@ -1,13 +0,0 @@ -package dag - -func NProps[T any](n *Node) T { - return n.Props.(T) -} - -func SProps[T any](str *StreamVar) T { - return str.Props.(T) -} - -func VProps[T any](v *ValueVar) T { - return v.Props.(T) -} diff --git a/pkgs/ioswitch/dag/var.go b/pkgs/ioswitch/dag/var.go index f10ea82..325bdb1 100644 --- a/pkgs/ioswitch/dag/var.go +++ b/pkgs/ioswitch/dag/var.go @@ -5,80 +5,95 @@ import ( "gitlink.org.cn/cloudream/common/utils/lo2" ) +type Var interface { + ID() int + From() *EndPoint + To() *EndPointSlots +} + type EndPoint struct { - Node *Node + Node Node SlotIndex int // 所连接的Node的Output或Input数组的索引 } -type StreamVar struct { - ID int - From EndPoint - Toes []EndPoint - Props any - Var *exec.StreamVar -} - -func (v *StreamVar) To(to *Node, slotIdx int) int { - v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx}) - to.InputStreams[slotIdx] = v - return len(v.Toes) - 1 -} - -// func (v *StreamVar) NotTo(toIdx int) EndPoint { -// ed := v.Toes[toIdx] -// lo2.RemoveAt(v.Toes, toIdx) -// ed.Node.InputStreams[ed.SlotIndex] = nil -// return ed -// } - -func (v *StreamVar) NotTo(node *Node) (EndPoint, bool) { - for i, ed := range v.Toes { - if ed.Node == node { - v.Toes = lo2.RemoveAt(v.Toes, i) - ed.Node.InputStreams[ed.SlotIndex] = nil - return ed, true - } - } +type EndPointSlots []EndPoint + +func (s *EndPointSlots) Len() int { + return len(*s) +} + +func (s *EndPointSlots) Get(idx int) *EndPoint { + return &(*s)[idx] +} - return EndPoint{}, false +func (s *EndPointSlots) Add(ed EndPoint) int { + (*s) = append((*s), ed) + return len(*s) - 1 } -func (v *StreamVar) NotToWhere(pred func(to EndPoint) bool) []EndPoint { - var newToes []EndPoint - var rmed []EndPoint - for _, ed := range v.Toes { - if pred(ed) { - ed.Node.InputStreams[ed.SlotIndex] = nil - rmed = append(rmed, ed) - } else { - newToes = append(newToes, ed) +func (s *EndPointSlots) Remove(ed EndPoint) { + for i, e := range *s { + if e == ed { + (*s) = lo2.RemoveAt((*s), i) + return } } - v.Toes = newToes - return rmed } -func (v *StreamVar) NotToAll() []EndPoint { - for _, ed := range v.Toes { - ed.Node.InputStreams[ed.SlotIndex] = nil - } - toes := v.Toes - v.Toes = nil - return toes +func (s *EndPointSlots) RemoveAt(idx int) { + lo2.RemoveAt((*s), idx) } -func NodeNewOutputStream(node *Node, props any) *StreamVar { - str := &StreamVar{ - ID: node.Graph.genVarID(), - From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)}, - Props: props, +func (s *EndPointSlots) Resize(size int) { + if s.Len() < size { + (*s) = append((*s), make([]EndPoint, size-s.Len())...) + } else if s.Len() > size { + (*s) = (*s)[:size] } - node.OutputStreams = append(node.OutputStreams, str) - return str } -func NodeDeclareInputStream(node *Node, cnt int) { - node.InputStreams = make([]*StreamVar, cnt) +func (s *EndPointSlots) RawArray() []EndPoint { + return *s +} + +type VarBase struct { + id int + from EndPoint + to EndPointSlots +} + +func (v *VarBase) ID() int { + return v.id +} + +func (v *VarBase) From() *EndPoint { + return &v.from +} + +func (v *VarBase) To() *EndPointSlots { + return &v.to +} + +type StreamVar struct { + VarBase + Var *exec.StreamVar +} + +func (v *StreamVar) Connect(to Node, slotIdx int) { + v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) + to.InputStreams().Set(slotIdx, v) +} + +func (v *StreamVar) Disconnect(node Node, slotIdx int) { + v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) + node.InputStreams().Set(slotIdx, nil) +} + +func (v *StreamVar) DisconnectAll() { + for _, ed := range v.to { + ed.Node.InputStreams().Set(ed.SlotIndex, nil) + } + v.to = nil } type ValueVarType int @@ -90,31 +105,17 @@ const ( ) type ValueVar struct { - ID int - Type ValueVarType - From EndPoint - Toes []EndPoint - Props any - Var exec.Var -} - -func (v *ValueVar) To(to *Node, slotIdx int) int { - v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx}) - to.InputValues[slotIdx] = v - return len(v.Toes) - 1 -} - -func NodeNewOutputValue(node *Node, typ ValueVarType, props any) *ValueVar { - val := &ValueVar{ - ID: node.Graph.genVarID(), - Type: typ, - From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)}, - Props: props, - } - node.OutputValues = append(node.OutputValues, val) - return val + VarBase + Type ValueVarType + Var exec.Var +} + +func (v *ValueVar) Connect(to Node, slotIdx int) { + v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) + to.InputValues().Set(slotIdx, v) } -func NodeDeclareInputValue(node *Node, cnt int) { - node.InputValues = make([]*ValueVar, cnt) +func (v *ValueVar) Disconnect(node Node, slotIdx int) { + v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) + node.InputValues().Set(slotIdx, nil) } diff --git a/pkgs/ioswitch/exec/config.go b/pkgs/ioswitch/exec/config.go new file mode 100644 index 0000000..0e15df5 --- /dev/null +++ b/pkgs/ioswitch/exec/config.go @@ -0,0 +1,46 @@ +package exec + +import ( + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/reflect2" + "gitlink.org.cn/cloudream/common/utils/serder/json" +) + +type ConfigBuilder struct { + unions []*types.AnyTypeUnion + opUnion types.TypeUnion[Op] + workerInfoType reflect2.Type +} + +func (c *ConfigBuilder) UseUnion(u *types.AnyTypeUnion) *ConfigBuilder { + c.unions = append(c.unions, u) + return c +} + +func (c *ConfigBuilder) UseOpType(nilValue Op) *ConfigBuilder { + c.opUnion.Add(reflect2.TypeOfValue(nilValue)) + return c +} + +func (c *ConfigBuilder) UseWorkerInfoType(nilValue WorkerInfo) *ConfigBuilder { + c.workerInfoType = reflect2.TypeOfValue(nilValue) + return c +} + +func (c *ConfigBuilder) Build() Config { + b := json.New().UseUnionExternallyTagged(c.opUnion.ToAny()) + for _, u := range c.unions { + b.UseUnionExternallyTagged(u) + } + + // b.UseExtension(&workerInfoJSONExt{workerInfoType: c.workerInfoType}) + + ser := b.Build() + return Config{ + Serder: ser, + } +} + +type Config struct { + Serder json.Serder +} diff --git a/pkgs/ioswitch/plan/generate.go b/pkgs/ioswitch/plan/generate.go index 2e94477..e11324f 100644 --- a/pkgs/ioswitch/plan/generate.go +++ b/pkgs/ioswitch/plan/generate.go @@ -9,103 +9,105 @@ import ( ) func Generate(graph *dag.Graph, planBld *exec.PlanBuilder) error { - generateSend(graph) + myGraph := &ops.GraphNodeBuilder{graph} + generateSend(myGraph) return buildPlan(graph, planBld) } // 生成Send指令 -func generateSend(graph *dag.Graph) { - graph.Walk(func(node *dag.Node) bool { - switch node.Type.(type) { - case *ops.SendStreamType: +func generateSend(graph *ops.GraphNodeBuilder) { + graph.Walk(func(node dag.Node) bool { + switch node.(type) { + case *ops.SendStreamNode: return true - case *ops.SendVarType: + case *ops.SendValueNode: return true - case *ops.GetStreamType: + case *ops.GetStreamNode: return true - case *ops.GetVaType: + case *ops.GetValueNode: return true - case *ops.HoldUntilType: + case *ops.HoldUntilNode: return true } - for _, out := range node.OutputStreams { - to := out.Toes[0] - if to.Node.Env.Equals(node.Env) { + for i := 0; i < node.OutputStreams().Len(); i++ { + out := node.OutputStreams().Get(i) + to := out.To().Get(0) + if to.Node.Env().Equals(node.Env()) { continue } - switch to.Node.Env.Type { + switch to.Node.Env().Type { case dag.EnvDriver: + // // 如果是要送到Driver,则只能由Driver主动去拉取 - getNode, getType := dag.NewNode(graph, &ops.GetStreamType{ - FromWorker: node.Env.Worker, - }, nil) - getNode.Env.ToEnvDriver() + getNode := graph.NewGetStream(node.Env().Worker) + getNode.Env().ToEnvDriver() // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 - holdNode, holdType := dag.NewNode(graph, &ops.HoldUntilType{}, nil) - holdNode.Env = node.Env + holdType := graph.NewHoldUntil() //dag.NewNode(graph, &ops.HoldUntilNode{}, nil) + *holdType.Env() = *node.Env() // 将Get指令的信号送到Hold指令 - holdType.Signal(holdNode, getType.SignalVar(getNode)) + holdType.SetSignal(getNode.SignalVar()) - out.Toes = nil + out.To().RemoveAt(0) // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 - getType.Get(getNode, holdType.HoldStream(holdNode, out)). + getNode.Get(holdType.HoldStream(out)). // 将Get指令的输出送到目的地 - To(to.Node, to.SlotIndex) + Connect(to.Node, to.SlotIndex) case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - n, t := dag.NewNode(graph, &ops.SendStreamType{ - ToWorker: to.Node.Env.Worker, - }, nil) - n.Env = node.Env + n := graph.NewSendStream(to.Node.Env().Worker) + *n.Env() = *node.Env() - out.Toes = nil - t.Send(n, out).To(to.Node, to.SlotIndex) + out.To().RemoveAt(0) + n.Send(out).Connect(to.Node, to.SlotIndex) } } - for _, out := range node.OutputValues { - to := out.Toes[0] - if to.Node.Env.Equals(node.Env) { + for i := 0; i < node.OutputValues().Len(); i++ { + out := node.OutputValues().Get(i) + // 允许Value变量不被使用 + if out.To().Len() == 0 { continue } - switch to.Node.Env.Type { + to := out.To().Get(0) + if to.Node.Env().Equals(node.Env()) { + continue + } + + switch to.Node.Env().Type { case dag.EnvDriver: // // 如果是要送到Driver,则只能由Driver主动去拉取 - getNode, getType := dag.NewNode(graph, &ops.GetVaType{ - FromWorker: node.Env.Worker, - }, nil) - getNode.Env.ToEnvDriver() + getNode := graph.NewGetValue(node.Env().Worker) + getNode.Env().ToEnvDriver() // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 - holdNode, holdType := dag.NewNode(graph, &ops.HoldUntilType{}, nil) - holdNode.Env = node.Env + holdNode := graph.NewHoldUntil() + *holdNode.Env() = *node.Env() // 将Get指令的信号送到Hold指令 - holdType.Signal(holdNode, getType.SignalVar(getNode)) + holdNode.SetSignal(getNode.SignalVar()) - out.Toes = nil + out.To().RemoveAt(0) // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 - getType.Get(getNode, holdType.HoldVar(holdNode, out)). + getNode.Get(holdNode.HoldVar(out)). // 将Get指令的输出送到目的地 - To(to.Node, to.SlotIndex) + Connect(to.Node, to.SlotIndex) case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - n, t := dag.NewNode(graph, &ops.SendVarType{ - ToWorker: to.Node.Env.Worker, - }, nil) - n.Env = node.Env + t := graph.NewSendValue(to.Node.Env().Worker) + *t.Env() = *node.Env() + + out.To().RemoveAt(0) - out.Toes = nil - t.Send(n, out).To(to.Node, to.SlotIndex) + t.Send(out).Connect(to.Node, to.SlotIndex) } } @@ -116,8 +118,10 @@ func generateSend(graph *dag.Graph) { // 生成Plan func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { var retErr error - graph.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { + graph.Walk(func(node dag.Node) bool { + for i := 0; i < node.OutputStreams().Len(); i++ { + out := node.OutputStreams().Get(i) + if out.Var != nil { continue } @@ -125,7 +129,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { out.Var = blder.NewStreamVar() } - for _, in := range node.InputStreams { + for i := 0; i < node.InputStreams().Len(); i++ { + in := node.InputStreams().Get(i) + if in.Var != nil { continue } @@ -133,7 +139,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { in.Var = blder.NewStreamVar() } - for _, out := range node.OutputValues { + for i := 0; i < node.OutputValues().Len(); i++ { + out := node.OutputValues().Get(i) + if out.Var != nil { continue } @@ -149,7 +157,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { } } - for _, in := range node.InputValues { + for i := 0; i < node.InputValues().Len(); i++ { + in := node.InputValues().Get(i) + if in.Var != nil { continue } @@ -165,7 +175,7 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { } } - op, err := node.Type.GenerateOp(node) + op, err := node.GenerateOp() if err != nil { retErr = err return false @@ -176,11 +186,11 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { return true } - switch node.Env.Type { + switch node.Env().Type { case dag.EnvDriver: blder.AtDriver().AddOp(op) case dag.EnvWorker: - blder.AtWorker(node.Env.Worker).AddOp(op) + blder.AtWorker(node.Env().Worker).AddOp(op) } return true diff --git a/pkgs/ioswitch/plan/ops/driver.go b/pkgs/ioswitch/plan/ops/driver.go index 7de285e..0ec9469 100644 --- a/pkgs/ioswitch/plan/ops/driver.go +++ b/pkgs/ioswitch/plan/ops/driver.go @@ -1,44 +1,74 @@ package ops import ( - "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) -type FromDriverType struct { +type FromDriverNode struct { + dag.NodeBase Handle *exec.DriverWriteStream } -func (t *FromDriverType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewFromDriver(handle *exec.DriverWriteStream) *FromDriverNode { + node := &FromDriverNode{ + Handle: handle, + } + b.AddNode(node) + + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + + return node +} + +func (t *FromDriverNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } } -func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { - t.Handle.Var = op.OutputStreams[0].Var +func (t *FromDriverNode) GenerateOp() (exec.Op, error) { + t.Handle.Var = t.OutputStreams().Get(0).Var return nil, nil } -func (t *FromDriverType) String(node *dag.Node) string { - return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *FromDriverType) String() string { +// return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } -type ToDriverType struct { +type ToDriverNode struct { + dag.NodeBase Handle *exec.DriverReadStream Range exec.Range } -func (t *ToDriverType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverNode { + node := &ToDriverNode{ + Handle: handle, + } + b.AddNode(node) + + return node } -func (t *ToDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { - t.Handle.Var = op.InputStreams[0].Var - return nil, nil +func (t *ToDriverNode) SetInput(v *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + v.Connect(t, 0) +} + +func (t *ToDriverNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } } -func (t *ToDriverType) String(node *dag.Node) string { - return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +func (t *ToDriverNode) GenerateOp() (exec.Op, error) { + t.Handle.Var = t.InputStreams().Get(0).Var + return nil, nil } + +// func (t *ToDriverType) String() string { +// return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/pkgs/ioswitch/plan/ops/drop.go b/pkgs/ioswitch/plan/ops/drop.go index b1cbf2e..384ceea 100644 --- a/pkgs/ioswitch/plan/ops/drop.go +++ b/pkgs/ioswitch/plan/ops/drop.go @@ -40,18 +40,27 @@ func (o *DropStream) String() string { return fmt.Sprintf("DropStream %v", o.Input.ID) } -type DropType struct{} +type DropNode struct { + dag.NodeBase +} + +func (b *GraphNodeBuilder) NewDropStream() *DropNode { + node := &DropNode{} + b.AddNode(node) + return node +} -func (t *DropType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (t *DropNode) SetInput(v *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + v.Connect(t, 0) } -func (t *DropType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *DropNode) GenerateOp() (exec.Op, error) { return &DropStream{ - Input: op.InputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, }, nil } -func (t *DropType) String(node *dag.Node) string { - return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *DropType) String(node *dag.Node) string { +// return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/pkgs/ioswitch/plan/ops/ops.go b/pkgs/ioswitch/plan/ops/ops.go new file mode 100644 index 0000000..50f1a1c --- /dev/null +++ b/pkgs/ioswitch/plan/ops/ops.go @@ -0,0 +1,13 @@ +package ops + +import "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + +type GraphNodeBuilder struct { + *dag.Graph +} + +func NewGraphNodeBuilder() *GraphNodeBuilder { + return &GraphNodeBuilder{ + Graph: dag.NewGraph(), + } +} diff --git a/pkgs/ioswitch/plan/ops/send.go b/pkgs/ioswitch/plan/ops/send.go index 08f44fa..ae52139 100644 --- a/pkgs/ioswitch/plan/ops/send.go +++ b/pkgs/ioswitch/plan/ops/send.go @@ -151,120 +151,146 @@ func (o *GetVar) String() string { return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output.GetID(), o.Signal.ID, o.Target.GetID(), o.Worker) } -type SendStreamType struct { +type SendStreamNode struct { + dag.NodeBase ToWorker exec.WorkerInfo } -func (t *SendStreamType) Send(n *dag.Node, v *dag.StreamVar) *dag.StreamVar { - v.To(n, 0) - return n.OutputStreams[0] +func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode { + node := &SendStreamNode{ + ToWorker: to, + } + b.AddNode(node) + return node } -func (t *SendStreamType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, nil) +func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar { + t.InputStreams().EnsureSize(1) + v.Connect(t, 0) + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output } -func (t *SendStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *SendStreamNode) GenerateOp() (exec.Op, error) { return &SendStream{ - Input: op.InputStreams[0].Var, - Send: op.OutputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, + Send: t.OutputStreams().Get(0).Var, Worker: t.ToWorker, }, nil } -func (t *SendStreamType) String(node *dag.Node) string { - return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *SendStreamType) String() string { +// return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } -type SendVarType struct { +type SendValueNode struct { + dag.NodeBase ToWorker exec.WorkerInfo } -func (t *SendVarType) Send(n *dag.Node, v *dag.ValueVar) *dag.ValueVar { - v.To(n, 0) - n.OutputValues[0].Type = v.Type - return n.OutputValues[0] +func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode { + node := &SendValueNode{ + ToWorker: to, + } + b.AddNode(node) + return node } -func (t *SendVarType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) - dag.NodeNewOutputValue(node, 0, nil) +func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar { + t.InputValues().EnsureSize(1) + v.Connect(t, 0) + output := t.Graph().NewValueVar(v.Type) + t.OutputValues().Setup(t, output, 0) + return output } -func (t *SendVarType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *SendValueNode) GenerateOp() (exec.Op, error) { return &SendVar{ - Input: op.InputValues[0].Var, - Send: op.OutputValues[0].Var, + Input: t.InputValues().Get(0).Var, + Send: t.OutputValues().Get(0).Var, Worker: t.ToWorker, }, nil } -func (t *SendVarType) String(node *dag.Node) string { - return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *SendVarType) String() string { +// return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } -type GetStreamType struct { +type GetStreamNode struct { + dag.NodeBase FromWorker exec.WorkerInfo } -func (t *GetStreamType) Get(n *dag.Node, v *dag.StreamVar) *dag.StreamVar { - v.To(n, 0) - return n.OutputStreams[0] +func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode { + node := &GetStreamNode{ + FromWorker: from, + } + b.AddNode(node) + node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0) + return node } -func (t *GetStreamType) SignalVar(n *dag.Node) *dag.ValueVar { - return n.OutputValues[0] +func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar { + t.InputStreams().EnsureSize(1) + v.Connect(t, 0) + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output } -func (t *GetStreamType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, dag.SignalValueVar, nil) - dag.NodeNewOutputStream(node, nil) +func (t *GetStreamNode) SignalVar() *dag.ValueVar { + return t.OutputValues().Get(0) } -func (t *GetStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *GetStreamNode) GenerateOp() (exec.Op, error) { return &GetStream{ - Signal: op.OutputValues[0].Var.(*exec.SignalVar), - Output: op.OutputStreams[0].Var, - Target: op.InputStreams[0].Var, + Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar), + Output: t.OutputStreams().Get(0).Var, + Target: t.InputStreams().Get(0).Var, Worker: t.FromWorker, }, nil } -func (t *GetStreamType) String(node *dag.Node) string { - return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *GetStreamType) String() string { +// return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } -type GetVaType struct { +type GetValueNode struct { + dag.NodeBase FromWorker exec.WorkerInfo } -func (t *GetVaType) Get(n *dag.Node, v *dag.ValueVar) *dag.ValueVar { - v.To(n, 0) - n.OutputValues[1].Type = v.Type - return n.OutputValues[1] +func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode { + node := &GetValueNode{ + FromWorker: from, + } + b.AddNode(node) + node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0) + return node } -func (t *GetVaType) SignalVar(n *dag.Node) *dag.ValueVar { - return n.OutputValues[0] +func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar { + t.InputValues().EnsureSize(1) + v.Connect(t, 0) + output := t.Graph().NewValueVar(v.Type) + t.OutputValues().Setup(t, output, 1) + return output } -func (t *GetVaType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) - dag.NodeNewOutputValue(node, dag.SignalValueVar, nil) - dag.NodeNewOutputValue(node, 0, nil) +func (t *GetValueNode) SignalVar() *dag.ValueVar { + return t.OutputValues().Get(0) } -func (t *GetVaType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *GetValueNode) GenerateOp() (exec.Op, error) { return &GetVar{ - Signal: op.OutputValues[0].Var.(*exec.SignalVar), - Output: op.OutputValues[1].Var, - Target: op.InputValues[0].Var, + Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar), + Output: t.OutputValues().Get(1).Var, + Target: t.InputValues().Get(0).Var, Worker: t.FromWorker, }, nil } -func (t *GetVaType) String(node *dag.Node) string { - return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *GetVaType) String() string { +// return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/pkgs/ioswitch/plan/ops/store.go b/pkgs/ioswitch/plan/ops/store.go index ad48ab5..c04dfdf 100644 --- a/pkgs/ioswitch/plan/ops/store.go +++ b/pkgs/ioswitch/plan/ops/store.go @@ -33,25 +33,30 @@ func (o *Store) String() string { return fmt.Sprintf("Store %v: %v", o.Key, o.Var.GetID()) } -type StoreType struct { - StoreKey string +type StoreNode struct { + dag.NodeBase + Key string } -func (t *StoreType) Store(node *dag.Node, v *dag.ValueVar) { - v.To(node, 0) +func (b *GraphNodeBuilder) NewStore() *StoreNode { + node := &StoreNode{} + b.AddNode(node) + return node } -func (t *StoreType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) +func (t *StoreNode) Store(key string, v *dag.ValueVar) { + t.Key = key + t.InputValues().EnsureSize(1) + v.Connect(t, 0) } -func (t *StoreType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *StoreNode) GenerateOp() (exec.Op, error) { return &Store{ - Var: op.InputValues[0].Var, - Key: t.StoreKey, + Var: t.InputValues().Get(0).Var, + Key: t.Key, }, nil } -func (t *StoreType) String(node *dag.Node) string { - return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) -} +// func (t *StoreType) String() string { +// return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/pkgs/ioswitch/plan/ops/sync.go b/pkgs/ioswitch/plan/ops/sync.go index 4689e70..746c8e4 100644 --- a/pkgs/ioswitch/plan/ops/sync.go +++ b/pkgs/ioswitch/plan/ops/sync.go @@ -163,49 +163,53 @@ func (b *Broadcast) String() string { return "Broadcast" } -type HoldUntilType struct { +type HoldUntilNode struct { + dag.NodeBase } -func (t *HoldUntilType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) +func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode { + node := &HoldUntilNode{} + b.AddNode(node) + return node } -func (t *HoldUntilType) GenerateOp(op *dag.Node) (exec.Op, error) { - o := &HoldUntil{ - Waits: []*exec.SignalVar{op.InputValues[0].Var.(*exec.SignalVar)}, - } - - for i := 0; i < len(op.OutputValues); i++ { - o.Holds = append(o.Holds, op.InputValues[i+1].Var) - o.Emits = append(o.Emits, op.OutputValues[i].Var) - } - - for i := 0; i < len(op.OutputStreams); i++ { - o.Holds = append(o.Holds, op.InputStreams[i].Var) - o.Emits = append(o.Emits, op.OutputStreams[i].Var) - } +func (t *HoldUntilNode) SetSignal(s *dag.ValueVar) { + t.InputValues().EnsureSize(1) + s.Connect(t, 0) +} - return o, nil +func (t *HoldUntilNode) HoldStream(str *dag.StreamVar) *dag.StreamVar { + str.Connect(t, t.InputStreams().EnlargeOne()) + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output } -func (t *HoldUntilType) Signal(n *dag.Node, s *dag.ValueVar) { - s.To(n, 0) +func (t *HoldUntilNode) HoldVar(v *dag.ValueVar) *dag.ValueVar { + v.Connect(t, t.InputValues().EnlargeOne()) + output := t.Graph().NewValueVar(v.Type) + t.OutputValues().SetupNew(t, output) + return output } -func (t *HoldUntilType) HoldStream(n *dag.Node, str *dag.StreamVar) *dag.StreamVar { - n.InputStreams = append(n.InputStreams, nil) - str.To(n, len(n.InputStreams)-1) +func (t *HoldUntilNode) GenerateOp() (exec.Op, error) { + o := &HoldUntil{ + Waits: []*exec.SignalVar{t.InputValues().Get(0).Var.(*exec.SignalVar)}, + } - return dag.NodeNewOutputStream(n, nil) -} + for i := 0; i < t.OutputValues().Len(); i++ { + o.Holds = append(o.Holds, t.InputValues().Get(i+1).Var) + o.Emits = append(o.Emits, t.OutputValues().Get(i).Var) + } -func (t *HoldUntilType) HoldVar(n *dag.Node, v *dag.ValueVar) *dag.ValueVar { - n.InputValues = append(n.InputValues, nil) - v.To(n, len(n.InputValues)-1) + for i := 0; i < t.OutputStreams().Len(); i++ { + o.Holds = append(o.Holds, t.InputStreams().Get(i).Var) + o.Emits = append(o.Emits, t.OutputStreams().Get(i).Var) + } - return dag.NodeNewOutputValue(n, v.Type, nil) + return o, nil } -func (t *HoldUntilType) String(node *dag.Node) string { - return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *HoldUntilType) String() string { +// return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/pkgs/ioswitch/plan/ops/utils.go b/pkgs/ioswitch/plan/ops/utils.go index 0a32ac9..1481f32 100644 --- a/pkgs/ioswitch/plan/ops/utils.go +++ b/pkgs/ioswitch/plan/ops/utils.go @@ -1,75 +1,75 @@ package ops -import ( - "fmt" +// import ( +// "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" -) +// "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" +// ) -func formatStreamIO(node *dag.Node) string { - is := "" - for i, in := range node.InputStreams { - if i > 0 { - is += "," - } +// func formatStreamIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputStreams { +// if i > 0 { +// is += "," +// } - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } - os := "" - for i, out := range node.OutputStreams { - if i > 0 { - os += "," - } +// os := "" +// for i, out := range node.OutputStreams { +// if i > 0 { +// os += "," +// } - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } - if is == "" && os == "" { - return "" - } +// if is == "" && os == "" { +// return "" +// } - return fmt.Sprintf("S{%s>%s}", is, os) -} +// return fmt.Sprintf("S{%s>%s}", is, os) +// } -func formatValueIO(node *dag.Node) string { - is := "" - for i, in := range node.InputValues { - if i > 0 { - is += "," - } +// func formatValueIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputValues { +// if i > 0 { +// is += "," +// } - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } - os := "" - for i, out := range node.OutputValues { - if i > 0 { - os += "," - } +// os := "" +// for i, out := range node.OutputValues { +// if i > 0 { +// os += "," +// } - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } - if is == "" && os == "" { - return "" - } +// if is == "" && os == "" { +// return "" +// } - return fmt.Sprintf("V{%s>%s}", is, os) -} +// return fmt.Sprintf("V{%s>%s}", is, os) +// }