Skip to content

Commit

Permalink
fix the problem where the maxBytesToParse was used in the write outgo…
Browse files Browse the repository at this point in the history
…ing content instead of the receiving content

remove the useless logging
add a test where tika attempts to parse files from an http server that will never return.
  • Loading branch information
nddipiazza committed Jun 26, 2019
1 parent 8b98d35 commit faaf75a
Show file tree
Hide file tree
Showing 13 changed files with 259 additions and 78 deletions.
9 changes: 7 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# version
version=1.0b09
version=1.0b10

# deps
tikaVersion=1.20
Expand All @@ -8,4 +8,9 @@ commonsPoolVersion=2.6.2
junitVersion=4.12
jacksonDatabindVersion=2.4.4
args4jVersion=2.33
commonsIoVersion=2.6
commonsIoVersion=2.6

# testing deps

jettyVersion=9.4.19.v20190610
httpClientVersion=4.5.6
8 changes: 3 additions & 5 deletions tika-fork-client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ configurations.all {
}

dependencies {
compile "org.slf4j:slf4j-api:${slf4jVersion}"
compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
compile "org.slf4j:slf4j-simple:${slf4jVersion}"
compile "org.slf4j:jcl-over-slf4j:${slf4jVersion}"
compile "org.slf4j:jul-to-slf4j:${slf4jVersion}"
compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.9.0'
compile group: "org.apache.commons", name: "commons-pool2", version: "${commonsPoolVersion}"
compile group: 'commons-io', name: 'commons-io', version: "${commonsIoVersion}"


compile "org.apache.tika:tika-core:${tikaVersion}"
testCompile group: "junit", name: "junit", version: "${junitVersion}"
testCompile "org.eclipse.jetty:jetty-server:${jettyVersion}"
testCompile "org.apache.httpcomponents:httpclient:${httpClientVersion}"
}

