diff --git a/common/models/models.go b/common/models/models.go index f3f56bb..2538a5d 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -3,6 +3,7 @@ package stgmod import ( "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sort2" ) type ObjectBlock struct { @@ -48,7 +49,7 @@ func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock { grps[block.Index] = grp } - return lo.Values(grps) + return sort2.Sort(lo.Values(grps), func(l, r GrouppedObjectBlock) int { return l.Index - r.Index }) } type LocalMachineInfo struct { diff --git a/common/pkgs/ioswitch2/agent_worker.go b/common/pkgs/ioswitch2/agent_worker.go index 61793ae..cdca6d9 100644 --- a/common/pkgs/ioswitch2/agent_worker.go +++ b/common/pkgs/ioswitch2/agent_worker.go @@ -5,11 +5,17 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/types" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" ) +var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( + (*AgentWorker)(nil), +))) + type AgentWorker struct { Node cdssdk.Node } diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index f9d7b0a..3533a55 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -89,7 +89,9 @@ type ChunkedSplitType struct { func (t *ChunkedSplitType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) for i := 0; i < t.OutputCount; i++ { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitch2.VarProps{ + StreamIndex: i, + }) } } diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index d53e29b..d577b53 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -104,7 +104,7 @@ func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) { } func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar { - return dag.NodeNewOutputValue(node, nil) + return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil) } func (t *CloneVarType) String(node *dag.Node) string { diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 76e6c22..192d79d 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -195,26 +195,19 @@ func (o *ECMultiply) Execute(ctx context.Context, e *exec.Executor) error { } type MultiplyType struct { - EC cdssdk.ECRedundancy + EC cdssdk.ECRedundancy + InputIndexes []int + OutputIndexes []int } func (t *MultiplyType) InitNode(node *dag.Node) {} func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) { - var inputIdxs []int - var outputIdxs []int - for _, in := range op.InputStreams { - inputIdxs = append(inputIdxs, ioswitch2.SProps(in).StreamIndex) - } - for _, out := range op.OutputStreams { - outputIdxs = append(outputIdxs, ioswitch2.SProps(out).StreamIndex) - } - rs, err := ec.NewRs(t.EC.K, t.EC.N) if err != nil { return nil, err } - coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs) + coef, err := rs.GenerateMatrix(t.InputIndexes, t.OutputIndexes) if err != nil { return nil, err } @@ -227,12 +220,14 @@ func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) { }, nil } -func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar) { +func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) { + t.InputIndexes = append(t.InputIndexes, dataIndex) node.InputStreams = append(node.InputStreams, str) str.To(node, len(node.InputStreams)-1) } func (t *MultiplyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { + t.OutputIndexes = append(t.OutputIndexes, dataIndex) return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex}) } diff --git a/common/pkgs/ioswitch2/ops2/ipfs.go b/common/pkgs/ioswitch2/ops2/ipfs.go index fa02b6d..303157a 100644 --- a/common/pkgs/ioswitch2/ops2/ipfs.go +++ b/common/pkgs/ioswitch2/ops2/ipfs.go @@ -114,7 +114,7 @@ type IPFSWriteType struct { func (t *IPFSWriteType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, &ioswitch2.VarProps{}) + dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitch2.VarProps{}) } func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index bd6aed1..3eff6b5 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -35,7 +35,7 @@ type ParseContext struct { } func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { - ctx := ParseContext{Ft: ft} + ctx := ParseContext{Ft: ft, DAG: dag.NewGraph()} // 分成两个阶段: // 1. 基于From和To生成更多指令,初步匹配to的需求 @@ -136,17 +136,15 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch2.FromTo) error { for _, fr := range ft.Froms { - _, err := p.buildFromNode(ctx, &ft, fr) + frNode, err := p.buildFromNode(ctx, &ft, fr) if err != nil { return err } // 对于完整文件的From,生成Split指令 if fr.GetDataIndex() == -1 { - n, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{}) - for i := 0; i < p.EC.K; i++ { - ioswitch2.SProps(n.OutputStreams[i]).StreamIndex = i - } + node, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{}) + frNode.OutputStreams[0].To(node, 0) } } @@ -170,7 +168,7 @@ loop: }, &ioswitch2.NodeProps{}) for _, s := range ecInputStrs { - mulType.AddInput(mulNode, s) + mulType.AddInput(mulNode, s, ioswitch2.SProps(s).StreamIndex) } for i := 0; i < p.EC.N; i++ { mulType.NewOutput(mulNode, i) @@ -247,6 +245,7 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f if f.Node != nil { n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) + n.Env.Pinned = true } return n, nil @@ -254,6 +253,7 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f case *ioswitch2.FromDriver: n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch2.NodeProps{From: f}) n.Env.ToEnvDriver() + n.Env.Pinned = true ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { @@ -280,12 +280,15 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t i }, &ioswitch2.NodeProps{ To: t, }) + n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node}) + n.Env.Pinned = true return n, nil case *ioswitch2.ToDriver: n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch2.NodeProps{To: t}) n.Env.ToEnvDriver() + n.Env.Pinned = true return n, nil @@ -324,9 +327,11 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { } node.OutputStreams[i2] = nil + typ.OutputIndexes[i2] = -2 changed = true } node.OutputStreams = lo2.RemoveAllDefault(node.OutputStreams) + typ.OutputIndexes = lo2.RemoveAll(typ.OutputIndexes, -2) // 如果所有输出流都被删除,则删除该指令 if len(node.OutputStreams) == 0 { @@ -422,6 +427,10 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { func (p *DefaultParser) pin(ctx *ParseContext) bool { changed := false ctx.DAG.Walk(func(node *dag.Node) bool { + if node.Env.Pinned { + return true + } + var toEnv *dag.NodeEnv for _, out := range node.OutputStreams { for _, to := range out.Toes { @@ -496,12 +505,11 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { return true } - n := ctx.DAG.NewNode(&ops.StoreType{ + n, t := dag.NewNode(ctx.DAG, &ops.StoreType{ StoreKey: typ.FileHashStoreKey, }, &ioswitch2.NodeProps{}) n.Env.ToEnvDriver() - - node.OutputValues[0].To(n, 0) + t.Store(n, node.OutputValues[0]) return true }) } @@ -564,7 +572,9 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch2.NodeProps{}) n.Env = node.Env for _, to := range out.Toes { - t.NewOutput(node).To(to.Node, to.SlotIndex) + str := t.NewOutput(n) + str.Props = &ioswitch2.VarProps{StreamIndex: ioswitch2.SProps(out).StreamIndex} + str.To(to.Node, to.SlotIndex) } out.Toes = nil out.To(n, 0) @@ -578,7 +588,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch2.NodeProps{}) n.Env = node.Env for _, to := range out.Toes { - t.NewOutput(node).To(to.Node, to.SlotIndex) + t.NewOutput(n).To(to.Node, to.SlotIndex) } out.Toes = nil out.To(n, 0) diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index d53e29b..d577b53 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -104,7 +104,7 @@ func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) { } func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar { - return dag.NodeNewOutputValue(node, nil) + return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil) } func (t *CloneVarType) String(node *dag.Node) string { diff --git a/common/pkgs/ioswitchlrc/ops2/ipfs.go b/common/pkgs/ioswitchlrc/ops2/ipfs.go index fa02b6d..303157a 100644 --- a/common/pkgs/ioswitchlrc/ops2/ipfs.go +++ b/common/pkgs/ioswitchlrc/ops2/ipfs.go @@ -114,7 +114,7 @@ type IPFSWriteType struct { func (t *IPFSWriteType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, &ioswitch2.VarProps{}) + dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitch2.VarProps{}) } func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index ed4b1c9..984cae2 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -89,6 +89,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) if f.Node != nil { n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node}) + n.Env.Pinned = true } return n, nil @@ -96,6 +97,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) case *ioswitchlrc.FromDriver: n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitchlrc.NodeProps{From: f}) n.Env.ToEnvDriver() + n.Env.Pinned = true ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { @@ -122,12 +124,15 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) { }, &ioswitchlrc.NodeProps{ To: t, }) + n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node}) + n.Env.Pinned = true return n, nil case *ioswitchlrc.ToDriver: n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitchlrc.NodeProps{To: t}) n.Env.ToEnvDriver() + n.Env.Pinned = true return n, nil @@ -142,6 +147,10 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) { func pin(ctx *GenerateContext) bool { changed := false ctx.DAG.Walk(func(node *dag.Node) bool { + if node.Env.Pinned { + return true + } + var toEnv *dag.NodeEnv for _, out := range node.OutputStreams { for _, to := range out.Toes {