diff --git a/sip-app/src/main/java/eu/delving/sip/files/ReportWriter.java b/sip-app/src/main/java/eu/delving/sip/files/ReportWriter.java index ec878c1a5..da8bec6c7 100644 --- a/sip-app/src/main/java/eu/delving/sip/files/ReportWriter.java +++ b/sip-app/src/main/java/eu/delving/sip/files/ReportWriter.java @@ -21,6 +21,7 @@ package eu.delving.sip.files; +import eu.delving.groovy.DiscardRecordException; import eu.delving.groovy.MappingException; import eu.delving.groovy.MetadataRecord; import eu.delving.groovy.XmlNodePrinter; @@ -50,6 +51,8 @@ */ public class ReportWriter { + + private final Object lock = new Object(); private File reportFile; private File reportIndexFile; private File reportConclusionFile; @@ -74,12 +77,39 @@ public ReportWriter(File reportFile, File reportIndexFile, File reportConclusion this.out = new OutputStreamWriter(count, "UTF-8"); } - public void invalid(MappingResult mappingResult, Exception e) throws IOException { + public void invalid(MappingResult mappingResult, Throwable e) throws IOException { report(ReportType.INVALID, e.getMessage()); out.write(mappingResult.toXml()); terminate(); } + public boolean recordError(MetadataRecord metadataRecord, MappingResult result, Throwable e) { + if (e == null) { + throw new NullPointerException(); + } + + try { + if (e instanceof DiscardRecordException) { + synchronized (lock) { + discarded(metadataRecord, e.getMessage()); + } + return false; + } else if (e instanceof MappingException) { + synchronized (lock) { + unexpected(metadataRecord, (MappingException) e); + } + } else { + synchronized (lock) { + invalid(result, e); + } + } + } catch (IOException ioe) { + ioe.addSuppressed(e); + throw new RuntimeException(ioe); + } + return true; + } + public void discarded(MetadataRecord inputRecord, String discardMessage) throws IOException { report(ReportType.DISCARDED, discardMessage); out.write("Reason: "); diff --git a/sip-app/src/main/java/eu/delving/sip/model/SipModel.java b/sip-app/src/main/java/eu/delving/sip/model/SipModel.java index 995016501..a3f1b4ab1 100644 --- a/sip-app/src/main/java/eu/delving/sip/model/SipModel.java +++ b/sip-app/src/main/java/eu/delving/sip/model/SipModel.java @@ -441,11 +441,11 @@ private RecordScanner(ScanPredicate scanPredicate, Swing finished) { public void run() { try { if (parser == null) { - parser = new MetadataParser(dataSetModel.getDataSet().openSourceInputStream(), statsModel.getRecordCount()); + parser = new MetadataParser(dataSetModel.getDataSet().openSourceInputStream()); } parser.setNotExhausted(); parser.setProgressListener(progressListener); - for (MetadataRecord metadataRecord = parser.nextRecord(); metadataRecord != null && !metadataRecord.isPoison(); metadataRecord = parser.nextRecord()) { + for (MetadataRecord metadataRecord = parser.nextRecord(); metadataRecord != null; metadataRecord = parser.nextRecord()) { if (scanPredicate == null || scanPredicate.accept(metadataRecord)) { for (ParseListener parseListener : parseListeners) { parseListener.updatedRecord(metadataRecord); diff --git a/sip-app/src/main/java/eu/delving/sip/xml/AnalysisParser.java b/sip-app/src/main/java/eu/delving/sip/xml/AnalysisParser.java index 9cf49a776..9489566c7 100644 --- a/sip-app/src/main/java/eu/delving/sip/xml/AnalysisParser.java +++ b/sip-app/src/main/java/eu/delving/sip/xml/AnalysisParser.java @@ -30,24 +30,24 @@ import eu.delving.sip.files.DataSet; import eu.delving.sip.model.DataSetModel; import eu.delving.stats.Stats; -import org.apache.commons.io.IOUtils; import org.codehaus.stax2.XMLStreamReader2; import javax.xml.namespace.QName; import javax.xml.stream.XMLInputFactory; +import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.XMLEvent; import java.io.InputStream; /** * Analyze xml input and compile statistics. When analysis fails, the .error will be appended to the filename * of the erroneous file. - * */ public class AnalysisParser implements Work.LongTermWork, Work.DataSetWork { public static final int ELEMENT_STEP = 10000; - private Stats stats = new Stats(); - private Listener listener; - private DataSetModel dataSetModel; + + private final Listener listener; + private final DataSetModel dataSetModel; + private final int maxUniqueValueLength; private ProgressListener progressListener; public interface Listener { @@ -60,7 +60,7 @@ public interface Listener { public AnalysisParser(DataSetModel dataSetModel, int maxUniqueValueLength, Listener listener) { this.dataSetModel = dataSetModel; this.listener = listener; - stats.maxUniqueValueLength = maxUniqueValueLength; + this.maxUniqueValueLength = maxUniqueValueLength; } @Override @@ -82,71 +82,26 @@ public void setProgressListener(ProgressListener progressListener) { @Override public void run() { try { - XMLInputFactory xmlif = XMLToolFactory.xmlInputFactory(); - Path path = Path.create(); - InputStream inputStream = null; if (dataSetModel.isEmpty()) return; - try { - switch (dataSetModel.getDataSetState()) { - case SOURCED: - inputStream = dataSetModel.getDataSet().openSourceInputStream(); - stats.freshStats(); - break; - default: - throw new IllegalStateException("Unexpected state: " + dataSetModel.getDataSetState()); - } - stats.name = dataSetModel.getDataSet().getDataSetFacts().get("name"); - XMLStreamReader2 input = (XMLStreamReader2) xmlif.createXMLStreamReader(getClass().getName(), inputStream); - StringBuilder text = new StringBuilder(); - int count = 0; - while (true) { - switch (input.getEventType()) { - case XMLEvent.START_ELEMENT: - if (++count % ELEMENT_STEP == 0) { - if (listener != null) progressListener.setProgress(count); - } - for (int walk = 0; walk < input.getNamespaceCount(); walk++) { - stats.recordNamespace(input.getNamespacePrefix(walk), input.getNamespaceURI(walk)); - } - String chunk = text.toString().trim(); - if (!chunk.isEmpty()) { - stats.recordValue(path, chunk); - } - text.setLength(0); - path = path.child(Tag.element(input.getName())); - if (input.getAttributeCount() > 0) { - for (int walk = 0; walk < input.getAttributeCount(); walk++) { - QName attributeName = input.getAttributeName(walk); - Path withAttr = path.child(Tag.attribute(attributeName)); - stats.recordValue(withAttr, input.getAttributeValue(walk)); - } - } - break; - case XMLEvent.CHARACTERS: - case XMLEvent.CDATA: - text.append(input.getText()); - break; - case XMLEvent.END_ELEMENT: - // todo: stats.recordRecordEnd() - stats.recordValue(path, text.toString().trim()); - text.setLength(0); - path = path.parent(); - break; + + switch (dataSetModel.getDataSetState()) { + case SOURCED: + Stats stats = new Stats(); + stats.maxUniqueValueLength = maxUniqueValueLength; + + try (InputStream in = dataSetModel.getDataSet().openSourceInputStream()) { + updateStats(stats, in, progressListener); + stats.name = dataSetModel.getDataSet().getDataSetFacts().get("name"); + stats.finish(); + listener.success(stats); } - if (!input.hasNext()) break; - input.next(); - } - } - finally { - IOUtils.closeQuietly(inputStream); + break; + default: + throw new IllegalStateException("Unexpected state: " + dataSetModel.getDataSetState()); } - stats.finish(); - listener.success(stats); - } - catch (CancelException e) { + } catch (CancelException e) { listener.failure("Cancellation", e); - } - catch (Exception e) { + } catch (Exception e) { switch (dataSetModel.getDataSetState()) { case SOURCED: dataSetModel.getDataSet().deleteSource(); @@ -157,4 +112,54 @@ public void run() { listener.failure("The imported file contains errors, the file has been deleted", e); } } + + public static Stats updateStats(Stats stats, InputStream inputStream) throws XMLStreamException, CancelException { + return updateStats(stats, inputStream, null); + } + + private static Stats updateStats(Stats stats, InputStream inputStream, ProgressListener listener) throws XMLStreamException, CancelException { + XMLInputFactory xmlif = XMLToolFactory.xmlInputFactory(); + Path path = Path.create(); + XMLStreamReader2 input = (XMLStreamReader2) xmlif.createXMLStreamReader(AnalysisParser.class.getName(), inputStream); + StringBuilder text = new StringBuilder(); + int count = 0; + while (true) { + switch (input.getEventType()) { + case XMLEvent.START_ELEMENT: + if (++count % ELEMENT_STEP == 0) { + if (listener != null) listener.setProgress(count); + } + for (int walk = 0; walk < input.getNamespaceCount(); walk++) { + stats.recordNamespace(input.getNamespacePrefix(walk), input.getNamespaceURI(walk)); + } + String chunk = text.toString().trim(); + if (!chunk.isEmpty()) { + stats.recordValue(path, chunk); + } + text.setLength(0); + path = path.child(Tag.element(input.getName())); + if (input.getAttributeCount() > 0) { + for (int walk = 0; walk < input.getAttributeCount(); walk++) { + QName attributeName = input.getAttributeName(walk); + Path withAttr = path.child(Tag.attribute(attributeName)); + stats.recordValue(withAttr, input.getAttributeValue(walk)); + } + } + break; + case XMLEvent.CHARACTERS: + case XMLEvent.CDATA: + text.append(input.getText()); + break; + case XMLEvent.END_ELEMENT: + // todo: stats.recordRecordEnd() + stats.recordValue(path, text.toString().trim()); + text.setLength(0); + path = path.parent(); + break; + } + if (!input.hasNext()) break; + input.next(); + } + return stats; + } } diff --git a/sip-app/src/main/java/eu/delving/sip/xml/FileProcessor.java b/sip-app/src/main/java/eu/delving/sip/xml/FileProcessor.java index be9d6e81e..e1e34f706 100644 --- a/sip-app/src/main/java/eu/delving/sip/xml/FileProcessor.java +++ b/sip-app/src/main/java/eu/delving/sip/xml/FileProcessor.java @@ -30,14 +30,15 @@ import eu.delving.sip.files.DataSet; import eu.delving.sip.files.ReportWriter; import eu.delving.sip.model.Feedback; -import eu.delving.sip.model.SipModel; import org.w3c.dom.Node; +import org.xml.sax.SAXException; import javax.swing.*; import javax.xml.stream.XMLStreamException; import javax.xml.transform.Source; import javax.xml.transform.dom.DOMSource; import javax.xml.validation.Validator; +import javax.xml.xpath.XPathExpressionException; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -47,14 +48,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; -import static eu.delving.sip.files.Storage.XSD_VALIDATION; - /** * Process an input file, mapping it to output records which are validated and used to gather statistics. * A validation report is produced. Output can be recorded for experts as well. The processing is paused * for user input when invalid records are encountered. - * - * */ public class FileProcessor implements Work.DataSetPrefixWork, Work.LongTermWork { @@ -69,12 +66,12 @@ public class FileProcessor implements Work.DataSetPrefixWork, Work.LongTermWork private ProgressListener progressListener; private final Termination termination = new Termination(); private final Object lock = new Object(); + private NQuadWriter nQuadWriter; private void info(String message) { if (feedback != null) { feedback.info(message); - } - else { + } else { System.out.println(message); } } @@ -93,14 +90,14 @@ public interface UriGenerator { } public FileProcessor( - Feedback feedback, - boolean enableXSDValidation, - DataSet dataSet, - RecMapping recMapping, - boolean allowInvalid, - GroovyCodeResource groovyCodeResource, - UriGenerator uriGenerator, - Listener listener + Feedback feedback, + boolean enableXSDValidation, + DataSet dataSet, + RecMapping recMapping, + boolean allowInvalid, + GroovyCodeResource groovyCodeResource, + UriGenerator uriGenerator, + Listener listener ) { this.feedback = feedback; this.enableXSDValidation = enableXSDValidation; @@ -154,11 +151,13 @@ public void run() { groovyCodeResource.clearMappingScripts(); try { // TODO record count is never used - MetadataParser parser = new MetadataParser(getDataSet().openSourceInputStream(), -1); + MetadataParser parser = new MetadataParser(getDataSet().openSourceInputStream()); parser.setProgressListener(progressListener); ReportWriter reportWriter = getDataSet().openReportWriter(recMapping.getRecDefTree().getRecDef()); File outputDir = createEmptyOutputDir(); + nQuadWriter = new NQuadWriter(outputDir.toPath()); + nQuadWriter.start(); Consumer consumer = new Consumer(reportWriter); int engineCount = (int) Math.round(Runtime.getRuntime().availableProcessors() * 1.1); @@ -193,8 +192,7 @@ public void run() { } info(Thread.currentThread().getName() + " about to consume"); consumer.run(); - } - catch (Exception e) { + } catch (Exception e) { termination.dueToException(e); feedback.alert("File processing setup problem", e); } @@ -204,7 +202,7 @@ private File createEmptyOutputDir() { File sipDir = getDataSet().targetOutput().getParentFile(); File outputDir = new File(sipDir, "output"); if (!outputDir.exists()) { - if(!outputDir.mkdir()) { + if (!outputDir.mkdir()) { throw new IllegalStateException("Unable to create directory: " + outputDir); } } else { @@ -216,45 +214,6 @@ private File createEmptyOutputDir() { return outputDir; } - private class MappingOutput { - final MetadataRecord metadataRecord; - final MappingResult mappingResult; - final Exception exception; - - private MappingOutput(MetadataRecord metadataRecord, MappingResult mappingResult, Exception exception) { - this.metadataRecord = metadataRecord; - this.mappingResult = mappingResult; - this.exception = exception; - } - - public void record(ReportWriter reportWriter, XmlOutput xmlOutput) { - try { - if (exception == null) { - xmlOutput.write(mappingResult.getLocalId(), mappingResult.root()); - } - else if (exception instanceof DiscardRecordException) { - synchronized (lock) { - reportWriter.discarded(metadataRecord, exception.getMessage()); - } - } - else if (exception instanceof MappingException) { - synchronized (lock) { - reportWriter.unexpected(metadataRecord, (MappingException) exception); - } - termination.dueToException(exception); - } - else { - synchronized (lock) { - reportWriter.invalid(mappingResult, exception); - } - } - } - catch (Exception e) { - termination.dueToException(e); - } - } - } - private class MetadataParserRunner implements Runnable { final BlockingQueue outputQueue = new LinkedBlockingDeque<>(); @@ -266,29 +225,24 @@ private MetadataParserRunner(MetadataParser metadataParser) { thread.setName(getClass().getName()); } - public MetadataRecord nextRecord() throws InterruptedException { - return outputQueue.take(); + public void drainTo(List collector, int capacity) { + outputQueue.drainTo(collector, capacity); } @Override public void run() { MetadataRecord record; - try { - while (true) { - try { - if (!((record = metadataParser.nextRecord()) != null)) break; - outputQueue.add(record); - while(outputQueue.size() > 1000) { - Thread.sleep(1000); - } - } catch (XMLStreamException | IOException | CancelException | InterruptedException e) { - termination.dueToException(e); - break; + while (true) { + try { + if (!((record = metadataParser.nextRecord()) != null)) break; + outputQueue.add(record); + while (outputQueue.size() > 5000) { + System.out.println("Sleeping... " + System.currentTimeMillis()); + Thread.sleep(250); } - } - } finally { - for (int i = 0; i < 100; i++) { - outputQueue.add(MetadataRecord.poisonPill()); + } catch (XMLStreamException | IOException | CancelException | InterruptedException e) { + termination.dueToException(e); + break; } } } @@ -316,18 +270,19 @@ public void run() { recordCount += engine.recordCount; validCount += engine.validCount; } catch (InterruptedException e) { - termination.dueToException(e); + // nothing to do here } } + nQuadWriter.stop(); + if (termination.isIncomplete()) { info("Abort report writer"); if (reportWriter != null) { reportWriter.abort(); } - } - else { + } else { info(String.format("Finish report writer records=%d valid=%d", recordCount, validCount)); - if(reportWriter != null) { + if (reportWriter != null) { reportWriter.finish(validCount, recordCount - validCount); } termination.normalCompletion(); @@ -351,7 +306,7 @@ private class MappingEngine implements Runnable { public int validCount; final File outputDir; final Map namespaceMap; - final ReportWriter reportWriter; + final ReportWriter reportWriter; final List assertionTests; final MappingRunner MappingRunner; @@ -382,117 +337,89 @@ public void start() { thread.start(); } - public void accept(MetadataRecord metadataRecord, MappingResult mappingResult, Exception exception) { - try { - MappingOutput mappingOutput = new MappingOutput(metadataRecord, mappingResult, exception); - recordCount++; - if (mappingOutput.exception == null) { - validCount++; - } + public void accept(MappingResult mappingResult) throws XMLStreamException, IOException, InterruptedException { + validCount++; - if(Application.canWritePocketFiles()) { - File outputFile = new File(outputDir, mappingOutput.metadataRecord.getRecordNumber() + ".xml"); - XmlOutput xmlOutput = new XmlOutput(outputFile, namespaceMap); - mappingOutput.record(reportWriter, xmlOutput); - xmlOutput.finish(false); - } - } catch (XMLStreamException | IOException e) { - termination.dueToException(e); + byte[] nQuad = mappingResult.toNQuad(dataSet.getDataSetFacts()); + byte[] xml = null; + if (Application.canWritePocketFiles()) { + XmlOutput xmlOutput = new XmlOutput(namespaceMap); + xmlOutput.write(mappingResult.getLocalId(), mappingResult.root()); + xmlOutput.finish(); + + xml = xmlOutput.toBytes(); } + nQuadWriter.put(new NQuadWriter.Input(nQuad, xml)); } @Override public void run() { - try { - while (termination.notYet()) { - MetadataRecord record = metadataParserRunner.nextRecord(); + final int capacity = 50; + final ArrayList records = new ArrayList<>(capacity); - if (record == null || record.isPoison()) break; + while (termination.notYet()) { - try { - Node node = null; - try { - node = MappingRunner.runMapping(record); - } catch (DiscardRecordException e) { - reportWriter.discarded(record, e.toString()); - } + records.clear(); + metadataParserRunner.drainTo(records, capacity); + for (MetadataRecord record : records) { - if (node == null) continue; - MappingResult result = new MappingResult(serializer, uriGenerator.generateUri(record.getId()), node, MappingRunner.getRecDefTree()); - List uriErrors = result.getUriErrors(); - try { - if (!uriErrors.isEmpty()) { - StringBuilder uriErrorsString = new StringBuilder(); - for (String uriError: uriErrors) { - uriErrorsString.append(uriError).append("\n"); - } - throw new Exception("URI Errors\n"+ uriErrorsString); - } - if (validator == null) { - accept(record, result, null); - } - else { - try { - Source source = new DOMSource(node); - validator.validate(source); - for (AssertionTest assertionTest : assertionTests) { - String violation = assertionTest.getViolation(result.root()); - if (violation != null) throw new AssertionException(violation); - } - accept(record, result, null); - } - catch (Exception e) { - accept(record, result, e); - if (!allowInvalid) { - termination.askHowToProceed(record, e); - } - } - } - } - catch (Exception e) { - accept(record, result, e); - if (!allowInvalid) { - termination.askHowToProceed(record, e); - } + try { + recordCount++; + MappingResult result = runMapping(record); + accept(result); + } catch (Throwable e) { + + boolean isFatal = reportWriter.recordError(record, null, e); + if (isFatal) { + termination.dueToException(e); + return; } + } finally { + isDone = true; + } + } + } + } + private MappingResult runMapping(MetadataRecord record) throws XPathExpressionException, MappingException, AssertionException, IOException, SAXException { + Node node = MappingRunner.runMapping(record); + if (node == null) return null; + MappingResult result = new MappingResult(serializer, uriGenerator.generateUri(record.getId()), node, MappingRunner.getRecDefTree()); + validate(node, result); + return result; + } - } - catch (DiscardRecordException e) { - accept(record, null, e); - } - catch (MappingException e) { - accept(record, null, e); - termination.dueToException(record, e); - } + private void validate(Node node, MappingResult result) throws XPathExpressionException, AssertionException, IOException, SAXException { + List uriErrors = result.getUriErrors(); + if (!uriErrors.isEmpty()) { + StringBuilder uriErrorsString = new StringBuilder(); + for (String uriError : uriErrors) { + uriErrorsString.append(uriError).append("\n"); } + throw new RuntimeException("URI Errors\n" + uriErrorsString); } - catch (Exception e) { - termination.dueToException(e); - } finally { - isDone = true; + if (validator != null) { + Source source = new DOMSource(node); + validator.validate(source); + for (AssertionTest assertionTest : assertionTests) { + String violation = assertionTest.getViolation(result.root()); + if (violation != null) throw new AssertionException(violation); + } } } } - private enum NextStep { - CONTINUE, - INVESTIGATE, - ABORT - } - private class Termination { private MetadataRecord failedRecord; - private Exception exception; - private boolean cancelled, completed; - private NextStep nextStep = NextStep.CONTINUE; + private Throwable exception; + private boolean completed; boolean notYet() { return !completed && !isIncomplete(); } boolean isIncomplete() { - return cancelled || failedRecord != null || exception != null; + return failedRecord != null || exception != null; } int getRecordNumber() { @@ -506,21 +433,15 @@ void normalCompletion() { } } - void dueToCancellation() { - cancelled = true; - listener.aborted(FileProcessor.this); - } - - void dueToException(Exception exception) { + void dueToException(Throwable exception) { dueToException(null, exception); } - synchronized void dueToException(MetadataRecord failedRecord, Exception exception) { + synchronized void dueToException(MetadataRecord failedRecord, Throwable exception) { if (this.exception == null) { // only show one of them if (feedback != null) { feedback.alert("Problem processing", exception); - } - else { + } else { System.out.println("Problem processing: "); exception.printStackTrace(); } @@ -531,53 +452,5 @@ synchronized void dueToException(MetadataRecord failedRecord, Exception exceptio listener.failed(FileProcessor.this); } } - - synchronized void askHowToProceed(MetadataRecord failedRecord, Exception exception) { - switch (nextStep) { - case CONTINUE: - break; - case INVESTIGATE: - case ABORT: - return; - } - nextStep = blockForNextStep(failedRecord.getRecordNumber()); - switch (nextStep) { - case CONTINUE: - break; - case INVESTIGATE: - this.exception = exception; // prevent reporting, because they chose to investigate - dueToException(failedRecord, exception); - break; - case ABORT: - dueToCancellation(); - break; - } - } - - private NextStep blockForNextStep(int recordNumber) { - JRadioButton continueButton = new JRadioButton(String.format( - "Continue - Continue the %s mapping of data set %s, discarding invalid record %d", - getPrefix(), getSpec(), recordNumber - )); - JRadioButton investigateButton = new JRadioButton(String.format( - "Investigate - Stop and fix the %s mapping of data set %s, with invalid record %d in view", - getPrefix(), getSpec(), recordNumber - )); - ButtonGroup bg = new ButtonGroup(); - bg.add(continueButton); - continueButton.setSelected(true); - bg.add(investigateButton); - if (feedback.form("Invalid Record! How to proceed?", continueButton, investigateButton)) { - if (investigateButton.isSelected()) { - return NextStep.INVESTIGATE; - } - else { - return NextStep.CONTINUE; - } - } - else { - return NextStep.ABORT; - } - } } } diff --git a/sip-app/src/main/java/eu/delving/sip/xml/MetadataParser.java b/sip-app/src/main/java/eu/delving/sip/xml/MetadataParser.java index e43dfae4a..bd3a940f6 100644 --- a/sip-app/src/main/java/eu/delving/sip/xml/MetadataParser.java +++ b/sip-app/src/main/java/eu/delving/sip/xml/MetadataParser.java @@ -24,7 +24,6 @@ import eu.delving.XMLToolFactory; import eu.delving.groovy.GroovyNode; import eu.delving.groovy.MetadataRecord; -import eu.delving.groovy.MetadataRecordFactory; import eu.delving.metadata.Path; import eu.delving.metadata.StringUtil; import eu.delving.metadata.Tag; @@ -53,21 +52,20 @@ public class MetadataParser { private InputStream inputStream; private XMLStreamReader input; - private int recordIndex, recordCount; + private int recordIndex; private Map namespaces = new TreeMap(); private Path path = Path.create(); - private MetadataRecordFactory factory = new MetadataRecordFactory(namespaces); private ProgressListener progressListener; private boolean isSourceExhausted; - public MetadataParser(InputStream inputStream, int recordCount) throws XMLStreamException { + public MetadataParser(InputStream inputStream) throws XMLStreamException { this.inputStream = inputStream; this.input = XMLToolFactory.xmlInputFactory().createXMLStreamReader("Metadata", inputStream); } public void setProgressListener(ProgressListener progressListener) { this.progressListener = progressListener; - progressListener.prepareFor(recordCount); + progressListener.prepareFor(0); } @SuppressWarnings("unchecked") @@ -125,7 +123,7 @@ else if (node != null) { if (node != null) { if (path.equals(RECORD_CONTAINER)) { // TODO record count is never used - metadataRecord = factory.fromGroovyNode(node, recordIndex++, recordCount); + metadataRecord = MetadataRecord.create(node, recordIndex++); if (progressListener != null) { progressListener.setProgress(recordIndex); } diff --git a/sip-app/src/main/java/eu/delving/sip/xml/NQuadWriter.java b/sip-app/src/main/java/eu/delving/sip/xml/NQuadWriter.java new file mode 100644 index 000000000..8b32eb134 --- /dev/null +++ b/sip-app/src/main/java/eu/delving/sip/xml/NQuadWriter.java @@ -0,0 +1,116 @@ +package eu.delving.sip.xml; + +import eu.delving.stats.Stats; + +import java.io.ByteArrayInputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.zip.GZIPOutputStream; + +public class NQuadWriter implements Runnable { + + public static class Input { + + private final byte[] nQad; + private final byte[] xml; + + public Input(byte[] nQad, byte[] xml) { + this.nQad = nQad; + this.xml = xml; + } + + private Input() { + this(null, null); + } + + private boolean isNull() { + return nQad == null; + } + } + + private final Stats stats = new Stats(); + private final Thread thread = new Thread(this); + private final Queue queue = new LinkedBlockingQueue<>(); + private boolean stop; + private final Path outputDir; + private Throwable error; + + public NQuadWriter(Path outputDir) { + stats.maxUniqueValueLength = 1000; + this.outputDir = outputDir; + } + + public RuntimeException getError() { + if (!(error instanceof RuntimeException)) { + return new RuntimeException(error); + } + return (RuntimeException) error; + } + + public boolean hasError() { + return error != null; + } + + public void put(Input input) throws InterruptedException { + if (hasError()) { + throw getError(); + } + + queue.add(input); + } + + @Override + public void run() { + try ( + GZIPOutputStream nQadOut = new GZIPOutputStream(Files.newOutputStream(outputDir.resolve("output.nq.gz")), false); + GZIPOutputStream xmlOut = new GZIPOutputStream(Files.newOutputStream(outputDir.resolve("output.xml.gz")), false); + ) { + + while (!stop) { + Input input = queue.poll(); + + if (input != null) { + if (input.isNull()) { + stop = true; + + nQadOut.finish(); + nQadOut.flush(); + + xmlOut.finish(); + xmlOut.flush(); + + try(GZIPOutputStream xmlStatsOut = new GZIPOutputStream(Files.newOutputStream(outputDir.resolve("output.stats.xml.gz")), false)) { + Stats.write(stats, xmlStatsOut); + } + break; + } + + nQadOut.write(input.nQad); + if(input.xml != null) { + AnalysisParser.updateStats(stats, new ByteArrayInputStream(input.xml)); + xmlOut.write(input.xml); + } + } + } + + } catch (Throwable e) { + error = e; + } + } + + public void start() { + thread.setName(getClass().getName()); + thread.start(); + } + + public void stop() { + try { + queue.add(new Input()); + thread.join(); + } catch (InterruptedException e) { + // nothing to do + } + } +} \ No newline at end of file diff --git a/sip-app/src/main/java/eu/delving/sip/xml/XmlOutput.java b/sip-app/src/main/java/eu/delving/sip/xml/XmlOutput.java index 13a6b58ce..d0d7dcbf6 100644 --- a/sip-app/src/main/java/eu/delving/sip/xml/XmlOutput.java +++ b/sip-app/src/main/java/eu/delving/sip/xml/XmlOutput.java @@ -34,12 +34,7 @@ import javax.xml.stream.XMLStreamException; import javax.xml.stream.events.Attribute; import javax.xml.stream.events.Namespace; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; +import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -70,18 +65,13 @@ public class XmlOutput { {"dbpedia-owl", "http://dbpedia.org/ontology/"}, {"dbprop", "http://dbpedia.org/property/"} }; - private File outputXmlFile; - private File tempOutputFile; + private XMLEventFactory eventFactory = XMLToolFactory.xmlEventFactory(); - private OutputStream outputStream; + private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024 * 8); private XMLEventWriter out; - public XmlOutput(File outputXmlFile, Map namespaces) throws IOException, XMLStreamException { - this.outputXmlFile = outputXmlFile; - FileUtils.deleteQuietly(outputXmlFile); - this.tempOutputFile = new File(outputXmlFile.getParentFile(), outputXmlFile.getName() + ".temp"); - outputStream = new BufferedOutputStream(new FileOutputStream(tempOutputFile)); - out = XMLToolFactory.xmlOutputFactory().createXMLEventWriter(new OutputStreamWriter(outputStream, "UTF-8")); + public XmlOutput(Map namespaces) throws IOException, XMLStreamException { + out = XMLToolFactory.xmlOutputFactory().createXMLEventWriter(new OutputStreamWriter(buffer, "UTF-8")); out.add(eventFactory.createStartDocument()); out.add(eventFactory.createCharacters("\n")); StringBuilder schemaLocation = new StringBuilder(); @@ -121,24 +111,11 @@ public void write(String identifier, Node node) throws XMLStreamException { out.add(eventFactory.createEndElement(RDF_PREFIX, RDF_URI, RDF_RECORD_TAG)); } - public void finish(boolean aborted) { - try { - out.add(eventFactory.createCharacters("\n")); - out.add(eventFactory.createEndElement(RDF_PREFIX, RDF_URI, RDF_ROOT_TAG)); - out.add(eventFactory.createCharacters("\n")); - out.add(eventFactory.createEndDocument()); - out.flush(); - outputStream.close(); - if (aborted) { - FileUtils.deleteQuietly(tempOutputFile); - } - else { - FileUtils.moveFile(tempOutputFile, outputXmlFile); - } - } - catch (Exception e) { - throw new RuntimeException("Trouble closing xml output", e); - } + public void finish() throws XMLStreamException { + out.add(eventFactory.createCharacters("\n")); + out.add(eventFactory.createEndElement(RDF_PREFIX, RDF_URI, RDF_ROOT_TAG)); + out.add(eventFactory.createCharacters("\n")); + out.add(eventFactory.createEndDocument()); } private void writeElement(Element element, int depth) throws XMLStreamException { @@ -193,4 +170,7 @@ private void indent(int depth) throws XMLStreamException { for (int walk = 0; walk < depth; walk++) out.add(eventFactory.createCharacters(" ")); } + public byte[] toBytes() { + return buffer.toByteArray(); + } } diff --git a/sip-app/src/main/java/eu/delving/stats/Stats.java b/sip-app/src/main/java/eu/delving/stats/Stats.java index 4693ac3d5..bf63ed27b 100644 --- a/sip-app/src/main/java/eu/delving/stats/Stats.java +++ b/sip-app/src/main/java/eu/delving/stats/Stats.java @@ -93,10 +93,6 @@ public void recordNamespace(String prefix, String uri) { namespaces.put(prefix, uri); } - public void freshStats() { - recordStats = new RecordStats(); - } - public void recordValue(Path path, String value) { ValueStats valueStats = fieldValueMap.get(path); if (valueStats == null) fieldValueMap.put(path, valueStats = new ValueStats()); @@ -128,7 +124,7 @@ public void finish() { public Map namespaces = new HashMap(); - public RecordStats recordStats; + public RecordStats recordStats = new RecordStats(); @XStreamAlias("field-value-stats") public Map fieldValueMap = new HashMap(); diff --git a/sip-core/src/main/java/eu/delving/groovy/MetadataRecord.java b/sip-core/src/main/java/eu/delving/groovy/MetadataRecord.java index 2039c2bc2..f32bb9597 100644 --- a/sip-core/src/main/java/eu/delving/groovy/MetadataRecord.java +++ b/sip-core/src/main/java/eu/delving/groovy/MetadataRecord.java @@ -21,7 +21,6 @@ package eu.delving.groovy; -import java.util.List; import java.util.regex.Pattern; /** @@ -35,25 +34,15 @@ public class MetadataRecord { private GroovyNode rootNode; - // TODO record count is never used - private int recordNumber, recordCount; + private int recordNumber; - public static MetadataRecord create(GroovyNode rootNode, int recordNumber, int recordCount) { - return new MetadataRecord(rootNode, recordNumber, recordCount); + public static MetadataRecord create(GroovyNode rootNode, int recordNumber) { + return new MetadataRecord(rootNode, recordNumber); } - public static MetadataRecord poisonPill() { - return new MetadataRecord(null, -1, -1); - } - - private MetadataRecord(GroovyNode rootNode, int recordNumber, int recordCount) { + private MetadataRecord(GroovyNode rootNode, int recordNumber) { this.rootNode = rootNode; this.recordNumber = recordNumber; - this.recordCount = recordCount; - } - - public boolean isPoison() { - return rootNode == null; } public GroovyNode getRootNode() { @@ -72,11 +61,6 @@ public int getRecordNumber() { return recordNumber; } - // TODO record count is never used - public int getRecordCount() { - return recordCount; - } - private boolean checkFor(GroovyNode groovyNode, Pattern pattern) { if (groovyNode.text != null && pattern.matcher(groovyNode.text).find()) { return true; @@ -90,7 +74,7 @@ private boolean checkFor(GroovyNode groovyNode, Pattern pattern) { } public String toString() { - return String.format("MetadataRecord(%d / %d)", recordNumber, recordCount); + return String.format("MetadataRecord(%d / ?)", recordNumber); } } diff --git a/sip-core/src/main/java/eu/delving/groovy/MetadataRecordFactory.java b/sip-core/src/main/java/eu/delving/groovy/MetadataRecordFactory.java deleted file mode 100644 index 9a91d2980..000000000 --- a/sip-core/src/main/java/eu/delving/groovy/MetadataRecordFactory.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright 2011, 2012 Delving BV - * - * Licensed under the EUPL, Version 1.0 or? as soon they - * will be approved by the European Commission - subsequent - * versions of the EUPL (the "Licence"); - * you may not use this work except in compliance with the - * Licence. - * You may obtain a copy of the Licence at: - * - * http://ec.europa.eu/idabc/eupl - * - * Unless required by applicable law or agreed to in - * writing, software distributed under the Licence is - * distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. - * See the Licence for the specific language governing - * permissions and limitations under the Licence. - */ - -package eu.delving.groovy; - -import com.ctc.wstx.exc.WstxParsingException; -import eu.delving.XMLToolFactory; -import org.codehaus.stax2.XMLStreamReader2; - -import javax.xml.namespace.QName; -import javax.xml.parsers.DocumentBuilder; -import javax.xml.parsers.ParserConfigurationException; -import javax.xml.stream.XMLInputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.events.XMLEvent; -import java.io.Reader; -import java.io.StringReader; -import java.util.Map; - -/** - * When the MetadataRecord instances are not coming from the parse of an input file - * using the MetadataParser, they can be produced one by one using the metadataRecordFrom method, which - * first cleverly wraps the record and then parses it. - * - * - */ - -public class MetadataRecordFactory { - private XMLInputFactory inputFactory = XMLToolFactory.xmlInputFactory(); - private DocumentBuilder documentBuilder; - private Map namespaces; - - public MetadataRecordFactory(Map namespaces) { - this.namespaces = namespaces; - try { - documentBuilder = XMLToolFactory.documentBuilderFactory().newDocumentBuilder(); - } - catch (ParserConfigurationException e) { - throw new RuntimeException("Parser config?", e); - } - } - - public MetadataRecord fromGroovyNode(GroovyNode rootNode, int recordNumber, int recordCount) { - // TODO record count is never used - return MetadataRecord.create(rootNode, recordNumber, recordCount); - } - - public MetadataRecord metadataRecordFrom(String recordContents) throws XMLStreamException { - try { - Reader reader = new StringReader(recordContents); - XMLStreamReader2 input = (XMLStreamReader2) inputFactory.createXMLStreamReader(reader); - GroovyNode rootNode = null, node = null; - StringBuilder value = new StringBuilder(); - while (true) { - switch (input.getEventType()) { - case XMLEvent.START_DOCUMENT: - break; - case XMLEvent.START_ELEMENT: - node = new GroovyNode(node, input.getNamespaceURI(), input.getLocalName(), input.getPrefix()); - if (rootNode == null) { - rootNode = node; - } - if (input.getAttributeCount() > 0) { - for (int walk = 0; walk < input.getAttributeCount(); walk++) { - QName attributeName = input.getAttributeName(walk); - if (attributeName.getPrefix() == null || attributeName.getPrefix().isEmpty()) { - node.attributes().put(attributeName.getLocalPart(), input.getAttributeValue(walk)); - } - else { - node.attributes().put(String.format("%s:%s", attributeName.getPrefix(), attributeName.getLocalPart()), input.getAttributeValue(walk)); - } - } - } - value.setLength(0); - break; - case XMLEvent.CHARACTERS: - value.append(input.getText()); - break; - case XMLEvent.CDATA: - value.append(String.format("", input.getText())); - break; - case XMLEvent.END_ELEMENT: - if (node == null) throw new RuntimeException("Node cannot be null"); - String valueString = value.toString().trim(); - value.setLength(0); - if (valueString.length() > 0) node.setNodeValue(valueString); - node = node.parent(); - break; - case XMLEvent.END_DOCUMENT: { - break; - } - } - if (!input.hasNext()) { - break; - } - input.next(); - } - return MetadataRecord.create(rootNode, -1, -1); - } - catch (WstxParsingException e) { - throw new XMLStreamException("Problem parsing record:\n" + recordContents, e); - } - } - -} diff --git a/sip-core/src/main/java/eu/delving/metadata/MappingResult.java b/sip-core/src/main/java/eu/delving/metadata/MappingResult.java index f0d7a07c6..4c6b12a3d 100644 --- a/sip-core/src/main/java/eu/delving/metadata/MappingResult.java +++ b/sip-core/src/main/java/eu/delving/metadata/MappingResult.java @@ -21,32 +21,43 @@ package eu.delving.metadata; +import com.google.gson.Gson; +import com.google.gson.stream.JsonWriter; import eu.delving.groovy.XmlSerializer; +import org.apache.commons.codec.binary.Hex; import org.apache.jena.rdf.model.Model; +import org.apache.jena.riot.RDFDataMgr; +import org.apache.jena.riot.RDFFormat; import org.w3c.dom.Node; import org.w3c.dom.NodeList; import javax.xml.xpath.XPathConstants; import javax.xml.xpath.XPathExpression; import javax.xml.xpath.XPathExpressionException; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.StringReader; +import java.io.*; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.jena.rdf.model.ModelFactory; +import static org.checkerframework.checker.units.UnitsTools.s; /** * The result of the mapping engine is wrapped in this class so that some post-processing and checking * can be done on the resulting Node tree. - * */ public class MappingResult { + + private final static byte[] SEPARATOR_PREFIX = "# !$".getBytes(); + private final static byte[] NEW_LINE = "\n".getBytes(); + private XmlSerializer serializer; private Node root; private String localId; @@ -59,6 +70,59 @@ public MappingResult(XmlSerializer serializer, String localId, Node root, RecDef this.recDefTree = recDefTree; } + private String sha1(byte[] input) throws NoSuchAlgorithmException { + MessageDigest mDigest = MessageDigest.getInstance("SHA1"); + byte[] result = mDigest.digest(input); + return new String(Hex.encodeHex(result)); + } + + public byte[] toNQuad(Map facts) { + try { + byte[] rdf = toRDF().getBytes(StandardCharsets.UTF_8); + Model model = ModelFactory.createDefaultModel().read(new ByteArrayInputStream(rdf), null, "RDF/XML"); + + ByteArrayOutputStream nquadBuffer = new ByteArrayOutputStream(rdf.length * 2); + RDFDataMgr.write(nquadBuffer, model, RDFFormat.NQUADS); + nquadBuffer.write(NEW_LINE); + + ByteArrayOutputStream quadMapBuffer = new ByteArrayOutputStream(1024 * 4); + try (JsonWriter writer = new JsonWriter(new OutputStreamWriter(quadMapBuffer))) { + writer.beginObject(); + + String orgID = facts.get("orgId"); + String spec = facts.get("spec"); + + writer.name("hubID"); + writer.value(orgID + "_" + spec + "_" + localId); + + writer.name("orgID"); + writer.value(orgID); + + writer.name("localID"); + writer.value(localId); + + writer.name("graphURI"); + writer.value(facts.get("baseUrl")); + + writer.name("datasetID"); + writer.value(spec); + + writer.name("contentHash"); + writer.value(sha1(rdf)); + + writer.endObject(); + writer.flush(); + } + + nquadBuffer.write(SEPARATOR_PREFIX); + nquadBuffer.write(quadMapBuffer.toByteArray()); + nquadBuffer.write(NEW_LINE); + return nquadBuffer.toByteArray(); + } catch (IOException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + public String getLocalId() { return localId; } @@ -71,8 +135,7 @@ private boolean uriCheck(String maybeUri) { try { URI uri = new URI(maybeUri); return uri.isAbsolute(); - } - catch (URISyntaxException e) { + } catch (URISyntaxException e) { return false; } } @@ -80,15 +143,15 @@ private boolean uriCheck(String maybeUri) { public List getUriErrors() throws XPathExpressionException { List errors = new ArrayList(); for (Map.Entry entry : recDefTree.getUriCheckPaths().entrySet()) { - // TODO causes the largest spikes in memory usage by a large margin even after the set of URI checks was significantly. See #38738404363a326970f52626ae6ac61deaebe2ec + // TODO causes the largest spikes in memory usage by a large margin even after the set of URI checks was significantly reduced. See #38738404363a326970f52626ae6ac61deaebe2ec NodeList nodeList = (NodeList) entry.getValue().evaluate(root, XPathConstants.NODESET); for (int walk = 0; walk < nodeList.getLength(); walk++) { Node node = nodeList.item(walk); String content = node.getTextContent(); if (!uriCheck(content)) { errors.add(String.format( - "At %s: not a URI: [%s]", - entry.getKey(), content + "At %s: not a URI: [%s]", + entry.getKey(), content )); } } @@ -98,10 +161,10 @@ public List getUriErrors() throws XPathExpressionException { public List getRDFErrors() { List errors = new ArrayList(); - String error = MappingResult.hasRDFError(toRDF()); - if (error.length() > 0) { - errors.add(error); - } +// String error = MappingResult.hasRDFError(toRDF()); +// if (error.length() > 0) { +// errors.add(error); +// } return errors; } @@ -124,14 +187,4 @@ public String toRDF() { public String toString() { return toXml(); } - - public static String hasRDFError(String rdf) { - try { - InputStream in = new ByteArrayInputStream(rdf.getBytes("UTF-8")); - Model mm = ModelFactory.createDefaultModel().read(in, null, "RDF/XML"); - } catch (Exception e) { - return e.toString(); - } - return ""; - } }