diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3baae83..3e5826e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,7 +26,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [ "3.8", "3.9", "3.10", "3.11" ] + python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ] steps: - uses: actions/checkout@v2 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b591d56..48a1d74 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ repos: rev: 23.3.0 hooks: - id: black - language_version: python3.11 + language_version: python3.12 - repo: https://github.com/PyCQA/flake8 rev: 6.0.0 diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 42fa7ba..d7f44cb 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -8,7 +8,7 @@ version: 2 build: os: ubuntu-22.04 tools: - python: "3.11" + python: "3.12" # Build documentation in the "docsrc/" directory with Sphinx sphinx: diff --git a/README.md b/README.md index d91ac1a..616c86a 100644 --- a/README.md +++ b/README.md @@ -34,8 +34,9 @@ by Steve McGrath. ## Products - Zscaler Private Access (ZPA) - Zscaler Internet Access (ZIA) +- Zscaler Digital Experience (ZDX) - Zscaler Mobile Admin Portal for Zscaler Client Connector (ZCC) -- Cloud Security Posture Management (CSPM) - (work in progress) +- Zscaler Connector Portal (ZCON) ## Installation @@ -51,17 +52,34 @@ for each product that you are interfacing with. Once you have the requirements a you're ready to go. -### Quick ZIA Example - +### Quick ZIA Example - Explicitly Activate Changes +**Note:** Changes will not be activated until you explicitly call the `activate()` method or the admin session is closed. +It's a best-practice to log out your API session so that logging is sane and you don't have unnecessary sessions open. ```python from pyzscaler import ZIA -from pprint import pprint zia = ZIA(api_key='API_KEY', cloud='CLOUD', username='USERNAME', password='PASSWORD') for user in zia.users.list_users(): - pprint(user) + print(user) + +zia.config.activate() # Explicitly activate changes (if applicable). +zia.session.delete() # Log out of the ZIA API and automatically commit any other changes +``` + +### Quick ZIA Example - Using the Python Context Manager + +**Note**: Using the Python Context Manager will automatically log the admin user out and commit/activate any changes +made when execution is complete. + +```python +from pyzscaler import ZIA +with ZIA(api_key='API_KEY', cloud='CLOUD', username='USERNAME', password='PASSWORD') as zia: + for user in zia.users.list_users(): + print(user) + ``` + ### Quick ZPA Example ```python @@ -83,6 +101,27 @@ zcc = ZCC(client_id='CLIENT_ID', client_secret='CLIENT_SECRET', company_id='COMP for device in zcc.devices.list_devices(): pprint(device) ``` +### Quick ZDX Example + +```python +from pyzscaler import ZDX + +zdx = ZDX(client_id='CLIENT_ID', client_secret='CLIENT_SECRET', cloud='CLOUD') +for device in zdx.devices.list_devices(): + print(device) +``` +### Quick ZCON Example +The Zscaler Connector Portal uses the same authentication methods as ZIA and this will allow us to use the Python Context +Manager just like we did with ZIA. Of course, you can still use the explicit method if you prefer. + +```python +from pyzscaler import ZCON + +with ZCON(api_key='API_KEY', cloud='CLOUD', username='USERNAME', password='PASSWORD') as zcon: + for group in zcon.groups.list_groups(): + print(group) +``` + ## Documentation @@ -96,7 +135,7 @@ pyZscaler makes some quality of life improvements to simplify and clarify argume A start has been made on [user documentation](https://pyzscaler.packet.tech) with examples and explanations on how to implement with pyZcaler. ## Is It Tested? -Yes! pyZscaler has a complete test suite that fully covers all methods within the ZIA and ZPA modules. +Yes! pyZscaler has a complete test suite that fully covers all methods within all modules. ## Contributing diff --git a/docsrc/conf.py b/docsrc/conf.py index 2f62cf0..d8a2a80 100644 --- a/docsrc/conf.py +++ b/docsrc/conf.py @@ -25,9 +25,9 @@ html_title = "" # The short X.Y version -version = '1.5' +version = '1.6' # The full version, including alpha/beta/rc tags -release = '1.5.0' +release = '1.6.0' # -- General configuration --------------------------------------------------- diff --git a/docsrc/index.rst b/docsrc/index.rst index 23b0775..eff4140 100644 --- a/docsrc/index.rst +++ b/docsrc/index.rst @@ -10,6 +10,7 @@ zs/zpa/index zs/zcc/index zs/zdx/index + zs/zcon/index pyZscaler SDK - Library Reference ===================================================================== @@ -43,6 +44,7 @@ Products - :doc:`Zscaler Internet Access (ZIA) ` - :doc:`Zscaler Mobile Admin Portal ` - :doc:`Zscaler Digital Experience (ZDX) ` +- :doc:`Zscaler Connector Portal (ZCON) ` Installation ============== @@ -68,11 +70,10 @@ Quick ZIA Example .. code-block:: python from pyzscaler import ZIA - from pprint import pprint zia = ZIA(api_key='API_KEY', cloud='CLOUD', username='USERNAME', password='PASSWORD') for user in zia.users.list_users(): - pprint(user) + print(user) Quick ZPA Example ^^^^^^^^^^^^^^^^^^ @@ -80,11 +81,10 @@ Quick ZPA Example .. code-block:: python from pyzscaler import ZPA - from pprint import pprint zpa = ZPA(client_id='CLIENT_ID', client_secret='CLIENT_SECRET', customer_id='CUSTOMER_ID') for app_segment in zpa.app_segments.list_segments(): - pprint(app_segment) + print(app_segment) Quick ZCC Example @@ -93,11 +93,10 @@ Quick ZCC Example .. code-block:: python from pyzscaler import ZCC - from pprint import pprint zcc = ZCC(client_id='CLIENT_ID', client_secret='CLIENT_SECRET', company_id='COMPANY_ID) for device in zcc.devices.list_devices(): - pprint(device) + print(device) Quick ZDX Example ^^^^^^^^^^^^^^^^^^^ @@ -105,11 +104,21 @@ Quick ZDX Example .. code-block:: python from pyzscaler import ZDX - from pprint import pprint - zcc = ZDX(client_id='CLIENT_ID', client_secret='CLIENT_SECRET') + zdx = ZDX(client_id='CLIENT_ID', client_secret='CLIENT_SECRET') for device in zdx.devices.list_devices(): - pprint(device) + print(device) + +Quick ZCON Example +^^^^^^^^^^^^^^^^^^^ + +.. code-block:: python + + from pyzscaler import ZCON + + zcon = ZCON(api_key='API_KEY', cloud='CLOUD', username='USERNAME', password='PASSWORD') + for group in zcon.connectors.list_groups(): + print(group) .. automodule:: pyzscaler diff --git a/docsrc/zs/zcon/admin.rst b/docsrc/zs/zcon/admin.rst new file mode 100644 index 0000000..2937082 --- /dev/null +++ b/docsrc/zs/zcon/admin.rst @@ -0,0 +1,12 @@ +admin +-------------- + +The following methods allow for interaction with the ZCON +Admin API endpoints. + +Methods are accessible via ``zcon.admin`` + +.. _zcon-admin: + +.. automodule:: pyzscaler.zcon.admin + :members: \ No newline at end of file diff --git a/docsrc/zs/zcon/config.rst b/docsrc/zs/zcon/config.rst new file mode 100644 index 0000000..39e73f9 --- /dev/null +++ b/docsrc/zs/zcon/config.rst @@ -0,0 +1,12 @@ +config +-------------- + +The following methods allow for interaction with the ZCON +Config API endpoints. + +Methods are accessible via ``zcon.config`` + +.. _zcon-config: + +.. automodule:: pyzscaler.zcon.config + :members: \ No newline at end of file diff --git a/docsrc/zs/zcon/connectors.rst b/docsrc/zs/zcon/connectors.rst new file mode 100644 index 0000000..59e0d83 --- /dev/null +++ b/docsrc/zs/zcon/connectors.rst @@ -0,0 +1,12 @@ +connectors +-------------- + +The following methods allow for interaction with the ZCON +Connectors API endpoints. + +Methods are accessible via ``zcon.connectors`` + +.. _zcon-connectors: + +.. automodule:: pyzscaler.zcon.connectors + :members: \ No newline at end of file diff --git a/docsrc/zs/zcon/index.rst b/docsrc/zs/zcon/index.rst new file mode 100644 index 0000000..3d23a96 --- /dev/null +++ b/docsrc/zs/zcon/index.rst @@ -0,0 +1,13 @@ +ZCON +========== +This package covers the ZCON interface. + +.. toctree:: + :maxdepth: 1 + :glob: + :hidden: + + * + +.. automodule:: pyzscaler.zcon + :members: diff --git a/docsrc/zs/zcon/locations.rst b/docsrc/zs/zcon/locations.rst new file mode 100644 index 0000000..f574529 --- /dev/null +++ b/docsrc/zs/zcon/locations.rst @@ -0,0 +1,12 @@ +locations +-------------- + +The following methods allow for interaction with the ZCON +Locations API endpoints. + +Methods are accessible via ``zcon.locations`` + +.. _zcon-locations: + +.. automodule:: pyzscaler.zcon.locations + :members: \ No newline at end of file diff --git a/docsrc/zs/zcon/session.rst b/docsrc/zs/zcon/session.rst new file mode 100644 index 0000000..e20e8aa --- /dev/null +++ b/docsrc/zs/zcon/session.rst @@ -0,0 +1,12 @@ +session +-------------- + +The following methods allow for interaction with the ZCON +Session API endpoints. + +Methods are accessible via ``zcon.session`` + +.. _zcon-session: + +.. automodule:: pyzscaler.zcon.session + :members: \ No newline at end of file diff --git a/docsrc/zs/zia/apptotal.rst b/docsrc/zs/zia/apptotal.rst new file mode 100644 index 0000000..95bfc9b --- /dev/null +++ b/docsrc/zs/zia/apptotal.rst @@ -0,0 +1,11 @@ +apptotal +============ + +The following methods allow for interaction with the ZIA AppTotal API endpoints. + +Methods are accessible via ``zia.apptotal`` + +.. _zia-apptotal: + +.. automodule:: pyzscaler.zia.apptotal + :members: \ No newline at end of file diff --git a/docsrc/zs/zia/cloud_apps.rst b/docsrc/zs/zia/cloud_apps.rst new file mode 100644 index 0000000..b46cec5 --- /dev/null +++ b/docsrc/zs/zia/cloud_apps.rst @@ -0,0 +1,12 @@ +cloud_apps +------------- + +The following methods allow for interaction with the ZIA +Cloud Applications API endpoints. + +Methods are accessible via ``zia.cloud_apps`` + +.. _zia-cloud_apps: + +.. automodule:: pyzscaler.zia.cloud_apps + :members: \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 9632398..1917d6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pyzscaler" -version = "1.5.0" +version = "1.6.0" description = "A python SDK for the Zscaler API." authors = ["Mitch Kelly "] license = "MIT" @@ -20,6 +20,7 @@ classifiers = [ "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Topic :: Security", "Topic :: Software Development :: Libraries :: Python Modules", ] include = [ diff --git a/pyzscaler/__init__.py b/pyzscaler/__init__.py index 12f25c7..b99eef8 100644 --- a/pyzscaler/__init__.py +++ b/pyzscaler/__init__.py @@ -4,9 +4,10 @@ "Dax Mickelson", "Jacob GĂ„rder", ] -__version__ = "1.5.0" +__version__ = "1.6.0" from pyzscaler.zcc import ZCC # noqa +from pyzscaler.zcon import ZCON # noqa from pyzscaler.zdx import ZDX # noqa from pyzscaler.zia import ZIA # noqa from pyzscaler.zpa import ZPA # noqa diff --git a/pyzscaler/utils.py b/pyzscaler/utils.py index 46d924f..c11fa28 100644 --- a/pyzscaler/utils.py +++ b/pyzscaler/utils.py @@ -1,4 +1,5 @@ import functools +import re import time from box import Box, BoxList @@ -16,27 +17,50 @@ def snake_to_camel(name: str): "name_l10n_tag": "nameL10nTag", "surrogate_ip": "surrogateIP", "surrogate_ip_enforced_for_known_browsers": "surrogateIPEnforcedForKnownBrowsers", + "ec_vms": "ecVMs", + "ipv6_enabled": "ipV6Enabled", + "valid_ssl_certificate": "validSSLCertificate", } return edge_cases.get(name, name[0].lower() + name.title()[1:].replace("_", "")) +def camel_to_snake(name: str): + """Converts Zscaler's lower camelCase to Python Snake Case.""" + edge_cases = { + "routableIP": "routable_ip", + "isNameL10nTag": "is_name_l10n_tag", + "nameL10nTag": "name_l10n_tag", + "surrogateIP": "surrogate_ip", + "surrogateIPEnforcedForKnownBrowsers": "surrogate_ip_enforced_for_known_browsers", + "ecVMs": "ec_vms", + "ipV6Enabled": "ipv6_enabled", + "validSSLCertificate": "valid_ssl_certificate", + } + # Check if name is an edge case + if name in edge_cases: + return edge_cases[name] + + name = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name) + return name.lower() + + def chunker(lst, n): """Yield successive n-sized chunks from lst.""" for i in range(0, len(lst), n): yield lst[i : i + n] -# Recursive function to convert all keys and nested keys from snake case -# to camel case. -def convert_keys(data): +def convert_keys(data, direction="to_camel"): + converter = camel_to_snake if direction == "to_snake" else snake_to_camel + if isinstance(data, (list, BoxList)): - return [convert_keys(inner_dict) for inner_dict in data] + return [convert_keys(inner_dict, direction=direction) for inner_dict in data] elif isinstance(data, (dict, Box)): new_dict = {} for k in data.keys(): v = data[k] - new_key = snake_to_camel(k) - new_dict[new_key] = convert_keys(v) if isinstance(v, (dict, list)) else v + new_key = converter(k) + new_dict[new_key] = convert_keys(v, direction=direction) if isinstance(v, (dict, list)) else v return new_dict else: return data diff --git a/pyzscaler/zcc/__init__.py b/pyzscaler/zcc/__init__.py index dee639b..b32d499 100644 --- a/pyzscaler/zcc/__init__.py +++ b/pyzscaler/zcc/__init__.py @@ -49,7 +49,7 @@ class ZCC(APISession): def __init__(self, **kw): self._client_id = kw.get("client_id", os.getenv(f"{self._env_base}_CLIENT_ID")) self._client_secret = kw.get("client_secret", os.getenv(f"{self._env_base}_CLIENT_SECRET")) - self._cloud = kw.get("cloud", os.getenv(f"{self._env_base}_CLOUD")) + self._env_cloud = kw.get("cloud", os.getenv(f"{self._env_base}_CLOUD")) self._url = ( kw.get("override_url", os.getenv(f"{self._env_base}_OVERRIDE_URL")) or f"https://api-mobile.{self._env_cloud}.net/papi" diff --git a/pyzscaler/zcon/__init__.py b/pyzscaler/zcon/__init__.py new file mode 100644 index 0000000..b5f29c6 --- /dev/null +++ b/pyzscaler/zcon/__init__.py @@ -0,0 +1,121 @@ +import os + +from box import Box +from restfly import APISession + +from pyzscaler import __version__ + +from .admin import ZCONAdminAPI +from .config import ZCONConfigAPI +from .connectors import ZCONConnectorsAPI +from .locations import ZCONLocationsAPI +from .session import ZCONSessionAPI + + +class ZCON(APISession): + """ + A Controller to access Endpoints in the Zscaler Cloud and Branch Connector API. + + The ZCON object stores the session token and simplifies access to CRUD options within the ZCON Portal. + + """ + + _vendor = "Zscaler" + _product = "Zscaler Cloud and Branch Connector" + _backoff = 3 + _build = __version__ + _box = True + _box_attrs = {"camel_killer_box": True} + _env_base = "ZCON" + _url = "https://connector.zscaler.net/api/v1" + _env_cloud = "zscaler" + + def __init__(self, **kw): + self._api_key = kw.get("api_key", os.getenv(f"{self._env_base}_API_KEY")) + self._username = kw.get("username", os.getenv(f"{self._env_base}_USERNAME")) + self._password = kw.get("password", os.getenv(f"{self._env_base}_PASSWORD")) + self.env_cloud = kw.get("cloud", os.getenv(f"{self._env_base}_CLOUD")) + self._url = ( + kw.get("override_url", os.getenv(f"{self._env_base}_OVERRIDE_URL")) + or f"https://connector.{self.env_cloud}.net/api/v1" + ) + self.conv_box = True + super(ZCON, self).__init__(**kw) + + def _build_session(self, **kwargs) -> Box: + """ + Build the APISession object. + + This method is called automatically when instantiating the ZCON object. + + Returns: + Box: The Box object representing the ZCON API. + + """ + super(ZCON, self)._build_session(**kwargs) + return self.session.create(api_key=self._api_key, username=self._username, password=self._password) + + def _deauthenticate(self): + """ + End the authentication session. + + Returns: + Box: The Box object representing the ZCON API. + + """ + return self.session.delete() + + @property + def admin(self) -> ZCONAdminAPI: + """ + The interface object for the :ref:`ZCON Admin interface `. + + Returns: + ZCONAdminAPI: The AdminAPI object. + + """ + return ZCONAdminAPI(self) + + @property + def connectors(self) -> ZCONConnectorsAPI: + """ + The interface object for the :ref:`ZCON Connectors interface `. + + Returns: + ZCONConnectorsAPI: The ConnectorsAPI object. + + """ + return ZCONConnectorsAPI(self) + + @property + def config(self) -> ZCONConfigAPI: + """ + The interface object for the :ref:`ZCON Config interface `. + + Returns: + ZCONConfigAPI: The ConfigAPI object. + + """ + return ZCONConfigAPI(self) + + @property + def locations(self) -> ZCONLocationsAPI: + """ + The interface object for the :ref:`ZCON Locations interface `. + + Returns: + ZCONLocationsAPI: The LocationsAPI object. + + """ + return ZCONLocationsAPI(self) + + @property + def session(self) -> ZCONSessionAPI: + """ + The interface object for the :ref:`ZCON Authentication interface `. + + Returns: + ZCONSessionAPI: The SessionAPI object. + + """ + return ZCONSessionAPI(self) diff --git a/pyzscaler/zcon/admin.py b/pyzscaler/zcon/admin.py new file mode 100644 index 0000000..bcc8ca1 --- /dev/null +++ b/pyzscaler/zcon/admin.py @@ -0,0 +1,511 @@ +from box import Box, BoxList +from restfly import APIEndpoint + +from pyzscaler.utils import convert_keys + + +class ZCONAdminAPI(APIEndpoint): + def list_roles(self, **kwargs) -> BoxList: + """ + List all existing admin roles. + + Keyword Args: + include_auditor_role (bool): Include / exclude auditor roles in the response. + include_partner_role (bool): Include / exclude partner roles in the response. + include_api_roles (bool): Include / exclude API roles in the response. + id (list): The ID of the roles to include. + + Returns: + :obj:`BoxList`: The list of roles. + + Examples: + Print all roles:: + + for role in zcon.admin.list_roles(): + print(role) + + Print all roles with additional parameters:: + + for role in zcon.admin.list_roles( + include_auditor_role=True, + include_partner_role=True, + include_api_roles=True, + ): + print(role) + + """ + return self._get("adminRoles") + + def get_role(self, role_id: str) -> Box: + """ + Get details for a specific admin role. + + Args: + role_id (str): The ID of the role to retrieve. + + Returns: + :obj:`Box`: The role details. + + Examples: + Print the details of a role:: + + print(zcon.admin.get_role("123456789") + + """ + return self._get(f"adminRoles/{role_id}") + + def add_role( + self, + name: str, + policy_access: str = "NONE", + report_access: str = "NONE", + username_access: str = "NONE", + dashboard_access: str = "NONE", + **kwargs, + ): + """ + Create a new admin role. + + Args: + name (str): The name of the role. + policy_access (str): The policy access level. + report_access (str): The report access level. + username_access (str): The username access level. + dashboard_access (str): The dashboard access level. + + Keyword Args: + feature_permissions_tuples (:obj:`List[Tuple[str, str]]`): + A list of tuple pairs specifying the feature permissions. Each tuple contains the feature name + (case-insensitive) and its access level. + + Accepted feature names (case-insensitive) are: + + - ``APIKEY_MANAGEMENT`` + - ``EDGE_CONNECTOR_CLOUD_PROVISIONING`` + - ``EDGE_CONNECTOR_LOCATION_MANAGEMENT`` + - ``EDGE_CONNECTOR_DASHBOARD`` + - ``EDGE_CONNECTOR_FORWARDING`` + - ``EDGE_CONNECTOR_TEMPLATE`` + - ``REMOTE_ASSISTANCE_MANAGEMENT`` + - ``EDGE_CONNECTOR_ADMIN_MANAGEMENT`` + - ``EDGE_CONNECTOR_NSS_CONFIGURATION`` + alerting_access (str): The alerting access level. + analysis_access (str): The analysis access level. + admin_acct_access (str): The admin account access level. + device_info_access (str): The device info access level. + + Note: + For access levels, the accepted values are: + + - ``NONE`` + - ``READ_ONLY`` + - ``READ_WRITE`` + + + Returns: + :obj:`dict`: The newly created role. + + Examples: + Minimum required arguments:: + + zcon.admin.add_role(name="NewRole") + + Including keyword arguments:: + + zcon.admin.add_role( + name="AdvancedRole", + policy_access="READ_ONLY", + feature_permissions_tuples=[ + ("apikey_management", "read_only"), + ("EDGE_CONNECTOR_CLOUD_PROVISIONING", "NONE") + ], + alerting_access="READ_WRITE" + ) + + """ + payload = { + "name": name, + "role_type": "EDGE_CONNECTOR_ADMIN", + "policy_access": policy_access, + "report_access": report_access, + "username_access": username_access, + "dashboard_access": dashboard_access, + } + + if feature_permissions_tuples := kwargs.pop("feature_permissions_tuples", None): + payload["feature_permissions"] = {k.upper(): v for k, v in feature_permissions_tuples} + + # Add optional parameters to payload + payload.update({k: v for k, v in kwargs.items() if v is not None}) + + # Convert snake to camelcase + payload = convert_keys(payload) + + return self._post("adminRoles", json=payload) + + def update_role(self, role_id: str, **kwargs) -> Box: + """ + Update an existing admin role. + + Args: + role_id (str): The ID of the role to update. + + Keyword Args: + name (str): The name of the role. + policy_access (str): The policy access level. + report_access (str): The report access level. + username_access (str): The username access level. + dashboard_access (str): The dashboard access level. + feature_permissions (:obj:`List[Tuple[str, str]]`): + A list of tuple pairs specifying the feature permissions. Each tuple contains the feature name + (case-insensitive) and its access level. + + Accepted feature names (case-insensitive) are: + + - ``APIKEY_MANAGEMENT`` + - ``EDGE_CONNECTOR_CLOUD_PROVISIONING`` + - ``EDGE_CONNECTOR_LOCATION_MANAGEMENT`` + - ``EDGE_CONNECTOR_DASHBOARD`` + - ``EDGE_CONNECTOR_FORWARDING`` + - ``EDGE_CONNECTOR_TEMPLATE`` + - ``REMOTE_ASSISTANCE_MANAGEMENT`` + - ``EDGE_CONNECTOR_ADMIN_MANAGEMENT`` + - ``EDGE_CONNECTOR_NSS_CONFIGURATION`` + alerting_access (str): The alerting access level. + analysis_access (str): The analysis access level. + admin_acct_access (str): The admin account access level. + device_info_access (str): The device info access level. + + Note: + For access levels, the accepted values are: + + - ``NONE`` + - ``READ_ONLY`` + - ``READ_WRITE`` + + Returns: + :obj:`Box`: The updated role. + + Examples: + Update a role:: + + zcon.admin.update_role( + role_id="123456789", + policy_access="READ_ONLY", + feature_permissions=[ + ("apikey_management", "read_only"), + ("EDGE_CONNECTOR_CLOUD_PROVISIONING", "NONE") + ], + alerting_access="READ_WRITE" + ) + + """ + payload = self.get_role(role_id) + + # Pop the feature permissions out first so that we retain their format + feature_permissions = kwargs.pop("feature_permissions", None) + + # Add optional parameters to payload + payload.update({k: v for k, v in kwargs.items() if v is not None}) + + # Convert snake to camelcase + payload = convert_keys(payload) + + # Now update the feature permissions + if feature_permissions: + payload["featurePermissions"] = {k.upper(): v for k, v in feature_permissions} + + return self._put(f"adminRoles/{role_id}", json=payload) + + def delete_role(self, role_id: str) -> int: + """ + Delete the specified admin role. + + Args: + role_id (str): The ID of the role to delete. + + Returns: + :obj:`int`: The status code of the operation. + + Examples: + Delete a role:: + + zcon.admin.delete_role("123456789") + + """ + return self._delete(f"adminRoles/{role_id}").status_code + + def change_password(self, username: str, old_password: str, new_password: str) -> int: + """ + Change the password for the specified admin user. + + Args: + username (str): The username of the admin user. + old_password (str): The current password of the admin user. + new_password (str): The new password for the admin user. + + Returns: + :obj:`int`: The status code of the operation. + + Examples: + Change a password:: + + zcon.admin.change_password("jdoe", "oldpassword123", "newpassword123") + + """ + payload = { + "userName": username, + "oldPassword": old_password, + "newPassword": new_password, + } + return self._post("passwordChange", json=payload).status_code + + def list_admins(self, **kwargs) -> BoxList: + """ + List all existing admin users. + + Keyword Args: + include_auditor_users (bool): Include / exclude auditor users in the response. + include_admin_users (bool): Include / exclude admin users in the response. + include_api_roles (bool): Include / exclude API roles in the response. + search (str): The search string to filter by. + page (int): The page offset to return. + page_size (int): The number of records to return per page. + version (int): Specifies the admins from a backup version + + + Returns: + :obj:`BoxList`: The list of admin users. + + Examples: + List all admins:: + + for admin in zcon.admin.list_admins(): + print(admin) + + List all admins with advanced features:: + + for admin in zcon.admin.list_admins( + include_auditor_users=True, + include_admin_users=True, + include_api_roles=True, + ): + print(admin) + + """ + payload = { + "partnerType": "EDGE_CONNECTOR_ADMIN", + } + + # Update the payload with keyword arguments + payload.update({k: v for k, v in kwargs.items() if v is not None}) + + # Convert snake to camelcase if needed + payload = convert_keys(payload) + + return self._get("adminUsers", params=payload) + + def get_admin(self, admin_id: str) -> Box: + """ + Get details for a specific admin user. + + Args: + admin_id (str): The ID of the admin user to retrieve. + + Returns: + :obj:`Box`: The admin user details. + + Examples: + Print the details of an admin user:: + + print(zcon.admin.get_admin("123456789") + + """ + return self._get(f"adminUsers/{admin_id}") + + def add_admin(self, user_name: str, login_name: str, role: str, email: str, password: str, **kwargs) -> Box: + """ + Create a new admin user. + + Args: + user_name (str): The name of the admin user. + login_name (str): The login name of the admin user. + role (str): The role of the admin user. + email (str): The email address of the admin user. + password (str): The password for the admin user. + + Keyword Args: + disabled (bool): Indicates whether the admin is disabled. + new_location_create_allowed (bool): Indicates whether the admin can create new locations. + admin_scope_type (str): The admin scope type. + admin_scope_group_member_entity_ids (list): Applicable if the admin scope type is `LOCATION_GROUP`. + is_default_admin (bool): Indicates whether the admin is the default admin. + is_deprecated_default_admin (bool): Indicates whether this admin is deletable. + is_auditor (bool): Indicates whether the admin is an auditor. + is_security_report_comm_enabled (bool): Indicates whether the admin can receive security reports. + is_service_update_comm_enabled (bool): Indicates whether the admin can receive service updates. + is_password_login_allowed (bool): Indicates whether the admin can log in with a password. + is_product_update_comm_enabled (bool): Indicates whether the admin can receive product updates. + is_exec_mobile_app_enabled (bool): Indicates whether Executive Insights App access is enabled for the admin. + send_mobile_app_invite (bool): + Indicates whether to send an invitation email to download Executive Insights App to admin. + send_zdx_onboard_invite (bool): Indicates whether to send an invitation email for ZDX onboarding to admin. + comments (str): Comments for the admin user. + name (str): + Admin user's "friendly" name, e.g., "FirstName LastName" (this field typically matches userName.) + + Returns: + Box: A Box object representing the newly created admin user. + + Examples: + Create a new admin user with only the required parameters:: + + zcon.admin.add_admin( + name="Jane Smith", + login_name="jsmith", + role="admin", + email="jsmith@example.com", + password="password123", + ) + + Create a new admin with some additional parameters:: + + zcon.admin.add_admin( + name="Jane Smith", + login_name="jsmith", + role="admin", + email="jsmith@example.com", + password="password123", + is_default_admin=False, + disabled=False, + comments="New admin user" + + """ + + payload = { + "loginName": login_name, + "userName": user_name, + "email": email, + "role": role, + "password": password, + } + + # Add optional parameters to payload + payload.update({k: v for k, v in kwargs.items() if v is not None}) + + # Convert snake to camelcase + payload = convert_keys(payload) + + return self._post("adminUsers", json=payload) + + def update_admin(self, admin_id: str, **kwargs) -> Box: + """ + Update an existing admin user. + + Args: + admin_id (str): The ID of the admin user to update. + + Keyword Args: + role (str): The role of the admin user. + email (str): The email address of the admin user. + password (str): The password for the admin user. + disabled (bool): Indicates whether the admin is disabled. + new_location_create_allowed (bool): Indicates whether the admin can create new locations. + admin_scope_type (str): The admin scope type. + admin_scope_group_member_entity_ids (list): Applicable if the admin scope type is `LOCATION_GROUP`. + is_default_admin (bool): Indicates whether the admin is the default admin. + is_deprecated_default_admin (bool): Indicates whether this admin is deletable. + is_auditor (bool): Indicates whether the admin is an auditor. + is_security_report_comm_enabled (bool): Indicates whether the admin can receive security reports. + is_service_update_comm_enabled (bool): Indicates whether the admin can receive service updates. + is_password_login_allowed (bool): Indicates whether the admin can log in with a password. + is_product_update_comm_enabled (bool): Indicates whether the admin can receive product updates. + is_exec_mobile_app_enabled (bool): Indicates whether Executive Insights App access is enabled for the admin. + send_mobile_app_invite (bool): + Indicates whether to send an invitation email to download Executive Insights App to admin. + send_zdx_onboard_invite (bool): Indicates whether to send an invitation email for ZDX onboarding to admin. + comments (str): Comments for the admin user. + name (str): + Admin user's "friendly" name, e.g., "FirstName LastName" (this field typically matches userName.) + + Returns: + Box: A Box object representing the updated admin user. + + Examples: + Update an admin user:: + + zcon.admin.update_admin( + admin_id="123456789", + admin_scope_type="LOCATION_GROUP", + comments="Updated admin user", + ) + + """ + + payload = self.get_admin(admin_id) + + # Add optional parameters to payload + payload.update({k: v for k, v in kwargs.items() if v is not None}) + + # Convert snake to camelcase + payload = convert_keys(payload) + + return self._put(f"adminUsers/{admin_id}", json=payload) + + def delete_admin(self, admin_id: str) -> int: + """ + Delete the specified admin user. + + Args: + admin_id (str): The ID of the admin user to delete. + + Returns: + :obj:`int`: The status code of the operation. + + Examples: + Delete an admin user:: + + zcon.admin.delete_admin("123456789") + + """ + return self._delete(f"adminUsers/{admin_id}").status_code + + def list_api_keys(self, **kwargs) -> BoxList: + """ + List all existing API keys. + + Keyword Args: + include_partner_keys (bool): Include / exclude partner keys in the response. + + Returns: + :obj:`BoxList`: The list of API keys. + + Examples: + List all API keys:: + + for api_key in zcon.admin.list_api_keys(): + print(api_key) + """ + params = {} + if "include_partner_keys" in kwargs: + params["includePartnerKeys"] = kwargs["include_partner_keys"] + + return self._get("apiKeys", params=params) + + def regenerate_api_key(self, api_key_id: str) -> Box: + """ + Regenerate the specified API key. + + Args: + api_key_id (str): The ID of the API key to regenerate. + + Returns: + :obj:`Box`: The regenerated API key. + + Examples: + Regenerate an API key:: + + print(zcon.admin.regenerate_api_key("123456789")) + + """ + return self._post(f"apiKeys/{api_key_id}/regenerate") diff --git a/pyzscaler/zcon/config.py b/pyzscaler/zcon/config.py new file mode 100644 index 0000000..685eee7 --- /dev/null +++ b/pyzscaler/zcon/config.py @@ -0,0 +1,44 @@ +from box import Box +from restfly import APIEndpoint + + +class ZCONConfigAPI(APIEndpoint): + def activate(self, force: bool = False) -> Box: + """ + Activate the configuration. + + Args: + force (bool): If set to True, forces the activation. Default is False. + + Returns: + :obj:`Box`: The status code of the operation. + + Examples: + Activate the configuration without forcing:: + + zcon.config.activate() + + Forcefully activate the configuration:: + + zcon.config.activate(force=True) + + """ + if force: + return self._post("ecAdminActivateStatus/forcedActivate") + else: + return self._post("ecAdminActivateStatus/activate") + + def get_status(self): + """ + Get the status of the configuration. + + Returns: + :obj:`Box`: The status of the configuration. + + Examples: + Get the status of the configuration:: + + print(zcon.config.get_status()) + + """ + return self._get("ecAdminActivateStatus") diff --git a/pyzscaler/zcon/connectors.py b/pyzscaler/zcon/connectors.py new file mode 100644 index 0000000..dd273eb --- /dev/null +++ b/pyzscaler/zcon/connectors.py @@ -0,0 +1,92 @@ +from box import Box, BoxList +from restfly import APIEndpoint + +from pyzscaler.utils import convert_keys + + +class ZCONConnectorsAPI(APIEndpoint): + def list_groups(self, **kwargs) -> BoxList: + """ + List all existing connector groups. + + Keyword Args: + page (int): The page number to return. + page_size (int): The number of items to return per page. + + Returns: + :obj:`BoxList`: The list of cloud and branch connector groups. + + Examples: + List all connector groups:: + + for group in zcon.connectors.list_groups(): + print(group) + + List first page of connector groups with 10 items per page:: + + for group in zcon.connectors.list_groups(page=1, page_size=10): + print(group) + + """ + # There is an edge case in the camelcase to snake conversion that we're going to handle here. We'll revert + # the default camel_killer_box and run it through our conversion function in utils that handles edge-cases. + self._box_attrs = {"camel_killer_box": False} + + return convert_keys(self._get("ecgroup", params=kwargs), "to_snake") + + def get_group(self, group_id: str) -> Box: + """ + Get details for a specific connector group. + + Args: + group_id (str): The ID of the connector group. + + Returns: + :obj:`Box`: The connector group details. + + Examples: + Get details of a specific connector group:: + + print(zcon.connectors.get_group("123456789")) + """ + # There is an edge case in the camelcase to snake conversion that we're going to handle here. We'll revert + # the default camel_killer_box and run it through our conversion function in utils that handles edge-cases. + self._box_attrs = {"camel_killer_box": False} + + return convert_keys(self._get(f"ecgroup/{group_id}"), "to_snake") + + def get_vm(self, group_id: str, vm_id: str) -> Box: + """ + Get details for a specific connector VM. + + Args: + group_id (str): The ID of the connector group. + vm_id (str): The ID of the connector VM. + + Returns: + :obj:`Box`: The connector VM details. + + Examples: + Get details of a specific connector VM:: + + print(zcon.connectors.get_vm("123456789", "987654321")) + """ + return self._get(f"ecgroup/{group_id}/vm/{vm_id}") + + def delete_vm(self, group_id: str, vm_id: str) -> int: + """ + Delete the specified connector VM. + + Args: + group_id (str): The ID of the connector group. + vm_id (str): The ID of the connector VM. + + Returns: + :obj:`Box`: The status of the operation. + + Examples: + Delete a specific connector VM:: + + zcon.connectors.delete_vm("123456789", "987654321") + """ + return self._delete(f"ecgroup/{group_id}/vm/{vm_id}").status_code diff --git a/pyzscaler/zcon/locations.py b/pyzscaler/zcon/locations.py new file mode 100644 index 0000000..41dcee9 --- /dev/null +++ b/pyzscaler/zcon/locations.py @@ -0,0 +1,262 @@ +from box import Box, BoxList +from restfly import APIEndpoint + +from pyzscaler.utils import convert_keys + + +class ZCONLocationsAPI(APIEndpoint): + def list_locations(self, **kwargs) -> BoxList: + """ + List all existing locations. + + Keyword Args: + group_id (str): The ID of the connector group. + search (str): The search string to filter the results. + state (str): The geographical state of the location. + ssl_scan_enabled (bool): Include / exclude locations with SSL scanning enabled. + xff_enabled (bool): Include / exclude locations with XFF enabled. + auth_required (bool): Include / exclude locations with authentication required. + bw_enforced (bool): Include / exclude locations with bandwidth enforcement enabled. + partner_id (str): The ID of the partner. Not used for Cloud/Branch connector + enforce_aup (bool): Include / exclude locations with AUP enforcement enabled. + enable_firewall (bool): Include / exclude locations with firewall enabled. + location_type (str): The type of location, accepted values are: + + - ``NONE`` + - ``CORPORATE`` + - ``SERVER`` + - ``GUESTWIFI`` + - ``IOT`` + - ``WORKLOAD`` + page (int): The page number to return. + page_size (int): The number of items to return per page. + + Returns: + :obj:`BoxList`: The list of connector locations. + + Examples: + List all locations:: + + for location in zcon.locations.list_locations(): + print(location) + + List only IOT locations:: + + for location in zcon.locations.list_locations(location_type="IOT"): + print(location) + + """ + return self._get("location") + + def get_location(self, location_id: str) -> Box: + """ + Get details for a specific location. + + Args: + location_id (str): The ID of the location to retrieve. + + Returns: + :obj:`Box`: The location details. + + Examples: + Get details of a specific location:: + + print(zcon.locations.get_location("123456789")) + + """ + return self._get(f"adminRoles/{location_id}") + + def list_location_templates(self, **kwargs) -> BoxList: + """ + List all existing location templates. + + Args: + **kwargs: Optional keyword args to filter the results. + + Keyword Args: + page (int): The page number to return. + page_size (int): The number of items to return per page. + + Returns: + :obj:`BoxList`: The list of location templates. + + Examples: + List all location templates:: + + for template in zcon.locations.list_location_templates(): + print(template) + + """ + return self._get("locationTemplate", params=kwargs) + + def get_location_template(self, template_id: str) -> Box: + """ + Get details for a specific location template. + + Args: + template_id (str): The ID of the location template to retrieve. + + Returns: + :obj:`Box`: The location template details. + + Examples: + Get details of a specific location template:: + + print(zcon.locations.get_location_template("123456789")) + + """ + return self._get(f"locationTemplate/{template_id}") + + def add_location_template(self, name: str, template: dict = None, **kwargs) -> Box: + """ + Add a new location template. + + Args: + name (str): The name of the location template. + template (dict, optional): A dictionary containing the template settings. Possible keys include: + + - ``template_prefix`` (str): Prefix of Cloud & Branch Connector location template. + - ``xff_forward_enabled`` (bool): Enable to use the X-Forwarded-For headers. + - ``auth_required`` (bool): Enable if "Authentication Required" is needed. + - ``caution_enabled`` (bool): Enable to display an end user notification for unauthenticated traffic. + - ``aup_enabled`` (bool): Enable to display an Acceptable Use Policy (AUP) for unauthenticated traffic. + - ``aup_timeout_in_days`` (int): Frequency in days for displaying the AUP, if enabled. + - ``ofw_enabled`` (bool): Enable the service's firewall controls. + - ``ips_control`` (bool): Enable IPS controls, if firewall is enabled. + - ``enforce_bandwidth_control`` (bool): Enable to specify bandwidth limits. + - ``up_bandwidth`` (int): Upload bandwidth in Mbps, if bandwidth control is enabled. + - ``dn_bandwidth`` (int): Download bandwidth in Mbps, if bandwidth control is enabled. + - ``display_time_unit`` (str): Time unit for IP Surrogate idle time to disassociation. + - ``idle_time_in_minutes`` (int): User mapping idle time in minutes for IP Surrogate. + - ``surrogate_ip_enforced_for_known_browsers`` (bool): Enforce IP Surrogate for all known browsers. + - ``surrogate_refresh_time_unit`` (str): Time unit for refresh time for re-validation of surrogacy. + - ``surrogate_refresh_time_in_minutes`` (int): Refresh time in minutes for re-validation of surrogacy. + - ``surrogate_ip`` (bool): Enable the IP Surrogate feature. + + Keyword Args: + description (str): The description of the location template. + + Returns: + :obj:`Box`: The location template details. + + Examples: + Add a new location template with minimal settings:: + + print(zcon.locations.add_location_template(name="MyTemplate")) + + Add a new location template with additional settings:: + + template_settings = { + "surrogate_ip": True, + "surrogate_ip_enforced_for_known_browsers": False, + "template_prefix": "office", + "aup_enabled": True, + "aup_timeout_in_days": 30, + "ofw_enabled": True, + "idle_time_in_minutes": 35, + "auth_required": True, + "display_time_unit": "MINUTE", + } + print(zcon.locations.add_location_template(name="MyTemplate", template=template_settings)) + """ + # Rename 'description' to 'desc' if it exists + if "description" in kwargs: + kwargs["desc"] = kwargs.pop("description") + + payload = {"name": name, "template": template if template is not None else {}} + + # Add optional parameters to payload + payload.update({k: v for k, v in kwargs.items() if v is not None}) + + # Convert snake to camelcase + payload = convert_keys(payload) + + return self._post("locationTemplate", json=payload) + + def update_location_template(self, template_id: str, **kwargs) -> Box: + """ + Update an existing location template. + + Args: + template_id (str): The ID of the location template to update. + + Keyword Args: + name (str): The name of the location template. + template (dict): A dictionary containing the template settings. Possible keys include: + + - ``template_prefix`` (str): Prefix of Cloud & Branch Connector location template. + - ``xff_forward_enabled`` (bool): Enable to use the X-Forwarded-For headers. + - ``auth_required`` (bool): Enable if "Authentication Required" is needed. + - ``caution_enabled`` (bool): Enable to display an end user notification for unauthenticated traffic. + - ``aup_enabled`` (bool): Enable to display an Acceptable Use Policy (AUP) for unauthenticated traffic. + - ``aup_timeout_in_days`` (int): Frequency in days for displaying the AUP, if enabled. + - ``ofw_enabled`` (bool): Enable the service's firewall controls. + - ``ips_control`` (bool): Enable IPS controls, if firewall is enabled. + - ``enforce_bandwidth_control`` (bool): Enable to specify bandwidth limits. + - ``up_bandwidth`` (int): Upload bandwidth in Mbps, if bandwidth control is enabled. + - ``dn_bandwidth`` (int): Download bandwidth in Mbps, if bandwidth control is enabled. + - ``display_time_unit`` (str): Time unit for IP Surrogate idle time to disassociation. + - ``idle_time_in_minutes`` (int): User mapping idle time in minutes for IP Surrogate. + - ``surrogate_ip_enforced_for_known_browsers`` (bool): Enforce IP Surrogate for all known browsers. + - ``surrogate_refresh_time_unit`` (str): Time unit for refresh time for re-validation of surrogacy. + - ``surrogate_refresh_time_in_minutes`` (int): Refresh time in minutes for re-validation of surrogacy. + - ``surrogate_ip`` (bool): Enable the IP Surrogate feature. + description (str): A description for the location template. + + Returns: + :obj:`Box`: The updated location template details. + + Note: + - Any provided keys will update existing keys. + - The template dictionary does not support partial updates. Any provided template will completely overwrite + the existing template. + + Examples: + Update the name of a location template:: + + print(zcon.locations.update_location_template(template_id="123456789", name="MyTemplate")) + + Update the template details of a location template:: + + template_settings = { + "surrogate_ip": True, + "surrogate_ip_enforced_for_known_browsers": False, + "template_prefix": "office", + "aup_enabled": True, + "aup_timeout_in_days": 30, + "ofw_enabled": True, + "idle_time_in_minutes": 4, # <-- changed to 4 hours + "auth_required": True, + "display_time_unit": "HOUR", # <-- changed from minutes to hours + } + print(zcon.locations.update_location_template(template_id="123456789", template=template_settings)) + """ + + # Rename 'description' to 'desc' if it exists + if "description" in kwargs: + kwargs["desc"] = kwargs.pop("description") + + # Retrieve existing location template + payload = self.get_location_template(template_id) + + # Merge all kwargs into payload + payload.update(convert_keys(kwargs)) + + return self._put(f"locationTemplate/{template_id}", json=payload) + + def delete_location_template(self, template_id: str): + """ + Delete an existing location template. + + Args: + template_id (str): The ID of the location template to delete. + + Returns: + :obj:`int`: The status code of the operation. + + Examples: + Delete a location template:: + + print(zcon.locations.delete_location_template("123456789")) + """ + return self._delete(f"locationTemplate/{template_id}").status_code diff --git a/pyzscaler/zcon/session.py b/pyzscaler/zcon/session.py new file mode 100644 index 0000000..a0c312a --- /dev/null +++ b/pyzscaler/zcon/session.py @@ -0,0 +1,65 @@ +from box import Box +from restfly import APIEndpoint + +from pyzscaler.utils import obfuscate_api_key + + +class ZCONSessionAPI(APIEndpoint): + def status(self) -> Box: + """ + Returns the status of the authentication session if it exists. + + Returns: + :obj:`Box`: Session authentication information. + + Examples: + Check the status of the authentication session:: + + print(zcon.session.status()) + """ + return self._get("auth") + + def create(self, api_key: str, username: str, password: str) -> Box: + """ + Create a new ZCON authentication session. + + Args: + api_key (str): The ZCON API Key. + username (str): Username of admin user for the authentication session. + password (str): Password of the admin user for the authentication session. + + Returns: + :obj:`Box`: The authenticated session information. + + Examples: + Create a new authentication session:: + + zcon.session.create( + api_key='123456789', + username='admin@example.com', + password='MyInsecurePassword' + ) + """ + api_obf = obfuscate_api_key(api_key) + + payload = { + "apiKey": api_obf["key"], + "username": username, + "password": password, + "timestamp": api_obf["timestamp"], + } + return self._post("auth", json=payload) + + def delete(self) -> int: + """ + End the authentication session. + + Returns: + :obj:`int`: The status code of the operation. + + Examples: + End the authentication session:: + + print(zcon.session.delete()) + """ + return self._delete("auth").status diff --git a/pyzscaler/zia/__init__.py b/pyzscaler/zia/__init__.py index 994245f..c6a129c 100644 --- a/pyzscaler/zia/__init__.py +++ b/pyzscaler/zia/__init__.py @@ -6,7 +6,9 @@ from pyzscaler import __version__ from .admin_and_role_management import AdminAndRoleManagementAPI +from .apptotal import AppTotalAPI from .audit_logs import AuditLogsAPI +from .cloud_apps import CloudAppsAPI from .config import ActivationAPI from .dlp import DLPAPI from .firewall import FirewallPolicyAPI @@ -99,6 +101,14 @@ def admin_and_role_management(self): """ return AdminAndRoleManagementAPI(self) + @property + def apptotal(self): + """ + The interface object for the :ref:`ZIA AppTotal interface `. + + """ + return AppTotalAPI(self) + @property def audit_logs(self): """ @@ -148,6 +158,14 @@ def locations(self): """ return LocationsAPI(self) + @property + def cloud_apps(self): + """ + The interface object for the :ref:`ZIA Cloud Applications interface `. + + """ + return CloudAppsAPI(self) + @property def sandbox(self): """ diff --git a/pyzscaler/zia/apptotal.py b/pyzscaler/zia/apptotal.py new file mode 100644 index 0000000..030a312 --- /dev/null +++ b/pyzscaler/zia/apptotal.py @@ -0,0 +1,51 @@ +from box import Box +from restfly.endpoint import APIEndpoint + + +class AppTotalAPI(APIEndpoint): + def get_app(self, app_id: str, verbose: bool = False) -> Box: + """ + Searches the AppTotal App Catalog by app ID. If the app exists in the catalog, the app's information is + returned. If not, the app is submitted for analysis. After analysis is complete, a subsequent GET request is + required to fetch the app's information. + + Args: + app_id (str): The app ID to search for. + verbose (bool, optional): Defaults to False. + + Returns: + :obj:`Box`: The response object. + + Examples: + Return verbose information on an app with ID 12345:: + + zia.apptotal.get_app(app_id="12345", verbose=True) + + """ + params = { + "app_id": app_id, + "verbose": verbose, + } + return self._get("apps/app", params=params) + + def scan_app(self, app_id: str) -> Box: + """ + Submits an app for analysis in the AppTotal Sandbox. After analysis is complete, a subsequent GET request is + required to fetch the app's information. + + Args: + app_id (str): The app ID to scan. + + Returns: + :obj:`Box`: The response object. + + Examples: + Scan an app with ID 12345:: + + zia.apptotal.scan_app(app_id="12345") + + """ + payload = { + "appId": app_id, + } + return self._post("apps/app", json=payload) diff --git a/pyzscaler/zia/cloud_apps.py b/pyzscaler/zia/cloud_apps.py new file mode 100644 index 0000000..ec0a2b9 --- /dev/null +++ b/pyzscaler/zia/cloud_apps.py @@ -0,0 +1,380 @@ +from restfly.endpoint import APIEndpoint + +from pyzscaler.utils import convert_keys + + +class CloudAppsAPI(APIEndpoint): + @staticmethod + def _convert_ids_to_dict_list(id_list): + """Helper function to convert a list of IDs into a list of dictionaries. + + Args: + id_list (list): A list of IDs (str). + + Returns: + list: A list of dictionaries, each with an 'id' key. + """ + return [{"id": str(id)} for id in id_list] + + def bulk_update(self, sanction_state: str, **kwargs): + """ + Updates application status and tag information for predefined or custom cloud applications based on the + IDs specified. + + Args: + sanction_state (str): The sanction state to apply to the cloud applications. + + Accepted values are: + + ``sanctioned``: The cloud application is sanctioned. + ``unsanctioned``: The cloud application is unsanctioned. + ``any``: The cloud application is either sanctioned or unsanctioned. + + **kwargs: + Optional keyword args + + Keyword Args: + application_ids (list): A list of cloud application IDs to update. + custom_tag_ids (list): A list of custom tag IDs to apply to the cloud applications. + + Returns: + :obj:`dict`: The response from the ZIA API. + + Examples: + Update the sanction state of a cloud application:: + + zia.cloud_apps.bulk_update("sanctioned", application_ids=["12345"]) + + Update the sanction state and custom tags of a cloud application:: + + zia.cloud_apps.bulk_update("sanctioned", application_ids=["12345"], custom_tag_ids=["67890"]) + + """ + # Mapping for user-friendly sanction state values to API-expected values + sanction_state_mapping = {"sanctioned": "SANCTIONED", "unsanctioned": "UN_SANCTIONED", "any": "ANY"} + + # Convert user-friendly sanction state to ZIA API-expected value + api_sanction_state = sanction_state_mapping.get(sanction_state.lower()) + if not api_sanction_state: + raise ValueError( + f"Invalid sanction state: {sanction_state}. Accepted values are 'sanctioned', 'unsanctioned', or 'any'." + ) + + payload = {"sanctionedState": api_sanction_state} + + # Process application_ids if provided in kwargs + application_ids = kwargs.pop("application_ids", None) + if application_ids is not None: + payload["applicationIds"] = application_ids + + # Process custom_tag_ids if provided in kwargs + custom_tag_ids = kwargs.pop("custom_tag_ids", None) + if custom_tag_ids is not None: + payload["customTags"] = self._convert_ids_to_dict_list(custom_tag_ids) + + return self._put("cloudApplications/bulkUpdate", json=payload).status_code + + def export_shadow_it_report(self, duration: str = "LAST_1_DAYS", **kwargs) -> str: + """ + Export the Shadow IT Report (in CSV format) for the cloud applications recognized by Zscaler + based on their usage in your organisation. + + Args: + duration (str): + Filters the data by using predefined time frames. Defaults to last day. + + Possible values: ``LAST_1_DAYS``, ``LAST_7_DAYS``, ``LAST_15_DAYS``, ``LAST_MONTH``, ``LAST_QUARTER`` + **kwargs: + Arbitrary keyword arguments for filtering the report. + + Keyword Args: + app_name (str): Filters the data based on the cloud application name that matches the specified string. + order (dict): + Sorts the list in increasing or decreasing order based on the specified attribute. + + Example format for this parameter: + + ``order={"on": "RISK_SCORE", "by": "INCREASING"}`` + + Possible values for ``on``: + + ``RISK_SCORE``, ``APPLICATION``, ``APPLICATION_CATEGORY``, + ``SANCTIONED_STATE``, ``TOTAL_BYTES``, ``UPLOAD_BYTES``, ``DOWNLOAD_BYTES``, ``AUTHENTICATED_USERS``, + ``TRANSACTION_COUNT``, ``UNAUTH_LOCATION``, ``LAST_ACCESSED``. + + Possible values for ``by``: + + ``INCREASING``, ``DECREASING``. + application_category (str): Filters the data based on the cloud application category. + + Possible values: ``ANY``, ``NONE``, ``WEB_MAIL``, ``SOCIAL_NETWORKING``, ``STREAMING``, ``P2P``, + ``INSTANT_MESSAGING``, ``WEB_SEARCH``, ``GENERAL_BROWSING``, ``ADMINISTRATION``, ``ENTERPRISE_COLLABORATION``, + ``BUSINESS_PRODUCTIVITY``, ``SALES_AND_MARKETING``, ``SYSTEM_AND_DEVELOPMENT``, ``CONSUMER``, ``FILE_SHARE``, + ``HOSTING_PROVIDER``, ``IT_SERVICES``, ``DNS_OVER_HTTPS``, ``HUMAN_RESOURCES``, ``LEGAL``, ``HEALTH_CARE``, + ``FINANCE``, ``CUSTOM_CAPP`` + data_consumed (dict): + Filters the data by cloud application usage in terms of total data uploaded and downloaded. + + Example format for this parameter: + + data_consumed={"min": 100, "max": 1000} + + ``min`` and ``max`` fields specify the range respectively. + risk_index (int): + Filters the data based on the risk index assigned to cloud applications. + + Possible values: ``1``, ``2``, ``3``, ``4``, ``5`` + sanctioned_state (str): + Filters the data based on the status of cloud applications. + + Possible values: ``UN_SANCTIONED``, ``SANCTIONED``, ``ANY`` + employees (str): + Filters the data based on the employee count of the cloud application vendor. + + Possible values: ``NONE``, ``RANGE_1_100``, ``RANGE_100_1000``, ``RANGE_1000_10000``, + ``RANGE_10000_INF`` + supported_certifications (dict): Filters the cloud applications by security certifications. + + Example format for this parameter: + + ``supported_certifications={"operation": "INCLUDE", "value": ["ISO_27001", "HIPAA"]}`` + + Possible values for ``operation`` field: ``INCLUDE`` and ``EXCLUDE``. + + Possible values for ``value`` field: ``NONE``, ``CSA_STAR``, ``ISO_27001``, ``HIPAA``, ``FISMA``, + ``FEDRAMP``, ``SOC2``, ``ISO_27018``, ``PCI_DSS``, ``ISO_27017``, ``SOC1``, ``SOC3``, ``GDPR``, + ``CCPA``, ``FERPA``, ``COPPA``, ``HITECH``, ``EU_US_SWISS_PRIVACY_SHIELD``, + ``EU_US_PRIVACY_SHIELD_FRAMEWORK``, ``CISP``, ``AICPA``, ``FIPS``, ``SAFE_BIOPHARMA``, ``ISAE_3000``, + ``SSAE_18``, ``NIST``, ``ISO_14001``, ``SOC``, ``TRUSTE``, ``ISO_26262``, ``ISO_20252``, ``RGPD``, + ``ISO_20243``, ``ISO_10002``, ``JIS_Q_15001_2017``, ``ISMAP``. + source_ip_restriction (str): + Filters the cloud applications based on whether they have source IP restrictions. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + mfa_support (str): Filters the cloud applications based on whether they support multi-factor authentication. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + admin_audit_logs (str): Filters the cloud applications based on whether they support admin audit logging. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + had_breach_in_last_3_years (str): + Filters the cloud applications based on data breaches in the last three years. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + have_poor_items_of_service (str): Filters the cloud applications based on their terms of service. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + password_strength (str): Filters the cloud applications based on whether they require strong passwords. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + ssl_pinned (str): Filters the cloud applications based on whether they use SSL Pinning. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + evasive (str): Filters the cloud applications based on their capability to bypass traditional firewalls. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + have_http_security_header_support (str): Filters the cloud applications by the presence of security headers. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + dns_caa_policy (str): Filters the cloud applications by the presence of DNS CAA policy. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + have_weak_cipher_support (str): Filters the cloud applications based on the cryptographic keys used. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + ssl_certification_validity (str): Filters the cloud applications based on SSL certificate validity. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + file_sharing (str): Filters the cloud applications based on whether they include file-sharing provision. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + malware_scanning_content (str): + Filters the cloud applications based on whether they include malware content. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + remote_access_screen_sharing (str): + Filters the cloud applications based on whether they support remote access and screen sharing. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + sender_policy_framework (str): + Filters the cloud applications based on whether they support Sender Policy Framework. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + domain_keys_identified_mail (str): + Filters the cloud applications based on whether they support DomainKeys Identified Mail. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + domain_based_message_authentication (str): + Filters the cloud applications based on whether they support Domain-based Message Authentication. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + vulnerable_disclosure_program (str): + Filters the cloud applications based on whether they support Vulnerability Disclosure Policy. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + waf_support (str): Filters the cloud applications based on whether WAF is enabled for the applications. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + vulnerability (str): + Filters the cloud applications based on whether they have published Common Vulnerabilities and + Exposures (CVE). + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + valid_ssl_certificate (str): + Filters the cloud applications based on whether they have a valid SSL certificate. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + data_encryption_in_transit (str): + Filters the cloud applications based on whether they support data encryption in transit. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + vulnerable_to_heart_bleed (str): + Filters the cloud applications based on whether they are vulnerable to Heartbleed attack. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + vulnerable_to_poodle (str): + Filters the cloud applications based on whether they are vulnerable to Poodle attack. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + vulnerable_to_log_jam (str): + Filters the cloud applications based on whether they are vulnerable to Logjam attack. + + Possible values: ``YES``, ``NO``, ``UNKNOWN``. + cert_key_size (dict): + Filters the data by the size of the SSL certificate public keys used by the cloud applications. + + Example format for this parameter: + + ``cert_key_size={"operation": "INCLUDE", "value": ["BITS_2048", "BITS_256"]}`` + + Possible values for ``operation`` field: + + ``INCLUDE``, ``EXCLUDE``. + + Possible values for ``value`` field: + + ``NONE``, ``UN_KNOWN``, ``BITS_2048``, ``BITS_256``, + ``BITS_3072``, ``BITS_384``, ``BITS_4096``, ``BITS_1024``. + + Returns: + :obj:`str`: The Shadow IT Report in CSV format. + + Examples: + Export the Shadow IT Report for the last 7 days:: + + report = zia.shadow_it.export_shadow_it_report('LAST_7_DAYS') + + Notes: + Zscaler has a rate limit of 1 report per-minute, ensure you take this into account when calling this method. + + """ + payload = {"duration": duration} + convert_keys(payload.update(kwargs)) + + return self._post("shadowIT/applications/export", json=payload).text + + def export_shadow_it_csv(self, application: str, entity: str, duration: str = "LAST_1_DAYS", **kwargs): + """ + Export the Shadow IT Report (in CSV format) for the list of users or known locations + identified with using the cloud applications specified in the request. The report + includes details such as user interactions, application category, application usage, + number of transactions, last accessed time, etc. + + You can customize the report using various filters. + + Args: + application (str): The cloud application for which user or location data must be retrieved. + Note: Only one cloud application can be specified at a time. + duration (str): Filters the data using predefined timeframes. Defaults to last day. + + Possible values: ``LAST_1_DAYS``, ``LAST_7_DAYS``, ``LAST_15_DAYS``, ``LAST_MONTH``, ``LAST_QUARTER``. + entity (str): The entity type that the Shadow IT Report will be generated for. + + Possible values: ``USER``, ``LOCATION``. + + Keyword Args: + order (dict): Sorts the list in increasing or decreasing order based on the specified attribute. + + Example format for this parameter: + + ``order={"on": "RISK_SCORE", "by": "INCREASING"}`` + + Possible values for ``on``: + + ``RISK_SCORE``, ``APPLICATION``, ``APPLICATION_CATEGORY``, + ``SANCTIONED_STATE``, ``TOTAL_BYTES``, ``UPLOAD_BYTES``, ``DOWNLOAD_BYTES``, ``AUTHENTICATED_USERS``, + ``TRANSACTION_COUNT``, ``UNAUTH_LOCATION``, ``LAST_ACCESSED``. + + Possible values for ``by``: + + ``INCREASING``, ``DECREASING``. + download_bytes (dict): Filters by the amount of data (in bytes) downloaded from the application. + ``min`` and ``max`` fields specify the range. + upload_bytes (dict): Filters by the amount of data (in bytes) uploaded to the application. + ``min`` and ``max`` fields specify the range. + data_consumed (dict): Filters by the total amount of data uploaded and downloaded from the application. + ``min`` and ``max`` fields specify the range. + users (dict): Filters by user. + ``id`` and ``name`` fields specify the user information. + locations (dict): Filters by location. + ``id`` and ``name`` fields specify the location information. + departments (dict): Filters by department. + ``id`` and ``name`` fields specify the department information. + + Returns: + :obj:`str`: The Shadow IT Report in CSV format. + + Examples: + Export the Shadow IT Report for GitHub the last 15 days:: + + report = zia.shadow_it.export_shadow_it_report(application="Github", duration="LAST_15_DAYS") + + Notes: + Zscaler has a rate limit of 1 report per-minute, ensure you take this into account when calling this method. + """ + + payload = {"application": application, "duration": duration} + + # Process user_ids, location_ids, and department_ids if provided in kwargs + for key in ["users", "locations", "departments"]: + id_list = kwargs.pop(key, None) + if id_list is not None: + payload[key] = self._convert_ids_to_dict_list(id_list) + + convert_keys(payload.update(kwargs)) + + return self._post(f"shadowIT/applications/{entity}/exportCsv", json=payload).text + + def list_apps(self): + """ + List all predefined and custom cloud applications by name and id. + + Returns: + :obj:`BoxList` of :obj:`Box`: A list of cloud applications. + + Examples: + List all cloud applications:: + + apps = zia.cloud_apps.list_apps() + for app in apps: + print(app.name) + + """ + return self._get("cloudApplications/lite") + + def list_custom_tags(self): + """ + List all custom tags by name and id. + + Returns: + :obj:`BoxList` of :obj:`Box`: A list of custom tags available to assign to cloud applications. + + Examples: + List all custom tags:: + + tags = zia.cloud_apps.list_custom_tags() + for tag in tags: + print(tag.name) + + """ + return self._get("customTags") diff --git a/pyzscaler/zia/locations.py b/pyzscaler/zia/locations.py index c14eef3..52ce5cd 100644 --- a/pyzscaler/zia/locations.py +++ b/pyzscaler/zia/locations.py @@ -421,3 +421,73 @@ def delete_location(self, location_id: str) -> int: """ return self._delete(f"locations/{location_id}", box=False).status_code + + def get_geo_by_coordinates(self, latitude: int, longitude: int) -> Box: + """ + Retrieves the geographical data of the region or city that is located in the specified latitude and longitude + coordinates. The geographical data includes the city name, state, country, geographical ID of the city and + state, etc. + + Args: + latitude (int): The latitude of the location. + longitude (int): The longitude of the location. + + Returns: + :obj:`Box`: The geographical data of the region or city that is located in the specified coordinates. + + Examples: + Get the geographical data of the region or city that is located in the specified coordinates:: + + print(zia.locations.get_geo_by_coordinates(37.3860517, -122.0838511)) + + """ + payload = {"latitude": latitude, "longitude": longitude} + return self._get("region/byGeoCoordinates", params=payload) + + def get_geo_by_ip(self, ip: str) -> Box: + """ + Retrieves the geographical data of the region or city that is located in the specified IP address. The + geographical data includes the city name, state, country, geographical ID of the city and state, etc. + + Args: + ip (str): The IP address of the location. + + Returns: + :obj:`Box`: The geographical data of the region or city that is located in the specified IP address. + + Examples: + Get the geographical data of the region or city that is located in the specified IP address:: + + print(zia.locations.get_geo_by_ip("8.8.8.8") + """ + return self._get(f"region/byIPAddress/{ip}") + + def list_cities_by_name(self, **kwargs) -> BoxList: + """ + Retrieves the list of cities (along with their geographical data) that match the prefix search. The geographical + data includes the latitude and longitude coordinates of the city, geographical ID of the city and state, + country, postal code, etc. + + Args: + **kwargs: Optional keyword arguments. + + Keyword Args: + prefix (str): The prefix string to search for cities. + page (int): The page number of the results. + page_size (int): The number of results per page. + + Returns: + :obj:`BoxList`: The list of cities (along with their geographical data) that match the prefix search. + + Examples: + Get the list of cities (along with their geographical data) that match the prefix search:: + + for city in zia.locations.list_cities_by_name(prefix="San Jose"): + print(city) + + Notes: + Very broad or generic search terms may return a large number of results which can take a long time to be + returned. Ensure you narrow your search result as much as possible to avoid this. + + """ + return BoxList(Iterator(self._api, "region/search", **kwargs)) diff --git a/pyzscaler/zia/traffic.py b/pyzscaler/zia/traffic.py index 89d6ca3..0d14582 100644 --- a/pyzscaler/zia/traffic.py +++ b/pyzscaler/zia/traffic.py @@ -661,3 +661,82 @@ def delete_vpn_credential(self, credential_id: str) -> int: """ return self._delete(f"vpnCredentials/{credential_id}", box=False).status_code + + def get_ipv6_config(self) -> Box: + """ + Returns the IPv6 configuration for the organisation. + + Returns: + :obj:`Box`: The IPv6 configuration for the organisation. + + Examples: + Get the IPv6 configuration for the organisation:: + + zia.traffic.get_ipv6_config() + + """ + # There is an edge case in the camelcase to snake conversion that we're going to handle here. We'll revert + # the default camel_killer_box and run it through our conversion function in utils that handles edge-cases. + self._box_attrs = {"camel_killer_box": False} + + return convert_keys(self._get("ipv6config"), "to_snake") + + def list_dns64_prefixes(self, **kwargs): + """ + Returns the list of NAT64 prefixes configured as the DNS64 prefix for the organisation + + Keyword Args: + search (str): Search string to filter results by. Defaults to None. + + Returns: + :obj:`BoxList`: List of NAT64 prefixes configured as the DNS64 prefix for the organisation + + Examples: + List DNS64 prefixes using default settings:: + + for prefix in zia.traffic.list_dns64_prefixes(): + print(prefix) + + """ + + return self._get("ipv6config/dns64prefix", params=kwargs) + + def list_nat64_prefixes(self, **kwargs) -> BoxList: + """ + Returns the list of NAT64 prefixes configured for the organisation + + Keyword Args: + page (int): Page number to return. Defaults to 1. + page_size (int): Number of results to return per page. Defaults to 100. Max size is 1000. + search (str, optional): Search string to filter results by. Defaults to None. + + Returns: + :obj:`BoxList`: List of NAT64 prefixes configured for the organisation + + Examples: + List NAT64 prefixes using default settings:: + + for prefix in zia.traffic.list_nat64_prefixes(): + print(prefix) + + """ + return BoxList(Iterator(self._api, "ipv6config/nat64prefix", **kwargs)) + + def list_gre_ip_addresses(self, **kwargs) -> BoxList: + """ + Returns a list of IP addresses with GRE tunnel details. + + Keyword Args: + ip_addresses (list[str]): Filter based on the list of IP addresses provided. + + Returns: + :obj:`BoxList`: List of GRE IP addresses configured for the organisation + + Examples: + List GRE IP addresses using default settings:: + + for ip_address in zia.traffic.list_gre_ip_addresses(): + print(ip_address) + + """ + return self._get("orgProvisioning/ipGreTunnelInfo", params=convert_keys(kwargs)) diff --git a/tests/zcc/conftest.py b/tests/zcc/conftest.py index 798201d..ae1d1e7 100644 --- a/tests/zcc/conftest.py +++ b/tests/zcc/conftest.py @@ -24,5 +24,5 @@ def zcc(session): return ZCC( client_id="abc123", client_secret="999999", - company_id="88888", + cloud="zscaler", ) diff --git a/tests/zcon/conftest.py b/tests/zcon/conftest.py new file mode 100644 index 0000000..5aa98eb --- /dev/null +++ b/tests/zcon/conftest.py @@ -0,0 +1,32 @@ +import pytest +import responses + +from pyzscaler.zcon import ZCON + + +@pytest.fixture(name="session") +def fixture_session(): + return { + "authType": "ADMIN_LOGIN", + "obfuscateApiKey": False, + "passwordExpiryTime": 0, + "passwordExpiryDays": 0, + } + + +@pytest.fixture(name="zcon") +@responses.activate +def zcon(session): + responses.add( + responses.POST, + url="https://connector.zscaler.net/api/v1/auth", + content_type="application/json", + json=session, + status=200, + ) + return ZCON( + username="test@example.com", + password="hunter2", + cloud="zscaler", + api_key="123456789abcdef", + ) diff --git a/tests/zcon/test_zcon_admin.py b/tests/zcon/test_zcon_admin.py new file mode 100644 index 0000000..9c2c550 --- /dev/null +++ b/tests/zcon/test_zcon_admin.py @@ -0,0 +1,440 @@ +import pytest +import responses +from box import Box, BoxList +from responses.matchers import json_params_matcher + + +@pytest.fixture(name="admin_users") +def fixture_admin_users(): + return [ + { + "id": 1, + "name": "Admin1", + "role": "SuperAdmin", + }, + { + "id": 2, + "name": "Admin2", + "role": "RegularAdmin", + }, + ] + + +@pytest.fixture(name="admin_roles") +def fixture_admin_roles(): + return [ + { + "id": 11111, + "rank": 7, + "name": "NewRole", + "policyAccess": "NONE", + }, + { + "id": 22222, + "rank": 7, + "name": "AdvancedRole", + "policyAccess": "READ_ONLY", + "alertingAccess": "READ_WRITE", + "featurePermissions": { + "APIKEY_MANAGEMENT": "READ_ONLY", + "EDGE_CONNECTOR_CLOUD_PROVISIONING": "NONE", + }, + }, + ] + + +@pytest.fixture +def api_keys(): + return [ + { + "id": 1, + "keyValue": "fakeKeyValue1", + "permissions": ["USER_ACCESS"], + "enabled": True, + "lastModifiedTime": 1631541800, + "lastModifiedBy": { + "id": 1, + "name": "fakeModifier1", + }, + "partner": { + "id": 1, + "name": "fakePartner1", + }, + "partnerUrl": "https://fakepartnerurl1.com", + }, + { + "id": 2, + "keyValue": "fakeKeyValue2", + "permissions": ["USER_ACCESS"], + "enabled": False, + "lastModifiedTime": 1631641800, + "lastModifiedBy": { + "id": 2, + "name": "fakeModifier2", + }, + "partner": { + "id": 2, + "name": "fakePartner2", + }, + "partnerUrl": "https://fakepartnerurl2.com", + }, + ] + + +@responses.activate +def test_add_role_min_args(zcon, admin_roles): + responses.add( + method="POST", + url="https://connector.zscaler.net/api/v1/adminRoles", + json=admin_roles[0], + status=200, + match=[ + json_params_matcher( + { + "name": "NewRole", + "dashboardAccess": "NONE", + "policyAccess": "NONE", + "reportAccess": "NONE", + "roleType": "EDGE_CONNECTOR_ADMIN", + "usernameAccess": "NONE", + } + ) + ], + ) + + resp = zcon.admin.add_role(name="NewRole") + assert isinstance(resp, dict) + assert resp["name"] == "NewRole" + assert resp["id"] == 11111 + + +@responses.activate +def test_add_role_with_args(zcon, admin_roles): + responses.add( + method="POST", + url="https://connector.zscaler.net/api/v1/adminRoles", + json=admin_roles[1], + status=200, + match=[ + json_params_matcher( + { + "name": "AdvancedRole", + "policyAccess": "READ_ONLY", + "alertingAccess": "READ_WRITE", + "dashboardAccess": "NONE", + "reportAccess": "NONE", + "usernameAccess": "NONE", + "roleType": "EDGE_CONNECTOR_ADMIN", + "featurePermissions": {"edgeConnectorCloudProvisioning": "NONE", "apikeyManagement": "READ_ONLY"}, + } + ) + ], + ) + + resp = zcon.admin.add_role( + name="AdvancedRole", + policy_access="READ_ONLY", + feature_permissions_tuples=[ + ("APIKEY_MANAGEMENT", "READ_ONLY"), + ("EDGE_CONNECTOR_CLOUD_PROVISIONING", "NONE"), + ], + alerting_access="READ_WRITE", + ) + assert isinstance(resp, dict) + assert resp["name"] == "AdvancedRole" + assert resp["id"] == 22222 + + +@responses.activate +def test_list_roles(zcon, admin_roles): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/adminRoles", + json=admin_roles, + status=200, + ) + + resp = zcon.admin.list_roles() + assert isinstance(resp, list) + assert len(resp) == len(admin_roles) + assert resp[0]["id"] == admin_roles[0]["id"] + assert resp[1]["name"] == admin_roles[1]["name"] + + +@responses.activate +def test_get_role(zcon, admin_roles): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/adminRoles/11111", + json=admin_roles[0], + status=200, + ) + + resp = zcon.admin.get_role(11111) + assert isinstance(resp, dict) + assert resp["name"] == "NewRole" + assert resp["id"] == 11111 + + +@responses.activate +def test_delete_role(zcon, admin_roles): + responses.add( + method="DELETE", + url="https://connector.zscaler.net/api/v1/adminRoles/11111", + status=200, + ) + + resp = zcon.admin.delete_role(11111) + assert isinstance(resp, int) + assert resp == 200 + + +@responses.activate +def test_update_role(zcon, admin_roles): + role_id = admin_roles[0]["id"] + update_data = { + "policy_access": "READ_ONLY", + "feature_permissions": [("APIKEY_MANAGEMENT", "READ_WRITE"), ("EDGE_CONNECTOR_CLOUD_PROVISIONING", "NONE")], + "alerting_access": "READ_WRITE", + } + + # Simulate the existing role data + responses.add( + method="GET", url=f"https://connector.zscaler.net/api/v1/adminRoles/{role_id}", json=admin_roles[0], status=200 + ) + + # Manually set the expected updated payload + expected_updated_payload = { + "alertingAccess": "READ_WRITE", + "featurePermissions": {"APIKEY_MANAGEMENT": "READ_WRITE", "EDGE_CONNECTOR_CLOUD_PROVISIONING": "NONE"}, + "id": 11111, + "name": "NewRole", + "policyAccess": "READ_ONLY", + "rank": 7, + } + + responses.add( + method="PUT", + url=f"https://connector.zscaler.net/api/v1/adminRoles/{role_id}", + json=expected_updated_payload, + status=200, + match=[json_params_matcher(expected_updated_payload)], + ) + + resp = zcon.admin.update_role(role_id, **update_data) + assert isinstance(resp, Box) + assert resp["id"] == role_id + assert resp["policy_access"] == "READ_ONLY" + assert resp["feature_permissions"] == {"apikey_management": "READ_WRITE", "edge_connector_cloud_provisioning": "NONE"} + assert resp["alerting_access"] == "READ_WRITE" + + +@responses.activate +def test_list_admins_min_args(zcon, admin_users): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/adminUsers?partnerType=EDGE_CONNECTOR_ADMIN", + json=admin_users, + status=200, + ) + + resp = zcon.admin.list_admins() + assert isinstance(resp, BoxList) + assert resp[0]["name"] == "Admin1" + assert resp[1]["name"] == "Admin2" + + +@responses.activate +def test_list_admins_with_args(zcon, admin_users): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/adminUsers?partnerType=EDGE_CONNECTOR_ADMIN&includeAuditorUsers=True&includeAdminUsers=True&includeApiRoles=True", # noqa + json=admin_users, + status=200, + ) + + resp = zcon.admin.list_admins(include_auditor_users=True, include_admin_users=True, include_api_roles=True) + + assert isinstance(resp, BoxList) + assert resp[0]["name"] == "Admin1" + assert resp[1]["name"] == "Admin2" + + +@responses.activate +def test_get_admin(zcon, admin_users): + admin_id = admin_users[0]["id"] + responses.add( + method="GET", url=f"https://connector.zscaler.net/api/v1/adminUsers/{admin_id}", json=admin_users[0], status=200 + ) + + resp = zcon.admin.get_admin(admin_id) + assert isinstance(resp, Box) + assert resp["id"] == admin_id + assert resp["name"] == admin_users[0]["name"] + assert resp["role"] == admin_users[0]["role"] + + +@responses.activate +def test_add_admin(zcon, admin_users): + admin_data = { + "user_name": "John Doe", + "login_name": "johndoe", + "role": "admin", + "email": "johndoe@example.com", + "password": "password123", + "is_default_admin": False, + "disabled": False, + "comments": "New admin user", + } + + responses.add( + method="POST", + url="https://connector.zscaler.net/api/v1/adminUsers", + json=admin_users[0], + status=200, + match=[ + json_params_matcher( + { + "userName": "John Doe", + "loginName": "johndoe", + "role": "admin", + "email": "johndoe@example.com", + "password": "password123", + "isDefaultAdmin": False, + "disabled": False, + "comments": "New admin user", + } + ) + ], + ) + + resp = zcon.admin.add_admin(**admin_data) + assert isinstance(resp, Box) + assert resp["name"] == admin_users[0]["name"] + assert resp["id"] == admin_users[0]["id"] + assert resp["role"] == admin_users[0]["role"] + + +@responses.activate +def test_update_admin(zcon, admin_users): + admin_id = admin_users[0]["id"] + update_data = {"role": "new_role", "email": "newemail@example.com", "disabled": True, "comments": "Updated admin user"} + + # Simulate the existing admin data + responses.add( + method="GET", + url=f"https://connector.zscaler.net/api/v1/adminUsers/{admin_id}", + json=admin_users[0], + status=200, + ) + + # Mock the update operation + updated_admin = admin_users[0].copy() + updated_admin.update(update_data) + + responses.add( + method="PUT", + url=f"https://connector.zscaler.net/api/v1/adminUsers/{admin_id}", + json=updated_admin, + status=200, + match=[ + json_params_matcher( + { + "role": "new_role", + "email": "newemail@example.com", + "disabled": True, + "comments": "Updated admin user", + "id": 1, + "name": "Admin1", + } + ) + ], + ) + + resp = zcon.admin.update_admin(admin_id, **update_data) + assert isinstance(resp, Box) + assert resp["id"] == admin_id + assert resp["role"] == "new_role" + assert resp["email"] == "newemail@example.com" + assert resp["disabled"] is True + assert resp["comments"] == "Updated admin user" + + +@responses.activate +def test_delete_admin(zcon): + admin_id_to_delete = "123456789" + responses.add( + method="DELETE", + url=f"https://connector.zscaler.net/api/v1/adminUsers/{admin_id_to_delete}", + status=204, + ) + + resp = zcon.admin.delete_admin(admin_id_to_delete) + assert isinstance(resp, int) + assert resp == 204 + + +@responses.activate +def test_change_password(zcon): + username = "jdoe" + old_password = "oldpassword123" + new_password = "newpassword123" + + responses.add( + method="POST", + url="https://connector.zscaler.net/api/v1/passwordChange", + status=200, + match=[ + responses.json_params_matcher( + { + "userName": username, + "oldPassword": old_password, + "newPassword": new_password, + } + ) + ], + ) + + resp = zcon.admin.change_password(username, old_password, new_password) + assert isinstance(resp, int) + assert resp == 200 + + +@responses.activate +def test_list_api_keys(zcon, api_keys): + params = {"includePartnerKeys": True} + + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/apiKeys?includePartnerKeys=True", + json=api_keys, + status=200, + ) + + resp = zcon.admin.list_api_keys(include_partner_keys=True) + assert isinstance(resp, list) + assert len(resp) == len(api_keys) + assert resp[0]["id"] == api_keys[0]["id"] + assert resp[1]["id"] == api_keys[1]["id"] + + +@responses.activate +def test_regenerate_api_key(zcon, api_keys): + api_key_id = 1 # or "1" if the ID is a string + new_key_value = "newRegenKey" + + # Find the API key with the matching ID and change its 'keyValue' + regenerated_api_key = next(key for key in api_keys if key["id"] == api_key_id) + regenerated_api_key["keyValue"] = new_key_value + + responses.add( + method="POST", + url=f"https://connector.zscaler.net/api/v1/apiKeys/{api_key_id}/regenerate", + json=regenerated_api_key, + status=200, + ) + + resp = zcon.admin.regenerate_api_key(api_key_id) + assert isinstance(resp, dict) + assert resp["id"] == regenerated_api_key["id"] + assert resp["keyValue"] == new_key_value diff --git a/tests/zcon/test_zcon_config.py b/tests/zcon/test_zcon_config.py new file mode 100644 index 0000000..8f48ea0 --- /dev/null +++ b/tests/zcon/test_zcon_config.py @@ -0,0 +1,57 @@ +import pytest +import responses +from box import Box + + +@pytest.fixture(name="activate_status") +def fixture_activate_status(): + return {"status": 200} + + +@pytest.fixture(name="status_data") +def fixture_status_data(): + return { + "status": "Active", + } + + +@responses.activate +def test_activate_min_args(zcon, activate_status): + responses.add( + method="POST", + url="https://connector.zscaler.net/api/v1/ecAdminActivateStatus/activate", + json=activate_status, + status=200, + ) + + resp = zcon.config.activate() + assert isinstance(resp, Box) + assert resp["status"] == 200 + + +@responses.activate +def test_force_activate(zcon, activate_status): + responses.add( + method="POST", + url="https://connector.zscaler.net/api/v1/ecAdminActivateStatus/forcedActivate", + json=activate_status, + status=200, + ) + + resp = zcon.config.activate(force=True) + assert isinstance(resp, Box) + assert resp["status"] == 200 + + +@responses.activate +def test_get_status(zcon, status_data): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/ecAdminActivateStatus", + json=status_data, + status=200, + ) + + resp = zcon.config.get_status() + assert isinstance(resp, Box) + assert resp["status"] == "Active" diff --git a/tests/zcon/test_zcon_connectors.py b/tests/zcon/test_zcon_connectors.py new file mode 100644 index 0000000..b1602ca --- /dev/null +++ b/tests/zcon/test_zcon_connectors.py @@ -0,0 +1,91 @@ +import pytest +import responses + + +@pytest.fixture +def connector_groups_list(): + return [ + { + "id": 1, + "name": "GroupA", + "ec_vms": [ + { + "id": 10, + "name": "GroupA-VM1", + "ec_instances": [{"id": 100, "name": "Instance1"}, {"id": 101, "name": "Instance2"}], + }, + {"id": 11, "name": "GroupA-VM2", "ec_instances": []}, + ], + }, + {"id": 2, "name": "GroupB", "ec_vms": [{"id": 20, "name": "GroupB-VM1", "ec_instances": []}]}, + ] + + +@responses.activate +def test_list_groups(zcon, connector_groups_list): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/ecgroup", + json=connector_groups_list, + status=200, + ) + + resp = zcon.connectors.list_groups() + assert isinstance(resp, list) + assert len(resp) == 2 + assert resp[0]["id"] == 1 + assert resp[0]["name"] == "GroupA" + assert resp[0]["ec_vms"][0]["id"] == 10 + + +@responses.activate +def test_get_group(zcon, connector_groups_list): + connector_group = connector_groups_list[0] + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/ecgroup/1", + json=connector_group, + status=200, + ) + + resp = zcon.connectors.get_group("1") + assert isinstance(resp, dict) + assert resp["id"] == 1 + assert resp["name"] == "GroupA" + assert resp["ec_vms"][0]["ec_instances"][0]["id"] == 100 + + +@responses.activate +def test_get_vm(zcon, connector_groups_list): + # Mock the API call to get a specific VM + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/ecgroup/1/vm/10", + json=connector_groups_list[0]["ec_vms"][0], + status=204, + ) + + # Execute the function and get the result + result = zcon.connectors.get_vm("1", "10") + + # Verify the response + assert result["id"] == connector_groups_list[0]["ec_vms"][0]["id"] + assert result["name"] == connector_groups_list[0]["ec_vms"][0]["name"] + + +@responses.activate +def test_delete_vm(zcon): + # Mock the API call to delete a specific VM + group_id = "1" + vm_id = "10" + responses.add( + method="DELETE", + url=f"https://connector.zscaler.net/api/v1/ecgroup/{group_id}/vm/{vm_id}", + status=204, + ) + + # Execute the function and get the result + status_code = zcon.connectors.delete_vm(group_id, vm_id) + + # Verify the response + assert status_code == 204 diff --git a/tests/zcon/test_zcon_locations.py b/tests/zcon/test_zcon_locations.py new file mode 100644 index 0000000..1fe1a46 --- /dev/null +++ b/tests/zcon/test_zcon_locations.py @@ -0,0 +1,201 @@ +import pytest +import responses +from box import BoxList +from responses.matchers import json_params_matcher + + +@pytest.fixture(name="location_list") +def fixture_location_list(): + return BoxList( + [ + { + "id": 10000001, + "name": "FakeLocation1", + "non_editable": False, + "parent_id": 0, + "up_bandwidth": 1000, + "dn_bandwidth": 1000, + "country": "AUSTRALIA", + "state": "NSW", + "language": "English", + "tz": "AUSTRALIA_SYDNEY", + "auth_required": True, + "ssl_scan_enabled": True, + "dynamiclocation_groups": [{"id": 101, "name": "FakeGroup1"}], + "profile": "CORPORATE", + }, + { + "id": 10000002, + "name": "FakeLocation2", + "non_editable": True, + "parent_id": 1, + "up_bandwidth": 2000, + "dn_bandwidth": 2000, + "country": "AUSTRALIA", + "state": "VIC", + "language": "English", + "tz": "AUSTRALIA_MELBOURNE", + "auth_required": False, + "ssl_scan_enabled": False, + "dynamiclocation_groups": [{"id": 102, "name": "FakeGroup2"}], + "profile": "SERVER", + }, + ] + ) + + +@pytest.fixture(name="location_template_list") +def fixture_location_template_list(): + return [ + { + "id": 111111, + "name": "Mock Location 1", + "desc": "Mock description 1", + "template": { + "xff_forward_enabled": False, + "auth_required": True, + }, + "editable": False, + "last_mod_time": 1671850480, + }, + { + "id": 222222, + "name": "Mock Location 2", + "desc": "Mock description 2", + "template": { + "xff_forward_enabled": True, + "auth_required": False, + }, + "editable": True, + "last_mod_uid": {"id": 666666, "name": "mock-admin@11111.zscaler.net"}, + "last_mod_time": 1694482952, + }, + ] + + +@responses.activate +def test_list_locations(zcon, location_list): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/location", + json=location_list, + status=200, + ) + + resp = zcon.locations.list_locations() + assert isinstance(resp, BoxList) + assert len(resp) == 2 + assert resp[0]["id"] == 10000001 + assert resp[1]["id"] == 10000002 + + +@responses.activate +def test_get_location(zcon, location_list): + location_id = "10000001" + responses.add( + method="GET", + url=f"https://connector.zscaler.net/api/v1/adminRoles/{location_id}", + json=location_list[0], + status=200, + ) + + resp = zcon.locations.get_location(location_id) + assert resp["id"] == 10000001 + assert resp["name"] == "FakeLocation1" + + +@responses.activate +def test_list_location_templates(zcon, location_template_list): + responses.add( + method="GET", + url="https://connector.zscaler.net/api/v1/locationTemplate", + json=location_template_list, + status=200, + ) + + resp = zcon.locations.list_location_templates() + assert isinstance(resp, list) + assert len(resp) == 2 + assert resp[0]["id"] == 111111 + assert resp[1]["id"] == 222222 + + +@responses.activate +def test_add_location_template(zcon): + # The template to add + name = "New Mock Name" + template = {"surrogate_ip": True, "template_prefix": "office", "auth_required": True} + description = "New mock description" + + # Stub for the POST request to add a new location template + responses.add( + method="POST", + url="https://connector.zscaler.net/api/v1/locationTemplate", + json={"name": name, "template": template, "desc": description}, + match=[ + json_params_matcher( + { + "name": name, + "template": { + "surrogateIP": template["surrogate_ip"], + "templatePrefix": template["template_prefix"], + "authRequired": template["auth_required"], + }, + "desc": description, + } + ) + ], + status=201, + ) + + # Add location template + added_template = zcon.locations.add_location_template(name, template=template, description=description) + + # Verify the response + assert added_template["name"] == name + assert added_template["template"] == template + assert added_template["desc"] == description + + +@responses.activate +def test_update_location_template(zcon, location_template_list): + template_id = "111111" + + # Stub for the GET request to retrieve existing template + responses.add( + method="GET", + url=f"https://connector.zscaler.net/api/v1/locationTemplate/{template_id}", + json=location_template_list[1], + status=200, + ) + + # Stub for the PUT request to update template + responses.add( + method="PUT", + url=f"https://connector.zscaler.net/api/v1/locationTemplate/{template_id}", + json=location_template_list[1], + status=200, + ) + + # Update location template with the name "New Mock Name" + updated_template = zcon.locations.update_location_template( + template_id, name="Mock Location 2", description="Mock description 2" + ) + assert updated_template["name"] == "Mock Location 2" + assert updated_template["desc"] == "Mock description 2" + + +@responses.activate +def test_delete_location_template(zcon): + template_id = "411908" + + # Stub for the DELETE request + responses.add( + method="DELETE", + url=f"https://connector.zscaler.net/api/v1/locationTemplate/{template_id}", + status=204, + ) + + # Delete location template + status_code = zcon.locations.delete_location_template(template_id) + assert status_code == 204 diff --git a/tests/zcon/test_zcon_session.py b/tests/zcon/test_zcon_session.py new file mode 100644 index 0000000..81e8767 --- /dev/null +++ b/tests/zcon/test_zcon_session.py @@ -0,0 +1,19 @@ +import responses + + +@responses.activate +def test_create(zcon, session): + responses.add( + responses.POST, + url="https://connector.zscaler.net/api/v1/auth", + json=session, + status=200, + ) + + resp = zcon.session.create(api_key="test1234567890", username="test@example.com", password="hunter2") + + assert isinstance(resp, dict) + assert resp.auth_type == "ADMIN_LOGIN" + assert resp.obfuscate_api_key is False + assert resp.password_expiry_time == 0 + assert resp.password_expiry_days == 0 diff --git a/tests/zia/test_apptotal.py b/tests/zia/test_apptotal.py new file mode 100644 index 0000000..2e59664 --- /dev/null +++ b/tests/zia/test_apptotal.py @@ -0,0 +1,27 @@ +import responses +from box import Box + + +@responses.activate +def test_get_app(zia): + app_info = {"id": "12345", "name": "AppName", "verbose": False} + responses.add( + method="GET", url="https://zsapi.zscaler.net/api/v1/apps/app?app_id=12345&verbose=False", json=app_info, status=200 + ) + resp = zia.apptotal.get_app(app_id="12345", verbose=False) + assert isinstance(resp, Box) + assert resp.id == "12345" + assert resp.name == "AppName" + + +@responses.activate +def test_scan_app(zia): + scan_status = { + "id": "12345", + "status": "Scanned", + } + responses.add(method="POST", url="https://zsapi.zscaler.net/api/v1/apps/app", json=scan_status, status=200) + resp = zia.apptotal.scan_app(app_id="12345") + assert isinstance(resp, Box) + assert resp.id == "12345" + assert resp.status == "Scanned" diff --git a/tests/zia/test_cloud_apps.py b/tests/zia/test_cloud_apps.py new file mode 100644 index 0000000..536d797 --- /dev/null +++ b/tests/zia/test_cloud_apps.py @@ -0,0 +1,173 @@ +import pytest +import responses +from box import BoxList +from responses import matchers + + +@pytest.fixture +def shadow_it_report(): + return ( + 'Administrator,"admin@example.com"\n' + 'Report Created,"Nov 13, 2023 6:26:52 AM AEDT"\n' + 'Start Time,"Nov 11, 2023 11:00:00 AM AEDT"\n' + 'End Time,"Nov 12, 2023 11:00:00 AM AEDT"\n' + 'Vendor,"zscaler.net"\n' + "No.,Application,Application Category,Application Status,Application Risk Index,Upload Bytes,Download Bytes," + "Total Bytes,Users,Locations,Notes,Integrations,Integration Risks,Certifications,Poor Terms of Service," + "Data Breaches in Last 3 Years,Source IP Restriction,MFA Support,Admin Audit Logs,File Sharing," + "Password Strength,SSL Pinned,Data Encryption in Transit,Evasive,HTTP Security Header Support," + "DNS CAA Policy,Weak Cipher Support,Valid SSL Certificate,Published CVE Vulnerability,SSL Cert Key Size," + "Vulnerable to Heartbleed,Vulnerable to Poodle,Vulnerable to Logjam,Support for WAF,Remote Access Screen " + "Sharing," + "Vulnerability Disclosure Policy,Sender Policy Framework,DomainKeys Identified Mail," + "Domain-Based Message Authentication,Malware Scanning Content\n" + ) + + +@pytest.fixture(name="cloud_apps") +def fixture_cloud_apps(): + return [{"id": 1, "name": "App1"}, {"id": 2, "name": "App2"}] + + +@pytest.fixture(name="custom_tags") +def fixture_custom_tags(): + return [{"id": 101, "name": "Tag1"}, {"id": 102, "name": "Tag2"}] + + +@responses.activate +def test_export_shadow_it_csv(zia, shadow_it_report): + responses.add( + method="POST", + url="https://zsapi.zscaler.net/api/v1/shadowIT/applications/USER/exportCsv", + body=shadow_it_report, + status=200, + ) + + report_csv = zia.cloud_apps.export_shadow_it_csv(application="ExampleApp", entity="USER") + assert report_csv == shadow_it_report + + +@responses.activate +def test_export_shadow_it_csv_with_id_filters(zia, shadow_it_report): + # Setup + application = "test_app" + entity = "USER" + duration = "LAST_7_DAYS" + user_ids = ["123", "456"] + location_ids = ["789", "101"] + department_ids = ["112", "113"] + + # Mocking the API response + responses.add( + method="POST", + url=f"https://zsapi.zscaler.net/api/v1/shadowIT/applications/{entity}/exportCsv", + json=shadow_it_report, + status=200, + ) + + # Calling the method with additional id filters + resp = zia.cloud_apps.export_shadow_it_csv( + application=application, + entity=entity, + duration=duration, + users=user_ids, + locations=location_ids, + departments=department_ids, + ) + + # Assertions + assert isinstance(resp, str) + assert ( + responses.calls[0].request.body.decode("utf-8") + == '{"application": "test_app", "duration": "LAST_7_DAYS", "users": [{"id": "123"}, {"id": "456"}], ' + '"locations": [{"id": "789"}, {"id": "101"}], "departments": [{"id": "112"}, {"id": "113"}]}' + ) + + +@responses.activate +def test_export_shadow_it_report(zia, shadow_it_report): + responses.add( + method="POST", + url="https://zsapi.zscaler.net/api/v1/shadowIT/applications/export", + body=shadow_it_report, + status=200, + ) + + report = zia.cloud_apps.export_shadow_it_report() + assert report == shadow_it_report + + +@responses.activate +def test_list_apps(zia, cloud_apps): + responses.add( + method="GET", + url="https://zsapi.zscaler.net/api/v1/cloudApplications/lite", + json=cloud_apps, + status=200, + ) + + resp = zia.cloud_apps.list_apps() + + assert isinstance(resp, BoxList) + assert len(resp) == 2 + assert resp[0].id == 1 + assert resp[0].name == "App1" + + +@responses.activate +def test_list_custom_tags(zia, custom_tags): + responses.add( + method="GET", + url="https://zsapi.zscaler.net/api/v1/customTags", + json=custom_tags, + status=200, + ) + + resp = zia.cloud_apps.list_custom_tags() + + assert isinstance(resp, BoxList) + assert len(resp) == 2 + assert resp[0].id == 101 + assert resp[0].name == "Tag1" + + +@responses.activate +def test_bulk_update(zia): + sanction_state = "sanctioned" + api_sanction_state = "SANCTIONED" + application_ids = ["12345"] + custom_tag_ids = ["67890"] + payload = { + "sanctionedState": api_sanction_state, + "applicationIds": application_ids, + "customTags": [{"id": tag_id} for tag_id in custom_tag_ids], + } + + # Mock response + responses.add( + method="PUT", + url="https://zsapi.zscaler.net/api/v1/cloudApplications/bulkUpdate", + status=204, + match=[matchers.json_params_matcher(payload)], + ) + + # Call the method + resp = zia.cloud_apps.bulk_update(sanction_state, application_ids=application_ids, custom_tag_ids=custom_tag_ids) + + # Assertions + assert isinstance(resp, int) + assert resp == 204 + + +def test_bulk_update_invalid_sanction_state(zia): + # Set up an invalid sanction state + invalid_sanction_state = "not_valid_state" + application_ids = ["12345"] + + with pytest.raises(ValueError) as excinfo: + zia.cloud_apps.bulk_update(invalid_sanction_state, application_ids=application_ids) + + assert ( + str(excinfo.value) + == f"Invalid sanction state: {invalid_sanction_state}. Accepted values are 'sanctioned', 'unsanctioned', or 'any'." + ) diff --git a/tests/zia/test_locations.py b/tests/zia/test_locations.py index a3415bb..96a71ac 100644 --- a/tests/zia/test_locations.py +++ b/tests/zia/test_locations.py @@ -1,6 +1,6 @@ import pytest import responses -from box import Box +from box import Box, BoxList from responses import matchers from tests.conftest import stub_sleep @@ -66,6 +66,22 @@ def fixture_sub_locations(): ] +@pytest.fixture(name="geo_locations") +def geo_location_data(): + return { + "city_geo_id": 5375480, + "state_geo_id": 5332921, + "latitude": 37.3897, + "longitude": -122.0832, + "city_name": "Mountain View", + "state_name": "California", + "country_name": "United States", + "country_code": "US", + "postal_code": "94041", + "continent_code": "NA", + } + + @responses.activate @stub_sleep def test_list_locations_with_one_page(zia, paginated_items): @@ -183,7 +199,7 @@ def test_get_location_by_id(zia, locations): def test_get_location_by_name_and_id(zia): # Passing location_id and location_name should result in a ValueError. with pytest.raises(ValueError): - resp = zia.locations.get_location(location_id="1", location_name="Test A") + zia.locations.get_location(location_id="1", location_name="Test A") @responses.activate @@ -267,7 +283,7 @@ def test_update_location(zia, locations): resp = zia.locations.update_location("1", name="Updated Test") - assert isinstance(resp, dict) + assert isinstance(resp, Box) assert resp.id == 1 assert resp.name == "Updated Test" @@ -381,3 +397,55 @@ def test_list_sublocations(zia, sub_locations): assert isinstance(resp, list) assert len(resp) == 2 assert resp[0].id == 1 + + +@responses.activate +def test_get_geo_by_coordinates(zia, geo_locations): + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/region/byGeoCoordinates", + json=geo_locations, + status=200, + ) + resp = zia.locations.get_geo_by_coordinates(37, -122) + assert isinstance(resp, Box) + assert resp.city_name == "Mountain View" + + +@responses.activate +def test_get_geo_by_ip(zia, geo_locations): + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/region/byIPAddress/8.8.8.8", + json=geo_locations, + status=200, + ) + resp = zia.locations.get_geo_by_ip("8.8.8.8") + assert isinstance(resp, Box) + assert resp.city_name == "Mountain View" + + +@stub_sleep +@responses.activate +def test_list_cities_by_name(zia): + list_cities_data = [ + {"city": "San Jose", "state": "CA", "country": "US"}, + {"city": "San Francisco", "state": "CA", "country": "US"}, + ] + + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/region/search?prefix=San&page=1", + json=list_cities_data, + status=200, + ) + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/region/search?prefix=San&page=2", + json=[], + status=200, + ) + resp = zia.locations.list_cities_by_name(prefix="San") + assert isinstance(resp, BoxList) + assert len(resp) == 2 + assert resp[0].city == "San Jose" diff --git a/tests/zia/test_traffic.py b/tests/zia/test_traffic.py index ee842d2..2c57c51 100644 --- a/tests/zia/test_traffic.py +++ b/tests/zia/test_traffic.py @@ -215,6 +215,14 @@ def fixture_vpn_credentials(): ] +@pytest.fixture(name="ipv6_prefixes") +def fixture_ipv64_prefixes(): + return [ + {"id": 0, "name": "sample1", "prefixMask": "mask1", "dnsPrefix": True}, + {"id": 1, "name": "sample2", "prefixMask": "mask2", "dnsPrefix": False}, + ] + + @responses.activate @stub_sleep def test_list_gre_tunnels(zia, gre_tunnels): @@ -508,8 +516,8 @@ def test_get_vpn_credential_by_fqdn(zia, vpn_credentials): def test_get_vpn_credential_error(zia): - with pytest.raises(Exception) as e_info: - resp = zia.traffic.get_vpn_credential("1", "test@example.com") + with pytest.raises(Exception): + zia.traffic.get_vpn_credential("1", "test@example.com") @responses.activate @@ -575,3 +583,54 @@ def test_list_vips(zia, vips): assert isinstance(resp, BoxList) assert len(resp) == 2 assert resp[0].data_center == "TESTA" + + +@responses.activate +def test_list_dns64_prefixes(zia, ipv6_prefixes): + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/ipv6config/dns64prefix", + json=ipv6_prefixes, + status=200, + ) + resp = zia.traffic.list_dns64_prefixes() + assert isinstance(resp, BoxList) + assert len(resp) == 2 + + +@stub_sleep +@responses.activate +def test_list_nat64_prefixes(zia, ipv6_prefixes): + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/ipv6config/nat64prefix?page=1", + json=ipv6_prefixes, + status=200, + ) + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/ipv6config/nat64prefix?page=2", + json=[], + status=200, + ) + resp = zia.traffic.list_nat64_prefixes() + assert isinstance(resp, BoxList) + assert len(resp) == 2 + + +@responses.activate +def test_list_gre_ip_addresses(zia): + gre_ip_addresses_data = [ + {"ipAddress": "192.168.1.1", "greEnabled": True, "greTunnelIP": "10.0.0.1"}, + {"ipAddress": "192.168.1.2", "greEnabled": False, "greTunnelIP": "10.0.0.2"}, + ] + + responses.add( + responses.GET, + url="https://zsapi.zscaler.net/api/v1/orgProvisioning/ipGreTunnelInfo", + json=gre_ip_addresses_data, + status=200, + ) + resp = zia.traffic.list_gre_ip_addresses() + assert isinstance(resp, BoxList) + assert len(resp) == 2