diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 0e8d167..1fb7427 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -10,14 +10,15 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" + lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) func init() { - cmd := &cobra.Command{ + rootCmd.AddCommand(&cobra.Command{ Use: "test2", Short: "test2", // Args: cobra.ExactArgs(1), @@ -36,22 +37,22 @@ func init() { ft := ioswitch2.NewFromTo() - // ft.AddFrom(plans.NewFromNode("Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", &nodes.Nodes[0], -1)) - // ft.AddTo(plans.NewToNode(nodes.Nodes[1], -1, "asd")) - // len := int64(3) - // toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{Offset: 5, Length: &len}) - // ft.AddTo(toExec) - // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 0, "0")) - // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 1, "1")) - // ft.AddTo(plans.NewToNode(nodes.Nodes[1], 2, "2")) - - ft.AddFrom(ioswitch2.NewFromNode("QmS2s8GRYHEurXL7V1zUtKvf2H1BGcQc5NN1T1hiSnWvbd", &nodes.Nodes[0], 1)) - ft.AddFrom(ioswitch2.NewFromNode("QmUgUEUMzdnjPNx6xu9PDGXpSyXTk8wzPWvyYZ9zasE1WW", &nodes.Nodes[1], 2)) + ft.AddFrom(ioswitch2.NewFromNode("Qmf412jQZiuVUtdgnB36FXFX7xg5V6KEbSJ4dpQuhkLyfD", &nodes.Nodes[0], -1)) + ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], -1, "asd")) le := int64(3) toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{Offset: 5, Length: &le}) + ft.AddTo(toExec) + ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], 0, "0")) + 
ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], 1, "1")) + ft.AddTo(ioswitch2.NewToNode(nodes.Nodes[1], 2, "2")) + + // ft.AddFrom(ioswitch2.NewFromNode("QmS2s8GRYHEurXL7V1zUtKvf2H1BGcQc5NN1T1hiSnWvbd", &nodes.Nodes[0], 1)) + // ft.AddFrom(ioswitch2.NewFromNode("QmUgUEUMzdnjPNx6xu9PDGXpSyXTk8wzPWvyYZ9zasE1WW", &nodes.Nodes[1], 2)) + // le := int64(5) + // toExec, hd := ioswitch2.NewToDriverWithRange(-1, exec.Range{Offset: 3, Length: &le}) // toExec, hd := plans.NewToExecutorWithRange(1, plans.Range{Offset: 0, Length: nil}) // toExec2, hd2 := plans.NewToExecutorWithRange(2, plans.Range{Offset: 0, Length: nil}) - ft.AddTo(toExec) + // ft.AddTo(toExec) // ft.AddTo(toExec2) // fromExec, hd := plans.NewFromExecutor(-1) @@ -99,23 +100,151 @@ func init() { fut.Wait(context.TODO()) }, - } + }) + + // rootCmd.AddCommand(&cobra.Command{ + // Use: "test", + // Short: "test", + // // Args: cobra.ExactArgs(1), + // Run: func(cmd *cobra.Command, args []string) { + // cmdCtx := GetCmdCtx(cmd) + // file, _ := cmdCtx.Cmdline.Svc.ObjectSvc().Download(1, downloader.DownloadReqeust{ + // ObjectID: 27379, + // Length: -1, + // }) + // data, _ := io.ReadAll(file.File) + // fmt.Printf("data: %v(%v)\n", string(data), len(data)) + // }, + // }) + + rootCmd.AddCommand(&cobra.Command{ + Use: "test3", + Short: "test3", + // Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + // cmdCtx := GetCmdCtx(cmd) + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + panic(err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + nodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{1, 2})) + if err != nil { + panic(err) + } + + red := cdssdk.DefaultLRCRedundancy + + var toes []ioswitchlrc.To + for i := 0; i < red.N; i++ { + toes = append(toes, ioswitchlrc.NewToNode(nodes.Nodes[i%2], i, fmt.Sprintf("%d", i))) + } + + plans := exec.NewPlanBuilder() + err = lrcparser.Encode(ioswitchlrc.NewFromNode("QmNspjDLxQbAsuh37jRXKvLWHE2f7JpqY4HEJ8x7Jgbzqa", 
&nodes.Nodes[0], -1), toes, plans) + if err != nil { + panic(err) + // return nil, fmt.Errorf("parsing plan: %w", err) + } + + ioRet, err := plans.Execute().Wait(context.TODO()) + if err != nil { + panic(err) + // return nil, fmt.Errorf("executing io plan: %w", err) + } + + fmt.Printf("ioRet: %v\n", ioRet) + }, + }) - cmd2 := &cobra.Command{ + rootCmd.AddCommand(&cobra.Command{ Use: "test", Short: "test", // Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { - cmdCtx := GetCmdCtx(cmd) - file, _ := cmdCtx.Cmdline.Svc.ObjectSvc().Download(1, downloader.DownloadReqeust{ - ObjectID: 27379, - Length: -1, - }) - data, _ := io.ReadAll(file.File) - fmt.Printf("data: %v(%v)\n", string(data), len(data)) + // cmdCtx := GetCmdCtx(cmd) + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + panic(err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + nodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{1, 2})) + if err != nil { + panic(err) + } + + // red := cdssdk.DefaultLRCRedundancy + + plans := exec.NewPlanBuilder() + err = lrcparser.ReconstructGroup([]ioswitchlrc.From{ + ioswitchlrc.NewFromNode("QmVAZzVQEvnvTvzSz2SvpziAcDSQ8aYCoTyGrZNuV8raEQ", &nodes.Nodes[1], 0), + ioswitchlrc.NewFromNode("QmVAZzVQEvnvTvzSz2SvpziAcDSQ8aYCoTyGrZNuV8raEQ", &nodes.Nodes[1], 1), + }, []ioswitchlrc.To{ + ioswitchlrc.NewToNode(nodes.Nodes[1], 3, "3"), + }, plans) + if err != nil { + panic(err) + // return nil, fmt.Errorf("parsing plan: %w", err) + } + + ioRet, err := plans.Execute().Wait(context.TODO()) + if err != nil { + panic(err) + // return nil, fmt.Errorf("executing io plan: %w", err) + } + + fmt.Printf("ioRet: %v\n", ioRet) }, - } + }) + + rootCmd.AddCommand(&cobra.Command{ + Use: "test4", + Short: "test4", + // Args: cobra.ExactArgs(1), + Run: func(cmd *cobra.Command, args []string) { + // cmdCtx := GetCmdCtx(cmd) - rootCmd.AddCommand(cmd) - rootCmd.AddCommand(cmd2) + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if 
err != nil { + panic(err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + nodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{1, 2})) + if err != nil { + panic(err) + } + + // red := cdssdk.DefaultLRCRedundancy + + plans := exec.NewPlanBuilder() + le := int64(1293) + err = lrcparser.ReconstructAny([]ioswitchlrc.From{ + ioswitchlrc.NewFromNode("QmVAZzVQEvnvTvzSz2SvpziAcDSQ8aYCoTyGrZNuV8raEQ", &nodes.Nodes[0], 0), + ioswitchlrc.NewFromNode("QmQBKncEDqxw3BrGr3th3gS3jUC2fizGz1w29ZxxrrKfNv", &nodes.Nodes[0], 2), + }, []ioswitchlrc.To{ + ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], -1, "-1", exec.Range{0, &le}), + ioswitchlrc.NewToNode(nodes.Nodes[1], 0, "0"), + ioswitchlrc.NewToNode(nodes.Nodes[1], 1, "1"), + ioswitchlrc.NewToNode(nodes.Nodes[1], 2, "2"), + ioswitchlrc.NewToNode(nodes.Nodes[1], 3, "3"), + }, plans) + if err != nil { + panic(err) + // return nil, fmt.Errorf("parsing plan: %w", err) + } + + ioRet, err := plans.Execute().Wait(context.TODO()) + if err != nil { + panic(err) + // return nil, fmt.Errorf("executing io plan: %w", err) + } + + fmt.Printf("ioRet: %v\n", ioRet) + }, + }) } diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index b25c59c..ff6cf3c 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -151,6 +151,18 @@ func (iter *DownloadObjectIterator) doMove() (*Downloading, error) { return nil, fmt.Errorf("downloading ec object: %w", err) } + return &Downloading{ + Object: &req.Detail.Object, + File: reader, + Request: req.Raw, + }, nil + + case *cdssdk.LRCRedundancy: + reader, err := iter.downloadLRCObject(req, red) + if err != nil { + return nil, fmt.Errorf("downloading lrc object: %w", err) + } + return &Downloading{ Object: &req.Detail.Object, File: reader, diff --git a/common/pkgs/downloader/lrc.go b/common/pkgs/downloader/lrc.go new file mode 100644 index 0000000..843bd9e --- /dev/null +++ b/common/pkgs/downloader/lrc.go @@ -0,0 +1,87 @@ 
+package downloader + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/iterator" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" +) + +func (iter *DownloadObjectIterator) downloadLRCObject(req downloadReqeust2, red *cdssdk.LRCRedundancy) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(req) + if err != nil { + return nil, err + } + + var blocks []downloadBlock + selectedBlkIdx := make(map[int]bool) + for _, node := range allNodes { + for _, b := range node.Blocks { + if b.Index >= red.M() || selectedBlkIdx[b.Index] { + continue + } + blocks = append(blocks, downloadBlock{ + Node: node.Node, + Block: b, + }) + selectedBlkIdx[b.Index] = true + } + } + if len(blocks) < red.K { + return nil, fmt.Errorf("not enough blocks to download lrc object") + } + + var logStrs []any = []any{"downloading lrc object from blocks: "} + for i, b := range blocks { + if i > 0 { + logStrs = append(logStrs, ", ") + } + logStrs = append(logStrs, fmt.Sprintf("%v@%v(%v)", b.Block.Index, b.Node.Name, b.Node.NodeID)) + } + logger.Debug(logStrs...) 
+ + pr, pw := io.Pipe() + go func() { + readPos := req.Raw.Offset + totalReadLen := req.Detail.Object.Size - req.Raw.Offset + if req.Raw.Length >= 0 { + totalReadLen = math2.Min(req.Raw.Length, totalReadLen) + } + + firstStripIndex := readPos / int64(red.K) / int64(red.ChunkSize) + stripIter := NewLRCStripIterator(req.Detail.Object, blocks, red, firstStripIndex, iter.downloader.strips, iter.downloader.cfg.ECStripPrefetchCount) + defer stripIter.Close() + + for totalReadLen > 0 { + strip, err := stripIter.MoveNext() + if err == iterator.ErrNoMoreItem { + pw.CloseWithError(io.ErrUnexpectedEOF) + return + } + if err != nil { + pw.CloseWithError(err) + return + } + + readRelativePos := readPos - strip.Position + nextStripPos := strip.Position + int64(red.K)*int64(red.ChunkSize) + curReadLen := math2.Min(totalReadLen, nextStripPos-readPos) + + err = io2.WriteAll(pw, strip.Data[readRelativePos:readRelativePos+curReadLen]) + if err != nil { + pw.CloseWithError(err) + return + } + + totalReadLen -= curReadLen + readPos += curReadLen + } + pw.Close() + }() + + return pr, nil +} diff --git a/common/pkgs/downloader/lrc_strip_iterator.go b/common/pkgs/downloader/lrc_strip_iterator.go new file mode 100644 index 0000000..1309014 --- /dev/null +++ b/common/pkgs/downloader/lrc_strip_iterator.go @@ -0,0 +1,191 @@ +package downloader + +import ( + "context" + "io" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/iterator" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser" +) + +type LRCStripIterator struct { + object cdssdk.Object + blocks []downloadBlock + red *cdssdk.LRCRedundancy + curStripIndex int64 + cache *StripCache + dataChan chan dataChanEntry + downloadingDone chan any + downloadingDoneOnce sync.Once + inited bool +} + +func 
NewLRCStripIterator(object cdssdk.Object, blocks []downloadBlock, red *cdssdk.LRCRedundancy, beginStripIndex int64, cache *StripCache, maxPrefetch int) *LRCStripIterator { + if maxPrefetch <= 0 { + maxPrefetch = 1 + } + + iter := &LRCStripIterator{ + object: object, + blocks: blocks, + red: red, + curStripIndex: beginStripIndex, + cache: cache, + dataChan: make(chan dataChanEntry, maxPrefetch-1), + downloadingDone: make(chan any), + } + + return iter +} + +func (s *LRCStripIterator) MoveNext() (Strip, error) { + if !s.inited { + go s.downloading() + s.inited = true + } + + // 先尝试获取一下,用于判断本次获取是否发生了等待 + select { + case entry, ok := <-s.dataChan: + if !ok || entry.Error == io.EOF { + return Strip{}, iterator.ErrNoMoreItem + } + + if entry.Error != nil { + return Strip{}, entry.Error + } + + s.curStripIndex++ + return Strip{Data: entry.Data, Position: entry.Position}, nil + + default: + logger.Debugf("waitting for ec strip %v of object %v", s.curStripIndex, s.object.ObjectID) + } + + // 发生了等待 + select { + case entry, ok := <-s.dataChan: + if !ok || entry.Error == io.EOF { + return Strip{}, iterator.ErrNoMoreItem + } + + if entry.Error != nil { + return Strip{}, entry.Error + } + + s.curStripIndex++ + return Strip{Data: entry.Data, Position: entry.Position}, nil + + case <-s.downloadingDone: + return Strip{}, iterator.ErrNoMoreItem + } +} + +func (s *LRCStripIterator) Close() { + s.downloadingDoneOnce.Do(func() { + close(s.downloadingDone) + }) +} + +func (s *LRCStripIterator) downloading() { + var froms []ioswitchlrc.From + for _, b := range s.blocks { + froms = append(froms, ioswitchlrc.NewFromNode(b.Block.FileHash, &b.Node, b.Block.Index)) + } + + toExec, hd := ioswitchlrc.NewToDriverWithRange(-1, exec.Range{ + Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K), + }) + + plans := exec.NewPlanBuilder() + err := parser.ReconstructAny(froms, []ioswitchlrc.To{toExec}, plans) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + return + } + exec := 
plans.Execute() + + ctx, cancel := context.WithCancel(context.Background()) + go exec.Wait(ctx) + defer cancel() + + str, err := exec.BeginRead(hd) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + return + } + + curStripIndex := s.curStripIndex +loop: + for { + stripBytesPos := curStripIndex * int64(s.red.K) * int64(s.red.ChunkSize) + if stripBytesPos >= s.object.Size { + s.sendToDataChan(dataChanEntry{Error: io.EOF}) + break + } + + stripKey := ECStripKey{ + ObjectID: s.object.ObjectID, + StripIndex: curStripIndex, + } + + item, ok := s.cache.Get(stripKey) + if ok { + if item.ObjectFileHash == s.object.FileHash { + if !s.sendToDataChan(dataChanEntry{Data: item.Data, Position: stripBytesPos}) { + break loop + } + curStripIndex++ + continue + + } else { + // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载 + s.cache.Remove(stripKey) + } + } + + dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize)) + n, err := io.ReadFull(str, dataBuf) + if err == io.ErrUnexpectedEOF { + s.cache.Add(stripKey, ObjectECStrip{ + Data: dataBuf, + ObjectFileHash: s.object.FileHash, + }) + + s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos}) + s.sendToDataChan(dataChanEntry{Error: io.EOF}) + break loop + } + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + break loop + } + + s.cache.Add(stripKey, ObjectECStrip{ + Data: dataBuf, + ObjectFileHash: s.object.FileHash, + }) + + if !s.sendToDataChan(dataChanEntry{Data: dataBuf, Position: stripBytesPos}) { + break loop + } + + curStripIndex++ + } + + close(s.dataChan) +} + +func (s *LRCStripIterator) sendToDataChan(entry dataChanEntry) bool { + select { + case s.dataChan <- entry: + return true + case <-s.downloadingDone: + return false + } +} diff --git a/common/pkgs/ec/lrc/lrc.go b/common/pkgs/ec/lrc/lrc.go index 6ef6c5a..586dd1f 100644 --- a/common/pkgs/ec/lrc/lrc.go +++ b/common/pkgs/ec/lrc/lrc.go @@ -16,7 +16,7 @@ func New(n int, k int, groups []int) (*LRC, error) { groups: groups, } - 
l, err := reedsolomon.NewLRC(k, n-k, groups) + l, err := reedsolomon.NewLRC(k, n-len(groups)-k, groups) if err != nil { return nil, err } diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index 3533a55..0962e11 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "golang.org/x/sync/semaphore" @@ -50,6 +51,16 @@ func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { return sem.Acquire(ctx, int64(len(outputs))) } +func (o *ChunkedSplit) String() string { + return fmt.Sprintf( + "ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)", + o.ChunkSize, + o.PaddingZeros, + o.Input.ID, + utils.FormatVarIDs(o.Outputs), + ) +} + type ChunkedJoin struct { Inputs []*exec.StreamVar `json:"inputs"` Output *exec.StreamVar `json:"output"` @@ -81,6 +92,15 @@ func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { return fut.Wait(ctx) } +func (o *ChunkedJoin) String() string { + return fmt.Sprintf( + "ChunkedJoin(chunkSize=%v), (%v) -> %v", + o.ChunkSize, + utils.FormatVarIDs(o.Inputs), + o.Output.ID, + ) +} + type ChunkedSplitType struct { OutputCount int ChunkSize int diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index d577b53..45129a8 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -8,6 +8,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" 
"golang.org/x/sync/semaphore" ) @@ -18,30 +19,34 @@ func init() { } type CloneStream struct { - Input *exec.StreamVar `json:"input"` - Outputs []*exec.StreamVar `json:"outputs"` + Raw *exec.StreamVar `json:"raw"` + Cloneds []*exec.StreamVar `json:"cloneds"` } func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) + err := e.BindVars(ctx, o.Raw) if err != nil { return err } - defer o.Input.Stream.Close() + defer o.Raw.Stream.Close() - cloned := io2.Clone(o.Input.Stream, len(o.Outputs)) + cloned := io2.Clone(o.Raw.Stream, len(o.Cloneds)) - sem := semaphore.NewWeighted(int64(len(o.Outputs))) + sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { sem.Acquire(ctx, 1) - o.Outputs[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { sem.Release(1) }) } - exec.PutArrayVars(e, o.Outputs) + exec.PutArrayVars(e, o.Cloneds) + + return sem.Acquire(ctx, int64(len(o.Cloneds))) +} - return sem.Acquire(ctx, int64(len(o.Outputs))) +func (o *CloneStream) String() string { + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.ID, utils.FormatVarIDs(o.Cloneds)) } type CloneVar struct { @@ -65,6 +70,10 @@ func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *CloneVar) String() string { + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds)) +} + type CloneStreamType struct{} func (t *CloneStreamType) InitNode(node *dag.Node) { @@ -73,8 +82,8 @@ func (t *CloneStreamType) InitNode(node *dag.Node) { func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { return &CloneStream{ - Input: op.InputStreams[0].Var, - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Raw: op.InputStreams[0].Var, + Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), 
}, nil diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 192d79d..32f8163 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" @@ -18,8 +19,8 @@ import ( ) func init() { - exec.UseOp[*ECReconstructAny]() - exec.UseOp[*ECReconstruct]() + // exec.UseOp[*ECReconstructAny]() + // exec.UseOp[*ECReconstruct]() exec.UseOp[*ECMultiply]() } @@ -194,6 +195,15 @@ func (o *ECMultiply) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *ECMultiply) String() string { + return fmt.Sprintf( + "ECMultiply(coef=%v) (%v) -> (%v)", + o.Coef, + utils.FormatVarIDs(o.Inputs), + utils.FormatVarIDs(o.Outputs), + ) +} + type MultiplyType struct { EC cdssdk.ECRedundancy InputIndexes []int diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index cc6f46c..b6bd994 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -51,6 +51,10 @@ func (o *FileWrite) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *FileWrite) String() string { + return fmt.Sprintf("FileWrite %v -> %s", o.Input.ID, o.FilePath) +} + type FileRead struct { Output *exec.StreamVar `json:"output"` FilePath string `json:"filePath"` @@ -72,6 +76,10 @@ func (o *FileRead) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *FileRead) String() string { + return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output.ID) +} + type FileReadType struct { FilePath string } diff --git a/common/pkgs/ioswitch2/ops2/ipfs.go b/common/pkgs/ioswitch2/ops2/ipfs.go 
index 303157a..d6604e7 100644 --- a/common/pkgs/ioswitch2/ops2/ipfs.go +++ b/common/pkgs/ioswitch2/ops2/ipfs.go @@ -53,6 +53,10 @@ func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error { return fut.Wait(ctx) } +func (o *IPFSRead) String() string { + return fmt.Sprintf("IPFSRead %v -> %v", o.FileHash, o.Output.ID) +} + type IPFSWrite struct { Input *exec.StreamVar `json:"input"` FileHash *exec.StringVar `json:"fileHash"` @@ -86,6 +90,10 @@ func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *IPFSWrite) String() string { + return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) +} + type IPFSReadType struct { FileHash string Option ipfs.ReadOption diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 383b3c0..8e68238 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -72,6 +72,10 @@ func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *Range) String() string { + return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) +} + type RangeType struct { Range exec.Range } diff --git a/common/pkgs/ioswitchlrc/agent_worker.go b/common/pkgs/ioswitchlrc/agent_worker.go index 67b084f..81b280f 100644 --- a/common/pkgs/ioswitchlrc/agent_worker.go +++ b/common/pkgs/ioswitchlrc/agent_worker.go @@ -5,11 +5,17 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/types" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" ) +var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( + (*AgentWorker)(nil), +))) + type AgentWorker struct { Node cdssdk.Node } diff --git 
a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index 18090a4..33ae0e6 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -9,8 +9,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" "golang.org/x/sync/semaphore" ) @@ -50,6 +51,16 @@ func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { return sem.Acquire(ctx, int64(len(outputs))) } +func (o *ChunkedSplit) String() string { + return fmt.Sprintf( + "ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)", + o.ChunkSize, + o.PaddingZeros, + o.Input.ID, + utils.FormatVarIDs(o.Outputs), + ) +} + type ChunkedJoin struct { Inputs []*exec.StreamVar `json:"inputs"` Output *exec.StreamVar `json:"output"` @@ -81,6 +92,15 @@ func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { return fut.Wait(ctx) } +func (o *ChunkedJoin) String() string { + return fmt.Sprintf( + "ChunkedJoin(chunkSize=%v), (%v) -> %v", + o.ChunkSize, + utils.FormatVarIDs(o.Inputs), + o.Output.ID, + ) +} + type ChunkedSplitType struct { OutputCount int ChunkSize int @@ -89,7 +109,7 @@ type ChunkedSplitType struct { func (t *ChunkedSplitType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) for i := 0; i < t.OutputCount; i++ { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{ + dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ StreamIndex: i, }) } @@ -117,7 +137,7 @@ type ChunkedJoinType struct { func (t *ChunkedJoinType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, t.InputCount) - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{ + dag.NodeNewOutputStream(node, 
&ioswitchlrc.VarProps{ StreamIndex: -1, }) } diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index d577b53..45129a8 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -8,6 +8,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" "golang.org/x/sync/semaphore" ) @@ -18,30 +19,34 @@ func init() { } type CloneStream struct { - Input *exec.StreamVar `json:"input"` - Outputs []*exec.StreamVar `json:"outputs"` + Raw *exec.StreamVar `json:"raw"` + Cloneds []*exec.StreamVar `json:"cloneds"` } func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) + err := e.BindVars(ctx, o.Raw) if err != nil { return err } - defer o.Input.Stream.Close() + defer o.Raw.Stream.Close() - cloned := io2.Clone(o.Input.Stream, len(o.Outputs)) + cloned := io2.Clone(o.Raw.Stream, len(o.Cloneds)) - sem := semaphore.NewWeighted(int64(len(o.Outputs))) + sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { sem.Acquire(ctx, 1) - o.Outputs[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { sem.Release(1) }) } - exec.PutArrayVars(e, o.Outputs) + exec.PutArrayVars(e, o.Cloneds) + + return sem.Acquire(ctx, int64(len(o.Cloneds))) +} - return sem.Acquire(ctx, int64(len(o.Outputs))) +func (o *CloneStream) String() string { + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.ID, utils.FormatVarIDs(o.Cloneds)) } type CloneVar struct { @@ -65,6 +70,10 @@ func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *CloneVar) String() string { + return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), 
utils.FormatVarIDs(o.Cloneds)) +} + type CloneStreamType struct{} func (t *CloneStreamType) InitNode(node *dag.Node) { @@ -73,8 +82,8 @@ func (t *CloneStreamType) InitNode(node *dag.Node) { func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { return &CloneStream{ - Input: op.InputStreams[0].Var, - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Raw: op.InputStreams[0].Var, + Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), }, nil diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 7d644d4..01ed734 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -9,12 +9,12 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ec/lrc" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) @@ -106,27 +106,29 @@ func (o *GalMultiply) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *GalMultiply) String() string { + return fmt.Sprintf( + "ECMultiply(coef=%v) (%v) -> (%v)", + o.Coef, + utils.FormatVarIDs(o.Inputs), + utils.FormatVarIDs(o.Outputs), + ) +} + type LRCConstructAnyType struct { - LRC cdssdk.LRCRedundancy + LRC cdssdk.LRCRedundancy + InputIndexes []int + OutputIndexes []int } func (t *LRCConstructAnyType) InitNode(node *dag.Node) {} func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) { - var inputIdxs []int - var outputIdxs []int - for _, in := range op.InputStreams { - inputIdxs = 
append(inputIdxs, ioswitch2.SProps(in).StreamIndex) - } - for _, out := range op.OutputStreams { - outputIdxs = append(outputIdxs, ioswitch2.SProps(out).StreamIndex) - } - l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups) if err != nil { return nil, err } - coef, err := l.GenerateMatrix(inputIdxs, outputIdxs) + coef, err := l.GenerateMatrix(t.InputIndexes, t.OutputIndexes) if err != nil { return nil, err } @@ -139,13 +141,23 @@ func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) { }, nil } -func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar) { +func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) { + t.InputIndexes = append(t.InputIndexes, dataIndex) node.InputStreams = append(node.InputStreams, str) str.To(node, len(node.InputStreams)-1) } +func (t *LRCConstructAnyType) RemoveAllInputs(n *dag.Node) { + for _, in := range n.InputStreams { + in.From.Node.OutputStreams[in.From.SlotIndex].NotTo(n) + } + n.InputStreams = nil + t.InputIndexes = nil +} + func (t *LRCConstructAnyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { - return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex}) + t.OutputIndexes = append(t.OutputIndexes, dataIndex) + return dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{StreamIndex: dataIndex}) } func (t *LRCConstructAnyType) String(node *dag.Node) string { diff --git a/common/pkgs/ioswitchlrc/ops2/ipfs.go b/common/pkgs/ioswitchlrc/ops2/ipfs.go index 303157a..3ccdc3d 100644 --- a/common/pkgs/ioswitchlrc/ops2/ipfs.go +++ b/common/pkgs/ioswitchlrc/ops2/ipfs.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -53,6 +53,10 @@ func (o *IPFSRead) Execute(ctx 
context.Context, e *exec.Executor) error { return fut.Wait(ctx) } +func (o *IPFSRead) String() string { + return fmt.Sprintf("IPFSRead %v -> %v", o.FileHash, o.Output.ID) +} + type IPFSWrite struct { Input *exec.StreamVar `json:"input"` FileHash *exec.StringVar `json:"fileHash"` @@ -86,13 +90,17 @@ func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *IPFSWrite) String() string { + return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) +} + type IPFSReadType struct { FileHash string Option ipfs.ReadOption } func (t *IPFSReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{}) } func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) { @@ -114,7 +122,7 @@ type IPFSWriteType struct { func (t *IPFSWriteType) InitNode(node *dag.Node) { dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitch2.VarProps{}) + dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitchlrc.VarProps{}) } func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index 383b3c0..5d97155 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -72,13 +72,17 @@ func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { return nil } +func (o *Range) String() string { + return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) +} + type RangeType struct { Range exec.Range } func (t *RangeType) InitNode(node *dag.Node) { 
dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) + dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{}) } func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) { diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index 97b4b8c..cabe84b 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -60,18 +60,15 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr // 先创建需要完整文件的To节点,同时统计一下需要哪些文件块 for _, to := range toes { - if to.GetDataIndex() != -1 { - continue - } - - toNode, err := buildToNode(ctx, to) - if err != nil { - return fmt.Errorf("building to node: %w", err) - } - idx := to.GetDataIndex() if idx == -1 { + toNode, err := buildToNode(ctx, to) + if err != nil { + return fmt.Errorf("building to node: %w", err) + } + frNode.OutputStreams[0].To(toNode, 0) + } else if idx < ctx.LRC.K { dataToes = append(dataToes, to) } else { @@ -87,7 +84,8 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr splitNode := ctx.DAG.NewNode(&ops2.ChunkedSplitType{ OutputCount: ctx.LRC.K, ChunkSize: ctx.LRC.ChunkSize, - }, nil) + }, &ioswitchlrc.NodeProps{}) + frNode.OutputStreams[0].To(splitNode, 0) for _, to := range dataToes { toNode, err := buildToNode(ctx, to) @@ -106,10 +104,10 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ LRC: ctx.LRC, - }, nil) + }, &ioswitchlrc.NodeProps{}) for _, out := range splitNode.OutputStreams { - conType.AddInput(conNode, out) + conType.AddInput(conNode, out, ioswitchlrc.SProps(out).StreamIndex) } for _, to := range parityToes { @@ -193,10 +191,10 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ LRC: ctx.LRC, - }, nil) + }, 
&ioswitchlrc.NodeProps{}) for _, fr := range frNodes { - conType.AddInput(conNode, fr.OutputStreams[0]) + conType.AddInput(conNode, fr.OutputStreams[0], ioswitchlrc.SProps(fr.OutputStreams[0]).StreamIndex) } for _, to := range missedToes { @@ -217,7 +215,7 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ joinNode := ctx.DAG.NewNode(&ops2.ChunkedJoinType{ InputCount: ctx.LRC.K, ChunkSize: ctx.LRC.ChunkSize, - }, nil) + }, &ioswitchlrc.NodeProps{}) for i := 0; i < ctx.LRC.K; i++ { n := frNodes[i] @@ -237,6 +235,12 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ joinNode.OutputStreams[0].To(toNode, 0) } + // 如果不需要Construct任何块,则删除这个节点 + if len(conNode.OutputStreams) == 0 { + conType.RemoveAllInputs(conNode) + ctx.DAG.RemoveNode(conNode) + } + return nil } @@ -273,7 +277,7 @@ func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes conNode := ctx.DAG.NewNode(&ops2.LRCConstructGroupType{ LRC: ctx.LRC, TargetBlockIndex: missedGrpIdx, - }, nil) + }, &ioswitchlrc.NodeProps{}) for i, fr := range frs { frNode, err := buildFromNode(ctx, fr) diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 984cae2..d59f54f 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -225,12 +225,12 @@ func storeIPFSWriteResult(ctx *GenerateContext) { return true } - n := ctx.DAG.NewNode(&ops.StoreType{ + n, t := dag.NewNode(ctx.DAG, &ops.StoreType{ StoreKey: typ.FileHashStoreKey, }, &ioswitchlrc.NodeProps{}) n.Env.ToEnvDriver() - node.OutputValues[0].To(n, 0) + t.Store(n, node.OutputValues[0]) return true }) } @@ -293,7 +293,9 @@ func generateClone(ctx *GenerateContext) { n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitchlrc.NodeProps{}) n.Env = node.Env for _, to := range out.Toes { - t.NewOutput(node).To(to.Node, to.SlotIndex) + str := t.NewOutput(n) + str.Props = 
&ioswitchlrc.VarProps{StreamIndex: ioswitchlrc.SProps(out).StreamIndex} + str.To(to.Node, to.SlotIndex) } out.Toes = nil out.To(n, 0) diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 066495e..92e42d0 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -16,6 +16,8 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" + lrcparser "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/parser" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" @@ -91,15 +93,15 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { return } - allNodes := make(map[cdssdk.NodeID]*NodeLoadInfo) + userAllNodes := make(map[cdssdk.NodeID]*NodeLoadInfo) for _, node := range getNodes.Nodes { - allNodes[node.NodeID] = &NodeLoadInfo{ + userAllNodes[node.NodeID] = &NodeLoadInfo{ Node: node, } } for _, log := range getLogs.Logs { - info, ok := allNodes[log.Storage.NodeID] + info, ok := userAllNodes[log.Storage.NodeID] if !ok { continue } @@ -118,10 +120,11 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { defEC := cdssdk.DefaultECRedundancy // TODO 目前rep的备份数量固定为2,所以这里直接选出两个节点 + // TODO 放到chooseRedundancy函数中 mostBlockNodeIDs := t.summaryRepObjectBlockNodes(getObjs.Objects, 2) - newRepNodes := t.chooseNewNodesForRep(&defRep, allNodes) - rechoosedRepNodes := t.rechooseNodesForRep(mostBlockNodeIDs, &defRep, allNodes) - newECNodes := t.chooseNewNodesForEC(&defEC, allNodes) + newRepNodes := t.chooseNewNodesForRep(&defRep, userAllNodes) + rechoosedRepNodes := 
t.rechooseNodesForRep(mostBlockNodeIDs, &defRep, userAllNodes) + newECNodes := t.chooseNewNodesForEC(&defEC, userAllNodes) // 加锁 builder := reqbuilder.NewBuilder() @@ -142,33 +145,50 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { var updating *coormq.UpdatingObjectRedundancy var err error - shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold + newRed, selectedNodes := t.chooseRedundancy(obj, userAllNodes) - switch red := obj.Object.Redundancy.(type) { + switch srcRed := obj.Object.Redundancy.(type) { case *cdssdk.NoneRedundancy: - if shouldUseEC { - log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec") - updating, err = t.noneToEC(obj, &defEC, newECNodes) - } else { + switch newRed := newRed.(type) { + case *cdssdk.RepRedundancy: log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep") - updating, err = t.noneToRep(obj, &defRep, newRepNodes) + updating, err = t.noneToRep(obj, newRed, newRepNodes) + + case *cdssdk.ECRedundancy: + log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec") + updating, err = t.noneToEC(obj, newRed, newECNodes) + + case *cdssdk.LRCRedundancy: + log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> lrc") + updating, err = t.noneToLRC(obj, newRed, selectedNodes) } case *cdssdk.RepRedundancy: - if shouldUseEC { + switch newRed := newRed.(type) { + case *cdssdk.RepRedundancy: + updating, err = t.repToRep(obj, srcRed, rechoosedRepNodes) + + case *cdssdk.ECRedundancy: log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") - updating, err = t.repToEC(obj, &defEC, newECNodes) - } else { - updating, err = t.repToRep(obj, &defRep, rechoosedRepNodes) + updating, err = t.repToEC(obj, newRed, newECNodes) } case *cdssdk.ECRedundancy: - if shouldUseEC { - uploadNodes := t.rechooseNodesForEC(obj, red, allNodes) - updating, err = t.ecToEC(obj, red, &defEC, uploadNodes) - } else { + switch newRed := 
newRed.(type) { + case *cdssdk.RepRedundancy: log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep") - updating, err = t.ecToRep(obj, red, &defRep, newRepNodes) + updating, err = t.ecToRep(obj, srcRed, newRed, newRepNodes) + + case *cdssdk.ECRedundancy: + uploadNodes := t.rechooseNodesForEC(obj, srcRed, userAllNodes) + updating, err = t.ecToEC(obj, srcRed, newRed, uploadNodes) + } + + case *cdssdk.LRCRedundancy: + switch newRed := newRed.(type) { + case *cdssdk.LRCRedundancy: + uploadNodes := t.rechooseNodesForLRC(obj, srcRed, userAllNodes) + updating, err = t.lrcToLRC(obj, srcRed, newRed, uploadNodes) } } @@ -192,6 +212,20 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } } +func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllNodes map[cdssdk.NodeID]*NodeLoadInfo) (cdssdk.Redundancy, []*NodeLoadInfo) { + switch obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + newLRCNodes := t.chooseNewNodesForLRC(&cdssdk.DefaultLRCRedundancy, userAllNodes) + return &cdssdk.DefaultLRCRedundancy, newLRCNodes + + case *cdssdk.LRCRedundancy: + newLRCNodes := t.rechooseNodesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllNodes) + return &cdssdk.DefaultLRCRedundancy, newLRCNodes + + } + return nil, nil +} + // 统计每个对象块所在的节点,选出块最多的不超过nodeCnt个节点 func (t *CheckPackageRedundancy) summaryRepObjectBlockNodes(objs []stgmod.ObjectDetail, nodeCnt int) []cdssdk.NodeID { type nodeBlocks struct { @@ -253,6 +287,19 @@ func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, a return t.chooseSoManyNodes(red.N, sortedNodes) } +func (t *CheckPackageRedundancy) chooseNewNodesForLRC(red *cdssdk.LRCRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { + sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { + dm := right.LoadsRecentMonth - left.LoadsRecentMonth + if dm != 0 { + return dm + } + + return right.LoadsRecentYear - 
left.LoadsRecentYear + }) + + return t.chooseSoManyNodes(red.N, sortedNodes) +} + func (t *CheckPackageRedundancy) rechooseNodesForRep(mostBlockNodeIDs []cdssdk.NodeID, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { type rechooseNode struct { *NodeLoadInfo @@ -334,6 +381,47 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red return t.chooseSoManyNodes(red.N, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo })) } +func (t *CheckPackageRedundancy) rechooseNodesForLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { + type rechooseNode struct { + *NodeLoadInfo + CachedBlockIndex int + } + + var rechooseNodes []*rechooseNode + for _, node := range allNodes { + cachedBlockIndex := -1 + for _, block := range obj.Blocks { + if block.NodeID == node.Node.NodeID { + cachedBlockIndex = block.Index + break + } + } + + rechooseNodes = append(rechooseNodes, &rechooseNode{ + NodeLoadInfo: node, + CachedBlockIndex: cachedBlockIndex, + }) + } + + sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { + dm := right.LoadsRecentMonth - left.LoadsRecentMonth + if dm != 0 { + return dm + } + + // 已经缓存了文件块的节点优先选择 + v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) + if v != 0 { + return v + } + + return right.LoadsRecentYear - left.LoadsRecentYear + }) + + // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 + return t.chooseSoManyNodes(red.N, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo })) +} + func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadInfo) []*NodeLoadInfo { repeateCount := (count + len(nodes) - 1) / len(nodes) extedNodes := make([]*NodeLoadInfo, repeateCount*len(nodes)) @@ -449,6 +537,55 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E }, 
nil } +func (t *CheckPackageRedundancy) noneToLRC(obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + if len(obj.Blocks) == 0 { + return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to lrc") + } + + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.Blocks[0].NodeID})) + if err != nil { + return nil, fmt.Errorf("requesting to get nodes: %w", err) + } + + var toes []ioswitchlrc.To + for i := 0; i < red.N; i++ { + toes = append(toes, ioswitchlrc.NewToNode(uploadNodes[i].Node, i, fmt.Sprintf("%d", i))) + } + + plans := exec.NewPlanBuilder() + err = lrcparser.Encode(ioswitchlrc.NewFromNode(obj.Object.FileHash, &getNodes.Nodes[0], -1), toes, plans) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + + ioRet, err := plans.Execute().Wait(context.TODO()) + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + var blocks []stgmod.ObjectBlock + for i := 0; i < red.N; i++ { + blocks = append(blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: i, + NodeID: uploadNodes[i].Node.NodeID, + FileHash: ioRet[fmt.Sprintf("%d", i)].(string), + }) + } + + return &coormq.UpdatingObjectRedundancy{ + ObjectID: obj.Object.ObjectID, + Redundancy: red, + Blocks: blocks, + }, nil +} + func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") @@ -652,6 +789,198 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. 
}, nil } +func (t *CheckPackageRedundancy) lrcToLRC(obj stgmod.ObjectDetail, srcRed *cdssdk.LRCRedundancy, tarRed *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + blocksGrpByIndex := obj.GroupBlocks() + + var lostBlocks []int + var lostBlockGrps []int + canGroupReconstruct := true + + allBlockFlags := make([]bool, srcRed.N) + for _, block := range blocksGrpByIndex { + allBlockFlags[block.Index] = true + } + + for i, ok := range allBlockFlags { + grpID := srcRed.FindGroup(i) + if !ok { + if grpID == -1 { + canGroupReconstruct = false + break + } + + if len(lostBlocks) > 0 && lostBlockGrps[len(lostBlockGrps)-1] == grpID { + canGroupReconstruct = false + break + } + + lostBlocks = append(lostBlocks, i) + lostBlockGrps = append(lostBlockGrps, grpID) + } + } + + if canGroupReconstruct { + return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadNodes) + } + + return t.reconstructLRC(obj, blocksGrpByIndex, srcRed, uploadNodes) +} + +func (t *CheckPackageRedundancy) groupReconstructLRC(obj stgmod.ObjectDetail, lostBlocks []int, lostBlockGrps []int, grpedBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { + grped := make(map[int]stgmod.GrouppedObjectBlock) + for _, b := range grpedBlocks { + grped[b.Index] = b + } + + plans := exec.NewPlanBuilder() + + for i := 0; i < len(lostBlocks); i++ { + var froms []ioswitchlrc.From + grpEles := red.GetGroupElements(lostBlockGrps[i]) + for _, ele := range grpEles { + if ele == lostBlocks[i] { + continue + } + + froms = append(froms, ioswitchlrc.NewFromNode(grped[ele].FileHash, nil, ele)) + } + + err := lrcparser.ReconstructGroup(froms, []ioswitchlrc.To{ + 
ioswitchlrc.NewToNode(uploadNodes[lostBlocks[i]].Node, lostBlocks[i], fmt.Sprintf("%d", lostBlocks[i])), + }, plans) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + } + + fmt.Printf("plans: %v\n", plans) + + // 如果没有任何Plan,Wait会直接返回成功 + ret, err := plans.Execute().Wait(context.TODO()) + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + var newBlocks []stgmod.ObjectBlock + for _, i := range lostBlocks { + newBlocks = append(newBlocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: i, + NodeID: uploadNodes[i].Node.NodeID, + FileHash: ret[fmt.Sprintf("%d", i)].(string), + }) + } + for _, b := range grpedBlocks { + for _, nodeID := range b.NodeIDs { + newBlocks = append(newBlocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: b.Index, + NodeID: nodeID, + FileHash: b.FileHash, + }) + } + } + + return &coormq.UpdatingObjectRedundancy{ + ObjectID: obj.Object.ObjectID, + Redundancy: red, + Blocks: newBlocks, + }, nil +} + +func (t *CheckPackageRedundancy) reconstructLRC(obj stgmod.ObjectDetail, grpBlocks []stgmod.GrouppedObjectBlock, red *cdssdk.LRCRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { + var chosenBlocks []stgmod.GrouppedObjectBlock + for _, block := range grpBlocks { + if len(block.NodeIDs) > 0 && block.Index < red.M() { + chosenBlocks = append(chosenBlocks, block) + } + + if len(chosenBlocks) == red.K { + break + } + } + + if len(chosenBlocks) < red.K { + return nil, fmt.Errorf("no enough blocks to reconstruct the original file data") + } + + // 目前LRC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 + planBlder := exec.NewPlanBuilder() + + var froms []ioswitchlrc.From + var toes []ioswitchlrc.To + var newBlocks []stgmod.ObjectBlock + shouldUpdateBlocks := false + for i, node := range uploadNodes { + newBlock := stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: i, + NodeID: node.Node.NodeID, + } + + grp, ok := lo.Find(grpBlocks, func(grp 
stgmod.GrouppedObjectBlock) bool { return grp.Index == i }) + + // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 + if ok && lo.Contains(grp.NodeIDs, node.Node.NodeID) { + newBlock.FileHash = grp.FileHash + newBlocks = append(newBlocks, newBlock) + continue + } + + shouldUpdateBlocks = true + + // 否则就要重建出这个节点需要的块 + + for _, block := range chosenBlocks { + fmt.Printf("b: %v\n", block.Index) + froms = append(froms, ioswitchlrc.NewFromNode(block.FileHash, &node.Node, block.Index)) + } + + // 输出只需要自己要保存的那一块 + toes = append(toes, ioswitchlrc.NewToNode(node.Node, i, fmt.Sprintf("%d", i))) + + newBlocks = append(newBlocks, newBlock) + } + + err := lrcparser.ReconstructAny(froms, toes, planBlder) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + + fmt.Printf("plans: %v\n", planBlder) + + // 如果没有任何Plan,Wait会直接返回成功 + ret, err := planBlder.Execute().Wait(context.TODO()) + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + if !shouldUpdateBlocks { + return nil, nil + } + + for k, v := range ret { + idx, err := strconv.ParseInt(k, 10, 64) + if err != nil { + return nil, fmt.Errorf("parsing result key %s as index: %w", k, err) + } + + newBlocks[idx].FileHash = v.(string) + } + + return &coormq.UpdatingObjectRedundancy{ + ObjectID: obj.Object.ObjectID, + Redundancy: red, + Blocks: newBlocks, + }, nil +} + func (t *CheckPackageRedundancy) pinObject(nodeID cdssdk.NodeID, fileHash string) error { agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) if err != nil {