Skip to content

Commit b752e68

Browse files
partick33chickenlj
andauthored
fix(agent): resolve serializeOnKey gate leak in Flux.create callbacks (#1796)
Closes #1798 Co-authored-by: Chickenlj <ken.lj.hz@gmail.com>
1 parent dbc82fe commit b752e68

3 files changed

Lines changed: 172 additions & 85 deletions

File tree

agentscope-core/src/main/java/io/agentscope/core/ReActAgent.java

Lines changed: 73 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@
140140
import java.util.stream.IntStream;
141141
import org.slf4j.Logger;
142142
import org.slf4j.LoggerFactory;
143+
import reactor.core.Disposable;
143144
import reactor.core.publisher.Flux;
144145
import reactor.core.publisher.FluxSink;
145146
import reactor.core.publisher.Mono;
@@ -846,28 +847,33 @@ private Flux<AgentEvent> buildAgentStream(
846847
// filtered out by callInternal before reaching the caller.
847848
boolean isSubagentBusPath =
848849
subscriberCtx.hasKey(SubagentEventBus.CONTEXT_KEY);
849-
lifecycle
850-
.contextWrite(c -> c.put(EVENT_SINK_KEY, sink))
851-
.contextWrite(
852-
c ->
853-
isSubagentBusPath
854-
? c
855-
: c.put(
856-
AgentEventEmitter
857-
.CONTEXT_KEY,
858-
(AgentEventEmitter)
859-
sink::next))
860-
.doFinally(
861-
signal -> {
862-
sink.next(new AgentEndEvent(replyId));
863-
sink.complete();
864-
})
865-
.contextWrite(subscriberCtx)
866-
.subscribe(
867-
finalMsg ->
868-
sink.next(
869-
new AgentResultEvent(finalMsg)),
870-
sink::error);
850+
Disposable lifecycleDisposable =
851+
lifecycle
852+
.contextWrite(c -> c.put(EVENT_SINK_KEY, sink))
853+
.contextWrite(
854+
c ->
855+
isSubagentBusPath
856+
? c
857+
: c.put(
858+
AgentEventEmitter
859+
.CONTEXT_KEY,
860+
(AgentEventEmitter)
861+
sink
862+
::next))
863+
.doFinally(
864+
signal -> {
865+
sink.next(
866+
new AgentEndEvent(replyId));
867+
sink.complete();
868+
})
869+
.contextWrite(subscriberCtx)
870+
.subscribe(
871+
finalMsg ->
872+
sink.next(
873+
new AgentResultEvent(
874+
finalMsg)),
875+
sink::error);
876+
sink.onCancel(lifecycleDisposable);
871877
},
872878
FluxSink.OverflowStrategy.BUFFER);
873879
return MiddlewareChain.build(middlewares, this, context, MiddlewareBase::onAgent, core)
@@ -2559,44 +2565,53 @@ private Flux<AgentEvent> runToolBatch(
25592565
.subscribe();
25602566
});
25612567

