From d21ab8b0c2510026f090521466f0bd4bc7a82790 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 15 Jul 2024 09:28:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0ioswitch=E7=9A=84op?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ec/multiply.go | 7 + common/pkgs/ioswitch/ioswitch.go | 8 + .../ops/{chunked_split.go => chunked.go} | 33 ++ common/pkgs/ioswitch/ops/chunked_join.go | 45 --- common/pkgs/ioswitch/ops/ec.go | 90 +++++ common/pkgs/ioswitch/ops/sync.go | 135 +++++++ common/pkgs/ioswitch/ops/var.go | 20 ++ common/pkgs/ioswitch/plans/agent.go | 330 ++++++++++++++---- common/pkgs/ioswitch/plans/executor.go | 39 ++- common/pkgs/ioswitch/plans/plan_builder.go | 8 + common/pkgs/ioswitch/utils.go | 1 + 11 files changed, 585 insertions(+), 131 deletions(-) create mode 100644 common/pkgs/ec/multiply.go rename common/pkgs/ioswitch/ops/{chunked_split.go => chunked.go} (57%) delete mode 100644 common/pkgs/ioswitch/ops/chunked_join.go create mode 100644 common/pkgs/ioswitch/ops/sync.go create mode 100644 common/pkgs/ioswitch/ops/var.go diff --git a/common/pkgs/ec/multiply.go b/common/pkgs/ec/multiply.go new file mode 100644 index 0000000..eb624d3 --- /dev/null +++ b/common/pkgs/ec/multiply.go @@ -0,0 +1,7 @@ +package ec + +import "github.com/klauspost/reedsolomon" + +func GaloisMultiplier() *reedsolomon.MultipilerBuilder { + return &reedsolomon.MultipilerBuilder{} +} diff --git a/common/pkgs/ioswitch/ioswitch.go b/common/pkgs/ioswitch/ioswitch.go index 5b48e57..d49b89f 100644 --- a/common/pkgs/ioswitch/ioswitch.go +++ b/common/pkgs/ioswitch/ioswitch.go @@ -54,6 +54,14 @@ func (v *StringVar) GetID() VarID { return v.ID } +type SignalVar struct { + ID VarID `json:"id"` +} + +func (v *SignalVar) GetID() VarID { + return v.ID +} + type Op interface { Execute(ctx context.Context, sw *Switch) error } diff --git a/common/pkgs/ioswitch/ops/chunked_split.go b/common/pkgs/ioswitch/ops/chunked.go similarity index 57% rename from common/pkgs/ioswitch/ops/chunked_split.go rename to common/pkgs/ioswitch/ops/chunked.go index 9e3eb96..c1e6ab7 100644 --- a/common/pkgs/ioswitch/ops/chunked_split.go +++ b/common/pkgs/ioswitch/ops/chunked.go @@ -4,6 +4,7 @@ import ( "context" "io" + "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "golang.org/x/sync/semaphore" @@ -40,6 +41,38 @@ func (o *ChunkedSplit) Execute(ctx context.Context, sw *ioswitch.Switch) error { return sem.Acquire(ctx, int64(len(outputs))) } +type ChunkedJoin struct { + Inputs []*ioswitch.StreamVar `json:"inputs"` + Output *ioswitch.StreamVar `json:"output"` + ChunkSize int `json:"chunkSize"` +} + +func (o *ChunkedJoin) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := ioswitch.BindArrayVars(sw, ctx, o.Inputs) + if err != nil { + return err + } + + var strReaders []io.Reader + for _, s := range o.Inputs { + strReaders = append(strReaders, s.Stream) + } + defer func() { + for _, str := range o.Inputs { + str.Stream.Close() + } + }() + + fut := future.NewSetVoid() + o.Output.Stream = io2.AfterReadClosedOnce(io2.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { + fut.SetVoid() + }) + sw.PutVars(o.Output) + + return fut.Wait(ctx) +} + func init() { OpUnion.AddT((*ChunkedSplit)(nil)) + OpUnion.AddT((*ChunkedJoin)(nil)) } diff --git a/common/pkgs/ioswitch/ops/chunked_join.go b/common/pkgs/ioswitch/ops/chunked_join.go deleted file mode 100644 index bb8942b..0000000 --- a/common/pkgs/ioswitch/ops/chunked_join.go +++ /dev/null @@ -1,45 +0,0 @@ -package ops - -import ( - "context" - "io" - - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" -) - -type ChunkedJoin struct { - Inputs []*ioswitch.StreamVar `json:"inputs"` - Output *ioswitch.StreamVar `json:"output"` - ChunkSize int `json:"chunkSize"` -} - -func (o *ChunkedJoin) Execute(ctx context.Context, sw *ioswitch.Switch) error { - err := ioswitch.BindArrayVars(sw, ctx, o.Inputs) - if err != nil { - return err - } - - var strReaders []io.Reader - for _, s := range o.Inputs { - strReaders = append(strReaders, s.Stream) - } - defer func() { - for _, str := range o.Inputs { - str.Stream.Close() - } - }() - - fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(io2.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { - fut.SetVoid() - }) - sw.PutVars(o.Output) - - return fut.Wait(ctx) -} - -func init() { - OpUnion.AddT((*ChunkedJoin)(nil)) -} diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index d47c94c..fbc197c 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -5,8 +5,10 @@ import ( "fmt" "io" + "gitlink.org.cn/cloudream/common/pkgs/future" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "golang.org/x/sync/semaphore" @@ -99,7 +101,95 @@ func (o *ECReconstruct) Execute(ctx context.Context, sw *ioswitch.Switch) error return sem.Acquire(ctx, int64(len(o.Outputs))) } +type ECMultiply struct { + Inputs []*ioswitch.StreamVar `json:"inputs"` + Coef [][]byte `json:"coef"` + Outputs []*ioswitch.StreamVar `json:"outputs"` + ChunkSize int64 `json:"chunkSize"` +} + +func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := ioswitch.BindArrayVars(sw, ctx, o.Inputs) + if err != nil { + return err + } + defer func() { + for _, s := range o.Inputs { + s.Stream.Close() + } + }() + + outputVars := make([]*ioswitch.StreamVar, len(o.Outputs)) + outputWrs := make([]*io.PipeWriter, len(o.Outputs)) + + for i := range o.Outputs { + rd, wr := io.Pipe() + outputVars[i] = &ioswitch.StreamVar{ + Stream: rd, + } + outputWrs[i] = wr + } + + fut := future.NewSetVoid() + go func() { + mul := ec.GaloisMultiplier().BuildGalois() + + inputChunks := make([][]byte, len(o.Inputs)) + for i := range o.Inputs { + inputChunks[i] = make([]byte, o.ChunkSize) + } + outputChunks := make([][]byte, len(o.Outputs)) + for i := range o.Outputs { + outputChunks[i] = make([]byte, o.ChunkSize) + } + + for { + err := sync2.ParallelDo(o.Inputs, func(s *ioswitch.StreamVar, i int) error { + _, err := io.ReadFull(s.Stream, inputChunks[i]) + return err + }) + if err == io.EOF { + fut.SetVoid() + return + } + if err != nil { + fut.SetError(err) + return + } + + err = mul.Multiply(o.Coef, inputChunks, outputChunks) + if err != nil { + fut.SetError(err) + return + } + + for i := range o.Outputs { + err := io2.WriteAll(outputWrs[i], outputChunks[i]) + if err != nil { + fut.SetError(err) + return + } + } + } + }() + + ioswitch.PutArrayVars(sw, outputVars) + err = fut.Wait(ctx) + if err != nil { + for _, wr := range outputWrs { + wr.CloseWithError(err) + } + return err + } + + for _, wr := range outputWrs { + wr.Close() + } + return nil +} + func init() { OpUnion.AddT((*ECReconstructAny)(nil)) OpUnion.AddT((*ECReconstruct)(nil)) + OpUnion.AddT((*ECMultiply)(nil)) } diff --git a/common/pkgs/ioswitch/ops/sync.go b/common/pkgs/ioswitch/ops/sync.go new file mode 100644 index 0000000..c0889b4 --- /dev/null +++ b/common/pkgs/ioswitch/ops/sync.go @@ -0,0 +1,135 @@ +package ops + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type OnStreamBegin struct { + Raw *ioswitch.StreamVar `json:"raw"` + New *ioswitch.StreamVar `json:"new"` + Signal *ioswitch.SignalVar `json:"signal"` +} + +func (o *OnStreamBegin) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := sw.BindVars(ctx, o.Raw) + if err != nil { + return err + } + + o.New.Stream = o.Raw.Stream + + sw.PutVars(o.New, o.Signal) + return nil +} + +type OnStreamEnd struct { + Raw *ioswitch.StreamVar `json:"raw"` + New *ioswitch.StreamVar `json:"new"` + Signal *ioswitch.SignalVar `json:"signal"` +} + +type onStreamEnd struct { + inner io.ReadCloser + callback *future.SetVoidFuture +} + +func (o *onStreamEnd) Read(p []byte) (n int, err error) { + n, err = o.inner.Read(p) + if err == io.EOF { + o.callback.SetVoid() + } else if err != nil { + o.callback.SetError(err) + } + return n, err +} + +func (o *onStreamEnd) Close() error { + o.callback.SetError(fmt.Errorf("stream closed early")) + return o.inner.Close() +} + +func (o *OnStreamEnd) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := sw.BindVars(ctx, o.Raw) + if err != nil { + return err + } + + cb := future.NewSetVoid() + + o.New.Stream = &onStreamEnd{ + inner: o.Raw.Stream, + callback: cb, + } + sw.PutVars(o.New) + + err = cb.Wait(ctx) + if err != nil { + return err + } + + sw.PutVars(o.Signal) + return nil +} + +type HoldUntil struct { + Waits []*ioswitch.SignalVar `json:"waits"` + Holds []ioswitch.Var `json:"holds"` + Emits []ioswitch.Var `json:"emits"` +} + +func (w *HoldUntil) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := sw.BindVars(ctx, w.Holds...) + if err != nil { + return err + } + + err = ioswitch.BindArrayVars(sw, ctx, w.Waits) + if err != nil { + return err + } + + sw.PutVars(w.Emits...) + return nil +} + +type HangUntil struct { + Waits []*ioswitch.SignalVar `json:"waits"` + Op ioswitch.Op `json:"op"` +} + +func (h *HangUntil) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := ioswitch.BindArrayVars(sw, ctx, h.Waits) + if err != nil { + return err + } + + return h.Op.Execute(ctx, sw) +} + +type Broadcast struct { + Source *ioswitch.SignalVar `json:"source"` + Targets []*ioswitch.SignalVar `json:"targets"` +} + +func (b *Broadcast) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := sw.BindVars(ctx, b.Source) + if err != nil { + return err + } + + ioswitch.PutArrayVars(sw, b.Targets) + return nil +} + +func init() { + OpUnion.AddT((*OnStreamBegin)(nil)) + OpUnion.AddT((*OnStreamEnd)(nil)) + OpUnion.AddT((*HoldUntil)(nil)) + OpUnion.AddT((*HangUntil)(nil)) + OpUnion.AddT((*Broadcast)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/var.go b/common/pkgs/ioswitch/ops/var.go new file mode 100644 index 0000000..91f0456 --- /dev/null +++ b/common/pkgs/ioswitch/ops/var.go @@ -0,0 +1,20 @@ +package ops + +import ( + "context" + + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type ConstVar struct { + Var *ioswitch.StringVar `json:"var"` +} + +func (o *ConstVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { + sw.PutVars(o.Var) + return nil +} + +func init() { + OpUnion.AddT((*ConstVar)(nil)) +} diff --git a/common/pkgs/ioswitch/plans/agent.go b/common/pkgs/ioswitch/plans/agent.go index a5ff32a..1243bae 100644 --- a/common/pkgs/ioswitch/plans/agent.go +++ b/common/pkgs/ioswitch/plans/agent.go @@ -13,21 +13,6 @@ type AgentPlanBuilder struct { ops []ioswitch.Op } -type AgentStreamVar struct { - owner *AgentPlanBuilder - v *ioswitch.StreamVar -} - -type AgentIntVar struct { - owner *AgentPlanBuilder - v *ioswitch.IntVar -} - -type AgentStringVar struct { - owner *AgentPlanBuilder - v *ioswitch.StringVar -} - func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *AgentStreamVar { opt := ipfs.ReadOption{ Offset: 0, @@ -50,21 +35,6 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *A return str } - -func (s *AgentStreamVar) IPFSWrite() *AgentStringVar { - v := s.owner.blder.newStringVar() - - s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{ - Input: s.v, - FileHash: v, - }) - - return &AgentStringVar{ - owner: s.owner, - v: v, - } -} - func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStreamVar { agtStr := &AgentStreamVar{ owner: b, @@ -79,13 +49,6 @@ func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStreamVar { return agtStr } -func (b *AgentStreamVar) FileWrite(filePath string) { - b.owner.ops = append(b.owner.ops, &ops.FileWrite{ - Input: b.v, - FilePath: filePath, - }) -} - func (b *AgentPlanBuilder) ECReconstructAny(ec cdssdk.ECRedundancy, inBlockIndexes []int, outBlockIndexes []int, streams []*AgentStreamVar) []*AgentStreamVar { var strs []*AgentStreamVar @@ -143,6 +106,120 @@ func (b *AgentPlanBuilder) ECReconstruct(ec cdssdk.ECRedundancy, inBlockIndexes return strs } +// 进行galois矩阵乘法运算,ecof * inputs +func (b *AgentPlanBuilder) ECMultiply(coef [][]byte, inputs []*AgentStreamVar, chunkSize int64) []*AgentStreamVar { + outs := make([]*AgentStreamVar, len(coef)) + outVars := make([]*ioswitch.StreamVar, len(coef)) + for i := 0; i < len(outs); i++ { + sv := b.blder.newStreamVar() + outs[i] = &AgentStreamVar{ + owner: b, + v: sv, + } + outVars[i] = sv + } + + ins := make([]*ioswitch.StreamVar, len(inputs)) + for i := 0; i < len(inputs); i++ { + ins[i] = inputs[i].v + } + + b.ops = append(b.ops, &ops.ECMultiply{ + Inputs: ins, + Outputs: outVars, + Coef: coef, + ChunkSize: chunkSize, + }) + + return outs +} + +func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentStreamVar { + agtStr := &AgentStreamVar{ + owner: b, + v: b.blder.newStreamVar(), + } + + var inputStrVars []*ioswitch.StreamVar + for _, str := range streams { + inputStrVars = append(inputStrVars, str.v) + } + + b.ops = append(b.ops, &ops.Join{ + Inputs: inputStrVars, + Output: agtStr.v, + Length: length, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) *AgentStreamVar { + agtStr := &AgentStreamVar{ + owner: b, + v: b.blder.newStreamVar(), + } + + var inputStrVars []*ioswitch.StreamVar + for _, str := range streams { + inputStrVars = append(inputStrVars, str.v) + } + + b.ops = append(b.ops, &ops.ChunkedJoin{ + Inputs: inputStrVars, + Output: agtStr.v, + ChunkSize: chunkSize, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) NewString(str string) *AgentStringVar { + v := b.blder.newStringVar() + v.Value = str + + return &AgentStringVar{ + owner: b, + v: v, + } +} + +func (b *AgentPlanBuilder) NewSignal() *AgentSignalVar { + v := b.blder.newSignalVar() + + return &AgentSignalVar{ + owner: b, + v: v, + } +} + +// 字节流变量 +type AgentStreamVar struct { + owner *AgentPlanBuilder + v *ioswitch.StreamVar +} + +func (s *AgentStreamVar) IPFSWrite() *AgentStringVar { + v := s.owner.blder.newStringVar() + + s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{ + Input: s.v, + FileHash: v, + }) + + return &AgentStringVar{ + owner: s.owner, + v: v, + } +} + +func (b *AgentStreamVar) FileWrite(filePath string) { + b.owner.ops = append(b.owner.ops, &ops.FileWrite{ + Input: b.v, + FilePath: filePath, + }) +} + func (b *AgentStreamVar) ChunkedSplit(chunkSize int, streamCount int, paddingZeros bool) []*AgentStreamVar { var strs []*AgentStreamVar @@ -200,46 +277,6 @@ func (s *AgentStreamVar) ToExecutor() *ExecutorStreamVar { } } -func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentStreamVar { - agtStr := &AgentStreamVar{ - owner: b, - v: b.blder.newStreamVar(), - } - - var inputStrVars []*ioswitch.StreamVar - for _, str := range streams { - inputStrVars = append(inputStrVars, str.v) - } - - b.ops = append(b.ops, &ops.Join{ - Inputs: inputStrVars, - Output: agtStr.v, - Length: length, - }) - - return agtStr -} - -func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) *AgentStreamVar { - agtStr := &AgentStreamVar{ - owner: b, - v: b.blder.newStreamVar(), - } - - var inputStrVars []*ioswitch.StreamVar - for _, str := range streams { - inputStrVars = append(inputStrVars, str.v) - } - - b.ops = append(b.ops, &ops.ChunkedJoin{ - Inputs: inputStrVars, - Output: agtStr.v, - ChunkSize: chunkSize, - }) - - return agtStr -} - func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar { var strs []*AgentStreamVar @@ -261,6 +298,49 @@ func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar { return strs } +// 当流产生时发送一个信号 +func (v *AgentStreamVar) OnBegin() (*AgentStreamVar, *AgentSignalVar) { + ns := v.owner.blder.newStreamVar() + s := v.owner.blder.newSignalVar() + + v.owner.ops = append(v.owner.ops, &ops.OnStreamBegin{ + Raw: v.v, + New: ns, + Signal: s, + }) + return &AgentStreamVar{owner: v.owner, v: ns}, &AgentSignalVar{owner: v.owner, v: s} +} + +// 当流结束时发送一个信号 +func (v *AgentStreamVar) OnEnd() (*AgentStreamVar, *AgentSignalVar) { + ns := v.owner.blder.newStreamVar() + s := v.owner.blder.newSignalVar() + + v.owner.ops = append(v.owner.ops, &ops.OnStreamEnd{ + Raw: v.v, + New: ns, + Signal: s, + }) + return &AgentStreamVar{owner: v.owner, v: ns}, &AgentSignalVar{owner: v.owner, v: s} +} + +// 将此流暂存,直到一个信号产生后才释放(一个新流) +func (v *AgentStreamVar) HoldUntil(wait *AgentSignalVar) *AgentStreamVar { + nv := v.owner.blder.newStreamVar() + v.owner.ops = append(v.owner.ops, &ops.HoldUntil{ + Waits: []*ioswitch.SignalVar{wait.v}, + Holds: []ioswitch.Var{v.v}, + Emits: []ioswitch.Var{nv}, + }) + return &AgentStreamVar{owner: v.owner, v: nv} +} + +// 字符串变量 +type AgentStringVar struct { + owner *AgentPlanBuilder + v *ioswitch.StringVar +} + func (v *AgentStringVar) To(node cdssdk.Node) *AgentStringVar { v.owner.ops = append(v.owner.ops, &ops.SendVar{Var: v.v, Node: node}) v.owner = v.owner.blder.AtAgent(node) @@ -279,3 +359,99 @@ func (v *AgentStringVar) ToExecutor() *ExecutorStringVar { v: v.v, } } + +func (v *AgentStringVar) Clone() (*AgentStringVar, *AgentStringVar) { + c1 := v.owner.blder.newStringVar() + c2 := v.owner.blder.newStringVar() + + v.owner.ops = append(v.owner.ops, &ops.CloneVar{ + Raw: v.v, + Cloneds: []ioswitch.Var{c1, c2}, + }) + + return &AgentStringVar{owner: v.owner, v: c1}, &AgentStringVar{owner: v.owner, v: c2} +} + +// 返回cnt+1个复制后的变量 +func (v *AgentStringVar) CloneN(cnt int) []*AgentStringVar { + var strs []*AgentStringVar + var cloned []ioswitch.Var + for i := 0; i < cnt+1; i++ { + c := v.owner.blder.newStringVar() + strs = append(strs, &AgentStringVar{ + owner: v.owner, + v: c, + }) + cloned = append(cloned, c) + } + + v.owner.ops = append(v.owner.ops, &ops.CloneVar{ + Raw: v.v, + Cloneds: cloned, + }) + + return strs +} + +// 将此变量暂存,直到一个信号产生后才释放(一个新变量) +func (v *AgentStringVar) HoldUntil(wait *AgentSignalVar) *AgentStringVar { + nv := v.owner.blder.newStringVar() + v.owner.ops = append(v.owner.ops, &ops.HoldUntil{ + Waits: []*ioswitch.SignalVar{wait.v}, + Holds: []ioswitch.Var{v.v}, + Emits: []ioswitch.Var{nv}, + }) + return &AgentStringVar{owner: v.owner, v: nv} +} + +type AgentIntVar struct { + owner *AgentPlanBuilder + v *ioswitch.IntVar +} + +// 信号变量 +type AgentSignalVar struct { + owner *AgentPlanBuilder + v *ioswitch.SignalVar +} + +func (v *AgentSignalVar) To(node cdssdk.Node) *AgentSignalVar { + v.owner.ops = append(v.owner.ops, &ops.SendVar{Var: v.v, Node: node}) + v.owner = v.owner.blder.AtAgent(node) + + return v +} + +func (v *AgentSignalVar) ToExecutor() *ExecutorSignalVar { + v.owner.blder.executorPlan.ops = append(v.owner.blder.executorPlan.ops, &ops.GetVar{ + Var: v.v, + Node: v.owner.node, + }) + + return &ExecutorSignalVar{ + blder: v.owner.blder, + v: v.v, + } +} + +// 当这个信号被产生时,同时产生另外n个信号 +func (v *AgentSignalVar) Broadcast(cnt int) []*AgentSignalVar { + var ss []*AgentSignalVar + var targets []*ioswitch.SignalVar + + for i := 0; i < cnt; i++ { + c := v.owner.blder.newSignalVar() + ss = append(ss, &AgentSignalVar{ + owner: v.owner, + v: c, + }) + targets = append(targets, c) + } + + v.owner.ops = append(v.owner.ops, &ops.Broadcast{ + Source: v.v, + Targets: targets, + }) + + return ss +} diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go index e1de800..2d810af 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -36,6 +36,10 @@ func (e *Executor) BeginRead(target ExecutorReadStream) (io.ReadCloser, error) { return target.stream.Stream, nil } +func (e *Executor) Signal(signal ExecutorSignalVar) { + e.executorSw.PutVars(signal.v) +} + func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { err := e.callback.Wait(ctx) if err != nil { @@ -105,12 +109,6 @@ type ExecutorStreamVar struct { blder *PlanBuilder v *ioswitch.StreamVar } - -type ExecutorStringVar struct { - blder *PlanBuilder - v *ioswitch.StringVar -} - type ExecutorWriteStream struct { stream *ioswitch.StreamVar } @@ -120,6 +118,11 @@ func (b *ExecutorPlanBuilder) WillWrite() (ExecutorWriteStream, *ExecutorStreamV return ExecutorWriteStream{stream}, &ExecutorStreamVar{blder: b.blder, v: stream} } +func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar { + s := b.blder.newSignalVar() + return &ExecutorSignalVar{blder: b.blder, v: s} +} + type ExecutorReadStream struct { stream *ioswitch.StreamVar } @@ -128,6 +131,19 @@ func (v *ExecutorStreamVar) WillRead() ExecutorReadStream { return ExecutorReadStream{v.v} } +func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar { + s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendStream{Stream: s.v, Node: node}) + return &AgentStreamVar{ + owner: s.blder.AtAgent(node), + v: s.v, + } +} + +type ExecutorStringVar struct { + blder *PlanBuilder + v *ioswitch.StringVar +} + func (s *ExecutorStringVar) Store(key string) { s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.Store{ Var: s.v, @@ -136,9 +152,14 @@ func (s *ExecutorStringVar) Store(key string) { }) } -func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar { - s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendStream{Stream: s.v, Node: node}) - return &AgentStreamVar{ +type ExecutorSignalVar struct { + blder *PlanBuilder + v *ioswitch.SignalVar +} + +func (s *ExecutorSignalVar) To(node cdssdk.Node) *AgentSignalVar { + s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendVar{Var: s.v, Node: node}) + return &AgentSignalVar{ owner: s.blder.AtAgent(node), v: s.v, } diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index ccb709f..0086774 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -91,3 +91,11 @@ func (b *PlanBuilder) newStringVar() *ioswitch.StringVar { return v } +func (b *PlanBuilder) newSignalVar() *ioswitch.SignalVar { + v := &ioswitch.SignalVar{ + ID: ioswitch.VarID(len(b.vars)), + } + b.vars = append(b.vars, v) + + return v +} diff --git a/common/pkgs/ioswitch/utils.go b/common/pkgs/ioswitch/utils.go index e912053..ccbe9c1 100644 --- a/common/pkgs/ioswitch/utils.go +++ b/common/pkgs/ioswitch/utils.go @@ -17,6 +17,7 @@ func AssignVar(from Var, to Var) error { to.(*IntVar).Value = from.Value case *StringVar: to.(*StringVar).Value = from.Value + case *SignalVar: } return nil