Skip to content

Commit 890288f

Browse files
committed
Add channel and cache health check endpoints; update state initialization
1 parent 6b61bb7 commit 890288f

4 files changed

Lines changed: 63 additions & 6 deletions

File tree

src/main.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@ mod db;
1313

1414
#[actix_web::main]
1515
async fn main() -> std::io::Result<()> {
16-
let (pg_pool, in_mem_cache) = state::init().await;
16+
let (pg_pool, in_mem_cache, tx) = state::init().await;
1717

1818
// Start the Actix web server
1919
HttpServer::new(move || {
2020
App::new()
2121
.app_data(pg_pool.clone())
2222
.app_data(in_mem_cache.clone())
23+
.app_data(tx.clone())
2324
.wrap(Cors::default()
2425
.allow_any_origin()
2526
.allowed_methods(vec!["GET", "POST"])
@@ -30,6 +31,8 @@ async fn main() -> std::io::Result<()> {
3031
actix_scope("/health")
3132
.service(health::api_health_check)
3233
.service(health::db_health_check)
34+
.service(health::cache_health_check)
35+
.service(health::channel_health_check)
3336
)
3437
.service(
3538
actix_scope("/sample_db")

src/routes/health.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::db::pgsql_handlers::health_check as health_check_pgsql;
2-
use actix_web::{get, web, HttpResponse, Responder};
2+
use actix_web::{get, post, web, HttpResponse, Responder};
3+
use crate::types::{AppCache, make_key};
34
use deadpool_postgres::Pool as PgPool;
5+
use std::sync::mpsc::Sender;
46

57

68
// Health check endpoint
@@ -15,6 +17,33 @@ async fn api_health_check() -> impl Responder {
1517
async fn db_health_check(state: web::Data<PgPool>) -> impl Responder {
1618
match health_check_pgsql(&state).await {
1719
Ok(_) => HttpResponse::Ok().body("Database is running!"),
18-
Err(err) => HttpResponse::InternalServerError().json(format!("Failed: {}", err)),
20+
Err(err) => HttpResponse::PreconditionFailed().json(format!("Failed: {}", err)),
1921
}
2022
}
23+
24+
25+
// Cache health check
26+
#[get("/cache")]
27+
async fn cache_health_check(cache: web::Data<AppCache>) -> impl Responder {
28+
const CACHE_KEY: &str = "health_check";
29+
const CACHE_VALUE: &str = "Cache is running!";
30+
let key = make_key(CACHE_KEY);
31+
32+
cache.insert(key, CACHE_VALUE.to_string()).await;
33+
34+
if let Some(cached_value) = cache.get(&make_key(CACHE_KEY)).await {
35+
if cached_value == CACHE_VALUE {
36+
return HttpResponse::Ok().body(cached_value);
37+
}
38+
}
39+
HttpResponse::PreconditionFailed().body("Cache health check failed!")
40+
}
41+
42+
43+
// Channel Health check
44+
#[post("/channel")]
45+
async fn channel_health_check(state: web::Data<Sender<u8>>) -> impl Responder {
46+
// Send a 7 int to the channel
47+
let _ = state.send(7);
48+
HttpResponse::Ok().body("Channel health check initiated!")
49+
}

src/state.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ use deadpool_postgres::{Manager, RecyclingMethod, Pool as PgPool};
22
use deadpool::{managed::Timeouts, Runtime};
33
use actix_web::web::Data as webData;
44
use tokio_postgres::{Config, NoTls};
5+
use crate::types::process_channel;
56
use std::env::var as env_var;
7+
use std::sync::mpsc::Sender;
68
use super::types::AppCache;
79
use std::time::Duration;
810
use log::{info, warn};
9-
use actix_web::web;
1011

1112

1213
struct PgSettings {
@@ -215,7 +216,7 @@ fn init_cache(cache_settings: &MokaSettings) -> AppCache {
215216
}
216217

217218

218-
pub async fn init() -> (webData<PgPool>, web::Data<AppCache>) {
219+
pub async fn init() -> (webData<PgPool>, webData<AppCache>, webData<Sender<u8>>) {
219220
// Preparing to start the server by collecting environment variables
220221
let app_settings: AppSettings = AppSettings::from_env();
221222

@@ -233,6 +234,10 @@ pub async fn init() -> (webData<PgPool>, web::Data<AppCache>) {
233234
// Initialize the in-memory cache (Moka)
234235
let in_mem_cache = init_cache(&app_settings.cache_settings);
235236

237+
// Initialize the channel
238+
let (tx, rx) = std::sync::mpsc::channel::<u8>();
239+
process_channel(rx);
240+
236241
// Wrap the state of the application and share it
237-
(webData::new(postgres_state), webData::new(in_mem_cache))
242+
(webData::new(postgres_state), webData::new(in_mem_cache), webData::new(tx))
238243
}

src/types.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::mpsc::Receiver;
12
use moka::future::Cache;
23
use std::sync::Arc;
34

@@ -31,3 +32,22 @@ pub async fn cache_data<T, S>(key: S, value: &T, cache_conn: &AppCache)
3132
// Insert the JSON string into the cache
3233
cache_conn.insert(make_key(key), json).await;
3334
}
35+
36+
37+
/// Process the channel
38+
pub fn process_channel(rx: Receiver<u8>) {
39+
std::thread::spawn(move || {
40+
// Keep reading
41+
loop {
42+
match rx.recv() {
43+
Ok(val) => {
44+
log::info!("Received: {}", val);
45+
}
46+
Err(_) => {
47+
log::warn!("Channel closed");
48+
break;
49+
}
50+
}
51+
}
52+
});
53+
}

0 commit comments

Comments
 (0)