Browse Source

Merge branch 'master' of https://gitlink.org.cn/cloudream/common

feature_wq
JeshuaRen 11 months ago
parent
commit
254838e3e5
19 changed files with 666 additions and 193 deletions
  1. +9
    -1
      magefiles/common.go
  2. +5
    -0
      magefiles/targets/targets.go
  3. +6
    -2
      pkgs/ioswitch/dag/graph.go
  4. +313
    -44
      pkgs/ioswitch/dag/node.go
  5. +69
    -56
      pkgs/ioswitch/dag/var.go
  6. +4
    -0
      pkgs/ioswitch/exec/utils.go
  7. +38
    -21
      pkgs/ioswitch/plan/generate.go
  8. +8
    -8
      pkgs/ioswitch/plan/ops/driver.go
  9. +4
    -3
      pkgs/ioswitch/plan/ops/drop.go
  10. +27
    -28
      pkgs/ioswitch/plan/ops/send.go
  11. +3
    -3
      pkgs/ioswitch/plan/ops/store.go
  12. +9
    -13
      pkgs/ioswitch/plan/ops/sync.go
  13. +5
    -8
      sdks/storage/cdsapi/object.go
  14. +56
    -0
      sdks/storage/cdsapi/package.go
  15. +3
    -3
      sdks/storage/cdsapi/storage_test.go
  16. +79
    -1
      sdks/storage/models.go
  17. +2
    -1
      sdks/storage/shared_storage.go
  18. +14
    -1
      utils/lo2/lo.go
  19. +12
    -0
      utils/math2/math.go

+ 9
- 1
magefiles/common.go View File

