diff --git a/src/server/events/execution_event_bus.ts b/src/server/events/execution_event_bus.ts index 0ffd9fd..6422de6 100644 --- a/src/server/events/execution_event_bus.ts +++ b/src/server/events/execution_event_bus.ts @@ -1,37 +1,93 @@ -import { EventEmitter } from 'events'; +import { Message, Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent } from "../../types.js"; -import { - Message, - Task, - TaskStatusUpdateEvent, - TaskArtifactUpdateEvent, -} from "../../types.js"; +export type AgentExecutionEvent = Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent; -export type AgentExecutionEvent = - | Message - | Task - | TaskStatusUpdateEvent - | TaskArtifactUpdateEvent; +export type EventListener = (event: AgentExecutionEvent) => void; export interface ExecutionEventBus { - publish(event: AgentExecutionEvent): void; - on(eventName: 'event' | 'finished', listener: (event: AgentExecutionEvent) => void): this; - off(eventName: 'event' | 'finished', listener: (event: AgentExecutionEvent) => void): this; - once(eventName: 'event' | 'finished', listener: (event: AgentExecutionEvent) => void): this; - removeAllListeners(eventName?: 'event' | 'finished'): this; - finished(): void; + publish(event: AgentExecutionEvent): void; + on(eventName: "event" | "finished", listener: EventListener): this; + off(eventName: "event" | "finished", listener: EventListener): this; + once(eventName: "event" | "finished", listener: EventListener): this; + removeAllListeners(eventName?: "event" | "finished"): this; + finished(): void; } -export class DefaultExecutionEventBus extends EventEmitter implements ExecutionEventBus { - constructor() { - super(); +export class DefaultExecutionEventBus implements ExecutionEventBus { + private eventListeners: Map> = new Map(); + + constructor() { + this.eventListeners.set("event", new Set()); + this.eventListeners.set("finished", new Set()); + } + + publish(event: AgentExecutionEvent): void { + const listeners = this.eventListeners.get("event"); + if (listeners) { + // Create a copy of listeners to avoid issues if listeners are modified during iteration + const listenersCopy = Array.from(listeners); + for (const listener of listenersCopy) { + try { + listener(event); + } catch (error) { + console.error("Error in event listener:", error); + } + } + } + } + + finished(): void { + // Emit finished event to 'finished' listeners + const finishedListeners = this.eventListeners.get("finished"); + if (finishedListeners) { + const listenersCopy = Array.from(finishedListeners); + for (const listener of listenersCopy) { + try { + // For finished event, we don't pass an event object + listener({} as AgentExecutionEvent); + } catch (error) { + console.error("Error in finished listener:", error); + } + } } + } - publish(event: AgentExecutionEvent): void { - this.emit('event', event); + on(eventName: "event" | "finished", listener: EventListener): this { + const listeners = this.eventListeners.get(eventName); + if (listeners) { + listeners.add(listener); } + return this; + } + + off(eventName: "event" | "finished", listener: EventListener): this { + const listeners = this.eventListeners.get(eventName); + if (listeners) { + listeners.delete(listener); + } + return this; + } + + once(eventName: "event" | "finished", listener: EventListener): this { + const onceWrapper = (event: AgentExecutionEvent) => { + listener(event); + this.off(eventName, onceWrapper); + }; + return this.on(eventName, onceWrapper); + } - finished(): void { - this.emit('finished'); + removeAllListeners(eventName?: "event" | "finished"): this { + if (eventName) { + const listeners = this.eventListeners.get(eventName); + if (listeners) { + listeners.clear(); + } + } else { + // Remove all listeners for all events + for (const listeners of this.eventListeners.values()) { + listeners.clear(); + } } + return this; + } } diff --git a/src/server/index.ts b/src/server/index.ts index 682d196..60db48e 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -6,7 +6,7 @@ export type { AgentExecutor } from "./agent_execution/agent_executor.js"; export { RequestContext } from "./agent_execution/request_context.js"; -export type { AgentExecutionEvent, ExecutionEventBus } from "./events/execution_event_bus.js"; +export type { AgentExecutionEvent, EventListener, ExecutionEventBus } from "./events/execution_event_bus.js"; export { DefaultExecutionEventBus } from "./events/execution_event_bus.js"; export type { ExecutionEventBusManager } from "./events/execution_event_bus_manager.js"; export { DefaultExecutionEventBusManager } from "./events/execution_event_bus_manager.js";