Skip to content

Improve unblob "skip-extraction" mode of operation #692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,37 @@ def test_skip_extension(
result = runner.invoke(unblob.cli.cli, params)
assert extracted_files_count == len(list(tmp_path.rglob("*")))
assert result.exit_code == 0


@pytest.mark.parametrize(
    "args, skip_extraction, fail_message",
    [
        ([], False, "Should *NOT* have skipped extraction"),
        (["-s"], True, "Should have skipped extraction"),
        (["--skip-extraction"], True, "Should have skipped extraction"),
    ],
)
def test_skip_extraction(
    args: List[str], skip_extraction: bool, fail_message: str, tmp_path: Path
):
    """Verify the -s/--skip-extraction flags are propagated to process_file's config."""
    input_zip = Path(__file__).parent.joinpath(
        "integration", "archive", "zip", "regular", "__input__", "apple.zip"
    )
    cli_args = [*args, "--extract-dir", str(tmp_path), str(input_zip)]

    # Patch process_file so the CLI runs without doing any real extraction work.
    with mock.patch.object(unblob.cli, "process_file") as process_file_mock:
        result = CliRunner().invoke(unblob.cli.cli, cli_args)

    assert result.exit_code == 0
    process_file_mock.assert_called_once()
    config = process_file_mock.call_args.args[0]
    assert config.skip_extraction == skip_extraction, fail_message
31 changes: 31 additions & 0 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,37 @@ def get_all(file_name, report_type: Type[ReportType]) -> List[ReportType]:
)


@pytest.mark.parametrize(
    "skip_extraction, file_count, extracted_file_count",
    [
        (True, 5, 0),
        (False, 5, 6),
    ],
)
def test_skip_extraction(
    skip_extraction: bool,
    file_count: int,
    extracted_file_count: int,
    tmp_path: Path,
    extraction_config: ExtractionConfig,
):
    """With skip_extraction set, process_file reports chunks but writes no files.

    extracted_file_count is 0 when skipping; otherwise the archive yields its
    extraction directory plus file_count members (hence file_count + 1 paths).
    """
    # Build a small zip archive to use as the input file.
    input_file = tmp_path / "input"
    with zipfile.ZipFile(input_file, "w") as zf:
        for i in range(file_count):
            zf.writestr(f"file{i}", data=b"This is a test file.")

    extraction_config.extract_root = tmp_path / "output"
    extraction_config.skip_extraction = skip_extraction

    process_result = process_file(extraction_config, input_file)
    task_result_by_path = {r.task.path: r for r in process_result.results}

    # +1 accounts for the task result of the input file itself.
    assert len(task_result_by_path) == extracted_file_count + 1
    # rglob("*") already matches recursively ("**/*" was redundant); this
    # counts every path created under extract_root.
    assert (
        len(list(extraction_config.extract_root.rglob("*"))) == extracted_file_count
    )


class ConcatenateExtractor(DirectoryExtractor):
def extract(self, paths: List[Path], outdir: Path):
outfile = outdir / "data"
Expand Down
59 changes: 56 additions & 3 deletions unblob/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@
import click
from rich.console import Console
from rich.panel import Panel
from rich.style import Style
from rich.table import Table
from structlog import get_logger

from unblob.models import DirectoryHandlers, Handlers, ProcessResult
from unblob.plugins import UnblobPluginManager
from unblob.report import ChunkReport, Severity, StatReport, UnknownChunkReport
from unblob.report import (
ChunkReport,
Severity,
StatReport,
UnknownChunkReport,
)

from .cli_options import verbosity_option
from .dependencies import get_dependencies, pretty_format_dependencies
Expand Down Expand Up @@ -200,7 +206,7 @@ def __init__(
)
@click.option(
"-s",
"--skip_extraction",
"--skip-extraction",
"skip_extraction",
is_flag=True,
show_default=True,
Expand Down Expand Up @@ -279,7 +285,10 @@ def cli(
logger.info("Start processing file", file=file)
process_results = process_file(config, file, report_file)
if verbose == 0:
print_report(process_results)
if skip_extraction:
print_scan_report(process_results)
else:
print_report(process_results)
return process_results


Expand Down Expand Up @@ -349,6 +358,50 @@ def get_size_report(task_results: List) -> Tuple[int, int, int, int]:
return total_files, total_dirs, total_links, extracted_size


def print_scan_report(reports: ProcessResult):
    """Render a table of identified chunks (offsets, size, description) to stderr.

    Used in skip-extraction mode, where nothing is carved to disk: instead of
    the extraction summary, we show what *would* have been extracted.
    """
    console = Console(stderr=True)

    chunks_offset_table = Table(
        expand=False,
        show_lines=True,
        show_edge=True,
        style=Style(color="white"),
        header_style=Style(color="white"),
        row_styles=[Style(color="red")],
    )
    chunks_offset_table.add_column("Start offset")
    chunks_offset_table.add_column("End offset")
    chunks_offset_table.add_column("Size")
    chunks_offset_table.add_column("Description")

    for task_result in reports.results:
        chunk_reports = [
            report
            for report in task_result.reports
            if isinstance(report, (ChunkReport, UnknownChunkReport))
        ]
        chunk_reports.sort(key=lambda x: x.start_offset)

        for chunk_report in chunk_reports:
            # The two report types only differ in description and row color:
            # known chunks show their handler name in cyan, unknown ones in blue.
            if isinstance(chunk_report, ChunkReport):
                description, color = chunk_report.handler_name, "#00FFC8"
            else:
                description, color = "unknown", "#008ED5"
            chunks_offset_table.add_row(
                f"{chunk_report.start_offset:0d}",
                f"{chunk_report.end_offset:0d}",
                human_size(chunk_report.size),
                description,
                style=Style(color=color),
            )
    console.print(chunks_offset_table)


def print_report(reports: ProcessResult):
total_files, total_dirs, total_links, extracted_size = get_size_report(
reports.results
Expand Down
14 changes: 11 additions & 3 deletions unblob/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ def process_file(

process_result = _process_task(config, task)

# ensure that the root extraction directory is created even for empty extractions
extract_dir.mkdir(parents=True, exist_ok=True)
if not config.skip_extraction:
# ensure that the root extraction directory is created even for empty extractions
extract_dir.mkdir(parents=True, exist_ok=True)

if report_file:
write_json_report(report_file, process_result)
Expand Down Expand Up @@ -475,7 +476,7 @@ def __init__(
def process(self):
logger.debug("Processing file", path=self.task.path, size=self.size)

if self.carve_dir.exists():
if self.carve_dir.exists() and not self.config.skip_extraction:
# Extraction directory is not supposed to exist, it is usually a simple mistake of running
# unblob again without cleaning up or using --force.
# It would cause problems continuing, as it would mix up original and extracted files,
Expand Down Expand Up @@ -515,6 +516,13 @@ def _process_chunks(
if unknown_chunks:
logger.warning("Found unknown Chunks", chunks=unknown_chunks)

if self.config.skip_extraction:
for chunk in unknown_chunks:
self.result.add_report(chunk.as_report(entropy=None))
for chunk in outer_chunks:
self.result.add_report(chunk.as_report(extraction_reports=[]))
return

for chunk in unknown_chunks:
carved_unknown_path = carve_unknown_chunk(self.carve_dir, file, chunk)
entropy = self._calculate_entropy(carved_unknown_path)
Expand Down