From 95c253e1287f0248244897bc130bd3ad5852862f Mon Sep 17 00:00:00 2001 From: jmorton Date: Tue, 29 May 2018 03:03:20 +0000 Subject: [PATCH] Split apart Extract creation into an ExtractAdapter, to support Hyper extracts in the future. Add Makefile for managing releases --- Makefile | 30 ++++ bin/extract.sh | 2 +- bin/publish.sh | 2 +- pom.xml | 2 +- .../tableau/CommandLinePropertySource.java | 28 ++-- .../net/jlmorton/tableau/ExtractAdapter.java | 19 +++ .../net/jlmorton/tableau/ExtractWriter.java | 6 +- .../jlmorton/tableau/ExtractWriterImpl.java | 130 ------------------ src/main/java/net/jlmorton/tableau/Main.java | 17 +-- .../tableau/MultiThreadedExtractWriter.java | 107 ++++++++++++++ .../java/net/jlmorton/tableau/Properties.java | 4 +- .../java/net/jlmorton/tableau/RowWriter.java | 2 +- .../jlmorton/tableau/TdeExtractAdapter.java | 103 ++++++++++++++ ...va => MultiThreadedExtractWriterTest.java} | 30 ++-- .../net/jlmorton/tableau/RowWriterTest.java | 2 +- 15 files changed, 312 insertions(+), 172 deletions(-) create mode 100644 Makefile create mode 100644 src/main/java/net/jlmorton/tableau/ExtractAdapter.java delete mode 100644 src/main/java/net/jlmorton/tableau/ExtractWriterImpl.java create mode 100644 src/main/java/net/jlmorton/tableau/MultiThreadedExtractWriter.java create mode 100644 src/main/java/net/jlmorton/tableau/TdeExtractAdapter.java rename src/test/java/net/jlmorton/tableau/{ExtractWriterImplTest.java => MultiThreadedExtractWriterTest.java} (75%) diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..94152fb --- /dev/null +++ b/Makefile @@ -0,0 +1,30 @@ +MKDIR_P = mkdir -p +NAME = tableau-sdk-wrapper +VERSION = 1.1 +BUILD_DIR = ${NAME}-${VERSION} +ARTIFACT = tableau-${VERSION}.jar +ARCHIVE = ${NAME}-${VERSION}.zip +TARGET = target +SAMPLES = samples +BIN_DIR = bin + +all: mkdir copy_files archive + +mkdir: + ${MKDIR_P} ${BUILD_DIR}/bin + ${MKDIR_P} ${BUILD_DIR}/lib + ${MKDIR_P} ${BUILD_DIR}/samples + ${MKDIR_P} ${BUILD_DIR}/tmp + ${MKDIR_P} ${BUILD_DIR}/logs + +copy_files: + cp ${TARGET}/${ARTIFACT} ${BUILD_DIR}/lib + cp -a ${SAMPLES}/* ${BUILD_DIR}/samples/ + cp -a ${BIN_DIR}/* ${BUILD_DIR}/bin/ + +archive: + zip -r ${ARCHIVE} ${BUILD_DIR} + +clean: + rm -rf ${BUILD_DIR} + rm -f ${ARCHIVE} diff --git a/bin/extract.sh b/bin/extract.sh index e01a715..1e7ef66 100755 --- a/bin/extract.sh +++ b/bin/extract.sh @@ -2,7 +2,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/.." LIB="$DIR/lib" MAX_MEMORY="2g" -JARFILE="$LIB/tableau-1.0.jar" +JARFILE="$LIB/tableau-1.1.jar" SDK_DIR="$LIB/tableausdk-linux64-10300.18.0510.1135" JAVA_SDK_DIR="$SDK_DIR/lib64/tableausdk/Java/" diff --git a/bin/publish.sh b/bin/publish.sh index bbee256..e96e0fc 100755 --- a/bin/publish.sh +++ b/bin/publish.sh @@ -2,7 +2,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" BASE_PATH="$DIR/.."; LIB="$BASE_PATH/lib" -JARFILE="$LIB/tableau-1.0.jar" +JARFILE="$LIB/tableau-1.1.jar" SDK_DIR="$LIB/tableausdk-linux64-10300.18.0510.1135" JAVA_SDK_DIR="$SDK_DIR/lib64/tableausdk/Java/" diff --git a/pom.xml b/pom.xml index 8214ecb..5e1b858 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 net.jlmorton tableau - 1.0 + 1.1 UTF-8 1.8 diff --git a/src/main/java/net/jlmorton/tableau/CommandLinePropertySource.java b/src/main/java/net/jlmorton/tableau/CommandLinePropertySource.java index 54c827a..4fc0ff3 100644 --- a/src/main/java/net/jlmorton/tableau/CommandLinePropertySource.java +++ b/src/main/java/net/jlmorton/tableau/CommandLinePropertySource.java @@ -1,10 +1,15 @@ package net.jlmorton.tableau; import org.apache.commons.cli.*; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.File; +import java.io.IOException; class CommandLinePropertySource { + private static final Logger LOGGER = LogManager.getLogger(CommandLinePropertySource.class); + private final String[] args; CommandLinePropertySource(String... args) { @@ -16,8 +21,9 @@ Properties getProperties() { return new Properties() { @Override - public String getSchemaPath() { - return commandLine.getOptionValue("schema"); + public Schema getSchema() { + String schemaPath = commandLine.getOptionValue("schema"); + return createSchemaFromJson(schemaPath); } @Override @@ -79,13 +85,18 @@ public boolean isExtract() { return commandLine.hasOption("extract"); } - @Override - public boolean isAppend() { - return commandLine.hasOption("append"); - } }; } + private Schema createSchemaFromJson(String schemaPath) { + try { + return Schema.fromJson(schemaPath); + } catch (IOException e) { + LOGGER.error("Error creating schema with path {}", schemaPath, e); + throw new RuntimeException(e); + } + } + private CommandLine parseCommandLineOptions() { CommandLineParser parser = new DefaultParser(); @@ -97,6 +108,7 @@ private CommandLine parseCommandLineOptions() { return commandLine; } catch (ParseException e) { + LOGGER.error("Could not parse command line options", e); throw new RuntimeException("Could not parse command line options", e); } } @@ -105,13 +117,11 @@ private static Options getOptions() { Options options = new Options(); options.addOption("s", "schema", true, "Schema file for extract"); options.addOption("f", "file", true, "CSV file to import"); - options.addOption("a", "append", false, "Append to existing extract"); - options.addOption("o", "output", true, "Output file name, or name of existing extract in append mode"); options.addOption("t", "threads", true, "Number of threads (default: 1)"); options.addOption("p", "publish", false, "Publish an extract to Tableau (requires --extract, --site, --project, --datasource, --username --password, and --url,"); options.addOption("s", "site", true, "Tableau site name to publish"); options.addOption("c", "project", true, "Project name to publish to"); - options.addOption("e", "extract", true, "Filename of extract to publish"); + options.addOption("e", "extract", true, "Filename of extract"); options.addOption("d", "datasource", true, "Name of datasource to publish"); options.addOption("u", "url", true, "Tableau Server URL for publishing"); options.addOption("n", "username", true, "Tableau Server username for publishing"); diff --git a/src/main/java/net/jlmorton/tableau/ExtractAdapter.java b/src/main/java/net/jlmorton/tableau/ExtractAdapter.java new file mode 100644 index 0000000..d2b541e --- /dev/null +++ b/src/main/java/net/jlmorton/tableau/ExtractAdapter.java @@ -0,0 +1,19 @@ +package net.jlmorton.tableau; + +import com.tableausoftware.TableauException; +import com.tableausoftware.extract.Row; +import com.tableausoftware.extract.TableDefinition; + +public interface ExtractAdapter { + void openExtract() throws TableauException; + + void closeExtract(); + + void insertRow(Row row) throws TableauException; + + TableDefinition getTableDefinition(); + + default String getTableName() { + return "Extract"; + } +} diff --git a/src/main/java/net/jlmorton/tableau/ExtractWriter.java b/src/main/java/net/jlmorton/tableau/ExtractWriter.java index 25570c2..417f15f 100644 --- a/src/main/java/net/jlmorton/tableau/ExtractWriter.java +++ b/src/main/java/net/jlmorton/tableau/ExtractWriter.java @@ -1,7 +1,9 @@ package net.jlmorton.tableau; -import com.tableausoftware.extract.Extract; +import com.tableausoftware.TableauException; public interface ExtractWriter { - Extract createExtract(); + void writeExtract() throws TableauException; + + void closeExtract(); } diff --git a/src/main/java/net/jlmorton/tableau/ExtractWriterImpl.java b/src/main/java/net/jlmorton/tableau/ExtractWriterImpl.java deleted file mode 100644 index 9605c1d..0000000 --- a/src/main/java/net/jlmorton/tableau/ExtractWriterImpl.java +++ /dev/null @@ -1,130 +0,0 @@ -package net.jlmorton.tableau; - -import com.tableausoftware.TableauException; -import com.tableausoftware.common.Type; -import com.tableausoftware.extract.Extract; -import com.tableausoftware.extract.Row; -import com.tableausoftware.extract.Table; -import com.tableausoftware.extract.TableDefinition; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.concurrent.*; - -public class ExtractWriterImpl implements ExtractWriter { - private static final Logger LOGGER = LogManager.getLogger(ExtractWriterImpl.class); - private static final Logger REJECTS = LogManager.getLogger("Rejects"); - private static final String EXTRACT_NAME = "Extract"; - - private final Schema schema; - private final String outputFileName; - private final int numThreads; - private final RowInputSource rowInputSource; - - ExtractWriterImpl(Schema schema, RowInputSource rowInputSource, Properties properties) { - this.schema = schema; - this.outputFileName = properties.getExtractFilePath(); - this.rowInputSource = rowInputSource; - this.numThreads = properties.getNumberOfThreads(); - } - - public Extract createExtract() { - try { - return createOrAppendToExtract(); - } catch (Exception e) { - throw new RuntimeException("Could not create extract", e); - } - } - - private Extract createOrAppendToExtract() throws TableauException, InterruptedException { - final Extract extract = new Extract(outputFileName); - final TableDefinition tableDefinition = createTableDefinitionFromSchema(schema); - - Table table = createOrOpenTable(extract, tableDefinition); - - final ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); - - LOGGER.info("Parsing Rows"); - - while (rowInputSource.hasNext()) { - List row = rowInputSource.getNextRow(); - logProgress(rowInputSource.getCurrentRowNumber()); - threadPoolExecutor.submit(() -> insertRow(tableDefinition, table, row, rowInputSource.getCurrentRowNumber())); - } - - threadPoolExecutor.shutdown(); - threadPoolExecutor.awaitTermination(4, TimeUnit.HOURS); - - LOGGER.info("Completed writing {} rows to {} extract", rowInputSource.getCurrentRowNumber(), schema.getName()); - - return extract; - } - - private Table createOrOpenTable(Extract extract, TableDefinition tableDefinition) throws TableauException { - if (extract.hasTable(EXTRACT_NAME)) { - return extract.openTable(EXTRACT_NAME); - } else { - return extract.addTable(EXTRACT_NAME, tableDefinition); - } - } - - private void insertRow(TableDefinition tableDefinition, Table table, List parsedRow, int rowIndex) { - try { - final Row row = RowWriter.createRow(parsedRow, tableDefinition); - insertToTable(table, row); - row.close(); - - } catch (Exception e) { - REJECTS.info("Row {} {}", rowIndex, e.getMessage()); - LOGGER.error("Could not parse row {}", rowIndex, e); - } - } - - private void logProgress(int currentRowParsed) { - if ((currentRowParsed % 10000) == 0) { - LOGGER.info("Inserted {} rows to {}", currentRowParsed, schema.getName()); - } - } - - private void insertToTable(Table table, Row row) throws TableauException { - // Table is not thread-safe. Though we can do the work of parsing a row in parallel, - // inserting into the table must be synchronized - synchronized (schema) { - table.insert(row); - } - } - - private TableDefinition createTableDefinitionFromSchema(Schema schema) throws TableauException { - TableDefinition tableDefinition = new TableDefinition(); - schema.getSchema().forEach((name, type) -> safeAddColumn(tableDefinition, name, type)); - - return tableDefinition; - } - - private void safeAddColumn(TableDefinition tableDefinition, String name, Type type) { - try { - tableDefinition.addColumn(name, type); - } catch (TableauException e) { - LOGGER.error("Error while adding column to table definition: {}", name, e); - } - } - - private ThreadPoolExecutor getThreadPoolExecutor() { - return new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, getBlockingQueue(), getRejectedExecutionHandler()); - } - - private BlockingQueue getBlockingQueue() { - return new ArrayBlockingQueue<>(1000); - } - - private RejectedExecutionHandler getRejectedExecutionHandler() { - return (runnable, executor) -> { - try { - executor.getQueue().put(runnable); - } catch (InterruptedException e) { - LOGGER.error(e); - } - }; - } -} \ No newline at end of file diff --git a/src/main/java/net/jlmorton/tableau/Main.java b/src/main/java/net/jlmorton/tableau/Main.java index 6540319..5fa8fd3 100644 --- a/src/main/java/net/jlmorton/tableau/Main.java +++ b/src/main/java/net/jlmorton/tableau/Main.java @@ -1,13 +1,11 @@ package net.jlmorton.tableau; import com.tableausoftware.TableauException; -import com.tableausoftware.extract.Extract; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.File; -import java.io.IOException; import java.util.Objects; public class Main { @@ -44,21 +42,20 @@ private static void publish(Properties properties) throws TableauException { Publisher.publish(properties); } - private static void createExtract(Properties properties) throws IOException { + private static void createExtract(Properties properties) throws TableauException { validatePropertiesForExtract(properties); - Schema schema = Schema.fromJson(properties.getSchemaPath()); - - LOGGER.info("Creating Extract {}", schema.getName()); + LOGGER.info("Creating Extract {}", properties.getSchema().getName()); LOGGER.info("CSV File Path: {}", properties.getCsvFile()); LOGGER.info("Extract Path: {}", properties.getExtractFilePath()); LOGGER.info("Number of Threads: {}", properties.getNumberOfThreads()); RowInputSource rowInputSource = new CsvInputSource(properties.getCsvFile()); + ExtractAdapter extractAdapter = new TdeExtractAdapter(properties); - ExtractWriter extractWriter = new ExtractWriterImpl(schema, rowInputSource, properties); - Extract extract = extractWriter.createExtract(); - extract.close(); + ExtractWriter extractWriter = new MultiThreadedExtractWriter(properties, rowInputSource, extractAdapter); + extractWriter.writeExtract(); + extractWriter.closeExtract(); } private static void validatePropertiesForPublishing(Properties properties) { @@ -84,7 +81,7 @@ private static void validatePropertiesForPublishing(Properties properties) { private static void validatePropertiesForExtract(Properties properties) { boolean hasExtractPath = !Objects.isNull(properties.getExtractFilePath()); boolean hasCsvFilePath = !Objects.isNull(properties.getCsvFile()); - boolean hasSchemaPath = StringUtils.isNotBlank(properties.getSchemaPath()); + boolean hasSchemaPath = !Objects.isNull(properties.getSchema()); if (!(hasExtractPath && hasCsvFilePath && hasSchemaPath)) { LOGGER.error("Must provide extract path, CSV file path, and Schema path when creating an extract"); diff --git a/src/main/java/net/jlmorton/tableau/MultiThreadedExtractWriter.java b/src/main/java/net/jlmorton/tableau/MultiThreadedExtractWriter.java new file mode 100644 index 0000000..4ca49f3 --- /dev/null +++ b/src/main/java/net/jlmorton/tableau/MultiThreadedExtractWriter.java @@ -0,0 +1,107 @@ +package net.jlmorton.tableau; + +import com.tableausoftware.TableauException; +import com.tableausoftware.extract.Row; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.*; + +public class MultiThreadedExtractWriter implements ExtractWriter { + private static final Logger LOGGER = LogManager.getLogger(MultiThreadedExtractWriter.class); + private static final Logger REJECTS = LogManager.getLogger("Rejects"); + + private final RowInputSource rowInputSource; + private final Properties properties; + private final ExtractAdapter extractAdapter; + + MultiThreadedExtractWriter(Properties properties, RowInputSource rowInputSource, ExtractAdapter extractAdapter) { + this.rowInputSource = rowInputSource; + this.properties = properties; + this.extractAdapter = extractAdapter; + } + + @Override + public void writeExtract() { + try { + final ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); + getExtractAdapter().openExtract(); + + LOGGER.info("Parsing Rows"); + while (getRowInputSource().hasNext()) { + List row = getRowInputSource().getNextRow(); + logProgress(getRowInputSource().getCurrentRowNumber()); + threadPoolExecutor.submit(() -> parseAndInsertRow(row, getRowInputSource().getCurrentRowNumber())); + } + + threadPoolExecutor.shutdown(); + threadPoolExecutor.awaitTermination(8, TimeUnit.HOURS); + + LOGGER.info("Completed writing {} rows to {} extract", getRowInputSource().getCurrentRowNumber(), getSchema().getName()); + } catch (TableauException | InterruptedException e) { + LOGGER.error("Error while writing extract", e); + throw new RuntimeException(e); + } + } + + @Override + public void closeExtract() { + getExtractAdapter().closeExtract(); + } + + private void parseAndInsertRow(List textRow, int rowIndex) { + try { + final Row row = RowWriter.parseAndCreateRow(textRow, getExtractAdapter().getTableDefinition()); + getExtractAdapter().insertRow(row); + row.close(); + } catch (Exception e) { + REJECTS.info("Row {} {}", rowIndex, e.getMessage()); + LOGGER.error("Could not parse row {}", rowIndex, e); + } + } + + private void logProgress(int currentRowParsed) { + if ((currentRowParsed % 10000) == 0) { + LOGGER.info("Inserted {} rows to {}", currentRowParsed, getSchema().getName()); + } + } + + private ExtractAdapter getExtractAdapter() { + return this.extractAdapter; + } + + private Properties getProperties() { + return this.properties; + } + + private RowInputSource getRowInputSource() { + return rowInputSource; + } + + private int getNumberOfThreads() { + return getProperties().getNumberOfThreads(); + } + + private Schema getSchema() { + return getProperties().getSchema(); + } + + private ThreadPoolExecutor getThreadPoolExecutor() { + return new ThreadPoolExecutor(getNumberOfThreads(), getNumberOfThreads(), 0L, TimeUnit.MILLISECONDS, getBlockingQueue(), getRejectedExecutionHandler()); + } + + private BlockingQueue getBlockingQueue() { + return new ArrayBlockingQueue<>(1000); + } + + private RejectedExecutionHandler getRejectedExecutionHandler() { + return (runnable, executor) -> { + try { + executor.getQueue().put(runnable); + } catch (InterruptedException e) { + LOGGER.error(e); + } + }; + } +} \ No newline at end of file diff --git a/src/main/java/net/jlmorton/tableau/Properties.java b/src/main/java/net/jlmorton/tableau/Properties.java index ab4057b..987de3d 100644 --- a/src/main/java/net/jlmorton/tableau/Properties.java +++ b/src/main/java/net/jlmorton/tableau/Properties.java @@ -3,7 +3,7 @@ import java.io.File; public interface Properties { - String getSchemaPath(); + Schema getSchema(); File getCsvFile(); @@ -26,6 +26,4 @@ public interface Properties { boolean isPublish(); boolean isExtract(); - - boolean isAppend(); } diff --git a/src/main/java/net/jlmorton/tableau/RowWriter.java b/src/main/java/net/jlmorton/tableau/RowWriter.java index c805430..e5cfe97 100644 --- a/src/main/java/net/jlmorton/tableau/RowWriter.java +++ b/src/main/java/net/jlmorton/tableau/RowWriter.java @@ -18,7 +18,7 @@ class RowWriter { private RowWriter() { } - static Row createRow(List row, TableDefinition tableDefinition) throws TableauException { + static Row parseAndCreateRow(List row, TableDefinition tableDefinition) throws TableauException { Row tableauRow = new Row(tableDefinition); for (int i = 0; i < row.size(); i++) { setRowData(tableauRow, i, tableDefinition.getColumnType(i), row.get(i)); diff --git a/src/main/java/net/jlmorton/tableau/TdeExtractAdapter.java b/src/main/java/net/jlmorton/tableau/TdeExtractAdapter.java new file mode 100644 index 0000000..fd31c1b --- /dev/null +++ b/src/main/java/net/jlmorton/tableau/TdeExtractAdapter.java @@ -0,0 +1,103 @@ +package net.jlmorton.tableau; + +import com.tableausoftware.TableauException; +import com.tableausoftware.common.Type; +import com.tableausoftware.extract.Extract; +import com.tableausoftware.extract.Row; +import com.tableausoftware.extract.Table; +import com.tableausoftware.extract.TableDefinition; + +public class TdeExtractAdapter implements ExtractAdapter { + private final Properties properties; + + private TableDefinition tableDefinition; + private Extract extract; + private Table table; + + @SuppressWarnings("WeakerAccess") + public TdeExtractAdapter(Properties properties) { + this.properties = properties; + } + + @Override + public void openExtract() throws TableauException { + Extract extract = new Extract(getProperties().getExtractFilePath()); + setExtract(extract); + + if (extract.hasTable(getTableName())) { + setTable(extract.openTable(getTableName())); + } else { + TableDefinition tableDefinition = createTableDefinitionFromSchema(getSchema()); + setTable(extract.addTable(getTableName(), tableDefinition)); + setTableDefinition(tableDefinition); + } + } + + @Override + public void closeExtract() { + getExtract().close(); + setExtract(null); + setTable(null); + setTableDefinition(null); + } + + public TableDefinition getTableDefinition() { + if (this.tableDefinition == null) { + throw new IllegalStateException("Table has not been opened"); + } + + return this.tableDefinition; + } + + @Override + public void insertRow(Row row) throws TableauException { + // Table is not thread-safe. Though we can do the work of parsing a row in parallel, + // inserting into the table must be synchronized + synchronized (getTable()) { + getTable().insert(row); + } + } + + private TableDefinition createTableDefinitionFromSchema(Schema schema) throws TableauException { + TableDefinition tableDefinition = new TableDefinition(); + schema.getSchema().forEach((name, type) -> safeAddColumn(tableDefinition, name, type)); + + return tableDefinition; + } + + private void safeAddColumn(TableDefinition tableDefinition, String name, Type type) { + try { + tableDefinition.addColumn(name, type); + } catch (TableauException e) { + throw new RuntimeException("Could not add column to table definition", e); + } + } + + private Schema getSchema() { + return getProperties().getSchema(); + } + + private Properties getProperties() { + return this.properties; + } + + public void setExtract(Extract extract) { + this.extract = extract; + } + + private Table getTable() { + return this.table; + } + + private void setTable(Table table) { + this.table = table; + } + + private void setTableDefinition(TableDefinition tableDefinition) { + this.tableDefinition = tableDefinition; + } + + Extract getExtract() { + return this.extract; + } +} diff --git a/src/test/java/net/jlmorton/tableau/ExtractWriterImplTest.java b/src/test/java/net/jlmorton/tableau/MultiThreadedExtractWriterTest.java similarity index 75% rename from src/test/java/net/jlmorton/tableau/ExtractWriterImplTest.java rename to src/test/java/net/jlmorton/tableau/MultiThreadedExtractWriterTest.java index ac1647a..2fee94c 100644 --- a/src/test/java/net/jlmorton/tableau/ExtractWriterImplTest.java +++ b/src/test/java/net/jlmorton/tableau/MultiThreadedExtractWriterTest.java @@ -9,17 +9,25 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -public class ExtractWriterImplTest { +public class MultiThreadedExtractWriterTest { @Test public void testCreateExtract() throws Exception { - String sampleSchemaFileName = getClass().getResource("sample-schema.json").getFile(); - Schema sampleSchema = Schema.fromJson(sampleSchemaFileName); + String schemaFile = getClass().getResource("sample-schema.json").getFile(); + Schema schema = Schema.fromJson(schemaFile); + Properties properties = getProperties(schema); RowInputSource rowInputSource = new CsvInputSource(getSampleCsvFile()); - ExtractWriter extractWriter = new ExtractWriterImpl(sampleSchema, rowInputSource, getProperties()); - Extract extract = extractWriter.createExtract(); + TdeExtractAdapter extractAdapter = new TdeExtractAdapter(properties); + + ExtractWriter extractWriter = new MultiThreadedExtractWriter(properties, rowInputSource, extractAdapter); + extractWriter.writeExtract(); + + Extract extract = extractAdapter.getExtract(); + assertNotNull(extract); - assertTrue(extract.hasTable("Extract")); + assertTrue(extract.hasTable(extractAdapter.getTableName())); + + extractWriter.closeExtract(); } private String getTemporaryFilePath() { @@ -38,11 +46,11 @@ private File getSampleCsvFile() { return new File(this.getClass().getResource("sample-extract.csv").getFile()); } - private Properties getProperties() { + private Properties getProperties(Schema schema) { return new Properties() { @Override - public String getSchemaPath() { - return getClass().getResource("sample-schema.json").getFile(); + public Schema getSchema() { + return schema; } @Override @@ -100,10 +108,6 @@ public boolean isExtract() { return false; } - @Override - public boolean isAppend() { - return false; - } }; } } \ No newline at end of file diff --git a/src/test/java/net/jlmorton/tableau/RowWriterTest.java b/src/test/java/net/jlmorton/tableau/RowWriterTest.java index c96cace..8e71fb1 100644 --- a/src/test/java/net/jlmorton/tableau/RowWriterTest.java +++ b/src/test/java/net/jlmorton/tableau/RowWriterTest.java @@ -19,7 +19,7 @@ public void testCreateRow() throws Exception { tableDefinition.addColumn("foo_int", Type.INTEGER); List textRow = Arrays.asList("bar", "2017-05-01", "100"); - Row row = RowWriter.createRow(textRow, tableDefinition); + Row row = RowWriter.parseAndCreateRow(textRow, tableDefinition); assertNotNull(row); }