Browse Source

Merge pull request '重构ioswitch模块' (#49) from feature_gxh into master

gitlink
baohan 1 year ago
parent
commit
4f182d1332
21 changed files with 1752 additions and 0 deletions
  1. +1
    -0
      go.mod
  2. +2
    -0
      go.sum
  3. +74
    -0
      pkgs/ioswitch/dag/graph.go
  4. +68
    -0
      pkgs/ioswitch/dag/node.go
  5. +13
    -0
      pkgs/ioswitch/dag/utils.go
  6. +118
    -0
      pkgs/ioswitch/dag/var.go
  7. +112
    -0
      pkgs/ioswitch/exec/driver.go
  8. +26
    -0
      pkgs/ioswitch/exec/exec.go
  9. +165
    -0
      pkgs/ioswitch/exec/executor.go
  10. +123
    -0
      pkgs/ioswitch/exec/plan_builder.go
  11. +98
    -0
      pkgs/ioswitch/exec/utils.go
  12. +57
    -0
      pkgs/ioswitch/exec/var.go
  13. +103
    -0
      pkgs/ioswitch/exec/worker.go
  14. +157
    -0
      pkgs/ioswitch/plan/generate.go
  15. +43
    -0
      pkgs/ioswitch/plan/ops/driver.go
  16. +52
    -0
      pkgs/ioswitch/plan/ops/drop.go
  17. +224
    -0
      pkgs/ioswitch/plan/ops/send.go
  18. +49
    -0
      pkgs/ioswitch/plan/ops/store.go
  19. +172
    -0
      pkgs/ioswitch/plan/ops/sync.go
  20. +75
    -0
      pkgs/ioswitch/plan/ops/utils.go
  21. +20
    -0
      pkgs/ioswitch/plan/ops/var.go

+ 1
- 0
go.mod View File

@@ -61,6 +61,7 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect


+ 2
- 0
go.sum View File

@@ -147,6 +147,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=


+ 74
- 0
pkgs/ioswitch/dag/graph.go View File

@@ -0,0 +1,74 @@
package dag

import (
"gitlink.org.cn/cloudream/common/utils/lo2"
)

type Graph struct {
Nodes []*Node
isWalking bool
nextVarID int
}

func NewGraph() *Graph {
return &Graph{}
}

func (g *Graph) NewNode(typ NodeType, props any) *Node {
n := &Node{
Type: typ,
Props: props,
Graph: g,
}
typ.InitNode(n)
g.Nodes = append(g.Nodes, n)
return n
}

func (g *Graph) RemoveNode(node *Node) {
for i, n := range g.Nodes {
if n == node {
if g.isWalking {
g.Nodes[i] = nil
} else {
g.Nodes = lo2.RemoveAt(g.Nodes, i)
}
break
}
}
}

func (g *Graph) Walk(cb func(node *Node) bool) {
g.isWalking = true
for i := 0; i < len(g.Nodes); i++ {
if g.Nodes[i] == nil {
continue
}

if !cb(g.Nodes[i]) {
break
}
}
g.isWalking = false

g.Nodes = lo2.RemoveAllDefault(g.Nodes)
}

func (g *Graph) genVarID() int {
g.nextVarID++
return g.nextVarID
}

func NewNode[N NodeType](graph *Graph, typ N, props any) (*Node, N) {
return graph.NewNode(typ, props), typ
}

func WalkOnlyType[N NodeType](g *Graph, cb func(node *Node, typ N) bool) {
g.Walk(func(node *Node) bool {
typ, ok := node.Type.(N)
if ok {
return cb(node, typ)
}
return true
})
}

+ 68
- 0
pkgs/ioswitch/dag/node.go View File

@@ -0,0 +1,68 @@
package dag

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type NodeType interface {
InitNode(node *Node)
String(node *Node) string
GenerateOp(node *Node) (exec.Op, error)
}

type NodeEnvType string

const (
EnvUnknown NodeEnvType = ""
EnvDriver NodeEnvType = "Driver"
EnvWorker NodeEnvType = "Worker"
)

type NodeEnv struct {
Type NodeEnvType
Worker exec.WorkerInfo
}

func (e *NodeEnv) ToEnvUnknown() {
e.Type = EnvUnknown
e.Worker = nil
}

func (e *NodeEnv) ToEnvDriver() {
e.Type = EnvDriver
e.Worker = nil
}

func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) {
e.Type = EnvWorker
e.Worker = worker
}

func (e *NodeEnv) Equals(other NodeEnv) bool {
if e.Type != other.Type {
return false
}

if e.Type != EnvWorker {
return true
}

return e.Worker.Equals(other.Worker)
}

type Node struct {
Type NodeType
Env NodeEnv
Props any
InputStreams []*StreamVar
OutputStreams []*StreamVar
InputValues []*ValueVar
OutputValues []*ValueVar
Graph *Graph
}

