Skip to content

Commit 13e896f

Browse files
authored
delete logger from map on error (#548)
Signed-off-by: Nik Nasr <[email protected]>
1 parent fbb8197 commit 13e896f

File tree

1 file changed

+175
-169
lines changed
  • packages/restate-sdk/src/endpoint/handlers

1 file changed

+175
-169
lines changed

packages/restate-sdk/src/endpoint/handlers/generic.ts

Lines changed: 175 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -231,187 +231,193 @@ export class GenericHandler implements RestateHandler {
231231
): Promise<RestateResponse> {
232232
const loggerId = Math.floor(Math.random() * 4_294_967_295 /* u32::MAX */);
233233

234-
// Instantiate core vm and prepare response headers
235-
const vmHeaders = Object.entries(headers)
236-
.filter(([, v]) => v !== undefined)
237-
.map(
238-
([k, v]) =>
239-
new vm.WasmHeader(k, v instanceof Array ? v[0] : (v as string))
234+
try {
235+
// Instantiate core vm and prepare response headers
236+
const vmHeaders = Object.entries(headers)
237+
.filter(([, v]) => v !== undefined)
238+
.map(
239+
([k, v]) =>
240+
new vm.WasmHeader(k, v instanceof Array ? v[0] : (v as string))
241+
);
242+
const coreVm = new vm.WasmVM(
243+
vmHeaders,
244+
restateLogLevelToWasmLogLevel(DEFAULT_CONSOLE_LOGGER_LOG_LEVEL),
245+
loggerId
246+
);
247+
const responseHead = coreVm.get_response_head();
248+
const responseHeaders = responseHead.headers.reduce(
249+
(headers, { key, value }) => ({
250+
[key]: value,
251+
...headers,
252+
}),
253+
{
254+
"x-restate-server": X_RESTATE_SERVER,
255+
}
240256
);
241-
const coreVm = new vm.WasmVM(
242-
vmHeaders,
243-
restateLogLevelToWasmLogLevel(DEFAULT_CONSOLE_LOGGER_LOG_LEVEL),
244-
loggerId
245-
);
246-
const responseHead = coreVm.get_response_head();
247-
const responseHeaders = responseHead.headers.reduce(
248-
(headers, { key, value }) => ({
249-
[key]: value,
250-
...headers,
251-
}),
252-
{
253-
"x-restate-server": X_RESTATE_SERVER,
254-
}
255-
);
256257

257-
// Use a default logger that still respects the endpoint custom logger
258-
// We will override this later with a logger that has a LoggerContext
259-
// See vm_log below for more details
260-
invocationLoggers.set(
261-
loggerId,
262-
createLogger(
263-
this.endpoint.loggerTransport,
264-
LogSource.JOURNAL,
265-
undefined,
266-
() => false
267-
)
268-
);
258+
// Use a default logger that still respects the endpoint custom logger
259+
// We will override this later with a logger that has a LoggerContext
260+
// See vm_log below for more details
261+
invocationLoggers.set(
262+
loggerId,
263+
createLogger(
264+
this.endpoint.loggerTransport,
265+
LogSource.JOURNAL,
266+
undefined
267+
)
268+
);
269269

270-
const inputReader = body.getReader();
270+
const inputReader = body.getReader();
271271

272-
// Now buffer input entries
273-
while (!coreVm.is_ready_to_execute()) {
274-
const nextValue = await inputReader.read();
275-
if (nextValue.value !== undefined) {
276-
coreVm.notify_input(nextValue.value);
277-
}
278-
if (nextValue.done) {
279-
coreVm.notify_input_closed();
280-
break;
272+
// Now buffer input entries
273+
while (!coreVm.is_ready_to_execute()) {
274+
const nextValue = await inputReader.read();
275+
if (nextValue.value !== undefined) {
276+
coreVm.notify_input(nextValue.value);
277+
}
278+
if (nextValue.done) {
279+
coreVm.notify_input_closed();
280+
break;
281+
}
281282
}
282-
}
283283

284-
// Get input
285-
const input = coreVm.sys_input();
286-
287-
const invocationRequest: Request = {
288-
id: input.invocation_id,
289-
headers: input.headers.reduce((headers, { key, value }) => {
290-
headers.set(key, value);
291-
return headers;
292-
}, new Map()),
293-
attemptHeaders: Object.entries(headers).reduce(
294-
(headers, [key, value]) => {
295-
if (value !== undefined) {
296-
headers.set(key, value instanceof Array ? value[0] : value);
297-
}
298-
return headers;
299-
},
300-
new Map()
301-
),
302-
body: input.input,
303-
extraArgs,
304-
attemptCompletedSignal: abortSignal,
305-
};
284+
// Get input
285+
const input = coreVm.sys_input();
306286

307-
// Prepare logger
308-
const loggerContext = new LoggerContext(
309-
input.invocation_id,
310-
handler.component().name(),
311-
handler.name(),
312-
handler.kind() === HandlerKind.SERVICE ? undefined : input.key,
313-
invocationRequest,
314-
additionalContext
315-
);
316-
const ctxLogger = createLogger(
317-
this.endpoint.loggerTransport,
318-
LogSource.USER,
319-
loggerContext,
320-
() => !coreVm.is_processing()
321-
);
322-
const vmLogger = createLogger(
323-
this.endpoint.loggerTransport,
324-
LogSource.JOURNAL,
325-
loggerContext,
326-
// Filtering is done within the shared core
327-
() => false
328-
);
329-
// See vm_log below for more details
330-
invocationLoggers.set(loggerId, vmLogger);
331-
if (!coreVm.is_processing()) {
332-
vmLogger.info("Replaying invocation.");
333-
} else {
334-
vmLogger.info("Starting invocation.");
335-
}
287+
const invocationRequest: Request = {
288+
id: input.invocation_id,
289+
headers: input.headers.reduce((headers, { key, value }) => {
290+
headers.set(key, value);
291+
return headers;
292+
}, new Map()),
293+
attemptHeaders: Object.entries(headers).reduce(
294+
(headers, [key, value]) => {
295+
if (value !== undefined) {
296+
headers.set(key, value instanceof Array ? value[0] : value);
297+
}
298+
return headers;
299+
},
300+
new Map()
301+
),
302+
body: input.input,
303+
extraArgs,
304+
attemptCompletedSignal: abortSignal,
305+
};
336306

337-
// This promise is used to signal the end of the computation,
338-
// which can be either the user returns a value,
339-
// or an exception gets catched, or the state machine fails/suspends.
340-
//
341-
// The last case is handled internally within the ContextImpl.
342-
const invocationEndPromise = new CompletablePromise<void>();
343-
344-
// Prepare response stream
345-
const responseTransformStream = new TransformStream<Uint8Array>();
346-
const outputWriter = responseTransformStream.writable.getWriter();
347-
348-
// Prepare context
349-
const ctx = new ContextImpl(
350-
coreVm,
351-
input,
352-
ctxLogger,
353-
handler.kind(),
354-
vmLogger,
355-
invocationRequest,
356-
invocationEndPromise,
357-
inputReader,
358-
outputWriter
359-
);
307+
// Prepare logger
308+
const loggerContext = new LoggerContext(
309+
input.invocation_id,
310+
handler.component().name(),
311+
handler.name(),
312+
handler.kind() === HandlerKind.SERVICE ? undefined : input.key,
313+
invocationRequest,
314+
additionalContext
315+
);
316+
const ctxLogger = createLogger(
317+
this.endpoint.loggerTransport,
318+
LogSource.USER,
319+
loggerContext,
320+
() => !coreVm.is_processing()
321+
);
322+
const vmLogger = createLogger(
323+
this.endpoint.loggerTransport,
324+
LogSource.JOURNAL,
325+
loggerContext
326+
// Filtering is done within the shared core
327+
);
328+
// See vm_log below for more details
329+
invocationLoggers.set(loggerId, vmLogger);
330+
if (!coreVm.is_processing()) {
331+
vmLogger.info("Replaying invocation.");
332+
} else {
333+
vmLogger.info("Starting invocation.");
334+
}
360335

361-
// Finally invoke user handler
362-
handler
363-
.invoke(ctx, input.input)
364-
.then((bytes) => {
365-
coreVm.sys_write_output_success(bytes);
366-
coreVm.sys_end();
367-
vmLogger.info("Invocation completed successfully.");
368-
})
369-
.catch((e) => {
370-
const error = ensureError(e);
371-
if (
372-
!(error instanceof RestateError) ||
373-
error.code !== SUSPENDED_ERROR_CODE
374-
) {
375-
vmLogger.warn("Invocation completed with an error.\n", error);
376-
}
336+
// This promise is used to signal the end of the computation,
337+
// which can be either the user returns a value,
338+
// or an exception gets catched, or the state machine fails/suspends.
339+
//
340+
// The last case is handled internally within the ContextImpl.
341+
const invocationEndPromise = new CompletablePromise<void>();
342+
343+
// Prepare response stream
344+
const responseTransformStream = new TransformStream<Uint8Array>();
345+
const outputWriter = responseTransformStream.writable.getWriter();
346+
347+
// Prepare context
348+
const ctx = new ContextImpl(
349+
coreVm,
350+
input,
351+
ctxLogger,
352+
handler.kind(),
353+
vmLogger,
354+
invocationRequest,
355+
invocationEndPromise,
356+
inputReader,
357+
outputWriter
358+
);
377359

378-
if (error instanceof TerminalError) {
379-
coreVm.sys_write_output_failure({
380-
code: error.code,
381-
message: error.message,
382-
});
360+
// Finally invoke user handler
361+
handler
362+
.invoke(ctx, input.input)
363+
.then((bytes) => {
364+
coreVm.sys_write_output_success(bytes);
383365
coreVm.sys_end();
384-
} else {
385-
coreVm.notify_error(error.message, error.stack);
386-
}
387-
})
388-
.finally(() => {
389-
invocationEndPromise.resolve();
390-
});
391-
392-
// Let's wire up invocationEndPromise with consuming all the output and closing the streams.
393-
invocationEndPromise.promise
394-
.then(async () => {
395-
// Consume output till the end, write it out, then close the stream
396-
let nextOutput = coreVm.take_output() as Uint8Array | null | undefined;
397-
while (nextOutput !== null && nextOutput !== undefined) {
398-
await outputWriter.write(nextOutput);
399-
nextOutput = coreVm.take_output() as Uint8Array | null | undefined;
400-
}
401-
await outputWriter.close();
402-
// Let's cancel the input reader, if it's still here
403-
inputReader.cancel().catch(() => {});
404-
})
405-
.finally(() => {
406-
invocationLoggers.delete(loggerId);
407-
})
408-
.catch(() => {});
366+
vmLogger.info("Invocation completed successfully.");
367+
})
368+
.catch((e) => {
369+
const error = ensureError(e);
370+
if (
371+
!(error instanceof RestateError) ||
372+
error.code !== SUSPENDED_ERROR_CODE
373+
) {
374+
vmLogger.warn("Invocation completed with an error.\n", error);
375+
}
409376

410-
return {
411-
headers: responseHeaders,
412-
statusCode: responseHead.status_code,
413-
body: responseTransformStream.readable as ReadableStream<Uint8Array>,
414-
};
377+
if (error instanceof TerminalError) {
378+
coreVm.sys_write_output_failure({
379+
code: error.code,
380+
message: error.message,
381+
});
382+
coreVm.sys_end();
383+
} else {
384+
coreVm.notify_error(error.message, error.stack);
385+
}
386+
})
387+
.finally(() => {
388+
invocationEndPromise.resolve();
389+
});
390+
391+
// Let's wire up invocationEndPromise with consuming all the output and closing the streams.
392+
invocationEndPromise.promise
393+
.then(async () => {
394+
// Consume output till the end, write it out, then close the stream
395+
let nextOutput = coreVm.take_output() as
396+
| Uint8Array
397+
| null
398+
| undefined;
399+
while (nextOutput !== null && nextOutput !== undefined) {
400+
await outputWriter.write(nextOutput);
401+
nextOutput = coreVm.take_output() as Uint8Array | null | undefined;
402+
}
403+
await outputWriter.close();
404+
// Let's cancel the input reader, if it's still here
405+
inputReader.cancel().catch(() => {});
406+
})
407+
.finally(() => {
408+
invocationLoggers.delete(loggerId);
409+
})
410+
.catch(() => {});
411+
412+
return {
413+
headers: responseHeaders,
414+
statusCode: responseHead.status_code,
415+
body: responseTransformStream.readable as ReadableStream<Uint8Array>,
416+
};
417+
} catch (error) {
418+
invocationLoggers.delete(loggerId);
419+
throw error;
420+
}
415421
}
416422

417423
private handleDiscovery(

0 commit comments

Comments
 (0)