# Runs the in-repo OWASP scanner against the source files touched by a pull request.
name: OWASP PR Scanner

on:
  pull_request:
    paths:
      - 'src/**'
      - 'backend/**'
      - 'app/**'
      - 'services/**'
      - '.github/workflows/**'
      - 'scanner/**'

jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          # Full history so both the base and head commits are available to `git diff`.
          fetch-depth: 0

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install deps
        run: |
          python -m pip install -U pip
          if [ -f scanner/requirements.txt ]; then
            pip install -r scanner/requirements.txt
          elif [ -f requirements.txt ]; then
            pip install -r requirements.txt
          fi

      - name: Determine changed files for this PR
        id: diff
        shell: bash
        run: |
          BASE_SHA="${{ github.event.pull_request.base.sha }}"
          HEAD_SHA="${{ github.event.pull_request.head.sha }}"

          RAW=$(git diff --name-only "$BASE_SHA" "$HEAD_SHA" || true)

          # Keep only scannable extensions that live under the application directories.
          APP_CHANGED=$(echo "$RAW" \
            | grep -E '\.(js|jsx|ts|tsx|py|java|go|rb|php|html|css|md)$' \
            | grep -E '^(src/|backend/|app/|services/)' || true)

          SCANNER_ONLY=$(echo "$RAW" | grep -E '^scanner/' || true)

          if [ -z "$APP_CHANGED" ] && [ -n "$SCANNER_ONLY" ]; then
            echo "only_scanner_changes=true" >> "$GITHUB_OUTPUT"
            exit 0
          fi

          # Nothing relevant changed (e.g. workflow-only PR): fall back to every tracked app file.
          if [ -z "$APP_CHANGED" ]; then
            APP_CHANGED="$(git ls-files src backend app services 2>/dev/null || true)"
          fi

          # A multiline value in $GITHUB_ENV must be written as NAME<<DELIMITER ... DELIMITER.
          echo "CHANGED_FILES<<EOF" >> "$GITHUB_ENV"
          echo "$APP_CHANGED" >> "$GITHUB_ENV"
          echo "EOF" >> "$GITHUB_ENV"

      - name: Skip when only scanner/** changed
        if: steps.diff.outputs.only_scanner_changes == 'true'
        run: echo "Only scanner/** changed; skipping scan."

      - name: Run OWASP scanner on changed files
        if: steps.diff.outputs.only_scanner_changes != 'true'
        shell: bash
        run: |
          if [ -z "${CHANGED_FILES}" ]; then
            echo "Nothing to scan."
            exit 0
          fi
          # Scan every file even after a failure; the job fails at the end if any scan failed.
          EXIT=0
          while IFS= read -r file; do
            [ -z "$file" ] && continue
            echo "Scanning: $file"
            python -m scanner.main "$file" || EXIT=1
          done <<< "${CHANGED_FILES}"
          exit $EXIT
class VulnerabilityScanner:
    """Scan a single source file with every loaded rule module and print a report.

    ``code_lines`` is filled by ``parse_file`` and ``vulnerabilities`` is filled by
    the rule modules through the ``add_vulnerability`` callback. ``run_checks``
    iterates the module-level ``RULE_MODULES`` list.
    """

    def __init__(self, file_path):
        self.file_path = file_path
        self.code_lines = []  # lines of the target file, trailing newlines kept
        self.vulnerabilities = []  # finding dicts, in the order rules reported them

    def add_vulnerability(self, category, description, line, severity, confidence):
        """Record one finding; this bound method is the callback handed to rules."""
        self.vulnerabilities.append(
            {
                "category": category,
                "description": description,
                "line": line,
                "severity": severity,
                "confidence": confidence,
            }
        )

    def parse_file(self):
        """Read the target file into ``code_lines``.

        Returns:
            True when the file was read; False (after printing a message) when
            the path is missing, is not a regular file, or cannot be opened.
        """
        if not os.path.exists(self.file_path):
            print(f"File {self.file_path} does not exist.")
            return False
        if not os.path.isfile(self.file_path):
            print(f"File {self.file_path} is not a regular file.")
            return False
        try:
            # errors="replace": scanned files are arbitrary sources, and a stray
            # non-UTF-8 byte must not abort the scan with UnicodeDecodeError.
            with open(self.file_path, "r", encoding="utf-8", errors="replace") as f:
                self.code_lines = f.readlines()
        except OSError as exc:
            print(f"File {self.file_path} could not be read: {exc}")
            return False
        return True

    def run_checks(self):
        """Run every rule module's ``check(code_lines, add_vulnerability)``."""
        for rule in RULE_MODULES:
            rule.check(self.code_lines, self.add_vulnerability)

    def run(self):
        """Parse the file and, if that succeeded, run all rule checks."""
        if not self.parse_file():
            return
        self.run_checks()

    def report(self):
        """Print findings grouped by category, with ANSI colours per severity."""

        # ---- colour helpers ----
        def supports_truecolor() -> bool:
            return os.environ.get("COLORTERM", "").lower() in ("truecolor", "24bit")

        def rgb(r, g, b) -> str:
            return f"\033[38;2;{r};{g};{b}m"

        ANSI = {
            "reset": "\033[0m",
            "bold": "\033[1m",
            "cyan": "\033[96m",
            "magenta": "\033[95m",
            "yellow": "\033[93m",
            "red": "\033[91m",
            "green": "\033[92m",
            "blue": "\033[94m",
        }

        TRUECOLOR = supports_truecolor()

        # Severity colours (true-colour escape, else 16-colour fallback)
        CRIT = rgb(220, 20, 60) if TRUECOLOR else ANSI["red"] + ANSI["bold"]
        HIGH = rgb(255, 0, 0) if TRUECOLOR else ANSI["red"]
        MED = rgb(255, 165, 0) if TRUECOLOR else ANSI["yellow"]
        LOW = rgb(0, 200, 0) if TRUECOLOR else ANSI["green"]

        RESET = ANSI["reset"]
        BOLD = ANSI["bold"]
        HDR = rgb(180, 130, 255) if TRUECOLOR else ANSI["magenta"]  # section header
        TITLE = rgb(120, 220, 200) if TRUECOLOR else ANSI["cyan"]  # title
        SUM = rgb(255, 215, 0) if TRUECOLOR else ANSI["yellow"]  # summary label

        sev_color = {"CRITICAL": CRIT, "HIGH": HIGH, "MEDIUM": MED, "LOW": LOW}

        print(f"\n{BOLD}{TITLE}Scan Results for {self.file_path}:{RESET}")

        if not self.vulnerabilities:
            ok = rgb(0, 200, 0) if TRUECOLOR else ANSI["green"]
            print(f"{ok}✅ No vulnerabilities found.{RESET}")
            return

        # Group by category
        groups = {}
        for v in self.vulnerabilities:
            groups.setdefault(v["category"], []).append(v)

        def cat_key(cat: str):
            # "A03: ..." sorts numerically by its number; anything else sorts
            # after all numbered categories, alphabetically.
            head = cat.split(":", 1)[0].strip()
            if head.startswith("A") and head[1:].isdigit():
                return (0, int(head[1:]))
            return (1, cat.lower())

        for cat in sorted(groups, key=cat_key):
            items = sorted(groups[cat], key=lambda x: x["line"])

            # Tally per severity; unknown severities are counted but not shown as chips.
            sev_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
            for v in items:
                sev_counts[v["severity"]] = sev_counts.get(v["severity"], 0) + 1

            total = len(items)
            plural = "s" if total != 1 else ""
            print(f"\n{BOLD}{HDR}=== {cat} ({total} finding{plural}) ==={RESET}")

            chips = [
                f"{sev_color[k]}{k.title()}{RESET}: {sev_counts[k]}"
                for k in ("CRITICAL", "HIGH", "MEDIUM", "LOW")
                if sev_counts[k]
            ]
            if chips:
                print(f"{SUM}Summary:{RESET} " + ", ".join(chips))

            for v in items:
                # Unknown severities fall back to blue rather than raising KeyError.
                sc = sev_color.get(v["severity"], ANSI["blue"])
                print(
                    f"\n {BOLD}• Line {v['line']} |{RESET} "
                    f"Severity {sc}{v['severity']}{RESET} | "
                    f"Confidence {v['confidence']}"
                )
                print(f" → {v['description']}")
def main():
    """Scan the file named on the command line and print its report.

    Returns:
        The process exit status: 1 when at least one finding was recorded,
        otherwise 0, so a calling shell (``cmd || EXIT=1``) can detect findings.
    """
    parser = argparse.ArgumentParser(description="OWASP PR Vulnerability Scanner")
    parser.add_argument("path", help="Path to Python file to scan")
    args = parser.parse_args()

    scanner = VulnerabilityScanner(args.path)
    scanner.run()
    scanner.report()
    return 1 if scanner.vulnerabilities else 0


if __name__ == "__main__":
    # Propagate the status; without this the CLI always exits 0.
    raise SystemExit(main())
Python requests with TLS verify disabled + if "requests." in line and "verify=False" in line: + add_vulnerability( + "A07: Identification and Authentication Failures", + f"Insecure TLS verification disabled: {line.strip()}", + i + 1, + "HIGH", + "HIGH" + ) + + # Hardcoded default creds + if re.search(r"(user(name)?\s*=\s*['\"](admin|root)['\"])", line, re.IGNORECASE): + add_vulnerability( + "A07: Identification and Authentication Failures", + f"Hardcoded default username detected: {line.strip()}", + i + 1, + "HIGH", + "HIGH" + ) + if re.search(r"(password\s*=\s*['\"](admin|1234|password)['\"])", line, re.IGNORECASE): + add_vulnerability( + "A07: Identification and Authentication Failures", + f"Hardcoded default password detected: {line.strip()}", + i + 1, + "HIGH", + "HIGH" + ) diff --git a/scanner/rules/broken_access_control.py b/scanner/rules/broken_access_control.py new file mode 100644 index 0000000..2038991 --- /dev/null +++ b/scanner/rules/broken_access_control.py @@ -0,0 +1,94 @@ +# A01:2021 – Broken Access Control +# +# It looks for common patterns that suggest missing or weak authorization checks: +# 1) Flask routes without an auth/role decorator (e.g., @login_required, @jwt_required). +# 2) Django REST Framework endpoints that explicitly allow unauthenticated access +# (e.g., permission_classes = [AllowAny]). +# 3) Express.js routes that attach a handler directly with no middleware +# (e.g., app.get('/admin', (req, res) => ...)) which often implies no auth check. +# +# Function: +# - `check(code_lines, add_vulnerability)`: Scans lines and reports findings with context. 
import re

CATEGORY = "A01: Broken Access Control"

AUTH_DECORATOR_RE = re.compile(
    r'@(login_required|jwt_required|roles_required|requires_auth|auth_required|permission_required)',
    re.IGNORECASE,
)
FLASK_ROUTE_RE = re.compile(r'@(?:\w+\.)?route\s*\(', re.IGNORECASE)
# Also accept "async def": otherwise the decorator scan runs past an async view
# and picks up the decorators of the *next* function.
DEF_RE = re.compile(r'^\s*(?:async\s+)?def\s+\w+\s*\(', re.IGNORECASE)
ROUTE_PATH_RE = re.compile(r'route\s*\(\s*[\'"]([^\'"]+)', re.IGNORECASE)
SENSITIVE_PATH_RE = re.compile(
    r'/?(admin|settings|manage|delete|update|user|account)', re.IGNORECASE
)

DRF_ALLOWANY_RE = re.compile(r'permission_classes\s*=\s*\[\s*AllowAny\s*\]')
DRF_IMPORT_ALLOWANY_RE = re.compile(
    r'from\s+rest_framework\.permissions\s+import\s+.*AllowAny', re.IGNORECASE
)

# app.get('/path', handler) or router.post("/path", handler)
# If there is a direct callback right after the path, there is probably no middleware.
EXPRESS_ROUTE_RE = re.compile(
    r'\b(?:app|router)\.(get|post|put|patch|delete|options|head)\s*\(\s*[\'"][^\'"]+[\'"]\s*,\s*(?:function|\()',
    re.IGNORECASE,
)


def check(code_lines, add_vulnerability):
    """Report likely missing-authorization patterns (OWASP A01:2021).

    Args:
        code_lines: Lines of the scanned file.
        add_vulnerability: Callback invoked as
            ``(category, description, line_number, severity, confidence)``
            with a 1-based line number.
    """
    # Seeing the AllowAny import raises the severity of AllowAny findings.
    drf_allowany_seen = any(DRF_IMPORT_ALLOWANY_RE.search(line) for line in code_lines)
    total = len(code_lines)

    # -------- Flask route without auth decorator ----------
    i = 0
    while i < total:
        line = code_lines[i]
        if not FLASK_ROUTE_RE.search(line):
            i += 1
            continue

        # Collect the decorators stacked between the route line and its def.
        decorators = []
        j = i
        while j + 1 < total and not DEF_RE.search(code_lines[j + 1]):
            j += 1
            if code_lines[j].lstrip().startswith('@'):
                decorators.append(code_lines[j].strip())

        if j + 1 >= total:
            # No function definition follows this route line; nothing to judge.
            i += 1
            continue

        if not any(AUTH_DECORATOR_RE.search(d) for d in decorators):
            m = ROUTE_PATH_RE.search(line)
            path_hint = m.group(1) if m else ""
            # Sensitive-looking paths are rated higher.
            severity = "HIGH" if SENSITIVE_PATH_RE.search(path_hint) else "MEDIUM"
            add_vulnerability(
                CATEGORY,
                f"Flask route appears without an auth decorator: {line.strip()}",
                i + 1,
                severity,
                "HIGH",
            )
        # Resume at the def line so stacked decorators are not re-examined.
        i = j + 1

    # -------- DRF AllowAny on views / viewsets ----------
    for idx, line in enumerate(code_lines):
        if DRF_ALLOWANY_RE.search(line):
            add_vulnerability(
                CATEGORY,
                f"DRF endpoint allows unauthenticated access with AllowAny: {line.strip()}",
                idx + 1,
                "HIGH" if drf_allowany_seen else "MEDIUM",
                "HIGH",
            )

    # -------- Express routes without middleware ----------
    for idx, line in enumerate(code_lines):
        if EXPRESS_ROUTE_RE.search(line):
            add_vulnerability(
                CATEGORY,
                f"Express route handler attached without visible auth middleware: {line.strip()}",
                idx + 1,
                "MEDIUM",
                "HIGH",
            )
import re

CATEGORY = "A08: Software and Data Integrity Failures"

UNSAFE_YAML_RE = re.compile(r'\byaml\.load\s*\(')  # safe form is yaml.safe_load
# Match eval(/exec( only when not preceded by an identifier character, so
# ast.literal_eval(...) and names such as retrieval(...) are not reported.
DYNAMIC_EVAL_RE = re.compile(r'(?<!\w)(?:eval|exec)\s*\(')


def check(code_lines, add_vulnerability):
    """Report integrity risks: eval/exec, pickle loading, yaml.load, shell=True.

    Lines whose first non-blank character is ``#`` are skipped.

    Args:
        code_lines: Lines of the scanned file.
        add_vulnerability: Callback invoked as
            ``(category, description, line_number, severity, confidence)``
            with a 1-based line number.
    """
    for lineno, line in enumerate(code_lines, start=1):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue

        # Dangerous dynamic evaluation
        if DYNAMIC_EVAL_RE.search(stripped):
            add_vulnerability(
                CATEGORY,
                f"Use of dangerous dynamic evaluation: {stripped}",
                lineno,
                "HIGH",
                "HIGH",
            )

        # Unsafe deserialization (pickle)
        if "pickle.load(" in stripped or "pickle.loads(" in stripped:
            add_vulnerability(
                CATEGORY,
                f"Potential unsafe deserialization via pickle: {stripped}",
                lineno,
                "HIGH",
                "HIGH",
            )

        # Unsafe YAML load (must be yaml.safe_load)
        if UNSAFE_YAML_RE.search(stripped) and "safe_load" not in stripped:
            add_vulnerability(
                CATEGORY,
                f"Unsafe YAML load detected; use yaml.safe_load(): {stripped}",
                lineno,
                "HIGH",
                "MEDIUM",
            )

        # shell=True in subprocess calls
        if "subprocess." in stripped and "shell=True" in stripped:
            add_vulnerability(
                CATEGORY,
                f"subprocess call with shell=True detected: {stripped}",
                lineno,
                "HIGH",
                "MEDIUM",
            )
i + 1, + "MEDIUM", + "LOW", + ) diff --git a/scanner/rules/security_misconfig.py b/scanner/rules/security_misconfig.py new file mode 100644 index 0000000..17885d2 --- /dev/null +++ b/scanner/rules/security_misconfig.py @@ -0,0 +1,86 @@ +# A05:2021 – Security Misconfiguration +# +# It flags risky configuration patterns commonly seen in Python, JS, and YAML: +# 1) Debug modes enabled (Django DEBUG=True, Flask app.run(debug=True)). +# 2) Overly permissive hosts or CORS settings (ALLOWED_HOSTS=['*'], Access-Control-Allow-Origin: *). +# 3) Insecure cookie or transport flags (SECURE_... = False, SESSION_COOKIE_SECURE=False). +# 4) Hardcoded or default-like secrets in config contexts (SECRET_KEY='...', password='admin'). +# +# Function: +# - `check(code_lines, add_vulnerability)`: Scans lines and reports findings with context. + +import re + +DJANGO_DEBUG_RE = re.compile(r'\bDEBUG\s*=\s*True\b') +FLASK_DEBUG_RE = re.compile(r'\bapp\.run\s*\(\s*.*\bdebug\s*=\s*True\b', re.IGNORECASE) +DJANGO_ALLOWED_HOSTS_ANY_RE = re.compile(r'\bALLOWED_HOSTS\s*=\s*\[\s*[\'"]\*\s*[\'"]\s*\]', re.IGNORECASE) + +CORS_WILDCARD_RE = re.compile(r'(Access-Control-Allow-Origin\s*[:=]\s*[\'"]\*\s*[\'"])|("allowAllOrigins"\s*:\s*true)', re.IGNORECASE) +SECURE_FLAG_FALSE_RE = re.compile(r'\b(SECURE_[A-Z_]+|SESSION_COOKIE_SECURE|CSRF_COOKIE_SECURE)\s*=\s*False\b') +INSECURE_COOKIE_RE = re.compile(r'cookie\s*(secure|httpOnly)\s*[:=]\s*false', re.IGNORECASE) + +DEFAULTY_SECRET_RE = re.compile( + r'\b(SECRET_KEY|APP_SECRET|JWT_SECRET|API_KEY|TOKEN|PASSWORD)\s*[:=]\s*[\'"]([^\'"]+)[\'"]', re.IGNORECASE +) +OBVIOUS_DEFAULTS = {'admin', 'password', 'changeme', 'change_me', 'default', 'test', 'secret'} + +def check(code_lines, add_vulnerability): + for i, line in enumerate(code_lines): + # Debug modes + if DJANGO_DEBUG_RE.search(line): + add_vulnerability( + "A05: Security Misconfiguration", + f"Django DEBUG is enabled: {line.strip()}", + i + 1, + "HIGH", + "MEDIUM", + ) + if FLASK_DEBUG_RE.search(line): + 
def check(code_lines, add_vulnerability):
    """Report weak hashes and likely hardcoded secrets (OWASP A02:2021).

    Lines whose first non-blank character is ``#`` are skipped. Matching is
    substring-based, so results are heuristic.

    Args:
        code_lines: Lines of the scanned file.
        add_vulnerability: Callback invoked as
            ``(category, description, line_number, severity, confidence)``
            with a 1-based line number.
    """
    # Label matches the 2021 numbering used by this module's header; the report
    # sorts categories by this number.
    category = "A02: Cryptographic Failures"
    weak_hashes = ("md5", "sha1")
    sensitive_keywords = ("password", "passwd", "secret", "apikey", "api_key", "token")
    # Environment lookups and sha256 hashing are treated as safe usage.
    # os.getenv is the function form of an os.environ lookup.
    safe_markers = ("os.environ", "os.getenv", "hashlib.sha256")

    for lineno, line in enumerate(code_lines, start=1):
        stripped = line.strip()

        # Skip comments
        if stripped.startswith("#"):
            continue

        lowered = stripped.lower()

        # Weak crypto usage
        if any(name in lowered for name in weak_hashes):
            add_vulnerability(
                category,
                f"Weak hashing algorithm detected: {stripped}",
                lineno,
                "HIGH",
                "HIGH",
            )

        # Hardcoded secrets: a sensitive keyword together with an assignment
        if "=" in stripped and any(kw in lowered for kw in sensitive_keywords):
            if any(marker in stripped for marker in safe_markers):
                continue
            add_vulnerability(
                category,
                f"Potential hardcoded sensitive data: {stripped}",
                lineno,
                "HIGH",
                "MEDIUM",
            )
import re

CATEGORY = "A03: Injection"

# An assignment whose right-hand side starts with a quoted SQL verb.
SQL_ASSIGN_RE = re.compile(r"=\s*['\"]\s*(SELECT|INSERT|UPDATE|DELETE)", re.IGNORECASE)
# Simple "name = ..." target at the start of the line.
ASSIGN_TARGET_RE = re.compile(r"\s*(\w+)\s*=")


def check(code_lines, add_vulnerability):
    """Report SQL built by string concatenation and its later execution.

    A line is treated as a concatenated query when it assigns a string starting
    with SELECT/INSERT/UPDATE/DELETE and also contains ``+``. When the target is
    a plain variable name, any line containing ``execute(<name>)`` is reported
    as well.

    Args:
        code_lines: Lines of the scanned file.
        add_vulnerability: Callback invoked as
            ``(category, description, line_number, severity, confidence)``
            with a 1-based line number.
    """
    assigned_queries = {}  # variable name -> line number of its suspicious assignment

    for lineno, line in enumerate(code_lines, start=1):
        if SQL_ASSIGN_RE.search(line) and '+' in line:
            target = ASSIGN_TARGET_RE.match(line)
            if target:
                assigned_queries[target.group(1)] = lineno

            # Reported even when the target is not a plain name (e.g. self.q = ...).
            add_vulnerability(
                CATEGORY,
                f"SQL query created via string concatenation: {line.strip()}",
                lineno,
                "HIGH",
                "MEDIUM",
            )

        # Detect execution of those suspicious queries
        for var_name in assigned_queries:
            if f"execute({var_name})" in line:
                add_vulnerability(
                    CATEGORY,
                    f"Suspicious query passed to execute(): {line.strip()}",
                    lineno,
                    "HIGH",
                    "HIGH",
                )
import re

# e.g., flask==2.0.1, Django==1.11.29, django[argon2]==3.2
PIN_RE = re.compile(
    r'^\s*([A-Za-z0-9][A-Za-z0-9_\-]*)\s*(?:\[[^\]]*\])?\s*==\s*([A-Za-z0-9\.\-\+]+)\s*$'
)
# Start of an inline comment (whitespace then '#') or of an environment marker (';').
TRAILER_RE = re.compile(r'\s+#|;')

SUSPECT_PACKAGES = {"flask", "django"}  # expand as needed


def check(code_lines, add_vulnerability):
    """Report exact version pins of packages listed in ``SUSPECT_PACKAGES``.

    This is a placeholder rule: it does not judge whether the pinned version is
    outdated, it only surfaces the pin for manual review. Inline comments,
    environment markers and extras on the requirement line are tolerated.

    Args:
        code_lines: Lines of the scanned file.
        add_vulnerability: Callback invoked as
            ``(category, description, line_number, severity, confidence)``
            with a 1-based line number.
    """
    for lineno, line in enumerate(code_lines, start=1):
        stripped = line.strip()

        # Skip comments entirely
        if stripped.startswith("#"):
            continue

        # Keep only the requirement specifier itself.
        requirement = TRAILER_RE.split(stripped, maxsplit=1)[0]
        m = PIN_RE.match(requirement)
        if not m:
            continue

        pkg = m.group(1).lower()
        ver = m.group(2)

        if pkg in SUSPECT_PACKAGES:
            add_vulnerability(
                "A06: Vulnerable and Outdated Components",
                f"Dependency pin detected (manual review required): {pkg}=={ver}",
                lineno,
                "MEDIUM",
                "LOW",
            )