diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index 584b8aa..89eccbe 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -60,14 +60,10 @@ func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error { pr, pw := io.Pipe() - rb := io2.RingBuffer(pr, 16*1024) - rb.UpstreamName = fmt.Sprintf("GRPC(send) input") - rb.DownstreamName = fmt.Sprintf("GRPC(send) output %v", msg.VarID) - varID := exec.VarID(msg.VarID) sw.PutVars(&exec.StreamVar{ ID: varID, - Stream: rb, + Stream: pr, }) // 然后读取文件数据 diff --git a/common/pkgs/ioswitch2/ops2/ipfs.go b/common/pkgs/ioswitch2/ops2/ipfs.go index b1dbd65..f8e4e2a 100644 --- a/common/pkgs/ioswitch2/ops2/ipfs.go +++ b/common/pkgs/ioswitch2/ops2/ipfs.go @@ -44,10 +44,7 @@ func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error { defer file.Close() fut := future.NewSetVoid() - rb := io2.RingBuffer(file, 16*1024) - rb.UpstreamName = "IPFS" - rb.DownstreamName = fmt.Sprintf("IPFS output %v", o.Output.ID) - o.Output.Stream = io2.AfterReadClosedOnce(rb, func(closer io.ReadCloser) { + o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { fut.SetVoid() }) e.PutVars(o.Output)