Skip to content

Commit

Permalink
create a 2.0 version with better thread management. Hoping this fixes
Browse files Browse the repository at this point in the history
an issue where after crawling for a certain amount of time, suddenly
all tika fork parsers return EOF exceptions on all requests.
  • Loading branch information
nddipiazza committed Jun 11, 2020
1 parent 2c22997 commit 952163a
Show file tree
Hide file tree
Showing 13 changed files with 193 additions and 14 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
2.3

bump to tika 1.24.1 instead of using the custom build

allow an alternative method that runs the tika fork parser with a filename as its input instead of an input
stream. this eliminates the file download portion of parsing, and can cut down significantly on timeout exceptions.

2.2

stop filtering output from tika fork parser. it is crucial when there are issues.

2.1

log the reason for content not being downloaded

2.0

overhaul on the timeout management and thread management
Expand Down
5 changes: 2 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
# version
version=2.0
version=2.3

# depsre
tikaVersion=1.23-lucidworks5
tikaVersion=1.24.1
slf4jVersion=1.7.26
commonsPoolVersion=2.6.2
junitVersion=4.12
jacksonDatabindVersion=2.4.4
args4jVersion=2.33
commonsIoVersion=2.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ private void inheritIO(final InputStream src) {
while (sc.hasNextLine()) {
String nextLine = sc.nextLine();
// Do not log stuff that snuck into stdout.
if (nextLine != null && nextLine.startsWith("TIKAFORK")) {
LOG.info(nextLine.substring(8));
if (nextLine != null) {
LOG.info(nextLine);
}
}
}).start();
Expand All @@ -175,4 +175,13 @@ public Metadata parse(String baseUri,
long maxBytesToParse) throws InterruptedException, ExecutionException, TimeoutException {
return tikaRunner.parse(baseUri, contentType, contentInputStream, contentOutputStream, abortAfterMs, maxBytesToParse);
}

/**
 * Parses a document identified by file name, forwarding every argument unchanged to the
 * {@code tikaRunner} and handing back whatever it returns.
 *
 * @param baseUri             passed through to the runner as-is
 * @param contentType         passed through to the runner as-is
 * @param filename            name of the file to parse; NOTE(review): presumably a local path the runner opens - confirm
 * @param contentOutputStream stream handed to the runner for the parsed content
 * @param abortAfterMs        passed through to the runner; presumably a timeout in milliseconds - confirm
 * @param maxBytesToParse     passed through to the runner; presumably a cap on parsed bytes - confirm
 * @return the metadata produced by the runner
 */
public Metadata parse(String baseUri,
                      String contentType,
                      String filename,
                      OutputStream contentOutputStream,
                      long abortAfterMs,
                      long maxBytesToParse) throws InterruptedException, ExecutionException, TimeoutException, IOException {
    final Metadata parsed = tikaRunner.parse(
        baseUri, contentType, filename, contentOutputStream, abortAfterMs, maxBytesToParse);
    return parsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,33 @@ public Metadata parse(String baseUri,
}
}

/**
 * Parses a document identified by file name using a {@link TikaProcess} borrowed from the pool.
 *
 * <p>On success the process is returned to the pool. If the parse throws an {@link Exception},
 * the process is invalidated instead and the original exception is rethrown. Should the
 * invalidation itself fail, that failure is attached to the original exception as a suppressed
 * exception rather than replacing it, and the failed process is still never returned to the pool.
 *
 * @param baseUri             passed through to the borrowed process
 * @param contentType         passed through to the borrowed process
 * @param filename            name of the file to parse, passed through to the borrowed process
 * @param contentOutputStream stream handed to the process for the parsed content
 * @param abortAfterMs        passed through to the process; presumably a timeout in milliseconds - confirm
 * @param maxBytesToParse     passed through to the process; presumably a cap on parsed bytes - confirm
 * @return the metadata produced by the borrowed process
 * @throws Exception if borrowing from the pool fails, the parse fails, or returning the process fails
 */
