Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Offset validation observer #1014

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@

package akka.projection.r2dbc.internal

import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Success

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.LoggerOps
Expand All @@ -22,24 +35,15 @@ import akka.persistence.typed.PersistenceId
import akka.projection.BySlicesSourceProvider
import akka.projection.MergeableOffset
import akka.projection.ProjectionId
import akka.projection.internal.EnsembleTelemetry
import akka.projection.internal.ManagementState
import akka.projection.internal.OffsetSerialization
import akka.projection.internal.OffsetSerialization.MultipleOffsets
import akka.projection.internal.Telemetry
import akka.projection.r2dbc.R2dbcProjectionSettings
import io.r2dbc.spi.Connection
import org.slf4j.LoggerFactory

import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -164,13 +168,15 @@ private[projection] object R2dbcOffsetStore {

final class RejectedEnvelope(message: String) extends IllegalStateException(message)

sealed trait Validation
sealed trait Validation extends R2dbcOffsetValidationObserver.OffsetValidation

object Validation {
case object Accepted extends Validation
case object Duplicate extends Validation
case object RejectedSeqNr extends Validation
case object RejectedBacktrackingSeqNr extends Validation
import R2dbcOffsetValidationObserver.OffsetValidation

case object Accepted extends Validation with OffsetValidation.Accepted
case object Duplicate extends Validation with OffsetValidation.Duplicate
case object RejectedSeqNr extends Validation with OffsetValidation.Rejected
case object RejectedBacktrackingSeqNr extends Validation with OffsetValidation.Rejected

val FutureAccepted: Future[Validation] = Future.successful(Accepted)
val FutureDuplicate: Future[Validation] = Future.successful(Duplicate)
Expand Down Expand Up @@ -236,6 +242,8 @@ private[projection] class R2dbcOffsetStore(
// To avoid delete requests when no new offsets have been stored since previous delete
private val idle = new AtomicBoolean(false)

private var validationObservers: immutable.Seq[R2dbcOffsetValidationObserver] = Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it need to be volatile or an atomic (or a copy method of some kind)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's set once at startup, and as far as I can see it's not important if we would miss a few notifications. The purpose is not to track each and every offset, but see overall progress or what is going on. I can add a comment.


system.scheduler.scheduleWithFixedDelay(
settings.deleteInterval,
settings.deleteInterval,
Expand All @@ -259,6 +267,17 @@ private[projection] class R2dbcOffsetStore(
}
}

def setTelemetry(telemetry: Telemetry): Unit = {
validationObservers = telemetry match {
case observer: R2dbcOffsetValidationObserver => List(observer)
case ens: EnsembleTelemetry =>
ens.telemetries.collect {
case observer: R2dbcOffsetValidationObserver => observer
}
case _ => Nil
}
}

def getState(): State =
state.get()

Expand Down Expand Up @@ -490,29 +509,38 @@ private[projection] class R2dbcOffsetStore(

def validateAll[Envelope](envelopes: immutable.Seq[Envelope]): Future[immutable.Seq[(Envelope, Validation)]] = {
import Validation._
envelopes
.foldLeft(Future.successful((getInflight(), Vector.empty[(Envelope, Validation)]))) { (acc, envelope) =>
acc.flatMap {
case (inflight, filteredEnvelopes) =>
createRecordWithOffset(envelope) match {
case Some(recordWithOffset) =>
validate(recordWithOffset, inflight).map {
case Accepted =>
(
inflight.updated(recordWithOffset.record.pid, recordWithOffset.record.seqNr),
filteredEnvelopes :+ (envelope -> Accepted))
case rejected =>
(inflight, filteredEnvelopes :+ (envelope -> rejected))
}
case None =>
Future.successful((inflight, filteredEnvelopes :+ (envelope -> Accepted)))
}
val result =
envelopes
.foldLeft(Future.successful((getInflight(), Vector.empty[(Envelope, Validation)]))) { (acc, envelope) =>
acc.flatMap {
case (inflight, filteredEnvelopes) =>
createRecordWithOffset(envelope) match {
case Some(recordWithOffset) =>
validate(recordWithOffset, inflight).map {
case Accepted =>
(
inflight.updated(recordWithOffset.record.pid, recordWithOffset.record.seqNr),
filteredEnvelopes :+ (envelope -> Accepted))
case rejected =>
(inflight, filteredEnvelopes :+ (envelope -> rejected))
}

case None =>
Future.successful((inflight, filteredEnvelopes :+ (envelope -> Accepted)))
}
}
}
.map {
case (_, filteredEnvelopes) =>
filteredEnvelopes
}

if (validationObservers.nonEmpty)
result.andThen {
case Success(xs) => xs.foreach { case (env, v) => notifyValidationObserver(env, v) }
}
.map {
case (_, filteredEnvelopes) =>
filteredEnvelopes
}
else
result
}

/**
Expand All @@ -521,10 +549,16 @@ private[projection] class R2dbcOffsetStore(
* should be rejected.
*/
def validate[Envelope](envelope: Envelope): Future[Validation] = {
createRecordWithOffset(envelope) match {
val result = createRecordWithOffset(envelope) match {
case Some(recordWithOffset) => validate(recordWithOffset, getInflight())
case None => Validation.FutureAccepted
}
if (validationObservers.nonEmpty)
result.andThen {
case Success(v) => notifyValidationObserver(envelope, v)
}
else
result
}

private def validate(recordWithOffset: RecordWithOffset, currentInflight: Map[Pid, SeqNr]): Future[Validation] = {
Expand Down Expand Up @@ -653,6 +687,12 @@ private[projection] class R2dbcOffsetStore(
}
}

private def notifyValidationObserver[Envelope](env: Envelope, validation: Validation): Unit = {
if (validationObservers.nonEmpty) {
validationObservers.foreach(_.onOffsetValidated(env, validation))
}
}

@tailrec final def addInflight[Envelope](envelope: Envelope): Unit = {
createRecordWithOffset(envelope) match {
case Some(recordWithOffset) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.r2dbc.internal

import akka.annotation.DoNotInherit
import akka.annotation.InternalStableApi

@InternalStableApi
object R2dbcOffsetValidationObserver {
// effectively sealed, but to avoid translation the R2dbcOffsetStore.Validation extend this
@DoNotInherit
trait OffsetValidation

object OffsetValidation {
@DoNotInherit trait Accepted extends OffsetValidation
@DoNotInherit trait Duplicate extends OffsetValidation
@DoNotInherit trait Rejected extends OffsetValidation
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thought: these could be public do-not-extend traits with the internal validation concrete adt types extending them so that no translation step is actually needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed in b0ca096

}

/**
* [[akka.projection.internal.Telemetry]] may implement this trait for offset validation progress tracking.
*/
@InternalStableApi
trait R2dbcOffsetValidationObserver {
import R2dbcOffsetValidationObserver._

def onOffsetValidated[Envelope](envelope: Envelope, result: OffsetValidation): Unit

}
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,13 @@ private[projection] class R2dbcProjectionImpl[Offset, Envelope](
* This method returns the projection Source mapped with user 'handler' function, but before any sink attached. This
* is mainly intended to be used by the TestKit allowing it to attach a TestSink to it.
*/
override private[projection] def mappedSource()(implicit system: ActorSystem[_]): Source[Done, Future[Done]] =
new R2dbcInternalProjectionState(settingsOrDefaults).mappedSource()
override private[projection] def mappedSource()(implicit system: ActorSystem[_]): Source[Done, Future[Done]] = {
val projectionState = new R2dbcInternalProjectionState(settingsOrDefaults)
val result = projectionState.mappedSource()
// telemetry is initialized by mappedSource()
offsetStore.setTelemetry(projectionState.getTelemetry())
result
}

private class R2dbcInternalProjectionState(settings: ProjectionSettings)(implicit val system: ActorSystem[_])
extends InternalProjectionState[Offset, Envelope](
Expand Down