Skip to content

Commit

Permalink
Stop mutation when connection is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jan 14, 2021
1 parent e62d7bb commit 634bce9
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,27 +291,19 @@ private void responseHandlers(RoutingContext ctx) {
// log.trace("[{}] Body:\n{}", path, ctx.getBodyAsString());
// }

HttpServerResponse resp = ctx.response();

resp.endHandler(handler -> {
ctx.response().endHandler(handler -> {
if (log.isTraceEnabled()) {
log.trace("[{}] About to end response...", ctx.normalisedPath());
}
});

resp.closeHandler(handler -> {
}).closeHandler(handler -> {
if (log.isTraceEnabled()) {
log.trace("[{}] About to close response...", ctx.normalisedPath());
}
});

resp.drainHandler(handler -> {
}).drainHandler(handler -> {
if (log.isTraceEnabled()) {
log.trace("[{}] About to drain response...", ctx.normalisedPath());
}
});

resp.exceptionHandler(throwable -> {
}).exceptionHandler(throwable -> {
log.error("Caught exception", throwable);
});

Expand Down Expand Up @@ -421,7 +413,7 @@ private void handleQuery(RoutingContext ctx) {
final Repository<NamedDataSource> manager = getDataSourceRepository();
final QueryParser parser = QueryParser.fromRequest(ctx, manager);

ctx.response().setChunked(true);
final HttpServerResponse resp = ctx.response().setChunked(true);

vertx.executeBlocking(promise -> {
if (log.isTraceEnabled()) {
Expand All @@ -446,8 +438,6 @@ private void handleQuery(RoutingContext ctx) {
log.debug("Generated query:\n{}\nNormalized query:\n{}", generatedQuery, normalizedQuery);
}

final HttpServerResponse resp = ctx.response();

ResponseWriter writer = new ResponseWriter(resp, parser.getStreamOptions(),
ds.getQueryTimeout(params.getTimeout()));

Expand Down Expand Up @@ -510,7 +500,7 @@ private void handleWrite(RoutingContext ctx) {
final Repository<NamedDataSource> manager = getDataSourceRepository();
final QueryParser parser = QueryParser.fromRequest(ctx, manager, true);

ctx.response().setChunked(true);
final HttpServerResponse resp = ctx.response().setChunked(true);

vertx.executeBlocking(promise -> {
if (log.isTraceEnabled()) {
Expand All @@ -521,9 +511,6 @@ private void handleWrite(RoutingContext ctx) {
NamedDataSource ds = getDataSource(manager, parser.getConnectionString(), params.isDebug());
params = ds == null ? params : ds.newQueryParameters(params);

// final HttpServerRequest req = ctx.request();
final HttpServerResponse resp = ctx.response();

final String generatedQuery = parser.getRawQuery();

String normalizedQuery = parser.getNormalizedQuery();
Expand All @@ -545,17 +532,24 @@ private void handleWrite(RoutingContext ctx) {
table = parser.extractTable(ds.loadSavedQueryAsNeeded(normalizedQuery, params));
}

ds.executeMutation(parser.getRawSchema(), table, parser.getTable(), params, ByteBuffer.wrap(ctx.getBody()));
ResponseWriter writer = new ResponseWriter(resp, parser.getStreamOptions(),
ds.getWriteTimeout(params.getTimeout()));

resp.write(ByteBuffer.asBuffer(WRITE_RESPONSE));
ds.executeMutation(parser.getRawSchema(), table, parser.getTable(), params, ByteBuffer.wrap(ctx.getBody()),
writer);

promise.complete();
if (resp.closed() || resp.ended()) {
promise.fail("Respose has already been closed");
} else {
resp.write(ByteBuffer.asBuffer(WRITE_RESPONSE));
promise.complete();
}
}, false, res -> {
if (res.succeeded()) {
if (log.isDebugEnabled()) {
log.debug("Wrote back query result");
}
ctx.response().end();
resp.end();
} else {
ctx.fail(res.cause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public final void executeQuery(String schema, String originalQuery, String loade
}

public void executeMutation(String schema, String target, TableDefinition columns, QueryParameters parameters,
ByteBuffer buffer) {
ByteBuffer buffer, ResponseWriter writer) {
log.info("Executing mutation: schema=[{}], target=[{}]", schema, target);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private int bulkMutation(PreparedStatement stmt) throws SQLException {

int[] results = stmt.executeBatch();
for (int i = 0; i < results.length; i++) {
mutationCount += i;
mutationCount += results[i];
}
stmt.clearBatch();

Expand Down Expand Up @@ -874,7 +874,7 @@ public String getPoolUsage() {

@Override
public void executeMutation(String schema, String table, TableDefinition columns, QueryParameters params,
ByteBuffer buffer) {
ByteBuffer buffer, ResponseWriter writer) {
log.info("Executing mutation: schema=[{}], table=[{}]", schema, table);

StringBuilder sql = new StringBuilder();
Expand All @@ -900,6 +900,7 @@ public void executeMutation(String schema, String table, TableDefinition columns
setTimeout(stmt, this.getWriteTimeout(params.getTimeout()));

int counter = 0;
boolean stopped = false;
while (!buffer.isExausted()) {
write(stmt, cols, params, buffer);
rowCount++;
Expand All @@ -914,14 +915,19 @@ public void executeMutation(String schema, String table, TableDefinition columns
counter = 0;
}
}

if (!writer.isOpen()) {
stopped = true;
break;
}
}

if (batchSize > 0 && counter > 0) {
if (!stopped && batchSize > 0 && counter > 0) {
mutationCount += this.bulkMutation(stmt);
}

log.info("Mutation status(batchSize={}): inputRows={}, effectedRows={}", batchSize, rowCount,
mutationCount);
log.info("Mutation {} on [{}]: batchSize={}, inputRows={}, effectedRows={}",
stopped ? "stopped" : "completed", this.getId(), batchSize, rowCount, mutationCount);
} catch (SQLException e) {
throw new IllegalStateException(
"Failed to mutate in [" + this.getId() + "] due to: " + buildErrorMessage(e), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ protected void writeQueryResult(String schema, String originalQuery, String load

@Override
public void executeMutation(String schema, String table, TableDefinition columns, QueryParameters parameters,
ByteBuffer buffer) {
super.executeMutation(schema, table, columns, parameters, buffer);
ByteBuffer buffer, ResponseWriter writer) {
super.executeMutation(schema, table, columns, parameters, buffer, writer);
}
}

0 comments on commit 634bce9

Please sign in to comment.