@@ -15,6 +15,7 @@ var Global = struct {
OS string OS string
Arch string Arch string
BuildRoot string BuildRoot string
Debug bool
}{ }{
Arch: "amd64", Arch: "amd64",
} }
@@ -50,7 +51,14 @@ func Build(args BuildArgs) error {
binPath := filepath.Join(fullOutputDir, args.OutputName+goBuildArgs.OutputExt) binPath := filepath.Join(fullOutputDir, args.OutputName+goBuildArgs.OutputExt)
fmt.Printf("building to %s\n", binPath) fmt.Printf("building to %s\n", binPath)


goCmdArgs := []string{"build", "-o", binPath}
goCmdArgs := []string{"build"}

if Global.Debug {
goCmdArgs = append(goCmdArgs, "-gcflags", "all=-N -l")
}

goCmdArgs = append(goCmdArgs, "-o", binPath)

if args.EntryFile != "" { if args.EntryFile != "" {
goCmdArgs = append(goCmdArgs, args.EntryFile) goCmdArgs = append(goCmdArgs, args.EntryFile)
} }


+ 5
- 0
magefiles/targets/targets.go View File

@@ -28,3 +28,8 @@ func ARM64() {
func BuildRoot(dir string) { func BuildRoot(dir string) {
magefiles.Global.BuildRoot = dir magefiles.Global.BuildRoot = dir
} }

// [配置项]关闭编译优化,用于调试
func Debug() {
magefiles.Global.Debug = true
}

+ 6
- 2
pkgs/ioswitch/dag/graph.go View File

@@ -47,8 +47,12 @@ func (g *Graph) Walk(cb func(node Node) bool) {
g.Nodes = lo2.RemoveAllDefault(g.Nodes) g.Nodes = lo2.RemoveAllDefault(g.Nodes)
} }


func (g *Graph) NewVar() *Var {
return &Var{}
func (g *Graph) NewStreamVar() *StreamVar {
return &StreamVar{}
}

func (g *Graph) NewValueVar() *ValueVar {
return &ValueVar{}
} }


func AddNode[N Node](graph *Graph, typ N) N { func AddNode[N Node](graph *Graph, typ N) N {


+ 313
- 44
pkgs/ioswitch/dag/node.go View File

@@ -1,6 +1,7 @@
package dag package dag


import ( import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/lo2"
) )
@@ -50,105 +51,373 @@ type Node interface {
Graph() *Graph Graph() *Graph
SetGraph(graph *Graph) SetGraph(graph *Graph)
Env() *NodeEnv Env() *NodeEnv
InputStreams() *InputSlots
OutputStreams() *OutputSlots
InputValues() *InputSlots
OutputValues() *OutputSlots
InputStreams() *StreamInputSlots
OutputStreams() *StreamOutputSlots
InputValues() *ValueInputSlots
OutputValues() *ValueOutputSlots
GenerateOp() (exec.Op, error) GenerateOp() (exec.Op, error)
// String() string // String() string
} }


type VarSlots []*Var
type VarSlots[T any] []*T


func (s *VarSlots) Len() int {
func (s *VarSlots[T]) Len() int {
return len(*s) return len(*s)
} }


func (s *VarSlots) Get(idx int) *Var {
func (s *VarSlots[T]) Get(idx int) *T {
return (*s)[idx] return (*s)[idx]
} }


func (s *VarSlots) Set(idx int, val *Var) *Var {
func (s *VarSlots[T]) Set(idx int, val *T) *T {
old := (*s)[idx] old := (*s)[idx]
(*s)[idx] = val (*s)[idx] = val
return old return old
} }


func (s *VarSlots) Append(val *Var) int {
func (s *VarSlots[T]) IndexOf(v *T) int {
return lo.IndexOf(*s, v)
}

func (s *VarSlots[T]) Append(val *T) int {
*s = append(*s, val) *s = append(*s, val)
return s.Len() - 1 return s.Len() - 1
} }


func (s *VarSlots) RemoveAt(idx int) {
func (s *VarSlots[T]) Clear(val *T) {
for i := 0; i < s.Len(); i++ {
if (*s)[i] == val {
(*s)[i] = nil
}
}
}

func (s *VarSlots[T]) RemoveAt(idx int) {
(*s) = lo2.RemoveAt(*s, idx) (*s) = lo2.RemoveAt(*s, idx)
} }


func (s *VarSlots) Resize(size int) {
func (s *VarSlots[T]) RemoveRange(start int, cnt int) {
*s = lo2.RemoveRange(*s, start, cnt)
}

func (s *VarSlots[T]) Resize(size int) {
if s.Len() < size { if s.Len() < size {
*s = append(*s, make([]*Var, size-s.Len())...)
*s = append(*s, make([]*T, size-s.Len())...)
} else if s.Len() > size { } else if s.Len() > size {
*s = (*s)[:size] *s = (*s)[:size]
} }
} }


func (s *VarSlots) SetRawArray(arr []*Var) {
func (s *VarSlots[T]) SetRawArray(arr []*T) {
*s = arr *s = arr
} }


func (s *VarSlots) RawArray() []*Var {
func (s *VarSlots[T]) RawArray() []*T {
return *s return *s
} }


type InputSlots struct {
VarSlots
type StreamInputSlots struct {
Slots VarSlots[StreamVar]
}

func (s *StreamInputSlots) Len() int {
return s.Slots.Len()
}

func (s *StreamInputSlots) Get(idx int) *StreamVar {
return s.Slots.Get(idx)
}

func (s *StreamInputSlots) IndexOf(v *StreamVar) int {
return s.Slots.IndexOf(v)
}

// 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil)
func (s *StreamInputSlots) Init(cnt int) {
s.Slots.Resize(cnt)
}

func (s *StreamInputSlots) EnlargeOne() int {
s.Slots.Append(nil)
return s.Len() - 1
}

func (s *StreamInputSlots) ClearInputAt(my Node, idx int) {
v := s.Get(idx)
if v == nil {
return
}
s.Slots.Set(idx, nil)

v.Dst.Remove(my)
}

func (s *StreamInputSlots) ClearAllInput(my Node) {
for i := 0; i < s.Len(); i++ {
v := s.Get(i)
if v == nil {
continue
}
s.Slots.Set(i, nil)

v.Dst.Remove(my)
}
}

func (s *StreamInputSlots) GetVarIDs() []exec.VarID {
var ids []exec.VarID
for _, v := range s.Slots.RawArray() {
if v == nil {
continue
}
ids = append(ids, v.VarID)
}

return ids
}

func (s *StreamInputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
var ids []exec.VarID
for i := start; i < end; i++ {
v := s.Get(i)
if v == nil {
continue
}
ids = append(ids, v.VarID)
}

return ids
}

type ValueInputSlots struct {
Slots VarSlots[ValueVar]
}

func (s *ValueInputSlots) Len() int {
return s.Slots.Len()
} }


func (s *InputSlots) EnsureSize(cnt int) {
func (s *ValueInputSlots) Get(idx int) *ValueVar {
return s.Slots.Get(idx)
}

func (s *ValueInputSlots) IndexOf(v *ValueVar) int {
return s.Slots.IndexOf(v)
}

// 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil)
func (s *ValueInputSlots) Init(cnt int) {
if s.Len() < cnt { if s.Len() < cnt {
s.VarSlots = append(s.VarSlots, make([]*Var, cnt-s.Len())...)
s.Slots = append(s.Slots, make([]*ValueVar, cnt-s.Len())...)
} }
} }


func (s *InputSlots) EnlargeOne() int {
s.Append(nil)
func (s *ValueInputSlots) EnlargeOne() int {
s.Slots.Append(nil)
return s.Len() - 1 return s.Len() - 1
} }


type OutputSlots struct {
VarSlots
func (s *ValueInputSlots) ClearInputAt(my Node, idx int) {
v := s.Get(idx)
if v == nil {
return
}
s.Slots.Set(idx, nil)

v.Dst.Remove(my)
} }


func (s *OutputSlots) Setup(my Node, v *Var, slotIdx int) {
if s.Len() <= slotIdx {
s.VarSlots = append(s.VarSlots, make([]*Var, slotIdx-s.Len()+1)...)
func (s *ValueInputSlots) GetVarIDs() []exec.VarID {
var ids []exec.VarID
for _, v := range s.Slots.RawArray() {
if v == nil {
continue
}
ids = append(ids, v.VarID)
} }


s.Set(slotIdx, v)
*v.From() = EndPoint{
Node: my,
SlotIndex: slotIdx,
return ids
}

func (s *ValueInputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
var ids []exec.VarID
for i := start; i < end; i++ {
v := s.Get(i)
if v == nil {
continue
}
ids = append(ids, v.VarID)
} }

return ids
} }


func (s *OutputSlots) SetupNew(my Node, v *Var) {
s.Append(v)
*v.From() = EndPoint{
Node: my,
SlotIndex: s.Len() - 1,
type StreamOutputSlots struct {
Slots VarSlots[StreamVar]
}

func (s *StreamOutputSlots) Len() int {
return s.Slots.Len()
}

func (s *StreamOutputSlots) Get(idx int) *StreamVar {
return s.Slots.Get(idx)
}

func (s *StreamOutputSlots) IndexOf(v *StreamVar) int {
return s.Slots.IndexOf(v)
}

// 设置Slots大小,并为每个Slot创建一个StreamVar。
// 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。
func (s *StreamOutputSlots) Init(my Node, size int) {
s.Slots.Resize(size)
for i := 0; i < size; i++ {
v := my.Graph().NewStreamVar()
v.Src = my
s.Slots.Set(i, v)
} }
} }


type Slot struct {
Var *Var
// 在Slots末尾增加一个StreamVar,并返回它的索引
func (s *StreamOutputSlots) AppendNew(my Node) StreamSlot {
v := my.Graph().NewStreamVar()
v.Src = my
s.Slots.Append(v)
return StreamSlot{Var: v, Index: s.Len() - 1}
}

// 断开指定位置的输出流到指定节点的连接
func (s *StreamOutputSlots) ClearOutputAt(idx int, dst Node) {
v := s.Get(idx)
v.Dst.Remove(dst)
dst.InputStreams().Slots.Clear(v)
}

// 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量
func (s *StreamOutputSlots) ClearAllOutput(my Node) {
for i := 0; i < s.Len(); i++ {
v := s.Get(i)
v.ClearAllDst()
}
}

func (s *StreamOutputSlots) GetVarIDs() []exec.VarID {
var ids []exec.VarID
for _, v := range s.Slots.RawArray() {
if v == nil {
continue
}
ids = append(ids, v.VarID)
}

return ids
}

func (s *StreamOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
var ids []exec.VarID
for i := start; i < end; i++ {
v := s.Get(i)
if v == nil {
continue
}
ids = append(ids, v.VarID)
}

return ids
}

type ValueOutputSlots struct {
Slots VarSlots[ValueVar]
}

func (s *ValueOutputSlots) Len() int {
return s.Slots.Len()
}

func (s *ValueOutputSlots) Get(idx int) *ValueVar {
return s.Slots.Get(idx)
}

func (s *ValueOutputSlots) IndexOf(v *ValueVar) int {
return s.Slots.IndexOf(v)
}

// 设置Slots大小,并为每个Slot创建一个StreamVar
// 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。
func (s *ValueOutputSlots) Init(my Node, size int) {
s.Slots.Resize(size)
for i := 0; i < size; i++ {
v := my.Graph().NewValueVar()
v.Src = my
s.Slots.Set(i, v)
}
}

// 在Slots末尾增加一个StreamVar,并返回它的索引
func (s *ValueOutputSlots) AppendNew(my Node) ValueSlot {
v := my.Graph().NewValueVar()
v.Src = my
s.Slots.Append(v)
return ValueSlot{Var: v, Index: s.Len() - 1}
}

// 断开指定位置的输出流到指定节点的连接
func (s *ValueOutputSlots) ClearOutputAt(idx int, dst Node) {
v := s.Get(idx)
v.Dst.Remove(dst)
dst.InputValues().Slots.Clear(v)
}

// 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量
func (s *ValueOutputSlots) ClearAllOutput(my Node) {
for i := 0; i < s.Len(); i++ {
v := s.Get(i)
v.ClearAllDst()
}
}

func (s *ValueOutputSlots) GetVarIDs() []exec.VarID {
var ids []exec.VarID
for _, v := range s.Slots.RawArray() {
if v == nil {
continue
}
ids = append(ids, v.VarID)
}

return ids
}

func (s *ValueOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
var ids []exec.VarID
for i := start; i < end; i++ {
v := s.Get(i)
if v == nil {
continue
}
ids = append(ids, v.VarID)
}

return ids
}

type StreamSlot struct {
Var *StreamVar
Index int
}

type ValueSlot struct {
Var *ValueVar
Index int Index int
} }


type NodeBase struct { type NodeBase struct {
env NodeEnv env NodeEnv
inputStreams InputSlots
outputStreams OutputSlots
inputValues InputSlots
outputValues OutputSlots
inputStreams StreamInputSlots
outputStreams StreamOutputSlots
inputValues ValueInputSlots
outputValues ValueOutputSlots
graph *Graph graph *Graph
} }


@@ -164,18 +433,18 @@ func (n *NodeBase) Env() *NodeEnv {
return &n.env return &n.env
} }


