Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions packages/junior/src/chat/respond.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ import {
toAgentThinkingLevel,
type TurnThinkingSelection,
} from "@/chat/services/turn-thinking-level";
import type { AgentTurnUsage } from "@/chat/usage";
import { hasAgentTurnUsage, type AgentTurnUsage } from "@/chat/usage";
import {
loadTurnCheckpoint,
persistCompletedCheckpoint,
Expand Down Expand Up @@ -179,6 +179,16 @@ function trimRouterAttachmentText(text: string): string {
: `${normalized.slice(0, MAX_ROUTER_ATTACHMENT_PREVIEW_CHARS)}...`;
}

function extractSliceUsage(
messages: PiMessage[],
beforeMessageCount: number,
): AgentTurnUsage | undefined {
const usage = extractGenAiUsageSummary(
...messages.slice(beforeMessageCount).filter(isAssistantMessage),
);
return hasAgentTurnUsage(usage) ? usage : undefined;
}

function supportsRouterTextPreview(mediaType: string): boolean {
const baseMediaType = mediaType.split(";", 1)[0]?.trim().toLowerCase();
if (!baseMediaType) {
Expand Down Expand Up @@ -1033,9 +1043,7 @@ export async function generateAssistantReply(
agent.state,
...outputMessages,
);
turnUsage = Object.values(usageSummary).some(
(value) => value !== undefined,
)
turnUsage = hasAgentTurnUsage(usageSummary)
? usageSummary
: undefined;
setSpanAttributes({
Expand Down Expand Up @@ -1071,6 +1079,8 @@ export async function generateAssistantReply(
) {
await persistCompletedCheckpoint({
conversationId: sessionConversationId,
currentDurationMs: Date.now() - replyStartedAtMs,
currentUsage: turnUsage,
sessionId,
sliceId: currentSliceId,
allMessages: agent.state.messages,
Expand Down Expand Up @@ -1103,10 +1113,15 @@ export async function generateAssistantReply(
});
} catch (error) {
if (timedOut && timeoutResumeConversationId && timeoutResumeSessionId) {
turnUsage =
turnUsage ??
extractSliceUsage(timeoutResumeMessages, beforeMessageCount);
const checkpoint = await persistTimeoutCheckpoint({
conversationId: timeoutResumeConversationId,
sessionId: timeoutResumeSessionId,
currentSliceId: timeoutResumeSliceId,
currentDurationMs: Date.now() - replyStartedAtMs,
currentUsage: turnUsage,
messages: timeoutResumeMessages,
loadedSkillNames: loadedSkillNamesForResume,
errorMessage: error instanceof Error ? error.message : String(error),
Expand Down Expand Up @@ -1140,25 +1155,17 @@ export async function generateAssistantReply(
timeoutResumeSessionId
) {
if (!turnUsage && timeoutResumeMessages.length > 0) {
// Match the canonical slice-scoped extraction: sum usage from new
// assistant messages produced during this slice, not the full
// message history (which may include prior slices whose usage was
// already reported in earlier footers).
const fallbackUsage = extractGenAiUsageSummary(
...timeoutResumeMessages
.slice(beforeMessageCount)
.filter(isAssistantMessage),
turnUsage = extractSliceUsage(
timeoutResumeMessages,
beforeMessageCount,
);
turnUsage = Object.values(fallbackUsage).some(
(value) => value !== undefined,
)
? fallbackUsage
: undefined;
}
const nextSliceId = await persistAuthPauseCheckpoint({
conversationId: timeoutResumeConversationId,
sessionId: timeoutResumeSessionId,
currentSliceId: timeoutResumeSliceId,
currentDurationMs: Date.now() - replyStartedAtMs,
currentUsage: turnUsage,
messages: timeoutResumeMessages,
loadedSkillNames: loadedSkillNamesForResume,
errorMessage: error.message,
Expand Down
29 changes: 24 additions & 5 deletions packages/junior/src/chat/runtime/reply-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ import {
isVisionEnabled,
} from "@/chat/services/vision-context";
import { createSlackAdapterAssistantStatusSession } from "@/chat/slack/assistant-thread/status";
import { buildSlackReplyFooter } from "@/chat/slack/footer";
import {
buildSlackReplyBlocks,
buildSlackReplyFooter,
} from "@/chat/slack/footer";
import { maybeUpdateAssistantTitle } from "@/chat/slack/assistant-thread/title";
import { appendSlackLegacyAttachmentText } from "@/chat/slack/legacy-attachments";
import { type ThreadArtifactsState } from "@/chat/state/artifacts";
Expand All @@ -61,7 +64,7 @@ import { buildDeterministicTurnId } from "@/chat/runtime/turn";
import { markTurnCompleted, markTurnFailed } from "@/chat/runtime/turn";
import { startActiveTurn } from "@/chat/runtime/turn";
import { isRedundantReactionAckText } from "@/chat/services/reply-delivery-plan";
import { deleteSlackMessage } from "@/chat/slack/outbound";
import { deleteSlackMessage, postSlackMessage } from "@/chat/slack/outbound";
import {
finalizeFailedTurnReply,
getAgentTurnDiagnosticsAttributes,
Expand Down Expand Up @@ -204,9 +207,25 @@ export function createReplyToThread(deps: ReplyExecutorDeps) {
const postTurnContinuationNotice = async (): Promise<void> => {
try {
await beforeFirstResponsePost();
await thread.post(
buildSlackOutputMessage(buildTurnContinuationResponse()),
);
const text = buildTurnContinuationResponse();
const footer = buildSlackReplyFooter({ conversationId });
const shouldUseSlackFooter =
Boolean(footer) &&
Boolean(channelId && threadTs) &&
(thread.adapter as { name?: string } | undefined)?.name ===
"slack";
if (shouldUseSlackFooter && channelId && threadTs) {
const blocks = buildSlackReplyBlocks(text, footer);
await postSlackMessage({
channelId,
threadTs,
text,
...(blocks ? { blocks } : {}),
});
return;
}

await thread.post(buildSlackOutputMessage(text));
} catch (error) {
logException(
error,
Expand Down
46 changes: 42 additions & 4 deletions packages/junior/src/chat/runtime/slack-resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import {
createSlackWebApiAssistantStatusSession,
type AssistantStatusSession,
} from "@/chat/slack/assistant-thread/status";
import { buildSlackReplyFooter } from "@/chat/slack/footer";
import {
buildSlackReplyBlocks,
buildSlackReplyFooter,
} from "@/chat/slack/footer";
import {
planSlackReplyPosts,
postSlackApiReplyPosts,
} from "@/chat/slack/reply";
import { postSlackMessage as postSlackApiMessage } from "@/chat/slack/outbound";
import { getStateAdapter } from "@/chat/state/adapter";
import { getAgentTurnSessionCheckpoint } from "@/chat/state/turn-session-store";
import { addAgentTurnUsage } from "@/chat/usage";
import {
startSlackProcessingReactionForMessage,
type ProcessingReactionSession,
Expand Down Expand Up @@ -172,11 +177,18 @@ async function postTurnContinuationNoticeBestEffort(args: {
lockKey: string;
resumeArgs: ResumeSlackTurnArgs;
}): Promise<void> {
const text = buildTurnContinuationResponse();
const footer = buildSlackReplyFooter({
conversationId:
args.resumeArgs.replyContext?.correlation?.conversationId ?? args.lockKey,
});
const blocks = buildSlackReplyBlocks(text, footer);
try {
await postSlackApiMessage({
channelId: args.resumeArgs.channelId,
threadTs: args.resumeArgs.threadTs,
text: buildTurnContinuationResponse(),
text,
...(blocks ? { blocks } : {}),
});
} catch (error) {
logException(
Expand Down Expand Up @@ -258,6 +270,19 @@ function createResumeReplyContext(
};
}

async function getResumeCheckpoint(args: {
conversationId?: string;
sessionId?: string;
}) {
if (!args.conversationId || !args.sessionId) {
return undefined;
}
return await getAgentTurnSessionCheckpoint(
args.conversationId,
args.sessionId,
);
}

/**
* Resume a paused Slack turn under the normal thread lock.
*
Expand Down Expand Up @@ -311,6 +336,10 @@ export async function resumeSlackTurn(args: ResumeSlackTurnArgs) {

const generateReply = args.generateReply ?? generateAssistantReply;
const replyContext = createResumeReplyContext(args, status);
const priorCheckpoint = await getResumeCheckpoint({
conversationId: replyContext.correlation?.conversationId,
sessionId: replyContext.correlation?.turnId,
});
const replyPromise = generateReply(args.messageText, replyContext);
const replyTimeoutMs = resolveReplyTimeoutMs(args.replyTimeoutMs);
let reply =
Expand Down Expand Up @@ -339,9 +368,18 @@ export async function resumeSlackTurn(args: ResumeSlackTurnArgs) {
await status.stop();
const footer = buildSlackReplyFooter({
conversationId: args.replyContext?.correlation?.conversationId ?? lockKey,
durationMs: reply.diagnostics.durationMs,
durationMs:
typeof priorCheckpoint?.cumulativeDurationMs === "number" ||
typeof reply.diagnostics.durationMs === "number"
? (priorCheckpoint?.cumulativeDurationMs ?? 0) +
(reply.diagnostics.durationMs ?? 0)
: undefined,
thinkingLevel: reply.diagnostics.thinkingLevel,
usage: reply.diagnostics.usage,
usage:
addAgentTurnUsage(
priorCheckpoint?.cumulativeUsage,
reply.diagnostics.usage,
) ?? reply.diagnostics.usage,
});
await postSlackApiReplyPosts({
channelId: args.channelId,
Expand Down
15 changes: 8 additions & 7 deletions packages/junior/src/chat/sandbox/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
missingFileResult,
sliceFileContent,
} from "@/chat/tools/sandbox/read-file";
import { ToolInputError } from "@/chat/tools/execution/tool-input-error";

// Spec: specs/security-policy.md (sandbox isolation, network policy, credential lifecycle)
// Spec: specs/logging/tracing-spec.md (required sandbox span semantics)
Expand Down Expand Up @@ -248,7 +249,7 @@ export function createSandboxExecutor(options?: {
): Promise<SandboxExecutionEnvelope<T>> => {
const filePath = String(rawInput.path ?? "").trim();
if (!filePath) {
throw new Error("path is required");
throw new ToolInputError("path is required");
}
const offset = positiveInteger(rawInput.offset);
const limit = positiveInteger(rawInput.limit);
Expand Down Expand Up @@ -330,7 +331,7 @@ export function createSandboxExecutor(options?: {
): Promise<SandboxExecutionEnvelope<T>> => {
const filePath = String(rawInput.path ?? "").trim();
if (!filePath) {
throw new Error("path is required");
throw new ToolInputError("path is required");
}

const content = String(rawInput.content ?? "");
Expand Down Expand Up @@ -370,10 +371,10 @@ export function createSandboxExecutor(options?: {
): Promise<SandboxExecutionEnvelope<T>> => {
const filePath = String(rawInput.path ?? "").trim();
if (!filePath) {
throw new Error("path is required");
throw new ToolInputError("path is required");
}
if (!Array.isArray(rawInput.edits)) {
throw new Error("edits is required");
throw new ToolInputError("edits is required");
}

logSandboxBootRequest("tool.editFile", {
Expand Down Expand Up @@ -406,7 +407,7 @@ export function createSandboxExecutor(options?: {
): Promise<SandboxExecutionEnvelope<T>> => {
const pattern = String(rawInput.pattern ?? "");
if (!pattern) {
throw new Error("pattern is required");
throw new ToolInputError("pattern is required");
}

logSandboxBootRequest("tool.grep");
Expand Down Expand Up @@ -447,7 +448,7 @@ export function createSandboxExecutor(options?: {
): Promise<SandboxExecutionEnvelope<T>> => {
const pattern = String(rawInput.pattern ?? "");
if (!pattern) {
throw new Error("pattern is required");
throw new ToolInputError("pattern is required");
}

logSandboxBootRequest("tool.findFiles");
Expand Down Expand Up @@ -509,7 +510,7 @@ export function createSandboxExecutor(options?: {

if (params.toolName === "bash") {
if (!bashCommand) {
throw new Error("command is required");
throw new ToolInputError("command is required");
}
if (options?.runBashCustomCommand) {
const custom = await options.runBashCustomCommand(bashCommand);
Expand Down
Loading