Skip to content

Commit

Permalink
Merge pull request #3916 from omnipresent07/2020-in-irs
Browse files Browse the repository at this point in the history
Pick indexedtract based on the current year
  • Loading branch information
BarakStout authored Nov 18, 2020
2 parents 845aa20 + 56c7300 commit f2ddb60
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,12 @@ object IrsPublisherApp extends App {

implicit val timeout = Timeout(5.seconds)

val censusTractMap2018: Map[String, Census] =
CensusRecords.indexedTract2018

val censusTractMap2019: Map[String, Census] =
CensusRecords.indexedTract2019

val kafkaConfig = system.settings.config.getConfig("akka.kafka.consumer")
val config = ConfigFactory.load()
val parallelism = config.getInt("hmda.lar.irs.parallelism")

val irsPublisher =
system.spawn(IrsPublisher.behavior(censusTractMap2018, censusTractMap2019), IrsPublisher.name)
system.spawn(IrsPublisher.behavior(), IrsPublisher.name)

val consumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(kafkaConfig, new StringDeserializer, new StringDeserializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@ import akka.actor.typed.scaladsl.adapter._
import akka.stream.Materializer
import akka.stream.alpakka.s3.ApiVersion.ListBucketVersion2
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.alpakka.s3.{ MemoryBufferType, MultipartUploadResult, S3Attributes, S3Settings }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.alpakka.s3.{MemoryBufferType, MultipartUploadResult, S3Attributes, S3Settings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import hmda.census.records.CensusRecords
import hmda.model.census.Census
import hmda.model.filing.lar.LoanApplicationRegister
import hmda.model.filing.submission.SubmissionId
import hmda.parser.filing.lar.LarCsvParser
import hmda.publication.lar.model.{ Msa, MsaMap, MsaSummary }
import hmda.publication.lar.model.{Msa, MsaMap, MsaSummary}
import hmda.query.HmdaQuery._
import hmda.util.streams.FlowUtils.framing
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.AwsRegionProvider

import scala.concurrent.Future
import scala.util.{ Failure, Success }
import scala.util.{Failure, Success}

sealed trait IrsPublisherCommand
case class PublishIrs(submissionId: SubmissionId) extends IrsPublisherCommand
Expand All @@ -47,7 +48,7 @@ object IrsPublisher {

val awsRegionProvider: AwsRegionProvider = () => Region.of(region)

def behavior(indexTractMap2018: Map[String, Census], indexTractMap2019: Map[String, Census]): Behavior[IrsPublisherCommand] =
def behavior(): Behavior[IrsPublisherCommand] =
Behaviors.setup { ctx =>
val log = ctx.log
implicit val ec = ctx.system.executionContext
Expand All @@ -66,11 +67,7 @@ object IrsPublisher {

def getCensus(hmdaGeoTract: String, year: Int): Msa = {

val indexTractMap = year match {
case 2018 => indexTractMap2018
case 2019 => indexTractMap2019
case _ => indexTractMap2019
}
val indexTractMap = CensusRecords.yearTractMap(year)

val censusResult = indexTractMap.getOrElse(hmdaGeoTract, Census())

Expand Down

0 comments on commit f2ddb60

Please sign in to comment.