diff --git a/clixon/clixon.py b/clixon/clixon.py index 25f7728..aca135d 100644 --- a/clixon/clixon.py +++ b/clixon/clixon.py @@ -195,17 +195,28 @@ def get_root(self, path: Optional[str] = None) -> object: """ logger.debug("Updating root object") - config = rpc_config_get(user=self.__user, source=self.__source) + # If path is provided, convert it to XPath and use it in the RPC call + # This reduces the amount of data transferred from the backend + if path: + from clixon.helpers import path_to_xpath + xpath = path_to_xpath(path) + config = rpc_config_get(user=self.__user, source=self.__source, xpath=xpath) + else: + config = rpc_config_get(user=self.__user, source=self.__source) send(self.__socket, config, pp) data = read(self.__socket, pp) self.__handle_errors(data) - self.__root = parse_string(data).rpc_reply.data + parsed = parse_string(data).rpc_reply.data + # When path is provided, we still need to use get_path() for client-side navigation + # because the XPath filter may return a subtree containing the requested element + # within a wrapper, and we need to navigate to the exact element requested if path: - return get_path(self.__root, path) - + return get_path(parsed, path) + + self.__root = parsed return self.__root def __wait_for_notification(self, return_data: Optional[bool] = False) -> None: diff --git a/clixon/helpers.py b/clixon/helpers.py index cf157b4..1da8b91 100644 --- a/clixon/helpers.py +++ b/clixon/helpers.py @@ -244,6 +244,42 @@ def is_juniper(device: Element) -> bool: return False +def path_to_xpath(path: str) -> str: + """ + Converts a simplified path format to XPath expression. + + Examples: + path_to_xpath("devices/device[0]") -> "/devices/device[1]" + path_to_xpath("devices/device[name='r1']/config") -> "/devices/device[name='r1']/config" + path_to_xpath("services/bgp-peer[name='bgp-test']") -> "/services/bgp-peer[name='bgp-test']" + + :param path: Simplified path (can be None or empty string, which returns "/") + :type path: str + :return: XPath expression + :rtype: str + """ + if not path: + return "/" + + # Ensure path starts with / + if not path.startswith("/"): + path = "/" + path + + # Replace any [key="value"] with [key='value'] + path = re.sub(r'(\[.*?)"(.*?)"', r"\1'\2'", path) + + # Convert numeric indices to XPath format (0-based to 1-based) + # Match [digit] patterns and increment them + def convert_index(match): + index = int(match.group(1)) + # XPath uses 1-based indexing + return f"[{index + 1}]" + + path = re.sub(r'\[(\d+)\]', convert_index, path) + + return path + + def get_path(root: Element, path: str) -> Optional[Element]: """ Returns the element at the path. Poor mans xpath. diff --git a/clixon/netconf.py b/clixon/netconf.py index b4844b1..386f964 100644 --- a/clixon/netconf.py +++ b/clixon/netconf.py @@ -33,7 +33,7 @@ class RPCTypes(Enum): def rpc_config_get( - user: Optional[str] = None, source: Optional[str] = "actions" + user: Optional[str] = None, source: Optional[str] = "actions", xpath: Optional[str] = "/" ) -> Element: """ Create a get-config RPC element. @@ -42,12 +42,14 @@ def rpc_config_get( :type user: str :param source: Source of the configuration :type source: str + :param xpath: XPath filter expression (default: "/") + :type xpath: str :return: RPC element :rtype: Element """ attributes = {} - xpath_attributes = {"nc:type": "xpath", "nc:select": "/"} + xpath_attributes = {"nc:type": "xpath", "nc:select": xpath} if not user: user = getpass.getuser() diff --git a/tests/test_netconf.py b/tests/test_netconf.py index c6c296f..aa17a70 100644 --- a/tests/test_netconf.py +++ b/tests/test_netconf.py @@ -61,6 +61,30 @@ def test_rpc_config_get_user(): assert root.dumps() == xmlstr +def test_rpc_config_get_xpath(): + """ + Test the rpc_config_get function with custom xpath. + """ + + xmlstr = f"""""" + + root = netconf.rpc_config_get(xpath="/devices/device[1]") + + assert root.dumps() == xmlstr + + +def test_rpc_config_get_xpath_user(): + """ + Test the rpc_config_get function with custom xpath and user. + """ + + xmlstr = f"""""" + + root = netconf.rpc_config_get(user="nisse", xpath="/devices/device[name='test']") + + assert root.dumps() == xmlstr + + def test_rpc_commit(): """ Test the rpc_commit function. diff --git a/tests/test_path_to_xpath.py b/tests/test_path_to_xpath.py new file mode 100644 index 0000000..1ef30fd --- /dev/null +++ b/tests/test_path_to_xpath.py @@ -0,0 +1,76 @@ +from clixon.helpers import path_to_xpath + + +def test_path_to_xpath_simple(): + """ + Test path_to_xpath with a simple path. + """ + assert path_to_xpath("devices/device") == "/devices/device" + + +def test_path_to_xpath_with_leading_slash(): + """ + Test path_to_xpath with a path that already has a leading slash. + """ + assert path_to_xpath("/devices/device") == "/devices/device" + + +def test_path_to_xpath_with_index(): + """ + Test path_to_xpath with numeric index (0-based to 1-based conversion). + """ + assert path_to_xpath("devices/device[0]") == "/devices/device[1]" + assert path_to_xpath("devices/device[1]") == "/devices/device[2]" + assert path_to_xpath("devices/device[0]/config") == "/devices/device[1]/config" + + +def test_path_to_xpath_with_name_filter(): + """ + Test path_to_xpath with name filter. + """ + assert path_to_xpath("devices/device[name='r1']") == "/devices/device[name='r1']" + assert ( + path_to_xpath("devices/device[name='r1']/config") + == "/devices/device[name='r1']/config" + ) + + +def test_path_to_xpath_with_double_quotes(): + """ + Test path_to_xpath with double quotes (should be converted to single quotes). + """ + assert path_to_xpath('devices/device[name="r1"]') == "/devices/device[name='r1']" + + +def test_path_to_xpath_complex(): + """ + Test path_to_xpath with complex paths. + """ + assert ( + path_to_xpath("devices/device[name='juniper1']/config/configuration/version") + == "/devices/device[name='juniper1']/config/configuration/version" + ) + assert ( + path_to_xpath( + "devices/device[name='juniper1']/config/configuration/interfaces/interface[name='lo0']/unit[0]" + ) + == "/devices/device[name='juniper1']/config/configuration/interfaces/interface[name='lo0']/unit[1]" + ) + + +def test_path_to_xpath_empty(): + """ + Test path_to_xpath with empty path. + """ + assert path_to_xpath("") == "/" + assert path_to_xpath(None) == "/" + + +def test_path_to_xpath_multiple_filters(): + """ + Test path_to_xpath with multiple filters in the path. + """ + assert ( + path_to_xpath("services/bgp-peer[name='bgp-test']") + == "/services/bgp-peer[name='bgp-test']" + )