@@ -45,7 +45,16 @@ import {
4545import { generateAssistantReply } from "@/chat/respond" ;
4646import type { JuniorDatabase } from "@/chat/sql/db" ;
4747import { juniorSqlSchema } from "@/chat/sql/schema" ;
48- import { schedulerPlugin } from "@sentry/junior-scheduler" ;
48+ import {
49+ createSchedulerSqlStore ,
50+ schedulerPlugin ,
51+ type ScheduledTask ,
52+ } from "@sentry/junior-scheduler" ;
53+ import { runPluginHeartbeats } from "@/chat/agent-dispatch/heartbeat" ;
54+ import { runAgentDispatchSlice } from "@/chat/agent-dispatch/runner" ;
55+ import { verifyDispatchCallbackRequest } from "@/chat/agent-dispatch/signing" ;
56+ import { getDispatchRecord } from "@/chat/agent-dispatch/store" ;
57+ import type { DispatchCallback } from "@/chat/agent-dispatch/types" ;
4958import { getStateAdapter } from "@/chat/state/adapter" ;
5059import { resetSkillDiscoveryCache } from "@/chat/skills" ;
5160import { createWebFetchTool } from "@/chat/tools/web/fetch-tool" ;
@@ -130,11 +139,22 @@ interface AssistantContextChangedEvent extends EvalBaseEvent {
130139 user_id ?: string ;
131140}
132141
142+ interface ScheduledTaskDueEvent extends EvalBaseEvent {
143+ type : "scheduled_task_due" ;
144+ now_ms ?: number ;
145+ recurrence ?: "daily" | "weekly" | "monthly" | "yearly" ;
146+ schedule ?: string ;
147+ schedule_kind ?: "one_off" | "recurring" ;
148+ task_text : string ;
149+ timezone ?: string ;
150+ }
151+
133152export type EvalEvent =
134153 | MentionEvent
135154 | SubscribedMessageEvent
136155 | AssistantThreadStartedEvent
137- | AssistantContextChangedEvent ;
156+ | AssistantContextChangedEvent
157+ | ScheduledTaskDueEvent ;
138158
139159interface SubscribedDecisionFixture {
140160 reason : string ;
@@ -1276,7 +1296,7 @@ async function setupHarnessEnvironment(
12761296 scenario . events . flatMap ( ( event ) =>
12771297 "message" in event
12781298 ? [ event . message . author ?. user_id ?. trim ( ) || "U-test" ]
1279- : event . user_id
1299+ : "user_id" in event && event . user_id
12801300 ? [ event . user_id ]
12811301 : [ ] ,
12821302 ) ,
@@ -1569,12 +1589,21 @@ function buildRuntimeServices(
15691589async function processEvents ( args : {
15701590 scenario : EvalScenario ;
15711591 env : HarnessEnvironment ;
1592+ generateAssistantReply : typeof generateAssistantReply ;
15721593 slackRuntime : ReturnType < typeof createSlackRuntime > ;
15731594 getThreadRecord : ( fixture : EvalEventThreadFixture ) => EvalThreadRecord ;
15741595 readyQueueDeliveries : QueueDelivery [ ] ;
1596+ schedulerDb : PluginDb ;
15751597} ) : Promise < void > {
1576- const { scenario, env, slackRuntime, getThreadRecord, readyQueueDeliveries } =
1577- args ;
1598+ const {
1599+ scenario,
1600+ env,
1601+ generateAssistantReply,
1602+ slackRuntime,
1603+ getThreadRecord,
1604+ readyQueueDeliveries,
1605+ schedulerDb,
1606+ } = args ;
15781607
15791608 const consumedOauthStates = new Set < string > ( ) ;
15801609 const consumedMcpAuthSessions = new Set < string > ( ) ;
@@ -1650,9 +1679,113 @@ async function processEvents(args: {
16501679 await slackRuntime . handleAssistantContextChanged ( lifecycleEvent ) ;
16511680 } ;
16521681
1682+ const runScheduledTaskDue = async (
1683+ event : ScheduledTaskDueEvent ,
1684+ ) : Promise < void > => {
1685+ const { thread } = getThreadRecord ( event . thread ) ;
1686+ const nowMs = event . now_ms ?? Date . parse ( "2026-05-26T12:00:00.000Z" ) ;
1687+ const scheduleKind = event . schedule_kind ?? "one_off" ;
1688+ const taskId = `eval_schedule_${ thread . channelId } _${ nowMs } ` ;
1689+ const task : ScheduledTask = {
1690+ id : taskId ,
1691+ createdAtMs : nowMs - 60_000 ,
1692+ createdBy : { slackUserId : "U-test" , userName : "testuser" } ,
1693+ destination : createEvalDestination (
1694+ thread ,
1695+ ) as ScheduledTask [ "destination" ] ,
1696+ nextRunAtMs : nowMs ,
1697+ schedule : {
1698+ description :
1699+ event . schedule ??
1700+ ( scheduleKind === "recurring" ? "Weekly at noon" : "Once now" ) ,
1701+ kind : scheduleKind ,
1702+ timezone : event . timezone ?? "UTC" ,
1703+ ...( scheduleKind === "recurring"
1704+ ? {
1705+ recurrence : {
1706+ frequency : event . recurrence ?? "weekly" ,
1707+ interval : 1 ,
1708+ startDate : new Date ( nowMs ) . toISOString ( ) . slice ( 0 , 10 ) ,
1709+ time : { hour : 12 , minute : 0 } ,
1710+ } ,
1711+ }
1712+ : { } ) ,
1713+ } ,
1714+ status : "active" ,
1715+ task : { text : event . task_text } ,
1716+ updatedAtMs : nowMs - 60_000 ,
1717+ } ;
1718+ const schedulerStore = createSchedulerSqlStore ( schedulerDb ) ;
1719+ await schedulerStore . saveTask ( task ) ;
1720+
1721+ const callbacks : DispatchCallback [ ] = [ ] ;
1722+ const expectedCallbackUrl = new URL (
1723+ "/api/internal/agent-dispatch" ,
1724+ process . env . JUNIOR_BASE_URL ,
1725+ ) . href ;
1726+ const originalFetch = globalThis . fetch ;
1727+ globalThis . fetch = ( async ( input , init ) => {
1728+ const url =
1729+ typeof input === "string"
1730+ ? input
1731+ : input instanceof URL
1732+ ? input . href
1733+ : input . url ;
1734+ if ( new URL ( url ) . href === expectedCallbackUrl ) {
1735+ const callback = await verifyDispatchCallbackRequest (
1736+ new Request ( input , init ) ,
1737+ ) ;
1738+ if ( ! callback ) {
1739+ return new Response ( "Unauthorized" , { status : 401 } ) ;
1740+ }
1741+ callbacks . push ( callback ) ;
1742+ return new Response ( "Accepted" , { status : 202 } ) ;
1743+ }
1744+ return await originalFetch ( input , init ) ;
1745+ } ) as typeof fetch ;
1746+ try {
1747+ await runPluginHeartbeats ( { nowMs } ) ;
1748+ } finally {
1749+ globalThis . fetch = originalFetch ;
1750+ }
1751+ if ( callbacks . length === 0 ) {
1752+ throw new Error (
1753+ "Scheduled eval task did not enqueue a dispatch callback." ,
1754+ ) ;
1755+ }
1756+
1757+ const dispatchedRuns = ( await schedulerStore . listIncompleteRuns ( ) ) . filter (
1758+ ( run ) => run . taskId === taskId && run . dispatchId ,
1759+ ) ;
1760+ if ( dispatchedRuns . length === 0 ) {
1761+ const runs = ( await schedulerStore . listIncompleteRuns ( ) ) . filter (
1762+ ( run ) => run . taskId === taskId ,
1763+ ) ;
1764+ const savedTask = await schedulerStore . getTask ( taskId ) ;
1765+ throw new Error (
1766+ `Scheduled eval task did not create a dispatch: ${ JSON . stringify ( { runs, savedTask } ) } ` ,
1767+ ) ;
1768+ }
1769+ for ( const run of dispatchedRuns ) {
1770+ const dispatch = await getDispatchRecord ( run . dispatchId ! ) ;
1771+ if ( ! dispatch ) {
1772+ throw new Error ( "Scheduled eval dispatch record was not found." ) ;
1773+ }
1774+ const callback = callbacks . find (
1775+ ( candidate ) => candidate . id === dispatch . id ,
1776+ ) ;
1777+ if ( ! callback ) {
1778+ throw new Error ( "Scheduled eval dispatch callback was not captured." ) ;
1779+ }
1780+ await runAgentDispatchSlice ( callback , { generateAssistantReply } ) ;
1781+ }
1782+ } ;
1783+
16531784 for ( const event of scenario . events ) {
16541785 if ( event . type === "new_mention" || event . type === "subscribed_message" ) {
16551786 enqueueEvent ( event ) ;
1787+ } else if ( event . type === "scheduled_task_due" ) {
1788+ await runScheduledTaskDue ( event ) ;
16561789 } else {
16571790 await runLifecycleEvent ( event ) ;
16581791 }
@@ -1803,6 +1936,11 @@ export async function runEvalScenario(
18031936 threadRecordsById ,
18041937 observations ,
18051938 ) ;
1939+ const generateEvalAssistantReply =
1940+ services . replyExecutor ?. generateAssistantReply ;
1941+ if ( ! generateEvalAssistantReply ) {
1942+ throw new Error ( "Eval reply executor was not configured." ) ;
1943+ }
18061944
18071945 const slackRuntime = createSlackRuntime ( {
18081946 getSlackAdapter : ( ) => slackAdapter as any ,
@@ -1812,9 +1950,11 @@ export async function runEvalScenario(
18121950 await processEvents ( {
18131951 scenario,
18141952 env,
1953+ generateAssistantReply : generateEvalAssistantReply ,
18151954 slackRuntime,
18161955 getThreadRecord,
18171956 readyQueueDeliveries,
1957+ schedulerDb : schedulerSql . db ,
18181958 } ) ;
18191959
18201960 return collectResults (
0 commit comments