Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions services/api/api/routers/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ async def _iter_events():
class ReleaseRequest(BaseModel):
release_id: str | None = None
cancel_inflight: bool = False
clear_resume_state: bool = False


@router.post("/threads/{thread_key}/release", dependencies=[Depends(require_scope("agent:execute"))])
Expand All @@ -706,6 +707,7 @@ async def release_thread(request: Request, thread_key: str, body: ReleaseRequest
thread_key=thread_key,
release_id=release_id,
cancel_inflight=body.cancel_inflight,
clear_resume_state=body.clear_resume_state,
)
except ControlPlaneError as exc:
return _json_error(exc.code, exc.message, exc.status_code)
Expand Down
12 changes: 12 additions & 0 deletions services/api/api/runtime_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -1822,11 +1822,13 @@ async def release_assignment(
cancel_inflight: bool,
stop_runtime: bool = True,
stop_runtime_background: bool = False,
clear_resume_state: bool = False,
) -> dict[str, Any]:
payload = {
"thread_key": thread_key,
"release_id": release_id,
"cancel_inflight": cancel_inflight,
"clear_resume_state": clear_resume_state,
}
req_hash = request_hash(payload)

Expand Down Expand Up @@ -1878,6 +1880,15 @@ async def release_assignment(
"WHERE thread_key = $1 AND status IN ('queued', 'running', 'cancel_requested', 'retry_wait')",
thread_key,
)
if clear_resume_state:
await conn.execute(
"UPDATE sandbox_sessions SET "
"agent_thread_id = NULL, last_delivered_id = NULL, "
"inflight_turn_id = NULL, inflight_turn_input = NULL, inflight_attempts = 0, "
"last_result = NULL, last_result_at = NULL, updated_at = NOW() "
"WHERE thread_key = $1",
thread_key,
)
response = {
"ok": True,
"thread_key": thread_key,
Expand Down Expand Up @@ -1928,6 +1939,7 @@ async def _stop_released_runtime() -> None:
assignment_generation=response.get("assignment_generation"),
runtime_id=response.get("runtime_id"),
cancel_inflight=cancel_inflight,
clear_resume_state=clear_resume_state,
)
return response

Expand Down
28 changes: 28 additions & 0 deletions services/slackbot/src/centaur/handoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ export type CentaurHandoffResult =
| { ok: true; status: number; body: unknown }
| { ok: false; status: number; body: unknown }

export type CentaurCancelThreadInput = {
thread_key: string
message_id: string
}

export class CentaurHandoff {
readonly config: AppConfig

Expand Down Expand Up @@ -78,6 +83,29 @@ export class CentaurHandoff {
}
)
}

async cancelThread(input: CentaurCancelThreadInput): Promise<CentaurHandoffResult> {
const url = new URL(
`/agent/threads/${encodeURIComponent(input.thread_key)}/release`,
this.config.CENTAUR_API_URL
)
const apiKey = centaurApiKey(this.config)
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {})
},
body: JSON.stringify({
release_id: `slack-stop:${input.message_id}`,
cancel_inflight: true,
clear_resume_state: true
})
})

const body = await readResponseBody(response)
return { ok: response.ok, status: response.status, body }
}
}

