Skip to content

Commit 844a26b

Browse files
GH-93 Allow non streaming output formats in rdf from-jelly (#148)
* Add StreamRdfBatchWriteable as a fallback for non-streaming formats
* Remove test for unwriteable formats
* Rename RdfFormat.Jena.Writeable to RdfFormat.Jena.StreamWriteable
* Add support for combining frames
* Add tests for Dataset handling, frame handling, and frame combining
* Doc corrections
1 parent 0318ee9 commit 844a26b

File tree

5 files changed

+136
-47
lines changed

5 files changed

+136
-47
lines changed

src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,20 @@ import eu.neverblink.jelly.cli.*
55
import eu.neverblink.jelly.cli.command.rdf.util.*
66
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.*
77
import eu.neverblink.jelly.cli.util.args.IndexRange
8+
import eu.neverblink.jelly.cli.util.jena.StreamRdfBatchWriter
89
import eu.neverblink.jelly.convert.jena.JenaConverterFactory
910
import eu.neverblink.jelly.core.JellyOptions
1011
import eu.neverblink.jelly.core.RdfHandler.AnyStatementHandler
1112
import eu.neverblink.jelly.core.proto.v1.RdfStreamFrame
1213
import eu.neverblink.jelly.core.proto.google.v1 as google
1314
import org.apache.jena.graph.{Node, Triple}
14-
import org.apache.jena.riot.Lang
15+
import org.apache.jena.riot.system.StreamRDF
1516
import org.apache.jena.riot.system.StreamRDFWriter
1617
import org.apache.jena.sparql.core.Quad
1718

1819
import java.io.{InputStream, OutputStream}
1920
import scala.jdk.CollectionConverters.*
21+
import eu.neverblink.jelly.cli.util.jena.StreamRdfCombiningBatchWriter
2022

2123
object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]:
2224
override val defaultFormat: RdfFormat = RdfFormat.NQuads
@@ -27,7 +29,10 @@ object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]:
2729
"If no output file is specified, the output is written to stdout.\n" +
2830
"If an error is detected, the program will exit with a non-zero code.\n" +
2931
"Otherwise, the program will exit with code 0.\n" +
30-
"Note: this command works in a streaming manner and scales well to large files",
32+
"Note: this command works in a streaming manner where possible and scales well to\n" +
33+
"large files. Non-streaming formats (e.g. RDF/XML) by default work on a\n" +
34+
"frame-by-frame basis, but they can be combined into one dataset with the\n" +
35+
"--combine option. RDF/XML will only serialize the default model.",
3136
)
3237
@ArgsName("<file-to-convert>")
3338
case class RdfFromJellyOptions(
@@ -47,6 +52,11 @@ case class RdfFromJellyOptions(
4752
IndexRange.helpText,
4853
)
4954
takeFrames: String = "",
55+
@HelpMessage(
56+
"Add to combine the results into one dataset, when using a non-streaming output format. " +
57+
"Ignored otherwise. Take care with input size, as this option will load everything into memory.",
58+
)
59+
combine: Boolean = false,
5060
) extends HasJellyCommandOptions
5161

5262
object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writeable]:
@@ -58,7 +68,7 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
5868
lazy val printUtil: RdfCommandPrintUtil[RdfFormat.Writeable] = RdfFromJellyPrint
5969

6070
val defaultAction: (InputStream, OutputStream) => Unit =
61-
jellyToLang(RdfFormat.NQuads.jenaLang, _, _)
71+
(in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, RdfFormat.NQuads.jenaLang))
6272

6373
private def takeFrames: IndexRange = IndexRange(getOptions.takeFrames, "--take-frames")
6474

@@ -72,9 +82,16 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
7282
override def matchFormatToAction(
7383
format: RdfFormat.Writeable,
7484
): Option[(InputStream, OutputStream) => Unit] =
75-
format match
76-
case j: RdfFormat.Jena.Writeable => Some(jellyToLang(j.jenaLang, _, _))
77-
case RdfFormat.JellyText => Some(jellyBinaryToText)
85+
(format, getOptions.combine) match
86+
case (j: RdfFormat.Jena.StreamWriteable, _) =>
87+
Some((in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, j.jenaLang)))
88+
case (j: RdfFormat.Jena.BatchWriteable, true) =>
89+
Some((in, out) =>
90+
StreamRdfCombiningBatchWriter(out, j.jenaLang).runAndOutput(x => jellyToLang(in, x)),
91+
)
92+
case (j: RdfFormat.Jena.BatchWriteable, false) =>
93+
Some((in, out) => jellyToLang(in, StreamRdfBatchWriter(out, j.jenaLang)))
94+
case (RdfFormat.JellyText, _) => Some(jellyBinaryToText)
7895

