diff --git a/pkgs/ioswitch/dag/graph.go b/pkgs/ioswitch/dag/graph.go index 5456250..d73a8e4 100644 --- a/pkgs/ioswitch/dag/graph.go +++ b/pkgs/ioswitch/dag/graph.go @@ -7,7 +7,6 @@ import ( type Graph struct { Nodes []Node isWalking bool - nextVarID int } func NewGraph() *Graph { @@ -48,28 +47,8 @@ func (g *Graph) Walk(cb func(node Node) bool) { g.Nodes = lo2.RemoveAllDefault(g.Nodes) } -func (g *Graph) NewStreamVar() *StreamVar { - str := &StreamVar{ - VarBase: VarBase{ - id: g.genVarID(), - }, - } - return str -} - -func (g *Graph) NewValueVar(valType ValueVarType) *ValueVar { - val := &ValueVar{ - VarBase: VarBase{ - id: g.genVarID(), - }, - Type: valType, - } - return val -} - -func (g *Graph) genVarID() int { - g.nextVarID++ - return g.nextVarID +func (g *Graph) NewVar() *Var { + return &Var{} } func AddNode[N Node](graph *Graph, typ N) N { diff --git a/pkgs/ioswitch/dag/node.go b/pkgs/ioswitch/dag/node.go index c547011..198cc6f 100644 --- a/pkgs/ioswitch/dag/node.go +++ b/pkgs/ioswitch/dag/node.go @@ -50,78 +50,77 @@ type Node interface { Graph() *Graph SetGraph(graph *Graph) Env() *NodeEnv - InputStreams() *InputSlots[*StreamVar] - OutputStreams() *OutputSlots[*StreamVar] - InputValues() *InputSlots[*ValueVar] - OutputValues() *OutputSlots[*ValueVar] + InputStreams() *InputSlots + OutputStreams() *OutputSlots + InputValues() *InputSlots + OutputValues() *OutputSlots GenerateOp() (exec.Op, error) // String() string } -type VarSlots[T Var] []T +type VarSlots []*Var -func (s *VarSlots[T]) Len() int { +func (s *VarSlots) Len() int { return len(*s) } -func (s *VarSlots[T]) Get(idx int) T { +func (s *VarSlots) Get(idx int) *Var { return (*s)[idx] } -func (s *VarSlots[T]) Set(idx int, val T) T { +func (s *VarSlots) Set(idx int, val *Var) *Var { old := (*s)[idx] (*s)[idx] = val return old } -func (s *VarSlots[T]) Append(val T) int { +func (s *VarSlots) Append(val *Var) int { *s = append(*s, val) return s.Len() - 1 } -func (s *VarSlots[T]) RemoveAt(idx int) { +func (s *VarSlots) RemoveAt(idx int) { (*s) = lo2.RemoveAt(*s, idx) } -func (s *VarSlots[T]) Resize(size int) { +func (s *VarSlots) Resize(size int) { if s.Len() < size { - *s = append(*s, make([]T, size-s.Len())...) + *s = append(*s, make([]*Var, size-s.Len())...) } else if s.Len() > size { *s = (*s)[:size] } } -func (s *VarSlots[T]) SetRawArray(arr []T) { +func (s *VarSlots) SetRawArray(arr []*Var) { *s = arr } -func (s *VarSlots[T]) RawArray() []T { +func (s *VarSlots) RawArray() []*Var { return *s } -type InputSlots[T Var] struct { - VarSlots[T] +type InputSlots struct { + VarSlots } -func (s *InputSlots[T]) EnsureSize(cnt int) { +func (s *InputSlots) EnsureSize(cnt int) { if s.Len() < cnt { - s.VarSlots = append(s.VarSlots, make([]T, cnt-s.Len())...) + s.VarSlots = append(s.VarSlots, make([]*Var, cnt-s.Len())...) } } -func (s *InputSlots[T]) EnlargeOne() int { - var t T - s.Append(t) +func (s *InputSlots) EnlargeOne() int { + s.Append(nil) return s.Len() - 1 } -type OutputSlots[T Var] struct { - VarSlots[T] +type OutputSlots struct { + VarSlots } -func (s *OutputSlots[T]) Setup(my Node, v T, slotIdx int) { +func (s *OutputSlots) Setup(my Node, v *Var, slotIdx int) { if s.Len() <= slotIdx { - s.VarSlots = append(s.VarSlots, make([]T, slotIdx-s.Len()+1)...) + s.VarSlots = append(s.VarSlots, make([]*Var, slotIdx-s.Len()+1)...) } s.Set(slotIdx, v) @@ -131,7 +130,7 @@ func (s *OutputSlots[T]) Setup(my Node, v T, slotIdx int) { } } -func (s *OutputSlots[T]) SetupNew(my Node, v T) { +func (s *OutputSlots) SetupNew(my Node, v *Var) { s.Append(v) *v.From() = EndPoint{ Node: my, @@ -139,21 +138,17 @@ func (s *OutputSlots[T]) SetupNew(my Node, v T) { } } -type Slot[T Var] struct { - Var T +type Slot struct { + Var *Var Index int } -type StreamSlot = Slot[*StreamVar] - -type ValueSlot = Slot[*ValueVar] - type NodeBase struct { env NodeEnv - inputStreams InputSlots[*StreamVar] - outputStreams OutputSlots[*StreamVar] - inputValues InputSlots[*ValueVar] - outputValues OutputSlots[*ValueVar] + inputStreams InputSlots + outputStreams OutputSlots + inputValues InputSlots + outputValues OutputSlots graph *Graph } @@ -169,18 +164,18 @@ func (n *NodeBase) Env() *NodeEnv { return &n.env } -func (n *NodeBase) InputStreams() *InputSlots[*StreamVar] { +func (n *NodeBase) InputStreams() *InputSlots { return &n.inputStreams } -func (n *NodeBase) OutputStreams() *OutputSlots[*StreamVar] { +func (n *NodeBase) OutputStreams() *OutputSlots { return &n.outputStreams } -func (n *NodeBase) InputValues() *InputSlots[*ValueVar] { +func (n *NodeBase) InputValues() *InputSlots { return &n.inputValues } -func (n *NodeBase) OutputValues() *OutputSlots[*ValueVar] { +func (n *NodeBase) OutputValues() *OutputSlots { return &n.outputValues } diff --git a/pkgs/ioswitch/dag/var.go b/pkgs/ioswitch/dag/var.go index 325bdb1..af74643 100644 --- a/pkgs/ioswitch/dag/var.go +++ b/pkgs/ioswitch/dag/var.go @@ -5,12 +5,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/lo2" ) -type Var interface { - ID() int - From() *EndPoint - To() *EndPointSlots -} - type EndPoint struct { Node Node SlotIndex int // 所连接的Node的Output或Input数组的索引 @@ -56,66 +50,33 @@ func (s *EndPointSlots) RawArray() []EndPoint { return *s } -type VarBase struct { - id int - from EndPoint - to EndPointSlots -} - -func (v *VarBase) ID() int { - return v.id +type Var struct { + VarID exec.VarID + from EndPoint + to EndPointSlots } -func (v *VarBase) From() *EndPoint { +func (v *Var) From() *EndPoint { return &v.from } -func (v *VarBase) To() *EndPointSlots { +func (v *Var) To() *EndPointSlots { return &v.to } -type StreamVar struct { - VarBase - Var *exec.StreamVar -} - -func (v *StreamVar) Connect(to Node, slotIdx int) { +func (v *Var) Connect(to Node, slotIdx int) { v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) - to.InputStreams().Set(slotIdx, v) + to.InputValues().Set(slotIdx, v) } -func (v *StreamVar) Disconnect(node Node, slotIdx int) { +func (v *Var) Disconnect(node Node, slotIdx int) { v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) - node.InputStreams().Set(slotIdx, nil) + node.InputValues().Set(slotIdx, nil) } -func (v *StreamVar) DisconnectAll() { +func (v *Var) DisconnectAll() { for _, ed := range v.to { ed.Node.InputStreams().Set(ed.SlotIndex, nil) } v.to = nil } - -type ValueVarType int - -const ( - UnknownValueVar ValueVarType = iota - StringValueVar - SignalValueVar -) - -type ValueVar struct { - VarBase - Type ValueVarType - Var exec.Var -} - -func (v *ValueVar) Connect(to Node, slotIdx int) { - v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) - to.InputValues().Set(slotIdx, v) -} - -func (v *ValueVar) Disconnect(node Node, slotIdx int) { - v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) - node.InputValues().Set(slotIdx, nil) -} diff --git a/pkgs/ioswitch/exec/driver.go b/pkgs/ioswitch/exec/driver.go index 088334e..06cb066 100644 --- a/pkgs/ioswitch/exec/driver.go +++ b/pkgs/ioswitch/exec/driver.go @@ -21,27 +21,25 @@ type Driver struct { // 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。 func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) { - handle.Var.Stream = io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length) - e.driverExec.PutVars(handle.Var) + e.driverExec.PutVar(handle.ID, &StreamValue{Stream: io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)}) } // 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理 func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) { - handle.Var.Stream = str - e.driverExec.PutVars(handle.Var) + e.driverExec.PutVar(handle.ID, &StreamValue{Stream: str}) } func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) { - err := e.driverExec.BindVars(e.ctx.Context, handle.Var) + str, err := BindVar[*StreamValue](e.driverExec, e.ctx.Context, handle.ID) if err != nil { return nil, fmt.Errorf("bind vars: %w", err) } - return handle.Var.Stream, nil + return str.Stream, nil } func (e *Driver) Signal(signal *DriverSignalVar) { - e.driverExec.PutVars(signal.Var) + e.driverExec.PutVar(signal.ID, &SignalValue{}) } func (e *Driver) Wait(ctx context.Context) (map[string]any, error) { @@ -99,14 +97,15 @@ func (e *Driver) stopWith(err error) { } type DriverWriteStream struct { - Var *StreamVar + ID VarID RangeHint *Range } type DriverReadStream struct { - Var *StreamVar + ID VarID } type DriverSignalVar struct { - Var *SignalVar + ID VarID + Signal SignalValue } diff --git a/pkgs/ioswitch/exec/executor.go b/pkgs/ioswitch/exec/executor.go index c4c36aa..6135533 100644 --- a/pkgs/ioswitch/exec/executor.go +++ b/pkgs/ioswitch/exec/executor.go @@ -10,16 +10,15 @@ import ( "gitlink.org.cn/cloudream/common/utils/sync2" ) -type bindingVars struct { - Waittings []Var - Bindeds []Var - Callback *future.SetVoidFuture +type binding struct { + ID VarID + Callback *future.SetValueFuture[VarValue] } type Executor struct { plan Plan vars map[VarID]Var - bindings []*bindingVars + bindings []*binding lock sync.Mutex store map[string]any } @@ -64,81 +63,44 @@ func (s *Executor) Run(ctx *ExecContext) (map[string]any, error) { return s.store, nil } -func (s *Executor) BindVars(ctx context.Context, vs ...Var) error { +func (s *Executor) BindVar(ctx context.Context, id VarID) (VarValue, error) { s.lock.Lock() - callback := future.NewSetVoid() - binding := &bindingVars{ - Callback: callback, - } - - for _, v := range vs { - v2 := s.vars[v.GetID()] - if v2 == nil { - binding.Waittings = append(binding.Waittings, v) - continue - } - - if err := AssignVar(v2, v); err != nil { - s.lock.Unlock() - return fmt.Errorf("assign var %v to %v: %w", v2.GetID(), v.GetID(), err) - } - - binding.Bindeds = append(binding.Bindeds, v) - } - - if len(binding.Waittings) == 0 { + gv, ok := s.vars[id] + if ok { + delete(s.vars, id) s.lock.Unlock() - return nil + return gv.Value, nil } - s.bindings = append(s.bindings, binding) - s.lock.Unlock() - - err := callback.Wait(ctx) - - s.lock.Lock() - defer s.lock.Unlock() - - s.bindings = lo2.Remove(s.bindings, binding) + callback := future.NewSetValue[VarValue]() + s.bindings = append(s.bindings, &binding{ + ID: id, + Callback: callback, + }) - return err + s.lock.Unlock() + return callback.Wait(ctx) } -func (s *Executor) PutVars(vs ...Var) { +func (s *Executor) PutVar(id VarID, value VarValue) *Executor { s.lock.Lock() defer s.lock.Unlock() -loop: - for _, v := range vs { - for ib, b := range s.bindings { - for iw, w := range b.Waittings { - if w.GetID() != v.GetID() { - continue - } - - if err := AssignVar(v, w); err != nil { - b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err)) - // 绑定类型不对,说明生成的执行计划有问题,怎么处理都可以,因为最终会执行失败 - continue loop - } - - b.Bindeds = append(b.Bindeds, w) - b.Waittings = lo2.RemoveAt(b.Waittings, iw) - if len(b.Waittings) == 0 { - b.Callback.SetVoid() - s.bindings = lo2.RemoveAt(s.bindings, ib) - } - - // 绑定成功,继续最外层循环 - continue loop - } - + for ib, b := range s.bindings { + if b.ID != id { + continue } - // 如果没有绑定,则直接放入变量表中 - s.vars[v.GetID()] = v + b.Callback.SetValue(value) + s.bindings = lo2.RemoveAt(s.bindings, ib) + + return s } + + // 如果没有绑定,则直接放入变量表中 + s.vars[id] = Var{ID: id, Value: value} + return s } func (s *Executor) Store(key string, val any) { @@ -148,20 +110,43 @@ func (s *Executor) Store(key string, val any) { s.store[key] = val } -func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error { - var vs2 []Var - for _, v := range vs { - vs2 = append(vs2, v) +func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) { + v, err := e.BindVar(ctx, id) + if err != nil { + var def T + return def, err } - return sw.BindVars(ctx, vs2...) + ret, ok := v.(T) + if !ok { + var def T + return def, fmt.Errorf("binded var %v is %T, not %T", id, v, def) + } + + return ret, nil } -func PutArrayVars[T Var](sw *Executor, vs []T) { - var vs2 []Var - for _, v := range vs { - vs2 = append(vs2, v) +func BindArray[T VarValue](e *Executor, ctx context.Context, ids []VarID) ([]T, error) { + ret := make([]T, len(ids)) + for i := range ids { + v, err := e.BindVar(ctx, ids[i]) + if err != nil { + return nil, err + } + + v2, ok := v.(T) + if !ok { + var def T + return nil, fmt.Errorf("binded var %v is %T, not %T", ids[i], v, def) + } + + ret[i] = v2 } + return ret, nil +} - sw.PutVars(vs2...) +func PutArray[T VarValue](e *Executor, ids []VarID, values []T) { + for i := range ids { + e.PutVar(ids[i], values[i]) + } } diff --git a/pkgs/ioswitch/exec/plan_builder.go b/pkgs/ioswitch/exec/plan_builder.go index 6a7202d..d177440 100644 --- a/pkgs/ioswitch/exec/plan_builder.go +++ b/pkgs/ioswitch/exec/plan_builder.go @@ -9,13 +9,14 @@ import ( ) type PlanBuilder struct { - Vars []Var + NextVarID VarID WorkerPlans []*WorkerPlanBuilder DriverPlan DriverPlanBuilder } func NewPlanBuilder() *PlanBuilder { bld := &PlanBuilder{ + NextVarID: VarID(1), DriverPlan: DriverPlanBuilder{}, } @@ -41,39 +42,11 @@ func (b *PlanBuilder) AtWorker(worker WorkerInfo) *WorkerPlanBuilder { return p } -func (b *PlanBuilder) NewStreamVar() *StreamVar { - v := &StreamVar{ - ID: VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) - - return v -} - -func (b *PlanBuilder) NewIntVar() *IntVar { - v := &IntVar{ - ID: VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) - - return v -} - -func (b *PlanBuilder) NewStringVar() *StringVar { - v := &StringVar{ - ID: VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) - - return v -} -func (b *PlanBuilder) NewSignalVar() *SignalVar { - v := &SignalVar{ - ID: VarID(len(b.Vars)), - } - b.Vars = append(b.Vars, v) +func (b *PlanBuilder) NewVar() VarID { + id := b.NextVarID + b.NextVarID++ - return v + return id } func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver { diff --git a/pkgs/ioswitch/exec/utils.go b/pkgs/ioswitch/exec/utils.go index 4112669..f876f2c 100644 --- a/pkgs/ioswitch/exec/utils.go +++ b/pkgs/ioswitch/exec/utils.go @@ -1,9 +1,6 @@ package exec import ( - "fmt" - "reflect" - "github.com/google/uuid" "gitlink.org.cn/cloudream/common/utils/math2" ) @@ -12,24 +9,6 @@ func genRandomPlanID() PlanID { return PlanID(uuid.NewString()) } -func AssignVar(from Var, to Var) error { - if reflect.TypeOf(from) != reflect.TypeOf(to) { - return fmt.Errorf("cannot assign %T to %T", from, to) - } - - switch from := from.(type) { - case *StreamVar: - to.(*StreamVar).Stream = from.Stream - case *IntVar: - to.(*IntVar).Value = from.Value - case *StringVar: - to.(*StringVar).Value = from.Value - case *SignalVar: - } - - return nil -} - type Range struct { Offset int64 Length *int64 diff --git a/pkgs/ioswitch/exec/var.go b/pkgs/ioswitch/exec/var.go index 62e76ad..383b0bf 100644 --- a/pkgs/ioswitch/exec/var.go +++ b/pkgs/ioswitch/exec/var.go @@ -4,54 +4,81 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/utils/reflect2" "gitlink.org.cn/cloudream/common/utils/serder" ) type VarID int -type Var interface { - GetID() VarID +type Var struct { + ID VarID `json:"id"` + Value VarValue `json:"value"` } -var VarUnion = types.NewTypeUnion[Var]( - (*IntVar)(nil), - (*StringVar)(nil), - (*SignalVar)(nil), - (*StreamVar)(nil), -) -var _ = serder.UseTypeUnionExternallyTagged(&VarUnion) +type VarPack[T VarValue] struct { + ID VarID `json:"id"` + Value T `json:"value"` +} -type StreamVar struct { - ID VarID `json:"id"` - Stream io.ReadCloser `json:"-"` +func (v *VarPack[T]) ToAny() AnyVar { + return AnyVar{ + ID: v.ID, + Value: v.Value, + } } -func (v *StreamVar) GetID() VarID { - return v.ID +// 变量的值 +type VarValue interface { + Clone() VarValue } -type IntVar struct { - ID VarID `json:"id"` - Value string `json:"value"` +var valueUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[VarValue]( + (*StreamValue)(nil), + (*SignalValue)(nil), + (*StringValue)(nil), +))) + +func UseVarValue[T VarValue]() { + valueUnion.Add(reflect2.TypeOf[T]()) } -func (v *IntVar) GetID() VarID { - return v.ID +type AnyVar = VarPack[VarValue] + +func V(id VarID, value VarValue) AnyVar { + return AnyVar{ + ID: id, + Value: value, + } } -type StringVar struct { - ID VarID `json:"id"` - Value string `json:"value"` +type StreamValue struct { + Stream io.ReadCloser `json:"-"` } -func (v *StringVar) GetID() VarID { - return v.ID +// 不应该被调用 +func (v *StreamValue) Clone() VarValue { + panic("StreamValue should not be cloned") } -type SignalVar struct { - ID VarID `json:"id"` +type SignalValue struct{} + +func (o *SignalValue) Clone() VarValue { + return &SignalValue{} +} + +type SignalVar = VarPack[*SignalValue] + +func NewSignal(id VarID) SignalVar { + return SignalVar{ + ID: id, + Value: &SignalValue{}, + } +} + +type StringValue struct { + Value string `json:"value"` } -func (v *SignalVar) GetID() VarID { - return v.ID +func (o *StringValue) Clone() VarValue { + return &StringValue{Value: o.Value} } diff --git a/pkgs/ioswitch/exec/worker.go b/pkgs/ioswitch/exec/worker.go index 8f6e2c7..d7fb915 100644 --- a/pkgs/ioswitch/exec/worker.go +++ b/pkgs/ioswitch/exec/worker.go @@ -95,9 +95,12 @@ type WorkerInfo interface { type WorkerClient interface { ExecutePlan(ctx context.Context, plan Plan) error - SendStream(ctx context.Context, planID PlanID, v *StreamVar, str io.ReadCloser) error - SendVar(ctx context.Context, planID PlanID, v Var) error - GetStream(ctx context.Context, planID PlanID, v *StreamVar, signal *SignalVar) (io.ReadCloser, error) - GetVar(ctx context.Context, planID PlanID, v Var, signal *SignalVar) error + + SendStream(ctx context.Context, planID PlanID, id VarID, stream io.ReadCloser) error + SendVar(ctx context.Context, planID PlanID, id VarID, value VarValue) error + + GetStream(ctx context.Context, planID PlanID, streamID VarID, signalID VarID, signal VarValue) (io.ReadCloser, error) + GetVar(ctx context.Context, planID PlanID, varID VarID, signalID VarID, signal VarValue) (VarValue, error) + Close() error } diff --git a/pkgs/ioswitch/plan/generate.go b/pkgs/ioswitch/plan/generate.go index e11324f..db32a84 100644 --- a/pkgs/ioswitch/plan/generate.go +++ b/pkgs/ioswitch/plan/generate.go @@ -1,8 +1,6 @@ package plan import ( - "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" @@ -122,57 +120,41 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { for i := 0; i < node.OutputStreams().Len(); i++ { out := node.OutputStreams().Get(i) - if out.Var != nil { + if out.VarID > 0 { continue } - out.Var = blder.NewStreamVar() + out.VarID = blder.NewVar() } for i := 0; i < node.InputStreams().Len(); i++ { in := node.InputStreams().Get(i) - if in.Var != nil { + if in.VarID > 0 { continue } - in.Var = blder.NewStreamVar() + in.VarID = blder.NewVar() } for i := 0; i < node.OutputValues().Len(); i++ { out := node.OutputValues().Get(i) - if out.Var != nil { + if out.VarID > 0 { continue } - switch out.Type { - case dag.StringValueVar: - out.Var = blder.NewStringVar() - case dag.SignalValueVar: - out.Var = blder.NewSignalVar() - default: - retErr = fmt.Errorf("unsupported value var type: %v", out.Type) - return false - } + out.VarID = blder.NewVar() } for i := 0; i < node.InputValues().Len(); i++ { in := node.InputValues().Get(i) - if in.Var != nil { + if in.VarID > 0 { continue } - switch in.Type { - case dag.StringValueVar: - in.Var = blder.NewStringVar() - case dag.SignalValueVar: - in.Var = blder.NewSignalVar() - default: - retErr = fmt.Errorf("unsupported value var type: %v", in.Type) - return false - } + in.VarID = blder.NewVar() } op, err := node.GenerateOp() diff --git a/pkgs/ioswitch/plan/ops/driver.go b/pkgs/ioswitch/plan/ops/driver.go index 0ec9469..192e665 100644 --- a/pkgs/ioswitch/plan/ops/driver.go +++ b/pkgs/ioswitch/plan/ops/driver.go @@ -16,20 +16,20 @@ func (b *GraphNodeBuilder) NewFromDriver(handle *exec.DriverWriteStream) *FromDr } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewStreamVar()) + node.OutputStreams().SetupNew(node, b.NewVar()) return node } -func (t *FromDriverNode) Output() dag.StreamSlot { - return dag.StreamSlot{ +func (t *FromDriverNode) Output() dag.Slot { + return dag.Slot{ Var: t.OutputStreams().Get(0), Index: 0, } } func (t *FromDriverNode) GenerateOp() (exec.Op, error) { - t.Handle.Var = t.OutputStreams().Get(0).Var + t.Handle.ID = t.OutputStreams().Get(0).VarID return nil, nil } @@ -52,20 +52,20 @@ func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverN return node } -func (t *ToDriverNode) SetInput(v *dag.StreamVar) { +func (t *ToDriverNode) SetInput(v *dag.Var) { t.InputStreams().EnsureSize(1) v.Connect(t, 0) } -func (t *ToDriverNode) Input() dag.StreamSlot { - return dag.StreamSlot{ +func (t *ToDriverNode) Input() dag.Slot { + return dag.Slot{ Var: t.InputStreams().Get(0), Index: 0, } } func (t *ToDriverNode) GenerateOp() (exec.Op, error) { - t.Handle.Var = t.InputStreams().Get(0).Var + t.Handle.ID = t.InputStreams().Get(0).VarID return nil, nil } diff --git a/pkgs/ioswitch/plan/ops/drop.go b/pkgs/ioswitch/plan/ops/drop.go index b303f0b..dcfdc78 100644 --- a/pkgs/ioswitch/plan/ops/drop.go +++ b/pkgs/ioswitch/plan/ops/drop.go @@ -13,19 +13,19 @@ func init() { } type DropStream struct { - Input *exec.StreamVar `json:"input"` + Input exec.VarID `json:"input"` } func (o *DropStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + str, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer str.Stream.Close() for { buf := make([]byte, 1024*8) - _, err = o.Input.Stream.Read(buf) + _, err = str.Stream.Read(buf) if err == io.EOF { return nil } @@ -36,7 +36,7 @@ func (o *DropStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *DropStream) String() string { - return fmt.Sprintf("DropStream %v", o.Input.ID) + return fmt.Sprintf("DropStream %v", o.Input) } type DropNode struct { @@ -49,14 +49,14 @@ func (b *GraphNodeBuilder) NewDropStream() *DropNode { return node } -func (t *DropNode) SetInput(v *dag.StreamVar) { +func (t *DropNode) SetInput(v *dag.Var) { t.InputStreams().EnsureSize(1) v.Connect(t, 0) } func (t *DropNode) GenerateOp() (exec.Op, error) { return &DropStream{ - Input: t.InputStreams().Get(0).Var, + Input: t.InputStreams().Get(0).VarID, }, nil } diff --git a/pkgs/ioswitch/plan/ops/send.go b/pkgs/ioswitch/plan/ops/send.go index be60082..e6eb9dc 100644 --- a/pkgs/ioswitch/plan/ops/send.go +++ b/pkgs/ioswitch/plan/ops/send.go @@ -18,17 +18,17 @@ func init() { } type SendStream struct { - Input *exec.StreamVar `json:"input"` - Send *exec.StreamVar `json:"send"` + Input exec.VarID `json:"input"` + Send exec.VarID `json:"send"` Worker exec.WorkerInfo `json:"worker"` } func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + inputStr, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input) if err != nil { return err } - defer o.Input.Stream.Close() + defer inputStr.Stream.Close() cli, err := o.Worker.NewClient() if err != nil { @@ -37,7 +37,7 @@ func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { defer cli.Close() // 发送后流的ID不同 - err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, o.Input.Stream) + err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, inputStr.Stream) if err != nil { return fmt.Errorf("sending stream: %w", err) } @@ -46,14 +46,14 @@ func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *SendStream) String() string { - return fmt.Sprintf("SendStream %v->%v@%v", o.Input.ID, o.Send.ID, o.Worker) + return fmt.Sprintf("SendStream %v->%v@%v", o.Input, o.Send, o.Worker) } type GetStream struct { - Signal *exec.SignalVar `json:"signal"` - Target *exec.StreamVar `json:"target"` - Output *exec.StreamVar `json:"output"` - Worker exec.WorkerInfo `json:"worker"` + Signal exec.VarPack[*exec.SignalValue] `json:"signal"` + Target exec.VarID `json:"target"` + Output exec.VarID `json:"output"` + Worker exec.WorkerInfo `json:"worker"` } func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -63,33 +63,33 @@ func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer cli.Close() - str, err := cli.GetStream(ctx.Context, e.Plan().ID, o.Target, o.Signal) + str, err := cli.GetStream(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value) if err != nil { return fmt.Errorf("getting stream: %w", err) } fut := future.NewSetVoid() // 获取后送到本地的流ID是不同的 - o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { + str = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { fut.SetVoid() }) - e.PutVars(o.Output) + e.PutVar(o.Output, &exec.StreamValue{Stream: str}) return fut.Wait(ctx.Context) } func (o *GetStream) String() string { - return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output.ID, o.Signal.ID, o.Target.ID, o.Worker) + return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker) } type SendVar struct { - Input exec.Var `json:"input"` - Send exec.Var `json:"send"` + Input exec.VarID `json:"input"` + Send exec.VarID `json:"send"` Worker exec.WorkerInfo `json:"worker"` } func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Input) + input, err := e.BindVar(ctx.Context, o.Input) if err != nil { return err } @@ -100,8 +100,7 @@ func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer cli.Close() - exec.AssignVar(o.Input, o.Send) - err = cli.SendVar(ctx.Context, e.Plan().ID, o.Send) + err = cli.SendVar(ctx.Context, e.Plan().ID, o.Send, input) if err != nil { return fmt.Errorf("sending var: %w", err) } @@ -110,14 +109,14 @@ func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *SendVar) String() string { - return fmt.Sprintf("SendVar %v->%v@%v", o.Input.GetID(), o.Send.GetID(), o.Worker) + return fmt.Sprintf("SendVar %v->%v@%v", o.Input, o.Send, o.Worker) } type GetVar struct { - Signal *exec.SignalVar `json:"signal"` - Target exec.Var `json:"target"` - Output exec.Var `json:"output"` - Worker exec.WorkerInfo `json:"worker"` + Signal exec.VarPack[*exec.SignalValue] `json:"signal"` + Target exec.VarID `json:"target"` + Output exec.VarID `json:"output"` + Worker exec.WorkerInfo `json:"worker"` } func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { @@ -127,18 +126,18 @@ func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } defer cli.Close() - err = cli.GetVar(ctx.Context, e.Plan().ID, o.Target, o.Signal) + get, err := cli.GetVar(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value) if err != nil { return fmt.Errorf("getting var: %w", err) } - exec.AssignVar(o.Target, o.Output) - e.PutVars(o.Output) + + e.PutVar(o.Output, get) return nil } func (o *GetVar) String() string { - return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output.GetID(), o.Signal.ID, o.Target.GetID(), o.Worker) + return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker) } type SendStreamNode struct { @@ -154,18 +153,18 @@ func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode { return node } -func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar { +func (t *SendStreamNode) Send(v *dag.Var) *dag.Var { t.InputStreams().EnsureSize(1) v.Connect(t, 0) - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) return output } func (t *SendStreamNode) GenerateOp() (exec.Op, error) { return &SendStream{ - Input: t.InputStreams().Get(0).Var, - Send: t.OutputStreams().Get(0).Var, + Input: t.InputStreams().Get(0).VarID, + Send: t.OutputStreams().Get(0).VarID, Worker: t.ToWorker, }, nil } @@ -187,18 +186,18 @@ func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode { return node } -func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar { +func (t *SendValueNode) Send(v *dag.Var) *dag.Var { t.InputValues().EnsureSize(1) v.Connect(t, 0) - output := t.Graph().NewValueVar(v.Type) + output := t.Graph().NewVar() t.OutputValues().Setup(t, output, 0) return output } func (t *SendValueNode) GenerateOp() (exec.Op, error) { return &SendVar{ - Input: t.InputValues().Get(0).Var, - Send: t.OutputValues().Get(0).Var, + Input: t.InputValues().Get(0).VarID, + Send: t.OutputValues().Get(0).VarID, Worker: t.ToWorker, }, nil } @@ -217,27 +216,27 @@ func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode { FromWorker: from, } b.AddNode(node) - node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0) + node.OutputValues().Setup(node, node.Graph().NewVar(), 0) return node } -func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar { +func (t *GetStreamNode) Get(v *dag.Var) *dag.Var { t.InputStreams().EnsureSize(1) v.Connect(t, 0) - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) return output } -func (t *GetStreamNode) SignalVar() *dag.ValueVar { +func (t *GetStreamNode) SignalVar() *dag.Var { return t.OutputValues().Get(0) } func (t *GetStreamNode) GenerateOp() (exec.Op, error) { return &GetStream{ - Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar), - Output: t.OutputStreams().Get(0).Var, - Target: t.InputStreams().Get(0).Var, + Signal: exec.NewSignal(t.OutputValues().Get(0).VarID), + Output: t.OutputStreams().Get(0).VarID, + Target: t.InputStreams().Get(0).VarID, Worker: t.FromWorker, }, nil } @@ -256,27 +255,27 @@ func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode { FromWorker: from, } b.AddNode(node) - node.OutputValues().Setup(node, node.Graph().NewValueVar(dag.SignalValueVar), 0) + node.OutputValues().Setup(node, node.Graph().NewVar(), 0) return node } -func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar { +func (t *GetValueNode) Get(v *dag.Var) *dag.Var { t.InputValues().EnsureSize(1) v.Connect(t, 0) - output := t.Graph().NewValueVar(v.Type) + output := t.Graph().NewVar() t.OutputValues().Setup(t, output, 1) return output } -func (t *GetValueNode) SignalVar() *dag.ValueVar { +func (t *GetValueNode) SignalVar() *dag.Var { return t.OutputValues().Get(0) } func (t *GetValueNode) GenerateOp() (exec.Op, error) { return &GetVar{ - Signal: t.OutputValues().Get(0).Var.(*exec.SignalVar), - Output: t.OutputValues().Get(1).Var, - Target: t.InputValues().Get(0).Var, + Signal: exec.NewSignal(t.OutputValues().Get(0).VarID), + Output: t.OutputValues().Get(1).VarID, + Target: t.InputValues().Get(0).VarID, Worker: t.FromWorker, }, nil } diff --git a/pkgs/ioswitch/plan/ops/store.go b/pkgs/ioswitch/plan/ops/store.go index 0b175f5..f51bbe4 100644 --- a/pkgs/ioswitch/plan/ops/store.go +++ b/pkgs/ioswitch/plan/ops/store.go @@ -8,28 +8,22 @@ import ( ) type Store struct { - Var exec.Var + Var exec.VarID Key string } func (o *Store) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Var) + v, err := e.BindVar(ctx.Context, o.Var) if err != nil { return err } - switch v := o.Var.(type) { - case *exec.IntVar: - e.Store(o.Key, v.Value) - case *exec.StringVar: - e.Store(o.Key, v.Value) - } - + e.Store(o.Key, v) return nil } func (o *Store) String() string { - return fmt.Sprintf("Store %v: %v", o.Key, o.Var.GetID()) + return fmt.Sprintf("Store %v: %v", o.Key, o.Var) } type StoreNode struct { @@ -43,7 +37,7 @@ func (b *GraphNodeBuilder) NewStore() *StoreNode { return node } -func (t *StoreNode) Store(key string, v *dag.ValueVar) { +func (t *StoreNode) Store(key string, v *dag.Var) { t.Key = key t.InputValues().EnsureSize(1) v.Connect(t, 0) @@ -51,7 +45,7 @@ func (t *StoreNode) Store(key string, v *dag.ValueVar) { func (t *StoreNode) GenerateOp() (exec.Op, error) { return &Store{ - Var: t.InputValues().Get(0).Var, + Var: t.InputValues().Get(0).VarID, Key: t.Key, }, nil } diff --git a/pkgs/ioswitch/plan/ops/sync.go b/pkgs/ioswitch/plan/ops/sync.go index 07149ce..d9ac791 100644 --- a/pkgs/ioswitch/plan/ops/sync.go +++ b/pkgs/ioswitch/plan/ops/sync.go @@ -19,30 +19,29 @@ func init() { } type OnStreamBegin struct { - Raw *exec.StreamVar `json:"raw"` - New *exec.StreamVar `json:"new"` - Signal *exec.SignalVar `json:"signal"` + Raw exec.VarID `json:"raw"` + New exec.VarID `json:"new"` + Signal exec.SignalVar `json:"signal"` } func (o *OnStreamBegin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Raw) + raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw) if err != nil { return err } - o.New.Stream = o.Raw.Stream - - e.PutVars(o.New, o.Signal) + e.PutVar(o.New, &exec.StreamValue{Stream: raw.Stream}). + PutVar(o.Signal.ID, o.Signal.Value) return nil } func (o *OnStreamBegin) String() string { - return fmt.Sprintf("OnStreamBegin %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID) + return fmt.Sprintf("OnStreamBegin %v->%v S:%v", o.Raw, o.New, o.Signal.ID) } type OnStreamEnd struct { - Raw *exec.StreamVar `json:"raw"` - New *exec.StreamVar `json:"new"` + Raw exec.VarID `json:"raw"` + New exec.VarID `json:"new"` Signal *exec.SignalVar `json:"signal"` } @@ -67,57 +66,49 @@ func (o *onStreamEnd) Close() error { } func (o *OnStreamEnd) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, o.Raw) + raw, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Raw) if err != nil { return err } cb := future.NewSetVoid() - o.New.Stream = &onStreamEnd{ - inner: o.Raw.Stream, + e.PutVar(o.New, &exec.StreamValue{Stream: &onStreamEnd{ + inner: raw.Stream, callback: cb, - } - e.PutVars(o.New) + }}) err = cb.Wait(ctx.Context) if err != nil { return err } - e.PutVars(o.Signal) + e.PutVar(o.Signal.ID, o.Signal.Value) return nil } func (o *OnStreamEnd) String() string { - return fmt.Sprintf("OnStreamEnd %v->%v S:%v", o.Raw.ID, o.New.ID, o.Signal.ID) + return fmt.Sprintf("OnStreamEnd %v->%v S:%v", o.Raw, o.New, o.Signal.ID) } type HoldUntil struct { - Waits []*exec.SignalVar `json:"waits"` - Holds []exec.Var `json:"holds"` - Emits []exec.Var `json:"emits"` + Waits []exec.VarID `json:"waits"` + Holds []exec.VarID `json:"holds"` + Emits []exec.VarID `json:"emits"` } func (w *HoldUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, w.Holds...) + holds, err := exec.BindArray[exec.VarValue](e, ctx.Context, w.Holds) if err != nil { return err } - err = exec.BindArrayVars(e, ctx.Context, w.Waits) + _, err = exec.BindArray[exec.VarValue](e, ctx.Context, w.Waits) if err != nil { return err } - for i := 0; i < len(w.Holds); i++ { - err := exec.AssignVar(w.Holds[i], w.Emits[i]) - if err != nil { - return err - } - } - - e.PutVars(w.Emits...) + exec.PutArray(e, w.Emits, holds) return nil } @@ -126,12 +117,12 @@ func (w *HoldUntil) String() string { } type HangUntil struct { - Waits []*exec.SignalVar `json:"waits"` - Op exec.Op `json:"op"` + Waits []exec.VarID `json:"waits"` + Op exec.Op `json:"op"` } func (h *HangUntil) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx.Context, h.Waits) + _, err := exec.BindArray[exec.VarValue](e, ctx.Context, h.Waits) if err != nil { return err } @@ -144,17 +135,22 @@ func (h *HangUntil) String() string { } type Broadcast struct { - Source *exec.SignalVar `json:"source"` - Targets []*exec.SignalVar `json:"targets"` + Source exec.VarID `json:"source"` + Targets []exec.VarID `json:"targets"` } func (b *Broadcast) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - err := e.BindVars(ctx.Context, b.Source) + src, err := exec.BindVar[*exec.SignalValue](e, ctx.Context, b.Source) if err != nil { return err } - exec.PutArrayVars(e, b.Targets) + targets := make([]exec.VarValue, len(b.Targets)) + for i := 0; i < len(b.Targets); i++ { + targets[i] = src.Clone() + } + + exec.PutArray(e, b.Targets, targets) return nil } @@ -172,38 +168,38 @@ func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode { return node } -func (t *HoldUntilNode) SetSignal(s *dag.ValueVar) { +func (t *HoldUntilNode) SetSignal(s *dag.Var) { t.InputValues().EnsureSize(1) s.Connect(t, 0) } -func (t *HoldUntilNode) HoldStream(str *dag.StreamVar) *dag.StreamVar { +func (t *HoldUntilNode) HoldStream(str *dag.Var) *dag.Var { str.Connect(t, t.InputStreams().EnlargeOne()) - output := t.Graph().NewStreamVar() + output := t.Graph().NewVar() t.OutputStreams().SetupNew(t, output) return output } -func (t *HoldUntilNode) HoldVar(v *dag.ValueVar) *dag.ValueVar { +func (t *HoldUntilNode) HoldVar(v *dag.Var) *dag.Var { v.Connect(t, t.InputValues().EnlargeOne()) - output := t.Graph().NewValueVar(v.Type) + output := t.Graph().NewVar() t.OutputValues().SetupNew(t, output) return output } func (t *HoldUntilNode) GenerateOp() (exec.Op, error) { o := &HoldUntil{ - Waits: []*exec.SignalVar{t.InputValues().Get(0).Var.(*exec.SignalVar)}, + Waits: []exec.VarID{t.InputValues().Get(0).VarID}, } for i := 0; i < t.OutputValues().Len(); i++ { - o.Holds = append(o.Holds, t.InputValues().Get(i+1).Var) - o.Emits = append(o.Emits, t.OutputValues().Get(i).Var) + o.Holds = append(o.Holds, t.InputValues().Get(i+1).VarID) + o.Emits = append(o.Emits, t.OutputValues().Get(i).VarID) } for i := 0; i < t.OutputStreams().Len(); i++ { - o.Holds = append(o.Holds, t.InputStreams().Get(i).Var) - o.Emits = append(o.Emits, t.OutputStreams().Get(i).Var) + o.Holds = append(o.Holds, t.InputStreams().Get(i).VarID) + o.Emits = append(o.Emits, t.OutputStreams().Get(i).VarID) } return o, nil diff --git a/pkgs/ioswitch/plan/ops/var.go b/pkgs/ioswitch/plan/ops/var.go index 86c8b79..962ec4c 100644 --- a/pkgs/ioswitch/plan/ops/var.go +++ b/pkgs/ioswitch/plan/ops/var.go @@ -9,11 +9,12 @@ func init() { } type ConstVar struct { - Var *exec.StringVar `json:"var"` + ID exec.VarID `json:"id"` + Value exec.VarValue `json:"value"` } func (o *ConstVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - e.PutVars(o.Var) + e.PutVar(o.ID, o.Value) return nil } diff --git a/pkgs/ioswitch/utils/utils.go b/pkgs/ioswitch/utils/utils.go index 2db1ff6..61d1756 100644 --- a/pkgs/ioswitch/utils/utils.go +++ b/pkgs/ioswitch/utils/utils.go @@ -7,10 +7,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" ) -func FormatVarIDs[T exec.Var](arr []T) string { +func FormatVarIDs(arr []exec.VarID) string { sb := strings.Builder{} for i, v := range arr { - sb.WriteString(fmt.Sprintf("%v", v.GetID())) + sb.WriteString(fmt.Sprintf("%v", v)) if i < len(arr)-1 { sb.WriteString(",") } diff --git a/sdks/storage/cdsapi/hub_io.go b/sdks/storage/cdsapi/hub_io.go index 7711db7..76a63f3 100644 --- a/sdks/storage/cdsapi/hub_io.go +++ b/sdks/storage/cdsapi/hub_io.go @@ -15,24 +15,28 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) +// TODO2 重新梳理代码 + const GetStreamPath = "/hubIO/getStream" type GetStreamReq struct { - PlanID exec.PlanID `json:"planID"` - VarID exec.VarID `json:"varID"` - Signal *exec.SignalVar `json:"signal"` + PlanID exec.PlanID `json:"planID"` + VarID exec.VarID `json:"varID"` + SignalID exec.VarID `json:"signalID"` + Signal exec.VarValue `json:"signal"` } -func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) { +func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { targetUrl, err := url.JoinPath(c.baseURL, GetStreamPath) if err != nil { return nil, err } req := &GetStreamReq{ - PlanID: planID, - VarID: id, - Signal: signal, + PlanID: planID, + VarID: id, + SignalID: signalID, + Signal: signal, } resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ @@ -154,19 +158,21 @@ func (c *Client) ExecuteIOPlan(plan exec.Plan) error { const SendVarPath = "/hubIO/sendVar" type SendVarReq struct { - PlanID exec.PlanID `json:"planID"` - Var exec.Var `json:"var"` + PlanID exec.PlanID `json:"planID"` + VarID exec.VarID `json:"varID"` + VarValue exec.VarValue `json:"varValue"` } -func (c *Client) SendVar(id exec.PlanID, v exec.Var) error { +func (c *Client) SendVar(planID exec.PlanID, id exec.VarID, value exec.VarValue) error { targetUrl, err := url.JoinPath(c.baseURL, SendVarPath) if err != nil { return err } req := &SendVarReq{ - PlanID: id, - Var: v, + PlanID: planID, + VarID: id, + VarValue: value, } resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ @@ -196,35 +202,38 @@ func (c *Client) SendVar(id exec.PlanID, v exec.Var) error { const GetVarPath = "/hubIO/getVar" type GetVarReq struct { - PlanID exec.PlanID `json:"planID"` - Var exec.Var `json:"var"` - Signal *exec.SignalVar `json:"signal"` + PlanID exec.PlanID `json:"planID"` + VarID exec.VarID `json:"varID"` + SignalID exec.VarID `json:"signalID"` + Signal exec.VarValue `json:"signal"` } -func (c *Client) GetVar(id exec.PlanID, v exec.Var, signal *exec.SignalVar) error { +func (c *Client) GetVar(id exec.PlanID, varID exec.VarID, singalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { targetUrl, err := url.JoinPath(c.baseURL, GetVarPath) if err != nil { - return err + return nil, err } req := &GetVarReq{ - PlanID: id, - Var: v, - Signal: signal, + PlanID: id, + VarID: varID, + SignalID: singalID, + Signal: signal, } resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ Body: req, }) if err != nil { - return err + return nil, err } + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { // 读取错误信息 - body, _ := io.ReadAll(resp.Body) resp.Body.Close() - return fmt.Errorf("error response from server: %s", string(body)) + return nil, fmt.Errorf("error response from server: %s", string(body)) } return nil