From 576f8543a08e76b88d2d7af1ae47575a7d1f494e Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 18 Jul 2024 17:02:13 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BC=98=E5=8C=96ioswtich=E7=9A=84?= =?UTF-8?q?=E8=AE=BE=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch/plans/executor.go | 21 ++--- common/pkgs/ioswitch/plans/fromto.go | 96 ++++++++++++++++++++++ common/pkgs/ioswitch/plans/parser.go | 15 ++++ common/pkgs/ioswitch/plans/plan_builder.go | 2 +- common/pkgs/ioswitch/plans/plans.go | 16 ---- 5 files changed, 123 insertions(+), 27 deletions(-) create mode 100644 common/pkgs/ioswitch/plans/fromto.go create mode 100644 common/pkgs/ioswitch/plans/parser.go delete mode 100644 common/pkgs/ioswitch/plans/plans.go diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go index 2d810af..3e016af 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -15,19 +15,19 @@ import ( type Executor struct { planID ioswitch.PlanID - plan *PlanBuilder + planBlder *PlanBuilder callback *future.SetVoidFuture ctx context.Context cancel context.CancelFunc executorSw *ioswitch.Switch } -func (e *Executor) BeginWrite(str io.ReadCloser, target ExecutorWriteStream) { +func (e *Executor) BeginWrite(str io.ReadCloser, target *ExecutorWriteStream) { target.stream.Stream = str e.executorSw.PutVars(target.stream) } -func (e *Executor) BeginRead(target ExecutorReadStream) (io.ReadCloser, error) { +func (e *Executor) BeginRead(target *ExecutorReadStream) (io.ReadCloser, error) { err := e.executorSw.BindVars(e.ctx, target.stream) if err != nil { return nil, fmt.Errorf("bind vars: %w", err) @@ -36,7 +36,7 @@ func (e *Executor) BeginRead(target ExecutorReadStream) (io.ReadCloser, error) { return target.stream.Stream, nil } -func (e *Executor) Signal(signal ExecutorSignalVar) { +func (e *Executor) Signal(signal *ExecutorSignalVar) { e.executorSw.PutVars(signal.v) } @@ -47,7 +47,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { } ret := make(map[string]any) - e.plan.storeMap.Range(func(k, v any) bool { + e.planBlder.storeMap.Range(func(k, v any) bool { ret[k.(string)] = v return true }) @@ -58,7 +58,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { func (e *Executor) execute() { wg := sync.WaitGroup{} - for _, p := range e.plan.agentPlans { + for _, p := range e.planBlder.agentPlans { wg.Add(1) go func(p *AgentPlanBuilder) { @@ -113,9 +113,10 @@ type ExecutorWriteStream struct { stream *ioswitch.StreamVar } -func (b *ExecutorPlanBuilder) WillWrite() (ExecutorWriteStream, *ExecutorStreamVar) { +func (b *ExecutorPlanBuilder) WillWrite(str *ExecutorWriteStream) *ExecutorStreamVar { stream := b.blder.newStreamVar() - return ExecutorWriteStream{stream}, &ExecutorStreamVar{blder: b.blder, v: stream} + str.stream = stream + return &ExecutorStreamVar{blder: b.blder, v: stream} } func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar { @@ -127,8 +128,8 @@ type ExecutorReadStream struct { stream *ioswitch.StreamVar } -func (v *ExecutorStreamVar) WillRead() ExecutorReadStream { - return ExecutorReadStream{v.v} +func (v *ExecutorStreamVar) WillRead(str *ExecutorReadStream) { + str.stream = v.v } func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar { diff --git a/common/pkgs/ioswitch/plans/fromto.go b/common/pkgs/ioswitch/plans/fromto.go new file mode 100644 index 0000000..93d07fd --- /dev/null +++ 
b/common/pkgs/ioswitch/plans/fromto.go @@ -0,0 +1,96 @@ +package plans + +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + +type From interface { + GetDataIndex() int +} + +type To interface { + GetDataIndex() int +} + +type FromTos []FromTo + +type FromTo struct { + Froms []From + Tos []To +} + +type FromExecutor struct { + Stream ExecutorWriteStream + DataIndex int +} + +func (f *FromExecutor) GetDataIndex() int { + return f.DataIndex +} + +type FromIPFS struct { + Node cdssdk.Node + FileHash string + DataIndex int +} + +func NewFromIPFS(node cdssdk.Node, fileHash string, dataIndex int) *FromIPFS { + return &FromIPFS{ + Node: node, + FileHash: fileHash, + DataIndex: dataIndex, + } +} + +func (f *FromIPFS) GetDataIndex() int { + return f.DataIndex +} + +type ToExecutor struct { + Stream *ExecutorReadStream + DataIndex int +} + +func NewToExecutor(dataIndex int) (*ToExecutor, *ExecutorReadStream) { + str := ExecutorReadStream{} + return &ToExecutor{ + Stream: &str, + DataIndex: dataIndex, + }, &str +} + +func (t *ToExecutor) GetDataIndex() int { + return t.DataIndex +} + +type ToIPFS struct { + Node cdssdk.Node + DataIndex int + FileHashKey string +} + +func NewToIPFS(node cdssdk.Node, dataIndex int, fileHashKey string) *ToIPFS { + return &ToIPFS{ + Node: node, + DataIndex: dataIndex, + FileHashKey: fileHashKey, + } +} + +func (t *ToIPFS) GetDataIndex() int { + return t.DataIndex +} + +type ToStorage struct { + Storage cdssdk.Storage + DataIndex int +} + +func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage { + return &ToStorage{ + Storage: storage, + DataIndex: dataIndex, + } +} + +func (t *ToStorage) GetDataIndex() int { + return t.DataIndex +} diff --git a/common/pkgs/ioswitch/plans/parser.go b/common/pkgs/ioswitch/plans/parser.go new file mode 100644 index 0000000..40721f5 --- /dev/null +++ b/common/pkgs/ioswitch/plans/parser.go @@ -0,0 +1,15 @@ +package plans + +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + +type FromToParser interface { + Parse(ft FromTo, blder *PlanBuilder) error +} + +type DefaultParser struct { + EC *cdssdk.ECRedundancy +} + +func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { + +} diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index 0086774..d674c0c 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -54,7 +54,7 @@ func (b *PlanBuilder) Execute() *Executor { exec := Executor{ planID: planID, - plan: b, + planBlder: b, callback: future.NewSetVoid(), ctx: ctx, cancel: cancel, diff --git a/common/pkgs/ioswitch/plans/plans.go b/common/pkgs/ioswitch/plans/plans.go deleted file mode 100644 index 60bafa8..0000000 --- a/common/pkgs/ioswitch/plans/plans.go +++ /dev/null @@ -1,16 +0,0 @@ -package plans - -import ( - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" -) - -type AgentPlan struct { - Node cdssdk.Node - Plan ioswitch.Plan -} - -type ComposedPlan struct { - ID ioswitch.PlanID - AgentPlans []AgentPlan -} From 4ab4b1431bc8ad5e32a1d5469b9909af6676e9ea Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 25 Jul 2024 10:08:33 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=9E=E7=8E=B0Parser?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/downloader/io.go | 17 +- common/pkgs/ioswitch/ops/ec.go | 2 +- common/pkgs/ioswitch/plans/agent.go | 111 ++-- 
common/pkgs/ioswitch/plans/executor.go | 62 +- common/pkgs/ioswitch/plans/fromto.go | 93 ++- common/pkgs/ioswitch/plans/ops.go | 240 +++++++ common/pkgs/ioswitch/plans/parser.go | 725 ++++++++++++++++++++- common/pkgs/ioswitch/plans/plan_builder.go | 102 +-- 8 files changed, 1196 insertions(+), 156 deletions(-) create mode 100644 common/pkgs/ioswitch/plans/ops.go diff --git a/common/pkgs/downloader/io.go b/common/pkgs/downloader/io.go index 5bcee17..464c429 100644 --- a/common/pkgs/downloader/io.go +++ b/common/pkgs/downloader/io.go @@ -107,17 +107,24 @@ func (r *IPFSReader) openStream() (io.ReadCloser, error) { func (r *IPFSReader) fromNode() (io.ReadCloser, error) { planBld := plans.NewPlanBuilder() - fileStr := planBld.AtAgent(r.node).IPFSRead(r.fileHash, ipfs.ReadOption{ - Offset: r.offset, - Length: -1, - }).ToExecutor().WillRead() + toExe, toStr := plans.NewToExecutor(-1) + ft := plans.FromTo{ + Froms: []plans.From{ + plans.NewFromIPFS(r.node, r.fileHash, -1), + }, + Tos: []plans.To{ + toExe, + }, + } + par := plans.DefaultParser{} + par.Parse(ft, planBld) exec := planBld.Execute() go func() { exec.Wait(context.Background()) }() - return exec.BeginRead(fileStr) + return exec.BeginRead(toStr) } func (r *IPFSReader) fromLocalIPFS() (io.ReadCloser, error) { diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index fbc197c..41a4415 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -102,8 +102,8 @@ func (o *ECReconstruct) Execute(ctx context.Context, sw *ioswitch.Switch) error } type ECMultiply struct { - Inputs []*ioswitch.StreamVar `json:"inputs"` Coef [][]byte `json:"coef"` + Inputs []*ioswitch.StreamVar `json:"inputs"` Outputs []*ioswitch.StreamVar `json:"outputs"` ChunkSize int64 `json:"chunkSize"` } diff --git a/common/pkgs/ioswitch/plans/agent.go b/common/pkgs/ioswitch/plans/agent.go index 1243bae..3eec7d5 100644 --- a/common/pkgs/ioswitch/plans/agent.go +++ b/common/pkgs/ioswitch/plans/agent.go @@ -1,18 +1,6 @@ package plans -import ( - "gitlink.org.cn/cloudream/common/pkgs/ipfs" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" -) - -type AgentPlanBuilder struct { - blder *PlanBuilder - node cdssdk.Node - ops []ioswitch.Op -} - +/* func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *AgentStreamVar { opt := ipfs.ReadOption{ Offset: 0, @@ -24,10 +12,10 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *A str := &AgentStreamVar{ owner: b, - v: b.blder.newStreamVar(), + v: b.blder.NewStreamVar(), } - b.ops = append(b.ops, &ops.IPFSRead{ + b.Ops = append(b.Ops, &ops.IPFSRead{ Output: str.v, FileHash: fileHash, Option: opt, @@ -38,10 +26,10 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *A func (b *AgentPlanBuilder) FileRead(filePath string) *AgentStreamVar { agtStr := &AgentStreamVar{ owner: b, - v: b.blder.newStreamVar(), + v: b.blder.NewStreamVar(), } - b.ops = append(b.ops, &ops.FileRead{ + b.Ops = append(b.Ops, &ops.FileRead{ Output: agtStr.v, FilePath: filePath, }) @@ -59,7 +47,7 @@ func (b *AgentPlanBuilder) ECReconstructAny(ec cdssdk.ECRedundancy, inBlockIndex var outputStrVars []*ioswitch.StreamVar for i := 0; i < len(outBlockIndexes); i++ { - v := b.blder.newStreamVar() + v := b.blder.NewStreamVar() strs = append(strs, &AgentStreamVar{ owner: b, v: v, @@ -67,7 +55,7 @@ func (b *AgentPlanBuilder) 
ECReconstructAny(ec cdssdk.ECRedundancy, inBlockIndex outputStrVars = append(outputStrVars, v) } - b.ops = append(b.ops, &ops.ECReconstructAny{ + b.Ops = append(b.Ops, &ops.ECReconstructAny{ EC: ec, Inputs: inputStrVars, Outputs: outputStrVars, @@ -88,7 +76,7 @@ func (b *AgentPlanBuilder) ECReconstruct(ec cdssdk.ECRedundancy, inBlockIndexes var outputStrVars []*ioswitch.StreamVar for i := 0; i < ec.K; i++ { - v := b.blder.newStreamVar() + v := b.blder.NewStreamVar() strs = append(strs, &AgentStreamVar{ owner: b, v: v, @@ -96,7 +84,7 @@ func (b *AgentPlanBuilder) ECReconstruct(ec cdssdk.ECRedundancy, inBlockIndexes outputStrVars = append(outputStrVars, v) } - b.ops = append(b.ops, &ops.ECReconstruct{ + b.Ops = append(b.Ops, &ops.ECReconstruct{ EC: ec, Inputs: inputStrVars, Outputs: outputStrVars, @@ -111,7 +99,7 @@ func (b *AgentPlanBuilder) ECMultiply(coef [][]byte, inputs []*AgentStreamVar, c outs := make([]*AgentStreamVar, len(coef)) outVars := make([]*ioswitch.StreamVar, len(coef)) for i := 0; i < len(outs); i++ { - sv := b.blder.newStreamVar() + sv := b.blder.NewStreamVar() outs[i] = &AgentStreamVar{ owner: b, v: sv, @@ -124,7 +112,7 @@ func (b *AgentPlanBuilder) ECMultiply(coef [][]byte, inputs []*AgentStreamVar, c ins[i] = inputs[i].v } - b.ops = append(b.ops, &ops.ECMultiply{ + b.Ops = append(b.Ops, &ops.ECMultiply{ Inputs: ins, Outputs: outVars, Coef: coef, @@ -137,7 +125,7 @@ func (b *AgentPlanBuilder) ECMultiply(coef [][]byte, inputs []*AgentStreamVar, c func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentStreamVar { agtStr := &AgentStreamVar{ owner: b, - v: b.blder.newStreamVar(), + v: b.blder.NewStreamVar(), } var inputStrVars []*ioswitch.StreamVar @@ -145,7 +133,7 @@ func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentS inputStrVars = append(inputStrVars, str.v) } - b.ops = append(b.ops, &ops.Join{ + b.Ops = append(b.Ops, &ops.Join{ Inputs: inputStrVars, Output: agtStr.v, Length: length, @@ -157,7 +145,7 @@ func (b *AgentPlanBuilder) Join(length int64, streams []*AgentStreamVar) *AgentS func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) *AgentStreamVar { agtStr := &AgentStreamVar{ owner: b, - v: b.blder.newStreamVar(), + v: b.blder.NewStreamVar(), } var inputStrVars []*ioswitch.StreamVar @@ -165,7 +153,7 @@ func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) inputStrVars = append(inputStrVars, str.v) } - b.ops = append(b.ops, &ops.ChunkedJoin{ + b.Ops = append(b.Ops, &ops.ChunkedJoin{ Inputs: inputStrVars, Output: agtStr.v, ChunkSize: chunkSize, @@ -175,7 +163,7 @@ func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams []*AgentStreamVar) } func (b *AgentPlanBuilder) NewString(str string) *AgentStringVar { - v := b.blder.newStringVar() + v := b.blder.NewStringVar() v.Value = str return &AgentStringVar{ @@ -185,7 +173,7 @@ func (b *AgentPlanBuilder) NewString(str string) *AgentStringVar { } func (b *AgentPlanBuilder) NewSignal() *AgentSignalVar { - v := b.blder.newSignalVar() + v := b.blder.NewSignalVar() return &AgentSignalVar{ owner: b, @@ -200,9 +188,9 @@ type AgentStreamVar struct { } func (s *AgentStreamVar) IPFSWrite() *AgentStringVar { - v := s.owner.blder.newStringVar() + v := s.owner.blder.NewStringVar() - s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{ + s.owner.Ops = append(s.owner.Ops, &ops.IPFSWrite{ Input: s.v, FileHash: v, }) @@ -214,7 +202,7 @@ func (s *AgentStreamVar) IPFSWrite() *AgentStringVar { } func (b *AgentStreamVar) FileWrite(filePath 
string) { - b.owner.ops = append(b.owner.ops, &ops.FileWrite{ + b.owner.Ops = append(b.owner.Ops, &ops.FileWrite{ Input: b.v, FilePath: filePath, }) @@ -225,7 +213,7 @@ func (b *AgentStreamVar) ChunkedSplit(chunkSize int, streamCount int, paddingZer var outputStrVars []*ioswitch.StreamVar for i := 0; i < streamCount; i++ { - v := b.owner.blder.newStreamVar() + v := b.owner.blder.NewStreamVar() strs = append(strs, &AgentStreamVar{ owner: b.owner, v: v, @@ -233,7 +221,7 @@ func (b *AgentStreamVar) ChunkedSplit(chunkSize int, streamCount int, paddingZer outputStrVars = append(outputStrVars, v) } - b.owner.ops = append(b.owner.ops, &ops.ChunkedSplit{ + b.owner.Ops = append(b.owner.Ops, &ops.ChunkedSplit{ Input: b.v, Outputs: outputStrVars, ChunkSize: chunkSize, @@ -246,10 +234,10 @@ func (b *AgentStreamVar) ChunkedSplit(chunkSize int, streamCount int, paddingZer func (s *AgentStreamVar) Length(length int64) *AgentStreamVar { agtStr := &AgentStreamVar{ owner: s.owner, - v: s.owner.blder.newStreamVar(), + v: s.owner.blder.NewStreamVar(), } - s.owner.ops = append(s.owner.ops, &ops.Length{ + s.owner.Ops = append(s.owner.Ops, &ops.Length{ Input: s.v, Output: agtStr.v, Length: length, @@ -259,7 +247,7 @@ func (s *AgentStreamVar) Length(length int64) *AgentStreamVar { } func (s *AgentStreamVar) To(node cdssdk.Node) *AgentStreamVar { - s.owner.ops = append(s.owner.ops, &ops.SendStream{Stream: s.v, Node: node}) + s.owner.Ops = append(s.owner.Ops, &ops.SendStream{Stream: s.v, Node: node}) s.owner = s.owner.blder.AtAgent(node) return s @@ -268,7 +256,7 @@ func (s *AgentStreamVar) To(node cdssdk.Node) *AgentStreamVar { func (s *AgentStreamVar) ToExecutor() *ExecutorStreamVar { s.owner.blder.executorPlan.ops = append(s.owner.blder.executorPlan.ops, &ops.GetStream{ Stream: s.v, - Node: s.owner.node, + Node: s.owner.Node, }) return &ExecutorStreamVar{ @@ -282,7 +270,7 @@ func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar { var outputStrVars []*ioswitch.StreamVar for i := 0; i < cnt; i++ { - v := s.owner.blder.newStreamVar() + v := s.owner.blder.NewStreamVar() strs = append(strs, &AgentStreamVar{ owner: s.owner, v: v, @@ -290,7 +278,7 @@ func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar { outputStrVars = append(outputStrVars, v) } - s.owner.ops = append(s.owner.ops, &ops.CloneStream{ + s.owner.Ops = append(s.owner.Ops, &ops.CloneStream{ Input: s.v, Outputs: outputStrVars, }) @@ -300,10 +288,10 @@ func (s *AgentStreamVar) Clone(cnt int) []*AgentStreamVar { // 当流产生时发送一个信号 func (v *AgentStreamVar) OnBegin() (*AgentStreamVar, *AgentSignalVar) { - ns := v.owner.blder.newStreamVar() - s := v.owner.blder.newSignalVar() + ns := v.owner.blder.NewStreamVar() + s := v.owner.blder.NewSignalVar() - v.owner.ops = append(v.owner.ops, &ops.OnStreamBegin{ + v.owner.Ops = append(v.owner.Ops, &ops.OnStreamBegin{ Raw: v.v, New: ns, Signal: s, @@ -313,10 +301,10 @@ func (v *AgentStreamVar) OnBegin() (*AgentStreamVar, *AgentSignalVar) { // 当流结束时发送一个信号 func (v *AgentStreamVar) OnEnd() (*AgentStreamVar, *AgentSignalVar) { - ns := v.owner.blder.newStreamVar() - s := v.owner.blder.newSignalVar() + ns := v.owner.blder.NewStreamVar() + s := v.owner.blder.NewSignalVar() - v.owner.ops = append(v.owner.ops, &ops.OnStreamEnd{ + v.owner.Ops = append(v.owner.Ops, &ops.OnStreamEnd{ Raw: v.v, New: ns, Signal: s, @@ -326,8 +314,8 @@ func (v *AgentStreamVar) OnEnd() (*AgentStreamVar, *AgentSignalVar) { // 将此流暂存,直到一个信号产生后才释放(一个新流) func (v *AgentStreamVar) HoldUntil(wait *AgentSignalVar) *AgentStreamVar { - nv := 
v.owner.blder.newStreamVar() - v.owner.ops = append(v.owner.ops, &ops.HoldUntil{ + nv := v.owner.blder.NewStreamVar() + v.owner.Ops = append(v.owner.Ops, &ops.HoldUntil{ Waits: []*ioswitch.SignalVar{wait.v}, Holds: []ioswitch.Var{v.v}, Emits: []ioswitch.Var{nv}, @@ -342,7 +330,7 @@ type AgentStringVar struct { } func (v *AgentStringVar) To(node cdssdk.Node) *AgentStringVar { - v.owner.ops = append(v.owner.ops, &ops.SendVar{Var: v.v, Node: node}) + v.owner.Ops = append(v.owner.Ops, &ops.SendVar{Var: v.v, Node: node}) v.owner = v.owner.blder.AtAgent(node) return v @@ -351,7 +339,7 @@ func (v *AgentStringVar) To(node cdssdk.Node) *AgentStringVar { func (v *AgentStringVar) ToExecutor() *ExecutorStringVar { v.owner.blder.executorPlan.ops = append(v.owner.blder.executorPlan.ops, &ops.GetVar{ Var: v.v, - Node: v.owner.node, + Node: v.owner.Node, }) return &ExecutorStringVar{ @@ -361,10 +349,10 @@ func (v *AgentStringVar) ToExecutor() *ExecutorStringVar { } func (v *AgentStringVar) Clone() (*AgentStringVar, *AgentStringVar) { - c1 := v.owner.blder.newStringVar() - c2 := v.owner.blder.newStringVar() + c1 := v.owner.blder.NewStringVar() + c2 := v.owner.blder.NewStringVar() - v.owner.ops = append(v.owner.ops, &ops.CloneVar{ + v.owner.Ops = append(v.owner.Ops, &ops.CloneVar{ Raw: v.v, Cloneds: []ioswitch.Var{c1, c2}, }) @@ -377,7 +365,7 @@ func (v *AgentStringVar) CloneN(cnt int) []*AgentStringVar { var strs []*AgentStringVar var cloned []ioswitch.Var for i := 0; i < cnt+1; i++ { - c := v.owner.blder.newStringVar() + c := v.owner.blder.NewStringVar() strs = append(strs, &AgentStringVar{ owner: v.owner, v: c, @@ -385,7 +373,7 @@ func (v *AgentStringVar) CloneN(cnt int) []*AgentStringVar { cloned = append(cloned, c) } - v.owner.ops = append(v.owner.ops, &ops.CloneVar{ + v.owner.Ops = append(v.owner.Ops, &ops.CloneVar{ Raw: v.v, Cloneds: cloned, }) @@ -395,8 +383,8 @@ func (v *AgentStringVar) CloneN(cnt int) []*AgentStringVar { // 将此变量暂存,直到一个信号产生后才释放(一个新变量) func (v *AgentStringVar) HoldUntil(wait *AgentSignalVar) *AgentStringVar { - nv := v.owner.blder.newStringVar() - v.owner.ops = append(v.owner.ops, &ops.HoldUntil{ + nv := v.owner.blder.NewStringVar() + v.owner.Ops = append(v.owner.Ops, &ops.HoldUntil{ Waits: []*ioswitch.SignalVar{wait.v}, Holds: []ioswitch.Var{v.v}, Emits: []ioswitch.Var{nv}, @@ -416,7 +404,7 @@ type AgentSignalVar struct { } func (v *AgentSignalVar) To(node cdssdk.Node) *AgentSignalVar { - v.owner.ops = append(v.owner.ops, &ops.SendVar{Var: v.v, Node: node}) + v.owner.Ops = append(v.owner.Ops, &ops.SendVar{Var: v.v, Node: node}) v.owner = v.owner.blder.AtAgent(node) return v @@ -425,7 +413,7 @@ func (v *AgentSignalVar) To(node cdssdk.Node) *AgentSignalVar { func (v *AgentSignalVar) ToExecutor() *ExecutorSignalVar { v.owner.blder.executorPlan.ops = append(v.owner.blder.executorPlan.ops, &ops.GetVar{ Var: v.v, - Node: v.owner.node, + Node: v.owner.Node, }) return &ExecutorSignalVar{ @@ -440,7 +428,7 @@ func (v *AgentSignalVar) Broadcast(cnt int) []*AgentSignalVar { var targets []*ioswitch.SignalVar for i := 0; i < cnt; i++ { - c := v.owner.blder.newSignalVar() + c := v.owner.blder.NewSignalVar() ss = append(ss, &AgentSignalVar{ owner: v.owner, v: c, @@ -448,10 +436,11 @@ func (v *AgentSignalVar) Broadcast(cnt int) []*AgentSignalVar { targets = append(targets, c) } - v.owner.ops = append(v.owner.ops, &ops.Broadcast{ + v.owner.Ops = append(v.owner.Ops, &ops.Broadcast{ Source: v.v, Targets: targets, }) return ss } +*/ diff --git a/common/pkgs/ioswitch/plans/executor.go 
b/common/pkgs/ioswitch/plans/executor.go index 3e016af..0618fe3 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -7,10 +7,8 @@ import ( "sync" "gitlink.org.cn/cloudream/common/pkgs/future" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" ) type Executor struct { @@ -47,7 +45,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { } ret := make(map[string]any) - e.planBlder.storeMap.Range(func(k, v any) bool { + e.planBlder.StoreMap.Range(func(k, v any) bool { ret[k.(string)] = v return true }) @@ -58,7 +56,7 @@ func (e *Executor) Wait(ctx context.Context) (map[string]any, error) { func (e *Executor) execute() { wg := sync.WaitGroup{} - for _, p := range e.planBlder.agentPlans { + for _, p := range e.planBlder.AgentPlans { wg.Add(1) go func(p *AgentPlanBuilder) { @@ -66,19 +64,19 @@ func (e *Executor) execute() { plan := ioswitch.Plan{ ID: e.planID, - Ops: p.ops, + Ops: p.Ops, } - cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&p.node)) + cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&p.Node)) if err != nil { - e.stopWith(fmt.Errorf("new agent rpc client of node %v: %w", p.node.NodeID, err)) + e.stopWith(fmt.Errorf("new agent rpc client of node %v: %w", p.Node.NodeID, err)) return } defer stgglb.AgentRPCPool.Release(cli) err = cli.ExecuteIOPlan(e.ctx, plan) if err != nil { - e.stopWith(fmt.Errorf("execute plan at %v: %w", p.node.NodeID, err)) + e.stopWith(fmt.Errorf("execute plan at %v: %w", p.Node.NodeID, err)) return } }(p) @@ -100,40 +98,35 @@ func (e *Executor) stopWith(err error) { e.cancel() } -type ExecutorPlanBuilder struct { - blder *PlanBuilder - ops []ioswitch.Op -} - -type ExecutorStreamVar struct { - blder *PlanBuilder - v *ioswitch.StreamVar -} +// type ExecutorStreamVar struct { +// blder *PlanBuilder +// v *ioswitch.StreamVar +// } type ExecutorWriteStream struct { stream *ioswitch.StreamVar } -func (b *ExecutorPlanBuilder) WillWrite(str *ExecutorWriteStream) *ExecutorStreamVar { - stream := b.blder.newStreamVar() - str.stream = stream - return &ExecutorStreamVar{blder: b.blder, v: stream} -} +// func (b *ExecutorPlanBuilder) WillWrite(str *ExecutorWriteStream) *ExecutorStreamVar { +// stream := b.blder.NewStreamVar() +// str.stream = stream +// return &ExecutorStreamVar{blder: b.blder, v: stream} +// } -func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar { - s := b.blder.newSignalVar() - return &ExecutorSignalVar{blder: b.blder, v: s} -} +// func (b *ExecutorPlanBuilder) WillSignal() *ExecutorSignalVar { +// s := b.blder.NewSignalVar() +// return &ExecutorSignalVar{blder: b.blder, v: s} +// } type ExecutorReadStream struct { stream *ioswitch.StreamVar } -func (v *ExecutorStreamVar) WillRead(str *ExecutorReadStream) { - str.stream = v.v -} - +// func (v *ExecutorStreamVar) WillRead(str *ExecutorReadStream) { +// str.stream = v.v +// } +/* func (s *ExecutorStreamVar) To(node cdssdk.Node) *AgentStreamVar { - s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendStream{Stream: s.v, Node: node}) + s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.SendStream{Stream: s.v, Node: node}) return &AgentStreamVar{ owner: s.blder.AtAgent(node), v: s.v, @@ -146,10 +139,10 @@ type ExecutorStringVar struct { } func (s *ExecutorStringVar) Store(key string) { - 
s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.Store{ + s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.Store{ Var: s.v, Key: key, - Store: s.blder.storeMap, + Store: s.blder.StoreMap, }) } @@ -159,9 +152,10 @@ type ExecutorSignalVar struct { } func (s *ExecutorSignalVar) To(node cdssdk.Node) *AgentSignalVar { - s.blder.executorPlan.ops = append(s.blder.executorPlan.ops, &ops.SendVar{Var: s.v, Node: node}) + s.blder.ExecutorPlan.ops = append(s.blder.ExecutorPlan.ops, &ops.SendVar{Var: s.v, Node: node}) return &AgentSignalVar{ owner: s.blder.AtAgent(node), v: s.v, } } +*/ diff --git a/common/pkgs/ioswitch/plans/fromto.go b/common/pkgs/ioswitch/plans/fromto.go index 93d07fd..3f353b3 100644 --- a/common/pkgs/ioswitch/plans/fromto.go +++ b/common/pkgs/ioswitch/plans/fromto.go @@ -1,13 +1,17 @@ package plans -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) type From interface { GetDataIndex() int + BuildOp() Node } type To interface { GetDataIndex() int + BuildOp() Node } type FromTos []FromTo @@ -18,7 +22,7 @@ type FromTo struct { } type FromExecutor struct { - Stream ExecutorWriteStream + Stream *ExecutorWriteStream DataIndex int } @@ -26,13 +30,25 @@ func (f *FromExecutor) GetDataIndex() int { return f.DataIndex } +func (f *FromExecutor) BuildOp() Node { + op := Node{ + Env: &ExecutorEnv{}, + Type: FromExecutorOp{ + OutputVar: 0, + Handle: f.Stream, + }, + } + op.NewOutput(nil) + return op +} + type FromIPFS struct { - Node cdssdk.Node + Node *cdssdk.Node FileHash string DataIndex int } -func NewFromIPFS(node cdssdk.Node, fileHash string, dataIndex int) *FromIPFS { +func NewFromIPFS(node *cdssdk.Node, fileHash string, dataIndex int) *FromIPFS { return &FromIPFS{ Node: node, FileHash: fileHash, @@ -44,6 +60,25 @@ func (f *FromIPFS) GetDataIndex() int { return f.DataIndex } +func (f *FromIPFS) BuildOp() Node { + op := Node{ + Pinned: true, + Type: &IPFSReadType{ + OutputVar: 0, + FileHash: f.FileHash, + }, + } + + if f.Node == nil { + op.Env = nil + } else { + op.Env = &AgentEnv{*f.Node} + } + + op.NewOutput(nil) + return op +} + type ToExecutor struct { Stream *ExecutorReadStream DataIndex int @@ -61,6 +96,19 @@ func (t *ToExecutor) GetDataIndex() int { return t.DataIndex } +func (t *ToExecutor) BuildOp() Node { + op := Node{ + Env: &ExecutorEnv{}, + Pinned: true, + Type: ToExecutorOp{ + InputVar: 0, + Handle: t.Stream, + }, + } + op.NewOutput(nil) + return op +} + type ToIPFS struct { Node cdssdk.Node DataIndex int @@ -79,18 +127,31 @@ func (t *ToIPFS) GetDataIndex() int { return t.DataIndex } -type ToStorage struct { - Storage cdssdk.Storage - DataIndex int -} - -func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage { - return &ToStorage{ - Storage: storage, - DataIndex: dataIndex, +func (t *ToIPFS) BuildOp() Node { + op := Node{ + Env: &AgentEnv{t.Node}, + Pinned: true, + Type: &IPFSWriteType{ + InputVar: 0, + FileHashVar: 0, + }, } + op.NewInput(nil) + return op } -func (t *ToStorage) GetDataIndex() int { - return t.DataIndex -} +// type ToStorage struct { +// Storage cdssdk.Storage +// DataIndex int +// } + +// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage { +// return &ToStorage{ +// Storage: storage, +// DataIndex: dataIndex, +// } +// } + +// func (t *ToStorage) GetDataIndex() int { +// return t.DataIndex +// } diff --git a/common/pkgs/ioswitch/plans/ops.go b/common/pkgs/ioswitch/plans/ops.go new file mode 100644 index 
0000000..ebb2fc5 --- /dev/null +++ b/common/pkgs/ioswitch/plans/ops.go @@ -0,0 +1,240 @@ +package plans + +import ( + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type VarIndex int + +type StreamVar struct { + DataIndex int + From *Node + Toes []*Node + Var *ioswitch.StreamVar +} + +func (v *StreamVar) AddTo(to *Node) { + v.Toes = append(v.Toes, to) +} + +func (v *StreamVar) RemoveTo(to *Node) { + v.Toes = lo2.Remove(v.Toes, to) +} + +type ValueVarType int + +const ( + StringValueVar ValueVarType = iota +) + +type ValueVar struct { + Type ValueVarType + From *Node + Toes []*Node + Var ioswitch.Var +} + +func (v *ValueVar) AddTo(to *Node) { + v.Toes = append(v.Toes, to) +} + +func (v *ValueVar) RemoveTo(to *Node) { + v.Toes = lo2.Remove(v.Toes, to) +} + +type OpEnv interface { + Equals(env OpEnv) bool +} + +type AgentEnv struct { + Node cdssdk.Node +} + +func (e *AgentEnv) Equals(env OpEnv) bool { + if agentEnv, ok := env.(*AgentEnv); ok { + return e.Node.NodeID == agentEnv.Node.NodeID + } + return false +} + +type ExecutorEnv struct{} + +func (e *ExecutorEnv) Equals(env OpEnv) bool { + _, ok := env.(*ExecutorEnv) + return ok +} + +type OpType interface { + GenerateOp(node *Node, blder *PlanBuilder) error +} + +type Node struct { + Env OpEnv // Op将在哪里执行,Agent或者Executor + Type OpType + InputStreams []*StreamVar + OutputStreams []*StreamVar + InputValues []*ValueVar + OutputValues []*ValueVar +} + +func (o *Node) NewOutput(dataIndex int) *StreamVar { + v := &StreamVar{DataIndex: dataIndex, From: o} + o.OutputStreams = append(o.OutputStreams, v) + return v +} + +func (o *Node) AddInput(str *StreamVar) { + o.InputStreams = append(o.InputStreams, str) + str.AddTo(o) +} + +func (o *Node) ReplaceInput(org *StreamVar, new *StreamVar) { + idx := lo.IndexOf(o.InputStreams, org) + if idx < 0 { + return + } + + o.InputStreams[idx].RemoveTo(o) + o.InputStreams[idx] = new + new.AddTo(o) +} + +func (o *Node) NewOutputVar(typ ValueVarType) *ValueVar { + v := &ValueVar{Type: typ, From: o} + o.OutputValues = append(o.OutputValues, v) + return v +} + +func (o *Node) AddInputVar(v *ValueVar) { + o.InputValues = append(o.InputValues, v) + v.AddTo(o) +} + +func (o *Node) ReplaceInputVar(org *ValueVar, new *ValueVar) { + idx := lo.IndexOf(o.InputValues, org) + if idx < 0 { + return + } + + o.InputValues[idx].RemoveTo(o) + o.InputValues[idx] = new + new.AddTo(o) +} + +type IPFSReadType struct { + FileHash string + Option ipfs.ReadOption +} + +func (t *IPFSReadType) GenerateOp(op *Node, blder *PlanBuilder) error { + +} + +type IPFSWriteType struct { + FileHashStoreKey string +} + +func (t *IPFSWriteType) GenerateOp(op *Node, blder *PlanBuilder) error { + +} + +type ChunkedSplitOp struct { + ChunkSize int + PaddingZeros bool +} + +func (t *ChunkedSplitOp) GenerateOp(op *Node, blder *PlanBuilder) error { + +} + +type ChunkedJoinOp struct { + ChunkSize int +} + +func (t *ChunkedJoinOp) GenerateOp(op *Node, blder *PlanBuilder) error { + +} + +type CloneStreamOp struct{} + +func (t *CloneStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { + +} + +type CloneVarOp struct{} + +func (t *CloneVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { + +} + +type MultiplyOp struct { + Coef [][]byte + ChunkSize int +} + +func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type FileReadOp struct { + FilePath 
string +} + +func (t *FileReadOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type FileWriteOp struct { + FilePath string +} + +func (t *FileWriteOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type FromExecutorOp struct { + Handle *ExecutorWriteStream +} + +func (t *FromExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type ToExecutorOp struct { + Handle *ExecutorReadStream +} + +func (t *ToExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type StoreOp struct { + StoreKey string +} + +func (t *StoreOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type DropOp struct{} + +func (t *DropOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type SendStreamOp struct{} + +func (t *SendStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type GetStreamOp struct{} + +func (t *GetStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type SendVarOp struct{} + +func (t *SendVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} + +type GetVarOp struct{} + +func (t *GetVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { +} diff --git a/common/pkgs/ioswitch/plans/parser.go b/common/pkgs/ioswitch/plans/parser.go index 40721f5..5e6b765 100644 --- a/common/pkgs/ioswitch/plans/parser.go +++ b/common/pkgs/ioswitch/plans/parser.go @@ -1,6 +1,11 @@ package plans -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +import ( + "fmt" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/lo2" +) type FromToParser interface { Parse(ft FromTo, blder *PlanBuilder) error @@ -10,6 +15,724 @@ type DefaultParser struct { EC *cdssdk.ECRedundancy } +type ParseContext struct { + Ft FromTo + Ops []*Node +} + func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { + ctx := ParseContext{Ft: ft} + + // 分成两个阶段: + // 1. 基于From和To生成更多指令,初步匹配to的需求 + err := p.extend(&ctx, ft, blder) + if err != nil { + return err + } + + // 2. 
优化上一步生成的指令 + + // 对于删除指令的优化,需要反复进行,直到没有变化为止。 + // 从目前实现上来说不会死循环 + for { + opted := false + if p.removeUnusedJoin(&ctx) { + opted = true + } + if p.removeUnusedMultiplyOutput(&ctx) { + opted = true + } + if p.removeUnusedSplit(&ctx) { + opted = true + } + if p.omitSplitJoin(&ctx) { + opted = true + } + + if !opted { + break + } + } + + // 确定指令执行位置的过程,也需要反复进行,直到没有变化为止。 + // 从目前实现上来说不会死循环 + for { + opted := false + if p.pinIPFSRead(&ctx) { + opted = true + } + if p.pinJoin(&ctx) { + opted = true + } + if p.pinMultiply(&ctx) { + opted = true + } + if p.pinSplit(&ctx) { + opted = true + } + + if !opted { + break + } + } + + // 下面这些只需要执行一次,但需要按顺序 + p.dropUnused(&ctx) + p.storeIPFSWriteResult(&ctx) + p.generateClone(&ctx) + p.generateSend(&ctx) + + return p.buildPlan(&ctx, blder) +} +func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *StreamVar { + for _, op := range ctx.Ops { + for _, o := range op.OutputStreams { + if o.DataIndex == dataIndex { + return o + } + } + } + + return nil +} + +func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) error { + for _, f := range ft.Froms { + o := f.BuildOp() + ctx.Ops = append(ctx.Ops, &o) + + // 对于完整文件的From,生成Split指令 + if f.GetDataIndex() == -1 { + splitOp := &Node{ + Env: nil, + Type: &ChunkedSplitOp{ChunkSize: p.EC.ChunkSize, PaddingZeros: true}, + } + splitOp.AddInput(o.OutputStreams[0]) + for i := 0; i < p.EC.K; i++ { + splitOp.NewOutput(i) + } + ctx.Ops = append(ctx.Ops, splitOp) + } + } + + // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 + ecInputStrs := make(map[int]*StreamVar) +loop: + for _, o := range ctx.Ops { + for _, s := range o.OutputStreams { + if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil { + ecInputStrs[s.DataIndex] = s + if len(ecInputStrs) == p.EC.K { + break loop + } + } + } + } + if len(ecInputStrs) == p.EC.K { + mulOp := &Node{ + Env: nil, + Type: &MultiplyOp{ChunkSize: p.EC.ChunkSize}, + } + + for _, s := range ecInputStrs { + mulOp.AddInput(s) + } + for i := 0; i < p.EC.N; i++ { + mulOp.NewOutput(i) + } + ctx.Ops = append(ctx.Ops, mulOp) + + joinOp := &Node{ + Env: nil, + Type: &ChunkedJoinOp{p.EC.ChunkSize}, + } + for i := 0; i < p.EC.K; i++ { + // 不可能找不到流 + joinOp.AddInput(p.findOutputStream(ctx, i)) + } + joinOp.NewOutput(-1) + } + + // 为每一个To找到一个输入流 + for _, t := range ft.Tos { + o := t.BuildOp() + ctx.Ops = append(ctx.Ops, &o) + + str := p.findOutputStream(ctx, t.GetDataIndex()) + if str == nil { + return fmt.Errorf("no output stream found for data index %d", t.GetDataIndex()) + } + + o.AddInput(str) + } + + return nil +} + +// 删除输出流未被使用的Join指令 +func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { + opted := false + for i, op := range ctx.Ops { + _, ok := op.Type.(*ChunkedJoinOp) + if !ok { + continue + } + + if len(op.OutputStreams[0].Toes) > 0 { + continue + } + + for _, in := range op.InputStreams { + in.RemoveTo(op) + } + + ctx.Ops[i] = nil + opted = true + } + + ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + return opted +} + +// 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 +func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { + opted := false + for i, op := range ctx.Ops { + _, ok := op.Type.(*MultiplyOp) + if !ok { + continue + } + + for i2, out := range op.OutputStreams { + if len(out.Toes) > 0 { + continue + } + + op.OutputStreams[i2] = nil + } + op.OutputStreams = lo2.RemoveAllDefault(op.OutputStreams) + + if len(op.OutputStreams) == 0 { + for _, in := range op.InputStreams { + in.RemoveTo(op) + } + + ctx.Ops[i] = nil + } + + opted 
= true + } + + ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + return opted +} + +// 删除未使用的Split指令 +func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { + opted := false + for i, op := range ctx.Ops { + _, ok := op.Type.(*ChunkedSplitOp) + if !ok { + continue + } + + // Split出来的每一个流都没有被使用,才能删除这个指令 + isAllUnused := true + for _, out := range op.OutputStreams { + if len(out.Toes) > 0 { + isAllUnused = false + break + } + } + + if isAllUnused { + op.InputStreams[0].RemoveTo(op) + ctx.Ops[i] = nil + opted = true + } + } + + ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + return opted +} + +// 如果Split的结果被完全用于Join,则省略Split和Join指令 +func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { + opted := false +loop: + for iSplit, splitOp := range ctx.Ops { + // 进行合并操作时会删除多个指令,因此这里存在splitOp == nil的情况 + if splitOp == nil { + continue + } + + _, ok := splitOp.Type.(*ChunkedSplitOp) + if !ok { + continue + } + + // Split指令的每一个输出都有且只有一个目的地 + var joinOp *Node + for _, out := range splitOp.OutputStreams { + if len(out.Toes) != 1 { + continue + } + + if joinOp == nil { + joinOp = out.Toes[0] + } else if joinOp != out.Toes[0] { + continue loop + } + } + + if joinOp == nil { + continue + } + + // 且这个目的地要是一个Join指令 + _, ok = joinOp.Type.(*ChunkedJoinOp) + if !ok { + continue + } + + // 同时这个Join指令的输入也必须全部来自Split指令的输出。 + // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可 + if len(joinOp.InputStreams) != len(splitOp.OutputStreams) { + continue + } + + // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: + // F->Split->Join->T 变换为:F->T + splitOp.InputStreams[0].RemoveTo(splitOp) + for _, to := range joinOp.OutputStreams[0].Toes { + to.ReplaceInput(joinOp.OutputStreams[0], splitOp.InputStreams[0]) + } + + // 并删除这两个指令 + ctx.Ops[iSplit] = nil + lo2.Clear(ctx.Ops, joinOp) + opted = true + } + + ctx.Ops = lo2.RemoveAllDefault(ctx.Ops) + return opted +} + +// 确定Split命令的执行位置 +func (p *DefaultParser) pinSplit(ctx *ParseContext) bool { + opted := false + for _, op := range ctx.Ops { + _, ok := op.Type.(*ChunkedSplitOp) + if !ok { + continue + } + + // 如果Split的每一个流的目的地都是同一个,则将Split固定在这个地方执行 + var toEnv OpEnv + useToEnv := true + for _, out := range op.OutputStreams { + for _, to := range out.Toes { + // 如果某个流的目的地也不确定,则将其视为与其他流的目的地相同 + if to.Env == nil { + continue + } + + if toEnv == nil { + toEnv = to.Env + } else if toEnv.Equals(to.Env) { + useToEnv = false + break + } + } + if !useToEnv { + break + } + } + + // 所有输出流的目的地都不确定,那么就不能根据输出流去固定 + if toEnv == nil { + useToEnv = false + } + + if useToEnv { + if op.Env == nil || !op.Env.Equals(toEnv) { + opted = true + } + + op.Env = toEnv + continue + } + + // 此时查看输入流的始发地是否可以确定,可以的话使用这个位置 + fromEnv := op.InputStreams[0].From.Env + if fromEnv != nil { + if op.Env == nil || !op.Env.Equals(fromEnv) { + opted = true + } + + op.Env = fromEnv + } + } + + return opted +} + +// 确定Join命令的执行位置,策略与固定Split类似 +func (p *DefaultParser) pinJoin(ctx *ParseContext) bool { + opted := false + for _, op := range ctx.Ops { + _, ok := op.Type.(*ChunkedJoinOp) + if !ok { + continue + } + + // 先查看输出流的目的地是否可以确定,可以的话使用这个位置 + var toEnv OpEnv + for _, to := range op.OutputStreams[0].Toes { + if to.Env == nil { + continue + } + + if toEnv == nil { + toEnv = to.Env + } else if !toEnv.Equals(to.Env) { + toEnv = nil + break + } + } + + if toEnv != nil { + if op.Env == nil || !op.Env.Equals(toEnv) { + opted = true + } + + op.Env = toEnv + continue + } + + // 否则根据输入流的始发地来固定 + var fromEnv OpEnv + for _, in := range op.InputStreams { + if in.From.Env == nil { + continue + } + + if fromEnv == nil 
{ + fromEnv = in.From.Env + } else if !fromEnv.Equals(in.From.Env) { + // 输入流的始发地不同,那也必须选一个作为固定位置 + break + } + } + + // 所有输入流的始发地都不确定,那没办法了 + if fromEnv != nil { + if op.Env == nil || !op.Env.Equals(fromEnv) { + opted = true + } + + op.Env = fromEnv + continue + } + + } + + return opted +} + +// 确定Multiply命令的执行位置 +func (p *DefaultParser) pinMultiply(ctx *ParseContext) bool { + opted := false + for _, op := range ctx.Ops { + _, ok := op.Type.(*MultiplyOp) + if !ok { + continue + } + + var toEnv OpEnv + for _, out := range op.OutputStreams { + for _, to := range out.Toes { + if to.Env == nil { + continue + } + + if toEnv == nil { + toEnv = to.Env + } else if !toEnv.Equals(to.Env) { + toEnv = nil + break + } + } + } + + if toEnv != nil { + if op.Env == nil || !op.Env.Equals(toEnv) { + opted = true + } + + op.Env = toEnv + continue + } + + // 否则根据输入流的始发地来固定 + var fromEnv OpEnv + for _, in := range op.InputStreams { + if in.From.Env == nil { + continue + } + + if fromEnv == nil { + fromEnv = in.From.Env + } else if !fromEnv.Equals(in.From.Env) { + // 输入流的始发地不同,那也必须选一个作为固定位置 + break + } + } + + // 所有输入流的始发地都不确定,那没办法了 + if fromEnv != nil { + if op.Env == nil || !op.Env.Equals(fromEnv) { + opted = true + } + + op.Env = fromEnv + continue + } + + } + + return opted +} + +// 确定IPFS读取指令的执行位置 +func (p *DefaultParser) pinIPFSRead(ctx *ParseContext) bool { + opted := false + for _, op := range ctx.Ops { + _, ok := op.Type.(*IPFSReadType) + if !ok { + continue + } + + if op.Env != nil { + continue + } + + var toEnv OpEnv + for _, to := range op.OutputStreams[0].Toes { + if to.Env == nil { + continue + } + + if toEnv == nil { + toEnv = to.Env + } else if !toEnv.Equals(to.Env) { + toEnv = nil + break + } + } + + if toEnv != nil { + if op.Env == nil || !op.Env.Equals(toEnv) { + opted = true + } + + op.Env = toEnv + } + } + + return opted +} + +// 对于所有未使用的流,增加Drop指令 +func (p *DefaultParser) dropUnused(ctx *ParseContext) { + for _, op := range ctx.Ops { + for _, out := range op.OutputStreams { + if len(out.Toes) == 0 { + dropOp := &Node{ + Env: nil, + Type: &DropOp{}, + } + dropOp.AddInput(out) + ctx.Ops = append(ctx.Ops, dropOp) + } + } + } +} + +// 为IPFS写入指令存储结果 +func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { + for _, op := range ctx.Ops { + w, ok := op.Type.(*IPFSWriteType) + if !ok { + continue + } + + if w.FileHashStoreKey == "" { + continue + } + + storeOp := &Node{ + Env: &ExecutorEnv{}, + Type: &StoreOp{ + StoreKey: w.FileHashStoreKey, + }, + } + storeOp.AddInputVar(op.OutputValues[0]) + ctx.Ops = append(ctx.Ops, storeOp) + } +} + +// 生成Clone指令 +func (p *DefaultParser) generateClone(ctx *ParseContext) { + for _, op := range ctx.Ops { + for _, out := range op.OutputStreams { + if len(out.Toes) <= 1 { + continue + } + + cloneOp := &Node{ + Env: op.Env, + Type: &CloneStreamOp{}, + } + for _, to := range out.Toes { + to.ReplaceInput(out, cloneOp.NewOutput(out.DataIndex)) + } + out.Toes = nil + cloneOp.AddInput(out) + ctx.Ops = append(ctx.Ops, cloneOp) + } + + for _, out := range op.OutputValues { + if len(out.Toes) <= 1 { + continue + } + + cloneOp := &Node{ + Env: op.Env, + Type: &CloneVarOp{}, + } + for _, to := range out.Toes { + to.ReplaceInputVar(out, cloneOp.NewOutputVar(out.Type)) + } + out.Toes = nil + cloneOp.AddInputVar(out) + } + } +} + +// 生成Send指令 +func (p *DefaultParser) generateSend(ctx *ParseContext) { + for _, op := range ctx.Ops { + for _, out := range op.OutputStreams { + to := out.Toes[0] + if to.Env.Equals(op.Env) { + continue + } + + switch to.Env.(type) { + case 
*ExecutorEnv: + // 如果是要送到Executor,则只能由Executor主动去拉取 + getStrOp := &Node{ + Env: &ExecutorEnv{}, + Type: &GetStreamOp{}, + } + out.Toes = nil + getStrOp.AddInput(out) + to.ReplaceInput(out, getStrOp.NewOutput(out.DataIndex)) + ctx.Ops = append(ctx.Ops, getStrOp) + + case *AgentEnv: + // 如果是要送到Agent,则可以直接发送 + sendStrOp := &Node{ + Env: op.Env, + Type: &SendStreamOp{}, + } + out.Toes = nil + sendStrOp.AddInput(out) + to.ReplaceInput(out, sendStrOp.NewOutput(out.DataIndex)) + ctx.Ops = append(ctx.Ops, sendStrOp) + } + } + + for _, out := range op.OutputValues { + to := out.Toes[0] + if to.Env.Equals(op.Env) { + continue + } + + switch to.Env.(type) { + case *ExecutorEnv: + // 如果是要送到Executor,则只能由Executor主动去拉取 + getVarOp := &Node{ + Env: &ExecutorEnv{}, + Type: &GetVarOp{}, + } + out.Toes = nil + getVarOp.AddInputVar(out) + to.ReplaceInputVar(out, getVarOp.NewOutputVar(out.Type)) + ctx.Ops = append(ctx.Ops, getVarOp) + + case *AgentEnv: + // 如果是要送到Agent,则可以直接发送 + sendVarOp := &Node{ + Env: op.Env, + Type: &SendVarOp{}, + } + out.Toes = nil + sendVarOp.AddInputVar(out) + to.ReplaceInputVar(out, sendVarOp.NewOutputVar(out.Type)) + ctx.Ops = append(ctx.Ops, sendVarOp) + } + } + } +} + +// 生成Plan +func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *PlanBuilder) error { + for _, op := range ctx.Ops { + for _, out := range op.OutputStreams { + if out.Var != nil { + continue + } + + out.Var = blder.NewStreamVar() + } + + for _, in := range op.InputStreams { + if in.Var != nil { + continue + } + + in.Var = blder.NewStreamVar() + } + + for _, out := range op.OutputValues { + if out.Var != nil { + continue + } + + switch out.Type { + case StringValueVar: + out.Var = blder.NewStringVar() + } + + } + + for _, in := range op.InputValues { + if in.Var != nil { + continue + } + + switch in.Type { + case StringValueVar: + in.Var = blder.NewStringVar() + } + } + + if err := op.Type.GenerateOp(op, blder); err != nil { + return err + } + } + return nil } diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index d674c0c..c3656ac 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -6,50 +6,85 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type PlanBuilder struct { - vars []ioswitch.Var - agentPlans map[cdssdk.NodeID]*AgentPlanBuilder - executorPlan ExecutorPlanBuilder - storeMap *sync.Map + Vars []ioswitch.Var + AgentPlans map[cdssdk.NodeID]*AgentPlanBuilder + ExecutorPlan ExecutorPlanBuilder } func NewPlanBuilder() *PlanBuilder { bld := &PlanBuilder{ - agentPlans: make(map[cdssdk.NodeID]*AgentPlanBuilder), - storeMap: &sync.Map{}, + AgentPlans: make(map[cdssdk.NodeID]*AgentPlanBuilder), + ExecutorPlan: ExecutorPlanBuilder{ + StoreMap: &sync.Map{}, + }, } - bld.executorPlan.blder = bld return bld } func (b *PlanBuilder) AtExecutor() *ExecutorPlanBuilder { - return &b.executorPlan + return &b.ExecutorPlan } func (b *PlanBuilder) AtAgent(node cdssdk.Node) *AgentPlanBuilder { - agtPlan, ok := b.agentPlans[node.NodeID] + agtPlan, ok := b.AgentPlans[node.NodeID] if !ok { agtPlan = &AgentPlanBuilder{ - blder: b, - node: node, + Node: node, } - b.agentPlans[node.NodeID] = agtPlan + b.AgentPlans[node.NodeID] = agtPlan } return agtPlan } +func (b *PlanBuilder) NewStreamVar() *ioswitch.StreamVar { + v := &ioswitch.StreamVar{ + ID: 
ioswitch.VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} + +func (b *PlanBuilder) NewIntVar() *ioswitch.IntVar { + v := &ioswitch.IntVar{ + ID: ioswitch.VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} + +func (b *PlanBuilder) NewStringVar() *ioswitch.StringVar { + v := &ioswitch.StringVar{ + ID: ioswitch.VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} +func (b *PlanBuilder) NewSignalVar() *ioswitch.SignalVar { + v := &ioswitch.SignalVar{ + ID: ioswitch.VarID(len(b.Vars)), + } + b.Vars = append(b.Vars, v) + + return v +} + func (b *PlanBuilder) Execute() *Executor { ctx, cancel := context.WithCancel(context.Background()) planID := genRandomPlanID() execPlan := ioswitch.Plan{ ID: planID, - Ops: b.executorPlan.ops, + Ops: b.ExecutorPlan.Ops, } exec := Executor{ @@ -65,37 +100,28 @@ func (b *PlanBuilder) Execute() *Executor { return &exec } -func (b *PlanBuilder) newStreamVar() *ioswitch.StreamVar { - v := &ioswitch.StreamVar{ - ID: ioswitch.VarID(len(b.vars)), - } - b.vars = append(b.vars, v) - - return v +type AgentPlanBuilder struct { + Node cdssdk.Node + Ops []ioswitch.Op } -func (b *PlanBuilder) newIntVar() *ioswitch.IntVar { - v := &ioswitch.IntVar{ - ID: ioswitch.VarID(len(b.vars)), - } - b.vars = append(b.vars, v) +func (b *AgentPlanBuilder) AddOp(op ioswitch.Op) { + b.Ops = append(b.Ops, op) +} - return v +func (b *AgentPlanBuilder) RemoveOp(op ioswitch.Op) { + b.Ops = lo2.Remove(b.Ops, op) } -func (b *PlanBuilder) newStringVar() *ioswitch.StringVar { - v := &ioswitch.StringVar{ - ID: ioswitch.VarID(len(b.vars)), - } - b.vars = append(b.vars, v) +type ExecutorPlanBuilder struct { + Ops []ioswitch.Op + StoreMap *sync.Map +} - return v +func (b *ExecutorPlanBuilder) AddOp(op ioswitch.Op) { + b.Ops = append(b.Ops, op) } -func (b *PlanBuilder) newSignalVar() *ioswitch.SignalVar { - v := &ioswitch.SignalVar{ - ID: ioswitch.VarID(len(b.vars)), - } - b.vars = append(b.vars, v) - return v +func (b *ExecutorPlanBuilder) RemoveOp(op ioswitch.Op) { + b.Ops = lo2.Remove(b.Ops, op) }
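
Note (illustrative, not part of the patches): the sketch below shows how the new FromTo/DefaultParser API introduced above is meant to be driven end to end, mirroring the rewritten fromNode() in common/pkgs/downloader/io.go. The node, fileHash and ecRed arguments are placeholder inputs supplied by the caller, and the EC field is filled in here only as an assumption; error handling is kept minimal.

package example

import (
	"context"
	"io"

	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
	"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
)

func readViaPlan(node *cdssdk.Node, fileHash string, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) {
	planBld := plans.NewPlanBuilder()

	// Declare the sink first so we get a handle (*ExecutorReadStream) to read from later.
	toExe, toStr := plans.NewToExecutor(-1)

	// Describe intent only: read the whole object (data index -1) from IPFS on `node`
	// and deliver it to the executor. No ops are emitted at this point.
	ft := plans.FromTo{
		Froms: []plans.From{plans.NewFromIPFS(node, fileHash, -1)},
		Tos:   []plans.To{toExe},
	}

	// The parser expands the FromTo into a Node graph, runs the optimization and
	// pinning passes, and finally emits concrete ops into the builder.
	par := plans.DefaultParser{EC: ecRed}
	if err := par.Parse(ft, planBld); err != nil {
		return nil, err
	}

	exec := planBld.Execute()
	go func() {
		// Wait collects stored results (e.g. file hashes) once all agent plans finish.
		exec.Wait(context.Background())
	}()

	return exec.BeginRead(toStr)
}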
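
Note (illustrative, not part of the patches): the optimization passes in parser.go (removeUnusedSplit, omitSplitJoin, generateClone, generateSend, ...) all rely on the same Node/StreamVar rewiring idiom defined in ops.go. A minimal sketch of that idiom follows, assuming it lives in package plans; bypassNode is a hypothetical helper name used only for this example.

package plans

// bypassNode detaches a single-input, single-output node from the stream graph:
// its input stops feeding it, and every consumer of its output is repointed at
// that input instead, so the node can then be dropped from ctx.Ops.
// This is the same rewiring that omitSplitJoin performs for a Split/Join pair.
func bypassNode(n *Node) {
	in := n.InputStreams[0]
	out := n.OutputStreams[0]

	// Stop feeding the node we are about to remove.
	in.RemoveTo(n)

	// Snapshot the consumers, because ReplaceInput mutates out.Toes via RemoveTo.
	consumers := append([]*Node(nil), out.Toes...)
	for _, to := range consumers {
		// ReplaceInput also fixes up the Toes back-references on both stream vars.
		to.ReplaceInput(out, in)
	}
}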
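
Note (illustrative, not part of the patches): the GenerateOp methods in ops.go are deliberately left as stubs in this initial Parser implementation. The sketch below shows one possible way such a stub could be filled in, using IPFSReadType as the example; it assumes the pinning passes have already assigned an *AgentEnv to the node and that buildPlan has already allocated OutputStreams[0].Var. This is a sketch of the intended contract, not the committed implementation.

package plans

import (
	"fmt"

	"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
)

func (t *IPFSReadType) GenerateOp(node *Node, blder *PlanBuilder) error {
	// The parser pins IPFS reads to an agent, so anything else is a planning error.
	env, ok := node.Env.(*AgentEnv)
	if !ok {
		return fmt.Errorf("IPFSRead must be pinned to an agent")
	}

	// Emit the concrete op into that agent's plan, writing into the stream var
	// that buildPlan allocated for this node's output.
	blder.AtAgent(env.Node).AddOp(&ops.IPFSRead{
		Output:   node.OutputStreams[0].Var,
		FileHash: t.FileHash,
		Option:   t.Option,
	})
	return nil
}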