From b78d50b8fb22d66e9cbb28debc27608828531a3d Mon Sep 17 00:00:00 2001 From: JoshuaMoelans <60878493+JoshuaMoelans@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:23:07 +0100 Subject: [PATCH 1/2] initial extract_minidump implementation --- scripts/extract_minidump.py | 243 +++++++++++++++++++++++++++++++ scripts/test_extract_minidump.py | 236 ++++++++++++++++++++++++++++++ tests/fixtures/minidump.envelope | Bin 0 -> 42791 bytes 3 files changed, 479 insertions(+) create mode 100755 scripts/extract_minidump.py create mode 100755 scripts/test_extract_minidump.py create mode 100644 tests/fixtures/minidump.envelope diff --git a/scripts/extract_minidump.py b/scripts/extract_minidump.py new file mode 100755 index 000000000..6521e6183 --- /dev/null +++ b/scripts/extract_minidump.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +""" +Extract minidump (.dmp) attachments from Sentry envelope files. + +Sentry envelope format: +- Line 1: Envelope header (JSON) +- For each item: + - Item header (JSON with "type", "length", and optional metadata) + - Item payload (raw bytes of specified length) + - Items are separated by newlines + +Usage: + python extract_minidump.py <envelope_file> [output_file] + +If output_file is not specified, the filename from the envelope will be used. +""" + +import argparse +import json +import os +import sys +from pathlib import Path + + +def parse_envelope(data: bytes) -> tuple[dict, list[tuple[dict, bytes]]]: + """ + Parse a Sentry envelope and return the header and list of items. 
+ + Args: + data: Raw envelope file contents + + Returns: + Tuple of (envelope_header, list of (item_header, item_payload) tuples) + """ + pos = 0 + + # Parse envelope header (first line) + newline_pos = data.find(b'\n', pos) + if newline_pos == -1: + raise ValueError("Invalid envelope: missing newline after header") + + envelope_header = json.loads(data[pos:newline_pos].decode('utf-8')) + pos = newline_pos + 1 + + items = [] + + # Parse items + while pos < len(data): + # Skip any extra newlines between items + while pos < len(data) and data[pos:pos+1] == b'\n': + pos += 1 + + if pos >= len(data): + break + + # Parse item header + newline_pos = data.find(b'\n', pos) + if newline_pos == -1: + # No more complete items + break + + item_header_bytes = data[pos:newline_pos] + try: + item_header = json.loads(item_header_bytes.decode('utf-8')) + except json.JSONDecodeError as e: + print(f"Warning: Failed to parse item header at position {pos}: {e}") + break + + pos = newline_pos + 1 + + # Get payload length + payload_len = item_header.get('length') + + if payload_len is None: + # Length omitted: read until next newline or end + next_newline = data.find(b'\n', pos) + if next_newline == -1: + payload_len = len(data) - pos + else: + payload_len = next_newline - pos + + # Extract payload + payload = data[pos:pos + payload_len] + pos += payload_len + + items.append((item_header, payload)) + + return envelope_header, items + + +def extract_minidump(envelope_path: str, output_path: str = None) -> str: + """ + Extract the minidump attachment from a Sentry envelope file. + + Args: + envelope_path: Path to the envelope file + output_path: Optional output path for the minidump. If not specified, + uses the filename from the envelope metadata. 
+ + Returns: + Path to the extracted minidump file + + Raises: + FileNotFoundError: If envelope file doesn't exist + ValueError: If no minidump found in envelope + """ + envelope_path = Path(envelope_path) + + if not envelope_path.exists(): + raise FileNotFoundError(f"Envelope file not found: {envelope_path}") + + # Read envelope file + with open(envelope_path, 'rb') as f: + data = f.read() + + print(f"Read {len(data)} bytes from {envelope_path}") + + # Parse envelope + envelope_header, items = parse_envelope(data) + + print(f"Envelope event_id: {envelope_header.get('event_id', 'N/A')}") + print(f"Found {len(items)} item(s) in envelope") + + # Find minidump attachment + minidump_item = None + minidump_header = None + + for item_header, item_payload in items: + item_type = item_header.get('type', '') + attachment_type = item_header.get('attachment_type', '') + + print(f" - Item type: {item_type}, attachment_type: {attachment_type}, " + f"length: {len(item_payload)} bytes") + + if item_type == 'attachment' and attachment_type == 'event.minidump': + minidump_item = item_payload + minidump_header = item_header + print(f" -> Found minidump!") + + if minidump_item is None: + raise ValueError("No minidump attachment found in envelope") + + # Determine output path + if output_path is None: + filename = minidump_header.get('filename', 'minidump.dmp') + output_path = envelope_path.parent / filename + else: + output_path = Path(output_path) + + # Verify minidump magic bytes (optional sanity check) + if minidump_item[:4] == b'MDMP': + print(f"Minidump magic verified: MDMP") + else: + print(f"Warning: Unexpected magic bytes: {minidump_item[:4]}") + + # Write minidump + with open(output_path, 'wb') as f: + f.write(minidump_item) + + print(f"\nExtracted minidump to: {output_path}") + print(f"Size: {len(minidump_item)} bytes") + + return str(output_path) + + +def list_envelope_contents(envelope_path: str) -> None: + """ + List the contents of a Sentry envelope file without 
extracting. + """ + envelope_path = Path(envelope_path) + + if not envelope_path.exists(): + raise FileNotFoundError(f"Envelope file not found: {envelope_path}") + + with open(envelope_path, 'rb') as f: + data = f.read() + + envelope_header, items = parse_envelope(data) + + print(f"Envelope: {envelope_path}") + print(f" Size: {len(data)} bytes") + print(f" Event ID: {envelope_header.get('event_id', 'N/A')}") + print(f" DSN: {envelope_header.get('dsn', 'N/A')}") + print() + print(f"Items ({len(items)}):") + + for i, (item_header, item_payload) in enumerate(items): + print(f" [{i}] Type: {item_header.get('type', 'unknown')}") + print(f" Length: {len(item_payload)} bytes") + + if item_header.get('attachment_type'): + print(f" Attachment Type: {item_header['attachment_type']}") + if item_header.get('filename'): + print(f" Filename: {item_header['filename']}") + if item_header.get('content_type'): + print(f" Content-Type: {item_header['content_type']}") + + # Show preview for text items + if item_header.get('type') in ('event', 'session', 'transaction'): + try: + preview = item_payload[:200].decode('utf-8') + if len(item_payload) > 200: + preview += '...' 
+ print(f" Preview: {preview}") + except UnicodeDecodeError: + pass + + print() + + +def main(): + parser = argparse.ArgumentParser( + description='Extract minidump attachments from Sentry envelope files' + ) + parser.add_argument( + 'envelope', + help='Path to the Sentry envelope file' + ) + parser.add_argument( + 'output', + nargs='?', + help='Output path for the minidump (default: use filename from envelope)' + ) + parser.add_argument( + '-l', '--list', + action='store_true', + help='List envelope contents without extracting' + ) + + args = parser.parse_args() + + try: + if args.list: + list_envelope_contents(args.envelope) + else: + extract_minidump(args.envelope, args.output) + except (FileNotFoundError, ValueError) as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/scripts/test_extract_minidump.py b/scripts/test_extract_minidump.py new file mode 100755 index 000000000..375fd0338 --- /dev/null +++ b/scripts/test_extract_minidump.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +""" +Tests for the extract_minidump.py script. + +Uses the minidump.envelope fixture which contains the minidump.dmp from +tests/fixtures to verify that extraction produces identical output. 
+""" + +import hashlib +import os +import sys +import tempfile +import unittest +from pathlib import Path + +# Add the scripts directory to the path so we can import extract_minidump +SCRIPT_DIR = Path(__file__).parent +sys.path.insert(0, str(SCRIPT_DIR)) + +from extract_minidump import parse_envelope, extract_minidump, list_envelope_contents + + +class TestExtractMinidump(unittest.TestCase): + """Test cases for minidump extraction from Sentry envelopes.""" + + @classmethod + def setUpClass(cls): + """Set up test fixtures paths.""" + cls.repo_root = SCRIPT_DIR.parent + cls.fixtures_dir = cls.repo_root / "tests" / "fixtures" + cls.envelope_path = cls.fixtures_dir / "minidump.envelope" + cls.original_minidump_path = cls.fixtures_dir / "minidump.dmp" + + # Verify fixtures exist + if not cls.envelope_path.exists(): + raise FileNotFoundError( + f"Envelope fixture not found: {cls.envelope_path}\n" + "Run create_envelope_fixture.py to create it." + ) + if not cls.original_minidump_path.exists(): + raise FileNotFoundError( + f"Original minidump not found: {cls.original_minidump_path}" + ) + + def test_parse_envelope_structure(self): + """Test that envelope parsing returns correct structure.""" + with open(self.envelope_path, 'rb') as f: + data = f.read() + + envelope_header, items = parse_envelope(data) + + # Check envelope header + self.assertIn('dsn', envelope_header) + self.assertIn('event_id', envelope_header) + + # Check we have at least 2 items (event + attachment) + self.assertGreaterEqual(len(items), 2) + + # Check item types + item_types = [item[0].get('type') for item in items] + self.assertIn('event', item_types) + self.assertIn('attachment', item_types) + + def test_parse_envelope_minidump_header(self): + """Test that the minidump attachment header is correct.""" + with open(self.envelope_path, 'rb') as f: + data = f.read() + + envelope_header, items = parse_envelope(data) + + # Find minidump item + minidump_item = None + for item_header, item_payload in items: 
+ if item_header.get('attachment_type') == 'event.minidump': + minidump_item = (item_header, item_payload) + break + + self.assertIsNotNone(minidump_item, "No minidump attachment found") + + header, payload = minidump_item + self.assertEqual(header['type'], 'attachment') + self.assertEqual(header['attachment_type'], 'event.minidump') + self.assertIn('filename', header) + self.assertEqual(header['length'], len(payload)) + + def test_minidump_magic_bytes(self): + """Test that extracted minidump has correct magic bytes.""" + with open(self.envelope_path, 'rb') as f: + data = f.read() + + envelope_header, items = parse_envelope(data) + + # Find minidump payload + minidump_payload = None + for item_header, item_payload in items: + if item_header.get('attachment_type') == 'event.minidump': + minidump_payload = item_payload + break + + self.assertIsNotNone(minidump_payload) + # MDMP is the minidump magic signature + self.assertEqual(minidump_payload[:4], b'MDMP', + "Minidump should start with MDMP magic bytes") + + def test_extract_minidump_matches_original(self): + """Test that extracted minidump is identical to original.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "extracted.dmp" + + # Extract the minidump + result_path = extract_minidump(str(self.envelope_path), str(output_path)) + + self.assertEqual(result_path, str(output_path)) + self.assertTrue(output_path.exists()) + + # Compare with original + with open(self.original_minidump_path, 'rb') as f: + original_data = f.read() + with open(output_path, 'rb') as f: + extracted_data = f.read() + + # Compare sizes + self.assertEqual(len(extracted_data), len(original_data), + f"Size mismatch: extracted={len(extracted_data)}, " + f"original={len(original_data)}") + + # Compare content + self.assertEqual(extracted_data, original_data, + "Extracted minidump content differs from original") + + def test_extract_minidump_hash_comparison(self): + """Test extraction using hash comparison for 
additional verification.""" + with tempfile.TemporaryDirectory() as tmpdir: + output_path = Path(tmpdir) / "extracted.dmp" + + extract_minidump(str(self.envelope_path), str(output_path)) + + # Calculate hashes + with open(self.original_minidump_path, 'rb') as f: + original_hash = hashlib.md5(f.read()).hexdigest() + with open(output_path, 'rb') as f: + extracted_hash = hashlib.md5(f.read()).hexdigest() + + self.assertEqual(extracted_hash, original_hash, + f"MD5 hash mismatch: extracted={extracted_hash}, " + f"original={original_hash}") + + def test_extract_minidump_default_filename(self): + """Test that extraction uses filename from envelope when not specified.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Copy envelope to temp dir so output goes there + import shutil + temp_envelope = Path(tmpdir) / "test.envelope" + shutil.copy(self.envelope_path, temp_envelope) + + # Extract without specifying output path + result_path = extract_minidump(str(temp_envelope)) + + # Should use filename from envelope header (minidump.dmp) + self.assertTrue(Path(result_path).exists()) + self.assertEqual(Path(result_path).name, "minidump.dmp") + + def test_extract_minidump_nonexistent_file(self): + """Test that extraction fails gracefully for nonexistent file.""" + with self.assertRaises(FileNotFoundError): + extract_minidump("/nonexistent/path/to/envelope.envelope") + + def test_envelope_without_minidump(self): + """Test that extraction fails gracefully when no minidump present.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create envelope without minidump + import json + envelope_path = Path(tmpdir) / "no_minidump.envelope" + + envelope_header = {"dsn": "https://test@sentry.invalid/42"} + event_payload = {"event_id": "test", "level": "info"} + event_bytes = json.dumps(event_payload).encode('utf-8') + event_header = {"type": "event", "length": len(event_bytes)} + + with open(envelope_path, 'wb') as f: + f.write(json.dumps(envelope_header).encode('utf-8')) + 
f.write(b'\n') + f.write(json.dumps(event_header).encode('utf-8')) + f.write(b'\n') + f.write(event_bytes) + + with self.assertRaises(ValueError) as ctx: + extract_minidump(str(envelope_path)) + + self.assertIn("No minidump", str(ctx.exception)) + + +class TestParseEnvelope(unittest.TestCase): + """Test cases for envelope parsing edge cases.""" + + def test_parse_empty_envelope(self): + """Test parsing empty data.""" + with self.assertRaises(Exception): + parse_envelope(b'') + + def test_parse_header_only(self): + """Test parsing envelope with only header.""" + import json + data = json.dumps({"dsn": "test"}).encode('utf-8') + b'\n' + header, items = parse_envelope(data) + self.assertEqual(header['dsn'], 'test') + self.assertEqual(len(items), 0) + + def test_parse_multiple_items(self): + """Test parsing envelope with multiple items.""" + import json + + envelope_header = {"dsn": "test"} + item1_payload = b"payload1" + item1_header = {"type": "event", "length": len(item1_payload)} + item2_payload = b"payload2" + item2_header = {"type": "attachment", "length": len(item2_payload)} + + data = b'' + data += json.dumps(envelope_header).encode('utf-8') + b'\n' + data += json.dumps(item1_header).encode('utf-8') + b'\n' + data += item1_payload + data += b'\n' + data += json.dumps(item2_header).encode('utf-8') + b'\n' + data += item2_payload + + header, items = parse_envelope(data) + + self.assertEqual(len(items), 2) + self.assertEqual(items[0][1], item1_payload) + self.assertEqual(items[1][1], item2_payload) + + +if __name__ == '__main__': + # Run tests with verbosity + unittest.main(verbosity=2) diff --git a/tests/fixtures/minidump.envelope b/tests/fixtures/minidump.envelope new file mode 100644 index 0000000000000000000000000000000000000000..cdf5cb740fdca7b787a65ef152cc96e114b96019 GIT binary patch literal 42791 zcmeHQ31C!3((Xw(0&;|KY&;o)M>rCa7$5@500{&aBodIrD7hvi<{ER6Ajg0%X3@pS zdZ2ys18_ ztE#K3kJtTPzdJ(G3-dxoSVJ<4iX4R_h72jP7Z%wH?RiB7W#L(QB`G;s=|iHULIzty 
z>?IUCCo7$?87oXigk`0NMMhm69X52>@R+ca)U@<4dqxDuaKtqs3p?KtQdH)!b1s>I zN#)q{<`!jARAh7{Bb*y)B2<)>OOmAII+%P^L{xNGL}Xax&}orTBO)T`KRiOZ+04|D zlTwtCUy#eFc_~F%C3YoBi9Ls-GE$0Cay-OMDJn`y%gm)xd&nFe6&)Ex^o-;=9t8}~ z&C1J4FD97?DI<%@nvyGvkNEI(N?X`@a_r=U->$!_W1bCD1f6oGWjyjTc}m^+~h`BxB4UtK40p%DGLSUT&Bx6BlRL=~AtS5Sc_ z%{vfyiiOth4}&i2cy_Up?|VvLy#zW?2*p%pTug^Vm+E01u;Vy(6U1aOO;|;uC=w|m zPo&dt0sUGjHcR9RyF&Bw%M!PD(PKo=j3ruSB8TYA6`4f8^)$H#+bfyLx1*tL*W!rC}TNagcwTKFcC&MqokjuY;)(M2GT_; zSyP(G*R^ig!C!P=smUFf*nz+ zl+@kUxRmGVp9e@^0;Um--^G=;xqyu!tAi1Z8`f1K(>I3&6__F|v8 z%X!Q^2bV;7hD|myCX<|=ZgL{mzi{*u_0b{XV8*v6`&=T#OYOcs%bkUv=pgn6^2ps; z!qGZ&JDP>SWYkEU?Zsl{s!PQ2ufK4vz0|0$36jUd#E55!Lb&rX z*Vk-HPxH#_qt237GEFqeJw@};TQ8GdpMS{Q#Y^tIlq%flgeW_-ULaOp+eK9TYN_A* z#Oqw_kZq&K6oYhQ9Y!lVbaWPfpKyWbbk!YpYhQyG&zN_0Qg+DDU593|`xBdCUU`)% zUe3jqkl`Wh&@(e1>hUjcG`vF^A9n0X~p+;*?L3Ny|uCZxa+k-aEHR6lEZaK6>x#r@zD zU6gGwWM1r=>JMo+6MYk8vixaWu7TIc_GRrarsaf)$KUDX_lniv#kSF@o8raL8!xtv zIIp~(Q??PQ9u1ge;?-K);Bu|- z$}3y9<*p*3s;lTVC#$-`#A~M3AwzFEqxw`jY=WDS!)r?|MWz~Wg(vRWf6)OLvOrThiAO+TVI%of|K-VL&dIg|4RBGEGWEoL63n zk&;)v5I6nOBCMZOKY!#LdEH99)_CQm)-HtAUz8O4i~DvB=sA`;QTOr7m-;&8mDkZJ zvRpMn487bUei+p8__9{=VqLmB*+)$^Vj9<1uxek`9mR8(&^(}OZU47TyjnXy3?nvi zZoKNo7<`rQKmz>Qbqa_v?+tmq&T(>sdb0ewO; zOuSk2M(5mYce?`adS*4hU5QQN%o+A~?|u$Jzp_FKg0XO8Zw zZzV6YZLkjadF3@q)mMcOLweG^!cB{2N0ET7sV{$`-GMGRD|sw#YmANSD^>MT6&CSn zKR+>g`_^rFCSI+rFShx9UU|({yy`7t{EL2KTlW3EHlHIeuCEDRd4;QauTzM4c%o>+NG$!ZHNpx|7R$} zOTMK0S%og6**os>Rnou1zDB~>EnEU2{_1#G!WGx5MV_?Vc9Q@!?(8a)@#DM2F5ST* zeLnjvZFPv7*FF<*0l{l8J++=~{o;v3{1>Hk@teJQR|lrWbeIO`ue~EX@QT{5V(+|O z?U~rhd+zUS+5Dhbvw&9!lzU`_5T}U#E!PQgJrP{mzJthK%#=M{&-$mu6w+PZbkBQ~ zzYm_{xAC~2xO#GXQ5mE1ycal4mj0rfJZT`bVR;`}#{SFv#fOKJ#QlW*yH^K`cVaAJ z;_b9{v4F~XQIf^)7ec(Vm=?^)ebi0m-bT15c3&tagtQZ@M+A%A8QsMDYZr=xdupN; zh7faRr0jY`saH%o)>lSf8Tze+AzM00=tp(EwUoTpEix<;Hb{7AgA`rF$0`^+TERUd z6`FA9o3To5SCy$9B!=^;_7Wo9IY?!&HrA!4)FfaUyndJGv3|z8UA&i)M>{wLqKtM? 
zcrT|&I4FJ@U9n<>NTU5C10#tRWl{(uOO#M-k|}qdM_L-~6AZ*@+&BW&4>IfZvj!U69uMdP>J<{$}dHnm`b{e6O+VPF^1%fbECA`MJa)J<UQ_YB&u6FIlVZZ*^fJOsr97Eb1G(f^x+tW5 zG~R6+PJ1a)RKDSI-;DRsu90C*iJ?FDyu1@J3>S0 zPnByp;Tw59WiKJU|F|}7=*{M;^OkKmg=(vWO5vcI^t8cIK2XUeD=Hx!7LXZ8f#W8L&J)(3ht7D}zAQXR35(%tCzqF=S+nMo9KsEmewZU2HS1{Gf<;a%5U>MyXj zel9&{K9z!Z=ZmSPJ+0Ok-NI@e)KkqP`_FUJi!UYO@u`4nCyPpUt~3;XjERP3eJRgm z>X+I65}L5N8+@qe=4^Nl(RH_DUvy;NaYwEDQa)~3v#4~_+{U?B8ucbo*%EvxpL;*S zqar_{!sD2lX%xCyfiHar&k~}@l05Cv7ag%ZGL?L4^Vx1v*nBA4MAAe&X(WMW0twUu zg;R|w4f$3UZVh?TrVU#h=7SvOyuQ>k&m_`hPpn3jVQ!`HMHgl|MoD~`E zpQ2}pNEwZs`}zte)&$uk&K!EW%SqGttHmJtOO(HUd|T$DLk_FX2M96&r__Eu!hk2v zsQ5S?k&FbP_IdD3f-AZi5AYWIHI2Y!Y-ngEvFR-Tkywk#$ww0-Bf_F0r$t4KxH?Au z@bihohYpL3jGo<+5c=UvdnrBYm}3u5YYG3Sl5Xo^lcI3jcXu!N*<_G=O?EY1Y!V_Y z3X){_TKQ}IxYBi$yeT)5#p~b0{?xtY{5KQS+Lf@mVbQ(6N5}|(J}k&gf4~V(`bJKh z;Uyp6Ke4MDK2`Jgln3+&t=uQ~m&Pe{`OABey`Nn9#=(t7WZe&nZ%69_wO45*dH7~( zJVZYK-W^Tit2=mL32`9=GNPpK`5v0z?|&=l*e2ZS!r(k+`VO65-BINObY*JB6G0ss zKj+G@mTvd!2`?U<4f&}aYTUD;AAs0EwYnqy2>v&9vskcOcpdp4IQ#9Sls2B`;mtWg9 z{Frsr7H-qP`K|#Tb{uZQ58s9QH2jzq&^8>IHS&KM{!a}*H5KOJN4?B~vSRqLJ)rOq z-&%;_2l#En4>ZvR+cfa=(EtxWHn-u&N=5S+e%gkU^SQGBUBgeE9#3X|bF)-OK!bW0 z61u^=krnzKN%QuSvyV-pVptM2I=_ZydN$1m{C12wKkk0^njG}g2HP~yrh Nd61 zZTN{G6p!J@oTY6zGHc|g9R7C=KPPn|YxLeveZfRIlJGGC?C`2R;;RBwx=Iz|>5Jg= zgjjsb%u)C!vK0g$#DCKF{1|>bOV<`^(?FXBylH@kpSHapZyNeZrAGs`1OIebgP)bBSP$mq(pgYUD6HmY_}wJhFzFC=`ATOHznI1mRg*ZlOC3CPc5FS z)92ukDo1*%VA>Dd6LRYIYuhcqa?EUNx2xVL;V~w!C_N`992iyEvl8D~P}y_zFWzpq z<!0HHh0d`WTr>+3Z`<(3v;IN*mhJAh{q|Ff zS=yBJk`zaln{wZH;mb9J9n#iXY?1lS)1Rzjq};-iw1OfRoo9y~-L&@RgafbV9V%OL zL0SZ(6x!1=^9v1}y-Pue(>I11HRZZ^uD`0k394U8+tKu zXB$d)VHJ#fD$VlS=YzI&doO>-_J8EF1nH@BGwnGJ7tTYAPCftb!(qo9+h5+aanJ)x z83&YHNUOK>ShV8(UZW3NUb}hd=*Xdrl$M|CVX-rZSIsQnn_SZK!NOmf5b)TW zZ@d?D6;r2LOUo}Xx`+Gwt@$eX)}pzy=bs$Y@vkp`#5mGGJ*s5+zSVZyCHHhXw&Yhw zs_*SMfw7bwI|{OLv-qUTLYJ~`Uh~f>U*Fql(f>?qxG(CDkM?Hdd4*AP+-h(4+dH=w zM_+n6{)@+7xqtJh?-(Vwa9&}4TDFT!*`!DP7B267Y}TR+vr@C3{f2R}9GN)|YN;-y 
zyEAq+Ecng-l^?F(`u9-@;|e9Oth~HJ17+@^Ue825TKi_Z2mWB)ao>Wc7{xt(*_Rt; zuUit+;lSzG!N>1?Q@#_Lo|)!IBWoJwVsz*9`PcpJ_UbFob{~K2)FYw6Oa)jjq>CE< zy8m$5t0kYV&plz^ecdcZauh;d?!5REFFuK^EBSFdfs?!9PkWvehP{5AG}cAGHOx1s z8B6F<603S!I-J6J6z(FHl8@e6ri)Lx#A+E%5vSdzwQnGIH2Tvwu?t6|f>HWbv1*>!z@fAA$ zN$!K(T)mEu(U4CWWoO1yvBq!he#Hm3Ur6EeP1qqFCyA=qbN>aO>=)G&w`NKxccw1K z&!xPmKOvO9>AV|RLtFL9bkGOz5Qe_$4{wquoz?Z=XL9K=T>S_nAjh#~)?Jt{AB{wQ z*cCJ5^lHs7;)h>z7Lib|=sJd!%P@W{){oSQJ37M5H&f>WJ|5LX7|AJ=Gm+2{9ri4QsmqaDO=$dO4^8oylU->G53_iH`!?+hJv;uDvGyqs{HlYfU&%%wJ?1i1-`tO}t2W`OSmU=sr>{Ds=mi={KKUZr zZF@hhKJ^)OTG?a%>TP95ea^tqnUE$Qw@ z1nP7q%4ML_x~O-jH{U;2`M^{LVmAm~GXz;ngxb`3g+KlF*Bv%E_2<9I%W1atvy;&XL6 ziOvZ6p<0fh87e>Uzz6x!Zy_E0VIPPGT{AxDpdZ0}z}%kjh6wPTy8WQtBOZ2SriXS} zs`I64LCtt(zK*r~0`Kp5`60yf>7wjVA5PuRgT6{V_g_EVz+jgs4=AGC<+>b5Hs3pUYK$wkc4|Ip&JWDsv5G(1VQ{<(-!RZq9_Sa(wgW;w&@)3l3<>D> za?KCr1U~2?T&K$qJ-#->Qy#=aPtBE&*9O?doL2g5=_?-F%RKdXr`BtLuBUR-HYNId zA*>#JWPpAWzEFJI=P3CRZ^&y!l!TBIbfJd;O$YJF5B!AaLGq;9+Fc4-FHq3=-Dnwa z?XAK>s1t2@O@kfi1ERg5yve$}5#OnDs9fj%%Y4B@A1LMu)8T3&th)$d>Y-mNe&+cu z?mrXse05MaB}>CaHzx9ZMd?>6-{C9ce!*e5nfksVSV!=>4m+#-HGW{PMvQ6?7ruix zlT{e&9H0j}Hh-0F`%=ll>s0P;o-XtcJsFrsZce5YmMYz#?RohR`GNpIeFT#)u^Htk_sr%5t%l69qP(WUgVAoky z3q7f$?4!cek0~tLbZN#uuhI4*gr|L0YC53j)P&6TiTn-|U)U%3@~h-5@_ZAX(-cNN z$kVvps&xjdmIna*orus7?GW|=Jka&i6%IUPhd%ovLMS_fT&`&Cf!(>vt5-t62Yn;@ zXS7#ye~fXcN{^H1$yQ#bUVX;AVy14#%szdY5Xs@Hop6|Wq90@XRcpDVeSfv~#d8!s zFp%5mD!OE!NqQ(Je;2x7TkPew+CXdFhOQhWweW>4{<0Q3hJ5kYws;=PbT<&>`O$SZ3c+6t;V)b5 zrb6>+gZwpzVE_MvHxd{LHp7V+F zb-JI@_KbQ(dqzKu`bB@NC<_7m++Xb3A0r<4=KKf;>i!Pnd8zLA(XT-s=I1_ESDD%$ z>J{r#pu_ty^*D9oXfKE@0tbIB{bOf*82?UK{@9Hb@BJ+-E_>m3sY*ZPN7Z-}{Jny% zjLmqQXT9TLz0jQf#BonP&^P3V9w0}AmJ9Nk>6ybQXCL{EHriJ@uKMK=ak)gT>8|t; zI^kpWG0$?AcTnqvSpT!?@}OL(53CoK>vXK&g`QCKoI1YNs_q~6oK*3E z;138qgfR~TolKn{>vZ4;e9#319@0U7rlwtbw;xE@_|Qu4~mnI z%nr*uFI8Gl*{v;;XDx zdVbX$UUTuRh8Q36avkG&7&A>)B;s)L6~ch@tvmGvCw;=7lQXfi%3nF5)m~Xc$Onjf z0X!FTTz|EY;~B>hpk8VQzU}p!HDLvk{Zu;Qao>V?*57UGjQJ 
zMfW0FZbXBAOLC!|)a&~%gweh*KENKCKHDth-aq;4QPFpRE1ipMQ|+T>da{hf7Z}}b zk`rM6vXUfb>wdZpZykN`H&b1B#d>)ZS99rR{D6&3ueX@-S_<>y;Pte4%#cr_>(^2S zAJea!sn*#p&<1YNXU9OlI7_DVta{#TsebN^K8HZglNs&_ITrbZ?&lE3Apyu)|AA*a zLHy2Z%!dS+dsJZlII!YQ)n7Z#s55h%T2D?Lh7_(Zd+Kz=x51`0(AvWwj&1y4m-`8n zO;c+ei+0;MrRkXSt=*zs0isDFTzXd7>r(wC+H-BUI9rO>!KB;cz1;>cLHljG-PWu| zItcz}pyQ0{x6po_I*j%UuC6cm{y%EBt~CWH!3T&l#5jw256GR&v5l{^wOcoZSOWY+ zfaOwQ8R#zt(q@u{DxqaRa(!}tqTB_7pW=_mDB2Ec1A9B@qC z^&}r`y7faY(6gT0FINF0G{1yrJo7oT=E?~j(vc4ky?1lzzymyAdhaDplK9m#iRzF%1>nA4rzdxqo z;~h^dT6|Y0o*gTn)#bCfe977u+sVmDv7IUvsFA=@_bRR9yn15Z_6&C2``$&Ou#g8W zbd9m{kBxtl=|{_dP|JUCNsU5XQb({A@*i34`BXLKk}e1y6FjnTp*-(7UdLmUGokq$ z#N9>Z=jzTEbi0mLcMBkhc!ck2rt1Wqf*8-c4e&vJkO#$y6;h9Yv=Jrbw}uIF9^n0^ROy@f^XeY(LI2Jt7RkHm${5e~ z3_QpMShH5)i++m-GoJ)~AhLeALHuP;_VrW<@Ymk>Mk zib=KRuVRJxcI3eE$Eh4HIj~a%s1eQdkbm)w`fVEEY5JfCh;rX_=mset;-MFmPKTfX)$t8@N?>y^GU zHN@Kyt)=r*!`l;~-r6{E@goWR7EAVN&$nAzYaeJYczdHQx|GWR@=Mm!{P*Q@;qlyrRS+ z;d?EJr+KsV@)H#7{_8W<|EUoD8uTP0ublBXSFhy*UURE!0Y#sA>|SpVFO>QcT3_f- zUE|F$H9lZmLBCt;vI8})(3dn6>q@03;Gw@Z)791C8t>{&d<^<(WfbKy!~W4qe!K-x z_X{;X4RNhfs_)*^t-~t0N-qr@B*!s&s#3xVQ+P*^XL_Y6{D3KZ)D>PQS5sqNd`^Aa zTaV)?|48~yD7(NB>iZt$^c@wh>d*CfKo7M@hh0YK=}%9n{)_Qjk2gQcW7SB2$b;{AAPk7|#Oa7+B(#0ksO`H(WD9NI zHF7lwIUclq*Qo8gMutT+_MK8oh^(%)2(X0fR*9e zKdN>zZKU_~ia77-^?F`nma8p%ercfB z;L#fvB^c{zkltmV+A~Y9=de99KI|EI*G~1c_lgzV9>VHST-N zDJ_S4Pi?mVSg!lGYR~o0x!*MSi_jiht~hX9u2W*&68U_CmEAWqJpATJ%EZ_2R_?v* zWd9E=e2G;fAOD{25%@}9mcDG97(w4apF!W5y@9^DIE8%r^;X{)wqgHFG@!;utE)8b zccru>A>T;hdp(QjG>HO_Hq>eYOj3sh9YDXM|-%22KhX$6OnJZp#^dW z)_JTeCntq|a_Ezp-1AuwV! 
zeqh160icbfl!6j|rM_4I8&_ac3@EzI{R}mL4KQ7_1Tu3O literal 0 HcmV?d00001 From a52b7e26ec3d5b8354f5ef975d19d55816c7d576 Mon Sep 17 00:00:00 2001 From: JoshuaMoelans <60878493+JoshuaMoelans@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:29:29 +0100 Subject: [PATCH 2/2] (temp) also run in CI --- {scripts => tests}/test_extract_minidump.py | 117 +++++++++++--------- 1 file changed, 62 insertions(+), 55 deletions(-) rename {scripts => tests}/test_extract_minidump.py (69%) mode change 100755 => 100644 diff --git a/scripts/test_extract_minidump.py b/tests/test_extract_minidump.py old mode 100755 new mode 100644 similarity index 69% rename from scripts/test_extract_minidump.py rename to tests/test_extract_minidump.py index 375fd0338..9ec307d56 --- a/scripts/test_extract_minidump.py +++ b/tests/test_extract_minidump.py @@ -7,17 +7,20 @@ """ import hashlib -import os +import json +import shutil import sys import tempfile import unittest from pathlib import Path # Add the scripts directory to the path so we can import extract_minidump -SCRIPT_DIR = Path(__file__).parent -sys.path.insert(0, str(SCRIPT_DIR)) +TESTS_DIR = Path(__file__).parent +REPO_ROOT = TESTS_DIR.parent +SCRIPTS_DIR = REPO_ROOT / "scripts" +sys.path.insert(0, str(SCRIPTS_DIR)) -from extract_minidump import parse_envelope, extract_minidump, list_envelope_contents +from extract_minidump import parse_envelope, extract_minidump class TestExtractMinidump(unittest.TestCase): @@ -26,8 +29,7 @@ class TestExtractMinidump(unittest.TestCase): @classmethod def setUpClass(cls): """Set up test fixtures paths.""" - cls.repo_root = SCRIPT_DIR.parent - cls.fixtures_dir = cls.repo_root / "tests" / "fixtures" + cls.fixtures_dir = TESTS_DIR / "fixtures" cls.envelope_path = cls.fixtures_dir / "minidump.envelope" cls.original_minidump_path = cls.fixtures_dir / "minidump.dmp" @@ -35,7 +37,7 @@ def setUpClass(cls): if not cls.envelope_path.exists(): raise FileNotFoundError( f"Envelope fixture not found: {cls.envelope_path}\n" - 
"Run create_envelope_fixture.py to create it." + "Run scripts/create_envelope_fixture.py to create it." ) if not cls.original_minidump_path.exists(): raise FileNotFoundError( @@ -44,26 +46,26 @@ def setUpClass(cls): def test_parse_envelope_structure(self): """Test that envelope parsing returns correct structure.""" - with open(self.envelope_path, 'rb') as f: + with open(self.envelope_path, "rb") as f: data = f.read() envelope_header, items = parse_envelope(data) # Check envelope header - self.assertIn('dsn', envelope_header) - self.assertIn('event_id', envelope_header) + self.assertIn("dsn", envelope_header) + self.assertIn("event_id", envelope_header) # Check we have at least 2 items (event + attachment) self.assertGreaterEqual(len(items), 2) # Check item types - item_types = [item[0].get('type') for item in items] - self.assertIn('event', item_types) - self.assertIn('attachment', item_types) + item_types = [item[0].get("type") for item in items] + self.assertIn("event", item_types) + self.assertIn("attachment", item_types) def test_parse_envelope_minidump_header(self): """Test that the minidump attachment header is correct.""" - with open(self.envelope_path, 'rb') as f: + with open(self.envelope_path, "rb") as f: data = f.read() envelope_header, items = parse_envelope(data) @@ -71,21 +73,21 @@ def test_parse_envelope_minidump_header(self): # Find minidump item minidump_item = None for item_header, item_payload in items: - if item_header.get('attachment_type') == 'event.minidump': + if item_header.get("attachment_type") == "event.minidump": minidump_item = (item_header, item_payload) break self.assertIsNotNone(minidump_item, "No minidump attachment found") header, payload = minidump_item - self.assertEqual(header['type'], 'attachment') - self.assertEqual(header['attachment_type'], 'event.minidump') - self.assertIn('filename', header) - self.assertEqual(header['length'], len(payload)) + self.assertEqual(header["type"], "attachment") + 
self.assertEqual(header["attachment_type"], "event.minidump") + self.assertIn("filename", header) + self.assertEqual(header["length"], len(payload)) def test_minidump_magic_bytes(self): """Test that extracted minidump has correct magic bytes.""" - with open(self.envelope_path, 'rb') as f: + with open(self.envelope_path, "rb") as f: data = f.read() envelope_header, items = parse_envelope(data) @@ -93,14 +95,15 @@ def test_minidump_magic_bytes(self): # Find minidump payload minidump_payload = None for item_header, item_payload in items: - if item_header.get('attachment_type') == 'event.minidump': + if item_header.get("attachment_type") == "event.minidump": minidump_payload = item_payload break self.assertIsNotNone(minidump_payload) # MDMP is the minidump magic signature - self.assertEqual(minidump_payload[:4], b'MDMP', - "Minidump should start with MDMP magic bytes") + self.assertEqual( + minidump_payload[:4], b"MDMP", "Minidump should start with MDMP magic bytes" + ) def test_extract_minidump_matches_original(self): """Test that extracted minidump is identical to original.""" @@ -114,19 +117,25 @@ def test_extract_minidump_matches_original(self): self.assertTrue(output_path.exists()) # Compare with original - with open(self.original_minidump_path, 'rb') as f: + with open(self.original_minidump_path, "rb") as f: original_data = f.read() - with open(output_path, 'rb') as f: + with open(output_path, "rb") as f: extracted_data = f.read() # Compare sizes - self.assertEqual(len(extracted_data), len(original_data), - f"Size mismatch: extracted={len(extracted_data)}, " - f"original={len(original_data)}") + self.assertEqual( + len(extracted_data), + len(original_data), + f"Size mismatch: extracted={len(extracted_data)}, " + f"original={len(original_data)}", + ) # Compare content - self.assertEqual(extracted_data, original_data, - "Extracted minidump content differs from original") + self.assertEqual( + extracted_data, + original_data, + "Extracted minidump content differs 
from original", + ) def test_extract_minidump_hash_comparison(self): """Test extraction using hash comparison for additional verification.""" @@ -136,20 +145,22 @@ def test_extract_minidump_hash_comparison(self): extract_minidump(str(self.envelope_path), str(output_path)) # Calculate hashes - with open(self.original_minidump_path, 'rb') as f: + with open(self.original_minidump_path, "rb") as f: original_hash = hashlib.md5(f.read()).hexdigest() - with open(output_path, 'rb') as f: + with open(output_path, "rb") as f: extracted_hash = hashlib.md5(f.read()).hexdigest() - self.assertEqual(extracted_hash, original_hash, - f"MD5 hash mismatch: extracted={extracted_hash}, " - f"original={original_hash}") + self.assertEqual( + extracted_hash, + original_hash, + f"MD5 hash mismatch: extracted={extracted_hash}, " + f"original={original_hash}", + ) def test_extract_minidump_default_filename(self): """Test that extraction uses filename from envelope when not specified.""" with tempfile.TemporaryDirectory() as tmpdir: # Copy envelope to temp dir so output goes there - import shutil temp_envelope = Path(tmpdir) / "test.envelope" shutil.copy(self.envelope_path, temp_envelope) @@ -169,19 +180,18 @@ def test_envelope_without_minidump(self): """Test that extraction fails gracefully when no minidump present.""" with tempfile.TemporaryDirectory() as tmpdir: # Create envelope without minidump - import json envelope_path = Path(tmpdir) / "no_minidump.envelope" envelope_header = {"dsn": "https://test@sentry.invalid/42"} event_payload = {"event_id": "test", "level": "info"} - event_bytes = json.dumps(event_payload).encode('utf-8') + event_bytes = json.dumps(event_payload).encode("utf-8") event_header = {"type": "event", "length": len(event_bytes)} - with open(envelope_path, 'wb') as f: - f.write(json.dumps(envelope_header).encode('utf-8')) - f.write(b'\n') - f.write(json.dumps(event_header).encode('utf-8')) - f.write(b'\n') + with open(envelope_path, "wb") as f: + 
f.write(json.dumps(envelope_header).encode("utf-8")) + f.write(b"\n") + f.write(json.dumps(event_header).encode("utf-8")) + f.write(b"\n") f.write(event_bytes) with self.assertRaises(ValueError) as ctx: @@ -196,32 +206,29 @@ class TestParseEnvelope(unittest.TestCase): def test_parse_empty_envelope(self): """Test parsing empty data.""" with self.assertRaises(Exception): - parse_envelope(b'') + parse_envelope(b"") def test_parse_header_only(self): """Test parsing envelope with only header.""" - import json - data = json.dumps({"dsn": "test"}).encode('utf-8') + b'\n' + data = json.dumps({"dsn": "test"}).encode("utf-8") + b"\n" header, items = parse_envelope(data) - self.assertEqual(header['dsn'], 'test') + self.assertEqual(header["dsn"], "test") self.assertEqual(len(items), 0) def test_parse_multiple_items(self): """Test parsing envelope with multiple items.""" - import json - envelope_header = {"dsn": "test"} item1_payload = b"payload1" item1_header = {"type": "event", "length": len(item1_payload)} item2_payload = b"payload2" item2_header = {"type": "attachment", "length": len(item2_payload)} - data = b'' - data += json.dumps(envelope_header).encode('utf-8') + b'\n' - data += json.dumps(item1_header).encode('utf-8') + b'\n' + data = b"" + data += json.dumps(envelope_header).encode("utf-8") + b"\n" + data += json.dumps(item1_header).encode("utf-8") + b"\n" data += item1_payload - data += b'\n' - data += json.dumps(item2_header).encode('utf-8') + b'\n' + data += b"\n" + data += json.dumps(item2_header).encode("utf-8") + b"\n" data += item2_payload header, items = parse_envelope(data) @@ -231,6 +238,6 @@ def test_parse_multiple_items(self): self.assertEqual(items[1][1], item2_payload) -if __name__ == '__main__': +if __name__ == "__main__": # Run tests with verbosity unittest.main(verbosity=2)