Skip to content

Commit 8674169

Browse files
authored
Merge pull request #512 from ydb-platform/query_client_settings
Add missing ability to configure QueryClientSettings
2 parents cea4669 + 54bf667 commit 8674169

File tree

5 files changed

+180
-5
lines changed

5 files changed

+180
-5
lines changed
+143
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import pytest
2+
3+
import ydb
4+
5+
from datetime import date, datetime, timedelta
6+
7+
8+
@pytest.fixture
9+
def settings_on():
10+
settings = (
11+
ydb.QueryClientSettings()
12+
.with_native_date_in_result_sets(True)
13+
.with_native_datetime_in_result_sets(True)
14+
.with_native_timestamp_in_result_sets(True)
15+
.with_native_interval_in_result_sets(True)
16+
.with_native_json_in_result_sets(True)
17+
)
18+
return settings
19+
20+
21+
@pytest.fixture
22+
def settings_off():
23+
settings = (
24+
ydb.QueryClientSettings()
25+
.with_native_date_in_result_sets(False)
26+
.with_native_datetime_in_result_sets(False)
27+
.with_native_timestamp_in_result_sets(False)
28+
.with_native_interval_in_result_sets(False)
29+
.with_native_json_in_result_sets(False)
30+
)
31+
return settings
32+
33+
34+
params = pytest.mark.parametrize(
35+
"value,ydb_type,casted_result,uncasted_result",
36+
[
37+
(365, "Date", date(1971, 1, 1), 365),
38+
(3600 * 24 * 365, "Datetime", datetime(1971, 1, 1, 0, 0), 3600 * 24 * 365),
39+
(timedelta(seconds=1), "Interval", timedelta(seconds=1), 1000000),
40+
(
41+
1511789040123456,
42+
"Timestamp",
43+
datetime.fromisoformat("2017-11-27 13:24:00.123456"),
44+
1511789040123456,
45+
),
46+
('{"foo": "bar"}', "Json", {"foo": "bar"}, '{"foo": "bar"}'),
47+
('{"foo": "bar"}', "JsonDocument", {"foo": "bar"}, '{"foo":"bar"}'),
48+
],
49+
)
50+
51+
52+
class TestQueryClientSettings:
53+
@params
54+
def test_driver_turn_on(self, driver_sync, settings_on, value, ydb_type, casted_result, uncasted_result):
55+
driver_sync._driver_config.query_client_settings = settings_on
56+
pool = ydb.QuerySessionPool(driver_sync)
57+
result = pool.execute_with_retries(
58+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
59+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
60+
)
61+
assert result[0].rows[0].value == casted_result
62+
pool.stop()
63+
64+
@params
65+
def test_driver_turn_off(self, driver_sync, settings_off, value, ydb_type, casted_result, uncasted_result):
66+
driver_sync._driver_config.query_client_settings = settings_off
67+
pool = ydb.QuerySessionPool(driver_sync)
68+
result = pool.execute_with_retries(
69+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
70+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
71+
)
72+
assert result[0].rows[0].value == uncasted_result
73+
pool.stop()
74+
75+
@params
76+
def test_session_pool_turn_on(self, driver_sync, settings_on, value, ydb_type, casted_result, uncasted_result):
77+
pool = ydb.QuerySessionPool(driver_sync, query_client_settings=settings_on)
78+
result = pool.execute_with_retries(
79+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
80+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
81+
)
82+
assert result[0].rows[0].value == casted_result
83+
pool.stop()
84+
85+
@params
86+
def test_session_pool_turn_off(self, driver_sync, settings_off, value, ydb_type, casted_result, uncasted_result):
87+
pool = ydb.QuerySessionPool(driver_sync, query_client_settings=settings_off)
88+
result = pool.execute_with_retries(
89+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
90+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
91+
)
92+
assert result[0].rows[0].value == uncasted_result
93+
pool.stop()
94+
95+
@pytest.mark.asyncio
96+
@params
97+
async def test_driver_async_turn_on(self, driver, settings_on, value, ydb_type, casted_result, uncasted_result):
98+
driver._driver_config.query_client_settings = settings_on
99+
pool = ydb.aio.QuerySessionPool(driver)
100+
result = await pool.execute_with_retries(
101+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
102+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
103+
)
104+
assert result[0].rows[0].value == casted_result
105+
await pool.stop()
106+
107+
@pytest.mark.asyncio
108+
@params
109+
async def test_driver_async_turn_off(self, driver, settings_off, value, ydb_type, casted_result, uncasted_result):
110+
driver._driver_config.query_client_settings = settings_off
111+
pool = ydb.aio.QuerySessionPool(driver)
112+
result = await pool.execute_with_retries(
113+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
114+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
115+
)
116+
assert result[0].rows[0].value == uncasted_result
117+
await pool.stop()
118+
119+
@pytest.mark.asyncio
120+
@params
121+
async def test_session_pool_async_turn_on(
122+
self, driver, settings_on, value, ydb_type, casted_result, uncasted_result
123+
):
124+
pool = ydb.aio.QuerySessionPool(driver, query_client_settings=settings_on)
125+
result = await pool.execute_with_retries(
126+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
127+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
128+
)
129+
assert result[0].rows[0].value == casted_result
130+
await pool.stop()
131+
132+
@pytest.mark.asyncio
133+
@params
134+
async def test_session_pool_async_turn_off(
135+
self, driver, settings_off, value, ydb_type, casted_result, uncasted_result
136+
):
137+
pool = ydb.aio.QuerySessionPool(driver, query_client_settings=settings_off)
138+
result = await pool.execute_with_retries(
139+
f"DECLARE $param as {ydb_type}; SELECT $param as value",
140+
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
141+
)
142+
assert result[0].rows[0].value == uncasted_result
143+
await pool.stop()

