Browse Source

增加ioswitch的op

gitlink
Sydonian 1 year ago
parent
commit
d21ab8b0c2
11 changed files with 585 additions and 131 deletions
  1. +7
    -0
      common/pkgs/ec/multiply.go
  2. +8
    -0
      common/pkgs/ioswitch/ioswitch.go
  3. +33
    -0
      common/pkgs/ioswitch/ops/chunked.go
  4. +0
    -45
      common/pkgs/ioswitch/ops/chunked_join.go
  5. +90
    -0
      common/pkgs/ioswitch/ops/ec.go
  6. +135
    -0
      common/pkgs/ioswitch/ops/sync.go
  7. +20
    -0
      common/pkgs/ioswitch/ops/var.go
  8. +253
    -77
      common/pkgs/ioswitch/plans/agent.go
  9. +30
    -9
      common/pkgs/ioswitch/plans/executor.go
  10. +8
    -0
      common/pkgs/ioswitch/plans/plan_builder.go
  11. +1
    -0
      common/pkgs/ioswitch/utils.go

+ 7
- 0
common/pkgs/ec/multiply.go View File

@@ -0,0 +1,7 @@
package ec

import "github.com/klauspost/reedsolomon"

func GaloisMultiplier() *reedsolomon.MultipilerBuilder {
return &reedsolomon.MultipilerBuilder{}
}

+ 8
- 0
common/pkgs/ioswitch/ioswitch.go View File

@@ -54,6 +54,14 @@ func (v *StringVar) GetID() VarID {
return v.ID return v.ID
} }


type SignalVar struct {
ID VarID `json:"id"`
}

func (v *SignalVar) GetID() VarID {
return v.ID
}

type Op interface { type Op interface {
Execute(ctx context.Context, sw *Switch) error Execute(ctx context.Context, sw *Switch) error
} }

common/pkgs/ioswitch/ops/chunked_split.go → common/pkgs/ioswitch/ops/chunked.go View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"io" "io"


"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
@@ -40,6 +41,38 @@ func (o *ChunkedSplit) Execute(ctx context.Context, sw *ioswitch.Switch) error {
return sem.Acquire(ctx, int64(len(outputs))) return sem.Acquire(ctx, int64(len(outputs)))
} }


type ChunkedJoin struct {
Inputs []*ioswitch.StreamVar `json:"inputs"`
Output *ioswitch.StreamVar `json:"output"`
ChunkSize int `json:"chunkSize"`
}

func (o *ChunkedJoin) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := ioswitch.BindArrayVars(sw, ctx, o.Inputs)
if err != nil {
return err
}

var strReaders []io.Reader
for _, s := range o.Inputs {
strReaders = append(strReaders, s.Stream)
}
defer func() {
for _, str := range o.Inputs {
str.Stream.Close()
}
}()

fut := future.NewSetVoid()
o.Output.Stream = io2.AfterReadClosedOnce(io2.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) {
fut.SetVoid()
})
sw.PutVars(o.Output)

return fut.Wait(ctx)
}

func init() { func init() {
OpUnion.AddT((*ChunkedSplit)(nil)) OpUnion.AddT((*ChunkedSplit)(nil))
OpUnion.AddT((*ChunkedJoin)(nil))
} }

+ 0
- 45
common/pkgs/ioswitch/ops/chunked_join.go View File

@@ -1,45 +0,0 @@
package ops

import (
"context"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type ChunkedJoin struct {
Inputs []*ioswitch.StreamVar `json:"inputs"`
Output *ioswitch.StreamVar `json:"output"`
ChunkSize int `json:"chunkSize"`
}

func (o *ChunkedJoin) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := ioswitch.BindArrayVars(sw, ctx, o.Inputs)
if err != nil {
return err
}

var strReaders []io.Reader
for _, s := range o.Inputs {
strReaders = append(strReaders, s.Stream)
}
defer func() {
for _, str := range o.Inputs {
str.Stream.Close()
}
}()

fut := future.NewSetVoid()
o.Output.Stream = io2.AfterReadClosedOnce(io2.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) {
fut.SetVoid()
})
sw.PutVars(o.Output)

return fut.Wait(ctx)
}

func init() {
OpUnion.AddT((*ChunkedJoin)(nil))
}

+ 90
- 0
common/pkgs/ioswitch/ops/ec.go View File

