Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package eu.neverblink.jelly.cli.command.rdf
import caseapp.{ArgsName, ExtraName, HelpMessage, Recurse}
import caseapp.core.RemainingArgs
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.{FrameInfo, JellyUtil, MetricsPrinter}
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.core.proto.v1.*

import scala.jdk.CollectionConverters.*
Expand Down Expand Up @@ -34,6 +34,12 @@ case class RdfInspectOptions(
"If true, the statistics are computed and printed separately for each frame in the stream.",
)
perFrame: Boolean = false,
@HelpMessage(
"Control the detailed output. One of 'node', 'term', 'all'. " +
"Groups output by node type ('node'), subject/predicate/object " +
"term position ('term'), or doesn't aggregate ('all').",
)
detail: Option[String] = None,
) extends HasJellyCommandOptions

object RdfInspect extends JellyCommand[RdfInspectOptions]:
Expand All @@ -47,22 +53,32 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
override def doRun(options: RdfInspectOptions, remainingArgs: RemainingArgs): Unit =
val (inputStream, outputStream) =
this.getIoStreamsFromOptions(remainingArgs.remaining.headOption, options.outputFile)
val (streamOpts, frameIterator) = inspectJelly(inputStream)
if options.perFrame then MetricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream)
else MetricsPrinter.printAggregate(streamOpts, frameIterator, outputStream)
val formatter = options.detail match {
case Some("all") => MetricsPrinter.allFormatter
case Some("node") => MetricsPrinter.nodeGroupFormatter
case Some("term") => MetricsPrinter.termGroupFormatter
case Some(value) =>
throw InvalidArgument("--detail", value, Some("Must be one of 'all', 'node', 'term'"))
case None => MetricsPrinter.allFormatter
}
val (streamOpts, frameIterator) = inspectJelly(inputStream, options.detail.isDefined)
val metricsPrinter = new MetricsPrinter(formatter)
if options.perFrame then metricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream)
else metricsPrinter.printAggregate(streamOpts, frameIterator, outputStream)

private def inspectJelly(
inputStream: InputStream,
detail: Boolean,
): (RdfStreamOptions, Iterator[FrameInfo]) =

inline def computeMetrics(
frame: RdfStreamFrame,
frameIndex: Int,
): FrameInfo =
val metrics = new FrameInfo(
frameIndex,
frame.getMetadata.asScala.map(entry => entry.getKey -> entry.getValue).toMap,
)
val metadata = frame.getMetadata.asScala.map(entry => entry.getKey -> entry.getValue).toMap
val metrics =
if detail then new FrameDetailInfo(frameIndex, metadata)
else FrameInfo(frameIndex, metadata)
frame.getRows.asScala.foreach(r => metricsForRow(r, metrics))
metrics

Expand All @@ -84,18 +100,7 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
private def metricsForRow(
row: RdfStreamRow,
metadata: FrameInfo,
): Unit =
row.getRow match {
case r: RdfTriple => metadata.tripleCount += 1
case r: RdfQuad => metadata.quadCount += 1
case r: RdfNameEntry => metadata.nameCount += 1
case r: RdfPrefixEntry => metadata.prefixCount += 1
case r: RdfNamespaceDeclaration => metadata.namespaceCount += 1
case r: RdfDatatypeEntry => metadata.datatypeCount += 1
case r: RdfGraphStart => metadata.graphStartCount += 1
case r: RdfGraphEnd => metadata.graphEndCount += 1
case r: RdfStreamOptions => metadata.optionCount += 1
}
): Unit = metadata.processStreamRow(row)

/** Checks whether the first frame in the stream contains options and returns them.
* @param headFrame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,211 @@ package eu.neverblink.jelly.cli.command.rdf.util
import com.google.protobuf.ByteString
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder.*
import eu.neverblink.jelly.core.proto.v1.RdfStreamOptions
import eu.neverblink.jelly.core.proto.v1.*

import java.io.OutputStream
import scala.language.postfixOps

/** This class is used to store the metrics for a single frame
*/
final class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]):
class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]):
var frameCount: Long = 1
var optionCount: Long = 0
var nameCount: Long = 0
var namespaceCount: Long = 0
var tripleCount: Long = 0
var quadCount: Long = 0
var prefixCount: Long = 0
var datatypeCount: Long = 0
var graphStartCount: Long = 0
var graphEndCount: Long = 0
private object count:
var option: Long = 0
var name: Long = 0
var namespace: Long = 0
var triple: Long = 0
var quad: Long = 0
var prefix: Long = 0
var datatype: Long = 0
var graphStart: Long = 0
var graphEnd: Long = 0

