Browse Source

拆分出ioswitch包

pull/49/head
Sydonian 1 year ago
parent
commit
fef4e5be8d
12 changed files with 948 additions and 0 deletions
  1. +74
    -0
      pkgs/ioswitch/dag/graph.go
  2. +75
    -0
      pkgs/ioswitch/dag/node.go
  3. +112
    -0
      pkgs/ioswitch/dag/var.go
  4. +119
    -0
      pkgs/ioswitch/exec/driver.go
  5. +16
    -0
      pkgs/ioswitch/exec/exec.go
  6. +151
    -0
      pkgs/ioswitch/exec/executor.go
  7. +126
    -0
      pkgs/ioswitch/exec/plan_builder.go
  8. +98
    -0
      pkgs/ioswitch/exec/utils.go
  9. +57
    -0
      pkgs/ioswitch/exec/var.go
  10. +85
    -0
      pkgs/ioswitch/exec/worker.go
  11. +26
    -0
      pkgs/ioswitch/parser/fromto.go
  12. +9
    -0
      pkgs/ioswitch/parser/parser.go

+ 74
- 0
pkgs/ioswitch/dag/graph.go View File

@@ -0,0 +1,74 @@
package dag

import (
"gitlink.org.cn/cloudream/common/utils/lo2"
)

// Graph is a generic collection of nodes. NP is the type of the properties
// attached to each node and VP the type of the properties attached to each
// variable.
type Graph[NP any, VP any] struct {
// Nodes holds every node created through NewNode. While Walk is running,
// RemoveNode replaces removed entries with nil instead of shrinking the slice.
Nodes []*Node[NP, VP]
// isWalking is true for the duration of Walk.
isWalking bool
// nextVarID is the last ID returned by genVarID.
nextVarID int
}

// NewGraph returns a new graph that contains no nodes.
func NewGraph[NP any, VP any]() *Graph[NP, VP] {
	g := new(Graph[NP, VP])
	return g
}

// NewNode creates a node of the given type with the given properties, lets the
// type initialise it through InitNode and then appends it to the graph.
func (g *Graph[NP, VP]) NewNode(typ NodeType[NP, VP], props NP) *Node[NP, VP] {
	node := new(Node[NP, VP])
	node.Type = typ
	node.Props = props
	node.Graph = g

	// Initialisation happens before the node becomes part of g.Nodes.
	typ.InitNode(node)

	g.Nodes = append(g.Nodes, node)
	return node
}

// RemoveNode removes the first occurrence of node from the graph. It does
// nothing when the node is not part of the graph.
func (g *Graph[NP, VP]) RemoveNode(node *Node[NP, VP]) {
	pos := -1
	for i := range g.Nodes {
		if g.Nodes[i] == node {
			pos = i
			break
		}
	}
	if pos < 0 {
		return
	}

	if !g.isWalking {
		g.Nodes = lo2.RemoveAt(g.Nodes, pos)
		return
	}

	// A walk is in progress: only clear the slot so the indices used by Walk
	// stay valid. Walk drops the nil entries when it finishes.
	g.Nodes[pos] = nil
}

// Walk calls cb for every node of the graph in insertion order, including
// nodes that are added while walking. Nodes removed during the walk are
// skipped. The walk stops early when cb returns false.
//
// Walk may be called from inside cb. Only the outermost call resets the
// walking state and compacts g.Nodes; otherwise a nested walk would shrink the
// slice while the outer loop is still indexing it. The cleanup is deferred so
// the graph is left in a consistent state even if cb panics.
func (g *Graph[NP, VP]) Walk(cb func(node *Node[NP, VP]) bool) {
	outermost := !g.isWalking
	g.isWalking = true
	defer func() {
		if !outermost {
			return
		}
		g.isWalking = false
		// Drop the slots that RemoveNode cleared during the walk.
		g.Nodes = lo2.RemoveAllDefault(g.Nodes)
	}()

	// len(g.Nodes) is re-evaluated on every iteration on purpose: cb may append
	// new nodes, and those must be visited as well.
	for i := 0; i < len(g.Nodes); i++ {
		if g.Nodes[i] == nil {
			continue
		}

		if !cb(g.Nodes[i]) {
			break
		}
	}
}

// genVarID returns the next variable ID of this graph. IDs start at 1.
func (g *Graph[NP, VP]) genVarID() int {
	id := g.nextVarID + 1
	g.nextVarID = id
	return id
}

// NewNode creates a node of type typ in graph and returns it together with
// typ, so callers keep the concrete node type without a type assertion.
func NewNode[NP any, VP any, NT NodeType[NP, VP]](graph *Graph[NP, VP], typ NT, props NP) (*Node[NP, VP], NT) {
	node := graph.NewNode(typ, props)
	return node, typ
}

// WalkOnlyType walks the graph like Graph.Walk but only invokes cb for nodes
// whose Type can be asserted to N. Other nodes are skipped without stopping
// the walk.
func WalkOnlyType[N NodeType[NP, VP], NP any, VP any](g *Graph[NP, VP], cb func(node *Node[NP, VP], typ N) bool) {
	visit := func(node *Node[NP, VP]) bool {
		if typ, matched := node.Type.(N); matched {
			return cb(node, typ)
		}
		// Not the requested type: keep walking.
		return true
	}
	g.Walk(visit)
}