func (n *Node) String() string {
return fmt.Sprintf("%v", n.Type.String(n))
}

+ 13
- 0
pkgs/ioswitch/dag/utils.go View File

@@ -0,0 +1,13 @@
package dag

func NProps[T any](n *Node) T {
return n.Props.(T)
}

func SProps[T any](str *StreamVar) T {
return str.Props.(T)
}

func VProps[T any](v *ValueVar) T {
return v.Props.(T)
}

+ 118
- 0
pkgs/ioswitch/dag/var.go View File

@@ -0,0 +1,118 @@
package dag

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

type EndPoint struct {
Node *Node
SlotIndex int // 所连接的Node的Output或Input数组的索引
}

type StreamVar struct {
ID int
From EndPoint
Toes []EndPoint
Props any
Var *exec.StreamVar
}

func (v *StreamVar) To(to *Node, slotIdx int) int {
v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx})
to.InputStreams[slotIdx] = v
return len(v.Toes) - 1
}

// func (v *StreamVar) NotTo(toIdx int) EndPoint {
// ed := v.Toes[toIdx]
// lo2.RemoveAt(v.Toes, toIdx)
// ed.Node.InputStreams[ed.SlotIndex] = nil
// return ed
// }

func (v *StreamVar) NotTo(node *Node) (EndPoint, bool) {
for i, ed := range v.Toes {
if ed.Node == node {
v.Toes = lo2.RemoveAt(v.Toes, i)
ed.Node.InputStreams[ed.SlotIndex] = nil
return ed, true
}
}

return EndPoint{}, false
}

func (v *StreamVar) NotToWhere(pred func(to EndPoint) bool) []EndPoint {
var newToes []EndPoint
var rmed []EndPoint
for _, ed := range v.Toes {
if pred(ed) {
ed.Node.InputStreams[ed.SlotIndex] = nil
rmed = append(rmed, ed)
} else {
newToes = append(newToes, ed)
}
}
v.Toes = newToes
return rmed
}

func (v *StreamVar) NotToAll() []EndPoint {
for _, ed := range v.Toes {
ed.Node.InputStreams[ed.SlotIndex] = nil
}
toes := v.Toes
v.Toes = nil
return toes
}

func NodeNewOutputStream(node *Node, props any) *StreamVar {
str := &StreamVar{
ID: node.Graph.genVarID(),
From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)},
Props: props,
}
node.OutputStreams = append(node.OutputStreams, str)
return str
}

func NodeDeclareInputStream(node *Node, cnt int) {
node.InputStreams = make([]*StreamVar, cnt)
}

type ValueVarType int

const (
StringValueVar ValueVarType = iota
SignalValueVar
)

type ValueVar struct {
ID int
Type ValueVarType
From EndPoint
Toes []EndPoint
Props any
Var exec.Var
}

func (v *ValueVar) To(to *Node, slotIdx int) int {
v.Toes = append(v.Toes, EndPoint{Node: to, SlotIndex: slotIdx})
to.InputValues[slotIdx] = v
return len(v.Toes) - 1
}

func NodeNewOutputValue(node *Node, props any) *ValueVar {
val := &ValueVar{
ID: node.Graph.genVarID(),
From: EndPoint{Node: node, SlotIndex: len(node.OutputStreams)},
Props: props,
}
node.OutputValues = append(node.OutputValues, val)
return val
}

func NodeDeclareInputValue(node *Node, cnt int) {
node.InputValues = make([]*ValueVar, cnt)
}

+ 112
- 0
pkgs/ioswitch/exec/driver.go View File

@@ -0,0 +1,112 @@
package exec

import (
"context"
"fmt"
"io"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/io2"
)

type Driver struct {
planID PlanID
planBlder *PlanBuilder
callback *future.SetValueFuture[map[string]any]
ctx context.Context
cancel context.CancelFunc
driverExec *Executor
}

// 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。
func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) {
handle.Var.Stream = io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)
e.driverExec.PutVars(handle.Var)
}

// 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理
func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) {
handle.Var.Stream = str
e.driverExec.PutVars(handle.Var)
}

func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) {
err := e.driverExec.BindVars(e.ctx, handle.Var)
if err != nil {
return nil, fmt.Errorf("bind vars: %w", err)
}

return handle.Var.Stream, nil
}

func (e *Driver) Signal(signal *DriverSignalVar) {
e.driverExec.PutVars(signal.Var)
}

func (e *Driver) Wait(ctx context.Context) (map[string]any, error) {
stored, err := e.callback.WaitValue(ctx)
if err != nil {
return nil, err
}

return stored, nil
}