public Metadata parse(String baseUri,
                      String contentType,
                      String filename,
                      OutputStream contentOutputStream,
                      long abortAfterMs,
                      long maxBytesToParse) throws Exception {
    TikaProcess process = (TikaProcess) pool.borrowObject();
    try {
        return process.parse(baseUri,
            contentType,
            filename,
            contentOutputStream,
            abortAfterMs,
            maxBytesToParse);
    } catch (Exception e) {
        // Clear the reference BEFORE invalidating: if invalidateObject throws, the finally
        // block must still not hand the failed process back to the pool.
        TikaProcess failed = process;
        process = null;
        try {
            pool.invalidateObject(failed);
        } catch (Exception invalidateFailure) {
            // Keep the parse failure as the primary error; record the cleanup failure alongside it.
            e.addSuppressed(invalidateFailure);
        }
        throw e;
    } finally {
        // Make sure a still-valid process goes back to the pool.
        if (null != process) {
            pool.returnObject(process);
        }
    }
}

public static GenericObjectPool initializePool(String javaPath,
String workDirectoryPath,
String tikaDistDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
Expand Down Expand Up @@ -47,6 +48,22 @@ public TikaRunner(int contentInPort,
this.parseContent = parseContent;
}

/**
 * Parses the file named {@code filename} by opening it as a {@link FileInputStream} and
 * delegating to the input-stream overload of {@code parse}. The stream is closed when the
 * delegate returns or throws.
 *
 * @param baseUri             forwarded to the stream-based overload
 * @param contentType         forwarded to the stream-based overload
 * @param filename            name of the file to open and parse
 * @param contentOutputStream forwarded to the stream-based overload
 * @param abortAfterMs        forwarded to the stream-based overload
 * @param maxBytesToParse     forwarded to the stream-based overload
 * @return the metadata returned by the stream-based overload
 * @throws IOException if the file cannot be opened or closing the stream fails
 */
public Metadata parse(String baseUri,
                      String contentType,
                      String filename,
                      OutputStream contentOutputStream,
                      long abortAfterMs,
                      long maxBytesToParse) throws InterruptedException, ExecutionException, TimeoutException, IOException {
    try (FileInputStream fileContent = new FileInputStream(filename)) {
        final Metadata parsed = parse(
            baseUri, contentType, fileContent, contentOutputStream, abortAfterMs, maxBytesToParse);
        return parsed;
    }
}

public Metadata parse(String baseUri,
String contentType,
InputStream contentInStream,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void run() throws Exception {
fis,
contentOutputStream,
300000L,
500
50000000
);

System.out.println("Metadata elements parsed: " + metadata.size());
Expand Down
1 change: 1 addition & 0 deletions tika-fork-client/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</Appenders>
<Loggers>
<logger name="org.apache.pdfbox" level="ERROR" />
<logger name="org.apache.tika.parser.microsoft.onenote" level="ERROR" />
<logger name="org.apache.tika.config.InitializableProblemHandler" level="ERROR" />
<Root level="INFO">
<AppenderRef ref="Console"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class TikaProcessTest {
String bombFilePath = "test-files" + File.separator + "bomb.xls";
String zipBombPath = "test-files" + File.separator + "zip-bomb.zip";
String oneNoteFilePath = "test-files" + File.separator + "test-one-note.one";
String encryptedPpt = "test-files" + File.separator + "encrypted.ppt";
String bombContentType = "application/vnd.ms-excel";
Properties parseProperties;
long maxBytesToParse = 100000000; // 100 MB is a lot for a test, might wanna decrease this
Expand Down Expand Up @@ -62,7 +63,28 @@ public void testExternalTikaMultiThreaded() throws Exception {
3000,
-1,
-1)) {
doParse(tikaProcessPool, true);
doParse(tikaProcessPool, true, false);
}
}

/**
 * Sets {@code numThreads} to 5 and {@code numFilesPerThread} to 50, then runs {@code doParse}
 * against a fresh pool with content parsing enabled and with file names sent instead of
 * input streams.
 */
@Test
public void testExternalTikaMultiThreadedSendFilenamesInsteadOfInputStreams() throws Exception {
    numThreads = 5;
    numFilesPerThread = 50;

    final String workDir = System.getProperty("java.io.tmpdir");
    final boolean parseContent = true;
    final boolean sendFilenames = true;

    try (TikaProcessPool pool = new TikaProcessPool(
        javaPath, workDir, tikaDistPath,
        200, parseProperties,
        -1, -1, 20,
        true,
        30000, 3000, -1, -1)) {
        doParse(pool, parseContent, sendFilenames);
    }
}

Expand All @@ -83,7 +105,7 @@ public void testExternalTikaSingleThreaded() throws Exception {
3000,
-1,
-1)) {
doParse(tikaProcessPool, true);
doParse(tikaProcessPool, true, false);
}
}

