Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 58 additions & 46 deletions openverifiablellm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def verify_merkle_proof(chunk_bytes: bytes, proof, merkle_root: str) -> bool:


# extract clean wikipage from actual wikipage
CHECKPOINT_INTERVAL = 1_000 # Save checkpoint every N pages
CHECKPOINT_INTERVAL = 100 # Save checkpoint every N pages


def _checkpoint_path(output_dir: Path) -> Path:
Expand All @@ -202,7 +202,11 @@ def _compute_input_identity(input_path: Path) -> str:
def _load_checkpoint(checkpoint_path: Path, input_path: Path, output_path: Path) -> Dict[str, Any]:
"""Load checkpoint safely and validate resume conditions."""
if not checkpoint_path.exists():
return {"pages_processed": 0}
return {
"pages_processed": 0,
"input_identity": _compute_input_identity(input_path),
"file_offset": 0,
}

try:
with checkpoint_path.open("r", encoding="utf-8") as f:
Expand All @@ -224,21 +228,30 @@ def _load_checkpoint(checkpoint_path: Path, input_path: Path, output_path: Path)

logger.info("Resuming from checkpoint: %d pages already processed", pages_processed)

return data
return {
"pages_processed": pages_processed,
"input_identity": stored_identity,
"file_offset": data.get("file_offset", 0),
}

except Exception as e:
logger.warning("Checkpoint invalid (%s) — starting fresh.", e)
return {"pages_processed": 0}
return {
"pages_processed": 0,
"input_identity": _compute_input_identity(input_path),
"file_offset": 0,
}


def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity: str) -> None:
def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity: str, file_offset: int,) -> None:
"""Atomically save checkpoint with input identity."""
tmp = checkpoint_path.with_suffix(".tmp")

try:
checkpoint_data = {
"pages_processed": pages_processed,
"input_identity": input_identity,
"file_offset": file_offset,
}

with tmp.open("w", encoding="utf-8") as f:
Expand All @@ -254,50 +267,40 @@ def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity


def extract_text_from_xml(input_path, *, write_manifest: bool = False):
"""
Process a Wikipedia XML dump (compressed or uncompressed) into cleaned plain text.

Each <page> element is parsed, its revision text is extracted,
cleaned using `clean_wikitext()`, and appended to a single
output text file.

The processed output is saved to:
data/processed/wiki_clean.txt

Supports resuming interrupted runs via a checkpoint file
(data/processed/wiki_clean.checkpoint.json). If the checkpoint
exists, already-processed pages are skipped and new pages are
appended to the existing output. Delete the checkpoint file to
force a full reprocessing from scratch.

Parameters
----------
input_path : str or Path
Path to the Wikipedia XML dump file.

Output
------
Creates:
data/processed/wiki_clean.txt
"""
input_path = Path(input_path)

# Fixed output path
project_root = Path.cwd()
output_dir = project_root / "data" / "processed"
output_dir.mkdir(parents=True, exist_ok=True)

output_path = output_dir / "wiki_clean.txt"
checkpoint_path = _checkpoint_path(output_dir)

# Load checkpoint — tells us how many pages were already written
# Load checkpoint
checkpoint = _load_checkpoint(checkpoint_path, input_path, output_path)
pages_already_done = checkpoint["pages_processed"]
pages_already_done = checkpoint.get("pages_processed", 0)
input_identity = checkpoint.get("input_identity")
file_offset = checkpoint.get("file_offset", 0)

# ================== FIX FOR ISSUE #76 ==================
# Ensure output file matches checkpoint state using byte offset
if output_path.exists() and file_offset > 0:
with output_path.open("rb+") as f:
f.seek(0, os.SEEK_END)
current_size = f.tell()

if current_size > file_offset:
logger.warning(
"Output file ahead of checkpoint (%d > %d). Truncating...",
current_size,
file_offset,
)
f.truncate(file_offset)
# ======================================================

# If resuming, append to existing output; otherwise start fresh
write_mode = "a" if pages_already_done > 0 else "w"

# Auto-detect file type using magic bytes separation
# Detect file type
with open(input_path, "rb") as test_f:
is_bz2 = test_f.read(3) == b"BZh"

Expand All @@ -311,11 +314,14 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False):
context = ET.iterparse(f, events=("end",))