func (e *Driver) execute() {
wg := sync.WaitGroup{}

for _, p := range e.planBlder.WorkerPlans {
wg.Add(1)

go func(p *WorkerPlanBuilder) {
defer wg.Done()

plan := Plan{
ID: e.planID,
Ops: p.Ops,
}

cli, err := p.Worker.NewClient()
if err != nil {
e.stopWith(fmt.Errorf("new client to worker %v: %w", p.Worker, err))
return
}
defer cli.Close()

err = cli.ExecutePlan(e.ctx, plan)
if err != nil {
e.stopWith(fmt.Errorf("execute plan at worker %v: %w", p.Worker, err))
return
}
}(p)
}

stored, err := e.driverExec.Run(e.ctx)
if err != nil {
e.stopWith(fmt.Errorf("run executor switch: %w", err))
return
}

wg.Wait()

e.callback.SetValue(stored)
}

func (e *Driver) stopWith(err error) {
e.callback.SetError(err)
e.cancel()
}

type DriverWriteStream struct {
Var *StreamVar
RangeHint *Range
}

type DriverReadStream struct {
Var *StreamVar
}

type DriverSignalVar struct {
Var *SignalVar
}

+ 26
- 0
pkgs/ioswitch/exec/exec.go View File

@@ -0,0 +1,26 @@
package exec

import (
"context"

"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type PlanID string

type Plan struct {
ID PlanID `json:"id"`
Ops []Op `json:"ops"`
}

var opUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Op]()))

type Op interface {
Execute(ctx context.Context, e *Executor) error
}

func UseOp[T Op]() {
opUnion.Add(reflect2.TypeOf[T]())
}

+ 165
- 0
pkgs/ioswitch/exec/executor.go View File

@@ -0,0 +1,165 @@
package exec

import (
"context"
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/sync2"
)

type bindingVars struct {
Waittings []Var
Bindeds []Var
Callback *future.SetVoidFuture
}

type Executor struct {
plan Plan
vars map[VarID]Var
bindings []*bindingVars
lock sync.Mutex
store map[string]any
}

func NewExecutor(plan Plan) *Executor {
planning := Executor{
plan: plan,
vars: make(map[VarID]Var),
store: make(map[string]any),
}

return &planning
}

func (s *Executor) Plan() *Plan {
return &s.plan
}

func (s *Executor) Run(ctx context.Context) (map[string]any, error) {
ctx2, cancel := context.WithCancel(ctx)
defer cancel()

err := sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error {
err := o.Execute(ctx2, s)

s.lock.Lock()
defer s.lock.Unlock()

if err != nil {
cancel()
return fmt.Errorf("%T: %w", o, err)
}

return nil
})
if err != nil {
return nil, err
}

return s.store, nil
}

func (s *Executor) BindVars(ctx context.Context, vs ...Var) error {
s.lock.Lock()

callback := future.NewSetVoid()
binding := &bindingVars{
Callback: callback,
}

for _, v := range vs {
v2 := s.vars[v.GetID()]
if v2 == nil {
binding.Waittings = append(binding.Waittings, v)
continue
}

if err := AssignVar(v2, v); err != nil {
s.lock.Unlock()
return fmt.Errorf("assign var %v to %v: %w", v2.GetID(), v.GetID(), err)
}

binding.Bindeds = append(binding.Bindeds, v)
}

if len(binding.Waittings) == 0 {
s.lock.Unlock()
return nil
}

s.bindings = append(s.bindings, binding)
s.lock.Unlock()

err := callback.Wait(ctx)

s.lock.Lock()
defer s.lock.Unlock()

s.bindings = lo2.Remove(s.bindings, binding)

return err
}

func (s *Executor) PutVars(vs ...Var) {
s.lock.Lock()
defer s.lock.Unlock()

loop:
for _, v := range vs {
for ib, b := range s.bindings {
for iw, w := range b.Waittings {
if w.GetID() != v.GetID() {
continue
}

if err := AssignVar(v, w); err != nil {
b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err))
// 绑定类型不对,说明生成的执行计划有问题,怎么处理都可以,因为最终会执行失败
continue loop
}

b.Bindeds = append(b.Bindeds, w)
b.Waittings = lo2.RemoveAt(b.Waittings, iw)
if len(b.Waittings) == 0 {
b.Callback.SetVoid()
s.bindings = lo2.RemoveAt(s.bindings, ib)
}

// 绑定成功,继续最外层循环
continue loop
}

}

// 如果没有绑定,则直接放入变量表中
s.vars[v.GetID()] = v
}
}

func (s *Executor) Store(key string, val any) {
s.lock.Lock()
defer s.lock.Unlock()

s.store[key] = val
}

func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error {
var vs2 []Var
for _, v := range vs {
vs2 = append(vs2, v)
}

return sw.BindVars(ctx, vs2...)
}

func PutArrayVars[T Var](sw *Executor, vs []T) {
var vs2 []Var
for _, v := range vs {
vs2 = append(vs2, v)
}

sw.PutVars(vs2...)
}

+ 123
- 0
pkgs/ioswitch/exec/plan_builder.go View File

@@ -0,0 +1,123 @@
package exec