+ 75
- 0
pkgs/ioswitch/dag/node.go View File

@@ -0,0 +1,75 @@
package dag

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

// NodeType describes the behaviour of one kind of node in a Graph.
type NodeType[NP any, VP any] interface {
// InitNode is called by Graph.NewNode on a freshly created node before the
// node is appended to the graph.
InitNode(node *Node[NP, VP])
// String returns the textual form of the given node; Node.String delegates
// to it.
String(node *Node[NP, VP]) string
// GenerateOp presumably emits the op(s) implementing the node into blder —
// TODO confirm against the implementations.
GenerateOp(node *Node[NP, VP], blder *exec.PlanBuilder) error
}

// WorkerInfo identifies a worker that a node can be placed on.
type WorkerInfo interface {
// GetAddress returns the address of the GRPC service used to connect to this
// worker.
GetAddress() string
// Equals reports whether the two workers are the same.
Equals(worker WorkerInfo) bool
}

// NodeEnvType names the kind of environment a node is assigned to.
type NodeEnvType string

// Known environment kinds. EnvUnknown is the zero value of NodeEnvType.
const (
EnvUnknown NodeEnvType = ""
EnvExecutor NodeEnvType = "Executor"
EnvWorker NodeEnvType = "Worker"
)

// NodeEnv describes the environment a node is assigned to.
type NodeEnv struct {
// Type is the kind of environment.
Type NodeEnvType
// Worker is set by ToEnvWorker and cleared by ToEnvUnknown and ToEnvExecutor.
Worker WorkerInfo
}

// ToEnvUnknown resets the environment to EnvUnknown and clears the worker.
func (e *NodeEnv) ToEnvUnknown() {
	*e = NodeEnv{Type: EnvUnknown}
}

// ToEnvExecutor switches the environment to EnvExecutor and clears the worker.
func (e *NodeEnv) ToEnvExecutor() {
	*e = NodeEnv{Type: EnvExecutor}
}

// ToEnvWorker switches the environment to EnvWorker and records the worker.
func (e *NodeEnv) ToEnvWorker(worker WorkerInfo) {
	*e = NodeEnv{
		Type:   EnvWorker,
		Worker: worker,
	}
}

// Equals reports whether e and other describe the same environment.
//
// Environments of different types are never equal. Environments of a type
// other than EnvWorker are equal when their types match. Two worker
// environments are equal when their workers are equal according to
// WorkerInfo.Equals.
func (e *NodeEnv) Equals(other NodeEnv) bool {
	if e.Type != other.Type {
		return false
	}

	if e.Type != EnvWorker {
		return true
	}

	// A worker environment without worker information cannot be compared through
	// WorkerInfo.Equals. Treat two missing workers as equal and a missing/present
	// pair as different instead of panicking on a nil interface.
	if e.Worker == nil || other.Worker == nil {
		return e.Worker == nil && other.Worker == nil
	}

	return e.Worker.Equals(other.Worker)
}

// Node is a vertex of a Graph. Its inputs and outputs are connected to other
// nodes through StreamVar and ValueVar instances.
type Node[NP any, VP any] struct {
// Type provides the behaviour of this node.
Type NodeType[NP, VP]
// Env is the environment the node is assigned to.
Env NodeEnv
// Props holds the caller-defined properties of the node.
Props NP
// InputStreams and InputValues are slot arrays; StreamVar.To and ValueVar.To
// store the connected variable at the given slot index.
InputStreams []*StreamVar[NP, VP]
OutputStreams []*StreamVar[NP, VP]
InputValues []*ValueVar[NP, VP]
OutputValues []*ValueVar[NP, VP]
// Graph is the graph that created this node.
Graph *Graph[NP, VP]
}

// String returns the textual form of the node as produced by its NodeType.
func (n *Node[NP, VP]) String() string {
	text := n.Type.String(n)
	return fmt.Sprint(text)
}

+ 112
- 0
pkgs/ioswitch/dag/var.go View File

@@ -0,0 +1,112 @@
package dag

import "gitlink.org.cn/cloudream/common/utils/lo2"

// EndPoint identifies one input or output slot of a node.
type EndPoint[NP any, VP any] struct {
Node *Node[NP, VP]
SlotIndex int // index into the connected Node's Output or Input array
}

// StreamVar is a stream variable connecting one producing node to any number
// of consuming nodes.
type StreamVar[NP any, VP any] struct {
// ID is assigned from Graph.genVarID when the variable is created.
ID int
// From is the output slot that produces the stream.
From EndPoint[NP, VP]
// Toes lists the input slots that consume the stream.
Toes []EndPoint[NP, VP]
// Props holds the caller-defined properties of the variable.
Props VP
}

