-
Notifications
You must be signed in to change notification settings - Fork 94
Implement global PopenExecutor with tagging for improved process control and probe isolation #541
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
eb604bc
e0545e1
f4bd62d
00b767f
b421f6e
d0262bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,7 +79,7 @@ | |
| warn_code, | ||
| ) | ||
| from halmos.mapper import BuildOut, DeployAddressMapper | ||
| from halmos.processes import ExecutorRegistry, ShutdownError | ||
| from halmos.processes import ExecutorRegistry, ShutdownError, get_global_executor | ||
| from halmos.sevm import ( | ||
| EMPTY_BALANCE, | ||
| FOUNDRY_CALLER, | ||
|
|
@@ -432,6 +432,7 @@ def setup(ctx: FunctionContext) -> Exec: | |
| path_id=path_id, | ||
| query=query, | ||
| solving_ctx=ctx.solving_ctx, | ||
| tag=ctx.info.sig, | ||
| ) | ||
| solver_output = solve_low_level(path_ctx) | ||
| if solver_output.result != unsat: | ||
|
|
@@ -714,11 +715,14 @@ def _compute_frontier(ctx: ContractContext, depth: int) -> Iterator[Exec]: | |
| msg = f"Assertion failure detected in {fun_info.contract_name}.{fun_info.sig}" | ||
|
|
||
| try: | ||
| # Use a unique tag for this specific probe | ||
| probe_tag = f"probe-{fun_info.contract_name}-{fun_info.name}" | ||
| handler.handle_assertion_violation( | ||
| path_id=path_id, | ||
| ex=post_ex, | ||
| panic_found=panic_found, | ||
| description=msg, | ||
| probe_tag=probe_tag, | ||
| ) | ||
| except ShutdownError: | ||
| if args.debug: | ||
|
|
@@ -817,6 +821,7 @@ def handle_assertion_violation( | |
| ex: Exec, | ||
| panic_found: bool, | ||
| description: str = None, | ||
| probe_tag: str = None, | ||
| ) -> None: | ||
| """ | ||
| Handles a potential assertion violation by solving it in a separate process. | ||
|
|
@@ -871,6 +876,7 @@ def handle_assertion_violation( | |
| path_id=path_id, | ||
| query=query, | ||
| solving_ctx=ctx.solving_ctx, | ||
| tag=probe_tag if probe_tag else ctx.info.sig, | ||
|
||
| ) | ||
|
|
||
| # ShutdownError may be raised here and will be handled by the caller | ||
|
|
@@ -918,7 +924,7 @@ def _solve_end_to_end_callback( | |
| solver_output: SolverOutput = future.result() | ||
| result, model = solver_output.result, solver_output.model | ||
|
|
||
| if ctx.solving_ctx.executor.is_shutdown(): | ||
| if get_global_executor().is_shutdown(): | ||
| # if the thread pool is in the process of shutting down, | ||
| # we want to stop processing remaining models/timeouts/errors, etc. | ||
| return | ||
|
|
@@ -968,8 +974,8 @@ def _solve_end_to_end_callback( | |
|
|
||
| # we have a valid counterexample, so we are eligible for early exit | ||
| if args.early_exit: | ||
| debug(f"Shutting down {ctx.info.name}'s solver executor") | ||
| ctx.solving_ctx.executor.shutdown(wait=False) | ||
| debug(f"Interrupting {ctx.info.name}'s solver queries") | ||
| get_global_executor().interrupt(ctx.info.name) | ||
|
||
| else: | ||
| warn_str = f"Counterexample (potentially invalid): {model}" | ||
| warn_code(COUNTEREXAMPLE_INVALID, warn_str) | ||
|
|
@@ -1046,7 +1052,7 @@ def run_test(ctx: FunctionContext) -> TestResult: | |
| path_id = 0 # default value in case we don't enter the loop body | ||
| for path_id, ex in enumerate(exs): | ||
| # check if early exit is triggered | ||
| if ctx.solving_ctx.executor.is_shutdown(): | ||
| if get_global_executor().is_shutdown(): | ||
| if args.debug: | ||
| print("aborting path exploration, executor has been shutdown") | ||
| break | ||
|
|
@@ -1090,6 +1096,7 @@ def run_test(ctx: FunctionContext) -> TestResult: | |
| path_id=path_id, | ||
| query=ex.path.to_smt2(args), | ||
| solving_ctx=ctx.solving_ctx, | ||
| tag=ctx.info.sig, | ||
| ) | ||
| solver_output = solve_low_level(path_ctx) | ||
| if solver_output.result != unsat: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ def shutdown_all(self): | |
| class PopenFuture(concurrent.futures.Future): | ||
| cmd: list[str] | ||
| timeout: float | None # in seconds, None means no timeout | ||
| tag: str | None # optional tag for grouping and selective cancellation | ||
|
||
| process: subprocess.Popen | None | ||
| stdout: str | None | ||
| stderr: str | None | ||
|
|
@@ -42,10 +43,13 @@ class PopenFuture(concurrent.futures.Future): | |
| end_time: float | None | ||
| _exception: Exception | None | ||
|
|
||
| def __init__(self, cmd: list[str], timeout: float | None = None): | ||
| def __init__( | ||
| self, cmd: list[str], timeout: float | None = None, tag: str | None = None | ||
| ): | ||
| super().__init__() | ||
| self.cmd = cmd | ||
| self.timeout = timeout | ||
| self.tag = tag | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We probably want to assert that the tag is not empty.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added an assertion. |
||
| self.process = None | ||
| self.stdout = None | ||
| self.stderr = None | ||
|
|
@@ -193,6 +197,24 @@ def submit(self, future: PopenFuture) -> PopenFuture: | |
| future.start() | ||
| return future | ||
|
|
||
| def interrupt(self, tag: str) -> None: | ||
| """Interrupts all futures with the specified tag. | ||
|
|
||
| Args: | ||
| tag: The tag identifying futures to interrupt. | ||
| Futures without tags are not affected. | ||
|
||
| """ | ||
| if not tag: | ||
| return | ||
|
||
|
|
||
| with self._lock: | ||
| # Find all futures with the matching tag and cancel them | ||
| futures_to_cancel = [f for f in self._futures if f.tag == tag] | ||
|
|
||
| # Cancel outside the lock to avoid deadlocks | ||
| for future in futures_to_cancel: | ||
| future.cancel() | ||
|
|
||
| def is_shutdown(self) -> bool: | ||
| return self._shutdown.is_set() | ||
|
|
||
|
|
@@ -228,6 +250,25 @@ def _join(self): | |
| future.result() | ||
|
|
||
|
|
||
| # Global PopenExecutor instance for shared use across all tests and probes | ||
| _global_executor: PopenExecutor | None = None | ||
| _global_executor_lock = threading.Lock() | ||
|
|
||
|
|
||
| def get_global_executor() -> PopenExecutor: | ||
| """Get the global PopenExecutor instance, creating it if necessary.""" | ||
| global _global_executor | ||
|
|
||
| if _global_executor is None: | ||
| with _global_executor_lock: | ||
| if _global_executor is None: | ||
| _global_executor = PopenExecutor() | ||
| # Register with the ExecutorRegistry so it gets shut down properly | ||
| ExecutorRegistry().register(_global_executor) | ||
|
|
||
| return _global_executor | ||
|
||
|
|
||
|
|
||
| def main(): | ||
| with PopenExecutor() as executor: | ||
| # example usage | ||
|
|
@@ -251,7 +292,10 @@ def done_callback(future: PopenFuture): | |
| "echo hello", | ||
| ] | ||
|
|
||
| futures = [PopenFuture(command.split()) for command in commands] | ||
| futures = [ | ||
| PopenFuture(command.split(), tag=f"test-{i}") | ||
| for i, command in enumerate(commands) | ||
| ] | ||
|
|
||
| for future in futures: | ||
| future.add_done_callback(done_callback) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,9 +19,8 @@ | |
| warn, | ||
| ) | ||
| from halmos.processes import ( | ||
| ExecutorRegistry, | ||
| PopenExecutor, | ||
| PopenFuture, | ||
| get_global_executor, | ||
| ) | ||
| from halmos.sevm import Address, Exec, SMTQuery | ||
| from halmos.utils import hexify | ||
|
|
@@ -167,9 +166,6 @@ class SolvingContext: | |
| # directory for dumping solver files | ||
| dump_dir: DumpDirectory | ||
|
|
||
| # shared solver executor for all paths in the same function | ||
| executor: PopenExecutor = field(default_factory=PopenExecutor) | ||
|
|
||
| # list of unsat cores | ||
| unsat_cores: list[list] = field(default_factory=list) | ||
|
|
||
|
|
@@ -268,9 +264,6 @@ def __post_init__(self): | |
| ) | ||
| object.__setattr__(self, "thread_pool", thread_pool) | ||
|
|
||
| # register the solver executor to be shutdown on exit | ||
| ExecutorRegistry().register(solving_ctx.executor) | ||
|
|
||
| def append_unsat_core(self, unsat_core: list[str]) -> None: | ||
| self.solving_ctx.unsat_cores.append(unsat_core) | ||
|
|
||
|
|
@@ -282,6 +275,7 @@ class PathContext: | |
| solving_ctx: SolvingContext | ||
| query: SMTQuery | ||
| is_refined: bool = False | ||
| tag: str | None = None # optional tag for grouping solver queries | ||
|
||
|
|
||
| @property | ||
| def dump_file(self) -> Path: | ||
|
|
@@ -297,6 +291,7 @@ def refine(self) -> "PathContext": | |
| solving_ctx=self.solving_ctx, | ||
| query=refine(self.query), | ||
| is_refined=True, | ||
| tag=self.tag, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -499,10 +494,10 @@ def solve_low_level(path_ctx: PathContext) -> SolverOutput: | |
| else args.solver_command | ||
| ) | ||
| cmd_with_file = cmd_list + [smt2_filename] | ||
| future = PopenFuture(cmd_with_file, timeout=timeout_seconds) | ||
| future = PopenFuture(cmd_with_file, timeout=timeout_seconds, tag=path_ctx.tag) | ||
|
|
||
| # starts the subprocess asynchronously | ||
| path_ctx.solving_ctx.executor.submit(future) | ||
| get_global_executor().submit(future) | ||
|
|
||
| # block until the external solver returns, times out, is interrupted, fails, etc. | ||
| try: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| # SPDX-License-Identifier: AGPL-3.0 | ||
|
|
||
| import threading | ||
| from unittest.mock import Mock | ||
|
|
||
| from halmos.processes import PopenFuture, get_global_executor | ||
|
|
||
|
|
||
| class TestGlobalExecutor: | ||
|
||
| def test_global_executor_singleton(self): | ||
| """Test that get_global_executor returns the same instance.""" | ||
| executor1 = get_global_executor() | ||
| executor2 = get_global_executor() | ||
|
|
||
| assert ( | ||
| executor1 is executor2 | ||
| ), "get_global_executor should return the same instance" | ||
|
|
||
| def test_global_executor_multithreaded(self): | ||
| """Test that get_global_executor works correctly across threads.""" | ||
| results = [] | ||
|
|
||
| def get_executor(): | ||
| executor = get_global_executor() | ||
| results.append(executor) | ||
|
|
||
| threads = [threading.Thread(target=get_executor) for _ in range(5)] | ||
| for thread in threads: | ||
| thread.start() | ||
| for thread in threads: | ||
| thread.join() | ||
|
|
||
| # All threads should get the same executor instance | ||
| assert len(set(id(executor) for executor in results)) == 1 | ||
|
|
||
| def test_popen_future_with_tag(self): | ||
| """Test that PopenFuture accepts and stores tag parameter.""" | ||
| cmd = ["echo", "hello"] | ||
| tag = "test-tag" | ||
|
|
||
| future = PopenFuture(cmd, tag=tag) | ||
|
|
||
| assert future.cmd == cmd | ||
| assert future.tag == tag | ||
|
|
||
| def test_popen_future_without_tag(self): | ||
| """Test that PopenFuture works without tag parameter.""" | ||
| cmd = ["echo", "hello"] | ||
|
|
||
| future = PopenFuture(cmd) | ||
|
|
||
| assert future.cmd == cmd | ||
| assert future.tag is None | ||
|
||
|
|
||
| def test_interrupt_by_tag(self): | ||
| """Test that interrupt() cancels futures with matching tags.""" | ||
| executor = get_global_executor() | ||
|
|
||
| # Create mock futures with different tags | ||
| future1 = Mock(spec=PopenFuture) | ||
| future1.tag = "tag1" | ||
| future2 = Mock(spec=PopenFuture) | ||
| future2.tag = "tag2" | ||
| future3 = Mock(spec=PopenFuture) | ||
| future3.tag = "tag1" | ||
| future4 = Mock(spec=PopenFuture) | ||
| future4.tag = None | ||
|
|
||
| # Add to executor's futures list | ||
| executor._futures = [future1, future2, future3, future4] | ||
|
|
||
| # Interrupt tag1 | ||
| executor.interrupt("tag1") | ||
|
|
||
| # Check that only futures with tag1 were cancelled | ||
| future1.cancel.assert_called_once() | ||
| future2.cancel.assert_not_called() | ||
| future3.cancel.assert_called_once() | ||
| future4.cancel.assert_not_called() | ||
|
|
||
| def test_interrupt_with_empty_tag(self): | ||
| """Test that interrupt() with empty tag does nothing.""" | ||
| executor = get_global_executor() | ||
|
|
||
| # Create mock future | ||
| future = Mock(spec=PopenFuture) | ||
| future.tag = "some-tag" | ||
| executor._futures = [future] | ||
|
|
||
| # Interrupt with empty tag | ||
| executor.interrupt("") | ||
|
|
||
| # No futures should be cancelled | ||
| future.cancel.assert_not_called() | ||
|
||
|
|
||
| def test_interrupt_nonexistent_tag(self): | ||
| """Test that interrupt() with non-existent tag does nothing.""" | ||
| executor = get_global_executor() | ||
|
|
||
| # Create mock future | ||
| future = Mock(spec=PopenFuture) | ||
| future.tag = "existing-tag" | ||
| executor._futures = [future] | ||
|
|
||
| # Interrupt with non-existent tag | ||
| executor.interrupt("nonexistent-tag") | ||
|
|
||
| # No futures should be cancelled | ||
| future.cancel.assert_not_called() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`probe_tag: str | None = None` should be a generic `tag` argument, not specifically a `probe_tag` just for probes. It should work the same for probes and regular tests; it's just that the tag is constructed differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to use a generic `tag: str | None = None` parameter instead of probe-specific naming. The API now works the same for probes and regular tests. (d0262bc)