import (
"context"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

type PlanBuilder struct {
Vars []Var
WorkerPlans []*WorkerPlanBuilder
DriverPlan DriverPlanBuilder
}

func NewPlanBuilder() *PlanBuilder {
bld := &PlanBuilder{
DriverPlan: DriverPlanBuilder{},
}

return bld
}

func (b *PlanBuilder) AtDriver() *DriverPlanBuilder {
return &b.DriverPlan
}

func (b *PlanBuilder) AtWorker(worker WorkerInfo) *WorkerPlanBuilder {
for _, p := range b.WorkerPlans {
if p.Worker.Equals(worker) {
return p
}
}

p := &WorkerPlanBuilder{
Worker: worker,
}
b.WorkerPlans = append(b.WorkerPlans, p)

return p
}

func (b *PlanBuilder) NewStreamVar() *StreamVar {
v := &StreamVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)

return v
}

func (b *PlanBuilder) NewIntVar() *IntVar {
v := &IntVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)

return v
}

func (b *PlanBuilder) NewStringVar() *StringVar {
v := &StringVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)

return v
}
func (b *PlanBuilder) NewSignalVar() *SignalVar {
v := &SignalVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)

return v
}

func (b *PlanBuilder) Execute() *Driver {
ctx, cancel := context.WithCancel(context.Background())
planID := genRandomPlanID()

execPlan := Plan{
ID: planID,
Ops: b.DriverPlan.Ops,
}

exec := Driver{
planID: planID,
planBlder: b,
callback: future.NewSetValue[map[string]any](),
ctx: ctx,
cancel: cancel,
driverExec: NewExecutor(execPlan),
}
go exec.execute()

return &exec
}

type WorkerPlanBuilder struct {
Worker WorkerInfo
Ops []Op
}

func (b *WorkerPlanBuilder) AddOp(op Op) {
b.Ops = append(b.Ops, op)
}

func (b *WorkerPlanBuilder) RemoveOp(op Op) {
b.Ops = lo2.Remove(b.Ops, op)
}

type DriverPlanBuilder struct {
Ops []Op
}

func (b *DriverPlanBuilder) AddOp(op Op) {
b.Ops = append(b.Ops, op)
}

func (b *DriverPlanBuilder) RemoveOp(op Op) {
b.Ops = lo2.Remove(b.Ops, op)
}

+ 98
- 0
pkgs/ioswitch/exec/utils.go View File

@@ -0,0 +1,98 @@
package exec

import (
"fmt"
"reflect"

"github.com/google/uuid"
"gitlink.org.cn/cloudream/common/utils/math2"
)

func genRandomPlanID() PlanID {
return PlanID(uuid.NewString())
}

func AssignVar(from Var, to Var) error {
if reflect.TypeOf(from) != reflect.TypeOf(to) {
return fmt.Errorf("cannot assign %T to %T", from, to)
}

switch from := from.(type) {
case *StreamVar:
to.(*StreamVar).Stream = from.Stream
case *IntVar:
to.(*IntVar).Value = from.Value
case *StringVar:
to.(*StringVar).Value = from.Value
case *SignalVar:
}

return nil
}

type Range struct {
Offset int64
Length *int64
}

func (r *Range) Extend(other Range) {
newOffset := math2.Min(r.Offset, other.Offset)

if r.Length == nil {
r.Offset = newOffset
return
}

if other.Length == nil {
r.Offset = newOffset
r.Length = nil
return
}

otherEnd := other.Offset + *other.Length
rEnd := r.Offset + *r.Length

newEnd := math2.Max(otherEnd, rEnd)
r.Offset = newOffset
*r.Length = newEnd - newOffset
}

func (r *Range) ExtendStart(start int64) {
r.Offset = math2.Min(r.Offset, start)
}

func (r *Range) ExtendEnd(end int64) {
if r.Length == nil {
return
}

rEnd := r.Offset + *r.Length
newLen := math2.Max(end, rEnd) - r.Offset
r.Length = &newLen
}

func (r *Range) Fix(maxLength int64) {
if r.Length != nil {
return
}

len := maxLength - r.Offset
r.Length = &len
}

func (r *Range) ToStartEnd(maxLen int64) (start int64, end int64) {
if r.Length == nil {
return r.Offset, maxLen
}

end = r.Offset + *r.Length
return r.Offset, end
}

func (r *Range) ClampLength(maxLen int64) {
if r.Length == nil {
return
}

*r.Length = math2.Min(*r.Length, maxLen-r.Offset)
}

+ 57
- 0
pkgs/ioswitch/exec/var.go View File

@@ -0,0 +1,57 @@
package exec