publishing {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public class TikaProcess {
private TikaRunner tikaRunner;

public TikaProcess(String javaPath,
String configDirectoryPath,
String workDirectoryPath,
String tikaDistPath,
int tikaMaxHeapSizeMb,
Properties parserProperties) {
parseContent = Boolean.parseBoolean(parserProperties.getProperty("parseContent", "false"));

parseConfigPropertiesFilePath = configDirectoryPath + File.separator + "tikafork-config-" + runUuid + ".properties";
parseContextPropertiesFilePath = configDirectoryPath + File.separator + "tikafork-context-" + runUuid + ".properties";
portsFilePath = configDirectoryPath + File.separator + "tikafork-ports-" + runUuid + ".properties";
parseConfigPropertiesFilePath = workDirectoryPath + File.separator + "tikafork-config-" + runUuid + ".properties";
parseContextPropertiesFilePath = workDirectoryPath + File.separator + "tikafork-context-" + runUuid + ".properties";
portsFilePath = workDirectoryPath + File.separator + "tikafork-ports-" + runUuid + ".properties";

command = new ArrayList<>();
command.add(javaPath == null || javaPath.trim().length() == 0 ? CURRENT_JAVA_BINARY : javaPath);
Expand All @@ -79,9 +79,9 @@ public TikaProcess(String javaPath,

command.add("-parserPropertiesFilePath");
command.add(parseConfigPropertiesFilePath);
if (configDirectoryPath != null && configDirectoryPath.trim().length() > 0) {
command.add("-configDirectoryPath");
command.add(configDirectoryPath);
if (workDirectoryPath != null && workDirectoryPath.trim().length() > 0) {
command.add("-workDirectoryPath");
command.add(workDirectoryPath);
}
try {
process = new ProcessBuilder(command)
Expand Down Expand Up @@ -157,7 +157,10 @@ private void inheritIO(final InputStream src) {
Scanner sc = new Scanner(src);
while (sc.hasNextLine()) {
String nextLine = sc.nextLine();
LOG.info(nextLine);
// Do not log stuff that snuck into stdout.
if (nextLine != null && nextLine.startsWith("TIKAFORK")) {
LOG.info(nextLine.substring(8));
}
}
}).start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class TikaProcessPool implements AutoCloseable {
private GenericObjectPool pool;

public TikaProcessPool(String javaPath,
String configDirectoryPath,
String workDirectoryPath,
String tikaDistPath,
int tikaMaxHeapSizeMb,
Properties parseProperties,
Expand All @@ -29,7 +29,7 @@ public TikaProcessPool(String javaPath,
long minEvictableIdleTimeMillis,
long softMinEvictableIdleTimeMillis) throws Exception {
pool = initializePool(javaPath,
configDirectoryPath,
workDirectoryPath,
tikaDistPath,
tikaMaxHeapSizeMb,
parseProperties,
Expand Down Expand Up @@ -71,7 +71,7 @@ public Metadata parse(String baseUri,
}

public static GenericObjectPool initializePool(String javaPath,
String configDirectoryPath,
String workDirectoryPath,
String tikaDistDir,
int tikaMaxHeapSizeMb,
Properties parseProperties,
Expand All @@ -96,7 +96,7 @@ public static GenericObjectPool initializePool(String javaPath,
config.setBlockWhenExhausted(blockWhenExhausted);

GenericObjectPool pool = new GenericObjectPool<TikaProcess>(new TikaProcessFactory(javaPath,
configDirectoryPath,
workDirectoryPath,
tikaDistDir,
tikaMaxHeapSizeMb,
parseProperties), config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class TikaRunner {
private int metadataOutPort = 0;
private int contentOutPort = 0;
private boolean parseContent;
private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
private static final int EOF = -1;

class TikaRunnerThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Expand All @@ -54,9 +56,9 @@ public Metadata parse(String baseUri,
long maxBytesToParse) throws InterruptedException, ExecutionException, TimeoutException {
ExecutorService es = Executors.newFixedThreadPool(3, new TikaRunnerThreadFactory());
try {
es.execute(() -> {
es.submit(() -> {
try {
writeContent(baseUri, contentType, contentInPort, contentInStream, maxBytesToParse);
writeContent(baseUri, contentType, contentInPort, contentInStream);
} catch (Exception e) {
throw new RuntimeException("Failed to send content stream to forked Tika parser JVM", e);
}
Expand All @@ -73,7 +75,7 @@ public Metadata parse(String baseUri,
if (parseContent) {
Future contentFuture = es.submit(() -> {
try {
getContent(contentOutPort, contentOutputStream);
getContent(contentOutPort, contentOutputStream, maxBytesToParse);
return true;
} catch (Exception e) {
throw new RuntimeException("Failed to read content from forked Tika parser JVM", e);
Expand All @@ -85,10 +87,10 @@ public Metadata parse(String baseUri,
contentFuture.get(250, TimeUnit.MILLISECONDS);
break;
} catch (TimeoutException e) {
LOG.debug("Still waiting for content from parse");
if (Instant.now().isAfter(mustFinishByInstant)) {
throw e;
}
LOG.debug("Still waiting for content from parse");
}
}
}
Expand All @@ -99,10 +101,10 @@ public Metadata parse(String baseUri,
metadataResult = metadataFuture.get(250, TimeUnit.MILLISECONDS);
break;
} catch (TimeoutException e) {
LOG.debug("Still waiting for metadata from parse");
if (Instant.now().isAfter(mustFinishByInstant)) {
throw e;
}
LOG.debug("Still waiting for metadata from parse");
}
}
es.shutdown();
Expand All @@ -117,18 +119,16 @@ public Metadata parse(String baseUri,
private void writeContent(String baseUri,
String contentType,
int port,
InputStream contentInStream,
long maxBytesToParse) throws Exception {
InputStream contentInStream) throws Exception {
Socket socket = getSocket(InetAddress.getLocalHost().getHostAddress(), port);
try (OutputStream out = socket.getOutputStream();
BoundedInputStream boundedInputStream = new BoundedInputStream(contentInStream, maxBytesToParse)) {
try (OutputStream out = socket.getOutputStream()) {
out.write(baseUri.getBytes());
out.write('\n');
out.write(contentType.getBytes());
out.write('\n');
long numChars;
do {
numChars = IOUtils.copy(boundedInputStream, out);
numChars = IOUtils.copy(contentInStream, out);
} while (numChars > 0);
} finally {
socket.close();
Expand All @@ -150,12 +150,12 @@ private Metadata getMetadata(int port, String baseUri) throws Exception {
}
}

private void getContent(int port, OutputStream contentOutputStream) throws Exception {
private void getContent(int port, OutputStream contentOutputStream, long maxBytesToParse) throws Exception {
Socket socket = getSocket(InetAddress.getLocalHost().getHostAddress(), port);
try (InputStream in = socket.getInputStream()) {
try (BoundedInputStream boundedInputStream = new BoundedInputStream(socket.getInputStream(), maxBytesToParse)) {
long numChars;
do {
numChars = IOUtils.copy(in, contentOutputStream);
numChars = IOUtils.copy(boundedInputStream, contentOutputStream);
} while (numChars > 0);
} finally {
socket.close();
Expand Down
24 changes: 0 additions & 24 deletions tika-fork-client/src/main/resources/log4j.properties

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
</Console>
</Appenders>
<Loggers>
<logger name="org.apache.pdfbox" level="WARN" />
<logger name="org.apache.pdfbox.pdmodel.font" level="ERROR" />
<logger name="org.apache.pdfbox.contentstream.PDFStreamEngine" level="ERROR" />
<logger name="org.apache.pdfbox" level="ERROR" />
<logger name="org.apache.tika.config.InitializableProblemHandler" level="ERROR" />
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
Expand Down
Loading

0 comments on commit faaf75a

Please sign in to comment.