@@ -5,8 +5,10 @@ import (
"fmt" "fmt"
"io" "io"


"gitlink.org.cn/cloudream/common/pkgs/future"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
@@ -99,7 +101,95 @@ func (o *ECReconstruct) Execute(ctx context.Context, sw *ioswitch.Switch) error
return sem.Acquire(ctx, int64(len(o.Outputs))) return sem.Acquire(ctx, int64(len(o.Outputs)))
} }


type ECMultiply struct {
Inputs []*ioswitch.StreamVar `json:"inputs"`
Coef [][]byte `json:"coef"`
Outputs []*ioswitch.StreamVar `json:"outputs"`
ChunkSize int64 `json:"chunkSize"`
}

func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := ioswitch.BindArrayVars(sw, ctx, o.Inputs)
if err != nil {
return err
}
defer func() {
for _, s := range o.Inputs {
s.Stream.Close()
}
}()

outputVars := make([]*ioswitch.StreamVar, len(o.Outputs))
outputWrs := make([]*io.PipeWriter, len(o.Outputs))

for i := range o.Outputs {
rd, wr := io.Pipe()
outputVars[i] = &ioswitch.StreamVar{
Stream: rd,
}
outputWrs[i] = wr
}

fut := future.NewSetVoid()
go func() {
mul := ec.GaloisMultiplier().BuildGalois()

inputChunks := make([][]byte, len(o.Inputs))
for i := range o.Inputs {
inputChunks[i] = make([]byte, o.ChunkSize)
}
outputChunks := make([][]byte, len(o.Outputs))
for i := range o.Outputs {
outputChunks[i] = make([]byte, o.ChunkSize)
}

for {
err := sync2.ParallelDo(o.Inputs, func(s *ioswitch.StreamVar, i int) error {
_, err := io.ReadFull(s.Stream, inputChunks[i])
return err
})
if err == io.EOF {
fut.SetVoid()
return
}
if err != nil {
fut.SetError(err)
return
}

err = mul.Multiply(o.Coef, inputChunks, outputChunks)
if err != nil {
fut.SetError(err)
return
}

for i := range o.Outputs {
err := io2.WriteAll(outputWrs[i], outputChunks[i])
if err != nil {
fut.SetError(err)
return
}
}
}
}()

ioswitch.PutArrayVars(sw, outputVars)
err = fut.Wait(ctx)
if err != nil {
for _, wr := range outputWrs {
wr.CloseWithError(err)
}
return err
}

for _, wr := range outputWrs {
wr.Close()
}
return nil
}

func init() { func init() {
OpUnion.AddT((*ECReconstructAny)(nil)) OpUnion.AddT((*ECReconstructAny)(nil))
OpUnion.AddT((*ECReconstruct)(nil)) OpUnion.AddT((*ECReconstruct)(nil))
OpUnion.AddT((*ECMultiply)(nil))
} }

+ 135
- 0
common/pkgs/ioswitch/ops/sync.go View File

@@ -0,0 +1,135 @@
package ops

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type OnStreamBegin struct {
Raw *ioswitch.StreamVar `json:"raw"`
New *ioswitch.StreamVar `json:"new"`
Signal *ioswitch.SignalVar `json:"signal"`
}

func (o *OnStreamBegin) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, o.Raw)
if err != nil {
return err
}

o.New.Stream = o.Raw.Stream

sw.PutVars(o.New, o.Signal)
return nil
}

type OnStreamEnd struct {
Raw *ioswitch.StreamVar `json:"raw"`
New *ioswitch.StreamVar `json:"new"`
Signal *ioswitch.SignalVar `json:"signal"`
}

type onStreamEnd struct {
inner io.ReadCloser
callback *future.SetVoidFuture
}

func (o *onStreamEnd) Read(p []byte) (n int, err error) {
n, err = o.inner.Read(p)
if err == io.EOF {
o.callback.SetVoid()
} else if err != nil {
o.callback.SetError(err)
}
return n, err
}

func (o *onStreamEnd) Close() error {
o.callback.SetError(fmt.Errorf("stream closed early"))
return o.inner.Close()
}

func (o *OnStreamEnd) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, o.Raw)
if err != nil {
return err
}

cb := future.NewSetVoid()

o.New.Stream = &onStreamEnd{
inner: o.Raw.Stream,
callback: cb,
}
sw.PutVars(o.New)

err = cb.Wait(ctx)
if err != nil {
return err
}