import (
"io"

"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type VarID int

type Var interface {
GetID() VarID
}

var VarUnion = types.NewTypeUnion[Var](
(*IntVar)(nil),
(*StringVar)(nil),
(*SignalVar)(nil),
(*StreamVar)(nil),
)
var _ = serder.UseTypeUnionExternallyTagged(&VarUnion)

type StreamVar struct {
ID VarID `json:"id"`
Stream io.ReadCloser `json:"-"`
}

func (v *StreamVar) GetID() VarID {
return v.ID
}

type IntVar struct {
ID VarID `json:"id"`
Value string `json:"value"`
}

func (v *IntVar) GetID() VarID {
return v.ID
}

type StringVar struct {
ID VarID `json:"id"`
Value string `json:"value"`
}

func (v *StringVar) GetID() VarID {
return v.ID
}

type SignalVar struct {
ID VarID `json:"id"`
}

func (v *SignalVar) GetID() VarID {
return v.ID
}

+ 103
- 0
pkgs/ioswitch/exec/worker.go View File

@@ -0,0 +1,103 @@
package exec

import (
"context"
"io"
"sync"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

type finding struct {
PlanID PlanID
Callback *future.SetValueFuture[*Executor]
}

type Worker struct {
lock sync.Mutex
executors map[PlanID]*Executor
findings []*finding
}

func NewWorker() Worker {
return Worker{
executors: make(map[PlanID]*Executor),
}
}

func (s *Worker) Add(exe *Executor) {
s.lock.Lock()
defer s.lock.Unlock()

s.executors[exe.Plan().ID] = exe

s.findings = lo.Reject(s.findings, func(f *finding, idx int) bool {
if f.PlanID != exe.Plan().ID {
return false
}

f.Callback.SetValue(exe)
return true
})
}

func (s *Worker) Remove(sw *Executor) {
s.lock.Lock()
defer s.lock.Unlock()

delete(s.executors, sw.Plan().ID)
}

func (s *Worker) FindByID(id PlanID) *Executor {
s.lock.Lock()
defer s.lock.Unlock()

return s.executors[id]
}

func (s *Worker) FindByIDContexted(ctx context.Context, id PlanID) *Executor {
s.lock.Lock()

sw := s.executors[id]
if sw != nil {
s.lock.Unlock()
return sw
}

cb := future.NewSetValue[*Executor]()
f := &finding{
PlanID: id,
Callback: cb,
}
s.findings = append(s.findings, f)

s.lock.Unlock()

sw, _ = cb.WaitValue(ctx)

s.lock.Lock()
defer s.lock.Unlock()

s.findings = lo2.Remove(s.findings, f)

return sw
}

type WorkerInfo interface {
NewClient() (WorkerClient, error)
// 判断两个worker是否相同
Equals(worker WorkerInfo) bool
// Worker信息,比如ID、地址等
String() string
}

type WorkerClient interface {
ExecutePlan(ctx context.Context, plan Plan) error
SendStream(ctx context.Context, planID PlanID, v *StreamVar, str io.ReadCloser) error
SendVar(ctx context.Context, planID PlanID, v Var) error
GetStream(ctx context.Context, planID PlanID, v *StreamVar, signal *SignalVar) (io.ReadCloser, error)
GetVar(ctx context.Context, planID PlanID, v Var, signal *SignalVar) error
Close() error
}

+ 157
- 0
pkgs/ioswitch/plan/generate.go View File

@@ -0,0 +1,157 @@
package plan

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

func Generate(graph *dag.Graph, planBld *exec.PlanBuilder) error {
generateSend(graph)
return buildPlan(graph, planBld)
}

// 生成Send指令
func generateSend(graph *dag.Graph) {
graph.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
to := out.Toes[0]
if to.Node.Env.Equals(node.Env) {
continue
}

switch to.Node.Env.Type {
case dag.EnvDriver:
// // 如果是要送到Driver,则只能由Driver主动去拉取
getNode := graph.NewNode(&ops.GetStreamType{}, ioswitch.NodeProps{})
getNode.Env.ToEnvDriver()

// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode := graph.NewNode(&ops.HoldUntilType{}, ioswitch.NodeProps{})
holdNode.Env = node.Env

// 将Get指令的信号送到Hold指令
getNode.OutputValues[0].To(holdNode, 0)
// 将Get指令的输出送到目的地
getNode.OutputStreams[0].To(to.Node, to.SlotIndex)
out.Toes = nil
// 将源节点的输出送到Hold指令
out.To(holdNode, 0)
// 将Hold指令的输出送到Get指令
holdNode.OutputStreams[0].To(getNode, 0)

case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送
n := graph.NewNode(&ops.SendStreamType{}, ioswitch.NodeProps{})
n.Env = node.Env
n.OutputStreams[0].To(to.Node, to.SlotIndex)
out.Toes = nil
out.To(n, 0)
}
}

for _, out := range node.OutputValues {
to := out.Toes[0]
if to.Node.Env.Equals(node.Env) {
continue
}

switch to.Node.Env.Type {
case dag.EnvDriver:
// // 如果是要送到Driver,则只能由Driver主动去拉取
getNode := graph.NewNode(&ops.GetVaType{}, ioswitch.NodeProps{})
getNode.Env.ToEnvDriver()

// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdNode := graph.NewNode(&ops.HoldUntilType{}, ioswitch.NodeProps{})
holdNode.Env = node.Env

// 将Get指令的信号送到Hold指令
getNode.OutputValues[0].To(holdNode, 0)
// 将Get指令的输出送到目的地
getNode.OutputValues[1].To(to.Node, to.SlotIndex)
out.Toes = nil
// 将源节点的输出送到Hold指令
out.To(holdNode, 0)
// 将Hold指令的输出送到Get指令
holdNode.OutputValues[0].To(getNode, 0)

case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送
n := graph.NewNode(&ops.SendVarType{}, ioswitch.NodeProps{})
n.Env = node.Env
n.OutputValues[0].To(to.Node, to.SlotIndex)
out.Toes = nil
out.To(n, 0)
}
}

return true
})
}

