Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import scala.jdk.CollectionConverters.*
import java.io.InputStream
@HelpMessage(
"Prints statistics about a Jelly-RDF stream.\n" +
"Statistics include: Jelly stream options and counts of various row types, " +
"Statistics include: Jelly stream options and counts/sizes of various row types, " +
"including triples, quads, names, prefixes, " +
"namespaces, datatypes, and graphs.\n" +
"Output statistics are returned as a valid YAML. \n" +
Expand Down Expand Up @@ -40,6 +40,10 @@ case class RdfInspectOptions(
"term position ('term'), or doesn't aggregate ('all').",
)
detail: Option[String] = None,
@HelpMessage(
"Report the size (in bytes) of rows and other elements, rather than their counts.",
)
size: Boolean = false,
) extends HasJellyCommandOptions

object RdfInspect extends JellyCommand[RdfInspectOptions]:
Expand All @@ -61,6 +65,8 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
throw InvalidArgument("--detail", value, Some("Must be one of 'all', 'node', 'term'"))
case None => MetricsPrinter.allFormatter
}
val statCollector = if options.size then FrameInfo.SizeStatistic else FrameInfo.CountStatistic
given FrameInfo.StatisticCollector = statCollector
val (streamOpts, frameIterator) = inspectJelly(inputStream, options.detail.isDefined)
val metricsPrinter = new MetricsPrinter(formatter)
if options.perFrame then metricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream)
Expand All @@ -69,7 +75,7 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
private def inspectJelly(
inputStream: InputStream,
detail: Boolean,
): (RdfStreamOptions, Iterator[FrameInfo]) =
)(using FrameInfo.StatisticCollector): (RdfStreamOptions, Iterator[FrameInfo]) =

inline def computeMetrics(
frame: RdfStreamFrame,
Expand All @@ -79,7 +85,7 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
val metrics =
if detail then new FrameDetailInfo(frameIndex, metadata)
else FrameInfo(frameIndex, metadata)
frame.getRows.asScala.foreach(r => metricsForRow(r, metrics))
metrics.processFrame(frame)
metrics

try {
Expand All @@ -97,11 +103,6 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
throw InvalidJellyFile(e)
}

private def metricsForRow(
row: RdfStreamRow,
metadata: FrameInfo,
): Unit = metadata.processStreamRow(row)

/** Checks whether the first frame in the stream contains options and returns them.
* @param headFrame
* The first frame in the stream as an option.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
package eu.neverblink.jelly.cli.command.rdf.util

import com.google.protobuf.ByteString
import com.google.protobuf.{ByteString, CodedOutputStream}
import scala.jdk.CollectionConverters.*
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder.*
import eu.neverblink.jelly.core.proto.v1.*
import eu.neverblink.protoc.java.runtime.ProtoMessage

import java.io.OutputStream
import scala.language.postfixOps

object FrameInfo:
trait StatisticCollector:
def measure(r: ProtoMessage[?]): Long
def measure(r: String): Long // Needed as bnodes are plain strings
def name(): String

case object CountStatistic extends StatisticCollector:
override def measure(r: ProtoMessage[?]): Long = 1
override def measure(r: String): Long = 1
override def name(): String = "count"

case object SizeStatistic extends StatisticCollector:
override def measure(r: ProtoMessage[?]): Long = r.getSerializedSize
override def measure(r: String): Long = CodedOutputStream.computeStringSizeNoTag(r)
override def name(): String = "size"

/** This class is used to store the metrics for a single frame
*/
class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]):
class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString])(using
statCollector: FrameInfo.StatisticCollector,
):
var frameCount: Long = 1
private object count:
private object stat:
var frame: Long = 0
var row: Long = 0
var option: Long = 0
var name: Long = 0
var namespace: Long = 0
Expand All @@ -25,101 +47,123 @@ class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]):

