From fef4e5be8dd20a075b8b056bff3c29fe0859aed6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 15 Aug 2024 16:48:47 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=8B=86=E5=88=86=E5=87=BAioswitch?= =?UTF-8?q?=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/ioswitch/dag/graph.go | 74 ++++++++++++++ pkgs/ioswitch/dag/node.go | 75 ++++++++++++++ pkgs/ioswitch/dag/var.go | 112 +++++++++++++++++++++ pkgs/ioswitch/exec/driver.go | 119 +++++++++++++++++++++++ pkgs/ioswitch/exec/exec.go | 16 +++ pkgs/ioswitch/exec/executor.go | 151 +++++++++++++++++++++++++++++ pkgs/ioswitch/exec/plan_builder.go | 126 ++++++++++++++++++++++++ pkgs/ioswitch/exec/utils.go | 98 +++++++++++++++++++ pkgs/ioswitch/exec/var.go | 57 +++++++++++ pkgs/ioswitch/exec/worker.go | 85 ++++++++++++++++ pkgs/ioswitch/parser/fromto.go | 26 +++++ pkgs/ioswitch/parser/parser.go | 9 ++ 12 files changed, 948 insertions(+) create mode 100644 pkgs/ioswitch/dag/graph.go create mode 100644 pkgs/ioswitch/dag/node.go create mode 100644 pkgs/ioswitch/dag/var.go create mode 100644 pkgs/ioswitch/exec/driver.go create mode 100644 pkgs/ioswitch/exec/exec.go create mode 100644 pkgs/ioswitch/exec/executor.go create mode 100644 pkgs/ioswitch/exec/plan_builder.go create mode 100644 pkgs/ioswitch/exec/utils.go create mode 100644 pkgs/ioswitch/exec/var.go create mode 100644 pkgs/ioswitch/exec/worker.go create mode 100644 pkgs/ioswitch/parser/fromto.go create mode 100644 pkgs/ioswitch/parser/parser.go diff --git a/pkgs/ioswitch/dag/graph.go b/pkgs/ioswitch/dag/graph.go new file mode 100644 index 0000000..811a6fe --- /dev/null +++ b/pkgs/ioswitch/dag/graph.go @@ -0,0 +1,74 @@ +package dag + +import ( + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type Graph[NP any, VP any] struct { + Nodes []*Node[NP, VP] + isWalking bool + nextVarID int +} + +func NewGraph[NP any, VP any]() *Graph[NP, VP] { + return &Graph[NP, VP]{} +} + +func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] { + n := &Node[NP, VP]{ + Type: typ, + Props: props, + Graph: g, + } + typ.InitNode(n) + g.Nodes = append(g.Nodes, n) + return n +} + +func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) { + for i, n := range g.Nodes { + if n == node { + if g.isWalking { + g.Nodes[i] = nil + } else { + g.Nodes = lo2.RemoveAt(g.Nodes, i) + } + break + } + } +} + +func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) { + g.isWalking = true + for i := 0; i < len(g.Nodes); i++ { + if g.Nodes[i] == nil { + continue + } + + if !cb(g.Nodes[i]) { + break + } + } + g.isWalking = false + + g.Nodes = lo2.RemoveAllDefault(g.Nodes) +} + +func (g *Graph[NP, VP]) genVarID() int { + g.nextVarID++ + return g.nextVarID +} + +func NewNode[NP any, VP any, NT NodeType[NP, VP]](graph *Graph[NP, VP], typ NT, props NP) (*Node[NP, VP], NT) { + return graph.NewNode(typ, props), typ +} + +func WalkOnlyType[N NodeType[NP, VP], NP any, VP any](g *Graph[NP, VP], cb func(node *Node[NP, VP], typ N) bool) { + g.Walk(func(node *Node[NP, VP]) bool { + typ, ok := node.Type.(N) + if ok { + return cb(node, typ) + } + return true + }) +} diff --git a/pkgs/ioswitch/dag/node.go b/pkgs/ioswitch/dag/node.go new file mode 100644 index 0000000..bf3bf24 --- /dev/null +++ b/pkgs/ioswitch/dag/node.go @@ -0,0 +1,75 @@ +package dag + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +type NodeType[NP any, VP any] interface { + InitNode(node *Node[NP, VP]) + String(node *Node[NP, VP]) string + GenerateOp(node *Node[NP, VP], blder *exec.PlanBuilder) error +} + +type WorkerInfo interface { + // 获取连接到这个worker的GRPC服务的地址 + GetAddress() string + // 判断两个worker是否相同 + Equals(worker WorkerInfo) bool +} + +type NodeEnvType string + +const ( + EnvUnknown NodeEnvType = "" + EnvExecutor NodeEnvType = "Executor" + EnvWorker NodeEnvType = "Worker" +) + +type NodeEnv struct { + Type NodeEnvType + Worker WorkerInfo +} + +func (e *NodeEnv) ToEnvUnknown() { + e.Type = EnvUnknown + e.Worker = nil +} + +func (e *NodeEnv) ToEnvExecutor() { + e.Type = EnvExecutor + e.Worker = nil +} + +func (e *NodeEnv) ToEnvWorker(worker WorkerInfo) { + e.Type = EnvWorker + e.Worker = worker +} + +func (e *NodeEnv) Equals(other NodeEnv) bool { + if e.Type != other.Type { + return false + } + + if e.Type != EnvWorker { + return true + } + + return e.Worker.Equals(other.Worker) +} + +type Node[NP any, VP any] struct { + Type NodeType[NP, VP] + Env NodeEnv + Props NP + InputStreams []*StreamVar[NP, VP] + OutputStreams []*StreamVar[NP, VP] + InputValues []*ValueVar[NP, VP] + OutputValues []*ValueVar[NP, VP] + Graph *Graph[NP, VP] +} + +func (n *Node[NP, VP]) String() string { + return fmt.Sprintf("%v", n.Type.String(n)) +} diff --git a/pkgs/ioswitch/dag/var.go b/pkgs/ioswitch/dag/var.go new file mode 100644 index 0000000..e1caa7b --- /dev/null +++ b/pkgs/ioswitch/dag/var.go @@ -0,0 +1,112 @@ +package dag + +import "gitlink.org.cn/cloudream/common/utils/lo2" + +type EndPoint[NP any, VP any] struct { + Node *Node[NP, VP] + SlotIndex int // 所连接的Node的Output或Input数组的索引 +} + +type StreamVar[NP any, VP any] struct { + ID int + From EndPoint[NP, VP] + Toes []EndPoint[NP, VP] + Props VP +} + +func (v *StreamVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { + v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) + to.InputStreams[slotIdx] = v + return len(v.Toes) - 1 +} + +// func (v *StreamVar[NP, VP]) NotTo(toIdx int) EndPoint[NP, VP] { +// ed := v.Toes[toIdx] +// lo2.RemoveAt(v.Toes, toIdx) +// ed.Node.InputStreams[ed.SlotIndex] = nil +// return ed +// } + +func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) { + for i, ed := range v.Toes { + if ed.Node == node { + v.Toes = lo2.RemoveAt(v.Toes, i) + ed.Node.InputStreams[ed.SlotIndex] = nil + return ed, true + } + } + + return EndPoint[NP, VP]{}, false +} + +func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []EndPoint[NP, VP] { + var newToes []EndPoint[NP, VP] + var rmed []EndPoint[NP, VP] + for _, ed := range v.Toes { + if pred(ed) { + ed.Node.InputStreams[ed.SlotIndex] = nil + rmed = append(rmed, ed) + } else { + newToes = append(newToes, ed) + } + } + v.Toes = newToes + return rmed +} + +func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] { + for _, ed := range v.Toes { + ed.Node.InputStreams[ed.SlotIndex] = nil + } + toes := v.Toes + v.Toes = nil + return toes +} + +func NodeNewOutputStream[NP any, VP any](node *Node[NP, VP], props VP) *StreamVar[NP, VP] { + str := &StreamVar[NP, VP]{ + ID: node.Graph.genVarID(), + From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, + Props: props, + } + node.OutputStreams = append(node.OutputStreams, str) + return str +} + +func NodeDeclareInputStream[NP any, VP any](node *Node[NP, VP], cnt int) { + node.InputStreams = make([]*StreamVar[NP, VP], cnt) +} + +type ValueVarType int + +const ( + StringValueVar ValueVarType = iota + SignalValueVar +) + +type ValueVar[NP any, VP any] struct { + ID int + From EndPoint[NP, VP] + Toes []EndPoint[NP, VP] + Props VP +} + +func (v *ValueVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { + v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) + to.InputValues[slotIdx] = v + return len(v.Toes) - 1 +} + +func NodeNewOutputValue[NP any, VP any](node *Node[NP, VP], props VP) *ValueVar[NP, VP] { + val := &ValueVar[NP, VP]{ + ID: node.Graph.genVarID(), + From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, + Props: props, + } + node.OutputValues = append(node.OutputValues, val) + return val +} + +func NodeDeclareInputValue[NP any, VP any](node *Node[NP, VP], cnt int) { + node.InputValues = make([]*ValueVar[NP, VP], cnt) +} diff --git a/pkgs/ioswitch/exec/driver.go b/pkgs/ioswitch/exec/driver.go new file mode 100644 index 0000000..26f8273 --- /dev/null +++ b/pkgs/ioswitch/exec/driver.go @@ -0,0 +1,119 @@ +package exec + +import ( + "context" + "fmt" + "io" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/io2" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" +) + +type Driver struct { + planID PlanID + planBlder *PlanBuilder + callback *future.SetVoidFuture + ctx context.Context + cancel context.CancelFunc + driverExec *Executor +} + +// 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。 +func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) { + handle.Var.Stream = io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length) + e.driverExec.PutVars(handle.Var) +} + +// 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理 +func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) { + handle.Var.Stream = str + e.driverExec.PutVars(handle.Var) +} + +func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) { + err := e.driverExec.BindVars(e.ctx, handle.Var) + if err != nil { + return nil, fmt.Errorf("bind vars: %w", err) + } + + return handle.Var.Stream, nil +} + +func (e *Driver) Signal(signal *DriverSignalVar) { + e.driverExec.PutVars(signal.Var) +} + +func (e *Driver) Wait(ctx context.Context) (map[string]any, error) { + err := e.callback.Wait(ctx) + if err != nil { + return nil, err + } + + ret := make(map[string]any) + e.planBlder.DriverPlan.StoreMap.Range(func(k, v any) bool { + ret[k.(string)] = v + return true + }) + + return ret, nil +} + +func (e *Driver) execute() { + wg := sync.WaitGroup{} + + for _, p := range e.planBlder.WorkerPlans { + wg.Add(1) + + go func(p *WorkerPlanBuilder) { + defer wg.Done() + + plan := Plan{ + ID: e.planID, + Ops: p.Ops, + } + + cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&p.Node)) + if err != nil { + e.stopWith(fmt.Errorf("new agent rpc client of node %v: %w", p.Node.NodeID, err)) + return + } + defer stgglb.AgentRPCPool.Release(cli) + + err = cli.ExecuteIOPlan(e.ctx, plan) + if err != nil { + e.stopWith(fmt.Errorf("execute plan at %v: %w", p.Node.NodeID, err)) + return + } + }(p) + } + + err := e.driverExec.Run(e.ctx) + if err != nil { + e.stopWith(fmt.Errorf("run executor switch: %w", err)) + return + } + + wg.Wait() + + e.callback.SetVoid() +} + +func (e *Driver) stopWith(err error) { + e.callback.SetError(err) + e.cancel() +} + +type DriverWriteStream struct { + Var *StreamVar + RangeHint *Range +} + +type DriverReadStream struct { + Var *StreamVar +} + +type DriverSignalVar struct { + Var *SignalVar +} diff --git a/pkgs/ioswitch/exec/exec.go b/pkgs/ioswitch/exec/exec.go new file mode 100644 index 0000000..d1c3558 --- /dev/null +++ b/pkgs/ioswitch/exec/exec.go @@ -0,0 +1,16 @@ +package exec + +import ( + "context" +) + +type PlanID string + +type Plan struct { + ID PlanID `json:"id"` + Ops []Op `json:"ops"` +} + +type Op interface { + Execute(ctx context.Context, sw *Executor) error +} diff --git a/pkgs/ioswitch/exec/executor.go b/pkgs/ioswitch/exec/executor.go new file mode 100644 index 0000000..da4748d --- /dev/null +++ b/pkgs/ioswitch/exec/executor.go @@ -0,0 +1,151 @@ +package exec + +import ( + "context" + "fmt" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/sync2" +) + +type bindingVars struct { + Waittings []Var + Bindeds []Var + Callback *future.SetVoidFuture +} + +type Executor struct { + plan Plan + vars map[VarID]Var + bindings []*bindingVars + lock sync.Mutex +} + +func NewExecutor(plan Plan) *Executor { + planning := Executor{ + plan: plan, + vars: make(map[VarID]Var), + } + + return &planning +} + +func (s *Executor) Plan() *Plan { + return &s.plan +} + +func (s *Executor) Run(ctx context.Context) error { + ctx2, cancel := context.WithCancel(ctx) + defer cancel() + + return sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error { + err := o.Execute(ctx2, s) + + s.lock.Lock() + defer s.lock.Unlock() + + if err != nil { + cancel() + return fmt.Errorf("%T: %w", o, err) + } + + return nil + }) +} + +func (s *Executor) BindVars(ctx context.Context, vs ...Var) error { + s.lock.Lock() + + callback := future.NewSetVoid() + binding := &bindingVars{ + Callback: callback, + } + + for _, v := range vs { + v2 := s.vars[v.GetID()] + if v2 == nil { + binding.Waittings = append(binding.Waittings, v) + continue + } + + if err := AssignVar(v2, v); err != nil { + s.lock.Unlock() + return fmt.Errorf("assign var %v to %v: %w", v2.GetID(), v.GetID(), err) + } + + binding.Bindeds = append(binding.Bindeds, v) + } + + if len(binding.Waittings) == 0 { + s.lock.Unlock() + return nil + } + + s.bindings = append(s.bindings, binding) + s.lock.Unlock() + + err := callback.Wait(ctx) + + s.lock.Lock() + defer s.lock.Unlock() + + s.bindings = lo2.Remove(s.bindings, binding) + + return err +} + +func (s *Executor) PutVars(vs ...Var) { + s.lock.Lock() + defer s.lock.Unlock() + +loop: + for _, v := range vs { + for ib, b := range s.bindings { + for iw, w := range b.Waittings { + if w.GetID() != v.GetID() { + continue + } + + if err := AssignVar(v, w); err != nil { + b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err)) + // 绑定类型不对,说明生成的执行计划有问题,怎么处理都可以,因为最终会执行失败 + continue loop + } + + b.Bindeds = append(b.Bindeds, w) + b.Waittings = lo2.RemoveAt(b.Waittings, iw) + if len(b.Waittings) == 0 { + b.Callback.SetVoid() + s.bindings = lo2.RemoveAt(s.bindings, ib) + } + + // 绑定成功,继续最外层循环 + continue loop + } + + } + + // 如果没有绑定,则直接放入变量表中 + s.vars[v.GetID()] = v + } +} + +func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error { + var vs2 []Var + for _, v := range vs { + vs2 = append(vs2, v) + } + + return sw.BindVars(ctx, vs2...) +} + +func PutArrayVars[T Var](sw *Executor, vs []T) { + var vs2 []Var + for _, v := range vs { + vs2 = append(vs2, v) + } + + sw.PutVars(vs2...) +} diff --git a/pkgs/ioswitch/exec/plan_builder.go b/pkgs/ioswitch/exec/plan_builder.go new file mode 100644 index 0000000..f12d27e --- /dev/null +++ b/pkgs/ioswitch/exec/plan_builder.go @@ -0,0 +1,126 @@ +package exec + +import ( + "context" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/future" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type PlanBuilder struct { + Vars []Var + WorkerPlans map[cdssdk.NodeID]*WorkerPlanBuilder + DriverPlan DriverPlanBuilder +} + +func NewPlanBuilder() *PlanBuilder { + bld := &PlanBuilder{ + WorkerPlans: make(map[cdssdk.NodeID]*WorkerPlanBuilder), + DriverPlan: DriverPlanBuilder{ + StoreMap: &sync.Map{}, + }, + } + + return bld +} + +func (b *PlanBuilder) AtExecutor() *DriverPlanBuilder { + return &b.DriverPlan +} + +func (b *PlanBuilder) AtAgent(node cdssdk.Node) *WorkerPlanBuilder { + agtPlan, ok := b.WorkerPlans[node.NodeID] + if !ok { + agtPlan = &WorkerPlanBuilder{ + Node: node, + } + b.WorkerPlans[node.NodeID] = agtPlan + } + + return agtPlan +} + +func (b *PlanBuilder) NewStreamVar() *StreamVar { + v := &StreamVar{ + ID: VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} + +func (b *PlanBuilder) NewIntVar() *IntVar { + v := &IntVar{ + ID: VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} + +func (b *PlanBuilder) NewStringVar() *StringVar { + v := &StringVar{ + ID: VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} +func (b *PlanBuilder) NewSignalVar() *SignalVar { + v := &SignalVar{ + ID: VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} + +func (b *PlanBuilder) Execute() *Driver { + ctx, cancel := context.WithCancel(context.Background()) + planID := genRandomPlanID() + + execPlan := Plan{ + ID: planID, + Ops: b.DriverPlan.Ops, + } + + exec := Driver{ + planID: planID, + planBlder: b, + callback: future.NewSetVoid(), + ctx: ctx, + cancel: cancel, + driverExec: NewExecutor(execPlan), + } + go exec.execute() + + return &exec +} + +type WorkerPlanBuilder struct { + Node cdssdk.Node + Ops []Op +} + +func (b *WorkerPlanBuilder) AddOp(op Op) { + b.Ops = append(b.Ops, op) +} + +func (b *WorkerPlanBuilder) RemoveOp(op Op) { + b.Ops = lo2.Remove(b.Ops, op) +} + +type DriverPlanBuilder struct { + Ops []Op + StoreMap *sync.Map +} + +func (b *DriverPlanBuilder) AddOp(op Op) { + b.Ops = append(b.Ops, op) +} + +func (b *DriverPlanBuilder) RemoveOp(op Op) { + b.Ops = lo2.Remove(b.Ops, op) +} diff --git a/pkgs/ioswitch/exec/utils.go b/pkgs/ioswitch/exec/utils.go new file mode 100644 index 0000000..4112669 --- /dev/null +++ b/pkgs/ioswitch/exec/utils.go @@ -0,0 +1,98 @@ +package exec + +import ( + "fmt" + "reflect" + + "github.com/google/uuid" + "gitlink.org.cn/cloudream/common/utils/math2" +) + +func genRandomPlanID() PlanID { + return PlanID(uuid.NewString()) +} + +func AssignVar(from Var, to Var) error { + if reflect.TypeOf(from) != reflect.TypeOf(to) { + return fmt.Errorf("cannot assign %T to %T", from, to) + } + + switch from := from.(type) { + case *StreamVar: + to.(*StreamVar).Stream = from.Stream + case *IntVar: + to.(*IntVar).Value = from.Value + case *StringVar: + to.(*StringVar).Value = from.Value + case *SignalVar: + } + + return nil +} + +type Range struct { + Offset int64 + Length *int64 +} + +func (r *Range) Extend(other Range) { + newOffset := math2.Min(r.Offset, other.Offset) + + if r.Length == nil { + r.Offset = newOffset + return + } + + if other.Length == nil { + r.Offset = newOffset + r.Length = nil + return + } + + otherEnd := other.Offset + *other.Length + rEnd := r.Offset + *r.Length + + newEnd := math2.Max(otherEnd, rEnd) + r.Offset = newOffset + *r.Length = newEnd - newOffset +} + +func (r *Range) ExtendStart(start int64) { + r.Offset = math2.Min(r.Offset, start) +} + +func (r *Range) ExtendEnd(end int64) { + if r.Length == nil { + return + } + + rEnd := r.Offset + *r.Length + newLen := math2.Max(end, rEnd) - r.Offset + r.Length = &newLen +} + +func (r *Range) Fix(maxLength int64) { + if r.Length != nil { + return + } + + len := maxLength - r.Offset + r.Length = &len +} + +func (r *Range) ToStartEnd(maxLen int64) (start int64, end int64) { + if r.Length == nil { + return r.Offset, maxLen + } + + end = r.Offset + *r.Length + return r.Offset, end +} + +func (r *Range) ClampLength(maxLen int64) { + if r.Length == nil { + return + } + + *r.Length = math2.Min(*r.Length, maxLen-r.Offset) +} diff --git a/pkgs/ioswitch/exec/var.go b/pkgs/ioswitch/exec/var.go new file mode 100644 index 0000000..62e76ad --- /dev/null +++ b/pkgs/ioswitch/exec/var.go @@ -0,0 +1,57 @@ +package exec + +import ( + "io" + + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type VarID int + +type Var interface { + GetID() VarID +} + +var VarUnion = types.NewTypeUnion[Var]( + (*IntVar)(nil), + (*StringVar)(nil), + (*SignalVar)(nil), + (*StreamVar)(nil), +) +var _ = serder.UseTypeUnionExternallyTagged(&VarUnion) + +type StreamVar struct { + ID VarID `json:"id"` + Stream io.ReadCloser `json:"-"` +} + +func (v *StreamVar) GetID() VarID { + return v.ID +} + +type IntVar struct { + ID VarID `json:"id"` + Value string `json:"value"` +} + +func (v *IntVar) GetID() VarID { + return v.ID +} + +type StringVar struct { + ID VarID `json:"id"` + Value string `json:"value"` +} + +func (v *StringVar) GetID() VarID { + return v.ID +} + +type SignalVar struct { + ID VarID `json:"id"` +} + +func (v *SignalVar) GetID() VarID { + return v.ID +} diff --git a/pkgs/ioswitch/exec/worker.go b/pkgs/ioswitch/exec/worker.go new file mode 100644 index 0000000..0a5d9d1 --- /dev/null +++ b/pkgs/ioswitch/exec/worker.go @@ -0,0 +1,85 @@ +package exec + +import ( + "context" + "sync" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/lo2" +) + +type finding struct { + PlanID PlanID + Callback *future.SetValueFuture[*Executor] +} + +type Worker struct { + lock sync.Mutex + executors map[PlanID]*Executor + findings []*finding +} + +func NewWorker() Worker { + return Worker{ + executors: make(map[PlanID]*Executor), + } +} + +func (s *Worker) Add(exe *Executor) { + s.lock.Lock() + defer s.lock.Unlock() + + s.executors[exe.Plan().ID] = exe + + s.findings = lo.Reject(s.findings, func(f *finding, idx int) bool { + if f.PlanID != exe.Plan().ID { + return false + } + + f.Callback.SetValue(exe) + return true + }) +} + +func (s *Worker) Remove(sw *Executor) { + s.lock.Lock() + defer s.lock.Unlock() + + delete(s.executors, sw.Plan().ID) +} + +func (s *Worker) FindByID(id PlanID) *Executor { + s.lock.Lock() + defer s.lock.Unlock() + + return s.executors[id] +} + +func (s *Worker) FindByIDContexted(ctx context.Context, id PlanID) *Executor { + s.lock.Lock() + + sw := s.executors[id] + if sw != nil { + s.lock.Unlock() + return sw + } + + cb := future.NewSetValue[*Executor]() + f := &finding{ + PlanID: id, + Callback: cb, + } + s.findings = append(s.findings, f) + + s.lock.Unlock() + + sw, _ = cb.WaitValue(ctx) + + s.lock.Lock() + defer s.lock.Unlock() + + s.findings = lo2.Remove(s.findings, f) + + return sw +} diff --git a/pkgs/ioswitch/parser/fromto.go b/pkgs/ioswitch/parser/fromto.go new file mode 100644 index 0000000..5eaa0c6 --- /dev/null +++ b/pkgs/ioswitch/parser/fromto.go @@ -0,0 +1,26 @@ +package parser + +type From interface{} + +type To interface{} + +type FromTos []FromTo + +type FromTo struct { + Froms []From + Toes []To +} + +func NewFromTo() FromTo { + return FromTo{} +} + +func (ft *FromTo) AddFrom(from From) *FromTo { + ft.Froms = append(ft.Froms, from) + return ft +} + +func (ft *FromTo) AddTo(to To) *FromTo { + ft.Toes = append(ft.Toes, to) + return ft +} diff --git a/pkgs/ioswitch/parser/parser.go b/pkgs/ioswitch/parser/parser.go new file mode 100644 index 0000000..9991c96 --- /dev/null +++ b/pkgs/ioswitch/parser/parser.go @@ -0,0 +1,9 @@ +package parser + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +type FromToParser interface { + Parse(ft FromTo, blder *exec.PlanBuilder) error +} From 2ca4783657129c99e9268fec630a3e40527c585d Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 20 Aug 2024 09:25:33 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=8B=86=E5=88=86ioswitch=E6=A8=A1?= =?UTF-8?q?=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 2 + pkgs/ioswitch/dag/graph.go | 24 ++-- pkgs/ioswitch/dag/node.go | 47 +++--- pkgs/ioswitch/dag/utils.go | 13 ++ pkgs/ioswitch/dag/var.go | 70 ++++----- pkgs/ioswitch/exec/driver.go | 27 ++-- pkgs/ioswitch/exec/exec.go | 12 +- pkgs/ioswitch/exec/executor.go | 22 ++- pkgs/ioswitch/exec/plan_builder.go | 37 +++-- pkgs/ioswitch/exec/worker.go | 18 +++ pkgs/ioswitch/parser/fromto.go | 26 ---- pkgs/ioswitch/parser/parser.go | 9 -- pkgs/ioswitch/plan/generate.go | 157 ++++++++++++++++++++ pkgs/ioswitch/plan/ops/driver.go | 43 ++++++ pkgs/ioswitch/plan/ops/drop.go | 52 +++++++ pkgs/ioswitch/plan/ops/send.go | 224 +++++++++++++++++++++++++++++ pkgs/ioswitch/plan/ops/store.go | 49 +++++++ pkgs/ioswitch/plan/ops/sync.go | 172 ++++++++++++++++++++++ pkgs/ioswitch/plan/ops/utils.go | 75 ++++++++++ pkgs/ioswitch/plan/ops/var.go | 20 +++ 21 files changed, 952 insertions(+), 148 deletions(-) create mode 100644 pkgs/ioswitch/dag/utils.go delete mode 100644 pkgs/ioswitch/parser/fromto.go delete mode 100644 pkgs/ioswitch/parser/parser.go create mode 100644 pkgs/ioswitch/plan/generate.go create mode 100644 pkgs/ioswitch/plan/ops/driver.go create mode 100644 pkgs/ioswitch/plan/ops/drop.go create mode 100644 pkgs/ioswitch/plan/ops/send.go create mode 100644 pkgs/ioswitch/plan/ops/store.go create mode 100644 pkgs/ioswitch/plan/ops/sync.go create mode 100644 pkgs/ioswitch/plan/ops/utils.go create mode 100644 pkgs/ioswitch/plan/ops/var.go diff --git a/go.mod b/go.mod index 8207adb..7d1acf5 100644 --- a/go.mod +++ b/go.mod @@ -61,6 +61,7 @@ require ( go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.6.0 // indirect golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 golang.org/x/sys v0.6.0 // indirect golang.org/x/text v0.8.0 // indirect google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect diff --git a/go.sum b/go.sum index 81f0644..68dc7ef 100644 --- a/go.sum +++ b/go.sum @@ -147,6 +147,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkgs/ioswitch/dag/graph.go b/pkgs/ioswitch/dag/graph.go index 811a6fe..995ebf2 100644 --- a/pkgs/ioswitch/dag/graph.go +++ b/pkgs/ioswitch/dag/graph.go @@ -4,18 +4,18 @@ import ( "gitlink.org.cn/cloudream/common/utils/lo2" ) -type Graph[NP any, VP any] struct { - Nodes []*Node[NP, VP] +type Graph struct { + Nodes []*Node isWalking bool nextVarID int } -func NewGraph[NP any, VP any]() *Graph[NP, VP] { - return &Graph[NP, VP]{} +func NewGraph() *Graph { + return &Graph{} } -func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] { - n := &Node[NP, VP]{ +func (g *Graph) NewNode(typ NodeType, props any) *Node { + n := &Node{ Type: typ, Props: props, Graph: g, @@ -25,7 +25,7 @@ func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] { return n } -func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) { +func (g *Graph) RemoveNode(node *Node) { for i, n := range g.Nodes { if n == node { if g.isWalking { @@ -38,7 +38,7 @@ func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) { } } -func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) { +func (g *Graph) Walk(cb func(node *Node) bool) { g.isWalking = true for i := 0; i < len(g.Nodes); i++ { if g.Nodes[i] == nil { @@ -54,17 +54,17 @@ func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) { g.Nodes = lo2.RemoveAllDefault(g.Nodes) } -func (g *Graph[NP, VP]) genVarID() int { +func (g *Graph) genVarID() int { g.nextVarID++ return g.nextVarID } -func NewNode[NP any, VP any, NT NodeType[NP, VP]](graph *Graph[NP, VP], typ NT, props NP) (*Node[NP, VP], NT) { +func NewNode[N NodeType](graph *Graph, typ N, props any) (*Node, N) { return graph.NewNode(typ, props), typ } -func WalkOnlyType[N NodeType[NP, VP], NP any, VP any](g *Graph[NP, VP], cb func(node *Node[NP, VP], typ N) bool) { - g.Walk(func(node *Node[NP, VP]) bool { +func WalkOnlyType[N NodeType](g *Graph, cb func(node *Node, typ N) bool) { + g.Walk(func(node *Node) bool { typ, ok := node.Type.(N) if ok { return cb(node, typ) diff --git a/pkgs/ioswitch/dag/node.go b/pkgs/ioswitch/dag/node.go index bf3bf24..6298b46 100644 --- a/pkgs/ioswitch/dag/node.go +++ b/pkgs/ioswitch/dag/node.go @@ -6,30 +6,23 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" ) -type NodeType[NP any, VP any] interface { - InitNode(node *Node[NP, VP]) - String(node *Node[NP, VP]) string - GenerateOp(node *Node[NP, VP], blder *exec.PlanBuilder) error -} - -type WorkerInfo interface { - // 获取连接到这个worker的GRPC服务的地址 - GetAddress() string - // 判断两个worker是否相同 - Equals(worker WorkerInfo) bool +type NodeType interface { + InitNode(node *Node) + String(node *Node) string + GenerateOp(node *Node) (exec.Op, error) } type NodeEnvType string const ( - EnvUnknown NodeEnvType = "" - EnvExecutor NodeEnvType = "Executor" - EnvWorker NodeEnvType = "Worker" + EnvUnknown NodeEnvType = "" + EnvDriver NodeEnvType = "Driver" + EnvWorker NodeEnvType = "Worker" ) type NodeEnv struct { Type NodeEnvType - Worker WorkerInfo + Worker exec.WorkerInfo } func (e *NodeEnv) ToEnvUnknown() { @@ -37,12 +30,12 @@ func (e *NodeEnv) ToEnvUnknown() { e.Worker = nil } -func (e *NodeEnv) ToEnvExecutor() { - e.Type = EnvExecutor +func (e *NodeEnv) ToEnvDriver() { + e.Type = EnvDriver e.Worker = nil } -func (e *NodeEnv) ToEnvWorker(worker WorkerInfo) { +func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) { e.Type = EnvWorker e.Worker = worker } @@ -59,17 +52,17 @@ func (e *NodeEnv) Equals(other NodeEnv) bool { return e.Worker.Equals(other.Worker) } -type Node[NP any, VP any] struct { - Type NodeType[NP, VP] +type Node struct { + Type NodeType Env NodeEnv - Props NP - InputStreams []*StreamVar[NP, VP] - OutputStreams []*StreamVar[NP, VP] - InputValues []*ValueVar[NP, VP] - OutputValues []*ValueVar[NP, VP] - Graph *Graph[NP, VP] + Props any + InputStreams []*StreamVar + OutputStreams []*StreamVar + InputValues []*ValueVar + OutputValues []*ValueVar + Graph *Graph } -func (n *Node[NP, VP]) String() string { +func (n *Node) String() string { return fmt.Sprintf("%v", n.Type.String(n)) } diff --git a/pkgs/ioswitch/dag/utils.go b/pkgs/ioswitch/dag/utils.go new file mode 100644 index 0000000..cdee0ae --- /dev/null +++ b/pkgs/ioswitch/dag/utils.go @@ -0,0 +1,13 @@ +package dag + +func NProps[T any](n *Node) T { + return n.Props.(T) +} + +func SProps[T any](str *StreamVar) T { + return str.Props.(T) +} + +func VProps[T any](v *ValueVar) T { + return v.Props.(T) +} diff --git a/pkgs/ioswitch/dag/var.go b/pkgs/ioswitch/dag/var.go index e1caa7b..48d890b 100644 --- a/pkgs/ioswitch/dag/var.go +++ b/pkgs/ioswitch/dag/var.go @@ -1,33 +1,37 @@ package dag -import "gitlink.org.cn/cloudream/common/utils/lo2" +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/utils/lo2" +) -type EndPoint[NP any, VP any] struct { - Node *Node[NP, VP] +type EndPoint struct { + Node *Node SlotIndex int // 所连接的Node的Output或Input数组的索引 } -type StreamVar[NP any, VP any] struct { +type StreamVar struct { ID int - From EndPoint[NP, VP] - Toes []EndPoint[NP, VP] - Props VP + From EndPoint + Toes []EndPoint + Props any + Var *exec.StreamVar } -func (v *StreamVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { - v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) +func (v *StreamVar) To(to *Node, slotIdx int) int { + v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx}) to.InputStreams[slotIdx] = v return len(v.Toes) - 1 } -// func (v *StreamVar[NP, VP]) NotTo(toIdx int) EndPoint[NP, VP] { +// func (v *StreamVar) NotTo(toIdx int) EndPoint { // ed := v.Toes[toIdx] // lo2.RemoveAt(v.Toes, toIdx) // ed.Node.InputStreams[ed.SlotIndex] = nil // return ed // } -func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) { +func (v *StreamVar) NotTo(node *Node) (EndPoint, bool) { for i, ed := range v.Toes { if ed.Node == node { v.Toes = lo2.RemoveAt(v.Toes, i) @@ -36,12 +40,12 @@ func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) { } } - return EndPoint[NP, VP]{}, false + return EndPoint{}, false } -func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []EndPoint[NP, VP] { - var newToes []EndPoint[NP, VP] - var rmed []EndPoint[NP, VP] +func (v *StreamVar) NotToWhere(pred func(to EndPoint) bool) []EndPoint { + var newToes []EndPoint + var rmed []EndPoint for _, ed := range v.Toes { if pred(ed) { ed.Node.InputStreams[ed.SlotIndex] = nil @@ -54,7 +58,7 @@ func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []En return rmed } -func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] { +func (v *StreamVar) NotToAll() []EndPoint { for _, ed := range v.Toes { ed.Node.InputStreams[ed.SlotIndex] = nil } @@ -63,18 +67,18 @@ func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] { return toes } -func NodeNewOutputStream[NP any, VP any](node *Node[NP, VP], props VP) *StreamVar[NP, VP] { - str := &StreamVar[NP, VP]{ +func NodeNewOutputStream(node *Node, props any) *StreamVar { + str := &StreamVar{ ID: node.Graph.genVarID(), - From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, + From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)}, Props: props, } node.OutputStreams = append(node.OutputStreams, str) return str } -func NodeDeclareInputStream[NP any, VP any](node *Node[NP, VP], cnt int) { - node.InputStreams = make([]*StreamVar[NP, VP], cnt) +func NodeDeclareInputStream(node *Node, cnt int) { + node.InputStreams = make([]*StreamVar, cnt) } type ValueVarType int @@ -84,29 +88,31 @@ const ( SignalValueVar ) -type ValueVar[NP any, VP any] struct { +type ValueVar struct { ID int - From EndPoint[NP, VP] - Toes []EndPoint[NP, VP] - Props VP + Type ValueVarType + From EndPoint + Toes []EndPoint + Props any + Var exec.Var } -func (v *ValueVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { - v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) +func (v *ValueVar) To(to *Node, slotIdx int) int { + v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx}) to.InputValues[slotIdx] = v return len(v.Toes) - 1 } -func NodeNewOutputValue[NP any, VP any](node *Node[NP, VP], props VP) *ValueVar[NP, VP] { - val := &ValueVar[NP, VP]{ +func NodeNewOutputValue(node *Node, props any) *ValueVar { + val := &ValueVar{ ID: node.Graph.genVarID(), - From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, + From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)}, Props: props, } node.OutputValues = append(node.OutputValues, val) return val } -func NodeDeclareInputValue[NP any, VP any](node *Node[NP, VP], cnt int) { - node.InputValues = make([]*ValueVar[NP, VP], cnt) +func NodeDeclareInputValue(node *Node, cnt int) { + node.InputValues = make([]*ValueVar, cnt) } diff --git a/pkgs/ioswitch/exec/driver.go b/pkgs/ioswitch/exec/driver.go index 26f8273..4b51480 100644 --- a/pkgs/ioswitch/exec/driver.go +++ b/pkgs/ioswitch/exec/driver.go @@ -8,13 +8,12 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/io2" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" ) type Driver struct { planID PlanID planBlder *PlanBuilder - callback *future.SetVoidFuture + callback *future.SetValueFuture[map[string]any] ctx context.Context cancel context.CancelFunc driverExec *Executor @@ -46,18 +45,12 @@ func (e *Driver) Signal(signal *DriverSignalVar) { } func (e *Driver) Wait(ctx context.Context) (map[string]any, error) { - err := e.callback.Wait(ctx) + stored, err := e.callback.WaitValue(ctx) if err != nil { return nil, err } - ret := make(map[string]any) - e.planBlder.DriverPlan.StoreMap.Range(func(k, v any) bool { - ret[k.(string)] = v - return true - }) - - return ret, nil + return stored, nil } func (e *Driver) execute() { @@ -74,22 +67,22 @@ func (e *Driver) execute() { Ops: p.Ops, } - cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&p.Node)) + cli, err := p.Worker.NewClient() if err != nil { - e.stopWith(fmt.Errorf("new agent rpc client of node %v: %w", p.Node.NodeID, err)) + e.stopWith(fmt.Errorf("new client to worker %v: %w", p.Worker, err)) return } - defer stgglb.AgentRPCPool.Release(cli) + defer cli.Close() - err = cli.ExecuteIOPlan(e.ctx, plan) + err = cli.ExecutePlan(e.ctx, plan) if err != nil { - e.stopWith(fmt.Errorf("execute plan at %v: %w", p.Node.NodeID, err)) + e.stopWith(fmt.Errorf("execute plan at worker %v: %w", p.Worker, err)) return } }(p) } - err := e.driverExec.Run(e.ctx) + stored, err := e.driverExec.Run(e.ctx) if err != nil { e.stopWith(fmt.Errorf("run executor switch: %w", err)) return @@ -97,7 +90,7 @@ func (e *Driver) execute() { wg.Wait() - e.callback.SetVoid() + e.callback.SetValue(stored) } func (e *Driver) stopWith(err error) { diff --git a/pkgs/ioswitch/exec/exec.go b/pkgs/ioswitch/exec/exec.go index d1c3558..ec16299 100644 --- a/pkgs/ioswitch/exec/exec.go +++ b/pkgs/ioswitch/exec/exec.go @@ -2,6 +2,10 @@ package exec import ( "context" + + "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/reflect2" + "gitlink.org.cn/cloudream/common/utils/serder" ) type PlanID string @@ -11,6 +15,12 @@ type Plan struct { Ops []Op `json:"ops"` } +var opUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Op]())) + type Op interface { - Execute(ctx context.Context, sw *Executor) error + Execute(ctx context.Context, e *Executor) error +} + +func UseOp[T Op]() { + opUnion.Add(reflect2.TypeOf[T]()) } diff --git a/pkgs/ioswitch/exec/executor.go b/pkgs/ioswitch/exec/executor.go index da4748d..dcd087e 100644 --- a/pkgs/ioswitch/exec/executor.go +++ b/pkgs/ioswitch/exec/executor.go @@ -21,12 +21,14 @@ type Executor struct { vars map[VarID]Var bindings []*bindingVars lock sync.Mutex + store map[string]any } func NewExecutor(plan Plan) *Executor { planning := Executor{ - plan: plan, - vars: make(map[VarID]Var), + plan: plan, + vars: make(map[VarID]Var), + store: make(map[string]any), } return &planning @@ -36,11 +38,11 @@ func (s *Executor) Plan() *Plan { return &s.plan } -func (s *Executor) Run(ctx context.Context) error { +func (s *Executor) Run(ctx context.Context) (map[string]any, error) { ctx2, cancel := context.WithCancel(ctx) defer cancel() - return sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error { + err := sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error { err := o.Execute(ctx2, s) s.lock.Lock() @@ -53,6 +55,11 @@ func (s *Executor) Run(ctx context.Context) error { return nil }) + if err != nil { + return nil, err + } + + return s.store, nil } func (s *Executor) BindVars(ctx context.Context, vs ...Var) error { @@ -132,6 +139,13 @@ loop: } } +func (s *Executor) Store(key string, val any) { + s.lock.Lock() + defer s.lock.Unlock() + + s.store[key] = val +} + func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error { var vs2 []Var for _, v := range vs { diff --git a/pkgs/ioswitch/exec/plan_builder.go b/pkgs/ioswitch/exec/plan_builder.go index f12d27e..24b0525 100644 --- a/pkgs/ioswitch/exec/plan_builder.go +++ b/pkgs/ioswitch/exec/plan_builder.go @@ -2,44 +2,42 @@ package exec import ( "context" - "sync" "gitlink.org.cn/cloudream/common/pkgs/future" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" ) type PlanBuilder struct { Vars []Var - WorkerPlans map[cdssdk.NodeID]*WorkerPlanBuilder + WorkerPlans []*WorkerPlanBuilder DriverPlan DriverPlanBuilder } func NewPlanBuilder() *PlanBuilder { bld := &PlanBuilder{ - WorkerPlans: make(map[cdssdk.NodeID]*WorkerPlanBuilder), - DriverPlan: DriverPlanBuilder{ - StoreMap: &sync.Map{}, - }, + DriverPlan: DriverPlanBuilder{}, } return bld } -func (b *PlanBuilder) AtExecutor() *DriverPlanBuilder { +func (b *PlanBuilder) AtDriver() *DriverPlanBuilder { return &b.DriverPlan } -func (b *PlanBuilder) AtAgent(node cdssdk.Node) *WorkerPlanBuilder { - agtPlan, ok := b.WorkerPlans[node.NodeID] - if !ok { - agtPlan = &WorkerPlanBuilder{ - Node: node, +func (b *PlanBuilder) AtWorker(worker WorkerInfo) *WorkerPlanBuilder { + for _, p := range b.WorkerPlans { + if p.Worker.Equals(worker) { + return p } - b.WorkerPlans[node.NodeID] = agtPlan } - return agtPlan + p := &WorkerPlanBuilder{ + Worker: worker, + } + b.WorkerPlans = append(b.WorkerPlans, p) + + return p } func (b *PlanBuilder) NewStreamVar() *StreamVar { @@ -89,7 +87,7 @@ func (b *PlanBuilder) Execute() *Driver { exec := Driver{ planID: planID, planBlder: b, - callback: future.NewSetVoid(), + callback: future.NewSetValue[map[string]any](), ctx: ctx, cancel: cancel, driverExec: NewExecutor(execPlan), @@ -100,8 +98,8 @@ func (b *PlanBuilder) Execute() *Driver { } type WorkerPlanBuilder struct { - Node cdssdk.Node - Ops []Op + Worker WorkerInfo + Ops []Op } func (b *WorkerPlanBuilder) AddOp(op Op) { @@ -113,8 +111,7 @@ func (b *WorkerPlanBuilder) RemoveOp(op Op) { } type DriverPlanBuilder struct { - Ops []Op - StoreMap *sync.Map + Ops []Op } func (b *DriverPlanBuilder) AddOp(op Op) { diff --git a/pkgs/ioswitch/exec/worker.go b/pkgs/ioswitch/exec/worker.go index 0a5d9d1..b9bf4f8 100644 --- a/pkgs/ioswitch/exec/worker.go +++ b/pkgs/ioswitch/exec/worker.go @@ -2,6 +2,7 @@ package exec import ( "context" + "io" "sync" "github.com/samber/lo" @@ -83,3 +84,20 @@ func (s *Worker) FindByIDContexted(ctx context.Context, id PlanID) *Executor { return sw } + +type WorkerInfo interface { + NewClient() (WorkerClient, error) + // 判断两个worker是否相同 + Equals(worker WorkerInfo) bool + // Worker信息,比如ID、地址等 + String() string +} + +type WorkerClient interface { + ExecutePlan(ctx context.Context, plan Plan) error + SendStream(ctx context.Context, planID PlanID, v *StreamVar, str io.ReadCloser) error + SendVar(ctx context.Context, planID PlanID, v Var) error + GetStream(ctx context.Context, planID PlanID, v *StreamVar, signal *SignalVar) (io.ReadCloser, error) + GetVar(ctx context.Context, planID PlanID, v Var, signal *SignalVar) error + Close() error +} diff --git a/pkgs/ioswitch/parser/fromto.go b/pkgs/ioswitch/parser/fromto.go deleted file mode 100644 index 5eaa0c6..0000000 --- a/pkgs/ioswitch/parser/fromto.go +++ /dev/null @@ -1,26 +0,0 @@ -package parser - -type From interface{} - -type To interface{} - -type FromTos []FromTo - -type FromTo struct { - Froms []From - Toes []To -} - -func NewFromTo() FromTo { - return FromTo{} -} - -func (ft *FromTo) AddFrom(from From) *FromTo { - ft.Froms = append(ft.Froms, from) - return ft -} - -func (ft *FromTo) AddTo(to To) *FromTo { - ft.Toes = append(ft.Toes, to) - return ft -} diff --git a/pkgs/ioswitch/parser/parser.go b/pkgs/ioswitch/parser/parser.go deleted file mode 100644 index 9991c96..0000000 --- a/pkgs/ioswitch/parser/parser.go +++ /dev/null @@ -1,9 +0,0 @@ -package parser - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" -) - -type FromToParser interface { - Parse(ft FromTo, blder *exec.PlanBuilder) error -} diff --git a/pkgs/ioswitch/plan/generate.go b/pkgs/ioswitch/plan/generate.go new file mode 100644 index 0000000..6a7362b --- /dev/null +++ b/pkgs/ioswitch/plan/generate.go @@ -0,0 +1,157 @@ +package plan + +import ( + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +func Generate(graph *dag.Graph, planBld *exec.PlanBuilder) error { + generateSend(graph) + return buildPlan(graph, planBld) +} + +// 生成Send指令 +func generateSend(graph *dag.Graph) { + graph.Walk(func(node *dag.Node) bool { + for _, out := range node.OutputStreams { + to := out.Toes[0] + if to.Node.Env.Equals(node.Env) { + continue + } + + switch to.Node.Env.Type { + case dag.EnvDriver: + // // 如果是要送到Driver,则只能由Driver主动去拉取 + getNode := graph.NewNode(&ops.GetStreamType{}, ioswitch.NodeProps{}) + getNode.Env.ToEnvDriver() + + // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdNode := graph.NewNode(&ops.HoldUntilType{}, ioswitch.NodeProps{}) + holdNode.Env = node.Env + + // 将Get指令的信号送到Hold指令 + getNode.OutputValues[0].To(holdNode, 0) + // 将Get指令的输出送到目的地 + getNode.OutputStreams[0].To(to.Node, to.SlotIndex) + out.Toes = nil + // 将源节点的输出送到Hold指令 + out.To(holdNode, 0) + // 将Hold指令的输出送到Get指令 + holdNode.OutputStreams[0].To(getNode, 0) + + case dag.EnvWorker: + // 如果是要送到Agent,则可以直接发送 + n := graph.NewNode(&ops.SendStreamType{}, ioswitch.NodeProps{}) + n.Env = node.Env + n.OutputStreams[0].To(to.Node, to.SlotIndex) + out.Toes = nil + out.To(n, 0) + } + } + + for _, out := range node.OutputValues { + to := out.Toes[0] + if to.Node.Env.Equals(node.Env) { + continue + } + + switch to.Node.Env.Type { + case dag.EnvDriver: + // // 如果是要送到Driver,则只能由Driver主动去拉取 + getNode := graph.NewNode(&ops.GetVaType{}, ioswitch.NodeProps{}) + getNode.Env.ToEnvDriver() + + // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdNode := graph.NewNode(&ops.HoldUntilType{}, ioswitch.NodeProps{}) + holdNode.Env = node.Env + + // 将Get指令的信号送到Hold指令 + getNode.OutputValues[0].To(holdNode, 0) + // 将Get指令的输出送到目的地 + getNode.OutputValues[1].To(to.Node, to.SlotIndex) + out.Toes = nil + // 将源节点的输出送到Hold指令 + out.To(holdNode, 0) + // 将Hold指令的输出送到Get指令 + holdNode.OutputValues[0].To(getNode, 0) + + case dag.EnvWorker: + // 如果是要送到Agent,则可以直接发送 + n := graph.NewNode(&ops.SendVarType{}, ioswitch.NodeProps{}) + n.Env = node.Env + n.OutputValues[0].To(to.Node, to.SlotIndex) + out.Toes = nil + out.To(n, 0) + } + } + + return true + }) +} + +// 生成Plan +func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { + var retErr error + graph.Walk(func(node *dag.Node) bool { + for _, out := range node.OutputStreams { + if out.Var != nil { + continue + } + + out.Var = blder.NewStreamVar() + } + + for _, in := range node.InputStreams { + if in.Var != nil { + continue + } + + in.Var = blder.NewStreamVar() + } + + for _, out := range node.OutputValues { + if out.Var != nil { + continue + } + + switch out.Type { + case dag.StringValueVar: + out.Var = blder.NewStringVar() + case dag.SignalValueVar: + out.Var = blder.NewSignalVar() + } + } + + for _, in := range node.InputValues { + if in.Var != nil { + continue + } + + switch in.Type { + case dag.StringValueVar: + in.Var = blder.NewStringVar() + case dag.SignalValueVar: + in.Var = blder.NewSignalVar() + } + } + + op, err := node.Type.GenerateOp(node) + if err != nil { + retErr = err + return false + } + + switch node.Env.Type { + case dag.EnvDriver: + blder.AtDriver().AddOp(op) + case dag.EnvWorker: + blder.AtWorker(node.Env.Worker).AddOp(op) + } + + return true + }) + + return retErr +} diff --git a/pkgs/ioswitch/plan/ops/driver.go b/pkgs/ioswitch/plan/ops/driver.go new file mode 100644 index 0000000..214f333 --- /dev/null +++ b/pkgs/ioswitch/plan/ops/driver.go @@ -0,0 +1,43 @@ +package ops + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +type FromDriverType struct { + Handle *exec.DriverWriteStream +} + +func (t *FromDriverType) InitNode(node *dag.Node) { + dag.NodeNewOutputStream(node, nil) +} + +func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { + t.Handle.Var = op.OutputStreams[0].Var + return nil, nil +} + +func (t *FromDriverType) String(node *dag.Node) string { + return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type ToDriverType struct { + Handle *exec.DriverReadStream + Range exec.Range +} + +func (t *ToDriverType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *ToDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { + t.Handle.Var = op.InputStreams[0].Var + return nil, nil +} + +func (t *ToDriverType) String(node *dag.Node) string { + return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +} diff --git a/pkgs/ioswitch/plan/ops/drop.go b/pkgs/ioswitch/plan/ops/drop.go new file mode 100644 index 0000000..22f2d5d --- /dev/null +++ b/pkgs/ioswitch/plan/ops/drop.go @@ -0,0 +1,52 @@ +package ops + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +func init() { + exec.UseOp[*DropStream]() +} + +type DropStream struct { + Input *exec.StreamVar `json:"input"` +} + +func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) + if err != nil { + return err + } + + for { + buf := make([]byte, 1024*8) + _, err = o.Input.Stream.Read(buf) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + } +} + +type DropType struct{} + +func (t *DropType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) +} + +func (t *DropType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &DropStream{ + Input: op.InputStreams[0].Var, + }, nil +} + +func (t *DropType) String(node *dag.Node) string { + return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) +} diff --git a/pkgs/ioswitch/plan/ops/send.go b/pkgs/ioswitch/plan/ops/send.go new file mode 100644 index 0000000..148938a --- /dev/null +++ b/pkgs/ioswitch/plan/ops/send.go @@ -0,0 +1,224 @@ +package ops + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/io2" +) + +func init() { + exec.UseOp[*SendStream]() + exec.UseOp[*GetStream]() + exec.UseOp[*SendVar]() + exec.UseOp[*GetVar]() +} + +type SendStream struct { + Input *exec.StreamVar `json:"input"` + Send *exec.StreamVar `json:"send"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + logger.Debugf("sending stream %v as %v to worker %v", o.Input.ID, o.Send.ID, o.Worker) + + // 发送后流的ID不同 + err = cli.SendStream(ctx, e.Plan().ID, o.Send, o.Input.Stream) + if err != nil { + return fmt.Errorf("sending stream: %w", err) + } + + return nil +} + +type GetStream struct { + Signal *exec.SignalVar `json:"signal"` + Target *exec.StreamVar `json:"target"` + Output *exec.StreamVar `json:"output"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error { + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + logger.Debugf("getting stream %v as %v from worker %v", o.Target.ID, o.Output.ID, o.Worker) + + str, err := cli.GetStream(ctx, e.Plan().ID, o.Target, o.Signal) + if err != nil { + return fmt.Errorf("getting stream: %w", err) + } + + fut := future.NewSetVoid() + // 获取后送到本地的流ID是不同的 + o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { + fut.SetVoid() + }) + e.PutVars(o.Output) + + return fut.Wait(ctx) +} + +type SendVar struct { + Input exec.Var `json:"input"` + Send exec.Var `json:"send"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Input) + if err != nil { + return err + } + + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + logger.Debugf("sending var %v as %v to worker %v", o.Input.GetID(), o.Send.GetID(), o.Worker) + + exec.AssignVar(o.Input, o.Send) + err = cli.SendVar(ctx, e.Plan().ID, o.Send) + if err != nil { + return fmt.Errorf("sending var: %w", err) + } + + return nil +} + +type GetVar struct { + Signal *exec.SignalVar `json:"signal"` + Target exec.Var `json:"target"` + Output exec.Var `json:"output"` + Worker exec.WorkerInfo `json:"worker"` +} + +func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error { + cli, err := o.Worker.NewClient() + if err != nil { + return fmt.Errorf("new worker %v client: %w", o.Worker, err) + } + defer cli.Close() + + logger.Debugf("getting var %v as %v from worker %v", o.Target.GetID(), o.Output.GetID(), o.Worker) + + err = cli.GetVar(ctx, e.Plan().ID, o.Target, o.Signal) + if err != nil { + return fmt.Errorf("getting var: %w", err) + } + exec.AssignVar(o.Target, o.Output) + e.PutVars(o.Output) + + return nil +} + +type SendStreamType struct { + ToWorker exec.WorkerInfo +} + +func (t *SendStreamType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputStream(node, nil) +} + +func (t *SendStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &SendStream{ + Input: op.InputStreams[0].Var, + Send: op.OutputStreams[0].Var, + Worker: t.ToWorker, + }, nil +} + +func (t *SendStreamType) String(node *dag.Node) string { + return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type SendVarType struct { + ToWorker exec.WorkerInfo +} + +func (t *SendVarType) InitNode(node *dag.Node) { + dag.NodeDeclareInputValue(node, 1) + dag.NodeNewOutputValue(node, nil) +} + +func (t *SendVarType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &SendVar{ + Input: op.InputValues[0].Var, + Send: op.OutputValues[0].Var, + Worker: t.ToWorker, + }, nil +} + +func (t *SendVarType) String(node *dag.Node) string { + return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type GetStreamType struct { + FromWorker exec.WorkerInfo +} + +func (t *GetStreamType) InitNode(node *dag.Node) { + dag.NodeDeclareInputStream(node, 1) + dag.NodeNewOutputValue(node, nil) + dag.NodeNewOutputStream(node, nil) +} + +func (t *GetStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &GetStream{ + Signal: op.OutputValues[0].Var.(*exec.SignalVar), + Output: op.OutputStreams[0].Var, + Target: op.InputStreams[0].Var, + Worker: t.FromWorker, + }, nil +} + +func (t *GetStreamType) String(node *dag.Node) string { + return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +} + +type GetVaType struct { + FromWorker exec.WorkerInfo +} + +func (t *GetVaType) InitNode(node *dag.Node) { + dag.NodeDeclareInputValue(node, 1) + dag.NodeNewOutputValue(node, nil) + dag.NodeNewOutputValue(node, nil) +} + +func (t *GetVaType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &GetVar{ + Signal: op.OutputValues[0].Var.(*exec.SignalVar), + Output: op.OutputValues[1].Var, + Target: op.InputValues[0].Var, + Worker: t.FromWorker, + }, nil +} + +func (t *GetVaType) String(node *dag.Node) string { + return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +} diff --git a/pkgs/ioswitch/plan/ops/store.go b/pkgs/ioswitch/plan/ops/store.go new file mode 100644 index 0000000..08886bd --- /dev/null +++ b/pkgs/ioswitch/plan/ops/store.go @@ -0,0 +1,49 @@ +package ops + +import ( + "context" + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +type Store struct { + Var exec.Var + Key string +} + +func (o *Store) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Var) + if err != nil { + return err + } + + switch v := o.Var.(type) { + case *exec.IntVar: + e.Store(o.Key, v.Value) + case *exec.StringVar: + e.Store(o.Key, v.Value) + } + + return nil +} + +type StoreType struct { + StoreKey string +} + +func (t *StoreType) InitNode(node *dag.Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *StoreType) GenerateOp(op *dag.Node) (exec.Op, error) { + return &Store{ + Var: op.InputValues[0].Var, + Key: t.StoreKey, + }, nil +} + +func (t *StoreType) String(node *dag.Node) string { + return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) +} diff --git a/pkgs/ioswitch/plan/ops/sync.go b/pkgs/ioswitch/plan/ops/sync.go new file mode 100644 index 0000000..26a98ab --- /dev/null +++ b/pkgs/ioswitch/plan/ops/sync.go @@ -0,0 +1,172 @@ +package ops + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +func init() { + exec.UseOp[*OnStreamBegin]() + exec.UseOp[*OnStreamEnd]() + exec.UseOp[*HoldUntil]() + exec.UseOp[*HangUntil]() + exec.UseOp[*Broadcast]() +} + +type OnStreamBegin struct { + Raw *exec.StreamVar `json:"raw"` + New *exec.StreamVar `json:"new"` + Signal *exec.SignalVar `json:"signal"` +} + +func (o *OnStreamBegin) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Raw) + if err != nil { + return err + } + + o.New.Stream = o.Raw.Stream + + e.PutVars(o.New, o.Signal) + return nil +} + +type OnStreamEnd struct { + Raw *exec.StreamVar `json:"raw"` + New *exec.StreamVar `json:"new"` + Signal *exec.SignalVar `json:"signal"` +} + +type onStreamEnd struct { + inner io.ReadCloser + callback *future.SetVoidFuture +} + +func (o *onStreamEnd) Read(p []byte) (n int, err error) { + n, err = o.inner.Read(p) + if err == io.EOF { + o.callback.SetVoid() + } else if err != nil { + o.callback.SetError(err) + } + return n, err +} + +func (o *onStreamEnd) Close() error { + o.callback.SetError(fmt.Errorf("stream closed early")) + return o.inner.Close() +} + +func (o *OnStreamEnd) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, o.Raw) + if err != nil { + return err + } + + cb := future.NewSetVoid() + + o.New.Stream = &onStreamEnd{ + inner: o.Raw.Stream, + callback: cb, + } + e.PutVars(o.New) + + err = cb.Wait(ctx) + if err != nil { + return err + } + + e.PutVars(o.Signal) + return nil +} + +type HoldUntil struct { + Waits []*exec.SignalVar `json:"waits"` + Holds []exec.Var `json:"holds"` + Emits []exec.Var `json:"emits"` +} + +func (w *HoldUntil) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, w.Holds...) + if err != nil { + return err + } + + err = exec.BindArrayVars(e, ctx, w.Waits) + if err != nil { + return err + } + + for i := 0; i < len(w.Holds); i++ { + err := exec.AssignVar(w.Holds[i], w.Emits[i]) + if err != nil { + return err + } + } + + e.PutVars(w.Emits...) + return nil +} + +type HangUntil struct { + Waits []*exec.SignalVar `json:"waits"` + Op exec.Op `json:"op"` +} + +func (h *HangUntil) Execute(ctx context.Context, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx, h.Waits) + if err != nil { + return err + } + + return h.Op.Execute(ctx, e) +} + +type Broadcast struct { + Source *exec.SignalVar `json:"source"` + Targets []*exec.SignalVar `json:"targets"` +} + +func (b *Broadcast) Execute(ctx context.Context, e *exec.Executor) error { + err := e.BindVars(ctx, b.Source) + if err != nil { + return err + } + + exec.PutArrayVars(e, b.Targets) + return nil +} + +type HoldUntilType struct { +} + +func (t *HoldUntilType) InitNode(node *dag.Node) { + dag.NodeDeclareInputValue(node, 1) +} + +func (t *HoldUntilType) GenerateOp(op *dag.Node) (exec.Op, error) { + o := &HoldUntil{ + Waits: []*exec.SignalVar{op.InputValues[0].Var.(*exec.SignalVar)}, + } + + for i := 0; i < len(op.OutputValues); i++ { + o.Holds = append(o.Holds, op.InputValues[i+1].Var) + o.Emits = append(o.Emits, op.OutputValues[i].Var) + } + + for i := 0; i < len(op.OutputStreams); i++ { + o.Holds = append(o.Holds, op.InputStreams[i].Var) + o.Emits = append(o.Emits, op.OutputStreams[i].Var) + } + + return o, nil +} + +func (t *HoldUntilType) String(node *dag.Node) string { + return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) +} diff --git a/pkgs/ioswitch/plan/ops/utils.go b/pkgs/ioswitch/plan/ops/utils.go new file mode 100644 index 0000000..0a32ac9 --- /dev/null +++ b/pkgs/ioswitch/plan/ops/utils.go @@ -0,0 +1,75 @@ +package ops + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" +) + +func formatStreamIO(node *dag.Node) string { + is := "" + for i, in := range node.InputStreams { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputStreams { + if i > 0 { + os += "," + } + + if out == nil { + os += "." + } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("S{%s>%s}", is, os) +} + +func formatValueIO(node *dag.Node) string { + is := "" + for i, in := range node.InputValues { + if i > 0 { + is += "," + } + + if in == nil { + is += "." + } else { + is += fmt.Sprintf("%v", in.ID) + } + } + + os := "" + for i, out := range node.OutputValues { + if i > 0 { + os += "," + } + + if out == nil { + os += "." + } else { + os += fmt.Sprintf("%v", out.ID) + } + } + + if is == "" && os == "" { + return "" + } + + return fmt.Sprintf("V{%s>%s}", is, os) +} diff --git a/pkgs/ioswitch/plan/ops/var.go b/pkgs/ioswitch/plan/ops/var.go new file mode 100644 index 0000000..d2a7fe6 --- /dev/null +++ b/pkgs/ioswitch/plan/ops/var.go @@ -0,0 +1,20 @@ +package ops + +import ( + "context" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" +) + +func init() { + exec.UseOp[*ConstVar]() +} + +type ConstVar struct { + Var *exec.StringVar `json:"var"` +} + +func (o *ConstVar) Execute(ctx context.Context, e *exec.Executor) error { + e.PutVars(o.Var) + return nil +}