diff --git a/Cargo.toml b/Cargo.toml index 4129e68..5f6a63e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,14 +5,17 @@ edition = "2024" [dependencies] -sqlx = { version = "0.8", features = ["runtime-tokio-native-tls", "postgres"] } +deadpool-postgres = { version = "0.14.1", features = ["serde"] } serde = { version = "1.0", features = ["derive"] } moka = { version = "0.12", features = ["future"] } uuid = { version = "1.0", features = ["v4"] } +tokio-postgres = "0.7.13" env_logger = "0.11.6" actix-web = "4.11.0" actix-cors = "0.7.1" +deadpool = "0.12.2" serde_json = "1.0" +tokio = "1.47.1" log = "0.4" diff --git a/src/db/pgsql_handlers.rs b/src/db/pgsql_handlers.rs index 7d1ee6d..6f85bc1 100644 --- a/src/db/pgsql_handlers.rs +++ b/src/db/pgsql_handlers.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; -use sqlx::postgres::PgPool; -use sqlx::Row; +use deadpool_postgres::{ + PoolError as PgError, + Pool as PgPool +}; #[derive(Deserialize, Serialize)] @@ -21,34 +23,33 @@ pub struct Record { // DB working state Check -pub async fn health_check(db_pool: &PgPool) -> Result<(), sqlx::Error> { +pub async fn health_check(db_pool: &PgPool) -> Result<(), PgError> { // Simple query to check if the database is responsive - sqlx::query("SELECT 1") - .fetch_one(db_pool) - .await - .map(|_| ()) // If successful, return Ok(()) + let client = db_pool.get().await?; + let _ = client.query("SELECT 1", &[]).await?; + Ok(()) } // Sample private function to create a new note -async fn create_single_note(db_pool: &PgPool, note: Note) -> Result { - let row = sqlx::query( - r#" - INSERT INTO notes (title, content) - VALUES ($1, $2) - RETURNING id - "# - ) - .bind(&note.title) - .bind(&note.content) - .fetch_one(db_pool) - .await?; - - Ok(row.get("id")) +async fn create_single_note(db_pool: &PgPool, note: Note) -> Result { + let client = db_pool.get().await?; + let result = client + .query( + r#" + INSERT INTO notes (title, content) + VALUES ($1, $2) + RETURNING id + "#, + &[&note.title, 
&note.content], ) .await?; Ok(result[0].get("id")) } + // Add few sample data in DB -pub async fn add_new_notes(db_pool: &PgPool, values: Vec<Note>) -> Result<(), sqlx::Error> { +pub async fn add_new_notes(db_pool: &PgPool, values: Vec<Note>) -> Result<(), PgError> { for note in values { // We can do like this to purely put the query in one function and call it in another function // We can even do some processing before calling the query (but all db related stuff should be in db module only) @@ -59,19 +60,25 @@ pub async fn add_new_notes(db_pool: &PgPool, values: Vec<Note>) -> Result<(), sq } // Fetch all notes from DB -pub async fn fetch_all_notes(db_pool: &PgPool) -> Result<Vec<Record>, sqlx::Error> { - let result = sqlx::query( - r#" - SELECT id, title, content FROM notes - "# - ) - .map(|row: sqlx::postgres::PgRow| Record { - id: row.get("id"), - title: row.get("title"), - content: row.get("content"), - }) - .fetch_all(db_pool) - .await; +pub async fn fetch_all_notes(db_pool: &PgPool) -> Result<Vec<Record>, PgError> { + let client = db_pool.get().await?; + let rows = client + .query( + r#" + SELECT id, title, content FROM notes + "#, + &[], + ) + .await?; + + let notes = rows + .iter() + .map(|row| Record { + id: row.get("id"), + title: row.get("title"), + content: row.get("content"), + }) + .collect(); - result + Ok(notes) } diff --git a/src/routes/health.rs b/src/routes/health.rs index ec0a9ba..14dd8de 100644 --- a/src/routes/health.rs +++ b/src/routes/health.rs @@ -1,6 +1,6 @@ -use crate::db::pgsql_handlers::health_check as check_db; +use crate::db::pgsql_handlers::health_check as health_check_pgsql; use actix_web::{get, web, HttpResponse, Responder}; -use sqlx::PgPool; +use deadpool_postgres::Pool as PgPool; // Health check endpoint @@ -13,7 +13,7 @@ async fn api_health_check() -> impl Responder { // Database health check #[get("/pgsql")] async fn db_health_check(state: web::Data<PgPool>) -> impl Responder { - match check_db(&state).await { + match health_check_pgsql(&state).await { Ok(_) => 
HttpResponse::Ok().body("Database is running!"), Err(err) => HttpResponse::InternalServerError().json(format!("Failed: {}", err)), } diff --git a/src/routes/sample_db.rs b/src/routes/sample_db.rs index 47e7399..f884f5b 100644 --- a/src/routes/sample_db.rs +++ b/src/routes/sample_db.rs @@ -1,7 +1,7 @@ use crate::db::pgsql_handlers::{Note, add_new_notes, fetch_all_notes}; use actix_web::{get, post, web, HttpResponse, Responder}; use crate::types::{AppCache, make_key}; -use sqlx::PgPool; +use deadpool_postgres::Pool as PgPool; #[post("/create-note")] diff --git a/src/state.rs b/src/state.rs index 9472183..9de646a 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,66 +1,237 @@ -use sqlx::postgres::{PgPool, PgPoolOptions}; +use deadpool_postgres::{Manager, RecyclingMethod, Pool as PgPool}; +use deadpool::{managed::Timeouts, Runtime}; use actix_web::web::Data as webData; +use tokio_postgres::{Config, NoTls}; use std::env::var as env_var; use super::types::AppCache; use std::time::Duration; +use log::{info, warn}; use actix_web::web; -use log::info; -async fn init_postgres() -> PgPool { - // Read the pool size from the environment variable - let max_pool_size: u32 = env_var("POSTGRES_DB_MAX_POOL_SIZE") - .unwrap_or("100".to_string()) // Default to 2 if not set - .parse() - .expect("POSTGRES_DB_MAX_POOL_SIZE must be a number"); +struct PgSettings { + url: String, + conn_timeout: u64, + max_pool_size: usize, + wait_timeout: u64, + new_connection_timeout: u64, + recycle_timeout: u64, + warm_pool: bool, + warm_pool_size: usize, +} + + +struct MokaSettings { + cache_size: u64, + expiration_time: Duration, +} + + +struct AppSettings { + pg_settings: PgSettings, + cache_settings: MokaSettings, + enable_logging: bool, +} + + +trait FromEnv { + fn from_env() -> Self; +} + + +impl FromEnv for PgSettings { + fn from_env() -> Self { + let url = env_var("POSTGRES_DB_URL").expect("POSTGRES_DB_URL must be set"); + let conn_timeout = env_var("POSTGRES_CONN_TIMEOUT") + .ok() + 
.and_then(|s| s.parse().ok()) + .expect("POSTGRES_CONN_TIMEOUT must be a positive integer of type u64"); + let max_pool_size = env_var("PG_POOL_MAX_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .expect("PG_POOL_MAX_SIZE must be a positive integer of type usize"); + let wait_timeout = env_var("PG_POOL_WAIT_TIMEOUT") + .ok() + .and_then(|s| s.parse().ok()) + .expect("PG_POOL_WAIT_TIMEOUT must be a positive integer of type u64"); + let new_connection_timeout = env_var("PG_POOL_NEW_CONNECTION_TIMEOUT") + .ok() + .and_then(|s| s.parse().ok()) + .expect("PG_POOL_NEW_CONNECTION_TIMEOUT must be a positive integer of type u64"); + let recycle_timeout = env_var("PG_POOL_RECYCLE_TIMEOUT") + .ok() + .and_then(|s| s.parse().ok()) + .expect("PG_POOL_RECYCLE_TIMEOUT must be a positive integer of type u64"); + let warm_pool = env_var("PG_POOL_WARM_POOL").expect("PG_POOL_WARM_POOL must be set as true or false"); + let warm_pool = match warm_pool.to_lowercase().as_str() { + "true" => true, + "false" => false, + _ => panic!("PG_POOL_WARM_POOL must be set as true or false"), + }; + let warm_pool_size = env_var("PG_POOL_WARM_POOL_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .expect("PG_POOL_WARM_POOL_SIZE must be a positive integer of type usize"); + + // Warm pool size can not go above 128 (if warm pool is enabled) + if warm_pool_size > max_pool_size { + panic!("PG_POOL_WARM_POOL_SIZE must be at most PG_POOL_MAX_SIZE, it can not go more than {}", max_pool_size); + } + if warm_pool && warm_pool_size > 128 { + panic!("PG_POOL_WARM_POOL_SIZE must be at most 128, and the optimal size is 64"); + } + + PgSettings { + url, + conn_timeout, + max_pool_size, + wait_timeout, + new_connection_timeout, + recycle_timeout, + warm_pool, + warm_pool_size, + } + } +} + + +impl FromEnv for MokaSettings { + fn from_env() -> Self { + let cache_size = env_var("CACHE_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .expect("CACHE_SIZE must be a positive integer of type u64"); + let expiration_time = 
env_var("CACHE_EXPIRATION_TIME") + .ok() + .and_then(|s| s.parse().ok()) + .expect("CACHE_EXPIRATION_TIME must be a positive integer of type u64"); - // Create the pool using PgPoolOptions and set the max pool size - let db_url = env_var("POSTGRES_DB_URL").expect("POSTGRES_DB_URL must be set"); + MokaSettings { + cache_size, + expiration_time: Duration::from_secs(expiration_time), + } + } +} - let db_pool = PgPoolOptions::new() - .max_connections(max_pool_size) // Set the pool size here - .connect(&db_url) - .await - .expect("Failed to connect to the database"); - info!("Successfully connected to the database"); +impl FromEnv for AppSettings { + fn from_env() -> Self { + let enable_logging = env_var("ENABLE_LOGGING").expect("ENABLE_LOGGING must be set as true or false"); + let enable_logging = match enable_logging.to_lowercase().as_str() { + "true" => true, + "false" => false, + _ => panic!("ENABLE_LOGGING must be set as true or false"), + }; - db_pool + AppSettings { + pg_settings: PgSettings::from_env(), + cache_settings: MokaSettings::from_env(), + enable_logging, + } + } } -fn init_cache() -> AppCache { - // Get the max capacity from the environment variable - let max_capacity: u64 = env_var("CACHE_MAX_CAPACITY") - .unwrap_or("100_000".to_string()) - .parse() - .expect("CACHE_MAX_CAPACITY must be a number"); +async fn warm_pool(pool: &PgPool, pg: &PgSettings) { + // Warm pool to avoid first-hit latency + if !pg.warm_pool { + // Return early if warm pool is not enabled + return; + } + + let warm_n = pg.max_pool_size.min(pg.warm_pool_size); + let mut ok = 0; + + for _ in 0..warm_n { + match pool.get().await { + Ok(client) => { + let _ = client.simple_query("SELECT 1").await; + ok += 1; + } + Err(_) => { + warn!("Pool warm-up: failed to get a connection"); + } + } + } + + // Log the warm-up results + if ok == 0 { + warn!("Pool warm-up failed, all attempts to get a connection were unsuccessful: {warm_n}"); + } else { + info!("Pool warm-up: {ok} conns warmed up out 
of {warm_n}. Success rate: {:.2}%", ok as f64 / warm_n as f64 * 100.0); + } +} + + +fn build_pg_config(settings: &PgSettings) -> Config { + // Initialize the Postgres configuration + let mut cfg: Config = settings.url.parse::<Config>().expect("invalid POSTGRES_DB_URL"); + cfg.application_name("rust-api"); + cfg.connect_timeout(Duration::from_secs(settings.conn_timeout)); + + cfg +} - // Max Cache TTL, get from the environment variable - let time_to_live: u64 = env_var("CACHE_TIME_TO_LIVE") - .unwrap_or("300".to_string()) - .parse() - .expect("CACHE_TIME_TO_LIVE must be a number") +fn init_pg_pool(pg_settings: &PgSettings) -> PgPool { + // Get the Postgres base configuration + let cfg: Config = build_pg_config(pg_settings); + let mgr = Manager::from_config( + cfg, + NoTls, + deadpool_postgres::ManagerConfig { + recycling_method: RecyclingMethod::Fast, + }, + ); + + let pool = PgPool::builder(mgr) + .max_size(pg_settings.max_pool_size) + .runtime(Runtime::Tokio1) + .timeouts(Timeouts { + // how long to wait for an idle connection from the pool + wait: Some(Duration::from_secs(pg_settings.wait_timeout)), + // how long to spend creating a new connection (if pool can grow) + create: Some(Duration::from_secs(pg_settings.new_connection_timeout)), + // how long to spend recycling/validating a connection + recycle: Some(Duration::from_secs(pg_settings.recycle_timeout)), + }) + .build() + .expect("failed to build pg pool"); + + info!("Postgres pool initialized (max_pool_size={})", pg_settings.max_pool_size); + pool +} + + +fn init_cache(cache_settings: &MokaSettings) -> AppCache { // Build the AppCache let cache: AppCache = AppCache::builder() - .max_capacity(max_capacity) - .time_to_live(Duration::from_secs(time_to_live)) + .max_capacity(cache_settings.cache_size) + .time_to_live(cache_settings.expiration_time) .build(); + info!("In-memory cache initialized (max_capacity={})", cache_settings.cache_size); cache } + pub async fn init() -> (webData<PgPool>, web::Data<AppCache>) { - info!("Starting the 
server by initializing the logger and the in-memory cache"); - // Initializers for the logger and the database - env_logger::init(); // Initialize the logger to log all the logs + // Preparing to start the server by collecting environment variables + let app_settings: AppSettings = AppSettings::from_env(); + + if app_settings.enable_logging { + let _ = env_logger::try_init(); // Initialize the logger to log all the logs + info!("Starting the server by initializing the application state"); + } // Initialize the Postgres client - let postgres_state = init_postgres().await; + let postgres_state = init_pg_pool(&app_settings.pg_settings); + + // Warm up the connection pool if enabled + warm_pool(&postgres_state, &app_settings.pg_settings).await; // Initialize the in-memory cache (Moka) - let in_mem_cache = init_cache(); + let in_mem_cache = init_cache(&app_settings.cache_settings); // Wrap the state of the application and share it (webData::new(postgres_state), webData::new(in_mem_cache)) diff --git a/src/types.rs b/src/types.rs index 832ad9c..e915c98 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,8 +1,10 @@ use moka::future::Cache; use std::sync::Arc; -pub type Key = Arc<String>; -pub type Value = String; + +// Cache key and value types +type Key = Arc<String>; +type Value = String; pub type AppCache = Cache<Key, Value>;