| @@ -37,7 +37,10 @@ func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { | |||||
| sem := semaphore.NewWeighted(int64(len(outputs))) | sem := semaphore.NewWeighted(int64(len(outputs))) | ||||
| for i := range outputs { | for i := range outputs { | ||||
| sem.Acquire(ctx.Context, 1) | |||||
| err = sem.Acquire(ctx.Context, 1) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| e.PutVar(o.Outputs[i], &exec.StreamValue{ | e.PutVar(o.Outputs[i], &exec.StreamValue{ | ||||
| Stream: io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { | Stream: io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { | ||||
| @@ -37,7 +37,10 @@ func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { | |||||
| sem := semaphore.NewWeighted(int64(len(outputs))) | sem := semaphore.NewWeighted(int64(len(outputs))) | ||||
| for i := range outputs { | for i := range outputs { | ||||
| sem.Acquire(ctx.Context, 1) | |||||
| err = sem.Acquire(ctx.Context, 1) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| e.PutVar(o.Outputs[i], &exec.StreamValue{ | e.PutVar(o.Outputs[i], &exec.StreamValue{ | ||||
| Stream: io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { | Stream: io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { | ||||