Skip to content

Commit 402f0a8

Browse files
committed
allow queueing specific actions
we can now say "hey, run this action after $time". This will mostly be used for suspending users after their free trial period is up in big_money
1 parent c204d38 commit 402f0a8

File tree

3 files changed

+269
-37
lines changed

3 files changed

+269
-37
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
create table queued_actions (
2+
id serial primary key not null,
3+
action text not null,
4+
user_id bigint not null,
5+
execute_at timestamptz not null,
6+
status text not null
7+
);
8+
9+
CREATE INDEX id_user_id_status ON queued_actions (id, user_id, status);

src/action.rs

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
use anyhow::anyhow;
2+
use bfsp::internal::ActionInfo;
3+
use std::collections::HashSet;
4+
use std::sync::Arc;
5+
use std::time::Duration;
6+
use tracing::error_span;
7+
use tracing::{error, Level};
8+
9+
use rand::Rng;
10+
11+
use crate::{chunk_db::ChunkDB, meta_db::MetaDB};
12+
13+
#[derive(Debug)]
14+
enum Action {
15+
DeleteFiles,
16+
SuspendRead,
17+
SuspendWrite,
18+
SuspendDelete,
19+
SuspendQuery,
20+
}
21+
22+
impl TryFrom<String> for Action {
23+
type Error = anyhow::Error;
24+
25+
#[tracing::instrument(err)]
26+
fn try_from(value: String) -> Result<Self, Self::Error> {
27+
match value.as_str() {
28+
"delete_files" => Ok(Self::DeleteFiles),
29+
"suspend_read" => Ok(Self::SuspendRead),
30+
"suspend_write" => Ok(Self::SuspendWrite),
31+
"suspend_delete" => Ok(Self::SuspendDelete),
32+
"suspend_query" => Ok(Self::SuspendQuery),
33+
_ => Err(anyhow!("invalid action")),
34+
}
35+
}
36+
}
37+
38+
pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
39+
meta_db: Arc<M>,
40+
chunk_db: Arc<C>,
41+
) {
42+
loop {
43+
tracing::span!(Level::INFO, "run_current_actions");
44+
45+
match meta_db
46+
.list_actions(Some("pending".to_string()), true)
47+
.await
48+
{
49+
Ok(actions) => {
50+
for action_info in actions.into_iter() {
51+
let meta_db = Arc::clone(&meta_db);
52+
let chunk_db = Arc::clone(&chunk_db);
53+
54+
tokio::task::spawn(async move {
55+
match run_action(Arc::clone(&meta_db), chunk_db, &action_info).await {
56+
Ok(_) => {
57+
let _ = meta_db.executed_action(action_info.id.unwrap()).await;
58+
}
59+
Err(err) => {
60+
error!("Error running action: {err}");
61+
}
62+
}
63+
});
64+
}
65+
}
66+
Err(err) => {
67+
error!("Error listing actions: {err}");
68+
}
69+
}
70+
71+
// random jitter to make servers less likely to run multiple actions at once
72+
let jitter = rand::thread_rng().gen_range(-1.0..=1.0);
73+
tokio::time::sleep(Duration::from_secs_f32(10.0 + jitter)).await;
74+
}
75+
}
76+
77+
#[tracing::instrument(err, skip(meta_db, chunk_db))]
78+
async fn run_action<M: MetaDB, C: ChunkDB>(
79+
meta_db: Arc<M>,
80+
chunk_db: Arc<C>,
81+
action_info: &ActionInfo,
82+
) -> anyhow::Result<()> {
83+
let action: Action = action_info.action.clone().try_into()?;
84+
let user_id = action_info.user_id;
85+
86+
match action {
87+
Action::DeleteFiles => {
88+
meta_db.delete_all_meta(user_id).await?;
89+
}
90+
//FIXME: REPLACE ALL WITH JSON_INSERT FUNCTION!!!!!!
91+
Action::SuspendRead => {
92+
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
93+
let suspensions = meta_db.suspensions(&[user_id]).await?;
94+
let mut suspension = suspensions[&user_id];
95+
suspension.read_suspended = true;
96+
meta_db
97+
.set_suspensions([(user_id, suspension)].into())
98+
.await?;
99+
}
100+
Action::SuspendWrite => {
101+
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
102+
let suspensions = meta_db.suspensions(&[user_id]).await?;
103+
let mut suspension = suspensions[&user_id];
104+
suspension.write_suspended = true;
105+
meta_db
106+
.set_suspensions([(user_id, suspension)].into())
107+
.await?;
108+
}
109+
Action::SuspendDelete => {
110+
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
111+
let suspensions = meta_db.suspensions(&[user_id]).await?;
112+
let mut suspension = suspensions[&user_id];
113+
suspension.delete_suspended = true;
114+
meta_db
115+
.set_suspensions([(user_id, suspension)].into())
116+
.await?;
117+
}
118+
Action::SuspendQuery => {
119+
// FIXME: i'm so god damn sure i can just do something like suspension_info->read = true;
120+
let suspensions = meta_db.suspensions(&[user_id]).await?;
121+
let mut suspension = suspensions[&user_id];
122+
suspension.query_suspended = true;
123+
meta_db
124+
.set_suspensions([(user_id, suspension)].into())
125+
.await?;
126+
}
127+
};
128+
129+
Ok(())
130+
}

0 commit comments

Comments
 (0)