2626import com .google .cloud .spanner .DatabaseClient ;
2727import com .google .cloud .spanner .ErrorCode ;
2828import com .google .cloud .spanner .Mutation ;
29+ import com .google .cloud .spanner .Options ;
2930import com .google .cloud .spanner .Options .QueryOption ;
31+ import com .google .cloud .spanner .Options .UpdateOption ;
3032import com .google .cloud .spanner .ReadContext .QueryAnalyzeMode ;
3133import com .google .cloud .spanner .ResultSet ;
3234import com .google .cloud .spanner .ResultSets ;
4547import com .google .common .util .concurrent .MoreExecutors ;
4648import com .google .spanner .v1 .ExecuteSqlRequest .QueryOptions ;
4749import java .util .ArrayList ;
50+ import java .util .Arrays ;
4851import java .util .Collections ;
4952import java .util .Iterator ;
5053import java .util .LinkedList ;
@@ -206,6 +209,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
206209 private TimestampBound readOnlyStaleness = TimestampBound .strong ();
207210 private QueryOptions queryOptions = QueryOptions .getDefaultInstance ();
208211
212+ private String transactionTag ;
213+ private String statementTag ;
214+
209215 /** Create a connection and register it in the SpannerPool. */
210216 ConnectionImpl (ConnectionOptions options ) {
211217 Preconditions .checkNotNull (options );
@@ -512,6 +518,47 @@ public void setTransactionMode(TransactionMode transactionMode) {
512518 this .unitOfWorkType = UnitOfWorkType .of (transactionMode );
513519 }
514520
521+ @ Override
522+ public String getTransactionTag () {
523+ ConnectionPreconditions .checkState (!isClosed (), CLOSED_ERROR_MSG );
524+ ConnectionPreconditions .checkState (!isDdlBatchActive (), "This connection is in a DDL batch" );
525+ return transactionTag ;
526+ }
527+
528+ @ Override
529+ public void setTransactionTag (String tag ) {
530+ ConnectionPreconditions .checkState (!isClosed (), CLOSED_ERROR_MSG );
531+ ConnectionPreconditions .checkState (
532+ !isBatchActive (), "Cannot set transaction tag while in a batch" );
533+ ConnectionPreconditions .checkState (isInTransaction (), "This connection has no transaction" );
534+ ConnectionPreconditions .checkState (
535+ !isTransactionStarted (),
536+ "The transaction tag cannot be set after the transaction has started" );
537+ ConnectionPreconditions .checkState (
538+ getTransactionMode () == TransactionMode .READ_WRITE_TRANSACTION ,
539+ "Transaction tag can only be set for a read/write transaction" );
540+
541+ this .transactionBeginMarked = true ;
542+ this .transactionTag = tag ;
543+ }
544+
545+ @ Override
546+ public String getStatementTag () {
547+ ConnectionPreconditions .checkState (!isClosed (), CLOSED_ERROR_MSG );
548+ ConnectionPreconditions .checkState (
549+ !isBatchActive (), "Statement tags are not allowed inside a batch" );
550+ return statementTag ;
551+ }
552+
553+ @ Override
554+ public void setStatementTag (String tag ) {
555+ ConnectionPreconditions .checkState (!isClosed (), CLOSED_ERROR_MSG );
556+ ConnectionPreconditions .checkState (
557+ !isBatchActive (), "Statement tags are not allowed inside a batch" );
558+
559+ this .statementTag = tag ;
560+ }
561+
515562 /**
516563 * Throws an {@link SpannerException} with code {@link ErrorCode#FAILED_PRECONDITION} if the
517564 * current state of this connection does not allow changing the setting for retryAbortsInternally.
@@ -643,6 +690,7 @@ private void setDefaultTransactionOptions() {
643690 ? UnitOfWorkType .READ_ONLY_TRANSACTION
644691 : UnitOfWorkType .READ_WRITE_TRANSACTION ;
645692 batchMode = BatchMode .NONE ;
693+ transactionTag = null ;
646694 } else {
647695 popUnitOfWorkFromTransactionStack ();
648696 }
@@ -717,6 +765,8 @@ public ApiFuture<Void> rollbackAsync() {
717765 private ApiFuture <Void > endCurrentTransactionAsync (EndTransactionMethod endTransactionMethod ) {
718766 ConnectionPreconditions .checkState (!isBatchActive (), "This connection has an active batch" );
719767 ConnectionPreconditions .checkState (isInTransaction (), "This connection has no transaction" );
768+ ConnectionPreconditions .checkState (
769+ statementTag == null , "Statement tags are not supported for COMMIT or ROLLBACK" );
720770 ApiFuture <Void > res ;
721771 try {
722772 if (isTransactionStarted ()) {
@@ -954,14 +1004,43 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<Statement> updates) {
9541004 return internalExecuteBatchUpdateAsync (parsedStatements );
9551005 }
9561006
1007+ private QueryOption [] mergeQueryStatementTag (QueryOption ... options ) {
1008+ if (this .statementTag != null ) {
1009+ // Shortcut for the most common scenario.
1010+ if (options == null || options .length == 0 ) {
1011+ options = new QueryOption [] {Options .tag (statementTag )};
1012+ } else {
1013+ options = Arrays .copyOf (options , options .length + 1 );
1014+ options [options .length - 1 ] = Options .tag (statementTag );
1015+ }
1016+ this .statementTag = null ;
1017+ }
1018+ return options ;
1019+ }
1020+
1021+ private UpdateOption [] mergeUpdateStatementTag (UpdateOption ... options ) {
1022+ if (this .statementTag != null ) {
1023+ // Shortcut for the most common scenario.
1024+ if (options == null || options .length == 0 ) {
1025+ options = new UpdateOption [] {Options .tag (statementTag )};
1026+ } else {
1027+ options = Arrays .copyOf (options , options .length + 1 );
1028+ options [options .length - 1 ] = Options .tag (statementTag );
1029+ }
1030+ this .statementTag = null ;
1031+ }
1032+ return options ;
1033+ }
1034+
9571035 private ResultSet internalExecuteQuery (
9581036 final ParsedStatement statement ,
9591037 final AnalyzeMode analyzeMode ,
9601038 final QueryOption ... options ) {
9611039 Preconditions .checkArgument (
9621040 statement .getType () == StatementType .QUERY , "Statement must be a query" );
9631041 UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork ();
964- return get (transaction .executeQueryAsync (statement , analyzeMode , options ));
1042+ return get (
1043+ transaction .executeQueryAsync (statement , analyzeMode , mergeQueryStatementTag (options )));
9651044 }
9661045
9671046 private AsyncResultSet internalExecuteQueryAsync (
@@ -972,21 +1051,23 @@ private AsyncResultSet internalExecuteQueryAsync(
9721051 statement .getType () == StatementType .QUERY , "Statement must be a query" );
9731052 UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork ();
9741053 return ResultSets .toAsyncResultSet (
975- transaction .executeQueryAsync (statement , analyzeMode , options ),
1054+ transaction .executeQueryAsync (statement , analyzeMode , mergeQueryStatementTag ( options ) ),
9761055 spanner .getAsyncExecutorProvider (),
9771056 options );
9781057 }
9791058
980- private ApiFuture <Long > internalExecuteUpdateAsync (final ParsedStatement update ) {
1059+ private ApiFuture <Long > internalExecuteUpdateAsync (
1060+ final ParsedStatement update , UpdateOption ... options ) {
9811061 Preconditions .checkArgument (
9821062 update .getType () == StatementType .UPDATE , "Statement must be an update" );
9831063 UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork ();
984- return transaction .executeUpdateAsync (update );
1064+ return transaction .executeUpdateAsync (update , mergeUpdateStatementTag ( options ) );
9851065 }
9861066
987- private ApiFuture <long []> internalExecuteBatchUpdateAsync (List <ParsedStatement > updates ) {
1067+ private ApiFuture <long []> internalExecuteBatchUpdateAsync (
1068+ List <ParsedStatement > updates , UpdateOption ... options ) {
9881069 UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork ();
989- return transaction .executeBatchUpdateAsync (updates );
1070+ return transaction .executeBatchUpdateAsync (updates , mergeUpdateStatementTag ( options ) );
9901071 }
9911072
9921073 /**
@@ -1001,7 +1082,8 @@ UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() {
10011082 return this .currentUnitOfWork ;
10021083 }
10031084
1004- private UnitOfWork createNewUnitOfWork () {
1085+ @ VisibleForTesting
1086+ UnitOfWork createNewUnitOfWork () {
10051087 if (isAutocommit () && !isInTransaction () && !isInBatch ()) {
10061088 return SingleUseTransaction .newBuilder ()
10071089 .setDdlClient (ddlClient )
@@ -1021,6 +1103,7 @@ private UnitOfWork createNewUnitOfWork() {
10211103 .setReadOnlyStaleness (readOnlyStaleness )
10221104 .setStatementTimeout (statementTimeout )
10231105 .withStatementExecutor (statementExecutor )
1106+ .setTransactionTag (transactionTag )
10241107 .build ();
10251108 case READ_WRITE_TRANSACTION :
10261109 return ReadWriteTransaction .newBuilder ()
@@ -1030,6 +1113,7 @@ private UnitOfWork createNewUnitOfWork() {
10301113 .setTransactionRetryListeners (transactionRetryListeners )
10311114 .setStatementTimeout (statementTimeout )
10321115 .withStatementExecutor (statementExecutor )
1116+ .setTransactionTag (transactionTag )
10331117 .build ();
10341118 case DML_BATCH :
10351119 // A DML batch can run inside the current transaction. It should therefore only
@@ -1039,6 +1123,7 @@ private UnitOfWork createNewUnitOfWork() {
10391123 .setTransaction (currentUnitOfWork )
10401124 .setStatementTimeout (statementTimeout )
10411125 .withStatementExecutor (statementExecutor )
1126+ .setStatementTag (statementTag )
10421127 .build ();
10431128 case DDL_BATCH :
10441129 return DdlBatch .newBuilder ()
0 commit comments