diff --git a/magefiles/common.go b/magefiles/common.go index 577cf7c..9f207f7 100644 --- a/magefiles/common.go +++ b/magefiles/common.go @@ -15,6 +15,7 @@ var Global = struct { OS string Arch string BuildRoot string + Debug bool }{ Arch: "amd64", } @@ -50,7 +51,14 @@ func Build(args BuildArgs) error { binPath := filepath.Join(fullOutputDir, args.OutputName+goBuildArgs.OutputExt) fmt.Printf("building to %s\n", binPath) - goCmdArgs := []string{"build", "-o", binPath} + goCmdArgs := []string{"build"} + + if Global.Debug { + goCmdArgs = append(goCmdArgs, "-gcflags", "all=-N -l") + } + + goCmdArgs = append(goCmdArgs, "-o", binPath) + if args.EntryFile != "" { goCmdArgs = append(goCmdArgs, args.EntryFile) } diff --git a/magefiles/targets/targets.go b/magefiles/targets/targets.go index 09c8212..148cd3f 100644 --- a/magefiles/targets/targets.go +++ b/magefiles/targets/targets.go @@ -28,3 +28,8 @@ func ARM64() { func BuildRoot(dir string) { magefiles.Global.BuildRoot = dir } + +// [配置项]关闭编译优化,用于调试 +func Debug() { + magefiles.Global.Debug = true +} diff --git a/pkgs/ioswitch/dag/graph.go b/pkgs/ioswitch/dag/graph.go index d73a8e4..1a4e0f1 100644 --- a/pkgs/ioswitch/dag/graph.go +++ b/pkgs/ioswitch/dag/graph.go @@ -47,8 +47,12 @@ func (g *Graph) Walk(cb func(node Node) bool) { g.Nodes = lo2.RemoveAllDefault(g.Nodes) } -func (g *Graph) NewVar() *Var { - return &Var{} +func (g *Graph) NewStreamVar() *StreamVar { + return &StreamVar{} +} + +func (g *Graph) NewValueVar() *ValueVar { + return &ValueVar{} } func AddNode[N Node](graph *Graph, typ N) N { diff --git a/pkgs/ioswitch/dag/node.go b/pkgs/ioswitch/dag/node.go index 198cc6f..45f7f25 100644 --- a/pkgs/ioswitch/dag/node.go +++ b/pkgs/ioswitch/dag/node.go @@ -1,6 +1,7 @@ package dag import ( + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/lo2" ) @@ -50,105 +51,373 @@ type Node interface { Graph() *Graph SetGraph(graph *Graph) Env() *NodeEnv - InputStreams() *InputSlots - OutputStreams() *OutputSlots - InputValues() *InputSlots - OutputValues() *OutputSlots + InputStreams() *StreamInputSlots + OutputStreams() *StreamOutputSlots + InputValues() *ValueInputSlots + OutputValues() *ValueOutputSlots GenerateOp() (exec.Op, error) // String() string } -type VarSlots []*Var +type VarSlots[T any] []*T -func (s *VarSlots) Len() int { +func (s *VarSlots[T]) Len() int { return len(*s) } -func (s *VarSlots) Get(idx int) *Var { +func (s *VarSlots[T]) Get(idx int) *T { return (*s)[idx] } -func (s *VarSlots) Set(idx int, val *Var) *Var { +func (s *VarSlots[T]) Set(idx int, val *T) *T { old := (*s)[idx] (*s)[idx] = val return old } -func (s *VarSlots) Append(val *Var) int { +func (s *VarSlots[T]) IndexOf(v *T) int { + return lo.IndexOf(*s, v) +} + +func (s *VarSlots[T]) Append(val *T) int { *s = append(*s, val) return s.Len() - 1 } -func (s *VarSlots) RemoveAt(idx int) { +func (s *VarSlots[T]) Clear(val *T) { + for i := 0; i < s.Len(); i++ { + if (*s)[i] == val { + (*s)[i] = nil + } + } +} + +func (s *VarSlots[T]) RemoveAt(idx int) { (*s) = lo2.RemoveAt(*s, idx) } -func (s *VarSlots) Resize(size int) { +func (s *VarSlots[T]) RemoveRange(start int, cnt int) { + *s = lo2.RemoveRange(*s, start, cnt) +} + +func (s *VarSlots[T]) Resize(size int) { if s.Len() < size { - *s = append(*s, make([]*Var, size-s.Len())...) + *s = append(*s, make([]*T, size-s.Len())...) } else if s.Len() > size { *s = (*s)[:size] } } -func (s *VarSlots) SetRawArray(arr []*Var) { +func (s *VarSlots[T]) SetRawArray(arr []*T) { *s = arr } -func (s *VarSlots) RawArray() []*Var { +func (s *VarSlots[T]) RawArray() []*T { return *s } -type InputSlots struct { - VarSlots +type StreamInputSlots struct { + Slots VarSlots[StreamVar] +} + +func (s *StreamInputSlots) Len() int { + return s.Slots.Len() +} + +func (s *StreamInputSlots) Get(idx int) *StreamVar { + return s.Slots.Get(idx) +} + +func (s *StreamInputSlots) IndexOf(v *StreamVar) int { + return s.Slots.IndexOf(v) +} + +// 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil) +func (s *StreamInputSlots) Init(cnt int) { + s.Slots.Resize(cnt) +} + +func (s *StreamInputSlots) EnlargeOne() int { + s.Slots.Append(nil) + return s.Len() - 1 +} + +func (s *StreamInputSlots) ClearInputAt(my Node, idx int) { + v := s.Get(idx) + if v == nil { + return + } + s.Slots.Set(idx, nil) + + v.Dst.Remove(my) +} + +func (s *StreamInputSlots) ClearAllInput(my Node) { + for i := 0; i < s.Len(); i++ { + v := s.Get(i) + if v == nil { + continue + } + s.Slots.Set(i, nil) + + v.Dst.Remove(my) + } +} + +func (s *StreamInputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +func (s *StreamInputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +type ValueInputSlots struct { + Slots VarSlots[ValueVar] +} + +func (s *ValueInputSlots) Len() int { + return s.Slots.Len() } -func (s *InputSlots) EnsureSize(cnt int) { +func (s *ValueInputSlots) Get(idx int) *ValueVar { + return s.Slots.Get(idx) +} + +func (s *ValueInputSlots) IndexOf(v *ValueVar) int { + return s.Slots.IndexOf(v) +} + +// 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil) +func (s *ValueInputSlots) Init(cnt int) { if s.Len() < cnt { - s.VarSlots = append(s.VarSlots, make([]*Var, cnt-s.Len())...) + s.Slots = append(s.Slots, make([]*ValueVar, cnt-s.Len())...) } } -func (s *InputSlots) EnlargeOne() int { - s.Append(nil) +func (s *ValueInputSlots) EnlargeOne() int { + s.Slots.Append(nil) return s.Len() - 1 } -type OutputSlots struct { - VarSlots +func (s *ValueInputSlots) ClearInputAt(my Node, idx int) { + v := s.Get(idx) + if v == nil { + return + } + s.Slots.Set(idx, nil) + + v.Dst.Remove(my) } -func (s *OutputSlots) Setup(my Node, v *Var, slotIdx int) { - if s.Len() <= slotIdx { - s.VarSlots = append(s.VarSlots, make([]*Var, slotIdx-s.Len()+1)...) +func (s *ValueInputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) } - s.Set(slotIdx, v) - *v.From() = EndPoint{ - Node: my, - SlotIndex: slotIdx, + return ids +} + +func (s *ValueInputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) } + + return ids } -func (s *OutputSlots) SetupNew(my Node, v *Var) { - s.Append(v) - *v.From() = EndPoint{ - Node: my, - SlotIndex: s.Len() - 1, +type StreamOutputSlots struct { + Slots VarSlots[StreamVar] +} + +func (s *StreamOutputSlots) Len() int { + return s.Slots.Len() +} + +func (s *StreamOutputSlots) Get(idx int) *StreamVar { + return s.Slots.Get(idx) +} + +func (s *StreamOutputSlots) IndexOf(v *StreamVar) int { + return s.Slots.IndexOf(v) +} + +// 设置Slots大小,并为每个Slot创建一个StreamVar。 +// 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。 +func (s *StreamOutputSlots) Init(my Node, size int) { + s.Slots.Resize(size) + for i := 0; i < size; i++ { + v := my.Graph().NewStreamVar() + v.Src = my + s.Slots.Set(i, v) } } -type Slot struct { - Var *Var +// 在Slots末尾增加一个StreamVar,并返回它的索引 +func (s *StreamOutputSlots) AppendNew(my Node) StreamSlot { + v := my.Graph().NewStreamVar() + v.Src = my + s.Slots.Append(v) + return StreamSlot{Var: v, Index: s.Len() - 1} +} + +// 断开指定位置的输出流到指定节点的连接 +func (s *StreamOutputSlots) ClearOutputAt(idx int, dst Node) { + v := s.Get(idx) + v.Dst.Remove(dst) + dst.InputStreams().Slots.Clear(v) +} + +// 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量 +func (s *StreamOutputSlots) ClearAllOutput(my Node) { + for i := 0; i < s.Len(); i++ { + v := s.Get(i) + v.ClearAllDst() + } +} + +func (s *StreamOutputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +func (s *StreamOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +type ValueOutputSlots struct { + Slots VarSlots[ValueVar] +} + +func (s *ValueOutputSlots) Len() int { + return s.Slots.Len() +} + +func (s *ValueOutputSlots) Get(idx int) *ValueVar { + return s.Slots.Get(idx) +} + +func (s *ValueOutputSlots) IndexOf(v *ValueVar) int { + return s.Slots.IndexOf(v) +} + +// 设置Slots大小,并为每个Slot创建一个StreamVar +// 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。 +func (s *ValueOutputSlots) Init(my Node, size int) { + s.Slots.Resize(size) + for i := 0; i < size; i++ { + v := my.Graph().NewValueVar() + v.Src = my + s.Slots.Set(i, v) + } +} + +// 在Slots末尾增加一个StreamVar,并返回它的索引 +func (s *ValueOutputSlots) AppendNew(my Node) ValueSlot { + v := my.Graph().NewValueVar() + v.Src = my + s.Slots.Append(v) + return ValueSlot{Var: v, Index: s.Len() - 1} +} + +// 断开指定位置的输出流到指定节点的连接 +func (s *ValueOutputSlots) ClearOutputAt(idx int, dst Node) { + v := s.Get(idx) + v.Dst.Remove(dst) + dst.InputValues().Slots.Clear(v) +} + +// 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量 +func (s *ValueOutputSlots) ClearAllOutput(my Node) { + for i := 0; i < s.Len(); i++ { + v := s.Get(i) + v.ClearAllDst() + } +} + +func (s *ValueOutputSlots) GetVarIDs() []exec.VarID { + var ids []exec.VarID + for _, v := range s.Slots.RawArray() { + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +func (s *ValueOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID { + var ids []exec.VarID + for i := start; i < end; i++ { + v := s.Get(i) + if v == nil { + continue + } + ids = append(ids, v.VarID) + } + + return ids +} + +type StreamSlot struct { + Var *StreamVar + Index int +} + +type ValueSlot struct { + Var *ValueVar Index int } type NodeBase struct { env NodeEnv - inputStreams InputSlots - outputStreams OutputSlots - inputValues InputSlots - outputValues OutputSlots + inputStreams StreamInputSlots + outputStreams StreamOutputSlots + inputValues ValueInputSlots + outputValues ValueOutputSlots graph *Graph } @@ -164,18 +433,18 @@ func (n *NodeBase) Env() *NodeEnv { return &n.env } -func (n *NodeBase) InputStreams() *InputSlots { +func (n *NodeBase) InputStreams() *StreamInputSlots { return &n.inputStreams } -func (n *NodeBase) OutputStreams() *OutputSlots { +func (n *NodeBase) OutputStreams() *StreamOutputSlots { return &n.outputStreams } -func (n *NodeBase) InputValues() *InputSlots { +func (n *NodeBase) InputValues() *ValueInputSlots { return &n.inputValues } -func (n *NodeBase) OutputValues() *OutputSlots { +func (n *NodeBase) OutputValues() *ValueOutputSlots { return &n.outputValues } diff --git a/pkgs/ioswitch/dag/var.go b/pkgs/ioswitch/dag/var.go index b8928cd..75e99f2 100644 --- a/pkgs/ioswitch/dag/var.go +++ b/pkgs/ioswitch/dag/var.go @@ -5,95 +5,108 @@ import ( "gitlink.org.cn/cloudream/common/utils/lo2" ) -type EndPoint struct { - Node Node - SlotIndex int // 所连接的Node的Output或Input数组的索引 +type Var2 interface { + GetVarID() exec.VarID } -type EndPointSlots []EndPoint - -func (s *EndPointSlots) Len() int { - return len(*s) +type StreamVar struct { + VarID exec.VarID + Src Node + Dst DstList } -func (s *EndPointSlots) Get(idx int) *EndPoint { - return &(*s)[idx] +func (v *StreamVar) GetVarID() exec.VarID { + return v.VarID } -func (s *EndPointSlots) Add(ed EndPoint) int { - (*s) = append((*s), ed) - return len(*s) - 1 +func (v *StreamVar) IndexAtSrc() int { + return v.Src.OutputStreams().IndexOf(v) } -func (s *EndPointSlots) Remove(ed EndPoint) { - for i, e := range *s { - if e == ed { - (*s) = lo2.RemoveAt((*s), i) - return - } - } +func (v *StreamVar) To(to Node, slotIdx int) { + v.Dst.Add(to) + to.InputStreams().Slots.Set(slotIdx, v) } -func (s *EndPointSlots) RemoveAt(idx int) { - lo2.RemoveAt((*s), idx) +func (v *StreamVar) NotTo(node Node) { + v.Dst.Remove(node) + node.InputStreams().Slots.Clear(v) } -func (s *EndPointSlots) Resize(size int) { - if s.Len() < size { - (*s) = append((*s), make([]EndPoint, size-s.Len())...) - } else if s.Len() > size { - (*s) = (*s)[:size] +func (v *StreamVar) ClearAllDst() { + for _, n := range v.Dst { + n.InputStreams().Slots.Clear(v) } + v.Dst = nil } -func (s *EndPointSlots) RawArray() []EndPoint { - return *s +type ValueVar struct { + VarID exec.VarID + Src Node + Dst DstList } -type Var struct { - VarID exec.VarID - from EndPoint - to EndPointSlots +func (v *ValueVar) GetVarID() exec.VarID { + return v.VarID } -func (v *Var) From() *EndPoint { - return &v.from +func (v *ValueVar) IndexAtSrc() int { + return v.Src.InputValues().IndexOf(v) } -func (v *Var) To() *EndPointSlots { - return &v.to +func (v *ValueVar) To(to Node, slotIdx int) { + v.Dst.Add(to) + to.InputValues().Slots.Set(slotIdx, v) } -func (v *Var) ValueTo(to Node, slotIdx int) { - v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) - to.InputValues().Set(slotIdx, v) +func (v *ValueVar) NotTo(node Node) { + v.Dst.Remove(node) + node.InputValues().Slots.Clear(v) } -func (v *Var) ValueNotTo(node Node, slotIdx int) { - v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) - node.InputValues().Set(slotIdx, nil) +func (v *ValueVar) ClearAllDst() { + for _, n := range v.Dst { + n.InputValues().Slots.Clear(v) + } + v.Dst = nil } -func (v *Var) StreamTo(to Node, slotIdx int) { - v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx}) - to.InputStreams().Set(slotIdx, v) +type DstList []Node + +func (s *DstList) Len() int { + return len(*s) } -func (v *Var) StreamNotTo(node Node, slotIdx int) { - v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx}) - node.InputStreams().Set(slotIdx, nil) +func (s *DstList) Get(idx int) Node { + return (*s)[idx] } -func (v *Var) NoInputAllValue() { - for _, ed := range v.to { - ed.Node.InputValues().Set(ed.SlotIndex, nil) +func (s *DstList) Add(n Node) int { + (*s) = append((*s), n) + return len(*s) - 1 +} + +func (s *DstList) Remove(n Node) { + for i, e := range *s { + if e == n { + (*s) = lo2.RemoveAt((*s), i) + return + } } - v.to = nil } -func (v *Var) NoInputAllStream() { - for _, ed := range v.to { - ed.Node.InputStreams().Set(ed.SlotIndex, nil) +func (s *DstList) RemoveAt(idx int) { + lo2.RemoveAt((*s), idx) +} + +func (s *DstList) Resize(size int) { + if s.Len() < size { + (*s) = append((*s), make([]Node, size-s.Len())...) + } else if s.Len() > size { + (*s) = (*s)[:size] } - v.to = nil +} + +func (s *DstList) RawArray() []Node { + return *s } diff --git a/pkgs/ioswitch/exec/utils.go b/pkgs/ioswitch/exec/utils.go index f876f2c..09c48d3 100644 --- a/pkgs/ioswitch/exec/utils.go +++ b/pkgs/ioswitch/exec/utils.go @@ -14,6 +14,10 @@ type Range struct { Length *int64 } +func NewRange(offset int64, length int64) Range { + return Range{Offset: offset, Length: &length} +} + func (r *Range) Extend(other Range) { newOffset := math2.Min(r.Offset, other.Offset) diff --git a/pkgs/ioswitch/plan/generate.go b/pkgs/ioswitch/plan/generate.go index 6d155d9..addeaed 100644 --- a/pkgs/ioswitch/plan/generate.go +++ b/pkgs/ioswitch/plan/generate.go @@ -30,57 +30,61 @@ func generateSend(graph *ops.GraphNodeBuilder) { for i := 0; i < node.OutputStreams().Len(); i++ { out := node.OutputStreams().Get(i) - to := out.To().Get(0) - if to.Node.Env().Equals(node.Env()) { + to := out.Dst.Get(0) + if to.Env().Equals(node.Env()) { continue } - switch to.Node.Env().Type { + switch to.Env().Type { case dag.EnvDriver: // // 如果是要送到Driver,则只能由Driver主动去拉取 + dstNode := out.Dst.Get(0) + getNode := graph.NewGetStream(node.Env().Worker) getNode.Env().ToEnvDriver() // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 - holdType := graph.NewHoldUntil() //dag.NewNode(graph, &ops.HoldUntilNode{}, nil) - *holdType.Env() = *node.Env() + holdNode := graph.NewHoldUntil() + *holdNode.Env() = *node.Env() // 将Get指令的信号送到Hold指令 - holdType.SetSignal(getNode.SignalVar()) + holdNode.SetSignal(getNode.SignalVar()) - out.To().RemoveAt(0) + out.Dst.RemoveAt(0) // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 - getNode.Get(holdType.HoldStream(out)). + getNode.Get(holdNode.HoldStream(out)). // 将Get指令的输出送到目的地 - StreamTo(to.Node, to.SlotIndex) + To(to, dstNode.InputStreams().IndexOf(out)) case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - n := graph.NewSendStream(to.Node.Env().Worker) + dstNode := out.Dst.Get(0) + n := graph.NewSendStream(to.Env().Worker) *n.Env() = *node.Env() - out.To().RemoveAt(0) - n.Send(out).StreamTo(to.Node, to.SlotIndex) + out.Dst.RemoveAt(0) + n.Send(out).To(to, dstNode.InputStreams().IndexOf(out)) } } for i := 0; i < node.OutputValues().Len(); i++ { out := node.OutputValues().Get(i) // 允许Value变量不被使用 - if out.To().Len() == 0 { + if out.Dst.Len() == 0 { continue } - to := out.To().Get(0) - if to.Node.Env().Equals(node.Env()) { + to := out.Dst.Get(0) + if to.Env().Equals(node.Env()) { continue } - switch to.Node.Env().Type { + switch to.Env().Type { case dag.EnvDriver: // // 如果是要送到Driver,则只能由Driver主动去拉取 + dstNode := out.Dst.Get(0) getNode := graph.NewGetValue(node.Env().Worker) getNode.Env().ToEnvDriver() @@ -91,21 +95,22 @@ func generateSend(graph *ops.GraphNodeBuilder) { // 将Get指令的信号送到Hold指令 holdNode.SetSignal(getNode.SignalVar()) - out.To().RemoveAt(0) + out.Dst.RemoveAt(0) // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 getNode.Get(holdNode.HoldVar(out)). // 将Get指令的输出送到目的地 - ValueTo(to.Node, to.SlotIndex) + To(to, dstNode.InputValues().IndexOf(out)) case dag.EnvWorker: // 如果是要送到Agent,则可以直接发送 - t := graph.NewSendValue(to.Node.Env().Worker) + dstNode := out.Dst.Get(0) + t := graph.NewSendValue(to.Env().Worker) *t.Env() = *node.Env() - out.To().RemoveAt(0) + out.Dst.RemoveAt(0) - t.Send(out).ValueTo(to.Node, to.SlotIndex) + t.Send(out).To(to, dstNode.InputValues().IndexOf(out)) } } @@ -119,6 +124,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { graph.Walk(func(node dag.Node) bool { for i := 0; i < node.OutputStreams().Len(); i++ { out := node.OutputStreams().Get(i) + if out == nil { + continue + } if out.VarID > 0 { continue @@ -129,6 +137,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { for i := 0; i < node.InputStreams().Len(); i++ { in := node.InputStreams().Get(i) + if in == nil { + continue + } if in.VarID > 0 { continue @@ -139,6 +150,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { for i := 0; i < node.OutputValues().Len(); i++ { out := node.OutputValues().Get(i) + if out == nil { + continue + } if out.VarID > 0 { continue @@ -149,6 +163,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error { for i := 0; i < node.InputValues().Len(); i++ { in := node.InputValues().Get(i) + if in == nil { + continue + } if in.VarID > 0 { continue diff --git a/pkgs/ioswitch/plan/ops/driver.go b/pkgs/ioswitch/plan/ops/driver.go index 392b88a..a15fbee 100644 --- a/pkgs/ioswitch/plan/ops/driver.go +++ b/pkgs/ioswitch/plan/ops/driver.go @@ -16,13 +16,13 @@ func (b *GraphNodeBuilder) NewFromDriver(handle *exec.DriverWriteStream) *FromDr } b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewVar()) + node.OutputStreams().Init(node, 1) return node } -func (t *FromDriverNode) Output() dag.Slot { - return dag.Slot{ +func (t *FromDriverNode) Output() dag.StreamSlot { + return dag.StreamSlot{ Var: t.OutputStreams().Get(0), Index: 0, } @@ -49,16 +49,16 @@ func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverN } b.AddNode(node) + node.InputStreams().Init(1) return node } -func (t *ToDriverNode) SetInput(v *dag.Var) { - t.InputStreams().EnsureSize(1) - v.StreamTo(t, 0) +func (t *ToDriverNode) SetInput(v *dag.StreamVar) { + v.To(t, 0) } -func (t *ToDriverNode) Input() dag.Slot { - return dag.Slot{ +func (t *ToDriverNode) Input() dag.StreamSlot { + return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } diff --git a/pkgs/ioswitch/plan/ops/drop.go b/pkgs/ioswitch/plan/ops/drop.go index f03283a..bb946f7 100644 --- a/pkgs/ioswitch/plan/ops/drop.go +++ b/pkgs/ioswitch/plan/ops/drop.go @@ -46,12 +46,13 @@ type DropNode struct { func (b *GraphNodeBuilder) NewDropStream() *DropNode { node := &DropNode{} b.AddNode(node) + + node.InputStreams().Init(1) return node } -func (t *DropNode) SetInput(v *dag.Var) { - t.InputStreams().EnsureSize(1) - v.StreamTo(t, 0) +func (t *DropNode) SetInput(v *dag.StreamVar) { + v.To(t, 0) } func (t *DropNode) GenerateOp() (exec.Op, error) { diff --git a/pkgs/ioswitch/plan/ops/send.go b/pkgs/ioswitch/plan/ops/send.go index 625f701..81bd6da 100644 --- a/pkgs/ioswitch/plan/ops/send.go +++ b/pkgs/ioswitch/plan/ops/send.go @@ -150,15 +150,15 @@ func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode { ToWorker: to, } b.AddNode(node) + + node.InputStreams().Init(1) + node.OutputStreams().Init(node, 1) return node } -func (t *SendStreamNode) Send(v *dag.Var) *dag.Var { - t.InputStreams().EnsureSize(1) - v.StreamTo(t, 0) - output := t.Graph().NewVar() - t.OutputStreams().Setup(t, output, 0) - return output +func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar { + v.To(t, 0) + return t.OutputStreams().Get(0) } func (t *SendStreamNode) GenerateOp() (exec.Op, error) { @@ -183,15 +183,15 @@ func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode { ToWorker: to, } b.AddNode(node) + + node.InputValues().Init(1) + node.OutputValues().Init(node, 1) return node } -func (t *SendValueNode) Send(v *dag.Var) *dag.Var { - t.InputValues().EnsureSize(1) - v.ValueTo(t, 0) - output := t.Graph().NewVar() - t.OutputValues().Setup(t, output, 0) - return output +func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar { + v.To(t, 0) + return t.OutputValues().Get(0) } func (t *SendValueNode) GenerateOp() (exec.Op, error) { @@ -216,19 +216,19 @@ func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode { FromWorker: from, } b.AddNode(node) - node.OutputValues().Setup(node, node.Graph().NewVar(), 0) + + node.InputStreams().Init(1) + node.OutputValues().Init(node, 1) + node.OutputStreams().Init(node, 1) return node } -func (t *GetStreamNode) Get(v *dag.Var) *dag.Var { - t.InputStreams().EnsureSize(1) - v.StreamTo(t, 0) - output := t.Graph().NewVar() - t.OutputStreams().Setup(t, output, 0) - return output +func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar { + v.To(t, 0) + return t.OutputStreams().Get(0) } -func (t *GetStreamNode) SignalVar() *dag.Var { +func (t *GetStreamNode) SignalVar() *dag.ValueVar { return t.OutputValues().Get(0) } @@ -255,19 +255,18 @@ func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode { FromWorker: from, } b.AddNode(node) - node.OutputValues().Setup(node, node.Graph().NewVar(), 0) + + node.InputValues().Init(1) + node.OutputValues().Init(node, 2) return node } -func (t *GetValueNode) Get(v *dag.Var) *dag.Var { - t.InputValues().EnsureSize(1) - v.ValueTo(t, 0) - output := t.Graph().NewVar() - t.OutputValues().Setup(t, output, 1) - return output +func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar { + v.To(t, 0) + return t.OutputValues().Get(1) } -func (t *GetValueNode) SignalVar() *dag.Var { +func (t *GetValueNode) SignalVar() *dag.ValueVar { return t.OutputValues().Get(0) } diff --git a/pkgs/ioswitch/plan/ops/store.go b/pkgs/ioswitch/plan/ops/store.go index f2a7f74..977af42 100644 --- a/pkgs/ioswitch/plan/ops/store.go +++ b/pkgs/ioswitch/plan/ops/store.go @@ -37,10 +37,10 @@ func (b *GraphNodeBuilder) NewStore() *StoreNode { return node } -func (t *StoreNode) Store(key string, v *dag.Var) { +func (t *StoreNode) Store(key string, v *dag.ValueVar) { t.Key = key - t.InputValues().EnsureSize(1) - v.ValueTo(t, 0) + t.InputValues().Init(1) + v.To(t, 0) } func (t *StoreNode) GenerateOp() (exec.Op, error) { diff --git a/pkgs/ioswitch/plan/ops/sync.go b/pkgs/ioswitch/plan/ops/sync.go index b4437f1..a356bb1 100644 --- a/pkgs/ioswitch/plan/ops/sync.go +++ b/pkgs/ioswitch/plan/ops/sync.go @@ -165,26 +165,22 @@ type HoldUntilNode struct { func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode { node := &HoldUntilNode{} b.AddNode(node) + node.InputValues().Init(1) return node } -func (t *HoldUntilNode) SetSignal(s *dag.Var) { - t.InputValues().EnsureSize(1) - s.ValueTo(t, 0) +func (t *HoldUntilNode) SetSignal(s *dag.ValueVar) { + s.To(t, 0) } -func (t *HoldUntilNode) HoldStream(str *dag.Var) *dag.Var { - str.StreamTo(t, t.InputStreams().EnlargeOne()) - output := t.Graph().NewVar() - t.OutputStreams().SetupNew(t, output) - return output +func (t *HoldUntilNode) HoldStream(str *dag.StreamVar) *dag.StreamVar { + str.To(t, t.InputStreams().EnlargeOne()) + return t.OutputStreams().AppendNew(t).Var } -func (t *HoldUntilNode) HoldVar(v *dag.Var) *dag.Var { - v.ValueTo(t, t.InputValues().EnlargeOne()) - output := t.Graph().NewVar() - t.OutputValues().SetupNew(t, output) - return output +func (t *HoldUntilNode) HoldVar(v *dag.ValueVar) *dag.ValueVar { + v.To(t, t.InputValues().EnlargeOne()) + return t.OutputValues().AppendNew(t).Var } func (t *HoldUntilNode) GenerateOp() (exec.Op, error) { diff --git a/sdks/storage/cdsapi/object.go b/sdks/storage/cdsapi/object.go index 5283be0..f0743f6 100644 --- a/sdks/storage/cdsapi/object.go +++ b/sdks/storage/cdsapi/object.go @@ -33,9 +33,10 @@ type ObjectUpload struct { } type ObjectUploadInfo struct { - UserID cdssdk.UserID `json:"userID" binding:"required"` - PackageID cdssdk.PackageID `json:"packageID" binding:"required"` - StorageAffinity cdssdk.StorageID `json:"storageAffinity"` + UserID cdssdk.UserID `json:"userID" binding:"required"` + PackageID cdssdk.PackageID `json:"packageID" binding:"required"` + Affinity cdssdk.StorageID `json:"affinity"` + LoadTo []cdssdk.StorageID `json:"loadTo"` } type UploadingObject struct { @@ -46,11 +47,7 @@ type UploadingObject struct { type UploadObjectIterator = iterator.Iterator[*UploadingObject] type ObjectUploadResp struct { - Uploadeds []UploadedObject `json:"uploadeds"` -} -type UploadedObject struct { - Object *cdssdk.Object `json:"object"` - Error string `json:"error"` + Uploadeds []cdssdk.Object `json:"uploadeds"` } func (c *ObjectService) Upload(req ObjectUpload) (*ObjectUploadResp, error) { diff --git a/sdks/storage/cdsapi/package.go b/sdks/storage/cdsapi/package.go index 9dfb6c7..6be57aa 100644 --- a/sdks/storage/cdsapi/package.go +++ b/sdks/storage/cdsapi/package.go @@ -6,6 +6,7 @@ import ( "strings" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/iterator" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" @@ -127,6 +128,61 @@ func (s *PackageService) Create(req PackageCreate) (*PackageCreateResp, error) { return nil, codeResp.ToError() } +const PackageCreateLoadPath = "/package/createLoad" + +type PackageCreateLoad struct { + PackageCreateLoadInfo + Files UploadObjectIterator `json:"-"` +} +type PackageCreateLoadInfo struct { + UserID cdssdk.UserID `json:"userID" binding:"required"` + BucketID cdssdk.BucketID `json:"bucketID" binding:"required"` + Name string `json:"name" binding:"required"` + LoadTo []cdssdk.StorageID `json:"loadTo" binding:"required"` +} +type PackageCreateLoadResp struct { + Package cdssdk.Package `json:"package"` + Objects []cdssdk.Object `json:"objects"` + LoadedDirs []string `json:"loadedDirs"` +} + +func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadResp, error) { + url, err := url.JoinPath(c.baseURL, PackageCreateLoadPath) + if err != nil { + return nil, err + } + + infoJSON, err := serder.ObjectToJSON(req) + if err != nil { + return nil, fmt.Errorf("upload info to json: %w", err) + } + + resp, err := http2.PostMultiPart(url, http2.MultiPartRequestParam{ + Form: map[string]string{"info": string(infoJSON)}, + Files: iterator.Map(req.Files, func(src *UploadingObject) (*http2.IterMultiPartFile, error) { + return &http2.IterMultiPartFile{ + FieldName: "files", + FileName: src.Path, + File: src.File, + }, nil + }), + }) + if err != nil { + return nil, err + } + + codeResp, err := ParseJSONResponse[response[PackageCreateLoadResp]](resp) + if err != nil { + return nil, err + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() +} + const PackageDeletePath = "/package/delete" type PackageDelete struct { diff --git a/sdks/storage/cdsapi/storage_test.go b/sdks/storage/cdsapi/storage_test.go index 6a76ad4..cec219e 100644 --- a/sdks/storage/cdsapi/storage_test.go +++ b/sdks/storage/cdsapi/storage_test.go @@ -89,9 +89,9 @@ func Test_Object(t *testing.T) { _, err = cli.Object().Upload(ObjectUpload{ ObjectUploadInfo: ObjectUploadInfo{ - UserID: 1, - PackageID: createResp.Package.PackageID, - StorageAffinity: stgAff, + UserID: 1, + PackageID: createResp.Package.PackageID, + Affinity: stgAff, }, Files: iterator.Array( &UploadingObject{ diff --git a/sdks/storage/models.go b/sdks/storage/models.go index cfc28bd..2b48c9c 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -33,7 +34,6 @@ type FileHash string /// TODO 将分散在各处的公共结构体定义集中到这里来 type Redundancy interface { - driver.Valuer } var RedundancyUnion = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Redundancy]( @@ -41,6 +41,7 @@ var RedundancyUnion = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTyp (*RepRedundancy)(nil), (*ECRedundancy)(nil), (*LRCRedundancy)(nil), + (*SegmentRedundancy)(nil), )), "type") type NoneRedundancy struct { @@ -162,6 +163,83 @@ func (b *LRCRedundancy) GetGroupElements(grp int) []int { return idxes } +type SegmentRedundancy struct { + serder.Metadata `union:"segment"` + Type string `json:"type"` + Segments []int64 `json:"segments"` // 每一段的大小 +} + +func NewSegmentRedundancy(totalSize int64, segmentCount int) *SegmentRedundancy { + var segs []int64 + segLen := int64(0) + // 计算每一段的大小。大小不一定都相同,但总和应该等于总大小。 + for i := 0; i < segmentCount; i++ { + curLen := totalSize*int64(i+1)/int64(segmentCount) - segLen + segs = append(segs, curLen) + segLen += curLen + } + + return &SegmentRedundancy{ + Type: "segment", + Segments: segs, + } +} + +func (r *SegmentRedundancy) SegmentCount() int { + return len(r.Segments) +} + +func (r *SegmentRedundancy) CalcSegmentStart(index int) int64 { + return lo.Sum(r.Segments[:index]) +} + +// 计算指定位置取整到最近的段的起始位置。 +func (r *SegmentRedundancy) FloorSegmentPosition(pos int64) int64 { + fpos := int64(0) + for _, segLen := range r.Segments { + segEnd := fpos + segLen + if pos < segEnd { + break + } + fpos += segLen + } + + return fpos +} + +// 计算指定范围内的段索引范围,参数和返回值所代表的范围都是左闭右开的。 +// 如果end == -1,则代表计算从start到最后一个字节的范围。 +func (b *SegmentRedundancy) CalcSegmentRange(start int64, end *int64) (segIdxStart int, segIdxEnd int) { + segIdxStart = len(b.Segments) + segIdxEnd = len(b.Segments) + + // 找到第一个包含start的段索引 + segStart := int64(0) + for i, segLen := range b.Segments { + segEnd := segStart + segLen + if start < segEnd { + segIdxStart = i + break + } + segStart += segLen + } + + if end != nil { + // 找到第一个包含end的段索引 + segStart = int64(0) + for i, segLen := range b.Segments { + segEnd := segStart + segLen + if *end <= segEnd { + segIdxEnd = i + 1 + break + } + segStart += segLen + } + } + + return +} + const ( PackageStateNormal = "Normal" PackageStateDeleted = "Deleted" diff --git a/sdks/storage/shared_storage.go b/sdks/storage/shared_storage.go index 4f4fa0c..7364143 100644 --- a/sdks/storage/shared_storage.go +++ b/sdks/storage/shared_storage.go @@ -18,7 +18,8 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SharedS )), "type") type LocalSharedStorage struct { - Type string `json:"type"` + serder.Metadata `union:"Local"` + Type string `json:"type"` // 调度Package时的Package的根路径 LoadBase string `json:"loadBase"` } diff --git a/utils/lo2/lo.go b/utils/lo2/lo.go index 056f7e7..930f6d3 100644 --- a/utils/lo2/lo.go +++ b/utils/lo2/lo.go @@ -1,6 +1,9 @@ package lo2 -import "github.com/samber/lo" +import ( + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/utils/math2" +) func Remove[T comparable](arr []T, item T) []T { index := lo.IndexOf(arr, item) @@ -25,6 +28,16 @@ func RemoveAt[T any](arr []T, index int) []T { return append(arr[:index], arr[index+1:]...) } +func RemoveRange[T any](arr []T, start int, length int) []T { + if start >= len(arr) { + return arr + } + + length = math2.Min(len(arr), start+length) - start + copy(arr[start:], arr[start+length:]) + return arr[:len(arr)-length] +} + func RemoveAllDefault[T comparable](arr []T) []T { var def T return lo.Filter(arr, func(i T, idx int) bool { diff --git a/utils/math2/math.go b/utils/math2/math.go index 6677d2b..ee65faa 100644 --- a/utils/math2/math.go +++ b/utils/math2/math.go @@ -33,3 +33,15 @@ func CeilDiv[T constraints.Integer](v T, div T) T { func FloorDiv[T constraints.Integer](v T, div T) T { return v / div } + +func Clamp[T constraints.Integer](v, min, max T) T { + if v < min { + return min + } + + if v > max { + return max + } + + return v +}