Expand All @@ -105,11 +127,13 @@ public void testExternalTikaSingleNoContent() throws Exception {
3000,
-1,
-1)) {
doParse(tikaProcessPool, false);
doParse(tikaProcessPool, false, false);
}
}

private void doParse(TikaProcessPool tikaProcessPool, boolean parseContent) throws Exception {
private void doParse(TikaProcessPool tikaProcessPool,
boolean parseContent,
boolean sendFilenameInsteadOfInputStream) throws Exception {
AtomicInteger numParsed = new AtomicInteger(0);
Runnable r = () -> {
try {
Expand All @@ -130,7 +154,7 @@ private void doParse(TikaProcessPool tikaProcessPool, boolean parseContent) thro
} else if (i % 4 == 1) {
path = pdfPath;
contentType = "application/pdf";
numExpectedMetadataElms = 41;
numExpectedMetadataElms = 44;
numContentCharsExpected = parseContent ? 1069 : 0;
} else if (i % 4 == 2) {
path = oneNoteFilePath;
Expand All @@ -144,19 +168,34 @@ private void doParse(TikaProcessPool tikaProcessPool, boolean parseContent) thro
numContentCharsExpected = parseContent ? 2648 : 0;
}
ByteArrayOutputStream contentOutputStream = new ByteArrayOutputStream();
try (FileInputStream fis = new FileInputStream(path)) {
if (sendFilenameInsteadOfInputStream) {
Metadata metadata = tikaProcessPool.parse(path,
contentType,
fis,
path,
contentOutputStream,
300000L,
maxBytesToParse
);
);
LOG.info("Metadata from the tika process: {}", metadata);
Assert.assertEquals(numExpectedMetadataElms, metadata.size());
//LOG.info("Content from the tika process: {}", contentOutputStream.toString("UTF-8"));
Assert.assertEquals(numContentCharsExpected, contentOutputStream.toString("UTF-8").length());
numParsed.incrementAndGet();
} else {
try (FileInputStream fis = new FileInputStream(path)) {
Metadata metadata = tikaProcessPool.parse(path,
contentType,
fis,
contentOutputStream,
300000L,
maxBytesToParse
);
LOG.info("Metadata from the tika process: {}", metadata);
Assert.assertEquals(numExpectedMetadataElms, metadata.size());
//LOG.info("Content from the tika process: {}", contentOutputStream.toString("UTF-8"));
Assert.assertEquals(numContentCharsExpected, contentOutputStream.toString("UTF-8").length());
numParsed.incrementAndGet();
}
}
}
} catch (Exception ex) {
Expand Down Expand Up @@ -302,4 +341,41 @@ public void testTikaProcessMaxBytesParsed() throws Exception {
}
}
}


/**
 * Sends the encrypted PowerPoint test file through a pooled tika process and asserts that
 * nothing was written to the content output stream.
 */
