Skip to content

Commit

Permalink
Add VRF support for FRR Prefix (#13)
Browse files Browse the repository at this point in the history
* Extend functional tests for VRFs

* Add VRF parameter to `FRRoutingPrefix`

* Implement VRF support in `FRRoutingPrefix`

* Rename vrf fixture and move to top level conftest

* Rename `_Prefix` and move it to top level conftest

* Create example VRF in `test_announce_adds_bgp_network`
  • Loading branch information
SRv6d authored Nov 13, 2023
1 parent 1defe96 commit ee5f1f4
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 35 deletions.
35 changes: 26 additions & 9 deletions src/anycastd/prefix/frrouting.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,29 @@
from contextlib import suppress
from ipaddress import IPv4Network, IPv6Network
from pathlib import Path
from typing import TypeAlias

from anycastd._base import BaseExecutor
from anycastd.prefix.base import BasePrefix

VRF: TypeAlias = str | None


class FRRoutingPrefix(BasePrefix):
vrf: VRF
vtysh: Path
executor: BaseExecutor

def __init__(
self,
prefix: IPv4Network | IPv6Network,
*,
vrf: VRF = None,
vtysh: Path = Path("/usr/bin/vtysh"),
executor: BaseExecutor,
):
super().__init__(prefix)
self.vrf = vrf
self.vtysh = vtysh
self.executor = executor

Expand All @@ -29,9 +35,12 @@ async def is_announced(self) -> bool:
Checks if the respective BGP prefix is configured in the default VRF.
"""
family = get_afi(self)
show_prefix = await self._run_vtysh_commands(
(f"show bgp {family} unicast {self.prefix} json",)
cmd = (
f"show bgp vrf {self.vrf} {family} unicast {self.prefix} json"
if self.vrf
else f"show bgp {family} unicast {self.prefix} json"
)
show_prefix = await self._run_vtysh_commands((cmd,))
prefix_info = json.loads(show_prefix)

with suppress(KeyError):
Expand All @@ -49,12 +58,12 @@ async def announce(self) -> None:
Adds the respective BGP prefix to the default VRF.
"""
family = get_afi(self)
asn = await self._get_default_local_asn()
asn = await self._get_local_asn()

await self._run_vtysh_commands(
(
"configure terminal",
f"router bgp {asn}",
f"router bgp {asn} vrf {self.vrf}" if self.vrf else f"router bgp {asn}",
f"address-family {family} unicast",
f"network {self.prefix}",
)
Expand All @@ -66,24 +75,32 @@ async def denounce(self) -> None:
Removes the respective BGP prefix from the default VRF.
"""
family = get_afi(self)
asn = await self._get_default_local_asn()
asn = await self._get_local_asn()

await self._run_vtysh_commands(
(
"configure terminal",
f"router bgp {asn}",
f"router bgp {asn} vrf {self.vrf}" if self.vrf else f"router bgp {asn}",
f"address-family {family} unicast",
f"no network {self.prefix}",
)
)

async def _get_default_local_asn(self) -> int:
"""Returns the local ASN in the default VRF.
async def _get_local_asn(self) -> int:
"""Returns the local ASN in the VRF of the prefix.
Raises:
RuntimeError: Failed to get the local ASN.
"""
show_bgp_detail = await self._run_vtysh_commands(("show bgp detail json",))
show_bgp_detail = await self._run_vtysh_commands(
(
(
f"show bgp vrf {self.vrf} detail json"
if self.vrf
else "show bgp detail json"
),
)
)
bgp_detail = json.loads(show_bgp_detail)
if warning := bgp_detail.get("warning"):
raise RuntimeError(f"Failed to get local ASN: {warning}")
Expand Down
13 changes: 12 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from ipaddress import IPv4Network, IPv6Network
from typing import TypeAlias

import pytest

_IP_Prefix: TypeAlias = IPv4Network | IPv6Network
_VRF: TypeAlias = str | None


@pytest.fixture(scope="session")
def example_asn() -> int:
Expand All @@ -21,5 +25,12 @@ def ipv6_example_network() -> IPv6Network:
@pytest.fixture(
scope="session", params=["ipv4_example_network", "ipv6_example_network"]
)
def example_networks(request) -> IPv4Network | IPv6Network:
def example_networks(request) -> _IP_Prefix:
"""Parametrize tests with example prefixes."""
return request.getfixturevalue(request.param)


@pytest.fixture(params=[None, "vrf-func-test"])
def example_vrfs(request) -> _VRF:
"""Parametrize tests with example VRFs."""
return request.param
38 changes: 22 additions & 16 deletions tests/prefix/frrouting/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@
from collections.abc import Callable
from contextlib import suppress
from dataclasses import dataclass, field
from ipaddress import IPv4Network, IPv6Network
from ipaddress import IPv4Network
from pathlib import Path
from typing import TypeAlias

import pytest

_Prefix: TypeAlias = IPv4Network | IPv6Network
from tests.conftest import _VRF, _IP_Prefix


def get_afi(prefix: _Prefix) -> str:
def get_afi(prefix: _IP_Prefix) -> str:
"""Return the FRR string AFI for the given IP type."""
return "ipv6" if not isinstance(prefix, IPv4Network) else "ipv4"

Expand Down Expand Up @@ -126,16 +125,17 @@ def vtysh(frr_container) -> Vtysh:


@pytest.fixture
def bgp_prefix_configured() -> Callable[[_Prefix, Vtysh], bool]:
def bgp_prefix_configured() -> Callable[[_IP_Prefix, Vtysh, _VRF], bool]:
"""A callable that can be used to check if a BGP prefix is configured."""

def _(prefix: _Prefix, vtysh: Vtysh) -> bool:
def _(prefix: _IP_Prefix, vtysh: Vtysh, vrf: _VRF = None) -> bool:
family = get_afi(prefix)
show_prefix = vtysh(
f"show ip bgp {family} unicast {prefix} json",
configure_terminal=False,
context=[],
cmd = (
f"show ip bgp vrf {vrf} {family} unicast {prefix} json"
if vrf
else f"show ip bgp {family} unicast {prefix} json"
)
show_prefix = vtysh(cmd, configure_terminal=False, context=[])
prefix_info = json.loads(show_prefix)

with suppress(KeyError):
Expand All @@ -151,32 +151,38 @@ def _(prefix: _Prefix, vtysh: Vtysh) -> bool:


@pytest.fixture
def add_bgp_prefix() -> Callable[[_Prefix, int, Vtysh], None]:
def add_bgp_prefix() -> Callable[[_IP_Prefix, int, Vtysh, _VRF], None]:
"""A callable that can be used to add a BGP prefix."""

def _(prefix: _Prefix, asn: int, vtysh: Vtysh) -> None:
def _(prefix: _IP_Prefix, asn: int, vtysh: Vtysh, vrf: _VRF = None) -> None:
"""Add a network to the BGP configuration using vtysh."""
family = get_afi(prefix)
vtysh(
f"network {prefix}",
configure_terminal=True,
context=[f"router bgp {asn}", f"address-family {family} unicast"],
context=[
f"router bgp {asn} vrf {vrf}" if vrf else f"router bgp {asn}",
f"address-family {family} unicast",
],
)

return _


@pytest.fixture
def remove_bgp_prefix() -> Callable[[_Prefix, int, Vtysh], None]:
def remove_bgp_prefix() -> Callable[[_IP_Prefix, int, Vtysh, _VRF], None]:
"""A callable that can be used to remove a BGP prefix."""

def _(prefix: _Prefix, asn: int, vtysh: Vtysh) -> None:
def _(prefix: _IP_Prefix, asn: int, vtysh: Vtysh, vrf: _VRF = None) -> None:
"""Remove a network from the BGP configuration using vtysh."""
family = get_afi(prefix)
vtysh(
f"no network {prefix}",
configure_terminal=True,
context=[f"router bgp {asn}", f"address-family {family} unicast"],
context=[
f"router bgp {asn} vrf {vrf}" if vrf else f"router bgp {asn}",
f"address-family {family} unicast",
],
)

return _
33 changes: 24 additions & 9 deletions tests/prefix/frrouting/test_basic_functionality.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,49 @@ async def test_announce_adds_bgp_network( # noqa: PLR0913
vtysh,
docker_executor,
example_networks,
example_vrfs,
bgp_prefix_configured,
remove_bgp_prefix,
example_asn,
):
"""Announcing adds the corresponding BGP prefix to the configuration."""
prefix = FRRoutingPrefix(example_networks, executor=docker_executor)
prefix = FRRoutingPrefix(
example_networks, vrf=example_vrfs, executor=docker_executor
)
if example_vrfs:
vtysh(f"router bgp {example_asn} vrf {example_vrfs}", configure_terminal=True)

await prefix.announce()

assert bgp_prefix_configured(prefix.prefix, vtysh)
assert bgp_prefix_configured(prefix.prefix, vtysh=vtysh, vrf=example_vrfs)

# Clean up
remove_bgp_prefix(prefix.prefix, example_asn, vtysh)
remove_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs)
if example_vrfs:
vtysh(
f"no router bgp {example_asn} vrf {example_vrfs}", configure_terminal=True
)


@pytest.mark.asyncio
async def test_denounce_removes_bgp_network( # noqa: PLR0913
vtysh,
docker_executor,
example_networks,
example_vrfs,
example_asn,
bgp_prefix_configured,
add_bgp_prefix,
):
"""Denouncing removes the corresponding BGP prefix from the configuration."""
prefix = FRRoutingPrefix(example_networks, executor=docker_executor)
add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh)
prefix = FRRoutingPrefix(
example_networks, vrf=example_vrfs, executor=docker_executor
)
add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs)

await prefix.denounce()

assert not bgp_prefix_configured(prefix.prefix, vtysh)
assert not bgp_prefix_configured(prefix.prefix, vtysh=vtysh, vrf=example_vrfs)


@pytest.mark.asyncio
Expand All @@ -60,18 +72,21 @@ async def test_announcement_state_reported_correctly( # noqa: PLR0913
vtysh,
docker_executor,
example_networks,
example_vrfs,
example_asn,
add_bgp_prefix,
remove_bgp_prefix,
announced: bool,
):
"""The announcement state is reported correctly."""
prefix = FRRoutingPrefix(example_networks, executor=docker_executor)
prefix = FRRoutingPrefix(
example_networks, vrf=example_vrfs, executor=docker_executor
)
if announced:
add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh)
add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs)

assert await prefix.is_announced() == announced

# Clean up
if announced:
remove_bgp_prefix(prefix.prefix, example_asn, vtysh)
remove_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs)

0 comments on commit ee5f1f4

Please sign in to comment.