Skip to content

Commit

Permalink
communicate the content type and the base URI to the process via the
Browse files Browse the repository at this point in the history
contentInStream instead of communicating that via config file.

Fix the log4j setup so that logging is inherited correctly.

create a health check for kubernetes purposes
  • Loading branch information
nddipiazza committed Jun 1, 2019
1 parent 169f518 commit c3c188f
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 97 deletions.
8 changes: 7 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# version
version=1.0b03

# deps
tikaVersion=1.20
slf4jVersion=1.7.26
commonsPoolVersion=2.6.2
junitVersion=4.12
junitVersion=4.12
jacksonDatabindVersion=2.4.4
args4jVersion=2.33
15 changes: 12 additions & 3 deletions tika-fork-main/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ plugins {
}

group "org.apache.tika"
version "1.0b02"

sourceCompatibility = 1.8

Expand All @@ -15,10 +14,20 @@ jar {
baseName "tika-fork-main"
}

configurations.all {
exclude module: 'slf4j-log4j12'
}

dependencies {
compile "com.fasterxml.jackson.core:jackson-databind:2.4.4"
compile "com.fasterxml.jackson.core:jackson-databind:${jacksonDatabindVersion}"
compile "org.apache.tika:tika-core:${tikaVersion}"
compile "org.apache.tika:tika-parsers:${tikaVersion}"
compile "args4j:args4j:${args4jVersion}"
compile "org.slf4j:slf4j-api:${slf4jVersion}"
compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
compile "org.slf4j:slf4j-simple:${slf4jVersion}"
compile "org.slf4j:jcl-over-slf4j:${slf4jVersion}"
compile "org.slf4j:jul-to-slf4j:${slf4jVersion}"
testCompile group: "junit", name: "junit", version: "${junitVersion}"
}

Expand All @@ -32,6 +41,6 @@ task copyJarToDist(type: Copy) {
into new File(buildDir, "dist")
}

task buildWithDependencies(dependsOn: [build, copyDependencies, copyJarToDist]) {
task dist(dependsOn: [build, copyDependencies, copyJarToDist]) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.apache.tika.main;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;

/**
* Useful for kubernetes. Gives you something for the keepalive check.
* Will return the number of milliseconds since it has seen a parse request.
*/
/**
 * Minimal single-threaded HTTP endpoint intended for liveness/keepalive probes.
 *
 * <p>Every well-formed {@code GET} request is answered with a plain-text body holding
 * the number of milliseconds elapsed since {@link #LAST_UPDATE} was last written.
 * Anything else is answered with {@code 400 Bad Request}. Connections are handled one
 * at a time on the thread that executes {@link #run()}.
 */
public class HealthCheckServer implements Runnable {
    /**
     * Timestamp (epoch milliseconds) of the most recent activity. It is assigned by
     * other threads and read by the health-check thread, hence {@code volatile} so the
     * reader always observes the latest write.
     */
    public static volatile long LAST_UPDATE = System.currentTimeMillis();

    private static final String newLine = "\r\n";

    /** Charset used for the HTTP request line, headers and the numeric body. */
    private static final String HTTP_CHARSET = "US-ASCII";

    int port;

    /**
     * @param port TCP port the health check listens on
     */
    public HealthCheckServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and serves requests until accepting fails.
     * A failure while handling one request is reported on stderr and does not stop the
     * server; a failure to bind or accept ends the loop and is reported on stderr.
     */
    @Override
    public void run() {
        // try-with-resources releases the listening port when the loop terminates
        try (ServerSocket socket = new ServerSocket(port)) {
            while (true) {
                // accept() stays outside the per-request try so that an accept failure
                // ends the server instead of spinning in the loop
                Socket connection = socket.accept();

                // the accepted socket is closed on every path: normal completion,
                // empty request, or exception
                try (Socket client = connection) {
                    handle(client);
                } catch (Throwable tri) {
                    System.err.println("Error handling request: " + tri);
                }
            }
        } catch (Throwable tr) {
            System.err.println("Could not start server: " + tr);
        }
    }

    /**
     * Reads one HTTP request from the connection and writes the response.
     * Returns without writing anything when the peer closed the connection before
     * sending a request line. The caller is responsible for closing the socket.
     */
    private static void handle(Socket connection) throws Exception {
        BufferedReader in = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), HTTP_CHARSET));

        // read first line of request
        String request = in.readLine();
        if (request == null) {
            return;
        }

        // the remaining header lines are consumed and ignored
        String header;
        do {
            header = in.readLine();
        } while (header != null && header.length() > 0);

        PrintStream pout = new PrintStream(
                new BufferedOutputStream(connection.getOutputStream()), false, HTTP_CHARSET);

        boolean isGet = request.startsWith("GET ");
        boolean knownVersion = request.endsWith(" HTTP/1.0") || request.endsWith(" HTTP/1.1");
        if (!isGet || !knownVersion) {
            // bad request
            pout.print("HTTP/1.0 400 Bad Request" + newLine + newLine);
        } else {
            String response = String.valueOf(System.currentTimeMillis() - LAST_UPDATE);

            pout.print(
                    "HTTP/1.0 200 OK" + newLine +
                    "Content-Type: text/plain" + newLine +
                    "Date: " + new Date() + newLine +
                    "Content-length: " + response.length() + newLine + newLine +
                    response
            );
        }

        // push the buffered response out before the caller closes the socket
        pout.flush();
    }
}
140 changes: 71 additions & 69 deletions tika-fork-main/src/main/java/org/apache/tika/main/TikaForkMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.html.HtmlMapper;
import org.apache.tika.parser.pdf.PDFParserConfig;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -32,8 +32,6 @@
import java.io.PipedOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -55,41 +53,61 @@ private static TikaParsingHandler getContentHandler(String mainUrl,
return new TikaParsingHandler(mainUrl, out, main, linksHandler);
}

