diff --git a/alembic.ini b/alembic.ini index fc7e311..6f8520d 100644 --- a/alembic.ini +++ b/alembic.ini @@ -38,7 +38,16 @@ script_location = alembic [test_sqlite] sqlalchemy.url = sqlite:///modelmeta/data/mddb-v2.sqlite +[test_sqlite_local] +# SQLite on local filesystem +sqlalchemy.url = sqlite:///test.sqlite + +[test_pg_local] +# Postgres server in local docker container +sqlalchemy.url = postgresql://postgres@localhost:30011/mm_test + [test_pg_pcic_meta] +# DEPRECATED: Postgres server running locally (not dockerized) sqlalchemy.url = postgresql://pcic_meta@localhost/modelmeta_test [prod_pcic_meta] diff --git a/alembic/versions/c0810e121564_add_streamflow_order_and_results_tables.py b/alembic/versions/c0810e121564_add_streamflow_order_and_results_tables.py new file mode 100644 index 0000000..4dd7d0c --- /dev/null +++ b/alembic/versions/c0810e121564_add_streamflow_order_and_results_tables.py @@ -0,0 +1,47 @@ +"""Add streamflow order and results tables + +Revision ID: c0810e121564 +Revises: 12f290b63791 +Create Date: 2018-09-05 11:41:58.245456 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision = 'c0810e121564' +down_revision = '12f290b63791' +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table('streamflow_results', + sa.Column('streamflow_result_id', sa.Integer(), nullable=False), + sa.Column('data_file_id', sa.Integer(), nullable=True), + sa.Column('station_id', sa.Integer(), nullable=True), + sa.Column('status', sa.Enum('queued', 'processing', 'error', 'cancelled', 'ready', 'removed', name='streamflow_result_statuses'), nullable=False), + sa.ForeignKeyConstraint(['data_file_id'], ['data_files.data_file_id'], ), + sa.ForeignKeyConstraint(['station_id'], ['stations.station_id'], ), + sa.PrimaryKeyConstraint('streamflow_result_id') + ) + + op.create_table('streamflow_orders', + sa.Column('streamflow_order_id', sa.Integer(), nullable=False), + sa.Column('hydromodel_output_id', sa.Integer(), nullable=False), + sa.Column('streamflow_result_id', sa.Integer(), nullable=False), + sa.Column('longitude', sa.Float(), nullable=False), + sa.Column('latitude', sa.Float(), nullable=False), + sa.Column('notification_method', sa.Enum('none', 'email', name='notification_methods'), nullable=False), + sa.Column('notification_address', sa.String(length=255), nullable=True), + sa.Column('status', sa.Enum('accepted', 'fulfilled', 'cancelled', 'error', name='streamflow_order_statuses'), nullable=False), + sa.ForeignKeyConstraint(['hydromodel_output_id'], ['data_files.data_file_id'], ), + sa.ForeignKeyConstraint(['streamflow_result_id'], ['streamflow_results.streamflow_result_id'], ), + sa.PrimaryKeyConstraint('streamflow_order_id') + ) + + +def downgrade(): + op.drop_table('streamflow_orders') + op.drop_table('streamflow_results') diff --git a/modelmeta/v2.py b/modelmeta/v2.py index 22e9715..60974d6 100644 --- a/modelmeta/v2.py +++ b/modelmeta/v2.py @@ -26,6 +26,8 @@ VariableAlias YCellBound SpatialRefSys + StreamflowOrder + StreamflowResult '''.split() from pkg_resources import resource_filename @@ -692,3 +694,71 @@ def __repr__(self): # We 
don't declare constraints on SpatialRefSys because the Postgis plugin is # responsible for creating it. + + +class StreamflowOrder(Base): + __tablename__ = 'streamflow_orders' + + # column definitions + id = Column('streamflow_order_id', Integer, primary_key=True, nullable=False) + hydromodel_output_id = Column( + Integer, ForeignKey('data_files.data_file_id'), + nullable=False) + streamflow_result_id = Column( + Integer, ForeignKey('streamflow_results.streamflow_result_id'), + nullable=False) + longitude = Column(Float, nullable=False) + latitude = Column(Float, nullable=False) + notification_method = Column( + Enum( + 'none', 'email', + name='notification_methods' + ), + nullable=False + ) + notification_address = Column(String(length=255), nullable=True) + status = Column( + Enum( + 'accepted', 'fulfilled', 'cancelled', 'error', + name='streamflow_order_statuses' + ), + nullable=False + ) + + # relationships + hydromodel_output = relationship('DataFile') + result = relationship('StreamflowResult', back_populates='orders') + + def __str__(self): + return obj_repr( + 'id hydromodel_output_id streamflow_result_id longitude latitude ' + 'notification_address status', self) + + +class StreamflowResult(Base): + __tablename__ = 'streamflow_results' + + # column definitions + id = Column('streamflow_result_id', Integer, primary_key=True, nullable=False) + data_file_id = Column( + Integer, ForeignKey('data_files.data_file_id'), + nullable=True) + station_id = Column( + Integer, ForeignKey('stations.station_id'), + nullable=True) + status = Column( + Enum( + 'queued', 'processing', 'error', 'cancelled', 'ready', 'removed', + name='streamflow_result_statuses', + ), + nullable=False + ) + + # relationships + data_file = relationship('DataFile') + station = relationship('Station') + orders = relationship('StreamflowOrder', back_populates='result') + + def __str__(self): + return obj_repr( + 'id data_file_id station_id status', self) diff --git a/tests/conftest.py 
b/tests/conftest.py index 85bb9ab..c3eed79 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -90,6 +90,10 @@ def make_data_file(i, run=None, timeset=None): def data_file_1(): return make_data_file(1) +@pytest.fixture(scope='function') +def data_file_2(): + return make_data_file(2) + # Grid @@ -181,15 +185,15 @@ def make_test_dfv_dsg_time_series(i, file=None, variable_alias=None): @pytest.fixture(scope='function') -def dfv_dsg_time_series_1(data_file_1, variable_alias_1): +def dfv_dsg_time_series_1(data_file_2, variable_alias_1): return make_test_dfv_dsg_time_series( - 1, file=data_file_1, variable_alias=variable_alias_1) + 1, file=data_file_2, variable_alias=variable_alias_1) @pytest.fixture(scope='function') -def dfv_dsg_time_series_2(data_file_1, variable_alias_2): +def dfv_dsg_time_series_2(data_file_2, variable_alias_2): return make_test_dfv_dsg_time_series( - 2, file=data_file_1, variable_alias=variable_alias_2) + 2, file=data_file_2, variable_alias=variable_alias_2) # Station diff --git a/tests/modelmeta/test_orm.py b/tests/modelmeta/test_orm.py index c38cc54..2ea83d0 100644 --- a/tests/modelmeta/test_orm.py +++ b/tests/modelmeta/test_orm.py @@ -24,7 +24,9 @@ Variable, \ VariableAlias, \ YCellBound, \ - SpatialRefSys + SpatialRefSys, \ + StreamflowOrder, \ + StreamflowResult all_classes = [ ClimatologicalTime, @@ -62,6 +64,14 @@ def check_db_is_empty(sesh): assert sesh.query(C).count() == 0, C.__name__ +def add_and_flush(sesh, to_add): + if isinstance(to_add, list): + sesh.add_all(to_add) + else: + sesh.add(to_add) + sesh.flush() + + def test_dfv_gridded( test_session_with_empty_db, dfv_gridded_1, data_file_1, variable_alias_1, level_set_1, grid_1): @@ -80,7 +90,7 @@ def test_dfv_gridded( def test_dfv_dsg_time_series( test_session_with_empty_db, dfv_dsg_time_series_1, - data_file_1, variable_alias_1): + data_file_2, variable_alias_1): sesh = test_session_with_empty_db check_db_is_empty(sesh) sesh.add(dfv_dsg_time_series_1) @@ -88,11 +98,11 @@ def 
test_dfv_dsg_time_series( print() print(dfv_dsg_time_series_1) assert dfv_dsg_time_series_1.geometry_type == 'dsg_time_series' - assert data_file_1.data_file_variables == [dfv_dsg_time_series_1] + assert data_file_2.data_file_variables == [dfv_dsg_time_series_1] assert variable_alias_1.data_file_variables == [dfv_dsg_time_series_1] -def associate_dfvs_and_stations(sesh, dfvs, stations): +def add_and_associate_dfvs_and_stations(sesh, dfvs, stations): """" Helper. Associate each timeSeries geometry variable with each station. """ @@ -129,7 +139,7 @@ def test_dfv_dsg_ts_x_station_delete( dfvs = {dfv_dsg_time_series_1, dfv_dsg_time_series_2} stations = {station_1, station_2} - associate_dfvs_and_stations(sesh, dfvs, stations) + add_and_associate_dfvs_and_stations(sesh, dfvs, stations) if del_dfv: sesh.delete(dfv_dsg_time_series_1) @@ -147,3 +157,166 @@ def test_dfv_dsg_ts_x_station_delete( for station in stations: assert set(station.data_file_variables) == dfvs + +def streamflow_workflow_new( + sesh, hydromodel_output, streamflow, station_1, station_2, + remove=True): + """ + A happy-path order fulfillment workflow where the result that fulfils + the order must be computed (does not already exist). Steps are: + + #. (setup) Add hydromodel output DF etc. for Order parametrization + #. Add Result (counterintuitively, this comes first) in 'queued' state. + This fulfils the Order; it represents a computation that is at this + moment queued. + #. Add Order in 'accepted' state that links to Result and hydromodel + output DF. + #. Modify Result state to 'processing'. + #. Add DF, DFV, Stns to represent streamflow result file. + #. Modify Result state to 'ready', point at appropriate DF and Station; + #. Modify Order state to 'fulfilled'. + #. Modify Result state to 'removed'. + + """ + + # (setup) Add hydromodel output DF etc. for Order parametrization + add_and_flush(sesh, hydromodel_output) + + # Add Result (counterintuitively, this comes first) in 'queued' state. 
+ # This fulfils the Order; it represents a computation that is at this + # moment queued. + result = StreamflowResult( + status='queued' + ) + add_and_flush(sesh, result) + + # Add Order in 'accepted' state that links to Result and hydromodel + # output DF. + order = StreamflowOrder( + hydromodel_output=hydromodel_output, + longitude=-123.5, + latitude=50.5, + notification_method='email', + notification_address='abc@example.ca', + status='accepted', + result=result + ) + add_and_flush(sesh, order) + + # Modify Result state to 'processing'. + result.status = 'processing' + sesh.flush() + + # Add DF, DFV, Stns to represent streamflow result file. + dfvs = {streamflow.data_file_variables[0]} + stations = {station_1, station_2} + add_and_associate_dfvs_and_stations(sesh, dfvs, stations) + + # Modify Result state to 'ready', point at appropriate DF and Station. + result.status = 'ready' + result.data_file = streamflow + result.station = station_1 + + # Modify Order state to 'fulfilled'. + order.status = 'fulfilled' + sesh.flush() + + if remove: + # Modify Result state to 'removed'; remove relations to DF and Stn + result.status = 'removed' + result.data_file = None + result.station = None + sesh.flush() + + +def test_streamflow_workflow_new( + test_session_with_empty_db, + dfv_gridded_1, + dfv_dsg_time_series_1, station_1, station_2, +): + """ + Test a happy-path order fulfillment workflow. + """ + sesh = test_session_with_empty_db + check_db_is_empty(sesh) + + hydromodel_output = dfv_gridded_1.file + streamflow = dfv_dsg_time_series_1.file + + # Execute the new-result workflow: does it explode? 
+ streamflow_workflow_new( + sesh, hydromodel_output, streamflow, station_1, station_2) + + # Do a few nominal query-based tests + q = sesh.query(StreamflowOrder).filter_by(status='fulfilled') + assert q.count() == 1 + order = q.one() + assert order.hydromodel_output_id == hydromodel_output.id + assert order.result.status == 'removed' + assert order.result.data_file is None + + +def streamflow_workflow_existing(sesh, hydromodel_output): + """ + A happy-path order fulfillment workflow where the result that fulfils + the order already exists. Steps: + + #. Find existing Order that matches parameters. + #. Add a new, 'fulfilled' Order that points to the existing Result. + """ + + # Params for matching query + longitude = -123.5 + latitude = 50.5 + + existing_order = ( + sesh.query(StreamflowOrder).join(StreamflowResult) + .filter(StreamflowOrder.hydromodel_output_id == hydromodel_output.id) + .filter(StreamflowOrder.longitude == longitude) + .filter(StreamflowOrder.latitude == latitude) + .filter(StreamflowOrder.status == 'fulfilled') + .filter(StreamflowResult.status == 'ready') + ).first() + + new_order = StreamflowOrder( + hydromodel_output=hydromodel_output, + longitude=longitude, + latitude=latitude, + result=existing_order.result, + notification_method='email', + notification_address='foo@xyz.com', + status='fulfilled' + ) + add_and_flush(sesh, new_order) + + +def test_streamflow_workflow_existing( + test_session_with_empty_db, + dfv_gridded_1, + dfv_dsg_time_series_1, station_1, station_2, +): + """ + Test order fulfillment workflow where there is a pre-existing result + that fulfils the order. + """ + sesh = test_session_with_empty_db + check_db_is_empty(sesh) + + hydromodel_output = dfv_gridded_1.file + streamflow = dfv_dsg_time_series_1.file + + # Add "pre-existing" Order, Result, and supporting records. + # This is just the new-result workflow, which we reuse here. 
+ streamflow_workflow_new( + sesh, hydromodel_output, streamflow, station_1, station_2, remove=False) + + # Execute the existing-result workflow + streamflow_workflow_existing(sesh, hydromodel_output) + + # Test that everything is as it should be + q = sesh.query(StreamflowOrder).filter_by(status='fulfilled') + assert q.count() == 2 + for order in q.all(): + assert order.hydromodel_output_id == hydromodel_output.id + assert order.result.status == 'ready' + assert order.result.data_file_id == streamflow.id