Browse Source

增加LRC冗余方式

gitlink
Sydonian 1 year ago
parent
commit
58040ca5f6
13 changed files with 1614 additions and 0 deletions
  1. +36
    -0
      common/pkgs/ec/lrc/lrc.go
  2. +61
    -0
      common/pkgs/ioswitchlrc/agent_worker.go
  3. +134
    -0
      common/pkgs/ioswitchlrc/fromto.go
  4. +23
    -0
      common/pkgs/ioswitchlrc/ioswitch.go
  5. +137
    -0
      common/pkgs/ioswitchlrc/ops2/chunked.go
  6. +112
    -0
      common/pkgs/ioswitchlrc/ops2/clone.go
  7. +189
    -0
      common/pkgs/ioswitchlrc/ops2/ec.go
  8. +129
    -0
      common/pkgs/ioswitchlrc/ops2/ipfs.go
  9. +75
    -0
      common/pkgs/ioswitchlrc/ops2/ops.go
  10. +95
    -0
      common/pkgs/ioswitchlrc/ops2/range.go
  11. +297
    -0
      common/pkgs/ioswitchlrc/parser/generator.go
  12. +309
    -0
      common/pkgs/ioswitchlrc/parser/passes.go
  13. +17
    -0
      common/pkgs/ioswitchlrc/utils.go

+ 36
- 0
common/pkgs/ec/lrc/lrc.go View File

@@ -0,0 +1,36 @@
package lrc

import "github.com/klauspost/reedsolomon"

// LRC bundles the parameters of a locally repairable code with the
// reedsolomon.LRC codec that New builds from them.
type LRC struct {
n int // total number of blocks, including local blocks
k int // number of data blocks
groups []int // data blocks used when generating the group parity blocks
l *reedsolomon.LRC
}

// New creates an LRC for n total blocks, k of which are data blocks, with the
// given group layout. The underlying codec is built with
// reedsolomon.NewLRC(k, n-k, groups); if that fails the error is returned
// unchanged and no LRC is produced.
func New(n int, k int, groups []int) (*LRC, error) {
	codec, err := reedsolomon.NewLRC(k, n-k, groups)
	if err != nil {
		return nil, err
	}

	return &LRC{
		n:      n,
		k:      k,
		groups: groups,
		l:      codec,
	}, nil
}

// GenerateMatrix builds, following the global-repair principle, the matrix that
// rebuilds the blocks listed in outputIdxs from the blocks listed in inputIdxs.
// The values in inputIdxs must be < n-len(r) and there must be at least k of
// them (constraint carried over from the original note).
func (l *LRC) GenerateMatrix(inputIdxs []int, outputIdxs []int) ([][]byte, error) {
	matrix, err := l.l.GenerateMatrix(inputIdxs, outputIdxs)
	return matrix, err
}

// GenerateGroupMatrix builds the matrix that repairs one block inside its
// group. Only the case of a single missing block per group is supported, and
// all other blocks of that group are assumed to be present.
func (l *LRC) GenerateGroupMatrix(outputIdx int) ([][]byte, error) {
	matrix, err := l.l.GenerateGroupMatrix(outputIdx)
	return matrix, err
}

+ 61
- 0
common/pkgs/ioswitchlrc/agent_worker.go View File

@@ -0,0 +1,61 @@
package ioswitchlrc

import (
"context"
"io"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
)

// AgentWorker describes a plan worker that is reached through the agent of a
// storage node.
type AgentWorker struct {
Node cdssdk.Node
}

// NewClient acquires an agent RPC client from the global pool, using the
// address selected for this worker's node, and wraps it in an
// AgentWorkerClient. The pool's error is returned unchanged.
func (w *AgentWorker) NewClient() (exec.WorkerClient, error) {
	rpcCli, err := stgglb.AgentRPCPool.Acquire(stgglb.SelectGRPCAddress(&w.Node))
	if err == nil {
		return &AgentWorkerClient{cli: rpcCli}, nil
	}

	return nil, err
}

// String returns the string form of the worker's node.
func (w *AgentWorker) String() string {
	desc := w.Node.String()
	return desc
}

// Equals reports whether worker is also an *AgentWorker that refers to the
// node with the same NodeID. Any other worker type is never equal.
func (w *AgentWorker) Equals(worker exec.WorkerInfo) bool {
	if other, isAgent := worker.(*AgentWorker); isAgent {
		return w.Node.NodeID == other.Node.NodeID
	}

	return false
}

// AgentWorkerClient is the exec.WorkerClient returned by AgentWorker.NewClient.
// Every method forwards to the pooled agent RPC client it wraps.
type AgentWorkerClient struct {
cli *agtrpc.PoolClient
}

// ExecutePlan forwards the plan to the agent through ExecuteIOPlan.
func (c *AgentWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
return c.cli.ExecuteIOPlan(ctx, plan)
}
// SendStream sends str to the agent for the stream variable v of plan planID;
// only the variable's ID is passed on.
func (c *AgentWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, str io.ReadCloser) error {
return c.cli.SendStream(ctx, planID, v.ID, str)
}
// SendVar sends the variable v of plan planID to the agent.
func (c *AgentWorkerClient) SendVar(ctx context.Context, planID exec.PlanID, v exec.Var) error {
return c.cli.SendVar(ctx, planID, v)
}
// GetStream requests the stream variable v (by ID) of plan planID from the
// agent, passing signal along with the request.
func (c *AgentWorkerClient) GetStream(ctx context.Context, planID exec.PlanID, v *exec.StreamVar, signal *exec.SignalVar) (io.ReadCloser, error) {
return c.cli.GetStream(ctx, planID, v.ID, signal)
}
// GetVar requests the variable v of plan planID from the agent, passing signal
// along with the request.
func (c *AgentWorkerClient) GetVar(ctx context.Context, planID exec.PlanID, v exec.Var, signal *exec.SignalVar) error {
return c.cli.GetVar(ctx, planID, v, signal)
}
// Close returns the wrapped RPC client to the global pool; it always returns nil.
func (c *AgentWorkerClient) Close() error {
stgglb.AgentRPCPool.Release(c.cli)
return nil
}

+ 134
- 0
common/pkgs/ioswitchlrc/fromto.go View File

@@ -0,0 +1,134 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// From describes a data source of an IO plan.
type From interface {
// GetDataIndex returns the index of the block this source provides.
// The plan builders treat -1 as "the complete file".
GetDataIndex() int
}

// To describes a data destination of an IO plan.
type To interface {
// GetRange returns the range of the stream that this destination needs. Its meaning depends on DataIndex:
// if DataIndex == -1, the range is relative to the whole file;
// if DataIndex >= 0, the range is relative to that block of the file.
GetRange() exec.Range
GetDataIndex() int
}

// FromDriver is a source whose data is written by the driver through Handle.
type FromDriver struct {
	Handle    *exec.DriverWriteStream
	DataIndex int
}

// NewFromDriver creates a FromDriver for the given data index together with
// the write stream handle it owns. The handle starts with an empty RangeHint.
func NewFromDriver(dataIndex int) (*FromDriver, *exec.DriverWriteStream) {
	stream := &exec.DriverWriteStream{RangeHint: &exec.Range{}}
	src := &FromDriver{Handle: stream, DataIndex: dataIndex}
	return src, stream
}

// GetDataIndex returns the data index this source was created with.
func (f *FromDriver) GetDataIndex() int {
	idx := f.DataIndex
	return idx
}

