Skip to content

Commit

Permalink
fix xlsx report for diff (#644)
Browse files Browse the repository at this point in the history
* fix xlsx report for diff

* use . instead of _ in diff report name for py3.8 compatibility

* style

* import optimization

* update test

* Update __init__.py

* remove warnings

* remove get_json_filenames
  • Loading branch information
babenek authored Dec 23, 2024
1 parent 2d403d7 commit f5f9656
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 116 deletions.
2 changes: 1 addition & 1 deletion credsweeper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
'__version__'
]

__version__ = "1.10.0"
__version__ = "1.10.1"
38 changes: 10 additions & 28 deletions credsweeper/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import time
from argparse import ArgumentParser, ArgumentTypeError, Namespace
from typing import Any, Union, Optional, Dict
from typing import Any, Union, Dict

from credsweeper import __version__
from credsweeper.app import APP_PATH, CredSweeper
Expand Down Expand Up @@ -253,36 +253,19 @@ def get_arguments() -> Namespace:
return parser.parse_args()


def get_json_filenames(json_filename: str):
    """Auxiliary function to get names for json files with added and deleted .patch data

    Args:
        json_filename: original json path, or None when no json report is requested

    Returns:
        Tuple of paths ending with "_added.json" and "_deleted.json",
        or (None, None) when json_filename is None

    """
    if json_filename is None:
        return None, None
    extension = ".json"
    # Drop the extension only when it really is ".json" (any letter case).
    # Blindly cutting the last five characters mangles names such as
    # "report" or "out.txt".
    if json_filename.lower().endswith(extension):
        stem = json_filename[:-len(extension)]
    else:
        stem = json_filename
    return f"{stem}_added.json", f"{stem}_deleted.json"


def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Optional[str],
xlsx_filename: Optional[str]) -> int:
def scan(args: Namespace, content_provider: AbstractProvider) -> int:
"""Scan content_provider data, print results or save them if json_filename is not None
Args:
args: arguments of the application
content_provider: FilesProvider instance to scan data from
json_filename: json type report file path or None
xlsx_filename: xlsx type report file path or None
Returns:
Number of detected credentials
Warnings:
DeprecationWarning: Using 'json_filename' and/or 'xlsx_filename' will issue a warning.
"""
try:
if args.denylist_path is not None:
Expand All @@ -292,8 +275,8 @@ def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Opt