sw.PutVars(o.Signal)
return nil
}

type HoldUntil struct {
Waits []*ioswitch.SignalVar `json:"waits"`
Holds []ioswitch.Var `json:"holds"`
Emits []ioswitch.Var `json:"emits"`
}

func (w *HoldUntil) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, w.Holds...)
if err != nil {
return err
}

err = ioswitch.BindArrayVars(sw, ctx, w.Waits)
if err != nil {
return err
}

sw.PutVars(w.Emits...)
return nil
}

type HangUntil struct {
Waits []*ioswitch.SignalVar `json:"waits"`
Op ioswitch.Op `json:"op"`
}

func (h *HangUntil) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := ioswitch.BindArrayVars(sw, ctx, h.Waits)
if err != nil {
return err
}

return h.Op.Execute(ctx, sw)
}

type Broadcast struct {
Source *ioswitch.SignalVar `json:"source"`
Targets []*ioswitch.SignalVar `json:"targets"`
}

func (b *Broadcast) Execute(ctx context.Context, sw *ioswitch.Switch) error {
err := sw.BindVars(ctx, b.Source)
if err != nil {
return err
}

ioswitch.PutArrayVars(sw, b.Targets)
return nil
}

func init() {
OpUnion.AddT((*OnStreamBegin)(nil))
OpUnion.AddT((*OnStreamEnd)(nil))
OpUnion.AddT((*HoldUntil)(nil))
OpUnion.AddT((*HangUntil)(nil))
OpUnion.AddT((*Broadcast)(nil))
}

+ 20
- 0
common/pkgs/ioswitch/ops/var.go View File

@@ -0,0 +1,20 @@
package ops

import (
"context"

"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type ConstVar struct {
Var *ioswitch.StringVar `json:"var"`
}

func (o *ConstVar) Execute(ctx context.Context, sw *ioswitch.Switch) error {
sw.PutVars(o.Var)
return nil
}

func init() {
OpUnion.AddT((*ConstVar)(nil))
}

+ 253
- 77
common/pkgs/ioswitch/plans/agent.go View File

@@ -13,21 +13,6 @@ type AgentPlanBuilder struct {
ops []ioswitch.Op ops []ioswitch.Op
} }


type AgentStreamVar struct {
owner *AgentPlanBuilder
v *ioswitch.StreamVar
}

type AgentIntVar struct {
owner *AgentPlanBuilder
v *ioswitch.IntVar
}

type AgentStringVar struct {
owner *AgentPlanBuilder
v *ioswitch.StringVar
}

