diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspect.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspect.scala index 501ef2e..180f0f8 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspect.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspect.scala @@ -11,7 +11,7 @@ import scala.jdk.CollectionConverters.* import java.io.InputStream @HelpMessage( "Prints statistics about a Jelly-RDF stream.\n" + - "Statistics include: Jelly stream options and counts of various row types, " + + "Statistics include: Jelly stream options and counts/sizes of various row types, " + "including triples, quads, names, prefixes, " + "namespaces, datatypes, and graphs.\n" + "Output statistics are returned as a valid YAML. \n" + @@ -40,6 +40,10 @@ case class RdfInspectOptions( "term position ('term'), or doesn't aggregate ('all').", ) detail: Option[String] = None, + @HelpMessage( + "Report the size (in bytes) of rows and other elements, rather than their counts.", + ) + size: Boolean = false, ) extends HasJellyCommandOptions object RdfInspect extends JellyCommand[RdfInspectOptions]: @@ -61,6 +65,8 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]: throw InvalidArgument("--detail", value, Some("Must be one of 'all', 'node', 'term'")) case None => MetricsPrinter.allFormatter } + val statCollector = if options.size then FrameInfo.SizeStatistic else FrameInfo.CountStatistic + given FrameInfo.StatisticCollector = statCollector val (streamOpts, frameIterator) = inspectJelly(inputStream, options.detail.isDefined) val metricsPrinter = new MetricsPrinter(formatter) if options.perFrame then metricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream) @@ -69,7 +75,7 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]: private def inspectJelly( inputStream: InputStream, detail: Boolean, - ): (RdfStreamOptions, Iterator[FrameInfo]) = + )(using FrameInfo.StatisticCollector): (RdfStreamOptions, Iterator[FrameInfo]) = inline def computeMetrics( frame: RdfStreamFrame, @@ -79,7 +85,7 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]: val metrics = if detail then new FrameDetailInfo(frameIndex, metadata) else FrameInfo(frameIndex, metadata) - frame.getRows.asScala.foreach(r => metricsForRow(r, metrics)) + metrics.processFrame(frame) metrics try { @@ -97,11 +103,6 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]: throw InvalidJellyFile(e) } - private def metricsForRow( - row: RdfStreamRow, - metadata: FrameInfo, - ): Unit = metadata.processStreamRow(row) - /** Checks whether the first frame in the stream contains options and returns them. * @param headFrame * The first frame in the stream as an option. diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/MetricsPrinter.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/MetricsPrinter.scala index 2c52867..1c3c3de 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/MetricsPrinter.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/MetricsPrinter.scala @@ -1,18 +1,40 @@ package eu.neverblink.jelly.cli.command.rdf.util -import com.google.protobuf.ByteString +import com.google.protobuf.{ByteString, CodedOutputStream} +import scala.jdk.CollectionConverters.* import eu.neverblink.jelly.cli.util.io.YamlDocBuilder import eu.neverblink.jelly.cli.util.io.YamlDocBuilder.* import eu.neverblink.jelly.core.proto.v1.* +import eu.neverblink.protoc.java.runtime.ProtoMessage import java.io.OutputStream import scala.language.postfixOps +object FrameInfo: + trait StatisticCollector: + def measure(r: ProtoMessage[?]): Long + def measure(r: String): Long // Needed as bnodes are plain strings + def name(): String + + case object CountStatistic extends StatisticCollector: + override def measure(r: ProtoMessage[?]): Long = 1 + override def measure(r: String): Long = 1 + override def name(): String = "count" + + case object SizeStatistic extends StatisticCollector: + override def measure(r: ProtoMessage[?]): Long = r.getSerializedSize + override def measure(r: String): Long = CodedOutputStream.computeStringSizeNoTag(r) + override def name(): String = "size" + /** This class is used to store the metrics for a single frame */ -class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]): +class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString])(using + statCollector: FrameInfo.StatisticCollector, +): var frameCount: Long = 1 - private object count: + private object stat: + var frame: Long = 0 + var row: Long = 0 var option: Long = 0 var name: Long = 0 var namespace: Long = 0 @@ -25,51 +47,69 @@ class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]): def +=(other: FrameInfo): FrameInfo = { this.frameCount += 1 - this.count.option += other.count.option - this.count.name += other.count.name - this.count.namespace += other.count.namespace - this.count.triple += other.count.triple - this.count.quad += other.count.quad - this.count.prefix += other.count.prefix - this.count.datatype += other.count.datatype - this.count.graphStart += other.count.graphStart - this.count.graphEnd += other.count.graphEnd + this.stat.frame += other.stat.frame + this.stat.row += other.stat.row + this.stat.option += other.stat.option + this.stat.name += other.stat.name + this.stat.namespace += other.stat.namespace + this.stat.triple += other.stat.triple + this.stat.quad += other.stat.quad + this.stat.prefix += other.stat.prefix + this.stat.datatype += other.stat.datatype + this.stat.graphStart += other.stat.graphStart + this.stat.graphEnd += other.stat.graphEnd this } - def processStreamRow(row: RdfStreamRow): Unit = row.getRow match { - case r: RdfTriple => handleTriple(r) - case r: RdfQuad => handleQuad(r) - case r: RdfNameEntry => handleNameEntry(r) - case r: RdfPrefixEntry => handlePrefixEntry(r) - case r: RdfNamespaceDeclaration => handleNamespaceDeclaration(r) - case r: RdfDatatypeEntry => handleDatatypeEntry(r) - case r: RdfGraphStart => handleGraphStart(r) - case r: RdfGraphEnd => handleGraphEnd(r) - case r: RdfStreamOptions => handleOption(r) + def processFrame(frame: RdfStreamFrame): Unit = { + stat.frame += statCollector.measure(frame) + frame.getRows.asScala.foreach(r => processStreamRow(r)) } - protected def handleTriple(r: RdfTriple): Unit = count.triple += 1 - protected def handleQuad(r: RdfQuad): Unit = count.quad += 1 - protected def handleNameEntry(r: RdfNameEntry): Unit = count.name += 1 - protected def handlePrefixEntry(r: RdfPrefixEntry): Unit = count.prefix += 1 - protected def handleNamespaceDeclaration(r: RdfNamespaceDeclaration): Unit = count.namespace += 1 - protected def handleDatatypeEntry(r: RdfDatatypeEntry): Unit = count.datatype += 1 - protected def handleGraphStart(r: RdfGraphStart): Unit = count.graphStart += 1 - protected def handleGraphEnd(r: RdfGraphEnd): Unit = count.graphEnd += 1 - protected def handleOption(r: RdfStreamOptions): Unit = count.option += 1 - - def format(): Seq[(String, Long)] = Seq( - ("option_count", count.option), - ("triple_count", count.triple), - ("quad_count", count.quad), - ("graph_start_count", count.graphStart), - ("graph_end_count", count.graphEnd), - ("namespace_count", count.namespace), - ("name_count", count.name), - ("prefix_count", count.prefix), - ("datatype_count", count.datatype), - ) + def processStreamRow(row: RdfStreamRow): Unit = { + stat.row += statCollector.measure(row) + row.getRow match { + case r: RdfTriple => handleTriple(r) + case r: RdfQuad => handleQuad(r) + case r: RdfNameEntry => handleNameEntry(r) + case r: RdfPrefixEntry => handlePrefixEntry(r) + case r: RdfNamespaceDeclaration => handleNamespaceDeclaration(r) + case r: RdfDatatypeEntry => handleDatatypeEntry(r) + case r: RdfGraphStart => handleGraphStart(r) + case r: RdfGraphEnd => handleGraphEnd(r) + case r: RdfStreamOptions => handleOption(r) + } + } + + protected def handleTriple(r: RdfTriple): Unit = stat.triple += statCollector.measure(r) + protected def handleQuad(r: RdfQuad): Unit = stat.quad += statCollector.measure(r) + protected def handleNameEntry(r: RdfNameEntry): Unit = stat.name += statCollector.measure(r) + protected def handlePrefixEntry(r: RdfPrefixEntry): Unit = stat.prefix += statCollector.measure(r) + protected def handleNamespaceDeclaration(r: RdfNamespaceDeclaration): Unit = + stat.namespace += statCollector.measure(r) + protected def handleDatatypeEntry(r: RdfDatatypeEntry): Unit = + stat.datatype += statCollector.measure(r) + protected def handleGraphStart(r: RdfGraphStart): Unit = + stat.graphStart += statCollector.measure(r) + protected def handleGraphEnd(r: RdfGraphEnd): Unit = stat.graphEnd += statCollector.measure(r) + protected def handleOption(r: RdfStreamOptions): Unit = stat.option += statCollector.measure(r) + + def format(): Seq[(String, Long)] = { + val name = statCollector.name() + Seq( + ("frame_" + name, stat.frame), + ("row_" + name, stat.row), + ("option_" + name, stat.option), + ("triple_" + name, stat.triple), + ("quad_" + name, stat.quad), + ("graph_start_" + name, stat.graphStart), + ("graph_end_" + name, stat.graphEnd), + ("namespace_" + name, stat.namespace), + ("name_" + name, stat.name), + ("prefix_" + name, stat.prefix), + ("datatype_" + name, stat.datatype), + ) + } end FrameInfo @@ -77,8 +117,8 @@ end FrameInfo * blank node, literal, triple) and graph term in quads (IRI, blank node, literal, default graph). * For simplicity, this class does not validate these constraints. */ -class NodeDetailInfo: - private object count: +class NodeDetailInfo(using statCollector: FrameInfo.StatisticCollector): + private object stat: var iri: Long = 0 var bnode: Long = 0 var literal: Long = 0 @@ -86,40 +126,44 @@ class NodeDetailInfo: var defaultGraph: Long = 0 def handle(o: Object): Unit = o match { - case r: RdfIri => count.iri += 1 - case r: String => count.bnode += 1 // bnodes are strings - case r: RdfLiteral => count.literal += 1 - case r: RdfTriple => count.triple += 1 - case r: RdfDefaultGraph => count.defaultGraph += 1 + case r: RdfIri => stat.iri += statCollector.measure(r) + case r: String => stat.bnode += statCollector.measure(r) // bnodes are strings + case r: RdfLiteral => stat.literal += statCollector.measure(r) + case r: RdfTriple => stat.triple += statCollector.measure(r) + case r: RdfDefaultGraph => stat.defaultGraph += statCollector.measure(r) } - def format(): Seq[(String, Long)] = Seq( - ("iri_count", count.iri), - ("bnode_count", count.bnode), - ("literal_count", count.literal), - ("triple_count", count.triple), - ("default_graph_count", count.defaultGraph), - ).filter(_._2 > 0) + def format(): Seq[(String, Long)] = { + val name = statCollector.name() + Seq( + ("iri_" + name, stat.iri), + ("bnode_" + name, stat.bnode), + ("literal_" + name, stat.literal), + ("triple_" + name, stat.triple), + ("default_graph_" + name, stat.defaultGraph), + ).filter(_._2 > 0) + } def +=(other: NodeDetailInfo): NodeDetailInfo = { - this.count.iri += other.count.iri - this.count.bnode += other.count.bnode - this.count.literal += other.count.literal - this.count.triple += other.count.triple - this.count.defaultGraph += other.count.defaultGraph + this.stat.iri += other.stat.iri + this.stat.bnode += other.stat.bnode + this.stat.literal += other.stat.literal + this.stat.triple += other.stat.triple + this.stat.defaultGraph += other.stat.defaultGraph this } - def total(): Long = count.iri - + count.bnode - + count.literal - + count.triple - + count.defaultGraph + def total(): Long = stat.iri + + stat.bnode + + stat.literal + + stat.triple + + stat.defaultGraph end NodeDetailInfo -class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString]) - extends FrameInfo(frameIndex, metadata): +class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString])(using + statCollector: FrameInfo.StatisticCollector, +) extends FrameInfo(frameIndex, metadata): private object term: val subjectInfo = new NodeDetailInfo() val predicateInfo = new NodeDetailInfo() @@ -168,12 +212,15 @@ class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString]) out += term.graphInfo out.format() - def formatGroupByTerm(): Seq[(String, Long)] = Seq( - "subject_count" -> term.subjectInfo.total(), - "predicate_count" -> term.predicateInfo.total(), - "object_count" -> term.objectInfo.total(), - "graph_count" -> term.graphInfo.total(), - ) + def formatGroupByTerm(): Seq[(String, Long)] = { + val name = statCollector.name() + Seq( + "subject_" + name -> term.subjectInfo.total(), + "predicate_" + name -> term.predicateInfo.total(), + "object_" + name -> term.objectInfo.total(), + "graph_" + name -> term.graphInfo.total(), + ) + } end FrameDetailInfo diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspectSpec.scala b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspectSpec.scala index cd5c39a..0782fd0 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspectSpec.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspectSpec.scala @@ -16,6 +16,8 @@ import java.util class RdfInspectSpec extends AnyWordSpec with Matchers with TestFixtureHelper: protected val testCardinality: Int = 33 + protected val tripleTerms: Seq[String] = Seq("subject", "predicate", "object") + protected val tripleNodes: Seq[String] = Seq("iri", "bnode", "literal", "triple") "rdf inspect command" should { "be able to return aggregate of all frames as a valid Yaml" in withFullJellyFile { j => @@ -200,9 +202,6 @@ class RdfInspectSpec extends AnyWordSpec with Matchers with TestFixtureHelper: frameSize = 15, ) - val tripleTerms = Seq("subject", "predicate", "object") - val tripleNodes = Seq("iri", "bnode", "literal", "triple") - "given complex jelly file (triples)" in withSpecificJellyFile( testCode = { jellyF => val (out, err) = @@ -254,4 +253,130 @@ class RdfInspectSpec extends AnyWordSpec with Matchers with TestFixtureHelper: fileName = "everythingQuad.jelly", ) } + + "print size statistics" when { + "aggregating multiple frames" in withFullJellyFile( + testCode = { j => + val (out, err) = RdfInspect.runTestCommand(List("rdf", "inspect", "--size", j)) + val yaml = new Yaml() + val parsed = yaml.load(out).asInstanceOf[java.util.Map[String, Any]] + parsed.get("stream_options") should not be None + val options = parsed.get("stream_options").asInstanceOf[java.util.Map[String, Any]] + options.get("max_name_table_size") should be(128) + parsed.get("frames") shouldBe a[util.LinkedHashMap[?, ?]] + val frames = parsed.get("frames").asInstanceOf[java.util.LinkedHashMap[String, Any]] + frames.get("triple_size") should be(16 * testCardinality) + frames.get("frame_count") should be(5) + }, + frameSize = 15, + ) + + "per frame" in withFullJellyFile( + testCode = { j => + val (out, err) = + RdfInspect.runTestCommand(List("rdf", "inspect", "--per-frame", "--size", j)) + val yaml = new Yaml() + val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]] + parsed.get("frames") shouldBe a[util.ArrayList[?]] + val frames = parsed.get("frames").asInstanceOf[util.ArrayList[Any]] + frames.get(0) shouldBe a[util.LinkedHashMap[String, util.LinkedHashMap[String, ?]]] + val frame1 = + frames.get(0).asInstanceOf[util.LinkedHashMap[String, Any]] + frame1 shouldBe a[util.LinkedHashMap[?, ?]] + frame1.get("triple_size") should be(16 * 6) + }, + frameSize = 15, + ) + } + + "print detailed size statistics" when { + "aggregating multiple frames" in withFullJellyFile( + testCode = { j => + val (out, err) = + RdfInspect.runTestCommand(List("rdf", "inspect", "--size", "--detail", "all", j)) + val yaml = new Yaml() + val parsed = yaml.load(out).asInstanceOf[java.util.Map[String, Any]] + parsed.get("frames") shouldBe a[util.LinkedHashMap[String, util.LinkedHashMap[String, ?]]] + val frames = parsed.get("frames").asInstanceOf[ + util.LinkedHashMap[String, util.LinkedHashMap[String, Any]], + ] + frames.get("subject").get("iri_size").asInstanceOf[Int] should be > testCardinality + frames.get("subject").get("bnode_size") == null shouldBe true + }, + frameSize = 15, + ) + + "per frame" in withFullJellyFile( + testCode = { j => + val (out, err) = + RdfInspect.runTestCommand( + List("rdf", "inspect", "--size", "--per-frame", "--detail", "all", j), + ) + val yaml = new Yaml() + val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]] + parsed.get("frames") shouldBe a[util.ArrayList[?]] + val frames = parsed.get("frames").asInstanceOf[util.ArrayList[Any]] + frames.get(0) shouldBe a[util.LinkedHashMap[String, util.LinkedHashMap[String, ?]]] + val frame1 = + frames.get(0).asInstanceOf[util.LinkedHashMap[String, util.LinkedHashMap[String, Any]]] + frame1.get("subject").get("iri_size").asInstanceOf[Int] should be > 6 + frame1.get("subject").get("iri_size").asInstanceOf[Int] should be < 15 + frame1.get("subject").get("bnode_size") == null shouldBe true + }, + frameSize = 15, + ) + + "given complex jelly file (triples)" in withSpecificJellyFile( + testCode = { jellyF => + val (out, err) = + RdfInspect.runTestCommand(List("rdf", "inspect", "--size", "--detail", "all", jellyF)) + val yaml = new Yaml() + val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]] + parsed.get("frames") shouldBe a[util.LinkedHashMap[String, util.LinkedHashMap[String, ?]]] + val frames = parsed.get("frames").asInstanceOf[ + util.LinkedHashMap[String, util.LinkedHashMap[String, Any]], + ] + for + term <- tripleTerms + node <- tripleNodes + do frames.get(term).get(s"${node}_size").asInstanceOf[Int] should be > 0 + + // Graphs == 0 when doing triples + for node <- "default_graph" +: tripleNodes do frames.get("graph") == null shouldBe true + // These are illegal + for term <- tripleTerms do + frames.get(term).get("default_graph_size") == null shouldBe true + }, + fileName = "everythingTriple.jelly", + ) + + "given complex jelly file (quads)" in withSpecificJellyFile( + testCode = { jellyF => + val (out, err) = + RdfInspect.runTestCommand(List("rdf", "inspect", "--size", "--detail", "all", jellyF)) + val yaml = new Yaml() + val parsed = yaml.load(out).asInstanceOf[java.util.Map[String, Any]] + parsed.get("frames") shouldBe a[ + util.LinkedHashMap[String, java.util.LinkedHashMap[String, ?]], + ] + val frames = parsed.get("frames").asInstanceOf[ + java.util.LinkedHashMap[String, java.util.LinkedHashMap[String, Any]], + ] + for + term <- tripleTerms + node <- tripleNodes + do frames.get(term).get(s"${node}_size").asInstanceOf[Int] should be > 0 + for node <- Seq("iri", "bnode", "literal") do + frames.get("graph").get(s"${node}_size").asInstanceOf[Int] should be > 0 + + // Default graph is 0 bytes, so it gets omitted in size statistics + frames.get("graph").get("default_graph_size") == null shouldBe true + // These are illegal + for term <- tripleTerms do + frames.get(term).get("default_graph_size") == null shouldBe true + for term <- tripleTerms do frames.get("graph").get("triple_size") == null shouldBe true + }, + fileName = "everythingQuad.jelly", + ) + } }