Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 13 additions & 24 deletions .github/workflows/code-format-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,19 @@ name: Code format check

on: push

jobs:
test:

runs-on: ubuntu-22.04
concurrency:
group: ${{ github.workflow }}-${{ github.event.number || github.ref }}
cancel-in-progress: true

jobs:
black:
runs-on: ubuntu-24.04
steps:
- name: Checkout
uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
- uses: actions/checkout@v4
- uses: actions/setup-python@v5 # Python ≥3.11 required for use_pyproject
with:
python-version: 3.11

- name: Install Poetry
run: |
curl -sSL https://install.python-poetry.org | python3 -

- name: Install project
run: |
# We only need Black, and we want to use the project-specific version.
# (Defaults in Black change from time to time and we want only to deal with
# the consequences, if any, on our own schedule.)
poetry install --only dev

- name: Run check
run: poetry run black . --check
python-version: "3.11"
- uses: psf/black@stable
with:
use_pyproject: true
options: "--check"
2 changes: 1 addition & 1 deletion .github/workflows/pypi-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
- uses: actions/checkout@v2

- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: '3.12'

Expand Down
14 changes: 11 additions & 3 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
name: Python CI

on: push
on:
pull_request:
branches: ["master"]

concurrency:
group: ${{ github.workflow }}-${{ github.event.number || github.ref }}
cancel-in-progress: true

jobs:
test:
Expand All @@ -13,13 +19,15 @@ jobs:
- "3.10"
- "3.11"
- "3.12"
- "3.13"
- "3.14"

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

Expand All @@ -31,7 +39,7 @@ jobs:

- name: Install Poetry
run: |
curl -sSL https://install.python-poetry.org | python3 -
pipx install poetry==2.1.3

- name: Install project
run: |
Expand Down
2 changes: 1 addition & 1 deletion crmprtd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@

Row = namedtuple(
"Row",
"time val variable_name unit network_name station_id lat lon",
"time val variable_name unit network_key station_id lat lon",
)


Expand Down
73 changes: 36 additions & 37 deletions crmprtd/align.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ def memoize(sesh, *args, **kwargs):
return wrapper


def histories_within_threshold(sesh, network_name, lon, lat, threshold):
def histories_within_threshold(sesh, network_key, lon, lat, threshold):
"""
Find existing histories associated with the given network and within a threshold
distance of the point specified by (lon, lat). Return the history id and distance
for each such history, as a list in ascending order of distance.

:param sesh: SQLAlchemy db session
:param network_name: Name of network associated to history.
:param network_key: Key of network associated to history.
:param lat: Lat for History
:param lon: Lon for History
:param threshold: Include only histories within this distance (m) from (lat, lon)
Expand All @@ -109,7 +109,7 @@ def histories_within_threshold(sesh, network_name, lon, lat, threshold):
.select_from(History)
.join(Station, History.station_id == Station.id)
.join(Network, Station.network_id == Network.id)
.filter(Network.name == network_name)
.filter(Network.key == network_key)
.filter(
History.the_geom.intersects(ST_Buffer(p_ref, threshold)),
)
Expand Down Expand Up @@ -178,20 +178,20 @@ def find_active_history(histories):
"Multiple active stations in db",
extra={
"num_active_stns": len(matching_histories),
"network_name": matching_histories[0].network_name,
"network_key": matching_histories[0].network_key,
},
)
return None


def find_nearest_history(
sesh, network_name, native_id, lat, lon, histories, diagnostic=False
sesh, network_key, native_id, lat, lon, histories, diagnostic=False
):
close_histories = histories_within_threshold(sesh, network_name, lon, lat, 800)
close_histories = histories_within_threshold(sesh, network_key, lon, lat, 800)

if len(close_histories) == 0:
return create_station_and_history_entry(
sesh, network_name, native_id, lat, lon, diagnostic=diagnostic
sesh, network_key, native_id, lat, lon, diagnostic=diagnostic
)

for close_history in close_histories:
Expand All @@ -203,45 +203,45 @@ def find_nearest_history(
return history


def match_history(sesh, network_name, native_id, lat, lon, histories, diagnostic=False):
def match_history(sesh, network_key, native_id, lat, lon, histories, diagnostic=False):
if lat and lon:
return find_nearest_history(
sesh, network_name, native_id, lat, lon, histories, diagnostic=diagnostic
sesh, network_key, native_id, lat, lon, histories, diagnostic=diagnostic
)
else:
return find_active_history(histories)


def create_station_and_history_entry(
sesh, network_name, native_id, lat, lon, diagnostic=False
sesh, network_key, native_id, lat, lon, diagnostic=False
):
"""
Create a Station and an associated History object according to the arguments.

:param sesh: SQLAlchemy db session
:param network_name: Name of network associated to Station.
:param network_key: Key of network associated to Station.
:param native_id: Native id of Station.
:param lat: Lat for History
:param lon: Lon for History
:param diagnostic: Boolean. In diagnostic mode? Not used!
:return: None
"""
network = sesh.query(Network).filter(Network.name == network_name).first()
network = sesh.query(Network).filter(Network.key == network_key).first()

action = "Requires" if diagnostic else "Created"

station = Station(native_id=native_id, network_id=network.id)
log.info(
f"{action} new station entry",
extra={"native_id": station.native_id, "network_name": network.name},
extra={"native_id": station.native_id, "network_key": network.key},
)

history = History(station=station, lat=lat, lon=lon)
log.warning(
f"{action} new history entry",
extra={
"history": history.id,
"network_name": network_name,
"network_key": network.key,
"native_id": station.native_id,
"lat": lat,
"lon": lon,
Expand All @@ -251,7 +251,7 @@ def create_station_and_history_entry(
if diagnostic:
log.info(
f"In diagnostic mode. Skipping insertion of new history entry for: "
f"network_name={network_name}, native_id={native_id}, lat={lat}, lon={lon}"
f"network_key={network.key}, native_id={native_id}, lat={lat}, lon={lon}"
)
return None

Expand All @@ -271,28 +271,28 @@ def create_station_and_history_entry(


@cached_function(["unit", "id"])
def get_variable(sesh, network_name, variable_name):
def get_variable(sesh, network_key, variable_name):
"""
Find (but not create) a Variable matching the arguments, if possible.

