Skip to content

Commit

Permalink
Merge pull request #13 from torfmaster/feature/discard-logs
Browse files Browse the repository at this point in the history
feat: discard old logs
  • Loading branch information
torfmaster authored Aug 21, 2024
2 parents 970c8b8 + 5e13ddb commit c794fb8
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
66 changes: 64 additions & 2 deletions server/src/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{collections::VecDeque, path::PathBuf, sync::Arc};
use std::{collections::VecDeque, env::temp_dir, path::PathBuf, sync::Arc};

use chrono::{DateTime, Duration, TimeZone, Utc};
use hackdose_server_shared::DataPoint;
use tokio::{
fs::File,
fs::{copy, File},
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
sync::Mutex,
};
Expand All @@ -14,6 +14,7 @@ use crate::Configuration;
pub(crate) struct EnergyData {
store: Arc<Mutex<VecDeque<DataPoint>>>,
log_location: PathBuf,
log_semaphore: Arc<Mutex<()>>,
}

pub(crate) mod constants {
Expand Down Expand Up @@ -60,6 +61,7 @@ impl EnergyData {
let energy_data = EnergyData {
store: Default::default(),
log_location: config.log_location.clone(),
log_semaphore: Default::default(),
};
let mut buf = energy_data.store.lock().await;
let data = File::open(config.log_location.clone()).await;
Expand Down Expand Up @@ -97,11 +99,13 @@ impl EnergyData {
Err(_) => Self {
store: Default::default(),
log_location: config.log_location.clone(),
log_semaphore: Default::default(),
},
}
}

pub(crate) async fn log_data(&self, data: DataPoint) {
let s = self.log_semaphore.lock().await;
let f = data.date.format(constants::PERSIST_DATE_FORMAT);
let log_line = format!("{};{}\n", f, data.value);
let log = tokio::fs::OpenOptions::new()
Expand All @@ -116,6 +120,61 @@ impl EnergyData {
}
Err(_) => (),
}
drop(s);
}

pub(crate) async fn rotate_log(&self) {
let to_date = Utc::now();
let from_date = to_date - Duration::days(constants::DATA_RETENTION_PERIOD_DAYS);

let s = self.log_semaphore.lock().await;
let old_filename = self.log_location.clone();
let mut new_filename = temp_dir();
new_filename.set_file_name("hackdose_rotate.tmp");

let data = File::open(old_filename.clone()).await;

let mut log = tokio::fs::OpenOptions::new()
.append(true)
.create(true)
.create_new(true)
.open(new_filename.clone())
.await
.unwrap();

match data {
Ok(data) => {
let mut rdr = BufReader::new(data).lines();
while let Ok(Some(line)) = rdr.next_line().await {
let l = line
.split(";")
.map(|x| x.to_string())
.collect::<Vec<String>>();
match &l[..] {
[date_string, _, ..] => {
let date_parsed =
Utc.datetime_from_str(date_string, constants::PERSIST_DATE_FORMAT);
match date_parsed {
Ok(date) => {
if (date < to_date) && (date > from_date) {
let date_string = format!("{}\n", line);
let _ = log.write_all(date_string.as_bytes()).await;
}
}
Err(_) => (),
}
}
_ => (),
}
}
drop(rdr);
drop(log);
copy(new_filename, old_filename).await;
}
Err(_) => {}
}

drop(s);
}
}

Expand All @@ -130,6 +189,7 @@ mod test {
let mut energy_data = EnergyData {
store: Default::default(),
log_location: Default::default(),
log_semaphore: Default::default(),
};
let t_lower = Utc.with_ymd_and_hms(2022, 02, 02, 0, 0, 0).unwrap();
let t_upper = Utc.with_ymd_and_hms(2022, 04, 04, 0, 3, 0).unwrap();
Expand Down Expand Up @@ -173,6 +233,7 @@ mod test {
let mut energy_data = EnergyData {
store: Default::default(),
log_location: Default::default(),
log_semaphore: Default::default(),
};
let t_lower = Utc.with_ymd_and_hms(2022, 04, 04, 0, 1, 0).unwrap();
let t_upper = Utc.with_ymd_and_hms(2022, 04, 04, 0, 3, 0).unwrap();
Expand Down Expand Up @@ -219,6 +280,7 @@ mod test {
let mut energy_data = EnergyData {
store: Default::default(),
log_location: Default::default(),
log_semaphore: Default::default(),
};
let t_lower = Utc.with_ymd_and_hms(2022, 04, 04, 11, 11, 11).unwrap();
let t_upper = Utc.with_ymd_and_hms(2022, 08, 04, 11, 11, 11).unwrap();
Expand Down
11 changes: 11 additions & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use hackdose_sml_parser::message_stream::sml_message_stream;
use serde::{Deserialize, Serialize};
use smart_meter::{enable_ir_sensor_power_supply, uart_ir_sensor_data_stream};
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashMap, path::PathBuf};
use tokio::fs::File;
use tokio::io::BufReader;
use tokio::time::sleep;

use actors::control_actors;
use rest::serve_rest_endpoint;
Expand Down Expand Up @@ -114,6 +116,15 @@ async fn main() {
let rest_event_mutex = mutex.clone();
let rest_config = config.clone();
let rest_energy_data = energy_data.clone();
let rotate_energy_data = energy_data.clone();

tokio::spawn(async move {
loop {
sleep(Duration::from_secs(86400)).await;
rotate_energy_data.rotate_log().await;
}
});

serve_rest_endpoint(rest_event_mutex, rest_energy_data, &rest_config).await;
}

Expand Down

0 comments on commit c794fb8

Please sign in to comment.