diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index f301ab2f6..acd15561a 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -4,6 +4,19 @@ package akka.projection.r2dbc.internal +import java.time.Clock +import java.time.Instant +import java.time.{ Duration => JDuration } +import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.Success + import akka.Done import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.LoggerOps @@ -22,24 +35,15 @@ import akka.persistence.typed.PersistenceId import akka.projection.BySlicesSourceProvider import akka.projection.MergeableOffset import akka.projection.ProjectionId +import akka.projection.internal.EnsembleTelemetry import akka.projection.internal.ManagementState import akka.projection.internal.OffsetSerialization import akka.projection.internal.OffsetSerialization.MultipleOffsets +import akka.projection.internal.Telemetry import akka.projection.r2dbc.R2dbcProjectionSettings import io.r2dbc.spi.Connection import org.slf4j.LoggerFactory -import java.time.Clock -import java.time.Instant -import java.time.{ Duration => JDuration } -import java.util.UUID -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicReference -import scala.annotation.tailrec -import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.Future - /** * INTERNAL API */ @@ -164,13 +168,15 @@ private[projection] object R2dbcOffsetStore { final class RejectedEnvelope(message: String) extends IllegalStateException(message) - sealed trait Validation + sealed trait Validation extends R2dbcOffsetValidationObserver.OffsetValidation object Validation { - case object Accepted extends Validation - case object Duplicate extends Validation - case object RejectedSeqNr extends Validation - case object RejectedBacktrackingSeqNr extends Validation + import R2dbcOffsetValidationObserver.OffsetValidation + + case object Accepted extends Validation with OffsetValidation.Accepted + case object Duplicate extends Validation with OffsetValidation.Duplicate + case object RejectedSeqNr extends Validation with OffsetValidation.Rejected + case object RejectedBacktrackingSeqNr extends Validation with OffsetValidation.Rejected val FutureAccepted: Future[Validation] = Future.successful(Accepted) val FutureDuplicate: Future[Validation] = Future.successful(Duplicate) @@ -236,6 +242,8 @@ private[projection] class R2dbcOffsetStore( // To avoid delete requests when no new offsets have been stored since previous delete private val idle = new AtomicBoolean(false) + private var validationObservers: immutable.Seq[R2dbcOffsetValidationObserver] = Nil + system.scheduler.scheduleWithFixedDelay( settings.deleteInterval, settings.deleteInterval, @@ -259,6 +267,17 @@ private[projection] class R2dbcOffsetStore( } } + def setTelemetry(telemetry: Telemetry): Unit = { + validationObservers = telemetry match { + case observer: R2dbcOffsetValidationObserver => List(observer) + case ens: EnsembleTelemetry => + ens.telemetries.collect { + case observer: R2dbcOffsetValidationObserver => observer + } + case _ => Nil + } + } + def getState(): State = state.get() @@ -490,29 +509,38 @@ private[projection] class R2dbcOffsetStore( def validateAll[Envelope](envelopes: immutable.Seq[Envelope]): Future[immutable.Seq[(Envelope, Validation)]] = { import Validation._ - envelopes - .foldLeft(Future.successful((getInflight(), Vector.empty[(Envelope, Validation)]))) { (acc, envelope) => - acc.flatMap { - case (inflight, filteredEnvelopes) => - createRecordWithOffset(envelope) match { - case Some(recordWithOffset) => - validate(recordWithOffset, inflight).map { - case Accepted => - ( - inflight.updated(recordWithOffset.record.pid, recordWithOffset.record.seqNr), - filteredEnvelopes :+ (envelope -> Accepted)) - case rejected => - (inflight, filteredEnvelopes :+ (envelope -> rejected)) - } - case None => - Future.successful((inflight, filteredEnvelopes :+ (envelope -> Accepted))) - } + val result = + envelopes + .foldLeft(Future.successful((getInflight(), Vector.empty[(Envelope, Validation)]))) { (acc, envelope) => + acc.flatMap { + case (inflight, filteredEnvelopes) => + createRecordWithOffset(envelope) match { + case Some(recordWithOffset) => + validate(recordWithOffset, inflight).map { + case Accepted => + ( + inflight.updated(recordWithOffset.record.pid, recordWithOffset.record.seqNr), + filteredEnvelopes :+ (envelope -> Accepted)) + case rejected => + (inflight, filteredEnvelopes :+ (envelope -> rejected)) + } + + case None => + Future.successful((inflight, filteredEnvelopes :+ (envelope -> Accepted))) + } + } } + .map { + case (_, filteredEnvelopes) => + filteredEnvelopes + } + + if (validationObservers.nonEmpty) + result.andThen { + case Success(xs) => xs.foreach { case (env, v) => notifyValidationObserver(env, v) } } - .map { - case (_, filteredEnvelopes) => - filteredEnvelopes - } + else + result } /** @@ -521,10 +549,16 @@ private[projection] class R2dbcOffsetStore( * should be rejected. */ def validate[Envelope](envelope: Envelope): Future[Validation] = { - createRecordWithOffset(envelope) match { + val result = createRecordWithOffset(envelope) match { case Some(recordWithOffset) => validate(recordWithOffset, getInflight()) case None => Validation.FutureAccepted } + if (validationObservers.nonEmpty) + result.andThen { + case Success(v) => notifyValidationObserver(envelope, v) + } + else + result } private def validate(recordWithOffset: RecordWithOffset, currentInflight: Map[Pid, SeqNr]): Future[Validation] = { @@ -653,6 +687,12 @@ private[projection] class R2dbcOffsetStore( } } + private def notifyValidationObserver[Envelope](env: Envelope, validation: Validation): Unit = { + if (validationObservers.nonEmpty) { + validationObservers.foreach(_.onOffsetValidated(env, validation)) + } + } + @tailrec final def addInflight[Envelope](envelope: Envelope): Unit = { createRecordWithOffset(envelope) match { case Some(recordWithOffset) => diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetValidationObserver.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetValidationObserver.scala new file mode 100644 index 000000000..e71675744 --- /dev/null +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetValidationObserver.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2023 Lightbend Inc. + */ + +package akka.projection.r2dbc.internal + +import akka.annotation.DoNotInherit +import akka.annotation.InternalStableApi + +@InternalStableApi +object R2dbcOffsetValidationObserver { + // effectively sealed, but to avoid translation the R2dbcOffsetStore.Validation extend this + @DoNotInherit + trait OffsetValidation + + object OffsetValidation { + @DoNotInherit trait Accepted extends OffsetValidation + @DoNotInherit trait Duplicate extends OffsetValidation + @DoNotInherit trait Rejected extends OffsetValidation + } +} + +/** + * [[akka.projection.internal.Telemetry]] may implement this trait for offset validation progress tracking. + */ +@InternalStableApi +trait R2dbcOffsetValidationObserver { + import R2dbcOffsetValidationObserver._ + + def onOffsetValidated[Envelope](envelope: Envelope, result: OffsetValidation): Unit + +} diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index 72556065f..99ed929d8 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -605,8 +605,13 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope]( * This method returns the projection Source mapped with user 'handler' function, but before any sink attached. This * is mainly intended to be used by the TestKit allowing it to attach a TestSink to it. */ - override private[projection] def mappedSource()(implicit system: ActorSystem[_]): Source[Done, Future[Done]] = - new R2dbcInternalProjectionState(settingsOrDefaults).mappedSource() + override private[projection] def mappedSource()(implicit system: ActorSystem[_]): Source[Done, Future[Done]] = { + val projectionState = new R2dbcInternalProjectionState(settingsOrDefaults) + val result = projectionState.mappedSource() + // telemetry is initialized by mappedSource() + offsetStore.setTelemetry(projectionState.getTelemetry()) + result + } private class R2dbcInternalProjectionState(settings: ProjectionSettings)(implicit val system: ActorSystem[_]) extends InternalProjectionState[Offset, Envelope](