Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions packages/junior/src/chat/runtime/processing-reaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,31 @@ export async function startSlackProcessingReaction(args: {
return noProcessingReaction;
}

return startSlackProcessingReactionForMessage({
channelId,
timestamp: messageTs,
logException: args.logException,
logContext: args.logContext,
});
}

/** Start Junior's automatic Slack processing reaction for a known Slack message. */
export async function startSlackProcessingReactionForMessage(args: {
channelId: string;
timestamp: string;
logException: (
error: unknown,
eventName: string,
context?: Record<string, unknown>,
attributes?: Record<string, unknown>,
body?: string,
) => string | undefined;
logContext: Record<string, unknown>;
}): Promise<ProcessingReactionSession> {
try {
await addReactionToMessage({
channelId,
timestamp: messageTs,
channelId: args.channelId,
timestamp: args.timestamp,
emoji: PROCESSING_REACTION_EMOJI,
});
} catch (error) {
Expand All @@ -74,7 +95,7 @@ export async function startSlackProcessingReaction(args: {
args.logContext,
{
"app.slack.action": "reactions.add",
"messaging.message.id": messageTs,
"messaging.message.id": args.timestamp,
...getSlackErrorObservabilityAttributes(error),
},
"Failed to add Slack processing reaction",
Expand All @@ -94,8 +115,8 @@ export async function startSlackProcessingReaction(args: {

try {
await removeReactionFromMessage({
channelId,
timestamp: messageTs,
channelId: args.channelId,
timestamp: args.timestamp,
emoji: PROCESSING_REACTION_EMOJI,
});
} catch (error) {
Expand All @@ -105,7 +126,7 @@ export async function startSlackProcessingReaction(args: {
args.logContext,
{
"app.slack.action": "reactions.remove",
"messaging.message.id": messageTs,
"messaging.message.id": args.timestamp,
...getSlackErrorObservabilityAttributes(error),
},
"Failed to remove Slack processing reaction",
Expand Down
22 changes: 22 additions & 0 deletions packages/junior/src/chat/runtime/reply-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import {
getAgentTurnDiagnosticsAttributes,
} from "@/chat/services/turn-failure-response";
import { buildTurnContinuationResponse } from "@/chat/services/turn-continuation-response";
import { buildAuthPauseResponse } from "@/chat/services/auth-pause-response";

export interface ReplyExecutorServices {
generateAssistantReply: typeof generateAssistantReplyImpl;
Expand Down Expand Up @@ -222,6 +223,26 @@ export function createReplyToThread(deps: ReplyExecutorDeps) {
throw error;
}
};
const postAuthPauseNotice = async (): Promise<void> => {
try {
await beforeFirstResponsePost();
await thread.post(
buildSlackOutputMessage(buildAuthPauseResponse()),
);
} catch (error) {
logException(
error,
"slack_auth_pause_notice_post_failed",
turnTraceContext,
{
"app.slack.reply_stage": "thread_reply_auth_pause_notice",
...(messageTs ? { "messaging.message.id": messageTs } : {}),
...getSlackErrorObservabilityAttributes(error),
},
"Failed to post auth pause notice",
);
}
};
const activeTurnId = preparedState.conversation.processing.activeTurnId;
if (conversationId && activeTurnId) {
const resumeRequest =
Expand Down Expand Up @@ -592,6 +613,7 @@ export function createReplyToThread(deps: ReplyExecutorDeps) {
isRetryableTurnError(error, "mcp_auth_resume") ||
isRetryableTurnError(error, "plugin_auth_resume")
) {
await postAuthPauseNotice();
completeAuthPauseTurn({
conversation: preparedState.conversation,
sessionId: error.metadata?.sessionId ?? turnId,
Expand Down
27 changes: 26 additions & 1 deletion packages/junior/src/chat/runtime/slack-resume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import {
} from "@/chat/slack/reply";
import { postSlackMessage as postSlackApiMessage } from "@/chat/slack/outbound";
import { getStateAdapter } from "@/chat/state/adapter";
import {
startSlackProcessingReactionForMessage,
type ProcessingReactionSession,
} from "@/chat/runtime/processing-reaction";
import { buildAuthPauseResponse } from "@/chat/services/auth-pause-response";

function resolveReplyTimeoutMs(explicitTimeoutMs?: number): number | undefined {
if (typeof explicitTimeoutMs === "number" && explicitTimeoutMs > 0) {
Expand Down Expand Up @@ -55,7 +60,7 @@ async function postSlackMessageBestEffort(
text,
});
} catch {
// The connected notice should not decide whether the resumed turn succeeds.
// Resume-side status notices should not decide whether the turn succeeds.
}
}

Expand Down Expand Up @@ -103,6 +108,7 @@ interface ResumeSlackTurnArgs {
messageText: string;
channelId: string;
threadTs: string;
messageTs?: string;
replyContext?: ReplyRequestContext;
lockKey?: string;
initialText?: string;
Expand Down Expand Up @@ -281,10 +287,19 @@ export async function resumeSlackTurn(args: ResumeSlackTurnArgs) {
channelId: args.channelId,
threadTs: args.threadTs,
});
let processingReaction: ProcessingReactionSession | undefined;
let deferredPauseKind: "auth" | "timeout" | undefined;
let deferredPauseHandler: (() => Promise<void>) | undefined;
let deferredFailureHandler: (() => Promise<void>) | undefined;
try {
if (args.messageTs) {
processingReaction = await startSlackProcessingReactionForMessage({
channelId: args.channelId,
timestamp: args.messageTs,
logException,
logContext: { ...getResumeLogContext(args, lockKey) },
});
}
if (args.initialText) {
await postSlackMessageBestEffort(
args.channelId,
Expand Down Expand Up @@ -370,12 +385,20 @@ export async function resumeSlackTurn(args: ResumeSlackTurnArgs) {
};
}
} finally {
await processingReaction?.stop();
await stateAdapter.releaseLock(lock);
}

if (deferredPauseHandler) {
try {
await deferredPauseHandler();
if (deferredPauseKind === "auth") {
await postSlackMessageBestEffort(
args.channelId,
args.threadTs,
buildAuthPauseResponse(),
);
}
if (deferredPauseKind === "timeout") {
await postTurnContinuationNoticeBestEffort({
lockKey,
Expand Down Expand Up @@ -405,6 +428,7 @@ export async function resumeAuthorizedRequest(args: {
messageText: string;
channelId: string;
threadTs: string;
messageTs?: string;
connectedText: string;
replyContext?: ReplyRequestContext;
lockKey?: string;
Expand All @@ -419,6 +443,7 @@ export async function resumeAuthorizedRequest(args: {
messageText: args.messageText,
channelId: args.channelId,
threadTs: args.threadTs,
messageTs: args.messageTs,
replyContext: args.replyContext,
lockKey: args.lockKey,
initialText: args.connectedText,
Expand Down
17 changes: 17 additions & 0 deletions packages/junior/src/chat/runtime/turn-user-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ import type {
ThreadConversationState,
} from "@/chat/state/conversation";

function normalizeSlackMessageTs(
value: string | undefined,
): string | undefined {
const trimmed = value?.trim();
return trimmed && /^\d+(?:\.\d+)?$/.test(trimmed) ? trimmed : undefined;
}

/** Return the user message for a persisted turn/session, if one exists. */
export function getTurnUserMessage(
conversation: ThreadConversationState,
Expand All @@ -30,6 +37,16 @@ export function getTurnUserMessageId(
return getTurnUserMessage(conversation, sessionId)?.id;
}

/** Return the Slack timestamp for the user message that a resumed turn acts on. */
export function getTurnUserSlackMessageTs(
message: ConversationMessage | undefined,
): string | undefined {
return (
normalizeSlackMessageTs(message?.meta?.slackTs) ??
normalizeSlackMessageTs(message?.id)
);
}

/** Rebuild attachment context for a resumed turn from the persisted user message. */
export function getTurnUserReplyAttachmentContext(
message: ConversationMessage | undefined,
Expand Down
7 changes: 7 additions & 0 deletions packages/junior/src/chat/services/auth-pause-response.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const AUTH_PAUSE_RESPONSE =
"I need authorization to continue. Check your private link to connect.";

/** Build the visible Slack thread note for an auth-paused turn. */
export function buildAuthPauseResponse(): string {
return AUTH_PAUSE_RESPONSE;
}
14 changes: 9 additions & 5 deletions packages/junior/src/chat/slack/message.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import type { Message } from "chat";

function isSlackMessageTs(value: string): boolean {
return /^\d+(?:\.\d+)?$/.test(value.trim());
}

/**
* Preserve the native Slack message timestamp when a synthetic message ID is
* used for routing or deduplication.
*/
export function getSlackMessageTs(
message: Pick<Message, "id" | "raw">,
): string {
if (
message.id.endsWith(":message_changed_mention") &&
message.raw &&
typeof message.raw === "object"
) {
if (isSlackMessageTs(message.id)) {
return message.id;
}

if (message.raw && typeof message.raw === "object") {
const ts = (message.raw as Record<string, unknown>).ts;
if (typeof ts === "string" && ts.length > 0) {
return ts;
Expand Down
2 changes: 2 additions & 0 deletions packages/junior/src/handlers/mcp-oauth-callback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
getTurnUserMessage,
getTurnUserReplyAttachmentContext,
getTurnUserMessageId,
getTurnUserSlackMessageTs,
} from "@/chat/runtime/turn-user-message";
import {
buildConversationContext,
Expand Down Expand Up @@ -229,6 +230,7 @@ async function resumeAuthorizedMcpTurn(args: {
messageText: userMessage.text,
channelId: authSession.channelId,
threadTs: authSession.threadTs,
messageTs: getTurnUserSlackMessageTs(userMessage),
lockKey: authSession.conversationId,
connectedText: "",
replyContext: {
Expand Down
9 changes: 6 additions & 3 deletions packages/junior/src/handlers/oauth-callback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
} from "@/chat/plugins/auth/oauth-request";
import {
getTurnUserMessage,
getTurnUserSlackMessageTs,
getTurnUserReplyAttachmentContext,
} from "@/chat/runtime/turn-user-message";
import {
Expand Down Expand Up @@ -253,6 +254,7 @@ async function resumeCheckpointedOAuthTurn(
messageText: stored.pendingMessage ?? userMessage.text,
channelId: stored.channelId,
threadTs: stored.threadTs,
messageTs: getTurnUserSlackMessageTs(userMessage),
lockKey: stored.resumeConversationId,
initialText: "",
replyContext: {
Expand Down Expand Up @@ -350,16 +352,17 @@ async function resumePendingOAuthMessage(
const conversation = coerceThreadConversationState(
await getPersistedThreadState(threadId),
);
const latestUserMessageId = [...conversation.messages]
const latestUserMessage = [...conversation.messages]
.reverse()
.find((message) => message.role === "user")?.id;
.find((message) => message.role === "user");
const conversationContext = buildConversationContext(conversation, {
excludeMessageId: latestUserMessageId,
excludeMessageId: latestUserMessage?.id,
});
await resumeAuthorizedRequest({
messageText: stored.pendingMessage,
channelId: stored.channelId,
threadTs: stored.threadTs,
messageTs: getTurnUserSlackMessageTs(latestUserMessage),
connectedText: "",
replyContext: {
requester: { userId: stored.userId },
Expand Down
46 changes: 44 additions & 2 deletions packages/junior/tests/integration/mcp-auth-runtime-slack.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,25 @@ async function mirrorThreadStateToAdapter(thread: TestThread): Promise<void> {
.set(`thread-state:${thread.id}`, thread.getState());
}

function expectProcessingReactionLifecycles(args: {
channel: string;
count: number;
timestamp: string;
}): void {
const call = () =>
expect.objectContaining({
params: expect.objectContaining({
channel: args.channel,
timestamp: args.timestamp,
name: "eyes",
}),
});
const expected = Array.from({ length: args.count }, call);

expect(getCapturedSlackApiCalls("reactions.add")).toEqual(expected);
expect(getCapturedSlackApiCalls("reactions.remove")).toEqual(expected);
}

describe("mcp auth runtime slack integration", () => {
beforeEach(async () => {
resetAgentProbe();
Expand Down Expand Up @@ -322,6 +341,11 @@ describe("mcp auth runtime slack integration", () => {
userId: "U123",
userName: "dcramer",
},
raw: {
channel: "C123",
ts: "1700000000.002",
thread_ts: "1700000000.001",
},
}),
);

Expand All @@ -340,8 +364,17 @@ describe("mcp auth runtime slack integration", () => {
}),
}),
]);
expect(thread.posts).toHaveLength(0);
expect(thread.posts).toEqual([
expect.objectContaining({
markdown: expect.stringContaining("private link"),
}),
]);
expect(getCapturedSlackApiCalls("chat.postMessage")).toHaveLength(0);
expectProcessingReactionLifecycles({
channel: "C123",
timestamp: "1700000000.002",
count: 1,
});

const pendingAuthSession =
await mcpAuthStoreModule.getLatestMcpAuthSessionForUserProvider(
Expand Down Expand Up @@ -479,6 +512,11 @@ describe("mcp auth runtime slack integration", () => {
}),
}),
]);
expectProcessingReactionLifecycles({
channel: "C123",
timestamp: "1700000000.002",
count: 2,
});
});

it("parks a subscribed-thread MCP auth challenge with the same pending-auth state", async () => {
Expand Down Expand Up @@ -541,7 +579,11 @@ describe("mcp auth runtime slack integration", () => {

expect(agentProbe.promptCallCount).toBe(1);
expect(agentProbe.continueCallCount).toBe(0);
expect(thread.posts).toHaveLength(0);
expect(thread.posts).toEqual([
expect.objectContaining({
markdown: expect.stringContaining("private link"),
}),
]);

const pendingCheckpoint =
await turnSessionStoreModule.getAgentTurnSessionCheckpoint(
Expand Down
Loading
Loading