Skip to content

Commit

Permalink
Use concurrent map for tracking partition writers
Browse files (browse the repository at this point in the history)
  • Loading branch information
mjakubowski84 committed May 4, 2022
1 parent 1aea502 commit eef6eb3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 24 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -9,9 +9,10 @@ import org.slf4j.LoggerFactory
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

import com.github.mjakubowski84.parquet4s.ParquetPartitioningFlow.*

import scala.collection.concurrent.TrieMap

object ParquetPartitioningFlow {

val DefaultMaxCount: Long = HadoopParquetWriter.DEFAULT_BLOCK_SIZE
Expand Down Expand Up @@ -266,11 +267,11 @@ private class ParquetPartitioningFlow[T, W](
var written: Long
)

private var writers: Map[Path, WriterState] = Map.empty
private val writers = TrieMap.empty[Path, WriterState]

setHandlers(in, out, this)

private def compressionExtension: String = writeOptions.compressionCodecName.getExtension
private val compressionExtension: String = writeOptions.compressionCodecName.getExtension
private def newFileName: String = UUID.randomUUID().toString + compressionExtension + ".parquet"

private def partition(record: RowParquetRecord): (Path, RowParquetRecord) =
Expand All @@ -297,11 +298,8 @@ private class ParquetPartitioningFlow[T, W](

pathsAndPartitionedRecords.foldLeft(Map.empty[Path, Long]) {
case (modifiedPartitions, (writerPath, partitionedRecord)) =>
val state = writers.get(writerPath) match {
case Some(state) =>
state

case None =>
val state = writers.getOrElseUpdate(
writerPath, {
logger.debug("Creating writer to write to [{}]", writerPath)

val writer = ParquetWriter.internalWriter(
Expand All @@ -315,10 +313,10 @@ private class ParquetPartitioningFlow[T, W](
written = 0L
)

writers += writerPath -> state
scheduleNextRotation(writerPath, maxDuration)
state
}
}
)

state.writer.write(partitionedRecord)
state.written += 1
Expand All @@ -327,16 +325,14 @@ private class ParquetPartitioningFlow[T, W](
}
}

private def close(path: Path): Unit = {
cancelTimer(path)
writers.get(path) match {
private def close(path: Path): Unit =
writers.remove(path) match {
case Some(writerState) =>
cancelTimer(path)
writerState.writer.close()
writers -= path
case None =>
logger.debug("Trying to close a writer for a path [{}] no state was found", path)
logger.debug("Trying to close a writer for a path [{}], no state was found", path)
}
}

override def onTimer(timerKey: Any): Unit =
timerKey match {
Expand Down Expand Up @@ -373,13 +369,10 @@ private class ParquetPartitioningFlow[T, W](
}

override def postStop(): Unit = {
writers.foreach { case (path, state) =>
cancelTimer(path)
state.writer.close()
writers.keySet.foreach { path =>
close(path)
}

writers = Map.empty

super.postStop()
}

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ lazy val three = "3.1.1"
lazy val supportedScalaVersions = Seq(twoTwelve, twoThirteen, three)

ThisBuild / organization := "com.github.mjakubowski84"
ThisBuild / version := "2.6.0-SNAPSHOT"
ThisBuild / isSnapshot := true
ThisBuild / version := "2.5.1"
ThisBuild / isSnapshot := false
ThisBuild / scalaVersion := twoThirteen

ThisBuild / javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
Expand Down
2 changes: 1 addition & 1 deletion project/metals.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

// This file enables sbt-bloop to create bloop config files.

addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.4.13")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.0")

0 comments on commit eef6eb3

Please sign in to comment.