From e72a46b842a40633cda8c11ea5c7f24913af1c58 Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Mon, 25 Nov 2024 12:18:41 +0100 Subject: [PATCH] :tada: Implement Ruff --- .flake8 | 22 ------ .github/workflows/flake8.yml | 17 ----- .github/workflows/ruff.yml | 17 +++++ apicomponents/acl.py | 30 ++++---- apicomponents/analysis.py | 21 +++--- apicomponents/badge.py | 26 +++---- apicomponents/bom.py | 70 +++++++++--------- apicomponents/calculator.py | 7 +- apicomponents/configproperty.py | 41 ++++++----- apicomponents/cwe.py | 24 +++---- apicomponents/finding.py | 24 +++---- apicomponents/ldap.py | 44 ++++++------ apicomponents/license.py | 25 +++---- apicomponents/licensegroup.py | 60 +++++++--------- apicomponents/metrics.py | 86 ++++++++++------------ apicomponents/permission.py | 49 +++++++------ apicomponents/policy.py | 74 +++++++++---------- apicomponents/project.py | 42 +++++------ apicomponents/projectProperty.py | 28 ++++---- apicomponents/repository.py | 57 ++++++++------- apicomponents/search.py | 44 +++++------- apicomponents/service.py | 66 +++++++++-------- apicomponents/team.py | 111 ++++++++++++++--------------- apicomponents/user.py | 12 ++-- apicomponents/violation.py | 32 ++++----- apicomponents/violationAnalysis.py | 20 +++--- apicomponents/vulnerability.py | 94 ++++++++++++------------ main.py | 25 +++---- requirements-lint.txt | 1 + ruff.toml | 106 +++++++++++++++++++++++++++ 30 files changed, 657 insertions(+), 618 deletions(-) delete mode 100644 .flake8 delete mode 100644 .github/workflows/flake8.yml create mode 100644 .github/workflows/ruff.yml create mode 100644 requirements-lint.txt create mode 100644 ruff.toml diff --git a/.flake8 b/.flake8 deleted file mode 100644 index 8bb0316..0000000 --- a/.flake8 +++ /dev/null @@ -1,22 +0,0 @@ -[flake8] -# Documentation for flake8 http://flake8.pycqa.org/en/3.1.1/user/index.html - -# we should not ignore these mistakes !!!!!!!! - -ignore = - # undefined file name excpetion - F821 - # Suppress - line too long (> 79 characters) - E501 - # Suppress - Function is too complex - C901 - - -exclude = - # No need to traverse our git directory - .git, - # There's no value in checking cache directories - __pycache__, - # This contains of branch that we don't want to check - # dev -max-complexity = 10 diff --git a/.github/workflows/flake8.yml b/.github/workflows/flake8.yml deleted file mode 100644 index b5f53ea..0000000 --- a/.github/workflows/flake8.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: flake8 Lint - -on: [push, pull_request] - -jobs: - flake8-lint: - runs-on: ubuntu-latest - name: Lint - steps: - - name: Check out source repository - uses: actions/checkout@v3 - - name: Set up Python environment - uses: actions/setup-python@v4 - with: - python-version: "3.11" - - name: flake8 Lint - uses: py-actions/flake8@2014ef764424fd7699d615323c17836092bec9b9 diff --git a/.github/workflows/ruff.yml b/.github/workflows/ruff.yml new file mode 100644 index 0000000..c6b3f5f --- /dev/null +++ b/.github/workflows/ruff.yml @@ -0,0 +1,17 @@ +name: Ruff Linter + +on: [push, pull_request] + +jobs: + ruff-linting: + runs-on: ubuntu-latest + name: Ruff Lint + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Ruff Linter + run: pip install -r requirements-lint.txt + + - name: Run Ruff Linter + run: ruff check --output-format=github . diff --git a/apicomponents/acl.py b/apicomponents/acl.py index ebb19df..b14510d 100644 --- a/apicomponents/acl.py +++ b/apicomponents/acl.py @@ -1,7 +1,8 @@ -class ACL(object): +class ACL: def put_acl(self, team, project): - """[Adds an ACL mapping] + """ + [Adds an ACL mapping] Args: team ([string]): [name of the team] @@ -13,18 +14,18 @@ def put_acl(self, team, project): if response.status_code == 200: return response.status_code - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The UUID of the team or project could not be found, {response.status_code}") - elif response.status_code == 409: + if response.status_code == 409: return (f"A mapping with the same team and project already exists, {response.status_code}") - else: - return ((response.content).decode("UTF-8"), - response.status_code) + return ((response.content).decode("UTF-8"), + response.status_code) def get_acl(self, uuid, excludeInactive=False): - """[Returns the projects assigned to the specified team] + """ + [Returns the projects assigned to the specified team] Args: uuid ([string]): [The UUID of the team to retrieve mappings for] @@ -32,9 +33,9 @@ def get_acl(self, uuid, excludeInactive=False): response = self.session.get(self.apicall + f"/v1/acl/team/{uuid}?excludeInactive={excludeInactive}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The UUID of the team could not be found, {response.status_code}") def delete_acl(self, teamUuid, projectUuid): @@ -49,9 +50,8 @@ def delete_acl(self, teamUuid, projectUuid): self.apicall + f"/v1/acl/mapping/team/{teamUuid}/project/{projectUuid}") if response.status_code == 200: return ("successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The UUID of the team or project could not be found, {response.status_code}") - else: - return ((response.content).decode("UTF-8"), response.status_code) + return ((response.content).decode("UTF-8"), response.status_code) diff --git a/apicomponents/analysis.py b/apicomponents/analysis.py index 5bec5a3..49558a5 100644 --- a/apicomponents/analysis.py +++ b/apicomponents/analysis.py @@ -1,10 +1,11 @@ import json -class Analysis(object): +class Analysis: def get_analysis(self, project, component, vulnerability): - """Retrieves an analysis trail + """ + Retrieves an analysis trail Args: project (string): The UUID of the project @@ -12,15 +13,16 @@ def get_analysis(self, project, component, vulnerability): vulnerability (string): The UUID of the vulnerability Returns: - json: """ + json: + """ response = self.session.get(self.apicall + "/v1/analysis/", params={"project": project, "component": component, "vulnerability": vulnerability}) if response.status_code == 200: return response.json() - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def record_analysis(self, project, component, vulnerability, suppressed=False): - """Retrieves an analysis trail + """ + Retrieves an analysis trail Args: project (string): The UUID of the project @@ -39,10 +41,9 @@ def record_analysis(self, project, component, vulnerability, suppressed=False): } ], "isSuppressed": true - } """ - + } + """ response = self.session.put(self.apicall + "/v1/analysis/", data=json.dump({"project": project, "component": component, "vulnerability": vulnerability, "suppressed": suppressed})) if response.status_code == 200: return response.json() - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/badge.py b/apicomponents/badge.py index bb74a00..775ef74 100644 --- a/apicomponents/badge.py +++ b/apicomponents/badge.py @@ -1,8 +1,9 @@ -class Badge(object): +class Badge: def get_badgeByname(self, name, version): # TODO : follow up on response of this functionality - """Returns current metrics for a specific project + """ + Returns current metrics for a specific project Args: name (string): The name of the project to query on @@ -15,18 +16,18 @@ def get_badgeByname(self, name, version): self.apicall + f"/v1/badge/vulns/project/{name}/{version}") if response.status_code == 200: return response.content - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The project could not be found, {response.status_code}") - elif response.status_code == 204: + if response.status_code == 204: return (f"Badge support is disabled. No content will be returned, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def get_badgeByuuid(self, uuid): # TODO : follow up on response of this functionality - """Returns current metrics for a specific project + """ + Returns current metrics for a specific project Args: uuid: The uuid of the project. @@ -38,11 +39,10 @@ def get_badgeByuuid(self, uuid): self.apicall + f"/v1/badge/vulns/project/{uuid}") if response.status_code == 200: return response.content - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The project could not be found, {response.status_code}") - elif response.status_code == 204: + if response.status_code == 204: return (f"Badge support is disabled. No content will be returned, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/bom.py b/apicomponents/bom.py index fd02b98..415478b 100644 --- a/apicomponents/bom.py +++ b/apicomponents/bom.py @@ -2,67 +2,71 @@ import json -class Bom(object): +class Bom: def get_bom_token(self, uuid): - """ Determines if there are any tasks associated with the token that are being processed, or in the queue to be processed. + """ + Determines if there are any tasks associated with the token that are being processed, or in the queue to be processed. This endpoint is intended to be used in conjunction with uploading a supported BOM document. Upon upload, a token will be returned. The token can then be queried using this endpoint to determine if any tasks (such as vulnerability analysis) is being performed on the BOM. A value of true indicates processing is occurring. A value of false indicates that no processing is occurring for the specified token. However, a value of false also does not confirm the token is valid, only that no processing is associated with the specified token. Args: - uuid (string): The UUID of the token to query """ + uuid (string): The UUID of the token to query + """ response = self.session.get(self.apicall + f"/v1/bom/token/{uuid}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return response.status_code + return response.status_code def get_bom_project(self, uuid, format="json"): - """Returns dependency metadata for a project in CycloneDX format + """ + Returns dependency metadata for a project in CycloneDX format Args: uuid (string): The UUID of the project to export format (str, optional): . Defaults to "json". However by default API is xml Returns: - xml or json: returns dependency metadata for a project in CycloneDX format in xml or json """ + xml or json: returns dependency metadata for a project in CycloneDX format in xml or json + """ response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/project/{uuid}", params={"format": format}) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def get_bom_component(self, uuid, format="json"): - """Returns dependency metadata for a component in CycloneDX format + """ + Returns dependency metadata for a component in CycloneDX format Args: uuid (string): The UUID of the component to export format (str, optional): . Defaults to "json". However by default API is xml Returns: - xml or json: returns dependency metadata for a component in CycloneDX format in xml or json """ + xml or json: returns dependency metadata for a component in CycloneDX format in xml or json + """ response = self.session.get(self.apicall + f"/v1/bom/cyclonedx/component/{uuid}", params={"format": format}) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified component is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Component not found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def post_bom(self, project, projectName, projectVersion, body, autoCreate=True): # TODO: refactor for formdata - """Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission. + """ + Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission. Args: project (string[formData]): project @@ -72,7 +76,8 @@ def post_bom(self, project, projectName, projectVersion, body, autoCreate=True): autoCreate (bool, optional): create project if it does not exist", response". Defaults to True. Returns: - response status code """ + response status code + """ data = dict() data["project"] = project data["projectName"] = projectName @@ -82,17 +87,17 @@ def post_bom(self, project, projectName, projectVersion, body, autoCreate=True): response = self.session.post(self.apicall + "/v1/bom", files=body) if response.status_code == 200: return ("successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def put_bom(self, project, body): - """Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission. + """ + Upload a supported bill of material format document. Expects CycloneDX along and a valid project UUID. If a UUID is not specified then the projectName and projectVersion must be specified. Optionally, if autoCreate is specified and ‘true’ and the project does not exist, the project will be created. In this scenario, the principal making the request will additionally need the PORTFOLIO_MANAGEMENT or PROJECT_CREATION_UPLOAD permission. Args: project (string): The UUID of the project @@ -110,11 +115,10 @@ def put_bom(self, project, body): self.apicall + "/v1/bom", data=json.dumps(data)) if response.status_code == 200: return ("successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/calculator.py b/apicomponents/calculator.py index 3318f7c..c19d148 100644 --- a/apicomponents/calculator.py +++ b/apicomponents/calculator.py @@ -1,4 +1,4 @@ -class Calculator(object): +class Calculator: def get_calculator(self, cvss): """ @@ -10,7 +10,6 @@ def get_calculator(self, cvss): response = self.session.get(self.apicall + "/v1/calculator/cvss", params={"vector": cvss}) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return ((response.content).decode("UTF-8"), response.status_code) + return ((response.content).decode("UTF-8"), response.status_code) diff --git a/apicomponents/configproperty.py b/apicomponents/configproperty.py index b4debbc..1b2f8e6 100644 --- a/apicomponents/configproperty.py +++ b/apicomponents/configproperty.py @@ -1,7 +1,7 @@ import json -class ConfigProperty(object): +class ConfigProperty: def get_configProperty(self, pageSize=100): """ @@ -13,23 +13,23 @@ def get_configProperty(self, pageSize=100): config_list = list() pageNumber = 1 response = self.session.get(self.apicall + "/v1/configProperty", params={"pageSize": pageSize, "pageNumber": pageNumber}) - for config in range(0, len(response.json())): + for config in range(len(response.json())): config_list.append(response.json()[config] - 1) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + "/v1/configProperty", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for config in range(0, len(response.json())): + for config in range(len(response.json())): config_list.append(response.json()[config] - 1) if response.status_code == 200: return config_list - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def post_configProperty(self, body): - """Update a config property + """ + Update a config property Args: body (JSON): { @@ -46,15 +46,15 @@ def post_configProperty(self, body): response = self.session.post(self.apicall + "/v1/configProperty", data=body) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The config property could not be found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def post_configPropertyAggregate(self, groupName=None, propertyName=None, propertyValue=None, propertyType=None, description=None): - """Update a config property + """ + Update a config property Args: body (JSON): { @@ -71,22 +71,21 @@ def post_configPropertyAggregate(self, groupName=None, propertyName=None, proper data = { } if groupName is not None: - data['groupName'] = groupName + data["groupName"] = groupName if propertyName is not None: - data['propertyName'] = propertyName + data["propertyName"] = propertyName if propertyValue is not None: - data['propertyValue'] = propertyValue + data["propertyValue"] = propertyValue if propertyType is not None: - data['propertyType'] = propertyType + data["propertyType"] = propertyType if description is not None: - data['description'] = description + data["description"] = description response = self.session.post( self.apicall + "/v1/configProperty/aggregate", data=json.dumps([data])) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"One or more config properties could not be found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/cwe.py b/apicomponents/cwe.py index 6d44507..f7d7d6a 100644 --- a/apicomponents/cwe.py +++ b/apicomponents/cwe.py @@ -1,7 +1,8 @@ -class CWE(object): +class CWE: def get_cwe(self, pageSize=100): - """Returns a list of all CWEs + """ + Returns a list of all CWEs Args: pageSize (int, optional): size of the page. Defaults to 100. @@ -12,22 +13,22 @@ def get_cwe(self, pageSize=100): cwe_list = list() pageNumber = 1 response = self.session.get(self.apicall + "/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber}) - for cwe in range(0, len(response.json())): + for cwe in range(len(response.json())): cwe_list.append(response.json()[cwe - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + "/v1/cwe", params={"pageSize": pageSize, "pageNumber": pageNumber}) - for cwe in range(0, len(response.json())): + for cwe in range(len(response.json())): cwe_list.append(response.json()[cwe - 1]) if response.status_code == 200: return cwe_list - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def get_cweById(self, cweId): - """ Returns a specific CWE + """ + Returns a specific CWE Args: cweId (int32): The CWE ID of the CWE to retrieve @@ -41,9 +42,8 @@ def get_cweById(self, cweId): response = self.session.get(self.apicall + f"/v1/cwe/{cweId}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return ("Unauthorized ", response.status_code) - elif response.status_code == 404: + if response.status_code == 404: return (f"The CWE could not be found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/finding.py b/apicomponents/finding.py index c52279c..3fa3a16 100644 --- a/apicomponents/finding.py +++ b/apicomponents/finding.py @@ -1,4 +1,4 @@ -class Finding(object): +class Finding: def get_project_finding(self, uuid, suppressed=False, pageSize=100): """ @@ -10,23 +10,22 @@ def get_project_finding(self, uuid, suppressed=False, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + f"/v1/finding/project/{uuid}?suppressed={suppressed}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for finding in range(0, len(response.json())): + for finding in range(len(response.json())): finding_list.append(response.json()[finding - 1]) while len(response.json()) == pageSize: response = self.session.get(self.apicall + f"/v1/finding/project/{uuid}?suppressed={suppressed}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for finding in range(0, len(response.json())): + for finding in range(len(response.json())): finding_list.append(response.json()[finding - 1]) if response.status_code == 200: return finding_list - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def export_findings(self, uuid): """ @@ -37,11 +36,10 @@ def export_findings(self, uuid): response = self.session.get(self.apicall + f"/v1/findings/project/{uuid}/export") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/ldap.py b/apicomponents/ldap.py index 8bcc6db..c6e8024 100644 --- a/apicomponents/ldap.py +++ b/apicomponents/ldap.py @@ -1,7 +1,7 @@ import json -class LDAP(object): +class LDAP: def list_ldapgroups(self, pageSize=100): """ @@ -10,21 +10,21 @@ def list_ldapgroups(self, pageSize=100): """ ldaplist = list() pageNumber = 1 - response = self.session.get(self.apicall + "/v1/ldap/groups", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for ldap in range(0, len(response.json())): + response = self.session.get(self.apicall + "/v1/ldap/groups", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for ldap in range(len(response.json())): ldaplist.append(response.json()[ldap - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + "/v1/ldap/groups", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for ldap in range(0, len(response.json())): + response = self.session.get(self.apicall + "/v1/ldap/groups", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for ldap in range(len(response.json())): ldaplist.append(response.json()[ldap - 1]) if response.status_code == 200: return ldaplist - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def get_ldapteam(self, uuid): - """Returns the DNs of all groups mapped to the specified team + """ + Returns the DNs of all groups mapped to the specified team Args: uuid (string): The UUID of the team to retrieve mappings for. @@ -32,15 +32,15 @@ def get_ldapteam(self, uuid): response = self.session.get(self.apicall + f"/v1/ldap/team/{uuid}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The UUID of the team could not be found , {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def create_ldap(self, team, dn): - """Adds a mapping + """ + Adds a mapping Args: team (string): The UUID of the team @@ -48,19 +48,18 @@ def create_ldap(self, team, dn): """ data = { "team": team, - "dn": dn + "dn": dn, } response = self.session.put(self.apicall + "/v1/ldap/mapping", data=json.dumps(data)) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The UUID of the team could not be found, {response.status_code}") - elif response.status_code == 409: + if response.status_code == 409: return (f"A mapping with the same team and dn already exists, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def delete_ldap(self, uuid): """ @@ -72,9 +71,8 @@ def delete_ldap(self, uuid): response = self.session.delete(self.apicall + f"/v1/ldap/mapping/{uuid}") if response.status_code == 204: return (f"successful operation, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The UUID of the mapping could not be found, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/license.py b/apicomponents/license.py index b02ef28..00ade75 100644 --- a/apicomponents/license.py +++ b/apicomponents/license.py @@ -1,4 +1,4 @@ -class License(object): +class License: def get_list_license(self, pageSize=100): """Returns a list of all licenses with complete metadata for each license""" @@ -6,18 +6,17 @@ def get_list_license(self, pageSize=100): pageNumber = 1 response = self.session.get( self.apicall + "/v1/license", params={"pageSize": pageSize, "pageNumber": pageNumber}) - for lice in range(0, len(response.json())): + for lice in range(len(response.json())): license_list.append(response.json()[lice - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( self.apicall + "/v1/license", params={"pageSize": pageSize, "pageNumber": pageNumber}) - for lice in range(0, len(response.json())): + for lice in range(len(response.json())): license_list.append(response.json()[lice - 1]) if response.status_code == 200: return license_list - else: - return ("Unauthorized ", response.status_code) + return ("Unauthorized ", response.status_code) def get_license(self, licenseId): """ @@ -27,12 +26,11 @@ def get_license(self, licenseId): response = self.session.get(self.apicall + f"/v1/license/{licenseId}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return ("Unauthorized ", response.status_code) - elif response.status_code == 404: + if response.status_code == 404: return ("The license could not be found", response.status_code) - else: - return response.status_code + return response.status_code def get_license_concise(self, pageSize=100): """Returns a concise listing of all licenses""" @@ -40,17 +38,16 @@ def get_license_concise(self, pageSize=100): pageNumber = 1 response = self.session.get( self.apicall + "/v1/license/concise", params={"pageSize": pageSize, "pageNumber": pageNumber}) - for lice in range(0, len(response.json())): + for lice in range(len(response.json())): license_list.append(response.json()[lice - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( self.apicall + "/v1/license/concise", params={"pageSize": pageSize, "pageNumber": pageNumber}) - for lice in range(0, len(response.json())): + for lice in range(len(response.json())): license_list.append(response.json()[lice - 1]) if response.status_code == 200: return license_list - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/licensegroup.py b/apicomponents/licensegroup.py index 669d5c2..ad75b4d 100644 --- a/apicomponents/licensegroup.py +++ b/apicomponents/licensegroup.py @@ -1,59 +1,56 @@ import json -class LicenseGroup(object): +class LicenseGroup: def list_licensegroups(self, pageSize=100): grouplist = list() pageNumber = 1 response = self.session.get( - self.apicall + "/v1/licenseGroup", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for group in range(0, len(response.json())): + self.apicall + "/v1/licenseGroup", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for group in range(len(response.json())): grouplist.append(response.json()[group - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( - self.apicall + "/v1/licenseGroup", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for group in range(0, len(response.json())): + self.apicall + "/v1/licenseGroup", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for group in range(len(response.json())): grouplist.append(response.json()[group - 1]) if response.status_code == 200: return grouplist - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def get_licensegroup(self, uuid): response = self.session.get(self.apicall + f"/v1/licenseGroup/{uuid}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def delete_licensegroup(self, uuid): - """Delete a license group + """ + Delete a license group Args: uuid ([type]): The UUID of the license group to delete """ response = self.session.delete(self.apicall + f"/v1/licenseGroup/{uuid}") if response.status_code == 200: return "Successful operation" - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def remove_license_from_licensegroup(self, licensegroup, license): response = self.session.delete(self.apicall + f"/v1/licenseGroup/{licensegroup}/license/{license}") if response.status_code == 200: return "Successful operation" - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def add_license_to_group(self, licensegroup, license): @@ -61,27 +58,25 @@ def add_license_to_group(self, licensegroup, license): self.apicall + f"/v1/licenseGroup/{licensegroup}/license/{license}") if response.status_code == 200: return "Successful operation" - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 304: + if response.status_code == 304: return (f"The license group already has the specified license assigned, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def create_licensegroup(self, name, licenses=None, riskWeight=0): - data = {'name': name, "riskWeight": riskWeight} + data = {"name": name, "riskWeight": riskWeight} if licenses: if isinstance(license, list): - data['licenses'] = licenses + data["licenses"] = licenses else: return "Error! Licenses should be a list" response = self.session.put(self.apicall + "/v1/licenseGroup", data=json.dumps(data)) if response.status_code == 201: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def update_licensegroup(self, uuid, name=None, licenses=None, riskWeight=None): data = {"uuid": uuid} @@ -89,17 +84,16 @@ def update_licensegroup(self, uuid, name=None, licenses=None, riskWeight=None): data["name"] = name if licenses: if isinstance(license, list): - data['licenses'] = licenses + data["licenses"] = licenses else: return "Error! Licenses should be a list" if riskWeight: - data['risk_weight'] = riskWeight + data["risk_weight"] = riskWeight response = self.session.post(self.apicall + "/v1/licenseGroup", data=json.dumps(data)) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 304: + if response.status_code == 304: return (f"The license group already has the specified license assigned, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/metrics.py b/apicomponents/metrics.py index ef91675..3653626 100644 --- a/apicomponents/metrics.py +++ b/apicomponents/metrics.py @@ -1,4 +1,4 @@ -class Metrics(object): +class Metrics: def get_all_metrics(self, pageSize=100): """ @@ -8,18 +8,17 @@ def get_all_metrics(self, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + "/v1/metrics/vulnerability", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for metric in range(0, len(response.json())): + for metric in range(len(response.json())): metrics_list.append(response.json()[metric - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + "/v1/metrics/vulnerability", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for metric in range(0, len(response.json())): + for metric in range(len(response.json())): metrics_list.append(response.json()[metric - 1]) if response.status_code == 200: return metrics_list - else: - return (f"Unauthorized, {response.status_code}") + return (f"Unauthorized, {response.status_code}") def get_metrics_portolio_bydate(self, date): """ @@ -30,8 +29,7 @@ def get_metrics_portolio_bydate(self, date): self.apicall + f"/v1/metrics/portfolio/since/{date}") if response.status_code == 200: return response.json() - else: - return (f"Unauthorized, {response.status_code}") + return (f"Unauthorized, {response.status_code}") def get_metrics_project_bydate(self, uuid, date): """ @@ -43,14 +41,13 @@ def get_metrics_project_bydate(self, uuid, date): self.apicall + f"/v1/metrics/project/{uuid}/since/{date}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return (response.status_code) + return (response.status_code) def get_current_metrics_portfolio(self): """ @@ -60,8 +57,7 @@ def get_current_metrics_portfolio(self): self.apicall + "/v1/metrics/portfolio/current") if response.status_code == 200: return response.json() - else: - return (f"Unauthorized , {response.status_code}") + return (f"Unauthorized , {response.status_code}") def get_metrics_dayNumber(self, days): """ @@ -72,8 +68,7 @@ def get_metrics_dayNumber(self, days): self.apicall + f"/v1/metrics/portfolio/{days}/days") if response.status_code == 200: return response.json() - else: - return (f"Unauthorized , {response.status_code}") + return (f"Unauthorized , {response.status_code}") def get_metrics_refresh_portfolio(self): """ @@ -83,8 +78,7 @@ def get_metrics_refresh_portfolio(self): self.apicall + "/v1/metrics/portfolio/refresh") if response.status_code == 200: return (f"successful operation , {response.status_code}") - else: - return (f"Unauthorized , {response.status_code}") + return (f"Unauthorized , {response.status_code}") def get_metrics_specific_project(self, uuid): """ @@ -95,8 +89,7 @@ def get_metrics_specific_project(self, uuid): self.apicall + f"/v1/metrics/project/{uuid}/current") if response.status_code == 200: return response.json() - else: - return (f"Unauthorized , {response.status_code}") + return (f"Unauthorized , {response.status_code}") def get_metrics_specific_project_days(self, uuid, days): """ @@ -108,14 +101,13 @@ def get_metrics_specific_project_days(self, uuid, days): self.apicall + f"/v1/metrics/project/{uuid}/days/{days}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return (response.status_code) + return (response.status_code) def get_metrics_refresh_project(self, uuid): """ @@ -126,14 +118,13 @@ def get_metrics_refresh_project(self, uuid): self.apicall + f"/v1/metrics/project/{uuid}/refresh") if response.status_code == 200: return (f"successful operation , {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden , {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return (response.status_code) + return (response.status_code) def get_current_metrics_component(self, uuid): """ @@ -144,14 +135,13 @@ def get_current_metrics_component(self, uuid): self.apicall + f"/v1/metrics/component/{uuid}/current") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden , {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return (response.status_code) + return (response.status_code) def get_metrics_component_bydate(self, uuid, date, pageSize=100): """ @@ -165,18 +155,17 @@ def get_metrics_component_bydate(self, uuid, date, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{date}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for metric in range(0, len(response.json())): + for metric in range(len(response.json())): metrics_list.append(response.json()[metric - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{date}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for metric in range(0, len(response.json())): + for metric in range(len(response.json())): metrics_list.append(response.json()[metric - 1]) if response.status_code == 200: return metrics_list - else: - return (f"Unauthorized , {response.status_code}") + return (f"Unauthorized , {response.status_code}") def get_metrics_component_bydays(self, uuid, days, pageSize=100): """ @@ -190,21 +179,21 @@ def get_metrics_component_bydays(self, uuid, days, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{days}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for metric in range(0, len(response.json())): + for metric in range(len(response.json())): metrics_list.append(response.json()[metric - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/metrics/component/{uuid}/since/{days}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for metric in range(0, len(response.json())): + for metric in range(len(response.json())): metrics_list.append(response.json()[metric - 1]) if response.status_code == 200: return metrics_list - else: - return (f"Unauthorized, {response.status_code}") + return (f"Unauthorized, {response.status_code}") def get_metrics_component_refresh(self, uuid): - """[Requests a refresh of a specific components metrics] + """ + [Requests a refresh of a specific components metrics] Args: uuid ([string]): [The UUID of the component to retrieve metrics for.] @@ -216,11 +205,10 @@ def get_metrics_component_refresh(self, uuid): self.apicall + f"/v1/metrics/component/{uuid}/refresh") if response.status_code == 200: return (f"successful operation, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project not found, {response.status_code}") - else: - return (response.status_code) + return (response.status_code) diff --git a/apicomponents/permission.py b/apicomponents/permission.py index 6bd22b1..b43588c 100644 --- a/apicomponents/permission.py +++ b/apicomponents/permission.py @@ -1,4 +1,4 @@ -class Permission(object): +class Permission: def list_permissions(self, pageSize=100): """ @@ -6,21 +6,21 @@ def list_permissions(self, pageSize=100): """ permissionlist = list() pageNumber = 1 - response = self.session.get(self.apicall + "/v1/permission", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for permission in range(0, len(response.json())): + response = self.session.get(self.apicall + "/v1/permission", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for permission in range(len(response.json())): permissionlist.append(response.json()[permission - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + "/v1/permission", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for permission in range(0, len(response.json())): + response = self.session.get(self.apicall + "/v1/permission", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for permission in range(len(response.json())): permissionlist.append(response.json()[permission - 1]) if response.status_code == 200: return permissionlist - else: - return (f"Unable to list permissions, {response.status_code}") + return (f"Unable to list permissions, {response.status_code}") def add_userpermission(self, permission, username): - """Adds the permission to the specified username. + """ + Adds the permission to the specified username. Args: permission (string): A valid permission. @@ -29,17 +29,17 @@ def add_userpermission(self, permission, username): response = self.session.post(self.apicall + f"/v1/permission/{permission}/user/{username}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f" The user could not be found , {response.status_code}") - elif response.status_code == 304: + if response.status_code == 304: return ("The user already has the specified permission assigned, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def delete_userpermission(self, permission, username): - """Removes the permission to the specified username. + """ + Removes the permission to the specified username. Args: permission (string): A valid permission. @@ -48,17 +48,17 @@ def delete_userpermission(self, permission, username): response = self.session.delete(self.apicall + f"/v1/permission/{permission}/user/{username}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized , {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f" The user could not be found, {response.status_code}") - elif response.status_code == 304: + if response.status_code == 304: return ("The user already has the specified permission assigned, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def add_teampermission(self, permission, uuid): - """Adds the permission to the specified username. + """ + Adds the permission to the specified username. Args: permission (string): A valid permission. @@ -67,14 +67,13 @@ def add_teampermission(self, permission, uuid): response = self.session.post(self.apicall + f"/v1/permission/{permission}/team/{uuid}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f" The user could not be found, {response.status_code}") - elif response.status_code == 304: + if response.status_code == 304: return ("The user already has the specified permission assigned, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) # duplicate # def delete_userpermission(self, permission, uuid): diff --git a/apicomponents/policy.py b/apicomponents/policy.py index 0eb75bd..6c544e9 100644 --- a/apicomponents/policy.py +++ b/apicomponents/policy.py @@ -1,10 +1,11 @@ import json -class Policy(object): +class Policy: def get_policy(self, uuid): - """Returns a specific policy + """ + Returns a specific policy Args: uuid (string): The UUID of the policy to retrieve. @@ -15,13 +16,13 @@ def get_policy(self, uuid): response = self.session.get(self.apicall + f"/v1/policy/{uuid}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def list_policy(self, pageSize=100): - """Returns a list of all policies + """ + Returns a list of all policies Args: pageSize (int, optional): size of the page. Defaults to 100. @@ -32,24 +33,24 @@ def list_policy(self, pageSize=100): policylist = list() pageNumber = 1 response = self.session.get( - self.apicall + "/v1/policy", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for policy in range(0, len(response.json())): + self.apicall + "/v1/policy", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for policy in range(len(response.json())): policylist.append(response.json()[policy - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( - self.apicall + "/v1/policy", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for policy in range(0, len(response.json())): + self.apicall + "/v1/policy", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for policy in range(len(response.json())): policylist.append(response.json()[policy - 1]) if response.status_code == 200: return policylist - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def delete_policy(self, uuid): - """ Deletes a specific policy + """ + Deletes a specific policy Args: uuid (string): The UUID of the policy to delete. @@ -57,14 +58,14 @@ def delete_policy(self, uuid): response = self.session.delete(self.apicall + f"/v1/policy/{uuid}") if response.status_code >= 200 and response.status_code <= 299: return ("Successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def create_policy(self, name, operator="ANY", violationState="INFO", policyCondition=None, projects=None, globals=None): # TODO: create better comments explaining the args - """ Create a policy + """ + Create a policy Args: name (string): Name of the policy @@ -97,14 +98,14 @@ def create_policy(self, name, operator="ANY", violationState="INFO", policyCondi self.apicall + "/v1/policy", data=json.dumps(data)) if response.status_code == 201: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def update_policy(self, uuid, name=None, operator=None, violationState=None, policyCondition=None, projects=None, globals=None): # TODO: create better comments explaining the args - """ Create a policy + """ + Create a policy Args: name (string): Name of the policy @@ -117,9 +118,9 @@ def update_policy(self, uuid, name=None, operator=None, violationState=None, pol """ data = {"uuid": uuid} if name: - data['name'] = name + data["name"] = name if violationState: - data['violationState'] = violationState + data["violationState"] = violationState if operator: data["operator"] = operator if policyCondition: @@ -137,13 +138,13 @@ def update_policy(self, uuid, name=None, operator=None, violationState=None, pol response = self.session.post(self.apicall + "/v1/policy", data=json.dumps(data)) if response.status_code == 200: return ("Successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def add_policyToproject(self, policyUuid, projectUuid): - """Adds project to a policy. + """ + Adds project to a policy. Args: policyUuid (string): The UUID of the policy @@ -152,15 +153,15 @@ def add_policyToproject(self, policyUuid, projectUuid): response = self.session.post(self.apicall + f"/v1/policy/{policyUuid}/projects/{projectUuid}") if response.status_code == 200: return ("Successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 304: + if response.status_code == 304: return (f"The policy already has the specified project assigned, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def delete_policyFromproject(self, policyUuid, projectUuid): - """Removes a project from a policy. + """ + Removes a project from a policy. Args: policyUuid (string): The UUID of the policy @@ -169,9 +170,8 @@ def delete_policyFromproject(self, policyUuid, projectUuid): response = self.session.delete(self.apicall + f"/v1/policy/{policyUuid}/projects/{projectUuid}") if response.status_code == 200: return ("Successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 304: + if response.status_code == 304: return (f"The policy does not have the specified project assigned, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/project.py b/apicomponents/project.py index 7468ff8..2722c72 100644 --- a/apicomponents/project.py +++ b/apicomponents/project.py @@ -1,28 +1,26 @@ -class Project(object): +class Project: def list_projects(self, pageSize=100): projectlist = list() pageNumber = 1 response = self.session.get( - self.apicall + "/v1/project", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for project in range(0, len(response.json())): + self.apicall + "/v1/project", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for project in range(len(response.json())): projectlist.append(response.json()[project - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( - self.apicall + "/v1/project", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for project in range(0, len(response.json())): + self.apicall + "/v1/project", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for project in range(len(response.json())): projectlist.append(response.json()[project - 1]) if response.status_code == 200: return projectlist - else: - return ("Unable to list projects", response.status_code) + return ("Unable to list projects", response.status_code) def get_project(self, uuid): response = self.session.get(self.apicall + f"/v1/project/{uuid}") if response.status_code == 200: return response.json() - else: - return ("Unable to find project", response.status_code) + return ("Unable to find project", response.status_code) def get_project_lookup(self, name, version=None): if version is None: @@ -33,15 +31,13 @@ def get_project_lookup(self, name, version=None): self.apicall + f"/v1/project/lookup?{lookup}") if response.status_code == 200: return response.json() - else: - return ("Unable to find project", response.status_code) + return ("Unable to find project", response.status_code) def delete_project_uuid(self, uuid): response = self.session.delete(self.apicall + f"/v1/project/{uuid}") if response.status_code == 204: return ("Successfully deleted the project", response.status_code) - else: - return ("Unable to delete the project", response.status_code) + return ("Unable to delete the project", response.status_code) def create_project(self, name, classifier, version, active=True): # TODO add more options @@ -49,32 +45,30 @@ def create_project(self, name, classifier, version, active=True): "name": name, "classifier": classifier, "version": version, - "active": active + "active": active, } response = self.session.put(self.apicall + "/v1/project", json=data) if response.status_code == 201: print("Successfully created the project", response.status_code) return response.json() - elif response.status_code == 409: + if response.status_code == 409: return ("Project with specified name already exists", response.status_code) - else: - return ("Unable to create the project", response.status_code) + return ("Unable to create the project", response.status_code) def update_project(self, uuid, name=None, classifier=None): # TODO add more options data = { - "uuid": uuid + "uuid": uuid, } if name: - data['name'] = name + data["name"] = name if classifier: - data['classifier'] = classifier + data["classifier"] = classifier response = self.session.post(self.apicall + "/v1/project", json=data) if response.status_code == 200: return ("Successfully updated the project", response.status_code) - elif response.status_code == 404: + if response.status_code == 404: return ("Project with specified uuid could not be found", response.status_code) - elif response.status_code == 409: + if response.status_code == 409: return ("Project with specified name already exists", response.status_code) - else: - return ("Unable to update the project", response.status_code) + return ("Unable to update the project", response.status_code) diff --git a/apicomponents/projectProperty.py b/apicomponents/projectProperty.py index 128ca2b..5151e70 100644 --- a/apicomponents/projectProperty.py +++ b/apicomponents/projectProperty.py @@ -1,29 +1,27 @@ -class ProjectProperty(object): +class ProjectProperty: def get_projectproperty(self, uuid): response = self.session.get(self.apicall + f"/v1/project/{uuid}/property") if response.status_code == 200: return response.json() - else: - return (f"Unable to find project, {response.status_code}") + return (f"Unable to find project, {response.status_code}") def update_projectproperty(self, uuid, name=None, classifier=None): # TODO add more options data = { - "uuid": uuid + "uuid": uuid, } if name: - data['name'] = name + data["name"] = name if classifier: - data['classifier'] = classifier + data["classifier"] = classifier response = self.session.post(self.apicall + f"/v1/project/{uuid}/property", json=data) if response.status_code == 200: return (f"Successfully updated the project, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Project with specified uuid could not be found, {response.status_code}") - elif response.status_code == 409: + if response.status_code == 409: return (f"Project with specified name already exists, {response.status_code}") - else: - return (f"Unable to update the project, {response.status_code}") + return (f"Unable to update the project, {response.status_code}") def create_projectproperty(self, uuid, propertyValue, groupName="integrations", propertyName="defectdojo.engagementId", propertyType="STRING", description="DefectDojo integration"): """ @@ -45,20 +43,18 @@ def create_projectproperty(self, uuid, propertyValue, groupName="integrations", "propertyName": propertyName, "propertyValue": propertyValue, "propertyType": propertyType, - "description": description + "description": description, } response = self.session.put( self.apicall + f"/v1/project/{uuid}/property", json=data) if response.status_code == 201: return (f"Successfully created the project, {response.status_code}") - elif response.status_code == 409: + if response.status_code == 409: return (f"Project with specified name already exists, {response.status_code}") - else: - return (f"Unable to create the project, {response.status_code}") + return (f"Unable to create the project, {response.status_code}") def delete_projectproperty_uuid(self, uuid): response = self.session.delete(self.apicall + f"/v1/project/{uuid}/property") if response.status_code == 204: return (f"Successfully deleted the project, {response.status_code}") - else: - return (f"Unable to delete the project, {response.status_code}") + return (f"Unable to delete the project, {response.status_code}") diff --git a/apicomponents/repository.py b/apicomponents/repository.py index a96a1a1..4465172 100644 --- a/apicomponents/repository.py +++ b/apicomponents/repository.py @@ -1,10 +1,11 @@ import json -class Repository(object): +class Repository: def list_repository(self, pageSize=100): - """Returns a list of all repositories + """ + Returns a list of all repositories Args: pageSize (int, optional): [description]. Defaults to 100. @@ -14,21 +15,21 @@ def list_repository(self, pageSize=100): """ respositorylist = list() pageNumber = 1 - response = self.session.get(self.apicall + "/v1/repository", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for repository in range(0, len(response.json())): + response = self.session.get(self.apicall + "/v1/repository", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for repository in range(len(response.json())): respositorylist.append(response.json()[repository - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + "/v1/repository", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for repository in range(0, len(response.json())): + response = self.session.get(self.apicall + "/v1/repository", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for repository in range(len(response.json())): respositorylist.append(response.json()[repository - 1]) if response.status_code == 200: return respositorylist - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def update_repository(self, uuid, identifier, type, url, resolutionOrder=0, enable=True, internal=True): - """Update a specific repository + """ + Update a specific repository Args: uuid ([type]): [description] @@ -57,16 +58,16 @@ def update_repository(self, uuid, identifier, type, url, resolutionOrder=0, enab "resolutionOder": resolutionOrder, "enable": enable, "internal": internal, - "identifier": identifier + "identifier": identifier, } response = self.session.post(self.apicall + "/v1/repository", data=json.dumps(data)) if response.status_code == 200: return ("Successful operation") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def create_repository(self, identifier, type, url, resolutionOrder=0, enable=True, internal=True): - """ Create a new repository + """ + Create a new repository Args: identifier (string): identity name of the repository @@ -93,16 +94,16 @@ def create_repository(self, identifier, type, url, resolutionOrder=0, enable=Tru "resolutionOder": resolutionOrder, "enable": enable, "internal": internal, - "identifier": identifier + "identifier": identifier, } response = self.session.put(self.apicall + "/v1/repository", data=json.dumps(data)) if response.status_code == 201: return ("Successful operation") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def get_latest_repository(self, purl): - """Attempts to resolve the latest version of the component available in the configured repositories + """ + Attempts to resolve the latest version of the component available in the configured repositories Args: purl (string): The Package URL for the component to query @@ -117,11 +118,10 @@ def get_latest_repository(self, purl): "lastCheck": "2021-12-02T16:50:56.704Z" } """ - response = self.session.get(self.apicall + "/v1/repository/latest", params={'purl': purl}) + response = self.session.get(self.apicall + "/v1/repository/latest", params={"purl": purl}) if response.status_code == 200: return response.json() - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def get_repositoryByType(self, type, pageSize=100): """ @@ -136,21 +136,21 @@ def get_repositoryByType(self, type, pageSize=100): """ respositorylist = list() pageNumber = 1 - response = self.session.get(self.apicall + f"/v1/repository/{type}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for repository in range(0, len(response.json())): + response = self.session.get(self.apicall + f"/v1/repository/{type}", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for repository in range(len(response.json())): respositorylist.append(response.json()[repository - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + f"/v1/repository/{type}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for repository in range(0, len(response.json())): + response = self.session.get(self.apicall + f"/v1/repository/{type}", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for repository in range(len(response.json())): respositorylist.append(response.json()[repository - 1]) if response.status_code == 200: return respositorylist - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def delete_repository(self, uuid): - """Deletes a repository + """ + Deletes a repository Args: uuid (string): the UUID of the repository to delete @@ -158,5 +158,4 @@ def delete_repository(self, uuid): response = self.session.delete(self.apicall + f"/v1/repository/{uuid}") if response.status_code >= 200 or response.status_code <= 299: return ("Successful operation") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/search.py b/apicomponents/search.py index 9bedc70..45a04f8 100644 --- a/apicomponents/search.py +++ b/apicomponents/search.py @@ -1,73 +1,67 @@ -class Search(object): +class Search: def general_search(self, query=None): if query: - response = self.session.get(self.apicall + "/v1/search", params={'query': query}) + response = self.session.get(self.apicall + "/v1/search", params={"query": query}) else: response = self.session.get(self.apicall + "/v1/search") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def project_search(self, query=None): if query: - response = self.session.get(self.apicall + "/v1/search/project", params={'query': query}) + response = self.session.get(self.apicall + "/v1/search/project", params={"query": query}) else: response = self.session.get(self.apicall + "/v1/search/project") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def component_search(self, query=None): if query: - response = self.session.get(self.apicall + "/v1/search/component", params={'query': query}) + response = self.session.get(self.apicall + "/v1/search/component", params={"query": query}) else: response = self.session.get(self.apicall + "/v1/search/component") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def service_search(self, query=None): if query: - response = self.session.get(self.apicall + "/v1/search/service", params={'query': query}) + response = self.session.get(self.apicall + "/v1/search/service", params={"query": query}) else: response = self.session.get(self.apicall + "/v1/search/service") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def license_search(self, query=None): if query: - response = self.session.get(self.apicall + "/v1/search/license", params={'query': query}) + response = self.session.get(self.apicall + "/v1/search/license", params={"query": query}) else: response = self.session.get(self.apicall + "/v1/search/license") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def vulnerability_search(self, query=None): if query: - response = self.session.get(self.apicall + "/v1/search/vulnerability", params={'query': query}) + response = self.session.get(self.apicall + "/v1/search/vulnerability", params={"query": query}) else: response = self.session.get(self.apicall + "/v1/search/vulnerability") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/service.py b/apicomponents/service.py index d076bbc..7bca4a0 100644 --- a/apicomponents/service.py +++ b/apicomponents/service.py @@ -1,10 +1,11 @@ import json -class Service(object): +class Service: def list_services(self, uuid, pageSize=100): - """Returns a list of all services for a given project + """ + Returns a list of all services for a given project Args: uuid (string): The UUID of the project. @@ -15,29 +16,29 @@ def list_services(self, uuid, pageSize=100): """ servicelist = list() pageNumber = 1 - response = self.session.get(self.apicall + f"/v1/service/project/{uuid}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) + response = self.session.get(self.apicall + f"/v1/service/project/{uuid}", params={"pageSize": pageSize, "pageNumber": pageNumber}) if response.status_code == 200: - for service in range(0, len(response.json())): + for service in range(len(response.json())): servicelist.append(response.json()[service - 1]) while len(response.json()) == pageSize: pageNumber += 1 - response = self.session.get(self.apicall + f"/v1/service/project/{uuid}", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for service in range(0, len(response.json())): + response = self.session.get(self.apicall + f"/v1/service/project/{uuid}", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for service in range(len(response.json())): servicelist.append(response.json()[service - 1]) if response.status_code == 200: return servicelist else: if response.status_code == 404: return (f"The project could not be found, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def get_service(self, uuid): - """Returns a specific service. + """ + Returns a specific service. Args: uuid (string): The UUID of the project. @@ -48,17 +49,17 @@ def get_service(self, uuid): response = self.session.get(self.apicall + f"/v1/service/{uuid}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The service could not be found, {response.status_code}") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified project is forbidden, {response.status_code}") - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def delete_service(self, uuid): - """Deletes a service + """ + Deletes a service Args: uuid (string): The UUID of the project. @@ -66,14 +67,13 @@ def delete_service(self, uuid): response = self.session.delete(self.apicall + f"/v1/service/{uuid}") if response.status_code == 200: return ("Successful operation") - elif response.status_code == 403: + if response.status_code == 403: return (f"Access to the specified service is forbidden, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The UUID of the service could not be found, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def create_service(self, uuid, providerName=None, providerURL=None, contactName=None, contactEmail=None, contactPhone=None): """ @@ -82,27 +82,26 @@ def create_service(self, uuid, providerName=None, providerURL=None, contactName= data = {} contact = {} if providerName: - data['provider'] = {"name": providerName} + data["provider"] = {"name": providerName} if providerURL: - data['provider'] = {"url": providerURL} + data["provider"] = {"url": providerURL} if contactName: - contact['name'] = contactName + contact["name"] = contactName if contactEmail: - contact['email'] = contactEmail + contact["email"] = contactEmail if contactPhone: - contact['phone'] = contactPhone + contact["phone"] = contactPhone if not bool(contact): - data['contact'] = [contact] + data["contact"] = [contact] # TODO: add more option response = self.session.put(self.apicall + f"/v1/service/project/{uuid}", data=json.dumps(data)) if response.status_code == 201: return ("Successful operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The team could not be found, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def update_service(self, **args): @@ -112,5 +111,4 @@ def update_service(self, **args): response = self.session.post(self.apicall + "/v1/service", data=json.dumps(data)) if response.status_code == 200: return ("Successful operation") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/team.py b/apicomponents/team.py index bfe09ee..b53d3b9 100644 --- a/apicomponents/team.py +++ b/apicomponents/team.py @@ -1,25 +1,26 @@ import json -class Team(object): +class Team: def get_teamByUUID(self, uuid): - """Returns a specific team + """ + Returns a specific team Args: uuid (string): The UUID of the team to retrieve """ response = self.session.get(self.apicall + f"/v1/team/{uuid}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The team could not be found, {response.status_code}") - else: - return response.status_code + return response.status_code def generate_team_apikey(self, uuid): - """Generate an API key and returns its value + """ + Generate an API key and returns its value Args: uuid (string): The UUID of the team to generate a key for. @@ -27,15 +28,15 @@ def generate_team_apikey(self, uuid): response = self.session.put(self.apicall + f"/v1/team/{uuid}/key") if response.status_code == 201: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The team could not be found, {response.status_code}") - else: - return (response.status_code) + return (response.status_code) def delete_team(self, name, uuid, key=[]): - """ Deletes a team + """ + Deletes a team Args: name (string): name of the team. @@ -47,17 +48,16 @@ def delete_team(self, name, uuid, key=[]): "uuid": uuid, } if len(key) > 0: - data['apikeys'] = key + data["apikeys"] = key response = self.session.delete( self.apicall + "/v1/team", data=json.dumps(data)) if response.status_code == 204: return ("Successfully operation") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The team could not be found, {response.status_code}") - else: - return response.status_code + return response.status_code def update_team(self, name, uuid, key=[], ldapUsers=[], managedUsers=[], oidcUsers=[], mappedLdapGroups=[], mappedOidcGroups=[], permissions=[]): data = { @@ -65,13 +65,13 @@ def update_team(self, name, uuid, key=[], ldapUsers=[], managedUsers=[], oidcUse "uuid": uuid, } if len(key) > 0: - data['apikeys'] = key + data["apikeys"] = key if len(ldapUsers) > 0: - data['ldapusers'] = ldapUsers + data["ldapusers"] = ldapUsers if len(managedUsers) > 0: - data['managedUsers'] = managedUsers + data["managedUsers"] = managedUsers if len(oidcUsers) > 0: - data['oidcUsers'] = oidcUsers + data["oidcUsers"] = oidcUsers if len(mappedLdapGroups) > 0: data["mappedLdapGroups"] = mappedLdapGroups if len(mappedOidcGroups) > 0: @@ -82,12 +82,11 @@ def update_team(self, name, uuid, key=[], ldapUsers=[], managedUsers=[], oidcUse self.apicall + "/v1/team", data=json.dumps(data)) if response.status_code == 200: return (f"successful operation, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The team could not be found, {response.status_code}") - else: - return response.status_code + return response.status_code def create_team(self, name, uuid=None, key=[], ldapUsers=[], managedUsers=[], oidcUsers=[], mappedLdapGroups=[], mappedOidcGroups=[], permissions=[]): data = { @@ -96,13 +95,13 @@ def create_team(self, name, uuid=None, key=[], ldapUsers=[], managedUsers=[], oi if uuid: data["uuid"] = uuid if len(key) > 0: - data['apikeys'] = key + data["apikeys"] = key if len(ldapUsers) > 0: - data['ldapusers'] = ldapUsers + data["ldapusers"] = ldapUsers if len(managedUsers) > 0: - data['managedUsers'] = managedUsers + data["managedUsers"] = managedUsers if len(oidcUsers) > 0: - data['oidcUsers'] = oidcUsers + data["oidcUsers"] = oidcUsers if len(mappedLdapGroups) > 0: data["mappedLdapGroups"] = mappedLdapGroups if len(mappedOidcGroups) > 0: @@ -113,57 +112,57 @@ def create_team(self, name, uuid=None, key=[], ldapUsers=[], managedUsers=[], oi self.apicall + "/v1/team", data=json.dumps(data)) if response.status_code == 201: return (f"successful operation, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return response.status_code + return response.status_code def list_teams(self, pageSize=100): - """Returns a list of all teams + """ + Returns a list of all teams """ teamlist = list() pageNumber = 1 response = self.session.get( - self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for team in range(0, len(response.json())): + self.apicall + "/v1/team", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for team in range(len(response.json())): teamlist.append(response.json()[team - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( - self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for team in range(0, len(response.json())): + self.apicall + "/v1/team", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for team in range(len(response.json())): teamlist.append(response.json()[team - 1]) if response.status_code == 200: return teamlist - else: - return ("Unable to list teams", response.status_code) + return ("Unable to list teams", response.status_code) def get_uuid_from_team_name(self, teamname, pageSize=100): - """Returns a list of all teams + """ + Returns a list of all teams """ teamlist = list() teamuid = None pageNumber = 1 response = self.session.get( - self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for team in range(0, len(response.json())): + self.apicall + "/v1/team", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for team in range(len(response.json())): teamlist.append(response.json()[team - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get( - self.apicall + "/v1/team", params={'pageSize': pageSize, 'pageNumber': pageNumber}) - for team in range(0, len(response.json())): + self.apicall + "/v1/team", params={"pageSize": pageSize, "pageNumber": pageNumber}) + for team in range(len(response.json())): teamlist.append(response.json()[team - 1]) for team in teamlist: - if team['name'] == teamname: - teamuid = team['uuid'] + if team["name"] == teamname: + teamuid = team["uuid"] if response.status_code == 200: return teamuid - else: - return ("Unable to list team", response.status_code) + return ("Unable to list team", response.status_code) def delete_apikey(self, apikey): - """Delete specified API key + """ + Delete specified API key Args: apikey (string): The API key to delete. @@ -171,13 +170,14 @@ def delete_apikey(self, apikey): response = self.session.delete(self.apicall + f"/v1/team/key/{apikey}") if response.status_code == 200: return (f"successful operation, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The API key could not be found, {response.status_code}") def update_apikey(self, apikey): - """Regenerates an API key by removing the specified key, generating a new one and returning its value + """ + Regenerates an API key by removing the specified key, generating a new one and returning its value Args: apikey (string): The API key to regenerate. @@ -185,9 +185,8 @@ def update_apikey(self, apikey): response = self.session.post(self.apicall + f"/v1/team/key/{apikey}") if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"The API key could not be found, {response.status_code}") - else: - return response.status_code + return response.status_code diff --git a/apicomponents/user.py b/apicomponents/user.py index 287efea..2c9c455 100644 --- a/apicomponents/user.py +++ b/apicomponents/user.py @@ -1,7 +1,7 @@ import json -class User(object): +class User: def get_user_oidc(self): """ @@ -9,9 +9,9 @@ def get_user_oidc(self): response = self.session.get(self.apicall + "/v1/user/oidc") # Retuns a list of all OIDC users if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"Could not be found, {response.status_code}") def join_team(self, username, uuid): @@ -28,11 +28,11 @@ def join_team(self, username, uuid): response = self.session.post(self.apicall + f"/v1/user/{username}/membership", data=json.dumps(data)) if response.status_code == 200: return response.json() - elif response.status_code == 304: + if response.status_code == 304: return (f"The user is already a member of the specified team, {response.status_code}") - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - elif response.status_code == 404: + if response.status_code == 404: return (f"User or team could not be found, {response.status_code}") # TODO extend diff --git a/apicomponents/violation.py b/apicomponents/violation.py index 8b21344..bddad9e 100644 --- a/apicomponents/violation.py +++ b/apicomponents/violation.py @@ -1,7 +1,8 @@ -class Violation(object): +class Violation: def list_violations(self, suppressed=False, pageSize=100): - """Returns a list of all policy violations for the entire portfolio + """ + Returns a list of all policy violations for the entire portfolio Args: suppressed (bool, optional): Optionally includes suppressed violations. Defaults to False. @@ -13,22 +14,22 @@ def list_violations(self, suppressed=False, pageSize=100): violationlist = list() pageNumber = 1 response = self.session.get(self.apicall + "/v1/violation", params={ - 'pageSize': pageSize, 'pageNumber': pageNumber, 'suppressed': suppressed}) - for violation in range(0, len(response.json())): + "pageSize": pageSize, "pageNumber": pageNumber, "suppressed": suppressed}) + for violation in range(len(response.json())): violationlist.append(response.json()[violation - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + "/v1/violation", params={ - 'pageSize': pageSize, 'pageNumber': pageNumber, 'suppressed': suppressed}) - for violation in range(0, len(response.json())): + "pageSize": pageSize, "pageNumber": pageNumber, "suppressed": suppressed}) + for violation in range(len(response.json())): violationlist.append(response.json()[violation - 1]) if response.status_code == 200: return violationlist - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def get_project_violation(self, uuid, suppressed=False): - """Returns a list of all policy violations for a specific project + """ + Returns a list of all policy violations for a specific project Args: uuid (string): The UUID of the project @@ -37,14 +38,14 @@ def get_project_violation(self, uuid, suppressed=False): Returns: List: Returns a list of all policy violations for a specific project """ - response = self.session.get(self.apicall + f"/v1/violation/project/{uuid}", params={'suppressed': suppressed}) + response = self.session.get(self.apicall + f"/v1/violation/project/{uuid}", params={"suppressed": suppressed}) if response.status_code == 200: return response.json() - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) def get_component_violation(self, uuid, suppressed=False): - """Returns a list of all policy violations for a specific component + """ + Returns a list of all policy violations for a specific component Args: uuid (string): The UUID of the project @@ -53,8 +54,7 @@ def get_component_violation(self, uuid, suppressed=False): Returns: List: Returns a list of all policy violations for a specific component. """ - response = self.session.get(self.apicall + f"/v1/violation/component/{uuid}", params={'suppressed': suppressed}) + response = self.session.get(self.apicall + f"/v1/violation/component/{uuid}", params={"suppressed": suppressed}) if response.status_code == 200: return response.json() - else: - return ((response.content).decode("utf-8"), response.status_code) + return ((response.content).decode("utf-8"), response.status_code) diff --git a/apicomponents/violationAnalysis.py b/apicomponents/violationAnalysis.py index ae9b4c2..57bb2c0 100644 --- a/apicomponents/violationAnalysis.py +++ b/apicomponents/violationAnalysis.py @@ -1,10 +1,11 @@ import json -class ViolationAnalysis(object): +class ViolationAnalysis: def record_violation(self, component, policyViolation, suppressed=True): - """Record a violation analysis decision + """ + Record a violation analysis decision Args: component (string): The UUID of the component @@ -17,18 +18,18 @@ def record_violation(self, component, policyViolation, suppressed=True): data = { "component": component, "policyViolation": policyViolation, - "suppressed": suppressed + "suppressed": suppressed, } response = self.session.put(self.apicall + "/v1/violation/analysis", data=json.dumps(data)) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") def get_violation_analysis(self, component, policyViolation): - """Retrieve a violation analysis trail + """ + Retrieve a violation analysis trail Args: component (string): The UUID of the component @@ -41,7 +42,6 @@ def get_violation_analysis(self, component, policyViolation): response = self.session.get(self.apicall + "/v1/violation/analysis", params={"component": component, "policyViolation": policyViolation}) if response.status_code == 200: return response.json() - elif response.status_code == 401: + if response.status_code == 401: return (f"Unauthorized, {response.status_code}") - else: - return (f"{(response.content).decode('utf-8')}, {response.status_code}") + return (f"{(response.content).decode('utf-8')}, {response.status_code}") diff --git a/apicomponents/vulnerability.py b/apicomponents/vulnerability.py index 0168409..215e18c 100644 --- a/apicomponents/vulnerability.py +++ b/apicomponents/vulnerability.py @@ -1,21 +1,20 @@ -class Vulnerability(object): +class Vulnerability: def get_all_vulnerabilities(self, pageSize=100): vulnerability_list = list() pageNumber = 1 response = self.session.get(self.apicall + "/v1/vulnerability", params={ - 'pageSize': pageSize, 'pageNumber': pageNumber}) - for vul in range(0, len(response.json())): + "pageSize": pageSize, "pageNumber": pageNumber}) + for vul in range(len(response.json())): vulnerability_list.append(response.json()[vul - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + "/v1/vulnerability", params={ - 'pageSize': pageSize, 'pageNumber': pageNumber}) - for vul in range(0, len(response.json())): + "pageSize": pageSize, "pageNumber": pageNumber}) + for vul in range(len(response.json())): vulnerability_list.append(response.json()[vul - 1]) if response.status_code == 200: return vulnerability_list - else: - return (f"Unable to find any vulnerabilities, {response.status_code}") + return (f"Unable to find any vulnerabilities, {response.status_code}") def get_vulnerability(self, source, vuln): """ @@ -27,14 +26,13 @@ def get_vulnerability(self, source, vuln): self.apicall + f"/v1/vulnerability/source/{source}/vuln/{vuln}") if response.status_code == 200: return response.json() - else: - if response.status_code == 401: - return (f"Unauthorized , {response.status_code}") - else: - return (f"The vulnerability could not be found, {response.status_code}") + if response.status_code == 401: + return (f"Unauthorized , {response.status_code}") + return (f"The vulnerability could not be found, {response.status_code}") def get_component_vulnerability(self, uuid, suppressed=False, pageSize=100): - """ Returns a list of all vulnerabilities for a specific component. + """ + Returns a list of all vulnerabilities for a specific component. uuid: suppressed: optionally includes suppressed vulnerabilities """ @@ -42,74 +40,72 @@ def get_component_vulnerability(self, uuid, suppressed=False, pageSize=100): pageNumber = 1 response = self.session.get(self.apicall + f"/v1/vulnerability/component/{uuid}?suppressed={suppressed}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for vul in range(0, len(response.json())): + for vul in range(len(response.json())): vulnerability_list.append(response.json()[vul - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/vulnerability/component/{uuid}?suppressed={suppressed}", params={ - 'pageSize': pageSize, 'pageNumber': pageNumber}) - for vul in range(0, len(response.json())): + "pageSize": pageSize, "pageNumber": pageNumber}) + for vul in range(len(response.json())): vulnerability_list.append(response.json()[vul - 1]) if response.status_code == 200: return vulnerability_list - else: - if response.status_code == 401: - return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: - return (f"Access to the specified component is forbidden, {response.status_code}") - else: - return (f"The component could not be found, {response.status_code}") + if response.status_code == 401: + return (f"Unauthorized, {response.status_code}") + if response.status_code == 403: + return (f"Access to the specified component is forbidden, {response.status_code}") + return (f"The component could not be found, {response.status_code}") def get_project_vulnerability(self, uuid, suppressed=False, pageSize=100): - """ Returns a list of all vulnerabilities for a specific project. + """ + Returns a list of all vulnerabilities for a specific project. uuid: - suppressed: optionally includes suppressed vulnerabilities(boolean) """ + suppressed: optionally includes suppressed vulnerabilities(boolean) + """ vulnerability_list = list() pageNumber = 1 response = self.session.get(self.apicall + f"/v1/vulnerability/project/{uuid}?suppressed={suppressed}", params={ "pageSize": pageSize, "pageNumber": pageNumber}) - for vul in range(0, len(response.json())): + for vul in range(len(response.json())): vulnerability_list.append(response.json()[vul - 1]) while len(response.json()) == pageSize: pageNumber += 1 response = self.session.get(self.apicall + f"/v1/vulnerability/project/{uuid}?suppressed={suppressed}", params={ - 'pageSize': pageSize, 'pageNumber': pageNumber}) - for vul in range(0, len(response.json())): + "pageSize": pageSize, "pageNumber": pageNumber}) + for vul in range(len(response.json())): vulnerability_list.append(response.json()[vul - 1]) if response.status_code == 200: return vulnerability_list - else: - if response.status_code == 401: - return (f"Unauthorized, {response.status_code}") - elif response.status_code == 403: - return (f"Access to the specified project is forbidden, {response.status_code}") - else: - return (f"The project could not be found, {response.status_code}") + if response.status_code == 401: + return (f"Unauthorized, {response.status_code}") + if response.status_code == 403: + return (f"Access to the specified project is forbidden, {response.status_code}") + return (f"The project could not be found, {response.status_code}") def get_vulnerability_uuid(self, uuid): - """ returns a specific vulnerability - uuid: The UUID of the vulnerability """ + """ + returns a specific vulnerability + uuid: The UUID of the vulnerability + """ response = self.session.get(self.apicall + f"/v1/vulnerability/{uuid}") if response.status_code == 200: return response.json() - else: - if response.status_code == 401: - return (f"Unauthorized, {response.status_code}") - else: - return (f"The vulnerability could not be found, {response.status_code}") + if response.status_code == 401: + return (f"Unauthorized, {response.status_code}") + return (f"The vulnerability could not be found, {response.status_code}") def get_affected_project(self, source, vuln): - """ Returns a list of all projects affected by a specific vulnerability + """ + Returns a list of all projects affected by a specific vulnerability source: - vuln: """ + vuln: + """ response = self.session.get( self.apicall + f"/v1/vulnerability/source/{source}/vuln/{vuln}/projects") if response.status_code == 200: return response.json() - else: - if response.status_code == 401: - return (f"Unauthorized, {response.status_code}") - else: - return (f"The vulnerability could not be found, {response.status_code}") + if response.status_code == 401: + return (f"Unauthorized, {response.status_code}") + return (f"The vulnerability could not be found, {response.status_code}") # TODO: POST,POST /v1/vulnerability # TODO: DELETE, POST /v1/vulnerability/source/{source}/vuln/{vulnId}/component/{component}, DELETE, POST /v1/vulnerability/{uuid}/component/{component} diff --git a/main.py b/main.py index d189d3b..34a23de 100644 --- a/main.py +++ b/main.py @@ -1,28 +1,29 @@ import requests + +from apicomponents.acl import ACL from apicomponents.analysis import Analysis from apicomponents.badge import Badge +from apicomponents.bom import Bom +from apicomponents.calculator import Calculator +from apicomponents.configproperty import ConfigProperty +from apicomponents.cwe import CWE +from apicomponents.finding import Finding from apicomponents.ldap import LDAP +from apicomponents.license import License from apicomponents.licensegroup import LicenseGroup +from apicomponents.metrics import Metrics +from apicomponents.permission import Permission from apicomponents.policy import Policy from apicomponents.project import Project from apicomponents.projectProperty import ProjectProperty from apicomponents.repository import Repository +from apicomponents.search import Search from apicomponents.service import Service +from apicomponents.team import Team +from apicomponents.user import User from apicomponents.violation import Violation from apicomponents.violationAnalysis import ViolationAnalysis from apicomponents.vulnerability import Vulnerability -from apicomponents.finding import Finding -from apicomponents.license import License -from apicomponents.metrics import Metrics -from apicomponents.acl import ACL -from apicomponents.bom import Bom -from apicomponents.cwe import CWE -from apicomponents.configproperty import ConfigProperty -from apicomponents.calculator import Calculator -from apicomponents.team import Team -from apicomponents.permission import Permission -from apicomponents.search import Search -from apicomponents.user import User class DependencyTrackAPI(Project, ProjectProperty, Vulnerability, Finding, License, Metrics, ACL, Bom, CWE, ConfigProperty, Badge, Calculator, Team, Permission, LDAP, Service, Violation, Repository, Analysis, Policy, ViolationAnalysis, LicenseGroup, Search, User): diff --git a/requirements-lint.txt b/requirements-lint.txt new file mode 100644 index 0000000..8bf2f34 --- /dev/null +++ b/requirements-lint.txt @@ -0,0 +1 @@ +ruff==0.7.1 \ No newline at end of file diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 0000000..9f5ce7c --- /dev/null +++ b/ruff.toml @@ -0,0 +1,106 @@ +# Always generate Python 3.11-compatible code. +target-version = "py311" + +# Same as Black. +line-length = 120 + +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".git-rewrite", + ".hg", + ".mypy_cache", + ".nox", + ".pants.d", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "venv", + # Not for the dojo specific stuff + "dojo/db_migrations" +] + +[lint] +select = [ + "F", + "E", + "W", + "C90", + "I", + "D3", "D403", + "UP", + "YTT", + "ASYNC", + "S2", "S5", "S7", "S101", "S112", "S311", + "FBT001", "FBT003", + "A003", "A004", "A006", + "COM", + "T10", + "DJ003", "DJ012", "DJ013", + "EM", + "EXE", + "ISC001", + "ICN", + "LOG", + "G001", "G002", "G1", "G2", + "PIE", + "Q", + "RSE", + "SLOT", + "TID", + "TCH", + "INT", + "ARG003", "ARG004", "ARG005", + "PTH2", + "TD001", "TD005", + "PD", + "PGH", + "PLE", + "PLR0915", + "PLW1", "PLW2", "PLW3", + "TRY003", + "TRY004", + "TRY2", + "FLY", + "NPY", + "FAST", + "AIR", + "FURB", +] +ignore = [ + "E501", + "E722", + "RUF010", + "RUF012", + "RUF015", + "RUF027", + "D205", + "D211", # `one-blank-line-before-class` (D203) and `no-blank-line-before-class` (D211) are incompatible. + "D212", # `multi-line-summary-first-line` (D212) and `multi-line-summary-second-line` (D213) are incompatible. +] + +# Allow autofix for all enabled rules (when `--fix`) is provided. +fixable = ["ALL"] +unfixable = [] +preview = true + +per-file-ignores = {} + +[lint.flake8-boolean-trap] +extend-allowed-calls = ["dojo.utils.get_system_setting"] + +[lint.pylint] +max-statements = 234 + +[lint.mccabe] +max-complexity = 70 # value is far from perfect (recommended default is 10). But we will try to decrease it over the time.