Skip to content

Commit 0dea824

Browse files
authored
Support already completed journals in lambda handlers (#189)
1 parent 7e6b5d4 commit 0dea824

File tree

2 files changed

+4
-3
lines changed

2 files changed

+4
-3
lines changed

src/connection/lambda_connection.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@ const RESOLVED: Promise<void> = Promise.resolve();
2323
export class LambdaConnection implements Connection {
2424
// Empty buffer to store journal output messages
2525
private outputBuffer: Buffer = Buffer.alloc(0);
26-
private suspendedOrCompleted = false;
2726

2827
// Callback to resolve the invocation promise of the Lambda handler when the response is ready
2928
private readonly completionPromise: Promise<Buffer>;
3029
private resolveOnCompleted!: (value: Buffer | PromiseLike<Buffer>) => void;
3130

32-
constructor() {
31+
constructor(private suspendedOrCompleted = false) {
3332
// Promise that signals when the invocation is over, to then flush the messages
3433
this.completionPromise = new Promise<Buffer>((resolve) => {
3534
this.resolveOnCompleted = resolve;

src/server/restate_lambda_handler.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { Message } from "../types/types";
2828
import { StateMachine } from "../state_machine";
2929
import { ensureError } from "../types/errors";
3030
import { KeyedRouter, UnKeyedRouter } from "../public_api";
31+
import {OUTPUT_STREAM_ENTRY_MESSAGE_TYPE} from "../types/protocol";
3132

3233
/**
3334
* Creates an Restate entrypoint for services deployed on AWS Lambda and invoked
@@ -223,10 +224,11 @@ export class LambdaRestateServer extends BaseRestateServer {
223224
let decodedEntries: Message[] | null = decodeLambdaBody(event.body);
224225
const journalBuilder = new InvocationBuilder(method);
225226
decodedEntries.forEach((e: Message) => journalBuilder.handleMessage(e));
227+
const alreadyCompleted = decodedEntries.find((e: Message) => e.messageType === OUTPUT_STREAM_ENTRY_MESSAGE_TYPE) !== undefined
226228
decodedEntries = null;
227229

228230
// set up and invoke the state machine
229-
const connection = new LambdaConnection();
231+
const connection = new LambdaConnection(alreadyCompleted);
230232
const stateMachine = new StateMachine(
231233
connection,
232234
journalBuilder.build(),

0 commit comments

Comments
 (0)