Skip to content

Commit 6f10a1f

Browse files
committed
chore(x-goog-request-id): commit testing scaffold
This change commits the scaffolding for which testing will be used. This is a carve out of PRs #1264 and #1364, meant to make those changes lighter and much easier to review then merge. Updates #1261
1 parent e064474 commit 6f10a1f

File tree

6 files changed

+170
-7
lines changed

6 files changed

+170
-7
lines changed

google/cloud/spanner_v1/request_id_header.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ def generate_rand_uint64():
3737

3838
def with_request_id(client_id, channel_id, nth_request, attempt, other_metadata=[]):
3939
req_id = f"{REQ_ID_VERSION}.{REQ_RAND_PROCESS_ID}.{client_id}.{channel_id}.{nth_request}.{attempt}"
40-
all_metadata = other_metadata.copy()
40+
all_metadata = (other_metadata or []).copy()
4141
all_metadata.append((REQ_ID_HEADER_KEY, req_id))
4242
return all_metadata

google/cloud/spanner_v1/testing/database_test.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from google.cloud.spanner_v1.testing.interceptors import (
2626
MethodCountInterceptor,
2727
MethodAbortInterceptor,
28+
XGoogRequestIDHeaderInterceptor,
2829
)
2930

3031

@@ -34,6 +35,8 @@ class TestDatabase(Database):
3435
currently, and we don't want to make changes in the Database class for
3536
testing purpose as this is a hack to use interceptors in tests."""
3637

38+
_interceptors = []
39+
3740
def __init__(
3841
self,
3942
database_id,
@@ -74,6 +77,8 @@ def spanner_api(self):
7477
client_options = client._client_options
7578
if self._instance.emulator_host is not None:
7679
channel = grpc.insecure_channel(self._instance.emulator_host)
80+
self._x_goog_request_id_interceptor = XGoogRequestIDHeaderInterceptor()
81+
self._interceptors.append(self._x_goog_request_id_interceptor)
7782
channel = grpc.intercept_channel(channel, *self._interceptors)
7883
transport = SpannerGrpcTransport(channel=channel)
7984
self._spanner_api = SpannerClient(
@@ -110,3 +115,7 @@ def _create_spanner_client_for_tests(self, client_options, credentials):
110115
client_options=client_options,
111116
transport=transport,
112117
)
118+
119+
def reset(self):
120+
if self._x_goog_request_id_interceptor:
121+
self._x_goog_request_id_interceptor.reset()

google/cloud/spanner_v1/testing/interceptors.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# limitations under the License.
1414

1515
from collections import defaultdict
16+
import threading
17+
1618
from grpc_interceptor import ClientInterceptor
1719
from google.api_core.exceptions import Aborted
1820

@@ -63,3 +65,69 @@ def reset(self):
6365
self._method_to_abort = None
6466
self._count = 0
6567
self._connection = None
68+
69+
70+
X_GOOG_REQUEST_ID = "x-goog-spanner-request-id"
71+
# TODO:(@odeke-em): delete this guard when PR #1367 is merged.
72+
__X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED = False
73+
74+
75+
class XGoogRequestIDHeaderInterceptor(ClientInterceptor):
76+
def __init__(self):
77+
self._unary_req_segments = []
78+
self._stream_req_segments = []
79+
self.__lock = threading.Lock()
80+
81+
def intercept(self, method, request_or_iterator, call_details):
82+
metadata = call_details.metadata
83+
x_goog_request_id = None
84+
for key, value in metadata:
85+
if key == X_GOOG_REQUEST_ID:
86+
x_goog_request_id = value
87+
break
88+
89+
if __X_GOOG_REQUEST_ID_FUNCTIONALITY_MERGED and not x_goog_request_id:
90+
raise Exception(
91+
f"Missing {X_GOOG_REQUEST_ID} header in {call_details.method}"
92+
)
93+
94+
response_or_iterator = method(request_or_iterator, call_details)
95+
streaming = getattr(response_or_iterator, "__iter__", None) is not None
96+
with self.__lock:
97+
if streaming:
98+
self._stream_req_segments.append(
99+
(call_details.method, parse_request_id(x_goog_request_id))
100+
)
101+
else:
102+
self._unary_req_segments.append(
103+
(call_details.method, parse_request_id(x_goog_request_id))
104+
)
105+
106+
return response_or_iterator
107+
108+
@property
109+
def unary_request_ids(self):
110+
return self._unary_req_segments
111+
112+
@property
113+
def stream_request_ids(self):
114+
return self._stream_req_segments
115+
116+
def reset(self):
117+
self._stream_req_segments.clear()
118+
self._unary_req_segments.clear()
119+
120+
121+
def parse_request_id(request_id_str):
122+
splits = request_id_str.split(".")
123+
version, rand_process_id, client_id, channel_id, nth_request, nth_attempt = list(
124+
map(lambda v: int(v), splits)
125+
)
126+
return (
127+
version,
128+
rand_process_id,
129+
client_id,
130+
channel_id,
131+
nth_request,
132+
nth_attempt,
133+
)

google/cloud/spanner_v1/testing/mock_spanner.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
from google.cloud.spanner_v1 import (
2323
TransactionOptions,
2424
ResultSetMetadata,
25-
ExecuteSqlRequest,
26-
ExecuteBatchDmlRequest,
2725
)
2826
from google.cloud.spanner_v1.testing.mock_database_admin import DatabaseAdminServicer
2927
import google.cloud.spanner_v1.testing.spanner_database_admin_pb2_grpc as database_admin_grpc
@@ -107,6 +105,7 @@ def CreateSession(self, request, context):
107105

108106
def BatchCreateSessions(self, request, context):
109107
self._requests.append(request)
108+
self.mock_spanner.pop_error(context)
110109
sessions = []
111110
for i in range(request.session_count):
112111
sessions.append(
@@ -186,9 +185,7 @@ def BeginTransaction(self, request, context):
186185
self._requests.append(request)
187186
return self.__create_transaction(request.session, request.options)
188187

189-
def __maybe_create_transaction(
190-
self, request: ExecuteSqlRequest | ExecuteBatchDmlRequest
191-
):
188+
def __maybe_create_transaction(self, request):
192189
started_transaction = None
193190
if not request.transaction.begin == TransactionOptions():
194191
started_transaction = self.__create_transaction(

tests/mockserver_tests/mock_server_test_base.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
start_mock_server,
2121
SpannerServicer,
2222
)
23+
from google.cloud.spanner_v1.client import Client
2324
import google.cloud.spanner_v1.types.type as spanner_type
2425
import google.cloud.spanner_v1.types.result_set as result_set
2526
from google.api_core.client_options import ClientOptions
@@ -78,6 +79,27 @@ def unavailable_status() -> _Status:
7879
return status
7980

8081

82+
# Creates an UNAVAILABLE status with the smallest possible retry delay.
83+
def unavailable_status() -> _Status:
84+
error = status_pb2.Status(
85+
code=code_pb2.UNAVAILABLE,
86+
message="Service unavailable.",
87+
)
88+
retry_info = RetryInfo(retry_delay=Duration(seconds=0, nanos=1))
89+
status = _Status(
90+
code=code_to_grpc_status_code(error.code),
91+
details=error.message,
92+
trailing_metadata=(
93+
("grpc-status-details-bin", error.SerializeToString()),
94+
(
95+
"google.rpc.retryinfo-bin",
96+
retry_info.SerializeToString(),
97+
),
98+
),
99+
)
100+
return status
101+
102+
81103
def add_error(method: str, error: status_pb2.Status):
82104
MockServerTestBase.spanner_service.mock_spanner.add_error(method, error)
83105

@@ -153,6 +175,7 @@ def setup_class(cls):
153175
def teardown_class(cls):
154176
if MockServerTestBase.server is not None:
155177
MockServerTestBase.server.stop(grace=None)
178+
Client.NTH_CLIENT.reset()
156179
MockServerTestBase.server = None
157180

158181
def setup_method(self, *args, **kwargs):
@@ -186,6 +209,8 @@ def instance(self) -> Instance:
186209
def database(self) -> Database:
187210
if self._database is None:
188211
self._database = self.instance.database(
189-
"test-database", pool=FixedSizePool(size=10)
212+
"test-database",
213+
pool=FixedSizePool(size=10),
214+
enable_interceptors_in_tests=True,
190215
)
191216
return self._database

tests/unit/test_transaction.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
from google.cloud.spanner_v1 import TypeCode
2222
from google.api_core.retry import Retry
2323
from google.api_core import gapic_v1
24+
from google.cloud.spanner_v1._helpers import (
25+
AtomicCounter,
26+
_metadata_with_request_id,
27+
)
28+
from google.cloud.spanner_v1.request_id_header import REQ_RAND_PROCESS_ID
2429

2530
from tests._helpers import (
2631
HAS_OPENTELEMETRY_INSTALLED,
@@ -197,6 +202,11 @@ def test_begin_ok(self):
197202
[
198203
("google-cloud-resource-prefix", database.name),
199204
("x-goog-spanner-route-to-leader", "true"),
205+
# TODO(@odeke-em): enable with PR #1367.
206+
# (
207+
# "x-goog-spanner-request-id",
208+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
209+
# ),
200210
],
201211
)
202212

@@ -301,6 +311,11 @@ def test_rollback_ok(self):
301311
[
302312
("google-cloud-resource-prefix", database.name),
303313
("x-goog-spanner-route-to-leader", "true"),
314+
# TODO(@odeke-em): enable with PR #1367.
315+
# (
316+
# "x-goog-spanner-request-id",
317+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
318+
# ),
304319
],
305320
)
306321

@@ -492,6 +507,11 @@ def _commit_helper(
492507
[
493508
("google-cloud-resource-prefix", database.name),
494509
("x-goog-spanner-route-to-leader", "true"),
510+
# TODO(@odeke-em): enable with PR #1367.
511+
# (
512+
# "x-goog-spanner-request-id",
513+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
514+
# ),
495515
],
496516
)
497517
self.assertEqual(actual_request_options, expected_request_options)
@@ -666,6 +686,11 @@ def _execute_update_helper(
666686
metadata=[
667687
("google-cloud-resource-prefix", database.name),
668688
("x-goog-spanner-route-to-leader", "true"),
689+
# TODO(@odeke-em): enable with PR #1367.
690+
# (
691+
# "x-goog-spanner-request-id",
692+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
693+
# ),
669694
],
670695
)
671696

@@ -859,6 +884,11 @@ def _batch_update_helper(
859884
metadata=[
860885
("google-cloud-resource-prefix", database.name),
861886
("x-goog-spanner-route-to-leader", "true"),
887+
# TODO(@odeke-em): enable with PR #1367.
888+
# (
889+
# "x-goog-spanner-request-id",
890+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.1.1",
891+
# ),
862892
],
863893
retry=retry,
864894
timeout=timeout,
@@ -974,6 +1004,11 @@ def test_context_mgr_success(self):
9741004
[
9751005
("google-cloud-resource-prefix", database.name),
9761006
("x-goog-spanner-route-to-leader", "true"),
1007+
# TODO(@odeke-em): enable with PR #1367.
1008+
# (
1009+
# "x-goog-spanner-request-id",
1010+
# f"1.{REQ_RAND_PROCESS_ID}.{_Client.NTH_CLIENT.value}.1.2.1",
1011+
# ),
9771012
],
9781013
)
9791014

@@ -1004,11 +1039,19 @@ def test_context_mgr_failure(self):
10041039

10051040

10061041
class _Client(object):
1042+
NTH_CLIENT = AtomicCounter()
1043+
10071044
def __init__(self):
10081045
from google.cloud.spanner_v1 import ExecuteSqlRequest
10091046

10101047
self._query_options = ExecuteSqlRequest.QueryOptions(optimizer_version="1")
10111048
self.directed_read_options = None
1049+
self._nth_client_id = _Client.NTH_CLIENT.increment()
1050+
self._nth_request = AtomicCounter()
1051+
1052+
@property
1053+
def _next_nth_request(self):
1054+
return self._nth_request.increment()
10121055

10131056

10141057
class _Instance(object):
@@ -1024,6 +1067,27 @@ def __init__(self):
10241067
self._directed_read_options = None
10251068
self.default_transaction_options = DefaultTransactionOptions()
10261069

1070+
@property
1071+
def _next_nth_request(self):
1072+
return self._instance._client._next_nth_request
1073+
1074+
@property
1075+
def _nth_client_id(self):
1076+
return self._instance._client._nth_client_id
1077+
1078+
def metadata_with_request_id(self, nth_request, nth_attempt, prior_metadata=[]):
1079+
return _metadata_with_request_id(
1080+
self._nth_client_id,
1081+
self._channel_id,
1082+
nth_request,
1083+
nth_attempt,
1084+
prior_metadata,
1085+
)
1086+
1087+
@property
1088+
def _channel_id(self):
1089+
return 1
1090+
10271091

10281092
class _Session(object):
10291093
_transaction = None

0 commit comments

Comments
 (0)