-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathasync_map_sequence.go
More file actions
57 lines (48 loc) · 1.28 KB
/
async_map_sequence.go
File metadata and controls
57 lines (48 loc) · 1.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package co
import syncx "go.tempura.ink/co/internal/syncx"
type AsyncMapSequence[R, T any] struct {
*asyncSequence[T]
previousIterator Iterator[R]
predictorFn func(R) T
}
func NewAsyncMapSequence[R, T any](p AsyncSequenceable[R], fn func(R) T) *AsyncMapSequence[R, T] {
a := &AsyncMapSequence[R, T]{
previousIterator: p.iterator(),
predictorFn: fn,
}
a.asyncSequence = NewAsyncSequence[T](a)
return a
}
func (c *AsyncMapSequence[R, T]) SetPredicator(fn func(R) T) *AsyncMapSequence[R, T] {
c.predictorFn = fn
return c
}
func (a *AsyncMapSequence[R, T]) iterator() Iterator[T] {
it := &asyncMapSequenceIterator[R, T]{
AsyncMapSequence: a,
}
it.asyncSequenceIterator = NewAsyncSequenceIterator[T](it)
return it
}
type asyncMapSequenceIterator[R, T any] struct {
*asyncSequenceIterator[T]
*AsyncMapSequence[R, T]
}
func (it *asyncMapSequenceIterator[R, T]) next() *Optional[T] {
for op := it.previousIterator.next(); op.valid; op = it.previousIterator.next() {
mapped, err := syncx.SafeFn(func() T {
return it.predictorFn(op.data)
})
if err != nil {
it.handleError(err)
if it.errorMode.shouldSkip() {
continue
}
if it.errorMode.shouldStop() {
return NewOptionalEmpty[T]()
}
}
return OptionalOf(mapped)
}
return NewOptionalEmpty[T]()
}