-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathfrom_chan_buffered.go
More file actions
81 lines (69 loc) · 1.76 KB
/
from_chan_buffered.go
File metadata and controls
81 lines (69 loc) · 1.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package co
import (
"sync"
"go.tempura.ink/co/ds/queue"
syncx "go.tempura.ink/co/internal/syncx"
)
// AsyncBufferedChan is an async sequence backed by a source channel. Values
// received from the channel are placed in a queue and handed out to iterators
// from there.
type AsyncBufferedChan[R any] struct {
*asyncSequence[R]
// sourceCh is the channel the listener ranges over; Complete passes it to
// syncx.SafeClose.
sourceCh chan R
// sourceEnded is set to true by the listener once the range over sourceCh
// has finished.
sourceEnded syncx.AtomicBool
// runOnce guards startListening so the listener is launched at most once.
runOnce sync.Once
// bufferedData holds values received from sourceCh that no iterator has
// dequeued yet.
bufferedData *queue.Queue[R]
// bufferWait is what the listener calls Broadcastify on and what iterators
// call Waitify on.
bufferWait *syncx.Condx
}
// FromChanBuffered wraps ch in an AsyncBufferedChan and immediately calls
// startListening, which feeds values received from ch into the sequence's
// internal queue.
//
// The listener's range loop over ch only finishes when ch is closed, so the
// producer (or a call to Complete) has to close it for the sequence to end.
func FromChanBuffered[R any](ch chan R) *AsyncBufferedChan[R] {
	seq := new(AsyncBufferedChan[R])
	seq.sourceCh = ch
	seq.bufferedData = queue.NewQueue[R]()
	seq.bufferWait = syncx.NewCondx(&sync.Mutex{})

	// The embedded sequence is built from seq itself, so it can only be
	// attached after seq has been allocated.
	seq.asyncSequence = NewAsyncSequence[R](seq)

	seq.startListening()
	return seq
}
// startListening launches, at most once per sequence (guarded by runOnce),
// the routine that moves values from sourceCh into bufferedData.
//
// Each received value is enqueued from the PreProcessFn handed to
// bufferWait.Broadcastify. When the range over sourceCh finishes, sourceEnded
// is set to true through one final Broadcastify call.
//
// NOTE(review): presumably syncx.SafeGo runs the function on its own goroutine
// and Broadcastify invokes PreProcessFn before waking waiters — confirm in syncx.
func (a *AsyncBufferedChan[R]) startListening() {
	pump := func() {
		for item := range a.sourceCh {
			enqueue := func() {
				a.bufferedData.Enqueue(item)
			}
			a.bufferWait.Broadcastify(&syncx.BroadcastOption{PreProcessFn: enqueue})
		}

		// The range loop has ended: record it so iterators can stop.
		markEnded := func() { a.sourceEnded.Set(true) }
		a.bufferWait.Broadcastify(&syncx.BroadcastOption{PreProcessFn: markEnded})
	}

	a.runOnce.Do(func() {
		syncx.SafeGo(pump)
	})
}
// Complete hands the source channel to syncx.SafeClose and returns the
// receiver so that calls can be chained.
//
// NOTE(review): presumably SafeClose closes the channel and tolerates one that
// is already closed — confirm in syncx.
func (a *AsyncBufferedChan[R]) Complete() *AsyncBufferedChan[R] {
	src := a.sourceCh
	syncx.SafeClose(src)
	return a
}
// iterator returns a new Iterator over this sequence. The iterator holds a
// pointer to a, so it reads from a's queue rather than from a copy.
func (a *AsyncBufferedChan[R]) iterator() Iterator[R] {
	iter := new(asyncBufferedChanIterator[R])
	iter.AsyncBufferedChan = a
	// The embedded base iterator is constructed from iter itself, so it is
	// wired up only after iter exists.
	iter.asyncSequenceIterator = NewAsyncSequenceIterator[R](iter)
	return iter
}
// asyncBufferedChanIterator is the iterator type created by
// AsyncBufferedChan.iterator. It embeds a pointer to the sequence it reads
// from, so its next method works directly on that sequence's sourceEnded,
// bufferedData and bufferWait fields.
type asyncBufferedChanIterator[R any] struct {
*asyncSequenceIterator[R]
*AsyncBufferedChan[R]
}
// next returns the next buffered value wrapped in an Optional, or an empty
// Optional once the source has ended and the queue holds no more values.
//
// When the queue is empty but the source has not ended, it calls
// bufferWait.Waitify with a condition that is true for exactly that state.
//
// NOTE(review): presumably Waitify blocks while ConditionFn returns true —
// confirm in syncx. The check after Waitify and the Dequeue are not performed
// under any lock visible here.
func (it *asyncBufferedChanIterator[R]) next() *Optional[R] {
	// exhausted reports "source finished and nothing left to hand out".
	exhausted := func() bool {
		return it.sourceEnded.Get() && it.bufferedData.Len() == 0
	}

	if exhausted() {
		return NewOptionalEmpty[R]()
	}

	// stillEmpty is the wait condition: source is live but no value is queued.
	stillEmpty := func() bool {
		if it.sourceEnded.Get() {
			return false
		}
		return it.bufferedData.Len() == 0
	}
	it.bufferWait.Waitify(&syncx.WaitOption{ConditionFn: stillEmpty})

	// The source may have ended while waiting, so re-check before dequeuing.
	if !exhausted() {
		return OptionalOf(it.bufferedData.Dequeue())
	}
	return NewOptionalEmpty[R]()
}