From 9824c2e7f6b64d5ec8d7b859faa28b7936cf2e5f Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 27 Nov 2024 16:57:24 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ioswitch=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ioswitch2/ops2/chunked.go | 62 +++---- common/pkgs/ioswitch2/ops2/clone.go | 37 ++--- common/pkgs/ioswitch2/ops2/driver.go | 17 +- common/pkgs/ioswitch2/ops2/ec.go | 21 +-- common/pkgs/ioswitch2/ops2/file.go | 17 +- common/pkgs/ioswitch2/ops2/ops.go | 6 +- common/pkgs/ioswitch2/ops2/range.go | 12 +- common/pkgs/ioswitch2/ops2/segment.go | 93 ++++------- common/pkgs/ioswitch2/ops2/shard_store.go | 22 +-- common/pkgs/ioswitch2/ops2/shared_store.go | 15 +- common/pkgs/ioswitch2/parser/parser.go | 170 +++++++++++++------- common/pkgs/ioswitchlrc/ops2/chunked.go | 67 +++----- common/pkgs/ioswitchlrc/ops2/clone.go | 42 +++-- common/pkgs/ioswitchlrc/ops2/ec.go | 40 ++--- common/pkgs/ioswitchlrc/ops2/ops.go | 6 +- common/pkgs/ioswitchlrc/ops2/range.go | 18 ++- common/pkgs/ioswitchlrc/ops2/shard_store.go | 50 ++++-- common/pkgs/ioswitchlrc/parser/generator.go | 8 +- common/pkgs/ioswitchlrc/parser/passes.go | 48 +++--- 19 files changed, 362 insertions(+), 389 deletions(-) diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index 515033e..74758bf 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -4,7 +4,6 @@ import ( "fmt" "io" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" @@ -103,54 +102,39 @@ func (o *ChunkedJoin) String() string { type ChunkedSplitNode struct { dag.NodeBase - ChunkSize int + ChunkSize int + SplitCount int } -func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { +func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int, splitCnt int) *ChunkedSplitNode { node := &ChunkedSplitNode{ ChunkSize: chunkSize, } b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputStreams().Init(node, splitCnt) return node } -func (t *ChunkedSplitNode) Split(input *dag.Var, cnt int) { - t.InputStreams().EnsureSize(1) - input.StreamTo(t, 0) - t.OutputStreams().Resize(cnt) - for i := 0; i < cnt; i++ { - t.OutputStreams().Setup(t, t.Graph().NewVar(), i) - } +func (t *ChunkedSplitNode) Split(input *dag.StreamVar) { + input.To(t, 0) } -func (t *ChunkedSplitNode) SubStream(idx int) *dag.Var { +func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { return t.OutputStreams().Get(idx) } -func (t *ChunkedSplitNode) SplitCount() int { - return t.OutputStreams().Len() -} - func (t *ChunkedSplitNode) RemoveAllStream() { - if t.InputStreams().Len() == 0 { - return - } - - t.InputStreams().Get(0).StreamNotTo(t, 0) - t.InputStreams().Resize(0) - - for _, out := range t.OutputStreams().RawArray() { - out.NoInputAllStream() - } - t.OutputStreams().Resize(0) + t.InputStreams().ClearAllInput(t) + t.OutputStreams().ClearAllOutput(t) + t.OutputStreams().Slots.Resize(0) } func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: t.InputStreams().Get(0).VarID, - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Input: t.InputStreams().Get(0).VarID, + Outputs: t.OutputStreams().GetVarIDs(), ChunkSize: t.ChunkSize, PaddingZeros: true, }, nil @@ 
-170,31 +154,27 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { ChunkSize: chunkSize, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.Graph.NewVar()) + node.OutputStreams().Init(node, 1) return node } -func (t *ChunkedJoinNode) AddInput(str *dag.Var) { +func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { idx := t.InputStreams().EnlargeOne() - str.StreamTo(t, idx) + str.To(t, idx) } -func (t *ChunkedJoinNode) Joined() *dag.Var { +func (t *ChunkedJoinNode) Joined() *dag.StreamVar { return t.OutputStreams().Get(0) } func (t *ChunkedJoinNode) RemoveAllInputs() { - for i, in := range t.InputStreams().RawArray() { - in.StreamNotTo(t, i) - } - t.InputStreams().Resize(0) + t.InputStreams().ClearAllInput(t) + t.InputStreams().Slots.Resize(0) } func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Inputs: t.InputStreams().GetVarIDs(), Output: t.OutputStreams().Get(0).VarID, ChunkSize: t.ChunkSize, }, nil diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index 578a237..5cf4158 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -4,7 +4,6 @@ import ( "fmt" "io" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" @@ -81,26 +80,22 @@ type CloneStreamType struct { func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { node := &CloneStreamType{} b.AddNode(node) + node.InputStreams().Init(1) return node } -func (t *CloneStreamType) SetInput(raw *dag.Var) { - t.InputStreams().EnsureSize(1) - raw.StreamTo(t, 0) +func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { + raw.To(t, 0) } -func (t *CloneStreamType) NewOutput() *dag.Var { - output := t.Graph().NewVar() - t.OutputStreams().SetupNew(t, output) - return output +func (t *CloneStreamType) NewOutput() *dag.StreamVar { + return t.OutputStreams().SetupNew(t).Var } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ - Raw: t.InputStreams().Get(0).VarID, - Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Raw: t.InputStreams().Get(0).VarID, + Cloneds: t.OutputValues().GetVarIDs(), }, nil } @@ -118,23 +113,19 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { return node } -func (t *CloneVarType) SetInput(raw *dag.Var) { - t.InputValues().EnsureSize(1) - raw.ValueTo(t, 0) +func (t *CloneVarType) SetInput(raw *dag.ValueVar) { + t.InputValues().Init(1) + raw.To(t, 0) } -func (t *CloneVarType) NewOutput() *dag.Var { - output := t.Graph().NewVar() - t.OutputValues().SetupNew(t, output) - return output +func (t *CloneVarType) NewOutput() *dag.ValueVar { + return t.OutputValues().AppendNew(t).Var } func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: t.InputValues().Get(0).VarID, - Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Raw: t.InputValues().Get(0).VarID, + Cloneds: t.OutputValues().GetVarIDs(), }, nil } diff --git a/common/pkgs/ioswitch2/ops2/driver.go b/common/pkgs/ioswitch2/ops2/driver.go index e99dd63..a3e539c 100644 --- a/common/pkgs/ioswitch2/ops2/driver.go +++ b/common/pkgs/ioswitch2/ops2/driver.go @@ -21,8 +21,7 @@ func (b *GraphNodeBuilder) NewFromDriver(fr 
ioswitch2.From, handle *exec.DriverW } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewVar()) - + node.OutputStreams().Init(node, 1) return node } @@ -30,8 +29,8 @@ func (f *FromDriverNode) GetFrom() ioswitch2.From { return f.From } -func (t *FromDriverNode) Output() dag.Slot { - return dag.Slot{ +func (t *FromDriverNode) Output() dag.StreamSlot { + return dag.StreamSlot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -56,6 +55,7 @@ func (b *GraphNodeBuilder) NewToDriver(to ioswitch2.To, handle *exec.DriverReadS } b.AddNode(node) + node.InputStreams().Init(1) return node } @@ -63,13 +63,12 @@ func (t *ToDriverNode) GetTo() ioswitch2.To { return t.To } -func (t *ToDriverNode) SetInput(v *dag.Var) { - t.InputStreams().EnsureSize(1) - v.StreamTo(t, 0) +func (t *ToDriverNode) SetInput(v *dag.StreamVar) { + v.To(t, 0) } -func (t *ToDriverNode) Input() dag.Slot { - return dag.Slot{ +func (t *ToDriverNode) Input() dag.StreamSlot { + return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 2235683..2502471 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -4,7 +4,6 @@ import ( "fmt" "io" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" @@ -129,25 +128,21 @@ func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode return node } -func (t *ECMultiplyNode) AddInput(str *dag.Var, dataIndex int) { +func (t *ECMultiplyNode) AddInput(str *dag.StreamVar, dataIndex int) { t.InputIndexes = append(t.InputIndexes, dataIndex) idx := t.InputStreams().EnlargeOne() - str.StreamTo(t, idx) + str.To(t, idx) } func (t *ECMultiplyNode) RemoveAllInputs() { - for i, in := range t.InputStreams().RawArray() { - in.StreamNotTo(t, i) - } - t.InputStreams().Resize(0) + t.InputStreams().ClearAllInput(t) + t.InputStreams().Slots.Resize(0) t.InputIndexes = nil } -func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.Var { +func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - output := t.Graph().NewVar() - t.OutputStreams().SetupNew(t, output) - return output + return t.OutputStreams().SetupNew(t).Var } func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { @@ -162,8 +157,8 @@ func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { return &ECMultiply{ Coef: coef, - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), + Inputs: t.InputStreams().GetVarIDs(), + Outputs: t.OutputStreams().GetVarIDs(), ChunkSize: t.EC.ChunkSize, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index be3920a..f490dc6 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -89,12 +89,12 @@ func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode { FilePath: filePath, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewVar()) + node.OutputStreams().Init(node, 1) return node } -func (t *FileReadNode) Output() dag.Slot { - return dag.Slot{ +func (t *FileReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -121,19 +121,20 @@ func (b *GraphNodeBuilder) NewFileWrite(filePath string) 
*FileWriteNode { FilePath: filePath, } b.AddNode(node) + + node.InputStreams().Init(1) return node } -func (t *FileWriteNode) Input() dag.Slot { - return dag.Slot{ +func (t *FileWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *FileWriteNode) SetInput(str *dag.Var) { - t.InputStreams().EnsureSize(1) - str.StreamTo(t, 0) +func (t *FileWriteNode) SetInput(str *dag.StreamVar) { + str.To(t, 0) } func (t *FileWriteNode) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitch2/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go index 9970e8a..701bad9 100644 --- a/common/pkgs/ioswitch2/ops2/ops.go +++ b/common/pkgs/ioswitch2/ops2/ops.go @@ -17,14 +17,14 @@ func NewGraphNodeBuilder() *GraphNodeBuilder { type FromNode interface { dag.Node GetFrom() ioswitch2.From - Output() dag.Slot + Output() dag.StreamSlot } type ToNode interface { dag.Node GetTo() ioswitch2.To - Input() dag.Slot - SetInput(input *dag.Var) + Input() dag.StreamSlot + SetInput(input *dag.StreamVar) } // func formatStreamIO(node *dag.Node) string { diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 8f41311..82a454e 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -87,16 +87,16 @@ type RangeNode struct { func (b *GraphNodeBuilder) NewRange() *RangeNode { node := &RangeNode{} b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputStreams().Init(node, 1) return node } -func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var { - t.InputStreams().EnsureSize(1) - input.StreamTo(t, 0) +func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { + input.To(t, 0) t.Range = rng - output := t.Graph().NewVar() - t.OutputStreams().Setup(t, output, 0) - return output + return t.OutputStreams().Get(0) } func (t *RangeNode) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitch2/ops2/segment.go b/common/pkgs/ioswitch2/ops2/segment.go index 4d1327d..4cd2e65 100644 --- a/common/pkgs/ioswitch2/ops2/segment.go +++ b/common/pkgs/ioswitch2/ops2/segment.go @@ -122,115 +122,78 @@ func (o *SegmentJoin) String() string { type SegmentSplitNode struct { dag.NodeBase - segments []int64 + Segments []int64 } func (b *GraphNodeBuilder) NewSegmentSplit(segments []int64) *SegmentSplitNode { node := &SegmentSplitNode{ - segments: segments, + Segments: segments, } b.AddNode(node) - node.OutputStreams().Resize(len(segments)) + + node.InputStreams().Init(1) + node.OutputStreams().Init(node, len(segments)) return node } -func (n *SegmentSplitNode) SetInput(input *dag.Var) { - n.InputStreams().EnsureSize(1) - input.StreamTo(n, 0) +func (n *SegmentSplitNode) SetInput(input *dag.StreamVar) { + input.To(n, 0) } func (t *SegmentSplitNode) RemoveAllStream() { - if t.InputStreams().Len() == 0 { - return - } - - t.InputStreams().Get(0).StreamNotTo(t, 0) - t.InputStreams().Resize(0) - - for _, out := range t.OutputStreams().RawArray() { - out.NoInputAllStream() - } - t.OutputStreams().Resize(0) + t.InputStreams().ClearAllInput(t) + t.OutputStreams().ClearAllOutput(t) } -func (n *SegmentSplitNode) Segment(index int) *dag.Var { - // 必须连续消耗流 - for i := 0; i <= index; i++ { - if n.OutputStreams().Get(i) == nil { - n.OutputStreams().Setup(n, n.Graph().NewVar(), i) - } - } +func (n *SegmentSplitNode) Segment(index int) *dag.StreamVar { return n.OutputStreams().Get(index) } -func (t *SegmentSplitNode) GenerateOp() (exec.Op, error) { - lastUsedSeg := 0 - for i := 
t.OutputStreams().Len() - 1; i >= 0; i-- { - if t.OutputStreams().Get(i) != nil { - lastUsedSeg = i - break +func (n *SegmentSplitNode) GenerateOp() (exec.Op, error) { + var realSegs []int64 + var realSegVarIDs []exec.VarID + + for i := 0; i < len(n.Segments); i++ { + if n.Segments[i] > 0 { + realSegs = append(realSegs, n.Segments[i]) + realSegVarIDs = append(realSegVarIDs, n.Segment(i).VarID) } } return &SegmentSplit{ - Input: t.InputStreams().Get(0).VarID, - Segments: t.segments[:lastUsedSeg+1], - Outputs: t.OutputStreams().GetVarIDs(), + Input: n.InputStreams().Get(0).VarID, + Segments: realSegs, + Outputs: realSegVarIDs, }, nil } type SegmentJoinNode struct { dag.NodeBase - UsedStart int - UsedCount int } func (b *GraphNodeBuilder) NewSegmentJoin(segmentSizes []int64) *SegmentJoinNode { node := &SegmentJoinNode{} b.AddNode(node) - node.InputStreams().Resize(len(segmentSizes)) - node.OutputStreams().SetupNew(node, b.NewVar()) + node.InputStreams().Init(len(segmentSizes)) + node.OutputStreams().Init(node, 1) return node } -func (n *SegmentJoinNode) SetInput(index int, input *dag.Var) { - input.StreamTo(n, index) +func (n *SegmentJoinNode) SetInput(index int, input *dag.StreamVar) { + input.To(n, index) } func (n *SegmentJoinNode) RemoveAllInputs() { - for i, in := range n.InputStreams().RawArray() { - in.StreamNotTo(n, i) - } - n.InputStreams().Resize(0) -} - -// 记录本计划中实际要使用的分段的范围,范围外的分段流都会取消输入 -func (n *SegmentJoinNode) MarkUsed(start, cnt int) { - n.UsedStart = start - n.UsedCount = cnt - - for i := 0; i < start; i++ { - str := n.InputStreams().Get(i) - if str != nil { - str.StreamNotTo(n, i) - } - } - - for i := start + cnt; i < n.InputStreams().Len(); i++ { - str := n.InputStreams().Get(i) - if str != nil { - str.StreamNotTo(n, i) - } - } + n.InputStreams().ClearAllInput(n) } -func (n *SegmentJoinNode) Joined() *dag.Var { +func (n *SegmentJoinNode) Joined() *dag.StreamVar { return n.OutputStreams().Get(0) } func (t *SegmentJoinNode) GenerateOp() (exec.Op, error) { return &SegmentJoin{ - Inputs: t.InputStreams().GetVarIDsRanged(t.UsedStart, t.UsedStart+t.UsedCount), + Inputs: t.InputStreams().GetVarIDs(), Output: t.OutputStreams().Get(0).VarID, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 2fdb0f9..d6132f7 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -128,7 +128,8 @@ func (b *GraphNodeBuilder) NewShardRead(fr ioswitch2.From, stgID cdssdk.StorageI Open: open, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewVar()) + + node.OutputStreams().Init(node, 1) return node } @@ -136,8 +137,8 @@ func (t *ShardReadNode) GetFrom() ioswitch2.From { return t.From } -func (t *ShardReadNode) Output() dag.Slot { - return dag.Slot{ +func (t *ShardReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -169,6 +170,9 @@ func (b *GraphNodeBuilder) NewShardWrite(to ioswitch2.To, stgID cdssdk.StorageID FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputValues().Init(node, 1) return node } @@ -176,20 +180,18 @@ func (t *ShardWriteNode) GetTo() ioswitch2.To { return t.To } -func (t *ShardWriteNode) SetInput(input *dag.Var) { - t.InputStreams().EnsureSize(1) - input.StreamTo(t, 0) - t.OutputValues().SetupNew(t, t.Graph().NewVar()) +func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { + input.To(t, 0) } -func (t *ShardWriteNode) Input() dag.Slot { - return dag.Slot{ +func 
(t *ShardWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *ShardWriteNode) FileHashVar() *dag.Var { +func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { return t.OutputValues().Get(0) } diff --git a/common/pkgs/ioswitch2/ops2/shared_store.go b/common/pkgs/ioswitch2/ops2/shared_store.go index 36b0746..a7cb71e 100644 --- a/common/pkgs/ioswitch2/ops2/shared_store.go +++ b/common/pkgs/ioswitch2/ops2/shared_store.go @@ -81,6 +81,9 @@ func (b *GraphNodeBuilder) NewSharedLoad(to ioswitch2.To, stgID cdssdk.StorageID Path: path, } b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputValues().Init(node, 1) return node } @@ -88,20 +91,18 @@ func (t *SharedLoadNode) GetTo() ioswitch2.To { return t.To } -func (t *SharedLoadNode) SetInput(input *dag.Var) { - t.InputStreams().EnsureSize(1) - input.StreamTo(t, 0) - t.OutputValues().SetupNew(t, t.Graph().NewVar()) +func (t *SharedLoadNode) SetInput(input *dag.StreamVar) { + input.To(t, 0) } -func (t *SharedLoadNode) Input() dag.Slot { - return dag.Slot{ +func (t *SharedLoadNode) Input() dag.StreamSlot { + return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *SharedLoadNode) FullPathVar() *dag.Var { +func (t *SharedLoadNode) FullPathVar() *dag.ValueVar { return t.OutputValues().Get(0) } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 00ffef3..321211d 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -16,7 +16,7 @@ import ( ) type IndexedStream struct { - Stream *dag.Var + Stream *dag.StreamVar StreamIndex ioswitch2.StreamIndex } @@ -57,7 +57,12 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { // 2. 优化上一步生成的指令 - err = removeUnusedSegment(&ctx) + err = fixSegmentJoin(&ctx) + if err != nil { + return err + } + + err = fixSegmentSplit(&ctx) if err != nil { return err } @@ -106,8 +111,8 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { return plan.Generate(ctx.DAG.Graph, blder) } -func findOutputStream(ctx *ParseContext, streamIndex ioswitch2.StreamIndex) *dag.Var { - var ret *dag.Var +func findOutputStream(ctx *ParseContext, streamIndex ioswitch2.StreamIndex) *dag.StreamVar { + var ret *dag.StreamVar for _, s := range ctx.IndexedStreams { if s.StreamIndex == streamIndex { ret = s.Stream @@ -224,8 +229,8 @@ func extend(ctx *ParseContext) error { if fr.GetStreamIndex().IsRaw() { // 只有输入输出需要EC编码的块时,才生成相关指令 if ctx.UseEC { - splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize) - splitNode.Split(frNode.Output().Var, ctx.Ft.ECParam.K) + splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K) + splitNode.Split(frNode.Output().Var) for i := 0; i < ctx.Ft.ECParam.K; i++ { ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ Stream: splitNode.SubStream(i), @@ -250,7 +255,7 @@ func extend(ctx *ParseContext) error { if ctx.UseEC { // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 - ecInputStrs := make(map[int]*dag.Var) + ecInputStrs := make(map[int]*dag.StreamVar) for _, s := range ctx.IndexedStreams { if s.StreamIndex.IsEC() && ecInputStrs[s.StreamIndex.Index] == nil { ecInputStrs[s.StreamIndex.Index] = s.Stream @@ -301,8 +306,8 @@ func extend(ctx *ParseContext) error { // SegmentJoin生成的Join指令可以用来生成EC块 if ctx.UseEC { - splitNode := ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize) - splitNode.Split(joinNode.Joined(), ctx.Ft.ECParam.K) + splitNode := 
ctx.DAG.NewChunkedSplit(ctx.Ft.ECParam.ChunkSize, ctx.Ft.ECParam.K) + splitNode.Split(joinNode.Joined()) mulNode := ctx.DAG.NewECMultiply(*ctx.Ft.ECParam) @@ -494,8 +499,46 @@ func setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) err return nil } +// 根据StreamRange,调整SegmentSplit中分段的个数和每段的大小 +func fixSegmentSplit(ctx *ParseContext) error { + var err error + dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(node *ops2.SegmentSplitNode) bool { + var strEnd *int64 + if ctx.StreamRange.Length != nil { + e := ctx.StreamRange.Offset + *ctx.StreamRange.Length + strEnd = &e + } + + startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(ctx.StreamRange.Offset, strEnd) + + // 关闭超出范围的分段 + for i := 0; i < startSeg; i++ { + node.Segments[i] = 0 + node.OutputStreams().Slots.Set(i, nil) + } + + for i := endSeg; i < len(node.Segments); i++ { + node.Segments[i] = 0 + node.OutputStreams().Slots.Set(i, nil) + } + + // StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离 + startSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(startSeg) + node.Segments[startSeg] -= ctx.StreamRange.Offset - startSegStart + + // StreamRange结束的位置可能在某个分段的中间,此时这个分段的大小就等于流结束位置到分段起始位置的距离 + if strEnd != nil { + endSegStart := ctx.Ft.SegmentParam.CalcSegmentStart(endSeg - 1) + node.Segments[endSeg-1] = *strEnd - endSegStart + } + return true + }) + + return err +} + // 从SegmentJoin中删除未使用的分段 -func removeUnusedSegment(ctx *ParseContext) error { +func fixSegmentJoin(ctx *ParseContext) error { var err error dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool { start := ctx.StreamRange.Offset @@ -505,11 +548,19 @@ func removeUnusedSegment(ctx *ParseContext) error { end = &e } - segStart, segEnd := ctx.Ft.SegmentParam.CalcSegmentRange(start, end) + startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end) - node.MarkUsed(segStart, segEnd) + // 关闭超出范围的分段 + for i := 0; i < startSeg; i++ { + node.InputStreams().ClearInputAt(node, i) + } + + for i := endSeg; i < node.InputStreams().Len(); i++ { + node.InputStreams().ClearInputAt(node, i) + } - for i := segStart; i < segEnd; i++ { + // 检查一下必须的分段是否都被加入到Join中 + for i := startSeg; i < endSeg; i++ { if node.InputStreams().Get(i) == nil { err = fmt.Errorf("segment %v missed to join an raw stream", i) return false @@ -527,7 +578,7 @@ func removeUnusedSegmentJoin(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.SegmentJoinNode](ctx.DAG.Graph, func(node *ops2.SegmentJoinNode) bool { - if node.Joined().To().Len() > 0 { + if node.Joined().Dst.Len() > 0 { return true } @@ -544,8 +595,8 @@ func removeUnusedSegmentSplit(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(typ *ops2.SegmentSplitNode) bool { // Split出来的每一个流都没有被使用,才能删除这个指令 - for _, out := range typ.OutputStreams().RawArray() { - if out.To().Len() > 0 { + for _, out := range typ.OutputStreams().Slots.RawArray() { + if out.Dst.Len() > 0 { return true } } @@ -566,14 +617,14 @@ func omitSegmentSplitJoin(ctx *ParseContext) bool { dag.WalkOnlyType[*ops2.SegmentSplitNode](ctx.DAG.Graph, func(splitNode *ops2.SegmentSplitNode) bool { // Split指令的每一个输出都有且只有一个目的地 var dstNode dag.Node - for _, out := range splitNode.OutputStreams().RawArray() { - if out.To().Len() != 1 { + for _, out := range splitNode.OutputStreams().Slots.RawArray() { + if out.Dst.Len() != 1 { return true } if dstNode == nil { - dstNode = out.To().Get(0) - } else if dstNode != out.To().Get(0) { + dstNode = out.Dst.Get(0) + } else if 
dstNode != out.Dst.Get(0) { return true } } @@ -597,10 +648,10 @@ func omitSegmentSplitJoin(ctx *ParseContext) bool { // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: // F->Split->Join->T 变换为:F->T splitInput := splitNode.InputStreams().Get(0) - for _, to := range joinNode.Joined().To().RawArray() { - splitInput.StreamTo(to, to.InputStreams().IndexOf(joinNode.Joined())) + for _, to := range joinNode.Joined().Dst.RawArray() { + splitInput.To(to, to.InputStreams().IndexOf(joinNode.Joined())) } - splitInput.StreamNotTo(splitNode, 0) + splitInput.NotTo(splitNode) // 并删除这两个指令 ctx.DAG.RemoveNode(joinNode) @@ -618,7 +669,7 @@ func removeUnusedJoin(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool { - if node.Joined().To().Len() > 0 { + if node.Joined().Dst.Len() > 0 { return true } @@ -634,9 +685,9 @@ func removeUnusedJoin(ctx *ParseContext) bool { func removeUnusedMultiplyOutput(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool { - outArr := node.OutputStreams().RawArray() + outArr := node.OutputStreams().Slots.RawArray() for i2, out := range outArr { - if out.To().Len() > 0 { + if out.Dst.Len() > 0 { continue } @@ -645,8 +696,7 @@ func removeUnusedMultiplyOutput(ctx *ParseContext) bool { changed = true } - // TODO2 没有修改SlotIndex - node.OutputStreams().SetRawArray(lo2.RemoveAllDefault(outArr)) + node.OutputStreams().Slots.SetRawArray(lo2.RemoveAllDefault(outArr)) node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2) // 如果所有输出流都被删除,则删除该指令 @@ -666,8 +716,8 @@ func removeUnusedSplit(ctx *ParseContext) bool { changed := false dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool { // Split出来的每一个流都没有被使用,才能删除这个指令 - for _, out := range typ.OutputStreams().RawArray() { - if out.To().Len() > 0 { + for _, out := range typ.OutputStreams().Slots.RawArray() { + if out.Dst.Len() > 0 { return true } } @@ -688,14 +738,14 @@ func omitSplitJoin(ctx *ParseContext) bool { dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool { // Split指令的每一个输出都有且只有一个目的地 var dstNode dag.Node - for _, out := range splitNode.OutputStreams().RawArray() { - if out.To().Len() != 1 { + for _, out := range splitNode.OutputStreams().Slots.RawArray() { + if out.Dst.Len() != 1 { return true } if dstNode == nil { - dstNode = out.To().Get(0) - } else if dstNode != out.To().Get(0) { + dstNode = out.Dst.Get(0) + } else if dstNode != out.Dst.Get(0) { return true } } @@ -719,10 +769,10 @@ func omitSplitJoin(ctx *ParseContext) bool { // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: // F->Split->Join->T 变换为:F->T splitInput := splitNode.InputStreams().Get(0) - for _, to := range joinNode.Joined().To().RawArray() { - splitInput.StreamTo(to, to.InputStreams().IndexOf(joinNode.Joined())) + for _, to := range joinNode.Joined().Dst.RawArray() { + splitInput.To(to, to.InputStreams().IndexOf(joinNode.Joined())) } - splitInput.StreamNotTo(splitNode, 0) + splitInput.NotTo(splitNode) // 并删除这两个指令 ctx.DAG.RemoveNode(joinNode) @@ -746,8 +796,8 @@ func pin(ctx *ParseContext) bool { } var toEnv *dag.NodeEnv - for _, out := range node.OutputStreams().RawArray() { - for _, to := range out.To().RawArray() { + for _, out := range node.OutputStreams().Slots.RawArray() { + for _, to := range out.Dst.RawArray() { if to.Env().Type == dag.EnvUnknown { continue } @@ -772,14 +822,14 @@ func pin(ctx *ParseContext) bool 
{ // 否则根据输入流的始发地来固定 var fromEnv *dag.NodeEnv - for _, in := range node.InputStreams().RawArray() { - if in.From().Env().Type == dag.EnvUnknown { + for _, in := range node.InputStreams().Slots.RawArray() { + if in.Src.Env().Type == dag.EnvUnknown { continue } if fromEnv == nil { - fromEnv = in.From().Env() - } else if !fromEnv.Equals(in.From().Env()) { + fromEnv = in.Src.Env() + } else if !fromEnv.Equals(in.Src.Env()) { fromEnv = nil break } @@ -815,8 +865,8 @@ func removeUnusedFromNode(ctx *ParseContext) { // 对于所有未使用的流,增加Drop指令 func dropUnused(ctx *ParseContext) { ctx.DAG.Walk(func(node dag.Node) bool { - for _, out := range node.OutputStreams().RawArray() { - if out.To().Len() == 0 { + for _, out := range node.OutputStreams().Slots.RawArray() { + if out.Dst.Len() == 0 { n := ctx.DAG.NewDropStream() *n.Env() = *node.Env() n.SetInput(out) @@ -853,12 +903,12 @@ func generateRange(ctx *ParseContext) { if toStrIdx.IsRaw() { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.From().Env() + *n.Env() = *toInput.Var.Src.Env() rnged := n.RangeStream(toInput.Var, exec.Range{ Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }) - toInput.Var.StreamNotTo(toNode, toInput.Index) + toInput.Var.NotTo(toNode) toNode.SetInput(rnged) } else if toStrIdx.IsEC() { @@ -869,12 +919,12 @@ func generateRange(ctx *ParseContext) { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.From().Env() + *n.Env() = *toInput.Var.Src.Env() rnged := n.RangeStream(toInput.Var, exec.Range{ Offset: toRng.Offset - blkStart, Length: toRng.Length, }) - toInput.Var.StreamNotTo(toNode, toInput.Index) + toInput.Var.NotTo(toNode) toNode.SetInput(rnged) } else if toStrIdx.IsSegment() { // if frNode, ok := toNode.Input().Var.From().Node.(ops2.FromNode); ok { @@ -894,7 +944,7 @@ func generateRange(ctx *ParseContext) { // Offset: strStart - ctx.StreamRange.Offset, // Length: toRng.Length, // }) - // toInput.Var.StreamNotTo(toNode, toInput.Index) + // toInput.Var.NotTo(toNode, toInput.Index) // toNode.SetInput(rnged) } } @@ -903,31 +953,31 @@ func generateRange(ctx *ParseContext) { // 生成Clone指令 func generateClone(ctx *ParseContext) { ctx.DAG.Walk(func(node dag.Node) bool { - for _, outVar := range node.OutputStreams().RawArray() { - if outVar.To().Len() <= 1 { + for _, outVar := range node.OutputStreams().Slots.RawArray() { + if outVar.Dst.Len() <= 1 { continue } c := ctx.DAG.NewCloneStream() *c.Env() = *node.Env() - for _, to := range outVar.To().RawArray() { - c.NewOutput().StreamTo(to, to.InputStreams().IndexOf(outVar)) + for _, to := range outVar.Dst.RawArray() { + c.NewOutput().To(to, to.InputStreams().IndexOf(outVar)) } - outVar.To().Resize(0) + outVar.Dst.Resize(0) c.SetInput(outVar) } - for _, outVar := range node.OutputValues().RawArray() { - if outVar.To().Len() <= 1 { + for _, outVar := range node.OutputValues().Slots.RawArray() { + if outVar.Dst.Len() <= 1 { continue } t := ctx.DAG.NewCloneValue() *t.Env() = *node.Env() - for _, to := range outVar.To().RawArray() { - t.NewOutput().ValueTo(to, to.InputValues().IndexOf(outVar)) + for _, to := range outVar.Dst.RawArray() { + t.NewOutput().To(to, to.InputValues().IndexOf(outVar)) } - outVar.To().Resize(0) + outVar.Dst.Resize(0) t.SetInput(outVar) } diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index 3c75f6f..6a23c26 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -4,7 +4,6 @@ import ( "fmt" "io" - 
"github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" @@ -103,54 +102,40 @@ func (o *ChunkedJoin) String() string { type ChunkedSplitNode struct { dag.NodeBase - ChunkSize int + ChunkSize int + SplitCount int } -func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { +func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int, splitCnt int) *ChunkedSplitNode { node := &ChunkedSplitNode{ - ChunkSize: chunkSize, + ChunkSize: chunkSize, + SplitCount: splitCnt, } b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputStreams().Init(node, splitCnt) return node } -func (t *ChunkedSplitNode) Split(input *dag.Var, cnt int) { - t.InputStreams().EnsureSize(1) - input.StreamTo(t, 0) - t.OutputStreams().Resize(cnt) - for i := 0; i < cnt; i++ { - t.OutputStreams().Setup(t, t.Graph().NewVar(), i) - } +func (t *ChunkedSplitNode) Split(input *dag.StreamVar) { + input.To(t, 0) } -func (t *ChunkedSplitNode) SubStream(idx int) *dag.Var { +func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { return t.OutputStreams().Get(idx) } -func (t *ChunkedSplitNode) SplitCount() int { - return t.OutputStreams().Len() -} - -func (t *ChunkedSplitNode) Clear() { - if t.InputStreams().Len() == 0 { - return - } - - t.InputStreams().Get(0).StreamNotTo(t, 0) - t.InputStreams().Resize(0) - - for _, out := range t.OutputStreams().RawArray() { - out.NoInputAllStream() - } - t.OutputStreams().Resize(0) +func (t *ChunkedSplitNode) RemoveAllStream() { + t.InputStreams().ClearAllInput(t) + t.OutputStreams().ClearAllOutput(t) + t.OutputStreams().Slots.Resize(0) } func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: t.InputStreams().Get(0).VarID, - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Input: t.InputStreams().Get(0).VarID, + Outputs: t.OutputStreams().GetVarIDs(), ChunkSize: t.ChunkSize, PaddingZeros: true, }, nil @@ -170,31 +155,27 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { ChunkSize: chunkSize, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.Graph.NewVar()) + node.OutputStreams().Init(node, 1) return node } -func (t *ChunkedJoinNode) AddInput(str *dag.Var) { +func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { idx := t.InputStreams().EnlargeOne() - str.StreamTo(t, idx) + str.To(t, idx) } -func (t *ChunkedJoinNode) Joined() *dag.Var { +func (t *ChunkedJoinNode) Joined() *dag.StreamVar { return t.OutputStreams().Get(0) } func (t *ChunkedJoinNode) RemoveAllInputs() { - for i, in := range t.InputStreams().RawArray() { - in.StreamNotTo(t, i) - } - t.InputStreams().Resize(0) + t.InputStreams().ClearAllInput(t) + t.InputStreams().Slots.Resize(0) } func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Inputs: t.InputStreams().GetVarIDs(), Output: t.OutputStreams().Get(0).VarID, ChunkSize: t.ChunkSize, }, nil diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index 7ede930..5cf4158 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -4,7 +4,6 @@ import ( "fmt" "io" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" 
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" @@ -33,7 +32,10 @@ func (o *CloneStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { - sem.Acquire(ctx.Context, 1) + err = sem.Acquire(ctx.Context, 1) + if err != nil { + return err + } e.PutVar(o.Cloneds[i], &exec.StreamValue{ Stream: io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { @@ -78,26 +80,22 @@ type CloneStreamType struct { func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { node := &CloneStreamType{} b.AddNode(node) + node.InputStreams().Init(1) return node } -func (t *CloneStreamType) SetInput(raw *dag.Var) { - t.InputStreams().EnsureSize(1) - raw.StreamTo(t, 0) +func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { + raw.To(t, 0) } -func (t *CloneStreamType) NewOutput() *dag.Var { - output := t.Graph().NewVar() - t.OutputStreams().SetupNew(t, output) - return output +func (t *CloneStreamType) NewOutput() *dag.StreamVar { + return t.OutputStreams().SetupNew(t).Var } func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ - Raw: t.InputStreams().Get(0).VarID, - Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Raw: t.InputStreams().Get(0).VarID, + Cloneds: t.OutputValues().GetVarIDs(), }, nil } @@ -115,23 +113,19 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { return node } -func (t *CloneVarType) SetInput(raw *dag.Var) { - t.InputValues().EnsureSize(1) - raw.ValueTo(t, 0) +func (t *CloneVarType) SetInput(raw *dag.ValueVar) { + t.InputValues().Init(1) + raw.To(t, 0) } -func (t *CloneVarType) NewOutput() *dag.Var { - output := t.Graph().NewVar() - t.OutputValues().SetupNew(t, output) - return output +func (t *CloneVarType) NewOutput() *dag.ValueVar { + return t.OutputValues().AppendNew(t).Var } func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: t.InputValues().Get(0).VarID, - Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.Var, idx int) exec.VarID { - return v.VarID - }), + Raw: t.InputValues().Get(0).VarID, + Cloneds: t.OutputValues().GetVarIDs(), }, nil } diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index f330028..2c6eb03 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -4,7 +4,6 @@ import ( "fmt" "io" - "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" @@ -128,25 +127,21 @@ func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCCons return node } -func (t *LRCConstructAnyNode) AddInput(str *dag.Var, dataIndex int) { +func (t *LRCConstructAnyNode) AddInput(str *dag.StreamVar, dataIndex int) { t.InputIndexes = append(t.InputIndexes, dataIndex) idx := t.InputStreams().EnlargeOne() - str.StreamTo(t, idx) + str.To(t, idx) } func (t *LRCConstructAnyNode) RemoveAllInputs() { - for i, in := range t.InputStreams().RawArray() { - in.StreamNotTo(t, i) - } - t.InputStreams().Resize(0) + t.InputStreams().ClearAllInput(t) + t.InputStreams().Slots.Resize(0) t.InputIndexes = nil } -func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.Var { +func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { t.OutputIndexes = append(t.OutputIndexes, dataIndex) - output := t.Graph().NewVar() - t.OutputStreams().SetupNew(t, output) - return output + return 
t.OutputStreams().SetupNew(t).Var } func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { @@ -161,8 +156,8 @@ func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), + Inputs: t.InputStreams().GetVarIDs(), + Outputs: t.OutputStreams().GetVarIDs(), ChunkSize: t.LRC.ChunkSize, }, nil } @@ -182,21 +177,20 @@ func (b *GraphNodeBuilder) NewLRCConstructGroup(lrc cdssdk.LRCRedundancy) *LRCCo LRC: lrc, } b.AddNode(node) + + node.OutputStreams().Init(node, 1) return node } -func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.Var) *dag.Var { +func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.StreamVar) *dag.StreamVar { t.TargetBlockIndex = blockIdx - t.InputStreams().Resize(0) - for _, in := range inputs { - idx := t.InputStreams().EnlargeOne() - in.StreamTo(t, idx) + t.InputStreams().Init(len(inputs)) + for i := 0; i < len(inputs); i++ { + inputs[i].To(t, i) } - output := t.Graph().NewVar() - t.OutputStreams().Setup(t, output, 0) - return output + return t.OutputStreams().Get(0) } func (t *LRCConstructGroupNode) GenerateOp() (exec.Op, error) { @@ -211,8 +205,8 @@ func (t *LRCConstructGroupNode) GenerateOp() (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), - Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.Var, idx int) exec.VarID { return v.VarID }), + Inputs: t.InputStreams().GetVarIDs(), + Outputs: t.OutputStreams().GetVarIDs(), ChunkSize: t.LRC.ChunkSize, }, nil } diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go index 83f31b8..a41ec08 100644 --- a/common/pkgs/ioswitchlrc/ops2/ops.go +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -15,13 +15,13 @@ func NewGraphNodeBuilder() *GraphNodeBuilder { type FromNode interface { dag.Node - Output() dag.Slot + Output() dag.StreamSlot } type ToNode interface { dag.Node - Input() dag.Slot - SetInput(input *dag.Var) + Input() dag.StreamSlot + SetInput(input *dag.StreamVar) } // func formatStreamIO(node *dag.Node) string { diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index b2bf66f..82a454e 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -72,7 +72,11 @@ func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *Range) String() string { - return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input, o.Output) + len := "" + if o.Length != nil { + len = fmt.Sprintf("%v", *o.Length) + } + return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, len, o.Input, o.Output) } type RangeNode struct { @@ -83,16 +87,16 @@ type RangeNode struct { func (b *GraphNodeBuilder) NewRange() *RangeNode { node := &RangeNode{} b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputStreams().Init(node, 1) return node } -func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var { - t.InputStreams().EnsureSize(1) - input.StreamTo(t, 0) +func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { + input.To(t, 0) t.Range = rng - output := t.Graph().NewVar() - t.OutputStreams().Setup(t, output, 0) - return output + return t.OutputStreams().Get(0) } func (t *RangeNode) 
GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index 3f22f50..3cb341e 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) @@ -66,7 +67,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *ShardRead) String() string { - return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output) + return fmt.Sprintf("ShardRead %v -> %v", o.Open.String(), o.Output) } type ShardWrite struct { @@ -100,7 +101,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { fileInfo, err := store.Create(input.Stream) if err != nil { - return fmt.Errorf("finishing writing file to shard store: %w", err) + return fmt.Errorf("writing file to shard store: %w", err) } e.PutVar(o.FileHash, &FileHashValue{ @@ -115,22 +116,29 @@ func (o *ShardWrite) String() string { type ShardReadNode struct { dag.NodeBase + From ioswitchlrc.From StorageID cdssdk.StorageID Open types.OpenOption } -func (b *GraphNodeBuilder) NewShardRead(stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { +func (b *GraphNodeBuilder) NewShardRead(fr ioswitchlrc.From, stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { node := &ShardReadNode{ + From: fr, StorageID: stgID, Open: open, } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewVar()) + + node.OutputStreams().Init(node, 1) return node } -func (t *ShardReadNode) Output() dag.Slot { - return dag.Slot{ +func (t *ShardReadNode) GetFrom() ioswitchlrc.From { + return t.From +} + +func (t *ShardReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -150,38 +158,48 @@ func (t *ShardReadNode) GenerateOp() (exec.Op, error) { type ShardWriteNode struct { dag.NodeBase + To ioswitchlrc.To + StorageID cdssdk.StorageID FileHashStoreKey string } -func (b *GraphNodeBuilder) NewShardWrite(fileHashStoreKey string) *ShardWriteNode { +func (b *GraphNodeBuilder) NewShardWrite(to ioswitchlrc.To, stgID cdssdk.StorageID, fileHashStoreKey string) *ShardWriteNode { node := &ShardWriteNode{ + To: to, + StorageID: stgID, FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputValues().Init(node, 1) return node } -func (t *ShardWriteNode) SetInput(input *dag.Var) { - t.InputStreams().EnsureSize(1) - input.StreamTo(t, 0) - t.OutputValues().SetupNew(t, t.Graph().NewVar()) +func (t *ShardWriteNode) GetTo() ioswitchlrc.To { + return t.To } -func (t *ShardWriteNode) Input() dag.Slot { - return dag.Slot{ +func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { + input.To(t, 0) +} + +func (t *ShardWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *ShardWriteNode) FileHashVar() *dag.Var { +func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { return t.OutputValues().Get(0) } func (t *ShardWriteNode) GenerateOp() (exec.Op, error) { return &ShardWrite{ - Input: t.InputStreams().Get(0).VarID, - FileHash: t.OutputValues().Get(0).VarID, + Input: t.InputStreams().Get(0).VarID, + FileHash: 
t.OutputValues().Get(0).VarID, + StorageID: t.StorageID, }, nil } diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index 0ce31ce..27e5425 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -83,8 +83,8 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr } // 需要文件块,则生成Split指令 - splitNode := ctx.DAG.NewChunkedSplit(ctx.LRC.ChunkSize) - splitNode.Split(frNode.Output().Var, ctx.LRC.K) + splitNode := ctx.DAG.NewChunkedSplit(ctx.LRC.ChunkSize, ctx.LRC.K) + splitNode.Split(frNode.Output().Var) for _, to := range dataToes { toNode, err := buildToNode(ctx, to) @@ -104,7 +104,7 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr conType := ctx.DAG.NewLRCConstructAny(ctx.LRC) - for i, out := range splitNode.OutputStreams().RawArray() { + for i, out := range splitNode.OutputStreams().Slots.RawArray() { conType.AddInput(out, i) } @@ -271,7 +271,7 @@ func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { - var inputs []*dag.Var + var inputs []*dag.StreamVar for _, fr := range frs { frNode, err := buildFromNode(ctx, fr) if err != nil { diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index e9aae1d..08c6153 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -64,7 +64,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err switch f := f.(type) { case *ioswitchlrc.FromNode: - t := ctx.DAG.NewShardRead(f.Storage.StorageID, types.NewOpen(f.FileHash)) + t := ctx.DAG.NewShardRead(f, f.Storage.StorageID, types.NewOpen(f.FileHash)) if f.DataIndex == -1 { t.Open.WithNullableLength(repRange.Offset, repRange.Length) @@ -101,7 +101,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitchlrc.ToNode: - n := ctx.DAG.NewShardWrite(t.FileHashStoreKey) + n := ctx.DAG.NewShardWrite(t, t.Storage.StorageID, t.FileHashStoreKey) switch addr := t.Hub.Address.(type) { // case *cdssdk.HttpAddressInfo: // n.Env().ToEnvWorker(&ioswitchlrc.HttpHubWorker{Node: t.Hub}) @@ -140,8 +140,8 @@ func pin(ctx *GenerateContext) bool { } var toEnv *dag.NodeEnv - for _, out := range node.OutputStreams().RawArray() { - for _, to := range out.To().RawArray() { + for _, out := range node.OutputStreams().Slots.RawArray() { + for _, to := range out.Dst.RawArray() { if to.Env().Type == dag.EnvUnknown { continue } @@ -166,14 +166,14 @@ func pin(ctx *GenerateContext) bool { // 否则根据输入流的始发地来固定 var fromEnv *dag.NodeEnv - for _, in := range node.InputStreams().RawArray() { - if in.From().Env().Type == dag.EnvUnknown { + for _, in := range node.InputStreams().Slots.RawArray() { + if in.Src.Env().Type == dag.EnvUnknown { continue } if fromEnv == nil { - fromEnv = in.From().Env() - } else if !fromEnv.Equals(in.From().Env()) { + fromEnv = in.Src.Env() + } else if !fromEnv.Equals(in.Src.Env()) { fromEnv = nil break } @@ -195,8 +195,8 @@ func pin(ctx *GenerateContext) bool { // 对于所有未使用的流,增加Drop指令 func dropUnused(ctx *GenerateContext) { ctx.DAG.Walk(func(node dag.Node) bool { - for _, out := range node.OutputStreams().RawArray() { - if out.To().Len() == 0 { + for _, out := range 
node.OutputStreams().Slots.RawArray() { + if out.Dst.Len() == 0 { n := ctx.DAG.NewDropStream() *n.Env() = *node.Env() n.SetInput(out) @@ -233,12 +233,12 @@ func generateRange(ctx *GenerateContext) { if toDataIdx == -1 { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.From().Env() + *n.Env() = *toInput.Var.Src.Env() rnged := n.RangeStream(toInput.Var, exec.Range{ Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }) - toInput.Var.StreamNotTo(toNode, toInput.Index) + toInput.Var.NotTo(toNode) toNode.SetInput(rnged) } else { @@ -249,12 +249,12 @@ func generateRange(ctx *GenerateContext) { n := ctx.DAG.NewRange() toInput := toNode.Input() - *n.Env() = *toInput.Var.From().Env() + *n.Env() = *toInput.Var.Src.Env() rnged := n.RangeStream(toInput.Var, exec.Range{ Offset: toRng.Offset - blkStart, Length: toRng.Length, }) - toInput.Var.StreamNotTo(toNode, toInput.Index) + toInput.Var.NotTo(toNode) toNode.SetInput(rnged) } } @@ -263,31 +263,31 @@ func generateRange(ctx *GenerateContext) { // 生成Clone指令 func generateClone(ctx *GenerateContext) { ctx.DAG.Walk(func(node dag.Node) bool { - for _, outVar := range node.OutputStreams().RawArray() { - if outVar.To().Len() <= 1 { + for _, outVar := range node.OutputStreams().Slots.RawArray() { + if outVar.Dst.Len() <= 1 { continue } t := ctx.DAG.NewCloneStream() *t.Env() = *node.Env() - for _, to := range outVar.To().RawArray() { - t.NewOutput().StreamTo(to, to.InputStreams().IndexOf(outVar)) + for _, to := range outVar.Dst.RawArray() { + t.NewOutput().To(to, to.InputStreams().IndexOf(outVar)) } - outVar.To().Resize(0) + outVar.Dst.Resize(0) t.SetInput(outVar) } - for _, outVar := range node.OutputValues().RawArray() { - if outVar.To().Len() <= 1 { + for _, outVar := range node.OutputValues().Slots.RawArray() { + if outVar.Dst.Len() <= 1 { continue } t := ctx.DAG.NewCloneValue() *t.Env() = *node.Env() - for _, to := range outVar.To().RawArray() { - t.NewOutput().ValueTo(to, to.InputValues().IndexOf(outVar)) + for _, to := range outVar.Dst.RawArray() { + t.NewOutput().To(to, to.InputValues().IndexOf(outVar)) } - outVar.To().Resize(0) + outVar.Dst.Resize(0) t.SetInput(outVar) }
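
A minimal sketch of the wiring conventions this patch introduces, inferred from the diff above: dag.Var is split into dag.StreamVar/dag.ValueVar, a node's slots are sized up front with Init, edges are attached with To/NotTo, and GenerateOp collects variable IDs with GetVarIDs instead of lo.Map over RawArray. Only the dag/exec calls visible in the patch are used; the PassThroughNode type itself is hypothetical and does not exist in the repository.

package ops2

import (
	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

// PassThroughNode is a hypothetical node used only to illustrate the
// slot-based API introduced by this patch.
type PassThroughNode struct {
	dag.NodeBase
}

func (b *GraphNodeBuilder) NewPassThrough() *PassThroughNode {
	node := &PassThroughNode{}
	b.AddNode(node)

	// Slots are sized when the node is built, rather than grown lazily
	// with EnsureSize/SetupNew at wiring time.
	node.InputStreams().Init(1)
	node.OutputStreams().Init(node, 1)
	return node
}

// SetInput attaches an upstream stream; StreamVar.To replaces the old
// Var.StreamTo and assumes the input slot already exists.
func (t *PassThroughNode) SetInput(input *dag.StreamVar) {
	input.To(t, 0)
}

// Output returns the pre-allocated output stream.
func (t *PassThroughNode) Output() *dag.StreamVar {
	return t.OutputStreams().Get(0)
}

// RemoveAllInputs uses ClearAllInput in place of the old
// StreamNotTo + Resize(0) loop.
func (t *PassThroughNode) RemoveAllInputs() {
	t.InputStreams().ClearAllInput(t)
}

// GenerateOp shows GetVarIDs replacing lo.Map over RawArray; a real node
// would build and return its exec.Op here.
func (t *PassThroughNode) GenerateOp() (exec.Op, error) {
	_ = t.InputStreams().GetVarIDs()
	_ = t.OutputStreams().GetVarIDs()
	return nil, nil
}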
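
On the new fixSegmentSplit/fixSegmentJoin passes: both clamp a node's segments to the requested StreamRange. A worked example, assuming CalcSegmentRange returns a half-open [startSeg, endSeg) index range and CalcSegmentStart returns a segment's starting byte offset: with segment sizes 100/100/100 and a StreamRange of Offset=150, Length=100 (strEnd=250), CalcSegmentRange(150, 250) would give startSeg=1, endSeg=3, so segment 0 is closed (Segments[0]=0 and its output slot cleared), Segments[1] shrinks by 150-CalcSegmentStart(1)=50 down to 50, and Segments[2] becomes 250-CalcSegmentStart(2)=50, leaving 0/50/50: exactly the 100 bytes the range covers, split across the two segments it actually touches.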