func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *AgentStreamVar { func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *AgentStreamVar {
opt := ipfs.ReadOption{ opt := ipfs.ReadOption{
Offset: 0, Offset: 0,
@@ -50,21 +35,6 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *A


return str return str
} }

func (s *AgentStreamVar) IPFSWrite() *AgentStringVar {
v := s.owner.blder.newStringVar()

s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{
Input: s.v,
FileHash: v,
})

return &AgentStringVar{
owner: s.owner,
v: v,
}
}

func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStreamVar { func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStreamVar {
agtStr := &AgentStreamVar{ agtStr := &AgentStreamVar{
owner: b, owner: b,
@@ -79,13 +49,6 @@ func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStreamVar {
return agtStr return agtStr
} }


func (b *AgentStreamVar) FileWrite(filePath string) {
b.owner.ops = append(b.owner.ops, &ops.FileWrite{
Input: b.v,
FilePath: filePath,
})
}

func (b *AgentPlanBuilder) ECReconstructAny(ec cdssdk.ECRedundancy, inBlockIndexes []int, outBlockIndexes []int, streams []*AgentStreamVar) []*AgentStreamVar { func (b *AgentPlanBuilder) ECReconstructAny(ec cdssdk.ECRedundancy, inBlockIndexes []int, outBlockIndexes []int, streams []*AgentStreamVar) []*AgentStreamVar {
var strs []*AgentStreamVar var strs []*AgentStreamVar


@@ -143,6 +106,120 @@ func (b *AgentPlanBuilder) ECReconstruct(ec cdssdk.ECRedundancy, inBlockIndexes
return strs return strs
} }


// 进行galois矩阵乘法运算,ecof * inputs
func (b *AgentPlanBuilder) ECMultiply(coef [][]byte, inputs []*AgentStreamVar, chunkSize int64) []*AgentStreamVar {
outs := make([]*AgentStreamVar, len(coef))
outVars := make([]*ioswitch.StreamVar, len(coef))
for i := 0; i < len(outs); i++ {
sv := b.blder.newStreamVar()
outs[i] = &AgentStreamVar{
owner: b,
v: sv,
}
outVars[i] = sv
}

ins := make([]*ioswitch.StreamVar, len(inputs))
for i := 0; i < len(inputs); i++ {
ins[i] = inputs[i].v
}

b.ops = append(b.ops, &ops.ECMultiply{
Inputs: ins,
Outputs: outVars,
Coef: coef,
ChunkSize: chunkSize,
})

return outs
}

func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentStreamVar {
agtStr := &AgentStreamVar{
owner: b,
v: b.blder.newStreamVar(),
}

var inputStrVars []*ioswitch.StreamVar
for _, str := range streams {
inputStrVars = append(inputStrVars, str.v)
}

b.ops = append(b.ops, &ops.Join{
Inputs: inputStrVars,
Output: agtStr.v,
Length: length,
})

return agtStr
}

func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) *AgentStreamVar {
agtStr := &AgentStreamVar{
owner: b,
v: b.blder.newStreamVar(),
}

var inputStrVars []*ioswitch.StreamVar
for _, str := range streams {
inputStrVars = append(inputStrVars, str.v)
}

b.ops = append(b.ops, &ops.ChunkedJoin{
Inputs: inputStrVars,
Output: agtStr.v,
ChunkSize: chunkSize,
})

return agtStr
}

func (b *AgentPlanBuilder) NewString(str string) *AgentStringVar {
v := b.blder.newStringVar()
v.Value = str

return &AgentStringVar{
owner: b,
v: v,
}
}

func (b *AgentPlanBuilder) NewSignal() *AgentSignalVar {
v := b.blder.newSignalVar()

return &AgentSignalVar{
owner: b,
v: v,
}
}

// 字节流变量
type AgentStreamVar struct {
owner *AgentPlanBuilder
v *ioswitch.StreamVar
}

func (s *AgentStreamVar) IPFSWrite() *AgentStringVar {
v := s.owner.blder.newStringVar()

s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{
Input: s.v,
FileHash: v,
})

return &AgentStringVar{
owner: s.owner,
v: v,
}
}

func (b *AgentStreamVar) FileWrite(filePath string) {
b.owner.ops = append(b.owner.ops, &ops.FileWrite{
Input: b.v,
FilePath: filePath,
})
}

func (b *AgentStreamVar) ChunkedSplit(chunkSize int, streamCount int, paddingZeros bool) []*AgentStreamVar { func (b *AgentStreamVar) ChunkedSplit(chunkSize int, streamCount int, paddingZeros bool) []*AgentStreamVar {
var strs []*AgentStreamVar var strs []*AgentStreamVar


@@ -200,46 +277,6 @@ func (s *AgentStreamVar) ToExecutor() *ExecutorStreamVar {
} }
} }


func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentStreamVar {
agtStr := &AgentStreamVar{
owner: b,
v: b.blder.newStreamVar(),
}

var inputStrVars []*ioswitch.StreamVar
for _, str := range streams {
inputStrVars = append(inputStrVars, str.v)
}

b.ops = append(b.ops, &ops.Join{
Inputs: inputStrVars,
Output: agtStr.v,
Length: length,
})

return agtStr
}

func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) *AgentStreamVar {
agtStr := &AgentStreamVar{
owner: b,
v: b.blder.newStreamVar(),
}

var inputStrVars []*ioswitch.StreamVar
for _, str := range streams {
inputStrVars = append(inputStrVars, str.v)
}

b.ops = append(b.ops, &ops.ChunkedJoin{
Inputs: inputStrVars,
Output: agtStr.v,
ChunkSize: chunkSize,
})

return agtStr
}

func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar { func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar {
var strs []*AgentStreamVar var strs []*AgentStreamVar


@@ -261,6 +298,49 @@ func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar {
return strs return strs
} }


// 当流产生时发送一个信号
func (v *AgentStreamVar) OnBegin() (*AgentStreamVar, *AgentSignalVar) {
ns := v.owner.blder.newStreamVar()
s := v.owner.blder.newSignalVar()

v.owner.ops = append(v.owner.ops, &ops.OnStreamBegin{
Raw: v.v,
New: ns,
Signal: s,
})
return &AgentStreamVar{owner: v.owner, v: ns}, &AgentSignalVar{owner: v.owner, v: s}
}

// 当流结束时发送一个信号
func (v *AgentStreamVar) OnEnd() (*AgentStreamVar, *AgentSignalVar) {
ns := v.owner.blder.newStreamVar()
s := v.owner.blder.newSignalVar()

v.owner.ops = append(v.owner.ops, &ops.OnStreamEnd{
Raw: v.v,
New: ns,
Signal: s,
})
return &AgentStreamVar{owner: v.owner, v: ns}, &AgentSignalVar{owner: v.owner, v: s}
}

// 将此流暂存,直到一个信号产生后才释放(一个新流)
func (v *AgentStreamVar) HoldUntil(wait *AgentSignalVar) *AgentStreamVar {
nv := v.owner.blder.newStreamVar()
v.owner.ops = append(v.owner.ops, &ops.HoldUntil{
Waits: []*ioswitch.SignalVar{wait.v},
Holds: []ioswitch.Var{v.v},
Emits: []ioswitch.Var{nv},
})
return &AgentStreamVar{owner: v.owner, v: nv}
}

// 字符串变量
type AgentStringVar struct {
owner *AgentPlanBuilder
v *ioswitch.StringVar
}

func (v *AgentStringVar) To(node cdssdk.Node) *AgentStringVar { func (v *AgentStringVar) To(node cdssdk.Node) *AgentStringVar {
v.owner.ops = append(v.owner.ops, &ops.SendVar{Var: v.v, Node: node}) v.owner.ops = append(v.owner.ops, &ops.SendVar{Var: v.v, Node: node})
v.owner = v.owner.blder.AtAgent(node) v.owner = v.owner.blder.AtAgent(node)
@@ -279,3 +359,99 @@ func (v *AgentStringVar) ToExecutor() *ExecutorStringVar {
v: v.v, v: v.v,
} }
} }

func (v *AgentStringVar) Clone() (*AgentStringVar, *AgentStringVar) {
c1 := v.owner.blder.newStringVar()
c2 := v.owner.blder.newStringVar()

v.owner.ops = append(v.owner.ops, &ops.CloneVar{
Raw: v.v,
Cloneds: []ioswitch.Var{c1, c2},
})

return &AgentStringVar{owner: v.owner, v: c1}, &AgentStringVar{owner: v.owner, v: c2}
}

// 返回cnt+1个复制后的变量
func (v *AgentStringVar) CloneN(cnt int) []*AgentStringVar {
var strs []*AgentStringVar
var cloned []ioswitch.Var
for i := 0; i < cnt+1; i++ {
c := v.owner.blder.newStringVar()
strs = append(strs, &AgentStringVar{
owner: v.owner,
v: c,
})
cloned = append(cloned, c)
}

v.owner.ops = append(v.owner.ops, &ops.CloneVar{
Raw: v.v,
Cloneds: cloned,
})

return strs
}

// 将此变量暂存,直到一个信号产生后才释放(一个新变量)
func (v *AgentStringVar) HoldUntil(wait *AgentSignalVar) *AgentStringVar {
nv := v.owner.blder.newStringVar()
v.owner.ops = append(v.owner.ops, &ops.HoldUntil{
Waits: []*ioswitch.SignalVar{wait.v},
Holds: []ioswitch.Var{v.v},
Emits: []ioswitch.Var{nv},
})
return &AgentStringVar{owner: v.owner, v: nv}
}

type AgentIntVar struct {
owner *AgentPlanBuilder
v *ioswitch.IntVar
}

// 信号变量
type AgentSignalVar struct {
owner *AgentPlanBuilder
v *ioswitch.SignalVar
}

func (v *AgentSignalVar) To(node cdssdk.Node) *AgentSignalVar {
v.owner.ops = append(v.owner.ops, &ops.SendVar{Var: v.v, Node: node})
v.owner = v.owner.blder.AtAgent(node)

return v
}

func (v *AgentSignalVar) ToExecutor() *ExecutorSignalVar {
v.owner.blder.executorPlan.ops = append(v.owner.blder.executorPlan.ops, &ops.GetVar{
Var: v.v,
Node: v.owner.node,
})

return &ExecutorSignalVar{
blder: v.owner.blder,
v: v.v,
}
}

// 当这个信号被产生时,同时产生另外n个信号
func (v *AgentSignalVar) Broadcast(cnt int) []*AgentSignalVar {
var ss []*AgentSignalVar
var targets []*ioswitch.SignalVar

for i := 0; i < cnt; i++ {
c := v.owner.blder.newSignalVar()
ss = append(ss, &AgentSignalVar{
owner: v.owner,
v: c,
})
targets = append(targets, c)
}

v.owner.ops = append(v.owner.ops, &ops.Broadcast{
Source: v.v,
Targets: targets,
})

return ss
}

+ 30
- 9
common/pkgs/ioswitch/plans/executor.go View File

@@ -36,6 +36,10 @@ func (e *Executor) BeginRead(target ExecutorReadStream) (io.ReadCloser, error) {
return target.stream.Stream, nil return target.stream.Stream, nil
} }


