Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfInspect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ package eu.neverblink.jelly.cli.command.rdf
import caseapp.{ArgsName, ExtraName, HelpMessage, Recurse}
import caseapp.core.RemainingArgs
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.{FrameInfo, JellyUtil, MetricsPrinter}
import eu.neverblink.jelly.cli.command.rdf.util.{
FrameDetailInfo,
FrameInfo,
JellyUtil,
MetricsPrinter,
}
import eu.neverblink.jelly.core.proto.v1.*

import scala.jdk.CollectionConverters.*
Expand Down Expand Up @@ -34,6 +39,10 @@ case class RdfInspectOptions(
"If true, the statistics are computed and printed separately for each frame in the stream.",
)
perFrame: Boolean = false,
@HelpMessage(
"Control the detailed output, disabled if not set: flat, node, term",
)
detail: Option[String] = None,
) extends HasJellyCommandOptions

object RdfInspect extends JellyCommand[RdfInspectOptions]:
Expand All @@ -47,22 +56,39 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
override def doRun(options: RdfInspectOptions, remainingArgs: RemainingArgs): Unit =
val (inputStream, outputStream) =
this.getIoStreamsFromOptions(remainingArgs.remaining.headOption, options.outputFile)
val (streamOpts, frameIterator) = inspectJelly(inputStream)
if options.perFrame then MetricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream)
else MetricsPrinter.printAggregate(streamOpts, frameIterator, outputStream)
val formatter = options.detail match {
case Some(value) if value == "flat" => MetricsPrinter.flatFormatter
case Some(value) if value == "node" => MetricsPrinter.nodeGroupFormatter
case Some(value) if value == "term" => MetricsPrinter.termGroupFormatter
case Some(value) =>
throw InvalidArgument("--detail", value, Some("Must be one of 'flat', 'node', 'term'"))
case None => MetricsPrinter.flatFormatter
}
val (streamOpts, frameIterator) = inspectJelly(inputStream, options.detail.isDefined)
val metricsPrinter = new MetricsPrinter(formatter)
if options.perFrame then metricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream)
else metricsPrinter.printAggregate(streamOpts, frameIterator, outputStream)

private def inspectJelly(
inputStream: InputStream,
detail: Boolean,
): (RdfStreamOptions, Iterator[FrameInfo]) =

inline def computeMetrics(
frame: RdfStreamFrame,
frameIndex: Int,
): FrameInfo =
val metrics = new FrameInfo(
frameIndex,
frame.getMetadata.asScala.map(entry => entry.getKey -> entry.getValue).toMap,
)
val metrics =
if detail then
new FrameDetailInfo(
frameIndex,
frame.getMetadata.asScala.map(entry => entry.getKey -> entry.getValue).toMap,
)
else
FrameInfo(
frameIndex,
frame.getMetadata.asScala.map(entry => entry.getKey -> entry.getValue).toMap,
)
frame.getRows.asScala.foreach(r => metricsForRow(r, metrics))
metrics

Expand All @@ -84,18 +110,7 @@ object RdfInspect extends JellyCommand[RdfInspectOptions]:
private def metricsForRow(
row: RdfStreamRow,
metadata: FrameInfo,
): Unit =
row.getRow match {
case r: RdfTriple => metadata.tripleCount += 1
case r: RdfQuad => metadata.quadCount += 1
case r: RdfNameEntry => metadata.nameCount += 1
case r: RdfPrefixEntry => metadata.prefixCount += 1
case r: RdfNamespaceDeclaration => metadata.namespaceCount += 1
case r: RdfDatatypeEntry => metadata.datatypeCount += 1
case r: RdfGraphStart => metadata.graphStartCount += 1
case r: RdfGraphEnd => metadata.graphEndCount += 1
case r: RdfStreamOptions => metadata.optionCount += 1
}
): Unit = metadata.processStreamRow(row)