// FromNode is a source identified by a file hash, optionally tied to a node.
type FromNode struct {
	FileHash  string
	Node      *cdssdk.Node
	DataIndex int
}

// NewFromNode creates a FromNode from its three fields.
func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode {
	src := new(FromNode)
	src.FileHash = fileHash
	src.Node = node
	src.DataIndex = dataIndex
	return src
}

// GetDataIndex returns the data index this source was created with.
func (f *FromNode) GetDataIndex() int {
	idx := f.DataIndex
	return idx
}

// ToDriver is a destination whose data is read by the driver through Handle.
type ToDriver struct {
	Handle    *exec.DriverReadStream
	DataIndex int
	Range     exec.Range
}

// NewToDriver creates a ToDriver with a zero-valued Range, together with the
// read stream handle it owns.
func NewToDriver(dataIndex int) (*ToDriver, *exec.DriverReadStream) {
	return NewToDriverWithRange(dataIndex, exec.Range{})
}

// NewToDriverWithRange creates a ToDriver restricted to rng, together with the
// read stream handle it owns.
func NewToDriverWithRange(dataIndex int, rng exec.Range) (*ToDriver, *exec.DriverReadStream) {
	stream := &exec.DriverReadStream{}
	dst := &ToDriver{
		Handle:    stream,
		DataIndex: dataIndex,
		Range:     rng,
	}
	return dst, stream
}

// GetDataIndex returns the data index this destination was created with.
func (t *ToDriver) GetDataIndex() int {
	idx := t.DataIndex
	return idx
}

// GetRange returns the range this destination was created with.
func (t *ToDriver) GetRange() exec.Range {
	rng := t.Range
	return rng
}

// ToNode is a destination on a node. FileHashStoreKey is the key under which
// the plan builder stores the written file's hash (see the IPFS write pass).
type ToNode struct {
	Node             cdssdk.Node
	DataIndex        int
	Range            exec.Range
	FileHashStoreKey string
}

// NewToNode creates a ToNode with a zero-valued Range.
func NewToNode(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToNode {
	return NewToNodeWithRange(node, dataIndex, fileHashStoreKey, exec.Range{})
}

// NewToNodeWithRange creates a ToNode restricted to rng.
func NewToNodeWithRange(node cdssdk.Node, dataIndex int, fileHashStoreKey string, rng exec.Range) *ToNode {
	dst := new(ToNode)
	dst.Node = node
	dst.DataIndex = dataIndex
	dst.FileHashStoreKey = fileHashStoreKey
	dst.Range = rng
	return dst
}

// GetDataIndex returns the data index this destination was created with.
func (t *ToNode) GetDataIndex() int {
	idx := t.DataIndex
	return idx
}

// GetRange returns the range this destination was created with.
func (t *ToNode) GetRange() exec.Range {
	rng := t.Range
	return rng
}

// type ToStorage struct {
// Storage cdssdk.Storage
// DataIndex int
// }

// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage {
// return &ToStorage{
// Storage: storage,
// DataIndex: dataIndex,
// }
// }

// func (t *ToStorage) GetDataIndex() int {
// return t.DataIndex
// }

+ 23
- 0
common/pkgs/ioswitchlrc/ioswitch.go View File

@@ -0,0 +1,23 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

// NodeProps is the per-node payload attached to DAG nodes by the plan builders;
// it records the From or To that the node was created for.
type NodeProps struct {
From From
To To
}

// ValueVarType enumerates the kinds of value variables.
type ValueVarType int

const (
StringValueVar ValueVarType = iota
SignalValueVar
)

// VarProps is the per-variable payload attached to DAG variables.
type VarProps struct {
StreamIndex int // index of the stream; only meaningful on a StreamVar
ValueType ValueVarType // value type; only meaningful on a ValueVar
Var exec.Var // the corresponding Var created while generating the Plan
}

+ 137
- 0
common/pkgs/ioswitchlrc/ops2/chunked.go View File

@@ -0,0 +1,137 @@
package ops2

import (
"context"
"fmt"
"io"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"golang.org/x/sync/semaphore"
)

// init registers the chunked split/join ops with the exec package.
func init() {
exec.UseOp[*ChunkedSplit]()
exec.UseOp[*ChunkedJoin]()
}

// ChunkedSplit is the op that splits one input stream into len(Outputs)
// streams, ChunkSize bytes at a time.
type ChunkedSplit struct {
	Input        *exec.StreamVar   `json:"input"`
	Outputs      []*exec.StreamVar `json:"outputs"`
	ChunkSize    int               `json:"chunkSize"`
	PaddingZeros bool              `json:"paddingZeros"`
}

// Execute binds the input stream, splits it with io2.ChunkedSplit, publishes
// the resulting streams as the output variables and then blocks until every
// output stream has been closed by its reader (or ctx is done).
//
// It returns the bind error, or the context error if waiting is interrupted.
func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error {
	err := e.BindVars(ctx, o.Input)
	if err != nil {
		return err
	}
	defer o.Input.Stream.Close()

	outputs := io2.ChunkedSplit(o.Input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{
		PaddingZeros: o.PaddingZeros,
	})

	// One token is held per output stream and handed back when that stream is
	// closed; re-acquiring all tokens at the end therefore waits for all readers.
	sem := semaphore.NewWeighted(int64(len(outputs)))
	for i := range outputs {
		// Acquire can fail when ctx is already done. Continuing without the
		// token would make the later Release calls release more than is held.
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}

		o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
			sem.Release(1)
		})
	}
	exec.PutArrayVars(e, o.Outputs)

	return sem.Acquire(ctx, int64(len(outputs)))
}

// ChunkedJoin is the op that joins several input streams into one output
// stream, ChunkSize bytes at a time.
type ChunkedJoin struct {
	Inputs    []*exec.StreamVar `json:"inputs"`
	Output    *exec.StreamVar   `json:"output"`
	ChunkSize int               `json:"chunkSize"`
}

// Execute binds all input streams, publishes their io2.BufferedChunkedJoin as
// the output variable and blocks until the output stream has been closed by
// its reader (or ctx is done). All inputs are closed when Execute returns.
func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error {
	if err := exec.BindArrayVars(e, ctx, o.Inputs); err != nil {
		return err
	}

	var readers []io.Reader
	for idx := range o.Inputs {
		readers = append(readers, o.Inputs[idx].Stream)
	}
	defer func() {
		for idx := range o.Inputs {
			o.Inputs[idx].Stream.Close()
		}
	}()

	done := future.NewSetVoid()
	joined := io2.BufferedChunkedJoin(readers, o.ChunkSize)
	o.Output.Stream = io2.AfterReadClosedOnce(joined, func(closer io.ReadCloser) {
		done.SetVoid()
	})
	e.PutVars(o.Output)

	return done.Wait(ctx)
}

// ChunkedSplitType is the DAG node type that generates a ChunkedSplit op.
type ChunkedSplitType struct {
	OutputCount int
	ChunkSize   int
}

// InitNode declares one input stream and creates OutputCount output streams
// whose StreamIndex is their position.
// NOTE(review): the props are ioswitch2.VarProps although this package lives
// under ioswitchlrc — confirm this is intended.
func (t *ChunkedSplitType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)

	for streamIdx := 0; streamIdx < t.OutputCount; streamIdx++ {
		props := &ioswitch2.VarProps{StreamIndex: streamIdx}
		dag.NodeNewOutputStream(node, props)
	}
}

