Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ lazy val root = (project in file("."))
"eu.ostrzyciel.jelly" %% "jelly-jena" % jellyV,
"com.github.alexarchambault" %% "case-app" % "2.1.0-M30",
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
"org.yaml" % "snakeyaml" % "2.4" % Test,
),
scalacOptions ++= Seq(
"-Wunused:imports",
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/eu/neverblink/jelly/cli/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ object App extends CommandsEntryPoint:
Version,
RdfFromJelly,
RdfToJelly,
RdfInspect,
)
3 changes: 1 addition & 2 deletions src/main/scala/eu/neverblink/jelly/cli/Exceptions.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package eu.neverblink.jelly.cli

import com.google.protobuf.InvalidProtocolBufferException
import org.apache.jena.riot.RiotException

/** Contains a set of common jelly-cli exceptions with custom output messages.
Expand All @@ -22,7 +21,7 @@ case class JellyTranscodingError(message: String)
extends CriticalException(s"Jelly transcoding error: $message")
case class JenaRiotException(e: RiotException)
extends CriticalException(s"Jena RDF I/O exception: ${e.getMessage}")
case class InvalidJellyFile(e: InvalidProtocolBufferException)
case class InvalidJellyFile(e: Exception)
extends CriticalException(s"Invalid Jelly file: ${e.getMessage}")
case class InvalidFormatSpecified(format: String, validFormats: String)
extends CriticalException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import caseapp.*
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.*
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.Jena.*
import eu.neverblink.jelly.cli.util.JellyUtil
import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import eu.ostrzyciel.jelly.core.IoUtils
import org.apache.jena.riot.system.StreamRDFWriter
import org.apache.jena.riot.{Lang, RDFParser}

Expand All @@ -25,7 +25,7 @@ case class RdfFromJellyOptions(
@ExtraName("out-format") outputFormat: Option[String] = None,
) extends HasJellyCommandOptions

object RdfFromJelly extends RdfCommand[RdfFromJellyOptions, RdfFormat.Writeable]:
object RdfFromJelly extends RdfTranscodeCommand[RdfFromJellyOptions, RdfFormat.Writeable]:

override def names: List[List[String]] = List(
List("rdf", "from-jelly"),
Expand Down Expand Up @@ -83,30 +83,10 @@ object RdfFromJelly extends RdfCommand[RdfFromJellyOptions, RdfFormat.Writeable]
outputStream.write(frame.getBytes)

try {
iterateRdfStream(inputStream, outputStream).zipWithIndex.foreach {
JellyUtil.iterateRdfStream(inputStream).zipWithIndex.foreach {
case (maybeFrame, frameIndex) =>
writeFrameToOutput(maybeFrame, frameIndex)
}
} finally {
outputStream.flush()
}

/** This method reads the Jelly file and returns an iterator of RdfStreamFrame
* @param inputStream
* @param outputStream
* @return
*/
private def iterateRdfStream(
inputStream: InputStream,
outputStream: OutputStream,
): Iterator[RdfStreamFrame] =
IoUtils.autodetectDelimiting(inputStream) match
case (false, newIn) =>
// Non-delimited Jelly file
// In this case, we can only read one frame
Iterator(RdfStreamFrame.parseFrom(newIn))
case (true, newIn) =>
// Delimited Jelly file
// In this case, we can read multiple frames
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
.takeWhile(_.isDefined).map(_.get)
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package eu.neverblink.jelly.cli.command.rdf

import caseapp.{ExtraName, Recurse}
import caseapp.core.RemainingArgs
import eu.neverblink.jelly.cli.util.{FrameInfo, JellyUtil, MetricsPrinter}
import eu.neverblink.jelly.cli.*
import eu.ostrzyciel.jelly.core.proto.v1.*

import java.io.InputStream

/** Command-line options accepted by the `rdf inspect` command.
  *
  * @param common
  *   options shared between jelly-cli commands, pulled in through `@Recurse`
  * @param outputFile
  *   optional output location, also reachable under the extra name `to`; it is handed to
  *   `getIoStreamsFromOptions` to obtain the output stream (presumably standard output is used
  *   when it is absent -- TODO confirm against `JellyCommand`)
  * @param perFrame
  *   also reachable under the extra name `per-frame`; when set, the metrics are printed for each
  *   frame separately, otherwise they are printed as an aggregate
  */
