From ee5f1f4d643b6a00cdb128ac2240cc1044f2d707 Mon Sep 17 00:00:00 2001 From: Marvin Vogt Date: Mon, 13 Nov 2023 15:44:32 +0100 Subject: [PATCH] Add VRF support for FRR Prefix (#13) * Extend functional tests for VRFs * Add VRF parameter to `FRRoutingPrefix` * Implement VRF support in `FRRoutingPrefix` * Rename vrf fixture and move to top level conftest * Rename `_Prefix` and move it to top level conftest * Create example VRF in `test_announce_adds_bgp_network` --- src/anycastd/prefix/frrouting.py | 35 ++++++++++++----- tests/conftest.py | 13 ++++++- tests/prefix/frrouting/conftest.py | 38 +++++++++++-------- .../frrouting/test_basic_functionality.py | 33 +++++++++++----- 4 files changed, 84 insertions(+), 35 deletions(-) diff --git a/src/anycastd/prefix/frrouting.py b/src/anycastd/prefix/frrouting.py index 0a73d8a..e0e0e75 100644 --- a/src/anycastd/prefix/frrouting.py +++ b/src/anycastd/prefix/frrouting.py @@ -3,12 +3,16 @@ from contextlib import suppress from ipaddress import IPv4Network, IPv6Network from pathlib import Path +from typing import TypeAlias from anycastd._base import BaseExecutor from anycastd.prefix.base import BasePrefix +VRF: TypeAlias = str | None + class FRRoutingPrefix(BasePrefix): + vrf: VRF vtysh: Path executor: BaseExecutor @@ -16,10 +20,12 @@ def __init__( self, prefix: IPv4Network | IPv6Network, *, + vrf: VRF = None, vtysh: Path = Path("/usr/bin/vtysh"), executor: BaseExecutor, ): super().__init__(prefix) + self.vrf = vrf self.vtysh = vtysh self.executor = executor @@ -29,9 +35,12 @@ async def is_announced(self) -> bool: Checks if the respective BGP prefix is configured in the default VRF. """ family = get_afi(self) - show_prefix = await self._run_vtysh_commands( - (f"show bgp {family} unicast {self.prefix} json",) + cmd = ( + f"show bgp vrf {self.vrf} {family} unicast {self.prefix} json" + if self.vrf + else f"show bgp {family} unicast {self.prefix} json" ) + show_prefix = await self._run_vtysh_commands((cmd,)) prefix_info = json.loads(show_prefix) with suppress(KeyError): @@ -49,12 +58,12 @@ async def announce(self) -> None: Adds the respective BGP prefix to the default VRF. """ family = get_afi(self) - asn = await self._get_default_local_asn() + asn = await self._get_local_asn() await self._run_vtysh_commands( ( "configure terminal", - f"router bgp {asn}", + f"router bgp {asn} vrf {self.vrf}" if self.vrf else f"router bgp {asn}", f"address-family {family} unicast", f"network {self.prefix}", ) @@ -66,24 +75,32 @@ async def denounce(self) -> None: Removes the respective BGP prefix from the default VRF. """ family = get_afi(self) - asn = await self._get_default_local_asn() + asn = await self._get_local_asn() await self._run_vtysh_commands( ( "configure terminal", - f"router bgp {asn}", + f"router bgp {asn} vrf {self.vrf}" if self.vrf else f"router bgp {asn}", f"address-family {family} unicast", f"no network {self.prefix}", ) ) - async def _get_default_local_asn(self) -> int: - """Returns the local ASN in the default VRF. + async def _get_local_asn(self) -> int: + """Returns the local ASN in the VRF of the prefix. Raises: RuntimeError: Failed to get the local ASN. """ - show_bgp_detail = await self._run_vtysh_commands(("show bgp detail json",)) + show_bgp_detail = await self._run_vtysh_commands( + ( + ( + f"show bgp vrf {self.vrf} detail json" + if self.vrf + else "show bgp detail json" + ), + ) + ) bgp_detail = json.loads(show_bgp_detail) if warning := bgp_detail.get("warning"): raise RuntimeError(f"Failed to get local ASN: {warning}") diff --git a/tests/conftest.py b/tests/conftest.py index bf9f1ca..71acc07 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,11 @@ from ipaddress import IPv4Network, IPv6Network +from typing import TypeAlias import pytest +_IP_Prefix: TypeAlias = IPv4Network | IPv6Network +_VRF: TypeAlias = str | None + @pytest.fixture(scope="session") def example_asn() -> int: @@ -21,5 +25,12 @@ def ipv6_example_network() -> IPv6Network: @pytest.fixture( scope="session", params=["ipv4_example_network", "ipv6_example_network"] ) -def example_networks(request) -> IPv4Network | IPv6Network: +def example_networks(request) -> _IP_Prefix: + """Parametrize tests with example prefixes.""" return request.getfixturevalue(request.param) + + +@pytest.fixture(params=[None, "vrf-func-test"]) +def example_vrfs(request) -> _VRF: + """Parametrize tests with example VRFs.""" + return request.param diff --git a/tests/prefix/frrouting/conftest.py b/tests/prefix/frrouting/conftest.py index 916fea8..f3e02bb 100644 --- a/tests/prefix/frrouting/conftest.py +++ b/tests/prefix/frrouting/conftest.py @@ -3,16 +3,15 @@ from collections.abc import Callable from contextlib import suppress from dataclasses import dataclass, field -from ipaddress import IPv4Network, IPv6Network +from ipaddress import IPv4Network from pathlib import Path -from typing import TypeAlias import pytest -_Prefix: TypeAlias = IPv4Network | IPv6Network +from tests.conftest import _VRF, _IP_Prefix -def get_afi(prefix: _Prefix) -> str: +def get_afi(prefix: _IP_Prefix) -> str: """Return the FRR string AFI for the given IP type.""" return "ipv6" if not isinstance(prefix, IPv4Network) else "ipv4" @@ -126,16 +125,17 @@ def vtysh(frr_container) -> Vtysh: @pytest.fixture -def bgp_prefix_configured() -> Callable[[_Prefix, Vtysh], bool]: +def bgp_prefix_configured() -> Callable[[_IP_Prefix, Vtysh, _VRF], bool]: """A callable that can be used to check if a BGP prefix is configured.""" - def _(prefix: _Prefix, vtysh: Vtysh) -> bool: + def _(prefix: _IP_Prefix, vtysh: Vtysh, vrf: _VRF = None) -> bool: family = get_afi(prefix) - show_prefix = vtysh( - f"show ip bgp {family} unicast {prefix} json", - configure_terminal=False, - context=[], + cmd = ( + f"show ip bgp vrf {vrf} {family} unicast {prefix} json" + if vrf + else f"show ip bgp {family} unicast {prefix} json" ) + show_prefix = vtysh(cmd, configure_terminal=False, context=[]) prefix_info = json.loads(show_prefix) with suppress(KeyError): @@ -151,32 +151,38 @@ def _(prefix: _Prefix, vtysh: Vtysh) -> bool: @pytest.fixture -def add_bgp_prefix() -> Callable[[_Prefix, int, Vtysh], None]: +def add_bgp_prefix() -> Callable[[_IP_Prefix, int, Vtysh, _VRF], None]: """A callable that can be used to add a BGP prefix.""" - def _(prefix: _Prefix, asn: int, vtysh: Vtysh) -> None: + def _(prefix: _IP_Prefix, asn: int, vtysh: Vtysh, vrf: _VRF = None) -> None: """Add a network to the BGP configuration using vtysh.""" family = get_afi(prefix) vtysh( f"network {prefix}", configure_terminal=True, - context=[f"router bgp {asn}", f"address-family {family} unicast"], + context=[ + f"router bgp {asn} vrf {vrf}" if vrf else f"router bgp {asn}", + f"address-family {family} unicast", + ], ) return _ @pytest.fixture -def remove_bgp_prefix() -> Callable[[_Prefix, int, Vtysh], None]: +def remove_bgp_prefix() -> Callable[[_IP_Prefix, int, Vtysh, _VRF], None]: """A callable that can be used to remove a BGP prefix.""" - def _(prefix: _Prefix, asn: int, vtysh: Vtysh) -> None: + def _(prefix: _IP_Prefix, asn: int, vtysh: Vtysh, vrf: _VRF = None) -> None: """Remove a network from the BGP configuration using vtysh.""" family = get_afi(prefix) vtysh( f"no network {prefix}", configure_terminal=True, - context=[f"router bgp {asn}", f"address-family {family} unicast"], + context=[ + f"router bgp {asn} vrf {vrf}" if vrf else f"router bgp {asn}", + f"address-family {family} unicast", + ], ) return _ diff --git a/tests/prefix/frrouting/test_basic_functionality.py b/tests/prefix/frrouting/test_basic_functionality.py index 27044b0..96ed630 100644 --- a/tests/prefix/frrouting/test_basic_functionality.py +++ b/tests/prefix/frrouting/test_basic_functionality.py @@ -21,19 +21,28 @@ async def test_announce_adds_bgp_network( # noqa: PLR0913 vtysh, docker_executor, example_networks, + example_vrfs, bgp_prefix_configured, remove_bgp_prefix, example_asn, ): """Announcing adds the corresponding BGP prefix to the configuration.""" - prefix = FRRoutingPrefix(example_networks, executor=docker_executor) + prefix = FRRoutingPrefix( + example_networks, vrf=example_vrfs, executor=docker_executor + ) + if example_vrfs: + vtysh(f"router bgp {example_asn} vrf {example_vrfs}", configure_terminal=True) await prefix.announce() - assert bgp_prefix_configured(prefix.prefix, vtysh) + assert bgp_prefix_configured(prefix.prefix, vtysh=vtysh, vrf=example_vrfs) # Clean up - remove_bgp_prefix(prefix.prefix, example_asn, vtysh) + remove_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs) + if example_vrfs: + vtysh( + f"no router bgp {example_asn} vrf {example_vrfs}", configure_terminal=True + ) @pytest.mark.asyncio @@ -41,17 +50,20 @@ async def test_denounce_removes_bgp_network( # noqa: PLR0913 vtysh, docker_executor, example_networks, + example_vrfs, example_asn, bgp_prefix_configured, add_bgp_prefix, ): """Denouncing removes the corresponding BGP prefix from the configuration.""" - prefix = FRRoutingPrefix(example_networks, executor=docker_executor) - add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh) + prefix = FRRoutingPrefix( + example_networks, vrf=example_vrfs, executor=docker_executor + ) + add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs) await prefix.denounce() - assert not bgp_prefix_configured(prefix.prefix, vtysh) + assert not bgp_prefix_configured(prefix.prefix, vtysh=vtysh, vrf=example_vrfs) @pytest.mark.asyncio @@ -60,18 +72,21 @@ async def test_announcement_state_reported_correctly( # noqa: PLR0913 vtysh, docker_executor, example_networks, + example_vrfs, example_asn, add_bgp_prefix, remove_bgp_prefix, announced: bool, ): """The announcement state is reported correctly.""" - prefix = FRRoutingPrefix(example_networks, executor=docker_executor) + prefix = FRRoutingPrefix( + example_networks, vrf=example_vrfs, executor=docker_executor + ) if announced: - add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh) + add_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs) assert await prefix.is_announced() == announced # Clean up if announced: - remove_bgp_prefix(prefix.prefix, example_asn, vtysh) + remove_bgp_prefix(prefix.prefix, asn=example_asn, vtysh=vtysh, vrf=example_vrfs)