ydb/aio/query/pool.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
RetrySettings,
1414
retry_operation_async,
1515
)
16+
from ...query.base import QueryClientSettings
1617
from ... import convert
1718
from ..._grpc.grpcwrapper import common_utils
1819

@@ -22,10 +23,17 @@
2223
class QuerySessionPool:
2324
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
2425

25-
def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
26+
def __init__(
27+
self,
28+
driver: common_utils.SupportedDriverType,
29+
size: int = 100,
30+
*,
31+
query_client_settings: Optional[QueryClientSettings] = None,
32+
):
2633
"""
2734
:param driver: A driver instance
2835
:param size: Size of session pool
36+
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
2937
"""
3038

3139
self._driver = driver
@@ -35,9 +43,10 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3543
self._current_size = 0
3644
self._waiters = 0
3745
self._loop = asyncio.get_running_loop()
46+
self._query_client_settings = query_client_settings
3847

3948
async def _create_new_session(self):
40-
session = QuerySession(self._driver)
49+
session = QuerySession(self._driver, settings=self._query_client_settings)
4150
await session.create()
4251
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
4352
return session

ydb/driver.py

+3
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class DriverConfig(object):
8989
"secure_channel",
9090
"table_client_settings",
9191
"topic_client_settings",
92+
"query_client_settings",
9293
"endpoints",
9394
"primary_user_agent",
9495
"tracer",
@@ -112,6 +113,7 @@ def __init__(
112113
grpc_keep_alive_timeout=None,
113114
table_client_settings=None,
114115
topic_client_settings=None,
116+
query_client_settings=None,
115117
endpoints=None,
116118
primary_user_agent="python-library",
117119
tracer=None,
@@ -159,6 +161,7 @@ def __init__(
159161
self.grpc_keep_alive_timeout = grpc_keep_alive_timeout
160162
self.table_client_settings = table_client_settings
161163
self.topic_client_settings = topic_client_settings
164+
self.query_client_settings = query_client_settings
162165
self.primary_user_agent = primary_user_agent
163166
self.tracer = tracer if tracer is not None else tracing.Tracer(None)
164167
self.grpc_lb_policy_name = grpc_lb_policy_name

ydb/query/pool.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import threading
99
import queue
1010

11+
from .base import QueryClientSettings
1112
from .session import (
1213
QuerySession,
1314
)
@@ -27,10 +28,17 @@
2728
class QuerySessionPool:
2829
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
2930

30-
def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
31+
def __init__(
32+
self,
33+
driver: common_utils.SupportedDriverType,
34+
size: int = 100,
35+
*,
36+
query_client_settings: Optional[QueryClientSettings] = None,
37+
):
3138
"""
3239
:param driver: A driver instance.
3340
:param size: Max size of Session Pool.
41+
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
3442
"""
3543

3644
self._driver = driver
@@ -39,9 +47,10 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3947
self._size = size
4048
self._should_stop = threading.Event()
4149
self._lock = threading.RLock()
50+
self._query_client_settings = query_client_settings
4251

4352
def _create_new_session(self, timeout: Optional[float]):
44-
session = QuerySession(self._driver)
53+
session = QuerySession(self._driver, settings=self._query_client_settings)
4554
session.create(settings=BaseRequestSettings().with_timeout(timeout))
4655
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
4756
return session

ydb/query/session.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,20 @@ class BaseQuerySession:
134134

135135
def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
136136
self._driver = driver
137-
self._settings = settings if settings is not None else base.QueryClientSettings()
137+
self._settings = self._get_client_settings(driver, settings)
138138
self._state = QuerySessionState(settings)
139139

140+
def _get_client_settings(
141+
self,
142+
driver: common_utils.SupportedDriverType,
143+
settings: Optional[base.QueryClientSettings] = None,
144+
) -> base.QueryClientSettings:
145+
if settings is not None:
146+
return settings
147+
if driver._driver_config.query_client_settings is not None:
148+
return driver._driver_config.query_client_settings
149+
return base.QueryClientSettings()
150+
140151
def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
141152
return self._driver(
142153
_apis.ydb_query.CreateSessionRequest(),

0 commit comments

Comments
 (0)