// To connects the stream to input slot slotIdx of node to and returns the
// index of the new entry in v.Toes. The slot must already have been declared
// on the node.
func (v *StreamVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int {
	toIdx := len(v.Toes)
	dst := EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}
	v.Toes = append(v.Toes, dst)
	to.InputStreams[slotIdx] = v
	return toIdx
}

// func (v *StreamVar[NP, VP]) NotTo(toIdx int) EndPoint[NP, VP] {
// ed := v.Toes[toIdx]
// lo2.RemoveAt(v.Toes, toIdx)
// ed.Node.InputStreams[ed.SlotIndex] = nil
// return ed
// }

// NotTo disconnects the stream from the first endpoint that belongs to node.
// It returns the removed endpoint and true, or a zero EndPoint and false when
// the stream is not connected to node.
func (v *StreamVar[NP, VP]) NotTo(node *Node[NP, VP]) (EndPoint[NP, VP], bool) {
	for idx := 0; idx < len(v.Toes); idx++ {
		target := v.Toes[idx]
		if target.Node != node {
			continue
		}

		v.Toes = lo2.RemoveAt(v.Toes, idx)
		// Clear the input slot that pointed at this stream.
		target.Node.InputStreams[target.SlotIndex] = nil
		return target, true
	}

	var none EndPoint[NP, VP]
	return none, false
}

// NotToWhere disconnects the stream from every endpoint for which pred
// returns true and returns the removed endpoints in their original order.
func (v *StreamVar[NP, VP]) NotToWhere(pred func(to EndPoint[NP, VP]) bool) []EndPoint[NP, VP] {
	var kept, removed []EndPoint[NP, VP]

	current := v.Toes
	for i := range current {
		ed := current[i]
		if !pred(ed) {
			kept = append(kept, ed)
			continue
		}

		// Clear the input slot that pointed at this stream.
		ed.Node.InputStreams[ed.SlotIndex] = nil
		removed = append(removed, ed)
	}

	v.Toes = kept
	return removed
}

// NotToAll disconnects the stream from all of its endpoints and returns the
// endpoints it was connected to.
func (v *StreamVar[NP, VP]) NotToAll() []EndPoint[NP, VP] {
	detached := v.Toes
	for i := range detached {
		ed := detached[i]
		// Clear the input slot that pointed at this stream.
		ed.Node.InputStreams[ed.SlotIndex] = nil
	}

	v.Toes = nil
	return detached
}

// NodeNewOutputStream creates a stream variable with the given properties,
// appends it to the node's output streams and returns it. The variable's From
// endpoint points at the slot it occupies in node.OutputStreams.
func NodeNewOutputStream[NP any, VP any](node *Node[NP, VP], props VP) *StreamVar[NP, VP] {
	out := new(StreamVar[NP, VP])
	out.ID = node.Graph.genVarID()
	out.From = EndPoint[NP, VP]{
		Node:      node,
		SlotIndex: len(node.OutputStreams),
	}
	out.Props = props

	node.OutputStreams = append(node.OutputStreams, out)
	return out
}

// NodeDeclareInputStream gives the node cnt empty input stream slots,
// replacing any slots declared before.
func NodeDeclareInputStream[NP any, VP any](node *Node[NP, VP], cnt int) {
	slots := make([]*StreamVar[NP, VP], cnt)
	node.InputStreams = slots
}

// ValueVarType enumerates kinds of value variables.
// NOTE(review): ValueVar has no field of this type in this file — confirm
// where the kind is meant to be stored.
type ValueVarType int

const (
StringValueVar ValueVarType = iota
SignalValueVar
)

// ValueVar is a value variable connecting one producing node to any number of
// consuming nodes.
type ValueVar[NP any, VP any] struct {
// ID is assigned from Graph.genVarID when the variable is created.
ID int
// From is the output slot that produces the value.
From EndPoint[NP, VP]
// Toes lists the input slots that consume the value.
Toes []EndPoint[NP, VP]
// Props holds the caller-defined properties of the variable.
Props VP
}

// To connects the value to input slot slotIdx of node to and returns the
// index of the new entry in v.Toes. The slot must already have been declared
// on the node.
func (v *ValueVar[NP, VP]) To(to *Node[NP, VP], slotIdx int) int {
	toIdx := len(v.Toes)
	dst := EndPoint[NP, VP]{Node: to, SlotIndex: slotIdx}
	v.Toes = append(v.Toes, dst)
	to.InputValues[slotIdx] = v
	return toIdx
}

// NodeNewOutputValue creates a value variable with the given properties,
// appends it to the node's output values and returns it. The variable's From
// endpoint points at the slot it occupies in node.OutputValues.
func NodeNewOutputValue[NP any, VP any](node *Node[NP, VP], props VP) *ValueVar[NP, VP] {
	val := &ValueVar[NP, VP]{
		ID: node.Graph.genVarID(),
		// The slot index must be taken from OutputValues, the slice the variable is
		// appended to below, not from OutputStreams.
		From:  EndPoint[NP, VP]{Node: node, SlotIndex: len(node.OutputValues)},
		Props: props,
	}
	node.OutputValues = append(node.OutputValues, val)
	return val
}

