| @@ -61,6 +61,7 @@ require ( | |||||
| go.uber.org/zap v1.24.0 // indirect | go.uber.org/zap v1.24.0 // indirect | ||||
| golang.org/x/crypto v0.6.0 // indirect | golang.org/x/crypto v0.6.0 // indirect | ||||
| golang.org/x/net v0.8.0 // indirect | golang.org/x/net v0.8.0 // indirect | ||||
| golang.org/x/sync v0.1.0 | |||||
| golang.org/x/sys v0.6.0 // indirect | golang.org/x/sys v0.6.0 // indirect | ||||
| golang.org/x/text v0.8.0 // indirect | golang.org/x/text v0.8.0 // indirect | ||||
| google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect | google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect | ||||
| @@ -147,6 +147,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= | |||||
| golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | ||||
| golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | ||||
| golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | ||||
| golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= | |||||
| golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= | |||||
| golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= | golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= | ||||
| golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||||
| golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= | ||||
| @@ -4,18 +4,18 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/utils/lo2" | "gitlink.org.cn/cloudream/common/utils/lo2" | ||||
| ) | ) | ||||
| type Graph[NP any, VP any] struct { | |||||
| Nodes []*Node[NP, VP] | |||||
| type Graph struct { | |||||
| Nodes []*Node | |||||
| isWalking bool | isWalking bool | ||||
| nextVarID int | nextVarID int | ||||
| } | } | ||||
| func NewGraph[NP any, VP any]() *Graph[NP, VP] { | |||||
| return &Graph[NP, VP]{} | |||||
| func NewGraph() *Graph { | |||||
| return &Graph{} | |||||
| } | } | ||||
| func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] { | |||||
| n := &Node[NP, VP]{ | |||||
| func (g *Graph) NewNode(typ NodeType, props any) *Node { | |||||
| n := &Node{ | |||||
| Type: typ, | Type: typ, | ||||
| Props: props, | Props: props, | ||||
| Graph: g, | Graph: g, | ||||
| @@ -25,7 +25,7 @@ func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] { | |||||
| return n | return n | ||||
| } | } | ||||
| func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) { | |||||
| func (g *Graph) RemoveNode(node *Node) { | |||||
| for i, n := range g.Nodes { | for i, n := range g.Nodes { | ||||
| if n == node { | if n == node { | ||||
| if g.isWalking { | if g.isWalking { | ||||
| @@ -38,7 +38,7 @@ func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) { | |||||
| } | } | ||||
| } | } | ||||
| func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) { | |||||
| func (g *Graph) Walk(cb func(node *Node) bool) { | |||||
| g.isWalking = true | g.isWalking = true | ||||
| for i := 0; i < len(g.Nodes); i++ { | for i := 0; i < len(g.Nodes); i++ { | ||||
| if g.Nodes[i] == nil { | if g.Nodes[i] == nil { | ||||
| @@ -54,17 +54,17 @@ func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) { | |||||
| g.Nodes = lo2.RemoveAllDefault(g.Nodes) | g.Nodes = lo2.RemoveAllDefault(g.Nodes) | ||||
| } | } | ||||
| func (g *Graph[NP, VP]) genVarID() int { | |||||
| func (g *Graph) genVarID() int { | |||||
| g.nextVarID++ | g.nextVarID++ | ||||
| return g.nextVarID | return g.nextVarID | ||||
| } | } | ||||
| func NewNode[NP any, VP any, NT NodeType[NP, VP]](graph *Graph[NP, VP], typ NT, props NP) (*Node[NP, VP], NT) { | |||||
| func NewNode[N NodeType](graph *Graph, typ N, props any) (*Node, N) { | |||||
| return graph.NewNode(typ, props), typ | return graph.NewNode(typ, props), typ | ||||
| } | } | ||||
| func WalkOnlyType[N NodeType[NP, VP], NP any, VP any](g *Graph[NP, VP], cb func(node *Node[NP, VP], typ N) bool) { | |||||
| g.Walk(func(node *Node[NP, VP]) bool { | |||||
| func WalkOnlyType[N NodeType](g *Graph, cb func(node *Node, typ N) bool) { | |||||
| g.Walk(func(node *Node) bool { | |||||
| typ, ok := node.Type.(N) | typ, ok := node.Type.(N) | ||||
| if ok { | if ok { | ||||
| return cb(node, typ) | return cb(node, typ) | ||||
| @@ -6,30 +6,23 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | ||||
| ) | ) | ||||
| type NodeType[NP any, VP any] interface { | |||||
| InitNode(node *Node[NP, VP]) | |||||
| String(node *Node[NP, VP]) string | |||||
| GenerateOp(node *Node[NP, VP], blder *exec.PlanBuilder) error | |||||
| } | |||||
| type WorkerInfo interface { | |||||
| // 获取连接到这个worker的GRPC服务的地址 | |||||
| GetAddress() string | |||||
| // 判断两个worker是否相同 | |||||
| Equals(worker WorkerInfo) bool | |||||
| type NodeType interface { | |||||
| InitNode(node *Node) | |||||
| String(node *Node) string | |||||
| GenerateOp(node *Node) (exec.Op, error) | |||||
| } | } | ||||
| type NodeEnvType string | type NodeEnvType string | ||||
| const ( | const ( | ||||
| EnvUnknown NodeEnvType = "" | |||||
| EnvExecutor NodeEnvType = "Executor" | |||||
| EnvWorker NodeEnvType = "Worker" | |||||
| EnvUnknown NodeEnvType = "" | |||||
| EnvDriver NodeEnvType = "Driver" | |||||
| EnvWorker NodeEnvType = "Worker" | |||||
| ) | ) | ||||
| type NodeEnv struct { | type NodeEnv struct { | ||||
| Type NodeEnvType | Type NodeEnvType | ||||
| Worker WorkerInfo | |||||
| Worker exec.WorkerInfo | |||||
| } | } | ||||
| func (e *NodeEnv) ToEnvUnknown() { | func (e *NodeEnv) ToEnvUnknown() { | ||||
| @@ -37,12 +30,12 @@ func (e *NodeEnv) ToEnvUnknown() { | |||||
| e.Worker = nil | e.Worker = nil | ||||
| } | } | ||||
| func (e *NodeEnv) ToEnvExecutor() { | |||||
| e.Type = EnvExecutor | |||||
| func (e *NodeEnv) ToEnvDriver() { | |||||
| e.Type = EnvDriver | |||||
| e.Worker = nil | e.Worker = nil | ||||
| } | } | ||||
| func (e *NodeEnv) ToEnvWorker(worker WorkerInfo) { | |||||
| func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) { | |||||
| e.Type = EnvWorker | e.Type = EnvWorker | ||||
| e.Worker = worker | e.Worker = worker | ||||
| } | } | ||||
| @@ -59,17 +52,17 @@ func (e *NodeEnv) Equals(other NodeEnv) bool { | |||||
| return e.Worker.Equals(other.Worker) | return e.Worker.Equals(other.Worker) | ||||
| } | } | ||||
| type Node[NP any, VP any] struct { | |||||
| Type NodeType[NP, VP] | |||||
| type Node struct { | |||||
| Type NodeType | |||||
| Env NodeEnv | Env NodeEnv | ||||
| Props NP | |||||
| InputStreams []*StreamVar[NP, VP] | |||||
| OutputStreams []*StreamVar[NP, VP] | |||||
| InputValues []*ValueVar[NP, VP] | |||||
| OutputValues []*ValueVar[NP, VP] | |||||
| Graph *Graph[NP, VP] | |||||
| Props any | |||||
| InputStreams []*StreamVar | |||||
| OutputStreams []*StreamVar | |||||
| InputValues []*ValueVar | |||||
| OutputValues []*ValueVar | |||||
| Graph *Graph | |||||
| } | } | ||||
| func (n *Node[NP, VP]) String() string { | |||||
| func (n *Node) String() string { | |||||
| return fmt.Sprintf("%v", n.Type.String(n)) | return fmt.Sprintf("%v", n.Type.String(n)) | ||||
| } | } | ||||
| @@ -0,0 +1,13 @@ | |||||
| package dag | |||||
| func NProps[T any](n *Node) T { | |||||
| return n.Props.(T) | |||||
| } | |||||
| func SProps[T any](str *StreamVar) T { | |||||
| return str.Props.(T) | |||||
| } | |||||
| func VProps[T any](v *ValueVar) T { | |||||
| return v.Props.(T) | |||||
| } | |||||
| @@ -1,33 +1,37 @@ | |||||
| package dag | package dag | ||||
| import "gitlink.org.cn/cloudream/common/utils/lo2" | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| "gitlink.org.cn/cloudream/common/utils/lo2" | |||||
| ) | |||||
| type EndPoint[NP any, VP any] struct { | |||||
| Node *Node[NP, VP] | |||||
| type EndPoint struct { | |||||
| Node *Node | |||||
| SlotIndex int // 所连接的Node的Output或Input数组的索引 | SlotIndex int // 所连接的Node的Output或Input数组的索引 | ||||
| } | } | ||||
| type StreamVar[NP any, VP any] struct { | |||||
| type StreamVar struct { | |||||
| ID int | ID int | ||||
| From EndPoint[NP, VP] | |||||
| Toes []EndPoint[NP, VP] | |||||
| Props VP | |||||
| From EndPoint | |||||
| Toes []EndPoint | |||||
| Props any | |||||
| Var *exec.StreamVar | |||||
| } | } | ||||
| func (v *StreamVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { | |||||
| v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) | |||||
| func (v *StreamVar) To(to *Node, slotIdx int) int { | |||||
| v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx}) | |||||
| to.InputStreams[slotIdx] = v | to.InputStreams[slotIdx] = v | ||||
| return len(v.Toes) - 1 | return len(v.Toes) - 1 | ||||
| } | } | ||||
| // func (v *StreamVar[NP, VP]) NotTo(toIdx int) EndPoint[NP, VP] { | |||||
| // func (v *StreamVar) NotTo(toIdx int) EndPoint { | |||||
| // ed := v.Toes[toIdx] | // ed := v.Toes[toIdx] | ||||
| // lo2.RemoveAt(v.Toes, toIdx) | // lo2.RemoveAt(v.Toes, toIdx) | ||||
| // ed.Node.InputStreams[ed.SlotIndex] = nil | // ed.Node.InputStreams[ed.SlotIndex] = nil | ||||
| // return ed | // return ed | ||||
| // } | // } | ||||
| func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) { | |||||
| func (v *StreamVar) NotTo(node *Node) (EndPoint, bool) { | |||||
| for i, ed := range v.Toes { | for i, ed := range v.Toes { | ||||
| if ed.Node == node { | if ed.Node == node { | ||||
| v.Toes = lo2.RemoveAt(v.Toes, i) | v.Toes = lo2.RemoveAt(v.Toes, i) | ||||
| @@ -36,12 +40,12 @@ func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) { | |||||
| } | } | ||||
| } | } | ||||
| return EndPoint[NP, VP]{}, false | |||||
| return EndPoint{}, false | |||||
| } | } | ||||
| func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []EndPoint[NP, VP] { | |||||
| var newToes []EndPoint[NP, VP] | |||||
| var rmed []EndPoint[NP, VP] | |||||
| func (v *StreamVar) NotToWhere(pred func(to EndPoint) bool) []EndPoint { | |||||
| var newToes []EndPoint | |||||
| var rmed []EndPoint | |||||
| for _, ed := range v.Toes { | for _, ed := range v.Toes { | ||||
| if pred(ed) { | if pred(ed) { | ||||
| ed.Node.InputStreams[ed.SlotIndex] = nil | ed.Node.InputStreams[ed.SlotIndex] = nil | ||||
| @@ -54,7 +58,7 @@ func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []En | |||||
| return rmed | return rmed | ||||
| } | } | ||||
| func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] { | |||||
| func (v *StreamVar) NotToAll() []EndPoint { | |||||
| for _, ed := range v.Toes { | for _, ed := range v.Toes { | ||||
| ed.Node.InputStreams[ed.SlotIndex] = nil | ed.Node.InputStreams[ed.SlotIndex] = nil | ||||
| } | } | ||||
| @@ -63,18 +67,18 @@ func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] { | |||||
| return toes | return toes | ||||
| } | } | ||||
| func NodeNewOutputStream[NP any, VP any](node *Node[NP, VP], props VP) *StreamVar[NP, VP] { | |||||
| str := &StreamVar[NP, VP]{ | |||||
| func NodeNewOutputStream(node *Node, props any) *StreamVar { | |||||
| str := &StreamVar{ | |||||
| ID: node.Graph.genVarID(), | ID: node.Graph.genVarID(), | ||||
| From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, | |||||
| From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)}, | |||||
| Props: props, | Props: props, | ||||
| } | } | ||||
| node.OutputStreams = append(node.OutputStreams, str) | node.OutputStreams = append(node.OutputStreams, str) | ||||
| return str | return str | ||||
| } | } | ||||
| func NodeDeclareInputStream[NP any, VP any](node *Node[NP, VP], cnt int) { | |||||
| node.InputStreams = make([]*StreamVar[NP, VP], cnt) | |||||
| func NodeDeclareInputStream(node *Node, cnt int) { | |||||
| node.InputStreams = make([]*StreamVar, cnt) | |||||
| } | } | ||||
| type ValueVarType int | type ValueVarType int | ||||
| @@ -84,29 +88,31 @@ const ( | |||||
| SignalValueVar | SignalValueVar | ||||
| ) | ) | ||||
| type ValueVar[NP any, VP any] struct { | |||||
| type ValueVar struct { | |||||
| ID int | ID int | ||||
| From EndPoint[NP, VP] | |||||
| Toes []EndPoint[NP, VP] | |||||
| Props VP | |||||
| Type ValueVarType | |||||
| From EndPoint | |||||
| Toes []EndPoint | |||||
| Props any | |||||
| Var exec.Var | |||||
| } | } | ||||
| func (v *ValueVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int { | |||||
| v.Toes = append(v.Toes, EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}) | |||||
| func (v *ValueVar) To(to *Node, slotIdx int) int { | |||||
| v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx}) | |||||
| to.InputValues[slotIdx] = v | to.InputValues[slotIdx] = v | ||||
| return len(v.Toes) - 1 | return len(v.Toes) - 1 | ||||
| } | } | ||||
| func NodeNewOutputValue[NP any, VP any](node *Node[NP, VP], props VP) *ValueVar[NP, VP] { | |||||
| val := &ValueVar[NP, VP]{ | |||||
| func NodeNewOutputValue(node *Node, props any) *ValueVar { | |||||
| val := &ValueVar{ | |||||
| ID: node.Graph.genVarID(), | ID: node.Graph.genVarID(), | ||||
| From: EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputStreams)}, | |||||
| From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)}, | |||||
| Props: props, | Props: props, | ||||
| } | } | ||||
| node.OutputValues = append(node.OutputValues, val) | node.OutputValues = append(node.OutputValues, val) | ||||
| return val | return val | ||||
| } | } | ||||
| func NodeDeclareInputValue[NP any, VP any](node *Node[NP, VP], cnt int) { | |||||
| node.InputValues = make([]*ValueVar[NP, VP], cnt) | |||||
| func NodeDeclareInputValue(node *Node, cnt int) { | |||||
| node.InputValues = make([]*ValueVar, cnt) | |||||
| } | } | ||||
| @@ -8,13 +8,12 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/future" | "gitlink.org.cn/cloudream/common/pkgs/future" | ||||
| "gitlink.org.cn/cloudream/common/utils/io2" | "gitlink.org.cn/cloudream/common/utils/io2" | ||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| ) | ) | ||||
| type Driver struct { | type Driver struct { | ||||
| planID PlanID | planID PlanID | ||||
| planBlder *PlanBuilder | planBlder *PlanBuilder | ||||
| callback *future.SetVoidFuture | |||||
| callback *future.SetValueFuture[map[string]any] | |||||
| ctx context.Context | ctx context.Context | ||||
| cancel context.CancelFunc | cancel context.CancelFunc | ||||
| driverExec *Executor | driverExec *Executor | ||||
| @@ -46,18 +45,12 @@ func (e *Driver) Signal(signal *DriverSignalVar) { | |||||
| } | } | ||||
| func (e *Driver) Wait(ctx context.Context) (map[string]any, error) { | func (e *Driver) Wait(ctx context.Context) (map[string]any, error) { | ||||
| err := e.callback.Wait(ctx) | |||||
| stored, err := e.callback.WaitValue(ctx) | |||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| ret := make(map[string]any) | |||||
| e.planBlder.DriverPlan.StoreMap.Range(func(k, v any) bool { | |||||
| ret[k.(string)] = v | |||||
| return true | |||||
| }) | |||||
| return ret, nil | |||||
| return stored, nil | |||||
| } | } | ||||
| func (e *Driver) execute() { | func (e *Driver) execute() { | ||||
| @@ -74,22 +67,22 @@ func (e *Driver) execute() { | |||||
| Ops: p.Ops, | Ops: p.Ops, | ||||
| } | } | ||||
| cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&p.Node)) | |||||
| cli, err := p.Worker.NewClient() | |||||
| if err != nil { | if err != nil { | ||||
| e.stopWith(fmt.Errorf("new agent rpc client of node %v: %w", p.Node.NodeID, err)) | |||||
| e.stopWith(fmt.Errorf("new client to worker %v: %w", p.Worker, err)) | |||||
| return | return | ||||
| } | } | ||||
| defer stgglb.AgentRPCPool.Release(cli) | |||||
| defer cli.Close() | |||||
| err = cli.ExecuteIOPlan(e.ctx, plan) | |||||
| err = cli.ExecutePlan(e.ctx, plan) | |||||
| if err != nil { | if err != nil { | ||||
| e.stopWith(fmt.Errorf("execute plan at %v: %w", p.Node.NodeID, err)) | |||||
| e.stopWith(fmt.Errorf("execute plan at worker %v: %w", p.Worker, err)) | |||||
| return | return | ||||
| } | } | ||||
| }(p) | }(p) | ||||
| } | } | ||||
| err := e.driverExec.Run(e.ctx) | |||||
| stored, err := e.driverExec.Run(e.ctx) | |||||
| if err != nil { | if err != nil { | ||||
| e.stopWith(fmt.Errorf("run executor switch: %w", err)) | e.stopWith(fmt.Errorf("run executor switch: %w", err)) | ||||
| return | return | ||||
| @@ -97,7 +90,7 @@ func (e *Driver) execute() { | |||||
| wg.Wait() | wg.Wait() | ||||
| e.callback.SetVoid() | |||||
| e.callback.SetValue(stored) | |||||
| } | } | ||||
| func (e *Driver) stopWith(err error) { | func (e *Driver) stopWith(err error) { | ||||
| @@ -2,6 +2,10 @@ package exec | |||||
| import ( | import ( | ||||
| "context" | "context" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/types" | |||||
| "gitlink.org.cn/cloudream/common/utils/reflect2" | |||||
| "gitlink.org.cn/cloudream/common/utils/serder" | |||||
| ) | ) | ||||
| type PlanID string | type PlanID string | ||||
| @@ -11,6 +15,12 @@ type Plan struct { | |||||
| Ops []Op `json:"ops"` | Ops []Op `json:"ops"` | ||||
| } | } | ||||
| var opUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Op]())) | |||||
| type Op interface { | type Op interface { | ||||
| Execute(ctx context.Context, sw *Executor) error | |||||
| Execute(ctx context.Context, e *Executor) error | |||||
| } | |||||
| func UseOp[T Op]() { | |||||
| opUnion.Add(reflect2.TypeOf[T]()) | |||||
| } | } | ||||
| @@ -21,12 +21,14 @@ type Executor struct { | |||||
| vars map[VarID]Var | vars map[VarID]Var | ||||
| bindings []*bindingVars | bindings []*bindingVars | ||||
| lock sync.Mutex | lock sync.Mutex | ||||
| store map[string]any | |||||
| } | } | ||||
| func NewExecutor(plan Plan) *Executor { | func NewExecutor(plan Plan) *Executor { | ||||
| planning := Executor{ | planning := Executor{ | ||||
| plan: plan, | |||||
| vars: make(map[VarID]Var), | |||||
| plan: plan, | |||||
| vars: make(map[VarID]Var), | |||||
| store: make(map[string]any), | |||||
| } | } | ||||
| return &planning | return &planning | ||||
| @@ -36,11 +38,11 @@ func (s *Executor) Plan() *Plan { | |||||
| return &s.plan | return &s.plan | ||||
| } | } | ||||
| func (s *Executor) Run(ctx context.Context) error { | |||||
| func (s *Executor) Run(ctx context.Context) (map[string]any, error) { | |||||
| ctx2, cancel := context.WithCancel(ctx) | ctx2, cancel := context.WithCancel(ctx) | ||||
| defer cancel() | defer cancel() | ||||
| return sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error { | |||||
| err := sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error { | |||||
| err := o.Execute(ctx2, s) | err := o.Execute(ctx2, s) | ||||
| s.lock.Lock() | s.lock.Lock() | ||||
| @@ -53,6 +55,11 @@ func (s *Executor) Run(ctx context.Context) error { | |||||
| return nil | return nil | ||||
| }) | }) | ||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return s.store, nil | |||||
| } | } | ||||
| func (s *Executor) BindVars(ctx context.Context, vs ...Var) error { | func (s *Executor) BindVars(ctx context.Context, vs ...Var) error { | ||||
| @@ -132,6 +139,13 @@ loop: | |||||
| } | } | ||||
| } | } | ||||
| func (s *Executor) Store(key string, val any) { | |||||
| s.lock.Lock() | |||||
| defer s.lock.Unlock() | |||||
| s.store[key] = val | |||||
| } | |||||
| func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error { | func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error { | ||||
| var vs2 []Var | var vs2 []Var | ||||
| for _, v := range vs { | for _, v := range vs { | ||||
| @@ -2,44 +2,42 @@ package exec | |||||
| import ( | import ( | ||||
| "context" | "context" | ||||
| "sync" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/future" | "gitlink.org.cn/cloudream/common/pkgs/future" | ||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/common/utils/lo2" | "gitlink.org.cn/cloudream/common/utils/lo2" | ||||
| ) | ) | ||||
| type PlanBuilder struct { | type PlanBuilder struct { | ||||
| Vars []Var | Vars []Var | ||||
| WorkerPlans map[cdssdk.NodeID]*WorkerPlanBuilder | |||||
| WorkerPlans []*WorkerPlanBuilder | |||||
| DriverPlan DriverPlanBuilder | DriverPlan DriverPlanBuilder | ||||
| } | } | ||||
| func NewPlanBuilder() *PlanBuilder { | func NewPlanBuilder() *PlanBuilder { | ||||
| bld := &PlanBuilder{ | bld := &PlanBuilder{ | ||||
| WorkerPlans: make(map[cdssdk.NodeID]*WorkerPlanBuilder), | |||||
| DriverPlan: DriverPlanBuilder{ | |||||
| StoreMap: &sync.Map{}, | |||||
| }, | |||||
| DriverPlan: DriverPlanBuilder{}, | |||||
| } | } | ||||
| return bld | return bld | ||||
| } | } | ||||
| func (b *PlanBuilder) AtExecutor() *DriverPlanBuilder { | |||||
| func (b *PlanBuilder) AtDriver() *DriverPlanBuilder { | |||||
| return &b.DriverPlan | return &b.DriverPlan | ||||
| } | } | ||||
| func (b *PlanBuilder) AtAgent(node cdssdk.Node) *WorkerPlanBuilder { | |||||
| agtPlan, ok := b.WorkerPlans[node.NodeID] | |||||
| if !ok { | |||||
| agtPlan = &WorkerPlanBuilder{ | |||||
| Node: node, | |||||
| func (b *PlanBuilder) AtWorker(worker WorkerInfo) *WorkerPlanBuilder { | |||||
| for _, p := range b.WorkerPlans { | |||||
| if p.Worker.Equals(worker) { | |||||
| return p | |||||
| } | } | ||||
| b.WorkerPlans[node.NodeID] = agtPlan | |||||
| } | } | ||||
| return agtPlan | |||||
| p := &WorkerPlanBuilder{ | |||||
| Worker: worker, | |||||
| } | |||||
| b.WorkerPlans = append(b.WorkerPlans, p) | |||||
| return p | |||||
| } | } | ||||
| func (b *PlanBuilder) NewStreamVar() *StreamVar { | func (b *PlanBuilder) NewStreamVar() *StreamVar { | ||||
| @@ -89,7 +87,7 @@ func (b *PlanBuilder) Execute() *Driver { | |||||
| exec := Driver{ | exec := Driver{ | ||||
| planID: planID, | planID: planID, | ||||
| planBlder: b, | planBlder: b, | ||||
| callback: future.NewSetVoid(), | |||||
| callback: future.NewSetValue[map[string]any](), | |||||
| ctx: ctx, | ctx: ctx, | ||||
| cancel: cancel, | cancel: cancel, | ||||
| driverExec: NewExecutor(execPlan), | driverExec: NewExecutor(execPlan), | ||||
| @@ -100,8 +98,8 @@ func (b *PlanBuilder) Execute() *Driver { | |||||
| } | } | ||||
| type WorkerPlanBuilder struct { | type WorkerPlanBuilder struct { | ||||
| Node cdssdk.Node | |||||
| Ops []Op | |||||
| Worker WorkerInfo | |||||
| Ops []Op | |||||
| } | } | ||||
| func (b *WorkerPlanBuilder) AddOp(op Op) { | func (b *WorkerPlanBuilder) AddOp(op Op) { | ||||
| @@ -113,8 +111,7 @@ func (b *WorkerPlanBuilder) RemoveOp(op Op) { | |||||
| } | } | ||||
| type DriverPlanBuilder struct { | type DriverPlanBuilder struct { | ||||
| Ops []Op | |||||
| StoreMap *sync.Map | |||||
| Ops []Op | |||||
| } | } | ||||
| func (b *DriverPlanBuilder) AddOp(op Op) { | func (b *DriverPlanBuilder) AddOp(op Op) { | ||||
| @@ -2,6 +2,7 @@ package exec | |||||
| import ( | import ( | ||||
| "context" | "context" | ||||
| "io" | |||||
| "sync" | "sync" | ||||
| "github.com/samber/lo" | "github.com/samber/lo" | ||||
| @@ -83,3 +84,20 @@ func (s *Worker) FindByIDContexted(ctx context.Context, id PlanID) *Executor { | |||||
| return sw | return sw | ||||
| } | } | ||||
| type WorkerInfo interface { | |||||
| NewClient() (WorkerClient, error) | |||||
| // 判断两个worker是否相同 | |||||
| Equals(worker WorkerInfo) bool | |||||
| // Worker信息,比如ID、地址等 | |||||
| String() string | |||||
| } | |||||
| type WorkerClient interface { | |||||
| ExecutePlan(ctx context.Context, plan Plan) error | |||||
| SendStream(ctx context.Context, planID PlanID, v *StreamVar, str io.ReadCloser) error | |||||
| SendVar(ctx context.Context, planID PlanID, v Var) error | |||||
| GetStream(ctx context.Context, planID PlanID, v *StreamVar, signal *SignalVar) (io.ReadCloser, error) | |||||
| GetVar(ctx context.Context, planID PlanID, v Var, signal *SignalVar) error | |||||
| Close() error | |||||
| } | |||||
| @@ -1,26 +0,0 @@ | |||||
| package parser | |||||
| type From interface{} | |||||
| type To interface{} | |||||
| type FromTos []FromTo | |||||
| type FromTo struct { | |||||
| Froms []From | |||||
| Toes []To | |||||
| } | |||||
| func NewFromTo() FromTo { | |||||
| return FromTo{} | |||||
| } | |||||
| func (ft *FromTo) AddFrom(from From) *FromTo { | |||||
| ft.Froms = append(ft.Froms, from) | |||||
| return ft | |||||
| } | |||||
| func (ft *FromTo) AddTo(to To) *FromTo { | |||||
| ft.Toes = append(ft.Toes, to) | |||||
| return ft | |||||
| } | |||||
| @@ -1,9 +0,0 @@ | |||||
| package parser | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| ) | |||||
| type FromToParser interface { | |||||
| Parse(ft FromTo, blder *exec.PlanBuilder) error | |||||
| } | |||||
| @@ -0,0 +1,157 @@ | |||||
| package plan | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" | |||||
| ) | |||||
| func Generate(graph *dag.Graph, planBld *exec.PlanBuilder) error { | |||||
| generateSend(graph) | |||||
| return buildPlan(graph, planBld) | |||||
| } | |||||
| // 生成Send指令 | |||||
| func generateSend(graph *dag.Graph) { | |||||
| graph.Walk(func(node *dag.Node) bool { | |||||
| for _, out := range node.OutputStreams { | |||||
| to := out.Toes[0] | |||||
| if to.Node.Env.Equals(node.Env) { | |||||
| continue | |||||
| } | |||||
| switch to.Node.Env.Type { | |||||
| case dag.EnvDriver: | |||||
| // // 如果是要送到Driver,则只能由Driver主动去拉取 | |||||
| getNode := graph.NewNode(&ops.GetStreamType{}, ioswitch.NodeProps{}) | |||||
| getNode.Env.ToEnvDriver() | |||||
| // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 | |||||
| holdNode := graph.NewNode(&ops.HoldUntilType{}, ioswitch.NodeProps{}) | |||||
| holdNode.Env = node.Env | |||||
| // 将Get指令的信号送到Hold指令 | |||||
| getNode.OutputValues[0].To(holdNode, 0) | |||||
| // 将Get指令的输出送到目的地 | |||||
| getNode.OutputStreams[0].To(to.Node, to.SlotIndex) | |||||
| out.Toes = nil | |||||
| // 将源节点的输出送到Hold指令 | |||||
| out.To(holdNode, 0) | |||||
| // 将Hold指令的输出送到Get指令 | |||||
| holdNode.OutputStreams[0].To(getNode, 0) | |||||
| case dag.EnvWorker: | |||||
| // 如果是要送到Agent,则可以直接发送 | |||||
| n := graph.NewNode(&ops.SendStreamType{}, ioswitch.NodeProps{}) | |||||
| n.Env = node.Env | |||||
| n.OutputStreams[0].To(to.Node, to.SlotIndex) | |||||
| out.Toes = nil | |||||
| out.To(n, 0) | |||||
| } | |||||
| } | |||||
| for _, out := range node.OutputValues { | |||||
| to := out.Toes[0] | |||||
| if to.Node.Env.Equals(node.Env) { | |||||
| continue | |||||
| } | |||||
| switch to.Node.Env.Type { | |||||
| case dag.EnvDriver: | |||||
| // // 如果是要送到Driver,则只能由Driver主动去拉取 | |||||
| getNode := graph.NewNode(&ops.GetVaType{}, ioswitch.NodeProps{}) | |||||
| getNode.Env.ToEnvDriver() | |||||
| // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 | |||||
| holdNode := graph.NewNode(&ops.HoldUntilType{}, ioswitch.NodeProps{}) | |||||
| holdNode.Env = node.Env | |||||
| // 将Get指令的信号送到Hold指令 | |||||
| getNode.OutputValues[0].To(holdNode, 0) | |||||
| // 将Get指令的输出送到目的地 | |||||
| getNode.OutputValues[1].To(to.Node, to.SlotIndex) | |||||
| out.Toes = nil | |||||
| // 将源节点的输出送到Hold指令 | |||||
| out.To(holdNode, 0) | |||||
| // 将Hold指令的输出送到Get指令 | |||||
| holdNode.OutputValues[0].To(getNode, 0) | |||||
| case dag.EnvWorker: | |||||
| // 如果是要送到Agent,则可以直接发送 | |||||
| n := graph.NewNode(&ops.SendVarType{}, ioswitch.NodeProps{}) | |||||
| n.Env = node.Env | |||||
| n.OutputValues[0].To(to.Node, to.SlotIndex) | |||||
| out.Toes = nil | |||||
| out.To(n, 0) | |||||
| } | |||||
| } | |||||
| return true | |||||
| }) | |||||
| } | |||||
| // 生成Plan | |||||
| func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { | |||||
| var retErr error | |||||
| graph.Walk(func(node *dag.Node) bool { | |||||
| for _, out := range node.OutputStreams { | |||||
| if out.Var != nil { | |||||
| continue | |||||
| } | |||||
| out.Var = blder.NewStreamVar() | |||||
| } | |||||
| for _, in := range node.InputStreams { | |||||
| if in.Var != nil { | |||||
| continue | |||||
| } | |||||
| in.Var = blder.NewStreamVar() | |||||
| } | |||||
| for _, out := range node.OutputValues { | |||||
| if out.Var != nil { | |||||
| continue | |||||
| } | |||||
| switch out.Type { | |||||
| case dag.StringValueVar: | |||||
| out.Var = blder.NewStringVar() | |||||
| case dag.SignalValueVar: | |||||
| out.Var = blder.NewSignalVar() | |||||
| } | |||||
| } | |||||
| for _, in := range node.InputValues { | |||||
| if in.Var != nil { | |||||
| continue | |||||
| } | |||||
| switch in.Type { | |||||
| case dag.StringValueVar: | |||||
| in.Var = blder.NewStringVar() | |||||
| case dag.SignalValueVar: | |||||
| in.Var = blder.NewSignalVar() | |||||
| } | |||||
| } | |||||
| op, err := node.Type.GenerateOp(node) | |||||
| if err != nil { | |||||
| retErr = err | |||||
| return false | |||||
| } | |||||
| switch node.Env.Type { | |||||
| case dag.EnvDriver: | |||||
| blder.AtDriver().AddOp(op) | |||||
| case dag.EnvWorker: | |||||
| blder.AtWorker(node.Env.Worker).AddOp(op) | |||||
| } | |||||
| return true | |||||
| }) | |||||
| return retErr | |||||
| } | |||||
| @@ -0,0 +1,43 @@ | |||||
| package ops | |||||
| import ( | |||||
| "fmt" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| ) | |||||
| type FromDriverType struct { | |||||
| Handle *exec.DriverWriteStream | |||||
| } | |||||
| func (t *FromDriverType) InitNode(node *dag.Node) { | |||||
| dag.NodeNewOutputStream(node, nil) | |||||
| } | |||||
| func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| t.Handle.Var = op.OutputStreams[0].Var | |||||
| return nil, nil | |||||
| } | |||||
| func (t *FromDriverType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| type ToDriverType struct { | |||||
| Handle *exec.DriverReadStream | |||||
| Range exec.Range | |||||
| } | |||||
| func (t *ToDriverType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputStream(node, 1) | |||||
| } | |||||
| func (t *ToDriverType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| t.Handle.Var = op.InputStreams[0].Var | |||||
| return nil, nil | |||||
| } | |||||
| func (t *ToDriverType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| @@ -0,0 +1,52 @@ | |||||
| package ops | |||||
| import ( | |||||
| "context" | |||||
| "fmt" | |||||
| "io" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| ) | |||||
| func init() { | |||||
| exec.UseOp[*DropStream]() | |||||
| } | |||||
| type DropStream struct { | |||||
| Input *exec.StreamVar `json:"input"` | |||||
| } | |||||
| func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, o.Input) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| for { | |||||
| buf := make([]byte, 1024*8) | |||||
| _, err = o.Input.Stream.Read(buf) | |||||
| if err == io.EOF { | |||||
| return nil | |||||
| } | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| } | |||||
| } | |||||
| type DropType struct{} | |||||
| func (t *DropType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputStream(node, 1) | |||||
| } | |||||
| func (t *DropType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| return &DropStream{ | |||||
| Input: op.InputStreams[0].Var, | |||||
| }, nil | |||||
| } | |||||
| func (t *DropType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| @@ -0,0 +1,224 @@ | |||||
| package ops | |||||
| import ( | |||||
| "context" | |||||
| "fmt" | |||||
| "io" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/future" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/common/utils/io2" | |||||
| ) | |||||
| func init() { | |||||
| exec.UseOp[*SendStream]() | |||||
| exec.UseOp[*GetStream]() | |||||
| exec.UseOp[*SendVar]() | |||||
| exec.UseOp[*GetVar]() | |||||
| } | |||||
| type SendStream struct { | |||||
| Input *exec.StreamVar `json:"input"` | |||||
| Send *exec.StreamVar `json:"send"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| } | |||||
| func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, o.Input) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| defer o.Input.Stream.Close() | |||||
| cli, err := o.Worker.NewClient() | |||||
| if err != nil { | |||||
| return fmt.Errorf("new worker %v client: %w", o.Worker, err) | |||||
| } | |||||
| defer cli.Close() | |||||
| logger.Debugf("sending stream %v as %v to worker %v", o.Input.ID, o.Send.ID, o.Worker) | |||||
| // 发送后流的ID不同 | |||||
| err = cli.SendStream(ctx, e.Plan().ID, o.Send, o.Input.Stream) | |||||
| if err != nil { | |||||
| return fmt.Errorf("sending stream: %w", err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| type GetStream struct { | |||||
| Signal *exec.SignalVar `json:"signal"` | |||||
| Target *exec.StreamVar `json:"target"` | |||||
| Output *exec.StreamVar `json:"output"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| } | |||||
| func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| cli, err := o.Worker.NewClient() | |||||
| if err != nil { | |||||
| return fmt.Errorf("new worker %v client: %w", o.Worker, err) | |||||
| } | |||||
| defer cli.Close() | |||||
| logger.Debugf("getting stream %v as %v from worker %v", o.Target.ID, o.Output.ID, o.Worker) | |||||
| str, err := cli.GetStream(ctx, e.Plan().ID, o.Target, o.Signal) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting stream: %w", err) | |||||
| } | |||||
| fut := future.NewSetVoid() | |||||
| // 获取后送到本地的流ID是不同的 | |||||
| o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { | |||||
| fut.SetVoid() | |||||
| }) | |||||
| e.PutVars(o.Output) | |||||
| return fut.Wait(ctx) | |||||
| } | |||||
| type SendVar struct { | |||||
| Input exec.Var `json:"input"` | |||||
| Send exec.Var `json:"send"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| } | |||||
| func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, o.Input) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| cli, err := o.Worker.NewClient() | |||||
| if err != nil { | |||||
| return fmt.Errorf("new worker %v client: %w", o.Worker, err) | |||||
| } | |||||
| defer cli.Close() | |||||
| logger.Debugf("sending var %v as %v to worker %v", o.Input.GetID(), o.Send.GetID(), o.Worker) | |||||
| exec.AssignVar(o.Input, o.Send) | |||||
| err = cli.SendVar(ctx, e.Plan().ID, o.Send) | |||||
| if err != nil { | |||||
| return fmt.Errorf("sending var: %w", err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| type GetVar struct { | |||||
| Signal *exec.SignalVar `json:"signal"` | |||||
| Target exec.Var `json:"target"` | |||||
| Output exec.Var `json:"output"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| } | |||||
| func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| cli, err := o.Worker.NewClient() | |||||
| if err != nil { | |||||
| return fmt.Errorf("new worker %v client: %w", o.Worker, err) | |||||
| } | |||||
| defer cli.Close() | |||||
| logger.Debugf("getting var %v as %v from worker %v", o.Target.GetID(), o.Output.GetID(), o.Worker) | |||||
| err = cli.GetVar(ctx, e.Plan().ID, o.Target, o.Signal) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting var: %w", err) | |||||
| } | |||||
| exec.AssignVar(o.Target, o.Output) | |||||
| e.PutVars(o.Output) | |||||
| return nil | |||||
| } | |||||
| type SendStreamType struct { | |||||
| ToWorker exec.WorkerInfo | |||||
| } | |||||
| func (t *SendStreamType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputStream(node, 1) | |||||
| dag.NodeNewOutputStream(node, nil) | |||||
| } | |||||
| func (t *SendStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| return &SendStream{ | |||||
| Input: op.InputStreams[0].Var, | |||||
| Send: op.OutputStreams[0].Var, | |||||
| Worker: t.ToWorker, | |||||
| }, nil | |||||
| } | |||||
| func (t *SendStreamType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| type SendVarType struct { | |||||
| ToWorker exec.WorkerInfo | |||||
| } | |||||
| func (t *SendVarType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputValue(node, 1) | |||||
| dag.NodeNewOutputValue(node, nil) | |||||
| } | |||||
| func (t *SendVarType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| return &SendVar{ | |||||
| Input: op.InputValues[0].Var, | |||||
| Send: op.OutputValues[0].Var, | |||||
| Worker: t.ToWorker, | |||||
| }, nil | |||||
| } | |||||
| func (t *SendVarType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| type GetStreamType struct { | |||||
| FromWorker exec.WorkerInfo | |||||
| } | |||||
| func (t *GetStreamType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputStream(node, 1) | |||||
| dag.NodeNewOutputValue(node, nil) | |||||
| dag.NodeNewOutputStream(node, nil) | |||||
| } | |||||
| func (t *GetStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| return &GetStream{ | |||||
| Signal: op.OutputValues[0].Var.(*exec.SignalVar), | |||||
| Output: op.OutputStreams[0].Var, | |||||
| Target: op.InputStreams[0].Var, | |||||
| Worker: t.FromWorker, | |||||
| }, nil | |||||
| } | |||||
| func (t *GetStreamType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| type GetVaType struct { | |||||
| FromWorker exec.WorkerInfo | |||||
| } | |||||
| func (t *GetVaType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputValue(node, 1) | |||||
| dag.NodeNewOutputValue(node, nil) | |||||
| dag.NodeNewOutputValue(node, nil) | |||||
| } | |||||
| func (t *GetVaType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| return &GetVar{ | |||||
| Signal: op.OutputValues[0].Var.(*exec.SignalVar), | |||||
| Output: op.OutputValues[1].Var, | |||||
| Target: op.InputValues[0].Var, | |||||
| Worker: t.FromWorker, | |||||
| }, nil | |||||
| } | |||||
| func (t *GetVaType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| @@ -0,0 +1,49 @@ | |||||
| package ops | |||||
| import ( | |||||
| "context" | |||||
| "fmt" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| ) | |||||
| type Store struct { | |||||
| Var exec.Var | |||||
| Key string | |||||
| } | |||||
| func (o *Store) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, o.Var) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| switch v := o.Var.(type) { | |||||
| case *exec.IntVar: | |||||
| e.Store(o.Key, v.Value) | |||||
| case *exec.StringVar: | |||||
| e.Store(o.Key, v.Value) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| type StoreType struct { | |||||
| StoreKey string | |||||
| } | |||||
| func (t *StoreType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputValue(node, 1) | |||||
| } | |||||
| func (t *StoreType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| return &Store{ | |||||
| Var: op.InputValues[0].Var, | |||||
| Key: t.StoreKey, | |||||
| }, nil | |||||
| } | |||||
| func (t *StoreType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| @@ -0,0 +1,172 @@ | |||||
| package ops | |||||
| import ( | |||||
| "context" | |||||
| "fmt" | |||||
| "io" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/future" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| ) | |||||
| func init() { | |||||
| exec.UseOp[*OnStreamBegin]() | |||||
| exec.UseOp[*OnStreamEnd]() | |||||
| exec.UseOp[*HoldUntil]() | |||||
| exec.UseOp[*HangUntil]() | |||||
| exec.UseOp[*Broadcast]() | |||||
| } | |||||
| type OnStreamBegin struct { | |||||
| Raw *exec.StreamVar `json:"raw"` | |||||
| New *exec.StreamVar `json:"new"` | |||||
| Signal *exec.SignalVar `json:"signal"` | |||||
| } | |||||
| func (o *OnStreamBegin) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, o.Raw) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| o.New.Stream = o.Raw.Stream | |||||
| e.PutVars(o.New, o.Signal) | |||||
| return nil | |||||
| } | |||||
| type OnStreamEnd struct { | |||||
| Raw *exec.StreamVar `json:"raw"` | |||||
| New *exec.StreamVar `json:"new"` | |||||
| Signal *exec.SignalVar `json:"signal"` | |||||
| } | |||||
| type onStreamEnd struct { | |||||
| inner io.ReadCloser | |||||
| callback *future.SetVoidFuture | |||||
| } | |||||
| func (o *onStreamEnd) Read(p []byte) (n int, err error) { | |||||
| n, err = o.inner.Read(p) | |||||
| if err == io.EOF { | |||||
| o.callback.SetVoid() | |||||
| } else if err != nil { | |||||
| o.callback.SetError(err) | |||||
| } | |||||
| return n, err | |||||
| } | |||||
| func (o *onStreamEnd) Close() error { | |||||
| o.callback.SetError(fmt.Errorf("stream closed early")) | |||||
| return o.inner.Close() | |||||
| } | |||||
| func (o *OnStreamEnd) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, o.Raw) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| cb := future.NewSetVoid() | |||||
| o.New.Stream = &onStreamEnd{ | |||||
| inner: o.Raw.Stream, | |||||
| callback: cb, | |||||
| } | |||||
| e.PutVars(o.New) | |||||
| err = cb.Wait(ctx) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| e.PutVars(o.Signal) | |||||
| return nil | |||||
| } | |||||
| type HoldUntil struct { | |||||
| Waits []*exec.SignalVar `json:"waits"` | |||||
| Holds []exec.Var `json:"holds"` | |||||
| Emits []exec.Var `json:"emits"` | |||||
| } | |||||
| func (w *HoldUntil) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, w.Holds...) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| err = exec.BindArrayVars(e, ctx, w.Waits) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| for i := 0; i < len(w.Holds); i++ { | |||||
| err := exec.AssignVar(w.Holds[i], w.Emits[i]) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| } | |||||
| e.PutVars(w.Emits...) | |||||
| return nil | |||||
| } | |||||
| type HangUntil struct { | |||||
| Waits []*exec.SignalVar `json:"waits"` | |||||
| Op exec.Op `json:"op"` | |||||
| } | |||||
| func (h *HangUntil) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := exec.BindArrayVars(e, ctx, h.Waits) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| return h.Op.Execute(ctx, e) | |||||
| } | |||||
| type Broadcast struct { | |||||
| Source *exec.SignalVar `json:"source"` | |||||
| Targets []*exec.SignalVar `json:"targets"` | |||||
| } | |||||
| func (b *Broadcast) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| err := e.BindVars(ctx, b.Source) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| exec.PutArrayVars(e, b.Targets) | |||||
| return nil | |||||
| } | |||||
| type HoldUntilType struct { | |||||
| } | |||||
| func (t *HoldUntilType) InitNode(node *dag.Node) { | |||||
| dag.NodeDeclareInputValue(node, 1) | |||||
| } | |||||
| func (t *HoldUntilType) GenerateOp(op *dag.Node) (exec.Op, error) { | |||||
| o := &HoldUntil{ | |||||
| Waits: []*exec.SignalVar{op.InputValues[0].Var.(*exec.SignalVar)}, | |||||
| } | |||||
| for i := 0; i < len(op.OutputValues); i++ { | |||||
| o.Holds = append(o.Holds, op.InputValues[i+1].Var) | |||||
| o.Emits = append(o.Emits, op.OutputValues[i].Var) | |||||
| } | |||||
| for i := 0; i < len(op.OutputStreams); i++ { | |||||
| o.Holds = append(o.Holds, op.InputStreams[i].Var) | |||||
| o.Emits = append(o.Emits, op.OutputStreams[i].Var) | |||||
| } | |||||
| return o, nil | |||||
| } | |||||
| func (t *HoldUntilType) String(node *dag.Node) string { | |||||
| return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node)) | |||||
| } | |||||
| @@ -0,0 +1,75 @@ | |||||
| package ops | |||||
| import ( | |||||
| "fmt" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" | |||||
| ) | |||||
| func formatStreamIO(node *dag.Node) string { | |||||
| is := "" | |||||
| for i, in := range node.InputStreams { | |||||
| if i > 0 { | |||||
| is += "," | |||||
| } | |||||
| if in == nil { | |||||
| is += "." | |||||
| } else { | |||||
| is += fmt.Sprintf("%v", in.ID) | |||||
| } | |||||
| } | |||||
| os := "" | |||||
| for i, out := range node.OutputStreams { | |||||
| if i > 0 { | |||||
| os += "," | |||||
| } | |||||
| if out == nil { | |||||
| os += "." | |||||
| } else { | |||||
| os += fmt.Sprintf("%v", out.ID) | |||||
| } | |||||
| } | |||||
| if is == "" && os == "" { | |||||
| return "" | |||||
| } | |||||
| return fmt.Sprintf("S{%s>%s}", is, os) | |||||
| } | |||||
| func formatValueIO(node *dag.Node) string { | |||||
| is := "" | |||||
| for i, in := range node.InputValues { | |||||
| if i > 0 { | |||||
| is += "," | |||||
| } | |||||
| if in == nil { | |||||
| is += "." | |||||
| } else { | |||||
| is += fmt.Sprintf("%v", in.ID) | |||||
| } | |||||
| } | |||||
| os := "" | |||||
| for i, out := range node.OutputValues { | |||||
| if i > 0 { | |||||
| os += "," | |||||
| } | |||||
| if out == nil { | |||||
| os += "." | |||||
| } else { | |||||
| os += fmt.Sprintf("%v", out.ID) | |||||
| } | |||||
| } | |||||
| if is == "" && os == "" { | |||||
| return "" | |||||
| } | |||||
| return fmt.Sprintf("V{%s>%s}", is, os) | |||||
| } | |||||
| @@ -0,0 +1,20 @@ | |||||
| package ops | |||||
| import ( | |||||
| "context" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | |||||
| ) | |||||
| func init() { | |||||
| exec.UseOp[*ConstVar]() | |||||
| } | |||||
| type ConstVar struct { | |||||
| Var *exec.StringVar `json:"var"` | |||||
| } | |||||
| func (o *ConstVar) Execute(ctx context.Context, e *exec.Executor) error { | |||||
| e.PutVars(o.Var) | |||||
| return nil | |||||
| } | |||||