From 9b7212c2743fb3d872987c270cba9352b172e2c4 Mon Sep 17 00:00:00 2001
From: Daniel Weiss
Date: Tue, 19 Apr 2016 17:51:04 -0400
Subject: [PATCH] ENG-10202 create kinesis exporter

---
 .gitignore                           | 211 +++++++++++++++
 LICENSE.md                           |  23 ++
 README.md                            |  97 +++++++
 build.gradle                         |  69 +++++
 .../KinesisFirehoseExportClient.java | 245 ++++++++++++++++++
 5 files changed, 645 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 LICENSE.md
 create mode 100644 README.md
 create mode 100644 build.gradle
 create mode 100644 src/main/java/org/voltdb/exportclient/KinesisFirehoseExportClient.java

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..9cc79cc
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,211 @@
+.gradle
+**/build/
+**/tmp/
+loader.cfg
+
+# Ignore Gradle GUI config
+gradle-app.setting
+*.class
+
+# Generated Eclipse stuff
+.project
+.classpath
+
+# Package Files #
+*.jar
+*.war
+*.ear
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+
+# Thumbnails
+._*
+
+# Files that might appear on external disk
+.Spotlight-V100
+.Trashes
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+*~
+\#*\#
+/.emacs.desktop
+/.emacs.desktop.lock
+*.elc
+auto-save-list
+tramp
+.\#*
+
+# Org-mode
+.org-id-locations
+*_archive
+
+# flymake-mode
+*_flymake.*
+
+# eshell files
+/eshell/history
+/eshell/lastdir
+
+# elpa packages
+/elpa/
+
+# reftex files
+*.rel
+
+# AUCTeX auto folder
+/auto/
+
+# cask packages
+.cask/
+
+# cache files for sublime text
+*.tmlanguage.cache
+*.tmPreferences.cache
+*.stTheme.cache
+
+# workspace files are user-specific
+*.sublime-workspace
+
+# project files should be checked into the repository, unless a significant
+# proportion of contributors will probably not be using SublimeText
+# *.sublime-project
+
+# sftp configuration file
+sftp-config.json
+
+# KDE directory preferences
+.directory
+
+*.pydevproject
+.metadata
+.gradle
+gradle.properties
+bin/
+obj/
+tmp/
+*.tmp
+*.bak
+*.swp
+*~.nib
+local.properties
+.settings/
+.loadpath
+
+# External tool builders
+.externalToolBuilders/
+
+# Locally stored "Eclipse launch configurations"
+*.launch
+
+# CDT-specific
+.cproject
+
+# PDT-specific
+.buildpath
+
+# sbteclipse plugin
+.target
+
+# TeXlipse plugin
+.texlipse
+
+nbproject/private/
+build/
+nbbuild/
+dist/
+nbdist/
+nbactions.xml
+nb-configuration.xml
+
+# It's better to unpack these files and commit the raw source because
+# git has its own built in compression methods.
+
+*.7z
+*.jar
+*.rar
+*.zip
+*.gz
+*.bzip
+*.bz2
+*.xz
+*.lzma
+*.cab
+
+#packing-only formats
+*.iso
+*.tar
+
+#package management formats
+*.dmg
+*.xpi
+*.gem
+*.egg
+*.deb
+*.rpm
+*.msi
+*.msm
+*.msp
+
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm
+
+*.iml
+
+## Directory-based project format:
+.idea/
+# if you remove the above rule, at least ignore the following:
+
+# User-specific stuff:
+# .idea/workspace.xml
+# .idea/tasks.xml
+# .idea/dictionaries
+
+# Sensitive or high-churn files:
+# .idea/dataSources.ids
+# .idea/dataSources.xml
+# .idea/sqlDataSources.xml
+# .idea/dynamic.xml
+# .idea/uiDesigner.xml
+
+# Gradle:
+# .idea/gradle.xml
+# .idea/libraries
+
+# Mongo Explorer plugin:
+# .idea/mongoSettings.xml
+
+## File-based project format:
+*.ipr
+*.iws
+
+## Plugin-specific files:
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+
+
+/.recommenders/
diff --git a/LICENSE.md b/LICENSE.md
new file mode 100644
index 0000000..99c281b
--- /dev/null
+++ b/LICENSE.md
@@ -0,0 +1,23 @@
+The MIT License (MIT)
+=====================
+
+Copyright © 2015 VoltDB Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+“Software”), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..4bcac84
--- /dev/null
+++ b/README.md
@@ -0,0 +1,97 @@
+# VoltDB Kinesis Firehose Export Conduit
+
+An experimental VoltDB to Kinesis Firehose export conduit, built on the
+[Kinesis Firehose API](http://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html).
+It allows export stream writers to push data directly into the corresponding
+Kinesis Firehose stream.
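+
+Under the hood the conduit renders each exported row as a single delimited line
+and ships accumulated rows to the delivery stream in batches through the Firehose
+`PutRecordBatch` call. As a minimal, standalone sketch of that delivery step
+(outside VoltDB, with hypothetical placeholder credentials, region, and stream
+name):
+
+```java
+import java.nio.ByteBuffer;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+public class FirehosePutSketch {
+    public static void main(String[] args) {
+        // hypothetical placeholders -- substitute real credentials, region and stream
+        AmazonKinesisFirehoseClient client = new AmazonKinesisFirehoseClient(
+                new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"));
+        client.setRegion(RegionUtils.getRegion("us-east-1"));
+
+        // one CSV-formatted row per Firehose record, as the exporter produces them
+        Record record = new Record()
+                .withData(ByteBuffer.wrap("1,\"fab-02 inoperable\",\"EU\",\"IT\"\n".getBytes()));
+
+        PutRecordBatchResult result = client.putRecordBatch(new PutRecordBatchRequest()
+                .withDeliveryStreamName("streamtest")
+                .withRecords(record));
+        System.out.println("failed records: " + result.getFailedPutCount());
+    }
+}
+```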
+
+## How to build artifacts and set up Eclipse
+
+* Install Gradle
+
+On a Mac, if you have [Homebrew](http://brew.sh/) set up, simply install Gradle:
+
+```bash
+brew install gradle
+```
+
+On Linux, set up [GVM](http://gvmtool.net/) and install Gradle as follows:
+
+```bash
+gvm install gradle
+```
+
+* Create a `gradle.properties` file and set the `voltdbhome` property
+  to the base directory where VoltDB is installed:
+
+```bash
+echo voltdbhome=/voltdb/home/dirname > gradle.properties
+```
+
+* Invoke Gradle to compile the artifacts:
+
+```bash
+gradle shadowJar
+```
+
+* To set up an Eclipse project, run Gradle as follows:
+
+```bash
+gradle cleanEclipse eclipse
+```
+
+then import it into your Eclipse workspace using the File->Import projects menu option.
+
+## Configuration
+
+* Copy the built jar from `build/libs` to `lib/extension` under your VoltDB installation directory.
+
+* Edit your deployment file, using the following export XML stanza as a template
+  (substitute your own credentials for the bracketed placeholders):
+
+```xml
+<export>
+  <configuration target="default" enabled="true" type="custom"
+      exportconnectorclass="org.voltdb.exportclient.KinesisFirehoseExportClient">
+    <property name="region">us-east-1</property>
+    <property name="stream.name">streamtest</property>
+    <property name="access.key">[your access key]</property>
+    <property name="secret.key">[your secret key]</property>
+  </configuration>
+</export>
+```
+
+This tells VoltDB to write the content of the alerts export stream to the Amazon
+Kinesis Firehose delivery stream named streamtest. If the client created with the
+supplied access.key and secret.key has access to that stream, the exporter starts
+up successfully. In this example the VoltDB export stream is defined as:
+
+```sql
+CREATE STREAM alerts EXPORT TO TARGET default (
+    id integer not null,
+    msg varchar(128),
+    continent varchar(64),
+    country varchar(64)
+);
+```
+
+Data can then be inserted into this export stream with:
+
+```sql
+INSERT INTO ALERTS (ID,MSG,CONTINENT,COUNTRY) VALUES (1,'fab-02 inoperable','EU','IT');
+```
+
+## Configuration Properties
+
+- `region` (mandatory) designates the AWS region where the Kinesis Firehose stream is defined
+- `stream.name` (mandatory) the Kinesis Firehose stream name
+- `access.key` (mandatory) the user's AWS access key
+- `secret.key` (mandatory) the user's AWS secret key
+- `skipinternals` (optional, _default:_ false) flag to skip adding internal metadata to each exported row
+- `timezone` (optional, _default:_ local timezone) timezone used to format timestamp values
+- `binaryencoding` (optional, _default:_ hex) specifies whether VARBINARY data is encoded in hexadecimal or BASE64 format
+- `type` (optional, _default:_ csv) specifies whether the export format is csv or tsv
diff --git a/build.gradle b/build.gradle
new file mode 100644
index 0000000..5dc3db1
--- /dev/null
+++ b/build.gradle
@@ -0,0 +1,69 @@
+/*
+ * The MIT License (MIT)
+ *
+ * Copyright (C) 2008-2016 VoltDB Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+ * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+ * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+ * USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+plugins {
+    id 'java'
+    id 'eclipse'
+    id 'com.github.johnrengelman.shadow' version '1.2.3'
+}
+
+description = 'VoltDB Kinesis Firehose export conduit'
+version = '1.0-SNAPSHOT'
+
+repositories {
+    mavenCentral()
+    jcenter()
+}
+
+configurations {
+    voltlib
+    export
+}
+
+def volt = fileTree(dir: "${voltdbhome}/voltdb", include: 'voltdb-*.jar')
+
+dependencies {
+    voltlib volt
+    voltlib ('commons-logging:commons-logging:1.1.3') { transitive = true }
+    voltlib ('commons-codec:commons-codec:1.6') { transitive = true }
+    voltlib ('commons-cli:commons-cli:1.2') { transitive = true }
+
+    export ('com.amazonaws:aws-java-sdk-kinesis:1.10.69') { transitive = true }
+    export ('com.amazonaws:aws-java-sdk-core:1.10.69') { transitive = true }
+    export ('com.fasterxml.jackson.core:jackson-databind:2.5.3') { transitive = true }
+    export ('com.fasterxml.jackson.core:jackson-annotations:2.5.0') { transitive = true }
+    export ('com.fasterxml.jackson.core:jackson-core:2.5.3') { transitive = true }
+    export ('com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.5.3') { transitive = true }
+    export ('joda-time:joda-time:2.8.1') { transitive = true }
+    export ('org.apache.httpcomponents:httpclient:4.3.6') { transitive = true }
+    export ('org.apache.httpcomponents:httpcore:4.3.3') { transitive = true }
+
+    compile configurations.export
+    compile configurations.voltlib
+}
+
+shadowJar {
+    mergeServiceFiles()
+    configurations = [project.configurations.export]
+}
diff --git a/src/main/java/org/voltdb/exportclient/KinesisFirehoseExportClient.java b/src/main/java/org/voltdb/exportclient/KinesisFirehoseExportClient.java
new file mode 100644
index 0000000..fd3eb69
--- /dev/null
+++ b/src/main/java/org/voltdb/exportclient/KinesisFirehoseExportClient.java
@@ -0,0 +1,245 @@
+/*
+ * This file is part of VoltDB.
+ * Copyright (C) 2008-2016 VoltDB Inc.
+ */
+
+package org.voltdb.exportclient;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import org.voltcore.logging.VoltLogger;
+import org.voltcore.utils.CoreUtils;
+import org.voltdb.VoltDB;
+import org.voltdb.common.Constants;
+import org.voltdb.export.AdvertisedDataSource;
+
+import au.com.bytecode.opencsv_voltpatches.CSVWriter;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.RegionUtils;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamRequest;
+import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamResult;
+import com.amazonaws.services.kinesisfirehose.model.InvalidArgumentException;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+import com.amazonaws.services.kinesisfirehose.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesisfirehose.model.ServiceUnavailableException;
+import com.google_voltpatches.common.base.Throwables;
+import com.google_voltpatches.common.util.concurrent.ListeningExecutorService;
+
+public class KinesisFirehoseExportClient extends ExportClientBase {
+    private static final VoltLogger m_logger = new VoltLogger("ExportClient");
+
+    String m_streamName = null;
+    AmazonKinesisFirehoseClient m_firehoseClient = null;
+    boolean m_skipInternal;
+    ExportDecoderBase.BinaryEncoding m_binaryEncoding;
+    char m_separator;
+    // use a thread-local to avoid SimpleDateFormat thread-safety issues
+    ThreadLocal<SimpleDateFormat> m_ODBCDateformat;
+
+    @Override
+    public void configure(Properties config) throws Exception
+    {
+        String regionName = config.getProperty("region","").trim();
+        if (regionName.isEmpty()) {
+            throw new IllegalArgumentException("KinesisFirehoseExportClient: must provide a region");
+        }
+        Region region = RegionUtils.getRegion(regionName);
+
+        m_streamName = config.getProperty("stream.name","").trim();
+        if (m_streamName.isEmpty()) {
+            throw new IllegalArgumentException("KinesisFirehoseExportClient: must provide a stream.name");
+        }
+
+        String accessKey = config.getProperty("access.key","").trim();
+        if (accessKey.isEmpty()) {
+            throw new IllegalArgumentException("KinesisFirehoseExportClient: must provide an access.key");
+        }
+        String secretKey = config.getProperty("secret.key","").trim();
+        if (secretKey.isEmpty()) {
+            throw new IllegalArgumentException("KinesisFirehoseExportClient: must provide a secret.key");
+        }
+        m_firehoseClient = new AmazonKinesisFirehoseClient(
+                new BasicAWSCredentials(accessKey, secretKey));
+        m_firehoseClient.setRegion(region);
+
+        m_skipInternal = Boolean.parseBoolean(config.getProperty("skipinternals", "false"));
+
+        final TimeZone tz = TimeZone.getTimeZone(config.getProperty("timezone", VoltDB.GMT_TIMEZONE.getID()));
+        m_ODBCDateformat = new ThreadLocal<SimpleDateFormat>() {
+            @Override
+            protected SimpleDateFormat initialValue() {
+                SimpleDateFormat sdf = new SimpleDateFormat(Constants.ODBC_DATE_FORMAT_STRING);
+                sdf.setTimeZone(tz);
+                return sdf;
+            }
+        };
+
+        m_binaryEncoding =
+                ExportDecoderBase.BinaryEncoding.valueOf(
+                        config.getProperty("binaryencoding", "HEX").trim().toUpperCase());
+
+        // default to CSV if the type property is missing
+        String type = config.getProperty("type", "csv").trim();
+        if (type.equalsIgnoreCase("csv")) {
+            m_separator = ',';
+        } else if (type.equalsIgnoreCase("tsv")) {
+            m_separator = '\t';
+        } else {
+            throw new IllegalArgumentException("Error: type must be one of CSV or TSV");
+        }
+
+        validateStream();
+        // Kinesis Firehose limits record size to 1,000 KB
+        setRowLengthLimit(1000000);
+    }
+
+    // Block until the delivery stream is ACTIVE, polling every five seconds
+    // while it is still CREATING; fail for any other status.
+    private void validateStream() throws Exception {
+        DescribeDeliveryStreamRequest describeHoseRequest = new DescribeDeliveryStreamRequest().
+                withDeliveryStreamName(m_streamName);
+        DescribeDeliveryStreamResult describeHoseResult =
+                m_firehoseClient.describeDeliveryStream(describeHoseRequest);
+        String status = describeHoseResult.getDeliveryStreamDescription().getDeliveryStreamStatus();
+        if (status.equalsIgnoreCase("ACTIVE")) {
+            return;
+        }
+        else if (status.equalsIgnoreCase("CREATING")) {
+            Thread.sleep(5000);
+            validateStream();
+        }
+        else {
+            throw new Exception("Cannot use stream " + m_streamName + ", responded with " + status);
+        }
+    }
+
+    @Override
+    public void shutdown()
+    {
+        m_firehoseClient.shutdown();
+    }
+
+    @Override
+    public ExportDecoderBase constructExportDecoder(AdvertisedDataSource source)
+    {
+        return new KinesisFirehoseExportDecoder(source);
+    }
+
+    class KinesisFirehoseExportDecoder extends ExportDecoderBase {
+        private final ListeningExecutorService m_es;
+
+        private ArrayList<Record> m_records;
+
+        @Override
+        public ListeningExecutorService getExecutor() {
+            return m_es;
+        }
+
+        public KinesisFirehoseExportDecoder(AdvertisedDataSource source)
+        {
+            super(source);
+            m_es = CoreUtils.getListeningSingleThreadExecutor(
+                    "Kinesis Firehose Export decoder for partition " + source.partitionId
+                    + " table " + source.tableName
+                    + " generation " + source.m_generation, CoreUtils.MEDIUM_STACK_SIZE);
+        }
+
+        @Override
+        public boolean processRow(int rowSize, byte[] rowData) throws RestartBlockException
+        {
+            StringWriter stringer = new StringWriter();
+            CSVWriter csv = new CSVWriter(stringer, m_separator);
+            Record record = new Record();
+            try {
+                final ExportRowData row = decodeRow(rowData);
+                if (!writeRow(row.values, csv, m_skipInternal, m_binaryEncoding, m_ODBCDateformat.get())) {
+                    return false;
+                }
+                csv.flush();
+
+                String data = stringer.toString();
+                record.withData(ByteBuffer.wrap(data.getBytes()));
+            } catch (IOException e) {
+                rateLimitedLogError(m_logger, "Failed to build record: %s", Throwables.getStackTraceAsString(e));
+                throw new RestartBlockException(true);
+            } finally {
+                try { csv.close(); } catch (IOException e) {}
+            }
+
+            m_records.add(record);
+            return true;
+        }
+
+        @Override
+        public void sourceNoLongerAdvertised(AdvertisedDataSource source)
+        {
+            m_es.shutdown();
+            try {
+                m_es.awaitTermination(365, TimeUnit.DAYS);
+            } catch (InterruptedException e) {
+                Throwables.propagate(e);
+            }
+        }
+
+        @Override
+        public void onBlockStart() throws RestartBlockException
+        {
+            m_records = new ArrayList<Record>();
+        }
+
+        @Override
+        public void onBlockCompletion() throws RestartBlockException
+        {
+            try {
+                int recordsSize = m_records.size();
+                List<Record> recordsList;
+                int sleepTime = 0;
+                while (recordsSize > 0) {
+                    if (sleepTime > 0)
+                        Thread.sleep(sleepTime);
+                    // a PutRecordBatchRequest cannot contain more than 500 records, so
+                    // send at most 500 at a time and keep the rest for the next pass
+                    if (recordsSize > 500) {
+                        recordsList = m_records.subList(0, 500);
+                        m_records = new
+                                ArrayList<Record>(m_records.subList(500, recordsSize));
+                        recordsSize = m_records.size();
+                    } else {
+                        recordsList = new ArrayList<Record>(m_records);
+                        m_records.clear();
+                        recordsSize = 0;
+                    }
+                    PutRecordBatchRequest batchRequest = new PutRecordBatchRequest().
+                            withDeliveryStreamName(m_streamName).
+                            withRecords(recordsList);
+                    PutRecordBatchResult res = m_firehoseClient.putRecordBatch(batchRequest);
+                    if (res.getFailedPutCount() > 0) {
+                        for (PutRecordBatchResponseEntry entry : res.getRequestResponses()) {
+                            // "Slow down." indicates throttling and is retried after a
+                            // backoff; any other error is fatal for this block
+                            if (entry.getErrorMessage() != null && !entry.getErrorMessage().contains("Slow down.")) {
+                                rateLimitedLogError(m_logger, "Record failed with response: %s", entry.getErrorMessage());
+                                throw new RestartBlockException(true);
+                            }
+                        }
+                        // requeue the whole batch (export is at-least-once, so a throttled
+                        // batch may produce duplicates) and double the backoff delay
+                        m_records.addAll(0, recordsList);
+                        recordsSize = m_records.size();
+                        sleepTime = sleepTime == 0 ? 1000 : sleepTime*2;
+                    } else
+                        sleepTime = sleepTime == 0 ? 0 : sleepTime-10;
+                }
+            } catch (ResourceNotFoundException | InvalidArgumentException | ServiceUnavailableException | InterruptedException e) {
+                rateLimitedLogError(m_logger, "Failed to send record batch: %s", Throwables.getStackTraceAsString(e));
+                throw new RestartBlockException(true);
+            }
+        }
+    }
+}
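
A quick end-to-end smoke test (a sketch, assuming a running VoltDB node configured
with the deployment stanza and the alerts DDL from the README, and sqlcmd on the
PATH): insert a row into the export stream and watch it arrive at the destination
backing the Firehose delivery stream (for example, its S3 bucket).

```bash
echo "INSERT INTO ALERTS (ID,MSG,CONTINENT,COUNTRY) VALUES (1,'fab-02 inoperable','EU','IT');" | sqlcmd
```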