// 生成Plan
func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
var retErr error
graph.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
if out.Var != nil {
continue
}

out.Var = blder.NewStreamVar()
}

for _, in := range node.InputStreams {
if in.Var != nil {
continue
}

in.Var = blder.NewStreamVar()
}

for _, out := range node.OutputValues {
if out.Var != nil {
continue
}

switch out.Type {
case dag.StringValueVar:
out.Var = blder.NewStringVar()
case dag.SignalValueVar:
out.Var = blder.NewSignalVar()
}
}

for _, in := range node.InputValues {
if in.Var != nil {
continue
}

switch in.Type {
case dag.StringValueVar:
in.Var = blder.NewStringVar()
case dag.SignalValueVar:
in.Var = blder.NewSignalVar()
}
}

op, err := node.Type.GenerateOp(node)
if err != nil {
retErr = err
return false
}

switch node.Env.Type {
case dag.EnvDriver:
blder.AtDriver().AddOp(op)
case dag.EnvWorker:
blder.AtWorker(node.Env.Worker).AddOp(op)
}

return true
})

return retErr
}

+ 43
- 0
pkgs/ioswitch/plan/ops/driver.go View File

@@ -0,0 +1,43 @@
package ops

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type FromDriverType struct {
Handle *exec.DriverWriteStream
}

func (t *FromDriverType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, nil)
}

func (t *FromDriverType) GenerateOp(op *dag.Node) (exec.Op, error) {
t.Handle.Var = op.OutputStreams[0].Var
return nil, nil
}

func (t *FromDriverType) String(node *dag.Node) string {
return fmt.Sprintf("FromDriver[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type ToDriverType struct {
Handle *exec.DriverReadStream
Range exec.Range
}

func (t *ToDriverType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
}

func (t *ToDriverType) GenerateOp(op *dag.Node) (exec.Op, error) {
t.Handle.Var = op.InputStreams[0].Var
return nil, nil
}

func (t *ToDriverType) String(node *dag.Node) string {
return fmt.Sprintf("ToDriver[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}

+ 52
- 0
pkgs/ioswitch/plan/ops/drop.go View File

@@ -0,0 +1,52 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

func init() {
exec.UseOp[*DropStream]()
}

type DropStream struct {
Input *exec.StreamVar `json:"input"`
}

func (o *DropStream) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
if err != nil {
return err
}

for {
buf := make([]byte, 1024*8)
_, err = o.Input.Stream.Read(buf)
if err == io.EOF {
return nil
}
if err != nil {
return err
}
}
}

type DropType struct{}

func (t *DropType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
}

func (t *DropType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &DropStream{
Input: op.InputStreams[0].Var,
}, nil
}

func (t *DropType) String(node *dag.Node) string {
return fmt.Sprintf("Drop[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 224
- 0
pkgs/ioswitch/plan/ops/send.go View File

@@ -0,0 +1,224 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
)

func init() {
exec.UseOp[*SendStream]()
exec.UseOp[*GetStream]()
exec.UseOp[*SendVar]()
exec.UseOp[*GetVar]()
}

type SendStream struct {
Input *exec.StreamVar `json:"input"`
Send *exec.StreamVar `json:"send"`
Worker exec.WorkerInfo `json:"worker"`
}

func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
if err != nil {
return err
}
defer o.Input.Stream.Close()

cli, err := o.Worker.NewClient()
if err != nil {
return fmt.Errorf("new worker %v client: %w", o.Worker, err)
}
defer cli.Close()

logger.Debugf("sending stream %v as %v to worker %v", o.Input.ID, o.Send.ID, o.Worker)

// 发送后流的ID不同
err = cli.SendStream(ctx, e.Plan().ID, o.Send, o.Input.Stream)
if err != nil {
return fmt.Errorf("sending stream: %w", err)
}

return nil
}

type GetStream struct {
Signal *exec.SignalVar `json:"signal"`
Target *exec.StreamVar `json:"target"`
Output *exec.StreamVar `json:"output"`
Worker exec.WorkerInfo `json:"worker"`
}

func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error {
cli, err := o.Worker.NewClient()
if err != nil {
return fmt.Errorf("new worker %v client: %w", o.Worker, err)
}
defer cli.Close()

logger.Debugf("getting stream %v as %v from worker %v", o.Target.ID, o.Output.ID, o.Worker)

str, err := cli.GetStream(ctx, e.Plan().ID, o.Target, o.Signal)
if err != nil {
return fmt.Errorf("getting stream: %w", err)
}

fut := future.NewSetVoid()
// 获取后送到本地的流ID是不同的
o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
fut.SetVoid()
})
e.PutVars(o.Output)

return fut.Wait(ctx)
}

