Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions agent/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import uuid

from models import TaskConfig, TaskType
from shell import log

AGENT_WORKSPACE = os.environ.get("AGENT_WORKSPACE", "/workspace")

Expand Down Expand Up @@ -58,6 +59,7 @@ def resolve_linear_api_token() -> str:
return ""
try:
import boto3
from botocore.exceptions import BotoCoreError, ClientError

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
client = boto3.client("secretsmanager", region_name=region)
Expand All @@ -66,10 +68,12 @@ def resolve_linear_api_token() -> str:
if token:
os.environ["LINEAR_API_TOKEN"] = token
return token
except Exception as e:
except (BotoCoreError, ClientError) as e:
# Never let a Secrets Manager outage crash the agent. The Linear MCP
# will simply fail on first call with a clear auth error.
print(f"[config] resolve_linear_api_token failed: {type(e).__name__}: {e}", flush=True)
# will simply fail on first call with a clear auth error. Narrowed
# to botocore exceptions per Alain's #63 review — broader `except`
# hid genuine bugs in the Secrets Manager call shape.
log("WARN", f"resolve_linear_api_token failed: {type(e).__name__}: {e}")
return ""


Expand Down
206 changes: 203 additions & 3 deletions agent/src/linear_reactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from __future__ import annotations

import os
import threading
import time
from typing import Any

import requests
Expand Down Expand Up @@ -60,6 +62,44 @@
}
""".strip()

#: GraphQL query: fetch every reaction on an issue together with its emoji and
#: the id of the user who owns it. That is the minimum needed for the sweep to
#: filter by owner (the API-token user) and by emoji.
_ISSUE_REACTIONS_QUERY = """
query IssueReactions($issueId: String!) {
issue(id: $issueId) {
reactions {
id
emoji
user { id }
}
}
}
""".strip()

#: GraphQL query: resolve the API-token owner ("viewer") so the sweep only
#: deletes reactions this agent created and never touches a human's reaction.
_VIEWER_QUERY = """
query Viewer { viewer { id } }
""".strip()

#: Emojis the agent posts itself. The sweep only considers reactions whose
#: emoji is in this set (and whose owner is the viewer).
_BGAGENT_EMOJIS = frozenset({EMOJI_STARTED, EMOJI_SUCCESS, EMOJI_FAILURE})

#: Module-level cache of the API-token owner's id. Filled by the first
#: successful ``_get_viewer_id`` call and reused for the rest of the process
#: (Linear's `viewer { id }` is stable for the token).
_viewer_id_cache: str | None = None

#: Auth-failure circuit breaker. Linear API tokens can be revoked mid-run;
#: without a circuit breaker, every subsequent ``_graphql`` call retries
#: (within its 5s timeout) and floods CloudWatch with WARNs while wasting
#: Linear's quota. After ``_AUTH_FAILURE_THRESHOLD`` consecutive 401/403
#: responses, ``_auth_circuit_open`` flips to True and all later calls
#: short-circuit (return None) without hitting the network. A successful
#: 2xx response resets the counter — but only while the circuit is still
#: closed: once it is open no request is sent, so as far as this module shows
#: it stays open for the remainder of the process.
#:
#: NOTE(review): these are plain module globals mutated inside ``_graphql``,
#: which is also invoked from the background sweep thread; no lock is visible
#: here — confirm that an occasional lost counter update is acceptable.
_AUTH_FAILURE_THRESHOLD = 3
_consecutive_auth_failures = 0
_auth_circuit_open = False


def _enabled(channel_source: str, channel_metadata: dict[str, str] | None) -> str | None:
"""Return the Linear issue id if reactions should fire, else None.
Expand All @@ -79,8 +119,16 @@ def _graphql(query: str, variables: dict[str, Any]) -> dict[str, Any] | None:
"""POST a GraphQL query. Return parsed data on success, None on any failure.

Swallows network / auth / schema errors with a WARN log — reactions are
advisory and never gate the pipeline.
advisory and never gate the pipeline. After
``_AUTH_FAILURE_THRESHOLD`` consecutive auth failures (401/403), the
module-level circuit breaker flips open and all later calls short-circuit
without hitting the network. A successful 2xx response resets the counter.
"""
global _consecutive_auth_failures, _auth_circuit_open

if _auth_circuit_open:
return None

