Skip to content

Commit

Permalink
with IorT
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Feb 14, 2024
1 parent 5b40ead commit d0732fa
Showing 1 changed file with 41 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.joda.time.DateTime
import io.circe.Json
import cats.{Applicative, Monad}
import cats.data.{EitherT, Ior, NonEmptyList, OptionT, StateT}
import cats.effect.kernel.Sync
import cats.effect.kernel.{Ref, Sync}
import cats.implicits._

import com.snowplowanalytics.refererparser._
Expand All @@ -42,6 +42,7 @@ import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquer
import com.snowplowanalytics.snowplow.enrich.common.enrichments.web.{PageEnrichments => WPE}
import com.snowplowanalytics.snowplow.enrich.common.outputs.EnrichedEvent
import com.snowplowanalytics.snowplow.enrich.common.utils.{IgluUtils, ConversionUtils => CU}
import cats.data.IorT

object EnrichmentManager {

Expand All @@ -66,60 +67,36 @@ object EnrichmentManager {
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields,
emitIncomplete: Boolean = true
): F[Ior[BadRow, EnrichedEvent]] =
): IorT[F, BadRow, EnrichedEvent] =
for {
enrichedEvent <- Sync[F].delay(new EnrichedEvent)
validatedInput <- mapAndValidateInput(raw, enrichedEvent, etlTstamp, processor, client, registryLookup)
(schemaViolations, extractResult) = validatedInput
enriched <- runEnrichingStep(
List(schemaViolations),
emitIncomplete,
enrich(
enrichedEvent,
registry,
client,
processor,
raw,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder,
registryLookup
),
(None, Nil)
)
(enrichFailures, enrichmentsContexts) = enriched
validationFailures <- runEnrichingStep(
List(schemaViolations, enrichFailures),
emitIncomplete,
validateEnriched(enrichedEvent, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields),
None
)
badRows = List(schemaViolations, enrichFailures, validationFailures).flatten
output = badRows match {
case Nil =>
Ior.right(enrichedEvent)
case head :: _ =>
if (!emitIncomplete)
Ior.left(head)
else {
val failuresContext = createFailuresContext(badRows)
ME.formatContexts(failuresContext :: enrichmentsContexts ::: extractResult.validationInfoContexts)
.foreach(c => enrichedEvent.derived_contexts = c)
Ior.both(head, enrichedEvent)
}
}
badRows <- IorT.liftF[F, BadRow, Ref[F, List[BadRow]]](Ref.of[F, List[BadRow]](Nil))
enrichedEvent = new EnrichedEvent
extractResult <- mapAndValidateInput(raw, enrichedEvent, etlTstamp, processor, client, registryLookup, badRows, emitIncomplete)
enrichmentsContexts <- enrich(
enrichedEvent,
registry,
client,
processor,
raw,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder,
registryLookup,
badRows,
emitIncomplete
)
_ <- validateEnriched(enrichedEvent, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields, badRows, emitIncomplete)
output <- IorT { badRows.get.map { list => list match {
case Nil => Ior.right(enrichedEvent)
case l if !emitIncomplete => Ior.left(l.last)
case l =>
val failuresContext = createFailuresContext(l)
ME.formatContexts(failuresContext :: enrichmentsContexts ::: extractResult.validationInfoContexts)
.foreach(c => enrichedEvent.derived_contexts = c)
Ior.both(l.last, enrichedEvent)
}}}
} yield output

def runEnrichingStep[F[_]: Sync, A](
errors: List[Option[BadRow]],
emitIncomplete: Boolean,
step: F[A],
fallback: A
): F[A] =
if (errors.forall(_.isEmpty) || emitIncomplete)
step
else
Sync[F].pure(fallback)

// TODO: aggregate all the errors inside same SchemaViolations
def mapAndValidateInput[F[_]: Sync](
Expand All @@ -128,8 +105,10 @@ object EnrichmentManager {
etlTstamp: DateTime,
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): F[(Option[BadRow], IgluUtils.EventExtractResult)] =
registryLookup: RegistryLookup[F],
badRows: Ref[F, List[BadRow]],
emitIncomplete: Boolean
): IorT[F, BadRow, IgluUtils.EventExtractResult] =
for {
setupViolations <- Sync[F].delay(setupEnrichedEvent(raw, enrichedEvent, etlTstamp, processor))
validated <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, raw, processor, registryLookup)
Expand All @@ -154,8 +133,10 @@ object EnrichmentManager {
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean,
registryLookup: RegistryLookup[F]
): F[(Option[BadRow.EnrichmentFailures], List[SelfDescribingData[Json]])] =
registryLookup: RegistryLookup[F],
badRows: Ref[F, List[BadRow]],
emitIncomplete: Boolean
): IorT[F, BadRow, List[SelfDescribingData[Json]]] =
for {
enriched <- runEnrichments(
registry,
Expand Down Expand Up @@ -844,8 +825,10 @@ object EnrichmentManager {
processor: Processor,
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): F[Option[BadRow.EnrichmentFailures]] =
atomicFields: AtomicFields,
badRows: Ref[F, List[BadRow]],
emitIncomplete: Boolean
): IorT[F, BadRow, Unit] =
// We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608
AtomicFieldsLengthValidator
.validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields)
Expand Down

0 comments on commit d0732fa

Please sign in to comment.