Skip to content

Commit

Permalink
Add bmp relay feature
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyuyureka committed Jan 23, 2025
1 parent ed4df46 commit 345f811
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 5 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ tokio-util = { version = "0.7", features = ["codec"] }
weak-table = "0.3"
nibbletree = { version = "0.2", path = "./nibbletree", features = ["ipnet"] }
autometrics = { version = "0.3", features = ["prometheus-exporter"] }
zettabgp = "0.3.4"
zettabgp = { version = "0.3.4", git = "https://github.com/wobcom/zettabgp" }
hickory-resolver = "0.24"
include_dir = { version = "0.7", optional = true }
mime_guess = { version = "2.0", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ collectors:
collector_type: Bmp
bind: "[::]:11019"
peers:
"192.0.2.1": {}
"2a0e:b940:0:2:a00e:f9ff:fe1b:b7e9": {}
160 changes: 160 additions & 0 deletions src/bmp_relay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
use crate::compressed_attrs::decompress_route_attrs;
use crate::store::make_bgp_withdraw;
use crate::store::TableSelector;
use crate::store_impl::InMemoryStore;
use crate::table_impl::Action;
use crate::table_stream::table_stream;
use futures_util::pin_mut;
use futures_util::StreamExt;
use serde::Deserialize;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpSocket;
use tokio::net::TcpStream;
use zettabgp::bmp::prelude::*;
use zettabgp::prelude::*;

#[derive(Debug, Deserialize)]
pub struct RelayConfig {
table: TableSelector,

/// For LocRIB fake BGP open message
router_id: Ipv4Addr,
asn: u32,

monitoring_station: SocketAddr,

bind_addr: Option<IpAddr>,
bind_port: Option<u16>,
}

async fn connect(cfg: &RelayConfig) -> std::io::Result<TcpStream> {
let (sock, default_bind_addr) = match cfg.monitoring_station.ip() {
IpAddr::V4(_) => (TcpSocket::new_v4()?, "0.0.0.0".parse().unwrap()),
IpAddr::V6(_) => (TcpSocket::new_v6()?, "::".parse().unwrap()),
};
let bind_addr = SocketAddr::new(
cfg.bind_addr.unwrap_or(default_bind_addr),
cfg.bind_port.unwrap_or(0),
);
sock.bind(bind_addr)?;
Ok(sock.connect(cfg.monitoring_station).await?)
}

async fn run_(cfg: RelayConfig, store: InMemoryStore) -> ! {
let table = store.get_table(cfg.table.clone());
let mut buf = [0; 10000];
'outer: loop {
let updates_stream = table_stream(&table);
pin_mut!(updates_stream);

let mut tcp_stream = match connect(&cfg).await {
Err(_) => {
log::info!("trying to connect {}", cfg.monitoring_station);
tokio::time::sleep(Duration::from_secs(5)).await;
continue 'outer;
}
Ok(v) => v,
};
log::info!("connected {}", cfg.monitoring_station);

let fake_open_message = BgpOpenMessage {
as_num: cfg.asn,
caps: vec![
BgpCapability::SafiIPv4u,
BgpCapability::SafiIPv6u,
BgpCapability::SafiVPNv4u,
BgpCapability::SafiVPNv6u,
BgpCapability::CapRR,
BgpCapability::CapASN32(cfg.asn),
],
hold_time: 0,
router_id: cfg.router_id,
};
let peer_hdr = BmpMessagePeerHeader {
peertype: 3,
flags: 0,
peerdistinguisher: BgpRD::new(0, 0),
peeraddress: "::".parse().unwrap(),
asnum: cfg.asn,
routerid: cfg.router_id,
timestamp: 0,
};
let mut bmp_messages = futures_util::stream::iter([
BmpMessage::Initiation(BmpMessageInitiation {
str0: None,
sys_descr: None,
sys_name: None,
}),
BmpMessage::PeerUpNotification(BmpMessagePeerUp {
peer: peer_hdr.clone(),
localaddress: "::".parse().unwrap(),
localport: 0,
remoteport: 0,
msg1: fake_open_message.clone(),
msg2: fake_open_message,
}),
])
.chain(updates_stream.map(|action| {
let update = match action {
(net, num, Action::Withdraw) => {
if num != 0 {
//log::warn!("add-paths table is not yet implemented");
}
make_bgp_withdraw(net)
}
(net, num, Action::Update(attrs)) => {
if num != 0 {
//log::warn!("add-paths table is not yet implemented");
}
decompress_route_attrs(&attrs).to_bgp_update(net)
}
};

BmpMessage::RouteMonitoring(BmpMessageRouteMonitoring {
peer: peer_hdr.clone(),
update,
})
}));

'inner: while let Some(bmp_msg) = bmp_messages.next().await {
log::trace!("sending message {}: {:?}", cfg.monitoring_station, bmp_msg);
let mut len = 0;
match bmp_msg.encode_to(&mut buf[5..]) {
Ok(i) => len += i,
Err(e) => {
log::warn!("error encoding BMP message {:?}: {}", bmp_msg, e);
continue 'inner;
}
}
let msg_hdr = BmpMessageHeader {
version: 3,
msglength: len + 5,
};
len += msg_hdr.encode_to(&mut buf).unwrap();

if let Err(e) = tcp_stream.write_all(&buf[..len]).await {
log::warn!(
"resetting connection {:?}, reason: {}",
cfg.monitoring_station,
e
);
continue 'outer;
}
}
}
}