token = os.environ.get("LINEAR_API_TOKEN", "")
if not token:
log("WARN", "linear_reactions: LINEAR_API_TOKEN not set; skipping reaction")
Expand All @@ -100,10 +148,29 @@ def _graphql(query: str, variables: dict[str, Any]) -> dict[str, Any] | None:
log("WARN", f"linear_reactions: request failed ({type(e).__name__}): {e}")
return None

if resp.status_code in (401, 403):
_consecutive_auth_failures += 1
if _consecutive_auth_failures >= _AUTH_FAILURE_THRESHOLD and not _auth_circuit_open:
_auth_circuit_open = True
log(
"ERROR",
"linear_reactions: auth circuit OPEN after "
f"{_consecutive_auth_failures} consecutive {resp.status_code}s — "
"API token likely revoked. Suppressing further Linear calls "
"for this container.",
)
else:
log("WARN", f"linear_reactions: HTTP {resp.status_code} from Linear (auth)")
return None

if resp.status_code != 200:
log("WARN", f"linear_reactions: HTTP {resp.status_code} from Linear")
return None

# Successful 2xx — reset the auth failure counter so transient blips don't
# accumulate toward the threshold.
_consecutive_auth_failures = 0

body = resp.json() if resp.content else {}
if body.get("errors"):
log("WARN", f"linear_reactions: GraphQL errors: {body['errors']}")
Expand All @@ -112,18 +179,151 @@ def _graphql(query: str, variables: dict[str, Any]) -> dict[str, Any] | None:
return body.get("data") or {}


def _get_viewer_id() -> str | None:
    """Look up (and memoise) the user id that owns the Linear API token.

    The id is what ``_sweep_stale_reactions`` compares reaction owners
    against, so that a re-run only removes 👀 / ✅ / ❌ reactions the agent
    itself posted and leaves identical emojis added by humans alone.

    Returns:
        The viewer id, or ``None`` when the viewer query failed or did not
        yield a non-empty string id. Only a valid id is cached, so a failed
        lookup is retried on the next call.
    """
    global _viewer_id_cache

    if not _viewer_id_cache:
        payload = _graphql(_VIEWER_QUERY, {})
        # A falsy payload (None or empty data) is treated as "no viewer".
        viewer = (payload.get("viewer") or {}) if payload else {}
        candidate = viewer.get("id")
        if not isinstance(candidate, str) or not candidate:
            return None
        _viewer_id_cache = candidate
    return _viewer_id_cache


def _sweep_stale_reactions(issue_id: str, exclude_id: str | None = None) -> None:
    """Delete bgagent-owned 👀/✅/❌ reactions on the issue.

    Called from ``react_task_started`` *after* the new 👀 is posted, so
    re-runs (label removed and re-applied; or pre-container ❌ from the
    orchestrator/processor followed by a successful retry) don't accumulate
    stale terminal markers next to the new 👀. Running after the post
    means the user-visible 👀 lands fast even if the sweep's first call
    hits cold-connection latency on Linear's API.

    A reaction is deleted only when all of the following hold: its emoji is
    in ``_BGAGENT_EMOJIS``, its owner is the API-token user, it has an id,
    and that id is not ``exclude_id``.

    Args:
        issue_id: Linear issue whose reactions are swept.
        exclude_id: Id of the just-posted 👀, which must survive the sweep.
            With ``None`` nothing is excluded.

    Best-effort: any failure (viewer fetch, reactions query, individual
    reactionDelete) is logged and swallowed — sweep is post-👀 cleanup
    and never gates the pipeline. The summary log counts only deletes that
    ``_graphql`` reported as successful; failed attempts are reported
    separately so the line does not over-state what was cleaned up.
    """
    sweep_start = time.monotonic()
    viewer_id = _get_viewer_id()
    if not viewer_id:
        log("WARN", "linear_reactions: skipping sweep — could not resolve viewer id")
        return
    viewer_ms = int((time.monotonic() - sweep_start) * 1000)

    reactions_start = time.monotonic()
    data = _graphql(_ISSUE_REACTIONS_QUERY, {"issueId": issue_id})
    reactions_ms = int((time.monotonic() - reactions_start) * 1000)
    if not data:
        log(
            "TASK",
            "linear_reactions: sweep skipped (reactions query failed) "
            f"viewer={viewer_ms}ms reactions={reactions_ms}ms",
        )
        return

    reactions = (data.get("issue") or {}).get("reactions") or []
    deletes = 0
    delete_failures = 0
    deletes_start = time.monotonic()
    for r in reactions:
        if not isinstance(r, dict):
            continue
        if r.get("emoji") not in _BGAGENT_EMOJIS:
            continue
        # Only touch reactions owned by the API-token user.
        if (r.get("user") or {}).get("id") != viewer_id:
            continue
        rid = r.get("id")
        if not rid:
            continue
        if exclude_id is not None and rid == exclude_id:
            # The 👀 we just posted — skip, it's the new marker.
            continue
        # `_graphql` returns None on any failure (and logs it); anything
        # else, including an empty dict, is a successful round trip.
        if _graphql(_DELETE_MUTATION, {"id": rid}) is None:
            delete_failures += 1
        else:
            deletes += 1
    deletes_ms = int((time.monotonic() - deletes_start) * 1000)
    total_ms = int((time.monotonic() - sweep_start) * 1000)
    # Keep the happy-path line unchanged; only append the failure count
    # when there is something to report.
    failure_note = f" delete_failures={delete_failures}" if delete_failures else ""
    log(
        "TASK",
        f"linear_reactions: sweep done total={total_ms}ms viewer={viewer_ms}ms "
        f"reactions={reactions_ms}ms deletes={deletes}({deletes_ms}ms){failure_note}",
    )


