Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJelly.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import caseapp.*
import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.*
import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
import eu.neverblink.jelly.cli.util.jena.riot.JellyStreamWriterGraphs
import eu.ostrzyciel.jelly.convert.jena.riot.{JellyFormatVariant, JellyLanguage}
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import org.apache.jena.riot.system.StreamRDFWriter
import org.apache.jena.riot.{Lang, RDFParser, RIOT}
Expand Down Expand Up @@ -98,23 +99,38 @@ object RdfToJelly extends RdfSerDesCommand[RdfToJellyOptions, RdfFormat.Readable
inputStream: InputStream,
outputStream: OutputStream,
): Unit =
val jellyOpt = getOptions.jellySerializationOptions.asRdfStreamOptions
// Configure the writer
val writerContext = RIOT.getContext.copy()
.set(
JellyLanguage.SYMBOL_STREAM_OPTIONS,
getOptions.jellySerializationOptions.asRdfStreamOptions,
)
.set(JellyLanguage.SYMBOL_FRAME_SIZE, getOptions.rowsPerFrame)
.set(
JellyLanguage.SYMBOL_ENABLE_NAMESPACE_DECLARATIONS,
getOptions.enableNamespaceDeclarations,
)
.set(JellyLanguage.SYMBOL_DELIMITED_OUTPUT, getOptions.delimited)
val jellyWriter = StreamRDFWriter.getWriterStream(
outputStream,
JellyLanguage.JELLY,
writerContext,
)
// Pick the writer implementation based on the requested physical stream type
val jellyWriter =
  if jellyOpt.physicalType.isGraphs then
    // GRAPHS – handled by the CLI's own StreamRDF implementation
    val variant = JellyFormatVariant(
      opt = jellyOpt,
      frameSize = getOptions.rowsPerFrame,
      enableNamespaceDeclarations = getOptions.enableNamespaceDeclarations,
      delimited = getOptions.delimited,
    )
    JellyStreamWriterGraphs(variant, out = outputStream)
  else
    // TRIPLES or QUADS – use the regular RIOT stream writer, configured through its context
    val ctx = RIOT.getContext.copy()
      .set(JellyLanguage.SYMBOL_STREAM_OPTIONS, jellyOpt)
      .set(JellyLanguage.SYMBOL_FRAME_SIZE, getOptions.rowsPerFrame)
      .set(
        JellyLanguage.SYMBOL_ENABLE_NAMESPACE_DECLARATIONS,
        getOptions.enableNamespaceDeclarations,
      )
      .set(JellyLanguage.SYMBOL_DELIMITED_OUTPUT, getOptions.delimited)
    StreamRDFWriter.getWriterStream(outputStream, JellyLanguage.JELLY, ctx)
RDFParser.source(inputStream).lang(jenaLang).parse(jellyWriter)

