From 16d0c68f4150dfe4791710e238f0ce4dc22465e5 Mon Sep 17 00:00:00 2001 From: Roman Babenko Date: Sat, 21 Oct 2023 14:11:22 +0300 Subject: [PATCH] Implement git repositories scanning with commits --- .mypy.ini | 3 + credsweeper/__main__.py | 103 ++++++++++++++++++- credsweeper/app.py | 15 ++- credsweeper/common/keyword_checklist.txt | 1 + credsweeper/file_handler/files_provider.py | 8 +- credsweeper/file_handler/patch_provider.py | 8 +- credsweeper/file_handler/text_provider.py | 4 +- docs/source/guide.rst | 7 +- requirements.txt | 1 + setup.py | 1 + tests/test_app.py | 65 ++++++++---- tests/test_main.py | 112 ++++++++++++++------- 12 files changed, 258 insertions(+), 70 deletions(-) diff --git a/.mypy.ini b/.mypy.ini index 38c842392..b834f1f13 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -45,3 +45,6 @@ ignore_missing_imports = True [mypy-password_strength.*] ignore_missing_imports = True + +[mypy-pydriller.*] +ignore_missing_imports = True diff --git a/credsweeper/__main__.py b/credsweeper/__main__.py index 9e7add203..c938554c0 100644 --- a/credsweeper/__main__.py +++ b/credsweeper/__main__.py @@ -1,10 +1,15 @@ +import base64 import binascii +import hashlib +import io import logging import os import sys import time from argparse import ArgumentParser, ArgumentTypeError, Namespace -from typing import Any, Union, Optional, Dict +from typing import Any, Union, Optional, Dict, List, Tuple + +from pydriller import Repository from credsweeper import __version__ from credsweeper.app import APP_PATH, CredSweeper @@ -116,6 +121,17 @@ def get_arguments() -> Namespace: const="log.yaml", dest="export_log_config", metavar="PATH") + group.add_argument("--git", nargs="+", help="git repo to scan", dest="git", metavar="PATH") + parser.add_argument("--commits", + help="scan git repo for N commits only", + type=positive_int, + dest="commits", + default=0, + metavar="POSITIVE_INT") + parser.add_argument("--branch", + help="scan git repo for single branch, otherwise - all branches were scanned (slow)", + dest="branch", + type=str) parser.add_argument("--rules", nargs="?", help="path of rule config file (default: credsweeper/rules/config.yaml). " @@ -300,9 +316,85 @@ def scan(args: Namespace, content_provider: FilesProvider, json_filename: Option return credsweeper.run(content_provider=content_provider) except Exception as exc: logger.critical(exc, exc_info=True) + logger.exception(exc) return -1 +def scan_git(args: Namespace) -> Tuple[int, int, int]: + """Scan repository for branches and commits + Returns: + total credentials found + total scanned branches + total scanned commits + """ + total_credentials = 0 + total_branches = 0 + total_commits = 0 + try: + sha1git = hashlib.sha1(str(args.git).encode()).digest() + repo_hash = base64.b32encode(sha1git).decode("ascii") + journal_filename = f"{repo_hash}.json" + logger.info(f"{args.git} sha1 in base32 {repo_hash}") + repo_journal = Util.json_load(journal_filename) + if not isinstance(repo_journal, dict): + with open(journal_filename, "w") as f: + f.write("{}") + repo_journal = dict() + credsweeper = CredSweeper(rule_path=args.rule_path, + config_path=args.config_path, + api_validation=args.api_validation, + sort_output=args.sort_output, + use_filters=args.no_filters, + pool_count=args.jobs, + ml_batch_size=args.ml_batch_size, + ml_threshold=args.ml_threshold, + azure=args.azure, + cuda=args.cuda, + find_by_ext=args.find_by_ext, + depth=args.depth, + doc=args.doc, + severity=args.severity, + size_limit=args.size_limit, + log_level=args.log) + repository = Repository(args.git, only_in_branch=args.branch) + for commit in repository.traverse_commits(): + if commit.hash in repo_journal: + logger.debug(f"Skip already scanned commit: {commit.hash}") + continue + logger.info(f"Scan commit: {commit.hash}") + paths: List[Tuple[str, io.BytesIO]] = [] + for file in commit.modified_files: + logger.info(f"FILE: {file.old_path} -> {file.new_path}") + try: + if file.new_path is not None: + _io = io.BytesIO(file.content) + paths.append((file.filename, _io)) + except ValueError as exc: + logger.error("Possible missed submodule:%s", str(exc)) + provider = TextProvider(paths) + if args.json_filename: + ext = Util.get_extension(args.json_filename, False) + credsweeper.json_filename = f"{args.json_filename[:-len(ext)]}.{commit.hash}{ext}" + if args.xlsx_filename: + ext = Util.get_extension(args.xlsx_filename, False) + credsweeper.xlsx_filename = f"{args.xlsx_filename[:-len(ext)]}.{commit.hash}{ext}" + + commit_cred_number = credsweeper.run(provider) + if credsweeper.is_ml_validator_inited: + # reset not-pickled object for multiprocess + credsweeper.ml_validator = None + credsweeper.credential_manager.candidates.clear() + total_credentials += commit_cred_number + total_commits += 1 + repo_journal[commit.hash] = commit_cred_number + Util.json_dump(repo_journal, journal_filename) + total_branches += 1 + except Exception as exc: + logger.critical(exc, exc_info=True) + return -1, total_branches, total_commits + return total_credentials, total_branches, total_commits + + def main() -> int: """Main function""" result = EXIT_FAILURE @@ -311,7 +403,7 @@ def main() -> int: if args.banner: print(f"CredSweeper {__version__} crc32:{check_integrity():08x}") Logger.init_logging(args.log, args.log_config_path) - logger.info(f"Init CredSweeper object with arguments: {args}") + logger.info(f"Init CredSweeper object with arguments: {args} CWD: {os.getcwd()}") summary: Dict[str, int] = {} if args.path: logger.info(f"Run analyzer on path: {args.path}") @@ -334,6 +426,13 @@ def main() -> int: summary["Deleted File Credentials"] = del_credentials_number if 0 <= add_credentials_number and 0 <= del_credentials_number: result = EXIT_SUCCESS + elif args.git: + logger.info(f"Run analyzer on GIT: {args.git}") + credentials_number, branches_number, commits_number = scan_git(args) + summary[ + f"Detected Credentials in {branches_number} branches and {commits_number} commits "] = credentials_number + if 0 <= credentials_number: + result = EXIT_SUCCESS elif args.export_config: logging.info(f"Exporting default config to file: {args.export_config}") config_dict = Util.json_load(APP_PATH / "secret" / "config.json") diff --git a/credsweeper/app.py b/credsweeper/app.py index b50961a5f..ff06c2841 100644 --- a/credsweeper/app.py +++ b/credsweeper/app.py @@ -175,11 +175,18 @@ def _use_ml_validation(self) -> bool: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @property + def is_ml_validator_inited(self) -> bool: + """method to check whether ml_validator was inited without creation""" + return bool(self.__ml_validator) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @property def ml_validator(self) -> MlValidator: """ml_validator getter""" from credsweeper.ml_model import MlValidator - if not self.__ml_validator: + if not self.is_ml_validator_inited: self.__ml_validator: MlValidator = MlValidator(threshold=self.ml_threshold) assert self.__ml_validator, "self.__ml_validator was not initialized" return self.__ml_validator @@ -372,6 +379,12 @@ def export_results(self) -> None: credentials = self.credential_manager.get_credentials() + if credentials: + logger.info(f"Exporting {len(credentials)} credentials") + else: + logger.info("No credentials were found") + return + if self.sort_output: credentials.sort(key=lambda x: ( # x.line_data_list[0].path, # diff --git a/credsweeper/common/keyword_checklist.txt b/credsweeper/common/keyword_checklist.txt index 224cd42f0..8416a4685 100644 --- a/credsweeper/common/keyword_checklist.txt +++ b/credsweeper/common/keyword_checklist.txt @@ -772,6 +772,7 @@ since single sites sizing +skip sleep slice slick diff --git a/credsweeper/file_handler/files_provider.py b/credsweeper/file_handler/files_provider.py index 2b2cabe32..1089551b7 100644 --- a/credsweeper/file_handler/files_provider.py +++ b/credsweeper/file_handler/files_provider.py @@ -1,7 +1,7 @@ import io from abc import ABC, abstractmethod from pathlib import Path -from typing import List, Union, Tuple +from typing import List, Union, Tuple, Sequence from credsweeper.config import Config from credsweeper.file_handler.diff_content_provider import DiffContentProvider @@ -11,7 +11,7 @@ class FilesProvider(ABC): """Base class for all files provider objects.""" - def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: + def __init__(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: """Initialize Files Provider object for 'paths'. Args: @@ -21,12 +21,12 @@ def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Pat self.paths = paths @property - def paths(self) -> List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]: + def paths(self) -> Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]: """paths getter""" return self.__paths @paths.setter - def paths(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: + def paths(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: """paths setter""" self.__paths = paths diff --git a/credsweeper/file_handler/patch_provider.py b/credsweeper/file_handler/patch_provider.py index e76aa731d..bbf7b9fa0 100644 --- a/credsweeper/file_handler/patch_provider.py +++ b/credsweeper/file_handler/patch_provider.py @@ -1,7 +1,7 @@ import io import logging from pathlib import Path -from typing import List, Union, Tuple +from typing import List, Union, Tuple, Sequence from credsweeper import TextContentProvider from credsweeper.common.constants import DiffRowType @@ -22,20 +22,16 @@ class PatchProvider(FilesProvider): Parameters: paths: file paths list to scan. All files should be in `.patch` format change_type: string, type of analyses changes in patch (added or deleted) - skip_ignored: boolean variable, Checking the directory to the list - of ignored directories from the gitignore file """ - def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], + def __init__(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], change_type: DiffRowType) -> None: """Initialize Files Patch Provider for patch files from 'paths'. Args: paths: file paths list to scan. All files should be in `.patch` format change_type: string, type of analyses changes in patch (added or deleted) - skip_ignored: boolean variable, Checking the directory to the list - of ignored directories from the gitignore file """ super().__init__(paths) diff --git a/credsweeper/file_handler/text_provider.py b/credsweeper/file_handler/text_provider.py index 41038d7f4..25cdbe954 100644 --- a/credsweeper/file_handler/text_provider.py +++ b/credsweeper/file_handler/text_provider.py @@ -1,7 +1,7 @@ import io import logging from pathlib import Path -from typing import List, Optional, Union, Tuple +from typing import List, Optional, Union, Tuple, Sequence from credsweeper import DiffContentProvider from credsweeper.config import Config @@ -24,7 +24,7 @@ class TextProvider(FilesProvider): """ def __init__(self, - paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], + paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], skip_ignored: Optional[bool] = None) -> None: """Initialize Files Text Provider for files from 'paths'. diff --git a/docs/source/guide.rst b/docs/source/guide.rst index 01e9d34b1..c9f3019ae 100644 --- a/docs/source/guide.rst +++ b/docs/source/guide.rst @@ -13,7 +13,7 @@ Get all argument list: .. code-block:: text - usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH]) [--rules [PATH]] [--severity SEVERITY] [--config [PATH]] + usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH] | --git PATH [PATH ...]) [--commits POSITIVE_INT] [--branch BRANCH] [--rules [PATH]] [--severity SEVERITY] [--config [PATH]] [--log_config [PATH]] [--denylist PATH] [--find-by-ext] [--depth POSITIVE_INT] [--no-filters] [--doc] [--ml_threshold FLOAT_OR_STR] [--ml_batch_size POSITIVE_INT] [--azure | --cuda] [--api_validation] [--jobs POSITIVE_INT] [--skip_ignored] [--save-json [PATH]] [--save-xlsx [PATH]] [--sort] [--log LOG_LEVEL] [--size_limit SIZE_LIMIT] [--banner] [--version] @@ -27,6 +27,11 @@ Get all argument list: exporting default config to file (default: config.json) --export_log_config [PATH] exporting default logger config to file (default: log.yaml) + --git PATH [PATH ...] + git repo to scan + --commits POSITIVE_INT + scan git repo for N commits only + --branch BRANCH scan git repo for single branch, otherwise - all branches were scanned (slow) --rules [PATH] path of rule config file (default: credsweeper/rules/config.yaml). severity:['critical', 'high', 'medium', 'low', 'info'] type:['keyword', 'pattern', 'pem_key', 'multi'] --severity SEVERITY set minimum level for rules to apply ['critical', 'high', 'medium', 'low', 'info'](default: 'Severity.INFO', case insensitive) --config [PATH] use custom config (default: built-in) diff --git a/requirements.txt b/requirements.txt index 0e903ccea..b9d688090 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ whatthepatch==1.0.5 pdfminer.six==20221105 password-strength==0.0.3.post2 python-dateutil==2.8.2 +PyDriller~=2.5.1 # ML requirements numpy==1.24.4 diff --git a/setup.py b/setup.py index 0353ce5e6..1b560c7e6 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ "scikit-learn", # "onnxruntime", # "python-dateutil", # + "PyDriller", # ] setuptools.setup( diff --git a/tests/test_app.py b/tests/test_app.py index b5dfdc044..6f91e45ec 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -5,12 +5,14 @@ import subprocess import sys import tempfile +from pathlib import Path import time from typing import AnyStr, Tuple from unittest import TestCase import deepdiff import pytest +from git import Repo from credsweeper.app import APP_PATH from credsweeper.utils import Util @@ -22,12 +24,12 @@ class TestApp(TestCase): @staticmethod def _m_credsweeper(args) -> Tuple[str, str]: - proc = subprocess.Popen( + with subprocess.Popen( [sys.executable, "-m", "credsweeper", *args], # - cwd=APP_PATH.parent, # - stdout=subprocess.PIPE, # - stderr=subprocess.PIPE) # - _stdout, _stderr = proc.communicate() + cwd=APP_PATH.parent, # + stdout=subprocess.PIPE, # + stderr=subprocess.PIPE) as proc: + _stdout, _stderr = proc.communicate() def transform(x: AnyStr) -> str: if isinstance(x, bytes): @@ -203,7 +205,10 @@ def test_it_works_n(self) -> None: " | --diff_path PATH [PATH ...]" \ " | --export_config [PATH]" \ " | --export_log_config [PATH]" \ + " | --git PATH [PATH ...]" \ ")" \ + " [--commits POSITIVE_INT]" \ + " [--branch BRANCH]" \ " [--rules [PATH]]" \ " [--severity SEVERITY]" \ " [--config [PATH]]" \ @@ -231,6 +236,7 @@ def test_it_works_n(self) -> None: " --diff_path" \ " --export_config" \ " --export_log_config" \ + " --git" \ " is required " expected = " ".join(expected.split()) self.assertEqual(expected, output) @@ -331,7 +337,8 @@ def test_patch_save_json_p(self) -> None: _stdout, _stderr = self._m_credsweeper( ["--diff_path", target_path, "--save-json", json_filename, "--log", "silence"]) self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json"))) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) + # deleted patch contains no issues + self.assertFalse(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -469,10 +476,7 @@ def test_find_by_ext_n(self) -> None: json_filename = os.path.join(tmp_dir, f"{__name__}.json") _stdout, _stderr = self._m_credsweeper( ["--path", tmp_dir, "--save-json", json_filename, "--log", "silence"]) - self.assertTrue(os.path.exists(json_filename)) - with open(json_filename, "r") as json_file: - report = json.load(json_file) - self.assertEqual(0, len(report)) + self.assertFalse(os.path.exists(json_filename)) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -534,9 +538,7 @@ def test_denylist_value_p(self) -> None: _stdout, _stderr = self._m_credsweeper([ "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" ]) - with open(json_filename, "r") as json_file: - report = json.load(json_file) - self.assertEqual(0, len(report)) + self.assertFalse(os.path.exists(json_filename)) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -550,6 +552,7 @@ def test_denylist_value_n(self) -> None: _stdout, _stderr = self._m_credsweeper([ "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" ]) + self.assertTrue(os.path.exists(json_filename)) with open(json_filename, "r") as json_file: report = json.load(json_file) self.assertEqual(1, len(report)) @@ -566,9 +569,7 @@ def test_denylist_line_p(self) -> None: _stdout, _stderr = self._m_credsweeper([ "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" ]) - with open(json_filename, "r") as json_file: - report = json.load(json_file) - self.assertEqual(0, len(report)) + self.assertFalse(os.path.exists(json_filename)) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -582,6 +583,7 @@ def test_denylist_line_n(self) -> None: _stdout, _stderr = self._m_credsweeper([ "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" ]) + self.assertTrue(os.path.exists(json_filename)) with open(json_filename, "r") as json_file: report = json.load(json_file) self.assertEqual(1, len(report)) @@ -603,7 +605,7 @@ def test_rules_ml_p(self) -> None: report_set = set([i["rule"] for i in report]) rules = Util.yaml_load(APP_PATH / "rules" / "config.yaml") rules_set = set([i["name"] for i in rules]) - missed = { # + missed = { # type: ignore "ID_PASSWD_PAIR", "SECRET_PAIR", "IP_ID_PASSWORD_TRIPLE", @@ -689,3 +691,32 @@ def test_doc_n(self) -> None: _stdout, _stderr = self._m_credsweeper(["--doc", "--path", str(SAMPLES_PATH), "--save-json", json_filename]) report = Util.json_load(json_filename) self.assertEqual(SAMPLES_IN_DOC, len(report)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + + def test_pydriller_p(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + with Repo.init(tmp_dir) as repo: + cred_file = Path(tmp_dir) / "with_cred" + value = "GbdD@23#d0" + with open(cred_file, "w") as f: + f.write(f"git_password: {value}") + repo.index.add([cred_file]) + repo.index.commit("added file") + with open(cred_file, "w") as f: + f.write("DELETED") + repo.index.add([cred_file]) + repo.index.commit("cleared file") + # check that value is not in the file + with open(cred_file, "r") as f: + self.assertNotIn(value, f.read()) + # run git scan + _stdout, _stderr = self._m_credsweeper(["--log", "DEBUG", "--git", str(tmp_dir)]) + self.assertIn("Detected Credentials in 1 branches and 2 commits : 1", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stderr, _stderr) + # check detected value in stdout + self.assertIn(value, _stdout, _stdout) + # del repo + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # diff --git a/tests/test_main.py b/tests/test_main.py index 89a05eaa1..5323a8def 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -103,42 +103,6 @@ def test_use_filters_n(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - @mock.patch("json.dump") - def test_save_json_p(self, mock_json_dump) -> None: - cred_sweeper = CredSweeper(json_filename="unittest_output.json") - cred_sweeper.run([]) - mock_json_dump.assert_called() - self.assertTrue(os.path.exists("unittest_output.json")) - os.remove("unittest_output.json") - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - @mock.patch("json.dump") - def test_save_json_n(self, mock_json_dump) -> None: - cred_sweeper = CredSweeper() - cred_sweeper.run([]) - mock_json_dump.assert_not_called() - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - def test_save_xlsx_p(self) -> None: - with tempfile.TemporaryDirectory() as tmp_dir: - test_filename = os.path.join(tmp_dir, "unittest_output.xlsx") - self.assertFalse(os.path.exists(test_filename)) - cred_sweeper = CredSweeper(xlsx_filename=test_filename) - cred_sweeper.run([]) - self.assertTrue(os.path.exists(test_filename)) - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - @mock.patch("pandas.DataFrame", return_value=pd.DataFrame(data=[])) - def test_save_xlsx_n(self, mock_xlsx_to_excel) -> None: - cred_sweeper = CredSweeper() - cred_sweeper.run([]) - mock_xlsx_to_excel.assert_not_called() - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - @mock.patch("credsweeper.__main__.scan", return_value=None) @mock.patch("credsweeper.__main__.get_arguments") def test_main_n(self, mock_get_arguments, mock_scan) -> None: @@ -169,8 +133,9 @@ def test_main_path_p(self, mock_get_arguments) -> None: denylist_path=None) mock_get_arguments.return_value = args_mock self.assertEqual(EXIT_SUCCESS, app_main.main()) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json"))) + # deleted patch contains no issue + self.assertFalse(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) report = Util.json_load(os.path.join(tmp_dir, f"{__name__}_added.json")) self.assertTrue(report) self.assertEqual(3, report[0]["line_data_list"][0]["line_num"]) @@ -262,6 +227,37 @@ def test_report_p(self, mock_get_arguments) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @mock.patch("credsweeper.__main__.get_arguments") + def test_report_n(self, mock_get_arguments) -> None: + # no reports will be generated when no credentials are found + with tempfile.TemporaryDirectory() as tmp_dir: + json_filename = os.path.join(tmp_dir, "report.json") + xlsx_filename = os.path.join(tmp_dir, "report.xlsx") + args_mock = Mock( + log='warning', + config_path=None, + path=[tmp_dir], # empty dir + diff_path=None, + json_filename=json_filename, + xlsx_filename=xlsx_filename, + sort_output=True, + rule_path=None, + jobs=1, + ml_threshold=0.0, + depth=0, + doc=False, + size_limit="1G", + find_by_ext=False, + api_validation=False, + denylist_path=None, + severity=Severity.INFO) + mock_get_arguments.return_value = args_mock + self.assertEqual(EXIT_SUCCESS, app_main.main()) + self.assertFalse(os.path.exists(xlsx_filename)) + self.assertFalse(os.path.exists(json_filename)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @mock.patch("argparse.ArgumentParser.parse_args") def test_parse_args_n(self, mock_parse) -> None: self.assertTrue(app_main.get_arguments()) @@ -360,6 +356,48 @@ def test_find_by_ext_and_not_ignore_p(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_multiple_invocation_p(self) -> None: + # test whether ml_validator is created once + self.maxDiff = None + cred_sweeper = CredSweeper() + self.assertFalse(cred_sweeper.is_ml_validator_inited) + # found candidate is not ML validated + provider = StringContentProvider(["qpF8Q~PCM5MhMoyTFc5TYEomnzRUKim9UJhe8a6E"]) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Azure Secret Value", candidates[0].rule_name) + self.assertFalse(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertFalse(cred_sweeper.is_ml_validator_inited) + + # found candidate is ML validated + provider = StringContentProvider(['"nonce": "qPRjfoZWaBPH0KbXMCicm5v1VdG5Hj0DUFMHdSxPOiS"']) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Nonce", candidates[0].rule_name) + self.assertFalse(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertTrue(cred_sweeper.is_ml_validator_inited) + # remember id of the validator + validator_id = id(cred_sweeper.ml_validator) + + # found candidate is ML validated also + provider = StringContentProvider(["password = Xdj@jcN834b"]) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Password", candidates[0].rule_name) + # the ml_validator still initialized + self.assertTrue(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertTrue(cred_sweeper.is_ml_validator_inited) + # the same id of the validator + self.assertEqual(validator_id, id(cred_sweeper.ml_validator)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_multi_jobs_p(self) -> None: # real result might be shown in code coverage content_provider: FilesProvider = TextProvider([SAMPLES_PATH])