Skip to content

Commit

Permalink
Skip Kafka records whose keys do not conform to the standard key format
Browse files Browse the repository at this point in the history
  • Loading branch information
omnipresent07 committed Nov 20, 2020
1 parent 50dc727 commit 7355fc9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 25 deletions.
40 changes: 40 additions & 0 deletions common/src/main/scala/hmda/messages/HmdaMessageFilter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package hmda.messages

import akka.kafka.ConsumerMessage.{ CommittableMessage, CommittableOffset }
import akka.stream.scaladsl.Source
import com.typesafe.scalalogging.StrictLogging

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

object HmdaMessageFilter extends StrictLogging {

  /** Parsed form of a standard Kafka message key: `lei:lei-year[-qN]`. */
  case class StandardMsgKey(lei: String, year: Int, quarter: Option[String])

  // Compiled once at object initialization; recompiling the pattern on every
  // parse call would be wasted work on a hot consumer path.
  // Expected key shape: lei1:lei2-year[-q<digit>] (e.g. "LEI:LEI-2020-q1").
  private val KeyRegex =
    "^(?<lei1>[A-Z0-9]+):(?<lei2>[A-Z0-9]+)-(?<year>[0-9]{4})(-(?<quarter>q[0-9]{1}))?$".r

  /**
   * Parses a Kafka record key of the form `LEI:LEI-2020` or `LEI:LEI-2020-q1`.
   *
   * @param key raw Kafka record key
   * @return `Some(StandardMsgKey)` when the key matches the standard format and
   *         both LEI segments agree; `None` otherwise.
   */
  def parse(key: String): Option[StandardMsgKey] = {
    Try {
      for {
        onlyMatch <- KeyRegex.findFirstMatchIn(key)
        lei1       = onlyMatch.group("lei1")
        lei2       = onlyMatch.group("lei2")
        // group("quarter") returns null when the optional group didn't participate,
        // so Option(...) maps that to None.
        quarterOpt = Option(onlyMatch.group("quarter"))
        _         <- if (lei1 == lei2) Some(()) else None
        // the [0-9]{4} match guarantees toInt succeeds, but stay defensive
        yearInt   <- Try(onlyMatch.group("year").toInt).toOption
      } yield StandardMsgKey(lei1, yearInt, quarterOpt)
    }.toOption.flatten // regex api is not the safest one and we don't want it to throw accidentally
  }

  /** A Kafka message handler that yields the offset to commit once done. */
  type Processor = CommittableMessage[String, String] => Future[CommittableOffset]

  /**
   * Wraps a processor so that messages with non-standard keys are skipped:
   * invalid messages are committed without being processed (with a warning
   * logged), while valid messages are delegated to `f`.
   *
   * Note: the previous signature carried an unused type parameter `[V]`;
   * all call sites rely on inference, so it has been dropped.
   */
  def processOnlyValidKeys(f: Processor)(implicit ec: ExecutionContext): Processor = msg =>
    parse(msg.record.key()) match {
      case Some(_) => f(msg)
      case None =>
        logger.warn(s"Kafka message has invalid key and will be committed without being processed. Msg: ${msg}")
        // The offset is an already-computed value; Future.successful avoids
        // scheduling a no-op task on the execution context.
        Future.successful(msg.committableOffset)
    }
}
17 changes: 17 additions & 0 deletions common/src/test/scala/hmda/message/HmdaMessageFilterTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package hmda.messages

import hmda.messages.HmdaMessageFilter.StandardMsgKey
import org.scalatest.FunSuite

/**
 * Tests for [[HmdaMessageFilter.parse]]: both accepted standard keys and
 * rejected non-conforming keys (the filter's whole purpose is rejection,
 * so negative cases are essential).
 */
class HmdaMessageFilterTest extends FunSuite {

  test("parses yearly and quarterly standard keys") {
    testParse("LEI:LEI-2020", Some(StandardMsgKey("LEI", 2020, None)))
    testParse("LEI:LEI-2020-q1", Some(StandardMsgKey("LEI", 2020, Some("q1"))))
  }

  test("rejects keys that do not conform to the standard") {
    testParse("", None)
    testParse("LEI-2020", None)        // missing second LEI segment
    testParse("LEI1:LEI2-2020", None)  // mismatched LEI segments
    testParse("LEI:LEI-20-q1", None)   // year must be exactly four digits
    testParse("LEI:LEI-2020-Q1", None) // quarter marker must be lower-case 'q'
    testParse("lei:lei-2020", None)    // LEI must be upper-case alphanumeric
  }

