From b76a5dd274591d5f7096b83af4cbf963869980a8 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 22 Apr 2025 12:21:28 +0300 Subject: [PATCH] QueryService stats support --- examples/query-stats/main.py | 44 +++++++++++++++++++++++++++ tests/query/test_query_session.py | 33 +++++++++++++++++++- tests/query/test_query_transaction.py | 30 ++++++++++++++++++ ydb/aio/query/pool.py | 2 +- ydb/aio/query/session.py | 13 ++++++-- ydb/aio/query/transaction.py | 16 +++++++--- ydb/query/__init__.py | 2 ++ ydb/query/base.py | 13 ++++++-- ydb/query/pool.py | 2 +- ydb/query/session.py | 31 +++++++++++++++---- ydb/query/transaction.py | 31 ++++++++++++++----- 11 files changed, 193 insertions(+), 24 deletions(-) create mode 100644 examples/query-stats/main.py diff --git a/examples/query-stats/main.py b/examples/query-stats/main.py new file mode 100644 index 00000000..5db4d299 --- /dev/null +++ b/examples/query-stats/main.py @@ -0,0 +1,44 @@ +import os +import ydb + + +SESSION_QUERY = "SELECT 1" +TX_QUERY = "SELECT 1" + + +def get_query_stats_from_session(pool: ydb.QuerySessionPool): + def callee(session: ydb.QuerySession): + with session.execute(SESSION_QUERY, stats_mode=ydb.QueryStatsMode.PROFILE): + pass + + print(session.last_query_stats) + + pool.retry_operation_sync(callee) + + +def get_query_stats_from_tx(pool: ydb.QuerySessionPool): + def callee(tx: ydb.QueryTxContext): + with tx.execute(TX_QUERY, stats_mode=ydb.QueryStatsMode.PROFILE): + pass + + print(tx.last_query_stats) + + pool.retry_tx_sync(callee) + + +def main(): + driver = ydb.Driver( + endpoint=os.getenv("YDB_ENDPOINT", "grpc://localhost:2136"), + database=os.getenv("YDB_DATABASE", "/local"), + credentials=ydb.AnonymousCredentials(), + ) + + with driver: + # wait until driver become initialized + driver.wait(fail_fast=True, timeout=5) + with ydb.QuerySessionPool(driver) as pool: + get_query_stats_from_session(pool) + get_query_stats_from_tx(pool) + + +main() diff --git a/tests/query/test_query_session.py 
b/tests/query/test_query_session.py index f151661f..af861668 100644 --- a/tests/query/test_query_session.py +++ b/tests/query/test_query_session.py @@ -4,7 +4,7 @@ from concurrent.futures import _base as b from unittest import mock - +from ydb.query.base import QueryStatsMode from ydb.query.session import QuerySession @@ -143,3 +143,34 @@ def cancel(self): assert "attach stream thread" not in thread_names _check_session_state_empty(session) + + @pytest.mark.parametrize( + "stats_mode", + [ + None, + QueryStatsMode.UNSPECIFIED, + QueryStatsMode.NONE, + QueryStatsMode.BASIC, + QueryStatsMode.FULL, + QueryStatsMode.PROFILE, + ], + ) + def test_stats_mode(self, session: QuerySession, stats_mode: QueryStatsMode): + session.create() + + for _ in session.execute("SELECT 1; SELECT 2; SELECT 3;", stats_mode=stats_mode): + pass + + stats = session.last_query_stats + + if stats_mode in [None, QueryStatsMode.NONE, QueryStatsMode.UNSPECIFIED]: + assert stats is None + return + + assert stats is not None + assert len(stats.query_phases) > 0 + + if stats_mode != QueryStatsMode.BASIC: + assert len(stats.query_plan) > 0 + else: + assert stats.query_plan == "" diff --git a/tests/query/test_query_transaction.py b/tests/query/test_query_transaction.py index 4533e528..77c7251b 100644 --- a/tests/query/test_query_transaction.py +++ b/tests/query/test_query_transaction.py @@ -1,5 +1,6 @@ import pytest +from ydb.query.base import QueryStatsMode from ydb.query.transaction import QueryTxContext from ydb.query.transaction import QueryTxStateEnum @@ -104,3 +105,32 @@ def test_tx_identity_after_begin_works(self, tx: QueryTxContext): assert identity.tx_id == tx.tx_id assert identity.session_id == tx.session_id + + @pytest.mark.parametrize( + "stats_mode", + [ + None, + QueryStatsMode.UNSPECIFIED, + QueryStatsMode.NONE, + QueryStatsMode.BASIC, + QueryStatsMode.FULL, + QueryStatsMode.PROFILE, + ], + ) + def test_stats_mode(self, tx: QueryTxContext, stats_mode: QueryStatsMode): + for _ in 
tx.execute("SELECT 1; SELECT 2; SELECT 3;", commit_tx=True, stats_mode=stats_mode): + pass + + stats = tx.last_query_stats + + if stats_mode in [None, QueryStatsMode.NONE, QueryStatsMode.UNSPECIFIED]: + assert stats is None + return + + assert stats is not None + assert len(stats.query_phases) > 0 + + if stats_mode != QueryStatsMode.BASIC: + assert len(stats.query_plan) > 0 + else: + assert stats.query_plan == "" diff --git a/ydb/aio/query/pool.py b/ydb/aio/query/pool.py index f1ca68d1..b691a1b1 100644 --- a/ydb/aio/query/pool.py +++ b/ydb/aio/query/pool.py @@ -142,7 +142,7 @@ async def retry_tx_async( """Special interface to execute a bunch of commands with transaction in a safe, retriable way. :param callee: A function, that works with session. - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index 0561de8c..fe857878 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -117,15 +117,22 @@ async def execute( exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> AsyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. 
Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode.NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -133,10 +140,11 @@ async def execute( stream_it = await self._execute_call( query=query, + parameters=parameters, commit_tx=True, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) @@ -147,6 +155,7 @@ async def execute( rpc_state=None, response_pb=resp, session_state=self._state, + session=self, settings=self._settings, ), ) diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index f0547e5f..c9a6e445 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -29,7 +29,7 @@ def __init__(self, driver, session_state, session, tx_mode): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -142,21 +142,28 @@ async def execute( exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> AsyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit.
- :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. - :param exec_mode: Exec mode of the query, which is a one from the following choises: + :param exec_mode: Exec mode of the query, which is a one from the following choices: 1) QueryExecMode.EXECUTE, which is default; 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode.NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -164,10 +171,11 @@ async def execute( stream_it = await self._execute_call( query=query, + parameters=parameters, commit_tx=commit_tx, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) diff --git a/ydb/query/__init__.py b/ydb/query/__init__.py index 59dd7992..76436f98 100644 --- a/ydb/query/__init__.py +++ b/ydb/query/__init__.py @@ -7,6 +7,7 @@ "QuerySessionPool", "QueryClientSettings", "QuerySession", + "QueryStatsMode", "QueryTxContext", ] @@ -14,6 +15,7 @@ from .base import ( QueryClientSettings, + QueryStatsMode, ) from .session import QuerySession diff --git a/ydb/query/base.py b/ydb/query/base.py index a5ebedd9..52a6312e 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -25,6 +25,7 @@ if typing.TYPE_CHECKING: from .transaction import BaseQueryTxContext + from .session import BaseQuerySession class QuerySyntax(enum.IntEnum): @@ -41,7 +42,7 @@ class QueryExecMode(enum.IntEnum): EXECUTE = 50 -class StatsMode(enum.IntEnum): +class QueryStatsMode(enum.IntEnum): UNSPECIFIED = 0
NONE = 10 BASIC = 20 @@ -132,12 +133,13 @@ def create_execute_query_request( tx_mode: Optional[BaseQueryTxMode], syntax: Optional[QuerySyntax], exec_mode: Optional[QueryExecMode], + stats_mode: Optional[QueryStatsMode], parameters: Optional[dict], concurrent_result_sets: Optional[bool], ) -> ydb_query.ExecuteQueryRequest: syntax = QuerySyntax.YQL_V1 if not syntax else syntax exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode - stats_mode = StatsMode.NONE # TODO: choise is not supported yet + stats_mode = QueryStatsMode.NONE if stats_mode is None else stats_mode tx_control = None if not tx_id and not tx_mode: @@ -189,6 +191,7 @@ def wrap_execute_query_response( response_pb: _apis.ydb_query.ExecuteQueryResponsePart, session_state: IQuerySessionState, tx: Optional["BaseQueryTxContext"] = None, + session: Optional["BaseQuerySession"] = None, commit_tx: Optional[bool] = False, settings: Optional[QueryClientSettings] = None, ) -> convert.ResultSet: @@ -198,6 +201,12 @@ def wrap_execute_query_response( elif tx and response_pb.tx_meta and not tx.tx_id: tx._move_to_beginned(response_pb.tx_meta.id) + if response_pb.HasField("exec_stats"): + if tx is not None: + tx._last_query_stats = response_pb.exec_stats + if session is not None: + session._last_query_stats = response_pb.exec_stats + if response_pb.HasField("result_set"): return convert.ResultSet.from_message(response_pb.result_set, settings) diff --git a/ydb/query/pool.py b/ydb/query/pool.py index b25f7db8..1cf95ac0 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -151,7 +151,7 @@ def retry_tx_sync( """Special interface to execute a bunch of commands with transaction in a safe, retriable way. :param callee: A function, that works with session. 
- :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); diff --git a/ydb/query/session.py b/ydb/query/session.py index 382c922d..3cc6c13d 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -147,6 +147,12 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[ .with_timeout(DEFAULT_ATTACH_LONG_TIMEOUT) ) + self._last_query_stats = None + + @property + def last_query_stats(self): + return self._last_query_stats + def _get_client_settings( self, driver: common_utils.SupportedDriverType, @@ -189,22 +195,26 @@ def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]: def _execute_call( self, query: str, + parameters: dict = None, commit_tx: bool = False, syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, - parameters: dict = None, + stats_mode: Optional[base.QueryStatsMode] = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + self._last_query_stats = None + request = base.create_execute_query_request( query=query, - session_id=self._state.session_id, + parameters=parameters, commit_tx=commit_tx, + session_id=self._state.session_id, tx_mode=None, tx_id=None, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, ) @@ -293,7 +303,7 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> QueryTxContext: """Creates a transaction context manager with specified transaction mode. 
- :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -321,15 +331,22 @@ def execute( exec_mode: base.QueryExecMode = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> base.SyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode.NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -337,10 +354,11 @@ def execute( stream_it = self._execute_call( query=query, + parameters=parameters, commit_tx=True, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, settings=settings, ) @@ -351,6 +369,7 @@ def execute( rpc_state=None, response_pb=resp, session_state=self._state, + session=self, settings=self._settings, ), ) diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index ae7642db..008ac7c4 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -197,7 +197,7 @@ def __init__(self, driver, session_state, session, tx_mode): :param driver: A driver instance :param session_state: A state
of session - :param tx_mode: Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -210,6 +210,7 @@ def __init__(self, driver, session_state, session, tx_mode): self.session = session self._prev_stream = None self._external_error = None + self._last_query_stats = None @property def session_id(self) -> str: @@ -229,6 +230,10 @@ def tx_id(self) -> Optional[str]: """ return self._tx_state.tx_id + @property + def last_query_stats(self): + return self._last_query_stats + def _tx_identity(self) -> _ydb_topic.TransactionIdentity: if not self.tx_id: raise RuntimeError("Unable to get tx identity without started tx.") @@ -283,25 +288,29 @@ def _rollback_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryT def _execute_call( self, query: str, + parameters: Optional[dict], commit_tx: Optional[bool], syntax: Optional[base.QuerySyntax], exec_mode: Optional[base.QueryExecMode], - parameters: Optional[dict], + stats_mode: Optional[base.QueryStatsMode], concurrent_result_sets: Optional[bool], settings: Optional[BaseRequestSettings], ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: self._tx_state._check_tx_ready_to_use() self._check_external_error_set() + self._last_query_stats = None + request = base.create_execute_query_request( query=query, - session_id=self._session_state.session_id, + parameters=parameters, commit_tx=commit_tx, + session_id=self._session_state.session_id, tx_id=self._tx_state.tx_id, tx_mode=self._tx_state.tx_mode, syntax=syntax, exec_mode=exec_mode, - parameters=parameters, + stats_mode=stats_mode, concurrent_result_sets=concurrent_result_sets, ) @@ -338,7 +347,7 @@ def __init__(self, driver, session_state, session, tx_mode): :param driver: A driver instance :param session_state: A state of session - :param tx_mode: 
Transaction mode, which is a one from the following choises: + :param tx_mode: Transaction mode, which is a one from the following choices: 1) QuerySerializableReadWrite() which is default mode; 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); 3) QuerySnapshotReadOnly(); @@ -451,22 +460,29 @@ def execute( exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, settings: Optional[BaseRequestSettings] = None, + *, + stats_mode: Optional[base.QueryStatsMode] = None, ) -> base.SyncResponseContextIterator: """Sends a query to Query Service :param query: (YQL or SQL text) to be executed. :param parameters: dict with parameters and YDB types; :param commit_tx: A special flag that allows transaction commit. - :param syntax: Syntax of the query, which is a one from the following choises: + :param syntax: Syntax of the query, which is a one from the following choices: 1) QuerySyntax.YQL_V1, which is default; 2) QuerySyntax.PG. - :param exec_mode: Exec mode of the query, which is a one from the following choises: + :param exec_mode: Exec mode of the query, which is a one from the following choices: 1) QueryExecMode.EXECUTE, which is default; 2) QueryExecMode.EXPLAIN; 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; :param settings: An additional request settings BaseRequestSettings; + :param stats_mode: Mode of query statistics to gather, which is a one from the following choices: + 1) QueryStatsMode.NONE, which is default; + 2) QueryStatsMode.BASIC; + 3) QueryStatsMode.FULL; + 4) QueryStatsMode.PROFILE; :return: Iterator with result sets """ @@ -477,6 +493,7 @@ def execute( commit_tx=commit_tx, syntax=syntax, exec_mode=exec_mode, + stats_mode=stats_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, settings=settings,