WIP Issue #29 - Add batch capability for running analytics over historical results (e.g. after uploading data from SD cards) #30
The first file changes the analytics engine. `run_analytics` gains `overwrite` and `log_mins` parameters so that batch re-runs can replace existing results without flooding the logs:
```diff
@@ -507,7 +507,10 @@ def _calculate_TWA_and_gauge_for_all_firefighters(self, sensor_log_chunk_df, ff_
     # current_utc_timestamp : The UTC datetime for which to calculate sensor analytics. Defaults to 'now' (UTC).
     # commit : Utility flag for unit testing - defaults to committing analytic results to
     #          the database. Setting commit=False prevents unit tests from writing to the database.
-    def run_analytics (self, current_utc_timestamp=None, commit=True) :
+    # overwrite : Enables batch execution to overwrite any existing result in the time period being processed.
+    # log_mins : How often to log an informational message stating which minute is currently being processed. Defaults
+    #            to every '1' min. Can be set to (eg) every '15' mins when running batches, so the logs are readable.
+    def run_analytics (self, current_utc_timestamp=None, commit=True, overwrite=False, log_mins=1) :
 
         # Get the desired timeframe for the analytics run and standardise it to UTC.
         if current_utc_timestamp is None:
```
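For orientation, a hypothetical caller of the extended signature (the zero-argument constructor and the parameter names come from this PR; the timestamp is invented):

```python
import pandas as pd
from GasExposureAnalytics import GasExposureAnalytics

engine = GasExposureAnalytics()

# Live mode: analyse 'now' (UTC) and log every minute - the defaults.
engine.run_analytics()

# Batch replay of one historical minute: replace any stale result for that
# minute, and only log on 15-minute boundaries so the logs stay readable.
engine.run_analytics(current_utc_timestamp=pd.Timestamp('2021-06-01T10:15:00Z'),
                     overwrite=True, log_mins=15)
```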
```diff
@@ -528,9 +531,11 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) :
         # buffer for the data.
         timestamp_key = current_utc_timestamp.floor(freq='min') - pd.Timedelta(minutes = 1)
 
-        message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.isoformat()))
-        if not self._from_db : message += " (local CSV file mode)"
-        self.logger.info(message)
+        # Log progress regularly (e.g. by default, log_mins is 'every 1 min', but could be set to 'every 15 mins').
+        if (timestamp_key == timestamp_key.floor(str(log_mins) + 'T')) :
+            message = ("Running Prometeo Analytics for minute key '%s'" % (timestamp_key.floor(str(log_mins) + 'T')))
+            if not self._from_db : message += " (local CSV file mode)"
+            self.logger.info(message)
 
         # Read a block of sensor logs from the DB, covering the longest window we're calculating over (usually 8hrs).
         # Note: This has the advantage of always including all known sensor data, even when that data was delayed due
```
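The guard above logs only when the minute key lands exactly on a `log_mins` boundary. A standalone illustration of that floor-based check (pandas only; the timestamps are invented):

```python
import pandas as pd

log_mins = 15
for timestamp_key in pd.date_range('2021-06-01 10:00', periods=4, freq='5min'):
    # floor() snaps the key back to the nearest earlier 15-minute mark; only
    # keys already on the mark compare equal, so 10:05 and 10:10 stay quiet.
    if timestamp_key == timestamp_key.floor(str(log_mins) + 'T'):  # 'T' = minutes
        print('would log minute key', timestamp_key)  # fires at 10:00 and 10:15
```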
```diff
@@ -550,7 +555,77 @@ def run_analytics (self, current_utc_timestamp=None, commit=True) :
         # Work out all the time-weighted averages and corresponding limit gauges for all firefighters, all limits and all gases.
         analytics_df = self._calculate_TWA_and_gauge_for_all_firefighters(sensor_log_df, ff_time_spans_df, timestamp_key)
 
         # Write the analytic results to the DB
         if commit :
 
+            # Remove any pre-existing analytic results before writing new ones.
+            if overwrite :
+                with self._db_engine.connect() as connection: # 'with' auto-closes the connection
+                    connection.execute("DELETE FROM " + ANALYTICS_TABLE + " where " + TIMESTAMP_COL + " = '" + timestamp_key.isoformat() + "'")
+
             analytics_df.to_sql(ANALYTICS_TABLE, self._db_engine, if_exists='append', dtype={FIREFIGHTER_ID_COL:FIREFIGHTER_ID_COL_TYPE})
 
         return analytics_df
 
 
+    # This is the batched version of 'main' - given a start time and an end time, it generates a minute-by-minute
+    # playback schedule and runs all of the core analytics for Prometeo for each of those minutes.
+    # start_time : The date & time at which to start calculating sensor analytics (UTC datetime).
+    # end_time : The date & time at which to stop calculating sensor analytics (UTC datetime).
+    # commit : Utility flag for unit testing. Defaults to committing analytic results to the database for production.
+    #          Setting commit=False prevents unit tests from writing to the database.
+    def batch_run_analytics (self, start_time=None, end_time=None, commit=True) :
+
+        # Log that a batch procss is starting & what it will look at and what it will over-write.
```
**Copilot AI** (Oct 27, 2025) suggested a fix for the typo on the last line of the hunk:

```suggestion
        # Log that a batch process is starting & what it will look at and what it will over-write.
```

Copilot also flagged a SQL injection risk in the truncated body of `batch_run_analytics`:

> SQL injection vulnerability: The date parameter from user input is directly concatenated into the SQL query without parameterization. Use parameterized queries instead: `sql = text('SELECT MIN(:col) AS start_time, MAX(:col) AS end_time FROM :table WHERE DATE(:col) = :date')` and bind the parameters.
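The hunk header above promises 77 added lines, but the capture cuts off at the first comment of `batch_run_analytics`. A minimal sketch of the minute-by-minute playback loop that the signature and comments describe; the loop body, the `overwrite`/`log_mins` choices, and the return value are all assumptions, not the PR's actual code:

```python
# Sketch only - the real 77-line body is truncated in this diff view. Assumes a
# method on GasExposureAnalytics that replays run_analytics for every minute.
import pandas as pd

def batch_run_analytics(self, start_time=None, end_time=None, commit=True):
    # Build a minute-by-minute playback schedule over [start_time, end_time].
    schedule = pd.date_range(pd.Timestamp(start_time).floor('min'),
                             pd.Timestamp(end_time).floor('min'), freq='min')
    results = []
    for minute in schedule:
        # overwrite=True lets re-runs replace stale rows; a coarser log_mins
        # keeps batch logs readable (both parameters were added by this PR).
        results.append(self.run_analytics(current_utc_timestamp=minute,
                                          commit=commit, overwrite=True, log_mins=15))
    return pd.concat(results) if results else None
```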
The second file exposes the batch capability through the Flask service that hosts the per-minute analytics engine:
```diff
@@ -7,6 +7,7 @@
 from GasExposureAnalytics import GasExposureAnalytics
 from dotenv import load_dotenv
 import time
+from datetime import date
 import atexit
 from apscheduler.schedulers.background import BackgroundScheduler
 import logging
@@ -45,6 +46,7 @@
 FIREFIGHTER_ID_COL = 'firefighter_id'
 TIMESTAMP_COL = 'timestamp_mins'
 STATUS_LED_COL = 'analytics_status_LED'
+DATE_PARAMETER = 'date'
 
 # We initialize the prometeo Analytics engine.
 perMinuteAnalytics = GasExposureAnalytics()
```
```diff
@@ -167,6 +169,45 @@ def getStatusDetails():
         logger.error(f'Internal Server Error: {e}')
         abort(500)
 
 
+@app.route('/batch_run_analytics_by_date', methods=['GET'])
+def batch_run_analytics_by_date():
+
+    try:
+        date_str = request.args.get(DATE_PARAMETER)
+
+        # Return 400 (Bad Request) if the supplied date is invalid.
+        if (date_str is None) :
+            logger.error('Missing parameters : '+DATE_PARAMETER+' : '+date_str)
```
A review suggestion on the `logger.error` line above (as written, concatenating `date_str` when it is `None` would itself raise a `TypeError`):

```suggestion
            logger.error('Missing required parameter: ' + DATE_PARAMETER)
```
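This hunk is also truncated (45 added lines, roughly ten shown). A hypothetical completion of the endpoint: only `DATE_PARAMETER`, `perMinuteAnalytics`, and the `datetime.date` import are taken from the diff, while the day-window logic and the response shape are guesses:

```python
# Assumed continuation; nothing past the None-check appears in this capture.
from datetime import date, datetime, time, timedelta
from flask import abort, jsonify, request

@app.route('/batch_run_analytics_by_date', methods=['GET'])
def batch_run_analytics_by_date():
    date_str = request.args.get(DATE_PARAMETER)
    if date_str is None:
        logger.error('Missing required parameter: ' + DATE_PARAMETER)
        abort(400)
    try:
        day = date.fromisoformat(date_str)  # expects 'YYYY-MM-DD'
    except ValueError:
        logger.error('Invalid date: ' + date_str)
        abort(400)

    # Replay the whole requested day, minute by minute.
    start_time = datetime.combine(day, time.min)
    end_time = start_time + timedelta(days=1) - timedelta(minutes=1)
    perMinuteAnalytics.batch_run_analytics(start_time=start_time, end_time=end_time)
    return jsonify({DATE_PARAMETER: date_str, 'status': 'complete'})
```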
A second review comment flags the `DELETE` statement added to `run_analytics`:

> SQL injection vulnerability: The DELETE statement uses string concatenation with `timestamp_key.isoformat()` without parameterization. Although `timestamp_key` is generated internally, use parameterized queries for consistency and safety. Replace with: `connection.execute(text('DELETE FROM :table WHERE :col = :ts'), {'table': ANALYTICS_TABLE, 'col': TIMESTAMP_COL, 'ts': timestamp_key.isoformat()})`
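One caveat on both review comments: SQL placeholders can only bind values, not identifiers, so `:table` and `:col` cannot be parameterized as suggested; most drivers would reject them or quote them as string literals. A sketch of the workable middle ground, assuming SQLAlchemy 1.4+ and that `ANALYTICS_TABLE`/`TIMESTAMP_COL` remain trusted module constants:

```python
# Bind the value (the timestamp); interpolate only the trusted constants.
# Identifiers (table/column names) cannot be bound as SQL parameters.
from sqlalchemy import text

if overwrite:
    stmt = text(f"DELETE FROM {ANALYTICS_TABLE} WHERE {TIMESTAMP_COL} = :ts")
    with self._db_engine.begin() as connection:  # begin() commits on success
        connection.execute(stmt, {"ts": timestamp_key.isoformat()})
```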