Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
omnipresent07 committed Sep 28, 2019
1 parent dec0db9 commit 13ccf34
Show file tree
Hide file tree
Showing 40 changed files with 1,143 additions and 1,026 deletions.
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version = "2.0.1"
version = "2.4.2"
maxColumn = 140
align = most
continuationIndent.defnSite = 2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,78 @@
package hmda.api.http.directives

import akka.event.LoggingAdapter
import akka.http.scaladsl.server.Directive0
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, StatusCodes }
import akka.http.scaladsl.server.{ Directive0, Route, RouteResult }
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.RouteResult.{ Complete, Rejected }
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import org.slf4j.{ Logger, LoggerFactory }

trait HmdaTimeDirectives {
import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success, Try }

val log: LoggingAdapter
object HmdaTimeDirectives {
def timed(route: Route)(implicit ec: ExecutionContext): Route =
aroundRequest(timeRequest)(ec)(route)

def timedGet = get & time & extractUri
def timedPost = post & time & extractUri
def timedPut = put & time & extractUri
def timedDelete = delete & time & extractUri
def timedOptions = options & time & extractUri
private val log: Logger = LoggerFactory.getLogger(getClass)

def time: Directive0 = {
val startTime = System.currentTimeMillis()
mapResponse { response =>
val endTime = System.currentTimeMillis()
val responseTime = endTime - startTime
log.debug(s"Request took $responseTime ms")
response
private val timeoutResponse = HttpResponse(StatusCodes.NetworkReadTimeout, entity = "Unable to serve response within time limit.")

// Reference: https://blog.softwaremill.com/measuring-response-time-in-akka-http-7b6312ec70cf
private def timeRequest(request: HttpRequest): Try[RouteResult] => Unit = {
val start = System.currentTimeMillis()

{
case Success(Complete(resp)) =>
val end = System.currentTimeMillis()
val responseTime = end - start
log.info(s"[${resp.status.intValue()}] ${request.method.name} ${request.uri} took: $responseTime ms")

case Success(Rejected(_)) =>
log.debug("Request was rejected, not timing it")

case Failure(_) =>
log.debug("Request failed, not timing it")
}
}
}

// Reference: https://blog.softwaremill.com/measuring-response-time-in-akka-http-7b6312ec70cf
private def aroundRequest(onRequest: HttpRequest => Try[RouteResult] => Unit)(implicit ec: ExecutionContext): Directive0 =
extractRequestContext.flatMap { ctx =>
val onDone = onRequest(ctx.request)
mapInnerRoute { inner =>
withRequestTimeoutResponse { _ =>
onDone(Success(Complete(timeoutResponse)))
timeoutResponse
} {
inner.andThen { resultFuture =>
resultFuture.map {
case c @ Complete(response) =>
Complete(response.mapEntity { entity =>
if (entity.isKnownEmpty()) {
onDone(Success(c))
entity
} else {
// On an empty entity, `transformDataBytes` unsets `isKnownEmpty`.
// Call onDone right away, since there's no significant amount of
// data to send, anyway.
entity.transformDataBytes(Flow[ByteString].watchTermination() {
case (m, f) =>
f.map(_ => c).onComplete(onDone)
m
})
}
})
case other =>
onDone(Success(other))
other
}.andThen { // skip this if you use akka.http.scaladsl.server.handleExceptions, put onDone there
case Failure(ex) =>
onDone(Failure(ex))
}
}
}
}
}
}
27 changes: 12 additions & 15 deletions common/src/main/scala/hmda/api/http/routes/BaseHttpApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,35 @@ package hmda.api.http.routes
import java.net.InetAddress
import java.time.Instant

import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import hmda.api.http.directives.HmdaTimeDirectives._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.server.Route
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import hmda.api.http.directives.HmdaTimeDirectives
import hmda.api.http.model.HmdaServiceStatus
import io.circe.generic.auto._
import org.slf4j.Logger

trait BaseHttpApi extends HmdaTimeDirectives {
import scala.concurrent.ExecutionContext

implicit val system: ActorSystem
implicit val materializer: ActorMaterializer
val log: LoggingAdapter
trait BaseHttpApi {
val log: Logger
implicit val ec: ExecutionContext

def rootPath(name: String) =
private def rootPath(name: String): Route =
pathSingleSlash {
timedGet { _ =>
timed {
complete {
val now = Instant.now.toString
val host = InetAddress.getLocalHost.getHostName
val status = HmdaServiceStatus("OK", name, now, host)
log.debug(status.toString)
ToResponseMarshallable(status)
status
}
}
}

def routes(apiName: String) =
def routes(apiName: String): Route =
encodeResponse {
rootPath(apiName)
}

}
}
30 changes: 29 additions & 1 deletion common/src/main/scala/hmda/api/ws/routes/BaseWsApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,34 @@ trait BaseWsApi {
}
}

def routes(apiName: String): Route = cors() { rootPath(apiName) }
def routes(apiName: String): Route = cors()(rootPath(apiName))

}

