Skip to content

Commit

Permalink
Enable mutation parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jan 15, 2021
1 parent 2111265 commit d6d502e
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 20 deletions.
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ Assuming you started a test environment using docker-compose, please refer to ex
* Mutation
```sql
-- use query parameter
select * from jdbc('ch-server?mutation', 'drop table if exists system.test_table');
select * from jdbc('ch-server?mutation', 'create table system.test_table(a String, b UInt8) engine=Memory()');
select * from jdbc('ch-server?mutation', 'insert into system.test_table values(''a'', 1)');
select * from jdbc('ch-server?mutation', 'truncate table system.test_table');
-- use JDBC table engine
drop table if exists system.test_table;
create table system.test_table (
a String,
Expand Down Expand Up @@ -302,8 +309,9 @@ Assuming you started a test environment using docker-compose, please refer to ex
Couple of timeout settings you should be aware of:
1. datasource timeout, for example: `max_execution_time` in MariaDB
2. JDBC driver timeout, for example: `connectTimeout` and `socketTimeout` in [MariaDB Connector/J](https://mariadb.com/kb/en/about-mariadb-connector-j/)
3. Vertx timeout - see `config/server.json` and `config/vertx.json`
4. Client(ClickHouse JDBC driver) timeout - see timeout settings in ClickHouse JDBC driver
3. JDBC bridge timeout, for examples: `queryTimeout` in `config/server.json`, and `maxWorkerExecuteTime` in `config/vertx.json`
4. ClickHouse timeout like `max_execution_time` and `keep_alive_timeout` etc.
5. Client timeout, for example: `socketTimeout` in ClickHouse JDBC driver
## Migration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,9 @@ private void handleWrite(RoutingContext ctx) {
ds.executeMutation(parser.getRawSchema(), table, parser.getTable(), params, ByteBuffer.wrap(ctx.getBody()),
writer);

resp.end(ByteBuffer.asBuffer(WRITE_RESPONSE));
if (writer.isOpen()) {
resp.end(ByteBuffer.asBuffer(WRITE_RESPONSE));
}
}

private Repository<NamedDataSource> getDataSourceRepository() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,36 @@ private void writeDebugResult(String schema, String originalQuery, String loaded
Objects.requireNonNull(writer).write(buffer);
}

protected final void writeMutationResult(long effectedRows, ColumnDefinition[] requestColumns,
ColumnDefinition[] customColumns, ResponseWriter writer) {
ByteBuffer buffer = ByteBuffer.newInstance(100);
Map<String, String> values = new HashMap<>();
for (ColumnDefinition c : customColumns) {
values.put(c.getName(), converter.as(String.class, c.getValue()));
}

String typeName = TableDefinition.MUTATION_COLUMNS.getColumn(0).getName();
String rowsName = TableDefinition.MUTATION_COLUMNS.getColumn(1).getName();

values.put(typeName, this.getType());

for (ColumnDefinition c : requestColumns) {
String name = c.getName();
if (rowsName.equals(name)) {
buffer.writeUInt64(effectedRows);
} else {
String str = values.get(name);
if (str == null) {
buffer.writeNull();
} else {
buffer.writeNonNull().writeString(str);
}
}
}

Objects.requireNonNull(writer).write(buffer);
}

protected void writeMutationResult(String schema, String originalQuery, String loadedQuery, QueryParameters params,
ColumnDefinition[] requestColumns, ColumnDefinition[] customColumns, DefaultValues defaultValues,
ResponseWriter writer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public class TableDefinition {
// datasource type: jdbc, config, script etc.
new ColumnDefinition("type", DataType.Str, true, DEFAULT_LENGTH, DEFAULT_PRECISION, DEFAULT_SCALE),
// operation: read or write
new ColumnDefinition("operation", DataType.Str, true, DEFAULT_LENGTH, DEFAULT_PRECISION, DEFAULT_SCALE),
// new ColumnDefinition("operation", DataType.Str, true, DEFAULT_LENGTH,
// DEFAULT_PRECISION, DEFAULT_SCALE),
new ColumnDefinition("rows", DataType.UInt64, false, DEFAULT_LENGTH, DEFAULT_PRECISION, DEFAULT_SCALE));

private static final String COLUMN_HEADER = "columns format version: ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,20 @@ protected final void setTimeout(Statement stmt, int expectedTimeout) {
}
}

protected long getFirstMutationResult(Statement stmt) throws SQLException {
long count = 0L;

try {
count = stmt.getLargeUpdateCount();
} catch (SQLException e) {
throw e;
} catch (Exception e) {
count = stmt.getUpdateCount();
}

return count == -1 ? 0 : count;
}

protected ResultSet getFirstQueryResult(Statement stmt, boolean hasResultSet) throws SQLException {
ResultSet rs = null;

Expand Down Expand Up @@ -654,27 +668,14 @@ protected void writeMutationResult(String schema, String originalQuery, String l
try (Connection conn = getConnection(); Statement stmt = createStatement(conn, params)) {
setTimeout(stmt, this.getQueryTimeout(params.getTimeout()));

final ResultSet rs = getFirstQueryResult(stmt, stmt.execute(loadedQuery));

DataTableReader reader = new ResultSetReader(getId(), rs, params);
reader.process(getId(), requestColumns, customColumns, getColumnsFromResultSet(rs, params), defaultValues,
getTimeZone(), params, writer);

/*
* if (stmt.execute(loadedQuery)) { // TODO multiple resultsets
*
* } else if (columns.size() == 1 && columns.getColumn(0).getType() ==
* ClickHouseDataType.Int32) {
* writer.write(ClickHouseBuffer.newInstance(4).writeInt32(stmt.getUpdateCount()
* )); } else { throw new IllegalStateException(
* "Not able to handle query result due to incompatible columns: " + columns); }
*/
stmt.execute(loadedQuery);
this.writeMutationResult(getFirstMutationResult(stmt), requestColumns, customColumns, writer);
} catch (SQLException e) {
throw new DataAccessException(getId(), buildErrorMessage(e), e);
} catch (DataAccessException e) {
Throwable cause = e.getCause();
throw new IllegalStateException(
"Failed to query against [" + this.getId() + "] due to: " + buildErrorMessage(cause), cause);
"Failed to mutate against [" + this.getId() + "] due to: " + buildErrorMessage(cause), cause);
}
}

Expand Down

0 comments on commit d6d502e

Please sign in to comment.