Skip to content

Commit d75fcbe

Browse files
dcramer authored and codex committed
fix(slack): Harden streamed continuation edges
Keep streamed Slack replies stable when code fences hit the continuation boundary. Reserve enough room for the streamed continuation suffix up front and avoid skipping normalization when the non-streamed path falls back to raw reply text. Add regression coverage for the boundary fence case and whitespace fallback.

Co-Authored-By: Codex GPT-5.4 <noreply@openai.com>
1 parent a2d92e8 commit d75fcbe

3 files changed

Lines changed: 98 additions & 5 deletions

File tree

packages/junior/src/chat/runtime/reply-executor.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import {
1414
} from "@/chat/logging";
1515
import {
1616
buildSlackOutputMessage,
17-
getSlackContinuationBudget,
1817
getSlackInterruptionMarker,
18+
getSlackStreamingContinuationBudget,
1919
splitSlackReplyText,
2020
takeSlackContinuationPrefix,
2121
} from "@/chat/slack/output";
@@ -291,7 +291,7 @@ export function createReplyToThread(deps: ReplyExecutorDeps) {
291291
let overflowText = "";
292292
let streamOverflowed = false;
293293
let beforeFirstResponsePostCalled = false;
294-
const continuationBudget = getSlackContinuationBudget();
294+
const continuationBudget = getSlackStreamingContinuationBudget();
295295
const normalizeStreamDelta = createSlackStreamDeltaNormalizer();
296296
const beforeFirstResponsePost = async (): Promise<void> => {
297297
if (beforeFirstResponsePostCalled) {
@@ -556,8 +556,10 @@ export function createReplyToThread(deps: ReplyExecutorDeps) {
556556
// completed after the visible reply has been accepted by Slack.
557557
if (shouldPostThreadReply) {
558558
if (!streamedReplyPromise) {
559-
const postChunks =
560-
replyTextChunks.length > 0 ? replyTextChunks : [reply.text];
559+
const hasNormalizedPostChunks = replyTextChunks.length > 0;
560+
const postChunks = hasNormalizedPostChunks
561+
? replyTextChunks
562+
: [reply.text];
561563
let sent: SentMessage | undefined;
562564
for (const [index, chunk] of postChunks.entries()) {
563565
sent = await postThreadReply(
@@ -566,7 +568,7 @@ export function createReplyToThread(deps: ReplyExecutorDeps) {
566568
index === 0 && resolvedAttachFiles === "inline"
567569
? replyFiles
568570
: undefined,
569-
{ normalized: true },
571+
{ normalized: hasNormalizedPostChunks },
570572
),
571573
index === 0 ? "thread_reply" : "thread_reply_continuation",
572574
);

packages/junior/src/chat/slack/output.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const MAX_INLINE_CHARS = 2200;
55
const MAX_INLINE_LINES = 45;
66
const CONTINUED_MARKER = "\n\n[Continued below]";
77
const INTERRUPTED_MARKER = "\n\n[Response interrupted before completion]";
8+
const STREAMING_FENCE_CLOSE_GUARD = "\n```";
89

910
/** Insert blank lines between content blocks so Slack renders them with visual separation. */
1011
export function ensureBlockSpacing(text: string): string {
@@ -372,6 +373,19 @@ export function getSlackContinuationBudget(): {
372373
return reserveInlineBudgetForSuffix(CONTINUED_MARKER);
373374
}
374375

376+
/**
377+
* Reserve enough inline budget for streamed continuations, including the
378+
* close fence we may need to append once overflow is detected mid-code block.
379+
*/
380+
export function getSlackStreamingContinuationBudget(): {
381+
maxChars: number;
382+
maxLines: number;
383+
} {
384+
return reserveInlineBudgetForSuffix(
385+
`${STREAMING_FENCE_CLOSE_GUARD}${CONTINUED_MARKER}`,
386+
);
387+
}
388+
375389
/** Normalize text for Slack and wrap it as a PostableMessage with optional file attachments. */
376390
export function buildSlackOutputMessage(
377391
text: string,

packages/junior/tests/integration/slack/streaming-reply-behavior.test.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { describe, expect, it } from "vitest";
33
import {
44
getSlackContinuationMarker,
55
getSlackInterruptionMarker,
6+
slackOutputPolicy,
67
} from "@/chat/slack/output";
78
import { createTestChatRuntime } from "../../fixtures/chat-runtime";
89
import {
@@ -238,6 +239,35 @@ describe("Slack behavior: streaming replies", () => {
238239
);
239240
});
240241

242+
it("normalizes raw non-streamed fallback replies before posting", async () => {
243+
const { slackRuntime } = createTestChatRuntime({
244+
services: {
245+
replyExecutor: {
246+
generateAssistantReply: async () => ({
247+
text: "\r\n\t \r\n",
248+
diagnostics: makeDiagnostics(),
249+
}),
250+
},
251+
},
252+
});
253+
254+
const thread = createTestThread({ id: "slack:C_STREAM:1700006002.750" });
255+
await slackRuntime.handleNewMention(
256+
thread,
257+
createTestMessage({
258+
id: "m-stream-whitespace-fallback",
259+
text: "<@U_APP> whitespace",
260+
isMention: true,
261+
threadId: thread.id,
262+
}),
263+
);
264+
265+
expect(thread.postKinds).toEqual(["value"]);
266+
expect(toPostedText(thread.posts[0])).toBe(
267+
"I couldn't produce a response.",
268+
);
269+
});
270+
241271
it("keeps trailing ack-like text once streaming has started", async () => {
242272
const { slackRuntime } = createTestChatRuntime({
243273
services: {
@@ -398,6 +428,53 @@ describe("Slack behavior: streaming replies", () => {
398428
expect(toPostedText(thread.posts[1])).toMatch(/^```ts\nconst value/);
399429
});
400430

431+
it("does not garble streamed fence continuations near the budget boundary", async () => {
432+
const firstDelta =
433+
"```\n" +
434+
"a".repeat(
435+
slackOutputPolicy.maxInlineChars -
436+
getSlackContinuationMarker().length -
437+
1 -
438+
"```\n".length,
439+
);
440+
const tail = "bcdef\n```";
441+
const fullReply = `${firstDelta}${tail}`;
442+
const { slackRuntime } = createTestChatRuntime({
443+
services: {
444+
replyExecutor: {
445+
generateAssistantReply: async (_prompt, context) => {
446+
await context?.onTextDelta?.(firstDelta);
447+
await context?.onTextDelta?.(tail);
448+
return {
449+
text: fullReply,
450+
diagnostics: makeDiagnostics(),
451+
};
452+
},
453+
},
454+
},
455+
});
456+
457+
const thread = createTestThread({ id: "slack:C_STREAM:1700006005.750" });
458+
await slackRuntime.handleNewMention(
459+
thread,
460+
createTestMessage({
461+
id: "m-stream-6-code-boundary",
462+
text: "<@U_APP> show edge code",
463+
isMention: true,
464+
threadId: thread.id,
465+
}),
466+
);
467+
468+
expect(thread.postKinds).toEqual(["stream", "value"]);
469+
const firstPost = String(thread.posts[0]);
470+
const secondPost = toPostedText(thread.posts[1]);
471+
const continuationSuffix = `\n\`\`\`${getSlackContinuationMarker()}`;
472+
473+
expect(firstPost.endsWith(continuationSuffix)).toBe(true);
474+
expect(secondPost.startsWith("```\n")).toBe(true);
475+
expect(secondPost).toContain("bcdef\n```");
476+
});
477+
401478
it("posts an interruption notice when a streamed reply ends in provider error", async () => {
402479
const { slackRuntime } = createTestChatRuntime({
403480
services: {

0 commit comments

Comments (0)