Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 81 additions & 25 deletions src/server/events/execution_event_bus.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,93 @@
import { EventEmitter } from 'events';
import { Message, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent } from "../../types.js";

import {
Message,
Task,
TaskStatusUpdateEvent,
TaskArtifactUpdateEvent,
} from "../../types.js";
export type AgentExecutionEvent = Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent;

export type AgentExecutionEvent =
| Message
| Task
| TaskStatusUpdateEvent
| TaskArtifactUpdateEvent;
export type EventListener = (event: AgentExecutionEvent) => void;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

To safely handle events that don't have a payload, like the finished event, the EventListener type should be updated to make the event parameter optional. This prevents type-safety issues when dispatching parameter-less events and makes the API more robust.

Suggested change
export type EventListener = (event: AgentExecutionEvent) => void;
export type EventListener = (event?: AgentExecutionEvent) => void;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


export interface ExecutionEventBus {
publish(event: AgentExecutionEvent): void;
on(eventName: 'event' | 'finished', listener: (event: AgentExecutionEvent) => void): this;
off(eventName: 'event' | 'finished', listener: (event: AgentExecutionEvent) => void): this;
once(eventName: 'event' | 'finished', listener: (event: AgentExecutionEvent) => void): this;
removeAllListeners(eventName?: 'event' | 'finished'): this;
finished(): void;
publish(event: AgentExecutionEvent): void;
on(eventName: "event" | "finished", listener: EventListener): this;
off(eventName: "event" | "finished", listener: EventListener): this;
once(eventName: "event" | "finished", listener: EventListener): this;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this once method. I don't this is used.

removeAllListeners(eventName?: "event" | "finished"): this;
finished(): void;
}

export class DefaultExecutionEventBus extends EventEmitter implements ExecutionEventBus {
constructor() {
super();
export class DefaultExecutionEventBus implements ExecutionEventBus {
private eventListeners: Map<string, Set<EventListener>> = new Map();

constructor() {
this.eventListeners.set("event", new Set());
this.eventListeners.set("finished", new Set());
}

publish(event: AgentExecutionEvent): void {
const listeners = this.eventListeners.get("event");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following code is copied across both publish & finished. Can we just encapsulate this as _emit().

if (listeners) {
// Create a copy of listeners to avoid issues if listeners are modified during iteration
const listenersCopy = Array.from(listeners);
for (const listener of listenersCopy) {
try {
listener(event);
} catch (error) {
console.error("Error in event listener:", error);
}
}
}
}

finished(): void {
// Emit finished event to 'finished' listeners
const finishedListeners = this.eventListeners.get("finished");
if (finishedListeners) {
const listenersCopy = Array.from(finishedListeners);
for (const listener of listenersCopy) {
try {
// For finished event, we don't pass an event object
listener({} as AgentExecutionEvent);
Comment on lines +46 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The implementation of the finished event is unsafe and its behavior has subtly changed. The listener is called with {} as AgentExecutionEvent, which is not a valid AgentExecutionEvent and will cause runtime errors for listeners that inspect the event object. The kind property, present on all types within AgentExecutionEvent, will be undefined.

The previous EventEmitter implementation would call the listener with no arguments (i.e., undefined for the first parameter). Calling with an empty object {} changes this behavior, which could break listeners that check for the event's existence (e.g., if (event)).

To maintain previous behavior and avoid unsafe type casting, it's better to call the listener without arguments. This requires a cast to any to bypass the strict type signature, which also highlights a potential design issue in the ExecutionEventBus interface where finished listeners are expected to receive an AgentExecutionEvent.

Suggested change
// For finished event, we don't pass an event object
listener({} as AgentExecutionEvent);
// For finished event, we don't pass an event object, matching previous EventEmitter behavior.
(listener as any)();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As suggested, lets make the event attribute as optional.

Comment on lines +46 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Casting an empty object {} to AgentExecutionEvent is not type-safe and can lead to runtime errors in consumers of this event bus, as it lacks the kind discriminator property. The finished event is a signal and should not carry a payload. In conjunction with making the event parameter optional in EventListener, this should be changed to call the listener without any arguments.

Suggested change
// For finished event, we don't pass an event object
listener({} as AgentExecutionEvent);
// For finished event, we don't pass an event object
listener();

} catch (error) {
console.error("Error in finished listener:", error);
}
}
}
}

publish(event: AgentExecutionEvent): void {
this.emit('event', event);
on(eventName: "event" | "finished", listener: EventListener): this {
const listeners = this.eventListeners.get(eventName);
if (listeners) {
listeners.add(listener);
}
return this;
}

off(eventName: "event" | "finished", listener: EventListener): this {
const listeners = this.eventListeners.get(eventName);
if (listeners) {
listeners.delete(listener);
}
return this;
}

once(eventName: "event" | "finished", listener: EventListener): this {
const onceWrapper = (event: AgentExecutionEvent) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

To align with the proposed change to EventListener (making the event parameter optional), the onceWrapper function should also handle an optional event argument to maintain type consistency across the event bus implementation.

Suggested change
const onceWrapper = (event: AgentExecutionEvent) => {
const onceWrapper = (event?: AgentExecutionEvent) => {

listener(event);
this.off(eventName, onceWrapper);
};
return this.on(eventName, onceWrapper);
}

finished(): void {
this.emit('finished');
removeAllListeners(eventName?: "event" | "finished"): this {
if (eventName) {
const listeners = this.eventListeners.get(eventName);
if (listeners) {
listeners.clear();
}
} else {
// Remove all listeners for all events
for (const listeners of this.eventListeners.values()) {
listeners.clear();
}
}
return this;
}
}
2 changes: 1 addition & 1 deletion src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
export type { AgentExecutor } from "./agent_execution/agent_executor.js";
export { RequestContext } from "./agent_execution/request_context.js";

export type { AgentExecutionEvent, ExecutionEventBus } from "./events/execution_event_bus.js";
export type { AgentExecutionEvent, EventListener, ExecutionEventBus } from "./events/execution_event_bus.js";
export { DefaultExecutionEventBus } from "./events/execution_event_bus.js";
export type { ExecutionEventBusManager } from "./events/execution_event_bus_manager.js";
export { DefaultExecutionEventBusManager } from "./events/execution_event_bus_manager.js";
Expand Down