diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java index 7340ac2bc..c698b6237 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/OracleConnectOptions.java @@ -43,6 +43,7 @@ public static OracleConnectOptions wrap(SqlConnectOptions options) { public static final String DEFAULT_PASSWORD = ""; public static final String DEFAULT_DATABASE = ""; public static final boolean DEFAULT_SSL = false; + public static final int DEFAULT_PIPELINING_LIMIT = 1; private String serviceId; private String serviceName; @@ -51,6 +52,7 @@ public static OracleConnectOptions wrap(SqlConnectOptions options) { private String tnsAlias; private String tnsAdmin; private boolean ssl; + private int pipeliningLimit = DEFAULT_PIPELINING_LIMIT; public OracleConnectOptions() { super(); @@ -69,6 +71,7 @@ private void copyFields(OracleConnectOptions other) { this.tnsAlias = other.tnsAlias; this.tnsAdmin = other.tnsAdmin; this.ssl = other.ssl; + this.pipeliningLimit = other.pipeliningLimit; } public OracleConnectOptions(SqlConnectOptions options) { @@ -208,6 +211,29 @@ public OracleConnectOptions setTnsAdmin(String tnsAdmin) { return this; } + /** + * Get the pipelining limit count. + * + * @return the pipelining count + */ + public int getPipeliningLimit() { + return pipeliningLimit; + } + + /** + * Set the pipelining limit count. + * + * @param pipeliningLimit the count to configure + * @return a reference to this, so the API can be used fluently + */ + public OracleConnectOptions setPipeliningLimit(int pipeliningLimit) { + if (pipeliningLimit < 1) { + throw new IllegalArgumentException("pipelining limit can not be less than 1"); + } + this.pipeliningLimit = pipeliningLimit; + return this; + } + // Non-specific options @Override diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java index 86829bf6e..26b701b81 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java @@ -66,7 +66,7 @@ public Future connect(Context context, OracleConnectOptions optio return executeBlocking(context, () -> { OracleConnection orac = datasource.createConnectionBuilder().build(); OracleMetadata metadata = new OracleMetadata(orac.getMetaData()); - OracleJdbcConnection conn = new OracleJdbcConnection(ctx, metrics, options, orac, metadata); + OracleJdbcConnection conn = new OracleJdbcConnection(ctx, metrics, options, orac, metadata, options.getPipeliningLimit()); OracleConnectionImpl msConn = new OracleConnectionImpl(ctx, this, conn); conn.init(msConn); return msConn; diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java index 0f00a4fe4..f6e038e51 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleJdbcConnection.java @@ -41,20 +41,23 @@ public class OracleJdbcConnection implements Connection { private final OracleConnectOptions options; @SuppressWarnings("rawtypes") private final ConcurrentMap cursors = new ConcurrentHashMap<>(); + private final int pipeliningLimit; private Holder holder; // Command pipeline state @SuppressWarnings("rawtypes") private final Deque pending = new ArrayDeque<>(); private Promise closePromise; - private boolean inflight, executing; + private boolean executing; + private int inflight; - public OracleJdbcConnection(ContextInternal ctx, ClientMetrics metrics, OracleConnectOptions options, OracleConnection oc, OracleMetadata metadata) { + public OracleJdbcConnection(ContextInternal ctx, ClientMetrics metrics, OracleConnectOptions options, OracleConnection oc, OracleMetadata metadata, int pipeliningLimit) { this.context = ctx; this.metrics = metrics; this.options = options; this.connection = oc; this.metadata = metadata; + this.pipeliningLimit = pipeliningLimit; } @Override @@ -183,8 +186,8 @@ private void checkPending() { try { executing = true; CommandBase cmd; - while (!inflight && (cmd = pending.poll()) != null) { - inflight = true; + while (inflight < pipeliningLimit && (cmd = pending.poll()) != null) { + inflight++; if (metrics != null && cmd instanceof CloseConnectionCommand) { metrics.close(); } @@ -243,7 +246,7 @@ private OracleCommand forExtendedQuery(ExtendedQueryCommand cmd) { } private void actionComplete(CommandBase cmd, OracleCommand action, AsyncResult ar) { - inflight = false; + inflight--; Future future = Future.succeededFuture(); if (ar.failed()) { Throwable cause = ar.cause(); diff --git a/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OraclePipeliningQueryTest.java b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OraclePipeliningQueryTest.java new file mode 100644 index 000000000..5e390967c --- /dev/null +++ b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OraclePipeliningQueryTest.java @@ -0,0 +1,92 @@ +package io.vertx.oracleclient.test.tck; + +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.oracleclient.OracleBuilder; +import io.vertx.oracleclient.OracleConnectOptions; +import io.vertx.oracleclient.test.junit.OracleRule; +import io.vertx.sqlclient.PoolOptions; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.SqlClient; +import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.tck.PipeliningQueryTestBase; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.runner.RunWith; + +import java.util.ArrayList; +import java.util.List; + +@RunWith(VertxUnitRunner.class) +public class OraclePipeliningQueryTest extends PipeliningQueryTestBase { + + @ClassRule + public static final OracleRule rule = OracleRule.SHARED_INSTANCE; + + @Override + protected void cleanTestTable(TestContext ctx) { + connectionConnector.connect(ctx.asyncAssertSuccess(conn -> { + conn + .query("TRUNCATE TABLE mutable") + .execute() + .onComplete(ctx.asyncAssertSuccess(result -> { + conn.close(); + })); + })); + } + + @Override + protected void init() { + options = rule.options(); + OracleConnectOptions oracleConnectOptions = (OracleConnectOptions) options; + oracleConnectOptions.setPipeliningLimit(64); + connectionConnector = ClientConfig.CONNECT.connect(vertx, oracleConnectOptions); + pooledConnectionConnector = ClientConfig.POOLED.connect(vertx, oracleConnectOptions); + pooledClientSupplier = () -> OracleBuilder.pool(b -> b.connectingTo(oracleConnectOptions).with(new PoolOptions().setMaxSize(8)).using(vertx)); + } + + @Override + protected String statement(String... parts) { + return String.join("?", parts); + } + + @Override + protected void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) { + List batchParams = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + batchParams.add(Tuple.of(i, String.format("val-%d", i))); + } + + client.preparedQuery(statement("INSERT INTO mutable(id, val) VALUES (", ", ",")")) + .executeBatch(batchParams) + .onComplete(ctx.asyncAssertSuccess(res -> { + ctx.assertEquals(1000, res.rowCount()); + + client.query("SELECT id, val FROM mutable") + .execute() + .onComplete(ctx.asyncAssertSuccess(res2 -> { + ctx.assertEquals(1000, res2.size()); + int i = 0; + for (Row row : res2) { + ctx.assertEquals(2, row.size()); + ctx.assertEquals(i, row.getInteger(0)); + ctx.assertEquals(String.format("val-%d", i), row.getString(1)); + i++; + } + client.close(); + })); + })); + } + + @Override + @Ignore("Oracle does not support batching queries") + public void testOneShotPreparedBatchQueryConn(TestContext ctx) { + + } + + @Override + @Ignore("Oracle does not support batching queries") + public void testOneShotPreparedBatchQueryPool(TestContext ctx) { + + } +} diff --git a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/PipeliningQueryTestBase.java b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/PipeliningQueryTestBase.java index 77b669bf0..4dcf374dc 100644 --- a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/PipeliningQueryTestBase.java +++ b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/PipeliningQueryTestBase.java @@ -196,7 +196,7 @@ public void testOneShotPreparedBatchInsertPool(TestContext ctx) { testOneShotPreparedBatchInsert(ctx, client); } - private void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) { + protected void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) { Async latch = ctx.async(1000); List batchParams = new ArrayList<>(); for (int i = 0; i < 1000; i++) { @@ -228,7 +228,7 @@ private void testOneShotPreparedBatchInsert(TestContext ctx, SqlClient client) { })); } - private void cleanTestTable(TestContext ctx) { + protected void cleanTestTable(TestContext ctx) { connectionConnector.connect(ctx.asyncAssertSuccess(conn -> { conn .query("TRUNCATE TABLE mutable;")