/** Checks whether the first frame in the stream contains options and returns them.
* @param headFrame
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package eu.neverblink.jelly.cli.command.rdf.util
import com.google.protobuf.ByteString
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder
import eu.neverblink.jelly.cli.util.io.YamlDocBuilder.*
import eu.neverblink.jelly.core.proto.v1.RdfStreamOptions
import eu.neverblink.jelly.core.proto.v1.*

import java.io.OutputStream
import scala.language.postfixOps

/** This class is used to store the metrics for a single frame
*/
final class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]):
class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString]):
var frameCount: Long = 1
var optionCount: Long = 0
var nameCount: Long = 0
Expand All @@ -36,10 +36,159 @@ final class FrameInfo(val frameIndex: Long, val metadata: Map[String, ByteString
this
}

def processStreamRow(row: RdfStreamRow): Unit = row.getRow match {
case r: RdfTriple => handleTriple(r)
case r: RdfQuad => handleQuad(r)
case r: RdfNameEntry => handleNameEntry(r)
case r: RdfPrefixEntry => handlePrefixEntry(r)
case r: RdfNamespaceDeclaration => handleNamespaceDeclaration(r)
case r: RdfDatatypeEntry => handleDatatypeEntry(r)
case r: RdfGraphStart => handleGraphStart(r)
case r: RdfGraphEnd => handleGraphEnd(r)
case r: RdfStreamOptions => handleOption(r)
}

protected def handleTriple(r: RdfTriple): Unit = tripleCount += 1
protected def handleQuad(r: RdfQuad): Unit = quadCount += 1
protected def handleNameEntry(r: RdfNameEntry): Unit = nameCount += 1
protected def handlePrefixEntry(r: RdfPrefixEntry): Unit = prefixCount += 1
protected def handleNamespaceDeclaration(r: RdfNamespaceDeclaration): Unit = namespaceCount += 1
protected def handleDatatypeEntry(r: RdfDatatypeEntry): Unit = datatypeCount += 1
protected def handleGraphStart(r: RdfGraphStart): Unit = graphStartCount += 1
protected def handleGraphEnd(r: RdfGraphEnd): Unit = graphEndCount += 1
protected def handleOption(r: RdfStreamOptions): Unit = optionCount += 1

def format(): Seq[(String, Long)] = Seq(
("option_count", optionCount),
("triple_count", tripleCount),
("quad_count", quadCount),
("graph_start_count", graphStartCount),
("graph_end_count", graphEndCount),
("namespace_count", namespaceCount),
("name_count", nameCount),
("prefix_count", prefixCount),
("datatype_count", datatypeCount),
)

end FrameInfo

class NodeDetailInfo:
var iriCount: Long = 0
var bNodeCount: Long = 0
var literalCount: Long = 0
var tripleCount: Long = 0
var defaultGraphCount: Long = 0

def handle(o: Object): Unit = o match {
case r: RdfIri => iriCount += 1
case r: String => bNodeCount += 1 // bnodes are strings
case r: RdfLiteral => literalCount += 1
case r: RdfTriple => tripleCount += 1
case r: RdfDefaultGraph => defaultGraphCount += 1
}

def format(): Seq[(String, Long)] = Seq(
("iri_count", iriCount),
("bnode_count", bNodeCount),
("literal_count", literalCount),
("triple_count", tripleCount),
("default_graph_count", defaultGraphCount),
)

def +=(other: NodeDetailInfo): NodeDetailInfo = {
this.iriCount += other.iriCount
this.bNodeCount += other.bNodeCount
this.literalCount += other.literalCount
this.tripleCount += other.tripleCount
this.defaultGraphCount += other.defaultGraphCount
this
}

def total(): Long = iriCount
+ bNodeCount
+ literalCount
+ tripleCount
+ defaultGraphCount

class FrameDetailInfo(frameIndex: Long, metadata: Map[String, ByteString])
extends FrameInfo(frameIndex, metadata):
var subjectInfo = new NodeDetailInfo()
var predicateInfo = new NodeDetailInfo()
var objectInfo = new NodeDetailInfo()
var graphInfo = new NodeDetailInfo()

override def +=(other: FrameInfo): FrameInfo = {
super.+=(other)
(this, other) match {
case (tthis: FrameDetailInfo, oother: FrameDetailInfo) =>
tthis.subjectInfo += oother.subjectInfo
tthis.predicateInfo += oother.predicateInfo
tthis.objectInfo += oother.objectInfo
tthis.graphInfo += oother.graphInfo
case _ =>
}
this
}

override def handleTriple(r: RdfTriple): Unit = {
super.handleTriple(r)
if r.hasSubject then subjectInfo.handle(r.getSubject)
if r.hasPredicate then predicateInfo.handle(r.getPredicate)
if r.hasObject then objectInfo.handle(r.getObject)
}

override def handleQuad(r: RdfQuad): Unit = {
super.handleQuad(r)
if r.hasSubject then subjectInfo.handle(r.getSubject)
if r.hasPredicate then predicateInfo.handle(r.getPredicate)
if r.hasObject then objectInfo.handle(r.getObject)
if r.hasGraph then graphInfo.handle(r.getGraph)
}

def formatFlat(): Seq[(String, Long)] =
subjectInfo.format().map("subject_" ++ _ -> _) ++
predicateInfo.format().map("predicate_" ++ _ -> _) ++
objectInfo.format().map("object_" ++ _ -> _) ++
graphInfo.format().map("graph_" ++ _ -> _)

def formatGroupByNode(): Seq[(String, Long)] =
val out = new NodeDetailInfo()
out += subjectInfo
out += predicateInfo
out += objectInfo
out += graphInfo
out.format()

def formatGroupByTerm(): Seq[(String, Long)] = Seq(
"subject_count" -> subjectInfo.total(),
"predicate_count" -> predicateInfo.total(),
"object_count" -> objectInfo.total(),
"graph_count" -> graphInfo.total(),
)

type Formatter = FrameInfo => Seq[(String, YamlValue)]
type DetailFormatter = FrameDetailInfo => Seq[(String, YamlValue)]

object MetricsPrinter:
private def withFallback(formatter: DetailFormatter): Formatter = {
case frame @ (detailInfo: FrameDetailInfo) =>
frame.format().map(_ -> YamlLong(_)) ++ formatter(detailInfo)
case frame @ (frameInfo: FrameInfo) => frame.format().map(_ -> YamlLong(_))
}

val flatFormatter: Formatter = withFallback(detailInfo => {
detailInfo.formatFlat().map(_ -> YamlLong(_))
})

val termGroupFormatter: Formatter = withFallback(detailInfo => {
Seq("term_details" -> YamlMap(detailInfo.formatGroupByTerm().map(_ -> YamlLong(_))*))
})

val nodeGroupFormatter: Formatter = withFallback(detailInfo => {
Seq("node_details" -> YamlMap(detailInfo.formatGroupByNode().map(_ -> YamlLong(_))*))
})

class MetricsPrinter(val formatter: Formatter):
def printPerFrame(
options: RdfStreamOptions,
iterator: Iterator[FrameInfo],
Expand Down Expand Up @@ -137,17 +286,6 @@ object MetricsPrinter:

private def formatStats(
frame: FrameInfo,
): Seq[(String, YamlValue)] =
Seq(
("option_count", YamlLong(frame.optionCount)),
("triple_count", YamlLong(frame.tripleCount)),
("quad_count", YamlLong(frame.quadCount)),
("graph_start_count", YamlLong(frame.graphStartCount)),
("graph_end_count", YamlLong(frame.graphEndCount)),
("namespace_count", YamlLong(frame.namespaceCount)),
("name_count", YamlLong(frame.nameCount)),
("prefix_count", YamlLong(frame.prefixCount)),
("datatype_count", YamlLong(frame.datatypeCount)),
)
): Seq[(String, YamlValue)] = this.formatter(frame)

end MetricsPrinter
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,97 @@ class RdfInspectSpec extends AnyWordSpec with Matchers with TestFixtureHelper:
val msg = InvalidJellyFile(RuntimeException("")).getMessage
exception.getMessage should include(msg)
}

