Skip to content

Commit

Permalink
Disable backpressure (#4)
Browse files Browse the repository at this point in the history
* disable export client side backpressure

* less aggresive backpressure; retry batch for serviceUnavaibleExeception

* addaptive backpressure; tune with binary search

* retry only failed ones, not entire batch

* add a jitter to backoff

* implement fullGitter

* add option to choose from equalJitter and decorJitter

* throttle each putBatch call considering number of corrent client

* code refactoring; settable concurrent writer

* address the reviews; expose two tunable parameters for users
  • Loading branch information
JadenW authored Jun 10, 2016
1 parent ca11e02 commit 3d585ed
Show file tree
Hide file tree
Showing 7 changed files with 485 additions and 38 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,5 @@ INSERT INTO ALERTS (ID,MSG,CONTINENT,COUNTRY) VALUES (1,'fab-02 inoperable','EU'
- `access.key` (mandatory) user's access key
- `secret.key` (mandatory) user's secret key
- `timezone` (optional, _default:_ local timezone) timezone used to format timestamp values
- `stream.limit` (optional, _default:_ 5000 records/s) Firehose Delivery Stream limit on AWS side, i.e. how many records per second it can accept
- `concurrent.writers` (optional, _default:_ 1) The number of writers in the concurrent writer pool, sharing the connection to AWS
82 changes: 82 additions & 0 deletions src/main/java/org/voltdb/exportclient/BackOff.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* The MIT License (MIT)
*
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.voltdb.exportclient;

import java.util.concurrent.ThreadLocalRandom;

abstract class BackOff {
int base;
int cap;

public BackOff(int base, int cap) {
this.base = base;
this.cap = cap;
}

public int expo(int n) {
return Math.min(cap, base * (1<<n));
}

abstract public int backoff(int n);
}

class ExpoBackOffFullJitter extends BackOff {

public ExpoBackOffFullJitter(int base, int cap) {
super(base, cap);
}

@Override
public int backoff(int n) {
return ThreadLocalRandom.current().nextInt(expo(n));
}
}

class ExpoBackOffEqualJitter extends BackOff {

public ExpoBackOffEqualJitter(int base, int cap) {
super(base, cap);
}

@Override
public int backoff(int n) {
int v = expo(n) / 2 ;
return ThreadLocalRandom.current().nextInt(v, 3 * v);
}
}

class ExpoBackOffDecor extends BackOff {
int sleep;
public ExpoBackOffDecor(int base, int cap) {
super(base, cap);
sleep = base;
}

@Override
public int backoff(int n) {
sleep = Math.min(cap, ThreadLocalRandom.current().nextInt(sleep * (1+n) + base));
return sleep;
}
}
40 changes: 40 additions & 0 deletions src/main/java/org/voltdb/exportclient/BackOffFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* The MIT License (MIT)
*
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.voltdb.exportclient;

public class BackOffFactory {
public static BackOff getBackOff(String backOffType, int backOffBase, int backOffCap) {
switch (backOffType) {
case "full":
return new ExpoBackOffFullJitter(backOffBase, backOffCap);
case "equal":
return new ExpoBackOffEqualJitter(backOffBase, backOffCap);
case "decor":
return new ExpoBackOffDecor(backOffBase, backOffCap);
default:
return new ExpoBackOffDecor(backOffBase, backOffCap);
}
}
}
65 changes: 65 additions & 0 deletions src/main/java/org/voltdb/exportclient/FirehoseExportException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* The MIT License (MIT)
*
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.voltdb.exportclient;

import java.util.Arrays;
import java.util.IllegalFormatConversionException;
import java.util.MissingFormatArgumentException;
import java.util.UnknownFormatConversionException;

public class FirehoseExportException extends RuntimeException {

private static final long serialVersionUID = 4260074108700559787L;

public FirehoseExportException() {
}

public FirehoseExportException(String format, Object...args) {
super(format(format, args));
}

public FirehoseExportException(Throwable cause) {
super(cause);
}

public FirehoseExportException(String format, Throwable cause, Object...args) {
super(format(format, args), cause);
}

static protected String format(String format, Object...args) {
String formatted = null;
try {
formatted = String.format(format, args);
} catch (MissingFormatArgumentException|IllegalFormatConversionException|
UnknownFormatConversionException ignoreThem) {
}
finally {
if (formatted == null) {
formatted = "Format: " + format + ", arguments: " + Arrays.toString(args);
}
}
return formatted;
}
}
114 changes: 114 additions & 0 deletions src/main/java/org/voltdb/exportclient/FirehoseExportLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* The MIT License (MIT)
*
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.voltdb.exportclient;

import java.util.concurrent.TimeUnit;

import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.utils.EstTime;
import org.voltcore.utils.RateLimitedLogger;

public class FirehoseExportLogger {

final static long SUPPRESS_INTERVAL = 10;
final private VoltLogger m_logger = new VoltLogger("ExportClient");

public FirehoseExportLogger() {
}

private void log(Level level, Throwable cause, String format, Object...args) {
RateLimitedLogger.tryLogForMessage(
EstTime.currentTimeMillis(),
SUPPRESS_INTERVAL, TimeUnit.SECONDS,
m_logger, level,
cause, format, args
);
}

public VoltLogger getLogger() {
return m_logger;
}

public void trace(String format, Object...args) {
if (m_logger.isTraceEnabled()) {
log(Level.TRACE, null, format, args);
}
}

public void debug(String format, Object...args) {
if (m_logger.isDebugEnabled()) {
log(Level.DEBUG, null, format, args);
}
}

public void info(String format, Object...args) {
if (m_logger.isInfoEnabled()) {
log(Level.INFO, null, format, args);
}
}

public void warn(String format, Object...args) {
log(Level.WARN, null, format, args);
}

public void error(String format, Object...args) {
log(Level.ERROR, null, format, args);
}

public void fatal(String format, Object...args) {
log(Level.FATAL, null, format, args);
}

public void trace(String format, Throwable cause, Object...args) {
if (m_logger.isTraceEnabled()) {
log(Level.TRACE, cause, format, args);
}
}

public void debug(String format, Throwable cause, Object...args) {
if (m_logger.isDebugEnabled()) {
log(Level.DEBUG, cause, format, args);
}
}

public void info(String format, Throwable cause, Object...args) {
if (m_logger.isInfoEnabled()) {
log(Level.INFO, cause, format, args);
}
}

public void warn(String format, Throwable cause, Object...args) {
log(Level.WARN, cause, format, args);
}

public void error(String format, Throwable cause, Object...args) {
log(Level.ERROR, cause, format, args);
}

public void fatal(String format, Throwable cause, Object...args) {
log(Level.FATAL, cause, format, args);
}
}
Loading

0 comments on commit 3d585ed

Please sign in to comment.