credsweeper = CredSweeper(rule_path=args.rule_path,
config_path=args.config_path,
json_filename=json_filename,
xlsx_filename=xlsx_filename,
json_filename=args.json_filename,
xlsx_filename=args.xlsx_filename,
color=args.color,
hashed=args.hashed,
subtext=args.subtext,
Expand Down Expand Up @@ -332,21 +315,20 @@ def main() -> int:
if args.path:
logger.info(f"Run analyzer on path: {args.path}")
content_provider: AbstractProvider = FilesProvider(args.path, skip_ignored=args.skip_ignored)
credentials_number = scan(args, content_provider, args.json_filename, args.xlsx_filename)
credentials_number = scan(args, content_provider)
summary["Detected Credentials"] = credentials_number
if 0 <= credentials_number:
result = EXIT_SUCCESS
elif args.diff_path:
added_json_filename, deleted_json_filename = get_json_filenames(args.json_filename)
# Analyze added data
logger.info(f"Run analyzer on added rows from patch files: {args.diff_path}")
content_provider = PatchesProvider(args.diff_path, change_type=DiffRowType.ADDED)
add_credentials_number = scan(args, content_provider, added_json_filename, args.xlsx_filename)
add_credentials_number = scan(args, content_provider)
summary["Added File Credentials"] = add_credentials_number
# Analyze deleted data
logger.info(f"Run analyzer on deleted rows from patch files: {args.diff_path}")
content_provider = PatchesProvider(args.diff_path, change_type=DiffRowType.DELETED)
del_credentials_number = scan(args, content_provider, deleted_json_filename, args.xlsx_filename)
del_credentials_number = scan(args, content_provider)
summary["Deleted File Credentials"] = del_credentials_number
if 0 <= add_credentials_number and 0 <= del_credentials_number:
result = EXIT_SUCCESS
Expand Down
39 changes: 29 additions & 10 deletions credsweeper/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# Directory of credsweeper sources MUST be placed before imports to avoid circular import error
APP_PATH = Path(__file__).resolve().parent

from credsweeper.common.constants import KeyValidationOption, Severity, ThresholdPreset
from credsweeper.common.constants import KeyValidationOption, Severity, ThresholdPreset, DiffRowType
from credsweeper.config import Config
from credsweeper.credentials import Candidate, CredentialManager, CandidateKey
from credsweeper.deep_scanner.deep_scanner import DeepScanner
Expand Down Expand Up @@ -67,10 +67,8 @@ def __init__(self,
validation was the grained candidate model on machine learning
config_path: optional str variable, path of CredSweeper config file
default built-in config is used if None
json_filename: optional string variable, path to save result
to json
xlsx_filename: optional string variable, path to save result
to xlsx
json_filename: optional string variable, path to save result to json
xlsx_filename: optional string variable, path to save result to xlsx
color: print results to stdout with colorization
hashed: use hash of line, value and variable instead of plain text
subtext: use subtext of line near variable-value as it is performed in ML
Expand Down Expand Up @@ -241,7 +239,9 @@ def run(self, content_provider: AbstractProvider) -> int:
logger.info(f"Start Scanner for {len(file_extractors)} providers")
self.scan(file_extractors)
self.post_processing()
self.export_results()
# PatchesProvider has the attribute. Using isinstance here would cause a circular import error
change_type = content_provider.change_type if hasattr(content_provider, "change_type") else None
self.export_results(change_type)

return len(self.credential_manager.get_credentials())

Expand Down Expand Up @@ -381,12 +381,19 @@ def post_processing(self) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def export_results(self) -> None:
"""Save credential candidates to json file or print them to a console."""
def export_results(self, change_type: Optional[DiffRowType] = None) -> None:
"""
Save credential candidates to json file or print them to a console.
Args:
change_type: flag to know which file should be created for a patch
"""
is_exported = False

credentials = self.credential_manager.get_credentials()

logger.info(f"Exporting {len(credentials)} credentials")

if self.sort_output:
credentials.sort(key=lambda x: ( #
x.line_data_list[0].path, #
Expand All @@ -398,17 +405,29 @@ def export_results(self) -> None:
))

if self.json_filename:
json_path = Path(self.json_filename)
is_exported = True
if isinstance(change_type, DiffRowType):
# add suffix for appropriate reports to create two files for the patch scan
json_path = json_path.with_suffix(f".{change_type.value}{json_path.suffix}")
Util.json_dump([credential.to_json(hashed=self.hashed, subtext=self.subtext) for credential in credentials],
file_path=self.json_filename)
file_path=json_path)

if self.xlsx_filename:
is_exported = True
data_list = []
for credential in credentials:
data_list.extend(credential.to_dict_list(hashed=self.hashed, subtext=self.subtext))
df = pd.DataFrame(data=data_list)
df.to_excel(self.xlsx_filename, index=False)
if isinstance(change_type, DiffRowType):
if Path(self.xlsx_filename).exists():
with pd.ExcelWriter(self.xlsx_filename, mode='a', engine="openpyxl",
if_sheet_exists="replace") as writer:
df.to_excel(writer, sheet_name=change_type.value, index=False)
else:
df.to_excel(self.xlsx_filename, sheet_name=change_type.value, index=False)
else:
df.to_excel(self.xlsx_filename, sheet_name="report", index=False)

if self.color:
is_exported = True
Expand Down
1 change: 0 additions & 1 deletion credsweeper/scanner/scan_type/scan_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from credsweeper.config import Config
from credsweeper.credentials import Candidate, LineData
from credsweeper.file_handler.analysis_target import AnalysisTarget

from credsweeper.filters import Filter
from credsweeper.rules import Rule

Expand Down
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path

# total number of files in test samples
SAMPLES_FILES_COUNT = 141
SAMPLES_FILES_COUNT = 142

# the lowest value of ML threshold is used to display possible lowest values
NEGLIGIBLE_ML_THRESHOLD = 0.0001
Expand Down
4 changes: 2 additions & 2 deletions tests/filters/test_line_git_binary_check.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import pytest

from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.filters import LineSpecificKeyCheck, LineGitBinaryCheck
from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_ANALYSIS_TARGET, DUMMY_DESCRIPTOR
from credsweeper.filters import LineGitBinaryCheck
from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_DESCRIPTOR
from tests.test_utils.dummy_line_data import get_line_data


Expand Down
5 changes: 1 addition & 4 deletions tests/filters/test_line_uue_part_check.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
from unittest import TestCase

import pytest

from credsweeper.file_handler.analysis_target import AnalysisTarget
from credsweeper.filters import LineSpecificKeyCheck, LineGitBinaryCheck
from credsweeper.filters.line_uue_part_check import LineUUEPartCheck
from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_ANALYSIS_TARGET, DUMMY_DESCRIPTOR
from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_DESCRIPTOR
from tests.test_utils.dummy_line_data import get_line_data


Expand Down
2 changes: 1 addition & 1 deletion tests/filters/test_value_hex_number_check.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from credsweeper.filters import ValueNumberCheck, ValueHexNumberCheck
from credsweeper.filters import ValueHexNumberCheck
from tests.filters.conftest import LINE_VALUE_PATTERN, DUMMY_ANALYSIS_TARGET
from tests.test_utils.dummy_line_data import get_line_data

Expand Down
1 change: 0 additions & 1 deletion tests/ml_model/test_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from credsweeper.ml_model.features.is_secret_numeric import IsSecretNumeric
from credsweeper.ml_model.features.word_in_line import WordInLine
from credsweeper.ml_model.features.word_in_value import WordInValue
from credsweeper.utils.entropy_validator import EntropyValidator
from tests import AZ_STRING

RE_TEST_PATTERN = re.compile(r"(?P<variable>.*) (?P<separator>over) (?P<value>.+)")
Expand Down
10 changes: 10 additions & 0 deletions tests/samples/uuid-update.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
diff --git a/uuid b/uuid
index 0a20cf1..b52370e 100644
--- a/uuid
+++ b/uuid
@@ -1 +1 @@
-bace4d19-fa7e-beef-cafe-9129474bcd81
\ No newline at end of file
+bace4d19-fa7e-dead-beef-9129474bcd81
\ No newline at end of file

77 changes: 56 additions & 21 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from unittest import TestCase

import deepdiff
import pytest
import numpy as np
import pandas as pd

from credsweeper.app import APP_PATH
from credsweeper.utils import Util
Expand All @@ -25,12 +26,12 @@ def setUp(self):

@staticmethod
def _m_credsweeper(args) -> Tuple[str, str]:
proc = subprocess.Popen(
with subprocess.Popen(
[sys.executable, "-m", "credsweeper", *args], #
cwd=APP_PATH.parent, #
stdout=subprocess.PIPE, #
stderr=subprocess.PIPE) #
_stdout, _stderr = proc.communicate()
cwd=APP_PATH.parent, #
stdout=subprocess.PIPE, #
stderr=subprocess.PIPE) as proc:
_stdout, _stderr = proc.communicate()

def transform(x: AnyStr) -> str:
if isinstance(x, bytes):
Expand Down Expand Up @@ -332,8 +333,8 @@ def test_patch_save_json_p(self) -> None:
json_filename = os.path.join(tmp_dir, f"{__name__}.json")
_stdout, _stderr = self._m_credsweeper(
["--diff_path", target_path, "--save-json", json_filename, "--log", "silence"])
self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json")))
self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json")))
self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.added.json")))
self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}.deleted.json")))

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

