diff --git a/openverifiablellm/utils.py b/openverifiablellm/utils.py index 47cace0..3207130 100644 --- a/openverifiablellm/utils.py +++ b/openverifiablellm/utils.py @@ -187,7 +187,7 @@ def verify_merkle_proof(chunk_bytes: bytes, proof, merkle_root: str) -> bool: # extract clean wikipage from actual wikipage -CHECKPOINT_INTERVAL = 1_000 # Save checkpoint every N pages +CHECKPOINT_INTERVAL = 100 # Save checkpoint every N pages def _checkpoint_path(output_dir: Path) -> Path: @@ -202,7 +202,11 @@ def _compute_input_identity(input_path: Path) -> str: def _load_checkpoint(checkpoint_path: Path, input_path: Path, output_path: Path) -> Dict[str, Any]: """Load checkpoint safely and validate resume conditions.""" if not checkpoint_path.exists(): - return {"pages_processed": 0} + return { + "pages_processed": 0, + "input_identity": _compute_input_identity(input_path), + "file_offset": 0, + } try: with checkpoint_path.open("r", encoding="utf-8") as f: @@ -224,14 +228,22 @@ def _load_checkpoint(checkpoint_path: Path, input_path: Path, output_path: Path) logger.info("Resuming from checkpoint: %d pages already processed", pages_processed) - return data + return { + "pages_processed": pages_processed, + "input_identity": stored_identity, + "file_offset": data.get("file_offset", 0), + } except Exception as e: logger.warning("Checkpoint invalid (%s) — starting fresh.", e) - return {"pages_processed": 0} + return { + "pages_processed": 0, + "input_identity": _compute_input_identity(input_path), + "file_offset": 0, + } -def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity: str) -> None: +def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity: str, file_offset: int,) -> None: """Atomically save checkpoint with input identity.""" tmp = checkpoint_path.with_suffix(".tmp") @@ -239,6 +251,7 @@ def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity checkpoint_data = { "pages_processed": pages_processed, "input_identity": input_identity, + "file_offset": file_offset, } with tmp.open("w", encoding="utf-8") as f: @@ -254,35 +267,8 @@ def _save_checkpoint(checkpoint_path: Path, pages_processed: int, input_identity def extract_text_from_xml(input_path, *, write_manifest: bool = False): - """ - Process a Wikipedia XML dump (compressed or uncompressed) into cleaned plain text. - - Each element is parsed, its revision text is extracted, - cleaned using `clean_wikitext()`, and appended to a single - output text file. - - The processed output is saved to: - data/processed/wiki_clean.txt - - Supports resuming interrupted runs via a checkpoint file - (data/processed/wiki_clean.checkpoint.json). If the checkpoint - exists, already-processed pages are skipped and new pages are - appended to the existing output. Delete the checkpoint file to - force a full reprocessing from scratch. - - Parameters - ---------- - input_path : str or Path - Path to the Wikipedia XML dump file. - - Output - ------ - Creates: - data/processed/wiki_clean.txt - """ input_path = Path(input_path) - # Fixed output path project_root = Path.cwd() output_dir = project_root / "data" / "processed" output_dir.mkdir(parents=True, exist_ok=True) @@ -290,14 +276,31 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): output_path = output_dir / "wiki_clean.txt" checkpoint_path = _checkpoint_path(output_dir) - # Load checkpoint — tells us how many pages were already written + # Load checkpoint checkpoint = _load_checkpoint(checkpoint_path, input_path, output_path) - pages_already_done = checkpoint["pages_processed"] + pages_already_done = checkpoint.get("pages_processed", 0) + input_identity = checkpoint.get("input_identity") + file_offset = checkpoint.get("file_offset", 0) + + # ================== FIX FOR ISSUE #76 ================== + # Ensure output file matches checkpoint state using byte offset + if output_path.exists() and file_offset > 0: + with output_path.open("rb+") as f: + f.seek(0, os.SEEK_END) + current_size = f.tell() + + if current_size > file_offset: + logger.warning( + "Output file ahead of checkpoint (%d > %d). Truncating...", + current_size, + file_offset, + ) + f.truncate(file_offset) + # ====================================================== - # If resuming, append to existing output; otherwise start fresh write_mode = "a" if pages_already_done > 0 else "w" - # Auto-detect file type using magic bytes separation + # Detect file type with open(input_path, "rb") as test_f: is_bz2 = test_f.read(3) == b"BZh" @@ -311,11 +314,14 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): context = ET.iterparse(f, events=("end",)) with open(output_path, write_mode, encoding="utf-8") as out: + # Move pointer to correct position when resuming + if write_mode == "a" and output_path.exists(): + out.seek(0, os.SEEK_END) + for _, elem in context: if elem.tag.endswith("page"): pages_seen += 1 - # Skip pages already processed in a previous run if pages_seen <= pages_already_done: elem.clear() continue @@ -326,36 +332,42 @@ def extract_text_from_xml(input_path, *, write_manifest: bool = False): cleaned = clean_wikitext(text_elem.text) if cleaned: out.write(cleaned + "\n\n") + out.flush() + file_offset = output_path.stat().st_size # Track exact byte position pages_written += 1 elem.clear() - # Flush output and save checkpoint periodically + # Save checkpoint periodically if pages_written % CHECKPOINT_INTERVAL == 0: - out.flush() - _save_checkpoint(checkpoint_path, pages_written, input_path) + _save_checkpoint( + checkpoint_path, + pages_written, + input_identity, + file_offset, + ) + except KeyboardInterrupt: - _save_checkpoint(checkpoint_path, pages_written, input_path) + _save_checkpoint(checkpoint_path, pages_written, input_identity, file_offset) logger.warning("Interrupted by user after %d pages. Run again to resume.", pages_written) raise + except Exception: - # Save progress before propagating the exception so the next run can resume - _save_checkpoint(checkpoint_path, pages_written, input_path) + _save_checkpoint(checkpoint_path, pages_written, input_identity, file_offset) logger.error("Processing interrupted after %d pages. Run again to resume.", pages_written) raise - # Processing finished successfully — remove checkpoint so a fresh - # re-run (if ever needed) starts from the beginning if write_manifest: generate_manifest(input_path, output_path) + checkpoint_path.unlink(missing_ok=True) + logger.info( "Preprocessing complete. %d pages processed. Output saved to %s", pages_written, output_path, ) - # generate data manifest def generate_manifest(raw_path, processed_path): raw_path = Path(raw_path) diff --git a/test.xml b/test.xml new file mode 100644 index 0000000..f14b1a3 --- /dev/null +++ b/test.xml @@ -0,0 +1,14 @@ + + + Test1 + + This is [[sample]] text + + + + Test2 + + Another {{template}} example + + + \ No newline at end of file