7996
/** This method reads the Jelly file, rewrites it to specified format and writes it to some output
8097
* stream
@@ -86,11 +103,9 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
86103
* OutputStream
87104
*/
88105
private def jellyToLang(
89-
jenaLang: Lang,
90106
inputStream: InputStream,
91-
outputStream: OutputStream,
107+
writer: StreamRDF,
92108
): Unit =
93-
val writer = StreamRDFWriter.getWriterStream(outputStream, jenaLang)
94109
// Whether the output is active at this moment
95110
var outputEnabled = false
96111
val handler = new AnyStatementHandler[Node] {

src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,45 +16,46 @@ object RdfFormat:
1616
val jenaLang: Lang
1717

1818
object Jena:
19-
sealed trait Writeable extends Jena, RdfFormat.Writeable
19+
sealed trait StreamWriteable extends Jena, RdfFormat.Writeable
2020
sealed trait Readable extends Jena, RdfFormat.Readable
21+
sealed trait BatchWriteable extends Jena, RdfFormat.Writeable
2122

22-
case object NQuads extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
23+
case object NQuads extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
2324
override val fullName: String = "N-Quads"
2425
override val cliOptions: List[String] = List("nq", "nquads")
2526
override val jenaLang: Lang = RDFLanguages.NQUADS
2627

27-
case object NTriples extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
28+
case object NTriples extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
2829
override val fullName: String = "N-Triples"
2930
override val cliOptions: List[String] = List("nt", "ntriples")
3031
override val jenaLang: Lang = RDFLanguages.NTRIPLES
3132

32-
case object Turtle extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
33+
case object Turtle extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
3334
override val fullName: String = "Turtle"
3435
override val cliOptions: List[String] = List("ttl", "turtle")
3536
override val jenaLang: Lang = RDFLanguages.TURTLE
3637

37-
case object TriG extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
38+
case object TriG extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
3839
override val fullName: String = "TriG"
3940
override val cliOptions: List[String] = List("trig")
4041
override val jenaLang: Lang = RDFLanguages.TRIG
4142

42-
case object RdfProto extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
43+
case object RdfProto extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
4344
override val fullName: String = "RDF Protobuf"
4445
override val cliOptions: List[String] = List("jenaproto", "jena-proto")
4546
override val jenaLang: Lang = RDFLanguages.RDFPROTO
4647

47-
case object Thrift extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
48+
case object Thrift extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
4849
override val fullName: String = "RDF Thrift"
4950
override val cliOptions: List[String] = List("jenathrift", "jena-thrift")
5051
override val jenaLang: Lang = RDFLanguages.RDFTHRIFT
5152

52-
case object RdfXml extends RdfFormat.Jena.Readable:
53+
case object RdfXml extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable:
5354
override val fullName: String = "RDF/XML"
5455
override val cliOptions: List[String] = List("rdfxml", "rdf-xml")
5556
override val jenaLang: Lang = RDFLanguages.RDFXML
5657

57-
case object JsonLd extends RdfFormat.Jena.Readable:
58+
case object JsonLd extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable:
5859
override val fullName: String = "JSON-LD"
5960
override val cliOptions: List[String] = List("jsonld", "json-ld")
6061
override val jenaLang: Lang = RDFLanguages.JSONLD
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package eu.neverblink.jelly.cli.util.jena
2+
3+
import org.apache.jena.riot.system.StreamRDF
4+
import org.apache.jena.sparql.core.Quad
5+
import org.apache.jena.graph.Triple
6+
import org.apache.jena.riot.system.StreamRDFLib
7+
import java.io.OutputStream
8+
import org.apache.jena.riot.Lang
9+
import org.apache.jena.riot.RDFDataMgr
10+
import org.apache.jena.query.DatasetFactory
11+
import org.apache.jena.query.Dataset
12+
13+
/** A StreamRDF implementation that buffers incoming statements in an in-memory Dataset and
  * serializes the buffer to `outputStream` in the given `lang` whenever `finish()` is called.
  *
  * This is meant to be a fallback for non-streaming RDF formats, as it requires all data of a batch
  * to be loaded in memory before anything is written.
  *
  * After each `finish()` the buffered statements are discarded, so a caller that invokes
  * `finish()` once per frame gets one self-contained document per frame instead of documents that
  * repeat the content of all earlier frames.
  *
  * @param outputStream
  *   stream that serialized documents are written to
  * @param lang
  *   Jena language used for serialization
  */
class StreamRdfBatchWriter(val outputStream: OutputStream, val lang: Lang) extends StreamRDF:
  /** Buffer holding the statements received since the last flush. */
  protected val dataset: Dataset = DatasetFactory.create()

  /** StreamRDF view over `dataset`; all incoming events are delegated to it. */
  protected val datasetStream: StreamRDF = StreamRDFLib.dataset(dataset.asDatasetGraph())

  /** No set-up is needed; the buffer exists from construction. */
  override def start(): Unit = ()

  override def triple(triple: Triple): Unit = datasetStream.triple(triple)

  override def quad(quad: Quad): Unit = datasetStream.quad(quad)

  override def prefix(prefix: String, iri: String): Unit = datasetStream.prefix(prefix, iri)

  override def base(base: String): Unit = datasetStream.base(base)

  /** Emits the buffered batch and then empties the buffer, so that the next batch does not
    * re-serialize statements that were already written.
    */
  override def finish(): Unit =
    writeOutput()
    dataset.asDatasetGraph().clear()

  /** Serializes the current buffer to `outputStream` without modifying it.
    *
    * For RDF/XML only the default model is written; every other language receives the whole
    * dataset.
    */
  def writeOutput(): Unit =
    if lang == Lang.RDFXML then RDFDataMgr.write(outputStream, dataset.getDefaultModel, lang)
    else RDFDataMgr.write(outputStream, dataset, lang)

  /** Passes this writer to `runnable` and then calls `writeOutput()` once.
    *
    * On this base class `finish()` already emits, so this helper is only useful for subclasses
    * whose `finish()` does not write (such as StreamRdfCombiningBatchWriter), where it produces a
    * single document containing everything `runnable` pushed into the writer.
    *
    * @param runnable
    *   producer that feeds statements into this writer
    */
  def runAndOutput(runnable: StreamRDF => Unit): Unit =
    runnable(this)
    writeOutput()
33+
34+
/** A StreamRdfBatchWriter whose `finish()` is a no-op.
  *
  * Because finishing writes nothing, statements keep accumulating in the inherited buffer until
  * `writeOutput()` is called explicitly (for example through the inherited `runAndOutput`), which
  * yields one combined document.
  *
  * @param outputStream
  *   stream the combined document is written to
  * @param lang
  *   Jena language used for serialization
  */
class StreamRdfCombiningBatchWriter(outputStream: OutputStream, lang: Lang)
    extends StreamRdfBatchWriter(outputStream, lang):

  /** Intentionally empty: output is deferred until `writeOutput()` is invoked. */
  override def finish(): Unit = ()

src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,14 @@ object DataGenHelper:
148148
RDFDataMgr.write(outputStream, dataset, jenaLang)
149149
val nQuadStream = new ByteArrayInputStream(outputStream.toByteArray)
150150
nQuadStream
151+
152+
/** Builds a dataset with `generateDataset` and returns its Jelly serialization as an in-memory
  * input stream.
  *
  * All three arguments are forwarded unchanged to `generateDataset`.
  *
  * @param nGraphs
  *   presumably the number of graphs to generate (see `generateDataset`)
  * @param nTriplesPerGraph
  *   presumably the number of triples placed in each graph (see `generateDataset`)
  * @param differentiator
  *   forwarded to `generateDataset`; its exact meaning is defined there
  * @return
  *   a stream over the bytes written by `RDFDataMgr.write` with `JellyLanguage.JELLY`
  */
def generateJellyInputStreamDataset(
    nGraphs: Int,
    nTriplesPerGraph: Int,
    differentiator: String,
): ByteArrayInputStream =
  val buffer = new ByteArrayOutputStream()
  RDFDataMgr.write(
    buffer,
    generateDataset(nGraphs, nTriplesPerGraph, differentiator),
    JellyLanguage.JELLY,
  )
  new ByteArrayInputStream(buffer.toByteArray)

src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala

Lines changed: 55 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import eu.neverblink.jelly.cli.command.helpers.*
66
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat
77
import eu.neverblink.jelly.core.proto.v1.{PhysicalStreamType, RdfStreamFrame}
88
import eu.neverblink.jelly.core.{JellyOptions, JellyTranscoderFactory}
9-
import org.apache.jena.riot.RDFLanguages
9+
import org.apache.jena.query.DatasetFactory
1010
import org.scalatest.matchers.should.Matchers
1111
import org.scalatest.wordspec.AnyWordSpec
1212

@@ -15,6 +15,8 @@ import java.nio.file.attribute.PosixFilePermissions
1515
import java.nio.file.{Files, Paths}
1616
import scala.io.Source
1717
import scala.util.Using
18+
import org.apache.jena.riot.RDFDataMgr
19+
import org.apache.jena.rdf.model.ModelFactory
1820

1921
class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper:
2022

@@ -217,6 +219,58 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper:
217219
}
218220
}
219221