def react_task_started(
    channel_source: str,
    channel_metadata: dict[str, str] | None,
) -> str | None:
    """Post 👀 on the Linear issue. Return the reaction id (or None on failure/no-op).

    Order matters: the 👀 is posted *first*, then we sweep any stale
    bgagent-owned 👀/✅/❌ from prior runs (excluding the one we just
    posted). This keeps the user-visible signal fast — if Linear's API
    is slow on a cold connection, the 5s timeout falls on a sweep call
    and nobody waits, instead of falling on the 👀 post and gating it.

    Sweep is best-effort; failure leaves stale terminal markers next to
    the new 👀 (the visual-duplication bug we set out to fix), but the
    pipeline proceeds unaffected.

    The sweep is skipped entirely when the create response carries no
    reaction id: without an id to exclude, the sweep could not tell the
    fresh 👀 from a stale one and might delete it.

    Args:
        channel_source: Passed to ``_enabled`` to decide whether reactions
            apply to this task.
        channel_metadata: Passed to ``_enabled``; source of the issue id.

    Returns:
        The id of the created 👀 reaction, or ``None`` when reactions are
        disabled for this task, the create call failed, or the response
        contained no reaction id.

    NOTE(review): the sweep runs concurrently with the rest of the
    pipeline and only excludes the 👀 id. If a terminal ✅/❌ is posted
    before the sweep's reactions query runs, the sweep would presumably
    delete it too — confirm whether that window matters in practice.
    """
    issue_id = _enabled(channel_source, channel_metadata)
    if not issue_id:
        return None
    log("TASK", f"linear_reactions: react_task_started ENTER issue_id={issue_id}")
    started_at = time.monotonic()

    # Post 👀 first — this is the user-visible signal.
    create_start = time.monotonic()
    data = _graphql(_CREATE_MUTATION, {"issueId": issue_id, "emoji": EMOJI_STARTED})
    create_ms = int((time.monotonic() - create_start) * 1000)
    if not data:
        total_ms = int((time.monotonic() - started_at) * 1000)
        log(
            "WARN",
            "linear_reactions: react_task_started EXIT (👀 failed) "
            f"total={total_ms}ms create={create_ms}ms",
        )
        return None

    # GraphQL may return an explicit null for `reaction`; `.get(key, {})`
    # would hand that None straight to the next `.get` and raise, so
    # normalise with `or {}` and tolerate unexpected shapes.
    reaction = (data.get("reactionCreate") or {}).get("reaction") or {}
    rid = reaction.get("id") if isinstance(reaction, dict) else None
    eyes_ms = int((time.monotonic() - started_at) * 1000)
    log(
        "TASK",
        f"linear_reactions: 👀 posted reaction_id={rid} create={create_ms}ms "
        f"(eyes-visible at +{eyes_ms}ms)",
    )

    if not rid:
        # No id to exclude — sweeping now could remove the 👀 we just
        # posted. Leave any stale markers in place instead.
        total_ms = int((time.monotonic() - started_at) * 1000)
        log(
            "WARN",
            "linear_reactions: react_task_started EXIT (sweep skipped: no reaction id) "
            f"total={total_ms}ms create={create_ms}ms",
        )
        return rid

    # Sweep prior bgagent reactions in a background thread so the agent
    # pipeline doesn't block on Linear API latency. Daemon=True so the
    # thread doesn't keep the container alive past the agent's terminal
    # status. The sweep filters out the just-posted reaction id so it
    # never deletes itself.
    threading.Thread(
        target=_sweep_stale_reactions,
        args=(issue_id,),
        kwargs={"exclude_id": rid},
        daemon=True,
        name="linear-reactions-sweep",
    ).start()

    total_ms = int((time.monotonic() - started_at) * 1000)
    log(
        "TASK",
        "linear_reactions: react_task_started EXIT (sweep dispatched) "
        f"total={total_ms}ms create={create_ms}ms reaction_id={rid}",
    )
    return rid