// NodeDeclareInputValue gives the node cnt empty input value slots, replacing
// any slots declared before.
func NodeDeclareInputValue[NP any, VP any](node *Node[NP, VP], cnt int) {
	slots := make([]*ValueVar[NP, VP], cnt)
	node.InputValues = slots
}

+ 119
- 0
pkgs/ioswitch/exec/driver.go View File

@@ -0,0 +1,119 @@
package exec

import (
"context"
"fmt"
"io"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
)

// Driver controls one running plan: it feeds and reads the driver-side
// variables and reports the final outcome of the plan.
type Driver struct {
// planID is the ID sent to the workers together with their ops.
planID PlanID
// planBlder is the builder the plan was created from.
planBlder *PlanBuilder
// callback is completed by execute/stopWith and awaited by Wait.
callback *future.SetVoidFuture
// ctx and cancel control the lifetime of the whole plan execution.
ctx context.Context
cancel context.CancelFunc
// driverExec runs the driver-side ops and holds the driver-side variables.
driverExec *Executor
}

// BeginWrite starts writing a stream. The input is treated as a complete
// stream, so it is wrapped in a Range reader that exposes only the part
// described by the handle's RangeHint.
func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) {
	hint := handle.RangeHint
	handle.Var.Stream = io2.NewRange(str, hint.Offset, hint.Length)
	e.driverExec.PutVars(handle.Var)
}

// BeginWriteRanged starts writing a stream. The input is assumed to already
// cover exactly the range described by the handle's RangeHint, so it is used
// as-is without any further processing.
func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) {
	target := handle.Var
	target.Stream = str
	e.driverExec.PutVars(target)
}

// BeginRead binds the handle's variable on the driver executor and returns
// the stream bound to it. It fails when binding fails, for example because
// the driver context is cancelled first.
func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) {
	if err := e.driverExec.BindVars(e.ctx, handle.Var); err != nil {
		return nil, fmt.Errorf("bind vars: %w", err)
	}

	stream := handle.Var.Stream
	return stream, nil
}

// Signal publishes the signal variable on the driver executor.
func (e *Driver) Signal(signal *DriverSignalVar) {
	sig := signal.Var
	e.driverExec.PutVars(sig)
}

// Wait blocks until the plan has finished or ctx is done. On success it
// returns a snapshot of the driver plan's StoreMap; keys are expected to be
// strings.
func (e *Driver) Wait(ctx context.Context) (map[string]any, error) {
	if err := e.callback.Wait(ctx); err != nil {
		return nil, err
	}

	stored := map[string]any{}
	collect := func(key, value any) bool {
		stored[key.(string)] = value
		return true
	}
	e.planBlder.DriverPlan.StoreMap.Range(collect)

	return stored, nil
}

// execute runs the whole plan: each worker plan is sent to its agent in its
// own goroutine while the driver-side ops run on the local executor. The
// outcome is reported through e.callback. It is started in a goroutine by
// PlanBuilder.Execute.
//
// NOTE(review): when driverExec.Run fails this returns without waiting for the
// worker goroutines, and stopWith may be called more than once (or SetVoid
// after SetError) if a worker fails as well — confirm that the future type
// tolerates repeated completion.
func (e *Driver) execute() {
wg := sync.WaitGroup{}

for _, p := range e.planBlder.WorkerPlans {
wg.Add(1)

// p is passed as an argument so that each goroutine works on its own plan.
go func(p *WorkerPlanBuilder) {
defer wg.Done()

plan := Plan{
ID: e.planID,
Ops: p.Ops,
}

cli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&p.Node))
if err != nil {
e.stopWith(fmt.Errorf("new agent rpc client of node %v: %w", p.Node.NodeID, err))
return
}
defer stgglb.AgentRPCPool.Release(cli)

err = cli.ExecuteIOPlan(e.ctx, plan)
if err != nil {
e.stopWith(fmt.Errorf("execute plan at %v: %w", p.Node.NodeID, err))
return
}
}(p)
}

// Run the driver-side ops on the calling goroutine while the workers execute.
err := e.driverExec.Run(e.ctx)
if err != nil {
e.stopWith(fmt.Errorf("run executor switch: %w", err))
return
}

wg.Wait()

e.callback.SetVoid()
}

// stopWith reports err through the callback and then cancels the driver
// context so that the remaining work of the plan is stopped.
func (e *Driver) stopWith(err error) {
e.callback.SetError(err)
e.cancel()
}

// DriverWriteStream is the handle of a stream that the driver writes into the
// plan.
type DriverWriteStream struct {
// Var is the variable that receives the stream.
Var *StreamVar
// RangeHint describes the part of the stream that is needed;
// Driver.BeginWrite uses it to wrap the input.
RangeHint *Range
}

// DriverReadStream is the handle of a stream that the driver reads from the
// plan.
type DriverReadStream struct {
Var *StreamVar
}

// DriverSignalVar is the handle of a signal that the driver sends into the
// plan.
type DriverSignalVar struct {
Var *SignalVar
}

+ 16
- 0
pkgs/ioswitch/exec/exec.go View File

