diff --git a/.mypy.ini b/.mypy.ini index 38c842392..b834f1f13 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -45,3 +45,6 @@ ignore_missing_imports = True [mypy-password_strength.*] ignore_missing_imports = True + +[mypy-pydriller.*] +ignore_missing_imports = True diff --git a/credsweeper/__main__.py b/credsweeper/__main__.py index 9e7add203..999254092 100644 --- a/credsweeper/__main__.py +++ b/credsweeper/__main__.py @@ -1,10 +1,13 @@ import binascii +import io import logging import os import sys import time from argparse import ArgumentParser, ArgumentTypeError, Namespace -from typing import Any, Union, Optional, Dict +from typing import Any, Union, Optional, Dict, List, Tuple + +from pydriller import Repository from credsweeper import __version__ from credsweeper.app import APP_PATH, CredSweeper @@ -116,6 +119,17 @@ def get_arguments() -> Namespace: const="log.yaml", dest="export_log_config", metavar="PATH") + group.add_argument("--git", nargs="+", help="git repo to scan", dest="git", metavar="PATH") + parser.add_argument("--commits", + help="scan git repo for N commits only", + type=positive_int, + dest="commits", + default=0, + metavar="POSITIVE_INT") + parser.add_argument("--branch", + help="scan git repo for single branch, otherwise - all branches were scanned (slow)", + dest="branch", + type=str) parser.add_argument("--rules", nargs="?", help="path of rule config file (default: credsweeper/rules/config.yaml). " @@ -300,9 +314,40 @@ def scan(args: Namespace, content_provider: FilesProvider, json_filename: Option return credsweeper.run(content_provider=content_provider) except Exception as exc: logger.critical(exc, exc_info=True) + logger.exception(exc) return -1 +def scan_git(args: Namespace) -> Tuple[int, int, int]: + """Scan repository for branches and commits + Returns: + total credentials found + total scanned branches + total scanned commits + """ + total_credentials = 0 + total_branches = 0 + total_commits = 0 + try: + repository = Repository(args.git, only_in_branch=args.branch) + for commit in repository.traverse_commits(): + logger.info(f"Scan commit: {commit.hash}") + paths: List[Tuple[str, io.BytesIO]] = [] + for file in commit.modified_files: + _io = io.BytesIO(file.content) + paths.append((file.filename, _io)) + provider = TextProvider(paths) # type: ignore + json_filename = f"{commit.hash}.{args.json_filename}" if args.json_filename else None + xlsx_filename = f"{commit.hash}.{args.xlsx_filename}" if args.xlsx_filename else None + total_credentials += scan(args, provider, json_filename, xlsx_filename) + total_commits += 1 + total_branches += 1 + except Exception as exc: + logger.critical(exc, exc_info=True) + return -1, total_branches, total_commits + return total_credentials, total_branches, total_commits + + def main() -> int: """Main function""" result = EXIT_FAILURE @@ -311,7 +356,7 @@ def main() -> int: if args.banner: print(f"CredSweeper {__version__} crc32:{check_integrity():08x}") Logger.init_logging(args.log, args.log_config_path) - logger.info(f"Init CredSweeper object with arguments: {args}") + logger.info(f"Init CredSweeper object with arguments: {args} CWD: {os.getcwd()}") summary: Dict[str, int] = {} if args.path: logger.info(f"Run analyzer on path: {args.path}") @@ -334,6 +379,13 @@ def main() -> int: summary["Deleted File Credentials"] = del_credentials_number if 0 <= add_credentials_number and 0 <= del_credentials_number: result = EXIT_SUCCESS + if args.git: + logger.info(f"Run analyzer on GIT: {args.git}") + credentials_number, branches_number, commits_number = scan_git(args) + summary[ + f"Detected Credentials in {branches_number} branches and {commits_number} commits "] = credentials_number + if 0 <= credentials_number: + result = EXIT_SUCCESS elif args.export_config: logging.info(f"Exporting default config to file: {args.export_config}") config_dict = Util.json_load(APP_PATH / "secret" / "config.json") diff --git a/credsweeper/file_handler/files_provider.py b/credsweeper/file_handler/files_provider.py index 2b2cabe32..1089551b7 100644 --- a/credsweeper/file_handler/files_provider.py +++ b/credsweeper/file_handler/files_provider.py @@ -1,7 +1,7 @@ import io from abc import ABC, abstractmethod from pathlib import Path -from typing import List, Union, Tuple +from typing import List, Union, Tuple, Sequence from credsweeper.config import Config from credsweeper.file_handler.diff_content_provider import DiffContentProvider @@ -11,7 +11,7 @@ class FilesProvider(ABC): """Base class for all files provider objects.""" - def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: + def __init__(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: """Initialize Files Provider object for 'paths'. Args: @@ -21,12 +21,12 @@ def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Pat self.paths = paths @property - def paths(self) -> List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]: + def paths(self) -> Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]: """paths getter""" return self.__paths @paths.setter - def paths(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: + def paths(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]]) -> None: """paths setter""" self.__paths = paths diff --git a/credsweeper/file_handler/patch_provider.py b/credsweeper/file_handler/patch_provider.py index e76aa731d..bbf7b9fa0 100644 --- a/credsweeper/file_handler/patch_provider.py +++ b/credsweeper/file_handler/patch_provider.py @@ -1,7 +1,7 @@ import io import logging from pathlib import Path -from typing import List, Union, Tuple +from typing import List, Union, Tuple, Sequence from credsweeper import TextContentProvider from credsweeper.common.constants import DiffRowType @@ -22,20 +22,16 @@ class PatchProvider(FilesProvider): Parameters: paths: file paths list to scan. All files should be in `.patch` format change_type: string, type of analyses changes in patch (added or deleted) - skip_ignored: boolean variable, Checking the directory to the list - of ignored directories from the gitignore file """ - def __init__(self, paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], + def __init__(self, paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], change_type: DiffRowType) -> None: """Initialize Files Patch Provider for patch files from 'paths'. Args: paths: file paths list to scan. All files should be in `.patch` format change_type: string, type of analyses changes in patch (added or deleted) - skip_ignored: boolean variable, Checking the directory to the list - of ignored directories from the gitignore file """ super().__init__(paths) diff --git a/credsweeper/file_handler/text_provider.py b/credsweeper/file_handler/text_provider.py index 41038d7f4..25cdbe954 100644 --- a/credsweeper/file_handler/text_provider.py +++ b/credsweeper/file_handler/text_provider.py @@ -1,7 +1,7 @@ import io import logging from pathlib import Path -from typing import List, Optional, Union, Tuple +from typing import List, Optional, Union, Tuple, Sequence from credsweeper import DiffContentProvider from credsweeper.config import Config @@ -24,7 +24,7 @@ class TextProvider(FilesProvider): """ def __init__(self, - paths: List[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], + paths: Sequence[Union[str, Path, io.BytesIO, Tuple[Union[str, Path], io.BytesIO]]], skip_ignored: Optional[bool] = None) -> None: """Initialize Files Text Provider for files from 'paths'. diff --git a/docs/source/guide.rst b/docs/source/guide.rst index 31702f77d..eb2265863 100644 --- a/docs/source/guide.rst +++ b/docs/source/guide.rst @@ -13,7 +13,7 @@ Get all argument list: .. code-block:: text - usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH]) [--rules [PATH]] [--severity SEVERITY] [--config [PATH]] + usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH] | --git PATH [PATH ...]) [--commits POSITIVE_INT] [--branch BRANCH] [--rules [PATH]] [--severity SEVERITY] [--config [PATH]] [--log_config [PATH]] [--denylist PATH] [--find-by-ext] [--depth POSITIVE_INT] [--no-filters] [--doc] [--ml_threshold FLOAT_OR_STR] [--ml_batch_size POSITIVE_INT] [--azure | --cuda] [--api_validation] [--jobs POSITIVE_INT] [--skip_ignored] [--save-json [PATH]] [--save-xlsx [PATH]] [--sort] [--log LOG_LEVEL] [--size_limit SIZE_LIMIT] [--banner] [--version] @@ -27,6 +27,11 @@ Get all argument list: exporting default config to file (default: config.json) --export_log_config [PATH] exporting default logger config to file (default: log.yaml) + --git PATH [PATH ...] + git repo to scan + --commits POSITIVE_INT + scan git repo for N commits only + --branch BRANCH scan git repo for single branch, otherwise - all branches were scanned (slow) --rules [PATH] path of rule config file (default: credsweeper/rules/config.yaml). severity:['critical', 'high', 'medium', 'low', 'info'] type:['keyword', 'pattern', 'pem_key', 'multi'] --severity SEVERITY set minimum level for rules to apply ['critical', 'high', 'medium', 'low', 'info'](default: 'Severity.INFO', case insensitive) --config [PATH] use custom config (default: built-in) diff --git a/requirements.txt b/requirements.txt index a608ec979..87adca3b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ whatthepatch==1.0.5 pdfminer.six==20221105 password-strength==0.0.3.post2 python-dateutil==2.8.2 +PyDriller~=2.5.1 # ML requirements numpy==1.24.4 diff --git a/setup.py b/setup.py index 3dc826063..3f2867c9d 100644 --- a/setup.py +++ b/setup.py @@ -24,6 +24,7 @@ "scikit-learn", # "onnxruntime", # "python-dateutil", # + "PyDriller", # ] setuptools.setup( diff --git a/tests/test_app.py b/tests/test_app.py index 6ff314c11..b88a5b794 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -5,12 +5,14 @@ import subprocess import sys import tempfile +from pathlib import Path import time from typing import AnyStr, Tuple from unittest import TestCase import deepdiff import pytest +from git import Repo from credsweeper.app import APP_PATH from credsweeper.utils import Util @@ -22,12 +24,12 @@ class TestApp(TestCase): @staticmethod def _m_credsweeper(args) -> Tuple[str, str]: - proc = subprocess.Popen( + with subprocess.Popen( [sys.executable, "-m", "credsweeper", *args], # - cwd=APP_PATH.parent, # - stdout=subprocess.PIPE, # - stderr=subprocess.PIPE) # - _stdout, _stderr = proc.communicate() + cwd=APP_PATH.parent, # + stdout=subprocess.PIPE, # + stderr=subprocess.PIPE) as proc: + _stdout, _stderr = proc.communicate() def transform(x: AnyStr) -> str: if isinstance(x, bytes): @@ -203,7 +205,10 @@ def test_it_works_n(self) -> None: " | --diff_path PATH [PATH ...]" \ " | --export_config [PATH]" \ " | --export_log_config [PATH]" \ + " | --git PATH [PATH ...]" \ ")" \ + " [--commits POSITIVE_INT]" \ + " [--branch BRANCH]" \ " [--rules [PATH]]" \ " [--severity SEVERITY]" \ " [--config [PATH]]" \ @@ -231,6 +236,7 @@ def test_it_works_n(self) -> None: " --diff_path" \ " --export_config" \ " --export_log_config" \ + " --git" \ " is required " expected = " ".join(expected.split()) self.assertEqual(expected, output) @@ -677,3 +683,32 @@ def test_doc_n(self) -> None: _stdout, _stderr = self._m_credsweeper(["--doc", "--path", str(SAMPLES_PATH), "--save-json", json_filename]) report = Util.json_load(json_filename) self.assertEqual(SAMPLES_IN_DOC, len(report)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + + def test_pydriller_p(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + with Repo.init(tmp_dir) as repo: + cred_file = Path(tmp_dir) / "with_cred" + value = "GbdD@23#d0" + with open(cred_file, "w") as f: + f.write(f"git_password: {value}") + repo.index.add([cred_file]) + repo.index.commit("added file") + with open(cred_file, "w") as f: + f.write("DELETED") + repo.index.add([cred_file]) + repo.index.commit("cleared file") + # check that value is not in the file + with open(cred_file, "r") as f: + self.assertNotIn(value, f.read()) + # run git scan + _stdout, _stderr = self._m_credsweeper(["--log", "DEBUG", "--git", str(tmp_dir)]) + self.assertIn("Detected Credentials in 1 branches and 2 commits : 1", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stderr, _stderr) + # check detected value in stdout + self.assertIn(value, _stdout, _stdout) + # del repo + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #