JeshuaRen 1 year ago
parent
commit
6b852356f9
18 changed files with 344 additions and 462 deletions
  1. +2
    -23
      pkgs/ioswitch/dag/graph.go
  2. +35
    -40
      pkgs/ioswitch/dag/node.go
  3. +11
    -50
      pkgs/ioswitch/dag/var.go
  4. +9
    -10
      pkgs/ioswitch/exec/driver.go
  5. +61
    -76
      pkgs/ioswitch/exec/executor.go
  6. +6
    -33
      pkgs/ioswitch/exec/plan_builder.go
  7. +0
    -21
      pkgs/ioswitch/exec/utils.go
  8. +55
    -28
      pkgs/ioswitch/exec/var.go
  9. +7
    -4
      pkgs/ioswitch/exec/worker.go
  10. +8
    -26
      pkgs/ioswitch/plan/generate.go
  11. +8
    -8
      pkgs/ioswitch/plan/ops/driver.go
  12. +7
    -7
      pkgs/ioswitch/plan/ops/drop.go
  13. +49
    -50
      pkgs/ioswitch/plan/ops/send.go
  14. +6
    -12
      pkgs/ioswitch/plan/ops/store.go
  15. +43
    -47
      pkgs/ioswitch/plan/ops/sync.go
  16. +3
    -2
      pkgs/ioswitch/plan/ops/var.go
  17. +2
    -2
      pkgs/ioswitch/utils/utils.go
  18. +32
    -23
      sdks/storage/cdsapi/hub_io.go

+ 2
- 23
pkgs/ioswitch/dag/graph.go View File

@@ -7,7 +7,6 @@ import (
type Graph struct { type Graph struct {
Nodes []Node Nodes []Node
isWalking bool isWalking bool
nextVarID int
} }


func NewGraph() *Graph { func NewGraph() *Graph {
@@ -48,28 +47,8 @@ func (g *Graph) Walk(cb func(node Node) bool) {
g.Nodes = lo2.RemoveAllDefault(g.Nodes) g.Nodes = lo2.RemoveAllDefault(g.Nodes)
} }


func (g *Graph) NewStreamVar() *StreamVar {
str := &StreamVar{
VarBase: VarBase{
id: g.genVarID(),
},
}
return str
}

func (g *Graph) NewValueVar(valType ValueVarType) *ValueVar {
val := &ValueVar{
VarBase: VarBase{
id: g.genVarID(),
},
Type: valType,
}
return val
}

func (g *Graph) genVarID() int {
g.nextVarID++
return g.nextVarID
func (g *Graph) NewVar() *Var {
return &Var{}
} }


func AddNode[N Node](graph *Graph, typ N) N { func AddNode[N Node](graph *Graph, typ N) N {


+ 35
- 40
pkgs/ioswitch/dag/node.go View File

@@ -50,78 +50,77 @@ type Node interface {
Graph() *Graph Graph() *Graph
SetGraph(graph *Graph) SetGraph(graph *Graph)
Env() *NodeEnv Env() *NodeEnv
InputStreams() *InputSlots[*StreamVar]
OutputStreams() *OutputSlots[*StreamVar]
InputValues() *InputSlots[*ValueVar]
OutputValues() *OutputSlots[*ValueVar]
InputStreams() *InputSlots
OutputStreams() *OutputSlots
InputValues() *InputSlots
OutputValues() *OutputSlots
GenerateOp() (exec.Op, error) GenerateOp() (exec.Op, error)
// String() string // String() string
} }


type VarSlots[T Var] []T
type VarSlots []*Var


func (s *VarSlots[T]) Len() int {
func (s *VarSlots) Len() int {
return len(*s) return len(*s)
} }


func (s *VarSlots[T]) Get(idx int) T {
func (s *VarSlots) Get(idx int) *Var {
return (*s)[idx] return (*s)[idx]
} }


func (s *VarSlots[T]) Set(idx int, val T) T {
func (s *VarSlots) Set(idx int, val *Var) *Var {
old := (*s)[idx] old := (*s)[idx]
(*s)[idx] = val (*s)[idx] = val
return old return old
} }


func (s *VarSlots[T]) Append(val T) int {
func (s *VarSlots) Append(val *Var) int {
*s = append(*s, val) *s = append(*s, val)
return s.Len() - 1 return s.Len() - 1
} }


func (s *VarSlots[T]) RemoveAt(idx int) {
func (s *VarSlots) RemoveAt(idx int) {
(*s) = lo2.RemoveAt(*s, idx) (*s) = lo2.RemoveAt(*s, idx)
} }


func (s *VarSlots[T]) Resize(size int) {
func (s *VarSlots) Resize(size int) {
if s.Len() < size { if s.Len() < size {
*s = append(*s, make([]T, size-s.Len())...)
*s = append(*s, make([]*Var, size-s.Len())...)
} else if s.Len() > size { } else if s.Len() > size {
*s = (*s)[:size] *s = (*s)[:size]
} }
} }


func (s *VarSlots[T]) SetRawArray(arr []T) {
func (s *VarSlots) SetRawArray(arr []*Var) {
*s = arr *s = arr
} }


func (s *VarSlots[T]) RawArray() []T {
func (s *VarSlots) RawArray() []*Var {
return *s return *s
} }


type InputSlots[T Var] struct {
VarSlots[T]
type InputSlots struct {
VarSlots
} }


func (s *InputSlots[T]) EnsureSize(cnt int) {
func (s *InputSlots) EnsureSize(cnt int) {
if s.Len() < cnt { if s.Len() < cnt {
s.VarSlots = append(s.VarSlots, make([]T, cnt-s.Len())...)
s.VarSlots = append(s.VarSlots, make([]*Var, cnt-s.Len())...)
} }
} }


func (s *InputSlots[T]) EnlargeOne() int {
var t T
s.Append(t)
func (s *InputSlots) EnlargeOne() int {
s.Append(nil)
return s.Len() - 1 return s.Len() - 1
} }


type OutputSlots[T Var] struct {
VarSlots[T]
type OutputSlots struct {
VarSlots
} }


func (s *OutputSlots[T]) Setup(my Node, v T, slotIdx int) {
func (s *OutputSlots) Setup(my Node, v *Var, slotIdx int) {
if s.Len() <= slotIdx { if s.Len() <= slotIdx {
s.VarSlots = append(s.VarSlots, make([]T, slotIdx-s.Len()+1)...)
s.VarSlots = append(s.VarSlots, make([]*Var, slotIdx-s.Len()+1)...)
} }


s.Set(slotIdx, v) s.Set(slotIdx, v)
@@ -131,7 +130,7 @@ func (s *OutputSlots[T]) Setup(my Node, v T, slotIdx int) {
} }
} }


func (s *OutputSlots[T]) SetupNew(my Node, v T) {
func (s *OutputSlots) SetupNew(my Node, v *Var) {
s.Append(v) s.Append(v)
*v.From() = EndPoint{ *v.From() = EndPoint{
Node: my, Node: my,
@@ -139,21 +138,17 @@ func (s *OutputSlots[T]) SetupNew(my Node, v T) {
} }
} }


type Slot[T Var] struct {
Var T
type Slot struct {
Var *Var
Index int Index int
} }


type StreamSlot = Slot[*StreamVar]

type ValueSlot = Slot[*ValueVar]

type NodeBase struct { type NodeBase struct {
env NodeEnv env NodeEnv
inputStreams InputSlots[*StreamVar]
outputStreams OutputSlots[*StreamVar]
inputValues InputSlots[*ValueVar]
outputValues OutputSlots[*ValueVar]
inputStreams InputSlots
outputStreams OutputSlots
inputValues InputSlots
outputValues OutputSlots
graph *Graph graph *Graph
} }


@@ -169,18 +164,18 @@ func (n *NodeBase) Env() *NodeEnv {
return &n.env return &n.env
} }


func (n *NodeBase) InputStreams() *InputSlots[*StreamVar] {
func (n *NodeBase) InputStreams() *InputSlots {
return &n.inputStreams return &n.inputStreams
} }


func (n *NodeBase) OutputStreams() *OutputSlots[*StreamVar] {
func (n *NodeBase) OutputStreams() *OutputSlots {
return &n.outputStreams return &n.outputStreams
} }


func (n *NodeBase) InputValues() *InputSlots[*ValueVar] {
func (n *NodeBase) InputValues() *InputSlots {
return &n.inputValues return &n.inputValues
} }


func (n *NodeBase) OutputValues() *OutputSlots[*ValueVar] {
func (n *NodeBase) OutputValues() *OutputSlots {
return &n.outputValues return &n.outputValues
} }

+ 11
- 50
pkgs/ioswitch/dag/var.go View File

@@ -5,12 +5,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/lo2"
) )


