diff --git a/src/org/hbase/async/Config.java b/src/org/hbase/async/Config.java index 79a4652..29dc93e 100644 --- a/src/org/hbase/async/Config.java +++ b/src/org/hbase/async/Config.java @@ -345,6 +345,7 @@ private void loadSystemAndDefaults() { default_map.put("asynchbase.channel.check_write_state", "false"); default_map.put("assynccassandra.port", "9160"); + default_map.put("asynccassandra.max_conns_per_host", "16"); for (Map.Entry entry : default_map.entrySet()) { if (!properties.containsKey(entry.getKey())) diff --git a/src/org/hbase/async/HBaseClient.java b/src/org/hbase/async/HBaseClient.java index d4d05a6..780b694 100644 --- a/src/org/hbase/async/HBaseClient.java +++ b/src/org/hbase/async/HBaseClient.java @@ -15,7 +15,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.astyanax.AstyanaxContext; import com.netflix.astyanax.ColumnListMutation; import com.netflix.astyanax.Keyspace; @@ -76,7 +81,9 @@ public class HBaseClient { final Config config; final ExecutorService executor = Executors.newFixedThreadPool(25); - + final ListeningExecutorService service = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(25)); + final ByteMap> contexts = new ByteMap>(); final ByteMap keyspaces = new ByteMap(); @@ -127,11 +134,19 @@ public HBaseClient(final Config config) { "Missing required config 'asynccassandra.seeds'"); } + final int num_workers = config.hasProperty("asynccassandra.workers.size") ? + config.getInt("asynccassandra.workers.size") : + Runtime.getRuntime().availableProcessors() * 2; + ast_config = new AstyanaxConfigurationImpl() - .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE); + .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE) + .setAsyncExecutor( + Executors.newFixedThreadPool(num_workers, new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("AstyanaxAsync-%d") + .build())); pool = new ConnectionPoolConfigurationImpl("MyConnectionPool") .setPort(config.getInt("assynccassandra.port")) - .setMaxConnsPerHost(1) + .setMaxConnsPerHost(config.getInt("asynccassandra.max_conns_per_host")) .setSeeds(config.getString("asynccassandra.seeds")); monitor = new CountingConnectionPoolMonitor(); @@ -187,6 +202,34 @@ public void run() { } } + class FutureCB implements FutureCallback>> { + + @Override + public void onFailure(Throwable e) { + deferred.callback(e); + } + + @Override + public void onSuccess(OperationResult> result) { + try { + // TODO - can track stats here + final ColumnList columns = result.getResult(); + final ArrayList kvs = new ArrayList(columns.size()); + final Iterator> it = columns.iterator(); + while (it.hasNext()) { + final Column column = it.next(); + final KeyValue kv = new KeyValue(request.key, request.family(), + column.getName(), column.getTimestamp() / 1000, // micro to ms + column.getByteArrayValue()); + kvs.add(kv); + } + deferred.callback(kvs); + } catch (RuntimeException e) { + deferred.callback(e); + } + } + } + // Sucks, have to have a family I guess try { final ListenableFuture>> future; @@ -198,8 +241,9 @@ public void run() { } else { future = query.withColumnSlice( Arrays.asList(request.qualifiers())).executeAsync(); - } - future.addListener(new ResponseCB(future), executor); + } + //future.addListener(new ResponseCB(future), executor); + Futures.addCallback(future, new FutureCB(), service); } catch (ConnectionException e) { deferred.callback(e); } @@ -234,7 +278,23 @@ public void run() { } } } - future.addListener(new ResponseCB(), executor); + + class PutCB implements FutureCallback> { + + @Override + public void onFailure(Throwable e) { + deferred.callback(e); + } + + @Override + public void onSuccess(OperationResult arg0) { + deferred.callback(null); + } + + } + + //future.addListener(new ResponseCB(), executor); + Futures.addCallback(future, new PutCB(), service); } catch (ConnectionException e) { deferred.callback(e); }