diff --git a/cli/src/api/apiSession.test.ts b/cli/src/api/apiSession.test.ts index 06a6bbc99..6078e0eef 100644 --- a/cli/src/api/apiSession.test.ts +++ b/cli/src/api/apiSession.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from 'vitest' -import { isExternalUserMessage } from './apiSession' +import { isExternalUserMessage, IncomingMessageFilter } from './apiSession' describe('isExternalUserMessage', () => { const baseUserMsg = { @@ -96,3 +96,70 @@ describe('isExternalUserMessage', () => { ).toBe(false) }) }) + +describe('IncomingMessageFilter (HAPI Bot R3 finding #1)', () => { + it('accepts a mature scheduled message whose seq is below the latest cursor', () => { + // schedule seq=10, immediate seq=11 acks first → cursor=11. + // seq=10 matures: seq-only dedup would drop it; id-based dedup must accept. + const filter = new IncomingMessageFilter() + expect(filter.accept({ id: 'msg-imm', seq: 11 })).toBe(true) + expect(filter.accept({ id: 'msg-sched', seq: 10 })).toBe(true) + }) + + it('rejects an exact id duplicate (re-emit on the next mature tick)', () => { + const filter = new IncomingMessageFilter() + expect(filter.accept({ id: 'msg-1', seq: 1 })).toBe(true) + expect(filter.accept({ id: 'msg-1', seq: 1 })).toBe(false) + }) + + it('falls back to seq-only dedup for messages without an id', () => { + const filter = new IncomingMessageFilter() + expect(filter.accept({ seq: 5 })).toBe(true) + // seq <= cursor and no id → drop (legacy behaviour preserved). + expect(filter.accept({ seq: 4 })).toBe(false) + expect(filter.accept({ seq: 5 })).toBe(false) + }) + + it('advances cursorSeq monotonically regardless of arrival order', () => { + const filter = new IncomingMessageFilter() + filter.accept({ id: 'a', seq: 11 }) + filter.accept({ id: 'b', seq: 10 }) + expect(filter.cursorSeq()).toBe(11) + }) + + it('bounds the seen-id set to the configured capacity (LRU eviction)', () => { + const filter = new IncomingMessageFilter(3) + filter.accept({ id: 'a', seq: 1 }) + filter.accept({ id: 'b', seq: 2 }) + filter.accept({ id: 'c', seq: 3 }) + filter.accept({ id: 'd', seq: 4 }) + // 'a' should have been evicted — re-presenting it is treated as new. + expect(filter.accept({ id: 'a', seq: 5 })).toBe(true) + // 'd' is still in the set. + expect(filter.accept({ id: 'd', seq: 6 })).toBe(false) + }) + + it('refreshes recency on dedup hit so re-emits survive bursts of unrelated ids', () => { + // Models the documented contract: the hub re-emits the same id every 5 s + // until the CLI acks. If the dedup were FIFO (insert-order only), a + // burst of capacity-many unrelated ids between re-emits would evict the + // pending id and the next re-emit would double-deliver. + const filter = new IncomingMessageFilter(3) + // Pre-fill so 'pending' is not at the head. + filter.accept({ id: 'a', seq: 1 }) + filter.accept({ id: 'pending', seq: 2 }) + filter.accept({ id: 'b', seq: 3 }) + // Re-emit pending → recency refresh moves it to the tail. + expect(filter.accept({ id: 'pending', seq: 4 })).toBe(false) + // Burst that evicts oldest entries. Without the refresh 'pending' would + // be at insert position 2 and would be evicted; with the refresh it is + // now the newest entry and survives. + filter.accept({ id: 'c', seq: 5 }) + filter.accept({ id: 'd', seq: 6 }) + // 'a' (oldest) and then 'b' should have been evicted; 'pending' must + // still dedup. + expect(filter.accept({ id: 'pending', seq: 7 })).toBe(false) + expect(filter.accept({ id: 'a', seq: 8 })).toBe(true) + expect(filter.accept({ id: 'b', seq: 9 })).toBe(true) + }) +}) diff --git a/cli/src/api/apiSession.ts b/cli/src/api/apiSession.ts index c19253de9..398f13c0d 100644 --- a/cli/src/api/apiSession.ts +++ b/cli/src/api/apiSession.ts @@ -71,6 +71,64 @@ export function isExternalUserMessage(body: RawJSONLines): body is Extract() + private readonly capacity: number + private lastSeenSeq: number | null = null + + constructor(capacity = 256) { + this.capacity = capacity + } + + cursorSeq(): number | null { + return this.lastSeenSeq + } + + /** Returns true if this message should be processed; false to drop as a duplicate. */ + accept(message: { id?: string | null; seq?: number | null }): boolean { + const id = typeof message.id === 'string' && message.id.length > 0 ? message.id : null + if (id && this.seenIds.has(id)) { + // Refresh recency: the hub re-emits the same id every 5 s until the + // CLI acks (releaseMatureScheduledMessages contract). Without a + // delete+re-add the entry stays at its first-insert position and can + // be evicted by a burst of unrelated ids before the ack lands — + // the next re-emit would then be treated as new and double-deliver. + this.seenIds.delete(id) + this.seenIds.add(id) + return false + } + + const seq = typeof message.seq === 'number' ? message.seq : null + if (!id && seq !== null && this.lastSeenSeq !== null && seq <= this.lastSeenSeq) { + return false + } + + if (id) { + this.seenIds.add(id) + if (this.seenIds.size > this.capacity) { + // Set iteration is insertion-ordered; with delete+re-add on dedup hit + // (above) this becomes a true LRU eviction. + const oldest = this.seenIds.values().next().value + if (oldest !== undefined) this.seenIds.delete(oldest) + } + } + if (seq !== null) { + this.lastSeenSeq = Math.max(this.lastSeenSeq ?? 0, seq) + } + return true + } +} + export class ApiSessionClient extends EventEmitter { private readonly token: string readonly sessionId: string @@ -82,7 +140,7 @@ export class ApiSessionClient extends EventEmitter { private pendingMessages: { message: UserMessage; localId?: string }[] = [] private pendingMessageCallback: ((message: UserMessage, localId?: string) => void) | null = null private cancelQueuedMessageCallback: ((localId: string) => boolean) | null = null - private lastSeenMessageSeq: number | null = null + private readonly incomingFilter = new IncomingMessageFilter() private backfillInFlight: Promise | null = null private needsBackfill = false private hasConnectedOnce = false @@ -274,13 +332,9 @@ export class ApiSessionClient extends EventEmitter { } } - private handleIncomingMessage(message: { seq?: number; localId?: string | null; content: unknown }): void { - const seq = typeof message.seq === 'number' ? message.seq : null - if (seq !== null) { - if (this.lastSeenMessageSeq !== null && seq <= this.lastSeenMessageSeq) { - return - } - this.lastSeenMessageSeq = seq + private handleIncomingMessage(message: { id?: string; seq?: number; localId?: string | null; content: unknown }): void { + if (!this.incomingFilter.accept({ id: message.id, seq: message.seq })) { + return } const userResult = UserMessageSchema.safeParse(message.content) @@ -311,7 +365,7 @@ export class ApiSessionClient extends EventEmitter { return } - const startSeq = this.lastSeenMessageSeq + const startSeq = this.incomingFilter.cursorSeq() if (startSeq === null) { logger.debug('[API] Skipping backfill because no last-seen message sequence is available') return @@ -353,7 +407,7 @@ export class ApiSessionClient extends EventEmitter { this.handleIncomingMessage(message) } - const observedSeq = this.lastSeenMessageSeq ?? maxSeq + const observedSeq = this.incomingFilter.cursorSeq() ?? maxSeq const nextCursor = Math.max(maxSeq, observedSeq) if (nextCursor <= cursor) { logger.debug('[API] Backfill stopped due to non-advancing cursor', { diff --git a/hub/src/index.ts b/hub/src/index.ts index ce6a62bec..f39e9239e 100644 --- a/hub/src/index.ts +++ b/hub/src/index.ts @@ -190,7 +190,8 @@ async function main() { onSessionEnd: (payload) => syncEngine?.handleSessionEnd(payload), onMachineAlive: (payload) => syncEngine?.handleMachineAlive(payload), onBackgroundTaskDelta: (sessionId, delta) => syncEngine?.handleBackgroundTaskDelta(sessionId, delta), - onSessionActivity: (sessionId, updatedAt) => syncEngine?.recordSessionActivity(sessionId, updatedAt) + onSessionActivity: (sessionId, updatedAt) => syncEngine?.recordSessionActivity(sessionId, updatedAt), + onSweepImmediateQueued: (sessionId, now) => syncEngine?.sweepImmediateQueuedOnSessionEnd(sessionId, now) }) syncEngine = new SyncEngine(store, socketServer.io, socketServer.rpcRegistry, sseManager) diff --git a/hub/src/socket/handlers/cli/index.ts b/hub/src/socket/handlers/cli/index.ts index a531437ff..6ef231a1b 100644 --- a/hub/src/socket/handlers/cli/index.ts +++ b/hub/src/socket/handlers/cli/index.ts @@ -43,10 +43,11 @@ export type CliHandlersDeps = { onWebappEvent?: (event: SyncEvent) => void onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void onSessionActivity?: (sessionId: string, updatedAt: number) => void + onSweepImmediateQueued?: (sessionId: string, now: number) => void } export function registerCliHandlers(socket: CliSocketWithData, deps: CliHandlersDeps): void { - const { io, store, rpcRegistry, terminalRegistry, onSessionAlive, onSessionEnd, onMachineAlive, onWebappEvent, onBackgroundTaskDelta, onSessionActivity } = deps + const { io, store, rpcRegistry, terminalRegistry, onSessionAlive, onSessionEnd, onMachineAlive, onWebappEvent, onBackgroundTaskDelta, onSessionActivity, onSweepImmediateQueued } = deps const terminalNamespace = io.of('/terminal') const namespace = typeof socket.data.namespace === 'string' ? socket.data.namespace : null @@ -107,7 +108,8 @@ export function registerCliHandlers(socket: CliSocketWithData, deps: CliHandlers onSessionEnd, onWebappEvent, onBackgroundTaskDelta, - onSessionActivity + onSessionActivity, + onSweepImmediateQueued }) registerMachineHandlers(socket, { store, diff --git a/hub/src/socket/handlers/cli/sessionHandlers.ts b/hub/src/socket/handlers/cli/sessionHandlers.ts index 43ea72079..e7a6c75e9 100644 --- a/hub/src/socket/handlers/cli/sessionHandlers.ts +++ b/hub/src/socket/handlers/cli/sessionHandlers.ts @@ -64,10 +64,12 @@ export type SessionHandlersDeps = { onWebappEvent?: (event: SyncEvent) => void onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void onSessionActivity?: (sessionId: string, updatedAt: number) => void + /** Delegates session-end immediate-queue sweep to the MessageService layer. */ + onSweepImmediateQueued?: (sessionId: string, now: number) => void } export function registerSessionHandlers(socket: CliSocketWithData, deps: SessionHandlersDeps): void { - const { store, resolveSessionAccess, emitAccessError, onSessionAlive, onSessionEnd, onWebappEvent, onBackgroundTaskDelta, onSessionActivity } = deps + const { store, resolveSessionAccess, emitAccessError, onSessionAlive, onSessionEnd, onWebappEvent, onBackgroundTaskDelta, onSessionActivity, onSweepImmediateQueued } = deps socket.on('message', (data: unknown) => { const parsed = messageSchema.safeParse(data) @@ -299,27 +301,22 @@ export function registerSessionHandlers(socket: CliSocketWithData, deps: Session return } - // Force-invoke any user messages that are still queued at session end. - // Without this, the floating bar pins the queued rows after the CLI is - // gone — there is no longer an ack path (no CLI to emit - // messages-consumed) so they would stay queued forever. + // Force-invoke only immediate-queued messages (scheduled_at IS NULL) at + // session end. *All* scheduled rows — mature or future — are deliberately + // preserved in DB so the mature-scan path (releaseMatureScheduledMessages) + // remains the sole emit channel and the CLI ack remains the sole writer of + // invoked_at. See HAPI Bot R4: stamping a mature scheduled row here would + // make the next mature-scan tick skip it (filter on invoked_at IS NULL) and + // silently drop the user's prompt. + // + // Without this sweep for immediate rows, the floating bar would pin queued + // rows after the CLI exits — there is no longer an ack path, so they would + // stay queued forever. The 5-second tick in syncEngine.expireInactive + // emits scheduled rows when they mature, regardless of session end. try { - const queued = store.messages.getUninvokedLocalMessages(data.sid) - const localIds = queued - .map((m) => m.localId) - .filter((id): id is string => typeof id === 'string') - if (localIds.length > 0) { - const invokedAt = Date.now() - store.messages.markMessagesInvoked(data.sid, localIds, invokedAt) - onWebappEvent?.({ - type: 'messages-consumed', - sessionId: data.sid, - localIds, - invokedAt - }) - } + onSweepImmediateQueued?.(data.sid, Date.now()) } catch (err) { - console.error('session-end markMessagesInvoked failed', err) + console.error('session-end sweep failed', err) } onSessionEnd?.(data) diff --git a/hub/src/socket/server.ts b/hub/src/socket/server.ts index b03b31ec4..e07a829e0 100644 --- a/hub/src/socket/server.ts +++ b/hub/src/socket/server.ts @@ -41,6 +41,7 @@ export type SocketServerDeps = { onMachineAlive?: (payload: { machineId: string; time: number }) => void onBackgroundTaskDelta?: (sessionId: string, delta: { started: number; completed: number }) => void onSessionActivity?: (sessionId: string, updatedAt: number) => void + onSweepImmediateQueued?: (sessionId: string, now: number) => void } export function createSocketServer(deps: SocketServerDeps): { @@ -117,7 +118,8 @@ export function createSocketServer(deps: SocketServerDeps): { onMachineAlive: deps.onMachineAlive, onWebappEvent: deps.onWebappEvent, onBackgroundTaskDelta: deps.onBackgroundTaskDelta, - onSessionActivity: deps.onSessionActivity + onSessionActivity: deps.onSessionActivity, + onSweepImmediateQueued: deps.onSweepImmediateQueued })) terminalNs.use(async (socket, next) => { diff --git a/hub/src/store/index.ts b/hub/src/store/index.ts index ee97856c8..a63e16ac1 100644 --- a/hub/src/store/index.ts +++ b/hub/src/store/index.ts @@ -23,7 +23,7 @@ export { PushStore } from './pushStore' export { SessionStore } from './sessionStore' export { UserStore } from './userStore' -const SCHEMA_VERSION: number = 8 +const SCHEMA_VERSION: number = 9 const REQUIRED_TABLES = [ 'sessions', 'machines', @@ -98,6 +98,7 @@ export class Store { 5: () => this.migrateFromV5ToV6(), 6: () => this.migrateFromV6ToV7(), 7: () => this.migrateFromV7ToV8(), + 8: () => this.migrateFromV8ToV9(), }) if (currentVersion === 0) { @@ -193,12 +194,16 @@ export class Store { seq INTEGER NOT NULL, local_id TEXT, invoked_at INTEGER, + scheduled_at INTEGER, FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, seq); CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_local_id ON messages(session_id, local_id) WHERE local_id IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_messages_session_position ON messages(session_id, COALESCE(invoked_at, created_at) DESC, seq DESC); + CREATE INDEX IF NOT EXISTS idx_messages_scheduled_pending + ON messages(scheduled_at) + WHERE scheduled_at IS NOT NULL AND invoked_at IS NULL; CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -378,6 +383,24 @@ export class Store { `) } + private migrateFromV8ToV9(): void { + const columns = this.getMessageColumnNames() + if (columns.size === 0) { + // No messages table yet — createSchema will build the up-to-date one. + return + } + if (!columns.has('scheduled_at')) { + this.db.exec('ALTER TABLE messages ADD COLUMN scheduled_at INTEGER') + } + // Partial index for efficient mature scheduled message lookup. + // Idempotent via IF NOT EXISTS. + this.db.exec(` + CREATE INDEX IF NOT EXISTS idx_messages_scheduled_pending + ON messages(scheduled_at) + WHERE scheduled_at IS NOT NULL AND invoked_at IS NULL + `) + } + private getSessionColumnNames(): Set { const rows = this.db.prepare('PRAGMA table_info(sessions)').all() as Array<{ name: string }> return new Set(rows.map((row) => row.name)) diff --git a/hub/src/store/messageStore.ts b/hub/src/store/messageStore.ts index b7173caa1..34fb60a69 100644 --- a/hub/src/store/messageStore.ts +++ b/hub/src/store/messageStore.ts @@ -1,7 +1,7 @@ import type { Database } from 'bun:sqlite' import type { StoredMessage } from './types' -import { addMessage, cancelQueuedMessage, deleteQueuedMessageById, lookupQueuedMessage, getMessages, getMessagesAfter, getMessagesByPosition, getUninvokedLocalMessages, markMessagesInvoked, mergeSessionMessages, type CancelQueuedMessageResult, type LookupQueuedMessageResult } from './messages' +import { addMessage, cancelQueuedMessage, deleteQueuedMessageById, lookupQueuedMessage, getMessages, getDeliverableMessagesAfter, getMessagesByPosition, getUninvokedLocalMessages, getMatureScheduledMessages, getImmediateQueuedLocalMessages, markMessagesInvoked, mergeSessionMessages, type CancelQueuedMessageResult, type LookupQueuedMessageResult } from './messages' export class MessageStore { private readonly db: Database @@ -10,16 +10,16 @@ export class MessageStore { this.db = db } - addMessage(sessionId: string, content: unknown, localId?: string): StoredMessage { - return addMessage(this.db, sessionId, content, localId) + addMessage(sessionId: string, content: unknown, localId?: string, scheduledAt?: number | null): StoredMessage { + return addMessage(this.db, sessionId, content, localId, scheduledAt) } getMessages(sessionId: string, limit: number = 200, beforeSeq?: number): StoredMessage[] { return getMessages(this.db, sessionId, limit, beforeSeq) } - getMessagesAfter(sessionId: string, afterSeq: number, limit: number = 200): StoredMessage[] { - return getMessagesAfter(this.db, sessionId, afterSeq, limit) + getDeliverableMessagesAfter(sessionId: string, afterSeq: number, now: number, limit: number = 200): StoredMessage[] { + return getDeliverableMessagesAfter(this.db, sessionId, afterSeq, now, limit) } getMessagesByPosition(sessionId: string, limit: number, before?: { at: number; seq: number }): StoredMessage[] { @@ -30,6 +30,14 @@ export class MessageStore { return getUninvokedLocalMessages(this.db, sessionId) } + getMatureScheduledMessages(beforeTime: number): StoredMessage[] { + return getMatureScheduledMessages(this.db, beforeTime) + } + + getImmediateQueuedLocalMessages(sessionId: string): StoredMessage[] { + return getImmediateQueuedLocalMessages(this.db, sessionId) + } + cancelQueuedMessage(sessionId: string, messageId: string): CancelQueuedMessageResult { return cancelQueuedMessage(this.db, sessionId, messageId) } diff --git a/hub/src/store/messages.test.ts b/hub/src/store/messages.test.ts index c7c53c370..f6ad990cd 100644 --- a/hub/src/store/messages.test.ts +++ b/hub/src/store/messages.test.ts @@ -172,3 +172,118 @@ describe('cancelQueuedMessage', () => { expect(messages.some(m => m.id === msg.id)).toBe(true) }) }) + +describe('addMessage: scheduledAt invariants', () => { + it('rejects scheduledAt without a localId — would silently invoke immediately', () => { + const store = makeStore() + const session = makeSession(store, 'sched-invariant') + const future = Date.now() + 60_000 + + expect(() => + store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'orphan scheduled' } }, + undefined, + future + ) + ).toThrow(/scheduledAt requires a localId/) + }) + + it('accepts scheduledAt when paired with a localId and keeps invoked_at NULL', () => { + const store = makeStore() + const session = makeSession(store, 'sched-ok') + const future = Date.now() + 60_000 + + const msg = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'queued for later' } }, + 'lid-sched', + future + ) + + expect(msg.scheduledAt).toBe(future) + expect(msg.invokedAt).toBeNull() + }) +}) + +describe('getDeliverableMessagesAfter: CLI backfill excludes future-scheduled rows', () => { + it('omits rows whose scheduled_at > now (would otherwise be replayed early on reconnect)', () => { + const store = makeStore() + const session = makeSession(store, 'backfill-future-sched') + const now = Date.now() + const future = now + 60_000 + const past = now - 60_000 + + const immediate = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'immediate' } }, + 'lid-immediate' + ) + store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'future-scheduled' } }, + 'lid-future', + future + ) + const matureSched = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'mature-scheduled' } }, + 'lid-mature', + past + ) + + const delivered = store.messages.getDeliverableMessagesAfter(session.id, 0, now) + const ids = delivered.map((m) => m.id) + expect(ids).toContain(immediate.id) + expect(ids).toContain(matureSched.id) + expect(ids).not.toContain('lid-future') + const localIds = delivered.map((m) => m.localId) + expect(localIds).not.toContain('lid-future') + }) + + it('returns the row once now advances past scheduled_at (release boundary)', () => { + const store = makeStore() + const session = makeSession(store, 'backfill-release-boundary') + const fireAt = Date.now() - 60_000 + + store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'boundary' } }, + 'lid-bnd', + fireAt + ) + + const before = store.messages.getDeliverableMessagesAfter(session.id, 0, fireAt - 1) + expect(before.find((m) => m.localId === 'lid-bnd')).toBeUndefined() + + const exact = store.messages.getDeliverableMessagesAfter(session.id, 0, fireAt) + expect(exact.find((m) => m.localId === 'lid-bnd')).toBeDefined() + }) + + it('respects afterSeq alongside the scheduled_at filter (2-axis interaction)', () => { + // Verifies the seq cursor and the scheduled-at filter compose correctly: + // a row that satisfies one axis but fails the other must be excluded. + const store = makeStore() + const session = makeSession(store, 'backfill-2axis') + const now = Date.now() + + const m1 = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'first' } }, + 'lid-1' + ) + const m2 = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'second' } }, + 'lid-2' + ) + + // afterSeq = m1.seq → only m2 should be returned. + const onlyM2 = store.messages.getDeliverableMessagesAfter(session.id, m1.seq, now) + expect(onlyM2.map((m) => m.id)).toEqual([m2.id]) + + // afterSeq = m2.seq → nothing (cursor at the end). + const empty = store.messages.getDeliverableMessagesAfter(session.id, m2.seq, now) + expect(empty).toHaveLength(0) + }) +}) diff --git a/hub/src/store/messages.ts b/hub/src/store/messages.ts index aa00bcf00..e6684180f 100644 --- a/hub/src/store/messages.ts +++ b/hub/src/store/messages.ts @@ -12,6 +12,7 @@ type DbMessageRow = { seq: number local_id: string | null invoked_at: number | null + scheduled_at: number | null } function toStoredMessage(row: DbMessageRow): StoredMessage { @@ -22,7 +23,8 @@ function toStoredMessage(row: DbMessageRow): StoredMessage { createdAt: row.created_at, seq: row.seq, localId: row.local_id, - invokedAt: row.invoked_at ?? null + invokedAt: row.invoked_at ?? null, + scheduledAt: row.scheduled_at ?? null } } @@ -30,10 +32,20 @@ export function addMessage( db: Database, sessionId: string, content: unknown, - localId?: string + localId?: string, + scheduledAt?: number | null ): StoredMessage { const now = Date.now() + // Without a localId, invoked_at is stamped immediately below — there is no + // ack path to flip it later. A scheduled message in that state would be + // skipped by the future-emit branch and never picked up by + // getMatureScheduledMessages (which filters on invoked_at IS NULL), so + // the schedule would be silently lost. + if (scheduledAt != null && !localId) { + throw new Error('addMessage: scheduledAt requires a localId for the ack flow') + } + if (localId) { const existing = db.prepare( 'SELECT * FROM messages WHERE session_id = ? AND local_id = ? LIMIT 1' @@ -58,9 +70,9 @@ export function addMessage( db.prepare(` INSERT INTO messages ( - id, session_id, content, created_at, seq, local_id, invoked_at + id, session_id, content, created_at, seq, local_id, invoked_at, scheduled_at ) VALUES ( - @id, @session_id, @content, @created_at, @seq, @local_id, @invoked_at + @id, @session_id, @content, @created_at, @seq, @local_id, @invoked_at, @scheduled_at ) `).run({ id, @@ -69,7 +81,8 @@ export function addMessage( created_at: now, seq: msgSeq, local_id: localId ?? null, - invoked_at: invokedAt + invoked_at: invokedAt, + scheduled_at: scheduledAt ?? null }) const row = db.prepare('SELECT * FROM messages WHERE id = ?').get(id) as DbMessageRow | undefined @@ -98,18 +111,32 @@ export function getMessages( return rows.reverse().map(toStoredMessage) } -export function getMessagesAfter( +/** CLI reconnect backfill: returns messages above the seq cursor that are + * deliverable now, i.e. excludes future-scheduled rows (scheduled_at > now). + * Without this filter, a CLI reconnect between schedule time and release time + * would replay future-scheduled rows via the normal message stream and the + * runner would consume them immediately, bypassing the mature-scan path. + * Only the CLI backfill route should use this; the Web thread API still calls + * byPosition / getMessages and needs the full set so scheduled rows surface in + * the queued floating bar. */ +export function getDeliverableMessagesAfter( db: Database, sessionId: string, afterSeq: number, + now: number, limit: number = 200 ): StoredMessage[] { const safeLimit = Number.isFinite(limit) ? Math.max(1, Math.min(200, limit)) : 200 const safeAfterSeq = Number.isFinite(afterSeq) ? afterSeq : 0 - const rows = db.prepare( - 'SELECT * FROM messages WHERE session_id = ? AND seq > ? ORDER BY seq ASC LIMIT ?' - ).all(sessionId, safeAfterSeq, safeLimit) as DbMessageRow[] + const rows = db.prepare(` + SELECT * FROM messages + WHERE session_id = ? + AND seq > ? + AND (scheduled_at IS NULL OR scheduled_at <= ?) + ORDER BY seq ASC + LIMIT ? + `).all(sessionId, safeAfterSeq, now, safeLimit) as DbMessageRow[] return rows.map(toStoredMessage) } @@ -144,9 +171,8 @@ export function getMessagesByPosition( } /** Returns user messages that have a localId but no invoked_at. - * Used to surface queued messages on refresh / secondary clients even when they - * fall outside the latest position-ordered page (their position key is the send - * time, but the floating bar still needs to render them). */ + * Includes future scheduled messages — used to surface all queued messages + * (including scheduled) for the Web floating bar on refresh / secondary clients. */ export function getUninvokedLocalMessages( db: Database, sessionId: string @@ -157,6 +183,47 @@ export function getUninvokedLocalMessages( return rows.map(toStoredMessage) } +/** Returns scheduled messages across all sessions whose scheduled_at <= beforeTime + * and have not yet been invoked. Used by the hub tick to emit mature messages to CLI. + * Ordered by scheduled_at ASC (oldest first). */ +export function getMatureScheduledMessages( + db: Database, + beforeTime: number +): StoredMessage[] { + const rows = db.prepare( + 'SELECT * FROM messages WHERE scheduled_at IS NOT NULL AND scheduled_at <= ? AND invoked_at IS NULL ORDER BY scheduled_at ASC' + ).all(beforeTime) as DbMessageRow[] + return rows.map(toStoredMessage) +} + +/** Returns immediate-queued local messages for a session — i.e. rows that have + * no scheduled_at (scheduled_at IS NULL). Used by the session-end sweep + * (sweepImmediateQueuedOnSessionEnd): these are messages the user posted to a + * CLI session that ended before the runner consumed them, so they cannot ever + * be delivered and must be force-invoked to clear the floating bar. + * + * Scheduled rows (scheduled_at IS NOT NULL) are *deliberately excluded*, mature + * or not. The mature-scan path (releaseMatureScheduledMessages) is the sole + * emit channel for scheduled rows and it does not write invoked_at — the CLI + * ack does. If the session-end sweep stamped a mature scheduled row as + * invoked, a subsequent CLI re-attach would never see the row in the + * mature-scan results (it filters on invoked_at IS NULL), and the user's + * scheduled prompt would be silently dropped. See HAPI Bot R4 finding. */ +export function getImmediateQueuedLocalMessages( + db: Database, + sessionId: string +): StoredMessage[] { + const rows = db.prepare(` + SELECT * FROM messages + WHERE session_id = ? + AND invoked_at IS NULL + AND local_id IS NOT NULL + AND scheduled_at IS NULL + ORDER BY seq ASC + `).all(sessionId) as DbMessageRow[] + return rows.map(toStoredMessage) +} + export function getMaxSeq(db: Database, sessionId: string): number { const row = db.prepare( 'SELECT COALESCE(MAX(seq), 0) AS maxSeq FROM messages WHERE session_id = ?' @@ -221,7 +288,7 @@ export function cancelQueuedMessage( export type LookupQueuedMessageResult = | { status: 'absent' } | { status: 'invoked'; message: StoredMessage } - | { status: 'queued'; localId: string | null; resolvedId: string } + | { status: 'queued'; localId: string | null; resolvedId: string; scheduledAt: number | null } /** Look up a queued message without deleting it. * @@ -251,7 +318,7 @@ export function lookupQueuedMessage( return { status: 'invoked' as const, message: toStoredMessage(row) } } - return { status: 'queued' as const, localId: row.local_id, resolvedId: row.id } + return { status: 'queued' as const, localId: row.local_id, resolvedId: row.id, scheduledAt: row.scheduled_at } } /** Delete a queued (invoked_at IS NULL) message by id or local_id. diff --git a/hub/src/store/migration-v9.test.ts b/hub/src/store/migration-v9.test.ts new file mode 100644 index 000000000..18e8db3a7 --- /dev/null +++ b/hub/src/store/migration-v9.test.ts @@ -0,0 +1,539 @@ +import { describe, expect, it } from 'bun:test' +import { Database } from 'bun:sqlite' +import { mkdtempSync, rmSync } from 'node:fs' +import { join } from 'node:path' +import { tmpdir } from 'node:os' +import { Store } from './index' + +/** + * Tests for V8→V9 schema migration: adding scheduled_at column to messages table. + * Follows the same pattern as migration-v8.test.ts. + */ +describe('Store V8→V9 migration: scheduled_at column', () => { + it('fresh DB has scheduled_at column in messages', () => { + const store = new Store(':memory:') + const cols = getMessageColumns(store) + expect(cols).toContain('scheduled_at') + }) + + it('V8 DB migrates to V9 via Store: scheduled_at added, existing rows have NULL scheduled_at', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-migration-v9-test-')) + const dbPath = join(dir, 'test.db') + try { + // Build a V8 DB on disk, insert rows, then open via Store to trigger migration + const db = new Database(dbPath, { create: true, readwrite: true, strict: true }) + db.exec('PRAGMA journal_mode = WAL') + db.exec('PRAGMA foreign_keys = ON') + createV8Schema(db) + db.exec('PRAGMA user_version = 8') + db.exec(`INSERT INTO sessions (id, namespace, created_at, updated_at, seq) + VALUES ('s1', 'default', 1000, 1000, 0)`) + db.exec(`INSERT INTO messages (id, session_id, content, created_at, seq, local_id, invoked_at) + VALUES ('m1', 's1', '"hello"', 1000, 1, 'l1', NULL)`) + db.exec(`INSERT INTO messages (id, session_id, content, created_at, seq, local_id, invoked_at) + VALUES ('m2', 's1', '"world"', 2000, 2, NULL, 2000)`) + db.close() + + // Open via Store — should auto-migrate V8→V9 + const store = new Store(dbPath) + const cols = getMessageColumns(store) + expect(cols).toContain('scheduled_at') + + // Existing rows should have scheduled_at = NULL + const msgs = store.messages.getMessages('s1') + expect(msgs).toHaveLength(2) + const m1 = msgs.find(m => m.id === 'm1')! + const m2 = msgs.find(m => m.id === 'm2')! + expect(m1.scheduledAt).toBeNull() + expect(m2.scheduledAt).toBeNull() + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) + + it('V7 DB migrates to V9 (multi-hop: V7→V8→V9)', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-migration-v7-to-v9-')) + const dbPath = join(dir, 'test.db') + try { + const db = new Database(dbPath, { create: true, readwrite: true, strict: true }) + db.exec('PRAGMA journal_mode = WAL') + db.exec('PRAGMA foreign_keys = ON') + createV7Schema(db) + db.exec('PRAGMA user_version = 7') + db.close() + + const store = new Store(dbPath) + const cols = getMessageColumns(store) + expect(cols).toContain('invoked_at') + expect(cols).toContain('scheduled_at') + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) + + it('V6 DB migrates to V9 (multi-hop)', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-migration-v6-to-v9-')) + const dbPath = join(dir, 'test.db') + try { + const db = new Database(dbPath, { create: true, readwrite: true, strict: true }) + db.exec('PRAGMA journal_mode = WAL') + db.exec('PRAGMA foreign_keys = ON') + createV6Schema(db) + db.exec('PRAGMA user_version = 6') + db.close() + + const store = new Store(dbPath) + const cols = getMessageColumns(store) + expect(cols).toContain('scheduled_at') + const sessionCols = getSessionColumns(store) + expect(sessionCols).toContain('model_reasoning_effort') + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) + + it('V9 DB reopen is idempotent: schema unchanged', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-migration-v9-idempotent-')) + const dbPath = join(dir, 'test.db') + try { + const store1 = new Store(dbPath) + const cols1 = getMessageColumns(store1) + expect(cols1).toContain('scheduled_at') + + // Re-open same DB — version is already 9, must not throw or alter schema + const store2 = new Store(dbPath) + const cols2 = getMessageColumns(store2) + expect(cols2).toEqual(cols1) + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) + + it('migrateFromV8ToV9 PRAGMA guard: scheduled_at column appears exactly once', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-migration-v9-guard-')) + const dbPath = join(dir, 'test.db') + try { + const db = new Database(dbPath, { create: true, readwrite: true, strict: true }) + db.exec('PRAGMA journal_mode = WAL') + db.exec('PRAGMA foreign_keys = ON') + createV8Schema(db) + db.exec('PRAGMA user_version = 8') + db.close() + + const store = new Store(dbPath) + const cols = getMessageColumns(store) + const count = cols.filter(c => c === 'scheduled_at').length + expect(count).toBe(1) + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) + + it('idx_messages_scheduled_pending index exists on fresh DB', () => { + const store = new Store(':memory:') + const db: Database = (store as any).db + const rows = db.prepare( + "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_messages_scheduled_pending'" + ).all() as Array<{ name: string }> + expect(rows).toHaveLength(1) + }) + + it('idx_messages_scheduled_pending index exists after V8→V9 migration', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-index-v8-v9-')) + const dbPath = join(dir, 'test.db') + try { + const db = new Database(dbPath, { create: true, readwrite: true, strict: true }) + db.exec('PRAGMA journal_mode = WAL') + db.exec('PRAGMA foreign_keys = ON') + createV8Schema(db) + db.exec('PRAGMA user_version = 8') + db.close() + + const store = new Store(dbPath) + const db2: Database = (store as any).db + const rows = db2.prepare( + "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_messages_scheduled_pending'" + ).all() as Array<{ name: string }> + expect(rows).toHaveLength(1) + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) +}) + +describe('Store V9: scheduled_at store operations', () => { + it('addMessage with scheduledAt stores the value', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const futureMs = Date.now() + 60_000 + const msg = store.messages.addMessage(session.id, 'hello', 'local-1', futureMs) + expect(msg.scheduledAt).toBe(futureMs) + }) + + it('addMessage without scheduledAt has scheduledAt = null', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const msg = store.messages.addMessage(session.id, 'hello', 'local-1') + expect(msg.scheduledAt).toBeNull() + }) + + it('getMatureScheduledMessages returns messages with scheduled_at <= now', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const now = Date.now() + const past = now - 1000 + const future = now + 60_000 + + // Mature: scheduled_at in the past + const mature = store.messages.addMessage(session.id, 'mature', 'local-mature', past) + // Future: not yet mature + store.messages.addMessage(session.id, 'future', 'local-future', future) + // No scheduledAt: not scheduled + store.messages.addMessage(session.id, 'plain', 'local-plain') + + const results = store.messages.getMatureScheduledMessages(now) + expect(results.map(m => m.id)).toContain(mature.id) + expect(results).toHaveLength(1) + }) + + it('getMatureScheduledMessages excludes already-invoked messages', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const now = Date.now() + const past = now - 1000 + + const msg = store.messages.addMessage(session.id, 'mature', 'local-m', past) + // Simulate CLI ack + store.messages.markMessagesInvoked(session.id, ['local-m'], now) + + const results = store.messages.getMatureScheduledMessages(now) + expect(results.find(m => m.id === msg.id)).toBeUndefined() + }) + + it('getMatureScheduledMessages returns in scheduled_at ASC order', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const now = Date.now() + + const msg2 = store.messages.addMessage(session.id, 'second', 'local-2', now - 500) + const msg1 = store.messages.addMessage(session.id, 'first', 'local-1', now - 1000) + + const results = store.messages.getMatureScheduledMessages(now) + expect(results.map(m => m.id)).toEqual([msg1.id, msg2.id]) + }) + + it('getImmediateQueuedLocalMessages: returns only immediate queued, excludes mature AND future scheduled (HAPI Bot R4)', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const now = Date.now() + + // Immediate queued (no scheduledAt) — included + const immediate = store.messages.addMessage(session.id, 'immediate', 'local-imm') + // Mature scheduled — must be excluded so the mature-scan path can deliver it + // with the no-stamp + re-emit-until-ack contract. + store.messages.addMessage(session.id, 'mature', 'local-mature', now - 1000) + // Future scheduled — must be excluded + store.messages.addMessage(session.id, 'future', 'local-future', now + 60_000) + + const results = store.messages.getImmediateQueuedLocalMessages(session.id) + const ids = results.map(m => m.id) + expect(ids).toEqual([immediate.id]) + }) + + it('getImmediateQueuedLocalMessages excludes already-invoked messages', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const now = Date.now() + + const msg = store.messages.addMessage(session.id, 'q', 'local-q') + store.messages.markMessagesInvoked(session.id, ['local-q'], now) + + const results = store.messages.getImmediateQueuedLocalMessages(session.id) + expect(results.find(m => m.id === msg.id)).toBeUndefined() + }) + + it('getUninvokedLocalMessages still includes future scheduled (for Web bar display)', () => { + const store = new Store(':memory:') + const session = store.sessions.getOrCreateSession('test', { path: '/tmp' }, null, 'default') + const future = Date.now() + 60_000 + + const scheduled = store.messages.addMessage(session.id, 'future', 'local-f', future) + + const results = store.messages.getUninvokedLocalMessages(session.id) + expect(results.map(m => m.id)).toContain(scheduled.id) + }) + + it('legacy DB (user_version=0 with V8-shape tables): step ladder backfills scheduled_at', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-legacy-v0-v9-')) + const dbPath = join(dir, 'test.db') + try { + const db = new Database(dbPath, { create: true, readwrite: true, strict: true }) + db.exec('PRAGMA journal_mode = WAL') + db.exec('PRAGMA foreign_keys = ON') + createV8Schema(db) + // Intentionally do NOT set user_version — leaves it at 0 (legacy) + db.exec(`INSERT INTO sessions (id, namespace, created_at, updated_at, seq) + VALUES ('s1', 'default', 1000, 1000, 0)`) + db.exec(`INSERT INTO messages (id, session_id, content, created_at, seq, local_id, invoked_at) + VALUES ('m1', 's1', '"hi"', 1500, 1, 'l1', NULL)`) + db.close() + + const store = new Store(dbPath) + const cols = getMessageColumns(store) + expect(cols).toContain('scheduled_at') + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) +}) + +function getMessageColumns(store: Store): string[] { + const db: Database = (store as any).db + const rows = db.prepare('PRAGMA table_info(messages)').all() as Array<{ name: string }> + return rows.map(r => r.name) +} + +function getSessionColumns(store: Store): string[] { + const db: Database = (store as any).db + const rows = db.prepare('PRAGMA table_info(sessions)').all() as Array<{ name: string }> + return rows.map(r => r.name) +} + +/** V8 schema: messages table with invoked_at but without scheduled_at */ +function createV8Schema(db: Database): void { + db.exec(` + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + tag TEXT, + namespace TEXT NOT NULL DEFAULT 'default', + machine_id TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + metadata TEXT, + metadata_version INTEGER DEFAULT 1, + agent_state TEXT, + agent_state_version INTEGER DEFAULT 1, + model TEXT, + model_reasoning_effort TEXT, + effort TEXT, + todos TEXT, + todos_updated_at INTEGER, + team_state TEXT, + team_state_updated_at INTEGER, + active INTEGER DEFAULT 0, + active_at INTEGER, + seq INTEGER DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_sessions_tag ON sessions(tag); + CREATE INDEX IF NOT EXISTS idx_sessions_tag_namespace ON sessions(tag, namespace); + + CREATE TABLE IF NOT EXISTS machines ( + id TEXT PRIMARY KEY, + namespace TEXT NOT NULL DEFAULT 'default', + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + metadata TEXT, + metadata_version INTEGER DEFAULT 1, + runner_state TEXT, + runner_state_version INTEGER DEFAULT 1, + active INTEGER DEFAULT 0, + active_at INTEGER, + seq INTEGER DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_machines_namespace ON machines(namespace); + + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + content TEXT NOT NULL, + created_at INTEGER NOT NULL, + seq INTEGER NOT NULL, + local_id TEXT, + invoked_at INTEGER, + FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, seq); + CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_local_id ON messages(session_id, local_id) WHERE local_id IS NOT NULL; + CREATE INDEX IF NOT EXISTS idx_messages_session_position + ON messages(session_id, COALESCE(invoked_at, created_at) DESC, seq DESC); + + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + platform TEXT NOT NULL, + platform_user_id TEXT NOT NULL, + namespace TEXT NOT NULL DEFAULT 'default', + created_at INTEGER NOT NULL, + UNIQUE(platform, platform_user_id) + ); + CREATE INDEX IF NOT EXISTS idx_users_platform ON users(platform); + CREATE INDEX IF NOT EXISTS idx_users_platform_namespace ON users(platform, namespace); + + CREATE TABLE IF NOT EXISTS push_subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + namespace TEXT NOT NULL, + endpoint TEXT NOT NULL, + p256dh TEXT NOT NULL, + auth TEXT NOT NULL, + created_at INTEGER NOT NULL, + UNIQUE(namespace, endpoint) + ); + CREATE INDEX IF NOT EXISTS idx_push_subscriptions_namespace ON push_subscriptions(namespace); + `) +} + +/** V7 schema: messages table without invoked_at (and thus without scheduled_at) */ +function createV7Schema(db: Database): void { + db.exec(` + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + tag TEXT, + namespace TEXT NOT NULL DEFAULT 'default', + machine_id TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + metadata TEXT, + metadata_version INTEGER DEFAULT 1, + agent_state TEXT, + agent_state_version INTEGER DEFAULT 1, + model TEXT, + model_reasoning_effort TEXT, + effort TEXT, + todos TEXT, + todos_updated_at INTEGER, + team_state TEXT, + team_state_updated_at INTEGER, + active INTEGER DEFAULT 0, + active_at INTEGER, + seq INTEGER DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_sessions_tag ON sessions(tag); + CREATE INDEX IF NOT EXISTS idx_sessions_tag_namespace ON sessions(tag, namespace); + + CREATE TABLE IF NOT EXISTS machines ( + id TEXT PRIMARY KEY, + namespace TEXT NOT NULL DEFAULT 'default', + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + metadata TEXT, + metadata_version INTEGER DEFAULT 1, + runner_state TEXT, + runner_state_version INTEGER DEFAULT 1, + active INTEGER DEFAULT 0, + active_at INTEGER, + seq INTEGER DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_machines_namespace ON machines(namespace); + + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + content TEXT NOT NULL, + created_at INTEGER NOT NULL, + seq INTEGER NOT NULL, + local_id TEXT, + FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, seq); + CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_local_id ON messages(session_id, local_id) WHERE local_id IS NOT NULL; + + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + platform TEXT NOT NULL, + platform_user_id TEXT NOT NULL, + namespace TEXT NOT NULL DEFAULT 'default', + created_at INTEGER NOT NULL, + UNIQUE(platform, platform_user_id) + ); + CREATE INDEX IF NOT EXISTS idx_users_platform ON users(platform); + CREATE INDEX IF NOT EXISTS idx_users_platform_namespace ON users(platform, namespace); + + CREATE TABLE IF NOT EXISTS push_subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + namespace TEXT NOT NULL, + endpoint TEXT NOT NULL, + p256dh TEXT NOT NULL, + auth TEXT NOT NULL, + created_at INTEGER NOT NULL, + UNIQUE(namespace, endpoint) + ); + CREATE INDEX IF NOT EXISTS idx_push_subscriptions_namespace ON push_subscriptions(namespace); + `) +} + +/** V6 schema: sessions without model_reasoning_effort; messages without invoked_at */ +function createV6Schema(db: Database): void { + db.exec(` + CREATE TABLE IF NOT EXISTS sessions ( + id TEXT PRIMARY KEY, + tag TEXT, + namespace TEXT NOT NULL DEFAULT 'default', + machine_id TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + metadata TEXT, + metadata_version INTEGER DEFAULT 1, + agent_state TEXT, + agent_state_version INTEGER DEFAULT 1, + model TEXT, + effort TEXT, + todos TEXT, + todos_updated_at INTEGER, + team_state TEXT, + team_state_updated_at INTEGER, + active INTEGER DEFAULT 0, + active_at INTEGER, + seq INTEGER DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_sessions_tag ON sessions(tag); + CREATE INDEX IF NOT EXISTS idx_sessions_tag_namespace ON sessions(tag, namespace); + + CREATE TABLE IF NOT EXISTS machines ( + id TEXT PRIMARY KEY, + namespace TEXT NOT NULL DEFAULT 'default', + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + metadata TEXT, + metadata_version INTEGER DEFAULT 1, + runner_state TEXT, + runner_state_version INTEGER DEFAULT 1, + active INTEGER DEFAULT 0, + active_at INTEGER, + seq INTEGER DEFAULT 0 + ); + CREATE INDEX IF NOT EXISTS idx_machines_namespace ON machines(namespace); + + CREATE TABLE IF NOT EXISTS messages ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + content TEXT NOT NULL, + created_at INTEGER NOT NULL, + seq INTEGER NOT NULL, + local_id TEXT, + FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE + ); + CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, seq); + CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_local_id ON messages(session_id, local_id) WHERE local_id IS NOT NULL; + + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + platform TEXT NOT NULL, + platform_user_id TEXT NOT NULL, + namespace TEXT NOT NULL DEFAULT 'default', + created_at INTEGER NOT NULL, + UNIQUE(platform, platform_user_id) + ); + CREATE INDEX IF NOT EXISTS idx_users_platform ON users(platform); + CREATE INDEX IF NOT EXISTS idx_users_platform_namespace ON users(platform, namespace); + + CREATE TABLE IF NOT EXISTS push_subscriptions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + namespace TEXT NOT NULL, + endpoint TEXT NOT NULL, + p256dh TEXT NOT NULL, + auth TEXT NOT NULL, + created_at INTEGER NOT NULL, + UNIQUE(namespace, endpoint) + ); + CREATE INDEX IF NOT EXISTS idx_push_subscriptions_namespace ON push_subscriptions(namespace); + `) +} diff --git a/hub/src/store/types.ts b/hub/src/store/types.ts index a81762f73..fb77d8577 100644 --- a/hub/src/store/types.ts +++ b/hub/src/store/types.ts @@ -43,6 +43,7 @@ export type StoredMessage = { seq: number localId: string | null invokedAt: number | null + scheduledAt: number | null } export type StoredUser = { diff --git a/hub/src/sync/messageService.test.ts b/hub/src/sync/messageService.test.ts index 55ee0c85f..b8928ad66 100644 --- a/hub/src/sync/messageService.test.ts +++ b/hub/src/sync/messageService.test.ts @@ -8,6 +8,9 @@ * Race-E (partial ack): broadcast ack receives err + [{ removed: true }] → DELETE + status='cancelled' */ import { describe, expect, it } from 'bun:test' +import { mkdtempSync, rmSync } from 'node:fs' +import { tmpdir } from 'node:os' +import { join } from 'node:path' import { MessageService } from './messageService' import { Store } from '../store' import type { Server } from 'socket.io' @@ -317,3 +320,659 @@ describe('MessageService.cancelQueuedMessage race scenarios', () => { }) }) }) + +// --------------------------------------------------------------------------- +// #1 cancel × scheduled mature race (expected behavior documentation) +// --------------------------------------------------------------------------- + +describe('MessageService — cancel × mature race (scheduled messages)', () => { + // The 5-second mature tick widens the cancel race window for scheduled + // messages compared to immediately-queued ones. When mature fires first, + // the CLI shifts the row; a subsequent cancel call gets 'not-found' from + // the CLI ack, which stamps invoked_at (PR #568 contract preserved). + // The web client surfaces this as "sent". This test documents that the + // behaviour is intentional — it is the expected outcome, not a bug. + it('cancel after mature-emit stamps invoked_at (race resolved as invoked — expected behavior)', async () => { + const store = makeStore() + const session = makeSession(store, 'race-sched-mature') + const publisher = makePublisher() + + const now = Date.now() + const past = now - 1000 + // Add a scheduled message that is already mature + const msg = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'sched' } }, + 'local-sched-race', + past + ) + + // Simulate: mature tick already emitted to CLI, CLI shifted the item. + // Cancel arrives and CLI returns not-found (item already shift()-ed). + const io = makeIo((callback) => { + // CLI cannot remove — it already shift()-ed (mature tick beat the cancel) + callback(null, [{ removed: false }]) + }) + + const service = new MessageService(store, io, publisher as any) + // Simulate the mature tick firing first (CLI now has the item) + // Then cancel arrives — CLI says not-found + const result = await service.cancelQueuedMessage(session.id, msg.id) + + // Expected behavior: invoked_at is stamped (PR #568 contract preserved) + // Web client will show the message as "sent" + expect(result.status).toBe('invoked') + if (result.status === 'invoked') { + expect(result.message.localId).toBe('local-sched-race') + expect(result.message.invokedAt).not.toBeNull() + } + + // messages-consumed SSE ensures web clients remove it from the queued bar + const consumed = publisher.events.find(e => e.type === 'messages-consumed') + expect(consumed).toBeDefined() + }) +}) + +// --------------------------------------------------------------------------- +// #1 cancel of future-scheduled message: must DELETE (not invoke) +// --------------------------------------------------------------------------- + +describe('MessageService.cancelQueuedMessage — future-scheduled message', () => { + // A future-scheduled message was never emitted to the CLI. + // When the user clicks X, the hub contacts the CLI (room has sockets) and + // CLI responds not-found — because the message was never there. + // The hub MUST treat this as a clean delete (status='cancelled'), NOT as + // "CLI already consumed it" (which would stamp invoked_at). + it('cancel of future-scheduled msg with CLI online returns cancelled (not invoked)', async () => { + const store = makeStore() + const session = makeSession(store, 'cancel-future-sched') + const publisher = makePublisher() + + const futureMs = Date.now() + 60_000 + const msg = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'scheduled future' } }, + 'local-future-cancel', + futureMs + ) + + // CLI responds not-found: the message was never emitted there + let ackCalled = false + const io = makeIo((callback) => { + ackCalled = true + callback(null, [{ removed: false }]) + }, 1) // 1 CLI socket online + + const service = new MessageService(store, io, publisher as any) + const result = await service.cancelQueuedMessage(session.id, msg.id) + + // Future-scheduled cancel must succeed as 'cancelled', not 'invoked' + expect(result.status).toBe('cancelled') + + // Row must be gone from DB (not just invoked_at stamped) + const rows = store.messages.getMessages(session.id) + const remaining = rows.find(r => r.id === msg.id) + expect(remaining).toBeUndefined() + + // message-cancelled SSE must be emitted + const cancelled = publisher.events.find(e => e.type === 'message-cancelled') + expect(cancelled).toBeDefined() + + // messages-consumed (invoked path) must NOT be emitted + const consumedCount = publisher.events.filter(e => e.type === 'messages-consumed').length + expect(consumedCount).toBe(0) + + // invoked_at must never have been stamped (row deleted) + // (row is gone, so we just verify the cancel result is not invoked) + expect(result.status).not.toBe('invoked') + + // Short-circuit must have bypassed the CLI ack round-trip entirely. + // ackCalled being false proves the future-scheduled path deleted the row + // without ever contacting the CLI. + expect(ackCalled).toBe(false) + }) + + it('cancel of future-scheduled msg when CLI offline also returns cancelled', async () => { + const store = makeStore() + const session = makeSession(store, 'cancel-future-sched-offline') + const publisher = makePublisher() + + const futureMs = Date.now() + 60_000 + const msg = store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'future offline' } }, + 'local-future-offline', + futureMs + ) + + let ackCalled = false + const io = makeIo(() => { ackCalled = true }, 0) // CLI offline + + const service = new MessageService(store, io, publisher as any) + const result = await service.cancelQueuedMessage(session.id, msg.id) + + expect(result.status).toBe('cancelled') + expect(ackCalled).toBe(false) + + // Row must be deleted + const rows = store.messages.getMessages(session.id) + expect(rows.find(r => r.id === msg.id)).toBeUndefined() + }) +}) + +// --------------------------------------------------------------------------- +// sendMessage with scheduledAt +// --------------------------------------------------------------------------- + +describe('MessageService.sendMessage with scheduledAt', () => { + function makeNoopIo(): Server { + let emittedUpdates: unknown[] = [] + return { + of: (_ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { emittedUpdates.push(data) }, + timeout: (_ms: number) => ({ + emit: () => {} + }) + }), + adapter: { rooms: { get: () => undefined } } + }), + _emittedUpdates: emittedUpdates + } as unknown as Server + } + + it('future scheduledAt: stores message with scheduledAt, does NOT emit to /cli', async () => { + const store = makeStore() + const session = makeSession(store, 'sched-future') + const publisher = makePublisher() + + const cliEmitted: unknown[] = [] + const io = { + of: (ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { + if (ns === '/cli') cliEmitted.push(data) + }, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + + const futureMs = Date.now() + 60_000 + + const service = new MessageService(store, io, publisher as any) + await service.sendMessage(session.id, { + text: 'hello future', + localId: 'local-sched', + scheduledAt: futureMs + }) + + // DB must have the message with scheduledAt set + const msgs = store.messages.getUninvokedLocalMessages(session.id) + expect(msgs).toHaveLength(1) + expect(msgs[0].scheduledAt).toBe(futureMs) + + // CLI must NOT receive the message yet (future scheduled) + expect(cliEmitted).toHaveLength(0) + + // Web SSE must still receive message-received so the bar renders + const received = publisher.events.find(e => e.type === 'message-received') + expect(received).toBeDefined() + }) + + it('null scheduledAt: immediate send, emits to /cli normally', async () => { + const store = makeStore() + const session = makeSession(store, 'sched-null') + const publisher = makePublisher() + + const cliEmitted: unknown[] = [] + const io = { + of: (ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { + if (ns === '/cli') cliEmitted.push(data) + }, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + + const service = new MessageService(store, io, publisher as any) + await service.sendMessage(session.id, { text: 'immediate', localId: 'local-imm' }) + + // CLI must receive the message immediately + expect(cliEmitted).toHaveLength(1) + + // scheduledAt must be null in DB + const msgs = store.messages.getMessages(session.id) + expect(msgs[0].scheduledAt).toBeNull() + }) + + it('past scheduledAt (already mature): emits to /cli immediately', async () => { + const store = makeStore() + const session = makeSession(store, 'sched-past') + const publisher = makePublisher() + + const cliEmitted: unknown[] = [] + const io = { + of: (ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { + if (ns === '/cli') cliEmitted.push(data) + }, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + + const pastMs = Date.now() - 5_000 + + const service = new MessageService(store, io, publisher as any) + await service.sendMessage(session.id, { + text: 'past scheduled', + localId: 'local-past', + scheduledAt: pastMs + }) + + // Past scheduled_at is already mature → emit to CLI immediately + expect(cliEmitted).toHaveLength(1) + }) + + // #11 TOCTOU: isFutureScheduled must use Date.now() at check time, not the + // pre-addMessage `now` capture, to avoid a double-emit race window. + it('#11 TOCTOU: scheduledAt exactly equal to Date.now() is treated as mature (not future)', async () => { + const store = makeStore() + const session = makeSession(store, 'sched-toctou') + const publisher = makePublisher() + + const cliEmitted: unknown[] = [] + const io = { + of: (ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { + if (ns === '/cli') cliEmitted.push(data) + }, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + + // Use a scheduledAt in the past to simulate TOCTOU: addMessage inserts + // a row, then the post-insert check should use a fresh Date.now() which + // is >= scheduledAt, treating it as mature and emitting to CLI. + const scheduledAt = Date.now() - 1 + const service = new MessageService(store, io, publisher as any) + await service.sendMessage(session.id, { + text: 'toctou', + localId: 'local-toctou', + scheduledAt + }) + + // scheduledAt is in the past at emit-check time → must emit to CLI + expect(cliEmitted).toHaveLength(1) + }) + + // Defence-in-depth: REST already rejects scheduledAt + attachments at the + // Zod layer, but non-REST callers (Telegram bot, MCP, internal) reach + // sendMessage directly and must hit the same invariant — otherwise the CLI + // session's upload directory could be purged before the mature emit lands, + // leaving @path attachment references pointing at deleted files. + it('rejects sendMessage when scheduledAt is set and attachments are non-empty', async () => { + const store = makeStore() + const session = makeSession(store, 'sched-with-attachments') + const publisher = makePublisher() + const service = new MessageService(store, makeNoopIo(), publisher as any) + + const futureMs = Date.now() + 60_000 + await expect( + service.sendMessage(session.id, { + text: 'hello', + localId: 'local-att', + scheduledAt: futureMs, + attachments: [{ + id: 'att-1', + filename: 'a.png', + mimeType: 'image/png', + size: 10, + path: '/tmp/a.png' + }] + }) + ).rejects.toThrow(/scheduled messages with attachments/) + + // Row must NOT have been inserted (throw is the first statement). + const msgs = store.messages.getUninvokedLocalMessages(session.id) + expect(msgs).toHaveLength(0) + }) + + it('accepts sendMessage with scheduledAt and an empty attachments array', async () => { + const store = makeStore() + const session = makeSession(store, 'sched-empty-attachments') + const publisher = makePublisher() + const service = new MessageService(store, makeNoopIo(), publisher as any) + + const futureMs = Date.now() + 60_000 + await service.sendMessage(session.id, { + text: 'hello', + localId: 'local-att-2', + scheduledAt: futureMs, + attachments: [] + }) + + const msgs = store.messages.getUninvokedLocalMessages(session.id) + expect(msgs).toHaveLength(1) + }) +}) + +// --------------------------------------------------------------------------- +// releaseMatureScheduledMessages +// --------------------------------------------------------------------------- + +describe('MessageService.releaseMatureScheduledMessages', () => { + function makeTrackingIo(): { io: Server; cliEmitted: unknown[] } { + const cliEmitted: unknown[] = [] + const io = { + of: (ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { + if (ns === '/cli') cliEmitted.push(data) + }, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + return { io, cliEmitted } + } + + it('emits mature messages to /cli', async () => { + const store = makeStore() + const session = makeSession(store, 'release-emit') + const publisher = makePublisher() + const { io, cliEmitted } = makeTrackingIo() + + const now = Date.now() + const past = now - 1000 + // Insert mature scheduled message directly via store + store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'hi' } }, 'local-r', past) + + const service = new MessageService(store, io, publisher as any) + service.releaseMatureScheduledMessages(now) + + expect(cliEmitted).toHaveLength(1) + }) + + it('does NOT call markMessagesInvoked (pitfall #2 guard): message is re-emitted on next tick', async () => { + const store = makeStore() + const session = makeSession(store, 'release-no-mark') + const publisher = makePublisher() + const { io, cliEmitted } = makeTrackingIo() + + const now = Date.now() + const past = now - 1000 + store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'hi' } }, 'local-nm', past) + + const service = new MessageService(store, io, publisher as any) + + // First tick + service.releaseMatureScheduledMessages(now) + expect(cliEmitted).toHaveLength(1) + + // Second tick (simulating hub restart without CLI ack): must re-emit + service.releaseMatureScheduledMessages(now + 5_000) + expect(cliEmitted).toHaveLength(2) + + // invoked_at must still be NULL (not marked) + const msgs = store.messages.getMessages(session.id) + const msg = msgs.find(m => m.localId === 'local-nm')! + expect(msg.invokedAt).toBeNull() + }) + + it('does NOT emit future scheduled messages', async () => { + const store = makeStore() + const session = makeSession(store, 'release-future') + const publisher = makePublisher() + const { io, cliEmitted } = makeTrackingIo() + + const now = Date.now() + const future = now + 60_000 + store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'hi' } }, 'local-f', future) + + const service = new MessageService(store, io, publisher as any) + service.releaseMatureScheduledMessages(now) + + expect(cliEmitted).toHaveLength(0) + }) + + it('does NOT emit already-invoked messages', async () => { + const store = makeStore() + const session = makeSession(store, 'release-invoked') + const publisher = makePublisher() + const { io, cliEmitted } = makeTrackingIo() + + const now = Date.now() + const past = now - 1000 + store.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'hi' } }, 'local-inv', past) + store.messages.markMessagesInvoked(session.id, ['local-inv'], now - 500) + + const service = new MessageService(store, io, publisher as any) + service.releaseMatureScheduledMessages(now) + + expect(cliEmitted).toHaveLength(0) + }) + + // #10: true cold-start restart simulation — new Store + new MessageService + // share the same SQLite file, replicating what hub restart actually does. + it('#10 hub cold-start restart: mature message is re-emitted by new Store+Service (true restart sim)', () => { + const dir = mkdtempSync(join(tmpdir(), 'hapi-restart-test-')) + const dbPath = join(dir, 'test.db') + try { + // First "run": write a mature scheduled message to disk + const store1 = new Store(dbPath) + const session = store1.sessions.getOrCreateSession('restart-test', { path: '/tmp/restart' }, null, 'default') + const now = Date.now() + const past = now - 2000 + store1.messages.addMessage(session.id, { role: 'user', content: { type: 'text', text: 'restart me' } }, 'local-restart', past) + + // Simulate hub shutdown — close is implicit when GC'd in Bun, but we move on + // Second "run": fresh Store + fresh MessageService (cold start) + const store2 = new Store(dbPath) + const cliEmitted: unknown[] = [] + const io2 = { + of: (ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { + if (ns === '/cli') cliEmitted.push(data) + }, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + const publisher2 = { emit: () => {}, events: [] } + + const service2 = new MessageService(store2, io2, publisher2 as any) + // After cold start, first tick should discover and emit the mature message + service2.releaseMatureScheduledMessages(now + 5_000) + + expect(cliEmitted).toHaveLength(1) + + // invoked_at must still be null (CLI hasn't acked yet) + const msgs = store2.messages.getMessages(session.id) + const msg = msgs.find(m => m.localId === 'local-restart')! + expect(msg.invokedAt).toBeNull() + } finally { + rmSync(dir, { recursive: true, force: true }) + } + }) +}) + +// --------------------------------------------------------------------------- +// HAPI Bot R4: session-end sweep must not stamp mature scheduled rows +// --------------------------------------------------------------------------- + +describe('MessageService.sweepImmediateQueuedOnSessionEnd — scheduled rows are preserved', () => { + function makeNoopIo(): Server { + return { + of: (_ns: string) => ({ + to: (_room: string) => ({ + emit: () => {}, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + } + + function makeTrackingIo(): { io: Server; cliEmitted: unknown[] } { + const cliEmitted: unknown[] = [] + const io = { + of: (ns: string) => ({ + to: (_room: string) => ({ + emit: (_event: string, data: unknown) => { + if (ns === '/cli') cliEmitted.push(data) + }, + timeout: (_ms: number) => ({ emit: () => {} }) + }), + adapter: { rooms: { get: () => undefined } } + }) + } as unknown as Server + return { io, cliEmitted } + } + + it('mature scheduled row at session-end stays uninvoked and is emitted by the next mature scan', () => { + // R4 race scenario A: CLI dies just after scheduled_at <= now but before + // the next 5s mature-scan tick — the sweep must NOT touch the scheduled row. + const store = makeStore() + const session = makeSession(store, 'r4-mature-sweep') + const publisher = makePublisher() + const now = Date.now() + const past = now - 1000 + + store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'mature scheduled' } }, + 'local-mature', + past + ) + + const service = new MessageService(store, makeNoopIo(), publisher as any) + + const result = service.sweepImmediateQueuedOnSessionEnd(session.id, now) + expect(result).toBeNull() + // No SSE side effect when there is nothing to sweep. + expect(publisher.events.filter(e => e.type === 'messages-consumed')).toHaveLength(0) + + // Row is still uninvoked and still mature — the next scan picks it up. + const stillQueued = store.messages.getUninvokedLocalMessages(session.id) + expect(stillQueued.find((m) => m.localId === 'local-mature')?.invokedAt).toBeNull() + + // Mature-scan tick after re-attach delivers the row. + const { io, cliEmitted } = makeTrackingIo() + const service2 = new MessageService(store, io, publisher as any) + service2.releaseMatureScheduledMessages(now) + expect(cliEmitted).toHaveLength(1) + }) + + it('mature scheduled row already emitted but not yet acked stays uninvoked across session-end and is re-emitted', () => { + // R4 race scenario B: mature scan emits at T+0, CLI receives but dies + // before sending messages-consumed. Session-end fires while invoked_at + // is still NULL. The sweep must preserve the row (scheduled_at IS NOT + // NULL filter) so the next mature-scan tick re-emits it — preserving the + // documented "re-emit until ack" contract for scheduled rows. + const store = makeStore() + const session = makeSession(store, 'r4-emit-noack-sweep') + const publisher = makePublisher() + const now = Date.now() + const past = now - 1000 + + store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'emit-noack' } }, + 'local-noack', + past + ) + + // First mature-scan emit — does NOT write invoked_at (R3 contract). + const { io: io1, cliEmitted: emitted1 } = makeTrackingIo() + const service1 = new MessageService(store, io1, publisher as any) + service1.releaseMatureScheduledMessages(now) + expect(emitted1).toHaveLength(1) + // Confirm invoked_at is still null (the runner crashed before acking). + expect( + store.messages.getUninvokedLocalMessages(session.id) + .find(m => m.localId === 'local-noack')?.invokedAt + ).toBeNull() + + // Session-end fires. Sweep must leave the row alone. + const sweepResult = service1.sweepImmediateQueuedOnSessionEnd(session.id, now) + expect(sweepResult).toBeNull() + expect(publisher.events.filter(e => e.type === 'messages-consumed')).toHaveLength(0) + expect( + store.messages.getUninvokedLocalMessages(session.id) + .find(m => m.localId === 'local-noack')?.invokedAt + ).toBeNull() + + // Re-attach: next mature-scan tick re-emits the same row. + const { io: io2, cliEmitted: emitted2 } = makeTrackingIo() + const service2 = new MessageService(store, io2, publisher as any) + service2.releaseMatureScheduledMessages(now + 5000) + expect(emitted2).toHaveLength(1) + }) + + it('immediate-queued (no scheduled_at) IS swept and stamped invoked at session-end', () => { + // Confirms the sweep still does its primary job for true immediate rows. + const store = makeStore() + const session = makeSession(store, 'r4-immediate-sweep') + const publisher = makePublisher() + const now = Date.now() + + store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'immediate' } }, + 'local-imm' + ) + + const service = new MessageService(store, makeNoopIo(), publisher as any) + const result = service.sweepImmediateQueuedOnSessionEnd(session.id, now) + expect(result).not.toBeNull() + expect(result?.localIds).toEqual(['local-imm']) + + // SSE side effect carries the swept localIds for the floating bar. + const consumed = publisher.events.find(e => e.type === 'messages-consumed') as + | { type: 'messages-consumed'; sessionId: string; localIds: string[]; invokedAt: number } + | undefined + expect(consumed).toBeDefined() + expect(consumed?.localIds).toEqual(['local-imm']) + + // Row is now stamped — bar can clear. + const stillQueued = store.messages.getUninvokedLocalMessages(session.id) + expect(stillQueued.find((m) => m.localId === 'local-imm')).toBeUndefined() + }) + + it('future scheduled (scheduled_at > now) is also preserved by the sweep', () => { + const store = makeStore() + const session = makeSession(store, 'r4-future-sweep') + const publisher = makePublisher() + const now = Date.now() + const future = now + 60_000 + + store.messages.addMessage( + session.id, + { role: 'user', content: { type: 'text', text: 'future' } }, + 'local-future', + future + ) + + const service = new MessageService(store, makeNoopIo(), publisher as any) + const result = service.sweepImmediateQueuedOnSessionEnd(session.id, now) + expect(result).toBeNull() + expect(publisher.events.filter(e => e.type === 'messages-consumed')).toHaveLength(0) + + const stillQueued = store.messages.getUninvokedLocalMessages(session.id) + expect(stillQueued.find((m) => m.localId === 'local-future')?.invokedAt).toBeNull() + }) +}) diff --git a/hub/src/sync/messageService.ts b/hub/src/sync/messageService.ts index 82a765505..e25179175 100644 --- a/hub/src/sync/messageService.ts +++ b/hub/src/sync/messageService.ts @@ -40,7 +40,8 @@ export class MessageService { localId: message.localId, content: message.content, createdAt: message.createdAt, - invokedAt: message.invokedAt + invokedAt: message.invokedAt, + scheduledAt: message.scheduledAt })) let oldestSeq: number | null = null @@ -105,7 +106,8 @@ export class MessageService { localId: message.localId, content: message.content, createdAt: message.createdAt, - invokedAt: message.invokedAt + invokedAt: message.invokedAt, + scheduledAt: message.scheduledAt })) // The cursor is the oldest row in the actual position-ordered page (pageRows[0]). @@ -135,15 +137,23 @@ export class MessageService { } } - getMessagesAfter(sessionId: string, options: { afterSeq: number; limit: number }): DecryptedMessage[] { - const stored = this.store.messages.getMessagesAfter(sessionId, options.afterSeq, options.limit) + /** CLI reconnect backfill — excludes future-scheduled rows so the runner does + * not consume them ahead of their scheduled_at. See messages.ts:getDeliverableMessagesAfter. */ + getDeliverableMessagesAfter(sessionId: string, options: { afterSeq: number; limit: number; now: number }): DecryptedMessage[] { + const stored = this.store.messages.getDeliverableMessagesAfter( + sessionId, + options.afterSeq, + options.now, + options.limit + ) return stored.map((message) => ({ id: message.id, seq: message.seq, localId: message.localId, content: message.content, createdAt: message.createdAt, - invokedAt: message.invokedAt + invokedAt: message.invokedAt, + scheduledAt: message.scheduledAt })) } @@ -169,7 +179,7 @@ export class MessageService { // Phase 2: row is still queued. Ask the CLI whether it already shifted the item // (race window between collectBatch() shift and messages-consumed ack). - const { localId, resolvedId } = lookup + const { localId, resolvedId, scheduledAt } = lookup if (!localId) { // No localId — row exists but has no cancel path; treat as cancelled. @@ -178,6 +188,29 @@ export class MessageService { return { status: 'cancelled', localId: null } } + // Phase 2b: future-scheduled messages were never emitted to the CLI, so they + // are not in the CLI's in-memory queue. Asking the CLI whether it can remove + // the item would always return 'not-found', which the normal ack path + // misinterprets as "CLI already consumed it" and stamps invoked_at. + // Short-circuit: delete the row directly without a CLI ack round-trip. + // + // Single event loop turn: the scheduledAt > now check and the + // deleteQueuedMessageById call execute atomically with no await between + // them, so the offline-CLI path's re-check pattern is unnecessary here. + // The offline path needs the re-check because it awaits the + // markInvoked between the lookup and the delete. + const now = Date.now() + if (scheduledAt !== null && scheduledAt > now) { + this.store.messages.deleteQueuedMessageById(sessionId, resolvedId) + this.publisher.emit({ + type: 'message-cancelled', + sessionId, + messageId, + localId, + }) + return { status: 'cancelled', localId } + } + // Phase 2a: if no CLI socket is currently in the session room, the CLI is // offline and there is nobody to ack with. Delete the row immediately so a // later CLI reconnect cannot pick it up via seq-backfill and re-enqueue the @@ -322,8 +355,21 @@ export class MessageService { localId?: string | null attachments?: AttachmentMetadata[] sentFrom?: 'telegram-bot' | 'webapp' + scheduledAt?: number | null } ): Promise { + // Defence-in-depth invariant for non-REST callers (Telegram bot, MCP, + // internal callers). Attachment paths live under the CLI session's + // upload directory which `cleanupUploadDir` purges on session end; a + // mature scheduled emit after the CLI exits would dereference deleted + // files via the @path attachment formatter. REST already rejects this + // combination at the Zod layer, but enforcing it here keeps the rule in + // one structural place — same pattern as `addMessage`'s scheduledAt + + // !localId throw. + if (payload.scheduledAt != null && (payload.attachments?.length ?? 0) > 0) { + throw new Error('sendMessage: scheduled messages with attachments are not supported') + } + const sentFrom = payload.sentFrom ?? 'webapp' const content = { @@ -338,27 +384,42 @@ export class MessageService { } } - const msg = this.store.messages.addMessage(sessionId, content, payload.localId ?? undefined) + const msg = this.store.messages.addMessage( + sessionId, + content, + payload.localId ?? undefined, + payload.scheduledAt ?? null + ) this.onSessionActivity?.(sessionId, msg.createdAt) - const update = { - id: msg.id, - seq: msg.seq, - createdAt: msg.createdAt, - body: { - t: 'new-message' as const, - sid: sessionId, - message: { - id: msg.id, - seq: msg.seq, - createdAt: msg.createdAt, - localId: msg.localId, - content: msg.content + // Only emit to CLI if the message is not scheduled for the future. + // Mature or non-scheduled messages go through immediately; future scheduled + // messages wait for the 5-second tick in releaseMatureScheduledMessages. + // Re-measure Date.now() after addMessage to avoid a TOCTOU window where + // the pre-insert `now` capture could misclassify a borderline scheduledAt + // as future when it has already become past by the time we check. + const isFutureScheduled = msg.scheduledAt !== null && msg.scheduledAt > Date.now() + if (!isFutureScheduled) { + const update = { + id: msg.id, + seq: msg.seq, + createdAt: msg.createdAt, + body: { + t: 'new-message' as const, + sid: sessionId, + message: { + id: msg.id, + seq: msg.seq, + createdAt: msg.createdAt, + localId: msg.localId, + content: msg.content + } } } + this.io.of('/cli').to(`session:${sessionId}`).emit('update', update) } - this.io.of('/cli').to(`session:${sessionId}`).emit('update', update) + // Always emit message-received to Web SSE so the floating bar renders. this.publisher.emit({ type: 'message-received', sessionId, @@ -368,8 +429,79 @@ export class MessageService { localId: msg.localId, content: msg.content, createdAt: msg.createdAt, - invokedAt: msg.invokedAt + invokedAt: msg.invokedAt, + scheduledAt: msg.scheduledAt } }) } + + /** + * Force-invoke all immediate-queued messages for a session at session end. + * + * Called by sessionHandlers when the CLI sends 'session-end', so that + * the floating bar is cleared without leaving queued rows pinned forever. + * + * **All scheduled rows are intentionally skipped** (mature or future). The + * mature-scan path (releaseMatureScheduledMessages) is the sole emit channel + * for scheduled rows and relies on the CLI ack to write invoked_at; if this + * sweep stamped a mature scheduled row, a subsequent re-attach would never + * see the row in the next mature-scan tick and the user's prompt would be + * silently dropped. See HAPI Bot R4 finding. + * + * Returns the list of localIds that were stamped and the invokedAt timestamp, + * or null if no messages needed sweeping. + */ + sweepImmediateQueuedOnSessionEnd( + sessionId: string, + invokedAt: number + ): { localIds: string[]; invokedAt: number } | null { + const queued = this.store.messages.getImmediateQueuedLocalMessages(sessionId) + const localIds = queued + .map((m) => m.localId) + .filter((id): id is string => typeof id === 'string') + if (localIds.length === 0) return null + this.store.messages.markMessagesInvoked(sessionId, localIds, invokedAt) + this.publisher.emit({ type: 'messages-consumed', sessionId, localIds, invokedAt }) + return { localIds, invokedAt } + } + + /** Called by the hub 5-second tick (syncEngine.expireInactive). + * + * Finds all scheduled messages whose scheduled_at <= now and emits them to + * the CLI via socket.io. Does NOT call markMessagesInvoked — the CLI ack + * (messages-consumed) handles that. This means a message is re-emitted on + * each tick until the CLI acks it, which is the correct behaviour for hub + * restart scenarios (pitfall #2 guard). + * + * Race window with cancel: this tick widens the cancel race to 5 s for + * scheduled messages (vs near-zero for immediate-queued ones). If the CLI + * has already shift()-ed the row when cancel arrives, cancelQueuedMessage + * gets 'not-found' from the CLI ack and stamps invoked_at (PR #568 contract + * preserved). Web client surfaces this as 'sent' in the thread. + * See messageService.test.ts "cancel × mature race" for the documented + * expected behaviour. */ + releaseMatureScheduledMessages(now: number): void { + const mature = this.store.messages.getMatureScheduledMessages(now) + for (const msg of mature) { + const update = { + id: msg.id, + seq: msg.seq, + createdAt: msg.createdAt, + body: { + t: 'new-message' as const, + sid: msg.sessionId, + message: { + id: msg.id, + seq: msg.seq, + createdAt: msg.createdAt, + localId: msg.localId, + content: msg.content + } + } + } + this.io.of('/cli').to(`session:${msg.sessionId}`).emit('update', update) + // NOTE: do NOT call markMessagesInvoked here (pitfall #2). + // CLI ack (messages-consumed) will handle invoked_at stamping. + } + } } diff --git a/hub/src/sync/syncEngine.ts b/hub/src/sync/syncEngine.ts index bc8836075..1d1618bf3 100644 --- a/hub/src/sync/syncEngine.ts +++ b/hub/src/sync/syncEngine.ts @@ -185,8 +185,8 @@ export class SyncEngine { return this.messageService.getMessagesPageByPosition(sessionId, options) } - getMessagesAfter(sessionId: string, options: { afterSeq: number; limit: number }): DecryptedMessage[] { - return this.messageService.getMessagesAfter(sessionId, options) + getDeliverableMessagesAfter(sessionId: string, options: { afterSeq: number; limit: number; now: number }): DecryptedMessage[] { + return this.messageService.getDeliverableMessagesAfter(sessionId, options) } handleRealtimeEvent(event: SyncEvent): void { @@ -269,6 +269,9 @@ export class SyncEngine { this.triggerDedupIfNeeded(session.id) } this.machineCache.expireInactive() + // Piggybacked on the inactivity tick; not a logical part of expireInactive + // but shares its 5s cadence (avoids a second timer). + this.messageService.releaseMatureScheduledMessages(Date.now()) } private reloadAll(): void { @@ -306,6 +309,7 @@ export class SyncEngine { previewUrl?: string }> sentFrom?: 'telegram-bot' | 'webapp' + scheduledAt?: number | null } ): Promise { await this.messageService.sendMessage(sessionId, payload) @@ -319,6 +323,10 @@ export class SyncEngine { return this.messageService.cancelQueuedMessage(sessionId, messageId) } + sweepImmediateQueuedOnSessionEnd(sessionId: string, invokedAt: number): void { + this.messageService.sweepImmediateQueuedOnSessionEnd(sessionId, invokedAt) + } + async approvePermission( sessionId: string, requestId: string, diff --git a/hub/src/web/routes/cli.ts b/hub/src/web/routes/cli.ts index f464b99b7..e53dcc547 100644 --- a/hub/src/web/routes/cli.ts +++ b/hub/src/web/routes/cli.ts @@ -147,7 +147,15 @@ export function createCliRoutes(getSyncEngine: () => SyncEngine | null): Hono Promise +}) { + const sentMessages: Array<{ sessionId: string; payload: unknown }> = [] + const sendMessage = opts.sendMessage ?? (async (sessionId: string, payload: unknown) => { + sentMessages.push({ sessionId, payload }) + }) + + const engine = { + resolveSessionAccess: () => ({ + ok: true, + sessionId: 'session-1', + session: { id: 'session-1', active: opts.active !== false } + }), + sendMessage, + cancelQueuedMessage: async () => ({ status: 'cancelled' }), + getMessagesPage: () => ({ messages: [], page: {} }), + getMessagesPageByPosition: () => ({ messages: [], page: {} }), + } as unknown as SyncEngine + + const app = new Hono() + app.use('*', async (c, next) => { + c.set('namespace', 'default') + await next() + }) + app.route('/api', createMessagesRoutes(() => engine as SyncEngine)) + + return { app, sentMessages } +} + +// --------------------------------------------------------------------------- +// #2 server-side scheduledAt upper bound +// --------------------------------------------------------------------------- + +describe('POST /api/sessions/:id/messages — #2 scheduledAt upper bound', () => { + it('rejects scheduledAt more than 7 days in the future with 400 and clear message', async () => { + const { app } = createApp({}) + + const eightDaysMs = Date.now() + 8 * 24 * 60 * 60 * 1000 + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ text: 'hello', localId: 'local-1', scheduledAt: eightDaysMs }) + }) + + expect(response.status).toBe(400) + const body = await response.json() as { error: string; issues?: { _errors?: string[] } } + expect(body.error).toBe('Invalid body') + // #4: issues field must be present + expect(body.issues).toBeDefined() + // The 7-day message must appear somewhere in the issues + const issuesStr = JSON.stringify(body.issues) + expect(issuesStr).toContain('7 days') + }) + + it('accepts scheduledAt exactly at the 7-day boundary (inclusive)', async () => { + const { app, sentMessages } = createApp({}) + + // Use slightly less than 7 days to avoid flakiness at the exact boundary + const nearlySevenDays = Date.now() + 7 * 24 * 60 * 60 * 1000 - 1000 + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ text: 'hello', localId: 'local-2', scheduledAt: nearlySevenDays }) + }) + + expect(response.status).toBe(200) + expect(sentMessages).toHaveLength(1) + }) + + it('accepts null scheduledAt (immediate send)', async () => { + const { app, sentMessages } = createApp({}) + + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ text: 'hello', localId: 'local-3', scheduledAt: null }) + }) + + expect(response.status).toBe(200) + expect(sentMessages).toHaveLength(1) + }) + + it('accepts missing scheduledAt (immediate send)', async () => { + const { app, sentMessages } = createApp({}) + + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ text: 'hello' }) + }) + + expect(response.status).toBe(200) + expect(sentMessages).toHaveLength(1) + }) +}) + +// --------------------------------------------------------------------------- +// #4 Zod error details in response body +// --------------------------------------------------------------------------- + +describe('POST /api/sessions/:id/messages — #4 Zod error issues in response', () => { + it('returns issues when scheduledAt is set but localId is missing', async () => { + const { app } = createApp({}) + + const futureMs = Date.now() + 60_000 + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ text: 'hello', scheduledAt: futureMs }) + }) + + expect(response.status).toBe(400) + const body = await response.json() as { error: string; issues?: unknown } + expect(body.error).toBe('Invalid body') + expect(body.issues).toBeDefined() + const issuesStr = JSON.stringify(body.issues) + expect(issuesStr).toContain('localId') + }) + + it('returns issues with a non-string text field', async () => { + const { app } = createApp({}) + + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ text: 123 }) + }) + + expect(response.status).toBe(400) + const body = await response.json() as { error: string; issues?: unknown } + expect(body.error).toBe('Invalid body') + expect(body.issues).toBeDefined() + }) +}) + +// --------------------------------------------------------------------------- +// HAPI Bot R3 finding 3: scheduledAt + attachments rejected +// --------------------------------------------------------------------------- + +describe('POST /api/sessions/:id/messages — scheduledAt + attachments rejected', () => { + it('rejects scheduledAt combined with non-empty attachments with 400', async () => { + const { app, sentMessages } = createApp({}) + + const futureMs = Date.now() + 60_000 + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + text: 'hello', + localId: 'local-att', + scheduledAt: futureMs, + attachments: [{ id: 'att-1', filename: 'a.png', mimeType: 'image/png', size: 10, path: '/tmp/a.png' }] + }) + }) + + expect(response.status).toBe(400) + const body = await response.json() as { error: string; issues?: unknown } + expect(body.error).toBe('Invalid body') + const issuesStr = JSON.stringify(body.issues) + expect(issuesStr).toContain('attachments') + expect(sentMessages).toHaveLength(0) + }) + + it('accepts scheduledAt with empty attachments array', async () => { + const { app, sentMessages } = createApp({}) + + const futureMs = Date.now() + 60_000 + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + text: 'hello', + localId: 'local-att-2', + scheduledAt: futureMs, + attachments: [] + }) + }) + + expect(response.status).toBe(200) + expect(sentMessages).toHaveLength(1) + }) + + it('accepts immediate send with attachments (no scheduledAt)', async () => { + const { app, sentMessages } = createApp({}) + + const response = await app.request('/api/sessions/session-1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + text: 'hello', + attachments: [{ id: 'att-2', filename: 'b.png', mimeType: 'image/png', size: 10, path: '/tmp/b.png' }] + }) + }) + + expect(response.status).toBe(200) + expect(sentMessages).toHaveLength(1) + }) +}) diff --git a/hub/src/web/routes/messages.ts b/hub/src/web/routes/messages.ts index fb1e298c9..cb1d83708 100644 --- a/hub/src/web/routes/messages.ts +++ b/hub/src/web/routes/messages.ts @@ -15,8 +15,30 @@ const querySchema = z.object({ const sendMessageBodySchema = z.object({ text: z.string(), localId: z.string().min(1).optional(), - attachments: z.array(AttachmentMetadataSchema).optional() -}) + attachments: z.array(AttachmentMetadataSchema).optional(), + scheduledAt: z.number().int().positive().nullable().optional() +}).refine( + // Scheduled messages need a localId so the ack flow (markMessagesInvoked + // by localId) can flip invoked_at after the CLI consumes them. Without + // a localId, addMessage stamps invoked_at immediately, which would + // silently swallow the schedule. + (data) => data.scheduledAt == null || typeof data.localId === 'string', + { message: 'scheduledAt requires localId', path: ['localId'] } +).refine( + // Cap scheduledAt at 7 days from now to prevent zombie rows. REST/Telegram/ + // automation callers bypass the frontend 7-day clamp, so we enforce it here. + // Evaluated at request time so Date.now() is fresh on every call. + (data) => data.scheduledAt == null || data.scheduledAt <= Date.now() + 7 * 24 * 60 * 60 * 1000, + { message: 'scheduledAt must be within 7 days from now', path: ['scheduledAt'] } +).refine( + // Attachment paths are stored under the CLI session's upload directory and + // purged on session end (cleanupUploadDir in apiSession.ts:sendSessionDeath). + // A scheduled message that matures after the CLI exits would dereference + // deleted files via the @path attachment formatter. Reject the combination + // until uploads are retained through invocation. + (data) => data.scheduledAt == null || !data.attachments?.length, + { message: 'scheduled messages with attachments are not supported', path: ['attachments'] } +) export function createMessagesRoutes(getSyncEngine: () => SyncEngine | null): Hono { const app = new Hono() @@ -83,7 +105,7 @@ export function createMessagesRoutes(getSyncEngine: () => SyncEngine | null): Ho const body = await c.req.json().catch(() => null) const parsed = sendMessageBodySchema.safeParse(body) if (!parsed.success) { - return c.json({ error: 'Invalid body' }, 400) + return c.json({ error: 'Invalid body', issues: parsed.error.flatten() }, 400) } // Require text or attachments @@ -95,7 +117,8 @@ export function createMessagesRoutes(getSyncEngine: () => SyncEngine | null): Ho text: parsed.data.text, localId: parsed.data.localId, attachments: parsed.data.attachments, - sentFrom: 'webapp' + sentFrom: 'webapp', + scheduledAt: parsed.data.scheduledAt }) return c.json({ ok: true }) }) diff --git a/shared/src/schemas.ts b/shared/src/schemas.ts index a13eae0ad..89bd3a875 100644 --- a/shared/src/schemas.ts +++ b/shared/src/schemas.ts @@ -155,7 +155,8 @@ export const DecryptedMessageSchema = z.object({ localId: z.string().nullable(), content: z.unknown(), createdAt: z.number(), - invokedAt: z.number().nullable().optional() + invokedAt: z.number().nullable().optional(), + scheduledAt: z.number().nullable().optional() }) export type DecryptedMessage = z.infer diff --git a/web/src/api/client.ts b/web/src/api/client.ts index 86d2c9619..688f6d2b3 100644 --- a/web/src/api/client.ts +++ b/web/src/api/client.ts @@ -297,13 +297,14 @@ export class ApiClient { return response.sessionId } - async sendMessage(sessionId: string, text: string, localId?: string | null, attachments?: AttachmentMetadata[]): Promise { + async sendMessage(sessionId: string, text: string, localId?: string | null, attachments?: AttachmentMetadata[], scheduledAt?: number | null): Promise { await this.request(`/api/sessions/${encodeURIComponent(sessionId)}/messages`, { method: 'POST', body: JSON.stringify({ text, localId: localId ?? undefined, - attachments: attachments ?? undefined + attachments: attachments ?? undefined, + scheduledAt: scheduledAt ?? undefined }) }) } diff --git a/web/src/components/AssistantChat/ComposerButtons.tsx b/web/src/components/AssistantChat/ComposerButtons.tsx index 2777c9056..dde957210 100644 --- a/web/src/components/AssistantChat/ComposerButtons.tsx +++ b/web/src/components/AssistantChat/ComposerButtons.tsx @@ -1,6 +1,28 @@ import { ComposerPrimitive } from '@assistant-ui/react' import type { ConversationStatus } from '@/realtime/types' import { useTranslation } from '@/lib/use-translation' +import { ScheduleTimePicker } from './ScheduleTimePicker' +import type { PendingSchedule } from './ScheduleTimePicker' +import { useRef, useState } from 'react' + +function ScheduleIcon() { + return ( + + + + + ) +} function VoiceAssistantIcon() { return ( @@ -319,9 +341,16 @@ export function ComposerButtons(props: { onVoiceToggle: () => void onVoiceMicToggle?: () => void onSend: () => void + pendingSchedule?: PendingSchedule | null + onSchedule?: (pending: PendingSchedule) => void + onClearSchedule?: () => void }) { const { t } = useTranslation() const isVoiceConnected = props.voiceStatus === 'connected' + const [showSchedulePicker, setShowSchedulePicker] = useState(false) + const scheduleButtonRef = useRef(null) + + const hasSchedule = props.pendingSchedule != null return (
@@ -402,6 +431,44 @@ export function ComposerButtons(props: { ) : null} + + {/* Schedule button — only shown when onSchedule handler is provided */} + {props.onSchedule ? ( + <> + + {showSchedulePicker && ( + { + props.onSchedule!(pending) + setShowSchedulePicker(false) + }} + onClose={() => setShowSchedulePicker(false)} + pendingSchedule={props.pendingSchedule} + /> + )} + + ) : null}
void onVoiceMicToggle?: () => void + // Schedule props (lifted from internal state when provided) + pendingSchedule?: PendingSchedule | null + onSchedule?: (pending: PendingSchedule) => void + onClearSchedule?: () => void }) { const { t } = useTranslation() const { @@ -109,7 +114,10 @@ export function HappyComposer(props: { voiceStatus = 'disconnected', voiceMicMuted = false, onVoiceToggle, - onVoiceMicToggle + onVoiceMicToggle, + pendingSchedule: pendingScheduleProp, + onSchedule: onScheduleProp, + onClearSchedule: onClearScheduleProp } = props // Use ?? so missing values fall back to default (destructuring defaults only handle undefined) @@ -150,6 +158,11 @@ export function HappyComposer(props: { const [isAborting, setIsAborting] = useState(false) const [isSwitching, setIsSwitching] = useState(false) const [showContinueHint, setShowContinueHint] = useState(false) + // pendingSchedule is controlled externally when onSchedule prop is provided; otherwise local state + const [pendingScheduleLocal, setPendingScheduleLocal] = useState(null) + const isControlled = onScheduleProp !== undefined + const pendingSchedule = isControlled ? (pendingScheduleProp ?? null) : pendingScheduleLocal + const setPendingSchedule = isControlled ? onScheduleProp : setPendingScheduleLocal const textareaRef = useRef(null) const prevControlledByUser = useRef(controlledByUser) @@ -501,7 +514,13 @@ export function HappyComposer(props: { const handleSend = useCallback(() => { api.composer().send() - }, [api]) + // Clear schedule after send so next message starts fresh (immediate vs controlled) + if (isControlled) { + onClearScheduleProp?.() + } else { + setPendingScheduleLocal(null) + } + }, [api, isControlled, onClearScheduleProp]) const overlays = useMemo(() => { if (showSettings && (showCollaborationSettings || showPermissionSettings || showModelSettings || showModelReasoningEffortSettings || showEffortSettings)) { @@ -826,6 +845,9 @@ export function HappyComposer(props: { onVoiceToggle={onVoiceToggle ?? (() => {})} onVoiceMicToggle={onVoiceMicToggle} onSend={handleSend} + pendingSchedule={pendingSchedule} + onSchedule={setPendingSchedule} + onClearSchedule={isControlled ? onClearScheduleProp : () => setPendingScheduleLocal(null)} /> diff --git a/web/src/components/AssistantChat/QueuedMessagesBar.test.tsx b/web/src/components/AssistantChat/QueuedMessagesBar.test.tsx index e60fc7b9e..719af29e5 100644 --- a/web/src/components/AssistantChat/QueuedMessagesBar.test.tsx +++ b/web/src/components/AssistantChat/QueuedMessagesBar.test.tsx @@ -1,5 +1,6 @@ import { describe, expect, it } from 'vitest' -import { computeCanCancel } from './QueuedMessagesBar' +import type { DecryptedMessage } from '@/types/api' +import { computeCanCancel, computeEditPendingSchedule, formatScheduledTime, sortQueuedMessages } from './QueuedMessagesBar' /** * Unit tests for computeCanCancel — the race guard that prevents sending @@ -56,3 +57,100 @@ describe('computeCanCancel', () => { }) }) }) + +// --------------------------------------------------------------------------- +// #4 computeEditPendingSchedule — edit restores scheduledAt as absolute pending +// --------------------------------------------------------------------------- + +describe('computeEditPendingSchedule', () => { + it('returns null for immediate-queued message (no scheduledAt)', () => { + const now = Date.now() + expect(computeEditPendingSchedule(null, now)).toBeNull() + expect(computeEditPendingSchedule(undefined, now)).toBeNull() + }) + + it('returns null for scheduledAt in the past (message matured)', () => { + const now = Date.now() + const past = now - 5000 // 5 seconds ago + expect(computeEditPendingSchedule(past, now)).toBeNull() + }) + + it('returns absolute PendingSchedule for future scheduledAt', () => { + const now = Date.now() + const future = now + 60_000 // 1 minute from now + const result = computeEditPendingSchedule(future, now) + expect(result).not.toBeNull() + expect(result?.type).toBe('absolute') + if (result?.type === 'absolute') { + expect(result.ms).toBe(future) + } + }) +}) + +describe('sortQueuedMessages', () => { + const make = (id: string, createdAt: number, scheduledAt: number | null = null): DecryptedMessage => ({ + id, + localId: id, + createdAt, + seq: createdAt, + scheduledAt, + invokedAt: null, + content: { role: 'user', content: { type: 'text', text: id } }, + } as unknown as DecryptedMessage) + + it('places immediate-queued messages before scheduled ones', () => { + const a = make('a-immediate', 1000) + const b = make('b-scheduled-soon', 500, Date.now() + 60_000) + const result = sortQueuedMessages([b, a]) + expect(result.map((m) => m.id)).toEqual(['a-immediate', 'b-scheduled-soon']) + }) + + it('orders immediate-queued messages by createdAt ascending', () => { + const older = make('older', 1000) + const newer = make('newer', 2000) + const result = sortQueuedMessages([newer, older]) + expect(result.map((m) => m.id)).toEqual(['older', 'newer']) + }) + + it('orders scheduled messages by scheduledAt ascending (soonest first)', () => { + const later = make('fires-later', 1000, 10_000) + const sooner = make('fires-sooner', 2000, 5_000) + const result = sortQueuedMessages([later, sooner]) + expect(result.map((m) => m.id)).toEqual(['fires-sooner', 'fires-later']) + }) + + it('combined: immediate first, then scheduled in fire-time order', () => { + const im1 = make('im1', 1000) + const im2 = make('im2', 2000) + const sched1 = make('sched-near', 500, 5_000) + const sched2 = make('sched-far', 600, 10_000) + const result = sortQueuedMessages([sched2, im2, sched1, im1]) + expect(result.map((m) => m.id)).toEqual(['im1', 'im2', 'sched-near', 'sched-far']) + }) +}) + +// --------------------------------------------------------------------------- +// formatScheduledTime — cross-year support (#8) +// --------------------------------------------------------------------------- + +describe('formatScheduledTime', () => { + it('omits year for a date in the current year', () => { + const now = new Date() + // Use a date 1 month ahead in the same year, guarding against Dec edge case + const sameYearDate = new Date(now.getFullYear(), now.getMonth() + 1 < 12 ? now.getMonth() + 1 : 0, 15, 10, 30) + if (sameYearDate.getFullYear() !== now.getFullYear()) { + // Wrapped to next year — skip (edge case in late December) + return + } + const result = formatScheduledTime(sameYearDate.getTime()) + // Year digits should not appear + expect(result).not.toContain(String(now.getFullYear())) + }) + + it('includes year for a date in a different year', () => { + const nextYear = new Date().getFullYear() + 1 + const crossYearDate = new Date(nextYear, 0, 15, 10, 30) // Jan 15 next year + const result = formatScheduledTime(crossYearDate.getTime()) + expect(result).toContain(String(nextYear)) + }) +}) diff --git a/web/src/components/AssistantChat/QueuedMessagesBar.tsx b/web/src/components/AssistantChat/QueuedMessagesBar.tsx index bc036e6c3..5866a224a 100644 --- a/web/src/components/AssistantChat/QueuedMessagesBar.tsx +++ b/web/src/components/AssistantChat/QueuedMessagesBar.tsx @@ -1,5 +1,5 @@ import { useAssistantApi } from '@assistant-ui/react' -import { useCallback, useSyncExternalStore } from 'react' +import { useCallback, useMemo, useSyncExternalStore } from 'react' import type { ApiClient } from '@/api/client' import { getMessageWindowState, subscribeMessageWindow } from '@/lib/message-window-store' import { isQueuedForInvocation } from '@/lib/messages' @@ -7,6 +7,9 @@ import { EMPTY_STATE } from '@/hooks/queries/useMessages' import { normalizeDecryptedMessage } from '@/chat/normalize' import type { DecryptedMessage } from '@/types/api' import { useCancelQueuedMessage } from '@/hooks/mutations/useCancelQueuedMessage' +import { useTranslation } from '@/lib/use-translation' +import { useToast } from '@/lib/toast-context' +import type { PendingSchedule } from '@/components/AssistantChat/ScheduleTimePicker' function ClockIcon() { return ( @@ -28,6 +31,28 @@ function ClockIcon() { ) } +/** + * Orders queued messages so the floating bar reads top-down as a single timeline: + * 1. Immediate-queued messages first, in the order they were submitted. + * 2. Scheduled messages after, ordered by their fire time (soonest first). + * + * Without this the bar follows insertion order, which mixes immediate and + * scheduled rows arbitrarily and makes the "what fires next" question + * harder to answer at a glance. + * + * @internal Exported for unit testing. + */ +export function sortQueuedMessages(msgs: DecryptedMessage[]): DecryptedMessage[] { + return [...msgs].sort((a, b) => { + const aSched = a.scheduledAt != null + const bSched = b.scheduledAt != null + if (aSched !== bSched) return aSched ? 1 : -1 + // Both scheduledAt values are non-null here (aSched && bSched is true above). + if (aSched && bSched) return a.scheduledAt! - b.scheduledAt! + return (a.createdAt ?? 0) - (b.createdAt ?? 0) + }) +} + /** * Returns user messages that haven't been invoked yet (invokedAt == null and not sent/failed). * Covers both optimistic (status='queued') and server-loaded (status=undefined, invokedAt=null) cases. @@ -42,8 +67,12 @@ function useQueuedMessages(sessionId: string): DecryptedMessage[] { // `invokedAt` is the source of truth for invocation; see isQueuedForInvocation // (lib/messages) for the shared predicate used by the thread filter and the // window store trim helpers. - const allMessages = [...state.messages, ...state.pending] - return allMessages.filter(isQueuedForInvocation) + // useSyncExternalStore guarantees a stable reference when the snapshot is + // unchanged, so [state] as the dependency avoids unnecessary re-sorts. + return useMemo(() => { + const allMessages = [...state.messages, ...state.pending] + return sortQueuedMessages(allMessages.filter(isQueuedForInvocation)) + }, [state]) } function getTextFromMessage(msg: DecryptedMessage): string { @@ -65,6 +94,24 @@ function getTextFromMessage(msg: DecryptedMessage): string { return attachments.map((a) => a.filename ?? 'attachment').join(', ') } +/** + * Computes the PendingSchedule to restore when editing a queued message. + * + * - If the message has a future scheduledAt, return { type: 'absolute', ms } so the + * user can re-send with the same specific time (or adjust it). + * - If scheduledAt is null, undefined, or in the past (message already matured), + * return null so the re-sent message goes out immediately. + * + * @internal Exported for unit testing. + */ +export function computeEditPendingSchedule( + scheduledAt: number | null | undefined, + now: number +): PendingSchedule | null { + if (scheduledAt == null || scheduledAt <= now) return null + return { type: 'absolute', ms: scheduledAt } +} + /** * Determines whether the user can cancel or edit a queued message. * @@ -100,10 +147,41 @@ export function computeCanCancel({ * Edit = client-side cancel + prefill composer with message text (Codex dialect). * Cancel = DELETE /sessions/:id/messages/:messageId with optimistic removal. */ -export function QueuedMessagesBar({ sessionId, api }: { sessionId: string; api: ApiClient | null }) { +/** @internal Exported for unit testing. */ +export function formatScheduledTime(scheduledAt: number): string { + const date = new Date(scheduledAt) + const now = new Date() + const opts: Intl.DateTimeFormatOptions = { + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit', + } + if (date.getFullYear() !== now.getFullYear()) { + opts.year = 'numeric' + } + return date.toLocaleString(undefined, opts) +} + +export function QueuedMessagesBar({ + sessionId, + api, + onEdit, +}: { + sessionId: string + api: ApiClient | null + /** + * Called when the user clicks Edit on a queued message. + * The parent should restore `text` into the composer and `pendingSchedule` into the schedule state. + * Edit is always cancel + prefill, regardless of whether the message is scheduled or immediate. + */ + onEdit?: (params: { text: string; pendingSchedule: PendingSchedule | null }) => void +}) { const queued = useQueuedMessages(sessionId) const assistantApi = useAssistantApi() const cancelMutation = useCancelQueuedMessage(api) + const { t } = useTranslation() + const { addToast } = useToast() if (queued.length === 0) { return null @@ -142,7 +220,10 @@ export function QueuedMessagesBar({ sessionId, api }: { sessionId: string; api: const handleEdit = () => { if (!canCancel) return - // Edit = cancel + prefill composer (Codex dialect: no separate edit mode). + // Edit = cancel + restore composer (text + schedule). + // Works the same for immediate-queued and future-scheduled messages. + const restoredPendingSchedule = computeEditPendingSchedule(msg.scheduledAt, Date.now()) + cancelMutation.mutate( { sessionId, @@ -152,32 +233,53 @@ export function QueuedMessagesBar({ sessionId, api }: { sessionId: string; api: }, { onSuccess: (result) => { - // Race guard: if the agent already consumed this message, skip prefill. - // The hook's own onSuccess already reverted the optimistic removal. - if (result.status === 'invoked') return - // Only prefill if text is available; attachment-only rows get empty string. - const prefillText = text - if (prefillText) { - assistantApi.composer().setText(prefillText) + // Race guard: if the agent already consumed this message, skip prefill + // and inform the user so they aren't confused by the row disappearing. + if (result.status === 'invoked') { + addToast({ + title: t('queuedMessages.editAlreadyInvoked'), + body: '', + sessionId, + url: window.location.href, + }) + return + } + // Restore text into composer + if (text) { + assistantApi.composer().setText(text) } + // Restore schedule via parent callback (if provided) + onEdit?.({ text, pendingSchedule: restoredPendingSchedule }) }, } ) } + const canEdit = canCancel + return (
  • - - {text} - +
    + + {text} + + {msg.scheduledAt != null && msg.scheduledAt > Date.now() && ( +
    + + + {t('queuedMessages.scheduledFor', { time: formatScheduledTime(msg.scheduledAt) })} + +
    + )} +
    + +
    + + {/* Tab content */} +
    + {tab === 'relative' ? ( +
    + {PRESETS.map((preset) => { + const isSelected = pendingSchedule?.type === 'preset' && pendingSchedule.preset === preset + return ( + + ) + })} +
    + ) : ( +
    + handleSpecificChange(e.target.value)} + className="w-full rounded-lg border border-[var(--app-border)] bg-[var(--app-bg)] px-2 py-1.5 text-sm text-[var(--app-fg)] focus:outline-none focus:ring-1 focus:ring-[var(--app-link)]" + /> + {specificError ? ( +

    {specificError}

    + ) : ( +

    + {t('composer.scheduleSpecificHint')} +

    + )} + +
    + )} +
    + + ) +} diff --git a/web/src/components/SessionChat.test.ts b/web/src/components/SessionChat.test.ts new file mode 100644 index 000000000..e1913ca38 --- /dev/null +++ b/web/src/components/SessionChat.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, it } from 'vitest' +import { shouldAutoClearPendingSchedule } from './SessionChat' +import type { PendingSchedule } from '@/components/AssistantChat/ScheduleTimePicker' + +/** + * Unit tests for shouldAutoClearPendingSchedule. + * + * The useEffect in SessionChat auto-clears only 'absolute' pending schedules + * when the chosen time expires. 'preset' schedules must NOT be auto-cleared + * because they are relative to send time and have no fixed expiry. + * + * This test guards against future refactors that accidentally break the + * preset-stays-alive invariant (a silent break: the effect would cancel the + * preset with no user-visible error before send time). + */ +describe('shouldAutoClearPendingSchedule', () => { + it('returns false for null (no schedule set)', () => { + expect(shouldAutoClearPendingSchedule(null)).toBe(false) + }) + + it('returns false for preset schedule — presets do not expire before send', () => { + const preset: PendingSchedule = { type: 'preset', preset: '+5m' } + expect(shouldAutoClearPendingSchedule(preset)).toBe(false) + }) + + it('returns false for all preset values', () => { + const presets: Array<'+5m' | '+30m' | '+1h' | '+4h'> = ['+5m', '+30m', '+1h', '+4h'] + for (const p of presets) { + const pending: PendingSchedule = { type: 'preset', preset: p } + expect(shouldAutoClearPendingSchedule(pending)).toBe(false) + } + }) + + it('returns true for absolute schedule — absolute schedules have a fixed expiry instant', () => { + const absolute: PendingSchedule = { type: 'absolute', ms: Date.now() + 60_000 } + expect(shouldAutoClearPendingSchedule(absolute)).toBe(true) + }) + + it('returns true for expired absolute schedule (ms in the past)', () => { + const expired: PendingSchedule = { type: 'absolute', ms: Date.now() - 1000 } + expect(shouldAutoClearPendingSchedule(expired)).toBe(true) + }) +}) diff --git a/web/src/components/SessionChat.tsx b/web/src/components/SessionChat.tsx index 5561b05bf..583bd95a0 100644 --- a/web/src/components/SessionChat.tsx +++ b/web/src/components/SessionChat.tsx @@ -18,6 +18,8 @@ import { reconcileChatBlocks } from '@/chat/reconcile' import { buildConversationOutline } from '@/chat/outline' import { isQueuedForInvocation } from '@/lib/messages' import { HappyComposer } from '@/components/AssistantChat/HappyComposer' +import type { PendingSchedule } from '@/components/AssistantChat/ScheduleTimePicker' +import { resolvePendingSchedule } from '@/components/AssistantChat/ScheduleTimePicker' import { HappyThread } from '@/components/AssistantChat/HappyThread' import { QueuedMessagesBar } from '@/components/AssistantChat/QueuedMessagesBar' import { useHappyRuntime } from '@/lib/assistant-runtime' @@ -33,6 +35,19 @@ import { useVoiceOptional } from '@/lib/voice-context' import { RealtimeVoiceSession, registerSessionStore, registerVoiceHooksStore, voiceHooks } from '@/realtime' import { isRemoteTerminalSupported } from '@/utils/terminalSupport' +/** + * Returns whether a PendingSchedule should trigger an auto-clear timer. + * + * Only 'absolute' schedules expire (the chosen instant passes). + * 'preset' schedules are relative to send time and have no fixed expiry. + * + * Used both by the auto-clear useEffect and by unit tests, so a future + * variant of PendingSchedule only needs to update this single helper. + */ +export function shouldAutoClearPendingSchedule(pending: PendingSchedule | null): boolean { + return pending !== null && pending.type === 'absolute' +} + function getOutlineTitle(session: Session): string { if (session.metadata?.name) { return session.metadata.name @@ -60,7 +75,7 @@ export function SessionChat(props: { onBack: () => void onRefresh: () => void onLoadMore: () => Promise - onSend: (text: string, attachments?: AttachmentMetadata[]) => void + onSend: (text: string, attachments?: AttachmentMetadata[], scheduledAt?: number | null) => void onFlushPending: () => void onAtBottomChange: (atBottom: boolean) => void onRetryMessage?: (localId: string) => void @@ -372,8 +387,40 @@ export function SessionChat(props: { }) }, [navigate, props.session.id]) - const handleSend = useCallback((text: string, attachments?: AttachmentMetadata[]) => { - props.onSend(text, attachments) + // Scheduled message state — lifted here so useHappyRuntime can read the ref. + // + // pendingSchedule holds what the user selected (preset or absolute ms). + // The ref is read at send time; resolvePendingSchedule converts it to an + // absolute epoch-ms using Date.now() at that moment (send-time base for presets). + const [pendingSchedule, setPendingSchedule] = useState(null) + const pendingScheduleRef = useRef(null) + // Keep render ref in sync so onNew can snapshot at send time + pendingScheduleRef.current = pendingSchedule + + // Auto-clear absolute-type pendingSchedule when the chosen time expires so + // the composer clock button doesn't stay active past the scheduled instant. + // Preset-type schedules are relative so they don't expire until send — the + // shouldAutoClearPendingSchedule predicate is the single source of truth so + // adding a new PendingSchedule variant only needs to update that helper. + useEffect(() => { + if (!shouldAutoClearPendingSchedule(pendingSchedule)) return + // Narrowed to 'absolute' by the predicate above. + const ms = (pendingSchedule as Extract).ms + const remaining = ms - Date.now() + if (remaining <= 0) { + setPendingSchedule(null) + return + } + const timer = setTimeout(() => setPendingSchedule(null), remaining) + return () => clearTimeout(timer) + }, [pendingSchedule]) + + const handleSend = useCallback((text: string, attachments?: AttachmentMetadata[], scheduledAt?: number | null) => { + props.onSend(text, attachments, scheduledAt) + // Also clear pendingSchedule after send so the clock button de-activates + // (belt-and-suspenders: HappyComposer.handleSend calls onClearSchedule too, + // but this ensures the SessionChat state is cleared even if the path differs). + setPendingSchedule(null) setForceScrollToken((token) => token + 1) }, [props.onSend]) @@ -391,7 +438,8 @@ export function SessionChat(props: { onSendMessage: handleSend, onAbort: handleAbort, attachmentAdapter, - allowSendWhenInactive: true + allowSendWhenInactive: true, + pendingScheduleRef }) return ( @@ -454,13 +502,23 @@ export function SessionChat(props: { ) : null}
    - + { + // Restore the schedule so the clock button re-activates + setPendingSchedule(restored) + }} + />
    setPendingSchedule(null)} permissionMode={props.session.permissionMode} collaborationMode={codexCollaborationModeSupported ? props.session.collaborationMode : undefined} model={props.session.model} diff --git a/web/src/hooks/mutations/useSendMessage.ts b/web/src/hooks/mutations/useSendMessage.ts index 4e0a58655..aaa617344 100644 --- a/web/src/hooks/mutations/useSendMessage.ts +++ b/web/src/hooks/mutations/useSendMessage.ts @@ -16,6 +16,7 @@ type SendMessageInput = { localId: string createdAt: number attachments?: AttachmentMetadata[] + scheduledAt?: number | null } type BlockedReason = 'no-api' | 'no-session' | 'pending' @@ -48,6 +49,7 @@ function createOptimisticMessage(input: SendMessageInput, status: 'queued' | 'se // response that omits the field entirely (`undefined`) is treated as // already-invoked and stays in the thread, not the floating bar. invokedAt: null, + scheduledAt: input.scheduledAt ?? null, status, originalText: input.text, } @@ -72,7 +74,7 @@ export function useSendMessage( sessionId: string | null, options?: UseSendMessageOptions ): { - sendMessage: (text: string, attachments?: AttachmentMetadata[]) => void + sendMessage: (text: string, attachments?: AttachmentMetadata[], scheduledAt?: number | null) => void retryMessage: (localId: string) => void isSending: boolean } { @@ -87,7 +89,7 @@ export function useSendMessage( if (!api) { throw new Error('API unavailable') } - await api.sendMessage(input.sessionId, input.text, input.localId, input.attachments) + await api.sendMessage(input.sessionId, input.text, input.localId, input.attachments, input.scheduledAt) }, onMutate: async (input) => { const status = isSessionThinkingRef.current ? 'queued' as const : 'sending' as const @@ -109,7 +111,7 @@ export function useSendMessage( }, }) - const sendMessage = (text: string, attachments?: AttachmentMetadata[]) => { + const sendMessage = (text: string, attachments?: AttachmentMetadata[], scheduledAt?: number | null) => { if (!api) { options?.onBlocked?.('no-api') haptic.notification('error') @@ -152,6 +154,7 @@ export function useSendMessage( localId, createdAt, attachments, + scheduledAt, }) })() } diff --git a/web/src/lib/assistant-runtime.ts b/web/src/lib/assistant-runtime.ts index bcd0a3383..3091cacc1 100644 --- a/web/src/lib/assistant-runtime.ts +++ b/web/src/lib/assistant-runtime.ts @@ -1,6 +1,9 @@ import { useCallback, useMemo } from 'react' +import type React from 'react' import type { AppendMessage, AttachmentAdapter, ThreadMessageLike } from '@assistant-ui/react' import { useExternalMessageConverter, useExternalStoreRuntime } from '@assistant-ui/react' +import type { PendingSchedule } from '@/components/AssistantChat/ScheduleTimePicker' +import { resolvePendingSchedule } from '@/components/AssistantChat/ScheduleTimePicker' import { safeStringify } from '@hapi/protocol' import { renderEventLabel } from '@/chat/presentation' import type { ChatBlock, CliOutputBlock, UsageData } from '@/chat/types' @@ -208,10 +211,11 @@ export function useHappyRuntime(props: { session: Session blocks: readonly ChatBlock[] isSending: boolean - onSendMessage: (text: string, attachments?: AttachmentMetadata[]) => void + onSendMessage: (text: string, attachments?: AttachmentMetadata[], scheduledAt?: number | null) => void onAbort: () => Promise attachmentAdapter?: AttachmentAdapter allowSendWhenInactive?: boolean + pendingScheduleRef?: React.RefObject }) { // Use cached message converter for performance optimization // This prevents re-converting all messages on every render @@ -224,8 +228,13 @@ export function useHappyRuntime(props: { const onNew = useCallback(async (message: AppendMessage) => { const { text, attachments } = extractMessageContent(message) if (!text && attachments.length === 0) return - props.onSendMessage(text, attachments.length > 0 ? attachments : undefined) - }, [props.onSendMessage]) + // Resolve pendingSchedule at send time (Date.now()) so preset-type schedules + // ("5 minutes from now") are relative to the actual send action, not the + // moment the user clicked the preset button. + const sendNow = Date.now() + const scheduledAt = resolvePendingSchedule(props.pendingScheduleRef?.current ?? null, sendNow) + props.onSendMessage(text, attachments.length > 0 ? attachments : undefined, scheduledAt) + }, [props.onSendMessage, props.pendingScheduleRef]) const onCancel = useCallback(async () => { await props.onAbort() @@ -252,5 +261,9 @@ export function useHappyRuntime(props: { props.attachmentAdapter ]) + // Note: pendingScheduleRef is intentionally not in the deps above. + // The ref is read at send time inside onNew (not at render time), so changes + // to pendingSchedule do not need to invalidate the adapter or re-run onNew. + return useExternalStoreRuntime(adapter) } diff --git a/web/src/lib/locales/en.ts b/web/src/lib/locales/en.ts index 8f09b929e..97736de24 100644 --- a/web/src/lib/locales/en.ts +++ b/web/src/lib/locales/en.ts @@ -241,6 +241,14 @@ export default { 'composer.send': 'Send', 'composer.stop': 'Stop', 'composer.voice': 'Voice assistant', + 'composer.scheduleSend': 'Schedule send', + 'composer.scheduleRelativeTab': 'Relative', + 'composer.scheduleSpecificTab': 'Specific', + 'composer.scheduleSpecificHint': 'Max 7 days. Requires the hub running; the CLI catches up the next time it connects after that time.', + 'composer.scheduleErrorPast': 'Scheduled time must be in the future.', + 'composer.scheduleErrorTooFar': 'Maximum schedule time is 7 days.', + 'queuedMessages.scheduledFor': 'Scheduled for {time}', + 'queuedMessages.editAlreadyInvoked': "Message already sent — it can't be edited", 'composer.codexSlashUnsupported.title': 'Codex command unavailable', 'composer.codexSlashUnsupported.body': 'HAPI remote mode does not yet run built-in Codex slash commands like {command}. Use natural language instead, or run it in the local Codex TUI.', diff --git a/web/src/lib/locales/zh-CN.ts b/web/src/lib/locales/zh-CN.ts index fb8850a6b..83868b726 100644 --- a/web/src/lib/locales/zh-CN.ts +++ b/web/src/lib/locales/zh-CN.ts @@ -243,6 +243,14 @@ export default { 'composer.send': '发送', 'composer.stop': '停止', 'composer.voice': '语音助手', + 'composer.scheduleSend': '定时发送', + 'composer.scheduleRelativeTab': '相对时间', + 'composer.scheduleSpecificTab': '指定时间', + 'composer.scheduleSpecificHint': '最多 7 天。需 Hub 运行;CLI 下次连接时会接收消息。', + 'composer.scheduleErrorPast': '发送时间必须在未来。', + 'composer.scheduleErrorTooFar': '最多只能定时 7 天。', + 'queuedMessages.scheduledFor': '定时发送: {time}', + 'queuedMessages.editAlreadyInvoked': '消息已发送,无法编辑', 'composer.codexSlashUnsupported.title': '无法执行 Codex 命令', 'composer.codexSlashUnsupported.body': 'HAPI 远程模式暂不支持 {command} 这类 Codex 内建 slash command,请改用自然语言,或在本地 Codex TUI 中执行。',