type Var interface {
ID() int
From() *EndPoint
To() *EndPointSlots
}

type EndPoint struct { type EndPoint struct {
Node Node Node Node
SlotIndex int // 所连接的Node的Output或Input数组的索引 SlotIndex int // 所连接的Node的Output或Input数组的索引
@@ -56,66 +50,33 @@ func (s *EndPointSlots) RawArray() []EndPoint {
return *s return *s
} }


type VarBase struct {
id int
from EndPoint
to EndPointSlots
}

func (v *VarBase) ID() int {
return v.id
type Var struct {
VarID exec.VarID
from EndPoint
to EndPointSlots
} }


func (v *VarBase) From() *EndPoint {
func (v *Var) From() *EndPoint {
return &v.from return &v.from
} }


func (v *VarBase) To() *EndPointSlots {
func (v *Var) To() *EndPointSlots {
return &v.to return &v.to
} }


type StreamVar struct {
VarBase
Var *exec.StreamVar
}

func (v *StreamVar) Connect(to Node, slotIdx int) {
func (v *Var) Connect(to Node, slotIdx int) {
v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx})
to.InputStreams().Set(slotIdx, v)
to.InputValues().Set(slotIdx, v)
} }


func (v *StreamVar) Disconnect(node Node, slotIdx int) {
func (v *Var) Disconnect(node Node, slotIdx int) {
v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx})
node.InputStreams().Set(slotIdx, nil)
node.InputValues().Set(slotIdx, nil)
} }


func (v *StreamVar) DisconnectAll() {
func (v *Var) DisconnectAll() {
for _, ed := range v.to { for _, ed := range v.to {
ed.Node.InputStreams().Set(ed.SlotIndex, nil) ed.Node.InputStreams().Set(ed.SlotIndex, nil)
} }
v.to = nil v.to = nil
} }

type ValueVarType int

const (
UnknownValueVar ValueVarType = iota
StringValueVar
SignalValueVar
)

type ValueVar struct {
VarBase
Type ValueVarType
Var exec.Var
}

func (v *ValueVar) Connect(to Node, slotIdx int) {
v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx})
to.InputValues().Set(slotIdx, v)
}

func (v *ValueVar) Disconnect(node Node, slotIdx int) {
v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx})
node.InputValues().Set(slotIdx, nil)
}

+ 9
- 10
pkgs/ioswitch/exec/driver.go View File

@@ -21,27 +21,25 @@ type Driver struct {


// 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。 // 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。
func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) { func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) {
handle.Var.Stream = io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)
e.driverExec.PutVars(handle.Var)
e.driverExec.PutVar(handle.ID, &StreamValue{Stream: io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)})
} }


// 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理 // 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理
func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) { func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) {
handle.Var.Stream = str
e.driverExec.PutVars(handle.Var)
e.driverExec.PutVar(handle.ID, &StreamValue{Stream: str})
} }


func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) { func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) {
err := e.driverExec.BindVars(e.ctx.Context, handle.Var)
str, err := BindVar[*StreamValue](e.driverExec, e.ctx.Context, handle.ID)
if err != nil { if err != nil {
return nil, fmt.Errorf("bind vars: %w", err) return nil, fmt.Errorf("bind vars: %w", err)
} }


return handle.Var.Stream, nil
return str.Stream, nil
} }


func (e *Driver) Signal(signal *DriverSignalVar) { func (e *Driver) Signal(signal *DriverSignalVar) {
e.driverExec.PutVars(signal.Var)
e.driverExec.PutVar(signal.ID, &SignalValue{})
} }


func (e *Driver) Wait(ctx context.Context) (map[string]any, error) { func (e *Driver) Wait(ctx context.Context) (map[string]any, error) {
@@ -99,14 +97,15 @@ func (e *Driver) stopWith(err error) {
} }


type DriverWriteStream struct { type DriverWriteStream struct {
Var *StreamVar
ID VarID
RangeHint *Range RangeHint *Range
} }


type DriverReadStream struct { type DriverReadStream struct {
Var *StreamVar
ID VarID
} }


type DriverSignalVar struct { type DriverSignalVar struct {
Var *SignalVar
ID VarID
Signal SignalValue
} }

+ 61
- 76
pkgs/ioswitch/exec/executor.go View File

@@ -10,16 +10,15 @@ import (
"gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/common/utils/sync2"
) )


type bindingVars struct {
Waittings []Var
Bindeds []Var
Callback *future.SetVoidFuture
type binding struct {
ID VarID
Callback *future.SetValueFuture[VarValue]
} }


type Executor struct { type Executor struct {
plan Plan plan Plan
vars map[VarID]Var vars map[VarID]Var
bindings []*bindingVars
bindings []*binding
lock sync.Mutex lock sync.Mutex
store map[string]any store map[string]any
} }
@@ -64,81 +63,44 @@ func (s *Executor) Run(ctx *ExecContext) (map[string]any, error) {
return s.store, nil return s.store, nil
} }


func (s *Executor) BindVars(ctx context.Context, vs ...Var) error {
func (s *Executor) BindVar(ctx context.Context, id VarID) (VarValue, error) {
s.lock.Lock() s.lock.Lock()


callback := future.NewSetVoid()
binding := &bindingVars{
Callback: callback,
}

for _, v := range vs {
v2 := s.vars[v.GetID()]
if v2 == nil {
binding.Waittings = append(binding.Waittings, v)
continue
}

if err := AssignVar(v2, v); err != nil {
s.lock.Unlock()
return fmt.Errorf("assign var %v to %v: %w", v2.GetID(), v.GetID(), err)
}

binding.Bindeds = append(binding.Bindeds, v)
}

if len(binding.Waittings) == 0 {
gv, ok := s.vars[id]
if ok {
delete(s.vars, id)
s.lock.Unlock() s.lock.Unlock()
return nil
return gv.Value, nil
} }


s.bindings = append(s.bindings, binding)
s.lock.Unlock()

err := callback.Wait(ctx)

s.lock.Lock()
defer s.lock.Unlock()

s.bindings = lo2.Remove(s.bindings, binding)
callback := future.NewSetValue[VarValue]()
s.bindings = append(s.bindings, &binding{
ID: id,
Callback: callback,
})


return err
s.lock.Unlock()
return callback.Wait(ctx)
} }


func (s *Executor) PutVars(vs ...Var) {
func (s *Executor) PutVar(id VarID, value VarValue) *Executor {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()


loop:
for _, v := range vs {
for ib, b := range s.bindings {
for iw, w := range b.Waittings {
if w.GetID() != v.GetID() {
continue
}

if err := AssignVar(v, w); err != nil {
b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err))
// 绑定类型不对,说明生成的执行计划有问题,怎么处理都可以,因为最终会执行失败
continue loop
}

b.Bindeds = append(b.Bindeds, w)
b.Waittings = lo2.RemoveAt(b.Waittings, iw)
if len(b.Waittings) == 0 {
b.Callback.SetVoid()
s.bindings = lo2.RemoveAt(s.bindings, ib)
}

// 绑定成功,继续最外层循环
continue loop
}

for ib, b := range s.bindings {
if b.ID != id {
continue
} }


// 如果没有绑定,则直接放入变量表中
s.vars[v.GetID()] = v
b.Callback.SetValue(value)
s.bindings = lo2.RemoveAt(s.bindings, ib)

return s
} }

// 如果没有绑定,则直接放入变量表中
s.vars[id] = Var{ID: id, Value: value}
return s
} }


func (s *Executor) Store(key string, val any) { func (s *Executor) Store(key string, val any) {
@@ -148,20 +110,43 @@ func (s *Executor) Store(key string, val any) {
s.store[key] = val s.store[key] = val
} }


func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error {
var vs2 []Var
for _, v := range vs {
vs2 = append(vs2, v)
func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) {
v, err := e.BindVar(ctx, id)
if err != nil {
var def T
return def, err
} }


return sw.BindVars(ctx, vs2...)
ret, ok := v.(T)
if !ok {
var def T
return def, fmt.Errorf("binded var %v is %T, not %T", id, v, def)
}

return ret, nil
} }


func PutArrayVars[T Var](sw *Executor, vs []T) {
var vs2 []Var
for _, v := range vs {
vs2 = append(vs2, v)
func BindArray[T VarValue](e *Executor, ctx context.Context, ids []VarID) ([]T, error) {
ret := make([]T, len(ids))
for i := range ids {
v, err := e.BindVar(ctx, ids[i])
if err != nil {
return nil, err
}

v2, ok := v.(T)
if !ok {
var def T
return nil, fmt.Errorf("binded var %v is %T, not %T", ids[i], v, def)
}

ret[i] = v2
} }
return ret, nil
}


sw.PutVars(vs2...)
func PutArray[T VarValue](e *Executor, ids []VarID, values []T) {
for i := range ids {
e.PutVar(ids[i], values[i])
}
} }

+ 6
- 33
pkgs/ioswitch/exec/plan_builder.go View File

@@ -9,13 +9,14 @@ import (
) )


type PlanBuilder struct { type PlanBuilder struct {
Vars []Var
NextVarID VarID
WorkerPlans []*WorkerPlanBuilder WorkerPlans []*WorkerPlanBuilder
DriverPlan DriverPlanBuilder DriverPlan DriverPlanBuilder
} }


func NewPlanBuilder() *PlanBuilder { func NewPlanBuilder() *PlanBuilder {
bld := &PlanBuilder{ bld := &PlanBuilder{
NextVarID: VarID(1),
DriverPlan: DriverPlanBuilder{}, DriverPlan: DriverPlanBuilder{},
} }


@@ -41,39 +42,11 @@ func (b *PlanBuilder) AtWorker(worker WorkerInfo) *WorkerPlanBuilder {
return p return p
} }


func (b *PlanBuilder) NewStreamVar() *StreamVar {
v := &StreamVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)

return v
}

func (b *PlanBuilder) NewIntVar() *IntVar {
v := &IntVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)

return v
}

func (b *PlanBuilder) NewStringVar() *StringVar {
v := &StringVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)

return v
}
func (b *PlanBuilder) NewSignalVar() *SignalVar {
v := &SignalVar{
ID: VarID(len(b.Vars)),
}
b.Vars = append(b.Vars, v)
func (b *PlanBuilder) NewVar() VarID {
id := b.NextVarID
b.NextVarID++


return v
return id
} }


func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver { func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver {


+ 0
- 21
pkgs/ioswitch/exec/utils.go View File

@@ -1,9 +1,6 @@
package exec package exec


import ( import (
"fmt"
"reflect"

"github.com/google/uuid" "github.com/google/uuid"
"gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/math2"
) )
@@ -12,24 +9,6 @@ func genRandomPlanID() PlanID {
return PlanID(uuid.NewString()) return PlanID(uuid.NewString())
} }


func AssignVar(from Var, to Var) error {
if reflect.TypeOf(from) != reflect.TypeOf(to) {
return fmt.Errorf("cannot assign %T to %T", from, to)
}

switch from := from.(type) {
case *StreamVar:
to.(*StreamVar).Stream = from.Stream
case *IntVar:
to.(*IntVar).Value = from.Value
case *StringVar:
to.(*StringVar).Value = from.Value
case *SignalVar:
}

return nil
}

type Range struct { type Range struct {
Offset int64 Offset int64
Length *int64 Length *int64


+ 55
- 28
pkgs/ioswitch/exec/var.go View File

@@ -4,54 +4,81 @@ import (
"io" "io"


"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


type VarID int type VarID int


type Var interface {
GetID() VarID
type Var struct {
ID VarID `json:"id"`
Value VarValue `json:"value"`
} }


var VarUnion = types.NewTypeUnion[Var](
(*IntVar)(nil),
(*StringVar)(nil),
(*SignalVar)(nil),
(*StreamVar)(nil),
)
var _ = serder.UseTypeUnionExternallyTagged(&VarUnion)
type VarPack[T VarValue] struct {
ID VarID `json:"id"`
Value T `json:"value"`
}


type StreamVar struct {
ID VarID `json:"id"`
Stream io.ReadCloser `json:"-"`
func (v *VarPack[T]) ToAny() AnyVar {
return AnyVar{
ID: v.ID,
Value: v.Value,
}
} }


func (v *StreamVar) GetID() VarID {
return v.ID
// 变量的值
type VarValue interface {
Clone() VarValue
} }


type IntVar struct {
ID VarID `json:"id"`
Value string `json:"value"`
var valueUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[VarValue](
(*StreamValue)(nil),
(*SignalValue)(nil),
(*StringValue)(nil),
)))

func UseVarValue[T VarValue]() {
valueUnion.Add(reflect2.TypeOf[T]())
} }


func (v *IntVar) GetID() VarID {
return v.ID
type AnyVar = VarPack[VarValue]

func V(id VarID, value VarValue) AnyVar {
return AnyVar{
ID: id,
Value: value,
}
} }


type StringVar struct {
ID VarID `json:"id"`
Value string `json:"value"`
type StreamValue struct {
Stream io.ReadCloser `json:"-"`
} }


func (v *StringVar) GetID() VarID {
return v.ID
// 不应该被调用
func (v *StreamValue) Clone() VarValue {
panic("StreamValue should not be cloned")
} }


type SignalVar struct {
ID VarID `json:"id"`
type SignalValue struct{}

func (o *SignalValue) Clone() VarValue {
return &SignalValue{}
}

type SignalVar = VarPack[*SignalValue]

func NewSignal(id VarID) SignalVar {
return SignalVar{
ID: id,
Value: &SignalValue{},
}
}

type StringValue struct {
Value string `json:"value"`
} }


func (v *SignalVar) GetID() VarID {
return v.ID
func (o *StringValue) Clone() VarValue {
return &StringValue{Value: o.Value}
} }

+ 7
- 4
pkgs/ioswitch/exec/worker.go View File

@@ -95,9 +95,12 @@ type WorkerInfo interface {


type WorkerClient interface { type WorkerClient interface {
ExecutePlan(ctx context.Context, plan Plan) error ExecutePlan(ctx context.Context, plan Plan) error
SendStream(ctx context.Context, planID PlanID, v *StreamVar, str io.ReadCloser) error
SendVar(ctx context.Context, planID PlanID, v Var) error
GetStream(ctx context.Context, planID PlanID, v *StreamVar, signal *SignalVar) (io.ReadCloser, error)
GetVar(ctx context.Context, planID PlanID, v Var, signal *SignalVar) error

SendStream(ctx context.Context, planID PlanID, id VarID, stream io.ReadCloser) error
SendVar(ctx context.Context, planID PlanID, id VarID, value VarValue) error

GetStream(ctx context.Context, planID PlanID, streamID VarID, signalID VarID, signal VarValue) (io.ReadCloser, error)
GetVar(ctx context.Context, planID PlanID, varID VarID, signalID VarID, signal VarValue) (VarValue, error)

Close() error Close() error
} }

+ 8
- 26
pkgs/ioswitch/plan/generate.go View File

@@ -1,8 +1,6 @@
package plan package plan


import ( import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
@@ -122,57 +120,41 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
for i := 0; i < node.OutputStreams().Len(); i++ { for i := 0; i < node.OutputStreams().Len(); i++ {
out := node.OutputStreams().Get(i) out := node.OutputStreams().Get(i)


if out.Var != nil {
if out.VarID > 0 {
continue continue
} }


out.Var = blder.NewStreamVar()
out.VarID = blder.NewVar()
} }


for i := 0; i < node.InputStreams().Len(); i++ { for i := 0; i < node.InputStreams().Len(); i++ {
in := node.InputStreams().Get(i) in := node.InputStreams().Get(i)


if in.Var != nil {
if in.VarID > 0 {
continue continue
} }


in.Var = blder.NewStreamVar()
in.VarID = blder.NewVar()
} }


for i := 0; i < node.OutputValues().Len(); i++ { for i := 0; i < node.OutputValues().Len(); i++ {
out := node.OutputValues().Get(i) out := node.OutputValues().Get(i)


if out.Var != nil {
if out.VarID > 0 {
continue continue
} }


switch out.Type {
case dag.StringValueVar:
out.Var = blder.NewStringVar()
case dag.SignalValueVar:
out.Var = blder.NewSignalVar()
default:
retErr = fmt.Errorf("unsupported value var type: %v", out.Type)
return false
}
out.VarID = blder.NewVar()
} }


for i := 0; i < node.InputValues().Len(); i++ { for i := 0; i < node.InputValues().Len(); i++ {
in := node.InputValues().Get(i) in := node.InputValues().Get(i)


if in.Var != nil {
if in.VarID > 0 {
continue continue
} }


switch in.Type {
case dag.StringValueVar:
in.Var = blder.NewStringVar()
case dag.SignalValueVar:
in.Var = blder.NewSignalVar()
default:
retErr = fmt.Errorf("unsupported value var type: %v", in.Type)
return false
}
in.VarID = blder.NewVar()
} }


op, err := node.GenerateOp() op, err := node.GenerateOp()


+ 8
- 8
pkgs/ioswitch/plan/ops/driver.go View File

@@ -16,20 +16,20 @@ func (b *GraphNodeBuilder) NewFromDriver(handle *exec.DriverWriteStream) *FromDr
} }
b.AddNode(node) b.AddNode(node)


node.OutputStreams().SetupNew(node, b.NewStreamVar())
node.OutputStreams().SetupNew(node, b.NewVar())


return node return node
} }


func (t *FromDriverNode) Output() dag.StreamSlot {
return dag.StreamSlot{
func (t *FromDriverNode) Output() dag.Slot {
return dag.Slot{
Var: t.OutputStreams().Get(0), Var: t.OutputStreams().Get(0),
Index: 0, Index: 0,
} }
} }


func (t *FromDriverNode) GenerateOp() (exec.Op, error) { func (t *FromDriverNode) GenerateOp() (exec.Op, error) {
t.Handle.Var = t.OutputStreams().Get(0).Var
t.Handle.ID = t.OutputStreams().Get(0).VarID
return nil, nil return nil, nil
} }


@@ -52,20 +52,20 @@ func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverN
return node return node
} }


func (t *ToDriverNode) SetInput(v *dag.StreamVar) {
func (t *ToDriverNode) SetInput(v *dag.Var) {
t.InputStreams().EnsureSize(1) t.InputStreams().EnsureSize(1)
v.Connect(t, 0) v.Connect(t, 0)
} }


func (t *ToDriverNode) Input() dag.StreamSlot {
return dag.StreamSlot{
func (t *ToDriverNode) Input() dag.Slot {
return dag.Slot{
Var: t.InputStreams().Get(0), Var: t.InputStreams().Get(0),
Index: 0, Index: 0,
} }
} }


func (t *ToDriverNode) GenerateOp() (exec.Op, error) { func (t *ToDriverNode) GenerateOp() (exec.Op, error) {
t.Handle.Var = t.InputStreams().Get(0).Var
t.Handle.ID = t.InputStreams().Get(0).VarID
return nil, nil return nil, nil
} }




+ 7
- 7
pkgs/ioswitch/plan/ops/drop.go View File

@@ -13,19 +13,19 @@ func init() {
} }


type DropStream struct { type DropStream struct {
Input *exec.StreamVar `json:"input"`
Input exec.VarID `json:"input"`
} }


func (o *DropStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *DropStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, o.Input)
str, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
if err != nil { if err != nil {
return err return err
} }
defer o.Input.Stream.Close()
defer str.Stream.Close()


for { for {
buf := make([]byte, 1024*8) buf := make([]byte, 1024*8)
_, err = o.Input.Stream.Read(buf)
_, err = str.Stream.Read(buf)
if err == io.EOF { if err == io.EOF {
return nil return nil
} }
@@ -36,7 +36,7 @@ func (o *DropStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
} }


func (o *DropStream) String() string { func (o *DropStream) String() string {
return fmt.Sprintf("DropStream %v", o.Input.ID)
return fmt.Sprintf("DropStream %v", o.Input)
} }


type DropNode struct { type DropNode struct {
@@ -49,14 +49,14 @@ func (b *GraphNodeBuilder) NewDropStream() *DropNode {
return node return node
} }


func (t *DropNode) SetInput(v *dag.StreamVar) {
func (t *DropNode) SetInput(v *dag.Var) {
t.InputStreams().EnsureSize(1) t.InputStreams().EnsureSize(1)
v.Connect(t, 0) v.Connect(t, 0)
} }


func (t *DropNode) GenerateOp() (exec.Op, error) { func (t *DropNode) GenerateOp() (exec.Op, error) {
return &DropStream{ return &DropStream{
Input: t.InputStreams().Get(0).Var,
Input: t.InputStreams().Get(0).VarID,
}, nil }, nil
} }




+ 49
- 50
pkgs/ioswitch/plan/ops/send.go View File

@@ -18,17 +18,17 @@ func init() {
} }


type SendStream struct { type SendStream struct {
Input *exec.StreamVar `json:"input"`
Send *exec.StreamVar `json:"send"`
Input exec.VarID `json:"input"`
Send exec.VarID `json:"send"`
Worker exec.WorkerInfo `json:"worker"` Worker exec.WorkerInfo `json:"worker"`
} }


func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, o.Input)
inputStr, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
if err != nil { if err != nil {
return err return err
} }
defer o.Input.Stream.Close()
defer inputStr.Stream.Close()


cli, err := o.Worker.NewClient() cli, err := o.Worker.NewClient()
if err != nil { if err != nil {
@@ -37,7 +37,7 @@ func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
defer cli.Close() defer cli.Close()


// 发送后流的ID不同 // 发送后流的ID不同
err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, o.Input.Stream)
err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, inputStr.Stream)
if err != nil { if err != nil {
return fmt.Errorf("sending stream: %w", err) return fmt.Errorf("sending stream: %w", err)
} }
@@ -46,14 +46,14 @@ func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
} }


func (o *SendStream) String() string { func (o *SendStream) String() string {
return fmt.Sprintf("SendStream %v->%v@%v", o.Input.ID, o.Send.ID, o.Worker)
return fmt.Sprintf("SendStream %v->%v@%v", o.Input, o.Send, o.Worker)
} }


type GetStream struct { type GetStream struct {
Signal *exec.SignalVar `json:"signal"`
Target *exec.StreamVar `json:"target"`
Output *exec.StreamVar `json:"output"`
Worker exec.WorkerInfo `json:"worker"`
Signal exec.VarPack[*exec.SignalValue] `json:"signal"`
Target exec.VarID `json:"target"`
Output exec.VarID `json:"output"`
Worker exec.WorkerInfo `json:"worker"`
} }


func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
@@ -63,33 +63,33 @@ func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
} }
defer cli.Close() defer cli.Close()


str, err := cli.GetStream(ctx.Context, e.Plan().ID, o.Target, o.Signal)
str, err := cli.GetStream(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value)
if err != nil { if err != nil {
return fmt.Errorf("getting stream: %w", err) return fmt.Errorf("getting stream: %w", err)
} }


fut := future.NewSetVoid() fut := future.NewSetVoid()
// 获取后送到本地的流ID是不同的 // 获取后送到本地的流ID是不同的
o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
str = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
fut.SetVoid() fut.SetVoid()
}) })
e.PutVars(o.Output)
e.PutVar(o.Output, &exec.StreamValue{Stream: str})


return fut.Wait(ctx.Context) return fut.Wait(ctx.Context)
} }


func (o *GetStream) String() string { func (o *GetStream) String() string {
return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output.ID, o.Signal.ID, o.Target.ID, o.Worker)
return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker)
} }


type SendVar struct { type SendVar struct {
Input exec.Var `json:"input"`
Send exec.Var `json:"send"`
Input exec.VarID `json:"input"`
Send exec.VarID `json:"send"`
Worker exec.WorkerInfo `json:"worker"` Worker exec.WorkerInfo `json:"worker"`
} }


func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, o.Input)
input, err := e.BindVar(ctx.Context, o.Input)
if err != nil { if err != nil {
return err return err
} }
@@ -100,8 +100,7 @@ func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
} }
defer cli.Close() defer cli.Close()


exec.AssignVar(o.Input, o.Send)
err = cli.SendVar(ctx.Context, e.Plan().ID, o.Send)
err = cli.SendVar(ctx.Context, e.Plan().ID, o.Send, input)
if err != nil { if err != nil {
return fmt.Errorf("sending var: %w", err) return fmt.Errorf("sending var: %w", err)
} }
@@ -110,14 +109,14 @@ func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
} }


func (o *SendVar) String() string { func (o *SendVar) String() string {
return fmt.Sprintf("SendVar %v->%v@%v", o.Input.GetID(), o.Send.GetID(), o.Worker)
return fmt.Sprintf("SendVar %v->%v@%v", o.Input, o.Send, o.Worker)
} }


type GetVar struct { type GetVar struct {
Signal *exec.SignalVar `json:"signal"`
Target exec.Var `json:"target"`
Output exec.Var `json:"output"`
Worker exec.WorkerInfo `json:"worker"`
Signal exec.VarPack[*exec.SignalValue] `json:"signal"`
Target exec.VarID `json:"target"`
Output exec.VarID `json:"output"`
Worker exec.WorkerInfo `json:"worker"`
} }