type SendVar struct {
Input exec.Var `json:"input"`
Send exec.Var `json:"send"`
Worker exec.WorkerInfo `json:"worker"`
}

func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
if err != nil {
return err
}

cli, err := o.Worker.NewClient()
if err != nil {
return fmt.Errorf("new worker %v client: %w", o.Worker, err)
}
defer cli.Close()

logger.Debugf("sending var %v as %v to worker %v", o.Input.GetID(), o.Send.GetID(), o.Worker)

exec.AssignVar(o.Input, o.Send)
err = cli.SendVar(ctx, e.Plan().ID, o.Send)
if err != nil {
return fmt.Errorf("sending var: %w", err)
}

return nil
}

type GetVar struct {
Signal *exec.SignalVar `json:"signal"`
Target exec.Var `json:"target"`
Output exec.Var `json:"output"`
Worker exec.WorkerInfo `json:"worker"`
}

func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error {
cli, err := o.Worker.NewClient()
if err != nil {
return fmt.Errorf("new worker %v client: %w", o.Worker, err)
}
defer cli.Close()

logger.Debugf("getting var %v as %v from worker %v", o.Target.GetID(), o.Output.GetID(), o.Worker)

err = cli.GetVar(ctx, e.Plan().ID, o.Target, o.Signal)
if err != nil {
return fmt.Errorf("getting var: %w", err)
}
exec.AssignVar(o.Target, o.Output)
e.PutVars(o.Output)

return nil
}

type SendStreamType struct {
ToWorker exec.WorkerInfo
}

func (t *SendStreamType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputStream(node, nil)
}

func (t *SendStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &SendStream{
Input: op.InputStreams[0].Var,
Send: op.OutputStreams[0].Var,
Worker: t.ToWorker,
}, nil
}

func (t *SendStreamType) String(node *dag.Node) string {
return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type SendVarType struct {
ToWorker exec.WorkerInfo
}

func (t *SendVarType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
dag.NodeNewOutputValue(node, nil)
}

func (t *SendVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &SendVar{
Input: op.InputValues[0].Var,
Send: op.OutputValues[0].Var,
Worker: t.ToWorker,
}, nil
}

func (t *SendVarType) String(node *dag.Node) string {
return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type GetStreamType struct {
FromWorker exec.WorkerInfo
}

func (t *GetStreamType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputValue(node, nil)
dag.NodeNewOutputStream(node, nil)
}

func (t *GetStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &GetStream{
Signal: op.OutputValues[0].Var.(*exec.SignalVar),
Output: op.OutputStreams[0].Var,
Target: op.InputStreams[0].Var,
Worker: t.FromWorker,
}, nil
}

func (t *GetStreamType) String(node *dag.Node) string {
return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node))
}

type GetVaType struct {
FromWorker exec.WorkerInfo
}

func (t *GetVaType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
dag.NodeNewOutputValue(node, nil)
dag.NodeNewOutputValue(node, nil)
}

func (t *GetVaType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &GetVar{
Signal: op.OutputValues[0].Var.(*exec.SignalVar),
Output: op.OutputValues[1].Var,
Target: op.InputValues[0].Var,
Worker: t.FromWorker,
}, nil
}

func (t *GetVaType) String(node *dag.Node) string {
return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 49
- 0
pkgs/ioswitch/plan/ops/store.go View File

@@ -0,0 +1,49 @@
package ops

import (
"context"
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type Store struct {
Var exec.Var
Key string
}

func (o *Store) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Var)
if err != nil {
return err
}

switch v := o.Var.(type) {
case *exec.IntVar:
e.Store(o.Key, v.Value)
case *exec.StringVar:
e.Store(o.Key, v.Value)
}

return nil
}

type StoreType struct {
StoreKey string
}

func (t *StoreType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
}

func (t *StoreType) GenerateOp(op *dag.Node) (exec.Op, error) {
return &Store{
Var: op.InputValues[0].Var,
Key: t.StoreKey,
}, nil
}

func (t *StoreType) String(node *dag.Node) string {
return fmt.Sprintf("Store[%s]%v%v", t.StoreKey, formatStreamIO(node), formatValueIO(node))
}

