Skip to content

Commit c6ecf70

Browse files
RossHammerandreynering
authored andcommitted
Adding a --concurrency (-C) flag
1 parent f0cd7d2 commit c6ecf70

File tree

6 files changed

+119
-22
lines changed

6 files changed

+119
-22
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
- Add a `--concurrency` (alias `-C`) flag, to limit the number of tasks that
6+
run concurrently. This is useful for heavy workloads.
7+
([#345](https://github.com/go-task/task/pull/345)).
8+
39
## v3.2.2 - 2021-01-12
410

511
- Improve performance of `--list` and `--summary` by skipping running shell

cmd/task/task.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func main() {
6565
dry bool
6666
summary bool
6767
parallel bool
68+
concurrency int
6869
dir string
6970
entrypoint string
7071
output string
@@ -87,6 +88,7 @@ func main() {
8788
pflag.StringVarP(&entrypoint, "taskfile", "t", "", `choose which Taskfile to run. Defaults to "Taskfile.yml"`)
8889
pflag.StringVarP(&output, "output", "o", "", "sets output style: [interleaved|group|prefixed]")
8990
pflag.BoolVarP(&color, "color", "c", true, "colored output")
91+
pflag.IntVarP(&concurrency, "concurrency", "C", 0, "limit number tasks to run concurrently")
9092
pflag.Parse()
9193

9294
if versionFlag {
@@ -122,16 +124,17 @@ func main() {
122124
}
123125

124126
e := task.Executor{
125-
Force: force,
126-
Watch: watch,
127-
Verbose: verbose,
128-
Silent: silent,
129-
Dir: dir,
130-
Dry: dry,
131-
Entrypoint: entrypoint,
132-
Summary: summary,
133-
Parallel: parallel,
134-
Color: color,
127+
Force: force,
128+
Watch: watch,
129+
Verbose: verbose,
130+
Silent: silent,
131+
Dir: dir,
132+
Dry: dry,
133+
Entrypoint: entrypoint,
134+
Summary: summary,
135+
Parallel: parallel,
136+
Color: color,
137+
Concurrency: concurrency,
135138

136139
Stdin: os.Stdin,
137140
Stdout: os.Stdout,

concurrency.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package task
2+
3+
func (e *Executor) acquireConcurrencyLimit() func() {
4+
if e.concurrencySemaphore == nil {
5+
return emptyFunc
6+
}
7+
8+
e.concurrencySemaphore <- struct{}{}
9+
return func() {
10+
<-e.concurrencySemaphore
11+
}
12+
}
13+
14+
func (e *Executor) releaseConcurrencyLimit() func() {
15+
if e.concurrencySemaphore == nil {
16+
return emptyFunc
17+
}
18+
19+
<-e.concurrencySemaphore
20+
return func() {
21+
e.concurrencySemaphore <- struct{}{}
22+
}
23+
}
24+
25+
func emptyFunc() {}

task.go

+27-12
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,17 @@ const (
3232
type Executor struct {
3333
Taskfile *taskfile.Taskfile
3434

35-
Dir string
36-
Entrypoint string
37-
Force bool
38-
Watch bool
39-
Verbose bool
40-
Silent bool
41-
Dry bool
42-
Summary bool
43-
Parallel bool
44-
Color bool
35+
Dir string
36+
Entrypoint string
37+
Force bool
38+
Watch bool
39+
Verbose bool
40+
Silent bool
41+
Dry bool
42+
Summary bool
43+
Parallel bool
44+
Color bool
45+
Concurrency int
4546

4647
Stdin io.Reader
4748
Stdout io.Writer
@@ -54,8 +55,9 @@ type Executor struct {
5455

5556
taskvars *taskfile.Vars
5657

57-
taskCallCount map[string]*int32
58-
mkdirMutexMap map[string]*sync.Mutex
58+
concurrencySemaphore chan struct{}
59+
taskCallCount map[string]*int32
60+
mkdirMutexMap map[string]*sync.Mutex
5961
}
6062

6163
// Run runs Task
@@ -247,6 +249,10 @@ func (e *Executor) Setup() error {
247249
e.taskCallCount[k] = new(int32)
248250
e.mkdirMutexMap[k] = &sync.Mutex{}
249251
}
252+
253+
if e.Concurrency > 0 {
254+
e.concurrencySemaphore = make(chan struct{}, e.Concurrency)
255+
}
250256
return nil
251257
}
252258

@@ -260,6 +266,9 @@ func (e *Executor) RunTask(ctx context.Context, call taskfile.Call) error {
260266
return &MaximumTaskCallExceededError{task: call.Task}
261267
}
262268

269+
release := e.acquireConcurrencyLimit()
270+
defer release()
271+
263272
if err := e.runDeps(ctx, t); err != nil {
264273
return err
265274
}
@@ -324,6 +333,9 @@ func (e *Executor) mkdir(t *taskfile.Task) error {
324333
func (e *Executor) runDeps(ctx context.Context, t *taskfile.Task) error {
325334
g, ctx := errgroup.WithContext(ctx)
326335

336+
reacquire := e.releaseConcurrencyLimit()
337+
defer reacquire()
338+
327339
for _, d := range t.Deps {
328340
d := d
329341

@@ -344,6 +356,9 @@ func (e *Executor) runCommand(ctx context.Context, t *taskfile.Task, call taskfi
344356

345357
switch {
346358
case cmd.Task != "":
359+
reacquire := e.releaseConcurrencyLimit()
360+
defer reacquire()
361+
347362
err := e.RunTask(ctx, taskfile.Call{Task: cmd.Task, Vars: cmd.Vars})
348363
if err != nil {
349364
return err

task_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,22 @@ func TestVarsInvalidTmpl(t *testing.T) {
171171
assert.EqualError(t, e.Run(context.Background(), taskfile.Call{Task: target}), expectError, "e.Run(target)")
172172
}
173173

174+
func TestConcurrency(t *testing.T) {
175+
const (
176+
dir = "testdata/concurrency"
177+
target = "default"
178+
)
179+
180+
e := &task.Executor{
181+
Dir: dir,
182+
Stdout: ioutil.Discard,
183+
Stderr: ioutil.Discard,
184+
Concurrency: 1,
185+
}
186+
assert.NoError(t, e.Setup(), "e.Setup()")
187+
assert.NoError(t, e.Run(context.Background(), taskfile.Call{Task: target}), "e.Run(target)")
188+
}
189+
174190
func TestParams(t *testing.T) {
175191
tt := fileContentTest{
176192
Dir: "testdata/params",

testdata/concurrency/Taskfile.yml

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
version: '2'
2+
3+
tasks:
4+
default:
5+
deps:
6+
- t1
7+
8+
t1:
9+
deps:
10+
- t3
11+
- t4
12+
cmds:
13+
- task: t2
14+
- echo done 1
15+
t2:
16+
deps:
17+
- t5
18+
- t6
19+
cmds:
20+
- echo done 2
21+
t3:
22+
cmds:
23+
- echo done 3
24+
t4:
25+
cmds:
26+
- echo done 4
27+
t5:
28+
cmds:
29+
- echo done 5
30+
t6:
31+
cmds:
32+
- echo done 6

0 commit comments

Comments
 (0)