Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 58 additions & 38 deletions openverifiablellm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,52 +252,72 @@ def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity
logger.warning("Failed to save checkpoint: %s", e)
tmp.unlink(missing_ok=True)

def count_written_pages(output_path: Path) -> int:
    """
    Count the number of pages already written to the output file.

    Pages are assumed to be separated by a double newline, which is the
    convention used by the extraction writer.

    Parameters
    ----------
    output_path : Path
        Path to the cleaned-text output file.

    Returns
    -------
    int
        Number of pages present in the file; 0 if the file is missing
        or empty.
    """
    if not output_path.exists():
        return 0

    # strip() removes the trailing separator so the final split piece
    # is not an empty string that would inflate the count.
    content = output_path.read_text(encoding="utf-8").strip()

    if not content:
        return 0

    return len(content.split("\n\n"))


def truncate_output_to_pages(output_path: Path, max_pages: int) -> None:
    """
    Truncate the output file so it holds at most ``max_pages`` pages.

    Pages are assumed to be separated by a double newline (the same
    convention used by ``count_written_pages``). This is used to bring
    the output file back in sync with the checkpoint when the file is
    ahead of the recorded page count.

    Parameters
    ----------
    output_path : Path
        Path to the cleaned-text output file.
    max_pages : int
        Number of pages to keep, counted from the start of the file.
        With ``max_pages == 0`` the file is emptied.
    """
    # Nothing to truncate if the file was never created; mirrors the
    # missing-file handling in count_written_pages.
    if not output_path.exists():
        return

    content = output_path.read_text(encoding="utf-8").strip()

    if not content:
        return

    kept = content.split("\n\n")[:max_pages]

    # Opening in "w" mode already truncates the file, so an explicit
    # empty write is unnecessary. When pages remain, restore the
    # trailing separator the writer appends after every page.
    with output_path.open("w", encoding="utf-8") as f:
        if kept:
            f.write("\n\n".join(kept) + "\n\n")


def extract_text_from_xml(input_path, *, write_manifest: bool = False):
input_path = Path(input_path)

# Fixed output path
project_root = Path.cwd()
output_dir = project_root / "data" / "processed"
output_dir.mkdir(parents=True, exist_ok=True)

output_path = output_dir / "wiki_clean.txt"
checkpoint_path = _checkpoint_path(output_dir)

# Load checkpoint — tells us how many pages were already written
checkpoint = _load_checkpoint(checkpoint_path, input_path, output_path)
pages_already_done = checkpoint["pages_processed"]

# If resuming, append to existing output; otherwise start fresh
# ================== FIX FOR ISSUE #76 ==================
written_pages = count_written_pages(output_path)

if written_pages > pages_already_done:
logger.warning(
"Output file ahead of checkpoint (%d > %d). Truncating...",
written_pages,
pages_already_done,
)
truncate_output_to_pages(output_path, pages_already_done)
# ======================================================

write_mode = "a" if pages_already_done > 0 else "w"

# Auto-detect file type using magic bytes separation
# FIX: correct input identity usage
input_identity = _compute_input_identity(input_path)

with open(input_path, "rb") as test_f:
is_bz2 = test_f.read(3) == b"BZh"

Expand All @@ -315,7 +335,6 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False):
if elem.tag.endswith("page"):
pages_seen += 1

# Skip pages already processed in a previous run
if pages_seen <= pages_already_done:
elem.clear()
continue
Expand All @@ -330,25 +349,26 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False):
pages_written += 1
elem.clear()

# Flush output and save checkpoint periodically
if pages_written % CHECKPOINT_INTERVAL == 0:
# More frequent checkpointing (safer)
if pages_written % 100 == 0:
out.flush()
_save_checkpoint(checkpoint_path, pages_written, input_path)
_save_checkpoint(checkpoint_path, pages_written, input_identity)

except KeyboardInterrupt:
_save_checkpoint(checkpoint_path, pages_written, input_path)
_save_checkpoint(checkpoint_path, pages_written, input_identity)
logger.warning("Interrupted by user after %d pages. Run again to resume.", pages_written)
raise

except Exception:
# Save progress before propagating the exception so the next run can resume
_save_checkpoint(checkpoint_path, pages_written, input_path)
_save_checkpoint(checkpoint_path, pages_written, input_identity)
logger.error("Processing interrupted after %d pages. Run again to resume.", pages_written)
raise

# Processing finished successfully — remove checkpoint so a fresh
# re-run (if ever needed) starts from the beginning
if write_manifest:
generate_manifest(input_path, output_path)

checkpoint_path.unlink(missing_ok=True)

logger.info(
"Preprocessing complete. %d pages processed. Output saved to %s",
pages_written,
Expand Down
14 changes: 14 additions & 0 deletions test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<mediawiki>
<page>
<title>Test1</title>
<revision>
<text>This is [[sample]] text</text>
</revision>
</page>
<page>
<title>Test2</title>
<revision>
<text>Another {{template}} example</text>
</revision>
</page>
</mediawiki>
Comment on lines +1 to +14
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Add an empty-output page to this fixture.

Both current pages still produce non-empty cleaned text, so this file cannot exercise the counter skew that happens when a processed page emits nothing. A page like <text>{{template}}</text> would cover the resume failure mode this PR is targeting.

Suggested fixture addition
 <mediawiki>
   <page>
     <title>Test1</title>
     <revision>
       <text>This is [[sample]] text</text>
     </revision>
   </page>
   <page>
     <title>Test2</title>
     <revision>
       <text>Another {{template}} example</text>
     </revision>
   </page>
+  <page>
+    <title>EmptyAfterClean</title>
+    <revision>
+      <text>{{template}}</text>
+    </revision>
+  </page>
 </mediawiki>
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
<mediawiki>
<page>
<title>Test1</title>
<revision>
<text>This is [[sample]] text</text>
</revision>
</page>
<page>
<title>Test2</title>
<revision>
<text>Another {{template}} example</text>
</revision>
</page>
</mediawiki>
<mediawiki>
<page>
<title>Test1</title>
<revision>
<text>This is [[sample]] text</text>
</revision>
</page>
<page>
<title>Test2</title>
<revision>
<text>Another {{template}} example</text>
</revision>
</page>
<page>
<title>EmptyAfterClean</title>
<revision>
<text>{{template}}</text>
</revision>
</page>
</mediawiki>
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test.xml` around lines 1 - 14, Add a new <page> entry to the XML fixture that
will produce an empty cleaned output to exercise the resume/skew case; create a
page element (use the same structure as existing pages: <page>, <title>,
<revision>, <text>) with a title like "EmptyOutput" and a
<text>{{template}}</text> (or other content that cleans to nothing) so that
processing of that page emits no cleaned text and triggers the edge case.

Loading