func (n *NodeBase) InputStreams() *InputSlots {
func (n *NodeBase) InputStreams() *StreamInputSlots {
return &n.inputStreams return &n.inputStreams
} }


func (n *NodeBase) OutputStreams() *OutputSlots {
func (n *NodeBase) OutputStreams() *StreamOutputSlots {
return &n.outputStreams return &n.outputStreams
} }


func (n *NodeBase) InputValues() *InputSlots {
func (n *NodeBase) InputValues() *ValueInputSlots {
return &n.inputValues return &n.inputValues
} }


func (n *NodeBase) OutputValues() *OutputSlots {
func (n *NodeBase) OutputValues() *ValueOutputSlots {
return &n.outputValues return &n.outputValues
} }

+ 69
- 56
pkgs/ioswitch/dag/var.go View File

@@ -5,95 +5,108 @@ import (
"gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/lo2"
) )


type EndPoint struct {
Node Node
SlotIndex int // 所连接的Node的Output或Input数组的索引
type Var2 interface {
GetVarID() exec.VarID
} }


type EndPointSlots []EndPoint
func (s *EndPointSlots) Len() int {
return len(*s)
type StreamVar struct {
VarID exec.VarID
Src Node
Dst DstList
} }


func (s *EndPointSlots) Get(idx int) *EndPoint {
return &(*s)[idx]
func (v *StreamVar) GetVarID() exec.VarID {
return v.VarID
} }


func (s *EndPointSlots) Add(ed EndPoint) int {
(*s) = append((*s), ed)
return len(*s) - 1
func (v *StreamVar) IndexAtSrc() int {
return v.Src.OutputStreams().IndexOf(v)
} }


