diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 962cfb0..b695ca9 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -11,7 +11,7 @@ jobs: go-bench: strategy: matrix: - go-version: [ '1.20', 'stable' ] + go-version: [ '1.23', 'stable' ] runs-on: ubuntu-latest timeout-minutes: 15 steps: diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index f190c54..e938d4d 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -14,7 +14,7 @@ jobs: build: strategy: matrix: - go-version: ['1.20', 'stable'] + go-version: ['1.23', 'stable'] runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 87e68ae..9e43e80 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,7 +11,7 @@ jobs: go-bench: strategy: matrix: - go-version: [ '1.20', 'stable' ] + go-version: [ '1.23', 'stable' ] runs-on: ubuntu-latest timeout-minutes: 15 steps: diff --git a/go.mod b/go.mod index aa49e92..cc7d5b3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/sourcegraph/conc -go 1.20 +go 1.23 require github.com/stretchr/testify v1.8.1 diff --git a/pool/context_pool.go b/pool/context_pool.go index 85c34e5..d592054 100644 --- a/pool/context_pool.go +++ b/pool/context_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "iter" ) // ContextPool is a pool that runs tasks that take a context. @@ -49,6 +50,15 @@ func (p *ContextPool) Go(f func(ctx context.Context) error) { }) } +// GoForEach executes the given function concurrently for each element in the iterator. +// It maintains the order of the input iterator in the execution. This method is useful +// for parallel processing of iterable data structures while preserving their original sequence. +func (p *ContextPool) GoForEach(seq iter.Seq[func(context.Context) error]) { + for f := range seq { + p.Go(f) + } +} + // Wait cleans up all spawned goroutines, propagates any panics, and // returns an error if any of the tasks errored. func (p *ContextPool) Wait() error { diff --git a/pool/context_pool_test.go b/pool/context_pool_test.go index 3f4c1ef..ac2d1a9 100644 --- a/pool/context_pool_test.go +++ b/pool/context_pool_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "iter" "strconv" "sync/atomic" "testing" @@ -36,6 +37,34 @@ func ExampleContextPool_WithCancelOnError() { // I will cancel all other tasks! } +func ExampleContextPool_GoForEach() { + seq := func() iter.Seq[func(context.Context) error] { + return func(yield func(func(context.Context) error) bool) { + for i := 0; i < 3; i++ { + if !yield(func(ctx context.Context) error { + if i == 2 { + return errors.New("I will cancel all other tasks!") + } + <-ctx.Done() + return nil + }) { + return + } + } + } + } + + p := pool.New(). + WithMaxGoroutines(4). + WithContext(context.Background()). + WithCancelOnError() + p.GoForEach(seq()) + err := p.Wait() + fmt.Println(err) + // Output: + // I will cancel all other tasks! +} + func TestContextPool(t *testing.T) { t.Parallel() diff --git a/pool/error_pool.go b/pool/error_pool.go index e1789e6..d31199d 100644 --- a/pool/error_pool.go +++ b/pool/error_pool.go @@ -3,6 +3,7 @@ package pool import ( "context" "errors" + "iter" "sync" ) @@ -30,6 +31,15 @@ func (p *ErrorPool) Go(f func() error) { }) } +// GoForEach executes the given function concurrently for each element in the iterator. +// It maintains the order of the input iterator in the execution. This method is useful +// for parallel processing of iterable data structures while preserving their original sequence. +func (p *ErrorPool) GoForEach(seq iter.Seq[func() error]) { + for f := range seq { + p.Go(f) + } +} + // Wait cleans up any spawned goroutines, propagating any panics and // returning any errors from tasks. func (p *ErrorPool) Wait() error { diff --git a/pool/error_pool_test.go b/pool/error_pool_test.go index 814f90b..58bcb62 100644 --- a/pool/error_pool_test.go +++ b/pool/error_pool_test.go @@ -3,6 +3,7 @@ package pool_test import ( "errors" "fmt" + "iter" "strconv" "sync/atomic" "testing" @@ -30,6 +31,30 @@ func ExampleErrorPool() { // oh no! } +func ExampleErrorPool_GoForEach() { + seq := func() iter.Seq[func() error] { + return func(yield func(func() error) bool) { + for i := 0; i < 3; i++ { + if !yield(func() error { + if i == 2 { + return errors.New("oh no!") + } + return nil + }) { + return + } + } + } + } + + p := pool.New().WithErrors() + p.GoForEach(seq()) + err := p.Wait() + fmt.Println(err) + // Output: + // oh no! +} + func TestErrorPool(t *testing.T) { t.Parallel() diff --git a/pool/pool_test.go b/pool/pool_test.go index 6791b97..f782c66 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -2,6 +2,7 @@ package pool_test import ( "fmt" + "iter" "strconv" "sync/atomic" "testing" @@ -28,6 +29,30 @@ func ExamplePool() { // conc } +func ExamplePool_GoForEach() { + seq := func(count int) iter.Seq[func()] { + return func(yield func(func()) bool) { + for i := 0; i < count; i++ { + if !yield(func() { + fmt.Println("conc") + }) { + return + } + } + } + } + + p := pool.New().WithMaxGoroutines(3) + p.GoForEach(seq(5)) + p.Wait() + // Output: + // conc + // conc + // conc + // conc + // conc +} + func TestPool(t *testing.T) { t.Parallel() diff --git a/pool/result_context_pool.go b/pool/result_context_pool.go index 6bc30dd..2cf9adf 100644 --- a/pool/result_context_pool.go +++ b/pool/result_context_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "iter" ) // ResultContextPool is a pool that runs tasks that take a context and return a @@ -28,6 +29,15 @@ func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) { }) } +// GoForEach executes the given function concurrently for each element in the iterator. +// It maintains the order of the input iterator in the execution. This method is useful +// for parallel processing of iterable data structures while preserving their original sequence. +func (p *ResultContextPool[T]) GoForEach(seq iter.Seq[func(context.Context) (T, error)]) { + for f := range seq { + p.Go(f) + } +} + // Wait cleans up all spawned goroutines, propagates any panics, and // returns an error if any of the tasks errored. func (p *ResultContextPool[T]) Wait() ([]T, error) { diff --git a/pool/result_context_pool_test.go b/pool/result_context_pool_test.go index fc3b68a..eea97e4 100644 --- a/pool/result_context_pool_test.go +++ b/pool/result_context_pool_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "iter" "strconv" "sync/atomic" "testing" @@ -244,4 +245,28 @@ func TestResultContextPool(t *testing.T) { require.ErrorIs(t, errs2, err2) require.NotErrorIs(t, errs2, err1) }) + + t.Run("GoForEach", func(t *testing.T) { + t.Parallel() + seq := func() iter.Seq[func(context.Context) (int, error)] { + return func(yield func(func(context.Context) (int, error)) bool) { + if !yield(func(context.Context) (int, error) { return 0, err1 }) { + return + } + if !yield(func(context.Context) (int, error) { return 0, nil }) { + return + } + if !yield(func(context.Context) (int, error) { return 0, err2 }) { + return + } + } + } + + g := pool.NewWithResults[int]().WithErrors().WithContext(context.Background()) + g.GoForEach(seq()) + res, err := g.Wait() + require.Len(t, res, 1) + require.ErrorIs(t, err, err1) + require.ErrorIs(t, err, err2) + }) } diff --git a/pool/result_error_pool.go b/pool/result_error_pool.go index 832cd9b..8c14995 100644 --- a/pool/result_error_pool.go +++ b/pool/result_error_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "iter" ) // ResultErrorPool is a pool that executes tasks that return a generic result @@ -30,6 +31,15 @@ func (p *ResultErrorPool[T]) Go(f func() (T, error)) { }) } +// GoForEach executes the given function concurrently for each element in the iterator. +// It maintains the order of the input iterator in the execution. This method is useful +// for parallel processing of iterable data structures while preserving their original sequence. +func (p *ResultErrorPool[T]) GoForEach(seq iter.Seq[func() (T, error)]) { + for f := range seq { + p.Go(f) + } +} + // Wait cleans up any spawned goroutines, propagating any panics and // returning the results and any errors from tasks. func (p *ResultErrorPool[T]) Wait() ([]T, error) { diff --git a/pool/result_pool.go b/pool/result_pool.go index f73a772..4a7f6b8 100644 --- a/pool/result_pool.go +++ b/pool/result_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "iter" "sort" "sync" ) @@ -36,6 +37,15 @@ func (p *ResultPool[T]) Go(f func() T) { }) } +// GoForEach executes the given function concurrently for each element in the iterator. +// It maintains the order of the input iterator in the execution. This method is useful +// for parallel processing of iterable data structures while preserving their original sequence. +func (p *ResultPool[T]) GoForEach(seq iter.Seq[func() T]) { + for f := range seq { + p.Go(f) + } +} + // Wait cleans up all spawned goroutines, propagating any panics, and returning // a slice of results from tasks that did not panic. func (p *ResultPool[T]) Wait() []T { diff --git a/pool/result_pool_test.go b/pool/result_pool_test.go index 69b9de4..09c9474 100644 --- a/pool/result_pool_test.go +++ b/pool/result_pool_test.go @@ -2,6 +2,7 @@ package pool_test import ( "fmt" + "iter" "math/rand" "strconv" "sync/atomic" @@ -28,6 +29,27 @@ func ExampleResultPool() { // [0 2 4 6 8 10 12 14 16 18] } +func ExampleResultPool_GoForEach() { + seq := func() iter.Seq[func() int] { + return func(yield func(func() int) bool) { + for i := 0; i < 10; i++ { + if !yield(func() int { + return i * 2 + }) { + return + } + } + } + } + p := pool.NewWithResults[int]() + p.GoForEach(seq()) + res := p.Wait() + fmt.Println(res) + + // Output: + // [0 2 4 6 8 10 12 14 16 18] +} + func TestResultGroup(t *testing.T) { t.Parallel() diff --git a/stream/stream.go b/stream/stream.go index 6b11e90..faf97f6 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -2,6 +2,7 @@ package stream import ( + "iter" "sync" "github.com/sourcegraph/conc" @@ -86,6 +87,15 @@ func (s *Stream) Go(f Task) { }) } +// GoForEach executes the given function concurrently for each element in the iterator. +// It maintains the order of the input iterator in the execution. This method is useful +// for parallel processing of iterable data structures while preserving their original sequence. +func (s *Stream) GoForEach(seq iter.Seq[Task]) { + for f := range seq { + s.Go(f) + } +} + // Wait signals to the stream that all tasks have been submitted. Wait will // not return until all tasks and callbacks have been run. func (s *Stream) Wait() { diff --git a/stream/stream_test.go b/stream/stream_test.go index 9f5bce1..663d094 100644 --- a/stream/stream_test.go +++ b/stream/stream_test.go @@ -2,6 +2,8 @@ package stream_test import ( "fmt" + "iter" + "slices" "sync/atomic" "testing" "time" @@ -34,6 +36,35 @@ func ExampleStream() { // 80ms } +func ExampleStream_GoForEach() { + times := []int{20, 52, 16, 45, 4, 80} + seq := func(timeSeq iter.Seq[int]) iter.Seq[stream.Task] { + return func(yield func(stream.Task) bool) { + for millis := range timeSeq { + dur := time.Duration(millis) * time.Millisecond + if !yield(func() stream.Callback { + time.Sleep(dur) + // This will print in the order the tasks were submitted + return func() { fmt.Println(dur) } + }) { + } + } + } + } + + s := stream.New() + s.GoForEach(seq(slices.Values(times))) + s.Wait() + + // Output: + // 20ms + // 52ms + // 16ms + // 45ms + // 4ms + // 80ms +} + func TestStream(t *testing.T) { t.Parallel() diff --git a/waitgroup.go b/waitgroup.go index 47b1bc1..1114922 100644 --- a/waitgroup.go +++ b/waitgroup.go @@ -1,6 +1,7 @@ package conc import ( + "iter" "sync" "github.com/sourcegraph/conc/panics" @@ -33,6 +34,15 @@ func (h *WaitGroup) Go(f func()) { }() } +// GoForEach executes the given function concurrently for each element in the iterator. +// It maintains the order of the input iterator in the execution. This method is useful +// for parallel processing of iterable data structures while preserving their original sequence. +func (h *WaitGroup) GoForEach(seq iter.Seq[func()]) { + for f := range seq { + h.Go(f) + } +} + // Wait will block until all goroutines spawned with Go exit and will // propagate any panics spawned in a child goroutine. func (h *WaitGroup) Wait() { diff --git a/waitgroup_test.go b/waitgroup_test.go index 44ae61a..454a8f4 100644 --- a/waitgroup_test.go +++ b/waitgroup_test.go @@ -2,6 +2,7 @@ package conc_test import ( "fmt" + "iter" "sync/atomic" "testing" @@ -26,6 +27,29 @@ func ExampleWaitGroup() { // 10 } +func ExampleWaitGroup_GoForEach() { + var count atomic.Int64 + + var wg conc.WaitGroup + seq := func() iter.Seq[func()] { + return func(yield func(func()) bool) { + for i := 0; i < 10; i++ { + if !yield(func() { + count.Add(1) + }) { + return + } + } + } + } + wg.GoForEach(seq()) + wg.Wait() + + fmt.Println(count.Load()) + // Output: + // 10 +} + func ExampleWaitGroup_WaitAndRecover() { var wg conc.WaitGroup