// GenerateOp builds the ChunkedSplit op from the node's first input stream and
// all of its output streams. Zero padding is always enabled.
func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) {
	outVars := lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
		return v.Var
	})

	split := &ChunkedSplit{
		Input:        op.InputStreams[0].Var,
		Outputs:      outVars,
		ChunkSize:    t.ChunkSize,
		PaddingZeros: true,
	}
	return split, nil
}

// String renders the node for diagnostics.
func (t *ChunkedSplitType) String(node *dag.Node) string {
	streams := formatStreamIO(node)
	values := formatValueIO(node)
	return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, streams, values)
}

// ChunkedJoinType is the DAG node type that generates a ChunkedJoin op.
type ChunkedJoinType struct {
	InputCount int
	ChunkSize  int
}

// InitNode declares InputCount input streams and creates a single output
// stream with StreamIndex -1.
// NOTE(review): the props are ioswitch2.VarProps although this package lives
// under ioswitchlrc — confirm this is intended.
func (t *ChunkedJoinType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, t.InputCount)

	props := &ioswitch2.VarProps{StreamIndex: -1}
	dag.NodeNewOutputStream(node, props)
}

// GenerateOp builds the ChunkedJoin op from all of the node's input streams
// and its first output stream.
func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) {
	inVars := lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
		return v.Var
	})

	join := &ChunkedJoin{
		Inputs:    inVars,
		Output:    op.OutputStreams[0].Var,
		ChunkSize: t.ChunkSize,
	}
	return join, nil
}

// String renders the node for diagnostics.
func (t *ChunkedJoinType) String(node *dag.Node) string {
	streams := formatStreamIO(node)
	values := formatValueIO(node)
	return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, streams, values)
}

+ 112
- 0
common/pkgs/ioswitchlrc/ops2/clone.go View File

@@ -0,0 +1,112 @@
package ops2

import (
"context"
"fmt"
"io"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"golang.org/x/sync/semaphore"
)

// init registers the clone ops with the exec package.
func init() {
exec.UseOp[*CloneStream]()
exec.UseOp[*CloneVar]()
}

// CloneStream is the op that duplicates one input stream into len(Outputs)
// streams.
type CloneStream struct {
	Input   *exec.StreamVar   `json:"input"`
	Outputs []*exec.StreamVar `json:"outputs"`
}

// Execute binds the input stream, clones it with io2.Clone, publishes the
// clones as the output variables and then blocks until every clone has been
// closed by its reader (or ctx is done).
//
// It returns the bind error, or the context error if waiting is interrupted.
func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error {
	err := e.BindVars(ctx, o.Input)
	if err != nil {
		return err
	}
	defer o.Input.Stream.Close()

	cloned := io2.Clone(o.Input.Stream, len(o.Outputs))

	// One token is held per clone and handed back when that clone is closed;
	// re-acquiring all tokens at the end therefore waits for all readers.
	sem := semaphore.NewWeighted(int64(len(o.Outputs)))
	for i, s := range cloned {
		// Acquire can fail when ctx is already done. Continuing without the
		// token would make the later Release calls release more than is held.
		if err := sem.Acquire(ctx, 1); err != nil {
			return err
		}

		o.Outputs[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) {
			sem.Release(1)
		})
	}
	exec.PutArrayVars(e, o.Outputs)

	return sem.Acquire(ctx, int64(len(o.Outputs)))
}

// CloneVar is the op that copies one value variable into several others.
type CloneVar struct {
	Raw     exec.Var   `json:"raw"`
	Cloneds []exec.Var `json:"cloneds"`
}

// Execute binds Raw, assigns it to every variable in Cloneds and publishes
// them. The first failing assignment aborts the op with a wrapped error and
// nothing is published.
func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error {
	if err := e.BindVars(ctx, o.Raw); err != nil {
		return err
	}

	for idx := range o.Cloneds {
		err := exec.AssignVar(o.Raw, o.Cloneds[idx])
		if err != nil {
			return fmt.Errorf("clone var: %w", err)
		}
	}

	e.PutVars(o.Cloneds...)
	return nil
}

// CloneStreamType is the DAG node type that generates a CloneStream op.
type CloneStreamType struct{}

// InitNode declares the single input stream; outputs are added via NewOutput.
func (t *CloneStreamType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)
}

// GenerateOp builds the CloneStream op from the node's first input stream and
// all of its output streams.
func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
	outVars := lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
		return v.Var
	})

	clone := &CloneStream{
		Input:   op.InputStreams[0].Var,
		Outputs: outVars,
	}
	return clone, nil
}

// NewOutput adds one more output stream (with nil props) to the node.
func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar {
	out := dag.NodeNewOutputStream(node, nil)
	return out
}

// String renders the node for diagnostics.
func (t *CloneStreamType) String(node *dag.Node) string {
	streams := formatStreamIO(node)
	values := formatValueIO(node)
	return fmt.Sprintf("CloneStream[]%v%v", streams, values)
}

// CloneVarType is the DAG node type that generates a CloneVar op.
type CloneVarType struct{}

// InitNode declares the single input value; outputs are added via NewOutput.
func (t *CloneVarType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputValue(node, 1)
}

// GenerateOp builds the CloneVar op from the node's first input value and all
// of its output values.
func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
	clonedVars := lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var {
		return v.Var
	})

	clone := &CloneVar{
		Raw:     op.InputValues[0].Var,
		Cloneds: clonedVars,
	}
	return clone, nil
}

// NewOutput adds one more output value (with nil props) to the node.
func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar {
	out := dag.NodeNewOutputValue(node, nil)
	return out
}

// String renders the node for diagnostics.
func (t *CloneVarType) String(node *dag.Node) string {
	streams := formatStreamIO(node)
	values := formatValueIO(node)
	return fmt.Sprintf("CloneVar[]%v%v", streams, values)
}

+ 189
- 0
common/pkgs/ioswitchlrc/ops2/ec.go View File

@@ -0,0 +1,189 @@
package ops2

import (
"context"
"fmt"
"io"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec/lrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
)

// init registers the GalMultiply op with the exec package.
func init() {
exec.UseOp[*GalMultiply]()
}

// GalMultiply is the op that multiplies chunks read from the input streams by
// the coefficient matrix Coef and writes the products to the output streams.
type GalMultiply struct {
Coef [][]byte `json:"coef"`
Inputs []*exec.StreamVar `json:"inputs"`
Outputs []*exec.StreamVar `json:"outputs"`
ChunkSize int `json:"chunkSize"`
}