+ 172
- 0
pkgs/ioswitch/plan/ops/sync.go View File

@@ -0,0 +1,172 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

func init() {
exec.UseOp[*OnStreamBegin]()
exec.UseOp[*OnStreamEnd]()
exec.UseOp[*HoldUntil]()
exec.UseOp[*HangUntil]()
exec.UseOp[*Broadcast]()
}

type OnStreamBegin struct {
Raw *exec.StreamVar `json:"raw"`
New *exec.StreamVar `json:"new"`
Signal *exec.SignalVar `json:"signal"`
}

func (o *OnStreamBegin) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Raw)
if err != nil {
return err
}

o.New.Stream = o.Raw.Stream

e.PutVars(o.New, o.Signal)
return nil
}

type OnStreamEnd struct {
Raw *exec.StreamVar `json:"raw"`
New *exec.StreamVar `json:"new"`
Signal *exec.SignalVar `json:"signal"`
}

type onStreamEnd struct {
inner io.ReadCloser
callback *future.SetVoidFuture
}

func (o *onStreamEnd) Read(p []byte) (n int, err error) {
n, err = o.inner.Read(p)
if err == io.EOF {
o.callback.SetVoid()
} else if err != nil {
o.callback.SetError(err)
}
return n, err
}

func (o *onStreamEnd) Close() error {
o.callback.SetError(fmt.Errorf("stream closed early"))
return o.inner.Close()
}

func (o *OnStreamEnd) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Raw)
if err != nil {
return err
}

cb := future.NewSetVoid()

o.New.Stream = &onStreamEnd{
inner: o.Raw.Stream,
callback: cb,
}
e.PutVars(o.New)

err = cb.Wait(ctx)
if err != nil {
return err
}

e.PutVars(o.Signal)
return nil
}

type HoldUntil struct {
Waits []*exec.SignalVar `json:"waits"`
Holds []exec.Var `json:"holds"`
Emits []exec.Var `json:"emits"`
}

func (w *HoldUntil) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, w.Holds...)
if err != nil {
return err
}

err = exec.BindArrayVars(e, ctx, w.Waits)
if err != nil {
return err
}

for i := 0; i < len(w.Holds); i++ {
err := exec.AssignVar(w.Holds[i], w.Emits[i])
if err != nil {
return err
}
}

e.PutVars(w.Emits...)
return nil
}

type HangUntil struct {
Waits []*exec.SignalVar `json:"waits"`
Op exec.Op `json:"op"`
}

func (h *HangUntil) Execute(ctx context.Context, e *exec.Executor) error {
err := exec.BindArrayVars(e, ctx, h.Waits)
if err != nil {
return err
}

return h.Op.Execute(ctx, e)
}

type Broadcast struct {
Source *exec.SignalVar `json:"source"`
Targets []*exec.SignalVar `json:"targets"`
}

func (b *Broadcast) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, b.Source)
if err != nil {
return err
}

exec.PutArrayVars(e, b.Targets)
return nil
}

type HoldUntilType struct {
}

func (t *HoldUntilType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
}

func (t *HoldUntilType) GenerateOp(op *dag.Node) (exec.Op, error) {
o := &HoldUntil{
Waits: []*exec.SignalVar{op.InputValues[0].Var.(*exec.SignalVar)},
}

for i := 0; i < len(op.OutputValues); i++ {
o.Holds = append(o.Holds, op.InputValues[i+1].Var)
o.Emits = append(o.Emits, op.OutputValues[i].Var)
}

for i := 0; i < len(op.OutputStreams); i++ {
o.Holds = append(o.Holds, op.InputStreams[i].Var)
o.Emits = append(o.Emits, op.OutputStreams[i].Var)
}

return o, nil
}

func (t *HoldUntilType) String(node *dag.Node) string {
return fmt.Sprintf("HoldUntil[]%v%v", formatStreamIO(node), formatValueIO(node))
}

+ 75
- 0
pkgs/ioswitch/plan/ops/utils.go View File

@@ -0,0 +1,75 @@
package ops

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

func formatStreamIO(node *dag.Node) string {
is := ""
for i, in := range node.InputStreams {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputStreams {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("S{%s>%s}", is, os)
}

func formatValueIO(node *dag.Node) string {
is := ""
for i, in := range node.InputValues {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputValues {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("V{%s>%s}", is, os)
}

+ 20
- 0
pkgs/ioswitch/plan/ops/var.go View File

@@ -0,0 +1,20 @@
package ops

import (
"context"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

func init() {
exec.UseOp[*ConstVar]()
}

type ConstVar struct {
Var *exec.StringVar `json:"var"`
}

func (o *ConstVar) Execute(ctx context.Context, e *exec.Executor) error {
e.PutVars(o.Var)
return nil
}

Loading…
Cancel
Save