Skip to content
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,31 @@ A package used to easily build command pipelines in your Go applications
# Important
We have not thoroughly tested this package on OSs other than Linux, especially Windows. At this time, using this package on Windows based systems is considered experimental and will be supported only on a best effort basis.

# Migrating to v2

It's normal for pipelines to stop before all input has been consumed[^1]. If an earlier stage continues writing after that happens, the write side of the pipe can fail with `EPIPE`, `SIGPIPE`, or `io.ErrClosedPipe`.

In go-pipe v1 it was possible to get away without handling this case, because a command stage's stdin was connected in a way that often (but not necessarily!) drained the write side and hid the error from the previous stage feeding it. That was an implementation detail, not a guarantee. In go-pipe v2, producer stages are more likely to be connected directly to a command's stdin, and thus see the error themselves.

Fortunately, this is easily handled by wrapping the stage with `pipe.IgnoreError(stage, pipe.IsPipeError)`. If the producer only writes output and is otherwise stateless, that's the only thing needed.

If the producer also updates state, metrics, or cursors, or has other side effects that depend on how much of the output was produced, then in addition to using `pipe.IgnoreError` you must ensure that producer-owned state is brought to a consistent point before returning the error.

For example, if a stateful producer function must process its entire input for correctness regardless of whether it was read by the consumer, it should use a pattern like:

```go
var writeErr error
for _, item := range items {
updateState(item)
if writeErr == nil {
_, writeErr = fmt.Fprintln(stdout, item)
}
}
return writeErr
```

# Links

* [Docs](https://pkg.go.dev/github.com/github/go-pipe/v2)

[^1]: In `cat foo | head | grep -q`, for example, either `head` or `grep` could exit before its input is fully consumed.
74 changes: 46 additions & 28 deletions pipe/close_responsibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func (w *writeCloseSpy) Close() error {
return nil
}

// TestGoStageHonorsCloseFlags verifies that a Function stage closes
// stdin/stdout iff the corresponding close flag is true.
func TestGoStageHonorsCloseFlags(t *testing.T) {
// TestGoStageHonorsStreamOwnership verifies that a Function stage closes
// stdin/stdout iff the corresponding stream is closing.
func TestGoStageHonorsStreamOwnership(t *testing.T) {
cases := []struct {
name string
leaveIn, leaveOut bool
Expand All @@ -58,34 +58,38 @@ func TestGoStageHonorsCloseFlags(t *testing.T) {

require.NoError(t, s.Start(
context.Background(), StageOptions{},
in, !tc.leaveIn,
out, !tc.leaveOut,
inputForTest(in, !tc.leaveIn),
outputForTest(out, !tc.leaveOut),
))
require.NoError(t, s.Wait())

assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closeStdin=%v", !tc.leaveIn)
assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closeStdout=%v", !tc.leaveOut)
assert.Equal(t, !tc.leaveIn, in.closed.Load(), "closing stdin=%v", !tc.leaveIn)
assert.Equal(t, !tc.leaveOut, out.closed.Load(), "closing stdout=%v", !tc.leaveOut)
})
}
}

func TestStagePanicsWhenOwnedStreamIsNotCloseable(t *testing.T) {
s := Function("f", func(_ context.Context, _ Env, _ io.Reader, _ io.Writer) error {
return nil
})

assert.PanicsWithValue(t, "stage asked to close *strings.Reader, which does not implement io.Closer", func() {
_ = s.Start(
context.Background(), StageOptions{},
strings.NewReader("not closeable"), true,
nil, false,
)
})
// TestStreamConstructorsPreserveOwnershipAndDynamicType verifies that the
// stream constructors hand back the exact value they were given:
// Input/Output expose it through Reader()/Writer() and report a nil
// Closer(), while ClosingInput/ClosingOutput expose the same value through
// both the Reader()/Writer() accessor and Closer().
func TestStreamConstructorsPreserveOwnershipAndDynamicType(t *testing.T) {
borrowedInput := strings.NewReader("borrowed")
assert.Same(t, borrowedInput, Input(borrowedInput).Reader())
assert.Nil(t, Input(borrowedInput).Closer())

ownedInput := &readCloseSpy{Reader: strings.NewReader("owned")}
assert.Same(t, ownedInput, ClosingInput(ownedInput).Reader())
assert.Same(t, ownedInput, ClosingInput(ownedInput).Closer())

borrowedOutput := &strings.Builder{}
assert.Same(t, borrowedOutput, Output(borrowedOutput).Writer())
assert.Nil(t, Output(borrowedOutput).Closer())

ownedOutput := &writeCloseSpy{Writer: io.Discard}
assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Writer())
assert.Same(t, ownedOutput, ClosingOutput(ownedOutput).Closer())
}

// TestCommandStageHonorsCloseStdin verifies that a command stage closes a
// non-file stdin (a "late" closer) iff closeStdin is true. An empty
// reader is used so exec.Cmd's input-copy goroutine sees EOF promptly.
// non-file stdin (a "late" closer) iff the input stream is closing. An
// empty reader is used so exec.Cmd's input-copy goroutine sees EOF promptly.
func TestCommandStageHonorsCloseStdin(t *testing.T) {
for _, leave := range []bool{false, true} {
name := "owns stdin"
Expand All @@ -100,19 +104,19 @@ func TestCommandStageHonorsCloseStdin(t *testing.T) {

require.NoError(t, s.Start(
context.Background(), StageOptions{},
in, !leave,
nil, false,
inputForTest(in, !leave),
Output(nil),
))
require.NoError(t, s.Wait())

assert.Equal(t, !leave, in.closed.Load(), "closeStdin=%v", !leave)
assert.Equal(t, !leave, in.closed.Load(), "closing stdin=%v", !leave)
})
}
}

