Skip to content

Commit 4a52825

Browse files
committed
check and cache revoked tokens
we should eventually store this in sqlite or something, but the list is tiny so in mem is fine
1 parent 9514b8a commit 4a52825

9 files changed

+97
-23
lines changed

.envrc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
# this is *not* the prod public key, obviously lol
1+
export BIG_CENTRAL_URL="http://localhost:4000";
22
use flake;

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ tracing = { version = "0.1" }
2727
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
2828
opentelemetry-otlp = { version = "0.15.0", features = ["http-proto", "reqwest", "reqwest-client", "reqwest-rustls", "trace", "tokio"] }
2929
opentelemetry = "0.22"
30-
reqwest = "0.11"
30+
reqwest = { version = "0.11", features = ["json"] }
3131
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio", "trace"] }
3232
tracing-opentelemetry = "0.23"
3333

@@ -37,7 +37,8 @@ rcgen = "0.12"
3737
warp = "0.3"
3838
serde = "1"
3939
serde_json = { version = "1", features = ["raw_value"] }
40-
bytes = "1.6"
40+
bytes = "1"
41+
hex = "0.4"
4142

4243
[features]
4344
s3 = ["rust-s3"]

src/action.rs

+5-15
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
use anyhow::anyhow;
22
use bfsp::internal::ActionInfo;
3-
use std::collections::HashSet;
43
use std::sync::Arc;
54
use std::time::Duration;
6-
use tracing::error_span;
75
use tracing::{error, Level};
86

97
use rand::Rng;
108

11-
use crate::{chunk_db::ChunkDB, meta_db::MetaDB};
9+
use crate::meta_db::MetaDB;
1210

1311
#[derive(Debug)]
1412
enum Action {
@@ -35,10 +33,7 @@ impl TryFrom<String> for Action {
3533
}
3634
}
3735

