Commit ec10361

remove SQLAlchemy ORM (#380)

* refactor: remove SQLAlchemy ORM for direct asyncpg driver
  - Switch from SQLAlchemy to asyncpg for raw PostgreSQL access
  - Add connection pooling with min=10, max=POOL_SIZE
  - Set 300s max idle timeout for connections
  - Implement proper pool lifecycle via FastAPI lifespan

* refactor: migrate from SQLAlchemy to asyncpg for database operations
  - Replace SQLAlchemy ORM with direct asyncpg queries for better performance
  - Add DeviceRequest Pydantic model for request validation
  - Implement proper error handling for database operations
  - Add FastAPI lifespan context manager

  Testing:
  - Validated all PostgreSQL operations with a neon.tech database
  - Confirmed working: inserts, error handling, metrics collection

  TODO:
  - Need to test memcached operations

* refactor: replace SQLAlchemy models with raw SQL schema
  - Remove models.py, as the SQLAlchemy ORM is no longer used
  - Add schema.sql for direct PostgreSQL table definitions

* remove SQLAlchemy dependencies

* Add async set-operation support via aiomcache
  - Updated requirements.txt
1 parent 2a81423 commit ec10361

File tree (5 files changed, +147 −85):

- lessons/231/python-app/db.py
- lessons/231/python-app/main.py
- lessons/231/python-app/models.py (deleted)
- lessons/231/python-app/requirements.txt
- lessons/231/python-app/schema.sql (added)

lessons/231/python-app/db.py (+71 −15)

```diff
@@ -1,27 +1,83 @@
+import logging
 import os
-from typing import Annotated
+from contextlib import asynccontextmanager
+from typing import Annotated, AsyncGenerator
+
+import asyncpg
+from fastapi import Depends, FastAPI
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
 
-from fastapi import Depends
-from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
 
 POSTGRES_URI = os.environ["POSTGRES_URI"]
 POSTGRES_POOL_SIZE = int(os.environ["POSTGRES_POOL_SIZE"])
 
 
-engine = create_async_engine(
-    POSTGRES_URI,
-    echo=False,
-    pool_pre_ping=False,
-    pool_size=POSTGRES_POOL_SIZE,
-    max_overflow=0,
-)
+class Database:
+    def __init__(self):
+        self.pool = None
+
+    async def create_pool(self):
+        """Create connection pool if it doesn't exist"""
+        if self.pool is None:
+            try:
+                self.pool = await asyncpg.create_pool(
+                    POSTGRES_URI,
+                    min_size=10,
+                    max_size=POSTGRES_POOL_SIZE,
+                    max_inactive_connection_lifetime=300,
+                )
+                logger.info(f"Database pool created: {self.pool}")
+            except asyncpg.exceptions.PostgresError as e:
+                logging.error(f"Error creating PostgreSQL connection pool: {e}")
+                raise ValueError("Failed to create PostgreSQL connection pool")
+            except Exception as e:
+                logging.error(f"Unexpected error while creating connection pool: {e}")
+                raise
+
+    @asynccontextmanager
+    async def get_connection(self) -> AsyncGenerator[asyncpg.Connection, None]:
+        """Get database connection from pool"""
+        if not self.pool:
+            await self.create_pool()
+        async with self.pool.acquire() as connection:
+            logger.info("Connection acquired from pool")
+            yield connection
+            logger.info("Connection released back to pool")
+
+    async def close(self):
+        """Close the pool when shutting down"""
+        if self.pool:
+            await self.pool.close()
+            logger.info("Database pool closed")
+            self.pool = None
+
+
+db = Database()
+
 
-async_session = async_sessionmaker(engine, expire_on_commit=False)
+async def get_db() -> AsyncGenerator[asyncpg.Connection, None]:
+    async with db.get_connection() as conn:
+        yield conn
 
 
-async def get_postgres_session():
-    async with async_session() as session:
-        yield session
+PostgresDep = Annotated[asyncpg.Connection, Depends(get_db)]
 
 
-PostgresDep = Annotated[AsyncSession, Depends(get_postgres_session)]
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    """Lifespan context manager for database connection"""
+    print(" Starting up database connection...")
+    try:
+        await db.create_pool()
+        logger.info(" Database pool created successfully")
+        yield
+    except Exception as e:
+        logger.info(f"Failed to create database pool: {e}")
+        raise
+    finally:
+        # Shutdown: close all connections
+        logger.info(" Shutting down database connection...")
+        await db.close()
+        logger.info(" Database connections closed")
```

lessons/231/python-app/main.py (+60 −36)

```diff
@@ -4,20 +4,22 @@
 import time
 import uuid
 
-from fastapi import FastAPI
+import aiomcache
+import orjson
+from asyncpg import PostgresError
+from fastapi import FastAPI, HTTPException
 from fastapi.responses import ORJSONResponse, PlainTextResponse
 from prometheus_client import make_asgi_app
+from pydantic import BaseModel
 from pymemcache.client.base import Client
-from sqlalchemy import insert
-from asyncer import asyncify
-from db import PostgresDep
+
+from db import PostgresDep, lifespan
 from metrics import H
-from models import Device
 
-app = FastAPI()
+app = FastAPI(lifespan=lifespan)
 
 MEMCACHED_HOST = os.environ["MEMCACHED_HOST"]
-cache_client = Client(MEMCACHED_HOST)
+cache_client = aiomcache.Client(MEMCACHED_HOST)
 
 metrics_app = make_asgi_app()
 app.mount("/metrics", metrics_app)
@@ -75,34 +77,56 @@ def get_devices():
     return devices
 
 
+class DeviceRequest(BaseModel):
+    mac: str
+    firmware: str
+
+
 @app.post("/api/devices", status_code=201, response_class=ORJSONResponse)
-async def create_device(device: Device, session: PostgresDep) -> Device:
-    # To match Go implementation instead of using SQLAlchemy factory.
-    now = datetime.datetime.now(datetime.timezone.utc)
-    device_uuid = uuid.uuid4()
-
-    stmt = (
-        insert(Device)
-        .values(
-            uuid=device_uuid,
-            mac=device.mac,
-            firmware=device.firmware,
-            created_at=now,
-            updated_at=now,
+async def create_device(device: DeviceRequest, conn: PostgresDep):
+    try:
+        now = datetime.datetime.now(datetime.timezone.utc)
+        device_uuid = uuid.uuid4()
+
+        insert_query = """
+        INSERT INTO python_device (uuid, mac, firmware, created_at, updated_at)
+        VALUES ($1, $2, $3, $4, $5)
+        RETURNING id, uuid, mac, firmware, created_at, updated_at;
+        """
+
+        start_time = time.perf_counter()
+
+        row = await conn.fetchrow(
+            insert_query, device_uuid, device.mac, device.firmware, now, now
+        )
+
+        H.labels(op="insert", db="postgres").observe(time.perf_counter() - start_time)
+
+        device_dict = dict(row)
+
+        # Measure cache operation
+        start_time = time.perf_counter()
+
+        await cache_client.set(
+            device_uuid.bytes,
+            orjson.dumps(device_dict),
+            exptime=20,
+        )
+
+        H.labels(op="set", db="memcache").observe(time.perf_counter() - start_time)
+
+        return device_dict
+
+    except PostgresError as e:
+        raise HTTPException(
+            status_code=500, detail="Database error occurred while creating device"
+        )
+    except aiomcache.exceptions.ClientException as e:
+        raise HTTPException(
+            status_code=500,
+            detail="Memcached Database error occurred while creating device",
+        )
+    except Exception as e:
+        raise HTTPException(
+            status_code=500, detail="An unexpected error occurred while creating device"
         )
-        .returning(Device)
-    )
-
-    # Measure the same insert operation as in Go
-    start_time = time.perf_counter()
-    device_result = await session.execute(stmt)
-    device_dict = device_result.mappings().one()
-    await session.commit()
-    H.labels(op="insert", db="postgres").observe(time.perf_counter() - start_time)
-
-    # Measure the same set operation as in Go
-    start_time = time.perf_counter()
-    cache_client.set(str(device_uuid), dict(device_dict), expire=20)
-    H.labels(op="set", db="memcache").observe(time.perf_counter() - start_time)
-
-    return device_dict
```
lessons/231/python-app/models.py (−31)

This file was deleted.

lessons/231/python-app/requirements.txt (+2 −3)

```diff
@@ -1,3 +1,4 @@
+aiomcache==0.8.2
 annotated-types==0.7.0
 anyio==4.7.0
 asyncpg==0.30.0
@@ -7,7 +8,6 @@ dnspython==2.7.0
 email_validator==2.2.0
 fastapi==0.115.6
 fastapi-cli==0.0.6
-greenlet==3.1.1
 h11==0.14.0
 httpcore==1.0.7
 httptools==0.6.4
@@ -34,12 +34,11 @@ rich==13.9.4
 rich-toolkit==0.12.0
 shellingham==1.5.4
 sniffio==1.3.1
-SQLAlchemy==2.0.36
 starlette==0.41.3
 typer==0.15.1
 typing_extensions==4.12.2
 uvicorn==0.32.1
 uvloop==0.21.0
 watchfiles==1.0.0
 websockets==14.1
-asyncer==0.0.8
+asyncer==0.0.8
```

lessons/231/python-app/schema.sql (+14)

```diff
@@ -0,0 +1,14 @@
+CREATE TABLE IF NOT EXISTS python_device (
+    id SERIAL PRIMARY KEY,
+    uuid UUID DEFAULT NULL,
+    mac VARCHAR(255) DEFAULT NULL,
+    firmware VARCHAR(255) DEFAULT NULL,
+    created_at TIMESTAMP
+    WITH
+        TIME ZONE DEFAULT CURRENT_TIMESTAMP,
+    updated_at TIMESTAMP
+    WITH
+        TIME ZONE DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE INDEX IF NOT EXISTS idx_device_uuid ON python_device (uuid);
```
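
Since the ORM no longer creates tables, schema.sql has to be applied separately. A minimal sketch of doing that with asyncpg (not part of the commit; assumes POSTGRES_URI is set and schema.sql is in the working directory):

```python
# Sketch, not part of the commit: apply schema.sql once before starting the app.
# Assumes POSTGRES_URI is set and schema.sql is in the current working directory.
import asyncio
import os
import pathlib

import asyncpg


async def apply_schema() -> None:
    sql = pathlib.Path("schema.sql").read_text()
    conn = await asyncpg.connect(os.environ["POSTGRES_URI"])
    try:
        # Both statements use IF NOT EXISTS, so re-running is harmless.
        await conn.execute(sql)
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(apply_schema())
```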