@Option(name = "-configDirectoryPath", usage = "The directory that will contain the configuration files that communicate between the fork process and the client process.")
private String configDirectoryPath;
@Option(name = "-parserPropertiesFilePath", usage = "The parse configuration file.")
private String parserPropertiesFilePath;
@Option(name = "-contentInServerPort", usage = "This is the port for the socket server that will be used to send in the file.")
private int contentInServerPort = 0;
@Option(name = "-metadataOutServerPort", usage = "This is the port for the socket server that will be used to send out the parsed metadata.")
private int metadataOutServerPort = 0;
@Option(name = "-contentOutServerPort", usage = "This is the port for the socket server that will be used to send out the parsed file contents.")
private int contentOutServerPort = 0;
@Option(name = "-healthCheckPort", usage = "If specified, this is the port that will be used for a health check service.")
private int healthCheckPort = 0;

private ServerSocket contentInServerSocket;
private ServerSocket metadataOutServerSocket;
private ServerSocket contentOutServerSocket;

private Properties parserProperties;
private ConfigurableAutoDetectParser defaultParser;
private Detector detector = new DefaultDetector();
private String configDirectoryPath;

public TikaForkMain(String configDirectoryPath, Properties parseProperties){
this.parserProperties = parseProperties;
boolean extractHtmlLinks;
boolean includeImages;

private void run() throws Exception {
parserProperties = new Properties();
if (StringUtils.isNotBlank(parserPropertiesFilePath)) {
try (FileReader fr = new FileReader(parserPropertiesFilePath)) {
parserProperties.load(fr);
}
}
defaultParser = new ConfigurableAutoDetectParser(detector,
Integer.parseInt(parseProperties.getProperty("zipBombCompressionRatio", "200")),
Integer.parseInt(parseProperties.getProperty("zipBombMaxDepth", "200")),
Integer.parseInt(parseProperties.getProperty("zipBombMaxPackageEntryDepth", "20")));
Integer.parseInt(parserProperties.getProperty("zipBombCompressionRatio", "200")),
Integer.parseInt(parserProperties.getProperty("zipBombMaxDepth", "200")),
Integer.parseInt(parserProperties.getProperty("zipBombMaxPackageEntryDepth", "20")));
if (StringUtils.isBlank(configDirectoryPath)) {
this.configDirectoryPath = System.getProperty("java.io.tmpdir");
configDirectoryPath = System.getProperty("java.io.tmpdir");
} else {
if (configDirectoryPath.endsWith(File.separator)) {
configDirectoryPath = configDirectoryPath.substring(0, configDirectoryPath.length() - 1);
}
this.configDirectoryPath = configDirectoryPath;
}
}