2562-
executeToolCalls(approved)
2563-
.contextWrite(ctx -> ctx.putAll(parentCtx))
2564-
.subscribe(
2565-
results -> {
2566-
List<
2567-
Map.Entry<
2568+
Disposable toolCallsDisposable =
2569+
executeToolCalls(approved)
2570+
.contextWrite(
2571+
ctx ->
2572+
ctx.putAll(
2573+
parentCtx))
2574+
.subscribe(
2575+
results -> {
2576+
List<
2577+
Map
2578+
.Entry<
2579+
ToolUseBlock,
2580+
ToolResultBlock>>
2581+
merged =
2582+
new ArrayList<>(
2583+
deniedEntries);
2584+
merged.addAll(results);
2585+
resultHolder.set(
2586+
merged);
2587+
for (Map.Entry<
25682588
ToolUseBlock,
2569-
ToolResultBlock>>
2570-
merged =
2571-
new ArrayList<>(
2572-
deniedEntries);
2573-
merged.addAll(results);
2574-
resultHolder.set(merged);
2575-
for (Map.Entry<
2576-
ToolUseBlock,
2577-
ToolResultBlock>
2578-
entry : results) {
2579-
emitToolResultDelta(
2580-
sink,
2581-
replyId,
2582-
entry,
2583-
chunkedToolIds);
2584-
ToolResultState state =
2585-
determineToolResultState(
2586-
entry
2587-
.getValue());
2588-
sink.next(
2589-
new ToolResultEndEvent(
2589+
ToolResultBlock>
2590+
entry :
2591+
results) {
2592+
emitToolResultDelta(
2593+
sink,
25902594
replyId,
2591-
entry.getKey()
2592-
.getId(),
2593-
entry.getKey()
2594-
.getName(),
2595-
state));
2596-
}
2597-
sink.complete();
2598-
},
2599-
sink::error);
2595+
entry,
2596+
chunkedToolIds);
2597+
ToolResultState
2598+
state =
2599+
determineToolResultState(
2600+
entry
2601+
.getValue());
2602+
sink.next(
2603+
new ToolResultEndEvent(
2604+
replyId,
2605+
entry.getKey()
2606+
.getId(),
2607+
entry.getKey()
2608+
.getName(),
2609+
state));
2610+
}
2611+
sink.complete();
2612+
},
2613+
sink::error);
2614+
sink.onCancel(toolCallsDisposable);
26002615
}));
26012616

26022617
return deniedEvents.concatWith(approvedEvents);

agentscope-core/src/main/java/io/agentscope/core/agent/AgentBase.java

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.atomic.AtomicBoolean;
3939
import java.util.function.Function;
4040
import java.util.function.Supplier;
41+
import reactor.core.Disposable;
4142
import reactor.core.publisher.Flux;
4243
import reactor.core.publisher.FluxSink;
4344
import reactor.core.publisher.Mono;
@@ -996,33 +997,37 @@ private Flux<Event> createEventStream(StreamOptions options, Supplier<Mono<Msg>>
996997

997998
// Use Mono.defer to ensure trace context propagation
998999
// while maintaining streaming hook functionality
999-
Mono.defer(() -> callSupplier.get())
1000-
.contextWrite(
1001-
context ->
1002-
context.put(
1003-
SubagentEventBus
1004-
.CONTEXT_KEY,
1005-
bus)
1006-
.putAll(ctxView))
1007-
.doFinally(
1008-
signalType -> {
1009-
// Remove temporary hook
1010-
hooks.remove(streamingHook);
1011-
})
1012-
.subscribe(
1013-
finalMsg -> {
1014-
if (options.shouldStream(
1015-
EventType.AGENT_RESULT)) {
1016-
sink.next(
1017-
new Event(
1018-
EventType
1019-
.AGENT_RESULT,
1020-
finalMsg,
1021-
true));
1022-
}
1023-
},
1024-
sink::error,
1025-
sink::complete);
1000+
Disposable callDisposable =
1001+
Mono.defer(() -> callSupplier.get())
1002+
.contextWrite(
1003+
context ->
1004+
context.put(
1005+
SubagentEventBus
1006+
.CONTEXT_KEY,
1007+
bus)
1008+
.putAll(
1009+
ctxView))
1010+
.doFinally(
1011+
signalType -> {
1012+
// Remove temporary hook
1013+
hooks.remove(streamingHook);
1014+
})
1015+
.subscribe(
1016+
finalMsg -> {
1017+
if (options.shouldStream(
1018+
EventType
1019+
.AGENT_RESULT)) {
1020+
sink.next(
1021+
new Event(
1022+
EventType
1023+
.AGENT_RESULT,
1024+
finalMsg,
1025+
true));
1026+
}
1027+
},
1028+
sink::error,
1029+
sink::complete);
1030+
sink.onCancel(callDisposable);
10261031
},
10271032
FluxSink.OverflowStrategy.BUFFER)
10281033
.publishOn(Schedulers.boundedElastic()));

0 commit comments

Comments
 (0)