|
- package ops2
-
- import (
- "context"
- "fmt"
- "io"
-
- "github.com/samber/lo"
- "gitlink.org.cn/cloudream/common/pkgs/future"
- "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
- "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
- "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
- "gitlink.org.cn/cloudream/common/utils/io2"
- "golang.org/x/sync/semaphore"
- )
-
- // init calls exec.UseOp for the two op types defined in this file.
- // NOTE(review): presumably this registers them with the executor's op type
- // registry — confirm against exec.UseOp.
- func init() {
- exec.UseOp[*ChunkedSplit]()
- exec.UseOp[*ChunkedJoin]()
- }
-
- // ChunkedSplit is an op that feeds one input stream through io2.ChunkedSplit
- // and publishes the resulting streams as its outputs.
- type ChunkedSplit struct {
- Input *exec.StreamVar `json:"input"` // stream that is split
- Outputs []*exec.StreamVar `json:"outputs"` // one variable per split stream; its length is the split count
- ChunkSize int `json:"chunkSize"` // chunk size passed to io2.ChunkedSplit
- PaddingZeros bool `json:"paddingZeros"` // forwarded as ChunkedSplitOption.PaddingZeros
- }
-
- // Execute binds the input stream, splits it with io2.ChunkedSplit into
- // len(o.Outputs) streams, publishes those streams through exec.PutArrayVars,
- // and then blocks until the io2.AfterReadClosedOnce callback of every output
- // stream has run or ctx is done.
- //
- // It returns the error from binding the input, or the error from the final
- // semaphore wait (ctx's error when the wait is interrupted). The input stream
- // is closed when Execute returns.
- func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error {
- 	if err := e.BindVars(ctx, o.Input); err != nil {
- 		return err
- 	}
- 	defer o.Input.Stream.Close()
-
- 	outputs := io2.ChunkedSplit(o.Input.Stream, o.ChunkSize, len(o.Outputs), io2.ChunkedSplitOption{
- 		PaddingZeros: o.PaddingZeros,
- 	})
-
- 	// One semaphore unit is held per output stream and handed back by that
- 	// stream's close callback, so re-acquiring the full weight at the end
- 	// waits for all of them.
- 	sem := semaphore.NewWeighted(int64(len(outputs)))
- 	for i := range outputs {
- 		// TryAcquire rather than Acquire(ctx, 1): Acquire returns an error
- 		// without taking a unit once ctx is done, and that error used to be
- 		// dropped, so the matching Release below could release more than was
- 		// held, which panics. The semaphore is sized to len(outputs) and has
- 		// no other holders, so TryAcquire always succeeds here.
- 		_ = sem.TryAcquire(1)
-
- 		o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) {
- 			sem.Release(1)
- 		})
- 	}
- 	exec.PutArrayVars(e, o.Outputs)
-
- 	return sem.Acquire(ctx, int64(len(outputs)))
- }
-
- func (o *ChunkedSplit) String() string {
- return fmt.Sprintf(
- "ChunkedSplit(chunkSize=%v, paddingZeros=%v), %v -> (%v)",
- o.ChunkSize,
- o.PaddingZeros,
- o.Input.ID,
- utils.FormatVarIDs(o.Outputs),
- )
- }
-
- // ChunkedJoin is an op that combines several input streams into one output
- // stream using io2.BufferedChunkedJoin.
- type ChunkedJoin struct {
- Inputs []*exec.StreamVar `json:"inputs"` // streams to join, passed on in slice order
- Output *exec.StreamVar `json:"output"` // receives the joined stream
- ChunkSize int `json:"chunkSize"` // chunk size passed to io2.BufferedChunkedJoin
- }
-
- // Execute binds all input streams, wraps them with io2.BufferedChunkedJoin,
- // publishes the joined stream as the output variable, and waits until the
- // io2.AfterReadClosedOnce callback of that stream has run or ctx ends the
- // wait. Every input stream is closed when Execute returns after a successful
- // bind.
- func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error {
- 	if bindErr := exec.BindArrayVars(e, ctx, o.Inputs); bindErr != nil {
- 		return bindErr
- 	}
-
- 	// Gather the bound streams as plain readers for the join helper.
- 	var readers []io.Reader
- 	for i := range o.Inputs {
- 		readers = append(readers, o.Inputs[i].Stream)
- 	}
- 	defer func() {
- 		for _, in := range o.Inputs {
- 			in.Stream.Close()
- 		}
- 	}()
-
- 	// closed is completed by the callback attached to the joined stream.
- 	closed := future.NewSetVoid()
- 	joined := io2.BufferedChunkedJoin(readers, o.ChunkSize)
- 	o.Output.Stream = io2.AfterReadClosedOnce(joined, func(io.ReadCloser) {
- 		closed.SetVoid()
- 	})
- 	e.PutVars(o.Output)
-
- 	return closed.Wait(ctx)
- }
-
- func (o *ChunkedJoin) String() string {
- return fmt.Sprintf(
- "ChunkedJoin(chunkSize=%v), (%v) -> %v",
- o.ChunkSize,
- utils.FormatVarIDs(o.Inputs),
- o.Output.ID,
- )
- }
-
- // ChunkedSplitNode is the graph node from which a ChunkedSplit op is
- // generated (see GenerateOp).
- type ChunkedSplitNode struct {
- dag.NodeBase
- ChunkSize int // copied into the generated op's ChunkSize
- }
-
- // NewChunkedSplit creates a ChunkedSplitNode with the given chunk size, adds
- // it to the builder via AddNode, and returns it. Streams are attached later
- // through Split.
- func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode {
- 	splitNode := new(ChunkedSplitNode)
- 	splitNode.ChunkSize = chunkSize
- 	b.AddNode(splitNode)
- 	return splitNode
- }
-
- // Split connects input to input slot 0 of the node and resizes the output
- // slots to cnt, setting up a new stream variable from the node's graph in
- // each of them.
- func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) {
- 	t.InputStreams().EnsureSize(1)
- 	input.Connect(t, 0)
-
- 	outs := t.OutputStreams()
- 	outs.Resize(cnt)
- 	for slot := 0; slot < cnt; slot++ {
- 		outs.Setup(t, t.Graph().NewStreamVar(), slot)
- 	}
- }
-
- // SubStream returns the output stream variable stored at index idx.
- func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar {
- 	sub := t.OutputStreams().Get(idx)
- 	return sub
- }
-
- // SplitCount reports how many output stream slots the node currently has.
- func (t *ChunkedSplitNode) SplitCount() int {
- 	count := t.OutputStreams().Len()
- 	return count
- }
-
- // GenerateOp builds the ChunkedSplit op for this node: input slot 0 becomes
- // the op input, every output slot's Var becomes an op output, and
- // PaddingZeros is always enabled. The returned error is always nil.
- func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) {
- 	toVar := func(sv *dag.StreamVar, _ int) *exec.StreamVar {
- 		return sv.Var
- 	}
-
- 	op := &ChunkedSplit{
- 		Input:        t.InputStreams().Get(0).Var,
- 		Outputs:      lo.Map(t.OutputStreams().RawArray(), toVar),
- 		ChunkSize:    t.ChunkSize,
- 		PaddingZeros: true,
- 	}
- 	return op, nil
- }
-
- // func (t *ChunkedSplitNode) String() string {
- // return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
- // }
-
- // ChunkedJoinNode is the graph node from which a ChunkedJoin op is generated
- // (see GenerateOp).
- type ChunkedJoinNode struct {
- dag.NodeBase
- ChunkSize int // copied into the generated op's ChunkSize
- }
-
- // NewChunkedJoin creates a ChunkedJoinNode with the given chunk size, adds it
- // to the builder via AddNode, and sets up a new stream variable from the
- // builder's graph as the node's output before returning the node.
- func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode {
- 	joinNode := new(ChunkedJoinNode)
- 	joinNode.ChunkSize = chunkSize
- 	b.AddNode(joinNode)
-
- 	outSlots := joinNode.OutputStreams()
- 	outSlots.SetupNew(joinNode, b.Graph.NewStreamVar())
- 	return joinNode
- }
-
- // AddInput grows the node's input slots by one and connects str to the slot
- // index returned by EnlargeOne.
- func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) {
- 	str.Connect(t, t.InputStreams().EnlargeOne())
- }
-
- // Joined returns the stream variable held in output slot 0 of the node.
- func (t *ChunkedJoinNode) Joined() *dag.StreamVar {
- 	joined := t.OutputStreams().Get(0)
- 	return joined
- }
-
- // GenerateOp builds the ChunkedJoin op for this node: every input slot's Var
- // becomes an op input and output slot 0 becomes the op output. The returned
- // error is always nil.
- func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) {
- 	toVar := func(sv *dag.StreamVar, _ int) *exec.StreamVar {
- 		return sv.Var
- 	}
-
- 	op := &ChunkedJoin{
- 		Inputs:    lo.Map(t.InputStreams().RawArray(), toVar),
- 		Output:    t.OutputStreams().Get(0).Var,
- 		ChunkSize: t.ChunkSize,
- 	}
- 	return op, nil
- }
-
- // func (t *ChunkedJoinType) String() string {
- // return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
- // }
|