"print detailed metrics" when {
"aggregating and flat" in withFullJellyFile(
testCode = { j =>
val (out, err) = RdfInspect.runTestCommand(List("rdf", "inspect", "--detail", "flat", j))
val yaml = new Yaml()
val parsed = yaml.load(out).asInstanceOf[java.util.Map[String, Any]]
parsed.get("frames") shouldBe a[util.LinkedHashMap[?, ?]]
val frames = parsed.get("frames").asInstanceOf[java.util.LinkedHashMap[String, Any]]
frames.get("subject_iri_count") shouldBe testCardinality
frames.get("subject_bnode_count") shouldBe 0
},
frameSize = 15,
)

"per frame and flat" in withFullJellyFile(
testCode = { j =>
val (out, err) =
RdfInspect.runTestCommand(List("rdf", "inspect", "--per-frame", "--detail", "flat", j))
val yaml = new Yaml()
val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]]
parsed.get("frames") shouldBe a[util.ArrayList[?]]
val frames = parsed.get("frames").asInstanceOf[util.ArrayList[Any]]
frames.get(0) shouldBe a[util.LinkedHashMap[?, ?]]
val frame1 = frames.get(0).asInstanceOf[util.Map[String, Any]]
frame1.get("subject_iri_count") shouldBe 6
frame1.get("subject_bnode_count") shouldBe 0
},
frameSize = 15,
)

