Skip to content

Commit 1597b75

Browse files
authored
Fix graceful shutdown "broken pipe" error (#527)
1 parent b1695b1 commit 1597b75

File tree

5 files changed

+94
-21
lines changed

5 files changed

+94
-21
lines changed

tfexec/cmd.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -252,20 +252,30 @@ func mergeWriters(writers ...io.Writer) io.Writer {
252252
return io.MultiWriter(compact...)
253253
}
254254

255-
func writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error {
256-
// ReadBytes will block until bytes are read, which can cause a delay in
257-
// returning even if the command's context has been canceled. Use a separate
258-
// goroutine to prompt ReadBytes to return on cancel
259-
closeCtx, closeCancel := context.WithCancel(ctx)
260-
defer closeCancel()
261-
go func() {
262-
select {
263-
case <-ctx.Done():
264-
r.Close()
265-
case <-closeCtx.Done():
266-
return
267-
}
268-
}()
255+
func (tf *Terraform) writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error {
256+
// ReadBytes will block until its delimiter is read or the pipe is closed, which can cause a delay in
257+
// returning even if the command's context has been canceled. When the
258+
// context is canceled, Terraform receives an interrupt signal and will exit
259+
// after a short while. Once the process has exited, the stdio pipes will
260+
// close, allowing this function to return.
261+
262+
if tf.enableLegacyPipeClosing {
263+
// Rather than wait for the stdio pipes to close naturally, we can close
264+
// them ourselves when the command's context is canceled, causing the
265+
// process to exit immediately. This works around a bug in Terraform
266+
// < v1.1 that would otherwise leave the process (and this function)
267+
// hanging after the context is canceled.
268+
closeCtx, closeCancel := context.WithCancel(ctx)
269+
defer closeCancel()
270+
go func() {
271+
select {
272+
case <-ctx.Done():
273+
r.Close()
274+
case <-closeCtx.Done():
275+
return
276+
}
277+
}()
278+
}
269279

270280
buf := bufio.NewReader(r)
271281
for {

tfexec/cmd_default.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
5959
wg.Add(1)
6060
go func() {
6161
defer wg.Done()
62-
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
62+
errStdout = tf.writeOutput(ctx, stdoutPipe, stdoutWriter)
6363
}()
6464

6565
wg.Add(1)
6666
go func() {
6767
defer wg.Done()
68-
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
68+
errStderr = tf.writeOutput(ctx, stderrPipe, stderrWriter)
6969
}()
7070

7171
// Reads from pipes must be completed before calling cmd.Wait(). Otherwise

tfexec/cmd_linux.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
6464
wg.Add(1)
6565
go func() {
6666
defer wg.Done()
67-
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
67+
errStdout = tf.writeOutput(ctx, stdoutPipe, stdoutWriter)
6868
}()
6969

7070
wg.Add(1)
7171
go func() {
7272
defer wg.Done()
73-
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
73+
errStderr = tf.writeOutput(ctx, stderrPipe, stderrWriter)
7474
}()
7575

7676
// Reads from pipes must be completed before calling cmd.Wait(). Otherwise

tfexec/internal/e2etest/errors_test.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323

2424
var (
2525
protocol5MinVersion = version.Must(version.NewVersion("0.12.0"))
26+
27+
gracefulShutdownMinVersion = version.Must(version.NewVersion("1.1.0"))
2628
)
2729

2830
func TestUnparsedError(t *testing.T) {
@@ -168,9 +170,47 @@ func TestContext_sleepTimeoutExpired(t *testing.T) {
168170
t.Skip("the ability to interrupt an apply was added in protocol 5.0 in Terraform 0.12, so test is not valid")
169171
}
170172

171-
// sleep will not react to SIGINT
172-
// This ensures that process is killed within the expected time limit.
173-
tf.SetWaitDelay(500 * time.Millisecond)
173+
if !tfv.GreaterThanOrEqual(gracefulShutdownMinVersion) {
174+
// Versions < 1.1 will not react to SIGINT.
175+
// This ensures the process is killed within the expected time limit.
176+
tf.SetEnableLegacyPipeClosing(true)
177+
tf.SetWaitDelay(500 * time.Millisecond)
178+
}
179+
180+
err := tf.Init(context.Background())
181+
if err != nil {
182+
t.Fatalf("err during init: %s", err)
183+
}
184+
185+
ctx := context.Background()
186+
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
187+
defer cancel()
188+
189+
errCh := make(chan error)
190+
go func() {
191+
err = tf.Apply(ctx)
192+
if err != nil {
193+
errCh <- err
194+
}
195+
}()
196+
197+
select {
198+
case err := <-errCh:
199+
if !errors.Is(err, context.DeadlineExceeded) {
200+
t.Fatalf("expected context.DeadlineExceeded, got %T %s", err, err)
201+
}
202+
case <-time.After(time.Second * 10):
203+
t.Fatal("terraform apply should have canceled and returned in ~5s")
204+
}
205+
})
206+
}
207+
208+
func TestContext_sleepGracefulShutdown(t *testing.T) {
209+
runTest(t, "sleep", func(t *testing.T, tfv *version.Version, tf *tfexec.Terraform) {
210+
// only testing versions that can shut down gracefully
211+
if !tfv.GreaterThanOrEqual(gracefulShutdownMinVersion) {
212+
t.Skip("graceful shutdown was added in Terraform 1.1, so test is not valid")
213+
}
174214

175215
err := tf.Init(context.Background())
176216
if err != nil {
@@ -194,6 +234,16 @@ func TestContext_sleepTimeoutExpired(t *testing.T) {
194234
if !errors.Is(err, context.DeadlineExceeded) {
195235
t.Fatalf("expected context.DeadlineExceeded, got %T %s", err, err)
196236
}
237+
var ee *exec.ExitError
238+
if !errors.As(err, &ee) {
239+
t.Fatalf("expected exec.ExitError, got %T, %s", err, err)
240+
}
241+
if !ee.Exited() {
242+
t.Fatalf("expected process to have exited, but it did not (%s)", ee.ProcessState.String())
243+
}
244+
if ee.ExitCode() != 1 {
245+
t.Fatalf("expected exit code 1, got %d", ee.ExitCode())
246+
}
197247
case <-time.After(time.Second * 10):
198248
t.Fatal("terraform apply should have canceled and returned in ~5s")
199249
}

tfexec/terraform.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ type Terraform struct {
7373
// waitDelay represents the WaitDelay field of the [exec.Cmd] of Terraform
7474
waitDelay time.Duration
7575

76+
// enableLegacyPipeClosing controls whether the stdout/stderr pipes are closed when the command's context is canceled, before calling [exec.Cmd.Wait]
77+
enableLegacyPipeClosing bool
78+
7679
versionLock sync.Mutex
7780
execVersion *version.Version
7881
provVersions map[string]*version.Version
@@ -232,6 +235,16 @@ func (tf *Terraform) SetWaitDelay(delay time.Duration) error {
232235
return nil
233236
}
234237

238+
// SetEnableLegacyPipeClosing controls whether the library "force-closes" the stdio pipes when the command's context is canceled.
239+
// This works around a bug in Terraform < v1.1 that would otherwise leave
240+
// the process (and caller) hanging after graceful shutdown.
241+
//
242+
// This option can be safely ignored (set to false) with Terraform 1.1+.
243+
func (tf *Terraform) SetEnableLegacyPipeClosing(enabled bool) error {
244+
tf.enableLegacyPipeClosing = enabled
245+
return nil
246+
}
247+
235248
// WorkingDir returns the working directory for Terraform.
236249
func (tf *Terraform) WorkingDir() string {
237250
return tf.workingDir

0 commit comments

Comments
 (0)