def react_task_finished(
Expand Down
12 changes: 9 additions & 3 deletions agent/src/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ def _on_trace_truncated(max_bytes: int, first_dropped: int) -> None:
)

trajectory.set_truncation_callback(_on_trace_truncated)
# Declared up-front so the crash handler at the bottom of this `try`
# can reference it via a normal name rather than ``locals().get(...)``
# — survives refactors and reads cleanly. Stays None until the Linear
# `react_task_started` call assigns the actual reaction id.
linear_eyes_reaction_id: str | None = None
try:
# Context hydration
with task_span("task.context_hydration"):
Expand Down Expand Up @@ -710,13 +715,14 @@ def _on_trace_truncated(max_bytes: int, first_dropped: int) -> None:
task_state.write_terminal(config.task_id, "FAILED", crash_result.model_dump())
# Best-effort ❌ on the Linear issue so the stale 👀 doesn't linger.
# No-op for non-Linear tasks; network/GraphQL failures are swallowed.
# `linear_eyes_reaction_id` may be unbound if we crashed before the
# start-reaction call — guarded with locals() to stay safe.
# `linear_eyes_reaction_id` is initialized to None just before the
# enclosing `try`, so it's always bound here even if we crashed
# before the start-reaction call assigned a real id.
react_task_finished(
config.channel_source,
config.channel_metadata,
success=False,
started_reaction_id=locals().get("linear_eyes_reaction_id"),
started_reaction_id=linear_eyes_reaction_id,
)
raise

Expand Down
22 changes: 14 additions & 8 deletions agent/src/prompt_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,23 @@ def _channel_prompt_addendum(config: TaskConfig) -> str:
return (
"\n\n## Linear issue progress updates (REQUIRED)\n\n"
f"This task was submitted from Linear issue{issue_ref}. The Linear MCP "
"server is loaded. You MUST perform these three updates; they are part "
"of the task contract, not optional:\n\n"
"server is loaded. You MUST perform these updates; they are part of "
"the task contract, not optional:\n\n"
"1. **At start** — call `mcp__linear-server__save_comment` with a short "
'"🤖 Starting on this issue…" message.\n'
'"🤖 Starting on this issue…" message, then call '
"`mcp__linear-server__save_issue` to transition the issue state. Use "
"`mcp__linear-server__list_issue_statuses` first if you don't already "
"know the state ids; pick the one named `In Progress` (fall back to "
"`Todo` if that state doesn't exist). If the issue is already in "
"`In Progress` or any later state (`In Review`, `Done`), skip the "
"transition. If neither exists, skip — the comment alone is enough. "
"Do not invent state names or loop on `list_issue_statuses`.\n"
"2. **When you open the PR** — call `mcp__linear-server__save_comment` "
"with the PR URL, then call `mcp__linear-server__save_issue` to "
"transition the issue state. Use `mcp__linear-server__list_issue_statuses` "
"first if you don't already know the state ids; pick the one named "
"`In Review` (fall back to `In Progress` if that state doesn't exist). "
"If neither exists, skip the state transition — the PR comment alone "
"is enough. Do not invent state names or loop on `list_issue_statuses`.\n"
"transition the issue state to `In Review` (fall back to `In Progress` "
"if that state doesn't exist). If neither exists, skip the state "
"transition — the PR comment alone is enough. Do not invent state "
"names or loop on `list_issue_statuses`.\n"
"3. **On completion or failure** — call `mcp__linear-server__save_comment` "
"with the final status (succeeded / failed + short reason).\n\n"
"Keep comments concise. Do not mirror the full agent transcript back to "
Expand Down
Loading