From a0656bf84eb14d0bac99693015c5576d68be8c41 Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Tue, 14 Mar 2017 14:30:10 -0700 Subject: [PATCH 1/3] Try switching to callbacks instead of listeners. --- src/org/hbase/async/HBaseClient.java | 59 ++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/src/org/hbase/async/HBaseClient.java b/src/org/hbase/async/HBaseClient.java index d4d05a6..3a7cb1c 100644 --- a/src/org/hbase/async/HBaseClient.java +++ b/src/org/hbase/async/HBaseClient.java @@ -15,7 +15,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.netflix.astyanax.AstyanaxContext; import com.netflix.astyanax.ColumnListMutation; import com.netflix.astyanax.Keyspace; @@ -76,7 +80,9 @@ public class HBaseClient { final Config config; final ExecutorService executor = Executors.newFixedThreadPool(25); - + final ListeningExecutorService service = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(25)); + final ByteMap> contexts = new ByteMap>(); final ByteMap keyspaces = new ByteMap(); @@ -187,6 +193,34 @@ public void run() { } } + class FutureCB implements FutureCallback>> { + + @Override + public void onFailure(Throwable e) { + deferred.callback(e); + } + + @Override + public void onSuccess(OperationResult> result) { + try { + // TODO - can track stats here + final ColumnList columns = result.getResult(); + final ArrayList kvs = new ArrayList(columns.size()); + final Iterator> it = columns.iterator(); + while (it.hasNext()) { + final Column column = it.next(); + final KeyValue kv = new KeyValue(request.key, request.family(), + column.getName(), column.getTimestamp() / 1000, // micro to ms + column.getByteArrayValue()); + kvs.add(kv); + } + deferred.callback(kvs); + } catch (RuntimeException e) { + deferred.callback(e); + } + } + } + // Sucks, have to have a family I guess try { final ListenableFuture>> future; @@ -198,8 +232,9 @@ public void run() { } else { future = query.withColumnSlice( Arrays.asList(request.qualifiers())).executeAsync(); - } - future.addListener(new ResponseCB(future), executor); + } + //future.addListener(new ResponseCB(future), executor); + Futures.addCallback(future, new FutureCB(), service); } catch (ConnectionException e) { deferred.callback(e); } @@ -234,7 +269,23 @@ public void run() { } } } - future.addListener(new ResponseCB(), executor); + + class PutCB implements FutureCallback> { + + @Override + public void onFailure(Throwable e) { + deferred.callback(e); + } + + @Override + public void onSuccess(OperationResult arg0) { + deferred.callback(null); + } + + } + + //future.addListener(new ResponseCB(), executor); + Futures.addCallback(future, new PutCB(), service); } catch (ConnectionException e) { deferred.callback(e); } From fd65c84175704cd9d405cffbf526e7b811737c67 Mon Sep 17 00:00:00 2001 From: Jason Harvey Date: Wed, 15 Mar 2017 15:37:48 -0800 Subject: [PATCH 2/3] Default astyanax threads to core*2, allow customization. --- src/org/hbase/async/HBaseClient.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/org/hbase/async/HBaseClient.java b/src/org/hbase/async/HBaseClient.java index 3a7cb1c..180b95c 100644 --- a/src/org/hbase/async/HBaseClient.java +++ b/src/org/hbase/async/HBaseClient.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.astyanax.AstyanaxContext; import com.netflix.astyanax.ColumnListMutation; import com.netflix.astyanax.Keyspace; @@ -133,8 +134,16 @@ public HBaseClient(final Config config) { "Missing required config 'asynccassandra.seeds'"); } + final int num_workers = config.hasProperty("asynccassandra.workers.size") ? + config.getInt("asynccassandra.workers.size") : + Runtime.getRuntime().availableProcessors() * 2; + ast_config = new AstyanaxConfigurationImpl() - .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE); + .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE) + .setAsyncExecutor( + Executors.newFixedThreadPool(num_workers, new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("AstyanaxAsync-%d") + .build())); pool = new ConnectionPoolConfigurationImpl("MyConnectionPool") .setPort(config.getInt("assynccassandra.port")) .setMaxConnsPerHost(1) From e35f8e2c2c664904c90ec277ad0c39372be04244 Mon Sep 17 00:00:00 2001 From: Jason Harvey Date: Wed, 15 Mar 2017 14:23:22 -0800 Subject: [PATCH 3/3] Change default maxconns to 16, allow customization. --- src/org/hbase/async/Config.java | 1 + src/org/hbase/async/HBaseClient.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/org/hbase/async/Config.java b/src/org/hbase/async/Config.java index 79a4652..29dc93e 100644 --- a/src/org/hbase/async/Config.java +++ b/src/org/hbase/async/Config.java @@ -345,6 +345,7 @@ private void loadSystemAndDefaults() { default_map.put("asynchbase.channel.check_write_state", "false"); default_map.put("assynccassandra.port", "9160"); + default_map.put("asynccassandra.max_conns_per_host", "16"); for (Map.Entry entry : default_map.entrySet()) { if (!properties.containsKey(entry.getKey())) diff --git a/src/org/hbase/async/HBaseClient.java b/src/org/hbase/async/HBaseClient.java index 180b95c..780b694 100644 --- a/src/org/hbase/async/HBaseClient.java +++ b/src/org/hbase/async/HBaseClient.java @@ -146,7 +146,7 @@ public HBaseClient(final Config config) { .build())); pool = new ConnectionPoolConfigurationImpl("MyConnectionPool") .setPort(config.getInt("assynccassandra.port")) - .setMaxConnsPerHost(1) + .setMaxConnsPerHost(config.getInt("asynccassandra.max_conns_per_host")) .setSeeds(config.getString("asynccassandra.seeds")); monitor = new CountingConnectionPoolMonitor();