"aggregating and grouping by node" in withFullJellyFile(
testCode = { j =>
val (out, err) = RdfInspect.runTestCommand(List("rdf", "inspect", "--detail", "node", j))
val yaml = new Yaml()
val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]]
parsed.get("frames") shouldBe a[util.LinkedHashMap[?, ?]]
val frames = parsed.get("frames").asInstanceOf[util.LinkedHashMap[String, Any]]
frames.get("node_details") shouldBe a[util.LinkedHashMap[?, ?]]
val nodeDetails = frames.get("node_details").asInstanceOf[util.LinkedHashMap[String, Any]]
nodeDetails.get("iri_count") shouldBe testCardinality * 3
nodeDetails.get("bnode_count") shouldBe 0
},
frameSize = 15,
)

"per frame and grouping by node" in withFullJellyFile(
testCode = { j =>
val (out, err) =
RdfInspect.runTestCommand(List("rdf", "inspect", "--per-frame", "--detail", "node", j))
val yaml = new Yaml()
val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]]
parsed.get("frames") shouldBe a[util.ArrayList[?]]
val frames = parsed.get("frames").asInstanceOf[util.ArrayList[Any]]
frames.get(0) shouldBe a[util.LinkedHashMap[?, ?]]
val frame1 = frames.get(0).asInstanceOf[util.Map[String, Any]]
val nodeDetails = frame1.get("node_details").asInstanceOf[util.LinkedHashMap[String, Any]]
nodeDetails.get("iri_count") shouldBe 6 * 3
nodeDetails.get("bnode_count") shouldBe 0
},
frameSize = 15,
)

"aggregating and grouping by term" in withFullJellyFile(
testCode = { j =>
val (out, err) = RdfInspect.runTestCommand(List("rdf", "inspect", "--detail", "term", j))
val yaml = new Yaml()
val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]]
parsed.get("frames") shouldBe a[util.LinkedHashMap[?, ?]]
val frames = parsed.get("frames").asInstanceOf[util.LinkedHashMap[String, Any]]
frames.get("term_details") shouldBe a[util.LinkedHashMap[?, ?]]
val nodeDetails = frames.get("term_details").asInstanceOf[util.LinkedHashMap[String, Any]]
nodeDetails.get("subject_count") shouldBe testCardinality
},
frameSize = 15,
)

"per frame and grouping by term" in withFullJellyFile(
testCode = { j =>
val (out, err) =
RdfInspect.runTestCommand(List("rdf", "inspect", "--per-frame", "--detail", "term", j))
val yaml = new Yaml()
val parsed = yaml.load(out).asInstanceOf[util.Map[String, Any]]
parsed.get("frames") shouldBe a[util.ArrayList[?]]
val frames = parsed.get("frames").asInstanceOf[util.ArrayList[Any]]
frames.get(0) shouldBe a[util.LinkedHashMap[?, ?]]
val frame1 = frames.get(0).asInstanceOf[util.Map[String, Any]]
val nodeDetails = frame1.get("term_details").asInstanceOf[util.LinkedHashMap[String, Any]]
nodeDetails.get("subject_count") shouldBe 6
},
frameSize = 15,
)
}
}