@@ -0,0 +1,16 @@
package exec

import (
"context"
)

// PlanID identifies one execution of a plan.
type PlanID string

// Plan is the list of ops that one executor runs for a plan.
type Plan struct {
ID PlanID `json:"id"`
Ops []Op `json:"ops"`
}

// Op is a single operation of a plan.
type Op interface {
// Execute runs the op. sw is the executor running the plan; ops exchange
// variables through it.
Execute(ctx context.Context, sw *Executor) error
}

+ 151
- 0
pkgs/ioswitch/exec/executor.go View File

@@ -0,0 +1,151 @@
package exec

import (
"context"
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/sync2"
)

// bindingVars tracks one pending BindVars call.
type bindingVars struct {
// Waittings are the variables that have not been provided yet.
Waittings []Var
// Bindeds are the variables that have already received their value.
Bindeds []Var
// Callback is completed once no variable is waiting any more, or with an
// error when an assignment fails.
Callback *future.SetVoidFuture
}

// Executor runs the ops of one plan and passes variables between them.
type Executor struct {
plan Plan
// vars holds variables that were put while no binding was waiting for them.
vars map[VarID]Var
// bindings holds the BindVars calls that are still waiting for variables.
bindings []*bindingVars
// lock guards vars and bindings.
lock sync.Mutex
}

// NewExecutor creates an executor for the given plan with an empty variable
// table.
func NewExecutor(plan Plan) *Executor {
	return &Executor{
		plan: plan,
		vars: map[VarID]Var{},
	}
}

// Plan returns a pointer to the plan stored in this executor.
func (s *Executor) Plan() *Plan {
return &s.plan
}

// Run executes all ops of the plan through sync2.ParallelDo. When an op
// fails, the context shared by all ops is cancelled and the error is returned
// prefixed with the op's type.
func (s *Executor) Run(ctx context.Context) error {
	opCtx, stop := context.WithCancel(ctx)
	defer stop()

	runOp := func(op Op, _ int) error {
		execErr := op.Execute(opCtx, s)

		s.lock.Lock()
		defer s.lock.Unlock()

		if execErr == nil {
			return nil
		}

		// Stop the sibling ops as soon as one of them fails.
		stop()
		return fmt.Errorf("%T: %w", op, execErr)
	}

	return sync2.ParallelDo(s.plan.Ops, runOp)
}

// BindVars fills the given variables with the values that were, or will be,
// provided through PutVars for the same IDs. Variables that are already in the
// variable table are assigned immediately; for the others the call blocks until
// all of them have been put, an assignment fails, or ctx is done.
func (s *Executor) BindVars(ctx context.Context, vs ...Var) error {
s.lock.Lock()

callback := future.NewSetVoid()
binding := &bindingVars{
Callback: callback,
}

for _, v := range vs {
v2 := s.vars[v.GetID()]
if v2 == nil {
// Not available yet: remember it and wait for PutVars.
binding.Waittings = append(binding.Waittings, v)
continue
}

if err := AssignVar(v2, v); err != nil {
s.lock.Unlock()
return fmt.Errorf("assign var %v to %v: %w", v2.GetID(), v.GetID(), err)
}

binding.Bindeds = append(binding.Bindeds, v)
}

if len(binding.Waittings) == 0 {
s.lock.Unlock()
return nil
}

s.bindings = append(s.bindings, binding)
// The lock is released while waiting so that PutVars can make progress.
s.lock.Unlock()

err := callback.Wait(ctx)

s.lock.Lock()
defer s.lock.Unlock()

// The binding may still be registered if the wait ended with an error, so it
// is removed here as well.
s.bindings = lo2.Remove(s.bindings, binding)

return err
}

// PutVars provides values for the given variables. Each variable is handed to
// the first pending binding that waits for its ID; if no binding is waiting,
// the variable is stored in the variable table for a later BindVars call.
func (s *Executor) PutVars(vs ...Var) {
s.lock.Lock()
defer s.lock.Unlock()

loop:
for _, v := range vs {
for ib, b := range s.bindings {
for iw, w := range b.Waittings {
if w.GetID() != v.GetID() {
continue
}

if err := AssignVar(v, w); err != nil {
b.Callback.SetError(fmt.Errorf("assign var %v to %v: %w", v.GetID(), w.GetID(), err))
// A type mismatch means the generated plan is wrong. Any handling is
// acceptable here because the execution will fail in the end.
continue loop
}

b.Bindeds = append(b.Bindeds, w)
b.Waittings = lo2.RemoveAt(b.Waittings, iw)
if len(b.Waittings) == 0 {
b.Callback.SetVoid()
s.bindings = lo2.RemoveAt(s.bindings, ib)
}

// Bound successfully; continue with the outermost loop.
continue loop
}

}

// No binding was waiting for it, so store it in the variable table.
s.vars[v.GetID()] = v
}
}

// BindArrayVars is BindVars for a slice of one concrete variable type.
func BindArrayVars[T Var](sw *Executor, ctx context.Context, vs []T) error {
	generic := make([]Var, len(vs))
	for i := range vs {
		generic[i] = vs[i]
	}

	return sw.BindVars(ctx, generic...)
}

