Skip to content

Commit 35e97a2

Browse files
Add an inspect command that gathers metrics about a Jelly file (#65)
Issue: #39. By default, the command aggregates the metrics over the whole file, but it can also return them per frame with --per-frame. It returns valid YAML as output. It can write the metrics to a file with --to. It accepts an input stream as well as a file. --------- Co-authored-by: Karolina Bogacka <karolina@neverblink.eu>
1 parent 7d168b1 commit 35e97a2

File tree

12 files changed

+425
-32
lines changed

12 files changed

+425
-32
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ lazy val root = (project in file("."))
3838
"eu.ostrzyciel.jelly" %% "jelly-jena" % jellyV,
3939
"com.github.alexarchambault" %% "case-app" % "2.1.0-M30",
4040
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
41+
"org.yaml" % "snakeyaml" % "2.4" % Test,
4142
),
4243
scalacOptions ++= Seq(
4344
"-Wunused:imports",

src/main/scala/eu/neverblink/jelly/cli/App.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ object App extends CommandsEntryPoint:
2222
Version,
2323
RdfFromJelly,
2424
RdfToJelly,
25+
RdfInspect,
2526
)

src/main/scala/eu/neverblink/jelly/cli/Exceptions.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package eu.neverblink.jelly.cli
22

3-
import com.google.protobuf.InvalidProtocolBufferException
43
import org.apache.jena.riot.RiotException
54

65
/** Contains a set of common jelly-cli exceptions with custom output messages.
@@ -22,7 +21,7 @@ case class JellyTranscodingError(message: String)
2221
extends CriticalException(s"Jelly transcoding error: $message")
2322
case class JenaRiotException(e: RiotException)
2423
extends CriticalException(s"Jena RDF I/O exception: ${e.getMessage}")
25-
case class InvalidJellyFile(e: InvalidProtocolBufferException)
24+
case class InvalidJellyFile(e: Exception)
2625
extends CriticalException(s"Invalid Jelly file: ${e.getMessage}")
2726
case class InvalidFormatSpecified(format: String, validFormats: String)
2827
extends CriticalException(

src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ import caseapp.*
33
import eu.neverblink.jelly.cli.*
44
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.*
55
import eu.neverblink.jelly.cli.command.rdf.RdfFormat.Jena.*
6+
import eu.neverblink.jelly.cli.util.JellyUtil
67
import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
78
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
8-
import eu.ostrzyciel.jelly.core.IoUtils
99
import org.apache.jena.riot.system.StreamRDFWriter
1010
import org.apache.jena.riot.{Lang, RDFParser}
1111

@@ -25,7 +25,7 @@ case class RdfFromJellyOptions(
2525
@ExtraName("out-format") outputFormat: Option[String] = None,
2626
) extends HasJellyCommandOptions
2727

28-
object RdfFromJelly extends RdfCommand[RdfFromJellyOptions, RdfFormat.Writeable]:
28+
object RdfFromJelly extends RdfTranscodeCommand[RdfFromJellyOptions, RdfFormat.Writeable]:
2929

3030
override def names: List[List[String]] = List(
3131
List("rdf", "from-jelly"),
@@ -83,30 +83,10 @@ object RdfFromJelly extends RdfCommand[RdfFromJellyOptions, RdfFormat.Writeable]
8383
outputStream.write(frame.getBytes)
8484

8585
try {
86-
iterateRdfStream(inputStream, outputStream).zipWithIndex.foreach {
86+
JellyUtil.iterateRdfStream(inputStream).zipWithIndex.foreach {
8787
case (maybeFrame, frameIndex) =>
8888
writeFrameToOutput(maybeFrame, frameIndex)
8989
}
9090
} finally {
9191
outputStream.flush()
9292
}
93-
94-
/** This method reads the Jelly file and returns an iterator of RdfStreamFrame
95-
* @param inputStream
96-
* @param outputStream
97-
* @return
98-
*/
99-
private def iterateRdfStream(
100-
inputStream: InputStream,
101-
outputStream: OutputStream,
102-
): Iterator[RdfStreamFrame] =
103-
IoUtils.autodetectDelimiting(inputStream) match
104-
case (false, newIn) =>
105-
// Non-delimited Jelly file
106-
// In this case, we can only read one frame
107-
Iterator(RdfStreamFrame.parseFrom(newIn))
108-
case (true, newIn) =>
109-
// Delimited Jelly file
110-
// In this case, we can read multiple frames
111-
Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
112-
.takeWhile(_.isDefined).map(_.get)
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package eu.neverblink.jelly.cli.command.rdf
2+
3+
import caseapp.{ExtraName, Recurse}
4+
import caseapp.core.RemainingArgs
5+
import eu.neverblink.jelly.cli.util.{FrameInfo, JellyUtil, MetricsPrinter}
6+
import eu.neverblink.jelly.cli.*
7+
import eu.ostrzyciel.jelly.core.proto.v1.*
8+
9+
import java.io.InputStream
10+
11+
/** Command-line options accepted by the `rdf inspect` command.
  *
  * @param common
  *   Nested [[JellyCommandOptions]], pulled in through `@Recurse`.
  * @param outputFile
  *   Optional output target (extra name `to`). It is handed to `getIoStreamsFromOptions` together
  *   with the first remaining argument to obtain the command's I/O streams.
  * @param perFrame
  *   When true (extra name `per-frame`), metrics are printed for each frame separately with
  *   `MetricsPrinter.printPerFrame`; otherwise they are printed with
  *   `MetricsPrinter.printAggregate`.
  */