38-
pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
39-
meta_db: Arc<M>,
40-
chunk_db: Arc<C>,
41-
) {
36+
pub async fn check_run_actions_loop<M: MetaDB + 'static>(meta_db: Arc<M>) {
4237
loop {
4338
tracing::span!(Level::INFO, "run_current_actions");
4439

@@ -49,10 +44,9 @@ pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
4944
Ok(actions) => {
5045
for action_info in actions.into_iter() {
5146
let meta_db = Arc::clone(&meta_db);
52-
let chunk_db = Arc::clone(&chunk_db);
5347

5448
tokio::task::spawn(async move {
55-
match run_action(Arc::clone(&meta_db), chunk_db, &action_info).await {
49+
match run_action(Arc::clone(&meta_db), &action_info).await {
5650
Ok(_) => {
5751
let _ = meta_db.executed_action(action_info.id.unwrap()).await;
5852
}
@@ -74,12 +68,8 @@ pub async fn check_run_actions_loop<M: MetaDB + 'static, C: ChunkDB + 'static>(
7468
}
7569
}
7670

77-
#[tracing::instrument(err, skip(meta_db, chunk_db))]
78-
async fn run_action<M: MetaDB, C: ChunkDB>(
79-
meta_db: Arc<M>,
80-
chunk_db: Arc<C>,
81-
action_info: &ActionInfo,
82-
) -> anyhow::Result<()> {
71+
#[tracing::instrument(err, skip(meta_db))]
72+
async fn run_action<M: MetaDB>(meta_db: Arc<M>, action_info: &ActionInfo) -> anyhow::Result<()> {
8373
let action: Action = action_info.action.clone().try_into()?;
8474
let user_id = action_info.user_id;
8575

src/auth.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use biscuit_auth::{
1010
};
1111
use tracing::{event, Level};
1212

13-
use crate::meta_db::MetaDB;
13+
use crate::{meta_db::MetaDB, tokens::check_token_revoked};
1414

1515
#[derive(Debug)]
1616
pub enum Right {
@@ -20,6 +20,7 @@ pub enum Right {
2020
Delete,
2121
Usage,
2222
Payment,
23+
Settings,
2324
}
2425

2526
impl Right {
@@ -31,6 +32,7 @@ impl Right {
3132
Right::Delete => "delete",
3233
Right::Usage => "usage",
3334
Right::Payment => "payment",
35+
Right::Settings => "settings",
3436
}
3537
}
3638
}
@@ -42,6 +44,10 @@ pub async fn authorize<M: MetaDB>(
4244
file_ids: Vec<String>,
4345
meta_db: &M,
4446
) -> anyhow::Result<i64> {
47+
if check_token_revoked(token).await {
48+
return Err(anyhow!("token is revoked"));
49+
}
50+
4551
let user_id = get_user_id(token)?;
4652

4753
// first, check if the user has been suspended from the right they're trying to execute

src/io.rs

Whitespace-only changes.

src/main.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod chunk_db;
44
mod internal;
55
mod meta_db;
66
mod tls;
7+
mod tokens;
78

89
use action::check_run_actions_loop;
910
use anyhow::anyhow;
@@ -32,6 +33,7 @@ use std::{
3233
collections::{HashMap, HashSet},
3334
sync::Arc,
3435
};
36+
use tokens::refresh_revoked_tokens;
3537
use tokio::{fs, io};
3638
use tracing::{event, Level};
3739
use tracing_opentelemetry::PreSampledTracer;
@@ -133,12 +135,11 @@ async fn main() -> Result<()> {
133135
let chunk_db_clone = Arc::clone(&chunk_db);
134136
let meta_db_clone = Arc::clone(&meta_db);
135137

138+
tokio::task::spawn(async move { refresh_revoked_tokens().await });
136139
tokio::task::spawn(async move { chunk_db_clone.garbage_collect(meta_db_clone).await });
137140

138-
let chunk_db_clone = Arc::clone(&chunk_db);
139141
let meta_db_clone = Arc::clone(&meta_db);
140-
141-
tokio::task::spawn(async move { check_run_actions_loop(meta_db_clone, chunk_db_clone).await });
142+
tokio::task::spawn(async move { check_run_actions_loop(meta_db_clone).await });
142143

143144
let internal_tcp_addr = "[::]:9990".to_socket_addrs().unwrap().next().unwrap();
144145

@@ -492,6 +493,7 @@ pub async fn handle_message<M: MetaDB + 'static, C: ChunkDB + 'static>(
492493
.encode_to_vec(),
493494
Err(_) => todo!(),
494495
},
496+
_ => todo!(),
495497
}
496498
.prepend_len())
497499
}

src/meta_db.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use bfsp::{
1212
use serde::{Deserialize, Serialize};
1313
use sqlx::{
1414
types::{time::OffsetDateTime, Json},
15-
Execute, Executor, PgPool, QueryBuilder, Row,
15+
Executor, PgPool, QueryBuilder, Row,
1616
};
1717
use thiserror::Error;
1818

src/tokens.rs

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::{collections::HashSet, env, sync::OnceLock, time::Duration};
2+
3+
use anyhow::anyhow;
4+
use biscuit_auth::Biscuit;
5+
use reqwest::StatusCode;
6+
use tokio::sync::RwLock;
7+
8+
pub type RevocationIdentiifer = Vec<u8>;
9+
10+
static REVOKED_TOKENS: OnceLock<RwLock<HashSet<RevocationIdentiifer>>> = OnceLock::new();
11+
12+
#[tracing::instrument]
13+
pub async fn refresh_revoked_tokens() {
14+
let mut token_num: u32 = 0;
15+
loop {
16+
match single_update_revoked_tokens(token_num, 100).await {
17+
Ok(tokens_inserted) => {
18+
// we don't want to get too far behind, we we should keep iterating up til we can't
19+
token_num += tokens_inserted;
20+
if tokens_inserted > 0 {
21+
continue;
22+
}
23+
}
24+
Err(err) => {
25+
tracing::error!(
26+
token_num = token_num,
27+
page_size = 100,
28+
"Error updating revoked tokens: {err}"
29+
);
30+
}
31+
}
32+
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
33+
}
34+
}
35+
36+
pub async fn check_token_revoked(token: &Biscuit) -> bool {
37+
for identifier in token.revocation_identifiers().iter() {
38+
let revoked_tokens = REVOKED_TOKENS.get_or_init(|| RwLock::new(HashSet::new()));
39+
if revoked_tokens.read().await.contains(identifier.as_slice()) {
40+
return true;
41+
}
42+
}
43+
44+
false
45+
}
46+
47+
#[tracing::instrument(err)]
48+
async fn single_update_revoked_tokens(token_num: u32, page_size: u32) -> anyhow::Result<u32> {
49+
let revoked_tokens = REVOKED_TOKENS.get_or_init(|| RwLock::new(HashSet::new()));
50+
let big_central_url = big_central_url();
51+
let resp = reqwest::get(format!(
52+
"{big_central_url}/api/v1/revoked_tokens?token_num={token_num}&page_size={page_size}"
53+
))
54+
.await?;
55+
56+
if resp.status() != StatusCode::OK {
57+
return Err(anyhow!("{}", resp.text().await?));
58+
}
59+
60+
let tokens: Vec<String> = resp.json().await?;
61+
let num_tokens: u32 = tokens.len().try_into()?;
62+
63+
let revoked_tokens = &mut revoked_tokens.write().await;
64+
for token in tokens.into_iter() {
65+
let token: RevocationIdentiifer = hex::decode(token)?;
66+
revoked_tokens.insert(token);
67+
}
68+
69+
Ok(num_tokens)
70+
}
71+
72+
fn big_central_url() -> String {
73+
env::var("BIG_CENTRAL_URL").unwrap_or_else(|_| "https://bbfs.io".to_string())
74+
}

0 commit comments

Comments
 (0)