func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
@@ -127,18 +126,18 @@ func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
} }
defer cli.Close() defer cli.Close()


err = cli.GetVar(ctx.Context, e.Plan().ID, o.Target, o.Signal)
get, err := cli.GetVar(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value)
if err != nil { if err != nil {
return fmt.Errorf("getting var: %w", err) return fmt.Errorf("getting var: %w", err)
} }
exec.AssignVar(o.Target, o.Output)
e.PutVars(o.Output)
e.PutVar(o.Output, get)


return nil return nil
} }


func (o *GetVar) String() string { func (o *GetVar) String() string {
return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output.GetID(), o.Signal.ID, o.Target.GetID(), o.Worker)
return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker)
} }


type SendStreamNode struct { type SendStreamNode struct {
@@ -154,18 +153,18 @@ func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode {
return node return node
} }


func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar {
func (t *SendStreamNode) Send(v *dag.Var) *dag.Var {
t.InputStreams().EnsureSize(1) t.InputStreams().EnsureSize(1)
v.Connect(t, 0) v.Connect(t, 0)
output := t.Graph().NewStreamVar()
output := t.Graph().NewVar()
t.OutputStreams().Setup(t, output, 0) t.OutputStreams().Setup(t, output, 0)
return output return output
} }


func (t *SendStreamNode) GenerateOp() (exec.Op, error) { func (t *SendStreamNode) GenerateOp() (exec.Op, error) {
return &SendStream{ return &SendStream{
Input: t.InputStreams().Get(0).Var,
Send: t.OutputStreams().Get(0).Var,
Input: t.InputStreams().Get(0).VarID,
Send: t.OutputStreams().Get(0).VarID,
Worker: t.ToWorker, Worker: t.ToWorker,
}, nil }, nil
} }
@@ -187,18 +186,18 @@ func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode {
return node return node
} }


func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar {
func (t *SendValueNode) Send(v *dag.Var) *dag.Var {
t.InputValues().EnsureSize(1) t.InputValues().EnsureSize(1)
v.Connect(t, 0) v.Connect(t, 0)
output := t.Graph().NewValueVar(v.Type)
output := t.Graph().NewVar()
t.OutputValues().Setup(t, output, 0) t.OutputValues().Setup(t, output, 0)
return output return output
} }


func (t *SendValueNode) GenerateOp() (exec.Op, error) { func (t *SendValueNode) GenerateOp() (exec.Op, error) {
return &SendVar{ return &SendVar{
Input: t.InputValues().Get(0).Var,
Send: t.OutputValues().Get(0).Var,
Input: t.InputValues().Get(0).VarID,
Send: t.OutputValues().Get(0).VarID,
Worker: t.ToWorker, Worker: t.ToWorker,
}, nil }, nil
} }
@@ -217,27 +216,27 @@ func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode {
FromWorker: from, FromWorker: from,
} }
b.AddNode(node) b.AddNode(node)
node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0)
node.OutputValues().Setup(node, node.Graph().NewVar(), 0)
return node return node
} }


func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar {
func (t *GetStreamNode) Get(v *dag.Var) *dag.Var {
t.InputStreams().EnsureSize(1) t.InputStreams().EnsureSize(1)
v.Connect(t, 0) v.Connect(t, 0)
output := t.Graph().NewStreamVar()
output := t.Graph().NewVar()
t.OutputStreams().Setup(t, output, 0) t.OutputStreams().Setup(t, output, 0)
return output return output
} }


func (t *GetStreamNode) SignalVar() *dag.ValueVar {
func (t *GetStreamNode) SignalVar() *dag.Var {
return t.OutputValues().Get(0) return t.OutputValues().Get(0)
} }


func (t *GetStreamNode) GenerateOp() (exec.Op, error) { func (t *GetStreamNode) GenerateOp() (exec.Op, error) {
return &GetStream{ return &GetStream{
Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar),
Output: t.OutputStreams().Get(0).Var,
Target: t.InputStreams().Get(0).Var,
Signal: exec.NewSignal(t.OutputValues().Get(0).VarID),
Output: t.OutputStreams().Get(0).VarID,
Target: t.InputStreams().Get(0).VarID,
Worker: t.FromWorker, Worker: t.FromWorker,
}, nil }, nil
} }
@@ -256,27 +255,27 @@ func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode {
FromWorker: from, FromWorker: from,
} }
b.AddNode(node) b.AddNode(node)
node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0)
node.OutputValues().Setup(node, node.Graph().NewVar(), 0)
return node return node
} }


func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar {
func (t *GetValueNode) Get(v *dag.Var) *dag.Var {
t.InputValues().EnsureSize(1) t.InputValues().EnsureSize(1)
v.Connect(t, 0) v.Connect(t, 0)
output := t.Graph().NewValueVar(v.Type)
output := t.Graph().NewVar()
t.OutputValues().Setup(t, output, 1) t.OutputValues().Setup(t, output, 1)
return output return output
} }


func (t *GetValueNode) SignalVar() *dag.ValueVar {
func (t *GetValueNode) SignalVar() *dag.Var {
return t.OutputValues().Get(0) return t.OutputValues().Get(0)
} }


func (t *GetValueNode) GenerateOp() (exec.Op, error) { func (t *GetValueNode) GenerateOp() (exec.Op, error) {
return &GetVar{ return &GetVar{
Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar),
Output: t.OutputValues().Get(1).Var,
Target: t.InputValues().Get(0).Var,
Signal: exec.NewSignal(t.OutputValues().Get(0).VarID),
Output: t.OutputValues().Get(1).VarID,
Target: t.InputValues().Get(0).VarID,
Worker: t.FromWorker, Worker: t.FromWorker,
}, nil }, nil
} }


+ 6
- 12
pkgs/ioswitch/plan/ops/store.go View File

@@ -8,28 +8,22 @@ import (
) )


type Store struct { type Store struct {
Var exec.Var
Var exec.VarID
Key string Key string
} }


func (o *Store) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *Store) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, o.Var)
v, err := e.BindVar(ctx.Context, o.Var)
if err != nil { if err != nil {
return err return err
} }


switch v := o.Var.(type) {
case *exec.IntVar:
e.Store(o.Key, v.Value)
case *exec.StringVar:
e.Store(o.Key, v.Value)
}

e.Store(o.Key, v)
return nil return nil
} }


func (o *Store) String() string { func (o *Store) String() string {
return fmt.Sprintf("Store %v: %v", o.Key, o.Var.GetID())
return fmt.Sprintf("Store %v: %v", o.Key, o.Var)
} }