// Execute binds all inputs, exposes each output as the read end of a pipe and
// runs the multiplication loop in a goroutine. It blocks until that loop
// finishes or ctx is done, then closes the pipe writers (with the error, if
// any). All inputs are closed when Execute returns.
func (o *GalMultiply) Execute(ctx context.Context, e *exec.Executor) error {
err := exec.BindArrayVars(e, ctx, o.Inputs)
if err != nil {
return err
}
defer func() {
for _, s := range o.Inputs {
s.Stream.Close()
}
}()

outputWrs := make([]*io.PipeWriter, len(o.Outputs))

// Each output variable gets the reader side of a pipe; the goroutine below
// feeds the writer side.
for i := range o.Outputs {
rd, wr := io.Pipe()
o.Outputs[i].Stream = rd
outputWrs[i] = wr
}

fut := future.NewSetVoid()
go func() {
mul := ec.GaloisMultiplier().BuildGalois()

// One ChunkSize buffer per input and per output, reused for every round.
inputChunks := make([][]byte, len(o.Inputs))
for i := range o.Inputs {
inputChunks[i] = make([]byte, o.ChunkSize)
}
outputChunks := make([][]byte, len(o.Outputs))
for i := range o.Outputs {
outputChunks[i] = make([]byte, o.ChunkSize)
}

for {
// Read one full chunk from every input.
err := sync2.ParallelDo(o.Inputs, func(s *exec.StreamVar, i int) error {
_, err := io.ReadFull(s.Stream, inputChunks[i])
return err
})
// Only an exact io.EOF ends the loop successfully.
// NOTE(review): a short final chunk makes io.ReadFull return
// io.ErrUnexpectedEOF, which takes the error path below — confirm inputs
// are always a multiple of ChunkSize.
if err == io.EOF {
fut.SetVoid()
return
}
if err != nil {
fut.SetError(err)
return
}

err = mul.Multiply(o.Coef, inputChunks, outputChunks)
if err != nil {
fut.SetError(err)
return
}

for i := range o.Outputs {
err := io2.WriteAll(outputWrs[i], outputChunks[i])
if err != nil {
fut.SetError(err)
return
}
}
}
}()

exec.PutArrayVars(e, o.Outputs)
err = fut.Wait(ctx)
if err != nil {
// Propagate the failure to whoever is reading the outputs.
for _, wr := range outputWrs {
wr.CloseWithError(err)
}
return err
}

for _, wr := range outputWrs {
wr.Close()
}
return nil
}

// LRCConstructAnyType is the DAG node type that generates a GalMultiply op
// rebuilding arbitrary blocks from arbitrary input blocks.
type LRCConstructAnyType struct {
	LRC cdssdk.LRCRedundancy
}

// InitNode does nothing; inputs and outputs are added through AddInput and
// NewOutput.
func (t *LRCConstructAnyType) InitNode(node *dag.Node) {}

// GenerateOp collects the StreamIndex of every input and output stream, asks
// the LRC codec for the matching matrix and builds the GalMultiply op from it.
// NOTE(review): stream props are read through ioswitch2.SProps while other
// code in this package uses ioswitchlrc props — confirm the prop types agree.
func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) {
	var inputIdxs, outputIdxs []int
	for i := range op.InputStreams {
		inputIdxs = append(inputIdxs, ioswitch2.SProps(op.InputStreams[i]).StreamIndex)
	}
	for i := range op.OutputStreams {
		outputIdxs = append(outputIdxs, ioswitch2.SProps(op.OutputStreams[i]).StreamIndex)
	}

	codec, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups)
	if err != nil {
		return nil, err
	}

	matrix, err := codec.GenerateMatrix(inputIdxs, outputIdxs)
	if err != nil {
		return nil, err
	}

	inVars := lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var })
	outVars := lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var })

	return &GalMultiply{
		Coef:      matrix,
		Inputs:    inVars,
		Outputs:   outVars,
		ChunkSize: t.LRC.ChunkSize,
	}, nil
}

// AddInput appends str to the node's input streams and connects it to the new
// slot.
func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar) {
	slot := len(node.InputStreams)
	node.InputStreams = append(node.InputStreams, str)
	str.To(node, slot)
}

// NewOutput adds an output stream whose StreamIndex is dataIndex.
func (t *LRCConstructAnyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar {
	props := &ioswitch2.VarProps{StreamIndex: dataIndex}
	return dag.NodeNewOutputStream(node, props)
}

// String renders the node for diagnostics.
func (t *LRCConstructAnyType) String(node *dag.Node) string {
	streams := formatStreamIO(node)
	values := formatValueIO(node)
	return fmt.Sprintf("LRCAny[]%v%v", streams, values)
}

// LRCConstructGroupType is the DAG node type that generates a GalMultiply op
// rebuilding the block TargetBlockIndex from the other blocks of its group.
type LRCConstructGroupType struct {
	LRC              cdssdk.LRCRedundancy
	TargetBlockIndex int
}

// InitNode creates the single output stream (StreamIndex = TargetBlockIndex)
// and declares as many input streams as Groups[FindGroup(TargetBlockIndex)].
// NOTE(review): the result of FindGroup is used as an index without a check —
// confirm it cannot be out of range for an unknown block index.
func (t *LRCConstructGroupType) InitNode(node *dag.Node) {
	props := &ioswitchlrc.VarProps{StreamIndex: t.TargetBlockIndex}
	dag.NodeNewOutputStream(node, props)

	group := t.LRC.FindGroup(t.TargetBlockIndex)
	dag.NodeDeclareInputStream(node, t.LRC.Groups[group])
}

// GenerateOp asks the LRC codec for the group repair matrix of the target
// block and builds the GalMultiply op from it.
func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) {
	codec, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups)
	if err != nil {
		return nil, err
	}

	matrix, err := codec.GenerateGroupMatrix(t.TargetBlockIndex)
	if err != nil {
		return nil, err
	}

	inVars := lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var })
	outVars := lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var })

	return &GalMultiply{
		Coef:      matrix,
		Inputs:    inVars,
		Outputs:   outVars,
		ChunkSize: t.LRC.ChunkSize,
	}, nil
}

// String renders the node for diagnostics.
func (t *LRCConstructGroupType) String(node *dag.Node) string {
	streams := formatStreamIO(node)
	values := formatValueIO(node)
	return fmt.Sprintf("LRCGroup[]%v%v", streams, values)
}

+ 129
- 0
common/pkgs/ioswitchlrc/ops2/ipfs.go View File

@@ -0,0 +1,129 @@
package ops2

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
)

// init registers the IPFS read/write ops with the exec package.
func init() {
exec.UseOp[*IPFSRead]()
exec.UseOp[*IPFSWrite]()
}

// IPFSRead is the op that opens an IPFS file and exposes it as a stream.
type IPFSRead struct {
Output *exec.StreamVar `json:"output"`
FileHash string `json:"fileHash"`
Option ipfs.ReadOption `json:"option"`
}

// Execute opens FileHash with Option through a pooled IPFS client, publishes
// the file as the output stream and blocks until the reader closes it (or ctx
// is done). The file is closed and the client released when Execute returns.
func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error {
logger.
WithField("FileHash", o.FileHash).
Debugf("ipfs read op")
defer logger.Debugf("ipfs read op finished")

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new ipfs client: %w", err)
}
defer stgglb.IPFSPool.Release(ipfsCli)

file, err := ipfsCli.OpenRead(o.FileHash, o.Option)
if err != nil {
return fmt.Errorf("reading ipfs: %w", err)
}
defer file.Close()

// The future is completed by the close callback of the published stream.
fut := future.NewSetVoid()
o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) {
fut.SetVoid()
})
e.PutVars(o.Output)

return fut.Wait(ctx)
}

// IPFSWrite is the op that stores an input stream as an IPFS file and
// publishes the resulting file hash.
type IPFSWrite struct {
Input *exec.StreamVar `json:"input"`
FileHash *exec.StringVar `json:"fileHash"`
}