def +=(other: FrameInfo): FrameInfo = {
this.frameCount += 1
this.optionCount += other.optionCount
this.nameCount += other.nameCount
this.namespaceCount += other.namespaceCount
this.tripleCount += other.tripleCount
this.quadCount += other.quadCount
this.prefixCount += other.prefixCount
this.datatypeCount += other.datatypeCount
this.graphStartCount += other.graphStartCount
this.graphEndCount += other.graphEndCount
this.count.option += other.count.option
this.count.name += other.count.name
this.count.namespace += other.count.namespace
this.count.triple += other.count.triple
this.count.quad += other.count.quad
this.count.prefix += other.count.prefix
this.count.datatype += other.count.datatype
this.count.graphStart += other.count.graphStart
this.count.graphEnd += other.count.graphEnd
this
}

def processStreamRow(row: RdfStreamRow): Unit = row.getRow match {
case r: RdfTriple => handleTriple(r)
case r: RdfQuad => handleQuad(r)
case r: RdfNameEntry => handleNameEntry(r)
case r: RdfPrefixEntry => handlePrefixEntry(r)
case r: RdfNamespaceDeclaration => handleNamespaceDeclaration(r)
case r: RdfDatatypeEntry => handleDatatypeEntry(r)
case r: RdfGraphStart => handleGraphStart(r)
case r: RdfGraphEnd => handleGraphEnd(r)
case r: RdfStreamOptions => handleOption(r)
}

protected def handleTriple(r: RdfTriple): Unit = count.triple += 1
protected def handleQuad(r: RdfQuad): Unit = count.quad += 1
protected def handleNameEntry(r: RdfNameEntry): Unit = count.name += 1
protected def handlePrefixEntry(r: RdfPrefixEntry): Unit = count.prefix += 1
protected def handleNamespaceDeclaration(r: RdfNamespaceDeclaration): Unit = count.namespace += 1
protected def handleDatatypeEntry(r: RdfDatatypeEntry): Unit = count.datatype += 1
protected def handleGraphStart(r: RdfGraphStart): Unit = count.graphStart += 1
protected def handleGraphEnd(r: RdfGraphEnd): Unit = count.graphEnd += 1
protected def handleOption(r: RdfStreamOptions): Unit = count.option += 1

def format(): Seq[(String, Long)] = Seq(
("option_count", count.option),
("triple_count", count.triple),
("quad_count", count.quad),
("graph_start_count", count.graphStart),
("graph_end_count", count.graphEnd),
("namespace_count", count.namespace),
("name_count", count.name),
("prefix_count", count.prefix),
("datatype_count", count.datatype),
)

end FrameInfo

/** Class containing statistics for each node type. Combines nodes allowed in triple terms (IRI,
* blank node, literal, triple) and graph term in quads (IRI, blank node, literal, default graph).
* For simplicity, this class does not validate these constraints.
*/
class NodeDetailInfo:
private object count:
var iri: Long = 0
var bnode: Long = 0
var literal: Long = 0
var triple: Long = 0
var defaultGraph: Long = 0

def handle(o: Object): Unit = o match {
case r: RdfIri => count.iri += 1
case r: String => count.bnode += 1 // bnodes are strings
case r: RdfLiteral => count.literal += 1
case r: RdfTriple => count.triple += 1
case r: RdfDefaultGraph => count.defaultGraph += 1
}

def format(): Seq[(String, Long)] = Seq(
("iri_count", count.iri),
("bnode_count", count.bnode),
("literal_count", count.literal),
("triple_count", count.triple),
("default_graph_count", count.defaultGraph),
).filter(_._2 > 0)

def +=(other: NodeDetailInfo): NodeDetailInfo = {
this.count.iri += other.count.iri
this.count.bnode += other.count.bnode
this.count.literal += other.count.literal
this.count.triple += other.count.triple
this.count.defaultGraph += other.count.defaultGraph
this
}

def total(): Long = count.iri
+ count.bnode
+ count.literal
+ count.triple
+ count.defaultGraph

end NodeDetailInfo

class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString])
extends FrameInfo(frameIndex, metadata):
private object term:
val subjectInfo = new NodeDetailInfo()
val predicateInfo = new NodeDetailInfo()
val objectInfo = new NodeDetailInfo()
val graphInfo = new NodeDetailInfo()