// PutArrayVars is PutVars for a slice of one concrete variable type.
func PutArrayVars[T Var](sw *Executor, vs []T) {
	generic := make([]Var, len(vs))
	for i := range vs {
		generic[i] = vs[i]
	}

	sw.PutVars(generic...)
}

+ 126
- 0
pkgs/ioswitch/exec/plan_builder.go View File

@@ -0,0 +1,126 @@
package exec

import (
"context"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/future"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

// PlanBuilder collects the variables and ops of a plan, split into the part
// run by the driver and the parts run by the individual workers.
type PlanBuilder struct {
// Vars holds every variable created through the New*Var methods; a variable's
// ID is its index in this slice.
Vars []Var
// WorkerPlans holds one builder per worker node, keyed by node ID.
WorkerPlans map[cdssdk.NodeID]*WorkerPlanBuilder
// DriverPlan holds the ops that run on the driver side.
DriverPlan DriverPlanBuilder
}

// NewPlanBuilder creates an empty plan builder.
func NewPlanBuilder() *PlanBuilder {
	return &PlanBuilder{
		WorkerPlans: map[cdssdk.NodeID]*WorkerPlanBuilder{},
		DriverPlan:  DriverPlanBuilder{StoreMap: new(sync.Map)},
	}
}

// AtExecutor returns the builder of the driver-side part of the plan.
func (b *PlanBuilder) AtExecutor() *DriverPlanBuilder {
return &b.DriverPlan
}

// AtAgent returns the plan builder of the given node, creating it on first
// use. Builders are keyed by the node's ID.
func (b *PlanBuilder) AtAgent(node cdssdk.Node) *WorkerPlanBuilder {
	if existing, found := b.WorkerPlans[node.NodeID]; found {
		return existing
	}

	created := &WorkerPlanBuilder{Node: node}
	b.WorkerPlans[node.NodeID] = created
	return created
}

// NewStreamVar creates a stream variable whose ID is its index in b.Vars and
// registers it with the builder.
func (b *PlanBuilder) NewStreamVar() *StreamVar {
	id := VarID(len(b.Vars))
	created := &StreamVar{ID: id}
	b.Vars = append(b.Vars, created)
	return created
}

// NewIntVar creates an int variable whose ID is its index in b.Vars and
// registers it with the builder.
func (b *PlanBuilder) NewIntVar() *IntVar {
	id := VarID(len(b.Vars))
	created := &IntVar{ID: id}
	b.Vars = append(b.Vars, created)
	return created
}

// NewStringVar creates a string variable whose ID is its index in b.Vars and
// registers it with the builder.
func (b *PlanBuilder) NewStringVar() *StringVar {
	id := VarID(len(b.Vars))
	created := &StringVar{ID: id}
	b.Vars = append(b.Vars, created)
	return created
}
// NewSignalVar creates a signal variable whose ID is its index in b.Vars and
// registers it with the builder.
func (b *PlanBuilder) NewSignalVar() *SignalVar {
	id := VarID(len(b.Vars))
	created := &SignalVar{ID: id}
	b.Vars = append(b.Vars, created)
	return created
}

// Execute starts the plan in the background under a fresh random plan ID and
// returns the driver that controls it. The driver owns a cancellable context
// derived from context.Background.
func (b *PlanBuilder) Execute() *Driver {
	ctx, cancel := context.WithCancel(context.Background())
	id := genRandomPlanID()

	driver := &Driver{
		planID:    id,
		planBlder: b,
		callback:  future.NewSetVoid(),
		ctx:       ctx,
		cancel:    cancel,
		// The driver-side ops run on a local executor.
		driverExec: NewExecutor(Plan{
			ID:  id,
			Ops: b.DriverPlan.Ops,
		}),
	}
	go driver.execute()

	return driver
}

// WorkerPlanBuilder collects the ops that run on one worker node.
type WorkerPlanBuilder struct {
Node cdssdk.Node
Ops []Op
}

// AddOp appends op to the worker's ops.
func (b *WorkerPlanBuilder) AddOp(op Op) {
b.Ops = append(b.Ops, op)
}

// RemoveOp removes op from the worker's ops using lo2.Remove.
func (b *WorkerPlanBuilder) RemoveOp(op Op) {
b.Ops = lo2.Remove(b.Ops, op)
}

// DriverPlanBuilder collects the ops that run on the driver side.
type DriverPlanBuilder struct {
Ops []Op
// StoreMap holds the values that Driver.Wait returns once the plan has
// finished; Wait expects string keys.
StoreMap *sync.Map
}

// AddOp appends op to the driver's ops.
func (b *DriverPlanBuilder) AddOp(op Op) {
b.Ops = append(b.Ops, op)
}

// RemoveOp removes op from the driver's ops using lo2.Remove.
func (b *DriverPlanBuilder) RemoveOp(op Op) {
b.Ops = lo2.Remove(b.Ops, op)
}

+ 98
- 0
pkgs/ioswitch/exec/utils.go View File

@@ -0,0 +1,98 @@
package exec

import (
"fmt"
"reflect"

"github.com/google/uuid"
"gitlink.org.cn/cloudream/common/utils/math2"
)

// genRandomPlanID returns a new plan ID based on a random UUID string.
func genRandomPlanID() PlanID {
	id := uuid.NewString()
	return PlanID(id)
}

// AssignVar copies the payload of from into to. Both variables must have the
// same dynamic type, otherwise an error is returned. Signal variables carry
// no payload, so assigning them only checks the types.
func AssignVar(from Var, to Var) error {
	if srcType, dstType := reflect.TypeOf(from), reflect.TypeOf(to); srcType != dstType {
		return fmt.Errorf("cannot assign %T to %T", from, to)
	}

	// The dynamic types are identical here, so the assertions on to cannot fail.
	switch src := from.(type) {
	case *StreamVar:
		dst := to.(*StreamVar)
		dst.Stream = src.Stream
	case *IntVar:
		dst := to.(*IntVar)
		dst.Value = src.Value
	case *StringVar:
		dst := to.(*StringVar)
		dst.Value = src.Value
	case *SignalVar:
		// Nothing to copy.
	}

	return nil
}

// Range describes a part of a stream starting at Offset.
type Range struct {
Offset int64
// Length is the number of bytes in the range. A nil Length means the range has
// no fixed end; ToStartEnd and Fix then use the maximum length passed in.
Length *int64
}

// Extend grows r so that it also covers other. If either range has no fixed
// length, the result has no fixed length either.
//
// The new length is stored in a fresh value instead of being written through
// the existing pointer: Range values are copied by value, so copies share the
// Length pointer and an in-place write would silently change them too.
func (r *Range) Extend(other Range) {
	newOffset := math2.Min(r.Offset, other.Offset)

	if r.Length == nil {
		r.Offset = newOffset
		return
	}

	if other.Length == nil {
		r.Offset = newOffset
		r.Length = nil
		return
	}

	otherEnd := other.Offset + *other.Length
	rEnd := r.Offset + *r.Length

	newLen := math2.Max(otherEnd, rEnd) - newOffset
	r.Offset = newOffset
	r.Length = &newLen
}

// ExtendStart moves the start of the range down to start when start lies
// before the current offset.
// NOTE(review): Length is left untouched, so for a range with a fixed length
// the end moves down together with the start — confirm this is intended.
func (r *Range) ExtendStart(start int64) {
	if start < r.Offset {
		r.Offset = start
	}
}

// ExtendEnd moves the end of the range up to end when end lies behind the
// current end. A range without a fixed length is left unchanged. The new
// length is stored in a fresh value.
func (r *Range) ExtendEnd(end int64) {
	if r.Length != nil {
		newEnd := math2.Max(end, r.Offset+*r.Length)
		length := newEnd - r.Offset
		r.Length = &length
	}
}

// Fix gives a range without a fixed length the length that reaches from its
// offset to maxLength. A range that already has a length is left unchanged.
func (r *Range) Fix(maxLength int64) {
	if r.Length == nil {
		remaining := maxLength - r.Offset
		r.Length = &remaining
	}
}

// ToStartEnd returns the start and end positions of the range. For a range
// without a fixed length, maxLen is used as the end.
func (r *Range) ToStartEnd(maxLen int64) (start int64, end int64) {
	start, end = r.Offset, maxLen
	if r.Length != nil {
		end = r.Offset + *r.Length
	}
	return start, end
}

// ClampLength limits the length of the range so that it does not reach beyond
// maxLen. A range without a fixed length is left unchanged.
//
// The clamped length is stored in a fresh value instead of being written
// through the existing pointer, so copies of the Range that share the Length
// pointer are not modified.
func (r *Range) ClampLength(maxLen int64) {
	if r.Length == nil {
		return
	}

	clamped := math2.Min(*r.Length, maxLen-r.Offset)
	r.Length = &clamped
}

+ 57
- 0
pkgs/ioswitch/exec/var.go View File

@@ -0,0 +1,57 @@
package exec

import (
"io"

"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/serder"
)

// VarID identifies a variable within a plan.
type VarID int

// Var is implemented by every kind of plan variable.
type Var interface {
// GetID returns the ID of the variable.
GetID() VarID
}

// VarUnion lists the concrete Var implementations. It is registered with
// serder as an externally tagged type union, presumably so that Var values can
// be serialized — TODO confirm against the serder package.
var VarUnion = types.NewTypeUnion[Var](
(*IntVar)(nil),
(*StringVar)(nil),
(*SignalVar)(nil),
(*StreamVar)(nil),
)
var _ = serder.UseTypeUnionExternallyTagged(&VarUnion)

// StreamVar is a variable that carries a stream. The stream itself is
// excluded from JSON encoding; only the ID is encoded.
type StreamVar struct {
ID VarID `json:"id"`
Stream io.ReadCloser `json:"-"`
}

// GetID returns the ID of the variable.
func (v *StreamVar) GetID() VarID {
return v.ID
}

// IntVar is a variable that carries an integer value.
// NOTE(review): Value is declared as string although the type is named
// IntVar — confirm whether this is intended.
type IntVar struct {
ID VarID `json:"id"`
Value string `json:"value"`
}

// GetID returns the ID of the variable.
func (v *IntVar) GetID() VarID {
return v.ID
}

// StringVar is a variable that carries a string value.
type StringVar struct {
ID VarID `json:"id"`
Value string `json:"value"`
}

// GetID returns the ID of the variable.
func (v *StringVar) GetID() VarID {
return v.ID
}

// SignalVar is a variable without payload; AssignVar copies nothing for it.
type SignalVar struct {
ID VarID `json:"id"`
}

// GetID returns the ID of the variable.
func (v *SignalVar) GetID() VarID {
return v.ID
}

+ 85
- 0
pkgs/ioswitch/exec/worker.go View File

@@ -0,0 +1,85 @@
package exec

import (
"context"
"sync"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

// finding is a pending FindByIDContexted call waiting for the executor of
// PlanID to be added.
type finding struct {
PlanID PlanID
// Callback is completed by Worker.Add with the matching executor.
Callback *future.SetValueFuture[*Executor]
}

// Worker keeps track of the executors that are currently registered, keyed by
// plan ID.
type Worker struct {
// lock guards executors and findings.
lock sync.Mutex
executors map[PlanID]*Executor
// findings holds the lookups that wait for an executor to be added.
findings []*finding
}

// NewWorker creates a Worker without any executors.
// NOTE(review): the Worker is returned by value although it contains a
// sync.Mutex; callers must not copy it once it is in use.
func NewWorker() Worker {
return Worker{
executors: make(map[PlanID]*Executor),
}
}

// Add registers the executor under its plan ID and hands it to every pending
// lookup that waits for that plan.
func (s *Worker) Add(exe *Executor) {
	s.lock.Lock()
	defer s.lock.Unlock()

	planID := exe.Plan().ID
	s.executors[planID] = exe

	// Complete the matching lookups and drop them from the pending list.
	s.findings = lo.Reject(s.findings, func(f *finding, _ int) bool {
		matched := f.PlanID == planID
		if matched {
			f.Callback.SetValue(exe)
		}
		return matched
	})
}

// Remove unregisters the executor stored under the plan ID of sw.
func (s *Worker) Remove(sw *Executor) {
	s.lock.Lock()
	defer s.lock.Unlock()

	planID := sw.Plan().ID
	delete(s.executors, planID)
}

// FindByID returns the executor registered for id, or nil when there is none.
func (s *Worker) FindByID(id PlanID) *Executor {
	s.lock.Lock()
	defer s.lock.Unlock()

	exe := s.executors[id]
	return exe
}

// FindByIDContexted returns the executor registered for id. If it is not
// registered yet, the call waits until Add registers it or the wait on ctx
// ends. The error of the wait is discarded; in that case the value returned by
// WaitValue is passed on (presumably nil — TODO confirm against the future
// package).
func (s *Worker) FindByIDContexted(ctx context.Context, id PlanID) *Executor {
s.lock.Lock()

sw := s.executors[id]
if sw != nil {
s.lock.Unlock()
return sw
}

cb := future.NewSetValue[*Executor]()
f := &finding{
PlanID: id,
Callback: cb,
}
s.findings = append(s.findings, f)

// The lock is released while waiting so that Add can make progress.
s.lock.Unlock()

sw, _ = cb.WaitValue(ctx)

s.lock.Lock()
defer s.lock.Unlock()

// The lookup may still be registered if the wait ended without a value, so it
// is removed here as well.
s.findings = lo2.Remove(s.findings, f)

return sw
}

+ 26
- 0
pkgs/ioswitch/parser/fromto.go View File

@@ -0,0 +1,26 @@
package parser

// From is the source side of a transfer description. Any value is accepted.
type From any

// To is the destination side of a transfer description. Any value is
// accepted.
type To any

// FromTos is a list of transfer descriptions.
type FromTos []FromTo

// FromTo describes a transfer from a set of sources to a set of destinations.
type FromTo struct {
	Froms []From
	Toes  []To
}

// NewFromTo returns an empty transfer description.
func NewFromTo() FromTo {
	var ft FromTo
	return ft
}

// AddFrom appends a source and returns ft so that calls can be chained.
func (ft *FromTo) AddFrom(from From) *FromTo {
	froms := append(ft.Froms, from)
	ft.Froms = froms
	return ft
}

// AddTo appends a destination and returns ft so that calls can be chained.
func (ft *FromTo) AddTo(to To) *FromTo {
	toes := append(ft.Toes, to)
	ft.Toes = toes
	return ft
}

+ 9
- 0
pkgs/ioswitch/parser/parser.go View File

@@ -0,0 +1,9 @@
package parser

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

// FromToParser turns a transfer description into a plan.
type FromToParser interface {
// Parse presumably adds the ops that implement ft to blder — TODO confirm
// against the implementations.
Parse(ft FromTo, blder *exec.PlanBuilder) error
}

Loading…
Cancel
Save