diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJelly.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJelly.scala index 398e88d..c4f223c 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJelly.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJelly.scala @@ -4,8 +4,9 @@ import caseapp.* import eu.neverblink.jelly.cli.* import eu.neverblink.jelly.cli.command.rdf.util.* import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.* -import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage -import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame +import eu.neverblink.jelly.cli.util.jena.riot.JellyStreamWriterGraphs +import eu.ostrzyciel.jelly.convert.jena.riot.{JellyFormatVariant, JellyLanguage} +import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamFrame} import org.apache.jena.riot.system.StreamRDFWriter import org.apache.jena.riot.{Lang, RDFParser, RIOT} @@ -98,23 +99,43 @@ object RdfToJelly extends RdfSerDesCommand[RdfToJellyOptions, RdfFormat.Readable inputStream: InputStream, outputStream: OutputStream, ): Unit = + val jellyOpt = getOptions.jellySerializationOptions.asRdfStreamOptions // Configure the writer - val writerContext = RIOT.getContext.copy() - .set( - JellyLanguage.SYMBOL_STREAM_OPTIONS, - getOptions.jellySerializationOptions.asRdfStreamOptions, - ) - .set(JellyLanguage.SYMBOL_FRAME_SIZE, getOptions.rowsPerFrame) - .set( - JellyLanguage.SYMBOL_ENABLE_NAMESPACE_DECLARATIONS, - getOptions.enableNamespaceDeclarations, - ) - .set(JellyLanguage.SYMBOL_DELIMITED_OUTPUT, getOptions.delimited) - val jellyWriter = StreamRDFWriter.getWriterStream( - outputStream, - JellyLanguage.JELLY, - writerContext, - ) + val jellyWriter = + if jellyOpt.physicalType.isGraphs then + // GRAPHS + JellyStreamWriterGraphs( + JellyFormatVariant( + // By default, set the logical stream type to FLAT_QUADS (this is what JellyStreamWriter + // in jelly-jena does for physical type QUADS). + opt = jellyOpt.withLogicalType( + if jellyOpt.logicalType.isUnspecified then LogicalStreamType.FLAT_QUADS + else jellyOpt.logicalType, + ), + frameSize = getOptions.rowsPerFrame, + enableNamespaceDeclarations = getOptions.enableNamespaceDeclarations, + delimited = getOptions.delimited, + ), + out = outputStream, + ) + else + // TRIPLES or QUADS + val writerContext = RIOT.getContext.copy() + .set( + JellyLanguage.SYMBOL_STREAM_OPTIONS, + jellyOpt, + ) + .set(JellyLanguage.SYMBOL_FRAME_SIZE, getOptions.rowsPerFrame) + .set( + JellyLanguage.SYMBOL_ENABLE_NAMESPACE_DECLARATIONS, + getOptions.enableNamespaceDeclarations, + ) + .set(JellyLanguage.SYMBOL_DELIMITED_OUTPUT, getOptions.delimited) + StreamRDFWriter.getWriterStream( + outputStream, + JellyLanguage.JELLY, + writerContext, + ) RDFParser.source(inputStream).lang(jenaLang).parse(jellyWriter) /** Convert Jelly text to Jelly binary. diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfJellySerializationOptions.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfJellySerializationOptions.scala index 68ca9df..2a10250 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfJellySerializationOptions.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfJellySerializationOptions.scala @@ -2,7 +2,7 @@ package eu.neverblink.jelly.cli.command.rdf.util import caseapp.* import eu.neverblink.jelly.cli.InvalidArgument -import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, RdfStreamOptions} +import eu.ostrzyciel.jelly.core.proto.v1.{LogicalStreamType, PhysicalStreamType, RdfStreamOptions} import eu.ostrzyciel.jelly.core.{JellyOptions, LogicalStreamTypeFactory} /** Options for serializing in Jelly-RDF */ @@ -27,6 +27,11 @@ case class RdfJellySerializationOptions( "Maximum size of the datatype lookup table. Default: " + JellyOptions.bigStrict.maxDatatypeTableSize, ) `opt.maxDatatypeTableSize`: Int = JellyOptions.bigStrict.maxDatatypeTableSize, + @HelpMessage( + "Physical stream type. One of: TRIPLES, QUADS, GRAPHS. " + + "Default: either TRIPLES or QUADS, depending on the input format.", + ) + `opt.physicalType`: Option[String] = None, @HelpMessage( "Logical (RDF-STaX-based) stream type. This can be either a name like " + "`FLAT_QUADS` or a full IRI like `https://w3id.org/stax/ontology#flatQuadStream`. " + @@ -52,6 +57,17 @@ case class RdfJellySerializationOptions( `opt.logicalType`.get, Some("Logical type must be either a full RDF-STaX IRI or a name like `FLAT_QUADS`"), ) + val physicalType = `opt.physicalType`.map(_.trim.toUpperCase) match + case Some("TRIPLES") => PhysicalStreamType.TRIPLES + case Some("QUADS") => PhysicalStreamType.QUADS + case Some("GRAPHS") => PhysicalStreamType.GRAPHS + case Some(x) => + throw InvalidArgument( + "--opt.physical-type", + x, + Some("Physical type must be one of: TRIPLES, QUADS, GRAPHS"), + ) + case None => PhysicalStreamType.UNSPECIFIED RdfStreamOptions( streamName = `opt.streamName`, generalizedStatements = `opt.generalizedStatements`, @@ -59,5 +75,6 @@ case class RdfJellySerializationOptions( maxNameTableSize = `opt.maxNameTableSize`, maxPrefixTableSize = `opt.maxPrefixTableSize`, maxDatatypeTableSize = `opt.maxDatatypeTableSize`, + physicalType = physicalType, logicalType = logicalType.getOrElse(LogicalStreamType.UNSPECIFIED), ) diff --git a/src/main/scala/eu/neverblink/jelly/cli/util/jena/riot/JellyStreamWriterGraphs.scala b/src/main/scala/eu/neverblink/jelly/cli/util/jena/riot/JellyStreamWriterGraphs.scala new file mode 100644 index 0000000..3fa0ce0 --- /dev/null +++ b/src/main/scala/eu/neverblink/jelly/cli/util/jena/riot/JellyStreamWriterGraphs.scala @@ -0,0 +1,89 @@ +package eu.neverblink.jelly.cli.util.jena.riot + +import eu.ostrzyciel.jelly.convert.jena.JenaConverterFactory +import eu.ostrzyciel.jelly.convert.jena.riot.JellyFormatVariant +import eu.ostrzyciel.jelly.core.ProtoEncoder +import eu.ostrzyciel.jelly.core.proto.v1.{RdfStreamFrame, RdfStreamRow} +import org.apache.jena.graph.{Node, Triple} +import org.apache.jena.riot.system.StreamRDF +import org.apache.jena.sparql.core.Quad + +import java.io.OutputStream +import scala.collection.mutable.ListBuffer + +/** A stream writer that writes RDF data in Jelly format, using the GRAPHS physical stream type. + * + * JellyStreamWriter in jelly-jena only supports TRIPLES and QUADS physical stream types, so this + * is needed to support the remaining GRAPHS physical stream type. + */ +final class JellyStreamWriterGraphs(opt: JellyFormatVariant, out: OutputStream) extends StreamRDF: + private val buffer: ListBuffer[RdfStreamRow] = new ListBuffer[RdfStreamRow]() + // We don't set any options here – it is the responsibility of the caller to set + // a valid stream type (GRAPHS). + private val encoder = JenaConverterFactory.encoder( + ProtoEncoder.Params( + opt.opt, + opt.enableNamespaceDeclarations, + Some(buffer), + ), + ) + private var currentGraph: Node = null + + // No need to handle this, the encoder will emit the header automatically anyway + override def start(): Unit = () + + override def triple(triple: Triple): Unit = + handleGraph(Quad.defaultGraphIRI) + encoder.addTripleStatement(triple) + if opt.delimited && buffer.size >= opt.frameSize then flushBuffer() + + override def quad(quad: Quad): Unit = + handleGraph(quad.getGraph) + encoder.addTripleStatement( + quad.getSubject, + quad.getPredicate, + quad.getObject, + ) + if opt.delimited && buffer.size >= opt.frameSize then flushBuffer() + + // Not supported + override def base(base: String): Unit = () + + override def prefix(prefix: String, iri: String): Unit = + if opt.enableNamespaceDeclarations then + encoder.declareNamespace(prefix, iri) + if opt.delimited && buffer.size >= opt.frameSize then flushBuffer() + + private def handleGraph(graph: Node): Unit = + if currentGraph == null then + // First graph in the stream + encoder.startGraph(graph) + currentGraph = graph + else if Quad.isDefaultGraph(currentGraph) then + if !Quad.isDefaultGraph(graph) then + // We are switching default -> named + encoder.endGraph() + encoder.startGraph(graph) + currentGraph = graph + else if Quad.isDefaultGraph(graph) || graph != currentGraph then + // We are switching named -> named or named -> default + encoder.endGraph() + encoder.startGraph(graph) + currentGraph = graph + + // Flush the buffer and finish the stream + override def finish(): Unit = + if currentGraph != null then + encoder.endGraph() + currentGraph = null + if !opt.delimited then + // Non-delimited variant – whole stream in one frame + val frame = RdfStreamFrame(rows = buffer.toList) + frame.writeTo(out) + else if buffer.nonEmpty then flushBuffer() + out.flush() + + private def flushBuffer(): Unit = + val frame = RdfStreamFrame(rows = buffer.toList) + frame.writeDelimitedTo(out) + buffer.clear() diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala b/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala index fe396f6..9bfb281 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala @@ -2,6 +2,7 @@ package eu.neverblink.jelly.cli.command.helpers import eu.ostrzyciel.jelly.convert.jena.riot.JellyLanguage import eu.ostrzyciel.jelly.core.proto.v1.RdfStreamFrame +import org.apache.jena.query.{Dataset, DatasetFactory} import org.apache.jena.rdf.model.{Model, ModelFactory, ResourceFactory} import org.apache.jena.riot.{Lang, RDFDataMgr, RDFLanguages} @@ -14,14 +15,16 @@ object DataGenHelper: /** This method generates a triple model with nTriples * @param nTriples * number of triples to generate + * @param differentiator + * string to include in iris, to make sure the generated models are different * @return * Model */ - def generateTripleModel(nTriples: Int): Model = + def generateTripleModel(nTriples: Int, differentiator: String = ""): Model = val model = ModelFactory.createDefaultModel() - val subStr = "http://example.org/subject/index" - val predStr = "http://example.org/predicate/index" - val objStr = "http://example.org/object/index" + val subStr = f"http://example.org/subject/index$differentiator" + val predStr = f"http://example.org/predicate/index$differentiator" + val objStr = f"http://example.org/object/index$differentiator" val tripleList = (1 to nTriples).map { i => val sub = ResourceFactory.createResource(s"$subStr$i") val pred = ResourceFactory.createProperty(s"$predStr$i") @@ -31,6 +34,26 @@ object DataGenHelper: } model + /** This method generates an RDF dataset with nGraphs and nTriplesPerGraph + * @param nGraphs + * number of named graphs to generate + * @param nTriplesPerGraph + * number of triples per graph to generate + * @param differentiator + * string to include in iris, to make sure the generated datasets are different + * @return + * Dataset + */ + def generateDataset(nGraphs: Int, nTriplesPerGraph: Int, differentiator: String): Dataset = + val dataset = + DatasetFactory.create(generateTripleModel(nTriplesPerGraph, f"/${differentiator}default_")) + for i <- 1 to nGraphs do + val model = generateTripleModel(nTriplesPerGraph, f"/${i}_") + val graphName = + ResourceFactory.createResource(s"http://example.org/graph/$differentiator$i") + dataset.addNamedModel(graphName, model) + dataset + /** This method generates a Jelly byte array * * @param nTriples @@ -69,10 +92,16 @@ object DataGenHelper: /** This method generates a NQuad string with nTriples * @param nTriples * number of triples to generate + * @param jenaLang + * the language to use for the output * @return * String */ - def generateJenaString(nTriples: Int, jenaLang: Lang = RDFLanguages.NQUADS): String = + def generateJenaString( + nTriples: Int, + jenaLang: Lang = RDFLanguages.NQUADS, + dataset: Boolean = false, + ): String = val model = generateTripleModel(nTriples) val outputStream = new ByteArrayOutputStream() RDFDataMgr.write(outputStream, model, jenaLang) @@ -94,3 +123,28 @@ object DataGenHelper: RDFDataMgr.write(outputStream, model, jenaLang) val nQuadStream = new ByteArrayInputStream(outputStream.toByteArray) nQuadStream + + /** This method generates a serialized RDF dataset input stream + * + * @param nGraphs + * number of named graphs to generate + * @param nTriples + * number of triples to generate per graph + * @param jenaLang + * the language to use for the output + * @param differentiator + * string to include in iris, to make sure the generated datasets are different + * @return + * ByteArrayInputStream + */ + def generateJenaInputStreamDataset( + nGraphs: Int, + nTriples: Int, + jenaLang: Lang = RDFLanguages.NQUADS, + differentiator: String = "", + ): ByteArrayInputStream = + val dataset = generateDataset(nGraphs, nTriples, differentiator) + val outputStream = new ByteArrayOutputStream() + RDFDataMgr.write(outputStream, dataset, jenaLang) + val nQuadStream = new ByteArrayInputStream(outputStream.toByteArray) + nQuadStream diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJellySpec.scala b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJellySpec.scala index d1735b8..ebbe451 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJellySpec.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfToJellySpec.scala @@ -13,6 +13,7 @@ import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec import java.io.{ByteArrayInputStream, FileInputStream, InputStream} +import scala.jdk.CollectionConverters.* import scala.util.Using class RdfToJellySpec extends AnyWordSpec with TestFixtureHelper with Matchers: @@ -97,6 +98,64 @@ class RdfToJellySpec extends AnyWordSpec with TestFixtureHelper with Matchers: ds.getDefaultGraph.size() should be(4) // 4 triples in the default graph } + "input stream to output stream, GRAPHS stream type, RDF dataset" in { + val inputStream = DataGenHelper.generateJenaInputStreamDataset( + 10, + testCardinality, + RDFLanguages.NQUADS, + ) + RdfToJelly.setStdIn(inputStream) + val (out, err) = RdfToJelly.runTestCommand( + List("rdf", "to-jelly", "--in-format=nq", "--opt.physical-type=GRAPHS"), + ) + val ds = DatasetGraphFactory.create() + val bytes = RdfToJelly.getOutBytes + RDFParser.source(ByteArrayInputStream(bytes)).lang( + JellyLanguage.JELLY, + ).parse(ds) + ds.size() should be(10) // 10 named graphs + ds.getDefaultGraph.size() should be(testCardinality) + for gn <- ds.listGraphNodes().asScala do ds.getGraph(gn).size() should be(testCardinality) + // Check the logical stream type -- should be the default one + val frame = RdfStreamFrame.parseDelimitedFrom(ByteArrayInputStream(bytes)) + .get + frame.rows.head.row.options.logicalType should be(LogicalStreamType.FLAT_QUADS) + } + + "input stream to output stream, GRAPHS stream type, 5 RDF datasets" in { + val bytes = (1 to 5).map(i => + DataGenHelper.generateJenaInputStreamDataset( + 4, + testCardinality, + RDFLanguages.NQUADS, + f"dataset$i/", + ).readAllBytes(), + ).foldLeft(Array.empty[Byte])(_ ++ _) + val inputStream = new ByteArrayInputStream(bytes) + RdfToJelly.setStdIn(inputStream) + val (out, err) = RdfToJelly.runTestCommand( + List( + "rdf", + "to-jelly", + "--in-format=nq", + "--opt.physical-type=GRAPHS", + "--opt.logical-type=DATASETS", + ), + ) + val ds = DatasetGraphFactory.create() + val outBytes = RdfToJelly.getOutBytes + RDFParser.source(ByteArrayInputStream(outBytes)).lang( + JellyLanguage.JELLY, + ).parse(ds) + ds.size() should be(20) + ds.getDefaultGraph.size() should be(testCardinality * 5) + for gn <- ds.listGraphNodes().asScala do ds.getGraph(gn).size() should be(testCardinality) + // Check the logical stream type -- should be DATASETS + val frame = RdfStreamFrame.parseDelimitedFrom(ByteArrayInputStream(outBytes)) + .get + frame.rows.head.row.options.logicalType should be(LogicalStreamType.DATASETS) + } + "an input stream to file" in withEmptyJellyFile { j => val input = DataGenHelper.generateJenaInputStream(testCardinality) RdfToJelly.setStdIn(input) @@ -458,5 +517,16 @@ class RdfToJellySpec extends AnyWordSpec with TestFixtureHelper with Matchers: cause.message should include("name table size of 5 ") e.code should be(1) } + + "unknown physical type specified" in withFullJenaFile { f => + val e = intercept[ExitException] { + RdfToJelly.runTestCommand(List("rdf", "to-jelly", f, "--opt.physical-type=UNKNOWN")) + } + e.cause.get shouldBe a[InvalidArgument] + val cause = e.cause.get.asInstanceOf[InvalidArgument] + cause.argument should be("--opt.physical-type") + cause.argumentValue should be("UNKNOWN") + e.code should be(1) + } } }