Skip to content

Commit da2cac8

Browse files
nirinchev and blva authored
fix: don't wait for telemetry events MCP-179 (#521)
Co-authored-by: Bianca Lisle <[email protected]>
1 parent 14176ba commit da2cac8

File tree

8 files changed

+140
-74
lines changed

8 files changed

+140
-74
lines changed

src/common/connectionManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ export type TestConnectionManager = ConnectionManager & {
8282

8383
export abstract class ConnectionManager {
8484
protected clientName: string;
85-
protected readonly _events;
85+
protected readonly _events: EventEmitter<ConnectionManagerEvents>;
8686
readonly events: Pick<EventEmitter<ConnectionManagerEvents>, "on" | "off" | "once">;
8787
private state: AnyConnectionState;
8888

src/common/logger.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export const LogId = {
3535
telemetryMetadataError: mongoLogId(1_002_005),
3636
deviceIdResolutionError: mongoLogId(1_002_006),
3737
deviceIdTimeout: mongoLogId(1_002_007),
38+
telemetryClose: mongoLogId(1_002_008),
3839

3940
toolExecute: mongoLogId(1_003_001),
4041
toolExecuteFailure: mongoLogId(1_003_002),

src/server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ export class Server {
193193
}
194194
}
195195

196-
this.telemetry.emitEvents([event]).catch(() => {});
196+
this.telemetry.emitEvents([event]);
197197
}
198198

199199
private registerTools(): void {

src/telemetry/eventCache.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,18 @@ export class EventCache {
3434
}
3535

3636
/**
37-
* Gets a copy of the currently cached events
37+
* Gets the number of currently cached events
38+
*/
39+
public get size(): number {
40+
return this.cache.size;
41+
}
42+
43+
/**
44+
* Gets a copy of the currently cached events along with their ids
3845
* @returns Array of cached BaseEvent objects
3946
*/
40-
public getEvents(): BaseEvent[] {
41-
return Array.from(this.cache.values());
47+
public getEvents(): { id: number; event: BaseEvent }[] {
48+
return Array.from(this.cache.entries()).map(([id, event]) => ({ id, event }));
4249
}
4350

4451
/**
@@ -53,10 +60,11 @@ export class EventCache {
5360
}
5461

5562
/**
56-
* Clears all cached events
63+
* Removes cached events by their ids
5764
*/
58-
public clearEvents(): void {
59-
this.cache.clear();
60-
this.nextId = 0;
65+
public removeEvents(ids: number[]): void {
66+
for (const id of ids) {
67+
this.cache.delete(id);
68+
}
6169
}
6270
}

src/telemetry/telemetry.ts

Lines changed: 81 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,25 @@ import { MACHINE_METADATA } from "./constants.js";
77
import { EventCache } from "./eventCache.js";
88
import { detectContainerEnv } from "../helpers/container.js";
99
import type { DeviceId } from "../helpers/deviceId.js";
10+
import { EventEmitter } from "events";
1011

1112
type EventResult = {
1213
success: boolean;
1314
error?: Error;
1415
};
1516

17+
export interface TelemetryEvents {
18+
"events-emitted": [];
19+
"events-send-failed": [];
20+
"events-skipped": [];
21+
}
22+
1623
export class Telemetry {
1724
private isBufferingEvents: boolean = true;
1825
/** Resolves when the setup is complete or a timeout occurs */
1926
public setupPromise: Promise<[string, boolean]> | undefined;
27+
public readonly events: EventEmitter<TelemetryEvents> = new EventEmitter();
28+
2029
private eventCache: EventCache;
2130
private deviceId: DeviceId;
2231

@@ -57,6 +66,12 @@ export class Telemetry {
5766

5867
private async setup(): Promise<void> {
5968
if (!this.isTelemetryEnabled()) {
69+
this.session.logger.info({
70+
id: LogId.telemetryEmitFailure,
71+
context: "telemetry",
72+
message: "Telemetry is disabled.",
73+
noRedaction: true,
74+
});
6075
return;
6176
}
6277

@@ -71,34 +86,47 @@ export class Telemetry {
7186

7287
public async close(): Promise<void> {
7388
this.isBufferingEvents = false;
74-
await this.emitEvents(this.eventCache.getEvents());
89+
90+
this.session.logger.debug({
91+
id: LogId.telemetryClose,
92+
message: `Closing telemetry and flushing ${this.eventCache.size} events`,
93+
context: "telemetry",
94+
});
95+
96+
// Wait up to 5 seconds for events to be sent before closing, but don't throw if it times out
97+
const flushMaxWaitTime = 5000;
98+
let flushTimeout: NodeJS.Timeout | undefined;
99+
await Promise.race([
100+
new Promise<void>((resolve) => {
101+
flushTimeout = setTimeout(() => {
102+
this.session.logger.debug({
103+
id: LogId.telemetryClose,
104+
message: `Failed to flush remaining events within ${flushMaxWaitTime}ms timeout`,
105+
context: "telemetry",
106+
});
107+
resolve();
108+
}, flushMaxWaitTime);
109+
flushTimeout.unref();
110+
}),
111+
this.emit([]),
112+
]);
113+
114+
clearTimeout(flushTimeout);
75115
}
76116

77117
/**
78118
* Emits events through the telemetry pipeline
79119
* @param events - The events to emit
80120
*/
81-
public async emitEvents(events: BaseEvent[]): Promise<void> {
82-
try {
83-
if (!this.isTelemetryEnabled()) {
84-
this.session.logger.info({
85-
id: LogId.telemetryEmitFailure,
86-
context: "telemetry",
87-
message: "Telemetry is disabled.",
88-
noRedaction: true,
89-
});
90-
return;
91-
}
92-
93-
await this.emit(events);
94-
} catch {
95-
this.session.logger.debug({
96-
id: LogId.telemetryEmitFailure,
97-
context: "telemetry",
98-
message: "Error emitting telemetry events.",
99-
noRedaction: true,
100-
});
121+
public emitEvents(events: BaseEvent[]): void {
122+
if (!this.isTelemetryEnabled()) {
123+
this.events.emit("events-skipped");
124+
return;
101125
}
126+
127+
// Don't wait for events to be sent - we should not block regular server
128+
// operations on telemetry
129+
void this.emit(events);
102130
}
103131

104132
/**
@@ -144,32 +172,44 @@ export class Telemetry {
144172
return;
145173
}
146174

147-
const cachedEvents = this.eventCache.getEvents();
148-
const allEvents = [...cachedEvents, ...events];
175+
try {
176+
const cachedEvents = this.eventCache.getEvents();
177+
const allEvents = [...cachedEvents.map((e) => e.event), ...events];
149178

150-
this.session.logger.debug({
151-
id: LogId.telemetryEmitStart,
152-
context: "telemetry",
153-
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
154-
});
179+
this.session.logger.debug({
180+
id: LogId.telemetryEmitStart,
181+
context: "telemetry",
182+
message: `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)`,
183+
});
184+
185+
const result = await this.sendEvents(this.session.apiClient, allEvents);
186+
if (result.success) {
187+
this.eventCache.removeEvents(cachedEvents.map((e) => e.id));
188+
this.session.logger.debug({
189+
id: LogId.telemetryEmitSuccess,
190+
context: "telemetry",
191+
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents)}`,
192+
});
193+
this.events.emit("events-emitted");
194+
return;
195+
}
155196

156-
const result = await this.sendEvents(this.session.apiClient, allEvents);
157-
if (result.success) {
158-
this.eventCache.clearEvents();
159197
this.session.logger.debug({
160-
id: LogId.telemetryEmitSuccess,
198+
id: LogId.telemetryEmitFailure,
161199
context: "telemetry",
162-
message: `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}`,
200+
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
163201
});
164-
return;
202+
this.eventCache.appendEvents(events);
203+
this.events.emit("events-send-failed");
204+
} catch (error) {
205+
this.session.logger.debug({
206+
id: LogId.telemetryEmitFailure,
207+
context: "telemetry",
208+
message: `Error emitting telemetry events: ${error instanceof Error ? error.message : String(error)}`,
209+
noRedaction: true,
210+
});
211+
this.events.emit("events-send-failed");
165212
}
166-
167-
this.session.logger.debug({
168-
id: LogId.telemetryEmitFailure,
169-
context: "telemetry",
170-
message: `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}`,
171-
});
172-
this.eventCache.appendEvents(events);
173213
}
174214

175215
/**

src/tools/tool.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,14 @@ export abstract class ToolBase {
8282
});
8383

8484
const result = await this.execute(...args);
85-
await this.emitToolEvent(startTime, result, ...args).catch(() => {});
85+
this.emitToolEvent(startTime, result, ...args);
86+
87+
this.session.logger.debug({
88+
id: LogId.toolExecute,
89+
context: "tool",
90+
message: `Executed tool ${this.name}`,
91+
noRedaction: true,
92+
});
8693
return result;
8794
} catch (error: unknown) {
8895
this.session.logger.error({
@@ -91,7 +98,7 @@ export abstract class ToolBase {
9198
message: `Error executing ${this.name}: ${error as string}`,
9299
});
93100
const toolResult = await this.handleError(error, args[0] as ToolArgs<typeof this.argsShape>);
94-
await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {});
101+
this.emitToolEvent(startTime, toolResult, ...args);
95102
return toolResult;
96103
}
97104
};
@@ -200,11 +207,11 @@ export abstract class ToolBase {
200207
* @param result - Whether the command succeeded or failed
201208
* @param args - The arguments passed to the tool
202209
*/
203-
private async emitToolEvent(
210+
private emitToolEvent(
204211
startTime: number,
205212
result: CallToolResult,
206213
...args: Parameters<ToolCallback<typeof this.argsShape>>
207-
): Promise<void> {
214+
): void {
208215
if (!this.telemetry.isTelemetryEnabled()) {
209216
return;
210217
}
@@ -230,7 +237,7 @@ export abstract class ToolBase {
230237
event.properties.project_id = metadata.projectId;
231238
}
232239

233-
await this.telemetry.emitEvents([event]);
240+
this.telemetry.emitEvents([event]);
234241
}
235242
}
236243

src/transports/streamableHttp.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
124124
// eslint-disable-next-line @typescript-eslint/no-misused-promises
125125
keepAliveLoop = setInterval(async () => {
126126
try {
127-
this.logger.debug({
127+
server.session.logger.debug({
128128
id: LogId.streamableHttpTransportKeepAlive,
129129
context: "streamableHttpTransport",
130130
message: "Sending ping",
@@ -138,7 +138,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
138138
} catch (err) {
139139
try {
140140
failedPings++;
141-
this.logger.warning({
141+
server.session.logger.warning({
142142
id: LogId.streamableHttpTransportKeepAliveFailure,
143143
context: "streamableHttpTransport",
144144
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
@@ -162,7 +162,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
162162
this.logger.error({
163163
id: LogId.streamableHttpTransportSessionCloseFailure,
164164
context: "streamableHttpTransport",
165-
message: `Error closing session: ${error instanceof Error ? error.message : String(error)}`,
165+
message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
166166
});
167167
}
168168
},

0 commit comments

Comments
 (0)