@@ -3,54 +3,70 @@ package org.nullvector.query
3
3
import akka .NotUsed
4
4
import akka .stream .scaladsl .Source
5
5
import akka .stream .stage ._
6
- import akka .stream .{Attributes , Outlet , SourceShape }
6
+ import akka .stream .{Attributes , Materializer , Outlet , SourceShape }
7
+ import org .slf4j .{Logger , LoggerFactory }
7
8
8
- import scala .concurrent . ExecutionContext
9
+ import scala .collection . mutable
9
10
import scala .concurrent .duration .FiniteDuration
11
+ import scala .concurrent .{ExecutionContext , Future }
10
12
11
- class PullerGraph [D , O ](
12
- initialOffset : O ,
13
- refreshInterval : FiniteDuration ,
14
- offsetOf : D => O ,
15
- graterOf : (O , O ) => O ,
16
- nextChunk : O => Source [D , NotUsed ],
17
- )(implicit ec : ExecutionContext ) extends GraphStage [SourceShape [Source [ D , NotUsed ]]] {
13
+ class PullerGraph [Element , Offset ](
14
+ initialOffset : Offset ,
15
+ refreshInterval : FiniteDuration ,
16
+ offsetOf : Element => Offset ,
17
+ greaterOf : (Offset , Offset ) => Offset ,
18
+ query : Offset => Source [Element , NotUsed ],
19
+ )(implicit ec : ExecutionContext , mat : Materializer ) extends GraphStage [SourceShape [Seq [ Element ]]] {
18
20
19
- private val outlet : Outlet [Source [ D , NotUsed ]] = Outlet [Source [ D , NotUsed ]](" PullerGraph.OUT" )
21
+ private val outlet : Outlet [Seq [ Element ]] = Outlet [Seq [ Element ]](" PullerGraph.OUT" )
20
22
21
- override def shape : SourceShape [Source [ D , NotUsed ]] = SourceShape .of(outlet)
23
+ override def shape : SourceShape [Seq [ Element ]] = SourceShape .of(outlet)
22
24
23
25
override def createLogic (attributes : Attributes ): GraphStageLogic = new TimerGraphStageLogic (shape) {
24
-
26
+ var currentOffset : Offset = initialOffset
25
27
private val effectiveRefreshInterval : FiniteDuration = attributes.get[RefreshInterval ].fold(refreshInterval)(_.interval)
26
- var currentOffset : O = initialOffset
27
- var eventStreamConsuming = false
28
-
29
- private val updateConsumingState : AsyncCallback [Boolean ] = createAsyncCallback[Boolean ](eventStreamConsuming = _)
30
- private val updateCurrentOffset : AsyncCallback [D ] =
31
- createAsyncCallback[D ](event => currentOffset = graterOf(currentOffset, offsetOf(event)))
28
+ private val updateCurrentOffset = createAsyncCallback[Offset ](offset => currentOffset = offset)
29
+ private val failAsync = createAsyncCallback[Throwable ](throwable => failStage(throwable))
30
+ private val pushElements = createAsyncCallback[Seq [Element ]](elements => push(outlet, elements))
32
31
32
+ private val timerName = " timer"
33
33
setHandler(outlet, new OutHandler {
34
- override def onPull (): Unit = {}
34
+ override def onPull () = scheduleNext()
35
35
36
- override def onDownstreamFinish (cause : Throwable ): Unit = cancelTimer(" timer " )
36
+ override def onDownstreamFinish (cause : Throwable ) = cancelTimer(timerName )
37
37
})
38
38
39
- override def preStart (): Unit = scheduleWithFixedDelay(" timer" , effectiveRefreshInterval, effectiveRefreshInterval)
40
-
41
- override protected def onTimer (timerKey : Any ): Unit = {
42
- if (isAvailable(outlet) && ! eventStreamConsuming) {
43
- eventStreamConsuming = true
44
- val source = nextChunk(currentOffset)
45
- .mapAsync(1 )(entry => updateCurrentOffset.invokeWithFeedback(entry).map(_ => entry))
46
- .watchTermination() { (mat, future) =>
47
- future.onComplete { _ => updateConsumingState.invoke(false ) }
48
- mat
49
- }
50
- push(outlet, source)
39
+ override protected def onTimer (timerKey : Any ) = {
40
+ query(currentOffset)
41
+ .runFold(new Accumulator (currentOffset))((acc, element) => acc.update(element))
42
+ .flatMap(_.pushOrScheduleNext())
43
+ .recover { case throwable : Throwable => failAsync.invoke(throwable) }
44
+ }
45
+
46
+ private def scheduleNext () = {
47
+ if (! isTimerActive(timerName)) {
48
+ scheduleOnce(timerName, effectiveRefreshInterval)
51
49
}
52
50
}
53
51
54
- }
52
+ class Accumulator ( private var latestOffset : Offset , private val elements : mutable. Buffer [ Element ] = mutable. Buffer .empty) {
55
53
54
+ def update (anElement : Element ): Accumulator = {
55
+ latestOffset = greaterOf(latestOffset, offsetOf(anElement))
56
+ elements.append(anElement)
57
+ this
58
+ }
59
+
60
+ def pushOrScheduleNext (): Future [Unit ] = {
61
+ if (elements.nonEmpty) {
62
+ for {
63
+ _ <- updateCurrentOffset.invokeWithFeedback(latestOffset)
64
+ _ <- pushElements.invokeWithFeedback(elements.toSeq)
65
+ } yield ()
66
+ }
67
+ else Future .successful(scheduleNext())
68
+ }
69
+ }
70
+
71
+ }
56
72
}
0 commit comments