Expand Down Expand Up @@ -644,22 +645,56 @@ def test_no_filters_p(self) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_severity_p(self) -> None:
    """Run a "medium" severity scan of the samples: medium findings are printed, info ones are not."""
    cli_args = [
        "--log", "silence",
        "--ml_threshold", "0",
        "--severity", "medium",
        "--path", str(SAMPLES_PATH),
    ]
    output, _ = self._m_credsweeper(cli_args)
    self.assertIn("severity: medium", output)
    self.assertNotIn("severity: info", output)
def test_severity_patch_xlsx_n(self) -> None:
    """Scan the uuid patch with "low" severity: all report files are created but hold no findings."""
    # uuid is info level - no report
    with tempfile.TemporaryDirectory() as tmp_dir:
        xlsx_report = os.path.join(tmp_dir, f"{__name__}.xlsx")
        json_report = os.path.join(tmp_dir, f"{__name__}.json")
        cli_args = [
            "--severity", "low",
            "--diff", str(SAMPLES_PATH / "uuid-update.patch"),
            "--save-xlsx", xlsx_report,
            "--save-json", json_report,
        ]
        self._m_credsweeper(cli_args)
        # reports are created
        created_files = os.listdir(tmp_dir)
        self.assertEqual(3, len(created_files))
        # but empty
        deleted_json = os.path.join(tmp_dir, f"{__name__}.deleted.json")
        self.assertListEqual([], Util.json_load(deleted_json))
        added_json = os.path.join(tmp_dir, f"{__name__}.added.json")
        self.assertListEqual([], Util.json_load(added_json))
        sheet = pd.read_excel(os.path.join(tmp_dir, f"{__name__}.xlsx"))
        self.assertEqual(0, len(sheet))

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_severity_n(self) -> None:
    """Run a "critical" severity scan of the samples: no medium findings may be printed."""
    cli_args = [
        "--log", "silence",
        "--ml_threshold", "0",
        "--severity", "critical",
        "--path", str(SAMPLES_PATH),
    ]
    output, _ = self._m_credsweeper(cli_args)
    self.assertNotIn("severity: medium", output)