@Test
public void testTikaProcessEncryptedPpt() throws Exception {
    final String workDir = System.getProperty("java.io.tmpdir");
    try (TikaProcessPool tikaProcessPool = new TikaProcessPool(
        javaPath, workDir, tikaDistPath,
        200, parseProperties,
        0, -1, 3,
        true,
        30000, 1000, 5000, -1)) {
        final String path = encryptedPpt;
        final String contentType = "application/vnd.ms-powerpoint";
        final ByteArrayOutputStream contentOutputStream = new ByteArrayOutputStream();

        try (FileInputStream fis = new FileInputStream(path)) {
            // The returned metadata is not inspected; only the extracted content matters here.
            tikaProcessPool.parse(path, contentType, fis, contentOutputStream, 300000L, 100);

            final String content = contentOutputStream.toString("UTF-8");
            LOG.info("Content from the tika process: {}", content);
            Assert.assertEquals(0, content.length());
        }
    }
}
}
1 change: 1 addition & 0 deletions tika-fork-client/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</Appenders>
<Loggers>
<logger name="org.apache.pdfbox" level="ERROR" />
<logger name="org.apache.tika.parser.microsoft.onenote" level="ERROR" />
<logger name="org.apache.tika.config.InitializableProblemHandler" level="ERROR" />
<Root level="INFO">
<AppenderRef ref="Console"/>
Expand Down
Binary file added tika-fork-client/test-files/encrypted.ppt
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ private void run() throws Exception {
} catch (IOException e1) {
LOG.debug("Couldn't close content output stream.");
}
LOG.error("Could not parse file", e);
throw new RuntimeException("Could not parse file", e);
} finally {
latch.countDown();
Expand All @@ -152,6 +153,7 @@ private void run() throws Exception {
} catch (IOException e1) {
LOG.debug("Couldn't close metadata output stream.");
}
LOG.error("Could not parse metadata", e);
throw new RuntimeException("Could not write metadata", e);
} finally {
latch.countDown();
Expand Down
1 change: 1 addition & 0 deletions tika-fork-main/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</Appenders>
<Loggers>
<logger name="org.apache.pdfbox" level="ERROR" />
<logger name="org.apache.tika.parser.microsoft.onenote" level="ERROR" />
<logger name="org.apache.tika.config" level="ERROR" />
<Root level="INFO">
<AppenderRef ref="Console"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class TikaForkMainTest {
String txtPath = "test-files" + File.separator + "out.txt";
String bombFilePath = "test-files" + File.separator + "bomb.xls";
String zipBombPath = "test-files" + File.separator + "zip-bomb.zip";
String encryptedPptPath = "test-files" + File.separator + "encrypted.ppt";

public static Integer findRandomOpenPortOnAllLocalInterfaces() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
Expand Down Expand Up @@ -790,4 +791,34 @@ public void testMaxBytesZipBomb() throws Exception {

singleThreadEx.shutdownNow();
}

/**
 * Starts {@code TikaForkMain} on a background thread, parses the encrypted PowerPoint test
 * file through a {@link TikaRunner} and prints the resulting metadata.
 *
 * <p>The executor is shut down in a {@code finally} block so the background thread is
 * interrupted even when the parse throws, instead of leaking into later tests.
 */
@Test
public void testEncryptedPpt() throws Exception {
    ExecutorService singleThreadEx = Executors.newSingleThreadExecutor();
    try {
        singleThreadEx.execute(() -> {
            try {
                TikaForkMain.main(args);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });

        TikaRunner tikaRunner = new TikaRunner(contentInServerPort, metadataOutServerPort, contentOutServerPort, true);

        ByteArrayOutputStream contentOutputStream = new ByteArrayOutputStream();
        try (FileInputStream fis = new FileInputStream(encryptedPptPath)) {
            Metadata metadata = tikaRunner.parse(encryptedPptPath,
                "application/vnd.ms-powerpoint",
                fis,
                contentOutputStream,
                4000L,
                400000
            );

            System.out.println(metadata);
        }
    } finally {
        // Always stop the background TikaForkMain thread, including on test failure.
        singleThreadEx.shutdownNow();
    }
}
}

0 comments on commit 952163a

Please sign in to comment.