Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ public WakeupDispatcher(MessageBus messageBus, WakeupTarget target) {
}

/**
* Starts the dispatcher: performs an initial drain of any queued wakeups (to pick up signals
* produced while this process was down), then subscribes to the live signal channel.
* Starts the dispatcher: subscribes to the live signal channel, then performs an initial drain
* of any queued wakeups produced while this process was down.
*/
public void start() {
drainAndDispatch();
subscription =
messageBus
.subscribeWakeup()
Expand All @@ -85,6 +84,7 @@ public void start() {
"WakeupDispatcher subscription error; "
+ "dispatcher is dead",
err));
drainAndDispatch();
log.info("WakeupDispatcher started");
}

Expand All @@ -100,14 +100,20 @@ public void close() {

private void drainAndDispatch() {
try {
List<BusEntry> entries =
messageBus.queueDrain("agentscope:wakeups", MAX_DRAIN_COUNT).block();
if (entries == null || entries.isEmpty()) {
return;
}

for (BusEntry entry : entries) {
dispatch(entry.payload());
while (true) {
List<BusEntry> entries =
messageBus.queueDrain("agentscope:wakeups", MAX_DRAIN_COUNT).block();
if (entries == null || entries.isEmpty()) {
return;
}

for (BusEntry entry : entries) {
dispatch(entry.payload());
}

if (entries.size() < MAX_DRAIN_COUNT) {
return;
}
}
} catch (Exception e) {
log.warn("WakeupDispatcher: drainAndDispatch failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class WakeupDispatcherTest {
Expand Down Expand Up @@ -115,6 +116,44 @@ void initialDrain_picksUpPreExisting() throws Exception {
assertEquals("sess-pre", target.wokenSessions.get(0));
}

@Test
@DisplayName("Startup drain processes all pre-existing wakeups across multiple batches")
void initialDrain_processesMoreThanOneBatch() {
int wakeupCount = 65;
for (int i = 0; i < wakeupCount; i++) {
String sessionId = "sess-pre-" + i;
target.addKnown(sessionId);
messageBus
.queuePush(
"agentscope:wakeups",
Map.of(
"userId", "user-1",
"sessionId", sessionId,
"agentId", "agent-main"))
.block();
}

dispatcher.start();

assertEquals(wakeupCount, target.wokenSessions.size());
}

@Test
@DisplayName("Startup subscribes before draining the durable wakeup queue")
void startup_subscribesBeforeInitialDrain() {
String sessionId = "sess-startup-race";
target.addKnown(sessionId);
messageBus =
new EnqueueOnSubscribeMessageBus(
new LocalFilesystem(tmpDir, true, 10), "/bus", sessionId);
dispatcher = new WakeupDispatcher(messageBus, target);

dispatcher.start();

assertEquals(1, target.wokenSessions.size());
assertEquals(sessionId, target.wokenSessions.get(0));
}

// ------------------------------------------------------------------
// Test double
// ------------------------------------------------------------------
Expand Down Expand Up @@ -157,4 +196,30 @@ boolean awaitWakeup(int expected, long timeout, TimeUnit unit) throws Interrupte
return wakeupLatch.await(timeout, unit);
}
}

private static final class EnqueueOnSubscribeMessageBus extends WorkspaceMessageBus {

private final String sessionId;

private EnqueueOnSubscribeMessageBus(
LocalFilesystem filesystem, String busRoot, String sessionId) {
super(filesystem, busRoot);
this.sessionId = sessionId;
}

@Override
public Flux<Map<String, Object>> subscribeWakeup() {
return Flux.defer(
() -> {
queuePush(
"agentscope:wakeups",
Map.of(
"userId", "user-1",
"sessionId", sessionId,
"agentId", "agent-main"))
.block();
return Flux.never();
});
}
}
}
Loading