Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/junior-evals/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Harness override knobs (in `EvalOverrides`):
- `auto_complete_mcp_oauth`: after our app genuinely starts an MCP OAuth flow for the listed providers, the harness immediately completes the fake provider callback.
- `auto_complete_oauth`: after our app genuinely starts a generic OAuth flow for the listed providers, the harness immediately completes the fake provider callback.
- `fail_reply_call`: force a non-retryable reply failure on a specific call.
- `faults.sandbox_bash_stream_interrupts`: inject a fixed number of eval-only sandbox bash stream interruptions so the real agent must recover from failed command results.
- `mock_image_generation`: stub the image-generation HTTP response with a valid image payload while still exercising the real attachment path.
- `plugin_dirs`: load plugin fixtures from eval-local directories without adding workspace packages.
- `reply_texts`: override returned reply text per call.
Expand Down
17 changes: 17 additions & 0 deletions packages/junior-evals/evals/behavior-harness.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ export interface EvalOverrides {
auto_complete_oauth?: string[];
enable_test_credentials?: boolean;
fail_reply_call?: number;
faults?: {
sandbox_bash_stream_interrupts?: number;
};
mock_image_generation?: boolean;
plugin_dirs?: string[];
plugin_packages?: string[];
Expand Down Expand Up @@ -397,6 +400,8 @@ const HARNESS_ENV_KEYS = [
"EVAL_TEST_CREDENTIAL_TOKEN",
"JUNIOR_BASE_URL",
"JUNIOR_EXTRA_PLUGIN_ROOTS",
"JUNIOR_EVAL_ENABLE_FAULTS",
"JUNIOR_EVAL_FAULT_SANDBOX_BASH_STREAM_INTERRUPTS",
"JUNIOR_STATE_ADAPTER",
"SLACK_BOT_TOKEN",
] as const;
Expand Down Expand Up @@ -942,6 +947,18 @@ async function setupHarnessEnvironment(
scenario.overrides.test_credential_token;
}
}
// Translate the scenario's `faults.sandbox_bash_stream_interrupts` override
// into the two fault-injection environment variables.
const sandboxBashStreamInterrupts =
  scenario.overrides?.faults?.sandbox_bash_stream_interrupts;
// Floor before validating so a fractional value below 1 (e.g. 0.5) does not
// enable fault injection with an interrupt count of "0".
const sandboxBashStreamInterruptCount =
  typeof sandboxBashStreamInterrupts === "number" &&
  Number.isFinite(sandboxBashStreamInterrupts)
    ? Math.floor(sandboxBashStreamInterrupts)
    : 0;
if (sandboxBashStreamInterruptCount >= 1) {
  process.env.JUNIOR_EVAL_ENABLE_FAULTS = "1";
  process.env.JUNIOR_EVAL_FAULT_SANDBOX_BASH_STREAM_INTERRUPTS = String(
    sandboxBashStreamInterruptCount,
  );
}
process.env.JUNIOR_BASE_URL = "https://junior.example.com";
process.env.JUNIOR_STATE_ADAPTER = "memory";
process.env.JUNIOR_EXTRA_PLUGIN_ROOTS = JSON.stringify(configuredPluginDirs);
Expand Down
33 changes: 33 additions & 0 deletions packages/junior-evals/evals/core/lifecycle-and-resilience.eval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,37 @@ describeEval("Lifecycle and Resilience", slackEvals, (it) => {
}),
});
});

