diff --git a/netmiko/aviat/__init__.py b/netmiko/aviat/__init__.py new file mode 100644 index 000000000..b42e470e6 --- /dev/null +++ b/netmiko/aviat/__init__.py @@ -0,0 +1,3 @@ +from netmiko.aviat.aviat_ssh import AviatWTMSSH + +__all__ = ["AviatWTMSSH"] diff --git a/netmiko/aviat/aviat_ssh.py b/netmiko/aviat/aviat_ssh.py new file mode 100644 index 000000000..1b69c305e --- /dev/null +++ b/netmiko/aviat/aviat_ssh.py @@ -0,0 +1,93 @@ +from netmiko.cisco_base_connection import CiscoSSHConnection +import re +from netmiko import log +from typing import Optional + + +class AviatWTMSSH(CiscoSSHConnection): + """Aviat WTM Outdoor Radio support""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def session_preparation(self) -> None: + self._test_channel_read() + self.disable_paging() + self.set_base_prompt() + + def disable_paging( + self, + commands: list = ["session paginate false"], + delay_factor: Optional[float] = None, + cmd_verify: bool = True, + pattern: Optional[str] = None, + ) -> str: + return self.send_config_set( + config_commands=commands, + ) + + def find_prompt(self, delay_factor: float = 1.0, pattern: str = r"[$>#]") -> str: + return super().find_prompt(delay_factor=delay_factor, pattern=pattern) + + def exit_config_mode( + self, + exit_config: str = "end", + pattern: str = r"Uncommitted changes.*CANCEL\]", + confirm: str = "yes", + ) -> str: + """ + Exits from configuration mode. Overwritten from base class because the device + prompts to save uncommitted changes when exiting config mode and requires user confirmation. + If 'Uncommitted changes found' is detected in the output, the function sends a 'confirm' + command. + """ + output = "" + if self.check_config_mode(): + self.write_channel(self.normalize_cmd(exit_config)) + # Make sure you read until you detect the command echo (avoid getting out of sync) + if self.global_cmd_verify is not False: + output += self.read_until_pattern( + pattern=re.escape(exit_config.strip()) + ) + # Read until we detect the uncommitted changes pattern or the usual prompt pattern + output += self.read_until_pattern(pattern=pattern + "|#") + # If uncommitted changes were found, confirm them + if "Uncommitted changes found" in output: + self.write_channel(self.normalize_cmd(confirm)) + output += self.read_until_pattern(pattern=r"[$>#]") + if self.check_config_mode(): + raise ValueError("Failed to exit configuration mode") + log.debug(f"exit_config_mode: {output}") + return output + + def config_mode( + self, + config_command: str = "config", + pattern: str = "", + re_flags: int = 0, + ) -> str: + return super().config_mode( + config_command=config_command, pattern=pattern, re_flags=re_flags + ) + + def enable( + self, + cmd: str = "", + pattern: str = "ssword", + enable_pattern: Optional[str] = None, + check_state: bool = True, + re_flags: int = re.IGNORECASE, + ) -> str: + """Aviat WTM Outdoor Radio does not have an enable mode""" + raise NotImplementedError + + def save_config( + self, cmd: str = "", confirm: bool = False, confirm_response: str = "" + ) -> str: + """ + Aviat WTM Outdoor Radio does not have a 'save config' command. Instead, + when changes are detected in config mode, the user is prompted to commit these + changes. This happens either when trying to exit config mode or when the 'commit' + command is typed in config mode. + """ + raise NotImplementedError diff --git a/netmiko/ssh_dispatcher.py b/netmiko/ssh_dispatcher.py index d05859aee..74f317f81 100755 --- a/netmiko/ssh_dispatcher.py +++ b/netmiko/ssh_dispatcher.py @@ -25,7 +25,7 @@ Audiocode66Telnet, AudiocodeShellTelnet, ) - +from netmiko.aviat import AviatWTMSSH from netmiko.bintec import BintecBossSSH, BintecBossTelnet from netmiko.brocade import BrocadeFOSSSH from netmiko.broadcom import BroadcomIcosSSH @@ -130,6 +130,8 @@ from netmiko.ruijie import RuijieOSSSH, RuijieOSTelnet from netmiko.sixwind import SixwindOSSSH from netmiko.sophos import SophosSfosSSH +from netmiko.supermicro import SmciSwitchSmisSSH +from netmiko.supermicro import SmciSwitchSmisTelnet from netmiko.teldat import TeldatCITSSH, TeldatCITTelnet from netmiko.terminal_server import TerminalServerSSH from netmiko.terminal_server import TerminalServerTelnet @@ -144,8 +146,6 @@ from netmiko.yamaha import YamahaTelnet from netmiko.zte import ZteZxrosSSH from netmiko.zte import ZteZxrosTelnet -from netmiko.supermicro import SmciSwitchSmisSSH -from netmiko.supermicro import SmciSwitchSmisTelnet from netmiko.zyxel import ZyxelSSH if TYPE_CHECKING: @@ -179,6 +179,7 @@ "audiocode_shell": AudiocodeShellSSH, "avaya_ers": ExtremeErsSSH, "avaya_vsp": ExtremeVspSSH, + "aviat_wtm": AviatWTMSSH, "bintec_boss": BintecBossSSH, "broadcom_icos": BroadcomIcosSSH, "brocade_fos": BrocadeFOSSSH, diff --git a/tests/test_netmiko_config.py b/tests/test_netmiko_config.py index 490c69e78..7df3701e0 100755 --- a/tests/test_netmiko_config.py +++ b/tests/test_netmiko_config.py @@ -19,6 +19,9 @@ def test_enable_mode(net_connect, commands, expected_responses): Catch exception for devices that don't support enable """ + # aviatrix_wtm does not support enable mode + if "aviat_wtm" in net_connect.device_type: + assert pytest.skip("Platform doesn't support enable mode.") try: net_connect.enable() enable_prompt = net_connect.find_prompt() diff --git a/tests/test_netmiko_show.py b/tests/test_netmiko_show.py index c050db6c0..14392203c 100755 --- a/tests/test_netmiko_show.py +++ b/tests/test_netmiko_show.py @@ -221,7 +221,6 @@ def test_send_command_genie(net_connect, commands, expected_responses): def test_send_multiline_timing(net_connect): - debug = False if ( @@ -238,7 +237,6 @@ def test_send_multiline_timing(net_connect): def test_send_multiline(net_connect): - debug = False if ( "cisco_ios" not in net_connect.device_type @@ -368,6 +366,9 @@ def test_enable_mode(net_connect, commands, expected_responses): # testuser on pynetqa does not have root access if net_connect.username == "testuser" and net_connect.host == "3.15.148.177": assert pytest.skip() + # aviatrix_wtm does not support enable mode + if "aviat_wtm" in net_connect.device_type: + assert pytest.skip("Platform doesn't support enable mode.") try: net_connect.enable() enable_prompt = net_connect.find_prompt()