// TestCommandStageHonorsCloseStdout verifies the stdout counterpart: a
// non-file stdout (routed through the pooled-copy path) is closed iff
// closeStdout is true.
// the output stream is closing.
func TestCommandStageHonorsCloseStdout(t *testing.T) {
for _, leave := range []bool{false, true} {
name := "owns stdout"
Expand All @@ -127,12 +131,26 @@ func TestCommandStageHonorsCloseStdout(t *testing.T) {

require.NoError(t, s.Start(
context.Background(), StageOptions{},
nil, false,
out, !leave,
Input(nil),
outputForTest(out, !leave),
))
require.NoError(t, s.Wait())

assert.Equal(t, !leave, out.closed.Load(), "closeStdout=%v", !leave)
assert.Equal(t, !leave, out.closed.Load(), "closing stdout=%v", !leave)
})
}
}

// inputForTest wraps r as an InputStream for the close-responsibility
// tests: ClosingInput(r) when closing is true, Input(r) otherwise.
func inputForTest(r io.ReadCloser, closing bool) InputStream {
	if !closing {
		return Input(r)
	}
	return ClosingInput(r)
}

// outputForTest wraps w as an OutputStream for the close-responsibility
// tests: ClosingOutput(w) when closing is true, Output(w) otherwise.
func outputForTest(w io.WriteCloser, closing bool) OutputStream {
	if !closing {
		return Output(w)
	}
	return ClosingOutput(w)
}
44 changes: 23 additions & 21 deletions pipe/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ func (s *commandStage) Requirements() StageRequirements {

func (s *commandStage) Start(
ctx context.Context, opts StageOptions,
stdin io.Reader, closeStdin bool,
stdout io.Writer, closeStdout bool,
ins InputStream, outs OutputStream,
) error {
stdinCloser := ownedCloser(stdin, closeStdin)
stdoutCloser := ownedCloser(stdout, closeStdout)
stdin := ins.Reader()
stdinCloser := ins.Closer()
stdout := outs.Writer()
stdoutCloser := outs.Closer()

if s.cmd.Dir == "" {
s.cmd.Dir = opts.Dir
Expand All @@ -118,43 +119,44 @@ func (s *commandStage) Start(
}
}

closeEarlyClosers := func() {
for _, closer := range earlyClosers {
_ = closer.Close()
}
}

// On error, Close any pipes we created and wait for the goroutines to
// exit before propagating the error.
cleanupOnStartFailure := func() {
closeEarlyClosers()
_ = s.wg.Wait()
_ = s.closeLateClosers()
}

if stdout != nil {
if f, ok := stdout.(*os.File); ok {
s.cmd.Stdout = f
if stdoutCloser != nil {
earlyClosers = append(earlyClosers, stdoutCloser)
}
} else {
if stdoutCloser != nil {
s.lateClosers = append(s.lateClosers, stdoutCloser)
}
// Route the copy through our own pipe so we can use a
// pooled buffer rather than letting exec.Cmd allocate a
// fresh 32KB buffer for its internal io.Copy.
ec, err := s.setupPooledStdout(stdout)
if err != nil {
cleanupOnStartFailure()
return err
}
earlyClosers = append(earlyClosers, ec)
if stdoutCloser != nil {
s.lateClosers = append(s.lateClosers, stdoutCloser)
}
}
} else if stdoutCloser != nil {
s.lateClosers = append(s.lateClosers, stdoutCloser)
}

closeEarlyClosers := func() {
for _, closer := range earlyClosers {
_ = closer.Close()
}
}

// On error, Close any pipes we created and wait for the goroutines to
// exit before propagating the error.
cleanupOnStartFailure := func() {
closeEarlyClosers()
_ = s.wg.Wait()
_ = s.closeLateClosers()
}

// If the caller hasn't arranged otherwise, read the command's
// standard error into our `stderr` field:
if s.cmd.Stderr == nil {
Expand Down
17 changes: 11 additions & 6 deletions pipe/command_stdout_fastpath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
// subprocess can detect when that fd is closed.
func TestCommandStageStdoutFastPath(t *testing.T) {
cases := []struct {
name string
closeStdout bool
name string
closingStdout bool
}{
{
name: "raw *os.File with closeStdout",
closeStdout: true,
name: "raw *os.File with closing stdout",
closingStdout: true,
},
{
name: "raw *os.File without closeStdout",
name: "raw *os.File with non-closing stdout",
},
}
for _, tc := range cases {
Expand All @@ -42,7 +42,12 @@ func TestCommandStageStdoutFastPath(t *testing.T) {
cmd := exec.Command("true")
s := CommandStage("true", cmd).(*commandStage)

require.NoError(t, s.Start(ctx, StageOptions{}, nil, false, f, tc.closeStdout))
stdout := OutputStream{writer: f}
if tc.closingStdout {
stdout = ClosingOutput(f)
}

require.NoError(t, s.Start(ctx, StageOptions{}, Input(nil), stdout))
t.Cleanup(func() { _ = s.Wait() })

gotFile, ok := s.cmd.Stdout.(*os.File)
Expand Down
10 changes: 3 additions & 7 deletions pipe/env_stage.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package pipe

import (
"context"
"io"
)
import "context"

// WithExtraEnv returns a Stage that adds env to the environment seen by inner.
func WithExtraEnv(inner Stage, env []EnvVar) Stage {
Expand Down Expand Up @@ -40,13 +37,12 @@ func (s *stageWithExtraEnv) Requirements() StageRequirements {

func (s *stageWithExtraEnv) Start(
ctx context.Context, opts StageOptions,
stdin io.Reader, closeStdin bool,
stdout io.Writer, closeStdout bool,
stdin InputStream, stdout OutputStream,
) error {
opts.Vars = append(opts.Vars[:len(opts.Vars):len(opts.Vars)], func(_ context.Context, vars []EnvVar) []EnvVar {
return append(vars, s.env...)
})
return s.inner.Start(ctx, opts, stdin, closeStdin, stdout, closeStdout)
return s.inner.Start(ctx, opts, stdin, stdout)
}

func (s *stageWithExtraEnv) Wait() error {
Expand Down
12 changes: 11 additions & 1 deletion pipe/filter-error.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type ErrorMatcher func(err error) bool
// the functions from the standard library that has the same signature
// (e.g., `os.IsTimeout`), or some combination of these (e.g.,
// `AnyError(IsSIGPIPE, os.IsTimeout)`).
//
// `IgnoreError` only suppresses the error returned by the wrapped
// stage. If a producer ignores pipe errors because a later stage can
// stop reading early, the producer is still responsible for keeping any
// producer-owned state, metrics, cursors, or other side effects
// consistent before returning the ignored error.
func IgnoreError(s Stage, em ErrorMatcher) Stage {
return FilterError(s,
func(err error) error {
Expand Down Expand Up @@ -128,7 +134,11 @@ var (

// IsPipeError is an `ErrorMatcher` that matches a few different
// errors that typically result if a stage writes to a subsequent
// stage that has stopped reading from its stdin. Use like
// stage that has stopped reading from its stdin. This is commonly
// useful with `IgnoreError` for stateless producer stages whose only
// job is writing output. Stateful producers should continue any
// producer-owned state updates needed for consistency before
// returning the pipe error for `IgnoreError` to suppress. Use like
//
// p.Add(IgnoreError(someStage, IsPipeError))
IsPipeError = AnyError(IsSIGPIPE, IsEPIPE, IsErrClosedPipe)
Expand Down
21 changes: 14 additions & 7 deletions pipe/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ import (
// Neither `stdin` nor `stdout` are necessarily buffered. If the
// `StageFunc` requires buffering, it needs to arrange that itself.
//
// A later stage can stop reading before this function has written all
// of its output. In that case, writes to `stdout` can fail with an
// error matched by `IsPipeError`. If the function only writes output
// and is otherwise stateless, callers can usually wrap the stage with
// `IgnoreError(stage, IsPipeError)`. If the function also updates
// producer-owned state, metrics, cursors, or other side effects that
// depend on how much output was produced, it should bring those side
// effects to a consistent point before returning the write error.
//
// A `StageFunc` is run in a separate goroutine, so it must be careful
// to synchronize any data access aside from reading and writing.
type StageFunc func(ctx context.Context, env Env, stdin io.Reader, stdout io.Writer) error
Expand Down Expand Up @@ -78,19 +87,17 @@ func (s *goStage) Requirements() StageRequirements {

func (s *goStage) Start(
ctx context.Context, opts StageOptions,
stdin io.Reader, closeStdin bool,
stdout io.Writer, closeStdout bool,
stdin InputStream, stdout OutputStream,
) error {
stdinCloser := ownedCloser(stdin, closeStdin)
stdoutCloser := ownedCloser(stdout, closeStdout)

r := stdin
r := stdin.Reader()
stdinCloser := stdin.Closer()
if r == nil {
// treat nil as empty input.
r = strings.NewReader("")
}

w := stdout
w := stdout.Writer()
stdoutCloser := stdout.Closer()
if w == nil {
// treat nil output as /dev/null
w = io.Discard
Expand Down
15 changes: 5 additions & 10 deletions pipe/pipe_matching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,12 @@ func (s *pipeSniffingStage) Requirements() pipe.StageRequirements {

func (s *pipeSniffingStage) Start(
_ context.Context, _ pipe.StageOptions,
stdin io.Reader, closeStdin bool,
stdout io.Writer, closeStdout bool,
stdin pipe.InputStream, stdout pipe.OutputStream,
) error {
s.stdin = stdin
if closeStdin {
_ = stdin.(io.Closer).Close()
}
s.stdout = stdout
if closeStdout {
_ = stdout.(io.Closer).Close()
}
s.stdin = stdin.Reader()
stdin.Close()
s.stdout = stdout.Writer()
stdout.Close()
return nil
}

Expand Down
Loading
Loading