Auditlog: Add django-pghistory as audit log (optional for now) #13126
🟡 Please give this pull request extra attention during review.

This pull request introduces multiple security and reliability regressions. It allows IP spoofing in audit logs via direct use of X-Forwarded-For, uses icontains filters that can force full-table scans and cause a database DoS, and exposes audit information that permits username enumeration and disclosure of sensitive fields. It also adds operational risks and fragile code: a --force flag that silently clears audit logs, a hardcoded excluded-fields map that can leak secrets if it drifts out of date, broad exception handling that can silently disable audit triggers, an initializer that permits starting with missing migrations, and template changes that enable stored/reflected XSS by rendering unvalidated audit data and URLs.
🟡 Potential Cross-Site Scripting in dojo/templates/dojo/action_history.html

| Vulnerability | Potential Cross-Site Scripting |
| --- | --- |
| Description | File: dojo/templates/dojo/action_history.html — the patch renders user-controllable values into the HTML response in several places that can lead to XSS. 1) `<a href="{{ h.url }}" ... title="{{ h.url }}">`: the template inserts h.url directly into the href and title attributes. Django templates escape HTML special characters, but an attacker-controlled value such as "javascript:alert(1)" remains a valid URI in the href and executes when clicked; using user-provided URLs as link targets without validating the scheme is a reflected XSS vector. 2) h.pgh_data rendered through the pprint filter inside a `<pre>` element: if pprint returns HTML or marks its output safe (or does not HTML-escape), any HTML/JavaScript stored in pgh_data is rendered by the browser — even inside `<pre>`, content marked safe is interpreted as HTML, enabling stored XSS. 3) h.pgh_context rendered through pprint can expose stored data the same way. 4) h.changes passed through the custom action_log_entry filter and then Django's linebreaks filter: linebreaks wraps output in `<p>`/`<br>` tags and returns a SafeString, so if action_log_entry inserts unescaped user content or marks content safe, that HTML is rendered verbatim, making stored audit entries containing attacker-controlled data a likely stored XSS vector. These issues are introduced by the added pghistory_history and auditlog_history blocks in the referenced file. The root cause is rendering potentially attacker-controlled data (h.url, h.pgh_data, h.pgh_context, h.changes) without context-appropriate escaping or URL scheme validation. |
django-DefectDojo/dojo/templates/dojo/action_history.html
Lines 4 to 158 in d89dd5e
```html
{{ block.super }}
<div class="row">
<div class="col-md-12">
{% if pghistory_history %}
<div class="panel panel-default">
<div class="panel-heading tight">
<h4>
PostgreSQL History (pghistory)
<div class="dropdown pull-right">
<button id="show-pghistory-filters" data-toggle="collapse" data-target="#pghistory-filters" class="btn btn-primary toggle-filters" aria-label="Filters"> <i class="fa-solid fa-filter"></i> <i class="caret"></i> </button>
</div>
</h4>
</div>
<div id="pghistory-filters" class="is-filters panel-body collapse {% if pghistory_filter.form.has_changed %}in{% endif %}">
{% include "dojo/filter_snippet.html" with form=pghistory_filter.form %}
</div>
<div class="clearfix">
{% include "dojo/paging_snippet.html" with page=pghistory_history %}
</div>
<div class="table-responsive">
<table class="tablesorter-bootstrap table table-bordered table-condensed table-striped table-hover">
<tr>
<th>Timestamp</th>
<th>Label</th>
<th>Object</th>
<th>User</th>
<th>URL</th>
<th>IP Address</th>
<th>Data</th>
<th>Context</th>
<th>Object ID</th>
<th>Changes</th>
</tr>
{% for h in pghistory_history %}
<tr>
<td>{{ h.pgh_created_at }}</td>
<td>{{ h.pgh_label }}</td>
<td>{{ h.object_str|default:"N/A" }}</td>
<td>{{ h.user|default:"N/A" }}</td>
<td>
{% if h.url and h.url != "N/A" %}
<a href="{{ h.url }}" target="_blank" title="{{ h.url }}">{{ h.url|truncatechars:50 }}</a>
{% else %}
N/A
{% endif %}
</td>
<td>{{ h.remote_addr|default:"N/A" }}</td>
<td>
<details>
<summary style="cursor: pointer; color: #007bff; text-decoration: underline; font-weight: 500;">
<i class="fa fa-plus-circle" style="margin-right: 5px;"></i>View
</summary>
<pre>{{ h.pgh_data|pprint|default:"N/A" }}</pre>
</details>
</td>
<td>
{% if h.pgh_context %}
<details>
<summary style="cursor: pointer; color: #007bff; text-decoration: underline; font-weight: 500;">
<i class="fa fa-plus-circle" style="margin-right: 5px;"></i>View
</summary>
<pre>{{ h.pgh_context|pprint|default:"N/A" }}</pre>
</details>
{% else %}
<span class="text-muted">None</span>
{% endif %}
</td>
<td>{{ h.pgh_obj_id|default:"N/A" }}</td>
<td>
{% if h.pgh_label == "initial_import" %}
<span class="badge badge-info">Initial Import</span>
{% elif h.pgh_diff %}
<div>
{% for field, values in h.pgh_diff.items %}
<div style="margin-bottom: 4px;">
<strong>{{ field }}:</strong>
<span class="text-danger">
{% if values.0 %}
{{ values.0|truncatechars:50 }}
{% else %}
<em>empty</em>
{% endif %}
</span>
to
<span class="text-success">
{% if values.1 %}
{{ values.1|truncatechars:50 }}
{% else %}
<em>empty</em>
{% endif %}
</span>
</div>
{% endfor %}
</div>
{% else %}
<span class="text-muted">No Changes</span>
{% endif %}
</td>
</tr>
{% endfor %}
</table>
</div>
<div class="clearfix">
{% include "dojo/paging_snippet.html" with page=pghistory_history %}
</div>
</div>
{% endif %}
{% if auditlog_history %}
<div class="panel panel-default">
<div class="panel-heading tight">
<h4>
Audit Log History (django-auditlog)
<div class="dropdown pull-right">
<button id="show-filters" data-toggle="collapse" data-target="#the-filters" class="btn btn-primary toggle-filters" aria-label="Filters"> <i class="fa-solid fa-filter"></i> <i class="caret"></i> </button>
</div>
</h4>
</div>
<div id="the-filters" class="is-filters panel-body collapse {% if log_entry_filter.form.has_changed %}in{% endif %}">
{% include "dojo/filter_snippet.html" with form=log_entry_filter.form %}
</div>
<div class="clearfix">
{% include "dojo/paging_snippet.html" with page=auditlog_history %}
</div>
<div class="table-responsive">
<table class="tablesorter-bootstrap table table-bordered table-condensed table-striped table-hover">
<tr>
<th>Action</th>
<th>Actor</th>
<th>Date/Time</th>
<th>Changes</th>
</tr>
{% for h in auditlog_history %}
<tr>
<td>{{ h }}</td>
<td>{{ h.actor }}</td>
<td>{{ h.timestamp }}</td>
<td>
{{ h.changes|action_log_entry|linebreaks}}
</td>
</tr>
{% endfor %}
</table>
</div>
<div class="clearfix">
{% include "dojo/paging_snippet.html" with page=auditlog_history %}
</div>
</div>
{% endif %}
{% if not pghistory_history and not auditlog_history %}
<p class="text-center">No update history found for this object</p>
{% endif %}
</div>
</div>
```
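One way to address the javascript:-URI vector is a scheme allow-list. The helper below is a sketch, not part of the PR; `safe_audit_url` is a hypothetical name, and in the project it would be registered as a Django template filter and applied to h.url before it reaches the href attribute.

```python
from urllib.parse import urlparse

# Schemes considered safe for audit-log links; "" covers relative URLs.
ALLOWED_SCHEMES = {"http", "https", ""}


def safe_audit_url(value):
    """Return the URL unchanged only when its scheme is http(s) or relative;
    anything else (javascript:, data:, vbscript:, ...) is replaced with "#"."""
    try:
        scheme = urlparse(str(value)).scheme.lower()
    except ValueError:
        return "#"
    return value if scheme in ALLOWED_SCHEMES else "#"
```

Applied in the template as `{{ h.url|safe_audit_url }}` (after registering the filter), this neutralizes clickable script URIs while leaving ordinary links intact.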
Information Disclosure: User Enumeration via Audit Log Fallback Logic in dojo/engagement/signals.py
| Vulnerability | Information Disclosure: User Enumeration via Audit Log Fallback Logic |
| --- | --- |
| Description | The action_history view, accessible to users with Product_View or Engagement_View permissions, displays audit events. When an object (such as an Engagement, Product, or Finding Group) is deleted, the system attempts to identify the deleting user from pghistory or django-auditlog. If a user is found, their username is explicitly included in the audit description displayed in the action_history view. This allows low-privileged users to enumerate the usernames of users (including potentially higher-privileged ones) who performed deletion actions on objects they have view access to. |
django-DefectDojo/dojo/engagement/signals.py
Lines 47 to 84 in d89dd5e
```python
    with contextlib.suppress(sender.DoesNotExist, Product.DoesNotExist):
        if instance == origin:
            description = _('The engagement "%(name)s" was deleted') % {"name": instance.name}
            user = None
            if settings.ENABLE_AUDITLOG:
                # First try to find deletion author in pghistory events
                # Look for delete events for this specific engagement instance
                pghistory_delete_events = DojoEvents.objects.filter(
                    pgh_obj_model="dojo.Engagement",
                    pgh_obj_id=instance.id,
                    pgh_label="delete",
                ).order_by("-pgh_created_at")
                if pghistory_delete_events.exists():
                    latest_delete = pghistory_delete_events.first()
                    # Extract user from pghistory context
                    if latest_delete.user:
                        User = get_user_model()
                        with contextlib.suppress(User.DoesNotExist):
                            user = User.objects.get(id=latest_delete.user)
                # Fall back to django-auditlog if no user found in pghistory
                if not user:
                    if le := LogEntry.objects.filter(
                        action=LogEntry.Action.DELETE,
                        content_type=ContentType.objects.get(app_label="dojo", model="engagement"),
                        object_id=instance.id,
                    ).order_by("-id").first():
                        user = le.actor
            # Update description with user if found
            if user:
                description = _('The engagement "%(name)s" was deleted by %(user)s') % {
                    "name": instance.name, "user": user}
            create_notification(event="engagement_deleted",  # template does not exists, it will default to "other" but this event name needs to stay because of unit testing
                                title=_("Deletion of %(name)s") % {"name": instance.name},
                                description=description,
```
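One possible mitigation, sketched with hypothetical names (neither the function nor the `viewer_may_see_actor` flag exists in the PR): interpolate the actor into the notification text only when the audience is entitled to see user details, and keep the generic message otherwise.

```python
def deletion_description(name, actor, viewer_may_see_actor):
    """Build the deletion notification text. The actor's username is
    included only when the recipients are allowed to see user details
    (a hypothetical permission check)."""
    if actor and viewer_may_see_actor:
        return f'The engagement "{name}" was deleted by {actor}'
    return f'The engagement "{name}" was deleted'
```

The same gate would apply to any view that renders the stored description to low-privileged users.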
IP Address Spoofing in Audit Logs in dojo/middleware.py
| Vulnerability | IP Address Spoofing in Audit Logs |
| --- | --- |
| Description | The PgHistoryMiddleware in dojo/middleware.py directly uses the HTTP_X_FORWARDED_FOR header to determine the client IP address for audit logging. This header is easily spoofable: the code explicitly takes the first IP from the comma-separated list, which is exactly the part an attacker controls. Without proper configuration of USE_X_FORWARDED_HOST and FORWARDED_HOST_ALLOWED_IPS in Django settings, or a trusted proxy that sanitizes this header, an attacker can set it to an arbitrary IP address and falsify their origin in the audit logs. This undermines the integrity and reliability of the audit trail, as the logs can be manipulated to obscure an attacker's true source IP. |
django-DefectDojo/dojo/middleware.py
Lines 192 to 217 in d89dd5e
```python
        return self.get_response(request)


class PgHistoryMiddleware(pghistory.middleware.HistoryMiddleware):
    """
    Custom pghistory middleware for DefectDojo that extends the built-in HistoryMiddleware
    to add remote_addr context following the pattern from:
    https://django-pghistory.readthedocs.io/en/3.8.1/context/#middleware
    """

    def get_context(self, request):
        context = super().get_context(request)
        # Add remote address with proxy support
        remote_addr = request.META.get("HTTP_X_FORWARDED_FOR")
        # Get the first IP if there are multiple (proxy chain), or fall back to REMOTE_ADDR
        remote_addr = remote_addr.split(",")[0].strip() if remote_addr else request.META.get("REMOTE_ADDR")
        context["remote_addr"] = remote_addr
        return context


class LongRunningRequestAlertMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
```
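A hardened variant is sketched below outside Django for clarity; the `trusted_proxy_count` knob is an assumption, not an existing DefectDojo setting. The idea: X-Forwarded-For is honoured only when the deployment declares how many trusted proxies sit in front of the app, and only the entries those proxies appended are used, since everything before them is attacker-controlled.

```python
def client_ip(meta, trusted_proxy_count=0):
    """Resolve the client IP for audit logging.

    With no declared trusted proxies, use REMOTE_ADDR: it comes from the
    TCP connection and cannot be forged by an external client. With N
    trusted proxies, the last N entries of X-Forwarded-For were appended
    by infrastructure we control, so hops[-N] is the address our first
    trusted proxy actually saw.
    """
    xff = meta.get("HTTP_X_FORWARDED_FOR", "")
    hops = [h.strip() for h in xff.split(",") if h.strip()]
    if trusted_proxy_count and len(hops) >= trusted_proxy_count:
        return hops[-trusted_proxy_count]
    return meta.get("REMOTE_ADDR", "")
```

With one trusted proxy, a request spoofing `X-Forwarded-For: 6.6.6.6` still logs the address the proxy appended, not the attacker's chosen value.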
Denial-of-Service via inefficient icontains lookup on pghistory_context in dojo/filters.py
| Vulnerability | Denial-of-Service via inefficient icontains lookup on pghistory_context |
| --- | --- |
| Description | The PgHistoryFilter in dojo/filters.py uses icontains lookups for remote_addr and pgh_diff. In PostgreSQL, icontains translates to a LIKE '%value%' query. Standard B-tree indexes, including the expression indexes created on (metadata->>'remote_addr') in dojo/db_migrations/0244_pghistory_indices.py, cannot serve LIKE '%value%' queries: B-trees are optimized for prefix matches (LIKE 'value%') and exact matches, not arbitrary substring searches. Similarly, the GIN index on the metadata JSONB column (ON "pghistory_context" USING GIN ("metadata")) is designed for JSONB-specific operators (@>, ?, ?\|, ?&) that check containment or key existence; it does not accelerate LIKE '%value%' on text values extracted from the document. A filter using icontains on remote_addr or pgh_diff therefore likely forces a full table scan of pghistory_context (or of the event table joined against it to obtain remote_addr and pgh_diff from metadata). With a large volume of audit entries, such scans can consume enough CPU and I/O to cause a denial of service for the database and the application. |
django-DefectDojo/dojo/filters.py
Lines 3488 to 3554 in d89dd5e
```python
}


class PgHistoryFilter(DojoFilter):
    """
    Filter for django-pghistory audit entries.

    This filter works with pghistory event tables that have:
    - pgh_created_at: timestamp of the event
    - pgh_label: event type (insert/update/delete)
    - user: user ID from context
    - url: URL from context
    - remote_addr: IP address from context
    """

    # Filter by event creation time (equivalent to auditlog timestamp)
    pgh_created_at = DateRangeFilter(field_name="pgh_created_at", label="Timestamp")

    # Filter by event type/label
    pgh_label = ChoiceFilter(
        field_name="pgh_label",
        label="Event Type",
        choices=[
            ("", "All"),
            ("insert", "Insert"),
            ("update", "Update"),
            ("delete", "Delete"),
            ("initial_import", "Initial Import"),
        ],
    )

    # Filter by user (from context)
    user = ModelChoiceFilter(
        field_name="user",
        queryset=Dojo_User.objects.none(),
        label="User",
        empty_label="All Users",
    )

    # Filter by IP address (from context)
    remote_addr = CharFilter(
        field_name="remote_addr",
        lookup_expr="icontains",
        label="IP Address Contains",
    )

    # Filter by changes/diff field (JSON field containing what changed)
    pgh_diff = CharFilter(
        field_name="pgh_diff",
        lookup_expr="icontains",
        label="Changes Contains",
        help_text="Search for field names or values in the changes",
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.form.fields["user"].queryset = get_authorized_users(Permissions.Product_View)

    class Meta:
        fields = ["pgh_created_at", "pgh_label", "user", "url", "remote_addr", "pgh_diff"]
        exclude = []


class ProductTypeFilter(DojoFilter):
    name = CharFilter(lookup_expr="icontains")
```
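The index-usage claim can be illustrated in miniature with two hypothetical helpers: an ordered structure (the in-memory analogue of a B-tree) serves a prefix query by seeking once and walking forward over only the matching range, while a substring query must touch every row no matter what indexes exist.

```python
import bisect


def prefix_scan(sorted_values, prefix):
    """LIKE 'value%' analogue: binary-search to the first candidate, then
    walk forward until values stop matching the prefix."""
    start = bisect.bisect_left(sorted_values, prefix)
    matches = []
    for v in sorted_values[start:]:
        if not v.startswith(prefix):
            break  # sorted order guarantees no later value can match
        matches.append(v)
    return matches


def substring_scan(values, needle):
    """LIKE '%value%' analogue: ordering gives no shortcut, so every
    row must be examined — the behaviour icontains induces here."""
    return [v for v in values if needle in v]
```

This is why switching the filter to an exact or prefix lookup (or a JSONB containment query the GIN index can serve) removes the full-scan risk.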
Destructive Operation with --force option bypasses critical confirmation in dojo/management/commands/pghistory_clear.py
| Vulnerability | Destructive Operation with --force option bypasses critical confirmation |
| --- | --- |
| Description | The pghistory_clear.py management command includes a --force flag that bypasses the interactive confirmation prompt before a destructive operation (truncating or dropping the pghistory audit log tables). An attacker who has gained the ability to execute Django management commands could use this flag to silently and irreversibly delete audit log data, covering their tracks without any user interaction. |
django-DefectDojo/dojo/management/commands/pghistory_clear.py
Lines 1 to 206 in d89dd5e
""" | |
Management command to clear all pghistory Event tables. | |
This command removes all historical event data from django-pghistory tables. | |
Use with caution as this operation is irreversible. It's meant to be used only during development/testing. | |
""" | |
import logging | |
from django.apps import apps | |
from django.conf import settings | |
from django.core.management.base import BaseCommand | |
from django.db import connection, transaction | |
logger = logging.getLogger(__name__) | |
class Command(BaseCommand): | |
help = "Clear all pghistory Event tables" | |
def add_arguments(self, parser): | |
parser.add_argument( | |
"--dry-run", | |
action="store_true", | |
help="Show what would be cleared without actually clearing", | |
) | |
parser.add_argument( | |
"--force", | |
action="store_true", | |
help="Skip confirmation prompt (use with caution)", | |
) | |
parser.add_argument( | |
"--drop", | |
action="store_true", | |
help="Drop tables entirely instead of truncating (EXTREMELY DESTRUCTIVE)", | |
) | |
def handle(self, *args, **options): | |
if not settings.ENABLE_AUDITLOG or settings.AUDITLOG_TYPE != "django-pghistory": | |
self.stdout.write( | |
self.style.WARNING( | |
"pghistory is not enabled. Set DD_ENABLE_AUDITLOG=True and " | |
"DD_AUDITLOG_TYPE=django-pghistory", | |
), | |
) | |
return | |
# All pghistory Event tables based on tracked models | |
event_tables = [ | |
"Cred_UserEvent", | |
"Dojo_UserEvent", | |
"EndpointEvent", | |
"EngagementEvent", | |
"Finding_GroupEvent", | |
"Finding_TemplateEvent", | |
"FindingEvent", | |
"Notification_WebhooksEvent", | |
"Product_TypeEvent", | |
"ProductEvent", | |
"Risk_AcceptanceEvent", | |
"TestEvent", | |
] | |
dry_run = options["dry_run"] | |
force = options["force"] | |
drop_tables = options["drop"] | |
if dry_run: | |
self.stdout.write( | |
self.style.WARNING("DRY RUN MODE - No data will be cleared"), | |
) | |
total_records = 0 | |
table_counts = {} | |
# First, count all records | |
self.stdout.write("Analyzing pghistory Event tables...") | |
for table_name in event_tables: | |
try: | |
EventModel = apps.get_model("dojo", table_name) | |
count = EventModel.objects.count() | |
table_counts[table_name] = count | |
total_records += count | |
if count > 0: | |
self.stdout.write(f" {table_name}: {count:,} records") | |
else: | |
self.stdout.write(f" {table_name}: empty") | |
except LookupError: | |
self.stdout.write( | |
self.style.WARNING(f" {table_name}: table not found (skipping)"), | |
) | |
continue | |
except Exception as e: | |
self.stdout.write( | |
self.style.ERROR(f" {table_name}: error counting records - {e}"), | |
) | |
continue | |
if total_records == 0: | |
self.stdout.write( | |
self.style.SUCCESS("No pghistory records found. Nothing to clear."), | |
) | |
return | |
self.stdout.write(f"\nTotal records to clear: {total_records:,}") | |
if dry_run: | |
operation = "drop" if drop_tables else "clear" | |
self.stdout.write( | |
self.style.SUCCESS( | |
f"\nDRY RUN COMPLETE: Would {operation} {total_records:,} records " | |
f"from {len([t for t in table_counts.values() if t > 0])} tables", | |
), | |
) | |
return | |
# Confirmation prompt | |
if not force: | |
if drop_tables: | |
self.stdout.write( | |
self.style.ERROR( | |
f"\n🚨 EXTREMELY DESTRUCTIVE WARNING: This will DROP {len([t for t in table_counts.values() if t > 0])} " | |
f"pghistory Event tables entirely, deleting {total_records:,} records and the table structure! " | |
"You will need to recreate tables and run migrations to restore them!", | |
), | |
) | |
else: | |
self.stdout.write( | |
self.style.WARNING( | |
f"\n⚠️ WARNING: This will permanently delete {total_records:,} " | |
"pghistory records. This operation cannot be undone!", | |
), | |
) | |
operation_type = "DROP TABLES" if drop_tables else "truncate tables" | |
confirm = input(f"Are you sure you want to {operation_type}? Type 'yes' to continue: ") | |
if confirm.lower() != "yes": | |
self.stdout.write(self.style.ERROR("Operation cancelled.")) | |
return | |
# Clear the tables using TRUNCATE or DROP | |
operation_verb = "Dropping" if drop_tables else "Truncating" | |
self.stdout.write(f"\n{operation_verb} pghistory Event tables...") | |
cleared_records = 0 | |
cleared_tables = 0 | |
for table_name in event_tables: | |
if table_counts.get(table_name, 0) == 0: | |
continue # Skip empty tables | |
try: | |
EventModel = apps.get_model("dojo", table_name) | |
# Use raw SQL TRUNCATE or DROP for better performance on large tables | |
with transaction.atomic(): | |
count = table_counts.get(table_name, 0) | |
if count > 0: | |
# Get the actual database table name | |
db_table = EventModel._meta.db_table | |
with connection.cursor() as cursor: | |
if drop_tables: | |
# DROP TABLE - completely removes the table structure | |
cursor.execute(f'DROP TABLE IF EXISTS "{db_table}" CASCADE') | |
operation_past = "Dropped" | |
else: | |
# TRUNCATE TABLE - removes all data but keeps table structure | |
cursor.execute(f'TRUNCATE TABLE "{db_table}" RESTART IDENTITY CASCADE') | |
operation_past = "Truncated" | |
cleared_records += count | |
cleared_tables += 1 | |
self.stdout.write( | |
self.style.SUCCESS(f" ✓ {operation_past} {table_name}: {count:,} records"), | |
) | |
except LookupError: | |
# Already handled in counting phase | |
continue | |
except Exception as e: | |
operation_verb_lower = "drop" if drop_tables else "truncate" | |
self.stdout.write( | |
self.style.ERROR(f" ✗ Failed to {operation_verb_lower} {table_name}: {e}"), | |
) | |
logger.error(f"Error {operation_verb_lower}ing {table_name}: {e}") | |
# Final success message | |
if drop_tables: | |
self.stdout.write( | |
self.style.SUCCESS( | |
f"\n🎉 DROP COMPLETE: Dropped {cleared_tables} tables with {cleared_records:,} records", | |
), | |
) | |
self.stdout.write( | |
self.style.WARNING( | |
"⚠️ Remember to run migrations to recreate the dropped tables!", | |
), | |
) | |
else: | |
self.stdout.write( | |
self.style.SUCCESS( | |
f"\n🎉 CLEARING COMPLETE: Cleared {cleared_records:,} records " | |
f"from {cleared_tables} tables", | |
), | |
) |
Information Disclosure via Raw Audit Data in dojo/templates/dojo/action_history.html
| Vulnerability | Information Disclosure via Raw Audit Data |
| --- | --- |
| Description | The action_history.html template directly renders the raw pgh_data and pgh_context fields of django-pghistory events. pgh_data contains a snapshot of all model fields (unless explicitly excluded), and pgh_context contains request details such as remote_addr and url. This can expose sensitive information (internal notes, PII such as IP addresses, or sensitive GET parameters embedded in URLs) to users with audit-history access, even when they have no direct access to those fields on the live object. |
django-DefectDojo/dojo/templates/dojo/action_history.html
Lines 4 to 158 in d89dd5e
Application Starts with Incomplete Database Schema in docker/entrypoint-initializer.sh
| Vulnerability | Application Starts with Incomplete Database Schema |
| --- | --- |
| Description | The entrypoint-initializer.sh script has been modified so the application continues to start even when Django detects missing database migrations, turning a critical failure condition into a warning. Running with an inconsistent schema can lead to unpredictable behavior, data-integrity issues, and security gaps if security-relevant model changes (e.g., new access-control fields or validation constraints) are not applied to the database. While the change aims to improve resilience, it introduces the risk of operating in an undefined state where security controls may not be fully enforced. |
django-DefectDojo/docker/entrypoint-initializer.sh
Lines 121 to 145 in d89dd5e
```bash
great documentation thoroughly:
https://docs.djangoproject.com/en/5.0/topics/migrations/

This is now a WARNING and the container will continue to start.
However, you should create the necessary migrations as soon as possible using:
docker compose exec uwsgi bash -c 'python manage.py makemigrations -v2'
********************************************************************************
EOF
    echo "WARNING: Continuing startup despite missing migrations..."
}

echo "Migrating"
python3 manage.py migrate

echo "Configuring pghistory triggers based on audit settings"
cat <<EOD | python3 manage.py shell
from dojo.auditlog import configure_pghistory_triggers
configure_pghistory_triggers()
EOD

echo "Admin user: ${DD_ADMIN_USER}"
ADMIN_EXISTS=$(echo "SELECT * from auth_user;" | python manage.py dbshell | grep "${DD_ADMIN_USER}" || true)
# Abort if the admin user already exists, instead of giving a new fake password that won't work
```
Potential for Information Disclosure of Sensitive Data via Audit Logs in dojo/management/commands/pghistory_backfill.py
| Vulnerability | Potential for Information Disclosure of Sensitive Data via Audit Logs |
| --- | --- |
| Description | The pghistory_backfill.py command uses a manually maintained, hardcoded excluded_fields_map dictionary to keep sensitive fields out of the audit history during a backfill. The currently identified sensitive fields (password, header_name, header_value) are correctly excluded in both the backfill script and the live django-pghistory configuration, but this manual approach is fragile: if a new sensitive field is added to a model, a developer may forget to update the exclusion list in pghistory_backfill.py (and potentially dojo/auditlog.py), and sensitive data would be inadvertently written to the audit log tables. Those tables may have different access controls and longer retention periods, increasing the risk of disclosure. |
django-DefectDojo/dojo/management/commands/pghistory_backfill.py
Lines 1 to 265 in d89dd5e
```python
"""
Management command to backfill existing data into django-pghistory.

This command creates initial snapshots for all existing records in tracked models.
"""
import logging

from django.apps import apps
from django.conf import settings
from django.core.management.base import BaseCommand
from django.utils import timezone

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = "Backfill existing data into django-pghistory"

    def add_arguments(self, parser):
        parser.add_argument(
            "--model",
            type=str,
            help='Specific model to backfill (e.g., "Finding", "Product")',
        )
        parser.add_argument(
            "--batch-size",
            type=int,
            default=1000,
            help="Number of records to process in each batch (default: 1000)",
        )
        parser.add_argument(
            "--dry-run",
            action="store_true",
            help="Show what would be done without actually creating events",
        )

    def get_excluded_fields(self, model_name):
        """Get the list of excluded fields for a specific model from pghistory configuration."""
        # Define excluded fields for each model (matching auditlog.py)
        excluded_fields_map = {
            "Dojo_User": ["password"],
            "Product": ["updated"],  # This is the key change
            "Cred_User": ["password"],
            "Notification_Webhooks": ["header_name", "header_value"],
        }
        return excluded_fields_map.get(model_name, [])

    def handle(self, *args, **options):
        if not settings.ENABLE_AUDITLOG or settings.AUDITLOG_TYPE != "django-pghistory":
            self.stdout.write(
                self.style.WARNING(
                    "pghistory is not enabled. Set DD_ENABLE_AUDITLOG=True and "
                    "DD_AUDITLOG_TYPE=django-pghistory",
                ),
            )
            return

        # Models that are tracked by pghistory
        tracked_models = [
            "Dojo_User", "Endpoint", "Engagement", "Finding", "Finding_Group",
            "Product_Type", "Product", "Test", "Risk_Acceptance",
            "Finding_Template", "Cred_User", "Notification_Webhooks",
        ]

        specific_model = options.get("model")
        if specific_model:
            if specific_model not in tracked_models:
                self.stdout.write(
                    self.style.ERROR(
                        f'Model "{specific_model}" is not tracked by pghistory. '
                        f'Available models: {", ".join(tracked_models)}',
                    ),
                )
                return
            tracked_models = [specific_model]

        batch_size = options["batch_size"]
        dry_run = options["dry_run"]

        if dry_run:
            self.stdout.write(
                self.style.WARNING("DRY RUN MODE - No events will be created"),
            )

        total_processed = 0
        self.stdout.write(f"Starting backfill for {len(tracked_models)} model(s)...")

        for model_name in tracked_models:
            self.stdout.write(f"\nProcessing {model_name}...")
            try:
                # Get the Django model
                Model = apps.get_model("dojo", model_name)

                # Get total count
                total_count = Model.objects.count()
                if total_count == 0:
                    self.stdout.write(f"  No records found for {model_name}")
                    continue

                self.stdout.write(f"  Found {total_count:,} records")

                # Get the corresponding Event model for bulk operations
                event_table_name = f"{model_name}Event"
                try:
                    EventModel = apps.get_model("dojo", event_table_name)
                except LookupError:
                    self.stdout.write(
                        self.style.ERROR(
                            f"  Event model {event_table_name} not found. "
                            f"Is {model_name} tracked by pghistory?",
                        ),
                    )
                    continue

                # Get IDs of records that already have initial_import events
                existing_initial_import_ids = set(
                    EventModel.objects.filter(pgh_label="initial_import").values_list("pgh_obj_id", flat=True),
                )

                # Filter to only get records that don't have initial_import events
                records_needing_backfill = Model.objects.exclude(id__in=existing_initial_import_ids)
                backfill_count = records_needing_backfill.count()
                existing_count = len(existing_initial_import_ids)

                # Log the breakdown
                self.stdout.write(f"  Records with initial_import events: {existing_count:,}")
                self.stdout.write(f"  Records needing initial_import events: {backfill_count:,}")

                if backfill_count == 0:
                    self.stdout.write(
                        self.style.SUCCESS(f"  ✓ All {total_count:,} records already have initial_import events"),
                    )
                    processed = total_count
                    continue

                if dry_run:
                    self.stdout.write(f"  Would process {backfill_count:,} records in batches of {batch_size:,}...")
                else:
                    self.stdout.write(f"  Processing {backfill_count:,} records in batches of {batch_size:,}...")

                # Process records one by one and bulk insert every batch_size records
                processed = 0
                event_records = []
                failed_records = []

                for instance in records_needing_backfill.iterator():
                    try:
                        # Create event record with all model fields
                        event_data = {}

                        # Get excluded fields for this model from pghistory configuration
                        excluded_fields = self.get_excluded_fields(model_name)

                        # Copy all fields from the instance to event_data, except excluded ones
                        for field in instance._meta.fields:
                            field_name = field.name
                            if field_name not in excluded_fields:
                                field_value = getattr(instance, field_name)
                                event_data[field_name] = field_value

                        # Explicitly preserve created timestamp from the original instance
                        # Only if not excluded and exists
                        if hasattr(instance, "created") and instance.created and "created" not in excluded_fields:
                            event_data["created"] = instance.created
                        # Note: We don't preserve 'updated' for Product since it's excluded

                        # Add pghistory-specific fields
                        event_data.update({
                            "pgh_label": "initial_import",
                            "pgh_obj": instance,  # ForeignKey to the original object
                            "pgh_context": None,  # No context for backfilled events
                        })

                        # Set pgh_created_at to current time (this is for the event creation time)
                        # The created/updated fields above contain the original instance timestamps
                        event_data["pgh_created_at"] = timezone.now()

                        event_records.append(EventModel(**event_data))
                    except Exception as e:
                        failed_records.append(instance.id)
                        logger.error(
                            f"Failed to prepare event for {model_name} ID {instance.id}: {e}",
                        )

                    # Bulk create when we hit batch_size records
                    if len(event_records) >= batch_size:
                        if not dry_run and event_records:
                            try:
                                attempted = len(event_records)
                                created_objects = EventModel.objects.bulk_create(event_records, batch_size=batch_size)
                                actually_created = len(created_objects) if created_objects else 0
                                processed += actually_created
                                if actually_created != attempted:
                                    logger.warning(
                                        f"bulk_create for {model_name}: attempted {attempted}, "
                                        f"actually created {actually_created} ({attempted - actually_created} skipped)",
                                    )
                            except Exception as e:
                                logger.error(f"Failed to bulk create events for {model_name}: {e}")
                                raise
                        elif dry_run:
                            processed += len(event_records)

                        event_records = []  # Reset for next batch

                        # Progress update
                        progress = (processed / backfill_count) * 100
                        self.stdout.write(f"  Processed {processed:,}/{backfill_count:,} records needing backfill ({progress:.1f}%)")

                # Handle remaining records
                if event_records:
                    if not dry_run:
                        try:
                            attempted = len(event_records)
                            created_objects = EventModel.objects.bulk_create(event_records, batch_size=batch_size)
                            actually_created = len(created_objects) if created_objects else 0
                            processed += actually_created
                            if actually_created != attempted:
                                logger.warning(
                                    f"bulk_create final batch for {model_name}: attempted {attempted}, "
                                    f"actually created {actually_created} ({attempted - actually_created} skipped)",
                                )
                        except Exception as e:
                            logger.error(f"Failed to bulk create final batch for {model_name}: {e}")
                            raise
                    else:
                        processed += len(event_records)

                # Final progress update
                if backfill_count > 0:
                    progress = (processed / backfill_count) * 100
                    self.stdout.write(f"  Processed {processed:,}/{backfill_count:,} records needing backfill ({progress:.1f}%)")

                total_processed += processed

                # Show completion summary
                if failed_records:
                    self.stdout.write(
                        self.style.WARNING(
                            f"  ⚠ Completed {model_name}: {processed:,} records processed, "
                            f"{len(failed_records)} records failed",
                        ),
                    )
                else:
                    self.stdout.write(
                        self.style.SUCCESS(
                            f"  ✓ Completed {model_name}: {processed:,} records",
                        ),
                    )
            except Exception as e:
                self.stdout.write(
                    self.style.ERROR(f"  ✗ Failed to process {model_name}: {e}"),
                )
                logger.error(f"Error processing {model_name}: {e}")

        self.stdout.write(
            self.style.SUCCESS(
                f"\nBACKFILL COMPLETE: Processed {total_processed:,} records",
            ),
        )
```
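A lightweight way to guard against the exclusion-list drift described in this finding is to compare the backfill command's map against the exclusions used at pghistory registration time before running a backfill. The sketch below is hypothetical (not part of this PR); the field names are taken from the excerpt above:

```python
# Hypothetical drift check: the backfill command's exclusion map should stay in
# sync with the exclusions passed to pghistory.track() in dojo/auditlog.py.
PGHISTORY_EXCLUSIONS = {
    "Dojo_User": {"password"},
    "Cred_User": {"password"},
    "Notification_Webhooks": {"header_name", "header_value"},
}

BACKFILL_EXCLUSIONS = {
    "Dojo_User": {"password"},
    "Product": {"updated"},
    "Cred_User": {"password"},
    "Notification_Webhooks": {"header_name", "header_value"},
}

def exclusion_drift(live, backfill):
    """Return model names whose exclusion sets differ between the two maps."""
    return sorted(
        name
        for name in set(live) | set(backfill)
        if live.get(name, set()) != backfill.get(name, set())
    )

# "Product" differs on purpose: the backfill additionally drops the noisy
# 'updated' timestamp, which is not a sensitive field.
print(exclusion_drift(PGHISTORY_EXCLUSIONS, BACKFILL_EXCLUSIONS))  # ['Product']
```

Wiring such a check into CI (or the command's `handle()`) would turn a forgotten exclusion into a hard failure instead of a silent leak.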
Incomplete Error Handling for Audit Trigger Failures in dojo/auditlog.py
Vulnerability | Incomplete Error Handling for Audit Trigger Failures
---|---
Description | The `enable_django_pghistory`, `disable_django_pghistory`, and `configure_pghistory_triggers` functions in `dojo/auditlog.py` use broad `try...except Exception` blocks when calling pgtrigger commands. If pgtrigger fails to enable or disable its database triggers (e.g., due to database connection issues, permissions, or misconfiguration), the application will log a warning but continue to start and operate. This leads to a silent failure of the django-pghistory audit logging system, resulting in a critical loss of security visibility, as audit events will not be recorded despite the system appearing to function normally.
django-DefectDojo/dojo/auditlog.py
Lines 1 to 388 in d89dd5e
```python
"""
Audit logging configuration for DefectDojo.

This module handles conditional registration of models with either django-auditlog
or django-pghistory based on the DD_AUDITLOG_TYPE setting.
"""
import contextlib
import logging
import sys

import pghistory
from django.conf import settings
from django.core.management import call_command
from django.db import models

logger = logging.getLogger(__name__)


def enable_django_auditlog():
    """Enable django-auditlog by registering models."""
    # Import inside function to avoid AppRegistryNotReady errors
    from auditlog.registry import auditlog  # noqa: PLC0415
    from dojo.models import (  # noqa: PLC0415
        Cred_User,
        Dojo_User,
        Endpoint,
        Engagement,
        Finding,
        Finding_Group,
        Finding_Template,
        Notification_Webhooks,
        Product,
        Product_Type,
        Risk_Acceptance,
        Test,
    )

    logger.info("Enabling django-auditlog: Registering models")
    auditlog.register(Dojo_User, exclude_fields=["password"])
    auditlog.register(Endpoint)
    auditlog.register(Engagement)
    auditlog.register(Finding, m2m_fields={"reviewers"})
    auditlog.register(Finding_Group)
    auditlog.register(Product_Type)
    auditlog.register(Product)
    auditlog.register(Test)
    auditlog.register(Risk_Acceptance)
    auditlog.register(Finding_Template)
    auditlog.register(Cred_User, exclude_fields=["password"])
    auditlog.register(Notification_Webhooks, exclude_fields=["header_name", "header_value"])
    logger.info("Successfully enabled django-auditlog")


def disable_django_auditlog():
    """Disable django-auditlog by unregistering models."""
    # Import inside function to avoid AppRegistryNotReady errors
    from auditlog.registry import auditlog  # noqa: PLC0415
    from dojo.models import (  # noqa: PLC0415
        Cred_User,
        Dojo_User,
        Endpoint,
        Engagement,
        Finding,
        Finding_Group,
        Finding_Template,
        Notification_Webhooks,
        Product,
        Product_Type,
        Risk_Acceptance,
        Test,
    )

    # Only log during actual application startup, not during shell commands
    if "shell" not in sys.argv:
        logger.info("Django-auditlog disabled - unregistering models")
    # Unregister all models from auditlog
    models_to_unregister = [
        Dojo_User, Endpoint, Engagement, Finding, Finding_Group,
        Product_Type, Product, Test, Risk_Acceptance, Finding_Template,
        Cred_User, Notification_Webhooks,
    ]
    for model in models_to_unregister:
        with contextlib.suppress(Exception):
            # Model might not be registered, ignore the error
            auditlog.unregister(model)


def register_django_pghistory_models():
    """
    Register models with django-pghistory (always called to avoid migrations).

    Note: This function is always called regardless of audit logging settings because:
    1. Django migrations are generated based on model registration at import time
    2. If pghistory models are not registered, Django will try to create migrations
       to remove the pghistory tables when the models are not found
    3. This would cause migration conflicts and database inconsistencies
    4. By always registering the models, we ensure the database schema remains
       stable while controlling audit behavior through trigger enable/disable

    So we always register the models and make migrations for them.
    Then we control the enabling/disabling by enabling/disabling the underlying database
    triggers.
    """
    # Import models inside function to avoid AppRegistryNotReady errors
    from dojo.models import (  # noqa: PLC0415
        Cred_User,
        Dojo_User,
        Endpoint,
        Engagement,
        Finding,
        Finding_Group,
        Finding_Template,
        Notification_Webhooks,
        Product,
        Product_Type,
        Risk_Acceptance,
        Test,
    )

    # Only log during actual application startup, not during shell commands
    if "shell" not in sys.argv:
        logger.info("Registering models with django-pghistory")

    # Register models with pghistory for tracking changes
    # Using pghistory.track() as a decorator function (correct syntax)
    # The function returns a decorator that should be applied to the model class

    # Track Dojo_User with excluded fields
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        exclude=["password"],
        # add some indexes manually so we don't have to define a custom pghistory Event model with overridden fields.
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Dojo_User)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Endpoint)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Engagement)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Finding)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Finding_Group)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Product_Type)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Product)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Test)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Risk_Acceptance)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Finding_Template)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        exclude=["password"],
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Cred_User)
    pghistory.track(
        pghistory.InsertEvent(),
        pghistory.UpdateEvent(condition=pghistory.AnyChange(exclude_auto=True)),
        pghistory.DeleteEvent(),
        pghistory.ManualEvent(label="initial_import"),
        exclude=["header_name", "header_value"],
        meta={
            "indexes": [
                models.Index(fields=["pgh_created_at"]),
                models.Index(fields=["pgh_label"]),
                models.Index(fields=["pgh_context_id"]),
            ],
        },
    )(Notification_Webhooks)

    # Only log during actual application startup, not during shell commands
    if "shell" not in sys.argv:
        logger.info("Successfully registered models with django-pghistory")


def enable_django_pghistory():
    """Enable django-pghistory by enabling triggers."""
    logger.info("Enabling django-pghistory: Enabling triggers")
    # Enable pghistory triggers
    try:
        call_command("pgtrigger", "enable")
        logger.info("Successfully enabled pghistory triggers")
    except Exception as e:
        logger.warning(f"Failed to enable pgtrigger triggers: {e}")
        # Don't raise the exception as this shouldn't prevent Django from starting


def disable_django_pghistory():
    """Disable django-pghistory by disabling triggers."""
    logger.info("Disabling django-pghistory: Disabling triggers")
    try:
        call_command("pgtrigger", "disable")
        logger.info("Successfully disabled pghistory triggers")
    except Exception as e:
        logger.warning(f"Failed to disable pgtrigger triggers: {e}")
        # Don't raise the exception as this shouldn't prevent Django from starting


def configure_pghistory_triggers():
    """
    Configure pghistory triggers based on audit settings.

    This function should be called after Django startup and migrations to properly
    enable/disable pghistory triggers without database access warnings.
    """
    if not settings.ENABLE_AUDITLOG:
        logger.info("Audit logging disabled - disabling pghistory triggers")
        try:
            call_command("pgtrigger", "disable")
            logger.info("Successfully disabled pghistory triggers")
        except Exception as e:
            logger.error(f"Failed to disable pghistory triggers: {e}")
            raise
    elif settings.AUDITLOG_TYPE == "django-pghistory":
        try:
            call_command("pgtrigger", "enable")
            logger.info("Successfully enabled pghistory triggers")
        except Exception as e:
            logger.error(f"Failed to enable pghistory triggers: {e}")
            raise
    else:
        try:
            call_command("pgtrigger", "disable")
            logger.info("Successfully disabled pghistory triggers")
        except Exception as e:
            logger.error(f"Failed to disable pghistory triggers: {e}")
            raise


def configure_audit_system():
    """
    Configure the audit system based on settings.

    Note: This function only handles auditlog registration. pghistory model registration
    is handled in apps.py, and trigger management should be done via the
    configure_pghistory_triggers() function to avoid database access during initialization.
    """
    # Only log during actual application startup, not during shell commands
    log_enabled = "shell" not in sys.argv

    if not settings.ENABLE_AUDITLOG:
        if log_enabled:
            logger.info("Audit logging disabled")
        disable_django_auditlog()
        return

    if settings.AUDITLOG_TYPE == "django-auditlog":
        if log_enabled:
            logger.info("Configuring audit system: django-auditlog enabled")
        enable_django_auditlog()
    else:
        if log_enabled:
            logger.info("django-auditlog disabled (pghistory or other audit type selected)")
        disable_django_auditlog()
```
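One mitigation for the silent-failure risk flagged above is to fail fast when trigger management fails, rather than downgrading the error to a warning. A minimal, framework-free sketch; the `broken_call_command` stub below is hypothetical and only simulates a failing Django `call_command`:

```python
import logging

logger = logging.getLogger(__name__)

def enable_triggers_strict(call_command):
    """Enable audit triggers; propagate failures instead of swallowing them."""
    try:
        call_command("pgtrigger", "enable")
    except Exception:
        # Surfacing the error at startup makes a loss of audit coverage visible,
        # instead of letting the app run with auditing silently disabled.
        logger.exception("Failed to enable pgtrigger triggers")
        raise

def broken_call_command(*args):
    # Stub simulating a database that is unreachable at startup
    raise RuntimeError("could not connect to database")

try:
    enable_triggers_strict(broken_call_command)
except RuntimeError as exc:
    print(f"startup aborted: {exc}")  # startup aborted: could not connect to database
```

Whether aborting startup is acceptable is a deployment decision; a middle ground would be a health-check flag that monitoring can alert on.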
All finding details can be found in the DryRun Security Dashboard.
Superseded by #13169 on an upstream branch. |
Summary
As part of evaluating and improving import/reimport performance we have selected django-pghistory as the new auditlog for Defect Dojo.
It is entirely based on Postgres triggers to maintain a history/audit log. This makes it run faster and offers new features such as reverting a record to an old version, adding richer context to processes and events, finding events on related records, etc.
This PR doesn't introduce new features yet; it just adds `django-pghistory` as a replacement for `django-auditlog`.

EDIT:
This PR still allows using `django-auditlog` as the audit log. Initially I planned to leave the default at `django-auditlog` in this PR, but then we would not be able to run the unit tests with `django-pghistory` and vice versa. So I set the default to `django-pghistory` in this PR for now. We'll have to decide on a final approach before merging. The more I think of it, the more I think we should just go for a "big bang" release, as catering for both audit logs in the same codebase is expensive to maintain, even for a short period. The changes in this PR are in a branch in the main repo, so not in my fork.

In a future release `django-pghistory` will be the audit log implementation and `django-auditlog` will no longer be available for audit logging.

HowTo
To switch an instance over to `django-pghistory`, some steps are needed:
- Set `DD_AUDITLOG_TYPE` to `django-pghistory`.
- Run `docker compose exec -it uwsgi ./manage.py pghistory_backfill` to load the initial snapshot of data into `django-pghistory`. (This is NOT a data migration from `django-auditlog`, but an initial snapshot from the current data in Defect Dojo.)

In the future both steps will be part of a new release which will perform the backfill automatically.
Once switched over, you cannot switch back (unless you know what you're doing).
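The switch-over described above could look roughly like this in a docker compose deployment (a sketch; the compose service name `uwsgi` and where you persist the environment variable vary per installation):

```shell
# 1. Select the new audit log backend (normally set in your docker compose env file)
export DD_AUDITLOG_TYPE=django-pghistory
echo "Audit log backend: ${DD_AUDITLOG_TYPE}"

# 2. Take the initial snapshot of existing data (NOT a django-auditlog data migration);
#    run this once, after the containers are up with the new setting:
# docker compose exec -it uwsgi ./manage.py pghistory_backfill
```

Remember the warning above: once switched over, there is no supported path back.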
The action history pages will always display the data from both `django-pghistory` and `django-auditlog`. As these data formats are completely different, there are two tables on the action history page.

Some notes about the implementation
- `django-pghistory` works with database triggers; these are created as part of a migration regardless of which audit log is configured.
- Models are registered with (or unregistered from) `django-auditlog` depending on the chosen audit log type in settings.

Performance
I did a couple of test runs to compare both audit log types using the JFrog Unified Very Many sample scan that contains ~14000 findings. These test runs show a 20-30% speed improvement on my laptop running docker compose. In a (production) environment with an external database the difference might be bigger due to the increased latency.
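The latency side of this argument can be illustrated with a back-of-envelope calculation; the per-roundtrip latency below is an assumed figure for illustration, not a measurement from this PR:

```python
# An application-side audit log issues roughly one extra INSERT roundtrip per
# tracked change, while trigger-based logging piggybacks on the original statement.
findings = 14_000        # size of the JFrog sample scan mentioned above
latency_ms = 1.0         # assumed network latency per roundtrip to an external DB
extra_roundtrips = findings  # one audit INSERT per imported finding
overhead_s = extra_roundtrips * latency_ms / 1000
print(f"~{overhead_s:.0f}s of pure latency avoided")  # ~14s of pure latency avoided
```

With a local database the latency term shrinks towards zero, which matches the smaller (but still real) improvement seen under docker compose.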
`django-pghistory` runs inside the database, so far fewer network roundtrips are needed.

Scaling
`django-pghistory` documents two settings or design decisions that affect performance: https://django-pghistory.readthedocs.io/en/3.4.3/performance/

Row level vs Statement level triggers
Defect Dojo doesn't do (m)any bulk inserts/updates on the tracked models, so there's no benefit now in switching to statement-level triggers.
If needed we can switch in the future using a one-time schema-only migration.
Denormalizing Context
Context is stored in its own table and events have a foreign key relation to this table. If the Context is intensively used and updated during processes/requests and lots of parallel requests are happening that trigger pghistory events, there could be some contention on that Context table. To "solve" this we could choose to store the context in a column in the Event tables. I don't think the typical use-case in Defect Dojo would really benefit from this, it could be detrimental as it would require more storage and filtering/extracting data from the context might be slower.
If needed we can switch in the future using a one-time schema+data migration.
Indices
By default `django-pghistory` doesn't create many indices, only a couple that we don't need. Changes made:
- Remove indices on foreign key fields in the event tables (i.e. user_id, found_by, etc.). We don't query on those.
- Add indices on fields that we query on, like `pgh_created_at` and `pgh_label`.
- Add an index on the `obj_id` field used in joins/queries.
- Add indices on fields from the JSON context, i.e. `user`, `remote_addr`, `url`.
Next steps
When testing is successful and we want to make `django-pghistory` the default, we need to do the following in a follow-up PR:
- Automatically create the `initial_import` records (backfill)
- Remove `django-auditlog`
- Remove `django-auditlog` from MIDDLEWARE
- Remove `django-auditlog` from installed apps (not sure, it might need to remain to be able to load the LogEntry model)