Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
06639d4
feat: migrate SQLAlchemy from 1.4 to 2.0 with ORM style
tito Sep 18, 2025
45d1608
test: update test suite for SQLAlchemy 2.0 migration
tito Sep 18, 2025
d21b65e
fix: Complete SQLAlchemy 2.0 migration - add session parameters to al…
tito Sep 18, 2025
9b90aaa
fix: Move timezone import to top-level to fix ruff PLC0415 error
tito Sep 18, 2025
1520f88
fix: Add missing session parameter to test functions
tito Sep 18, 2025
7f178b5
fix: Complete SQLAlchemy 2.0 migration - fix session parameter passing
tito Sep 22, 2025
24980de
fix: Continue SQLAlchemy 2.0 migration - fix test files and cleanup m…
tito Sep 23, 2025
224e402
fix: Complete SQLAlchemy 2.0 migration for test_room_ics.py
tito Sep 23, 2025
4f70a7f
fix: Complete major SQLAlchemy 2.0 test migration
tito Sep 23, 2025
fb5bb39
fix: resolve event loop isolation issues in test suite
tito Sep 23, 2025
04a9c2f
fix: resolve remaining 8 test failures after SQLAlchemy 2.0 migration
tito Sep 23, 2025
5e036d1
refactor: remove excessive comments from test code
tito Sep 23, 2025
606c5f5
refactor: use 'import sqlalchemy as sa' pattern in db/base.py
tito Sep 23, 2025
60cc2b1
Merge remote-tracking branch 'origin/main' into mathieu/sqlalchemy-2-…
tito Sep 23, 2025
617a1c8
refactor: improve session management across worker tasks and pipelines
tito Sep 23, 2025
8ad1270
feat: add @with_session decorator for worker task session management
tito Sep 23, 2025
27b3b9c
test: update test fixtures to use @with_session decorator
tito Sep 23, 2025
1c9e8b9
test: rename db_db_session to db_session across test files
tito Sep 23, 2025
a883df0
test: update test fixtures to use @with_session decorator
tito Sep 23, 2025
e0c71c5
refactor: migrate to SQLAlchemy 2.0 ORM-style patterns
tito Sep 23, 2025
0b2152e
fix: remove duplicated methods
tito Sep 23, 2025
b217c7b
refactor: use @with_session decorator in file pipeline tasks
tito Sep 23, 2025
f51dae8
refactor: create @with_session_and_transcript decorator to simplify p…
tito Sep 23, 2025
a07c621
refactor: add session parameter to ICSSyncService.sync_room_calendar
tito Sep 23, 2025
ad2accb
refactor: remove unnecessary get_session_factory usage
tito Sep 24, 2025
df90936
fix: add missing db_session parameter to transcript audio endpoints
tito Sep 24, 2025
2aa99fe
fix: add missing db_session parameters across codebase
tito Sep 24, 2025
27f19ec
fix: improve session management and testing infrastructure
tito Sep 24, 2025
b7f8e8e
fix: add missing session parameters to controller method calls
tito Sep 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions server/asyncio_loop_analysis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# AsyncIO Event Loop Analysis for test_attendee_parsing_bug.py

## Problem Summary
The test passes but encounters an error during teardown where asyncpg tries to use a different/closed event loop, resulting in:
- `RuntimeError: Task got Future attached to a different loop`
- `RuntimeError: Event loop is closed`

## Root Cause Analysis

### 1. Multiple Event Loop Creation Points

The test environment creates event loops at different scopes:

1. **Session-scoped loop** (conftest.py:27-34):
- Created once per test session
- Used by session-scoped fixtures
- Closed after all tests complete

2. **Function-scoped loop** (pytest-asyncio default):
- Created for each async test function
- This is the loop that runs the actual test
- Closed immediately after test completes

3. **AsyncPG internal loop**:
- AsyncPG connections store a reference to the loop they were created with
- Used for connection lifecycle management

### 2. Event Loop Lifecycle Mismatch

The issue occurs because:

1. **Session fixture creates database connection** on session-scoped loop
2. **Test runs** on function-scoped loop (different from session loop)
3. **During teardown**, the session fixture tries to rollback/close using the original session loop
4. **AsyncPG connection** still references the function-scoped loop which is now closed
5. **Conflict**: SQLAlchemy tries to use session loop, but asyncpg Future is attached to the closed function loop

