-
-
Notifications
You must be signed in to change notification settings - Fork 537
/
Copy pathprocessor.go
129 lines (106 loc) · 2.97 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package exec
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/pkg/stdcopy"
)
// ProcessOptions defines options applicable to the reader processor
type ProcessOptions struct {
ExecConfig container.ExecOptions
Reader io.Reader
}
// NewProcessOptions returns a new ProcessOptions instance
// with the given command and default options:
// - detach: false
// - attach stdout: true
// - attach stderr: true
func NewProcessOptions(cmd []string) *ProcessOptions {
return &ProcessOptions{
ExecConfig: container.ExecOptions{
Cmd: cmd,
Detach: false,
AttachStdout: true,
AttachStderr: true,
},
}
}
// ProcessOption defines a common interface to modify the reader processor
// These options can be passed to the Exec function in a variadic way to customize the returned Reader instance
type ProcessOption interface {
Apply(opts *ProcessOptions)
}
type ProcessOptionFunc func(opts *ProcessOptions)
func (fn ProcessOptionFunc) Apply(opts *ProcessOptions) {
fn(opts)
}
func WithUser(user string) ProcessOption {
return ProcessOptionFunc(func(opts *ProcessOptions) {
opts.ExecConfig.User = user
})
}
func WithWorkingDir(workingDir string) ProcessOption {
return ProcessOptionFunc(func(opts *ProcessOptions) {
opts.ExecConfig.WorkingDir = workingDir
})
}
func WithEnv(env []string) ProcessOption {
return ProcessOptionFunc(func(opts *ProcessOptions) {
opts.ExecConfig.Env = env
})
}
// safeBuffer is a goroutine safe buffer.
type safeBuffer struct {
mtx sync.Mutex
buf bytes.Buffer
err error
}
// Error sets an error for the next read.
func (sb *safeBuffer) Error(err error) {
sb.mtx.Lock()
defer sb.mtx.Unlock()
sb.err = err
}
// Write writes p to the buffer.
// It is safe for concurrent use by multiple goroutines.
func (sb *safeBuffer) Write(p []byte) (n int, err error) {
sb.mtx.Lock()
defer sb.mtx.Unlock()
return sb.buf.Write(p)
}
// Read reads up to len(p) bytes into p from the buffer.
// It is safe for concurrent use by multiple goroutines.
func (sb *safeBuffer) Read(p []byte) (n int, err error) {
sb.mtx.Lock()
defer sb.mtx.Unlock()
if sb.err != nil {
return 0, sb.err
}
return sb.buf.Read(p)
}
// Multiplexed returns a [ProcessOption] that configures the command execution
// to combine stdout and stderr into a single stream without Docker's multiplexing headers.
func Multiplexed() ProcessOption {
return ProcessOptionFunc(func(opts *ProcessOptions) {
// returning fast to bypass those options with a nil reader,
// which could be the case when other options are used
// to configure the exec creation.
if opts.Reader == nil {
return
}
done := make(chan struct{})
var outBuff safeBuffer
var errBuff safeBuffer
go func() {
defer close(done)
if _, err := stdcopy.StdCopy(&outBuff, &errBuff, opts.Reader); err != nil {
outBuff.Error(fmt.Errorf("copying output: %w", err))
return
}
}()
<-done
opts.Reader = io.MultiReader(&outBuff, &errBuff)
})
}