Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BMP Relay feature #23

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ tokio-util = { version = "0.7", features = ["codec"] }
weak-table = "0.3"
nibbletree = { version = "0.2", path = "./nibbletree", features = ["ipnet"] }
autometrics = { version = "0.3", features = ["prometheus-exporter"] }
zettabgp = "0.3.4"
zettabgp = { version = "0.3.4", git = "https://github.com/wladwm/zettabgp" }
hickory-resolver = "0.24"
include_dir = { version = "0.7", optional = true }
mime_guess = { version = "2.0", optional = true }
figment = { version = "0.10", features = ["yaml", "env"] }
async-stream = "0.3.6"

[features]
embed-static = ["include_dir", "mime_guess"]
2 changes: 1 addition & 1 deletion config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ collectors:
collector_type: Bmp
bind: "[::]:11019"
peers:
"192.0.2.1": {}
"2a0e:b940:0:2:a00e:f9ff:fe1b:b7e9": {}
50 changes: 16 additions & 34 deletions frontend/src/resultsView.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,56 +88,38 @@ const processResults = (results) => {
const asnMap = Object.fromEntries(asnResults.map(r => [r.asn, r.asn_name ]));
const communityMap = Object.fromEntries(communityResults.map(r => [r.community, r.community_description ]));

// stage 1, combine pre- and post-policy adj-in tables
// start out with PostPolicy
const preAndPostPolicy = {};
const preAndPostPolicyKey = route => `${route.session_id.from_client}:${route.session_id.peer_address}:${route.net}`;
// stage 1, combine seen and accepted routes
// start out with Accepted
const seenAndAccepted = {};
const seenAndAcceptedKey = route => `${route.session_id.from_client}:${route.session_id.listener}:${route.session_id.peer_address}:${route.net}`;
for (let route of routeResults) {
if (route.type === "PostPolicyAdjIn") {
preAndPostPolicy[preAndPostPolicyKey(route)] = route;
if (route.state === "Accepted") {
seenAndAccepted[seenAndAcceptedKey(route)] = route;
}
}
// add routes which are _only_ in PrePolicy => have not been accepted
for (let route of routeResults) {
if (route.type === "PrePolicyAdjIn") {
const key = preAndPostPolicyKey(route);
if (!preAndPostPolicy[key]) {
preAndPostPolicy[key] = route;
preAndPostPolicy[key].state = "Filtered";
if (route.type === "Seen") {
const key = seenAndAcceptedKey(route);
if (!seenAndAccepted[key]) {
seenAndAccepted[key] = route;
seenAndAccepted[key].state = "Filtered";
}
}
}

// stage 2, combine adj-in and loc-rib
// stage 2, combine Seen/Accepted and Accepted/Active/Selected (add-paths export / loc-rib)
const all = {};
const allKey = route => `${route.client_name}:${route.net}:${JSON.stringify(route.as_path)}:${JSON.stringify(route.large_communities)}:${route.nexthop}`;
for (let route of Object.values(preAndPostPolicy)) {
for (let route of Object.values(seenAndAccepted)) {
const key = allKey(route);
all[key] = route;
}
for (let route of routeResults) {
if (route.table === "LocRib" && route.state === "Accepted") {
const key = allKey(route);
if (all[key])
all[key].state = "Accepted";
else
all[key] = route;
}
}
for (let route of routeResults) {
if (route.table === "LocRib" && route.state === "Active") {
const key = allKey(route);
if (all[key])
all[key].state = "Active";
else
all[key] = route;
}
}
for (let route of routeResults) {
if (route.table === "LocRib" && route.state === "Selected") {
for (let state of ["Accepted", "Active", "Selected"]) {
for (let route of routeResults.filter(route => route.state === state)) {
const key = allKey(route);
if (all[key])
all[key].state = "Selected";
all[key].state = state;
else
all[key] = route;
}
Expand Down
16 changes: 14 additions & 2 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,14 @@ async fn tables<T: Store>(State(AppState { store, .. }): State<AppState<T>>) ->
serde_json::to_string(&store.get_tables()).unwrap()
}
async fn routers<T: Store>(State(AppState { store, .. }): State<AppState<T>>) -> impl IntoResponse {
serde_json::to_string(&store.get_routers()).unwrap()
serde_json::to_string(
&store
.get_routers()
.into_iter()
.map(|(k, v)| (format!("{},{}", k.0, k.1), v))
.collect::<HashMap<_, _>>(),
)
.unwrap()
}

async fn routing_instances<T: Store>(
Expand All @@ -349,7 +356,12 @@ async fn routing_instances<T: Store>(
let instances = store
.get_routing_instances()
.into_iter()
.map(|(k, v)| (k, v.into_iter().map(|v| (v, v)).collect::<Vec<_>>()))
.map(|(k, v)| {
(
format!("{},{}", k.0, k.1),
v.into_iter().map(|v| (v, v)).collect::<Vec<_>>(),
)
})
.collect::<HashMap<_, _>>();

serde_json::to_string(&instances).unwrap()
Expand Down
19 changes: 11 additions & 8 deletions src/bgp_collector.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::bgpdumper::BgpDumper;
use crate::route_distinguisher::RouteDistinguisher;
use crate::store::{Client, RouteState, SessionId, Store, TableSelector, TableType};
use crate::store::{Client, RouteState, SessionId, Store, TableSelector};
use futures_util::future::join_all;
use futures_util::TryStreamExt;
use log::*;
Expand All @@ -21,6 +21,7 @@ pub async fn run_peer(
store: impl Store,
stream: TcpStream,
client_addr: SocketAddr,
listener_name: String,
) -> anyhow::Result<BgpNotificationMessage> {
let mut caps = vec![
BgpCapability::SafiIPv4u,
Expand Down Expand Up @@ -61,7 +62,8 @@ pub async fn run_peer(
.unwrap_or(client_addr.ip().to_string());
store
.client_up(
client_addr,
client_addr.ip(),
listener_name.clone(),
cfg.route_state,
Client {
client_name,
Expand All @@ -80,12 +82,11 @@ pub async fn run_peer(
.insert_bgp_update(
TableSelector {
session_id: SessionId {
from_client: client_addr,
from_client: client_addr.ip(),
listener: listener_name.clone(),
peer_address: client_addr.ip(),
},
table_type: TableType::LocRib {
route_state: cfg.route_state,
},
route_state: cfg.route_state,
route_distinguisher: RouteDistinguisher::Default,
},
update,
Expand Down Expand Up @@ -113,6 +114,7 @@ pub struct BgpCollectorConfig {
}

pub async fn run(
name: String,
cfg: BgpCollectorConfig,
store: impl Store,
mut shutdown: tokio::sync::watch::Receiver<bool>,
Expand All @@ -127,10 +129,11 @@ pub async fn run(

if let Some(peer_cfg) = cfg.peers.get(&client_addr.ip()).or(cfg.default_peer_config.as_ref()).cloned() {
let store = store.clone();
let name = name.clone();
let mut shutdown = shutdown.clone();
running_tasks.push(tokio::spawn(async move {
tokio::select! {
res = run_peer(peer_cfg, store.clone(), io, client_addr) => {
res = run_peer(peer_cfg, store.clone(), io, client_addr, name.clone()) => {
match res {
Err(e) => warn!("disconnected {} {}", client_addr, e),
Ok(notification) => info!("disconnected {} {:?}", client_addr, notification),
Expand All @@ -139,7 +142,7 @@ pub async fn run(
_ = shutdown.changed() => {
}
};
store.client_down(client_addr).await;
store.client_down(client_addr.ip(), name.clone()).await;
}));
} else {
info!("unexpected connection from {}", client_addr);
Expand Down
Loading