func (e *Executor) Signal(signal ExecutorSignalVar) {
e.executorSw.PutVars(signal.v)
}

func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { func (e *Executor) Wait(ctx context.Context) (map[string]any, error) {
err := e.callback.Wait(ctx) err := e.callback.Wait(ctx)
if err != nil { if err != nil {
@@ -105,12 +109,6 @@ type ExecutorStreamVar struct {
blder *PlanBuilder blder *PlanBuilder
v *ioswitch.StreamVar v *ioswitch.StreamVar
} }

type ExecutorStringVar struct {
blder *PlanBuilder
v *ioswitch.StringVar
}

type ExecutorWriteStream struct { type ExecutorWriteStream struct {
stream *ioswitch.StreamVar stream *ioswitch.StreamVar
} }
@@ -120,6 +118,11 @@ func (b *ExecutorPlanBuilder) WillWrite() (ExecutorWriteStream, *ExecutorStreamV
return ExecutorWriteStream{stream}, &ExecutorStreamVar{blder: b.blder, v: stream} return ExecutorWriteStream{stream}, &ExecutorStreamVar{blder: b.blder, v: stream}
} }


func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar {
s := b.blder.newSignalVar()
return &ExecutorSignalVar{blder: b.blder, v: s}
}

type ExecutorReadStream struct { type ExecutorReadStream struct {
stream *ioswitch.StreamVar stream *ioswitch.StreamVar
} }
@@ -128,6 +131,19 @@ func (v *ExecutorStreamVar) WillRead() ExecutorReadStream {
return ExecutorReadStream{v.v} return ExecutorReadStream{v.v}
} }


func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar {
s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendStream{Stream: s.v, Node: node})
return &AgentStreamVar{
owner: s.blder.AtAgent(node),
v: s.v,
}
}