override def +=(other: FrameInfo): FrameInfo = {
super.+=(other)
other match {
case otherDetail: FrameDetailInfo =>
this.term.subjectInfo += otherDetail.term.subjectInfo
this.term.predicateInfo += otherDetail.term.predicateInfo
this.term.objectInfo += otherDetail.term.objectInfo
this.term.graphInfo += otherDetail.term.graphInfo
case _ =>
}
this
}

override def handleTriple(r: RdfTriple): Unit = {
super.handleTriple(r)
if r.hasSubject then term.subjectInfo.handle(r.getSubject)
if r.hasPredicate then term.predicateInfo.handle(r.getPredicate)
if r.hasObject then term.objectInfo.handle(r.getObject)
}

override def handleQuad(r: RdfQuad): Unit = {
super.handleQuad(r)
if r.hasSubject then term.subjectInfo.handle(r.getSubject)
if r.hasPredicate then term.predicateInfo.handle(r.getPredicate)
if r.hasObject then term.objectInfo.handle(r.getObject)
if r.hasGraph then term.graphInfo.handle(r.getGraph)
}

def formatAll(): Seq[(String, Long)] =
term.subjectInfo.format().map("subject_" ++ _ -> _) ++
term.predicateInfo.format().map("predicate_" ++ _ -> _) ++
term.objectInfo.format().map("object_" ++ _ -> _) ++
term.graphInfo.format().map("graph_" ++ _ -> _)

def formatGroupByNode(): Seq[(String, Long)] =
val out = new NodeDetailInfo()
out += term.subjectInfo
out += term.predicateInfo
out += term.objectInfo
out += term.graphInfo
out.format()

def formatGroupByTerm(): Seq[(String, Long)] = Seq(
"subject_count" -> term.subjectInfo.total(),
"predicate_count" -> term.predicateInfo.total(),
"object_count" -> term.objectInfo.total(),
"graph_count" -> term.graphInfo.total(),
)

end FrameDetailInfo

type Formatter = FrameInfo => Seq[(String, YamlValue)]
type DetailFormatter = FrameDetailInfo => Seq[(String, YamlValue)]

object MetricsPrinter:
private def withFallback(formatter: DetailFormatter): Formatter = {
case frame @ (detailInfo: FrameDetailInfo) =>
frame.format().map(_ -> YamlLong(_)) ++ formatter(detailInfo)
case frame @ (frameInfo: FrameInfo) => frame.format().map(_ -> YamlLong(_))
}

val allFormatter: Formatter = withFallback(detailInfo => {
val splitToTriples = detailInfo.formatAll().map((k, v) => {
val split = k.split("_", 2)
(split(0), split(1), v)
})
val groupedByTerm =
splitToTriples.groupMap((term, _, _) => term)((_, node, value) => (node, YamlLong(value)))
val mapOfYamlMaps = groupedByTerm.map(_ -> YamlMap(_*))
val order = IndexedSeq("subject", "predicate", "object", "graph")
mapOfYamlMaps.toSeq.sorted(using Ordering.by[(String, Any), Int](x => order.indexOf(x._1)))
})

val termGroupFormatter: Formatter = withFallback(detailInfo => {
Seq("term_details" -> YamlMap(detailInfo.formatGroupByTerm().map(_ -> YamlLong(_))*))
})

val nodeGroupFormatter: Formatter = withFallback(detailInfo => {
Seq("node_details" -> YamlMap(detailInfo.formatGroupByNode().map(_ -> YamlLong(_))*))
})

class MetricsPrinter(val formatter: Formatter):
def printPerFrame(
options: RdfStreamOptions,
iterator: Iterator[FrameInfo],
Expand Down Expand Up @@ -137,17 +305,6 @@ object MetricsPrinter:

private def formatStats(
frame: FrameInfo,
): Seq[(String, YamlValue)] =
Seq(
("option_count", YamlLong(frame.optionCount)),
("triple_count", YamlLong(frame.tripleCount)),
("quad_count", YamlLong(frame.quadCount)),
("graph_start_count", YamlLong(frame.graphStartCount)),
("graph_end_count", YamlLong(frame.graphEndCount)),
("namespace_count", YamlLong(frame.namespaceCount)),
("name_count", YamlLong(frame.nameCount)),
("prefix_count", YamlLong(frame.prefixCount)),
("datatype_count", YamlLong(frame.datatypeCount)),
)
): Seq[(String, YamlValue)] = this.formatter(frame)

end MetricsPrinter
Binary file added src/test/resources/everythingQuad.jelly
Binary file not shown.
Binary file added src/test/resources/everythingTriple.jelly
Binary file not shown.
Loading