Skip to content

Commit

Permalink
[WIP] Add incomplete events (close #)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjben committed Feb 8, 2024
1 parent d7070eb commit d994d6d
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ package object fs2 {
type ByteSink[F[_]] = List[Array[Byte]] => F[Unit]
type AttributedByteSink[F[_]] = List[AttributedData[Array[Byte]]] => F[Unit]

/** Enrichment result, containing list of (valid and invalid) results as well as the collector timestamp */
type Result = (List[Validated[BadRow, EnrichedEvent]], Option[Long])
type Enriched = (Either[BadRow, EnrichedEvent], Option[EnrichedEvent])
type Result = (List[Enriched], Option[Long])

/** Function to transform an origin raw payload into good and/or bad rows */
type Enrich[F[_]] = Array[Byte] => F[Result]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object AtomicFieldsLengthValidator {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): F[Either[BadRow, Unit]] =
): F[Either[BadRow.EnrichmentFailures, Unit]] =
atomicFields.value
.map(validateField(event))
.combineAll match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import java.time.Instant
import org.joda.time.DateTime
import io.circe.Json
import cats.{Applicative, Monad}
import cats.data.{EitherT, NonEmptyList, OptionT, StateT}
import cats.effect.Clock
import cats.data.{EitherT, NonEmptyList, OptionT, StateT, Validated}
import cats.effect.kernel.Sync
import cats.implicits._

import com.snowplowanalytics.refererparser._
Expand Down Expand Up @@ -54,9 +54,8 @@ object EnrichmentManager {
* @param raw Canonical input event to enrich
* @param featureFlags The feature flags available in the current version of Enrich
* @param invalidCount Function to increment the count of invalid events
* @return Enriched event or bad row if a problem occured
*/
def enrichEvent[F[_]: Monad: Clock](
def enrichEvent[F[_]: Sync](
registry: EnrichmentRegistry[F],
client: IgluCirceClient[F],
processor: Processor,
Expand All @@ -65,37 +64,93 @@ object EnrichmentManager {
featureFlags: EtlPipeline.FeatureFlags,
invalidCount: F[Unit],
registryLookup: RegistryLookup[F],
atomicFields: AtomicFields
): EitherT[F, BadRow, EnrichedEvent] =
atomicFields: AtomicFields,
emitIncomplete: Boolean = true
): F[(Either[BadRow, EnrichedEvent], Option[EnrichedEvent])] =
for {
enriched <- EitherT.fromEither[F](setupEnrichedEvent(raw, etlTstamp, processor))
extractResult <- IgluUtils.extractAndValidateInputJsons(enriched, client, raw, processor, registryLookup)
_ = {
ME.formatUnstructEvent(extractResult.unstructEvent).foreach(e => enriched.unstruct_event = e)
ME.formatContexts(extractResult.contexts).foreach(c => enriched.contexts = c)
}
enrichmentsContexts <- runEnrichments(
registry,
processor,
raw,
enriched,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder
)
_ = ME.formatContexts(enrichmentsContexts ::: extractResult.validationInfoContexts).foreach(c => enriched.derived_contexts = c)
_ <- IgluUtils
.validateEnrichmentsContexts[F](client, enrichmentsContexts, raw, processor, enriched, registryLookup)
_ <- EitherT.rightT[F, BadRow](
anonIp(enriched, registry.anonIp).foreach(enriched.user_ipaddress = _)
)
_ <- EitherT.rightT[F, BadRow] {
piiTransform(enriched, registry.piiPseudonymizer).foreach { pii =>
enriched.pii = pii.asString
}
}
_ <- validateEnriched(enriched, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields)
} yield enriched
validatedInput <- mapAndValidateInput(raw, etlTstamp, processor, client, registryLookup)
(schemaViolations, enrichedEvent, extractResult) = validatedInput
enriched <- if (schemaViolations.isEmpty || emitIncomplete)
enrich(
enrichedEvent,
registry,
client,
processor,
raw,
extractResult.contexts,
extractResult.unstructEvent,
featureFlags.legacyEnrichmentOrder,
registryLookup
)
else
Sync[F].pure((None, Nil))
(enrichFailures, enrichmentsContexts) = enriched
validationFailures <- if ((schemaViolations.isEmpty && enrichFailures.isEmpty) || emitIncomplete)
validateEnriched(enrichedEvent, raw, processor, featureFlags.acceptInvalid, invalidCount, atomicFields)
else
Sync[F].pure(None)
badRows = List(schemaViolations, enrichFailures, validationFailures).flatten
output = badRows match {
case Nil =>
(Right(enrichedEvent), None)
case head :: _ =>
if (!emitIncomplete)
(Left(head), None)
else {
val failuresContext = createFailuresContext(badRows)
ME.formatContexts(failuresContext :: enrichmentsContexts ::: extractResult.validationInfoContexts)
.foreach(c => enrichedEvent.derived_contexts = c)
(Left(head), Some(enrichedEvent))
}
}
} yield output

// TODO: aggregate all the errors inside same SchemaViolations
def mapAndValidateInput[F[_]: Sync](
raw: RawEvent,
etlTstamp: DateTime,
processor: Processor,
client: IgluCirceClient[F],
registryLookup: RegistryLookup[F]
): F[(Option[BadRow], EnrichedEvent, IgluUtils.EventExtractResult)] =
for {
mapped <- Sync[F].delay(setupEnrichedEvent(raw, etlTstamp, processor))
(enrichmentFailures, enrichedEvent) = mapped
validated <- IgluUtils.extractAndValidateInputJsons(enrichedEvent, client, raw, processor, registryLookup)
(schemaViolations, sdjs) = validated
maybeBadRow = aggregateBadRows(List(enrichmentFailures, schemaViolations))
} yield (maybeBadRow, enrichedEvent, sdjs)

private def aggregateBadRows(badRows: List[Option[BadRow]]): Option[BadRow] =
badRows.flatten.headOption

def enrich[F[_]: Sync](
enrichedEvent: EnrichedEvent,
registry: EnrichmentRegistry[F],
client: IgluCirceClient[F],
processor: Processor,
raw: RawEvent,
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean,
registryLookup: RegistryLookup[F]
): F[(Option[BadRow.EnrichmentFailures], List[SelfDescribingData[Json]])] =
for {
enriched <- runEnrichments(
enrichedEvent,
registry,
processor,
raw,
inputContexts,
unstructEvent,
legacyOrder
)
(enrichmentFailures, contexts) = enriched
validated <- IgluUtils.validateEnrichmentsContexts[F](client, contexts, raw, processor, enrichedEvent, registryLookup)
_ <- Sync[F].delay(anonIp(enrichedEvent, registry.anonIp).foreach(enrichedEvent.user_ipaddress = _))
_ <- Sync[F].delay(piiTransform(enrichedEvent, registry.piiPseudonymizer).foreach(pii => enrichedEvent.pii = pii.asString))
output = ???
} yield output

/**
* Run all the enrichments and aggregate the errors if any
Expand All @@ -105,32 +160,31 @@ object EnrichmentManager {
* with at least one enrichment
*/
private def runEnrichments[F[_]: Monad](
enriched: EnrichedEvent,
registry: EnrichmentRegistry[F],
processor: Processor,
raw: RawEvent,
enriched: EnrichedEvent,
inputContexts: List[SelfDescribingData[Json]],
unstructEvent: Option[SelfDescribingData[Json]],
legacyOrder: Boolean
): EitherT[F, BadRow.EnrichmentFailures, List[SelfDescribingData[Json]]] =
EitherT {
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
.map {
case Accumulation(_, failures, contexts) =>
failures.toNel match {
case Some(nel) =>
buildEnrichmentFailuresBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
).asLeft
case None =>
contexts.asRight
}
}
}
): F[(Option[BadRow], List[SelfDescribingData[Json]])] =
accState(registry, raw, inputContexts, unstructEvent, legacyOrder)
.runS(Accumulation(enriched, Nil, Nil))
.map {
case Accumulation(_, failures, contexts) =>
failures.toNel match {
case Some(nel) =>
val badRow = buildEnrichmentFailuresBadRow(
nel,
EnrichedEvent.toPartiallyEnrichedEvent(enriched),
RawEvent.toRawEvent(raw),
processor
)
(Some(badRow), contexts)
case None =>
(None, contexts)
}
}

