Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion cli/src/api/apiSession.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it } from 'vitest'
import { isExternalUserMessage } from './apiSession'
import { isExternalUserMessage, IncomingMessageFilter } from './apiSession'

describe('isExternalUserMessage', () => {
const baseUserMsg = {
Expand Down Expand Up @@ -96,3 +96,70 @@ describe('isExternalUserMessage', () => {
).toBe(false)
})
})

// Behavioural tests for IncomingMessageFilter: id-first dedup backed by a
// bounded, recency-refreshed id set, plus the seq-cursor fallback that applies
// only to messages without an id.
describe('IncomingMessageFilter (HAPI Bot R3 finding #1)', () => {
it('accepts a mature scheduled message whose seq is below the latest cursor', () => {
// schedule seq=10, immediate seq=11 acks first → cursor=11.
// seq=10 matures: seq-only dedup would drop it; id-based dedup must accept.
const filter = new IncomingMessageFilter()
expect(filter.accept({ id: 'msg-imm', seq: 11 })).toBe(true)
expect(filter.accept({ id: 'msg-sched', seq: 10 })).toBe(true)
})

it('rejects an exact id duplicate (re-emit on the next mature tick)', () => {
const filter = new IncomingMessageFilter()
expect(filter.accept({ id: 'msg-1', seq: 1 })).toBe(true)
expect(filter.accept({ id: 'msg-1', seq: 1 })).toBe(false)
})

it('falls back to seq-only dedup for messages without an id', () => {
const filter = new IncomingMessageFilter()
expect(filter.accept({ seq: 5 })).toBe(true)
// seq <= cursor and no id → drop (legacy behaviour preserved).
expect(filter.accept({ seq: 4 })).toBe(false)
expect(filter.accept({ seq: 5 })).toBe(false)
})

it('advances cursorSeq monotonically regardless of arrival order', () => {
const filter = new IncomingMessageFilter()
// The late seq=10 arrival is accepted (new id) but must not pull the cursor
// back below the already-observed seq=11.
filter.accept({ id: 'a', seq: 11 })
filter.accept({ id: 'b', seq: 10 })
expect(filter.cursorSeq()).toBe(11)
})

it('bounds the seen-id set to the configured capacity (LRU eviction)', () => {
// Capacity 3: inserting the fourth id ('d') pushes the set over capacity
// and evicts the least recently seen id ('a').
const filter = new IncomingMessageFilter(3)
filter.accept({ id: 'a', seq: 1 })
filter.accept({ id: 'b', seq: 2 })
filter.accept({ id: 'c', seq: 3 })
filter.accept({ id: 'd', seq: 4 })
// 'a' should have been evicted — re-presenting it is treated as new.
expect(filter.accept({ id: 'a', seq: 5 })).toBe(true)
// 'd' is still in the set.
expect(filter.accept({ id: 'd', seq: 6 })).toBe(false)
})

it('refreshes recency on dedup hit so re-emits survive bursts of unrelated ids', () => {
// Models the documented contract: the hub re-emits the same id every 5 s
// until the CLI acks. If the dedup were FIFO (insert-order only), a
// burst of capacity-many unrelated ids between re-emits would evict the
// pending id and the next re-emit would double-deliver.
const filter = new IncomingMessageFilter(3)
// Pre-fill so 'pending' is not at the head.
filter.accept({ id: 'a', seq: 1 })
filter.accept({ id: 'pending', seq: 2 })
filter.accept({ id: 'b', seq: 3 })
// Re-emit pending → recency refresh moves it to the tail.
expect(filter.accept({ id: 'pending', seq: 4 })).toBe(false)
// Burst that evicts oldest entries. Without the refresh 'pending' would
// be at insert position 2 and would be evicted; with the refresh it is
// now the newest entry and survives.
filter.accept({ id: 'c', seq: 5 })
filter.accept({ id: 'd', seq: 6 })
// 'a' (oldest) and then 'b' should have been evicted; 'pending' must
// still dedup.
expect(filter.accept({ id: 'pending', seq: 7 })).toBe(false)
expect(filter.accept({ id: 'a', seq: 8 })).toBe(true)
expect(filter.accept({ id: 'b', seq: 9 })).toBe(true)
})
})
74 changes: 64 additions & 10 deletions cli/src/api/apiSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,64 @@ export function isExternalUserMessage(body: RawJSONLines): body is Extract<RawJS
return true
}