type StoreNode struct { type StoreNode struct {
@@ -43,7 +37,7 @@ func (b *GraphNodeBuilder) NewStore() *StoreNode {
return node return node
} }


func (t *StoreNode) Store(key string, v *dag.ValueVar) {
func (t *StoreNode) Store(key string, v *dag.Var) {
t.Key = key t.Key = key
t.InputValues().EnsureSize(1) t.InputValues().EnsureSize(1)
v.Connect(t, 0) v.Connect(t, 0)
@@ -51,7 +45,7 @@ func (t *StoreNode) Store(key string, v *dag.ValueVar) {


func (t *StoreNode) GenerateOp() (exec.Op, error) { func (t *StoreNode) GenerateOp() (exec.Op, error) {
return &Store{ return &Store{
Var: t.InputValues().Get(0).Var,
Var: t.InputValues().Get(0).VarID,
Key: t.Key, Key: t.Key,
}, nil }, nil
} }


+ 43
- 47
pkgs/ioswitch/plan/ops/sync.go View File

@@ -19,30 +19,29 @@ func init() {
} }


type OnStreamBegin struct { type OnStreamBegin struct {
Raw *exec.StreamVar `json:"raw"`
New *exec.StreamVar `json:"new"`
Signal *exec.SignalVar `json:"signal"`
Raw exec.VarID `json:"raw"`
New exec.VarID `json:"new"`
Signal exec.SignalVar `json:"signal"`
} }


func (o *OnStreamBegin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *OnStreamBegin) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, o.Raw)
raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw)
if err != nil { if err != nil {
return err return err
} }


o.New.Stream = o.Raw.Stream

e.PutVars(o.New, o.Signal)
e.PutVar(o.New, &exec.StreamValue{Stream: raw.Stream}).
PutVar(o.Signal.ID, o.Signal.Value)
return nil return nil
} }


func (o *OnStreamBegin) String() string { func (o *OnStreamBegin) String() string {
return fmt.Sprintf("OnStreamBegin %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID)
return fmt.Sprintf("OnStreamBegin %v->%v S:%v", o.Raw, o.New, o.Signal.ID)
} }


type OnStreamEnd struct { type OnStreamEnd struct {
Raw *exec.StreamVar `json:"raw"`
New *exec.StreamVar `json:"new"`
Raw exec.VarID `json:"raw"`
New exec.VarID `json:"new"`
Signal *exec.SignalVar `json:"signal"` Signal *exec.SignalVar `json:"signal"`
} }


@@ -67,57 +66,49 @@ func (o *onStreamEnd) Close() error {
} }


func (o *OnStreamEnd) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *OnStreamEnd) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, o.Raw)
raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw)
if err != nil { if err != nil {
return err return err
} }


cb := future.NewSetVoid() cb := future.NewSetVoid()


o.New.Stream = &onStreamEnd{
inner: o.Raw.Stream,
e.PutVar(o.New, &exec.StreamValue{Stream: &onStreamEnd{
inner: raw.Stream,
callback: cb, callback: cb,
}
e.PutVars(o.New)
}})


err = cb.Wait(ctx.Context) err = cb.Wait(ctx.Context)
if err != nil { if err != nil {
return err return err
} }


e.PutVars(o.Signal)
e.PutVar(o.Signal.ID, o.Signal.Value)
return nil return nil
} }


func (o *OnStreamEnd) String() string { func (o *OnStreamEnd) String() string {
return fmt.Sprintf("OnStreamEnd %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID)
return fmt.Sprintf("OnStreamEnd %v->%v S:%v", o.Raw, o.New, o.Signal.ID)
} }


type HoldUntil struct { type HoldUntil struct {
Waits []*exec.SignalVar `json:"waits"`
Holds []exec.Var `json:"holds"`
Emits []exec.Var `json:"emits"`
Waits []exec.VarID `json:"waits"`
Holds []exec.VarID `json:"holds"`
Emits []exec.VarID `json:"emits"`
} }


func (w *HoldUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (w *HoldUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, w.Holds...)
holds, err := exec.BindArray[exec.VarValue](e, ctx.Context, w.Holds)
if err != nil { if err != nil {
return err return err
} }


err = exec.BindArrayVars(e, ctx.Context, w.Waits)
_, err = exec.BindArray[exec.VarValue](e, ctx.Context, w.Waits)
if err != nil { if err != nil {
return err return err
} }


for i := 0; i < len(w.Holds); i++ {
err := exec.AssignVar(w.Holds[i], w.Emits[i])
if err != nil {
return err
}
}

e.PutVars(w.Emits...)
exec.PutArray(e, w.Emits, holds)
return nil return nil
} }


@@ -126,12 +117,12 @@ func (w *HoldUntil) String() string {
} }


type HangUntil struct { type HangUntil struct {
Waits []*exec.SignalVar `json:"waits"`
Op exec.Op `json:"op"`
Waits []exec.VarID `json:"waits"`
Op exec.Op `json:"op"`
} }


func (h *HangUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (h *HangUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := exec.BindArrayVars(e, ctx.Context, h.Waits)
_, err := exec.BindArray[exec.VarValue](e, ctx.Context, h.Waits)
if err != nil { if err != nil {
return err return err
} }
@@ -144,17 +135,22 @@ func (h *HangUntil) String() string {
} }


type Broadcast struct { type Broadcast struct {
Source *exec.SignalVar `json:"source"`
Targets []*exec.SignalVar `json:"targets"`
Source exec.VarID `json:"source"`
Targets []exec.VarID `json:"targets"`
} }


func (b *Broadcast) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (b *Broadcast) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
err := e.BindVars(ctx.Context, b.Source)
src, err := exec.BindVar[*exec.SignalValue](e, ctx.Context, b.Source)
if err != nil { if err != nil {
return err return err
} }


exec.PutArrayVars(e, b.Targets)
targets := make([]exec.VarValue, len(b.Targets))
for i := 0; i < len(b.Targets); i++ {
targets[i] = src.Clone()
}

exec.PutArray(e, b.Targets, targets)
return nil return nil
} }


@@ -172,38 +168,38 @@ func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode {
return node return node
} }


func (t *HoldUntilNode) SetSignal(s *dag.ValueVar) {
func (t *HoldUntilNode) SetSignal(s *dag.Var) {
t.InputValues().EnsureSize(1) t.InputValues().EnsureSize(1)
s.Connect(t, 0) s.Connect(t, 0)
} }


func (t *HoldUntilNode) HoldStream(str *dag.StreamVar) *dag.StreamVar {
func (t *HoldUntilNode) HoldStream(str *dag.Var) *dag.Var {
str.Connect(t, t.InputStreams().EnlargeOne()) str.Connect(t, t.InputStreams().EnlargeOne())
output := t.Graph().NewStreamVar()
output := t.Graph().NewVar()
t.OutputStreams().SetupNew(t, output) t.OutputStreams().SetupNew(t, output)
return output return output
} }


