Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ postgres:
args:
- "-c"
- "shared_preload_libraries=pg_search,pg_cron"
- "-c"
- "max_connections=500"
auth:
database: ai_v2
username: tempo
Expand Down
102 changes: 63 additions & 39 deletions patches/@chat-adapter__slack@4.30.0.patch
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
diff --git a/dist/index.js b/dist/index.js
index 53a35e4..8fc9ebd 100644
index 53a35e40d7dc03dea57f965325728761467ad7c7..3e63db3d328688a104cf408ff97f8e984b25ba93 100644
--- a/dist/index.js
+++ b/dist/index.js
@@ -31,6 +31,126 @@ import {
@@ -31,6 +31,123 @@ import {
toModalElement,
toPlainText
} from "chat";
+var STREAM_BUFFER_SIZE = 256;
+var STREAM_SEGMENT_LIMIT = 11500;
+var STREAM_CHUNK_LIMIT = 256;
+var STREAM_TASK_LIMIT = 48;
+var STREAM_TASK_LIMIT = 50;
+var STREAM_FENCE_RESERVE = 64;
+var FENCE_PATTERN = /^(`{3,}|~{3,})/;
+function isOpenTaskStatus(status) {
+ return status === "pending" || status === "in_progress";
+}
+var Fence = class {
+ constructor() {
+ this.buffer = "";
Expand Down Expand Up @@ -129,7 +126,7 @@ index 53a35e4..8fc9ebd 100644

// src/cards.ts
import {
@@ -3434,24 +3551,102 @@ var SlackAdapter = class _SlackAdapter {
@@ -3434,26 +3551,122 @@ var SlackAdapter = class _SlackAdapter {
}
this.logger.debug("Slack: starting stream", { channel, threadTs });
const token = await this.getToken();
Expand All @@ -142,11 +139,10 @@ index 53a35e4..8fc9ebd 100644
- task_display_mode: options.taskDisplayMode
- }
+ const createSegment = () => ({
+ cards: 0,
+ hasContent: false,
+ length: 0,
+ openTasks: /* @__PURE__ */ new Set(),
+ stopped: false,
+ visibleTasks: /* @__PURE__ */ new Set(),
+ streamer: this._client.chatStream({
+ channel,
+ thread_ts: threadTs,
Expand All @@ -159,9 +155,12 @@ index 53a35e4..8fc9ebd 100644
+ })
});
+ let current = createSegment();
+ const structured = /* @__PURE__ */ new Set();
+ let lastResult;
let lastAppended = "";
+ let plan;
+ const taskParts = /* @__PURE__ */ new Map();
+ const taskSegments = /* @__PURE__ */ new Map();
+ const fence = new Fence();
const renderer = new StreamingMarkdownRenderer({
wrapTablesForAppend: false
Expand All @@ -183,25 +182,15 @@ index 53a35e4..8fc9ebd 100644
+ current.hasContent = true;
+ current.length += closing.length;
+ }
+ await stopSegment(current);
+ if (!structured.has(current)) {
+ await stopSegment(current);
+ }
+ current = createSegment();
+ if (opening) {
+ await current.streamer.append({ markdown_text: opening, token });
+ current.hasContent = true;
+ current.length += opening.length;
+ }
+ };
+ const shouldRotateSegment = () => current.visibleTasks.size >= STREAM_TASK_LIMIT && current.openTasks.size === 0;
+ const rememberTaskStatus = (chunk) => {
+ if (chunk.type !== "task_update") {
+ return;
+ }
+ current.visibleTasks.add(chunk.id);
+ if (isOpenTaskStatus(chunk.status)) {
+ current.openTasks.add(chunk.id);
+ } else {
+ current.openTasks.delete(chunk.id);
+ }
+ };
const flushMarkdownDelta = async (delta) => {
if (delta.length === 0) {
Expand Down Expand Up @@ -240,33 +229,63 @@ index 53a35e4..8fc9ebd 100644
+ }
};
let structuredChunksSupported = true;
+ const appendStructuredChunk = async (segment, chunk) => {
+ await segment.streamer.append({ chunks: [chunk], token });
+ segment.hasContent = true;
+ };
+ const createStructuredSegment = async () => {
+ current = createSegment();
+ structured.add(current);
+ if (plan) {
+ await appendStructuredChunk(current, plan);
+ }
+ return current;
+ };
+ const getTaskSegment = async (id) => {
+ const existing = taskSegments.get(id);
+ if (existing) {
+ return existing;
+ }
+ if (current.cards >= STREAM_TASK_LIMIT) {
+ await createStructuredSegment();
+ } else {
+ structured.add(current);
+ }
+ current.cards += 1;
+ taskSegments.set(id, current);
+ return current;
+ };
const sendStructuredChunk = async (chunk) => {
@@ -3463,7 +3645,23 @@ var SlackAdapter = class _SlackAdapter {
if (!structuredChunksSupported) {
return;
@@ -3463,7 +3676,25 @@ var SlackAdapter = class _SlackAdapter {
await flushMarkdownDelta(delta);
lastAppended = committable;
try {
- await streamer.append({ chunks: [chunk], token });
+ const chunks = chunk.type === "task_update" ? splitTask(chunk, taskParts.get(chunk.id) ?? 0) : [
+ {
+ if (chunk.type === "plan_update") {
+ plan = {
+ ...chunk,
+ title: truncateText(chunk.title, STREAM_CHUNK_LIMIT)
+ };
+ if (structured.size === 0) {
+ structured.add(current);
+ }
+ ];
+ if (chunk.type === "task_update") {
+ taskParts.set(chunk.id, chunks.length);
+ for (const segment of structured) {
+ await appendStructuredChunk(segment, plan);
+ }
+ return;
+ }
+ const chunks = splitTask(chunk, taskParts.get(chunk.id) ?? 0);
+ taskParts.set(chunk.id, chunks.length);
+ for (const normalized of chunks) {
+ if (shouldRotateSegment()) {
+ await rotateSegment(fence.closing, fence.opening);
+ }
+ await current.streamer.append({ chunks: [normalized], token });
+ current.hasContent = true;
+ rememberTaskStatus(normalized);
+ const segment = await getTaskSegment(normalized.id);
+ await appendStructuredChunk(segment, normalized);
+ }
} catch (error) {
structuredChunksSupported = false;
this.logger.warn(
@@ -3479,31 +3677,67 @@ var SlackAdapter = class _SlackAdapter {
@@ -3479,31 +3710,72 @@ var SlackAdapter = class _SlackAdapter {
await flushMarkdownDelta(delta);
lastAppended = committable;
};
Expand All @@ -286,7 +305,7 @@ index 53a35e4..8fc9ebd 100644
+ } else {
+ await sendStructuredChunk(chunk);
+ }
+ }
}
+ renderer.finish();
+ const finalCommittable = renderer.getCommittableText();
+ const finalDelta = finalCommittable.slice(lastAppended.length);
Expand All @@ -300,16 +319,21 @@ index 53a35e4..8fc9ebd 100644
+ current.hasContent = true;
+ current.length += fence.closing.length;
+ }
+ for (const segment of structured) {
+ if (segment !== current) {
+ await stopSegment(segment);
+ }
+ }
+ lastResult = await stopSegment(
+ current,
+ options?.stopBlocks,
+ true
+ );
+ } catch (error) {
+ const segments = /* @__PURE__ */ new Set([current]);
+ const segments = /* @__PURE__ */ new Set([...structured, current]);
+ fence.finish();
+ for (const segment of segments) {
+ if (!segment?.hasContent) {
+ if (!segment.hasContent) {
+ continue;
+ }
+ try {
Expand All @@ -326,7 +350,7 @@ index 53a35e4..8fc9ebd 100644
+ error: stopError
+ });
+ }
}
+ }
+ throw error;
}
- renderer.finish();
Expand Down
6 changes: 3 additions & 3 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion services/api-rs/crates/centaur-session-sqlx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use uuid::Uuid;
static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("./migrations");

pub const SESSION_EVENTS_CHANNEL: &str = "centaur_session_events";
const DEFAULT_MAX_CONNECTIONS: u32 = 500;

#[derive(Clone, Debug)]
pub struct CreateExecutionResult {
Expand Down Expand Up @@ -47,7 +48,7 @@ impl PgSessionStore {

pub async fn connect(database_url: &str) -> Result<Self, SessionStoreError> {
let pool = PgPoolOptions::new()
.max_connections(50)
.max_connections(DEFAULT_MAX_CONNECTIONS)
.connect(database_url)
.await?;
Ok(Self::new(pool))
Expand Down
2 changes: 1 addition & 1 deletion services/slackbotv2/test/chat-sdk-emulate.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ describe('slackbotv2', () => {
.filter(chunk => chunk.type === 'task_update')
.map(chunk => stringField(chunk.id))
).size
).toBe(48)
).toBe(50)
expect(await threadText(parent.ts)).toContain('TASK_STREAM_CONTINUATION_OK')
})

Expand Down
Loading