/**
 * Dedup filter for messages arriving on the realtime socket and via reconnect
 * backfill. Messages are keyed by id (remembered in a bounded LRU set); the
 * legacy seq cursor is used as the dedup criterion only for messages that
 * lack an id.
 *
 * Why id-first: scheduled messages keep the seq assigned at insertion time, so
 * a row scheduled for T+1h (seq=10) can be released after a later immediate
 * message (seq=11) has already advanced the cursor. A pure `seq <= cursor`
 * filter would silently drop the mature emit. See HAPI Bot R3 finding #1.
 *
 * The seq cursor is still advanced by every accepted message (with or without
 * an id) so that `cursorSeq()` can serve as the backfill starting point.
 */
export class IncomingMessageFilter {
  /** Ids already delivered, ordered from least to most recently seen. */
  private readonly seenIds = new Set<string>()
  /** Maximum number of ids remembered before the least recent one is evicted. */
  private readonly capacity: number
  /** Highest finite seq among accepted messages, or null if none carried one. */
  private lastSeenSeq: number | null = null

  /**
   * @param capacity Upper bound on remembered ids (default 256). The value is
   *   used as-is in a `size > capacity` comparison and is not validated.
   */
  constructor(capacity = 256) {
    this.capacity = capacity
  }

  /**
   * @returns The highest seq observed on an accepted message, or null when no
   *   accepted message has carried a finite seq yet.
   */
  cursorSeq(): number | null {
    return this.lastSeenSeq
  }

  /**
   * Decides whether a message is new.
   *
   * @param message Only `id` and `seq` are inspected. An empty or non-string
   *   id counts as "no id"; a non-number or non-finite seq counts as "no seq".
   * @returns true if this message should be processed; false to drop it as a
   *   duplicate.
   */
  accept(message: { id?: string | null; seq?: number | null }): boolean {
    const id = typeof message.id === 'string' && message.id.length > 0 ? message.id : null
    if (id !== null && this.seenIds.has(id)) {
      // Refresh recency: the hub re-emits the same id every 5 s until the
      // CLI acks (releaseMatureScheduledMessages contract). Without a
      // delete+re-add the entry stays at its first-insert position and can
      // be evicted by a burst of unrelated ids before the ack lands —
      // the next re-emit would then be treated as new and double-deliver.
      this.seenIds.delete(id)
      this.seenIds.add(id)
      return false
    }

    // NaN/±Infinity pass a bare `typeof === 'number'` check but would poison
    // the cursor (Math.max(x, NaN) is NaN; Infinity drops every later id-less
    // message), so they are treated the same as a missing seq.
    const seq =
      typeof message.seq === 'number' && Number.isFinite(message.seq) ? message.seq : null

    // Legacy fallback: without an id the only dedup signal is the seq cursor.
    if (id === null && seq !== null && this.lastSeenSeq !== null && seq <= this.lastSeenSeq) {
      return false
    }

    if (id !== null) {
      this.seenIds.add(id)
      if (this.seenIds.size > this.capacity) {
        // Set iteration is insertion-ordered; combined with the delete+re-add
        // on a dedup hit above, the first entry is the least recently seen.
        const oldest = this.seenIds.values().next().value
        if (oldest !== undefined) this.seenIds.delete(oldest)
      }
    }

    if (seq !== null) {
      // Seed from the first observed seq rather than 0 so the cursor never
      // reports a value that was not actually seen; afterwards only move up.
      this.lastSeenSeq = this.lastSeenSeq === null ? seq : Math.max(this.lastSeenSeq, seq)
    }
    return true
  }
}