case class RdfInspectOptions(
@Recurse
common: JellyCommandOptions = JellyCommandOptions(),
@ExtraName("to") outputFile: Option[String] = None,
@ExtraName("per-frame") perFrame: Boolean = false,
) extends HasJellyCommandOptions

object RdfInspect extends JellyCommand[RdfInspectOptions]:

/** The command path under which this command is exposed: `rdf inspect`. */
override def names: List[List[String]] =
  List("rdf" :: "inspect" :: Nil)

/** Name of the command group this command is listed under. */
override final def group: String = "rdf"

/** Reads the Jelly input, collects its metrics and prints them to the output.
  *
  * The input is taken from the first remaining (positional) argument and the output from
  * `options.outputFile`; both are resolved by `getIoStreamsFromOptions`.
  *
  * @param options
  *   the parsed command options; `perFrame` selects per-frame instead of aggregate output
  * @param remainingArgs
  *   the positional arguments, of which only the first one is used
  */
override def doRun(options: RdfInspectOptions, remainingArgs: RemainingArgs): Unit =
  val (inputStream, outputStream) =
    this.getIoStreamsFromOptions(remainingArgs.remaining.headOption, options.outputFile)
  val printer = inspectJelly(inputStream)
  try {
    if options.perFrame then printer.printPerFrame(outputStream)
    else printer.printAggregate(outputStream)
  } finally {
    // The printer only calls write(); flush here, as RdfFromJelly does, so that nothing is left
    // behind in case the underlying stream is buffered.
    outputStream.flush()
  }

private def inspectJelly(
inputStream: InputStream,
): MetricsPrinter =

inline def computeMetrics(
frame: RdfStreamFrame,
frameIndex: Int,
printer: MetricsPrinter,
): Unit =
val metrics = new FrameInfo(frameIndex)
frame.rows.foreach(r => metricsForRow(r, metrics))
printer.frameInfo += metrics

try {
val allRows = JellyUtil.iterateRdfStream(inputStream).toList
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires you to allocate a data structure per frame and keep it in memory... making all of this a non-streaming algorithm. If you feed in a very long file, you're going to have OOMs.

Can you rewrite it so that it operates on iterators?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote the whole thing to work on iterators. The only exception is the last step of the --per-frame aggregation, where I write each frame's stats to the output inside a foreach statement. As I understand it, that should be fine because any materialized objects are discarded immediately, but please let me know if that is not the case.

// we need to check if the first frame contains options
val streamOptions = checkOptions(allRows)
val printer = new MetricsPrinter(streamOptions)
// We compute the metrics for each frame
// and then sum them all during the printing if desired
allRows.zipWithIndex.foreach { case (maybeFrame, frameIndex) =>
computeMetrics(maybeFrame, frameIndex, printer)
}
printer
} catch {
case e: Exception =>
throw InvalidJellyFile(e)
}

/** Bumps the counter in `metadata` that matches the type of the payload carried by `row`.
  *
  * The match has no default case, so a payload of any type other than the ones listed is not
  * counted and makes the match fail.
  *
  * @param row
  *   the stream row whose payload type is inspected
  * @param metadata
  *   the counters of the frame the row belongs to; updated in place
  */
private def metricsForRow(
    row: RdfStreamRow,
    metadata: FrameInfo,
): Unit =
  val payload = row.row
  payload match
    case _: RdfTriple               => metadata.tripleCount += 1
    case _: RdfQuad                 => metadata.quadCount += 1
    case _: RdfNameEntry            => metadata.nameCount += 1
    case _: RdfPrefixEntry          => metadata.prefixCount += 1
    case _: RdfNamespaceDeclaration => metadata.namespaceCount += 1
    case _: RdfDatatypeEntry        => metadata.datatypeCount += 1
    case _: RdfGraphStart           => metadata.graphStartCount += 1
    case _: RdfGraphEnd             => metadata.graphEndCount += 1
    case _: RdfStreamOptions        => metadata.optionCount += 1

/** Extracts the stream options from the very first row of the first frame.
  *
  * @param allFrames
  *   All frames of the stream, in stream order.
  * @return
  *   The options found in the first row of the first frame.
  * @throws RuntimeException
  *   If there are no frames, if the first frame has no rows, or if the first row of the first
  *   frame is not an options row.
  */
