Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions clixon/clixon.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,17 +195,28 @@ def get_root(self, path: Optional[str] = None) -> object:
"""
logger.debug("Updating root object")

config = rpc_config_get(user=self.__user, source=self.__source)
# If path is provided, convert it to XPath and use it in the RPC call
# This reduces the amount of data transferred from the backend
if path:
from clixon.helpers import path_to_xpath
xpath = path_to_xpath(path)
config = rpc_config_get(user=self.__user, source=self.__source, xpath=xpath)
else:
config = rpc_config_get(user=self.__user, source=self.__source)

send(self.__socket, config, pp)
data = read(self.__socket, pp)

self.__handle_errors(data)
self.__root = parse_string(data).rpc_reply.data
parsed = parse_string(data).rpc_reply.data

# When path is provided, we still need to use get_path() for client-side navigation
# because the XPath filter may return a subtree containing the requested element
# within a <data> wrapper, and we need to navigate to the exact element requested
if path:
return get_path(self.__root, path)

return get_path(parsed, path)

self.__root = parsed
return self.__root

def __wait_for_notification(self, return_data: Optional[bool] = False) -> None:
Expand Down
36 changes: 36 additions & 0 deletions clixon/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,42 @@ def is_juniper(device: Element) -> bool:
return False


def path_to_xpath(path: str) -> str:
"""
Converts a simplified path format to XPath expression.

Examples:
path_to_xpath("devices/device[0]") -> "/devices/device[1]"
path_to_xpath("devices/device[name='r1']/config") -> "/devices/device[name='r1']/config"
path_to_xpath("services/bgp-peer[name='bgp-test']") -> "/services/bgp-peer[name='bgp-test']"

:param path: Simplified path (can be None or empty string, which returns "/")
:type path: str
:return: XPath expression
:rtype: str
"""
if not path:
return "/"

# Ensure path starts with /
if not path.startswith("/"):
path = "/" + path

# Replace any [key="value"] with [key='value']
path = re.sub(r'(\[.*?)"(.*?)"', r"\1'\2'", path)

# Convert numeric indices to XPath format (0-based to 1-based)
# Match [digit] patterns and increment them
def convert_index(match):
index = int(match.group(1))
# XPath uses 1-based indexing
return f"[{index + 1}]"

path = re.sub(r'\[(\d+)\]', convert_index, path)

return path


def get_path(root: Element, path: str) -> Optional[Element]:
"""
Returns the element at the path. Poor mans xpath.
Expand Down
6 changes: 4 additions & 2 deletions clixon/netconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RPCTypes(Enum):


def rpc_config_get(
user: Optional[str] = None, source: Optional[str] = "actions"
user: Optional[str] = None, source: Optional[str] = "actions", xpath: Optional[str] = "/"
) -> Element:
"""
Create a get-config RPC element.
Expand All @@ -42,12 +42,14 @@ def rpc_config_get(
:type user: str
:param source: Source of the configuration
:type source: str
:param xpath: XPath filter expression (default: "/")
:type xpath: str
:return: RPC element
:rtype: Element

"""
attributes = {}
xpath_attributes = {"nc:type": "xpath", "nc:select": "/"}
xpath_attributes = {"nc:type": "xpath", "nc:select": xpath}

if not user:
user = getpass.getuser()
Expand Down
24 changes: 24 additions & 0 deletions tests/test_netconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,30 @@ def test_rpc_config_get_user():
assert root.dumps() == xmlstr


def test_rpc_config_get_xpath():
"""
Test the rpc_config_get function with custom xpath.
"""

xmlstr = f"""<rpc xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" username="{user}" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="42"><get-config><source><actions xmlns="http://clicon.org/controller"/></source><nc:filter nc:type="xpath" nc:select="/devices/device[1]"/></get-config></rpc>"""

root = netconf.rpc_config_get(xpath="/devices/device[1]")

assert root.dumps() == xmlstr


def test_rpc_config_get_xpath_user():
"""
Test the rpc_config_get function with custom xpath and user.
"""

xmlstr = f"""<rpc xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" username="nisse" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" message-id="42"><get-config><source><actions xmlns="http://clicon.org/controller"/></source><nc:filter nc:type="xpath" nc:select="/devices/device[name='test']"/></get-config></rpc>"""

root = netconf.rpc_config_get(user="nisse", xpath="/devices/device[name='test']")

assert root.dumps() == xmlstr


def test_rpc_commit():
"""
Test the rpc_commit function.
Expand Down
76 changes: 76 additions & 0 deletions tests/test_path_to_xpath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from clixon.helpers import path_to_xpath


def test_path_to_xpath_simple():
"""
Test path_to_xpath with a simple path.
"""
assert path_to_xpath("devices/device") == "/devices/device"


def test_path_to_xpath_with_leading_slash():
"""
Test path_to_xpath with a path that already has a leading slash.
"""
assert path_to_xpath("/devices/device") == "/devices/device"


def test_path_to_xpath_with_index():
"""
Test path_to_xpath with numeric index (0-based to 1-based conversion).
"""
assert path_to_xpath("devices/device[0]") == "/devices/device[1]"
assert path_to_xpath("devices/device[1]") == "/devices/device[2]"
assert path_to_xpath("devices/device[0]/config") == "/devices/device[1]/config"


def test_path_to_xpath_with_name_filter():
"""
Test path_to_xpath with name filter.
"""
assert path_to_xpath("devices/device[name='r1']") == "/devices/device[name='r1']"
assert (
path_to_xpath("devices/device[name='r1']/config")
== "/devices/device[name='r1']/config"
)


def test_path_to_xpath_with_double_quotes():
"""
Test path_to_xpath with double quotes (should be converted to single quotes).
"""
assert path_to_xpath('devices/device[name="r1"]') == "/devices/device[name='r1']"


def test_path_to_xpath_complex():
"""
Test path_to_xpath with complex paths.
"""
assert (
path_to_xpath("devices/device[name='juniper1']/config/configuration/version")
== "/devices/device[name='juniper1']/config/configuration/version"
)
assert (
path_to_xpath(
"devices/device[name='juniper1']/config/configuration/interfaces/interface[name='lo0']/unit[0]"
)
== "/devices/device[name='juniper1']/config/configuration/interfaces/interface[name='lo0']/unit[1]"
)


def test_path_to_xpath_empty():
"""
Test path_to_xpath with empty path.
"""
assert path_to_xpath("") == "/"
assert path_to_xpath(None) == "/"


def test_path_to_xpath_multiple_filters():
"""
Test path_to_xpath with multiple filters in the path.
"""
assert (
path_to_xpath("services/bgp-peer[name='bgp-test']")
== "/services/bgp-peer[name='bgp-test']"
)