
Commit 24af647

Commit message: Images
1 parent b1f0cba commit 24af647

File tree

1 file changed (+329, -0 lines)


EnumImages.py

Lines changed: 329 additions & 0 deletions
@@ -0,0 +1,329 @@
#!/usr/bin/env python3

from __future__ import annotations

import argparse
import json
import math
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import requests

# local import for progress; keep at top-level to fail early if missing
from alive_progress import alive_bar

DEFAULT_URL = "http://localhost:2375"
DEFAULT_TIMEOUT = 10

# --- heuristics / patterns ---
PATTERNS = {
    "aws_key": re.compile(r"AKIA[0-9A-Z]{16}"),
    "private_key_header": re.compile(r"-----BEGIN (RSA |OPENSSH |PRIVATE )?PRIVATE KEY-----", re.I),
    "private_key_any": re.compile(r"-----BEGIN .*PRIVATE KEY-----", re.I),
    "password_assignment": re.compile(r"(password|passwd|pwd|secret|token)\s*(=|:)\s*[\S]+", re.I),
    "basic_auth_like": re.compile(r"[uU]sername[:=]\s*\S+|[pP]assword[:=]\s*\S+"),
    # long base64-like string (>= 40 base64 characters, optional padding)
    "long_base64": re.compile(r"(?:[A-Za-z0-9+/]{40,}={0,2})"),
    # long hex-like string (likely API tokens or SHA-1/SHA-256 digests)
    "long_hex": re.compile(r"\b[0-9a-fA-F]{32,}\b"),
    # likely JWTs: three base64url segments separated by dots (header.payload.signature);
    # unanchored so it can match inside larger text blobs
    "jwt_like": re.compile(r"\b[A-Za-z0-9\-_]{8,}\.[A-Za-z0-9\-_]{8,}\.[A-Za-z0-9\-_]{8,}\b"),
}

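# Illustrative strings these patterns are meant to catch (examples only, not
# real credentials): "AKIAIOSFODNN7EXAMPLE" (aws_key), "password=hunter2"
# (password_assignment), "-----BEGIN RSA PRIVATE KEY-----" (private_key_header).
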
MIN_ENTROPY_LEN = 20
ENTROPY_THRESHOLD = 4.2  # heuristic threshold for high entropy (0..~6.5 for ASCII)
# higher thresholds reduce false positives; tuned conservatively


# --- utility functions ---
def shannon_entropy(s: str) -> float:
    """Compute Shannon entropy for string s (bits/char)."""
    if not s:
        return 0.0
    freq: Dict[str, int] = {}
    for ch in s:
        freq[ch] = freq.get(ch, 0) + 1
    ent = 0.0
    length = len(s)
    for count in freq.values():
        p = count / length
        ent -= p * math.log2(p)
    return ent


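# A quick check of the heuristic above (illustrative, assumed values):
# shannon_entropy("aaaa") == 0.0, while a 20-character token with all-distinct
# characters such as "Ab3xQ9mZr7Lk2Pv8Ws5T" scores log2(20) ~= 4.32 bits/char,
# enough to clear ENTROPY_THRESHOLD (4.2) in the scan below.
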
def find_matches_in_text(text: str) -> List[Dict[str, Any]]:
    """Run regex heuristics and entropy checks on text, return list of findings."""
    findings: List[Dict[str, Any]] = []
    if not text:
        return findings

    # quick checks for patterns
    for name, pat in PATTERNS.items():
        for m in pat.finditer(text):
            candidate = m.group(0)
            ent = shannon_entropy(candidate) if len(candidate) >= MIN_ENTROPY_LEN else 0.0
            findings.append(
                {
                    "type": name,
                    "match": candidate,
                    "context_snippet": _context_snippet(text, m.start(), m.end()),
                    "entropy": round(ent, 3),
                }
            )

    # scan for potential high-entropy substrings separated by whitespace/punctuation:
    # break into tokens and test long tokens
    tokens = re.split(r"[\s\"'`<>(){}[\],;]+", text)
    for t in tokens:
        if len(t) >= MIN_ENTROPY_LEN:
            ent = shannon_entropy(t)
            if ent >= ENTROPY_THRESHOLD:
                findings.append(
                    {
                        "type": "high_entropy_token",
                        "match": t[:200],
                        "entropy": round(ent, 3),
                        "length": len(t),
                    }
                )

    return findings


def _context_snippet(text: str, start: int, end: int, window: int = 40) -> str:
    s = max(0, start - window)
    e = min(len(text), end + window)
    snippet = text[s:e]
    return snippet.replace("\n", " ")  # single-line snippet


# --- Docker API helpers ---
def get_engine_info(base_url: str, timeout: int = DEFAULT_TIMEOUT) -> Dict[str, Any]:
    try:
        resp = requests.get(f"{base_url.rstrip('/')}/info", timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.RequestException as e:
        print(f"[!] Failed to query /info: {e}", file=sys.stderr)
        return {}


def get_images(base_url: str, timeout: int = DEFAULT_TIMEOUT) -> List[Dict[str, Any]]:
    try:
        resp = requests.get(f"{base_url.rstrip('/')}/images/json?all=true", timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.RequestException as e:
        print(f"[!] Failed to list images: {e}", file=sys.stderr)
        return []


def inspect_image(base_url: str, image_ref: str, timeout: int = DEFAULT_TIMEOUT) -> Optional[Dict[str, Any]]:
    # image_ref may be an ID or repo:tag; use the ID if available.
    try:
        resp = requests.get(f"{base_url.rstrip('/')}/images/{image_ref}/json", timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.RequestException:
        return None


def image_history(base_url: str, image_ref: str, timeout: int = DEFAULT_TIMEOUT) -> List[Dict[str, Any]]:
    try:
        resp = requests.get(f"{base_url.rstrip('/')}/images/{image_ref}/history", timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.RequestException:
        return []


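# The endpoints used above can be sanity-checked by hand (sketch, assuming the
# same unauthenticated daemon at http://localhost:2375 and that curl and jq are
# installed):
#   curl -s http://localhost:2375/info | jq '.ServerVersion'
#   curl -s "http://localhost:2375/images/json?all=true" | jq '.[].RepoTags'
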
# --- main scanning logic ---
def scan_image(base_url: str, image: Dict[str, Any], timeout: int = DEFAULT_TIMEOUT) -> Dict[str, Any]:
    """
    Examine image metadata and history for suspicious items.
    Returns a dict with findings.
    """
    img_id = image.get("Id") or image.get("ID") or image.get("id") or image.get("Digest") or "<unknown>"
    tags = image.get("RepoTags") or image.get("Names") or []
    size = image.get("Size")
    created = image.get("Created")
    # use the image ID for API calls; the Engine API accepts the full "sha256:" form as-is
    call_ref = img_id

    details = inspect_image(base_url, call_ref, timeout=timeout) or {}
    history = image_history(base_url, call_ref, timeout=timeout)

    findings: List[Dict[str, Any]] = []
    # gather candidate strings to scan
    candidate_texts: List[Tuple[str, str]] = []  # (source, text)

    # Repo tags
    if tags:
        candidate_texts.append(("repo_tags", " ".join(tags) if isinstance(tags, list) else str(tags)))

    # Config section from inspect result
    cfg = details.get("Config") or {}
    if cfg:
        # Env
        envs = cfg.get("Env") or []
        if envs:
            candidate_texts.append(("env", "\n".join(envs)))
        # Labels
        labels = cfg.get("Labels") or {}
        if labels:
            # join label keys and values
            candidate_texts.append(("labels", " ".join(f"{k}={v}" for k, v in labels.items())))
        # Cmd / Entrypoint
        cmd = cfg.get("Cmd")
        if cmd:
            candidate_texts.append(("cmd", " ".join(cmd)))
        entry = cfg.get("Entrypoint")
        if entry:
            candidate_texts.append(("entrypoint", " ".join(entry)))
        working_dir = cfg.get("WorkingDir")
        if working_dir:
            candidate_texts.append(("working_dir", working_dir))

    # ContainerConfig may also hold env/cmd
    ccfg = details.get("ContainerConfig") or {}
    if ccfg:
        if ccfg.get("Env"):
            candidate_texts.append(("container_config_env", " ".join(ccfg.get("Env"))))
        if ccfg.get("Cmd"):
            candidate_texts.append(("container_config_cmd", " ".join(ccfg.get("Cmd"))))

    # History: CreatedBy fields often include the RUN commands
    if history:
        created_bys = []
        for h in history:
            cb = h.get("CreatedBy") or ""
            if cb:
                created_bys.append(cb)
        if created_bys:
            candidate_texts.append(("history_created_by", "\n".join(created_bys)))

    # tags in image manifest / repo digests
    if image.get("RepoDigests"):
        candidate_texts.append(("repo_digests", " ".join(image.get("RepoDigests"))))

    # also include a raw JSON dump of the inspect details (shortened) for scanning
    if details:
        # convert to string but limit size to avoid scanning huge blobs
        jdump = json.dumps(details, default=str)
        candidate_texts.append(("inspect_json", jdump[:20000]))  # cap to 20k chars

    # run heuristics on each candidate text
    for source, txt in candidate_texts:
        matches = find_matches_in_text(txt)
        for m in matches:
            finding = dict(m)
            finding["source"] = source
            findings.append(finding)

    # Summarize some metadata
    summary = {
        "image_id": img_id,
        "tags": tags,
        "size": size,
        "created": created,
        "findings": findings,
        "inspect_available": bool(details),
        "history_entries": len(history),
    }
    return summary


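# A single entry in summary["findings"] above looks roughly like this
# (illustrative values, not real scanner output):
#   {"type": "password_assignment", "match": "PASSWORD=hunter2",
#    "context_snippet": "... PATH=/usr/local/bin PASSWORD=hunter2 ...",
#    "entropy": 0.0, "source": "env"}
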
def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Audit Docker images for potential sensitive information (metadata + history).")
    p.add_argument("--url", default=DEFAULT_URL, help=f"Docker Engine API base URL (default: {DEFAULT_URL})")
    p.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="HTTP timeout in seconds.")
    p.add_argument("--out", metavar="FILE", help="Optional JSON output file to save the report.")
    p.add_argument("--min-findings", type=int, default=0, help="Only include images with at least this many findings in the printed summary.")
    p.add_argument("--show-inspect-json", action="store_true", help="Print full inspect JSON for images (can be verbose).")
    return p.parse_args()


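# Example invocation (sketch; flags as defined above):
#   python3 EnumImages.py --url http://localhost:2375 --min-findings 1 --out report.json
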
def print_engine_overview(info: Dict[str, Any]) -> None:
    if not info:
        print("Engine Info: (unavailable)")
        return
    print("=== Docker Engine Overview (/info) ===")
    print(f"Server Version : {info.get('ServerVersion')}")
    print(f"OS / Arch      : {info.get('OperatingSystem')} ({info.get('OSType')}/{info.get('Architecture')})")
    print(f"CPUs / Memory  : {info.get('NCPU')} / {info.get('MemTotal')}")
    print(f"Storage Driver : {info.get('Driver')}")
    print("=" * 36)


def main() -> int:
    args = parse_args()
    base_url = args.url.rstrip("/")
    timeout = args.timeout

    engine_info = get_engine_info(base_url, timeout=timeout)
    print_engine_overview(engine_info)

    images = get_images(base_url, timeout=timeout)
    if not images:
        print("No images returned by /images/json.")
        return 0

    report: Dict[str, Any] = {"engine_info": engine_info, "images": []}

    print(f"Found {len(images)} images. Scanning metadata and history for sensitive information...")

    with alive_bar(len(images), title="Scanning images") as bar:
        for img in images:
            summary = scan_image(base_url, img, timeout=timeout)
            # Collect and optionally print the summary
            if len(summary["findings"]) >= args.min_findings:
                print("\n---")
                tags = summary.get("tags") or []
                tagstr = ", ".join(tags) if tags else "(untagged)"
                print(f"Image: {tagstr}")
                print(f"ID: {summary.get('image_id')}")
                print(f"Size bytes: {summary.get('size')}, history entries: {summary.get('history_entries')}")
                if summary["findings"]:
                    print("Findings:")
                    for f in summary["findings"]:
                        # compact display
                        typ = f.get("type")
                        ent = f.get("entropy")
                        src = f.get("source")
                        snippet = f.get("context_snippet") or f.get("match")
                        print(f" - [{typ}] source={src} entropy={ent} match={repr(f.get('match'))}")
                        if snippet:
                            print(f"   → snippet: {snippet}")
                else:
                    print("Findings: None")
                if args.show_inspect_json and summary.get("inspect_available"):
                    # call inspect again to print the full JSON
                    details = inspect_image(base_url, summary["image_id"], timeout=timeout)
                    if details:
                        print("Full inspect JSON (truncated):")
                        print(json.dumps(details, indent=2)[:4000])
            report["images"].append(summary)
            bar()

    # Save JSON report if requested
    if args.out:
        try:
            p = Path(args.out)
            p.write_text(json.dumps(report, indent=2))
            print(f"\nSaved JSON report to {p.resolve()}")
        except Exception as e:
            print(f"[!] Failed to save report to {args.out}: {e}", file=sys.stderr)
            return 2

    print("\nScan complete. Review flagged findings manually; these are heuristics, not definitive evidence.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
