From f5f965696d6874a968fb52f1d8e28d1c08836fab Mon Sep 17 00:00:00 2001 From: Roman Babenko Date: Mon, 23 Dec 2024 16:28:52 +0200 Subject: [PATCH] fix xlsx report for diff (#644) * fix xlsx report for diff * use . instead _ in diff report name for py3.8 compatibility * style * import optimization * update test * Update __init__.py * remove warnings * remove get_json_filenames --- credsweeper/__init__.py | 2 +- credsweeper/__main__.py | 38 +++------- credsweeper/app.py | 39 +++++++--- credsweeper/scanner/scan_type/scan_type.py | 1 - tests/__init__.py | 2 +- tests/filters/test_line_git_binary_check.py | 4 +- tests/filters/test_line_uue_part_check.py | 5 +- tests/filters/test_value_hex_number_check.py | 2 +- tests/ml_model/test_features.py | 1 - tests/samples/uuid-update.patch | 10 +++ tests/test_app.py | 77 ++++++++++++++------ tests/test_main.py | 56 +++----------- 12 files changed, 121 insertions(+), 116 deletions(-) create mode 100644 tests/samples/uuid-update.patch diff --git a/credsweeper/__init__.py b/credsweeper/__init__.py index ebe783388..564d3558f 100644 --- a/credsweeper/__init__.py +++ b/credsweeper/__init__.py @@ -18,4 +18,4 @@ '__version__' ] -__version__ = "1.10.0" +__version__ = "1.10.1" diff --git a/credsweeper/__main__.py b/credsweeper/__main__.py index c15c7d36e..058bc15ac 100644 --- a/credsweeper/__main__.py +++ b/credsweeper/__main__.py @@ -4,7 +4,7 @@ import sys import time from argparse import ArgumentParser, ArgumentTypeError, Namespace -from typing import Any, Union, Optional, Dict +from typing import Any, Union, Dict from credsweeper import __version__ from credsweeper.app import APP_PATH, CredSweeper @@ -253,36 +253,19 @@ def get_arguments() -> Namespace: return parser.parse_args() -def get_json_filenames(json_filename: str): - """Auxiliary function to get names for json files with added and deleted .patch data - - Args: - json_filename: original json path - - Returns: - Tuple of paths with added and deleted suffixes - - """ - if json_filename is None: - return None, None - added_json_filename = json_filename[:-5] + "_added.json" - deleted_json_filename = json_filename[:-5] + "_deleted.json" - return added_json_filename, deleted_json_filename - - -def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Optional[str], - xlsx_filename: Optional[str]) -> int: +def scan(args: Namespace, content_provider: AbstractProvider) -> int: """Scan content_provider data, print results or save them to json_filename is not None Args: args: arguments of the application content_provider: FilesProvider instance to scan data from - json_filename: json type report file path or None - xlsx_filename: xlsx type report file path or None Returns: Number of detected credentials + Warnings: + DeprecationWarning: Using 'json_filename' and/or 'xlsx_filename' will issue a warning. + """ try: if args.denylist_path is not None: @@ -292,8 +275,8 @@ def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Opt credsweeper = CredSweeper(rule_path=args.rule_path, config_path=args.config_path, - json_filename=json_filename, - xlsx_filename=xlsx_filename, + json_filename=args.json_filename, + xlsx_filename=args.xlsx_filename, color=args.color, hashed=args.hashed, subtext=args.subtext, @@ -332,21 +315,20 @@ def main() -> int: if args.path: logger.info(f"Run analyzer on path: {args.path}") content_provider: AbstractProvider = FilesProvider(args.path, skip_ignored=args.skip_ignored) - credentials_number = scan(args, content_provider, args.json_filename, args.xlsx_filename) + credentials_number = scan(args, content_provider) summary["Detected Credentials"] = credentials_number if 0 <= credentials_number: result = EXIT_SUCCESS elif args.diff_path: - added_json_filename, deleted_json_filename = get_json_filenames(args.json_filename) # Analyze added data logger.info(f"Run analyzer on added rows from patch files: {args.diff_path}") content_provider = PatchesProvider(args.diff_path, change_type=DiffRowType.ADDED) - add_credentials_number = scan(args, content_provider, added_json_filename, args.xlsx_filename) + add_credentials_number = scan(args, content_provider) summary["Added File Credentials"] = add_credentials_number # Analyze deleted data logger.info(f"Run analyzer on deleted rows from patch files: {args.diff_path}") content_provider = PatchesProvider(args.diff_path, change_type=DiffRowType.DELETED) - del_credentials_number = scan(args, content_provider, deleted_json_filename, args.xlsx_filename) + del_credentials_number = scan(args, content_provider) summary["Deleted File Credentials"] = del_credentials_number if 0 <= add_credentials_number and 0 <= del_credentials_number: result = EXIT_SUCCESS diff --git a/credsweeper/app.py b/credsweeper/app.py index 556fc12c4..0061dc8a6 100644 --- a/credsweeper/app.py +++ b/credsweeper/app.py @@ -10,7 +10,7 @@ # Directory of credsweeper sources MUST be placed before imports to avoid circular import error APP_PATH = Path(__file__).resolve().parent -from credsweeper.common.constants import KeyValidationOption, Severity, ThresholdPreset +from credsweeper.common.constants import KeyValidationOption, Severity, ThresholdPreset, DiffRowType from credsweeper.config import Config from credsweeper.credentials import Candidate, CredentialManager, CandidateKey from credsweeper.deep_scanner.deep_scanner import DeepScanner @@ -67,10 +67,8 @@ def __init__(self, validation was the grained candidate model on machine learning config_path: optional str variable, path of CredSweeper config file default built-in config is used if None - json_filename: optional string variable, path to save result - to json - xlsx_filename: optional string variable, path to save result - to xlsx + json_filename: optional string variable, path to save result to json + xlsx_filename: optional string variable, path to save result to xlsx color: print results to stdout with colorization hashed: use hash of line, value and variable instead plain text subtext: use subtext of line near variable-value like it performed in ML @@ -241,7 +239,9 @@ def run(self, content_provider: AbstractProvider) -> int: logger.info(f"Start Scanner for {len(file_extractors)} providers") self.scan(file_extractors) self.post_processing() - self.export_results() + # PatchesProvider has the attribute. Circular import error appears with using the isinstance + change_type = content_provider.change_type if hasattr(content_provider, "change_type") else None + self.export_results(change_type) return len(self.credential_manager.get_credentials()) @@ -381,12 +381,19 @@ def post_processing(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - def export_results(self) -> None: - """Save credential candidates to json file or print them to a console.""" + def export_results(self, change_type: Optional[DiffRowType] = None) -> None: + """ + Save credential candidates to json file or print them to a console. + + Args: + change_type: flag to know which file should be created for a patch + """ is_exported = False credentials = self.credential_manager.get_credentials() + logger.info(f"Exporting {len(credentials)} credentials") + if self.sort_output: credentials.sort(key=lambda x: ( # x.line_data_list[0].path, # @@ -398,9 +405,13 @@ def export_results(self) -> None: )) if self.json_filename: + json_path = Path(self.json_filename) is_exported = True + if isinstance(change_type, DiffRowType): + # add suffix for appropriated reports to create two files for the patch scan + json_path = json_path.with_suffix(f".{change_type.value}{json_path.suffix}") Util.json_dump([credential.to_json(hashed=self.hashed, subtext=self.subtext) for credential in credentials], - file_path=self.json_filename) + file_path=json_path) if self.xlsx_filename: is_exported = True @@ -408,7 +419,15 @@ def export_results(self) -> None: for credential in credentials: data_list.extend(credential.to_dict_list(hashed=self.hashed, subtext=self.subtext)) df = pd.DataFrame(data=data_list) - df.to_excel(self.xlsx_filename, index=False) + if isinstance(change_type, DiffRowType): + if Path(self.xlsx_filename).exists(): + with pd.ExcelWriter(self.xlsx_filename, mode='a', engine="openpyxl", + if_sheet_exists="replace") as writer: + df.to_excel(writer, sheet_name=change_type.value, index=False) + else: + df.to_excel(self.xlsx_filename, sheet_name=change_type.value, index=False) + else: + df.to_excel(self.xlsx_filename, sheet_name="report", index=False) if self.color: is_exported = True diff --git a/credsweeper/scanner/scan_type/scan_type.py b/credsweeper/scanner/scan_type/scan_type.py index 208578a86..15e385e38 100644 --- a/credsweeper/scanner/scan_type/scan_type.py +++ b/credsweeper/scanner/scan_type/scan_type.py @@ -7,7 +7,6 @@ from credsweeper.config import Config from credsweeper.credentials import Candidate, LineData from credsweeper.file_handler.analysis_target import AnalysisTarget - from credsweeper.filters import Filter from credsweeper.rules import Rule diff --git a/tests/__init__.py b/tests/__init__.py index 89cb9fffe..f42a82d37 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,7 +1,7 @@ from pathlib import Path # total number of files in test samples -SAMPLES_FILES_COUNT = 141 +SAMPLES_FILES_COUNT = 142 # the lowest value of ML threshold is used to display possible lowest values NEGLIGIBLE_ML_THRESHOLD = 0.0001 diff --git a/tests/filters/test_line_git_binary_check.py b/tests/filters/test_line_git_binary_check.py index 8f20208a1..34e1eec3b 100644 --- a/tests/filters/test_line_git_binary_check.py +++ b/tests/filters/test_line_git_binary_check.py @@ -1,8 +1,8 @@ import pytest from credsweeper.file_handler.analysis_target import AnalysisTarget -from credsweeper.filters import LineSpecificKeyCheck, LineGitBinaryCheck -from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_ANALYSIS_TARGET, DUMMY_DESCRIPTOR +from credsweeper.filters import LineGitBinaryCheck +from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_DESCRIPTOR from tests.test_utils.dummy_line_data import get_line_data diff --git a/tests/filters/test_line_uue_part_check.py b/tests/filters/test_line_uue_part_check.py index 7044c9dc7..c75920fac 100644 --- a/tests/filters/test_line_uue_part_check.py +++ b/tests/filters/test_line_uue_part_check.py @@ -1,11 +1,8 @@ from unittest import TestCase -import pytest - from credsweeper.file_handler.analysis_target import AnalysisTarget -from credsweeper.filters import LineSpecificKeyCheck, LineGitBinaryCheck from credsweeper.filters.line_uue_part_check import LineUUEPartCheck -from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_ANALYSIS_TARGET, DUMMY_DESCRIPTOR +from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_DESCRIPTOR from tests.test_utils.dummy_line_data import get_line_data diff --git a/tests/filters/test_value_hex_number_check.py b/tests/filters/test_value_hex_number_check.py index a7778a297..fad9ad100 100644 --- a/tests/filters/test_value_hex_number_check.py +++ b/tests/filters/test_value_hex_number_check.py @@ -1,6 +1,6 @@ import pytest -from credsweeper.filters import ValueNumberCheck, ValueHexNumberCheck +from credsweeper.filters import ValueHexNumberCheck from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_ANALYSIS_TARGET from tests.test_utils.dummy_line_data import get_line_data diff --git a/tests/ml_model/test_features.py b/tests/ml_model/test_features.py index 4dfaefc16..9f063e514 100644 --- a/tests/ml_model/test_features.py +++ b/tests/ml_model/test_features.py @@ -9,7 +9,6 @@ from credsweeper.ml_model.features.is_secret_numeric import IsSecretNumeric from credsweeper.ml_model.features.word_in_line import WordInLine from credsweeper.ml_model.features.word_in_value import WordInValue -from credsweeper.utils.entropy_validator import EntropyValidator from tests import AZ_STRING RE_TEST_PATTERN = re.compile(r"(?P.*) (?Pover) (?P.+)") diff --git a/tests/samples/uuid-update.patch b/tests/samples/uuid-update.patch new file mode 100644 index 000000000..2d2618e0a --- /dev/null +++ b/tests/samples/uuid-update.patch @@ -0,0 +1,10 @@ +diff --git a/uuid b/uuid +index 0a20cf1..b52370e 100644 +--- a/uuid ++++ b/uuid +@@ -1 +1 @@ +-bace4d19-fa7e-beef-cafe-9129474bcd81 +\ No newline at end of file ++bace4d19-fa7e-dead-beef-9129474bcd81 +\ No newline at end of file + diff --git a/tests/test_app.py b/tests/test_app.py index da8831c26..5152cd451 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -10,7 +10,8 @@ from unittest import TestCase import deepdiff -import pytest +import numpy as np +import pandas as pd from credsweeper.app import APP_PATH from credsweeper.utils import Util @@ -25,12 +26,12 @@ def setUp(self): @staticmethod def _m_credsweeper(args) -> Tuple[str, str]: - proc = subprocess.Popen( + with subprocess.Popen( [sys.executable, "-m", "credsweeper", *args], # - cwd=APP_PATH.parent, # - stdout=subprocess.PIPE, # - stderr=subprocess.PIPE) # - _stdout, _stderr = proc.communicate() + cwd=APP_PATH.parent, # + stdout=subprocess.PIPE, # + stderr=subprocess.PIPE) as proc: + _stdout, _stderr = proc.communicate() def transform(x: AnyStr) -> str: if isinstance(x, bytes): @@ -332,8 +333,8 @@ def test_patch_save_json_p(self) -> None: json_filename = os.path.join(tmp_dir, f"{__name__}.json") _stdout, _stderr = self._m_credsweeper( ["--diff_path", target_path, "--save-json", json_filename, "--log", "silence"]) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json"))) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) + self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.added.json"))) + self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.deleted.json"))) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -644,22 +645,56 @@ def test_no_filters_p(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - def test_severity_p(self) -> None: - _stdout, _stderr = self._m_credsweeper([ # - "--log", "silence", "--ml_threshold", "0", "--severity", "medium", "--path", - str(SAMPLES_PATH) - ]) - self.assertIn("severity: medium", _stdout) - self.assertNotIn("severity: info", _stdout) + def test_severity_patch_xlsx_n(self) -> None: + # uuid is info level - no report + with tempfile.TemporaryDirectory() as tmp_dir: + _stdout, _stderr = self._m_credsweeper([ # + "--severity", + "low", + "--diff", + str(SAMPLES_PATH / "uuid-update.patch"), + "--save-xlsx", + os.path.join(tmp_dir, f"{__name__}.xlsx"), + "--save-json", + os.path.join(tmp_dir, f"{__name__}.json"), + ]) + # reports are created + self.assertEqual(3, len(os.listdir(tmp_dir))) + # but empty + self.assertListEqual([], Util.json_load(os.path.join(tmp_dir, f"{__name__}.deleted.json"))) + self.assertListEqual([], Util.json_load(os.path.join(tmp_dir, f"{__name__}.added.json"))) + self.assertEqual(0, len(pd.read_excel(os.path.join(tmp_dir, f"{__name__}.xlsx")))) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - def test_severity_n(self) -> None: - _stdout, _stderr = self._m_credsweeper([ # - "--log", "silence", "--ml_threshold", "0", "--severity", "critical", "--path", - str(SAMPLES_PATH) - ]) - self.assertNotIn("severity: medium", _stdout) + def test_severity_patch_xlsx_p(self) -> None: + # info level produces xlsx file with "added" and "deleted" sheets and two json files + with tempfile.TemporaryDirectory() as tmp_dir: + xlsx_filename = os.path.join(tmp_dir, f"{__name__}.xlsx") + _stdout, _stderr = self._m_credsweeper([ # + "--severity", + "info", + "--diff", + str(SAMPLES_PATH / "uuid-update.patch"), + "--save-xlsx", + xlsx_filename, + "--save-json", + os.path.join(tmp_dir, f"{__name__}.json"), + ]) + deleted_report_file = os.path.join(tmp_dir, f"{__name__}.deleted.json") + deleted_report = Util.json_load(deleted_report_file) + self.assertEqual("UUID", deleted_report[0]["rule"]) + added_report_file = os.path.join(tmp_dir, f"{__name__}.added.json") + added_report = Util.json_load(added_report_file) + self.assertEqual("UUID", added_report[0]["rule"]) + book = pd.read_excel(xlsx_filename, sheet_name=None, header=None) + # two sheets should be created + self.assertSetEqual({"deleted", "added"}, set(book.keys())) + # values in xlsx are wrapped to double quotes + deleted_value = f'"{deleted_report[0]["line_data_list"][0]["value"]}"' + self.assertTrue(np.isin(deleted_value, book["deleted"].values)) + added_value = f'"{added_report[0]["line_data_list"][0]["value"]}"' + self.assertTrue(np.isin(added_value, book["added"].values)) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # diff --git a/tests/test_main.py b/tests/test_main.py index 890ae1bd8..0436658ff 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -8,7 +8,7 @@ import uuid from argparse import ArgumentTypeError from pathlib import Path -from typing import List, Set, Any, Dict +from typing import List, Any, Dict from unittest import mock from unittest.mock import Mock, patch @@ -22,7 +22,6 @@ from credsweeper.app import APP_PATH from credsweeper.app import CredSweeper from credsweeper.common.constants import ThresholdPreset, Severity -from credsweeper.credentials import Candidate from credsweeper.file_handler.abstract_provider import AbstractProvider from credsweeper.file_handler.files_provider import FilesProvider from credsweeper.file_handler.text_content_provider import TextContentProvider @@ -70,42 +69,6 @@ def test_use_filters_n(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - @mock.patch("json.dump") - def test_save_json_p(self, mock_json_dump) -> None: - cred_sweeper = CredSweeper(json_filename="unittest_output.json") - cred_sweeper.run([]) - mock_json_dump.assert_called() - self.assertTrue(os.path.exists("unittest_output.json")) - os.remove("unittest_output.json") - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - @mock.patch("json.dump") - def test_save_json_n(self, mock_json_dump) -> None: - cred_sweeper = CredSweeper() - cred_sweeper.run([]) - mock_json_dump.assert_not_called() - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - def test_save_xlsx_p(self) -> None: - with tempfile.TemporaryDirectory() as tmp_dir: - test_filename = os.path.join(tmp_dir, "unittest_output.xlsx") - self.assertFalse(os.path.exists(test_filename)) - cred_sweeper = CredSweeper(xlsx_filename=test_filename) - cred_sweeper.run([]) - self.assertTrue(os.path.exists(test_filename)) - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - @mock.patch("pandas.DataFrame", return_value=pd.DataFrame(data=[])) - def test_save_xlsx_n(self, mock_xlsx_to_excel) -> None: - cred_sweeper = CredSweeper() - cred_sweeper.run([]) - mock_xlsx_to_excel.assert_not_called() - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - @mock.patch("credsweeper.__main__.scan", return_value=None) @mock.patch("credsweeper.__main__.get_arguments") def test_main_n(self, mock_get_arguments, mock_scan) -> None: @@ -124,8 +87,8 @@ def test_main_path_p(self, mock_get_arguments) -> None: path=None, config_path=None, diff_path=[str(target_path)], - json_filename=os.path.join(tmp_dir, f"{__name__}.json"), - xlsx_filename=None, + json_filename=Path(os.path.join(tmp_dir, f"{__name__}.json")), + xlsx_filename=Path(os.path.join(tmp_dir, f"{__name__}.xlsx")), color=False, subtext=False, hashed=False, @@ -140,9 +103,10 @@ def test_main_path_p(self, mock_get_arguments) -> None: denylist_path=None) mock_get_arguments.return_value = args_mock self.assertEqual(EXIT_SUCCESS, app_main.main()) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json"))) - report = Util.json_load(os.path.join(tmp_dir, f"{__name__}_added.json")) + self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.xlsx"))) + self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.deleted.json"))) + self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.added.json"))) + report = Util.json_load(os.path.join(tmp_dir, f"{__name__}.added.json")) self.assertTrue(report) self.assertEqual(3, report[0]["line_data_list"][0]["line_num"]) self.assertEqual("dkajco1", report[0]["line_data_list"][0]["value"]) @@ -174,9 +138,9 @@ def test_binary_patch_p(self, mock_get_arguments) -> None: denylist_path=None) mock_get_arguments.return_value = args_mock self.assertEqual(EXIT_SUCCESS, app_main.main()) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json"))) - report = Util.json_load(os.path.join(tmp_dir, f"{__name__}_added.json")) + self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.deleted.json"))) + self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.added.json"))) + report = Util.json_load(os.path.join(tmp_dir, f"{__name__}.added.json")) self.assertTrue(report) self.assertEqual(5, len(report)) # zip file inside binary diff