Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema: multi-tenancy #1140

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion datajunction-query/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ def run_migrations_online():
and associate a connection with the context.

"""
connectable = create_engine(settings.index)
connectable = create_engine(
settings.index,
connect_args={"options": f"-csearch_path={settings.customSchema}"},
)

with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
Expand Down
2 changes: 2 additions & 0 deletions datajunction-query/djqs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class Settings(BaseSettings): # pylint: disable=too-few-public-methods
# SQLAlchemy URI for the metadata database.
index: str = "sqlite:///djqs.db?check_same_thread=False"

customSchema: str = "public"

# The default engine to use for reflection
default_reflection_engine: str = "default"

Expand Down
12 changes: 10 additions & 2 deletions datajunction-query/djqs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
import logging
import os
from functools import lru_cache
from typing import Iterator
from typing import Iterator, Optional

from dotenv import load_dotenv
from pydantic.datetime_parse import parse_datetime
from rich.logging import RichHandler
from sqlalchemy.engine import Engine
from sqlmodel import Session, create_engine
from starlette.requests import Request

from djqs.config import Settings

Expand Down Expand Up @@ -56,11 +57,18 @@ def get_metadata_engine() -> Engine:
return engine


def get_session() -> Iterator[Session]:
def get_session(request: Request = None) -> Iterator[Session]:
"""
Per-request session.
"""
schema = request.headers.get("tenant")
engine = get_metadata_engine()
settings = get_settings()

if schema:
engine = engine.execution_options(schema_translate_map={None: schema})

settings.customSchema = request.headers.get("new_tenant")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sadath-12 can you help me understand why you need to modify the settings instance directly like this? Also, since you're pulling schema from the "tenant" header, you don't need this settings property at all anymore, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @samredai , this is to tell dj to switch to the schema when we hit register schema endpoint (created internally) so dj can run migrations on it on runtime or else it will have to be restarted

Copy link
Contributor

@samredai samredai Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sadath-12 I understand the intent here now. I don't think modifying the application settings is the right approach though. Instead we should parameterize the alembic migration scripts (I think I recall @shangyian proposing this). Then in your custom endpoint logic, you can programmatically set that schema parameter using the context.


with Session(engine, autoflush=False) as session: # pragma: no cover
yield session
Expand Down
7 changes: 6 additions & 1 deletion datajunction-server/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ def run_migrations_online():
and associate a connection with the context.

"""
connectable = create_engine(settings.index)
settings = get_settings()

connectable = create_engine(
settings.index,
connect_args={"options": f"-csearch_path={settings.customSchema}"},
)

with connectable.connect() as connection:
context.configure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import timedelta
from http import HTTPStatus

from fastapi import APIRouter, Depends, Form
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse, Response
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import select
Expand All @@ -25,14 +25,19 @@

@router.post("/basic/user/")
async def create_a_user(
email: str = Form(),
username: str = Form(),
password: str = Form(),
request: Request,
session: AsyncSession = Depends(get_session),
) -> JSONResponse:
"""
Create a new user
"""
body = await request.body()
if not body:
return JSONResponse(content={"error": "Request body is empty"}, status_code=400)
data = await request.json()
username = data.get("username")
email = data.get("email")
password = data.get("password")
user_result = await session.execute(select(User).where(User.username == username))
if user_result.scalar_one_or_none():
raise DJException(
Expand Down
2 changes: 2 additions & 0 deletions datajunction-server/datajunction_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class Settings(
# SQLAlchemy URI for the metadata database.
index: str = "postgresql+psycopg://dj:dj@postgres_metadata:5432/dj"

customSchema: str = "public"

# Directory where the repository lives. This should have 2 subdirectories, "nodes" and
# "databases".
repository: Path = Path(".")
Expand Down
21 changes: 16 additions & 5 deletions datajunction-server/datajunction_server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dotenv import load_dotenv
from fastapi import Depends
from rich.logging import RichHandler
from sqlalchemy import AsyncAdaptedQueuePool
from sqlalchemy import AsyncAdaptedQueuePool, text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import (
AsyncEngine,
Expand Down Expand Up @@ -70,6 +70,7 @@ def __init__(self):
self.engine: AsyncEngine | None = None
self.session_maker = None
self.session = None
self.schema = None

def init_db(self):
"""
Expand All @@ -94,6 +95,11 @@ def init_db(self):
},
)

if self.schema:
self.engine = self.engine.execution_options(
schema_translate_map={None: self.schema}
)

async_session_factory = async_sessionmaker(
bind=self.engine,
autocommit=False,
Expand All @@ -115,17 +121,20 @@ async def close(self):


@lru_cache(maxsize=None)
def get_session_manager() -> DatabaseSessionManager:
def get_session_manager(request: Optional[Request] = None) -> DatabaseSessionManager:
"""
Get session manager
"""
session_manager = DatabaseSessionManager()
session_manager.schema = request.headers.get("tenant")
settings = get_settings()
settings.customSchema = request.headers.get("new_tenant")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sadath-12 from our discussions I remember you telling me that you had to do this so that the alembic script would run properly when you init a new tenant's schema, but since this commit the alembic scripts are now configurable to target a specific schema. This means in any of your custom endpoints you could do something like this:

from alembic.config import Config
from alembic import command

...
SCHEMA = request.headers.get("tenant") # For example, "tenant1"
if SCHEMA:
    alembic_cfg = Config()
    alembic_cfg.set_main_option("script_location", "./alembic")
    alembic_cfg.cmd_opts = {"x": [f"uri=postgresql+psycopg://username:password@server:5432/{SCHEMA}"]}
    command.upgrade(alembic_cfg, "head")
...

Which means you don't need to add this customSchema to the settings class at all.

session_manager.init_db()
return session_manager


@lru_cache(maxsize=None)
def get_engine() -> AsyncEngine:
def get_engine(schema: str) -> AsyncEngine:
"""
Create the metadata engine.
"""
Expand All @@ -143,14 +152,16 @@ def get_engine() -> AsyncEngine:
"connect_timeout": settings.db_connect_timeout,
},
)
if schema:
engine = engine.execution_options(schema_translate_map={None: schema})
return engine


async def get_session() -> AsyncIterator[AsyncSession]:
async def get_session(request: Request = None) -> AsyncIterator[AsyncSession]:
"""
Async database session.
"""
session_manager = get_session_manager()
session_manager = get_session_manager(request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my other comment, I would just pass in the schema here. so:

session_manager = get_session_manager(schema=request.headers.get("schema")

session = session_manager.session()
try:
yield session
Expand Down
Loading