Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import eu.neverblink.jelly.cli.*
import eu.neverblink.jelly.cli.command.rdf.util.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat.*
import eu.neverblink.jelly.cli.util.args.IndexRange
import eu.neverblink.jelly.cli.util.jena.StreamRdfBatchWriter
import eu.neverblink.jelly.convert.jena.JenaConverterFactory
import eu.neverblink.jelly.core.JellyOptions
import eu.neverblink.jelly.core.RdfHandler.AnyStatementHandler
import eu.neverblink.jelly.core.proto.v1.RdfStreamFrame
import eu.neverblink.jelly.core.proto.google.v1 as google
import org.apache.jena.graph.{Node, Triple}
import org.apache.jena.riot.Lang
import org.apache.jena.riot.system.StreamRDF
import org.apache.jena.riot.system.StreamRDFWriter
import org.apache.jena.sparql.core.Quad

import java.io.{InputStream, OutputStream}
import scala.jdk.CollectionConverters.*
import eu.neverblink.jelly.cli.util.jena.StreamRdfCombiningBatchWriter

object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]:
override val defaultFormat: RdfFormat = RdfFormat.NQuads
Expand All @@ -27,7 +29,10 @@ object RdfFromJellyPrint extends RdfCommandPrintUtil[RdfFormat.Writeable]:
"If no output file is specified, the output is written to stdout.\n" +
"If an error is detected, the program will exit with a non-zero code.\n" +
"Otherwise, the program will exit with code 0.\n" +
"Note: this command works in a streaming manner and scales well to large files",
"Note: this command works in a streaming manner where possible and scales well to\n" +
"large files. Non-streaming formats (e.g. RDF/XML) by default work on a\n" +
"frame-by-frame basis, but they can be combined into one object with the\n" +
"--combine option.",
)
@ArgsName("<file-to-convert>")
case class RdfFromJellyOptions(
Expand All @@ -47,6 +52,11 @@ case class RdfFromJellyOptions(
IndexRange.helpText,
)
takeFrames: String = "",
@HelpMessage(
"Add to combine the results into one object, when using a non-streaming output format. " +
"Ignored otherwise. Take care with input size, as this option will load everything into memory.",
)
combine: Boolean = false,
) extends HasJellyCommandOptions

object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writeable]:
Expand All @@ -58,7 +68,7 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
lazy val printUtil: RdfCommandPrintUtil[RdfFormat.Writeable] = RdfFromJellyPrint

val defaultAction: (InputStream, OutputStream) => Unit =
jellyToLang(RdfFormat.NQuads.jenaLang, _, _)
(in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, RdfFormat.NQuads.jenaLang))

private def takeFrames: IndexRange = IndexRange(getOptions.takeFrames, "--take-frames")

Expand All @@ -72,9 +82,16 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
override def matchFormatToAction(
format: RdfFormat.Writeable,
): Option[(InputStream, OutputStream) => Unit] =
format match
case j: RdfFormat.Jena.Writeable => Some(jellyToLang(j.jenaLang, _, _))
case RdfFormat.JellyText => Some(jellyBinaryToText)
(format, getOptions.combine) match
case (j: RdfFormat.Jena.StreamWriteable, _) =>
Some((in, out) => jellyToLang(in, StreamRDFWriter.getWriterStream(out, j.jenaLang)))
case (j: RdfFormat.Jena.BatchWriteable, true) =>
Some((in, out) =>
StreamRdfCombiningBatchWriter(out, j.jenaLang).runAndOutput(x => jellyToLang(in, x)),
)
case (j: RdfFormat.Jena.BatchWriteable, false) =>
Some((in, out) => jellyToLang(in, StreamRdfBatchWriter(out, j.jenaLang)))
case (RdfFormat.JellyText, _) => Some(jellyBinaryToText)

/** This method reads the Jelly file, rewrites it to specified format and writes it to some output
* stream
Expand All @@ -86,11 +103,9 @@ object RdfFromJelly extends RdfSerDesCommand[RdfFromJellyOptions, RdfFormat.Writ
* OutputStream
*/
private def jellyToLang(
jenaLang: Lang,
inputStream: InputStream,
outputStream: OutputStream,
writer: StreamRDF,
): Unit =
val writer = StreamRDFWriter.getWriterStream(outputStream, jenaLang)
// Whether the output is active at this moment
var outputEnabled = false
val handler = new AnyStatementHandler[Node] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,46 @@ object RdfFormat:
val jenaLang: Lang

