Skip to content

Commit 14cd046

Browse files
authored
Merge pull request #64 from aaseenib/webhook
Webhook
2 parents 4fd554c + 66df9cb commit 14cd046

8 files changed

Lines changed: 830 additions & 81 deletions

File tree

packages/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ async-trait = "0.1"
5151

5252
# Metrics
5353
prometheus = "0.13"
54+
dashmap = "6"
5455

5556
[dev-dependencies]
5657
tokio = { version = "1", features = ["full", "test-util"] }

packages/core/src/alerts/mod.rs

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,248 @@
11
pub mod webhook;
2+
3+
use std::collections::HashSet;
4+
use std::sync::Arc;
5+
6+
use chrono::Utc;
7+
use tokio::sync::Mutex;
8+
9+
use crate::insights::{InsightsUpdate, SpikeSeverity};
10+
11+
use self::webhook::{AlertPayload, WebhookDelivery};
12+
13+
#[derive(Clone)]
14+
pub struct AlertManager {
15+
webhook_delivery: Option<WebhookDelivery>,
16+
alert_threshold: SpikeSeverity,
17+
network: String,
18+
seen_spikes: Arc<Mutex<HashSet<String>>>,
19+
}
20+
21+
impl AlertManager {
22+
pub fn new(
23+
webhook_url: Option<String>,
24+
alert_threshold: SpikeSeverity,
25+
network: String,
26+
) -> Self {
27+
let webhook_delivery = webhook_url.map(WebhookDelivery::new);
28+
Self {
29+
webhook_delivery,
30+
alert_threshold,
31+
network,
32+
seen_spikes: Arc::new(Mutex::new(HashSet::new())),
33+
}
34+
}
35+
36+
pub async fn check_and_dispatch(&self, update: &InsightsUpdate) {
37+
let Some(delivery) = self.webhook_delivery.clone() else {
38+
return;
39+
};
40+
41+
for spike in &update.insights.congestion_trends.recent_spikes {
42+
if !meets_threshold(&spike.severity, &self.alert_threshold) {
43+
continue;
44+
}
45+
46+
let spike_id = format!(
47+
"{}:{}:{}",
48+
severity_to_str(&spike.severity),
49+
spike.start_time.timestamp(),
50+
spike.peak_fee
51+
);
52+
let should_dispatch = {
53+
let mut seen = self.seen_spikes.lock().await;
54+
seen.insert(spike_id)
55+
};
56+
57+
if !should_dispatch {
58+
continue;
59+
}
60+
61+
let payload = AlertPayload {
62+
event: "fee_spike_detected".to_string(),
63+
severity: severity_to_str(&spike.severity).to_string(),
64+
peak_fee: spike.peak_fee,
65+
baseline_fee: spike.baseline_fee,
66+
spike_ratio: spike.spike_ratio,
67+
start_time: spike.start_time.clone(),
68+
duration_seconds: spike.duration.num_seconds().max(0),
69+
network: self.network.clone(),
70+
timestamp: Utc::now(),
71+
};
72+
73+
tokio::spawn(async move {
74+
if let Err(err) = delivery.send_with_retry(&payload).await {
75+
tracing::error!("Webhook dispatch failed: {}", err);
76+
}
77+
});
78+
}
79+
}
80+
}
81+
82+
fn severity_rank(severity: &SpikeSeverity) -> u8 {
83+
match severity {
84+
SpikeSeverity::Minor => 0,
85+
SpikeSeverity::Moderate => 1,
86+
SpikeSeverity::Major => 2,
87+
SpikeSeverity::Critical => 3,
88+
}
89+
}
90+
91+
fn meets_threshold(severity: &SpikeSeverity, threshold: &SpikeSeverity) -> bool {
92+
severity_rank(severity) >= severity_rank(threshold)
93+
}
94+
95+
fn severity_to_str(severity: &SpikeSeverity) -> &'static str {
96+
match severity {
97+
SpikeSeverity::Minor => "Minor",
98+
SpikeSeverity::Moderate => "Moderate",
99+
SpikeSeverity::Major => "Major",
100+
SpikeSeverity::Critical => "Critical",
101+
}
102+
}
103+
104+
#[cfg(test)]
105+
mod tests {
106+
use super::*;
107+
use chrono::{DateTime, Duration, Utc};
108+
use wiremock::{
109+
Mock, MockServer, ResponseTemplate,
110+
matchers::{method, path},
111+
};
112+
113+
use crate::insights::{
114+
AverageResult, CongestionTrends, CurrentInsights, DataQuality, FeeSpike, RollingAverages,
115+
SpikeSeverity, TimeWindow, TrendIndicator, TrendStrength,
116+
};
117+
118+
fn build_update_with_spike(severity: SpikeSeverity) -> InsightsUpdate {
119+
let now = DateTime::parse_from_rfc3339("2025-01-14T10:47:00Z")
120+
.unwrap()
121+
.with_timezone(&Utc);
122+
let spike = FeeSpike {
123+
peak_fee: 5000,
124+
baseline_fee: 130.5,
125+
spike_ratio: 38.3,
126+
start_time: DateTime::parse_from_rfc3339("2025-01-14T10:45:00Z")
127+
.unwrap()
128+
.with_timezone(&Utc),
129+
duration: Duration::seconds(120),
130+
severity,
131+
};
132+
let window = TimeWindow {
133+
name: "1h".to_string(),
134+
duration: Duration::hours(1),
135+
min_samples: 1,
136+
};
137+
let avg = AverageResult {
138+
value: 130.5,
139+
sample_count: 10,
140+
is_partial: false,
141+
calculated_at: now,
142+
time_window: window.clone(),
143+
};
144+
145+
InsightsUpdate {
146+
insights: CurrentInsights {
147+
rolling_averages: RollingAverages {
148+
short_term: avg.clone(),
149+
medium_term: avg.clone(),
150+
long_term: avg,
151+
},
152+
extremes: crate::insights::FeeExtremes {
153+
current_min: crate::insights::ExtremeValue {
154+
value: 100,
155+
timestamp: now,
156+
transaction_hash: "min".to_string(),
157+
},
158+
current_max: crate::insights::ExtremeValue {
159+
value: 5000,
160+
timestamp: now,
161+
transaction_hash: "max".to_string(),
162+
},
163+
period_start: now - Duration::hours(1),
164+
period_end: now,
165+
},
166+
congestion_trends: CongestionTrends {
167+
current_trend: TrendIndicator::Rising,
168+
recent_spikes: vec![spike],
169+
trend_strength: TrendStrength::Strong,
170+
predicted_duration: None,
171+
},
172+
last_updated: now,
173+
data_quality: DataQuality {
174+
completeness: 1.0,
175+
freshness: Duration::seconds(5),
176+
has_gaps: false,
177+
last_gap: None,
178+
},
179+
},
180+
processing_time: Duration::milliseconds(1),
181+
data_points_processed: 1,
182+
}
183+
}
184+
185+
#[tokio::test]
186+
async fn spike_above_threshold_dispatches_webhook() {
187+
let server = MockServer::start().await;
188+
Mock::given(method("POST"))
189+
.and(path("/hook"))
190+
.respond_with(ResponseTemplate::new(200))
191+
.expect(1)
192+
.mount(&server)
193+
.await;
194+
195+
let manager = AlertManager::new(
196+
Some(format!("{}/hook", server.uri())),
197+
SpikeSeverity::Major,
198+
"mainnet".to_string(),
199+
);
200+
let update = build_update_with_spike(SpikeSeverity::Critical);
201+
202+
manager.check_and_dispatch(&update).await;
203+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
204+
}
205+
206+
#[tokio::test]
207+
async fn spike_below_threshold_is_not_dispatched() {
208+
let server = MockServer::start().await;
209+
Mock::given(method("POST"))
210+
.and(path("/hook"))
211+
.respond_with(ResponseTemplate::new(200))
212+
.expect(0)
213+
.mount(&server)
214+
.await;
215+
216+
let manager = AlertManager::new(
217+
Some(format!("{}/hook", server.uri())),
218+
SpikeSeverity::Critical,
219+
"mainnet".to_string(),
220+
);
221+
let update = build_update_with_spike(SpikeSeverity::Major);
222+
223+
manager.check_and_dispatch(&update).await;
224+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
225+
}
226+
227+
#[tokio::test]
228+
async fn same_spike_is_dispatched_once() {
229+
let server = MockServer::start().await;
230+
Mock::given(method("POST"))
231+
.and(path("/hook"))
232+
.respond_with(ResponseTemplate::new(200))
233+
.expect(1)
234+
.mount(&server)
235+
.await;
236+
237+
let manager = AlertManager::new(
238+
Some(format!("{}/hook", server.uri())),
239+
SpikeSeverity::Major,
240+
"mainnet".to_string(),
241+
);
242+
let update = build_update_with_spike(SpikeSeverity::Major);
243+
244+
manager.check_and_dispatch(&update).await;
245+
manager.check_and_dispatch(&update).await;
246+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
247+
}
248+
}

0 commit comments

Comments
 (0)