type ExecutorStringVar struct {
blder *PlanBuilder
v *ioswitch.StringVar
}

func (s *ExecutorStringVar) Store(key string) { func (s *ExecutorStringVar) Store(key string) {
s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.Store{ s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.Store{
Var: s.v, Var: s.v,
@@ -136,9 +152,14 @@ func (s *ExecutorStringVar) Store(key string) {
}) })
} }


func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar {
s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendStream{Stream: s.v, Node: node})
return &AgentStreamVar{
type ExecutorSignalVar struct {
blder *PlanBuilder
v *ioswitch.SignalVar
}

func (s *ExecutorSignalVar) To(node cdssdk.Node) *AgentSignalVar {
s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendVar{Var: s.v, Node: node})
return &AgentSignalVar{
owner: s.blder.AtAgent(node), owner: s.blder.AtAgent(node),
v: s.v, v: s.v,
} }


+ 8
- 0
common/pkgs/ioswitch/plans/plan_builder.go View File

@@ -91,3 +91,11 @@ func (b *PlanBuilder) newStringVar() *ioswitch.StringVar {


return v return v
} }
func (b *PlanBuilder) newSignalVar() *ioswitch.SignalVar {
v := &ioswitch.SignalVar{
ID: ioswitch.VarID(len(b.vars)),
}
b.vars = append(b.vars, v)

return v
}

+ 1
- 0
common/pkgs/ioswitch/utils.go View File

@@ -17,6 +17,7 @@ func AssignVar(from Var, to Var) error {
to.(*IntVar).Value = from.Value to.(*IntVar).Value = from.Value
case *StringVar: case *StringVar:
to.(*StringVar).Value = from.Value to.(*StringVar).Value = from.Value
case *SignalVar:
} }


return nil return nil


Loading…
Cancel
Save