with open(output_path, write_mode, encoding="utf-8") as out:
# Move pointer to correct position when resuming
if write_mode == "a" and output_path.exists():
out.seek(0, os.SEEK_END)

for _, elem in context:
if elem.tag.endswith("page"):
pages_seen += 1

# Skip pages already processed in a previous run
if pages_seen <= pages_already_done:
elem.clear()
continue
Expand All @@ -326,36 +332,42 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False):
cleaned = clean_wikitext(text_elem.text)
if cleaned:
out.write(cleaned + "\n\n")
out.flush()
file_offset = output_path.stat().st_size # Track exact byte position

pages_written += 1
elem.clear()

# Flush output and save checkpoint periodically
# Save checkpoint periodically
if pages_written % CHECKPOINT_INTERVAL == 0:
out.flush()
_save_checkpoint(checkpoint_path, pages_written, input_path)
_save_checkpoint(
checkpoint_path,
pages_written,
input_identity,
file_offset,
)

except KeyboardInterrupt:
_save_checkpoint(checkpoint_path, pages_written, input_path)
_save_checkpoint(checkpoint_path, pages_written, input_identity, file_offset)
logger.warning("Interrupted by user after %d pages. Run again to resume.", pages_written)
raise

except Exception:
# Save progress before propagating the exception so the next run can resume
_save_checkpoint(checkpoint_path, pages_written, input_path)
_save_checkpoint(checkpoint_path, pages_written, input_identity, file_offset)
logger.error("Processing interrupted after %d pages. Run again to resume.", pages_written)
raise

# Processing finished successfully — remove checkpoint so a fresh
# re-run (if ever needed) starts from the beginning
if write_manifest:
generate_manifest(input_path, output_path)

checkpoint_path.unlink(missing_ok=True)

logger.info(
"Preprocessing complete. %d pages processed. Output saved to %s",
pages_written,
output_path,
)


# generate data manifest
def generate_manifest(raw_path, processed_path):
raw_path = Path(raw_path)
Expand Down
14 changes: 14 additions & 0 deletions test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<mediawiki>
<page>
<title>Test1</title>
<revision>
<text>This is [[sample]] text</text>
</revision>
</page>
<page>
<title>Test2</title>
<revision>
<text>Another {{template}} example</text>
</revision>
</page>
</mediawiki>
Comment on lines +1 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Add an empty-output page to this fixture.

Both current pages still produce non-empty cleaned text, so this fixture cannot exercise the counter skew that occurs when a processed page emits nothing: the checkpoint stores `pages_written`, while resume skips pages by comparing `pages_seen` against that stored count. A page such as <text>{{template}}</text> would cover the resume failure mode this PR is targeting.

Suggested fixture addition
 <mediawiki>
   <page>
     <title>Test1</title>
     <revision>
       <text>This is [[sample]] text</text>
     </revision>
   </page>
   <page>
     <title>Test2</title>
     <revision>
       <text>Another {{template}} example</text>
     </revision>
   </page>
+  <page>
+    <title>EmptyAfterClean</title>
+    <revision>
+      <text>{{template}}</text>
+    </revision>
+  </page>
 </mediawiki>
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
<mediawiki>
<page>
<title>Test1</title>
<revision>
<text>This is [[sample]] text</text>
</revision>
</page>
<page>
<title>Test2</title>
<revision>
<text>Another {{template}} example</text>
</revision>
</page>
</mediawiki>
<mediawiki>
<page>
<title>Test1</title>
<revision>
<text>This is [[sample]] text</text>
</revision>
</page>
<page>
<title>Test2</title>
<revision>
<text>Another {{template}} example</text>
</revision>
</page>
<page>
<title>EmptyAfterClean</title>
<revision>
<text>{{template}}</text>
</revision>
</page>
</mediawiki>
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test.xml` around lines 1 - 14, add a new <page> entry to the XML fixture whose
cleaned output is empty, so that the resume/counter-skew case is exercised. Use the
same structure as the existing pages (<page>, <title>, <revision>, <text>), give it a
title such as "EmptyAfterClean", and set its body to <text>{{template}}</text> (or
other content that cleans to nothing) so that processing the page emits no cleaned
text and triggers the edge case.

Loading