/** Convert Jelly text to Jelly binary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package eu.neverblink.jelly.cli.command.rdf.util

import caseapp.*
import eu.neverblink.jelly.cli.InvalidArgument
import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamOptions}
import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, PhysicalStreamType, RdfStreamOptions}
import eu.ostrzyciel.jelly.core.{JellyOptions, LogicalStreamTypeFactory}

/** Options for serializing in Jelly-RDF */
Expand All @@ -27,6 +27,11 @@ case class RdfJellySerializationOptions(
"Maximum size of the datatype lookup table. Default: " + JellyOptions.bigStrict.maxDatatypeTableSize,
)
`opt.maxDatatypeTableSize`: Int = JellyOptions.bigStrict.maxDatatypeTableSize,
@HelpMessage(
"Physical stream type. One of: TRIPLES, QUADS, GRAPHS. " +
"Default: either TRIPLES or QUADS, depending on the input format.",
)
`opt.physicalType`: Option[String] = None,
@HelpMessage(
"Logical (RDF-STaX-based) stream type. This can be either a name like " +
"`FLAT_QUADS` or a full IRI like `https://w3id.org/stax/ontology#flatQuadStream`. " +
Expand All @@ -52,12 +57,24 @@ case class RdfJellySerializationOptions(
`opt.logicalType`.get,
Some("Logical type must be either a full RDF-STaX IRI or a name like `FLAT_QUADS`"),
)
// Parse the requested physical stream type, ignoring case and surrounding whitespace.
// Locale.ROOT keeps the upper-casing independent of the JVM's default locale: with e.g. a
// Turkish default locale, "triples".toUpperCase yields "TRİPLES", which would be rejected.
// When the option is absent, the type is left UNSPECIFIED.
val physicalType = `opt.physicalType`.map(_.trim.toUpperCase(java.util.Locale.ROOT)) match
  case Some("TRIPLES") => PhysicalStreamType.TRIPLES
  case Some("QUADS") => PhysicalStreamType.QUADS
  case Some("GRAPHS") => PhysicalStreamType.GRAPHS
  case Some(x) =>
    throw InvalidArgument(
      "--opt.physical-type",
      x,
      Some("Physical type must be one of: TRIPLES, QUADS, GRAPHS"),
    )
  case None => PhysicalStreamType.UNSPECIFIED
RdfStreamOptions(
streamName = `opt.streamName`,
generalizedStatements = `opt.generalizedStatements`,
rdfStar = `opt.rdfStar`,
maxNameTableSize = `opt.maxNameTableSize`,
maxPrefixTableSize = `opt.maxPrefixTableSize`,
maxDatatypeTableSize = `opt.maxDatatypeTableSize`,
physicalType = physicalType,
logicalType = logicalType.getOrElse(LogicalStreamType.UNSPECIFIED),
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package eu.neverblink.jelly.cli.util.jena.riot

import eu.ostrzyciel.jelly.convert.jena.JenaConverterFactory
import eu.ostrzyciel.jelly.convert.jena.riot.JellyFormatVariant
import eu.ostrzyciel.jelly.core.ProtoEncoder
import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamRow}
import org.apache.jena.graph.{Node, Triple}
import org.apache.jena.riot.system.StreamRDF
import org.apache.jena.sparql.core.Quad

import java.io.OutputStream
import scala.collection.mutable.ListBuffer

/** A stream writer that writes RDF data in Jelly format, using the GRAPHS physical stream type.
*
* JellyStreamWriter in jelly-jena only supports TRIPLES and QUADS physical stream types, so this
* is needed to support the remaining GRAPHS physical stream type.
*
* @param opt
*   format variant; this class reads its stream options (`opt.opt`) and its
*   `enableNamespaceDeclarations`, `delimited` and `frameSize` settings
* @param out
*   output stream that the encoded frames are written to
*/
final class JellyStreamWriterGraphs(opt: JellyFormatVariant, out: OutputStream) extends StreamRDF:
// Rows waiting to be packed into the next RdfStreamFrame. The same buffer is handed to the
// encoder below – presumably the encoder appends the rows it produces here (TODO confirm
// against ProtoEncoder.Params).
private val buffer: ListBuffer[RdfStreamRow] = new ListBuffer[RdfStreamRow]()
// We don't set any options here – it is the responsibility of the caller to set
// a valid stream type (GRAPHS).
private val encoder = JenaConverterFactory.encoder(
ProtoEncoder.Params(
opt.opt,
opt.enableNamespaceDeclarations,
Some(buffer),
),
)
// Graph that is currently open in the encoder; null while no graph has been started
// (initially, and again after finish() has closed the last one).
private var currentGraph: Node = null

// No need to handle this, the encoder will emit the header automatically anyway
override def start(): Unit = ()

// Triples carry no graph name, so they are written as part of the default graph.
override def triple(triple: Triple): Unit =
  handleGraph(Quad.defaultGraphIRI)
  encoder.addTripleStatement(triple)
  // Frames are only cut by size in delimited mode
  if opt.delimited then
    if buffer.size >= opt.frameSize then flushBuffer()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic could be moved to a separate method (to make sure both conditions are always correctly checked in the if)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh, that's how it is in the Jena original. Let's keep it as-is.


// Writes the quad's subject/predicate/object inside the quad's graph, opening or switching
// the graph first when necessary.
override def quad(quad: Quad): Unit =
  handleGraph(quad.getGraph)
  encoder.addTripleStatement(quad.getSubject, quad.getPredicate, quad.getObject)
  // Frames are only cut by size in delimited mode
  if opt.delimited then
    if buffer.size >= opt.frameSize then flushBuffer()

// Not supported: base IRI declarations are ignored and nothing is written for them
override def base(base: String): Unit = ()

// Forwards a prefix declaration to the encoder as a namespace declaration, but only when
// namespace declarations are enabled in the format variant.
override def prefix(prefix: String, iri: String): Unit =
if opt.enableNamespaceDeclarations then
encoder.declareNamespace(prefix, iri)
// In delimited mode, write out a frame once the buffer holds at least frameSize rows
if opt.delimited && buffer.size >= opt.frameSize then flushBuffer()

// Makes sure that `graph` is the graph currently open in the encoder. If a different graph
// is open, it is ended first; if no graph is open yet, `graph` is simply started.
private def handleGraph(graph: Node): Unit =
  val sameAsOpenGraph =
    if currentGraph == null then false // nothing has been opened yet
    // Any two nodes recognised by Quad.isDefaultGraph count as the same (default) graph
    else if Quad.isDefaultGraph(currentGraph) then Quad.isDefaultGraph(graph)
    // A named graph stays open only for exactly the same named graph
    else !Quad.isDefaultGraph(graph) && graph == currentGraph
  if !sameAsOpenGraph then
    // Switching default -> named, named -> named or named -> default (or first graph)
    if currentGraph != null then encoder.endGraph()
    encoder.startGraph(graph)
    currentGraph = graph

// Flush the buffer and finish the stream.
//
// Ends the graph that is still open (if any), writes out all buffered rows and flushes the
// underlying output stream. The buffer is emptied in both modes, so calling finish() again
// does not write the same rows a second time.
override def finish(): Unit =
  if currentGraph != null then
    encoder.endGraph()
    currentGraph = null
  if !opt.delimited then
    // Non-delimited variant – whole stream in one frame
    val frame = RdfStreamFrame(rows = buffer.toList)
    frame.writeTo(out)
    // Mirror flushBuffer(): the rows have been written, so drop them
    buffer.clear()
  else if buffer.nonEmpty then flushBuffer()
  out.flush()

// Packs all buffered rows into a single frame, writes it in delimited form and empties
// the buffer.
private def flushBuffer(): Unit =
  RdfStreamFrame(rows = buffer.toList).writeDelimitedTo(out)
  buffer.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eu.neverblink.jelly.cli.command.helpers

import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage
import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame
import org.apache.jena.query.{Dataset, DatasetFactory}
import org.apache.jena.rdf.model.{Model, ModelFactory, ResourceFactory}
import org.apache.jena.riot.{Lang, RDFDataMgr, RDFLanguages}

Expand All @@ -14,14 +15,16 @@ object DataGenHelper:
/** This method generates a triple model with nTriples
* @param nTriples
* number of triples to generate
* @param differentiator
* string to include in iris, to make sure the generated models are different
* @return
* Model
*/
def generateTripleModel(nTriples: Int): Model =
def generateTripleModel(nTriples: Int, differentiator: String = ""): Model =
val model = ModelFactory.createDefaultModel()
val subStr = "http://example.org/subject/index"
val predStr = "http://example.org/predicate/index"
val objStr = "http://example.org/object/index"
val subStr = f"http://example.org/subject/index$differentiator"
val predStr = f"http://example.org/predicate/index$differentiator"
val objStr = f"http://example.org/object/index$differentiator"
val tripleList = (1 to nTriples).map { i =>
val sub = ResourceFactory.createResource(s"$subStr$i")
val pred = ResourceFactory.createProperty(s"$predStr$i")
Expand All @@ -31,6 +34,26 @@ object DataGenHelper:
}
model

/** Generates an RDF dataset consisting of a default graph and nGraphs named graphs, each
 * holding nTriplesPerGraph generated triples. Named graphs are called
 * `http://example.org/graph/<differentiator><index>`, with the index running from 1.
 * @param nGraphs
 *   number of named graphs to generate
 * @param nTriplesPerGraph
 *   number of triples to generate per graph (default graph included)
 * @param differentiator
 *   string included in the default graph's IRIs and in the graph names, to make sure the
 *   generated datasets are different
 * @return
 *   Dataset
 */
def generateDataset(nGraphs: Int, nTriplesPerGraph: Int, differentiator: String): Dataset =
  val defaultModel = generateTripleModel(nTriplesPerGraph, f"/${differentiator}default_")
  val result = DatasetFactory.create(defaultModel)
  (1 to nGraphs).foreach { idx =>
    val graphModel = generateTripleModel(nTriplesPerGraph, f"/${idx}_")
    val name = ResourceFactory.createResource(s"http://example.org/graph/$differentiator$idx")
    result.addNamedModel(name, graphModel)
  }
  result

/** This method generates a Jelly byte array
*
* @param nTriples
Expand Down Expand Up @@ -69,10 +92,16 @@ object DataGenHelper:
/** This method generates a string with nTriples, serialized in the given language (N-Quads by default)
* @param nTriples
* number of triples to generate
* @param jenaLang
* the language to use for the output
* @return
* String
*/
def generateJenaString(nTriples: Int, jenaLang: Lang = RDFLanguages.NQUADS): String =
def generateJenaString(
nTriples: Int,
jenaLang: Lang = RDFLanguages.NQUADS,
dataset: Boolean = false,
): String =
val model = generateTripleModel(nTriples)
val outputStream = new ByteArrayOutputStream()
RDFDataMgr.write(outputStream, model, jenaLang)
Expand All @@ -94,3 +123,28 @@ object DataGenHelper:
RDFDataMgr.write(outputStream, model, jenaLang)
val nQuadStream = new ByteArrayInputStream(outputStream.toByteArray)
nQuadStream

/** Generates an RDF dataset, serializes it in the given language and returns the serialized
 * bytes as an input stream.
 *
 * @param nGraphs
 *   number of named graphs to generate
 * @param nTriples
 *   number of triples to generate per graph
 * @param jenaLang
 *   the language to use for the output
 * @param differentiator
 *   string to include in iris, to make sure the generated datasets are different
 * @return
 *   ByteArrayInputStream
 */
def generateJenaInputStreamDataset(
    nGraphs: Int,
    nTriples: Int,
    jenaLang: Lang = RDFLanguages.NQUADS,
    differentiator: String = "",
): ByteArrayInputStream =
  val generated = generateDataset(nGraphs, nTriples, differentiator)
  val sink = new ByteArrayOutputStream()
  RDFDataMgr.write(sink, generated, jenaLang)
  new ByteArrayInputStream(sink.toByteArray)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import java.io.{ByteArrayInputStream, FileInputStream, InputStream}
import scala.jdk.CollectionConverters.*
import scala.util.Using

class RdfToJellySpec extends AnyWordSpec with TestFixtureHelper with Matchers:
Expand Down Expand Up @@ -97,6 +98,46 @@ class RdfToJellySpec extends AnyWordSpec with TestFixtureHelper with Matchers:
ds.getDefaultGraph.size() should be(4) // 4 triples in the default graph
}