def test_severity_patch_xlsx_p(self) -> None:
    """Scan the uuid patch with "info" severity and cross-check the json and xlsx reports."""
    # info level produces xlsx file with "added" and "deleted" sheets and two json files
    with tempfile.TemporaryDirectory() as tmp_dir:
        xlsx_filename = os.path.join(tmp_dir, f"{__name__}.xlsx")
        json_filename = os.path.join(tmp_dir, f"{__name__}.json")
        cli_args = [
            "--severity", "info",
            "--diff", str(SAMPLES_PATH / "uuid-update.patch"),
            "--save-xlsx", xlsx_filename,
            "--save-json", json_filename,
        ]
        self._m_credsweeper(cli_args)

        # each change type gets its own json report; both must contain the UUID finding
        reports = {}
        for change in ("deleted", "added"):
            report_file = os.path.join(tmp_dir, f"{__name__}.{change}.json")
            reports[change] = Util.json_load(report_file)
            self.assertEqual("UUID", reports[change][0]["rule"])

        book = pd.read_excel(xlsx_filename, sheet_name=None, header=None)
        # two sheets should be created
        self.assertSetEqual({"deleted", "added"}, set(book.keys()))

        # values in xlsx are wrapped to double quotes
        for change, report in reports.items():
            quoted_value = f'"{report[0]["line_data_list"][0]["value"]}"'
            self.assertTrue(np.isin(quoted_value, book[change].values))

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

Expand Down
Loading

0 comments on commit f5f9656

Please sign in to comment.