Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Go](https://github.com/tempura-shrimp/co/actions/workflows/go.yml/badge.svg)](https://github.com/tempura-shrimp/co/actions/workflows/go.yml)
[![Go](https://github.com/tempurai/co/actions/workflows/go.yml/badge.svg)](https://github.com/tempurai/co/actions/workflows/go.yml)

# Co

Expand Down Expand Up @@ -28,7 +28,7 @@ However, even though I have mentioned a lot of ReactiveX patterns above. I do no

## APIs

https://godoc.org/go.tempura.ink/co
https://pkg.go.dev/github.com/tempurai/co

### Promising functions:

Expand Down Expand Up @@ -78,7 +78,19 @@ https://godoc.org/go.tempura.ink/co

## Getting started

Navigate to your project base and `go get go.tempura.ink/co`
Navigate to your project base and `go get github.com/tempurai/co`.

If you are in an environment without access to the public Go proxy, you can fetch directly:

```bash
GOPROXY=direct go get github.com/tempurai/co
```

If module checksum verification is blocked in your environment, you can also disable it:

```bash
GOPROXY=direct GOSUMDB=off go get github.com/tempurai/co
```

## Examples

Expand Down Expand Up @@ -166,7 +178,7 @@ Pool benchmark
```golang
goos: darwin
goarch: amd64
pkg: go.tempura.ink/co/benchmark
pkg: github.com/tempurai/co/benchmark
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkUnmarshalLargeJSONWithSequence-12 50 45000332 ns/op 11435352 B/op 137058 allocs/op
BenchmarkUnmarshalLargeJSONWithAwaitAll-12 50 9901537 ns/op 11207428 B/op 134323 allocs/op
Expand All @@ -175,5 +187,5 @@ BenchmarkUnmarshalLargeJSONWithAnts-12 50 100755
BenchmarkUnmarshalLargeJSONWithWorkPool-12 50 9658117 ns/op 11206981 B/op 134322 allocs/op
BenchmarkUnmarshalLargeJSONWithDispatchPool-12 50 10893923 ns/op 11207039 B/op 134322 allocs/op
PASS
ok go.tempura.ink/co/benchmark 6.793s
ok github.com/tempurai/co/benchmark 6.793s
```
2 changes: 1 addition & 1 deletion action.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package co
import (
"sync"

syncx "go.tempura.ink/co/internal/syncx"
syncx "github.com/tempurai/co/internal/syncx"
)

type Action[E any] struct {
Expand Down
2 changes: 1 addition & 1 deletion action_await_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package co
import (
"sync"

syncx "go.tempura.ink/co/internal/syncx"
syncx "github.com/tempurai/co/internal/syncx"
)

type actionAwait[R any] struct {
Expand Down
2 changes: 1 addition & 1 deletion action_await_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package co
import (
"sync"

syncx "go.tempura.ink/co/internal/syncx"
syncx "github.com/tempurai/co/internal/syncx"
)

type actionRace[R any] struct {
Expand Down
16 changes: 9 additions & 7 deletions action_await_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"time"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAwaitAll(t *testing.T) {
convey.Convey("given a sequential tasks", t, func() {
handlers := make([]func() (int, error), 0)
for i := 0; i < 1000; i++ {
for i := 0; i < 200; i++ {
i := i
handlers = append(handlers, func() (int, error) {
return i + 1, nil
Expand All @@ -27,7 +27,7 @@ func TestAwaitAll(t *testing.T) {

convey.Convey("The responded value should be valid", func() {
expected, actuals := []int{}, []int{}
for i := 0; i < 1000; i++ {
for i := 0; i < 200; i++ {
expected = append(expected, i+1)
actuals = append(actuals, responses[i].GetValue())
}
Expand All @@ -41,11 +41,12 @@ func TestAwaitRace(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU() * 2)

convey.Convey("given a sequential tasks", t, func() {
baseDelay := 10 * time.Millisecond
handlers := make([]func() (int, error), 0)
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
i := i
handlers = append(handlers, func() (int, error) {
time.Sleep(time.Second * time.Duration(i+1))
time.Sleep(baseDelay * time.Duration(i+1))
return i + 1, nil
})
}
Expand All @@ -64,8 +65,9 @@ func TestAwaitAny(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU() * 2)

convey.Convey("given a sequential tasks", t, func() {
baseDelay := 10 * time.Millisecond
handlers := make([]func() (int, error), 0)
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
i := i

err := fmt.Errorf("Determined value")
Expand All @@ -74,7 +76,7 @@ func TestAwaitAny(t *testing.T) {
}

handlers = append(handlers, func() (int, error) {
time.Sleep(time.Second * time.Duration(i+1))
time.Sleep(baseDelay * time.Duration(i+1))
return i + 1, err
})
}
Expand Down
2 changes: 1 addition & 1 deletion async_adjacent_filter_sequence.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package co

import (
syncx "go.tempura.ink/co/internal/syncx"
syncx "github.com/tempurai/co/internal/syncx"
)

type AsyncAdjacentFilterSequence[R any] struct {
Expand Down
2 changes: 1 addition & 1 deletion async_adjacent_filter_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncAdjacentFilterSequence(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions async_any_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package co
import (
"sync"

"go.tempura.ink/co/ds/queue"
syncx "go.tempura.ink/co/internal/syncx"
"github.com/tempurai/co/ds/queue"
syncx "github.com/tempurai/co/internal/syncx"
)

type AsyncAnySequence[R any] struct {
Expand Down
4 changes: 2 additions & 2 deletions async_any_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"time"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"golang.org/x/exp/slices"
"github.com/tempurai/co"
"slices"
)

func TestAsyncAnySequence(t *testing.T) {
Expand Down
43 changes: 35 additions & 8 deletions async_buffer_time_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"sync"
"time"

syncx "go.tempura.ink/co/internal/syncx"
syncx "github.com/tempurai/co/internal/syncx"
)

type AsyncBufferTimeSequence[R any, T []R] struct {
Expand Down Expand Up @@ -46,9 +46,11 @@ type asyncBufferTimeSequenceIterator[R any, T []R] struct {
previousTime time.Time
bufferedData *List[T]

runOnce sync.Once
sourceEnded bool
bufferWait *syncx.Condx
runOnce sync.Once
sourceEnded bool
bufferWait *syncx.Condx
tickerStop chan struct{}
tickerStopOnce sync.Once
}

func (it *asyncBufferTimeSequenceIterator[R, T]) intervalPassed() bool {
Expand All @@ -58,6 +60,7 @@ func (it *asyncBufferTimeSequenceIterator[R, T]) intervalPassed() bool {
func (it *asyncBufferTimeSequenceIterator[R, T]) startBuffer() {
it.runOnce.Do(func() {
it.previousTime = time.Now()
it.tickerStop = make(chan struct{})

syncx.SafeGo(func() {
for op := it.previousIterator.next(); op.valid; op = it.previousIterator.next() {
Expand All @@ -71,14 +74,38 @@ func (it *asyncBufferTimeSequenceIterator[R, T]) startBuffer() {

if reachedInterval {
it.bufferWait.Broadcastify(&syncx.BroadcastOption{
PreProcessFn: func() { it.previousTime = time.Now() }},
)
PreProcessFn: func() { it.previousTime = time.Now() },
})
}
}
it.bufferWait.Broadcastify(&syncx.BroadcastOption{
PreProcessFn: func() { it.sourceEnded = true }},
)
PreProcessFn: func() {
it.sourceEnded = true
it.stopTicker()
},
})
})

if it.interval > 0 {
syncx.SafeGo(func() {
ticker := time.NewTicker(it.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
it.bufferWait.Broadcastify(&syncx.BroadcastOption{})
case <-it.tickerStop:
return
}
}
})
}
})
}

func (it *asyncBufferTimeSequenceIterator[R, T]) stopTicker() {
it.tickerStopOnce.Do(func() {
close(it.tickerStop)
})
}

Expand Down
2 changes: 1 addition & 1 deletion async_buffer_time_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncBufferTimeSequence(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions async_combine_latest_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package co
import (
"sync"

"go.tempura.ink/co/ds/queue"
syncx "go.tempura.ink/co/internal/syncx"
"github.com/tempurai/co/ds/queue"
syncx "github.com/tempurai/co/internal/syncx"
)

type asyncCombineLatestFn[R any] func([]any) R
Expand Down
4 changes: 2 additions & 2 deletions async_combine_latest_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"golang.org/x/exp/slices"
"github.com/tempurai/co"
"slices"
)

func checkCombineLatest[T comparable](c convey.C, a [][]T, l ...[]T) {
Expand Down
2 changes: 1 addition & 1 deletion async_compacted_sequence.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package co

import syncx "go.tempura.ink/co/internal/syncx"
import syncx "github.com/tempurai/co/internal/syncx"

type AsyncCompactedSequence[R comparable] struct {
*asyncSequence[R]
Expand Down
2 changes: 1 addition & 1 deletion async_compacted_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

type vData struct{ v int }
Expand Down
4 changes: 2 additions & 2 deletions async_debounce_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"sync"
"time"

"go.tempura.ink/co/ds/queue"
syncx "go.tempura.ink/co/internal/syncx"
"github.com/tempurai/co/ds/queue"
syncx "github.com/tempurai/co/internal/syncx"
)

type AsyncDebounceSequence[R any] struct {
Expand Down
2 changes: 1 addition & 1 deletion async_debounce_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncDebounceSequence(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion async_flatten_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncFlattenSequence(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion async_map_sequence.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package co

import syncx "go.tempura.ink/co/internal/syncx"
import syncx "github.com/tempurai/co/internal/syncx"

type AsyncMapSequence[R, T any] struct {
*asyncSequence[T]
Expand Down
2 changes: 1 addition & 1 deletion async_map_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncMapSequence(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion async_merged_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncMergedSequence(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions async_multicast_sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"log"
"sync"

"go.tempura.ink/co/ds/queue"
syncx "go.tempura.ink/co/internal/syncx"
"github.com/tempurai/co/ds/queue"
syncx "github.com/tempurai/co/internal/syncx"
)

type AsyncMulticastSequence[R any] struct {
Expand Down
2 changes: 1 addition & 1 deletion async_multicast_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncMulticastSequence(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion async_pairwise_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncPairwiseSequence(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion async_partition_sequence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"

"github.com/smartystreets/goconvey/convey"
"go.tempura.ink/co"
"github.com/tempurai/co"
)

func TestAsyncPartitionSequence(t *testing.T) {
Expand Down
Loading