func (s *EndPointSlots) Remove(ed EndPoint) {
for i, e := range *s {
if e == ed {
(*s) = lo2.RemoveAt((*s), i)
return
}
}
func (v *StreamVar) To(to Node, slotIdx int) {
v.Dst.Add(to)
to.InputStreams().Slots.Set(slotIdx, v)
} }


func (s *EndPointSlots) RemoveAt(idx int) {
lo2.RemoveAt((*s), idx)
func (v *StreamVar) NotTo(node Node) {
v.Dst.Remove(node)
node.InputStreams().Slots.Clear(v)
} }


func (s *EndPointSlots) Resize(size int) {
if s.Len() < size {
(*s) = append((*s), make([]EndPoint, size-s.Len())...)
} else if s.Len() > size {
(*s) = (*s)[:size]
func (v *StreamVar) ClearAllDst() {
for _, n := range v.Dst {
n.InputStreams().Slots.Clear(v)
} }
v.Dst = nil
} }


func (s *EndPointSlots) RawArray() []EndPoint {
return *s
type ValueVar struct {
VarID exec.VarID
Src Node
Dst DstList
} }


type Var struct {
VarID exec.VarID
from EndPoint
to EndPointSlots
func (v *ValueVar) GetVarID() exec.VarID {
return v.VarID
} }


func (v *Var) From() *EndPoint {
return &v.from
func (v *ValueVar) IndexAtSrc() int {
return v.Src.InputValues().IndexOf(v)
} }


func (v *Var) To() *EndPointSlots {
return &v.to
func (v *ValueVar) To(to Node, slotIdx int) {
v.Dst.Add(to)
to.InputValues().Slots.Set(slotIdx, v)
} }


func (v *Var) ValueTo(to Node, slotIdx int) {
v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx})
to.InputValues().Set(slotIdx, v)
func (v *ValueVar) NotTo(node Node) {
v.Dst.Remove(node)
node.InputValues().Slots.Clear(v)
} }


func (v *Var) ValueNotTo(node Node, slotIdx int) {
v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx})
node.InputValues().Set(slotIdx, nil)
func (v *ValueVar) ClearAllDst() {
for _, n := range v.Dst {
n.InputValues().Slots.Clear(v)
}
v.Dst = nil
} }


func (v *Var) StreamTo(to Node, slotIdx int) {
v.To().Add(EndPoint{Node: to, SlotIndex: slotIdx})
to.InputStreams().Set(slotIdx, v)
type DstList []Node

func (s *DstList) Len() int {
return len(*s)
} }


func (v *Var) StreamNotTo(node Node, slotIdx int) {
v.to.Remove(EndPoint{Node: node, SlotIndex: slotIdx})
node.InputStreams().Set(slotIdx, nil)
func (s *DstList) Get(idx int) Node {
return (*s)[idx]
} }


func (v *Var) NoInputAllValue() {
for _, ed := range v.to {
ed.Node.InputValues().Set(ed.SlotIndex, nil)
func (s *DstList) Add(n Node) int {
(*s) = append((*s), n)
return len(*s) - 1
}

func (s *DstList) Remove(n Node) {
for i, e := range *s {
if e == n {
(*s) = lo2.RemoveAt((*s), i)
return
}
} }
v.to = nil
} }


func (v *Var) NoInputAllStream() {
for _, ed := range v.to {
ed.Node.InputStreams().Set(ed.SlotIndex, nil)
func (s *DstList) RemoveAt(idx int) {
lo2.RemoveAt((*s), idx)
}

func (s *DstList) Resize(size int) {
if s.Len() < size {
(*s) = append((*s), make([]Node, size-s.Len())...)
} else if s.Len() > size {
(*s) = (*s)[:size]
} }
v.to = nil
}

func (s *DstList) RawArray() []Node {
return *s
} }

+ 4
- 0
pkgs/ioswitch/exec/utils.go View File

@@ -14,6 +14,10 @@ type Range struct {
Length *int64 Length *int64
} }


func NewRange(offset int64, length int64) Range {
return Range{Offset: offset, Length: &length}
}