case class RdfInspectOptions(
12+
@Recurse
13+
common: JellyCommandOptions = JellyCommandOptions(),
14+
@ExtraName("to") outputFile: Option[String] = None,
15+
@ExtraName("per-frame") perFrame: Boolean = false,
16+
) extends HasJellyCommandOptions
17+
18+
/** The `rdf inspect` command: reads a Jelly stream, counts the rows of each type in every frame
  * and prints the stream options together with the collected metrics.
  */
object RdfInspect extends JellyCommand[RdfInspectOptions]:

  override def names: List[List[String]] = List(
    List("rdf", "inspect"),
  )

  override final def group = "rdf"

  /** Resolves the I/O streams, gathers the metrics and prints them either per frame or
    * aggregated, depending on `options.perFrame`.
    */
  override def doRun(options: RdfInspectOptions, remainingArgs: RemainingArgs): Unit =
    val (inputStream, outputStream) =
      this.getIoStreamsFromOptions(remainingArgs.remaining.headOption, options.outputFile)
    val (streamOpts, frameIterator) = inspectJelly(inputStream)
    if options.perFrame then MetricsPrinter.printPerFrame(streamOpts, frameIterator, outputStream)
    else MetricsPrinter.printAggregate(streamOpts, frameIterator, outputStream)

  /** Reads the stream options from the first frame and prepares the per-frame metrics.
    *
    * @param inputStream
    *   The stream to read the Jelly data from.
    * @return
    *   The stream options and an iterator of per-frame metrics. The iterator is lazy: frames are
    *   parsed and counted only while it is being consumed.
    * @throws InvalidJellyFile
    *   If reading the first frame or its options fails here, or if reading any later frame fails
    *   while the returned iterator is being consumed.
    */
  private def inspectJelly(
      inputStream: InputStream,
  ): (RdfStreamOptions, Iterator[FrameInfo]) =
    inline def computeMetrics(
        frame: RdfStreamFrame,
        frameIndex: Int,
    ): FrameInfo =
      val metrics = new FrameInfo(frameIndex)
      frame.rows.foreach(r => metricsForRow(r, metrics))
      metrics

    try {
      val allRows = JellyUtil.iterateRdfStream(inputStream).buffered
      // Peek at the first frame (without consuming it) to extract the stream options.
      val streamOptions = checkOptions(allRows.headOption)
      // The metrics are computed per frame here and only summed up by the printer if desired.
      val frameIterator = allRows.zipWithIndex.map { case (frame, frameIndex) =>
        computeMetrics(frame, frameIndex)
      }
      // The iterator is evaluated after this method has returned, i.e. outside of this
      // try block, so it has to translate its own failures.
      (streamOptions, reportAsInvalidJelly(frameIterator))
    } catch {
      case e: Exception =>
        throw InvalidJellyFile(e)
    }

  /** Wraps an iterator so that any exception thrown while advancing it is rethrown as
    * [[InvalidJellyFile]].
    */
  private def reportAsInvalidJelly[A](underlying: Iterator[A]): Iterator[A] =
    new Iterator[A] {
      override def hasNext: Boolean =
        try underlying.hasNext
        catch { case e: Exception => throw InvalidJellyFile(e) }

      override def next(): A =
        try underlying.next()
        catch { case e: Exception => throw InvalidJellyFile(e) }
    }

  /** Increments the counter in `metadata` that corresponds to the type of the given row.
    */
  private def metricsForRow(
      row: RdfStreamRow,
      metadata: FrameInfo,
  ): Unit =
    row.row match {
      case _: RdfTriple => metadata.tripleCount += 1
      case _: RdfQuad => metadata.quadCount += 1
      case _: RdfNameEntry => metadata.nameCount += 1
      case _: RdfPrefixEntry => metadata.prefixCount += 1
      case _: RdfNamespaceDeclaration => metadata.namespaceCount += 1
      case _: RdfDatatypeEntry => metadata.datatypeCount += 1
      case _: RdfGraphStart => metadata.graphStartCount += 1
      case _: RdfGraphEnd => metadata.graphEndCount += 1
      case _: RdfStreamOptions => metadata.optionCount += 1
    }

  /** Checks whether the first frame in the stream contains options and returns them.
    * @param headFrame
    *   The first frame in the stream as an option.
    * @return
    *   The options from the first frame.
    * @throws RuntimeException
    *   If the first frame does not contain options or if there are no frames in the stream.
    */
  private def checkOptions(headFrame: Option[RdfStreamFrame]): RdfStreamOptions =
    val firstFrame =
      headFrame.getOrElse(throw new RuntimeException("No frames in the stream."))
    firstFrame.rows.headOption.map(_.row) match {
      case Some(r: RdfStreamOptions) => r
      case Some(_) => throw new RuntimeException("First row of the frame is not an options row.")
      case None => throw new RuntimeException("No rows in the frame.")
    }

