diff --git a/src/minisweagent/models/litellm_model.py b/src/minisweagent/models/litellm_model.py index 483db576..3b1b7780 100644 --- a/src/minisweagent/models/litellm_model.py +++ b/src/minisweagent/models/litellm_model.py @@ -19,7 +19,7 @@ from tenacity import ( before_sleep_log, retry, - retry_if_not_exception_type, + retry_if_exception, stop_after_attempt, wait_exponential, ) @@ -293,6 +293,64 @@ def _merge_completion_kwargs( return filtered +# --------------------------------------------------------------------------- +# Retry policy +# --------------------------------------------------------------------------- +# +# The core42 SaFE proxy that fronts the gateway occasionally returns a 401 whose +# body is ``Client is not connected to the query engine, you must call +# connect()``. Despite the 401 status this is a *transient proxy-side* fault +# (the proxy's upstream connection dropped mid-burst), not a credential failure: +# the very next identical call succeeds. LiteLLM surfaces it as +# ``litellm.exceptions.AuthenticationError`` — which, together with its parent +# ``APIError``, sat in the no-retry exclusion list, so a single blip aborted the +# whole optimization round. We special-case this signature so it is retried with +# backoff, while a genuine bad-key 401 (any other AuthenticationError text) +# stays fatal so we don't spin for minutes on an actually-invalid credential. + +_TRANSIENT_PROXY_AUTH_SIGNATURES: tuple[str, ...] = ( + "not connected to the query engine", + "you must call connect", + "call connect()", +) + +# Genuinely fatal — never worth a retry (mirrors the historical exclusion set, +# minus AuthenticationError/APIError which are now decided by message below). +_NO_RETRY_EXC: tuple[type[BaseException], ...] = ( + litellm.exceptions.UnsupportedParamsError, + litellm.exceptions.NotFoundError, + litellm.exceptions.PermissionDeniedError, + litellm.exceptions.ContextWindowExceededError, + KeyboardInterrupt, +) + + +def _is_transient_proxy_auth_error(exc: BaseException) -> bool: + """True for the transient core42 proxy "query engine not connected" 401.""" + if not isinstance(exc, litellm.exceptions.AuthenticationError): + return False + text = (getattr(exc, "message", "") or str(exc)).lower() + return any(sig in text for sig in _TRANSIENT_PROXY_AUTH_SIGNATURES) + + +def should_retry_llm_error(exc: BaseException) -> bool: + """Tenacity predicate: retry transient faults, abort on genuine ones. + + Order matters — the transient proxy 401 is checked *before* the generic + auth/APIError exclusion so it is retried even though it is an + ``AuthenticationError`` (subclass of ``APIError``). + """ + if _is_transient_proxy_auth_error(exc): + return True + if isinstance(exc, _NO_RETRY_EXC): + return False + # A real (non-transient) auth failure or any other APIError is fatal. + if isinstance(exc, (litellm.exceptions.AuthenticationError, litellm.exceptions.APIError)): + return False + # Everything else (timeouts, rate limits, transient network errors) → retry. + return True + + class LitellmModel: """Query models through LiteLLM with GEAK-compatible tool and cost accounting.""" @@ -330,17 +388,7 @@ def set_tools(self, tools: list[dict[str, Any]]) -> None: stop=stop_after_attempt(int(os.getenv("MSWEA_MODEL_RETRY_STOP_AFTER_ATTEMPT", "10"))), wait=wait_exponential(multiplier=1, min=4, max=60), before_sleep=before_sleep_log(logger, logging.WARNING), - retry=retry_if_not_exception_type( - ( - litellm.exceptions.UnsupportedParamsError, - litellm.exceptions.NotFoundError, - litellm.exceptions.PermissionDeniedError, - litellm.exceptions.ContextWindowExceededError, - litellm.exceptions.APIError, - litellm.exceptions.AuthenticationError, - KeyboardInterrupt, - ) - ), + retry=retry_if_exception(should_retry_llm_error), ) def _query(self, messages: list[dict[str, Any]], **kwargs: Any) -> Any: filtered = _merge_completion_kwargs(self.config, kwargs) diff --git a/src/minisweagent/run/mini.py b/src/minisweagent/run/mini.py index 302eeb04..02d23dae 100644 --- a/src/minisweagent/run/mini.py +++ b/src/minisweagent/run/mini.py @@ -354,13 +354,31 @@ def main( if enabled is False: disabled_tools.append(tool_name) + # Build acceleration for CK/.cu harness compiles: cap parallel compile jobs and + # route the compiler through ccache (when present) so repeated candidate rebuilds + # are incremental instead of cold multi-hour recompiles. setdefault => never + # overrides an operator-provided value. + import shutil + os.environ.setdefault("MAX_JOBS", str(min((os.cpu_count() or 8), 32))) + if shutil.which("ccache"): + for _v in ("CMAKE_C_COMPILER_LAUNCHER", "CMAKE_CXX_COMPILER_LAUNCHER", "CMAKE_HIP_COMPILER_LAUNCHER"): + os.environ.setdefault(_v, "ccache") + # RAG MCP toggle: disable RAG tools when rag is not enabled. RAG is # best-effort — if it is enabled but the package or index cannot be # provisioned (offline node, pip failure, build error), degrade to # RAG-disabled and continue the optimization run instead of aborting the # whole agent. Previously any provisioning failure raised and killed the # run, so an environment hiccup wasted the entire kernel attempt (GH #316). + # NOTE: the prior fix only caught EXCEPTIONS — an unbounded pip install / index + # build that HANGS (no exception, just blocks) still consumed the whole per-kernel + # budget (observed: TimeoutExpired after 7800s). The subprocess calls below now + # carry timeouts so a hang raises TimeoutExpired -> caught here -> degrade. Set + # GEAK_DISABLE_RAG=1 to skip the optional RAG setup entirely. rag_enabled = tools_cfg.get("rag", False) + if rag_enabled and os.environ.get("GEAK_DISABLE_RAG", "").strip() == "1": + logger.info("GEAK_DISABLE_RAG=1: skipping optional RAG tools.") + rag_enabled = False rag_failed = False if rag_enabled: try: @@ -374,6 +392,7 @@ def main( result = subprocess.run( [sys.executable, "-m", "pip", "install", "-e", str(_rag_mcp_path)], capture_output=True, text=True, + timeout=int(os.environ.get("GEAK_RAG_INSTALL_TIMEOUT", "180")), ) if result.returncode != 0: raise RuntimeError( @@ -398,6 +417,7 @@ def main( result = subprocess.run( [sys.executable, str(_build_script), "--force"], capture_output=True, text=True, + timeout=int(os.environ.get("GEAK_RAG_INDEX_TIMEOUT", "600")), ) if result.returncode != 0: raise RuntimeError( diff --git a/src/minisweagent/run/postprocess/evaluation.py b/src/minisweagent/run/postprocess/evaluation.py index 46dfe5fd..7a78b63a 100644 --- a/src/minisweagent/run/postprocess/evaluation.py +++ b/src/minisweagent/run/postprocess/evaluation.py @@ -749,28 +749,39 @@ def run_correctness_and_benchmark( logger.warning("No CORRECTNESS commands found in COMMANDMENT") # --- GEAK_VERIFY_IN_LOOP: trust the in-loop full-set verified benchmark --- - # When the subagent's `--benchmark` already runs the FULL weighted shape set - # and emits GEAK_RESULT_SPEEDUP (see kernel_languages/contract.py + - # subagents/preprocess/harness-generator/SUBAGENT.yaml), the post-round - # FULL_BENCHMARK below is a redundant re-timing that ALSO times out on heavy - # CK/.cu rebuilds (GEAK_BENCH_TIMEOUT). Skip the FB subprocess, but KEEP the - # clean-worktree CORRECTNESS above — it is the only worktree-bypass guard - # (see contract.py "always ~1.00x" note). Adopt the already-trusted in-loop - # speedup as the verified value so select_best_verified_round_evaluation can - # rank it. Default-off => byte-identical to current behaviour. - if os.environ.get("GEAK_VERIFY_IN_LOOP", "").strip() == "1": + # The subagent's `--benchmark` runs the SAME full config set as + # `--full-benchmark` (the harness contract — see SUBAGENT.yaml + contract.py; + # `_cap(ALL_CONFIGS)`, uncapped under GEAK_MAX_BENCHMARK_SHAPES=0), and emits + # GEAK_RESULT_SPEEDUP. So the post-round FULL_BENCHMARK below is a redundant + # re-timing of the identical shapes that also times out on heavy CK/.cu + # rebuilds (GEAK_BENCH_TIMEOUT). Adopt the already-trusted in-loop speedup as + # the verified value (so select_best_verified_round_evaluation can rank it) + # and skip the FB subprocess — while KEEPING the clean-worktree CORRECTNESS + # above, the only worktree-bypass guard (see contract.py "always ~1.00x"). + # + # DEFAULT-ON: since `--benchmark == --full-benchmark` by contract, re-running + # FB measures nothing new. Opt back in to the separate FB pass with + # GEAK_VERIFY_IN_LOOP=0. The downstream HL paired same-config A/B remains the + # real E2E arbiter regardless, so even an optimistic micro number is caught + # there (NEEDS_REVIEW / no promotion) rather than silently shipped. + # + # Guard: only adopt-and-skip when we actually have a usable in-loop number. + # If the in-loop benchmark produced no GEAK_RESULT_SPEEDUP, fall through and + # run the post-round FULL_BENCHMARK so the round can still earn a value. + _verify_in_loop = os.environ.get("GEAK_VERIFY_IN_LOOP", "1").strip().lower() not in ("0", "false", "no") + _in_loop_speedup = round_eval.get("benchmark_speedup") + if _verify_in_loop and isinstance(_in_loop_speedup, (int, float)): if round_eval.get("status") == "correctness_failed": return - in_loop = round_eval.get("benchmark_speedup") round_eval["full_benchmark"] = { - "verified_speedup": float(in_loop) if isinstance(in_loop, (int, float)) else None, + "verified_speedup": float(_in_loop_speedup), "candidate_ms": round_eval.get("candidate_shape_latency_ms"), "baseline_ms": round_eval.get("baseline_shape_latency_ms"), "success": True, "source": "in_loop_full_benchmark", } logger.info( - "GEAK_VERIFY_IN_LOOP=1: skipping redundant post-round FULL_BENCHMARK; " + "GEAK_VERIFY_IN_LOOP (default-on): skipping redundant post-round FULL_BENCHMARK; " "adopting in-loop full-set verified_speedup=%s " "(correctness re-checked in clean worktree above).", round_eval["full_benchmark"]["verified_speedup"], diff --git a/src/minisweagent/run/preprocess_v3/baseline.py b/src/minisweagent/run/preprocess_v3/baseline.py index 7b02e9fb..7e9db428 100644 --- a/src/minisweagent/run/preprocess_v3/baseline.py +++ b/src/minisweagent/run/preprocess_v3/baseline.py @@ -660,9 +660,20 @@ def _invoke_profiler_mcp( } if workdir is not None: kwargs["workdir"] = workdir - with ThreadPoolExecutor(max_workers=1) as pool: + # Do NOT use ``with ThreadPoolExecutor(...) as pool``: its ``__exit__`` + # calls ``shutdown(wait=True)``, which JOINS the worker thread. When the + # profiler hangs (e.g. ROCprofiler-SDK / LD_PRELOAD contention on a busy + # host), ``future.result(timeout=...)`` raises on time but the implicit + # join then blocks forever on the still-running worker — defeating the + # timeout and starving the whole preprocess budget. Manage the pool + # explicitly and abandon the zombie thread with ``wait=False`` so the + # timeout is actually honored and we fall through to a profile-less run. + pool = ThreadPoolExecutor(max_workers=1) + try: future = pool.submit(profile_fn, **kwargs) return future.result(timeout=_PROFILE_TIMEOUT_S) + finally: + pool.shutdown(wait=False, cancel_futures=True) except FuturesTimeoutError: logger.warning("profiler-mcp timed out after %ds", _PROFILE_TIMEOUT_S) return None