Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 60 additions & 12 deletions src/minisweagent/models/litellm_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from tenacity import (
before_sleep_log,
retry,
retry_if_not_exception_type,
retry_if_exception,
stop_after_attempt,
wait_exponential,
)
Expand Down Expand Up @@ -293,6 +293,64 @@ def _merge_completion_kwargs(
return filtered


# ---------------------------------------------------------------------------
# Retry policy
# ---------------------------------------------------------------------------
#
# The core42 SaFE proxy that fronts the gateway occasionally returns a 401 whose
# body is ``Client is not connected to the query engine, you must call
# connect()``. Despite the 401 status this is a *transient proxy-side* fault
# (the proxy's upstream connection dropped mid-burst), not a credential failure:
# the very next identical call succeeds. LiteLLM surfaces it as
# ``litellm.exceptions.AuthenticationError`` — which, together with its parent
# ``APIError``, sat in the no-retry exclusion list, so a single blip aborted the
# whole optimization round. We special-case this signature so it is retried with
# backoff, while a genuine bad-key 401 (any other AuthenticationError text)
# stays fatal so we don't spin for minutes on an actually-invalid credential.

# Substrings that identify the transient proxy fault. They are matched against
# the *lower-cased* exception text, so every entry here must itself be
# lower-case or it can never match.
_TRANSIENT_PROXY_AUTH_SIGNATURES: tuple[str, ...] = (
"not connected to the query engine",
"you must call connect",
"call connect()",
)

# Genuinely fatal — never worth a retry (mirrors the historical exclusion set,
# minus AuthenticationError/APIError which are now decided by message below).
# KeyboardInterrupt is listed explicitly because the retry predicate is typed
# to receive any BaseException, not only Exception subclasses.
_NO_RETRY_EXC: tuple[type[BaseException], ...] = (
litellm.exceptions.UnsupportedParamsError,
litellm.exceptions.NotFoundError,
litellm.exceptions.PermissionDeniedError,
litellm.exceptions.ContextWindowExceededError,
KeyboardInterrupt,
)


def _is_transient_proxy_auth_error(exc: BaseException) -> bool:
    """Report whether *exc* is the retryable proxy-side authentication fault.

    Only a LiteLLM ``AuthenticationError`` can qualify. Its ``message``
    attribute (or, when that is missing/empty, its ``str()`` form) is
    lower-cased and searched for any of the known transient signatures.

    Args:
        exc: The exception raised by the LLM call.

    Returns:
        ``True`` when the exception text contains a transient-proxy signature,
        ``False`` for every other exception or message.
    """
    if isinstance(exc, litellm.exceptions.AuthenticationError):
        # Prefer the structured message; fall back to the string form.
        haystack = (getattr(exc, "message", "") or str(exc)).lower()
        for signature in _TRANSIENT_PROXY_AUTH_SIGNATURES:
            if signature in haystack:
                return True
    return False


def should_retry_llm_error(exc: BaseException) -> bool:
    """Tenacity predicate: retry transient faults, abort on genuine ones.

    Args:
        exc: The exception raised by the wrapped LLM call.

    Returns:
        ``True`` if the call should be retried with backoff, ``False`` if the
        error is fatal and must propagate immediately.

    Order matters — interpreter-level signals are rejected first, then the
    transient proxy 401 is checked *before* the generic auth/APIError
    exclusion so it is retried even though it is an ``AuthenticationError``.
    """
    # Non-``Exception`` BaseExceptions (KeyboardInterrupt, SystemExit,
    # GeneratorExit, task cancellation) are control-flow signals, not LLM
    # faults. Retrying them would delay shutdown by the full backoff schedule.
    if not isinstance(exc, Exception):
        return False
    if _is_transient_proxy_auth_error(exc):
        return True
    if isinstance(exc, _NO_RETRY_EXC):
        return False
    # A real (non-transient) auth failure or any other APIError is fatal.
    if isinstance(exc, (litellm.exceptions.AuthenticationError, litellm.exceptions.APIError)):
        return False
    # Everything else (timeouts, rate limits, transient network errors) → retry.
    return True


class LitellmModel:
"""Query models through LiteLLM with GEAK-compatible tool and cost accounting."""

Expand Down Expand Up @@ -330,17 +388,7 @@ def set_tools(self, tools: list[dict[str, Any]]) -> None:
stop=stop_after_attempt(int(os.getenv("MSWEA_MODEL_RETRY_STOP_AFTER_ATTEMPT", "10"))),
wait=wait_exponential(multiplier=1, min=4, max=60),
before_sleep=before_sleep_log(logger, logging.WARNING),
retry=retry_if_not_exception_type(
(
litellm.exceptions.UnsupportedParamsError,
litellm.exceptions.NotFoundError,
litellm.exceptions.PermissionDeniedError,
litellm.exceptions.ContextWindowExceededError,
litellm.exceptions.APIError,
litellm.exceptions.AuthenticationError,
KeyboardInterrupt,
)
),
retry=retry_if_exception(should_retry_llm_error),
)
def _query(self, messages: list[dict[str, Any]], **kwargs: Any) -> Any:
filtered = _merge_completion_kwargs(self.config, kwargs)
Expand Down
20 changes: 20 additions & 0 deletions src/minisweagent/run/mini.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,31 @@ def main(
if enabled is False:
disabled_tools.append(tool_name)

# Build acceleration for CK/.cu harness compiles: cap parallel compile jobs and
# route the compiler through ccache (when present) so repeated candidate rebuilds
# are incremental instead of cold multi-hour recompiles. setdefault => never
# overrides an operator-provided value.
import shutil
os.environ.setdefault("MAX_JOBS", str(min((os.cpu_count() or 8), 32)))
if shutil.which("ccache"):
for _v in ("CMAKE_C_COMPILER_LAUNCHER", "CMAKE_CXX_COMPILER_LAUNCHER", "CMAKE_HIP_COMPILER_LAUNCHER"):
os.environ.setdefault(_v, "ccache")

# RAG MCP toggle: disable RAG tools when rag is not enabled. RAG is
# best-effort — if it is enabled but the package or index cannot be
# provisioned (offline node, pip failure, build error), degrade to
# RAG-disabled and continue the optimization run instead of aborting the
# whole agent. Previously any provisioning failure raised and killed the
# run, so an environment hiccup wasted the entire kernel attempt (GH #316).
# NOTE: the prior fix only caught EXCEPTIONS — an unbounded pip install / index
# build that HANGS (no exception, just blocks) still consumed the whole per-kernel
# budget (observed: TimeoutExpired after 7800s). The subprocess calls below now
# carry timeouts so a hang raises TimeoutExpired -> caught here -> degrade. Set
# GEAK_DISABLE_RAG=1 to skip the optional RAG setup entirely.
rag_enabled = tools_cfg.get("rag", False)
if rag_enabled and os.environ.get("GEAK_DISABLE_RAG", "").strip() == "1":
logger.info("GEAK_DISABLE_RAG=1: skipping optional RAG tools.")
rag_enabled = False
rag_failed = False
if rag_enabled:
try:
Expand All @@ -374,6 +392,7 @@ def main(
result = subprocess.run(
[sys.executable, "-m", "pip", "install", "-e", str(_rag_mcp_path)],
capture_output=True, text=True,
timeout=int(os.environ.get("GEAK_RAG_INSTALL_TIMEOUT", "180")),
)
if result.returncode != 0:
raise RuntimeError(
Expand All @@ -398,6 +417,7 @@ def main(
result = subprocess.run(
[sys.executable, str(_build_script), "--force"],
capture_output=True, text=True,
timeout=int(os.environ.get("GEAK_RAG_INDEX_TIMEOUT", "600")),
)
if result.returncode != 0:
raise RuntimeError(
Expand Down
37 changes: 24 additions & 13 deletions src/minisweagent/run/postprocess/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,28 +749,39 @@ def run_correctness_and_benchmark(
logger.warning("No CORRECTNESS commands found in COMMANDMENT")

# --- GEAK_VERIFY_IN_LOOP: trust the in-loop full-set verified benchmark ---
# When the subagent's `--benchmark` already runs the FULL weighted shape set
# and emits GEAK_RESULT_SPEEDUP (see kernel_languages/contract.py +
# subagents/preprocess/harness-generator/SUBAGENT.yaml), the post-round
# FULL_BENCHMARK below is a redundant re-timing that ALSO times out on heavy
# CK/.cu rebuilds (GEAK_BENCH_TIMEOUT). Skip the FB subprocess, but KEEP the
# clean-worktree CORRECTNESS above — it is the only worktree-bypass guard
# (see contract.py "always ~1.00x" note). Adopt the already-trusted in-loop
# speedup as the verified value so select_best_verified_round_evaluation can
# rank it. Default-off => byte-identical to current behaviour.
if os.environ.get("GEAK_VERIFY_IN_LOOP", "").strip() == "1":
# The subagent's `--benchmark` runs the SAME full config set as
# `--full-benchmark` (the harness contract — see SUBAGENT.yaml + contract.py;
# `_cap(ALL_CONFIGS)`, uncapped under GEAK_MAX_BENCHMARK_SHAPES=0), and emits
# GEAK_RESULT_SPEEDUP. So the post-round FULL_BENCHMARK below is a redundant
# re-timing of the identical shapes that also times out on heavy CK/.cu
# rebuilds (GEAK_BENCH_TIMEOUT). Adopt the already-trusted in-loop speedup as
# the verified value (so select_best_verified_round_evaluation can rank it)
# and skip the FB subprocess — while KEEPING the clean-worktree CORRECTNESS
# above, the only worktree-bypass guard (see contract.py "always ~1.00x").
#
# DEFAULT-ON: since `--benchmark == --full-benchmark` by contract, re-running
# FB measures nothing new. Opt back in to the separate FB pass with
# GEAK_VERIFY_IN_LOOP=0. The downstream HL paired same-config A/B remains the
# real E2E arbiter regardless, so even an optimistic micro number is caught
# there (NEEDS_REVIEW / no promotion) rather than silently shipped.
#
# Guard: only adopt-and-skip when we actually have a usable in-loop number.
# If the in-loop benchmark produced no GEAK_RESULT_SPEEDUP, fall through and
# run the post-round FULL_BENCHMARK so the round can still earn a value.
_verify_in_loop = os.environ.get("GEAK_VERIFY_IN_LOOP", "1").strip().lower() not in ("0", "false", "no")
_in_loop_speedup = round_eval.get("benchmark_speedup")
if _verify_in_loop and isinstance(_in_loop_speedup, (int, float)):
if round_eval.get("status") == "correctness_failed":
return
in_loop = round_eval.get("benchmark_speedup")
round_eval["full_benchmark"] = {
"verified_speedup": float(in_loop) if isinstance(in_loop, (int, float)) else None,
"verified_speedup": float(_in_loop_speedup),
"candidate_ms": round_eval.get("candidate_shape_latency_ms"),
"baseline_ms": round_eval.get("baseline_shape_latency_ms"),
"success": True,
"source": "in_loop_full_benchmark",
}
logger.info(
"GEAK_VERIFY_IN_LOOP=1: skipping redundant post-round FULL_BENCHMARK; "
"GEAK_VERIFY_IN_LOOP (default-on): skipping redundant post-round FULL_BENCHMARK; "
"adopting in-loop full-set verified_speedup=%s "
"(correctness re-checked in clean worktree above).",
round_eval["full_benchmark"]["verified_speedup"],
Expand Down
13 changes: 12 additions & 1 deletion src/minisweagent/run/preprocess_v3/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,20 @@ def _invoke_profiler_mcp(
}
if workdir is not None:
kwargs["workdir"] = workdir
with ThreadPoolExecutor(max_workers=1) as pool:
# Do NOT use ``with ThreadPoolExecutor(...) as pool``: its ``__exit__``
# calls ``shutdown(wait=True)``, which JOINS the worker thread. When the
# profiler hangs (e.g. ROCprofiler-SDK / LD_PRELOAD contention on a busy
# host), ``future.result(timeout=...)`` raises on time but the implicit
# join then blocks forever on the still-running worker — defeating the
# timeout and starving the whole preprocess budget. Manage the pool
# explicitly and abandon the zombie thread with ``wait=False`` so the
# timeout is actually honored and we fall through to a profile-less run.
pool = ThreadPoolExecutor(max_workers=1)
try:
future = pool.submit(profile_fn, **kwargs)
return future.result(timeout=_PROFILE_TIMEOUT_S)
finally:
pool.shutdown(wait=False, cancel_futures=True)
except FuturesTimeoutError:
logger.warning("profiler-mcp timed out after %ds", _PROFILE_TIMEOUT_S)
return None
Expand Down