export class ApiSessionClient extends EventEmitter {
private readonly token: string
readonly sessionId: string
Expand All @@ -82,7 +140,7 @@ export class ApiSessionClient extends EventEmitter {
private pendingMessages: { message: UserMessage; localId?: string }[] = []
private pendingMessageCallback: ((message: UserMessage, localId?: string) => void) | null = null
private cancelQueuedMessageCallback: ((localId: string) => boolean) | null = null
private lastSeenMessageSeq: number | null = null
private readonly incomingFilter = new IncomingMessageFilter()
private backfillInFlight: Promise<void> | null = null
private needsBackfill = false
private hasConnectedOnce = false
Expand Down Expand Up @@ -274,13 +332,9 @@ export class ApiSessionClient extends EventEmitter {
}
}

private handleIncomingMessage(message: { seq?: number; localId?: string | null; content: unknown }): void {
const seq = typeof message.seq === 'number' ? message.seq : null
if (seq !== null) {
if (this.lastSeenMessageSeq !== null && seq <= this.lastSeenMessageSeq) {
return
}
this.lastSeenMessageSeq = seq
private handleIncomingMessage(message: { id?: string; seq?: number; localId?: string | null; content: unknown }): void {
if (!this.incomingFilter.accept({ id: message.id, seq: message.seq })) {
return
}

const userResult = UserMessageSchema.safeParse(message.content)
Expand Down Expand Up @@ -311,7 +365,7 @@ export class ApiSessionClient extends EventEmitter {
return
}

const startSeq = this.lastSeenMessageSeq
const startSeq = this.incomingFilter.cursorSeq()
if (startSeq === null) {
logger.debug('[API] Skipping backfill because no last-seen message sequence is available')
return
Expand Down Expand Up @@ -353,7 +407,7 @@ export class ApiSessionClient extends EventEmitter {
this.handleIncomingMessage(message)
}

const observedSeq = this.lastSeenMessageSeq ?? maxSeq
const observedSeq = this.incomingFilter.cursorSeq() ?? maxSeq
const nextCursor = Math.max(maxSeq, observedSeq)
if (nextCursor <= cursor) {
logger.debug('[API] Backfill stopped due to non-advancing cursor', {
Expand Down
3 changes: 2 additions & 1 deletion hub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ async function main() {
onSessionEnd: (payload) => syncEngine?.handleSessionEnd(payload),
onMachineAlive: (payload) => syncEngine?.handleMachineAlive(payload),
onBackgroundTaskDelta: (sessionId, delta) => syncEngine?.handleBackgroundTaskDelta(sessionId, delta),
onSessionActivity: (sessionId, updatedAt) => syncEngine?.recordSessionActivity(sessionId, updatedAt)
onSessionActivity: (sessionId, updatedAt) => syncEngine?.recordSessionActivity(sessionId, updatedAt),
onSweepImmediateQueued: (sessionId, now) => syncEngine?.sweepImmediateQueuedOnSessionEnd(sessionId, now)
})

syncEngine = new SyncEngine(store, socketServer.io, socketServer.rpcRegistry, sseManager)
Expand Down
6 changes: 4 additions & 2 deletions hub/src/socket/handlers/cli/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ export type CliHandlersDeps = {
onWebappEvent?: (event: SyncEvent) => void
onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void
onSessionActivity?: (sessionId: string, updatedAt: number) => void
onSweepImmediateQueued?: (sessionId: string, now: number) => void
}

export function registerCliHandlers(socket: CliSocketWithData, deps: CliHandlersDeps): void {
const { io, store, rpcRegistry, terminalRegistry, onSessionAlive, onSessionEnd, onMachineAlive, onWebappEvent, onBackgroundTaskDelta, onSessionActivity } = deps
const { io, store, rpcRegistry, terminalRegistry, onSessionAlive, onSessionEnd, onMachineAlive, onWebappEvent, onBackgroundTaskDelta, onSessionActivity, onSweepImmediateQueued } = deps
const terminalNamespace = io.of('/terminal')
const namespace = typeof socket.data.namespace === 'string' ? socket.data.namespace : null

Expand Down Expand Up @@ -107,7 +108,8 @@ export function registerCliHandlers(socket: CliSocketWithData, deps: CliHandlers
onSessionEnd,
onWebappEvent,
onBackgroundTaskDelta,
onSessionActivity
onSessionActivity,
onSweepImmediateQueued
})
registerMachineHandlers(socket, {
store,
Expand Down
37 changes: 17 additions & 20 deletions hub/src/socket/handlers/cli/sessionHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ export type SessionHandlersDeps = {
onWebappEvent?: (event: SyncEvent) => void
onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void
onSessionActivity?: (sessionId: string, updatedAt: number) => void
/** Delegates session-end immediate-queue sweep to the MessageService layer. */
onSweepImmediateQueued?: (sessionId: string, now: number) => void
}

export function registerSessionHandlers(socket: CliSocketWithData, deps: SessionHandlersDeps): void {
const { store, resolveSessionAccess, emitAccessError, onSessionAlive, onSessionEnd, onWebappEvent, onBackgroundTaskDelta, onSessionActivity } = deps
const { store, resolveSessionAccess, emitAccessError, onSessionAlive, onSessionEnd, onWebappEvent, onBackgroundTaskDelta, onSessionActivity, onSweepImmediateQueued } = deps

socket.on('message', (data: unknown) => {
const parsed = messageSchema.safeParse(data)
Expand Down Expand Up @@ -299,27 +301,22 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session
return
}

// Force-invoke any user messages that are still queued at session end.
// Without this, the floating bar pins the queued rows after the CLI is
// gone — there is no longer an ack path (no CLI to emit
// messages-consumed) so they would stay queued forever.
// Force-invoke only immediate-queued messages (scheduled_at IS NULL) at
// session end. *All* scheduled rows — mature or future — are deliberately
// preserved in DB so the mature-scan path (releaseMatureScheduledMessages)
// remains the sole emit channel and the CLI ack remains the sole writer of
// invoked_at. See HAPI Bot R4: stamping a mature scheduled row here would
// make the next mature-scan tick skip it (filter on invoked_at IS NULL) and
// silently drop the user's prompt.
//
// Without this sweep for immediate rows, the floating bar would pin queued
// rows after the CLI exits — there is no longer an ack path, so they would
// stay queued forever. The 5-second tick in syncEngine.expireInactive
// emits scheduled rows when they mature, regardless of session end.
try {
const queued = store.messages.getUninvokedLocalMessages(data.sid)
const localIds = queued
.map((m) => m.localId)
.filter((id): id is string => typeof id === 'string')
if (localIds.length > 0) {
const invokedAt = Date.now()
store.messages.markMessagesInvoked(data.sid, localIds, invokedAt)
onWebappEvent?.({
type: 'messages-consumed',
sessionId: data.sid,
localIds,
invokedAt
})
}
onSweepImmediateQueued?.(data.sid, Date.now())
} catch (err) {
console.error('session-end markMessagesInvoked failed', err)
console.error('session-end sweep failed', err)
}

onSessionEnd?.(data)
Expand Down
4 changes: 3 additions & 1 deletion hub/src/socket/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export type SocketServerDeps = {
onMachineAlive?: (payload: { machineId: string; time: number }) => void
onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void
onSessionActivity?: (sessionId: string, updatedAt: number) => void
onSweepImmediateQueued?: (sessionId: string, now: number) => void
}

export function createSocketServer(deps: SocketServerDeps): {
Expand Down Expand Up @@ -117,7 +118,8 @@ export function createSocketServer(deps: SocketServerDeps): {
onMachineAlive: deps.onMachineAlive,
onWebappEvent: deps.onWebappEvent,
onBackgroundTaskDelta: deps.onBackgroundTaskDelta,
onSessionActivity: deps.onSessionActivity
onSessionActivity: deps.onSessionActivity,
onSweepImmediateQueued: deps.onSweepImmediateQueued
}))

terminalNs.use(async (socket, next) => {
Expand Down
25 changes: 24 additions & 1 deletion hub/src/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export { PushStore } from './pushStore'
export { SessionStore } from './sessionStore'
export { UserStore } from './userStore'

const SCHEMA_VERSION: number = 8
const SCHEMA_VERSION: number = 9
const REQUIRED_TABLES = [
'sessions',
'machines',
Expand Down Expand Up @@ -98,6 +98,7 @@ export class Store {
5: () => this.migrateFromV5ToV6(),
6: () => this.migrateFromV6ToV7(),
7: () => this.migrateFromV7ToV8(),
8: () => this.migrateFromV8ToV9(),
})

if (currentVersion === 0) {
Expand Down Expand Up @@ -193,12 +194,16 @@ export class Store {
seq INTEGER NOT NULL,
local_id TEXT,
invoked_at INTEGER,
scheduled_at INTEGER,
FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, seq);
CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_local_id ON messages(session_id, local_id) WHERE local_id IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_messages_session_position
ON messages(session_id, COALESCE(invoked_at, created_at) DESC, seq DESC);
CREATE INDEX IF NOT EXISTS idx_messages_scheduled_pending
ON messages(scheduled_at)
WHERE scheduled_at IS NOT NULL AND invoked_at IS NULL;

CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
Expand Down Expand Up @@ -378,6 +383,24 @@ export class Store {
`)
}

/**
 * Schema migration v8 → v9: adds the nullable `scheduled_at` column to
 * `messages` (when absent) and creates the partial index over pending
 * scheduled rows. Both steps are guarded, so re-running is harmless.
 */
private migrateFromV8ToV9(): void {
  const messageColumns = this.getMessageColumnNames()

  // An empty column set means there is no messages table yet; createSchema
  // will build the up-to-date one, so there is nothing to migrate.
  const tableMissing = messageColumns.size === 0
  if (tableMissing) return

  const needsScheduledAt = !messageColumns.has('scheduled_at')
  if (needsScheduledAt) {
    this.db.exec('ALTER TABLE messages ADD COLUMN scheduled_at INTEGER')
  }

  // Partial index restricted to rows that are scheduled and not yet invoked,
  // for efficient mature scheduled message lookup. IF NOT EXISTS keeps the
  // statement idempotent.
  const createPendingIndexSql = `
CREATE INDEX IF NOT EXISTS idx_messages_scheduled_pending
ON messages(scheduled_at)
WHERE scheduled_at IS NOT NULL AND invoked_at IS NULL
`
  this.db.exec(createPendingIndexSql)
}

private getSessionColumnNames(): Set<string> {
const rows = this.db.prepare('PRAGMA table_info(sessions)').all() as Array<{ name: string }>
return new Set(rows.map((row) => row.name))
Expand Down
18 changes: 13 additions & 5 deletions hub/src/store/messageStore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Database } from 'bun:sqlite'

import type { StoredMessage } from './types'
import { addMessage, cancelQueuedMessage, deleteQueuedMessageById, lookupQueuedMessage, getMessages, getMessagesAfter, getMessagesByPosition, getUninvokedLocalMessages, markMessagesInvoked, mergeSessionMessages, type CancelQueuedMessageResult, type LookupQueuedMessageResult } from './messages'
import { addMessage, cancelQueuedMessage, deleteQueuedMessageById, lookupQueuedMessage, getMessages, getDeliverableMessagesAfter, getMessagesByPosition, getUninvokedLocalMessages, getMatureScheduledMessages, getImmediateQueuedLocalMessages, markMessagesInvoked, mergeSessionMessages, type CancelQueuedMessageResult, type LookupQueuedMessageResult } from './messages'

export class MessageStore {
private readonly db: Database
Expand All @@ -10,16 +10,16 @@ export class MessageStore {
this.db = db
}

addMessage(sessionId: string, content: unknown, localId?: string): StoredMessage {
return addMessage(this.db, sessionId, content, localId)
addMessage(sessionId: string, content: unknown, localId?: string, scheduledAt?: number | null): StoredMessage {
return addMessage(this.db, sessionId, content, localId, scheduledAt)
}

/**
 * Forwards to the module-level `getMessages` helper from './messages' using
 * this store's database handle.
 *
 * @param sessionId Session whose messages are requested.
 * @param limit Passed through unchanged; defaults to 200.
 * @param beforeSeq Optional value passed through unchanged (presumably an
 *   upper seq bound for paging — confirm in './messages').
 * @returns Whatever the helper returns for these arguments.
 */
getMessages(sessionId: string, limit: number = 200, beforeSeq?: number): StoredMessage[] {
  const page = getMessages(this.db, sessionId, limit, beforeSeq)
  return page
}

getMessagesAfter(sessionId: string, afterSeq: number, limit: number = 200): StoredMessage[] {
return getMessagesAfter(this.db, sessionId, afterSeq, limit)
getDeliverableMessagesAfter(sessionId: string, afterSeq: number, now: number, limit: number = 200): StoredMessage[] {
return getDeliverableMessagesAfter(this.db, sessionId, afterSeq, now, limit)
}

getMessagesByPosition(sessionId: string, limit: number, before?: { at: number; seq: number }): StoredMessage[] {
Expand All @@ -30,6 +30,14 @@ export class MessageStore {
return getUninvokedLocalMessages(this.db, sessionId)
}

/**
 * Forwards to the module-level `getMatureScheduledMessages` helper from
 * './messages' using this store's database handle. Not scoped to a session.
 *
 * @param beforeTime Passed through unchanged (presumably the timestamp cutoff
 *   that decides which scheduled rows count as mature — confirm in
 *   './messages').
 * @returns Whatever the helper returns for this cutoff.
 */
getMatureScheduledMessages(beforeTime: number): StoredMessage[] {
  const matured = getMatureScheduledMessages(this.db, beforeTime)
  return matured
}

/**
 * Forwards to the module-level `getImmediateQueuedLocalMessages` helper from
 * './messages' using this store's database handle.
 *
 * @param sessionId Session whose rows are requested.
 * @returns Whatever the helper returns (presumably queued, not-yet-invoked
 *   local messages with no scheduled time — confirm in './messages').
 */
getImmediateQueuedLocalMessages(sessionId: string): StoredMessage[] {
  const queued = getImmediateQueuedLocalMessages(this.db, sessionId)
  return queued
}

/**
 * Forwards to the module-level `cancelQueuedMessage` helper from './messages'
 * using this store's database handle.
 *
 * @param sessionId Session the message belongs to.
 * @param messageId Id of the message to cancel.
 * @returns The helper's `CancelQueuedMessageResult`, unchanged.
 */
cancelQueuedMessage(sessionId: string, messageId: string): CancelQueuedMessageResult {
  const outcome = cancelQueuedMessage(this.db, sessionId, messageId)
  return outcome
}
Expand Down
Loading
Loading