src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJelly.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ case class RdfToJellyOptions(
4040
delimited: Boolean = true,
4141
) extends HasJellyCommandOptions
4242

43-
object RdfToJelly extends RdfCommand[RdfToJellyOptions, RdfFormat.Readable]:
43+
object RdfToJelly extends RdfTranscodeCommand[RdfToJellyOptions, RdfFormat.Readable]:
4444

4545
override def names: List[List[String]] = List(
4646
List("rdf", "to-jelly"),

src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfCommand.scala renamed to src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfTranscodeCommand.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import java.io.{InputStream, OutputStream}
1212

1313
/** This abstract class is responsible for the common logic in both RDF parsing commands
1414
*/
15-
abstract class RdfCommand[T <: HasJellyCommandOptions: {Parser, Help}, F <: RdfFormat](using
16-
tt: TypeTest[RdfFormat, F],
15+
abstract class RdfTranscodeCommand[T <: HasJellyCommandOptions: {Parser, Help}, F <: RdfFormat](
16+
using tt: TypeTest[RdfFormat, F],
1717
) extends JellyCommand[T]:
1818

1919
override final def group = "rdf"
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package eu.neverblink.jelly.cli.util
2+
3+
import eu.ostrzyciel.jelly.core.IoUtils
4+
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
5+
6+
import java.io.InputStream
7+
8+
object JellyUtil:
  /** Reads Jelly data from the input stream and returns an iterator over its frames.
    *
    * Whether the data is delimited is detected with `IoUtils.autodetectDelimiting`.
    *
    * @param inputStream
    *   The stream containing the Jelly data, either delimited or non-delimited.
    * @return
    *   An iterator over the parsed frames. For non-delimited data it holds exactly one frame,
    *   which is parsed before this method returns. For delimited data the frames are parsed
    *   lazily, one per step, until parsing yields no further frame.
    */
  def iterateRdfStream(
      inputStream: InputStream,
  ): Iterator[RdfStreamFrame] =
    IoUtils.autodetectDelimiting(inputStream) match
      case (false, newIn) =>
        // Non-delimited Jelly file
        // In this case, we can only read one frame
        Iterator(RdfStreamFrame.parseFrom(newIn))
      case (true, newIn) =>
        // Delimited Jelly file
        // In this case, we can read multiple frames
        Iterator.continually(RdfStreamFrame.parseDelimitedFrom(newIn))
          .takeWhile(_.isDefined).flatten
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package eu.neverblink.jelly.cli.util
2+
3+
import eu.neverblink.jelly.cli.util.YamlDocBuilder.*
4+
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamOptions
5+
6+
import java.io.OutputStream
7+
8+
/** This class is used to store the metrics for a single frame
9+
*/
10+
final class FrameInfo(val frameIndex: Int):
11+
var frameCount: Int = 1
12+
var optionCount: Int = 0
13+
var nameCount: Int = 0
14+
var namespaceCount: Int = 0
15+
var tripleCount: Int = 0
16+
var quadCount: Int = 0
17+
var prefixCount: Int = 0
18+
var datatypeCount: Int = 0
19+
var graphStartCount: Int = 0
20+
var graphEndCount: Int = 0
21+
22+
def +=(other: FrameInfo): FrameInfo = {
23+
this.frameCount += 1
24+
this.optionCount += other.optionCount
25+
this.nameCount += other.nameCount
26+
this.namespaceCount += other.namespaceCount
27+
this.tripleCount += other.tripleCount
28+
this.quadCount += other.quadCount
29+
this.prefixCount += other.prefixCount
30+
this.datatypeCount += other.datatypeCount
31+
this.graphStartCount += other.graphStartCount
32+
this.graphEndCount += other.graphEndCount
33+
this
34+
}
35+
36+
end FrameInfo
37+
38+
/** Prints the stream options and the frame metrics of a Jelly stream as YAML.
  */
object MetricsPrinter:
  import java.nio.charset.StandardCharsets

  /** Writes the text to the output stream encoded as UTF-8, independently of the platform's
    * default charset.
    */
  private def writeUtf8(o: OutputStream, text: String): Unit =
    o.write(text.getBytes(StandardCharsets.UTF_8))

  /** Prints the stream options followed by a `frames` list with one entry per frame.
    *
    * @param options
    *   The stream options to print first.
    * @param iterator
    *   The per-frame metrics; each element is written as soon as it is taken from the iterator.
    * @param o
    *   The stream to write the output to.
    */
  def printPerFrame(
      options: RdfStreamOptions,
      iterator: Iterator[FrameInfo],
      o: OutputStream,
  ): Unit =
    printOptions(options, o)
    // Emit the "frames" key with a blank value first; the list elements follow one by one.
    val builder =
      YamlDocBuilder.build(
        YamlMap(
          "frames" -> YamlBlank(),
        ),
      )
    writeUtf8(o, builder.getString)
    iterator.foreach { frame =>
      val yamlFrame = YamlListElem(formatStatsIndex(frame))
      // Continue at the indentation level the header builder ended on.
      val frameString = YamlDocBuilder.build(yamlFrame, builder.currIndent).getString
      writeUtf8(o, frameString)
      writeUtf8(o, System.lineSeparator())
    }

  /** Prints the stream options followed by the metrics summed over all frames.
    *
    * @param options
    *   The stream options to print first.
    * @param iterator
    *   The per-frame metrics to sum up. It may be empty, in which case all counts are zero.
    * @param o
    *   The stream to write the output to.
    */
  def printAggregate(
      options: RdfStreamOptions,
      iterator: Iterator[FrameInfo],
      o: OutputStream,
  ): Unit = {
    printOptions(options, o)
    // Fold from an accumulator representing zero frames instead of calling reduce,
    // which throws on an empty iterator.
    val zero = new FrameInfo(0)
    zero.frameCount = 0
    val sumCounts = iterator.foldLeft(zero)((acc, frame) => acc += frame)
    val fullString =
      YamlDocBuilder.build(
        YamlMap(
          "frames" -> formatStatsCount(sumCounts),
        ),
      ).getString
    writeUtf8(o, fullString)
  }

  /** Writes the `stream_options` map followed by a line separator.
    */
  private def printOptions(
      streamOptions: RdfStreamOptions,
      o: OutputStream,
  ): Unit =
    val fullString =
      YamlDocBuilder.build(
        YamlMap(
          "stream_options" -> formatOptions(options = streamOptions),
        ),
      ).getString
    writeUtf8(o, fullString)
    writeUtf8(o, System.lineSeparator())

  /** Converts the stream options into a YAML map, one entry per option.
    */
  private def formatOptions(
      options: RdfStreamOptions,
  ): YamlMap =
    YamlMap(
      "stream_name" -> YamlString(options.streamName),
      "physical_type" -> YamlEnum(options.physicalType.toString, options.physicalType.value),
      "generalized_statements" -> YamlBool(options.generalizedStatements),
      "rdf_star" -> YamlBool(options.rdfStar),
      "max_name_table_size" -> YamlInt(options.maxNameTableSize),
      "max_prefix_table_size" -> YamlInt(options.maxPrefixTableSize),
      "max_datatype_table_size" -> YamlInt(options.maxDatatypeTableSize),
      "logical_type" -> YamlEnum(options.logicalType.toString, options.logicalType.value),
      "version" -> YamlInt(options.version),
    )

  /** Formats the counters of one frame, led by its `frame_index`.
    */
  private def formatStatsIndex(
      frame: FrameInfo,
  ): YamlMap =
    YamlMap(Seq(("frame_index", YamlInt(frame.frameIndex))) ++ formatStats(frame)*)

  /** Formats aggregated counters, led by the `frame_count`.
    */
  private def formatStatsCount(
      frame: FrameInfo,
  ): YamlMap =
    YamlMap(Seq(("frame_count", YamlInt(frame.frameCount))) ++ formatStats(frame)*)

  /** Lists the row-type counters shared by the per-frame and the aggregated output.
    */
  private def formatStats(
      frame: FrameInfo,
  ): Seq[(String, YamlValue)] =
    Seq(
      ("option_count", YamlInt(frame.optionCount)),
      ("triple_count", YamlInt(frame.tripleCount)),
      ("quad_count", YamlInt(frame.quadCount)),
      ("graph_start_count", YamlInt(frame.graphStartCount)),
      ("graph_end_count", YamlInt(frame.graphEndCount)),
      ("namespace_count", YamlInt(frame.namespaceCount)),
      ("name_count", YamlInt(frame.nameCount)),
      ("prefix_count", YamlInt(frame.prefixCount)),
      ("datatype_count", YamlInt(frame.datatypeCount)),
    )

end MetricsPrinter

0 commit comments

Comments
 (0)