object Jena:
sealed trait Writeable extends Jena, RdfFormat.Writeable
sealed trait StreamWriteable extends Jena, RdfFormat.Writeable
sealed trait Readable extends Jena, RdfFormat.Readable
sealed trait BatchWriteable extends Jena, RdfFormat.Writeable

case object NQuads extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
case object NQuads extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
override val fullName: String = "N-Quads"
override val cliOptions: List[String] = List("nq", "nquads")
override val jenaLang: Lang = RDFLanguages.NQUADS

case object NTriples extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
case object NTriples extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
override val fullName: String = "N-Triples"
override val cliOptions: List[String] = List("nt", "ntriples")
override val jenaLang: Lang = RDFLanguages.NTRIPLES

case object Turtle extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
case object Turtle extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
override val fullName: String = "Turtle"
override val cliOptions: List[String] = List("ttl", "turtle")
override val jenaLang: Lang = RDFLanguages.TURTLE

case object TriG extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
case object TriG extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
override val fullName: String = "TriG"
override val cliOptions: List[String] = List("trig")
override val jenaLang: Lang = RDFLanguages.TRIG

case object RdfProto extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
case object RdfProto extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
override val fullName: String = "RDF Protobuf"
override val cliOptions: List[String] = List("jenaproto", "jena-proto")
override val jenaLang: Lang = RDFLanguages.RDFPROTO

case object Thrift extends RdfFormat.Jena.Writeable, RdfFormat.Jena.Readable:
case object Thrift extends RdfFormat.Jena.StreamWriteable, RdfFormat.Jena.Readable:
override val fullName: String = "RDF Thrift"
override val cliOptions: List[String] = List("jenathrift", "jena-thrift")
override val jenaLang: Lang = RDFLanguages.RDFTHRIFT

case object RdfXml extends RdfFormat.Jena.Readable:
case object RdfXml extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable:
override val fullName: String = "RDF/XML"
override val cliOptions: List[String] = List("rdfxml", "rdf-xml")
override val jenaLang: Lang = RDFLanguages.RDFXML

case object JsonLd extends RdfFormat.Jena.Readable:
case object JsonLd extends RdfFormat.Jena.Readable, RdfFormat.Jena.BatchWriteable:
override val fullName: String = "JSON-LD"
override val cliOptions: List[String] = List("jsonld", "json-ld")
override val jenaLang: Lang = RDFLanguages.JSONLD
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package eu.neverblink.jelly.cli.util.jena

import org.apache.jena.riot.system.StreamRDF
import org.apache.jena.sparql.core.Quad
import org.apache.jena.graph.Triple
import org.apache.jena.riot.system.StreamRDFLib
import java.io.OutputStream
import org.apache.jena.riot.Lang
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.query.DatasetFactory
import org.apache.jena.query.Dataset

/** A StreamRDF implementation that collects everything into a Model. When finishing, formats
* everything according to the lang, and emits it to the outputStream. This is meant to be a
* fallback for non-streaming RDF formats, as it requires all data to be loaded in memory.
*/
class StreamRdfBatchWriter(val outputStream: OutputStream, val lang: Lang) extends StreamRDF:
protected val dataset: Dataset = DatasetFactory.create()
protected val datasetStream: StreamRDF = StreamRDFLib.dataset(dataset.asDatasetGraph())
override def quad(quad: Quad): Unit = datasetStream.quad(quad)
override def triple(triple: Triple): Unit = datasetStream.triple(triple)
override def prefix(prefix: String, iri: String): Unit = datasetStream.prefix(prefix, iri)
override def base(base: String): Unit = datasetStream.base(base)
override def finish(): Unit = writeOutput()
override def start(): Unit = ()
def writeOutput(): Unit =
if lang == Lang.RDFXML then RDFDataMgr.write(outputStream, dataset.getDefaultModel, lang)
else RDFDataMgr.write(outputStream, dataset, lang)
def runAndOutput(runnable: StreamRDF => Unit): Unit = {
runnable(this)
writeOutput()
}

class StreamRdfCombiningBatchWriter(outputStream: OutputStream, lang: Lang)
extends StreamRdfBatchWriter(outputStream, lang):
override def finish(): Unit = ()
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,14 @@ object DataGenHelper:
RDFDataMgr.write(outputStream, dataset, jenaLang)
val nQuadStream = new ByteArrayInputStream(outputStream.toByteArray)
nQuadStream