pub async fn run(
cfg: RelayConfig,
store: InMemoryStore,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> anyhow::Result<()> {
tokio::select! {
_ = run_(cfg, store) => unreachable!(),
_ = shutdown.changed() => Ok(()),
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod api;
pub mod bgp_collector;
mod bgpdumper;
pub mod bmp_collector;
pub mod bmp_relay;
mod compressed_attrs;
pub mod route_distinguisher;
pub mod store;
Expand Down Expand Up @@ -42,4 +43,6 @@ pub struct Config {
/// Only check config and exit
#[serde(default)]
pub config_check: bool,
#[serde(default)]
pub bmp_relays: HashMap<String, bmp_relay::RelayConfig>,
}
4 changes: 4 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ async fn main() -> anyhow::Result<()> {
}),
);

futures.extend(cfg.bmp_relays.into_values().map(|relay| {
tokio::task::spawn(bmp_relay::run(relay, store.clone(), shutdown_rx.clone()))
}));

let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
let res = tokio::select! {
Expand Down
99 changes: 99 additions & 0 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,84 @@ impl Default for QueryLimits {
}
}

pub fn make_bgp_withdraw(net: IpNet) -> zettabgp::prelude::BgpUpdateMessage {
use zettabgp::prelude::*;

BgpUpdateMessage {
attrs: vec![BgpAttrItem::MPWithdraws(BgpMPWithdraws {
addrs: net_to_bgp_addrs(net),
})],
..Default::default()
}
}

impl RouteAttrs {
pub fn to_bgp_update(&self, net: IpNet) -> zettabgp::prelude::BgpUpdateMessage {
use zettabgp::prelude::*;

let mut attrs = vec![];

if let Some(nexthop) = self.nexthop {
attrs.push(BgpAttrItem::MPUpdates(BgpMPUpdates {
nexthop: std_addr_to_bgp_addr(nexthop),
addrs: net_to_bgp_addrs(net),
}));
} else {
warn!("Can not build MPUpdates without nexthop");
}

if let Some(communities) = &self.communities {
attrs.push(BgpAttrItem::CommunityList(BgpCommunityList {
value: communities
.iter()
.map(|(high, low)| BgpCommunity::new(((*high as u32) << 16) + *low as u32))
.collect(),
}));
}
if let Some(large_communities) = &self.large_communities {
attrs.push(BgpAttrItem::LargeCommunityList(BgpLargeCommunityList {
value: large_communities
.iter()
.cloned()
.map(|(ga, ldp1, ldp2)| BgpLargeCommunity { ga, ldp1, ldp2 })
.collect(),
}));
}

if let Some(med) = self.med {
attrs.push(BgpAttrItem::MED(BgpMED { value: med }));
}
if let Some(local_pref) = self.local_pref {
attrs.push(BgpAttrItem::LocalPref(BgpLocalpref { value: local_pref }));
}

if let Some(origin) = &self.origin {
attrs.push(BgpAttrItem::Origin(BgpOrigin {
value: match origin {
RouteOrigin::Igp => BgpAttrOrigin::Igp,
RouteOrigin::Egp => BgpAttrOrigin::Egp,
RouteOrigin::Incomplete => BgpAttrOrigin::Incomplete,
},
}));
}

if let Some(as_path) = &self.as_path {
attrs.push(BgpAttrItem::ASPath(BgpASpath {
value: as_path
.iter()
.cloned()
.map(|value| BgpAS { value })
.collect(),
}));
}

BgpUpdateMessage {
attrs,
..Default::default()
}
}
}

pub trait Store: Clone + Send + Sync + 'static {
fn update_route(
&self,
Expand Down Expand Up @@ -384,6 +462,27 @@ fn bgp_addrs_to_nets(
}
}

fn std_addr_to_bgp_addr(net: IpAddr) -> zettabgp::prelude::BgpAddr {
use zettabgp::prelude::*;
match net {
IpAddr::V4(v4) => BgpAddr::V4(v4),
IpAddr::V6(v6) => BgpAddr::V6(v6),
}
}
fn net_to_bgp_addrs(net: IpNet) -> zettabgp::prelude::BgpAddrs {
use zettabgp::prelude::*;
match net {
IpNet::V4(v4) => BgpAddrs::IPV4U(vec![BgpAddrV4 {
addr: v4.addr(),
prefixlen: v4.prefix_len(),
}]),
IpNet::V6(v6) => BgpAddrs::IPV6U(vec![BgpAddrV6 {
addr: v6.addr(),
prefixlen: v6.prefix_len(),
}]),
}
}

fn bgpv4addr_to_ipnet(addr: &BgpAddrV4) -> Option<IpNet> {
Ipv4Net::new(addr.addr, addr.prefixlen)
.inspect_err(|_| warn!("invalid BgpAddrs prefixlen"))
Expand Down
2 changes: 1 addition & 1 deletion src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl InMemoryStore {
== query_router_id
}
}
fn get_table(&self, sel: TableSelector) -> InMemoryTable {
pub fn get_table(&self, sel: TableSelector) -> InMemoryTable {
self.tables
.lock()
.unwrap()
Expand Down

0 comments on commit 345f811

Please sign in to comment.