diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala index 42336a9..9aac84a 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJelly.scala @@ -5,18 +5,20 @@ import eu.neverblink.jelly.cli.* import eu.neverblink.jelly.cli.command.rdf.util.* import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.* import eu.neverblink.jelly.cli.util.args.IndexRange +import eu.neverblink.jelly.cli.util.jena.StreamRdfBatchWriter import eu.neverblink.jelly.convert.jena.JenaConverterFactory import eu.neverblink.jelly.core.JellyOptions import eu.neverblink.jelly.core.RdfHandler.AnyStatementHandler import eu.neverblink.jelly.core.proto.v1.RdfStreamFrame import eu.neverblink.jelly.core.proto.google.v1 as google import org.apache.jena.graph.{Node, Triple} -import org.apache.jena.riot.Lang +import org.apache.jena.riot.system.StreamRDF import org.apache.jena.riot.system.StreamRDFWriter import org.apache.jena.sparql.core.Quad import java.io.{InputStream, OutputStream} import scala.jdk.CollectionConverters.* +import eu.neverblink.jelly.cli.util.jena.StreamRdfCombiningBatchWriter object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]: override val defaultFormat: RdfFormat = RdfFormat.NQuads @@ -27,7 +29,10 @@ object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]: "If no output file is specified, the output is written to stdout.\n" + "If an error is detected, the program will exit with a non-zero code.\n" + "Otherwise, the program will exit with code 0.\n" + - "Note: this command works in a streaming manner and scales well to large files", + "Note: this command works in a streaming manner where possible and scales well to\n" + + "large files. Non-streaming formats (e.g. 
RDF/XML) by default work on a\n" + + "frame-by-frame basis, but they can be combined into one dataset with the\n" + + "--combine option. RDF/XML will only serialize the default model.", ) @ArgsName("") case class RdfFromJellyOptions( @@ -47,6 +52,11 @@ case class RdfFromJellyOptions( IndexRange.helpText, ) takeFrames: String = "", + @HelpMessage( + "Add to combine the results into one dataset, when using a non-streaming output format. " + + "Ignored otherwise. Take care with input size, as this option will load everything into memory.", + ) + combine: Boolean = false, ) extends HasJellyCommandOptions object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writeable]: @@ -58,7 +68,7 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ lazy val printUtil: RdfCommandPrintUtil[RdfFormat.Writeable] = RdfFromJellyPrint val defaultAction: (InputStream, OutputStream) => Unit = - jellyToLang(RdfFormat.NQuads.jenaLang, _, _) + (in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, RdfFormat.NQuads.jenaLang)) private def takeFrames: IndexRange = IndexRange(getOptions.takeFrames, "--take-frames") @@ -72,9 +82,16 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ override def matchFormatToAction( format: RdfFormat.Writeable, ): Option[(InputStream, OutputStream) => Unit] = - format match - case j: RdfFormat.Jena.Writeable => Some(jellyToLang(j.jenaLang, _, _)) - case RdfFormat.JellyText => Some(jellyBinaryToText) + (format, getOptions.combine) match + case (j: RdfFormat.Jena.StreamWriteable, _) => + Some((in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, j.jenaLang))) + case (j: RdfFormat.Jena.BatchWriteable, true) => + Some((in, out) => + StreamRdfCombiningBatchWriter(out, j.jenaLang).runAndOutput(x => jellyToLang(in, x)), + ) + case (j: RdfFormat.Jena.BatchWriteable, false) => + Some((in, out) => jellyToLang(in, StreamRdfBatchWriter(out, j.jenaLang))) + case 
(RdfFormat.JellyText, _) => Some(jellyBinaryToText) /** This method reads the Jelly file, rewrites it to specified format and writes it to some output * stream @@ -86,11 +103,9 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ * OutputStream */ private def jellyToLang( - jenaLang: Lang, inputStream: InputStream, - outputStream: OutputStream, + writer: StreamRDF, ): Unit = - val writer = StreamRDFWriter.getWriterStream(outputStream, jenaLang) // Whether the output is active at this moment var outputEnabled = false val handler = new AnyStatementHandler[Node] { diff --git a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala index 42cb8a2..3a2d7b7 100644 --- a/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala +++ b/src/main/scala/eu/neverblink/jelly/cli/command/rdf/util/RdfFormat.scala @@ -16,45 +16,46 @@ object RdfFormat: val jenaLang: Lang object Jena: - sealed trait Writeable extends Jena, RdfFormat.Writeable + sealed trait StreamWriteable extends Jena, RdfFormat.Writeable sealed trait Readable extends Jena, RdfFormat.Readable + sealed trait BatchWriteable extends Jena, RdfFormat.Writeable - case object NQuads extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object NQuads extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "N-Quads" override val cliOptions: List[String] = List("nq", "nquads") override val jenaLang: Lang = RDFLanguages.NQUADS - case object NTriples extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object NTriples extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "N-Triples" override val cliOptions: List[String] = List("nt", "ntriples") override val jenaLang: Lang = RDFLanguages.NTRIPLES - case object Turtle extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object 
Turtle extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "Turtle" override val cliOptions: List[String] = List("ttl", "turtle") override val jenaLang: Lang = RDFLanguages.TURTLE - case object TriG extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object TriG extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "TriG" override val cliOptions: List[String] = List("trig") override val jenaLang: Lang = RDFLanguages.TRIG - case object RdfProto extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object RdfProto extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "RDF Protobuf" override val cliOptions: List[String] = List("jenaproto", "jena-proto") override val jenaLang: Lang = RDFLanguages.RDFPROTO - case object Thrift extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable: + case object Thrift extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable: override val fullName: String = "RDF Thrift" override val cliOptions: List[String] = List("jenathrift", "jena-thrift") override val jenaLang: Lang = RDFLanguages.RDFTHRIFT - case object RdfXml extends RdfFormat.Jena.Readable: + case object RdfXml extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable: override val fullName: String = "RDF/XML" override val cliOptions: List[String] = List("rdfxml", "rdf-xml") override val jenaLang: Lang = RDFLanguages.RDFXML - case object JsonLd extends RdfFormat.Jena.Readable: + case object JsonLd extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable: override val fullName: String = "JSON-LD" override val cliOptions: List[String] = List("jsonld", "json-ld") override val jenaLang: Lang = RDFLanguages.JSONLD diff --git a/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala new file mode 100644 index 
0000000..95253a9 --- /dev/null +++ b/src/main/scala/eu/neverblink/jelly/cli/util/jena/StreamRdfBatchWriter.scala @@ -0,0 +1,36 @@ +package eu.neverblink.jelly.cli.util.jena + +import org.apache.jena.riot.system.StreamRDF +import org.apache.jena.sparql.core.Quad +import org.apache.jena.graph.Triple +import org.apache.jena.riot.system.StreamRDFLib +import java.io.OutputStream +import org.apache.jena.riot.Lang +import org.apache.jena.riot.RDFDataMgr +import org.apache.jena.query.DatasetFactory +import org.apache.jena.query.Dataset + +/** A StreamRDF implementation that collects everything into an in-memory Dataset. On finish(), + * it writes the data to the outputStream in the given lang (for RDF/XML: default model only). + * This is a fallback for non-streaming RDF formats, as it requires all data to be in memory. + */ +class StreamRdfBatchWriter(val outputStream: OutputStream, val lang: Lang) extends StreamRDF: + protected val dataset: Dataset = DatasetFactory.create() + protected val datasetStream: StreamRDF = StreamRDFLib.dataset(dataset.asDatasetGraph()) + override def quad(quad: Quad): Unit = datasetStream.quad(quad) + override def triple(triple: Triple): Unit = datasetStream.triple(triple) + override def prefix(prefix: String, iri: String): Unit = datasetStream.prefix(prefix, iri) + override def base(base: String): Unit = datasetStream.base(base) + override def finish(): Unit = writeOutput() + override def start(): Unit = () + def writeOutput(): Unit = + if lang == Lang.RDFXML then RDFDataMgr.write(outputStream, dataset.getDefaultModel, lang) + else RDFDataMgr.write(outputStream, dataset, lang) + def runAndOutput(runnable: StreamRDF => Unit): Unit = { + runnable(this) + writeOutput() + } + +class StreamRdfCombiningBatchWriter(outputStream: OutputStream, lang: Lang) + extends StreamRdfBatchWriter(outputStream, lang): + override def finish(): Unit = () diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala 
b/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala index 3933bb7..df96898 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/helpers/DataGenHelper.scala @@ -148,3 +148,14 @@ object DataGenHelper: RDFDataMgr.write(outputStream, dataset, jenaLang) val nQuadStream = new ByteArrayInputStream(outputStream.toByteArray) nQuadStream + + def generateJellyInputStreamDataset( + nGraphs: Int, + nTriplesPerGraph: Int, + differentiator: String, + ): ByteArrayInputStream = + val model = generateDataset(nGraphs, nTriplesPerGraph, differentiator) + val outputStream = new ByteArrayOutputStream() + RDFDataMgr.write(outputStream, model, JellyLanguage.JELLY) + val jellyStream = new ByteArrayInputStream(outputStream.toByteArray) + jellyStream diff --git a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala index 1a97cad..903c1ad 100644 --- a/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala +++ b/src/test/scala/eu/neverblink/jelly/cli/command/rdf/RdfFromJellySpec.scala @@ -6,7 +6,7 @@ import eu.neverblink.jelly.cli.command.helpers.* import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat import eu.neverblink.jelly.core.proto.v1.{PhysicalStreamType, RdfStreamFrame} import eu.neverblink.jelly.core.{JellyOptions, JellyTranscoderFactory} -import org.apache.jena.riot.RDFLanguages +import org.apache.jena.query.DatasetFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -15,6 +15,8 @@ import java.nio.file.attribute.PosixFilePermissions import java.nio.file.{Files, Paths} import scala.io.Source import scala.util.Using +import org.apache.jena.riot.RDFDataMgr +import org.apache.jena.rdf.model.ModelFactory class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: @@ -217,6 +219,58 @@ class 
RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper: } } + "handle conversion of Jelly binary to various formats" when { + for (lang, header) <- Seq( + (RdfFormat.JsonLd, "\\{\n {4}\"@graph\":".r), + (RdfFormat.RdfXml, " - withEmptyJenaFile( - testCode = { q => - val exception = - intercept[ExitException] { - RdfFromJelly.runTestCommand( - List( - "rdf", - "from-jelly", - j, - "--to", - q, - "--out-format", - RdfFormat.RdfXml.cliOptions.head, - ), - ) - } - val msg = InvalidFormatSpecified( - RdfFormat.RdfXml.cliOptions.head, - RdfFromJellyPrint.validFormatsString, - ) - RdfFromJelly.getErrString should include(msg.getMessage) - exception.code should be(1) - }, - jenaLang = RDFLanguages.RDFXML, - ) - } - "invalid --take-frames argument provided" in { val e = intercept[ExitException] { RdfFromJelly.runTestCommand(