def +=(other: FrameInfo): FrameInfo = {
this.frameCount += 1
this.count.option += other.count.option
this.count.name += other.count.name
this.count.namespace += other.count.namespace
this.count.triple += other.count.triple
this.count.quad += other.count.quad
this.count.prefix += other.count.prefix
this.count.datatype += other.count.datatype
this.count.graphStart += other.count.graphStart
this.count.graphEnd += other.count.graphEnd
this.stat.frame += other.stat.frame
this.stat.row += other.stat.row
this.stat.option += other.stat.option
this.stat.name += other.stat.name
this.stat.namespace += other.stat.namespace
this.stat.triple += other.stat.triple
this.stat.quad += other.stat.quad
this.stat.prefix += other.stat.prefix
this.stat.datatype += other.stat.datatype
this.stat.graphStart += other.stat.graphStart
this.stat.graphEnd += other.stat.graphEnd
this
}

def processStreamRow(row: RdfStreamRow): Unit = row.getRow match {
case r: RdfTriple => handleTriple(r)
case r: RdfQuad => handleQuad(r)
case r: RdfNameEntry => handleNameEntry(r)
case r: RdfPrefixEntry => handlePrefixEntry(r)
case r: RdfNamespaceDeclaration => handleNamespaceDeclaration(r)
case r: RdfDatatypeEntry => handleDatatypeEntry(r)
case r: RdfGraphStart => handleGraphStart(r)
case r: RdfGraphEnd => handleGraphEnd(r)
case r: RdfStreamOptions => handleOption(r)
def processFrame(frame: RdfStreamFrame): Unit = {
stat.frame += statCollector.measure(frame)
frame.getRows.asScala.foreach(r => processStreamRow(r))
}

protected def handleTriple(r: RdfTriple): Unit = count.triple += 1
protected def handleQuad(r: RdfQuad): Unit = count.quad += 1
protected def handleNameEntry(r: RdfNameEntry): Unit = count.name += 1
protected def handlePrefixEntry(r: RdfPrefixEntry): Unit = count.prefix += 1
protected def handleNamespaceDeclaration(r: RdfNamespaceDeclaration): Unit = count.namespace += 1
protected def handleDatatypeEntry(r: RdfDatatypeEntry): Unit = count.datatype += 1
protected def handleGraphStart(r: RdfGraphStart): Unit = count.graphStart += 1
protected def handleGraphEnd(r: RdfGraphEnd): Unit = count.graphEnd += 1
protected def handleOption(r: RdfStreamOptions): Unit = count.option += 1

def format(): Seq[(String, Long)] = Seq(
("option_count", count.option),
("triple_count", count.triple),
("quad_count", count.quad),
("graph_start_count", count.graphStart),
("graph_end_count", count.graphEnd),
("namespace_count", count.namespace),
("name_count", count.name),
("prefix_count", count.prefix),
("datatype_count", count.datatype),
)
def processStreamRow(row: RdfStreamRow): Unit = {
stat.row += statCollector.measure(row)
row.getRow match {
case r: RdfTriple => handleTriple(r)
case r: RdfQuad => handleQuad(r)
case r: RdfNameEntry => handleNameEntry(r)
case r: RdfPrefixEntry => handlePrefixEntry(r)
case r: RdfNamespaceDeclaration => handleNamespaceDeclaration(r)
case r: RdfDatatypeEntry => handleDatatypeEntry(r)
case r: RdfGraphStart => handleGraphStart(r)
case r: RdfGraphEnd => handleGraphEnd(r)
case r: RdfStreamOptions => handleOption(r)
}
}

protected def handleTriple(r: RdfTriple): Unit = stat.triple += statCollector.measure(r)
protected def handleQuad(r: RdfQuad): Unit = stat.quad += statCollector.measure(r)
protected def handleNameEntry(r: RdfNameEntry): Unit = stat.name += statCollector.measure(r)
protected def handlePrefixEntry(r: RdfPrefixEntry): Unit = stat.prefix += statCollector.measure(r)
protected def handleNamespaceDeclaration(r: RdfNamespaceDeclaration): Unit =
stat.namespace += statCollector.measure(r)
protected def handleDatatypeEntry(r: RdfDatatypeEntry): Unit =
stat.datatype += statCollector.measure(r)
protected def handleGraphStart(r: RdfGraphStart): Unit =
stat.graphStart += statCollector.measure(r)
protected def handleGraphEnd(r: RdfGraphEnd): Unit = stat.graphEnd += statCollector.measure(r)
protected def handleOption(r: RdfStreamOptions): Unit = stat.option += statCollector.measure(r)

def format(): Seq[(String, Long)] = {
val name = statCollector.name()
Seq(
("frame_" + name, stat.frame),
("row_" + name, stat.row),
("option_" + name, stat.option),
("triple_" + name, stat.triple),
("quad_" + name, stat.quad),
("graph_start_" + name, stat.graphStart),
("graph_end_" + name, stat.graphEnd),
("namespace_" + name, stat.namespace),
("name_" + name, stat.name),
("prefix_" + name, stat.prefix),
("datatype_" + name, stat.datatype),
)
}

end FrameInfo

/** Class containing statistics for each node type. Combines nodes allowed in triple terms (IRI,
* blank node, literal, triple) and graph term in quads (IRI, blank node, literal, default graph).
* For simplicity, this class does not validate these constraints.
*/
class NodeDetailInfo:
private object count:
class NodeDetailInfo(using statCollector: FrameInfo.StatisticCollector):
private object stat:
var iri: Long = 0
var bnode: Long = 0
var literal: Long = 0
var triple: Long = 0
var defaultGraph: Long = 0

def handle(o: Object): Unit = o match {
case r: RdfIri => count.iri += 1
case r: String => count.bnode += 1 // bnodes are strings
case r: RdfLiteral => count.literal += 1
case r: RdfTriple => count.triple += 1
case r: RdfDefaultGraph => count.defaultGraph += 1
case r: RdfIri => stat.iri += statCollector.measure(r)
case r: String => stat.bnode += statCollector.measure(r) // bnodes are strings
case r: RdfLiteral => stat.literal += statCollector.measure(r)
case r: RdfTriple => stat.triple += statCollector.measure(r)
case r: RdfDefaultGraph => stat.defaultGraph += statCollector.measure(r)
}

def format(): Seq[(String, Long)] = Seq(
("iri_count", count.iri),
("bnode_count", count.bnode),
("literal_count", count.literal),
("triple_count", count.triple),
("default_graph_count", count.defaultGraph),
).filter(_._2 > 0)
def format(): Seq[(String, Long)] = {
val name = statCollector.name()
Seq(
("iri_" + name, stat.iri),
("bnode_" + name, stat.bnode),
("literal_" + name, stat.literal),
("triple_" + name, stat.triple),
("default_graph_" + name, stat.defaultGraph),
).filter(_._2 > 0)
}

def +=(other: NodeDetailInfo): NodeDetailInfo = {
this.count.iri += other.count.iri
this.count.bnode += other.count.bnode
this.count.literal += other.count.literal
this.count.triple += other.count.triple
this.count.defaultGraph += other.count.defaultGraph
this.stat.iri += other.stat.iri
this.stat.bnode += other.stat.bnode
this.stat.literal += other.stat.literal
this.stat.triple += other.stat.triple
this.stat.defaultGraph += other.stat.defaultGraph
this
}

def total(): Long = count.iri
+ count.bnode
+ count.literal
+ count.triple
+ count.defaultGraph
def total(): Long = stat.iri
+ stat.bnode
+ stat.literal
+ stat.triple
+ stat.defaultGraph

end NodeDetailInfo

class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString])
extends FrameInfo(frameIndex, metadata):
class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString])(using
statCollector: FrameInfo.StatisticCollector,
) extends FrameInfo(frameIndex, metadata):
private object term:
val subjectInfo = new NodeDetailInfo()
val predicateInfo = new NodeDetailInfo()
Expand Down Expand Up @@ -168,12 +212,15 @@ class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString])
out += term.graphInfo
out.format()

def formatGroupByTerm(): Seq[(String, Long)] = Seq(
"subject_count" -> term.subjectInfo.total(),
"predicate_count" -> term.predicateInfo.total(),
"object_count" -> term.objectInfo.total(),
"graph_count" -> term.graphInfo.total(),
)
def formatGroupByTerm(): Seq[(String, Long)] = {
val name = statCollector.name()
Seq(
"subject_" + name -> term.subjectInfo.total(),
"predicate_" + name -> term.predicateInfo.total(),
"object_" + name -> term.objectInfo.total(),
"graph_" + name -> term.graphInfo.total(),
)
}

end FrameDetailInfo

Expand Down
Loading