def generateJellyInputStreamDataset(
nGraphs: Int,
nTriplesPerGraph: Int,
differentiator: String,
): ByteArrayInputStream =
val model = generateDataset(nGraphs, nTriplesPerGraph, differentiator)
val outputStream = new ByteArrayOutputStream()
RDFDataMgr.write(outputStream, model, JellyLanguage.JELLY)
val jellyStream = new ByteArrayInputStream(outputStream.toByteArray)
jellyStream
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import eu.neverblink.jelly.cli.command.helpers.*
import eu.neverblink.jelly.cli.command.rdf.util.RdfFormat
import eu.neverblink.jelly.core.proto.v1.{PhysicalStreamType, RdfStreamFrame}
import eu.neverblink.jelly.core.{JellyOptions, JellyTranscoderFactory}
import org.apache.jena.riot.RDFLanguages
import org.apache.jena.query.DatasetFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

Expand All @@ -15,6 +15,8 @@ import java.nio.file.attribute.PosixFilePermissions
import java.nio.file.{Files, Paths}
import scala.io.Source
import scala.util.Using
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.rdf.model.ModelFactory

class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper:

Expand Down Expand Up @@ -217,6 +219,57 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper:
}
}

"handle conversion of Jelly binary to various formats" when {
for (lang, header) <- Seq(
(RdfFormat.JsonLd, "\\{\n {4}\"@graph\":".r),
(RdfFormat.RdfXml, "<rdf:RDF".r),
)
do
s"input stream to output ${lang.fullName} stream" in {
val input = DataGenHelper.generateJellyInputStream(testCardinality)
RdfFromJelly.setStdIn(input)
val model = DataGenHelper.generateTripleModel(testCardinality)
val (out, err) = RdfFromJelly.runTestCommand(
List("rdf", "from-jelly", "--out-format", lang.cliOptions.head),
)
val newModel = ModelFactory.createDefaultModel()
RDFDataMgr.read(newModel, new ByteArrayInputStream(out.getBytes()), lang.jenaLang)
model.isIsomorphicWith(newModel) shouldBe true
}

s"dataset input stream to output ${lang.fullName} stream" in {
val input = DataGenHelper.generateJellyInputStreamDataset(2, testCardinality, "")
RdfFromJelly.setStdIn(input)
val dataset = DataGenHelper.generateDataset(2, testCardinality, "")
val (out, err) = RdfFromJelly.runTestCommand(
List("rdf", "from-jelly", "--out-format", lang.cliOptions.head),
)
val newDataset = DatasetFactory.create()
RDFDataMgr.read(newDataset, new ByteArrayInputStream(out.getBytes()), lang.jenaLang)
newDataset.isEmpty shouldBe false
dataset.getDefaultModel.isIsomorphicWith(newDataset.getDefaultModel) shouldBe true
dataset.getNamedModel("http://example.org/graph/2").isIsomorphicWith(
newDataset.getNamedModel("http://example.org/graph/2"),
) shouldBe true
}

s"multiple frames input stream to output ${lang.fullName} stream without --combine flag" in {
RdfFromJelly.setStdIn(ByteArrayInputStream(input10Frames))
val (out, err) = RdfFromJelly.runTestCommand(
List("rdf", "from-jelly", "--out-format", lang.cliOptions.head),
)
header.findAllIn(out).length shouldBe 10
}

s"multiple frames input stream to output ${lang.fullName} stream with --combine flag" in {
RdfFromJelly.setStdIn(ByteArrayInputStream(input10Frames))
val (out, err) = RdfFromJelly.runTestCommand(
List("rdf", "from-jelly", "--combine", "--out-format", lang.cliOptions.head),
)
header.findAllIn(out).length shouldBe 1
}
}

"throw proper exception" when {
"input file is not found" in {
val nonExist = "non-existing-file"
Expand Down Expand Up @@ -339,34 +392,6 @@ class RdfFromJellySpec extends AnyWordSpec with Matchers with TestFixtureHelper:
}
}

"readable but not writable format supplied" in withFullJellyFile { j =>
withEmptyJenaFile(
testCode = { q =>
val exception =
intercept[ExitException] {
RdfFromJelly.runTestCommand(
List(
"rdf",
"from-jelly",
j,
"--to",
q,
"--out-format",
RdfFormat.RdfXml.cliOptions.head,
),
)
}
val msg = InvalidFormatSpecified(
RdfFormat.RdfXml.cliOptions.head,
RdfFromJellyPrint.validFormatsString,
)
RdfFromJelly.getErrString should include(msg.getMessage)
exception.code should be(1)
},
jenaLang = RDFLanguages.RDFXML,
)
}

"invalid --take-frames argument provided" in {
val e = intercept[ExitException] {
RdfFromJelly.runTestCommand(
Expand Down
Loading