:param sesh: SQLAlchemy db session
:param network_name: Name of network that Variable must be in.
:param network_key: Key of network that Variable must be in.
:param variable_name: Name of Variable
:return: Variable or None
"""
with sesh.no_autoflush:
variable = (
sesh.query(Variable)
.join(Network)
.filter(and_(Network.name == network_name, Variable.name == variable_name))
.filter(and_(Network.key == network_key, Variable.name == variable_name))
.first()
)
return variable


@cached_function(["id"])
def find_or_create_matching_history_and_station(
sesh, network_name, native_id, lat, lon, diagnostic=False
sesh, network_key, native_id, lat, lon, diagnostic=False
):
"""
Find or create a History and associated Station record matching the arguments,
Expand All @@ -303,7 +303,7 @@ def find_or_create_matching_history_and_station(
In diagnostic mode, do not create any new records (History or Station).

:param sesh: SQLAlchemy db session
:param network_name: Name of network that History must be in.
:param network_key: Key of network that History must be in.
:param native_id: Native id of station that history must be associated with.
:param lat: Lat for history record; either for spatial matching or creation -
see below
Expand All @@ -312,8 +312,7 @@ def find_or_create_matching_history_and_station(
:param diagnostic: Boolean. In diagnostic mode?
:return: History object or None

Search db for existing history records exactly matching network_name and station_id.

Search db for existing history records exactly matching network_key and station_id.
If no such history is found, create one (along with the necessary station record)
and return it. In diagnostic mode, do not create new records.

Expand All @@ -328,32 +327,32 @@ def find_or_create_matching_history_and_station(
sesh.query(History)
.join(Station)
.join(Network)
.filter(and_(Network.name == network_name, Station.native_id == native_id))
.filter(and_(Network.key == network_key, Station.native_id == native_id))
)

if histories.count() == 0:
log.debug("Cound not find native_id %s", native_id)
return create_station_and_history_entry(
sesh, network_name, native_id, lat, lon, diagnostic=diagnostic
sesh, network_key, native_id, lat, lon, diagnostic=diagnostic
)
elif histories.count() == 1:
log.debug("Found exactly one matching history_id")
return histories.one_or_none()
elif histories.count() >= 2:
log.debug("Found multiple history entries. Searching for match.")
return match_history(
sesh, network_name, native_id, lat, lon, histories, diagnostic=diagnostic
sesh, network_key, native_id, lat, lon, histories, diagnostic=diagnostic
)


@cached_function(["name"])
def get_network(sesh, network_name):
return sesh.query(Network).filter(Network.name == network_name).first()
@cached_function(["key"])
def get_network(sesh, network_key):
return sesh.query(Network).filter(Network.key == network_key).first()


def has_required_information(row):
return (
row.network_name is not None
row.network_key is not None
and row.time is not None
and row.val is not None
and row.variable_name is not None
Expand Down Expand Up @@ -382,7 +381,7 @@ def align(sesh, row, diagnostic=False):
log.debug(
"Observation missing critical information",
extra={
"network_name": row.network_name,
"network_key": row.network_key,
"time": row.time,
"val": row.val,
"variable_name": row.variable_name,
Expand All @@ -391,17 +390,17 @@ def align(sesh, row, diagnostic=False):
return None

# Sanity check: specified network exists
if not get_network(sesh, row.network_name):
if not get_network(sesh, row.network_key):
log.error(
"Network does not exist in db",
extra={"network_name": row.network_name},
extra={"network_key": row.network_key},
)
return None

# Find or create a matching History record, if possible.
history = find_or_create_matching_history_and_station(
sesh,
row.network_name,
row.network_key,
row.station_id,
row.lat,
row.lon,
Expand All @@ -411,19 +410,19 @@ def align(sesh, row, diagnostic=False):
log.warning(
"Could not find history match",
extra={
"network_name": row.network_name,
"network_key": row.network_key,
"native_id": row.station_id,
},
)
return None

# Find a matching Variable object, if possible.
variable = get_variable(sesh, row.network_name, row.variable_name)
variable = get_variable(sesh, row.network_key, row.variable_name)
if not variable:
log.debug(
'Variable "%s" from network "%s" is not tracked by crmp',
row.variable_name,
row.network_name,
row.network_key,
)
return None
else:
Expand All @@ -439,7 +438,7 @@ def align(sesh, row, diagnostic=False):
"unit_obs": row.unit,
"unit_db": var_unit,
"data": row.val,
"network_name": row.network_name,
"network_key": row.network_key,
},
)
return None
Expand Down
2 changes: 1 addition & 1 deletion crmprtd/bulk_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def process(current_time, opts, args):

cache_filename = default_cache_filename(
timestamp=current_time,
network_name=opts.network,
network_key=opts.network,
tag=opts.tag,
frequency=opts.frequency if opts.network == "ec" else None,
province=opts.province if opts.network == "ec" else None,
Expand Down
Loading