// Eval case: injects one sandbox bash stream interruption through the
// `faults` override, sends a `/resilient-working-directory` mention, and
// grades with the rubric below that the agent calls `bash` again and posts
// exactly one final reply instead of a generic failure.
it("when a sandbox command stream is interrupted, recover and finish the request", async ({
run,
}) => {
await run({
overrides: {
faults: {
sandbox_bash_stream_interrupts: 1,
},
skill_dirs: ["evals/fixtures/skills"],
},
events: [
mention(
"/resilient-working-directory list files in the working directory",
),
],
taskTimeout: 120_000,
criteria: rubric({
contract:
"A transient sandbox command-stream interruption is treated as recoverable tool output, not a terminal assistant failure.",
pass: [
"observed_tool_invocations includes at least two `bash` calls, showing the agent retried after the injected interruption.",
"assistant_posts contains exactly one final reply.",
"The reply includes `Working directory files:` and a fenced list of files from the successful retry.",
],
fail: [
"Do not post a generic assistant failure reply.",
"Do not stop after reporting only the injected stream interruption.",
"Do not mention Sentry event IDs, stack traces, or provider internals.",
],
}),
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
name: resilient-working-directory
description: Use for /resilient-working-directory eval requests that verify command interruption recovery.
allowed-tools: bash
---

Generate a short response for `/resilient-working-directory` requests in eval runs.

## Step 1: List Files

Call `bash` with this input:

```json
{ "command": "ls -1", "timeout_ms": 120000, "max_output_chars": 12000 }
```

## Step 2: Recover Once

If the command result has `ok: false` and `stderr` says the command stream ended before the command finished, call the same `bash` command one more time.

## Step 3: Return Result

- If the final command result has `ok: true`, return markdown with:
  - `Working directory files:`
  - a fenced code block containing `stdout`
- If the final command result has `ok: false`, return markdown with:
  - `Working directory files: unavailable`
  - `Error:` and `stderr`
2 changes: 1 addition & 1 deletion packages/junior/src/chat/prompt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ const EXECUTION_CONTRACT_RULES = [
const CONVERSATION_RULES = [
"- In thread follow-ups, answer from prior thread context; do not repeat resolved clarifying questions.",
"- Preserve attribution roles from thread context: the requester is the person asking now, which may differ from the original reporter or subject.",
"- On resumed turns, post a brief continuation notice, then the resumed answer as a separate message.",
"- Runtime owns continuation and authorization notices; on resumed turns, answer with the final requested content only.",
];

const SLACK_ACTION_RULES = [
Expand Down
109 changes: 71 additions & 38 deletions packages/junior/src/chat/respond.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ import {
toAgentThinkingLevel,
type TurnThinkingSelection,
} from "@/chat/services/turn-thinking-level";
import type { AgentTurnUsage } from "@/chat/usage";
import { hasAgentTurnUsage, type AgentTurnUsage } from "@/chat/usage";
import {
loadTurnCheckpoint,
persistCompletedCheckpoint,
persistAuthPauseCheckpoint,
persistRunningCheckpoint,
persistTimeoutCheckpoint,
} from "@/chat/services/turn-checkpoint";
import { createMcpAuthOrchestration } from "@/chat/services/mcp-auth-orchestration";
Expand Down Expand Up @@ -185,6 +186,16 @@ function trimRouterAttachmentText(text: string): string {
: `${normalized.slice(0, MAX_ROUTER_ATTACHMENT_PREVIEW_CHARS)}...`;
}

/**
 * Summarizes usage for the assistant messages that appear at or after index
 * `beforeMessageCount` in `messages`.
 *
 * @param messages - Message list to read from.
 * @param beforeMessageCount - Number of leading messages to exclude.
 * @returns The usage summary, or `undefined` when `hasAgentTurnUsage`
 *   reports that the summary carries no usage.
 */
function extractSliceUsage(
  messages: PiMessage[],
  beforeMessageCount: number,
): AgentTurnUsage | undefined {
  const sliceAssistantMessages = messages
    .slice(beforeMessageCount)
    .filter(isAssistantMessage);
  const summary = extractGenAiUsageSummary(...sliceAssistantMessages);
  if (!hasAgentTurnUsage(summary)) {
    return undefined;
  }
  return summary;
}

function supportsRouterTextPreview(mediaType: string): boolean {
const baseMediaType = mediaType.split(";", 1)[0]?.trim().toLowerCase();
if (!baseMediaType) {
Expand Down Expand Up @@ -394,6 +405,14 @@ export async function generateAssistantReply(
let timedOut = false;
let turnUsage: AgentTurnUsage | undefined;
let thinkingSelection: TurnThinkingSelection | undefined;
// Correlation identifiers and bot identity, built once so every checkpoint
// persistence call in this function can pass the same `logContext`.
const checkpointLogContext = {
threadId: context.correlation?.threadId,
requesterId: context.correlation?.requesterId,
channelId: context.correlation?.channelId,
runId: context.correlation?.runId,
assistantUserName: botConfig.userName,
modelId: botConfig.modelId,
};

const getSandboxMetadata = () =>
sandboxExecutor
Expand Down Expand Up @@ -917,8 +936,31 @@ export async function generateAssistantReply(
});
let hasEmittedText = false;
let needsSeparator = false;
/**
 * Writes a running checkpoint containing `messages` for the current slice.
 * Does nothing unless turn sessions are usable and both the session
 * conversation id and session id are set.
 *
 * NOTE(review): a caller that does not await the returned promise may let
 * this write land after a later checkpoint write — confirm ordering against
 * the timeout checkpoint path.
 */
const persistSafeBoundary = async (
  messages: PiMessage[],
): Promise<void> => {
  if (
    checkpointState.canUseTurnSession &&
    sessionConversationId &&
    sessionId
  ) {
    await persistRunningCheckpoint({
      conversationId: sessionConversationId,
      sessionId,
      sliceId: currentSliceId,
      messages,
      loadedSkillNames: loadedSkillNamesForResume,
      logContext: checkpointLogContext,
    });
  }
};

const unsubscribe = agent.subscribe((event) => {
if (event.type === "turn_end" && event.toolResults.length > 0) {
return persistSafeBoundary([...agent!.state.messages]);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fire-and-forget checkpoint can overwrite timeout checkpoint

Medium Severity

The subscriber callback returns the persistSafeBoundary promise as fire-and-forget, meaning its async I/O runs concurrently with subsequent agent work. If a turn_end event fires and the timeout triggers shortly after, persistTimeoutCheckpoint writes an awaiting_resume state, but the still-in-flight persistRunningCheckpoint from the subscriber can land later and overwrite it with running, silently preventing the continuation from being scheduled. Neither agent.abort() nor unsubscribe() cancels in-flight async operations from prior callbacks.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit a9d6711. Configure here.

if (event.type === "message_start") {
Promise.resolve(context.onAssistantMessageStart?.()).catch((error) => {
logWarn(
Expand Down Expand Up @@ -977,13 +1019,20 @@ export async function generateAssistantReply(
spanContext,
async () => {
let promptResult: unknown;
const freshPromptMessage: PiMessage = {
role: "user",
content: promptContentParts,
timestamp: Date.now(),
} as PiMessage;
if (!resumedFromCheckpoint) {
await persistSafeBoundary([
...agent.state.messages,
freshPromptMessage,
]);
}
const promptPromise = resumedFromCheckpoint
? agent.continue()
: agent.prompt({
role: "user",
content: promptContentParts,
timestamp: Date.now(),
});
: agent.prompt(freshPromptMessage);

let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
Expand Down Expand Up @@ -1044,9 +1093,7 @@ export async function generateAssistantReply(
agent.state,
...outputMessages,
);
turnUsage = Object.values(usageSummary).some(
(value) => value !== undefined,
)
turnUsage = hasAgentTurnUsage(usageSummary)
? usageSummary
: undefined;
setSpanAttributes({
Expand Down Expand Up @@ -1082,10 +1129,13 @@ export async function generateAssistantReply(
) {
await persistCompletedCheckpoint({
conversationId: sessionConversationId,
currentDurationMs: Date.now() - replyStartedAtMs,
currentUsage: turnUsage,
sessionId,
sliceId: currentSliceId,
allMessages: agent.state.messages,
loadedSkillNames: activeSkills.map((skill) => skill.name),
logContext: checkpointLogContext,
});
}

Expand Down Expand Up @@ -1114,21 +1164,19 @@ export async function generateAssistantReply(
});
} catch (error) {
if (timedOut && timeoutResumeConversationId && timeoutResumeSessionId) {
turnUsage =
turnUsage ??
extractSliceUsage(timeoutResumeMessages, beforeMessageCount);
const checkpoint = await persistTimeoutCheckpoint({
conversationId: timeoutResumeConversationId,
sessionId: timeoutResumeSessionId,
currentSliceId: timeoutResumeSliceId,
currentDurationMs: Date.now() - replyStartedAtMs,
currentUsage: turnUsage,
messages: timeoutResumeMessages,
loadedSkillNames: loadedSkillNamesForResume,
errorMessage: error instanceof Error ? error.message : String(error),
logContext: {
threadId: context.correlation?.threadId,
requesterId: context.correlation?.requesterId,
channelId: context.correlation?.channelId,
runId: context.correlation?.runId,
assistantUserName: botConfig.userName,
modelId: botConfig.modelId,
},
logContext: checkpointLogContext,
});
if (checkpoint) {
throw new RetryableTurnError(
Expand All @@ -1151,36 +1199,21 @@ export async function generateAssistantReply(
timeoutResumeSessionId
) {
if (!turnUsage && timeoutResumeMessages.length > 0) {
// Match the canonical slice-scoped extraction: sum usage from new
// assistant messages produced during this slice, not the full
// message history (which may include prior slices whose usage was
// already reported in earlier footers).
const fallbackUsage = extractGenAiUsageSummary(
...timeoutResumeMessages
.slice(beforeMessageCount)
.filter(isAssistantMessage),
turnUsage = extractSliceUsage(
timeoutResumeMessages,
beforeMessageCount,
);
turnUsage = Object.values(fallbackUsage).some(
(value) => value !== undefined,
)
? fallbackUsage
: undefined;
}
const nextSliceId = await persistAuthPauseCheckpoint({
conversationId: timeoutResumeConversationId,
sessionId: timeoutResumeSessionId,
currentSliceId: timeoutResumeSliceId,
currentDurationMs: Date.now() - replyStartedAtMs,
currentUsage: turnUsage,
messages: timeoutResumeMessages,
loadedSkillNames: loadedSkillNamesForResume,
errorMessage: error.message,
logContext: {
threadId: context.correlation?.threadId,
requesterId: context.correlation?.requesterId,
channelId: context.correlation?.channelId,
runId: context.correlation?.runId,
assistantUserName: botConfig.userName,
modelId: botConfig.modelId,
},
logContext: checkpointLogContext,
});
throw new RetryableTurnError(
error.kind === "plugin" ? "plugin_auth_resume" : "mcp_auth_resume",
Expand Down
23 changes: 18 additions & 5 deletions packages/junior/src/chat/runtime/reply-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ import { buildDeterministicTurnId } from "@/chat/runtime/turn";
import { markTurnCompleted, markTurnFailed } from "@/chat/runtime/turn";
import { startActiveTurn } from "@/chat/runtime/turn";
import { isRedundantReactionAckText } from "@/chat/services/reply-delivery-plan";
import { deleteSlackMessage } from "@/chat/slack/outbound";
import { deleteSlackMessage, postSlackMessage } from "@/chat/slack/outbound";
import {
finalizeFailedTurnReply,
getAgentTurnDiagnosticsAttributes,
} from "@/chat/services/turn-failure-response";
import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response";
import { buildSlackTurnContinuationNotice } from "@/chat/slack/turn-continuation-notice";
import { buildAuthPauseResponse } from "@/chat/services/auth-pause-response";
import { maybeApplyProviderDefaultConfigRequest } from "@/chat/services/provider-default-config";

Expand Down Expand Up @@ -232,9 +232,22 @@ export function createReplyToThread(deps: ReplyExecutorDeps) {
const postTurnContinuationNotice = async (): Promise<void> => {
try {
await beforeFirstResponsePost();
await thread.post(
buildSlackOutputMessage(buildTurnContinuationResponse()),
);
const notice = buildSlackTurnContinuationNotice({ conversationId });
const shouldUseSlackFooter =
Boolean(notice.blocks?.length) &&
Boolean(channelId && threadTs) &&
(thread.adapter as { name?: string } | undefined)?.name ===
"slack";
if (shouldUseSlackFooter && channelId && threadTs) {
await postSlackMessage({
channelId,
threadTs,
...notice,
});
return;
}

await thread.post(buildSlackOutputMessage(notice.text));
} catch (error) {
logException(
error,
Expand Down
Loading
Loading