-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathasync_debounce_sequence.go
More file actions
108 lines (87 loc) · 2.58 KB
/
async_debounce_sequence.go
File metadata and controls
108 lines (87 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package co
import (
"sync"
"time"
"go.tempura.ink/co/ds/queue"
syncx "go.tempura.ink/co/internal/syncx"
)
type AsyncDebounceSequence[R any] struct {
*asyncSequence[R]
previousIterator Iterator[R]
interval time.Duration
tolerance time.Duration
}
func NewAsyncDebounceSequence[R any](it AsyncSequenceable[R], interval time.Duration) *AsyncDebounceSequence[R] {
a := &AsyncDebounceSequence[R]{
previousIterator: it.iterator(),
interval: interval,
}
a.asyncSequence = NewAsyncSequence[R](a)
return a
}
func (a *AsyncDebounceSequence[R]) SetInterval(interval time.Duration) *AsyncDebounceSequence[R] {
a.interval = interval
return a
}
func (a *AsyncDebounceSequence[R]) SetTolerance(tolerance time.Duration) *AsyncDebounceSequence[R] {
a.tolerance = a.interval + tolerance
return a
}
func (c *AsyncDebounceSequence[R]) iterator() Iterator[R] {
it := &asyncDebounceSequenceIterator[R]{
AsyncDebounceSequence: c,
bufferedData: queue.NewQueue[R](),
bufferWait: syncx.NewCondx(&sync.Mutex{}),
}
it.asyncSequenceIterator = NewAsyncSequenceIterator[R](it)
return it
}
type asyncDebounceSequenceIterator[R any] struct {
*asyncSequenceIterator[R]
*AsyncDebounceSequence[R]
previousTime time.Time
bufferedData *queue.Queue[R]
runOnce sync.Once
sourceEnded bool
bufferWait *syncx.Condx
}
func (it *asyncDebounceSequenceIterator[R]) intervalPassed() bool {
return time.Since(it.previousTime) > it.interval
}
func (it *asyncDebounceSequenceIterator[R]) tolerancePassed() bool {
return time.Since(it.previousTime) > it.tolerance
}
func (it *asyncDebounceSequenceIterator[R]) startBuffer() {
it.runOnce.Do(func() {
it.previousTime = time.Now()
syncx.SafeGo(func() {
for op := it.previousIterator.next(); op.valid; op = it.previousIterator.next() {
reachedInterval := it.intervalPassed()
if !reachedInterval {
continue
}
it.bufferedData.Enqueue(op.data)
if it.tolerance == it.interval || it.tolerancePassed() {
it.bufferWait.Broadcastify(&syncx.BroadcastOption{
PreProcessFn: func() { it.previousTime = time.Now() },
})
}
}
it.bufferWait.Broadcastify(&syncx.BroadcastOption{
PreProcessFn: func() { it.sourceEnded = true },
})
})
})
}
func (it *asyncDebounceSequenceIterator[R]) next() *Optional[R] {
it.startBuffer()
it.bufferWait.Waitify(&syncx.WaitOption{
ConditionFn: func() bool {
return !it.sourceEnded && it.bufferedData.Len() == 0
},
})
if it.sourceEnded && it.bufferedData.Len() == 0 {
return NewOptionalEmpty[R]()
}
return OptionalOf(it.bufferedData.Dequeue())
}