### 3. Configuration Issues

Current pytest configuration:
- `asyncio_mode = "auto"` in pyproject.toml
- `asyncio_default_fixture_loop_scope=session` (shown in test output)
- `asyncio_default_test_loop_scope=function` (shown in test output)

This mismatch between fixture loop scope (session) and test loop scope (function) causes the problem.

## Solutions

### Option 1: Align Loop Scopes (Recommended)
Change pytest-asyncio configuration to use consistent loop scopes:

```python
# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function" # Change from session to function
```

### Option 2: Use Function-Scoped Database Fixture
Change the `session` fixture scope from session to function:

```python
@pytest_asyncio.fixture # Remove scope="session"
async def session(setup_database):
# ... existing code ...
```

### Option 3: Explicit Loop Management
Ensure all async operations use the same loop:

```python
@pytest_asyncio.fixture
async def session(setup_database, event_loop):
# Force using the current event loop
engine = create_async_engine(
settings.DATABASE_URL,
echo=False,
poolclass=NullPool,
connect_args={"loop": event_loop} # Pass explicit loop
)
# ... rest of fixture ...
```

### Option 4: Upgrade pytest-asyncio
The current version (1.1.0) has known issues with loop management. Consider upgrading to the latest version which has better loop scope handling.

## Immediate Workaround

For the test to run cleanly without the teardown error, you can:

1. Add explicit cleanup in the test:
```python
@pytest.mark.asyncio
async def test_attendee_parsing_bug(session):
# ... existing test code ...

# Explicit cleanup before fixture teardown
await session.commit() # or await session.close()
```

2. Or suppress the teardown error (not recommended for production):
```python
@pytest.fixture
async def session(setup_database):
# ... existing setup ...
try:
yield session
await session.rollback()
except RuntimeError as e:
if "Event loop is closed" not in str(e):
raise
finally:
await session.close()
```

## Recommendation

The cleanest solution is to align the loop scopes by setting both fixture and test loop scopes to "function" scope. This ensures each test gets its own clean event loop and avoids cross-contamination between tests.
2 changes: 1 addition & 1 deletion server/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from alembic import context
from sqlalchemy import engine_from_config, pool

from reflector.db import metadata
from reflector.db.base import metadata
from reflector.settings import settings

# this is the Alembic Config object, which provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def upgrade() -> None:
transcript = table("transcript", column("id", sa.String), column("topics", sa.JSON))

# Select all rows from the transcript table
results = bind.execute(select([transcript.c.id, transcript.c.topics]))
results = bind.execute(select(transcript.c.id, transcript.c.topics))

for row in results:
transcript_id = row["id"]
Expand Down Expand Up @@ -58,7 +58,7 @@ def downgrade() -> None:
transcript = table("transcript", column("id", sa.String), column("topics", sa.JSON))

# Select all rows from the transcript table
results = bind.execute(select([transcript.c.id, transcript.c.topics]))
results = bind.execute(select(transcript.c.id, transcript.c.topics))

for row in results:
transcript_id = row["id"]
Expand Down
4 changes: 1 addition & 3 deletions server/migrations/versions/4814901632bc_fix_duration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ def upgrade() -> None:

# select only the one with duration = 0
results = bind.execute(
select([transcript.c.id, transcript.c.duration]).where(
transcript.c.duration == 0
)
select(transcript.c.id, transcript.c.duration).where(transcript.c.duration == 0)
)

