diff --git a/server/src/data/mod.rs b/server/src/data/mod.rs index 9aa4175..c7caeff 100644 --- a/server/src/data/mod.rs +++ b/server/src/data/mod.rs @@ -1,9 +1,9 @@ -use std::{collections::VecDeque, path::PathBuf, sync::Arc}; +use std::{collections::VecDeque, env::temp_dir, path::PathBuf, sync::Arc}; use chrono::{DateTime, Duration, TimeZone, Utc}; use hackdose_server_shared::DataPoint; use tokio::{ - fs::File, + fs::{copy, File}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, sync::Mutex, }; @@ -14,6 +14,7 @@ use crate::Configuration; pub(crate) struct EnergyData { store: Arc>>, log_location: PathBuf, + log_semaphore: Arc>, } pub(crate) mod constants { @@ -60,6 +61,7 @@ impl EnergyData { let energy_data = EnergyData { store: Default::default(), log_location: config.log_location.clone(), + log_semaphore: Default::default(), }; let mut buf = energy_data.store.lock().await; let data = File::open(config.log_location.clone()).await; @@ -97,11 +99,13 @@ impl EnergyData { Err(_) => Self { store: Default::default(), log_location: config.log_location.clone(), + log_semaphore: Default::default(), }, } } pub(crate) async fn log_data(&self, data: DataPoint) { + let s = self.log_semaphore.lock().await; let f = data.date.format(constants::PERSIST_DATE_FORMAT); let log_line = format!("{};{}\n", f, data.value); let log = tokio::fs::OpenOptions::new() @@ -116,6 +120,61 @@ impl EnergyData { } Err(_) => (), } + drop(s); + } + + pub(crate) async fn rotate_log(&self) { + let to_date = Utc::now(); + let from_date = to_date - Duration::days(constants::DATA_RETENTION_PERIOD_DAYS); + + let s = self.log_semaphore.lock().await; + let old_filename = self.log_location.clone(); + let mut new_filename = temp_dir(); + new_filename.set_file_name("hackdose_rotate.tmp"); + + let data = File::open(old_filename.clone()).await; + + let mut log = tokio::fs::OpenOptions::new() + .append(true) + .create(true) + .create_new(true) + .open(new_filename.clone()) + .await + .unwrap(); + + match data { + Ok(data) => { + let mut rdr = BufReader::new(data).lines(); + while let Ok(Some(line)) = rdr.next_line().await { + let l = line + .split(";") + .map(|x| x.to_string()) + .collect::>(); + match &l[..] { + [date_string, _, ..] => { + let date_parsed = + Utc.datetime_from_str(date_string, constants::PERSIST_DATE_FORMAT); + match date_parsed { + Ok(date) => { + if (date < to_date) && (date > from_date) { + let date_string = format!("{}\n", line); + let _ = log.write_all(date_string.as_bytes()).await; + } + } + Err(_) => (), + } + } + _ => (), + } + } + drop(rdr); + drop(log); + copy(new_filename, old_filename).await; + } + Err(_) => {} + } + + drop(s); } } @@ -130,6 +189,7 @@ mod test { let mut energy_data = EnergyData { store: Default::default(), log_location: Default::default(), + log_semaphore: Default::default(), }; let t_lower = Utc.with_ymd_and_hms(2022, 02, 02, 0, 0, 0).unwrap(); let t_upper = Utc.with_ymd_and_hms(2022, 04, 04, 0, 3, 0).unwrap(); @@ -173,6 +233,7 @@ mod test { let mut energy_data = EnergyData { store: Default::default(), log_location: Default::default(), + log_semaphore: Default::default(), }; let t_lower = Utc.with_ymd_and_hms(2022, 04, 04, 0, 1, 0).unwrap(); let t_upper = Utc.with_ymd_and_hms(2022, 04, 04, 0, 3, 0).unwrap(); @@ -219,6 +280,7 @@ mod test { let mut energy_data = EnergyData { store: Default::default(), log_location: Default::default(), + log_semaphore: Default::default(), }; let t_lower = Utc.with_ymd_and_hms(2022, 04, 04, 11, 11, 11).unwrap(); let t_upper = Utc.with_ymd_and_hms(2022, 08, 04, 11, 11, 11).unwrap(); diff --git a/server/src/main.rs b/server/src/main.rs index deb74bb..b69afc1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,9 +7,11 @@ use hackdose_sml_parser::message_stream::sml_message_stream; use serde::{Deserialize, Serialize}; use smart_meter::{enable_ir_sensor_power_supply, uart_ir_sensor_data_stream}; use std::sync::Arc; +use std::time::Duration; use std::{collections::HashMap, path::PathBuf}; use tokio::fs::File; use tokio::io::BufReader; +use tokio::time::sleep; use actors::control_actors; use rest::serve_rest_endpoint; @@ -114,6 +116,15 @@ async fn main() { let rest_event_mutex = mutex.clone(); let rest_config = config.clone(); let rest_energy_data = energy_data.clone(); + let rotate_energy_data = energy_data.clone(); + + tokio::spawn(async move { + loop { + sleep(Duration::from_secs(86400)).await; + rotate_energy_data.rotate_log().await; + } + }); + serve_rest_endpoint(rest_event_mutex, rest_energy_data, &rest_config).await; }