Skip to content

Commit

Permalink
Tidy up code and increase default timeout from 30 to 60
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jan 15, 2021
1 parent cc8a4c0 commit 2111265
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 115 deletions.
2 changes: 1 addition & 1 deletion docker/config/server.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"requestTimeout": 5000,
"queryTimeout": 30000,
"queryTimeout": 60000,
"configScanPeriod": 5000,
"repositories": [
{
Expand Down
2 changes: 1 addition & 1 deletion misc/perf-test/jdbc-bridge/config/datasources/mariadb.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/2.7.0/mariadb-java-client-2.7.0.jar"
],
"driverClassName": "org.mariadb.jdbc.Driver",
"jdbcUrl": "jdbc:mariadb://mariadb:3306/test?useSSL=false&useCompression=false",
"jdbcUrl": "jdbc:mariadb://mariadb:3306/test?useSSL=false&useCompression=false&rewriteBatchedStatements=true",
"dataSource": {
"user": "root",
"password": "root"
Expand Down
202 changes: 89 additions & 113 deletions src/main/java/ru/yandex/clickhouse/jdbcbridge/JdbcBridgeVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void startServer(JsonObject bridgeServerConfig, JsonObject httpServerCon
// .setTcpFastOpen(true).setLogActivity(true));

long requestTimeout = bridgeServerConfig.getLong("requestTimeout", 5000L);
long queryTimeout = Math.max(requestTimeout, bridgeServerConfig.getLong("queryTimeout", 30000L));
long queryTimeout = Math.max(requestTimeout, bridgeServerConfig.getLong("queryTimeout", 60000L));

TimeoutHandler requestTimeoutHandler = TimeoutHandler.create(requestTimeout);
TimeoutHandler queryTimeoutHandler = TimeoutHandler.create(queryTimeout);
Expand All @@ -262,8 +262,10 @@ private void startServer(JsonObject bridgeServerConfig, JsonObject httpServerCon
.handler(this::handleIdentifierQuote);
router.post("/columns_info").produces(RESPONSE_CONTENT_TYPE).handler(queryTimeoutHandler)
.handler(this::handleColumnsInfo);
router.post("/").produces(RESPONSE_CONTENT_TYPE).handler(queryTimeoutHandler).handler(this::handleQuery);
router.post("/write").produces(RESPONSE_CONTENT_TYPE).handler(queryTimeoutHandler).handler(this::handleWrite);
router.post("/").produces(RESPONSE_CONTENT_TYPE).handler(queryTimeoutHandler)
.blockingHandler(this::handleQuery);
router.post("/write").produces(RESPONSE_CONTENT_TYPE).handler(queryTimeoutHandler)
.blockingHandler(this::handleWrite);

log.info("Starting web server...");
int port = bridgeServerConfig.getInteger("serverPort", DEFAULT_SERVER_PORT);
Expand Down Expand Up @@ -415,84 +417,73 @@ private void handleQuery(RoutingContext ctx) {

final HttpServerResponse resp = ctx.response().setChunked(true);

vertx.executeBlocking(promise -> {
if (log.isTraceEnabled()) {
log.trace("About to execute query...");
}
if (log.isTraceEnabled()) {
log.trace("About to execute query...");
}

QueryParameters params = parser.getQueryParameters();
NamedDataSource ds = getDataSource(manager, parser.getConnectionString(), params.isDebug());
params = ds.newQueryParameters(params);
QueryParameters params = parser.getQueryParameters();
NamedDataSource ds = getDataSource(manager, parser.getConnectionString(), params.isDebug());
params = ds.newQueryParameters(params);

String rawSchema = parser.getRawSchema();
NamedSchema namedSchema = getSchemaRepository().get(rawSchema);
String rawSchema = parser.getRawSchema();
NamedSchema namedSchema = getSchemaRepository().get(rawSchema);

String generatedQuery = parser.getRawQuery();
String normalizedQuery = parser.getNormalizedQuery();
// try if it's a named query first
NamedQuery namedQuery = getQueryRepository().get(normalizedQuery);
// in case the "query" is a local file...
normalizedQuery = ds.loadSavedQueryAsNeeded(normalizedQuery, params);
String generatedQuery = parser.getRawQuery();
String normalizedQuery = parser.getNormalizedQuery();
// try if it's a named query first
NamedQuery namedQuery = getQueryRepository().get(normalizedQuery);
// in case the "query" is a local file...
normalizedQuery = ds.loadSavedQueryAsNeeded(normalizedQuery, params);

if (log.isDebugEnabled()) {
log.debug("Generated query:\n{}\nNormalized query:\n{}", generatedQuery, normalizedQuery);
}
if (log.isDebugEnabled()) {
log.debug("Generated query:\n{}\nNormalized query:\n{}", generatedQuery, normalizedQuery);
}

ResponseWriter writer = new ResponseWriter(resp, parser.getStreamOptions(),
ds.getQueryTimeout(params.getTimeout()));
ResponseWriter writer = new ResponseWriter(resp, parser.getStreamOptions(),
ds.getQueryTimeout(params.getTimeout()));

long executionStartTime = System.currentTimeMillis();
if (namedQuery != null) {
if (log.isDebugEnabled()) {
log.debug("Found named query: [{}]", namedQuery);
}
long executionStartTime = System.currentTimeMillis();
if (namedQuery != null) {
if (log.isDebugEnabled()) {
log.debug("Found named query: [{}]", namedQuery);
}

if (namedSchema == null) {
namedSchema = getSchemaRepository().get(namedQuery.getSchema());
}
// columns in request might just be a subset of defined list
// for example:
// - named query 'test' is: select a, b, c from table
// - clickhouse query: select b, a from jdbc('?','','test')
// - requested columns: b, a
ds.executeQuery(rawSchema, namedQuery,
namedSchema != null ? namedSchema.getColumns() : parser.getTable(), params, writer);
} else {
// columnsInfo could be different from what we responded earlier, so let's parse
// it again
TableDefinition queryColumns = namedSchema != null ? namedSchema.getColumns() : parser.getTable();
// unfortunately default values will be lost between two requests, so we have to
// add it back...
List<ColumnDefinition> additionalColumns = new ArrayList<ColumnDefinition>();
if (params.showDatasourceColumn()) {
additionalColumns.add(new ColumnDefinition(TableDefinition.COLUMN_DATASOURCE, DataType.Str, true,
DEFAULT_LENGTH, DEFAULT_PRECISION, DEFAULT_SCALE, null, ds.getId(), null));
}
if (params.showCustomColumns()) {
additionalColumns.addAll(ds.getCustomColumns());
}
if (namedSchema == null) {
namedSchema = getSchemaRepository().get(namedQuery.getSchema());
}
// columns in request might just be a subset of defined list
// for example:
// - named query 'test' is: select a, b, c from table
// - clickhouse query: select b, a from jdbc('?','','test')
// - requested columns: b, a
ds.executeQuery(rawSchema, namedQuery, namedSchema != null ? namedSchema.getColumns() : parser.getTable(),
params, writer);
} else {
// columnsInfo could be different from what we responded earlier, so let's parse
// it again
TableDefinition queryColumns = namedSchema != null ? namedSchema.getColumns() : parser.getTable();
// unfortunately default values will be lost between two requests, so we have to
// add it back...
List<ColumnDefinition> additionalColumns = new ArrayList<ColumnDefinition>();
if (params.showDatasourceColumn()) {
additionalColumns.add(new ColumnDefinition(TableDefinition.COLUMN_DATASOURCE, DataType.Str, true,
DEFAULT_LENGTH, DEFAULT_PRECISION, DEFAULT_SCALE, null, ds.getId(), null));
}
if (params.showCustomColumns()) {
additionalColumns.addAll(ds.getCustomColumns());
}

queryColumns.updateValues(additionalColumns);
queryColumns.updateValues(additionalColumns);

ds.executeQuery(namedSchema == null ? rawSchema : Utils.EMPTY_STRING, parser.getNormalizedQuery(),
normalizedQuery, queryColumns, params, writer);
}
ds.executeQuery(namedSchema == null ? rawSchema : Utils.EMPTY_STRING, parser.getNormalizedQuery(),
normalizedQuery, queryColumns, params, writer);
}

if (log.isDebugEnabled()) {
log.debug("Completed execution in {} ms.", System.currentTimeMillis() - executionStartTime);
}
if (log.isDebugEnabled()) {
log.debug("Completed execution in {} ms.", System.currentTimeMillis() - executionStartTime);
}

promise.complete();
}, false, res -> {
if (res.succeeded()) {
if (log.isDebugEnabled()) {
log.debug("Wrote back query result");
}
ctx.response().end();
} else {
ctx.fail(res.cause());
}
});
resp.end();
}

// https://github.com/ClickHouse/ClickHouse/blob/bee5849c6a7dba20dbd24dfc5fd5a786745d90ff/programs/odbc-bridge/MainHandler.cpp#L169
Expand All @@ -502,58 +493,42 @@ private void handleWrite(RoutingContext ctx) {

final HttpServerResponse resp = ctx.response().setChunked(true);

vertx.executeBlocking(promise -> {
if (log.isTraceEnabled()) {
log.trace("About to execute mutation...");
}
if (log.isTraceEnabled()) {
log.trace("About to execute mutation...");
}

QueryParameters params = parser.getQueryParameters();
NamedDataSource ds = getDataSource(manager, parser.getConnectionString(), params.isDebug());
params = ds == null ? params : ds.newQueryParameters(params);
QueryParameters params = parser.getQueryParameters();
NamedDataSource ds = getDataSource(manager, parser.getConnectionString(), params.isDebug());
params = ds == null ? params : ds.newQueryParameters(params);

final String generatedQuery = parser.getRawQuery();
final String generatedQuery = parser.getRawQuery();

String normalizedQuery = parser.getNormalizedQuery();
if (log.isDebugEnabled()) {
log.debug("Generated query:\n{}\nNormalized query:\n{}", generatedQuery, normalizedQuery);
}
String normalizedQuery = parser.getNormalizedQuery();
if (log.isDebugEnabled()) {
log.debug("Generated query:\n{}\nNormalized query:\n{}", generatedQuery, normalizedQuery);
}

// try if it's a named query first
NamedQuery namedQuery = getQueryRepository().get(normalizedQuery);
// in case the "query" is a local file...
normalizedQuery = ds.loadSavedQueryAsNeeded(normalizedQuery, params);
// try if it's a named query first
NamedQuery namedQuery = getQueryRepository().get(normalizedQuery);
// in case the "query" is a local file...
normalizedQuery = ds.loadSavedQueryAsNeeded(normalizedQuery, params);

// TODO: use named schema as table name?
// TODO: use named schema as table name?

String table = parser.getRawQuery();
if (namedQuery != null) {
table = parser.extractTable(ds.loadSavedQueryAsNeeded(namedQuery.getQuery(), params));
} else {
table = parser.extractTable(ds.loadSavedQueryAsNeeded(normalizedQuery, params));
}
String table = parser.getRawQuery();
if (namedQuery != null) {
table = parser.extractTable(ds.loadSavedQueryAsNeeded(namedQuery.getQuery(), params));
} else {
table = parser.extractTable(ds.loadSavedQueryAsNeeded(normalizedQuery, params));
}

ResponseWriter writer = new ResponseWriter(resp, parser.getStreamOptions(),
ds.getWriteTimeout(params.getTimeout()));
ResponseWriter writer = new ResponseWriter(resp, parser.getStreamOptions(),
ds.getWriteTimeout(params.getTimeout()));

ds.executeMutation(parser.getRawSchema(), table, parser.getTable(), params, ByteBuffer.wrap(ctx.getBody()),
writer);
ds.executeMutation(parser.getRawSchema(), table, parser.getTable(), params, ByteBuffer.wrap(ctx.getBody()),
writer);

if (resp.closed() || resp.ended()) {
promise.fail("Respose has already been closed");
} else {
resp.write(ByteBuffer.asBuffer(WRITE_RESPONSE));
promise.complete();
}
}, false, res -> {
if (res.succeeded()) {
if (log.isDebugEnabled()) {
log.debug("Wrote back query result");
}
resp.end();
} else {
ctx.fail(res.cause());
}
});
resp.end(ByteBuffer.asBuffer(WRITE_RESPONSE));
}

private Repository<NamedDataSource> getDataSourceRepository() {
Expand All @@ -568,6 +543,7 @@ private Repository<NamedQuery> getQueryRepository() {
return getRepositoryManager().getRepository(NamedQuery.class);
}

@SuppressWarnings("unchecked")
@Override
public <T> Extension<T> getExtension(Class<? extends T> clazz) {
String className = Objects.requireNonNull(clazz).getName();
Expand Down

0 comments on commit 2111265

Please sign in to comment.