diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 1fb7427..02f9e37 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -19,8 +19,8 @@ import ( func init() { rootCmd.AddCommand(&cobra.Command{ - Use: "test2", - Short: "test2", + Use: "test", + Short: "test", // Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { // cmdCtx := GetCmdCtx(cmd) @@ -160,8 +160,8 @@ func init() { }) rootCmd.AddCommand(&cobra.Command{ - Use: "test", - Short: "test", + Use: "test4", + Short: "test4", // Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { // cmdCtx := GetCmdCtx(cmd) @@ -202,8 +202,8 @@ func init() { }) rootCmd.AddCommand(&cobra.Command{ - Use: "test4", - Short: "test4", + Use: "test3", + Short: "test3", // Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { // cmdCtx := GetCmdCtx(cmd) @@ -228,7 +228,7 @@ func init() { ioswitchlrc.NewFromNode("QmQBKncEDqxw3BrGr3th3gS3jUC2fizGz1w29ZxxrrKfNv", &nodes.Nodes[0], 2), }, []ioswitchlrc.To{ ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], -1, "-1", exec.Range{0, &le}), - ioswitchlrc.NewToNode(nodes.Nodes[1], 0, "0"), + ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], 0, "0", exec.Range{10, &le}), ioswitchlrc.NewToNode(nodes.Nodes[1], 1, "1"), ioswitchlrc.NewToNode(nodes.Nodes[1], 2, "2"), ioswitchlrc.NewToNode(nodes.Nodes[1], 3, "3"), diff --git a/common/pkgs/ioswitch2/ioswitch.go b/common/pkgs/ioswitch2/ioswitch.go deleted file mode 100644 index 48b43ee..0000000 --- a/common/pkgs/ioswitch2/ioswitch.go +++ /dev/null @@ -1,23 +0,0 @@ -package ioswitch2 - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" -) - -type NodeProps struct { - From From - To To -} - -type ValueVarType int - -const ( - StringValueVar ValueVarType = iota - SignalValueVar -) - -type VarProps struct { - StreamIndex int // 流的编号,只在StreamVar上有意义 - ValueType ValueVarType // 值类型,只在ValueVar上有意义 - Var exec.Var // 生成Plan的时候创建的对应的Var -} diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index 0962e11..b4087b9 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "golang.org/x/sync/semaphore" ) @@ -101,24 +100,54 @@ func (o *ChunkedJoin) String() string { ) } -type ChunkedSplitType struct { - OutputCount int - ChunkSize int +type ChunkedSplitNode struct { + dag.NodeBase + ChunkSize int } -func (t *ChunkedSplitType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - for i := 0; i < t.OutputCount; i++ { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{ - StreamIndex: i, - }) +func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { + node := &ChunkedSplitNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + return node +} + +func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputStreams().Resize(cnt) + for i := 0; i < cnt; i++ { + t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i) + } +} + +func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { + return t.OutputStreams().Get(idx) +} + +func (t *ChunkedSplitNode) SplitCount() int { + return t.OutputStreams().Len() +} + +func (t *ChunkedSplitNode) Clear() { + if t.InputStreams().Len() == 0 { + return } + + t.InputStreams().Get(0).Disconnect(t, 0) + t.InputStreams().Resize(0) + + for _, out := range t.OutputStreams().RawArray() { + out.DisconnectAll() + } + t.OutputStreams().Resize(0) } -func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: op.InputStreams[0].Var, - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Input: t.InputStreams().Get(0).Var, + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.ChunkSize, @@ -126,30 +155,50 @@ func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { }, nil } -func (t *ChunkedSplitType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// func (t *ChunkedSplitNode) String() string { +// return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } + +type ChunkedJoinNode struct { + dag.NodeBase + ChunkSize int } -type ChunkedJoinType struct { - InputCount int - ChunkSize int +func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { + node := &ChunkedJoinNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar()) + return node } -func (t *ChunkedJoinType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, t.InputCount) - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) } -func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ChunkedJoinNode) Joined() *dag.StreamVar { + return t.OutputStreams().Get(0) +} + +func (t *ChunkedJoinNode) RemoveAllInputs() { + for i, in := range t.InputStreams().RawArray() { + in.Disconnect(t, i) + } + t.InputStreams().Resize(0) +} + +func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Output: op.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, ChunkSize: t.ChunkSize, }, nil } -func (t *ChunkedJoinType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) -} +// func (t *ChunkedJoinType) String() string { +// return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index 45129a8..66d1194 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -74,48 +74,70 @@ func (o *CloneVar) String() string { return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds)) } -type CloneStreamType struct{} +type CloneStreamType struct { + dag.NodeBase +} + +func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { + node := &CloneStreamType{} + b.AddNode(node) + return node +} + +func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneStreamType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (t *CloneStreamType) NewOutput() *dag.StreamVar { + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output } -func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ - Raw: op.InputStreams[0].Var, - Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Raw: t.InputStreams().Get(0).Var, + Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), }, nil } -func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar { - return dag.NodeNewOutputStream(node, nil) +// func (t *CloneStreamType) String() string { +// return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type CloneVarType struct { + dag.NodeBase } -func (t *CloneStreamType) String(node *dag.Node) string { - return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { + node := &CloneVarType{} + b.AddNode(node) + return node } -type CloneVarType struct{} +func (t *CloneVarType) SetInput(raw *dag.ValueVar) { + t.InputValues().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneVarType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) +func (t *CloneVarType) NewOutput() *dag.ValueVar { + output := t.Graph().NewValueVar(t.InputValues().Get(0).Type) + t.OutputValues().SetupNew(t, output) + return output } -func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: op.InputValues[0].Var, - Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var { + Raw: t.InputValues().Get(0).Var, + Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var { return v.Var }), }, nil } -func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar { - return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil) -} - -func (t *CloneVarType) String(node *dag.Node) string { - return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *CloneVarType) String() string { +// return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 32f8163..246151b 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -14,7 +14,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "golang.org/x/sync/semaphore" ) @@ -204,15 +203,43 @@ func (o *ECMultiply) String() string { ) } -type MultiplyType struct { +type ECMultiplyNode struct { + dag.NodeBase EC cdssdk.ECRedundancy InputIndexes []int OutputIndexes []int } -func (t *MultiplyType) InitNode(node *dag.Node) {} +func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode { + node := &ECMultiplyNode{ + EC: ec, + } + b.AddNode(node) + return node +} + +func (t *ECMultiplyNode) AddInput(str *dag.StreamVar, dataIndex int) { + t.InputIndexes = append(t.InputIndexes, dataIndex) + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) +} + +func (t *ECMultiplyNode) RemoveAllInputs() { + for i, in := range t.InputStreams().RawArray() { + in.Disconnect(t, i) + } + t.InputStreams().Resize(0) + t.InputIndexes = nil +} + +func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { + t.OutputIndexes = append(t.OutputIndexes, dataIndex) + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output +} -func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { rs, err := ec.NewRs(t.EC.K, t.EC.N) if err != nil { return nil, err @@ -224,23 +251,12 @@ func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) { return &ECMultiply{ Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.EC.ChunkSize, }, nil } -func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) { - t.InputIndexes = append(t.InputIndexes, dataIndex) - node.InputStreams = append(node.InputStreams, str) - str.To(node, len(node.InputStreams)-1) -} - -func (t *MultiplyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { - t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex}) -} - -func (t *MultiplyType) String(node *dag.Node) string { - return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *MultiplyType) String() string { +// return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index b6bd994..33dba33 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -80,36 +79,66 @@ func (o *FileRead) String() string { return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output.ID) } -type FileReadType struct { +type FileReadNode struct { + dag.NodeBase FilePath string } -func (t *FileReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode { + node := &FileReadNode{ + FilePath: filePath, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *FileReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } } -func (t *FileReadType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *FileReadNode) GenerateOp() (exec.Op, error) { return &FileRead{ - Output: op.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, FilePath: t.FilePath, }, nil } -func (t *FileReadType) String(node *dag.Node) string { - return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node)) -} +// func (t *FileReadType) String() string { +// return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node)) +// } -type FileWriteType struct { +type FileWriteNode struct { + dag.NodeBase FilePath string } -func (t *FileWriteType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (b *GraphNodeBuilder) NewFileWrite(filePath string) *FileWriteNode { + node := &FileWriteNode{ + FilePath: filePath, + } + b.AddNode(node) + return node +} + +func (t *FileWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } +} + +func (t *FileWriteNode) SetInput(str *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + str.Connect(t, 0) } -func (t *FileWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *FileWriteNode) GenerateOp() (exec.Op, error) { return &FileWrite{ - Input: op.InputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, FilePath: t.FilePath, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/ipfs.go b/common/pkgs/ioswitch2/ops2/ipfs.go index d6604e7..f8e4e2a 100644 --- a/common/pkgs/ioswitch2/ops2/ipfs.go +++ b/common/pkgs/ioswitch2/ops2/ipfs.go @@ -12,7 +12,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -94,44 +93,78 @@ func (o *IPFSWrite) String() string { return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) } -type IPFSReadType struct { +type IPFSReadNode struct { + dag.NodeBase FileHash string Option ipfs.ReadOption } -func (t *IPFSReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode { + node := &IPFSReadNode{ + FileHash: fileHash, + Option: option, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *IPFSReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } } -func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *IPFSReadNode) GenerateOp() (exec.Op, error) { return &IPFSRead{ - Output: n.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, FileHash: t.FileHash, Option: t.Option, }, nil } -func (t *IPFSReadType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSReadType) String() string { +// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +// } -type IPFSWriteType struct { +type IPFSWriteNode struct { + dag.NodeBase FileHashStoreKey string - Range exec.Range } -func (t *IPFSWriteType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { + node := &IPFSWriteNode{ + FileHashStoreKey: fileHashStoreKey, + } + b.AddNode(node) + return node +} + +func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) +} + +func (t *IPFSWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } } -func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { return &IPFSWrite{ - Input: op.InputStreams[0].Var, - FileHash: op.OutputValues[0].Var.(*exec.StringVar), + Input: t.InputStreams().Get(0).Var, + FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), }, nil } -func (t *IPFSWriteType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSWriteType) String() string { +// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go index c7309b6..c12ce53 100644 --- a/common/pkgs/ioswitch2/ops2/ops.go +++ b/common/pkgs/ioswitch2/ops2/ops.go @@ -1,75 +1,93 @@ package ops2 import ( - "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" ) -func formatStreamIO(node *dag.Node) string { - is := "" - for i, in := range node.InputStreams { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputStreams { - if i > 0 { - os += "," - } - - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("S{%s>%s}", is, os) +type GraphNodeBuilder struct { + *ops.GraphNodeBuilder +} + +func NewGraphNodeBuilder() *GraphNodeBuilder { + return &GraphNodeBuilder{ops.NewGraphNodeBuilder()} +} + +type FromNode interface { + dag.Node + Output() dag.StreamSlot } -func formatValueIO(node *dag.Node) string { - is := "" - for i, in := range node.InputValues { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputValues { - if i > 0 { - os += "," - } - - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("V{%s>%s}", is, os) +type ToNode interface { + dag.Node + Input() dag.StreamSlot + SetInput(input *dag.StreamVar) } + +// func formatStreamIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputStreams { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputStreams { +// if i > 0 +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("S{%s>%s}", is, os) +// } + +// func formatValueIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputValues { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputValues { +// if i > 0 { +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("V{%s>%s}", is, os) +// } diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 8e68238..1475b4e 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -76,24 +75,35 @@ func (o *Range) String() string { return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) } -type RangeType struct { +type RangeNode struct { + dag.NodeBase Range exec.Range } -func (t *RangeType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewRange() *RangeNode { + node := &RangeNode{} + b.AddNode(node) + return node } -func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.Range = rng + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output +} + +func (t *RangeNode) GenerateOp() (exec.Op, error) { return &Range{ - Input: n.InputStreams[0].Var, - Output: n.OutputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).Var, Offset: t.Range.Offset, Length: t.Range.Length, }, nil } -func (t *RangeType) String(node *dag.Node) string { - return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *RangeType) String() string { +// return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 3eff6b5..21870c2 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -7,7 +7,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" @@ -26,16 +25,27 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { } } +type IndexedStream struct { + Stream *dag.StreamVar + DataIndex int +} + type ParseContext struct { Ft ioswitch2.FromTo - DAG *dag.Graph + DAG *ops2.GraphNodeBuilder // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 - StreamRange exec.Range + ToNodes map[ioswitch2.To]ops2.ToNode + IndexedStreams []IndexedStream + StreamRange exec.Range } func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { - ctx := ParseContext{Ft: ft, DAG: dag.NewGraph()} + ctx := ParseContext{ + Ft: ft, + DAG: ops2.NewGraphNodeBuilder(), + ToNodes: make(map[ioswitch2.To]ops2.ToNode), + } // 分成两个阶段: // 1. 基于From和To生成更多指令,初步匹配to的需求 @@ -43,7 +53,7 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro // 计算一下打开流的范围 p.calcStreamRange(&ctx) - err := p.extend(&ctx, ft) + err := p.extend(&ctx) if err != nil { return err } @@ -82,20 +92,16 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro p.generateClone(&ctx) p.generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.StreamVar { var ret *dag.StreamVar - ctx.DAG.Walk(func(n *dag.Node) bool { - for _, o := range n.OutputStreams { - if o != nil && ioswitch2.SProps(o).StreamIndex == streamIndex { - ret = o - return false - } + for _, s := range ctx.IndexedStreams { + if s.DataIndex == streamIndex { + ret = s.Stream + break } - return true - }) - + } return ret } @@ -134,77 +140,86 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { ctx.StreamRange = rng } -func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch2.FromTo) error { - for _, fr := range ft.Froms { - frNode, err := p.buildFromNode(ctx, &ft, fr) +func (p *DefaultParser) extend(ctx *ParseContext) error { + for _, fr := range ctx.Ft.Froms { + frNode, err := p.buildFromNode(ctx, fr) if err != nil { return err } + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: frNode.Output().Var, + DataIndex: fr.GetDataIndex(), + }) + // 对于完整文件的From,生成Split指令 if fr.GetDataIndex() == -1 { - node, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{}) - frNode.OutputStreams[0].To(node, 0) + splitNode := ctx.DAG.NewChunkedSplit(p.EC.ChunkSize) + splitNode.Split(frNode.Output().Var, p.EC.K) + for i := 0; i < p.EC.K; i++ { + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: splitNode.SubStream(i), + DataIndex: i, + }) + } } } // 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 ecInputStrs := make(map[int]*dag.StreamVar) -loop: - for _, o := range ctx.DAG.Nodes { - for _, s := range o.OutputStreams { - prop := ioswitch2.SProps(s) - if prop.StreamIndex >= 0 && ecInputStrs[prop.StreamIndex] == nil { - ecInputStrs[prop.StreamIndex] = s - if len(ecInputStrs) == p.EC.K { - break loop - } + for _, s := range ctx.IndexedStreams { + if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil { + ecInputStrs[s.DataIndex] = s.Stream + if len(ecInputStrs) == p.EC.K { + break } } } + if len(ecInputStrs) == p.EC.K { - mulNode, mulType := dag.NewNode(ctx.DAG, &ops2.MultiplyType{ - EC: p.EC, - }, &ioswitch2.NodeProps{}) + mulNode := ctx.DAG.NewECMultiply(p.EC) - for _, s := range ecInputStrs { - mulType.AddInput(mulNode, s, ioswitch2.SProps(s).StreamIndex) + for i, s := range ecInputStrs { + mulNode.AddInput(s, i) } for i := 0; i < p.EC.N; i++ { - mulType.NewOutput(mulNode, i) + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: mulNode.NewOutput(i), + DataIndex: i, + }) } - joinNode, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedJoinType{ - InputCount: p.EC.K, - ChunkSize: p.EC.ChunkSize, - }, &ioswitch2.NodeProps{}) - + joinNode := ctx.DAG.NewChunkedJoin(p.EC.ChunkSize) for i := 0; i < p.EC.K; i++ { // 不可能找不到流 - p.findOutputStream(ctx, i).To(joinNode, i) + joinNode.AddInput(p.findOutputStream(ctx, i)) } - ioswitch2.SProps(joinNode.OutputStreams[0]).StreamIndex = -1 + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: joinNode.Joined(), + DataIndex: -1, + }) } // 为每一个To找到一个输入流 - for _, to := range ft.Toes { - n, err := p.buildToNode(ctx, &ft, to) + for _, to := range ctx.Ft.Toes { + toNode, err := p.buildToNode(ctx, to) if err != nil { return err } + ctx.ToNodes[to] = toNode str := p.findOutputStream(ctx, to.GetDataIndex()) if str == nil { return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex()) } - str.To(n, 0) + toNode.SetInput(str) } return nil } -func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f ioswitch2.From) (*dag.Node, error) { +func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) { var repRange exec.Range var blkRange exec.Range @@ -220,16 +235,10 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f switch f := f.(type) { case *ioswitch2.FromNode: - n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{ - FileHash: f.FileHash, - Option: ipfs.ReadOption{ - Offset: 0, - Length: -1, - }, - }, &ioswitch2.NodeProps{ - From: f, + t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{ + Offset: 0, + Length: -1, }) - ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { t.Option.Offset = repRange.Offset @@ -244,17 +253,16 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f } if f.Node != nil { - n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) - n.Env.Pinned = true + t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) + t.Env().Pinned = true } - return n, nil + return t, nil case *ioswitch2.FromDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch2.NodeProps{From: f}) - n.Env.ToEnvDriver() - n.Env.Pinned = true - ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + n := ctx.DAG.NewFromDriver(f.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true if f.DataIndex == -1 { f.Handle.RangeHint.Offset = repRange.Offset @@ -271,24 +279,19 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f } } -func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t ioswitch2.To) (*dag.Node, error) { +func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitch2.ToNode: - n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{ - FileHashStoreKey: t.FileHashStoreKey, - Range: t.Range, - }, &ioswitch2.NodeProps{ - To: t, - }) - n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node}) - n.Env.Pinned = true + n := ctx.DAG.NewIPFSWrite(t.FileHashStoreKey) + n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node}) + n.Env().Pinned = true return n, nil case *ioswitch2.ToDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch2.NodeProps{To: t}) - n.Env.ToEnvDriver() - n.Env.Pinned = true + n := ctx.DAG.NewToDriver(t.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true return n, nil @@ -301,15 +304,12 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t i func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.ChunkedJoinType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedJoinType) bool { - if len(node.OutputStreams[0].Toes) > 0 { + dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool { + if node.InputStreams().Len() > 0 { return true } - for _, in := range node.InputStreams { - in.NotTo(node) - } - + node.RemoveAllInputs() ctx.DAG.RemoveNode(node) return true }) @@ -320,25 +320,23 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.MultiplyType](ctx.DAG, func(node *dag.Node, typ *ops2.MultiplyType) bool { - for i2, out := range node.OutputStreams { - if len(out.Toes) > 0 { + dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool { + outArr := node.OutputStreams().RawArray() + for i2, out := range outArr { + if out.To().Len() > 0 { continue } - node.OutputStreams[i2] = nil - typ.OutputIndexes[i2] = -2 + outArr[i2] = nil + node.OutputIndexes[i2] = -2 changed = true } - node.OutputStreams = lo2.RemoveAllDefault(node.OutputStreams) - typ.OutputIndexes = lo2.RemoveAll(typ.OutputIndexes, -2) + node.OutputStreams().SetRawArray(lo2.RemoveAllDefault(outArr)) + node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2) // 如果所有输出流都被删除,则删除该指令 - if len(node.OutputStreams) == 0 { - for _, in := range node.InputStreams { - in.NotTo(node) - } - + if node.OutputStreams().Len() == 0 { + node.RemoveAllInputs() ctx.DAG.RemoveNode(node) changed = true } @@ -351,16 +349,16 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { // 删除未使用的Split指令 func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedSplitType) bool { + dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool { // Split出来的每一个流都没有被使用,才能删除这个指令 - for _, out := range node.OutputStreams { - if len(out.Toes) > 0 { + for _, out := range typ.OutputStreams().RawArray() { + if out.To().Len() > 0 { return true } } - node.InputStreams[0].NotTo(node) - ctx.DAG.RemoveNode(node) + typ.Clear() + ctx.DAG.RemoveNode(typ) changed = true return true }) @@ -372,43 +370,44 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(splitNode *dag.Node, typ *ops2.ChunkedSplitType) bool { + dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool { // Split指令的每一个输出都有且只有一个目的地 - var joinNode *dag.Node - for _, out := range splitNode.OutputStreams { - if len(out.Toes) != 1 { - continue + var dstNode dag.Node + for _, out := range splitNode.OutputStreams().RawArray() { + if out.To().Len() != 1 { + return true } - if joinNode == nil { - joinNode = out.Toes[0].Node - } else if joinNode != out.Toes[0].Node { + if dstNode == nil { + dstNode = out.To().Get(0).Node + } else if dstNode != out.To().Get(0).Node { return true } } - if joinNode == nil { + if dstNode == nil { return true } // 且这个目的地要是一个Join指令 - _, ok := joinNode.Type.(*ops2.ChunkedJoinType) + joinNode, ok := dstNode.(*ops2.ChunkedJoinNode) if !ok { return true } // 同时这个Join指令的输入也必须全部来自Split指令的输出。 // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可 - if len(joinNode.InputStreams) != len(splitNode.OutputStreams) { + if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() { return true } // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: // F->Split->Join->T 变换为:F->T - splitNode.InputStreams[0].NotTo(splitNode) - for _, out := range joinNode.OutputStreams[0].Toes { - splitNode.InputStreams[0].To(out.Node, out.SlotIndex) + splitInput := splitNode.InputStreams().Get(0) + for _, to := range joinNode.Joined().To().RawArray() { + splitInput.Connect(to.Node, to.SlotIndex) } + splitInput.Disconnect(splitNode, 0) // 并删除这两个指令 ctx.DAG.RemoveNode(joinNode) @@ -426,21 +425,21 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { // 所以理论上不会出现有指令的位置始终无法确定的情况。 func (p *DefaultParser) pin(ctx *ParseContext) bool { changed := false - ctx.DAG.Walk(func(node *dag.Node) bool { - if node.Env.Pinned { + ctx.DAG.Walk(func(node dag.Node) bool { + if node.Env().Pinned { return true } var toEnv *dag.NodeEnv - for _, out := range node.OutputStreams { - for _, to := range out.Toes { - if to.Node.Env.Type == dag.EnvUnknown { + for _, out := range node.OutputStreams().RawArray() { + for _, to := range out.To().RawArray() { + if to.Node.Env().Type == dag.EnvUnknown { continue } if toEnv == nil { - toEnv = &to.Node.Env - } else if !toEnv.Equals(to.Node.Env) { + toEnv = to.Node.Env() + } else if !toEnv.Equals(to.Node.Env()) { toEnv = nil break } @@ -448,35 +447,35 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool { } if toEnv != nil { - if !node.Env.Equals(*toEnv) { + if !node.Env().Equals(toEnv) { changed = true } - node.Env = *toEnv + *node.Env() = *toEnv return true } // 否则根据输入流的始发地来固定 var fromEnv *dag.NodeEnv - for _, in := range node.InputStreams { - if in.From.Node.Env.Type == dag.EnvUnknown { + for _, in := range node.InputStreams().RawArray() { + if in.From().Node.Env().Type == dag.EnvUnknown { continue } if fromEnv == nil { - fromEnv = &in.From.Node.Env - } else if !fromEnv.Equals(in.From.Node.Env) { + fromEnv = in.From().Node.Env() + } else if !fromEnv.Equals(in.From().Node.Env()) { fromEnv = nil break } } if fromEnv != nil { - if !node.Env.Equals(*fromEnv) { + if !node.Env().Equals(fromEnv) { changed = true } - node.Env = *fromEnv + *node.Env() = *fromEnv } return true }) @@ -486,12 +485,12 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool { // 对于所有未使用的流,增加Drop指令 func (p *DefaultParser) dropUnused(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) == 0 { - n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitch2.NodeProps{}) - n.Env = node.Env - out.To(n, 0) + ctx.DAG.Walk(func(node dag.Node) bool { + for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() == 0 { + n := ctx.DAG.NewDropStream() + *n.Env() = *node.Env() + n.SetInput(out) } } return true @@ -500,43 +499,38 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { // 为IPFS写入指令存储结果 func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { - dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool { - if typ.FileHashStoreKey == "" { + dag.WalkOnlyType[*ops2.IPFSWriteNode](ctx.DAG.Graph, func(n *ops2.IPFSWriteNode) bool { + if n.FileHashStoreKey == "" { return true } - n, t := dag.NewNode(ctx.DAG, &ops.StoreType{ - StoreKey: typ.FileHashStoreKey, - }, &ioswitch2.NodeProps{}) - n.Env.ToEnvDriver() - t.Store(n, node.OutputValues[0]) + storeNode := ctx.DAG.NewStore() + storeNode.Env().ToEnvDriver() + + storeNode.Store(n.FileHashStoreKey, n.FileHashVar()) return true }) } // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func (p *DefaultParser) generateRange(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - props := ioswitch2.NProps(node) - if props.To == nil { - return true - } + for i := 0; i < len(ctx.Ft.Toes); i++ { + to := ctx.Ft.Toes[i] + toNode := ctx.ToNodes[to] - toDataIdx := props.To.GetDataIndex() - toRng := props.To.GetRange() + toDataIdx := to.GetDataIndex() + toRng := to.GetRange() if toDataIdx == -1 { - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - ctx.StreamRange.Offset, - Length: toRng.Length, - }, - }, &ioswitch2.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - ctx.StreamRange.Offset, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } else { stripSize := int64(p.EC.ChunkSize * p.EC.K) @@ -544,54 +538,48 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { blkStart := blkStartIdx * int64(p.EC.ChunkSize) - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - blkStart, - Length: toRng.Length, - }, - }, &ioswitch2.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - blkStart, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } - - return true - }) + } } // 生成Clone指令 func (p *DefaultParser) generateClone(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) <= 1 { + ctx.DAG.Walk(func(node dag.Node) bool { + for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch2.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - str := t.NewOutput(n) - str.Props = &ioswitch2.VarProps{StreamIndex: ioswitch2.SProps(out).StreamIndex} - str.To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneStream() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } - for _, out := range node.OutputValues { - if len(out.Toes) <= 1 { + for _, out := range node.OutputValues().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch2.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - t.NewOutput(n).To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneValue() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } return true diff --git a/common/pkgs/ioswitch2/utils.go b/common/pkgs/ioswitch2/utils.go deleted file mode 100644 index ad3cb20..0000000 --- a/common/pkgs/ioswitch2/utils.go +++ /dev/null @@ -1,17 +0,0 @@ -package ioswitch2 - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" -) - -func NProps(n *dag.Node) *NodeProps { - return dag.NProps[*NodeProps](n) -} - -func SProps(str *dag.StreamVar) *VarProps { - return dag.SProps[*VarProps](str) -} - -func VProps(v *dag.ValueVar) *VarProps { - return dag.VProps[*VarProps](v) -} diff --git a/common/pkgs/ioswitchlrc/agent_worker.go b/common/pkgs/ioswitchlrc/agent_worker.go index 81b280f..122a54a 100644 --- a/common/pkgs/ioswitchlrc/agent_worker.go +++ b/common/pkgs/ioswitchlrc/agent_worker.go @@ -5,16 +5,14 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/types" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" ) -var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( - (*AgentWorker)(nil), -))) +// var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( +// (*AgentWorker)(nil), +// ))) type AgentWorker struct { Node cdssdk.Node diff --git a/common/pkgs/ioswitchlrc/ioswitch.go b/common/pkgs/ioswitchlrc/ioswitch.go deleted file mode 100644 index b6198a8..0000000 --- a/common/pkgs/ioswitchlrc/ioswitch.go +++ /dev/null @@ -1,23 +0,0 @@ -package ioswitchlrc - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" -) - -type NodeProps struct { - From From - To To -} - -type ValueVarType int - -const ( - StringValueVar ValueVarType = iota - SignalValueVar -) - -type VarProps struct { - StreamIndex int // 流的编号,只在StreamVar上有意义 - ValueType ValueVarType // 值类型,只在ValueVar上有意义 - Var exec.Var // 生成Plan的时候创建的对应的Var -} diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index 33ae0e6..9707c20 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" "golang.org/x/sync/semaphore" ) @@ -101,24 +100,40 @@ func (o *ChunkedJoin) String() string { ) } -type ChunkedSplitType struct { - OutputCount int - ChunkSize int +type ChunkedSplitNode struct { + dag.NodeBase + ChunkSize int } -func (t *ChunkedSplitType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - for i := 0; i < t.OutputCount; i++ { - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ - StreamIndex: i, - }) +func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { + node := &ChunkedSplitNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + return node +} + +func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputStreams().Resize(cnt) + for i := 0; i < cnt; i++ { + t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i) } } -func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { + return t.OutputStreams().Get(idx) +} + +func (t *ChunkedSplitNode) SplitCount() int { + return t.OutputStreams().Len() +} + +func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: op.InputStreams[0].Var, - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Input: t.InputStreams().Get(0).Var, + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.ChunkSize, @@ -126,32 +141,43 @@ func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { }, nil } -func (t *ChunkedSplitType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// func (t *ChunkedSplitNode) String() string { +// return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } + +type ChunkedJoinNode struct { + dag.NodeBase + ChunkSize int } -type ChunkedJoinType struct { - InputCount int - ChunkSize int +func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { + node := &ChunkedJoinNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar()) + return node } -func (t *ChunkedJoinType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, t.InputCount) - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ - StreamIndex: -1, - }) +func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) } -func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ChunkedJoinNode) Joined() *dag.StreamVar { + return t.OutputStreams().Get(0) +} + +func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Output: op.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, ChunkSize: t.ChunkSize, }, nil } -func (t *ChunkedJoinType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) -} +// func (t *ChunkedJoinType) String() string { +// return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index 45129a8..66d1194 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -74,48 +74,70 @@ func (o *CloneVar) String() string { return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds)) } -type CloneStreamType struct{} +type CloneStreamType struct { + dag.NodeBase +} + +func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { + node := &CloneStreamType{} + b.AddNode(node) + return node +} + +func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneStreamType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (t *CloneStreamType) NewOutput() *dag.StreamVar { + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output } -func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ - Raw: op.InputStreams[0].Var, - Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Raw: t.InputStreams().Get(0).Var, + Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), }, nil } -func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar { - return dag.NodeNewOutputStream(node, nil) +// func (t *CloneStreamType) String() string { +// return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type CloneVarType struct { + dag.NodeBase } -func (t *CloneStreamType) String(node *dag.Node) string { - return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { + node := &CloneVarType{} + b.AddNode(node) + return node } -type CloneVarType struct{} +func (t *CloneVarType) SetInput(raw *dag.ValueVar) { + t.InputValues().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneVarType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) +func (t *CloneVarType) NewOutput() *dag.ValueVar { + output := t.Graph().NewValueVar(t.InputValues().Get(0).Type) + t.OutputValues().SetupNew(t, output) + return output } -func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: op.InputValues[0].Var, - Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var { + Raw: t.InputValues().Get(0).Var, + Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var { return v.Var }), }, nil } -func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar { - return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil) -} - -func (t *CloneVarType) String(node *dag.Node) string { - return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *CloneVarType) String() string { +// return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 01ed734..358a0e3 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -15,7 +15,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ec/lrc" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -115,15 +114,43 @@ func (o *GalMultiply) String() string { ) } -type LRCConstructAnyType struct { +type LRCConstructAnyNode struct { + dag.NodeBase LRC cdssdk.LRCRedundancy InputIndexes []int OutputIndexes []int } -func (t *LRCConstructAnyType) InitNode(node *dag.Node) {} +func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCConstructAnyNode { + node := &LRCConstructAnyNode{ + LRC: lrc, + } + b.AddNode(node) + return node +} + +func (t *LRCConstructAnyNode) AddInput(str *dag.StreamVar, dataIndex int) { + t.InputIndexes = append(t.InputIndexes, dataIndex) + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) +} + +func (t *LRCConstructAnyNode) RemoveAllInputs() { + for i, in := range t.InputStreams().RawArray() { + in.Disconnect(t, i) + } + t.InputStreams().Resize(0) + t.InputIndexes = nil +} -func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { + t.OutputIndexes = append(t.OutputIndexes, dataIndex) + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output +} + +func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups) if err != nil { return nil, err @@ -135,50 +162,45 @@ func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.LRC.ChunkSize, }, nil } -func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) { - t.InputIndexes = append(t.InputIndexes, dataIndex) - node.InputStreams = append(node.InputStreams, str) - str.To(node, len(node.InputStreams)-1) -} +// func (t *LRCConstructAnyType) String() string { +// return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } -func (t *LRCConstructAnyType) RemoveAllInputs(n *dag.Node) { - for _, in := range n.InputStreams { - in.From.Node.OutputStreams[in.From.SlotIndex].NotTo(n) - } - n.InputStreams = nil - t.InputIndexes = nil -} - -func (t *LRCConstructAnyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { - t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{StreamIndex: dataIndex}) +type LRCConstructGroupNode struct { + dag.NodeBase + LRC cdssdk.LRCRedundancy + TargetBlockIndex int } -func (t *LRCConstructAnyType) String(node *dag.Node) string { - return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node)) +func (b *GraphNodeBuilder) NewLRCConstructGroup(lrc cdssdk.LRCRedundancy) *LRCConstructGroupNode { + node := &LRCConstructGroupNode{ + LRC: lrc, + } + b.AddNode(node) + return node } -type LRCConstructGroupType struct { - LRC cdssdk.LRCRedundancy - TargetBlockIndex int -} +func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.StreamVar) *dag.StreamVar { + t.TargetBlockIndex = blockIdx -func (t *LRCConstructGroupType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ - StreamIndex: t.TargetBlockIndex, - }) + t.InputStreams().Resize(0) + for _, in := range inputs { + idx := t.InputStreams().EnlargeOne() + in.Connect(t, idx) + } - grpIdx := t.LRC.FindGroup(t.TargetBlockIndex) - dag.NodeDeclareInputStream(node, t.LRC.Groups[grpIdx]) + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output } -func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *LRCConstructGroupNode) GenerateOp() (exec.Op, error) { l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups) if err != nil { return nil, err @@ -190,12 +212,12 @@ func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.LRC.ChunkSize, }, nil } -func (t *LRCConstructGroupType) String(node *dag.Node) string { - return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *LRCConstructGroupType) String() string { +// return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/ipfs.go b/common/pkgs/ioswitchlrc/ops2/ipfs.go index 3ccdc3d..f8e4e2a 100644 --- a/common/pkgs/ioswitchlrc/ops2/ipfs.go +++ b/common/pkgs/ioswitchlrc/ops2/ipfs.go @@ -12,7 +12,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -94,44 +93,78 @@ func (o *IPFSWrite) String() string { return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) } -type IPFSReadType struct { +type IPFSReadNode struct { + dag.NodeBase FileHash string Option ipfs.ReadOption } -func (t *IPFSReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode { + node := &IPFSReadNode{ + FileHash: fileHash, + Option: option, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *IPFSReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } } -func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *IPFSReadNode) GenerateOp() (exec.Op, error) { return &IPFSRead{ - Output: n.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, FileHash: t.FileHash, Option: t.Option, }, nil } -func (t *IPFSReadType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSReadType) String() string { +// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +// } -type IPFSWriteType struct { +type IPFSWriteNode struct { + dag.NodeBase FileHashStoreKey string - Range exec.Range } -func (t *IPFSWriteType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitchlrc.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { + node := &IPFSWriteNode{ + FileHashStoreKey: fileHashStoreKey, + } + b.AddNode(node) + return node +} + +func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) +} + +func (t *IPFSWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } } -func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { return &IPFSWrite{ - Input: op.InputStreams[0].Var, - FileHash: op.OutputValues[0].Var.(*exec.StringVar), + Input: t.InputStreams().Get(0).Var, + FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), }, nil } -func (t *IPFSWriteType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSWriteType) String() string { +// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go index c7309b6..a41ec08 100644 --- a/common/pkgs/ioswitchlrc/ops2/ops.go +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -1,75 +1,93 @@ package ops2 import ( - "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" ) -func formatStreamIO(node *dag.Node) string { - is := "" - for i, in := range node.InputStreams { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputStreams { - if i > 0 { - os += "," - } - - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("S{%s>%s}", is, os) +type GraphNodeBuilder struct { + *ops.GraphNodeBuilder +} + +func NewGraphNodeBuilder() *GraphNodeBuilder { + return &GraphNodeBuilder{ops.NewGraphNodeBuilder()} +} + +type FromNode interface { + dag.Node + Output() dag.StreamSlot } -func formatValueIO(node *dag.Node) string { - is := "" - for i, in := range node.InputValues { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputValues { - if i > 0 { - os += "," - } - - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("V{%s>%s}", is, os) +type ToNode interface { + dag.Node + Input() dag.StreamSlot + SetInput(input *dag.StreamVar) } + +// func formatStreamIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputStreams { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputStreams { +// if i > ops +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("S{%s>%s}", is, os) +// } + +// func formatValueIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputValues { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputValues { +// if i > 0 { +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("V{%s>%s}", is, os) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index 5d97155..1475b4e 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -76,24 +75,35 @@ func (o *Range) String() string { return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) } -type RangeType struct { +type RangeNode struct { + dag.NodeBase Range exec.Range } -func (t *RangeType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{}) +func (b *GraphNodeBuilder) NewRange() *RangeNode { + node := &RangeNode{} + b.AddNode(node) + return node } -func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.Range = rng + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output +} + +func (t *RangeNode) GenerateOp() (exec.Op, error) { return &Range{ - Input: n.InputStreams[0].Var, - Output: n.OutputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).Var, Offset: t.Range.Offset, Length: t.Range.Length, }, nil } -func (t *RangeType) String(node *dag.Node) string { - return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *RangeType) String() string { +// return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index cabe84b..33593a3 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -13,8 +13,9 @@ import ( type GenerateContext struct { LRC cdssdk.LRCRedundancy - DAG *dag.Graph - Toes []ioswitchlrc.To + DAG *ops2.GraphNodeBuilder + To []ioswitchlrc.To + ToNodes map[ioswitchlrc.To]ops2.ToNode StreamRange exec.Range } @@ -25,9 +26,10 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) } ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, - DAG: dag.NewGraph(), - Toes: toes, + LRC: cdssdk.DefaultLRCRedundancy, + DAG: ops2.NewGraphNodeBuilder(), + To: toes, + ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), } calcStreamRange(&ctx) @@ -46,7 +48,7 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) generateClone(&ctx) generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlrc.To) error { @@ -66,9 +68,9 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - frNode.OutputStreams[0].To(toNode, 0) - + toNode.SetInput(frNode.Output().Var) } else if idx < ctx.LRC.K { dataToes = append(dataToes, to) } else { @@ -81,19 +83,17 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr } // 需要文件块,则生成Split指令 - splitNode := ctx.DAG.NewNode(&ops2.ChunkedSplitType{ - OutputCount: ctx.LRC.K, - ChunkSize: ctx.LRC.ChunkSize, - }, &ioswitchlrc.NodeProps{}) - frNode.OutputStreams[0].To(splitNode, 0) + splitNode := ctx.DAG.NewChunkedSplit(ctx.LRC.ChunkSize) + splitNode.Split(frNode.Output().Var, ctx.LRC.K) for _, to := range dataToes { toNode, err := buildToNode(ctx, to) if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - splitNode.OutputStreams[to.GetDataIndex()].To(toNode, 0) + toNode.SetInput(splitNode.SubStream(to.GetDataIndex())) } if len(parityToes) == 0 { @@ -102,12 +102,10 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr // 需要校验块,则进一步生成Construct指令 - conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ - LRC: ctx.LRC, - }, &ioswitchlrc.NodeProps{}) + conType := ctx.DAG.NewLRCConstructAny(ctx.LRC) - for _, out := range splitNode.OutputStreams { - conType.AddInput(conNode, out, ioswitchlrc.SProps(out).StreamIndex) + for i, out := range splitNode.OutputStreams().RawArray() { + conType.AddInput(out, i) } for _, to := range parityToes { @@ -115,8 +113,9 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0) + toNode.SetInput(conType.NewOutput(to.GetDataIndex())) } return nil } @@ -124,9 +123,10 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr // 提供数据块+编码块中的k个块,重建任意块,包括完整文件。 func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, - DAG: dag.NewGraph(), - Toes: toes, + LRC: cdssdk.DefaultLRCRedundancy, + DAG: ops2.NewGraphNodeBuilder(), + To: toes, + ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), } calcStreamRange(&ctx) @@ -145,11 +145,11 @@ func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.P generateClone(&ctx) generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { - frNodes := make(map[int]*dag.Node) + frNodes := make(map[int]ops2.FromNode) for _, fr := range frs { frNode, err := buildFromNode(ctx, fr) if err != nil { @@ -167,12 +167,13 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ toIdx := to.GetDataIndex() fr := frNodes[toIdx] if fr != nil { - node, err := buildToNode(ctx, to) + toNode, err := buildToNode(ctx, to) if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - fr.OutputStreams[0].To(node, 0) + toNode.SetInput(fr.Output().Var) continue } @@ -189,12 +190,9 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ // 生成Construct指令来恢复缺少的块 - conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ - LRC: ctx.LRC, - }, &ioswitchlrc.NodeProps{}) - - for _, fr := range frNodes { - conType.AddInput(conNode, fr.OutputStreams[0], ioswitchlrc.SProps(fr.OutputStreams[0]).StreamIndex) + conNode := ctx.DAG.NewLRCConstructAny(ctx.LRC) + for i, fr := range frNodes { + conNode.AddInput(fr.Output().Var, i) } for _, to := range missedToes { @@ -202,8 +200,9 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0) + toNode.SetInput(conNode.NewOutput(to.GetDataIndex())) } if len(completeToes) == 0 { @@ -212,17 +211,14 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ // 需要完整文件,则生成Join指令 - joinNode := ctx.DAG.NewNode(&ops2.ChunkedJoinType{ - InputCount: ctx.LRC.K, - ChunkSize: ctx.LRC.ChunkSize, - }, &ioswitchlrc.NodeProps{}) + joinNode := ctx.DAG.NewChunkedJoin(ctx.LRC.ChunkSize) for i := 0; i < ctx.LRC.K; i++ { - n := frNodes[i] - if n == nil { - conType.NewOutput(conNode, i).To(joinNode, i) + fr := frNodes[i] + if fr == nil { + joinNode.AddInput(conNode.NewOutput(i)) } else { - n.OutputStreams[0].To(joinNode, i) + joinNode.AddInput(fr.Output().Var) } } @@ -231,13 +227,14 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - joinNode.OutputStreams[0].To(toNode, 0) + toNode.SetInput(joinNode.Joined()) } // 如果不需要Construct任何块,则删除这个节点 - if len(conNode.OutputStreams) == 0 { - conType.RemoveAllInputs(conNode) + if conNode.OutputStreams().Len() == 0 { + conNode.RemoveAllInputs() ctx.DAG.RemoveNode(conNode) } @@ -247,9 +244,10 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ // 输入同一组的多个块,恢复出剩下缺少的一个块。 func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, - DAG: dag.NewGraph(), - Toes: toes, + LRC: cdssdk.DefaultLRCRedundancy, + DAG: ops2.NewGraphNodeBuilder(), + To: toes, + ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), } calcStreamRange(&ctx) @@ -268,33 +266,33 @@ func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec generateClone(&ctx) generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { - missedGrpIdx := toes[0].GetDataIndex() - conNode := ctx.DAG.NewNode(&ops2.LRCConstructGroupType{ - LRC: ctx.LRC, - TargetBlockIndex: missedGrpIdx, - }, &ioswitchlrc.NodeProps{}) - - for i, fr := range frs { + var inputs []*dag.StreamVar + for _, fr := range frs { frNode, err := buildFromNode(ctx, fr) if err != nil { return fmt.Errorf("building from node: %w", err) } - frNode.OutputStreams[0].To(conNode, i) + inputs = append(inputs, frNode.Output().Var) } + missedGrpIdx := toes[0].GetDataIndex() + conNode := ctx.DAG.NewLRCConstructGroup(ctx.LRC) + missedBlk := conNode.SetupForTarget(missedGrpIdx, inputs) + for _, to := range toes { toNode, err := buildToNode(ctx, to) if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - conNode.OutputStreams[0].To(toNode, 0) + toNode.SetInput(missedBlk) } return nil diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index d59f54f..4a5ba37 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -6,7 +6,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" @@ -21,7 +20,7 @@ func calcStreamRange(ctx *GenerateContext) { Offset: math.MaxInt64, } - for _, to := range ctx.Toes { + for _, to := range ctx.To { if to.GetDataIndex() == -1 { toRng := to.GetRange() rng.ExtendStart(math2.Floor(toRng.Offset, stripSize)) @@ -48,7 +47,7 @@ func calcStreamRange(ctx *GenerateContext) { ctx.StreamRange = rng } -func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) { +func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, error) { var repRange exec.Range var blkRange exec.Range @@ -64,16 +63,10 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) switch f := f.(type) { case *ioswitchlrc.FromNode: - n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{ - FileHash: f.FileHash, - Option: ipfs.ReadOption{ - Offset: 0, - Length: -1, - }, - }, &ioswitchlrc.NodeProps{ - From: f, + t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{ + Offset: 0, + Length: -1, }) - ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { t.Option.Offset = repRange.Offset @@ -88,17 +81,16 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) } if f.Node != nil { - n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node}) - n.Env.Pinned = true + t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node}) + t.Env().Pinned = true } - return n, nil + return t, nil case *ioswitchlrc.FromDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitchlrc.NodeProps{From: f}) - n.Env.ToEnvDriver() - n.Env.Pinned = true - ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + n := ctx.DAG.NewFromDriver(f.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true if f.DataIndex == -1 { f.Handle.RangeHint.Offset = repRange.Offset @@ -115,24 +107,19 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) } } -func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) { +func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitchlrc.ToNode: - n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{ - FileHashStoreKey: t.FileHashStoreKey, - Range: t.Range, - }, &ioswitchlrc.NodeProps{ - To: t, - }) - n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node}) - n.Env.Pinned = true + n := ctx.DAG.NewIPFSWrite(t.FileHashStoreKey) + n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node}) + n.Env().Pinned = true return n, nil case *ioswitchlrc.ToDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitchlrc.NodeProps{To: t}) - n.Env.ToEnvDriver() - n.Env.Pinned = true + n := ctx.DAG.NewToDriver(t.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true return n, nil @@ -146,21 +133,21 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) { // 所以理论上不会出现有指令的位置始终无法确定的情况。 func pin(ctx *GenerateContext) bool { changed := false - ctx.DAG.Walk(func(node *dag.Node) bool { - if node.Env.Pinned { + ctx.DAG.Walk(func(node dag.Node) bool { + if node.Env().Pinned { return true } var toEnv *dag.NodeEnv - for _, out := range node.OutputStreams { - for _, to := range out.Toes { - if to.Node.Env.Type == dag.EnvUnknown { + for _, out := range node.OutputStreams().RawArray() { + for _, to := range out.To().RawArray() { + if to.Node.Env().Type == dag.EnvUnknown { continue } if toEnv == nil { - toEnv = &to.Node.Env - } else if !toEnv.Equals(to.Node.Env) { + toEnv = to.Node.Env() + } else if !toEnv.Equals(to.Node.Env()) { toEnv = nil break } @@ -168,35 +155,35 @@ func pin(ctx *GenerateContext) bool { } if toEnv != nil { - if !node.Env.Equals(*toEnv) { + if !node.Env().Equals(toEnv) { changed = true } - node.Env = *toEnv + *node.Env() = *toEnv return true } // 否则根据输入流的始发地来固定 var fromEnv *dag.NodeEnv - for _, in := range node.InputStreams { - if in.From.Node.Env.Type == dag.EnvUnknown { + for _, in := range node.InputStreams().RawArray() { + if in.From().Node.Env().Type == dag.EnvUnknown { continue } if fromEnv == nil { - fromEnv = &in.From.Node.Env - } else if !fromEnv.Equals(in.From.Node.Env) { + fromEnv = in.From().Node.Env() + } else if !fromEnv.Equals(in.From().Node.Env()) { fromEnv = nil break } } if fromEnv != nil { - if !node.Env.Equals(*fromEnv) { + if !node.Env().Equals(fromEnv) { changed = true } - node.Env = *fromEnv + *node.Env() = *fromEnv } return true }) @@ -206,12 +193,12 @@ func pin(ctx *GenerateContext) bool { // 对于所有未使用的流,增加Drop指令 func dropUnused(ctx *GenerateContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) == 0 { - n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitchlrc.NodeProps{}) - n.Env = node.Env - out.To(n, 0) + ctx.DAG.Walk(func(node dag.Node) bool { + for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() == 0 { + n := ctx.DAG.NewDropStream() + *n.Env() = *node.Env() + n.SetInput(out) } } return true @@ -220,44 +207,38 @@ func dropUnused(ctx *GenerateContext) { // 为IPFS写入指令存储结果 func storeIPFSWriteResult(ctx *GenerateContext) { - dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool { - if typ.FileHashStoreKey == "" { + dag.WalkOnlyType[*ops2.IPFSWriteNode](ctx.DAG.Graph, func(n *ops2.IPFSWriteNode) bool { + if n.FileHashStoreKey == "" { return true } - n, t := dag.NewNode(ctx.DAG, &ops.StoreType{ - StoreKey: typ.FileHashStoreKey, - }, &ioswitchlrc.NodeProps{}) - n.Env.ToEnvDriver() + storeNode := ctx.DAG.NewStore() + storeNode.Env().ToEnvDriver() - t.Store(n, node.OutputValues[0]) + storeNode.Store(n.FileHashStoreKey, n.FileHashVar()) return true }) } // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func generateRange(ctx *GenerateContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - props := ioswitchlrc.NProps(node) - if props.To == nil { - return true - } + for i := 0; i < len(ctx.To); i++ { + to := ctx.To[i] + toNode := ctx.ToNodes[to] - toDataIdx := props.To.GetDataIndex() - toRng := props.To.GetRange() + toDataIdx := to.GetDataIndex() + toRng := to.GetRange() if toDataIdx == -1 { - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - ctx.StreamRange.Offset, - Length: toRng.Length, - }, - }, &ioswitchlrc.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - ctx.StreamRange.Offset, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } else { stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K) @@ -265,54 +246,48 @@ func generateRange(ctx *GenerateContext) { blkStart := blkStartIdx * int64(ctx.LRC.ChunkSize) - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - blkStart, - Length: toRng.Length, - }, - }, &ioswitchlrc.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - blkStart, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } - - return true - }) + } } // 生成Clone指令 func generateClone(ctx *GenerateContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) <= 1 { + ctx.DAG.Walk(func(node dag.Node) bool { + for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitchlrc.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - str := t.NewOutput(n) - str.Props = &ioswitchlrc.VarProps{StreamIndex: ioswitchlrc.SProps(out).StreamIndex} - str.To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneStream() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } - for _, out := range node.OutputValues { - if len(out.Toes) <= 1 { + for _, out := range node.OutputValues().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitchlrc.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - t.NewOutput(node).To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneValue() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } return true diff --git a/common/pkgs/ioswitchlrc/utils.go b/common/pkgs/ioswitchlrc/utils.go deleted file mode 100644 index aa162ee..0000000 --- a/common/pkgs/ioswitchlrc/utils.go +++ /dev/null @@ -1,17 +0,0 @@ -package ioswitchlrc - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" -) - -func NProps(n *dag.Node) *NodeProps { - return dag.NProps[*NodeProps](n) -} - -func SProps(str *dag.StreamVar) *VarProps { - return dag.SProps[*VarProps](str) -} - -func VProps(v *dag.ValueVar) *VarProps { - return dag.VProps[*VarProps](v) -}