Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions pycds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@
"HistoryStationNetwork",
"ObsCountPerDayHistory",
"ObsWithFlags",
"ClimatologicalPeriod",
"ClimatologicalPeriodHistory",
"ClimatologicalStation",
"ClimatologicalStationHistory",
"ClimatologicalStationXHistory",
"ClimatologicalStationXHistoryHistory",
"ClimatologicalVariable",
"ClimatologicalVariableHistory",
"ClimatologicalValue",
"ClimatologicalValueHistory",
]

from pycds.context import get_schema_name, get_su_role_name
Expand All @@ -94,6 +104,16 @@
NativeFlag,
PCICFlag,
DerivedValue,
ClimatologicalPeriod,
ClimatologicalPeriodHistory,
ClimatologicalStation,
ClimatologicalStationHistory,
ClimatologicalStationXHistory,
ClimatologicalStationXHistoryHistory,
ClimatologicalVariable,
ClimatologicalVariableHistory,
ClimatologicalValue,
ClimatologicalValueHistory,
)

from .orm.views import (
Expand Down
40 changes: 28 additions & 12 deletions pycds/alembic/change_history_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
the change history functionality.
"""

from typing import Iterable, Any
from typing import Iterable, Any, Optional

from alembic import op

Expand Down Expand Up @@ -57,13 +57,15 @@ def add_history_cols_to_primary(


def drop_history_cols_from_primary(
collection_name: str, columns: tuple[str] = ("mod_time", "mod_user")
collection_name: str, columns: tuple[str, str] = ("mod_time", "mod_user")
):
drop_columns = ", ".join(f"DROP COLUMN {c}" for c in columns)
op.execute(f"ALTER TABLE {main_table_name(collection_name)} {drop_columns}")


def create_history_table(collection_name: str, foreign_tables: list[tuple[str, str]]):
def create_history_table(
collection_name: str, foreign_tables: Optional[list[tuple[str, str]]]
):
# Create the history table. We can't use Alembic create_table here because it doesn't
# support the LIKE syntax we need.
columns = ", ".join(
Expand All @@ -89,18 +91,24 @@ def drop_history_table(collection_name: str):

def create_history_table_indexes(
collection_name: str,
pri_id_name: str,
foreign_tables: list[tuple[str, str]],
pri_id_name: list[str] | str,
foreign_tables: Optional[Iterable[tuple[str, str]]],
extras=None,
):
"""
Create indexes on the history table. For analysis on what indexes are needed,
see https://github.com/pacificclimate/pycds/issues/228
"""

if isinstance(pri_id_name, str):
pri_id_name = [pri_id_name]

seen = []

for columns in (
# Index on primary table primary key, mod_time, mod_user
([pri_id_name], ["mod_time"], ["mod_user"])
tuple([x] for x in pri_id_name)
+ (["mod_time"], ["mod_user"])
# Index on all foreign main table primary keys
+ tuple([ft_pk_name] for _, ft_pk_name in (foreign_tables or tuple()))
# Index on all foreign history table primary keys
Expand All @@ -110,6 +118,9 @@ def create_history_table_indexes(
)
+ (extras or tuple())
):
if columns in seen:
continue
seen.append(columns)
# How much do we care about index naming? SQLAlchemy uses a different pattern than
# appears typical in CRMP.
op.create_index(
Expand All @@ -122,9 +133,9 @@ def create_history_table_indexes(

def populate_history_table(
collection_name: str,
pri_id_name: str,
foreign_tables: list[tuple[str, str]],
limit: int = None,
pri_id_name: list[str] | str,
foreign_tables: Optional[list[tuple[str, str]]],
limit: Optional[int] = None,
):
"""
Populate the history table with data from the main table, in order of item id (main
Expand All @@ -140,7 +151,7 @@ def populate_history_table(
# foreign table definitions: the CTE names, the CTE definitions, and their usages
# within the query that populates the target history table.

foreign_tables = foreign_tables or tuple()
foreign_tables = foreign_tables or []

conditional_comma = "," if len(foreign_tables) > 0 else ""

Expand Down Expand Up @@ -173,6 +184,11 @@ def populate_history_table(
else ""
)

if isinstance(pri_id_name, str):
pri_id_name = [pri_id_name]

pri_order_clause = ", ".join(f"main.{idn}" for idn in pri_id_name)

stmt = f"""
{"WITH" if len(foreign_tables) > 0 else ""}
{ft_cte_list}
Expand All @@ -183,7 +199,7 @@ def populate_history_table(
FROM {main_table_name(collection_name)} main
{conditional_comma} {ft_cte_name_list}
{ft_where_clause}
ORDER BY main.{pri_id_name}
ORDER BY {pri_order_clause}
"""
op.execute(stmt)

Expand All @@ -209,7 +225,7 @@ def create_primary_table_triggers(collection_name: str, prefix: str = "t100_"):


def create_history_table_triggers(
collection_name: str, foreign_tables: list, prefix: str = "t100_"
collection_name: str, foreign_tables: Optional[list], prefix: str = "t100_"
):
# Trigger: Add foreign key values to each record inserted into history table.
ft_args = (
Expand Down
1 change: 1 addition & 0 deletions pycds/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
# Now to test if it works when crmp contains an upgraded schema and other
# does not. This works.

import alembic_postgresql_enum
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""apply hx tracking to multi climo normals

Revision ID: 7244176be9fa
Revises: 758be4f4ce0f
Create Date: 2025-09-23 16:15:58.236278

"""

from alembic import op
import sqlalchemy as sa

from pycds.alembic.util import grant_standard_table_privileges
from pycds.context import get_schema_name
from pycds.alembic.change_history_utils import (
add_history_cols_to_primary,
create_history_table,
populate_history_table,
drop_history_triggers,
drop_history_table,
drop_history_cols_from_primary,
create_history_table_triggers,
create_primary_table_triggers,
create_history_table_indexes,
)


# revision identifiers, used by Alembic.
revision = "7244176be9fa"
down_revision = "758be4f4ce0f"
branch_labels = None
depends_on = None


# Target schema for all operations in this migration (resolved from context).
schema_name = get_schema_name()


# One entry per table receiving change-history tracking, in dependency order:
# referenced tables (e.g. climo_period) appear before tables that reference
# them (e.g. climo_station), so upgrade() can process them front-to-back and
# downgrade() back-to-front.
#
# Entry shape: (table_name, primary_key_name, foreign_keys, extra_indexes)
#   - primary_key_name: a single column name, or a list of column names for a
#     composite key (see climo_stn_x_hist).
#   - foreign_keys: list of (referenced_table, referencing_column) pairs, or
#     None when the table has no tracked foreign keys.
#   - extra_indexes: additional index column groups, or None.
table_info = (
    # table_name, primary_key_name, foreign_keys, extra_indexes
    ("climo_period", "climo_period_id", None, None),
    (
        "climo_station",
        "climo_station_id",
        [
            ("climo_period", "climo_period_id"),
        ],
        None,
    ),
    (
        "climo_stn_x_hist",
        ["climo_station_id", "history_id"],
        [("climo_station", "climo_station_id"), ("meta_history", "history_id")],
        None,
    ),
    ("climo_variable", "climo_variable_id", None, None),
    (
        "climo_value",
        "climo_value_id",
        [
            ("climo_variable", "climo_variable_id"),
            ("climo_station", "climo_station_id"),
        ],
        None,
    ),
)


def upgrade():
    """Apply change-history tracking to each multi-climatology normals table.

    Tables are processed in the dependency order given by ``table_info``.
    """

    # The trigger functions fired when each history table is populated call
    # other functions by unqualified name; set the search_path so those
    # calls resolve against the target schema.
    op.get_bind().execute(sa.text(f"SET search_path TO {schema_name}, public"))

    for name, pkey, fkeys, extras in table_info:
        # Primary table: add tracking columns and change-capture triggers.
        add_history_cols_to_primary(name)
        create_primary_table_triggers(name)

        # History table: create and backfill from the main table first —
        # the history-table triggers must not fire during the backfill, so
        # they (and the indexes) are added only afterwards.
        create_history_table(name, fkeys)
        populate_history_table(name, pkey, fkeys)
        create_history_table_triggers(name, fkeys)
        create_history_table_indexes(name, pkey, fkeys, extras)
        grant_standard_table_privileges(name, schema=schema_name)


def downgrade():
    """Remove change-history tracking, visiting tables in reverse dependency order."""
    for name, *_ in reversed(table_info):
        # Undo upgrade() steps: triggers, then the history table itself,
        # then the tracking columns added to the primary table.
        drop_history_triggers(name)
        drop_history_table(name)
        drop_history_cols_from_primary(name)
Loading