// Execute acquires a pooled IPFS client, binds the input stream, creates an
// IPFS file from it and publishes the value returned by CreateFile in
// FileHash. The input is closed and the client released when Execute returns.
func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error {
logger.
WithField("Input", o.Input.ID).
WithField("FileHashVar", o.FileHash.ID).
Debugf("ipfs write op")

ipfsCli, err := stgglb.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new ipfs client: %w", err)
}
defer stgglb.IPFSPool.Release(ipfsCli)

err = e.BindVars(ctx, o.Input)
if err != nil {
return err
}
defer o.Input.Stream.Close()

o.FileHash.Value, err = ipfsCli.CreateFile(o.Input.Stream)
if err != nil {
return fmt.Errorf("creating ipfs file: %w", err)
}

e.PutVars(o.FileHash)

return nil
}

// IPFSReadType is the DAG node type that generates an IPFSRead op.
type IPFSReadType struct {
	FileHash string
	Option   ipfs.ReadOption
}

// InitNode creates the single output stream with empty props.
// NOTE(review): the props are ioswitch2.VarProps although this package lives
// under ioswitchlrc — confirm this is intended.
func (t *IPFSReadType) InitNode(node *dag.Node) {
	props := &ioswitch2.VarProps{}
	dag.NodeNewOutputStream(node, props)
}

// GenerateOp builds the IPFSRead op from the node's first output stream and
// this type's file hash and read option.
func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) {
	read := &IPFSRead{
		Output:   n.OutputStreams[0].Var,
		FileHash: t.FileHash,
		Option:   t.Option,
	}
	return read, nil
}

// String renders the node for diagnostics.
func (t *IPFSReadType) String(node *dag.Node) string {
	streams := formatStreamIO(node)
	values := formatValueIO(node)
	return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, streams, values)
}

// IPFSWriteType is the DAG node type that generates an IPFSWrite op.
type IPFSWriteType struct {
	FileHashStoreKey string
	Range            exec.Range
}

// InitNode declares one input stream and creates one output value.
// NOTE(review): the props are ioswitch2.VarProps although this package lives
// under ioswitchlrc — confirm this is intended.
func (t *IPFSWriteType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)
	dag.NodeNewOutputValue(node, &ioswitch2.VarProps{})
}

// GenerateOp builds the IPFSWrite op from the node's first input stream and
// first output value. It returns an error, instead of panicking, when the
// output value is not backed by an *exec.StringVar.
func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {
	fileHash, ok := op.OutputValues[0].Var.(*exec.StringVar)
	if !ok {
		return nil, fmt.Errorf("ipfs write: output value var is %T, want *exec.StringVar", op.OutputValues[0].Var)
	}

	return &IPFSWrite{
		Input:    op.InputStreams[0].Var,
		FileHash: fileHash,
	}, nil
}

// String renders the node for diagnostics.
func (t *IPFSWriteType) String(node *dag.Node) string {
	// Range.Length is a pointer; formatting it directly would print an address
	// rather than the length.
	length := "<nil>"
	if t.Range.Length != nil {
		length = fmt.Sprintf("%v", *t.Range.Length)
	}

	return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, length, formatStreamIO(node), formatValueIO(node))
}

+ 75
- 0
common/pkgs/ioswitchlrc/ops2/ops.go View File

@@ -0,0 +1,75 @@
package ops2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

// formatStreamIO renders the IDs of a node's input and output streams as
// "S{in1,in2>out1,out2}". A nil stream is rendered as ".". When the node has
// neither input nor output streams the result is the empty string.
func formatStreamIO(node *dag.Node) string {
	var inPart, outPart string

	for idx, stream := range node.InputStreams {
		if idx != 0 {
			inPart += ","
		}
		if stream != nil {
			inPart += fmt.Sprintf("%v", stream.ID)
			continue
		}
		inPart += "."
	}

	for idx, stream := range node.OutputStreams {
		if idx != 0 {
			outPart += ","
		}
		if stream != nil {
			outPart += fmt.Sprintf("%v", stream.ID)
			continue
		}
		outPart += "."
	}

	if inPart != "" || outPart != "" {
		return fmt.Sprintf("S{%s>%s}", inPart, outPart)
	}
	return ""
}

// formatValueIO renders the IDs of a node's input and output values as
// "V{in1,in2>out1,out2}". A nil value is rendered as ".". When the node has
// neither input nor output values the result is the empty string.
func formatValueIO(node *dag.Node) string {
	var inPart, outPart string

	for idx, val := range node.InputValues {
		if idx != 0 {
			inPart += ","
		}
		if val != nil {
			inPart += fmt.Sprintf("%v", val.ID)
			continue
		}
		inPart += "."
	}

	for idx, val := range node.OutputValues {
		if idx != 0 {
			outPart += ","
		}
		if val != nil {
			outPart += fmt.Sprintf("%v", val.ID)
			continue
		}
		outPart += "."
	}

	if inPart != "" || outPart != "" {
		return fmt.Sprintf("V{%s>%s}", inPart, outPart)
	}
	return ""
}

+ 95
- 0
common/pkgs/ioswitchlrc/ops2/range.go View File

@@ -0,0 +1,95 @@
package ops2

import (
"context"
"fmt"
"io"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
)

// init registers the Range op with the exec package.
func init() {
exec.UseOp[*Range]()
}

// Range is the op that exposes a sub-range of its input stream: it skips
// Offset bytes and then passes on Length bytes, or everything when Length is nil.
type Range struct {
Input *exec.StreamVar `json:"input"`
Output *exec.StreamVar `json:"output"`
Offset int64 `json:"offset"`
Length *int64 `json:"length"`
}

// Execute binds the input, discards the first Offset bytes, publishes the
// remaining (optionally length-limited) stream as the output and blocks until
// that stream reaches EOF (or ctx is done). Note that o.Offset is decremented
// in place while skipping.
func (o *Range) Execute(ctx context.Context, e *exec.Executor) error {
err := e.BindVars(ctx, o.Input)
if err != nil {
return err
}
defer o.Input.Stream.Close()

buf := make([]byte, 1024*16)

// Skip the first Offset bytes
for o.Offset > 0 {
rdCnt := math2.Min(o.Offset, int64(len(buf)))
rd, err := o.Input.Stream.Read(buf[:rdCnt])
if err == io.EOF {
// An input shorter than Offset is not an error; it just yields an empty stream
break
}
if err != nil {
return err
}
o.Offset -= int64(rd)
}

fut := future.NewSetVoid()

// Without a length limit the rest of the input is passed through as is.
if o.Length == nil {
o.Output.Stream = io2.AfterEOF(o.Input.Stream, func(closer io.ReadCloser, err error) {
fut.SetVoid()
})

e.PutVars(o.Output)
return fut.Wait(ctx)
}

o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) {
fut.SetVoid()
})

e.PutVars(o.Output)
err = fut.Wait(ctx)
if err != nil {
return err
}

// Consume whatever the input still holds beyond the requested length.
io2.DropWithBuf(o.Input.Stream, buf)
return nil
}

// RangeType is the DAG node type that generates a Range op.
type RangeType struct {
	Range exec.Range
}

// InitNode declares one input stream and creates one output stream.
// NOTE(review): the props are ioswitch2.VarProps although this package lives
// under ioswitchlrc — confirm this is intended.
func (t *RangeType) InitNode(node *dag.Node) {
	dag.NodeDeclareInputStream(node, 1)
	dag.NodeNewOutputStream(node, &ioswitch2.VarProps{})
}