private[enrichments] case class Accumulation(
event: EnrichedEvent,
Expand Down Expand Up @@ -249,11 +303,12 @@ object EnrichmentManager {
}

/** Create the mutable [[EnrichedEvent]] and initialize it. */
// TODO create SchemaViolations instead of EnrichmentsFailures
private def setupEnrichedEvent(
raw: RawEvent,
etlTstamp: DateTime,
processor: Processor
): Either[BadRow.EnrichmentFailures, EnrichedEvent] = {
): (Option[BadRow.EnrichmentFailures], EnrichedEvent) = {
val e = new EnrichedEvent()
e.event_id = EE.generateEventId() // May be updated later if we have an `eid` parameter
e.v_collector = raw.source.name // May be updated later if we have a `cv` parameter
Expand All @@ -271,17 +326,18 @@ object EnrichmentManager {
// Map/validate/transform input fields to enriched event fields
val transformed = Transform.transform(raw, e)

(collectorTstamp |+| transformed)
.leftMap { enrichmentFailures =>
EnrichmentManager.buildEnrichmentFailuresBadRow(
(collectorTstamp |+| transformed) match {
case Validated.Invalid(enrichmentFailures) =>
val badRow = EnrichmentManager.buildEnrichmentFailuresBadRow(
enrichmentFailures,
EnrichedEvent.toPartiallyEnrichedEvent(e),
RawEvent.toRawEvent(raw),
processor
)
}
.as(e)
.toEither
(Some(badRow), e)
case _ =>
(None, e)
}
}

def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[FailureDetails.EnrichmentFailure, Unit] =
Expand Down Expand Up @@ -765,9 +821,17 @@ object EnrichmentManager {
acceptInvalid: Boolean,
invalidCount: F[Unit],
atomicFields: AtomicFields
): EitherT[F, BadRow, Unit] =
EitherT {
//We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608
AtomicFieldsLengthValidator.validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields)
}
): F[Option[BadRow.EnrichmentFailures]] =
// We're using static field's length validation. See more in https://github.com/snowplow/enrich/issues/608
AtomicFieldsLengthValidator
.validate[F](enriched, raw, processor, acceptInvalid, invalidCount, atomicFields)
.map {
case Left(br) => Some(br)
case _ => None
}

// TODO
private def createFailuresContext(
badRows: List[BadRow]
): SelfDescribingData[Json] = ???
}
Loading

0 comments on commit d994d6d

Please sign in to comment.