diff --git a/.gitignore b/.gitignore index 635a55a..17e5bf5 100644 --- a/.gitignore +++ b/.gitignore @@ -156,3 +156,6 @@ cython_debug/ utilities/benchmarks/*.txt *.lprof *.mprof + +# Local databases +*.db diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b42db3..fc03d86 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,31 @@ # Changelog +## [Unreleased] +### Performance +- Parallel ACL processing using multiprocessing +- SQLite-based object cache for incremental processing - only processes new/changed objects on subsequent runs +- Optimized group membership resolution algorithm from polynomial to linear complexity using reverse index lookups + +### Added +- `--workers N` option to control number of parallel workers (default: ~90% of CPU cores) +- `--no-cache` flag to disable caching for a run +- `--cache-file` option for custom cache database path +- `--cache-stats` to display cache statistics and exit +- `--context-from` to load SID/domain context from previous run's cache for ACL resolution +- Real-time progress indicators with ETA and objects/second processing rate +- CPU core utilization info in logs +- Benchmarking utilities for performance testing + +### Fixed +- Crash on malformed certificate data (invalid base64 in `cacertificate` attribute) +- Crash when certificate chain building encounters null certificates +- Cache operations now properly check for null when `--no-cache` flag is used + +### Changed +- Refactored parser pipeline to use streaming input architecture with generator-based file reading +- Cache storage and commit operations now happen after JSON writing (doesn't block output) +- Improved type hints in ADDS class for better code clarity +- Enhanced debug logging messages for object properties + ## [0.4.20] - 12/16/2025 ### Fixes - Fix [#46](https://github.com/coffeegist/bofhound/issues/46) which caused well-known SIDs (groups) to be mising from bofhound output diff --git a/README.md b/README.md index c9a78e7..32d444e 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,39 @@ Parse Havoc loot logs (will change default input path to `/opt/havoc/data/loot`) bofhound --parser havoc --zip ``` +## Performance & Cache + +This branch introduces a SQLite-backed object cache and parallel-ready ACL processing to speed up repeated runs on large datasets. + +- Caching is enabled by default. When a cache exists, incremental filtering is automatic: objects already seen (by SID/DN) are skipped. +- To disable caching (and incremental behavior), pass `--no-cache` or remove the cache file. +- Cache location defaults to `bofhound_cache.db` in the output folder; override with `--cache-file PATH`. +- ACL processing accepts a worker count via `--workers` (auto-detected if unspecified). +- Inspect cache with `--cache-stats`. + +Examples (Windows PowerShell): + +``` +# First run: build cache and output JSON +.\venv\Scripts\bofhound.exe -i samples/ -p All --parser ldapsearch --output out + +# Incremental run: skip unchanged objects (automatic when cache exists) +.\venv\Scripts\bofhound.exe -i samples/ -p All --parser ldapsearch --output out + +# Disable cache entirely +.\venv\Scripts\bofhound.exe -i samples/ -p All --parser ldapsearch --output out --no-cache + +# Use a specific cache file and workers +.\venv\Scripts\bofhound.exe -i samples/ -p All --parser ldapsearch --output out --cache-file out\my_cache.db --workers 6 + +# Show cache stats and exit +.\venv\Scripts\bofhound.exe -i samples/ --parser ldapsearch --output out --cache-stats
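+ +# Illustrative only: process late data (e.g., certificates) separately, reusing SID/domain +# context from the earlier run's cache in out\ via --context-from (certs/ and out2 are example paths) +.\venv\Scripts\bofhound.exe -i certs/ --parser ldapsearch --output out2 --context-from out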
+``` + # ldapsearch Specify `*,ntsecuritydescriptor` as the attributes to return to be able to parse ACL edges. You are missing a ton of data if you don't include this in your `ldapsearch` queries! diff --git a/bofhound/__main__.py b/bofhound/__main__.py index c1dab1c..15460f7 100644 --- a/bofhound/__main__.py +++ b/bofhound/__main__.py @@ -11,6 +11,7 @@ from bofhound import console from bofhound.ad.helpers import PropertiesLevel from bofhound.logger import logger +from bofhound.cache import ObjectCache app = typer.Typer( add_completion=False, @@ -44,6 +45,31 @@ def main( help="Compress the JSON output files into a zip archive" ), quiet: bool = typer.Option(False, "--quiet", "-q", help="Suppress banner"), + no_cache: bool = typer.Option( + False, "--no-cache", + help="Disable object caching (cache is enabled by default)", + rich_help_panel="Performance Options" + ), + cache_file: str = typer.Option( + None, "--cache-file", + help="Custom path to cache database (default: bofhound_cache.db in output folder)", + rich_help_panel="Performance Options" + ), + context_from: str = typer.Option( + None, "--context-from", + help="Load SID/domain context from a previous run's cache file for ACL resolution. Use when processing late data (e.g., certificates) separately.", + rich_help_panel="Performance Options" + ), + workers: int = typer.Option( + None, "--workers", + help='Number of worker processes for parallel ACL parsing. Default: ~90%% of CPU cores (auto-detected). Check your system: python -c "import os; print(f\'CPU cores: {os.cpu_count()}\')"', + rich_help_panel="Performance Options" + ), + cache_stats: bool = typer.Option( + False, "--cache-stats", + help="Display cache statistics and exit", + rich_help_panel="Performance Options" + ), mythic_server: str = typer.Option( "127.0.0.1", "--mythic-server", help="IP or hostname of Mythic server to connect to", rich_help_panel="Mythic Options" @@ -73,9 +99,84 @@ def main( else: logging.getLogger().setLevel(logging.INFO) + # Handle cache stats display + if cache_stats: + # Determine cache file path + if not cache_file: + cache_file = f"{output_folder}/bofhound_cache.db" + + import os + if not os.path.exists(cache_file): + console.print(f"[yellow]Cache file not found: {cache_file}[/yellow]") + console.print("Run bofhound first to create the cache.") + sys.exit(0) + + try: + with ObjectCache(cache_file) as cache: + stats = cache.get_statistics() + console.print("\n[bold cyan]Cache Statistics[/bold cyan]") + console.print(f"Cache file: {cache_file}") + console.print(f"Version: {stats['cache_version']}") + console.print(f"Total objects: {stats['total_objects']:,}") + console.print("\n[bold]Objects by type:[/bold]") + for obj_type, count in sorted(stats['by_type'].items(), key=lambda x: x[1], reverse=True): + console.print(f" {obj_type}: {count:,}") + console.print(f"\nCache size: {stats['file_size_mb']} MB") + console.print(f"Created: {stats.get('created_at', 'Unknown')}") + console.print(f"Last accessed: {stats.get('last_accessed', 'Unknown')}") + except Exception as e: + console.print(f"[red]Error reading cache: {e}[/red]") + sys.exit(1) + return + if not quiet: banner() + # Auto-detect worker count if not specified + import os + cpu_count = os.cpu_count() or 4 + + if workers is None: + # Use ~90% of cores by default - leave headroom for OS and other processes + workers = max(1, int(cpu_count * 0.9)) + elif workers < 1: + console.print("[red]Error: --workers must be 
at least 1[/red]") + sys.exit(1) + elif workers > cpu_count: + console.print(f"[yellow]Warning: {workers} workers exceeds CPU count ({cpu_count})[/yellow]") + console.print("This may reduce performance. Recommended: --workers {}".format(int(cpu_count * 0.9))) + + # Log worker and CPU info in one line + worker_pct = int((workers / cpu_count) * 100) + logger.info(f"Using {workers}/{cpu_count} CPU cores ({worker_pct}%)") + + # Create output directory if it doesn't exist + os.makedirs(output_folder, exist_ok=True) + + # Initialize cache (enabled by default unless --no-cache) + cache = None + if not no_cache: + # Determine cache file path + if not cache_file: + cache_file = f"{output_folder}/bofhound_cache.db" + + cache_exists = os.path.exists(cache_file) + + try: + cache = ObjectCache(cache_file) + if cache_exists: + stats = cache.get_statistics() + logger.info(f"Found existing cache: {cache_file} ({stats['total_objects']:,} objects)") + logger.info("Only new/changed objects will be processed (incremental mode)") + logger.info("To disable caching, use --no-cache or delete/rename the cache file") + else: + logger.info(f"Creating new cache: {cache_file}") + except Exception as e: + logger.error(f"Failed to initialize cache: {e}") + sys.exit(1) + else: + logger.info("Caching disabled (--no-cache)") + # default to Cobalt logfile naming format data_source = None @@ -120,11 +221,37 @@ def main( ad = ADDS() broker = LocalBroker() pipeline = ParsingPipelineFactory.create_pipeline(parser_type=parser_type) + + # Load context from external cache if specified + if context_from: + import os + context_cache_path = context_from + + # If directory, look for cache file inside it + if os.path.isdir(context_from): + context_cache_path = os.path.join(context_from, 'bofhound_cache.db') + + if not os.path.exists(context_cache_path): + logger.error(f"Context cache file not found: {context_cache_path}") + sys.exit(1) + + try: + logger.info(f"Loading context from: {context_cache_path}") + with ObjectCache(context_cache_path) as context_cache: + ctx_stats = context_cache.get_context_statistics() + if ctx_stats['sid_mappings'] == 0: + logger.warning("Context cache has no SID mappings - ACL resolution may be incomplete") + else: + ad.load_context_from_cache(context_cache) + except Exception as e: + logger.error(f"Failed to load context: {e}") + sys.exit(1) with console.status("", spinner="aesthetic") as status: results = pipeline.process_data_source( data_source, - progress_callback=lambda id: status.update(f"Processing {id}") + progress_callback=lambda id: status.update(f"Processing {id}"), + num_workers=workers ) ldap_objects = results.get_ldap_objects() @@ -132,6 +259,19 @@ def main( results.get_privileged_sessions() + results.get_registry_sessions() logger.info("Parsed %d LDAP objects", len(ldap_objects)) logger.info("Parsed %d local group/session objects", len(local_objects)) + + # Apply cache filtering (automatic when cache exists) + if cache and ldap_objects: + original_count = len(ldap_objects) + stats = cache.get_statistics() + if stats['total_objects'] > 0: + logger.info("Filtering against cache (%d existing objects)...", stats['total_objects']) + ldap_objects = cache.get_changed_objects(ldap_objects) + logger.info("After cache filter: %d new/changed, %d skipped", + len(ldap_objects), original_count - len(ldap_objects)) + else: + logger.info("Cache is empty - all objects will be processed") + logger.info("Sorting parsed objects by type...") ad.import_objects(ldap_objects) @@ -159,9 +299,53 @@ def main( 
logger.info("Parsed %d Registry Sessions", len(broker.registry_sessions)) logger.info("Parsed %d Local Group Memberships", len(broker.local_group_memberships)) - ad.process() + ad.process(num_workers=workers) ad.process_local_objects(broker) + # Store processed objects in cache + if cache: + logger.info("Updating cache with processed objects...") + all_objects = (ad.users + ad.groups + ad.computers + ad.domains + + ad.ous + ad.gpos + ad.containers + ad.aiacas + ad.rootcas + + ad.enterprisecas + ad.certtemplates + ad.issuancepolicies + + ad.ntauthstores + ad.trustaccounts + ad.schemas) + # Note: ad.unknown_objects are raw dicts, not BloodHoundObject instances, so they can't be cached + + stored_count = 0 + for obj in all_objects: + try: + cache.store_object(obj) + stored_count += 1 + except Exception as e: + logger.debug(f"Failed to cache object {getattr(obj, 'ObjectIdentifier', 'unknown')}: {e}") + + # Store SID mappings for context in future runs + logger.debug("Storing SID mappings in cache...") + sid_mappings = [] + dn_mappings = [] + for sid, obj in ad.SID_MAP.items(): + if hasattr(obj, '_entry_type') and hasattr(obj, 'Properties'): + name = obj.Properties.get('name', '') + dn = obj.Properties.get('distinguishedname', '') + domain = obj.Properties.get('domain', '') + obj_type = obj._entry_type + sid_mappings.append((sid, name, obj_type, domain)) + if dn: + dn_mappings.append((dn, sid, obj_type)) + + if sid_mappings: + cache.store_sid_mappings_bulk(sid_mappings) + if dn_mappings: + cache.store_dn_mappings_bulk(dn_mappings) + + # Store domain mappings + for dc, domain_sid in ad.DOMAIN_MAP.items(): + cache.store_domain_mapping(dc, domain_sid) + + # Store schema GUIDs + if ad.ObjectTypeGuidMap: + cache.store_schema_guids_bulk(ad.ObjectTypeGuidMap) + # # Write out the BloodHound JSON files # @@ -184,6 +368,16 @@ def main( zip_files=zip_files ) + if cache: + cache.commit() + + # Log context statistics + ctx_stats = cache.get_context_statistics() + logger.info(f"Cache updated successfully ({stored_count:,} objects stored)") + logger.debug(f"Context stored: {ctx_stats['sid_mappings']} SID mappings, " + f"{ctx_stats['domain_mappings']} domain mappings, " + f"{ctx_stats['schema_guids']} schema GUIDs") + # # Upload files to BloodHound CE # @@ -201,6 +395,11 @@ def main( uploader.close_upload_job() logger.info("Files uploaded to BloodHound server") + # Close cache + if cache: + cache.close() + logger.info("Cache closed") + def banner(): """Display the bofhound banner.""" diff --git a/bofhound/ad/acl_processor.py b/bofhound/ad/acl_processor.py new file mode 100644 index 0000000..f41e14d --- /dev/null +++ b/bofhound/ad/acl_processor.py @@ -0,0 +1,206 @@ +""" +Parallel ACL processor for BOFHound performance improvements. + +This module enables multiprocessing of ACL parsing to leverage multiple CPU cores. 
+""" + +import base64 +from io import BytesIO +from typing import Dict, List, Tuple, Any +from multiprocessing import Pool, cpu_count +from functools import partial +from bloodhound.enumeration.acls import SecurityDescriptor, ACCESS_MASK, ACE, ACCESS_ALLOWED_OBJECT_ACE +from bloodhound.enumeration.acls import has_extended_right, EXTRIGHTS_GUID_MAPPING, can_write_property, ace_applies +from bloodhound.ad.utils import ADUtils +from bofhound.logger import logger + + +class ACLProcessorContext: + """Minimal context needed for ACL parsing in worker processes""" + + def __init__(self, sid_map: Dict[str, str], domain_map: Dict[str, str], + guid_map: Dict[str, str]): + """ + Initialize ACL processor context with lookup maps. + + Args: + sid_map: {sid: object_type} mapping + domain_map: {dc: domain_sid} mapping + guid_map: {name: guid} mapping for ObjectTypeGuidMap + """ + self.sid_map = sid_map + self.domain_map = domain_map + self.guid_map = guid_map + + +def parse_acl_for_object(obj_data: Tuple[str, str, bytes, str, Dict], + context: ACLProcessorContext) -> Tuple[str, List[Dict], bool, int]: + """ + Parse ACLs for a single object in a worker process. + + Args: + obj_data: Tuple of (object_id, entry_type, raw_aces, dn, properties) + context: ACLProcessorContext with lookup maps + + Returns: + Tuple of (object_id, aces_list, is_acl_protected, num_relations) + """ + object_id, entry_type, raw_aces, dn, properties = obj_data + + if not raw_aces: + return (object_id, [], False, 0) + + try: + value = base64.b64decode(raw_aces) + except Exception as e: + logger.warning(f"Error base64 decoding ACL for {object_id}: {e}") + return (object_id, [], False, 0) + + if not value: + return (object_id, [], False, 0) + + try: + sd = SecurityDescriptor(BytesIO(value)) + except Exception as e: + logger.warning(f"Error parsing SecurityDescriptor for {object_id}: {e}") + return (object_id, [], False, 0) + + is_acl_protected = sd.has_control(sd.PD) + aces = [] + num_relations = 0 + + # Parse owner + osid = str(sd.owner_sid) + ignoresids = ["S-1-3-0", "S-1-5-18", "S-1-5-10"] + + if osid not in ignoresids: + principal_type = context.sid_map.get(osid, "Unknown") + if osid in ADUtils.WELLKNOWN_SIDS: + principal_type = ADUtils.WELLKNOWN_SIDS[osid][1].title() + + aces.append({ + 'RightName': 'Owner', + 'PrincipalSID': osid, + 'IsInherited': False, + 'PrincipalType': principal_type + }) + num_relations += 1 + + # Parse DACL + if sd.dacl is None: + return (object_id, aces, is_acl_protected, num_relations) + + for ace in sd.dacl.aces: + sid = str(ace.sid) + + if sid in ignoresids: + continue + + # Determine principal type + principal_type = context.sid_map.get(sid, "Unknown") + if sid in ADUtils.WELLKNOWN_SIDS: + principal_type = ADUtils.WELLKNOWN_SIDS[sid][1].title() + + # Process ACE based on type + if ace.has_flag(ACE.INHERITED_ACE): + inherited = True + else: + inherited = False + + # GenericAll + if ace.has_priv(ACCESS_MASK.GENERIC_ALL): + aces.append({ + 'RightName': 'GenericAll', + 'PrincipalSID': sid, + 'IsInherited': inherited, + 'PrincipalType': principal_type + }) + num_relations += 1 + + # WriteDacl + if ace.has_priv(ACCESS_MASK.WRITE_DACL): + aces.append({ + 'RightName': 'WriteDacl', + 'PrincipalSID': sid, + 'IsInherited': inherited, + 'PrincipalType': principal_type + }) + num_relations += 1 + + # WriteOwner + if ace.has_priv(ACCESS_MASK.WRITE_OWNER): + aces.append({ + 'RightName': 'WriteOwner', + 'PrincipalSID': sid, + 'IsInherited': inherited, + 'PrincipalType': principal_type + }) + num_relations += 1 + + # 
Additional ACE processing logic can be added here + # (Extended rights, property writes, etc.) + + return (object_id, aces, is_acl_protected, num_relations) + + +def process_acls_parallel(objects: List[Any], sid_map: Dict, domain_map: Dict, + guid_map: Dict, num_workers: int = None) -> Dict[str, Tuple]: + """ + Process ACLs for multiple objects in parallel. + + Args: + objects: List of BloodHound objects to process + sid_map: SID to object type mapping + domain_map: Domain component to SID mapping + guid_map: ObjectTypeGuidMap + num_workers: Number of worker processes (default: cpu_count - 1) + + Returns: + Dict mapping object_id to (aces, is_protected, num_relations) + """ + if num_workers is None: + num_workers = max(1, cpu_count() - 1) + + # Prepare context + context = ACLProcessorContext( + sid_map={sid: obj._entry_type for sid, obj in sid_map.items()}, + domain_map=dict(domain_map), + guid_map=dict(guid_map) + ) + + # Prepare object data for workers + obj_data_list = [] + for obj in objects: + if hasattr(obj, 'RawAces') and obj.RawAces: + obj_data = ( + obj.ObjectIdentifier, + obj._entry_type, + obj.RawAces, + obj.Properties.get('distinguishedname', ''), + dict(obj.Properties) + ) + obj_data_list.append(obj_data) + + if not obj_data_list: + return {} + + # Process in parallel + logger.info(f"Processing {len(obj_data_list)} ACLs using {num_workers} workers") + + results = {} + + if len(obj_data_list) < 100 or num_workers == 1: + # For small datasets, multiprocessing overhead isn't worth it + for obj_data in obj_data_list: + obj_id, aces, is_protected, num_rels = parse_acl_for_object(obj_data, context) + results[obj_id] = (aces, is_protected, num_rels) + else: + # Use multiprocessing for large datasets + with Pool(num_workers) as pool: + process_func = partial(parse_acl_for_object, context=context) + parallel_results = pool.map(process_func, obj_data_list) + + for obj_id, aces, is_protected, num_rels in parallel_results: + results[obj_id] = (aces, is_protected, num_rels) + + return results diff --git a/bofhound/ad/acl_worker.py b/bofhound/ad/acl_worker.py new file mode 100644 index 0000000..e1e642f --- /dev/null +++ b/bofhound/ad/acl_worker.py @@ -0,0 +1,464 @@ +""" +Standalone ACL parsing worker for multiprocessing. + +This module provides a picklable ACL parsing function that can be used +with ProcessPoolExecutor. It contains all the logic from ADDS.parse_acl +but in a standalone form that can be safely passed to worker processes. +""" +import base64 +from io import BytesIO +from typing import Dict, List, Tuple, Any, Optional + +from bloodhound.ad.utils import ADUtils +from bloodhound.enumeration.acls import ( + SecurityDescriptor, ACCESS_MASK, ACE, ACCESS_ALLOWED_OBJECT_ACE, + has_extended_right, EXTRIGHTS_GUID_MAPPING, can_write_property, ace_applies +) +from impacket.uuid import string_to_bin + +# Add the Enroll GUID to the mapping +EXTRIGHTS_GUID_MAPPING["Enroll"] = string_to_bin("0e10c968-78fb-11d2-90d4-00c04f79dc55") + +# Type definitions for worker context +ACLWorkerContext = Dict[str, Any] +ACLEntry = Dict[str, Any] + + +def create_worker_context( + sid_map: Dict[str, Any], + domain_map: Dict[str, str], + object_type_guid_map: Dict[str, str] +) -> ACLWorkerContext: + """ + Create a serializable context dict for ACL workers. 
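+ + Example (illustrative; mirrors the call made in ADDS.process, where adds is an ADDS instance): + ctx = create_worker_context(adds.SID_MAP, adds.DOMAIN_MAP, adds.ObjectTypeGuidMap)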
+ + Args: + sid_map: Maps SID -> object type (e.g., "User", "Computer", "Group") + domain_map: Maps domain component (DC) -> domain SID + object_type_guid_map: Maps schema name -> GUID + + Returns: + Context dict that can be passed to worker processes + """ + # Simplify SID_MAP to just SID -> type mapping (all we need for ACL parsing) + simple_sid_map = {} + for sid, obj in sid_map.items(): + if hasattr(obj, '_entry_type'): + simple_sid_map[sid] = obj._entry_type + else: + simple_sid_map[sid] = "Unknown" + + return { + 'sid_map': simple_sid_map, + 'domain_map': domain_map, + 'object_type_guid_map': object_type_guid_map, + } + + +def _get_domain_component(dn: str) -> str: + """Extract domain component from DN (e.g., 'DC=ESSOS,DC=LOCAL').""" + parts = dn.upper().split(',') + dc_parts = [p for p in parts if p.startswith('DC=')] + return ','.join(dc_parts) + + +def _get_sid(sid: str, dn: str, domain_map: Dict[str, str]) -> str: + """ + Get full SID, prepending domain if it's a well-known SID. + + For well-known SIDs like S-1-5-32-544, we need to prepend the domain. + """ + if sid in ADUtils.WELLKNOWN_SIDS: + dc = _get_domain_component(dn) + domain_sid = domain_map.get(dc, "S-????") + # Format: DOMAIN-SID (e.g., "ESSOS.LOCAL-S-1-5-11") + return f"{dc.replace(',DC=', '.').replace('DC=', '')}-{sid}" + return sid + + +def _build_relation( + entry_dn: str, + entry_type: str, + sid: str, + relation: str, + inherited: bool, + sid_map: Dict[str, str], + domain_map: Dict[str, str] +) -> Dict[str, Any]: + """ + Build an ACL relationship dict. + + Args: + entry_dn: Distinguished name of the target object + entry_type: Type of the target object + sid: SID of the principal + relation: Relationship name (e.g., 'GenericAll', 'WriteDacl') + inherited: Whether the ACE is inherited + sid_map: Maps SID -> object type + domain_map: Maps DC -> domain SID + + Returns: + Relationship dict for BloodHound + """ + principal_sid = _get_sid(sid, entry_dn, domain_map) + + if sid in sid_map: + principal_type = sid_map[sid] + elif sid in ADUtils.WELLKNOWN_SIDS: + principal_type = ADUtils.WELLKNOWN_SIDS[sid][1].title() + else: + principal_type = "Unknown" + + return { + 'RightName': relation, + 'PrincipalSID': principal_sid, + 'IsInherited': inherited, + 'PrincipalType': principal_type + } + + +def parse_acl_standalone( + entry_data: ACLEntry, + context: ACLWorkerContext +) -> Tuple[str, List[Dict[str, Any]], bool]: + """ + Parse ACL for a single entry in a standalone (picklable) manner. + + Args: + entry_data: Dict containing: + - object_id: Object identifier (SID/GUID) + - entry_type: Object type (User, Computer, Group, etc.) 
+ - dn: Distinguished name + - raw_aces: Base64-encoded nTSecurityDescriptor + - has_laps: Whether LAPS is enabled (for computers) + context: Worker context from create_worker_context() + + Returns: + Tuple of (object_id, relations_list, is_acl_protected) + """ + object_id = entry_data['object_id'] + entry_type = entry_data['entry_type'] + entry_dn = entry_data['dn'] + raw_aces = entry_data['raw_aces'] + has_laps = entry_data.get('has_laps', False) + + sid_map = context['sid_map'] + domain_map = context['domain_map'] + object_type_guid_map = context['object_type_guid_map'] + + if not raw_aces: + return (object_id, [], False) + + try: + value = base64.b64decode(raw_aces) + except Exception: + return (object_id, [], False) + + if not value: + return (object_id, [], False) + + sd = SecurityDescriptor(BytesIO(value)) + is_acl_protected = sd.has_control(sd.PD) + relations = [] + + # Ignore these SIDs (Creator Owner, Local System, Principal Self) + ignore_sids = {"S-1-3-0", "S-1-5-18", "S-1-5-10"} + + # Parse owner + owner_sid = str(sd.owner_sid) + if owner_sid not in ignore_sids: + relations.append(_build_relation( + entry_dn, entry_type, owner_sid, 'Owns', False, + sid_map, domain_map + )) + + # Parse DACLs + for ace_object in sd.dacl.aces: + # Only care about ACCESS_ALLOWED_OBJECT_ACE (0x05) and ACCESS_ALLOWED_ACE (0x00) + if ace_object.ace.AceType not in (0x05, 0x00): + continue + + sid = str(ace_object.acedata.sid) + if sid in ignore_sids: + continue + + is_inherited = ace_object.has_flag(ACE.INHERITED_ACE) + + if ace_object.ace.AceType == 0x05: + # ACCESS_ALLOWED_OBJECT_ACE + if not ace_object.has_flag(ACE.INHERITED_ACE) and ace_object.has_flag(ACE.INHERIT_ONLY_ACE): + continue + + # Check if ACE applies to this object type + if ace_object.has_flag(ACE.INHERITED_ACE) and \ + ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_INHERITED_OBJECT_TYPE_PRESENT): + try: + if not ace_applies( + ace_object.acedata.get_inherited_object_type().lower(), + entry_type, + object_type_guid_map + ): + continue + except KeyError: + pass + + mask = ace_object.acedata.mask + + # Generic access masks + if mask.has_priv(ACCESS_MASK.GENERIC_ALL) or mask.has_priv(ACCESS_MASK.WRITE_DACL) or \ + mask.has_priv(ACCESS_MASK.WRITE_OWNER) or mask.has_priv(ACCESS_MASK.GENERIC_WRITE): + + try: + if ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT) and \ + not ace_applies(ace_object.acedata.get_object_type().lower(), entry_type, object_type_guid_map): + continue + except KeyError: + pass + + if mask.has_priv(ACCESS_MASK.GENERIC_ALL): + # Check LAPS rights for computers + if entry_type.lower() == 'computer' and \ + ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT) and \ + has_laps and 'ms-mcs-admpwd' in object_type_guid_map: + if ace_object.acedata.get_object_type().lower() == object_type_guid_map['ms-mcs-admpwd']: + relations.append(_build_relation( + entry_dn, entry_type, sid, 'ReadLAPSPassword', is_inherited, + sid_map, domain_map + )) + else: + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GenericAll', is_inherited, + sid_map, domain_map + )) + continue + + if mask.has_priv(ACCESS_MASK.GENERIC_WRITE): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GenericWrite', is_inherited, + sid_map, domain_map + )) + if entry_type.lower() not in ('domain', 'computer'): + continue + + if mask.has_priv(ACCESS_MASK.WRITE_DACL): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WriteDacl', is_inherited, + sid_map, 
domain_map + )) + + if mask.has_priv(ACCESS_MASK.WRITE_OWNER): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WriteOwner', is_inherited, + sid_map, domain_map + )) + + # Property write privileges + if ace_object.acedata.mask.has_priv(ACCESS_MASK.ADS_RIGHT_DS_WRITE_PROP): + if entry_type.lower() in ('user', 'group', 'computer', 'gpo') and \ + not ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GenericWrite', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'group' and can_write_property(ace_object, EXTRIGHTS_GUID_MAPPING['WriteMember']): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AddMember', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'computer' and can_write_property(ace_object, EXTRIGHTS_GUID_MAPPING['AllowedToAct']): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AddAllowedToAct', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'computer' and \ + can_write_property(ace_object, EXTRIGHTS_GUID_MAPPING['UserAccountRestrictionsSet']) and \ + not sid.endswith('-512'): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WriteAccountRestrictions', is_inherited, + sid_map, domain_map + )) + + # Key credential link + if entry_type.lower() in ('user', 'computer') and \ + ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT) and \ + 'ms-ds-key-credential-link' in object_type_guid_map and \ + ace_object.acedata.get_object_type().lower() == object_type_guid_map['ms-ds-key-credential-link']: + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AddKeyCredentialLink', is_inherited, + sid_map, domain_map + )) + + # SPN write rights + if entry_type.lower() == 'user' and \ + ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT) and \ + ace_object.acedata.get_object_type().lower() == 'f3a64788-5306-11d1-a9c5-0000f80367c1': + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WriteSPN', is_inherited, + sid_map, domain_map + )) + + # Certificate template rights + if entry_type.lower() == 'pki template' and \ + ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT) and \ + ace_object.acedata.get_object_type().lower() == 'ea1dddc4-60ff-416e-8cc0-17cee534bce7': + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WritePKINameFlag', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'pki template' and \ + ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT) and \ + ace_object.acedata.get_object_type().lower() == 'd15ef7d8-f226-46db-ae79-b34e560bd12c': + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WritePKIEnrollmentFlag', is_inherited, + sid_map, domain_map + )) + + elif ace_object.acedata.mask.has_priv(ACCESS_MASK.ADS_RIGHT_DS_SELF): + # Self add + if entry_type.lower() == 'group' and ace_object.acedata.data.ObjectType == EXTRIGHTS_GUID_MAPPING['WriteMember']: + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AddSelf', is_inherited, + sid_map, domain_map + )) + + # Property read privileges (LAPS) + if ace_object.acedata.mask.has_priv(ACCESS_MASK.ADS_RIGHT_DS_READ_PROP): + if entry_type.lower() == 'computer' and \ + ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT) and \ + has_laps and 'ms-mcs-admpwd' in object_type_guid_map: + if 
ace_object.acedata.get_object_type().lower() == object_type_guid_map['ms-mcs-admpwd']: + relations.append(_build_relation( + entry_dn, entry_type, sid, 'ReadLAPSPassword', is_inherited, + sid_map, domain_map + )) + + # Extended rights + if ace_object.acedata.mask.has_priv(ACCESS_MASK.ADS_RIGHT_DS_CONTROL_ACCESS): + if entry_type.lower() in ('user', 'domain') and \ + not ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AllExtendedRights', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'computer' and \ + not ace_object.acedata.has_flag(ACCESS_ALLOWED_OBJECT_ACE.ACE_OBJECT_TYPE_PRESENT): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AllExtendedRights', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'domain' and has_extended_right(ace_object, EXTRIGHTS_GUID_MAPPING['GetChanges']): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GetChanges', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'domain' and has_extended_right(ace_object, EXTRIGHTS_GUID_MAPPING['GetChangesAll']): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GetChangesAll', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'domain' and has_extended_right(ace_object, EXTRIGHTS_GUID_MAPPING['GetChangesInFilteredSet']): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GetChangesInFilteredSet', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'user' and has_extended_right(ace_object, EXTRIGHTS_GUID_MAPPING['UserForceChangePassword']): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'ForceChangePassword', is_inherited, + sid_map, domain_map + )) + + # Certificate enrollment rights + if entry_type.lower() in ('pki template', 'enterpriseca') and \ + has_extended_right(ace_object, EXTRIGHTS_GUID_MAPPING['Enroll']): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'Enroll', is_inherited, + sid_map, domain_map + )) + + elif ace_object.ace.AceType == 0x00: + # ACCESS_ALLOWED_ACE + if not ace_object.has_flag(ACE.INHERITED_ACE) and ace_object.has_flag(ACE.INHERIT_ONLY_ACE): + continue + + mask = ace_object.acedata.mask + + if mask.has_priv(ACCESS_MASK.GENERIC_ALL): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GenericAll', is_inherited, + sid_map, domain_map + )) + continue + + if mask.has_priv(ACCESS_MASK.ADS_RIGHT_DS_WRITE_PROP): + if entry_type.lower() in ('user', 'group', 'computer', 'gpo'): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'GenericWrite', is_inherited, + sid_map, domain_map + )) + + if mask.has_priv(ACCESS_MASK.WRITE_OWNER): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WriteOwner', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() in ('user', 'domain') and mask.has_priv(ACCESS_MASK.ADS_RIGHT_DS_CONTROL_ACCESS): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AllExtendedRights', is_inherited, + sid_map, domain_map + )) + + if entry_type.lower() == 'computer' and mask.has_priv(ACCESS_MASK.ADS_RIGHT_DS_CONTROL_ACCESS) and \ + sid != "S-1-5-32-544" and not sid.endswith('-512'): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'AllExtendedRights', is_inherited, + sid_map, domain_map + )) + + if mask.has_priv(ACCESS_MASK.WRITE_DACL): + relations.append(_build_relation( + entry_dn, entry_type, sid, 'WriteDacl', 
is_inherited, + sid_map, domain_map + )) + + return (object_id, relations, is_acl_protected) + + +def _worker_init(ctx: ACLWorkerContext): + """Initialize worker process with shared context.""" + global _worker_context + _worker_context = ctx + + +def _worker_process(entry_data: ACLEntry) -> Tuple[str, List[Dict[str, Any]], bool]: + """Worker function that uses global context.""" + global _worker_context + return parse_acl_standalone(entry_data, _worker_context) + + +def prepare_entry_for_worker(obj) -> ACLEntry: + """ + Prepare a BloodHound object for multiprocess ACL parsing. + + Args: + obj: BloodHoundObject instance + + Returns: + Serializable dict with data needed for ACL parsing + """ + return { + 'object_id': obj.ObjectIdentifier, + 'entry_type': obj._entry_type, + 'dn': obj.Properties.get('distinguishedname', ''), + 'raw_aces': obj.RawAces, + 'has_laps': 'haslaps' in obj.Properties and obj.Properties.get('haslaps', False), + } diff --git a/bofhound/ad/adds.py b/bofhound/ad/adds.py index 07c244c..7e6f1de 100644 --- a/bofhound/ad/adds.py +++ b/bofhound/ad/adds.py @@ -24,6 +24,33 @@ EXTRIGHTS_GUID_MAPPING["Enroll"] = string_to_bin("0e10c968-78fb-11d2-90d4-00c04f79dc55") +class _SIDPlaceholder: + """ + Minimal placeholder object for SID -> Type resolution during ACL parsing. + + This allows us to resolve ACL principal types without loading full + BloodHound objects from a previous run's cache. + """ + + _is_placeholder = True # Flag to identify placeholders + + def __init__(self, sid: str, name: str, object_type: str, domain: str = ''): + self.ObjectIdentifier = sid + self._entry_type = object_type + self.Properties = { + 'name': name, + 'distinguishedname': '', + 'domain': domain + } + + def merge_entry(self, other): + """ + Placeholders don't actually merge - they get replaced. + This method exists for compatibility but does nothing. + """ + pass + + class ADDS(): AT_SCHEMAIDGUID = "schemaidguid" @@ -63,6 +90,69 @@ def __init__(self): self.trusts: list[BloodHoundDomainTrust] = [] self.trustaccounts: list[BloodHoundUser] = [] self.unknown_objects: list[dict] = [] + + # Track whether context was loaded from external source + self._context_loaded = False + self._context_source = None + + def load_context_from_cache(self, cache) -> int: + """ + Load SID/DN/Domain context from a cache for ACL resolution. + + This allows processing new data (like certificates) while having + full ACL resolution context from a previous run. 
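+ + Example (illustrative; the cache path is assumed): + with ObjectCache("out/bofhound_cache.db") as prev_cache: + ad.load_context_from_cache(prev_cache)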
+ + Args: + cache: ObjectCache instance with stored context + + Returns: + Number of SID mappings loaded + """ + # Load domain mappings + domain_map = cache.get_all_domain_mappings() + for dc, domain_sid in domain_map.items(): + if dc not in self.DOMAIN_MAP: + self.DOMAIN_MAP[dc] = domain_sid + + # Load schema GUIDs + schema_guids = cache.get_all_schema_guids() + for name, guid in schema_guids.items(): + if name not in self.ObjectTypeGuidMap: + self.ObjectTypeGuidMap[name] = guid + + # Load SID mappings as lightweight placeholder objects + # These are used for ACL type resolution but don't create full objects + sid_mappings = cache.get_all_sid_mappings() + loaded_count = 0 + + for sid, data in sid_mappings.items(): + if sid not in self.SID_MAP: + # Create a minimal placeholder object for ACL type resolution + placeholder = _SIDPlaceholder( + sid=sid, + name=data['name'], + object_type=data['object_type'], + domain=data['domain'] + ) + self.SID_MAP[sid] = placeholder + loaded_count += 1 + + # Load DN mappings + dn_mappings = cache.get_all_dn_mappings() + for dn, data in dn_mappings.items(): + if dn not in self.DN_MAP: + # Check if we have this SID in our map + sid = data['sid'] + if sid in self.SID_MAP: + self.DN_MAP[dn] = self.SID_MAP[sid] + + self._context_loaded = True + self._context_source = cache.cache_path + + logger.info(f"Loaded context: {loaded_count} SID mappings, " + f"{len(domain_map)} domains, {len(schema_guids)} schema GUIDs") + + return loaded_count def import_objects(self, objects): """Parse a list of dictionaries representing attributes of an AD object @@ -195,7 +285,15 @@ def import_objects(self, objects): if originalObject: - if bhObject: + # Check if originalObject is a placeholder from context loading + is_placeholder = getattr(originalObject, '_is_placeholder', False) + + if is_placeholder: + # Replace placeholder with real object + if bhObject: + target_list.append(bhObject) + self.add_object_to_maps(bhObject) + elif bhObject: originalObject.merge_entry(bhObject) else: bhObject = BloodHoundObject(object) @@ -314,30 +412,152 @@ def calculate_contained(self, object): # object.ContainedBy = {"ObjectIdentifier":id_contained, "ObjectType":type_contained} - def process(self): + def process(self, num_workers=None): + """ + Process imported objects to build relationships and parse ACLs. + + Args: + num_workers: Number of worker processes for parallel ACL parsing. + If None, uses cpu_count() - 1. Set to 1 to disable multiprocessing. 
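+ + Example (illustrative): + ad.process(num_workers=4) # parse ACLs with four worker processes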
+ """ + import os + import time + from concurrent.futures import ProcessPoolExecutor, as_completed + from bofhound.ad.acl_worker import ( + create_worker_context, prepare_entry_for_worker, _worker_init, _worker_process + ) + + # Build lookup tables for algorithmic optimization + logger.debug("Building lookup tables for performance optimization...") + self._build_lookup_tables() + all_objects = self.users + self.groups + self.computers + self.domains + self.ous + self.gpos + self.containers \ + self.aiacas + self.rootcas + self.enterprisecas + self.certtemplates + self.issuancepolicies \ + self.ntauthstores total_objects = len(all_objects) + logger.info(f"Processing {total_objects:,} objects for ACL relationships...") num_parsed_relations = 0 - with console.status(f" [bold] Processed {num_parsed_relations} ACLs", spinner="aesthetic") as status: - for i, object in enumerate(all_objects): - self.recalculate_sid(object) - self.calculate_contained(object) - self.add_domainsid_prop(object) - try: - num_parsed_relations += self.parse_acl(object) - status.update(f" [bold] Processing {num_parsed_relations} ACLs --- {i}/{total_objects} objects parsed") - except: - # - # Catch the occasional error parinsing ACLs - # - continue - - logger.info("Parsed %d ACL relationships", num_parsed_relations) + # Determine worker count + if num_workers is None: + try: + cpu_count = os.cpu_count() or 4 + num_workers = max(1, cpu_count - 1) + except Exception: + num_workers = 1 + if num_workers < 1: + num_workers = 1 + + # Precompute SID/containment/domain props before ACL parsing + with console.status(" [bold] Preparing objects for ACL parsing...", spinner="aesthetic"): + for obj in all_objects: + self.recalculate_sid(obj) + self.calculate_contained(obj) + self.add_domainsid_prop(obj) + logger.info("Object preparation complete") + + # Count objects with ACLs to parse + objects_with_acls = [obj for obj in all_objects if obj.RawAces] + acl_count = len(objects_with_acls) + logger.info(f"Found {acl_count:,} objects with ACLs to parse (using {num_workers} worker(s))") + + if acl_count == 0: + logger.info("No ACLs to parse") + elif num_workers == 1: + # Single-threaded mode (for debugging or small datasets) + start_time = time.time() + progress_interval = max(1000, acl_count // 100) + + with console.status(" [bold] Parsing ACLs (single-threaded)...", spinner="aesthetic") as status: + for i, obj in enumerate(objects_with_acls): + try: + num_parsed_relations += self.parse_acl(obj) + except Exception as e: + logger.debug(f"Error parsing ACL for {obj.Properties.get('name', 'UNKNOWN')}: {e}") + continue + + if (i + 1) % progress_interval == 0 or i == acl_count - 1: + elapsed = time.time() - start_time + rate = (i + 1) / elapsed if elapsed > 0 else 0 + eta = (acl_count - i - 1) / rate if rate > 0 else 0 + pct = ((i + 1) / acl_count) * 100 + status.update( + f" [bold] ACL Progress: {i+1:,}/{acl_count:,} ({pct:.1f}%) | " + f"{num_parsed_relations:,} relationships | " + f"{rate:.0f} obj/s | ETA: {eta/60:.1f}min" + ) + + elapsed_total = time.time() - start_time + logger.info(f"Parsed {num_parsed_relations:,} ACL relationships in {elapsed_total/60:.1f} minutes") + logger.info(f"Average rate: {acl_count/elapsed_total:.0f} objects/second") + else: + # Multiprocessing mode - true parallel ACL parsing + start_time = time.time() + + # Create worker context with lookup data + worker_context = create_worker_context( + self.SID_MAP, + self.DOMAIN_MAP, + self.ObjectTypeGuidMap + ) + + # Prepare entries for workers + logger.info("Preparing 
entries for parallel processing...") + entries = [prepare_entry_for_worker(obj) for obj in objects_with_acls] + + # Create a mapping from object_id -> object for result application + object_map = {obj.ObjectIdentifier: obj for obj in objects_with_acls} + + # Process with multiprocessing + completed = 0 + progress_interval = max(1000, acl_count // 100) + + with console.status(" [bold] Parsing ACLs (multiprocessing)...", spinner="aesthetic") as status: + # Use spawn start method on Windows for safety + import multiprocessing + mp_context = multiprocessing.get_context('spawn') + + with ProcessPoolExecutor( + max_workers=num_workers, + initializer=_worker_init, + initargs=(worker_context,), + mp_context=mp_context + ) as executor: + # Submit all tasks + futures = {executor.submit(_worker_process, entry): entry for entry in entries} + + # Process results as they complete + for future in as_completed(futures): + try: + object_id, relations, is_acl_protected = future.result() + num_parsed_relations += len(relations) + + # Apply results back to object + if object_id in object_map: + obj = object_map[object_id] + obj.Aces = relations + obj.IsACLProtected = is_acl_protected + except Exception as e: + entry = futures[future] + logger.debug(f"Error parsing ACL for {entry.get('object_id', 'UNKNOWN')}: {e}") + + completed += 1 + if completed % progress_interval == 0 or completed == acl_count: + elapsed = time.time() - start_time + rate = completed / elapsed if elapsed > 0 else 0 + eta = (acl_count - completed) / rate if rate > 0 else 0 + pct = (completed / acl_count) * 100 + status.update( + f" [bold] ACL Progress: {completed:,}/{acl_count:,} ({pct:.1f}%) | " + f"{num_parsed_relations:,} relationships | " + f"{rate:.0f} obj/s | ETA: {eta/60:.1f}min" + ) + + elapsed_total = time.time() - start_time + logger.info(f"Parsed {num_parsed_relations:,} ACL relationships in {elapsed_total/60:.1f} minutes") + logger.info(f"Average rate: {acl_count/elapsed_total:.0f} objects/second") with console.status(" [bold] Creating default users", spinner="aesthetic"): self.write_default_users() @@ -385,10 +605,8 @@ def process(self): logger.info("Resolved hosting computers of CAs") def get_sid_from_name(self, name): - for entry in self.SID_MAP: - if(self.SID_MAP[entry].Properties["name"].lower() == name): - return (entry, self.SID_MAP[entry]._entry_type) - return (None,None) + # Use lookup table for O(1) performance instead of O(n) linear search + return self._name_to_sid_map.get(name, (None, None)) def resolve_delegation_targets(self): @@ -1014,12 +1232,35 @@ def _is_nested_group(self, subgroup, group): return False + def _build_lookup_tables(self): + """ + Build lookup tables for algorithmic optimization. + Converts O(n) linear searches to O(1) dictionary lookups. 
+ """ + # Build name -> (SID, type) lookup for delegation resolution + self._name_to_sid_map = {} + for sid, obj in self.SID_MAP.items(): + if hasattr(obj, 'Properties') and 'name' in obj.Properties: + name_lower = obj.Properties['name'].lower() + self._name_to_sid_map[name_lower] = (sid, obj._entry_type) + + logger.debug(f"Built name->SID lookup table with {len(self._name_to_sid_map)} entries") + + # Build DN -> OU lookup for OU membership resolution + self._dn_to_ou_map = {} + for ou in self.ous: + if 'distinguishedname' in ou.Properties: + dn = ou.Properties['distinguishedname'] + self._dn_to_ou_map[dn] = ou + + logger.debug(f"Built DN->OU lookup table with {len(self._dn_to_ou_map)} entries") + + def _resolve_object_ou(self, item): if "OU=" in item.Properties["distinguishedname"]: target_ou = "OU=" + item.Properties["distinguishedname"].split("OU=", 1)[1] - for ou in self.ous: - if ou.Properties["distinguishedname"] == target_ou: - return ou + # Use lookup table for O(1) performance instead of O(n) linear search + return self._dn_to_ou_map.get(target_ou, None) return None @@ -1028,9 +1269,8 @@ def _resolve_nested_ou(self, nested_ou): # else is top-level OU if len(dn.split("OU=")) > 2: target_ou = "OU=" + dn.split("OU=", 2)[2] - for ou in self.ous: - if ou.Properties["distinguishedname"] == target_ou: - return ou + # Use lookup table for O(1) performance instead of O(n) linear search + return self._dn_to_ou_map.get(target_ou, None) else: dc = BloodHoundObject.get_domain_component(dn) for domain in self.domains: @@ -1288,7 +1528,13 @@ def process_local_group_memberships(self, local_group_memberships, computer_obje @staticmethod def find_issuer_ca(start_ca_obj, all_ca_obj): + # Skip if start_ca_obj has no valid certificate + if start_ca_obj.x509Certificate is None: + return None for potential_issuer in all_ca_obj: + # Skip potential issuers with no valid certificate + if potential_issuer.x509Certificate is None: + continue if start_ca_obj.x509Certificate['issuer'] == potential_issuer.x509Certificate['subject']: return potential_issuer return None diff --git a/bofhound/ad/models/bloodhound_object.py b/bofhound/ad/models/bloodhound_object.py index ae7f602..db88bda 100644 --- a/bofhound/ad/models/bloodhound_object.py +++ b/bofhound/ad/models/bloodhound_object.py @@ -32,16 +32,19 @@ def __init__(self, object=None): self.Properties = {} if isinstance(object, dict): - # Ensure all keys are lowercase - for item in object.keys(): - self.Properties[item.lower()] = object[item] + # Optimize: Use dict comprehension for lowercasing keys (faster than loop) + self.Properties = {k.lower(): v for k, v in object.items()} - self.ObjectIdentifier = BloodHoundObject.get_sid(object.get('objectsid', None), object.get('distinguishedname', None)) + self.ObjectIdentifier = BloodHoundObject.get_sid( + self.Properties.get('objectsid'), + self.Properties.get('distinguishedname') + ) - if 'distinguishedname' in object.keys(): - self.Properties["distinguishedname"] = object.get('distinguishedname', None).upper() + # Uppercase DN once if it exists + if 'distinguishedname' in self.Properties: + self.Properties["distinguishedname"] = self.Properties["distinguishedname"].upper() - self.__parse_whencreated(object) + self.__parse_whencreated(self.Properties) def get_primary_membership(self, object): @@ -213,7 +216,18 @@ def parse_cacertificate(self, object): # certname # certificate_byte_array = base64.b64decode(certificate_b64) - ca_cert = x509.Certificate.load(certificate_byte_array)["tbs_certificate"] + try: + ca_cert = 
x509.Certificate.load(certificate_byte_array)["tbs_certificate"] + except (ValueError, TypeError) as e: + # Malformed certificate data - use CN as certname and skip certificate parsing + from bofhound.logger import logger + logger.debug(f"Could not parse certificate for {object.get('cn', 'unknown')}: {e}") + self.Properties['certname'] = object.get('cn', thumbprint) + self.Properties['certchain'] = [] + self.Properties['hasbasicconstraints'] = False + self.Properties['basicconstraintpathlength'] = 0 + self.x509Certificate = None + return self.x509Certificate = ca_cert # set for post-processing self.Properties['certname'] = ca_cert['subject'].native.get('common_name', thumbprint) diff --git a/bofhound/cache.py b/bofhound/cache.py new file mode 100644 index 0000000..a0cafb9 --- /dev/null +++ b/bofhound/cache.py @@ -0,0 +1,626 @@ +""" +Object cache system for incremental BOFHound processing. + +Enables storing processed objects in SQLite database for fast incremental updates. +""" + +import sqlite3 +import pickle +import hashlib +import time +from typing import Dict, List, Optional, Any, Set +from pathlib import Path +from bofhound.logger import logger +from bofhound.ad.models.bloodhound_object import BloodHoundObject + + +class ObjectCache: + """SQLite-based cache for storing processed BloodHound objects.""" + + # Version for cache compatibility + CACHE_VERSION = "1.1.0" # Bumped for SID mapping tables + + def __init__(self, cache_path: str): + """ + Initialize object cache. + + Args: + cache_path: Path to SQLite database file + """ + self.cache_path = cache_path + self.conn = None + self._init_database() + + def _init_database(self): + """Initialize SQLite database schema.""" + self.conn = sqlite3.connect(self.cache_path) + self.conn.row_factory = sqlite3.Row + + cursor = self.conn.cursor() + + # Create objects table + cursor.execute(''' + CREATE TABLE IF NOT EXISTS objects ( + sid TEXT, + dn TEXT, + object_type TEXT, + data BLOB, + attr_hash TEXT, + timestamp REAL, + source_file TEXT, + PRIMARY KEY (sid, dn) + ) + ''') + + # Create indexes for fast lookups + cursor.execute('CREATE INDEX IF NOT EXISTS idx_sid ON objects(sid)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_dn ON objects(dn)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_type ON objects(object_type)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_hash ON objects(attr_hash)') + + # Create SID mappings table for ACL context + cursor.execute(''' + CREATE TABLE IF NOT EXISTS sid_mappings ( + sid TEXT PRIMARY KEY, + name TEXT, + object_type TEXT, + domain TEXT, + timestamp REAL + ) + ''') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_sid_name ON sid_mappings(name)') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_sid_domain ON sid_mappings(domain)') + + # Create domain mappings table + cursor.execute(''' + CREATE TABLE IF NOT EXISTS domain_mappings ( + dc TEXT PRIMARY KEY, + domain_sid TEXT, + domain_name TEXT, + timestamp REAL + ) + ''') + + # Create DN to SID mappings for resolution + cursor.execute(''' + CREATE TABLE IF NOT EXISTS dn_mappings ( + dn TEXT PRIMARY KEY, + sid TEXT, + object_type TEXT, + timestamp REAL + ) + ''') + cursor.execute('CREATE INDEX IF NOT EXISTS idx_dn_sid ON dn_mappings(sid)') + + # Create schema GUID mappings + cursor.execute(''' + CREATE TABLE IF NOT EXISTS schema_guids ( + name TEXT PRIMARY KEY, + guid TEXT, + timestamp REAL + ) + ''') + + # Create metadata table + cursor.execute(''' + CREATE TABLE IF NOT EXISTS metadata ( + key TEXT PRIMARY KEY, + value TEXT + ) + ''') + + # Store 
cache version + cursor.execute( + 'INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)', + ('cache_version', self.CACHE_VERSION) + ) + + # Store creation timestamp if new + cursor.execute('SELECT value FROM metadata WHERE key = ?', ('created_at',)) + if not cursor.fetchone(): + cursor.execute( + 'INSERT INTO metadata (key, value) VALUES (?, ?)', + ('created_at', str(time.time())) + ) + + # Update last accessed + cursor.execute( + 'INSERT OR REPLACE INTO metadata (key, value) VALUES (?, ?)', + ('last_accessed', str(time.time())) + ) + + self.conn.commit() + + def _hash_object(self, obj: Dict[str, Any]) -> str: + """ + Calculate hash of object attributes for change detection. + + Args: + obj: LDAP object dictionary + + Returns: + MD5 hash of normalized object attributes + """ + # Exclude volatile attributes from hash calculation + exclude_attrs = { + 'whencreated', 'whenchanged', 'usnchanged', 'usncreated', + 'dscorepropagationdata', 'lastlogon', 'lastlogontimestamp', + 'badpasswordtime', 'logoncount' + } + + # Create sorted list of key-value pairs for consistent hashing + items = [] + for key in sorted(obj.keys()): + if key.lower() not in exclude_attrs: + value = obj[key] + # Convert bytes to string for hashing + if isinstance(value, bytes): + try: + value = value.decode('utf-8', errors='ignore') + except: + value = str(value) + items.append(f"{key}:{value}") + + hash_string = '|'.join(items) + return hashlib.md5(hash_string.encode('utf-8')).hexdigest() + + def get_cached_object(self, sid: Optional[str] = None, + dn: Optional[str] = None) -> Optional[Dict]: + """ + Retrieve cached object by SID or DN. + + Args: + sid: Object SID + dn: Object Distinguished Name + + Returns: + Dict with cached object data or None + """ + cursor = self.conn.cursor() + + if sid and dn: + cursor.execute( + 'SELECT * FROM objects WHERE sid = ? AND dn = ?', + (sid, dn) + ) + elif sid: + cursor.execute('SELECT * FROM objects WHERE sid = ?', (sid,)) + elif dn: + cursor.execute('SELECT * FROM objects WHERE dn = ?', (dn,)) + else: + return None + + row = cursor.fetchone() + if row: + return { + 'sid': row['sid'], + 'dn': row['dn'], + 'object_type': row['object_type'], + 'data': pickle.loads(row['data']), + 'hash': row['attr_hash'], + 'timestamp': row['timestamp'], + 'source_file': row['source_file'] + } + return None + + def store_object(self, obj: Any, source_file: str = ''): + """ + Store processed BloodHound object in cache. + + Args: + obj: BloodHoundObject instance + source_file: Source log file name + """ + sid = obj.ObjectIdentifier if obj.ObjectIdentifier else '' + dn = obj.Properties.get('distinguishedname', '') + object_type = obj._entry_type if hasattr(obj, '_entry_type') else 'Unknown' + + # Calculate hash from original properties + attr_hash = self._hash_object(obj.Properties) + + # Serialize object + data = pickle.dumps(obj) + + cursor = self.conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO objects + (sid, dn, object_type, data, attr_hash, timestamp, source_file) + VALUES (?, ?, ?, ?, ?, ?, ?) + ''', (sid, dn, object_type, data, attr_hash, time.time(), source_file)) + + def get_changed_objects(self, ldap_objects: List[Dict]) -> List[Dict]: + """ + Filter LDAP objects to return only new or changed ones. 
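+ + Example (illustrative; mirrors the call in __main__): + ldap_objects = cache.get_changed_objects(ldap_objects)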
+ + Args: + ldap_objects: List of LDAP object dictionaries + + Returns: + List of objects that are new or have changed + """ + changed = [] + unchanged_count = 0 + + for ldap_obj in ldap_objects: + sid = ldap_obj.get('objectsid', '') + dn = ldap_obj.get('distinguishedname', '').upper() + + # Calculate hash of new object + new_hash = self._hash_object(ldap_obj) + + # Look up in cache + cached = self.get_cached_object(sid=sid, dn=dn) + + if not cached: + changed.append(ldap_obj) + else: + # Skip cached objects + unchanged_count += 1 + + logger.info(f"Cache: {unchanged_count} unchanged, {len(changed)} new/changed") + return changed + + def get_all_cached_objects(self) -> List[Any]: + """ + Retrieve all cached BloodHound objects. + + Returns: + List of deserialized BloodHound objects + """ + cursor = self.conn.cursor() + cursor.execute('SELECT data FROM objects') + + objects = [] + for row in cursor.fetchall(): + try: + obj = pickle.loads(row['data']) + objects.append(obj) + except Exception as e: + logger.warning(f"Failed to deserialize cached object: {e}") + + return objects + + def get_statistics(self) -> Dict[str, Any]: + """ + Get cache statistics. + + Returns: + Dict with cache stats + """ + from datetime import datetime + cursor = self.conn.cursor() + + # Total objects + cursor.execute('SELECT COUNT(*) as count FROM objects') + total_count = cursor.fetchone()['count'] + + # Objects by type + cursor.execute(''' + SELECT object_type, COUNT(*) as count + FROM objects + GROUP BY object_type + ''') + by_type = {row['object_type']: row['count'] for row in cursor.fetchall()} + + # Metadata + cursor.execute('SELECT key, value FROM metadata') + metadata = {row['key']: row['value'] for row in cursor.fetchall()} + + # File size + file_size = Path(self.cache_path).stat().st_size / (1024 * 1024) # MB + + # Format timestamps + created_at = metadata.get('created_at') + last_accessed = metadata.get('last_accessed') + + if created_at: + try: + created_at = datetime.fromtimestamp(float(created_at)).strftime('%Y-%m-%d %H:%M:%S') + except (ValueError, TypeError): + pass + + if last_accessed: + try: + last_accessed = datetime.fromtimestamp(float(last_accessed)).strftime('%Y-%m-%d %H:%M:%S') + except (ValueError, TypeError): + pass + + return { + 'total_objects': total_count, + 'by_type': by_type, + 'cache_version': metadata.get('cache_version'), + 'created_at': created_at, + 'last_accessed': last_accessed, + 'file_size_mb': round(file_size, 2) + } + + def commit(self): + """Commit pending transactions.""" + if self.conn: + self.conn.commit() + + def close(self): + """Close database connection.""" + if self.conn: + self.conn.commit() + self.conn.close() + + # ===== SID Mapping Methods ===== + + def store_sid_mapping(self, sid: str, name: str, object_type: str, domain: str = ''): + """ + Store a SID to name mapping. + + Args: + sid: Object SID + name: Object name (sAMAccountName or CN) + object_type: BloodHound object type (User, Computer, Group, etc.) + domain: Domain name + """ + cursor = self.conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO sid_mappings (sid, name, object_type, domain, timestamp) + VALUES (?, ?, ?, ?, ?) + ''', (sid, name, object_type, domain, time.time())) + + def store_sid_mappings_bulk(self, mappings: List[tuple]): + """ + Bulk store SID mappings for efficiency. 
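+ + Example (illustrative values): + cache.store_sid_mappings_bulk([("S-1-5-21-111111111-222222222-333333333-1104", "JDOE@ESSOS.LOCAL", "User", "ESSOS.LOCAL")])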
+ + Args: + mappings: List of (sid, name, object_type, domain) tuples + """ + cursor = self.conn.cursor() + timestamp = time.time() + cursor.executemany(''' + INSERT OR REPLACE INTO sid_mappings (sid, name, object_type, domain, timestamp) + VALUES (?, ?, ?, ?, ?) + ''', [(sid, name, obj_type, domain, timestamp) for sid, name, obj_type, domain in mappings]) + self.conn.commit() + + def get_sid_mapping(self, sid: str) -> Optional[Dict]: + """ + Retrieve SID mapping by SID. + + Args: + sid: Object SID + + Returns: + Dict with name, object_type, domain or None + """ + cursor = self.conn.cursor() + cursor.execute('SELECT * FROM sid_mappings WHERE sid = ?', (sid,)) + row = cursor.fetchone() + if row: + return { + 'sid': row['sid'], + 'name': row['name'], + 'object_type': row['object_type'], + 'domain': row['domain'] + } + return None + + def get_all_sid_mappings(self) -> Dict[str, Dict]: + """ + Get all SID mappings as a dictionary. + + Returns: + Dict mapping SID -> {name, object_type, domain} + """ + cursor = self.conn.cursor() + cursor.execute('SELECT sid, name, object_type, domain FROM sid_mappings') + return { + row['sid']: { + 'name': row['name'], + 'object_type': row['object_type'], + 'domain': row['domain'] + } + for row in cursor.fetchall() + } + + # ===== Domain Mapping Methods ===== + + def store_domain_mapping(self, dc: str, domain_sid: str, domain_name: str = ''): + """ + Store a domain component to SID mapping. + + Args: + dc: Domain component (e.g., 'DC=ESSOS,DC=LOCAL') + domain_sid: Domain SID + domain_name: Domain DNS name (optional) + """ + cursor = self.conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO domain_mappings (dc, domain_sid, domain_name, timestamp) + VALUES (?, ?, ?, ?) + ''', (dc, domain_sid, domain_name, time.time())) + + def get_domain_mapping(self, dc: str) -> Optional[str]: + """ + Get domain SID by domain component. + + Args: + dc: Domain component (e.g., 'DC=ESSOS,DC=LOCAL') + + Returns: + Domain SID or None + """ + cursor = self.conn.cursor() + cursor.execute('SELECT domain_sid FROM domain_mappings WHERE dc = ?', (dc,)) + row = cursor.fetchone() + return row['domain_sid'] if row else None + + def get_all_domain_mappings(self) -> Dict[str, str]: + """ + Get all domain mappings. + + Returns: + Dict mapping DC -> domain_sid + """ + cursor = self.conn.cursor() + cursor.execute('SELECT dc, domain_sid FROM domain_mappings') + return {row['dc']: row['domain_sid'] for row in cursor.fetchall()} + + # ===== DN Mapping Methods ===== + + def store_dn_mapping(self, dn: str, sid: str, object_type: str): + """ + Store a DN to SID mapping. + + Args: + dn: Distinguished Name + sid: Object SID + object_type: BloodHound object type + """ + cursor = self.conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO dn_mappings (dn, sid, object_type, timestamp) + VALUES (?, ?, ?, ?) + ''', (dn, sid, object_type, time.time())) + + def store_dn_mappings_bulk(self, mappings: List[tuple]): + """ + Bulk store DN mappings. + + Args: + mappings: List of (dn, sid, object_type) tuples + """ + cursor = self.conn.cursor() + timestamp = time.time() + cursor.executemany(''' + INSERT OR REPLACE INTO dn_mappings (dn, sid, object_type, timestamp) + VALUES (?, ?, ?, ?) + ''', [(dn, sid, obj_type, timestamp) for dn, sid, obj_type in mappings]) + self.conn.commit() + + def get_dn_mapping(self, dn: str) -> Optional[Dict]: + """ + Get object info by DN. 
+ + Args: + dn: Distinguished Name + + Returns: + Dict with sid, object_type or None + """ + cursor = self.conn.cursor() + cursor.execute('SELECT * FROM dn_mappings WHERE dn = ?', (dn,)) + row = cursor.fetchone() + if row: + return { + 'dn': row['dn'], + 'sid': row['sid'], + 'object_type': row['object_type'] + } + return None + + def get_all_dn_mappings(self) -> Dict[str, Dict]: + """ + Get all DN mappings. + + Returns: + Dict mapping DN -> {sid, object_type} + """ + cursor = self.conn.cursor() + cursor.execute('SELECT dn, sid, object_type FROM dn_mappings') + return { + row['dn']: { + 'sid': row['sid'], + 'object_type': row['object_type'] + } + for row in cursor.fetchall() + } + + # ===== Schema GUID Methods ===== + + def store_schema_guid(self, name: str, guid: str): + """ + Store a schema name to GUID mapping. + + Args: + name: Schema attribute name + guid: Schema ID GUID + """ + cursor = self.conn.cursor() + cursor.execute(''' + INSERT OR REPLACE INTO schema_guids (name, guid, timestamp) + VALUES (?, ?, ?) + ''', (name, guid, time.time())) + + def store_schema_guids_bulk(self, mappings: Dict[str, str]): + """ + Bulk store schema GUID mappings. + + Args: + mappings: Dict of name -> guid + """ + cursor = self.conn.cursor() + timestamp = time.time() + cursor.executemany(''' + INSERT OR REPLACE INTO schema_guids (name, guid, timestamp) + VALUES (?, ?, ?) + ''', [(name, guid, timestamp) for name, guid in mappings.items()]) + self.conn.commit() + + def get_all_schema_guids(self) -> Dict[str, str]: + """ + Get all schema GUID mappings. + + Returns: + Dict mapping name -> guid + """ + cursor = self.conn.cursor() + cursor.execute('SELECT name, guid FROM schema_guids') + return {row['name']: row['guid'] for row in cursor.fetchall()} + + # ===== Context Export/Import ===== + + def export_context_for_workers(self) -> Dict[str, Any]: + """ + Export all context data needed for ACL parsing in workers. + + Returns: + Dict containing sid_map, domain_map, and schema_guids + """ + return { + 'sid_map': self.get_all_sid_mappings(), + 'domain_map': self.get_all_domain_mappings(), + 'dn_map': self.get_all_dn_mappings(), + 'schema_guids': self.get_all_schema_guids() + } + + def get_context_statistics(self) -> Dict[str, int]: + """ + Get statistics about stored context data. 
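+
+        Counts the rows currently stored in each of the context tables
+        used by export_context_for_workers().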
+ + Returns: + Dict with counts of each mapping type + """ + cursor = self.conn.cursor() + + cursor.execute('SELECT COUNT(*) as c FROM sid_mappings') + sid_count = cursor.fetchone()['c'] + + cursor.execute('SELECT COUNT(*) as c FROM domain_mappings') + domain_count = cursor.fetchone()['c'] + + cursor.execute('SELECT COUNT(*) as c FROM dn_mappings') + dn_count = cursor.fetchone()['c'] + + cursor.execute('SELECT COUNT(*) as c FROM schema_guids') + schema_count = cursor.fetchone()['c'] + + return { + 'sid_mappings': sid_count, + 'domain_mappings': domain_count, + 'dn_mappings': dn_count, + 'schema_guids': schema_count + } + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.close() diff --git a/bofhound/parsers/parsing_pipeline.py b/bofhound/parsers/parsing_pipeline.py index a6c6e9f..483d2c7 100644 --- a/bofhound/parsers/parsing_pipeline.py +++ b/bofhound/parsers/parsing_pipeline.py @@ -1,7 +1,9 @@ """Parsing pipeline to coordinate multiple tool parsers for C2 framework logs.""" +import os +import multiprocessing from typing import List, Dict, Any from .types import ObjectType, ToolParser -from .data_sources import DataSource +from .data_sources import DataSource, FileDataSource, FileDataStream from . import ( NetLocalGroupBofParser, NetLoggedOnBofParser, NetSessionBofParser, RegSessionBofParser, LdapSearchBofParser, ParserType, Brc4LdapSentinelParser @@ -52,17 +54,40 @@ class ParsingPipeline: def __init__(self, platform_filters=None): self.tool_parsers: List[ToolParser] = [] self.platform_filters = platform_filters or [] + self.parser_type = None # Will be set by factory def register_parser(self, parser: ToolParser): """Register a tool parser with the pipeline""" self.tool_parsers.append(parser) - def process_data_source(self, data_source: DataSource, progress_callback=None) -> ParsingResult: + def process_data_source(self, data_source: DataSource, progress_callback=None, num_workers=None) -> ParsingResult: """ Process a data source through all registered parsers. + + Args: + data_source: The data source to process + progress_callback: Optional callback for progress updates + num_workers: Number of parallel workers (None = auto, 1 = single-threaded) Returns categorized results. 
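+
+        When the source is a FileDataSource with more than one file and
+        num_workers is not 1, files may be distributed across multiple
+        worker processes; otherwise the original sequential path is used.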
""" + # Check if we can use parallel processing + if isinstance(data_source, FileDataSource): + file_list = list(data_source.get_data_streams()) + + # Use parallel processing if multiple files and workers > 1 + if num_workers != 1 and len(file_list) > 1: + return self._process_files_parallel( + file_list, + progress_callback=progress_callback, + num_workers=num_workers + ) + + # Fall back to sequential processing + return self._process_sequential(data_source, progress_callback) + + def _process_sequential(self, data_source: DataSource, progress_callback=None) -> ParsingResult: + """Process data source sequentially (original behavior).""" result = ParsingResult() for data_stream in data_source.get_data_streams(): @@ -81,6 +106,75 @@ def process_data_source(self, data_source: DataSource, progress_callback=None) - result.add_objects(parser.produces_object_type, parser.get_results()) return result + + def _process_files_parallel(self, file_streams: List[FileDataStream], + progress_callback=None, num_workers=None) -> ParsingResult: + """Process multiple files in parallel using multiprocessing.""" + from concurrent.futures import ProcessPoolExecutor, as_completed + from bofhound.logger import logger + + # Determine worker count + if num_workers is None: + num_workers = max(1, int(os.cpu_count() * 0.9)) + num_workers = min(num_workers, len(file_streams)) + + if num_workers == 1: + # Single file or single worker - use sequential + return self._process_sequential_files(file_streams, progress_callback) + + logger.debug(f"Processing {len(file_streams)} files with {num_workers} workers") + + # Prepare file paths for workers + file_paths = [fs.file_path for fs in file_streams] + + result = ParsingResult() + completed = 0 + + # Use spawn context for clean worker processes + ctx = multiprocessing.get_context('spawn') + + with ProcessPoolExecutor(max_workers=num_workers, mp_context=ctx) as executor: + # Submit all files for processing + future_to_file = { + executor.submit(_worker_parse_file, fp, self.parser_type): fp + for fp in file_paths + } + + for future in as_completed(future_to_file): + file_path = future_to_file[future] + completed += 1 + + if progress_callback: + progress_callback(f"FILES_TO_PARSE/{os.path.basename(file_path)}") + + try: + worker_result = future.result() + # Merge results + for obj_type in ObjectType: + result.add_objects(obj_type, worker_result.get(obj_type.value, [])) + except Exception as e: + logger.error(f"Error processing {file_path}: {e}") + + logger.debug(f"Parallel parsing complete: {completed} files processed") + return result + + def _process_sequential_files(self, file_streams: List[FileDataStream], + progress_callback=None) -> ParsingResult: + """Process files sequentially (for single file or single worker).""" + result = ParsingResult() + + for data_stream in file_streams: + if progress_callback: + progress_callback(data_stream.identifier) + for line in data_stream.lines(): + filtered_line = line.rstrip('\n\r') + for parser in self.tool_parsers: + parser.process_line(filtered_line) + + for parser in self.tool_parsers: + result.add_objects(parser.produces_object_type, parser.get_results()) + + return result def process_file(self, file_path: str) -> ParsingResult: """ @@ -106,6 +200,41 @@ def process_file(self, file_path: str) -> ParsingResult: return result + +def _worker_parse_file(file_path: str, parser_type) -> Dict[str, List[Dict[str, Any]]]: + """ + Worker function to parse a single file. + Runs in a separate process. 
+ """ + # Create fresh parsers for this worker + parsers = [ + NetLoggedOnBofParser(), + NetSessionBofParser(), + NetLocalGroupBofParser(), + RegSessionBofParser(), + ] + + if parser_type == ParserType.BRC4: + parsers.append(Brc4LdapSentinelParser()) + else: + parsers.append(LdapSearchBofParser()) + + # Process the file + with open(file_path, 'r', encoding='utf-8') as f: + for line in f: + filtered_line = line.rstrip('\n\r') + for parser in parsers: + parser.process_line(filtered_line) + + # Collect results as dict (for pickling across process boundary) + results = {} + for parser in parsers: + obj_type = parser.produces_object_type + results[obj_type.value] = parser.get_results() + + return results + + class ParsingPipelineFactory: """Factory to create ParsingPipeline instances with registered parsers.""" @@ -113,6 +242,7 @@ class ParsingPipelineFactory: def create_pipeline(parser_type: ParserType = ParserType.LdapsearchBof) -> ParsingPipeline: """Create a ParsingPipeline with all available parsers registered.""" pipeline = ParsingPipeline() + pipeline.parser_type = parser_type # Store for parallel processing pipeline.register_parser(NetLoggedOnBofParser()) pipeline.register_parser(NetSessionBofParser())