// GenerateOp builds the Range op from the node's first input and output
// streams and this type's offset and length.
func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) {
	return &Range{
		Input:  n.InputStreams[0].Var,
		Output: n.OutputStreams[0].Var,
		Offset: t.Range.Offset,
		Length: t.Range.Length,
	}, nil
}

// String renders the node for diagnostics.
func (t *RangeType) String(node *dag.Node) string {
	// Range.Length is a pointer; formatting it directly would print an address
	// rather than the length.
	length := "<nil>"
	if t.Range.Length != nil {
		length = fmt.Sprintf("%v", *t.Range.Length)
	}

	return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, length, formatStreamIO(node), formatValueIO(node))
}

+ 297
- 0
common/pkgs/ioswitchlrc/parser/generator.go View File

@@ -0,0 +1,297 @@
package parser

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2"
)

// GenerateContext carries the state shared by the plan-building steps: the LRC
// parameters, the DAG under construction, the requested destinations and the
// stream range computed by calcStreamRange.
type GenerateContext struct {
LRC cdssdk.LRCRedundancy
DAG *dag.Graph
Toes []ioswitchlrc.To
StreamRange exec.Range
}

// Encode takes one complete file as input and produces arbitrary blocks from
// it (a complete file can be produced as well). It returns an error when fr is
// not a complete file (data index != -1). The plan is built with
// cdssdk.DefaultLRCRedundancy and emitted into blder.
func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
if fr.GetDataIndex() != -1 {
return fmt.Errorf("from data is not a complete file")
}

ctx := GenerateContext{
LRC: cdssdk.DefaultLRCRedundancy,
DAG: dag.NewGraph(),
Toes: toes,
}

calcStreamRange(&ctx)
err := buildDAGEncode(&ctx, fr, toes)
if err != nil {
return err
}

// Deciding where each instruction runs has to be repeated until nothing changes.
for pin(&ctx) {
}

// The passes below only need to run once, but in this order
dropUnused(&ctx)
storeIPFSWriteResult(&ctx)
generateClone(&ctx)
generateRange(&ctx)

return plan.Generate(ctx.DAG, blder)
}

// buildDAGEncode builds the DAG that turns the complete file fr into every
// destination in toes:
//   - destinations with data index -1 receive the source stream directly;
//   - destinations with an index below K receive the matching output of a
//     chunked split of the source;
//   - all other destinations receive a parity block computed from the split
//     outputs by an LRC construct node.
//
// It returns an error when a node cannot be built or when a destination's data
// index lies outside [-1, N).
func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlrc.To) error {
	frNode, err := buildFromNode(ctx, fr)
	if err != nil {
		return fmt.Errorf("building from node: %w", err)
	}

	var dataToes []ioswitchlrc.To
	var parityToes []ioswitchlrc.To

	// Create the To nodes that need the complete file right away, and sort the
	// remaining destinations by the kind of block they need. Block
	// destinations get their nodes further down, once the producing node exists.
	for _, to := range toes {
		idx := to.GetDataIndex()
		switch {
		case idx < -1 || idx >= ctx.LRC.N:
			return fmt.Errorf("invalid data index %v", idx)

		case idx == -1:
			toNode, err := buildToNode(ctx, to)
			if err != nil {
				return fmt.Errorf("building to node: %w", err)
			}
			frNode.OutputStreams[0].To(toNode, 0)

		case idx < ctx.LRC.K:
			dataToes = append(dataToes, to)

		default:
			parityToes = append(parityToes, to)
		}
	}

	if len(dataToes) == 0 && len(parityToes) == 0 {
		return nil
	}

	// Blocks are needed, so generate the Split instruction and feed it with
	// the source stream.
	splitNode := ctx.DAG.NewNode(&ops2.ChunkedSplitType{
		OutputCount: ctx.LRC.K,
		ChunkSize:   ctx.LRC.ChunkSize,
	}, nil)
	frNode.OutputStreams[0].To(splitNode, 0)

	for _, to := range dataToes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		splitNode.OutputStreams[to.GetDataIndex()].To(toNode, 0)
	}

	if len(parityToes) == 0 {
		return nil
	}

	// Parity blocks are needed, so additionally generate the Construct
	// instruction, fed by every data block.
	conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{
		LRC: ctx.LRC,
	}, nil)

	for _, out := range splitNode.OutputStreams {
		conType.AddInput(conNode, out)
	}

	for _, to := range parityToes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0)
	}
	return nil
}

// ReconstructAny takes k of the data+parity blocks and rebuilds arbitrary
// blocks from them, including the complete file. The plan is built with
// cdssdk.DefaultLRCRedundancy and emitted into blder.
func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
ctx := GenerateContext{
LRC: cdssdk.DefaultLRCRedundancy,
DAG: dag.NewGraph(),
Toes: toes,
}

calcStreamRange(&ctx)
err := buildDAGReconstructAny(&ctx, frs, toes)
if err != nil {
return err
}

// Deciding where each instruction runs has to be repeated until nothing changes.
for pin(&ctx) {
}

// The passes below only need to run once, but in this order
dropUnused(&ctx)
storeIPFSWriteResult(&ctx)
generateClone(&ctx)
generateRange(&ctx)

return plan.Generate(ctx.DAG, blder)
}

// buildDAGReconstructAny builds the DAG that serves every destination in toes
// from the source blocks in frs. A destination whose block is available as a
// source is wired to it directly; missing blocks come from an LRC construct
// node fed by all sources; complete-file destinations come from a chunked join
// of the K data blocks.
func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error {
// Source nodes keyed by data index; a later source with the same index
// replaces an earlier one in the map.
frNodes := make(map[int]*dag.Node)
for _, fr := range frs {
frNode, err := buildFromNode(ctx, fr)
if err != nil {
return fmt.Errorf("building from node: %w", err)
}

frNodes[fr.GetDataIndex()] = frNode
}

var completeToes []ioswitchlrc.To
var missedToes []ioswitchlrc.To

// Wire up the To nodes whose data is directly available, and record which
// complete files and which blocks still have to be produced
for _, to := range toes {
toIdx := to.GetDataIndex()
fr := frNodes[toIdx]
if fr != nil {
node, err := buildToNode(ctx, to)
if err != nil {
return fmt.Errorf("building to node: %w", err)
}

fr.OutputStreams[0].To(node, 0)
continue
}

if toIdx == -1 {
completeToes = append(completeToes, to)
} else {
missedToes = append(missedToes, to)
}
}

if len(completeToes) == 0 && len(missedToes) == 0 {
return nil
}

// Generate the Construct instruction to recover the missing blocks

conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{
LRC: ctx.LRC,
}, nil)

// NOTE(review): map iteration order is random, so the input order of the
// construct node differs between runs — confirm nothing depends on it.
for _, fr := range frNodes {
conType.AddInput(conNode, fr.OutputStreams[0])
}

for _, to := range missedToes {
toNode, err := buildToNode(ctx, to)
if err != nil {
return fmt.Errorf("building to node: %w", err)
}

conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0)
}

if len(completeToes) == 0 {
return nil
}

// The complete file is needed, so generate the Join instruction

joinNode := ctx.DAG.NewNode(&ops2.ChunkedJoinType{
InputCount: ctx.LRC.K,
ChunkSize: ctx.LRC.ChunkSize,
}, nil)

