-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathasync_buffer_time_sequence.go
More file actions
99 lines (79 loc) · 2.43 KB
/
async_buffer_time_sequence.go
File metadata and controls
99 lines (79 loc) · 2.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package co
import (
"sync"
"time"
syncx "go.tempura.ink/co/internal/syncx"
)
// AsyncBufferTimeSequence is an async sequence whose elements are batches
// (T, a slice of R) of the values produced by an upstream sequence of R.
// Batches are delimited using a time interval.
type AsyncBufferTimeSequence[R any, T []R] struct {
	// Embedded base sequence; it is created in NewAsyncBufferTimeSequence
	// with this value as its argument.
	*asyncSequence[T]

	// previousIterator is the iterator of the upstream sequence whose
	// values are collected into batches.
	previousIterator Iterator[R]

	// interval is the duration compared against the time elapsed since the
	// last recorded boundary to decide when a new batch starts.
	interval time.Duration
}
// NewAsyncBufferTimeSequence builds an AsyncBufferTimeSequence that batches
// the values of it, using interval as the batching period.
//
// The upstream iterator is obtained immediately via it.iterator(); the
// embedded asyncSequence is then created with the new value as its argument.
func NewAsyncBufferTimeSequence[R any, T []R](it AsyncSequenceable[R], interval time.Duration) *AsyncBufferTimeSequence[R, T] {
	seq := new(AsyncBufferTimeSequence[R, T])
	seq.previousIterator = it.iterator()
	seq.interval = interval

	// The base sequence needs the fully populated value, so it is wired last.
	seq.asyncSequence = NewAsyncSequence[T](seq)
	return seq
}
// SetInterval replaces the batching interval and returns the receiver so
// that calls can be chained.
//
// NOTE(review): the field is written without any synchronization here;
// whether it is safe to call this while an iterator is already running
// should be confirmed against the callers.
func (a *AsyncBufferTimeSequence[R, T]) SetInterval(interval time.Duration) *AsyncBufferTimeSequence[R, T] {
	a.interval = interval

	return a
}
// iterator returns a new iterator over the batches of this sequence.
//
// Each iterator gets its own empty batch list and its own condition object
// backed by a fresh mutex; the embedded asyncSequenceIterator is created
// last, with the new iterator as its argument.
func (a *AsyncBufferTimeSequence[R, T]) iterator() Iterator[T] {
	bufIt := new(asyncBufferTimeSequenceIterator[R, T])
	bufIt.AsyncBufferTimeSequence = a
	bufIt.bufferedData = NewList[T]()
	bufIt.bufferWait = syncx.NewCondx(new(sync.Mutex))

	bufIt.asyncSequenceIterator = NewAsyncSequenceIterator[T](bufIt)
	return bufIt
}
// asyncBufferTimeSequenceIterator iterates over the batches of an
// AsyncBufferTimeSequence. A producer loop started by startBuffer fills
// bufferedData from the upstream iterator, and next hands batches out.
type asyncBufferTimeSequenceIterator[R any, T []R] struct {
	// Embedded base iterator; created in AsyncBufferTimeSequence.iterator.
	*asyncSequenceIterator[T]

	// Embedded owning sequence; gives access to previousIterator and
	// interval.
	*AsyncBufferTimeSequence[R, T]

	// previousTime is the reference instant for intervalPassed. It is set
	// when buffering starts and reset each time the producer signals an
	// interval boundary.
	previousTime time.Time

	// bufferedData holds the pending batches; the producer always appends
	// to the last entry, and next pops the first one.
	bufferedData *List[T]

	// runOnce guarantees the producer loop is started a single time.
	runOnce sync.Once

	// sourceEnded is set to true once the upstream iterator is exhausted.
	sourceEnded bool

	// bufferWait is what next waits on and what the producer broadcasts on.
	bufferWait *syncx.Condx
}
// intervalPassed reports whether strictly more than the configured interval
// has elapsed since previousTime.
func (it *asyncBufferTimeSequenceIterator[R, T]) intervalPassed() bool {
	elapsed := time.Since(it.previousTime)
	return elapsed > it.interval
}
// startBuffer starts, at most once per iterator, the producer loop that
// drains the upstream iterator into bufferedData.
//
// The loop is handed to syncx.SafeGo (presumably this runs it on its own
// goroutine; confirm against the syncx package). For every valid upstream
// item it:
//   - opens a new empty batch when no batch exists yet or when the interval
//     has elapsed, so the item that crosses the boundary becomes the first
//     element of the new batch;
//   - appends the item to the last batch;
//   - on a boundary, broadcasts on bufferWait with a pre-process hook that
//     resets previousTime to the current time.
//
// When the upstream iterator reports an invalid item the loop ends and a
// final broadcast is issued whose pre-process hook sets sourceEnded.
func (it *asyncBufferTimeSequenceIterator[R, T]) startBuffer() {
	it.runOnce.Do(func() {
		// The first interval is measured from the moment buffering starts.
		it.previousTime = time.Now()

		drain := func() {
			for {
				item := it.previousIterator.next()
				if !item.valid {
					break
				}

				// Sample the boundary once so that the decision to open a
				// batch and the decision to broadcast agree.
				boundary := it.intervalPassed()
				if it.bufferedData.len() == 0 || boundary {
					it.bufferedData.add(T{})
				}

				last := it.bufferedData.len() - 1
				batch := append(it.bufferedData.getAt(last), item.data)
				it.bufferedData.setAt(last, batch)

				if !boundary {
					continue
				}
				it.bufferWait.Broadcastify(&syncx.BroadcastOption{
					PreProcessFn: func() { it.previousTime = time.Now() },
				})
			}

			// Upstream is exhausted: flag it and notify anyone waiting on
			// bufferWait.
			it.bufferWait.Broadcastify(&syncx.BroadcastOption{
				PreProcessFn: func() { it.sourceEnded = true },
			})
		}

		syncx.SafeGo(drain)
	})
}
// next returns the next batch, or an empty Optional once the upstream
// source has ended and no batch is left in bufferedData.
//
// It first makes sure the producer loop has been started, then waits on
// bufferWait using a predicate that is true while the source has not ended
// and either no batch is buffered or the interval has not yet elapsed.
// NOTE(review): this presumably means Waitify blocks for as long as
// ConditionFn returns true; confirm against syncx.Condx.
func (it *asyncBufferTimeSequenceIterator[R, T]) next() *Optional[T] {
	it.startBuffer()

	notReady := func() bool {
		if it.sourceEnded {
			return false
		}
		return it.bufferedData.len() == 0 || !it.intervalPassed()
	}
	it.bufferWait.Waitify(&syncx.WaitOption{ConditionFn: notReady})

	// Anything other than "ended and drained" means the first batch is
	// popped and returned.
	if !it.sourceEnded || it.bufferedData.len() != 0 {
		return OptionalOf(it.bufferedData.popFirst())
	}
	return NewOptionalEmpty[T]()
}