diff --git a/gradle.properties b/gradle.properties index c9b92e3..e5c5121 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ # version -version=1.0b09 +version=1.0b10 # deps tikaVersion=1.20 @@ -8,4 +8,9 @@ commonsPoolVersion=2.6.2 junitVersion=4.12 jacksonDatabindVersion=2.4.4 args4jVersion=2.33 -commonsIoVersion=2.6 \ No newline at end of file +commonsIoVersion=2.6 + +# testing deps + +jettyVersion=9.4.19.v20190610 +httpClientVersion=4.5.6 \ No newline at end of file diff --git a/tika-fork-client/build.gradle b/tika-fork-client/build.gradle index d36bc3b..63fcb15 100644 --- a/tika-fork-client/build.gradle +++ b/tika-fork-client/build.gradle @@ -21,17 +21,15 @@ configurations.all { } dependencies { - compile "org.slf4j:slf4j-api:${slf4jVersion}" - compile "org.slf4j:slf4j-log4j12:${slf4jVersion}" - compile "org.slf4j:slf4j-simple:${slf4jVersion}" - compile "org.slf4j:jcl-over-slf4j:${slf4jVersion}" - compile "org.slf4j:jul-to-slf4j:${slf4jVersion}" + compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.9.0' compile group: "org.apache.commons", name: "commons-pool2", version: "${commonsPoolVersion}" compile group: 'commons-io', name: 'commons-io', version: "${commonsIoVersion}" compile "org.apache.tika:tika-core:${tikaVersion}" testCompile group: "junit", name: "junit", version: "${junitVersion}" + testCompile "org.eclipse.jetty:jetty-server:${jettyVersion}" + testCompile "org.apache.httpcomponents:httpclient:${httpClientVersion}" } publishing { diff --git a/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcess.java b/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcess.java index 87f4191..faece51 100644 --- a/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcess.java +++ b/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcess.java @@ -49,15 +49,15 @@ public class TikaProcess { private TikaRunner tikaRunner; public TikaProcess(String javaPath, - String configDirectoryPath, + String workDirectoryPath, String tikaDistPath, int tikaMaxHeapSizeMb, Properties parserProperties) { parseContent = Boolean.parseBoolean(parserProperties.getProperty("parseContent", "false")); - parseConfigPropertiesFilePath = configDirectoryPath + File.separator + "tikafork-config-" + runUuid + ".properties"; - parseContextPropertiesFilePath = configDirectoryPath + File.separator + "tikafork-context-" + runUuid + ".properties"; - portsFilePath = configDirectoryPath + File.separator + "tikafork-ports-" + runUuid + ".properties"; + parseConfigPropertiesFilePath = workDirectoryPath + File.separator + "tikafork-config-" + runUuid + ".properties"; + parseContextPropertiesFilePath = workDirectoryPath + File.separator + "tikafork-context-" + runUuid + ".properties"; + portsFilePath = workDirectoryPath + File.separator + "tikafork-ports-" + runUuid + ".properties"; command = new ArrayList<>(); command.add(javaPath == null || javaPath.trim().length() == 0 ? CURRENT_JAVA_BINARY : javaPath); @@ -79,9 +79,9 @@ public TikaProcess(String javaPath, command.add("-parserPropertiesFilePath"); command.add(parseConfigPropertiesFilePath); - if (configDirectoryPath != null && configDirectoryPath.trim().length() > 0) { - command.add("-configDirectoryPath"); - command.add(configDirectoryPath); + if (workDirectoryPath != null && workDirectoryPath.trim().length() > 0) { + command.add("-workDirectoryPath"); + command.add(workDirectoryPath); } try { process = new ProcessBuilder(command) @@ -157,7 +157,10 @@ private void inheritIO(final InputStream src) { Scanner sc = new Scanner(src); while (sc.hasNextLine()) { String nextLine = sc.nextLine(); - LOG.info(nextLine); + // Do not log stuff that snuck into stdout. + if (nextLine != null && nextLine.startsWith("TIKAFORK")) { + LOG.info(nextLine.substring(8)); + } } }).start(); } diff --git a/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcessPool.java b/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcessPool.java index a0fc7eb..86d8130 100644 --- a/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcessPool.java +++ b/tika-fork-client/src/main/java/org/apache/tika/client/TikaProcessPool.java @@ -16,7 +16,7 @@ public class TikaProcessPool implements AutoCloseable { private GenericObjectPool pool; public TikaProcessPool(String javaPath, - String configDirectoryPath, + String workDirectoryPath, String tikaDistPath, int tikaMaxHeapSizeMb, Properties parseProperties, @@ -29,7 +29,7 @@ public TikaProcessPool(String javaPath, long minEvictableIdleTimeMillis, long softMinEvictableIdleTimeMillis) throws Exception { pool = initializePool(javaPath, - configDirectoryPath, + workDirectoryPath, tikaDistPath, tikaMaxHeapSizeMb, parseProperties, @@ -71,7 +71,7 @@ public Metadata parse(String baseUri, } public static GenericObjectPool initializePool(String javaPath, - String configDirectoryPath, + String workDirectoryPath, String tikaDistDir, int tikaMaxHeapSizeMb, Properties parseProperties, @@ -96,7 +96,7 @@ public static GenericObjectPool initializePool(String javaPath, config.setBlockWhenExhausted(blockWhenExhausted); GenericObjectPool pool = new GenericObjectPool(new TikaProcessFactory(javaPath, - configDirectoryPath, + workDirectoryPath, tikaDistDir, tikaMaxHeapSizeMb, parseProperties), config); diff --git a/tika-fork-client/src/main/java/org/apache/tika/client/TikaRunner.java b/tika-fork-client/src/main/java/org/apache/tika/client/TikaRunner.java index 7ab0772..01e9b7d 100644 --- a/tika-fork-client/src/main/java/org/apache/tika/client/TikaRunner.java +++ b/tika-fork-client/src/main/java/org/apache/tika/client/TikaRunner.java @@ -29,6 +29,8 @@ public class TikaRunner { private int metadataOutPort = 0; private int contentOutPort = 0; private boolean parseContent; + private static final int DEFAULT_BUFFER_SIZE = 1024 * 4; + private static final int EOF = -1; class TikaRunnerThreadFactory implements ThreadFactory { public Thread newThread(Runnable r) { @@ -54,9 +56,9 @@ public Metadata parse(String baseUri, long maxBytesToParse) throws InterruptedException, ExecutionException, TimeoutException { ExecutorService es = Executors.newFixedThreadPool(3, new TikaRunnerThreadFactory()); try { - es.execute(() -> { + es.submit(() -> { try { - writeContent(baseUri, contentType, contentInPort, contentInStream, maxBytesToParse); + writeContent(baseUri, contentType, contentInPort, contentInStream); } catch (Exception e) { throw new RuntimeException("Failed to send content stream to forked Tika parser JVM", e); } @@ -73,7 +75,7 @@ public Metadata parse(String baseUri, if (parseContent) { Future contentFuture = es.submit(() -> { try { - getContent(contentOutPort, contentOutputStream); + getContent(contentOutPort, contentOutputStream, maxBytesToParse); return true; } catch (Exception e) { throw new RuntimeException("Failed to read content from forked Tika parser JVM", e); @@ -85,10 +87,10 @@ public Metadata parse(String baseUri, contentFuture.get(250, TimeUnit.MILLISECONDS); break; } catch (TimeoutException e) { - LOG.debug("Still waiting for content from parse"); if (Instant.now().isAfter(mustFinishByInstant)) { throw e; } + LOG.debug("Still waiting for content from parse"); } } } @@ -99,10 +101,10 @@ public Metadata parse(String baseUri, metadataResult = metadataFuture.get(250, TimeUnit.MILLISECONDS); break; } catch (TimeoutException e) { - LOG.debug("Still waiting for metadata from parse"); if (Instant.now().isAfter(mustFinishByInstant)) { throw e; } + LOG.debug("Still waiting for metadata from parse"); } } es.shutdown(); @@ -117,18 +119,16 @@ public Metadata parse(String baseUri, private void writeContent(String baseUri, String contentType, int port, - InputStream contentInStream, - long maxBytesToParse) throws Exception { + InputStream contentInStream) throws Exception { Socket socket = getSocket(InetAddress.getLocalHost().getHostAddress(), port); - try (OutputStream out = socket.getOutputStream(); - BoundedInputStream boundedInputStream = new BoundedInputStream(contentInStream, maxBytesToParse)) { + try (OutputStream out = socket.getOutputStream()) { out.write(baseUri.getBytes()); out.write('\n'); out.write(contentType.getBytes()); out.write('\n'); long numChars; do { - numChars = IOUtils.copy(boundedInputStream, out); + numChars = IOUtils.copy(contentInStream, out); } while (numChars > 0); } finally { socket.close(); @@ -150,12 +150,12 @@ private Metadata getMetadata(int port, String baseUri) throws Exception { } } - private void getContent(int port, OutputStream contentOutputStream) throws Exception { + private void getContent(int port, OutputStream contentOutputStream, long maxBytesToParse) throws Exception { Socket socket = getSocket(InetAddress.getLocalHost().getHostAddress(), port); - try (InputStream in = socket.getInputStream()) { + try (BoundedInputStream boundedInputStream = new BoundedInputStream(socket.getInputStream(), maxBytesToParse)) { long numChars; do { - numChars = IOUtils.copy(in, contentOutputStream); + numChars = IOUtils.copy(boundedInputStream, contentOutputStream); } while (numChars > 0); } finally { socket.close(); diff --git a/tika-fork-client/src/main/resources/log4j.properties b/tika-fork-client/src/main/resources/log4j.properties deleted file mode 100644 index 089e424..0000000 --- a/tika-fork-client/src/main/resources/log4j.properties +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -#info,debug, error,fatal ... -log4j.rootLogger=info,stdout - -#console -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.Target=System.out - -log4j.appender.stdout.layout.ConversionPattern= %-5p [%t]: %m%n diff --git a/tika-fork-client/src/test/resources/log2j.xml b/tika-fork-client/src/main/resources/log4j2.xml similarity index 62% rename from tika-fork-client/src/test/resources/log2j.xml rename to tika-fork-client/src/main/resources/log4j2.xml index 2270729..c94cfd8 100644 --- a/tika-fork-client/src/test/resources/log2j.xml +++ b/tika-fork-client/src/main/resources/log4j2.xml @@ -6,9 +6,8 @@ - - - + + diff --git a/tika-fork-client/src/test/java/org/apache/tika/fork/TikaDeadParseTest.java b/tika-fork-client/src/test/java/org/apache/tika/fork/TikaDeadParseTest.java new file mode 100644 index 0000000..f219e96 --- /dev/null +++ b/tika-fork-client/src/test/java/org/apache/tika/fork/TikaDeadParseTest.java @@ -0,0 +1,156 @@ +package org.apache.tika.fork; + +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.tika.client.TikaProcessPool; +import org.apache.tika.metadata.Metadata; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Test parses from a web server that freezes up during the request. + */ +public class TikaDeadParseTest { + private static final Logger LOG = LoggerFactory.getLogger(TikaDeadParseTest.class); + + String tikaDistPath; + String javaPath = "java"; + Properties parseProperties; + long maxBytesToParse = 256000000; + int numThreads = 5; + int numRepeats = 2; + int maxRandomDelay = 4000; + + @Before + public void init() { + tikaDistPath = ".." + File.separator + "tika-fork-main" + File.separator + "build" + File.separator + "dist"; + parseProperties = new Properties(); + parseProperties.setProperty("parseContent", "true"); + } + + public static Integer findRandomOpenPortOnAllLocalInterfaces() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + + @Test + public void testFrozenParse() throws Exception { + Integer port = findRandomOpenPortOnAllLocalInterfaces(); + final Server server = new Server(port); + Thread neverReturnsFromThread = new Thread(() -> { + try { + server.setStopTimeout(-1); + server.setHandler(new AbstractHandler() { + @Override + public void handle(String target, + Request baseRequest, + HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + response.setContentType("text/html;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + baseRequest.setHandled(true); + LOG.info(target + " is the request"); + response.getWriter().println("start the fun."); + response.getWriter().flush(); + int count = 10; + while (--count > 0) { + try { + Thread.sleep(1000); + response.getWriter().println("continue the fun."); + response.getWriter().flush(); + } catch (InterruptedException e) { + } + } + response.getWriter().println("end the fun."); + LOG.info("Done with web request."); + } + }); + server.start(); + } catch (Exception e) { + System.err.println("Couldn't create embedded jetty server"); + e.printStackTrace(); + } + }); + try (TikaProcessPool tikaProcessPool = new TikaProcessPool(javaPath, + System.getProperty("java.io.tmpdir"), + tikaDistPath, + 200, + parseProperties, + -1, + -1, + 20, + true, + 6000, + 3000, + -1, + -1)) { + AtomicReference exc = new AtomicReference<>(); + neverReturnsFromThread.start(); + Runnable r = () -> { + for (int i = 0; i < numRepeats; ++i) { + try { + LOG.info("Thread {} repeat {}", Thread.currentThread().getId(), i); + Thread.sleep(new Random().nextInt(maxRandomDelay)); + String uri = "http://" + InetAddress.getLocalHost().getHostAddress() + ":" + port; + LOG.info("Server is started on {}", uri); + CloseableHttpClient client = HttpClients.createDefault(); + HttpGet httpGet = new HttpGet(uri); + ByteArrayOutputStream contentOutputStream = new ByteArrayOutputStream(); + try (InputStream is = client.execute(httpGet).getEntity().getContent()) { + LOG.info("Starting the parse"); + Metadata metadata = tikaProcessPool.parse(uri, + "text/html", + is, + contentOutputStream, + 3000L, + maxBytesToParse + ); + } catch (TimeoutException timeoutEx) { + LOG.info("Timeout exception. Good.", timeoutEx); + } + } catch (IOException e) { + LOG.info("Exception is OK - couldn't close the socket."); + } catch (Exception e) { + exc.set(e); + return; + } + } + }; + List ts = new ArrayList<>(); + for (int i = 0; i < numThreads; ++i) { + Thread t = new Thread(r); + t.start(); + ts.add(t); + } + for (Thread t : ts) { + t.join(); + } + if (exc.get() != null) { + throw exc.get(); + } + } + } +} \ No newline at end of file diff --git a/tika-fork-client/src/test/java/org/apache/tika/fork/TikaProcessTest.java b/tika-fork-client/src/test/java/org/apache/tika/fork/TikaProcessTest.java index 67abd1f..374d7a6 100644 --- a/tika-fork-client/src/test/java/org/apache/tika/fork/TikaProcessTest.java +++ b/tika-fork-client/src/test/java/org/apache/tika/fork/TikaProcessTest.java @@ -30,6 +30,7 @@ public class TikaProcessTest { String xlsPath = "test-files" + File.separator + "xls-sample.xls"; String txtPath = "test-files" + File.separator + "out.txt"; String bombFilePath = "test-files" + File.separator + "bomb.xls"; + String zipBombPath = "test-files" + File.separator + "zip-bomb.zip"; String bombContentType = "application/vnd.ms-excel"; Properties parseProperties; long maxBytesToParse = 256000000; @@ -174,8 +175,12 @@ private void doParse(TikaProcessPool tikaProcessPool, boolean parseContent) thro Assert.assertEquals(numFilesPerThread * numThreads, numParsed.get()); } + /** + * This test will run an XLS file known to blow up tika. This will cause an OOM in the fork process but it should + * gracefully return. + */ @Test - public void testExternalTikaBombSingleThread() throws Exception { + public void testExternalTikaXlsBombSingleThread() throws Exception { try (TikaProcessPool tikaProcessPool = new TikaProcessPool(javaPath, System.getProperty("java.io.tmpdir"), tikaDistPath, @@ -197,6 +202,33 @@ public void testExternalTikaBombSingleThread() throws Exception { } } + /** + * This test will stream until it hits maxBytesToParse. Then it will stop. It will have all the partial bytes up + * until it stops. + */ + @Test + public void testExternalTikaBombZipWithCsvSingleThread() throws Exception { + try (TikaProcessPool tikaProcessPool = new TikaProcessPool(javaPath, + System.getProperty("java.io.tmpdir"), + tikaDistPath, + 200, + parseProperties, + 1, + 1, + 1, + true, + 30000, + 3000, + -1, + -1)) { + ByteArrayOutputStream contentOutputStream = new ByteArrayOutputStream(); + try (FileInputStream fis = new FileInputStream(zipBombPath)) { + tikaProcessPool.parse(zipBombPath, "application/zip", fis, contentOutputStream, 300000L, maxBytesToParse); + Assert.assertEquals(maxBytesToParse, contentOutputStream.toString("UTF-8").length()); + } + } + } + @Test public void testTikaParseTimeoutExceeded() throws Exception { try (TikaProcessPool tikaProcessPool = new TikaProcessPool(javaPath, diff --git a/tika-fork-main/src/main/resources/log2j.xml b/tika-fork-client/src/test/resources/log4j2.xml similarity index 62% rename from tika-fork-main/src/main/resources/log2j.xml rename to tika-fork-client/src/test/resources/log4j2.xml index 2270729..c94cfd8 100644 --- a/tika-fork-main/src/main/resources/log2j.xml +++ b/tika-fork-client/src/test/resources/log4j2.xml @@ -6,9 +6,8 @@ - - - + + diff --git a/tika-fork-main/build.gradle b/tika-fork-main/build.gradle index fdaace6..c9c3a71 100644 --- a/tika-fork-main/build.gradle +++ b/tika-fork-main/build.gradle @@ -23,11 +23,7 @@ dependencies { compile "org.apache.tika:tika-core:${tikaVersion}" compile "org.apache.tika:tika-parsers:${tikaVersion}" compile "args4j:args4j:${args4jVersion}" - compile "org.slf4j:slf4j-api:${slf4jVersion}" - compile "org.slf4j:slf4j-log4j12:${slf4jVersion}" - compile "org.slf4j:slf4j-simple:${slf4jVersion}" - compile "org.slf4j:jcl-over-slf4j:${slf4jVersion}" - compile "org.slf4j:jul-to-slf4j:${slf4jVersion}" + compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.9.0' testCompile group: "junit", name: "junit", version: "${junitVersion}" } diff --git a/tika-fork-main/src/main/java/org/apache/tika/fork/main/TikaForkMain.java b/tika-fork-main/src/main/java/org/apache/tika/fork/main/TikaForkMain.java index bcf1510..a8100d3 100644 --- a/tika-fork-main/src/main/java/org/apache/tika/fork/main/TikaForkMain.java +++ b/tika-fork-main/src/main/java/org/apache/tika/fork/main/TikaForkMain.java @@ -52,8 +52,8 @@ private static TikaParsingHandler getContentHandler(String mainUrl, return new TikaParsingHandler(mainUrl, out, main, linksHandler); } - @Option(name = "-configDirectoryPath", usage = "The directory that will contain the configuration files that communicate between the fork process and the client process.") - private String configDirectoryPath; + @Option(name = "-workDirectoryPath", usage = "The directory that will contain the tmp files that communicate between the fork process and the client process and tmp files for parsing tika.") + private String workDirectoryPath; @Option(name = "-parserPropertiesFilePath", usage = "The parse configuration file.") private String parserPropertiesFilePath; @Option(name = "-contentInServerPort", usage = "This is the port for the socket server that will be used to send in the file.") @@ -75,6 +75,15 @@ private static TikaParsingHandler getContentHandler(String mainUrl, boolean includeImages; private void run() throws Exception { + if (StringUtils.isBlank(workDirectoryPath)) { + workDirectoryPath = System.getProperty("java.io.tmpdir"); + } else { + if (workDirectoryPath.endsWith(File.separator)) { + workDirectoryPath = workDirectoryPath.substring(0, workDirectoryPath.length() - 1); + } + // set this here to prevent tika tmp files from being written to the default tmpdir which often is undesirable + System.setProperty("java.io.tmpdir", workDirectoryPath); + } parserProperties = new Properties(); if (StringUtils.isNotBlank(parserPropertiesFilePath)) { try (FileReader fr = new FileReader(parserPropertiesFilePath)) { @@ -85,17 +94,10 @@ private void run() throws Exception { Integer.parseInt(parserProperties.getProperty("zipBombCompressionRatio", "200")), Integer.parseInt(parserProperties.getProperty("zipBombMaxDepth", "200")), Integer.parseInt(parserProperties.getProperty("zipBombMaxPackageEntryDepth", "20"))); - if (StringUtils.isBlank(configDirectoryPath)) { - configDirectoryPath = System.getProperty("java.io.tmpdir"); - } else { - if (configDirectoryPath.endsWith(File.separator)) { - configDirectoryPath = configDirectoryPath.substring(0, configDirectoryPath.length() - 1); - } - } extractHtmlLinks = Boolean.parseBoolean(parserProperties.getProperty("extractHtmlLinks", "false")); includeImages = Boolean.parseBoolean(parserProperties.getProperty("includeImages", "false")); boolean parseContent = Boolean.parseBoolean(parserProperties.getProperty("parseContent", "true")); - String portsFilePath = configDirectoryPath + File.separator + "tikafork-ports-" + parserProperties.get("runUuid") + ".properties"; + String portsFilePath = workDirectoryPath + File.separator + "tikafork-ports-" + parserProperties.get("runUuid") + ".properties"; LOG.info("Tika ports file path: \"{}\"", portsFilePath); File portsFile = new File(portsFilePath); if (parseContent) { diff --git a/tika-fork-main/src/main/resources/log4j2.xml b/tika-fork-main/src/main/resources/log4j2.xml new file mode 100644 index 0000000..26aa23c --- /dev/null +++ b/tika-fork-main/src/main/resources/log4j2.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + \ No newline at end of file