data_dir = Path(settings.DATA_DIR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def upgrade() -> None:
transcript = table("transcript", column("id", sa.String), column("topics", sa.JSON))

# Select all rows from the transcript table
results = bind.execute(select([transcript.c.id, transcript.c.topics]))
results = bind.execute(select(transcript.c.id, transcript.c.topics))

for row in results:
transcript_id = row["id"]
Expand Down Expand Up @@ -58,7 +58,7 @@ def downgrade() -> None:
transcript = table("transcript", column("id", sa.String), column("topics", sa.JSON))

# Select all rows from the transcript table
results = bind.execute(select([transcript.c.id, transcript.c.topics]))
results = bind.execute(select(transcript.c.id, transcript.c.topics))

for row in results:
transcript_id = row["id"]
Expand Down
10 changes: 7 additions & 3 deletions server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ dependencies = [
"sentry-sdk[fastapi]>=1.29.2",
"httpx>=0.24.1",
"fastapi-pagination>=0.12.6",
"databases[aiosqlite, asyncpg]>=0.7.0",
"sqlalchemy<1.5",
"sqlalchemy>=2.0.0",
"asyncpg>=0.29.0",
"alembic>=1.11.3",
"nltk>=3.8.1",
"prometheus-fastapi-instrumentator>=6.1.0",
Expand All @@ -46,6 +46,7 @@ dev = [
"black>=24.1.1",
"stamina>=23.1.0",
"pyinstrument>=4.6.1",
"pytest-async-sqlalchemy>=0.2.0",
]
tests = [
"pytest-cov>=4.1.0",
Expand Down Expand Up @@ -111,12 +112,15 @@ source = ["reflector"]

[tool.pytest_env]
ENVIRONMENT = "pytest"
DATABASE_URL = "postgresql://test_user:test_password@localhost:15432/reflector_test"
DATABASE_URL = "postgresql+asyncpg://test_user:test_password@localhost:15432/reflector_test"

[tool.pytest.ini_options]
addopts = "-ra -q --disable-pytest-warnings --cov --cov-report html -v"
testpaths = ["tests"]
asyncio_mode = "auto"
asyncio_debug = true
asyncio_default_fixture_loop_scope = "session"
asyncio_default_test_loop_scope = "session"
markers = [
"model_api: tests for the unified model-serving HTTP API (backend- and hardware-agnostic)",
]
Expand Down
13 changes: 3 additions & 10 deletions server/reflector/asynctask.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
import asyncio
import functools

from reflector.db import get_database


def asynctask(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
async def run_with_db():
database = get_database()
await database.connect()
try:
return await f(*args, **kwargs)
finally:
await database.disconnect()
async def run_async():
return await f(*args, **kwargs)

coro = run_with_db()
coro = run_async()
try:
loop = asyncio.get_running_loop()
except RuntimeError:
Expand Down
71 changes: 46 additions & 25 deletions server/reflector/db/__init__.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,69 @@
import contextvars
from typing import Optional
from typing import AsyncGenerator

import databases
import sqlalchemy
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)

from reflector.db.base import Base as Base
from reflector.db.base import metadata as metadata
from reflector.events import subscribers_shutdown, subscribers_startup
from reflector.settings import settings

metadata = sqlalchemy.MetaData()
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None


def get_engine() -> AsyncEngine:
global _engine
if _engine is None:
_engine = create_async_engine(
settings.DATABASE_URL,
echo=False,
pool_pre_ping=True,
)
return _engine

_database_context: contextvars.ContextVar[Optional[databases.Database]] = (
contextvars.ContextVar("database", default=None)
)

def get_session_factory() -> async_sessionmaker[AsyncSession]:
global _session_factory
if _session_factory is None:
_session_factory = async_sessionmaker(
get_engine(),
class_=AsyncSession,
expire_on_commit=False,
)
return _session_factory

def get_database() -> databases.Database:
"""Get database instance for current asyncio context"""
db = _database_context.get()
if db is None:
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
return db

async def _get_session() -> AsyncGenerator[AsyncSession, None]:
# necessary implementation to ease mocking on pytest
async with get_session_factory()() as session:
yield session


async def get_session() -> AsyncGenerator[AsyncSession, None]:
async for session in _get_session():
yield session


# import models
import reflector.db.calendar_events # noqa
import reflector.db.meetings # noqa
import reflector.db.recordings # noqa
import reflector.db.rooms # noqa
import reflector.db.transcripts # noqa

kwargs = {}
if "postgres" not in settings.DATABASE_URL:
raise Exception("Only postgres database is supported in reflector")
engine = sqlalchemy.create_engine(settings.DATABASE_URL, **kwargs)


@subscribers_startup.append
async def database_connect(_):
database = get_database()
await database.connect()
get_engine()


@subscribers_shutdown.append
async def database_disconnect(_):
database = get_database()
await database.disconnect()
global _engine
if _engine:
await _engine.dispose()
_engine = None
Loading
Loading