Skip to content

Commit

Permalink
create a 2.0 version with better thread management. Hoping this fixes
Browse files Browse the repository at this point in the history
an issue where after crawling for a certain amount of time, suddenly
all tika fork parsers return EOF exceptions on all requests.
  • Loading branch information
nddipiazza committed Mar 28, 2020
1 parent 143d204 commit 2c22997
Show file tree
Hide file tree
Showing 10 changed files with 5,844 additions and 2,362 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
2.0

overhaul on the timeout management and thread management

1.0onenote5

adding the stack trace into the warning for "cannot parse metadata"

1.0onenote4

making the onenote parser for 2007 office documents perform better.

1.0b09
- move to slf4j log4j2.xml log configuration files.
- hide all pdfbox verbose warnings
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,12 @@ This program attempts to deal with these problems:
## Usage

See the [Tika Fork Process Unit Test](tika-fork/src/test/java/org/apache/tika/fork/TikaProcessTest.java) for several detailed examples of how to use the program.


## Running the tests

First run `./gradlew :tika-fork-main:clean :tika-fork-main:dist`

to set up the Tika remote build.

Then run the tests.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# version
version=1.0onenote5
version=2.0

# deps
# deps
tikaVersion=1.23-lucidworks5
slf4jVersion=1.7.26
commonsPoolVersion=2.6.2
Expand Down
7 changes: 0 additions & 7 deletions release.txt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public TikaProcess(String javaPath,
command.add("-Xmx" + tikaMaxHeapSizeMb + "m");
}
command.add("-Djava.awt.headless=true");
// Debug aid: uncomment to make the forked JVM wait for a remote debugger on port 9090.
//command.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=9090");
command.add("-cp");
command.add(tikaDistPath + File.separator + "*");
command.add("org.apache.tika.fork.main.TikaForkMain");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.apache.tika.client;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.tika.io.IOUtils;
import org.apache.tika.metadata.Metadata;
import org.slf4j.Logger;
Expand Down Expand Up @@ -30,6 +29,7 @@ public class TikaRunner {
private int metadataOutPort = 0;
private int contentOutPort = 0;
private boolean parseContent;
private int contentChunkSize = 5000000;

class TikaRunnerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Expand Down Expand Up @@ -64,7 +64,7 @@ public Metadata parse(String baseUri,
});
Future<Metadata> metadataFuture = es.submit(() -> {
try {
return getMetadata(metadataOutPort, baseUri, maxBytesToParse);
return getMetadata(metadataOutPort, baseUri);
} catch (Exception e) {
throw new RuntimeException("Failed to read metadata from forked Tika parser JVM", e);
}
Expand All @@ -73,8 +73,13 @@ public Metadata parse(String baseUri,
Instant mustFinishByInstant = Instant.now().plus(Duration.ofMillis(abortAfterMs));
if (parseContent) {
Future contentFuture = es.submit(() -> {
try {
getContent(contentOutPort, contentOutputStream, maxBytesToParse);
try (TikaRunnerGetContentResult contentResult = getContent(contentOutPort, contentOutputStream, maxBytesToParse, baseUri)) {
while (!metadataFuture.isDone()) {
if (Instant.now().isAfter(mustFinishByInstant)) {
throw new TimeoutException("Timed out waiting " + abortAfterMs + " ms for metadata after content was fully parsed.");
}
Thread.sleep(100L);
}
return true;
} catch (Exception e) {
throw new RuntimeException("Failed to read content from forked Tika parser JVM", e);
Expand Down Expand Up @@ -134,34 +139,66 @@ private void writeContent(String baseUri,
}
}

private Metadata getMetadata(int port, String baseUri, long maxBytesToParse) throws Exception {
private Metadata getMetadata(int port, String baseUri) throws Exception {
Socket socket = getSocket(InetAddress.getLocalHost().getHostAddress(), port);
try (InputStream metadataIn = socket.getInputStream()) {
ObjectInputStream objectInputStream = new ObjectInputStream(metadataIn);
try {
return (Metadata) objectInputStream.readObject();
} catch (EOFException e) {
// Is there some particular IOExceptions we should allow not to fall through?
LOG.warn("Could not parse metadata for {} due to EOFException. May have exceeded max bytes to return {}", baseUri, maxBytesToParse);
return new Metadata();
LOG.warn("Could not parse metadata for {} due to EOFException.", baseUri);
Metadata blankedResult = new Metadata();
blankedResult.set("eof_during_parse", "true");
return blankedResult;
}
} finally {
socket.close();
}
}

private void getContent(int port, OutputStream contentOutputStream, long maxBytesToParse) throws Exception {
Socket socket = getSocket(InetAddress.getLocalHost().getHostAddress(), port);
try (BoundedInputStream boundedInputStream = new BoundedInputStream(socket.getInputStream(), maxBytesToParse)) {
long numChars;
do {
numChars = IOUtils.copy(boundedInputStream, contentOutputStream);
} while (numChars > 0);
} finally {
socket.close();
/**
 * Holder for the per-request content socket and its input stream, plus a flag
 * recording whether the max-bytes limit was hit while reading content.
 *
 * <p>Implements {@link AutoCloseable} so callers can release the socket and
 * stream with try-with-resources; {@code close()} never throws, it only logs.
 */
private static class TikaRunnerGetContentResult implements AutoCloseable {
  public Socket socket;
  public InputStream inputStream;
  // true when maxBytesToParse was reached and the remainder of the content was ignored
  public boolean exceededMaxBytes;

  @Override
  public void close() {
    // inputStream may never have been assigned if socket.getInputStream() failed
    if (inputStream != null) {
      try {
        inputStream.close();
      } catch (IOException e) {
        LOG.error("Could not close GetContent input stream", e);
      }
    }
    if (socket != null) {
      try {
        socket.close();
      } catch (IOException e) {
        LOG.error("Could not close GetContent socket", e);
      }
    }
  }
}

/**
 * Connects to the forked Tika JVM's content port and copies parsed content into
 * {@code contentOutputStream}, reading at most {@code maxBytesToParse} bytes in
 * chunks of {@code contentChunkSize}.
 *
 * <p>On success the returned {@link TikaRunnerGetContentResult} still owns an open
 * socket/stream — the caller is responsible for closing it (try-with-resources).
 * If reading fails part-way, the socket and stream are closed here before the
 * exception propagates, so a failed parse cannot leak file descriptors.
 *
 * @param port                content port of the forked JVM
 * @param contentOutputStream destination for the parsed content bytes
 * @param maxBytesToParse     hard cap on bytes copied; the rest is ignored
 *                            (NOTE(review): cast to int below assumes the cap
 *                            fits in an int — confirm callers never exceed 2 GB)
 * @param baseUri             document URI, used only for logging
 * @return result holder with the open socket/stream and the exceeded-max flag
 */
private TikaRunnerGetContentResult getContent(int port, OutputStream contentOutputStream, long maxBytesToParse, String baseUri) throws Exception {
  TikaRunnerGetContentResult result = new TikaRunnerGetContentResult();
  result.socket = getSocket(InetAddress.getLocalHost().getHostAddress(), port);
  try {
    result.inputStream = result.socket.getInputStream();
    // Allocate the chunk buffer once instead of once per iteration.
    byte[] buf = new byte[contentChunkSize];
    int numParsed = 0;
    int numChars;
    do {
      // Shrink the final read so we never copy past maxBytesToParse.
      int nextNumBytesToParse = (numParsed + contentChunkSize > (int) maxBytesToParse)
          ? ((int) maxBytesToParse - numParsed) : contentChunkSize;
      numChars = IOUtils.read(result.inputStream, buf, 0, nextNumBytesToParse);
      contentOutputStream.write(buf, 0, numChars);
      numParsed += numChars;
      if (numParsed >= maxBytesToParse) {
        LOG.info("Max bytes {} reached on {}. Ignoring the rest.", maxBytesToParse, baseUri);
        result.exceededMaxBytes = true;
        return result;
      }
    } while (numChars > 0);
    return result;
  } catch (Exception e) {
    // Don't leak the socket/stream when the copy fails mid-way.
    result.close();
    throw e;
  }
}

private static Socket getSocket(String host, int port) throws InterruptedException {
Socket socket;
int maxRetries = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TikaProcessTest {
String oneNoteFilePath = "test-files" + File.separator + "test-one-note.one";
String bombContentType = "application/vnd.ms-excel";
Properties parseProperties;
long maxBytesToParse = 100000000;
long maxBytesToParse = 100000000; // 100 MB; consider reducing to speed up the test suite

AssertionError exc;

Expand Down Expand Up @@ -228,7 +228,14 @@ public void testExternalTikaBombZipWithCsvSingleThread() throws Exception {
-1)) {
ByteArrayOutputStream contentOutputStream = new ByteArrayOutputStream();
try (FileInputStream fis = new FileInputStream(zipBombPath)) {
tikaProcessPool.parse(zipBombPath, "application/zip", fis, contentOutputStream, 300000L, maxBytesToParse);
Metadata metadata = null;
try {
metadata = tikaProcessPool.parse(zipBombPath, "application/zip", fis, contentOutputStream, 15000L, maxBytesToParse);
Assert.fail("Expected a timeout");
} catch (Exception e) {
Assert.assertEquals(100000000, contentOutputStream.toByteArray().length);
Assert.assertNull(metadata);
}
}
}
}
Expand Down Expand Up @@ -291,7 +298,7 @@ public void testTikaProcessMaxBytesParsed() throws Exception {
100
);
LOG.info("Content from the tika process: {}", contentOutputStream.toString("UTF-8"));
Assert.assertEquals(101, contentOutputStream.toString("UTF-8").length());
Assert.assertEquals(100, contentOutputStream.toString("UTF-8").length());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -41,7 +42,6 @@ public class TikaForkMain {
private static final Logger LOG = LoggerFactory.getLogger(TikaForkMain.class);

private static TikaParsingHandler getContentHandler(String mainUrl,
Metadata metadata,
OutputStream out,
boolean extractHtmlLinks) throws TikaException {
ContentHandler main = new TikaBodyContentHandler(out, TikaConstants.defaultOutputEncoding);
Expand Down Expand Up @@ -126,6 +126,8 @@ private void run() throws Exception {

contentInputStream.connect(contentOutputStream);

CountDownLatch latch = new CountDownLatch(3);

es.execute(() -> {
try {
parseFile(metadataOutputStream, contentOutputStream);
Expand All @@ -136,10 +138,12 @@ private void run() throws Exception {
LOG.debug("Couldn't close content output stream.");
}
throw new RuntimeException("Could not parse file", e);
} finally {
latch.countDown();
}
});

Future metadataFuture = es.submit(() -> {
es.execute(() -> {
try {
writeMetadata(metadataInputStream);
} catch (Exception e) {
Expand All @@ -149,10 +153,12 @@ private void run() throws Exception {
LOG.debug("Couldn't close metadata output stream.");
}
throw new RuntimeException("Could not write metadata", e);
} finally {
latch.countDown();
}
});

Future contentFuture = es.submit(() -> {
es.execute(() -> {
try {
writeContent(contentInputStream);
} catch (Exception e) {
Expand All @@ -162,11 +168,16 @@ private void run() throws Exception {
LOG.debug("Couldn't close content input stream.");
}
throw new RuntimeException("Could not write content", e);
} finally {
latch.countDown();
}
});

metadataFuture.get();
contentFuture.get();
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} finally {
try {
Expand Down Expand Up @@ -328,7 +339,7 @@ private void parseFile(OutputStream metadataOutputStream, OutputStream contentOu

TikaInputStream tikaInputStream = TikaInputStream.get(inputStream);

TikaParsingHandler contentHandler = getContentHandler(baseUri, metadata, contentOutputStream, extractHtmlLinks);
TikaParsingHandler contentHandler = getContentHandler(baseUri, contentOutputStream, extractHtmlLinks);
compositeParser.parse(tikaInputStream, contentHandler, metadata, context);

objectOutputStream.writeObject(metadata);
Expand Down
Loading

0 comments on commit 2c22997

Please sign in to comment.