"input stream to output stream, GRAPHS stream type, RDF dataset" in {
  // Input: one dataset with 10 named graphs plus the default graph
  val nquadsIn =
    DataGenHelper.generateJenaInputStreamDataset(10, testCardinality, RDFLanguages.NQUADS)
  RdfToJelly.setStdIn(nquadsIn)
  val (stdout, stderr) = RdfToJelly.runTestCommand(
    List("rdf", "to-jelly", "--in-format=nq", "--opt.physical-type=GRAPHS"),
  )
  // Parse the produced Jelly back and check the graph structure
  val jellyIn = new ByteArrayInputStream(RdfToJelly.getOutBytes)
  val parsed = DatasetGraphFactory.create()
  RDFParser.source(jellyIn).lang(JellyLanguage.JELLY).parse(parsed)
  parsed.size() should be(10) // 10 named graphs
  parsed.getDefaultGraph.size() should be(testCardinality)
  parsed.listGraphNodes().asScala.foreach { name =>
    parsed.getGraph(name).size() should be(testCardinality)
  }
}

"input stream to output stream, GRAPHS stream type, 5 RDF datasets" in {
  // Concatenate the N-Quads serializations of 5 datasets with 4 named graphs each
  val concatenated = (1 to 5).flatMap { i =>
    DataGenHelper.generateJenaInputStreamDataset(
      4,
      testCardinality,
      RDFLanguages.NQUADS,
      f"dataset$i/",
    ).readAllBytes()
  }.toArray
  RdfToJelly.setStdIn(new ByteArrayInputStream(concatenated))
  val (stdout, stderr) = RdfToJelly.runTestCommand(
    List("rdf", "to-jelly", "--in-format=nq", "--opt.physical-type=GRAPHS"),
  )
  // Parse the produced Jelly back and check the graph structure
  val jellyIn = new ByteArrayInputStream(RdfToJelly.getOutBytes)
  val parsed = DatasetGraphFactory.create()
  RDFParser.source(jellyIn).lang(JellyLanguage.JELLY).parse(parsed)
  parsed.size() should be(20)
  parsed.getDefaultGraph.size() should be(testCardinality * 5)
  parsed.listGraphNodes().asScala.foreach { name =>
    parsed.getGraph(name).size() should be(testCardinality)
  }
}

"an input stream to file" in withEmptyJellyFile { j =>
val input = DataGenHelper.generateJenaInputStream(testCardinality)
RdfToJelly.setStdIn(input)
Expand Down Expand Up @@ -458,5 +499,16 @@ class RdfToJellySpec extends AnyWordSpec with TestFixtureHelper with Matchers:
cause.message should include("name table size of 5 ")
e.code should be(1)
}

"unknown physical type specified" in withFullJenaFile { f =>
  // An unrecognised physical type must abort the command with an InvalidArgument error
  val args = List("rdf", "to-jelly", f, "--opt.physical-type=UNKNOWN")
  val exit = intercept[ExitException] {
    RdfToJelly.runTestCommand(args)
  }
  exit.cause.get shouldBe a[InvalidArgument]
  val invalidArg = exit.cause.get.asInstanceOf[InvalidArgument]
  invalidArg.argument should be("--opt.physical-type")
  invalidArg.argumentValue should be("UNKNOWN")
  exit.code should be(1)
}
}
}