2
2
import logging
3
3
import typing
4
4
import numpy as np
5
- import ruptures as rpt
6
5
import matplotlib .pyplot as plt
6
+ from scipy .signal import find_peaks
7
+ import pandas as pd
7
8
import re
8
9
9
10
from caimira import models
21
22
class CO2FormData (FormData ):
22
23
CO2_data : dict
23
24
fitting_ventilation_states : list
24
- fitting_ventilation_type : str
25
25
room_capacity : typing .Optional [int ]
26
26
27
27
#: The default values for undefined fields. Note that the defaults here
28
28
#: and the defaults in the html form must not be contradictory.
29
29
_DEFAULTS : typing .ClassVar [typing .Dict [str , typing .Any ]] = {
30
30
'CO2_data' : '{}' ,
31
+ 'fitting_ventilation_states' : '[]' ,
31
32
'exposed_coffee_break_option' : 'coffee_break_0' ,
32
33
'exposed_coffee_duration' : 5 ,
33
34
'exposed_finish' : '17:30' ,
34
35
'exposed_lunch_finish' : '13:30' ,
35
36
'exposed_lunch_option' : True ,
36
37
'exposed_lunch_start' : '12:30' ,
37
38
'exposed_start' : '08:30' ,
38
- 'fitting_ventilation_states' : '[]' ,
39
- 'fitting_ventilation_type' : 'fitting_natural_ventilation' ,
40
39
'infected_coffee_break_option' : 'coffee_break_0' ,
41
40
'infected_coffee_duration' : 5 ,
42
41
'infected_dont_have_breaks_with_exposed' : False ,
@@ -97,55 +96,75 @@ def validate(self):
97
96
raise TypeError (f'Unable to fetch "finish_time" key. Got "{ dict_keys [1 ]} ".' )
98
97
for time in input_break .values ():
99
98
if not re .compile ("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$" ).match (time ):
100
- raise TypeError (f'Wrong time format - "HH:MM". Got "{ time } ".' )
99
+ raise TypeError (f'Wrong time format - "HH:MM". Got "{ time } ".' )
101
100
102
- @classmethod
103
- def find_change_points_with_pelt (self , CO2_data : dict ):
101
+ def find_change_points (self ) -> list :
104
102
"""
105
- Perform change point detection using Pelt algorithm from ruptures library with pen=15.
106
- Returns a list of tuples containing (index, X-axis value) for the detected significant changes.
103
+ Perform change point detection using scipy library (find_peaks method) with rolling average of data.
104
+ Incorporate existing state change candidates and adjust the result accordingly.
105
+ Returns a list of the detected ventilation state changes, discarding any occupancy state change.
107
106
"""
108
-
109
- times : list = CO2_data ['times' ]
110
- CO2_values : list = CO2_data ['CO2' ]
107
+ times : list = self .CO2_data ['times' ]
108
+ CO2_values : list = self .CO2_data ['CO2' ]
111
109
112
110
if len (times ) != len (CO2_values ):
113
111
raise ValueError ("times and CO2 values must have the same length." )
114
112
115
- # Convert the input list to a numpy array for use with the ruptures library
116
- CO2_np = np .array (CO2_values )
113
+ # Time difference between two consecutive time data entries, in seconds
114
+ diff = (times [1 ] - times [0 ]) * 3600 # Initial data points in absolute hours, e.g. 14.78
115
+
116
+ # Calculate minimum interval for smoothing technique
117
+ smooth_min_interval_in_minutes = 1 # Minimum time difference for smooth technique
118
+ window_size = max (int ((smooth_min_interval_in_minutes * 60 ) // diff ), 1 )
117
119
118
- # Define the model for change point detection (Radial Basis Function kernel)
119
- model = "rbf"
120
+ # Applying a rolling average to smooth the initial data
121
+ smoothed_co2 = pd . Series ( CO2_values ). rolling ( window = window_size , center = True ). mean ()
120
122
121
- # Fit the Pelt algorithm to the data with the specified model
122
- algo = rpt .Pelt (model = model ).fit (CO2_np )
123
+ # Calculate minimum interval for peaks and valleys detection
124
+ peak_valley_min_interval_in_minutes = 15 # Minimum time difference between two peaks or two valleys
125
+ min_distance_points = max (int ((peak_valley_min_interval_in_minutes * 60 ) // diff ), 1 )
123
126
124
- # Predict change points using the Pelt algorithm with a penalty value of 15
125
- result = algo .predict (pen = 15 )
127
+ # Calculate minimum width of datapoints for valley detection
128
+ width_min_interval_in_minutes = 20 # Minimum duration of a valley
129
+ min_valley_width = max (int ((width_min_interval_in_minutes * 60 ) // diff ), 1 )
126
130
127
- # Find local minima and maxima
128
- segments = np .split (np .arange (len (CO2_values )), result )
129
- merged_segments = [np .hstack ((segments [i ], segments [i + 1 ])) for i in range (len (segments ) - 1 )]
130
- result_set = set ()
131
- for segment in merged_segments [:- 2 ]:
132
- result_set .add (times [CO2_values .index (min (CO2_np [segment ]))])
133
- result_set .add (times [CO2_values .index (max (CO2_np [segment ]))])
134
- return list (result_set )
131
+ # Find peaks (maxima) in the smoothed data applying the distance factor
132
+ peaks , _ = find_peaks (smoothed_co2 .values , prominence = 100 , distance = min_distance_points )
133
+
134
+ # Find valleys (minima) by inverting the smoothed data and applying the width and distance factors
135
+ valleys , _ = find_peaks (- smoothed_co2 .values , prominence = 50 , width = min_valley_width , distance = min_distance_points )
135
136
136
- @classmethod
137
- def generate_ventilation_plot (self , CO2_data : dict ,
138
- transition_times : typing .Optional [list ] = None ,
139
- predictive_CO2 : typing .Optional [list ] = None ):
140
- times_values = CO2_data ['times' ]
141
- CO2_values = CO2_data ['CO2' ]
137
+ # Extract peak and valley timestamps
138
+ timestamps = np .array (times )
139
+ peak_timestamps = timestamps [peaks ]
140
+ valley_timestamps = timestamps [valleys ]
141
+
142
+ return sorted (np .concatenate ((peak_timestamps , valley_timestamps )))
143
+
144
+ def generate_ventilation_plot (self ,
145
+ ventilation_transition_times : typing .Optional [list ] = None ,
146
+ occupancy_transition_times : typing .Optional [list ] = None ,
147
+ predictive_CO2 : typing .Optional [list ] = None ) -> str :
148
+
149
+ # Plot data (x-axis: times; y-axis: CO2 concentrations)
150
+ times_values : list = self .CO2_data ['times' ]
151
+ CO2_values : list = self .CO2_data ['CO2' ]
142
152
143
153
fig = plt .figure (figsize = (7 , 4 ), dpi = 110 )
144
154
plt .plot (times_values , CO2_values , label = 'Input CO₂' )
145
155
146
- if (transition_times ):
147
- for time in transition_times :
148
- plt .axvline (x = time , color = 'grey' , linewidth = 0.5 , linestyle = '--' )
156
+ # Add occupancy state changes:
157
+ if (occupancy_transition_times ):
158
+ for i , time in enumerate (occupancy_transition_times ):
159
+ plt .axvline (x = time , color = 'grey' , linewidth = 0.5 , linestyle = '--' , label = 'Occupancy change (from input)' if i == 0 else None )
160
+ # Add ventilation state changes:
161
+ if (ventilation_transition_times ):
162
+ for i , time in enumerate (ventilation_transition_times ):
163
+ if i == 0 :
164
+ label = 'Ventilation change (detected)' if occupancy_transition_times else 'Ventilation state changes'
165
+ else : label = None
166
+ plt .axvline (x = time , color = 'red' , linewidth = 0.5 , linestyle = '--' , label = label )
167
+
149
168
if (predictive_CO2 ):
150
169
plt .plot (times_values , predictive_CO2 , label = 'Predictive CO₂' )
151
170
plt .xlabel ('Time of day' )
@@ -158,14 +177,18 @@ def population_present_changes(self, infected_presence: models.Interval, exposed
158
177
state_change_times .update (exposed_presence .transition_times ())
159
178
return sorted (state_change_times )
160
179
161
- def ventilation_transition_times (self ) -> typing .Tuple [float , ...]:
162
- # Check what type of ventilation is considered for the fitting
163
- if self .fitting_ventilation_type == 'fitting_natural_ventilation' :
164
- vent_states = self .fitting_ventilation_states
165
- vent_states .append (self .CO2_data ['times' ][- 1 ])
166
- return tuple (vent_states )
167
- else :
168
- return tuple ((self .CO2_data ['times' ][0 ], self .CO2_data ['times' ][- 1 ]))
180
+ def ventilation_transition_times (self ) -> typing .Tuple [float ]:
181
+ '''
182
+ Check if the last time from the input data is
183
+ included in the ventilation ventilations state.
184
+ Given that the last time is a required state change,
185
+ if not included, this method adds it.
186
+ '''
187
+ vent_states = self .fitting_ventilation_states
188
+ last_time_from_input = self .CO2_data ['times' ][- 1 ]
189
+ if (vent_states and last_time_from_input != vent_states [- 1 ]): # The last time value is always needed for the last ACH interval.
190
+ vent_states .append (last_time_from_input )
191
+ return tuple (vent_states )
169
192
170
193
def build_model (self , size = None ) -> models .CO2DataModel : # type: ignore
171
194
size = size or self .data_registry .monte_carlo ['sample_size' ]
@@ -184,7 +207,7 @@ def build_model(self, size=None) -> models.CO2DataModel: # type: ignore
184
207
activity = None , # type: ignore
185
208
)
186
209
187
- all_state_changes = self .population_present_changes (infected_presence , exposed_presence )
210
+ all_state_changes = self .population_present_changes (infected_presence , exposed_presence )
188
211
total_people = [infected_population .people_present (stop ) + exposed_population .people_present (stop )
189
212
for _ , stop in zip (all_state_changes [:- 1 ], all_state_changes [1 :])]
190
213
0 commit comments