  // Single assertion helper keeping each case on one readable line.
  private def testParse(key: String, expectedResult: Option[StandardMsgKey]) =
    assert(HmdaMessageFilter.parse(key) == expectedResult)

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.{ByteString, Timeout}
import com.typesafe.config.ConfigFactory
import hmda.analytics.query._
import hmda.messages.HmdaMessageFilter
import hmda.messages.pubsub.{HmdaGroups, HmdaTopics}
import hmda.model.filing.lar.LoanApplicationRegister
import hmda.model.filing.submission.SubmissionId
Expand Down Expand Up @@ -107,10 +108,10 @@ object HmdaAnalyticsApp extends App with TransmittalSheetComponent with LarCompo

Consumer
.committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.signTopic, HmdaTopics.analyticsTopic))
.mapAsync(parallelism) { msg =>
.mapAsync(parallelism)(HmdaMessageFilter.processOnlyValidKeys { msg =>
log.info(s"Processing: $msg")
processData(msg.record.value()).map(_ => msg.committableOffset)
}
})
.toMat(Committer.sink(CommitterSettings(system).withParallelism(2)))(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.kafka.scaladsl.Consumer
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.{ ConsumerSettings, Subscriptions }
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.pattern.ask
import akka.stream.Materializer
import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import hmda.census.records.CensusRecords
import hmda.messages.pubsub.{ HmdaGroups, HmdaTopics }
import hmda.messages.HmdaMessageFilter
import hmda.messages.pubsub.{HmdaGroups, HmdaTopics}
import hmda.model.census.Census
import hmda.model.filing.submission.SubmissionId
import hmda.publication.KafkaUtils._
import hmda.publication.lar.publication.{ IrsPublisher, PublishIrs }
import hmda.publication.lar.publication.{IrsPublisher, PublishIrs}
import hmda.util.BankFilterUtils._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
Expand Down Expand Up @@ -62,7 +63,7 @@ object IrsPublisherApp extends App {

Consumer
.committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.signTopic, HmdaTopics.irsTopic))
.mapAsync(parallelism)(msg => processData(msg.record.value()).map(_ => msg.committableOffset))
.mapAsync(parallelism)(HmdaMessageFilter.processOnlyValidKeys(msg => processData(msg.record.value()).map(_ => msg.committableOffset)))
.mapAsync(parallelism * 2)(offset => offset.commitScaladsl())
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package hmda.publication.lar
import akka.Done
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.actor.{ ActorSystem => ClassicActorSystem }
import akka.kafka.scaladsl.{ Committer, Consumer }
import akka.kafka.{ CommitterSettings, ConsumerMessage, ConsumerSettings, Subscriptions }
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.{ActorSystem => ClassicActorSystem}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerMessage, ConsumerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import hmda.census.records._
import hmda.messages.pubsub.{ HmdaGroups, HmdaTopics }
import hmda.messages.HmdaMessageFilter
import hmda.messages.pubsub.{HmdaGroups, HmdaTopics}
import hmda.model.census.Census
import hmda.model.filing.submission.SubmissionId
import hmda.publication.KafkaUtils._
Expand All @@ -26,8 +27,8 @@ import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile

import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Entrypoint
// $COVERAGE-OFF$
Expand Down Expand Up @@ -101,17 +102,18 @@ object ModifiedLarApp extends App {
val (control, streamCompleted) =
Consumer
.committableSource(consumerSettings, Subscriptions.topics(HmdaTopics.signTopic, HmdaTopics.modifiedLarTopic))
.mapAsync(parallelism) { msg =>
def processMsg(): Future[ConsumerMessage.CommittableOffset] = {
log.info(s"Received a message - key: ${msg.record.key()}, value: ${msg.record.value()}")
processKafkaRecord(msg.record.value().trim).map(_ => msg.committableOffset)
}
akka.pattern.retry(
attempt = () => processMsg(),
attempts = 2,
delay = 90.seconds
)(ec, classicSystem.scheduler)
}
.mapAsync(parallelism)(
HmdaMessageFilter.processOnlyValidKeys { msg =>
def processMsg(): Future[ConsumerMessage.CommittableOffset] = {
log.info(s"Received a message - key: ${msg.record.key()}, value: ${msg.record.value()}")
processKafkaRecord(msg.record.value().trim).map(_ => msg.committableOffset)
}
akka.pattern.retry(
attempt = () => processMsg(),
attempts = 2,
delay = 90.seconds
)(ec, classicSystem.scheduler)
})
.toMat(Committer.sink(CommitterSettings(classicSystem)))(Keep.both)
.run()

Expand Down

0 comments on commit 7355fc9

Please sign in to comment.