private def checkOptions(allFrames: List[RdfStreamFrame]): RdfStreamOptions =
  val firstFrame = allFrames.headOption.getOrElse(
    throw new RuntimeException("No frames in the stream."),
  )
  val firstRow = firstFrame.rows.headOption.getOrElse(
    throw new RuntimeException("No rows in the frame."),
  )
  firstRow.row match
    case streamOptions: RdfStreamOptions => streamOptions
    case _ => throw new RuntimeException("First row of the frame is not an options row.")
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ case class RdfToJellyOptions(
delimited: Boolean = true,
) extends HasJellyCommandOptions

object RdfToJelly extends RdfCommand[RdfToJellyOptions, RdfFormat.Readable]:
object RdfToJelly extends RdfTranscodeCommand[RdfToJellyOptions, RdfFormat.Readable]:

override def names: List[List[String]] = List(
List("rdf", "to-jelly"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import java.io.{InputStream, OutputStream}

/** This abstract class is responsible for the common logic in both RDF parsing commands
*/
abstract class RdfCommand[T <: HasJellyCommandOptions: {Parser, Help}, F <: RdfFormat](using
tt: TypeTest[RdfFormat, F],
abstract class RdfTranscodeCommand[T <: HasJellyCommandOptions: {Parser, Help}, F <: RdfFormat](
using tt: TypeTest[RdfFormat, F],
) extends JellyCommand[T]:

override final def group = "rdf"
Expand Down
27 changes: 27 additions & 0 deletions src/main/scala/eu/neverblink/jelly/cli/util/JellyUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package eu.neverblink.jelly.cli.util

import eu.ostrzyciel.jelly.core.IoUtils
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame

import java.io.InputStream

object JellyUtil:
  /** Reads a Jelly file and returns its contents as an iterator of [[RdfStreamFrame]]s.
    *
    * Whether the input is delimited is detected with `IoUtils.autodetectDelimiting`; all parsing
    * is then done on the stream returned by that call rather than on `inputStream` directly.
    *
    * @param inputStream
    *   the stream to read the Jelly data from
    * @return
    *   for a non-delimited file, an iterator over the single frame, which is parsed before this
    *   method returns; for a delimited file, an iterator that parses frames as it is consumed and
    *   ends at the first read that yields no frame
    */
  def iterateRdfStream(
      inputStream: InputStream,
  ): Iterator[RdfStreamFrame] =
    IoUtils.autodetectDelimiting(inputStream) match
      case (false, newIn) =>
        // Non-delimited Jelly file
        // In this case, we can only read one frame
        Iterator(RdfStreamFrame.parseFrom(newIn))
      case (true, newIn) =>
        // Delimited Jelly file
        // In this case, we can read multiple frames
        Iterator
          .continually(RdfStreamFrame.parseDelimitedFrom(newIn))
          // continually() never ends on its own: stop at the first empty read
          .takeWhile(_.isDefined)
          // unwrap by pattern match instead of calling Option.get
          .collect { case Some(frame) => frame }
117 changes: 117 additions & 0 deletions src/main/scala/eu/neverblink/jelly/cli/util/MetricsPrinter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package eu.neverblink.jelly.cli.util

import eu.neverblink.jelly.cli.util.YamlDocBuilder.*
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions

import java.io.OutputStream
import scala.collection.mutable.ListBuffer

/** Mutable bundle of counters, one per kind of row, gathered for a single frame.
  *
  * @param frameIndex
  *   the index of the frame, as assigned by whoever creates the instance
  */
final class FrameInfo(val frameIndex: Int):
  var optionCount: Int = 0
  var nameCount: Int = 0
  var namespaceCount: Int = 0
  var tripleCount: Int = 0
  var quadCount: Int = 0
  var prefixCount: Int = 0
  var datatypeCount: Int = 0
  var graphStartCount: Int = 0
  var graphEndCount: Int = 0

  /** Adds every counter of `other` onto the matching counter of this instance, in place.
    *
    * `frameIndex` is left as it is and `other` is only read.
    *
    * @param other
    *   the counters to add
    * @return
    *   this instance, after the update
    */
  def +=(other: FrameInfo): FrameInfo =
    optionCount += other.optionCount
    nameCount += other.nameCount
    namespaceCount += other.namespaceCount
    tripleCount += other.tripleCount
    quadCount += other.quadCount
    prefixCount += other.prefixCount
    datatypeCount += other.datatypeCount
    graphStartCount += other.graphStartCount
    graphEndCount += other.graphEndCount
    this

end FrameInfo

/** Holds the metrics collected for each frame and prints them, together with the stream options,
  * as a YAML document.
  *
  * @param printOptions
  *   the stream options reported under the `stream_options` key
  */
final class MetricsPrinter(printOptions: RdfStreamOptions):
  import eu.neverblink.jelly.cli.util.MetricsPrinter.*
  import java.nio.charset.StandardCharsets

  /** Metrics of the individual frames; callers append one entry per frame. */
  var frameInfo: ListBuffer[FrameInfo] = ListBuffer.empty

  /** Writes the stream options followed by a list with one stats entry per collected frame.
    *
    * @param o
    *   the stream the document is written to; it is neither flushed nor closed here
    */
  def printPerFrame(o: OutputStream): Unit =
    val yamlFrames = YamlDocBuilder.YamlList(frameInfo.map { frame =>
      formatStatsIndex(frame)
    }.toSeq)
    writeDocument(o, yamlFrames)

  /** Writes the stream options followed by the counters summed over all collected frames, plus
    * the number of frames.
    *
    * The collected per-frame metrics are not modified, and an empty collection yields all-zero
    * counters.
    *
    * @param o
    *   the stream the document is written to; it is neither flushed nor closed here
    */
  def printAggregate(o: OutputStream): Unit =
    // Sum into a fresh accumulator. Reducing the buffer with `+=` directly would add everything
    // onto the first collected frame (so any later print would report wrong numbers) and would
    // throw on an empty buffer. The accumulator's frame index is never printed.
    val sumCounts = frameInfo.foldLeft(new FrameInfo(0))(_ += _)
    writeDocument(o, formatStatsCount(sumCounts, frameInfo.length))

  /** Builds the document from the stream options and the given `frames` node and writes it. */
  private def writeDocument(o: OutputStream, frames: YamlValue): Unit =
    val fullString =
      YamlDocBuilder.build(
        YamlMap(
          "stream_options" -> formatOptions(options = printOptions),
          "frames" -> frames,
        ),
      )
    // Encode explicitly as UTF-8 rather than with the platform default charset, so the output
    // does not depend on the machine it is produced on.
    o.write(fullString.getBytes(StandardCharsets.UTF_8))

end MetricsPrinter

/** Helpers that turn stream options and frame metrics into YAML nodes. */
object MetricsPrinter:

  /** Renders the fields of the stream options as a YAML map. */
  private def formatOptions(
      options: RdfStreamOptions,
  ): YamlMap =
    val physical = options.physicalType
    val logical = options.logicalType
    YamlMap(
      "stream_name" -> YamlString(options.streamName),
      "physical_type" -> YamlEnum(physical.toString, physical.value),
      "generalized_statements" -> YamlBool(options.generalizedStatements),
      "rdf_star" -> YamlBool(options.rdfStar),
      "max_name_table_size" -> YamlInt(options.maxNameTableSize),
      "max_prefix_table_size" -> YamlInt(options.maxPrefixTableSize),
      "max_datatype_table_size" -> YamlInt(options.maxDatatypeTableSize),
      "logical_type" -> YamlEnum(logical.toString, logical.value),
      "version" -> YamlInt(options.version),
    )

  /** Renders the counters of one frame, preceded by its `frame_index`. */
  private def formatStatsIndex(
      frame: FrameInfo,
  ): YamlMap =
    val entries: Seq[(String, YamlValue)] =
      ("frame_index" -> YamlInt(frame.frameIndex)) +: formatStats(frame)
    YamlMap(entries*)

  /** Renders the given counters, preceded by a `frame_count` entry holding `frameCount`. */
  private def formatStatsCount(
      frame: FrameInfo,
      frameCount: Int,
  ): YamlMap =
    val entries: Seq[(String, YamlValue)] =
      ("frame_count" -> YamlInt(frameCount)) +: formatStats(frame)
    YamlMap(entries*)

  /** Lists the counters of `frame` as key/value pairs, in the order they are printed. */
  private def formatStats(
      frame: FrameInfo,
  ): Seq[(String, YamlValue)] =
    val counters = Seq(
      "option_count" -> frame.optionCount,
      "triple_count" -> frame.tripleCount,
      "quad_count" -> frame.quadCount,
      "graph_start_count" -> frame.graphStartCount,
      "graph_end_count" -> frame.graphEndCount,
      "namespace_count" -> frame.namespaceCount,
      "name_count" -> frame.nameCount,
      "prefix_count" -> frame.prefixCount,
      "datatype_count" -> frame.datatypeCount,
    )
    counters.map { case (key, count) => key -> YamlInt(count) }

end MetricsPrinter
Loading