Merged
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -5,7 +5,7 @@ requires = [
build-backend = "hatchling.build"
[project]
name = "itp_interface"
version = "1.3.0"
version = "1.4.0"
authors = [
{ name="Amitayush Thakur", email="[email protected]" },
]
35 changes: 2 additions & 33 deletions src/itp_interface/lean/lean4_local_data_extraction_transform.py
@@ -12,7 +12,7 @@
from pathlib import Path
from filelock import FileLock
from itp_interface.lean.simple_lean4_sync_executor import SimpleLean4SyncExecutor
from itp_interface.lean.parsing_helpers import LeanDeclParser, LeanParseResult, LeanDeclType
from itp_interface.lean.parsing_helpers import LeanDeclType, preprocess_declarations
from itp_interface.tools.coq_training_data_generator import GenericTrainingDataGenerationTransform, TrainingDataGenerationType
from itp_interface.tools.training_data_format import MergableCollection, TrainingDataMetadataFormat, ExtractionDataCollection
from itp_interface.tools.training_data import TrainingData, DataLayoutFormat
@@ -192,38 +192,7 @@ def _preprocess_declarations(self,
project_path: str,
file_dep_analyses: typing.List[FileDependencyAnalysis]) -> None:
# Preprocess declarations to set file paths and module names
for fda in file_dep_analyses:
for decl in fda.declarations:
self._preprocess_declaration(project_path, decl)

def _preprocess_declaration(
self,
project_path: str,
decl: DeclWithDependencies) -> None:
# Filter all unknown types
if decl.decl_info.decl_type == LeanDeclType.UNKNOWN.value:
# Check if we have docstring or not
if decl.decl_info.doc_string is None or decl.decl_info.doc_string.strip() != "":
parser = LeanDeclParser(decl.decl_info.text)
parse_result = parser.parse()
if parse_result.doc_string is not None:
decl.decl_info.doc_string = parse_result.doc_string.strip()
if parse_result.decl_type != LeanDeclType.UNKNOWN:
decl.decl_info.decl_type = str(parse_result.decl_type)
if parse_result.text is not None:
full_text = []
if parse_result.text_before is not None:
full_text.append(parse_result.text_before.strip())
full_text.append(parse_result.text.strip())
text = "\n".join(full_text)
decl.decl_info.text = text
# Update the proof if not already present
if decl.decl_info.proof is None and parse_result.proof is not None:
decl.decl_info.proof = parse_result.proof.strip()
if parse_result.name is not None:
decl.decl_info.name = parse_result.name.strip()
pass

preprocess_declarations(file_dep_analyses)

def __call__(self,
training_data: TrainingData,
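The ~30 removed lines are relocated rather than dropped: the normalization logic moves into parsing_helpers (next file), and the transform's hook collapses to a one-line delegation. A minimal sketch of the resulting method, keeping the signature shown in the diff (reading project_path as retained only for interface compatibility is an assumption):

```python
from itp_interface.lean.parsing_helpers import preprocess_declarations

# Sketch of the slimmed-down hook; `project_path` stays in the signature
# (presumably for interface compatibility) but is no longer used here.
def _preprocess_declarations(self, project_path, file_dep_analyses):
    # Preprocess declarations to set file paths and module names
    preprocess_declarations(file_dep_analyses)
```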
39 changes: 35 additions & 4 deletions src/itp_interface/lean/parsing_helpers.py
@@ -1,8 +1,7 @@
import sys
import re
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Set
from typing import Optional
from itp_interface.lean.tactic_parser import DeclWithDependencies, FileDependencyAnalysis

class LeanDeclType(Enum):
LEMMA = "lemma"
@@ -373,4 +372,36 @@ def _is_word_boundary(self, idx):

def parse_lean_text(text: str) -> LeanParseResult:
parser = LeanDeclParser(text)
return parser.parse()
return parser.parse()

def preprocess_declarations(
file_dep_analyses: list[FileDependencyAnalysis]) -> None:
# Preprocess declarations to set file paths and module names
for fda in file_dep_analyses:
for decl in fda.declarations:
preprocess_declaration(decl)

def preprocess_declaration(
decl: DeclWithDependencies) -> None:
# Filter all unknown types
if decl.decl_info.decl_type == LeanDeclType.UNKNOWN.value:
# Check if we have docstring or not
if decl.decl_info.doc_string is None or decl.decl_info.doc_string.strip() != "":
parser = LeanDeclParser(decl.decl_info.text)
parse_result = parser.parse()
if parse_result.doc_string is not None:
decl.decl_info.doc_string = parse_result.doc_string.strip()
if parse_result.decl_type != LeanDeclType.UNKNOWN:
decl.decl_info.decl_type = str(parse_result.decl_type)
if parse_result.text is not None:
full_text = []
if parse_result.text_before is not None:
full_text.append(parse_result.text_before.strip())
full_text.append(parse_result.text.strip())
text = "\n".join(full_text)
decl.decl_info.text = text
# Update the proof if not already present
if decl.decl_info.proof is None and parse_result.proof is not None:
decl.decl_info.proof = parse_result.proof.strip()
if parse_result.name is not None:
decl.decl_info.name = parse_result.name.strip()
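The guard at the top of preprocess_declaration is easy to misread: an UNKNOWN-typed declaration is re-parsed when its docstring is missing or non-empty, and skipped only when a docstring exists but is empty or whitespace-only. A runnable check of that condition (should_reparse is an illustrative name, not part of the library):

```python
# Condition copied verbatim from preprocess_declaration above.
def should_reparse(doc_string):
    return doc_string is None or doc_string.strip() != ""

print(should_reparse(None))          # True  - no docstring, try to recover one
print(should_reparse("/-- doc -/"))  # True  - non-empty docstring, still re-parse
print(should_reparse("   "))         # False - whitespace-only docstring skips re-parsing
```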
151 changes: 109 additions & 42 deletions src/itp_interface/lean/simple_lean4_sync_executor.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import os
import base64
import copy
import random
import logging
@@ -25,12 +26,16 @@
from itp_interface.tools.misc_defns import HammerMode
from itp_interface.tools.iter_helpers import ClonableIterator
from typing import List, Optional, Tuple, OrderedDict, Dict
from tempfile import gettempdir, NamedTemporaryFile
from itp_interface.lean.parsing_helpers import preprocess_declarations

class SimpleLean4SyncExecutor:
theorem_regex = r"((((theorem|lemma)[\s]+([^\s:]*))|example)([\S|\s]*?)(:=|=>)[\s]*?)[\s]+"
theorem_match = re.compile(theorem_regex, re.MULTILINE)
theorem_has_by = r"^[\s]*:=[\s]*by[\s]*"
have_regex = r"(^\s*have\s+([^:]*):([\s|\S]*?))(:=\s*by)([\s|\S]*)"
have_match = re.compile(have_regex, re.MULTILINE)
theorem_has_by_match = re.compile(theorem_has_by, re.MULTILINE)
unsolved_message = "unsolved goals"
no_goals = "No goals to be solved"
no_goals_alternative = "no goals to be solved"
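A quick, self-contained check of the new theorem_has_by pattern added above, on illustrative proof strings:

```python
import re

# Pattern copied from the class attributes above.
theorem_has_by = r"^[\s]*:=[\s]*by[\s]*"
theorem_has_by_match = re.compile(theorem_has_by, re.MULTILINE)

print(bool(theorem_has_by_match.match(":= by\n  simp")))    # True:  tactic-mode proof
print(bool(theorem_has_by_match.match(":= fun n => rfl")))  # False: term-mode proof
```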
@@ -176,7 +181,7 @@ def __enter__(self):
self.tactic_parser = TacticParser(project_path=self.project_root, logger=self.logger)
if self.main_file_iter is None:
assert self.main_file is not None, "main_file must be set if main_file_iter is None"
self.main_file_iter = LeanLineByLineReader(self.main_file, remove_comments=True, no_strip=True).instruction_step_generator()
self.main_file_iter = LeanLineByLineReader(self.main_file, remove_comments=False, no_strip=True).instruction_step_generator()
return self

def __exit__(self, exc_type, exc_value, traceback):
@@ -185,6 +190,9 @@ def __exit__(self, exc_type, exc_value, traceback):
if os.path.exists(self.temp_file_full_path):
os.remove(self.temp_file_full_path)

def associated_file(self) -> str|None:
return self.main_file

def is_in_proof_mode(self):
return True if self.proof_context else (len(self.lean_error_messages) > 0) # It is still in proof mode if we encountered a wrong proof

@@ -217,13 +225,15 @@ def run_next(self) -> bool:
self._lines_executed.append("") # Add an empty line to keep the line numbers in sync
return True

def extract_all_theorems_and_definitions(self, json_output_path: str|None = None) -> List[FileDependencyAnalysis]:
assert self.main_file is not None, "main_file must be set to extract theorems and definitions"
def extract_all_theorems_and_definitions(self, json_output_path: str|None = None, file_path: str|None = None) -> List[FileDependencyAnalysis]:
assert self.tactic_parser is not None, "tactic_parser must be initialized to extract theorems and definitions"

json_output_path = json_output_path if json_output_path is not None else self.main_file + ".dependency_analysis.json"
file_path = file_path if file_path is not None else self.main_file
assert file_path is not None, "Either file_path or main_file must be provided to extract theorems and definitions"
assert os.path.exists(file_path), f"file_path must exist to extract theorems and definitions ({file_path})"
json_output_path = json_output_path if json_output_path is not None else file_path + ".dependency_analysis.json"
file_dependency_analysis, _ = self.tactic_parser.parse_file(
self.main_file,
file_path,
parse_type=RequestType.PARSE_DEPENDS,
json_output_path=json_output_path)
return file_dependency_analysis
@@ -589,20 +599,58 @@ def _skip_to_theorem(self, theorem: str):
lines : list[str] = []
assert self.main_file_iter is not None, "main_file_iter should not be None"
assert self.tactic_parser is not None, "Tactic parser is not initialized"
while True:
try:
stmt = next(self.main_file_iter)
except StopIteration:
break
lines.append(stmt)
full_file = "\n".join(lines) + "\n"
extraction_path = self._get_file_path_in_cache()
can_fetch_from_cache = extraction_path is not None
file_dep_analysis = None
if can_fetch_from_cache:
file_path = self.associated_file()
assert file_path is not None, "File path should not be None"
if os.path.exists(extraction_path):
temp_extraction_path_file = open(extraction_path, "r", encoding="utf-8")
file_dep_analysis_str = temp_extraction_path_file.read()
file_dep_analysis = [FileDependencyAnalysis.load_from_string(file_dep_analysis_str)]
else:
file_dep_analysis = self.extract_all_theorems_and_definitions(
json_output_path=extraction_path, file_path=file_path)
assert file_dep_analysis is not None, "File dependency analysis should not be None"
assert len(file_dep_analysis) > 0, "File dependency analysis should not be empty"
temp_extraction_path_file = open(extraction_path, "w", encoding="utf-8")
with temp_extraction_path_file:
temp_extraction_path_file.write(file_dep_analysis[0].to_json())
with open(file_path, "r", encoding="utf-8") as f:
lines = f.readlines()
else:
while True:
try:
stmt = next(self.main_file_iter)
except StopIteration:
break
lines.append(stmt)
full_file = "\n".join(lines) + "\n"
# Dump the file in the temp file
with open(self.temp_file_full_path, "w", encoding="utf-8") as f:
f.write(full_file)
file_path = self.temp_file_full_path
temp_extraction_path_file = NamedTemporaryFile(
prefix="lean4_dep_analysis_",
suffix=".json",
delete_on_close=True)
extraction_path = temp_extraction_path_file.name
file_dep_analysis = self.extract_all_theorems_and_definitions(
json_output_path=extraction_path, file_path=file_path)
temp_extraction_path_file.close()
# Remove the temp file
if os.path.exists(self.temp_file_full_path):
os.remove(self.temp_file_full_path)
assert file_dep_analysis is not None, "File dependency analysis should not be None"
assert len(file_dep_analysis) > 0, "File dependency analysis should not be empty"
preprocess_declarations(file_dep_analysis)
file_dep_analysis = file_dep_analysis[0]
# run the tactic parser in theorem parsing mode
lean_line_infos, _ = self.tactic_parser.parse(
full_file,
fail_on_error=False,
parse_type=RequestType.PARSE_THEOREM)
# Filter out theorems and lemmas
theorems = [info for info in lean_line_infos if info.decl_type == "theorem" or info.decl_type == "lemma"]

theorems = [decl.decl_info for decl in file_dep_analysis.declarations
if decl.decl_info.decl_type == "theorem" or decl.decl_info.decl_type == "lemma"]
found_theorem = False
for thm in theorems:
name = thm.name
@@ -620,39 +668,31 @@ def _skip_to_theorem(self, theorem: str):
self._lines_executed = lines[:line_num - 1]
theorem_text = thm.text

self._content_till_last_theorem_stmt = "\n".join(self._lines_executed)
assert not theorem_text.endswith(':='), "Theorem text should not end with ':='"
theorem_text = theorem_text + " :="
content_until_after_theorem = "\n".join(self._lines_executed) + "\n" + theorem_text

# Parse out tactics now
all_tactics_till_now, _ = self.tactic_parser.parse(content_until_after_theorem, fail_on_error=True, parse_type=RequestType.PARSE_TACTICS)
# Find the first index which is after the theorem line
first_idx_after_theorem = None
for idx, tactic_info in enumerate(all_tactics_till_now):
if tactic_info.line >= line_num:
first_idx_after_theorem = idx
break
if first_idx_after_theorem is None:
self._content_till_after_theorem_stmt = content_until_after_theorem.strip()
assert self._content_till_after_theorem_stmt.endswith(':='), "Content till last theorem statement should not end with ':='"
# Check if we should follow steps as it is or not
follow_steps = self.main_file is not None and self.associated_file() == self.main_file
if follow_steps and \
thm.proof is not None and \
not SimpleLean4SyncExecutor.theorem_has_by_match.match(thm.proof):
msg = "Could not find the first tactic after the theorem" + \
f" only tactic based proofs are supported. Theorem: '{theorem}' on line {line_num}, file: '{self.main_file}'" + \
" is probably not followed by any tactic based proof." + \
" All tactics parsed till now:\n" + \
"\n".join([f"L {t.line}, C {t.column}: {t.text}" for t in all_tactics_till_now]) + \
f"{thm.proof}" + \
"\n^^^ Cannot see tactics for the theorem."
raise NotImplementedError(msg)
start_tactic = all_tactics_till_now[first_idx_after_theorem]
tactic_start_line = start_tactic.line
tactic_start_col = start_tactic.column
# It is safe to assume that this is a tactic based proof so we can extract steps or change the proof steps going forward
lines_in_theorem_text = theorem_text.splitlines()
tactic_start_line = line_num + len(lines_in_theorem_text) - 1
assert tactic_start_line > 0, "Tactic start line should be greater than 0"
content_until_after_theorem = "\n".join(lines[:tactic_start_line - 1] + [lines[tactic_start_line - 1][:tactic_start_col]])
self._content_till_after_theorem_stmt = content_until_after_theorem
self._content_till_after_theorem_stmt = self._content_till_after_theorem_stmt.strip()
assert self._content_till_after_theorem_stmt.endswith(':='), "Content till last theorem statement should end with ':='"
content_until_before_theorem = "\n".join(lines[:line_num - 1])
self._content_till_last_theorem_stmt = content_until_before_theorem
theorem_stmt = "\n".join(lines[line_num - 1:tactic_start_line - 1] + [lines[tactic_start_line - 1][:tactic_start_col]])
theorem_stmt = theorem_stmt.strip()
self._last_theorem = (given_theorem_name, theorem_stmt, theorem_stmt)
self._last_theorem = (given_theorem_name, theorem_text, theorem_text)
self._theorem_started = True
self._lines_executed.extend(lines[line_num - 1:tactic_start_line - 1] + [lines[tactic_start_line - 1][:tactic_start_col]])
self._lines_executed.extend(lines_in_theorem_text)
self._run_stmt_on_lean_server(tactic_start_line, "by", theorem_started=True)
self._lines_executed.append('by')
# Reset the iterator to the line of the theorem
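With the dependency analysis available, _skip_to_theorem no longer re-parses tactics to locate the proof body: once the proof is known to be tactic-based, the line where the by block starts follows arithmetically from the theorem text. A worked example of that computation with illustrative values:

```python
# theorem_text already carries the trailing ':=' appended earlier in the
# method; both the text and the start line below are illustrative.
theorem_text = "theorem foo\n    (n : Nat) :\n    n + 0 = n :="
line_num = 10  # 1-indexed line where the declaration starts

lines_in_theorem_text = theorem_text.splitlines()
tactic_start_line = line_num + len(lines_in_theorem_text) - 1
print(tactic_start_line)  # 12 (a 3-line declaration starting at line 10)
```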
@@ -674,6 +714,33 @@ def _parse_proof_context(self, proof_goals: list) -> ProofContext:
else:
return ProofContext(goals, [], [], [])

def _get_temp_cache_dir(self) -> str:
temp_cache = gettempdir()
list_dir = []
for d in os.listdir(temp_cache):
if d.startswith("itp_interface_lean4_cache_"):
list_dir.append(d)
# Create a new cache directory
unique_dir_name = f"itp_interface_lean4_cache_{max(0, len(list_dir) - 1)}"
temp_cache_dir = os.path.join(temp_cache, unique_dir_name)
os.makedirs(temp_cache_dir, exist_ok=True)
return temp_cache_dir

def _get_file_path_in_cache(self) -> str | None:
temp_cache_dir = self._get_temp_cache_dir()
file_path = self.associated_file()
if file_path is None:
return None
else:
# look for the last time file was modified
file_mtime = os.path.getmtime(file_path)
file_mtime_str = "".join(str(file_mtime).split('.'))
# Convert file path to base64 and use that as the file name
fp_encoded = base64.urlsafe_b64encode(file_path.encode()).decode()
fp_encoded = f"b64_{fp_encoded}"
file_name_with_timestamp = f"{fp_encoded}_{file_mtime_str}.json"
return os.path.join(temp_cache_dir, file_name_with_timestamp)

def validate_proof(self, timeout_sec: int = 30, keep_temp_file: bool = True) -> typing.Dict[str, typing.Any]:
"""
Validate the current proof state by running 'lake lean' on a temporary file.
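The cache key built by _get_file_path_in_cache combines a urlsafe base64 encoding of the file path with its mtime (decimal point stripped), so any modification to the file produces a fresh key and forces re-extraction. A self-contained sketch of the scheme with illustrative inputs:

```python
import base64

# Illustrative inputs; the real values come from the associated file.
file_path = "/home/user/project/Foo.lean"
file_mtime = 1717171717.25

file_mtime_str = "".join(str(file_mtime).split("."))  # "171717171725"
fp_encoded = "b64_" + base64.urlsafe_b64encode(file_path.encode()).decode()
print(f"{fp_encoded}_{file_mtime_str}.json")
# b64_L2hvbWUvdXNlci9wcm9qZWN0L0Zvby5sZWFu_171717171725.json
```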
@@ -875,7 +942,7 @@ def get_all_theorems_in_file(file_path: str, use_cache: bool=False) -> List[Theo
open_namespaces = []
with open(file_path, "r") as f:
file_content = f.read()
line_by_line_reader = LeanLineByLineReader(file_content=file_content, remove_comments=True, no_strip=True)
line_by_line_reader = LeanLineByLineReader(file_content=file_content, remove_comments=False, no_strip=True)
all_stmts = list(line_by_line_reader.instruction_step_generator())
line_positions = [0] + [len(stmt) + 1 for stmt in all_stmts]
# Cumulative sum of the line positions