object BaseWsApi {
def route(apiName: String): Route = cors() {
rootPath(apiName)
}

private def rootPath(name: String): Route =
pathSingleSlash {
get {
handleWebSocketMessages(baseHandler(name))
}
}

private def baseHandler(name: String): Flow[Message, Message, NotUsed] =
Flow[Message].map {
case TextMessage.Strict(txt) =>
txt match {
case "status" =>
val now = Instant.now.toString
val host = InetAddress.getLocalHost.getHostName
val status = HmdaServiceStatus("OK", name, now, host)
TextMessage.Strict(status.asJson.toString)

case _ => TextMessage.Strict("Message not supported")
}
case _ => TextMessage.Strict("Message not supported")
}
}
14 changes: 6 additions & 8 deletions common/src/main/scala/hmda/projection/ResumableProjection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package hmda.projection

import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ Behavior, TypedActorContext }
import akka.actor.{ ActorSystem, Scheduler }
import akka.actor.typed.{ ActorSystem, Behavior, Scheduler, TypedActorContext }
import akka.persistence.query.{ EventEnvelope, NoOffset, Offset }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import hmda.messages.projection.CommonProjectionMessages._
Expand All @@ -28,7 +26,7 @@ trait ResumableProjection {
def behavior: Behavior[ProjectionCommand] =
Behaviors.setup { ctx =>
EventSourcedBehavior[ProjectionCommand, ProjectionEvent, ResumableProjectionState](
persistenceId = PersistenceId(entityTypeHint = "", entityId = name, separator = ""),
persistenceId = PersistenceId.ofUniqueId(name),
emptyState = ResumableProjectionState(),
commandHandler = commandHandler(ctx),
eventHandler = eventHandler
Expand All @@ -41,9 +39,9 @@ trait ResumableProjection {
val log = ctx.asScala.log
cmd match {
case StartStreaming =>
implicit val system: ActorSystem = ctx.asScala.system.toClassic
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val scheduler: Scheduler = system.scheduler
val system: ActorSystem[_] = ctx.asScala.system
implicit val materializer: Materializer = Materializer(system)
implicit val scheduler: Scheduler = system.scheduler
log.info("Streaming messages from {}", name)
readJournal(system)
.eventsByTag("institution", state.offset)
Expand Down
58 changes: 32 additions & 26 deletions common/src/main/scala/hmda/publication/KafkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package hmda.publication
import java.util.UUID

import akka.Done
import akka.actor.ActorSystem
import akka.actor.typed.ActorSystem
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.{ Consumer, Producer }
import akka.kafka.{ ConsumerSettings, ProducerSettings, Subscriptions }
import akka.stream.ActorMaterializer
import akka.stream.{ ActorMaterializer, Materializer }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import com.typesafe.config.ConfigFactory
import com.typesafe.config.{ Config, ConfigFactory }
import hmda.messages.institution.InstitutionEvents.InstitutionKafkaEvent
import hmda.messages.pubsub.HmdaGroups
import hmda.serialization.kafka.{ InstitutionKafkaEventsDeserializer, InstitutionKafkaEventsSerializer }
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{ Producer => KafkaProducer, ProducerRecord }
import org.apache.kafka.clients.producer.{ ProducerRecord, Producer => KafkaProducer }
import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer }

import scala.concurrent.{ ExecutionContext, Future }
Expand All @@ -24,7 +24,7 @@ object KafkaUtils {
val config = ConfigFactory.load()
val kafkaHosts = config.getString("kafka.hosts")

def getStringKafkaProducer(system: ActorSystem): KafkaProducer[String, String] = {
def getStringKafkaProducer(system: ActorSystem[_]): KafkaProducer[String, String] = {

val kafkaConfig = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
Expand All @@ -34,19 +34,24 @@ object KafkaUtils {
producerSettings.createKafkaProducer()
}

def getInstitutionKafkaProducer(system: ActorSystem): KafkaProducer[String, InstitutionKafkaEvent] = {
def getInstitutionKafkaProducer(system: ActorSystem[_]): KafkaProducer[String, InstitutionKafkaEvent] = {
val kafkaConfig = system.settings.config.getConfig("akka.kafka.producer")
val producerSettings =
ProducerSettings(kafkaConfig, new StringSerializer, new InstitutionKafkaEventsSerializer)
.withBootstrapServers(kafkaHosts)
producerSettings.createKafkaProducer()
}

def produceInstitutionRecord(topic: String, key: String, value: InstitutionKafkaEvent, kafkaProducer: KafkaProducer[String, InstitutionKafkaEvent])(implicit system: ActorSystem,
materializer: ActorMaterializer): Future[Done] = {
def produceInstitutionRecord(
topic: String,
key: String,
value: InstitutionKafkaEvent,
kafkaProducer: KafkaProducer[String, InstitutionKafkaEvent],
config: Config
)(implicit materializer: Materializer): Future[Done] = {

val producerSettings =
ProducerSettings(system, new StringSerializer, new InstitutionKafkaEventsSerializer)
ProducerSettings(config, new StringSerializer, new InstitutionKafkaEventsSerializer)
.withBootstrapServers(kafkaHosts)

Source
Expand All @@ -55,11 +60,13 @@ object KafkaUtils {
.run()
}

def produceRecord(topic: String, key: String, value: String, producer: KafkaProducer[String, String])(implicit system: ActorSystem,
materializer: ActorMaterializer): Future[Done] = {
def produceRecord(topic: String, key: String, value: String, producer: KafkaProducer[String, String], config: Config)(
implicit system: ActorSystem[_],
materializer: Materializer
): Future[Done] = {

val producerSettings =
ProducerSettings(system, new StringSerializer, new StringSerializer)
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers(kafkaHosts)

Source
Expand All @@ -68,9 +75,11 @@ object KafkaUtils {
.run()
}

def consumeRecords(topic: String, f: Future[Done], parallelism: Int)(implicit system: ActorSystem,
materializer: ActorMaterializer,
ec: ExecutionContext) = {
def consumeRecords(
topic: String,
f: Future[Done],
parallelism: Int
)(implicit system: ActorSystem[_], materializer: Materializer, ec: ExecutionContext) = {

val config = system.settings.config.getConfig("akka.kafka.consumer")

Expand All @@ -82,21 +91,20 @@ object KafkaUtils {

Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(parallelism * 2) { msg =>
f.map(_ => msg.committableOffset)
}
.mapAsync(parallelism * 2)(msg => f.map(_ => msg.committableOffset))
.mapAsync(parallelism)(offset => offset.commitScaladsl())
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
.run()

}

def consumeInstitutionRecords(topic: String, f: Future[Done], parallelism: Int)(implicit system: ActorSystem,
materializer: ActorMaterializer,
ec: ExecutionContext) = {

val config = system.settings.config.getConfig("akka.kafka.consumer")
def consumeInstitutionRecords(
topic: String,
f: Future[Done],
parallelism: Int,
config: Config
)(implicit materializer: Materializer, ec: ExecutionContext) = {

val consumerSettings: ConsumerSettings[String, InstitutionKafkaEvent] =
ConsumerSettings(config, new StringDeserializer, new InstitutionKafkaEventsDeserializer)
Expand All @@ -106,9 +114,7 @@ object KafkaUtils {

Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(parallelism * 2) { msg =>
f.map(_ => msg.committableOffset)
}
.mapAsync(parallelism * 2)(msg => f.map(_ => msg.committableOffset))
.mapAsync(parallelism)(offset => offset.commitScaladsl())
.toMat(Sink.seq)(Keep.both)
.mapMaterializedValue(DrainingControl.apply)
Expand Down
Loading

0 comments on commit 13ccf34

Please sign in to comment.