func (t *HoldUntilNode) HoldVar(v *dag.ValueVar) *dag.ValueVar {
func (t *HoldUntilNode) HoldVar(v *dag.Var) *dag.Var {
v.Connect(t, t.InputValues().EnlargeOne()) v.Connect(t, t.InputValues().EnlargeOne())
output := t.Graph().NewValueVar(v.Type)
output := t.Graph().NewVar()
t.OutputValues().SetupNew(t, output) t.OutputValues().SetupNew(t, output)
return output return output
} }


func (t *HoldUntilNode) GenerateOp() (exec.Op, error) { func (t *HoldUntilNode) GenerateOp() (exec.Op, error) {
o := &HoldUntil{ o := &HoldUntil{
Waits: []*exec.SignalVar{t.InputValues().Get(0).Var.(*exec.SignalVar)},
Waits: []exec.VarID{t.InputValues().Get(0).VarID},
} }


for i := 0; i < t.OutputValues().Len(); i++ { for i := 0; i < t.OutputValues().Len(); i++ {
o.Holds = append(o.Holds, t.InputValues().Get(i+1).Var)
o.Emits = append(o.Emits, t.OutputValues().Get(i).Var)
o.Holds = append(o.Holds, t.InputValues().Get(i+1).VarID)
o.Emits = append(o.Emits, t.OutputValues().Get(i).VarID)
} }


for i := 0; i < t.OutputStreams().Len(); i++ { for i := 0; i < t.OutputStreams().Len(); i++ {
o.Holds = append(o.Holds, t.InputStreams().Get(i).Var)
o.Emits = append(o.Emits, t.OutputStreams().Get(i).Var)
o.Holds = append(o.Holds, t.InputStreams().Get(i).VarID)
o.Emits = append(o.Emits, t.OutputStreams().Get(i).VarID)
} }


return o, nil return o, nil


+ 3
- 2
pkgs/ioswitch/plan/ops/var.go View File

@@ -9,11 +9,12 @@ func init() {
} }


type ConstVar struct { type ConstVar struct {
Var *exec.StringVar `json:"var"`
ID exec.VarID `json:"id"`
Value exec.VarValue `json:"value"`
} }


func (o *ConstVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { func (o *ConstVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
e.PutVars(o.Var)
e.PutVar(o.ID, o.Value)
return nil return nil
} }




+ 2
- 2
pkgs/ioswitch/utils/utils.go View File

@@ -7,10 +7,10 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
) )


func FormatVarIDs[T exec.Var](arr []T) string {
func FormatVarIDs(arr []exec.VarID) string {
sb := strings.Builder{} sb := strings.Builder{}
for i, v := range arr { for i, v := range arr {
sb.WriteString(fmt.Sprintf("%v", v.GetID()))
sb.WriteString(fmt.Sprintf("%v", v))
if i < len(arr)-1 { if i < len(arr)-1 {
sb.WriteString(",") sb.WriteString(",")
} }


+ 32
- 23
sdks/storage/cdsapi/hub_io.go View File

@@ -15,24 +15,28 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


// TODO2 重新梳理代码

const GetStreamPath = "/hubIO/getStream" const GetStreamPath = "/hubIO/getStream"


type GetStreamReq struct { type GetStreamReq struct {
PlanID exec.PlanID `json:"planID"`
VarID exec.VarID `json:"varID"`
Signal *exec.SignalVar `json:"signal"`
PlanID exec.PlanID `json:"planID"`
VarID exec.VarID `json:"varID"`
SignalID exec.VarID `json:"signalID"`
Signal exec.VarValue `json:"signal"`
} }


func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) {
func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) {
targetUrl, err := url.JoinPath(c.baseURL, GetStreamPath) targetUrl, err := url.JoinPath(c.baseURL, GetStreamPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }


req := &GetStreamReq{ req := &GetStreamReq{
PlanID: planID,
VarID: id,
Signal: signal,
PlanID: planID,
VarID: id,
SignalID: signalID,
Signal: signal,
} }


resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ resp, err := http2.GetJSON(targetUrl, http2.RequestParam{
@@ -154,19 +158,21 @@ func (c *Client) ExecuteIOPlan(plan exec.Plan) error {
const SendVarPath = "/hubIO/sendVar" const SendVarPath = "/hubIO/sendVar"


type SendVarReq struct { type SendVarReq struct {
PlanID exec.PlanID `json:"planID"`
Var exec.Var `json:"var"`
PlanID exec.PlanID `json:"planID"`
VarID exec.VarID `json:"varID"`
VarValue exec.VarValue `json:"varValue"`
} }


func (c *Client) SendVar(id exec.PlanID, v exec.Var) error {
func (c *Client) SendVar(planID exec.PlanID, id exec.VarID, value exec.VarValue) error {
targetUrl, err := url.JoinPath(c.baseURL, SendVarPath) targetUrl, err := url.JoinPath(c.baseURL, SendVarPath)
if err != nil { if err != nil {
return err return err
} }


req := &SendVarReq{ req := &SendVarReq{
PlanID: id,
Var: v,
PlanID: planID,
VarID: id,
VarValue: value,
} }


resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
@@ -196,35 +202,38 @@ func (c *Client) SendVar(id exec.PlanID, v exec.Var) error {
const GetVarPath = "/hubIO/getVar" const GetVarPath = "/hubIO/getVar"


type GetVarReq struct { type GetVarReq struct {
PlanID exec.PlanID `json:"planID"`
Var exec.Var `json:"var"`
Signal *exec.SignalVar `json:"signal"`
PlanID exec.PlanID `json:"planID"`
VarID exec.VarID `json:"varID"`
SignalID exec.VarID `json:"signalID"`
Signal exec.VarValue `json:"signal"`
} }


func (c *Client) GetVar(id exec.PlanID, v exec.Var, signal *exec.SignalVar) error {
func (c *Client) GetVar(id exec.PlanID, varID exec.VarID, singalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) {
targetUrl, err := url.JoinPath(c.baseURL, GetVarPath) targetUrl, err := url.JoinPath(c.baseURL, GetVarPath)
if err != nil { if err != nil {
return err
return nil, err
} }


req := &GetVarReq{ req := &GetVarReq{
PlanID: id,
Var: v,
Signal: signal,
PlanID: id,
VarID: varID,
SignalID: singalID,
Signal: signal,
} }


resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ resp, err := http2.GetJSON(targetUrl, http2.RequestParam{
Body: req, Body: req,
}) })
if err != nil { if err != nil {
return err
return nil, err
} }


body, _ := io.ReadAll(resp.Body)

if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
// 读取错误信息 // 读取错误信息
body, _ := io.ReadAll(resp.Body)
resp.Body.Close() resp.Body.Close()
return fmt.Errorf("error response from server: %s", string(body))
return nil, fmt.Errorf("error response from server: %s", string(body))
} }


return nil return nil


Loading…
Cancel
Save