diff --git a/agentscope-harness/src/main/java/io/agentscope/harness/agent/gateway/WakeupDispatcher.java b/agentscope-harness/src/main/java/io/agentscope/harness/agent/gateway/WakeupDispatcher.java index d102f9106..04218fb97 100644 --- a/agentscope-harness/src/main/java/io/agentscope/harness/agent/gateway/WakeupDispatcher.java +++ b/agentscope-harness/src/main/java/io/agentscope/harness/agent/gateway/WakeupDispatcher.java @@ -70,11 +70,10 @@ public WakeupDispatcher(MessageBus messageBus, WakeupTarget target) { } /** - * Starts the dispatcher: performs an initial drain of any queued wakeups (to pick up signals - * produced while this process was down), then subscribes to the live signal channel. + * Starts the dispatcher: subscribes to the live signal channel, then performs an initial drain + * of any queued wakeups produced while this process was down. */ public void start() { - drainAndDispatch(); subscription = messageBus .subscribeWakeup() @@ -85,6 +84,7 @@ public void start() { "WakeupDispatcher subscription error; " + "dispatcher is dead", err)); + drainAndDispatch(); log.info("WakeupDispatcher started"); } @@ -100,14 +100,20 @@ public void close() { private void drainAndDispatch() { try { - List entries = - messageBus.queueDrain("agentscope:wakeups", MAX_DRAIN_COUNT).block(); - if (entries == null || entries.isEmpty()) { - return; - } - - for (BusEntry entry : entries) { - dispatch(entry.payload()); + while (true) { + List entries = + messageBus.queueDrain("agentscope:wakeups", MAX_DRAIN_COUNT).block(); + if (entries == null || entries.isEmpty()) { + return; + } + + for (BusEntry entry : entries) { + dispatch(entry.payload()); + } + + if (entries.size() < MAX_DRAIN_COUNT) { + return; + } } } catch (Exception e) { log.warn("WakeupDispatcher: drainAndDispatch failed", e); diff --git a/agentscope-harness/src/test/java/io/agentscope/harness/agent/gateway/WakeupDispatcherTest.java b/agentscope-harness/src/test/java/io/agentscope/harness/agent/gateway/WakeupDispatcherTest.java index a5777ded3..20ddf2329 100644 --- a/agentscope-harness/src/test/java/io/agentscope/harness/agent/gateway/WakeupDispatcherTest.java +++ b/agentscope-harness/src/test/java/io/agentscope/harness/agent/gateway/WakeupDispatcherTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; class WakeupDispatcherTest { @@ -115,6 +116,44 @@ void initialDrain_picksUpPreExisting() throws Exception { assertEquals("sess-pre", target.wokenSessions.get(0)); } + @Test + @DisplayName("Startup drain processes all pre-existing wakeups across multiple batches") + void initialDrain_processesMoreThanOneBatch() { + int wakeupCount = 65; + for (int i = 0; i < wakeupCount; i++) { + String sessionId = "sess-pre-" + i; + target.addKnown(sessionId); + messageBus + .queuePush( + "agentscope:wakeups", + Map.of( + "userId", "user-1", + "sessionId", sessionId, + "agentId", "agent-main")) + .block(); + } + + dispatcher.start(); + + assertEquals(wakeupCount, target.wokenSessions.size()); + } + + @Test + @DisplayName("Startup subscribes before draining the durable wakeup queue") + void startup_subscribesBeforeInitialDrain() { + String sessionId = "sess-startup-race"; + target.addKnown(sessionId); + messageBus = + new EnqueueOnSubscribeMessageBus( + new LocalFilesystem(tmpDir, true, 10), "/bus", sessionId); + dispatcher = new WakeupDispatcher(messageBus, target); + + dispatcher.start(); + + assertEquals(1, target.wokenSessions.size()); + assertEquals(sessionId, target.wokenSessions.get(0)); + } + // ------------------------------------------------------------------ // Test double // ------------------------------------------------------------------ @@ -157,4 +196,30 @@ boolean awaitWakeup(int expected, long timeout, TimeUnit unit) throws Interrupte return wakeupLatch.await(timeout, unit); } } + + private static final class EnqueueOnSubscribeMessageBus extends WorkspaceMessageBus { + + private final String sessionId; + + private EnqueueOnSubscribeMessageBus( + LocalFilesystem filesystem, String busRoot, String sessionId) { + super(filesystem, busRoot); + this.sessionId = sessionId; + } + + @Override + public Flux> subscribeWakeup() { + return Flux.defer( + () -> { + queuePush( + "agentscope:wakeups", + Map.of( + "userId", "user-1", + "sessionId", sessionId, + "agentId", "agent-main")) + .block(); + return Flux.never(); + }); + } + } }