add endpoints. add retry. stop generating quarterly files once 8 days past quarter end
omnipresent07 committed Nov 13, 2020
1 parent a6d25b6 commit 631a136
Showing 19 changed files with 421 additions and 90 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -204,7 +204,7 @@ lazy val `hmda-data-publisher` = (project in file("hmda-data-publisher"))
.settings(hmdaBuildSettings: _*)
.settings(
Seq(
- libraryDependencies ++= commonDeps ++ akkaDeps ++ akkaHttpDeps ++ circeDeps ++ slickDeps,
+ libraryDependencies ++= commonDeps ++ akkaDeps ++ akkaHttpDeps ++ circeDeps ++ slickDeps ++ enumeratumDeps,
mainClass in Compile := Some("hmda.publisher.HmdaDataPublisherApp"),
assemblyJarName in assembly := {
s"${name.value}.jar"
14 changes: 11 additions & 3 deletions hmda-data-publisher/src/main/resources/application.conf
@@ -80,8 +80,16 @@ hmda {
port = ${?GRPC_REGULATOR_PORT}
}
}
publisher.validation.reportingUrl = ""
publisher.validation.reportingUrl = ${?VALIDATION_REPORTING_URL}
publisher {
http {
host = "0.0.0.0"
host = ${?HTTP_REGULATOR_HOST}
port = "9190"
port = ${?HTTP_REGULATOR_PORT}
}
validation.reportingUrl = ""
validation.reportingUrl = ${?VALIDATION_REPORTING_URL}
}
}

private-aws {
@@ -176,4 +184,4 @@ pg-tables {
//Common PG Email Table
emailTableName ="institutions_emails_2018"
emailTableName =${?EMAIL_TABLE}
- }
+ }
hmda-data-publisher/src/main/scala/hmda/publisher/HmdaDataPublisherApp.scala
@@ -1,6 +1,8 @@
package hmda.publisher

+ import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.actor.{ActorSystem, Props}
+ import hmda.publisher.api.HmdaDataPublisherApi
import hmda.publisher.helper.PGTableNameLoader
import hmda.publisher.scheduler._
import org.slf4j.LoggerFactory
@@ -41,13 +43,15 @@ object HmdaDataPublisherApp extends App with PGTableNameLoader {

config.getObject("akka.quartz.schedules").forEach((k, v) => log.info(s"$k = ${v.render()}"))

- actorSystem.actorOf(Props[PanelScheduler], "PanelScheduler")
- actorSystem.actorOf(Props[LarScheduler], "LarScheduler")
- actorSystem.actorOf(Props[TsScheduler], "TsScheduler")
- actorSystem.actorOf(Props[LarPublicScheduler], "LarPublicScheduler")
- actorSystem.actorOf(Props[TsPublicScheduler], "TsPublicScheduler")

+ val allSchedulers = AllSchedulers(
+   larPublicScheduler = actorSystem.actorOf(Props[LarPublicScheduler], "LarPublicScheduler"),
+   larScheduler = actorSystem.actorOf(Props[LarScheduler], "LarScheduler"),
+   panelScheduler = actorSystem.actorOf(Props[PanelScheduler], "PanelScheduler"),
+   tsPublicScheduler = actorSystem.actorOf(Props[TsPublicScheduler], "TsPublicScheduler"),
+   tsScheduler = actorSystem.actorOf(Props[TsScheduler], "TsScheduler"),
+ )

+ actorSystem.spawn[Nothing](HmdaDataPublisherApi(allSchedulers), HmdaDataPublisherApi.name)

}
// $COVERAGE-ON$
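The spawn call above uses Akka's classic/typed coexistence: the ClassicActorSystemOps adapter import lets the classic ActorSystem spawn the typed HmdaDataPublisherApi behavior as a child actor. A minimal standalone sketch of that pattern (not part of the commit; names here are illustrative):

import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps

object CoexistenceSketch extends App {
  val classicSystem = ActorSystem("sketch")
  // spawn is added onto the classic system by the adapter import,
  // mirroring the actorSystem.spawn[Nothing](...) call above.
  classicSystem.spawn[Nothing](Behaviors.ignore[Nothing], "typed-child")
}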
hmda-data-publisher/src/main/scala/hmda/publisher/api/DataPublisherHttpApi.scala (new file)
@@ -0,0 +1,60 @@
package hmda.publisher.api

import akka.actor.ActorRef
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import hmda.publisher.scheduler.AllSchedulers
import hmda.publisher.scheduler.schedules.{ Schedule, Schedules }

import scala.concurrent.ExecutionContext

private class DataPublisherHttpApi(
schedulers: AllSchedulers
)(implicit ec: ExecutionContext) {

//trigger/<schedulername>
private val triggerScheduler =
path("trigger" / Segment) { schedulerName =>
post {
Schedules.withNameOption(schedulerName) match {
case Some(schedule) =>
triggerSchedule(schedule)
complete(202 -> s"Schedule ${schedulerName} has been triggered")
case None =>
complete(404 -> s"Scheduler ${schedulerName} not found. Available: ${Schedules.values.map(_.entryName).mkString(", ")}")
}
}
}

private def triggerSchedule(msg: Schedule): Unit = {
import schedulers._
val receiver = msg match {
case Schedules.PanelScheduler2018 => panelScheduler
case Schedules.PanelScheduler2019 => panelScheduler
case Schedules.LarPublicScheduler2018 => larPublicScheduler
case Schedules.LarPublicScheduler2019 => larPublicScheduler
case Schedules.LarScheduler2018 => larScheduler
case Schedules.LarScheduler2019 => larScheduler
case Schedules.LarSchedulerLoanLimit2019 => larScheduler
case Schedules.LarSchedulerQuarterly2020 => larScheduler
case Schedules.TsPublicScheduler2018 => tsPublicScheduler
case Schedules.TsPublicScheduler2019 => tsPublicScheduler
case Schedules.TsScheduler2018 => tsScheduler
case Schedules.TsScheduler2019 => tsScheduler
case Schedules.TsSchedulerQuarterly2020 => tsScheduler
}
receiver ! msg
}

def routes: Route =
handleRejections(corsRejectionHandler) {
cors() {
encodeResponse {
triggerScheduler
}
}
}

}
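A hedged client-side sketch (not part of the commit) of how the new endpoint could be exercised; localhost and port 9190 assume the defaults added to application.conf above, and the schedule name must match a Schedules entry name such as LarScheduler2019:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpMethods, HttpRequest}

object TriggerSketch extends App {
  implicit val system: ActorSystem = ActorSystem("trigger-sketch")
  import system.dispatcher

  // POST /trigger/<schedulername>; the route replies 202 for known
  // schedules and 404 with the list of valid names otherwise.
  Http()
    .singleRequest(HttpRequest(HttpMethods.POST, "http://localhost:9190/trigger/LarScheduler2019"))
    .foreach(resp => println(resp.status))
}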
hmda-data-publisher/src/main/scala/hmda/publisher/api/HmdaDataPublisherApi.scala (new file)
@@ -0,0 +1,32 @@
package hmda.publisher.api

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.http.scaladsl.server.Directives._
import hmda.api.http.directives.HmdaTimeDirectives._
import hmda.api.http.routes.BaseHttpApi
import hmda.publisher.scheduler.AllSchedulers

import scala.concurrent.ExecutionContext

// $COVERAGE-OFF$
object HmdaDataPublisherApi {
val name = "hmda-data-publisher-api"

def apply(allSchedulers: AllSchedulers): Behavior[Nothing] = Behaviors.setup[Nothing] { ctx =>
implicit val ec: ExecutionContext = ctx.executionContext
implicit val classic: ActorSystem = ctx.system.toClassic
val shutdown: CoordinatedShutdown = CoordinatedShutdown(ctx.system)
val config = classic.settings.config
val host: String = config.getString("hmda.publisher.http.host")
val port: Int = config.getInt("hmda.publisher.http.port")

val routes = BaseHttpApi.routes(name) ~ new DataPublisherHttpApi(allSchedulers).routes
BaseHttpApi.runServer(shutdown, name)(timed(routes), host, port)

Behaviors.ignore
}
}
// $COVERAGE-ON$
hmda-data-publisher/src/main/scala/hmda/publisher/helper/QuarterTimeBarrier.scala (new file)
@@ -0,0 +1,34 @@
package hmda.publisher.helper

import java.time.{Clock, LocalDate}

import hmda.publisher.validation.PublishingGuard.Period
import hmda.util.BankFilterUtils.config
import hmda.util.Filer

class QuarterTimeBarrier(clock: Clock) {

  /** Runs `thunk` only while the current date is before the quarter's end date plus 8 days; returns None afterwards. */
  def runIfStillRelevant[T](quarter: Period.Quarter)(thunk: => T): Option[T] = {
val now = LocalDate.now(clock)
if (now.isBefore(QuarterTimeBarrier.getEndDateForQuarter(quarter).plusDays(8))) {
Some(thunk)
} else {
None
}
}


}

object QuarterTimeBarrier {
private val rulesConfig = Filer.parse(config).fold(error => throw new RuntimeException(s"Failed to parse filing rules in HOCON: $error"), identity)

def getEndDateForQuarter(quarter: Period.Quarter): LocalDate = {
quarter match {
case Period.y2020Q1 => LocalDate.ofYearDay(2020,rulesConfig.qf.q1.endDayOfYear)
case Period.y2020Q2 => LocalDate.ofYearDay(2020,rulesConfig.qf.q2.endDayOfYear)
case Period.y2020Q3 => LocalDate.ofYearDay(2020,rulesConfig.qf.q3.endDayOfYear)
}
}

}
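A fixed-clock sketch (not part of the commit) of the eight-day window that gives the commit its title; the real quarter end dates come from the filing-rules HOCON via Filer.parse, so whether the thunk runs on any concrete date is an assumption here:

import java.time.{Clock, Instant, ZoneOffset}
import hmda.publisher.validation.PublishingGuard.Period

object BarrierSketch extends App {
  // Freeze "now" at a fixed instant.
  val clock   = Clock.fixed(Instant.parse("2020-07-05T00:00:00Z"), ZoneOffset.UTC)
  val barrier = new QuarterTimeBarrier(clock)

  // Some("published") while now is still before quarter end + 8 days,
  // None once the window has closed.
  println(barrier.runIfStillRelevant(Period.y2020Q2)("published"))
}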
hmda-data-publisher/src/main/scala/hmda/publisher/helper/RetryUtils.scala (new file)
@@ -0,0 +1,12 @@
package hmda.publisher.helper

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

object RetryUtils {

  /** Retries `f` up to `retries` additional times after a failure, waiting `delay`
    * between attempts (note: the wait blocks a thread of the given ExecutionContext).
    */
  def retry[T](retries: Int, delay: Duration)(f: () => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    f() recoverWith { case _ if retries > 0 => Future(Thread.sleep(delay.toMillis)).flatMap(_ => retry(retries - 1, delay)(f)) }
  }

}
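A minimal usage sketch for the retry helper (not part of the commit; flakyCall is hypothetical):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._

object RetrySketch extends App {
  def flakyCall(): Future[String] =
    if (scala.util.Random.nextDouble() < 0.5) Future.failed(new RuntimeException("transient"))
    else Future.successful("ok")

  // Up to 3 re-attempts with a 1-second pause between them; the first success wins.
  val result: Future[String] = RetryUtils.retry(retries = 3, delay = 1.second)(() => flakyCall())
}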
hmda-data-publisher/src/main/scala/hmda/publisher/helper/S3Utils.scala (new file)
@@ -0,0 +1,20 @@
package hmda.publisher.helper

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.s3.MultipartUploadResult
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ ExecutionContext, Future }

object S3Utils {

  /** Runs `source` into the S3 multipart-upload sink, retrying the whole
    * upload up to 3 times with a 1-minute pause between attempts.
    */
  def uploadWithRetry(
source: Source[ByteString, NotUsed],
uploadSink: Sink[ByteString, Future[MultipartUploadResult]]
)(implicit mat: Materializer, ec: ExecutionContext): Future[MultipartUploadResult] =
RetryUtils.retry(retries = 3, delay = 1.minute)(() => source.runWith(uploadSink))

}
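Since RetryUtils.retry re-invokes the thunk on failure, each attempt re-runs source.runWith(uploadSink) and therefore starts a fresh multipart upload; this works because the akka-streams sources handed to uploadWithRetry in the schedulers below are reusable blueprints rather than already-running streams.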
hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/AllSchedulers.scala (new file)
@@ -0,0 +1,11 @@
package hmda.publisher.scheduler

import akka.actor.ActorRef

case class AllSchedulers(
larPublicScheduler: ActorRef,
larScheduler: ActorRef,
panelScheduler: ActorRef,
tsPublicScheduler: ActorRef,
tsScheduler: ActorRef
)
hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/LarPublicScheduler.scala
@@ -18,6 +18,7 @@ import hmda.publisher.helper.{
PrivateAWSConfigLoader,
PublicAWSConfigLoader,
S3Archiver,
+ S3Utils,
SnapshotCheck
}
import hmda.publisher.query.component.{ PublisherComponent2018, PublisherComponent2019, PublisherComponent2020 }
@@ -126,7 +127,8 @@ class LarPublicScheduler

val resultsPSV = for {
_ <- S3Archiver.archiveFileIfExists(bucket, key, bucketPrivate, s3Settings)
- uploadResult <- zipStream.via(Archive.zip()).runWith(s3SinkPSV)
+ source = zipStream.via(Archive.zip())
+ uploadResult <- S3Utils.uploadWithRetry(source, s3SinkPSV)
} yield uploadResult

resultsPSV onComplete {
hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/LarScheduler.scala
@@ -1,36 +1,31 @@
package hmda.publisher.scheduler

- import java.time.LocalDateTime
- import java.time.format.DateTimeFormatter
+ import java.time.{Clock, LocalDateTime}

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.alpakka.s3.ApiVersion.ListBucketVersion2
import akka.stream.alpakka.s3.scaladsl.S3
- import akka.stream.alpakka.s3.{ MemoryBufferType, MetaHeaders, S3Attributes, S3Settings }
+ import akka.stream.alpakka.s3.{MemoryBufferType, MetaHeaders, S3Attributes, S3Settings}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.typesafe.akka.extension.quartz.QuartzSchedulerExtension
import hmda.actor.HmdaActor
import hmda.census.records.CensusRecords
import hmda.model.census.Census
import hmda.model.publication.Msa
- import hmda.publisher.helper.{ LoanLimitLarHeader, PrivateAWSConfigLoader, SnapshotCheck }
- import hmda.publisher.query.component.{ PublisherComponent2018, PublisherComponent2019, PublisherComponent2020 }
+ import hmda.publisher.helper._
+ import hmda.publisher.query.component.{PublisherComponent2018, PublisherComponent2019, PublisherComponent2020}
import hmda.publisher.query.lar.LarEntityImpl2019
- import hmda.publisher.scheduler.schedules.Schedules.{
-   LarScheduler2018,
-   LarScheduler2019,
-   LarSchedulerLoanLimit2019,
-   LarSchedulerQuarterly2020
- }
+ import hmda.publisher.scheduler.schedules.Schedules.{LarScheduler2018, LarScheduler2019, LarSchedulerLoanLimit2019, LarSchedulerQuarterly2020}
import hmda.publisher.validation.PublishingGuard
- import hmda.publisher.validation.PublishingGuard.{ Period, Scope }
+ import hmda.publisher.validation.PublishingGuard.{Period, Scope}
import hmda.query.DbConfiguration.dbConfig
import hmda.util.BankFilterUtils._

import scala.concurrent.Future
- import scala.util.{ Failure, Success }
+ import scala.util.{Failure, Success}

class LarScheduler
extends HmdaActor
@@ -52,6 +47,7 @@ class LarScheduler
def larRepository2020Q2 = new LarRepository2020Q2(dbConfig)
def larRepository2020Q3 = new LarRepository2020Q3(dbConfig)
val publishingGuard: PublishingGuard = PublishingGuard.create(this)(context.system)
+ val timeBarrier: QuarterTimeBarrier = new QuarterTimeBarrier(Clock.systemDefaultZone())

val indexTractMap2018: Map[String, Census] = CensusRecords.indexedTract2018
val indexTractMap2019: Map[String, Census] = CensusRecords.indexedTract2019
@@ -136,18 +132,23 @@ class LarScheduler
val includeQuarterly = true
val now = LocalDateTime.now().minusDays(1)
val formattedDate = fullDateQuarterly.format(now)
- def publishQuarter[Table <: LarTableBase](quarter: Period, fileNameSuffix: String, repo: LarRepository2020Base[Table]) =
-   publishingGuard.runIfDataIsValid(quarter, Scope.Private) {
-     val fileName = formattedDate + fileNameSuffix
-
-     val allResultsSource: Source[String, NotUsed] = Source
-       .fromPublisher(repo.getAllLARs(getFilterList(), includeQuarterly))
-       .map(larEntity => larEntity.toRegulatorPSV)
-
-     def countF: Future[Int] = repo.getAllLARsCount(getFilterList(), includeQuarterly)
-
-     publishPSVtoS3(fileName, allResultsSource, countF)
+ def publishQuarter[Table <: LarTableBase](quarter: Period.Quarter, fileNameSuffix: String, repo: LarRepository2020Base[Table]) = {
+   timeBarrier.runIfStillRelevant(quarter) {
+     publishingGuard.runIfDataIsValid(quarter, Scope.Private) {
+       val fileName = formattedDate + fileNameSuffix
+
+       val allResultsSource: Source[String, NotUsed] = Source
+         .fromPublisher(repo.getAllLARs(getFilterList(), includeQuarterly))
+         .map(larEntity => larEntity.toRegulatorPSV)
+
+       def countF: Future[Int] = repo.getAllLARsCount(getFilterList(), includeQuarterly)
+
+       publishPSVtoS3(fileName, allResultsSource, countF)
+     }
+   }
+ }

publishQuarter(Period.y2020Q1, "_quarter_1_2020_lar.txt", larRepository2020Q1)
publishQuarter(Period.y2020Q2, "_quarter_2_2020_lar.txt", larRepository2020Q2)
publishQuarter(Period.y2020Q3, "_quarter_3_2020_lar.txt", larRepository2020Q3)
@@ -167,7 +168,7 @@ class LarScheduler
s3Sink = S3
.multipartUpload(bucketPrivate, fullFilePath, metaHeaders = MetaHeaders(Map(LarScheduler.entriesCountMetaName -> count.toString)))
.withAttributes(S3Attributes.settings(s3Settings))
- result <- bytesStream.runWith(s3Sink)
+ result <- S3Utils.uploadWithRetry(bytesStream, s3Sink)
} yield result

results onComplete {
hmda-data-publisher/src/main/scala/hmda/publisher/scheduler/PanelScheduler.scala
@@ -12,7 +12,7 @@ import akka.util.ByteString
import com.typesafe.akka.extension.quartz.QuartzSchedulerExtension
import com.typesafe.config.ConfigFactory
import hmda.actor.HmdaActor
- import hmda.publisher.helper.{PrivateAWSConfigLoader, SnapshotCheck}
+ import hmda.publisher.helper.{PrivateAWSConfigLoader, S3Utils, SnapshotCheck}
import hmda.publisher.query.component.{InstitutionEmailComponent, PublisherComponent2018, PublisherComponent2019}
import hmda.publisher.query.panel.{InstitutionAltEntity, InstitutionEmailEntity, InstitutionEntity}
import hmda.publisher.scheduler.schedules.Schedules.{PanelScheduler2018, PanelScheduler2019}
@@ -78,13 +78,13 @@ class PanelScheduler extends HmdaActor with PublisherComponent2018 with Publishe
val s3Sink =
S3.multipartUpload(bucketPrivate, fullFilePath)
.withAttributes(S3Attributes.settings(s3Settings))
- val results: Future[MultipartUploadResult] = Source
+ val source = Source
    .future(allResults)
    .mapConcat(seek => seek.toList)
    .mapAsync(1)(institution => appendEmailDomains2018(institution))
    .map(institution => institution.toPSV + "\n")
    .map(s => ByteString(s))
-   .runWith(s3Sink)
+ val results: Future[MultipartUploadResult] = S3Utils.uploadWithRetry(source, s3Sink)

results onComplete {
case Success(result) =>
@@ -107,13 +107,13 @@ class PanelScheduler extends HmdaActor with PublisherComponent2018 with Publishe
val s3Sink =
S3.multipartUpload(bucketPrivate, fullFilePath)
.withAttributes(S3Attributes.settings(s3Settings))
- val results: Future[MultipartUploadResult] = Source
+ val source = Source
    .future(allResults)
    .mapConcat(seek => seek.toList)
    .mapAsync(1)(institution => appendEmailDomains(institution))
    .map(institution => institution.toPSV + "\n")
    .map(s => ByteString(s))
-   .runWith(s3Sink)
+ val results: Future[MultipartUploadResult] = S3Utils.uploadWithRetry(source, s3Sink)

results onComplete {
case Success(result) =>
(remaining changed files not loaded in this view)
