diff --git a/docs/database.md b/docs/database.md index f8b2f7030..8e9679851 100644 --- a/docs/database.md +++ b/docs/database.md @@ -1,231 +1,218 @@ Starlette is not strictly tied to any particular database implementation. -You can use it with an asynchronous ORM, such as [GINO](https://python-gino.org/), -or use regular non-async endpoints, and integrate with [SQLAlchemy](https://www.sqlalchemy.org/). +You can use it with an asynchronous ORM, such as [GINO](https://python-gino.org/) or [SQLAlchemy](https://www.sqlalchemy.org/), or use regular non-async endpoints. -In this documentation we'll demonstrate how to integrate against [the `databases` package](https://github.com/encode/databases), -which provides SQLAlchemy core support against a range of different database drivers. +In this documentation we'll demonstrate how to integrate against [SQLAlchemy](https://www.sqlalchemy.org/). -Here's a complete example, that includes table definitions, configuring a `database.Database` -instance, and a couple of endpoints that interact with the database. +Here's a complete example, that includes table definitions, configuring a database connection, and a couple of endpoints that interact with the database. ```ini title=".env" -DATABASE_URL=sqlite:///test.db +DATABASE_URL=sqlite+aiosqlite:///test.db ``` ```python title="app.py" -import contextlib +from contextlib import asynccontextmanager +from typing import AsyncIterator + +from sqlalchemy import MetaData, select +from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column +from sqlalchemy.engine import make_url -import databases -import sqlalchemy from starlette.applications import Starlette from starlette.config import Config +from starlette.requests import Request from starlette.responses import JSONResponse from starlette.routing import Route # Configuration from environment variables or '.env' file. -config = Config('.env') -DATABASE_URL = config('DATABASE_URL') - +config = Config(".env") +DATABASE_URL = config( + "DATABASE_URL", cast=make_url, default="sqlite+aiosqlite:///test.db" +) # Database table definitions. -metadata = sqlalchemy.MetaData() - -notes = sqlalchemy.Table( - "notes", - metadata, - sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True), - sqlalchemy.Column("text", sqlalchemy.String), - sqlalchemy.Column("completed", sqlalchemy.Boolean), -) +metadata = MetaData() -database = databases.Database(DATABASE_URL) -@contextlib.asynccontextmanager -async def lifespan(app): - await database.connect() - yield - await database.disconnect() - -# Main application code. -async def list_notes(request): - query = notes.select() - results = await database.fetch_all(query) - content = [ - { - "text": result["text"], - "completed": result["completed"] - } - for result in results - ] - return JSONResponse(content) - -async def add_note(request): - data = await request.json() - query = notes.insert().values( - text=data["text"], - completed=data["completed"] - ) - await database.execute(query) - return JSONResponse({ - "text": data["text"], - "completed": data["completed"] - }) +class Base(AsyncAttrs, DeclarativeBase): + pass -routes = [ - Route("/notes", endpoint=list_notes, methods=["GET"]), - Route("/notes", endpoint=add_note, methods=["POST"]), -] -app = Starlette( - routes=routes, - lifespan=lifespan, -) -``` +class Note(Base): + __tablename__ = "notes" -Finally, you will need to create the database tables. It is recommended to use -Alembic, which we briefly go over in [Migrations](#migrations) + id: Mapped[int] = mapped_column(primary_key=True) + text: Mapped[str] + completed: Mapped[bool] -## Queries -Queries may be made with as [SQLAlchemy Core queries][sqlalchemy-core]. +engine = create_async_engine(DATABASE_URL, echo=True) +async_session = async_sessionmaker(engine, expire_on_commit=False) -The following methods are supported: -* `rows = await database.fetch_all(query)` -* `row = await database.fetch_one(query)` -* `async for row in database.iterate(query)` -* `await database.execute(query)` -* `await database.execute_many(query)` +# Main application code +@asynccontextmanager +async def lifespan(app: Starlette) -> AsyncIterator[None]: + yield + await engine.dispose() -## Transactions -Database transactions are available either as a decorator, as a -context manager, or as a low-level API. +async def list_notes(request: Request): + async with async_session() as session: + query = await session.execute(select(Note)) + results = query.scalars().all() -Using a decorator on an endpoint: + return JSONResponse( + [{"text": result.text, "completed": result.completed} for result in results] + ) -```python -@database.transaction() -async def populate_note(request): - # This database insert occurs within a transaction. - # It will be rolled back by the `RuntimeError`. - query = notes.insert().values(text="you won't see me", completed=True) - await database.execute(query) - raise RuntimeError() -``` -Using a context manager: +async def add_note(request: Request): + data = await request.json() + new_note = Note(text=data["text"], completed=data["completed"]) -```python -async def populate_note(request): - async with database.transaction(): - # This database insert occurs within a transaction. - # It will be rolled back by the `RuntimeError`. - query = notes.insert().values(text="you won't see me", completed=True) - await request.database.execute(query) - raise RuntimeError() -``` + async with async_session() as session: + async with session.begin(): + session.add(new_note) -Using the low-level API: + return JSONResponse({"text": new_note.text, "completed": new_note.completed}) + + +routes = [ + Route("/notes", endpoint=list_notes, methods=["GET"]), + Route("/notes", endpoint=add_note, methods=["POST"]), +] + +app = Starlette(routes=routes, lifespan=lifespan) -```python -async def populate_note(request): - transaction = await database.transaction() - try: - # This database insert occurs within a transaction. - # It will be rolled back by the `RuntimeError`. - query = notes.insert().values(text="you won't see me", completed=True) - await database.execute(query) - raise RuntimeError() - except: - await transaction.rollback() - raise - else: - await transaction.commit() ``` -## Test isolation +Finally, you will need to create the database tables. It is recommended to use +Alembic, which we briefly go over in [Migrations](#migrations) + +## Testing There are a few things that we want to ensure when running tests against a service that uses a database. Our requirements should be: -* Use a separate database for testing. -* Create a new test database every time we run the tests. -* Ensure that the database state is isolated between each test case. +- Use a separate database for testing. +- Create a new test database every time we run the tests. +- Ensure that the database state is isolated between each test case. + +Install dependencies for testing: + +```sh +$ pip install aiosqlite greenlet httpx pytest pytest_asyncio sqlalchemy_utils +``` + +!!! note + + `pytest-asyncio` requires setting the option `asyncio_default_fixture_loop_scope` but does not provide a default. To suppress this deprecation warning add the following to the project config: + + ``` + # pyproject.toml + [tool.pytest.ini_options] + asyncio_default_fixture_loop_scope = "function" + ``` Here's how we need to structure our application and tests in order to meet those requirements: -```python +```diff title="app.py" from starlette.applications import Starlette from starlette.config import Config -import databases config = Config(".env") -TESTING = config('TESTING', cast=bool, default=False) -DATABASE_URL = config('DATABASE_URL', cast=databases.DatabaseURL) -TEST_DATABASE_URL = DATABASE_URL.replace(database='test_' + DATABASE_URL.database) ++ TESTING = config("TESTING", cast=bool, default=False) +DATABASE_URL = config( + "DATABASE_URL", cast=make_url, default="sqlite+aiosqlite:///test.db" +) + ++ if TESTING: ++ DATABASE_URL = DATABASE_URL.set(database="test_" + DATABASE_URL.database) -# Use 'force_rollback' during testing, to ensure we do not persist database changes -# between each test case. -if TESTING: - database = databases.Database(TEST_DATABASE_URL, force_rollback=True) -else: - database = databases.Database(DATABASE_URL) +engine = create_async_engine(DATABASE_URL, echo=True) +async_session = async_sessionmaker(engine, expire_on_commit=False) ``` We still need to set `TESTING` during a test run, and setup the test database. -Assuming we're using `py.test`, here's how our `conftest.py` might look: +Assuming we're using `pytest`, here's how our `conftest.py` might look: -```python +```python title="conftest.py" import pytest +import pytest_asyncio + from starlette.config import environ from starlette.testclient import TestClient -from sqlalchemy import create_engine + from sqlalchemy_utils import database_exists, create_database, drop_database +from sqlalchemy.util import greenlet_spawn +from sqlalchemy.ext.asyncio import create_async_engine # This sets `os.environ`, but provides some additional protection. # If we placed it below the application import, it would raise an error # informing us that 'TESTING' had already been read from the environment. -environ['TESTING'] = 'True' +environ["TESTING"] = "True" +from app import Base, DATABASE_URL, app + -import app +@pytest_asyncio.fixture(scope="function", autouse=True) +async def create_test_database(): + """ + Create a clean database on every test case. + For safety, we should abort if a database already exists. + + We use the `sqlalchemy_utils` package here for a few helpers in consistently + creating and dropping the database. + """ + assert not database_exists( + DATABASE_URL + ), "Test database already exists. Aborting tests." + await greenlet_spawn(create_database, DATABASE_URL) # Create the test database. -@pytest.fixture(scope="session", autouse=True) -def create_test_database(): - """ - Create a clean database on every test case. - For safety, we should abort if a database already exists. + engine = create_async_engine(DATABASE_URL, echo=True) + async with engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) # Create the tables. - We use the `sqlalchemy_utils` package here for a few helpers in consistently - creating and dropping the database. - """ - url = str(app.TEST_DATABASE_URL) - engine = create_engine(url) - assert not database_exists(url), 'Test database already exists. Aborting tests.' - create_database(url) # Create the test database. - metadata.create_all(engine) # Create the tables. - yield # Run the tests. - drop_database(url) # Drop the test database. + yield # Run the tests. + drop_database(DATABASE_URL) # Drop the test database. @pytest.fixture() def client(): - """ - When using the 'client' fixture in test cases, we'll get full database - rollbacks between test cases: - - def test_homepage(client): - url = app.url_path_for('homepage') - response = client.get(url) - assert response.status_code == 200 - """ with TestClient(app) as client: yield client + +``` + +When using the 'client' fixture in test cases, we'll get full database rollbacks between test cases: + +```python title="test_notes.py" +from app import app + + +def test_list_notes(client): + url = app.url_path_for("list_notes") + response = client.get(url) + assert response.status_code == 200 + assert response.json() == [] + + +def test_add_note(client): + url = app.url_path_for("list_notes") + response = client.post(url, json={"text": "Test note", "completed": False}) + assert response.status_code == 200 + assert response.json() == {"text": "Test note", "completed": False} + response = client.get(url) + assert response.status_code == 200 + assert response.json() == [ + {"text": "Test note", "completed": False} + ], "Note not found in the list" + ``` ## Migrations @@ -236,7 +223,7 @@ incremental changes to the database. For this we'd strongly recommend ```shell $ pip install alembic -$ alembic init migrations +$ alembic init -t async migrations ``` Now, you'll want to set things up so that Alembic references the configured @@ -248,7 +235,7 @@ In `alembic.ini` remove the following line: sqlalchemy.url = driver://user:pass@localhost/dbname ``` -In `migrations/env.py`, you need to set the ``'sqlalchemy.url'`` configuration key, +In `migrations/env.py`, you need to set the `'sqlalchemy.url'` configuration key, and the `target_metadata` variable. You'll want something like this: ```python @@ -256,9 +243,9 @@ and the `target_metadata` variable. You'll want something like this: config = context.config # Configure Alembic to use our DATABASE_URL and our table definitions... -import app -config.set_main_option('sqlalchemy.url', str(app.DATABASE_URL)) -target_metadata = app.metadata +from app import DATABASE_URL, metadata +config.set_main_option('sqlalchemy.url', str(DATABASE_URL)) +target_metadata = metadata ... ``` @@ -276,9 +263,9 @@ And populate the new file (within `migrations/versions`) with the necessary dire def upgrade(): op.create_table( 'notes', - sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True), - sqlalchemy.Column("text", sqlalchemy.String), - sqlalchemy.Column("completed", sqlalchemy.Boolean), + sa.Column("id", sa.Integer, primary_key=True), + sa.Column("text", sa.String), + sa.Column("completed", sa.Boolean), ) def downgrade(): @@ -298,25 +285,64 @@ every time it creates the test database. This will help catch any issues in your migration scripts, and will help ensure that the tests are running against a database that's in a consistent state with your live database. -We can adjust the `create_test_database` fixture slightly: +Adjust `migrations/env.py`: ```python -from alembic import command -from alembic.config import Config -import app +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + + connectable = config.attributes.get("connection", None) + + if connectable is None: + asyncio.run(run_async_migrations()) + else: + do_run_migrations(connectable) +``` + +See [Programmatic API use (connection sharing) With Asyncio](https://alembic.sqlalchemy.org/en/latest/cookbook.html#programmatic-api-use-connection-sharing-with-asyncio) from the Alembic docs for more details. + +We can adjust the `create_test_database` fixture slightly: + +```diff ++ from alembic import command ++ from alembic.config import Config +- from app import Base, DATABASE_URL, app ++ from app import DATABASE_URL, app ... -@pytest.fixture(scope="session", autouse=True) -def create_test_database(): - url = str(app.DATABASE_URL) - engine = create_engine(url) - assert not database_exists(url), 'Test database already exists. Aborting tests.' - create_database(url) # Create the test database. - config = Config("alembic.ini") # Run the migrations. - command.upgrade(config, "head") - yield # Run the tests. - drop_database(url) # Drop the test database. ++ def run_upgrade(connection, cfg): ++ cfg.attributes["connection"] = connection ++ command.upgrade(cfg, "head") + + ++ async def run_async_upgrade(): ++ async_engine = create_async_engine(DATABASE_URL, echo=True) ++ async with async_engine.begin() as conn: ++ await conn.run_sync(run_upgrade, Config("alembic.ini")) + +@pytest_asyncio.fixture(scope="function", autouse=True) +async def create_test_database(): + """ + Create a clean database on every test case. + For safety, we should abort if a database already exists. + + We use the `sqlalchemy_utils` package here for a few helpers in consistently + creating and dropping the database. + """ + assert not database_exists( + DATABASE_URL + ), "Test database already exists. Aborting tests." + + await greenlet_spawn(create_database, DATABASE_URL) # Create the test database. + +- engine = create_async_engine(DATABASE_URL, echo=True) +- async with engine.begin() as conn: +- await conn.run_sync(Base.metadata.create_all) # Create the tables. ++ await run_async_upgrade() + + yield # Run the tests. + drop_database(DATABASE_URL) # Drop the test database. ``` [sqlalchemy-core]: https://docs.sqlalchemy.org/en/latest/core/