async function readResponseBody(response: Response): Promise<unknown> {
Expand Down
23 changes: 21 additions & 2 deletions services/slackbot/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,16 @@ async function processSlackEvent(envelope: SlackEnvelope): Promise<void> {
return
}

const stopCommand = isStopCommand(normalized.parts)
spanAttributes(span, {
'centaur.slackbot.event_action': 'handoff'
'centaur.slackbot.event_action': stopCommand ? 'cancel_thread' : 'handoff'
})
const result = await handoff.emit(normalized)
const result = stopCommand
? await handoff.cancelThread({
thread_key: normalized.thread_key,
message_id: normalized.message_id
})
: await handoff.emit(normalized)
spanAttributes(span, {
'centaur.slackbot.handoff_status': result.status,
'centaur.slackbot.handoff_ok': result.ok
Expand Down Expand Up @@ -648,6 +654,19 @@ async function ackWithReaction(client: WebClient, event: NormalizedSlackEvent):
)
}

function isStopCommand(parts: Array<{ type: string; text?: string }>): boolean {
const text = parts
.filter(part => part.type === 'text')
.map(part => part.text ?? '')
.join('\n')
.replace(/<@[A-Z0-9]+>/g, ' ')
.replace(/@U[A-Z0-9]+/g, ' ')
.replace(/\s+/g, ' ')
.trim()
.toLowerCase()
return /^(?:stop|cancel)(?:\s+(?:stop|cancel))*$/.test(text)
}

function slackApiErrorResponse(c: Context, error: unknown) {
const data = (error as { data?: unknown })?.data
if (data && typeof data === 'object') return c.json(sanitizeLogValue(data), 502)
Expand Down
51 changes: 50 additions & 1 deletion services/slackbot/test/emulate/slack-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const CHANNEL_ID = 'C000000001'
type WorkflowRunRequest = {
workflow_name: string
trigger_key: string
eager_start?: boolean
input: {
thread_key: string
parts: Array<{ type: string; text?: string }>
Expand Down Expand Up @@ -133,7 +134,7 @@ describe(`Slack Emulate E2E (${IMPLEMENTATION})`, () => {

const run = onlyRun()
expect(run.workflow_name).toBe('slack_thread_turn')
expect('eager_start' in run).toBe(false)
expect(run.eager_start).toBe(true)
expect(run.trigger_key).toBe(`slack:${TEAM_ID}:${CHANNEL_ID}:${parent.ts}`)
expect(run.input.thread_key).toBe(`slack:${TEAM_ID}:${CHANNEL_ID}:${parent.ts}`)
expect(run.input.message_id).toBe(`slack:${TEAM_ID}:${CHANNEL_ID}:${parent.ts}`)
Expand Down Expand Up @@ -186,6 +187,43 @@ describe(`Slack Emulate E2E (${IMPLEMENTATION})`, () => {
)
})

it('cancels the active thread instead of handing stop to the agent', async () => {
const parent = await postUserMessage(`<@${BOT_USER_ID}> write a long answer`)
const stop = await postUserMessage(`<@${BOT_USER_ID}> stop`, parent.ts)
const waits: Promise<unknown>[] = []

const response = await app.request(
'/api/webhooks/slack',
signedSlackEvent({
event_id: 'Ev-emulate-stop',
event: {
type: 'app_mention',
user: USER_ID,
channel: CHANNEL_ID,
ts: stop.ts,
thread_ts: parent.ts,
text: `<@${BOT_USER_ID}> stop`
}
}),
{},
waitUntilContext(waits)
)

expect(response.status).toBe(200)
await Promise.all(waits)
expect(centaur.workflowRuns).toHaveLength(0)
expect(centaur.releases).toEqual([
{
threadKey: `slack:${TEAM_ID}:${CHANNEL_ID}:${parent.ts}`,
body: {
release_id: `slack-stop:slack:${TEAM_ID}:${CHANNEL_ID}:${stop.ts}`,
cancel_inflight: true,
clear_resume_state: true
}
}
])
})

it('dispatches an Alertmanager-style bot-authored mention into a Slack workflow', async () => {
const waits: Promise<unknown>[] = []
const response = await app.request(
Expand Down Expand Up @@ -520,6 +558,7 @@ function waitUntilContext(waits: Promise<unknown>[]): any {

async function createFakeCentaur() {
const workflowRuns: WorkflowRunRequest[] = []
const releases: Array<{ threadKey: string; body: any }> = []
const deliveries: FakeDelivery[] = []
const delivered: string[] = []
const failed: string[] = []
Expand All @@ -532,6 +571,14 @@ async function createFakeCentaur() {
workflowRuns.push((await request.json()) as WorkflowRunRequest)
return Response.json({ ok: true, run_id: `wfr-${workflowRuns.length}` })
}
const releaseMatch = /^\/agent\/threads\/([^/]+)\/release$/.exec(url.pathname)
if (releaseMatch) {
releases.push({
threadKey: decodeURIComponent(releaseMatch[1] ?? ''),
body: await request.json()
})
return Response.json({ ok: true, released: true })
}
if (url.pathname === '/agent/final-deliveries/claim') {
return Response.json({ deliveries: deliveries.splice(0) })
}
Expand All @@ -551,11 +598,13 @@ async function createFakeCentaur() {
return {
url: `http://localhost:${server.port}`,
workflowRuns,
releases,
deliveries,
delivered,
failed,
reset() {
workflowRuns.length = 0
releases.length = 0
deliveries.length = 0
delivered.length = 0
failed.length = 0
Expand Down
Loading