// Every data block comes either from its source node or, when no source
// provides it, from a new output of the construct node.
for i := 0; i < ctx.LRC.K; i++ {
n := frNodes[i]
if n == nil {
conType.NewOutput(conNode, i).To(joinNode, i)
} else {
n.OutputStreams[0].To(joinNode, i)
}
}

for _, to := range completeToes {
toNode, err := buildToNode(ctx, to)
if err != nil {
return fmt.Errorf("building to node: %w", err)
}

joinNode.OutputStreams[0].To(toNode, 0)
}

return nil
}

// ReconstructGroup takes several blocks of the same group and recovers the one
// block that is missing from it. The plan is built with
// cdssdk.DefaultLRCRedundancy and emitted into blder.
func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
ctx := GenerateContext{
LRC: cdssdk.DefaultLRCRedundancy,
DAG: dag.NewGraph(),
Toes: toes,
}

calcStreamRange(&ctx)
err := buildDAGReconstructGroup(&ctx, frs, toes)
if err != nil {
return err
}

// Deciding where each instruction runs has to be repeated until nothing changes.
for pin(&ctx) {
}

// The passes below only need to run once, but in this order
dropUnused(&ctx)
storeIPFSWriteResult(&ctx)
generateClone(&ctx)
generateRange(&ctx)

return plan.Generate(ctx.DAG, blder)
}

// buildDAGReconstructGroup builds the DAG that recovers one block from the
// other blocks of its group. The block to recover is the data index of the
// first destination; every source in frs becomes an input of the group
// construct node (in slice order) and every destination in toes receives the
// node's single output.
//
// It returns an error when toes is empty or when a node cannot be built.
func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error {
	// The target block is taken from the first destination, so at least one
	// destination is required.
	if len(toes) == 0 {
		return fmt.Errorf("no target to reconstruct")
	}
	missedGrpIdx := toes[0].GetDataIndex()

	conNode := ctx.DAG.NewNode(&ops2.LRCConstructGroupType{
		LRC:              ctx.LRC,
		TargetBlockIndex: missedGrpIdx,
	}, nil)

	// NOTE(review): the construct node declares a fixed number of inputs for
	// the group; confirm callers pass exactly that many sources.
	for i, fr := range frs {
		frNode, err := buildFromNode(ctx, fr)
		if err != nil {
			return fmt.Errorf("building from node: %w", err)
		}

		frNode.OutputStreams[0].To(conNode, i)
	}

	for _, to := range toes {
		toNode, err := buildToNode(ctx, to)
		if err != nil {
			return fmt.Errorf("building to node: %w", err)
		}

		conNode.OutputStreams[0].To(toNode, 0)
	}

	return nil
}

+ 309
- 0
common/pkgs/ioswitchlrc/parser/passes.go View File

@@ -0,0 +1,309 @@
package parser

import (
"fmt"
"math"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2"
)

// calcStreamRange computes the range of the input stream that has to be
// opened, rounding it to whole stripes (ChunkSize * K bytes), and stores it in
// ctx.StreamRange.
// NOTE(review): with no destinations the offset stays at math.MaxInt64 —
// confirm callers never pass an empty Toes.
func calcStreamRange(ctx *GenerateContext) {
stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K)

rng := exec.Range{
Offset: math.MaxInt64,
}

for _, to := range ctx.Toes {
if to.GetDataIndex() == -1 {
// The range is relative to the whole file: round outwards to stripe bounds.
toRng := to.GetRange()
rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
if toRng.Length != nil {
rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize))
} else {
rng.Length = nil
}

} else {
// The range is relative to one block: convert chunk indexes into stripe
// offsets of the whole stream.
toRng := to.GetRange()

blkStartIndex := math2.FloorDiv(toRng.Offset, int64(ctx.LRC.ChunkSize))
rng.ExtendStart(blkStartIndex * stripSize)
if toRng.Length != nil {
blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(ctx.LRC.ChunkSize))
rng.ExtendEnd(blkEndIndex * stripSize)
} else {
rng.Length = nil
}
}
}

ctx.StreamRange = rng
}

// buildFromNode creates the DAG node for a source. The read range is derived
// from ctx.StreamRange: a complete-file source (data index -1) uses the stream
// range itself, a block source uses that range scaled down to one block.
// It returns an error for source types other than FromNode and FromDriver.
func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) {
var repRange exec.Range
var blkRange exec.Range

// repRange is the range within the complete file, blkRange the matching
// range within a single block (stripes -> chunks).
repRange.Offset = ctx.StreamRange.Offset
blkRange.Offset = ctx.StreamRange.Offset / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize)
if ctx.StreamRange.Length != nil {
repRngLen := *ctx.StreamRange.Length
repRange.Length = &repRngLen

blkRngLen := *ctx.StreamRange.Length / int64(ctx.LRC.ChunkSize*ctx.LRC.K) * int64(ctx.LRC.ChunkSize)
blkRange.Length = &blkRngLen
}

switch f := f.(type) {
case *ioswitchlrc.FromNode:
// Start with "read everything" and narrow the option below.
n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{
FileHash: f.FileHash,
Option: ipfs.ReadOption{
Offset: 0,
Length: -1,
},
}, &ioswitchlrc.NodeProps{
From: f,
})
// NOTE(review): IPFSReadType.InitNode creates the output with
// ioswitch2.VarProps while this reads it through ioswitchlrc.SProps —
// confirm the two agree.
ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

if f.DataIndex == -1 {
t.Option.Offset = repRange.Offset
if repRange.Length != nil {
t.Option.Length = *repRange.Length
}
} else {
t.Option.Offset = blkRange.Offset
if blkRange.Length != nil {
t.Option.Length = *blkRange.Length
}
}

// A source tied to a node is executed by that node's agent.
if f.Node != nil {
n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node})
}

return n, nil

case *ioswitchlrc.FromDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitchlrc.NodeProps{From: f})
n.Env.ToEnvDriver()
ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

// Tell the driver which part of the data it is expected to supply.
if f.DataIndex == -1 {
f.Handle.RangeHint.Offset = repRange.Offset
f.Handle.RangeHint.Length = repRange.Length
} else {
f.Handle.RangeHint.Offset = blkRange.Offset
f.Handle.RangeHint.Length = blkRange.Length
}

return n, nil

default:
return nil, fmt.Errorf("unsupported from type %T", f)
}
}

// buildToNode creates the DAG node that consumes the stream for destination t.
//
// A *ioswitchlrc.ToNode becomes an IPFS write node carrying the destination's
// FileHashStoreKey and Range. A *ioswitchlrc.ToDriver becomes a ToDriver node
// whose environment is set to the driver. Any other destination type yields
// an error.
func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) {
	switch dst := t.(type) {
	case *ioswitchlrc.ToNode:
		writeType := &ops2.IPFSWriteType{
			FileHashStoreKey: dst.FileHashStoreKey,
			Range:            dst.Range,
		}
		n, _ := dag.NewNode(ctx.DAG, writeType, &ioswitchlrc.NodeProps{
			To: dst,
		})

		return n, nil

	case *ioswitchlrc.ToDriver:
		driverType := &ops.ToDriverType{Handle: dst.Handle, Range: dst.Range}
		n, _ := dag.NewNode(ctx.DAG, driverType, &ioswitchlrc.NodeProps{To: dst})
		n.Env.ToEnvDriver()

		return n, nil

	default:
		return nil, fmt.Errorf("unsupported to type %T", t)
	}
}