func (r *Range) Extend(other Range) { func (r *Range) Extend(other Range) {
newOffset := math2.Min(r.Offset, other.Offset) newOffset := math2.Min(r.Offset, other.Offset)




+ 38
- 21
pkgs/ioswitch/plan/generate.go View File

@@ -30,57 +30,61 @@ func generateSend(graph *ops.GraphNodeBuilder) {


for i := 0; i < node.OutputStreams().Len(); i++ { for i := 0; i < node.OutputStreams().Len(); i++ {
out := node.OutputStreams().Get(i) out := node.OutputStreams().Get(i)
to := out.To().Get(0)
if to.Node.Env().Equals(node.Env()) {
to := out.Dst.Get(0)
if to.Env().Equals(node.Env()) {
continue continue
} }


switch to.Node.Env().Type {
switch to.Env().Type {
case dag.EnvDriver: case dag.EnvDriver:


// // 如果是要送到Driver,则只能由Driver主动去拉取 // // 如果是要送到Driver,则只能由Driver主动去拉取
dstNode := out.Dst.Get(0)

getNode := graph.NewGetStream(node.Env().Worker) getNode := graph.NewGetStream(node.Env().Worker)
getNode.Env().ToEnvDriver() getNode.Env().ToEnvDriver()


// // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
holdType := graph.NewHoldUntil() //dag.NewNode(graph, &ops.HoldUntilNode{}, nil)
*holdType.Env() = *node.Env()
holdNode := graph.NewHoldUntil()
*holdNode.Env() = *node.Env()


// 将Get指令的信号送到Hold指令 // 将Get指令的信号送到Hold指令
holdType.SetSignal(getNode.SignalVar())
holdNode.SetSignal(getNode.SignalVar())


out.To().RemoveAt(0)
out.Dst.RemoveAt(0)


// 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令
getNode.Get(holdType.HoldStream(out)).
getNode.Get(holdNode.HoldStream(out)).
// 将Get指令的输出送到目的地 // 将Get指令的输出送到目的地
StreamTo(to.Node, to.SlotIndex)
To(to, dstNode.InputStreams().IndexOf(out))


case dag.EnvWorker: case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送 // 如果是要送到Agent,则可以直接发送
n := graph.NewSendStream(to.Node.Env().Worker)
dstNode := out.Dst.Get(0)
n := graph.NewSendStream(to.Env().Worker)
*n.Env() = *node.Env() *n.Env() = *node.Env()


out.To().RemoveAt(0)
n.Send(out).StreamTo(to.Node, to.SlotIndex)
out.Dst.RemoveAt(0)
n.Send(out).To(to, dstNode.InputStreams().IndexOf(out))
} }
} }


for i := 0; i < node.OutputValues().Len(); i++ { for i := 0; i < node.OutputValues().Len(); i++ {
out := node.OutputValues().Get(i) out := node.OutputValues().Get(i)
// 允许Value变量不被使用 // 允许Value变量不被使用
if out.To().Len() == 0 {
if out.Dst.Len() == 0 {
continue continue
} }


to := out.To().Get(0)
if to.Node.Env().Equals(node.Env()) {
to := out.Dst.Get(0)
if to.Env().Equals(node.Env()) {
continue continue
} }


switch to.Node.Env().Type {
switch to.Env().Type {
case dag.EnvDriver: case dag.EnvDriver:
// // 如果是要送到Driver,则只能由Driver主动去拉取 // // 如果是要送到Driver,则只能由Driver主动去拉取
dstNode := out.Dst.Get(0)
getNode := graph.NewGetValue(node.Env().Worker) getNode := graph.NewGetValue(node.Env().Worker)
getNode.Env().ToEnvDriver() getNode.Env().ToEnvDriver()


@@ -91,21 +95,22 @@ func generateSend(graph *ops.GraphNodeBuilder) {
// 将Get指令的信号送到Hold指令 // 将Get指令的信号送到Hold指令
holdNode.SetSignal(getNode.SignalVar()) holdNode.SetSignal(getNode.SignalVar())


out.To().RemoveAt(0)
out.Dst.RemoveAt(0)


// 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令 // 将源节点的输出送到Hold指令,将Hold指令的输出送到Get指令
getNode.Get(holdNode.HoldVar(out)). getNode.Get(holdNode.HoldVar(out)).
// 将Get指令的输出送到目的地 // 将Get指令的输出送到目的地
ValueTo(to.Node, to.SlotIndex)
To(to, dstNode.InputValues().IndexOf(out))


case dag.EnvWorker: case dag.EnvWorker:
// 如果是要送到Agent,则可以直接发送 // 如果是要送到Agent,则可以直接发送
t := graph.NewSendValue(to.Node.Env().Worker)
dstNode := out.Dst.Get(0)
t := graph.NewSendValue(to.Env().Worker)
*t.Env() = *node.Env() *t.Env() = *node.Env()


out.To().RemoveAt(0)
out.Dst.RemoveAt(0)


t.Send(out).ValueTo(to.Node, to.SlotIndex)
t.Send(out).To(to, dstNode.InputValues().IndexOf(out))
} }
} }


@@ -119,6 +124,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
graph.Walk(func(node dag.Node) bool { graph.Walk(func(node dag.Node) bool {
for i := 0; i < node.OutputStreams().Len(); i++ { for i := 0; i < node.OutputStreams().Len(); i++ {
out := node.OutputStreams().Get(i) out := node.OutputStreams().Get(i)
if out == nil {
continue
}


if out.VarID > 0 { if out.VarID > 0 {
continue continue
@@ -129,6 +137,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {


for i := 0; i < node.InputStreams().Len(); i++ { for i := 0; i < node.InputStreams().Len(); i++ {
in := node.InputStreams().Get(i) in := node.InputStreams().Get(i)
if in == nil {
continue
}


if in.VarID > 0 { if in.VarID > 0 {
continue continue
@@ -139,6 +150,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {


for i := 0; i < node.OutputValues().Len(); i++ { for i := 0; i < node.OutputValues().Len(); i++ {
out := node.OutputValues().Get(i) out := node.OutputValues().Get(i)
if out == nil {
continue
}


if out.VarID > 0 { if out.VarID > 0 {
continue continue
@@ -149,6 +163,9 @@ func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {


for i := 0; i < node.InputValues().Len(); i++ { for i := 0; i < node.InputValues().Len(); i++ {
in := node.InputValues().Get(i) in := node.InputValues().Get(i)
if in == nil {
continue
}


if in.VarID > 0 { if in.VarID > 0 {
continue continue


+ 8
- 8
pkgs/ioswitch/plan/ops/driver.go View File

@@ -16,13 +16,13 @@ func (b *GraphNodeBuilder) NewFromDriver(handle *exec.DriverWriteStream) *FromDr
} }
b.AddNode(node) b.AddNode(node)


node.OutputStreams().SetupNew(node, b.NewVar())
node.OutputStreams().Init(node, 1)


return node return node
} }


func (t *FromDriverNode) Output() dag.Slot {
return dag.Slot{
func (t *FromDriverNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0), Var: t.OutputStreams().Get(0),
Index: 0, Index: 0,
} }
@@ -49,16 +49,16 @@ func (b *GraphNodeBuilder) NewToDriver(handle *exec.DriverReadStream) *ToDriverN
} }
b.AddNode(node) b.AddNode(node)


node.InputStreams().Init(1)
return node return node
} }


func (t *ToDriverNode) SetInput(v *dag.Var) {
t.InputStreams().EnsureSize(1)
v.StreamTo(t, 0)
func (t *ToDriverNode) SetInput(v *dag.StreamVar) {
v.To(t, 0)
} }


func (t *ToDriverNode) Input() dag.Slot {
return dag.Slot{
func (t *ToDriverNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0), Var: t.InputStreams().Get(0),
Index: 0, Index: 0,
} }


+ 4
- 3
pkgs/ioswitch/plan/ops/drop.go View File

@@ -46,12 +46,13 @@ type DropNode struct {
func (b *GraphNodeBuilder) NewDropStream() *DropNode { func (b *GraphNodeBuilder) NewDropStream() *DropNode {
node := &DropNode{} node := &DropNode{}
b.AddNode(node) b.AddNode(node)

node.InputStreams().Init(1)
return node return node
} }


func (t *DropNode) SetInput(v *dag.Var) {
t.InputStreams().EnsureSize(1)
v.StreamTo(t, 0)
func (t *DropNode) SetInput(v *dag.StreamVar) {
v.To(t, 0)
} }


func (t *DropNode) GenerateOp() (exec.Op, error) { func (t *DropNode) GenerateOp() (exec.Op, error) {


+ 27
- 28
pkgs/ioswitch/plan/ops/send.go View File

@@ -150,15 +150,15 @@ func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo) *SendStreamNode {
ToWorker: to, ToWorker: to,
} }
b.AddNode(node) b.AddNode(node)

node.InputStreams().Init(1)
node.OutputStreams().Init(node, 1)
return node return node
} }


func (t *SendStreamNode) Send(v *dag.Var) *dag.Var {
t.InputStreams().EnsureSize(1)
v.StreamTo(t, 0)
output := t.Graph().NewVar()
t.OutputStreams().Setup(t, output, 0)
return output
func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar {
v.To(t, 0)
return t.OutputStreams().Get(0)
} }


func (t *SendStreamNode) GenerateOp() (exec.Op, error) { func (t *SendStreamNode) GenerateOp() (exec.Op, error) {
@@ -183,15 +183,15 @@ func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode {
ToWorker: to, ToWorker: to,
} }
b.AddNode(node) b.AddNode(node)

node.InputValues().Init(1)
node.OutputValues().Init(node, 1)
return node return node
} }


func (t *SendValueNode) Send(v *dag.Var) *dag.Var {
t.InputValues().EnsureSize(1)
v.ValueTo(t, 0)
output := t.Graph().NewVar()
t.OutputValues().Setup(t, output, 0)
return output
func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar {
v.To(t, 0)
return t.OutputValues().Get(0)
} }


func (t *SendValueNode) GenerateOp() (exec.Op, error) { func (t *SendValueNode) GenerateOp() (exec.Op, error) {
@@ -216,19 +216,19 @@ func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo) *GetStreamNode {
FromWorker: from, FromWorker: from,
} }
b.AddNode(node) b.AddNode(node)
node.OutputValues().Setup(node, node.Graph().NewVar(), 0)

node.InputStreams().Init(1)
node.OutputValues().Init(node, 1)
node.OutputStreams().Init(node, 1)
return node return node
} }


func (t *GetStreamNode) Get(v *dag.Var) *dag.Var {
t.InputStreams().EnsureSize(1)
v.StreamTo(t, 0)
output := t.Graph().NewVar()
t.OutputStreams().Setup(t, output, 0)
return output
func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar {
v.To(t, 0)
return t.OutputStreams().Get(0)
} }


func (t *GetStreamNode) SignalVar() *dag.Var {
func (t *GetStreamNode) SignalVar() *dag.ValueVar {
return t.OutputValues().Get(0) return t.OutputValues().Get(0)
} }


@@ -255,19 +255,18 @@ func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode {
FromWorker: from, FromWorker: from,
} }
b.AddNode(node) b.AddNode(node)
node.OutputValues().Setup(node, node.Graph().NewVar(), 0)

node.InputValues().Init(1)
node.OutputValues().Init(node, 2)
return node return node
} }


func (t *GetValueNode) Get(v *dag.Var) *dag.Var {
t.InputValues().EnsureSize(1)
v.ValueTo(t, 0)
output := t.Graph().NewVar()
t.OutputValues().Setup(t, output, 1)
return output
func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar {
v.To(t, 0)
return t.OutputValues().Get(1)
} }


func (t *GetValueNode) SignalVar() *dag.Var {
func (t *GetValueNode) SignalVar() *dag.ValueVar {
return t.OutputValues().Get(0) return t.OutputValues().Get(0)
} }




+ 3
- 3
pkgs/ioswitch/plan/ops/store.go View File

@@ -37,10 +37,10 @@ func (b *GraphNodeBuilder) NewStore() *StoreNode {
return node return node
} }


func (t *StoreNode) Store(key string, v *dag.Var) {
func (t *StoreNode) Store(key string, v *dag.ValueVar) {
t.Key = key t.Key = key
t.InputValues().EnsureSize(1)
v.ValueTo(t, 0)
t.InputValues().Init(1)
v.To(t, 0)
} }


func (t *StoreNode) GenerateOp() (exec.Op, error) { func (t *StoreNode) GenerateOp() (exec.Op, error) {


+ 9
- 13
pkgs/ioswitch/plan/ops/sync.go View File

@@ -165,26 +165,22 @@ type HoldUntilNode struct {
func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode { func (b *GraphNodeBuilder) NewHoldUntil() *HoldUntilNode {
node := &HoldUntilNode{} node := &HoldUntilNode{}
b.AddNode(node) b.AddNode(node)
node.InputValues().Init(1)
return node return node
} }


func (t *HoldUntilNode) SetSignal(s *dag.Var) {
t.InputValues().EnsureSize(1)
s.ValueTo(t, 0)
func (t *HoldUntilNode) SetSignal(s *dag.ValueVar) {
s.To(t, 0)
} }


func (t *HoldUntilNode) HoldStream(str *dag.Var) *dag.Var {
str.StreamTo(t, t.InputStreams().EnlargeOne())
output := t.Graph().NewVar()
t.OutputStreams().SetupNew(t, output)
return output
func (t *HoldUntilNode) HoldStream(str *dag.StreamVar) *dag.StreamVar {
str.To(t, t.InputStreams().EnlargeOne())
return t.OutputStreams().AppendNew(t).Var
} }


func (t *HoldUntilNode) HoldVar(v *dag.Var) *dag.Var {
v.ValueTo(t, t.InputValues().EnlargeOne())
output := t.Graph().NewVar()
t.OutputValues().SetupNew(t, output)
return output
func (t *HoldUntilNode) HoldVar(v *dag.ValueVar) *dag.ValueVar {
v.To(t, t.InputValues().EnlargeOne())
return t.OutputValues().AppendNew(t).Var
} }


func (t *HoldUntilNode) GenerateOp() (exec.Op, error) { func (t *HoldUntilNode) GenerateOp() (exec.Op, error) {


+ 5
- 8
sdks/storage/cdsapi/object.go View File

@@ -33,9 +33,10 @@ type ObjectUpload struct {
} }


type ObjectUploadInfo struct { type ObjectUploadInfo struct {
UserID cdssdk.UserID `json:"userID" binding:"required"`
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
StorageAffinity cdssdk.StorageID `json:"storageAffinity"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
Affinity cdssdk.StorageID `json:"affinity"`
LoadTo []cdssdk.StorageID `json:"loadTo"`
} }


type UploadingObject struct { type UploadingObject struct {
@@ -46,11 +47,7 @@ type UploadingObject struct {
type UploadObjectIterator = iterator.Iterator[*UploadingObject] type UploadObjectIterator = iterator.Iterator[*UploadingObject]


type ObjectUploadResp struct { type ObjectUploadResp struct {
Uploadeds []UploadedObject `json:"uploadeds"`
}
type UploadedObject struct {
Object *cdssdk.Object `json:"object"`
Error string `json:"error"`
Uploadeds []cdssdk.Object `json:"uploadeds"`
} }


func (c *ObjectService) Upload(req ObjectUpload) (*ObjectUploadResp, error) { func (c *ObjectService) Upload(req ObjectUpload) (*ObjectUploadResp, error) {


+ 56
- 0
sdks/storage/cdsapi/package.go View File

@@ -6,6 +6,7 @@ import (
"strings" "strings"


"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/iterator"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
@@ -127,6 +128,61 @@ func (s *PackageService) Create(req PackageCreate) (*PackageCreateResp, error) {
return nil, codeResp.ToError() return nil, codeResp.ToError()
} }


const PackageCreateLoadPath = "/package/createLoad"

type PackageCreateLoad struct {
PackageCreateLoadInfo
Files UploadObjectIterator `json:"-"`
}
type PackageCreateLoadInfo struct {
UserID cdssdk.UserID `json:"userID" binding:"required"`
BucketID cdssdk.BucketID `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
LoadTo []cdssdk.StorageID `json:"loadTo" binding:"required"`
}
type PackageCreateLoadResp struct {
Package cdssdk.Package `json:"package"`
Objects []cdssdk.Object `json:"objects"`
LoadedDirs []string `json:"loadedDirs"`
}

func (c *PackageService) CreateLoad(req PackageCreateLoad) (*PackageCreateLoadResp, error) {
url, err := url.JoinPath(c.baseURL, PackageCreateLoadPath)
if err != nil {
return nil, err
}

infoJSON, err := serder.ObjectToJSON(req)
if err != nil {
return nil, fmt.Errorf("upload info to json: %w", err)
}

resp, err := http2.PostMultiPart(url, http2.MultiPartRequestParam{
Form: map[string]string{"info": string(infoJSON)},
Files: iterator.Map(req.Files, func(src *UploadingObject) (*http2.IterMultiPartFile, error) {
return &http2.IterMultiPartFile{
FieldName: "files",
FileName: src.Path,
File: src.File,
}, nil
}),
})
if err != nil {
return nil, err
}

codeResp, err := ParseJSONResponse[response[PackageCreateLoadResp]](resp)
if err != nil {
return nil, err
}

if codeResp.Code == errorcode.OK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

const PackageDeletePath = "/package/delete" const PackageDeletePath = "/package/delete"


type PackageDelete struct { type PackageDelete struct {


+ 3
- 3
sdks/storage/cdsapi/storage_test.go View File

@@ -89,9 +89,9 @@ func Test_Object(t *testing.T) {


_, err = cli.Object().Upload(ObjectUpload{ _, err = cli.Object().Upload(ObjectUpload{
ObjectUploadInfo: ObjectUploadInfo{ ObjectUploadInfo: ObjectUploadInfo{
UserID: 1,
PackageID: createResp.Package.PackageID,
StorageAffinity: stgAff,
UserID: 1,
PackageID: createResp.Package.PackageID,
Affinity: stgAff,
}, },
Files: iterator.Array( Files: iterator.Array(
&UploadingObject{ &UploadingObject{


+ 79
- 1
sdks/storage/models.go View File

@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"time" "time"


"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )
@@ -33,7 +34,6 @@ type FileHash string
/// TODO 将分散在各处的公共结构体定义集中到这里来 /// TODO 将分散在各处的公共结构体定义集中到这里来


type Redundancy interface { type Redundancy interface {
driver.Valuer
} }


var RedundancyUnion = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Redundancy]( var RedundancyUnion = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Redundancy](
@@ -41,6 +41,7 @@ var RedundancyUnion = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTyp
(*RepRedundancy)(nil), (*RepRedundancy)(nil),
(*ECRedundancy)(nil), (*ECRedundancy)(nil),
(*LRCRedundancy)(nil), (*LRCRedundancy)(nil),
(*SegmentRedundancy)(nil),
)), "type") )), "type")


type NoneRedundancy struct { type NoneRedundancy struct {
@@ -162,6 +163,83 @@ func (b *LRCRedundancy) GetGroupElements(grp int) []int {
return idxes return idxes
} }


type SegmentRedundancy struct {
serder.Metadata `union:"segment"`
Type string `json:"type"`
Segments []int64 `json:"segments"` // 每一段的大小
}

func NewSegmentRedundancy(totalSize int64, segmentCount int) *SegmentRedundancy {
var segs []int64
segLen := int64(0)
// 计算每一段的大小。大小不一定都相同,但总和应该等于总大小。
for i := 0; i < segmentCount; i++ {
curLen := totalSize*int64(i+1)/int64(segmentCount) - segLen
segs = append(segs, curLen)
segLen += curLen
}

return &SegmentRedundancy{
Type: "segment",
Segments: segs,
}
}

func (r *SegmentRedundancy) SegmentCount() int {
return len(r.Segments)
}

func (r *SegmentRedundancy) CalcSegmentStart(index int) int64 {
return lo.Sum(r.Segments[:index])
}

// 计算指定位置取整到最近的段的起始位置。
func (r *SegmentRedundancy) FloorSegmentPosition(pos int64) int64 {
fpos := int64(0)
for _, segLen := range r.Segments {
segEnd := fpos + segLen
if pos < segEnd {
break
}
fpos += segLen
}

return fpos
}

// 计算指定范围内的段索引范围,参数和返回值所代表的范围都是左闭右开的。
// 如果end == -1,则代表计算从start到最后一个字节的范围。
func (b *SegmentRedundancy) CalcSegmentRange(start int64, end *int64) (segIdxStart int, segIdxEnd int) {
segIdxStart = len(b.Segments)
segIdxEnd = len(b.Segments)

// 找到第一个包含start的段索引
segStart := int64(0)
for i, segLen := range b.Segments {
segEnd := segStart + segLen
if start < segEnd {
segIdxStart = i
break
}
segStart += segLen
}

if end != nil {
// 找到第一个包含end的段索引
segStart = int64(0)
for i, segLen := range b.Segments {
segEnd := segStart + segLen
if *end <= segEnd {
segIdxEnd = i + 1
break
}
segStart += segLen
}
}

return
}

const ( const (
PackageStateNormal = "Normal" PackageStateNormal = "Normal"
PackageStateDeleted = "Deleted" PackageStateDeleted = "Deleted"


+ 2
- 1
sdks/storage/shared_storage.go View File

@@ -18,7 +18,8 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SharedS
)), "type") )), "type")


type LocalSharedStorage struct { type LocalSharedStorage struct {
Type string `json:"type"`
serder.Metadata `union:"Local"`
Type string `json:"type"`
// 调度Package时的Package的根路径 // 调度Package时的Package的根路径
LoadBase string `json:"loadBase"` LoadBase string `json:"loadBase"`
} }


+ 14
- 1
utils/lo2/lo.go View File

@@ -1,6 +1,9 @@
package lo2 package lo2


import "github.com/samber/lo"
import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/utils/math2"
)


func Remove[T comparable](arr []T, item T) []T { func Remove[T comparable](arr []T, item T) []T {
index := lo.IndexOf(arr, item) index := lo.IndexOf(arr, item)
@@ -25,6 +28,16 @@ func RemoveAt[T any](arr []T, index int) []T {
return append(arr[:index], arr[index+1:]...) return append(arr[:index], arr[index+1:]...)
} }


func RemoveRange[T any](arr []T, start int, length int) []T {
if start >= len(arr) {
return arr
}

length = math2.Min(len(arr), start+length) - start
copy(arr[start:], arr[start+length:])
return arr[:len(arr)-length]
}

func RemoveAllDefault[T comparable](arr []T) []T { func RemoveAllDefault[T comparable](arr []T) []T {
var def T var def T
return lo.Filter(arr, func(i T, idx int) bool { return lo.Filter(arr, func(i T, idx int) bool {


+ 12
- 0
utils/math2/math.go View File

@@ -33,3 +33,15 @@ func CeilDiv[T constraints.Integer](v T, div T) T {
func FloorDiv[T constraints.Integer](v T, div T) T { func FloorDiv[T constraints.Integer](v T, div T) T {
return v / div return v / div
} }

func Clamp[T constraints.Integer](v, min, max T) T {
if v < min {
return min
}

if v > max {
return max
}

return v
}

Loading…
Cancel
Save