Add API to write single file using custom ParquetWriter
flipp5b authored and mjakubowski84 committed Feb 9, 2023
1 parent f532753 commit 7bb5f07
Showing 10 changed files with 234 additions and 44 deletions.
@@ -2,6 +2,7 @@ package com.github.mjakubowski84.parquet4s

import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import org.apache.parquet.hadoop.ParquetWriter as HadoopParquetWriter
import org.apache.parquet.schema.MessageType
import org.slf4j.{Logger, LoggerFactory}

@@ -22,6 +23,17 @@ object SingleFileParquetSink {
/** Creates a builder of a pipe that processes generic records
*/
def generic(schema: MessageType): Builder[RowParquetRecord]

/** Creates a builder of a sink that processes data of a given type using a
* [[org.apache.parquet.hadoop.ParquetWriter]] built from a provided
* [[org.apache.parquet.hadoop.ParquetWriter.Builder]].
* @tparam T
* Schema type
* @tparam B
* Type of custom [[org.apache.parquet.hadoop.ParquetWriter.Builder]]
*/
@experimental
def custom[T, B <: HadoopParquetWriter.Builder[T, B]](builder: B): CustomBuilder[T]
}

private[parquet4s] object ToParquetImpl extends ToParquet {
@@ -32,6 +44,8 @@ object SingleFileParquetSink {
schemaResolver = RowParquetRecord.genericParquetSchemaResolver(schema),
encoder = RowParquetRecord.genericParquetRecordEncoder
)
override def custom[T, B <: HadoopParquetWriter.Builder[T, B]](builder: B): CustomBuilder[T] =
CustomBuilderImpl(builder)
}

trait Builder[T] {
@@ -54,12 +68,38 @@
encoder: ParquetRecordEncoder[T]
) extends Builder[T] {
override def options(options: ParquetWriter.Options): Builder[T] = this.copy(options = options)
override def write(path: Path): Sink[T, Future[Done]] = apply(path, options)
override def write(path: Path): Sink[T, Future[Done]] = rowParquetRecordSink(path, options)
}

trait CustomBuilder[T] {

/** @param options
* writer options
*/
def options(options: ParquetWriter.Options): CustomBuilder[T]

/** @return
* final [[akka.stream.scaladsl.Sink]]
*/
def write: Sink[T, Future[Done]]
}

private case class CustomBuilderImpl[T, B <: HadoopParquetWriter.Builder[T, B]](
builder: B,
maybeOptions: Option[ParquetWriter.Options] = None
) extends CustomBuilder[T] {
override def options(options: ParquetWriter.Options): CustomBuilder[T] =
this.copy(maybeOptions = Some(options))

override def write: Sink[T, Future[Done]] = {
val writer = maybeOptions.fold(builder)(_.applyTo[T, B](builder)).build()
sink(writer)
}
}

private val logger: Logger = LoggerFactory.getLogger(this.getClass)

private def apply[T: ParquetRecordEncoder: ParquetSchemaResolver](
private def rowParquetRecordSink[T: ParquetRecordEncoder: ParquetSchemaResolver](
path: Path,
options: ParquetWriter.Options = ParquetWriter.Options()
): Sink[T, Future[Done]] = {
@@ -71,9 +111,14 @@

Flow[T]
.map(encode)
.toMat(sink(writer))(Keep.right)
}

private def sink[T](writer: HadoopParquetWriter[T]): Sink[T, Future[Done]] =
Flow[T]
.fold(0) { case (acc, record) => writer.write(record); acc + 1 }
.map { count =>
if (logger.isDebugEnabled) logger.debug(s"$count records were successfully written to $path")
if (logger.isDebugEnabled) logger.debug(s"$count records were successfully written")
try writer.close()
catch {
case _: NullPointerException => // ignores bug in Parquet
@@ -84,6 +129,5 @@
throw e
}
.toMat(Sink.ignore)(Keep.right)
}

}
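
Worth noting: the new custom API is not tied to protobuf. Any builder satisfying B <: HadoopParquetWriter.Builder[T, B] should plug in. Below is a minimal sketch only, using parquet-avro's AvroParquetWriter.Builder with the Akka sink; the parquet-avro/avro dependencies, the Avro schema, the object name and the output path are assumptions of the sketch and are not added by this commit.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import com.github.mjakubowski84.parquet4s.{ParquetStreams, Path}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.parquet.avro.AvroParquetWriter

object CustomAvroSinkSketch extends App {
  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher

  // Hypothetical Avro schema mirroring the Data message used elsewhere in this commit
  val avroSchema = new Schema.Parser().parse(
    """{"type":"record","name":"Data","fields":[{"name":"id","type":"int"},{"name":"text","type":"string"}]}"""
  )

  // Assumed output location
  val path    = Path("/tmp/example/data.parquet")
  val builder = AvroParquetWriter.builder[GenericRecord](path.hadoopPath).withSchema(avroSchema)

  // The custom sink delegates the actual writing to the provided Hadoop builder
  val sink = ParquetStreams.toParquetSingleFile
    .custom[GenericRecord, AvroParquetWriter.Builder[GenericRecord]](builder)
    .write

  val records = (1 to 10).map { i =>
    new GenericRecordBuilder(avroSchema).set("id", i).set("text", s"record-$i").build()
  }

  for {
    _ <- Source(records).runWith(sink)
    _ <- system.terminate()
  } yield ()
}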
2 changes: 2 additions & 0 deletions build.sbt
@@ -181,6 +181,7 @@ lazy val examples = (project in file("examples"))
publishLocal / skip := true,
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.apache.parquet" % "parquet-protobuf" % parquetVersion,
"io.github.embeddedkafka" %% "embedded-kafka" % "3.3.1",
"ch.qos.logback" % "logback-classic" % logbackVersion,
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion,
@@ -200,6 +201,7 @@ lazy val examples = (project in file("examples"))
)
.settings(compilationSettings)
.dependsOn(akka, fs2)
.enablePlugins(ProtobufPlugin)

lazy val coreBenchmarks = (project in file("coreBenchmarks"))
.settings(
@@ -82,7 +82,19 @@ object ParquetWriter {
validationEnabled: Boolean = HadoopParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
hadoopConf: Configuration = new Configuration(),
timeZone: TimeZone = TimeZone.getDefault
)
) {
private[parquet4s] def applyTo[T, B <: HadoopParquetWriter.Builder[T, B]](builder: B): B =
builder
.withWriteMode(writeMode)
.withCompressionCodec(compressionCodecName)
.withDictionaryEncoding(dictionaryEncodingEnabled)
.withDictionaryPageSize(dictionaryPageSize)
.withMaxPaddingSize(maxPaddingSize)
.withPageSize(pageSize)
.withRowGroupSize(rowGroupSize)
.withValidation(validationEnabled)
.withConf(hadoopConf)
}

/** Builder of [[ParquetWriter]].
* @tparam T
@@ -117,16 +129,8 @@
}

private[parquet4s] def internalWriter(path: Path, schema: MessageType, options: Options): InternalWriter =
new InternalBuilder(path, schema)
.withWriteMode(options.writeMode)
.withCompressionCodec(options.compressionCodecName)
.withDictionaryEncoding(options.dictionaryEncodingEnabled)
.withDictionaryPageSize(options.dictionaryPageSize)
.withMaxPaddingSize(options.maxPaddingSize)
.withPageSize(options.pageSize)
.withRowGroupSize(options.rowGroupSize)
.withValidation(options.validationEnabled)
.withConf(options.hadoopConf)
options
.applyTo[RowParquetRecord, InternalBuilder](new InternalBuilder(path, schema))
.build()

/** Writes an iterable collection of data as Parquet files at the given path. The path can represent a local file or directory,
8 changes: 8 additions & 0 deletions examples/src/main/protobuf/data.proto
@@ -0,0 +1,8 @@
syntax = "proto3";

option java_package = "com.github.mjakubowski84.parquet4s.protobuf";

message Data {
int32 id = 1;
string text = 2;
}
@@ -0,0 +1,31 @@
package com.github.mjakubowski84.parquet4s.akka

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import com.github.mjakubowski84.parquet4s.{ParquetStreams, Path}
import com.github.mjakubowski84.parquet4s.protobuf.DataOuterClass.Data
import org.apache.parquet.proto.ProtoParquetWriter

import java.nio.file.Files
import scala.util.Random

object CustomParquetWriterAkkaApp extends App {
val count = 100
val data = (1 to count).map(i => Data.newBuilder.setId(i).setText(Random.nextString(4)).build)
val path = Path(Files.createTempDirectory("example"))

implicit val system: ActorSystem = ActorSystem()

import system.dispatcher

val builder = ProtoParquetWriter.builder[Data](path.append("data.parquet").hadoopPath).withMessage(classOf[Data])

val sink = ParquetStreams.toParquetSingleFile
.custom[Data, ProtoParquetWriter.Builder[Data]](builder)
.write

for {
_ <- Source(data).runWith(sink)
_ <- system.terminate()
} yield ()
}
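
The example above relies on default writer options. Below is a minimal sketch of threading ParquetWriter.Options through the same custom sink; the Snappy codec and the object name are illustrative assumptions, not something this commit prescribes.

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import com.github.mjakubowski84.parquet4s.{ParquetStreams, ParquetWriter, Path}
import com.github.mjakubowski84.parquet4s.protobuf.DataOuterClass.Data
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.proto.ProtoParquetWriter

import java.nio.file.Files
import scala.util.Random

object CustomParquetWriterWithOptionsSketch extends App {
  implicit val system: ActorSystem = ActorSystem()
  import system.dispatcher

  val path    = Path(Files.createTempDirectory("example"))
  val builder = ProtoParquetWriter.builder[Data](path.append("data.parquet").hadoopPath).withMessage(classOf[Data])

  // Options set here are applied to the provided builder before it is built
  val sink = ParquetStreams.toParquetSingleFile
    .custom[Data, ProtoParquetWriter.Builder[Data]](builder)
    .options(ParquetWriter.Options(compressionCodecName = CompressionCodecName.SNAPPY))
    .write

  val data = (1 to 100).map(i => Data.newBuilder.setId(i).setText(Random.nextString(4)).build)

  for {
    _ <- Source(data).runWith(sink)
    _ <- system.terminate()
  } yield ()
}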
@@ -0,0 +1,36 @@
package com.github.mjakubowski84.parquet4s.fs2

import cats.effect.{IO, IOApp}
import com.github.mjakubowski84.parquet4s.Path
import com.github.mjakubowski84.parquet4s.parquet.*
import com.github.mjakubowski84.parquet4s.protobuf.DataOuterClass.Data
import fs2.io.file.Files
import fs2.{Pipe, Stream}
import org.apache.parquet.proto.ProtoParquetWriter

import scala.util.Random

object CustomParquetWriterFS2App extends IOApp.Simple {
private val Count = 100

override def run: IO[Unit] = {
def write(path: Path): Pipe[IO, Data, Nothing] = {
val builder = ProtoParquetWriter.builder[Data](path.hadoopPath).withMessage(classOf[Data])
writeSingleFile[IO]
.custom[Data, ProtoParquetWriter.Builder[Data]](builder)
.write
}

val stream = for {
path <- Stream
.resource(Files[IO].tempDirectory(None, "", None))
.map(fs2Path => Path(fs2Path.toNioPath).append("data.parquet"))
_ <- Stream
.range[IO, Int](start = 0, stopExclusive = Count)
.map(i => Data.newBuilder.setId(i).setText(Random.nextString(4)).build)
.through(write(path))
} yield ()

stream.compile.drain
}
}
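
As with the Akka example, ParquetWriter.Options can also be passed to the FS2 custom pipe. A minimal sketch follows; the OVERWRITE write mode and the object name are illustrative assumptions.

import cats.effect.{IO, IOApp}
import com.github.mjakubowski84.parquet4s.{ParquetWriter, Path}
import com.github.mjakubowski84.parquet4s.parquet.*
import com.github.mjakubowski84.parquet4s.protobuf.DataOuterClass.Data
import fs2.io.file.Files
import fs2.{Pipe, Stream}
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.proto.ProtoParquetWriter

import scala.util.Random

object CustomParquetWriterWithOptionsFS2Sketch extends IOApp.Simple {
  private val Count = 100

  override def run: IO[Unit] = {
    def write(path: Path): Pipe[IO, Data, Nothing] = {
      val builder = ProtoParquetWriter.builder[Data](path.hadoopPath).withMessage(classOf[Data])
      writeSingleFile[IO]
        .custom[Data, ProtoParquetWriter.Builder[Data]](builder)
        // Options are applied to the provided builder; OVERWRITE is just an example
        .options(ParquetWriter.Options(writeMode = ParquetFileWriter.Mode.OVERWRITE))
        .write
    }

    val stream = for {
      path <- Stream
        .resource(Files[IO].tempDirectory(None, "", None))
        .map(fs2Path => Path(fs2Path.toNioPath).append("data.parquet"))
      _ <- Stream
        .range[IO, Int](start = 0, stopExclusive = Count)
        .map(i => Data.newBuilder.setId(i).setText(Random.nextString(4)).build)
        .through(write(path))
    } yield ()

    stream.compile.drain
  }
}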
@@ -8,9 +8,11 @@ import com.github.mjakubowski84.parquet4s.{
ParquetWriter,
Path,
RowParquetRecord,
ValueCodecConfiguration
ValueCodecConfiguration,
experimental
}
import fs2.{Chunk, Pipe, Pull, Stream}
import org.apache.parquet.hadoop.ParquetWriter as HadoopParquetWriter
import org.apache.parquet.schema.MessageType

import scala.language.higherKinds
@@ -28,6 +30,17 @@ private[parquet4s] object writer {
/** Creates a builder of a pipe that processes generic records
*/
def generic(schema: MessageType): Builder[F, RowParquetRecord]

/** Creates a builder of a pipe that processes data of a given type using a
* [[org.apache.parquet.hadoop.ParquetWriter]] built from a provided
* [[org.apache.parquet.hadoop.ParquetWriter.Builder]].
* @tparam T
* Schema type
* @tparam B
* Type of custom [[org.apache.parquet.hadoop.ParquetWriter.Builder]]
*/
@experimental
def custom[T, B <: HadoopParquetWriter.Builder[T, B]](builder: B): CustomBuilder[F, T]
}

private[parquet4s] class ToParquetImpl[F[_]: Sync] extends ToParquet[F] {
@@ -39,6 +52,8 @@ private[parquet4s] object writer {
encoder = RowParquetRecord.genericParquetRecordEncoder,
sync = Sync[F]
)
override def custom[T, B <: HadoopParquetWriter.Builder[T, B]](builder: B): CustomBuilder[F, T] =
CustomBuilderImpl(builder)
}

trait Builder[F[_], T] {
@@ -62,18 +77,40 @@
sync: Sync[F]
) extends Builder[F, T] {
override def options(options: ParquetWriter.Options): Builder[F, T] = this.copy(options = options)
override def write(path: Path): Pipe[F, T, Nothing] = pipe[F, T](path, options)
override def write(path: Path): Pipe[F, T, Nothing] = rowParquetRecordPipe[F, T](path, options)
}

private class Writer[T, F[_]](internalWriter: ParquetWriter.InternalWriter, encode: T => F[RowParquetRecord])(implicit
F: Sync[F]
) extends AutoCloseable {
trait CustomBuilder[F[_], T] {

/** @param options
* writer options
*/
def options(options: ParquetWriter.Options): CustomBuilder[F, T]

/** @return
* final [[fs2.Pipe]]
*/
def write: Pipe[F, T, Nothing]
}

private case class CustomBuilderImpl[F[_]: Sync, T, B <: HadoopParquetWriter.Builder[T, B]](
builder: B,
maybeOptions: Option[ParquetWriter.Options] = None
) extends CustomBuilder[F, T] {
override def options(options: ParquetWriter.Options): CustomBuilder[F, T] =
this.copy(maybeOptions = Some(options))

override def write: Pipe[F, T, Nothing] =
pipe(
maybeOptions
.fold(builder)(_.applyTo[T, B](builder))
.build()
)
}

private class Writer[T, F[_]](internalWriter: HadoopParquetWriter[T])(implicit F: Sync[F]) extends AutoCloseable {
def write(elem: T): F[Unit] =
for {
record <- encode(elem)
_ <- F.delay(scala.concurrent.blocking(internalWriter.write(record)))
} yield ()
F.delay(scala.concurrent.blocking(internalWriter.write(elem)))

def writePull(chunk: Chunk[T]): Pull[F, Nothing, Unit] =
Pull.eval(chunk.traverse_(write))
@@ -92,28 +129,28 @@
case _: NullPointerException => // ignores bug in Parquet
}
}
private object Writer {
def apply[T, F[_]](makeParquetWriter: => HadoopParquetWriter[T])(implicit
F: Sync[F]
): Resource[F, Writer[T, F]] =
Resource.fromAutoCloseable(
F.blocking(makeParquetWriter).map(new Writer[T, F](_))
)
}

private def pipe[F[_]: Sync, T: ParquetRecordEncoder: ParquetSchemaResolver](
path: Path,
options: ParquetWriter.Options
): Pipe[F, T, Nothing] =
in =>
for {
writer <- Stream.resource(writerResource[T, F](path, options))
nothing <- writer.writeAllStream(in)
} yield nothing

private def writerResource[T: ParquetRecordEncoder: ParquetSchemaResolver, F[_]](
private def rowParquetRecordPipe[F[_], T: ParquetRecordEncoder: ParquetSchemaResolver](
path: Path,
options: ParquetWriter.Options
)(implicit F: Sync[F]): Resource[F, Writer[T, F]] =
Resource.fromAutoCloseable(
for {
valueCodecConfiguration <- F.pure(ValueCodecConfiguration(options))
schema <- F.catchNonFatal(ParquetSchemaResolver.resolveSchema[T])
internalWriter <- F.delay(scala.concurrent.blocking(ParquetWriter.internalWriter(path, schema, options)))
encode = { (entity: T) => F.catchNonFatal(ParquetRecordEncoder.encode[T](entity, valueCodecConfiguration)) }
} yield new Writer[T, F](internalWriter, encode)
)
)(implicit F: Sync[F]): Pipe[F, T, Nothing] = { in =>
val valueCodecConfiguration = ValueCodecConfiguration(options)
in
.evalMapChunk(entity => F.catchNonFatal(ParquetRecordEncoder.encode[T](entity, valueCodecConfiguration)))
.through(pipe(ParquetWriter.internalWriter(path, ParquetSchemaResolver.resolveSchema[T], options)))
}

private def pipe[F[_]: Sync, T](makeParquetWriter: => HadoopParquetWriter[T]): Pipe[F, T, Nothing] =
in =>
Stream
.resource(Writer[T, F](makeParquetWriter))
.flatMap(_.writeAllStream(in))
}
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -3,3 +3,4 @@ addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.3")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
addSbtPlugin("com.47deg" % "sbt-microsites" % "1.4.1")
addSbtPlugin("com.github.sbt" % "sbt-protobuf" % "0.7.1")