From 9fe2fcee549965b408550087a5965fe08bd42956 Mon Sep 17 00:00:00 2001 From: Rod Glover Date: Fri, 24 Aug 2018 17:15:54 -0700 Subject: [PATCH 1/5] Define streamflow orders and results in ORM --- modelmeta/v2.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/modelmeta/v2.py b/modelmeta/v2.py index 22e9715..a14c565 100644 --- a/modelmeta/v2.py +++ b/modelmeta/v2.py @@ -692,3 +692,53 @@ def __repr__(self): # We don't declare constraints on SpatialRefSys because the Postgis plugin is # responsible for creating it. + + +class StreamflowOrder(Base): + __tablename__ = 'streamflow_orders' + + # column definitions + id = Column('streamflow_order_id', Integer, primary_key=True, nullable=False) + hydromodel_output_id = Column( + Integer, ForeignKey('data_files.data_file_id'), + nullable=False) + streamflow_result_id = Column( + Integer, ForeignKey('streamflow_results.streamflow_result_id'), + nullable=False) + longitude = Column(Float, nullable=False) + latitude = Column(Float, nullable=False) + notification_method = Column( + Enum('none', 'email'), + nullable=False + ) + notification_address = Column(String(length=255), nullable=True) + status = Column( + Enum('accepted', 'fulfilled', 'cancelled', 'error'), + nullable=False + ) + + # relationships + hydromodel = relationship('DataFile') + result = relationship('StreamflowResult', back_populates='orders') + + +class StreamflowResult(Base): + __tablename__ = 'streamflow_results' + + # column definitions + id = Column('streamflow_result_id', Integer, primary_key=True, nullable=False) + data_file_id = Column( + Integer, ForeignKey('data_files.data_file_id'), + nullable=False) + station_id = Column( + Integer, ForeignKey('stations.station_id'), + nullable=False) + status = Column( + Enum('queued', 'processing', 'error', 'cancelled', 'ready', 'removed'), + nullable=False + ) + + # relationships + data_file = relationship('DataFile') + station = relationship('Station') + orders = relationship('StreamflowOrder', back_populates='result') \ No newline at end of file From 4f850e283ba3b18eea612ff3426cfe1ddf21f3e5 Mon Sep 17 00:00:00 2001 From: Rod Glover Date: Thu, 30 Aug 2018 15:35:51 -0700 Subject: [PATCH 2/5] Add __str__ methods; export streamflow classes --- modelmeta/v2.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/modelmeta/v2.py b/modelmeta/v2.py index a14c565..10b6fe9 100644 --- a/modelmeta/v2.py +++ b/modelmeta/v2.py @@ -26,6 +26,8 @@ VariableAlias YCellBound SpatialRefSys + StreamflowOrder + StreamflowResult '''.split() from pkg_resources import resource_filename @@ -721,6 +723,11 @@ class StreamflowOrder(Base): hydromodel = relationship('DataFile') result = relationship('StreamflowResult', back_populates='orders') + def __str__(self): + return obj_repr( + 'id hydromodel_output_id streamflow_result_id longitude latitude ' + 'notification_address status', self) + class StreamflowResult(Base): __tablename__ = 'streamflow_results' @@ -741,4 +748,8 @@ class StreamflowResult(Base): # relationships data_file = relationship('DataFile') station = relationship('Station') - orders = relationship('StreamflowOrder', back_populates='result') \ No newline at end of file + orders = relationship('StreamflowOrder', back_populates='result') + + def __str__(self): + return obj_repr( + 'id data_file_id station_id status', self) From 6570f3c17ea54221393838c37e0c59409a7b1c37 Mon Sep 17 00:00:00 2001 From: Rod Glover Date: Fri, 31 Aug 2018 15:25:13 -0700 Subject: [PATCH 3/5] Add basic tests of streamflow order-result; fix ORM issues revealed --- modelmeta/v2.py | 21 +++-- tests/conftest.py | 12 ++- tests/modelmeta/test_orm.py | 183 +++++++++++++++++++++++++++++++++++- 3 files changed, 201 insertions(+), 15 deletions(-) diff --git a/modelmeta/v2.py b/modelmeta/v2.py index 10b6fe9..60974d6 100644 --- a/modelmeta/v2.py +++ b/modelmeta/v2.py @@ -710,17 +710,23 @@ class StreamflowOrder(Base): longitude = Column(Float, nullable=False) latitude = Column(Float, nullable=False) notification_method = Column( - Enum('none', 'email'), + Enum( + 'none', 'email', + name='notification_methods' + ), nullable=False ) notification_address = Column(String(length=255), nullable=True) status = Column( - Enum('accepted', 'fulfilled', 'cancelled', 'error'), + Enum( + 'accepted', 'fulfilled', 'cancelled', 'error', + name='streamflow_order_statuses' + ), nullable=False ) # relationships - hydromodel = relationship('DataFile') + hydromodel_output = relationship('DataFile') result = relationship('StreamflowResult', back_populates='orders') def __str__(self): @@ -736,12 +742,15 @@ class StreamflowResult(Base): id = Column('streamflow_result_id', Integer, primary_key=True, nullable=False) data_file_id = Column( Integer, ForeignKey('data_files.data_file_id'), - nullable=False) + nullable=True) station_id = Column( Integer, ForeignKey('stations.station_id'), - nullable=False) + nullable=True) status = Column( - Enum('queued', 'processing', 'error', 'cancelled', 'ready', 'removed'), + Enum( + 'queued', 'processing', 'error', 'cancelled', 'ready', 'removed', + name='streamflow_result_statuses', + ), nullable=False ) diff --git a/tests/conftest.py b/tests/conftest.py index 85bb9ab..c3eed79 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -90,6 +90,10 @@ def make_data_file(i, run=None, timeset=None): def data_file_1(): return make_data_file(1) +@pytest.fixture(scope='function') +def data_file_2(): + return make_data_file(2) + # Grid @@ -181,15 +185,15 @@ def make_test_dfv_dsg_time_series(i, file=None, variable_alias=None): @pytest.fixture(scope='function') -def dfv_dsg_time_series_1(data_file_1, variable_alias_1): +def dfv_dsg_time_series_1(data_file_2, variable_alias_1): return make_test_dfv_dsg_time_series( - 1, file=data_file_1, variable_alias=variable_alias_1) + 1, file=data_file_2, variable_alias=variable_alias_1) @pytest.fixture(scope='function') -def dfv_dsg_time_series_2(data_file_1, variable_alias_2): +def dfv_dsg_time_series_2(data_file_2, variable_alias_2): return make_test_dfv_dsg_time_series( - 2, file=data_file_1, variable_alias=variable_alias_2) + 2, file=data_file_2, variable_alias=variable_alias_2) # Station diff --git a/tests/modelmeta/test_orm.py b/tests/modelmeta/test_orm.py index c38cc54..2ea83d0 100644 --- a/tests/modelmeta/test_orm.py +++ b/tests/modelmeta/test_orm.py @@ -24,7 +24,9 @@ Variable, \ VariableAlias, \ YCellBound, \ - SpatialRefSys + SpatialRefSys, \ + StreamflowOrder, \ + StreamflowResult all_classes = [ ClimatologicalTime, @@ -62,6 +64,14 @@ def check_db_is_empty(sesh): assert sesh.query(C).count() == 0, C.__name__ +def add_and_flush(sesh, to_add): + if isinstance(to_add, list): + sesh.add_all(to_add) + else: + sesh.add(to_add) + sesh.flush() + + def test_dfv_gridded( test_session_with_empty_db, dfv_gridded_1, data_file_1, variable_alias_1, level_set_1, grid_1): @@ -80,7 +90,7 @@ def test_dfv_gridded( def test_dfv_dsg_time_series( test_session_with_empty_db, dfv_dsg_time_series_1, - data_file_1, variable_alias_1): + data_file_2, variable_alias_1): sesh = test_session_with_empty_db check_db_is_empty(sesh) sesh.add(dfv_dsg_time_series_1) @@ -88,11 +98,11 @@ def test_dfv_dsg_time_series( print() print(dfv_dsg_time_series_1) assert dfv_dsg_time_series_1.geometry_type == 'dsg_time_series' - assert data_file_1.data_file_variables == [dfv_dsg_time_series_1] + assert data_file_2.data_file_variables == [dfv_dsg_time_series_1] assert variable_alias_1.data_file_variables == [dfv_dsg_time_series_1] -def associate_dfvs_and_stations(sesh, dfvs, stations): +def add_and_associate_dfvs_and_stations(sesh, dfvs, stations): """" Helper. Associate each timeSeries geometry variable with each station. """ @@ -129,7 +139,7 @@ def test_dfv_dsg_ts_x_station_delete( dfvs = {dfv_dsg_time_series_1, dfv_dsg_time_series_2} stations = {station_1, station_2} - associate_dfvs_and_stations(sesh, dfvs, stations) + add_and_associate_dfvs_and_stations(sesh, dfvs, stations) if del_dfv: sesh.delete(dfv_dsg_time_series_1) @@ -147,3 +157,166 @@ def test_dfv_dsg_ts_x_station_delete( for station in stations: assert set(station.data_file_variables) == dfvs + +def streamflow_workflow_new( + sesh, hydromodel_output, streamflow, station_1, station_2, + remove=True): + """ + A happy-path order fulfillment workflow where the result that fulfils + the order must be computed (does not already exist). Steps are: + + #. (setup) Add hydromodel output DF etc. for Order parametrization + #. Add Result (counterintuitively, this comes first) in 'queued' state. + This fulfils the Order; it represents a computation that is at this + moment queued. + #. Add Order in 'accepted' state that links to Result and hydromodel + output DF. + #. Modify Result state to 'processing'. + #. Add DF, DFV, Stns to represent streamflow result file. + #. Modify Result state to 'ready', point at appropriate DF and Station; + #. Modify Order state to 'fulfilled'. + #. Modify Result state to 'removed'. + + """ + + # (setup) Add hydromodel output DF etc. for Order parametrization + add_and_flush(sesh, hydromodel_output) + + # Add Result (counterintuitively, this comes first) in 'queued' state. + # This fulfils the Order; it represents a computation that is at this + # moment queued. + result = StreamflowResult( + status='queued' + ) + add_and_flush(sesh, result) + + # Add Order in 'accepted' state that links to Result and hydromodel + # output DF. + order = StreamflowOrder( + hydromodel_output=hydromodel_output, + longitude=-123.5, + latitude=50.5, + notification_method='email', + notification_address='abc@example.ca', + status='accepted', + result=result + ) + add_and_flush(sesh, order) + + # Modify Result state to 'processing'. + result.status = 'processing' + sesh.flush() + + # Add DF, DFV, Stns to represent streamflow result file. + dfvs = {streamflow.data_file_variables[0]} + stations = {station_1, station_2} + add_and_associate_dfvs_and_stations(sesh, dfvs, stations) + + # Modify Result state to 'ready', point at appropriate DF and Station. + result.status = 'ready' + result.data_file = streamflow + result.station = station_1 + + # Modify Order state to 'fulfilled'. + order.status = 'fulfilled' + sesh.flush() + + if remove: + # Modify Result state to 'removed'; remove relations to DF and Stn + result.status = 'removed' + result.data_file = None + result.station = None + sesh.flush() + + +def test_streamflow_workflow_new( + test_session_with_empty_db, + dfv_gridded_1, + dfv_dsg_time_series_1, station_1, station_2, +): + """ + Test a happy-path order fulfillment workflow. + """ + sesh = test_session_with_empty_db + check_db_is_empty(sesh) + + hydromodel_output = dfv_gridded_1.file + streamflow = dfv_dsg_time_series_1.file + + # Execute the new-result workflow: does it explode? + streamflow_workflow_new( + sesh, hydromodel_output, streamflow, station_1, station_2) + + # Do a few nominal query-based tests + q = sesh.query(StreamflowOrder).filter_by(status='fulfilled') + assert q.count() == 1 + order = q.one() + assert order.hydromodel_output_id == hydromodel_output.id + assert order.result.status == 'removed' + assert order.result.data_file is None + + +def streamflow_workflow_existing(sesh, hydromodel_output): + """ + A happy-path order fulfillment workflow where the result that fulfils + the order already exists. Steps: + + #. Find existing Order that matches parameters. + #. Add a new, 'fulfilled' Order that points to the existing Result. + """ + + # Params for matching query + longitude = -123.5 + latitude = 50.5 + + existing_order = ( + sesh.query(StreamflowOrder).join(StreamflowResult) + .filter(StreamflowOrder.hydromodel_output_id == hydromodel_output.id) + .filter(StreamflowOrder.longitude == longitude) + .filter(StreamflowOrder.latitude == latitude) + .filter(StreamflowOrder.status =='fulfilled') + .filter(StreamflowResult.status == 'ready') + ).first() + + new_order = StreamflowOrder( + hydromodel_output=hydromodel_output, + longitude=longitude, + latitude=latitude, + result=existing_order.result, + notification_method='email', + notification_address='foo@xyz.com', + status='fulfilled' + ) + add_and_flush(sesh, new_order) + + +def test_streamflow_workflow_existing( + test_session_with_empty_db, + dfv_gridded_1, + dfv_dsg_time_series_1, station_1, station_2, +): + """ + Test order fulfillment workflow where there is a pre-existing result + that fulfils the order. + """ + sesh = test_session_with_empty_db + check_db_is_empty(sesh) + + hydromodel_output = dfv_gridded_1.file + streamflow = dfv_dsg_time_series_1.file + + # Add "pre-existing" Order, Result, and supporting records. + # This is just the new-result workflow, which we reuse here. + streamflow_workflow_new( + sesh, hydromodel_output, streamflow, station_1, station_2, remove=False) + + # Execute the existing-result workflow + streamflow_workflow_existing(sesh, hydromodel_output) + + # Test that everything is as it should be + q = sesh.query(StreamflowOrder).filter_by(status='fulfilled') + assert q.count() == 2 + for order in q.all(): + assert order.hydromodel_output_id == hydromodel_output.id + assert order.result.status == 'ready' + assert order.result.data_file_id == streamflow.id From ac42146b51242947b2e861695470c07d8421bf29 Mon Sep 17 00:00:00 2001 From: Rod Glover Date: Thu, 6 Sep 2018 15:16:27 -0700 Subject: [PATCH 4/5] Add some test databases to alembic.ini --- alembic.ini | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/alembic.ini b/alembic.ini index fc7e311..6f8520d 100644 --- a/alembic.ini +++ b/alembic.ini @@ -38,7 +38,16 @@ script_location = alembic [test_sqlite] sqlalchemy.url = sqlite:///modelmeta/data/mddb-v2.sqlite +[test_sqlite_local] +# SQLite on local filesystem +sqlalchemy.url = sqlite:///test.sqlite + +[test_pg_local] +# Postgres server in local docker container +sqlalchemy.url = postgresql://postgres@localhost:30011/mm_test + [test_pg_pcic_meta] +# DEPRECATED: Postgres server running locally (not dockerized) sqlalchemy.url = postgresql://pcic_meta@localhost/modelmeta_test [prod_pcic_meta] From 58e72c9829a4e5aa341d7a5a92e1483499f333dc Mon Sep 17 00:00:00 2001 From: Rod Glover Date: Thu, 6 Sep 2018 16:06:19 -0700 Subject: [PATCH 5/5] Add migration --- ...add_streamflow_order_and_results_tables.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 alembic/versions/c0810e121564_add_streamflow_order_and_results_tables.py diff --git a/alembic/versions/c0810e121564_add_streamflow_order_and_results_tables.py b/alembic/versions/c0810e121564_add_streamflow_order_and_results_tables.py new file mode 100644 index 0000000..4dd7d0c --- /dev/null +++ b/alembic/versions/c0810e121564_add_streamflow_order_and_results_tables.py @@ -0,0 +1,47 @@ +"""Add streamflow order and results tables + +Revision ID: c0810e121564 +Revises: 12f290b63791 +Create Date: 2018-09-05 11:41:58.245456 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'c0810e121564' +down_revision = '12f290b63791' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('streamflow_results', + sa.Column('streamflow_result_id', sa.Integer(), nullable=False), + sa.Column('data_file_id', sa.Integer(), nullable=True), + sa.Column('station_id', sa.Integer(), nullable=True), + sa.Column('status', sa.Enum('queued', 'processing', 'error', 'cancelled', 'ready', 'removed', name='streamflow_result_statuses'), nullable=False), + sa.ForeignKeyConstraint(['data_file_id'], ['data_files.data_file_id'], ), + sa.ForeignKeyConstraint(['station_id'], ['stations.station_id'], ), + sa.PrimaryKeyConstraint('streamflow_result_id') + ) + + op.create_table('streamflow_orders', + sa.Column('streamflow_order_id', sa.Integer(), nullable=False), + sa.Column('hydromodel_output_id', sa.Integer(), nullable=False), + sa.Column('streamflow_result_id', sa.Integer(), nullable=False), + sa.Column('longitude', sa.Float(), nullable=False), + sa.Column('latitude', sa.Float(), nullable=False), + sa.Column('notification_method', sa.Enum('none', 'email', name='notification_methods'), nullable=False), + sa.Column('notification_address', sa.String(length=255), nullable=True), + sa.Column('status', sa.Enum('accepted', 'fulfilled', 'cancelled', 'error', name='streamflow_order_statuses'), nullable=False), + sa.ForeignKeyConstraint(['hydromodel_output_id'], ['data_files.data_file_id'], ), + sa.ForeignKeyConstraint(['streamflow_result_id'], ['streamflow_results.streamflow_result_id'], ), + sa.PrimaryKeyConstraint('streamflow_order_id') + ) + + +def downgrade(): + op.drop_table('streamflow_orders') + op.drop_table('streamflow_results')