-
Notifications
You must be signed in to change notification settings - Fork 20
Seed DynamoDB Local #197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Seed DynamoDB Local #197
Changes from all commits
78797fd
cfb3e4b
377a8a8
2d10509
9d15443
529d83d
d8b0284
1501e77
ea6582a
7651394
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,7 @@ use axum::Router; | |
| use http::StatusCode; | ||
| use http_body_util::BodyExt; | ||
| use lambda_http::Error; | ||
| use std::time::SystemTime; | ||
| use std::time::{Duration, SystemTime}; | ||
| use std::{ | ||
| collections::HashMap, | ||
| future::Future, | ||
|
|
@@ -19,6 +19,12 @@ use tower_service::Service; | |
| use tracing_subscriber::EnvFilter; | ||
| use ulid::Ulid; | ||
|
|
||
| const QUESTIONS_EXPIRE_AFTER_DAYS: u64 = 30; | ||
| const QUESTIONS_TTL: Duration = Duration::from_secs(QUESTIONS_EXPIRE_AFTER_DAYS * 24 * 60 * 60); | ||
|
|
||
| const EVENTS_EXPIRE_AFTER_DAYS: u64 = 60; | ||
| const EVENTS_TTL: Duration = Duration::from_secs(EVENTS_EXPIRE_AFTER_DAYS * 24 * 60 * 60); | ||
|
|
||
| #[allow(unused_imports)] | ||
| use tracing::{debug, error, info, trace, warn}; | ||
|
|
||
|
|
@@ -33,7 +39,6 @@ enum Backend { | |
| } | ||
|
|
||
| impl Backend { | ||
| #[cfg(test)] | ||
| async fn local() -> Self { | ||
| Backend::Local(Arc::new(Mutex::new(Local::default()))) | ||
| } | ||
|
|
@@ -161,83 +166,172 @@ fn mint_service_error<E>(e: E) -> SdkError<E> { | |
| ) | ||
| } | ||
|
|
||
| /// Seed the database. | ||
| /// | ||
| /// This will register a test event (with id `00000000000000000000000000`) and | ||
| /// a number of questions for it in the database, whether it's an in-memory [`Local`] | ||
| /// database or a local instance of DynamoDB. Note that in the latter case | ||
| /// we are checking if the test event is already there, and - if so - we are _not_ seeding | ||
| /// the questions. This is to avoid creating duplicated questions when re-running the app. | ||
| /// And this is not an issue of course when running against our in-memory [`Local`] database. | ||
| /// | ||
| /// The returned vector contains IDs of the questions related to the test event. | ||
| #[cfg(debug_assertions)] | ||
| async fn seed(backend: &mut Backend) -> Vec<Ulid> { | ||
| #[derive(serde::Deserialize)] | ||
| struct LiveAskQuestion { | ||
| likes: usize, | ||
| text: String, | ||
| hidden: bool, | ||
| answered: bool, | ||
| #[serde(rename = "createTimeUnix")] | ||
| created: usize, | ||
| } | ||
|
|
||
| let seed: Vec<LiveAskQuestion> = serde_json::from_str(SEED).unwrap(); | ||
| let seed_e = Ulid::from_string("00000000000000000000000000").unwrap(); | ||
| let seed_e_secret = "secret"; | ||
|
|
||
| info!("going to seed test event"); | ||
| match backend.event(&seed_e).await.unwrap() { | ||
| output if output.item().is_some() => { | ||
| warn!("test event is already there, skipping seeding questions"); | ||
| } | ||
| _ => { | ||
| backend.new(&seed_e, seed_e_secret).await.unwrap(); | ||
| info!("successfully registered test event, going to seed questions now"); | ||
| // first create questions ... | ||
| let mut qs = Vec::new(); | ||
| for q in seed { | ||
| let qid = ulid::Ulid::new(); | ||
| backend | ||
| .ask( | ||
| &seed_e, | ||
| &qid, | ||
| ask::Question { | ||
| body: q.text, | ||
| asker: None, | ||
| }, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
| qs.push((qid, q.created, q.likes, q.hidden, q.answered)); | ||
| } | ||
| // ... then set the vote count + answered/hidden flags | ||
| match backend { | ||
| Backend::Dynamo(ref mut client) => { | ||
| use aws_sdk_dynamodb::types::BatchStatementRequest; | ||
| // DynamoDB supports batch operations using PartiQL syntax with `25` as max batch size | ||
| // https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchExecuteStatement.html | ||
| for chunk in qs.chunks(25) { | ||
| let batch_update = chunk | ||
| .iter() | ||
| .map(|(qid, created, votes, hidden, answered)| { | ||
| let builder = BatchStatementRequest::builder(); | ||
| let builder = if *answered { | ||
| builder.statement( | ||
| // numerous words are reserved in the DynamoDB engine (e.g. Key, Id, When) and | ||
| // should be qouted; we are quoting all of our attrs to avoid possible collisions | ||
| r#"UPDATE "questions" SET "answered"=? SET "votes"=? SET "when"=? SET "hidden"=? WHERE "id"=?"#, | ||
| ) | ||
| .parameters(to_dynamo_timestamp(SystemTime::now())) // answered | ||
| } else { | ||
| builder.statement( | ||
| r#"UPDATE "questions" SET "votes"=? SET "when"=? SET "hidden"=? WHERE "id"=?"#, | ||
| ) | ||
| }; | ||
| builder | ||
| .parameters(AttributeValue::N(votes.to_string())) // votes | ||
| .parameters(AttributeValue::N(created.to_string())) // when | ||
| .parameters(AttributeValue::Bool(*hidden)) // hidden | ||
| .parameters(AttributeValue::S(qid.to_string())) // id | ||
| .build() | ||
| .unwrap() | ||
| }) | ||
| .collect::<Vec<_>>(); | ||
| client | ||
| .batch_execute_statement() | ||
| .set_statements(Some(batch_update)) | ||
| .send() | ||
| .await | ||
| .expect("batch to have been written ok"); | ||
| } | ||
| } | ||
| Backend::Local(ref mut state) => { | ||
| let state = Arc::get_mut(state).unwrap(); | ||
| let state = Mutex::get_mut(state).unwrap(); | ||
| for (qid, created, votes, hidden, answered) in qs { | ||
| let q = state.questions.get_mut(&qid).unwrap(); | ||
| q.insert("votes", AttributeValue::N(votes.to_string())); | ||
| if answered { | ||
| q.insert("answered", to_dynamo_timestamp(SystemTime::now())); | ||
| } | ||
| q.insert("hidden", AttributeValue::Bool(hidden)); | ||
| q.insert("when", AttributeValue::N(created.to_string())); | ||
| } | ||
| } | ||
| } | ||
| info!("successfully registered questions"); | ||
| } | ||
| } | ||
| // let's collect ids of the questions related to the test event, | ||
| // we can then use them to auto-generate user votes over time | ||
| backend | ||
| .list(&seed_e, true) | ||
| .await | ||
| .expect("scenned index ok") | ||
| .items() | ||
| .iter() | ||
| .filter_map(|item| { | ||
| let id = item | ||
| .get("id") | ||
| .expect("id is in projection") | ||
| .as_s() | ||
| .expect("id is of type string"); | ||
| ulid::Ulid::from_string(id).ok() | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we just
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When testing, one can add some questions to dynamodb directly via the admin web UI. The ids of those manually added questions can be invalid ULIDs and so we are just skipping those questions, i.e. not going to vote for them in the vote-over-time task. That was the idea. We could add a comment on this, wdyt?
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we're probably better served by asserting here — the convenience of being able to use non-ULIDs is outweighed (I think) by the risk of us encoding the assuming of ULIDs elsewhere in the system that may break in more subtle ways |
||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> Result<(), Error> { | ||
| tracing_subscriber::fmt() | ||
| .with_env_filter(EnvFilter::from_default_env()) | ||
| // TODO: we may _not_ want `without_time` when deploying | ||
| // TODO: on non-Lambda runtimes; this can be addressed as | ||
| // TODO: part of https://github.com/jonhoo/wewerewondering/issues/202 | ||
| .without_time(/* cloudwatch does that */).init(); | ||
|
|
||
| #[cfg(not(debug_assertions))] | ||
| let backend = Backend::dynamo().await; | ||
|
|
||
| #[cfg(debug_assertions)] | ||
| let backend = if std::env::var_os("USE_DYNAMODB").is_some() { | ||
| Backend::dynamo().await | ||
| } else { | ||
| let backend = { | ||
| use rand::prelude::SliceRandom; | ||
| use serde::Deserialize; | ||
| use std::time::Duration; | ||
|
|
||
| #[cfg(debug_assertions)] | ||
| #[derive(Deserialize)] | ||
| struct LiveAskQuestion { | ||
| likes: usize, | ||
| text: String, | ||
| hidden: bool, | ||
| answered: bool, | ||
| #[serde(rename = "createTimeUnix")] | ||
| created: usize, | ||
| } | ||
|
|
||
| let mut state = Local::default(); | ||
| let seed: Vec<LiveAskQuestion> = serde_json::from_str(SEED).unwrap(); | ||
| let seed_e = "00000000000000000000000000"; | ||
| let seed_e = Ulid::from_string(seed_e).unwrap(); | ||
| state.events.insert(seed_e, String::from("secret")); | ||
| state.questions_by_eid.insert(seed_e, Vec::new()); | ||
| let mut state = Backend::Local(Arc::new(Mutex::new(state))); | ||
| let mut qs = Vec::new(); | ||
| for q in seed { | ||
| let qid = ulid::Ulid::new(); | ||
| state | ||
| .ask( | ||
|
jonhoo marked this conversation as resolved.
|
||
| &seed_e, | ||
| &qid, | ||
| ask::Question { | ||
| body: q.text, | ||
| asker: None, | ||
| }, | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
| qs.push((qid, q.created, q.likes, q.hidden, q.answered)); | ||
| } | ||
| let mut qids = Vec::new(); | ||
| { | ||
| let Backend::Local(ref mut state): Backend = state else { | ||
| unreachable!(); | ||
| }; | ||
| let state = Arc::get_mut(state).unwrap(); | ||
| let state = Mutex::get_mut(state).unwrap(); | ||
| for (qid, created, votes, hidden, answered) in qs { | ||
| let q = state.questions.get_mut(&qid).unwrap(); | ||
| q.insert("votes", AttributeValue::N(votes.to_string())); | ||
| if answered { | ||
| q.insert("answered", to_dynamo_timestamp(SystemTime::now())); | ||
| } | ||
| q.insert("hidden", AttributeValue::Bool(hidden)); | ||
| q.insert("when", AttributeValue::N(created.to_string())); | ||
| qids.push(qid); | ||
| } | ||
| } | ||
| let cheat = state.clone(); | ||
| let mut backend = if std::env::var_os("USE_DYNAMODB").is_some() { | ||
| Backend::dynamo().await | ||
| } else { | ||
| Backend::local().await | ||
| }; | ||
|
|
||
| // to aid in development, seed the backend with a test event and related | ||
| // questions, and auto-generate user votes over time | ||
| let qids = seed(&mut backend).await; | ||
| let cheat = backend.clone(); | ||
| tokio::spawn(async move { | ||
| let mut interval = tokio::time::interval(Duration::from_secs(1)); | ||
| interval.tick().await; | ||
| loop { | ||
| tokio::time::sleep(Duration::from_secs(1)).await; | ||
| let qid = qids.choose(&mut rand::thread_rng()).unwrap(); | ||
| interval.tick().await; | ||
| let qid = qids | ||
| .choose(&mut rand::thread_rng()) | ||
| .expect("there _are_ some questions for our test event"); | ||
| let _ = cheat.vote(qid, vote::UpDown::Up).await; | ||
| } | ||
| }); | ||
| state | ||
|
|
||
| backend | ||
| }; | ||
|
|
||
| let app = Router::new() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.