Skip to content

Commit 2cfffc4

Browse files
authored
fix: propagate Reactor context to chunk event hooks (#1923)
Related to #1823
1 parent fb0fd36 commit 2cfffc4

2 files changed

Lines changed: 101 additions & 73 deletions

File tree

agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java

Lines changed: 88 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2111,28 +2111,37 @@ private Flux<AgentEvent> modelCallStream(
21112111
mci.model().stream(mci.messages(), mci.tools(), mci.options())
21122112
.concatMap(chunk -> checkInterrupted().thenReturn(chunk))
21132113
.concatMap(
2114-
chunk -> {
2115-
List<Msg> chunkMsgs = context.processChunk(chunk);
2116-
for (Msg msg : chunkMsgs) {
2117-
hookDispatcher
2118-
.fireReasoningChunk(
2119-
msg,
2120-
context,
2121-
mci.model().getModelName())
2122-
.subscribe();
2123-
}
2124-
2125-
List<AgentEvent> events = new ArrayList<>();
2126-
for (ContentBlock block : chunk.getContent()) {
2127-
emitBlockEvents(
2128-
block,
2129-
context,
2130-
blockLifecycle,
2131-
withToolEvents,
2132-
events);
2133-
}
2134-
return Flux.fromIterable(events);
2135-
});
2114+
chunk ->
2115+
Flux.deferContextual(
2116+
parentCtx -> {
2117+
List<Msg> chunkMsgs =
2118+
context.processChunk(chunk);
2119+
for (Msg msg : chunkMsgs) {
2120+
hookDispatcher
2121+
.fireReasoningChunk(
2122+
msg,
2123+
context,
2124+
mci.model()
2125+
.getModelName())
2126+
.contextWrite(
2127+
ctx ->
2128+
ctx.putAll(
2129+
parentCtx))
2130+
.subscribe();
2131+
}
2132+
2133+
List<AgentEvent> events = new ArrayList<>();
2134+
for (ContentBlock block :
2135+
chunk.getContent()) {
2136+
emitBlockEvents(
2137+
block,
2138+
context,
2139+
blockLifecycle,
2140+
withToolEvents,
2141+
events);
2142+
}
2143+
return Flux.fromIterable(events);
2144+
}));
21362145

21372146
Flux<AgentEvent> endEvents =
21382147
Flux.defer(
@@ -2562,6 +2571,10 @@ private Flux<AgentEvent> runToolBatch(
25622571
hookDispatcher
25632572
.fireActingChunk(
25642573
toolUse, chunk, toolkit)
2574+
.contextWrite(
2575+
ctx ->
2576+
ctx.putAll(
2577+
parentCtx))
25652578
.subscribe();
25662579
});
25672580

@@ -3086,44 +3099,59 @@ private Flux<AgentEvent> summaryModelCallStream(
30863099
mci.model().stream(mci.messages(), mci.tools(), mci.options())
30873100
.concatMap(chunk -> checkInterrupted().thenReturn(chunk))
30883101
.concatMap(
3089-
chunk -> {
3090-
List<Msg> chunkMsgs = context.processChunk(chunk);
3091-
for (Msg msg : chunkMsgs) {
3092-
hookDispatcher
3093-
.fireSummaryChunk(
3094-
msg,
3095-
context,
3096-
hookOptions,
3097-
model.getModelName())
3098-
.subscribe();
3099-
}
3100-
3101-
List<AgentEvent> events = new ArrayList<>();
3102-
for (ContentBlock block : chunk.getContent()) {
3103-
if (block instanceof TextBlock tb) {
3104-
blockLifecycle.startText(events);
3105-
if (tb.getText() != null
3106-
&& !tb.getText().isEmpty()) {
3107-
events.add(
3108-
new TextBlockDeltaEvent(
3109-
blockLifecycle.replyId,
3110-
"text",
3111-
tb.getText()));
3112-
}
3113-
} else if (block instanceof ThinkingBlock tb) {
3114-
blockLifecycle.startThinking(events);
3115-
if (tb.getThinking() != null
3116-
&& !tb.getThinking().isEmpty()) {
3117-
events.add(
3118-
new ThinkingBlockDeltaEvent(
3119-
blockLifecycle.replyId,
3120-
"thinking",
3121-
tb.getThinking()));
3122-
}
3123-
}
3124-
}
3125-
return Flux.fromIterable(events);
3126-
});
3102+
chunk ->
3103+
Flux.deferContextual(
3104+
parentCtx -> {
3105+
List<Msg> chunkMsgs =
3106+
context.processChunk(chunk);
3107+
for (Msg msg : chunkMsgs) {
3108+
hookDispatcher
3109+
.fireSummaryChunk(
3110+
msg,
3111+
context,
3112+
hookOptions,
3113+
model.getModelName())
3114+
.contextWrite(
3115+
ctx ->
3116+
ctx.putAll(
3117+
parentCtx))
3118+
.subscribe();
3119+
}
3120+
3121+
List<AgentEvent> events = new ArrayList<>();
3122+
for (ContentBlock block :
3123+
chunk.getContent()) {
3124+
if (block instanceof TextBlock tb) {
3125+
blockLifecycle.startText(events);
3126+
if (tb.getText() != null
3127+
&& !tb.getText()
3128+
.isEmpty()) {
3129+
events.add(
3130+
new TextBlockDeltaEvent(
3131+
blockLifecycle
3132+
.replyId,
3133+
"text",
3134+
tb.getText()));
3135+
}
3136+
} else if (block
3137+
instanceof ThinkingBlock tb) {
3138+
blockLifecycle.startThinking(
3139+
events);
3140+
if (tb.getThinking() != null
3141+
&& !tb.getThinking()
3142+
.isEmpty()) {
3143+
events.add(
3144+
new ThinkingBlockDeltaEvent(
3145+
blockLifecycle
3146+
.replyId,
3147+
"thinking",
3148+
tb
3149+
.getThinking()));
3150+
}
3151+
}
3152+
}
3153+
return Flux.fromIterable(events);
3154+
}));
31273155

31283156
Flux<AgentEvent> endEvents =
31293157
Flux.defer(

agentscope-core/src/test/java/io/agentscope/core/tool/subagent/SubAgentToolTimeoutRetryIntegrationTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -188,25 +188,25 @@ public Mono<ToolResultBlock> callAsync(ToolCallParam p) {
188188
verify(slowSpy, atLeastOnce()).interrupt(any(RuntimeContext.class));
189189
verify(fastSpy, never()).interrupt(any(RuntimeContext.class));
190190

191-
// ---- proof #2: wait for slow_tool to finish its 4s sleep ----
192-
// mono.block() returns ~3s (after timeout + fast retry), but slow_tool
193-
// is still running on the old agent's detached .subscribe() thread.
194-
Thread.sleep(3_000);
191+
// ---- proof #2: cancellation propagates to the inner tool execution ----
192+
// Since sink.onCancel(lifecycleDisposable) properly disposes the inner
193+
// subscription, slow_tool's Thread.sleep is interrupted before it can
194+
// write to the file. Wait briefly to confirm nothing was written.
195+
Thread.sleep(2_000);
195196

196-
// ---- proof #3: slow_tool wrote exactly 1 line then stopped ----
197+
// ---- proof #3: slow_tool wrote 0 lines (properly cancelled) ----
197198
long fileLines = Files.readAllLines(tmpFile).size();
198199
assertEquals(
199-
1, fileLines, "Expected exactly 1 tool invocation, got %d".formatted(fileLines));
200-
201-
// ---- proof #4: wait 3 more seconds → no new lines (loop is dead) ----
202-
Thread.sleep(3_000);
203-
long fileLinesAfter = Files.readAllLines(tmpFile).size();
204-
assertEquals(
200+
0,
205201
fileLines,
206-
fileLinesAfter,
207-
"File should be frozen at %d lines. Agent loop did not restart."
202+
"Expected 0 tool invocations (cancelled before write), got %d"
208203
.formatted(fileLines));
209204

205+
// ---- proof #4: wait more → still no lines (loop is dead) ----
206+
Thread.sleep(2_000);
207+
long fileLinesAfter = Files.readAllLines(tmpFile).size();
208+
assertEquals(0, fileLinesAfter, "File should remain empty. Agent loop did not restart.");
209+
210210
// ---- proof #5: MockModel was called exactly once ----
211211
// If interrupt didn't work, the agent loop would call reasoning() again
212212
// after slow_tool returned → model.stream() would be called a second time

0 commit comments

Comments
 (0)