222+
"handle conversion of Jelly binary to various formats" when {
223+
for (lang, header) <- Seq(
224+
(RdfFormat.JsonLd, "\\{\n {4}\"@graph\":".r),
225+
(RdfFormat.RdfXml, "<rdf:RDF".r),
226+
)
227+
do
228+
s"input stream to output ${lang.fullName} stream" in {
229+
val input = DataGenHelper.generateJellyInputStream(testCardinality)
230+
RdfFromJelly.setStdIn(input)
231+
val model = DataGenHelper.generateTripleModel(testCardinality)
232+
val (out, err) = RdfFromJelly.runTestCommand(
233+
List("rdf", "from-jelly", "--out-format", lang.cliOptions.head),
234+
)
235+
val newModel = ModelFactory.createDefaultModel()
236+
RDFDataMgr.read(newModel, new ByteArrayInputStream(out.getBytes()), lang.jenaLang)
237+
model.isIsomorphicWith(newModel) shouldBe true
238+
}
239+
240+
s"dataset input stream to output ${lang.fullName} stream" in {
241+
val input = DataGenHelper.generateJellyInputStreamDataset(2, testCardinality, "")
242+
RdfFromJelly.setStdIn(input)
243+
val dataset = DataGenHelper.generateDataset(2, testCardinality, "")
244+
val (out, err) = RdfFromJelly.runTestCommand(
245+
List("rdf", "from-jelly", "--out-format", lang.cliOptions.head),
246+
)
247+
val newDataset = DatasetFactory.create()
248+
RDFDataMgr.read(newDataset, new ByteArrayInputStream(out.getBytes()), lang.jenaLang)
249+
newDataset.isEmpty shouldBe false
250+
dataset.getDefaultModel.isIsomorphicWith(newDataset.getDefaultModel) shouldBe true
251+
if lang != RdfFormat.RdfXml then
252+
dataset.getNamedModel("http://example.org/graph/2").isIsomorphicWith(
253+
newDataset.getNamedModel("http://example.org/graph/2"),
254+
) shouldBe true
255+
}
256+
257+
s"multiple frames input stream to output ${lang.fullName} stream without --combine flag" in {
258+
RdfFromJelly.setStdIn(ByteArrayInputStream(input10Frames))
259+
val (out, err) = RdfFromJelly.runTestCommand(
260+
List("rdf", "from-jelly", "--out-format", lang.cliOptions.head),
261+
)
262+
header.findAllIn(out).length shouldBe 10
263+
}
264+
265+
s"multiple frames input stream to output ${lang.fullName} stream with --combine flag" in {
266+
RdfFromJelly.setStdIn(ByteArrayInputStream(input10Frames))
267+
val (out, err) = RdfFromJelly.runTestCommand(
268+
List("rdf", "from-jelly", "--combine", "--out-format", lang.cliOptions.head),
269+
)
270+
header.findAllIn(out).length shouldBe 1
271+
}
272+
}
273+
220274
"throw proper exception" when {
221275
"input file is not found" in {
222276
val nonExist = "non-existing-file"
@@ -339,34 +393,6 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper:
339393
}
340394
}
341395

342-
"readable but not writable format supplied" in withFullJellyFile { j =>
343-
withEmptyJenaFile(
344-
testCode = { q =>
345-
val exception =
346-
intercept[ExitException] {
347-
RdfFromJelly.runTestCommand(
348-
List(
349-
"rdf",
350-
"from-jelly",
351-
j,
352-
"--to",
353-
q,
354-
"--out-format",
355-
RdfFormat.RdfXml.cliOptions.head,
356-
),
357-
)
358-
}
359-
val msg = InvalidFormatSpecified(
360-
RdfFormat.RdfXml.cliOptions.head,
361-
RdfFromJellyPrint.validFormatsString,
362-
)
363-
RdfFromJelly.getErrString should include(msg.getMessage)
364-
exception.code should be(1)
365-
},
366-
jenaLang = RDFLanguages.RDFXML,
367-
)
368-
}
369-
370396
"invalid --take-frames argument provided" in {
371397
val e = intercept[ExitException] {
372398
RdfFromJelly.runTestCommand(

0 commit comments

Comments
 (0)