Skip to content

Commit bfd1b55

Browse files
authored
enhance(local executor): print to stdout via client field (#339)
1 parent 6c8171c commit bfd1b55

File tree

11 files changed

+111
-32
lines changed

11 files changed

+111
-32
lines changed

cmd/vela-worker/exec.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func (w *Worker) exec(index int) error {
6969
// https://godoc.org/github.com/go-vela/worker/executor#New
7070
_executor, err := executor.New(&executor.Setup{
7171
Logger: logger,
72+
Mock: w.Config.Mock,
7273
Driver: w.Config.Executor.Driver,
7374
LogMethod: w.Config.Executor.LogMethod,
7475
MaxLogSize: w.Config.Executor.MaxLogSize,

cmd/vela-worker/worker.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type (
4646

4747
// Config represents the worker configuration.
4848
Config struct {
49+
Mock bool // Mock should only be true for tests
4950
API *API
5051
Build *Build
5152
CheckIn time.Duration

executor/local/api.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package local
77
import (
88
"context"
99
"fmt"
10-
"os"
1110
"time"
1211

1312
"github.com/go-vela/types/constants"
@@ -193,7 +192,7 @@ func (c *client) CancelBuild() (*library.Build, error) {
193192

194193
err = c.DestroyBuild(context.Background())
195194
if err != nil {
196-
fmt.Fprintln(os.Stdout, "unable to destroy build:", err)
195+
fmt.Fprintln(c.stdout, "unable to destroy build:", err)
197196
}
198197

199198
return b, nil

executor/local/build.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package local
77
import (
88
"context"
99
"fmt"
10-
"os"
1110
"sync"
1211
"time"
1312

@@ -97,7 +96,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
9796
}
9897

9998
// output init progress to stdout
100-
fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime network...")
99+
fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime network...")
101100

102101
// inspect the runtime network for the pipeline
103102
network, err := c.Runtime.InspectNetwork(ctx, c.pipeline)
@@ -107,7 +106,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
107106
}
108107

109108
// output the network information to stdout
110-
fmt.Fprintln(os.Stdout, _pattern, string(network))
109+
fmt.Fprintln(c.stdout, _pattern, string(network))
111110

112111
// create the runtime volume for the pipeline
113112
err = c.Runtime.CreateVolume(ctx, c.pipeline)
@@ -117,7 +116,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
117116
}
118117

119118
// output init progress to stdout
120-
fmt.Fprintln(os.Stdout, _pattern, "> Inspecting runtime volume...")
119+
fmt.Fprintln(c.stdout, _pattern, "> Inspecting runtime volume...")
121120

122121
// inspect the runtime volume for the pipeline
123122
volume, err := c.Runtime.InspectVolume(ctx, c.pipeline)
@@ -127,7 +126,7 @@ func (c *client) PlanBuild(ctx context.Context) error {
127126
}
128127

129128
// output the volume information to stdout
130-
fmt.Fprintln(os.Stdout, _pattern, string(volume))
129+
fmt.Fprintln(c.stdout, _pattern, string(volume))
131130

132131
return c.err
133132
}
@@ -162,7 +161,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
162161
}
163162

164163
// output init progress to stdout
165-
fmt.Fprintln(os.Stdout, _pattern, "> Preparing service images...")
164+
fmt.Fprintln(c.stdout, _pattern, "> Preparing service images...")
166165

167166
// create the services for the pipeline
168167
for _, _service := range c.pipeline.Services {
@@ -183,11 +182,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
183182
}
184183

185184
// output the image information to stdout
186-
fmt.Fprintln(os.Stdout, _pattern, string(image))
185+
fmt.Fprintln(c.stdout, _pattern, string(image))
187186
}
188187

189188
// output init progress to stdout
190-
fmt.Fprintln(os.Stdout, _pattern, "> Preparing stage images...")
189+
fmt.Fprintln(c.stdout, _pattern, "> Preparing stage images...")
191190

192191
// create the stages for the pipeline
193192
for _, _stage := range c.pipeline.Stages {
@@ -206,7 +205,7 @@ func (c *client) AssembleBuild(ctx context.Context) error {
206205
}
207206

208207
// output init progress to stdout
209-
fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images...")
208+
fmt.Fprintln(c.stdout, _pattern, "> Preparing step images...")
210209

211210
// create the steps for the pipeline
212211
for _, _step := range c.pipeline.Steps {
@@ -229,11 +228,11 @@ func (c *client) AssembleBuild(ctx context.Context) error {
229228
}
230229

231230
// output the image information to stdout
232-
fmt.Fprintln(os.Stdout, _pattern, string(image))
231+
fmt.Fprintln(c.stdout, _pattern, string(image))
233232
}
234233

235234
// output a new line for readability to stdout
236-
fmt.Fprintln(os.Stdout, "")
235+
fmt.Fprintln(c.stdout, "")
237236

238237
// assemble runtime build just before any containers execute
239238
c.err = c.Runtime.AssembleBuild(ctx, c.pipeline)
@@ -353,14 +352,14 @@ func (c *client) StreamBuild(ctx context.Context) error {
353352
streams, streamCtx := errgroup.WithContext(ctx)
354353

355354
defer func() {
356-
fmt.Fprintln(os.Stdout, "waiting for stream functions to return")
355+
fmt.Fprintln(c.stdout, "waiting for stream functions to return")
357356

358357
err := streams.Wait()
359358
if err != nil {
360-
fmt.Fprintln(os.Stdout, "error in a stream request:", err)
359+
fmt.Fprintln(c.stdout, "error in a stream request:", err)
361360
}
362361

363-
fmt.Fprintln(os.Stdout, "all stream functions have returned")
362+
fmt.Fprintln(c.stdout, "all stream functions have returned")
364363
}()
365364

366365
// allow the runtime to do log/event streaming setup at build-level
@@ -374,11 +373,11 @@ func (c *client) StreamBuild(ctx context.Context) error {
374373
select {
375374
case req := <-c.streamRequests:
376375
streams.Go(func() error {
377-
fmt.Fprintf(os.Stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID)
376+
fmt.Fprintf(c.stdout, "[%s: %s] > Streaming container '%s'...\n", req.Key, req.Container.Name, req.Container.ID)
378377

379378
err := req.Stream(streamCtx, req.Container)
380379
if err != nil {
381-
fmt.Fprintln(os.Stdout, "error streaming:", err)
380+
fmt.Fprintln(c.stdout, "error streaming:", err)
382381
}
383382

384383
return nil
@@ -399,7 +398,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
399398
err = c.Runtime.RemoveBuild(ctx, c.pipeline)
400399
if err != nil {
401400
// output the error information to stdout
402-
fmt.Fprintln(os.Stdout, "unable to destroy runtime build:", err)
401+
fmt.Fprintln(c.stdout, "unable to destroy runtime build:", err)
403402
}
404403
}()
405404

@@ -414,7 +413,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
414413
err = c.DestroyStep(ctx, _step)
415414
if err != nil {
416415
// output the error information to stdout
417-
fmt.Fprintln(os.Stdout, "unable to destroy step:", err)
416+
fmt.Fprintln(c.stdout, "unable to destroy step:", err)
418417
}
419418
}
420419

@@ -429,7 +428,7 @@ func (c *client) DestroyBuild(ctx context.Context) error {
429428
err = c.DestroyStage(ctx, _stage)
430429
if err != nil {
431430
// output the error information to stdout
432-
fmt.Fprintln(os.Stdout, "unable to destroy stage:", err)
431+
fmt.Fprintln(c.stdout, "unable to destroy stage:", err)
433432
}
434433
}
435434

@@ -439,22 +438,22 @@ func (c *client) DestroyBuild(ctx context.Context) error {
439438
err = c.DestroyService(ctx, _service)
440439
if err != nil {
441440
// output the error information to stdout
442-
fmt.Fprintln(os.Stdout, "unable to destroy service:", err)
441+
fmt.Fprintln(c.stdout, "unable to destroy service:", err)
443442
}
444443
}
445444

446445
// remove the runtime volume for the pipeline
447446
err = c.Runtime.RemoveVolume(ctx, c.pipeline)
448447
if err != nil {
449448
// output the error information to stdout
450-
fmt.Fprintln(os.Stdout, "unable to destroy runtime volume:", err)
449+
fmt.Fprintln(c.stdout, "unable to destroy runtime volume:", err)
451450
}
452451

453452
// remove the runtime network for the pipeline
454453
err = c.Runtime.RemoveNetwork(ctx, c.pipeline)
455454
if err != nil {
456455
// output the error information to stdout
457-
fmt.Fprintln(os.Stdout, "unable to destroy runtime network:", err)
456+
fmt.Fprintln(c.stdout, "unable to destroy runtime network:", err)
458457
}
459458

460459
return err

executor/local/local.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package local
66

77
import (
8+
"os"
89
"reflect"
910
"sync"
1011

@@ -33,9 +34,24 @@ type (
3334
user *library.User
3435
err error
3536
streamRequests chan message.StreamRequest
37+
38+
// internal field partially exported for tests
39+
stdout *os.File
40+
mockStdoutReader *os.File
41+
}
42+
43+
// MockedClient is for internal use to facilitate testing the local executor.
44+
MockedClient interface {
45+
MockStdout() *os.File
3646
}
3747
)
3848

49+
// MockStdout is for internal use to facilitate testing the local executor.
50+
// MockStdout returns a reader over a mocked Stdout.
51+
func (c *client) MockStdout() *os.File {
52+
return c.mockStdoutReader
53+
}
54+
3955
// equal returns true if the other client is the equivalent.
4056
func Equal(a, b *client) bool {
4157
// handle any nil comparisons
@@ -64,6 +80,9 @@ func New(opts ...Opt) (*client, error) {
6480
// create new local client
6581
c := new(client)
6682

83+
// Add stdout by default
84+
c.stdout = os.Stdout
85+
6786
// instantiate streamRequests channel (which may be overridden using withStreamRequests()).
6887
c.streamRequests = make(chan message.StreamRequest)
6988

executor/local/opts.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package local
66

77
import (
88
"fmt"
9+
"os"
910

1011
"github.com/go-vela/worker/internal/message"
1112
"github.com/go-vela/worker/runtime"
@@ -121,6 +122,28 @@ func WithVersion(version string) Opt {
121122
}
122123
}
123124

125+
// WithMockStdout adds a mock stdout writer to the client if mock is true.
126+
// If mock is true, then you must use a goroutine to read from
127+
// MockStdout as quickly as possible, or writing to stdout will hang.
128+
func WithMockStdout(mock bool) Opt {
129+
return func(c *client) error {
130+
if !mock {
131+
return nil
132+
}
133+
134+
// New() sets c.stdout = os.stdout, replace it if a mock is required.
135+
reader, writer, err := os.Pipe()
136+
if err != nil {
137+
return err
138+
}
139+
140+
c.mockStdoutReader = reader
141+
c.stdout = writer
142+
143+
return nil
144+
}
145+
}
146+
124147
// withStreamRequests sets the streamRequests channel in the executor client for Linux
125148
// (primarily used for tests).
126149
func withStreamRequests(s chan message.StreamRequest) Opt {

executor/local/opts_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,3 +331,39 @@ func TestLocal_Opt_WithVersion(t *testing.T) {
331331
})
332332
}
333333
}
334+
335+
func TestLocal_Opt_WithMockStdout(t *testing.T) {
336+
// setup tests
337+
tests := []struct {
338+
name string
339+
mock bool
340+
wantNil bool
341+
}{
342+
{
343+
name: "standard",
344+
mock: false,
345+
wantNil: true,
346+
},
347+
{
348+
name: "mocked",
349+
mock: true,
350+
wantNil: false,
351+
},
352+
}
353+
354+
// run tests
355+
for _, test := range tests {
356+
t.Run(test.name, func(t *testing.T) {
357+
_engine, err := New(
358+
WithMockStdout(test.mock),
359+
)
360+
if err != nil {
361+
t.Errorf("unable to create local engine: %v", err)
362+
}
363+
364+
if !reflect.DeepEqual(_engine.MockStdout() == nil, test.wantNil) {
365+
t.Errorf("WithMockStdout is %v, wantNil = %v", _engine.MockStdout() == nil, test.wantNil)
366+
}
367+
})
368+
}
369+
}

executor/local/service.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"bufio"
99
"context"
1010
"fmt"
11-
"os"
1211
"time"
1312

1413
"github.com/go-vela/worker/internal/message"
@@ -125,7 +124,7 @@ func (c *client) StreamService(ctx context.Context, ctn *pipeline.Container) err
125124
// scan entire container output
126125
for scanner.Scan() {
127126
// ensure we output to stdout
128-
fmt.Fprintln(os.Stdout, _pattern, scanner.Text())
127+
fmt.Fprintln(c.stdout, _pattern, scanner.Text())
129128
}
130129

131130
return scanner.Err()

executor/local/stage.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ package local
77
import (
88
"context"
99
"fmt"
10-
"os"
1110
"sync"
1211

1312
"github.com/go-vela/types/pipeline"
@@ -23,7 +22,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error {
2322
_pattern := fmt.Sprintf(stagePattern, c.init.Name, c.init.Name)
2423

2524
// output init progress to stdout
26-
fmt.Fprintln(os.Stdout, _pattern, "> Preparing step images for stage", s.Name, "...")
25+
fmt.Fprintln(c.stdout, _pattern, "> Preparing step images for stage", s.Name, "...")
2726

2827
// create the steps for the stage
2928
for _, _step := range s.Steps {
@@ -43,7 +42,7 @@ func (c *client) CreateStage(ctx context.Context, s *pipeline.Stage) error {
4342
}
4443

4544
// output the image information to stdout
46-
fmt.Fprintln(os.Stdout, _pattern, string(image))
45+
fmt.Fprintln(c.stdout, _pattern, string(image))
4746
}
4847

4948
return nil
@@ -121,7 +120,7 @@ func (c *client) DestroyStage(ctx context.Context, s *pipeline.Stage) error {
121120
// destroy the step
122121
err = c.DestroyStep(ctx, _step)
123122
if err != nil {
124-
fmt.Fprintln(os.Stdout, "unable to destroy step: ", err)
123+
fmt.Fprintln(c.stdout, "unable to destroy step: ", err)
125124
}
126125
}
127126

executor/local/step.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"bufio"
99
"context"
1010
"fmt"
11-
"os"
1211
"time"
1312

1413
"github.com/go-vela/types/constants"
@@ -164,7 +163,7 @@ func (c *client) StreamStep(ctx context.Context, ctn *pipeline.Container) error
164163
// scan entire container output
165164
for scanner.Scan() {
166165
// ensure we output to stdout
167-
fmt.Fprintln(os.Stdout, _pattern, scanner.Text())
166+
fmt.Fprintln(c.stdout, _pattern, scanner.Text())
168167
}
169168

170169
return scanner.Err()

0 commit comments

Comments
 (0)