private void run() throws Exception {
if (Boolean.parseBoolean(parserProperties.getProperty("parseContent", "false"))) {
extractHtmlLinks = Boolean.parseBoolean(parserProperties.getProperty("extractHtmlLinks", "false"));
includeImages = Boolean.parseBoolean(parserProperties.getProperty("includeImages", "false"));
ExecutorService keepAliveEs = Executors.newSingleThreadExecutor();
boolean parseContent = Boolean.parseBoolean(parserProperties.getProperty("parseContent", "true"));
String portsFilePath = configDirectoryPath + File.separator + "tikafork-ports-" + parserProperties.get("runUuid") + ".properties";
LOG.info("Tika ports file path: \"{}\"", portsFilePath);
File portsFile = new File(portsFilePath);
if (parseContent) {
ExecutorService es = Executors.newFixedThreadPool(3);
String portsFilePath = configDirectoryPath + File.separator + "tikafork-ports-" + parserProperties.get("runUuid") + ".properties";
LOG.info("Tika ports file path: \"{}\"", portsFilePath);
File portsFile = new File(portsFilePath);

try {
contentInServerSocket = new ServerSocket(0);
metadataOutServerSocket = new ServerSocket(0);
contentOutServerSocket = new ServerSocket(0);
contentInServerSocket = new ServerSocket(contentInServerPort);
metadataOutServerSocket = new ServerSocket(metadataOutServerPort);
contentOutServerSocket = new ServerSocket(contentOutServerPort);

FileUtils.writeLines(portsFile,
Lists.newArrayList(
Expand All @@ -99,6 +117,10 @@ private void run() throws Exception {
)
);

if (healthCheckPort > 0) {
keepAliveEs.execute(new HealthCheckServer(healthCheckPort));
}

while (true) {
final PipedInputStream metadataInputStream = new PipedInputStream();
final PipedOutputStream metadataOutputStream = new PipedOutputStream();
Expand Down Expand Up @@ -178,13 +200,10 @@ private void run() throws Exception {
}
} else {
ExecutorService es = Executors.newFixedThreadPool(2);
String portsFilePath = configDirectoryPath + File.separator + "tikafork-ports-" + parserProperties.get("runUuid") + ".properties";
LOG.info("Tika ports file path: \"{}\"", portsFilePath);
File portsFile = new File(portsFilePath);

try {
contentInServerSocket = new ServerSocket(0);
metadataOutServerSocket = new ServerSocket(0);
contentInServerSocket = new ServerSocket(contentInServerPort);
metadataOutServerSocket = new ServerSocket(metadataOutServerPort);

FileUtils.writeLines(portsFile,
Lists.newArrayList(
Expand All @@ -193,6 +212,10 @@ private void run() throws Exception {
)
);

if (healthCheckPort > 0) {
keepAliveEs.execute(new HealthCheckServer(healthCheckPort));
}

while (true) {
final PipedInputStream metadataInputStream = new PipedInputStream();
final PipedOutputStream metadataOutputStream = new PipedOutputStream();
Expand Down Expand Up @@ -272,6 +295,7 @@ private void writeContent(InputStream inputStream) throws Exception {
}

private void parseFile(OutputStream metadataOutputStream, OutputStream contentOutputStream) throws Exception {
HealthCheckServer.LAST_UPDATE = System.currentTimeMillis();
ParseContext context = new ParseContext();

// collect extended set of elements
Expand All @@ -286,37 +310,28 @@ private void parseFile(OutputStream metadataOutputStream, OutputStream contentOu
InputStream inputStream = socket.getInputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(metadataOutputStream)) {

Properties parseProperties = new Properties();
String parseContextPropertiesFilePath = configDirectoryPath + File.separator + "tikafork-context-" + parserProperties.get("runUuid") + ".properties";
if (!Files.exists(Paths.get(parseContextPropertiesFilePath))) {
throw new Exception("Cannot find property file: \"" + parseContextPropertiesFilePath + "\"");
}
try (FileInputStream fis = new FileInputStream(parseContextPropertiesFilePath)) {
parseProperties.load(fis);
if (includeImages) {
PDFParserConfig pdfParserConfig = new PDFParserConfig();
pdfParserConfig.setExtractInlineImages(true);
context.set(PDFParserConfig.class, pdfParserConfig);
}

String baseUri = parseProperties.getProperty("baseUri");
if (baseUri == null) {
throw new Exception("Missing property baseUri from the properties file " + parseContextPropertiesFilePath);
String baseUri = "";
String contentType = "";
int nextChar;
while ((nextChar = inputStream.read()) != '\n') {
baseUri += (char)nextChar;
}

String contentType = parseProperties.getProperty("contentType");
if (StringUtils.isNotBlank(contentType)) {
metadata.set(Metadata.CONTENT_TYPE, contentType);
while ((nextChar = inputStream.read()) != '\n') {
contentType += (char)nextChar;
}

boolean extractHtmlLinks = Boolean.parseBoolean(parseProperties.getProperty("extractHtmlLinks", "false"));

boolean includeImages = Boolean.parseBoolean(parseProperties.getProperty("includeImages", "false"));
LOG.info("Next file to parse baseUri={}, contentType={}", baseUri, contentType);

if (includeImages) {
PDFParserConfig pdfParserConfig = new PDFParserConfig();
pdfParserConfig.setExtractInlineImages(true);
context.set(PDFParserConfig.class, pdfParserConfig);
if (StringUtils.isNotBlank(contentType)) {
metadata.set(Metadata.CONTENT_TYPE, contentType);
}

LOG.info("Next file - baseUri={}, contentType={}, extractHtmlLinks={}", baseUri, contentType, extractHtmlLinks);

TikaInputStream tikaInputStream = TikaInputStream.get(inputStream);

TikaParsingHandler contentHandler = getContentHandler(baseUri, metadata, contentOutputStream, extractHtmlLinks);
Expand All @@ -329,25 +344,12 @@ private void parseFile(OutputStream metadataOutputStream, OutputStream contentOu
}

/**
* Runs an external tika parsing server.
* Does not use HTTP... directly uses sockets to avoid overhead.
* @param args The args[0] is the parser config file. This file will be deleted after running the program.
* The args[1] is the configuration directory path.
* Runs the external tika parsing server.
*/
public static void main(String[] args) throws Exception {
Properties parserProperties = new Properties();
File parserPropertiesFile = new File(args[0]);
if (!parserPropertiesFile.exists()) {
throw new FileNotFoundException("Could not find parser file \"" + args[0] + "\"");
}
LOG.info("Starting with parser properties file {}", args[0]);

try (FileReader fr = new FileReader(parserPropertiesFile)) {
parserProperties.load(fr);
TikaForkMain tikaForkMain = new TikaForkMain(args.length > 1 ? args[1] : null, parserProperties);
tikaForkMain.run();
} finally {
FileUtils.deleteQuietly(parserPropertiesFile);
}
TikaForkMain tikaForkMain = new TikaForkMain();
CmdLineParser cmdLineParser = new CmdLineParser(tikaForkMain);
cmdLineParser.parseArgument(args);
tikaForkMain.run();
}
}
10 changes: 5 additions & 5 deletions tika-fork-main/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
# limitations under the License.

#info,debug, error,fatal ...
log4j.rootLogger=info,stderr
log4j.rootLogger=info,stdout

#console
log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.Target=System.err
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.Target=System.out

log4j.appender.stderr.layout.ConversionPattern= %-5p [%t]: %m%n
log4j.appender.stdout.layout.ConversionPattern= %-5p [%t]: %m%n
Loading

0 comments on commit c3c188f

Please sign in to comment.