Skip to content

Commit bab4692

Browse files
authored
chore: mark transaction context as closed after use (#1481)
A transaction context should be marked as closed after it has been used, so it can throw a logical error if someone tries to reuse it for a second transaction. Also adds a timeout to the wait function for a transaction ID in the Commit method.
1 parent 9d3479b commit bab4692

File tree

5 files changed

+169
-3
lines changed

5 files changed

+169
-3
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ void initTransaction() {
385385
private boolean isValid = true;
386386

387387
@GuardedBy("lock")
388-
private boolean isClosed = false;
388+
protected boolean isClosed = false;
389389

390390
// A per-transaction sequence number used to identify this ExecuteSqlRequests. Required for DML,
391391
// ignored for query by the server.

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void removeListener(Runnable listener) {
172172
* transaction if the BeginTransaction option is included with the first statement of the
173173
* transaction.
174174
*/
175-
private volatile SettableApiFuture<ByteString> transactionIdFuture = null;
175+
@VisibleForTesting volatile SettableApiFuture<ByteString> transactionIdFuture = null;
176176

177177
@VisibleForTesting long waitForTransactionTimeoutMillis = 60_000L;
178178
private final boolean trackTransactionStarter;
@@ -208,6 +208,15 @@ private void decreaseAsyncOperations() {
208208
}
209209
}
210210

211+
@Override
212+
public void close() {
213+
// Only mark the context as closed, but do not end the tracer span, as that is done by the
214+
// commit and rollback methods.
215+
synchronized (lock) {
216+
isClosed = true;
217+
}
218+
}
219+
211220
void ensureTxn() {
212221
try {
213222
ensureTxnAsync().get();
@@ -280,6 +289,8 @@ void commit() {
280289
volatile ApiFuture<CommitResponse> commitFuture;
281290

282291
ApiFuture<CommitResponse> commitAsync() {
292+
close();
293+
283294
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
284295
final SettableApiFuture<Void> finishOps;
285296
CommitRequest.Builder builder =
@@ -340,7 +351,10 @@ public void run() {
340351
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
341352
} else {
342353
requestBuilder.setTransactionId(
343-
transactionId == null ? transactionIdFuture.get() : transactionId);
354+
transactionId == null
355+
? transactionIdFuture.get(
356+
waitForTransactionTimeoutMillis, TimeUnit.MILLISECONDS)
357+
: transactionId);
344358
}
345359
if (options.hasPriority() || getTransactionTag() != null) {
346360
RequestOptions.Builder requestOptionsBuilder = RequestOptions.newBuilder();
@@ -389,6 +403,8 @@ public void run() {
389403
MoreExecutors.directExecutor());
390404
} catch (InterruptedException e) {
391405
res.setException(SpannerExceptionFactory.propagateInterrupt(e));
406+
} catch (TimeoutException e) {
407+
res.setException(SpannerExceptionFactory.propagateTimeout(e));
392408
} catch (ExecutionException e) {
393409
res.setException(
394410
SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
@@ -419,6 +435,8 @@ void rollback() {
419435
}
420436

421437
ApiFuture<Empty> rollbackAsync() {
438+
close();
439+
422440
// It could be that there is no transaction if the transaction has been marked
423441
// withInlineBegin, and there has not been any query/update statement that has been executed.
424442
// In that case, we do not need to do anything, as there is no transaction.

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.util.concurrent.ScheduledThreadPoolExecutor;
7777
import java.util.concurrent.TimeUnit;
7878
import java.util.concurrent.atomic.AtomicInteger;
79+
import java.util.function.Function;
7980
import java.util.logging.Level;
8081
import java.util.logging.Logger;
8182
import org.junit.After;
@@ -2149,4 +2150,32 @@ public void transactionManagerNoAction_ClearsCheckedOutSession() {
21492150

21502151
assertThat(checkedOut).isEmpty();
21512152
}
2153+
2154+
@Test
2155+
public void transactionContextFailsIfUsedMultipleTimes() {
2156+
DatabaseClient client =
2157+
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
2158+
2159+
Function<TransactionContext, Long> function =
2160+
new Function<TransactionContext, Long>() {
2161+
TransactionContext ctx;
2162+
2163+
@Override
2164+
public Long apply(TransactionContext transactionContext) {
2165+
if (ctx == null) {
2166+
ctx = transactionContext;
2167+
}
2168+
try (ResultSet rs = ctx.executeQuery(SELECT1)) {
2169+
while (rs.next()) {}
2170+
}
2171+
return 1L;
2172+
}
2173+
};
2174+
assertEquals(Long.valueOf(1L), client.readWriteTransaction().run(tx -> function.apply(tx)));
2175+
SpannerException exception =
2176+
assertThrows(
2177+
SpannerException.class,
2178+
() -> client.readWriteTransaction().run(tx -> function.apply(tx)));
2179+
assertTrue(exception.getMessage().contains("Context has been closed"));
2180+
}
21522181
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static com.google.cloud.spanner.SpannerApiFutures.get;
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertNotNull;
23+
import static org.junit.Assert.assertNull;
2224
import static org.junit.Assert.assertThrows;
2325
import static org.junit.Assert.assertTrue;
2426

@@ -112,6 +114,8 @@ public class InlineBeginTransactionTest {
112114
.build())
113115
.setMetadata(SELECT1_METADATA)
114116
.build();
117+
private static final com.google.spanner.v1.ResultSet EMPTY_RESULTSET =
118+
com.google.spanner.v1.ResultSet.newBuilder().setMetadata(SELECT1_METADATA).build();
115119
private static final Statement SELECT1_UNION_ALL_SELECT2 =
116120
Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL1");
117121
private static final com.google.spanner.v1.ResultSet SELECT1_UNION_ALL_SELECT2_RESULTSET =
@@ -1322,6 +1326,38 @@ public Void run(TransactionContext transaction) {
13221326
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
13231327
}
13241328

1329+
@Test
1330+
public void testWaitForTransactionTimeoutForCommit() {
1331+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1332+
AtomicBoolean firstAttempt = new AtomicBoolean(true);
1333+
SpannerException exception =
1334+
assertThrows(
1335+
SpannerException.class,
1336+
() ->
1337+
client
1338+
.readWriteTransaction()
1339+
.run(
1340+
transaction -> {
1341+
TransactionContextImpl impl = (TransactionContextImpl) transaction;
1342+
Struct res =
1343+
transaction.readRow(
1344+
"FOO", Key.of(1L), Collections.singletonList("BAR"));
1345+
if (firstAttempt.compareAndSet(true, false)) {
1346+
impl.waitForTransactionTimeoutMillis = 1L;
1347+
// Simulate that the transaction id got lost.
1348+
impl.transactionIdFuture = SettableApiFuture.create();
1349+
impl.transactionId = null;
1350+
} else {
1351+
impl.waitForTransactionTimeoutMillis = 60_000L;
1352+
}
1353+
return res;
1354+
}));
1355+
assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
1356+
assertEquals(0, countRequests(BeginTransactionRequest.class));
1357+
assertEquals(1, countRequests(ReadRequest.class));
1358+
assertEquals(0, countRequests(CommitRequest.class));
1359+
}
1360+
13251361
@Test
13261362
public void testQueryWithInlineBeginDidNotReturnTransaction() {
13271363
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
@@ -1585,6 +1621,86 @@ public void testInlinedBeginTx_withStickyCancelledOnFirstStatement() {
15851621
assertEquals(0, countRequests(CommitRequest.class));
15861622
assertEquals(2, countTransactionsStarted());
15871623
}
1624+
1625+
@Test
1626+
public void testReadRowAborted() {
1627+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1628+
// The retry behavior should be equal regardless whether the readRow operation returns a row
1629+
// or not.
1630+
for (boolean emptyResult : new boolean[] {true, false}) {
1631+
AtomicBoolean firstAttempt = new AtomicBoolean(true);
1632+
Struct row =
1633+
client
1634+
.readWriteTransaction()
1635+
.run(
1636+
transaction -> {
1637+
if (firstAttempt.compareAndSet(true, false)) {
1638+
mockSpanner.putStatementResult(
1639+
StatementResult.exception(
1640+
READ_ROW_STATEMENT,
1641+
mockSpanner.createAbortedException(ByteString.copyFromUtf8("tx"))));
1642+
} else {
1643+
mockSpanner.putStatementResult(
1644+
StatementResult.query(
1645+
READ_ROW_STATEMENT,
1646+
emptyResult ? EMPTY_RESULTSET : SELECT1_RESULTSET));
1647+
}
1648+
return transaction.readRow(
1649+
"FOO", Key.of(1L), Collections.singletonList("BAR"));
1650+
});
1651+
if (emptyResult) {
1652+
assertNull(row);
1653+
} else {
1654+
assertNotNull(row);
1655+
assertEquals(1L, row.getLong(0));
1656+
}
1657+
// The transaction is retried once, and the retry will use an explicit BeginTransaction RPC
1658+
// as the first attempt did not return a transaction id.
1659+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
1660+
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
1661+
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
1662+
mockSpanner.clearRequests();
1663+
}
1664+
}
1665+
1666+
@Test
1667+
public void testReadRowCommitAborted() {
1668+
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1669+
// The retry behavior should be equal regardless whether the readRow operation returns a row
1670+
// or not.
1671+
for (boolean emptyResult : new boolean[] {true, false}) {
1672+
mockSpanner.putStatementResult(
1673+
StatementResult.query(
1674+
READ_ROW_STATEMENT, emptyResult ? EMPTY_RESULTSET : SELECT1_RESULTSET));
1675+
AtomicBoolean firstAttempt = new AtomicBoolean(true);
1676+
Struct row =
1677+
client
1678+
.readWriteTransaction()
1679+
.run(
1680+
transaction -> {
1681+
Struct res =
1682+
transaction.readRow("FOO", Key.of(1L), Collections.singletonList("BAR"));
1683+
// This will cause the commit request to return Aborted.
1684+
if (firstAttempt.compareAndSet(true, false)) {
1685+
mockSpanner.abortTransaction(transaction);
1686+
}
1687+
return res;
1688+
});
1689+
if (emptyResult) {
1690+
assertNull(row);
1691+
} else {
1692+
assertNotNull(row);
1693+
assertEquals(1L, row.getLong(0));
1694+
}
1695+
// The transaction is retried once, and will inline the BeginTransaction option with the
1696+
// read operation during the retry again, as the initial attempt did return a transaction
1697+
// id.
1698+
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
1699+
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
1700+
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
1701+
mockSpanner.clearRequests();
1702+
}
1703+
}
15881704
}
15891705

15901706
private static int countRequests(Class<? extends AbstractMessage> requestType) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1466,6 +1466,9 @@ public void streamingRead(
14661466
.withDescription("No result found for " + statement.toString())
14671467
.asRuntimeException();
14681468
}
1469+
if (res.getType() == StatementResult.StatementResultType.EXCEPTION) {
1470+
throw res.getException();
1471+
}
14691472
returnPartialResultSet(
14701473
res.getResultSet(),
14711474
transactionId,

0 commit comments

Comments
 (0)