// pin determines where each instruction executes from the locations at the
// two ends of its streams.
//
// To-type instructions always have a fixed execution location, and these
// locations spread step by step across the whole DAG as pin is applied, so in
// theory no instruction should be left without a determined location.
//
// For every node the rule is:
//  1. If all destinations of its output streams whose environment is already
//     known agree on one environment, the node takes that environment.
//  2. Otherwise (no known destination, or the known destinations disagree),
//     if all sources of its input streams whose environment is already known
//     agree on one environment, the node takes that one.
//  3. Otherwise the node is left unchanged.
//
// The return value reports whether any node's environment changed during
// this pass.
func pin(ctx *GenerateContext) bool {
	changed := false
	ctx.DAG.Walk(func(node *dag.Node) bool {
		var toEnv *dag.NodeEnv
		// A disagreement between two destinations must end the scan across
		// ALL output streams, not only the stream being examined; otherwise a
		// later stream would select a destination environment again and the
		// conflict would be silently ignored.
	scanDestinations:
		for _, out := range node.OutputStreams {
			for _, to := range out.Toes {
				if to.Node.Env.Type == dag.EnvUnknown {
					continue
				}

				if toEnv == nil {
					toEnv = &to.Node.Env
				} else if !toEnv.Equals(to.Node.Env) {
					toEnv = nil
					break scanDestinations
				}
			}
		}

		if toEnv != nil {
			if !node.Env.Equals(*toEnv) {
				changed = true
			}

			node.Env = *toEnv
			return true
		}

		// Otherwise pin according to where the input streams originate.
		var fromEnv *dag.NodeEnv
		for _, in := range node.InputStreams {
			if in.From.Node.Env.Type == dag.EnvUnknown {
				continue
			}

			if fromEnv == nil {
				fromEnv = &in.From.Node.Env
			} else if !fromEnv.Equals(in.From.Node.Env) {
				fromEnv = nil
				break
			}
		}

		if fromEnv != nil {
			if !node.Env.Equals(*fromEnv) {
				changed = true
			}

			node.Env = *fromEnv
		}
		return true
	})

	return changed
}

// dropUnused attaches a Drop instruction to every output stream that has no
// consumer. The Drop node takes the environment of the node producing the
// stream.
func dropUnused(ctx *GenerateContext) {
	ctx.DAG.Walk(func(node *dag.Node) bool {
		for _, stream := range node.OutputStreams {
			if len(stream.Toes) > 0 {
				// The stream already has at least one consumer.
				continue
			}

			drop := ctx.DAG.NewNode(&ops.DropType{}, &ioswitchlrc.NodeProps{})
			drop.Env = node.Env
			stream.To(drop, 0)
		}
		return true
	})
}

// storeIPFSWriteResult adds a Store instruction for each IPFS write
// instruction that has a non-empty FileHashStoreKey. The Store node runs in
// the driver environment and is fed from the write node's first output value,
// stored under that key.
func storeIPFSWriteResult(ctx *GenerateContext) {
	dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(writer *dag.Node, writeType *ops2.IPFSWriteType) bool {
		if key := writeType.FileHashStoreKey; key != "" {
			store := ctx.DAG.NewNode(&ops.StoreType{
				StoreKey: key,
			}, &ioswitchlrc.NodeProps{})
			store.Env.ToEnvDriver()

			writer.OutputValues[0].To(store, 0)
		}
		return true
	})
}

// generateRange inserts a Range instruction in front of every destination
// node (a node whose props carry a To).
//
// StreamRange may extend beyond the total size of the file, but the Range
// instruction returns normally instead of failing when less data is
// available.
//
// The Range offset is the destination's requested offset made relative to
// where the incoming stream starts:
//   - DataIndex == -1: the stream starts at ctx.StreamRange.Offset;
//   - otherwise: the stream starts at the stripe index of
//     ctx.StreamRange.Offset multiplied by ChunkSize.
//
// The Range node takes the environment of the node producing the stream.
func generateRange(ctx *GenerateContext) {
	ctx.DAG.Walk(func(node *dag.Node) bool {
		dest := ioswitchlrc.NProps(node).To
		if dest == nil {
			return true
		}

		dataIdx := dest.GetDataIndex()
		destRng := dest.GetRange()

		// streamStart is the position at which the incoming stream begins,
		// in the same coordinates as the destination's range.
		streamStart := ctx.StreamRange.Offset
		if dataIdx != -1 {
			stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K)
			streamStart = ctx.StreamRange.Offset / stripSize * int64(ctx.LRC.ChunkSize)
		}

		rangeNode := ctx.DAG.NewNode(&ops2.RangeType{
			Range: exec.Range{
				Offset: destRng.Offset - streamStart,
				Length: destRng.Length,
			},
		}, &ioswitchlrc.NodeProps{})

		// Splice the Range node between the producer and the destination:
		// producer -> rangeNode -> node.
		in := node.InputStreams[0]
		rangeNode.Env = in.From.Node.Env

		in.To(rangeNode, 0)
		in.NotTo(node)
		rangeNode.OutputStreams[0].To(node, 0)

		return true
	})
}

// generateClone inserts Clone instructions wherever a single output has more
// than one consumer.
//
// For each such output stream (or output value) a clone node is created in
// the producer's environment, one new output is created per original
// consumer and connected to that consumer's slot, and the original output is
// rewired to feed only the clone node.
//
// NOTE(review): NewOutput is called with node (the producer), not with the
// newly created clone node — confirm that this is what NewOutput expects.
func generateClone(ctx *GenerateContext) {
	ctx.DAG.Walk(func(node *dag.Node) bool {
		for _, stream := range node.OutputStreams {
			if len(stream.Toes) > 1 {
				cloneNode, cloneType := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitchlrc.NodeProps{})
				cloneNode.Env = node.Env
				for _, dst := range stream.Toes {
					cloneType.NewOutput(node).To(dst.Node, dst.SlotIndex)
				}
				// The clone node becomes the sole consumer of the original stream.
				stream.Toes = nil
				stream.To(cloneNode, 0)
			}
		}

		for _, value := range node.OutputValues {
			if len(value.Toes) > 1 {
				cloneNode, cloneType := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitchlrc.NodeProps{})
				cloneNode.Env = node.Env
				for _, dst := range value.Toes {
					cloneType.NewOutput(node).To(dst.Node, dst.SlotIndex)
				}
				// The clone node becomes the sole consumer of the original value.
				value.Toes = nil
				value.To(cloneNode, 0)
			}
		}

		return true
	})
}

+ 17
- 0
common/pkgs/ioswitchlrc/utils.go View File

@@ -0,0 +1,17 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

// NProps returns the properties attached to DAG node n as *NodeProps.
// It is shorthand for dag.NProps instantiated with *NodeProps.
func NProps(n *dag.Node) *NodeProps {
return dag.NProps[*NodeProps](n)
}

// SProps returns the properties attached to stream variable str as *VarProps.
// It is shorthand for dag.SProps instantiated with *VarProps.
func SProps(str *dag.StreamVar) *VarProps {
return dag.SProps[*VarProps](str)
}

// VProps returns the properties attached to value variable v as *VarProps.
// It is shorthand for dag.VProps instantiated with *VarProps.
func VProps(v *dag.ValueVar) *VarProps {
return dag.VProps[*VarProps](v)
}

Loading…
Cancel
Save