From 09e2dd7b9fd5199cd18f4a356698c7f27b675e5b Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sat, 2 Mar 2024 19:29:15 +0500 Subject: [PATCH 01/15] Add and test psycopg3 support --- .github/workflows/test-suite.yml | 9 +++++---- README.md | 3 +++ databases/backends/common/records.py | 2 -- docs/index.md | 3 +++ requirements.txt | 3 +++ setup.py | 9 +++++---- tests/test_databases.py | 20 +++++++++++++++++--- tests/test_integration.py | 23 ++++++++++++++--------- 8 files changed, 50 insertions(+), 22 deletions(-) diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index f85ca99a..a46893de 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -18,7 +18,7 @@ jobs: services: mysql: - image: mysql:5.7 + image: mariadb:11 env: MYSQL_USER: username MYSQL_PASSWORD: password @@ -26,10 +26,10 @@ jobs: MYSQL_DATABASE: testsuite ports: - 3306:3306 - options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + options: --health-cmd="mariadb-admin ping" --health-interval=10s --health-timeout=5s --health-retries=3 postgres: - image: postgres:14 + image: postgres:16 env: POSTGRES_USER: username POSTGRES_PASSWORD: password @@ -59,5 +59,6 @@ jobs: mysql+asyncmy://username:password@localhost:3306/testsuite, postgresql://username:password@localhost:5432/testsuite, postgresql+aiopg://username:password@127.0.0.1:5432/testsuite, - postgresql+asyncpg://username:password@localhost:5432/testsuite + postgresql+asyncpg://username:password@localhost:5432/testsuite, + postgresql+psycopg://username:password@localhost:5432/testsuite run: "scripts/test" diff --git a/README.md b/README.md index f40cd173..edf68e40 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ Database drivers supported are: * [asyncpg][asyncpg] * [aiopg][aiopg] +* [psycopg3][psycopg3] * [aiomysql][aiomysql] * [asyncmy][asyncmy] * [aiosqlite][aiosqlite] @@ -42,6 +43,7 @@ You can install the required database drivers with: ```shell $ pip install databases[asyncpg] $ pip install databases[aiopg] +$ pip install databases[psycopg3] $ pip install databases[aiomysql] $ pip install databases[asyncmy] $ pip install databases[aiosqlite] @@ -105,6 +107,7 @@ for examples of how to start using databases together with SQLAlchemy core expre [pymysql]: https://github.com/PyMySQL/PyMySQL [asyncpg]: https://github.com/MagicStack/asyncpg [aiopg]: https://github.com/aio-libs/aiopg +[psycopg3]: https://github.com/psycopg/psycopg [aiomysql]: https://github.com/aio-libs/aiomysql [asyncmy]: https://github.com/long2ice/asyncmy [aiosqlite]: https://github.com/omnilib/aiosqlite diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py index e963af50..f57de839 100644 --- a/databases/backends/common/records.py +++ b/databases/backends/common/records.py @@ -1,6 +1,4 @@ -import enum import typing -from datetime import date, datetime, time from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.engine.row import Row as SQLRow diff --git a/docs/index.md b/docs/index.md index 7c3cebf2..c4e581d5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -31,6 +31,7 @@ Database drivers supported are: * [asyncpg][asyncpg] * [aiopg][aiopg] +* [psycopg3][psycopg3] * [aiomysql][aiomysql] * [asyncmy][asyncmy] * [aiosqlite][aiosqlite] @@ -40,6 +41,7 @@ You can install the required database drivers with: ```shell $ pip install databases[asyncpg] $ pip install databases[aiopg] +$ pip install databases[psycopg3] $ pip install databases[aiomysql] $ pip install databases[asyncmy] $ pip install databases[aiosqlite] @@ -103,6 +105,7 @@ for examples of how to start using databases together with SQLAlchemy core expre [pymysql]: https://github.com/PyMySQL/PyMySQL [asyncpg]: https://github.com/MagicStack/asyncpg [aiopg]: https://github.com/aio-libs/aiopg +[psycopg3]: https://github.com/psycopg/psycopg [aiomysql]: https://github.com/aio-libs/aiomysql [asyncmy]: https://github.com/long2ice/asyncmy [aiosqlite]: https://github.com/omnilib/aiosqlite diff --git a/requirements.txt b/requirements.txt index 8b05a46e..be086f77 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,9 +6,12 @@ aiomysql==0.2.0 aiopg==1.4.0 aiosqlite==0.20.0 asyncpg==0.29.0 +psycopg==3.1.18 +psycopg-binary==3.1.18 # Sync database drivers for standard tooling around setup/teardown/migrations. psycopg==3.1.18 +psycopg-binary==3.1.18 pymysql==1.1.0 # Testing diff --git a/setup.py b/setup.py index 41c0c584..9218bc1f 100644 --- a/setup.py +++ b/setup.py @@ -47,14 +47,15 @@ def get_packages(package): author_email="tom@tomchristie.com", packages=get_packages("databases"), package_data={"databases": ["py.typed"]}, - install_requires=["sqlalchemy>=2.0.7"], + install_requires=["sqlalchemy>=2.0.11"], extras_require={ - "postgresql": ["asyncpg"], - "asyncpg": ["asyncpg"], - "aiopg": ["aiopg"], "mysql": ["aiomysql"], "aiomysql": ["aiomysql"], "asyncmy": ["asyncmy"], + "postgresql": ["asyncpg"], + "aiopg": ["aiopg"], + "asyncpg": ["asyncpg"], + "psycopg3": ["psycopg"], "sqlite": ["aiosqlite"], "aiosqlite": ["aiosqlite"], }, diff --git a/tests/test_databases.py b/tests/test_databases.py index d9d9e4d6..90771b1d 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -134,6 +134,7 @@ def create_test_database(): "postgresql+aiopg", "sqlite+aiosqlite", "postgresql+asyncpg", + "postgresql+psycopg", ]: url = str(database_url.replace(driver=None)) engine = sqlalchemy.create_engine(url) @@ -151,6 +152,7 @@ def create_test_database(): "postgresql+aiopg", "sqlite+aiosqlite", "postgresql+asyncpg", + "postgresql+psycopg", ]: url = str(database_url.replace(driver=None)) engine = sqlalchemy.create_engine(url) @@ -1354,7 +1356,11 @@ async def test_queries_with_expose_backend_connection(database_url): elif database.url.scheme == "mysql+asyncmy": async with raw_connection.cursor() as cursor: await cursor.execute(insert_query, values) - elif database.url.scheme in ["postgresql", "postgresql+asyncpg"]: + elif database.url.scheme in [ + "postgresql", + "postgresql+asyncpg", + "postgresql+psycopg", + ]: await raw_connection.execute(insert_query, *values) elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]: await raw_connection.execute(insert_query, values) @@ -1392,7 +1398,11 @@ async def test_queries_with_expose_backend_connection(database_url): async with raw_connection.cursor() as cursor: await cursor.execute(select_query) results = await cursor.fetchall() - elif database.url.scheme in ["postgresql", "postgresql+asyncpg"]: + elif database.url.scheme in [ + "postgresql", + "postgresql+asyncpg", + "postgresql+psycopg", + ]: results = await raw_connection.fetch(select_query) elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]: results = await raw_connection.execute_fetchall(select_query) @@ -1407,7 +1417,11 @@ async def test_queries_with_expose_backend_connection(database_url): assert results[2][2] == True # fetch_one() - if database.url.scheme in ["postgresql", "postgresql+asyncpg"]: + if database.url.scheme in [ + "postgresql", + "postgresql+asyncpg", + "postgresql+psycopg", + ]: result = await raw_connection.fetchrow(select_query) elif database.url.scheme == "mysql+asyncmy": async with raw_connection.cursor() as cursor: diff --git a/tests/test_integration.py b/tests/test_integration.py index 139f8ffe..0605529f 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,7 +1,10 @@ +import contextlib + import pytest import sqlalchemy from starlette.applications import Starlette from starlette.responses import JSONResponse +from starlette.routing import Route from starlette.testclient import TestClient from databases import Database, DatabaseURL @@ -29,6 +32,7 @@ def create_test_database(): "postgresql+aiopg", "sqlite+aiosqlite", "postgresql+asyncpg", + "postgresql+psycopg", ]: url = str(database_url.replace(driver=None)) engine = sqlalchemy.create_engine(url) @@ -45,6 +49,7 @@ def create_test_database(): "postgresql+aiopg", "sqlite+aiosqlite", "postgresql+asyncpg", + "postgresql+psycopg", ]: url = str(database_url.replace(driver=None)) engine = sqlalchemy.create_engine(url) @@ -53,17 +58,13 @@ def create_test_database(): def get_app(database_url): database = Database(database_url, force_rollback=True) - app = Starlette() - @app.on_event("startup") - async def startup(): + @contextlib.asynccontextmanager + async def lifespan(app): await database.connect() - - @app.on_event("shutdown") - async def shutdown(): + yield await database.disconnect() - @app.route("/notes", methods=["GET"]) async def list_notes(request): query = notes.select() results = await database.fetch_all(query) @@ -73,14 +74,18 @@ async def list_notes(request): ] return JSONResponse(content) - @app.route("/notes", methods=["POST"]) async def add_note(request): data = await request.json() query = notes.insert().values(text=data["text"], completed=data["completed"]) await database.execute(query) return JSONResponse({"text": data["text"], "completed": data["completed"]}) - return app + routes = [ + Route("/notes", list_notes, methods=["GET"]), + Route("/notes", add_note, methods=["POST"]), + ] + + return Starlette(routes=routes, lifespan=lifespan) @pytest.mark.parametrize("database_url", DATABASE_URLS) From 37c450cf1d7848def2fa9483f6c7c507038d9829 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sat, 2 Mar 2024 19:40:30 +0500 Subject: [PATCH 02/15] Update docker-compose example to use newer MariaDB image --- docs/contributing.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/contributing.md b/docs/contributing.md index 92ab3b3c..fccc8687 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -73,7 +73,7 @@ run all of those with lint script version: '2.1' services: postgres: - image: postgres:10.8 + image: postgres:16 environment: POSTGRES_USER: username POSTGRES_PASSWORD: password @@ -82,7 +82,7 @@ run all of those with lint script - 5432:5432 mysql: - image: mysql:5.7 + image: mariadb:11 environment: MYSQL_USER: username MYSQL_PASSWORD: password From 8cbcccb3c09304a51d14fcbcbe45e5f983de248f Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sat, 2 Mar 2024 22:40:33 +0500 Subject: [PATCH 03/15] Rename PostgresBackend to AsyncpgBackend --- .../backends/{postgres.py => asyncpg.py} | 33 ++--- databases/backends/dialects/psycopg.py | 11 +- databases/backends/psycopg.py | 118 ++++++++++++++++++ databases/core.py | 8 +- tests/test_connection_options.py | 20 +-- 5 files changed, 154 insertions(+), 36 deletions(-) rename databases/backends/{postgres.py => asyncpg.py} (88%) create mode 100644 databases/backends/psycopg.py diff --git a/databases/backends/postgres.py b/databases/backends/asyncpg.py similarity index 88% rename from databases/backends/postgres.py rename to databases/backends/asyncpg.py index c42688e1..98ac44ea 100644 --- a/databases/backends/postgres.py +++ b/databases/backends/asyncpg.py @@ -7,7 +7,7 @@ from sqlalchemy.sql.ddl import DDLElement from databases.backends.common.records import Record, create_column_maps -from databases.backends.dialects.psycopg import dialect as psycopg_dialect +from databases.backends.dialects.psycopg import get_dialect from databases.core import LOG_EXTRA, DatabaseURL from databases.interfaces import ( ConnectionBackend, @@ -19,28 +19,15 @@ logger = logging.getLogger("databases") -class PostgresBackend(DatabaseBackend): +class AsyncpgBackend(DatabaseBackend): def __init__( self, database_url: typing.Union[DatabaseURL, str], **options: typing.Any ) -> None: self._database_url = DatabaseURL(database_url) self._options = options - self._dialect = self._get_dialect() + self._dialect = get_dialect() self._pool = None - def _get_dialect(self) -> Dialect: - dialect = psycopg_dialect(paramstyle="pyformat") - - dialect.implicit_returning = True - dialect.supports_native_enum = True - dialect.supports_smallserial = True # 9.2+ - dialect._backslash_escapes = False - dialect.supports_sane_multi_rowcount = True # psycopg 2.0.9+ - dialect._has_native_hstore = True - dialect.supports_native_decimal = True - - return dialect - def _get_connection_kwargs(self) -> dict: url_options = self._database_url.options @@ -78,12 +65,12 @@ async def disconnect(self) -> None: await self._pool.close() self._pool = None - def connection(self) -> "PostgresConnection": - return PostgresConnection(self, self._dialect) + def connection(self) -> "AsyncpgConnection": + return AsyncpgConnection(self, self._dialect) -class PostgresConnection(ConnectionBackend): - def __init__(self, database: PostgresBackend, dialect: Dialect): +class AsyncpgConnection(ConnectionBackend): + def __init__(self, database: AsyncpgBackend, dialect: Dialect): self._database = database self._dialect = dialect self._connection: typing.Optional[asyncpg.connection.Connection] = None @@ -159,7 +146,7 @@ async def iterate( yield Record(row, result_columns, self._dialect, column_maps) def transaction(self) -> TransactionBackend: - return PostgresTransaction(connection=self) + return AsyncpgTransaction(connection=self) def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]: compiled = query.compile( @@ -197,8 +184,8 @@ def raw_connection(self) -> asyncpg.connection.Connection: return self._connection -class PostgresTransaction(TransactionBackend): - def __init__(self, connection: PostgresConnection): +class AsyncpgTransaction(TransactionBackend): + def __init__(self, connection: AsyncpgConnection): self._connection = connection self._transaction: typing.Optional[asyncpg.transaction.Transaction] = None diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py index 07bd1880..1caf49fe 100644 --- a/databases/backends/dialects/psycopg.py +++ b/databases/backends/dialects/psycopg.py @@ -43,4 +43,13 @@ class PGDialect_psycopg(PGDialect): execution_ctx_cls = PGExecutionContext_psycopg -dialect = PGDialect_psycopg +def get_dialect() -> PGDialect_psycopg: + dialect = PGDialect_psycopg(paramstyle="pyformat") + dialect.implicit_returning = True + dialect.supports_native_enum = True + dialect.supports_smallserial = True # 9.2+ + dialect._backslash_escapes = False + dialect.supports_sane_multi_rowcount = True # psycopg 2.0.9+ + dialect._has_native_hstore = True + dialect.supports_native_decimal = True + return dialect diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py new file mode 100644 index 00000000..981742ce --- /dev/null +++ b/databases/backends/psycopg.py @@ -0,0 +1,118 @@ +import typing +from collections.abc import Sequence + +import psycopg_pool +from sqlalchemy.sql import ClauseElement + +from databases.backends.dialects.psycopg import get_dialect +from databases.core import DatabaseURL +from databases.interfaces import ( + ConnectionBackend, + DatabaseBackend, + TransactionBackend, +) + + +class PsycopgBackend(DatabaseBackend): + def __init__( + self, database_url: typing.Union[DatabaseURL, str], **options: typing.Any + ) -> None: + self._database_url = DatabaseURL(database_url) + self._options = options + self._dialect = get_dialect() + self._pool: typing.Optional[psycopg_pool.AsyncConnectionPool] = None + + async def connect(self) -> None: + if self._pool is not None: + return + + self._pool = psycopg_pool.AsyncConnectionPool( + self._database_url.url, open=False, **self._options) + await self._pool.open() + + async def disconnect(self) -> None: + if self._pool is None: + return + + await self._pool.close() + self._pool = None + + def connection(self) -> "PsycopgConnection": + return PsycopgConnection(self) + + +class PsycopgConnection(ConnectionBackend): + def __init__(self, database: PsycopgBackend) -> None: + self._database = database + + async def acquire(self) -> None: + if self._connection is not None: + return + + if self._database._pool is None: + raise RuntimeError("PsycopgBackend is not running") + + # TODO: Add configurable timeouts + self._connection = await self._database._pool.getconn() + + async def release(self) -> None: + if self._connection is None: + return + + await self._database._pool.putconn(self._connection) + self._connection = None + + async def fetch_all(self, query: ClauseElement) -> typing.List["Record"]: + raise NotImplementedError() # pragma: no cover + + async def fetch_one(self, query: ClauseElement) -> typing.Optional["Record"]: + raise NotImplementedError() # pragma: no cover + + async def fetch_val( + self, query: ClauseElement, column: typing.Any = 0 + ) -> typing.Any: + row = await self.fetch_one(query) + return None if row is None else row[column] + + async def execute(self, query: ClauseElement) -> typing.Any: + raise NotImplementedError() # pragma: no cover + + async def execute_many(self, queries: typing.List[ClauseElement]) -> None: + raise NotImplementedError() # pragma: no cover + + async def iterate( + self, query: ClauseElement + ) -> typing.AsyncGenerator[typing.Mapping, None]: + raise NotImplementedError() # pragma: no cover + # mypy needs async iterators to contain a `yield` + # https://github.com/python/mypy/issues/5385#issuecomment-407281656 + yield True # pragma: no cover + + def transaction(self) -> "TransactionBackend": + raise NotImplementedError() # pragma: no cover + + @property + def raw_connection(self) -> typing.Any: + raise NotImplementedError() # pragma: no cover + + +class PsycopgTransaction(TransactionBackend): + async def start( + self, is_root: bool, extra_options: typing.Dict[typing.Any, typing.Any] + ) -> None: + raise NotImplementedError() # pragma: no cover + + async def commit(self) -> None: + raise NotImplementedError() # pragma: no cover + + async def rollback(self) -> None: + raise NotImplementedError() # pragma: no cover + + +class Record(Sequence): + @property + def _mapping(self) -> typing.Mapping: + raise NotImplementedError() # pragma: no cover + + def __getitem__(self, key: typing.Any) -> typing.Any: + raise NotImplementedError() # pragma: no cover diff --git a/databases/core.py b/databases/core.py index d55dd3c8..cba06ced 100644 --- a/databases/core.py +++ b/databases/core.py @@ -43,12 +43,16 @@ class Database: SUPPORTED_BACKENDS = { - "postgresql": "databases.backends.postgres:PostgresBackend", + "postgres": "databases.backends.asyncpg:AsyncpgBackend", + "postgresql": "databases.backends.asyncpg:AsyncpgBackend", "postgresql+aiopg": "databases.backends.aiopg:AiopgBackend", - "postgres": "databases.backends.postgres:PostgresBackend", + "postgresql+asyncpg": "databases.backends.asyncpg:AsyncpgBackend", + "postgresql+psycopg": "databases.backends.psycopg:PsycopgBackend", "mysql": "databases.backends.mysql:MySQLBackend", + "mysql+aiomysql": "databases.backends.asyncmy:MySQLBackend", "mysql+asyncmy": "databases.backends.asyncmy:AsyncMyBackend", "sqlite": "databases.backends.sqlite:SQLiteBackend", + "sqlite+aiosqlite": "databases.backends.sqlite:SQLiteBackend", } _connection_map: "weakref.WeakKeyDictionary[asyncio.Task, 'Connection']" diff --git a/tests/test_connection_options.py b/tests/test_connection_options.py index 81ce2ac7..757393a4 100644 --- a/tests/test_connection_options.py +++ b/tests/test_connection_options.py @@ -6,7 +6,7 @@ import pytest from databases.backends.aiopg import AiopgBackend -from databases.backends.postgres import PostgresBackend +from databases.backends.asyncpg import AsyncpgBackend from databases.core import DatabaseURL from tests.test_databases import DATABASE_URLS, async_adapter @@ -19,7 +19,7 @@ def test_postgres_pool_size(): - backend = PostgresBackend("postgres://localhost/database?min_size=1&max_size=20") + backend = AsyncpgBackend("postgres://localhost/database?min_size=1&max_size=20") kwargs = backend._get_connection_kwargs() assert kwargs == {"min_size": 1, "max_size": 20} @@ -29,43 +29,43 @@ async def test_postgres_pool_size_connect(): for url in DATABASE_URLS: if DatabaseURL(url).dialect != "postgresql": continue - backend = PostgresBackend(url + "?min_size=1&max_size=20") + backend = AsyncpgBackend(url + "?min_size=1&max_size=20") await backend.connect() await backend.disconnect() def test_postgres_explicit_pool_size(): - backend = PostgresBackend("postgres://localhost/database", min_size=1, max_size=20) + backend = AsyncpgBackend("postgres://localhost/database", min_size=1, max_size=20) kwargs = backend._get_connection_kwargs() assert kwargs == {"min_size": 1, "max_size": 20} def test_postgres_ssl(): - backend = PostgresBackend("postgres://localhost/database?ssl=true") + backend = AsyncpgBackend("postgres://localhost/database?ssl=true") kwargs = backend._get_connection_kwargs() assert kwargs == {"ssl": True} def test_postgres_ssl_verify_full(): - backend = PostgresBackend("postgres://localhost/database?ssl=verify-full") + backend = AsyncpgBackend("postgres://localhost/database?ssl=verify-full") kwargs = backend._get_connection_kwargs() assert kwargs == {"ssl": "verify-full"} def test_postgres_explicit_ssl(): - backend = PostgresBackend("postgres://localhost/database", ssl=True) + backend = AsyncpgBackend("postgres://localhost/database", ssl=True) kwargs = backend._get_connection_kwargs() assert kwargs == {"ssl": True} def test_postgres_explicit_ssl_verify_full(): - backend = PostgresBackend("postgres://localhost/database", ssl="verify-full") + backend = AsyncpgBackend("postgres://localhost/database", ssl="verify-full") kwargs = backend._get_connection_kwargs() assert kwargs == {"ssl": "verify-full"} def test_postgres_no_extra_options(): - backend = PostgresBackend("postgres://localhost/database") + backend = AsyncpgBackend("postgres://localhost/database") kwargs = backend._get_connection_kwargs() assert kwargs == {} @@ -74,7 +74,7 @@ def test_postgres_password_as_callable(): def gen_password(): return "Foo" - backend = PostgresBackend( + backend = AsyncpgBackend( "postgres://:password@localhost/database", password=gen_password ) kwargs = backend._get_connection_kwargs() From a86edfaae53792d7eccceb0d860d5f0200faebb0 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sat, 2 Mar 2024 23:17:09 +0500 Subject: [PATCH 04/15] S01E01 --- databases/backends/asyncpg.py | 45 ++++------------------- databases/backends/dialects/psycopg.py | 30 ++++++++++++++- databases/backends/psycopg.py | 51 +++++++++++++++++--------- 3 files changed, 70 insertions(+), 56 deletions(-) diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py index 98ac44ea..ff61fe26 100644 --- a/databases/backends/asyncpg.py +++ b/databases/backends/asyncpg.py @@ -4,11 +4,10 @@ import asyncpg from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.sql import ClauseElement -from sqlalchemy.sql.ddl import DDLElement from databases.backends.common.records import Record, create_column_maps -from databases.backends.dialects.psycopg import get_dialect -from databases.core import LOG_EXTRA, DatabaseURL +from databases.backends.dialects.psycopg import compile_query, get_dialect +from databases.core import DatabaseURL from databases.interfaces import ( ConnectionBackend, DatabaseBackend, @@ -88,7 +87,7 @@ async def release(self) -> None: async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: assert self._connection is not None, "Connection is not acquired" - query_str, args, result_columns = self._compile(query) + query_str, args, result_columns = compile_query(query, self._dialect) rows = await self._connection.fetch(query_str, *args) dialect = self._dialect column_maps = create_column_maps(result_columns) @@ -96,7 +95,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]: assert self._connection is not None, "Connection is not acquired" - query_str, args, result_columns = self._compile(query) + query_str, args, result_columns = compile_query(query, self._dialect) row = await self._connection.fetchrow(query_str, *args) if row is None: return None @@ -124,7 +123,7 @@ async def fetch_val( async def execute(self, query: ClauseElement) -> typing.Any: assert self._connection is not None, "Connection is not acquired" - query_str, args, _ = self._compile(query) + query_str, args, _ = compile_query(query, self._dialect) return await self._connection.fetchval(query_str, *args) async def execute_many(self, queries: typing.List[ClauseElement]) -> None: @@ -133,14 +132,14 @@ async def execute_many(self, queries: typing.List[ClauseElement]) -> None: # loop through multiple executes here, which should all end up # using the same prepared statement. for single_query in queries: - single_query, args, _ = self._compile(single_query) + single_query, args, _ = compile_query(single_query, self._dialect) await self._connection.execute(single_query, *args) async def iterate( self, query: ClauseElement ) -> typing.AsyncGenerator[typing.Any, None]: assert self._connection is not None, "Connection is not acquired" - query_str, args, result_columns = self._compile(query) + query_str, args, result_columns = compile_query(query, self._dialect) column_maps = create_column_maps(result_columns) async for row in self._connection.cursor(query_str, *args): yield Record(row, result_columns, self._dialect, column_maps) @@ -148,36 +147,6 @@ async def iterate( def transaction(self) -> TransactionBackend: return AsyncpgTransaction(connection=self) - def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]: - compiled = query.compile( - dialect=self._dialect, compile_kwargs={"render_postcompile": True} - ) - - if not isinstance(query, DDLElement): - compiled_params = sorted(compiled.params.items()) - - mapping = { - key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1) - } - compiled_query = compiled.string % mapping - - processors = compiled._bind_processors - args = [ - processors[key](val) if key in processors else val - for key, val in compiled_params - ] - result_map = compiled._result_columns - else: - compiled_query = compiled.string - args = [] - result_map = None - - query_message = compiled_query.replace(" \n", " ").replace("\n", " ") - logger.debug( - "Query: %s Args: %s", query_message, repr(tuple(args)), extra=LOG_EXTRA - ) - return compiled_query, args, result_map - @property def raw_connection(self) -> asyncpg.connection.Connection: assert self._connection is not None, "Connection is not acquired" diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py index 1caf49fe..cfce052e 100644 --- a/databases/backends/dialects/psycopg.py +++ b/databases/backends/dialects/psycopg.py @@ -10,6 +10,9 @@ from sqlalchemy import types, util from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext from sqlalchemy.engine import processors +from sqlalchemy.engine.interfaces import Dialect +from sqlalchemy.sql import ClauseElement +from sqlalchemy.sql.ddl import DDLElement from sqlalchemy.types import Float, Numeric @@ -43,7 +46,7 @@ class PGDialect_psycopg(PGDialect): execution_ctx_cls = PGExecutionContext_psycopg -def get_dialect() -> PGDialect_psycopg: +def get_dialect() -> Dialect: dialect = PGDialect_psycopg(paramstyle="pyformat") dialect.implicit_returning = True dialect.supports_native_enum = True @@ -53,3 +56,28 @@ def get_dialect() -> PGDialect_psycopg: dialect._has_native_hstore = True dialect.supports_native_decimal = True return dialect + + +def compile_query(query: ClauseElement, dialect: Dialect) -> typing.Tuple[str, list, tuple]: + compiled = query.compile(dialect=dialect, compile_kwargs={"render_postcompile": True}) + + if not isinstance(query, DDLElement): + compiled_params = sorted(compiled.params.items()) + + mapping = { + key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1) + } + compiled_query = compiled.string % mapping + + processors = compiled._bind_processors + args = [ + processors[key](val) if key in processors else val + for key, val in compiled_params + ] + result_map = compiled._result_columns + else: + compiled_query = compiled.string + args = [] + result_map = None + + return compiled_query, args, result_map diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index 981742ce..a047a85c 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -1,26 +1,36 @@ import typing -from collections.abc import Sequence +import psycopg import psycopg_pool +from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.sql import ClauseElement -from databases.backends.dialects.psycopg import get_dialect +from databases.backends.common.records import Record, create_column_maps +from databases.backends.dialects.psycopg import compile_query, get_dialect from databases.core import DatabaseURL from databases.interfaces import ( ConnectionBackend, DatabaseBackend, + Record as RecordInterface, TransactionBackend, ) class PsycopgBackend(DatabaseBackend): + _database_url: DatabaseURL + _options: typing.Dict[str, typing.Any] + _dialect: Dialect + _pool: typing.Optional[psycopg_pool.AsyncConnectionPool] + def __init__( - self, database_url: typing.Union[DatabaseURL, str], **options: typing.Any + self, + database_url: typing.Union[DatabaseURL, str], + **options: typing.Dict[str, typing.Any], ) -> None: self._database_url = DatabaseURL(database_url) self._options = options self._dialect = get_dialect() - self._pool: typing.Optional[psycopg_pool.AsyncConnectionPool] = None + self._pool = None async def connect(self) -> None: if self._pool is not None: @@ -28,22 +38,31 @@ async def connect(self) -> None: self._pool = psycopg_pool.AsyncConnectionPool( self._database_url.url, open=False, **self._options) + + # TODO: Add configurable timeouts await self._pool.open() async def disconnect(self) -> None: if self._pool is None: return + # TODO: Add configurable timeouts await self._pool.close() self._pool = None def connection(self) -> "PsycopgConnection": - return PsycopgConnection(self) + return PsycopgConnection(self, self._dialect) class PsycopgConnection(ConnectionBackend): - def __init__(self, database: PsycopgBackend) -> None: + _database: PsycopgBackend + _dialect: Dialect + _connection: typing.Optional[psycopg.AsyncConnection] + + def __init__(self, database: PsycopgBackend, dialect: Dialect) -> None: self._database = database + self._dialect = dialect + self._connection = None async def acquire(self) -> None: if self._connection is not None: @@ -62,10 +81,17 @@ async def release(self) -> None: await self._database._pool.putconn(self._connection) self._connection = None - async def fetch_all(self, query: ClauseElement) -> typing.List["Record"]: + async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: + if self._connection is None: + raise RuntimeError("Connection is not acquired") + + query_str, args, result_columns = compile_query(query, self._dialect) + rows = await self._connection.fetch(query_str, *args) + column_maps = create_column_maps(result_columns) + return [Record(row, result_columns, self._dialect, column_maps) for row in rows] raise NotImplementedError() # pragma: no cover - async def fetch_one(self, query: ClauseElement) -> typing.Optional["Record"]: + async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]: raise NotImplementedError() # pragma: no cover async def fetch_val( @@ -107,12 +133,3 @@ async def commit(self) -> None: async def rollback(self) -> None: raise NotImplementedError() # pragma: no cover - - -class Record(Sequence): - @property - def _mapping(self) -> typing.Mapping: - raise NotImplementedError() # pragma: no cover - - def __getitem__(self, key: typing.Any) -> typing.Any: - raise NotImplementedError() # pragma: no cover From 6d9256693fda84f21c210ac736d9adda058ce86e Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 02:15:22 +0500 Subject: [PATCH 05/15] S01E02 --- databases/backends/dialects/psycopg.py | 8 +++- databases/backends/psycopg.py | 65 +++++++++++++++++++++----- requirements.txt | 1 + setup.py | 2 +- 4 files changed, 62 insertions(+), 14 deletions(-) diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py index cfce052e..80bf5b76 100644 --- a/databases/backends/dialects/psycopg.py +++ b/databases/backends/dialects/psycopg.py @@ -58,8 +58,12 @@ def get_dialect() -> Dialect: return dialect -def compile_query(query: ClauseElement, dialect: Dialect) -> typing.Tuple[str, list, tuple]: - compiled = query.compile(dialect=dialect, compile_kwargs={"render_postcompile": True}) +def compile_query( + query: ClauseElement, dialect: Dialect +) -> typing.Tuple[str, list, tuple]: + compiled = query.compile( + dialect=dialect, compile_kwargs={"render_postcompile": True} + ) if not isinstance(query, DDLElement): compiled_params = sorted(compiled.params.items()) diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index a047a85c..0a31b7a7 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -37,7 +37,8 @@ async def connect(self) -> None: return self._pool = psycopg_pool.AsyncConnectionPool( - self._database_url.url, open=False, **self._options) + self._database_url._url, open=False, **self._options + ) # TODO: Add configurable timeouts await self._pool.open() @@ -86,13 +87,33 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: raise RuntimeError("Connection is not acquired") query_str, args, result_columns = compile_query(query, self._dialect) - rows = await self._connection.fetch(query_str, *args) + + async with self._connection.cursor() as cursor: + await cursor.execute(query_str, args) + rows = await cursor.fetchall() + column_maps = create_column_maps(result_columns) return [Record(row, result_columns, self._dialect, column_maps) for row in rows] - raise NotImplementedError() # pragma: no cover async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]: - raise NotImplementedError() # pragma: no cover + if self._connection is None: + raise RuntimeError("Connection is not acquired") + + query_str, args, result_columns = compile_query(query, self._dialect) + + async with self._connection.cursor() as cursor: + await cursor.execute(query_str, args) + row = await cursor.fetchone() + + if row is None: + return None + + return Record( + row, + result_columns, + self._dialect, + create_column_maps(result_columns), + ) async def fetch_val( self, query: ClauseElement, column: typing.Any = 0 @@ -101,25 +122,47 @@ async def fetch_val( return None if row is None else row[column] async def execute(self, query: ClauseElement) -> typing.Any: - raise NotImplementedError() # pragma: no cover + if self._connection is None: + raise RuntimeError("Connection is not acquired") + + query_str, args, _ = compile_query(query, self._dialect) + + async with self._connection.cursor() as cursor: + await cursor.execute(query_str, args) async def execute_many(self, queries: typing.List[ClauseElement]) -> None: - raise NotImplementedError() # pragma: no cover + # TODO: Find a way to use psycopg's executemany + for query in queries: + await self.execute(query) async def iterate( self, query: ClauseElement ) -> typing.AsyncGenerator[typing.Mapping, None]: - raise NotImplementedError() # pragma: no cover - # mypy needs async iterators to contain a `yield` - # https://github.com/python/mypy/issues/5385#issuecomment-407281656 - yield True # pragma: no cover + if self._connection is None: + raise RuntimeError("Connection is not acquired") + + query_str, args, result_columns = compile_query(query, self._dialect) + column_maps = create_column_maps(result_columns) + + async with self._connection.cursor() as cursor: + await cursor.execute(query_str, args) + + while True: + row = await cursor.fetchone() + + if row is None: + break + + yield Record(row, result_columns, self._dialect, column_maps) def transaction(self) -> "TransactionBackend": raise NotImplementedError() # pragma: no cover @property def raw_connection(self) -> typing.Any: - raise NotImplementedError() # pragma: no cover + if self._connection is None: + raise RuntimeError("Connection is not acquired") + return self._connection class PsycopgTransaction(TransactionBackend): diff --git a/requirements.txt b/requirements.txt index be086f77..785c998b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ aiosqlite==0.20.0 asyncpg==0.29.0 psycopg==3.1.18 psycopg-binary==3.1.18 +psycopg-pool==3.2.1 # Sync database drivers for standard tooling around setup/teardown/migrations. psycopg==3.1.18 diff --git a/setup.py b/setup.py index 9218bc1f..4ad4fc5b 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ def get_packages(package): "postgresql": ["asyncpg"], "aiopg": ["aiopg"], "asyncpg": ["asyncpg"], - "psycopg3": ["psycopg"], + "psycopg3": ["psycopg", "psycopg-pool"], "sqlite": ["aiosqlite"], "aiosqlite": ["aiosqlite"], }, From 9d782217bb564eb3d9da0dc3f2606ea161357168 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 02:56:16 +0500 Subject: [PATCH 06/15] S01E03 --- databases/backends/psycopg.py | 31 +++++++++++++++++++++++++++---- databases/core.py | 2 +- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index 0a31b7a7..db9ec2fe 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -156,7 +156,7 @@ async def iterate( yield Record(row, result_columns, self._dialect, column_maps) def transaction(self) -> "TransactionBackend": - raise NotImplementedError() # pragma: no cover + return PsycopgTransaction(connection=self) @property def raw_connection(self) -> typing.Any: @@ -166,13 +166,36 @@ def raw_connection(self) -> typing.Any: class PsycopgTransaction(TransactionBackend): + _connecttion: PsycopgConnection + _transaction: typing.Optional[psycopg.AsyncTransaction] + + def __init__(self, connection: PsycopgConnection): + self._connection = connection + self._transaction: typing.Optional[psycopg.AsyncTransaction] = None + async def start( self, is_root: bool, extra_options: typing.Dict[typing.Any, typing.Any] ) -> None: - raise NotImplementedError() # pragma: no cover + if self._connection._connection is None: + raise RuntimeError("Connection is not acquired") + + transaction = psycopg.AsyncTransaction( + self._connection._connection, **extra_options + ) + async with transaction._conn.lock: + await transaction._conn.wait(transaction._enter_gen()) + self._transaction = transaction async def commit(self) -> None: - raise NotImplementedError() # pragma: no cover + if self._transaction is None: + raise RuntimeError("Transaction was not started") + + async with self._transaction._conn.lock: + await self._transaction._conn.wait(self._transaction._commit_gen()) async def rollback(self) -> None: - raise NotImplementedError() # pragma: no cover + if self._transaction is None: + raise RuntimeError("Transaction was not started") + + async with self._transaction._conn.lock: + await self._transaction._conn.wait(self._transaction._rollback_gen(None)) diff --git a/databases/core.py b/databases/core.py index cba06ced..c09f8814 100644 --- a/databases/core.py +++ b/databases/core.py @@ -44,10 +44,10 @@ class Database: SUPPORTED_BACKENDS = { "postgres": "databases.backends.asyncpg:AsyncpgBackend", - "postgresql": "databases.backends.asyncpg:AsyncpgBackend", "postgresql+aiopg": "databases.backends.aiopg:AiopgBackend", "postgresql+asyncpg": "databases.backends.asyncpg:AsyncpgBackend", "postgresql+psycopg": "databases.backends.psycopg:PsycopgBackend", + "postgresql": "databases.backends.psycopg:PsycopgBackend", "mysql": "databases.backends.mysql:MySQLBackend", "mysql+aiomysql": "databases.backends.asyncmy:MySQLBackend", "mysql+asyncmy": "databases.backends.asyncmy:AsyncMyBackend", From d7ff8e85ccd82f57b0882a3f095da2b52a0f8a26 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 12:23:27 +0500 Subject: [PATCH 07/15] S01E04 --- databases/backends/psycopg.py | 23 +++++++++++++++++------ tests/test_databases.py | 1 + 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index db9ec2fe..885e8336 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -3,10 +3,10 @@ import psycopg import psycopg_pool from sqlalchemy.engine.interfaces import Dialect +from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg from sqlalchemy.sql import ClauseElement from databases.backends.common.records import Record, create_column_maps -from databases.backends.dialects.psycopg import compile_query, get_dialect from databases.core import DatabaseURL from databases.interfaces import ( ConnectionBackend, @@ -29,7 +29,7 @@ def __init__( ) -> None: self._database_url = DatabaseURL(database_url) self._options = options - self._dialect = get_dialect() + self._dialect = PGDialect_psycopg() self._pool = None async def connect(self) -> None: @@ -86,7 +86,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: if self._connection is None: raise RuntimeError("Connection is not acquired") - query_str, args, result_columns = compile_query(query, self._dialect) + query_str, args, result_columns = self._compile(query) async with self._connection.cursor() as cursor: await cursor.execute(query_str, args) @@ -99,7 +99,7 @@ async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterfa if self._connection is None: raise RuntimeError("Connection is not acquired") - query_str, args, result_columns = compile_query(query, self._dialect) + query_str, args, result_columns = self._compile(query) async with self._connection.cursor() as cursor: await cursor.execute(query_str, args) @@ -125,7 +125,7 @@ async def execute(self, query: ClauseElement) -> typing.Any: if self._connection is None: raise RuntimeError("Connection is not acquired") - query_str, args, _ = compile_query(query, self._dialect) + query_str, args, _ = self._compile(query) async with self._connection.cursor() as cursor: await cursor.execute(query_str, args) @@ -141,7 +141,7 @@ async def iterate( if self._connection is None: raise RuntimeError("Connection is not acquired") - query_str, args, result_columns = compile_query(query, self._dialect) + query_str, args, result_columns = self._compile(query) column_maps = create_column_maps(result_columns) async with self._connection.cursor() as cursor: @@ -164,6 +164,17 @@ def raw_connection(self) -> typing.Any: raise RuntimeError("Connection is not acquired") return self._connection + def _compile( + self, query: ClauseElement, + ) -> typing.Tuple[str, typing.Mapping[str, typing.Any], tuple]: + compiled = query.compile(dialect=self._dialect) + + compiled_query = compiled.string + params = compiled.params + result_map = compiled._result_columns + + return compiled_query, params, result_map + class PsycopgTransaction(TransactionBackend): _connecttion: PsycopgConnection diff --git a/tests/test_databases.py b/tests/test_databases.py index 90771b1d..5c0b61d1 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -1338,6 +1338,7 @@ async def test_queries_with_expose_backend_connection(database_url): "mysql+asyncmy", "mysql+aiomysql", "postgresql+aiopg", + "postgresql+psycopg", ]: insert_query = "INSERT INTO notes (text, completed) VALUES (%s, %s)" else: From ddd8aaa3efbf960b33e1bdeb7d1b72a915d8bb41 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 12:43:43 +0500 Subject: [PATCH 08/15] S01E05 --- databases/backends/common/records.py | 1 - databases/backends/psycopg.py | 13 +++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py index f57de839..65032fc8 100644 --- a/databases/backends/common/records.py +++ b/databases/backends/common/records.py @@ -4,7 +4,6 @@ from sqlalchemy.engine.row import Row as SQLRow from sqlalchemy.sql.compiler import _CompileLabel from sqlalchemy.sql.schema import Column -from sqlalchemy.sql.sqltypes import JSON from sqlalchemy.types import TypeEngine from databases.interfaces import Record as RecordInterface diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index 885e8336..3392a54b 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -2,6 +2,7 @@ import psycopg import psycopg_pool +from psycopg.rows import namedtuple_row from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg from sqlalchemy.sql import ClauseElement @@ -58,12 +59,11 @@ def connection(self) -> "PsycopgConnection": class PsycopgConnection(ConnectionBackend): _database: PsycopgBackend _dialect: Dialect - _connection: typing.Optional[psycopg.AsyncConnection] + _connection: typing.Optional[psycopg.AsyncConnection] = None def __init__(self, database: PsycopgBackend, dialect: Dialect) -> None: self._database = database self._dialect = dialect - self._connection = None async def acquire(self) -> None: if self._connection is not None: @@ -74,6 +74,7 @@ async def acquire(self) -> None: # TODO: Add configurable timeouts self._connection = await self._database._pool.getconn() + await self._connection.set_autocommit(True) async def release(self) -> None: if self._connection is None: @@ -88,7 +89,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: query_str, args, result_columns = self._compile(query) - async with self._connection.cursor() as cursor: + async with self._connection.cursor(row_factory=namedtuple_row) as cursor: await cursor.execute(query_str, args) rows = await cursor.fetchall() @@ -101,7 +102,7 @@ async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterfa query_str, args, result_columns = self._compile(query) - async with self._connection.cursor() as cursor: + async with self._connection.cursor(row_factory=namedtuple_row) as cursor: await cursor.execute(query_str, args) row = await cursor.fetchone() @@ -127,7 +128,7 @@ async def execute(self, query: ClauseElement) -> typing.Any: query_str, args, _ = self._compile(query) - async with self._connection.cursor() as cursor: + async with self._connection.cursor(row_factory=namedtuple_row) as cursor: await cursor.execute(query_str, args) async def execute_many(self, queries: typing.List[ClauseElement]) -> None: @@ -144,7 +145,7 @@ async def iterate( query_str, args, result_columns = self._compile(query) column_maps = create_column_maps(result_columns) - async with self._connection.cursor() as cursor: + async with self._connection.cursor(row_factory=namedtuple_row) as cursor: await cursor.execute(query_str, args) while True: From e08eb4f4fa78e72187d5d5c610ef47e07601a3c1 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 14:03:56 +0500 Subject: [PATCH 09/15] S01E06 --- databases/backends/common/records.py | 4 ++++ databases/backends/psycopg.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py index 65032fc8..5c7ae25c 100644 --- a/databases/backends/common/records.py +++ b/databases/backends/common/records.py @@ -1,4 +1,5 @@ import typing +from collections import namedtuple from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.engine.row import Row as SQLRow @@ -39,6 +40,9 @@ def __init__( @property def _mapping(self) -> typing.Mapping: + if hasattr(self._row, "_asdict"): + return self._row._asdict() + return self._row def keys(self) -> typing.KeysView: diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index 3392a54b..302f94e5 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -168,7 +168,10 @@ def raw_connection(self) -> typing.Any: def _compile( self, query: ClauseElement, ) -> typing.Tuple[str, typing.Mapping[str, typing.Any], tuple]: - compiled = query.compile(dialect=self._dialect) + compiled = query.compile( + dialect=self._dialect, + compile_kwargs={"render_postcompile": True}, + ) compiled_query = compiled.string params = compiled.params From 1c58a73778c10500bbad4f7c0334cb44c30d650c Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 14:24:00 +0500 Subject: [PATCH 10/15] S01E07 --- databases/backends/common/records.py | 6 ++++-- databases/backends/psycopg.py | 28 ++++++++++++++++++++++++---- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py index 5c7ae25c..5f6ba263 100644 --- a/databases/backends/common/records.py +++ b/databases/backends/common/records.py @@ -1,5 +1,4 @@ import typing -from collections import namedtuple from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.engine.row import Row as SQLRow @@ -53,7 +52,10 @@ def values(self) -> typing.ValuesView: def __getitem__(self, key: typing.Any) -> typing.Any: if len(self._column_map) == 0: - return self._row[key] + try: + return self._row[key] + except TypeError: + return self._mapping[key] elif isinstance(key, Column): idx, datatype = self._column_map_full[str(key)] elif isinstance(key, int): diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index 302f94e5..bb623cfa 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -3,9 +3,10 @@ import psycopg import psycopg_pool from psycopg.rows import namedtuple_row -from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg +from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.sql import ClauseElement +from sqlalchemy.sql.schema import Column from databases.backends.common.records import Record, create_column_maps from databases.core import DatabaseURL @@ -31,6 +32,7 @@ def __init__( self._database_url = DatabaseURL(database_url) self._options = options self._dialect = PGDialect_psycopg() + self._dialect.implicit_returning = True self._pool = None async def connect(self) -> None: @@ -94,7 +96,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: rows = await cursor.fetchall() column_maps = create_column_maps(result_columns) - return [Record(row, result_columns, self._dialect, column_maps) for row in rows] + return [PsycopgRecord(row, result_columns, self._dialect, column_maps) for row in rows] async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]: if self._connection is None: @@ -109,7 +111,7 @@ async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterfa if row is None: return None - return Record( + return PsycopgRecord( row, result_columns, self._dialect, @@ -154,7 +156,7 @@ async def iterate( if row is None: break - yield Record(row, result_columns, self._dialect, column_maps) + yield PsycopgRecord(row, result_columns, self._dialect, column_maps) def transaction(self) -> "TransactionBackend": return PsycopgTransaction(connection=self) @@ -214,3 +216,21 @@ async def rollback(self) -> None: async with self._transaction._conn.lock: await self._transaction._conn.wait(self._transaction._rollback_gen(None)) + + +class PsycopgRecord(Record): + @property + def _mapping(self) -> typing.Mapping: + return self._row._asdict() + + def __getitem__(self, key: typing.Any) -> typing.Any: + if len(self._column_map) == 0: + return self._mapping[key] + elif isinstance(key, Column): + idx, datatype = self._column_map_full[str(key)] + elif isinstance(key, int): + idx, datatype = self._column_map_int[key] + else: + idx, datatype = self._column_map[key] + + return self._row[idx] From 3f26f76056d0da62600a2fca5f3c23e6ee8bbae5 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 14:39:47 +0500 Subject: [PATCH 11/15] S01E08 --- databases/backends/asyncpg.py | 66 +++++++++++++++++++++----- databases/backends/dialects/psycopg.py | 43 +---------------- databases/backends/psycopg.py | 5 +- databases/core.py | 2 +- 4 files changed, 58 insertions(+), 58 deletions(-) diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py index ff61fe26..124f7af1 100644 --- a/databases/backends/asyncpg.py +++ b/databases/backends/asyncpg.py @@ -4,10 +4,11 @@ import asyncpg from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.sql import ClauseElement +from sqlalchemy.sql.ddl import DDLElement from databases.backends.common.records import Record, create_column_maps -from databases.backends.dialects.psycopg import compile_query, get_dialect -from databases.core import DatabaseURL +from databases.backends.dialects.psycopg import dialect as psycopg_dialect +from databases.core import LOG_EXTRA, DatabaseURL from databases.interfaces import ( ConnectionBackend, DatabaseBackend, @@ -24,9 +25,20 @@ def __init__( ) -> None: self._database_url = DatabaseURL(database_url) self._options = options - self._dialect = get_dialect() + self._dialect = self._get_dialect() self._pool = None + def _get_dialect(self) -> Dialect: + dialect = psycopg_dialect(paramstyle="pyformat") + dialect.implicit_returning = True + dialect.supports_native_enum = True + dialect.supports_smallserial = True # 9.2+ + dialect._backslash_escapes = False + dialect.supports_sane_multi_rowcount = True # psycopg 2.0.9+ + dialect._has_native_hstore = True + dialect.supports_native_decimal = True + return dialect + def _get_connection_kwargs(self) -> dict: url_options = self._database_url.options @@ -87,7 +99,7 @@ async def release(self) -> None: async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: assert self._connection is not None, "Connection is not acquired" - query_str, args, result_columns = compile_query(query, self._dialect) + query_str, args, result_columns = self._compile(query) rows = await self._connection.fetch(query_str, *args) dialect = self._dialect column_maps = create_column_maps(result_columns) @@ -95,7 +107,7 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]: assert self._connection is not None, "Connection is not acquired" - query_str, args, result_columns = compile_query(query, self._dialect) + query_str, args, result_columns = self._compile(query) row = await self._connection.fetchrow(query_str, *args) if row is None: return None @@ -123,7 +135,7 @@ async def fetch_val( async def execute(self, query: ClauseElement) -> typing.Any: assert self._connection is not None, "Connection is not acquired" - query_str, args, _ = compile_query(query, self._dialect) + query_str, args, _ = self._compile(query) return await self._connection.fetchval(query_str, *args) async def execute_many(self, queries: typing.List[ClauseElement]) -> None: @@ -132,14 +144,14 @@ async def execute_many(self, queries: typing.List[ClauseElement]) -> None: # loop through multiple executes here, which should all end up # using the same prepared statement. for single_query in queries: - single_query, args, _ = compile_query(single_query, self._dialect) + single_query, args, _ = self._compile(single_query) await self._connection.execute(single_query, *args) async def iterate( self, query: ClauseElement ) -> typing.AsyncGenerator[typing.Any, None]: assert self._connection is not None, "Connection is not acquired" - query_str, args, result_columns = compile_query(query, self._dialect) + query_str, args, result_columns = self._compile(query) column_maps = create_column_maps(result_columns) async for row in self._connection.cursor(query_str, *args): yield Record(row, result_columns, self._dialect, column_maps) @@ -147,10 +159,40 @@ async def iterate( def transaction(self) -> TransactionBackend: return AsyncpgTransaction(connection=self) - @property - def raw_connection(self) -> asyncpg.connection.Connection: - assert self._connection is not None, "Connection is not acquired" - return self._connection + def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]: + compiled = query.compile( + dialect=self._dialect, compile_kwargs={"render_postcompile": True} + ) + + if not isinstance(query, DDLElement): + compiled_params = sorted(compiled.params.items()) + + mapping = { + key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1) + } + compiled_query = compiled.string % mapping + + processors = compiled._bind_processors + args = [ + processors[key](val) if key in processors else val + for key, val in compiled_params + ] + result_map = compiled._result_columns + else: + compiled_query = compiled.string + args = [] + result_map = None + + query_message = compiled_query.replace(" \n", " ").replace("\n", " ") + logger.debug( + "Query: %s Args: %s", query_message, repr(tuple(args)), extra=LOG_EXTRA + ) + return compiled_query, args, result_map + + @property + def raw_connection(self) -> asyncpg.connection.Connection: + assert self._connection is not None, "Connection is not acquired" + return self._connection class AsyncpgTransaction(TransactionBackend): diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py index 80bf5b76..07bd1880 100644 --- a/databases/backends/dialects/psycopg.py +++ b/databases/backends/dialects/psycopg.py @@ -10,9 +10,6 @@ from sqlalchemy import types, util from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext from sqlalchemy.engine import processors -from sqlalchemy.engine.interfaces import Dialect -from sqlalchemy.sql import ClauseElement -from sqlalchemy.sql.ddl import DDLElement from sqlalchemy.types import Float, Numeric @@ -46,42 +43,4 @@ class PGDialect_psycopg(PGDialect): execution_ctx_cls = PGExecutionContext_psycopg -def get_dialect() -> Dialect: - dialect = PGDialect_psycopg(paramstyle="pyformat") - dialect.implicit_returning = True - dialect.supports_native_enum = True - dialect.supports_smallserial = True # 9.2+ - dialect._backslash_escapes = False - dialect.supports_sane_multi_rowcount = True # psycopg 2.0.9+ - dialect._has_native_hstore = True - dialect.supports_native_decimal = True - return dialect - - -def compile_query( - query: ClauseElement, dialect: Dialect -) -> typing.Tuple[str, list, tuple]: - compiled = query.compile( - dialect=dialect, compile_kwargs={"render_postcompile": True} - ) - - if not isinstance(query, DDLElement): - compiled_params = sorted(compiled.params.items()) - - mapping = { - key: "$" + str(i) for i, (key, _) in enumerate(compiled_params, start=1) - } - compiled_query = compiled.string % mapping - - processors = compiled._bind_processors - args = [ - processors[key](val) if key in processors else val - for key, val in compiled_params - ] - result_map = compiled._result_columns - else: - compiled_query = compiled.string - args = [] - result_map = None - - return compiled_query, args, result_map +dialect = PGDialect_psycopg diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index bb623cfa..f83f4917 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -39,9 +39,8 @@ async def connect(self) -> None: if self._pool is not None: return - self._pool = psycopg_pool.AsyncConnectionPool( - self._database_url._url, open=False, **self._options - ) + url = self._database_url._url.replace("postgresql+psycopg", "postgresql") + self._pool = psycopg_pool.AsyncConnectionPool(url, open=False, **self._options) # TODO: Add configurable timeouts await self._pool.open() diff --git a/databases/core.py b/databases/core.py index c09f8814..cba06ced 100644 --- a/databases/core.py +++ b/databases/core.py @@ -44,10 +44,10 @@ class Database: SUPPORTED_BACKENDS = { "postgres": "databases.backends.asyncpg:AsyncpgBackend", + "postgresql": "databases.backends.asyncpg:AsyncpgBackend", "postgresql+aiopg": "databases.backends.aiopg:AiopgBackend", "postgresql+asyncpg": "databases.backends.asyncpg:AsyncpgBackend", "postgresql+psycopg": "databases.backends.psycopg:PsycopgBackend", - "postgresql": "databases.backends.psycopg:PsycopgBackend", "mysql": "databases.backends.mysql:MySQLBackend", "mysql+aiomysql": "databases.backends.asyncmy:MySQLBackend", "mysql+asyncmy": "databases.backends.asyncmy:AsyncMyBackend", From cdbf97f53bb13a58c06e6f444a1abacf474785be Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 15:33:39 +0500 Subject: [PATCH 12/15] S01E09 --- databases/backends/aiopg.py | 6 +- databases/backends/asyncpg.py | 4 +- databases/backends/compilers/__init__.py | 0 databases/backends/compilers/psycopg.py | 17 ----- databases/backends/dialects/__init__.py | 0 databases/backends/dialects/psycopg.py | 46 ------------- databases/backends/psycopg.py | 15 +++-- tests/test_databases.py | 86 +++++++++--------------- 8 files changed, 43 insertions(+), 131 deletions(-) delete mode 100644 databases/backends/compilers/__init__.py delete mode 100644 databases/backends/compilers/psycopg.py delete mode 100644 databases/backends/dialects/__init__.py delete mode 100644 databases/backends/dialects/psycopg.py diff --git a/databases/backends/aiopg.py b/databases/backends/aiopg.py index 0b4d95a3..1df30699 100644 --- a/databases/backends/aiopg.py +++ b/databases/backends/aiopg.py @@ -5,15 +5,13 @@ import uuid import aiopg +from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg from sqlalchemy.engine.cursor import CursorResultMetaData from sqlalchemy.engine.interfaces import Dialect, ExecutionContext -from sqlalchemy.engine.row import Row from sqlalchemy.sql import ClauseElement from sqlalchemy.sql.ddl import DDLElement from databases.backends.common.records import Record, Row, create_column_maps -from databases.backends.compilers.psycopg import PGCompiler_psycopg -from databases.backends.dialects.psycopg import PGDialect_psycopg from databases.core import LOG_EXTRA, DatabaseURL from databases.interfaces import ( ConnectionBackend, @@ -38,12 +36,10 @@ def _get_dialect(self) -> Dialect: dialect = PGDialect_psycopg( json_serializer=json.dumps, json_deserializer=lambda x: x ) - dialect.statement_compiler = PGCompiler_psycopg dialect.implicit_returning = True dialect.supports_native_enum = True dialect.supports_smallserial = True # 9.2+ dialect._backslash_escapes = False - dialect.supports_sane_multi_rowcount = True # psycopg 2.0.9+ dialect._has_native_hstore = True dialect.supports_native_decimal = True diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py index 124f7af1..cc6441f5 100644 --- a/databases/backends/asyncpg.py +++ b/databases/backends/asyncpg.py @@ -2,12 +2,12 @@ import typing import asyncpg +from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.sql import ClauseElement from sqlalchemy.sql.ddl import DDLElement from databases.backends.common.records import Record, create_column_maps -from databases.backends.dialects.psycopg import dialect as psycopg_dialect from databases.core import LOG_EXTRA, DatabaseURL from databases.interfaces import ( ConnectionBackend, @@ -29,7 +29,7 @@ def __init__( self._pool = None def _get_dialect(self) -> Dialect: - dialect = psycopg_dialect(paramstyle="pyformat") + dialect = PGDialect_psycopg(paramstyle="pyformat") dialect.implicit_returning = True dialect.supports_native_enum = True dialect.supports_smallserial = True # 9.2+ diff --git a/databases/backends/compilers/__init__.py b/databases/backends/compilers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/databases/backends/compilers/psycopg.py b/databases/backends/compilers/psycopg.py deleted file mode 100644 index 654c22a1..00000000 --- a/databases/backends/compilers/psycopg.py +++ /dev/null @@ -1,17 +0,0 @@ -from sqlalchemy.dialects.postgresql.psycopg import PGCompiler_psycopg - - -class APGCompiler_psycopg2(PGCompiler_psycopg): - def construct_params(self, *args, **kwargs): - pd = super().construct_params(*args, **kwargs) - - for column in self.prefetch: - pd[column.key] = self._exec_default(column.default) - - return pd - - def _exec_default(self, default): - if default.is_callable: - return default.arg(self.dialect) - else: - return default.arg diff --git a/databases/backends/dialects/__init__.py b/databases/backends/dialects/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py deleted file mode 100644 index 07bd1880..00000000 --- a/databases/backends/dialects/psycopg.py +++ /dev/null @@ -1,46 +0,0 @@ -""" -All the unique changes for the databases package -with the custom Numeric as the deprecated pypostgresql -for backwards compatibility and to make sure the -package can go to SQLAlchemy 2.0+. -""" - -import typing - -from sqlalchemy import types, util -from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext -from sqlalchemy.engine import processors -from sqlalchemy.types import Float, Numeric - - -class PGExecutionContext_psycopg(PGExecutionContext): - ... - - -class PGNumeric(Numeric): - def bind_processor( - self, dialect: typing.Any - ) -> typing.Union[str, None]: # pragma: no cover - return processors.to_str - - def result_processor( - self, dialect: typing.Any, coltype: typing.Any - ) -> typing.Union[float, None]: # pragma: no cover - if self.asdecimal: - return None - else: - return processors.to_float - - -class PGDialect_psycopg(PGDialect): - colspecs = util.update_copy( - PGDialect.colspecs, - { - types.Numeric: PGNumeric, - types.Float: Float, - }, - ) - execution_ctx_cls = PGExecutionContext_psycopg - - -dialect = PGDialect_psycopg diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index f83f4917..da0a6718 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -22,7 +22,7 @@ class PsycopgBackend(DatabaseBackend): _database_url: DatabaseURL _options: typing.Dict[str, typing.Any] _dialect: Dialect - _pool: typing.Optional[psycopg_pool.AsyncConnectionPool] + _pool: typing.Optional[psycopg_pool.AsyncConnectionPool] = None def __init__( self, @@ -33,7 +33,6 @@ def __init__( self._options = options self._dialect = PGDialect_psycopg() self._dialect.implicit_returning = True - self._pool = None async def connect(self) -> None: if self._pool is not None: @@ -95,7 +94,10 @@ async def fetch_all(self, query: ClauseElement) -> typing.List[RecordInterface]: rows = await cursor.fetchall() column_maps = create_column_maps(result_columns) - return [PsycopgRecord(row, result_columns, self._dialect, column_maps) for row in rows] + return [ + PsycopgRecord(row, result_columns, self._dialect, column_maps) + for row in rows + ] async def fetch_one(self, query: ClauseElement) -> typing.Optional[RecordInterface]: if self._connection is None: @@ -167,7 +169,8 @@ def raw_connection(self) -> typing.Any: return self._connection def _compile( - self, query: ClauseElement, + self, + query: ClauseElement, ) -> typing.Tuple[str, typing.Mapping[str, typing.Any], tuple]: compiled = query.compile( dialect=self._dialect, @@ -224,7 +227,9 @@ def _mapping(self) -> typing.Mapping: def __getitem__(self, key: typing.Any) -> typing.Any: if len(self._column_map) == 0: - return self._mapping[key] + if isinstance(key, str): + return self._mapping[key] + return self._row[key] elif isinstance(key, Column): idx, datatype = self._column_map_full[str(key)] elif isinstance(key, int): diff --git a/tests/test_databases.py b/tests/test_databases.py index 5c0b61d1..66164aea 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -204,17 +204,17 @@ async def test_queries(database_url): assert len(results) == 3 assert results[0]["text"] == "example1" - assert results[0]["completed"] == True + assert results[0]["completed"] is True assert results[1]["text"] == "example2" - assert results[1]["completed"] == False + assert results[1]["completed"] is False assert results[2]["text"] == "example3" - assert results[2]["completed"] == True + assert results[2]["completed"] is True # fetch_one() query = notes.select() result = await database.fetch_one(query=query) assert result["text"] == "example1" - assert result["completed"] == True + assert result["completed"] is True # fetch_val() query = sqlalchemy.sql.select(*[notes.c.text]) @@ -246,11 +246,11 @@ async def test_queries(database_url): iterate_results.append(result) assert len(iterate_results) == 3 assert iterate_results[0]["text"] == "example1" - assert iterate_results[0]["completed"] == True + assert iterate_results[0]["completed"] is True assert iterate_results[1]["text"] == "example2" - assert iterate_results[1]["completed"] == False + assert iterate_results[1]["completed"] is False assert iterate_results[2]["text"] == "example3" - assert iterate_results[2]["completed"] == True + assert iterate_results[2]["completed"] is True @pytest.mark.parametrize("database_url", DATABASE_URLS) @@ -280,26 +280,26 @@ async def test_queries_raw(database_url): results = await database.fetch_all(query=query, values={"completed": True}) assert len(results) == 2 assert results[0]["text"] == "example1" - assert results[0]["completed"] == True + assert results[0]["completed"] is True assert results[1]["text"] == "example3" - assert results[1]["completed"] == True + assert results[1]["completed"] is True # fetch_one() query = "SELECT * FROM notes WHERE completed = :completed" result = await database.fetch_one(query=query, values={"completed": False}) assert result["text"] == "example2" - assert result["completed"] == False + assert result["completed"] is False # fetch_val() query = "SELECT completed FROM notes WHERE text = :text" result = await database.fetch_val(query=query, values={"text": "example1"}) - assert result == True + assert result is True query = "SELECT * FROM notes WHERE text = :text" result = await database.fetch_val( query=query, values={"text": "example1"}, column="completed" ) - assert result == True + assert result is True # iterate() query = "SELECT * FROM notes" @@ -308,11 +308,11 @@ async def test_queries_raw(database_url): iterate_results.append(result) assert len(iterate_results) == 3 assert iterate_results[0]["text"] == "example1" - assert iterate_results[0]["completed"] == True + assert iterate_results[0]["completed"] is True assert iterate_results[1]["text"] == "example2" - assert iterate_results[1]["completed"] == False + assert iterate_results[1]["completed"] is False assert iterate_results[2]["text"] == "example3" - assert iterate_results[2]["completed"] == True + assert iterate_results[2]["completed"] is True @pytest.mark.parametrize("database_url", DATABASE_URLS) @@ -380,7 +380,7 @@ async def test_results_support_mapping_interface(database_url): assert isinstance(results_as_dicts[0]["id"], int) assert results_as_dicts[0]["text"] == "example1" - assert results_as_dicts[0]["completed"] == True + assert results_as_dicts[0]["completed"] is True @pytest.mark.parametrize("database_url", DATABASE_URLS) @@ -467,7 +467,7 @@ async def test_execute_return_val(database_url): query = notes.select().where(notes.c.id == pk) result = await database.fetch_one(query) assert result["text"] == "example1" - assert result["completed"] == True + assert result["completed"] is True @pytest.mark.parametrize("database_url", DATABASE_URLS) @@ -857,7 +857,7 @@ async def test_transaction_commit_low_level(database_url): try: query = notes.insert().values(text="example1", completed=True) await database.execute(query) - except: # pragma: no cover + except Exception: # pragma: no cover await transaction.rollback() else: await transaction.commit() @@ -881,7 +881,7 @@ async def test_transaction_rollback_low_level(database_url): query = notes.insert().values(text="example1", completed=True) await database.execute(query) raise RuntimeError() - except: + except Exception: await transaction.rollback() else: # pragma: no cover await transaction.commit() @@ -1354,13 +1354,12 @@ async def test_queries_with_expose_backend_connection(database_url): ]: cursor = await raw_connection.cursor() await cursor.execute(insert_query, values) - elif database.url.scheme == "mysql+asyncmy": + elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]: async with raw_connection.cursor() as cursor: await cursor.execute(insert_query, values) elif database.url.scheme in [ "postgresql", "postgresql+asyncpg", - "postgresql+psycopg", ]: await raw_connection.execute(insert_query, *values) elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]: @@ -1372,7 +1371,7 @@ async def test_queries_with_expose_backend_connection(database_url): if database.url.scheme in ["mysql", "mysql+aiomysql"]: cursor = await raw_connection.cursor() await cursor.executemany(insert_query, values) - elif database.url.scheme == "mysql+asyncmy": + elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]: async with raw_connection.cursor() as cursor: await cursor.executemany(insert_query, values) elif database.url.scheme == "postgresql+aiopg": @@ -1395,15 +1394,11 @@ async def test_queries_with_expose_backend_connection(database_url): cursor = await raw_connection.cursor() await cursor.execute(select_query) results = await cursor.fetchall() - elif database.url.scheme == "mysql+asyncmy": + elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]: async with raw_connection.cursor() as cursor: await cursor.execute(select_query) results = await cursor.fetchall() - elif database.url.scheme in [ - "postgresql", - "postgresql+asyncpg", - "postgresql+psycopg", - ]: + elif database.url.scheme in ["postgresql", "postgresql+asyncpg"]: results = await raw_connection.fetch(select_query) elif database.url.scheme in ["sqlite", "sqlite+aiosqlite"]: results = await raw_connection.execute_fetchall(select_query) @@ -1411,20 +1406,16 @@ async def test_queries_with_expose_backend_connection(database_url): assert len(results) == 3 # Raw output for the raw request assert results[0][1] == "example1" - assert results[0][2] == True + assert results[0][2] is True assert results[1][1] == "example2" - assert results[1][2] == False + assert results[1][2] is False assert results[2][1] == "example3" - assert results[2][2] == True + assert results[2][2] is True # fetch_one() - if database.url.scheme in [ - "postgresql", - "postgresql+asyncpg", - "postgresql+psycopg", - ]: + if database.url.scheme in ["postgresql", "postgresql+asyncpg"]: result = await raw_connection.fetchrow(select_query) - elif database.url.scheme == "mysql+asyncmy": + elif database.url.scheme in ["mysql+asyncmy", "postgresql+psycopg"]: async with raw_connection.cursor() as cursor: await cursor.execute(select_query) result = await cursor.fetchone() @@ -1435,7 +1426,7 @@ async def test_queries_with_expose_backend_connection(database_url): # Raw output for the raw request assert result[1] == "example1" - assert result[2] == True + assert result[2] is True @pytest.mark.parametrize("database_url", DATABASE_URLS) @@ -1606,7 +1597,7 @@ async def test_column_names(database_url, select_query): assert sorted(results[0]._mapping.keys()) == ["completed", "id", "text"] assert results[0]["text"] == "example1" - assert results[0]["completed"] == True + assert results[0]["completed"] is True @pytest.mark.parametrize("database_url", DATABASE_URLS) @@ -1641,23 +1632,6 @@ async def test_result_named_access(database_url): assert result.completed is True -@pytest.mark.parametrize("database_url", DATABASE_URLS) -@async_adapter -async def test_mapping_property_interface(database_url): - """ - Test that all connections implement interface with `_mapping` property - """ - async with Database(database_url) as database: - query = notes.select() - single_result = await database.fetch_one(query=query) - assert single_result._mapping["text"] == "example1" - assert single_result._mapping["completed"] is True - - list_result = await database.fetch_all(query=query) - assert list_result[0]._mapping["text"] == "example1" - assert list_result[0]._mapping["completed"] is True - - @async_adapter async def test_should_not_maintain_ref_when_no_cache_param(): async with Database( From 6910288add45ba7362ed3416ce7dc730502944fd Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 17:30:07 +0500 Subject: [PATCH 13/15] S01E10 --- databases/backends/aiopg.py | 5 ++- databases/backends/asyncpg.py | 14 ++++---- databases/backends/compilers/__init__.py | 0 databases/backends/compilers/psycopg.py | 17 ++++++++++ databases/backends/dialects/__init__.py | 0 databases/backends/dialects/psycopg.py | 42 ++++++++++++++++++++++++ 6 files changed, 71 insertions(+), 7 deletions(-) create mode 100644 databases/backends/compilers/__init__.py create mode 100644 databases/backends/compilers/psycopg.py create mode 100644 databases/backends/dialects/__init__.py create mode 100644 databases/backends/dialects/psycopg.py diff --git a/databases/backends/aiopg.py b/databases/backends/aiopg.py index 1df30699..9928f8b3 100644 --- a/databases/backends/aiopg.py +++ b/databases/backends/aiopg.py @@ -5,13 +5,14 @@ import uuid import aiopg -from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg from sqlalchemy.engine.cursor import CursorResultMetaData from sqlalchemy.engine.interfaces import Dialect, ExecutionContext from sqlalchemy.sql import ClauseElement from sqlalchemy.sql.ddl import DDLElement from databases.backends.common.records import Record, Row, create_column_maps +from databases.backends.compilers.psycopg import PGCompiler_psycopg +from databases.backends.dialects.psycopg import PGDialect_psycopg from databases.core import LOG_EXTRA, DatabaseURL from databases.interfaces import ( ConnectionBackend, @@ -36,10 +37,12 @@ def _get_dialect(self) -> Dialect: dialect = PGDialect_psycopg( json_serializer=json.dumps, json_deserializer=lambda x: x ) + dialect.statement_compiler = PGCompiler_psycopg dialect.implicit_returning = True dialect.supports_native_enum = True dialect.supports_smallserial = True # 9.2+ dialect._backslash_escapes = False + dialect.supports_sane_multi_rowcount = True # psycopg 2.0.9+ dialect._has_native_hstore = True dialect.supports_native_decimal = True diff --git a/databases/backends/asyncpg.py b/databases/backends/asyncpg.py index cc6441f5..92ad93b0 100644 --- a/databases/backends/asyncpg.py +++ b/databases/backends/asyncpg.py @@ -2,12 +2,12 @@ import typing import asyncpg -from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.sql import ClauseElement from sqlalchemy.sql.ddl import DDLElement from databases.backends.common.records import Record, create_column_maps +from databases.backends.dialects.psycopg import dialect as psycopg_dialect from databases.core import LOG_EXTRA, DatabaseURL from databases.interfaces import ( ConnectionBackend, @@ -29,7 +29,8 @@ def __init__( self._pool = None def _get_dialect(self) -> Dialect: - dialect = PGDialect_psycopg(paramstyle="pyformat") + dialect = psycopg_dialect(paramstyle="pyformat") + dialect.implicit_returning = True dialect.supports_native_enum = True dialect.supports_smallserial = True # 9.2+ @@ -37,6 +38,7 @@ def _get_dialect(self) -> Dialect: dialect.supports_sane_multi_rowcount = True # psycopg 2.0.9+ dialect._has_native_hstore = True dialect.supports_native_decimal = True + return dialect def _get_connection_kwargs(self) -> dict: @@ -189,10 +191,10 @@ def _compile(self, query: ClauseElement) -> typing.Tuple[str, list, tuple]: ) return compiled_query, args, result_map - @property - def raw_connection(self) -> asyncpg.connection.Connection: - assert self._connection is not None, "Connection is not acquired" - return self._connection + @property + def raw_connection(self) -> asyncpg.connection.Connection: + assert self._connection is not None, "Connection is not acquired" + return self._connection class AsyncpgTransaction(TransactionBackend): diff --git a/databases/backends/compilers/__init__.py b/databases/backends/compilers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/databases/backends/compilers/psycopg.py b/databases/backends/compilers/psycopg.py new file mode 100644 index 00000000..654c22a1 --- /dev/null +++ b/databases/backends/compilers/psycopg.py @@ -0,0 +1,17 @@ +from sqlalchemy.dialects.postgresql.psycopg import PGCompiler_psycopg + + +class APGCompiler_psycopg2(PGCompiler_psycopg): + def construct_params(self, *args, **kwargs): + pd = super().construct_params(*args, **kwargs) + + for column in self.prefetch: + pd[column.key] = self._exec_default(column.default) + + return pd + + def _exec_default(self, default): + if default.is_callable: + return default.arg(self.dialect) + else: + return default.arg diff --git a/databases/backends/dialects/__init__.py b/databases/backends/dialects/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py new file mode 100644 index 00000000..7740d976 --- /dev/null +++ b/databases/backends/dialects/psycopg.py @@ -0,0 +1,42 @@ +""" +All the unique changes for the databases package +with the custom Numeric as the deprecated pypostgresql +for backwards compatibility and to make sure the +package can go to SQLAlchemy 2.0+. +""" + +import typing + +from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext +from sqlalchemy.engine import processors +from sqlalchemy.types import Numeric + + +class PGExecutionContext_psycopg(PGExecutionContext): + ... + + +class PGNumeric(Numeric): + def bind_processor( + self, dialect: typing.Any + ) -> typing.Union[str, None]: # pragma: no cover + return processors.to_str + + def result_processor( + self, dialect: typing.Any, coltype: typing.Any + ) -> typing.Union[float, None]: # pragma: no cover + if self.asdecimal: + return None + else: + return processors.to_float + + +class PGDialect_psycopg(PGDialect): + colspecs = { + **PGDialect.colspecs, + Numeric: PGNumeric, + } + execution_ctx_cls = PGExecutionContext_psycopg + + +dialect = PGDialect_psycopg From 09c1b2bbb318b12dd97d956416afec88fbdb5b03 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 17:58:10 +0500 Subject: [PATCH 14/15] S01E11 --- databases/backends/common/records.py | 8 +------- databases/backends/dialects/psycopg.py | 14 +++++++++----- databases/backends/psycopg.py | 22 ++++++++++++++++++++-- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/databases/backends/common/records.py b/databases/backends/common/records.py index 5f6ba263..65032fc8 100644 --- a/databases/backends/common/records.py +++ b/databases/backends/common/records.py @@ -39,9 +39,6 @@ def __init__( @property def _mapping(self) -> typing.Mapping: - if hasattr(self._row, "_asdict"): - return self._row._asdict() - return self._row def keys(self) -> typing.KeysView: @@ -52,10 +49,7 @@ def values(self) -> typing.ValuesView: def __getitem__(self, key: typing.Any) -> typing.Any: if len(self._column_map) == 0: - try: - return self._row[key] - except TypeError: - return self._mapping[key] + return self._row[key] elif isinstance(key, Column): idx, datatype = self._column_map_full[str(key)] elif isinstance(key, int): diff --git a/databases/backends/dialects/psycopg.py b/databases/backends/dialects/psycopg.py index 7740d976..07bd1880 100644 --- a/databases/backends/dialects/psycopg.py +++ b/databases/backends/dialects/psycopg.py @@ -7,9 +7,10 @@ import typing +from sqlalchemy import types, util from sqlalchemy.dialects.postgresql.base import PGDialect, PGExecutionContext from sqlalchemy.engine import processors -from sqlalchemy.types import Numeric +from sqlalchemy.types import Float, Numeric class PGExecutionContext_psycopg(PGExecutionContext): @@ -32,10 +33,13 @@ def result_processor( class PGDialect_psycopg(PGDialect): - colspecs = { - **PGDialect.colspecs, - Numeric: PGNumeric, - } + colspecs = util.update_copy( + PGDialect.colspecs, + { + types.Numeric: PGNumeric, + types.Float: Float, + }, + ) execution_ctx_cls = PGExecutionContext_psycopg diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index da0a6718..eb3cec16 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -1,6 +1,9 @@ import typing +import orjson import psycopg +import psycopg.adapt +import psycopg.types import psycopg_pool from psycopg.rows import namedtuple_row from sqlalchemy.dialects.postgresql.psycopg import PGDialect_psycopg @@ -18,6 +21,16 @@ ) +class JsonLoader(psycopg.adapt.Loader): + def load(self, data): + return orjson.loads(data) + + +class JsonDumper(psycopg.adapt.Dumper): + def dump(self, data): + return orjson.dumps(data) + + class PsycopgBackend(DatabaseBackend): _database_url: DatabaseURL _options: typing.Dict[str, typing.Any] @@ -73,8 +86,13 @@ async def acquire(self) -> None: raise RuntimeError("PsycopgBackend is not running") # TODO: Add configurable timeouts - self._connection = await self._database._pool.getconn() - await self._connection.set_autocommit(True) + connection = await self._database._pool.getconn() + connection.adapters.register_loader("json", JsonLoader) + connection.adapters.register_loader("jsonb", JsonLoader) + connection.adapters.register_dumper(dict, JsonDumper) + connection.adapters.register_dumper(list, JsonDumper) + await connection.set_autocommit(True) + self._connection = connection async def release(self) -> None: if self._connection is None: From e7358af76575df8bd72c5b12fc6d7de92f086750 Mon Sep 17 00:00:00 2001 From: ansipunk Date: Sun, 3 Mar 2024 18:13:37 +0500 Subject: [PATCH 15/15] S01E12 --- databases/backends/psycopg.py | 23 ++++++++++++++++++++--- requirements.txt | 3 +++ setup.py | 1 + 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/databases/backends/psycopg.py b/databases/backends/psycopg.py index eb3cec16..527b2600 100644 --- a/databases/backends/psycopg.py +++ b/databases/backends/psycopg.py @@ -1,6 +1,5 @@ import typing -import orjson import psycopg import psycopg.adapt import psycopg.types @@ -20,15 +19,33 @@ TransactionBackend, ) +try: + import orjson + + def load(data): + return orjson.loads(data) + + def dump(data): + return orjson.dumps(data) + +except ImportError: + import json + + def load(data): + return json.loads(data.decode("utf-8")) + + def dump(data): + return json.dumps(data).encode("utf-8") + class JsonLoader(psycopg.adapt.Loader): def load(self, data): - return orjson.loads(data) + return load(data) class JsonDumper(psycopg.adapt.Dumper): def dump(self, data): - return orjson.dumps(data) + return dump(data) class PsycopgBackend(DatabaseBackend): diff --git a/requirements.txt b/requirements.txt index 785c998b..450b1c63 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,8 @@ -e . +# Speedups +orjson==3.9.15 + # Async database drivers asyncmy==0.2.9 aiomysql==0.2.0 diff --git a/setup.py b/setup.py index 4ad4fc5b..33b6f137 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,7 @@ def get_packages(package): "psycopg3": ["psycopg", "psycopg-pool"], "sqlite": ["aiosqlite"], "aiosqlite": ["aiosqlite"], + "orjson": ["orjson"], }, classifiers=[ "Development Status :: 3 - Alpha",